use tempfile::TempDir;
use serde_json::json;
use crate::error::Error;
use crate::store::{FollowOption, Frame, ReadOptions, Store};
/// Exercises an action whose closure produces a multi-value pipeline.
///
/// The `echo` script pairs each number in `1..n` (with `n` taken from the call
/// frame's `meta.args.n`) with the content referenced by the call frame's
/// hash. The test expects a single `echo.response` frame whose CAS content
/// parses as the JSON array `["1: foo", "2: foo", "3: foo"]`, tagged with the
/// ids of the defining frame and the calling frame.
#[tokio::test]
async fn test_action_with_pipeline() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;

    // The followed stream is expected to open with the threshold marker.
    let threshold = recver.recv().await.unwrap();
    assert_eq!(threshold.topic, "xs.threshold");

    // Register the action. The script text stays flush-left so the bytes
    // handed to the CAS are unchanged.
    let script_hash = store
        .cas_insert(
            r#"{
run: {|frame|
let input = if ($frame.hash != null) { .cas $frame.hash } else { null }
let n = $frame.meta.args.n
1..($n) | each {$"($in): ($input)"}
}
return_options: { target: "cas" }
}"#,
        )
        .await?;
    let frame_action = store.append(
        Frame::builder("xs.action.echo.create")
            .hash(script_hash)
            .build(),
    )?;
    let created = recver.recv().await.unwrap();
    assert_eq!(created.topic, "xs.action.echo.create");
    let activated = recver.recv().await.unwrap();
    assert_eq!(activated.topic, "xs.action.echo.active");

    // Invoke the action with "foo" as input content and n = 3.
    let input_hash = store.cas_insert(r#"foo"#).await?;
    let frame_call = store.append(
        Frame::builder("echo.call")
            .hash(input_hash)
            .meta(json!({"args": {"n": 3}}))
            .build(),
    )?;
    let call_seen = recver.recv().await.unwrap();
    assert_eq!(call_seen.topic, "echo.call");

    // The response must point back at both the action and the call.
    let response = recver.recv().await.unwrap();
    assert_eq!(response.topic, "echo.response");
    let response_meta = response.meta.as_ref().expect("Meta should be present");
    assert_eq!(response_meta["action_id"], frame_action.id.to_string());
    assert_eq!(response_meta["frame_id"], frame_call.id.to_string());

    // The pipeline output is read back from CAS and decoded as JSON.
    let response_hash = response.hash.as_ref().expect("Hash should be present");
    let body = String::from_utf8(store.cas_read(response_hash).await?)?;
    let values: Vec<String> = serde_json::from_str(&body)?;
    assert_eq!(values, vec!["1: foo", "2: foo", "3: foo"]);

    assert_no_more_frames(&mut recver).await;
    Ok(())
}
/// Checks that a failing action reports its failure as an `.error` frame.
///
/// The `will_error` script accesses `$frame.meta.args.not_exists`, which the
/// call frame does not provide. The test expects a `will_error.error` frame
/// whose meta carries the action id, the calling frame id, and an `error`
/// string mentioning `not_exists`.
#[tokio::test]
async fn test_action_error_handling() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;

    let threshold = recver.recv().await.unwrap();
    assert_eq!(threshold.topic, "xs.threshold");

    // Register an action that fails as soon as it runs. The script text stays
    // flush-left so the bytes handed to the CAS are unchanged.
    let script_hash = store
        .cas_insert(
            r#"{
run: {|frame|
$frame.meta.args.not_exists # This will error
}
}"#,
        )
        .await?;
    let frame_action = store
        .append(
            Frame::builder("xs.action.will_error.create")
                .hash(script_hash)
                .build(),
        )
        .unwrap();
    let created = recver.recv().await.unwrap();
    assert_eq!(created.topic, "xs.action.will_error.create");
    let activated = recver.recv().await.unwrap();
    assert_eq!(activated.topic, "xs.action.will_error.active");

    // Call it with an empty `args` record so the field lookup cannot succeed.
    let input_hash = store.cas_insert(r#""input""#).await?;
    let frame_call = store
        .append(
            Frame::builder("will_error.call")
                .hash(input_hash)
                .meta(json!({"args": {}}))
                .build(),
        )
        .unwrap();
    let call_seen = recver.recv().await.unwrap();
    assert_eq!(call_seen.topic, "will_error.call");

    // The failure is surfaced as a frame, attributed to the action and call.
    let failure = recver.recv().await.unwrap();
    assert_eq!(failure.topic, "will_error.error");
    let failure_meta = failure.meta.as_ref().expect("Meta should be present");
    assert_eq!(failure_meta["action_id"], frame_action.id.to_string());
    assert_eq!(failure_meta["frame_id"], frame_call.id.to_string());
    let message = failure_meta["error"].as_str().unwrap();
    assert!(message.contains("not_exists"));

    assert_no_more_frames(&mut recver).await;
    Ok(())
}
/// Checks an action whose closure returns one plain string.
///
/// With `return_options.target` set to `"cas"`, the test expects a
/// `single.response` frame whose CAS content decodes (as JSON) to the string
/// `"single value output"`.
#[tokio::test]
async fn test_action_single_value() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;

    let threshold = recver.recv().await.unwrap();
    assert_eq!(threshold.topic, "xs.threshold");

    // Register the action. The script text stays flush-left so the bytes
    // handed to the CAS are unchanged.
    let script_hash = store
        .cas_insert(
            r#"{
run: {|frame| "single value output"}
return_options: { target: "cas" }
}"#,
        )
        .await?;
    let frame_action = store.append(
        Frame::builder("xs.action.single.create")
            .hash(script_hash)
            .build(),
    )?;
    let created = recver.recv().await.unwrap();
    assert_eq!(created.topic, "xs.action.single.create");
    let activated = recver.recv().await.unwrap();
    assert_eq!(activated.topic, "xs.action.single.active");

    // A bare call frame (no hash, no meta) is enough to trigger the action.
    let frame_call = store.append(Frame::builder("single.call").build())?;
    let call_seen = recver.recv().await.unwrap();
    assert_eq!(call_seen.topic, "single.call");

    let frame_resp = recver.recv().await.unwrap();
    assert_eq!(frame_resp.topic, "single.response");
    let meta_resp = frame_resp.meta.as_ref().expect("Meta should be present");
    assert_eq!(meta_resp["action_id"], frame_action.id.to_string());
    assert_eq!(meta_resp["frame_id"], frame_call.id.to_string());

    // Read the stored output back and decode it as a JSON string.
    let resp_hash = frame_resp.hash.as_ref().expect("Hash should be present");
    let body = String::from_utf8(store.cas_read(resp_hash).await?)?;
    let value: String = serde_json::from_str(&body)?;
    assert_eq!(value, "single value output");

    assert_no_more_frames(&mut recver).await;
    Ok(())
}
/// Checks an action whose closure body is empty.
///
/// The test expects an `empty.response` frame that carries the usual
/// `action_id` / `frame_id` meta but no hash.
#[tokio::test]
async fn test_action_empty_output() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;

    let threshold = recver.recv().await.unwrap();
    assert_eq!(threshold.topic, "xs.threshold");

    // Register an action with an empty closure. The script text stays
    // flush-left so the bytes handed to the CAS are unchanged.
    let script_hash = store
        .cas_insert(
            r#"{
run: {|frame|}
}"#,
        )
        .await?;
    let frame_action = store.append(
        Frame::builder("xs.action.empty.create")
            .hash(script_hash)
            .build(),
    )?;
    let created = recver.recv().await.unwrap();
    assert_eq!(created.topic, "xs.action.empty.create");
    let activated = recver.recv().await.unwrap();
    assert_eq!(activated.topic, "xs.action.empty.active");

    let frame_call = store.append(Frame::builder("empty.call").build())?;
    let call_seen = recver.recv().await.unwrap();
    assert_eq!(call_seen.topic, "empty.call");

    // A response is still emitted, but with nothing stored in CAS.
    let response = recver.recv().await.unwrap();
    assert_eq!(response.topic, "empty.response");
    let response_meta = response.meta.as_ref().expect("Meta should be present");
    assert_eq!(response_meta["action_id"], frame_action.id.to_string());
    assert_eq!(response_meta["frame_id"], frame_call.id.to_string());
    assert!(response.hash.is_none(), "empty output should have no hash");

    assert_no_more_frames(&mut recver).await;
    Ok(())
}
/// Checks an action that appends a side frame while also returning a value.
///
/// The `numbers` script tees `[1 2 3]` into a branch that sums the values and
/// appends the result under the topic `sum`. The test expects, in order, a
/// `sum` frame whose CAS content is `6`, then a `numbers.response` frame whose
/// CAS content decodes to `[1, 2, 3]`; both must carry the same
/// `action_id` / `frame_id` meta.
#[tokio::test]
async fn test_action_tee_and_append() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;

    let threshold = recver.recv().await.unwrap();
    assert_eq!(threshold.topic, "xs.threshold");

    // Register the action. The script text stays flush-left so the bytes
    // handed to the CAS are unchanged.
    let script_hash = store
        .cas_insert(
            r#"{
run: {|frame|
[1 2 3] | tee { collect { math sum } | to json -r | .append sum }
}
return_options: { target: "cas" }
}"#,
        )
        .await?;
    let frame_action = store.append(
        Frame::builder("xs.action.numbers.create")
            .hash(script_hash)
            .build(),
    )?;
    let created = recver.recv().await.unwrap();
    assert_eq!(created.topic, "xs.action.numbers.create");
    let activated = recver.recv().await.unwrap();
    assert_eq!(activated.topic, "xs.action.numbers.active");

    let frame_call = store.append(Frame::builder("numbers.call").build())?;
    let call_seen = recver.recv().await.unwrap();
    assert_eq!(call_seen.topic, "numbers.call");

    // Both emitted frames are expected to carry exactly this meta.
    let expected_meta = json!({"action_id": frame_action.id, "frame_id": frame_call.id});

    // First comes the frame appended from inside the tee branch.
    let sum_frame = recver.recv().await.unwrap();
    assert_eq!(sum_frame.topic, "sum");
    assert_eq!(sum_frame.meta.as_ref().unwrap(), &expected_meta);
    let sum_bytes = store.cas_read(sum_frame.hash.as_ref().unwrap()).await?;
    let sum_text = String::from_utf8(sum_bytes)?;
    assert_eq!(sum_text, "6");

    // Then the action's own response with the untouched list.
    let response = recver.recv().await.unwrap();
    assert_eq!(response.topic, "numbers.response");
    assert_eq!(response.meta.as_ref().unwrap(), &expected_meta);
    let list_bytes = store.cas_read(response.hash.as_ref().unwrap()).await?;
    let list_text = String::from_utf8(list_bytes)?;
    let values: Vec<i64> = serde_json::from_str(&list_text)?;
    assert_eq!(values, vec![1, 2, 3]);

    assert_no_more_frames(&mut recver).await;
    Ok(())
}
/// Checks that a record returned by an action lands in the response meta.
///
/// The `rec` script returns `{status: "ok", count: 42}` and sets no
/// `return_options`. The test expects a `rec.response` frame with no hash,
/// whose meta holds `status` and `count` alongside `action_id` and `frame_id`.
#[tokio::test]
async fn test_action_record_output_goes_to_meta() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;

    let threshold = recver.recv().await.unwrap();
    assert_eq!(threshold.topic, "xs.threshold");

    // Register the action. The script text stays flush-left so the bytes
    // handed to the CAS are unchanged.
    let script_hash = store
        .cas_insert(
            r#"{
run: {|frame| {status: "ok", count: 42} }
}"#,
        )
        .await?;
    let frame_action = store.append(
        Frame::builder("xs.action.rec.create")
            .hash(script_hash)
            .build(),
    )?;
    let created = recver.recv().await.unwrap();
    assert_eq!(created.topic, "xs.action.rec.create");
    let activated = recver.recv().await.unwrap();
    assert_eq!(activated.topic, "xs.action.rec.active");

    let frame_call = store.append(Frame::builder("rec.call").build())?;
    let call_seen = recver.recv().await.unwrap();
    assert_eq!(call_seen.topic, "rec.call");

    let response = recver.recv().await.unwrap();
    assert_eq!(response.topic, "rec.response");
    assert!(response.hash.is_none(), "record output should not use CAS");

    // The record's fields sit next to the bookkeeping ids in the meta.
    let response_meta = response.meta.unwrap();
    assert_eq!(response_meta["action_id"], frame_action.id.to_string());
    assert_eq!(response_meta["frame_id"], frame_call.id.to_string());
    assert_eq!(response_meta["status"], "ok");
    assert_eq!(response_meta["count"], 42);

    assert_no_more_frames(&mut recver).await;
    Ok(())
}
/// Invariant 4 (per the test name): terminating an action removes it, so
/// later calls are no longer served.
///
/// Flow checked:
/// 1. Register `echo` and wait for `xs.action.echo.active`, whose meta must
///    reference the defining frame.
/// 2. Call it once and confirm the `echo.response` is attributed to both the
///    action and that call.
/// 3. Append `xs.action.echo.term` and expect `xs.action.echo.fin.term`.
/// 4. Call again and verify that nothing beyond the call frame itself shows up.
#[tokio::test]
async fn inv4_action_term_removes_action_and_blocks_calls() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let options = ReadOptions::builder().follow(FollowOption::On).build();
    let mut recver = store.read(options).await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    // Register the action and wait until it is reported active.
    let define = store
        .append(
            Frame::builder("xs.action.echo.create")
                .hash(
                    store
                        .cas_insert(r#"{run: {|frame| "hi"}, return_options: {target: "cas"}}"#)
                        .await?,
                )
                .build(),
        )
        .unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "xs.action.echo.create");
    let ready = recver.recv().await.unwrap();
    assert_eq!(ready.topic, "xs.action.echo.active");
    assert_eq!(
        ready.meta.as_ref().unwrap()["action_id"],
        define.id.to_string()
    );

    // While the action is live, a call is answered. Check that the answer
    // belongs to this action and this call rather than discarding the call
    // frame unused.
    let call = store.append(Frame::builder("echo.call").build()).unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "echo.call");
    let resp = recver.recv().await.unwrap();
    assert_eq!(resp.topic, "echo.response");
    let resp_meta = resp.meta.as_ref().expect("Meta should be present");
    assert_eq!(resp_meta["action_id"], define.id.to_string());
    assert_eq!(resp_meta["frame_id"], call.id.to_string());

    // Terminate the action and wait for the acknowledgement.
    store
        .append(Frame::builder("xs.action.echo.term").build())
        .unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "xs.action.echo.term");
    let fin = recver.recv().await.unwrap();
    assert_eq!(fin.topic, "xs.action.echo.fin.term");

    // After termination the call frame is still recorded, but no response
    // (or any other frame) may follow it.
    store.append(Frame::builder("echo.call").build()).unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "echo.call");
    assert_no_more_frames(&mut recver).await;
    Ok(())
}
/// Invariant 6 (per the test name): every acknowledgement frame points back
/// at the frame that caused it.
///
/// Three acknowledgements are checked:
/// - `xs.action.tr.active` carries the defining frame's id as `action_id`;
/// - `xs.action.tr.fin.term` carries the terminating frame's id as `frame_id`;
/// - `xs.action.tr.invalid` (emitted for a script that is not valid) carries
///   the rejected defining frame's id as `action_id`.
#[tokio::test]
async fn inv6_action_acks_carry_source_pointer() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;

    let threshold = recver.recv().await.unwrap();
    assert_eq!(threshold.topic, "xs.threshold");

    // A valid definition is acknowledged with `.active`.
    let good_script = store.cas_insert(r#"{run: {|frame| "hi"}}"#).await?;
    let define = store
        .append(
            Frame::builder("xs.action.tr.create")
                .hash(good_script)
                .build(),
        )
        .unwrap();
    let created = recver.recv().await.unwrap();
    assert_eq!(created.topic, "xs.action.tr.create");
    let active = recver.recv().await.unwrap();
    assert_eq!(active.topic, "xs.action.tr.active");
    let active_meta = active.meta.as_ref().unwrap();
    assert_eq!(active_meta["action_id"], define.id.to_string());

    // A termination request is acknowledged with `.fin.term`.
    let term = store
        .append(Frame::builder("xs.action.tr.term").build())
        .unwrap();
    let term_seen = recver.recv().await.unwrap();
    assert_eq!(term_seen.topic, "xs.action.tr.term");
    let fin = recver.recv().await.unwrap();
    assert_eq!(fin.topic, "xs.action.tr.fin.term");
    let fin_meta = fin.meta.as_ref().unwrap();
    assert_eq!(fin_meta["frame_id"], term.id.to_string());

    // A definition that cannot be accepted is acknowledged with `.invalid`.
    let bad_script = store.cas_insert(r#"not valid nu"#).await?;
    let bad = store
        .append(
            Frame::builder("xs.action.tr.create")
                .hash(bad_script)
                .build(),
        )
        .unwrap();
    let recreated = recver.recv().await.unwrap();
    assert_eq!(recreated.topic, "xs.action.tr.create");
    let invalid = recver.recv().await.unwrap();
    assert_eq!(invalid.topic, "xs.action.tr.invalid");
    let invalid_meta = invalid.meta.as_ref().unwrap();
    assert_eq!(invalid_meta["action_id"], bad.id.to_string());

    Ok(())
}
/// Invariant 8 (per the test name): only one instance of an action is live
/// under a given name.
///
/// `echo` is defined twice in a row. A following `echo.call` must produce
/// exactly one `echo.response`, and its `action_id` must be the id of the
/// second definition.
#[tokio::test]
async fn inv8_action_single_live_instance_per_name() -> Result<(), Error> {
    let (_dir, store) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;

    let threshold = recver.recv().await.unwrap();
    assert_eq!(threshold.topic, "xs.threshold");

    // First definition of `echo`.
    let first_script = store
        .cas_insert(r#"{run: {|frame| {who: "first"}}, return_options: {target: "cas"}}"#)
        .await?;
    let _first = store
        .append(
            Frame::builder("xs.action.echo.create")
                .hash(first_script)
                .build(),
        )
        .unwrap();
    let first_created = recver.recv().await.unwrap();
    assert_eq!(first_created.topic, "xs.action.echo.create");
    let first_active = recver.recv().await.unwrap();
    assert_eq!(first_active.topic, "xs.action.echo.active");

    // Second definition under the same name.
    let second_script = store
        .cas_insert(r#"{run: {|frame| {who: "second"}}, return_options: {target: "cas"}}"#)
        .await?;
    let second = store
        .append(
            Frame::builder("xs.action.echo.create")
                .hash(second_script)
                .build(),
        )
        .unwrap();
    let second_created = recver.recv().await.unwrap();
    assert_eq!(second_created.topic, "xs.action.echo.create");
    let second_active = recver.recv().await.unwrap();
    assert_eq!(second_active.topic, "xs.action.echo.active");

    // One call, one response, and it comes from the second definition.
    let call = store.append(Frame::builder("echo.call").build()).unwrap();
    let call_seen = recver.recv().await.unwrap();
    assert_eq!(call_seen.topic, "echo.call");
    let resp = recver.recv().await.unwrap();
    assert_eq!(resp.topic, "echo.response");
    let resp_meta = resp.meta.as_ref().unwrap();
    assert_eq!(resp_meta["action_id"], second.id.to_string());
    assert_eq!(resp_meta["frame_id"], call.id.to_string());

    // A second response here would mean the first instance is still live.
    assert_no_more_frames(&mut recver).await;
    Ok(())
}
/// Asserts that `recver` stays quiet for a 50 ms window.
///
/// # Panics
///
/// Panics with the unexpected frame's debug output if any frame is received
/// before the window elapses.
async fn assert_no_more_frames(recver: &mut tokio::sync::mpsc::Receiver<Frame>) {
    let quiet_window = std::time::Duration::from_millis(50);
    let deadline = tokio::time::sleep(quiet_window);
    tokio::pin!(deadline);

    tokio::select! {
        // Only a `Some(..)` result matches; a `None` from `recv` disables this
        // branch, leaving the timer as the sole way to finish.
        Some(unexpected) = recver.recv() => {
            panic!("Unexpected frame processed: {:?}", unexpected);
        }
        // Window elapsed without a frame: nothing to report.
        () = &mut deadline => {}
    }
}
/// Builds the fixture shared by the tests in this file.
///
/// Creates a `Store` rooted in a fresh temporary directory and spawns the
/// action processor on a clone of it. The `TempDir` is returned with the
/// store so the caller can hold on to it for the duration of the test.
///
/// # Panics
///
/// Panics if the temporary directory or the store cannot be created. The
/// spawned task unwraps the processor's result.
async fn setup_test_environment() -> (TempDir, Store) {
    let temp_dir = TempDir::new().unwrap();
    let store = Store::new(temp_dir.path().to_path_buf()).unwrap();

    // The processor gets its own handle to the store. Dropping the join
    // handle detaches the task; it is never awaited.
    let processor_store = store.clone();
    drop(tokio::spawn(async move {
        crate::processor::action::run(processor_store).await.unwrap();
    }));

    (temp_dir, store)
}