use ff_core::contracts::*;
use crate::error::ScriptError;
use ff_core::keys::ExecKeyContext;
use crate::result::{FcallResult, FromFcallResult};
/// Key source for the stream script functions declared in this file.
///
/// Both `ff_append_frame` and `ff_read_attempt_stream` receive a
/// `&StreamOpKeys` and derive every key they pass from `ctx`.
pub struct StreamOpKeys<'ctx> {
    /// Borrowed key context; the stream functions call `core()`, `stream(..)`
    /// and `stream_meta(..)` on it.
    pub ctx: &'ctx ExecKeyContext,
}
// Declares `ff_append_frame` via the `ff_function!` macro: it takes
// `AppendFrameArgs` and yields `AppendFrameResult`.
// NOTE(review): the macro definition is not visible here; presumably it
// generates the wrapper that issues the call with the keys and argv below.
//
// Keys, in order: core, stream(attempt_index), stream_meta(attempt_index).
//
// Argv, in order (every value is passed as a string):
//   execution_id, attempt_index, lease_id, lease_epoch, frame_type, timestamp,
//   payload, encoding, correlation_id, source, retention_maxlen, attempt_id,
//   max_payload_bytes.
//
// Fallbacks applied when the optional argument is absent:
//   encoding -> "utf8", correlation_id -> its `Default` value,
//   source -> "worker", retention_maxlen -> 0, max_payload_bytes -> 65536.
//
// NOTE(review): the payload goes through `String::from_utf8_lossy`, which
// replaces invalid UTF-8 sequences with U+FFFD, so a non-UTF-8 payload is not
// forwarded byte-for-byte — confirm this is intended.
ff_function! {
pub ff_append_frame(args: AppendFrameArgs) -> AppendFrameResult {
keys(k: &StreamOpKeys<'_>) {
k.ctx.core(), k.ctx.stream(args.attempt_index), k.ctx.stream_meta(args.attempt_index), }
argv {
args.execution_id.to_string(), args.attempt_index.to_string(), args.lease_id.to_string(), args.lease_epoch.to_string(), args.frame_type.clone(), args.timestamp.to_string(), String::from_utf8_lossy(&args.payload).into_owned(), args.encoding.clone().unwrap_or_else(|| "utf8".into()), args.correlation_id.clone().unwrap_or_default(), args.source.clone().unwrap_or_else(|| "worker".into()), args.retention_maxlen.unwrap_or(0).to_string(), args.attempt_id.to_string(), args.max_payload_bytes.unwrap_or(65536).to_string(), }
}
}
impl FromFcallResult for AppendFrameResult {
    /// Decodes the reply of `ff_append_frame` into the `Appended` variant.
    ///
    /// Field 0 of the successful reply is used verbatim as the entry id;
    /// field 1 must parse as a `u64` and becomes the frame count.
    ///
    /// # Errors
    ///
    /// Propagates whatever `FcallResult::parse` / `into_success` report, and
    /// returns `ScriptError::Parse` when field 1 is not a valid `u64`.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        let reply = FcallResult::parse(raw)?.into_success()?;
        let entry_id = reply.field_str(0);
        let frame_count = match reply.field_str(1).parse::<u64>() {
            Ok(count) => count,
            Err(e) => {
                return Err(ScriptError::Parse {
                    fcall: "ff_append_frame".into(),
                    execution_id: None,
                    message: format!("bad frame_count: {e}"),
                });
            }
        };
        Ok(Self::Appended {
            entry_id,
            frame_count,
        })
    }
}
// Declares `ff_read_attempt_stream` via the `ff_function!` macro: it takes
// `ReadFramesArgs` and yields `ReadFramesResult`.
// NOTE(review): the macro definition is not visible here; presumably it
// generates the wrapper that issues the call with the keys and argv below.
//
// Keys, in order: stream(attempt_index), stream_meta(attempt_index).
// Argv, in order: from_id, to_id, count_limit (stringified).
ff_function! {
pub ff_read_attempt_stream(args: ReadFramesArgs) -> ReadFramesResult {
keys(k: &StreamOpKeys<'_>) {
k.ctx.stream(args.attempt_index), k.ctx.stream_meta(args.attempt_index), }
argv {
args.from_id.clone(), args.to_id.clone(), args.count_limit.to_string(), }
}
}
impl FromFcallResult for ReadFramesResult {
    /// Decodes the reply of `ff_read_attempt_stream` into the `Frames` variant.
    ///
    /// Reply layout as consumed here:
    /// * field 0 — the stream entries, decoded with the `Flat` field shape;
    /// * field 1 — `closed_at` in milliseconds; an empty string, or text that
    ///   does not parse as `i64`, yields `None`;
    /// * field 2 — `closed_reason`; an empty string yields `None`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `FcallResult::parse` / `into_success` and from
    /// `parse_entries`, and returns `ScriptError::Parse` when the reply
    /// carries no fields at all.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        let reply = FcallResult::parse(raw)?.into_success()?;

        let Some(raw_entries) = reply.fields.first() else {
            return Err(ScriptError::Parse {
                fcall: "ff_read_attempt_stream".into(),
                execution_id: None,
                message: "missing entries field".into(),
            });
        };
        let frames = parse_entries(raw_entries, FieldShape::Flat)?;

        // A malformed timestamp is tolerated and reported as "not closed".
        let closed_at = match reply.field_str(1) {
            ts if ts.is_empty() => None,
            ts => ts
                .parse::<i64>()
                .ok()
                .map(ff_core::types::TimestampMs::from_millis),
        };

        let reason = reply.field_str(2);
        let closed_reason = (!reason.is_empty()).then_some(reason);

        Ok(Self::Frames(StreamFrames {
            frames,
            closed_at,
            closed_reason,
        }))
    }
}
/// Wire layout of the field/value payload attached to one stream entry.
///
/// `parse_fields_kv` uses this to decide how to walk the raw value.
#[derive(Debug, Clone, Copy)]
pub enum FieldShape {
    /// One flat array alternating key, value, key, value, …
    Flat,
    /// An array whose elements are each a two-element `[key, value]` array.
    Pairs,
    /// A map value holding key/value pairs.
    Map,
}
/// Decodes a list of stream entries into [`StreamFrame`]s.
///
/// `raw` must be either `Nil` (treated as "no entries") or an array in which
/// every element is itself a two-element array `[id, fields]`. The `fields`
/// element is decoded by `parse_fields_kv` according to `shape`.
///
/// # Errors
///
/// Returns `ScriptError::Parse` when `raw` is neither `Nil` nor an array,
/// when an entry is an error value, is not an array, or does not have exactly
/// two elements, when the id cannot be rendered as a string, or when the
/// fields element is an error value. Errors from `parse_fields_kv` are
/// propagated unchanged.
pub(crate) fn parse_entries(
    raw: &ferriskey::Value,
    shape: FieldShape,
) -> Result<Vec<StreamFrame>, ScriptError> {
    // Every failure in this function differs only in its message.
    let fail = |message: String| ScriptError::Parse {
        fcall: "parse_entries".into(),
        execution_id: None,
        message,
    };

    let entries = match raw {
        ferriskey::Value::Nil => return Ok(Vec::new()),
        ferriskey::Value::Array(items) => items,
        other => {
            return Err(fail(format!(
                "XRANGE/XREAD entries: expected Array, got {other:?}"
            )));
        }
    };

    let mut frames = Vec::with_capacity(entries.len());
    for item in entries.iter() {
        let entry = item
            .as_ref()
            .map_err(|e| fail(format!("XRANGE entry error: {e}")))?;

        let ferriskey::Value::Array(parts) = entry else {
            return Err(fail(format!(
                "XRANGE entry: expected Array, got {entry:?}"
            )));
        };
        if parts.len() != 2 {
            return Err(fail(format!(
                "XRANGE entry: expected 2 elements, got {}",
                parts.len()
            )));
        }

        let Some(id) = value_to_string(parts[0].as_ref().ok()) else {
            return Err(fail("XRANGE entry: missing/invalid id".into()));
        };
        let field_val = parts[1]
            .as_ref()
            .map_err(|e| fail(format!("XRANGE entry fields error: {e}")))?;

        let fields = parse_fields_kv(field_val, shape)?;
        frames.push(StreamFrame { id, fields });
    }
    Ok(frames)
}
/// Decodes the field/value payload of one stream entry into an ordered map.
///
/// `Nil` always yields an empty map, whatever `shape` is. Otherwise `v` must
/// match the layout named by `shape` (see [`FieldShape`]).
///
/// A key that cannot be rendered as a string is an error. A value that cannot
/// be rendered (an error element or an unsupported variant) is stored as the
/// empty string. When a key repeats, the later occurrence replaces the earlier.
///
/// # Errors
///
/// Returns `ScriptError::Parse` when `v` has the wrong container type for
/// `shape`, when a `Flat` array has an odd number of elements, when a `Pairs`
/// element is not a two-element array, or when any key is not string-like.
pub(crate) fn parse_fields_kv(
    v: &ferriskey::Value,
    shape: FieldShape,
) -> Result<std::collections::BTreeMap<String, String>, ScriptError> {
    // Every failure in this function differs only in its message.
    let fail = |message: String| ScriptError::Parse {
        fcall: "parse_fields_kv".into(),
        execution_id: None,
        message,
    };
    let bad_key = || fail("stream field: bad key".into());

    let mut fields = std::collections::BTreeMap::new();
    if let ferriskey::Value::Nil = v {
        return Ok(fields);
    }

    match shape {
        FieldShape::Flat => {
            let ferriskey::Value::Array(items) = v else {
                return Err(fail(format!(
                    "stream fields (Flat): expected Array, got {v:?}"
                )));
            };
            if !items.len().is_multiple_of(2) {
                return Err(fail(format!(
                    "stream fields (Flat): odd element count {}",
                    items.len()
                )));
            }
            // The length is even, so the iterator always hands out full pairs.
            let mut slots = items.iter();
            while let (Some(key_slot), Some(val_slot)) = (slots.next(), slots.next()) {
                let key = value_to_string(key_slot.as_ref().ok()).ok_or_else(&bad_key)?;
                let val = value_to_string(val_slot.as_ref().ok()).unwrap_or_default();
                fields.insert(key, val);
            }
        }
        FieldShape::Pairs => {
            let ferriskey::Value::Array(items) = v else {
                return Err(fail(format!(
                    "stream fields (Pairs): expected Array, got {v:?}"
                )));
            };
            for pair in items.iter() {
                let Ok(ferriskey::Value::Array(kv)) = pair.as_ref() else {
                    return Err(fail(
                        "stream fields (Pairs): expected 2-element Array per entry".into(),
                    ));
                };
                if kv.len() != 2 {
                    return Err(fail(format!(
                        "stream fields (Pairs): expected len=2, got {}",
                        kv.len()
                    )));
                }
                let key = value_to_string(kv[0].as_ref().ok()).ok_or_else(&bad_key)?;
                let val = value_to_string(kv[1].as_ref().ok()).unwrap_or_default();
                fields.insert(key, val);
            }
        }
        FieldShape::Map => {
            let ferriskey::Value::Map(pairs) = v else {
                return Err(fail(format!(
                    "stream fields (Map): expected Map, got {v:?}"
                )));
            };
            for (raw_key, raw_val) in pairs {
                let key = value_to_string(Some(raw_key)).ok_or_else(&bad_key)?;
                let val = value_to_string(Some(raw_val)).unwrap_or_default();
                fields.insert(key, val);
            }
        }
    }
    Ok(fields)
}
/// Renders a string-like reply value as an owned `String`.
///
/// * `BulkString` — decoded as UTF-8, with invalid sequences replaced
///   (lossy conversion);
/// * `SimpleString` — cloned as-is;
/// * `Int` — its decimal representation;
/// * `Okay` — the literal `"OK"`.
///
/// Returns `None` when `v` is `None` or holds any other variant.
pub(crate) fn value_to_string(v: Option<&ferriskey::Value>) -> Option<String> {
    let value = v?;
    let text = match value {
        ferriskey::Value::SimpleString(s) => s.clone(),
        ferriskey::Value::BulkString(bytes) => String::from_utf8_lossy(bytes).into_owned(),
        ferriskey::Value::Int(n) => n.to_string(),
        ferriskey::Value::Okay => "OK".into(),
        _ => return None,
    };
    Some(text)
}