use tempfile::TempDir;
use serde_json::json;
use crate::error::Error;
use crate::store::{FollowOption, Frame, ReadOptions, Store};
#[tokio::test]
async fn test_action_with_pipeline() -> Result<(), Error> {
let (_dir, store) = setup_test_environment().await;
let options = ReadOptions::builder().follow(FollowOption::On).build();
let mut recver = store.read(options).await;
assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");
let frame_action = store.append(
Frame::builder("echo.define")
.hash(
store
.cas_insert(
r#"{
run: {|frame|
let input = if ($frame.hash != null) { .cas $frame.hash } else { null }
let n = $frame.meta.args.n
1..($n) | each {$"($in): ($input)"}
}
return_options: { target: "cas" }
}"#,
)
.await?,
)
.build(),
)?;
assert_eq!(recver.recv().await.unwrap().topic, "echo.define");
assert_eq!(recver.recv().await.unwrap().topic, "echo.ready");
let frame_call = store.append(
Frame::builder("echo.call")
.hash(store.cas_insert(r#"foo"#).await?)
.meta(json!({"args": {"n": 3}}))
.build(),
)?;
assert_eq!(recver.recv().await.unwrap().topic, "echo.call");
let frame = recver.recv().await.unwrap();
assert_eq!(frame.topic, "echo.response");
let meta = frame.meta.as_ref().expect("Meta should be present");
assert_eq!(meta["action_id"], frame_action.id.to_string());
assert_eq!(meta["frame_id"], frame_call.id.to_string());
let hash = frame.hash.as_ref().expect("Hash should be present");
let content = store.cas_read(hash).await?;
let content_str = String::from_utf8(content)?;
let values: Vec<String> = serde_json::from_str(&content_str)?;
assert_eq!(values, vec!["1: foo", "2: foo", "3: foo"]);
assert_no_more_frames(&mut recver).await;
Ok(())
}
#[tokio::test]
async fn test_action_error_handling() -> Result<(), Error> {
let (_dir, store) = setup_test_environment().await;
let options = ReadOptions::builder().follow(FollowOption::On).build();
let mut recver = store.read(options).await;
assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");
let frame_action = store
.append(
Frame::builder("will_error.define")
.hash(
store
.cas_insert(
r#"{
run: {|frame|
$frame.meta.args.not_exists # This will error
}
}"#,
)
.await?,
)
.build(),
)
.unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "will_error.define");
assert_eq!(recver.recv().await.unwrap().topic, "will_error.ready");
let frame_call = store
.append(
Frame::builder("will_error.call")
.hash(store.cas_insert(r#""input""#).await?)
.meta(json!({"args": {}}))
.build(),
)
.unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "will_error.call");
let frame = recver.recv().await.unwrap();
assert_eq!(frame.topic, "will_error.error");
let meta = frame.meta.as_ref().expect("Meta should be present");
assert_eq!(meta["action_id"], frame_action.id.to_string());
assert_eq!(meta["frame_id"], frame_call.id.to_string());
assert!(meta["error"].as_str().unwrap().contains("not_exists"));
assert_no_more_frames(&mut recver).await;
Ok(())
}
#[tokio::test]
async fn test_action_single_value() -> Result<(), Error> {
let (_dir, store) = setup_test_environment().await;
let options = ReadOptions::builder().follow(FollowOption::On).build();
let mut recver = store.read(options).await;
assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");
let frame_action = store.append(
Frame::builder("single.define")
.hash(
store
.cas_insert(
r#"{
run: {|frame| "single value output"}
return_options: { target: "cas" }
}"#,
)
.await?,
)
.build(),
)?;
assert_eq!(recver.recv().await.unwrap().topic, "single.define");
assert_eq!(recver.recv().await.unwrap().topic, "single.ready");
let frame_call = store.append(Frame::builder("single.call").build())?;
assert_eq!(recver.recv().await.unwrap().topic, "single.call");
let frame_resp = recver.recv().await.unwrap();
assert_eq!(frame_resp.topic, "single.response");
let meta_resp = frame_resp.meta.as_ref().expect("Meta should be present");
assert_eq!(meta_resp["action_id"], frame_action.id.to_string());
assert_eq!(meta_resp["frame_id"], frame_call.id.to_string());
let hash = frame_resp.hash.as_ref().expect("Hash should be present");
let content = store.cas_read(hash).await?;
let content_str = String::from_utf8(content)?;
let value: String = serde_json::from_str(&content_str)?;
assert_eq!(value, "single value output".to_string());
assert_no_more_frames(&mut recver).await;
Ok(())
}
#[tokio::test]
async fn test_action_empty_output() -> Result<(), Error> {
let (_dir, store) = setup_test_environment().await;
let options = ReadOptions::builder().follow(FollowOption::On).build();
let mut recver = store.read(options).await;
assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");
let frame_action = store.append(
Frame::builder("empty.define")
.hash(
store
.cas_insert(
r#"{
run: {|frame|}
}"#,
)
.await?,
)
.build(),
)?;
assert_eq!(recver.recv().await.unwrap().topic, "empty.define");
assert_eq!(recver.recv().await.unwrap().topic, "empty.ready");
let frame_call = store.append(Frame::builder("empty.call").build())?;
assert_eq!(recver.recv().await.unwrap().topic, "empty.call");
let frame = recver.recv().await.unwrap();
assert_eq!(frame.topic, "empty.response");
let meta = frame.meta.as_ref().expect("Meta should be present");
assert_eq!(meta["action_id"], frame_action.id.to_string());
assert_eq!(meta["frame_id"], frame_call.id.to_string());
assert!(frame.hash.is_none(), "empty output should have no hash");
assert_no_more_frames(&mut recver).await;
Ok(())
}
#[tokio::test]
async fn test_action_tee_and_append() -> Result<(), Error> {
let (_dir, store) = setup_test_environment().await;
let options = ReadOptions::builder().follow(FollowOption::On).build();
let mut recver = store.read(options).await;
assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");
let frame_action = store.append(
Frame::builder("numbers.define")
.hash(
store
.cas_insert(
r#"{
run: {|frame|
[1 2 3] | tee { collect { math sum } | to json -r | .append sum }
}
return_options: { target: "cas" }
}"#,
)
.await?,
)
.build(),
)?;
assert_eq!(recver.recv().await.unwrap().topic, "numbers.define");
assert_eq!(recver.recv().await.unwrap().topic, "numbers.ready");
let frame_call = store.append(Frame::builder("numbers.call").build())?;
assert_eq!(recver.recv().await.unwrap().topic, "numbers.call");
let expected_meta = json!({"action_id": frame_action.id, "frame_id": frame_call.id});
let frame = recver.recv().await.unwrap();
assert_eq!(frame.topic, "sum");
assert_eq!(frame.meta.unwrap(), expected_meta);
let content = store.cas_read(&frame.hash.unwrap()).await?;
let content_str = String::from_utf8(content)?;
assert_eq!(content_str, "6");
let frame = recver.recv().await.unwrap();
assert_eq!(frame.topic, "numbers.response");
assert_eq!(frame.meta.unwrap(), expected_meta);
let content = store.cas_read(&frame.hash.unwrap()).await?;
let content_str = String::from_utf8(content)?;
let values: Vec<i64> = serde_json::from_str(&content_str)?;
assert_eq!(values, vec![1, 2, 3]);
assert_no_more_frames(&mut recver).await;
Ok(())
}
#[tokio::test]
async fn test_action_record_output_goes_to_meta() -> Result<(), Error> {
let (_dir, store) = setup_test_environment().await;
let options = ReadOptions::builder().follow(FollowOption::On).build();
let mut recver = store.read(options).await;
assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");
let frame_action = store.append(
Frame::builder("rec.define")
.hash(
store
.cas_insert(
r#"{
run: {|frame| {status: "ok", count: 42} }
}"#,
)
.await?,
)
.build(),
)?;
assert_eq!(recver.recv().await.unwrap().topic, "rec.define");
assert_eq!(recver.recv().await.unwrap().topic, "rec.ready");
let frame_call = store.append(Frame::builder("rec.call").build())?;
assert_eq!(recver.recv().await.unwrap().topic, "rec.call");
let frame = recver.recv().await.unwrap();
assert_eq!(frame.topic, "rec.response");
assert!(frame.hash.is_none(), "record output should not use CAS");
let meta = frame.meta.unwrap();
assert_eq!(meta["action_id"], frame_action.id.to_string());
assert_eq!(meta["frame_id"], frame_call.id.to_string());
assert_eq!(meta["status"], "ok");
assert_eq!(meta["count"], 42);
assert_no_more_frames(&mut recver).await;
Ok(())
}
async fn assert_no_more_frames(recver: &mut tokio::sync::mpsc::Receiver<Frame>) {
let timeout = tokio::time::sleep(std::time::Duration::from_millis(50));
tokio::pin!(timeout);
tokio::select! {
Some(frame) = recver.recv() => {
panic!("Unexpected frame processed: {:?}", frame);
}
_ = &mut timeout => {
}
}
}
async fn setup_test_environment() -> (TempDir, Store) {
let temp_dir = TempDir::new().unwrap();
let store = Store::new(temp_dir.path().to_path_buf()).unwrap();
{
let store = store.clone();
drop(tokio::spawn(async move {
crate::processor::action::run(store).await.unwrap();
}));
}
(temp_dir, store)
}