1use std::convert::Infallible;
2use std::future::Future;
3use std::pin::Pin;
4
5use std::task::{Context, Poll};
6use tokio::sync::mpsc::error::SendError;
7use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender};
8use tokio::sync::{mpsc, oneshot};
9
10pub fn actor_channel<Msg, Command>() -> (
11 ActorRef<Msg, Command>,
12 UnboundedReceiver<ActorMsg<Msg, Command>>,
13) {
14 let (sender, receiver) = mpsc::unbounded_channel();
15 (ActorRef::new(sender), receiver)
16}
17
/// Sending handle for an actor: a thin wrapper around an `UnboundedSender`
/// that carries [`ActorMsg<Msg, Command>`] envelopes.
///
/// `Command` defaults to [`Infallible`], an uninhabited type, so a handle
/// declared without an explicit command type can never carry an
/// `ActorMsg::Cmd` value.
pub struct ActorRef<Msg, Command = Infallible> {
    /// Sending half of the unbounded channel this handle wraps.
    inner: UnboundedSender<ActorMsg<Msg, Command>>,
}
21
22impl<Msg, Command> Clone for ActorRef<Msg, Command> {
23 fn clone(&self) -> Self {
24 Self::new(self.inner.clone())
25 }
26}
27
/// Envelope travelling over an actor's channel.
///
/// `Command` defaults to [`Infallible`]; with that default the `Cmd`
/// variant cannot be constructed.
pub enum ActorMsg<Msg, Command = Infallible> {
    /// A regular message.
    Msg(Msg),
    /// A command.
    Cmd(Command),
}

/// Any `Msg` value converts into an envelope by wrapping it in
/// [`ActorMsg::Msg`].
impl<Msg, Command> From<Msg> for ActorMsg<Msg, Command> {
    #[inline(always)]
    fn from(value: Msg) -> Self {
        Self::Msg(value)
    }
}
39
40impl<Msg, Command> ActorRef<Msg, Command> {
41 #[inline]
42 pub fn new(inner: UnboundedSender<ActorMsg<Msg, Command>>) -> Self {
43 Self { inner }
44 }
45
46 #[inline]
47 pub fn try_send<M>(&self, msg: M) -> Result<(), SendError<ActorMsg<Msg, Command>>>
48 where
49 Msg: From<M>,
50 {
51 self.inner.send(ActorMsg::Msg(msg.into()))
52 }
53
54 #[inline]
55 pub fn send<M>(&self, msg: M)
56 where
57 Msg: From<M>,
58 {
59 let _ = self.inner.send(ActorMsg::Msg(msg.into()));
60 }
61
62 pub fn ask<Resp>(&self) -> oneshot::Receiver<Resp>
63 where
64 Msg: From<oneshot::Sender<Resp>>,
65 {
66 let (tx, rx) = oneshot::channel::<Resp>();
67 let _ = self.inner.send(ActorMsg::Msg(tx.into()));
68 rx
69 }
70
71 pub fn ask_or_default<Resp>(&self) -> PendingRespOrDefault<Resp>
72 where
73 Msg: From<oneshot::Sender<Resp>>,
74 Resp: Default,
75 {
76 PendingRespOrDefault(self.ask())
77 }
78
79 pub fn try_command<Resp>(
80 &self,
81 msg: Command,
82 ) -> Result<(), SendError<ActorMsg<Msg, Command>>>
83 where
84 Msg: From<oneshot::Sender<Resp>>,
85 Resp: Default,
86 {
87 self.inner.send(ActorMsg::Cmd(msg))
88 }
89
90 pub fn command<IntoCommand>(&self, command: IntoCommand)
91 where
92 Command: From<IntoCommand>,
93 {
94 let _ = self.inner.send(ActorMsg::Cmd(command.into()));
95 }
96
97 #[inline]
98 pub fn downgrade(&self) -> WeakActorRef<Msg, Command> {
99 WeakActorRef {
100 inner: self.inner.downgrade(),
101 }
102 }
103
104 #[inline]
105 pub fn strong_count(&self) -> usize {
106 self.inner.strong_count()
107 }
108
109 #[inline]
110 pub fn weak_count(&self) -> usize {
111 self.inner.weak_count()
112 }
113
114 #[inline]
115 pub fn is_closed(&self) -> bool {
116 self.inner.is_closed()
117 }
118}
119
/// Wrapper around a `oneshot::Receiver<T>`, as returned by
/// `ActorRef::ask_or_default`.
pub struct PendingRespOrDefault<T>(oneshot::Receiver<T>);
121
122impl<T> Future for PendingRespOrDefault<T>
123where
124 T: Default,
125{
126 type Output = T;
127
128 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
129 let pin = Pin::new(&mut Pin::get_mut(self).0);
130 match Future::poll(pin, cx) {
131 Poll::Ready(res) => match res {
132 Ok(res) => Poll::Ready(res),
133 Err(_) => Poll::Ready(T::default()),
134 },
135 Poll::Pending => Poll::Pending,
136 }
137 }
138}
139
140impl<T> PendingRespOrDefault<T>
141where
142 T: Default,
143{
144 pub fn blocking_recv(self) -> T {
145 self.0.blocking_recv().unwrap_or_default()
146 }
147}
148
/// Weak counterpart of [`ActorRef`]: wraps a `WeakUnboundedSender` for the
/// same [`ActorMsg<Msg, Command>`] envelope type.
///
/// `Command` defaults to [`Infallible`], mirroring `ActorRef`.
pub struct WeakActorRef<Msg, Command = Infallible> {
    /// Weak sending half of the unbounded channel.
    inner: WeakUnboundedSender<ActorMsg<Msg, Command>>,
}
152
153impl<Msg, Command> WeakActorRef<Msg, Command> {
154 #[inline]
155 pub fn upgrade(&self) -> Option<ActorRef<Msg, Command>> {
156 self.inner.upgrade().map(ActorRef::new)
157 }
158
159 #[inline]
160 pub fn strong_count(&self) -> usize {
161 self.inner.strong_count()
162 }
163
164 #[inline]
165 pub fn weak_count(&self) -> usize {
166 self.inner.weak_count()
167 }
168}
169
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that `Result<(), SendError<()>>` occupies exactly one byte.
    #[test]
    fn test() {
        let result_size = std::mem::size_of::<Result<(), SendError<()>>>();
        assert_eq!(1, result_size);
    }
}