Skip to main content

alien_bindings/providers/queue/
gcp_pubsub.rs

1use crate::error::{ErrorData, Result};
2use crate::traits::{
3    Binding, MessagePayload, Queue, QueueMessage, MAX_BATCH_SIZE, MAX_MESSAGE_BYTES,
4};
5use alien_error::{Context, IntoAlienError};
6use alien_gcp_clients::pubsub::{
7    AcknowledgeRequest, ModifyAckDeadlineRequest, PubSubApi, PubSubClient, PublishRequest,
8    PubsubMessage, PullRequest,
9};
10use async_trait::async_trait;
11use base64::prelude::*;
12use std::fmt::{Debug, Formatter};
13
14pub struct GcpPubSubQueue {
15    topic: String,
16    subscription: String,
17    client: PubSubClient,
18}
19
20impl Debug for GcpPubSubQueue {
21    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
22        f.debug_struct("GcpPubSubQueue")
23            .field("topic", &self.topic)
24            .field("subscription", &self.subscription)
25            .finish()
26    }
27}
28
29impl GcpPubSubQueue {
30    pub async fn new(
31        topic: String,
32        subscription: String,
33        gcp_config: alien_gcp_clients::GcpClientConfig,
34    ) -> Result<Self> {
35        Ok(Self {
36            topic,
37            subscription,
38            client: PubSubClient::new(crate::http_client::create_http_client(), gcp_config),
39        })
40    }
41}
42
43impl Binding for GcpPubSubQueue {}
44
45#[async_trait]
46impl Queue for GcpPubSubQueue {
47    async fn send(&self, _queue: &str, message: MessagePayload) -> Result<()> {
48        let data = match message {
49            MessagePayload::Json(v) => serde_json::to_vec(&v).into_alien_error().context(
50                ErrorData::BindingSetupFailed {
51                    binding_type: "queue.pubsub".to_string(),
52                    reason: "Failed to serialize JSON payload".to_string(),
53                },
54            )?,
55            MessagePayload::Text(s) => s.into_bytes(),
56        };
57
58        // Client-side validation: check message size
59        if data.len() > MAX_MESSAGE_BYTES {
60            return Err(alien_error::AlienError::new(
61                ErrorData::BindingSetupFailed {
62                    binding_type: "queue.pubsub".to_string(),
63                    reason: format!(
64                        "Message size {} bytes exceeds limit of {} bytes",
65                        data.len(),
66                        MAX_MESSAGE_BYTES
67                    ),
68                },
69            ));
70        }
71        let msg = PubsubMessage {
72            data: Some(base64::prelude::BASE64_STANDARD.encode(data)),
73            attributes: None,
74            message_id: None,
75            publish_time: None,
76            ordering_key: None,
77        };
78        let req = PublishRequest {
79            messages: vec![msg],
80        };
81        self.client
82            .publish(self.topic.clone(), req)
83            .await
84            .map(|_| ())
85            .map_err(|e| {
86                let reason = format!("Failed to publish to topic '{}': {}", self.topic, e);
87                tracing::error!(%reason, "PubSub publish failed");
88                alien_error::AlienError::new(ErrorData::BindingSetupFailed {
89                    binding_type: "queue.pubsub".to_string(),
90                    reason,
91                })
92            })
93    }
94
95    async fn receive(&self, _queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
96        // Client-side validation: check batch size
97        if max_messages == 0 || max_messages > MAX_BATCH_SIZE {
98            return Err(alien_error::AlienError::new(
99                ErrorData::BindingSetupFailed {
100                    binding_type: "queue.pubsub".to_string(),
101                    reason: format!(
102                        "Batch size {} is invalid. Must be between 1 and {}",
103                        max_messages, MAX_BATCH_SIZE
104                    ),
105                },
106            ));
107        }
108
109        let req = PullRequest {
110            max_messages: Some(std::cmp::min(max_messages, MAX_BATCH_SIZE) as i32),
111            return_immediately: None,
112            allow_excess_messages: None,
113        };
114
115        let response = self
116            .client
117            .pull(self.subscription.clone(), req)
118            .await
119            .context(ErrorData::BindingSetupFailed {
120                binding_type: "queue.pubsub".to_string(),
121                reason: "Failed to pull messages".to_string(),
122            })?;
123
124        // Set ack deadline to 30s for all received messages
125        if !response.received_messages.is_empty() {
126            let ack_ids: Vec<String> = response
127                .received_messages
128                .iter()
129                .map(|msg| msg.ack_id.clone())
130                .collect();
131
132            let modify_req = ModifyAckDeadlineRequest {
133                ack_ids,
134                ack_deadline_seconds: 30,
135            };
136
137            let _ = self
138                .client
139                .modify_ack_deadline(self.subscription.clone(), modify_req)
140                .await;
141        }
142
143        let messages = response
144            .received_messages
145            .into_iter()
146            .filter_map(|received_msg| {
147                let message = received_msg.message;
148                let raw_data = message.data.unwrap_or_default();
149                let data = base64::prelude::BASE64_STANDARD.decode(&raw_data).ok()?;
150                let raw = String::from_utf8_lossy(&data).into_owned();
151                let payload = serde_json::from_str::<serde_json::Value>(&raw)
152                    .map(MessagePayload::Json)
153                    .unwrap_or_else(|_| MessagePayload::Text(raw));
154                Some(QueueMessage {
155                    payload,
156                    receipt_handle: received_msg.ack_id,
157                })
158            })
159            .collect();
160
161        Ok(messages)
162    }
163
164    async fn ack(&self, _queue: &str, receipt_handle: &str) -> Result<()> {
165        let req = AcknowledgeRequest {
166            ack_ids: vec![receipt_handle.to_string()],
167        };
168
169        self.client
170            .acknowledge(self.subscription.clone(), req)
171            .await
172            .context(ErrorData::BindingSetupFailed {
173                binding_type: "queue.pubsub".to_string(),
174                reason: "Failed to acknowledge message".to_string(),
175            })
176    }
177}