alien_bindings/providers/queue/
gcp_pubsub.rs1use crate::error::{ErrorData, Result};
2use crate::traits::{
3 Binding, MessagePayload, Queue, QueueMessage, MAX_BATCH_SIZE, MAX_MESSAGE_BYTES,
4};
5use alien_error::{Context, IntoAlienError};
6use alien_gcp_clients::pubsub::{
7 AcknowledgeRequest, ModifyAckDeadlineRequest, PubSubApi, PubSubClient, PublishRequest,
8 PubsubMessage, PullRequest,
9};
10use async_trait::async_trait;
11use base64::prelude::*;
12use std::fmt::{Debug, Formatter};
13
14pub struct GcpPubSubQueue {
15 topic: String,
16 subscription: String,
17 client: PubSubClient,
18}
19
20impl Debug for GcpPubSubQueue {
21 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
22 f.debug_struct("GcpPubSubQueue")
23 .field("topic", &self.topic)
24 .field("subscription", &self.subscription)
25 .finish()
26 }
27}
28
29impl GcpPubSubQueue {
30 pub async fn new(
31 topic: String,
32 subscription: String,
33 gcp_config: alien_gcp_clients::GcpClientConfig,
34 ) -> Result<Self> {
35 Ok(Self {
36 topic,
37 subscription,
38 client: PubSubClient::new(crate::http_client::create_http_client(), gcp_config),
39 })
40 }
41}
42
43impl Binding for GcpPubSubQueue {}
44
45#[async_trait]
46impl Queue for GcpPubSubQueue {
47 async fn send(&self, _queue: &str, message: MessagePayload) -> Result<()> {
48 let data = match message {
49 MessagePayload::Json(v) => serde_json::to_vec(&v).into_alien_error().context(
50 ErrorData::BindingSetupFailed {
51 binding_type: "queue.pubsub".to_string(),
52 reason: "Failed to serialize JSON payload".to_string(),
53 },
54 )?,
55 MessagePayload::Text(s) => s.into_bytes(),
56 };
57
58 if data.len() > MAX_MESSAGE_BYTES {
60 return Err(alien_error::AlienError::new(
61 ErrorData::BindingSetupFailed {
62 binding_type: "queue.pubsub".to_string(),
63 reason: format!(
64 "Message size {} bytes exceeds limit of {} bytes",
65 data.len(),
66 MAX_MESSAGE_BYTES
67 ),
68 },
69 ));
70 }
71 let msg = PubsubMessage {
72 data: Some(base64::prelude::BASE64_STANDARD.encode(data)),
73 attributes: None,
74 message_id: None,
75 publish_time: None,
76 ordering_key: None,
77 };
78 let req = PublishRequest {
79 messages: vec![msg],
80 };
81 self.client
82 .publish(self.topic.clone(), req)
83 .await
84 .map(|_| ())
85 .map_err(|e| {
86 let reason = format!("Failed to publish to topic '{}': {}", self.topic, e);
87 tracing::error!(%reason, "PubSub publish failed");
88 alien_error::AlienError::new(ErrorData::BindingSetupFailed {
89 binding_type: "queue.pubsub".to_string(),
90 reason,
91 })
92 })
93 }
94
95 async fn receive(&self, _queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
96 if max_messages == 0 || max_messages > MAX_BATCH_SIZE {
98 return Err(alien_error::AlienError::new(
99 ErrorData::BindingSetupFailed {
100 binding_type: "queue.pubsub".to_string(),
101 reason: format!(
102 "Batch size {} is invalid. Must be between 1 and {}",
103 max_messages, MAX_BATCH_SIZE
104 ),
105 },
106 ));
107 }
108
109 let req = PullRequest {
110 max_messages: Some(std::cmp::min(max_messages, MAX_BATCH_SIZE) as i32),
111 return_immediately: None,
112 allow_excess_messages: None,
113 };
114
115 let response = self
116 .client
117 .pull(self.subscription.clone(), req)
118 .await
119 .context(ErrorData::BindingSetupFailed {
120 binding_type: "queue.pubsub".to_string(),
121 reason: "Failed to pull messages".to_string(),
122 })?;
123
124 if !response.received_messages.is_empty() {
126 let ack_ids: Vec<String> = response
127 .received_messages
128 .iter()
129 .map(|msg| msg.ack_id.clone())
130 .collect();
131
132 let modify_req = ModifyAckDeadlineRequest {
133 ack_ids,
134 ack_deadline_seconds: 30,
135 };
136
137 let _ = self
138 .client
139 .modify_ack_deadline(self.subscription.clone(), modify_req)
140 .await;
141 }
142
143 let messages = response
144 .received_messages
145 .into_iter()
146 .filter_map(|received_msg| {
147 let message = received_msg.message;
148 let raw_data = message.data.unwrap_or_default();
149 let data = base64::prelude::BASE64_STANDARD.decode(&raw_data).ok()?;
150 let raw = String::from_utf8_lossy(&data).into_owned();
151 let payload = serde_json::from_str::<serde_json::Value>(&raw)
152 .map(MessagePayload::Json)
153 .unwrap_or_else(|_| MessagePayload::Text(raw));
154 Some(QueueMessage {
155 payload,
156 receipt_handle: received_msg.ack_id,
157 })
158 })
159 .collect();
160
161 Ok(messages)
162 }
163
164 async fn ack(&self, _queue: &str, receipt_handle: &str) -> Result<()> {
165 let req = AcknowledgeRequest {
166 ack_ids: vec![receipt_handle.to_string()],
167 };
168
169 self.client
170 .acknowledge(self.subscription.clone(), req)
171 .await
172 .context(ErrorData::BindingSetupFailed {
173 binding_type: "queue.pubsub".to_string(),
174 reason: "Failed to acknowledge message".to_string(),
175 })
176 }
177}