1use futures::{stream::SelectAll, Stream, StreamExt};
2#[cfg(unix)]
3use std::path::PathBuf;
4use std::{
5 io::ErrorKind,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
11use tokio_stream::wrappers::LinesStream;
12
13#[cfg(unix)]
14use tokio::net::UnixListener;
15#[cfg(unix)]
16use tokio::net::UnixStream;
17
18#[cfg(windows)]
19use tokio::net::TcpListener;
20#[cfg(windows)]
21use tokio::net::TcpStream;
22
23use crate::{FrontendEvent, FrontendRequest, IpcError, ListenerCreationError};
24
25pub struct AsyncFrontendListener {
26 #[cfg(windows)]
27 listener: TcpListener,
28 #[cfg(unix)]
29 listener: UnixListener,
30 #[cfg(unix)]
31 socket_path: PathBuf,
32 #[cfg(unix)]
33 line_streams: SelectAll<LinesStream<BufReader<ReadHalf<UnixStream>>>>,
34 #[cfg(windows)]
35 line_streams: SelectAll<LinesStream<BufReader<ReadHalf<TcpStream>>>>,
36 #[cfg(unix)]
37 tx_streams: Vec<WriteHalf<UnixStream>>,
38 #[cfg(windows)]
39 tx_streams: Vec<WriteHalf<TcpStream>>,
40}
41
42impl AsyncFrontendListener {
43 pub async fn new() -> Result<Self, ListenerCreationError> {
44 #[cfg(unix)]
45 let (socket_path, listener) = {
46 let socket_path = crate::default_socket_path()?;
47
48 log::debug!("remove socket: {:?}", socket_path);
49 if socket_path.exists() {
50 match UnixStream::connect(&socket_path).await {
53 Ok(_) => return Err(ListenerCreationError::AlreadyRunning),
55 Err(e) => {
57 log::debug!("{socket_path:?}: {e} - removing left behind socket");
58 let _ = std::fs::remove_file(&socket_path);
59 }
60 }
61 }
62 let listener = match UnixListener::bind(&socket_path) {
63 Ok(ls) => ls,
64 Err(e) if e.kind() == ErrorKind::AddrInUse => {
66 return Err(ListenerCreationError::AlreadyRunning)
67 }
68 Err(e) => return Err(ListenerCreationError::Bind(e)),
69 };
70 (socket_path, listener)
71 };
72
73 #[cfg(windows)]
74 let listener = match TcpListener::bind("127.0.0.1:5252").await {
75 Ok(ls) => ls,
76 Err(e) if e.kind() == ErrorKind::AddrInUse => {
78 return Err(ListenerCreationError::AlreadyRunning)
79 }
80 Err(e) => return Err(ListenerCreationError::Bind(e)),
81 };
82
83 let adapter = Self {
84 listener,
85 #[cfg(unix)]
86 socket_path,
87 line_streams: SelectAll::new(),
88 tx_streams: vec![],
89 };
90
91 Ok(adapter)
92 }
93
94 pub async fn broadcast(&mut self, notify: FrontendEvent) {
95 let mut json = serde_json::to_string(¬ify).unwrap();
97 json.push('\n');
98
99 let mut keep = vec![];
100 for tx in self.tx_streams.iter_mut() {
102 if tx.write(json.as_bytes()).await.is_err() {
104 keep.push(false);
105 continue;
106 }
107 keep.push(true);
108 }
109
110 let mut keep = keep.into_iter();
112 self.tx_streams.retain(|_| keep.next().unwrap());
113 }
114}
115
116#[cfg(unix)]
117impl Drop for AsyncFrontendListener {
118 fn drop(&mut self) {
119 log::debug!("remove socket: {:?}", self.socket_path);
120 let _ = std::fs::remove_file(&self.socket_path);
121 }
122}
123
124impl Stream for AsyncFrontendListener {
125 type Item = Result<FrontendRequest, IpcError>;
126 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127 if let Poll::Ready(Some(Ok(l))) = self.line_streams.poll_next_unpin(cx) {
128 let request = serde_json::from_str(l.as_str()).map_err(|e| e.into());
129 return Poll::Ready(Some(request));
130 }
131 let mut sync = false;
132 while let Poll::Ready(Ok((stream, _))) = self.listener.poll_accept(cx) {
133 let (rx, tx) = tokio::io::split(stream);
134 let buf_reader = BufReader::new(rx);
135 let lines = buf_reader.lines();
136 let lines = LinesStream::new(lines);
137 self.line_streams.push(lines);
138 self.tx_streams.push(tx);
139 sync = true;
140 }
141 if sync {
142 Poll::Ready(Some(Ok(FrontendRequest::Sync)))
143 } else {
144 Poll::Pending
145 }
146 }
147}