alien_bindings/providers/queue/
gcp_pubsub.rs1use crate::error::{ErrorData, Result};
2use crate::traits::{
3 Binding, MessagePayload, Queue, QueueMessage, MAX_BATCH_SIZE, MAX_MESSAGE_BYTES,
4};
5use alien_error::{Context, IntoAlienError};
6use alien_gcp_clients::pubsub::{
7 AcknowledgeRequest, ModifyAckDeadlineRequest, PubSubApi, PubSubClient, PublishRequest,
8 PubsubMessage, PullRequest,
9};
10use async_trait::async_trait;
11use base64::prelude::*;
12use std::fmt::{Debug, Formatter};
13
14pub struct GcpPubSubQueue {
15 topic: String,
16 subscription: String,
17 client: PubSubClient,
18}
19
20impl Debug for GcpPubSubQueue {
21 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
22 f.debug_struct("GcpPubSubQueue")
23 .field("topic", &self.topic)
24 .field("subscription", &self.subscription)
25 .finish()
26 }
27}
28
29impl GcpPubSubQueue {
30 pub async fn new(
31 topic: String,
32 subscription: String,
33 gcp_config: alien_gcp_clients::GcpClientConfig,
34 ) -> Result<Self> {
35 Ok(Self {
36 topic,
37 subscription,
38 client: PubSubClient::new(crate::http_client::create_http_client(), gcp_config),
39 })
40 }
41}
42
43impl Binding for GcpPubSubQueue {}
44
45#[async_trait]
46impl Queue for GcpPubSubQueue {
47 async fn send(&self, _queue: &str, message: MessagePayload) -> Result<()> {
48 let data = match message {
49 MessagePayload::Json(v) => serde_json::to_vec(&v).into_alien_error().context(
50 ErrorData::BindingSetupFailed {
51 binding_type: "queue.pubsub".to_string(),
52 reason: "Failed to serialize JSON payload".to_string(),
53 },
54 )?,
55 MessagePayload::Text(s) => s.into_bytes(),
56 };
57
58 if data.len() > MAX_MESSAGE_BYTES {
60 return Err(alien_error::AlienError::new(
61 ErrorData::BindingSetupFailed {
62 binding_type: "queue.pubsub".to_string(),
63 reason: format!(
64 "Message size {} bytes exceeds limit of {} bytes",
65 data.len(),
66 MAX_MESSAGE_BYTES
67 ),
68 },
69 ));
70 }
71 let msg = PubsubMessage {
72 data: Some(base64::prelude::BASE64_STANDARD.encode(data)),
73 attributes: None,
74 message_id: None,
75 publish_time: None,
76 ordering_key: None,
77 };
78 let req = PublishRequest {
79 messages: vec![msg],
80 };
81 self.client
82 .publish(self.topic.clone(), req)
83 .await
84 .map(|_| ())
85 .context(ErrorData::BindingSetupFailed {
86 binding_type: "queue.pubsub".to_string(),
87 reason: "Failed to publish".to_string(),
88 })
89 }
90
91 async fn receive(&self, _queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
92 if max_messages == 0 || max_messages > MAX_BATCH_SIZE {
94 return Err(alien_error::AlienError::new(
95 ErrorData::BindingSetupFailed {
96 binding_type: "queue.pubsub".to_string(),
97 reason: format!(
98 "Batch size {} is invalid. Must be between 1 and {}",
99 max_messages, MAX_BATCH_SIZE
100 ),
101 },
102 ));
103 }
104
105 let req = PullRequest {
106 max_messages: Some(std::cmp::min(max_messages, MAX_BATCH_SIZE) as i32),
107 return_immediately: None,
108 allow_excess_messages: None,
109 };
110
111 let response = self
112 .client
113 .pull(self.subscription.clone(), req)
114 .await
115 .context(ErrorData::BindingSetupFailed {
116 binding_type: "queue.pubsub".to_string(),
117 reason: "Failed to pull messages".to_string(),
118 })?;
119
120 if !response.received_messages.is_empty() {
122 let ack_ids: Vec<String> = response
123 .received_messages
124 .iter()
125 .map(|msg| msg.ack_id.clone())
126 .collect();
127
128 let modify_req = ModifyAckDeadlineRequest {
129 ack_ids,
130 ack_deadline_seconds: 30,
131 };
132
133 let _ = self
134 .client
135 .modify_ack_deadline(self.subscription.clone(), modify_req)
136 .await;
137 }
138
139 let messages = response
140 .received_messages
141 .into_iter()
142 .filter_map(|received_msg| {
143 let message = received_msg.message;
144 let raw_data = message.data.unwrap_or_default();
145 let data = base64::prelude::BASE64_STANDARD.decode(&raw_data).ok()?;
146 let raw = String::from_utf8_lossy(&data).into_owned();
147 let payload = serde_json::from_str::<serde_json::Value>(&raw)
148 .map(MessagePayload::Json)
149 .unwrap_or_else(|_| MessagePayload::Text(raw));
150 Some(QueueMessage {
151 payload,
152 receipt_handle: received_msg.ack_id,
153 })
154 })
155 .collect();
156
157 Ok(messages)
158 }
159
160 async fn ack(&self, _queue: &str, receipt_handle: &str) -> Result<()> {
161 let req = AcknowledgeRequest {
162 ack_ids: vec![receipt_handle.to_string()],
163 };
164
165 self.client
166 .acknowledge(self.subscription.clone(), req)
167 .await
168 .context(ErrorData::BindingSetupFailed {
169 binding_type: "queue.pubsub".to_string(),
170 reason: "Failed to acknowledge message".to_string(),
171 })
172 }
173}