ff_script/stream_tail.rs
1//! Direct XREAD / XREAD BLOCK tail for attempt-scoped streams.
2//!
3//! XREAD BLOCK cannot run inside a Valkey Function (blocking commands are
4//! rejected by `FUNCTION`), so tailing is implemented as a direct client
5//! command against the same `ferriskey::Client` the server already holds.
6//!
7//! Semantics:
8//! - `block_ms == 0` → non-blocking `XREAD` (returns immediately)
9//! - `block_ms > 0` → `XREAD BLOCK <ms>` (returns nil on timeout → empty result)
10//!
11//! Cluster-safe: the stream key carries the execution's `{p:N}` hash tag, so
12//! XREAD routes to the single owning slot.
13//!
14//! # Timeout interaction with the ferriskey client
15//!
16//! The client's configured `request_timeout` (5s on the server default) does
17//! NOT cap blocking tail calls. `ferriskey::client::get_request_timeout`
18//! inspects every outgoing cmd; for `XREAD`/`XREADGROUP` with a `BLOCK`
19//! argument, it returns `RequestTimeoutOption::BlockingCommand(block_ms +
20//! 500ms)` via `BLOCKING_CMD_TIMEOUT_EXTENSION`. That means a
21//! `block_ms = 30_000` call gets a 30_500ms effective timeout regardless of
22//! the client's default. No manual override is needed from this module.
23//!
24//! # Terminal signal (closed-stream propagation)
25//!
26//! After every XREAD/XREAD BLOCK we HMGET the sibling `stream_meta` hash
27//! to learn whether the producer has closed the stream (`closed_at` /
28//! `closed_reason`). This is the terminal-signal contract described in
29//! RFC-006 — consumers poll `tail_stream` until `is_closed()` is true,
30//! then drain any remaining frames.
31//!
32//! # Atomicity (XREAD vs HMGET)
33//!
34//! The XREAD and the follow-up HMGET are separate Valkey round trips. A
35//! close can land between them, so a caller can observe a result that
36//! was not simultaneously true at any single instant: `frames` reflect
37//! the stream as of T1, `closed_at`/`closed_reason` reflect
38//! `stream_meta` as of T2 > T1.
39//!
40//! This is correctness-safe. `ff_append_frame` gates on `closed_at`
41//! (see `lua/stream.lua`), so once the stream is closed no new frames
42//! can be appended — any frames returned by XREAD are guaranteed to be
43//! from before the close. The terminal signal arriving "slightly later"
44//! just means a tail loop may take one extra iteration to observe the
45//! closure. Consumers should treat `closed_at.is_some()` as the exit
46//! condition and perform a final `read_attempt_stream` drain from
47//! `last_seen_id` to `+` to pick up any frames that landed before close
48//! but after the tail's last read — see RFC-006 Impl Notes
49//! §"Terminal-signal drain pattern".
50
51use ff_core::contracts::{StreamFrame, StreamFrames, STREAM_READ_HARD_CAP};
52use ff_core::types::TimestampMs;
53use ferriskey::{Client, Value};
54
55use crate::error::ScriptError;
56use crate::functions::stream::{parse_entries, parse_fields_kv, value_to_string, FieldShape};
57
58/// Tail frames from `stream_key`, blocking up to `block_ms` (0 = no block).
59///
60/// `last_id` is the exclusive cursor — XREAD returns entries with id > last_id.
61/// Pass `"0-0"` (or `"0"`) to read from the beginning.
62///
63/// Returns a [`StreamFrames`] with `frames=Vec::new()` on timeout or when
64/// the stream has no new entries. The `closed_at`/`closed_reason` fields
65/// are populated from `stream_meta` on every call so consumers can stop
66/// polling when the producer finalizes the stream.
67///
68/// `count_limit` must be `>= 1` and `<= STREAM_READ_HARD_CAP`. This
69/// mirrors [`crate::functions::stream::ff_read_attempt_stream`] and the
70/// REST/SDK boundaries; callers that silently clamped to 0 previously must
71/// now pass `STREAM_READ_HARD_CAP` explicitly.
72///
73/// # Timeout handling
74///
75/// For blocking calls (`block_ms > 0`), the ferriskey client automatically
76/// extends its `request_timeout` to `block_ms + 500ms` for the duration of
77/// this command. Callers do not need to (and should not) pass a custom
78/// client with a larger `request_timeout` just to accommodate tail. See the
79/// module-level docs for the exact ferriskey code path.
80pub async fn xread_block(
81 client: &Client,
82 stream_key: &str,
83 stream_meta_key: &str,
84 last_id: &str,
85 block_ms: u64,
86 count_limit: u64,
87) -> Result<StreamFrames, ScriptError> {
88 // These are input-validation errors — `InvalidInput` is the right
89 // class (Terminal per `ScriptError::class()`). `Parse` is reserved for
90 // FCALL-result parse failures per the contract, so using it here
91 // would mislead callers that route retry decisions off the variant.
92 if count_limit == 0 {
93 return Err(ScriptError::InvalidInput(
94 "xread_block: count_limit must be >= 1".into(),
95 ));
96 }
97 if count_limit > STREAM_READ_HARD_CAP {
98 return Err(ScriptError::InvalidInput(format!(
99 "xread_block: count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
100 )));
101 }
102 debug_assert!(
103 STREAM_READ_HARD_CAP as i128 <= i64::MAX as i128,
104 "STREAM_READ_HARD_CAP must fit in i64 — RESP COUNT arg"
105 );
106
107 let cmd = client.cmd("XREAD").arg("COUNT").arg(count_limit);
108 let cmd = if block_ms > 0 {
109 cmd.arg("BLOCK").arg(block_ms)
110 } else {
111 cmd
112 };
113 let cmd = cmd.arg("STREAMS").arg(stream_key).arg(last_id);
114
115 let raw: Value = cmd.execute().await.map_err(ScriptError::Valkey)?;
116
117 let frames = parse_xread_reply(&raw, stream_key)?;
118
119 // Fetch terminal markers separately. HMGET on a missing key returns
120 // [nil, nil] which normalizes to (None, None) — an in-progress /
121 // never-written attempt is indistinguishable from "still open" here,
122 // which is the intended semantics.
123 let (closed_at, closed_reason) = fetch_closed_meta(client, stream_meta_key).await?;
124
125 Ok(StreamFrames { frames, closed_at, closed_reason })
126}
127
128async fn fetch_closed_meta(
129 client: &Client,
130 stream_meta_key: &str,
131) -> Result<(Option<TimestampMs>, Option<String>), ScriptError> {
132 let values: Vec<Option<String>> = client
133 .cmd("HMGET")
134 .arg(stream_meta_key)
135 .arg("closed_at")
136 .arg("closed_reason")
137 .execute()
138 .await
139 .map_err(ScriptError::Valkey)?;
140
141 let closed_at = values
142 .first()
143 .and_then(|v| v.as_deref())
144 .filter(|s| !s.is_empty())
145 .and_then(|s| s.parse::<i64>().ok())
146 .map(TimestampMs::from_millis);
147 let closed_reason = values
148 .get(1)
149 .and_then(|v| v.clone())
150 .filter(|s| !s.is_empty());
151 Ok((closed_at, closed_reason))
152}
153
154/// Parse an XREAD reply into frames for a single stream.
155///
156/// Handles every shape ferriskey can produce:
157/// - `Value::Nil` on BLOCK timeout or no new entries → `Vec::new()`
158/// - `Value::Map({stream_key: Map({entry_id: fields, ...})})` — RESP3
159/// - `Value::Map({stream_key: Array([[entry_id, fields], ...])})` — mixed
160/// - `Value::Array([[stream_key, [[entry_id, fields], ...]], ...])` — RESP2
161///
162/// Per-entry field payloads are always decoded with `FieldShape::Pairs`
163/// because ferriskey's XREAD adapter (`ArrayOfPairs`) is unconditional for
164/// the XREAD command family — see `ferriskey::client::value_conversion`.
165fn parse_xread_reply(raw: &Value, stream_key: &str) -> Result<Vec<StreamFrame>, ScriptError> {
166 let outer = match raw {
167 Value::Nil => return Ok(Vec::new()),
168 Value::Map(m) => m,
169 // RESP2 fallback: array of [stream_key, entries] pairs.
170 Value::Array(arr) => {
171 let mut non_match_count: usize = 0;
172 for entry in arr {
173 let Ok(Value::Array(pair)) = entry.as_ref() else {
174 non_match_count += 1;
175 continue;
176 };
177 if pair.len() != 2 {
178 non_match_count += 1;
179 continue;
180 }
181 let matches_key = match pair[0].as_ref() {
182 Ok(Value::BulkString(b)) => b.as_ref() == stream_key.as_bytes(),
183 Ok(Value::SimpleString(s)) => s == stream_key,
184 _ => false,
185 };
186 if !matches_key {
187 non_match_count += 1;
188 continue;
189 }
190 let entries_val = match pair[1].as_ref() {
191 Ok(v) => v,
192 Err(e) => {
193 return Err(ScriptError::Parse(format!(
194 "XREAD entries (RESP2): {e}"
195 )));
196 }
197 };
198 return parse_entries_any(entries_val);
199 }
200 if non_match_count > 0 {
201 tracing::trace!(
202 non_match = non_match_count,
203 stream_key,
204 "XREAD RESP2 reply had entries but none matched the requested stream key"
205 );
206 }
207 return Ok(Vec::new());
208 }
209 other => {
210 return Err(ScriptError::Parse(format!(
211 "XREAD: expected Map/Nil/Array, got {other:?}"
212 )));
213 }
214 };
215
216 let mut non_match_count: usize = 0;
217 for (k, v) in outer.iter() {
218 let matches_key = match k {
219 Value::BulkString(b) => b.as_ref() == stream_key.as_bytes(),
220 Value::SimpleString(s) => s == stream_key,
221 _ => false,
222 };
223 if !matches_key {
224 non_match_count += 1;
225 continue;
226 }
227 let frames = match v {
228 // ferriskey's XREAD adapter (`Map { key: BulkString, value: Map
229 // { key: BulkString, value: ArrayOfPairs }}`) produces a
230 // `Map<entry_id, ArrayOfPairs-fields>` here — keys lift to Map
231 // but field payloads retain the Pairs shape.
232 Value::Map(entries) => {
233 let mut frames = Vec::with_capacity(entries.len());
234 for (id_v, field_v) in entries {
235 let id = value_to_string(Some(id_v))
236 .ok_or_else(|| ScriptError::Parse("XREAD entry: bad id".into()))?;
237 let fields = parse_fields_kv(field_v, FieldShape::Pairs)?;
238 frames.push(StreamFrame { id, fields });
239 }
240 frames
241 }
242 Value::Array(arr) => {
243 if arr.is_empty() {
244 tracing::trace!(
245 stream_key,
246 "XREAD reply matched stream key but entries Array was empty — possible \
247 malformed reply"
248 );
249 }
250 parse_entries_any(v)?
251 }
252 Value::Nil => Vec::new(),
253 other => {
254 return Err(ScriptError::Parse(format!(
255 "XREAD entries: expected Map/Array, got {other:?}"
256 )));
257 }
258 };
259 return Ok(frames);
260 }
261 if non_match_count > 0 {
262 tracing::trace!(
263 non_match = non_match_count,
264 stream_key,
265 "XREAD Map reply had entries but none matched the requested stream key"
266 );
267 }
268 Ok(Vec::new())
269}
270
271/// Parse an Array-of-entries (or Map-of-entries) payload. Used for the
272/// inner XREAD entry list — field shape is always `Pairs` because that's
273/// what ferriskey's `ArrayOfPairs` XREAD adapter emits even when the outer
274/// stream/entry wrappers are lifted to `Map`.
275fn parse_entries_any(raw: &Value) -> Result<Vec<StreamFrame>, ScriptError> {
276 match raw {
277 Value::Nil => Ok(Vec::new()),
278 Value::Array(_) => parse_entries(raw, FieldShape::Pairs),
279 Value::Map(map) => {
280 let mut frames = Vec::with_capacity(map.len());
281 for (id_v, field_v) in map {
282 let id = value_to_string(Some(id_v))
283 .ok_or_else(|| ScriptError::Parse("XREAD entry: bad id".into()))?;
284 let fields = parse_fields_kv(field_v, FieldShape::Pairs)?;
285 frames.push(StreamFrame { id, fields });
286 }
287 Ok(frames)
288 }
289 other => Err(ScriptError::Parse(format!(
290 "XREAD entries: expected Array/Map/Nil, got {other:?}"
291 ))),
292 }
293}