use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use nexo_broker::{Event, StdioBridgeBroker};
use nexo_llm::types::ChatMessage;
use nexo_memory::MemoryEntry;
use nexo_plugin_manifest::PluginManifest;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::sync::{mpsc, oneshot, Mutex};
use crate::errors::{Error as SdkError, Result as SdkResult};
/// Boxed, pinned, `Send` future — the common return type of every async handler trait below.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
/// Handler invoked for each `broker.event` notification received from the host.
///
/// `broker` is a [`BrokerSender`] the handler can use to publish or issue RPCs
/// back to the host. Any suitable async closure also implements this trait via
/// the blanket impl.
pub trait BrokerEventHandler: Send + Sync + 'static {
    fn handle(&self, topic: String, event: Event, broker: BrokerSender) -> BoxFuture<'static, ()>;
}
/// Blanket impl: any `Fn(String, Event, BrokerSender)` returning a `Send`
/// future of `()` can be registered directly as a broker-event handler.
impl<Func, Out> BrokerEventHandler for Func
where
    Func: Fn(String, Event, BrokerSender) -> Out + Send + Sync + 'static,
    Out: Future<Output = ()> + Send + 'static,
{
    fn handle(&self, topic: String, event: Event, broker: BrokerSender) -> BoxFuture<'static, ()> {
        // Pin the closure's future so it satisfies the object-safe signature.
        Box::pin((self)(topic, event, broker))
    }
}
/// Handler invoked when the host sends a `shutdown` request; `Err(String)`
/// is reported back to the host as a -32000 JSON-RPC error.
pub trait ShutdownHandler: Send + Sync + 'static {
    fn handle(&self) -> BoxFuture<'static, Result<(), String>>;
}
/// Blanket impl so plain async closures `Fn() -> Future<Result<(), String>>`
/// can be registered as shutdown handlers.
impl<Func, Out> ShutdownHandler for Func
where
    Func: Fn() -> Out + Send + Sync + 'static,
    Out: Future<Output = Result<(), String>> + Send + 'static,
{
    fn handle(&self) -> BoxFuture<'static, Result<(), String>> {
        Box::pin((self)())
    }
}
/// Handler for the `plugin.configure` request; receives the configuration
/// payload transcoded to YAML (`serde_yaml::Value`). `Err(String)` maps to a
/// -32603 JSON-RPC error.
pub trait ConfigureHandler: Send + Sync + 'static {
    fn handle(
        &self,
        value: serde_yaml::Value,
    ) -> BoxFuture<'static, Result<(), String>>;
}
/// Blanket impl: async closures taking the YAML config value are
/// configure handlers.
impl<Func, Out> ConfigureHandler for Func
where
    Func: Fn(serde_yaml::Value) -> Out + Send + Sync + 'static,
    Out: Future<Output = Result<(), String>> + Send + 'static,
{
    fn handle(&self, value: serde_yaml::Value) -> BoxFuture<'static, Result<(), String>> {
        Box::pin((self)(value))
    }
}
/// Reply payload for the `plugin.credentials.list` request.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct CredentialsListReply {
    // Account identifiers this plugin can issue credentials for.
    pub accounts: Vec<String>,
    // Human-readable, non-fatal warnings produced while listing
    // (exact semantics defined by the host contract).
    pub warnings: Vec<String>,
}
/// Handler for `plugin.credentials.list`; when unregistered the dispatch loop
/// answers with a -32601 "method not found" error instead.
pub trait CredentialsListHandler: Send + Sync + 'static {
    fn handle(&self) -> BoxFuture<'static, Result<CredentialsListReply, String>>;
}
/// Blanket impl for zero-argument async closures returning the list reply.
impl<Func, Out> CredentialsListHandler for Func
where
    Func: Fn() -> Out + Send + Sync + 'static,
    Out: Future<Output = Result<CredentialsListReply, String>> + Send + 'static,
{
    fn handle(&self) -> BoxFuture<'static, Result<CredentialsListReply, String>> {
        Box::pin((self)())
    }
}
/// Handler for `plugin.credentials.issue` (issue credentials for
/// `account_id` on behalf of `agent_id`).
pub trait CredentialsIssueHandler: Send + Sync + 'static {
    fn handle(
        &self,
        account_id: String,
        agent_id: String,
    ) -> BoxFuture<'static, Result<(), String>>;
}
/// Blanket impl for two-argument `(account_id, agent_id)` async closures.
impl<Func, Out> CredentialsIssueHandler for Func
where
    Func: Fn(String, String) -> Out + Send + Sync + 'static,
    Out: Future<Output = Result<(), String>> + Send + 'static,
{
    fn handle(
        &self,
        account_id: String,
        agent_id: String,
    ) -> BoxFuture<'static, Result<(), String>> {
        Box::pin((self)(account_id, agent_id))
    }
}
/// Handler for `plugin.credentials.resolve_bytes`; the returned bytes are
/// base64-encoded by the dispatch loop before being sent to the host.
pub trait CredentialsResolveBytesHandler: Send + Sync + 'static {
    fn handle(
        &self,
        account_id: String,
        agent_id: String,
        fingerprint: String,
    ) -> BoxFuture<'static, Result<Vec<u8>, String>>;
}
/// Blanket impl for three-argument `(account_id, agent_id, fingerprint)`
/// async closures resolving raw credential bytes.
impl<Func, Out> CredentialsResolveBytesHandler for Func
where
    Func: Fn(String, String, String) -> Out + Send + Sync + 'static,
    Out: Future<Output = Result<Vec<u8>, String>> + Send + 'static,
{
    fn handle(
        &self,
        account_id: String,
        agent_id: String,
        fingerprint: String,
    ) -> BoxFuture<'static, Result<Vec<u8>, String>> {
        Box::pin((self)(account_id, agent_id, fingerprint))
    }
}
/// Handler for `plugin.credentials.reload` (re-read credential state).
pub trait CredentialsReloadHandler: Send + Sync + 'static {
    fn handle(&self) -> BoxFuture<'static, Result<(), String>>;
}
/// Blanket impl for zero-argument async reload closures.
impl<Func, Out> CredentialsReloadHandler for Func
where
    Func: Fn() -> Out + Send + Sync + 'static,
    Out: Future<Output = Result<(), String>> + Send + 'static,
{
    fn handle(&self) -> BoxFuture<'static, Result<(), String>> {
        Box::pin((self)())
    }
}
/// Outgoing requests still awaiting a host response, keyed by JSON-RPC id.
/// Shared between the dispatch loop and every `BrokerSender` clone.
type ChildPending = Arc<DashMap<u64, PendingKind>>;
/// Delivery mechanism for one in-flight outgoing request.
#[doc(hidden)]
pub enum PendingKind {
    /// Ordinary request/response: the reply resolves this oneshot.
    Single(oneshot::Sender<Result<Value, RpcError>>),
    /// Streaming `llm.complete`: `llm.complete.delta` text goes through
    /// `delta_tx`; the closing response resolves `final_tx`.
    Streaming {
        delta_tx: mpsc::UnboundedSender<String>,
        final_tx: oneshot::Sender<Result<LlmCompleteResult, RpcError>>,
    },
}
/// Fallback deadline for `BrokerSender::request` when the caller passes `None`.
const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(30);
/// Allocate the next outgoing JSON-RPC request id (unique, thread-safe).
fn next_request_id(counter: &AtomicU64) -> u64 {
    // Relaxed suffices: ids only need uniqueness, not ordering with other memory.
    counter.fetch_add(1, Ordering::Relaxed)
}
/// Failure modes of an outgoing RPC issued through [`BrokerSender`].
#[derive(Debug, thiserror::Error)]
pub enum RpcError {
    /// The host replied with a JSON-RPC error object.
    #[error("rpc error {code}: {message}")]
    Server {
        code: i32,
        message: String,
    },
    /// No reply arrived within the deadline; the pending entry was evicted.
    #[error("rpc request timed out after {0:?}")]
    Timeout(Duration),
    /// The write failed or the reply channel closed before a response.
    #[error("rpc transport closed before reply: {0}")]
    Transport(String),
    /// The frame could not be serialized, or the reply failed to deserialize.
    #[error("rpc decode error: {0}")]
    Decode(String),
}
/// Cloneable handle handlers use to talk back to the host.
///
/// Shares the locked writer, the pending-request table, and the id counter
/// with the dispatch loop that constructed it.
#[derive(Clone)]
pub struct BrokerSender {
    // Every outgoing frame is written while holding this lock.
    writer: Arc<Mutex<Box<dyn AsyncWrite + Send + Unpin>>>,
    // Requests awaiting a reply; the dispatch loop resolves them by id.
    pending: ChildPending,
    // Monotonic id source shared with the dispatch loop.
    next_id: Arc<AtomicU64>,
}
/// Parameters for `llm.complete` (streaming and non-streaming variants).
/// Optional fields are omitted from the wire frame when `None` so the host
/// can apply its own defaults.
#[derive(Debug, Clone, Default)]
pub struct LlmCompleteParams {
    pub provider: String,
    pub model: String,
    pub messages: Vec<ChatMessage>,
    pub max_tokens: Option<u32>,
    pub temperature: Option<f32>,
    pub system_prompt: Option<String>,
}
/// Final (non-delta) result of an `llm.complete` RPC.
///
/// All fields carry `#[serde(default)]`. Previously only `content` did, so a
/// host that omitted `finish_reason` or `usage` made the *entire* completion
/// fail with a decode error even though the text was present. Defaulting the
/// remaining fields (empty string / zeroed [`TokenCount`]) keeps decoding
/// consistent and tolerant of partially-populated results; serialization is
/// unaffected, so the wire format is unchanged.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LlmCompleteResult {
    /// Completed text; empty when the host sent none.
    #[serde(default)]
    pub content: String,
    /// Provider-reported stop reason; empty when omitted by the host.
    #[serde(default)]
    pub finish_reason: String,
    /// Token accounting; zeroed when omitted by the host.
    #[serde(default)]
    pub usage: TokenCount,
}
/// Token usage reported by the provider; individual counters default to 0
/// when absent from the host's reply.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct TokenCount {
    #[serde(default)]
    pub prompt_tokens: u32,
    #[serde(default)]
    pub completion_tokens: u32,
}
impl BrokerSender {
    /// Send a JSON-RPC request to the host and await its reply.
    ///
    /// The response oneshot is registered in `pending` *before* the frame is
    /// written, so a fast host reply cannot race the registration. On
    /// serialization failure, transport failure, cancellation, or timeout the
    /// pending entry is removed again to avoid leaking table entries.
    /// `timeout` falls back to [`DEFAULT_RPC_TIMEOUT`] when `None`.
    pub async fn request(
        &self,
        method: &str,
        params: Value,
        timeout: Option<Duration>,
    ) -> Result<Value, RpcError> {
        let id = next_request_id(&self.next_id);
        let frame = json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": method,
            "params": params,
        });
        let (tx, rx) = oneshot::channel::<Result<Value, RpcError>>();
        self.pending.insert(id, PendingKind::Single(tx));
        let line = serde_json::to_string(&frame).map_err(|e| {
            // Nothing will ever reply to this id — drop the entry we just added.
            self.pending.remove(&id);
            RpcError::Decode(format!("serialize request: {e}"))
        })?;
        {
            // Hold the writer lock only for the write; never while awaiting the reply.
            let mut w = self.writer.lock().await;
            if w.write_all(line.as_bytes()).await.is_err()
                || w.write_all(b"\n").await.is_err()
                || w.flush().await.is_err()
            {
                self.pending.remove(&id);
                return Err(RpcError::Transport(
                    "stdin write failed (host closed?)".to_string(),
                ));
            }
        }
        let timeout = timeout.unwrap_or(DEFAULT_RPC_TIMEOUT);
        match tokio::time::timeout(timeout, rx).await {
            // Reply delivered by the dispatch loop (may itself be a server error).
            Ok(Ok(payload)) => payload,
            // Sender dropped without sending: entry evicted or the read loop is gone.
            Ok(Err(_canceled)) => {
                self.pending.remove(&id);
                Err(RpcError::Transport(
                    "response oneshot canceled before reply".to_string(),
                ))
            }
            Err(_elapsed) => {
                self.pending.remove(&id);
                Err(RpcError::Timeout(timeout))
            }
        }
    }
    /// Convenience wrapper for the `memory.recall` RPC; decodes the `entries`
    /// array of the result (a missing `entries` field becomes a decode error).
    pub async fn recall_memory(
        &self,
        agent_id: &str,
        query: &str,
        limit: u64,
    ) -> Result<Vec<MemoryEntry>, RpcError> {
        let params = json!({
            "agent_id": agent_id,
            "query": query,
            "limit": limit,
        });
        let result = self.request("memory.recall", params, None).await?;
        let entries_val = result.get("entries").cloned().unwrap_or(Value::Null);
        serde_json::from_value::<Vec<MemoryEntry>>(entries_val)
            .map_err(|e| RpcError::Decode(format!("memory.recall entries: {e}")))
    }
    /// Start a streaming `llm.complete` request (`"stream": true`).
    ///
    /// Registers a [`PendingKind::Streaming`] entry: the dispatch loop feeds
    /// `llm.complete.delta` notifications into the returned stream's chunk
    /// channel and the closing response into its oneshot. No timeout is
    /// applied here; dropping the returned [`LlmStream`] evicts the entry.
    pub async fn complete_llm_stream(&self, p: LlmCompleteParams) -> Result<LlmStream, RpcError> {
        let mut params = json!({
            "provider": p.provider,
            "model": p.model,
            "messages": p.messages,
            "stream": true,
        });
        // Optional knobs are only added when present so host defaults apply.
        if let Some(max) = p.max_tokens {
            params["max_tokens"] = json!(max);
        }
        if let Some(temp) = p.temperature {
            params["temperature"] = json!(temp);
        }
        if let Some(sys) = p.system_prompt {
            params["system_prompt"] = json!(sys);
        }
        let id = next_request_id(&self.next_id);
        let frame = json!({
            "jsonrpc": "2.0",
            "id": id,
            "method": "llm.complete",
            "params": params,
        });
        let (delta_tx, delta_rx) = mpsc::unbounded_channel::<String>();
        let (final_tx, final_rx) = oneshot::channel::<Result<LlmCompleteResult, RpcError>>();
        // Register before writing — same race-avoidance ordering as `request`.
        self.pending
            .insert(id, PendingKind::Streaming { delta_tx, final_tx });
        let line = serde_json::to_string(&frame).map_err(|e| {
            self.pending.remove(&id);
            RpcError::Decode(format!("serialize stream request: {e}"))
        })?;
        {
            let mut w = self.writer.lock().await;
            if w.write_all(line.as_bytes()).await.is_err()
                || w.write_all(b"\n").await.is_err()
                || w.flush().await.is_err()
            {
                self.pending.remove(&id);
                return Err(RpcError::Transport(
                    "stdin write failed (host closed?)".to_string(),
                ));
            }
        }
        Ok(LlmStream {
            request_id: id,
            chunks: delta_rx,
            finished: Some(final_rx),
            pending: self.pending.clone(),
        })
    }
    /// One-shot (non-streaming) `llm.complete`; optional fields are omitted
    /// from the frame when `None`.
    pub async fn complete_llm(&self, p: LlmCompleteParams) -> Result<LlmCompleteResult, RpcError> {
        let mut params = json!({
            "provider": p.provider,
            "model": p.model,
            "messages": p.messages,
        });
        if let Some(max) = p.max_tokens {
            params["max_tokens"] = json!(max);
        }
        if let Some(temp) = p.temperature {
            params["temperature"] = json!(temp);
        }
        if let Some(sys) = p.system_prompt {
            params["system_prompt"] = json!(sys);
        }
        let result = self.request("llm.complete", params, None).await?;
        serde_json::from_value::<LlmCompleteResult>(result)
            .map_err(|e| RpcError::Decode(format!("llm.complete result: {e}")))
    }
    /// Fire-and-forget `broker.publish` notification (no id, no reply expected;
    /// write failures surface as `SdkError::Io`).
    pub async fn publish(&self, topic: &str, event: Event) -> SdkResult<()> {
        let frame = json!({
            "jsonrpc": "2.0",
            "method": "broker.publish",
            "params": { "topic": topic, "event": event },
        });
        let line = serde_json::to_string(&frame)
            .map_err(|e| SdkError::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?;
        let mut writer = self.writer.lock().await;
        writer.write_all(line.as_bytes()).await?;
        writer.write_all(b"\n").await?;
        writer.flush().await?;
        Ok(())
    }
}
/// A tool advertised to the host in the `initialize` reply (`declare_tools`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ToolDef {
    pub name: String,
    pub description: String,
    // JSON Schema describing the tool's arguments.
    pub input_schema: serde_json::Value,
}
/// Wire shape of a `tool.invoke` request's params.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ToolInvocation {
    pub plugin_id: String,
    // One of the names from `declare_tools`.
    pub tool_name: String,
    // Tool arguments; `Value::Null` when the host omits them.
    #[serde(default)]
    pub args: serde_json::Value,
    // Calling agent, when the host supplies one.
    #[serde(default)]
    pub agent_id: Option<String>,
}
/// Failure modes a tool handler can report; each maps to a dedicated
/// JSON-RPC error code in the -334xx band (see [`ToolInvocationError::code`]).
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum ToolInvocationError {
    #[error("tool not found: {0}")]
    NotFound(String),
    #[error("invalid argument: {0}")]
    ArgumentInvalid(String),
    #[error("execution failed: {0}")]
    ExecutionFailed(String),
    #[error("unavailable: {0}")]
    Unavailable(String),
    #[error("denied: {0}")]
    Denied(String),
}
impl ToolInvocationError {
pub fn code(&self) -> i32 {
match self {
Self::NotFound(_) => -33401,
Self::ArgumentInvalid(_) => -33402,
Self::ExecutionFailed(_) => -33403,
Self::Unavailable(_) => -33404,
Self::Denied(_) => -33405,
}
}
}
/// Object-safe handler for `tool.invoke` requests (variant without context).
pub trait ToolHandler: Send + Sync + 'static {
    /// Execute `invocation`, resolving to the tool's JSON result or a
    /// [`ToolInvocationError`] that the dispatch loop maps onto -334xx codes.
    fn call(
        &self,
        invocation: ToolInvocation,
    ) -> std::pin::Pin<
        Box<
            dyn std::future::Future<Output = Result<serde_json::Value, ToolInvocationError>> + Send,
        >,
    >;
}
impl<F, Fut> ToolHandler for F
where
F: Fn(ToolInvocation) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<serde_json::Value, ToolInvocationError>>
+ Send
+ 'static,
{
fn call(
&self,
invocation: ToolInvocation,
) -> std::pin::Pin<
Box<
dyn std::future::Future<Output = Result<serde_json::Value, ToolInvocationError>> + Send,
>,
> {
Box::pin((self)(invocation))
}
}
/// Extra context handed to [`ToolHandlerWithContext`] implementations.
#[non_exhaustive]
#[derive(Clone)]
pub struct ToolContext {
    // Handle back to the host for publishing events or issuing RPCs.
    pub broker: BrokerSender,
    // Id of this plugin, taken from the cached manifest.
    pub plugin_id: String,
}
impl std::fmt::Debug for ToolContext {
    /// Manual `Debug`: `BrokerSender` wraps a boxed writer and is not `Debug`,
    /// so it is rendered as a fixed placeholder string.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("ToolContext");
        dbg.field("plugin_id", &self.plugin_id);
        dbg.field("broker", &"<BrokerSender>");
        dbg.finish()
    }
}
/// Object-safe handler for `tool.invoke` that additionally receives a
/// [`ToolContext`]; preferred over [`ToolHandler`] when both are registered.
pub trait ToolHandlerWithContext: Send + Sync + 'static {
    fn call(
        &self,
        invocation: ToolInvocation,
        ctx: ToolContext,
    ) -> std::pin::Pin<
        Box<
            dyn std::future::Future<Output = Result<serde_json::Value, ToolInvocationError>> + Send,
        >,
    >;
}
impl<F, Fut> ToolHandlerWithContext for F
where
F: Fn(ToolInvocation, ToolContext) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<serde_json::Value, ToolInvocationError>>
+ Send
+ 'static,
{
fn call(
&self,
invocation: ToolInvocation,
ctx: ToolContext,
) -> std::pin::Pin<
Box<
dyn std::future::Future<Output = Result<serde_json::Value, ToolInvocationError>> + Send,
>,
> {
Box::pin((self)(invocation, ctx))
}
}
/// Builder and runtime for a stdio JSON-RPC plugin.
///
/// Construct with [`PluginAdapter::new`], register handlers via the `on_*`
/// builder methods, then drive it with `run_stdio`/`run`.
pub struct PluginAdapter {
    // Manifest parsed once in `new`; echoed back in the `initialize` reply.
    cached_manifest: PluginManifest,
    // "<plugin id>-<version>" by default; see `with_server_version`.
    server_version: String,
    on_broker_event: Option<Arc<dyn BrokerEventHandler>>,
    on_shutdown: Option<Arc<dyn ShutdownHandler>>,
    on_configure: Option<Arc<dyn ConfigureHandler>>,
    on_credentials_list: Option<Arc<dyn CredentialsListHandler>>,
    on_credentials_issue: Option<Arc<dyn CredentialsIssueHandler>>,
    on_credentials_resolve_bytes: Option<Arc<dyn CredentialsResolveBytesHandler>>,
    on_credentials_reload: Option<Arc<dyn CredentialsReloadHandler>>,
    // Tools advertised in the `initialize` reply.
    declared_tools: Vec<ToolDef>,
    tool_handler: Option<Arc<dyn ToolHandler>>,
    // Takes precedence over `tool_handler` when both are set.
    tool_handler_with_context: Option<Arc<dyn ToolHandlerWithContext>>,
    // Frames queued by a StdioBridgeBroker, drained to the host by the dispatch loop.
    outbound_drain: Option<mpsc::Receiver<Value>>,
}
/// Handle to an in-flight streaming `llm.complete` request.
pub struct LlmStream {
    // Id used to evict the pending entry on Drop.
    request_id: u64,
    // Incremental text chunks (from `llm.complete.delta` notifications).
    chunks: mpsc::UnboundedReceiver<String>,
    // Final aggregated result; `Some` until `await_final` consumes it.
    finished: Option<oneshot::Receiver<Result<LlmCompleteResult, RpcError>>>,
    // Shared pending table, needed for cleanup on Drop.
    pending: ChildPending,
}
impl LlmStream {
    /// Receive the next streamed text chunk; `None` once the delta channel
    /// closes (stream finished or abandoned by the host).
    pub async fn next_chunk(&mut self) -> Option<String> {
        self.chunks.recv().await
    }
    /// Consume the stream and wait for the final aggregated result.
    pub async fn await_final(mut self) -> Result<LlmCompleteResult, RpcError> {
        // `finished` is taken exactly once; a second take is impossible in
        // practice (self is consumed) but kept as a guarded error.
        let Some(rx) = self.finished.take() else {
            return Err(RpcError::Transport("await_final already consumed".into()));
        };
        // A canceled oneshot means the host closed mid-stream.
        rx.await.unwrap_or_else(|_canceled| {
            Err(RpcError::Transport(
                "final response oneshot canceled (host closed mid-stream)".into(),
            ))
        })
    }
}
impl Drop for LlmStream {
    // Evict the request from the pending table so late deltas/finals for an
    // abandoned stream are dropped instead of accumulating.
    fn drop(&mut self) {
        self.pending.remove(&self.request_id);
    }
}
impl PluginAdapter {
    /// Parse the embedded manifest TOML and build an adapter with no handlers
    /// registered. `server_version` defaults to "<plugin id>-<plugin version>".
    pub fn new(manifest_toml: &str) -> SdkResult<Self> {
        let cached_manifest: PluginManifest = toml::from_str(manifest_toml).map_err(|e| {
            SdkError::Io(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("PluginAdapter: parse manifest TOML failed: {e}"),
            ))
        })?;
        let server_version = format!(
            "{}-{}",
            cached_manifest.plugin.id, cached_manifest.plugin.version
        );
        Ok(Self {
            cached_manifest,
            server_version,
            on_broker_event: None,
            on_shutdown: None,
            on_configure: None,
            on_credentials_list: None,
            on_credentials_issue: None,
            on_credentials_resolve_bytes: None,
            on_credentials_reload: None,
            declared_tools: Vec::new(),
            tool_handler: None,
            tool_handler_with_context: None,
            outbound_drain: None,
        })
    }
    /// Override the version string reported in the `initialize` reply.
    pub fn with_server_version(mut self, version: impl Into<String>) -> Self {
        self.server_version = version.into();
        self
    }
    /// Register the handler invoked for each `broker.event` notification.
    pub fn on_broker_event<H: BrokerEventHandler>(mut self, handler: H) -> Self {
        self.on_broker_event = Some(Arc::new(handler));
        self
    }
    /// Wire a [`StdioBridgeBroker`]: inbound `broker.event`s are fed into it,
    /// and frames it queues are drained to the host by the dispatch loop.
    /// NOTE: this replaces any previously registered `on_broker_event` handler.
    pub fn with_stdio_bridge_broker(mut self) -> (Self, Arc<StdioBridgeBroker>) {
        let (broker, drain_rx) = StdioBridgeBroker::with_channel();
        let broker_arc = Arc::new(broker);
        let broker_for_evt = broker_arc.clone();
        self.on_broker_event = Some(Arc::new(
            move |topic: String, event: Event, _sender: BrokerSender| {
                let b = broker_for_evt.clone();
                Box::pin(async move {
                    b.feed_event(topic, event).await;
                }) as BoxFuture<'static, ()>
            },
        ));
        self.outbound_drain = Some(drain_rx);
        (self, broker_arc)
    }
    /// Set the tools advertised in the `initialize` reply (replaces any
    /// previously declared list).
    pub fn declare_tools(mut self, defs: impl IntoIterator<Item = ToolDef>) -> Self {
        self.declared_tools = defs.into_iter().collect();
        self
    }
    /// Register the context-free `tool.invoke` handler.
    pub fn on_tool<H: ToolHandler>(mut self, handler: H) -> Self {
        self.tool_handler = Some(Arc::new(handler));
        self
    }
    /// Register the `tool.invoke` handler that also receives a [`ToolContext`];
    /// it takes precedence over `on_tool` when both are set.
    pub fn on_tool_with_context<H: ToolHandlerWithContext>(mut self, handler: H) -> Self {
        self.tool_handler_with_context = Some(Arc::new(handler));
        self
    }
    /// Register the `shutdown` handler.
    pub fn on_shutdown<H: ShutdownHandler>(mut self, handler: H) -> Self {
        self.on_shutdown = Some(Arc::new(handler));
        self
    }
    /// Register the `plugin.configure` handler.
    pub fn on_configure<H: ConfigureHandler>(mut self, handler: H) -> Self {
        self.on_configure = Some(Arc::new(handler));
        self
    }
    /// Register the `plugin.credentials.list` handler.
    pub fn on_credentials_list<H: CredentialsListHandler>(mut self, handler: H) -> Self {
        self.on_credentials_list = Some(Arc::new(handler));
        self
    }
    /// Register the `plugin.credentials.issue` handler.
    pub fn on_credentials_issue<H: CredentialsIssueHandler>(mut self, handler: H) -> Self {
        self.on_credentials_issue = Some(Arc::new(handler));
        self
    }
    /// Register the `plugin.credentials.resolve_bytes` handler.
    pub fn on_credentials_resolve_bytes<H: CredentialsResolveBytesHandler>(
        mut self,
        handler: H,
    ) -> Self {
        self.on_credentials_resolve_bytes = Some(Arc::new(handler));
        self
    }
    /// Register the `plugin.credentials.reload` handler.
    pub fn on_credentials_reload<H: CredentialsReloadHandler>(mut self, handler: H) -> Self {
        self.on_credentials_reload = Some(Arc::new(handler));
        self
    }
    /// Run the adapter over the process's stdin/stdout until EOF or `shutdown`.
    pub async fn run_stdio(self) -> SdkResult<()> {
        let stdin = io::stdin();
        let stdout = io::stdout();
        self.run(BufReader::new(stdin), stdout).await
    }
    /// Run the adapter over arbitrary transports (used by tests with duplex pipes).
    pub async fn run<R, W>(self, reader: R, writer: W) -> SdkResult<()>
    where
        R: AsyncBufRead + Unpin + Send + 'static,
        W: AsyncWrite + Unpin + Send + 'static,
    {
        // Box + lock the writer so handlers and the drain task can share it.
        let writer: Arc<Mutex<Box<dyn AsyncWrite + Send + Unpin>>> =
            Arc::new(Mutex::new(Box::new(writer)));
        dispatch_loop(reader, writer, self).await
    }
}
/// Core frame loop: reads newline-delimited JSON-RPC frames from `reader`,
/// routes responses to `pending` waiters, dispatches notifications and
/// requests to the adapter's registered handlers, and writes replies through
/// the shared `writer`. Returns after EOF on `reader` or after answering a
/// `shutdown` request.
async fn dispatch_loop<R>(
    reader: R,
    writer: Arc<Mutex<Box<dyn AsyncWrite + Send + Unpin>>>,
    mut adapter: PluginAdapter,
) -> SdkResult<()>
where
    R: AsyncBufRead + Unpin + Send + 'static,
{
    // Background task: forward frames queued by a StdioBridgeBroker (see
    // `with_stdio_bridge_broker`) to the host; exits on the first write failure.
    if let Some(mut drain) = adapter.outbound_drain.take() {
        let writer_for_drain = writer.clone();
        tokio::spawn(async move {
            while let Some(frame) = drain.recv().await {
                let line = match serde_json::to_string(&frame) {
                    Ok(s) => s,
                    Err(e) => {
                        tracing::warn!(error = %e, "outbound drain: serialize failed");
                        continue;
                    }
                };
                let mut w = writer_for_drain.lock().await;
                if let Err(e) = w.write_all(line.as_bytes()).await {
                    tracing::warn!(error = %e, "outbound drain: write failed (host closed?)");
                    return;
                }
                if let Err(e) = w.write_all(b"\n").await {
                    tracing::warn!(error = %e, "outbound drain: newline failed");
                    return;
                }
                if let Err(e) = w.flush().await {
                    tracing::warn!(error = %e, "outbound drain: flush failed");
                    return;
                }
            }
        });
    }
    let mut lines = reader.lines();
    // Serialized once up front; re-used for every `initialize` reply.
    let manifest_value = serde_json::to_value(&adapter.cached_manifest)
        .map_err(|e| SdkError::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?;
    // Requests *we* issue to the host (via BrokerSender) wait here, keyed by id.
    let pending: ChildPending = Arc::new(DashMap::new());
    let next_id: Arc<AtomicU64> = Arc::new(AtomicU64::new(100));
    while let Some(line) = lines.next_line().await? {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        let frame: Value = match serde_json::from_str(trimmed) {
            Ok(v) => v,
            Err(e) => {
                // JSON-RPC parse error (-32700) with a null id, per spec.
                write_error(&writer, None, -32700, &format!("parse error: {e}")).await?;
                continue;
            }
        };
        let id = frame.get("id").cloned();
        let method = frame.get("method").and_then(Value::as_str).unwrap_or("");
        let params = frame.get("params").cloned().unwrap_or(Value::Null);
        // An id with no method is a *response* to one of our own requests.
        if let Some(id_val) = id.as_ref() {
            if method.is_empty() {
                if let Some(req_id) = id_val.as_u64() {
                    if let Some((_, kind)) = pending.remove(&req_id) {
                        let err_obj = frame.get("error").cloned();
                        let result_val = frame.get("result").cloned().unwrap_or(Value::Null);
                        match kind {
                            PendingKind::Single(sender) => {
                                let payload = if let Some(err) = err_obj {
                                    let code =
                                        err.get("code").and_then(|v| v.as_i64()).unwrap_or(-32603)
                                            as i32;
                                    let message = err
                                        .get("message")
                                        .and_then(|v| v.as_str())
                                        .unwrap_or("(no message)")
                                        .to_string();
                                    Err(RpcError::Server { code, message })
                                } else {
                                    Ok(result_val)
                                };
                                // Receiver may be gone (timed out); ignore send failure.
                                let _ = sender.send(payload);
                            }
                            PendingKind::Streaming { final_tx, .. } => {
                                let payload = if let Some(err) = err_obj {
                                    let code =
                                        err.get("code").and_then(|v| v.as_i64()).unwrap_or(-32603)
                                            as i32;
                                    let message = err
                                        .get("message")
                                        .and_then(|v| v.as_str())
                                        .unwrap_or("(no message)")
                                        .to_string();
                                    Err(RpcError::Server { code, message })
                                } else {
                                    serde_json::from_value::<LlmCompleteResult>(result_val).map_err(
                                        |e| {
                                            RpcError::Decode(format!(
                                                "llm.complete stream final result: {e}"
                                            ))
                                        },
                                    )
                                };
                                let _ = final_tx.send(payload);
                            }
                        }
                        continue;
                    }
                    tracing::debug!(
                        id = req_id,
                        "rpc response with unknown id — drop (likely after timeout)"
                    );
                    continue;
                }
            }
        }
        // No id → a notification from the host; never gets a reply frame.
        if id.is_none() {
            if method == "broker.event" {
                let topic = params
                    .get("topic")
                    .and_then(Value::as_str)
                    .unwrap_or("")
                    .to_string();
                let event_val = params.get("event").cloned().unwrap_or(Value::Null);
                let event: Event = match serde_json::from_value(event_val) {
                    Ok(e) => e,
                    Err(e) => {
                        tracing::warn!(error = %e, topic, "broker.event: deserialize Event failed — drop");
                        continue;
                    }
                };
                if let Some(handler) = &adapter.on_broker_event {
                    let sender = BrokerSender {
                        writer: writer.clone(),
                        pending: pending.clone(),
                        next_id: next_id.clone(),
                    };
                    let handler_clone = handler.clone();
                    // Handlers run detached so a slow handler cannot stall the loop.
                    tokio::spawn(async move {
                        handler_clone.handle(topic, event, sender).await;
                    });
                }
            } else if method == "llm.complete.delta" {
                let req_id = params.get("request_id").and_then(|v| v.as_u64());
                let chunk = params.get("chunk").and_then(|v| v.as_str()).unwrap_or("");
                if let Some(req_id) = req_id {
                    if let Some(entry) = pending.get(&req_id) {
                        if let PendingKind::Streaming { delta_tx, .. } = entry.value() {
                            // Receiver may have been dropped; losing chunks is fine.
                            let _ = delta_tx.send(chunk.to_string());
                        } else {
                            tracing::debug!(
                                request_id = req_id,
                                "llm.complete.delta arrived for non-streaming pending — drop"
                            );
                        }
                    } else {
                        tracing::debug!(
                            request_id = req_id,
                            "llm.complete.delta with unknown request_id — drop"
                        );
                    }
                }
            } else {
                tracing::debug!(method, "unhandled notification — drop");
            }
            continue;
        }
        // id + method → a request from the host; every arm writes a reply.
        match method {
            "initialize" => {
                let mut result_obj = json!({
                    "manifest": manifest_value,
                    "server_version": adapter.server_version,
                });
                // `tools` is only included when the plugin declared any.
                if !adapter.declared_tools.is_empty() {
                    let tools_value =
                        serde_json::to_value(&adapter.declared_tools).map_err(|e| {
                            SdkError::Io(io::Error::new(
                                io::ErrorKind::InvalidData,
                                format!("declared_tools serialise failed: {e}"),
                            ))
                        })?;
                    if let Some(map) = result_obj.as_object_mut() {
                        map.insert("tools".to_string(), tools_value);
                    }
                }
                write_result(&writer, id, result_obj).await?;
            }
            "shutdown" => {
                if let Some(handler) = &adapter.on_shutdown {
                    match handler.handle().await {
                        Ok(()) => {
                            write_result(&writer, id, json!({"ok": true})).await?;
                        }
                        Err(e) => {
                            write_error(&writer, id, -32000, &e).await?;
                        }
                    }
                } else {
                    // No handler: acknowledge and exit anyway.
                    write_result(&writer, id, json!({"ok": true})).await?;
                }
                // Reply first, then leave the loop (and thus the plugin).
                break;
            }
            "plugin.configure" => {
                // Host sends JSON; handlers take YAML — transcode via Serialize.
                let yaml_value: serde_yaml::Value = params
                    .get("value")
                    .cloned()
                    .map(|v| serde_yaml::to_value(&v).unwrap_or(serde_yaml::Value::Null))
                    .unwrap_or(serde_yaml::Value::Null);
                if let Some(handler) = &adapter.on_configure {
                    match handler.handle(yaml_value).await {
                        Ok(()) => write_result(&writer, id, json!({})).await?,
                        Err(e) => write_error(&writer, id, -32603, &e).await?,
                    }
                } else {
                    // Configuration is optional: succeed as a no-op.
                    write_result(&writer, id, json!({})).await?;
                }
            }
            "plugin.credentials.list" => {
                if let Some(handler) = &adapter.on_credentials_list {
                    match handler.handle().await {
                        Ok(reply) => {
                            let value = serde_json::to_value(&reply).unwrap_or_else(|_| {
                                json!({ "accounts": [], "warnings": [] })
                            });
                            write_result(&writer, id, value).await?;
                        }
                        Err(e) => write_error(&writer, id, -32603, &e).await?,
                    }
                } else {
                    write_error(
                        &writer,
                        id,
                        -32601,
                        "method not found: plugin.credentials.list (no handler registered — call PluginAdapter::on_credentials_list)",
                    )
                    .await?;
                }
            }
            "plugin.credentials.issue" => {
                let account_id = params.get("account_id").and_then(|v| v.as_str());
                let agent_id = params.get("agent_id").and_then(|v| v.as_str());
                let (Some(account_id), Some(agent_id)) = (account_id, agent_id) else {
                    write_error(
                        &writer,
                        id,
                        -32602,
                        "invalid params: missing account_id or agent_id",
                    )
                    .await?;
                    continue;
                };
                if let Some(handler) = &adapter.on_credentials_issue {
                    match handler
                        .handle(account_id.to_string(), agent_id.to_string())
                        .await
                    {
                        Ok(()) => write_result(&writer, id, json!({ "ok": true })).await?,
                        Err(e) => write_error(&writer, id, -32603, &e).await?,
                    }
                } else {
                    write_error(
                        &writer,
                        id,
                        -32601,
                        "method not found: plugin.credentials.issue (no handler registered — call PluginAdapter::on_credentials_issue)",
                    )
                    .await?;
                }
            }
            "plugin.credentials.resolve_bytes" => {
                let account_id = params.get("account_id").and_then(|v| v.as_str());
                let agent_id = params.get("agent_id").and_then(|v| v.as_str());
                let fingerprint = params.get("fingerprint").and_then(|v| v.as_str());
                let (Some(account_id), Some(agent_id), Some(fingerprint)) =
                    (account_id, agent_id, fingerprint)
                else {
                    write_error(
                        &writer,
                        id,
                        -32602,
                        "invalid params: missing account_id, agent_id, or fingerprint",
                    )
                    .await?;
                    continue;
                };
                if let Some(handler) = &adapter.on_credentials_resolve_bytes {
                    match handler
                        .handle(
                            account_id.to_string(),
                            agent_id.to_string(),
                            fingerprint.to_string(),
                        )
                        .await
                    {
                        Ok(bytes) => {
                            // Raw bytes are not JSON-safe; ship them base64-encoded.
                            use base64::Engine as _;
                            let b64 =
                                base64::engine::general_purpose::STANDARD.encode(&bytes);
                            write_result(&writer, id, json!({ "bytes_b64": b64 })).await?;
                        }
                        Err(e) => write_error(&writer, id, -32603, &e).await?,
                    }
                } else {
                    write_error(
                        &writer,
                        id,
                        -32601,
                        "method not found: plugin.credentials.resolve_bytes (no handler registered — call PluginAdapter::on_credentials_resolve_bytes)",
                    )
                    .await?;
                }
            }
            "plugin.credentials.reload" => {
                if let Some(handler) = &adapter.on_credentials_reload {
                    match handler.handle().await {
                        Ok(()) => write_result(&writer, id, json!({ "ok": true })).await?,
                        Err(e) => write_error(&writer, id, -32603, &e).await?,
                    }
                } else {
                    write_error(
                        &writer,
                        id,
                        -32601,
                        "method not found: plugin.credentials.reload (no handler registered — call PluginAdapter::on_credentials_reload)",
                    )
                    .await?;
                }
            }
            "tool.invoke" => {
                let with_ctx = adapter.tool_handler_with_context.clone();
                let plain = adapter.tool_handler.clone();
                if with_ctx.is_none() && plain.is_none() {
                    write_error(
                        &writer,
                        id,
                        -32601,
                        "method not found: tool.invoke (no handler registered — call PluginAdapter::on_tool or on_tool_with_context)",
                    )
                    .await?;
                    continue;
                }
                let params = frame.get("params").cloned().unwrap_or(Value::Null);
                let invocation: ToolInvocation = match serde_json::from_value(params) {
                    Ok(inv) => inv,
                    Err(e) => {
                        write_error(
                            &writer,
                            id,
                            -32602,
                            &format!("tool.invoke: invalid params: {e}"),
                        )
                        .await?;
                        continue;
                    }
                };
                // Context-aware handler wins when both are registered.
                let result = if let Some(handler) = with_ctx {
                    let ctx = ToolContext {
                        broker: BrokerSender {
                            writer: writer.clone(),
                            pending: pending.clone(),
                            next_id: next_id.clone(),
                        },
                        plugin_id: adapter.cached_manifest.plugin.id.clone(),
                    };
                    handler.call(invocation, ctx).await
                } else {
                    plain.expect("checked above").call(invocation).await
                };
                match result {
                    Ok(value) => {
                        write_result(&writer, id, value).await?;
                    }
                    Err(err) => {
                        // Map tool errors onto their dedicated -334xx codes.
                        let code = err.code();
                        write_error(&writer, id, code, &err.to_string()).await?;
                    }
                }
            }
            other => {
                write_error(&writer, id, -32601, &format!("method not found: {other}")).await?;
            }
        }
    }
    Ok(())
}
async fn write_result(
writer: &Arc<Mutex<Box<dyn AsyncWrite + Send + Unpin>>>,
id: Option<Value>,
result: Value,
) -> SdkResult<()> {
let frame = json!({
"jsonrpc": "2.0",
"id": id.unwrap_or(Value::Null),
"result": result,
});
write_line(writer, &frame).await
}
async fn write_error(
writer: &Arc<Mutex<Box<dyn AsyncWrite + Send + Unpin>>>,
id: Option<Value>,
code: i32,
message: &str,
) -> SdkResult<()> {
let frame = json!({
"jsonrpc": "2.0",
"id": id.unwrap_or(Value::Null),
"error": { "code": code, "message": message },
});
write_line(writer, &frame).await
}
/// Serialize `frame` and emit it as one newline-terminated line, then flush.
async fn write_line(
    writer: &Arc<Mutex<Box<dyn AsyncWrite + Send + Unpin>>>,
    frame: &Value,
) -> SdkResult<()> {
    let mut payload = serde_json::to_string(frame)
        .map_err(|e| SdkError::Io(io::Error::new(io::ErrorKind::Other, e.to_string())))?;
    payload.push('\n');
    let mut guard = writer.lock().await;
    guard.write_all(payload.as_bytes()).await?;
    guard.flush().await?;
    Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use tokio::io::{duplex, BufReader as TokioBufReader};
const TEST_MANIFEST: &str = r#"
[plugin]
id = "test_plugin"
version = "0.1.0"
name = "test"
description = "fixture"
min_nexo_version = ">=0.1.0"
"#;
#[test]
fn tool_def_serde_round_trip() {
let def = ToolDef {
name: "test_plugin_echo".into(),
description: "Echo the args back.".into(),
input_schema: serde_json::json!({
"type": "object",
"properties": { "msg": { "type": "string" } },
"required": ["msg"],
}),
};
let s = serde_json::to_string(&def).unwrap();
assert!(s.contains("\"name\":\"test_plugin_echo\""));
assert!(s.contains("\"description\":\"Echo the args back.\""));
assert!(s.contains("\"input_schema\""));
let back: ToolDef = serde_json::from_str(&s).unwrap();
assert_eq!(back.name, def.name);
assert_eq!(back.description, def.description);
assert_eq!(back.input_schema, def.input_schema);
}
#[test]
fn tool_invocation_args_default_to_null() {
let raw = r#"{ "plugin_id": "p", "tool_name": "t" }"#;
let inv: ToolInvocation = serde_json::from_str(raw).unwrap();
assert_eq!(inv.plugin_id, "p");
assert_eq!(inv.tool_name, "t");
assert_eq!(inv.args, serde_json::Value::Null);
assert!(inv.agent_id.is_none());
}
#[test]
fn tool_invocation_full_shape_round_trip() {
let raw = r#"{
"plugin_id": "browser",
"tool_name": "browser_navigate",
"args": { "url": "about:blank" },
"agent_id": "ana"
}"#;
let inv: ToolInvocation = serde_json::from_str(raw).unwrap();
assert_eq!(inv.tool_name, "browser_navigate");
assert_eq!(inv.args["url"], "about:blank");
assert_eq!(inv.agent_id.as_deref(), Some("ana"));
}
#[test]
fn tool_invocation_error_codes_match_contract_v1_10_band() {
assert_eq!(ToolInvocationError::NotFound("x".into()).code(), -33401);
assert_eq!(
ToolInvocationError::ArgumentInvalid("x".into()).code(),
-33402
);
assert_eq!(
ToolInvocationError::ExecutionFailed("x".into()).code(),
-33403
);
assert_eq!(ToolInvocationError::Unavailable("x".into()).code(), -33404);
assert_eq!(ToolInvocationError::Denied("x".into()).code(), -33405);
}
#[test]
fn tool_invocation_error_messages_format_with_payload() {
let e = ToolInvocationError::NotFound("browser_thirteenth".into());
assert_eq!(e.to_string(), "tool not found: browser_thirteenth");
let e = ToolInvocationError::ExecutionFailed("CDP 500".into());
assert_eq!(e.to_string(), "execution failed: CDP 500");
}
#[tokio::test]
async fn tool_handler_blanket_impl_accepts_closure() {
let handler = |inv: ToolInvocation| async move {
match inv.tool_name.as_str() {
"echo" => Ok(inv.args),
other => Err(ToolInvocationError::NotFound(other.into())),
}
};
let inv = ToolInvocation {
plugin_id: "p".into(),
tool_name: "echo".into(),
args: serde_json::json!({"hello": "world"}),
agent_id: None,
};
let out = ToolHandler::call(&handler, inv).await.unwrap();
assert_eq!(out, serde_json::json!({"hello": "world"}));
}
#[tokio::test]
async fn tool_handler_blanket_impl_propagates_error_variant() {
let handler = |_inv: ToolInvocation| async move {
Err::<serde_json::Value, _>(ToolInvocationError::Denied("nope".into()))
};
let inv = ToolInvocation {
plugin_id: "p".into(),
tool_name: "x".into(),
args: serde_json::Value::Null,
agent_id: None,
};
let err = ToolHandler::call(&handler, inv).await.unwrap_err();
assert_eq!(err.code(), -33405);
}
async fn run_adapter_on_duplex(
adapter: PluginAdapter,
) -> (
tokio::io::WriteHalf<tokio::io::DuplexStream>,
TokioBufReader<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
tokio::task::JoinHandle<SdkResult<()>>,
) {
let (host_writer_end, plugin_reader_end) = duplex(8192);
let (plugin_writer_end, host_reader_end) = duplex(8192);
let plugin_reader = TokioBufReader::new(plugin_reader_end);
let plugin_writer = plugin_writer_end;
let join = tokio::spawn(adapter.run(plugin_reader, plugin_writer));
let (_unused_read, host_write) = tokio::io::split(host_writer_end);
let (host_read, _unused_write) = tokio::io::split(host_reader_end);
(host_write, TokioBufReader::new(host_read), join)
}
async fn read_reply_line(
reader: &mut TokioBufReader<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
) -> Value {
let mut buf = String::new();
reader.read_line(&mut buf).await.expect("read reply line");
serde_json::from_str(buf.trim()).expect("reply parses as JSON")
}
#[tokio::test]
async fn initialize_replies_with_cached_manifest() {
let adapter = PluginAdapter::new(TEST_MANIFEST).expect("manifest parses");
let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
host_write
.write_all(b"{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"initialize\",\"params\":{}}\n")
.await
.unwrap();
let reply = read_reply_line(&mut host_read).await;
assert_eq!(reply["jsonrpc"], "2.0");
assert_eq!(reply["id"], 1);
assert_eq!(reply["result"]["manifest"]["plugin"]["id"], "test_plugin");
assert_eq!(reply["result"]["server_version"], "test_plugin-0.1.0");
}
#[tokio::test]
async fn initialize_reply_omits_tools_when_none_declared() {
    // Without `declare_tools`, the initialize result must not carry a
    // `tools` key at all (not even an empty array).
    let adapter = PluginAdapter::new(TEST_MANIFEST).expect("manifest parses");
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let frame = b"{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"initialize\",\"params\":{}}\n";
    host_write.write_all(frame).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    let result = &reply["result"];
    assert!(
        result.get("tools").is_none(),
        "expected no `tools` field; got: {}",
        result
    );
}
#[tokio::test]
async fn initialize_reply_includes_declared_tools_array() {
    // Tools registered via `declare_tools` must appear, in declaration
    // order, in the initialize result's `tools` array.
    let echo_tool = ToolDef {
        name: "test_plugin_echo".into(),
        description: "Echo args.".into(),
        input_schema: serde_json::json!({"type":"object"}),
    };
    let ping_tool = ToolDef {
        name: "test_plugin_ping".into(),
        description: "Ping/pong.".into(),
        input_schema: serde_json::json!({"type":"object"}),
    };
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .declare_tools(vec![echo_tool, ping_tool]);
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let frame = b"{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"initialize\",\"params\":{}}\n";
    host_write.write_all(frame).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    let tools = reply["result"]["tools"].as_array().expect("tools array");
    assert_eq!(tools.len(), 2);
    assert_eq!(tools[0]["name"], "test_plugin_echo");
    assert_eq!(tools[1]["name"], "test_plugin_ping");
}
#[tokio::test]
async fn tool_invoke_routes_to_registered_handler() {
    // A plain `on_tool` handler receives the invocation; its Ok value is
    // relayed verbatim as the JSON-RPC result.
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_tool(|inv: ToolInvocation| async move {
            Ok(serde_json::json!({"echoed": inv.args}))
        });
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let frame = concat!(
        r#"{"jsonrpc":"2.0","id":7,"method":"tool.invoke","#,
        r#""params":{"plugin_id":"test_plugin","tool_name":"echo","args":{"x":1}}}"#,
        "\n",
    );
    host_write.write_all(frame.as_bytes()).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert_eq!(reply["id"], 7);
    assert_eq!(reply["result"]["echoed"]["x"], 1);
}
#[tokio::test]
async fn tool_invoke_handler_error_maps_to_minus_33401() {
    // `ToolInvocationError::NotFound` must surface as JSON-RPC code -33401
    // with the offending tool name embedded in the error message.
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_tool(|inv: ToolInvocation| async move {
            Err::<serde_json::Value, _>(ToolInvocationError::NotFound(inv.tool_name))
        });
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let frame = concat!(
        r#"{"jsonrpc":"2.0","id":8,"method":"tool.invoke","#,
        r#""params":{"plugin_id":"test_plugin","tool_name":"unknown"}}"#,
        "\n",
    );
    host_write.write_all(frame.as_bytes()).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert_eq!(reply["id"], 8);
    assert_eq!(reply["error"]["code"], -33401);
    let message = reply["error"]["message"].as_str().unwrap();
    assert!(message.contains("unknown"));
}
#[tokio::test]
async fn tool_invoke_without_handler_returns_method_not_found() {
    // With no tool handler registered, `tool.invoke` is treated as an
    // unknown method (-32601) rather than an invocation failure.
    let adapter = PluginAdapter::new(TEST_MANIFEST).expect("manifest parses");
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let frame = concat!(
        r#"{"jsonrpc":"2.0","id":9,"method":"tool.invoke","#,
        r#""params":{"plugin_id":"test_plugin","tool_name":"x"}}"#,
        "\n",
    );
    host_write.write_all(frame.as_bytes()).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert_eq!(reply["id"], 9);
    assert_eq!(reply["error"]["code"], -32601);
}
#[tokio::test]
async fn tool_invoke_with_context_handler_receives_broker_and_plugin_id() {
    // Context-aware handlers receive a `ToolContext` carrying the plugin id
    // alongside the invocation itself.
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_tool_with_context(|inv: ToolInvocation, ctx: ToolContext| async move {
            Ok(serde_json::json!({
                "echoed_plugin_id": ctx.plugin_id,
                "tool_name": inv.tool_name,
            }))
        });
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let frame = concat!(
        r#"{"jsonrpc":"2.0","id":10,"method":"tool.invoke","#,
        r#""params":{"plugin_id":"test_plugin","tool_name":"ping"}}"#,
        "\n",
    );
    host_write.write_all(frame.as_bytes()).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert_eq!(reply["id"], 10);
    assert_eq!(reply["result"]["echoed_plugin_id"], "test_plugin");
    assert_eq!(reply["result"]["tool_name"], "ping");
}
#[tokio::test]
async fn tool_invoke_with_context_takes_precedence_over_plain() {
    // When both handler kinds are registered, the context-aware one wins.
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_tool(|_inv: ToolInvocation| async move { Ok(serde_json::json!({"path": "plain"})) })
        .on_tool_with_context(|_inv, _ctx| async move {
            Ok(serde_json::json!({"path": "with_context"}))
        });
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let frame = concat!(
        r#"{"jsonrpc":"2.0","id":11,"method":"tool.invoke","#,
        r#""params":{"plugin_id":"test_plugin","tool_name":"x"}}"#,
        "\n",
    );
    host_write.write_all(frame.as_bytes()).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert_eq!(reply["result"]["path"], "with_context");
}
#[tokio::test]
async fn broker_event_dispatches_to_user_handler() {
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
let adapter = PluginAdapter::new(TEST_MANIFEST)
.expect("manifest parses")
.on_broker_event(move |topic: String, event: Event, _broker: BrokerSender| {
let called = called_clone.clone();
async move {
assert_eq!(topic, "plugin.outbound.test");
assert_eq!(event.source, "host");
called.store(true, Ordering::SeqCst);
}
});
let (mut host_write, _host_read, _join) = run_adapter_on_duplex(adapter).await;
let frame = serde_json::json!({
"jsonrpc": "2.0",
"method": "broker.event",
"params": {
"topic": "plugin.outbound.test",
"event": {
"id": "00000000-0000-0000-0000-000000000010",
"timestamp": "2026-05-01T00:00:00Z",
"topic": "plugin.outbound.test",
"source": "host",
"session_id": null,
"payload": {"hello": "world"},
}
}
});
let line = format!("{}\n", serde_json::to_string(&frame).unwrap());
host_write.write_all(line.as_bytes()).await.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert!(called.load(Ordering::SeqCst), "handler must be invoked");
}
#[tokio::test]
async fn broker_sender_writes_publish_notification() {
    // `BrokerSender::publish` must emit a `broker.publish` notification
    // (i.e. a frame with NO `id`) carrying the topic and event payload.
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_broker_event(
            |_topic: String, event: Event, broker: BrokerSender| async move {
                let echo = Event::new(
                    "plugin.inbound.test",
                    "plugin",
                    serde_json::json!({"echo": event.payload}),
                );
                broker
                    .publish("plugin.inbound.test", echo)
                    .await
                    .expect("publish ok");
            },
        );
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let inbound = serde_json::json!({
        "jsonrpc": "2.0",
        "method": "broker.event",
        "params": {
            "topic": "plugin.outbound.test",
            "event": {
                "id": "00000000-0000-0000-0000-000000000010",
                "timestamp": "2026-05-01T00:00:00Z",
                "topic": "plugin.outbound.test",
                "source": "host",
                "session_id": null,
                "payload": {"foo": "bar"},
            }
        }
    });
    let mut line = serde_json::to_string(&inbound).unwrap();
    line.push('\n');
    host_write.write_all(line.as_bytes()).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert!(reply.get("id").is_none(), "publish must have NO id");
    assert_eq!(reply["method"], "broker.publish");
    assert_eq!(reply["params"]["topic"], "plugin.inbound.test");
    assert_eq!(reply["params"]["event"]["payload"]["echo"]["foo"], "bar");
}
#[tokio::test]
async fn shutdown_invokes_handler_and_breaks_loop() {
    // `shutdown` must (a) run the registered handler exactly once,
    // (b) ack with `{"ok": true}`, and (c) terminate the dispatch loop.
    let calls = Arc::new(AtomicU32::new(0));
    let calls_clone = calls.clone();
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_shutdown(move || {
            let calls = calls_clone.clone();
            async move {
                calls.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        });
    let (mut host_write, mut host_read, join) = run_adapter_on_duplex(adapter).await;
    host_write
        .write_all(b"{\"jsonrpc\":\"2.0\",\"id\":7,\"method\":\"shutdown\",\"params\":{}}\n")
        .await
        .unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert_eq!(reply["id"], 7);
    assert_eq!(reply["result"]["ok"], true);
    // Previously only the timeout was checked (`res.is_ok()`), which let a
    // panicking adapter task pass unnoticed: unwrap the JoinHandle result
    // too so a task panic fails the test.
    let joined = tokio::time::timeout(std::time::Duration::from_millis(500), join)
        .await
        .expect("dispatch loop must exit promptly after shutdown");
    joined.expect("adapter task must not panic");
    assert_eq!(calls.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn unknown_method_returns_neg_32601() {
    // An unrecognized method must produce JSON-RPC "method not found"
    // (-32601) and name the offending method in the message.
    let adapter = PluginAdapter::new(TEST_MANIFEST).expect("manifest parses");
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let frame = b"{\"jsonrpc\":\"2.0\",\"id\":5,\"method\":\"bogus\",\"params\":{}}\n";
    host_write.write_all(frame).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert_eq!(reply["id"], 5);
    assert_eq!(reply["error"]["code"], -32601);
    let message = reply["error"]["message"].as_str().unwrap();
    assert!(message.contains("bogus"));
}
#[tokio::test]
async fn parse_error_returns_neg_32700() {
    // A line that is not valid JSON must yield a parse error (-32700).
    let adapter = PluginAdapter::new(TEST_MANIFEST).expect("manifest parses");
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    host_write.write_all(b"not-json\n").await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert_eq!(reply["error"]["code"], -32700);
}
#[tokio::test]
async fn request_helper_round_trips_via_dispatch_loop() {
    use std::sync::atomic::{AtomicBool, Ordering};
    // Drive `BrokerSender::request` end-to-end: the handler issues a child
    // request, the host answers it by id, and the handler observes the
    // result. Child request ids start at 100 to avoid colliding with hosts.
    let observed = Arc::new(AtomicBool::new(false));
    let observed_clone = observed.clone();
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_broker_event(move |_topic, _event, broker: BrokerSender| {
            let observed = observed_clone.clone();
            async move {
                let result = broker
                    .request(
                        "test.echo",
                        serde_json::json!({"x": 1}),
                        Some(Duration::from_secs(1)),
                    )
                    .await;
                match result {
                    Ok(v) => {
                        assert_eq!(v["echoed"], 1);
                        observed.store(true, Ordering::SeqCst);
                    }
                    Err(e) => {
                        panic!("request must succeed, got {e}")
                    }
                }
            }
        });
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let trigger = serde_json::json!({
        "jsonrpc": "2.0",
        "method": "broker.event",
        "params": {
            "topic": "plugin.outbound.test",
            "event": {
                "id": "00000000-0000-0000-0000-000000000010",
                "timestamp": "2026-05-01T00:00:00Z",
                "topic": "plugin.outbound.test",
                "source": "host",
                "session_id": null,
                "payload": {}
            }
        }
    });
    host_write
        .write_all(format!("{}\n", trigger).as_bytes())
        .await
        .unwrap();
    let request_frame = read_reply_line(&mut host_read).await;
    assert_eq!(request_frame["method"], "test.echo");
    let req_id = request_frame["id"].as_u64().expect("id is u64");
    assert!(req_id >= 100, "child ids start at 100, got {req_id}");
    assert_eq!(request_frame["params"]["x"], 1);
    let response = serde_json::json!({
        "jsonrpc": "2.0",
        "id": req_id,
        "result": {"echoed": request_frame["params"]["x"]},
    });
    host_write
        .write_all(format!("{}\n", response).as_bytes())
        .await
        .unwrap();
    // Poll with a deadline instead of a single fixed 150ms sleep: completes
    // as soon as the handler runs and is far less flaky under a loaded
    // scheduler.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
    while !observed.load(Ordering::SeqCst) && tokio::time::Instant::now() < deadline {
        tokio::time::sleep(Duration::from_millis(5)).await;
    }
    assert!(
        observed.load(Ordering::SeqCst),
        "handler must observe the response"
    );
}
#[tokio::test]
async fn request_helper_propagates_server_error() {
    use std::sync::atomic::{AtomicBool, Ordering};
    // When the host answers a child request with a JSON-RPC error object,
    // `request` must surface it as `RpcError::Server` with code + message.
    let observed = Arc::new(AtomicBool::new(false));
    let observed_clone = observed.clone();
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_broker_event(move |_t, _e, broker: BrokerSender| {
            let observed = observed_clone.clone();
            async move {
                let result = broker
                    .request(
                        "memory.recall",
                        serde_json::json!({"agent_id": "x", "query": "x"}),
                        Some(Duration::from_secs(1)),
                    )
                    .await;
                match result {
                    Err(RpcError::Server { code, message }) => {
                        assert_eq!(code, -32603);
                        assert!(message.contains("not configured"));
                        observed.store(true, Ordering::SeqCst);
                    }
                    other => panic!("expected RpcError::Server, got {other:?}"),
                }
            }
        });
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let trigger = serde_json::json!({
        "jsonrpc": "2.0",
        "method": "broker.event",
        "params": {
            "topic": "plugin.outbound.test",
            "event": {
                "id": "00000000-0000-0000-0000-000000000010",
                "timestamp": "2026-05-01T00:00:00Z",
                "topic": "plugin.outbound.test",
                "source": "host",
                "session_id": null,
                "payload": {}
            }
        }
    });
    host_write
        .write_all(format!("{}\n", trigger).as_bytes())
        .await
        .unwrap();
    let req = read_reply_line(&mut host_read).await;
    let err_resp = serde_json::json!({
        "jsonrpc": "2.0",
        "id": req["id"],
        "error": { "code": -32603, "message": "memory not configured" }
    });
    host_write
        .write_all(format!("{}\n", err_resp).as_bytes())
        .await
        .unwrap();
    // Poll with a deadline instead of a single fixed 150ms sleep; this is
    // faster when the handler is quick and robust when the runner is slow.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
    while !observed.load(Ordering::SeqCst) && tokio::time::Instant::now() < deadline {
        tokio::time::sleep(Duration::from_millis(5)).await;
    }
    assert!(
        observed.load(Ordering::SeqCst),
        "handler must observe RpcError::Server"
    );
}
#[tokio::test]
async fn request_helper_times_out_when_host_silent() {
    use std::sync::atomic::{AtomicBool, Ordering};
    // If the host never answers the child request, `request` must resolve
    // to `RpcError::Timeout` once the caller-supplied deadline elapses.
    let observed = Arc::new(AtomicBool::new(false));
    let observed_clone = observed.clone();
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_broker_event(move |_t, _e, broker: BrokerSender| {
            let observed = observed_clone.clone();
            async move {
                let result = broker
                    .request(
                        "test.silent",
                        serde_json::json!({}),
                        Some(Duration::from_millis(150)),
                    )
                    .await;
                match result {
                    Err(RpcError::Timeout(_)) => {
                        observed.store(true, Ordering::SeqCst);
                    }
                    other => panic!("expected Timeout, got {other:?}"),
                }
            }
        });
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let trigger = serde_json::json!({
        "jsonrpc": "2.0",
        "method": "broker.event",
        "params": {
            "topic": "plugin.outbound.test",
            "event": {
                "id": "00000000-0000-0000-0000-000000000010",
                "timestamp": "2026-05-01T00:00:00Z",
                "topic": "plugin.outbound.test",
                "source": "host",
                "session_id": null,
                "payload": {}
            }
        }
    });
    host_write
        .write_all(format!("{}\n", trigger).as_bytes())
        .await
        .unwrap();
    let _req = read_reply_line(&mut host_read).await;
    // Deliberately send no response. Poll (rather than sleeping a fixed
    // 400ms) until the 150ms request timeout has fired and been observed.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
    while !observed.load(Ordering::SeqCst) && tokio::time::Instant::now() < deadline {
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    assert!(
        observed.load(Ordering::SeqCst),
        "handler must observe Timeout"
    );
}
#[tokio::test]
async fn recall_memory_typed_wrapper_round_trips() {
    use std::sync::atomic::{AtomicBool, Ordering};
    // `recall_memory` must issue a `memory.recall` request carrying the
    // typed parameters and deserialize the host's `entries` array into
    // `MemoryEntry` values.
    let observed = Arc::new(AtomicBool::new(false));
    let observed_clone = observed.clone();
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_broker_event(move |_t, _e, broker: BrokerSender| {
            let observed = observed_clone.clone();
            async move {
                let result = broker.recall_memory("agent_x", "preference", 5).await;
                match result {
                    Ok(entries) => {
                        assert_eq!(entries.len(), 1);
                        assert_eq!(entries[0].agent_id, "agent_x");
                        observed.store(true, Ordering::SeqCst);
                    }
                    Err(e) => panic!("recall_memory must succeed, got {e}"),
                }
            }
        });
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let trigger = serde_json::json!({
        "jsonrpc": "2.0",
        "method": "broker.event",
        "params": {
            "topic": "plugin.outbound.test",
            "event": {
                "id": "00000000-0000-0000-0000-000000000010",
                "timestamp": "2026-05-01T00:00:00Z",
                "topic": "plugin.outbound.test",
                "source": "host",
                "session_id": null,
                "payload": {}
            }
        }
    });
    host_write
        .write_all(format!("{}\n", trigger).as_bytes())
        .await
        .unwrap();
    let req = read_reply_line(&mut host_read).await;
    assert_eq!(req["method"], "memory.recall");
    assert_eq!(req["params"]["agent_id"], "agent_x");
    assert_eq!(req["params"]["query"], "preference");
    assert_eq!(req["params"]["limit"], 5);
    let response = serde_json::json!({
        "jsonrpc": "2.0",
        "id": req["id"],
        "result": {
            "entries": [{
                "id": "00000000-0000-0000-0000-000000000001",
                "agent_id": "agent_x",
                "content": "user prefers concise",
                "tags": ["preference"],
                "concept_tags": [],
                "created_at": "2026-05-01T00:00:00Z",
                "memory_type": null
            }]
        }
    });
    host_write
        .write_all(format!("{}\n", response).as_bytes())
        .await
        .unwrap();
    // Poll with a deadline instead of a single fixed 150ms sleep so the
    // test is quick when the handler is fast and tolerant when it is slow.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
    while !observed.load(Ordering::SeqCst) && tokio::time::Instant::now() < deadline {
        tokio::time::sleep(Duration::from_millis(5)).await;
    }
    assert!(
        observed.load(Ordering::SeqCst),
        "recall_memory wrapper must deserialize entries"
    );
}
#[tokio::test]
async fn complete_llm_stream_yields_chunks_and_final_result() {
    use std::sync::atomic::{AtomicBool, Ordering};
    // `complete_llm_stream` must open a streaming `llm.complete` request,
    // surface each `llm.complete.delta` chunk via `next_chunk`, then
    // resolve the final result (finish_reason + usage) via `await_final`
    // once the host answers the request id.
    let observed = Arc::new(AtomicBool::new(false));
    let observed_clone = observed.clone();
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_broker_event(move |_t, _e, broker: BrokerSender| {
            let observed = observed_clone.clone();
            async move {
                let params = LlmCompleteParams {
                    provider: "stub".into(),
                    model: "x".into(),
                    messages: vec![],
                    ..Default::default()
                };
                let mut stream = match broker.complete_llm_stream(params).await {
                    Ok(s) => s,
                    Err(e) => panic!("complete_llm_stream open failed: {e}"),
                };
                let mut assembled = String::new();
                while let Some(chunk) = stream.next_chunk().await {
                    assembled.push_str(&chunk);
                }
                let result = match stream.await_final().await {
                    Ok(r) => r,
                    Err(e) => panic!("await_final failed: {e}"),
                };
                assert_eq!(assembled, "hello world");
                assert_eq!(result.finish_reason, "stop");
                assert_eq!(result.usage.completion_tokens, 5);
                observed.store(true, Ordering::SeqCst);
            }
        });
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let trigger = serde_json::json!({
        "jsonrpc": "2.0",
        "method": "broker.event",
        "params": {
            "topic": "plugin.outbound.test",
            "event": {
                "id": "00000000-0000-0000-0000-000000000010",
                "timestamp": "2026-05-01T00:00:00Z",
                "topic": "plugin.outbound.test",
                "source": "host",
                "session_id": null,
                "payload": {}
            }
        }
    });
    host_write
        .write_all(format!("{}\n", trigger).as_bytes())
        .await
        .unwrap();
    let req = read_reply_line(&mut host_read).await;
    assert_eq!(req["method"], "llm.complete");
    assert_eq!(req["params"]["stream"], true);
    let req_id = req["id"].clone();
    // Feed three deltas tied to the request id, then the final response.
    for chunk in ["hello", " ", "world"] {
        let delta = serde_json::json!({
            "jsonrpc": "2.0",
            "method": "llm.complete.delta",
            "params": { "request_id": req_id, "chunk": chunk }
        });
        host_write
            .write_all(format!("{}\n", delta).as_bytes())
            .await
            .unwrap();
    }
    let final_resp = serde_json::json!({
        "jsonrpc": "2.0",
        "id": req_id,
        "result": {
            "content": "",
            "finish_reason": "stop",
            "usage": { "prompt_tokens": 3, "completion_tokens": 5 }
        }
    });
    host_write
        .write_all(format!("{}\n", final_resp).as_bytes())
        .await
        .unwrap();
    // Poll with a deadline rather than sleeping a fixed 200ms, so the test
    // finishes as soon as the handler completes and tolerates slow CI.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
    while !observed.load(Ordering::SeqCst) && tokio::time::Instant::now() < deadline {
        tokio::time::sleep(Duration::from_millis(5)).await;
    }
    assert!(
        observed.load(Ordering::SeqCst),
        "handler must reassemble chunks + observe final result"
    );
}
#[tokio::test]
async fn configure_handler_dispatch_returns_ok() {
    use std::sync::Mutex;
    // `plugin.configure` must hand the raw YAML value to the handler and
    // acknowledge with an empty result object.
    let observed: Arc<Mutex<Option<serde_yaml::Value>>> = Arc::new(Mutex::new(None));
    let observed_h = observed.clone();
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_configure(move |value: serde_yaml::Value| {
            let slot = observed_h.clone();
            async move {
                *slot.lock().unwrap() = Some(value);
                Ok(())
            }
        });
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let frame = concat!(
        r#"{"jsonrpc":"2.0","id":3,"method":"plugin.configure","#,
        r#""params":{"value":{"token":"abc"}}}"#,
        "\n",
    );
    host_write.write_all(frame.as_bytes()).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert_eq!(reply["id"], 3);
    assert_eq!(
        reply["result"],
        serde_json::json!({}),
        "configure ack must be empty result object",
    );
    let captured = observed.lock().unwrap().clone();
    let value = captured.expect("handler observed value");
    let mapping = value.as_mapping().expect("value is mapping");
    let token = mapping
        .get(serde_yaml::Value::String("token".into()))
        .and_then(|v| v.as_str());
    assert_eq!(token, Some("abc"));
}
#[tokio::test]
async fn credentials_list_dispatch_returns_accounts_and_warnings() {
    // `plugin.credentials.list` must serialize both the accounts and the
    // warnings from the handler's `CredentialsListReply`.
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_credentials_list(|| async {
            Ok(CredentialsListReply {
                accounts: vec!["main".into(), "secondary".into()],
                warnings: vec!["w1".into()],
            })
        });
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let frame =
        b"{\"jsonrpc\":\"2.0\",\"id\":4,\"method\":\"plugin.credentials.list\",\"params\":{}}\n";
    host_write.write_all(frame).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert_eq!(reply["id"], 4);
    let accounts = reply["result"]["accounts"]
        .as_array()
        .expect("accounts array");
    let account_names: Vec<_> = accounts.iter().filter_map(|v| v.as_str()).collect();
    assert_eq!(account_names, vec!["main", "secondary"]);
    let warnings = reply["result"]["warnings"]
        .as_array()
        .expect("warnings array");
    let warning_texts: Vec<_> = warnings.iter().filter_map(|v| v.as_str()).collect();
    assert_eq!(warning_texts, vec!["w1"]);
}
#[tokio::test]
async fn credentials_issue_dispatch_routes_to_handler() {
    use std::sync::Mutex;
    // `plugin.credentials.issue` must hand (account_id, agent_id) to the
    // handler and acknowledge with `{"ok": true}`.
    let observed: Arc<Mutex<Option<(String, String)>>> = Arc::new(Mutex::new(None));
    let observed_h = observed.clone();
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_credentials_issue(move |account_id, agent_id| {
            let slot = observed_h.clone();
            async move {
                *slot.lock().unwrap() = Some((account_id, agent_id));
                Ok(())
            }
        });
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let frame = concat!(
        r#"{"jsonrpc":"2.0","id":5,"method":"plugin.credentials.issue","#,
        r#""params":{"account_id":"main","agent_id":"alice"}}"#,
        "\n",
    );
    host_write.write_all(frame.as_bytes()).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert_eq!(reply["id"], 5);
    assert_eq!(reply["result"]["ok"], serde_json::Value::Bool(true));
    let captured = observed.lock().unwrap().clone();
    assert_eq!(captured, Some(("main".to_string(), "alice".to_string())));
}
#[tokio::test]
async fn credentials_resolve_bytes_dispatch_returns_base64() {
    // Raw bytes returned by the handler must come back base64-encoded in
    // the `bytes_b64` field of the result.
    let adapter = PluginAdapter::new(TEST_MANIFEST)
        .expect("manifest parses")
        .on_credentials_resolve_bytes(|_acc, _ag, _fp| async move { Ok(vec![1u8, 2, 3, 4]) });
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let frame = concat!(
        r#"{"jsonrpc":"2.0","id":6,"method":"plugin.credentials.resolve_bytes","#,
        r#""params":{"account_id":"main","agent_id":"alice","fingerprint":"abc"}}"#,
        "\n",
    );
    host_write.write_all(frame.as_bytes()).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert_eq!(reply["id"], 6);
    assert_eq!(
        reply["result"]["bytes_b64"].as_str(),
        Some("AQIDBA=="),
        "base64(vec![1,2,3,4]) == AQIDBA==",
    );
}
#[tokio::test]
async fn credentials_method_without_handler_returns_method_not_found() {
    // Credential methods with no registered handler fall through to the
    // generic "method not found" (-32601) error path.
    let adapter = PluginAdapter::new(TEST_MANIFEST).expect("manifest parses");
    let (mut host_write, mut host_read, _join) = run_adapter_on_duplex(adapter).await;
    let frame =
        b"{\"jsonrpc\":\"2.0\",\"id\":7,\"method\":\"plugin.credentials.list\",\"params\":{}}\n";
    host_write.write_all(frame).await.unwrap();
    let reply = read_reply_line(&mut host_read).await;
    assert_eq!(reply["id"], 7);
    assert_eq!(reply["error"]["code"], -32601);
    let message = reply["error"]["message"].as_str().unwrap_or_default();
    assert!(
        message.starts_with("method not found"),
        "expected method-not-found error, got {}",
        reply["error"]["message"],
    );
}
}