1use ff_core::contracts::*;
4use crate::error::ScriptError;
5use ff_core::keys::ExecKeyContext;
6
7use crate::result::{FcallResult, FromFcallResult};
8
9pub struct StreamOpKeys<'a> {
11 pub ctx: &'a ExecKeyContext,
12}
13
14ff_function! {
22 pub ff_append_frame(args: AppendFrameArgs) -> AppendFrameResult {
23 keys(k: &StreamOpKeys<'_>) {
24 k.ctx.core(), k.ctx.stream(args.attempt_index), k.ctx.stream_meta(args.attempt_index), }
28 argv {
29 args.execution_id.to_string(), args.attempt_index.to_string(), args.lease_id.to_string(), args.lease_epoch.to_string(), args.frame_type.clone(), args.timestamp.to_string(), String::from_utf8_lossy(&args.payload).into_owned(), args.encoding.clone().unwrap_or_else(|| "utf8".into()), args.correlation_id.clone().unwrap_or_default(), args.source.clone().unwrap_or_else(|| "worker".into()), args.retention_maxlen.unwrap_or(0).to_string(), args.attempt_id.to_string(), args.max_payload_bytes.unwrap_or(65536).to_string(), }
43 }
44}
45
46impl FromFcallResult for AppendFrameResult {
47 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
48 let r = FcallResult::parse(raw)?.into_success()?;
49 let entry_id = r.field_str(0);
51 let frame_count = r.field_str(1).parse::<u64>()
52 .map_err(|e| ScriptError::Parse {
53 fcall: "ff_append_frame".into(),
54 execution_id: None,
55 message: format!("bad frame_count: {e}"),
56 })?;
57 Ok(AppendFrameResult::Appended {
58 entry_id,
59 frame_count,
60 })
61 }
62}
63
64ff_function! {
70 pub ff_read_attempt_stream(args: ReadFramesArgs) -> ReadFramesResult {
71 keys(k: &StreamOpKeys<'_>) {
72 k.ctx.stream(args.attempt_index), k.ctx.stream_meta(args.attempt_index), }
75 argv {
76 args.from_id.clone(), args.to_id.clone(), args.count_limit.to_string(), }
80 }
81}
82
83impl FromFcallResult for ReadFramesResult {
84 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
85 let r = FcallResult::parse(raw)?.into_success()?;
86 let entries = r.fields.first().ok_or_else(|| ScriptError::Parse {
91 fcall: "ff_read_attempt_stream".into(),
92 execution_id: None,
93 message: "missing entries field".into(),
94 })?;
95 let frames = parse_entries(entries, FieldShape::Flat)?;
98
99 let closed_at = r.field_str(1);
100 let closed_at = if closed_at.is_empty() {
101 None
102 } else {
103 closed_at
104 .parse::<i64>()
105 .ok()
106 .map(ff_core::types::TimestampMs::from_millis)
107 };
108
109 let closed_reason = r.field_str(2);
110 let closed_reason = if closed_reason.is_empty() {
111 None
112 } else {
113 Some(closed_reason)
114 };
115
116 Ok(ReadFramesResult::Frames(StreamFrames {
117 frames,
118 closed_at,
119 closed_reason,
120 }))
121 }
122}
123
124#[derive(Clone, Copy, Debug)]
129pub enum FieldShape {
130 Flat,
133 Pairs,
136 Map,
138}
139
140pub(crate) fn parse_entries(
149 raw: &ferriskey::Value,
150 shape: FieldShape,
151) -> Result<Vec<StreamFrame>, ScriptError> {
152 let entries = match raw {
153 ferriskey::Value::Array(arr) => arr,
154 ferriskey::Value::Nil => return Ok(Vec::new()),
155 other => {
156 return Err(ScriptError::Parse {
157 fcall: "parse_entries".into(),
158 execution_id: None,
159 message: format!("XRANGE/XREAD entries: expected Array, got {other:?}"),
160 });
161 }
162 };
163
164 let mut frames = Vec::with_capacity(entries.len());
165 for entry in entries.iter() {
166 let entry = entry.as_ref().map_err(|e| ScriptError::Parse {
167 fcall: "parse_entries".into(),
168 execution_id: None,
169 message: format!("XRANGE entry error: {e}"),
170 })?;
171 let parts = match entry {
172 ferriskey::Value::Array(a) => a,
173 other => {
174 return Err(ScriptError::Parse {
175 fcall: "parse_entries".into(),
176 execution_id: None,
177 message: format!("XRANGE entry: expected Array, got {other:?}"),
178 });
179 }
180 };
181 if parts.len() != 2 {
182 return Err(ScriptError::Parse {
183 fcall: "parse_entries".into(),
184 execution_id: None,
185 message: format!("XRANGE entry: expected 2 elements, got {}", parts.len()),
186 });
187 }
188 let id = value_to_string(parts[0].as_ref().ok()).ok_or_else(|| ScriptError::Parse {
189 fcall: "parse_entries".into(),
190 execution_id: None,
191 message: "XRANGE entry: missing/invalid id".into(),
192 })?;
193
194 let field_val = match parts[1].as_ref() {
195 Ok(v) => v,
196 Err(e) => {
197 return Err(ScriptError::Parse {
198 fcall: "parse_entries".into(),
199 execution_id: None,
200 message: format!("XRANGE entry fields error: {e}"),
201 });
202 }
203 };
204 let fields = parse_fields_kv(field_val, shape)?;
205 frames.push(StreamFrame { id, fields });
206 }
207 Ok(frames)
208}
209
210pub(crate) fn parse_fields_kv(
217 v: &ferriskey::Value,
218 shape: FieldShape,
219) -> Result<std::collections::BTreeMap<String, String>, ScriptError> {
220 let mut out = std::collections::BTreeMap::new();
221 if matches!(v, ferriskey::Value::Nil) {
222 return Ok(out);
223 }
224 match shape {
225 FieldShape::Flat => {
226 let arr = match v {
227 ferriskey::Value::Array(arr) => arr,
228 other => {
229 return Err(ScriptError::Parse {
230 fcall: "parse_fields_kv".into(),
231 execution_id: None,
232 message: format!(
233 "stream fields (Flat): expected Array, got {other:?}"
234 ),
235 });
236 }
237 };
238 if !arr.len().is_multiple_of(2) {
239 return Err(ScriptError::Parse {
240 fcall: "parse_fields_kv".into(),
241 execution_id: None,
242 message: format!(
243 "stream fields (Flat): odd element count {}",
244 arr.len()
245 ),
246 });
247 }
248 let mut i = 0;
249 while i < arr.len() {
250 let k = value_to_string(arr[i].as_ref().ok())
251 .ok_or_else(|| ScriptError::Parse {
252 fcall: "parse_fields_kv".into(),
253 execution_id: None,
254 message: "stream field: bad key".into(),
255 })?;
256 let val = value_to_string(arr[i + 1].as_ref().ok()).unwrap_or_default();
257 out.insert(k, val);
258 i += 2;
259 }
260 }
261 FieldShape::Pairs => {
262 let arr = match v {
263 ferriskey::Value::Array(arr) => arr,
264 other => {
265 return Err(ScriptError::Parse {
266 fcall: "parse_fields_kv".into(),
267 execution_id: None,
268 message: format!(
269 "stream fields (Pairs): expected Array, got {other:?}"
270 ),
271 });
272 }
273 };
274 for pair in arr.iter() {
275 let inner = match pair.as_ref() {
276 Ok(ferriskey::Value::Array(inner)) => inner,
277 _ => {
278 return Err(ScriptError::Parse {
279 fcall: "parse_fields_kv".into(),
280 execution_id: None,
281 message: "stream fields (Pairs): expected 2-element Array per entry"
282 .into(),
283 });
284 }
285 };
286 if inner.len() != 2 {
287 return Err(ScriptError::Parse {
288 fcall: "parse_fields_kv".into(),
289 execution_id: None,
290 message: format!(
291 "stream fields (Pairs): expected len=2, got {}",
292 inner.len()
293 ),
294 });
295 }
296 let k = value_to_string(inner[0].as_ref().ok())
297 .ok_or_else(|| ScriptError::Parse {
298 fcall: "parse_fields_kv".into(),
299 execution_id: None,
300 message: "stream field: bad key".into(),
301 })?;
302 let val = value_to_string(inner[1].as_ref().ok()).unwrap_or_default();
303 out.insert(k, val);
304 }
305 }
306 FieldShape::Map => {
307 let pairs = match v {
308 ferriskey::Value::Map(pairs) => pairs,
309 other => {
310 return Err(ScriptError::Parse {
311 fcall: "parse_fields_kv".into(),
312 execution_id: None,
313 message: format!(
314 "stream fields (Map): expected Map, got {other:?}"
315 ),
316 });
317 }
318 };
319 for (k, vv) in pairs {
320 let key = value_to_string(Some(k))
321 .ok_or_else(|| ScriptError::Parse {
322 fcall: "parse_fields_kv".into(),
323 execution_id: None,
324 message: "stream field: bad key".into(),
325 })?;
326 let val = value_to_string(Some(vv)).unwrap_or_default();
327 out.insert(key, val);
328 }
329 }
330 }
331 Ok(out)
332}
333
334pub(crate) fn value_to_string(v: Option<&ferriskey::Value>) -> Option<String> {
335 match v? {
336 ferriskey::Value::BulkString(b) => Some(String::from_utf8_lossy(b).into_owned()),
337 ferriskey::Value::SimpleString(s) => Some(s.clone()),
338 ferriskey::Value::Int(n) => Some(n.to_string()),
339 ferriskey::Value::Okay => Some("OK".into()),
340 _ => None,
341 }
342}