mod builder;
mod execute_request;
mod tools;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
pub use builder::AgentBuilder;
pub use execute_request::ExecuteRequestBuilder;
pub use tools::{
builtin_tool_names, builtin_tool_specs, BuiltinTool, ToolSpec, CANONICAL_TOOL_NAMES,
};
pub use bamboo_agent_core::{
AgentError, AgentEvent, Message, MessageContent, Role, Session, TokenBudgetUsage, TokenUsage,
};
pub use bamboo_domain::{TaskItem, TaskItemStatus, TaskList};
pub use bamboo_engine::ExecuteRequest;
pub use bamboo_infrastructure::LLMProvider;
pub use bamboo_tools::{BuiltinToolExecutor, BuiltinToolExecutorBuilder, ToolOutputManager};
/// Capacity passed to `mpsc::channel` when creating the [`AgentEvent`] channel
/// for a run (used by both the blocking and the streaming entry points).
const EVENT_CHANNEL_CAPACITY: usize = 256;
/// Facade around a [`bamboo_engine::Agent`] that optionally carries a system
/// prompt and a model override, both applied to the session before execution.
#[derive(Clone)]
pub struct Agent {
/// Wrapped runtime agent that actually executes requests.
inner: bamboo_engine::Agent,
/// When set, this prompt replaces a leading system message in the session,
/// or is inserted at the front if the first message is not a system message.
system_prompt: Option<String>,
/// When set, overwrites `session.model` and is also set on the execute request.
model: Option<String>,
}
impl Agent {
pub fn builder() -> AgentBuilder {
AgentBuilder::new()
}
pub fn from_runtime(inner: bamboo_engine::Agent) -> Self {
Self {
inner,
system_prompt: None,
model: None,
}
}
pub(crate) fn from_runtime_with_config(
inner: bamboo_engine::Agent,
system_prompt: Option<String>,
model: Option<String>,
) -> Self {
Self {
inner,
system_prompt,
model,
}
}
pub async fn run(
&self,
session: &mut Session,
input: impl Into<String>,
) -> Result<(), AgentError> {
session.add_message(Message::user(input.into()));
self.run_session(session).await
}
pub async fn run_session(&self, session: &mut Session) -> Result<(), AgentError> {
let (event_tx, mut event_rx) = mpsc::channel::<AgentEvent>(EVENT_CHANNEL_CAPACITY);
let cancel_token = CancellationToken::new();
let drain = tokio::spawn(async move { while event_rx.recv().await.is_some() {} });
let result = self.execute_internal(session, event_tx, cancel_token).await;
drain.abort();
result
}
pub fn run_stream(
&self,
mut session: Session,
input: impl Into<String>,
) -> mpsc::Receiver<AgentEvent> {
session.add_message(Message::user(input.into()));
self.run_stream_session(session)
}
pub fn run_stream_session(&self, mut session: Session) -> mpsc::Receiver<AgentEvent> {
let (event_tx, event_rx) = mpsc::channel::<AgentEvent>(EVENT_CHANNEL_CAPACITY);
let cancel_token = CancellationToken::new();
let agent = self.clone();
tokio::spawn(async move {
if let Err(error) = agent
.execute_internal(&mut session, event_tx, cancel_token)
.await
{
tracing::warn!("Agent::run_stream execution failed: {error}");
}
});
event_rx
}
async fn execute_internal(
&self,
session: &mut Session,
event_tx: mpsc::Sender<AgentEvent>,
cancel_token: CancellationToken,
) -> Result<(), AgentError> {
if let Some(prompt) = self.system_prompt.as_ref() {
match session.messages.first() {
Some(first) if matches!(first.role, Role::System) => {
session.messages[0] = Message::system(prompt.clone());
}
_ => session.messages.insert(0, Message::system(prompt.clone())),
}
}
if let Some(model) = self.model.as_ref() {
session.model = model.clone();
}
let initial_message = session
.messages
.iter()
.rev()
.find(|m| matches!(m.role, Role::User))
.map(|m| m.content.clone())
.unwrap_or_default();
let mut builder = ExecuteRequestBuilder::new(initial_message, event_tx, cancel_token);
if let Some(model) = self.model.clone() {
builder = builder.model(model);
}
self.inner.execute(session, builder.build()).await
}
pub fn storage(&self) -> &Arc<dyn bamboo_agent_core::storage::Storage> {
self.inner.storage()
}
pub fn persistence(&self) -> &Arc<dyn bamboo_domain::RuntimeSessionPersistence> {
self.inner.persistence()
}
}