reddb_server/wire/redwire/
queue_wait.rs1use crate::serde_json::{self, Value as JsonValue};
18use reddb_wire::redwire::frame::{Frame, MessageKind};
19
20use super::FrameBuilder;
21
22#[derive(Debug, Clone, PartialEq, Eq)]
34pub struct QueueWaitOpenRequest {
35 pub queue: String,
36 pub group: Option<String>,
37 pub consumer: String,
38 pub count: usize,
39 pub wait_ms: u64,
40}
41
42#[derive(Debug, Clone, PartialEq, Eq)]
43pub enum QueueWaitParseError {
44 NotJson,
45 NotObject,
46 MissingQueue,
47 MissingConsumer,
48}
49
50impl QueueWaitParseError {
51 pub fn code(&self) -> &'static str {
52 match self {
53 Self::NotJson | Self::NotObject => "queue_wait_invalid_payload",
54 Self::MissingQueue => "queue_wait_missing_queue",
55 Self::MissingConsumer => "queue_wait_missing_consumer",
56 }
57 }
58 pub fn message(&self) -> &'static str {
59 match self {
60 Self::NotJson => "QueueWaitOpen payload must be JSON",
61 Self::NotObject => "QueueWaitOpen payload must be a JSON object",
62 Self::MissingQueue => "QueueWaitOpen payload missing 'queue' string field",
63 Self::MissingConsumer => "QueueWaitOpen payload missing 'consumer' string field",
64 }
65 }
66}
67
68pub fn parse_queue_wait_open(payload: &[u8]) -> Result<QueueWaitOpenRequest, QueueWaitParseError> {
69 let v: JsonValue = serde_json::from_slice(payload).map_err(|_| QueueWaitParseError::NotJson)?;
70 let obj = v.as_object().ok_or(QueueWaitParseError::NotObject)?;
71 let queue = obj
72 .get("queue")
73 .and_then(|x| x.as_str())
74 .filter(|s| !s.is_empty())
75 .ok_or(QueueWaitParseError::MissingQueue)?
76 .to_string();
77 let consumer = obj
78 .get("consumer")
79 .and_then(|x| x.as_str())
80 .filter(|s| !s.is_empty())
81 .ok_or(QueueWaitParseError::MissingConsumer)?
82 .to_string();
83 let group = obj
84 .get("group")
85 .and_then(|x| x.as_str())
86 .filter(|s| !s.is_empty())
87 .map(|s| s.to_string());
88 let count = obj
91 .get("count")
92 .and_then(|x| x.as_f64())
93 .map(|n| (n as usize).max(1))
94 .unwrap_or(1);
95 let wait_ms = obj
96 .get("wait_ms")
97 .and_then(|x| x.as_f64())
98 .map(|n| n.max(0.0) as u64)
99 .unwrap_or(0);
100 Ok(QueueWaitOpenRequest {
101 queue,
102 group,
103 consumer,
104 count,
105 wait_ms,
106 })
107}
108
109pub fn build_event_push_payload(message: &JsonValue) -> Vec<u8> {
113 serde_json::to_vec(message).unwrap_or_default()
114}
115
116pub fn build_event_push_frame(
120 correlation_id: u64,
121 stream_id: u16,
122 message: &JsonValue,
123) -> Result<Frame, super::BuildError> {
124 FrameBuilder::reply_to(correlation_id)
125 .kind(MessageKind::QueueEventPush)
126 .stream_id(stream_id)
127 .payload(build_event_push_payload(message))
128 .build()
129}
130
131pub fn build_queue_wait_timeout_frame(
142 correlation_id: u64,
143 stream_id: u16,
144 queue: &str,
145 wait_ms: u64,
146) -> Result<Frame, super::BuildError> {
147 let mut obj = serde_json::Map::new();
148 obj.insert(
149 "outcome".to_string(),
150 JsonValue::String("timeout".to_string()),
151 );
152 obj.insert("queue".to_string(), JsonValue::String(queue.to_string()));
153 obj.insert("wait_ms".to_string(), JsonValue::Number(wait_ms as f64));
154 FrameBuilder::reply_to(correlation_id)
155 .kind(MessageKind::QueueWaitTimeout)
156 .stream_id(stream_id)
157 .payload(serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default())
158 .build()
159}
160
161pub const WAIT_CANCELLED_CODE: &str = "queue_wait_cancelled";
166
167pub const WAIT_EXCEEDS_CAP_CODE: &str = "queue_wait_exceeds_cap";
171
172pub const WAIT_FAILED_CODE: &str = "queue_wait_failed";
175
176pub fn build_queue_wait_error_frame(
180 correlation_id: u64,
181 stream_id: u16,
182 code: &str,
183 message: &str,
184) -> Result<Frame, super::BuildError> {
185 let mut obj = serde_json::Map::new();
186 obj.insert("code".to_string(), JsonValue::String(code.to_string()));
187 obj.insert(
188 "message".to_string(),
189 JsonValue::String(message.to_string()),
190 );
191 FrameBuilder::reply_to(correlation_id)
192 .kind(MessageKind::StreamError)
193 .stream_id(stream_id)
194 .payload(serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default())
195 .build()
196}
197
198#[cfg(test)]
199mod tests {
200 use super::*;
201
202 #[test]
203 fn parse_minimal_request_applies_defaults() {
204 let req = parse_queue_wait_open(br#"{"queue":"jobs","consumer":"w1"}"#).unwrap();
205 assert_eq!(req.queue, "jobs");
206 assert_eq!(req.consumer, "w1");
207 assert_eq!(req.group, None);
208 assert_eq!(req.count, 1);
209 assert_eq!(req.wait_ms, 0);
210 }
211
212 #[test]
213 fn parse_full_request() {
214 let req = parse_queue_wait_open(
215 br#"{"queue":"jobs","group":"g","consumer":"w1","count":3,"wait_ms":5000}"#,
216 )
217 .unwrap();
218 assert_eq!(req.group.as_deref(), Some("g"));
219 assert_eq!(req.count, 3);
220 assert_eq!(req.wait_ms, 5000);
221 }
222
223 #[test]
224 fn parse_rejects_missing_queue_and_consumer() {
225 assert_eq!(
226 parse_queue_wait_open(br#"{"consumer":"w1"}"#).unwrap_err(),
227 QueueWaitParseError::MissingQueue
228 );
229 assert_eq!(
230 parse_queue_wait_open(br#"{"queue":"jobs"}"#).unwrap_err(),
231 QueueWaitParseError::MissingConsumer
232 );
233 }
234
235 #[test]
236 fn parse_rejects_non_json() {
237 assert_eq!(
238 parse_queue_wait_open(b"not json").unwrap_err(),
239 QueueWaitParseError::NotJson
240 );
241 }
242
243 #[test]
244 fn event_push_frame_echoes_correlation_and_stream() {
245 let mut obj = serde_json::Map::new();
246 obj.insert("message_id".to_string(), JsonValue::String("42".into()));
247 let frame = build_event_push_frame(99, 7, &JsonValue::Object(obj)).unwrap();
248 assert_eq!(frame.kind, MessageKind::QueueEventPush);
249 assert_eq!(frame.correlation_id, 99);
250 assert_eq!(frame.stream_id, 7);
251 }
252
253 #[test]
254 fn timeout_frame_is_distinct_kind_echoing_open() {
255 let frame = build_queue_wait_timeout_frame(99, 7, "jobs", 5000).unwrap();
256 assert_eq!(frame.kind, MessageKind::QueueWaitTimeout);
260 assert_ne!(frame.kind, MessageKind::QueueEventPush);
261 assert_ne!(frame.kind, MessageKind::StreamError);
262 assert_eq!(frame.correlation_id, 99, "echoes the open correlation");
263 assert_eq!(frame.stream_id, 7, "echoes the open stream_id");
264 let body: JsonValue = serde_json::from_slice(&frame.payload).unwrap();
265 assert_eq!(body["outcome"], JsonValue::String("timeout".into()));
266 assert_eq!(body["queue"], JsonValue::String("jobs".into()));
267 assert_eq!(body["wait_ms"], JsonValue::Number(5000.0));
268 }
269
270 #[test]
271 fn cancellation_and_cap_codes_are_distinct() {
272 assert_ne!(WAIT_CANCELLED_CODE, WAIT_FAILED_CODE);
276 assert_ne!(WAIT_CANCELLED_CODE, WAIT_EXCEEDS_CAP_CODE);
277 assert_ne!(WAIT_EXCEEDS_CAP_CODE, WAIT_FAILED_CODE);
278 }
279}