webterm_agent/models/
socket_writer.rs1use crate::models::agent_error::AgentError;
2use crate::models::connection_manager::ConnectionManager;
3use futures::stream::SplitSink;
4use futures::SinkExt;
5use std::sync::Arc;
6use tokio::net::TcpStream;
7use tokio::sync::mpsc;
8use tokio_tungstenite::tungstenite::{Bytes, Message};
9use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
10use tracing::info;
11
12pub type SocketPublisher = mpsc::Sender<Bytes>;
13
14pub struct SocketWriter {
15 _tx: SocketPublisher,
16}
17
18impl SocketWriter {
19 pub fn new(
20 mut writer_stream: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
21 cm: Arc<ConnectionManager>,
22 ) -> Self {
23 let (_tx, mut rx) = mpsc::channel::<Bytes>(16);
24
25 tokio::spawn(async move {
26 loop {
27 let received = rx.recv().await;
28 match received {
29 None => {
30 info!("mpsc rx closed");
31 cm.disconnect().await;
32 break;
33 }
34 Some(message) => {
35 let result = writer_stream.send(Message::Binary(message)).await;
36 match result {
37 Ok(_) => {
38 }
40 Err(error) => {
41 info!("Error sending message to writer stream: {:?}", error);
42 cm.disconnect_with_error(AgentError::RuntimeError(
43 error.to_string(),
44 ))
45 .await;
46 break;
47 }
48 }
49 }
50 }
51 }
52 });
53 Self { _tx }
54 }
55
56 pub fn publisher(&self) -> SocketPublisher {
57 self._tx.clone()
58 }
59}