synap_sdk/queue_reactive.rs
1//! Reactive queue operations
2//!
3//! Provides Stream-based message consumption for queues.
4
5use crate::error::Result;
6use crate::reactive::{MessageStream, SubscriptionHandle};
7use crate::types::Message;
8use futures::Stream;
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::time::sleep;
12
13impl crate::queue::QueueManager {
14 /// Observe messages from a queue reactively
15 ///
16 /// Returns a Stream of messages that can be processed asynchronously.
17 /// The stream will poll the queue at the specified interval.
18 ///
19 /// # Arguments
20 /// * `queue_name` - Name of the queue
21 /// * `consumer_id` - Unique consumer identifier
22 /// * `poll_interval` - How often to poll for new messages
23 ///
24 /// # Example
25 /// ```no_run
26 /// use futures::StreamExt;
27 /// use synap_sdk::{SynapClient, SynapConfig};
28 /// use std::time::Duration;
29 ///
30 /// # #[tokio::main]
31 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
32 /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
33 /// let (mut stream, handle) = client.queue()
34 /// .observe_messages("tasks", "worker-1", Duration::from_millis(100));
35 ///
36 /// // Process messages reactively
37 /// while let Some(message) = stream.next().await {
38 /// println!("Received: {:?}", message);
39 /// // ACK handled automatically
40 /// }
41 ///
42 /// // Stop consuming
43 /// handle.unsubscribe();
44 /// # Ok(())
45 /// # }
46 /// ```
47 pub fn observe_messages(
48 &self,
49 queue_name: impl Into<String>,
50 consumer_id: impl Into<String>,
51 poll_interval: Duration,
52 ) -> (impl Stream<Item = Message> + 'static, SubscriptionHandle) {
53 let queue_name = queue_name.into();
54 let consumer_id = consumer_id.into();
55 let client = self.client.clone();
56
57 let (tx, rx) = mpsc::unbounded_channel();
58 let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel();
59
60 tokio::spawn(async move {
61 loop {
62 tokio::select! {
63 _ = cancel_rx.recv() => {
64 tracing::debug!("Message stream cancelled");
65 break;
66 }
67 _ = sleep(poll_interval) => {
68 match client.queue().consume(&queue_name, &consumer_id).await {
69 Ok(Some(message)) => {
70 if tx.send(message).is_err() {
71 break; // Receiver dropped
72 }
73 }
74 Ok(None) => {
75 // No messages available, continue polling
76 }
77 Err(e) => {
78 tracing::error!("Error consuming message: {}", e);
79 // Continue polling despite errors
80 }
81 }
82 }
83 }
84 }
85 });
86
87 let stream: MessageStream<Message> =
88 Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx));
89 let handle = SubscriptionHandle::new(cancel_tx);
90
91 (stream, handle)
92 }
93
94 /// Process messages from a queue with automatic ACK/NACK
95 ///
96 /// Automatically acknowledges successfully processed messages and
97 /// requeues failed ones.
98 ///
99 /// # Example
100 /// ```no_run
101 /// use synap_sdk::{SynapClient, SynapConfig};
102 /// use std::time::Duration;
103 ///
104 /// # #[tokio::main]
105 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
106 /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
107 /// let handle = client.queue().process_messages(
108 /// "tasks",
109 /// "worker-1",
110 /// Duration::from_millis(100),
111 /// |message| async move {
112 /// // Process the message
113 /// println!("Processing: {:?}", message.id);
114 /// Ok(()) // Success = ACK, Err = NACK
115 /// }
116 /// );
117 ///
118 /// // Stop processing
119 /// handle.unsubscribe();
120 /// # Ok(())
121 /// # }
122 /// ```
123 pub fn process_messages<F, Fut>(
124 &self,
125 queue_name: impl Into<String>,
126 consumer_id: impl Into<String>,
127 poll_interval: Duration,
128 handler: F,
129 ) -> SubscriptionHandle
130 where
131 F: Fn(Message) -> Fut + Send + 'static,
132 Fut: std::future::Future<Output = Result<()>> + Send,
133 {
134 let queue_name = queue_name.into();
135 let consumer_id = consumer_id.into();
136 let client = self.client.clone();
137
138 let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel();
139
140 tokio::spawn(async move {
141 loop {
142 tokio::select! {
143 _ = cancel_rx.recv() => {
144 tracing::debug!("Message processor cancelled");
145 break;
146 }
147 _ = sleep(poll_interval) => {
148 match client.queue().consume(&queue_name, &consumer_id).await {
149 Ok(Some(message)) => {
150 let msg_id = message.id.clone();
151
152 // Process message
153 match handler(message).await {
154 Ok(()) => {
155 // Success: ACK
156 if let Err(e) = client.queue().ack(&queue_name, &msg_id).await {
157 tracing::error!("Failed to ACK message {}: {}", msg_id, e);
158 }
159 }
160 Err(e) => {
161 // Error: NACK (requeue)
162 tracing::warn!("Processing failed for {}: {}", msg_id, e);
163 if let Err(e) = client.queue().nack(&queue_name, &msg_id).await {
164 tracing::error!("Failed to NACK message {}: {}", msg_id, e);
165 }
166 }
167 }
168 }
169 Ok(None) => {
170 // No messages, continue polling
171 }
172 Err(e) => {
173 tracing::error!("Error consuming message: {}", e);
174 }
175 }
176 }
177 }
178 }
179 });
180
181 SubscriptionHandle::new(cancel_tx)
182 }
183}