use anyhow::Result;
use serde_json::Value;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use super::runtime_agent::RuntimeAgent;
use super::streaming::{StreamConsumer, StreamEvent};
use super::{Agent, Session};
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
type ContentHandler = dyn Fn(&str) + Send + Sync;
type ToolCallHandler = dyn Fn(&str, &str) + Send + Sync;
type ToolResultHandler = dyn Fn(&str, &Value) + Send + Sync;
type DoneHandler = dyn Fn() + Send + Sync;
type AsyncToolCallHandler = dyn Fn(String, String) -> BoxFuture<'static, Result<()>> + Send + Sync;
type AsyncToolResultHandler = dyn Fn(String, Value) -> BoxFuture<'static, Result<()>> + Send + Sync;
pub struct StreamBuilder<'a> {
agent: &'a RuntimeAgent,
message: String,
on_content: Option<Arc<ContentHandler>>,
on_reasoning: Option<Arc<ContentHandler>>,
on_tool_call: Option<Arc<ToolCallHandler>>,
on_tool_result: Option<Arc<ToolResultHandler>>,
on_tool_failed: Option<Arc<ToolCallHandler>>,
on_error: Option<Arc<ContentHandler>>,
on_done: Option<Arc<DoneHandler>>,
on_session_started: Option<Arc<ContentHandler>>,
on_tool_call_async: Option<Arc<AsyncToolCallHandler>>,
on_tool_result_async: Option<Arc<AsyncToolResultHandler>>,
}
impl<'a> StreamBuilder<'a> {
pub fn new(agent: &'a RuntimeAgent, message: impl Into<String>) -> Self {
Self {
agent,
message: message.into(),
on_content: None,
on_reasoning: None,
on_tool_call: None,
on_tool_result: None,
on_tool_failed: None,
on_error: None,
on_done: None,
on_session_started: None,
on_tool_call_async: None,
on_tool_result_async: None,
}
}
pub fn on_content<F>(mut self, f: F) -> Self
where
F: Fn(&str) + Send + Sync + 'static,
{
self.on_content = Some(Arc::new(f));
self
}
pub fn on_reasoning<F>(mut self, f: F) -> Self
where
F: Fn(&str) + Send + Sync + 'static,
{
self.on_reasoning = Some(Arc::new(f));
self
}
pub fn on_tool_call<F>(mut self, f: F) -> Self
where
F: Fn(&str, &str) + Send + Sync + 'static,
{
self.on_tool_call = Some(Arc::new(f));
self
}
pub fn on_tool_result<F>(mut self, f: F) -> Self
where
F: Fn(&str, &Value) + Send + Sync + 'static,
{
self.on_tool_result = Some(Arc::new(f));
self
}
pub fn on_tool_failed<F>(mut self, f: F) -> Self
where
F: Fn(&str, &str) + Send + Sync + 'static,
{
self.on_tool_failed = Some(Arc::new(f));
self
}
pub fn on_error<F>(mut self, f: F) -> Self
where
F: Fn(&str) + Send + Sync + 'static,
{
self.on_error = Some(Arc::new(f));
self
}
pub fn on_done<F>(mut self, f: F) -> Self
where
F: Fn() + Send + Sync + 'static,
{
self.on_done = Some(Arc::new(f));
self
}
pub fn on_session_started<F>(mut self, f: F) -> Self
where
F: Fn(&str) + Send + Sync + 'static,
{
self.on_session_started = Some(Arc::new(f));
self
}
pub fn on_tool_call_async<F, Fut>(mut self, f: F) -> Self
where
F: Fn(String, String) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<()>> + Send + 'static,
{
self.on_tool_call_async = Some(Arc::new(move |name, args| Box::pin(f(name, args))));
self
}
pub fn on_tool_result_async<F, Fut>(mut self, f: F) -> Self
where
F: Fn(String, Value) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<()>> + Send + 'static,
{
self.on_tool_result_async = Some(Arc::new(move |name, result| Box::pin(f(name, result))));
self
}
pub async fn run(self) -> Result<Session> {
let consumer = ClosureConsumer {
on_content: self.on_content,
on_reasoning: self.on_reasoning,
on_tool_call: self.on_tool_call,
on_tool_result: self.on_tool_result,
on_tool_failed: self.on_tool_failed,
on_error: self.on_error,
on_done: self.on_done,
on_session_started: self.on_session_started,
on_tool_call_async: self.on_tool_call_async,
on_tool_result_async: self.on_tool_result_async,
};
self.agent
.run_streaming(&self.message, Box::new(consumer))
.await
}
}
struct ClosureConsumer {
on_content: Option<Arc<ContentHandler>>,
on_reasoning: Option<Arc<ContentHandler>>,
on_tool_call: Option<Arc<ToolCallHandler>>,
on_tool_result: Option<Arc<ToolResultHandler>>,
on_tool_failed: Option<Arc<ToolCallHandler>>,
on_error: Option<Arc<ContentHandler>>,
on_done: Option<Arc<DoneHandler>>,
on_session_started: Option<Arc<ContentHandler>>,
on_tool_call_async: Option<Arc<AsyncToolCallHandler>>,
on_tool_result_async: Option<Arc<AsyncToolResultHandler>>,
}
impl StreamConsumer for ClosureConsumer {
fn on_event(&self, event: &StreamEvent) -> Result<()> {
match event {
StreamEvent::SessionStarted { session_id } => {
if let Some(ref f) = self.on_session_started {
f(session_id);
}
}
StreamEvent::Content { content } => {
if let Some(ref f) = self.on_content {
f(content);
}
}
StreamEvent::Reasoning { content } => {
if let Some(ref f) = self.on_reasoning {
f(content);
}
}
StreamEvent::ToolCallStarted {
tool_name,
arguments,
} => {
if let Some(ref f) = self.on_tool_call {
f(tool_name, arguments);
}
if let Some(ref f) = self.on_tool_call_async {
let fut = f(tool_name.clone(), arguments.clone());
tokio::spawn(async move {
if let Err(e) = fut.await {
eprintln!("Error in async tool call handler: {}", e);
}
});
}
}
StreamEvent::ToolCallCompleted {
tool_name,
result,
success,
..
} => {
if *success {
if let Some(ref f) = self.on_tool_result {
f(tool_name, result);
}
if let Some(ref f) = self.on_tool_result_async {
let fut = f(tool_name.clone(), result.clone());
tokio::spawn(async move {
if let Err(e) = fut.await {
eprintln!("Error in async tool result handler: {}", e);
}
});
}
}
}
StreamEvent::ToolCallFailed { tool_name, error } => {
if let Some(ref f) = self.on_tool_failed {
f(tool_name, error);
}
}
StreamEvent::Error { message, .. } => {
if let Some(ref f) = self.on_error {
f(message);
}
}
StreamEvent::Done => {
if let Some(ref f) = self.on_done {
f();
}
}
_ => {}
}
Ok(())
}
}
impl RuntimeAgent {
pub fn stream(&self, message: impl Into<String>) -> StreamBuilder<'_> {
StreamBuilder::new(self, message)
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_stream_builder_creation() {
}
}