pupactor/
actor_ref.rs

1use std::convert::Infallible;
2use std::future::Future;
3use std::pin::Pin;
4
5use std::task::{Context, Poll};
6use tokio::sync::mpsc::error::SendError;
7use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender};
8use tokio::sync::{mpsc, oneshot};
9
10pub fn actor_channel<Msg, Command>() -> (
11    ActorRef<Msg, Command>,
12    UnboundedReceiver<ActorMsg<Msg, Command>>,
13) {
14    let (sender, receiver) = mpsc::unbounded_channel();
15    (ActorRef::new(sender), receiver)
16}
17
18pub struct ActorRef<Msg, Command = Infallible> {
19    inner: UnboundedSender<ActorMsg<Msg, Command>>,
20}
21
22impl<Msg, Command> Clone for ActorRef<Msg, Command> {
23    fn clone(&self) -> Self {
24        Self::new(self.inner.clone())
25    }
26}
27
28pub enum ActorMsg<Msg, Command = Infallible> {
29    Msg(Msg),
30    Cmd(Command),
31}
32
33impl<Msg, Command> From<Msg> for ActorMsg<Msg, Command> {
34    #[inline(always)]
35    fn from(msg: Msg) -> Self {
36        ActorMsg::Msg(msg)
37    }
38}
39
40impl<Msg, Command> ActorRef<Msg, Command> {
41    #[inline]
42    pub fn new(inner: UnboundedSender<ActorMsg<Msg, Command>>) -> Self {
43        Self { inner }
44    }
45
46    #[inline]
47    pub fn try_send<M>(&self, msg: M) -> Result<(), SendError<ActorMsg<Msg, Command>>>
48    where
49        Msg: From<M>,
50    {
51        self.inner.send(ActorMsg::Msg(msg.into()))
52    }
53
54    #[inline]
55    pub fn send<M>(&self, msg: M)
56    where
57        Msg: From<M>,
58    {
59        let _ = self.inner.send(ActorMsg::Msg(msg.into()));
60    }
61
62    pub fn ask<Resp>(&self) -> oneshot::Receiver<Resp>
63    where
64        Msg: From<oneshot::Sender<Resp>>,
65    {
66        let (tx, rx) = oneshot::channel::<Resp>();
67        let _ = self.inner.send(ActorMsg::Msg(tx.into()));
68        rx
69    }
70
71    pub fn ask_or_default<Resp>(&self) -> PendingRespOrDefault<Resp>
72    where
73        Msg: From<oneshot::Sender<Resp>>,
74        Resp: Default,
75    {
76        PendingRespOrDefault(self.ask())
77    }
78
79    pub fn try_command<Resp>(
80        &self,
81        msg: Command,
82    ) -> Result<(), SendError<ActorMsg<Msg, Command>>>
83    where
84        Msg: From<oneshot::Sender<Resp>>,
85        Resp: Default,
86    {
87        self.inner.send(ActorMsg::Cmd(msg))
88    }
89
90    pub fn command<IntoCommand>(&self, command: IntoCommand)
91    where
92        Command: From<IntoCommand>,
93    {
94        let _ = self.inner.send(ActorMsg::Cmd(command.into()));
95    }
96
97    #[inline]
98    pub fn downgrade(&self) -> WeakActorRef<Msg, Command> {
99        WeakActorRef {
100            inner: self.inner.downgrade(),
101        }
102    }
103
104    #[inline]
105    pub fn strong_count(&self) -> usize {
106        self.inner.strong_count()
107    }
108
109    #[inline]
110    pub fn weak_count(&self) -> usize {
111        self.inner.weak_count()
112    }
113
114    #[inline]
115    pub fn is_closed(&self) -> bool {
116        self.inner.is_closed()
117    }
118}
119
120pub struct PendingRespOrDefault<T>(oneshot::Receiver<T>);
121
122impl<T> Future for PendingRespOrDefault<T>
123where
124    T: Default,
125{
126    type Output = T;
127
128    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
129        let pin = Pin::new(&mut Pin::get_mut(self).0);
130        match Future::poll(pin, cx) {
131            Poll::Ready(res) => match res {
132                Ok(res) => Poll::Ready(res),
133                Err(_) => Poll::Ready(T::default()),
134            },
135            Poll::Pending => Poll::Pending,
136        }
137    }
138}
139
140impl<T> PendingRespOrDefault<T>
141where
142    T: Default,
143{
144    pub fn blocking_recv(self) -> T {
145        self.0.blocking_recv().unwrap_or_default()
146    }
147}
148
149pub struct WeakActorRef<Msg, Command = Infallible> {
150    inner: WeakUnboundedSender<ActorMsg<Msg, Command>>,
151}
152
153impl<Msg, Command> WeakActorRef<Msg, Command> {
154    #[inline]
155    pub fn upgrade(&self) -> Option<ActorRef<Msg, Command>> {
156        self.inner.upgrade().map(ActorRef::new)
157    }
158
159    #[inline]
160    pub fn strong_count(&self) -> usize {
161        self.inner.strong_count()
162    }
163
164    #[inline]
165    pub fn weak_count(&self) -> usize {
166        self.inner.weak_count()
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    use super::*;
173
174    #[test]
175    fn test() {
176        assert_eq!(1, size_of::<Result<(), SendError<()>>>());
177    }
178}