// webterm_agent/models/socket_writer.rs

use crate::models::agent_error::AgentError;
use crate::models::connection_manager::ConnectionManager;
use futures::stream::SplitSink;
use futures::SinkExt;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::{Bytes, Message};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tracing::{error, info};

12pub type SocketPublisher = mpsc::Sender<Bytes>;
13
14pub struct SocketWriter {
15    _tx: SocketPublisher,
16}
17
18impl SocketWriter {
19    pub fn new(
20        mut writer_stream: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
21        cm: Arc<ConnectionManager>,
22    ) -> Self {
23        let (_tx, mut rx) = mpsc::channel::<Bytes>(16);
24
25        tokio::spawn(async move {
26            loop {
27                let received = rx.recv().await;
28                match received {
29                    None => {
30                        info!("mpsc rx closed");
31                        cm.disconnect().await;
32                        break;
33                    }
34                    Some(message) => {
35                        let result = writer_stream.send(Message::Binary(message)).await;
36                        match result {
37                            Ok(_) => {
38                                // continue
39                            }
40                            Err(error) => {
41                                info!("Error sending message to writer stream: {:?}", error);
42                                cm.disconnect_with_error(AgentError::RuntimeError(
43                                    error.to_string(),
44                                ))
45                                .await;
46                                break;
47                            }
48                        }
49                    }
50                }
51            }
52        });
53        Self { _tx }
54    }
55
56    pub fn publisher(&self) -> SocketPublisher {
57        self._tx.clone()
58    }
59}