ff_script/stream_tail.rs
1//! Direct XREAD / XREAD BLOCK tail for attempt-scoped streams.
2//!
3//! XREAD BLOCK cannot run inside a Valkey Function (blocking commands are
4//! rejected by `FUNCTION`), so tailing is implemented as a direct client
5//! command against the same `ferriskey::Client` the server already holds.
6//!
7//! Semantics:
8//! - `block_ms == 0` → non-blocking `XREAD` (returns immediately)
9//! - `block_ms > 0` → `XREAD BLOCK <ms>` (returns nil on timeout → empty result)
10//!
11//! Cluster-safe: the stream key carries the execution's `{p:N}` hash tag, so
12//! XREAD routes to the single owning slot.
13//!
14//! # Timeout interaction with the ferriskey client
15//!
16//! The client's configured `request_timeout` (5s on the server default) does
17//! NOT cap blocking tail calls. `ferriskey::client::get_request_timeout`
18//! inspects every outgoing cmd; for `XREAD`/`XREADGROUP` with a `BLOCK`
19//! argument, it returns `RequestTimeoutOption::BlockingCommand(block_ms +
20//! 500ms)` via `BLOCKING_CMD_TIMEOUT_EXTENSION`. That means a
21//! `block_ms = 30_000` call gets a 30_500ms effective timeout regardless of
22//! the client's default. No manual override is needed from this module.
23//!
24//! # Terminal signal (closed-stream propagation)
25//!
26//! After every XREAD/XREAD BLOCK we HMGET the sibling `stream_meta` hash
27//! to learn whether the producer has closed the stream (`closed_at` /
28//! `closed_reason`). This is the terminal-signal contract described in
29//! RFC-006 — consumers poll `tail_stream` until `is_closed()` is true,
30//! then drain any remaining frames.
31//!
32//! # Atomicity (XREAD vs HMGET)
33//!
34//! The XREAD and the follow-up HMGET are separate Valkey round trips. A
35//! close can land between them, so a caller can observe a result that
36//! was not simultaneously true at any single instant: `frames` reflect
37//! the stream as of T1, `closed_at`/`closed_reason` reflect
38//! `stream_meta` as of T2 > T1.
39//!
40//! This is correctness-safe. `ff_append_frame` gates on `closed_at`
41//! (see `lua/stream.lua`), so once the stream is closed no new frames
42//! can be appended — any frames returned by XREAD are guaranteed to be
43//! from before the close. The terminal signal arriving "slightly later"
44//! just means a tail loop may take one extra iteration to observe the
45//! closure. Consumers should treat `closed_at.is_some()` as the exit
46//! condition and perform a final `read_attempt_stream` drain from
47//! `last_seen_id` to `+` to pick up any frames that landed before close
48//! but after the tail's last read — see RFC-006 Impl Notes
49//! §"Terminal-signal drain pattern".
50
51use ff_core::contracts::{StreamFrame, StreamFrames, STREAM_READ_HARD_CAP};
52use ff_core::types::TimestampMs;
53use ferriskey::{Client, Value};
54
55use crate::error::ScriptError;
56use crate::functions::stream::{parse_entries, parse_fields_kv, value_to_string, FieldShape};
57
58/// Tail frames from `stream_key`, blocking up to `block_ms` (0 = no block).
59///
60/// `last_id` is the exclusive cursor — XREAD returns entries with id > last_id.
61/// Pass `"0-0"` (or `"0"`) to read from the beginning.
62///
63/// Returns a [`StreamFrames`] with `frames=Vec::new()` on timeout or when
64/// the stream has no new entries. The `closed_at`/`closed_reason` fields
65/// are populated from `stream_meta` on every call so consumers can stop
66/// polling when the producer finalizes the stream.
67///
68/// `count_limit` must be `>= 1` and `<= STREAM_READ_HARD_CAP`. This
69/// mirrors [`crate::functions::stream::ff_read_attempt_stream`] and the
70/// REST/SDK boundaries; callers that silently clamped to 0 previously must
71/// now pass `STREAM_READ_HARD_CAP` explicitly.
72///
73/// # Timeout handling
74///
75/// For blocking calls (`block_ms > 0`), the ferriskey client automatically
76/// extends its `request_timeout` to `block_ms + 500ms` for the duration of
77/// this command. Callers do not need to (and should not) pass a custom
78/// client with a larger `request_timeout` just to accommodate tail. See the
79/// module-level docs for the exact ferriskey code path.
80pub async fn xread_block(
81 client: &Client,
82 stream_key: &str,
83 stream_meta_key: &str,
84 last_id: &str,
85 block_ms: u64,
86 count_limit: u64,
87) -> Result<StreamFrames, ScriptError> {
88 // These are input-validation errors — `InvalidInput` is the right
89 // class (Terminal per `ScriptError::class()`). `Parse` is reserved for
90 // FCALL-result parse failures per the contract, so using it here
91 // would mislead callers that route retry decisions off the variant.
92 if count_limit == 0 {
93 return Err(ScriptError::InvalidInput(
94 "xread_block: count_limit must be >= 1".into(),
95 ));
96 }
97 if count_limit > STREAM_READ_HARD_CAP {
98 return Err(ScriptError::InvalidInput(format!(
99 "xread_block: count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
100 )));
101 }
102 debug_assert!(
103 STREAM_READ_HARD_CAP as i128 <= i64::MAX as i128,
104 "STREAM_READ_HARD_CAP must fit in i64 — RESP COUNT arg"
105 );
106
107 let cmd = client.cmd("XREAD").arg("COUNT").arg(count_limit);
108 let cmd = if block_ms > 0 {
109 cmd.arg("BLOCK").arg(block_ms)
110 } else {
111 cmd
112 };
113 let cmd = cmd.arg("STREAMS").arg(stream_key).arg(last_id);
114
115 let raw: Value = cmd.execute().await.map_err(ScriptError::Valkey)?;
116
117 let frames = parse_xread_reply(&raw, stream_key)?;
118
119 // Fetch terminal markers separately. HMGET on a missing key returns
120 // [nil, nil] which normalizes to (None, None) — an in-progress /
121 // never-written attempt is indistinguishable from "still open" here,
122 // which is the intended semantics.
123 let (closed_at, closed_reason) = fetch_closed_meta(client, stream_meta_key).await?;
124
125 Ok(StreamFrames { frames, closed_at, closed_reason })
126}
127
128async fn fetch_closed_meta(
129 client: &Client,
130 stream_meta_key: &str,
131) -> Result<(Option<TimestampMs>, Option<String>), ScriptError> {
132 let values: Vec<Option<String>> = client
133 .cmd("HMGET")
134 .arg(stream_meta_key)
135 .arg("closed_at")
136 .arg("closed_reason")
137 .execute()
138 .await
139 .map_err(ScriptError::Valkey)?;
140
141 let closed_at = values
142 .first()
143 .and_then(|v| v.as_deref())
144 .filter(|s| !s.is_empty())
145 .and_then(|s| s.parse::<i64>().ok())
146 .map(TimestampMs::from_millis);
147 let closed_reason = values
148 .get(1)
149 .and_then(|v| v.clone())
150 .filter(|s| !s.is_empty());
151 Ok((closed_at, closed_reason))
152}
153
154/// Parse an XREAD reply into frames for a single stream.
155///
156/// Handles every shape ferriskey can produce:
157/// - `Value::Nil` on BLOCK timeout or no new entries → `Vec::new()`
158/// - `Value::Map({stream_key: Map({entry_id: fields, ...})})` — RESP3
159/// - `Value::Map({stream_key: Array([[entry_id, fields], ...])})` — mixed
160/// - `Value::Array([[stream_key, [[entry_id, fields], ...]], ...])` — RESP2
161///
162/// Per-entry field payloads are always decoded with `FieldShape::Pairs`
163/// because ferriskey's XREAD adapter (`ArrayOfPairs`) is unconditional for
164/// the XREAD command family — see `ferriskey::client::value_conversion`.
165fn parse_xread_reply(raw: &Value, stream_key: &str) -> Result<Vec<StreamFrame>, ScriptError> {
166 let outer = match raw {
167 Value::Nil => return Ok(Vec::new()),
168 Value::Map(m) => m,
169 // RESP2 fallback: array of [stream_key, entries] pairs.
170 Value::Array(arr) => {
171 let mut non_match_count: usize = 0;
172 for entry in arr {
173 let Ok(Value::Array(pair)) = entry.as_ref() else {
174 non_match_count += 1;
175 continue;
176 };
177 if pair.len() != 2 {
178 non_match_count += 1;
179 continue;
180 }
181 let matches_key = match pair[0].as_ref() {
182 Ok(Value::BulkString(b)) => b.as_ref() == stream_key.as_bytes(),
183 Ok(Value::SimpleString(s)) => s == stream_key,
184 _ => false,
185 };
186 if !matches_key {
187 non_match_count += 1;
188 continue;
189 }
190 let entries_val = match pair[1].as_ref() {
191 Ok(v) => v,
192 Err(e) => {
193 return Err(ScriptError::Parse {
194 fcall: "stream_tail_decode".into(),
195 execution_id: None,
196 message: format!("XREAD entries (RESP2): {e}"),
197 });
198 }
199 };
200 return parse_entries_any(entries_val);
201 }
202 if non_match_count > 0 {
203 tracing::trace!(
204 non_match = non_match_count,
205 stream_key,
206 "XREAD RESP2 reply had entries but none matched the requested stream key"
207 );
208 }
209 return Ok(Vec::new());
210 }
211 other => {
212 return Err(ScriptError::Parse {
213 fcall: "stream_tail_decode".into(),
214 execution_id: None,
215 message: format!("XREAD: expected Map/Nil/Array, got {other:?}"),
216 });
217 }
218 };
219
220 let mut non_match_count: usize = 0;
221 for (k, v) in outer.iter() {
222 let matches_key = match k {
223 Value::BulkString(b) => b.as_ref() == stream_key.as_bytes(),
224 Value::SimpleString(s) => s == stream_key,
225 _ => false,
226 };
227 if !matches_key {
228 non_match_count += 1;
229 continue;
230 }
231 let frames = match v {
232 // ferriskey's XREAD adapter (`Map { key: BulkString, value: Map
233 // { key: BulkString, value: ArrayOfPairs }}`) produces a
234 // `Map<entry_id, ArrayOfPairs-fields>` here — keys lift to Map
235 // but field payloads retain the Pairs shape.
236 Value::Map(entries) => {
237 let mut frames = Vec::with_capacity(entries.len());
238 for (id_v, field_v) in entries {
239 let id = value_to_string(Some(id_v))
240 .ok_or_else(|| ScriptError::Parse {
241 fcall: "stream_tail_decode".into(),
242 execution_id: None,
243 message: "XREAD entry: bad id".into(),
244 })?;
245 let fields = parse_fields_kv(field_v, FieldShape::Pairs)?;
246 frames.push(StreamFrame { id, fields });
247 }
248 frames
249 }
250 Value::Array(arr) => {
251 if arr.is_empty() {
252 tracing::trace!(
253 stream_key,
254 "XREAD reply matched stream key but entries Array was empty — possible \
255 malformed reply"
256 );
257 }
258 parse_entries_any(v)?
259 }
260 Value::Nil => Vec::new(),
261 other => {
262 return Err(ScriptError::Parse {
263 fcall: "stream_tail_decode".into(),
264 execution_id: None,
265 message: format!("XREAD entries: expected Map/Array, got {other:?}"),
266 });
267 }
268 };
269 return Ok(frames);
270 }
271 if non_match_count > 0 {
272 tracing::trace!(
273 non_match = non_match_count,
274 stream_key,
275 "XREAD Map reply had entries but none matched the requested stream key"
276 );
277 }
278 Ok(Vec::new())
279}
280
281/// Parse an Array-of-entries (or Map-of-entries) payload. Used for the
282/// inner XREAD entry list — field shape is always `Pairs` because that's
283/// what ferriskey's `ArrayOfPairs` XREAD adapter emits even when the outer
284/// stream/entry wrappers are lifted to `Map`.
285fn parse_entries_any(raw: &Value) -> Result<Vec<StreamFrame>, ScriptError> {
286 match raw {
287 Value::Nil => Ok(Vec::new()),
288 Value::Array(_) => parse_entries(raw, FieldShape::Pairs),
289 Value::Map(map) => {
290 let mut frames = Vec::with_capacity(map.len());
291 for (id_v, field_v) in map {
292 let id = value_to_string(Some(id_v))
293 .ok_or_else(|| ScriptError::Parse {
294 fcall: "parse_entries_any".into(),
295 execution_id: None,
296 message: "XREAD entry: bad id".into(),
297 })?;
298 let fields = parse_fields_kv(field_v, FieldShape::Pairs)?;
299 frames.push(StreamFrame { id, fields });
300 }
301 Ok(frames)
302 }
303 other => Err(ScriptError::Parse {
304 fcall: "parse_entries_any".into(),
305 execution_id: None,
306 message: format!("XREAD entries: expected Array/Map/Nil, got {other:?}"),
307 }),
308 }
309}