worterbuch_client/
unix.rs1use tokio::{
21 io::{BufReader, Lines},
22 net::unix::{OwnedReadHalf, OwnedWriteHalf},
23 spawn,
24 sync::{mpsc, oneshot},
25};
26use tracing::{debug, error};
27use worterbuch_common::{
28 ClientMessage, ServerMessage, error::ConnectionResult, write_line_and_flush,
29};
30
31const SERVER_ID: &str = "worterbuch server";
32
33pub struct UnixClientSocket {
34 tx: mpsc::Sender<ClientMessage>,
35 rx: Lines<BufReader<OwnedReadHalf>>,
36 closed: oneshot::Receiver<()>,
37}
38
39impl UnixClientSocket {
40 pub async fn new(
41 tx: OwnedWriteHalf,
42 rx: Lines<BufReader<OwnedReadHalf>>,
43 buffer_size: usize,
44 ) -> Self {
45 let (send_tx, send_rx) = mpsc::channel(buffer_size);
46 let (closed_tx, closed_rx) = oneshot::channel();
47 spawn(forward_unix_messages(tx, send_rx, closed_tx));
48 Self {
49 tx: send_tx,
50 rx,
51 closed: closed_rx,
52 }
53 }
54
55 pub async fn send_msg(&self, msg: ClientMessage) -> ConnectionResult<()> {
56 self.tx.send(msg).await?;
57 Ok(())
58 }
59
60 pub async fn receive_msg(&mut self) -> ConnectionResult<Option<ServerMessage>> {
61 let read = self.rx.next_line().await;
62 match read {
63 Ok(None) => Ok(None),
64 Ok(Some(json)) => {
65 debug!("Received message: {json}");
66 let sm = serde_json::from_str(&json);
67 if let Err(e) = &sm {
68 error!("Error deserializing message '{json}': {e}")
69 }
70 Ok(sm?)
71 }
72 Err(e) => Err(e.into()),
73 }
74 }
75
76 pub async fn close(self) -> ConnectionResult<()> {
77 drop(self.tx);
78 drop(self.rx);
79 self.closed.await.ok();
80 Ok(())
81 }
82}
83
84async fn forward_unix_messages(
85 mut tx: OwnedWriteHalf,
86 mut send_rx: mpsc::Receiver<ClientMessage>,
87 closed_tx: oneshot::Sender<()>,
88) {
89 while let Some(msg) = send_rx.recv().await {
90 if let Err(e) = write_line_and_flush(msg, &mut tx, None, SERVER_ID).await {
91 error!("Error sending TCP message: {e}");
92 break;
93 }
94 }
95
96 drop(tx);
97
98 closed_tx.send(()).ok();
99}