// ocular_proxy/lib.rs

1use anyhow::Result;
2use ocular_protocol::{Protocol, parse_request, parse_response, extract_full_command, format_response_detail, parse_amqp_frame, parse_amqp_request_full, is_async_method, amqp_frame_len};
3use std::sync::Arc;
4use std::time::{Instant, SystemTime};
5use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncRead, AsyncWrite};
6use tokio::net::{TcpListener, TcpStream};
7use tokio::sync::{broadcast, Mutex};
8use tracing::{info, warn, error, debug};
9
10pub use ocular_protocol::ProxyEvent;
11
/// A request observed on the client → server path that has not yet been
/// paired with a response from the server.
#[derive(Debug, Clone)]
struct PendingRequest {
    /// Wall-clock time at which the request was recorded; reused as the
    /// timestamp of the event emitted once the response arrives.
    timestamp: SystemTime,
    /// Monotonic instant at which the request was recorded; the latency of
    /// the paired event is `instant.elapsed()`.
    instant: Instant,
    /// Short command summary as returned by the protocol parser.
    command: String,
    /// Full command text; falls back to `command` when the parser has no
    /// richer rendering.
    full_command: String,
}
19
20pub async fn run_proxy(
21    listen_addr: String,
22    remote_addr: String,
23    name: String,
24    protocol: Protocol,
25    tx: broadcast::Sender<ProxyEvent>,
26) -> Result<()> {
27    let listener = TcpListener::bind(&listen_addr).await?;
28    info!(component = %name, listen = %listen_addr, remote = %remote_addr, ?protocol, "proxy listening");
29
30    loop {
31        let (client, peer) = listener.accept().await?;
32        debug!(component = %name, peer = %peer, "new client connection");
33        let remote = remote_addr.clone();
34        let name = name.clone();
35        let tx = tx.clone();
36        let process = resolve_peer_process(peer.port());
37        let peer_addr = peer.to_string();
38        let remote_for_conn = remote.clone();
39        tokio::spawn(async move {
40            if let Err(e) = handle_conn(client, &remote, &name, protocol, &tx, process, peer_addr, remote_for_conn).await {
41                warn!(component = %name, remote = %remote, error = %e, "connection ended with error");
42            }
43        });
44    }
45}
46
47#[allow(clippy::too_many_arguments)]
48async fn handle_conn(
49    mut client: TcpStream,
50    remote_addr: &str,
51    name: &str,
52    protocol: Protocol,
53    tx: &broadcast::Sender<ProxyEvent>,
54    process: Option<String>,
55    src: String,
56    dest: String,
57) -> Result<()> {
58    // Parse remote address: detect https:// for TLS outbound
59    let (actual_addr, use_tls, tls_host) = if remote_addr.starts_with("https://") {
60        let stripped = remote_addr.strip_prefix("https://").unwrap();
61        let host = stripped.split(':').next().unwrap_or(stripped).to_string();
62        (stripped.to_string(), true, host)
63    } else {
64        let stripped = remote_addr.strip_prefix("http://").unwrap_or(remote_addr);
65        (stripped.to_string(), false, String::new())
66    };
67
68    let tcp_stream = match TcpStream::connect(&actual_addr).await {
69        Ok(s) => {
70            debug!(component = %name, remote = %actual_addr, "connected to remote");
71            s
72        }
73        Err(e) => {
74            error!(component = %name, remote = %actual_addr, error = %e,
75                "failed to connect to remote — is the service running?");
76            if protocol == Protocol::Redis {
77                let err_msg = format!("-ERR ocular proxy: cannot reach {} ({})\r\n", actual_addr, e);
78                let _ = client.write_all(err_msg.as_bytes()).await;
79            }
80            return Err(e.into());
81        }
82    };
83
84    let (sr, sw): (Box<dyn AsyncRead + Unpin + Send>, Box<dyn AsyncWrite + Unpin + Send>) = if use_tls {
85        let config = rustls::ClientConfig::builder()
86            .dangerous()
87            .with_custom_certificate_verifier(Arc::new(NoVerify))
88            .with_no_client_auth();
89        let connector = tokio_rustls::TlsConnector::from(Arc::new(config));
90        let domain = rustls::pki_types::ServerName::try_from(tls_host)
91            .map_err(|e| anyhow::anyhow!("invalid TLS hostname: {}", e))?;
92        let tls_stream = connector.connect(domain, tcp_stream).await?;
93        let (r, w) = tokio::io::split(tls_stream);
94        (Box::new(r) as Box<dyn AsyncRead + Unpin + Send>, Box::new(w) as Box<dyn AsyncWrite + Unpin + Send>)
95    } else {
96        let (r, w) = tokio::io::split(tcp_stream);
97        (Box::new(r) as Box<dyn AsyncRead + Unpin + Send>, Box::new(w) as Box<dyn AsyncWrite + Unpin + Send>)
98    };
99
100    let mut sr = sr;
101    let mut sw = sw;
102
103    // For MySQL: strip SSL from greeting
104    if protocol == Protocol::Mysql {
105        let mut greeting_buf = [0u8; 65536];
106        let n = sr.read(&mut greeting_buf).await?;
107        if n == 0 { return Ok(()); }
108        let mut greeting = greeting_buf[..n].to_vec();
109        strip_mysql_ssl_flag(&mut greeting);
110        client.write_all(&greeting).await?;
111        debug!(component = %name, "forwarded MySQL greeting with SSL stripped");
112    }
113
114    // For PostgreSQL: handle SSL negotiation before normal flow
115    if protocol == Protocol::Postgres {
116        let mut buf = [0u8; 256];
117        let n = client.read(&mut buf).await?;
118        if n == 0 { return Ok(()); }
119        let data = &buf[..n];
120        // Forward SSLRequest to server
121        sw.write_all(data).await?;
122        // Read server's SSL response (single byte N or S)
123        let mut resp = [0u8; 1];
124        let rn = sr.read(&mut resp).await?;
125        if rn == 0 { return Ok(()); }
126        // Forward to client
127        client.write_all(&resp[..rn]).await?;
128        // Emit SSLRequest event
129        let command = parse_request(protocol, data).unwrap_or_else(|| "SSLRequest".into());
130        let response = if resp[0] == b'N' { "SSLResponse: No" } else { "SSLResponse: Yes" };
131        let _ = tx.send(ProxyEvent {
132            timestamp: SystemTime::now(),
133            component: name.to_string(),
134            protocol,
135            command: command.clone(),
136            full_command: command,
137            response: response.into(),
138            response_detail: response.into(),
139            latency: std::time::Duration::ZERO,
140            process: process.clone(),
141            src: Some(src.clone()),
142            dest: Some(dest.clone()),
143        });
144        // If server said 'S' (SSL), we'd need to upgrade — but we don't support that
145        // Most local setups respond 'N'
146    }
147
148    let (mut cr, mut cw) = client.split();
149
150    let pending: Arc<Mutex<Option<PendingRequest>>> = Arc::new(Mutex::new(None));
151
152    let name_req = name.to_string();
153    let name_resp = name.to_string();
154    let tx_req = tx.clone();
155    let tx_resp = tx.clone();
156    let pending_w = pending.clone();
157    let pending_r = pending;
158    let process_info = process;
159
160    let process_req = process_info.clone();
161    let src_req = src.clone();
162    let dest_req = dest.clone();
163    let src_resp = src.clone();
164    let dest_resp = dest;
165    let client_to_server = async move {
166        let mut buf = [0u8; 65536];
167        let mut http_req_buf: Vec<u8> = Vec::new();
168        let mut memcached_req_buf: Vec<u8> = Vec::new();
169        let mut kafka_req_buf: Vec<u8> = Vec::new();
170        loop {
171            let n = cr.read(&mut buf).await?;
172            if n == 0 { break; }
173            let data = &buf[..n];
174
175            if protocol == Protocol::Amqp {
176                // AMQP: loop through all frames in this read
177                let mut pos = 0;
178                while pos < data.len() {
179                    let frame_data = &data[pos..];
180                    let Some(flen) = amqp_frame_len(frame_data) else { break };
181                    if let Some(frame) = parse_amqp_frame(frame_data) {
182                        // Skip heartbeat — not a real request
183                        if frame.frame_type == 8 {
184                            pos += flen;
185                            continue;
186                        }
187                        if let Some(ref method) = frame.method {
188                            if is_async_method(method.class_id, method.method_id) {
189                                let (summary, detail) = parse_amqp_request_full(frame_data)
190                                    .unwrap_or_else(|| (method.summary.clone(), method.detail.clone()));
191                                let _ = tx_req.send(ProxyEvent {
192                                    timestamp: SystemTime::now(),
193                                    component: name_req.clone(),
194                                    protocol,
195                                    command: summary,
196                                    full_command: detail.clone(),
197                                    response: String::new(),
198                                    response_detail: detail,
199                                    latency: std::time::Duration::ZERO,
200                                    process: process_req.clone(),
201                                    src: Some(src_req.clone()),
202                                    dest: Some(dest_req.clone()),
203                                });
204                            } else {
205                                debug!(component = %name_req, command = %method.summary);
206                                *pending_w.lock().await = Some(PendingRequest {
207                                    timestamp: SystemTime::now(),
208                                    instant: Instant::now(),
209                                    command: method.summary.clone(),
210                                    full_command: method.detail.clone(),
211                                });
212                            }
213                        }
214                    }
215                    pos += flen;
216                }
217            } else if protocol == Protocol::Http {
218                http_req_buf.extend_from_slice(data);
219                if ocular_protocol::http::http_request_complete(&http_req_buf) {
220                    if let Some(command) = parse_request(protocol, &http_req_buf) {
221                        let full_command = extract_full_command(protocol, &http_req_buf).unwrap_or_else(|| command.clone());
222                        *pending_w.lock().await = Some(PendingRequest {
223                            timestamp: SystemTime::now(),
224                            instant: Instant::now(),
225                            command,
226                            full_command,
227                        });
228                    }
229                    http_req_buf.clear();
230                }
231            } else if protocol == Protocol::Memcached {
232                memcached_req_buf.extend_from_slice(data);
233                if ocular_protocol::memcached::memcached_request_complete(&memcached_req_buf) {
234                    if let Some(command) = parse_request(protocol, &memcached_req_buf) {
235                        let full_command = extract_full_command(protocol, &memcached_req_buf).unwrap_or_else(|| command.clone());
236                        *pending_w.lock().await = Some(PendingRequest {
237                            timestamp: SystemTime::now(),
238                            instant: Instant::now(),
239                            command,
240                            full_command,
241                        });
242                    }
243                    memcached_req_buf.clear();
244                }
245            } else if protocol == Protocol::Kafka {
246                kafka_req_buf.extend_from_slice(data);
247                while ocular_protocol::kafka::kafka_frame_complete(&kafka_req_buf) {
248                    let frame_len = i32::from_be_bytes([kafka_req_buf[0], kafka_req_buf[1], kafka_req_buf[2], kafka_req_buf[3]]) as usize + 4;
249                    let frame = &kafka_req_buf[..frame_len];
250                    if let Some(command) = parse_request(protocol, frame) {
251                        let full_command = extract_full_command(protocol, frame).unwrap_or_else(|| command.clone());
252                        *pending_w.lock().await = Some(PendingRequest {
253                            timestamp: SystemTime::now(),
254                            instant: Instant::now(),
255                            command,
256                            full_command,
257                        });
258                    }
259                    kafka_req_buf = kafka_req_buf[frame_len..].to_vec();
260                }
261            } else if let Some(command) = parse_request(protocol, data) {
262                let full_command = extract_full_command(protocol, data).unwrap_or_else(|| command.clone());
263                debug!(component = %name_req, %command);
264                *pending_w.lock().await = Some(PendingRequest {
265                    timestamp: SystemTime::now(),
266                    instant: Instant::now(),
267                    command,
268                    full_command,
269                });
270            } else if protocol == Protocol::Postgres && n > 0 {
271                info!(component = %name_req, bytes = n, first_byte = format!("0x{:02x}", data[0]), "pg client→server UNPARSED");
272            }
273
274            sw.write_all(data).await?;
275        }
276        Ok::<_, anyhow::Error>(())
277    };
278
279    let process_mysql = process_info.clone();
280    let server_to_client = async move {
281        let mut buf = [0u8; 65536];
282        let mut mysql_buf: Vec<u8> = Vec::new();
283        let mut http_resp_buf: Vec<u8> = Vec::new();
284        let mut memcached_resp_buf: Vec<u8> = Vec::new();
285        let mut kafka_resp_buf: Vec<u8> = Vec::new();
286        let mut awaiting_response = false;
287        let mut memcached_awaiting = false;
288        loop {
289            let n = sr.read(&mut buf).await?;
290            if n == 0 { break; }
291            let data = &buf[..n];
292            cw.write_all(data).await?;
293
294            if protocol == Protocol::Mysql {
295                let has_pending = pending_r.lock().await.is_some();
296                if has_pending || awaiting_response {
297                    awaiting_response = true;
298                    mysql_buf.extend_from_slice(data);
299                    if mysql_response_complete(&mysql_buf) {
300                        if let Some(req) = pending_r.lock().await.take() {
301                            let latency = req.instant.elapsed();
302                            let response = parse_response(protocol, &mysql_buf).unwrap_or_default();
303                            let response_detail = format_response_detail(protocol, &mysql_buf).unwrap_or_default();
304                            let _ = tx_resp.send(ProxyEvent {
305                                timestamp: req.timestamp,
306                                component: name_resp.clone(),
307                                protocol,
308                                command: req.command,
309                                full_command: req.full_command,
310                                response,
311                                response_detail,
312                                latency,
313                                process: process_mysql.clone(),
314                                src: Some(src_resp.clone()),
315                                dest: Some(dest_resp.clone()),
316                            });
317                        }
318                        mysql_buf.clear();
319                        awaiting_response = false;
320                    }
321                }
322            } else if protocol == Protocol::Http {
323                http_resp_buf.extend_from_slice(data);
324                if ocular_protocol::http::http_response_complete(&http_resp_buf) {
325                    if let Some(req) = pending_r.lock().await.take() {
326                        let latency = req.instant.elapsed();
327                        let response = parse_response(protocol, &http_resp_buf).unwrap_or_default();
328                        let response_detail = format_response_detail(protocol, &http_resp_buf).unwrap_or_else(|| response.clone());
329                        let _ = tx_resp.send(ProxyEvent {
330                            timestamp: req.timestamp,
331                            component: name_resp.clone(),
332                            protocol,
333                            command: req.command,
334                            full_command: req.full_command,
335                            response,
336                            response_detail,
337                            latency,
338                            process: process_info.clone(),
339                                    src: Some(src_resp.clone()),
340                                    dest: Some(dest_resp.clone()),
341                        });
342                    }
343                    http_resp_buf.clear();
344                }
345            } else if protocol == Protocol::Amqp {
346                // AMQP: loop through all server frames
347                let mut pos = 0;
348                while pos < data.len() {
349                    let frame_data = &data[pos..];
350                    let Some(flen) = amqp_frame_len(frame_data) else { break };
351                    if let Some(frame) = parse_amqp_frame(frame_data) {
352                        // Skip content header and body frames — handled below with method
353                        if frame.frame_type == 2 || frame.frame_type == 3 {
354                            pos += flen;
355                            continue;
356                        }
357                        // Heartbeat: skip
358                        if frame.frame_type == 8 {
359                            pos += flen;
360                            continue;
361                        }
362                    }
363
364                    // Extract body from subsequent Header+Body frames
365                    let mut body_text = String::new();
366                    let mut peek = pos + flen;
367                    while peek < data.len() {
368                        let peek_data = &data[peek..];
369                        let Some(plen) = amqp_frame_len(peek_data) else { break };
370                        if let Some(pf) = parse_amqp_frame(peek_data) {
371                            if pf.frame_type == 2 {
372                                // Header frame, skip
373                            } else if pf.frame_type == 3 {
374                                // Body frame
375                                if let Some(body) = &pf.body {
376                                    body_text = String::from_utf8_lossy(body).to_string();
377                                }
378                            } else {
379                                break;
380                            }
381                        } else {
382                            break;
383                        }
384                        peek += plen;
385                    }
386
387                    if let Some(req) = pending_r.lock().await.take() {
388                        let latency = req.instant.elapsed();
389                        let mut response = parse_response(protocol, frame_data).unwrap_or_default();
390                        let mut response_detail = format_response_detail(protocol, frame_data).unwrap_or_else(|| response.clone());
391                        if !body_text.is_empty() {
392                            response = format!("{} | {}", response, body_text);
393                            response_detail = format!("{}\nBody: {}", response_detail, body_text);
394                        }
395                        let _ = tx_resp.send(ProxyEvent {
396                            timestamp: req.timestamp,
397                            component: name_resp.clone(),
398                            protocol,
399                            command: req.command,
400                            full_command: req.full_command,
401                            response,
402                            response_detail,
403                            latency,
404                            process: process_info.clone(),
405                                    src: Some(src_resp.clone()),
406                                    dest: Some(dest_resp.clone()),
407                        });
408                    } else if let Some(frame) = parse_amqp_frame(frame_data) {
409                        // Server-initiated method (e.g. Basic.Deliver) — emit as standalone
410                        if let Some(ref method) = frame.method {
411                            let response = if body_text.is_empty() { String::new() } else { body_text.clone() };
412                            let response_detail = if body_text.is_empty() { String::new() } else { body_text.clone() };
413                            let command = method.summary.clone();
414                            let _ = tx_resp.send(ProxyEvent {
415                                timestamp: SystemTime::now(),
416                                component: name_resp.clone(),
417                                protocol,
418                                command,
419                                full_command: method.detail.clone(),
420                                response,
421                                response_detail,
422                                latency: std::time::Duration::ZERO,
423                                process: process_info.clone(),
424                                    src: Some(dest_resp.clone()),
425                                    dest: Some(src_resp.clone()),
426                            });
427                        }
428                    }
429                    // Advance past the method frame + any header/body frames we consumed
430                    pos = peek;
431                }
432            } else if protocol == Protocol::Postgres {
433                // Postgres: only pair with meaningful responses, skip setup noise
434                let first = data[0];
435                info!(component = %name_resp, bytes = n, first_byte = format!("0x{:02x}", first),
436                    hex_head = format!("{:02x?}", &data[..n.min(20)]), "pg server→client");
437                // Use parse_postgres_response which scans all messages and prioritizes errors
438                let is_meaningful = matches!(first, b'C' | b'E' | b'T' | b'Z' | b'I' | b'D' | b'R');
439                if is_meaningful {
440                    if let Some(req) = pending_r.lock().await.take() {
441                        let latency = req.instant.elapsed();
442                        let response = parse_response(protocol, data).unwrap_or_default();
443                        let response_detail = format_response_detail(protocol, data).unwrap_or_else(|| response.clone());
444                        let _ = tx_resp.send(ProxyEvent {
445                            timestamp: req.timestamp,
446                            component: name_resp.clone(),
447                            protocol,
448                            command: req.command,
449                            full_command: req.full_command,
450                            response,
451                            response_detail,
452                            latency,
453                            process: process_info.clone(),
454                                    src: Some(src_resp.clone()),
455                                    dest: Some(dest_resp.clone()),
456                        });
457                    }
458                }
459                // ParameterStatus (S), BackendKeyData (K), etc. are silently skipped
460            } else if protocol == Protocol::Memcached {
461                let has_pending = pending_r.lock().await.is_some();
462                if has_pending || memcached_awaiting {
463                    memcached_awaiting = true;
464                    memcached_resp_buf.extend_from_slice(data);
465                    if ocular_protocol::memcached::memcached_response_complete(&memcached_resp_buf) {
466                        if let Some(req) = pending_r.lock().await.take() {
467                            let latency = req.instant.elapsed();
468                            let response = parse_response(protocol, &memcached_resp_buf).unwrap_or_default();
469                            let response_detail = format_response_detail(protocol, &memcached_resp_buf).unwrap_or_else(|| response.clone());
470                            let _ = tx_resp.send(ProxyEvent {
471                                timestamp: req.timestamp,
472                                component: name_resp.clone(),
473                                protocol,
474                                command: req.command,
475                                full_command: req.full_command,
476                                response,
477                                response_detail,
478                                latency,
479                                process: process_info.clone(),
480                                src: Some(src_resp.clone()),
481                                dest: Some(dest_resp.clone()),
482                            });
483                        }
484                        memcached_resp_buf.clear();
485                        memcached_awaiting = false;
486                    }
487                }
488            } else if protocol == Protocol::Kafka {
489                kafka_resp_buf.extend_from_slice(data);
490                while ocular_protocol::kafka::kafka_frame_complete(&kafka_resp_buf) {
491                    let frame_len = i32::from_be_bytes([kafka_resp_buf[0], kafka_resp_buf[1], kafka_resp_buf[2], kafka_resp_buf[3]]) as usize + 4;
492                    if let Some(req) = pending_r.lock().await.take() {
493                        let latency = req.instant.elapsed();
494                        let response = parse_response(protocol, &kafka_resp_buf[..frame_len]).unwrap_or_default();
495                        let response_detail = format_response_detail(protocol, &kafka_resp_buf[..frame_len]).unwrap_or_else(|| response.clone());
496                        let _ = tx_resp.send(ProxyEvent {
497                            timestamp: req.timestamp,
498                            component: name_resp.clone(),
499                            protocol,
500                            command: req.command,
501                            full_command: req.full_command,
502                            response,
503                            response_detail,
504                            latency,
505                            process: process_info.clone(),
506                            src: Some(src_resp.clone()),
507                            dest: Some(dest_resp.clone()),
508                        });
509                    }
510                    kafka_resp_buf = kafka_resp_buf[frame_len..].to_vec();
511                }
512            } else {
513                // Redis/MongoDB: single request/response per read
514                if let Some(req) = pending_r.lock().await.take() {
515                    let latency = req.instant.elapsed();
516                    let response = parse_response(protocol, data).unwrap_or_default();
517                    let response_detail = format_response_detail(protocol, data).unwrap_or_else(|| response.clone());
518                    let _ = tx_resp.send(ProxyEvent {
519                        timestamp: req.timestamp,
520                        component: name_resp.clone(),
521                        protocol,
522                        command: req.command,
523                        full_command: req.full_command,
524                        response,
525                        response_detail,
526                        latency,
527                        process: process_info.clone(),
528                        src: Some(src_resp.clone()),
529                        dest: Some(dest_resp.clone()),
530                    });
531                }
532            }
533        }
534        Ok::<_, anyhow::Error>(())
535    };
536
537    tokio::select! {
538        r = client_to_server => r?,
539        r = server_to_client => r?,
540    }
541    Ok(())
542}
543
/// Heuristically decides whether `buf` holds a complete MySQL server response.
///
/// `buf` is read as a sequence of packets, each with a 4-byte header (3-byte
/// little-endian payload length + sequence byte) followed by the payload.
///
/// * Fewer than 5 bytes: not complete.
/// * First payload byte `0x00` (OK) or `0xff` (ERR): complete immediately.
/// * Otherwise the response is complete when the buffer ends exactly on a
///   packet boundary and the last non-empty packet is a terminator: marker
///   `0xfe` with a payload shorter than 9 bytes, or marker `0x00` with a
///   payload shorter than 16 bytes.
///
/// Trailing bytes of a partially received packet mean more data is on its
/// way, so the response is never reported complete in that state.
///
/// NOTE(review): a read that ends exactly on a result set's intermediate EOF
/// packet is still indistinguishable from the final one here.
fn mysql_response_complete(buf: &[u8]) -> bool {
    const HEADER_LEN: usize = 4;

    let Some(&first_marker) = buf.get(HEADER_LEN) else {
        return false;
    };
    if matches!(first_marker, 0x00 | 0xff) {
        return true;
    }

    // Walk all fully received packets, remembering the last non-empty one.
    let mut pos = 0;
    let mut last: Option<(u8, usize)> = None;
    while let Some(header) = buf.get(pos..pos + HEADER_LEN) {
        let pkt_len =
            usize::from(header[0]) | usize::from(header[1]) << 8 | usize::from(header[2]) << 16;
        let end = pos + HEADER_LEN + pkt_len;
        if end > buf.len() {
            break;
        }
        if pkt_len > 0 {
            last = Some((buf[pos + HEADER_LEN], pkt_len));
        }
        pos = end;
    }

    // Leftover bytes belong to a packet that has not fully arrived yet.
    if pos != buf.len() {
        return false;
    }
    match last {
        Some((0xfe, len)) => len < 9,
        Some((0x00, len)) => len < 16,
        _ => false,
    }
}
566
/// Clears the SSL capability bit (`0x0800`) in a MySQL server greeting, in place.
///
/// `packet` is expected to be a whole packet: a 4-byte header followed by the
/// payload. Nothing is changed unless the payload starts with protocol
/// version 10 and is long enough to contain the lower capability word, which
/// sits after the NUL-terminated version string and 13 further bytes
/// (4 + 8 + 1; in the MySQL HandshakeV10 layout: connection id, first part of
/// the auth-plugin data, filler).
fn strip_mysql_ssl_flag(packet: &mut [u8]) {
    const HEADER_LEN: usize = 4;
    const PROTOCOL_V10: u8 = 10;
    const SKIP_AFTER_VERSION: usize = 4 + 8 + 1;
    const SSL_BIT: u16 = 0x0800;

    let Some(payload) = packet.get_mut(HEADER_LEN..) else {
        return;
    };
    if payload.first() != Some(&PROTOCOL_V10) {
        return;
    }

    // Index of the NUL ending the version string (or the payload length if
    // there is none, which makes the bounds check below bail out).
    let nul_at = payload[1..]
        .iter()
        .position(|&byte| byte == 0)
        .map_or(payload.len(), |offset| offset + 1);
    let caps_at = nul_at + 1 + SKIP_AFTER_VERSION;

    let Some(caps) = payload.get_mut(caps_at..caps_at + 2) else {
        return;
    };
    let cleared = u16::from_le_bytes([caps[0], caps[1]]) & !SSL_BIT;
    caps.copy_from_slice(&cleared.to_le_bytes());
}
583
/// Resolve which process owns a local TCP port (the client's ephemeral port).
///
/// Runs `lsof` on macOS and `ss` on every other target, and returns the first
/// match formatted as `"[pid] command"`. Returns `None` if the tool cannot be
/// run or its output contains no usable entry.
///
/// This waits for an external command to finish, so it blocks the calling
/// thread.
fn resolve_peer_process(port: u16) -> Option<String> {
    use std::process::Command;

    /// Picks the first process in `lsof -F` field output (`p<pid>` / `c<command>`
    /// lines) whose PID is not `own_pid`.
    fn parse_lsof(text: &str, own_pid: &str) -> Option<String> {
        let describe = |pid: &str, cmd: &str| {
            (!pid.is_empty() && pid != own_pid).then(|| format!("[{}] {}", pid, cmd))
        };
        let mut pid = "";
        let mut cmd = "";
        for line in text.lines() {
            if let Some(next_pid) = line.strip_prefix('p') {
                // A new process entry starts: the previous one is final now.
                if let Some(found) = describe(pid, cmd) {
                    return Some(found);
                }
                pid = next_pid;
                cmd = "";
            } else if let Some(next_cmd) = line.strip_prefix('c') {
                cmd = next_cmd;
            }
        }
        describe(pid, cmd)
    }

    /// Extracts name and PID from the first `users:(("name",pid=1234,fd=5))`
    /// column found in `ss -p` output. The PID is rendered as `?` when no
    /// digits follow `pid=`.
    fn parse_ss(text: &str) -> Option<String> {
        const MARKER: &str = "users:((\"";
        text.lines().find_map(|line| {
            let rest = &line[line.find(MARKER)? + MARKER.len()..];
            let proc_name = &rest[..rest.find('"')?];
            let pid = rest
                .find("pid=")
                .and_then(|i| rest[i + 4..].split(|c: char| !c.is_ascii_digit()).next())
                // `split` always yields a first item, possibly empty.
                .filter(|digits| !digits.is_empty())
                .unwrap_or("?");
            Some(format!("[{}] {}", pid, proc_name))
        })
    }

    if cfg!(target_os = "macos") {
        // lsof lists both ends of the connection, including the proxy's own
        // accepted socket, so the proxy's PID has to be skipped.
        let filter = format!("tcp:{}", port);
        let output = Command::new("lsof")
            .args(["-i", filter.as_str(), "-sTCP:ESTABLISHED", "-Fp", "-Fc"])
            .output()
            .ok()?;
        let my_pid = std::process::id().to_string();
        parse_lsof(&String::from_utf8_lossy(&output.stdout), &my_pid)
    } else {
        // Linux: ss -tnp sport = :PORT
        let filter = format!("sport = :{}", port);
        let output = Command::new("ss")
            .args(["-tnp", filter.as_str()])
            .output()
            .ok()?;
        parse_ss(&String::from_utf8_lossy(&output.stdout))
    }
}
640
641/// TLS certificate verifier that accepts any certificate (for proxying to known backends).
642#[derive(Debug)]
643struct NoVerify;
644
645impl rustls::client::danger::ServerCertVerifier for NoVerify {
646    fn verify_server_cert(
647        &self, _: &rustls::pki_types::CertificateDer<'_>, _: &[rustls::pki_types::CertificateDer<'_>],
648        _: &rustls::pki_types::ServerName<'_>, _: &[u8], _: rustls::pki_types::UnixTime,
649    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
650        Ok(rustls::client::danger::ServerCertVerified::assertion())
651    }
652    fn verify_tls12_signature(
653        &self, _: &[u8], _: &rustls::pki_types::CertificateDer<'_>, _: &rustls::DigitallySignedStruct,
654    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
655        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
656    }
657    fn verify_tls13_signature(
658        &self, _: &[u8], _: &rustls::pki_types::CertificateDer<'_>, _: &rustls::DigitallySignedStruct,
659    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
660        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
661    }
662    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
663        vec![
664            rustls::SignatureScheme::RSA_PKCS1_SHA256,
665            rustls::SignatureScheme::RSA_PKCS1_SHA384,
666            rustls::SignatureScheme::RSA_PKCS1_SHA512,
667            rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
668            rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
669            rustls::SignatureScheme::RSA_PSS_SHA256,
670            rustls::SignatureScheme::RSA_PSS_SHA384,
671            rustls::SignatureScheme::RSA_PSS_SHA512,
672            rustls::SignatureScheme::ED25519,
673        ]
674    }
675}