ephem_debugger_rs/
bridge.rs1use std::sync::Arc;
7
8use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
9use tokio::net::TcpListener;
10use tokio::sync::Notify;
11
12use crate::protocol::{LogEntry, QueryRequest, QueryResponse};
13use crate::store::LogStore;
14
15pub struct Bridge {
21 shutdown: Arc<Notify>,
22 task: Option<tokio::task::JoinHandle<()>>,
23}
24
25impl Bridge {
26 pub async fn start(store: Arc<LogStore>) -> Result<Self, BridgeError> {
31 let shutdown = Arc::new(Notify::new());
32
33 if cfg!(windows) {
34 Self::start_tcp(store, shutdown.clone()).await
35 } else {
36 Self::start_unix(store, shutdown.clone()).await
37 }
38 }
39
40 #[cfg(not(windows))]
41 async fn start_unix(
42 store: Arc<LogStore>,
43 shutdown: Arc<Notify>,
44 ) -> Result<Self, BridgeError> {
45 use std::path::Path;
46 use tokio::net::UnixListener;
47
48 let socket_path = store.session().socket_path.clone();
49 let dir = Path::new(&socket_path)
50 .parent()
51 .ok_or_else(|| BridgeError::Io("invalid socket path".to_string()))?;
52
53 tokio::fs::create_dir_all(dir)
54 .await
55 .map_err(|e| BridgeError::Io(format!("create socket dir: {e}")))?;
56
57 let _ = tokio::fs::remove_file(&socket_path).await;
59
60 let listener = UnixListener::bind(&socket_path)
61 .map_err(|e| BridgeError::Io(format!("bind unix socket: {e}")))?;
62
63 write_session_file(&store, None).await;
64
65 let cwd = std::env::current_dir()
66 .unwrap_or_default()
67 .to_string_lossy()
68 .to_string();
69 let session_file = format!("{cwd}/.debugger/session.json");
70
71 let sd = shutdown.clone();
72 let task = tokio::spawn(async move {
73 loop {
74 tokio::select! {
75 _ = sd.notified() => break,
76 result = listener.accept() => {
77 match result {
78 Ok((stream, _)) => {
79 let store = store.clone();
80 tokio::spawn(async move {
81 let (reader, writer) = stream.into_split();
82 handle_connection(BufReader::new(reader), writer, store).await;
83 });
84 }
85 Err(_) => continue,
86 }
87 }
88 }
89 }
90 let _ = tokio::fs::remove_file(&socket_path).await;
91 let _ = tokio::fs::remove_file(&session_file).await;
92 });
93
94 Ok(Self {
95 shutdown,
96 task: Some(task),
97 })
98 }
99
100 #[cfg(windows)]
101 async fn start_unix(
102 store: Arc<LogStore>,
103 shutdown: Arc<Notify>,
104 ) -> Result<Self, BridgeError> {
105 Self::start_tcp(store, shutdown).await
107 }
108
109 async fn start_tcp(
110 store: Arc<LogStore>,
111 shutdown: Arc<Notify>,
112 ) -> Result<Self, BridgeError> {
113 let listener = TcpListener::bind("127.0.0.1:0")
114 .await
115 .map_err(|e| BridgeError::Io(format!("bind tcp: {e}")))?;
116
117 let addr = listener
118 .local_addr()
119 .map_err(|e| BridgeError::Io(format!("get local addr: {e}")))?;
120
121 let cwd = std::env::current_dir()
123 .unwrap_or_default()
124 .to_string_lossy()
125 .to_string();
126 let addr_dir = format!("{cwd}/.debugger");
127 tokio::fs::create_dir_all(&addr_dir)
128 .await
129 .map_err(|e| BridgeError::Io(format!("create addr dir: {e}")))?;
130
131 let addr_file = format!("{addr_dir}/bridge.addr");
132 tokio::fs::write(&addr_file, addr.to_string())
133 .await
134 .map_err(|e| BridgeError::Io(format!("write addr file: {e}")))?;
135
136 eprintln!("> @ephem-sh/debugger: bridge listening on {addr}");
137
138 write_session_file(&store, Some(&addr.to_string())).await;
139
140 let session_file = format!("{addr_dir}/session.json");
141
142 let sd = shutdown.clone();
143 let task = tokio::spawn(async move {
144 loop {
145 tokio::select! {
146 _ = sd.notified() => break,
147 result = listener.accept() => {
148 match result {
149 Ok((stream, _)) => {
150 let store = store.clone();
151 tokio::spawn(async move {
152 let (reader, writer) = stream.into_split();
153 handle_connection(BufReader::new(reader), writer, store).await;
154 });
155 }
156 Err(_) => continue,
157 }
158 }
159 }
160 }
161 let _ = tokio::fs::remove_file(&addr_file).await;
163 let _ = tokio::fs::remove_file(&session_file).await;
164 });
165
166 Ok(Self {
167 shutdown,
168 task: Some(task),
169 })
170 }
171
172 pub async fn stop(&mut self) {
174 self.shutdown.notify_one();
175 if let Some(task) = self.task.take() {
176 let _ = task.await;
177 }
178 }
179}
180
181async fn handle_connection<R, W>(reader: BufReader<R>, mut writer: W, store: Arc<LogStore>)
183where
184 R: tokio::io::AsyncRead + Unpin,
185 W: tokio::io::AsyncWrite + Unpin,
186{
187 let mut lines = reader.lines();
188 while let Ok(Some(line)) = lines.next_line().await {
189 if line.is_empty() {
190 continue;
191 }
192
193 let req: QueryRequest = match serde_json::from_str(&line) {
194 Ok(r) => r,
195 Err(e) => {
196 let _ = write_error(&mut writer, "", &format!("invalid request: {e}")).await;
197 continue;
198 }
199 };
200
201 if req.command == "push" {
202 handle_push(&mut writer, &req, &store).await;
203 continue;
204 }
205
206 let filters = req.filters.as_ref().cloned().unwrap_or_default();
207 let mut resp = store.query(&req.command, &filters);
208 resp.id = req.id;
209
210 let _ = write_response(&mut writer, &resp).await;
211 }
212}
213
214async fn handle_push<W: tokio::io::AsyncWrite + Unpin>(
216 writer: &mut W,
217 req: &QueryRequest,
218 store: &LogStore,
219) {
220 if let Some(ref data) = req.data {
221 match serde_json::from_value::<LogEntry>(data.clone()) {
222 Ok(entry) => store.push(entry),
223 Err(e) => {
224 let _ = write_error(writer, &req.id, &format!("unmarshal push data: {e}")).await;
225 return;
226 }
227 }
228 }
229 let _ = write_ok(writer, &req.id).await;
230}
231
232async fn write_response<W: tokio::io::AsyncWrite + Unpin>(
233 writer: &mut W,
234 resp: &QueryResponse,
235) -> Result<(), std::io::Error> {
236 let mut json = serde_json::to_vec(resp).unwrap_or_default();
237 json.push(b'\n');
238 writer.write_all(&json).await?;
239 writer.flush().await
240}
241
242async fn write_ok<W: tokio::io::AsyncWrite + Unpin>(
243 writer: &mut W,
244 id: &str,
245) -> Result<(), std::io::Error> {
246 let resp = QueryResponse {
247 id: id.to_string(),
248 ok: true,
249 data: Vec::new(),
250 session: None,
251 error: None,
252 };
253 write_response(writer, &resp).await
254}
255
256async fn write_error<W: tokio::io::AsyncWrite + Unpin>(
257 writer: &mut W,
258 id: &str,
259 msg: &str,
260) -> Result<(), std::io::Error> {
261 let resp = QueryResponse {
262 id: id.to_string(),
263 ok: false,
264 data: Vec::new(),
265 session: None,
266 error: Some(msg.to_string()),
267 };
268 write_response(writer, &resp).await
269}
270
271async fn write_session_file(store: &LogStore, socket_override: Option<&str>) {
273 let cwd = std::env::current_dir()
274 .unwrap_or_default()
275 .to_string_lossy()
276 .to_string();
277 let session_dir = format!("{cwd}/.debugger");
278 let _ = tokio::fs::create_dir_all(&session_dir).await;
279 let session_file = format!("{session_dir}/session.json");
280 let mut session = store.session().clone();
281 if let Some(addr) = socket_override {
282 session.socket_path = addr.to_string();
283 }
284 if let Ok(json) = serde_json::to_string(&session) {
285 let _ = tokio::fs::write(&session_file, format!("{json}\n")).await;
286 }
287}
288
/// Error returned when the bridge cannot be started.
#[derive(Debug)]
pub enum BridgeError {
    /// An I/O step failed; the string names the step and the underlying error.
    Io(String),
}
295
296impl std::fmt::Display for BridgeError {
297 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
298 match self {
299 Self::Io(msg) => write!(f, "bridge I/O error: {msg}"),
300 }
301 }
302}
303
// Empty impl: relies on the trait's default methods, so no source error is exposed.
impl std::error::Error for BridgeError {}