webterm_agent/models/
pty_activity_reader.rs1use pty_process::OwnedReadPty;
2use std::sync::OnceLock;
3use tokio::io::AsyncReadExt;
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tracing::debug;
6use tracing::info;
7use webterm_core::serialisers::talk_v1::terminal_output_builder::{
8 ActivityOutputBlob, TerminalOutputBuilder,
9};
10use webterm_core::types::ActivityId;
11
12const BUFFER_SIZE: usize = 10240;
13
14pub type TerminalSubscriber = broadcast::Receiver<Vec<u8>>;
15
16type ChannelType = (
17 mpsc::Sender<PtyActivityReaderPayload>,
18 Mutex<mpsc::Receiver<PtyActivityReaderPayload>>,
19);
20
21pub struct PtyActivityReaderPayload {
22 pub(crate) activity_id: ActivityId,
23 pub(crate) data: Vec<u8>,
24}
25
26impl PtyActivityReaderPayload {
27 pub fn to_fb_output(&self) -> ActivityOutputBlob {
28 let builder = TerminalOutputBuilder::new();
29 builder.build_output(&self.data).to_flatbuffers()
30 }
31}
32
33pub struct PtyActivityReader {}
34
35impl PtyActivityReader {
36 pub fn channel() -> &'static ChannelType {
37 static CHANNEL: OnceLock<ChannelType> = OnceLock::new();
38 CHANNEL.get_or_init(|| {
39 let (tx, rx) = mpsc::channel::<PtyActivityReaderPayload>(1024);
40 (tx, Mutex::new(rx))
41 })
42 }
43
44 pub fn sender() -> mpsc::Sender<PtyActivityReaderPayload> {
45 Self::channel().0.clone()
46 }
47
48 pub fn receiver() -> &'static Mutex<mpsc::Receiver<PtyActivityReaderPayload>> {
49 &Self::channel().1
50 }
51
52 pub fn new(activity_id: ActivityId, mut reader_stream: OwnedReadPty) -> Self {
53 let sender = Self::sender();
54 tokio::spawn(async move {
55 debug!("starting new terminal reader stream");
56 loop {
57 let mut buf = [0u8; BUFFER_SIZE];
58 if let Ok(length) = reader_stream.read(&mut buf).await {
59 sender
64 .send(PtyActivityReaderPayload {
65 activity_id,
66 data: buf[..length].to_vec(),
67 })
68 .await
69 .expect("this shouldn't fail");
70 } else {
71 info!("Reader stream closed");
72 break;
73 }
74 }
75 });
76
77 Self {}
78 }
79}