1use crate::{
2 addr::Addr,
3 envelope::Envelope,
4 errors::{RequestError, SendError, TrySendError},
5 request_table::ResponseToken,
6};
7
8#[instability::unstable]
9pub trait RemoteHandle: Send + Sync + 'static {
10 fn send(&self, recipient: Addr, envelope: Envelope) -> SendResult;
11 fn try_send(&self, recipient: Addr, envelope: Envelope) -> Result<(), TrySendError<Envelope>>;
12 fn unbounded_send(
13 &self,
14 recipient: Addr,
15 envelope: Envelope,
16 ) -> Result<(), SendError<Envelope>>;
17 fn respond(&self, token: ResponseToken, response: Result<Envelope, RequestError>);
18}
19
20#[instability::unstable]
21pub enum SendResult {
22 Ok,
23 Err(SendError<Envelope>),
24 Wait(SendNotified, Envelope),
25}
26
27#[instability::unstable]
28pub use self::notifier::*;
29mod notifier {
30 use std::{
31 future::Future,
32 pin::Pin,
33 sync::atomic::{AtomicUsize, Ordering},
34 task,
35 };
36
37 use futures_intrusive::sync::{SharedSemaphore, SharedSemaphoreAcquireFuture};
38 use pin_project::pin_project;
39
40 #[instability::unstable]
41 pub struct SendNotify {
42 semaphore: SharedSemaphore,
43 waiters: AtomicUsize,
44 }
45
46 impl Default for SendNotify {
47 fn default() -> Self {
48 Self {
49 semaphore: SharedSemaphore::new(true, 0),
50 waiters: AtomicUsize::new(0),
51 }
52 }
53 }
54
55 impl SendNotify {
56 #[instability::unstable]
57 pub fn notified(&self) -> SendNotified {
58 self.waiters.fetch_add(1, Ordering::SeqCst);
59 SendNotified(self.semaphore.acquire(1))
60 }
61
62 #[instability::unstable]
63 pub fn notify(&self) {
64 let waiters = self.waiters.swap(0, Ordering::SeqCst);
65 self.semaphore.release(waiters);
66 }
67 }
68
69 #[instability::unstable]
70 #[pin_project]
71 pub struct SendNotified(#[pin] SharedSemaphoreAcquireFuture);
72
73 impl Future for SendNotified {
74 type Output = ();
75
76 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
77 let this = self.project();
78
79 this.0.poll(cx).map(|mut r| {
80 r.disarm();
81 })
82 }
83 }
84}