Skip to main content

elfo_core/
remote.rs

1use crate::{
2    addr::Addr,
3    envelope::Envelope,
4    errors::{RequestError, SendError, TrySendError},
5    request_table::ResponseToken,
6};
7
8#[instability::unstable]
9pub trait RemoteHandle: Send + Sync + 'static {
10    fn send(&self, recipient: Addr, envelope: Envelope) -> SendResult;
11    fn try_send(&self, recipient: Addr, envelope: Envelope) -> Result<(), TrySendError<Envelope>>;
12    fn unbounded_send(
13        &self,
14        recipient: Addr,
15        envelope: Envelope,
16    ) -> Result<(), SendError<Envelope>>;
17    fn respond(&self, token: ResponseToken, response: Result<Envelope, RequestError>);
18}
19
20#[instability::unstable]
21pub enum SendResult {
22    Ok,
23    Err(SendError<Envelope>),
24    Wait(SendNotified, Envelope),
25}
26
27#[instability::unstable]
28pub use self::notifier::*;
29mod notifier {
30    use std::{
31        future::Future,
32        pin::Pin,
33        sync::atomic::{AtomicUsize, Ordering},
34        task,
35    };
36
37    use futures_intrusive::sync::{SharedSemaphore, SharedSemaphoreAcquireFuture};
38    use pin_project::pin_project;
39
40    #[instability::unstable]
41    pub struct SendNotify {
42        semaphore: SharedSemaphore,
43        waiters: AtomicUsize,
44    }
45
46    impl Default for SendNotify {
47        fn default() -> Self {
48            Self {
49                semaphore: SharedSemaphore::new(true, 0),
50                waiters: AtomicUsize::new(0),
51            }
52        }
53    }
54
55    impl SendNotify {
56        #[instability::unstable]
57        pub fn notified(&self) -> SendNotified {
58            self.waiters.fetch_add(1, Ordering::SeqCst);
59            SendNotified(self.semaphore.acquire(1))
60        }
61
62        #[instability::unstable]
63        pub fn notify(&self) {
64            let waiters = self.waiters.swap(0, Ordering::SeqCst);
65            self.semaphore.release(waiters);
66        }
67    }
68
69    #[instability::unstable]
70    #[pin_project]
71    pub struct SendNotified(#[pin] SharedSemaphoreAcquireFuture);
72
73    impl Future for SendNotified {
74        type Output = ();
75
76        fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
77            let this = self.project();
78
79            this.0.poll(cx).map(|mut r| {
80                r.disarm();
81            })
82        }
83    }
84}