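//! ocular_proxy/lib.rs
//!
//! Protocol-aware TCP proxy: accepts local connections, relays them to a remote
//! service, and broadcasts a `ProxyEvent` for each request/response it can parse.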

1use anyhow::Result;
2use ocular_protocol::{Protocol, parse_request, parse_response, extract_full_command, format_response_detail, parse_amqp_frame, parse_amqp_request_full, is_async_method, amqp_frame_len};
3use std::sync::Arc;
4use std::time::{Instant, SystemTime};
5use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncRead, AsyncWrite};
6use tokio::net::{TcpListener, TcpStream};
7use tokio::sync::{broadcast, Mutex};
8use tracing::{info, warn, error, debug};
9
10pub use ocular_protocol::ProxyEvent;
11
/// A request that has been forwarded to the remote and is waiting to be paired
/// with the next response seen on the same connection.
struct PendingRequest {
    /// Short, display-oriented form of the request; copied into `ProxyEvent::command`.
    command: String,
    /// Detailed form of the request; copied into `ProxyEvent::full_command`.
    full_command: String,
    /// Wall-clock time the request was observed; becomes the event timestamp.
    timestamp: SystemTime,
    /// Monotonic time the request was observed; used to compute the latency.
    instant: Instant,
}
19
20pub async fn run_proxy(
21    listen_addr: String,
22    remote_addr: String,
23    name: String,
24    protocol: Protocol,
25    tx: broadcast::Sender<ProxyEvent>,
26    mut shutdown: tokio::sync::watch::Receiver<bool>,
27) -> Result<()> {
28    let listener = TcpListener::bind(&listen_addr).await?;
29    info!(component = %name, listen = %listen_addr, remote = %remote_addr, ?protocol, "proxy listening");
30
31    loop {
32        tokio::select! {
33            result = listener.accept() => {
34                let (client, peer) = result?;
35                debug!(component = %name, peer = %peer, "new client connection");
36                let remote = remote_addr.clone();
37                let name = name.clone();
38                let tx = tx.clone();
39                let process = resolve_peer_process(peer.port());
40                let peer_addr = peer.to_string();
41                let remote_for_conn = remote.clone();
42                tokio::spawn(async move {
43                    if let Err(e) = handle_conn(client, &remote, &name, protocol, &tx, process, peer_addr, remote_for_conn).await {
44                        warn!(component = %name, remote = %remote, error = %e, "connection ended with error");
45                    }
46                });
47            }
48            _ = shutdown.changed() => {
49                info!(component = %name, "proxy shutting down");
50                break;
51            }
52        }
53    }
54    Ok(())
55}
56
57#[allow(clippy::too_many_arguments)]
58async fn handle_conn(
59    mut client: TcpStream,
60    remote_addr: &str,
61    name: &str,
62    protocol: Protocol,
63    tx: &broadcast::Sender<ProxyEvent>,
64    process: Option<String>,
65    src: String,
66    dest: String,
67) -> Result<()> {
68    // Parse remote address: detect https:// for TLS outbound
69    let (actual_addr, use_tls, tls_host) = if remote_addr.starts_with("https://") {
70        let stripped = remote_addr.strip_prefix("https://").unwrap();
71        let host = stripped.split(':').next().unwrap_or(stripped).to_string();
72        (stripped.to_string(), true, host)
73    } else {
74        let stripped = remote_addr.strip_prefix("http://").unwrap_or(remote_addr);
75        (stripped.to_string(), false, String::new())
76    };
77
78    let tcp_stream = match TcpStream::connect(&actual_addr).await {
79        Ok(s) => {
80            debug!(component = %name, remote = %actual_addr, "connected to remote");
81            s
82        }
83        Err(e) => {
84            error!(component = %name, remote = %actual_addr, error = %e,
85                "failed to connect to remote — is the service running?");
86            if protocol == Protocol::Redis {
87                let err_msg = format!("-ERR ocular proxy: cannot reach {} ({})\r\n", actual_addr, e);
88                let _ = client.write_all(err_msg.as_bytes()).await;
89            }
90            return Err(e.into());
91        }
92    };
93
94    let (sr, sw): (Box<dyn AsyncRead + Unpin + Send>, Box<dyn AsyncWrite + Unpin + Send>) = if use_tls {
95        let config = rustls::ClientConfig::builder()
96            .dangerous()
97            .with_custom_certificate_verifier(Arc::new(NoVerify))
98            .with_no_client_auth();
99        let connector = tokio_rustls::TlsConnector::from(Arc::new(config));
100        let domain = rustls::pki_types::ServerName::try_from(tls_host)
101            .map_err(|e| anyhow::anyhow!("invalid TLS hostname: {}", e))?;
102        let tls_stream = connector.connect(domain, tcp_stream).await?;
103        let (r, w) = tokio::io::split(tls_stream);
104        (Box::new(r) as Box<dyn AsyncRead + Unpin + Send>, Box::new(w) as Box<dyn AsyncWrite + Unpin + Send>)
105    } else {
106        let (r, w) = tokio::io::split(tcp_stream);
107        (Box::new(r) as Box<dyn AsyncRead + Unpin + Send>, Box::new(w) as Box<dyn AsyncWrite + Unpin + Send>)
108    };
109
110    let mut sr = sr;
111    let mut sw = sw;
112
113    // For MySQL: strip SSL from greeting
114    if protocol == Protocol::Mysql {
115        let mut greeting_buf = [0u8; 65536];
116        let n = sr.read(&mut greeting_buf).await?;
117        if n == 0 { return Ok(()); }
118        let mut greeting = greeting_buf[..n].to_vec();
119        strip_mysql_ssl_flag(&mut greeting);
120        client.write_all(&greeting).await?;
121        debug!(component = %name, "forwarded MySQL greeting with SSL stripped");
122    }
123
124    // For PostgreSQL: handle SSL negotiation before normal flow
125    if protocol == Protocol::Postgres {
126        let mut buf = [0u8; 256];
127        let n = client.read(&mut buf).await?;
128        if n == 0 { return Ok(()); }
129        let data = &buf[..n];
130        // Forward SSLRequest to server
131        sw.write_all(data).await?;
132        // Read server's SSL response (single byte N or S)
133        let mut resp = [0u8; 1];
134        let rn = sr.read(&mut resp).await?;
135        if rn == 0 { return Ok(()); }
136        // Forward to client
137        client.write_all(&resp[..rn]).await?;
138        // Emit SSLRequest event
139        let command = parse_request(protocol, data).unwrap_or_else(|| "SSLRequest".into());
140        let response = if resp[0] == b'N' { "SSLResponse: No" } else { "SSLResponse: Yes" };
141        let _ = tx.send(ProxyEvent {
142            timestamp: SystemTime::now(),
143            component: name.to_string(),
144            protocol,
145            command: command.clone(),
146            full_command: command,
147            response: response.into(),
148            response_detail: response.into(),
149            latency: std::time::Duration::ZERO,
150            process: process.clone(),
151            src: Some(src.clone()),
152            dest: Some(dest.clone()),
153        });
154        // If server said 'S' (SSL), we'd need to upgrade — but we don't support that
155        // Most local setups respond 'N'
156    }
157
158    let (mut cr, mut cw) = client.split();
159
160    let pending: Arc<Mutex<Option<PendingRequest>>> = Arc::new(Mutex::new(None));
161
162    let name_req = name.to_string();
163    let name_resp = name.to_string();
164    let tx_req = tx.clone();
165    let tx_resp = tx.clone();
166    let pending_w = pending.clone();
167    let pending_r = pending;
168    let process_info = process;
169
170    let process_req = process_info.clone();
171    let src_req = src.clone();
172    let dest_req = dest.clone();
173    let src_resp = src.clone();
174    let dest_resp = dest;
175    let client_to_server = async move {
176        let mut buf = [0u8; 65536];
177        let mut http_req_buf: Vec<u8> = Vec::new();
178        let mut memcached_req_buf: Vec<u8> = Vec::new();
179        let mut kafka_req_buf: Vec<u8> = Vec::new();
180        loop {
181            let n = cr.read(&mut buf).await?;
182            if n == 0 { break; }
183            let data = &buf[..n];
184
185            if protocol == Protocol::Amqp {
186                // AMQP: loop through all frames in this read
187                let mut pos = 0;
188                while pos < data.len() {
189                    let frame_data = &data[pos..];
190                    let Some(flen) = amqp_frame_len(frame_data) else { break };
191                    if let Some(frame) = parse_amqp_frame(frame_data) {
192                        // Skip heartbeat — not a real request
193                        if frame.frame_type == 8 {
194                            pos += flen;
195                            continue;
196                        }
197                        if let Some(ref method) = frame.method {
198                            if is_async_method(method.class_id, method.method_id) {
199                                let (summary, detail) = parse_amqp_request_full(frame_data)
200                                    .unwrap_or_else(|| (method.summary.clone(), method.detail.clone()));
201                                let _ = tx_req.send(ProxyEvent {
202                                    timestamp: SystemTime::now(),
203                                    component: name_req.clone(),
204                                    protocol,
205                                    command: summary,
206                                    full_command: detail.clone(),
207                                    response: String::new(),
208                                    response_detail: detail,
209                                    latency: std::time::Duration::ZERO,
210                                    process: process_req.clone(),
211                                    src: Some(src_req.clone()),
212                                    dest: Some(dest_req.clone()),
213                                });
214                            } else {
215                                debug!(component = %name_req, command = %method.summary);
216                                *pending_w.lock().await = Some(PendingRequest {
217                                    timestamp: SystemTime::now(),
218                                    instant: Instant::now(),
219                                    command: method.summary.clone(),
220                                    full_command: method.detail.clone(),
221                                });
222                            }
223                        }
224                    }
225                    pos += flen;
226                }
227            } else if protocol == Protocol::Http {
228                http_req_buf.extend_from_slice(data);
229                if ocular_protocol::http::http_request_complete(&http_req_buf) {
230                    if let Some(command) = parse_request(protocol, &http_req_buf) {
231                        let full_command = extract_full_command(protocol, &http_req_buf).unwrap_or_else(|| command.clone());
232                        *pending_w.lock().await = Some(PendingRequest {
233                            timestamp: SystemTime::now(),
234                            instant: Instant::now(),
235                            command,
236                            full_command,
237                        });
238                    }
239                    http_req_buf.clear();
240                }
241            } else if protocol == Protocol::Memcached {
242                memcached_req_buf.extend_from_slice(data);
243                if ocular_protocol::memcached::memcached_request_complete(&memcached_req_buf) {
244                    if let Some(command) = parse_request(protocol, &memcached_req_buf) {
245                        let full_command = extract_full_command(protocol, &memcached_req_buf).unwrap_or_else(|| command.clone());
246                        *pending_w.lock().await = Some(PendingRequest {
247                            timestamp: SystemTime::now(),
248                            instant: Instant::now(),
249                            command,
250                            full_command,
251                        });
252                    }
253                    memcached_req_buf.clear();
254                }
255            } else if protocol == Protocol::Kafka {
256                kafka_req_buf.extend_from_slice(data);
257                while ocular_protocol::kafka::kafka_frame_complete(&kafka_req_buf) {
258                    let frame_len = i32::from_be_bytes([kafka_req_buf[0], kafka_req_buf[1], kafka_req_buf[2], kafka_req_buf[3]]) as usize + 4;
259                    let frame = &kafka_req_buf[..frame_len];
260                    if let Some(command) = parse_request(protocol, frame) {
261                        let full_command = extract_full_command(protocol, frame).unwrap_or_else(|| command.clone());
262                        *pending_w.lock().await = Some(PendingRequest {
263                            timestamp: SystemTime::now(),
264                            instant: Instant::now(),
265                            command,
266                            full_command,
267                        });
268                    }
269                    kafka_req_buf = kafka_req_buf[frame_len..].to_vec();
270                }
271            } else if let Some(command) = parse_request(protocol, data) {
272                let full_command = extract_full_command(protocol, data).unwrap_or_else(|| command.clone());
273                debug!(component = %name_req, %command);
274                *pending_w.lock().await = Some(PendingRequest {
275                    timestamp: SystemTime::now(),
276                    instant: Instant::now(),
277                    command,
278                    full_command,
279                });
280            } else if protocol == Protocol::Postgres && n > 0 {
281                info!(component = %name_req, bytes = n, first_byte = format!("0x{:02x}", data[0]), "pg client→server UNPARSED");
282            }
283
284            sw.write_all(data).await?;
285        }
286        Ok::<_, anyhow::Error>(())
287    };
288
289    let process_mysql = process_info.clone();
290    let server_to_client = async move {
291        let mut buf = [0u8; 65536];
292        let mut mysql_buf: Vec<u8> = Vec::new();
293        let mut http_resp_buf: Vec<u8> = Vec::new();
294        let mut memcached_resp_buf: Vec<u8> = Vec::new();
295        let mut kafka_resp_buf: Vec<u8> = Vec::new();
296        let mut awaiting_response = false;
297        let mut memcached_awaiting = false;
298        loop {
299            let n = sr.read(&mut buf).await?;
300            if n == 0 { break; }
301            let data = &buf[..n];
302            cw.write_all(data).await?;
303
304            if protocol == Protocol::Mysql {
305                let has_pending = pending_r.lock().await.is_some();
306                if has_pending || awaiting_response {
307                    awaiting_response = true;
308                    mysql_buf.extend_from_slice(data);
309                    if mysql_response_complete(&mysql_buf) {
310                        if let Some(req) = pending_r.lock().await.take() {
311                            let latency = req.instant.elapsed();
312                            let response = parse_response(protocol, &mysql_buf).unwrap_or_default();
313                            let response_detail = format_response_detail(protocol, &mysql_buf).unwrap_or_default();
314                            let _ = tx_resp.send(ProxyEvent {
315                                timestamp: req.timestamp,
316                                component: name_resp.clone(),
317                                protocol,
318                                command: req.command,
319                                full_command: req.full_command,
320                                response,
321                                response_detail,
322                                latency,
323                                process: process_mysql.clone(),
324                                src: Some(src_resp.clone()),
325                                dest: Some(dest_resp.clone()),
326                            });
327                        }
328                        mysql_buf.clear();
329                        awaiting_response = false;
330                    }
331                }
332            } else if protocol == Protocol::Http {
333                http_resp_buf.extend_from_slice(data);
334                if ocular_protocol::http::http_response_complete(&http_resp_buf) {
335                    if let Some(req) = pending_r.lock().await.take() {
336                        let latency = req.instant.elapsed();
337                        let response = parse_response(protocol, &http_resp_buf).unwrap_or_default();
338                        let response_detail = format_response_detail(protocol, &http_resp_buf).unwrap_or_else(|| response.clone());
339                        let _ = tx_resp.send(ProxyEvent {
340                            timestamp: req.timestamp,
341                            component: name_resp.clone(),
342                            protocol,
343                            command: req.command,
344                            full_command: req.full_command,
345                            response,
346                            response_detail,
347                            latency,
348                            process: process_info.clone(),
349                                    src: Some(src_resp.clone()),
350                                    dest: Some(dest_resp.clone()),
351                        });
352                    }
353                    http_resp_buf.clear();
354                }
355            } else if protocol == Protocol::Amqp {
356                // AMQP: loop through all server frames
357                let mut pos = 0;
358                while pos < data.len() {
359                    let frame_data = &data[pos..];
360                    let Some(flen) = amqp_frame_len(frame_data) else { break };
361                    if let Some(frame) = parse_amqp_frame(frame_data) {
362                        // Skip content header and body frames — handled below with method
363                        if frame.frame_type == 2 || frame.frame_type == 3 {
364                            pos += flen;
365                            continue;
366                        }
367                        // Heartbeat: skip
368                        if frame.frame_type == 8 {
369                            pos += flen;
370                            continue;
371                        }
372                    }
373
374                    // Extract body from subsequent Header+Body frames
375                    let mut body_text = String::new();
376                    let mut peek = pos + flen;
377                    while peek < data.len() {
378                        let peek_data = &data[peek..];
379                        let Some(plen) = amqp_frame_len(peek_data) else { break };
380                        if let Some(pf) = parse_amqp_frame(peek_data) {
381                            if pf.frame_type == 2 {
382                                // Header frame, skip
383                            } else if pf.frame_type == 3 {
384                                // Body frame
385                                if let Some(body) = &pf.body {
386                                    body_text = String::from_utf8_lossy(body).to_string();
387                                }
388                            } else {
389                                break;
390                            }
391                        } else {
392                            break;
393                        }
394                        peek += plen;
395                    }
396
397                    if let Some(req) = pending_r.lock().await.take() {
398                        let latency = req.instant.elapsed();
399                        let mut response = parse_response(protocol, frame_data).unwrap_or_default();
400                        let mut response_detail = format_response_detail(protocol, frame_data).unwrap_or_else(|| response.clone());
401                        if !body_text.is_empty() {
402                            response = format!("{} | {}", response, body_text);
403                            response_detail = format!("{}\nBody: {}", response_detail, body_text);
404                        }
405                        let _ = tx_resp.send(ProxyEvent {
406                            timestamp: req.timestamp,
407                            component: name_resp.clone(),
408                            protocol,
409                            command: req.command,
410                            full_command: req.full_command,
411                            response,
412                            response_detail,
413                            latency,
414                            process: process_info.clone(),
415                                    src: Some(src_resp.clone()),
416                                    dest: Some(dest_resp.clone()),
417                        });
418                    } else if let Some(frame) = parse_amqp_frame(frame_data) {
419                        // Server-initiated method (e.g. Basic.Deliver) — emit as standalone
420                        if let Some(ref method) = frame.method {
421                            let response = if body_text.is_empty() { String::new() } else { body_text.clone() };
422                            let response_detail = if body_text.is_empty() { String::new() } else { body_text.clone() };
423                            let command = method.summary.clone();
424                            let _ = tx_resp.send(ProxyEvent {
425                                timestamp: SystemTime::now(),
426                                component: name_resp.clone(),
427                                protocol,
428                                command,
429                                full_command: method.detail.clone(),
430                                response,
431                                response_detail,
432                                latency: std::time::Duration::ZERO,
433                                process: process_info.clone(),
434                                    src: Some(dest_resp.clone()),
435                                    dest: Some(src_resp.clone()),
436                            });
437                        }
438                    }
439                    // Advance past the method frame + any header/body frames we consumed
440                    pos = peek;
441                }
442            } else if protocol == Protocol::Postgres {
443                // Postgres: only pair with meaningful responses, skip setup noise
444                let first = data[0];
445                info!(component = %name_resp, bytes = n, first_byte = format!("0x{:02x}", first),
446                    hex_head = format!("{:02x?}", &data[..n.min(20)]), "pg server→client");
447                // Use parse_postgres_response which scans all messages and prioritizes errors
448                let is_meaningful = matches!(first, b'C' | b'E' | b'T' | b'Z' | b'I' | b'D' | b'R');
449                if is_meaningful {
450                    if let Some(req) = pending_r.lock().await.take() {
451                        let latency = req.instant.elapsed();
452                        let response = parse_response(protocol, data).unwrap_or_default();
453                        let response_detail = format_response_detail(protocol, data).unwrap_or_else(|| response.clone());
454                        let _ = tx_resp.send(ProxyEvent {
455                            timestamp: req.timestamp,
456                            component: name_resp.clone(),
457                            protocol,
458                            command: req.command,
459                            full_command: req.full_command,
460                            response,
461                            response_detail,
462                            latency,
463                            process: process_info.clone(),
464                                    src: Some(src_resp.clone()),
465                                    dest: Some(dest_resp.clone()),
466                        });
467                    }
468                }
469                // ParameterStatus (S), BackendKeyData (K), etc. are silently skipped
470            } else if protocol == Protocol::Memcached {
471                let has_pending = pending_r.lock().await.is_some();
472                if has_pending || memcached_awaiting {
473                    memcached_awaiting = true;
474                    memcached_resp_buf.extend_from_slice(data);
475                    if ocular_protocol::memcached::memcached_response_complete(&memcached_resp_buf) {
476                        if let Some(req) = pending_r.lock().await.take() {
477                            let latency = req.instant.elapsed();
478                            let response = parse_response(protocol, &memcached_resp_buf).unwrap_or_default();
479                            let response_detail = format_response_detail(protocol, &memcached_resp_buf).unwrap_or_else(|| response.clone());
480                            let _ = tx_resp.send(ProxyEvent {
481                                timestamp: req.timestamp,
482                                component: name_resp.clone(),
483                                protocol,
484                                command: req.command,
485                                full_command: req.full_command,
486                                response,
487                                response_detail,
488                                latency,
489                                process: process_info.clone(),
490                                src: Some(src_resp.clone()),
491                                dest: Some(dest_resp.clone()),
492                            });
493                        }
494                        memcached_resp_buf.clear();
495                        memcached_awaiting = false;
496                    }
497                }
498            } else if protocol == Protocol::Kafka {
499                kafka_resp_buf.extend_from_slice(data);
500                while ocular_protocol::kafka::kafka_frame_complete(&kafka_resp_buf) {
501                    let frame_len = i32::from_be_bytes([kafka_resp_buf[0], kafka_resp_buf[1], kafka_resp_buf[2], kafka_resp_buf[3]]) as usize + 4;
502                    if let Some(req) = pending_r.lock().await.take() {
503                        let latency = req.instant.elapsed();
504                        let response = parse_response(protocol, &kafka_resp_buf[..frame_len]).unwrap_or_default();
505                        let response_detail = format_response_detail(protocol, &kafka_resp_buf[..frame_len]).unwrap_or_else(|| response.clone());
506                        let _ = tx_resp.send(ProxyEvent {
507                            timestamp: req.timestamp,
508                            component: name_resp.clone(),
509                            protocol,
510                            command: req.command,
511                            full_command: req.full_command,
512                            response,
513                            response_detail,
514                            latency,
515                            process: process_info.clone(),
516                            src: Some(src_resp.clone()),
517                            dest: Some(dest_resp.clone()),
518                        });
519                    }
520                    kafka_resp_buf = kafka_resp_buf[frame_len..].to_vec();
521                }
522            } else {
523                // Redis/MongoDB: single request/response per read
524                if let Some(req) = pending_r.lock().await.take() {
525                    let latency = req.instant.elapsed();
526                    let response = parse_response(protocol, data).unwrap_or_default();
527                    let response_detail = format_response_detail(protocol, data).unwrap_or_else(|| response.clone());
528                    let _ = tx_resp.send(ProxyEvent {
529                        timestamp: req.timestamp,
530                        component: name_resp.clone(),
531                        protocol,
532                        command: req.command,
533                        full_command: req.full_command,
534                        response,
535                        response_detail,
536                        latency,
537                        process: process_info.clone(),
538                        src: Some(src_resp.clone()),
539                        dest: Some(dest_resp.clone()),
540                    });
541                }
542            }
543        }
544        Ok::<_, anyhow::Error>(())
545    };
546
547    tokio::select! {
548        r = client_to_server => r?,
549        r = server_to_client => r?,
550    }
551    Ok(())
552}
553
/// Heuristically decides whether `buf` holds a complete MySQL server response.
///
/// `buf` is read as a sequence of packets, each a 3-byte little-endian payload
/// length plus one more header byte, followed by the payload.
///
/// * Fewer than 5 bytes: never complete.
/// * First payload byte `0x00` or `0xff`: complete immediately.
/// * Otherwise every fully buffered packet is walked and the last non-empty one
///   decides: it must start with `0xfe` and be shorter than 9 bytes, or start
///   with `0x00`, be shorter than 16 bytes and end exactly at the end of `buf`.
fn mysql_response_complete(buf: &[u8]) -> bool {
    const HEADER_LEN: usize = 4;

    let Some(&leading) = buf.get(HEADER_LEN) else {
        return false;
    };
    if matches!(leading, 0x00 | 0xff) {
        return true;
    }

    // (first payload byte, payload length) of the last non-empty complete packet.
    let mut tail: (u8, usize) = (0, 0);
    let mut cursor = 0usize;
    while let Some(header) = buf.get(cursor..cursor + HEADER_LEN) {
        let payload_len = usize::from(header[0])
            | (usize::from(header[1]) << 8)
            | (usize::from(header[2]) << 16);
        let next = cursor + HEADER_LEN + payload_len;
        if next > buf.len() {
            // Trailing packet is only partially buffered.
            break;
        }
        if payload_len > 0 {
            tail = (buf[cursor + HEADER_LEN], payload_len);
        }
        cursor = next;
    }

    match tail {
        (0xfe, len) if len < 9 => true,
        (0x00, len) => len < 16 && cursor == buf.len(),
        _ => false,
    }
}
576
/// Clears the SSL capability bit (`0x0800`) in a MySQL protocol-10 server greeting.
///
/// `packet` is the raw greeting including its 4-byte packet header. The lower
/// capability word is located by skipping the protocol-version byte, the
/// NUL-terminated server version string and 13 further fixed bytes (4 + 8 + 1).
/// The packet is left untouched if it is too short, is not protocol version 10,
/// or ends before the capability word.
fn strip_mysql_ssl_flag(packet: &mut [u8]) {
    const CLIENT_SSL: u16 = 0x0800;
    // NUL terminator + 4 + 8 + 1 bytes between the version string and the flags.
    const FIXED_FIELDS: usize = 1 + 4 + 8 + 1;

    let Some(payload) = packet.get_mut(4..) else {
        return;
    };
    if payload.first() != Some(&10) {
        return;
    }

    // Index of the NUL ending the version string (or the payload end if absent).
    let version_end = payload[1..]
        .iter()
        .position(|&byte| byte == 0)
        .map_or(payload.len(), |offset| offset + 1);

    let caps_at = version_end + FIXED_FIELDS;
    let Some(caps) = payload.get_mut(caps_at..caps_at + 2) else {
        return;
    };
    let cleared = u16::from_le_bytes([caps[0], caps[1]]) & !CLIENT_SSL;
    caps.copy_from_slice(&cleared.to_le_bytes());
}
593
/// Looks up the process that owns the local TCP socket bound to `port`
/// (the client's ephemeral port) and returns it formatted as `"[pid] name"`.
///
/// On macOS this runs `lsof` and returns the first listed process that is not
/// this proxy itself; on every other target it runs `ss` and parses the first
/// `users:(("name",pid=…` entry. Returns `None` when the tool cannot be run or
/// no matching entry is found.
///
/// This spawns a subprocess and waits for it to finish.
fn resolve_peer_process(port: u16) -> Option<String> {
    use std::process::Command;

    if cfg!(target_os = "macos") {
        // lsof -i tcp:PORT -sTCP:ESTABLISHED -Fp -Fc
        // Field output: a `p<pid>` line opens an entry, `c<command>` names it.
        let own_pid = std::process::id().to_string();
        let selector = format!("tcp:{}", port);
        let out = Command::new("lsof")
            .args(["-i", selector.as_str(), "-sTCP:ESTABLISHED", "-Fp", "-Fc"])
            .output()
            .ok()?;
        let listing = String::from_utf8_lossy(&out.stdout);

        let mut entries: Vec<(&str, &str)> = Vec::new();
        for line in listing.lines() {
            if let Some(pid) = line.strip_prefix('p') {
                entries.push((pid, ""));
            } else if let Some(cmd) = line.strip_prefix('c') {
                if let Some(entry) = entries.last_mut() {
                    entry.1 = cmd;
                }
            }
        }

        // Both ends of the connection are listed; skip our own PID.
        entries
            .into_iter()
            .find(|(pid, _)| !pid.is_empty() && *pid != own_pid.as_str())
            .map(|(pid, cmd)| format!("[{}] {}", pid, cmd))
    } else {
        // Linux: ss -tnp sport = :PORT
        let filter = format!("sport = :{}", port);
        let out = Command::new("ss")
            .args(["-tnp", filter.as_str()])
            .output()
            .ok()?;
        let listing = String::from_utf8_lossy(&out.stdout);

        // Parse: users:(("process_name",pid=1234,fd=5))
        listing.lines().find_map(|line| {
            let (_, after) = line.split_once("users:((\"")?;
            let name_end = after.find('"')?;
            let proc_name = &after[..name_end];
            let pid = after
                .split_once("pid=")
                .map(|(_, tail)| {
                    let digits_end = tail
                        .find(|c: char| !c.is_ascii_digit())
                        .unwrap_or(tail.len());
                    &tail[..digits_end]
                })
                .unwrap_or("?");
            Some(format!("[{}] {}", pid, proc_name))
        })
    }
}
650
651/// TLS certificate verifier that accepts any certificate (for proxying to known backends).
652#[derive(Debug)]
653struct NoVerify;
654
655impl rustls::client::danger::ServerCertVerifier for NoVerify {
656    fn verify_server_cert(
657        &self, _: &rustls::pki_types::CertificateDer<'_>, _: &[rustls::pki_types::CertificateDer<'_>],
658        _: &rustls::pki_types::ServerName<'_>, _: &[u8], _: rustls::pki_types::UnixTime,
659    ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
660        Ok(rustls::client::danger::ServerCertVerified::assertion())
661    }
662    fn verify_tls12_signature(
663        &self, _: &[u8], _: &rustls::pki_types::CertificateDer<'_>, _: &rustls::DigitallySignedStruct,
664    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
665        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
666    }
667    fn verify_tls13_signature(
668        &self, _: &[u8], _: &rustls::pki_types::CertificateDer<'_>, _: &rustls::DigitallySignedStruct,
669    ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
670        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
671    }
672    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
673        vec![
674            rustls::SignatureScheme::RSA_PKCS1_SHA256,
675            rustls::SignatureScheme::RSA_PKCS1_SHA384,
676            rustls::SignatureScheme::RSA_PKCS1_SHA512,
677            rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
678            rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
679            rustls::SignatureScheme::RSA_PSS_SHA256,
680            rustls::SignatureScheme::RSA_PSS_SHA384,
681            rustls::SignatureScheme::RSA_PSS_SHA512,
682            rustls::SignatureScheme::ED25519,
683        ]
684    }
685}