#[cfg(test)]
mod tests {
use crate::channel::manager::{ChannelManager, IncomingHandler, ManagedChannel};
use crate::channel::node::ChannelsRegistry;
use crate::channel::wrapper::tests::make_wrapper;
use crate::config::{ConfigManager, MapConfigManager};
use crate::executor::Executor;
use crate::flow::manager::{
ChannelNodeConfig, ExecutionReport, Flow, FlowManager, NodeConfig, NodeKind, ResolveError,
TemplateContext, ToolNodeConfig, ValueOrTemplate,
};
use crate::flow::session::{InMemorySessionStore, SessionStoreType};
use crate::flow::state::{InMemoryState, SessionStateType, StateValue};
use crate::logger::{LogConfig, Logger, OpenTelemetryLogger};
use crate::mapper::{CopyKey, CopyMapper, Mapper};
use crate::message::Message;
use crate::node::{ChannelOrigin, NodeContext, NodeErr, NodeError, NodeOut, NodeType, Routing};
use crate::process::debug_process::DebugProcessNode;
use crate::process::manager::{BuiltInProcess, ProcessManager};
use crate::process::script_process::ScriptProcessNode;
use crate::secret::{TestSecretsManager, SecretsManager};
use async_trait::async_trait;
use channel_plugin::message::{MessageContent, MessageDirection, Participant};
use dashmap::DashMap;
use petgraph::visit::Topo;
use schemars::{JsonSchema, Schema, schema_for};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tempfile::TempDir;
impl Flow {
pub fn equal_for_test(&self, other: &Self) -> bool {
self.id() == other.id()
&& self.title() == other.title()
&& self.description() == other.description()
&& self.channels() == other.channels()
&& self.nodes() == other.nodes()
&& self.connections() == other.connections()
}
}
fn dummy_flow(name: &str) -> Flow {
let flow = Flow::new(name.to_string(), name.to_string(), "dummy".to_string())
.add_node("start".to_string(), dummy_tool_node())
.build();
flow
}
fn dummy_tool_node() -> NodeConfig {
NodeConfig {
id: "start".to_string(),
kind: NodeKind::Tool {
tool: ToolNodeConfig {
name: "noop".to_string(),
action: "noop".to_string(),
in_map: None,
out_map: None,
err_map: None,
on_ok: None,
on_err: None,
},
},
config: None,
max_retries: Some(0),
retry_delay_secs: Some(0),
}
}
fn make_executor() -> Arc<Executor> {
let secrets = SecretsManager(TestSecretsManager::new());
let logger = Logger(Box::new(OpenTelemetryLogger::new()));
Executor::new(secrets, logger)
}
struct DummyCtx;
impl TemplateContext for DummyCtx {
fn render_template(&self, template: &str) -> Result<String, String> {
Ok(template.to_string())
}
}
fn make_ctx() -> NodeContext {
NodeContext::new(
"123".to_string(),
InMemoryState::new(), DashMap::new(), Executor::dummy(),
ChannelManager::dummy(),
ProcessManager::dummy(),
SecretsManager(TestSecretsManager::new()),
None, )
}
#[derive(Clone, JsonSchema, Debug, Serialize, Deserialize)]
struct FailableNode {
#[serde(skip)]
#[schemars(skip)]
counter: Arc<AtomicUsize>,
}
#[async_trait]
#[typetag::serde]
impl NodeType for FailableNode {
fn type_name(&self) -> String {
"failable".into()
}
fn schema(&self) -> Schema {
schema_for!(FailableNode)
}
async fn process(
&self,
msg: Message,
_ctx: &mut NodeContext,
) -> Result<NodeOut, crate::node::NodeErr> {
let prev = self.counter.fetch_add(1, Ordering::SeqCst);
if prev == 0 {
Err(NodeErr::fail(NodeError::ExecutionFailed("boom".into())))
} else {
Ok(NodeOut::with_routing(msg, Routing::FollowGraph))
}
}
fn clone_box(&self) -> Box<dyn crate::node::NodeType> {
Box::new(self.clone())
}
}
#[tokio::test]
async fn test_linear_run() {
let dbg = BuiltInProcess::Debug(DebugProcessNode { print: false });
let flow = Flow::new("linear", "Linear", "A ā B ā C")
.add_node(
"start".into(),
NodeConfig::new(
"start",
NodeKind::Process {
process: dbg.clone(),
},
None,
),
)
.add_node(
"middle".into(),
NodeConfig::new(
"middle",
NodeKind::Process {
process: dbg.clone(),
},
None,
),
)
.add_node(
"end".into(),
NodeConfig::new(
"end",
NodeKind::Process {
process: dbg.clone(),
},
None,
),
)
.add_connection("start".into(), vec!["middle".into()])
.add_connection("middle".into(), vec!["end".into()])
.build();
let mut ctx = make_ctx();
let msg = Message::new("m1", json!({"foo":"bar"}), "123".to_string());
let report = flow.clone().run(msg.clone(), "start", &mut ctx).await;
assert!(report.error.is_none());
assert_eq!(report.records.len(), 3);
let ids: Vec<_> = report.records.iter().map(|r| r.node_id.as_str()).collect();
assert_eq!(ids, &["start", "middle", "end"]);
for rec in report.records {
assert!(matches!(rec.result, Ok(ref out) if out.message().payload() == msg.payload()));
}
}
#[tokio::test]
async fn test_branch_and_merge() {
let dbg = BuiltInProcess::Debug(DebugProcessNode { print: false });
let flow = Flow::new("branch", "Branch & Merge", "")
.add_node(
"A".into(),
NodeConfig::new(
"A",
NodeKind::Process {
process: dbg.clone(),
},
None,
),
)
.add_node(
"B".into(),
NodeConfig::new(
"B",
NodeKind::Process {
process: dbg.clone(),
},
None,
),
)
.add_node(
"C".into(),
NodeConfig::new(
"C",
NodeKind::Process {
process: dbg.clone(),
},
None,
),
)
.add_node(
"D".into(),
NodeConfig::new(
"D",
NodeKind::Process {
process: dbg.clone(),
},
None,
),
)
.add_connection("A".into(), vec!["B".into(), "C".into()])
.add_connection("B".into(), vec!["D".into()])
.add_connection("C".into(), vec!["D".into()])
.build();
let mut ctx = make_ctx();
let input = Message::new("m2", json!({"val":123}), "123".to_string());
let report = flow.clone().run(input.clone(), "A", &mut ctx).await;
assert!(report.error.is_none());
let ids: Vec<_> = report.records.iter().map(|r| r.node_id.as_str()).collect();
assert_eq!(ids, &["A", "B", "C", "D", "D"]);
assert!(report.error.is_none());
let ids: Vec<_> = report.records.iter().map(|r| r.node_id.as_str()).collect();
assert_eq!(ids, &["A", "B", "C", "D", "D"]);
for r in &report.records[0..3] {
assert!(matches!(r.result, Ok(ref o) if o.message().payload() == input.payload()));
}
let d_records: Vec<_> = report.records.iter().filter(|r| r.node_id == "D").collect();
assert_eq!(d_records.len(), 2);
for r in d_records {
match &r.result {
Ok(out) => {
assert_eq!(out.message().payload(), input.payload());
}
Err(e) => panic!("D failed with error: {:?}", e),
}
}
}
#[tokio::test]
async fn test_retry_once() {
let fp = BuiltInProcess::Script(ScriptProcessNode::new(
r#"
if "tries" !in state {
state["tries"] = 1;
throw "boom";
} else {
payload
}
"#
.to_string(),
));
let process = NodeConfig::new(
"start",
NodeKind::Process {
process: fp.clone(),
},
None,
);
let debug = NodeConfig::new(
"end",
NodeKind::Process {
process: BuiltInProcess::Debug(DebugProcessNode { print: false }),
},
None,
);
let flow = Flow::new("retry", "Retry", "fail once then succeed")
.add_node("start".into(), process)
.add_node("end".into(), debug)
.add_connection("start".into(), vec!["end".into()])
.build();
flow.nodes().get_mut("start").unwrap().max_retries = Some(1);
let mut ctx = make_ctx();
let input = Message::new("m-retry", json!({"x":1}), "123".to_string());
let report = flow.run(input.clone(), "start", &mut ctx).await;
let recs = &report.records;
assert_eq!(recs.len(), 3);
assert_eq!(recs[0].attempt, 0);
assert!(matches!(recs[0].result, Err(_)));
assert_eq!(recs[1].attempt, 1);
assert!(matches!(recs[1].result, Ok(_)));
assert_eq!(recs[2].node_id, "end");
assert!(report.error.is_none());
}
#[tokio::test]
async fn test_out_only_override() {
let a_node = BuiltInProcess::Script(ScriptProcessNode::new(
r#"
if "tries" !in state {
state["tries"] = 1;
throw "boom";
} else {
let json = #{
"__greentic": #{
"payload": payload,
"out": ["X"]
}
};
return json;
}
"#
.to_string(),
));
let dbg = BuiltInProcess::Debug(DebugProcessNode { print: false });
let flow = Flow::new("override", "out_only override", "")
.add_node(
"A".into(),
NodeConfig::new("A", NodeKind::Process { process: a_node }, None),
)
.add_node(
"X".into(),
NodeConfig::new(
"X",
NodeKind::Process {
process: dbg.clone(),
},
None,
),
)
.add_node(
"Y".into(),
NodeConfig::new(
"Y",
NodeKind::Process {
process: dbg.clone(),
},
None,
),
)
.add_connection("A".into(), vec!["X".into(), "Y".into()])
.build();
let mut ctx = make_ctx();
let input = Message::new("m-o", json!({"ok":true}), "123".to_string());
let report = flow.run(input.clone(), "A", &mut ctx).await;
assert!(report.error.is_none());
let ids: Vec<_> = report.records.iter().map(|r| r.node_id.as_str()).collect();
assert_eq!(ids, &["A", "A", "X"]);
}
#[tokio::test]
async fn test_err_only_override() {
let a_node = BuiltInProcess::Script(ScriptProcessNode::new(
r#"
if "tries" !in state {
state["tries"] = 1;
throw "boom";
} else {
let json = #{
"__greentic": #{
"payload": payload,
"err": ["Z"]
}
};
return json;
}
"#
.to_string(),
));
let dbg = BuiltInProcess::Debug(DebugProcessNode { print: false });
let flow = Flow::new("test_err_only_override", "test_err_only_override", "")
.add_node(
"A".into(),
NodeConfig::new("A", NodeKind::Process { process: a_node }, None),
)
.add_node(
"Z".into(),
NodeConfig::new(
"Z",
NodeKind::Process {
process: dbg.clone(),
},
None,
),
)
.add_connection("A".into(), vec!["Z".into()])
.build();
let mut ctx = make_ctx();
let input = Message::new("m-o", json!({"ok":true}), "123".to_string());
let report = flow.run(input.clone(), "A", &mut ctx).await;
assert!(report.error.is_none());
let ids: Vec<_> = report.records.iter().map(|r| r.node_id.as_str()).collect();
assert_eq!(ids, &["A", "A", "Z"]);
let attempt_0 = &report.records[0];
assert!(attempt_0.result.is_err());
let attempt_1 = &report.records[1];
assert!(attempt_1.result.is_err());
}
#[tokio::test]
async fn test_err_and_out_prefers_err() {
let a_node = BuiltInProcess::Script(ScriptProcessNode::new(
r#"
if "tries" !in state {
state["tries"] = 1;
throw boom;
} else {
let json = #{
"__greentic": #{
"payload": payload,
"out": ["Y"],
"err": ["Z"]
}
};
return json;
}
"#
.to_string(),
));
let dbg = BuiltInProcess::Debug(DebugProcessNode { print: false });
let flow = Flow::new(
"test_err_and_out_prefers_err",
"test_err_and_out_prefers_err",
"",
)
.add_node(
"A".into(),
NodeConfig::new("A", NodeKind::Process { process: a_node }, None),
)
.add_node(
"Y".into(),
NodeConfig::new(
"Y",
NodeKind::Process {
process: dbg.clone(),
},
None,
),
)
.add_node(
"Z".into(),
NodeConfig::new(
"Z",
NodeKind::Process {
process: dbg.clone(),
},
None,
),
)
.add_connection("A".into(), vec!["Y".into(), "Z".into()])
.build();
let mut ctx = make_ctx();
let input = Message::new("m-o", json!({"ok":true}), "123".to_string());
let report = flow.run(input.clone(), "A", &mut ctx).await;
assert!(report.error.is_none());
let ids: Vec<_> = report.records.iter().map(|r| r.node_id.as_str()).collect();
assert_eq!(ids, &["A", "A", "Z"]);
}
#[tokio::test]
async fn test_channel_out_node() {
let channel = NodeConfig::new(
"chan",
NodeKind::Channel {
cfg: ChannelNodeConfig {
channel_name: "mock".into(),
channel_in: false,
channel_out: true,
channel_remote: false,
from: None,
to: Some(vec![ValueOrTemplate::Value(Participant::new(
"dbg".into(),
None,
None,
))]),
content: None,
thread_id: None,
reply_to_id: None,
},
},
None,
);
let process = NodeConfig::new(
"dbg",
NodeKind::Process {
process: BuiltInProcess::Debug(DebugProcessNode { print: false }),
},
None,
);
let flow = Flow::new("chanout", "Channel Out", "")
.add_channel("mock".to_string())
.add_node("chan".into(), channel)
.add_node("dbg".into(), process)
.add_connection("chan".into(), vec!["dbg".into()])
.build();
let mut ctx = make_ctx();
let cm = ctx.channel_manager();
let wrapper = make_wrapper().await;
let mock = ManagedChannel::new(wrapper, None, None);
assert!(cm.register_channel("mock".to_string(), mock).await.is_ok());
let m = Message::new("m-c", json!({"foo":"bar"}), "123".to_string());
let report = flow.run(m.clone(), "chan", &mut ctx).await;
assert!(report.error.is_none());
let ids: Vec<_> = report.records.iter().map(|r| r.node_id.as_str()).collect();
assert_eq!(ids, &["chan", "dbg"]);
assert!(
matches!(report.records[1].result, Ok(ref o) if o.message().payload() == m.payload())
);
}
#[test]
fn value_or_template_resolves_value_directly() {
let v: ValueOrTemplate<i32> = ValueOrTemplate::Value(100);
let ctx = DummyCtx;
assert_eq!(v.resolve(&ctx).unwrap(), 100);
}
#[test]
fn value_or_template_resolves_from_template() {
let tmpl: ValueOrTemplate<String> = ValueOrTemplate::Template("\"hello world\"".into());
let ctx = DummyCtx;
assert_eq!(tmpl.resolve(&ctx).unwrap(), "hello world");
}
#[test]
fn value_or_template_parse_error() {
let bad: ValueOrTemplate<i32> = ValueOrTemplate::Template("not a number".into());
let ctx = DummyCtx;
match bad.resolve(&ctx) {
Err(ResolveError::Parse(_)) => {}
other => panic!("Expected Parse error, got: {:?}", other),
}
}
#[tokio::test]
async fn create_out_msg_error_on_missing_to_and_no_origin() {
let executor = make_executor();
let secrets = SecretsManager(TestSecretsManager::new());
let cfg_mgr = ConfigManager(MapConfigManager::new());
let store = InMemorySessionStore::new(10);
let channel_mgr =
ChannelManager::new(cfg_mgr, secrets.clone(), "123".to_string(), store, LogConfig::default())
.await
.expect("channel manager");
let tempdir = TempDir::new().unwrap();
let process_mgr = ProcessManager::new(tempdir.path()).unwrap();
let ctx = NodeContext::new(
"123".to_string(),
InMemoryState::new(),
DashMap::new(),
executor.clone(),
channel_mgr.clone(),
Arc::new(process_mgr.clone()),
secrets.clone(),
None,
);
let cfg = ChannelNodeConfig {
channel_name: "test".into(),
channel_in: false,
channel_out: true,
channel_remote: false,
from: None,
to: None,
content: None,
thread_id: None,
reply_to_id: None,
};
let result = cfg.create_out_msg(
&ctx,
"id1".into(),
"123".to_string(),
json!("payload"),
MessageDirection::Outgoing,
);
assert!(result.is_err(), "Expected error, got {:?}", result);
}
#[tokio::test]
async fn create_out_msg_uses_template_for_to_and_content() {
let executor = make_executor();
let secrets = SecretsManager(TestSecretsManager::new());
let cfg_mgr = ConfigManager(MapConfigManager::new());
let store = InMemorySessionStore::new(10);
let channel_mgr =
ChannelManager::new(cfg_mgr, secrets.clone(),"123".to_string(), store, LogConfig::default())
.await
.expect("channel manager");
let state = InMemoryState::new();
let part_json = json!({ "id": "p1", "display_name": "Alice", "channel_specific_id": "a1" });
let part_val: StateValue = serde_json::from_value(part_json.clone()).unwrap();
state.set("recipient".into(), part_val);
let tempdir = TempDir::new().unwrap();
let process_mgr = ProcessManager::new(tempdir.path()).unwrap();
let ctx = NodeContext::new(
"123".to_string(),
state,
DashMap::new(),
executor.clone(),
channel_mgr.clone(),
Arc::new(process_mgr.clone()),
secrets.clone(),
None,
);
let cfg = ChannelNodeConfig {
channel_name: "ch".into(),
channel_in: false,
channel_out: true,
channel_remote: false,
from: None,
to: Some(vec![ValueOrTemplate::Template("{{recipient.id}}".into())]),
content: Some(ValueOrTemplate::Value(MessageContent::Text {
text: "fixed".into(),
})),
thread_id: None,
reply_to_id: None,
};
let msg = cfg
.create_out_msg(
&ctx,
"id2".into(),
"123".to_string(),
json!("ignored"),
MessageDirection::Outgoing,
)
.expect("message can be produced");
assert_eq!(msg.to.len(), 1);
let rcpt = &msg.to[0];
assert_eq!(rcpt.id, "p1");
assert_eq!(
msg.content,
vec![MessageContent::Text {
text: "fixed".into()
}]
);
}
#[test]
fn complex_flow_serializes_and_validates_against_schema() {
let mock_in = NodeConfig::new(
"mock_in".to_string(),
NodeKind::Channel {
cfg: ChannelNodeConfig {
channel_name: "mock".to_string(),
channel_in: true,
channel_out: false,
channel_remote: false,
from: None,
to: None,
content: None,
thread_id: None,
reply_to_id: None,
},
},
None,
);
let mock_middle = NodeConfig::new(
"mock_middle".to_string(),
NodeKind::Channel {
cfg: ChannelNodeConfig {
channel_name: "mock".to_string(),
channel_in: true,
channel_out: true,
channel_remote: false,
from: None,
to: None,
content: None,
thread_id: None,
reply_to_id: None,
},
},
None,
);
let mock_out = NodeConfig::new(
"mock_out".to_string(),
NodeKind::Channel {
cfg: ChannelNodeConfig {
channel_name: "mock".to_string(),
channel_in: false,
channel_out: true,
channel_remote: false,
from: None,
to: None,
content: None,
thread_id: None,
reply_to_id: None,
},
},
None,
);
let weather_in = NodeConfig::new(
"weather_in".to_string(),
NodeKind::Tool {
tool: ToolNodeConfig {
name: "weather_api".to_string(),
action: "forecast_weather".to_string(),
in_map: Some(Mapper::Copy(CopyMapper {
payload: Some(vec![
CopyKey::Key("q".to_string()),
CopyKey::Key("days".to_string()),
]),
config: None,
state: None,
})),
out_map: None,
err_map: None,
on_ok: None,
on_err: None,
},
},
None,
)
.with_retry(2, 1);
let weather_out = NodeConfig::new(
"weather_out".to_string(),
NodeKind::Tool {
tool: ToolNodeConfig {
name: "weather_api".to_string(),
action: "forecast_weather".to_string(),
in_map: Some(Mapper::Copy(CopyMapper {
payload: Some(vec![
CopyKey::Key("q".to_string()),
CopyKey::Key("days".to_string()),
]),
config: None,
state: None,
})),
out_map: None,
err_map: None,
on_ok: None,
on_err: None,
},
},
None,
)
.with_retry(2, 1);
let flow = Flow::new(
"sample.greentic".to_string(),
"TelegramāWeather Forecast Flow".to_string(),
"A sample flow".to_string(),
)
.add_channel("mock".to_string())
.add_node("mock_in".to_string(), mock_in)
.add_node("mock_middle".to_string(), mock_middle)
.add_node("mock_out".to_string(), mock_out)
.add_node("weather_in".to_string(), weather_in)
.add_node("weather_out".to_string(), weather_out)
.add_connection("mock_in".to_string(), vec!["weather_in".to_string()])
.add_connection("weather_in".to_string(), vec!["mock_middle".to_string()])
.add_connection("mock_middle".to_string(), vec!["weather_out".to_string()])
.add_connection("weather_out".to_string(), vec!["mock_out".to_string()])
.build();
let schema = schema_for!(Flow);
let schema_json = serde_json::to_value(&schema).unwrap();
let instance = serde_json::to_value(&flow).unwrap();
let compiled = jsonschema::validator_for(&schema_json).expect("schema compiles");
assert!(compiled.is_valid(&instance), "instance did not validate");
let rendered = serde_json::to_string_pretty(&instance).unwrap();
println!("{}", rendered);
let expected = r#"
{
"id": "sample.greentic",
"title": "TelegramāWeather Forecast Flow",
"description": "A sample flow",
"channels": [
"mock"
],
"nodes": {
"mock_in": {
"channel": "mock",
"max_retries": 3,
"retry_delay_secs": 1,
"in": true
},
"mock_middle": {
"channel": "mock",
"max_retries": 3,
"retry_delay_secs": 1,
"in": true
},
"mock_middle__out": {
"channel": "mock_out",
"out": true
},
"mock_out": {
"channel": "mock",
"max_retries": 3,
"retry_delay_secs": 1,
"out": true
},
"weather_in": {
"tool": {
"name": "weather_api",
"action": "forecast_weather",
"in_map": { "type": "copy", "payload": ["q", "days"] }
},
"max_retries": 2,
"retry_delay_secs": 1
},
"weather_out": {
"tool": {
"name": "weather_api",
"action": "forecast_weather",
"in_map": { "type": "copy", "payload": ["q", "days"] }
},
"max_retries": 2,
"retry_delay_secs": 1
}
},
"connections": {
"mock_in": ["weather_in"],
"mock_middle": ["mock_middle__out"],
"mock_middle__out":["weather_out"],
"weather_in": ["mock_middle"],
"weather_out": ["mock_out"]
}
}
"#;
let expected_value: serde_json::Value = serde_json::from_str(expected).unwrap();
assert_eq!(instance, expected_value);
}
#[test]
fn json_roundtrip_and_build_graph() {
let n1 = NodeConfig {
id: "n1".into(),
kind: NodeKind::Channel {
cfg: ChannelNodeConfig {
channel_name: "slack".to_string(),
channel_in: true,
channel_out: false,
channel_remote: false,
from: None,
to: None,
content: None,
thread_id: None,
reply_to_id: None,
},
},
config: None,
max_retries: Some(2),
retry_delay_secs: Some(0),
};
let n2 = NodeConfig {
id: "n2".into(),
kind: NodeKind::Channel {
cfg: ChannelNodeConfig {
channel_name: "slack".to_string(),
channel_in: false,
channel_out: true,
channel_remote: false,
from: None,
to: None,
content: None,
thread_id: None,
reply_to_id: None,
},
},
config: None,
max_retries: Some(2),
retry_delay_secs: Some(0),
};
let flow = Flow::new("fid", "My Flow", "testing roundtrip")
.add_node("n1".into(), n1)
.add_node("n2".into(), n2)
.add_connection("n1".into(), vec!["n2".into()])
.add_connection("n2".into(), vec![]);
let text = serde_json::to_string_pretty(&flow).expect("serialize");
let flow2: Flow = serde_json::from_str(&text).expect("deserialize");
let built = flow2.build();
assert_eq!(built.graph().node_count(), 2);
let mut topo = Topo::new(&built.graph());
let mut seen = Vec::new();
while let Some(nx) = topo.next(&built.graph()) {
let cfg = &built.graph()[nx];
seen.push(cfg.id.clone());
}
assert_eq!(seen, vec!["n1".to_string(), "n2".to_string()]);
}
#[test]
#[should_panic(expected = "has cycles")]
fn build_cycle_panics() {
let n1 = NodeConfig {
id: "n1".into(),
kind: NodeKind::Channel {
cfg: ChannelNodeConfig {
channel_name: "c".to_string(),
channel_in: true,
channel_out: false,
channel_remote: false,
from: None,
to: None,
content: None,
thread_id: None,
reply_to_id: None,
},
},
config: None,
max_retries: Some(1),
retry_delay_secs: Some(0),
};
let n2 = NodeConfig {
id: "n2".into(),
kind: NodeKind::Channel {
cfg: ChannelNodeConfig {
channel_name: "c".to_string(),
channel_in: false,
channel_out: true,
channel_remote: false,
from: None,
to: None,
content: None,
thread_id: None,
reply_to_id: None,
},
},
config: None,
max_retries: Some(1),
retry_delay_secs: Some(0),
};
let _ = Flow::new("fid", "cyclic", "should fail")
.add_node("n1".into(), n1)
.add_node("n2".into(), n2)
.add_connection("n1".into(), vec!["n2".into()])
.add_connection("n2".into(), vec!["n1".into()])
.build();
}
#[tokio::test]
async fn run_two_channel_nodes() {
let first = NodeConfig {
id: "first".into(),
kind: NodeKind::Channel {
cfg: ChannelNodeConfig {
channel_name: "mock".to_string(),
channel_in: true,
channel_out: false,
channel_remote: false,
from: None,
to: None,
content: None,
thread_id: None,
reply_to_id: None,
},
},
config: None,
max_retries: Some(0),
retry_delay_secs: Some(0),
};
let second = NodeConfig {
id: "second".into(),
kind: NodeKind::Channel {
cfg: ChannelNodeConfig {
channel_name: "mock".to_string(),
channel_in: false,
channel_out: true,
channel_remote: false,
from: None,
to: None,
content: None,
thread_id: None,
reply_to_id: None,
},
},
config: None,
max_retries: Some(0),
retry_delay_secs: Some(0),
};
let flow = Flow::new("fid", "seq", "two step")
.add_channel("mock")
.add_node("first".into(), first)
.add_node("second".into(), second)
.add_connection("first".into(), vec!["second".into()])
.add_connection("second".into(), vec![])
.build();
let msg = Message::new("msg1", json!({ "hello": "world" }), "123".to_string());
let store = InMemorySessionStore::new(10);
let executor = make_executor();
let secrets = SecretsManager(TestSecretsManager::new());
let config_mgr = ConfigManager(MapConfigManager::new());
let channel_manager = ChannelManager::new(
config_mgr,
secrets.clone(),
"123".to_string(),
store.clone(),
LogConfig::default(),
)
.await
.expect("could not create channel manager");
let tempdir = TempDir::new().unwrap();
let process_mgr = ProcessManager::new(tempdir.path()).unwrap();
let fm = FlowManager::new(
store.clone(),
executor.clone(),
channel_manager.clone(),
Arc::new(process_mgr.clone()),
secrets.clone(),
);
let registry = ChannelsRegistry::new(fm.clone(), channel_manager.clone()).await;
channel_manager.subscribe_incoming(registry.clone() as Arc<dyn IncomingHandler>);
let wrapper = make_wrapper().await;
channel_manager
.register_channel("mock".into(), ManagedChannel::new(wrapper, None, None))
.await
.expect("failed to register noop channel");
fm.register_flow(flow.clone());
let participant = Participant {
id: "id".to_string(),
display_name: None,
channel_specific_id: None,
};
let co = ChannelOrigin::new("channel".to_string(), None, None, participant, false);
let mut ctx = NodeContext::new(
"123".to_string(),
store.get_or_create("123").await,
DashMap::new(),
executor,
channel_manager,
Arc::new(process_mgr),
secrets,
Some(co),
);
let report: ExecutionReport = flow.run(msg.clone(), "first", &mut ctx).await;
assert_eq!(report.records.len(), 2);
assert_eq!(report.records[0].node_id, "first");
assert_eq!(report.records[1].node_id, "second");
for rec in &report.records {
assert_eq!(rec.attempt, 0);
assert!(rec.result.is_ok(), "expected success, got {:?}", rec.result);
}
assert!(report.total.num_milliseconds() >= 0);
store.clear();
}
#[tokio::test]
async fn test_lazy_flow_registration() {
let session_store = InMemorySessionStore::new(15);
let flow = dummy_flow("lazy_flow");
let manager = FlowManager::new_test(session_store.clone());
manager.register_flow(flow.clone());
let msg = Message::new("1", serde_json::json!({"q": "hello"}), "sess1".to_string());
let report = manager
.process_message("lazy_flow", "start", msg.clone(), None)
.await;
assert!(report.is_some());
assert!(report.as_ref().unwrap().error.is_none());
let session = session_store.get("sess1").await.unwrap();
let flows = session.flows().unwrap();
assert!(flows.contains(&"lazy_flow".to_string()));
}
#[tokio::test]
async fn test_block_disallowed_flow() {
let session_store = InMemorySessionStore::new(15);
let flow = dummy_flow("blocked_flow");
let manager = FlowManager::new_test(session_store.clone());
manager.register_flow(flow.clone());
let session = session_store.get_or_create("sess2").await;
session.set_flows(vec!["allowed_flow".to_string()]);
let msg = Message::new(
"2",
serde_json::json!({"q": "block test"}),
"sess2".to_string(),
);
let report = manager
.process_message("blocked_flow", "start", msg.clone(), None)
.await;
assert!(report.is_some());
let r = report.unwrap();
println!("@@@ REMOVE {:?}", r);
assert!(r.records.is_empty());
assert!(r.error.is_some());
assert_eq!(r.total.num_milliseconds(), 0);
}
#[tokio::test]
async fn test_valid_flow_executes() {
let session_store = InMemorySessionStore::new(15);
let flow = dummy_flow("valid_flow");
let manager = FlowManager::new_test(session_store.clone());
manager.register_flow(flow.clone());
let session = session_store.get_or_create("sess3").await;
session.set_flows(vec!["valid_flow".to_string()]);
let msg = Message::new("3", serde_json::json!({"q": "run"}), "sess3".to_string());
let report = manager
.process_message("valid_flow", "start", msg.clone(), None)
.await;
assert!(report.is_some());
let r = report.unwrap();
assert!(!r.records.is_empty());
assert!(r.error.is_none());
}
}