1use std::collections::VecDeque;
43use std::io::{Read, Write};
44use std::sync::Arc;
45use std::time::{SystemTime, UNIX_EPOCH};
46
47use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
48use axum::extract::{Path, State};
49use axum::response::{Html, IntoResponse};
50use axum::routing::{delete, get};
51use axum::{Json, Router};
52use base64::Engine as _;
53use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
54use serde::{Deserialize, Serialize};
55use tokio::sync::{broadcast, Mutex, RwLock};
56use uuid::Uuid;
57use vte::{Params, Parser, Perform};
58
59const SHELL_INTEGRATION_SH: &str = include_str!("shell_integration.sh");
63
64const RING_CAP: usize = 256 * 1024;
67
68const BLOCKS_CAP: usize = 128;
71
72const BROADCAST_CAP: usize = 256;
75
76const DEFAULT_COLS: u16 = 120;
78const DEFAULT_ROWS: u16 = 32;
79
80#[derive(Clone, Debug, Serialize)]
85pub struct BlockRecord {
86 pub session_id: String,
87 pub seq: u64,
88 pub started_at_ms: u64,
89 pub ended_at_ms: u64,
90 pub command: String,
91 pub output_b64: String,
93 pub exit_code: i32,
94}
95
96#[derive(Clone, Debug, Serialize)]
97pub struct SessionInfo {
98 pub id: String,
99 pub title: String,
100 pub created_at_ms: u64,
101 pub next_seq: u64,
102}
103
104#[derive(Deserialize)]
105#[allow(dead_code)] struct HelloMsg {
107 have_up_to: Option<u64>,
108}
109
110#[derive(Deserialize)]
111struct ResizeMsg {
112 cols: u16,
113 rows: u16,
114}
115
116#[derive(Deserialize)]
117#[serde(tag = "type", rename_all = "lowercase")]
118#[allow(dead_code)]
119enum ClientMsg {
120 Hello(HelloMsg),
121 Resize(ResizeMsg),
122}
123
124#[derive(Clone)]
129enum Event {
130 Raw(Arc<Vec<u8>>),
131 Block(Arc<BlockRecord>),
132 Exit(i32),
133}
134
135struct Session {
136 id: String,
137 title: String,
138 created_at_ms: u64,
139 master: Arc<Mutex<Box<dyn MasterPty + Send>>>,
140 writer: Arc<Mutex<Box<dyn Write + Send>>>,
141 ring: Arc<Mutex<VecDeque<u8>>>,
142 blocks: Arc<RwLock<VecDeque<BlockRecord>>>,
143 next_seq: Arc<Mutex<u64>>,
144 tx: broadcast::Sender<Event>,
145 pid: Option<u32>,
149}
150
151impl Session {
152 fn info(&self) -> SessionInfo {
153 SessionInfo {
154 id: self.id.clone(),
155 title: self.title.clone(),
156 created_at_ms: self.created_at_ms,
157 next_seq: self.next_seq.try_lock().map(|g| *g).unwrap_or(0),
158 }
159 }
160
161 fn kill(&self) {
164 let Some(pid) = self.pid else { return };
165 let pid = pid as i32;
166 unsafe {
167 libc::kill(pid, libc::SIGHUP);
168 }
169 let _ = pid;
173 }
174}
175
176pub type ShellFn = Arc<dyn Fn(&str, &str) -> String + Send + Sync>;
184
185#[derive(Clone)]
186pub struct Manager {
187 inner: Arc<RwLock<std::collections::HashMap<String, Arc<Session>>>>,
188 shell: ShellFn,
189}
190
191impl Default for Manager {
192 fn default() -> Self {
193 Self::new()
194 }
195}
196
197impl Manager {
198 pub fn new() -> Self {
200 Self {
201 inner: Arc::new(RwLock::new(Default::default())),
202 shell: Arc::new(default_shell),
203 }
204 }
205
206 pub fn with_shell<F>(mut self, f: F) -> Self
209 where
210 F: Fn(&str, &str) -> String + Send + Sync + 'static,
211 {
212 self.shell = Arc::new(f);
213 self
214 }
215
216 async fn list(&self) -> Vec<SessionInfo> {
217 let g = self.inner.read().await;
218 g.values().map(|s| s.info()).collect()
219 }
220
221 async fn get(&self, id: &str) -> Option<Arc<Session>> {
222 self.inner.read().await.get(id).cloned()
223 }
224
225 async fn create(&self, title: String) -> std::io::Result<Arc<Session>> {
226 let id = short_id();
227 let (session, child) = spawn_session(id.clone(), title).await?;
228 self.inner.write().await.insert(id.clone(), session.clone());
229
230 let tx = session.tx.clone();
234 let manager = self.clone();
235 let wait_id = id;
236 tokio::spawn(async move {
237 let code = tokio::task::spawn_blocking(move || {
238 let mut child = child;
239 child
240 .wait()
241 .ok()
242 .and_then(|s| i32::try_from(s.exit_code()).ok())
243 .unwrap_or(-1)
244 })
245 .await
246 .unwrap_or(-1);
247 let _ = tx.send(Event::Exit(code));
248 manager.inner.write().await.remove(&wait_id);
249 });
250
251 Ok(session)
252 }
253
254 async fn remove(&self, id: &str) -> bool {
255 let removed = self.inner.write().await.remove(id);
256 if let Some(s) = removed {
257 s.kill();
261 true
262 } else {
263 false
264 }
265 }
266}
267
268fn short_id() -> String {
269 let u = Uuid::new_v4();
270 u.simple().to_string()[..12].to_string()
271}
272
273fn now_ms() -> u64 {
274 SystemTime::now()
275 .duration_since(UNIX_EPOCH)
276 .map(|d| d.as_millis() as u64)
277 .unwrap_or(0)
278}
279
280async fn spawn_session(
281 id: String,
282 title: String,
283) -> std::io::Result<(Arc<Session>, Box<dyn portable_pty::Child + Send + Sync>)> {
284 let pty_system = native_pty_system();
285 let pair = pty_system
286 .openpty(PtySize {
287 rows: DEFAULT_ROWS,
288 cols: DEFAULT_COLS,
289 pixel_width: 0,
290 pixel_height: 0,
291 })
292 .map_err(io_err)?;
293
294 let rc_path = write_rc_tempfile()?;
297
298 let shell = if std::path::Path::new("/bin/bash").exists() {
300 "/bin/bash"
301 } else {
302 "/bin/sh"
303 };
304
305 let mut cmd = CommandBuilder::new(shell);
306 cmd.args(["--rcfile", &rc_path, "-i"]);
307 cmd.env("TERM", "xterm-256color");
308 cmd.env_remove("WEZTERM_SHELL_SKIP_ALL");
310 if let Ok(home) = std::env::var("HOME") {
311 cmd.env("HOME", home);
312 }
313
314 let child = pair.slave.spawn_command(cmd).map_err(io_err)?;
315 let pid = child.process_id();
316 let reader = pair.master.try_clone_reader().map_err(io_err)?;
317 let writer = pair.master.take_writer().map_err(io_err)?;
318
319 let created_at_ms = now_ms();
320 let (tx, _rx) = broadcast::channel(BROADCAST_CAP);
321
322 let session = Arc::new(Session {
323 id: id.clone(),
324 title,
325 created_at_ms,
326 master: Arc::new(Mutex::new(pair.master)),
327 writer: Arc::new(Mutex::new(writer)),
328 ring: Arc::new(Mutex::new(VecDeque::with_capacity(RING_CAP))),
329 blocks: Arc::new(RwLock::new(VecDeque::with_capacity(BLOCKS_CAP))),
330 next_seq: Arc::new(Mutex::new(0)),
331 tx,
332 pid,
333 });
334
335 let sess = session.clone();
338 std::thread::Builder::new()
339 .name(format!("webtmux-rd-{id}"))
340 .spawn(move || reader_loop(sess, reader))
341 .map_err(io_err)?;
342
343 Ok((session, child))
344}
345
346fn io_err<E: std::fmt::Display>(e: E) -> std::io::Error {
347 std::io::Error::other(format!("{e}"))
348}
349
350fn write_rc_tempfile() -> std::io::Result<String> {
351 let dir = std::env::temp_dir();
352 let path = dir.join("dd-webtmux-wezterm.sh");
353 if std::fs::read(&path).ok().as_deref() != Some(SHELL_INTEGRATION_SH.as_bytes()) {
356 std::fs::write(&path, SHELL_INTEGRATION_SH)?;
357 }
358 Ok(path.to_string_lossy().into_owned())
359}
360
361fn reader_loop(session: Arc<Session>, mut reader: Box<dyn Read + Send>) {
366 let mut parser = Parser::new();
367 let mut perf = SemanticPerform {
368 session: session.clone(),
369 state: PromptState::Idle,
370 input_scratch: Vec::new(),
371 pending_command: String::new(),
372 };
373 let mut buf = [0u8; 4096];
374 loop {
375 match reader.read(&mut buf) {
376 Ok(0) => break,
377 Ok(n) => {
378 let chunk = &buf[..n];
379 {
381 let mut ring = session.ring.blocking_lock();
382 if ring.len() + n > RING_CAP {
383 let drop_n = ring.len() + n - RING_CAP;
384 for _ in 0..drop_n.min(ring.len()) {
385 ring.pop_front();
386 }
387 }
388 ring.extend(chunk.iter().copied());
389 }
390 let _ = session.tx.send(Event::Raw(Arc::new(chunk.to_vec())));
392 for &b in chunk {
394 parser.advance(&mut perf, b);
395 }
396 }
397 Err(_) => break,
398 }
399 }
400}
401
402#[derive(Debug)]
403enum PromptState {
404 Idle,
405 InPrompt, InInput, InOutput(PartialBlock), }
409
410#[derive(Debug)]
411struct PartialBlock {
412 started_at_ms: u64,
413 output_bytes: Vec<u8>,
414}
415
416struct SemanticPerform {
417 session: Arc<Session>,
418 state: PromptState,
419 input_scratch: Vec<u8>,
422 pending_command: String,
425}
426
427impl Perform for SemanticPerform {
428 fn print(&mut self, c: char) {
429 let mut buf = [0u8; 4];
430 let s = c.encode_utf8(&mut buf);
431 match &mut self.state {
432 PromptState::InInput => self.input_scratch.extend_from_slice(s.as_bytes()),
433 PromptState::InOutput(pb) => pb.output_bytes.extend_from_slice(s.as_bytes()),
434 _ => {}
435 }
436 }
437 fn execute(&mut self, b: u8) {
438 if !matches!(b, b'\n' | b'\r' | b'\t') {
442 return;
443 }
444 match &mut self.state {
445 PromptState::InInput => self.input_scratch.push(b),
446 PromptState::InOutput(pb) => pb.output_bytes.push(b),
447 _ => {}
448 }
449 }
450 fn hook(&mut self, _p: &Params, _i: &[u8], _ignore: bool, _c: char) {}
451 fn put(&mut self, _b: u8) {}
452 fn unhook(&mut self) {}
453 fn csi_dispatch(&mut self, _p: &Params, _i: &[u8], _ignore: bool, _c: char) {}
454 fn esc_dispatch(&mut self, _i: &[u8], _ignore: bool, _b: u8) {}
455
456 fn osc_dispatch(&mut self, params: &[&[u8]], _bell_terminated: bool) {
457 if params.len() < 2 {
458 return;
459 }
460 if params[0] != b"133" {
461 return;
462 }
463 let kind = params[1].first().copied();
464 match kind {
465 Some(b'A') => {
466 self.state = PromptState::InPrompt;
467 self.input_scratch.clear();
468 }
469 Some(b'B') => {
470 self.state = PromptState::InInput;
471 self.input_scratch.clear();
472 }
473 Some(b'C') => {
474 let _command = decode_command(&self.input_scratch);
475 self.pending_command = _command;
476 self.input_scratch.clear();
477 self.state = PromptState::InOutput(PartialBlock {
478 started_at_ms: now_ms(),
479 output_bytes: Vec::new(),
480 });
481 }
482 Some(b'D') => {
483 let exit_code: i32 = params
486 .get(2)
487 .and_then(|p| std::str::from_utf8(p).ok())
488 .and_then(|s| s.parse().ok())
489 .unwrap_or(0);
490 if let PromptState::InOutput(pb) =
491 std::mem::replace(&mut self.state, PromptState::Idle)
492 {
493 let command = std::mem::take(&mut self.pending_command);
494 let block = finalize_block(&self.session, pb, command, exit_code);
495 let arc = Arc::new(block);
496 {
497 let mut blocks = self.session.blocks.blocking_write();
498 while blocks.len() >= BLOCKS_CAP {
499 blocks.pop_front();
500 }
501 blocks.push_back((*arc).clone());
502 }
503 let _ = self.session.tx.send(Event::Block(arc));
504 }
505 }
506 _ => {}
507 }
508 }
509}
510
511fn decode_command(input: &[u8]) -> String {
518 let s = String::from_utf8_lossy(input);
523 let mut out = String::new();
524 let mut in_esc = false;
525 for c in s.chars() {
526 if in_esc {
527 if c.is_ascii_alphabetic() || c == '\x07' {
528 in_esc = false;
529 }
530 continue;
531 }
532 match c {
533 '\x1b' => in_esc = true,
534 '\x08' => {
535 out.pop();
536 }
537 '\r' | '\n' => {}
538 _ => out.push(c),
539 }
540 }
541 out.trim().to_string()
542}
543
544fn finalize_block(
545 session: &Session,
546 pb: PartialBlock,
547 command: String,
548 exit_code: i32,
549) -> BlockRecord {
550 let mut seq_g = session.next_seq.blocking_lock();
551 let seq = *seq_g;
552 *seq_g += 1;
553 let output_b64 = base64::engine::general_purpose::STANDARD.encode(&pb.output_bytes);
554 BlockRecord {
555 session_id: session.id.clone(),
556 seq,
557 started_at_ms: pb.started_at_ms,
558 ended_at_ms: now_ms(),
559 command,
560 output_b64,
561 exit_code,
562 }
563}
564
565pub fn router(manager: Manager) -> Router {
577 Router::new()
578 .route("/", get(page))
579 .route("/api/sessions", get(list_sessions).post(create_session))
580 .route("/api/sessions/{id}", delete(kill_session))
581 .route("/ws/{id}", get(ws_upgrade))
582 .with_state(manager)
583}
584
585async fn page(State(m): State<Manager>) -> impl IntoResponse {
586 Html((m.shell)("Terminal", PAGE_BODY))
587}
588
589#[derive(Deserialize)]
590struct CreateBody {
591 title: Option<String>,
592}
593
594async fn list_sessions(State(m): State<Manager>) -> Json<Vec<SessionInfo>> {
595 Json(m.list().await)
596}
597
598async fn create_session(
599 State(m): State<Manager>,
600 body: Option<Json<CreateBody>>,
601) -> Result<Json<SessionInfo>, axum::http::StatusCode> {
602 let title = body
603 .and_then(|b| b.0.title)
604 .unwrap_or_else(|| "shell".to_string());
605 match m.create(title).await {
606 Ok(s) => Ok(Json(s.info())),
607 Err(_) => Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR),
608 }
609}
610
611async fn kill_session(State(m): State<Manager>, Path(id): Path<String>) -> axum::http::StatusCode {
612 if m.remove(&id).await {
613 axum::http::StatusCode::NO_CONTENT
614 } else {
615 axum::http::StatusCode::NOT_FOUND
616 }
617}
618
619async fn ws_upgrade(
620 ws: WebSocketUpgrade,
621 Path(id): Path<String>,
622 State(m): State<Manager>,
623) -> impl IntoResponse {
624 ws.on_upgrade(move |socket| ws_loop(socket, id, m))
625}
626
627async fn ws_loop(mut socket: WebSocket, id: String, m: Manager) {
628 use futures_util::{SinkExt, StreamExt};
629
630 let Some(session) = m.get(&id).await else {
631 let _ = socket
632 .send(Message::Text(
633 r#"{"type":"error","code":"not_found"}"#.to_string().into(),
634 ))
635 .await;
636 return;
637 };
638
639 let (mut sink, mut stream) = socket.split();
641
642 {
645 let ring_bytes: Vec<u8> = session.ring.lock().await.iter().copied().collect();
646 if !ring_bytes.is_empty() {
647 let _ = sink.send(Message::Binary(ring_bytes.into())).await;
648 }
649 let blocks = session.blocks.read().await;
650 for b in blocks.iter() {
651 if let Ok(s) = serde_json::to_string(&serde_json::json!({
652 "type": "block",
653 "session_id": b.session_id,
654 "seq": b.seq,
655 "started_at_ms": b.started_at_ms,
656 "ended_at_ms": b.ended_at_ms,
657 "command": b.command,
658 "output_b64": b.output_b64,
659 "exit_code": b.exit_code,
660 })) {
661 let _ = sink.send(Message::Text(s.into())).await;
662 }
663 }
664 let seq = *session.next_seq.lock().await;
665 let _ = sink
666 .send(Message::Text(
667 serde_json::json!({"type":"ready","seq":seq})
668 .to_string()
669 .into(),
670 ))
671 .await;
672 }
673
674 let mut rx = session.tx.subscribe();
676
677 let writer = session.writer.clone();
678 let master = session.master.clone();
679
680 let inbound = async move {
682 while let Some(Ok(msg)) = stream.next().await {
683 match msg {
684 Message::Binary(bytes) => {
685 let w = writer.clone();
686 let _ = tokio::task::spawn_blocking(move || {
687 let mut g = w.blocking_lock();
688 let _ = g.write_all(&bytes);
689 })
690 .await;
691 }
692 Message::Text(s) => {
693 let Ok(msg) = serde_json::from_str::<ClientMsg>(&s) else {
694 continue;
695 };
696 match msg {
697 ClientMsg::Resize(r) => {
698 let g = master.lock().await;
699 let _ = g.resize(PtySize {
700 rows: r.rows.max(4),
701 cols: r.cols.max(8),
702 pixel_width: 0,
703 pixel_height: 0,
704 });
705 }
706 ClientMsg::Hello(_) => {
707 }
710 }
711 }
712 Message::Close(_) => break,
713 _ => {}
714 }
715 }
716 };
717
718 let outbound = async move {
720 loop {
721 let ev = match rx.recv().await {
722 Ok(e) => e,
723 Err(broadcast::error::RecvError::Lagged(_)) => continue,
724 Err(_) => break,
725 };
726 match ev {
727 Event::Raw(bytes) => {
728 if sink
729 .send(Message::Binary((*bytes).clone().into()))
730 .await
731 .is_err()
732 {
733 break;
734 }
735 }
736 Event::Block(b) => {
737 let payload = serde_json::json!({
738 "type": "block",
739 "session_id": b.session_id,
740 "seq": b.seq,
741 "started_at_ms": b.started_at_ms,
742 "ended_at_ms": b.ended_at_ms,
743 "command": b.command,
744 "output_b64": b.output_b64,
745 "exit_code": b.exit_code,
746 });
747 if sink
748 .send(Message::Text(payload.to_string().into()))
749 .await
750 .is_err()
751 {
752 break;
753 }
754 }
755 Event::Exit(code) => {
756 let _ = sink
757 .send(Message::Text(
758 serde_json::json!({"type":"exit","code":code})
759 .to_string()
760 .into(),
761 ))
762 .await;
763 break;
764 }
765 }
766 }
767 };
768
769 tokio::select! {
770 _ = inbound => {},
771 _ = outbound => {},
772 }
773}
774
775const PAGE_BODY: &str = include_str!("page.html");
780
781fn default_shell(title: &str, body: &str) -> String {
785 format!(
786 r#"<!DOCTYPE html>
787<html><head><meta charset="utf-8"><meta name="viewport" content="width=device-width,initial-scale=1">
788<title>{title}</title>
789<style>
790 * {{ box-sizing: border-box; margin: 0; padding: 0; }}
791 html, body {{ height: 100%; background: #1e1e2e; color: #cdd6f4;
792 font-family: 'JetBrains Mono', ui-monospace, monospace; }}
793 body {{ display: flex; flex-direction: column; }}
794 header {{ padding: 10px 16px; border-bottom: 1px solid #313244;
795 font-weight: 600; color: #89b4fa; font-size: 13px; }}
796 .fullpage {{ flex: 1; min-height: 0; display: flex; }}
797 a {{ color: #89b4fa; text-decoration: none; }}
798 a:hover {{ text-decoration: underline; }}
799</style></head>
800<body>
801<header>bastion</header>
802<div class="fullpage">{body}</div>
803</body></html>"#
804 )
805}
806
807#[cfg(test)]
812mod tests {
813 use super::*;
814
815 #[test]
816 fn osc_133_sequence_produces_block() {
817 let session = Arc::new(Session {
819 id: "t".into(),
820 title: "t".into(),
821 created_at_ms: 0,
822 master: Arc::new(Mutex::new(make_fake_master())),
823 writer: Arc::new(Mutex::new(
824 Box::new(std::io::sink()) as Box<dyn Write + Send>
825 )),
826 ring: Arc::new(Mutex::new(VecDeque::new())),
827 blocks: Arc::new(RwLock::new(VecDeque::new())),
828 next_seq: Arc::new(Mutex::new(0)),
829 tx: broadcast::channel::<Event>(8).0,
830 pid: None,
831 });
832 let mut parser = Parser::new();
833 let mut perf = SemanticPerform {
834 session: session.clone(),
835 state: PromptState::Idle,
836 input_scratch: Vec::new(),
837 pending_command: String::new(),
838 };
839 let stream = b"\x1b]133;A\x07\x1b]133;B\x07echo hi\r\x1b]133;C\x07hi\n\x1b]133;D;0\x07";
841 for &b in stream {
842 parser.advance(&mut perf, b);
843 }
844 let blocks = session.blocks.blocking_read();
845 assert_eq!(blocks.len(), 1);
846 let b = &blocks[0];
847 assert_eq!(b.command, "echo hi");
848 assert_eq!(b.exit_code, 0);
849 }
850
851 fn make_fake_master() -> Box<dyn MasterPty + Send> {
852 let pair = native_pty_system()
855 .openpty(PtySize {
856 rows: 24,
857 cols: 80,
858 pixel_width: 0,
859 pixel_height: 0,
860 })
861 .expect("openpty");
862 pair.master
863 }
864}