ftth_common/
channel.rs

//! AsyncWorld channels.
//!
//! This enables calls from sync code into async code, with responses.
4
5use std::{fmt::Debug, hash::Hash, io::ErrorKind};
6
7pub use std::io::Error;
8
9pub fn create_pair<Req, Res>() -> (AsyncWorldClient<Req, Res>, AsyncWorldServer<Req, Res>) {
10    let id: u128 = rand::random();
11    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
12    (
13        AsyncWorldClient {
14            id,
15            sender: tx,
16        },
17        AsyncWorldServer {
18            id,
19            receiver: rx,
20        },
21    )
22}
23
24struct AsyncWorldRequest<Req, Res> {
25    request: Req,
26    channel: tokio::sync::oneshot::Sender<Res>,
27}
28
29impl<Req, Res> AsyncWorldRequest<Req, Res> {
30    fn extract(self) -> (Req, impl FnOnce(Res)) {
31        let req = self.request;
32        let ch = self.channel;
33        (
34            req,
35            move |data: Res| {
36                let _ = ch.send(data);
37            },
38        )
39    }
40}
41
42/// This is a client of an async world for a sync code.
43#[derive(Clone)]
44pub struct AsyncWorldClient<Req, Res> {
45    id: u128,
46    sender: tokio::sync::mpsc::UnboundedSender<AsyncWorldRequest<Req, Res>>,
47}
48
49impl<Req, Res> Debug for AsyncWorldClient<Req, Res> {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        f.write_str(&format!("AsyncWorldClient({})", self.id))
52    }
53}
54
55impl<Req1, Res1, Req2, Res2> PartialEq<AsyncWorldClient<Req2, Res2>> for AsyncWorldClient<Req1, Res1> {
56    fn eq(&self, other: &AsyncWorldClient<Req2, Res2>) -> bool {
57        self.id == other.id
58    }
59}
60
61impl<Req, Res> Eq for AsyncWorldClient<Req, Res> {}
62
63impl<Req, Res> Hash for AsyncWorldClient<Req, Res> {
64    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
65        self.id.hash(state)
66    }
67}
68
69impl<Req, Res> AsyncWorldClient<Req, Res>
70where
71    Res: Send + 'static
72{
73    /// This method sends a request, expecting a response, from a sync world into an async world.
74    /// 
75    /// This is a blocking call, but it should be safe to be called from a tokio context.
76    pub fn send_request(&self, data: Req) -> Result<Res, Error> {
77        let (send, recv) = tokio::sync::oneshot::channel();
78        let req = AsyncWorldRequest {
79            request: data,
80            channel: send,
81        };
82        self.sender.send(req).map_err(|_| Error::new(ErrorKind::BrokenPipe, "Channel broken"))?;
83
84        let handle = std::thread::spawn(move || {
85            recv.blocking_recv()
86        });
87
88        let res = handle.join().map_err(|_| Error::new(ErrorKind::Other, "Failed to spwan a thread"))?
89        .map_err(|_| Error::new(ErrorKind::BrokenPipe, "Channel broken"))?;
90        Ok(res)
91    }
92}
93
94pub struct AsyncWorldServer<Req, Res> {
95    id: u128,
96    receiver: tokio::sync::mpsc::UnboundedReceiver<AsyncWorldRequest<Req, Res>>,
97}
98
99impl<Req, Res> Debug for AsyncWorldServer<Req, Res> {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        f.write_str(&format!("AsyncWorldServer({})", self.id))
102    }
103}
104
105impl<Req1, Res1, Req2, Res2> PartialEq<AsyncWorldServer<Req2, Res2>> for AsyncWorldServer<Req1, Res1> {
106    fn eq(&self, other: &AsyncWorldServer<Req2, Res2>) -> bool {
107        self.id == other.id
108    }
109}
110
111impl<Req, Res> Eq for AsyncWorldServer<Req, Res> {}
112
113impl<Req, Res> Hash for AsyncWorldServer<Req, Res> {
114    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
115        self.id.hash(state)
116    }
117}
118
119impl<Req, Res> AsyncWorldServer<Req, Res> {
120    /// This method must be called from a tokio context.
121    pub async fn accept(&mut self) -> Option<(Req, impl FnOnce(Res))> {
122        self.receiver.recv().await.map(|req| req.extract())
123    }
124}