1use anyhow::Result;
7use sacp::ProxyToConductor;
8use sacp::component::Component;
9use sacp::{Handled, JrMessageHandler, MessageCx};
10use serde::{Deserialize, Serialize};
11use std::path::PathBuf;
12use tokio::sync::mpsc;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16#[serde(untagged)]
17pub enum JsonRpcMessage {
18 Request {
19 id: serde_json::Value,
20 #[serde(flatten)]
21 message: sacp::UntypedMessage,
22 },
23 Notification {
24 #[serde(flatten)]
25 message: sacp::UntypedMessage,
26 },
27 Reply {
28 id: serde_json::Value,
29 result: serde_json::Value,
30 },
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct LogEntry {
36 pub timestamp: String,
37 pub direction: String,
38 pub message: JsonRpcMessage,
39}
40
41impl LogEntry {
42 fn new(direction: &str, message: JsonRpcMessage) -> Self {
43 Self {
44 timestamp: chrono::Utc::now().to_rfc3339(),
45 direction: direction.to_string(),
46 message,
47 }
48 }
49}
50
51pub struct LogWriter {
53 log_file: PathBuf,
54 receiver: mpsc::UnboundedReceiver<LogEntry>,
55}
56
57impl LogWriter {
58 pub fn new(log_file: PathBuf) -> (Self, mpsc::UnboundedSender<LogEntry>) {
59 let (tx, rx) = mpsc::unbounded_channel();
60 (
61 Self {
62 log_file,
63 receiver: rx,
64 },
65 tx,
66 )
67 }
68
69 pub async fn run(mut self) -> Result<()> {
71 use tokio::io::AsyncWriteExt;
72
73 let file = tokio::fs::OpenOptions::new()
74 .create(true)
75 .append(true)
76 .open(&self.log_file)
77 .await?;
78 let mut writer = tokio::io::BufWriter::new(file);
79
80 while let Some(entry) = self.receiver.recv().await {
81 let json = serde_json::to_string(&entry)?;
82 writer.write_all(json.as_bytes()).await?;
83 writer.write_all(b"\n").await?;
84 writer.flush().await?;
85 }
86
87 Ok(())
88 }
89}
90
91pub struct TeeHandler {
93 log_tx: mpsc::UnboundedSender<LogEntry>,
94 next_id: u64,
95}
96
97impl TeeHandler {
98 pub fn new(log_tx: mpsc::UnboundedSender<LogEntry>) -> Self {
99 Self { log_tx, next_id: 1 }
100 }
101
102 fn log_entry(&self, entry: LogEntry) {
103 let _ = self.log_tx.send(entry);
105 }
106
107 fn allocate_id(&mut self) -> u64 {
108 let id = self.next_id;
109 self.next_id += 1;
110 id
111 }
112}
113
114impl JrMessageHandler for TeeHandler {
115 type Link = ProxyToConductor;
116
117 fn describe_chain(&self) -> impl std::fmt::Debug {
118 "tee"
119 }
120
121 async fn handle_message(
122 &mut self,
123 message: MessageCx,
124 _cx: sacp::JrConnectionCx<ProxyToConductor>,
125 ) -> Result<Handled<MessageCx>, sacp::Error> {
126 match message {
127 MessageCx::Request(request, request_cx) => {
128 let synthetic_id = self.allocate_id();
130
131 let json_msg = JsonRpcMessage::Request {
133 id: serde_json::json!(synthetic_id),
134 message: request.clone(),
135 };
136 let entry = LogEntry::new("downstream", json_msg);
137 self.log_entry(entry);
138
139 let log_tx = self.log_tx.clone();
141
142 let wrapped_cx = request_cx.wrap_params(move |_method, result| {
143 let result_value = match &result {
145 Ok(value) => serde_json::to_value(value).unwrap_or(serde_json::Value::Null),
146 Err(e) => serde_json::json!({ "error": e.to_string() }),
147 };
148
149 let json_msg = JsonRpcMessage::Reply {
150 id: serde_json::json!(synthetic_id),
151 result: result_value,
152 };
153 let entry = LogEntry::new("upstream", json_msg);
154
155 let _ = log_tx.send(entry);
156
157 result
158 });
159
160 Ok(Handled::No {
162 message: MessageCx::Request(request, wrapped_cx),
163 retry: false,
164 })
165 }
166 MessageCx::Notification(notification) => {
167 let json_msg = JsonRpcMessage::Notification {
169 message: notification.clone(),
170 };
171 let entry = LogEntry::new("downstream", json_msg);
172 self.log_entry(entry);
173
174 Ok(Handled::No {
176 message: MessageCx::Notification(notification),
177 retry: false,
178 })
179 }
180 }
181 }
182}
183
/// Component that logs the traffic passing through it to a file.
pub struct Tee {
    /// Path of the file the log is written to.
    log_file: PathBuf,
}

impl Tee {
    /// Creates a tee that will log to `log_file`.
    ///
    /// The file is not touched here; it is only opened once the tee is served.
    pub fn new(log_file: PathBuf) -> Self {
        Tee { log_file }
    }
}
195
196impl Component<sacp::link::ProxyToConductor> for Tee {
197 async fn serve(
198 self,
199 client: impl Component<sacp::link::ConductorToProxy>,
200 ) -> Result<(), sacp::Error> {
201 let (log_writer, log_tx) = LogWriter::new(self.log_file.clone());
203
204 tokio::spawn(async move {
206 if let Err(e) = log_writer.run().await {
207 tracing::error!("Log writer failed: {}", e);
208 }
209 });
210
211 ProxyToConductor::builder()
213 .name("sacp-tee")
214 .with_handler(TeeHandler::new(log_tx))
215 .connect_to(client)?
216 .serve()
217 .await
218 }
219}
220
221pub async fn run_raw(log_file: PathBuf, downstream: sacp_tokio::AcpAgent) -> Result<()> {
223 use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
224
225 tracing_subscriber::fmt()
227 .with_writer(std::io::stderr)
228 .with_env_filter(
229 tracing_subscriber::EnvFilter::try_from_default_env()
230 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
231 )
232 .init();
233
234 tracing::info!(
235 "Starting sacp-tee in raw mode, logging to: {}",
236 log_file.display()
237 );
238
239 let log_file = tokio::fs::OpenOptions::new()
241 .create(true)
242 .append(true)
243 .open(&log_file)
244 .await?;
245 let mut log_writer = tokio::io::BufWriter::new(log_file);
246
247 let (mut child_stdin, child_stdout, _child_stderr, _child) = downstream.spawn_process()?;
249 let mut child_stdout = BufReader::new(child_stdout).lines();
250
251 let stdin = tokio::io::stdin();
253 let mut stdin_lines = BufReader::new(stdin).lines();
254 let mut stdout = tokio::io::stdout();
255
256 loop {
257 tokio::select! {
258 line = stdin_lines.next_line() => {
260 match line? {
261 Some(line) => {
262 child_stdin.write_all(line.as_bytes()).await?;
264 child_stdin.write_all(b"\n").await?;
265 child_stdin.flush().await?;
266
267 log_writer.write_all(b"\xE2\x86\x92 ").await?; log_writer.write_all(line.as_bytes()).await?;
270 log_writer.write_all(b"\n").await?;
271 log_writer.flush().await?;
272 }
273 None => {
274 break;
276 }
277 }
278 }
279
280 line = child_stdout.next_line() => {
282 match line? {
283 Some(line) => {
284 stdout.write_all(line.as_bytes()).await?;
286 stdout.write_all(b"\n").await?;
287 stdout.flush().await?;
288
289 log_writer.write_all(b"\xE2\x86\x90 ").await?; log_writer.write_all(line.as_bytes()).await?;
292 log_writer.write_all(b"\n").await?;
293 log_writer.flush().await?;
294 }
295 None => {
296 break;
298 }
299 }
300 }
301 }
302 }
303
304 Ok(())
305}
306
307pub async fn run(log_file: PathBuf) -> Result<()> {
309 tracing_subscriber::fmt()
311 .with_writer(std::io::stderr)
312 .with_env_filter(
313 tracing_subscriber::EnvFilter::try_from_default_env()
314 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
315 )
316 .init();
317
318 tracing::info!("Starting sacp-tee, logging to: {}", log_file.display());
319
320 Tee::new(log_file).serve(sacp_tokio::Stdio::new()).await?;
321
322 Ok(())
323}