1use crate::client::SynapClient;
4use crate::error::Result;
5use crate::types::{Message, QueueStats};
6use serde_json::json;
7
8pub use crate::reactive::{MessageStream, SubscriptionHandle};
10
11#[derive(Clone)]
15pub struct QueueManager {
16 pub(crate) client: SynapClient,
17}
18
19impl QueueManager {
20 pub(crate) fn new(client: SynapClient) -> Self {
22 Self { client }
23 }
24
25 pub async fn create_queue(
32 &self,
33 queue_name: &str,
34 max_depth: Option<usize>,
35 ack_deadline_secs: Option<u64>,
36 ) -> Result<()> {
37 let mut config = serde_json::Map::new();
38 if let Some(depth) = max_depth {
39 config.insert("max_depth".to_string(), json!(depth));
40 }
41 if let Some(deadline) = ack_deadline_secs {
42 config.insert("ack_deadline_secs".to_string(), json!(deadline));
43 }
44 let mut payload = json!({"name": queue_name});
45 if !config.is_empty() {
46 payload["config"] = serde_json::Value::Object(config);
47 }
48
49 self.client.send_command("queue.create", payload).await?;
50 Ok(())
51 }
52
53 pub async fn publish(
66 &self,
67 queue_name: &str,
68 payload: &[u8],
69 priority: Option<u8>,
70 max_retries: Option<u32>,
71 ) -> Result<String> {
72 let body = json!({
73 "queue": queue_name,
74 "payload": payload,
75 "priority": priority,
76 "max_retries": max_retries,
77 });
78
79 let response = self.client.send_command("queue.publish", body).await?;
80
81 Ok(response["message_id"]
82 .as_str()
83 .unwrap_or_default()
84 .to_string())
85 }
86
87 pub async fn consume(&self, queue_name: &str, consumer_id: &str) -> Result<Option<Message>> {
89 let payload = json!({
90 "queue": queue_name,
91 "consumer_id": consumer_id,
92 });
93
94 let response = self.client.send_command("queue.consume", payload).await?;
95
96 let msg_val = if let Some(inner) = response.get("message") {
99 inner.clone()
100 } else {
101 response
102 };
103
104 if msg_val.is_null() {
105 return Ok(None);
106 }
107
108 Ok(serde_json::from_value(msg_val).ok())
109 }
110
111 pub async fn ack(&self, queue_name: &str, message_id: &str) -> Result<()> {
113 let payload = json!({
114 "queue": queue_name,
115 "message_id": message_id
116 });
117
118 self.client.send_command("queue.ack", payload).await?;
119 Ok(())
120 }
121
122 pub async fn nack(&self, queue_name: &str, message_id: &str) -> Result<()> {
124 let payload = json!({
125 "queue": queue_name,
126 "message_id": message_id
127 });
128
129 self.client.send_command("queue.nack", payload).await?;
130 Ok(())
131 }
132
133 pub async fn stats(&self, queue_name: &str) -> Result<QueueStats> {
135 let payload = json!({"queue": queue_name});
136 let response = self.client.send_command("queue.stats", payload).await?;
137 Ok(serde_json::from_value(response)?)
138 }
139
140 pub async fn list(&self) -> Result<Vec<String>> {
142 let response = self.client.send_command("queue.list", json!({})).await?;
143 Ok(serde_json::from_value(response["queues"].clone())?)
144 }
145
146 pub async fn delete_queue(&self, queue_name: &str) -> Result<()> {
148 let payload = json!({"queue": queue_name});
149 self.client.send_command("queue.delete", payload).await?;
150 Ok(())
151 }
152}
153
154#[cfg(test)]
155mod tests {
156 use super::*;
157 use crate::SynapConfig;
158
159 #[test]
160 fn test_queue_manager_creation() {
161 let config = SynapConfig::new("http://localhost:15500");
162 let client = SynapClient::new(config).unwrap();
163 let queue = client.queue();
164
165 assert!(std::mem::size_of_val(&queue) > 0);
166 }
167
168 #[test]
169 fn test_queue_manager_clone() {
170 let config = SynapConfig::new("http://localhost:15500");
171 let client = SynapClient::new(config).unwrap();
172 let queue1 = client.queue();
173 let queue2 = queue1.clone();
174
175 assert!(std::mem::size_of_val(&queue1) > 0);
176 assert!(std::mem::size_of_val(&queue2) > 0);
177 }
178}