Skip to main content

synap_sdk/
queue.rs

1//! Queue operations
2
3use crate::client::SynapClient;
4use crate::error::Result;
5use crate::types::{Message, QueueStats};
6use serde_json::json;
7
8// Re-export for convenience
9pub use crate::reactive::{MessageStream, SubscriptionHandle};
10
11/// Queue Manager interface
12///
13/// Uses StreamableHTTP protocol for all operations.
14#[derive(Clone)]
15pub struct QueueManager {
16    pub(crate) client: SynapClient,
17}
18
19impl QueueManager {
20    /// Create a new Queue manager interface
21    pub(crate) fn new(client: SynapClient) -> Self {
22        Self { client }
23    }
24
25    /// Create a new queue
26    ///
27    /// # Arguments
28    /// * `queue_name` - Name of the queue
29    /// * `max_depth` - Maximum queue depth (optional)
30    /// * `ack_deadline_secs` - ACK deadline in seconds (optional)
31    pub async fn create_queue(
32        &self,
33        queue_name: &str,
34        max_depth: Option<usize>,
35        ack_deadline_secs: Option<u64>,
36    ) -> Result<()> {
37        let mut config = serde_json::Map::new();
38        if let Some(depth) = max_depth {
39            config.insert("max_depth".to_string(), json!(depth));
40        }
41        if let Some(deadline) = ack_deadline_secs {
42            config.insert("ack_deadline_secs".to_string(), json!(deadline));
43        }
44        let mut payload = json!({"name": queue_name});
45        if !config.is_empty() {
46            payload["config"] = serde_json::Value::Object(config);
47        }
48
49        self.client.send_command("queue.create", payload).await?;
50        Ok(())
51    }
52
53    /// Publish a message to a queue
54    ///
55    /// # Example
56    /// ```no_run
57    /// # use synap_sdk::{SynapClient, SynapConfig};
58    /// # #[tokio::main]
59    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
60    /// # let client = SynapClient::new(SynapConfig::new("http://localhost:15500"))?;
61    /// client.queue().publish("tasks", b"process-video", Some(9), None).await?;
62    /// # Ok(())
63    /// # }
64    /// ```
65    pub async fn publish(
66        &self,
67        queue_name: &str,
68        payload: &[u8],
69        priority: Option<u8>,
70        max_retries: Option<u32>,
71    ) -> Result<String> {
72        let body = json!({
73            "queue": queue_name,
74            "payload": payload,
75            "priority": priority,
76            "max_retries": max_retries,
77        });
78
79        let response = self.client.send_command("queue.publish", body).await?;
80
81        Ok(response["message_id"]
82            .as_str()
83            .unwrap_or_default()
84            .to_string())
85    }
86
87    /// Consume a message from a queue
88    pub async fn consume(&self, queue_name: &str, consumer_id: &str) -> Result<Option<Message>> {
89        let payload = json!({
90            "queue": queue_name,
91            "consumer_id": consumer_id,
92        });
93
94        let response = self.client.send_command("queue.consume", payload).await?;
95
96        // HTTP transport wraps the message: {"message": {...}} or {"message": null}
97        // Native transports (SynapRPC, RESP3) return the message directly or Null.
98        let msg_val = if let Some(inner) = response.get("message") {
99            inner.clone()
100        } else {
101            response
102        };
103
104        if msg_val.is_null() {
105            return Ok(None);
106        }
107
108        Ok(serde_json::from_value(msg_val).ok())
109    }
110
111    /// Acknowledge a message
112    pub async fn ack(&self, queue_name: &str, message_id: &str) -> Result<()> {
113        let payload = json!({
114            "queue": queue_name,
115            "message_id": message_id
116        });
117
118        self.client.send_command("queue.ack", payload).await?;
119        Ok(())
120    }
121
122    /// Negative acknowledge a message (requeue)
123    pub async fn nack(&self, queue_name: &str, message_id: &str) -> Result<()> {
124        let payload = json!({
125            "queue": queue_name,
126            "message_id": message_id
127        });
128
129        self.client.send_command("queue.nack", payload).await?;
130        Ok(())
131    }
132
133    /// Get queue statistics
134    pub async fn stats(&self, queue_name: &str) -> Result<QueueStats> {
135        let payload = json!({"queue": queue_name});
136        let response = self.client.send_command("queue.stats", payload).await?;
137        Ok(serde_json::from_value(response)?)
138    }
139
140    /// List all queues
141    pub async fn list(&self) -> Result<Vec<String>> {
142        let response = self.client.send_command("queue.list", json!({})).await?;
143        Ok(serde_json::from_value(response["queues"].clone())?)
144    }
145
146    /// Delete a queue
147    pub async fn delete_queue(&self, queue_name: &str) -> Result<()> {
148        let payload = json!({"queue": queue_name});
149        self.client.send_command("queue.delete", payload).await?;
150        Ok(())
151    }
152}
153
154#[cfg(test)]
155mod tests {
156    use super::*;
157    use crate::SynapConfig;
158
159    #[test]
160    fn test_queue_manager_creation() {
161        let config = SynapConfig::new("http://localhost:15500");
162        let client = SynapClient::new(config).unwrap();
163        let queue = client.queue();
164
165        assert!(std::mem::size_of_val(&queue) > 0);
166    }
167
168    #[test]
169    fn test_queue_manager_clone() {
170        let config = SynapConfig::new("http://localhost:15500");
171        let client = SynapClient::new(config).unwrap();
172        let queue1 = client.queue();
173        let queue2 = queue1.clone();
174
175        assert!(std::mem::size_of_val(&queue1) > 0);
176        assert!(std::mem::size_of_val(&queue2) > 0);
177    }
178}