// webterm_agent/models/socket_reader.rs

1use crate::models::agent_error::AgentError;
2use crate::models::connection_manager::ConnectionManager;
3use futures::stream::SplitStream;
4use futures::StreamExt;
5use std::sync::Arc;
6use tokio::net::TcpStream;
7use tokio::sync::broadcast;
8use tokio_tungstenite::tungstenite::{Bytes, Message};
9use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
10use tracing::{error, info};
11use webterm_core::models::reader_socket_error::ReaderSocketError;
12
13pub type SocketSubscriber = broadcast::Receiver<Result<Option<Bytes>, ReaderSocketError>>;
14
15pub struct SocketReader {
16    _tx: broadcast::Sender<Result<Option<Bytes>, ReaderSocketError>>,
17}
18
19impl SocketReader {
20    pub fn new(
21        mut reader_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
22        cm: Arc<ConnectionManager>,
23    ) -> Self {
24        let (_tx, _rx) = broadcast::channel::<Result<Option<Bytes>, ReaderSocketError>>(16);
25        let tx = _tx.clone();
26
27        tokio::spawn(async move {
28            loop {
29                if let Some(received) = reader_stream.next().await {
30                    let received = match received {
31                        Ok(Message::Binary(received)) => Ok(Some(received)),
32                        Ok(Message::Close(_)) => {
33                            cm.disconnect().await;
34                            break;
35                        }
36                        Ok(Message::Ping(_)) =>
37                        /* TODO: handle ping */
38                        {
39                            Ok(None)
40                        }
41                        Ok(Message::Pong(_)) =>
42                        /* TODO: handle pong */
43                        {
44                            Ok(None)
45                        }
46                        Ok(Message::Text(_)) =>
47                        /* TODO: handle text */
48                        {
49                            Ok(None)
50                        }
51                        Ok(Message::Frame(_)) =>
52                        /* TODO: handle text */
53                        {
54                            Ok(None)
55                        }
56                        Err(error) => {
57                            error!("Error receiving message from stream: {}", error);
58                            cm.disconnect_with_error(AgentError::RuntimeError(error.to_string()))
59                                .await;
60                            break;
61                        }
62                    };
63
64                    let _ = tx.send(received);
65                } else {
66                    info!("Reader stream closed");
67                    cm.disconnect().await;
68                    break;
69                }
70            }
71        });
72        Self { _tx }
73    }
74
75    pub fn subscriber(&self) -> SocketSubscriber {
76        self._tx.subscribe()
77    }
78}