use crate::error::SerDeError;
use crate::ipc::{
self, IpcMessage, IpcReceiver, IpcReceiverSet, IpcSelectionResult, IpcSender, OpaqueIpcReceiver,
};
use futures_channel::mpsc::UnboundedReceiver;
use futures_channel::mpsc::UnboundedSender;
use futures_core::stream::FusedStream;
use futures_core::task::Context;
use futures_core::task::Poll;
use futures_core::Stream;
use serde_core::{Deserialize, Serialize};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::{LazyLock, Mutex};
use std::thread;
pub struct IpcStream<T>(UnboundedReceiver<IpcMessage>, PhantomData<T>);
impl<T> Unpin for IpcStream<T> {}
struct Router {
add_route: UnboundedSender<(OpaqueIpcReceiver, UnboundedSender<IpcMessage>)>,
wakeup: Mutex<IpcSender<()>>,
}
static ROUTER: LazyLock<Router> = LazyLock::new(|| {
let (send, mut recv) = futures_channel::mpsc::unbounded();
let (waker, wakee) = ipc::channel().expect("Failed to create IPC channel");
thread::spawn(move || {
let mut receivers = IpcReceiverSet::new().expect("Failed to create receiver set");
let mut senders = HashMap::<u64, UnboundedSender<IpcMessage>>::new();
let _ = receivers.add(wakee);
while let Ok(mut selections) = receivers.select() {
for selection in selections.drain(..) {
match selection {
IpcSelectionResult::MessageReceived(id, msg) => {
if let Some(sender) = senders.get(&id) {
let _ = sender.unbounded_send(msg);
}
},
IpcSelectionResult::ChannelClosed(id) => {
senders.remove(&id);
},
}
}
if !recv.is_terminated() {
while let Ok(Some((receiver, sender))) = recv.try_next() {
if let Ok(id) = receivers.add_opaque(receiver) {
senders.insert(id, sender);
}
}
}
}
});
Router {
add_route: send,
wakeup: Mutex::new(waker),
}
});
impl<T> IpcReceiver<T>
where
T: for<'de> Deserialize<'de> + Serialize,
{
pub fn to_stream(self) -> IpcStream<T> {
let opaque = self.to_opaque();
let (send, recv) = futures_channel::mpsc::unbounded();
let _ = ROUTER.add_route.unbounded_send((opaque, send));
if let Ok(waker) = ROUTER.wakeup.lock() {
let _ = waker.send(());
}
IpcStream(recv, PhantomData)
}
}
impl<T> Stream for IpcStream<T>
where
T: for<'de> Deserialize<'de> + Serialize,
{
type Item = Result<T, SerDeError>;
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let recv = Pin::new(&mut self.0);
match recv.poll_next(ctx) {
Poll::Ready(Some(msg)) => Poll::Ready(Some(msg.to())),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
impl<T> FusedStream for IpcStream<T>
where
T: for<'de> Deserialize<'de> + Serialize,
{
fn is_terminated(&self) -> bool {
self.0.is_terminated()
}
}