// Every test in this file takes the std `TEST_MUTEX` guard on its first line and keeps it
// until the test function returns, so the async tests hold it across `.await` points on
// purpose. The lint is therefore silenced for the whole file.
#![allow(clippy::await_holding_lock)]
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use nemo_flow::api::event::{Event, ScopeCategory};
use nemo_flow::api::registry::{
deregister_tool_request_intercept, deregister_tool_sanitize_request_guardrail,
register_tool_request_intercept, register_tool_sanitize_request_guardrail,
scope_register_tool_conditional_execution_guardrail, scope_register_tool_request_intercept,
scope_register_tool_sanitize_request_guardrail,
};
use nemo_flow::api::runtime::NemoFlowContextState;
use nemo_flow::api::runtime::ToolExecutionNextFn;
use nemo_flow::api::runtime::global_context;
use nemo_flow::api::runtime::{create_scope_stack, set_thread_scope_stack};
use nemo_flow::api::scope::{ScopeHandle, ScopeType};
use nemo_flow::api::scope::{pop_scope, push_scope};
use nemo_flow::api::subscriber::{
deregister_subscriber, register_subscriber, scope_register_subscriber,
};
use nemo_flow::api::tool::{tool_call, tool_call_end, tool_call_execute};
use nemo_flow::error::FlowError;
use serde_json::json;
// Taken at the start of every test in this file, right before `reset_global()` overwrites
// the global context state. Each test keeps the guard until it returns, so the tests run
// one at a time.
static TEST_MUTEX: Mutex<()> = Mutex::new(());
/// Overwrites the contents of the global context with a freshly constructed
/// `NemoFlowContextState`.
///
/// # Panics
/// Panics if taking the write lock on the global context returns an error.
fn reset_global() {
    let shared_context = global_context();
    let mut current_state = shared_context.write().unwrap();
    // Assign through the write guard so the previous state is replaced in place.
    *current_state = NemoFlowContextState::new();
}
/// Creates a new scope stack, installs it with `set_thread_scope_stack`, and pushes an
/// `Agent`-typed scope called `name` onto it.
///
/// Returns the handle of the pushed scope.
///
/// # Panics
/// Panics if `push_scope` returns an error.
fn setup_isolated_scope(name: &str) -> ScopeHandle {
    set_thread_scope_stack(create_scope_stack());

    let agent_scope_params = nemo_flow::api::scope::PushScopeParams::builder()
        .name(name)
        .scope_type(ScopeType::Agent)
        .build();
    push_scope(agent_scope_params).unwrap()
}
/// A sanitize-request guardrail registered on a scope rewrites the arguments of a tool call
/// started inside that scope, and a globally registered subscriber sees the rewritten
/// arguments on the first event it receives.
#[test]
fn test_scope_local_guardrail_registration_and_execution() {
    // A panic in any other test poisons TEST_MUTEX. The mutex guards no data and the global
    // state is rebuilt on the next line, so recover the guard instead of failing here too.
    let _lock = TEST_MUTEX.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
    reset_global();
    let handle = setup_isolated_scope("scope_guardrail");

    // Scope-local guardrail: tags the arguments so its effect can be observed later.
    scope_register_tool_sanitize_request_guardrail(
        &handle.uuid,
        "local_sanitizer",
        10,
        Box::new(|_name, mut args| {
            args.as_object_mut()
                .unwrap()
                .insert("scope_sanitized".into(), json!(true));
            args
        }),
    )
    .unwrap();

    // Registered after the scope push; the assertions below treat the first event this
    // subscriber captures as the tool call's start event.
    let events: Arc<Mutex<Vec<Event>>> = Arc::new(Mutex::new(Vec::new()));
    let ec = events.clone();
    register_subscriber(
        "sanitize_observer",
        Arc::new(move |e: &Event| {
            ec.lock().unwrap().push(e.clone());
        }),
    )
    .unwrap();

    let tool_handle = tool_call(
        nemo_flow::api::tool::ToolCallParams::builder()
            .name("test_tool")
            .args(json!({"input": "data"}))
            .build(),
    )
    .unwrap();

    // Inspect the captured events in their own block: the subscriber locks `events` for
    // every event it receives, so the guard must be gone before more API calls are made.
    {
        let captured = events.lock().unwrap();
        let start_event = captured
            .first()
            .expect("tool_call should have delivered at least one event to the subscriber");
        let input = start_event.input().unwrap();
        // The guardrail's marker is present and the caller's original argument survived.
        assert_eq!(input["scope_sanitized"], true);
        assert_eq!(input["input"], "data");
    }

    tool_call_end(
        nemo_flow::api::tool::ToolCallEndParams::builder()
            .handle(&tool_handle)
            .result(json!("ok"))
            .build(),
    )
    .unwrap();

    // Cleanup: drop the global subscriber, then pop the scope pushed by the setup helper.
    deregister_subscriber("sanitize_observer").unwrap();
    pop_scope(
        nemo_flow::api::scope::PopScopeParams::builder()
            .handle_uuid(&handle.uuid)
            .build(),
    )
    .unwrap();
}
/// A request intercept registered on a scope is applied to tool calls executed while that
/// scope is pushed, and is no longer applied once the scope has been popped.
#[tokio::test]
async fn test_auto_cleanup_on_scope_pop() {
    // A panic in any other test poisons TEST_MUTEX. The mutex guards no data and the global
    // state is rebuilt on the next line, so recover the guard instead of failing here too.
    let _lock = TEST_MUTEX.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
    reset_global();

    // Fresh scope stack with a single `Function`-typed scope on it.
    let stack = create_scope_stack();
    set_thread_scope_stack(stack);
    let handle = push_scope(
        nemo_flow::api::scope::PushScopeParams::builder()
            .name("ephemeral")
            .scope_type(ScopeType::Function)
            .build(),
    )
    .unwrap();

    // The intercept marks every request it sees with `"ephemeral": true`.
    scope_register_tool_request_intercept(
        &handle.uuid,
        "ephemeral_intercept",
        1,
        false,
        Box::new(|_name, mut args| {
            args.as_object_mut()
                .unwrap()
                .insert("ephemeral".into(), json!(true));
            Ok(args)
        }),
    )
    .unwrap();

    // The tool body echoes its arguments, so the result shows what the intercept did.
    let func: ToolExecutionNextFn = Arc::new(|args| Box::pin(async move { Ok(args) }));
    let result = tool_call_execute(
        nemo_flow::api::tool::ToolCallExecuteParams::builder()
            .name("tool")
            .args(json!({"v": 1}))
            .func(func)
            .build(),
    )
    .await
    .unwrap();
    assert_eq!(result["ephemeral"], true);

    pop_scope(
        nemo_flow::api::scope::PopScopeParams::builder()
            .handle_uuid(&handle.uuid)
            .build(),
    )
    .unwrap();

    // Same call after the pop: the marker must be absent and the input untouched.
    let func2: ToolExecutionNextFn = Arc::new(|args| Box::pin(async move { Ok(args) }));
    let result2 = tool_call_execute(
        nemo_flow::api::tool::ToolCallExecuteParams::builder()
            .name("tool")
            .args(json!({"v": 2}))
            .func(func2)
            .build(),
    )
    .await
    .unwrap();
    assert!(result2.get("ephemeral").is_none());
    assert_eq!(result2["v"], 2);
}
/// Global and scope-local request intercepts are merged into one chain: intercepts
/// registered with the numbers 10 (global), 30 (global) and 20 (scope-local) all run, and
/// they run in the order 10, 20, 30.
#[tokio::test]
async fn test_priority_merge_global_and_scope_local() {
    // A panic in any other test poisons TEST_MUTEX. The mutex guards no data and the global
    // state is rebuilt on the next line, so recover the guard instead of failing here too.
    let _lock = TEST_MUTEX.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
    reset_global();
    let handle = setup_isolated_scope("merge_test");

    // Each intercept appends its own number here, recording the execution order.
    let order = Arc::new(Mutex::new(Vec::<i32>::new()));

    let o1 = order.clone();
    register_tool_request_intercept(
        "global_p10",
        10,
        false,
        Box::new(move |_name, mut args| {
            o1.lock().unwrap().push(10);
            args.as_object_mut()
                .unwrap()
                .insert("p10".into(), json!(true));
            Ok(args)
        }),
    )
    .unwrap();

    // Registered before the scope-local 20 on purpose: registration order (10, 30, 20)
    // differs from the execution order asserted below.
    let o3 = order.clone();
    register_tool_request_intercept(
        "global_p30",
        30,
        false,
        Box::new(move |_name, mut args| {
            o3.lock().unwrap().push(30);
            args.as_object_mut()
                .unwrap()
                .insert("p30".into(), json!(true));
            Ok(args)
        }),
    )
    .unwrap();

    let o2 = order.clone();
    scope_register_tool_request_intercept(
        &handle.uuid,
        "local_p20",
        20,
        false,
        Box::new(move |_name, mut args| {
            o2.lock().unwrap().push(20);
            args.as_object_mut()
                .unwrap()
                .insert("p20".into(), json!(true));
            Ok(args)
        }),
    )
    .unwrap();

    // The tool body echoes its arguments, so the result carries every intercept's marker.
    let func: ToolExecutionNextFn = Arc::new(|args| Box::pin(async move { Ok(args) }));
    let result = tool_call_execute(
        nemo_flow::api::tool::ToolCallExecuteParams::builder()
            .name("tool")
            .args(json!({}))
            .func(func)
            .build(),
    )
    .await
    .unwrap();
    assert_eq!(result["p10"], true);
    assert_eq!(result["p20"], true);
    assert_eq!(result["p30"], true);

    // Lock only for the comparison so no guard on `order` is held during cleanup.
    assert_eq!(*order.lock().unwrap(), vec![10, 20, 30]);

    // Cleanup: global intercepts are removed by name, the scope-local one with its scope.
    deregister_tool_request_intercept("global_p10").unwrap();
    deregister_tool_request_intercept("global_p30").unwrap();
    pop_scope(
        nemo_flow::api::scope::PopScopeParams::builder()
            .handle_uuid(&handle.uuid)
            .build(),
    )
    .unwrap();
}
/// A global and a scope-local sanitize-request guardrail may be registered under the same
/// name (`"shared_name"`); a single tool call then invokes both of them.
#[test]
fn test_name_coexistence_global_and_scope_local() {
    // A panic in any other test poisons TEST_MUTEX. The mutex guards no data and the global
    // state is rebuilt on the next line, so recover the guard instead of failing here too.
    let _lock = TEST_MUTEX.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
    reset_global();
    let handle = setup_isolated_scope("coexist_test");

    // Shared invocation counter: each guardrail bumps it once per call.
    let count = Arc::new(AtomicU32::new(0));

    let c1 = count.clone();
    register_tool_sanitize_request_guardrail(
        "shared_name",
        1,
        Box::new(move |_name, args| {
            c1.fetch_add(1, Ordering::SeqCst);
            args
        }),
    )
    .unwrap();

    let c2 = count.clone();
    scope_register_tool_sanitize_request_guardrail(
        &handle.uuid,
        "shared_name",
        2,
        Box::new(move |_name, args| {
            c2.fetch_add(1, Ordering::SeqCst);
            args
        }),
    )
    .unwrap();

    let tool_handle = tool_call(
        nemo_flow::api::tool::ToolCallParams::builder()
            .name("tool")
            .args(json!({}))
            .build(),
    )
    .unwrap();
    // Both registrations ran exactly once for the single tool call.
    assert_eq!(count.load(Ordering::SeqCst), 2);

    // Close the tool call that was started above, as the other `tool_call` test in this
    // file does, instead of leaving it open when the scope is popped.
    tool_call_end(
        nemo_flow::api::tool::ToolCallEndParams::builder()
            .handle(&tool_handle)
            .result(json!("ok"))
            .build(),
    )
    .unwrap();

    // Cleanup: the global guardrail is removed by name, the scope-local one with its scope.
    deregister_tool_sanitize_request_guardrail("shared_name").unwrap();
    pop_scope(
        nemo_flow::api::scope::PopScopeParams::builder()
            .handle_uuid(&handle.uuid)
            .build(),
    )
    .unwrap();
}
/// Two independent scope stacks each get their own scope and request intercept. A tool
/// call's result carries the value written by the intercept of whichever stack is
/// currently installed with `set_thread_scope_stack`.
#[tokio::test]
async fn test_scope_isolation_between_stacks() {
    // A panic in any other test poisons TEST_MUTEX. The mutex guards no data and the global
    // state is rebuilt on the next line, so recover the guard instead of failing here too.
    let _lock = TEST_MUTEX.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
    reset_global();

    let stack_a = create_scope_stack();
    let stack_b = create_scope_stack();

    // Stack A: scope "agent_a" whose intercept writes `"agent": "a"`.
    let scope_a = {
        set_thread_scope_stack(stack_a.clone());
        let s = push_scope(
            nemo_flow::api::scope::PushScopeParams::builder()
                .name("agent_a")
                .scope_type(ScopeType::Agent)
                .build(),
        )
        .unwrap();
        scope_register_tool_request_intercept(
            &s.uuid,
            "a_intercept",
            1,
            false,
            Box::new(|_name, mut args| {
                args.as_object_mut()
                    .unwrap()
                    .insert("agent".into(), json!("a"));
                Ok(args)
            }),
        )
        .unwrap();
        s
    };

    // Stack B: scope "agent_b" whose intercept writes `"agent": "b"` under the same key.
    let scope_b = {
        set_thread_scope_stack(stack_b.clone());
        let s = push_scope(
            nemo_flow::api::scope::PushScopeParams::builder()
                .name("agent_b")
                .scope_type(ScopeType::Agent)
                .build(),
        )
        .unwrap();
        scope_register_tool_request_intercept(
            &s.uuid,
            "b_intercept",
            1,
            false,
            Box::new(|_name, mut args| {
                args.as_object_mut()
                    .unwrap()
                    .insert("agent".into(), json!("b"));
                Ok(args)
            }),
        )
        .unwrap();
        s
    };

    // With stack A installed, the echoed arguments carry A's value.
    set_thread_scope_stack(stack_a.clone());
    let func_a: ToolExecutionNextFn = Arc::new(|args| Box::pin(async move { Ok(args) }));
    let result_a = tool_call_execute(
        nemo_flow::api::tool::ToolCallExecuteParams::builder()
            .name("tool")
            .args(json!({}))
            .func(func_a)
            .build(),
    )
    .await
    .unwrap();
    assert_eq!(result_a["agent"], "a");

    // With stack B installed, the same call carries B's value instead.
    set_thread_scope_stack(stack_b.clone());
    let func_b: ToolExecutionNextFn = Arc::new(|args| Box::pin(async move { Ok(args) }));
    let result_b = tool_call_execute(
        nemo_flow::api::tool::ToolCallExecuteParams::builder()
            .name("tool")
            .args(json!({}))
            .func(func_b)
            .build(),
    )
    .await
    .unwrap();
    assert_eq!(result_b["agent"], "b");

    // Cleanup: each scope is popped while its own stack is the installed one.
    set_thread_scope_stack(stack_a);
    pop_scope(
        nemo_flow::api::scope::PopScopeParams::builder()
            .handle_uuid(&scope_a.uuid)
            .build(),
    )
    .unwrap();
    set_thread_scope_stack(stack_b);
    pop_scope(
        nemo_flow::api::scope::PopScopeParams::builder()
            .handle_uuid(&scope_b.uuid)
            .build(),
    )
    .unwrap();
}
/// A tool call executed inside a nested scope runs the global intercept, the parent
/// scope's intercept and the child scope's intercept, in that order.
#[tokio::test]
async fn test_nested_scope_inheritance() {
    // A panic in any other test poisons TEST_MUTEX. The mutex guards no data and the global
    // state is rebuilt on the next line, so recover the guard instead of failing here too.
    let _lock = TEST_MUTEX.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
    reset_global();
    let stack = create_scope_stack();
    set_thread_scope_stack(stack);

    // Each intercept appends its own label here, recording the execution order.
    let order = Arc::new(Mutex::new(Vec::<String>::new()));

    let og = order.clone();
    register_tool_request_intercept(
        "global_intercept",
        1,
        false,
        Box::new(move |_name, mut args| {
            og.lock().unwrap().push("global".into());
            args.as_object_mut()
                .unwrap()
                .insert("global".into(), json!(true));
            Ok(args)
        }),
    )
    .unwrap();

    // Outer scope with its own intercept.
    let scope_a = push_scope(
        nemo_flow::api::scope::PushScopeParams::builder()
            .name("scope_a")
            .scope_type(ScopeType::Agent)
            .build(),
    )
    .unwrap();
    let oa = order.clone();
    scope_register_tool_request_intercept(
        &scope_a.uuid,
        "a_intercept",
        5,
        false,
        Box::new(move |_name, mut args| {
            oa.lock().unwrap().push("scope_a".into());
            args.as_object_mut()
                .unwrap()
                .insert("scope_a".into(), json!(true));
            Ok(args)
        }),
    )
    .unwrap();

    // Inner scope, pushed with `scope_a` as its explicit parent.
    let scope_b = push_scope(
        nemo_flow::api::scope::PushScopeParams::builder()
            .name("scope_b")
            .scope_type(ScopeType::Function)
            .parent(&scope_a)
            .build(),
    )
    .unwrap();
    let ob = order.clone();
    scope_register_tool_request_intercept(
        &scope_b.uuid,
        "b_intercept",
        10,
        false,
        Box::new(move |_name, mut args| {
            ob.lock().unwrap().push("scope_b".into());
            args.as_object_mut()
                .unwrap()
                .insert("scope_b".into(), json!(true));
            Ok(args)
        }),
    )
    .unwrap();

    // The tool body echoes its arguments, so the result carries every intercept's marker.
    let func: ToolExecutionNextFn = Arc::new(|args| Box::pin(async move { Ok(args) }));
    let result = tool_call_execute(
        nemo_flow::api::tool::ToolCallExecuteParams::builder()
            .name("tool")
            .args(json!({}))
            .func(func)
            .build(),
    )
    .await
    .unwrap();
    assert_eq!(result["global"], true);
    assert_eq!(result["scope_a"], true);
    assert_eq!(result["scope_b"], true);

    // Lock only for the comparison so no guard on `order` is held during cleanup.
    assert_eq!(*order.lock().unwrap(), vec!["global", "scope_a", "scope_b"]);

    // Cleanup: pop innermost scope first, then the outer one, then drop the global intercept.
    pop_scope(
        nemo_flow::api::scope::PopScopeParams::builder()
            .handle_uuid(&scope_b.uuid)
            .build(),
    )
    .unwrap();
    pop_scope(
        nemo_flow::api::scope::PopScopeParams::builder()
            .handle_uuid(&scope_a.uuid)
            .build(),
    )
    .unwrap();
    deregister_tool_request_intercept("global_intercept").unwrap();
}
/// A subscriber registered on a scope receives the start and end events of a child scope
/// and the end event of its own scope, and receives nothing for scopes pushed after its
/// own scope has been popped.
#[test]
fn test_scope_local_subscriber() {
    // A panic in any other test poisons TEST_MUTEX. The mutex guards no data and the global
    // state is rebuilt on the next line, so recover the guard instead of failing here too.
    let _lock = TEST_MUTEX.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
    reset_global();
    let handle = setup_isolated_scope("sub_scope");

    // One label per received event: "start"/"end" for scope events, otherwise the event's
    // `kind()` string.
    let events = Arc::new(Mutex::new(Vec::<String>::new()));
    let ec = events.clone();
    scope_register_subscriber(
        &handle.uuid,
        "local_sub",
        Arc::new(move |e: &Event| {
            let phase = match e.scope_category() {
                Some(ScopeCategory::Start) => "start",
                Some(ScopeCategory::End) => "end",
                None => e.kind(),
            };
            ec.lock().unwrap().push(phase.to_string());
        }),
    )
    .unwrap();

    // Push and pop a child of the subscribed scope.
    let child = push_scope(
        nemo_flow::api::scope::PushScopeParams::builder()
            .name("child")
            .scope_type(ScopeType::Function)
            .parent(&handle)
            .build(),
    )
    .unwrap();
    pop_scope(
        nemo_flow::api::scope::PopScopeParams::builder()
            .handle_uuid(&child.uuid)
            .build(),
    )
    .unwrap();

    // Checks run in their own blocks: the subscriber locks `events` for every event, so the
    // guard must be released before the next push/pop.
    {
        let captured = events.lock().unwrap();
        assert_eq!(captured.len(), 2);
        assert_eq!(captured[0], "start");
        assert_eq!(captured[1], "end");
    }

    // Popping the subscribed scope itself still delivers that scope's end event.
    pop_scope(
        nemo_flow::api::scope::PopScopeParams::builder()
            .handle_uuid(&handle.uuid)
            .build(),
    )
    .unwrap();
    {
        let captured = events.lock().unwrap();
        assert_eq!(captured.len(), 3);
        assert_eq!(captured[2], "end");
    }

    // A scope pushed and popped afterwards must not reach the subscriber any more.
    let another = push_scope(
        nemo_flow::api::scope::PushScopeParams::builder()
            .name("after_pop")
            .scope_type(ScopeType::Function)
            .build(),
    )
    .unwrap();
    pop_scope(
        nemo_flow::api::scope::PopScopeParams::builder()
            .handle_uuid(&another.uuid)
            .build(),
    )
    .unwrap();
    let captured2 = events.lock().unwrap();
    assert_eq!(captured2.len(), 3);
}
/// A conditional-execution guardrail registered on a scope rejects execution of the tool
/// named `"banned_tool"` with `FlowError::GuardrailRejected`, while a tool with another
/// name executes normally.
#[tokio::test]
async fn test_scope_local_conditional_execution_guardrail() {
    // A panic in any other test poisons TEST_MUTEX. The mutex guards no data and the global
    // state is rebuilt on the next line, so recover the guard instead of failing here too.
    let _lock = TEST_MUTEX.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
    reset_global();
    let handle = setup_isolated_scope("cond_scope");

    // The guardrail returns a rejection reason for "banned_tool" and `None` otherwise.
    scope_register_tool_conditional_execution_guardrail(
        &handle.uuid,
        "tool_blocker",
        1,
        Box::new(|name, _args| {
            if name == "banned_tool" {
                Ok(Some("banned_tool is not allowed in this scope".to_string()))
            } else {
                Ok(None)
            }
        }),
    )
    .unwrap();

    let func_banned: ToolExecutionNextFn = Arc::new(|args| Box::pin(async move { Ok(args) }));
    let banned_outcome = tool_call_execute(
        nemo_flow::api::tool::ToolCallExecuteParams::builder()
            .name("banned_tool")
            .args(json!({"input": 1}))
            .func(func_banned)
            .build(),
    )
    .await;
    // Match on the whole result so an unexpected `Ok` is reported with its value rather
    // than as a bare failed `is_err()` assertion.
    match banned_outcome {
        Err(FlowError::GuardrailRejected(reason)) => {
            assert!(reason.contains("banned_tool is not allowed"));
        }
        other => panic!("Expected GuardrailRejected, got: {:?}", other),
    }

    // A differently named tool passes the guardrail; the echoed arguments come back intact.
    let func_ok: ToolExecutionNextFn = Arc::new(|args| Box::pin(async move { Ok(args) }));
    let result = tool_call_execute(
        nemo_flow::api::tool::ToolCallExecuteParams::builder()
            .name("allowed_tool")
            .args(json!({"input": 2}))
            .func(func_ok)
            .build(),
    )
    .await
    .unwrap();
    assert_eq!(result["input"], 2);

    pop_scope(
        nemo_flow::api::scope::PopScopeParams::builder()
            .handle_uuid(&handle.uuid)
            .build(),
    )
    .unwrap();
}