Skip to main content

ftui_pty/
ws_bridge.rs

1//! WebSocket-to-PTY bridge for FrankenTerm remote sessions.
2//!
3//! This module provides a small, deterministic server that:
4//! - accepts a websocket client,
5//! - spawns a PTY child process,
6//! - forwards websocket binary input to the PTY,
7//! - forwards PTY output back to websocket binary frames,
8//! - supports resize control messages over websocket text frames, and
9//! - emits JSONL telemetry for session/debug analysis.
10
11use std::collections::VecDeque;
12use std::fs::{File, OpenOptions};
13use std::io::{self, Read, Write};
14use std::net::{SocketAddr, TcpListener, TcpStream};
15use std::path::{Path, PathBuf};
16use std::sync::mpsc;
17use std::thread;
18use std::time::{Duration, Instant};
19
20use frankenterm_core::flow_control::{
21    FlowControlConfig, FlowControlPolicy, FlowControlSnapshot, InputEventClass, LatencyWindowMs,
22    QueueDepthBytes, RateWindowBps,
23};
24use portable_pty::{Child, CommandBuilder, ExitStatus, MasterPty, PtySize};
25use serde_json::{Value, json};
26use time::OffsetDateTime;
27use time::format_description::well_known::Rfc3339;
28use tungstenite::handshake::server::{ErrorResponse, Request, Response};
29use tungstenite::http::StatusCode;
30use tungstenite::protocol::WebSocketConfig;
31use tungstenite::{Error as WsError, Message, WebSocket, accept_hdr_with_config};
32
33/// Runtime configuration for the websocket PTY bridge.
34#[derive(Debug, Clone)]
35pub struct WsPtyBridgeConfig {
36    /// Address to bind the websocket server to.
37    pub bind_addr: SocketAddr,
38    /// Executable to spawn in the PTY.
39    pub command: String,
40    /// Command arguments.
41    pub args: Vec<String>,
42    /// TERM value exported to the child process.
43    pub term: String,
44    /// Extra child environment variables.
45    pub env: Vec<(String, String)>,
46    /// Initial PTY columns.
47    pub cols: u16,
48    /// Initial PTY rows.
49    pub rows: u16,
50    /// Allowlist for `Origin` headers. Empty means allow all.
51    pub allowed_origins: Vec<String>,
52    /// Optional shared secret expected as query parameter `token`.
53    pub auth_token: Option<String>,
54    /// Optional JSONL telemetry file path.
55    pub telemetry_path: Option<PathBuf>,
56    /// Max websocket message/frame size.
57    pub max_message_bytes: usize,
58    /// Loop sleep duration when idle.
59    pub idle_sleep: Duration,
60    /// Stop after one session if true.
61    pub accept_once: bool,
62    /// Optional flow control configuration. When `Some`, the bridge tracks
63    /// credit windows, bounded output queues, resize coalescing, and
64    /// backpressure decisions. When `None`, raw passthrough (legacy behavior).
65    pub flow_control: Option<FlowControlBridgeConfig>,
66}
67
68impl Default for WsPtyBridgeConfig {
69    fn default() -> Self {
70        let command = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string());
71        Self {
72            bind_addr: SocketAddr::from(([127, 0, 0, 1], 9231)),
73            command,
74            args: Vec::new(),
75            term: "xterm-256color".to_string(),
76            env: Vec::new(),
77            cols: 120,
78            rows: 40,
79            allowed_origins: Vec::new(),
80            auth_token: None,
81            telemetry_path: None,
82            max_message_bytes: 256 * 1024,
83            idle_sleep: Duration::from_millis(5),
84            accept_once: true,
85            flow_control: None,
86        }
87    }
88}
89
90/// Session summary emitted when a bridge session ends.
91#[derive(Debug, Clone, PartialEq, Eq)]
92pub struct BridgeSummary {
93    /// Session id used in telemetry.
94    pub session_id: String,
95    /// Total websocket inbound bytes.
96    pub ws_in_bytes: u64,
97    /// Total websocket outbound bytes.
98    pub ws_out_bytes: u64,
99    /// Total bytes written into PTY stdin.
100    pub pty_in_bytes: u64,
101    /// Total bytes read from PTY stdout/stderr.
102    pub pty_out_bytes: u64,
103    /// Number of resize operations processed.
104    pub resize_events: u64,
105    /// Exit code if the child terminated during session.
106    pub exit_code: Option<u32>,
107    /// Exit signal (platform-dependent text) if available.
108    pub exit_signal: Option<String>,
109}
110
111impl BridgeSummary {
112    #[must_use]
113    fn as_json(&self) -> Value {
114        json!({
115            "session_id": self.session_id,
116            "ws_in_bytes": self.ws_in_bytes,
117            "ws_out_bytes": self.ws_out_bytes,
118            "pty_in_bytes": self.pty_in_bytes,
119            "pty_out_bytes": self.pty_out_bytes,
120            "resize_events": self.resize_events,
121            "exit_code": self.exit_code,
122            "exit_signal": self.exit_signal,
123        })
124    }
125}
126
127/// Run the websocket PTY bridge server.
128///
129/// If `accept_once` is true, this accepts a single client and returns.
130/// If false, the server keeps accepting new sessions until an unrecoverable
131/// listener error occurs.
132pub fn run_ws_pty_bridge(config: WsPtyBridgeConfig) -> io::Result<()> {
133    let listener = TcpListener::bind(config.bind_addr)?;
134
135    loop {
136        let (stream, peer_addr) = listener.accept()?;
137        let session_id = make_session_id();
138        let mut telemetry = TelemetrySink::new(config.telemetry_path.as_deref(), &session_id)?;
139        telemetry.write(
140            "bridge_session_start",
141            json!({
142                "peer": peer_addr.to_string(),
143                "bind_addr": config.bind_addr.to_string(),
144                "command": config.command,
145                "args": config.args,
146                "cols": config.cols,
147                "rows": config.rows,
148                "term": config.term,
149                "max_message_bytes": config.max_message_bytes,
150            }),
151        )?;
152
153        let result = run_single_session(stream, &config, &session_id, &mut telemetry);
154        match result {
155            Ok(summary) => {
156                telemetry.write("bridge_session_end", summary.as_json())?;
157            }
158            Err(error) => {
159                telemetry.write(
160                    "bridge_session_error",
161                    json!({ "error": error.to_string() }),
162                )?;
163                if config.accept_once {
164                    return Err(error);
165                }
166            }
167        }
168
169        if config.accept_once {
170            break;
171        }
172    }
173
174    Ok(())
175}
176
177fn make_summary(
178    session_id: &str,
179    counters: &Counters,
180    exit_code: Option<u32>,
181    exit_signal: Option<String>,
182) -> BridgeSummary {
183    BridgeSummary {
184        session_id: session_id.to_string(),
185        ws_in_bytes: counters.ws_in_bytes,
186        ws_out_bytes: counters.ws_out_bytes,
187        pty_in_bytes: counters.pty_in_bytes,
188        pty_out_bytes: counters.pty_out_bytes,
189        resize_events: counters.resize_events,
190        exit_code,
191        exit_signal,
192    }
193}
194
195fn run_single_session(
196    stream: TcpStream,
197    config: &WsPtyBridgeConfig,
198    session_id: &str,
199    telemetry: &mut TelemetrySink,
200) -> io::Result<BridgeSummary> {
201    stream.set_nodelay(true)?;
202    let mut websocket = accept_websocket(stream, config)?;
203    websocket.get_mut().set_nonblocking(true)?;
204
205    let mut pty = PtyBridgeSession::spawn(config)?;
206    let mut counters = Counters::default();
207    let mut exit_code = None;
208    let mut exit_signal: Option<String> = None;
209
210    let mut fc_state: Option<FlowControlBridgeState> = config
211        .flow_control
212        .as_ref()
213        .map(FlowControlBridgeState::new);
214
215    if let Some(ref fc_config) = config.flow_control {
216        telemetry.write(
217            "flow_control_enabled",
218            json!({
219                "output_window": fc_config.output_window,
220                "input_window": fc_config.input_window,
221                "coalesce_resize_ms": fc_config.coalesce_resize_ms,
222            }),
223        )?;
224    }
225
226    loop {
227        let mut progressed = false;
228
229        // --- WebSocket read loop ---
230        loop {
231            match websocket.read() {
232                Ok(message) => {
233                    progressed = true;
234                    if handle_ws_message(
235                        &mut websocket,
236                        &mut pty,
237                        &mut counters,
238                        telemetry,
239                        message,
240                        &mut fc_state,
241                    )? {
242                        if let Some(ref fc) = fc_state {
243                            telemetry.write("flow_control_summary", fc.summary_json())?;
244                        }
245                        return Ok(make_summary(session_id, &counters, exit_code, exit_signal));
246                    }
247                }
248                Err(WsError::Io(error)) if error.kind() == io::ErrorKind::WouldBlock => break,
249                Err(WsError::ConnectionClosed | WsError::AlreadyClosed) => {
250                    if let Some(ref fc) = fc_state {
251                        telemetry.write("flow_control_summary", fc.summary_json())?;
252                    }
253                    return Ok(make_summary(session_id, &counters, exit_code, exit_signal));
254                }
255                Err(error) => {
256                    return Err(io::Error::other(format!("websocket read failed: {error}")));
257                }
258            }
259        }
260
261        // --- Resize coalescing flush ---
262        if let Some(ref mut fc) = fc_state
263            && let Some((cols, rows)) = fc.flush_pending_resize()
264        {
265            pty.resize(cols, rows)?;
266            counters.resize_events = counters.resize_events.saturating_add(1);
267            telemetry.write(
268                "bridge_resize",
269                json!({ "cols": cols, "rows": rows, "coalesced": true }),
270            )?;
271        }
272
273        // --- PTY output ---
274        let read_pty = fc_state.as_ref().is_none_or(|fc| !fc.pty_reads_paused);
275        if read_pty {
276            let output = pty.drain_output_nonblocking()?;
277            if !output.is_empty() {
278                progressed = true;
279                counters.pty_out_bytes = counters
280                    .pty_out_bytes
281                    .saturating_add(u64::try_from(output.len()).unwrap_or(u64::MAX));
282
283                match fc_state {
284                    Some(ref mut fc) => {
285                        fc.enqueue_output(&output);
286                    }
287                    None => {
288                        counters.ws_out_bytes = counters
289                            .ws_out_bytes
290                            .saturating_add(u64::try_from(output.len()).unwrap_or(u64::MAX));
291                        send_ws_message(&mut websocket, Message::binary(output))?;
292                    }
293                }
294            }
295        }
296
297        // --- Flow control: evaluate policy and drain output queue ---
298        if let Some(ref mut fc) = fc_state {
299            fc.maybe_reset_rate_window();
300            let was_paused = fc.pty_reads_paused;
301            let decision = fc.evaluate();
302
303            // Log non-stable decisions
304            if decision.chosen_action.is_some() {
305                telemetry.write(
306                    "flow_control_decision",
307                    json!({
308                        "action": format!("{:?}", decision.chosen_action),
309                        "reason": format!("{:?}", decision.reason),
310                        "fairness_index": decision.fairness_index,
311                        "output_batch_budget": decision.output_batch_budget_bytes,
312                        "should_pause_pty_reads": decision.should_pause_pty_reads,
313                        "output_queue_bytes": fc.output_queue_bytes(),
314                    }),
315                )?;
316            }
317
318            emit_flow_control_stall_if_transitioned(telemetry, fc, was_paused)?;
319
320            // Drain output queue per budget
321            let batch = fc.drain_output(decision.output_batch_budget_bytes);
322            if !batch.is_empty() {
323                progressed = true;
324                counters.ws_out_bytes = counters
325                    .ws_out_bytes
326                    .saturating_add(u64::try_from(batch.len()).unwrap_or(u64::MAX));
327                send_ws_message(&mut websocket, Message::binary(batch))?;
328            }
329
330            // Send replenishment if needed
331            if fc.should_send_replenish() {
332                emit_flow_control_event(
333                    telemetry,
334                    "input",
335                    "replenish",
336                    fc.input_window,
337                    fc.input_pending_bytes,
338                )?;
339                telemetry.write(
340                    "flow_control_replenish",
341                    json!({
342                        "input_consumed": fc.input_consumed,
343                        "input_window": fc.input_window,
344                    }),
345                )?;
346                fc.record_replenish_sent();
347            }
348        }
349
350        // --- Child exit ---
351        if let Some(status) = pty.try_wait()? {
352            exit_code = Some(status.exit_code());
353            exit_signal = status.signal().map(ToOwned::to_owned);
354
355            let trailing = pty.drain_output_nonblocking()?;
356            if !trailing.is_empty() {
357                counters.pty_out_bytes = counters
358                    .pty_out_bytes
359                    .saturating_add(u64::try_from(trailing.len()).unwrap_or(u64::MAX));
360
361                match fc_state {
362                    Some(ref mut fc) => {
363                        fc.enqueue_output(&trailing);
364                        let remaining = fc.drain_all_output();
365                        if !remaining.is_empty() {
366                            counters.ws_out_bytes = counters
367                                .ws_out_bytes
368                                .saturating_add(u64::try_from(remaining.len()).unwrap_or(u64::MAX));
369                            send_ws_message(&mut websocket, Message::binary(remaining))?;
370                        }
371                    }
372                    None => {
373                        counters.ws_out_bytes = counters
374                            .ws_out_bytes
375                            .saturating_add(u64::try_from(trailing.len()).unwrap_or(u64::MAX));
376                        send_ws_message(&mut websocket, Message::binary(trailing))?;
377                    }
378                }
379            }
380
381            if let Some(ref fc) = fc_state {
382                telemetry.write("flow_control_summary", fc.summary_json())?;
383            }
384
385            let end = json!({
386                "type": "session_end",
387                "exit_code": exit_code,
388                "exit_signal": exit_signal,
389            });
390            send_ws_message(&mut websocket, Message::text(end.to_string()))?;
391            let _ = websocket.close(None);
392            return Ok(make_summary(
393                session_id,
394                &counters,
395                exit_code,
396                exit_signal.clone(),
397            ));
398        }
399
400        if !progressed {
401            thread::sleep(config.idle_sleep);
402        }
403    }
404}
405
406fn handle_ws_message(
407    websocket: &mut WebSocket<TcpStream>,
408    pty: &mut PtyBridgeSession,
409    counters: &mut Counters,
410    telemetry: &mut TelemetrySink,
411    message: Message,
412    fc_state: &mut Option<FlowControlBridgeState>,
413) -> io::Result<bool> {
414    match message {
415        Message::Binary(bytes) => {
416            let byte_len = bytes.len();
417            let len32 = u32::try_from(byte_len).unwrap_or(u32::MAX);
418            counters.ws_in_bytes = counters
419                .ws_in_bytes
420                .saturating_add(u64::try_from(byte_len).unwrap_or(u64::MAX));
421
422            // Flow control: check if non-interactive input should be dropped.
423            // For raw binary input we conservatively classify as Interactive
424            // (keystrokes are sent as raw bytes). A future binary-envelope
425            // migration can use semantic sub-types for finer classification.
426            if let Some(ref mut fc) = *fc_state {
427                if fc.should_drop_input(InputEventClass::Interactive) {
428                    // Interactive events are never dropped by policy, so this
429                    // branch is unreachable for Interactive. Kept for symmetry
430                    // with future NonInteractive classification.
431                    fc.fc_counters.input_drops = fc.fc_counters.input_drops.saturating_add(1);
432                    telemetry.write(
433                        "flow_control_input_drop",
434                        json!({
435                            "bytes": byte_len,
436                            "input_queue_bytes": fc.input_pending_bytes,
437                        }),
438                    )?;
439                    return Ok(false);
440                }
441                fc.record_input_arrival(len32);
442            }
443
444            pty.send_input(bytes.as_ref())?;
445            counters.pty_in_bytes = counters
446                .pty_in_bytes
447                .saturating_add(u64::try_from(byte_len).unwrap_or(u64::MAX));
448
449            if let Some(ref mut fc) = *fc_state {
450                fc.record_input_serviced(len32);
451            }
452
453            telemetry.write("bridge_input", json!({ "bytes": byte_len }))?;
454            Ok(false)
455        }
456        Message::Text(text) => match parse_control_message(text.as_ref())? {
457            Some(ControlMessage::Resize { cols, rows }) => {
458                match *fc_state {
459                    Some(ref mut fc) => {
460                        // Coalesce: buffer the resize, flush later in main loop
461                        fc.coalesce_resize(cols, rows);
462                    }
463                    None => {
464                        pty.resize(cols, rows)?;
465                        counters.resize_events = counters.resize_events.saturating_add(1);
466                        telemetry.write("bridge_resize", json!({ "cols": cols, "rows": rows }))?;
467                    }
468                }
469                Ok(false)
470            }
471            Some(ControlMessage::Ping) => {
472                send_ws_message(websocket, Message::Pong(Vec::<u8>::new().into()))?;
473                Ok(false)
474            }
475            Some(ControlMessage::Close) => Ok(true),
476            None => {
477                send_ws_message(
478                    websocket,
479                    Message::text(
480                        json!({ "type": "warning", "message": "unknown_control_message" })
481                            .to_string(),
482                    ),
483                )?;
484                Ok(false)
485            }
486        },
487        Message::Ping(payload) => {
488            send_ws_message(websocket, Message::Pong(payload))?;
489            Ok(false)
490        }
491        Message::Pong(_) | Message::Frame(_) => Ok(false),
492        Message::Close(_) => Ok(true),
493    }
494}
495
496fn send_ws_message(websocket: &mut WebSocket<TcpStream>, message: Message) -> io::Result<()> {
497    let mut retries = 0_u8;
498    loop {
499        match websocket.send(message.clone()) {
500            Ok(()) => return Ok(()),
501            Err(WsError::Io(error)) if error.kind() == io::ErrorKind::WouldBlock && retries < 5 => {
502                retries = retries.saturating_add(1);
503                thread::sleep(Duration::from_millis(2));
504            }
505            Err(error) => {
506                return Err(io::Error::other(format!("websocket send failed: {error}")));
507            }
508        }
509    }
510}
511
512#[allow(clippy::result_large_err)] // ErrorResponse size is dictated by tungstenite's API
513fn accept_websocket(
514    stream: TcpStream,
515    config: &WsPtyBridgeConfig,
516) -> io::Result<WebSocket<TcpStream>> {
517    let allowed_origins = config.allowed_origins.clone();
518    let expected_token = config.auth_token.clone();
519    let ws_config = WebSocketConfig::default()
520        .max_message_size(Some(config.max_message_bytes))
521        .max_frame_size(Some(config.max_message_bytes))
522        .write_buffer_size(0);
523
524    let callback = move |request: &Request, response: Response| {
525        validate_upgrade_request(request, &allowed_origins, expected_token.as_deref())
526            .map(|()| response)
527            .map_err(HandshakeRejection::into_response)
528    };
529
530    accept_hdr_with_config(stream, callback, Some(ws_config)).map_err(|error| {
531        io::Error::new(
532            io::ErrorKind::PermissionDenied,
533            format!("websocket handshake failed: {error}"),
534        )
535    })
536}
537
538#[derive(Debug, Clone, Copy, PartialEq, Eq)]
539enum ControlMessage {
540    Resize { cols: u16, rows: u16 },
541    Ping,
542    Close,
543}
544
545fn parse_control_message(text: &str) -> io::Result<Option<ControlMessage>> {
546    let value: Value = serde_json::from_str(text).map_err(|error| {
547        io::Error::new(
548            io::ErrorKind::InvalidData,
549            format!("invalid control JSON: {error}"),
550        )
551    })?;
552
553    let msg_type = value.get("type").and_then(Value::as_str).ok_or_else(|| {
554        io::Error::new(io::ErrorKind::InvalidData, "control message missing `type`")
555    })?;
556
557    match msg_type {
558        "resize" => {
559            let cols = read_u16_field(&value, "cols")?;
560            let rows = read_u16_field(&value, "rows")?;
561            if cols == 0 || rows == 0 {
562                return Err(io::Error::new(
563                    io::ErrorKind::InvalidInput,
564                    "resize dimensions must be > 0",
565                ));
566            }
567            Ok(Some(ControlMessage::Resize { cols, rows }))
568        }
569        "ping" => Ok(Some(ControlMessage::Ping)),
570        "close" => Ok(Some(ControlMessage::Close)),
571        _ => Ok(None),
572    }
573}
574
575fn read_u16_field(value: &Value, key: &str) -> io::Result<u16> {
576    let raw = value.get(key).and_then(Value::as_u64).ok_or_else(|| {
577        io::Error::new(
578            io::ErrorKind::InvalidData,
579            format!("control message missing numeric `{key}`"),
580        )
581    })?;
582    u16::try_from(raw).map_err(|_| {
583        io::Error::new(
584            io::ErrorKind::InvalidData,
585            format!("`{key}` out of range for u16"),
586        )
587    })
588}
589
590#[derive(Debug, Clone, PartialEq, Eq)]
591struct HandshakeRejection {
592    status: StatusCode,
593    body: String,
594}
595
596impl HandshakeRejection {
597    fn into_response(self) -> ErrorResponse {
598        let mut response = ErrorResponse::new(Some(self.body));
599        *response.status_mut() = self.status;
600        response
601    }
602}
603
604fn validate_upgrade_request(
605    request: &Request,
606    allowed_origins: &[String],
607    expected_token: Option<&str>,
608) -> Result<(), HandshakeRejection> {
609    if !allowed_origins.is_empty() {
610        let origin = request
611            .headers()
612            .get("Origin")
613            .and_then(|value| value.to_str().ok())
614            .ok_or_else(|| HandshakeRejection {
615                status: StatusCode::FORBIDDEN,
616                body: "Origin header missing".to_string(),
617            })?;
618        let allowed = allowed_origins.iter().any(|allowed| allowed == origin);
619        if !allowed {
620            return Err(HandshakeRejection {
621                status: StatusCode::FORBIDDEN,
622                body: "Origin not allowed".to_string(),
623            });
624        }
625    }
626
627    if let Some(token) = expected_token {
628        let query = request.uri().query().ok_or_else(|| HandshakeRejection {
629            status: StatusCode::UNAUTHORIZED,
630            body: "Missing token".to_string(),
631        })?;
632        let presented = query_param(query, "token").ok_or_else(|| HandshakeRejection {
633            status: StatusCode::UNAUTHORIZED,
634            body: "Missing token".to_string(),
635        })?;
636        if presented != token {
637            return Err(HandshakeRejection {
638                status: StatusCode::UNAUTHORIZED,
639                body: "Invalid token".to_string(),
640            });
641        }
642    }
643
644    Ok(())
645}
646
647fn query_param<'a>(query: &'a str, key: &str) -> Option<&'a str> {
648    query.split('&').find_map(|pair| {
649        let mut pieces = pair.splitn(2, '=');
650        let name = pieces.next().unwrap_or_default();
651        let value = pieces.next().unwrap_or_default();
652        if name == key { Some(value) } else { None }
653    })
654}
655
656fn make_session_id() -> String {
657    let ts = OffsetDateTime::now_utc().unix_timestamp_nanos();
658    format!("ws-bridge-{}-{ts}", std::process::id())
659}
660
661fn emit_flow_control_event(
662    telemetry: &mut TelemetrySink,
663    direction: &'static str,
664    action: &'static str,
665    window_bytes: u32,
666    queued_bytes: u32,
667) -> io::Result<()> {
668    telemetry.write(
669        "flow_control",
670        json!({
671            "direction": direction,
672            "action": action,
673            "window_bytes": window_bytes,
674            "queued_bytes": queued_bytes,
675        }),
676    )
677}
678
679fn emit_flow_control_stall_if_transitioned(
680    telemetry: &mut TelemetrySink,
681    fc: &FlowControlBridgeState,
682    was_paused: bool,
683) -> io::Result<()> {
684    if !was_paused && fc.pty_reads_paused {
685        emit_flow_control_event(
686            telemetry,
687            "output",
688            "stall",
689            fc.output_window,
690            fc.output_queue_bytes(),
691        )?;
692    }
693    Ok(())
694}
695
696#[derive(Debug, Default)]
697struct Counters {
698    ws_in_bytes: u64,
699    ws_out_bytes: u64,
700    pty_in_bytes: u64,
701    pty_out_bytes: u64,
702    resize_events: u64,
703}
704
705// ---------------------------------------------------------------------------
706// Flow control
707// ---------------------------------------------------------------------------
708
709/// Flow control configuration for the WebSocket-PTY bridge.
710///
711/// When present in `WsPtyBridgeConfig`, enables credit-window tracking,
712/// bounded output queuing, resize coalescing, and policy-driven backpressure.
713#[derive(Debug, Clone)]
714pub struct FlowControlBridgeConfig {
715    /// Initial output credit window (bytes the server may send before client ACKs).
716    pub output_window: u32,
717    /// Initial input credit window (bytes the client may send before server ACKs).
718    pub input_window: u32,
719    /// Resize coalescing window in milliseconds (0 disables coalescing).
720    pub coalesce_resize_ms: u32,
721    /// Policy engine configuration (queue caps, fairness, batch sizes, etc.).
722    pub policy: FlowControlConfig,
723}
724
725impl Default for FlowControlBridgeConfig {
726    fn default() -> Self {
727        Self {
728            output_window: 65_536,
729            input_window: 8_192,
730            coalesce_resize_ms: 50,
731            policy: FlowControlConfig::default(),
732        }
733    }
734}
735
736/// Aggregate counters emitted as telemetry at session end.
737#[derive(Debug, Default)]
738struct FlowControlCounters {
739    output_queue_peak_bytes: u32,
740    input_drops: u64,
741    decisions_non_stable: u64,
742    resizes_coalesced: u64,
743    pty_read_pauses: u64,
744    replenishments_sent: u64,
745}
746
747/// Mutable flow-control state tracked during a bridge session.
748struct FlowControlBridgeState {
749    policy: FlowControlPolicy,
750
751    // Credit windows
752    output_window: u32,
753    input_window: u32,
754    output_consumed: u32,
755    input_consumed: u32,
756
757    // Bounded output queue (FIFO byte buffer)
758    output_queue: VecDeque<u8>,
759
760    // Input depth (data flows through immediately; we track depth for policy)
761    input_pending_bytes: u32,
762
763    // Fairness tracking
764    serviced_input_bytes: u64,
765    serviced_output_bytes: u64,
766
767    // Timing
768    last_replenish: Instant,
769    output_hard_cap_since: Option<Instant>,
770    rate_window_start: Instant,
771
772    // Rate accumulation in current 1-second window
773    rate_in_arrived: u32,
774    rate_out_arrived: u32,
775    rate_in_serviced: u32,
776    rate_out_serviced: u32,
777
778    // Coalescing
779    coalesce_resize_ms: u32,
780    pending_resize: Option<(u16, u16, Instant)>,
781
782    // State flags
783    pty_reads_paused: bool,
784
785    // Aggregate counters for telemetry
786    fc_counters: FlowControlCounters,
787}
788
789impl FlowControlBridgeState {
790    fn new(config: &FlowControlBridgeConfig) -> Self {
791        let now = Instant::now();
792        Self {
793            policy: FlowControlPolicy::new(config.policy),
794            output_window: config.output_window,
795            input_window: config.input_window,
796            output_consumed: 0,
797            input_consumed: 0,
798            output_queue: VecDeque::new(),
799            input_pending_bytes: 0,
800            serviced_input_bytes: 0,
801            serviced_output_bytes: 0,
802            last_replenish: now,
803            output_hard_cap_since: None,
804            rate_window_start: now,
805            rate_in_arrived: 0,
806            rate_out_arrived: 0,
807            rate_in_serviced: 0,
808            rate_out_serviced: 0,
809            coalesce_resize_ms: config.coalesce_resize_ms,
810            pending_resize: None,
811            pty_reads_paused: false,
812            fc_counters: FlowControlCounters::default(),
813        }
814    }
815
816    /// Record input bytes arriving from the client.
817    fn record_input_arrival(&mut self, len: u32) {
818        self.input_pending_bytes = self.input_pending_bytes.saturating_add(len);
819        self.rate_in_arrived = self.rate_in_arrived.saturating_add(len);
820        self.input_consumed = self.input_consumed.saturating_add(len);
821    }
822
823    /// Record input bytes serviced (written to PTY).
824    fn record_input_serviced(&mut self, len: u32) {
825        self.input_pending_bytes = self.input_pending_bytes.saturating_sub(len);
826        self.serviced_input_bytes = self.serviced_input_bytes.saturating_add(len as u64);
827        self.rate_in_serviced = self.rate_in_serviced.saturating_add(len);
828    }
829
830    /// Enqueue PTY output bytes into the bounded output queue.
831    /// Drops bytes that exceed the hard cap to enforce bounded memory.
832    fn enqueue_output(&mut self, data: &[u8]) {
833        let hard_cap = self.policy.config.output_hard_cap_bytes as usize;
834        let available = hard_cap.saturating_sub(self.output_queue.len());
835        let to_add = data.len().min(available);
836        self.output_queue.extend(&data[..to_add]);
837        self.rate_out_arrived = self.rate_out_arrived.saturating_add(to_add as u32);
838
839        let queue_len = self.output_queue.len() as u32;
840        if queue_len > self.fc_counters.output_queue_peak_bytes {
841            self.fc_counters.output_queue_peak_bytes = queue_len;
842        }
843    }
844
845    /// Drain up to `budget` bytes from the output queue, respecting credit window.
846    fn drain_output(&mut self, budget: u32) -> Vec<u8> {
847        let queue_available = self.output_queue.len().min(budget as usize);
848        let window_available = self.output_window.saturating_sub(self.output_consumed) as usize;
849        let to_drain = queue_available.min(window_available);
850        if to_drain == 0 {
851            return Vec::new();
852        }
853        let batch: Vec<u8> = self.output_queue.drain(..to_drain).collect();
854        let len = batch.len() as u32;
855        self.output_consumed = self.output_consumed.saturating_add(len);
856        self.serviced_output_bytes = self.serviced_output_bytes.saturating_add(len as u64);
857        self.rate_out_serviced = self.rate_out_serviced.saturating_add(len);
858        batch
859    }
860
861    /// Drain ALL remaining bytes (ignoring window), used at session teardown.
862    fn drain_all_output(&mut self) -> Vec<u8> {
863        let batch: Vec<u8> = self.output_queue.drain(..).collect();
864        let len = batch.len() as u32;
865        self.output_consumed = self.output_consumed.saturating_add(len);
866        self.serviced_output_bytes = self.serviced_output_bytes.saturating_add(len as u64);
867        batch
868    }
869
870    /// Check if an input event should be dropped per policy.
871    fn should_drop_input(&self, class: InputEventClass) -> bool {
872        self.policy
873            .should_drop_input_event(self.input_pending_bytes, class)
874    }
875
876    /// Buffer a resize, coalescing with any pending resize.
877    fn coalesce_resize(&mut self, cols: u16, rows: u16) {
878        self.pending_resize = Some((cols, rows, Instant::now()));
879    }
880
881    /// Flush pending resize if coalescing window has elapsed.
882    fn flush_pending_resize(&mut self) -> Option<(u16, u16)> {
883        let (cols, rows, queued_at) = self.pending_resize?;
884        if self.coalesce_resize_ms == 0 {
885            self.pending_resize = None;
886            return Some((cols, rows));
887        }
888        if queued_at.elapsed() >= Duration::from_millis(self.coalesce_resize_ms as u64) {
889            self.fc_counters.resizes_coalesced =
890                self.fc_counters.resizes_coalesced.saturating_add(1);
891            self.pending_resize = None;
892            Some((cols, rows))
893        } else {
894            None
895        }
896    }
897
898    /// Build a `FlowControlSnapshot` for the policy evaluator.
899    fn build_snapshot(&self) -> FlowControlSnapshot {
900        let rate_elapsed = self.rate_window_start.elapsed();
901        let rate_secs = rate_elapsed.as_secs_f64().max(0.001);
902
903        let queue_bytes = self.output_queue.len() as u32;
904        let hard_cap = self.policy.config.output_hard_cap_bytes;
905        let occupancy = if hard_cap > 0 {
906            queue_bytes as f64 / hard_cap as f64
907        } else {
908            0.0
909        };
910
911        let hard_cap_duration_ms = self
912            .output_hard_cap_since
913            .map(|start| start.elapsed().as_millis() as u64)
914            .unwrap_or(0);
915
916        FlowControlSnapshot {
917            queues: QueueDepthBytes {
918                input: self.input_pending_bytes,
919                output: queue_bytes,
920                render_frames: 0,
921            },
922            rates: RateWindowBps {
923                lambda_in: (self.rate_in_arrived as f64 / rate_secs) as u32,
924                lambda_out: (self.rate_out_arrived as f64 / rate_secs) as u32,
925                mu_in: (self.rate_in_serviced as f64 / rate_secs) as u32,
926                mu_out: (self.rate_out_serviced as f64 / rate_secs) as u32,
927            },
928            latency: LatencyWindowMs {
929                key_p50_ms: 10.0 * occupancy,
930                key_p95_ms: 20.0 * occupancy,
931            },
932            serviced_input_bytes: self.serviced_input_bytes,
933            serviced_output_bytes: self.serviced_output_bytes,
934            output_hard_cap_duration_ms: hard_cap_duration_ms,
935        }
936    }
937
938    /// Evaluate the policy and update internal hard-cap/pause state.
939    fn evaluate(&mut self) -> frankenterm_core::flow_control::FlowControlDecision {
940        let snapshot = self.build_snapshot();
941        let decision = self.policy.evaluate(snapshot);
942
943        // Track hard-cap state transitions
944        let at_hard_cap =
945            self.output_queue.len() as u32 >= self.policy.config.output_hard_cap_bytes;
946        match (at_hard_cap, self.output_hard_cap_since) {
947            (true, None) => self.output_hard_cap_since = Some(Instant::now()),
948            (false, Some(_)) => self.output_hard_cap_since = None,
949            _ => {}
950        }
951
952        let was_paused = self.pty_reads_paused;
953        self.pty_reads_paused = decision.should_pause_pty_reads;
954        if decision.should_pause_pty_reads && !was_paused {
955            self.fc_counters.pty_read_pauses = self.fc_counters.pty_read_pauses.saturating_add(1);
956        }
957
958        if decision.chosen_action.is_some() {
959            self.fc_counters.decisions_non_stable =
960                self.fc_counters.decisions_non_stable.saturating_add(1);
961        }
962
963        decision
964    }
965
966    /// Check whether we should send a FlowControl replenishment to the client.
967    fn should_send_replenish(&self) -> bool {
968        let elapsed_ms = self.last_replenish.elapsed().as_millis() as u64;
969        self.policy
970            .should_replenish(self.input_consumed, self.input_window, elapsed_ms)
971    }
972
973    /// Reset replenishment tracking after sending FlowControl message.
974    fn record_replenish_sent(&mut self) {
975        self.input_consumed = 0;
976        self.last_replenish = Instant::now();
977        self.fc_counters.replenishments_sent =
978            self.fc_counters.replenishments_sent.saturating_add(1);
979    }
980
981    /// Process an inbound FlowControl message from the client.
982    /// Currently unused until binary envelope message routing is added;
983    /// kept for downstream integration (bd-2vr05.2.4+).
984    #[allow(dead_code)]
985    fn process_flow_control_msg(&mut self, output_consumed: u32) {
986        self.output_consumed = self.output_consumed.saturating_sub(output_consumed);
987    }
988
989    /// Reset rate counters at ~1 second intervals.
990    fn maybe_reset_rate_window(&mut self) {
991        if self.rate_window_start.elapsed() >= Duration::from_secs(1) {
992            self.rate_in_arrived = 0;
993            self.rate_out_arrived = 0;
994            self.rate_in_serviced = 0;
995            self.rate_out_serviced = 0;
996            self.rate_window_start = Instant::now();
997        }
998    }
999
1000    fn output_queue_bytes(&self) -> u32 {
1001        self.output_queue.len() as u32
1002    }
1003
1004    /// Produce a telemetry summary JSON.
1005    fn summary_json(&self) -> Value {
1006        json!({
1007            "output_queue_peak_bytes": self.fc_counters.output_queue_peak_bytes,
1008            "input_drops": self.fc_counters.input_drops,
1009            "decisions_non_stable": self.fc_counters.decisions_non_stable,
1010            "resizes_coalesced": self.fc_counters.resizes_coalesced,
1011            "pty_read_pauses": self.fc_counters.pty_read_pauses,
1012            "replenishments_sent": self.fc_counters.replenishments_sent,
1013            "serviced_input_bytes": self.serviced_input_bytes,
1014            "serviced_output_bytes": self.serviced_output_bytes,
1015        })
1016    }
1017}
1018
1019#[derive(Debug)]
1020enum ReaderMsg {
1021    Data(Vec<u8>),
1022    Eof,
1023    Err(io::Error),
1024}
1025
1026struct PtyBridgeSession {
1027    child: Box<dyn Child + Send + Sync>,
1028    master: Box<dyn MasterPty + Send>,
1029    writer: Box<dyn Write + Send>,
1030    rx: mpsc::Receiver<ReaderMsg>,
1031    reader_thread: Option<thread::JoinHandle<()>>,
1032    eof: bool,
1033}
1034
1035impl PtyBridgeSession {
1036    fn spawn(config: &WsPtyBridgeConfig) -> io::Result<Self> {
1037        let mut cmd = CommandBuilder::new(&config.command);
1038        for arg in &config.args {
1039            cmd.arg(arg);
1040        }
1041        cmd.env("TERM", &config.term);
1042        for (key, value) in &config.env {
1043            cmd.env(key, value);
1044        }
1045
1046        let pty_system = portable_pty::native_pty_system();
1047        let pair = pty_system
1048            .openpty(PtySize {
1049                rows: config.rows,
1050                cols: config.cols,
1051                pixel_width: 0,
1052                pixel_height: 0,
1053            })
1054            .map_err(portable_pty_error)?;
1055
1056        let child = pair.slave.spawn_command(cmd).map_err(portable_pty_error)?;
1057        let mut reader = pair.master.try_clone_reader().map_err(portable_pty_error)?;
1058        let writer = pair.master.take_writer().map_err(portable_pty_error)?;
1059
1060        let (tx, rx) = mpsc::channel::<ReaderMsg>();
1061        let reader_thread = thread::Builder::new()
1062            .name("ftui-pty-ws-reader".to_string())
1063            .spawn(move || {
1064                let mut buffer = [0_u8; 8192];
1065                loop {
1066                    match reader.read(&mut buffer) {
1067                        Ok(0) => {
1068                            let _ = tx.send(ReaderMsg::Eof);
1069                            break;
1070                        }
1071                        Ok(n) => {
1072                            let _ = tx.send(ReaderMsg::Data(buffer[..n].to_vec()));
1073                        }
1074                        Err(error) if error.kind() == io::ErrorKind::Interrupted => {}
1075                        Err(error) => {
1076                            let _ = tx.send(ReaderMsg::Err(error));
1077                            break;
1078                        }
1079                    }
1080                }
1081            })
1082            .map_err(|error| {
1083                io::Error::other(format!("failed to spawn PTY reader thread: {error}"))
1084            })?;
1085
1086        Ok(Self {
1087            child,
1088            master: pair.master,
1089            writer,
1090            rx,
1091            reader_thread: Some(reader_thread),
1092            eof: false,
1093        })
1094    }
1095
1096    fn send_input(&mut self, bytes: &[u8]) -> io::Result<()> {
1097        if bytes.is_empty() {
1098            return Ok(());
1099        }
1100        self.writer.write_all(bytes)?;
1101        self.writer.flush()?;
1102        Ok(())
1103    }
1104
1105    fn resize(&mut self, cols: u16, rows: u16) -> io::Result<()> {
1106        self.master
1107            .resize(PtySize {
1108                rows,
1109                cols,
1110                pixel_width: 0,
1111                pixel_height: 0,
1112            })
1113            .map_err(portable_pty_error)
1114    }
1115
1116    fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
1117        self.child.try_wait()
1118    }
1119
1120    fn drain_output_nonblocking(&mut self) -> io::Result<Vec<u8>> {
1121        if self.eof {
1122            return Ok(Vec::new());
1123        }
1124
1125        let mut output = Vec::new();
1126        loop {
1127            match self.rx.try_recv() {
1128                Ok(ReaderMsg::Data(bytes)) => output.extend_from_slice(&bytes),
1129                Ok(ReaderMsg::Eof) => {
1130                    self.eof = true;
1131                    break;
1132                }
1133                Ok(ReaderMsg::Err(error)) => return Err(error),
1134                Err(mpsc::TryRecvError::Empty) => break,
1135                Err(mpsc::TryRecvError::Disconnected) => {
1136                    self.eof = true;
1137                    break;
1138                }
1139            }
1140        }
1141
1142        Ok(output)
1143    }
1144}
1145
1146impl Drop for PtyBridgeSession {
1147    fn drop(&mut self) {
1148        let _ = self.child.kill();
1149        if let Some(handle) = self.reader_thread.take() {
1150            detach_reader_join(handle);
1151        }
1152    }
1153}
1154
1155fn detach_reader_join(handle: thread::JoinHandle<()>) {
1156    let _ = thread::Builder::new()
1157        .name("ftui-pty-ws-detached-join".to_string())
1158        .spawn(move || {
1159            let _ = handle.join();
1160        });
1161}
1162
1163fn portable_pty_error<E: std::fmt::Display>(error: E) -> io::Error {
1164    io::Error::other(format!("{error}"))
1165}
1166
1167struct TelemetrySink {
1168    file: Option<File>,
1169    session_id: String,
1170    seq: u64,
1171}
1172
1173impl TelemetrySink {
1174    fn new(path: Option<&Path>, session_id: &str) -> io::Result<Self> {
1175        let file = match path {
1176            Some(path) => Some(OpenOptions::new().create(true).append(true).open(path)?),
1177            None => None,
1178        };
1179        Ok(Self {
1180            file,
1181            session_id: session_id.to_string(),
1182            seq: 0,
1183        })
1184    }
1185
1186    fn write(&mut self, event: &str, payload: Value) -> io::Result<()> {
1187        let Some(file) = self.file.as_mut() else {
1188            return Ok(());
1189        };
1190        let line = json!({
1191            "event": event,
1192            "ts": now_iso8601(),
1193            "session_id": self.session_id,
1194            "seq": self.seq,
1195            "payload": payload,
1196        });
1197        self.seq = self.seq.saturating_add(1);
1198        writeln!(file, "{line}")?;
1199        file.flush()?;
1200        Ok(())
1201    }
1202}
1203
1204fn now_iso8601() -> String {
1205    OffsetDateTime::now_utc()
1206        .format(&Rfc3339)
1207        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
1208}
1209
1210#[cfg(test)]
1211mod tests {
1212    use super::*;
1213    use std::net::TcpListener;
1214    use std::thread;
1215    use std::time::{Duration, Instant};
1216
1217    use tungstenite::stream::MaybeTlsStream;
1218    use tungstenite::{Message, connect};
1219
1220    fn request(uri: &str, origin: Option<&str>) -> Request {
1221        let mut builder = Request::builder().uri(uri);
1222        if let Some(origin) = origin {
1223            builder = builder.header("Origin", origin);
1224        }
1225        builder.body(()).expect("request build")
1226    }
1227
1228    #[test]
1229    fn query_param_extracts_expected_value() {
1230        assert_eq!(query_param("token=abc&x=1", "token"), Some("abc"));
1231        assert_eq!(query_param("x=1&token=abc", "token"), Some("abc"));
1232        assert_eq!(query_param("x=1", "token"), None);
1233    }
1234
1235    #[test]
1236    fn validate_upgrade_request_allows_matching_origin_and_token() {
1237        let req = request("/ws?token=secret", Some("https://allowed.example"));
1238        let result = validate_upgrade_request(
1239            &req,
1240            &[String::from("https://allowed.example")],
1241            Some("secret"),
1242        );
1243        assert!(result.is_ok());
1244    }
1245
1246    #[test]
1247    fn validate_upgrade_request_rejects_invalid_origin() {
1248        let req = request("/ws?token=secret", Some("https://denied.example"));
1249        let result = validate_upgrade_request(
1250            &req,
1251            &[String::from("https://allowed.example")],
1252            Some("secret"),
1253        );
1254        let rejection = result.expect_err("should reject");
1255        assert_eq!(rejection.status, StatusCode::FORBIDDEN);
1256    }
1257
1258    #[test]
1259    fn validate_upgrade_request_rejects_invalid_token() {
1260        let req = request("/ws?token=wrong", Some("https://allowed.example"));
1261        let result = validate_upgrade_request(
1262            &req,
1263            &[String::from("https://allowed.example")],
1264            Some("secret"),
1265        );
1266        let rejection = result.expect_err("should reject");
1267        assert_eq!(rejection.status, StatusCode::UNAUTHORIZED);
1268    }
1269
1270    #[test]
1271    fn parse_control_message_resize_ping_and_unknown() {
1272        assert_eq!(
1273            parse_control_message(r#"{"type":"resize","cols":120,"rows":40}"#).expect("parse"),
1274            Some(ControlMessage::Resize {
1275                cols: 120,
1276                rows: 40
1277            })
1278        );
1279        assert_eq!(
1280            parse_control_message(r#"{"type":"ping"}"#).expect("parse"),
1281            Some(ControlMessage::Ping)
1282        );
1283        assert_eq!(
1284            parse_control_message(r#"{"type":"unknown"}"#).expect("parse"),
1285            None
1286        );
1287    }
1288
1289    #[test]
1290    fn parse_control_message_rejects_invalid_resize_dimensions() {
1291        let error = parse_control_message(r#"{"type":"resize","cols":0,"rows":40}"#)
1292            .expect_err("invalid dims should fail");
1293        assert_eq!(error.kind(), io::ErrorKind::InvalidInput);
1294    }
1295
1296    // --- WsPtyBridgeConfig ---
1297
1298    #[test]
1299    fn config_default_fields() {
1300        let c = WsPtyBridgeConfig::default();
1301        assert_eq!(c.bind_addr, SocketAddr::from(([127, 0, 0, 1], 9231)));
1302        assert!(c.args.is_empty());
1303        assert_eq!(c.term, "xterm-256color");
1304        assert!(c.env.is_empty());
1305        assert_eq!(c.cols, 120);
1306        assert_eq!(c.rows, 40);
1307        assert!(c.allowed_origins.is_empty());
1308        assert!(c.auth_token.is_none());
1309        assert!(c.telemetry_path.is_none());
1310        assert_eq!(c.max_message_bytes, 256 * 1024);
1311        assert_eq!(c.idle_sleep, Duration::from_millis(5));
1312        assert!(c.accept_once);
1313    }
1314
1315    #[test]
1316    fn config_clone() {
1317        let c1 = WsPtyBridgeConfig::default();
1318        let c2 = c1.clone();
1319        assert_eq!(c2.cols, c1.cols);
1320        assert_eq!(c2.rows, c1.rows);
1321        assert_eq!(c2.term, c1.term);
1322    }
1323
1324    #[test]
1325    fn config_debug() {
1326        let c = WsPtyBridgeConfig::default();
1327        let dbg = format!("{c:?}");
1328        assert!(dbg.contains("WsPtyBridgeConfig"));
1329        assert!(dbg.contains("bind_addr"));
1330    }
1331
1332    // --- BridgeSummary ---
1333
1334    #[test]
1335    fn bridge_summary_as_json_contains_all_fields() {
1336        let summary = BridgeSummary {
1337            session_id: "test-123".to_string(),
1338            ws_in_bytes: 100,
1339            ws_out_bytes: 200,
1340            pty_in_bytes: 50,
1341            pty_out_bytes: 150,
1342            resize_events: 3,
1343            exit_code: Some(0),
1344            exit_signal: None,
1345        };
1346        let json = summary.as_json();
1347        assert_eq!(json["session_id"], "test-123");
1348        assert_eq!(json["ws_in_bytes"], 100);
1349        assert_eq!(json["ws_out_bytes"], 200);
1350        assert_eq!(json["pty_in_bytes"], 50);
1351        assert_eq!(json["pty_out_bytes"], 150);
1352        assert_eq!(json["resize_events"], 3);
1353        assert_eq!(json["exit_code"], 0);
1354        assert!(json["exit_signal"].is_null());
1355    }
1356
1357    #[test]
1358    fn bridge_summary_as_json_with_signal() {
1359        let summary = BridgeSummary {
1360            session_id: "s".to_string(),
1361            ws_in_bytes: 0,
1362            ws_out_bytes: 0,
1363            pty_in_bytes: 0,
1364            pty_out_bytes: 0,
1365            resize_events: 0,
1366            exit_code: None,
1367            exit_signal: Some("SIGKILL".to_string()),
1368        };
1369        let json = summary.as_json();
1370        assert!(json["exit_code"].is_null());
1371        assert_eq!(json["exit_signal"], "SIGKILL");
1372    }
1373
1374    #[test]
1375    fn bridge_summary_clone_and_eq() {
1376        let s1 = BridgeSummary {
1377            session_id: "a".to_string(),
1378            ws_in_bytes: 1,
1379            ws_out_bytes: 2,
1380            pty_in_bytes: 3,
1381            pty_out_bytes: 4,
1382            resize_events: 5,
1383            exit_code: Some(42),
1384            exit_signal: None,
1385        };
1386        let s2 = s1.clone();
1387        assert_eq!(s1, s2);
1388    }
1389
1390    #[test]
1391    fn bridge_summary_debug() {
1392        let s = BridgeSummary {
1393            session_id: "x".to_string(),
1394            ws_in_bytes: 0,
1395            ws_out_bytes: 0,
1396            pty_in_bytes: 0,
1397            pty_out_bytes: 0,
1398            resize_events: 0,
1399            exit_code: None,
1400            exit_signal: None,
1401        };
1402        let dbg = format!("{s:?}");
1403        assert!(dbg.contains("BridgeSummary"));
1404        assert!(dbg.contains("session_id"));
1405    }
1406
1407    // --- ControlMessage ---
1408
1409    #[test]
1410    fn control_message_close() {
1411        assert_eq!(
1412            parse_control_message(r#"{"type":"close"}"#).expect("parse"),
1413            Some(ControlMessage::Close)
1414        );
1415    }
1416
1417    #[test]
1418    fn control_message_debug_clone_eq() {
1419        let m = ControlMessage::Resize { cols: 80, rows: 24 };
1420        let m2 = m;
1421        assert_eq!(m, m2);
1422        let dbg = format!("{m:?}");
1423        assert!(dbg.contains("Resize"));
1424        assert!(dbg.contains("80"));
1425    }
1426
1427    // --- parse_control_message edge cases ---
1428
1429    #[test]
1430    fn parse_control_message_invalid_json() {
1431        let err = parse_control_message("not json").expect_err("should fail");
1432        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1433    }
1434
1435    #[test]
1436    fn parse_control_message_missing_type() {
1437        let err = parse_control_message(r#"{"cols":80}"#).expect_err("should fail");
1438        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1439    }
1440
1441    #[test]
1442    fn parse_control_message_resize_missing_cols() {
1443        let err = parse_control_message(r#"{"type":"resize","rows":40}"#).expect_err("should fail");
1444        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1445    }
1446
1447    #[test]
1448    fn parse_control_message_resize_missing_rows() {
1449        let err = parse_control_message(r#"{"type":"resize","cols":80}"#).expect_err("should fail");
1450        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1451    }
1452
1453    #[test]
1454    fn parse_control_message_resize_zero_rows() {
1455        let err = parse_control_message(r#"{"type":"resize","cols":80,"rows":0}"#)
1456            .expect_err("should fail");
1457        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
1458    }
1459
1460    #[test]
1461    fn parse_control_message_resize_zero_cols() {
1462        let err = parse_control_message(r#"{"type":"resize","cols":0,"rows":24}"#)
1463            .expect_err("should fail");
1464        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
1465    }
1466
1467    #[test]
1468    fn parse_control_message_resize_large_values() {
1469        // u16::MAX = 65535 is valid
1470        let result =
1471            parse_control_message(r#"{"type":"resize","cols":65535,"rows":65535}"#).expect("parse");
1472        assert_eq!(
1473            result,
1474            Some(ControlMessage::Resize {
1475                cols: 65535,
1476                rows: 65535
1477            })
1478        );
1479    }
1480
1481    #[test]
1482    fn parse_control_message_resize_overflow_u16() {
1483        let err = parse_control_message(r#"{"type":"resize","cols":70000,"rows":40}"#)
1484            .expect_err("should fail");
1485        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1486    }
1487
1488    // --- read_u16_field ---
1489
1490    #[test]
1491    fn read_u16_field_valid() {
1492        let v: Value = serde_json::from_str(r#"{"x": 42}"#).unwrap();
1493        assert_eq!(read_u16_field(&v, "x").unwrap(), 42);
1494    }
1495
1496    #[test]
1497    fn read_u16_field_missing() {
1498        let v: Value = serde_json::from_str(r#"{"x": 42}"#).unwrap();
1499        let err = read_u16_field(&v, "y").expect_err("should fail");
1500        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1501    }
1502
1503    #[test]
1504    fn read_u16_field_not_numeric() {
1505        let v: Value = serde_json::from_str(r#"{"x": "hello"}"#).unwrap();
1506        let err = read_u16_field(&v, "x").expect_err("should fail");
1507        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1508    }
1509
1510    #[test]
1511    fn read_u16_field_overflow() {
1512        let v: Value = serde_json::from_str(r#"{"x": 100000}"#).unwrap();
1513        let err = read_u16_field(&v, "x").expect_err("should fail");
1514        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1515    }
1516
1517    // --- query_param edge cases ---
1518
1519    #[test]
1520    fn query_param_empty_string() {
1521        assert_eq!(query_param("", "token"), None);
1522    }
1523
1524    #[test]
1525    fn query_param_missing_value() {
1526        assert_eq!(query_param("token", "token"), Some(""));
1527    }
1528
1529    #[test]
1530    fn query_param_first_of_duplicates() {
1531        assert_eq!(
1532            query_param("token=first&token=second", "token"),
1533            Some("first")
1534        );
1535    }
1536
1537    #[test]
1538    fn query_param_value_with_equals() {
1539        assert_eq!(query_param("token=a=b", "token"), Some("a=b"));
1540    }
1541
1542    // --- validate_upgrade_request edge cases ---
1543
1544    #[test]
1545    fn validate_no_origin_required_no_token_required() {
1546        let req = request("/ws", None);
1547        let result = validate_upgrade_request(&req, &[], None);
1548        assert!(result.is_ok());
1549    }
1550
1551    #[test]
1552    fn validate_origin_required_but_header_missing() {
1553        let req = request("/ws", None);
1554        let result =
1555            validate_upgrade_request(&req, &[String::from("https://allowed.example")], None);
1556        let rejection = result.expect_err("should reject");
1557        assert_eq!(rejection.status, StatusCode::FORBIDDEN);
1558        assert!(rejection.body.contains("Origin"));
1559    }
1560
1561    #[test]
1562    fn validate_token_required_but_no_query_string() {
1563        let req = request("/ws", None);
1564        let result = validate_upgrade_request(&req, &[], Some("secret"));
1565        let rejection = result.expect_err("should reject");
1566        assert_eq!(rejection.status, StatusCode::UNAUTHORIZED);
1567    }
1568
1569    #[test]
1570    fn validate_token_required_but_missing_from_query() {
1571        let req = request("/ws?other=value", None);
1572        let result = validate_upgrade_request(&req, &[], Some("secret"));
1573        let rejection = result.expect_err("should reject");
1574        assert_eq!(rejection.status, StatusCode::UNAUTHORIZED);
1575    }
1576
1577    #[test]
1578    fn validate_correct_token_no_origin_restriction() {
1579        let req = request("/ws?token=mysecret", None);
1580        let result = validate_upgrade_request(&req, &[], Some("mysecret"));
1581        assert!(result.is_ok());
1582    }
1583
1584    // --- HandshakeRejection ---
1585
1586    #[test]
1587    fn handshake_rejection_into_response() {
1588        let rejection = HandshakeRejection {
1589            status: StatusCode::FORBIDDEN,
1590            body: "test body".to_string(),
1591        };
1592        let response = rejection.into_response();
1593        assert_eq!(response.status(), StatusCode::FORBIDDEN);
1594    }
1595
1596    #[test]
1597    fn handshake_rejection_debug_clone_eq() {
1598        let r1 = HandshakeRejection {
1599            status: StatusCode::UNAUTHORIZED,
1600            body: "bad".to_string(),
1601        };
1602        let r2 = r1.clone();
1603        assert_eq!(r1, r2);
1604        let dbg = format!("{r1:?}");
1605        assert!(dbg.contains("HandshakeRejection"));
1606    }
1607
1608    // --- Counters ---
1609
1610    #[test]
1611    fn counters_default() {
1612        let c = Counters::default();
1613        assert_eq!(c.ws_in_bytes, 0);
1614        assert_eq!(c.ws_out_bytes, 0);
1615        assert_eq!(c.pty_in_bytes, 0);
1616        assert_eq!(c.pty_out_bytes, 0);
1617        assert_eq!(c.resize_events, 0);
1618    }
1619
1620    #[test]
1621    fn counters_debug() {
1622        let c = Counters::default();
1623        let dbg = format!("{c:?}");
1624        assert!(dbg.contains("Counters"));
1625    }
1626
1627    // --- make_session_id ---
1628
1629    #[test]
1630    fn make_session_id_format() {
1631        let id = make_session_id();
1632        assert!(id.starts_with("ws-bridge-"));
1633        assert!(id.len() > 15);
1634    }
1635
1636    #[test]
1637    fn make_session_id_unique() {
1638        let id1 = make_session_id();
1639        thread::sleep(Duration::from_millis(1));
1640        let id2 = make_session_id();
1641        assert_ne!(id1, id2);
1642    }
1643
1644    // --- now_iso8601 ---
1645
1646    #[test]
1647    fn now_iso8601_format() {
1648        let ts = now_iso8601();
1649        assert!(ts.contains('T'));
1650        assert!(ts.contains('-'));
1651        assert!(ts.len() >= 20);
1652    }
1653
1654    // --- TelemetrySink ---
1655
1656    #[test]
1657    fn telemetry_sink_no_path_write_is_noop() {
1658        let mut sink = TelemetrySink::new(None, "test").expect("create sink");
1659        // Writing without a file returns early — seq stays at 0.
1660        sink.write("event", json!({"key": "value"})).expect("write");
1661        assert_eq!(sink.seq, 0);
1662    }
1663
1664    #[test]
1665    fn telemetry_sink_with_path_writes_jsonl() {
1666        let dir = std::env::temp_dir().join("ftui-test-telemetry");
1667        std::fs::create_dir_all(&dir).expect("create dir");
1668        let path = dir.join("test_telemetry.jsonl");
1669        let _ = std::fs::remove_file(&path);
1670
1671        {
1672            let mut sink = TelemetrySink::new(Some(&path), "sess-1").expect("create sink");
1673            sink.write("start", json!({"x": 1})).expect("write 1");
1674            sink.write("end", json!({"x": 2})).expect("write 2");
1675            assert_eq!(sink.seq, 2);
1676        }
1677
1678        let content = std::fs::read_to_string(&path).expect("read file");
1679        let lines: Vec<&str> = content.lines().collect();
1680        assert_eq!(lines.len(), 2);
1681        for line in &lines {
1682            let v: Value = serde_json::from_str(line).expect("parse JSON");
1683            assert_eq!(v["session_id"], "sess-1");
1684            assert!(v["ts"].is_string());
1685            assert!(v["event"].is_string());
1686        }
1687
1688        let _ = std::fs::remove_file(&path);
1689        let _ = std::fs::remove_dir(&dir);
1690    }
1691
1692    #[test]
1693    fn flow_control_event_writes_expected_payload_fields() {
1694        let dir = std::env::temp_dir().join("ftui-test-flow-control-event");
1695        std::fs::create_dir_all(&dir).expect("create dir");
1696        let path = dir.join("flow_control_event.jsonl");
1697        let _ = std::fs::remove_file(&path);
1698
1699        {
1700            let mut sink = TelemetrySink::new(Some(&path), "sess-fc").expect("create sink");
1701            emit_flow_control_event(&mut sink, "output", "stall", 65_536, 65_000)
1702                .expect("write flow_control");
1703        }
1704
1705        let content = std::fs::read_to_string(&path).expect("read file");
1706        let lines: Vec<&str> = content.lines().collect();
1707        assert_eq!(lines.len(), 1);
1708
1709        let parsed: Value = serde_json::from_str(lines[0]).expect("parse flow_control event");
1710        assert_eq!(parsed["event"], "flow_control");
1711        assert_eq!(parsed["payload"]["direction"], "output");
1712        assert_eq!(parsed["payload"]["action"], "stall");
1713        assert_eq!(parsed["payload"]["window_bytes"], 65_536);
1714        assert_eq!(parsed["payload"]["queued_bytes"], 65_000);
1715
1716        let _ = std::fs::remove_file(&path);
1717        let _ = std::fs::remove_dir(&dir);
1718    }
1719
1720    #[test]
1721    fn flow_control_stall_emits_only_on_pause_transition() {
1722        let dir = std::env::temp_dir().join("ftui-test-flow-control-stall");
1723        std::fs::create_dir_all(&dir).expect("create dir");
1724        let path = dir.join("flow_control_stall.jsonl");
1725        let _ = std::fs::remove_file(&path);
1726
1727        let mut fc = default_fc_state();
1728        let payload = [b'x'; 128];
1729        fc.enqueue_output(&payload);
1730        fc.pty_reads_paused = true;
1731
1732        {
1733            let mut sink = TelemetrySink::new(Some(&path), "sess-stall").expect("create sink");
1734            emit_flow_control_stall_if_transitioned(&mut sink, &fc, false)
1735                .expect("emit rising-edge stall");
1736            emit_flow_control_stall_if_transitioned(&mut sink, &fc, true)
1737                .expect("do not emit while already paused");
1738        }
1739
1740        let content = std::fs::read_to_string(&path).expect("read file");
1741        let lines: Vec<&str> = content.lines().collect();
1742        assert_eq!(lines.len(), 1);
1743
1744        let parsed: Value = serde_json::from_str(lines[0]).expect("parse flow_control stall");
1745        assert_eq!(parsed["event"], "flow_control");
1746        assert_eq!(parsed["payload"]["direction"], "output");
1747        assert_eq!(parsed["payload"]["action"], "stall");
1748        assert_eq!(parsed["payload"]["window_bytes"], fc.output_window);
1749        assert_eq!(parsed["payload"]["queued_bytes"], fc.output_queue_bytes());
1750
1751        let _ = std::fs::remove_file(&path);
1752        let _ = std::fs::remove_dir(&dir);
1753    }
1754
1755    #[cfg(unix)]
1756    #[test]
1757    fn bridge_smoke_echoes_bytes_through_pty() {
1758        let listener =
1759            TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("bind ephemeral port");
1760        let bind_addr = listener.local_addr().expect("local addr");
1761        drop(listener);
1762
1763        let config = WsPtyBridgeConfig {
1764            bind_addr,
1765            accept_once: true,
1766            command: "/bin/sh".to_string(),
1767            args: vec!["-c".to_string(), "cat".to_string()],
1768            idle_sleep: Duration::from_millis(1),
1769            ..WsPtyBridgeConfig::default()
1770        };
1771
1772        let handle = thread::spawn(move || run_ws_pty_bridge(config));
1773        thread::sleep(Duration::from_millis(75));
1774
1775        let url = format!("ws://{bind_addr}/ws");
1776        let (mut client, _response) = connect(url).expect("connect websocket");
1777        if let MaybeTlsStream::Plain(stream) = client.get_mut() {
1778            stream
1779                .set_read_timeout(Some(Duration::from_millis(50)))
1780                .expect("set read timeout");
1781        }
1782        client
1783            .send(Message::Binary(b"hello-through-bridge\n".to_vec().into()))
1784            .expect("send input");
1785
1786        let deadline = Instant::now() + Duration::from_secs(3);
1787        let mut observed = Vec::new();
1788        let mut last_error: Option<WsError> = None;
1789        while Instant::now() < deadline {
1790            match client.read() {
1791                Ok(Message::Binary(bytes)) => {
1792                    observed.extend_from_slice(bytes.as_ref());
1793                    if observed
1794                        .windows(b"hello-through-bridge".len())
1795                        .any(|window| window == b"hello-through-bridge")
1796                    {
1797                        break;
1798                    }
1799                }
1800                Ok(_) => {}
1801                Err(WsError::Io(error))
1802                    if matches!(
1803                        error.kind(),
1804                        io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
1805                    ) => {}
1806                Err(error) => {
1807                    last_error = Some(error);
1808                    break;
1809                }
1810            }
1811        }
1812
1813        let saw_echo = observed
1814            .windows(b"hello-through-bridge".len())
1815            .any(|window| window == b"hello-through-bridge");
1816
1817        if let Err(err) = client.send(Message::Text(r#"{"type":"close"}"#.to_string().into())) {
1818            // Preserve the first meaningful error for assertion diagnostics.
1819            if last_error.is_none() {
1820                last_error = Some(err);
1821            }
1822        }
1823        let result = handle.join().expect("bridge thread join");
1824        result.expect("bridge result");
1825
1826        assert!(
1827            saw_echo,
1828            "expected PTY echo in websocket output; last_error={last_error:?}; observed_len={}",
1829            observed.len()
1830        );
1831    }
1832
1833    // --- FlowControlBridgeConfig ---
1834
1835    #[test]
1836    fn flow_control_bridge_config_default() {
1837        let c = FlowControlBridgeConfig::default();
1838        assert_eq!(c.output_window, 65_536);
1839        assert_eq!(c.input_window, 8_192);
1840        assert_eq!(c.coalesce_resize_ms, 50);
1841    }
1842
1843    #[test]
1844    fn flow_control_bridge_config_debug_clone() {
1845        let c = FlowControlBridgeConfig::default();
1846        let c2 = c.clone();
1847        let dbg = format!("{c2:?}");
1848        assert!(dbg.contains("FlowControlBridgeConfig"));
1849        assert!(dbg.contains("output_window"));
1850    }
1851
1852    // --- FlowControlBridgeState: output queuing ---
1853
1854    fn default_fc_state() -> FlowControlBridgeState {
1855        FlowControlBridgeState::new(&FlowControlBridgeConfig::default())
1856    }
1857
1858    #[test]
1859    fn fc_state_new_has_empty_queue() {
1860        let fc = default_fc_state();
1861        assert_eq!(fc.output_queue_bytes(), 0);
1862        assert!(!fc.pty_reads_paused);
1863        assert_eq!(fc.input_pending_bytes, 0);
1864    }
1865
1866    #[test]
1867    fn fc_enqueue_and_drain_output() {
1868        let mut fc = default_fc_state();
1869        fc.enqueue_output(&[0xAA; 100]);
1870        assert_eq!(fc.output_queue_bytes(), 100);
1871
1872        let batch = fc.drain_output(50);
1873        assert_eq!(batch.len(), 50);
1874        assert_eq!(fc.output_queue_bytes(), 50);
1875
1876        let batch2 = fc.drain_output(100);
1877        assert_eq!(batch2.len(), 50);
1878        assert_eq!(fc.output_queue_bytes(), 0);
1879    }
1880
1881    #[test]
1882    fn fc_enqueue_respects_hard_cap() {
1883        let mut config = FlowControlBridgeConfig::default();
1884        config.policy.output_hard_cap_bytes = 200;
1885        let mut fc = FlowControlBridgeState::new(&config);
1886
1887        fc.enqueue_output(&[0xBB; 300]);
1888        // Only 200 bytes should be enqueued (hard cap)
1889        assert_eq!(fc.output_queue_bytes(), 200);
1890    }
1891
1892    #[test]
1893    fn fc_drain_respects_window() {
1894        let config = FlowControlBridgeConfig {
1895            output_window: 50,
1896            ..FlowControlBridgeConfig::default()
1897        };
1898        let mut fc = FlowControlBridgeState::new(&config);
1899
1900        fc.enqueue_output(&[0xCC; 200]);
1901        let batch = fc.drain_output(200);
1902        // Window allows only 50 bytes
1903        assert_eq!(batch.len(), 50);
1904        assert_eq!(fc.output_queue_bytes(), 150);
1905    }
1906
1907    #[test]
1908    fn fc_drain_all_output_ignores_window() {
1909        let config = FlowControlBridgeConfig {
1910            output_window: 10,
1911            ..FlowControlBridgeConfig::default()
1912        };
1913        let mut fc = FlowControlBridgeState::new(&config);
1914
1915        fc.enqueue_output(&[0xDD; 100]);
1916        let batch = fc.drain_all_output();
1917        assert_eq!(batch.len(), 100);
1918        assert_eq!(fc.output_queue_bytes(), 0);
1919    }
1920
1921    #[test]
1922    fn fc_output_queue_peak_tracked() {
1923        let mut fc = default_fc_state();
1924        fc.enqueue_output(&[0x01; 500]);
1925        assert_eq!(fc.fc_counters.output_queue_peak_bytes, 500);
1926        fc.drain_output(200);
1927        fc.enqueue_output(&[0x02; 100]);
1928        // Peak should still be 500
1929        assert_eq!(fc.fc_counters.output_queue_peak_bytes, 500);
1930        fc.enqueue_output(&[0x03; 500]);
1931        // Now peak should be 900 (300 remaining + 500 new, but capped)
1932        assert!(fc.fc_counters.output_queue_peak_bytes >= 800);
1933    }
1934
1935    // --- FlowControlBridgeState: input tracking ---
1936
1937    #[test]
1938    fn fc_input_arrival_and_service() {
1939        let mut fc = default_fc_state();
1940        fc.record_input_arrival(100);
1941        assert_eq!(fc.input_pending_bytes, 100);
1942        assert_eq!(fc.input_consumed, 100);
1943
1944        fc.record_input_serviced(60);
1945        assert_eq!(fc.input_pending_bytes, 40);
1946        assert_eq!(fc.serviced_input_bytes, 60);
1947    }
1948
1949    #[test]
1950    fn fc_should_drop_interactive_never() {
1951        let mut config = FlowControlBridgeConfig::default();
1952        config.policy.input_hard_cap_bytes = 100;
1953        let mut fc = FlowControlBridgeState::new(&config);
1954        fc.input_pending_bytes = 200; // way over hard cap
1955        // Interactive events are never dropped
1956        assert!(!fc.should_drop_input(InputEventClass::Interactive));
1957    }
1958
1959    #[test]
1960    fn fc_should_drop_noninteractive_at_hard_cap() {
1961        let mut config = FlowControlBridgeConfig::default();
1962        config.policy.input_hard_cap_bytes = 100;
1963        let mut fc = FlowControlBridgeState::new(&config);
1964
1965        fc.input_pending_bytes = 50;
1966        assert!(!fc.should_drop_input(InputEventClass::NonInteractive));
1967
1968        fc.input_pending_bytes = 100;
1969        assert!(fc.should_drop_input(InputEventClass::NonInteractive));
1970
1971        fc.input_pending_bytes = 200;
1972        assert!(fc.should_drop_input(InputEventClass::NonInteractive));
1973    }
1974
1975    // --- FlowControlBridgeState: resize coalescing ---
1976
1977    #[test]
1978    fn fc_coalesce_resize_disabled() {
1979        let config = FlowControlBridgeConfig {
1980            coalesce_resize_ms: 0,
1981            ..FlowControlBridgeConfig::default()
1982        };
1983        let mut fc = FlowControlBridgeState::new(&config);
1984
1985        fc.coalesce_resize(80, 24);
1986        let result = fc.flush_pending_resize();
1987        assert_eq!(result, Some((80, 24)));
1988    }
1989
1990    #[test]
1991    fn fc_coalesce_resize_defers() {
1992        let config = FlowControlBridgeConfig {
1993            coalesce_resize_ms: 100, // 100ms window
1994            ..FlowControlBridgeConfig::default()
1995        };
1996        let mut fc = FlowControlBridgeState::new(&config);
1997
1998        fc.coalesce_resize(80, 24);
1999        // Immediately after, flush should return None (timer not elapsed)
2000        let result = fc.flush_pending_resize();
2001        assert!(result.is_none());
2002    }
2003
2004    #[test]
2005    fn fc_coalesce_resize_overwrite() {
2006        let config = FlowControlBridgeConfig {
2007            coalesce_resize_ms: 0,
2008            ..FlowControlBridgeConfig::default()
2009        };
2010        let mut fc = FlowControlBridgeState::new(&config);
2011
2012        fc.coalesce_resize(80, 24);
2013        fc.coalesce_resize(120, 40); // Overwrite pending
2014        let result = fc.flush_pending_resize();
2015        assert_eq!(result, Some((120, 40))); // Gets the latest
2016    }
2017
2018    #[test]
2019    fn fc_coalesce_resize_flushes_after_timeout() {
2020        let config = FlowControlBridgeConfig {
2021            coalesce_resize_ms: 10, // 10ms window
2022            ..FlowControlBridgeConfig::default()
2023        };
2024        let mut fc = FlowControlBridgeState::new(&config);
2025
2026        fc.coalesce_resize(132, 50);
2027        thread::sleep(Duration::from_millis(15));
2028        let result = fc.flush_pending_resize();
2029        assert_eq!(result, Some((132, 50)));
2030        assert_eq!(fc.fc_counters.resizes_coalesced, 1);
2031    }
2032
2033    #[test]
2034    fn fc_flush_no_pending_resize() {
2035        let mut fc = default_fc_state();
2036        assert_eq!(fc.flush_pending_resize(), None);
2037    }
2038
2039    // --- FlowControlBridgeState: snapshot and evaluation ---
2040
2041    #[test]
2042    fn fc_build_snapshot_empty_state() {
2043        let fc = default_fc_state();
2044        let snapshot = fc.build_snapshot();
2045        assert_eq!(snapshot.queues.input, 0);
2046        assert_eq!(snapshot.queues.output, 0);
2047        assert_eq!(snapshot.serviced_input_bytes, 0);
2048        assert_eq!(snapshot.serviced_output_bytes, 0);
2049    }
2050
2051    #[test]
2052    fn fc_build_snapshot_with_data() {
2053        let mut fc = default_fc_state();
2054        fc.enqueue_output(&[0xAA; 1000]);
2055        fc.record_input_arrival(200);
2056        fc.record_input_serviced(150);
2057
2058        let snapshot = fc.build_snapshot();
2059        assert_eq!(snapshot.queues.output, 1000);
2060        assert_eq!(snapshot.queues.input, 50);
2061        assert_eq!(snapshot.serviced_input_bytes, 150);
2062    }
2063
2064    #[test]
2065    fn fc_evaluate_no_pause_when_idle() {
2066        let mut fc = default_fc_state();
2067        let decision = fc.evaluate();
2068        // When idle, the output queue is empty so reads should not be paused
2069        assert!(!decision.should_pause_pty_reads);
2070    }
2071
2072    #[test]
2073    fn fc_evaluate_pauses_reads_at_hard_cap() {
2074        let mut config = FlowControlBridgeConfig::default();
2075        config.policy.output_hard_cap_bytes = 500;
2076        let mut fc = FlowControlBridgeState::new(&config);
2077
2078        // Fill queue to hard cap
2079        fc.enqueue_output(&[0xAA; 500]);
2080        let decision = fc.evaluate();
2081        assert!(decision.should_pause_pty_reads);
2082        assert!(fc.pty_reads_paused);
2083        assert_eq!(fc.fc_counters.pty_read_pauses, 1);
2084    }
2085
2086    #[test]
2087    fn fc_evaluate_resumes_reads_after_drain() {
2088        let mut config = FlowControlBridgeConfig::default();
2089        config.policy.output_hard_cap_bytes = 500;
2090        config.output_window = 500;
2091        let mut fc = FlowControlBridgeState::new(&config);
2092
2093        fc.enqueue_output(&[0xAA; 500]);
2094        let _ = fc.evaluate();
2095        assert!(fc.pty_reads_paused);
2096
2097        // Drain below hard cap
2098        let _ = fc.drain_output(400);
2099        let decision = fc.evaluate();
2100        assert!(!decision.should_pause_pty_reads);
2101        assert!(!fc.pty_reads_paused);
2102    }
2103
2104    // --- FlowControlBridgeState: replenishment ---
2105
2106    #[test]
2107    fn fc_replenish_at_50_percent_consumption() {
2108        let mut fc = default_fc_state();
2109        // input_window is 8192, so 50% = 4096
2110        fc.input_consumed = 4096;
2111        assert!(fc.should_send_replenish());
2112    }
2113
2114    #[test]
2115    fn fc_replenish_not_at_low_consumption() {
2116        let fc = default_fc_state();
2117        // Just created, no consumption
2118        // But note: should_replenish also checks elapsed time (10ms default)
2119        // Since we just created it, time elapsed is ~0ms
2120        // At 0 consumed, not at threshold... but the policy checks elapsed too
2121        // Let's just verify the API works
2122        let result = fc.should_send_replenish();
2123        // Could be true or false depending on timing; just ensure no panic
2124        let _ = result;
2125    }
2126
2127    #[test]
2128    fn fc_record_replenish_resets_state() {
2129        let mut fc = default_fc_state();
2130        fc.input_consumed = 5000;
2131        fc.record_replenish_sent();
2132        assert_eq!(fc.input_consumed, 0);
2133        assert_eq!(fc.fc_counters.replenishments_sent, 1);
2134    }
2135
2136    // --- FlowControlBridgeState: flow control message ---
2137
2138    #[test]
2139    fn fc_process_flow_control_msg_replenishes_window() {
2140        let mut fc = default_fc_state();
2141        fc.output_consumed = 30000;
2142        fc.process_flow_control_msg(20000);
2143        assert_eq!(fc.output_consumed, 10000);
2144    }
2145
2146    #[test]
2147    fn fc_process_flow_control_msg_saturates() {
2148        let mut fc = default_fc_state();
2149        fc.output_consumed = 100;
2150        fc.process_flow_control_msg(200);
2151        assert_eq!(fc.output_consumed, 0);
2152    }
2153
2154    // --- FlowControlBridgeState: rate window ---
2155
2156    #[test]
2157    fn fc_rate_window_resets_after_interval() {
2158        let mut fc = default_fc_state();
2159        fc.rate_in_arrived = 1000;
2160        fc.rate_out_arrived = 2000;
2161        fc.rate_in_serviced = 800;
2162        fc.rate_out_serviced = 1500;
2163        // Pretend 2 seconds elapsed
2164        fc.rate_window_start = Instant::now() - Duration::from_secs(2);
2165        fc.maybe_reset_rate_window();
2166        assert_eq!(fc.rate_in_arrived, 0);
2167        assert_eq!(fc.rate_out_arrived, 0);
2168        assert_eq!(fc.rate_in_serviced, 0);
2169        assert_eq!(fc.rate_out_serviced, 0);
2170    }
2171
2172    #[test]
2173    fn fc_rate_window_does_not_reset_early() {
2174        let mut fc = default_fc_state();
2175        fc.rate_in_arrived = 1000;
2176        fc.maybe_reset_rate_window();
2177        assert_eq!(fc.rate_in_arrived, 1000);
2178    }
2179
2180    // --- FlowControlBridgeState: summary ---
2181
2182    #[test]
2183    fn fc_summary_json_contains_expected_fields() {
2184        let mut fc = default_fc_state();
2185        fc.fc_counters.input_drops = 5;
2186        fc.fc_counters.decisions_non_stable = 10;
2187        fc.serviced_input_bytes = 1000;
2188        fc.serviced_output_bytes = 2000;
2189
2190        let summary = fc.summary_json();
2191        assert_eq!(summary["input_drops"], 5);
2192        assert_eq!(summary["decisions_non_stable"], 10);
2193        assert_eq!(summary["serviced_input_bytes"], 1000);
2194        assert_eq!(summary["serviced_output_bytes"], 2000);
2195    }
2196
2197    // --- WsPtyBridgeConfig with flow control ---
2198
2199    #[test]
2200    fn config_default_no_flow_control() {
2201        let c = WsPtyBridgeConfig::default();
2202        assert!(c.flow_control.is_none());
2203    }
2204
2205    #[test]
2206    fn config_with_flow_control() {
2207        let c = WsPtyBridgeConfig {
2208            flow_control: Some(FlowControlBridgeConfig::default()),
2209            ..WsPtyBridgeConfig::default()
2210        };
2211        assert!(c.flow_control.is_some());
2212        assert_eq!(c.flow_control.as_ref().unwrap().output_window, 65_536);
2213    }
2214
2215    // --- Integration: smoke test with flow control enabled ---
2216
2217    #[cfg(unix)]
2218    #[test]
2219    fn bridge_smoke_with_flow_control() {
2220        let listener =
2221            TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("bind ephemeral port");
2222        let bind_addr = listener.local_addr().expect("local addr");
2223        drop(listener);
2224
2225        let config = WsPtyBridgeConfig {
2226            bind_addr,
2227            accept_once: true,
2228            command: "/bin/sh".to_string(),
2229            args: vec!["-c".to_string(), "cat".to_string()],
2230            idle_sleep: Duration::from_millis(1),
2231            flow_control: Some(FlowControlBridgeConfig::default()),
2232            ..WsPtyBridgeConfig::default()
2233        };
2234
2235        let handle = thread::spawn(move || run_ws_pty_bridge(config));
2236        thread::sleep(Duration::from_millis(75));
2237
2238        let url = format!("ws://{bind_addr}/ws");
2239        let (mut client, _response) = connect(url).expect("connect websocket");
2240        if let MaybeTlsStream::Plain(stream) = client.get_mut() {
2241            stream
2242                .set_read_timeout(Some(Duration::from_millis(50)))
2243                .expect("set read timeout");
2244        }
2245        client
2246            .send(Message::Binary(b"fc-echo-test\n".to_vec().into()))
2247            .expect("send input");
2248
2249        let deadline = Instant::now() + Duration::from_secs(3);
2250        let mut observed = Vec::new();
2251        let mut last_error: Option<WsError> = None;
2252        while Instant::now() < deadline {
2253            match client.read() {
2254                Ok(Message::Binary(bytes)) => {
2255                    observed.extend_from_slice(bytes.as_ref());
2256                    if observed
2257                        .windows(b"fc-echo-test".len())
2258                        .any(|window| window == b"fc-echo-test")
2259                    {
2260                        break;
2261                    }
2262                }
2263                Ok(_) => {}
2264                Err(WsError::Io(error))
2265                    if matches!(
2266                        error.kind(),
2267                        io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
2268                    ) => {}
2269                Err(error) => {
2270                    last_error = Some(error);
2271                    break;
2272                }
2273            }
2274        }
2275
2276        let saw_echo = observed
2277            .windows(b"fc-echo-test".len())
2278            .any(|window| window == b"fc-echo-test");
2279
2280        if let Err(err) = client.send(Message::Text(r#"{"type":"close"}"#.to_string().into()))
2281            && last_error.is_none()
2282        {
2283            last_error = Some(err);
2284        }
2285        let result = handle.join().expect("bridge thread join");
2286        result.expect("bridge result");
2287
2288        assert!(
2289            saw_echo,
2290            "expected PTY echo with flow control; last_error={last_error:?}; observed_len={}",
2291            observed.len()
2292        );
2293    }
2294
2295    #[cfg(unix)]
2296    #[test]
2297    fn pty_bridge_session_drop_does_not_block_when_background_process_keeps_pty_open() {
2298        let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string());
2299        let (done_tx, done_rx) = mpsc::channel();
2300        let drop_thread = thread::spawn(move || {
2301            let session = PtyBridgeSession::spawn(&WsPtyBridgeConfig {
2302                command: shell,
2303                args: vec!["-c".to_string(), "sleep 1 >/dev/null 2>&1 &".to_string()],
2304                telemetry_path: None,
2305                ..WsPtyBridgeConfig::default()
2306            })
2307            .expect("spawn bridge PTY");
2308            drop(session);
2309            done_tx.send(()).expect("signal drop completion");
2310        });
2311
2312        assert!(
2313            done_rx.recv_timeout(Duration::from_millis(400)).is_ok(),
2314            "PtyBridgeSession drop should not wait for background descendants to close the PTY"
2315        );
2316        drop_thread.join().expect("drop thread join");
2317    }
2318}