use std::sync::Arc;
use async_trait::async_trait;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::Mutex;
use super::{AdminError, AdminSender};
pub struct WriterAdminSender<W>
where
W: AsyncWrite + Unpin + Send + 'static,
{
writer: Arc<Mutex<W>>,
}
// Opaque `Debug` output: only the type name is printed, followed by `..`.
// The writer field is deliberately left out; `W` carries no `Debug` bound here.
impl<W: AsyncWrite + Unpin + Send + 'static> std::fmt::Debug for WriterAdminSender<W> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("WriterAdminSender");
        repr.finish_non_exhaustive()
    }
}
impl<W> WriterAdminSender<W>
where
W: AsyncWrite + Unpin + Send + 'static,
{
pub fn new(writer: Arc<Mutex<W>>) -> Self {
Self { writer }
}
}
#[async_trait]
impl<W> AdminSender for WriterAdminSender<W>
where
W: AsyncWrite + Unpin + Send + 'static,
{
async fn send_line(&self, line: String) -> Result<(), AdminError> {
let mut buf = line;
buf.push('\n');
let mut guard = self.writer.lock().await;
guard
.write_all(buf.as_bytes())
.await
.map_err(|e| AdminError::Transport(e.to_string()))?;
guard
.flush()
.await
.map_err(|e| AdminError::Transport(e.to_string()))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Microapp, ToolCtx, ToolError, ToolReply};
use serde_json::Value;
use tokio::io::DuplexStream;
/// A single `send_line` call must deliver the payload plus a trailing newline.
#[tokio::test]
async fn writer_admin_sender_round_trips_one_line() {
    use tokio::io::AsyncReadExt;

    let (tx, mut rx) = tokio::io::duplex(1024);
    let sender = WriterAdminSender::new(Arc::new(Mutex::new(tx)));

    sender.send_line(String::from("hello")).await.unwrap();

    // The send has already completed, so the line is read back with one call.
    let mut received = [0u8; 16];
    let len = rx.read(&mut received).await.unwrap();
    assert_eq!(&received[..len], b"hello\n");
}
/// End-to-end check of an admin call issued from inside a tool handler:
/// the client sends `tools/call`, the app emits an admin request frame, the
/// client answers it, and the app then replies to the original tool call.
#[tokio::test]
async fn admin_round_trip_via_runtime() {
    use tokio::io::AsyncWriteExt;

    // Tool handler that forwards to the admin channel and returns its result.
    async fn echo_via_admin(_args: Value, ctx: ToolCtx) -> Result<ToolReply, ToolError> {
        let admin = ctx.admin().expect("admin wired");
        let v: Value = admin
            .call_raw("nexo/admin/echo", serde_json::json!({"x": 1}))
            .await
            .map_err(|e| ToolError::Internal(e.to_string()))?;
        Ok(ToolReply::ok_json(v))
    }

    // Reads exactly one newline-terminated JSON frame from the app's output.
    //
    // Going through a buffered reader keeps frame boundaries intact even when
    // a frame arrives split across several reads or when two frames arrive
    // in the same read.
    async fn read_frame<R>(reader: &mut R) -> Value
    where
        R: tokio::io::AsyncBufRead + Unpin,
    {
        use tokio::io::AsyncBufReadExt;

        let mut line = String::new();
        let n = reader.read_line(&mut line).await.unwrap();
        assert!(n > 0, "app closed its output before sending a frame");
        serde_json::from_str(line.trim_end()).unwrap()
    }

    let app = Microapp::new("test", "0.0.0")
        .with_admin()
        .with_tool("call_admin", echo_via_admin);

    // Two in-memory pipes stand in for the app's stdin and stdout.
    let (client_to_app, app_stdin) = tokio::io::duplex(1024);
    let (app_stdout, client_from_app) = tokio::io::duplex(1024);
    let runner = tokio::spawn(async move {
        app.run_with(tokio::io::BufReader::new(app_stdin), app_stdout)
            .await
    });
    let mut stdin = client_to_app;
    let mut stdout = tokio::io::BufReader::new(client_from_app);

    // Step 1: invoke the tool.
    let req = serde_json::json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": { "name": "call_admin", "arguments": {} },
    });
    let line = format!("{req}\n");
    stdin.write_all(line.as_bytes()).await.unwrap();
    stdin.flush().await.unwrap();

    // Step 2: the app should emit the admin request issued by the tool.
    let admin_frame = read_frame(&mut stdout).await;
    assert_eq!(admin_frame["method"], "nexo/admin/echo");
    let admin_id = admin_frame["id"].as_str().unwrap().to_string();
    assert!(admin_id.starts_with("app:"));

    // Step 3: answer the admin request using the id the app generated.
    let admin_resp = serde_json::json!({
        "jsonrpc": "2.0",
        "id": admin_id,
        "result": { "echoed": { "x": 1 } },
    });
    let line = format!("{admin_resp}\n");
    stdin.write_all(line.as_bytes()).await.unwrap();
    stdin.flush().await.unwrap();

    // Step 4: the reply to the original tool call carries the request id.
    let tool_reply = read_frame(&mut stdout).await;
    assert_eq!(tool_reply["id"], serde_json::json!(1));

    // Tear down: close both pipes, then stop the app task.
    drop(stdin);
    drop(stdout);
    runner.abort();
    let _ = runner.await;
}
/// Two senders sharing one writer must each emit an intact line, in either
/// order.
#[tokio::test]
async fn writer_admin_sender_serialises_concurrent_lines() {
    use tokio::io::AsyncReadExt;

    let (tx, mut rx) = tokio::io::duplex(1024);
    let shared: Arc<Mutex<DuplexStream>> = Arc::new(Mutex::new(tx));

    // Two independent senders over the same stream, each driven from its own task.
    let first = WriterAdminSender::new(shared.clone());
    let second = WriterAdminSender::new(shared.clone());
    let task_a = tokio::spawn(async move { first.send_line(String::from("AAAA")).await });
    let task_b = tokio::spawn(async move { second.send_line(String::from("BBBB")).await });
    task_a.await.unwrap().unwrap();
    task_b.await.unwrap().unwrap();

    // Both sends have returned successfully, so the output is collected with
    // a single read of up to 32 bytes.
    let mut raw = [0u8; 32];
    let len = rx.read(&mut raw).await.unwrap();
    let text = std::str::from_utf8(&raw[..len]).unwrap();

    // Order between the two tasks is unspecified; compare the sorted lines.
    let mut seen: Vec<&str> = text.lines().collect();
    seen.sort();
    assert_eq!(seen, vec!["AAAA", "BBBB"]);
}
}