webterm_agent/models/
socket_reader.rs1use crate::models::agent_error::AgentError;
2use crate::models::connection_manager::ConnectionManager;
3use futures::stream::SplitStream;
4use futures::StreamExt;
5use std::sync::Arc;
6use tokio::net::TcpStream;
7use tokio::sync::broadcast;
8use tokio_tungstenite::tungstenite::{Bytes, Message};
9use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
10use tracing::{error, info};
11use webterm_core::models::reader_socket_error::ReaderSocketError;
12
13pub type SocketSubscriber = broadcast::Receiver<Result<Option<Bytes>, ReaderSocketError>>;
14
15pub struct SocketReader {
16 _tx: broadcast::Sender<Result<Option<Bytes>, ReaderSocketError>>,
17}
18
19impl SocketReader {
20 pub fn new(
21 mut reader_stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
22 cm: Arc<ConnectionManager>,
23 ) -> Self {
24 let (_tx, _rx) = broadcast::channel::<Result<Option<Bytes>, ReaderSocketError>>(16);
25 let tx = _tx.clone();
26
27 tokio::spawn(async move {
28 loop {
29 if let Some(received) = reader_stream.next().await {
30 let received = match received {
31 Ok(Message::Binary(received)) => Ok(Some(received)),
32 Ok(Message::Close(_)) => {
33 cm.disconnect().await;
34 break;
35 }
36 Ok(Message::Ping(_)) =>
37 {
39 Ok(None)
40 }
41 Ok(Message::Pong(_)) =>
42 {
44 Ok(None)
45 }
46 Ok(Message::Text(_)) =>
47 {
49 Ok(None)
50 }
51 Ok(Message::Frame(_)) =>
52 {
54 Ok(None)
55 }
56 Err(error) => {
57 error!("Error receiving message from stream: {}", error);
58 cm.disconnect_with_error(AgentError::RuntimeError(error.to_string()))
59 .await;
60 break;
61 }
62 };
63
64 let _ = tx.send(received);
65 } else {
66 info!("Reader stream closed");
67 cm.disconnect().await;
68 break;
69 }
70 }
71 });
72 Self { _tx }
73 }
74
75 pub fn subscriber(&self) -> SocketSubscriber {
76 self._tx.subscribe()
77 }
78}