lan_mouse_ipc/
listen.rs

1use futures::{stream::SelectAll, Stream, StreamExt};
2#[cfg(unix)]
3use std::path::PathBuf;
4use std::{
5    io::ErrorKind,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
11use tokio_stream::wrappers::LinesStream;
12
13#[cfg(unix)]
14use tokio::net::UnixListener;
15#[cfg(unix)]
16use tokio::net::UnixStream;
17
18#[cfg(windows)]
19use tokio::net::TcpListener;
20#[cfg(windows)]
21use tokio::net::TcpStream;
22
23use crate::{FrontendEvent, FrontendRequest, IpcError, ListenerCreationError};
24
25pub struct AsyncFrontendListener {
26    #[cfg(windows)]
27    listener: TcpListener,
28    #[cfg(unix)]
29    listener: UnixListener,
30    #[cfg(unix)]
31    socket_path: PathBuf,
32    #[cfg(unix)]
33    line_streams: SelectAll<LinesStream<BufReader<ReadHalf<UnixStream>>>>,
34    #[cfg(windows)]
35    line_streams: SelectAll<LinesStream<BufReader<ReadHalf<TcpStream>>>>,
36    #[cfg(unix)]
37    tx_streams: Vec<WriteHalf<UnixStream>>,
38    #[cfg(windows)]
39    tx_streams: Vec<WriteHalf<TcpStream>>,
40}
41
42impl AsyncFrontendListener {
43    pub async fn new() -> Result<Self, ListenerCreationError> {
44        #[cfg(unix)]
45        let (socket_path, listener) = {
46            let socket_path = crate::default_socket_path()?;
47
48            log::debug!("remove socket: {:?}", socket_path);
49            if socket_path.exists() {
50                // try to connect to see if some other instance
51                // of lan-mouse is already running
52                match UnixStream::connect(&socket_path).await {
53                    // connected -> lan-mouse is already running
54                    Ok(_) => return Err(ListenerCreationError::AlreadyRunning),
55                    // lan-mouse is not running but a socket was left behind
56                    Err(e) => {
57                        log::debug!("{socket_path:?}: {e} - removing left behind socket");
58                        let _ = std::fs::remove_file(&socket_path);
59                    }
60                }
61            }
62            let listener = match UnixListener::bind(&socket_path) {
63                Ok(ls) => ls,
64                // some other lan-mouse instance has bound the socket in the meantime
65                Err(e) if e.kind() == ErrorKind::AddrInUse => {
66                    return Err(ListenerCreationError::AlreadyRunning)
67                }
68                Err(e) => return Err(ListenerCreationError::Bind(e)),
69            };
70            (socket_path, listener)
71        };
72
73        #[cfg(windows)]
74        let listener = match TcpListener::bind("127.0.0.1:5252").await {
75            Ok(ls) => ls,
76            // some other lan-mouse instance has bound the socket in the meantime
77            Err(e) if e.kind() == ErrorKind::AddrInUse => {
78                return Err(ListenerCreationError::AlreadyRunning)
79            }
80            Err(e) => return Err(ListenerCreationError::Bind(e)),
81        };
82
83        let adapter = Self {
84            listener,
85            #[cfg(unix)]
86            socket_path,
87            line_streams: SelectAll::new(),
88            tx_streams: vec![],
89        };
90
91        Ok(adapter)
92    }
93
94    pub async fn broadcast(&mut self, notify: FrontendEvent) {
95        // encode event
96        let mut json = serde_json::to_string(&notify).unwrap();
97        json.push('\n');
98
99        let mut keep = vec![];
100        // TODO do simultaneously
101        for tx in self.tx_streams.iter_mut() {
102            // write len + payload
103            if tx.write(json.as_bytes()).await.is_err() {
104                keep.push(false);
105                continue;
106            }
107            keep.push(true);
108        }
109
110        // could not find a better solution because async
111        let mut keep = keep.into_iter();
112        self.tx_streams.retain(|_| keep.next().unwrap());
113    }
114}
115
116#[cfg(unix)]
117impl Drop for AsyncFrontendListener {
118    fn drop(&mut self) {
119        log::debug!("remove socket: {:?}", self.socket_path);
120        let _ = std::fs::remove_file(&self.socket_path);
121    }
122}
123
124impl Stream for AsyncFrontendListener {
125    type Item = Result<FrontendRequest, IpcError>;
126    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127        if let Poll::Ready(Some(Ok(l))) = self.line_streams.poll_next_unpin(cx) {
128            let request = serde_json::from_str(l.as_str()).map_err(|e| e.into());
129            return Poll::Ready(Some(request));
130        }
131        let mut sync = false;
132        while let Poll::Ready(Ok((stream, _))) = self.listener.poll_accept(cx) {
133            let (rx, tx) = tokio::io::split(stream);
134            let buf_reader = BufReader::new(rx);
135            let lines = buf_reader.lines();
136            let lines = LinesStream::new(lines);
137            self.line_streams.push(lines);
138            self.tx_streams.push(tx);
139            sync = true;
140        }
141        if sync {
142            Poll::Ready(Some(Ok(FrontendRequest::Sync)))
143        } else {
144            Poll::Pending
145        }
146    }
147}