flowstdlib 1.0.0

The standard library of functions and flows for 'flow' programs
Documentation
use serde_json::{json, Value};

use flowcore::errors::Result;
use flowcore::flow_output;
use flowcore::RunAgain;
use flowmacro::flow_function;

#[flow_function]
fn inner_accumulate(
    values: &Value,
    partial: &Value,
    chunk_size: &Value,
) -> Result<(Option<Value>, RunAgain)> {
    if values.is_null() {
        let partial_arr = partial.as_array().ok_or("Could not get partial")?;
        return if partial_arr.is_empty() {
            flow_output!("chunk" => Value::Null)
        } else {
            flow_output!("chunk" => Value::Array(partial_arr.clone()))
        };
    }

    let mut partial_input = partial.clone();
    let chunk_size = chunk_size.as_u64().filter(|&s| s > 0);

    let partial = partial_input
        .as_array_mut()
        .ok_or("Could not get partial")?;
    partial.push(values.clone());

    if let Some(size) = chunk_size {
        if partial.len() >= usize::try_from(size)? {
            flow_output!(
                "chunk" => Value::Array(partial.clone()),
                "partial" => Value::Array(vec![]),
                "chunk_size" => json!(size),
            )
        } else {
            flow_output!(
                "partial" => Value::Array(partial.clone()),
                "chunk_size" => json!(size),
            )
        }
    } else {
        flow_output!(
            "partial" => Value::Array(partial.clone()),
            "chunk_size" => json!(0),
        )
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod test {
    use serde_json::json;

    use super::inner_accumulate;

    #[test]
    fn accumulate_start_and_finish() {
        let value = json!(1);
        let partial = json!([]);
        let chunk_size = json!(1);

        let (result, _) =
            inner_accumulate(&value, &partial, &chunk_size).expect("_accumulate() failed");
        let output = result.expect("Could not get the Value from the output");
        assert_eq!(
            output
                .pointer("/chunk")
                .expect("Could not get the /chunk from the output"),
            &json!([1])
        );
    }

    #[test]
    fn accumulate_start_not_finish() {
        let value = json!(1);
        let partial = json!([]);
        let chunk_size = json!(2);

        let (result, _) =
            inner_accumulate(&value, &partial, &chunk_size).expect("_accumulate() failed");
        let output = result.expect("Could not get the Value from the output");
        assert_eq!(output.pointer("/chunk"), None);
        assert_eq!(
            output
                .pointer("/partial")
                .expect("Could not get the /partial from the output"),
            &json!([1])
        );
    }

    #[test]
    fn accumulate_started_then_finish() {
        let value = json!(2);
        let partial = json!([1]);
        let chunk_size = json!(2);

        let (result, _) =
            inner_accumulate(&value, &partial, &chunk_size).expect("_accumulate() failed");
        let output = result.expect("Could not get the Value from the output");
        assert_eq!(
            output
                .pointer("/chunk")
                .expect("Could not get the /chunk from the output"),
            &json!([1, 2])
        );
    }
}