webterm_agent/models/
pty_activity_reader.rs1use pty_process::OwnedReadPty;
2use std::sync::OnceLock;
3use tokio::io::AsyncReadExt;
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tracing::debug;
6use tracing::info;
7use webterm_core::serialisers::talk_v1::terminal_output_builder::{
8    ActivityOutputBlob, TerminalOutputBuilder,
9};
10use webterm_core::types::ActivityId;
11
/// Capacity, in bytes, of the scratch buffer handed to each PTY read (10 KiB).
const BUFFER_SIZE: usize = 10 * 1024;
13
/// A broadcast receiver that yields chunks of raw bytes.
///
/// NOTE(review): presumably used by consumers subscribing to terminal output; nothing in this
/// part of the file uses the alias — confirm against callers.
pub type TerminalSubscriber = broadcast::Receiver<Vec<u8>>;
15
/// Both halves of the payload channel, stored together.
///
/// The first element is the sending half; the second is the receiving half wrapped in a
/// [`Mutex`] so that it can be accessed through a shared reference.
type ChannelType = (
    mpsc::Sender<PtyActivityReaderPayload>,
    Mutex<mpsc::Receiver<PtyActivityReaderPayload>>,
);
20
21pub struct PtyActivityReaderPayload {
22    pub(crate) activity_id: ActivityId,
23    pub(crate) data: Vec<u8>,
24}
25
26impl PtyActivityReaderPayload {
27    pub fn to_fb_output(&self) -> ActivityOutputBlob {
28        let builder = TerminalOutputBuilder::new();
29        builder.build_output(&self.data).to_flatbuffers()
30    }
31}
32
33pub struct PtyActivityReader {}
34
35impl PtyActivityReader {
36    pub fn channel() -> &'static ChannelType {
37        static CHANNEL: OnceLock<ChannelType> = OnceLock::new();
38        CHANNEL.get_or_init(|| {
39            let (tx, rx) = mpsc::channel::<PtyActivityReaderPayload>(1024);
40            (tx, Mutex::new(rx))
41        })
42    }
43
44    pub fn sender() -> mpsc::Sender<PtyActivityReaderPayload> {
45        Self::channel().0.clone()
46    }
47
48    pub fn receiver() -> &'static Mutex<mpsc::Receiver<PtyActivityReaderPayload>> {
49        &Self::channel().1
50    }
51
52    pub fn new(activity_id: ActivityId, mut reader_stream: OwnedReadPty) -> Self {
53        let sender = Self::sender();
54        tokio::spawn(async move {
55            debug!("starting new terminal reader stream");
56            loop {
57                let mut buf = [0u8; BUFFER_SIZE];
58                if let Ok(length) = reader_stream.read(&mut buf).await {
59                    sender
64                        .send(PtyActivityReaderPayload {
65                            activity_id,
66                            data: buf[..length].to_vec(),
67                        })
68                        .await
69                        .expect("this shouldn't fail");
70                } else {
71                    info!("Reader stream closed");
72                    break;
73                }
74            }
75        });
76
77        Self {}
78    }
79}