use std::sync::Arc;
use crate::filter::{Env, Filter};
use crate::value::Value;
use super::super::eval::eval;
use super::super::value_ops::set_path;
/// Dispatches one of the streaming builtins selected by `name`.
///
/// * `tostream` takes no arguments and streams `input` itself.
/// * `fromstream` and `truncate_stream` use the first entry of `args` as the
///   expression that produces the stream events.
///
/// An unrecognised `name`, or `fromstream` / `truncate_stream` called without any
/// argument, produces no output.
pub(super) fn eval_streaming(
    name: &str,
    args: &[Filter],
    input: &Value,
    env: &Env,
    output: &mut dyn FnMut(Value),
) {
    match (name, args.first()) {
        ("tostream", _) => to_stream(input, &mut Vec::new(), output),
        ("fromstream", Some(stream)) => from_stream(stream, input, env, output),
        ("truncate_stream", Some(stream)) => truncate_stream(stream, input, env, output),
        _ => {}
    }
}
/// Emits the stream form of `value` through `output`, one event at a time.
///
/// `path` holds the keys/indices leading to `value`. It is extended while
/// descending into children and restored before this function returns.
///
/// Two kinds of events are produced, both as arrays:
/// * `[path, leaf]` for every scalar, empty array, or empty object;
/// * `[path]` (a closing event) after the last child of each non-empty
///   container, where `path` points at that last child.
fn to_stream(value: &Value, path: &mut Vec<Value>, output: &mut dyn FnMut(Value)) {
    /// Builds the one-element closing event carrying a copy of `path`.
    fn closing_event(path: &[Value]) -> Value {
        Value::Array(Arc::new(vec![Value::Array(Arc::new(path.to_vec()))]))
    }

    match value {
        Value::Object(fields) if !fields.is_empty() => {
            for (name, child) in fields.iter() {
                path.push(Value::String(name.clone()));
                to_stream(child, path, output);
                path.pop();
            }
            // The guard above guarantees a last field exists.
            if let Some((final_name, _)) = fields.last() {
                path.push(Value::String(final_name.clone()));
                output(closing_event(path.as_slice()));
                path.pop();
            }
        }
        Value::Array(elements) if !elements.is_empty() => {
            for (index, child) in elements.iter().enumerate() {
                path.push(Value::Int(index as i64));
                to_stream(child, path, output);
                path.pop();
            }
            let final_index = elements.len() - 1;
            path.push(Value::Int(final_index as i64));
            output(closing_event(path.as_slice()));
            path.pop();
        }
        // Scalars and empty containers are leaves: emit them with the current path.
        leaf => output(Value::Array(Arc::new(vec![
            Value::Array(Arc::new(path.clone())),
            leaf.clone(),
        ]))),
    }
}
/// Rebuilds complete values from the stream events produced by `expr`.
///
/// `expr` is evaluated against `input`; every result is treated as one event:
/// * `[path, leaf]` with an empty `path` is a whole top-level value: `leaf` is
///   emitted immediately and any partially built value is discarded.
/// * `[path, leaf]` with a non-empty `path` stores `leaf` at `path` inside the
///   value under construction (which starts out as `null`). If `set_path`
///   fails, the value under construction is kept unchanged.
/// * `[path]` with at most one path component closes the top-level value,
///   which is emitted (if one is pending) and then reset.
///
/// Everything else (non-array results, other lengths, non-array paths,
/// closing events for nested containers) is ignored.
fn from_stream(expr: &Filter, input: &Value, env: &Env, output: &mut dyn FnMut(Value)) {
    let mut pending: Option<Value> = None;
    eval(expr, input, env, &mut |event| {
        let Value::Array(parts) = &event else {
            return;
        };
        match parts.as_slice() {
            [Value::Array(path), leaf] => {
                if path.is_empty() {
                    output(leaf.clone());
                    pending = None;
                } else {
                    let current = pending.take().unwrap_or(Value::Null);
                    match set_path(&current, path, leaf) {
                        Ok(updated) => pending = Some(updated),
                        // Best effort: keep what has been built so far.
                        Err(_) => pending = Some(current),
                    }
                }
            }
            [Value::Array(path)] if path.len() <= 1 => {
                if let Some(finished) = pending.take() {
                    output(finished);
                }
            }
            _ => {}
        }
    });
}
fn truncate_stream(expr: &Filter, input: &Value, env: &Env, output: &mut dyn FnMut(Value)) {
eval(expr, input, env, &mut |item| {
if let Value::Array(arr) = &item {
let len = arr.len();
if len == 2 {
if let Value::Array(path_arr) = &arr[0] {
let new_path = if path_arr.len() > 1 {
Value::Array(Arc::new(path_arr[1..].to_vec()))
} else {
Value::Array(Arc::new(vec![]))
};
output(Value::Array(Arc::new(vec![new_path, arr[1].clone()])));
}
} else if len == 1 {
if let Value::Array(path_arr) = &arr[0] {
if path_arr.len() > 1 {
output(Value::Array(Arc::new(vec![Value::Array(Arc::new(
path_arr[1..].to_vec(),
))])));
} else {
output(Value::Array(Arc::new(vec![Value::Array(Arc::new(vec![]))])));
}
}
}
}
});
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a string value, used for object keys in paths.
    fn key(name: &str) -> Value {
        Value::String(name.into())
    }

    /// Wraps `items` in an array value.
    fn array(items: Vec<Value>) -> Value {
        Value::Array(Arc::new(items))
    }

    /// Builds an object value from `(key, value)` pairs, keeping their order.
    fn object(fields: Vec<(&str, Value)>) -> Value {
        Value::Object(Arc::new(
            fields
                .into_iter()
                .map(|(name, value)| (name.to_string(), value))
                .collect::<Vec<_>>(),
        ))
    }

    /// Builds a `[path, value]` leaf event.
    fn leaf_event(path: &[Value], value: Value) -> Value {
        array(vec![array(path.to_vec()), value])
    }

    /// Builds a `[path]` closing event.
    fn end_event(path: &[Value]) -> Value {
        array(vec![array(path.to_vec())])
    }

    /// Runs `to_stream` on `value` from an empty path and gathers every event.
    fn stream_of(value: &Value) -> Vec<Value> {
        let mut events = Vec::new();
        to_stream(value, &mut Vec::new(), &mut |event| events.push(event));
        events
    }

    /// Feeds `events` to `from_stream` by iterating an array holding them,
    /// and gathers every rebuilt value.
    fn rebuild(events: Vec<Value>) -> Vec<Value> {
        let source = array(events);
        let iterate = Filter::Iterate;
        let scope = Env::empty();
        let mut rebuilt = Vec::new();
        from_stream(&iterate, &source, &scope, &mut |value| rebuilt.push(value));
        rebuilt
    }

    #[test]
    fn tostream_scalar() {
        assert_eq!(
            stream_of(&Value::Int(42)),
            vec![leaf_event(&[], Value::Int(42))]
        );
    }

    #[test]
    fn tostream_empty_array() {
        assert_eq!(
            stream_of(&array(vec![])),
            vec![leaf_event(&[], array(vec![]))]
        );
    }

    #[test]
    fn tostream_empty_object() {
        assert_eq!(
            stream_of(&object(vec![])),
            vec![leaf_event(&[], object(vec![]))]
        );
    }

    #[test]
    fn tostream_flat_object() {
        let input = object(vec![("a", Value::Int(1)), ("b", Value::Int(2))]);
        let expected = vec![
            leaf_event(&[key("a")], Value::Int(1)),
            leaf_event(&[key("b")], Value::Int(2)),
            end_event(&[key("b")]),
        ];
        assert_eq!(stream_of(&input), expected);
    }

    #[test]
    fn tostream_flat_array() {
        let input = array(vec![Value::Int(10), Value::Int(20)]);
        let expected = vec![
            leaf_event(&[Value::Int(0)], Value::Int(10)),
            leaf_event(&[Value::Int(1)], Value::Int(20)),
            end_event(&[Value::Int(1)]),
        ];
        assert_eq!(stream_of(&input), expected);
    }

    #[test]
    fn tostream_nested() {
        let input = object(vec![("a", object(vec![("x", Value::Int(1))]))]);
        let expected = vec![
            leaf_event(&[key("a"), key("x")], Value::Int(1)),
            end_event(&[key("a"), key("x")]),
            end_event(&[key("a")]),
        ];
        assert_eq!(stream_of(&input), expected);
    }

    #[test]
    fn fromstream_roundtrip_object() {
        let original = object(vec![("a", Value::Int(1)), ("b", Value::Int(2))]);
        let rebuilt = rebuild(stream_of(&original));
        assert_eq!(rebuilt, vec![original]);
    }

    #[test]
    fn fromstream_roundtrip_array() {
        let original = array(vec![Value::Int(10), Value::Int(20), Value::Int(30)]);
        let rebuilt = rebuild(stream_of(&original));
        assert_eq!(rebuilt, vec![original]);
    }

    #[test]
    fn fromstream_scalar_emits_directly() {
        let rebuilt = rebuild(vec![leaf_event(&[], Value::Int(42))]);
        assert_eq!(rebuilt, vec![Value::Int(42)]);
    }
}