1use crate::client::SynapClient;
4use crate::error::Result;
5use crate::types::{Message, QueueStats};
6use serde_json::json;
7
8pub use crate::reactive::{MessageStream, SubscriptionHandle};
10
11#[derive(Clone)]
15pub struct QueueManager {
16 pub(crate) client: SynapClient,
17}
18
19impl QueueManager {
20 pub(crate) fn new(client: SynapClient) -> Self {
22 Self { client }
23 }
24
25 pub async fn create_queue(
32 &self,
33 queue_name: &str,
34 max_depth: Option<usize>,
35 ack_deadline_secs: Option<u64>,
36 ) -> Result<()> {
37 let mut payload = json!({"queue_name": queue_name});
38 if let Some(depth) = max_depth {
39 payload["max_depth"] = json!(depth);
40 }
41 if let Some(deadline) = ack_deadline_secs {
42 payload["ack_deadline_secs"] = json!(deadline);
43 }
44
45 self.client.send_command("queue.create", payload).await?;
46 Ok(())
47 }
48
49 pub async fn publish(
62 &self,
63 queue_name: &str,
64 payload: &[u8],
65 priority: Option<u8>,
66 max_retries: Option<u32>,
67 ) -> Result<String> {
68 let body = json!({
69 "queue_name": queue_name,
70 "payload": payload,
71 "priority": priority,
72 "max_retries": max_retries,
73 });
74
75 let response = self.client.send_command("queue.publish", body).await?;
76
77 Ok(response["message_id"]
78 .as_str()
79 .unwrap_or_default()
80 .to_string())
81 }
82
83 pub async fn consume(&self, queue_name: &str, consumer_id: &str) -> Result<Option<Message>> {
85 let payload = json!({
86 "queue_name": queue_name,
87 "consumer_id": consumer_id,
88 });
89
90 let response = self.client.send_command("queue.consume", payload).await?;
91
92 if response.is_null() {
93 return Ok(None);
94 }
95
96 Ok(serde_json::from_value(response).ok())
97 }
98
99 pub async fn ack(&self, queue_name: &str, message_id: &str) -> Result<()> {
101 let payload = json!({
102 "queue_name": queue_name,
103 "message_id": message_id
104 });
105
106 self.client.send_command("queue.ack", payload).await?;
107 Ok(())
108 }
109
110 pub async fn nack(&self, queue_name: &str, message_id: &str) -> Result<()> {
112 let payload = json!({
113 "queue_name": queue_name,
114 "message_id": message_id
115 });
116
117 self.client.send_command("queue.nack", payload).await?;
118 Ok(())
119 }
120
121 pub async fn stats(&self, queue_name: &str) -> Result<QueueStats> {
123 let payload = json!({"queue_name": queue_name});
124 let response = self.client.send_command("queue.stats", payload).await?;
125 Ok(serde_json::from_value(response)?)
126 }
127
128 pub async fn list(&self) -> Result<Vec<String>> {
130 let response = self.client.send_command("queue.list", json!({})).await?;
131 Ok(serde_json::from_value(response["queues"].clone())?)
132 }
133
134 pub async fn delete_queue(&self, queue_name: &str) -> Result<()> {
136 let payload = json!({"queue_name": queue_name});
137 self.client.send_command("queue.delete", payload).await?;
138 Ok(())
139 }
140}
141
142#[cfg(test)]
143mod tests {
144 use super::*;
145 use crate::SynapConfig;
146
147 #[test]
148 fn test_queue_manager_creation() {
149 let config = SynapConfig::new("http://localhost:15500");
150 let client = SynapClient::new(config).unwrap();
151 let queue = client.queue();
152
153 assert!(std::mem::size_of_val(&queue) > 0);
154 }
155
156 #[test]
157 fn test_queue_manager_clone() {
158 let config = SynapConfig::new("http://localhost:15500");
159 let client = SynapClient::new(config).unwrap();
160 let queue1 = client.queue();
161 let queue2 = queue1.clone();
162
163 assert!(std::mem::size_of_val(&queue1) > 0);
164 assert!(std::mem::size_of_val(&queue2) > 0);
165 }
166}