Skip to main content

ff_script/functions/
stream.rs

1//! Typed FCALL wrapper for stream append function (lua/stream.lua).
2
3use ff_core::contracts::*;
4use crate::error::ScriptError;
5use ff_core::keys::ExecKeyContext;
6
7use crate::result::{FcallResult, FromFcallResult};
8
9/// Key context for stream operations — only needs exec keys (no indexes).
10pub struct StreamOpKeys<'a> {
11    pub ctx: &'a ExecKeyContext,
12}
13
14// ─── ff_append_frame ──────────────────────────────────────────────────
15//
16// Lua KEYS (3): exec_core, stream_data, stream_meta
17// Lua ARGV (13): execution_id, attempt_index, lease_id, lease_epoch,
18//                frame_type, ts, payload, encoding, correlation_id,
19//                source, retention_maxlen, attempt_id, max_payload_bytes
20
21ff_function! {
22    pub ff_append_frame(args: AppendFrameArgs) -> AppendFrameResult {
23        keys(k: &StreamOpKeys<'_>) {
24            k.ctx.core(),                                  // 1
25            k.ctx.stream(args.attempt_index),              // 2
26            k.ctx.stream_meta(args.attempt_index),         // 3
27        }
28        argv {
29            args.execution_id.to_string(),                 // 1
30            args.attempt_index.to_string(),                // 2
31            args.lease_id.to_string(),                     // 3
32            args.lease_epoch.to_string(),                  // 4
33            args.frame_type.clone(),                       // 5
34            args.timestamp.to_string(),                    // 6
35            String::from_utf8_lossy(&args.payload).into_owned(), // 7
36            args.encoding.clone().unwrap_or_else(|| "utf8".into()), // 8
37            args.correlation_id.clone().unwrap_or_default(), // 9
38            args.source.clone().unwrap_or_else(|| "worker".into()), // 10
39            args.retention_maxlen.unwrap_or(0).to_string(), // 11
40            args.attempt_id.to_string(),                   // 12
41            args.max_payload_bytes.unwrap_or(65536).to_string(), // 13
42        }
43    }
44}
45
46impl FromFcallResult for AppendFrameResult {
47    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
48        let r = FcallResult::parse(raw)?.into_success()?;
49        // ok(entry_id, frame_count)
50        let entry_id = r.field_str(0);
51        let frame_count = r.field_str(1).parse::<u64>()
52            .map_err(|e| ScriptError::Parse(format!("bad frame_count: {e}")))?;
53        Ok(AppendFrameResult::Appended {
54            entry_id,
55            frame_count,
56        })
57    }
58}
59
60// ─── ff_read_attempt_stream ───────────────────────────────────────────
61//
62// Lua KEYS (2): stream_data, stream_meta
63// Lua ARGV (3): from_id, to_id, count_limit
64
65ff_function! {
66    pub ff_read_attempt_stream(args: ReadFramesArgs) -> ReadFramesResult {
67        keys(k: &StreamOpKeys<'_>) {
68            k.ctx.stream(args.attempt_index),              // 1
69            k.ctx.stream_meta(args.attempt_index),         // 2
70        }
71        argv {
72            args.from_id.clone(),                          // 1
73            args.to_id.clone(),                            // 2
74            args.count_limit.to_string(),                  // 3
75        }
76    }
77}
78
79impl FromFcallResult for ReadFramesResult {
80    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
81        let r = FcallResult::parse(raw)?.into_success()?;
82        // Lua returns `ok(entries, closed_at, closed_reason)`.
83        // - entries: array of [entry_id, [f1, v1, ...]]
84        // - closed_at: string (ms timestamp) or "" if open
85        // - closed_reason: string or "" if open
86        let entries = r.fields.first().ok_or_else(|| {
87            ScriptError::Parse("ff_read_attempt_stream: missing entries field".into())
88        })?;
89        // XRANGE from a Valkey Function passes fields through as raw Lua
90        // arrays — no ArrayOfPairs conversion, so always Flat here.
91        let frames = parse_entries(entries, FieldShape::Flat)?;
92
93        let closed_at = r.field_str(1);
94        let closed_at = if closed_at.is_empty() {
95            None
96        } else {
97            closed_at
98                .parse::<i64>()
99                .ok()
100                .map(ff_core::types::TimestampMs::from_millis)
101        };
102
103        let closed_reason = r.field_str(2);
104        let closed_reason = if closed_reason.is_empty() {
105            None
106        } else {
107            Some(closed_reason)
108        };
109
110        Ok(ReadFramesResult::Frames(StreamFrames {
111            frames,
112            closed_at,
113            closed_reason,
114        }))
115    }
116}
117
118/// Explicit shape tag for stream-entry field payloads. Passed from the
119/// caller because each code path *knows* which shape ferriskey will emit —
120/// heuristic detection ("first entry looks like a 2-elem array") can
121/// mis-classify when field values happen to be arrays themselves.
122#[derive(Clone, Copy, Debug)]
123pub enum FieldShape {
124    /// Flat alternating `[k, v, k, v, ...]`. Emitted by raw Lua FCALL
125    /// replies and by RESP2 servers that bypass the ArrayOfPairs adapter.
126    Flat,
127    /// `[[k, v], [k, v], ...]` — ferriskey's `ArrayOfPairs` adapter for
128    /// XRANGE / XREAD direct commands.
129    Pairs,
130    /// RESP3 `{k: v, ...}` map.
131    Map,
132}
133
134/// Parse an XRANGE/XREAD-shaped `[entry_id, fields]` list.
135///
136/// Caller must tell us the field shape — see [`FieldShape`]. Inferring
137/// shape from data alone is unsound: any call site always knows whether
138/// it's reading a raw Lua payload (Flat), a ferriskey-adapted XRANGE/XREAD
139/// reply (Pairs), or a RESP3 Map.
140///
141/// Nil fields → an empty frame (no fields).
142pub(crate) fn parse_entries(
143    raw: &ferriskey::Value,
144    shape: FieldShape,
145) -> Result<Vec<StreamFrame>, ScriptError> {
146    let entries = match raw {
147        ferriskey::Value::Array(arr) => arr,
148        ferriskey::Value::Nil => return Ok(Vec::new()),
149        other => {
150            return Err(ScriptError::Parse(format!(
151                "XRANGE/XREAD entries: expected Array, got {other:?}"
152            )));
153        }
154    };
155
156    let mut frames = Vec::with_capacity(entries.len());
157    for entry in entries.iter() {
158        let entry = entry.as_ref().map_err(|e| {
159            ScriptError::Parse(format!("XRANGE entry error: {e}"))
160        })?;
161        let parts = match entry {
162            ferriskey::Value::Array(a) => a,
163            other => {
164                return Err(ScriptError::Parse(format!(
165                    "XRANGE entry: expected Array, got {other:?}"
166                )));
167            }
168        };
169        if parts.len() != 2 {
170            return Err(ScriptError::Parse(format!(
171                "XRANGE entry: expected 2 elements, got {}",
172                parts.len()
173            )));
174        }
175        let id = value_to_string(parts[0].as_ref().ok()).ok_or_else(|| {
176            ScriptError::Parse("XRANGE entry: missing/invalid id".into())
177        })?;
178
179        let field_val = match parts[1].as_ref() {
180            Ok(v) => v,
181            Err(e) => {
182                return Err(ScriptError::Parse(format!(
183                    "XRANGE entry fields error: {e}"
184                )));
185            }
186        };
187        let fields = parse_fields_kv(field_val, shape)?;
188        frames.push(StreamFrame { id, fields });
189    }
190    Ok(frames)
191}
192
193/// Parse a stream-entry field payload into a sorted map, using an explicit
194/// [`FieldShape`] tag supplied by the caller.
195///
196/// `Value::Nil` always yields an empty map (entry exists but has no
197/// fields) regardless of `shape` — both XRANGE and XREAD can produce this
198/// for empty writes.
199pub(crate) fn parse_fields_kv(
200    v: &ferriskey::Value,
201    shape: FieldShape,
202) -> Result<std::collections::BTreeMap<String, String>, ScriptError> {
203    let mut out = std::collections::BTreeMap::new();
204    if matches!(v, ferriskey::Value::Nil) {
205        return Ok(out);
206    }
207    match shape {
208        FieldShape::Flat => {
209            let arr = match v {
210                ferriskey::Value::Array(arr) => arr,
211                other => {
212                    return Err(ScriptError::Parse(format!(
213                        "stream fields (Flat): expected Array, got {other:?}"
214                    )));
215                }
216            };
217            if !arr.len().is_multiple_of(2) {
218                return Err(ScriptError::Parse(format!(
219                    "stream fields (Flat): odd element count {}",
220                    arr.len()
221                )));
222            }
223            let mut i = 0;
224            while i < arr.len() {
225                let k = value_to_string(arr[i].as_ref().ok())
226                    .ok_or_else(|| ScriptError::Parse("stream field: bad key".into()))?;
227                let val = value_to_string(arr[i + 1].as_ref().ok()).unwrap_or_default();
228                out.insert(k, val);
229                i += 2;
230            }
231        }
232        FieldShape::Pairs => {
233            let arr = match v {
234                ferriskey::Value::Array(arr) => arr,
235                other => {
236                    return Err(ScriptError::Parse(format!(
237                        "stream fields (Pairs): expected Array, got {other:?}"
238                    )));
239                }
240            };
241            for pair in arr.iter() {
242                let inner = match pair.as_ref() {
243                    Ok(ferriskey::Value::Array(inner)) => inner,
244                    _ => {
245                        return Err(ScriptError::Parse(
246                            "stream fields (Pairs): expected 2-element Array per entry".into(),
247                        ));
248                    }
249                };
250                if inner.len() != 2 {
251                    return Err(ScriptError::Parse(format!(
252                        "stream fields (Pairs): expected len=2, got {}",
253                        inner.len()
254                    )));
255                }
256                let k = value_to_string(inner[0].as_ref().ok())
257                    .ok_or_else(|| ScriptError::Parse("stream field: bad key".into()))?;
258                let val = value_to_string(inner[1].as_ref().ok()).unwrap_or_default();
259                out.insert(k, val);
260            }
261        }
262        FieldShape::Map => {
263            let pairs = match v {
264                ferriskey::Value::Map(pairs) => pairs,
265                other => {
266                    return Err(ScriptError::Parse(format!(
267                        "stream fields (Map): expected Map, got {other:?}"
268                    )));
269                }
270            };
271            for (k, vv) in pairs {
272                let key = value_to_string(Some(k))
273                    .ok_or_else(|| ScriptError::Parse("stream field: bad key".into()))?;
274                let val = value_to_string(Some(vv)).unwrap_or_default();
275                out.insert(key, val);
276            }
277        }
278    }
279    Ok(out)
280}
281
282pub(crate) fn value_to_string(v: Option<&ferriskey::Value>) -> Option<String> {
283    match v? {
284        ferriskey::Value::BulkString(b) => Some(String::from_utf8_lossy(b).into_owned()),
285        ferriskey::Value::SimpleString(s) => Some(s.clone()),
286        ferriskey::Value::Int(n) => Some(n.to_string()),
287        ferriskey::Value::Okay => Some("OK".into()),
288        _ => None,
289    }
290}