use serde_json::{json, Value};
use flowcore::errors::Result;
use flowcore::flow_output;
use flowcore::RunAgain;
use flowmacro::flow_function;
/// Append `values` to the `partial` array and emit a `chunk` once enough items are collected.
///
/// Outputs produced through `flow_output!`, by case:
/// - `values` is null: only `chunk` is output, holding the items currently in `partial`, or
///   `Value::Null` when `partial` is empty.
///   NOTE(review): a null `values` looks like an end-of-stream flush marker - confirm against
///   the flow definition that uses this function.
/// - `chunk_size` is a positive integer and the grown array has at least that many items:
///   `chunk` (the grown array), `partial` (an empty array) and `chunk_size` (fed back as is).
/// - `chunk_size` is a positive integer but the array is still too short: `partial` (the grown
///   array) and `chunk_size`.
/// - `chunk_size` is anything else (not an unsigned integer, or zero): `partial` (the grown
///   array) and a `chunk_size` of `0`, so no `chunk` is emitted by this call.
///
/// # Errors
///
/// Returns an error when `partial` is not a JSON array, or when `chunk_size` does not fit in a
/// `usize` on the current platform.
#[flow_function]
fn inner_accumulate(
    values: &Value,
    partial: &Value,
    chunk_size: &Value,
) -> Result<(Option<Value>, RunAgain)> {
    // Every path needs `partial` as an array, so validate it once up front.
    let accumulated = partial.as_array().ok_or("Could not get partial")?;

    if values.is_null() {
        return if accumulated.is_empty() {
            flow_output!("chunk" => Value::Null)
        } else {
            flow_output!("chunk" => Value::Array(accumulated.clone()))
        };
    }

    // Build the grown array exactly once: reserve room for the new item, copy the existing
    // items, then move the finished vector into whichever output needs it (no second clone).
    let mut items = Vec::with_capacity(accumulated.len() + 1);
    items.extend_from_slice(accumulated);
    items.push(values.clone());

    // Only a strictly positive unsigned integer counts as a usable chunk size.
    let chunk_size = chunk_size.as_u64().filter(|&s| s > 0);

    if let Some(size) = chunk_size {
        if items.len() >= usize::try_from(size)? {
            flow_output!(
                "chunk" => Value::Array(items),
                "partial" => Value::Array(vec![]),
                "chunk_size" => json!(size),
            )
        } else {
            flow_output!(
                "partial" => Value::Array(items),
                "chunk_size" => json!(size),
            )
        }
    } else {
        flow_output!(
            "partial" => Value::Array(items),
            "chunk_size" => json!(0),
        )
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod test {
    use serde_json::{json, Value};

    use super::inner_accumulate;

    /// Run `inner_accumulate` once and return the output value it produced, panicking if the
    /// call fails or yields no output.
    fn run_once(value: &Value, partial: &Value, chunk_size: &Value) -> Value {
        let (result, _) =
            inner_accumulate(value, partial, chunk_size).expect("_accumulate() failed");
        result.expect("Could not get the Value from the output")
    }

    // One value with a chunk size of one fills the chunk immediately.
    #[test]
    fn accumulate_start_and_finish() {
        let output = run_once(&json!(1), &json!([]), &json!(1));

        let chunk = output
            .pointer("/chunk")
            .expect("Could not get the /chunk from the output");
        assert_eq!(chunk, &json!([1]));
    }

    // One value with a chunk size of two is kept in `partial` and no chunk is produced.
    #[test]
    fn accumulate_start_not_finish() {
        let output = run_once(&json!(1), &json!([]), &json!(2));

        assert_eq!(output.pointer("/chunk"), None);

        let partial = output
            .pointer("/partial")
            .expect("Could not get the /partial from the output");
        assert_eq!(partial, &json!([1]));
    }

    // A second value added to an existing partial of one completes a chunk of two.
    #[test]
    fn accumulate_started_then_finish() {
        let output = run_once(&json!(2), &json!([1]), &json!(2));

        let chunk = output
            .pointer("/chunk")
            .expect("Could not get the /chunk from the output");
        assert_eq!(chunk, &json!([1, 2]));
    }
}