Skip to main content

sacp_tee/
lib.rs

1//! sacp-tee - A debugging proxy that logs all ACP traffic
2//!
3//! This proxy sits transparently between two ACP endpoints, passing messages through
4//! while logging them to a file for debugging purposes.
5
6use anyhow::Result;
7use sacp::ProxyToConductor;
8use sacp::component::Component;
9use sacp::{Handled, JrMessageHandler, MessageCx};
10use serde::{Deserialize, Serialize};
11use std::path::PathBuf;
12use tokio::sync::mpsc;
13
14/// A JSON-RPC message representation for logging
15#[derive(Debug, Clone, Serialize, Deserialize)]
16#[serde(untagged)]
17pub enum JsonRpcMessage {
18    Request {
19        id: serde_json::Value,
20        #[serde(flatten)]
21        message: sacp::UntypedMessage,
22    },
23    Notification {
24        #[serde(flatten)]
25        message: sacp::UntypedMessage,
26    },
27    Reply {
28        id: serde_json::Value,
29        result: serde_json::Value,
30    },
31}
32
33/// A log entry representing a message passing through the proxy
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct LogEntry {
36    pub timestamp: String,
37    pub direction: String,
38    pub message: JsonRpcMessage,
39}
40
41impl LogEntry {
42    fn new(direction: &str, message: JsonRpcMessage) -> Self {
43        Self {
44            timestamp: chrono::Utc::now().to_rfc3339(),
45            direction: direction.to_string(),
46            message,
47        }
48    }
49}
50
51/// Log writer actor that receives log entries and writes them to disk
52pub struct LogWriter {
53    log_file: PathBuf,
54    receiver: mpsc::UnboundedReceiver<LogEntry>,
55}
56
57impl LogWriter {
58    pub fn new(log_file: PathBuf) -> (Self, mpsc::UnboundedSender<LogEntry>) {
59        let (tx, rx) = mpsc::unbounded_channel();
60        (
61            Self {
62                log_file,
63                receiver: rx,
64            },
65            tx,
66        )
67    }
68
69    /// Run the log writer actor
70    pub async fn run(mut self) -> Result<()> {
71        use tokio::io::AsyncWriteExt;
72
73        let file = tokio::fs::OpenOptions::new()
74            .create(true)
75            .append(true)
76            .open(&self.log_file)
77            .await?;
78        let mut writer = tokio::io::BufWriter::new(file);
79
80        while let Some(entry) = self.receiver.recv().await {
81            let json = serde_json::to_string(&entry)?;
82            writer.write_all(json.as_bytes()).await?;
83            writer.write_all(b"\n").await?;
84            writer.flush().await?;
85        }
86
87        Ok(())
88    }
89}
90
91/// Handler that logs messages passing through
92pub struct TeeHandler {
93    log_tx: mpsc::UnboundedSender<LogEntry>,
94    next_id: u64,
95}
96
97impl TeeHandler {
98    pub fn new(log_tx: mpsc::UnboundedSender<LogEntry>) -> Self {
99        Self { log_tx, next_id: 1 }
100    }
101
102    fn log_entry(&self, entry: LogEntry) {
103        // Fire and forget - if the channel is closed, we just drop the log
104        let _ = self.log_tx.send(entry);
105    }
106
107    fn allocate_id(&mut self) -> u64 {
108        let id = self.next_id;
109        self.next_id += 1;
110        id
111    }
112}
113
114impl JrMessageHandler for TeeHandler {
115    type Link = ProxyToConductor;
116
117    fn describe_chain(&self) -> impl std::fmt::Debug {
118        "tee"
119    }
120
121    async fn handle_message(
122        &mut self,
123        message: MessageCx,
124        _cx: sacp::JrConnectionCx<ProxyToConductor>,
125    ) -> Result<Handled<MessageCx>, sacp::Error> {
126        match message {
127            MessageCx::Request(request, request_cx) => {
128                // Allocate a synthetic ID for tracking this request/response pair
129                let synthetic_id = self.allocate_id();
130
131                // Log the outgoing request
132                let json_msg = JsonRpcMessage::Request {
133                    id: serde_json::json!(synthetic_id),
134                    message: request.clone(),
135                };
136                let entry = LogEntry::new("downstream", json_msg);
137                self.log_entry(entry);
138
139                // Wrap the request context to log the response when it comes back
140                let log_tx = self.log_tx.clone();
141
142                let wrapped_cx = request_cx.wrap_params(move |_method, result| {
143                    // Log the response
144                    let result_value = match &result {
145                        Ok(value) => serde_json::to_value(value).unwrap_or(serde_json::Value::Null),
146                        Err(e) => serde_json::json!({ "error": e.to_string() }),
147                    };
148
149                    let json_msg = JsonRpcMessage::Reply {
150                        id: serde_json::json!(synthetic_id),
151                        result: result_value,
152                    };
153                    let entry = LogEntry::new("upstream", json_msg);
154
155                    let _ = log_tx.send(entry);
156
157                    result
158                });
159
160                // Return unhandled with the wrapped context
161                Ok(Handled::No {
162                    message: MessageCx::Request(request, wrapped_cx),
163                    retry: false,
164                })
165            }
166            MessageCx::Notification(notification) => {
167                // Log the notification
168                let json_msg = JsonRpcMessage::Notification {
169                    message: notification.clone(),
170                };
171                let entry = LogEntry::new("downstream", json_msg);
172                self.log_entry(entry);
173
174                // Return unhandled so it continues down the chain
175                Ok(Handled::No {
176                    message: MessageCx::Notification(notification),
177                    retry: false,
178                })
179            }
180        }
181    }
182}
183
/// The Tee component: usable as one stage of a larger proxy chain, or on its own as
/// a standalone binary.
pub struct Tee {
    /// File that observed traffic is appended to.
    log_file: PathBuf,
}
189
190impl Tee {
191    pub fn new(log_file: PathBuf) -> Self {
192        Self { log_file }
193    }
194}
195
196impl Component<sacp::link::ProxyToConductor> for Tee {
197    async fn serve(
198        self,
199        client: impl Component<sacp::link::ConductorToProxy>,
200    ) -> Result<(), sacp::Error> {
201        // Create the log writer actor
202        let (log_writer, log_tx) = LogWriter::new(self.log_file.clone());
203
204        // Spawn the log writer
205        tokio::spawn(async move {
206            if let Err(e) = log_writer.run().await {
207                tracing::error!("Log writer failed: {}", e);
208            }
209        });
210
211        // Create the connection
212        ProxyToConductor::builder()
213            .name("sacp-tee")
214            .with_handler(TeeHandler::new(log_tx))
215            .connect_to(client)?
216            .serve()
217            .await
218    }
219}
220
221/// Run the tee in raw mode - just log lines without parsing
222pub async fn run_raw(log_file: PathBuf, downstream: sacp_tokio::AcpAgent) -> Result<()> {
223    use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
224
225    // Initialize tracing
226    tracing_subscriber::fmt()
227        .with_writer(std::io::stderr)
228        .with_env_filter(
229            tracing_subscriber::EnvFilter::try_from_default_env()
230                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
231        )
232        .init();
233
234    tracing::info!(
235        "Starting sacp-tee in raw mode, logging to: {}",
236        log_file.display()
237    );
238
239    // Open log file
240    let log_file = tokio::fs::OpenOptions::new()
241        .create(true)
242        .append(true)
243        .open(&log_file)
244        .await?;
245    let mut log_writer = tokio::io::BufWriter::new(log_file);
246
247    // Spawn the downstream process
248    let (mut child_stdin, child_stdout, _child_stderr, _child) = downstream.spawn_process()?;
249    let mut child_stdout = BufReader::new(child_stdout).lines();
250
251    // Get stdin/stdout
252    let stdin = tokio::io::stdin();
253    let mut stdin_lines = BufReader::new(stdin).lines();
254    let mut stdout = tokio::io::stdout();
255
256    loop {
257        tokio::select! {
258            // Read from stdin, write to child, log outgoing
259            line = stdin_lines.next_line() => {
260                match line? {
261                    Some(line) => {
262                        // Write to child
263                        child_stdin.write_all(line.as_bytes()).await?;
264                        child_stdin.write_all(b"\n").await?;
265                        child_stdin.flush().await?;
266
267                        // Log outgoing
268                        log_writer.write_all(b"\xE2\x86\x92 ").await?; // → arrow
269                        log_writer.write_all(line.as_bytes()).await?;
270                        log_writer.write_all(b"\n").await?;
271                        log_writer.flush().await?;
272                    }
273                    None => {
274                        // stdin closed
275                        break;
276                    }
277                }
278            }
279
280            // Read from child, write to stdout, log incoming
281            line = child_stdout.next_line() => {
282                match line? {
283                    Some(line) => {
284                        // Write to stdout
285                        stdout.write_all(line.as_bytes()).await?;
286                        stdout.write_all(b"\n").await?;
287                        stdout.flush().await?;
288
289                        // Log incoming
290                        log_writer.write_all(b"\xE2\x86\x90 ").await?; // ← arrow
291                        log_writer.write_all(line.as_bytes()).await?;
292                        log_writer.write_all(b"\n").await?;
293                        log_writer.flush().await?;
294                    }
295                    None => {
296                        // child stdout closed
297                        break;
298                    }
299                }
300            }
301        }
302    }
303
304    Ok(())
305}
306
307/// Run the tee proxy as a standalone binary connected to stdio
308pub async fn run(log_file: PathBuf) -> Result<()> {
309    // Initialize tracing
310    tracing_subscriber::fmt()
311        .with_writer(std::io::stderr)
312        .with_env_filter(
313            tracing_subscriber::EnvFilter::try_from_default_env()
314                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
315        )
316        .init();
317
318    tracing::info!("Starting sacp-tee, logging to: {}", log_file.display());
319
320    Tee::new(log_file).serve(sacp_tokio::Stdio::new()).await?;
321
322    Ok(())
323}