Skip to main content

reddb_wire/redwire/
queue.rs

1//! RedWire live queue-wait payload contracts.
2
3use serde_json::Value as JsonValue;
4
5use super::{BuildError, Frame, FrameBuilder, MessageKind};
6
/// Decoded form of a `QueueWaitOpen` request payload.
///
/// [`parse_queue_wait_open`] produces this from JSON bytes and
/// [`build_queue_wait_open_payload`] turns it back into JSON bytes.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct QueueWaitOpenRequest {
    /// Value of the `queue` JSON field; the parser rejects a missing or empty value.
    pub queue: String,
    /// Value of the optional `group` JSON field; the parser yields `None` when it is
    /// missing or empty.
    pub group: Option<String>,
    /// Value of the `consumer` JSON field; the parser rejects a missing or empty value.
    pub consumer: String,
    /// Value of the `count` JSON field; the parser defaults it to 1 and never yields 0.
    pub count: usize,
    /// Value of the `wait_ms` JSON field; the parser defaults it to 0.
    pub wait_ms: u64,
}
15
/// Reasons [`parse_queue_wait_open`] can reject a payload.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum QueueWaitParseError {
    /// The payload bytes could not be parsed as JSON.
    NotJson,
    /// The payload is valid JSON but its top-level value is not an object.
    NotObject,
    /// The `queue` field is absent, not a string, or an empty string.
    MissingQueue,
    /// The `consumer` field is absent, not a string, or an empty string.
    MissingConsumer,
}
23
24impl QueueWaitParseError {
25    pub fn code(&self) -> &'static str {
26        match self {
27            Self::NotJson | Self::NotObject => "queue_wait_invalid_payload",
28            Self::MissingQueue => "queue_wait_missing_queue",
29            Self::MissingConsumer => "queue_wait_missing_consumer",
30        }
31    }
32
33    pub fn message(&self) -> &'static str {
34        match self {
35            Self::NotJson => "QueueWaitOpen payload must be JSON",
36            Self::NotObject => "QueueWaitOpen payload must be a JSON object",
37            Self::MissingQueue => "QueueWaitOpen payload missing 'queue' string field",
38            Self::MissingConsumer => "QueueWaitOpen payload missing 'consumer' string field",
39        }
40    }
41}
42
43pub fn parse_queue_wait_open(payload: &[u8]) -> Result<QueueWaitOpenRequest, QueueWaitParseError> {
44    let v: JsonValue = serde_json::from_slice(payload).map_err(|_| QueueWaitParseError::NotJson)?;
45    let obj = v.as_object().ok_or(QueueWaitParseError::NotObject)?;
46    let queue = obj
47        .get("queue")
48        .and_then(|x| x.as_str())
49        .filter(|s| !s.is_empty())
50        .ok_or(QueueWaitParseError::MissingQueue)?
51        .to_string();
52    let consumer = obj
53        .get("consumer")
54        .and_then(|x| x.as_str())
55        .filter(|s| !s.is_empty())
56        .ok_or(QueueWaitParseError::MissingConsumer)?
57        .to_string();
58    let group = obj
59        .get("group")
60        .and_then(|x| x.as_str())
61        .filter(|s| !s.is_empty())
62        .map(|s| s.to_string());
63    let count = obj
64        .get("count")
65        .and_then(|x| x.as_u64())
66        .map(|n| (n as usize).max(1))
67        .unwrap_or(1);
68    let wait_ms = obj.get("wait_ms").and_then(|x| x.as_u64()).unwrap_or(0);
69    Ok(QueueWaitOpenRequest {
70        queue,
71        group,
72        consumer,
73        count,
74        wait_ms,
75    })
76}
77
78pub fn build_queue_wait_open_payload(request: &QueueWaitOpenRequest) -> Vec<u8> {
79    let mut obj = serde_json::Map::new();
80    obj.insert(
81        "queue".to_string(),
82        JsonValue::String(request.queue.clone()),
83    );
84    if let Some(group) = request.group.as_ref() {
85        obj.insert("group".to_string(), JsonValue::String(group.clone()));
86    }
87    obj.insert(
88        "consumer".to_string(),
89        JsonValue::String(request.consumer.clone()),
90    );
91    obj.insert(
92        "count".to_string(),
93        JsonValue::Number((request.count as u64).into()),
94    );
95    obj.insert(
96        "wait_ms".to_string(),
97        JsonValue::Number(request.wait_ms.into()),
98    );
99    serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
100}
101
102pub fn build_queue_wait_open_frame(
103    correlation_id: u64,
104    stream_id: u16,
105    request: &QueueWaitOpenRequest,
106) -> Result<Frame, BuildError> {
107    FrameBuilder::request(correlation_id)
108        .kind(MessageKind::QueueWaitOpen)
109        .stream_id(stream_id)
110        .payload(build_queue_wait_open_payload(request))
111        .build()
112}
113
114pub fn build_event_push_payload(message: &JsonValue) -> Vec<u8> {
115    serde_json::to_vec(message).unwrap_or_default()
116}
117
118pub fn build_event_push_payload_from_json_bytes(message: &[u8]) -> Vec<u8> {
119    let value = serde_json::from_slice(message).unwrap_or(JsonValue::Null);
120    build_event_push_payload(&value)
121}
122
123pub fn build_queue_event_push_frame_from_json_bytes(
124    correlation_id: u64,
125    stream_id: u16,
126    message: &[u8],
127) -> Result<Frame, BuildError> {
128    FrameBuilder::reply_to(correlation_id)
129        .kind(MessageKind::QueueEventPush)
130        .stream_id(stream_id)
131        .payload(build_event_push_payload_from_json_bytes(message))
132        .build()
133}
134
135pub fn build_queue_wait_timeout_payload(queue: &str, wait_ms: u64) -> Vec<u8> {
136    let mut obj = serde_json::Map::new();
137    obj.insert(
138        "outcome".to_string(),
139        JsonValue::String("timeout".to_string()),
140    );
141    obj.insert("queue".to_string(), JsonValue::String(queue.to_string()));
142    obj.insert("wait_ms".to_string(), JsonValue::Number(wait_ms.into()));
143    serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
144}
145
146pub fn build_queue_wait_timeout_frame(
147    correlation_id: u64,
148    stream_id: u16,
149    queue: &str,
150    wait_ms: u64,
151) -> Result<Frame, BuildError> {
152    FrameBuilder::reply_to(correlation_id)
153        .kind(MessageKind::QueueWaitTimeout)
154        .stream_id(stream_id)
155        .payload(build_queue_wait_timeout_payload(queue, wait_ms))
156        .build()
157}
158
159pub fn build_queue_wait_error_payload(code: &str, message: &str) -> Vec<u8> {
160    let mut obj = serde_json::Map::new();
161    obj.insert("code".to_string(), JsonValue::String(code.to_string()));
162    obj.insert(
163        "message".to_string(),
164        JsonValue::String(message.to_string()),
165    );
166    serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default()
167}
168
169pub fn build_queue_wait_error_frame(
170    correlation_id: u64,
171    stream_id: u16,
172    code: &str,
173    message: &str,
174) -> Result<Frame, BuildError> {
175    FrameBuilder::reply_to(correlation_id)
176        .kind(MessageKind::StreamError)
177        .stream_id(stream_id)
178        .payload(build_queue_wait_error_payload(code, message))
179        .build()
180}
181
/// Error code string for a cancelled queue wait; this module's tests pass it as the
/// `code` argument of [`build_queue_wait_error_frame`].
pub const WAIT_CANCELLED_CODE: &str = "queue_wait_cancelled";

/// Error code string; presumably reported when a requested wait exceeds a configured
/// cap (not used within this module; confirm against callers).
pub const WAIT_EXCEEDS_CAP_CODE: &str = "queue_wait_exceeds_cap";

/// Error code string; presumably the generic queue-wait failure code (not used within
/// this module; confirm against callers).
pub const WAIT_FAILED_CODE: &str = "queue_wait_failed";
185
#[cfg(test)]
mod tests {
    use super::*;

    /// A payload with only the required fields gets `group = None`, `count = 1` and
    /// `wait_ms = 0`.
    #[test]
    fn queue_wait_open_applies_defaults() {
        let parsed = parse_queue_wait_open(br#"{"queue":"jobs","consumer":"w1"}"#).unwrap();
        let expected = QueueWaitOpenRequest {
            queue: "jobs".to_string(),
            group: None,
            consumer: "w1".to_string(),
            count: 1,
            wait_ms: 0,
        };
        assert_eq!(parsed, expected);
    }

    /// Leaving out `queue` or `consumer` yields the matching parse error.
    #[test]
    fn queue_wait_open_rejects_missing_required_fields() {
        let without_queue = parse_queue_wait_open(br#"{"consumer":"w1"}"#);
        assert_eq!(without_queue, Err(QueueWaitParseError::MissingQueue));

        let without_consumer = parse_queue_wait_open(br#"{"queue":"jobs"}"#);
        assert_eq!(without_consumer, Err(QueueWaitParseError::MissingConsumer));
    }

    /// A request survives being built into a frame and parsed back from its payload.
    #[test]
    fn queue_wait_open_builder_round_trips_request() {
        let original = QueueWaitOpenRequest {
            queue: "jobs".to_string(),
            group: Some("workers".to_string()),
            consumer: "w1".to_string(),
            count: 2,
            wait_ms: 5000,
        };

        let frame = build_queue_wait_open_frame(7, 3, &original).unwrap();
        assert_eq!(frame.kind, MessageKind::QueueWaitOpen);
        assert_eq!(frame.correlation_id, 7);
        assert_eq!(frame.stream_id, 3);

        let decoded = parse_queue_wait_open(&frame.payload).unwrap();
        assert_eq!(decoded, original);
    }

    /// The timeout payload carries `outcome = "timeout"` plus the queue and wait duration.
    #[test]
    fn timeout_payload_has_distinct_outcome() {
        let payload = build_queue_wait_timeout_payload("jobs", 5000);
        let decoded: JsonValue = serde_json::from_slice(&payload).unwrap();
        assert_eq!(decoded["outcome"], "timeout");
        assert_eq!(decoded["queue"], "jobs");
        assert_eq!(decoded["wait_ms"], 5000);
    }

    /// Event, timeout and error frames all carry the correlation and stream ids they were
    /// built with.
    #[test]
    fn queue_wait_frames_echo_open_stream() {
        fn assert_echoes(frame: &Frame, expected_kind: MessageKind) {
            assert_eq!(frame.kind, expected_kind);
            assert_eq!(frame.correlation_id, 99);
            assert_eq!(frame.stream_id, 7);
        }

        let event =
            build_queue_event_push_frame_from_json_bytes(99, 7, br#"{"message_id":"42"}"#).unwrap();
        assert_echoes(&event, MessageKind::QueueEventPush);

        let timeout = build_queue_wait_timeout_frame(99, 7, "jobs", 5000).unwrap();
        assert_echoes(&timeout, MessageKind::QueueWaitTimeout);

        let error = build_queue_wait_error_frame(99, 7, WAIT_CANCELLED_CODE, "cancelled").unwrap();
        assert_echoes(&error, MessageKind::StreamError);
    }
}