Skip to main content

reddb_server/wire/redwire/
queue_wait.rs

1//! RedWire live queue-wait dispatch (issue #917, PRD #915).
2//!
3//! Carries the wire-side envelopes for the live queue-wait happy path:
4//!   - `QueueWaitOpen`  (client→server) — open a wait on a queue. The
5//!     awaiting session parks on the queue-wait registry's async wake
6//!     head (no blocking OS thread) and re-probes the normal delivery
7//!     path on each wake.
8//!   - `QueueEventPush` (server→client) — the delivered message,
9//!     pushed the instant one becomes deliverable on that queue.
10//!
11//! Distinct from the `OpenStream`/`StreamChunk` output-stream family in
12//! [`super::output_stream`], which stays query-result pull. These
13//! envelopes carry queue delivery and reuse the frame's `stream_id` for
14//! multiplexing so a wait can coexist with other streams on the same
15//! connection.
16
17use crate::serde_json::{self, Value as JsonValue};
18pub use reddb_wire::redwire::queue::{
19    QueueWaitOpenRequest, QueueWaitParseError, WAIT_CANCELLED_CODE, WAIT_EXCEEDS_CAP_CODE,
20    WAIT_FAILED_CODE,
21};
22use reddb_wire::redwire::Frame;
23
24pub fn parse_queue_wait_open(payload: &[u8]) -> Result<QueueWaitOpenRequest, QueueWaitParseError> {
25    reddb_wire::redwire::queue::parse_queue_wait_open(payload)
26}
27
28/// Build a `QueueEventPush` frame echoing the open request's
29/// `correlation_id` and `stream_id` so the client pairs the push with
30/// the wait it opened.
31pub fn build_event_push_frame(
32    correlation_id: u64,
33    stream_id: u16,
34    message: &JsonValue,
35) -> Result<Frame, super::BuildError> {
36    let bytes = message.to_string_compact().into_bytes();
37    reddb_wire::redwire::queue::build_queue_event_push_frame_from_json_bytes(
38        correlation_id,
39        stream_id,
40        &bytes,
41    )
42}
43
44/// Build a `QueueWaitTimeout` frame for an elapsed wait (issue #919).
45///
46/// A distinct frame kind — not a `QueueEventPush` (which always carries
47/// a delivered message) and not a `StreamError` (reserved for parse
48/// failures, cancellation, and runtime errors) — so the client can tell
49/// "your wait budget elapsed with nothing deliverable" apart from a
50/// delivery and apart from a cancellation purely from the frame kind.
51/// Echoes the open's `correlation_id` + `stream_id` so the client pairs
52/// the timeout with the wait it opened; the small JSON body restates the
53/// queue and the budget that elapsed for client-side logging.
54pub fn build_queue_wait_timeout_frame(
55    correlation_id: u64,
56    stream_id: u16,
57    queue: &str,
58    wait_ms: u64,
59) -> Result<Frame, super::BuildError> {
60    reddb_wire::redwire::queue::build_queue_wait_timeout_frame(
61        correlation_id,
62        stream_id,
63        queue,
64        wait_ms,
65    )
66}
67
68/// Build a `StreamError` frame carrying a queue-wait parse/validation
69/// failure for a specific `stream_id`. Non-fatal at the connection
70/// level — the session keeps reading other frames.
71pub fn build_queue_wait_error_frame(
72    correlation_id: u64,
73    stream_id: u16,
74    code: &str,
75    message: &str,
76) -> Result<Frame, super::BuildError> {
77    reddb_wire::redwire::queue::build_queue_wait_error_frame(
78        correlation_id,
79        stream_id,
80        code,
81        message,
82    )
83}
84
#[cfg(test)]
mod tests {
    use super::*;
    use reddb_wire::redwire::MessageKind;

    /// Only the mandatory fields are supplied; everything else must
    /// come back with its default value.
    #[test]
    fn parse_minimal_request_applies_defaults() {
        let payload: &[u8] = br#"{"queue":"jobs","consumer":"w1"}"#;
        let parsed = parse_queue_wait_open(payload).unwrap();

        assert_eq!(parsed.queue, "jobs");
        assert_eq!(parsed.consumer, "w1");
        assert_eq!(parsed.group, None);
        assert_eq!(parsed.count, 1);
        assert_eq!(parsed.wait_ms, 0);
    }

    /// Every optional field is supplied and must round-trip.
    #[test]
    fn parse_full_request() {
        let payload: &[u8] =
            br#"{"queue":"jobs","group":"g","consumer":"w1","count":3,"wait_ms":5000}"#;
        let parsed = parse_queue_wait_open(payload).unwrap();

        assert_eq!(parsed.group.as_deref(), Some("g"));
        assert_eq!(parsed.count, 3);
        assert_eq!(parsed.wait_ms, 5000);
    }

    /// Each required field, when absent, maps to its own error variant.
    #[test]
    fn parse_rejects_missing_queue_and_consumer() {
        let without_queue = parse_queue_wait_open(br#"{"consumer":"w1"}"#).unwrap_err();
        assert_eq!(without_queue, QueueWaitParseError::MissingQueue);

        let without_consumer = parse_queue_wait_open(br#"{"queue":"jobs"}"#).unwrap_err();
        assert_eq!(without_consumer, QueueWaitParseError::MissingConsumer);
    }

    /// A payload that is not JSON at all is reported as `NotJson`.
    #[test]
    fn parse_rejects_non_json() {
        let failure = parse_queue_wait_open(b"not json").unwrap_err();
        assert_eq!(failure, QueueWaitParseError::NotJson);
    }

    /// The push frame carries the ids of the open request it answers.
    #[test]
    fn event_push_frame_echoes_correlation_and_stream() {
        let mut fields = serde_json::Map::new();
        fields.insert("message_id".to_string(), JsonValue::String("42".into()));
        let message = JsonValue::Object(fields);

        let pushed = build_event_push_frame(99, 7, &message).unwrap();

        assert_eq!(pushed.kind, MessageKind::QueueEventPush);
        assert_eq!(pushed.correlation_id, 99);
        assert_eq!(pushed.stream_id, 7);
    }

    /// The timeout frame has its own kind, echoes the open's ids, and
    /// restates the queue and elapsed budget in its JSON body.
    #[test]
    fn timeout_frame_is_distinct_kind_echoing_open() {
        let timeout = build_queue_wait_timeout_frame(99, 7, "jobs", 5000).unwrap();

        // A kind of its own: neither a delivery (QueueEventPush) nor a
        // cancellation / failure (StreamError), which keeps the outcome
        // unambiguous on the wire (AC #1, AC #2).
        assert_eq!(timeout.kind, MessageKind::QueueWaitTimeout);
        assert_ne!(timeout.kind, MessageKind::QueueEventPush);
        assert_ne!(timeout.kind, MessageKind::StreamError);

        assert_eq!(timeout.correlation_id, 99, "echoes the open correlation");
        assert_eq!(timeout.stream_id, 7, "echoes the open stream_id");

        let body: JsonValue = serde_json::from_slice(&timeout.payload).unwrap();
        assert_eq!(body["outcome"], JsonValue::String("timeout".into()));
        assert_eq!(body["queue"], JsonValue::String("jobs".into()));
        assert_eq!(body["wait_ms"], JsonValue::Number(5000.0));
    }

    /// The three StreamError-bearing, non-delivery outcomes must never
    /// alias one another (AC #2 distinguishability extends to the error
    /// codes the client switches on).
    #[test]
    fn cancellation_and_cap_codes_are_distinct() {
        let pairs = [
            (WAIT_CANCELLED_CODE, WAIT_FAILED_CODE),
            (WAIT_CANCELLED_CODE, WAIT_EXCEEDS_CAP_CODE),
            (WAIT_EXCEEDS_CAP_CODE, WAIT_FAILED_CODE),
        ];
        for (left, right) in pairs {
            assert_ne!(left, right);
        }
    }
}