Skip to main content

sacp_tokio/
lib.rs

1//! Tokio-based utilities for SACP
2//!
3//! This crate provides higher-level functionality for working with SACP
4//! that requires the Tokio async runtime, such as spawning agent processes
5//! and creating connections.
6
7mod acp_agent;
8
9pub use acp_agent::{AcpAgent, LineDirection};
10use sacp::{ByteStreams, ConnectTo, Role};
11use std::sync::Arc;
12use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
13
14pub struct Stdio {
15    debug_callback: Option<Arc<dyn Fn(&str, LineDirection) + Send + Sync + 'static>>,
16}
17
18impl Stdio {
19    pub fn new() -> Self {
20        Self {
21            debug_callback: None,
22        }
23    }
24
25    pub fn with_debug<F>(mut self, callback: F) -> Self
26    where
27        F: Fn(&str, LineDirection) + Send + Sync + 'static,
28    {
29        self.debug_callback = Some(Arc::new(callback));
30        self
31    }
32}
33
34impl Default for Stdio {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl<Counterpart: Role> ConnectTo<Counterpart> for Stdio {
41    async fn connect_to(
42        self,
43        client: impl ConnectTo<Counterpart::Counterpart>,
44    ) -> Result<(), sacp::Error> {
45        if let Some(callback) = self.debug_callback {
46            use futures::AsyncBufReadExt;
47            use futures::AsyncWriteExt;
48            use futures::StreamExt;
49            use futures::io::BufReader;
50
51            // With debug: use Lines with interception
52            let stdin = tokio::io::stdin();
53            let stdout = tokio::io::stdout();
54
55            // Convert stdio to line streams with debug inspection
56            let incoming_callback = callback.clone();
57            let incoming_lines = Box::pin(BufReader::new(stdin.compat()).lines().inspect(
58                move |result| {
59                    if let Ok(line) = result {
60                        incoming_callback(line, LineDirection::Stdin);
61                    }
62                },
63            ))
64                as std::pin::Pin<Box<dyn futures::Stream<Item = std::io::Result<String>> + Send>>;
65
66            // Create a sink that writes lines with debug logging
67            let outgoing_sink = Box::pin(futures::sink::unfold(
68                (stdout.compat_write(), callback),
69                async move |(mut writer, callback), line: String| {
70                    callback(&line, LineDirection::Stdout);
71                    let mut bytes = line.into_bytes();
72                    bytes.push(b'\n');
73                    writer.write_all(&bytes).await?;
74                    Ok::<_, std::io::Error>((writer, callback))
75                },
76            ))
77                as std::pin::Pin<Box<dyn futures::Sink<String, Error = std::io::Error> + Send>>;
78
79            ConnectTo::<Counterpart>::connect_to(
80                sacp::Lines::new(outgoing_sink, incoming_lines),
81                client,
82            )
83            .await
84        } else {
85            // Without debug: use simple ByteStreams
86            ConnectTo::<Counterpart>::connect_to(
87                ByteStreams::new(
88                    tokio::io::stdout().compat_write(),
89                    tokio::io::stdin().compat(),
90                ),
91                client,
92            )
93            .await
94        }
95    }
96}