alien_bindings/providers/queue/
grpc.rs1use crate::error::{ErrorData, Result};
2use crate::traits::{Binding, MessagePayload, Queue, QueueMessage};
3use alien_error::{Context as _, IntoAlienError as _};
4use async_trait::async_trait;
5use std::fmt::{Debug, Formatter};
6use tonic::transport::Channel;
7
8pub mod proto {
10 tonic::include_proto!("alien_bindings.queue");
11}
12
13use proto::{
14 queue_service_client::QueueServiceClient, AckRequest, MessagePayload as ProtoMessagePayload,
15 ReceiveRequest, SendRequest,
16};
17
18pub struct GrpcQueue {
20 client: QueueServiceClient<Channel>,
21 binding_name: String,
22}
23
24impl Debug for GrpcQueue {
25 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
26 f.debug_struct("GrpcQueue")
27 .field("binding_name", &self.binding_name)
28 .finish()
29 }
30}
31
32impl GrpcQueue {
33 pub async fn new(binding_name: String, grpc_endpoint: String) -> Result<Self> {
35 let channel = crate::providers::grpc_provider::create_grpc_channel(grpc_endpoint).await?;
36 Self::new_from_channel(channel, binding_name).await
37 }
38
39 pub async fn new_from_channel(channel: Channel, binding_name: String) -> Result<Self> {
41 let client = QueueServiceClient::new(channel);
42
43 Ok(Self {
44 client,
45 binding_name,
46 })
47 }
48
49 fn message_payload_to_proto(payload: MessagePayload) -> ProtoMessagePayload {
51 let proto_payload = match payload {
52 MessagePayload::Json(value) => {
53 let json_str = serde_json::to_string(&value).unwrap_or_else(|_| "{}".to_string());
55 proto::message_payload::Payload::Json(json_str)
56 }
57 MessagePayload::Text(text) => proto::message_payload::Payload::Text(text),
58 };
59
60 ProtoMessagePayload {
61 payload: Some(proto_payload),
62 }
63 }
64
65 fn proto_to_queue_message(proto_msg: proto::QueueMessage) -> Result<QueueMessage> {
67 let payload = proto_msg.payload.ok_or_else(|| {
68 alien_error::AlienError::new(ErrorData::InvalidInput {
69 operation_context: "Queue message deserialization".to_string(),
70 details: "Queue message payload is missing".to_string(),
71 field_name: Some("payload".to_string()),
72 })
73 })?;
74
75 let message_payload = match payload.payload {
76 Some(proto::message_payload::Payload::Json(json_str)) => {
77 let json_value: serde_json::Value = serde_json::from_str(&json_str)
78 .into_alien_error()
79 .context(ErrorData::InvalidInput {
80 operation_context: "Queue message payload parsing".to_string(),
81 details: format!("Invalid JSON payload in queue message: {}", json_str),
82 field_name: Some("payload.json".to_string()),
83 })?;
84 MessagePayload::Json(json_value)
85 }
86 Some(proto::message_payload::Payload::Text(text)) => MessagePayload::Text(text),
87 None => {
88 return Err(alien_error::AlienError::new(ErrorData::InvalidInput {
89 operation_context: "Queue message payload parsing".to_string(),
90 details: "Queue message payload type not specified".to_string(),
91 field_name: Some("payload.payload".to_string()),
92 }));
93 }
94 };
95
96 Ok(QueueMessage {
97 payload: message_payload,
98 receipt_handle: proto_msg.receipt_handle,
99 })
100 }
101}
102
103impl Binding for GrpcQueue {}
104
105#[async_trait]
106impl Queue for GrpcQueue {
107 async fn send(&self, queue: &str, message: MessagePayload) -> Result<()> {
108 let mut client = self.client.clone();
109
110 let request = tonic::Request::new(SendRequest {
111 binding_name: self.binding_name.clone(),
112 queue: queue.to_string(),
113 message: Some(Self::message_payload_to_proto(message)),
114 });
115
116 client
117 .send(request)
118 .await
119 .into_alien_error()
120 .context(ErrorData::GrpcRequestFailed {
121 service: "QueueService".to_string(),
122 method: "send".to_string(),
123 details: format!("Failed to send message to queue: {}", queue),
124 })?;
125
126 Ok(())
127 }
128
129 async fn receive(&self, queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
130 let mut client = self.client.clone();
131
132 let request = tonic::Request::new(ReceiveRequest {
133 binding_name: self.binding_name.clone(),
134 queue: queue.to_string(),
135 max_messages: max_messages as u32,
136 });
137
138 let response = client.receive(request).await.into_alien_error().context(
139 ErrorData::GrpcRequestFailed {
140 service: "QueueService".to_string(),
141 method: "receive".to_string(),
142 details: format!("Failed to receive messages from queue: {}", queue),
143 },
144 )?;
145
146 let proto_messages = response.into_inner().messages;
147 let messages: Result<Vec<QueueMessage>> = proto_messages
148 .into_iter()
149 .map(Self::proto_to_queue_message)
150 .collect();
151
152 messages
153 }
154
155 async fn ack(&self, queue: &str, receipt_handle: &str) -> Result<()> {
156 let mut client = self.client.clone();
157
158 let request = tonic::Request::new(AckRequest {
159 binding_name: self.binding_name.clone(),
160 queue: queue.to_string(),
161 receipt_handle: receipt_handle.to_string(),
162 });
163
164 client
165 .ack(request)
166 .await
167 .into_alien_error()
168 .context(ErrorData::GrpcRequestFailed {
169 service: "QueueService".to_string(),
170 method: "ack".to_string(),
171 details: format!("Failed to acknowledge message in queue: {}", queue),
172 })?;
173
174 Ok(())
175 }
176}