use std::sync::Arc;
use serde_json::json;
use tinyagents::harness::context::{RunConfig, RunContext};
use tinyagents::harness::events::{AgentEvent, RecordingListener};
use tinyagents::harness::ids::{ExecutionStatus, RunId};
use tinyagents::harness::message::{AssistantMessage, ContentBlock, Message};
use tinyagents::harness::model::ModelResponse;
use tinyagents::harness::providers::MockModel;
use tinyagents::harness::runtime::AgentHarness;
use tinyagents::harness::testkit::FakeTool;
use tinyagents::harness::tool::ToolCall;
use tinyagents::harness::usage::Usage;
use tinyagents::{
GraphBuilder, GraphEvent, GraphEventJournal, HarnessEventJournal, HarnessStatusStore,
InMemoryEventJournal, InMemoryGraphEventJournal, InMemoryStatusStore, JournalSink, NodeContext,
NodeResult, RedactingSink,
};
const MODEL_NAME: &str = "gpt-sk-SEKRET-key";
const SECRET: &str = "sk-SEKRET";
fn tool_call_response(id: &str, name: &str, arguments: serde_json::Value) -> ModelResponse {
ModelResponse {
message: AssistantMessage {
id: Some(format!("msg-{id}")),
content: Vec::new(),
tool_calls: vec![ToolCall::new(id, name, arguments)],
usage: Some(Usage::new(7, 3)),
},
usage: Some(Usage::new(7, 3)),
finish_reason: Some("tool_calls".to_string()),
raw: None,
resolved_model: None,
}
}
fn text_response(text: &str) -> ModelResponse {
ModelResponse {
message: AssistantMessage {
id: None,
content: vec![ContentBlock::Text(text.to_string())],
tool_calls: Vec::new(),
usage: Some(Usage::new(4, 2)),
},
usage: Some(Usage::new(4, 2)),
finish_reason: Some("stop".to_string()),
raw: None,
resolved_model: None,
}
}
#[tokio::test]
async fn harness_run_journals_redacted_replayable_events() {
let mut harness: AgentHarness<()> = AgentHarness::new();
harness
.register_model(
MODEL_NAME,
Arc::new(MockModel::with_responses(vec![
tool_call_response("call-1", "lookup", json!({ "q": "x" })),
text_response("done"),
])),
)
.set_default_model(MODEL_NAME)
.register_tool(Arc::new(FakeTool::returning("lookup", "tool-output")));
let journal: Arc<InMemoryEventJournal> = Arc::new(InMemoryEventJournal::new());
let recorder = Arc::new(RecordingListener::new());
let run_id = RunId::new("run-obs");
let journal_sink = JournalSink::new(journal.clone(), run_id.clone());
let redacting = RedactingSink::new(Arc::new(journal_sink), vec![SECRET.to_string()]);
let ctx: RunContext<()> = RunContext::new(RunConfig::new(run_id.as_str()), ());
ctx.events.subscribe(Arc::new(redacting));
ctx.events.subscribe(recorder.clone());
let result = harness
.invoke_in_context_with_status(&(), ctx, vec![Message::user("please look up")])
.await
.expect("run succeeds");
assert_eq!(result.run.model_calls, 2);
assert_eq!(result.run.tool_calls, 1);
let status_store = InMemoryStatusStore::new();
status_store
.put_status(result.status.clone())
.await
.expect("put status");
let stored = status_store
.get_status(run_id.as_str())
.await
.expect("get status")
.expect("status present");
assert_eq!(stored.status, ExecutionStatus::Completed);
assert!(stored.ended_at.is_some());
assert!(stored.error.is_none());
let all = journal
.read_from(run_id.as_str(), 0)
.await
.expect("replay journal");
assert!(
all.len() >= 4,
"expected run/model/tool/completion observations, got {}",
all.len()
);
for (i, obs) in all.iter().enumerate() {
assert_eq!(obs.offset, i as u64, "dense offsets from 0");
assert_eq!(obs.run_id.as_str(), run_id.as_str());
assert_eq!(obs.root_run_id.as_str(), run_id.as_str());
assert!(obs.parent_run_id.is_none(), "top-level run has no parent");
}
assert!(
matches!(all.first().unwrap().event, AgentEvent::RunStarted { .. }),
"first journaled event is RunStarted"
);
assert!(
all.iter()
.any(|o| matches!(o.event, AgentEvent::RunCompleted { .. })),
"completion is observable from the journal"
);
let journal_json = serde_json::to_string(&all).expect("serialize observations");
assert!(
!journal_json.contains(SECRET),
"the secret must be masked before it is journaled"
);
assert!(
journal_json.contains(RedactingSink::DEFAULT_MASK),
"the redaction mask should appear where the secret was"
);
let live_json = serde_json::to_string(&recorder.events()).expect("serialize live records");
assert!(
live_json.contains(SECRET),
"the live broadcast carries full detail; redaction is sink-local"
);
let tail = journal
.read_from(run_id.as_str(), 2)
.await
.expect("replay tail");
assert_eq!(tail.len(), all.len() - 2);
assert_eq!(tail.first().unwrap().offset, 2);
}
fn line_graph() -> tinyagents::CompiledGraph<i32, i32> {
GraphBuilder::<i32, i32>::overwrite()
.add_node("a", |_s, _c: NodeContext| async move {
Ok(NodeResult::Update(1))
})
.add_node("b", |s, _c: NodeContext| async move {
Ok(NodeResult::Update(s + 1))
})
.set_entry("a")
.add_edge("a", "b")
.set_finish("b")
.compile()
.expect("graph compiles")
}
#[tokio::test]
async fn graph_run_journals_replayable_observations_with_coords() {
let journal = Arc::new(InMemoryGraphEventJournal::new());
let graph = line_graph().with_event_journal(journal.clone());
let run = graph.run(0).await.expect("graph run succeeds");
let run_id = run.status.run_id.as_str().to_string();
let obs = journal
.read_from(&run_id, 0)
.await
.expect("replay graph journal");
assert!(
obs.len() >= 3,
"expected several graph observations, got {}",
obs.len()
);
for (i, o) in obs.iter().enumerate() {
assert_eq!(o.offset, i as u64, "dense offsets from 0");
assert_eq!(o.run_id.as_str(), run_id);
assert_eq!(o.root_run_id.as_str(), run_id);
assert_eq!(&o.graph_id, graph.graph_id());
}
assert!(
obs.iter().any(|o| o.step >= 1),
"observations should carry superstep coordinates"
);
assert!(
matches!(obs.first().unwrap().event, GraphEvent::RunStarted { .. }),
"first observation is RunStarted"
);
assert!(
obs.iter()
.any(|o| matches!(o.event, GraphEvent::RunCompleted { .. })),
"completion is observable from the graph journal"
);
let tail = journal.read_from(&run_id, 2).await.expect("replay tail");
assert_eq!(tail.len(), obs.len() - 2);
assert_eq!(tail.first().unwrap().offset, 2);
assert!(
journal.read_from("nope", 0).await.unwrap().is_empty(),
"unknown run replays empty"
);
}