1mod acp_agent;
8
9pub use acp_agent::{AcpAgent, LineDirection};
10use sacp::{ByteStreams, ConnectTo, Role};
11use std::sync::Arc;
12use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
13
14pub struct Stdio {
15 debug_callback: Option<Arc<dyn Fn(&str, LineDirection) + Send + Sync + 'static>>,
16}
17
18impl Stdio {
19 pub fn new() -> Self {
20 Self {
21 debug_callback: None,
22 }
23 }
24
25 pub fn with_debug<F>(mut self, callback: F) -> Self
26 where
27 F: Fn(&str, LineDirection) + Send + Sync + 'static,
28 {
29 self.debug_callback = Some(Arc::new(callback));
30 self
31 }
32}
33
34impl Default for Stdio {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl<Counterpart: Role> ConnectTo<Counterpart> for Stdio {
41 async fn connect_to(
42 self,
43 client: impl ConnectTo<Counterpart::Counterpart>,
44 ) -> Result<(), sacp::Error> {
45 if let Some(callback) = self.debug_callback {
46 use futures::AsyncBufReadExt;
47 use futures::AsyncWriteExt;
48 use futures::StreamExt;
49 use futures::io::BufReader;
50
51 let stdin = tokio::io::stdin();
53 let stdout = tokio::io::stdout();
54
55 let incoming_callback = callback.clone();
57 let incoming_lines = Box::pin(BufReader::new(stdin.compat()).lines().inspect(
58 move |result| {
59 if let Ok(line) = result {
60 incoming_callback(line, LineDirection::Stdin);
61 }
62 },
63 ))
64 as std::pin::Pin<Box<dyn futures::Stream<Item = std::io::Result<String>> + Send>>;
65
66 let outgoing_sink = Box::pin(futures::sink::unfold(
68 (stdout.compat_write(), callback),
69 async move |(mut writer, callback), line: String| {
70 callback(&line, LineDirection::Stdout);
71 let mut bytes = line.into_bytes();
72 bytes.push(b'\n');
73 writer.write_all(&bytes).await?;
74 Ok::<_, std::io::Error>((writer, callback))
75 },
76 ))
77 as std::pin::Pin<Box<dyn futures::Sink<String, Error = std::io::Error> + Send>>;
78
79 ConnectTo::<Counterpart>::connect_to(
80 sacp::Lines::new(outgoing_sink, incoming_lines),
81 client,
82 )
83 .await
84 } else {
85 ConnectTo::<Counterpart>::connect_to(
87 ByteStreams::new(
88 tokio::io::stdout().compat_write(),
89 tokio::io::stdin().compat(),
90 ),
91 client,
92 )
93 .await
94 }
95 }
96}