synap_sdk/
queue.rs

1//! Queue operations
2
3use crate::client::SynapClient;
4use crate::error::Result;
5use crate::types::{Message, QueueStats};
6use serde_json::json;
7
8// Re-export for convenience
9pub use crate::reactive::{MessageStream, SubscriptionHandle};
10
11/// Queue Manager interface
12///
13/// Uses StreamableHTTP protocol for all operations.
14#[derive(Clone)]
15pub struct QueueManager {
16    pub(crate) client: SynapClient,
17}
18
19impl QueueManager {
20    /// Create a new Queue manager interface
21    pub(crate) fn new(client: SynapClient) -> Self {
22        Self { client }
23    }
24
25    /// Create a new queue
26    ///
27    /// # Arguments
28    /// * `queue_name` - Name of the queue
29    /// * `max_depth` - Maximum queue depth (optional)
30    /// * `ack_deadline_secs` - ACK deadline in seconds (optional)
31    pub async fn create_queue(
32        &self,
33        queue_name: &str,
34        max_depth: Option<usize>,
35        ack_deadline_secs: Option<u64>,
36    ) -> Result<()> {
37        let mut payload = json!({"queue_name": queue_name});
38        if let Some(depth) = max_depth {
39            payload["max_depth"] = json!(depth);
40        }
41        if let Some(deadline) = ack_deadline_secs {
42            payload["ack_deadline_secs"] = json!(deadline);
43        }
44
45        self.client.send_command("queue.create", payload).await?;
46        Ok(())
47    }
48
49    /// Publish a message to a queue
50    ///
51    /// # Example
52    /// ```no_run
53    /// # use synap_sdk::{SynapClient, SynapConfig};
54    /// # #[tokio::main]
55    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
56    /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
57    /// client.queue().publish("tasks", b"process-video", Some(9), None).await?;
58    /// # Ok(())
59    /// # }
60    /// ```
61    pub async fn publish(
62        &self,
63        queue_name: &str,
64        payload: &[u8],
65        priority: Option<u8>,
66        max_retries: Option<u32>,
67    ) -> Result<String> {
68        let body = json!({
69            "queue_name": queue_name,
70            "payload": payload,
71            "priority": priority,
72            "max_retries": max_retries,
73        });
74
75        let response = self.client.send_command("queue.publish", body).await?;
76
77        Ok(response["message_id"]
78            .as_str()
79            .unwrap_or_default()
80            .to_string())
81    }
82
83    /// Consume a message from a queue
84    pub async fn consume(&self, queue_name: &str, consumer_id: &str) -> Result<Option<Message>> {
85        let payload = json!({
86            "queue_name": queue_name,
87            "consumer_id": consumer_id,
88        });
89
90        let response = self.client.send_command("queue.consume", payload).await?;
91
92        if response.is_null() {
93            return Ok(None);
94        }
95
96        Ok(serde_json::from_value(response).ok())
97    }
98
99    /// Acknowledge a message
100    pub async fn ack(&self, queue_name: &str, message_id: &str) -> Result<()> {
101        let payload = json!({
102            "queue_name": queue_name,
103            "message_id": message_id
104        });
105
106        self.client.send_command("queue.ack", payload).await?;
107        Ok(())
108    }
109
110    /// Negative acknowledge a message (requeue)
111    pub async fn nack(&self, queue_name: &str, message_id: &str) -> Result<()> {
112        let payload = json!({
113            "queue_name": queue_name,
114            "message_id": message_id
115        });
116
117        self.client.send_command("queue.nack", payload).await?;
118        Ok(())
119    }
120
121    /// Get queue statistics
122    pub async fn stats(&self, queue_name: &str) -> Result<QueueStats> {
123        let payload = json!({"queue_name": queue_name});
124        let response = self.client.send_command("queue.stats", payload).await?;
125        Ok(serde_json::from_value(response)?)
126    }
127
128    /// List all queues
129    pub async fn list(&self) -> Result<Vec<String>> {
130        let response = self.client.send_command("queue.list", json!({})).await?;
131        Ok(serde_json::from_value(response["queues"].clone())?)
132    }
133
134    /// Delete a queue
135    pub async fn delete_queue(&self, queue_name: &str) -> Result<()> {
136        let payload = json!({"queue_name": queue_name});
137        self.client.send_command("queue.delete", payload).await?;
138        Ok(())
139    }
140}
141
142#[cfg(test)]
143mod tests {
144    use super::*;
145    use crate::SynapConfig;
146
147    #[test]
148    fn test_queue_manager_creation() {
149        let config = SynapConfig::new("http://localhost:15500");
150        let client = SynapClient::new(config).unwrap();
151        let queue = client.queue();
152
153        assert!(std::mem::size_of_val(&queue) > 0);
154    }
155
156    #[test]
157    fn test_queue_manager_clone() {
158        let config = SynapConfig::new("http://localhost:15500");
159        let client = SynapClient::new(config).unwrap();
160        let queue1 = client.queue();
161        let queue2 = queue1.clone();
162
163        assert!(std::mem::size_of_val(&queue1) > 0);
164        assert!(std::mem::size_of_val(&queue2) > 0);
165    }
166}