cross-stream 0.12.0

An event stream store for personal, local-first use, specializing in event sourcing.
Documentation
use tempfile::TempDir;

use serde_json::json;

use crate::error::Error;
use crate::store::{FollowOption, Frame, ReadOptions, Store};

/// Exercises an action whose closure yields a multi-value pipeline.
///
/// The `echo` action reads the calling frame's CAS content and `meta.args.n`, then emits
/// `n` formatted strings. The test expects a single `echo.response` frame whose CAS content
/// deserializes as a JSON array holding every emitted string.
#[tokio::test]
async fn test_action_with_pipeline() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let follow = ReadOptions::builder().follow(FollowOption::On).build();
    let mut rx = store.read(follow).await;
    let threshold = rx.recv().await.unwrap();
    assert_eq!(threshold.topic, "xs.threshold");

    // Register the action: insert the script into CAS, then append a define frame
    // that carries the script's hash.
    let script_hash = store
        .cas_insert(
            r#"{
                            run: {|frame|
                                let input = if ($frame.hash != null) { .cas $frame.hash } else { null }
                                let n = $frame.meta.args.n
                                1..($n) | each {$"($in): ($input)"}
                            }
                            return_options: { target: "cas" }
                        }"#,
        )
        .await?;
    let frame_action = store.append(Frame::builder("echo.define").hash(script_hash).build())?;
    for expected_topic in ["echo.define", "echo.ready"] {
        assert_eq!(rx.recv().await.unwrap().topic, expected_topic);
    }

    // Invoke the action with a CAS payload and an argument count of 3.
    let payload_hash = store.cas_insert(r#"foo"#).await?;
    let call = Frame::builder("echo.call")
        .hash(payload_hash)
        .meta(json!({"args": {"n": 3}}))
        .build();
    let frame_call = store.append(call)?;
    assert_eq!(rx.recv().await.unwrap().topic, "echo.call");

    // The response must reference both the defining frame and the calling frame.
    let response = rx.recv().await.unwrap();
    assert_eq!(response.topic, "echo.response");
    let meta = response.meta.as_ref().expect("Meta should be present");
    assert_eq!(meta["action_id"], frame_action.id.to_string());
    assert_eq!(meta["frame_id"], frame_call.id.to_string());

    // All pipeline outputs are expected in one JSON array stored in CAS.
    let hash = response.hash.as_ref().expect("Hash should be present");
    let raw = store.cas_read(hash).await?;
    let text = String::from_utf8(raw)?;
    let values: Vec<String> = serde_json::from_str(&text)?;
    assert_eq!(values, ["1: foo", "2: foo", "3: foo"]);

    assert_no_more_frames(&mut rx).await;
    Ok(())
}

/// Exercises the failure path of an action.
///
/// The `will_error` action's closure accesses `$frame.meta.args.not_exists`, which the call
/// frame does not provide (`args` is an empty object). The test expects a `will_error.error`
/// frame whose meta references the defining and calling frames and whose `error` string
/// mentions `not_exists`.
///
/// # Errors
///
/// Returns any [`Error`] raised by the store while inserting into CAS or appending frames.
#[tokio::test]
async fn test_action_error_handling() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let options = ReadOptions::builder().follow(FollowOption::On).build();
    let mut recver = store.read(options).await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    // Define action that will error with invalid access
    let script_hash = store
        .cas_insert(
            r#"{
                            run: {|frame|
                                $frame.meta.args.not_exists # This will error
                            }
                        }"#,
        )
        .await?;
    // Append failures are propagated with `?`, matching the other tests in this file.
    let frame_action =
        store.append(Frame::builder("will_error.define").hash(script_hash).build())?;
    assert_eq!(recver.recv().await.unwrap().topic, "will_error.define");
    assert_eq!(recver.recv().await.unwrap().topic, "will_error.ready");

    // Call the action
    let frame_call = store.append(
        Frame::builder("will_error.call")
            .hash(store.cas_insert(r#""input""#).await?)
            .meta(json!({"args": {}}))
            .build(),
    )?;
    assert_eq!(recver.recv().await.unwrap().topic, "will_error.call");

    // Should get error event
    let frame = recver.recv().await.unwrap();
    assert_eq!(frame.topic, "will_error.error");
    let meta = frame.meta.as_ref().expect("Meta should be present");
    assert_eq!(meta["action_id"], frame_action.id.to_string());
    assert_eq!(meta["frame_id"], frame_call.id.to_string());
    // The error text is only checked for the offending field name, not an exact message.
    assert!(meta["error"].as_str().unwrap().contains("not_exists"));

    assert_no_more_frames(&mut recver).await;
    Ok(())
}

/// Exercises an action whose closure returns one plain string.
///
/// With `return_options.target` set to `"cas"`, the test expects a single `single.response`
/// frame whose CAS content deserializes as the JSON string `"single value output"`.
#[tokio::test]
async fn test_action_single_value() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let follow = ReadOptions::builder().follow(FollowOption::On).build();
    let mut rx = store.read(follow).await;
    let threshold = rx.recv().await.unwrap();
    assert_eq!(threshold.topic, "xs.threshold");

    // Register the action by storing its script in CAS and appending the define frame.
    let script_hash = store
        .cas_insert(
            r#"{
                            run: {|frame| "single value output"}
                            return_options: { target: "cas" }
                        }"#,
        )
        .await?;
    let frame_action = store.append(Frame::builder("single.define").hash(script_hash).build())?;
    for expected_topic in ["single.define", "single.ready"] {
        assert_eq!(rx.recv().await.unwrap().topic, expected_topic);
    }

    // Invoke the action with a bare call frame (no hash, no meta).
    let frame_call = store.append(Frame::builder("single.call").build())?;
    assert_eq!(rx.recv().await.unwrap().topic, "single.call");

    // Exactly one response is expected, linked to the define and call frames.
    let response = rx.recv().await.unwrap();
    assert_eq!(response.topic, "single.response");
    let response_meta = response.meta.as_ref().expect("Meta should be present");
    assert_eq!(response_meta["action_id"], frame_action.id.to_string());
    assert_eq!(response_meta["frame_id"], frame_call.id.to_string());

    // The CAS content is a JSON-encoded string.
    let hash = response.hash.as_ref().expect("Hash should be present");
    let raw = store.cas_read(hash).await?;
    let text = String::from_utf8(raw)?;
    let value: String = serde_json::from_str(&text)?;
    assert_eq!(value, "single value output");

    assert_no_more_frames(&mut rx).await;
    Ok(())
}

/// Exercises an action whose closure body is empty and therefore produces no output.
///
/// The test expects one `empty.response` frame that still references the defining and
/// calling frames in its meta but carries no CAS hash.
#[tokio::test]
async fn test_action_empty_output() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let follow = ReadOptions::builder().follow(FollowOption::On).build();
    let mut rx = store.read(follow).await;
    let threshold = rx.recv().await.unwrap();
    assert_eq!(threshold.topic, "xs.threshold");

    // Register an action whose closure does nothing.
    let script_hash = store
        .cas_insert(
            r#"{
                            run: {|frame|}
                        }"#,
        )
        .await?;
    let frame_action = store.append(Frame::builder("empty.define").hash(script_hash).build())?;
    for expected_topic in ["empty.define", "empty.ready"] {
        assert_eq!(rx.recv().await.unwrap().topic, expected_topic);
    }

    // Invoke the action with a bare call frame.
    let frame_call = store.append(Frame::builder("empty.call").build())?;
    assert_eq!(rx.recv().await.unwrap().topic, "empty.call");

    // A response is still emitted; it just has nothing stored in CAS.
    let response = rx.recv().await.unwrap();
    assert_eq!(response.topic, "empty.response");
    let meta = response.meta.as_ref().expect("Meta should be present");
    assert_eq!(meta["action_id"], frame_action.id.to_string());
    assert_eq!(meta["frame_id"], frame_call.id.to_string());
    assert!(response.hash.is_none(), "empty output should have no hash");

    assert_no_more_frames(&mut rx).await;
    Ok(())
}

/// Exercises an action that appends a side frame from inside a `tee` while its main
/// pipeline keeps flowing.
///
/// The closure pipes `[1 2 3]` through `tee`, whose branch sums the values and appends the
/// result to the `sum` topic. The test expects the `sum` frame (content `6`) first, then a
/// `numbers.response` frame holding `[1, 2, 3]`; both must carry the same
/// `action_id`/`frame_id` meta.
#[tokio::test]
async fn test_action_tee_and_append() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let follow = ReadOptions::builder().follow(FollowOption::On).build();
    let mut rx = store.read(follow).await;
    let threshold = rx.recv().await.unwrap();
    assert_eq!(threshold.topic, "xs.threshold");

    // Register the action that outputs 1, 2, 3 and tees a sum into a separate frame.
    let script_hash = store
        .cas_insert(
            r#"{
                            run: {|frame|
                                [1 2 3] | tee { collect { math sum } | to json -r | .append sum }
                            }
                            return_options: { target: "cas" }
                        }"#,
        )
        .await?;
    let frame_action = store.append(Frame::builder("numbers.define").hash(script_hash).build())?;
    for expected_topic in ["numbers.define", "numbers.ready"] {
        assert_eq!(rx.recv().await.unwrap().topic, expected_topic);
    }

    // Invoke the action with a bare call frame.
    let frame_call = store.append(Frame::builder("numbers.call").build())?;
    assert_eq!(rx.recv().await.unwrap().topic, "numbers.call");

    // Both emitted frames are expected to carry exactly this meta and nothing else.
    let expected_meta = json!({"action_id": frame_action.id, "frame_id": frame_call.id});

    // First: the frame appended from the tee branch, containing the raw sum.
    let sum_frame = rx.recv().await.unwrap();
    assert_eq!(sum_frame.topic, "sum");
    assert_eq!(sum_frame.meta.unwrap(), expected_meta);
    let sum_bytes = store.cas_read(&sum_frame.hash.unwrap()).await?;
    let sum_text = String::from_utf8(sum_bytes)?;
    assert_eq!(sum_text, "6");

    // Second: the response with the collected main pipeline.
    let response = rx.recv().await.unwrap();
    assert_eq!(response.topic, "numbers.response");
    assert_eq!(response.meta.unwrap(), expected_meta);
    let response_bytes = store.cas_read(&response.hash.unwrap()).await?;
    let response_text = String::from_utf8(response_bytes)?;
    let values: Vec<i64> = serde_json::from_str(&response_text)?;
    assert_eq!(values, [1, 2, 3]);

    assert_no_more_frames(&mut rx).await;
    Ok(())
}

/// Exercises an action whose closure returns a record and declares no `return_options`.
///
/// The test expects the record's fields (`status`, `count`) to appear in the response
/// frame's meta next to `action_id` and `frame_id`, with no CAS hash on the frame.
#[tokio::test]
async fn test_action_record_output_goes_to_meta() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let follow = ReadOptions::builder().follow(FollowOption::On).build();
    let mut rx = store.read(follow).await;
    let threshold = rx.recv().await.unwrap();
    assert_eq!(threshold.topic, "xs.threshold");

    // Register an action that returns a two-field record.
    let script_hash = store
        .cas_insert(
            r#"{
                            run: {|frame| {status: "ok", count: 42} }
                        }"#,
        )
        .await?;
    let frame_action = store.append(Frame::builder("rec.define").hash(script_hash).build())?;
    for expected_topic in ["rec.define", "rec.ready"] {
        assert_eq!(rx.recv().await.unwrap().topic, expected_topic);
    }

    // Invoke the action with a bare call frame.
    let frame_call = store.append(Frame::builder("rec.call").build())?;
    assert_eq!(rx.recv().await.unwrap().topic, "rec.call");

    // The record is expected inline in meta rather than behind a hash.
    let response = rx.recv().await.unwrap();
    assert_eq!(response.topic, "rec.response");
    assert!(response.hash.is_none(), "record output should not use CAS");
    let meta = response.meta.unwrap();
    assert_eq!(meta["action_id"], frame_action.id.to_string());
    assert_eq!(meta["frame_id"], frame_call.id.to_string());
    assert_eq!(meta["status"], "ok");
    assert_eq!(meta["count"], 42);

    assert_no_more_frames(&mut rx).await;
    Ok(())
}

/// Asserts that no further frame arrives on `recver` within a 50 ms quiet period.
///
/// # Panics
///
/// Panics with the unexpected frame's debug representation if one is received before the
/// quiet period elapses.
async fn assert_no_more_frames(recver: &mut tokio::sync::mpsc::Receiver<Frame>) {
    use std::time::Duration;

    let quiet_period = tokio::time::sleep(Duration::from_millis(50));
    tokio::pin!(quiet_period);

    tokio::select! {
        // The `Some(..)` pattern means a closed channel (`None`) does not complete this
        // branch; the timer branch then decides the outcome.
        Some(unexpected) = recver.recv() => {
            panic!("Unexpected frame processed: {:?}", unexpected);
        }
        () = &mut quiet_period => {
            // Quiet period elapsed without another frame: success.
        }
    }
}

/// Creates a store in a fresh temporary directory and starts the action processor on it.
///
/// Returns the [`TempDir`] together with the [`Store`]; the tests bind the directory to
/// `_dir` so it stays alive for the whole test. Must be called from within a Tokio runtime
/// because it spawns a task.
///
/// # Panics
///
/// Panics if the temporary directory or the store cannot be created. A failure of the
/// processor itself panics inside the spawned task, not in the caller.
async fn setup_test_environment() -> (TempDir, Store) {
    let temp_dir = TempDir::new().unwrap();
    let store = Store::new(temp_dir.path().to_path_buf()).unwrap();

    // The processor gets its own clone of the store and runs in the background.
    let processor = tokio::spawn({
        let store = store.clone();
        async move {
            crate::processor::action::run(store).await.unwrap();
        }
    });
    // Dropping the handle detaches the task; nothing ever joins it.
    drop(processor);

    (temp_dir, store)
}