Skip to main content

ff_script/functions/
stream.rs

1//! Typed FCALL wrapper for stream append function (lua/stream.lua).
2
3use ff_core::contracts::*;
4use crate::error::ScriptError;
5use ff_core::keys::ExecKeyContext;
6
7use crate::result::{FcallResult, FromFcallResult};
8
9/// Key context for stream operations — only needs exec keys (no indexes).
10pub struct StreamOpKeys<'a> {
11    pub ctx: &'a ExecKeyContext,
12}
13
14// ─── ff_append_frame ──────────────────────────────────────────────────
15//
16// Lua KEYS (3): exec_core, stream_data, stream_meta
17// Lua ARGV (13): execution_id, attempt_index, lease_id, lease_epoch,
18//                frame_type, ts, payload, encoding, correlation_id,
19//                source, retention_maxlen, attempt_id, max_payload_bytes
20
21ff_function! {
22    pub ff_append_frame(args: AppendFrameArgs) -> AppendFrameResult {
23        keys(k: &StreamOpKeys<'_>) {
24            k.ctx.core(),                                  // 1
25            k.ctx.stream(args.attempt_index),              // 2
26            k.ctx.stream_meta(args.attempt_index),         // 3
27        }
28        argv {
29            args.execution_id.to_string(),                 // 1
30            args.attempt_index.to_string(),                // 2
31            args.lease_id.to_string(),                     // 3
32            args.lease_epoch.to_string(),                  // 4
33            args.frame_type.clone(),                       // 5
34            args.timestamp.to_string(),                    // 6
35            String::from_utf8_lossy(&args.payload).into_owned(), // 7
36            args.encoding.clone().unwrap_or_else(|| "utf8".into()), // 8
37            args.correlation_id.clone().unwrap_or_default(), // 9
38            args.source.clone().unwrap_or_else(|| "worker".into()), // 10
39            args.retention_maxlen.unwrap_or(0).to_string(), // 11
40            args.attempt_id.to_string(),                   // 12
41            args.max_payload_bytes.unwrap_or(65536).to_string(), // 13
42        }
43    }
44}
45
46impl FromFcallResult for AppendFrameResult {
47    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
48        let r = FcallResult::parse(raw)?.into_success()?;
49        // ok(entry_id, frame_count)
50        let entry_id = r.field_str(0);
51        let frame_count = r.field_str(1).parse::<u64>()
52            .map_err(|e| ScriptError::Parse {
53                fcall: "ff_append_frame".into(),
54                execution_id: None,
55                message: format!("bad frame_count: {e}"),
56            })?;
57        Ok(AppendFrameResult::Appended {
58            entry_id,
59            frame_count,
60        })
61    }
62}
63
64// ─── ff_read_attempt_stream ───────────────────────────────────────────
65//
66// Lua KEYS (2): stream_data, stream_meta
67// Lua ARGV (3): from_id, to_id, count_limit
68
69ff_function! {
70    pub ff_read_attempt_stream(args: ReadFramesArgs) -> ReadFramesResult {
71        keys(k: &StreamOpKeys<'_>) {
72            k.ctx.stream(args.attempt_index),              // 1
73            k.ctx.stream_meta(args.attempt_index),         // 2
74        }
75        argv {
76            args.from_id.clone(),                          // 1
77            args.to_id.clone(),                            // 2
78            args.count_limit.to_string(),                  // 3
79        }
80    }
81}
82
83impl FromFcallResult for ReadFramesResult {
84    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
85        let r = FcallResult::parse(raw)?.into_success()?;
86        // Lua returns `ok(entries, closed_at, closed_reason)`.
87        // - entries: array of [entry_id, [f1, v1, ...]]
88        // - closed_at: string (ms timestamp) or "" if open
89        // - closed_reason: string or "" if open
90        let entries = r.fields.first().ok_or_else(|| ScriptError::Parse {
91            fcall: "ff_read_attempt_stream".into(),
92            execution_id: None,
93            message: "missing entries field".into(),
94        })?;
95        // XRANGE from a Valkey Function passes fields through as raw Lua
96        // arrays — no ArrayOfPairs conversion, so always Flat here.
97        let frames = parse_entries(entries, FieldShape::Flat)?;
98
99        let closed_at = r.field_str(1);
100        let closed_at = if closed_at.is_empty() {
101            None
102        } else {
103            closed_at
104                .parse::<i64>()
105                .ok()
106                .map(ff_core::types::TimestampMs::from_millis)
107        };
108
109        let closed_reason = r.field_str(2);
110        let closed_reason = if closed_reason.is_empty() {
111            None
112        } else {
113            Some(closed_reason)
114        };
115
116        Ok(ReadFramesResult::Frames(StreamFrames {
117            frames,
118            closed_at,
119            closed_reason,
120        }))
121    }
122}
123
124/// Explicit shape tag for stream-entry field payloads. Passed from the
125/// caller because each code path *knows* which shape ferriskey will emit —
126/// heuristic detection ("first entry looks like a 2-elem array") can
127/// mis-classify when field values happen to be arrays themselves.
128#[derive(Clone, Copy, Debug)]
129pub enum FieldShape {
130    /// Flat alternating `[k, v, k, v, ...]`. Emitted by raw Lua FCALL
131    /// replies and by RESP2 servers that bypass the ArrayOfPairs adapter.
132    Flat,
133    /// `[[k, v], [k, v], ...]` — ferriskey's `ArrayOfPairs` adapter for
134    /// XRANGE / XREAD direct commands.
135    Pairs,
136    /// RESP3 `{k: v, ...}` map.
137    Map,
138}
139
140/// Parse an XRANGE/XREAD-shaped `[entry_id, fields]` list.
141///
142/// Caller must tell us the field shape — see [`FieldShape`]. Inferring
143/// shape from data alone is unsound: any call site always knows whether
144/// it's reading a raw Lua payload (Flat), a ferriskey-adapted XRANGE/XREAD
145/// reply (Pairs), or a RESP3 Map.
146///
147/// Nil fields → an empty frame (no fields).
148pub(crate) fn parse_entries(
149    raw: &ferriskey::Value,
150    shape: FieldShape,
151) -> Result<Vec<StreamFrame>, ScriptError> {
152    let entries = match raw {
153        ferriskey::Value::Array(arr) => arr,
154        ferriskey::Value::Nil => return Ok(Vec::new()),
155        other => {
156            return Err(ScriptError::Parse {
157                fcall: "parse_entries".into(),
158                execution_id: None,
159                message: format!("XRANGE/XREAD entries: expected Array, got {other:?}"),
160            });
161        }
162    };
163
164    let mut frames = Vec::with_capacity(entries.len());
165    for entry in entries.iter() {
166        let entry = entry.as_ref().map_err(|e| ScriptError::Parse {
167            fcall: "parse_entries".into(),
168            execution_id: None,
169            message: format!("XRANGE entry error: {e}"),
170        })?;
171        let parts = match entry {
172            ferriskey::Value::Array(a) => a,
173            other => {
174                return Err(ScriptError::Parse {
175                    fcall: "parse_entries".into(),
176                    execution_id: None,
177                    message: format!("XRANGE entry: expected Array, got {other:?}"),
178                });
179            }
180        };
181        if parts.len() != 2 {
182            return Err(ScriptError::Parse {
183                fcall: "parse_entries".into(),
184                execution_id: None,
185                message: format!("XRANGE entry: expected 2 elements, got {}", parts.len()),
186            });
187        }
188        let id = value_to_string(parts[0].as_ref().ok()).ok_or_else(|| ScriptError::Parse {
189            fcall: "parse_entries".into(),
190            execution_id: None,
191            message: "XRANGE entry: missing/invalid id".into(),
192        })?;
193
194        let field_val = match parts[1].as_ref() {
195            Ok(v) => v,
196            Err(e) => {
197                return Err(ScriptError::Parse {
198                    fcall: "parse_entries".into(),
199                    execution_id: None,
200                    message: format!("XRANGE entry fields error: {e}"),
201                });
202            }
203        };
204        let fields = parse_fields_kv(field_val, shape)?;
205        frames.push(StreamFrame { id, fields });
206    }
207    Ok(frames)
208}
209
210/// Parse a stream-entry field payload into a sorted map, using an explicit
211/// [`FieldShape`] tag supplied by the caller.
212///
213/// `Value::Nil` always yields an empty map (entry exists but has no
214/// fields) regardless of `shape` — both XRANGE and XREAD can produce this
215/// for empty writes.
216pub(crate) fn parse_fields_kv(
217    v: &ferriskey::Value,
218    shape: FieldShape,
219) -> Result<std::collections::BTreeMap<String, String>, ScriptError> {
220    let mut out = std::collections::BTreeMap::new();
221    if matches!(v, ferriskey::Value::Nil) {
222        return Ok(out);
223    }
224    match shape {
225        FieldShape::Flat => {
226            let arr = match v {
227                ferriskey::Value::Array(arr) => arr,
228                other => {
229                    return Err(ScriptError::Parse {
230                        fcall: "parse_fields_kv".into(),
231                        execution_id: None,
232                        message: format!(
233                            "stream fields (Flat): expected Array, got {other:?}"
234                        ),
235                    });
236                }
237            };
238            if !arr.len().is_multiple_of(2) {
239                return Err(ScriptError::Parse {
240                    fcall: "parse_fields_kv".into(),
241                    execution_id: None,
242                    message: format!(
243                        "stream fields (Flat): odd element count {}",
244                        arr.len()
245                    ),
246                });
247            }
248            let mut i = 0;
249            while i < arr.len() {
250                let k = value_to_string(arr[i].as_ref().ok())
251                    .ok_or_else(|| ScriptError::Parse {
252                        fcall: "parse_fields_kv".into(),
253                        execution_id: None,
254                        message: "stream field: bad key".into(),
255                    })?;
256                let val = value_to_string(arr[i + 1].as_ref().ok()).unwrap_or_default();
257                out.insert(k, val);
258                i += 2;
259            }
260        }
261        FieldShape::Pairs => {
262            let arr = match v {
263                ferriskey::Value::Array(arr) => arr,
264                other => {
265                    return Err(ScriptError::Parse {
266                        fcall: "parse_fields_kv".into(),
267                        execution_id: None,
268                        message: format!(
269                            "stream fields (Pairs): expected Array, got {other:?}"
270                        ),
271                    });
272                }
273            };
274            for pair in arr.iter() {
275                let inner = match pair.as_ref() {
276                    Ok(ferriskey::Value::Array(inner)) => inner,
277                    _ => {
278                        return Err(ScriptError::Parse {
279                            fcall: "parse_fields_kv".into(),
280                            execution_id: None,
281                            message: "stream fields (Pairs): expected 2-element Array per entry"
282                                .into(),
283                        });
284                    }
285                };
286                if inner.len() != 2 {
287                    return Err(ScriptError::Parse {
288                        fcall: "parse_fields_kv".into(),
289                        execution_id: None,
290                        message: format!(
291                            "stream fields (Pairs): expected len=2, got {}",
292                            inner.len()
293                        ),
294                    });
295                }
296                let k = value_to_string(inner[0].as_ref().ok())
297                    .ok_or_else(|| ScriptError::Parse {
298                        fcall: "parse_fields_kv".into(),
299                        execution_id: None,
300                        message: "stream field: bad key".into(),
301                    })?;
302                let val = value_to_string(inner[1].as_ref().ok()).unwrap_or_default();
303                out.insert(k, val);
304            }
305        }
306        FieldShape::Map => {
307            let pairs = match v {
308                ferriskey::Value::Map(pairs) => pairs,
309                other => {
310                    return Err(ScriptError::Parse {
311                        fcall: "parse_fields_kv".into(),
312                        execution_id: None,
313                        message: format!(
314                            "stream fields (Map): expected Map, got {other:?}"
315                        ),
316                    });
317                }
318            };
319            for (k, vv) in pairs {
320                let key = value_to_string(Some(k))
321                    .ok_or_else(|| ScriptError::Parse {
322                        fcall: "parse_fields_kv".into(),
323                        execution_id: None,
324                        message: "stream field: bad key".into(),
325                    })?;
326                let val = value_to_string(Some(vv)).unwrap_or_default();
327                out.insert(key, val);
328            }
329        }
330    }
331    Ok(out)
332}
333
334pub(crate) fn value_to_string(v: Option<&ferriskey::Value>) -> Option<String> {
335    match v? {
336        ferriskey::Value::BulkString(b) => Some(String::from_utf8_lossy(b).into_owned()),
337        ferriskey::Value::SimpleString(s) => Some(s.clone()),
338        ferriskey::Value::Int(n) => Some(n.to_string()),
339        ferriskey::Value::Okay => Some("OK".into()),
340        _ => None,
341    }
342}