Skip to main content

ff_script/
stream_tail.rs

1//! Direct XREAD / XREAD BLOCK tail for attempt-scoped streams.
2//!
3//! XREAD BLOCK cannot run inside a Valkey Function (blocking commands are
4//! rejected by `FUNCTION`), so tailing is implemented as a direct client
5//! command against the same `ferriskey::Client` the server already holds.
6//!
7//! Semantics:
8//! - `block_ms == 0` → non-blocking `XREAD` (returns immediately)
9//! - `block_ms > 0`  → `XREAD BLOCK <ms>` (returns nil on timeout → empty result)
10//!
11//! Cluster-safe: the stream key carries the execution's `{p:N}` hash tag, so
12//! XREAD routes to the single owning slot.
13//!
14//! # Timeout interaction with the ferriskey client
15//!
16//! The client's configured `request_timeout` (5s on the server default) does
17//! NOT cap blocking tail calls. `ferriskey::client::get_request_timeout`
18//! inspects every outgoing cmd; for `XREAD`/`XREADGROUP` with a `BLOCK`
19//! argument, it returns `RequestTimeoutOption::BlockingCommand(block_ms +
20//! 500ms)` via `BLOCKING_CMD_TIMEOUT_EXTENSION`. That means a
21//! `block_ms = 30_000` call gets a 30_500ms effective timeout regardless of
22//! the client's default. No manual override is needed from this module.
23//!
24//! # Terminal signal (closed-stream propagation)
25//!
26//! After every XREAD/XREAD BLOCK we HMGET the sibling `stream_meta` hash
27//! to learn whether the producer has closed the stream (`closed_at` /
28//! `closed_reason`). This is the terminal-signal contract described in
29//! RFC-006 — consumers poll `tail_stream` until `is_closed()` is true,
30//! then drain any remaining frames.
31//!
32//! # Atomicity (XREAD vs HMGET)
33//!
34//! The XREAD and the follow-up HMGET are separate Valkey round trips. A
35//! close can land between them, so a caller can observe a result that
36//! was not simultaneously true at any single instant: `frames` reflect
37//! the stream as of T1, `closed_at`/`closed_reason` reflect
38//! `stream_meta` as of T2 > T1.
39//!
40//! This is correctness-safe. `ff_append_frame` gates on `closed_at`
41//! (see `lua/stream.lua`), so once the stream is closed no new frames
42//! can be appended — any frames returned by XREAD are guaranteed to be
43//! from before the close. The terminal signal arriving "slightly later"
44//! just means a tail loop may take one extra iteration to observe the
45//! closure. Consumers should treat `closed_at.is_some()` as the exit
46//! condition and perform a final `read_attempt_stream` drain from
47//! `last_seen_id` to `+` to pick up any frames that landed before close
48//! but after the tail's last read — see RFC-006 Impl Notes
49//! §"Terminal-signal drain pattern".
50
51use ff_core::contracts::{StreamFrame, StreamFrames, STREAM_READ_HARD_CAP};
52use ff_core::types::TimestampMs;
53use ferriskey::{Client, Value};
54
55use crate::error::ScriptError;
56use crate::functions::stream::{parse_entries, parse_fields_kv, value_to_string, FieldShape};
57
58/// Tail frames from `stream_key`, blocking up to `block_ms` (0 = no block).
59///
60/// `last_id` is the exclusive cursor — XREAD returns entries with id > last_id.
61/// Pass `"0-0"` (or `"0"`) to read from the beginning.
62///
63/// Returns a [`StreamFrames`] with `frames=Vec::new()` on timeout or when
64/// the stream has no new entries. The `closed_at`/`closed_reason` fields
65/// are populated from `stream_meta` on every call so consumers can stop
66/// polling when the producer finalizes the stream.
67///
68/// `count_limit` must be `>= 1` and `<= STREAM_READ_HARD_CAP`. This
69/// mirrors [`crate::functions::stream::ff_read_attempt_stream`] and the
70/// REST/SDK boundaries; callers that silently clamped to 0 previously must
71/// now pass `STREAM_READ_HARD_CAP` explicitly.
72///
73/// # Timeout handling
74///
75/// For blocking calls (`block_ms > 0`), the ferriskey client automatically
76/// extends its `request_timeout` to `block_ms + 500ms` for the duration of
77/// this command. Callers do not need to (and should not) pass a custom
78/// client with a larger `request_timeout` just to accommodate tail. See the
79/// module-level docs for the exact ferriskey code path.
80pub async fn xread_block(
81    client: &Client,
82    stream_key: &str,
83    stream_meta_key: &str,
84    last_id: &str,
85    block_ms: u64,
86    count_limit: u64,
87) -> Result<StreamFrames, ScriptError> {
88    // These are input-validation errors — `InvalidInput` is the right
89    // class (Terminal per `ScriptError::class()`). `Parse` is reserved for
90    // FCALL-result parse failures per the contract, so using it here
91    // would mislead callers that route retry decisions off the variant.
92    if count_limit == 0 {
93        return Err(ScriptError::InvalidInput(
94            "xread_block: count_limit must be >= 1".into(),
95        ));
96    }
97    if count_limit > STREAM_READ_HARD_CAP {
98        return Err(ScriptError::InvalidInput(format!(
99            "xread_block: count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
100        )));
101    }
102    debug_assert!(
103        STREAM_READ_HARD_CAP as i128 <= i64::MAX as i128,
104        "STREAM_READ_HARD_CAP must fit in i64 — RESP COUNT arg"
105    );
106
107    let cmd = client.cmd("XREAD").arg("COUNT").arg(count_limit);
108    let cmd = if block_ms > 0 {
109        cmd.arg("BLOCK").arg(block_ms)
110    } else {
111        cmd
112    };
113    let cmd = cmd.arg("STREAMS").arg(stream_key).arg(last_id);
114
115    let raw: Value = cmd.execute().await.map_err(ScriptError::Valkey)?;
116
117    let frames = parse_xread_reply(&raw, stream_key)?;
118
119    // Fetch terminal markers separately. HMGET on a missing key returns
120    // [nil, nil] which normalizes to (None, None) — an in-progress /
121    // never-written attempt is indistinguishable from "still open" here,
122    // which is the intended semantics.
123    let (closed_at, closed_reason) = fetch_closed_meta(client, stream_meta_key).await?;
124
125    Ok(StreamFrames { frames, closed_at, closed_reason })
126}
127
128async fn fetch_closed_meta(
129    client: &Client,
130    stream_meta_key: &str,
131) -> Result<(Option<TimestampMs>, Option<String>), ScriptError> {
132    let values: Vec<Option<String>> = client
133        .cmd("HMGET")
134        .arg(stream_meta_key)
135        .arg("closed_at")
136        .arg("closed_reason")
137        .execute()
138        .await
139        .map_err(ScriptError::Valkey)?;
140
141    let closed_at = values
142        .first()
143        .and_then(|v| v.as_deref())
144        .filter(|s| !s.is_empty())
145        .and_then(|s| s.parse::<i64>().ok())
146        .map(TimestampMs::from_millis);
147    let closed_reason = values
148        .get(1)
149        .and_then(|v| v.clone())
150        .filter(|s| !s.is_empty());
151    Ok((closed_at, closed_reason))
152}
153
154/// Parse an XREAD reply into frames for a single stream.
155///
156/// Handles every shape ferriskey can produce:
157/// - `Value::Nil` on BLOCK timeout or no new entries → `Vec::new()`
158/// - `Value::Map({stream_key: Map({entry_id: fields, ...})})` — RESP3
159/// - `Value::Map({stream_key: Array([[entry_id, fields], ...])})` — mixed
160/// - `Value::Array([[stream_key, [[entry_id, fields], ...]], ...])` — RESP2
161///
162/// Per-entry field payloads are always decoded with `FieldShape::Pairs`
163/// because ferriskey's XREAD adapter (`ArrayOfPairs`) is unconditional for
164/// the XREAD command family — see `ferriskey::client::value_conversion`.
165fn parse_xread_reply(raw: &Value, stream_key: &str) -> Result<Vec<StreamFrame>, ScriptError> {
166    let outer = match raw {
167        Value::Nil => return Ok(Vec::new()),
168        Value::Map(m) => m,
169        // RESP2 fallback: array of [stream_key, entries] pairs.
170        Value::Array(arr) => {
171            let mut non_match_count: usize = 0;
172            for entry in arr {
173                let Ok(Value::Array(pair)) = entry.as_ref() else {
174                    non_match_count += 1;
175                    continue;
176                };
177                if pair.len() != 2 {
178                    non_match_count += 1;
179                    continue;
180                }
181                let matches_key = match pair[0].as_ref() {
182                    Ok(Value::BulkString(b)) => b.as_ref() == stream_key.as_bytes(),
183                    Ok(Value::SimpleString(s)) => s == stream_key,
184                    _ => false,
185                };
186                if !matches_key {
187                    non_match_count += 1;
188                    continue;
189                }
190                let entries_val = match pair[1].as_ref() {
191                    Ok(v) => v,
192                    Err(e) => {
193                        return Err(ScriptError::Parse {
194                            fcall: "stream_tail_decode".into(),
195                            execution_id: None,
196                            message: format!("XREAD entries (RESP2): {e}"),
197                        });
198                    }
199                };
200                return parse_entries_any(entries_val);
201            }
202            if non_match_count > 0 {
203                tracing::trace!(
204                    non_match = non_match_count,
205                    stream_key,
206                    "XREAD RESP2 reply had entries but none matched the requested stream key"
207                );
208            }
209            return Ok(Vec::new());
210        }
211        other => {
212            return Err(ScriptError::Parse {
213                fcall: "stream_tail_decode".into(),
214                execution_id: None,
215                message: format!("XREAD: expected Map/Nil/Array, got {other:?}"),
216            });
217        }
218    };
219
220    let mut non_match_count: usize = 0;
221    for (k, v) in outer.iter() {
222        let matches_key = match k {
223            Value::BulkString(b) => b.as_ref() == stream_key.as_bytes(),
224            Value::SimpleString(s) => s == stream_key,
225            _ => false,
226        };
227        if !matches_key {
228            non_match_count += 1;
229            continue;
230        }
231        let frames = match v {
232            // ferriskey's XREAD adapter (`Map { key: BulkString, value: Map
233            // { key: BulkString, value: ArrayOfPairs }}`) produces a
234            // `Map<entry_id, ArrayOfPairs-fields>` here — keys lift to Map
235            // but field payloads retain the Pairs shape.
236            Value::Map(entries) => {
237                let mut frames = Vec::with_capacity(entries.len());
238                for (id_v, field_v) in entries {
239                    let id = value_to_string(Some(id_v))
240                        .ok_or_else(|| ScriptError::Parse {
241                            fcall: "stream_tail_decode".into(),
242                            execution_id: None,
243                            message: "XREAD entry: bad id".into(),
244                        })?;
245                    let fields = parse_fields_kv(field_v, FieldShape::Pairs)?;
246                    frames.push(StreamFrame { id, fields });
247                }
248                frames
249            }
250            Value::Array(arr) => {
251                if arr.is_empty() {
252                    tracing::trace!(
253                        stream_key,
254                        "XREAD reply matched stream key but entries Array was empty — possible \
255                         malformed reply"
256                    );
257                }
258                parse_entries_any(v)?
259            }
260            Value::Nil => Vec::new(),
261            other => {
262                return Err(ScriptError::Parse {
263                    fcall: "stream_tail_decode".into(),
264                    execution_id: None,
265                    message: format!("XREAD entries: expected Map/Array, got {other:?}"),
266                });
267            }
268        };
269        return Ok(frames);
270    }
271    if non_match_count > 0 {
272        tracing::trace!(
273            non_match = non_match_count,
274            stream_key,
275            "XREAD Map reply had entries but none matched the requested stream key"
276        );
277    }
278    Ok(Vec::new())
279}
280
281/// Parse an Array-of-entries (or Map-of-entries) payload. Used for the
282/// inner XREAD entry list — field shape is always `Pairs` because that's
283/// what ferriskey's `ArrayOfPairs` XREAD adapter emits even when the outer
284/// stream/entry wrappers are lifted to `Map`.
285fn parse_entries_any(raw: &Value) -> Result<Vec<StreamFrame>, ScriptError> {
286    match raw {
287        Value::Nil => Ok(Vec::new()),
288        Value::Array(_) => parse_entries(raw, FieldShape::Pairs),
289        Value::Map(map) => {
290            let mut frames = Vec::with_capacity(map.len());
291            for (id_v, field_v) in map {
292                let id = value_to_string(Some(id_v))
293                    .ok_or_else(|| ScriptError::Parse {
294                        fcall: "parse_entries_any".into(),
295                        execution_id: None,
296                        message: "XREAD entry: bad id".into(),
297                    })?;
298                let fields = parse_fields_kv(field_v, FieldShape::Pairs)?;
299                frames.push(StreamFrame { id, fields });
300            }
301            Ok(frames)
302        }
303        other => Err(ScriptError::Parse {
304            fcall: "parse_entries_any".into(),
305            execution_id: None,
306            message: format!("XREAD entries: expected Array/Map/Nil, got {other:?}"),
307        }),
308    }
309}