net_stream/actors/
write_actor.rs

1use super::ActorShutdown;
2use futures::channel::mpsc;
3use futures::FutureExt;
4use futures::Sink;
5use futures::SinkExt;
6use futures::StreamExt;
7use std::fmt::Debug;
8use std::future::Future;
9use std::pin::Pin;
10
11/// Spawn new actor
12pub(crate) fn spawn_actor<S, T>(sink: S) -> WriteActorHandle<T>
13where
14    T: Send + 'static,
15    S: Sink<T> + Send + Unpin + 'static,
16    S::Error: std::fmt::Debug,
17{
18    let (actor_msg_tx, actor_msg_rx) = mpsc::channel(32);
19    let join_handle = tokio::spawn(actor(actor_msg_rx, sink));
20
21    let shutdown_reason = join_handle.map_into::<ActorShutdown<WriteActorShutdownReason>>().boxed().shared();
22
23    WriteActorHandle::new(actor_msg_tx, shutdown_reason)
24}
25
/// Errors returned when a message could not be handed over to the actor.
#[derive(thiserror::Error, Debug)]
pub(crate) enum WriteActorError {
    /// The actor's message channel had no free capacity for the message.
    #[error("Actor could not keep up, channel is full.")]
    ChannelFull,

    /// The actor's message channel is disconnected, so the message was not delivered.
    #[error("Actor no longer accepting messages.")]
    ChannelClosed,
}
34
/// Reason for actor shutdown.
///
/// This is the value the actor task returns when its message loop ends.
#[derive(Debug, Clone, Copy)]
pub(crate) enum WriteActorShutdownReason {
    /// Actor was signalled to shutdown, i.e. it received a shutdown message.
    Signalled,

    /// Error operating on sink: a send, feed or flush on the sink returned an error.
    SinkError,

    /// All actor handles were dropped, which ended the actor's message stream.
    ActorHandlesDropped,
}
47
// Cloneable future resolving to the actor's shutdown outcome.
//
// TODO: When type_alias_impl_trait is stabilized, this becomes:
// type ShutdownReasonFuture = futures::future::Shared<impl Future<Output = ActorShutdown<WriteActorShutdownReason>>>;
// Until then, stable Rust needs a boxed trait object as a workaround. The `Send` bound is there
// only to help the alias match the return type of the `.boxed()` method.
type ShutdownReasonFuture = futures::future::Shared<Pin<Box<dyn Future<Output = ActorShutdown<WriteActorShutdownReason>> + Send>>>;
52
53/// Actor handle.
54#[derive(Debug, Clone)]
55pub(crate) struct WriteActorHandle<T> {
56    actor_msg_tx: mpsc::Sender<ActorMessage<T>>,
57    shutdown_reason: ShutdownReasonFuture,
58}
59
60impl<T> WriteActorHandle<T>
61where
62    T: Send + 'static,
63{
64    fn new(actor_msg_tx: mpsc::Sender<ActorMessage<T>>, shutdown_reason: ShutdownReasonFuture) -> Self {
65        Self {
66            actor_msg_tx,
67            shutdown_reason,
68        }
69    }
70
71    /// Signal actor to call [futures::SinkExt::send] on sink.
72    pub fn send(&mut self, item: T) -> Result<(), WriteActorError> {
73        self.send_actor_message(ActorMessage::Send(item))
74    }
75
76    /// Signal actor to call [futures::SinkExt::feed] on sink.
77    pub fn feed(&mut self, item: T) -> Result<(), WriteActorError> {
78        self.send_actor_message(ActorMessage::Feed(item))
79    }
80
81    /// Signal actor to call [futures::SinkExt::flush] on sink.
82    pub fn flush(&mut self) -> Result<(), WriteActorError> {
83        self.send_actor_message(ActorMessage::Flush)
84    }
85
86    /// Signal actor to shutdown.
87    pub fn signal_shutdown(&mut self) -> Result<(), WriteActorError> {
88        self.send_actor_message(ActorMessage::Shutdown)
89    }
90
91    /// Wait for actor to finish and return reason for shutdown.
92    pub fn wait(&self) -> impl Future<Output = ActorShutdown<WriteActorShutdownReason>> {
93        self.shutdown_reason.clone()
94    }
95
96    /// Checks if the actor task has shut down.
97    ///
98    /// This returns None if the actor is still running, and a Some variant if
99    /// the actor has shut down.
100    #[allow(dead_code)]
101    pub fn is_finished(&self) -> Option<ActorShutdown<WriteActorShutdownReason>> {
102        self.shutdown_reason.peek().cloned()
103    }
104
105    fn send_actor_message(&mut self, msg: ActorMessage<T>) -> Result<(), WriteActorError> {
106        self.actor_msg_tx.try_send(msg).map_err(|err| {
107            if err.is_full() {
108                WriteActorError::ChannelFull
109            } else if err.is_disconnected() {
110                WriteActorError::ChannelClosed
111            } else {
112                unreachable!()
113            }
114        })
115    }
116}
117
/// Message sent from an actor handle to the actor task.
#[derive(Debug, Clone)]
enum ActorMessage<T> {
    /// Pass the item to the sink via [futures::SinkExt::send].
    Send(T),
    /// Pass the item to the sink via [futures::SinkExt::feed].
    Feed(T),
    /// Flush the sink via [futures::SinkExt::flush].
    Flush,
    /// Stop processing messages; the actor then closes the sink and exits.
    Shutdown,
}
125
126async fn actor<S, T>(mut actor_msg_rx: mpsc::Receiver<ActorMessage<T>>, mut sink: S) -> WriteActorShutdownReason
127where
128    S: Sink<T> + Unpin,
129    S::Error: Debug,
130{
131    let shutdown_reason = loop {
132        match actor_msg_rx.next().await {
133            Some(msg) => match msg {
134                ActorMessage::Send(item) => {
135                    if let Err(err) = sink.send(item).await {
136                        log::warn!("Sink error {err:?}");
137                        break WriteActorShutdownReason::SinkError;
138                    }
139                }
140                ActorMessage::Feed(item) => {
141                    if let Err(err) = sink.feed(item).await {
142                        log::warn!("Sink error {err:?}");
143                        break WriteActorShutdownReason::SinkError;
144                    }
145                }
146                ActorMessage::Flush => {
147                    if let Err(err) = sink.flush().await {
148                        log::warn!("Sink error {err:?}");
149                        break WriteActorShutdownReason::SinkError;
150                    }
151                }
152                ActorMessage::Shutdown => {
153                    break WriteActorShutdownReason::Signalled;
154                }
155            },
156            None => {
157                // All actor handles have been dropped.
158                break WriteActorShutdownReason::ActorHandlesDropped;
159            }
160        }
161    };
162
163    // close sink, flushing all remaining output.
164    if let Err(err) = sink.close().await {
165        log::warn!("Sink error {err:?}");
166    }
167
168    log::debug!("Shutting down: {shutdown_reason:?}");
169    shutdown_reason
170}