Skip to main content

bastion/
lib.rs

1//! # bastion — block-aware web terminal
2//!
3//! Serves a persistent, OSC 133–segmented shell to the browser over a
4//! WebSocket. Each session owns a PTY spawned with WezTerm's shell
5//! integration injected, so the byte stream is cut into
6//! `{command, output, exit}` blocks and kept alongside a small raw-byte
7//! ring for reconnect replay.
8//!
9//! ## Quick start
10//!
11//! ```no_run
12//! use axum::Router;
13//!
14//! # #[tokio::main]
15//! # async fn main() -> std::io::Result<()> {
16//! let mgr = bastion::Manager::new();
17//! let app: Router = Router::new().nest("/term", bastion::router(mgr));
18//! let listener = tokio::net::TcpListener::bind("127.0.0.1:7681").await?;
19//! axum::serve(listener, app).await.unwrap();
20//! # Ok(())
21//! # }
22//! ```
23//!
24//! Routes (relative to wherever you mount the router):
25//!   - `GET  /`               — SPA HTML
26//!   - `GET  /api/sessions`   — list sessions
27//!   - `POST /api/sessions`   — create a session
28//!   - `DEL  /api/sessions/:id` — kill a session
29//!   - `GET  /ws/:id`         — WebSocket
30//!
31//! ## WebSocket protocol
32//!
33//! Client → server:
34//!   - binary frames = stdin bytes (forwarded to PTY)
35//!   - text JSON `{type:"resize",cols,rows}` | `{type:"hello",have_up_to:N}`
36//!
37//! Server → client:
38//!   - binary frames = raw PTY bytes (feed to xterm.js)
39//!   - text JSON `{type:"block",...record}` | `{type:"exit",code}` |
40//!     `{type:"gap",from,to}` | `{type:"ready",seq}`
41
42use std::collections::VecDeque;
43use std::io::{Read, Write};
44use std::sync::Arc;
45use std::time::{SystemTime, UNIX_EPOCH};
46
47use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
48use axum::extract::{Path, State};
49use axum::response::{Html, IntoResponse};
50use axum::routing::{delete, get};
51use axum::{Json, Router};
52use base64::Engine as _;
53use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
54use serde::{Deserialize, Serialize};
55use tokio::sync::{broadcast, Mutex, RwLock};
56use uuid::Uuid;
57use vte::{Params, Parser, Perform};
58
59/// Vendored from wezterm/assets/shell-integration/wezterm.sh.
60/// Emits OSC 133 A/B/C/D around prompt/input/command/exit boundaries for
61/// both bash and zsh; works from bash 3.1+ and any zsh.
62const SHELL_INTEGRATION_SH: &str = include_str!("shell_integration.sh");
63
64/// Cap on the raw-byte ring per session. Enough to replay a full screen
65/// on reconnect without letting the agent become a log store.
66const RING_CAP: usize = 256 * 1024;
67
68/// Cap on committed blocks kept per session. The browser has IndexedDB
69/// for long history; the agent only keeps enough for a gap-free reconnect.
70const BLOCKS_CAP: usize = 128;
71
72/// Broadcast buffer size for live subscribers. Small: slow consumers are
73/// expected to drop and resync from the ring.
74const BROADCAST_CAP: usize = 256;
75
76/// Default PTY geometry until the client sends a resize.
77const DEFAULT_COLS: u16 = 120;
78const DEFAULT_ROWS: u16 = 32;
79
80// ---------------------------------------------------------------------
81// Types exposed to the wire
82// ---------------------------------------------------------------------
83
84#[derive(Clone, Debug, Serialize)]
85pub struct BlockRecord {
86    pub session_id: String,
87    pub seq: u64,
88    pub started_at_ms: u64,
89    pub ended_at_ms: u64,
90    pub command: String,
91    /// base64 of the raw ANSI output slice C→D
92    pub output_b64: String,
93    pub exit_code: i32,
94}
95
96#[derive(Clone, Debug, Serialize)]
97pub struct SessionInfo {
98    pub id: String,
99    pub title: String,
100    pub created_at_ms: u64,
101    pub next_seq: u64,
102}
103
104#[derive(Deserialize)]
105#[allow(dead_code)] // `have_up_to` is wire-level; M3 will consume it
106struct HelloMsg {
107    have_up_to: Option<u64>,
108}
109
110#[derive(Deserialize)]
111struct ResizeMsg {
112    cols: u16,
113    rows: u16,
114}
115
116#[derive(Deserialize)]
117#[serde(tag = "type", rename_all = "lowercase")]
118#[allow(dead_code)]
119enum ClientMsg {
120    Hello(HelloMsg),
121    Resize(ResizeMsg),
122}
123
124// ---------------------------------------------------------------------
125// Session
126// ---------------------------------------------------------------------
127
128#[derive(Clone)]
129enum Event {
130    Raw(Arc<Vec<u8>>),
131    Block(Arc<BlockRecord>),
132    Exit(i32),
133}
134
135struct Session {
136    id: String,
137    title: String,
138    created_at_ms: u64,
139    master: Arc<Mutex<Box<dyn MasterPty + Send>>>,
140    writer: Arc<Mutex<Box<dyn Write + Send>>>,
141    ring: Arc<Mutex<VecDeque<u8>>>,
142    blocks: Arc<RwLock<VecDeque<BlockRecord>>>,
143    next_seq: Arc<Mutex<u64>>,
144    tx: broadcast::Sender<Event>,
145    /// PID of the spawned shell so `Manager::remove` can SIGHUP it.
146    /// The waiter thread owns the `Child` handle; we track the PID
147    /// separately to avoid a lock contest with `wait()`.
148    pid: Option<u32>,
149}
150
151impl Session {
152    fn info(&self) -> SessionInfo {
153        SessionInfo {
154            id: self.id.clone(),
155            title: self.title.clone(),
156            created_at_ms: self.created_at_ms,
157            next_seq: self.next_seq.try_lock().map(|g| *g).unwrap_or(0),
158        }
159    }
160
161    /// Best-effort termination: SIGHUP first (lets bash exit cleanly),
162    /// SIGKILL after a short grace period if still alive.
163    fn kill(&self) {
164        let Some(pid) = self.pid else { return };
165        let pid = pid as i32;
166        unsafe {
167            libc::kill(pid, libc::SIGHUP);
168        }
169        // Reader thread's EOF + waiter tear-down take care of the rest.
170        // We don't SIGKILL here — the waiter will hit `wait()` and move on,
171        // and if the process wedges, a later manual kill can handle it.
172        let _ = pid;
173    }
174}
175
176// ---------------------------------------------------------------------
177// Manager
178// ---------------------------------------------------------------------
179
180/// A function that wraps the SPA body in HTML chrome (title + whatever
181/// nav/branding the host app wants). Called with `(title, body_html)`,
182/// must return the complete HTML document string.
183pub type ShellFn = Arc<dyn Fn(&str, &str) -> String + Send + Sync>;
184
185#[derive(Clone)]
186pub struct Manager {
187    inner: Arc<RwLock<std::collections::HashMap<String, Arc<Session>>>>,
188    shell: ShellFn,
189}
190
191impl Default for Manager {
192    fn default() -> Self {
193        Self::new()
194    }
195}
196
197impl Manager {
198    /// New manager with the default (minimal dark-themed) HTML shell.
199    pub fn new() -> Self {
200        Self {
201            inner: Arc::new(RwLock::new(Default::default())),
202            shell: Arc::new(default_shell),
203        }
204    }
205
206    /// Replace the HTML chrome. Useful when embedding bastion inside a
207    /// host app that has its own nav / branding.
208    pub fn with_shell<F>(mut self, f: F) -> Self
209    where
210        F: Fn(&str, &str) -> String + Send + Sync + 'static,
211    {
212        self.shell = Arc::new(f);
213        self
214    }
215
216    async fn list(&self) -> Vec<SessionInfo> {
217        let g = self.inner.read().await;
218        g.values().map(|s| s.info()).collect()
219    }
220
221    async fn get(&self, id: &str) -> Option<Arc<Session>> {
222        self.inner.read().await.get(id).cloned()
223    }
224
225    async fn create(&self, title: String) -> std::io::Result<Arc<Session>> {
226        let id = short_id();
227        let (session, child) = spawn_session(id.clone(), title).await?;
228        self.inner.write().await.insert(id.clone(), session.clone());
229
230        // Waiter: reap the child on a blocking thread, then come back to
231        // async to broadcast Exit and self-remove from the manager so the
232        // map doesn't accumulate dead sessions.
233        let tx = session.tx.clone();
234        let manager = self.clone();
235        let wait_id = id;
236        tokio::spawn(async move {
237            let code = tokio::task::spawn_blocking(move || {
238                let mut child = child;
239                child
240                    .wait()
241                    .ok()
242                    .and_then(|s| i32::try_from(s.exit_code()).ok())
243                    .unwrap_or(-1)
244            })
245            .await
246            .unwrap_or(-1);
247            let _ = tx.send(Event::Exit(code));
248            manager.inner.write().await.remove(&wait_id);
249        });
250
251        Ok(session)
252    }
253
254    async fn remove(&self, id: &str) -> bool {
255        let removed = self.inner.write().await.remove(id);
256        if let Some(s) = removed {
257            // Kill the shell; its exit will flush through the normal
258            // reader/waiter path. Any live WS subscribers get an Exit
259            // event and close out.
260            s.kill();
261            true
262        } else {
263            false
264        }
265    }
266}
267
268fn short_id() -> String {
269    let u = Uuid::new_v4();
270    u.simple().to_string()[..12].to_string()
271}
272
273fn now_ms() -> u64 {
274    SystemTime::now()
275        .duration_since(UNIX_EPOCH)
276        .map(|d| d.as_millis() as u64)
277        .unwrap_or(0)
278}
279
280async fn spawn_session(
281    id: String,
282    title: String,
283) -> std::io::Result<(Arc<Session>, Box<dyn portable_pty::Child + Send + Sync>)> {
284    let pty_system = native_pty_system();
285    let pair = pty_system
286        .openpty(PtySize {
287            rows: DEFAULT_ROWS,
288            cols: DEFAULT_COLS,
289            pixel_width: 0,
290            pixel_height: 0,
291        })
292        .map_err(io_err)?;
293
294    // Write the wezterm shell integration to a tempfile so bash can
295    // --rcfile it. Idempotent content; tempfs sweeps it on reboot.
296    let rc_path = write_rc_tempfile()?;
297
298    // Prefer bash; fall back to sh if bash is absent on the VM.
299    let shell = if std::path::Path::new("/bin/bash").exists() {
300        "/bin/bash"
301    } else {
302        "/bin/sh"
303    };
304
305    let mut cmd = CommandBuilder::new(shell);
306    cmd.args(["--rcfile", &rc_path, "-i"]);
307    cmd.env("TERM", "xterm-256color");
308    // Force the integration to load even if the user env would skip it.
309    cmd.env_remove("WEZTERM_SHELL_SKIP_ALL");
310    if let Ok(home) = std::env::var("HOME") {
311        cmd.env("HOME", home);
312    }
313
314    let child = pair.slave.spawn_command(cmd).map_err(io_err)?;
315    let pid = child.process_id();
316    let reader = pair.master.try_clone_reader().map_err(io_err)?;
317    let writer = pair.master.take_writer().map_err(io_err)?;
318
319    let created_at_ms = now_ms();
320    let (tx, _rx) = broadcast::channel(BROADCAST_CAP);
321
322    let session = Arc::new(Session {
323        id: id.clone(),
324        title,
325        created_at_ms,
326        master: Arc::new(Mutex::new(pair.master)),
327        writer: Arc::new(Mutex::new(writer)),
328        ring: Arc::new(Mutex::new(VecDeque::with_capacity(RING_CAP))),
329        blocks: Arc::new(RwLock::new(VecDeque::with_capacity(BLOCKS_CAP))),
330        next_seq: Arc::new(Mutex::new(0)),
331        tx,
332        pid,
333    });
334
335    // Reader thread: read PTY bytes, feed the OSC parser, broadcast raw
336    // bytes + committed blocks. Blocking IO on a dedicated thread.
337    let sess = session.clone();
338    std::thread::Builder::new()
339        .name(format!("webtmux-rd-{id}"))
340        .spawn(move || reader_loop(sess, reader))
341        .map_err(io_err)?;
342
343    Ok((session, child))
344}
345
346fn io_err<E: std::fmt::Display>(e: E) -> std::io::Error {
347    std::io::Error::other(format!("{e}"))
348}
349
350fn write_rc_tempfile() -> std::io::Result<String> {
351    let dir = std::env::temp_dir();
352    let path = dir.join("dd-webtmux-wezterm.sh");
353    // Idempotent write; if content matches, skip to avoid racing concurrent
354    // session creates.
355    if std::fs::read(&path).ok().as_deref() != Some(SHELL_INTEGRATION_SH.as_bytes()) {
356        std::fs::write(&path, SHELL_INTEGRATION_SH)?;
357    }
358    Ok(path.to_string_lossy().into_owned())
359}
360
361// ---------------------------------------------------------------------
362// Reader loop + OSC 133 parser
363// ---------------------------------------------------------------------
364
365fn reader_loop(session: Arc<Session>, mut reader: Box<dyn Read + Send>) {
366    let mut parser = Parser::new();
367    let mut perf = SemanticPerform {
368        session: session.clone(),
369        state: PromptState::Idle,
370        input_scratch: Vec::new(),
371        pending_command: String::new(),
372    };
373    let mut buf = [0u8; 4096];
374    loop {
375        match reader.read(&mut buf) {
376            Ok(0) => break,
377            Ok(n) => {
378                let chunk = &buf[..n];
379                // Append to rolling ring
380                {
381                    let mut ring = session.ring.blocking_lock();
382                    if ring.len() + n > RING_CAP {
383                        let drop_n = ring.len() + n - RING_CAP;
384                        for _ in 0..drop_n.min(ring.len()) {
385                            ring.pop_front();
386                        }
387                    }
388                    ring.extend(chunk.iter().copied());
389                }
390                // Broadcast raw bytes
391                let _ = session.tx.send(Event::Raw(Arc::new(chunk.to_vec())));
392                // Feed the semantic parser (vte drives all capture).
393                for &b in chunk {
394                    parser.advance(&mut perf, b);
395                }
396            }
397            Err(_) => break,
398        }
399    }
400}
401
402#[derive(Debug)]
403enum PromptState {
404    Idle,
405    InPrompt,               // after A, before B
406    InInput,                // after B, before C
407    InOutput(PartialBlock), // after C, before D
408}
409
410#[derive(Debug)]
411struct PartialBlock {
412    started_at_ms: u64,
413    output_bytes: Vec<u8>,
414}
415
416struct SemanticPerform {
417    session: Arc<Session>,
418    state: PromptState,
419    /// Echoed bytes between OSC 133 B and C; trimmed into the command
420    /// text when we enter InOutput.
421    input_scratch: Vec<u8>,
422    /// Command text derived from `input_scratch` at B→C transition,
423    /// parked here until the D event finalizes the block.
424    pending_command: String,
425}
426
427impl Perform for SemanticPerform {
428    fn print(&mut self, c: char) {
429        let mut buf = [0u8; 4];
430        let s = c.encode_utf8(&mut buf);
431        match &mut self.state {
432            PromptState::InInput => self.input_scratch.extend_from_slice(s.as_bytes()),
433            PromptState::InOutput(pb) => pb.output_bytes.extend_from_slice(s.as_bytes()),
434            _ => {}
435        }
436    }
437    fn execute(&mut self, b: u8) {
438        // Keep only the visible whitespace / control chars that belong
439        // in the transcript. Everything else (BEL, backspace, etc.) is
440        // terminal noise we don't need in the block record.
441        if !matches!(b, b'\n' | b'\r' | b'\t') {
442            return;
443        }
444        match &mut self.state {
445            PromptState::InInput => self.input_scratch.push(b),
446            PromptState::InOutput(pb) => pb.output_bytes.push(b),
447            _ => {}
448        }
449    }
450    fn hook(&mut self, _p: &Params, _i: &[u8], _ignore: bool, _c: char) {}
451    fn put(&mut self, _b: u8) {}
452    fn unhook(&mut self) {}
453    fn csi_dispatch(&mut self, _p: &Params, _i: &[u8], _ignore: bool, _c: char) {}
454    fn esc_dispatch(&mut self, _i: &[u8], _ignore: bool, _b: u8) {}
455
456    fn osc_dispatch(&mut self, params: &[&[u8]], _bell_terminated: bool) {
457        if params.len() < 2 {
458            return;
459        }
460        if params[0] != b"133" {
461            return;
462        }
463        let kind = params[1].first().copied();
464        match kind {
465            Some(b'A') => {
466                self.state = PromptState::InPrompt;
467                self.input_scratch.clear();
468            }
469            Some(b'B') => {
470                self.state = PromptState::InInput;
471                self.input_scratch.clear();
472            }
473            Some(b'C') => {
474                let _command = decode_command(&self.input_scratch);
475                self.pending_command = _command;
476                self.input_scratch.clear();
477                self.state = PromptState::InOutput(PartialBlock {
478                    started_at_ms: now_ms(),
479                    output_bytes: Vec::new(),
480                });
481            }
482            Some(b'D') => {
483                // OSC 133 D has form "D;<exit>" — params[2] is the exit
484                // code if present; default to 0.
485                let exit_code: i32 = params
486                    .get(2)
487                    .and_then(|p| std::str::from_utf8(p).ok())
488                    .and_then(|s| s.parse().ok())
489                    .unwrap_or(0);
490                if let PromptState::InOutput(pb) =
491                    std::mem::replace(&mut self.state, PromptState::Idle)
492                {
493                    let command = std::mem::take(&mut self.pending_command);
494                    let block = finalize_block(&self.session, pb, command, exit_code);
495                    let arc = Arc::new(block);
496                    {
497                        let mut blocks = self.session.blocks.blocking_write();
498                        while blocks.len() >= BLOCKS_CAP {
499                            blocks.pop_front();
500                        }
501                        blocks.push_back((*arc).clone());
502                    }
503                    let _ = self.session.tx.send(Event::Block(arc));
504                }
505            }
506            _ => {}
507        }
508    }
509}
510
511// Reopen the struct to add the scratch field without reshuffling above
512// — kept at the bottom to keep the imports block clean. The whole file
513// is cohesive enough that this is just a style choice.
514// (Rust allows only one definition per struct, so we inline the field
515// into the original above.)
516
517fn decode_command(input: &[u8]) -> String {
518    // The bytes between B and C are the echoed keystrokes as the user
519    // typed — includes backspaces, cursor moves, and the trailing Enter.
520    // Strip ANSI sequences and trim. Not authoritative; upgrade to
521    // OSC 633;E capture later.
522    let s = String::from_utf8_lossy(input);
523    let mut out = String::new();
524    let mut in_esc = false;
525    for c in s.chars() {
526        if in_esc {
527            if c.is_ascii_alphabetic() || c == '\x07' {
528                in_esc = false;
529            }
530            continue;
531        }
532        match c {
533            '\x1b' => in_esc = true,
534            '\x08' => {
535                out.pop();
536            }
537            '\r' | '\n' => {}
538            _ => out.push(c),
539        }
540    }
541    out.trim().to_string()
542}
543
544fn finalize_block(
545    session: &Session,
546    pb: PartialBlock,
547    command: String,
548    exit_code: i32,
549) -> BlockRecord {
550    let mut seq_g = session.next_seq.blocking_lock();
551    let seq = *seq_g;
552    *seq_g += 1;
553    let output_b64 = base64::engine::general_purpose::STANDARD.encode(&pb.output_bytes);
554    BlockRecord {
555        session_id: session.id.clone(),
556        seq,
557        started_at_ms: pb.started_at_ms,
558        ended_at_ms: now_ms(),
559        command,
560        output_b64,
561        exit_code,
562    }
563}
564
565// ---------------------------------------------------------------------
566// HTTP handlers
567// ---------------------------------------------------------------------
568
569/// Mount point-agnostic. Nest this wherever you want:
570///
571/// ```no_run
572/// # use axum::Router;
573/// let app: Router = Router::new()
574///     .nest("/term", bastion::router(bastion::Manager::new()));
575/// ```
576pub fn router(manager: Manager) -> Router {
577    Router::new()
578        .route("/", get(page))
579        .route("/api/sessions", get(list_sessions).post(create_session))
580        .route("/api/sessions/{id}", delete(kill_session))
581        .route("/ws/{id}", get(ws_upgrade))
582        .with_state(manager)
583}
584
585async fn page(State(m): State<Manager>) -> impl IntoResponse {
586    Html((m.shell)("Terminal", PAGE_BODY))
587}
588
589#[derive(Deserialize)]
590struct CreateBody {
591    title: Option<String>,
592}
593
594async fn list_sessions(State(m): State<Manager>) -> Json<Vec<SessionInfo>> {
595    Json(m.list().await)
596}
597
598async fn create_session(
599    State(m): State<Manager>,
600    body: Option<Json<CreateBody>>,
601) -> Result<Json<SessionInfo>, axum::http::StatusCode> {
602    let title = body
603        .and_then(|b| b.0.title)
604        .unwrap_or_else(|| "shell".to_string());
605    match m.create(title).await {
606        Ok(s) => Ok(Json(s.info())),
607        Err(_) => Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR),
608    }
609}
610
611async fn kill_session(State(m): State<Manager>, Path(id): Path<String>) -> axum::http::StatusCode {
612    if m.remove(&id).await {
613        axum::http::StatusCode::NO_CONTENT
614    } else {
615        axum::http::StatusCode::NOT_FOUND
616    }
617}
618
619async fn ws_upgrade(
620    ws: WebSocketUpgrade,
621    Path(id): Path<String>,
622    State(m): State<Manager>,
623) -> impl IntoResponse {
624    ws.on_upgrade(move |socket| ws_loop(socket, id, m))
625}
626
627async fn ws_loop(mut socket: WebSocket, id: String, m: Manager) {
628    use futures_util::{SinkExt, StreamExt};
629
630    let Some(session) = m.get(&id).await else {
631        let _ = socket
632            .send(Message::Text(
633                r#"{"type":"error","code":"not_found"}"#.to_string().into(),
634            ))
635            .await;
636        return;
637    };
638
639    // Split the socket so send and receive halves run concurrently.
640    let (mut sink, mut stream) = socket.split();
641
642    // Replay: send current ring + blocks, tagged with latest seq so the
643    // client can dedupe against its IndexedDB.
644    {
645        let ring_bytes: Vec<u8> = session.ring.lock().await.iter().copied().collect();
646        if !ring_bytes.is_empty() {
647            let _ = sink.send(Message::Binary(ring_bytes.into())).await;
648        }
649        let blocks = session.blocks.read().await;
650        for b in blocks.iter() {
651            if let Ok(s) = serde_json::to_string(&serde_json::json!({
652                "type": "block",
653                "session_id": b.session_id,
654                "seq": b.seq,
655                "started_at_ms": b.started_at_ms,
656                "ended_at_ms": b.ended_at_ms,
657                "command": b.command,
658                "output_b64": b.output_b64,
659                "exit_code": b.exit_code,
660            })) {
661                let _ = sink.send(Message::Text(s.into())).await;
662            }
663        }
664        let seq = *session.next_seq.lock().await;
665        let _ = sink
666            .send(Message::Text(
667                serde_json::json!({"type":"ready","seq":seq})
668                    .to_string()
669                    .into(),
670            ))
671            .await;
672    }
673
674    // Subscribe to live events.
675    let mut rx = session.tx.subscribe();
676
677    let writer = session.writer.clone();
678    let master = session.master.clone();
679
680    // Inbound: stdin bytes and control JSON.
681    let inbound = async move {
682        while let Some(Ok(msg)) = stream.next().await {
683            match msg {
684                Message::Binary(bytes) => {
685                    let w = writer.clone();
686                    let _ = tokio::task::spawn_blocking(move || {
687                        let mut g = w.blocking_lock();
688                        let _ = g.write_all(&bytes);
689                    })
690                    .await;
691                }
692                Message::Text(s) => {
693                    let Ok(msg) = serde_json::from_str::<ClientMsg>(&s) else {
694                        continue;
695                    };
696                    match msg {
697                        ClientMsg::Resize(r) => {
698                            let g = master.lock().await;
699                            let _ = g.resize(PtySize {
700                                rows: r.rows.max(4),
701                                cols: r.cols.max(8),
702                                pixel_width: 0,
703                                pixel_height: 0,
704                            });
705                        }
706                        ClientMsg::Hello(_) => {
707                            // M1: we already replayed everything above.
708                            // M3 will honor have_up_to to skip known seqs.
709                        }
710                    }
711                }
712                Message::Close(_) => break,
713                _ => {}
714            }
715        }
716    };
717
718    // Outbound: pump broadcast events to the client.
719    let outbound = async move {
720        loop {
721            let ev = match rx.recv().await {
722                Ok(e) => e,
723                Err(broadcast::error::RecvError::Lagged(_)) => continue,
724                Err(_) => break,
725            };
726            match ev {
727                Event::Raw(bytes) => {
728                    if sink
729                        .send(Message::Binary((*bytes).clone().into()))
730                        .await
731                        .is_err()
732                    {
733                        break;
734                    }
735                }
736                Event::Block(b) => {
737                    let payload = serde_json::json!({
738                        "type": "block",
739                        "session_id": b.session_id,
740                        "seq": b.seq,
741                        "started_at_ms": b.started_at_ms,
742                        "ended_at_ms": b.ended_at_ms,
743                        "command": b.command,
744                        "output_b64": b.output_b64,
745                        "exit_code": b.exit_code,
746                    });
747                    if sink
748                        .send(Message::Text(payload.to_string().into()))
749                        .await
750                        .is_err()
751                    {
752                        break;
753                    }
754                }
755                Event::Exit(code) => {
756                    let _ = sink
757                        .send(Message::Text(
758                            serde_json::json!({"type":"exit","code":code})
759                                .to_string()
760                                .into(),
761                        ))
762                        .await;
763                    break;
764                }
765            }
766        }
767    };
768
769    tokio::select! {
770        _ = inbound => {},
771        _ = outbound => {},
772    }
773}
774
775// ---------------------------------------------------------------------
776// SPA (inline for M1)
777// ---------------------------------------------------------------------
778
779const PAGE_BODY: &str = include_str!("page.html");
780
781/// Default chrome: wraps the SPA body in a minimal dark-themed HTML
782/// shell. Apps that want their own nav/branding pass
783/// [`router_with_shell`] a custom `fn(title, body_html) -> String`.
784fn default_shell(title: &str, body: &str) -> String {
785    format!(
786        r#"<!DOCTYPE html>
787<html><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
788<title>{title}</title>
789<style>
790  * {{ box-sizing: border-box; margin: 0; padding: 0; }}
791  html, body {{ height: 100%; background: #1e1e2e; color: #cdd6f4;
792                font-family: 'JetBrains Mono', ui-monospace, monospace; }}
793  body {{ display: flex; flex-direction: column; }}
794  header {{ padding: 10px 16px; border-bottom: 1px solid #313244;
795            font-weight: 600; color: #89b4fa; font-size: 13px; }}
796  .fullpage {{ flex: 1; min-height: 0; display: flex; }}
797  a {{ color: #89b4fa; text-decoration: none; }}
798  a:hover {{ text-decoration: underline; }}
799</style></head>
800<body>
801<header>bastion</header>
802<div class="fullpage">{body}</div>
803</body></html>"#
804    )
805}
806
807// ---------------------------------------------------------------------
808// Tests
809// ---------------------------------------------------------------------
810
811#[cfg(test)]
812mod tests {
813    use super::*;
814
815    #[test]
816    fn osc_133_sequence_produces_block() {
817        // Build a minimal session without a real PTY.
818        let session = Arc::new(Session {
819            id: "t".into(),
820            title: "t".into(),
821            created_at_ms: 0,
822            master: Arc::new(Mutex::new(make_fake_master())),
823            writer: Arc::new(Mutex::new(
824                Box::new(std::io::sink()) as Box<dyn Write + Send>
825            )),
826            ring: Arc::new(Mutex::new(VecDeque::new())),
827            blocks: Arc::new(RwLock::new(VecDeque::new())),
828            next_seq: Arc::new(Mutex::new(0)),
829            tx: broadcast::channel::<Event>(8).0,
830            pid: None,
831        });
832        let mut parser = Parser::new();
833        let mut perf = SemanticPerform {
834            session: session.clone(),
835            state: PromptState::Idle,
836            input_scratch: Vec::new(),
837            pending_command: String::new(),
838        };
839        // A B <typed "echo hi\r"> C <output "hi\n"> D;0
840        let stream = b"\x1b]133;A\x07\x1b]133;B\x07echo hi\r\x1b]133;C\x07hi\n\x1b]133;D;0\x07";
841        for &b in stream {
842            parser.advance(&mut perf, b);
843        }
844        let blocks = session.blocks.blocking_read();
845        assert_eq!(blocks.len(), 1);
846        let b = &blocks[0];
847        assert_eq!(b.command, "echo hi");
848        assert_eq!(b.exit_code, 0);
849    }
850
851    fn make_fake_master() -> Box<dyn MasterPty + Send> {
852        // Opening a real pty is fine in unit tests on Linux; the
853        // caller won't read/write it in this test.
854        let pair = native_pty_system()
855            .openpty(PtySize {
856                rows: 24,
857                cols: 80,
858                pixel_width: 0,
859                pixel_height: 0,
860            })
861            .expect("openpty");
862        pair.master
863    }
864}