use ff_core::contracts::{StreamFrame, StreamFrames, STREAM_READ_HARD_CAP};
use ff_core::types::TimestampMs;
use ferriskey::{Client, Value};
use crate::error::ScriptError;
use crate::functions::stream::{parse_entries, parse_fields_kv, value_to_string, FieldShape};
/// Tails a stream with a single `XREAD` and reports the stream's close markers alongside
/// the frames that were read.
///
/// The command asks for at most `count_limit` entries after `last_id` on `stream_key`.
/// `BLOCK` is only added to the command when `block_ms` is greater than zero. After the
/// reply has been decoded, `closed_at` / `closed_reason` are read from `stream_meta_key`
/// and returned together with the frames.
///
/// # Errors
///
/// * [`ScriptError::InvalidInput`] when `count_limit` is `0` or larger than
///   `STREAM_READ_HARD_CAP`.
/// * [`ScriptError::Valkey`] when the `XREAD` call or the follow-up `HMGET` fails.
/// * Whatever error decoding the `XREAD` reply produces.
pub async fn xread_block(
    client: &Client,
    stream_key: &str,
    stream_meta_key: &str,
    last_id: &str,
    block_ms: u64,
    count_limit: u64,
) -> Result<StreamFrames, ScriptError> {
    // Reject out-of-range counts before touching the server.
    match count_limit {
        0 => {
            return Err(ScriptError::InvalidInput(
                "xread_block: count_limit must be >= 1".into(),
            ));
        }
        requested if requested > STREAM_READ_HARD_CAP => {
            return Err(ScriptError::InvalidInput(format!(
                "xread_block: count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
            )));
        }
        _ => {}
    }

    // Debug-build sanity check on the constant itself, not on caller input.
    debug_assert!(
        STREAM_READ_HARD_CAP as i128 <= i64::MAX as i128,
        "STREAM_READ_HARD_CAP must fit in i64 — RESP COUNT arg"
    );

    // XREAD COUNT <n> [BLOCK <ms>] STREAMS <key> <id>
    let mut request = client.cmd("XREAD").arg("COUNT").arg(count_limit);
    if block_ms > 0 {
        request = request.arg("BLOCK").arg(block_ms);
    }
    let reply: Value = request
        .arg("STREAMS")
        .arg(stream_key)
        .arg(last_id)
        .execute()
        .await
        .map_err(ScriptError::Valkey)?;

    // Decode first; the close markers are only fetched once the reply parsed cleanly.
    let frames = parse_xread_reply(&reply, stream_key)?;
    let (closed_at, closed_reason) = fetch_closed_meta(client, stream_meta_key).await?;

    Ok(StreamFrames {
        frames,
        closed_at,
        closed_reason,
    })
}
/// Reads the `closed_at` and `closed_reason` fields of the hash at `stream_meta_key` with
/// one `HMGET`.
///
/// Returns `(closed_at, closed_reason)`. A field that is absent from the reply or holds an
/// empty string yields `None`. `closed_at` is additionally `None` when its text does not
/// parse as an `i64`.
///
/// # Errors
///
/// [`ScriptError::Valkey`] when the `HMGET` call fails.
async fn fetch_closed_meta(
    client: &Client,
    stream_meta_key: &str,
) -> Result<(Option<TimestampMs>, Option<String>), ScriptError> {
    let reply: Vec<Option<String>> = client
        .cmd("HMGET")
        .arg(stream_meta_key)
        .arg("closed_at")
        .arg("closed_reason")
        .execute()
        .await
        .map_err(ScriptError::Valkey)?;

    // Slot 0 is `closed_at`, slot 1 is `closed_reason` — the order the fields were requested in.
    // NOTE(review): an unparsable `closed_at` is silently dropped here — confirm that is intended.
    let closed_at = match reply.first() {
        Some(Some(raw)) if !raw.is_empty() => {
            raw.parse::<i64>().ok().map(TimestampMs::from_millis)
        }
        _ => None,
    };
    let closed_reason = match reply.get(1) {
        Some(Some(reason)) if !reason.is_empty() => Some(reason.clone()),
        _ => None,
    };

    Ok((closed_at, closed_reason))
}
/// Decodes an `XREAD` reply into the frames belonging to `stream_key`.
///
/// Accepted reply shapes:
///
/// * `Nil` — no data; an empty vector is returned.
/// * `Array` — a list of two-element `[key, entries]` arrays (the shape this function's
///   error text labels "RESP2"). Elements that are not such a pair, or whose key differs
///   from `stream_key`, are skipped.
/// * `Map` — stream key mapped to its entries, where the entries are themselves a `Map`
///   of id to fields, an `Array`, or `Nil`.
///
/// Only the first element whose key equals `stream_key` is decoded. When nothing matches,
/// the result is an empty vector and a trace event records how many elements were skipped.
///
/// # Errors
///
/// [`ScriptError::Parse`] when the reply, the matched entries value, or an entry id has an
/// unexpected shape; field-decoding errors are passed through.
fn parse_xread_reply(raw: &Value, stream_key: &str) -> Result<Vec<StreamFrame>, ScriptError> {
    /// True when `candidate` is a bulk or simple string equal to `wanted`.
    fn names_stream(candidate: &Value, wanted: &str) -> bool {
        match candidate {
            Value::BulkString(bytes) => bytes.as_ref() == wanted.as_bytes(),
            Value::SimpleString(text) => text == wanted,
            _ => false,
        }
    }

    match raw {
        Value::Nil => Ok(Vec::new()),

        Value::Array(streams) => {
            for slot in streams {
                // Anything that is not a readable two-element array is skipped.
                let pair = match slot.as_ref() {
                    Ok(Value::Array(pair)) if pair.len() == 2 => pair,
                    _ => continue,
                };
                let is_requested = match pair[0].as_ref() {
                    Ok(key) => names_stream(key, stream_key),
                    Err(_) => false,
                };
                if !is_requested {
                    continue;
                }
                // First matching stream wins; its entries decide the outcome.
                return match pair[1].as_ref() {
                    Ok(entries) => parse_entries_any(entries),
                    Err(e) => Err(ScriptError::Parse {
                        fcall: "stream_tail_decode".into(),
                        execution_id: None,
                        message: format!("XREAD entries (RESP2): {e}"),
                    }),
                };
            }

            // The loop only ends without returning when every element was skipped, so the
            // number of non-matching elements equals the number of elements.
            let skipped: usize = streams.len();
            if skipped > 0 {
                tracing::trace!(
                    non_match = skipped,
                    stream_key,
                    "XREAD RESP2 reply had entries but none matched the requested stream key"
                );
            }
            Ok(Vec::new())
        }

        Value::Map(streams) => {
            for (key, payload) in streams.iter() {
                if !names_stream(key, stream_key) {
                    continue;
                }
                // First matching stream wins; decode according to the entries' shape.
                return match payload {
                    Value::Map(entries) => {
                        let mut decoded = Vec::with_capacity(entries.len());
                        for (entry_id, entry_fields) in entries {
                            let Some(id) = value_to_string(Some(entry_id)) else {
                                return Err(ScriptError::Parse {
                                    fcall: "stream_tail_decode".into(),
                                    execution_id: None,
                                    message: "XREAD entry: bad id".into(),
                                });
                            };
                            let fields = parse_fields_kv(entry_fields, FieldShape::Pairs)?;
                            decoded.push(StreamFrame { id, fields });
                        }
                        Ok(decoded)
                    }
                    Value::Array(items) => {
                        if items.is_empty() {
                            tracing::trace!(
                                stream_key,
                                "XREAD reply matched stream key but entries Array was empty — possible \
                                 malformed reply"
                            );
                        }
                        parse_entries_any(payload)
                    }
                    Value::Nil => Ok(Vec::new()),
                    other => Err(ScriptError::Parse {
                        fcall: "stream_tail_decode".into(),
                        execution_id: None,
                        message: format!("XREAD entries: expected Map/Array, got {other:?}"),
                    }),
                };
            }

            // As in the array case: falling out of the loop means nothing matched.
            let skipped: usize = streams.len();
            if skipped > 0 {
                tracing::trace!(
                    non_match = skipped,
                    stream_key,
                    "XREAD Map reply had entries but none matched the requested stream key"
                );
            }
            Ok(Vec::new())
        }

        other => Err(ScriptError::Parse {
            fcall: "stream_tail_decode".into(),
            execution_id: None,
            message: format!("XREAD: expected Map/Nil/Array, got {other:?}"),
        }),
    }
}
/// Decodes the entries of one stream, whichever container shape they arrive in.
///
/// * `Nil` yields an empty vector.
/// * `Array` is delegated to `parse_entries` with `FieldShape::Pairs`.
/// * `Map` is read as id → fields, one frame per pair.
///
/// # Errors
///
/// [`ScriptError::Parse`] when `raw` is any other variant or an entry id cannot be turned
/// into a string; errors from `parse_entries` / `parse_fields_kv` are passed through.
fn parse_entries_any(raw: &Value) -> Result<Vec<StreamFrame>, ScriptError> {
    // Handle every shape except the id → fields map up front.
    let entries = match raw {
        Value::Nil => return Ok(Vec::new()),
        Value::Array(_) => return parse_entries(raw, FieldShape::Pairs),
        Value::Map(entries) => entries,
        other => {
            return Err(ScriptError::Parse {
                fcall: "parse_entries_any".into(),
                execution_id: None,
                message: format!("XREAD entries: expected Array/Map/Nil, got {other:?}"),
            });
        }
    };

    let mut decoded = Vec::with_capacity(entries.len());
    for (entry_id, entry_fields) in entries {
        let Some(id) = value_to_string(Some(entry_id)) else {
            return Err(ScriptError::Parse {
                fcall: "parse_entries_any".into(),
                execution_id: None,
                message: "XREAD entry: bad id".into(),
            });
        };
        let fields = parse_fields_kv(entry_fields, FieldShape::Pairs)?;
        decoded.push(StreamFrame { id, fields });
    }
    Ok(decoded)
}