worterbuch_client/
unix.rs

/*
 *  Worterbuch client Unix Socket module
 *
 *  Copyright (C) 2024 Michael Bachmann
 *
 *  This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU Affero General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU Affero General Public License for more details.
 *
 *  You should have received a copy of the GNU Affero General Public License
 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */
19
20use tokio::{
21    io::{BufReader, Lines},
22    net::unix::{OwnedReadHalf, OwnedWriteHalf},
23    spawn,
24    sync::{mpsc, oneshot},
25};
26use tracing::{debug, error};
27use worterbuch_common::{
28    ClientMessage, ServerMessage, error::ConnectionResult, write_line_and_flush,
29};
30
/// Label passed to `write_line_and_flush` as its last argument when sending
/// messages to the server.
// NOTE(review): presumably used to name the remote peer in log/error output —
// confirm against `worterbuch_common::write_line_and_flush`.
const SERVER_ID: &str = "worterbuch server";
32
/// Client-side handle for a Worterbuch connection over a Unix socket.
///
/// Outgoing messages are queued on a channel and written to the socket by a
/// background task; incoming messages are read line by line from the read half.
pub struct UnixClientSocket {
    /// Sending side of the queue that feeds the background forwarding task.
    tx: mpsc::Sender<ClientMessage>,
    /// Line reader over the read half of the socket.
    rx: Lines<BufReader<OwnedReadHalf>>,
    /// Receives a signal once the background forwarding task has finished.
    closed: oneshot::Receiver<()>,
}
38
39impl UnixClientSocket {
40    pub async fn new(
41        tx: OwnedWriteHalf,
42        rx: Lines<BufReader<OwnedReadHalf>>,
43        buffer_size: usize,
44    ) -> Self {
45        let (send_tx, send_rx) = mpsc::channel(buffer_size);
46        let (closed_tx, closed_rx) = oneshot::channel();
47        spawn(forward_unix_messages(tx, send_rx, closed_tx));
48        Self {
49            tx: send_tx,
50            rx,
51            closed: closed_rx,
52        }
53    }
54
55    pub async fn send_msg(&self, msg: ClientMessage) -> ConnectionResult<()> {
56        self.tx.send(msg).await?;
57        Ok(())
58    }
59
60    pub async fn receive_msg(&mut self) -> ConnectionResult<Option<ServerMessage>> {
61        let read = self.rx.next_line().await;
62        match read {
63            Ok(None) => Ok(None),
64            Ok(Some(json)) => {
65                debug!("Received message: {json}");
66                let sm = serde_json::from_str(&json);
67                if let Err(e) = &sm {
68                    error!("Error deserializing message '{json}': {e}")
69                }
70                Ok(sm?)
71            }
72            Err(e) => Err(e.into()),
73        }
74    }
75
76    pub async fn close(self) -> ConnectionResult<()> {
77        drop(self.tx);
78        drop(self.rx);
79        self.closed.await.ok();
80        Ok(())
81    }
82}
83
84async fn forward_unix_messages(
85    mut tx: OwnedWriteHalf,
86    mut send_rx: mpsc::Receiver<ClientMessage>,
87    closed_tx: oneshot::Sender<()>,
88) {
89    while let Some(msg) = send_rx.recv().await {
90        if let Err(e) = write_line_and_flush(msg, &mut tx, None, SERVER_ID).await {
91            error!("Error sending TCP message: {e}");
92            break;
93        }
94    }
95
96    drop(tx);
97
98    closed_tx.send(()).ok();
99}