// reddb_server/wire/redwire/queue_wait.rs
use crate::serde_json::{self, Value as JsonValue};
18pub use reddb_wire::redwire::queue::{
19 QueueWaitOpenRequest, QueueWaitParseError, WAIT_CANCELLED_CODE, WAIT_EXCEEDS_CAP_CODE,
20 WAIT_FAILED_CODE,
21};
22use reddb_wire::redwire::Frame;
23
24pub fn parse_queue_wait_open(payload: &[u8]) -> Result<QueueWaitOpenRequest, QueueWaitParseError> {
25 reddb_wire::redwire::queue::parse_queue_wait_open(payload)
26}
27
28pub fn build_event_push_frame(
32 correlation_id: u64,
33 stream_id: u16,
34 message: &JsonValue,
35) -> Result<Frame, super::BuildError> {
36 let bytes = message.to_string_compact().into_bytes();
37 reddb_wire::redwire::queue::build_queue_event_push_frame_from_json_bytes(
38 correlation_id,
39 stream_id,
40 &bytes,
41 )
42}
43
44pub fn build_queue_wait_timeout_frame(
55 correlation_id: u64,
56 stream_id: u16,
57 queue: &str,
58 wait_ms: u64,
59) -> Result<Frame, super::BuildError> {
60 reddb_wire::redwire::queue::build_queue_wait_timeout_frame(
61 correlation_id,
62 stream_id,
63 queue,
64 wait_ms,
65 )
66}
67
68pub fn build_queue_wait_error_frame(
72 correlation_id: u64,
73 stream_id: u16,
74 code: &str,
75 message: &str,
76) -> Result<Frame, super::BuildError> {
77 reddb_wire::redwire::queue::build_queue_wait_error_frame(
78 correlation_id,
79 stream_id,
80 code,
81 message,
82 )
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use reddb_wire::redwire::MessageKind;

    /// Wraps a string literal in a JSON string value.
    fn json_str(text: &'static str) -> JsonValue {
        JsonValue::String(text.into())
    }

    /// Only the required fields are supplied; the optional ones take defaults.
    #[test]
    fn parse_minimal_request_applies_defaults() {
        let payload: &[u8] = br#"{"queue":"jobs","consumer":"w1"}"#;
        let parsed = parse_queue_wait_open(payload).unwrap();

        assert_eq!(parsed.queue, "jobs");
        assert_eq!(parsed.consumer, "w1");
        assert_eq!(parsed.group, None);
        assert_eq!(parsed.count, 1);
        assert_eq!(parsed.wait_ms, 0);
    }

    /// Optional fields present in the payload are carried into the request.
    #[test]
    fn parse_full_request() {
        let payload: &[u8] =
            br#"{"queue":"jobs","group":"g","consumer":"w1","count":3,"wait_ms":5000}"#;
        let parsed = parse_queue_wait_open(payload).unwrap();

        assert_eq!(parsed.group.as_deref(), Some("g"));
        assert_eq!(parsed.count, 3);
        assert_eq!(parsed.wait_ms, 5000);
    }

    /// Each required field has its own dedicated parse error.
    #[test]
    fn parse_rejects_missing_queue_and_consumer() {
        let cases: [(&[u8], QueueWaitParseError); 2] = [
            (br#"{"consumer":"w1"}"#, QueueWaitParseError::MissingQueue),
            (br#"{"queue":"jobs"}"#, QueueWaitParseError::MissingConsumer),
        ];

        for (payload, expected) in cases {
            assert_eq!(parse_queue_wait_open(payload).unwrap_err(), expected);
        }
    }

    /// Bytes that are not JSON at all are rejected with `NotJson`.
    #[test]
    fn parse_rejects_non_json() {
        let outcome = parse_queue_wait_open(b"not json");
        assert_eq!(outcome.unwrap_err(), QueueWaitParseError::NotJson);
    }

    /// The push frame carries the ids it was built with.
    #[test]
    fn event_push_frame_echoes_correlation_and_stream() {
        let mut fields = serde_json::Map::new();
        fields.insert(String::from("message_id"), json_str("42"));
        let message = JsonValue::Object(fields);

        let frame = build_event_push_frame(99, 7, &message).unwrap();

        assert_eq!(frame.kind, MessageKind::QueueEventPush);
        assert_eq!(frame.correlation_id, 99);
        assert_eq!(frame.stream_id, 7);
    }

    /// A timeout has its own frame kind, separate from a push or a stream
    /// error, and its body reports the queue and the wait that elapsed.
    #[test]
    fn timeout_frame_is_distinct_kind_echoing_open() {
        let frame = build_queue_wait_timeout_frame(99, 7, "jobs", 5000).unwrap();

        assert_eq!(frame.kind, MessageKind::QueueWaitTimeout);
        assert_ne!(frame.kind, MessageKind::QueueEventPush);
        assert_ne!(frame.kind, MessageKind::StreamError);
        assert_eq!(frame.correlation_id, 99, "echoes the open correlation");
        assert_eq!(frame.stream_id, 7, "echoes the open stream_id");

        let body: JsonValue = serde_json::from_slice(&frame.payload).unwrap();
        assert_eq!(body["outcome"], json_str("timeout"));
        assert_eq!(body["queue"], json_str("jobs"));
        assert_eq!(body["wait_ms"], JsonValue::Number(5000.0));
    }

    /// The three wait error codes must be pairwise distinct so clients can
    /// tell the outcomes apart.
    #[test]
    fn cancellation_and_cap_codes_are_distinct() {
        assert_ne!(WAIT_CANCELLED_CODE, WAIT_FAILED_CODE);
        assert_ne!(WAIT_CANCELLED_CODE, WAIT_EXCEEDS_CAP_CODE);
        assert_ne!(WAIT_EXCEEDS_CAP_CODE, WAIT_FAILED_CODE);
    }
}