Skip to main content

reddb_server/wire/redwire/
queue_wait.rs

1//! RedWire live queue-wait dispatch (issue #917, PRD #915).
2//!
3//! Carries the wire-side envelopes for the live queue-wait happy path:
4//!   - `QueueWaitOpen`  (client→server) — open a wait on a queue. The
5//!     awaiting session parks on the queue-wait registry's async wake
6//!     head (no blocking OS thread) and re-probes the normal delivery
7//!     path on each wake.
8//!   - `QueueEventPush` (server→client) — the delivered message,
9//!     pushed the instant one becomes deliverable on that queue.
10//!
11//! Distinct from the `OpenStream`/`StreamChunk` output-stream family in
12//! [`super::output_stream`], which stays query-result pull. These
13//! envelopes carry queue delivery and reuse the frame's `stream_id` for
14//! multiplexing so a wait can coexist with other streams on the same
15//! connection.
16
17use crate::serde_json::{self, Value as JsonValue};
18use reddb_wire::redwire::frame::{Frame, MessageKind};
19
20use super::FrameBuilder;
21
/// Decoded body of a `QueueWaitOpen` frame.
///
/// Wire shape:
///
/// ```json
/// { "queue": "jobs", "group": "g?", "consumer": "w1",
///   "count": 1, "wait_ms": 5000 }
/// ```
///
/// Only `queue` and `consumer` are mandatory. `group` may be left out,
/// in which case the runtime resolves the default work / fanout group
/// (the same resolution the SQL `QUEUE READ` path performs). An omitted
/// `count` becomes 1 and an omitted `wait_ms` becomes 0, i.e. a single
/// re-probe of current state.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct QueueWaitOpenRequest {
    /// Name of the queue to wait on.
    pub queue: String,
    /// Consumer group, or `None` when the payload did not name one.
    pub group: Option<String>,
    /// Identifier of the consumer opening the wait.
    pub consumer: String,
    /// Number of messages requested.
    pub count: usize,
    /// Wait budget in milliseconds.
    pub wait_ms: u64,
}
41
/// Reasons a `QueueWaitOpen` payload can be rejected while parsing.
///
/// Each variant maps to a stable machine-readable [`code`](Self::code)
/// and a human-readable [`message`](Self::message). The type also
/// implements [`std::fmt::Display`] (rendering the message) and
/// [`std::error::Error`], so it composes with `?` and boxed errors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueueWaitParseError {
    /// The payload bytes did not parse as JSON.
    NotJson,
    /// The payload is valid JSON but its top-level value is not an object.
    NotObject,
    /// The `queue` field is absent, not a string, or empty.
    MissingQueue,
    /// The `consumer` field is absent, not a string, or empty.
    MissingConsumer,
}

impl QueueWaitParseError {
    /// Stable error code for the wire. Both malformed-payload variants
    /// (`NotJson`, `NotObject`) share `queue_wait_invalid_payload`.
    pub fn code(&self) -> &'static str {
        match self {
            Self::NotJson | Self::NotObject => "queue_wait_invalid_payload",
            Self::MissingQueue => "queue_wait_missing_queue",
            Self::MissingConsumer => "queue_wait_missing_consumer",
        }
    }

    /// Human-readable description of the failure.
    pub fn message(&self) -> &'static str {
        match self {
            Self::NotJson => "QueueWaitOpen payload must be JSON",
            Self::NotObject => "QueueWaitOpen payload must be a JSON object",
            Self::MissingQueue => "QueueWaitOpen payload missing 'queue' string field",
            Self::MissingConsumer => "QueueWaitOpen payload missing 'consumer' string field",
        }
    }
}

impl std::fmt::Display for QueueWaitParseError {
    /// Renders exactly the text returned by [`QueueWaitParseError::message`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.message())
    }
}

impl std::error::Error for QueueWaitParseError {}
67
68pub fn parse_queue_wait_open(payload: &[u8]) -> Result<QueueWaitOpenRequest, QueueWaitParseError> {
69    let v: JsonValue = serde_json::from_slice(payload).map_err(|_| QueueWaitParseError::NotJson)?;
70    let obj = v.as_object().ok_or(QueueWaitParseError::NotObject)?;
71    let queue = obj
72        .get("queue")
73        .and_then(|x| x.as_str())
74        .filter(|s| !s.is_empty())
75        .ok_or(QueueWaitParseError::MissingQueue)?
76        .to_string();
77    let consumer = obj
78        .get("consumer")
79        .and_then(|x| x.as_str())
80        .filter(|s| !s.is_empty())
81        .ok_or(QueueWaitParseError::MissingConsumer)?
82        .to_string();
83    let group = obj
84        .get("group")
85        .and_then(|x| x.as_str())
86        .filter(|s| !s.is_empty())
87        .map(|s| s.to_string());
88    // `count` defaults to 1; clamp to at least 1 so a wait always asks
89    // for a deliverable message.
90    let count = obj
91        .get("count")
92        .and_then(|x| x.as_f64())
93        .map(|n| (n as usize).max(1))
94        .unwrap_or(1);
95    let wait_ms = obj
96        .get("wait_ms")
97        .and_then(|x| x.as_f64())
98        .map(|n| n.max(0.0) as u64)
99        .unwrap_or(0);
100    Ok(QueueWaitOpenRequest {
101        queue,
102        group,
103        consumer,
104        count,
105        wait_ms,
106    })
107}
108
109/// Build the `QueueEventPush` payload for one delivered message. The
110/// `message` value is the JSON object rendered by the runtime
111/// (`message_id` / `payload` / `consumer` / `delivery_count`).
112pub fn build_event_push_payload(message: &JsonValue) -> Vec<u8> {
113    serde_json::to_vec(message).unwrap_or_default()
114}
115
116/// Build a `QueueEventPush` frame echoing the open request's
117/// `correlation_id` and `stream_id` so the client pairs the push with
118/// the wait it opened.
119pub fn build_event_push_frame(
120    correlation_id: u64,
121    stream_id: u16,
122    message: &JsonValue,
123) -> Result<Frame, super::BuildError> {
124    FrameBuilder::reply_to(correlation_id)
125        .kind(MessageKind::QueueEventPush)
126        .stream_id(stream_id)
127        .payload(build_event_push_payload(message))
128        .build()
129}
130
131/// Build a `QueueWaitTimeout` frame for an elapsed wait (issue #919).
132///
133/// A distinct frame kind — not a `QueueEventPush` (which always carries
134/// a delivered message) and not a `StreamError` (reserved for parse
135/// failures, cancellation, and runtime errors) — so the client can tell
136/// "your wait budget elapsed with nothing deliverable" apart from a
137/// delivery and apart from a cancellation purely from the frame kind.
138/// Echoes the open's `correlation_id` + `stream_id` so the client pairs
139/// the timeout with the wait it opened; the small JSON body restates the
140/// queue and the budget that elapsed for client-side logging.
141pub fn build_queue_wait_timeout_frame(
142    correlation_id: u64,
143    stream_id: u16,
144    queue: &str,
145    wait_ms: u64,
146) -> Result<Frame, super::BuildError> {
147    let mut obj = serde_json::Map::new();
148    obj.insert(
149        "outcome".to_string(),
150        JsonValue::String("timeout".to_string()),
151    );
152    obj.insert("queue".to_string(), JsonValue::String(queue.to_string()));
153    obj.insert("wait_ms".to_string(), JsonValue::Number(wait_ms as f64));
154    FrameBuilder::reply_to(correlation_id)
155        .kind(MessageKind::QueueWaitTimeout)
156        .stream_id(stream_id)
157        .payload(serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default())
158        .build()
159}
160
/// `StreamError` code for a live queue-wait terminated by server-side
/// cancellation (registry `cancel_all`, e.g. shutdown). Distinct from
/// [`WAIT_FAILED_CODE`] (a genuine runtime error) and from the timeout
/// frame, so the three non-delivery outcomes never alias on the wire.
pub const WAIT_CANCELLED_CODE: &str = "queue_wait_cancelled";

/// `StreamError` code for a wait open whose requested budget exceeds the
/// server's maximum wait cap (issue #919). The accompanying message
/// names the `red.config` key so an operator can act on it.
/// Distinct from [`WAIT_CANCELLED_CODE`] and [`WAIT_FAILED_CODE`].
pub const WAIT_EXCEEDS_CAP_CODE: &str = "queue_wait_exceeds_cap";

/// `StreamError` code for a runtime failure while servicing a wait
/// (e.g. the queue read errored). Non-fatal at the connection level.
/// Distinct from [`WAIT_CANCELLED_CODE`] and [`WAIT_EXCEEDS_CAP_CODE`].
pub const WAIT_FAILED_CODE: &str = "queue_wait_failed";
175
176/// Build a `StreamError` frame carrying a queue-wait parse/validation
177/// failure for a specific `stream_id`. Non-fatal at the connection
178/// level — the session keeps reading other frames.
179pub fn build_queue_wait_error_frame(
180    correlation_id: u64,
181    stream_id: u16,
182    code: &str,
183    message: &str,
184) -> Result<Frame, super::BuildError> {
185    let mut obj = serde_json::Map::new();
186    obj.insert("code".to_string(), JsonValue::String(code.to_string()));
187    obj.insert(
188        "message".to_string(),
189        JsonValue::String(message.to_string()),
190    );
191    FrameBuilder::reply_to(correlation_id)
192        .kind(MessageKind::StreamError)
193        .stream_id(stream_id)
194        .payload(serde_json::to_vec(&JsonValue::Object(obj)).unwrap_or_default())
195        .build()
196}
197
#[cfg(test)]
mod tests {
    //! Unit tests for the queue-wait payload parser and frame builders.

    use super::*;

    #[test]
    fn parse_minimal_request_applies_defaults() {
        let req = parse_queue_wait_open(br#"{"queue":"jobs","consumer":"w1"}"#).unwrap();
        assert_eq!(req.queue, "jobs");
        assert_eq!(req.consumer, "w1");
        assert_eq!(req.group, None);
        assert_eq!(req.count, 1);
        assert_eq!(req.wait_ms, 0);
    }

    #[test]
    fn parse_full_request() {
        let req = parse_queue_wait_open(
            br#"{"queue":"jobs","group":"g","consumer":"w1","count":3,"wait_ms":5000}"#,
        )
        .unwrap();
        assert_eq!(req.group.as_deref(), Some("g"));
        assert_eq!(req.count, 3);
        assert_eq!(req.wait_ms, 5000);
    }

    #[test]
    fn parse_rejects_missing_queue_and_consumer() {
        assert_eq!(
            parse_queue_wait_open(br#"{"consumer":"w1"}"#).unwrap_err(),
            QueueWaitParseError::MissingQueue
        );
        assert_eq!(
            parse_queue_wait_open(br#"{"queue":"jobs"}"#).unwrap_err(),
            QueueWaitParseError::MissingConsumer
        );
    }

    #[test]
    fn parse_rejects_empty_queue_and_consumer() {
        // An empty string is treated the same as an absent field.
        assert_eq!(
            parse_queue_wait_open(br#"{"queue":"","consumer":"w1"}"#).unwrap_err(),
            QueueWaitParseError::MissingQueue
        );
        assert_eq!(
            parse_queue_wait_open(br#"{"queue":"jobs","consumer":""}"#).unwrap_err(),
            QueueWaitParseError::MissingConsumer
        );
    }

    #[test]
    fn parse_rejects_non_json() {
        assert_eq!(
            parse_queue_wait_open(b"not json").unwrap_err(),
            QueueWaitParseError::NotJson
        );
    }

    #[test]
    fn parse_rejects_non_object() {
        // Valid JSON whose top-level value is not an object.
        assert_eq!(
            parse_queue_wait_open(br#"[1,2,3]"#).unwrap_err(),
            QueueWaitParseError::NotObject
        );
    }

    #[test]
    fn parse_treats_empty_group_as_absent() {
        let req =
            parse_queue_wait_open(br#"{"queue":"jobs","group":"","consumer":"w1"}"#).unwrap();
        assert_eq!(req.group, None);
    }

    #[test]
    fn parse_clamps_count_and_wait_ms() {
        // `count` is raised to at least 1; a negative `wait_ms` floors at 0.
        let req = parse_queue_wait_open(
            br#"{"queue":"jobs","consumer":"w1","count":0,"wait_ms":-250}"#,
        )
        .unwrap();
        assert_eq!(req.count, 1);
        assert_eq!(req.wait_ms, 0);

        let req =
            parse_queue_wait_open(br#"{"queue":"jobs","consumer":"w1","count":-3}"#).unwrap();
        assert_eq!(req.count, 1);
    }

    #[test]
    fn event_push_frame_echoes_correlation_and_stream() {
        let mut obj = serde_json::Map::new();
        obj.insert("message_id".to_string(), JsonValue::String("42".into()));
        let frame = build_event_push_frame(99, 7, &JsonValue::Object(obj)).unwrap();
        assert_eq!(frame.kind, MessageKind::QueueEventPush);
        assert_eq!(frame.correlation_id, 99);
        assert_eq!(frame.stream_id, 7);
    }

    #[test]
    fn timeout_frame_is_distinct_kind_echoing_open() {
        let frame = build_queue_wait_timeout_frame(99, 7, "jobs", 5000).unwrap();
        // Distinct kind — not QueueEventPush (delivery) or StreamError
        // (cancellation / failure) — so the outcome is unambiguous on
        // the wire (AC #1, AC #2).
        assert_eq!(frame.kind, MessageKind::QueueWaitTimeout);
        assert_ne!(frame.kind, MessageKind::QueueEventPush);
        assert_ne!(frame.kind, MessageKind::StreamError);
        assert_eq!(frame.correlation_id, 99, "echoes the open correlation");
        assert_eq!(frame.stream_id, 7, "echoes the open stream_id");
        let body: JsonValue = serde_json::from_slice(&frame.payload).unwrap();
        assert_eq!(body["outcome"], JsonValue::String("timeout".into()));
        assert_eq!(body["queue"], JsonValue::String("jobs".into()));
        assert_eq!(body["wait_ms"], JsonValue::Number(5000.0));
    }

    #[test]
    fn error_frame_carries_code_and_message() {
        let frame =
            build_queue_wait_error_frame(99, 7, WAIT_FAILED_CODE, "queue read errored").unwrap();
        assert_eq!(frame.kind, MessageKind::StreamError);
        assert_eq!(frame.correlation_id, 99);
        assert_eq!(frame.stream_id, 7);
        let body: JsonValue = serde_json::from_slice(&frame.payload).unwrap();
        assert_eq!(body["code"], JsonValue::String(WAIT_FAILED_CODE.into()));
        assert_eq!(
            body["message"],
            JsonValue::String("queue read errored".into())
        );
    }

    #[test]
    fn cancellation_and_cap_codes_are_distinct() {
        // The three non-delivery StreamError-bearing outcomes must not
        // alias one another (AC #2 distinguishability extends to the
        // error codes the client switches on).
        assert_ne!(WAIT_CANCELLED_CODE, WAIT_FAILED_CODE);
        assert_ne!(WAIT_CANCELLED_CODE, WAIT_EXCEEDS_CAP_CODE);
        assert_ne!(WAIT_EXCEEDS_CAP_CODE, WAIT_FAILED_CODE);
    }
}
279}