Skip to main content

agent_client_protocol/
stdio.rs

//! Stdio transport for connecting ACP components via standard input/output.
2
3use crate::acp_agent::LineDirection;
4use crate::{ByteStreams, ConnectTo, Role};
5use std::sync::Arc;
6
7/// A transport that connects to an ACP peer via standard input/output.
8///
9/// This is useful for building agents or proxies that communicate over stdio,
10/// which is the standard transport for MCP and ACP subprocess communication.
11pub struct Stdio {
12    debug_callback: Option<Arc<dyn Fn(&str, LineDirection) + Send + Sync + 'static>>,
13}
14
15impl std::fmt::Debug for Stdio {
16    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
17        f.debug_struct("Stdio").finish_non_exhaustive()
18    }
19}
20
21impl Stdio {
22    /// Create a new `Stdio` transport.
23    #[must_use]
24    pub fn new() -> Self {
25        Self {
26            debug_callback: None,
27        }
28    }
29
30    /// Add a debug callback that will be invoked for each line sent/received.
31    #[must_use]
32    pub fn with_debug<F>(mut self, callback: F) -> Self
33    where
34        F: Fn(&str, LineDirection) + Send + Sync + 'static,
35    {
36        self.debug_callback = Some(Arc::new(callback));
37        self
38    }
39}
40
41impl Default for Stdio {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47impl<Counterpart: Role> ConnectTo<Counterpart> for Stdio {
48    async fn connect_to(
49        self,
50        client: impl ConnectTo<Counterpart::Counterpart>,
51    ) -> Result<(), crate::Error> {
52        let stdin = blocking::Unblock::new(std::io::stdin());
53        let stdout = blocking::Unblock::new(std::io::stdout());
54
55        if let Some(callback) = self.debug_callback {
56            use futures::io::BufReader;
57            use futures::{AsyncBufReadExt, AsyncWriteExt, StreamExt};
58
59            let incoming_callback = callback.clone();
60            let incoming_lines = Box::pin(BufReader::new(stdin).lines().inspect(move |result| {
61                if let Ok(line) = result {
62                    incoming_callback(line, LineDirection::Stdin);
63                }
64            }))
65                as std::pin::Pin<Box<dyn futures::Stream<Item = std::io::Result<String>> + Send>>;
66
67            let outgoing_sink = Box::pin(futures::sink::unfold(
68                (stdout, callback),
69                async move |(mut writer, callback), line: String| {
70                    callback(&line, LineDirection::Stdout);
71                    let mut bytes = line.into_bytes();
72                    bytes.push(b'\n');
73                    writer.write_all(&bytes).await?;
74                    Ok::<_, std::io::Error>((writer, callback))
75                },
76            ))
77                as std::pin::Pin<Box<dyn futures::Sink<String, Error = std::io::Error> + Send>>;
78
79            ConnectTo::<Counterpart>::connect_to(
80                crate::Lines::new(outgoing_sink, incoming_lines),
81                client,
82            )
83            .await
84        } else {
85            ConnectTo::<Counterpart>::connect_to(ByteStreams::new(stdout, stdin), client).await
86        }
87    }
88}