ff_script/functions/
stream.rs1use ff_core::contracts::*;
4use crate::error::ScriptError;
5use ff_core::keys::ExecKeyContext;
6
7use crate::result::{FcallResult, FromFcallResult};
8
9pub struct StreamOpKeys<'a> {
11 pub ctx: &'a ExecKeyContext,
12}
13
14ff_function! {
22 pub ff_append_frame(args: AppendFrameArgs) -> AppendFrameResult {
23 keys(k: &StreamOpKeys<'_>) {
24 k.ctx.core(), k.ctx.stream(args.attempt_index), k.ctx.stream_meta(args.attempt_index), }
28 argv {
29 args.execution_id.to_string(), args.attempt_index.to_string(), args.lease_id.to_string(), args.lease_epoch.to_string(), args.frame_type.clone(), args.timestamp.to_string(), String::from_utf8_lossy(&args.payload).into_owned(), args.encoding.clone().unwrap_or_else(|| "utf8".into()), args.correlation_id.clone().unwrap_or_default(), args.source.clone().unwrap_or_else(|| "worker".into()), args.retention_maxlen.unwrap_or(0).to_string(), args.attempt_id.to_string(), args.max_payload_bytes.unwrap_or(65536).to_string(), }
43 }
44}
45
46impl FromFcallResult for AppendFrameResult {
47 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
48 let r = FcallResult::parse(raw)?.into_success()?;
49 let entry_id = r.field_str(0);
51 let frame_count = r.field_str(1).parse::<u64>()
52 .map_err(|e| ScriptError::Parse(format!("bad frame_count: {e}")))?;
53 Ok(AppendFrameResult::Appended {
54 entry_id,
55 frame_count,
56 })
57 }
58}
59
60ff_function! {
66 pub ff_read_attempt_stream(args: ReadFramesArgs) -> ReadFramesResult {
67 keys(k: &StreamOpKeys<'_>) {
68 k.ctx.stream(args.attempt_index), k.ctx.stream_meta(args.attempt_index), }
71 argv {
72 args.from_id.clone(), args.to_id.clone(), args.count_limit.to_string(), }
76 }
77}
78
79impl FromFcallResult for ReadFramesResult {
80 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
81 let r = FcallResult::parse(raw)?.into_success()?;
82 let entries = r.fields.first().ok_or_else(|| {
87 ScriptError::Parse("ff_read_attempt_stream: missing entries field".into())
88 })?;
89 let frames = parse_entries(entries, FieldShape::Flat)?;
92
93 let closed_at = r.field_str(1);
94 let closed_at = if closed_at.is_empty() {
95 None
96 } else {
97 closed_at
98 .parse::<i64>()
99 .ok()
100 .map(ff_core::types::TimestampMs::from_millis)
101 };
102
103 let closed_reason = r.field_str(2);
104 let closed_reason = if closed_reason.is_empty() {
105 None
106 } else {
107 Some(closed_reason)
108 };
109
110 Ok(ReadFramesResult::Frames(StreamFrames {
111 frames,
112 closed_at,
113 closed_reason,
114 }))
115 }
116}
117
118#[derive(Clone, Copy, Debug)]
123pub enum FieldShape {
124 Flat,
127 Pairs,
130 Map,
132}
133
134pub(crate) fn parse_entries(
143 raw: &ferriskey::Value,
144 shape: FieldShape,
145) -> Result<Vec<StreamFrame>, ScriptError> {
146 let entries = match raw {
147 ferriskey::Value::Array(arr) => arr,
148 ferriskey::Value::Nil => return Ok(Vec::new()),
149 other => {
150 return Err(ScriptError::Parse(format!(
151 "XRANGE/XREAD entries: expected Array, got {other:?}"
152 )));
153 }
154 };
155
156 let mut frames = Vec::with_capacity(entries.len());
157 for entry in entries.iter() {
158 let entry = entry.as_ref().map_err(|e| {
159 ScriptError::Parse(format!("XRANGE entry error: {e}"))
160 })?;
161 let parts = match entry {
162 ferriskey::Value::Array(a) => a,
163 other => {
164 return Err(ScriptError::Parse(format!(
165 "XRANGE entry: expected Array, got {other:?}"
166 )));
167 }
168 };
169 if parts.len() != 2 {
170 return Err(ScriptError::Parse(format!(
171 "XRANGE entry: expected 2 elements, got {}",
172 parts.len()
173 )));
174 }
175 let id = value_to_string(parts[0].as_ref().ok()).ok_or_else(|| {
176 ScriptError::Parse("XRANGE entry: missing/invalid id".into())
177 })?;
178
179 let field_val = match parts[1].as_ref() {
180 Ok(v) => v,
181 Err(e) => {
182 return Err(ScriptError::Parse(format!(
183 "XRANGE entry fields error: {e}"
184 )));
185 }
186 };
187 let fields = parse_fields_kv(field_val, shape)?;
188 frames.push(StreamFrame { id, fields });
189 }
190 Ok(frames)
191}
192
193pub(crate) fn parse_fields_kv(
200 v: &ferriskey::Value,
201 shape: FieldShape,
202) -> Result<std::collections::BTreeMap<String, String>, ScriptError> {
203 let mut out = std::collections::BTreeMap::new();
204 if matches!(v, ferriskey::Value::Nil) {
205 return Ok(out);
206 }
207 match shape {
208 FieldShape::Flat => {
209 let arr = match v {
210 ferriskey::Value::Array(arr) => arr,
211 other => {
212 return Err(ScriptError::Parse(format!(
213 "stream fields (Flat): expected Array, got {other:?}"
214 )));
215 }
216 };
217 if !arr.len().is_multiple_of(2) {
218 return Err(ScriptError::Parse(format!(
219 "stream fields (Flat): odd element count {}",
220 arr.len()
221 )));
222 }
223 let mut i = 0;
224 while i < arr.len() {
225 let k = value_to_string(arr[i].as_ref().ok())
226 .ok_or_else(|| ScriptError::Parse("stream field: bad key".into()))?;
227 let val = value_to_string(arr[i + 1].as_ref().ok()).unwrap_or_default();
228 out.insert(k, val);
229 i += 2;
230 }
231 }
232 FieldShape::Pairs => {
233 let arr = match v {
234 ferriskey::Value::Array(arr) => arr,
235 other => {
236 return Err(ScriptError::Parse(format!(
237 "stream fields (Pairs): expected Array, got {other:?}"
238 )));
239 }
240 };
241 for pair in arr.iter() {
242 let inner = match pair.as_ref() {
243 Ok(ferriskey::Value::Array(inner)) => inner,
244 _ => {
245 return Err(ScriptError::Parse(
246 "stream fields (Pairs): expected 2-element Array per entry".into(),
247 ));
248 }
249 };
250 if inner.len() != 2 {
251 return Err(ScriptError::Parse(format!(
252 "stream fields (Pairs): expected len=2, got {}",
253 inner.len()
254 )));
255 }
256 let k = value_to_string(inner[0].as_ref().ok())
257 .ok_or_else(|| ScriptError::Parse("stream field: bad key".into()))?;
258 let val = value_to_string(inner[1].as_ref().ok()).unwrap_or_default();
259 out.insert(k, val);
260 }
261 }
262 FieldShape::Map => {
263 let pairs = match v {
264 ferriskey::Value::Map(pairs) => pairs,
265 other => {
266 return Err(ScriptError::Parse(format!(
267 "stream fields (Map): expected Map, got {other:?}"
268 )));
269 }
270 };
271 for (k, vv) in pairs {
272 let key = value_to_string(Some(k))
273 .ok_or_else(|| ScriptError::Parse("stream field: bad key".into()))?;
274 let val = value_to_string(Some(vv)).unwrap_or_default();
275 out.insert(key, val);
276 }
277 }
278 }
279 Ok(out)
280}
281
282pub(crate) fn value_to_string(v: Option<&ferriskey::Value>) -> Option<String> {
283 match v? {
284 ferriskey::Value::BulkString(b) => Some(String::from_utf8_lossy(b).into_owned()),
285 ferriskey::Value::SimpleString(s) => Some(s.clone()),
286 ferriskey::Value::Int(n) => Some(n.to_string()),
287 ferriskey::Value::Okay => Some("OK".into()),
288 _ => None,
289 }
290}