use anyhow::Result;
use sacp::ProxyToConductor;
use sacp::component::Component;
use sacp::{Handled, JrMessageHandler, MessageCx};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use tokio::sync::mpsc;
/// A JSON-RPC message in the shape that is written to the log.
///
/// The enum is `#[serde(untagged)]`: no variant name appears in the serialized
/// form, so a value is distinguished only by which fields are present. When
/// deserializing, serde tries the variants in declaration order, so the order
/// of the variants below is significant.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum JsonRpcMessage {
/// A request: an `id` plus the fields of the wrapped message, which are
/// flattened into the same JSON object.
Request {
id: serde_json::Value,
#[serde(flatten)]
message: sacp::UntypedMessage,
},
/// A notification: only the flattened fields of the wrapped message (no `id`).
Notification {
#[serde(flatten)]
message: sacp::UntypedMessage,
},
/// A reply to an earlier request, matched to it by `id`.
///
/// `result` is an arbitrary JSON value; `TeeHandler` in this file stores
/// `{ "error": "<message>" }` here when the reply was an error.
Reply {
id: serde_json::Value,
result: serde_json::Value,
},
}
/// One record of the log: a message together with when it was seen and which
/// way it was travelling.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Creation time of the entry, formatted as an RFC 3339 string (UTC clock).
    pub timestamp: String,
    /// Direction label supplied by the caller of [`LogEntry::new`].
    pub direction: String,
    /// The message being recorded.
    pub message: JsonRpcMessage,
}

impl LogEntry {
    /// Builds an entry for `message`, stamping it with the current UTC time and
    /// storing an owned copy of `direction`.
    fn new(direction: &str, message: JsonRpcMessage) -> Self {
        let timestamp = chrono::Utc::now().to_rfc3339();
        let direction = String::from(direction);
        Self {
            timestamp,
            direction,
            message,
        }
    }
}
/// Consumer side of the logging channel: receives [`LogEntry`] values and
/// appends them to a file, one JSON document per line.
pub struct LogWriter {
    /// Path of the file that entries are appended to.
    log_file: PathBuf,
    /// Receiving half of the channel created in [`LogWriter::new`].
    receiver: mpsc::UnboundedReceiver<LogEntry>,
}

impl LogWriter {
    /// Creates a writer for `log_file` and returns it together with the sender
    /// half of a fresh unbounded channel feeding it.
    ///
    /// Nothing is opened or written until [`LogWriter::run`] is awaited.
    pub fn new(log_file: PathBuf) -> (Self, mpsc::UnboundedSender<LogEntry>) {
        let (sender, receiver) = mpsc::unbounded_channel();
        let writer = Self { log_file, receiver };
        (writer, sender)
    }

    /// Opens the log file (creating it if necessary, always appending) and
    /// writes every received entry as a single JSON line, flushing after each.
    ///
    /// Returns `Ok(())` once the channel yields `None`, i.e. when it is closed
    /// and drained.
    ///
    /// # Errors
    ///
    /// Returns the first error from opening the file, serializing an entry, or
    /// writing/flushing; entries still queued at that point are not written.
    pub async fn run(mut self) -> Result<()> {
        use tokio::io::AsyncWriteExt;

        let mut options = tokio::fs::OpenOptions::new();
        options.create(true).append(true);
        let file = options.open(&self.log_file).await?;
        let mut sink = tokio::io::BufWriter::new(file);

        loop {
            let Some(entry) = self.receiver.recv().await else {
                break;
            };
            let line = serde_json::to_string(&entry)?;
            sink.write_all(line.as_bytes()).await?;
            sink.write_all(b"\n").await?;
            // Flush per entry so each line reaches the file as soon as it is logged.
            sink.flush().await?;
        }

        Ok(())
    }
}
/// Message handler that records the traffic it sees by sending [`LogEntry`]
/// values into a channel.
pub struct TeeHandler {
    /// Channel on which log entries are submitted.
    log_tx: mpsc::UnboundedSender<LogEntry>,
    /// The id that the next call to `allocate_id` will return; starts at 1.
    next_id: u64,
}

impl TeeHandler {
    /// Creates a handler that submits its log entries on `log_tx`.
    pub fn new(log_tx: mpsc::UnboundedSender<LogEntry>) -> Self {
        TeeHandler { next_id: 1, log_tx }
    }

    /// Submits `entry` to the log channel.
    ///
    /// Logging is best effort: a failed send is deliberately ignored.
    fn log_entry(&self, entry: LogEntry) {
        let _ = self.log_tx.send(entry);
    }

    /// Returns the current synthetic id and advances the counter by one.
    fn allocate_id(&mut self) -> u64 {
        let current = self.next_id;
        self.next_id = current + 1;
        current
    }
}
impl JrMessageHandler for TeeHandler {
    type Role = ProxyToConductor;

    /// Short label identifying this handler.
    fn describe_chain(&self) -> impl std::fmt::Debug {
        "tee"
    }

    /// Logs `message` and hands it back unconsumed via `Handled::No`.
    ///
    /// * Requests are logged under a freshly allocated synthetic id, and their
    ///   request context is wrapped so that the result passing through it is
    ///   logged as a `Reply` carrying the same id.
    /// * Notifications are logged as-is.
    async fn handle_message(
        &mut self,
        message: MessageCx,
        _cx: sacp::JrConnectionCx<ProxyToConductor>,
    ) -> Result<Handled<MessageCx>, sacp::Error> {
        let passthrough = match message {
            MessageCx::Request(request, request_cx) => {
                // The id is synthetic: it only pairs the logged request with
                // its logged reply.
                let synthetic_id = self.allocate_id();
                self.log_entry(LogEntry::new(
                    "downstream",
                    JsonRpcMessage::Request {
                        id: serde_json::json!(synthetic_id),
                        message: request.clone(),
                    },
                ));

                // The closure must own its sender because it outlives this call.
                let reply_tx = self.log_tx.clone();
                let logging_cx = request_cx.wrap_params(move |_method, result| {
                    let logged = match &result {
                        // A value that cannot be converted to JSON is logged as `null`.
                        Ok(value) => serde_json::to_value(value).unwrap_or(serde_json::Value::Null),
                        Err(e) => serde_json::json!({ "error": e.to_string() }),
                    };
                    let reply = JsonRpcMessage::Reply {
                        id: serde_json::json!(synthetic_id),
                        result: logged,
                    };
                    // Best effort, as with every other log send in this file.
                    let _ = reply_tx.send(LogEntry::new("upstream", reply));
                    // Pass the result through untouched.
                    result
                });

                MessageCx::Request(request, logging_cx)
            }
            MessageCx::Notification(notification) => {
                self.log_entry(LogEntry::new(
                    "downstream",
                    JsonRpcMessage::Notification {
                        message: notification.clone(),
                    },
                ));
                MessageCx::Notification(notification)
            }
        };

        Ok(Handled::No {
            message: passthrough,
            retry: false,
        })
    }
}
/// Configuration for the tee component: where the log should be written.
pub struct Tee {
    /// Path of the log file.
    log_file: PathBuf,
}

impl Tee {
    /// Creates a tee that will log to `log_file`.
    ///
    /// This only stores the path; no file is touched here.
    pub fn new(log_file: PathBuf) -> Self {
        Tee { log_file }
    }
}
impl Component for Tee {
    /// Runs the tee as a proxy in front of `client`.
    ///
    /// Spawns a [`LogWriter`] task for the configured log file, then builds a
    /// `ProxyToConductor` named `"sacp-tee"` whose handler is a [`TeeHandler`]
    /// feeding that writer, connects it to `client`, and serves until the
    /// connection finishes.
    ///
    /// The writer task is detached: its `JoinHandle` is dropped, so this method
    /// does not wait for it. A writer failure is reported through `tracing` and
    /// does not stop the proxy.
    ///
    /// # Errors
    ///
    /// Returns any error from connecting to `client` or from serving.
    async fn serve(self, client: impl Component) -> Result<(), sacp::Error> {
        // `self` is owned and not used again, so the path is moved rather than cloned.
        let (log_writer, log_tx) = LogWriter::new(self.log_file);

        tokio::spawn(async move {
            if let Err(e) = log_writer.run().await {
                tracing::error!("Log writer failed: {}", e);
            }
        });

        ProxyToConductor::builder()
            .name("sacp-tee")
            .with_handler(TeeHandler::new(log_tx))
            .connect_to(client)?
            .serve()
            .await
    }
}
pub async fn run_raw(log_file: PathBuf, downstream: sacp_tokio::AcpAgent) -> Result<()> {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
tracing::info!(
"Starting sacp-tee in raw mode, logging to: {}",
log_file.display()
);
let log_file = tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&log_file)
.await?;
let mut log_writer = tokio::io::BufWriter::new(log_file);
let (mut child_stdin, child_stdout, _child_stderr, _child) = downstream.spawn_process()?;
let mut child_stdout = BufReader::new(child_stdout).lines();
let stdin = tokio::io::stdin();
let mut stdin_lines = BufReader::new(stdin).lines();
let mut stdout = tokio::io::stdout();
loop {
tokio::select! {
line = stdin_lines.next_line() => {
match line? {
Some(line) => {
child_stdin.write_all(line.as_bytes()).await?;
child_stdin.write_all(b"\n").await?;
child_stdin.flush().await?;
log_writer.write_all(b"\xE2\x86\x92 ").await?; log_writer.write_all(line.as_bytes()).await?;
log_writer.write_all(b"\n").await?;
log_writer.flush().await?;
}
None => {
break;
}
}
}
line = child_stdout.next_line() => {
match line? {
Some(line) => {
stdout.write_all(line.as_bytes()).await?;
stdout.write_all(b"\n").await?;
stdout.flush().await?;
log_writer.write_all(b"\xE2\x86\x90 ").await?; log_writer.write_all(line.as_bytes()).await?;
log_writer.write_all(b"\n").await?;
log_writer.flush().await?;
}
None => {
break;
}
}
}
}
}
Ok(())
}
/// Runs the tee as a proxy over this process's stdio, logging to `log_file`.
///
/// Installs a global `tracing` subscriber that writes to stderr (filtered by
/// the default environment filter, or `"info"` if that cannot be built), then
/// serves a [`Tee`] connected to `sacp_tokio::Stdio` until it finishes.
///
/// # Errors
///
/// Returns the error produced by serving the tee, if any.
pub async fn run(log_file: PathBuf) -> Result<()> {
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
    tracing_subscriber::fmt()
        .with_writer(std::io::stderr)
        .with_env_filter(env_filter)
        .init();

    tracing::info!("Starting sacp-tee, logging to: {}", log_file.display());

    let tee = Tee::new(log_file);
    tee.serve(sacp_tokio::Stdio::new()).await?;
    Ok(())
}