//! alien_bindings/providers/queue/gcp_pubsub.rs
//!
//! Queue binding backed by Google Cloud Pub/Sub.

1use crate::error::{ErrorData, Result};
2use crate::traits::{
3    Binding, MessagePayload, Queue, QueueMessage, MAX_BATCH_SIZE, MAX_MESSAGE_BYTES,
4};
5use alien_error::{Context, IntoAlienError};
6use alien_gcp_clients::pubsub::{
7    AcknowledgeRequest, ModifyAckDeadlineRequest, PubSubApi, PubSubClient, PublishRequest,
8    PubsubMessage, PullRequest,
9};
10use async_trait::async_trait;
11use base64::prelude::*;
12use std::fmt::{Debug, Formatter};
13
14pub struct GcpPubSubQueue {
15    topic: String,
16    subscription: String,
17    client: PubSubClient,
18}
19
20impl Debug for GcpPubSubQueue {
21    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
22        f.debug_struct("GcpPubSubQueue")
23            .field("topic", &self.topic)
24            .field("subscription", &self.subscription)
25            .finish()
26    }
27}
28
29impl GcpPubSubQueue {
30    pub async fn new(
31        topic: String,
32        subscription: String,
33        gcp_config: alien_gcp_clients::GcpClientConfig,
34    ) -> Result<Self> {
35        Ok(Self {
36            topic,
37            subscription,
38            client: PubSubClient::new(crate::http_client::create_http_client(), gcp_config),
39        })
40    }
41}
42
43impl Binding for GcpPubSubQueue {}
44
45#[async_trait]
46impl Queue for GcpPubSubQueue {
47    async fn send(&self, _queue: &str, message: MessagePayload) -> Result<()> {
48        let data = match message {
49            MessagePayload::Json(v) => serde_json::to_vec(&v).into_alien_error().context(
50                ErrorData::BindingSetupFailed {
51                    binding_type: "queue.pubsub".to_string(),
52                    reason: "Failed to serialize JSON payload".to_string(),
53                },
54            )?,
55            MessagePayload::Text(s) => s.into_bytes(),
56        };
57
58        // Client-side validation: check message size
59        if data.len() > MAX_MESSAGE_BYTES {
60            return Err(alien_error::AlienError::new(
61                ErrorData::BindingSetupFailed {
62                    binding_type: "queue.pubsub".to_string(),
63                    reason: format!(
64                        "Message size {} bytes exceeds limit of {} bytes",
65                        data.len(),
66                        MAX_MESSAGE_BYTES
67                    ),
68                },
69            ));
70        }
71        let msg = PubsubMessage {
72            data: Some(base64::prelude::BASE64_STANDARD.encode(data)),
73            attributes: None,
74            message_id: None,
75            publish_time: None,
76            ordering_key: None,
77        };
78        let req = PublishRequest {
79            messages: vec![msg],
80        };
81        self.client
82            .publish(self.topic.clone(), req)
83            .await
84            .map(|_| ())
85            .context(ErrorData::BindingSetupFailed {
86                binding_type: "queue.pubsub".to_string(),
87                reason: "Failed to publish".to_string(),
88            })
89    }
90
91    async fn receive(&self, _queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
92        // Client-side validation: check batch size
93        if max_messages == 0 || max_messages > MAX_BATCH_SIZE {
94            return Err(alien_error::AlienError::new(
95                ErrorData::BindingSetupFailed {
96                    binding_type: "queue.pubsub".to_string(),
97                    reason: format!(
98                        "Batch size {} is invalid. Must be between 1 and {}",
99                        max_messages, MAX_BATCH_SIZE
100                    ),
101                },
102            ));
103        }
104
105        let req = PullRequest {
106            max_messages: Some(std::cmp::min(max_messages, MAX_BATCH_SIZE) as i32),
107            return_immediately: None,
108            allow_excess_messages: None,
109        };
110
111        let response = self
112            .client
113            .pull(self.subscription.clone(), req)
114            .await
115            .context(ErrorData::BindingSetupFailed {
116                binding_type: "queue.pubsub".to_string(),
117                reason: "Failed to pull messages".to_string(),
118            })?;
119
120        // Set ack deadline to 30s for all received messages
121        if !response.received_messages.is_empty() {
122            let ack_ids: Vec<String> = response
123                .received_messages
124                .iter()
125                .map(|msg| msg.ack_id.clone())
126                .collect();
127
128            let modify_req = ModifyAckDeadlineRequest {
129                ack_ids,
130                ack_deadline_seconds: 30,
131            };
132
133            let _ = self
134                .client
135                .modify_ack_deadline(self.subscription.clone(), modify_req)
136                .await;
137        }
138
139        let messages = response
140            .received_messages
141            .into_iter()
142            .filter_map(|received_msg| {
143                let message = received_msg.message;
144                let raw_data = message.data.unwrap_or_default();
145                let data = base64::prelude::BASE64_STANDARD.decode(&raw_data).ok()?;
146                let raw = String::from_utf8_lossy(&data).into_owned();
147                let payload = serde_json::from_str::<serde_json::Value>(&raw)
148                    .map(MessagePayload::Json)
149                    .unwrap_or_else(|_| MessagePayload::Text(raw));
150                Some(QueueMessage {
151                    payload,
152                    receipt_handle: received_msg.ack_id,
153                })
154            })
155            .collect();
156
157        Ok(messages)
158    }
159
160    async fn ack(&self, _queue: &str, receipt_handle: &str) -> Result<()> {
161        let req = AcknowledgeRequest {
162            ack_ids: vec![receipt_handle.to_string()],
163        };
164
165        self.client
166            .acknowledge(self.subscription.clone(), req)
167            .await
168            .context(ErrorData::BindingSetupFailed {
169                binding_type: "queue.pubsub".to_string(),
170                reason: "Failed to acknowledge message".to_string(),
171            })
172    }
173}