Skip to main content

ephem_debugger_rs/
bridge.rs

1//! IPC bridge — NDJSON server over TCP (Windows) or Unix socket.
2//!
3//! The bridge listens for connections from the `dbg` CLI and responds to
4//! query commands by reading from the [`LogStore`].
5
6use std::sync::Arc;
7
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::net::TcpListener;
10use tokio::sync::Notify;
11
12use crate::protocol::{LogEntry, QueryRequest, QueryResponse};
13use crate::store::LogStore;
14
15/// IPC bridge that the `dbg` CLI connects to.
16///
17/// On Windows the bridge listens on TCP `127.0.0.1:0` and writes the
18/// address to `.debugger/bridge.addr`. On Unix it listens on a Unix
19/// domain socket.
20pub struct Bridge {
21    shutdown: Arc<Notify>,
22    task: Option<tokio::task::JoinHandle<()>>,
23}
24
25impl Bridge {
26    /// Start the IPC bridge.
27    ///
28    /// This spawns a background Tokio task that accepts connections and
29    /// processes NDJSON requests.
30    pub async fn start(store: Arc<LogStore>) -> Result<Self, BridgeError> {
31        let shutdown = Arc::new(Notify::new());
32
33        if cfg!(windows) {
34            Self::start_tcp(store, shutdown.clone()).await
35        } else {
36            Self::start_unix(store, shutdown.clone()).await
37        }
38    }
39
40    #[cfg(not(windows))]
41    async fn start_unix(
42        store: Arc<LogStore>,
43        shutdown: Arc<Notify>,
44    ) -> Result<Self, BridgeError> {
45        use std::path::Path;
46        use tokio::net::UnixListener;
47
48        let socket_path = store.session().socket_path.clone();
49        let dir = Path::new(&socket_path)
50            .parent()
51            .ok_or_else(|| BridgeError::Io("invalid socket path".to_string()))?;
52
53        tokio::fs::create_dir_all(dir)
54            .await
55            .map_err(|e| BridgeError::Io(format!("create socket dir: {e}")))?;
56
57        // Remove stale socket file
58        let _ = tokio::fs::remove_file(&socket_path).await;
59
60        let listener = UnixListener::bind(&socket_path)
61            .map_err(|e| BridgeError::Io(format!("bind unix socket: {e}")))?;
62
63        write_session_file(&store, None).await;
64
65        let cwd = std::env::current_dir()
66            .unwrap_or_default()
67            .to_string_lossy()
68            .to_string();
69        let session_file = format!("{cwd}/.debugger/session.json");
70
71        let sd = shutdown.clone();
72        let task = tokio::spawn(async move {
73            loop {
74                tokio::select! {
75                    _ = sd.notified() => break,
76                    result = listener.accept() => {
77                        match result {
78                            Ok((stream, _)) => {
79                                let store = store.clone();
80                                tokio::spawn(async move {
81                                    let (reader, writer) = stream.into_split();
82                                    handle_connection(BufReader::new(reader), writer, store).await;
83                                });
84                            }
85                            Err(_) => continue,
86                        }
87                    }
88                }
89            }
90            let _ = tokio::fs::remove_file(&socket_path).await;
91            let _ = tokio::fs::remove_file(&session_file).await;
92        });
93
94        Ok(Self {
95            shutdown,
96            task: Some(task),
97        })
98    }
99
100    #[cfg(windows)]
101    async fn start_unix(
102        store: Arc<LogStore>,
103        shutdown: Arc<Notify>,
104    ) -> Result<Self, BridgeError> {
105        // Unix sockets not available on Windows — fall through to TCP
106        Self::start_tcp(store, shutdown).await
107    }
108
109    async fn start_tcp(
110        store: Arc<LogStore>,
111        shutdown: Arc<Notify>,
112    ) -> Result<Self, BridgeError> {
113        let listener = TcpListener::bind("127.0.0.1:0")
114            .await
115            .map_err(|e| BridgeError::Io(format!("bind tcp: {e}")))?;
116
117        let addr = listener
118            .local_addr()
119            .map_err(|e| BridgeError::Io(format!("get local addr: {e}")))?;
120
121        // Write the TCP address to .debugger/bridge.addr
122        let cwd = std::env::current_dir()
123            .unwrap_or_default()
124            .to_string_lossy()
125            .to_string();
126        let addr_dir = format!("{cwd}/.debugger");
127        tokio::fs::create_dir_all(&addr_dir)
128            .await
129            .map_err(|e| BridgeError::Io(format!("create addr dir: {e}")))?;
130
131        let addr_file = format!("{addr_dir}/bridge.addr");
132        tokio::fs::write(&addr_file, addr.to_string())
133            .await
134            .map_err(|e| BridgeError::Io(format!("write addr file: {e}")))?;
135
136        eprintln!("> @ephem-sh/debugger: bridge listening on {addr}");
137
138        write_session_file(&store, Some(&addr.to_string())).await;
139
140        let session_file = format!("{addr_dir}/session.json");
141
142        let sd = shutdown.clone();
143        let task = tokio::spawn(async move {
144            loop {
145                tokio::select! {
146                    _ = sd.notified() => break,
147                    result = listener.accept() => {
148                        match result {
149                            Ok((stream, _)) => {
150                                let store = store.clone();
151                                tokio::spawn(async move {
152                                    let (reader, writer) = stream.into_split();
153                                    handle_connection(BufReader::new(reader), writer, store).await;
154                                });
155                            }
156                            Err(_) => continue,
157                        }
158                    }
159                }
160            }
161            // Clean up bridge.addr and session.json
162            let _ = tokio::fs::remove_file(&addr_file).await;
163            let _ = tokio::fs::remove_file(&session_file).await;
164        });
165
166        Ok(Self {
167            shutdown,
168            task: Some(task),
169        })
170    }
171
172    /// Gracefully stop the bridge.
173    pub async fn stop(&mut self) {
174        self.shutdown.notify_one();
175        if let Some(task) = self.task.take() {
176            let _ = task.await;
177        }
178    }
179}
180
181/// Handle a single client connection — reads NDJSON lines and writes responses.
182async fn handle_connection<R, W>(reader: BufReader<R>, mut writer: W, store: Arc<LogStore>)
183where
184    R: tokio::io::AsyncRead + Unpin,
185    W: tokio::io::AsyncWrite + Unpin,
186{
187    let mut lines = reader.lines();
188    while let Ok(Some(line)) = lines.next_line().await {
189        if line.is_empty() {
190            continue;
191        }
192
193        let req: QueryRequest = match serde_json::from_str(&line) {
194            Ok(r) => r,
195            Err(e) => {
196                let _ = write_error(&mut writer, "", &format!("invalid request: {e}")).await;
197                continue;
198            }
199        };
200
201        if req.command == "push" {
202            handle_push(&mut writer, &req, &store).await;
203            continue;
204        }
205
206        let filters = req.filters.as_ref().cloned().unwrap_or_default();
207        let mut resp = store.query(&req.command, &filters);
208        resp.id = req.id;
209
210        let _ = write_response(&mut writer, &resp).await;
211    }
212}
213
214/// Handle a push command — parse the data field and push to store.
215async fn handle_push<W: tokio::io::AsyncWrite + Unpin>(
216    writer: &mut W,
217    req: &QueryRequest,
218    store: &LogStore,
219) {
220    if let Some(ref data) = req.data {
221        match serde_json::from_value::<LogEntry>(data.clone()) {
222            Ok(entry) => store.push(entry),
223            Err(e) => {
224                let _ = write_error(writer, &req.id, &format!("unmarshal push data: {e}")).await;
225                return;
226            }
227        }
228    }
229    let _ = write_ok(writer, &req.id).await;
230}
231
232async fn write_response<W: tokio::io::AsyncWrite + Unpin>(
233    writer: &mut W,
234    resp: &QueryResponse,
235) -> Result<(), std::io::Error> {
236    let mut json = serde_json::to_vec(resp).unwrap_or_default();
237    json.push(b'\n');
238    writer.write_all(&json).await?;
239    writer.flush().await
240}
241
242async fn write_ok<W: tokio::io::AsyncWrite + Unpin>(
243    writer: &mut W,
244    id: &str,
245) -> Result<(), std::io::Error> {
246    let resp = QueryResponse {
247        id: id.to_string(),
248        ok: true,
249        data: Vec::new(),
250        session: None,
251        error: None,
252    };
253    write_response(writer, &resp).await
254}
255
256async fn write_error<W: tokio::io::AsyncWrite + Unpin>(
257    writer: &mut W,
258    id: &str,
259    msg: &str,
260) -> Result<(), std::io::Error> {
261    let resp = QueryResponse {
262        id: id.to_string(),
263        ok: false,
264        data: Vec::new(),
265        session: None,
266        error: Some(msg.to_string()),
267    };
268    write_response(writer, &resp).await
269}
270
271/// Write session.json to `.debugger/` so the CLI can discover the session.
272async fn write_session_file(store: &LogStore, socket_override: Option<&str>) {
273    let cwd = std::env::current_dir()
274        .unwrap_or_default()
275        .to_string_lossy()
276        .to_string();
277    let session_dir = format!("{cwd}/.debugger");
278    let _ = tokio::fs::create_dir_all(&session_dir).await;
279    let session_file = format!("{session_dir}/session.json");
280    let mut session = store.session().clone();
281    if let Some(addr) = socket_override {
282        session.socket_path = addr.to_string();
283    }
284    if let Ok(json) = serde_json::to_string(&session) {
285        let _ = tokio::fs::write(&session_file, format!("{json}\n")).await;
286    }
287}
288
/// Failure raised while starting or running the bridge.
#[derive(Debug)]
pub enum BridgeError {
    /// An I/O operation failed; the string describes which one and why.
    Io(String),
}

impl std::fmt::Display for BridgeError {
    /// Renders the error as `bridge I/O error: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum, so the pattern is irrefutable; adding a variant
        // turns this into a compile error rather than a silent fallthrough.
        let Self::Io(detail) = self;
        f.write_str("bridge I/O error: ")?;
        f.write_str(detail)
    }
}

impl std::error::Error for BridgeError {}