Skip to main content

alien_bindings/providers/queue/
grpc.rs

1use crate::error::{ErrorData, Result};
2use crate::traits::{Binding, MessagePayload, Queue, QueueMessage};
3use alien_error::{Context as _, IntoAlienError as _};
4use async_trait::async_trait;
5use std::fmt::{Debug, Formatter};
6use tonic::transport::Channel;
7
8// Import generated protobuf types
9pub mod proto {
10    tonic::include_proto!("alien_bindings.queue");
11}
12
13use proto::{
14    queue_service_client::QueueServiceClient, AckRequest, MessagePayload as ProtoMessagePayload,
15    ReceiveRequest, SendRequest,
16};
17
18/// gRPC-based Queue implementation that forwards calls to a remote Queue service
19pub struct GrpcQueue {
20    client: QueueServiceClient<Channel>,
21    binding_name: String,
22}
23
24impl Debug for GrpcQueue {
25    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
26        f.debug_struct("GrpcQueue")
27            .field("binding_name", &self.binding_name)
28            .finish()
29    }
30}
31
32impl GrpcQueue {
33    /// Create a new gRPC Queue client
34    pub async fn new(binding_name: String, grpc_endpoint: String) -> Result<Self> {
35        let channel = crate::providers::grpc_provider::create_grpc_channel(grpc_endpoint).await?;
36        Self::new_from_channel(channel, binding_name).await
37    }
38
39    /// Create a new gRPC Queue client from an existing channel
40    pub async fn new_from_channel(channel: Channel, binding_name: String) -> Result<Self> {
41        let client = QueueServiceClient::new(channel);
42
43        Ok(Self {
44            client,
45            binding_name,
46        })
47    }
48
49    /// Convert MessagePayload to proto MessagePayload
50    fn message_payload_to_proto(payload: MessagePayload) -> ProtoMessagePayload {
51        let proto_payload = match payload {
52            MessagePayload::Json(value) => {
53                // Convert JSON Value to string
54                let json_str = serde_json::to_string(&value).unwrap_or_else(|_| "{}".to_string());
55                proto::message_payload::Payload::Json(json_str)
56            }
57            MessagePayload::Text(text) => proto::message_payload::Payload::Text(text),
58        };
59
60        ProtoMessagePayload {
61            payload: Some(proto_payload),
62        }
63    }
64
65    /// Convert proto QueueMessage to QueueMessage
66    fn proto_to_queue_message(proto_msg: proto::QueueMessage) -> Result<QueueMessage> {
67        let payload = proto_msg.payload.ok_or_else(|| {
68            alien_error::AlienError::new(ErrorData::InvalidInput {
69                operation_context: "Queue message deserialization".to_string(),
70                details: "Queue message payload is missing".to_string(),
71                field_name: Some("payload".to_string()),
72            })
73        })?;
74
75        let message_payload = match payload.payload {
76            Some(proto::message_payload::Payload::Json(json_str)) => {
77                let json_value: serde_json::Value = serde_json::from_str(&json_str)
78                    .into_alien_error()
79                    .context(ErrorData::InvalidInput {
80                        operation_context: "Queue message payload parsing".to_string(),
81                        details: format!("Invalid JSON payload in queue message: {}", json_str),
82                        field_name: Some("payload.json".to_string()),
83                    })?;
84                MessagePayload::Json(json_value)
85            }
86            Some(proto::message_payload::Payload::Text(text)) => MessagePayload::Text(text),
87            None => {
88                return Err(alien_error::AlienError::new(ErrorData::InvalidInput {
89                    operation_context: "Queue message payload parsing".to_string(),
90                    details: "Queue message payload type not specified".to_string(),
91                    field_name: Some("payload.payload".to_string()),
92                }));
93            }
94        };
95
96        Ok(QueueMessage {
97            payload: message_payload,
98            receipt_handle: proto_msg.receipt_handle,
99        })
100    }
101}
102
103impl Binding for GrpcQueue {}
104
105#[async_trait]
106impl Queue for GrpcQueue {
107    async fn send(&self, queue: &str, message: MessagePayload) -> Result<()> {
108        let mut client = self.client.clone();
109
110        let request = tonic::Request::new(SendRequest {
111            binding_name: self.binding_name.clone(),
112            queue: queue.to_string(),
113            message: Some(Self::message_payload_to_proto(message)),
114        });
115
116        client
117            .send(request)
118            .await
119            .into_alien_error()
120            .context(ErrorData::GrpcRequestFailed {
121                service: "QueueService".to_string(),
122                method: "send".to_string(),
123                details: format!("Failed to send message to queue: {}", queue),
124            })?;
125
126        Ok(())
127    }
128
129    async fn receive(&self, queue: &str, max_messages: usize) -> Result<Vec<QueueMessage>> {
130        let mut client = self.client.clone();
131
132        let request = tonic::Request::new(ReceiveRequest {
133            binding_name: self.binding_name.clone(),
134            queue: queue.to_string(),
135            max_messages: max_messages as u32,
136        });
137
138        let response = client.receive(request).await.into_alien_error().context(
139            ErrorData::GrpcRequestFailed {
140                service: "QueueService".to_string(),
141                method: "receive".to_string(),
142                details: format!("Failed to receive messages from queue: {}", queue),
143            },
144        )?;
145
146        let proto_messages = response.into_inner().messages;
147        let messages: Result<Vec<QueueMessage>> = proto_messages
148            .into_iter()
149            .map(Self::proto_to_queue_message)
150            .collect();
151
152        messages
153    }
154
155    async fn ack(&self, queue: &str, receipt_handle: &str) -> Result<()> {
156        let mut client = self.client.clone();
157
158        let request = tonic::Request::new(AckRequest {
159            binding_name: self.binding_name.clone(),
160            queue: queue.to_string(),
161            receipt_handle: receipt_handle.to_string(),
162        });
163
164        client
165            .ack(request)
166            .await
167            .into_alien_error()
168            .context(ErrorData::GrpcRequestFailed {
169                service: "QueueService".to_string(),
170                method: "ack".to_string(),
171                details: format!("Failed to acknowledge message in queue: {}", queue),
172            })?;
173
174        Ok(())
175    }
176}