1use std::collections::VecDeque;
12use std::fs::{File, OpenOptions};
13use std::io::{self, Read, Write};
14use std::net::{SocketAddr, TcpListener, TcpStream};
15use std::path::{Path, PathBuf};
16use std::sync::mpsc;
17use std::thread;
18use std::time::{Duration, Instant};
19
20use frankenterm_core::flow_control::{
21 FlowControlConfig, FlowControlPolicy, FlowControlSnapshot, InputEventClass, LatencyWindowMs,
22 QueueDepthBytes, RateWindowBps,
23};
24use portable_pty::{Child, CommandBuilder, ExitStatus, MasterPty, PtySize};
25use serde_json::{Value, json};
26use time::OffsetDateTime;
27use time::format_description::well_known::Rfc3339;
28use tungstenite::handshake::server::{ErrorResponse, Request, Response};
29use tungstenite::http::StatusCode;
30use tungstenite::protocol::WebSocketConfig;
31use tungstenite::{Error as WsError, Message, WebSocket, accept_hdr_with_config};
32
33#[derive(Debug, Clone)]
35pub struct WsPtyBridgeConfig {
36 pub bind_addr: SocketAddr,
38 pub command: String,
40 pub args: Vec<String>,
42 pub term: String,
44 pub env: Vec<(String, String)>,
46 pub cols: u16,
48 pub rows: u16,
50 pub allowed_origins: Vec<String>,
52 pub auth_token: Option<String>,
54 pub telemetry_path: Option<PathBuf>,
56 pub max_message_bytes: usize,
58 pub idle_sleep: Duration,
60 pub accept_once: bool,
62 pub flow_control: Option<FlowControlBridgeConfig>,
66}
67
68impl Default for WsPtyBridgeConfig {
69 fn default() -> Self {
70 let command = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string());
71 Self {
72 bind_addr: SocketAddr::from(([127, 0, 0, 1], 9231)),
73 command,
74 args: Vec::new(),
75 term: "xterm-256color".to_string(),
76 env: Vec::new(),
77 cols: 120,
78 rows: 40,
79 allowed_origins: Vec::new(),
80 auth_token: None,
81 telemetry_path: None,
82 max_message_bytes: 256 * 1024,
83 idle_sleep: Duration::from_millis(5),
84 accept_once: true,
85 flow_control: None,
86 }
87 }
88}
89
90#[derive(Debug, Clone, PartialEq, Eq)]
92pub struct BridgeSummary {
93 pub session_id: String,
95 pub ws_in_bytes: u64,
97 pub ws_out_bytes: u64,
99 pub pty_in_bytes: u64,
101 pub pty_out_bytes: u64,
103 pub resize_events: u64,
105 pub exit_code: Option<u32>,
107 pub exit_signal: Option<String>,
109}
110
111impl BridgeSummary {
112 #[must_use]
113 fn as_json(&self) -> Value {
114 json!({
115 "session_id": self.session_id,
116 "ws_in_bytes": self.ws_in_bytes,
117 "ws_out_bytes": self.ws_out_bytes,
118 "pty_in_bytes": self.pty_in_bytes,
119 "pty_out_bytes": self.pty_out_bytes,
120 "resize_events": self.resize_events,
121 "exit_code": self.exit_code,
122 "exit_signal": self.exit_signal,
123 })
124 }
125}
126
127pub fn run_ws_pty_bridge(config: WsPtyBridgeConfig) -> io::Result<()> {
133 let listener = TcpListener::bind(config.bind_addr)?;
134
135 loop {
136 let (stream, peer_addr) = listener.accept()?;
137 let session_id = make_session_id();
138 let mut telemetry = TelemetrySink::new(config.telemetry_path.as_deref(), &session_id)?;
139 telemetry.write(
140 "bridge_session_start",
141 json!({
142 "peer": peer_addr.to_string(),
143 "bind_addr": config.bind_addr.to_string(),
144 "command": config.command,
145 "args": config.args,
146 "cols": config.cols,
147 "rows": config.rows,
148 "term": config.term,
149 "max_message_bytes": config.max_message_bytes,
150 }),
151 )?;
152
153 let result = run_single_session(stream, &config, &session_id, &mut telemetry);
154 match result {
155 Ok(summary) => {
156 telemetry.write("bridge_session_end", summary.as_json())?;
157 }
158 Err(error) => {
159 telemetry.write(
160 "bridge_session_error",
161 json!({ "error": error.to_string() }),
162 )?;
163 if config.accept_once {
164 return Err(error);
165 }
166 }
167 }
168
169 if config.accept_once {
170 break;
171 }
172 }
173
174 Ok(())
175}
176
177fn make_summary(
178 session_id: &str,
179 counters: &Counters,
180 exit_code: Option<u32>,
181 exit_signal: Option<String>,
182) -> BridgeSummary {
183 BridgeSummary {
184 session_id: session_id.to_string(),
185 ws_in_bytes: counters.ws_in_bytes,
186 ws_out_bytes: counters.ws_out_bytes,
187 pty_in_bytes: counters.pty_in_bytes,
188 pty_out_bytes: counters.pty_out_bytes,
189 resize_events: counters.resize_events,
190 exit_code,
191 exit_signal,
192 }
193}
194
195fn run_single_session(
196 stream: TcpStream,
197 config: &WsPtyBridgeConfig,
198 session_id: &str,
199 telemetry: &mut TelemetrySink,
200) -> io::Result<BridgeSummary> {
201 stream.set_nodelay(true)?;
202 let mut websocket = accept_websocket(stream, config)?;
203 websocket.get_mut().set_nonblocking(true)?;
204
205 let mut pty = PtyBridgeSession::spawn(config)?;
206 let mut counters = Counters::default();
207 let mut exit_code = None;
208 let mut exit_signal: Option<String> = None;
209
210 let mut fc_state: Option<FlowControlBridgeState> = config
211 .flow_control
212 .as_ref()
213 .map(FlowControlBridgeState::new);
214
215 if let Some(ref fc_config) = config.flow_control {
216 telemetry.write(
217 "flow_control_enabled",
218 json!({
219 "output_window": fc_config.output_window,
220 "input_window": fc_config.input_window,
221 "coalesce_resize_ms": fc_config.coalesce_resize_ms,
222 }),
223 )?;
224 }
225
226 loop {
227 let mut progressed = false;
228
229 loop {
231 match websocket.read() {
232 Ok(message) => {
233 progressed = true;
234 if handle_ws_message(
235 &mut websocket,
236 &mut pty,
237 &mut counters,
238 telemetry,
239 message,
240 &mut fc_state,
241 )? {
242 if let Some(ref fc) = fc_state {
243 telemetry.write("flow_control_summary", fc.summary_json())?;
244 }
245 return Ok(make_summary(session_id, &counters, exit_code, exit_signal));
246 }
247 }
248 Err(WsError::Io(error)) if error.kind() == io::ErrorKind::WouldBlock => break,
249 Err(WsError::ConnectionClosed | WsError::AlreadyClosed) => {
250 if let Some(ref fc) = fc_state {
251 telemetry.write("flow_control_summary", fc.summary_json())?;
252 }
253 return Ok(make_summary(session_id, &counters, exit_code, exit_signal));
254 }
255 Err(error) => {
256 return Err(io::Error::other(format!("websocket read failed: {error}")));
257 }
258 }
259 }
260
261 if let Some(ref mut fc) = fc_state
263 && let Some((cols, rows)) = fc.flush_pending_resize()
264 {
265 pty.resize(cols, rows)?;
266 counters.resize_events = counters.resize_events.saturating_add(1);
267 telemetry.write(
268 "bridge_resize",
269 json!({ "cols": cols, "rows": rows, "coalesced": true }),
270 )?;
271 }
272
273 let read_pty = fc_state.as_ref().is_none_or(|fc| !fc.pty_reads_paused);
275 if read_pty {
276 let output = pty.drain_output_nonblocking()?;
277 if !output.is_empty() {
278 progressed = true;
279 counters.pty_out_bytes = counters
280 .pty_out_bytes
281 .saturating_add(u64::try_from(output.len()).unwrap_or(u64::MAX));
282
283 match fc_state {
284 Some(ref mut fc) => {
285 fc.enqueue_output(&output);
286 }
287 None => {
288 counters.ws_out_bytes = counters
289 .ws_out_bytes
290 .saturating_add(u64::try_from(output.len()).unwrap_or(u64::MAX));
291 send_ws_message(&mut websocket, Message::binary(output))?;
292 }
293 }
294 }
295 }
296
297 if let Some(ref mut fc) = fc_state {
299 fc.maybe_reset_rate_window();
300 let was_paused = fc.pty_reads_paused;
301 let decision = fc.evaluate();
302
303 if decision.chosen_action.is_some() {
305 telemetry.write(
306 "flow_control_decision",
307 json!({
308 "action": format!("{:?}", decision.chosen_action),
309 "reason": format!("{:?}", decision.reason),
310 "fairness_index": decision.fairness_index,
311 "output_batch_budget": decision.output_batch_budget_bytes,
312 "should_pause_pty_reads": decision.should_pause_pty_reads,
313 "output_queue_bytes": fc.output_queue_bytes(),
314 }),
315 )?;
316 }
317
318 emit_flow_control_stall_if_transitioned(telemetry, fc, was_paused)?;
319
320 let batch = fc.drain_output(decision.output_batch_budget_bytes);
322 if !batch.is_empty() {
323 progressed = true;
324 counters.ws_out_bytes = counters
325 .ws_out_bytes
326 .saturating_add(u64::try_from(batch.len()).unwrap_or(u64::MAX));
327 send_ws_message(&mut websocket, Message::binary(batch))?;
328 }
329
330 if fc.should_send_replenish() {
332 emit_flow_control_event(
333 telemetry,
334 "input",
335 "replenish",
336 fc.input_window,
337 fc.input_pending_bytes,
338 )?;
339 telemetry.write(
340 "flow_control_replenish",
341 json!({
342 "input_consumed": fc.input_consumed,
343 "input_window": fc.input_window,
344 }),
345 )?;
346 fc.record_replenish_sent();
347 }
348 }
349
350 if let Some(status) = pty.try_wait()? {
352 exit_code = Some(status.exit_code());
353 exit_signal = status.signal().map(ToOwned::to_owned);
354
355 let trailing = pty.drain_output_nonblocking()?;
356 if !trailing.is_empty() {
357 counters.pty_out_bytes = counters
358 .pty_out_bytes
359 .saturating_add(u64::try_from(trailing.len()).unwrap_or(u64::MAX));
360
361 match fc_state {
362 Some(ref mut fc) => {
363 fc.enqueue_output(&trailing);
364 let remaining = fc.drain_all_output();
365 if !remaining.is_empty() {
366 counters.ws_out_bytes = counters
367 .ws_out_bytes
368 .saturating_add(u64::try_from(remaining.len()).unwrap_or(u64::MAX));
369 send_ws_message(&mut websocket, Message::binary(remaining))?;
370 }
371 }
372 None => {
373 counters.ws_out_bytes = counters
374 .ws_out_bytes
375 .saturating_add(u64::try_from(trailing.len()).unwrap_or(u64::MAX));
376 send_ws_message(&mut websocket, Message::binary(trailing))?;
377 }
378 }
379 }
380
381 if let Some(ref fc) = fc_state {
382 telemetry.write("flow_control_summary", fc.summary_json())?;
383 }
384
385 let end = json!({
386 "type": "session_end",
387 "exit_code": exit_code,
388 "exit_signal": exit_signal,
389 });
390 send_ws_message(&mut websocket, Message::text(end.to_string()))?;
391 let _ = websocket.close(None);
392 return Ok(make_summary(
393 session_id,
394 &counters,
395 exit_code,
396 exit_signal.clone(),
397 ));
398 }
399
400 if !progressed {
401 thread::sleep(config.idle_sleep);
402 }
403 }
404}
405
406fn handle_ws_message(
407 websocket: &mut WebSocket<TcpStream>,
408 pty: &mut PtyBridgeSession,
409 counters: &mut Counters,
410 telemetry: &mut TelemetrySink,
411 message: Message,
412 fc_state: &mut Option<FlowControlBridgeState>,
413) -> io::Result<bool> {
414 match message {
415 Message::Binary(bytes) => {
416 let byte_len = bytes.len();
417 let len32 = u32::try_from(byte_len).unwrap_or(u32::MAX);
418 counters.ws_in_bytes = counters
419 .ws_in_bytes
420 .saturating_add(u64::try_from(byte_len).unwrap_or(u64::MAX));
421
422 if let Some(ref mut fc) = *fc_state {
427 if fc.should_drop_input(InputEventClass::Interactive) {
428 fc.fc_counters.input_drops = fc.fc_counters.input_drops.saturating_add(1);
432 telemetry.write(
433 "flow_control_input_drop",
434 json!({
435 "bytes": byte_len,
436 "input_queue_bytes": fc.input_pending_bytes,
437 }),
438 )?;
439 return Ok(false);
440 }
441 fc.record_input_arrival(len32);
442 }
443
444 pty.send_input(bytes.as_ref())?;
445 counters.pty_in_bytes = counters
446 .pty_in_bytes
447 .saturating_add(u64::try_from(byte_len).unwrap_or(u64::MAX));
448
449 if let Some(ref mut fc) = *fc_state {
450 fc.record_input_serviced(len32);
451 }
452
453 telemetry.write("bridge_input", json!({ "bytes": byte_len }))?;
454 Ok(false)
455 }
456 Message::Text(text) => match parse_control_message(text.as_ref())? {
457 Some(ControlMessage::Resize { cols, rows }) => {
458 match *fc_state {
459 Some(ref mut fc) => {
460 fc.coalesce_resize(cols, rows);
462 }
463 None => {
464 pty.resize(cols, rows)?;
465 counters.resize_events = counters.resize_events.saturating_add(1);
466 telemetry.write("bridge_resize", json!({ "cols": cols, "rows": rows }))?;
467 }
468 }
469 Ok(false)
470 }
471 Some(ControlMessage::Ping) => {
472 send_ws_message(websocket, Message::Pong(Vec::<u8>::new().into()))?;
473 Ok(false)
474 }
475 Some(ControlMessage::Close) => Ok(true),
476 None => {
477 send_ws_message(
478 websocket,
479 Message::text(
480 json!({ "type": "warning", "message": "unknown_control_message" })
481 .to_string(),
482 ),
483 )?;
484 Ok(false)
485 }
486 },
487 Message::Ping(payload) => {
488 send_ws_message(websocket, Message::Pong(payload))?;
489 Ok(false)
490 }
491 Message::Pong(_) | Message::Frame(_) => Ok(false),
492 Message::Close(_) => Ok(true),
493 }
494}
495
496fn send_ws_message(websocket: &mut WebSocket<TcpStream>, message: Message) -> io::Result<()> {
497 let mut retries = 0_u8;
498 loop {
499 match websocket.send(message.clone()) {
500 Ok(()) => return Ok(()),
501 Err(WsError::Io(error)) if error.kind() == io::ErrorKind::WouldBlock && retries < 5 => {
502 retries = retries.saturating_add(1);
503 thread::sleep(Duration::from_millis(2));
504 }
505 Err(error) => {
506 return Err(io::Error::other(format!("websocket send failed: {error}")));
507 }
508 }
509 }
510}
511
512#[allow(clippy::result_large_err)] fn accept_websocket(
514 stream: TcpStream,
515 config: &WsPtyBridgeConfig,
516) -> io::Result<WebSocket<TcpStream>> {
517 let allowed_origins = config.allowed_origins.clone();
518 let expected_token = config.auth_token.clone();
519 let ws_config = WebSocketConfig::default()
520 .max_message_size(Some(config.max_message_bytes))
521 .max_frame_size(Some(config.max_message_bytes))
522 .write_buffer_size(0);
523
524 let callback = move |request: &Request, response: Response| {
525 validate_upgrade_request(request, &allowed_origins, expected_token.as_deref())
526 .map(|()| response)
527 .map_err(HandshakeRejection::into_response)
528 };
529
530 accept_hdr_with_config(stream, callback, Some(ws_config)).map_err(|error| {
531 io::Error::new(
532 io::ErrorKind::PermissionDenied,
533 format!("websocket handshake failed: {error}"),
534 )
535 })
536}
537
538#[derive(Debug, Clone, Copy, PartialEq, Eq)]
539enum ControlMessage {
540 Resize { cols: u16, rows: u16 },
541 Ping,
542 Close,
543}
544
545fn parse_control_message(text: &str) -> io::Result<Option<ControlMessage>> {
546 let value: Value = serde_json::from_str(text).map_err(|error| {
547 io::Error::new(
548 io::ErrorKind::InvalidData,
549 format!("invalid control JSON: {error}"),
550 )
551 })?;
552
553 let msg_type = value.get("type").and_then(Value::as_str).ok_or_else(|| {
554 io::Error::new(io::ErrorKind::InvalidData, "control message missing `type`")
555 })?;
556
557 match msg_type {
558 "resize" => {
559 let cols = read_u16_field(&value, "cols")?;
560 let rows = read_u16_field(&value, "rows")?;
561 if cols == 0 || rows == 0 {
562 return Err(io::Error::new(
563 io::ErrorKind::InvalidInput,
564 "resize dimensions must be > 0",
565 ));
566 }
567 Ok(Some(ControlMessage::Resize { cols, rows }))
568 }
569 "ping" => Ok(Some(ControlMessage::Ping)),
570 "close" => Ok(Some(ControlMessage::Close)),
571 _ => Ok(None),
572 }
573}
574
575fn read_u16_field(value: &Value, key: &str) -> io::Result<u16> {
576 let raw = value.get(key).and_then(Value::as_u64).ok_or_else(|| {
577 io::Error::new(
578 io::ErrorKind::InvalidData,
579 format!("control message missing numeric `{key}`"),
580 )
581 })?;
582 u16::try_from(raw).map_err(|_| {
583 io::Error::new(
584 io::ErrorKind::InvalidData,
585 format!("`{key}` out of range for u16"),
586 )
587 })
588}
589
590#[derive(Debug, Clone, PartialEq, Eq)]
591struct HandshakeRejection {
592 status: StatusCode,
593 body: String,
594}
595
596impl HandshakeRejection {
597 fn into_response(self) -> ErrorResponse {
598 let mut response = ErrorResponse::new(Some(self.body));
599 *response.status_mut() = self.status;
600 response
601 }
602}
603
604fn validate_upgrade_request(
605 request: &Request,
606 allowed_origins: &[String],
607 expected_token: Option<&str>,
608) -> Result<(), HandshakeRejection> {
609 if !allowed_origins.is_empty() {
610 let origin = request
611 .headers()
612 .get("Origin")
613 .and_then(|value| value.to_str().ok())
614 .ok_or_else(|| HandshakeRejection {
615 status: StatusCode::FORBIDDEN,
616 body: "Origin header missing".to_string(),
617 })?;
618 let allowed = allowed_origins.iter().any(|allowed| allowed == origin);
619 if !allowed {
620 return Err(HandshakeRejection {
621 status: StatusCode::FORBIDDEN,
622 body: "Origin not allowed".to_string(),
623 });
624 }
625 }
626
627 if let Some(token) = expected_token {
628 let query = request.uri().query().ok_or_else(|| HandshakeRejection {
629 status: StatusCode::UNAUTHORIZED,
630 body: "Missing token".to_string(),
631 })?;
632 let presented = query_param(query, "token").ok_or_else(|| HandshakeRejection {
633 status: StatusCode::UNAUTHORIZED,
634 body: "Missing token".to_string(),
635 })?;
636 if presented != token {
637 return Err(HandshakeRejection {
638 status: StatusCode::UNAUTHORIZED,
639 body: "Invalid token".to_string(),
640 });
641 }
642 }
643
644 Ok(())
645}
646
647fn query_param<'a>(query: &'a str, key: &str) -> Option<&'a str> {
648 query.split('&').find_map(|pair| {
649 let mut pieces = pair.splitn(2, '=');
650 let name = pieces.next().unwrap_or_default();
651 let value = pieces.next().unwrap_or_default();
652 if name == key { Some(value) } else { None }
653 })
654}
655
656fn make_session_id() -> String {
657 let ts = OffsetDateTime::now_utc().unix_timestamp_nanos();
658 format!("ws-bridge-{}-{ts}", std::process::id())
659}
660
661fn emit_flow_control_event(
662 telemetry: &mut TelemetrySink,
663 direction: &'static str,
664 action: &'static str,
665 window_bytes: u32,
666 queued_bytes: u32,
667) -> io::Result<()> {
668 telemetry.write(
669 "flow_control",
670 json!({
671 "direction": direction,
672 "action": action,
673 "window_bytes": window_bytes,
674 "queued_bytes": queued_bytes,
675 }),
676 )
677}
678
679fn emit_flow_control_stall_if_transitioned(
680 telemetry: &mut TelemetrySink,
681 fc: &FlowControlBridgeState,
682 was_paused: bool,
683) -> io::Result<()> {
684 if !was_paused && fc.pty_reads_paused {
685 emit_flow_control_event(
686 telemetry,
687 "output",
688 "stall",
689 fc.output_window,
690 fc.output_queue_bytes(),
691 )?;
692 }
693 Ok(())
694}
695
696#[derive(Debug, Default)]
697struct Counters {
698 ws_in_bytes: u64,
699 ws_out_bytes: u64,
700 pty_in_bytes: u64,
701 pty_out_bytes: u64,
702 resize_events: u64,
703}
704
705#[derive(Debug, Clone)]
714pub struct FlowControlBridgeConfig {
715 pub output_window: u32,
717 pub input_window: u32,
719 pub coalesce_resize_ms: u32,
721 pub policy: FlowControlConfig,
723}
724
725impl Default for FlowControlBridgeConfig {
726 fn default() -> Self {
727 Self {
728 output_window: 65_536,
729 input_window: 8_192,
730 coalesce_resize_ms: 50,
731 policy: FlowControlConfig::default(),
732 }
733 }
734}
735
736#[derive(Debug, Default)]
738struct FlowControlCounters {
739 output_queue_peak_bytes: u32,
740 input_drops: u64,
741 decisions_non_stable: u64,
742 resizes_coalesced: u64,
743 pty_read_pauses: u64,
744 replenishments_sent: u64,
745}
746
747struct FlowControlBridgeState {
749 policy: FlowControlPolicy,
750
751 output_window: u32,
753 input_window: u32,
754 output_consumed: u32,
755 input_consumed: u32,
756
757 output_queue: VecDeque<u8>,
759
760 input_pending_bytes: u32,
762
763 serviced_input_bytes: u64,
765 serviced_output_bytes: u64,
766
767 last_replenish: Instant,
769 output_hard_cap_since: Option<Instant>,
770 rate_window_start: Instant,
771
772 rate_in_arrived: u32,
774 rate_out_arrived: u32,
775 rate_in_serviced: u32,
776 rate_out_serviced: u32,
777
778 coalesce_resize_ms: u32,
780 pending_resize: Option<(u16, u16, Instant)>,
781
782 pty_reads_paused: bool,
784
785 fc_counters: FlowControlCounters,
787}
788
789impl FlowControlBridgeState {
790 fn new(config: &FlowControlBridgeConfig) -> Self {
791 let now = Instant::now();
792 Self {
793 policy: FlowControlPolicy::new(config.policy),
794 output_window: config.output_window,
795 input_window: config.input_window,
796 output_consumed: 0,
797 input_consumed: 0,
798 output_queue: VecDeque::new(),
799 input_pending_bytes: 0,
800 serviced_input_bytes: 0,
801 serviced_output_bytes: 0,
802 last_replenish: now,
803 output_hard_cap_since: None,
804 rate_window_start: now,
805 rate_in_arrived: 0,
806 rate_out_arrived: 0,
807 rate_in_serviced: 0,
808 rate_out_serviced: 0,
809 coalesce_resize_ms: config.coalesce_resize_ms,
810 pending_resize: None,
811 pty_reads_paused: false,
812 fc_counters: FlowControlCounters::default(),
813 }
814 }
815
816 fn record_input_arrival(&mut self, len: u32) {
818 self.input_pending_bytes = self.input_pending_bytes.saturating_add(len);
819 self.rate_in_arrived = self.rate_in_arrived.saturating_add(len);
820 self.input_consumed = self.input_consumed.saturating_add(len);
821 }
822
823 fn record_input_serviced(&mut self, len: u32) {
825 self.input_pending_bytes = self.input_pending_bytes.saturating_sub(len);
826 self.serviced_input_bytes = self.serviced_input_bytes.saturating_add(len as u64);
827 self.rate_in_serviced = self.rate_in_serviced.saturating_add(len);
828 }
829
830 fn enqueue_output(&mut self, data: &[u8]) {
833 let hard_cap = self.policy.config.output_hard_cap_bytes as usize;
834 let available = hard_cap.saturating_sub(self.output_queue.len());
835 let to_add = data.len().min(available);
836 self.output_queue.extend(&data[..to_add]);
837 self.rate_out_arrived = self.rate_out_arrived.saturating_add(to_add as u32);
838
839 let queue_len = self.output_queue.len() as u32;
840 if queue_len > self.fc_counters.output_queue_peak_bytes {
841 self.fc_counters.output_queue_peak_bytes = queue_len;
842 }
843 }
844
845 fn drain_output(&mut self, budget: u32) -> Vec<u8> {
847 let queue_available = self.output_queue.len().min(budget as usize);
848 let window_available = self.output_window.saturating_sub(self.output_consumed) as usize;
849 let to_drain = queue_available.min(window_available);
850 if to_drain == 0 {
851 return Vec::new();
852 }
853 let batch: Vec<u8> = self.output_queue.drain(..to_drain).collect();
854 let len = batch.len() as u32;
855 self.output_consumed = self.output_consumed.saturating_add(len);
856 self.serviced_output_bytes = self.serviced_output_bytes.saturating_add(len as u64);
857 self.rate_out_serviced = self.rate_out_serviced.saturating_add(len);
858 batch
859 }
860
861 fn drain_all_output(&mut self) -> Vec<u8> {
863 let batch: Vec<u8> = self.output_queue.drain(..).collect();
864 let len = batch.len() as u32;
865 self.output_consumed = self.output_consumed.saturating_add(len);
866 self.serviced_output_bytes = self.serviced_output_bytes.saturating_add(len as u64);
867 batch
868 }
869
870 fn should_drop_input(&self, class: InputEventClass) -> bool {
872 self.policy
873 .should_drop_input_event(self.input_pending_bytes, class)
874 }
875
876 fn coalesce_resize(&mut self, cols: u16, rows: u16) {
878 self.pending_resize = Some((cols, rows, Instant::now()));
879 }
880
881 fn flush_pending_resize(&mut self) -> Option<(u16, u16)> {
883 let (cols, rows, queued_at) = self.pending_resize?;
884 if self.coalesce_resize_ms == 0 {
885 self.pending_resize = None;
886 return Some((cols, rows));
887 }
888 if queued_at.elapsed() >= Duration::from_millis(self.coalesce_resize_ms as u64) {
889 self.fc_counters.resizes_coalesced =
890 self.fc_counters.resizes_coalesced.saturating_add(1);
891 self.pending_resize = None;
892 Some((cols, rows))
893 } else {
894 None
895 }
896 }
897
898 fn build_snapshot(&self) -> FlowControlSnapshot {
900 let rate_elapsed = self.rate_window_start.elapsed();
901 let rate_secs = rate_elapsed.as_secs_f64().max(0.001);
902
903 let queue_bytes = self.output_queue.len() as u32;
904 let hard_cap = self.policy.config.output_hard_cap_bytes;
905 let occupancy = if hard_cap > 0 {
906 queue_bytes as f64 / hard_cap as f64
907 } else {
908 0.0
909 };
910
911 let hard_cap_duration_ms = self
912 .output_hard_cap_since
913 .map(|start| start.elapsed().as_millis() as u64)
914 .unwrap_or(0);
915
916 FlowControlSnapshot {
917 queues: QueueDepthBytes {
918 input: self.input_pending_bytes,
919 output: queue_bytes,
920 render_frames: 0,
921 },
922 rates: RateWindowBps {
923 lambda_in: (self.rate_in_arrived as f64 / rate_secs) as u32,
924 lambda_out: (self.rate_out_arrived as f64 / rate_secs) as u32,
925 mu_in: (self.rate_in_serviced as f64 / rate_secs) as u32,
926 mu_out: (self.rate_out_serviced as f64 / rate_secs) as u32,
927 },
928 latency: LatencyWindowMs {
929 key_p50_ms: 10.0 * occupancy,
930 key_p95_ms: 20.0 * occupancy,
931 },
932 serviced_input_bytes: self.serviced_input_bytes,
933 serviced_output_bytes: self.serviced_output_bytes,
934 output_hard_cap_duration_ms: hard_cap_duration_ms,
935 }
936 }
937
938 fn evaluate(&mut self) -> frankenterm_core::flow_control::FlowControlDecision {
940 let snapshot = self.build_snapshot();
941 let decision = self.policy.evaluate(snapshot);
942
943 let at_hard_cap =
945 self.output_queue.len() as u32 >= self.policy.config.output_hard_cap_bytes;
946 match (at_hard_cap, self.output_hard_cap_since) {
947 (true, None) => self.output_hard_cap_since = Some(Instant::now()),
948 (false, Some(_)) => self.output_hard_cap_since = None,
949 _ => {}
950 }
951
952 let was_paused = self.pty_reads_paused;
953 self.pty_reads_paused = decision.should_pause_pty_reads;
954 if decision.should_pause_pty_reads && !was_paused {
955 self.fc_counters.pty_read_pauses = self.fc_counters.pty_read_pauses.saturating_add(1);
956 }
957
958 if decision.chosen_action.is_some() {
959 self.fc_counters.decisions_non_stable =
960 self.fc_counters.decisions_non_stable.saturating_add(1);
961 }
962
963 decision
964 }
965
966 fn should_send_replenish(&self) -> bool {
968 let elapsed_ms = self.last_replenish.elapsed().as_millis() as u64;
969 self.policy
970 .should_replenish(self.input_consumed, self.input_window, elapsed_ms)
971 }
972
973 fn record_replenish_sent(&mut self) {
975 self.input_consumed = 0;
976 self.last_replenish = Instant::now();
977 self.fc_counters.replenishments_sent =
978 self.fc_counters.replenishments_sent.saturating_add(1);
979 }
980
981 #[allow(dead_code)]
985 fn process_flow_control_msg(&mut self, output_consumed: u32) {
986 self.output_consumed = self.output_consumed.saturating_sub(output_consumed);
987 }
988
989 fn maybe_reset_rate_window(&mut self) {
991 if self.rate_window_start.elapsed() >= Duration::from_secs(1) {
992 self.rate_in_arrived = 0;
993 self.rate_out_arrived = 0;
994 self.rate_in_serviced = 0;
995 self.rate_out_serviced = 0;
996 self.rate_window_start = Instant::now();
997 }
998 }
999
1000 fn output_queue_bytes(&self) -> u32 {
1001 self.output_queue.len() as u32
1002 }
1003
1004 fn summary_json(&self) -> Value {
1006 json!({
1007 "output_queue_peak_bytes": self.fc_counters.output_queue_peak_bytes,
1008 "input_drops": self.fc_counters.input_drops,
1009 "decisions_non_stable": self.fc_counters.decisions_non_stable,
1010 "resizes_coalesced": self.fc_counters.resizes_coalesced,
1011 "pty_read_pauses": self.fc_counters.pty_read_pauses,
1012 "replenishments_sent": self.fc_counters.replenishments_sent,
1013 "serviced_input_bytes": self.serviced_input_bytes,
1014 "serviced_output_bytes": self.serviced_output_bytes,
1015 })
1016 }
1017}
1018
1019#[derive(Debug)]
1020enum ReaderMsg {
1021 Data(Vec<u8>),
1022 Eof,
1023 Err(io::Error),
1024}
1025
1026struct PtyBridgeSession {
1027 child: Box<dyn Child + Send + Sync>,
1028 master: Box<dyn MasterPty + Send>,
1029 writer: Box<dyn Write + Send>,
1030 rx: mpsc::Receiver<ReaderMsg>,
1031 reader_thread: Option<thread::JoinHandle<()>>,
1032 eof: bool,
1033}
1034
1035impl PtyBridgeSession {
1036 fn spawn(config: &WsPtyBridgeConfig) -> io::Result<Self> {
1037 let mut cmd = CommandBuilder::new(&config.command);
1038 for arg in &config.args {
1039 cmd.arg(arg);
1040 }
1041 cmd.env("TERM", &config.term);
1042 for (key, value) in &config.env {
1043 cmd.env(key, value);
1044 }
1045
1046 let pty_system = portable_pty::native_pty_system();
1047 let pair = pty_system
1048 .openpty(PtySize {
1049 rows: config.rows,
1050 cols: config.cols,
1051 pixel_width: 0,
1052 pixel_height: 0,
1053 })
1054 .map_err(portable_pty_error)?;
1055
1056 let child = pair.slave.spawn_command(cmd).map_err(portable_pty_error)?;
1057 let mut reader = pair.master.try_clone_reader().map_err(portable_pty_error)?;
1058 let writer = pair.master.take_writer().map_err(portable_pty_error)?;
1059
1060 let (tx, rx) = mpsc::channel::<ReaderMsg>();
1061 let reader_thread = thread::Builder::new()
1062 .name("ftui-pty-ws-reader".to_string())
1063 .spawn(move || {
1064 let mut buffer = [0_u8; 8192];
1065 loop {
1066 match reader.read(&mut buffer) {
1067 Ok(0) => {
1068 let _ = tx.send(ReaderMsg::Eof);
1069 break;
1070 }
1071 Ok(n) => {
1072 let _ = tx.send(ReaderMsg::Data(buffer[..n].to_vec()));
1073 }
1074 Err(error) if error.kind() == io::ErrorKind::Interrupted => {}
1075 Err(error) => {
1076 let _ = tx.send(ReaderMsg::Err(error));
1077 break;
1078 }
1079 }
1080 }
1081 })
1082 .map_err(|error| {
1083 io::Error::other(format!("failed to spawn PTY reader thread: {error}"))
1084 })?;
1085
1086 Ok(Self {
1087 child,
1088 master: pair.master,
1089 writer,
1090 rx,
1091 reader_thread: Some(reader_thread),
1092 eof: false,
1093 })
1094 }
1095
1096 fn send_input(&mut self, bytes: &[u8]) -> io::Result<()> {
1097 if bytes.is_empty() {
1098 return Ok(());
1099 }
1100 self.writer.write_all(bytes)?;
1101 self.writer.flush()?;
1102 Ok(())
1103 }
1104
1105 fn resize(&mut self, cols: u16, rows: u16) -> io::Result<()> {
1106 self.master
1107 .resize(PtySize {
1108 rows,
1109 cols,
1110 pixel_width: 0,
1111 pixel_height: 0,
1112 })
1113 .map_err(portable_pty_error)
1114 }
1115
1116 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
1117 self.child.try_wait()
1118 }
1119
1120 fn drain_output_nonblocking(&mut self) -> io::Result<Vec<u8>> {
1121 if self.eof {
1122 return Ok(Vec::new());
1123 }
1124
1125 let mut output = Vec::new();
1126 loop {
1127 match self.rx.try_recv() {
1128 Ok(ReaderMsg::Data(bytes)) => output.extend_from_slice(&bytes),
1129 Ok(ReaderMsg::Eof) => {
1130 self.eof = true;
1131 break;
1132 }
1133 Ok(ReaderMsg::Err(error)) => return Err(error),
1134 Err(mpsc::TryRecvError::Empty) => break,
1135 Err(mpsc::TryRecvError::Disconnected) => {
1136 self.eof = true;
1137 break;
1138 }
1139 }
1140 }
1141
1142 Ok(output)
1143 }
1144}
1145
1146impl Drop for PtyBridgeSession {
1147 fn drop(&mut self) {
1148 let _ = self.child.kill();
1149 if let Some(handle) = self.reader_thread.take() {
1150 detach_reader_join(handle);
1151 }
1152 }
1153}
1154
1155fn detach_reader_join(handle: thread::JoinHandle<()>) {
1156 let _ = thread::Builder::new()
1157 .name("ftui-pty-ws-detached-join".to_string())
1158 .spawn(move || {
1159 let _ = handle.join();
1160 });
1161}
1162
1163fn portable_pty_error<E: std::fmt::Display>(error: E) -> io::Error {
1164 io::Error::other(format!("{error}"))
1165}
1166
1167struct TelemetrySink {
1168 file: Option<File>,
1169 session_id: String,
1170 seq: u64,
1171}
1172
1173impl TelemetrySink {
1174 fn new(path: Option<&Path>, session_id: &str) -> io::Result<Self> {
1175 let file = match path {
1176 Some(path) => Some(OpenOptions::new().create(true).append(true).open(path)?),
1177 None => None,
1178 };
1179 Ok(Self {
1180 file,
1181 session_id: session_id.to_string(),
1182 seq: 0,
1183 })
1184 }
1185
1186 fn write(&mut self, event: &str, payload: Value) -> io::Result<()> {
1187 let Some(file) = self.file.as_mut() else {
1188 return Ok(());
1189 };
1190 let line = json!({
1191 "event": event,
1192 "ts": now_iso8601(),
1193 "session_id": self.session_id,
1194 "seq": self.seq,
1195 "payload": payload,
1196 });
1197 self.seq = self.seq.saturating_add(1);
1198 writeln!(file, "{line}")?;
1199 file.flush()?;
1200 Ok(())
1201 }
1202}
1203
1204fn now_iso8601() -> String {
1205 OffsetDateTime::now_utc()
1206 .format(&Rfc3339)
1207 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
1208}
1209
1210#[cfg(test)]
1211mod tests {
1212 use super::*;
1213 use std::net::TcpListener;
1214 use std::thread;
1215 use std::time::{Duration, Instant};
1216
1217 use tungstenite::stream::MaybeTlsStream;
1218 use tungstenite::{Message, connect};
1219
1220 fn request(uri: &str, origin: Option<&str>) -> Request {
1221 let mut builder = Request::builder().uri(uri);
1222 if let Some(origin) = origin {
1223 builder = builder.header("Origin", origin);
1224 }
1225 builder.body(()).expect("request build")
1226 }
1227
1228 #[test]
1229 fn query_param_extracts_expected_value() {
1230 assert_eq!(query_param("token=abc&x=1", "token"), Some("abc"));
1231 assert_eq!(query_param("x=1&token=abc", "token"), Some("abc"));
1232 assert_eq!(query_param("x=1", "token"), None);
1233 }
1234
1235 #[test]
1236 fn validate_upgrade_request_allows_matching_origin_and_token() {
1237 let req = request("/ws?token=secret", Some("https://allowed.example"));
1238 let result = validate_upgrade_request(
1239 &req,
1240 &[String::from("https://allowed.example")],
1241 Some("secret"),
1242 );
1243 assert!(result.is_ok());
1244 }
1245
1246 #[test]
1247 fn validate_upgrade_request_rejects_invalid_origin() {
1248 let req = request("/ws?token=secret", Some("https://denied.example"));
1249 let result = validate_upgrade_request(
1250 &req,
1251 &[String::from("https://allowed.example")],
1252 Some("secret"),
1253 );
1254 let rejection = result.expect_err("should reject");
1255 assert_eq!(rejection.status, StatusCode::FORBIDDEN);
1256 }
1257
1258 #[test]
1259 fn validate_upgrade_request_rejects_invalid_token() {
1260 let req = request("/ws?token=wrong", Some("https://allowed.example"));
1261 let result = validate_upgrade_request(
1262 &req,
1263 &[String::from("https://allowed.example")],
1264 Some("secret"),
1265 );
1266 let rejection = result.expect_err("should reject");
1267 assert_eq!(rejection.status, StatusCode::UNAUTHORIZED);
1268 }
1269
1270 #[test]
1271 fn parse_control_message_resize_ping_and_unknown() {
1272 assert_eq!(
1273 parse_control_message(r#"{"type":"resize","cols":120,"rows":40}"#).expect("parse"),
1274 Some(ControlMessage::Resize {
1275 cols: 120,
1276 rows: 40
1277 })
1278 );
1279 assert_eq!(
1280 parse_control_message(r#"{"type":"ping"}"#).expect("parse"),
1281 Some(ControlMessage::Ping)
1282 );
1283 assert_eq!(
1284 parse_control_message(r#"{"type":"unknown"}"#).expect("parse"),
1285 None
1286 );
1287 }
1288
1289 #[test]
1290 fn parse_control_message_rejects_invalid_resize_dimensions() {
1291 let error = parse_control_message(r#"{"type":"resize","cols":0,"rows":40}"#)
1292 .expect_err("invalid dims should fail");
1293 assert_eq!(error.kind(), io::ErrorKind::InvalidInput);
1294 }
1295
1296 #[test]
1299 fn config_default_fields() {
1300 let c = WsPtyBridgeConfig::default();
1301 assert_eq!(c.bind_addr, SocketAddr::from(([127, 0, 0, 1], 9231)));
1302 assert!(c.args.is_empty());
1303 assert_eq!(c.term, "xterm-256color");
1304 assert!(c.env.is_empty());
1305 assert_eq!(c.cols, 120);
1306 assert_eq!(c.rows, 40);
1307 assert!(c.allowed_origins.is_empty());
1308 assert!(c.auth_token.is_none());
1309 assert!(c.telemetry_path.is_none());
1310 assert_eq!(c.max_message_bytes, 256 * 1024);
1311 assert_eq!(c.idle_sleep, Duration::from_millis(5));
1312 assert!(c.accept_once);
1313 }
1314
1315 #[test]
1316 fn config_clone() {
1317 let c1 = WsPtyBridgeConfig::default();
1318 let c2 = c1.clone();
1319 assert_eq!(c2.cols, c1.cols);
1320 assert_eq!(c2.rows, c1.rows);
1321 assert_eq!(c2.term, c1.term);
1322 }
1323
1324 #[test]
1325 fn config_debug() {
1326 let c = WsPtyBridgeConfig::default();
1327 let dbg = format!("{c:?}");
1328 assert!(dbg.contains("WsPtyBridgeConfig"));
1329 assert!(dbg.contains("bind_addr"));
1330 }
1331
1332 #[test]
1335 fn bridge_summary_as_json_contains_all_fields() {
1336 let summary = BridgeSummary {
1337 session_id: "test-123".to_string(),
1338 ws_in_bytes: 100,
1339 ws_out_bytes: 200,
1340 pty_in_bytes: 50,
1341 pty_out_bytes: 150,
1342 resize_events: 3,
1343 exit_code: Some(0),
1344 exit_signal: None,
1345 };
1346 let json = summary.as_json();
1347 assert_eq!(json["session_id"], "test-123");
1348 assert_eq!(json["ws_in_bytes"], 100);
1349 assert_eq!(json["ws_out_bytes"], 200);
1350 assert_eq!(json["pty_in_bytes"], 50);
1351 assert_eq!(json["pty_out_bytes"], 150);
1352 assert_eq!(json["resize_events"], 3);
1353 assert_eq!(json["exit_code"], 0);
1354 assert!(json["exit_signal"].is_null());
1355 }
1356
1357 #[test]
1358 fn bridge_summary_as_json_with_signal() {
1359 let summary = BridgeSummary {
1360 session_id: "s".to_string(),
1361 ws_in_bytes: 0,
1362 ws_out_bytes: 0,
1363 pty_in_bytes: 0,
1364 pty_out_bytes: 0,
1365 resize_events: 0,
1366 exit_code: None,
1367 exit_signal: Some("SIGKILL".to_string()),
1368 };
1369 let json = summary.as_json();
1370 assert!(json["exit_code"].is_null());
1371 assert_eq!(json["exit_signal"], "SIGKILL");
1372 }
1373
1374 #[test]
1375 fn bridge_summary_clone_and_eq() {
1376 let s1 = BridgeSummary {
1377 session_id: "a".to_string(),
1378 ws_in_bytes: 1,
1379 ws_out_bytes: 2,
1380 pty_in_bytes: 3,
1381 pty_out_bytes: 4,
1382 resize_events: 5,
1383 exit_code: Some(42),
1384 exit_signal: None,
1385 };
1386 let s2 = s1.clone();
1387 assert_eq!(s1, s2);
1388 }
1389
1390 #[test]
1391 fn bridge_summary_debug() {
1392 let s = BridgeSummary {
1393 session_id: "x".to_string(),
1394 ws_in_bytes: 0,
1395 ws_out_bytes: 0,
1396 pty_in_bytes: 0,
1397 pty_out_bytes: 0,
1398 resize_events: 0,
1399 exit_code: None,
1400 exit_signal: None,
1401 };
1402 let dbg = format!("{s:?}");
1403 assert!(dbg.contains("BridgeSummary"));
1404 assert!(dbg.contains("session_id"));
1405 }
1406
1407 #[test]
1410 fn control_message_close() {
1411 assert_eq!(
1412 parse_control_message(r#"{"type":"close"}"#).expect("parse"),
1413 Some(ControlMessage::Close)
1414 );
1415 }
1416
1417 #[test]
1418 fn control_message_debug_clone_eq() {
1419 let m = ControlMessage::Resize { cols: 80, rows: 24 };
1420 let m2 = m;
1421 assert_eq!(m, m2);
1422 let dbg = format!("{m:?}");
1423 assert!(dbg.contains("Resize"));
1424 assert!(dbg.contains("80"));
1425 }
1426
1427 #[test]
1430 fn parse_control_message_invalid_json() {
1431 let err = parse_control_message("not json").expect_err("should fail");
1432 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1433 }
1434
1435 #[test]
1436 fn parse_control_message_missing_type() {
1437 let err = parse_control_message(r#"{"cols":80}"#).expect_err("should fail");
1438 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1439 }
1440
1441 #[test]
1442 fn parse_control_message_resize_missing_cols() {
1443 let err = parse_control_message(r#"{"type":"resize","rows":40}"#).expect_err("should fail");
1444 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1445 }
1446
1447 #[test]
1448 fn parse_control_message_resize_missing_rows() {
1449 let err = parse_control_message(r#"{"type":"resize","cols":80}"#).expect_err("should fail");
1450 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1451 }
1452
1453 #[test]
1454 fn parse_control_message_resize_zero_rows() {
1455 let err = parse_control_message(r#"{"type":"resize","cols":80,"rows":0}"#)
1456 .expect_err("should fail");
1457 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
1458 }
1459
1460 #[test]
1461 fn parse_control_message_resize_zero_cols() {
1462 let err = parse_control_message(r#"{"type":"resize","cols":0,"rows":24}"#)
1463 .expect_err("should fail");
1464 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
1465 }
1466
1467 #[test]
1468 fn parse_control_message_resize_large_values() {
1469 let result =
1471 parse_control_message(r#"{"type":"resize","cols":65535,"rows":65535}"#).expect("parse");
1472 assert_eq!(
1473 result,
1474 Some(ControlMessage::Resize {
1475 cols: 65535,
1476 rows: 65535
1477 })
1478 );
1479 }
1480
1481 #[test]
1482 fn parse_control_message_resize_overflow_u16() {
1483 let err = parse_control_message(r#"{"type":"resize","cols":70000,"rows":40}"#)
1484 .expect_err("should fail");
1485 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1486 }
1487
1488 #[test]
1491 fn read_u16_field_valid() {
1492 let v: Value = serde_json::from_str(r#"{"x": 42}"#).unwrap();
1493 assert_eq!(read_u16_field(&v, "x").unwrap(), 42);
1494 }
1495
1496 #[test]
1497 fn read_u16_field_missing() {
1498 let v: Value = serde_json::from_str(r#"{"x": 42}"#).unwrap();
1499 let err = read_u16_field(&v, "y").expect_err("should fail");
1500 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1501 }
1502
1503 #[test]
1504 fn read_u16_field_not_numeric() {
1505 let v: Value = serde_json::from_str(r#"{"x": "hello"}"#).unwrap();
1506 let err = read_u16_field(&v, "x").expect_err("should fail");
1507 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1508 }
1509
1510 #[test]
1511 fn read_u16_field_overflow() {
1512 let v: Value = serde_json::from_str(r#"{"x": 100000}"#).unwrap();
1513 let err = read_u16_field(&v, "x").expect_err("should fail");
1514 assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1515 }
1516
1517 #[test]
1520 fn query_param_empty_string() {
1521 assert_eq!(query_param("", "token"), None);
1522 }
1523
1524 #[test]
1525 fn query_param_missing_value() {
1526 assert_eq!(query_param("token", "token"), Some(""));
1527 }
1528
1529 #[test]
1530 fn query_param_first_of_duplicates() {
1531 assert_eq!(
1532 query_param("token=first&token=second", "token"),
1533 Some("first")
1534 );
1535 }
1536
1537 #[test]
1538 fn query_param_value_with_equals() {
1539 assert_eq!(query_param("token=a=b", "token"), Some("a=b"));
1540 }
1541
1542 #[test]
1545 fn validate_no_origin_required_no_token_required() {
1546 let req = request("/ws", None);
1547 let result = validate_upgrade_request(&req, &[], None);
1548 assert!(result.is_ok());
1549 }
1550
1551 #[test]
1552 fn validate_origin_required_but_header_missing() {
1553 let req = request("/ws", None);
1554 let result =
1555 validate_upgrade_request(&req, &[String::from("https://allowed.example")], None);
1556 let rejection = result.expect_err("should reject");
1557 assert_eq!(rejection.status, StatusCode::FORBIDDEN);
1558 assert!(rejection.body.contains("Origin"));
1559 }
1560
1561 #[test]
1562 fn validate_token_required_but_no_query_string() {
1563 let req = request("/ws", None);
1564 let result = validate_upgrade_request(&req, &[], Some("secret"));
1565 let rejection = result.expect_err("should reject");
1566 assert_eq!(rejection.status, StatusCode::UNAUTHORIZED);
1567 }
1568
1569 #[test]
1570 fn validate_token_required_but_missing_from_query() {
1571 let req = request("/ws?other=value", None);
1572 let result = validate_upgrade_request(&req, &[], Some("secret"));
1573 let rejection = result.expect_err("should reject");
1574 assert_eq!(rejection.status, StatusCode::UNAUTHORIZED);
1575 }
1576
1577 #[test]
1578 fn validate_correct_token_no_origin_restriction() {
1579 let req = request("/ws?token=mysecret", None);
1580 let result = validate_upgrade_request(&req, &[], Some("mysecret"));
1581 assert!(result.is_ok());
1582 }
1583
1584 #[test]
1587 fn handshake_rejection_into_response() {
1588 let rejection = HandshakeRejection {
1589 status: StatusCode::FORBIDDEN,
1590 body: "test body".to_string(),
1591 };
1592 let response = rejection.into_response();
1593 assert_eq!(response.status(), StatusCode::FORBIDDEN);
1594 }
1595
1596 #[test]
1597 fn handshake_rejection_debug_clone_eq() {
1598 let r1 = HandshakeRejection {
1599 status: StatusCode::UNAUTHORIZED,
1600 body: "bad".to_string(),
1601 };
1602 let r2 = r1.clone();
1603 assert_eq!(r1, r2);
1604 let dbg = format!("{r1:?}");
1605 assert!(dbg.contains("HandshakeRejection"));
1606 }
1607
1608 #[test]
1611 fn counters_default() {
1612 let c = Counters::default();
1613 assert_eq!(c.ws_in_bytes, 0);
1614 assert_eq!(c.ws_out_bytes, 0);
1615 assert_eq!(c.pty_in_bytes, 0);
1616 assert_eq!(c.pty_out_bytes, 0);
1617 assert_eq!(c.resize_events, 0);
1618 }
1619
1620 #[test]
1621 fn counters_debug() {
1622 let c = Counters::default();
1623 let dbg = format!("{c:?}");
1624 assert!(dbg.contains("Counters"));
1625 }
1626
1627 #[test]
1630 fn make_session_id_format() {
1631 let id = make_session_id();
1632 assert!(id.starts_with("ws-bridge-"));
1633 assert!(id.len() > 15);
1634 }
1635
1636 #[test]
1637 fn make_session_id_unique() {
1638 let id1 = make_session_id();
1639 thread::sleep(Duration::from_millis(1));
1640 let id2 = make_session_id();
1641 assert_ne!(id1, id2);
1642 }
1643
1644 #[test]
1647 fn now_iso8601_format() {
1648 let ts = now_iso8601();
1649 assert!(ts.contains('T'));
1650 assert!(ts.contains('-'));
1651 assert!(ts.len() >= 20);
1652 }
1653
1654 #[test]
1657 fn telemetry_sink_no_path_write_is_noop() {
1658 let mut sink = TelemetrySink::new(None, "test").expect("create sink");
1659 sink.write("event", json!({"key": "value"})).expect("write");
1661 assert_eq!(sink.seq, 0);
1662 }
1663
1664 #[test]
1665 fn telemetry_sink_with_path_writes_jsonl() {
1666 let dir = std::env::temp_dir().join("ftui-test-telemetry");
1667 std::fs::create_dir_all(&dir).expect("create dir");
1668 let path = dir.join("test_telemetry.jsonl");
1669 let _ = std::fs::remove_file(&path);
1670
1671 {
1672 let mut sink = TelemetrySink::new(Some(&path), "sess-1").expect("create sink");
1673 sink.write("start", json!({"x": 1})).expect("write 1");
1674 sink.write("end", json!({"x": 2})).expect("write 2");
1675 assert_eq!(sink.seq, 2);
1676 }
1677
1678 let content = std::fs::read_to_string(&path).expect("read file");
1679 let lines: Vec<&str> = content.lines().collect();
1680 assert_eq!(lines.len(), 2);
1681 for line in &lines {
1682 let v: Value = serde_json::from_str(line).expect("parse JSON");
1683 assert_eq!(v["session_id"], "sess-1");
1684 assert!(v["ts"].is_string());
1685 assert!(v["event"].is_string());
1686 }
1687
1688 let _ = std::fs::remove_file(&path);
1689 let _ = std::fs::remove_dir(&dir);
1690 }
1691
1692 #[test]
1693 fn flow_control_event_writes_expected_payload_fields() {
1694 let dir = std::env::temp_dir().join("ftui-test-flow-control-event");
1695 std::fs::create_dir_all(&dir).expect("create dir");
1696 let path = dir.join("flow_control_event.jsonl");
1697 let _ = std::fs::remove_file(&path);
1698
1699 {
1700 let mut sink = TelemetrySink::new(Some(&path), "sess-fc").expect("create sink");
1701 emit_flow_control_event(&mut sink, "output", "stall", 65_536, 65_000)
1702 .expect("write flow_control");
1703 }
1704
1705 let content = std::fs::read_to_string(&path).expect("read file");
1706 let lines: Vec<&str> = content.lines().collect();
1707 assert_eq!(lines.len(), 1);
1708
1709 let parsed: Value = serde_json::from_str(lines[0]).expect("parse flow_control event");
1710 assert_eq!(parsed["event"], "flow_control");
1711 assert_eq!(parsed["payload"]["direction"], "output");
1712 assert_eq!(parsed["payload"]["action"], "stall");
1713 assert_eq!(parsed["payload"]["window_bytes"], 65_536);
1714 assert_eq!(parsed["payload"]["queued_bytes"], 65_000);
1715
1716 let _ = std::fs::remove_file(&path);
1717 let _ = std::fs::remove_dir(&dir);
1718 }
1719
1720 #[test]
1721 fn flow_control_stall_emits_only_on_pause_transition() {
1722 let dir = std::env::temp_dir().join("ftui-test-flow-control-stall");
1723 std::fs::create_dir_all(&dir).expect("create dir");
1724 let path = dir.join("flow_control_stall.jsonl");
1725 let _ = std::fs::remove_file(&path);
1726
1727 let mut fc = default_fc_state();
1728 let payload = [b'x'; 128];
1729 fc.enqueue_output(&payload);
1730 fc.pty_reads_paused = true;
1731
1732 {
1733 let mut sink = TelemetrySink::new(Some(&path), "sess-stall").expect("create sink");
1734 emit_flow_control_stall_if_transitioned(&mut sink, &fc, false)
1735 .expect("emit rising-edge stall");
1736 emit_flow_control_stall_if_transitioned(&mut sink, &fc, true)
1737 .expect("do not emit while already paused");
1738 }
1739
1740 let content = std::fs::read_to_string(&path).expect("read file");
1741 let lines: Vec<&str> = content.lines().collect();
1742 assert_eq!(lines.len(), 1);
1743
1744 let parsed: Value = serde_json::from_str(lines[0]).expect("parse flow_control stall");
1745 assert_eq!(parsed["event"], "flow_control");
1746 assert_eq!(parsed["payload"]["direction"], "output");
1747 assert_eq!(parsed["payload"]["action"], "stall");
1748 assert_eq!(parsed["payload"]["window_bytes"], fc.output_window);
1749 assert_eq!(parsed["payload"]["queued_bytes"], fc.output_queue_bytes());
1750
1751 let _ = std::fs::remove_file(&path);
1752 let _ = std::fs::remove_dir(&dir);
1753 }
1754
1755 #[cfg(unix)]
1756 #[test]
1757 fn bridge_smoke_echoes_bytes_through_pty() {
1758 let listener =
1759 TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("bind ephemeral port");
1760 let bind_addr = listener.local_addr().expect("local addr");
1761 drop(listener);
1762
1763 let config = WsPtyBridgeConfig {
1764 bind_addr,
1765 accept_once: true,
1766 command: "/bin/sh".to_string(),
1767 args: vec!["-c".to_string(), "cat".to_string()],
1768 idle_sleep: Duration::from_millis(1),
1769 ..WsPtyBridgeConfig::default()
1770 };
1771
1772 let handle = thread::spawn(move || run_ws_pty_bridge(config));
1773 thread::sleep(Duration::from_millis(75));
1774
1775 let url = format!("ws://{bind_addr}/ws");
1776 let (mut client, _response) = connect(url).expect("connect websocket");
1777 if let MaybeTlsStream::Plain(stream) = client.get_mut() {
1778 stream
1779 .set_read_timeout(Some(Duration::from_millis(50)))
1780 .expect("set read timeout");
1781 }
1782 client
1783 .send(Message::Binary(b"hello-through-bridge\n".to_vec().into()))
1784 .expect("send input");
1785
1786 let deadline = Instant::now() + Duration::from_secs(3);
1787 let mut observed = Vec::new();
1788 let mut last_error: Option<WsError> = None;
1789 while Instant::now() < deadline {
1790 match client.read() {
1791 Ok(Message::Binary(bytes)) => {
1792 observed.extend_from_slice(bytes.as_ref());
1793 if observed
1794 .windows(b"hello-through-bridge".len())
1795 .any(|window| window == b"hello-through-bridge")
1796 {
1797 break;
1798 }
1799 }
1800 Ok(_) => {}
1801 Err(WsError::Io(error))
1802 if matches!(
1803 error.kind(),
1804 io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
1805 ) => {}
1806 Err(error) => {
1807 last_error = Some(error);
1808 break;
1809 }
1810 }
1811 }
1812
1813 let saw_echo = observed
1814 .windows(b"hello-through-bridge".len())
1815 .any(|window| window == b"hello-through-bridge");
1816
1817 if let Err(err) = client.send(Message::Text(r#"{"type":"close"}"#.to_string().into())) {
1818 if last_error.is_none() {
1820 last_error = Some(err);
1821 }
1822 }
1823 let result = handle.join().expect("bridge thread join");
1824 result.expect("bridge result");
1825
1826 assert!(
1827 saw_echo,
1828 "expected PTY echo in websocket output; last_error={last_error:?}; observed_len={}",
1829 observed.len()
1830 );
1831 }
1832
1833 #[test]
1836 fn flow_control_bridge_config_default() {
1837 let c = FlowControlBridgeConfig::default();
1838 assert_eq!(c.output_window, 65_536);
1839 assert_eq!(c.input_window, 8_192);
1840 assert_eq!(c.coalesce_resize_ms, 50);
1841 }
1842
1843 #[test]
1844 fn flow_control_bridge_config_debug_clone() {
1845 let c = FlowControlBridgeConfig::default();
1846 let c2 = c.clone();
1847 let dbg = format!("{c2:?}");
1848 assert!(dbg.contains("FlowControlBridgeConfig"));
1849 assert!(dbg.contains("output_window"));
1850 }
1851
1852 fn default_fc_state() -> FlowControlBridgeState {
1855 FlowControlBridgeState::new(&FlowControlBridgeConfig::default())
1856 }
1857
1858 #[test]
1859 fn fc_state_new_has_empty_queue() {
1860 let fc = default_fc_state();
1861 assert_eq!(fc.output_queue_bytes(), 0);
1862 assert!(!fc.pty_reads_paused);
1863 assert_eq!(fc.input_pending_bytes, 0);
1864 }
1865
1866 #[test]
1867 fn fc_enqueue_and_drain_output() {
1868 let mut fc = default_fc_state();
1869 fc.enqueue_output(&[0xAA; 100]);
1870 assert_eq!(fc.output_queue_bytes(), 100);
1871
1872 let batch = fc.drain_output(50);
1873 assert_eq!(batch.len(), 50);
1874 assert_eq!(fc.output_queue_bytes(), 50);
1875
1876 let batch2 = fc.drain_output(100);
1877 assert_eq!(batch2.len(), 50);
1878 assert_eq!(fc.output_queue_bytes(), 0);
1879 }
1880
1881 #[test]
1882 fn fc_enqueue_respects_hard_cap() {
1883 let mut config = FlowControlBridgeConfig::default();
1884 config.policy.output_hard_cap_bytes = 200;
1885 let mut fc = FlowControlBridgeState::new(&config);
1886
1887 fc.enqueue_output(&[0xBB; 300]);
1888 assert_eq!(fc.output_queue_bytes(), 200);
1890 }
1891
1892 #[test]
1893 fn fc_drain_respects_window() {
1894 let config = FlowControlBridgeConfig {
1895 output_window: 50,
1896 ..FlowControlBridgeConfig::default()
1897 };
1898 let mut fc = FlowControlBridgeState::new(&config);
1899
1900 fc.enqueue_output(&[0xCC; 200]);
1901 let batch = fc.drain_output(200);
1902 assert_eq!(batch.len(), 50);
1904 assert_eq!(fc.output_queue_bytes(), 150);
1905 }
1906
1907 #[test]
1908 fn fc_drain_all_output_ignores_window() {
1909 let config = FlowControlBridgeConfig {
1910 output_window: 10,
1911 ..FlowControlBridgeConfig::default()
1912 };
1913 let mut fc = FlowControlBridgeState::new(&config);
1914
1915 fc.enqueue_output(&[0xDD; 100]);
1916 let batch = fc.drain_all_output();
1917 assert_eq!(batch.len(), 100);
1918 assert_eq!(fc.output_queue_bytes(), 0);
1919 }
1920
1921 #[test]
1922 fn fc_output_queue_peak_tracked() {
1923 let mut fc = default_fc_state();
1924 fc.enqueue_output(&[0x01; 500]);
1925 assert_eq!(fc.fc_counters.output_queue_peak_bytes, 500);
1926 fc.drain_output(200);
1927 fc.enqueue_output(&[0x02; 100]);
1928 assert_eq!(fc.fc_counters.output_queue_peak_bytes, 500);
1930 fc.enqueue_output(&[0x03; 500]);
1931 assert!(fc.fc_counters.output_queue_peak_bytes >= 800);
1933 }
1934
1935 #[test]
1938 fn fc_input_arrival_and_service() {
1939 let mut fc = default_fc_state();
1940 fc.record_input_arrival(100);
1941 assert_eq!(fc.input_pending_bytes, 100);
1942 assert_eq!(fc.input_consumed, 100);
1943
1944 fc.record_input_serviced(60);
1945 assert_eq!(fc.input_pending_bytes, 40);
1946 assert_eq!(fc.serviced_input_bytes, 60);
1947 }
1948
1949 #[test]
1950 fn fc_should_drop_interactive_never() {
1951 let mut config = FlowControlBridgeConfig::default();
1952 config.policy.input_hard_cap_bytes = 100;
1953 let mut fc = FlowControlBridgeState::new(&config);
1954 fc.input_pending_bytes = 200; assert!(!fc.should_drop_input(InputEventClass::Interactive));
1957 }
1958
1959 #[test]
1960 fn fc_should_drop_noninteractive_at_hard_cap() {
1961 let mut config = FlowControlBridgeConfig::default();
1962 config.policy.input_hard_cap_bytes = 100;
1963 let mut fc = FlowControlBridgeState::new(&config);
1964
1965 fc.input_pending_bytes = 50;
1966 assert!(!fc.should_drop_input(InputEventClass::NonInteractive));
1967
1968 fc.input_pending_bytes = 100;
1969 assert!(fc.should_drop_input(InputEventClass::NonInteractive));
1970
1971 fc.input_pending_bytes = 200;
1972 assert!(fc.should_drop_input(InputEventClass::NonInteractive));
1973 }
1974
1975 #[test]
1978 fn fc_coalesce_resize_disabled() {
1979 let config = FlowControlBridgeConfig {
1980 coalesce_resize_ms: 0,
1981 ..FlowControlBridgeConfig::default()
1982 };
1983 let mut fc = FlowControlBridgeState::new(&config);
1984
1985 fc.coalesce_resize(80, 24);
1986 let result = fc.flush_pending_resize();
1987 assert_eq!(result, Some((80, 24)));
1988 }
1989
1990 #[test]
1991 fn fc_coalesce_resize_defers() {
1992 let config = FlowControlBridgeConfig {
1993 coalesce_resize_ms: 100, ..FlowControlBridgeConfig::default()
1995 };
1996 let mut fc = FlowControlBridgeState::new(&config);
1997
1998 fc.coalesce_resize(80, 24);
1999 let result = fc.flush_pending_resize();
2001 assert!(result.is_none());
2002 }
2003
2004 #[test]
2005 fn fc_coalesce_resize_overwrite() {
2006 let config = FlowControlBridgeConfig {
2007 coalesce_resize_ms: 0,
2008 ..FlowControlBridgeConfig::default()
2009 };
2010 let mut fc = FlowControlBridgeState::new(&config);
2011
2012 fc.coalesce_resize(80, 24);
2013 fc.coalesce_resize(120, 40); let result = fc.flush_pending_resize();
2015 assert_eq!(result, Some((120, 40))); }
2017
2018 #[test]
2019 fn fc_coalesce_resize_flushes_after_timeout() {
2020 let config = FlowControlBridgeConfig {
2021 coalesce_resize_ms: 10, ..FlowControlBridgeConfig::default()
2023 };
2024 let mut fc = FlowControlBridgeState::new(&config);
2025
2026 fc.coalesce_resize(132, 50);
2027 thread::sleep(Duration::from_millis(15));
2028 let result = fc.flush_pending_resize();
2029 assert_eq!(result, Some((132, 50)));
2030 assert_eq!(fc.fc_counters.resizes_coalesced, 1);
2031 }
2032
2033 #[test]
2034 fn fc_flush_no_pending_resize() {
2035 let mut fc = default_fc_state();
2036 assert_eq!(fc.flush_pending_resize(), None);
2037 }
2038
2039 #[test]
2042 fn fc_build_snapshot_empty_state() {
2043 let fc = default_fc_state();
2044 let snapshot = fc.build_snapshot();
2045 assert_eq!(snapshot.queues.input, 0);
2046 assert_eq!(snapshot.queues.output, 0);
2047 assert_eq!(snapshot.serviced_input_bytes, 0);
2048 assert_eq!(snapshot.serviced_output_bytes, 0);
2049 }
2050
2051 #[test]
2052 fn fc_build_snapshot_with_data() {
2053 let mut fc = default_fc_state();
2054 fc.enqueue_output(&[0xAA; 1000]);
2055 fc.record_input_arrival(200);
2056 fc.record_input_serviced(150);
2057
2058 let snapshot = fc.build_snapshot();
2059 assert_eq!(snapshot.queues.output, 1000);
2060 assert_eq!(snapshot.queues.input, 50);
2061 assert_eq!(snapshot.serviced_input_bytes, 150);
2062 }
2063
2064 #[test]
2065 fn fc_evaluate_no_pause_when_idle() {
2066 let mut fc = default_fc_state();
2067 let decision = fc.evaluate();
2068 assert!(!decision.should_pause_pty_reads);
2070 }
2071
2072 #[test]
2073 fn fc_evaluate_pauses_reads_at_hard_cap() {
2074 let mut config = FlowControlBridgeConfig::default();
2075 config.policy.output_hard_cap_bytes = 500;
2076 let mut fc = FlowControlBridgeState::new(&config);
2077
2078 fc.enqueue_output(&[0xAA; 500]);
2080 let decision = fc.evaluate();
2081 assert!(decision.should_pause_pty_reads);
2082 assert!(fc.pty_reads_paused);
2083 assert_eq!(fc.fc_counters.pty_read_pauses, 1);
2084 }
2085
2086 #[test]
2087 fn fc_evaluate_resumes_reads_after_drain() {
2088 let mut config = FlowControlBridgeConfig::default();
2089 config.policy.output_hard_cap_bytes = 500;
2090 config.output_window = 500;
2091 let mut fc = FlowControlBridgeState::new(&config);
2092
2093 fc.enqueue_output(&[0xAA; 500]);
2094 let _ = fc.evaluate();
2095 assert!(fc.pty_reads_paused);
2096
2097 let _ = fc.drain_output(400);
2099 let decision = fc.evaluate();
2100 assert!(!decision.should_pause_pty_reads);
2101 assert!(!fc.pty_reads_paused);
2102 }
2103
2104 #[test]
2107 fn fc_replenish_at_50_percent_consumption() {
2108 let mut fc = default_fc_state();
2109 fc.input_consumed = 4096;
2111 assert!(fc.should_send_replenish());
2112 }
2113
2114 #[test]
2115 fn fc_replenish_not_at_low_consumption() {
2116 let fc = default_fc_state();
2117 let result = fc.should_send_replenish();
2123 let _ = result;
2125 }
2126
2127 #[test]
2128 fn fc_record_replenish_resets_state() {
2129 let mut fc = default_fc_state();
2130 fc.input_consumed = 5000;
2131 fc.record_replenish_sent();
2132 assert_eq!(fc.input_consumed, 0);
2133 assert_eq!(fc.fc_counters.replenishments_sent, 1);
2134 }
2135
2136 #[test]
2139 fn fc_process_flow_control_msg_replenishes_window() {
2140 let mut fc = default_fc_state();
2141 fc.output_consumed = 30000;
2142 fc.process_flow_control_msg(20000);
2143 assert_eq!(fc.output_consumed, 10000);
2144 }
2145
2146 #[test]
2147 fn fc_process_flow_control_msg_saturates() {
2148 let mut fc = default_fc_state();
2149 fc.output_consumed = 100;
2150 fc.process_flow_control_msg(200);
2151 assert_eq!(fc.output_consumed, 0);
2152 }
2153
2154 #[test]
2157 fn fc_rate_window_resets_after_interval() {
2158 let mut fc = default_fc_state();
2159 fc.rate_in_arrived = 1000;
2160 fc.rate_out_arrived = 2000;
2161 fc.rate_in_serviced = 800;
2162 fc.rate_out_serviced = 1500;
2163 fc.rate_window_start = Instant::now() - Duration::from_secs(2);
2165 fc.maybe_reset_rate_window();
2166 assert_eq!(fc.rate_in_arrived, 0);
2167 assert_eq!(fc.rate_out_arrived, 0);
2168 assert_eq!(fc.rate_in_serviced, 0);
2169 assert_eq!(fc.rate_out_serviced, 0);
2170 }
2171
2172 #[test]
2173 fn fc_rate_window_does_not_reset_early() {
2174 let mut fc = default_fc_state();
2175 fc.rate_in_arrived = 1000;
2176 fc.maybe_reset_rate_window();
2177 assert_eq!(fc.rate_in_arrived, 1000);
2178 }
2179
2180 #[test]
2183 fn fc_summary_json_contains_expected_fields() {
2184 let mut fc = default_fc_state();
2185 fc.fc_counters.input_drops = 5;
2186 fc.fc_counters.decisions_non_stable = 10;
2187 fc.serviced_input_bytes = 1000;
2188 fc.serviced_output_bytes = 2000;
2189
2190 let summary = fc.summary_json();
2191 assert_eq!(summary["input_drops"], 5);
2192 assert_eq!(summary["decisions_non_stable"], 10);
2193 assert_eq!(summary["serviced_input_bytes"], 1000);
2194 assert_eq!(summary["serviced_output_bytes"], 2000);
2195 }
2196
2197 #[test]
2200 fn config_default_no_flow_control() {
2201 let c = WsPtyBridgeConfig::default();
2202 assert!(c.flow_control.is_none());
2203 }
2204
2205 #[test]
2206 fn config_with_flow_control() {
2207 let c = WsPtyBridgeConfig {
2208 flow_control: Some(FlowControlBridgeConfig::default()),
2209 ..WsPtyBridgeConfig::default()
2210 };
2211 assert!(c.flow_control.is_some());
2212 assert_eq!(c.flow_control.as_ref().unwrap().output_window, 65_536);
2213 }
2214
2215 #[cfg(unix)]
2218 #[test]
2219 fn bridge_smoke_with_flow_control() {
2220 let listener =
2221 TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).expect("bind ephemeral port");
2222 let bind_addr = listener.local_addr().expect("local addr");
2223 drop(listener);
2224
2225 let config = WsPtyBridgeConfig {
2226 bind_addr,
2227 accept_once: true,
2228 command: "/bin/sh".to_string(),
2229 args: vec!["-c".to_string(), "cat".to_string()],
2230 idle_sleep: Duration::from_millis(1),
2231 flow_control: Some(FlowControlBridgeConfig::default()),
2232 ..WsPtyBridgeConfig::default()
2233 };
2234
2235 let handle = thread::spawn(move || run_ws_pty_bridge(config));
2236 thread::sleep(Duration::from_millis(75));
2237
2238 let url = format!("ws://{bind_addr}/ws");
2239 let (mut client, _response) = connect(url).expect("connect websocket");
2240 if let MaybeTlsStream::Plain(stream) = client.get_mut() {
2241 stream
2242 .set_read_timeout(Some(Duration::from_millis(50)))
2243 .expect("set read timeout");
2244 }
2245 client
2246 .send(Message::Binary(b"fc-echo-test\n".to_vec().into()))
2247 .expect("send input");
2248
2249 let deadline = Instant::now() + Duration::from_secs(3);
2250 let mut observed = Vec::new();
2251 let mut last_error: Option<WsError> = None;
2252 while Instant::now() < deadline {
2253 match client.read() {
2254 Ok(Message::Binary(bytes)) => {
2255 observed.extend_from_slice(bytes.as_ref());
2256 if observed
2257 .windows(b"fc-echo-test".len())
2258 .any(|window| window == b"fc-echo-test")
2259 {
2260 break;
2261 }
2262 }
2263 Ok(_) => {}
2264 Err(WsError::Io(error))
2265 if matches!(
2266 error.kind(),
2267 io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut
2268 ) => {}
2269 Err(error) => {
2270 last_error = Some(error);
2271 break;
2272 }
2273 }
2274 }
2275
2276 let saw_echo = observed
2277 .windows(b"fc-echo-test".len())
2278 .any(|window| window == b"fc-echo-test");
2279
2280 if let Err(err) = client.send(Message::Text(r#"{"type":"close"}"#.to_string().into()))
2281 && last_error.is_none()
2282 {
2283 last_error = Some(err);
2284 }
2285 let result = handle.join().expect("bridge thread join");
2286 result.expect("bridge result");
2287
2288 assert!(
2289 saw_echo,
2290 "expected PTY echo with flow control; last_error={last_error:?}; observed_len={}",
2291 observed.len()
2292 );
2293 }
2294
2295 #[cfg(unix)]
2296 #[test]
2297 fn pty_bridge_session_drop_does_not_block_when_background_process_keeps_pty_open() {
2298 let shell = std::env::var("SHELL").unwrap_or_else(|_| "/bin/sh".to_string());
2299 let (done_tx, done_rx) = mpsc::channel();
2300 let drop_thread = thread::spawn(move || {
2301 let session = PtyBridgeSession::spawn(&WsPtyBridgeConfig {
2302 command: shell,
2303 args: vec!["-c".to_string(), "sleep 1 >/dev/null 2>&1 &".to_string()],
2304 telemetry_path: None,
2305 ..WsPtyBridgeConfig::default()
2306 })
2307 .expect("spawn bridge PTY");
2308 drop(session);
2309 done_tx.send(()).expect("signal drop completion");
2310 });
2311
2312 assert!(
2313 done_rx.recv_timeout(Duration::from_millis(400)).is_ok(),
2314 "PtyBridgeSession drop should not wait for background descendants to close the PTY"
2315 );
2316 drop_thread.join().expect("drop thread join");
2317 }
2318}