use tempfile::TempDir;
use crate::error::Error;
use crate::store::TTL;
use crate::store::{FollowOption, Frame, ReadOptions, Store};
use std::collections::HashSet;
// Checks one frame emitted by an actor.
//
// Arguments, in order: the frame (or a reference to it), the topic it must
// carry, the actor's registration frame, the frame that triggered the actor,
// and an `Option<&Frame>` naming the state frame. The frame's meta must hold
// `actor_id` and `frame_id` equal to the ids of the handler and trigger;
// `state_id` is compared only when a state frame is supplied.
macro_rules! validate_actor_output_frame {
    ($frame_expr:expr, $expected_topic:expr, $handler:expr, $trigger:expr, $state_frame:expr) => {{
        let emitted = $frame_expr;
        assert_eq!(emitted.topic, $expected_topic, "Unexpected topic");

        let attrs = emitted.meta.as_ref().expect("Meta is None");
        assert_eq!(attrs["actor_id"], $handler.id.to_string(), "Unexpected actor_id");
        assert_eq!(attrs["frame_id"], $trigger.id.to_string(), "Unexpected frame_id");

        // The annotation lets callers pass a bare `None`.
        let expected_state: Option<&Frame> = $state_frame;
        if let Some(state) = expected_state {
            assert_eq!(attrs["state_id"], state.id.to_string(), "Unexpected state_id");
        }
    }};
}
// Receives one frame from `$recver` per listed topic, in the order given, and
// checks each of them with `validate_actor_output_frame!` against the same
// handler, trigger and optional state frame.
macro_rules! validate_actor_output_frames {
    ($recver:expr, $handler:expr, $trigger:expr, $state_frame:expr, [$( $topic:expr ),+ $(,)?]) => {{
        // Evaluate the state expression once and reuse it for every topic.
        let shared_state: Option<&Frame> = $state_frame;
        $(
            {
                let received = $recver.recv().await.unwrap();
                validate_actor_output_frame!(received, $topic, $handler, $trigger, shared_state);
            }
        )+
    }};
}
// Checks a frame against a brace-delimited list of `field: value` pairs.
//
// The frame expression is evaluated exactly once; each pair is then handed to
// `validate_field!`, which decides how that field is compared.
macro_rules! validate_frame {
    ($frame:expr, { $( $field:ident : $value:expr ),* $(,)? }) => {{
        let subject = $frame;
        $( validate_field!(subject, $field : $value); )*
    }};
}
// Checks a single field of a frame.
//
// Supported fields:
//   * `topic`   - the frame's topic must equal the value.
//   * `error`   - `meta["error"]` must be a string containing the value.
//   * `handler` - `meta["actor_id"]` must equal `value.id.to_string()`.
//   * `trigger` - `meta["frame_id"]` must equal `value.id.to_string()`.
//   * `state`   - `meta["state_id"]` must equal `value.id.to_string()`.
//
// Each field name is matched as a literal token, so a misspelled field is
// rejected when the test is compiled rather than by a panic when it runs.
macro_rules! validate_field {
    // Internal rule shared by the id-based fields. It is listed first so the
    // leading `@id` marker is never offered to an `expr` matcher.
    (@id $frame:expr, $key:literal, $value:expr) => {{
        let meta = $frame.meta.as_ref().expect("Meta is None");
        // Render the expected id once and reuse it for comparison and message.
        let expected = $value.id.to_string();
        assert_eq!(
            meta[$key],
            expected,
            "{} mismatch: expected '{}', got '{}'",
            $key,
            expected,
            meta[$key]
        );
    }};
    ($frame:expr, topic : $value:expr) => {{
        assert_eq!(
            $frame.topic, $value,
            "Topic mismatch: expected '{}', got '{}'",
            $value, $frame.topic
        );
    }};
    ($frame:expr, error : $value:expr) => {{
        let meta = $frame.meta.as_ref().expect("Meta is None");
        let error_message = meta["error"]
            .as_str()
            .expect("Expected 'error' to be a string");
        assert!(
            error_message.contains($value),
            "Error message '{}' does not contain expected substring '{}'",
            error_message,
            $value
        );
    }};
    ($frame:expr, handler : $value:expr) => {{
        validate_field!(@id $frame, "actor_id", $value);
    }};
    ($frame:expr, trigger : $value:expr) => {{
        validate_field!(@id $frame, "frame_id", $value);
    }};
    ($frame:expr, state : $value:expr) => {{
        validate_field!(@id $frame, "state_id", $value);
    }};
}
/// Registering an actor whose `run` closure declares no parameters is expected
/// to yield an `invalid.unregistered` frame that names the registration frame
/// and reports that the closure must accept exactly 2 params.
#[tokio::test]
async fn test_register_invalid_closure_no_args() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script_hash = store.cas_insert(r#"{run: {|| 42}}"#).await.unwrap();
    let frame_actor = store
        .append(Frame::builder("invalid.register").hash(script_hash).build())
        .unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "invalid.register");

    let rejection = recver.recv().await.unwrap();
    validate_frame!(rejection, {
        topic: "invalid.unregistered",
        handler: frame_actor,
        error: "Closure must accept exactly 2 params",
    });
    assert_no_more_frames(&mut recver).await;
}
/// A `run` closure written in the single-parameter form (`|frame|`) is expected
/// to be rejected at registration with the same "exactly 2 params" error.
#[tokio::test]
async fn test_register_invalid_closure_old_one_arg() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script_hash = store
        .cas_insert(r#"{run: {|frame| $frame}}"#)
        .await
        .unwrap();
    let frame_actor = store
        .append(Frame::builder("invalid.register").hash(script_hash).build())
        .unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "invalid.register");

    let rejection = recver.recv().await.unwrap();
    validate_frame!(rejection, {
        topic: "invalid.unregistered",
        handler: frame_actor,
        error: "Closure must accept exactly 2 params",
    });
    assert_no_more_frames(&mut recver).await;
}
/// A registration script that cannot be parsed is expected to produce an
/// `invalid.unregistered` frame whose error mentions "Parse error".
#[tokio::test]
async fn test_register_parse_error() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    // The literal is kept flush-left so the stored bytes stay exactly as they were.
    let broken_script = r#"
{
run: {|frame, state = null|
.last index.html | .cas
}
}
"#;
    let script_hash = store.cas_insert(broken_script).await.unwrap();
    let frame_actor = store
        .append(Frame::builder("invalid.register").hash(script_hash).build())
        .unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "invalid.register");

    let rejection = recver.recv().await.unwrap();
    validate_frame!(rejection, {
        topic: "invalid.unregistered",
        handler: frame_actor,
        error: "Parse error",
    });
    assert_no_more_frames(&mut recver).await;
}
/// An actor that echoes every frame it sees is expected to emit exactly one
/// `echo.out` for `a-frame`; nothing further may arrive afterwards, i.e. the
/// actor's own output must not trigger it again.
#[tokio::test]
async fn test_no_self_loop() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let echo_hash = store
        .cas_insert(r#"{run: {|frame, state = null| {out: $frame, next: $state}}}"#)
        .await
        .unwrap();
    store
        .append(Frame::builder("echo.register").hash(echo_hash).build())
        .unwrap();
    for expected in ["echo.register", "echo.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    store.append(Frame::builder("a-frame").build()).unwrap();
    for expected in ["a-frame", "echo.out"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }
    assert_no_more_frames(&mut recver).await;
}
/// End-to-end walk through register -> process -> unregister -> re-register.
///
/// The actor's `start` is computed from the `frame_id` recorded on the most
/// recent `action.out` frame. The assertions check that the `.active` frame
/// reports that id under `start.after`, and that only frames after it yield
/// new output.
#[tokio::test]
async fn test_essentials() {
let (store, _temp_dir) = setup_test_environment().await;
// Two frames exist before anything reads or registers.
let pew1 = store.append(Frame::builder("pew").build()).unwrap();
let pew2 = store.append(Frame::builder("pew").build()).unwrap();
let options = ReadOptions::builder().follow(FollowOption::On).build();
let mut recver = store.read(options).await;
assert_eq!(recver.recv().await.unwrap().topic, "pew");
assert_eq!(recver.recv().await.unwrap().topic, "pew");
assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");
// Hand-written pointer frame: it records pew1 as the frame already handled.
let _pointer_frame = store
.append(
Frame::builder("action.out")
.meta(serde_json::json!({
"frame_id": pew1.id.to_string()
}))
.build(),
)
.unwrap();
// The registration frame is kept as a prototype so it can be appended twice.
let actor_proto = Frame::builder("action.register")
.hash(
store
.cas_insert(
r#"
{
run: {|frame, state = null|
if $frame.topic == "pew" {
{out: {status: "processed"}, next: $state}
} else {
{next: $state}
}
}
start: (.last "action.out" | get meta.frame_id)
}"#,
)
.await
.unwrap(),
)
.build();
let frame_actor = store.append(actor_proto.clone()).unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "action.out"); assert_eq!(recver.recv().await.unwrap().topic, "action.register");
let frame = recver.recv().await.unwrap();
assert_eq!(frame.topic, "action.active");
let meta = frame.meta.unwrap();
assert_eq!(meta["actor_id"], frame_actor.id.to_string());
// The actor reports that it starts after pew1 ...
assert_eq!(meta["start"]["after"], pew1.id.to_string());
// ... so the only output expected here is the one triggered by pew2.
validate_frame!(recver.recv().await.unwrap(), {
topic: "action.out",
handler: &frame_actor,
trigger: &pew2,
});
assert_no_more_frames(&mut recver).await;
// Explicit unregistration is acknowledged with an `.unregistered` frame.
store
.append(Frame::builder("action.unregister").build())
.unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "action.unregister");
assert_eq!(recver.recv().await.unwrap().topic, "action.unregistered");
assert_no_more_frames(&mut recver).await;
// Registering the same script again: the latest `action.out` now points at
// pew2, so the new actor is expected to start after pew2.
let frame_actor_2 = store.append(actor_proto.clone()).unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "action.register");
let frame = recver.recv().await.unwrap();
assert_eq!(frame.topic, "action.active");
let meta = frame.meta.unwrap();
assert_eq!(meta["actor_id"], frame_actor_2.id.to_string());
assert_eq!(meta["start"]["after"], pew2.id.to_string());
// A fresh frame is processed by the second registration.
let pew3 = store.append(Frame::builder("pew").build()).unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "pew");
validate_frame!(recver.recv().await.unwrap(), {
topic: "action.out",
handler: &frame_actor_2,
trigger: &pew3,
});
assert_no_more_frames(&mut recver).await;
}
#[tokio::test]
async fn test_unregister_on_error() {
let (store, _temp_dir) = setup_test_environment().await;
let options = ReadOptions::builder().follow(FollowOption::On).build();
let mut recver = store.read(options).await;
assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");
let frame_trigger = store.append(Frame::builder("trigger").build()).unwrap();
validate_frame!(recver.recv().await.unwrap(), {topic: "trigger"});
let _ = store.append(Frame::builder("trigger").build()).unwrap();
validate_frame!(recver.recv().await.unwrap(), {topic: "trigger"});
let frame_actor = store
.append(
Frame::builder("error.register")
.hash(
store
.cas_insert(
r#"{
run: {|frame, state = null|
let x = {"foo": null}
$x.foo.bar # Will error at runtime - null access
}
start: "first"
}"#,
)
.await
.unwrap(),
)
.build(),
)
.unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "error.register");
assert_eq!(recver.recv().await.unwrap().topic, "error.active");
validate_frame!(recver.recv().await.unwrap(), {
topic: "error.unregistered",
handler: &frame_actor,
trigger: &frame_trigger,
error: "nothing doesn't support cell paths",
});
assert_no_more_frames(&mut recver).await;
}
/// Exercises `return_options`: a custom topic suffix and a `last:1` TTL.
///
/// Output frames are expected on `echo.warble` with `TTL::Last(1)`, and after
/// garbage collection a fresh read should contain only the most recent one.
#[tokio::test]
async fn test_return_options() {
let (store, _temp_dir) = setup_test_environment().await;
let options = ReadOptions::builder().follow(FollowOption::On).build();
let mut recver = store.read(options).await;
assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");
let actor_proto = Frame::builder("echo.register")
.hash(
store
.cas_insert(
r#"{
return_options: {
suffix: ".warble"
ttl: "last:1"
}
run: {|frame, state = null|
if $frame.topic == "ping" {
{out: {reply: "pong"}, next: $state}
} else {
{next: $state}
}
}
}"#,
)
.await
.unwrap(),
)
.build();
let frame_actor = store.append(actor_proto).unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "echo.register");
assert_eq!(recver.recv().await.unwrap().topic, "echo.active");
// First ping: the response uses the configured suffix and TTL.
let frame1 = store.append(Frame::builder("ping").build()).unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "ping");
let response1 = recver.recv().await.unwrap();
assert_eq!(response1.topic, "echo.warble");
assert_eq!(response1.ttl, Some(TTL::Last(1)));
let meta = response1.meta.unwrap();
assert_eq!(meta["actor_id"], frame_actor.id.to_string());
assert_eq!(meta["frame_id"], frame1.id.to_string());
// Second ping: another response, attributed to the second ping.
let frame2 = store.append(Frame::builder("ping").build()).unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "ping");
let response2 = recver.recv().await.unwrap();
assert_eq!(response2.topic, "echo.warble");
let meta = response2.meta.unwrap();
assert_eq!(meta["frame_id"], frame2.id.to_string());
// Let garbage collection finish before inspecting what is still stored.
store.wait_for_gc().await;
// Default options (no follow set here), so collecting the stream below is
// expected to terminate.
let options = ReadOptions::default();
let recver = store.read(options).await;
use tokio_stream::StreamExt;
let frames: Vec<_> = tokio_stream::wrappers::ReceiverStream::new(recver)
.filter(|f| f.topic == "echo.warble")
.collect::<Vec<_>>()
.await;
// Only the response to the second ping should remain.
assert_eq!(frames.len(), 1);
assert_eq!(
frames[0].meta.as_ref().unwrap()["frame_id"],
frame2.id.to_string()
);
}
/// With `return_options.target` set to "cas", a binary `out` value is expected
/// to be written to the CAS and referenced by the output frame's hash; the
/// stored bytes must be real content rather than a serialized null.
#[tokio::test]
async fn test_binary_return_value() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script = r#"{
return_options: { target: "cas" }
run: {|frame, state = null|
if $frame.topic == "trigger" {
{out: ('test' | to msgpackz), next: $state}
} else {
{next: $state}
}
}
}"#;
    let script_hash = store.cas_insert(script).await.unwrap();
    let frame_actor = store
        .append(Frame::builder("binary.register").hash(script_hash).build())
        .unwrap();
    for expected in ["binary.register", "binary.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    let trigger_frame = store.append(Frame::builder("trigger").build()).unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "trigger");

    let output_frame = recver.recv().await.unwrap();
    assert_eq!(output_frame.topic, "binary.out");
    let attrs = output_frame.meta.as_ref().unwrap();
    assert_eq!(attrs["actor_id"], frame_actor.id.to_string());
    assert_eq!(attrs["frame_id"], trigger_frame.id.to_string());

    // Inspect the raw bytes first, then a lossy text view of the same bytes.
    let content_hash = output_frame.hash.as_ref().unwrap();
    let stored_content = store.cas_read(content_hash).await.unwrap();
    assert_ne!(stored_content, b"null");
    assert_ne!(stored_content, b"\"null\"");
    assert!(!stored_content.is_empty());

    let content_str = String::from_utf8_lossy(&stored_content);
    assert!(content_str.contains("test"));
    assert_ne!(content_str, "null");
    assert_ne!(content_str, "\"null\"");

    assert_no_more_frames(&mut recver).await;
}
/// Frames appended from inside the closure via `.append` are expected to
/// arrive before the closure's own `out` frame, each carrying the actor and
/// trigger ids in its meta.
#[tokio::test]
async fn test_custom_append() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script = r#"{
run: {|frame, state = null|
if $frame.topic != "trigger" { {next: $state} } else {
"1" | .append topic1 --meta {"t": "1"}
"2" | .append topic2 --meta {"t": "2"}
{out: {value: "out"}, next: $state}
}
}
}"#;
    let script_hash = store.cas_insert(script).await.unwrap();
    let frame_actor = store
        .append(Frame::builder("action.register").hash(script_hash).build())
        .unwrap();
    for expected in ["action.register", "action.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    let trigger_frame = store.append(Frame::builder("trigger").build()).unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "trigger");

    validate_actor_output_frames!(
        recver,
        frame_actor,
        trigger_frame,
        None,
        ["topic1", "topic2", "action.out"]
    );
    assert_no_more_frames(&mut recver).await;
}
/// Registering a second actor under the same name (`h`) is expected to
/// replace the first: afterwards only the second actor answers a trigger.
#[tokio::test]
async fn test_actor_replacement() {
let (store, _temp_dir) = setup_test_environment().await;
let options = ReadOptions::builder().follow(FollowOption::On).build();
let mut recver = store.read(options).await;
assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");
// First registration: its output is tagged "handler1".
let _ = store
.append(
Frame::builder("h.register")
.hash(
store
.cas_insert(
r#"{run: {|frame, state = null|
if $frame.topic == "trigger" {
{out: {handler: "handler1"}, next: $state}
} else {
{next: $state}
}
}}"#,
)
.await
.unwrap(),
)
.build(),
)
.unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "h.register");
assert_eq!(recver.recv().await.unwrap().topic, "h.active");
// Second registration under the same name: its output is tagged "handler2".
let actor2 = store
.append(
Frame::builder("h.register")
.hash(
store
.cas_insert(
r#"{run: {|frame, state = null|
if $frame.topic == "trigger" {
{out: {handler: "handler2"}, next: $state}
} else {
{next: $state}
}
}}"#,
)
.await
.unwrap(),
)
.build(),
)
.unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "h.register");
// The next two frames are compared as a set, so `h.unregistered` and
// `h.active` are accepted in either order.
let topics: HashSet<_> = [
recver.recv().await.unwrap().topic,
recver.recv().await.unwrap().topic,
]
.into_iter()
.collect();
assert_eq!(
topics,
HashSet::from(["h.unregistered".to_string(), "h.active".to_string(),])
);
// Exactly one response is expected, and it must come from the second actor.
let trigger = store.append(Frame::builder("trigger").build()).unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "trigger");
let response = recver.recv().await.unwrap();
assert_eq!(response.topic, "h.out");
let meta = response.meta.as_ref().unwrap();
assert_eq!(meta["actor_id"], actor2.id.to_string());
assert_eq!(meta["frame_id"], trigger.id.to_string());
assert_eq!(meta["handler"], "handler2");
assert_no_more_frames(&mut recver).await;
}
/// An actor script can `use` a module that was published to the store as a
/// `mymod.nu` frame: the closure calls `mymod add_nums 40 2` and the test
/// expects the output meta to carry `result == "sum is 42"`.
///
/// # Errors
///
/// Returns the store's error if inserting either script into the CAS fails.
#[tokio::test]
async fn test_actor_with_module() -> Result<(), Error> {
    let (store, _temp_dir) = setup_test_environment().await;
    let options = ReadOptions::builder().follow(FollowOption::On).build();
    let mut recver = store.read(options).await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    // Publish the module first so it is already in the stream when the actor
    // registers. The literal stays flush-left to keep its bytes unchanged.
    let module_source = r#"
# Add two numbers and format result
export def add_nums [x, y] {
$"sum is ($x + $y)"
}
"#;
    let module_hash = store.cas_insert(module_source).await?;
    store
        .append(Frame::builder("mymod.nu").hash(module_hash).build())
        .unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "mymod.nu");

    let actor_script = r#"{
run: {|frame, state = null|
if $frame.topic == "trigger" {
use mymod
{out: {result: (mymod add_nums 40 2)}, next: $state}
} else {
{next: $state}
}
}
}"#;
    // `actor_script` is already a `&str`; pass it directly instead of `&&str`,
    // matching every other `cas_insert` call in this file.
    let actor_hash = store.cas_insert(actor_script).await?;
    let frame_actor = store
        .append(Frame::builder("test.register").hash(actor_hash).build())
        .unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "test.register");
    assert_eq!(recver.recv().await.unwrap().topic, "test.active");

    let trigger = store.append(Frame::builder("trigger").build()).unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "trigger");

    let output = recver.recv().await.unwrap();
    validate_actor_output_frame!(&output, "test.out", frame_actor, trigger, None);
    assert_eq!(output.meta.as_ref().unwrap()["result"], "sum is 42");

    assert_no_more_frames(&mut recver).await;
    Ok(())
}
/// Environment set up by the registration script is expected to persist
/// between closure invocations: `$env.abc` starts from the `abc.init` content
/// (42) and grows by the `abc.delta` content (2) on every trigger, so the two
/// outputs are 44 and 46.
#[tokio::test]
async fn test_actor_preserve_env() -> Result<(), Error> {
let (store, _temp_dir) = setup_test_environment().await;
let options = ReadOptions::builder().follow(FollowOption::On).build();
let mut recver = store.read(options).await;
assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");
// Seed value read by the script when it initialises `$env.abc`.
let _ = store
.append(
Frame::builder("abc.init")
.hash(store.cas_insert(r#"42"#).await.unwrap())
.build(),
)
.unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "abc.init");
// Increment applied by `inc-abc` on each call.
let _ = store
.append(
Frame::builder("abc.delta")
.hash(store.cas_insert(r#"2"#).await.unwrap())
.build(),
)
.unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "abc.delta");
// This script is stored through the synchronous CAS insert; a failure there
// is propagated with `?`.
let frame_actor = store
.append(
Frame::builder("test.register")
.hash(store.cas_insert_sync(
r#"
$env.abc = .last abc.init | .cas $in.hash | from json
def --env inc-abc [] {
$env.abc = $env.abc + (.last abc.delta | .cas $in.hash | from json)
$env.abc
}
{
run: {|frame, state = null|
if $frame.topic == "trigger" {
{out: {value: (inc-abc)}, next: $state}
} else {
{next: $state}
}
}
}
"#,
)?)
.build(),
)
.unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "test.register");
assert_eq!(recver.recv().await.unwrap().topic, "test.active");
// First trigger: 42 + 2.
let trigger = store.append(Frame::builder("trigger").build()).unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "trigger");
let output = recver.recv().await.unwrap();
validate_actor_output_frame!(&output, "test.out", frame_actor, trigger, None);
assert_eq!(output.meta.as_ref().unwrap()["value"], 44);
// Second trigger: the previous increment is still in effect, so 44 + 2.
let trigger = store.append(Frame::builder("trigger").build()).unwrap();
assert_eq!(recver.recv().await.unwrap().topic, "trigger");
let output = recver.recv().await.unwrap();
validate_actor_output_frame!(&output, "test.out", frame_actor, trigger, None);
assert_eq!(output.meta.as_ref().unwrap()["value"], 46);
assert_no_more_frames(&mut recver).await;
Ok(())
}
/// The value returned as `next` is expected to be passed back in as `state`
/// on the following call: three triggers report counts 0, 1 and 2, and an
/// unrelated frame in between produces no output and leaves the count alone.
#[tokio::test]
async fn test_state_threading() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let counter_script = r#"{
run: {|frame, state = 0|
if $frame.topic == "trigger" {
{out: {count: $state}, next: ($state + 1)}
} else {
{next: $state}
}
}
}"#;
    let script_hash = store.cas_insert(counter_script).await.unwrap();
    let frame_actor = store
        .append(Frame::builder("counter.register").hash(script_hash).build())
        .unwrap();
    for expected in ["counter.register", "counter.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    // First trigger sees the default state.
    store.append(Frame::builder("trigger").build()).unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "trigger");
    let first = recver.recv().await.unwrap();
    assert_eq!(first.topic, "counter.out");
    assert_eq!(first.meta.as_ref().unwrap()["count"], 0);

    // A non-trigger frame yields no output frame.
    store.append(Frame::builder("noise").build()).unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "noise");
    assert_no_more_frames(&mut recver).await;

    store.append(Frame::builder("trigger").build()).unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "trigger");
    let second = recver.recv().await.unwrap();
    assert_eq!(second.topic, "counter.out");
    assert_eq!(second.meta.as_ref().unwrap()["count"], 1);

    store.append(Frame::builder("trigger").build()).unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "trigger");
    let third = recver.recv().await.unwrap();
    assert_eq!(third.topic, "counter.out");
    let third_meta = third.meta.as_ref().unwrap();
    assert_eq!(third_meta["actor_id"], frame_actor.id.to_string());
    assert_eq!(third_meta["count"], 2);

    assert_no_more_frames(&mut recver).await;
}
/// A closure that returns `out` without `next` is expected to emit that
/// output once and then be unregistered, with the `.unregistered` frame
/// naming the actor and the frame that triggered it.
#[tokio::test]
async fn test_out_only_stops() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script = r#"{
run: {|frame, state = null|
{out: {msg: "goodbye"}}
}
start: "first"
}"#;
    let script_hash = store.cas_insert(script).await.unwrap();
    let frame_actor = store
        .append(Frame::builder("stopper.register").hash(script_hash).build())
        .unwrap();
    // The trigger is appended before any of the registration frames are read.
    let trigger = store.append(Frame::builder("trigger").build()).unwrap();

    for expected in ["stopper.register", "trigger", "stopper.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    let farewell = recver.recv().await.unwrap();
    assert_eq!(farewell.topic, "stopper.out");
    assert_eq!(farewell.meta.as_ref().unwrap()["msg"], "goodbye");

    let stop_notice = recver.recv().await.unwrap();
    validate_frame!(stop_notice, {
        topic: "stopper.unregistered",
        handler: &frame_actor,
        trigger: &trigger,
    });
    assert_no_more_frames(&mut recver).await;
}
/// A closure that returns `null` is expected to be unregistered without
/// emitting any output frame.
#[tokio::test]
async fn test_nothing_stops() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script = r#"{
run: {|frame, state = null|
null
}
start: "first"
}"#;
    let script_hash = store.cas_insert(script).await.unwrap();
    let frame_actor = store
        .append(Frame::builder("stopper.register").hash(script_hash).build())
        .unwrap();
    let trigger = store.append(Frame::builder("trigger").build()).unwrap();

    for expected in ["stopper.register", "trigger", "stopper.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    let stop_notice = recver.recv().await.unwrap();
    validate_frame!(stop_notice, {
        topic: "stopper.unregistered",
        handler: &frame_actor,
        trigger: &trigger,
    });
    assert_no_more_frames(&mut recver).await;
}
/// A return record containing a key other than `out`/`next` is expected to
/// unregister the actor with an error naming the unexpected key.
#[tokio::test]
async fn test_extra_keys_error() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script = r#"{
run: {|frame, state = null|
{out: 1, next: 2, bad: 3}
}
start: "first"
}"#;
    let script_hash = store.cas_insert(script).await.unwrap();
    let frame_actor = store
        .append(Frame::builder("bad.register").hash(script_hash).build())
        .unwrap();
    let trigger = store.append(Frame::builder("trigger").build()).unwrap();

    for expected in ["bad.register", "trigger", "bad.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    let failure = recver.recv().await.unwrap();
    validate_frame!(failure, {
        topic: "bad.unregistered",
        handler: &frame_actor,
        trigger: &trigger,
        error: "Unexpected key 'bad'",
    });
    assert_no_more_frames(&mut recver).await;
}
/// When the script sets `initial: 10`, the first call is expected to see
/// state 10 even though the closure parameter defaults to 0; the second call
/// then sees 11.
#[tokio::test]
async fn test_initial_config() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script = r#"{
initial: 10
run: {|frame, state = 0|
if $frame.topic == "trigger" {
{out: {count: $state}, next: ($state + 1)}
} else {
{next: $state}
}
}
}"#;
    let script_hash = store.cas_insert(script).await.unwrap();
    store
        .append(Frame::builder("counter.register").hash(script_hash).build())
        .unwrap();
    for expected in ["counter.register", "counter.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    for expected_count in [10, 11] {
        store.append(Frame::builder("trigger").build()).unwrap();
        assert_eq!(recver.recv().await.unwrap().topic, "trigger");
        let output = recver.recv().await.unwrap();
        assert_eq!(output.topic, "counter.out");
        assert_eq!(output.meta.as_ref().unwrap()["count"], expected_count);
    }
    assert_no_more_frames(&mut recver).await;
}
/// With `initial: 100` and a `state` parameter that has no default, the first
/// call is expected to receive 100 as its state.
#[tokio::test]
async fn test_initial_config_required_state() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script = r#"{
initial: 100
run: {|frame, state|
if $frame.topic == "trigger" {
{out: {count: $state}, next: ($state + 1)}
} else {
{next: $state}
}
}
}"#;
    let script_hash = store.cas_insert(script).await.unwrap();
    store
        .append(Frame::builder("counter.register").hash(script_hash).build())
        .unwrap();
    for expected in ["counter.register", "counter.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    store.append(Frame::builder("trigger").build()).unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "trigger");
    let counted = recver.recv().await.unwrap();
    assert_eq!(counted.topic, "counter.out");
    assert_eq!(counted.meta.as_ref().unwrap()["count"], 100);

    assert_no_more_frames(&mut recver).await;
}
/// With neither an `initial` value nor a parameter default, `state` is
/// expected to be null on the first call and non-null on the second, after
/// the closure has returned `next: 42`.
#[tokio::test]
async fn test_required_state_defaults_to_null() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script = r#"{
run: {|frame, state|
if $frame.topic == "trigger" {
{out: {is_null: ($state == null)}, next: 42}
} else {
{next: $state}
}
}
}"#;
    let script_hash = store.cas_insert(script).await.unwrap();
    store
        .append(Frame::builder("test.register").hash(script_hash).build())
        .unwrap();
    for expected in ["test.register", "test.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    for expect_null in [true, false] {
        store.append(Frame::builder("trigger").build()).unwrap();
        assert_eq!(recver.recv().await.unwrap().topic, "trigger");
        let output = recver.recv().await.unwrap();
        assert_eq!(output.topic, "test.out");
        assert_eq!(output.meta.as_ref().unwrap()["is_null"], expect_null);
    }
    assert_no_more_frames(&mut recver).await;
}
/// Without an `initial` entry, the closure parameter's own default
/// (`state = 42`) is expected to supply the first state: the reported counts
/// are 42 and then 43.
#[tokio::test]
async fn test_default_param_value() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script = r#"{
run: {|frame, state = 42|
if $frame.topic == "trigger" {
{out: {count: $state}, next: ($state + 1)}
} else {
{next: $state}
}
}
}"#;
    let script_hash = store.cas_insert(script).await.unwrap();
    store
        .append(Frame::builder("counter.register").hash(script_hash).build())
        .unwrap();
    for expected in ["counter.register", "counter.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    for expected_count in [42, 43] {
        store.append(Frame::builder("trigger").build()).unwrap();
        assert_eq!(recver.recv().await.unwrap().topic, "trigger");
        let output = recver.recv().await.unwrap();
        assert_eq!(output.topic, "counter.out");
        assert_eq!(output.meta.as_ref().unwrap()["count"], expected_count);
    }
    assert_no_more_frames(&mut recver).await;
}
/// A closure that returns a bare string instead of a record is expected to be
/// unregistered with the error "Closure must return a record".
#[tokio::test]
async fn test_non_record_return_error() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script = r#"{
run: {|frame, state = null|
"bare string"
}
start: "first"
}"#;
    let script_hash = store.cas_insert(script).await.unwrap();
    let frame_actor = store
        .append(Frame::builder("bad.register").hash(script_hash).build())
        .unwrap();
    let trigger = store.append(Frame::builder("trigger").build()).unwrap();

    for expected in ["bad.register", "trigger", "bad.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    let failure = recver.recv().await.unwrap();
    validate_frame!(failure, {
        topic: "bad.unregistered",
        handler: &frame_actor,
        trigger: &trigger,
        error: "Closure must return a record",
    });
    assert_no_more_frames(&mut recver).await;
}
/// Panics if another frame arrives on `recver` within 50 ms; otherwise
/// returns once that window has elapsed.
///
/// A `None` from `recv` does not match the `Some(..)` pattern, so in that
/// case only the timer branch can complete the `select!`.
async fn assert_no_more_frames(recver: &mut tokio::sync::mpsc::Receiver<Frame>) {
    let quiet_window = std::time::Duration::from_millis(50);
    tokio::select! {
        Some(unexpected) = recver.recv() => {
            panic!("Unexpected frame processed: {:?}", unexpected);
        }
        _ = tokio::time::sleep(quiet_window) => {}
    }
}
/// A record returned as `out` is expected to land in the output frame's meta
/// (next to `actor_id` and `frame_id`) and not in the CAS, so the frame has
/// no hash.
#[tokio::test]
async fn test_record_out_goes_to_meta() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script = r#"{
run: {|frame, state = 0|
if $frame.topic == "sale" {
let total = $state + $frame.meta.amount
{out: {total: $total}, next: $total}
} else {
{next: $state}
}
}
}"#;
    let script_hash = store.cas_insert(script).await.unwrap();
    store
        .append(Frame::builder("metaout.register").hash(script_hash).build())
        .unwrap();
    for expected in ["metaout.register", "metaout.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    let sale = Frame::builder("sale")
        .meta(serde_json::json!({"amount": 42}))
        .build();
    store.append(sale).unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "sale");

    let out_frame = recver.recv().await.unwrap();
    assert_eq!(out_frame.topic, "metaout.out");
    assert!(
        out_frame.hash.is_none(),
        "Record output should not be stored in CAS"
    );
    let attrs = out_frame.meta.as_ref().unwrap();
    assert_eq!(attrs["total"], 42);
    assert!(attrs.get("actor_id").is_some());
    assert!(attrs.get("frame_id").is_some());

    assert_no_more_frames(&mut recver).await;
}
/// Returning a plain string as `out` without `return_options.target` set to
/// "cas" is expected to unregister the actor with an error that mentions
/// either "record" or "target".
#[tokio::test]
async fn test_non_record_out_errors_without_cas_target() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script = r#"{
run: {|frame, state = null|
{out: "a plain string", next: $state}
}
}"#;
    let script_hash = store.cas_insert(script).await.unwrap();
    store
        .append(Frame::builder("strerr.register").hash(script_hash).build())
        .unwrap();
    for expected in ["strerr.register", "strerr.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    store.append(Frame::builder("ping").build()).unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "ping");

    let failure = recver.recv().await.unwrap();
    assert_eq!(failure.topic, "strerr.unregistered");
    let error = failure.meta.as_ref().unwrap()["error"].as_str().unwrap();
    let mentions_cause = ["record", "target"]
        .into_iter()
        .any(|needle| error.contains(needle));
    assert!(
        mentions_cause,
        "Expected error about non-record output, got: {error}"
    );

    assert_no_more_frames(&mut recver).await;
}
/// With `return_options.target` set to "cas", a plain string `out` is
/// expected to be accepted: the output frame carries a hash, and the content
/// read back from the CAS contains the string.
#[tokio::test]
async fn test_cas_target_allows_non_record_out() {
    let (store, _temp_dir) = setup_test_environment().await;
    let mut recver = store
        .read(ReadOptions::builder().follow(FollowOption::On).build())
        .await;
    assert_eq!(recver.recv().await.unwrap().topic, "xs.threshold");

    let script = r#"{
run: {|frame, state = null|
{out: "stored in cas", next: $state}
}
return_options: { target: "cas" }
}"#;
    let script_hash = store.cas_insert(script).await.unwrap();
    store
        .append(Frame::builder("cascfg.register").hash(script_hash).build())
        .unwrap();
    for expected in ["cascfg.register", "cascfg.active"] {
        assert_eq!(recver.recv().await.unwrap().topic, expected);
    }

    store.append(Frame::builder("ping").build()).unwrap();
    assert_eq!(recver.recv().await.unwrap().topic, "ping");

    let out_frame = recver.recv().await.unwrap();
    assert_eq!(out_frame.topic, "cascfg.out");
    assert!(out_frame.hash.is_some(), "CAS target should store in CAS");

    let content_hash = out_frame.hash.as_ref().unwrap();
    let content = store.cas_read(content_hash).await.unwrap();
    let content_str = std::str::from_utf8(&content).unwrap();
    assert!(content_str.contains("stored in cas"));

    assert_no_more_frames(&mut recver).await;
}
/// Creates a store inside a fresh temporary directory and spawns the actor
/// processor against a clone of it.
///
/// The `TempDir` is returned alongside the store so the caller decides how
/// long the directory guard lives.
async fn setup_test_environment() -> (Store, TempDir) {
    let temp_dir = TempDir::new().unwrap();
    let store = Store::new(temp_dir.path().to_path_buf()).unwrap();

    let processor_store = store.clone();
    let processor = tokio::spawn(async move {
        crate::processor::actor::run(processor_store).await.unwrap();
    });
    // The join handle is dropped right away; this function never awaits the task.
    drop(processor);

    (store, temp_dir)
}