synap_sdk/
queue_reactive.rs

1//! Reactive queue operations
2//!
3//! Provides Stream-based message consumption for queues.
4
5use crate::error::Result;
6use crate::reactive::{MessageStream, SubscriptionHandle};
7use crate::types::Message;
8use futures::Stream;
9use std::time::Duration;
10use tokio::sync::mpsc;
11use tokio::time::sleep;
12
13impl crate::queue::QueueManager {
14    /// Observe messages from a queue reactively
15    ///
16    /// Returns a Stream of messages that can be processed asynchronously.
17    /// The stream will poll the queue at the specified interval.
18    ///
19    /// # Arguments
20    /// * `queue_name` - Name of the queue
21    /// * `consumer_id` - Unique consumer identifier
22    /// * `poll_interval` - How often to poll for new messages
23    ///
24    /// # Example
25    /// ```no_run
26    /// use futures::StreamExt;
27    /// use synap_sdk::{SynapClient, SynapConfig};
28    /// use std::time::Duration;
29    ///
30    /// # #[tokio::main]
31    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
32    /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
33    /// let (mut stream, handle) = client.queue()
34    ///     .observe_messages("tasks", "worker-1", Duration::from_millis(100));
35    ///
36    /// // Process messages reactively
37    /// while let Some(message) = stream.next().await {
38    ///     println!("Received: {:?}", message);
39    ///     // ACK handled automatically
40    /// }
41    ///
42    /// // Stop consuming
43    /// handle.unsubscribe();
44    /// # Ok(())
45    /// # }
46    /// ```
47    pub fn observe_messages(
48        &self,
49        queue_name: impl Into<String>,
50        consumer_id: impl Into<String>,
51        poll_interval: Duration,
52    ) -> (impl Stream<Item = Message> + 'static, SubscriptionHandle) {
53        let queue_name = queue_name.into();
54        let consumer_id = consumer_id.into();
55        let client = self.client.clone();
56
57        let (tx, rx) = mpsc::unbounded_channel();
58        let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel();
59
60        tokio::spawn(async move {
61            loop {
62                tokio::select! {
63                    _ = cancel_rx.recv() => {
64                        tracing::debug!("Message stream cancelled");
65                        break;
66                    }
67                    _ = sleep(poll_interval) => {
68                        match client.queue().consume(&queue_name, &consumer_id).await {
69                            Ok(Some(message)) => {
70                                if tx.send(message).is_err() {
71                                    break; // Receiver dropped
72                                }
73                            }
74                            Ok(None) => {
75                                // No messages available, continue polling
76                            }
77                            Err(e) => {
78                                tracing::error!("Error consuming message: {}", e);
79                                // Continue polling despite errors
80                            }
81                        }
82                    }
83                }
84            }
85        });
86
87        let stream: MessageStream<Message> =
88            Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(rx));
89        let handle = SubscriptionHandle::new(cancel_tx);
90
91        (stream, handle)
92    }
93
94    /// Process messages from a queue with automatic ACK/NACK
95    ///
96    /// Automatically acknowledges successfully processed messages and
97    /// requeues failed ones.
98    ///
99    /// # Example
100    /// ```no_run
101    /// use synap_sdk::{SynapClient, SynapConfig};
102    /// use std::time::Duration;
103    ///
104    /// # #[tokio::main]
105    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
106    /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
107    /// let handle = client.queue().process_messages(
108    ///     "tasks",
109    ///     "worker-1",
110    ///     Duration::from_millis(100),
111    ///     |message| async move {
112    ///         // Process the message
113    ///         println!("Processing: {:?}", message.id);
114    ///         Ok(()) // Success = ACK, Err = NACK
115    ///     }
116    /// );
117    ///
118    /// // Stop processing
119    /// handle.unsubscribe();
120    /// # Ok(())
121    /// # }
122    /// ```
123    pub fn process_messages<F, Fut>(
124        &self,
125        queue_name: impl Into<String>,
126        consumer_id: impl Into<String>,
127        poll_interval: Duration,
128        handler: F,
129    ) -> SubscriptionHandle
130    where
131        F: Fn(Message) -> Fut + Send + 'static,
132        Fut: std::future::Future<Output = Result<()>> + Send,
133    {
134        let queue_name = queue_name.into();
135        let consumer_id = consumer_id.into();
136        let client = self.client.clone();
137
138        let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel();
139
140        tokio::spawn(async move {
141            loop {
142                tokio::select! {
143                    _ = cancel_rx.recv() => {
144                        tracing::debug!("Message processor cancelled");
145                        break;
146                    }
147                    _ = sleep(poll_interval) => {
148                        match client.queue().consume(&queue_name, &consumer_id).await {
149                            Ok(Some(message)) => {
150                                let msg_id = message.id.clone();
151
152                                // Process message
153                                match handler(message).await {
154                                    Ok(()) => {
155                                        // Success: ACK
156                                        if let Err(e) = client.queue().ack(&queue_name, &msg_id).await {
157                                            tracing::error!("Failed to ACK message {}: {}", msg_id, e);
158                                        }
159                                    }
160                                    Err(e) => {
161                                        // Error: NACK (requeue)
162                                        tracing::warn!("Processing failed for {}: {}", msg_id, e);
163                                        if let Err(e) = client.queue().nack(&queue_name, &msg_id).await {
164                                            tracing::error!("Failed to NACK message {}: {}", msg_id, e);
165                                        }
166                                    }
167                                }
168                            }
169                            Ok(None) => {
170                                // No messages, continue polling
171                            }
172                            Err(e) => {
173                                tracing::error!("Error consuming message: {}", e);
174                            }
175                        }
176                    }
177                }
178            }
179        });
180
181        SubscriptionHandle::new(cancel_tx)
182    }
183}