use std::collections::BTreeMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use serde_json::Value;
use tokio::io::{AsyncBufRead, AsyncWrite, BufReader};
use crate::errors::Result as SdkResult;
use crate::hook::HookHandler;
use crate::runtime::{dispatch_loop, Handlers, ToolHandler};
type NotificationHandler = Arc<dyn Fn(Value) + Send + Sync>;
pub type HandlerRegistry = Handlers;
type InitCallback = Pin<Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = SdkResult<()>> + Send>> + Send>>;
type ShutdownCallback = Pin<Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>>;
#[cfg(feature = "admin")]
type AdminReadyCallback =
Box<dyn FnOnce(Arc<crate::admin::AdminClient>) + Send>;
pub struct Microapp {
name: String,
version: String,
tools: BTreeMap<String, Arc<dyn ToolHandler>>,
hooks: BTreeMap<String, Arc<dyn HookHandler>>,
on_initialize: Option<InitCallback>,
on_shutdown: Option<ShutdownCallback>,
#[cfg(feature = "admin")]
pub(crate) admin_enabled: bool,
#[cfg(feature = "admin")]
on_admin_ready: Option<AdminReadyCallback>,
notification_listeners: BTreeMap<String, NotificationHandler>,
}
impl Microapp {
pub fn new(name: impl Into<String>, version: impl Into<String>) -> Self {
Self {
name: name.into(),
version: version.into(),
tools: BTreeMap::new(),
hooks: BTreeMap::new(),
on_initialize: None,
on_shutdown: None,
#[cfg(feature = "admin")]
admin_enabled: false,
#[cfg(feature = "admin")]
on_admin_ready: None,
notification_listeners: BTreeMap::new(),
}
}
#[cfg(feature = "admin")]
pub fn with_admin(mut self) -> Self {
self.admin_enabled = true;
self
}
#[cfg(feature = "admin")]
pub fn on_admin_ready<F>(mut self, callback: F) -> Self
where
F: FnOnce(Arc<crate::admin::AdminClient>) + Send + 'static,
{
self.on_admin_ready = Some(Box::new(callback));
self
}
pub fn with_notification_listener<F>(
mut self,
method: impl Into<String>,
handler: F,
) -> Self
where
F: Fn(Value) + Send + Sync + 'static,
{
self.notification_listeners
.insert(method.into(), Arc::new(handler));
self
}
pub fn with_tool<H>(mut self, name: impl Into<String>, handler: H) -> Self
where
H: ToolHandler + 'static,
{
self.tools.insert(name.into(), Arc::new(handler));
self
}
pub fn with_hook<H>(mut self, name: impl Into<String>, handler: H) -> Self
where
H: HookHandler + 'static,
{
self.hooks.insert(name.into(), Arc::new(handler));
self
}
pub fn on_initialize<F, Fut>(mut self, callback: F) -> Self
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = SdkResult<()>> + Send + 'static,
{
self.on_initialize = Some(Box::pin(move || Box::pin(callback()) as _));
self
}
pub fn on_shutdown<F, Fut>(mut self, callback: F) -> Self
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
self.on_shutdown = Some(Box::pin(move || Box::pin(callback()) as _));
self
}
pub async fn run_stdio(self) -> SdkResult<()> {
let stdin = tokio::io::stdin();
let stdout = tokio::io::stdout();
self.run_with(BufReader::new(stdin), stdout).await
}
pub async fn run_with<R, W>(self, reader: R, writer: W) -> SdkResult<()>
where
R: AsyncBufRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
{
let handlers = Handlers {
name: self.name,
version: self.version,
tools: self.tools,
hooks: self.hooks,
#[cfg(feature = "admin")]
admin: None,
notification_listeners: self.notification_listeners,
};
let _ = self.on_initialize;
let _ = self.on_shutdown;
let writer = std::sync::Arc::new(tokio::sync::Mutex::new(writer));
#[cfg(feature = "admin")]
let handlers = {
let mut h = handlers;
if self.admin_enabled {
let sender = std::sync::Arc::new(
crate::admin::WriterAdminSender::new(writer.clone()),
);
let client =
std::sync::Arc::new(crate::admin::AdminClient::new(sender));
if let Some(cb) = self.on_admin_ready {
cb(std::sync::Arc::clone(&client));
}
h.admin = Some(client);
}
h
};
dispatch_loop(reader, writer, handlers).await
}
#[doc(hidden)]
pub fn into_handlers(self) -> Handlers {
Handlers {
name: self.name,
version: self.version,
tools: self.tools,
hooks: self.hooks,
#[cfg(feature = "admin")]
admin: None,
notification_listeners: self.notification_listeners,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ctx::ToolCtx;
use crate::errors::ToolError;
use crate::reply::ToolReply;
use serde_json::Value;
async fn echo(args: Value, _ctx: ToolCtx) -> std::result::Result<ToolReply, ToolError> {
Ok(ToolReply::ok_json(args))
}
#[test]
fn with_tool_registers() {
let app = Microapp::new("t", "0.0.0").with_tool("echo", echo);
let h = app.into_handlers();
assert!(h.tools.contains_key("echo"));
}
#[test]
fn chained_with_tool_preserves_each() {
let app = Microapp::new("t", "0.0.0")
.with_tool("echo", echo)
.with_tool("ping", echo);
let h = app.into_handlers();
assert!(h.tools.contains_key("echo"));
assert!(h.tools.contains_key("ping"));
assert_eq!(h.tools.len(), 2);
}
#[test]
fn name_and_version_passthrough() {
let app = Microapp::new("agent-creator", "0.1.0");
let h = app.into_handlers();
assert_eq!(h.name, "agent-creator");
assert_eq!(h.version, "0.1.0");
}
#[cfg(feature = "admin")]
#[tokio::test]
async fn on_admin_ready_fires_with_live_client_before_dispatch_loop() {
let (tx, rx) = tokio::sync::oneshot::channel::<Arc<crate::admin::AdminClient>>();
let app = Microapp::new("t", "0.0.0")
.with_admin()
.on_admin_ready(move |client| {
let _ = tx.send(client);
});
let reader = tokio::io::BufReader::new(tokio::io::empty());
let writer = Vec::new();
let _ = app.run_with(reader, writer).await;
assert!(rx.await.is_ok(), "on_admin_ready must fire before run_with returns");
}
#[tokio::test]
async fn notification_listener_fires_on_matching_method_with_no_id() {
let req = r#"{"jsonrpc":"2.0","method":"nexo/notify/agent_event","params":{"kind":"transcript_appended"}}"#;
let counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let counter_cb = Arc::clone(&counter);
let app = Microapp::new("t", "0.0.0").with_notification_listener(
"nexo/notify/agent_event",
move |params| {
assert_eq!(params["kind"], "transcript_appended");
counter_cb.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
},
);
let reader = tokio::io::BufReader::new(std::io::Cursor::new(
format!("{req}\n").into_bytes(),
));
let writer = Vec::new();
let _ = app.run_with(reader, writer).await;
assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
}
#[tokio::test]
async fn notification_listener_does_not_fire_for_unregistered_method() {
let req = r#"{"jsonrpc":"2.0","method":"nexo/notify/unknown","params":{}}"#;
let counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let counter_cb = Arc::clone(&counter);
let app = Microapp::new("t", "0.0.0").with_notification_listener(
"nexo/notify/agent_event",
move |_| {
counter_cb.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
},
);
let reader = tokio::io::BufReader::new(std::io::Cursor::new(
format!("{req}\n").into_bytes(),
));
let _ = app.run_with(reader, Vec::new()).await;
assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 0);
}
#[tokio::test]
async fn notification_listener_skipped_when_id_present() {
let req = r#"{"jsonrpc":"2.0","id":1,"method":"nexo/notify/agent_event","params":{}}"#;
let counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let counter_cb = Arc::clone(&counter);
let app = Microapp::new("t", "0.0.0").with_notification_listener(
"nexo/notify/agent_event",
move |_| {
counter_cb.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
},
);
let reader = tokio::io::BufReader::new(std::io::Cursor::new(
format!("{req}\n").into_bytes(),
));
let _ = app.run_with(reader, Vec::new()).await;
assert_eq!(
counter.load(std::sync::atomic::Ordering::SeqCst),
0,
"listener must only fire for notifications, not requests"
);
}
#[cfg(feature = "admin")]
#[tokio::test]
async fn on_admin_ready_does_not_fire_without_with_admin() {
let (tx, rx) = tokio::sync::oneshot::channel::<Arc<crate::admin::AdminClient>>();
let app = Microapp::new("t", "0.0.0").on_admin_ready(move |client| {
let _ = tx.send(client);
});
let reader = tokio::io::BufReader::new(tokio::io::empty());
let writer = Vec::new();
let _ = app.run_with(reader, writer).await;
assert!(rx.await.is_err(), "callback must not fire without with_admin()");
}
}