use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use futures::stream;
use tokio::sync::{broadcast, RwLock};
use bamboo_agent_core::storage::Storage;
use bamboo_agent_core::tools::{ToolCall, ToolError, ToolResult, ToolSchema};
use bamboo_agent_core::{AgentEvent, Message, Session};
use bamboo_domain::subagent::{SubagentProfile, ToolPolicy};
use bamboo_infrastructure::{LLMChunk, LLMError, LLMProvider, LLMStream};
use crate::runtime::execution::spawn::{SpawnContext, SpawnJob};
use crate::sdk::runner::{ProfileRunner, RunProfileInput};
use crate::sdk::spawn::run_child_spawn;
use crate::MetricsCollector;
use crate::SkillManager;
/// Test double for [`LLMProvider`] whose stream is already finished: it yields
/// a single `"done"` token followed by `LLMChunk::Done`, with no delays.
struct CompletedProvider;

#[async_trait]
impl LLMProvider for CompletedProvider {
    /// Ignores every argument and returns the canned two-chunk stream.
    async fn chat_stream(
        &self,
        _messages: &[Message],
        _tools: &[ToolSchema],
        _max_output_tokens: Option<u32>,
        _model: &str,
    ) -> Result<LLMStream, LLMError> {
        let mut script: Vec<bamboo_infrastructure::provider::Result<LLMChunk>> =
            Vec::with_capacity(2);
        script.push(Ok(LLMChunk::Token(String::from("done"))));
        script.push(Ok(LLMChunk::Done));

        let canned = stream::iter(script);
        Ok(Box::pin(canned))
    }
}
/// Test double for [`LLMProvider`] that is deliberately slow: it yields
/// `"slow-1"`, `"slow-2"` and `LLMChunk::Done`, sleeping two seconds before
/// each item is produced.
struct HangingProvider;

#[async_trait]
impl LLMProvider for HangingProvider {
    /// Ignores every argument and returns the slow three-chunk stream.
    async fn chat_stream(
        &self,
        _messages: &[Message],
        _tools: &[ToolSchema],
        _max_output_tokens: Option<u32>,
        _model: &str,
    ) -> Result<LLMStream, LLMError> {
        let script: Vec<bamboo_infrastructure::provider::Result<LLMChunk>> = vec![
            Ok(LLMChunk::Token("slow-1".to_string())),
            Ok(LLMChunk::Token("slow-2".to_string())),
            Ok(LLMChunk::Done),
        ];

        // The unfold state is the remaining script; the stream ends as soon as
        // the script is exhausted (without sleeping for the terminating `None`).
        let paced = stream::unfold(script.into_iter(), |mut pending| async move {
            let upcoming = pending.next();
            match upcoming {
                Some(chunk) => {
                    tokio::time::sleep(Duration::from_secs(2)).await;
                    Some((chunk, pending))
                }
                None => None,
            }
        });
        Ok(Box::pin(paced))
    }
}
/// Tool executor stub that only advertises a catalog of tool names; it never
/// executes anything successfully.
struct CatalogToolExecutor {
    /// Tool names reported by `list_tools`, in this order.
    names: Vec<String>,
}

#[async_trait]
impl bamboo_agent_core::tools::ToolExecutor for CatalogToolExecutor {
    /// Always fails with `ToolError::NotFound("noop")`, whatever the call is.
    async fn execute(&self, _call: &ToolCall) -> Result<ToolResult, ToolError> {
        let reason = "noop".to_string();
        Err(ToolError::NotFound(reason))
    }

    /// Produces one `"function"` schema per configured name, with an empty
    /// description and an empty JSON object as parameters.
    fn list_tools(&self) -> Vec<ToolSchema> {
        let mut catalog = Vec::with_capacity(self.names.len());
        for tool_name in &self.names {
            let function = bamboo_agent_core::tools::FunctionSchema {
                name: tool_name.clone(),
                description: String::new(),
                parameters: serde_json::json!({}),
            };
            catalog.push(ToolSchema {
                schema_type: "function".to_string(),
                function,
            });
        }
        catalog
    }
}
/// Everything a test needs after `build_harness` has prepared the fixture.
struct Harness {
    /// Spawn context wired to the test agent, tool catalog and registries.
    ctx: SpawnContext,
    /// Storage in which the parent and child sessions were saved.
    storage: Arc<dyn Storage>,
    /// Id of the persisted parent (root) session.
    parent_session_id: String,
    /// Id of the persisted child session.
    child_session_id: String,
    /// Receiver created together with `parent_tx`, so it observes every event
    /// sent on the parent channel from the moment the harness is built.
    parent_rx: broadcast::Receiver<AgentEvent>,
    /// Sender registered for the parent session in the spawn context.
    parent_tx: broadcast::Sender<AgentEvent>,
}
/// Returns a unique path of the form `<system temp dir>/<prefix>-<uuid v4>`.
///
/// The directory itself is not created here; callers create it when needed.
fn temp_dir(prefix: &str) -> std::path::PathBuf {
    let mut path = std::env::temp_dir();
    let unique = uuid::Uuid::new_v4();
    path.push(format!("{prefix}-{unique}"));
    path
}
/// Builds a complete spawn fixture backed by a fresh temporary directory.
///
/// * `provider` – LLM provider handed to the test agent.
/// * `tool_names` – names advertised by both the agent's default tools and the
///   spawn context's tool executor.
/// * `child_metadata` – extra key/value pairs written into the child session's
///   metadata after the default `last_run_status = "queued"` entry, so they can
///   override it.
///
/// A parent session (`"root-session"`) and a child session (`"child-session"`)
/// are saved to storage, and the parent's broadcast sender is registered in the
/// context before the harness is returned. Panics if any setup step fails.
async fn build_harness(
    provider: Arc<dyn LLMProvider>,
    tool_names: Vec<String>,
    child_metadata: &[(&str, &str)],
) -> Harness {
    // --- On-disk fixtures -------------------------------------------------
    let home = temp_dir("bamboo-sdk-test");
    tokio::fs::create_dir_all(&home).await.unwrap();
    let session_store = bamboo_infrastructure::SessionStoreV2::new(home.clone())
        .await
        .unwrap();
    let session_store = Arc::new(session_store);

    let storage_dir = home.join("storage");
    tokio::fs::create_dir_all(&storage_dir).await.unwrap();
    let jsonl = bamboo_infrastructure::JsonlStorage::new(&storage_dir);
    jsonl.init().await.unwrap();
    let storage: Arc<dyn Storage> = Arc::new(jsonl);

    let metrics_db = home.join("metrics.db");
    let metrics_storage = Arc::new(crate::SqliteMetricsStorage::new(metrics_db));
    let metrics_collector = MetricsCollector::spawn(metrics_storage, 7);

    // --- Event plumbing ---------------------------------------------------
    let parent_session_id = "root-session".to_string();
    let child_session_id = "child-session".to_string();
    let (parent_tx, parent_rx) = broadcast::channel(1000);

    // The registry starts out containing just the parent's sender.
    let mut senders = HashMap::<String, broadcast::Sender<AgentEvent>>::new();
    senders.insert(parent_session_id.clone(), parent_tx.clone());
    let session_event_senders = Arc::new(RwLock::new(senders));

    // --- Persisted sessions -----------------------------------------------
    let mut parent = Session::new(parent_session_id.clone(), "gpt-5");
    parent.title = "Root".to_string();
    storage.save_session(&parent).await.unwrap();

    let mut child = Session::new_child(
        child_session_id.clone(),
        parent_session_id.clone(),
        "gpt-5",
        "Child session",
    );
    child
        .metadata
        .insert("last_run_status".to_string(), "queued".to_string());
    // Caller-supplied entries go in last so they win over the default above.
    for &(key, value) in child_metadata {
        child.metadata.insert(key.to_string(), value.to_string());
    }
    child.add_message(Message::system("child system"));
    child.add_message(Message::user("do the task"));
    storage.save_session(&child).await.unwrap();

    // --- Agent and spawn context --------------------------------------------
    let persistence = Arc::new(bamboo_infrastructure::LockedSessionStore::new(
        storage.clone(),
    ));
    let config = Arc::new(RwLock::new(bamboo_infrastructure::Config::default()));
    let agent_tools = Arc::new(CatalogToolExecutor {
        names: tool_names.clone(),
    });
    let built = crate::Agent::builder()
        .storage(storage.clone())
        .persistence(persistence)
        .attachment_reader(session_store.clone())
        .skill_manager(Arc::new(SkillManager::new()))
        .metrics_collector(metrics_collector)
        .config(config)
        .provider(provider)
        .default_tools(agent_tools)
        .build()
        .expect("test agent should build");
    let agent = Arc::new(built);

    let ctx = SpawnContext {
        agent,
        tools: Arc::new(CatalogToolExecutor { names: tool_names }),
        sessions_cache: Arc::new(RwLock::new(HashMap::new())),
        agent_runners: Arc::new(RwLock::new(HashMap::new())),
        session_event_senders,
        external_child_runner: None,
        provider_router: None,
        app_data_dir: None,
        completion_handler: None,
        account_feed_inbox: None,
    };

    Harness {
        ctx,
        storage,
        parent_session_id,
        child_session_id,
        parent_rx,
        parent_tx,
    }
}
/// Collects events up to and including the first `SubAgentCompleted`, using a
/// fixed 15-second budget. See `collect_until_completed_with_budget` for the
/// panic conditions.
async fn collect_until_completed(rx: &mut broadcast::Receiver<AgentEvent>) -> Vec<AgentEvent> {
    const DEFAULT_BUDGET: Duration = Duration::from_secs(15);
    collect_until_completed_with_budget(rx, DEFAULT_BUDGET).await
}
async fn collect_until_completed_with_budget(
rx: &mut broadcast::Receiver<AgentEvent>,
budget: Duration,
) -> Vec<AgentEvent> {
let mut events = Vec::new();
let deadline = tokio::time::Instant::now() + budget;
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
panic!("timed out waiting for SubAgentCompleted; saw: {events:?}");
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(event)) => {
let done = matches!(event, AgentEvent::SubAgentCompleted { .. });
events.push(event);
if done {
return events;
}
}
Ok(Err(broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(broadcast::error::RecvError::Closed)) => {
panic!("parent channel closed before completion; saw: {events:?}")
}
Err(_) => panic!("timed out waiting for SubAgentCompleted; saw: {events:?}"),
}
}
}
/// Profile fixture with id `"researcher"` whose tool policy is an allowlist
/// containing only `Read` and `Grep`.
fn researcher_allowlist() -> SubagentProfile {
    let allow: Vec<String> = ["Read", "Grep"]
        .iter()
        .map(|tool| tool.to_string())
        .collect();

    SubagentProfile {
        id: String::from("researcher"),
        display_name: String::from("Researcher"),
        description: String::new(),
        system_prompt: String::from("research"),
        tools: ToolPolicy::Allowlist { allow },
        model_hint: None,
        default_responsibility: None,
        ui: Default::default(),
    }
}
/// Profile fixture with id `"general-purpose"` that uses
/// `ToolPolicy::Inherit` instead of an explicit tool list.
fn inherit_profile() -> SubagentProfile {
    let tools = ToolPolicy::Inherit;

    SubagentProfile {
        id: String::from("general-purpose"),
        display_name: String::from("General"),
        description: String::new(),
        system_prompt: String::from("general"),
        tools,
        model_hint: None,
        default_responsibility: None,
        ui: Default::default(),
    }
}
/// Runs a child spawn against `CompletedProvider` and checks the order of
/// events seen on the parent channel: Started, then any forwarded
/// `SubAgentEvent`, then Completed with status `"completed"`. Also checks that
/// the persisted child session records `last_run_status = "completed"`.
#[tokio::test]
async fn s_t2_1_run_child_spawn_emits_started_event_completed_in_order() {
    let mut harness = build_harness(Arc::new(CompletedProvider), Vec::new(), &[]).await;

    // The Started event is published by the test itself before the spawn runs.
    let started = AgentEvent::SubAgentStarted {
        parent_session_id: harness.parent_session_id.clone(),
        child_session_id: harness.child_session_id.clone(),
        title: Some("Child session".to_string()),
    };
    harness.parent_tx.send(started).unwrap();

    let job = SpawnJob {
        parent_session_id: harness.parent_session_id.clone(),
        child_session_id: harness.child_session_id.clone(),
        model: "gpt-5".to_string(),
        disabled_tools: None,
    };
    run_child_spawn(harness.ctx.clone(), job).await.unwrap();

    let events = collect_until_completed(&mut harness.parent_rx).await;

    // Record the first index of each event kind in a single pass.
    let mut first_started: Option<usize> = None;
    let mut first_completed: Option<usize> = None;
    let mut first_forwarded: Option<usize> = None;
    for (idx, event) in events.iter().enumerate() {
        let slot = match event {
            AgentEvent::SubAgentStarted { .. } => &mut first_started,
            AgentEvent::SubAgentCompleted { .. } => &mut first_completed,
            AgentEvent::SubAgentEvent { .. } => &mut first_forwarded,
            _ => continue,
        };
        if slot.is_none() {
            *slot = Some(idx);
        }
    }
    let started_idx = first_started.expect("SubAgentStarted present");
    let completed_idx = first_completed.expect("SubAgentCompleted present");

    assert!(
        started_idx < completed_idx,
        "Started must precede Completed: {events:?}"
    );
    // Forwarded child events are optional; when present they sit in between.
    if let Some(event_idx) = first_forwarded {
        assert!(
            started_idx < event_idx,
            "Started must precede SubAgentEvent"
        );
        assert!(
            event_idx < completed_idx,
            "SubAgentEvent must precede Completed"
        );
    }

    match events.last().unwrap() {
        AgentEvent::SubAgentCompleted { status, .. } => {
            assert_eq!(status, "completed", "child should finish completed");
        }
        other => panic!("last event must be SubAgentCompleted, got {other:?}"),
    }

    let loaded = harness
        .storage
        .load_session(&harness.child_session_id)
        .await
        .unwrap();
    let persisted = loaded.unwrap();
    let last_status = persisted
        .metadata
        .get("last_run_status")
        .map(|status| status.as_str());
    assert_eq!(last_status, Some("completed"));
}
/// With a catalog of `Read`, `Grep`, `Edit` and `Write`, building a job from
/// the allowlist profile must disable `Edit` and `Write` while leaving the
/// allowlisted `Read` and `Grep` enabled.
#[tokio::test]
async fn s_t2_2_run_profile_allowlist_disables_non_allowlisted_tools() {
    let tool_names: Vec<String> = ["Read", "Grep", "Edit", "Write"]
        .iter()
        .map(|name| name.to_string())
        .collect();
    let harness = build_harness(Arc::new(CompletedProvider), tool_names, &[]).await;

    let input = RunProfileInput {
        child_session_id: harness.child_session_id.clone(),
        parent_session_id: harness.parent_session_id.clone(),
        model: "gpt-5".to_string(),
    };
    let profile = researcher_allowlist();
    let runner = ProfileRunner::new(harness.ctx.clone());
    let job = runner.build_job(&profile, &input);

    let disabled = job.disabled_tools.expect("allowlist yields disabled tools");
    assert!(disabled.contains(&"Edit".to_string()));
    assert!(disabled.contains(&"Write".to_string()));
    assert!(!disabled.contains(&"Read".to_string()));
    assert!(!disabled.contains(&"Grep".to_string()));
}
/// Building a job from the `Inherit` profile must leave `disabled_tools`
/// unset, even when the catalog contains tools.
#[tokio::test]
async fn s_t2_3_run_profile_inherit_has_no_disabled_tools() {
    let tool_names: Vec<String> = ["Read", "Edit"]
        .iter()
        .map(|name| name.to_string())
        .collect();
    let harness = build_harness(Arc::new(CompletedProvider), tool_names, &[]).await;

    let input = RunProfileInput {
        child_session_id: harness.child_session_id.clone(),
        parent_session_id: harness.parent_session_id.clone(),
        model: "gpt-5".to_string(),
    };
    let profile = inherit_profile();
    let runner = ProfileRunner::new(harness.ctx.clone());
    let job = runner.build_job(&profile, &input);

    let disabled = job.disabled_tools;
    assert!(
        disabled.is_none(),
        "Inherit policy must not disable any tools"
    );
}
/// Runs the inherit profile with a model different from the `"gpt-5"` the
/// child session was created with, waits for completion, and checks that the
/// persisted child session carries the overriding model.
#[tokio::test]
async fn s_t2_4_run_profile_model_override_is_persisted() {
    let harness = build_harness(Arc::new(CompletedProvider), Vec::new(), &[]).await;
    let override_model = "claude-3-7-sonnet";

    let runner = ProfileRunner::new(harness.ctx.clone());
    // Subscribe before the run starts so no event of the run is missed.
    let mut parent_rx = harness.parent_rx.resubscribe();

    let profile = inherit_profile();
    let input = RunProfileInput {
        child_session_id: harness.child_session_id.clone(),
        parent_session_id: harness.parent_session_id.clone(),
        model: override_model.to_string(),
    };
    runner.run_profile(&profile, input).await.unwrap();

    // Only completion matters here; the collected events are discarded.
    drop(collect_until_completed(&mut parent_rx).await);

    let loaded = harness
        .storage
        .load_session(&harness.child_session_id)
        .await
        .unwrap();
    let persisted = loaded.unwrap();
    assert_eq!(
        persisted.model, override_model,
        "child session model must reflect the runner override"
    );
}
/// Runs a child spawn against the slow `HangingProvider` with one-second
/// `child_watchdog.*` values in the child metadata, and expects the final
/// `SubAgentCompleted` event to report status `"timeout"`.
///
/// NOTE(review): the metadata keys are presumably read by the spawn runtime's
/// watchdog; that code is not visible from this file.
#[tokio::test]
async fn s_t2_5_watchdog_timeout_completes_with_timeout_status() {
    let watchdog_overrides = [
        ("child_watchdog.max_total_secs", "1"),
        ("child_watchdog.max_idle_secs", "1"),
        ("child_watchdog.check_interval_secs", "1"),
    ];
    let harness = build_harness(Arc::new(HangingProvider), Vec::new(), &watchdog_overrides).await;

    // Subscribe before the run starts so no event of the run is missed.
    let mut parent_rx = harness.parent_rx.resubscribe();

    let job = SpawnJob {
        parent_session_id: harness.parent_session_id.clone(),
        child_session_id: harness.child_session_id.clone(),
        model: "gpt-5".to_string(),
        disabled_tools: None,
    };
    run_child_spawn(harness.ctx.clone(), job).await.unwrap();

    // Wait longer than the default budget allows.
    let budget = Duration::from_secs(45);
    let events = collect_until_completed_with_budget(&mut parent_rx, budget).await;

    match events.last().unwrap() {
        AgentEvent::SubAgentCompleted { status, .. } => {
            assert_eq!(status, "timeout", "watchdog must yield timeout status");
        }
        other => panic!("expected SubAgentCompleted, got {other:?}"),
    }
}