lan_mouse_ipc/
connect_async.rs

1use crate::{ConnectionError, FrontendEvent, FrontendRequest, IpcError};
2use std::{
3    cmp::min,
4    io,
5    task::{ready, Poll},
6    time::Duration,
7};
8
9use futures::{Stream, StreamExt};
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
11use tokio_stream::wrappers::LinesStream;
12
13#[cfg(unix)]
14use tokio::net::UnixStream;
15
16#[cfg(windows)]
17use tokio::net::TcpStream;
18
19pub struct AsyncFrontendEventReader {
20    #[cfg(unix)]
21    lines_stream: LinesStream<BufReader<ReadHalf<UnixStream>>>,
22    #[cfg(windows)]
23    lines_stream: LinesStream<BufReader<ReadHalf<TcpStream>>>,
24}
25
26pub struct AsyncFrontendRequestWriter {
27    #[cfg(unix)]
28    tx: WriteHalf<UnixStream>,
29    #[cfg(windows)]
30    tx: WriteHalf<TcpStream>,
31}
32
33impl Stream for AsyncFrontendEventReader {
34    type Item = Result<FrontendEvent, IpcError>;
35
36    fn poll_next(
37        mut self: std::pin::Pin<&mut Self>,
38        cx: &mut std::task::Context<'_>,
39    ) -> std::task::Poll<Option<Self::Item>> {
40        let line = ready!(self.lines_stream.poll_next_unpin(cx));
41        let event = line.map(|l| {
42            l.map_err(Into::<IpcError>::into)
43                .and_then(|l| serde_json::from_str(l.as_str()).map_err(|e| e.into()))
44        });
45        Poll::Ready(event)
46    }
47}
48
49impl AsyncFrontendRequestWriter {
50    pub async fn request(&mut self, request: FrontendRequest) -> Result<(), io::Error> {
51        let mut json = serde_json::to_string(&request).unwrap();
52        log::debug!("requesting: {json}");
53        json.push('\n');
54        self.tx.write_all(json.as_bytes()).await?;
55        Ok(())
56    }
57}
58
59pub async fn connect_async(
60) -> Result<(AsyncFrontendEventReader, AsyncFrontendRequestWriter), ConnectionError> {
61    let stream = wait_for_service().await?;
62    #[cfg(unix)]
63    let (rx, tx): (ReadHalf<UnixStream>, WriteHalf<UnixStream>) = tokio::io::split(stream);
64    #[cfg(windows)]
65    let (rx, tx): (ReadHalf<TcpStream>, WriteHalf<TcpStream>) = tokio::io::split(stream);
66    let buf_reader = BufReader::new(rx);
67    let lines = buf_reader.lines();
68    let lines_stream = LinesStream::new(lines);
69    let reader = AsyncFrontendEventReader { lines_stream };
70    let writer = AsyncFrontendRequestWriter { tx };
71    Ok((reader, writer))
72}
73
74/// wait for the lan-mouse socket to come online
75#[cfg(unix)]
76async fn wait_for_service() -> Result<UnixStream, ConnectionError> {
77    let socket_path = crate::default_socket_path()?;
78    let mut duration = Duration::from_millis(10);
79    loop {
80        if let Ok(stream) = UnixStream::connect(&socket_path).await {
81            break Ok(stream);
82        }
83        // a signaling mechanism or inotify could be used to
84        // improve this
85        tokio::time::sleep(exponential_back_off(&mut duration)).await;
86    }
87}
88
89#[cfg(windows)]
90async fn wait_for_service() -> Result<TcpStream, ConnectionError> {
91    let mut duration = Duration::from_millis(10);
92    loop {
93        if let Ok(stream) = TcpStream::connect("127.0.0.1:5252").await {
94            break Ok(stream);
95        }
96        tokio::time::sleep(exponential_back_off(&mut duration)).await;
97    }
98}
99
100fn exponential_back_off(duration: &mut Duration) -> Duration {
101    let new = duration.saturating_mul(2);
102    *duration = min(new, Duration::from_secs(1));
103    *duration
104}