lan_mouse_ipc/
connect_async.rs1use crate::{ConnectionError, FrontendEvent, FrontendRequest, IpcError};
2use std::{
3 cmp::min,
4 io,
5 task::{ready, Poll},
6 time::Duration,
7};
8
9use futures::{Stream, StreamExt};
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
11use tokio_stream::wrappers::LinesStream;
12
13#[cfg(unix)]
14use tokio::net::UnixStream;
15
16#[cfg(windows)]
17use tokio::net::TcpStream;
18
19pub struct AsyncFrontendEventReader {
20 #[cfg(unix)]
21 lines_stream: LinesStream<BufReader<ReadHalf<UnixStream>>>,
22 #[cfg(windows)]
23 lines_stream: LinesStream<BufReader<ReadHalf<TcpStream>>>,
24}
25
26pub struct AsyncFrontendRequestWriter {
27 #[cfg(unix)]
28 tx: WriteHalf<UnixStream>,
29 #[cfg(windows)]
30 tx: WriteHalf<TcpStream>,
31}
32
33impl Stream for AsyncFrontendEventReader {
34 type Item = Result<FrontendEvent, IpcError>;
35
36 fn poll_next(
37 mut self: std::pin::Pin<&mut Self>,
38 cx: &mut std::task::Context<'_>,
39 ) -> std::task::Poll<Option<Self::Item>> {
40 let line = ready!(self.lines_stream.poll_next_unpin(cx));
41 let event = line.map(|l| {
42 l.map_err(Into::<IpcError>::into)
43 .and_then(|l| serde_json::from_str(l.as_str()).map_err(|e| e.into()))
44 });
45 Poll::Ready(event)
46 }
47}
48
49impl AsyncFrontendRequestWriter {
50 pub async fn request(&mut self, request: FrontendRequest) -> Result<(), io::Error> {
51 let mut json = serde_json::to_string(&request).unwrap();
52 log::debug!("requesting: {json}");
53 json.push('\n');
54 self.tx.write_all(json.as_bytes()).await?;
55 Ok(())
56 }
57}
58
59pub async fn connect_async(
60) -> Result<(AsyncFrontendEventReader, AsyncFrontendRequestWriter), ConnectionError> {
61 let stream = wait_for_service().await?;
62 #[cfg(unix)]
63 let (rx, tx): (ReadHalf<UnixStream>, WriteHalf<UnixStream>) = tokio::io::split(stream);
64 #[cfg(windows)]
65 let (rx, tx): (ReadHalf<TcpStream>, WriteHalf<TcpStream>) = tokio::io::split(stream);
66 let buf_reader = BufReader::new(rx);
67 let lines = buf_reader.lines();
68 let lines_stream = LinesStream::new(lines);
69 let reader = AsyncFrontendEventReader { lines_stream };
70 let writer = AsyncFrontendRequestWriter { tx };
71 Ok((reader, writer))
72}
73
74#[cfg(unix)]
76async fn wait_for_service() -> Result<UnixStream, ConnectionError> {
77 let socket_path = crate::default_socket_path()?;
78 let mut duration = Duration::from_millis(10);
79 loop {
80 if let Ok(stream) = UnixStream::connect(&socket_path).await {
81 break Ok(stream);
82 }
83 tokio::time::sleep(exponential_back_off(&mut duration)).await;
86 }
87}
88
/// Repeatedly tries to connect to the service over TCP at `127.0.0.1:5252`.
///
/// Connection failures are discarded and retried indefinitely, sleeping
/// between attempts for a delay produced by `exponential_back_off`
/// (seeded with 10 ms). As written, this function never returns `Err`.
#[cfg(windows)]
async fn wait_for_service() -> Result<TcpStream, ConnectionError> {
    let mut back_off = Duration::from_millis(10);
    loop {
        match TcpStream::connect("127.0.0.1:5252").await {
            Ok(stream) => return Ok(stream),
            // Service not reachable yet: wait, then try again.
            Err(_) => tokio::time::sleep(exponential_back_off(&mut back_off)).await,
        }
    }
}
99
/// Advances a back-off delay and returns the new value.
///
/// Doubles `*duration` (saturating on overflow), caps the result at one
/// second, stores it back into `*duration`, and returns it. The value
/// passed in is therefore never returned itself: a 10 ms seed yields 20 ms
/// on the first call.
fn exponential_back_off(duration: &mut Duration) -> Duration {
    /// Upper bound for the delay between attempts.
    const CEILING: Duration = Duration::from_secs(1);

    *duration = min(duration.saturating_mul(2), CEILING);
    *duration
}