net_stream/actors/
write_actor.rs1use super::ActorShutdown;
2use futures::channel::mpsc;
3use futures::FutureExt;
4use futures::Sink;
5use futures::SinkExt;
6use futures::StreamExt;
7use std::fmt::Debug;
8use std::future::Future;
9use std::pin::Pin;
10
11pub(crate) fn spawn_actor<S, T>(sink: S) -> WriteActorHandle<T>
13where
14 T: Send + 'static,
15 S: Sink<T> + Send + Unpin + 'static,
16 S::Error: std::fmt::Debug,
17{
18 let (actor_msg_tx, actor_msg_rx) = mpsc::channel(32);
19 let join_handle = tokio::spawn(actor(actor_msg_rx, sink));
20
21 let shutdown_reason = join_handle.map_into::<ActorShutdown<WriteActorShutdownReason>>().boxed().shared();
22
23 WriteActorHandle::new(actor_msg_tx, shutdown_reason)
24}
25
26#[derive(thiserror::Error, Debug)]
27pub(crate) enum WriteActorError {
28 #[error("Actor could not keep up, channel is full.")]
29 ChannelFull,
30
31 #[error("Actor no longer accepting messages.")]
32 ChannelClosed,
33}
34
35#[derive(Debug, Clone, Copy)]
37pub(crate) enum WriteActorShutdownReason {
38 Signalled,
40
41 SinkError,
43
44 ActorHandlesDropped,
46}
47
48type ShutdownReasonFuture = futures::future::Shared<Pin<Box<dyn Future<Output = ActorShutdown<WriteActorShutdownReason>> + Send>>>;
52
53#[derive(Debug, Clone)]
55pub(crate) struct WriteActorHandle<T> {
56 actor_msg_tx: mpsc::Sender<ActorMessage<T>>,
57 shutdown_reason: ShutdownReasonFuture,
58}
59
60impl<T> WriteActorHandle<T>
61where
62 T: Send + 'static,
63{
64 fn new(actor_msg_tx: mpsc::Sender<ActorMessage<T>>, shutdown_reason: ShutdownReasonFuture) -> Self {
65 Self {
66 actor_msg_tx,
67 shutdown_reason,
68 }
69 }
70
71 pub fn send(&mut self, item: T) -> Result<(), WriteActorError> {
73 self.send_actor_message(ActorMessage::Send(item))
74 }
75
76 pub fn feed(&mut self, item: T) -> Result<(), WriteActorError> {
78 self.send_actor_message(ActorMessage::Feed(item))
79 }
80
81 pub fn flush(&mut self) -> Result<(), WriteActorError> {
83 self.send_actor_message(ActorMessage::Flush)
84 }
85
86 pub fn signal_shutdown(&mut self) -> Result<(), WriteActorError> {
88 self.send_actor_message(ActorMessage::Shutdown)
89 }
90
91 pub fn wait(&self) -> impl Future<Output = ActorShutdown<WriteActorShutdownReason>> {
93 self.shutdown_reason.clone()
94 }
95
96 #[allow(dead_code)]
101 pub fn is_finished(&self) -> Option<ActorShutdown<WriteActorShutdownReason>> {
102 self.shutdown_reason.peek().cloned()
103 }
104
105 fn send_actor_message(&mut self, msg: ActorMessage<T>) -> Result<(), WriteActorError> {
106 self.actor_msg_tx.try_send(msg).map_err(|err| {
107 if err.is_full() {
108 WriteActorError::ChannelFull
109 } else if err.is_disconnected() {
110 WriteActorError::ChannelClosed
111 } else {
112 unreachable!()
113 }
114 })
115 }
116}
117
118#[derive(Debug, Clone)]
119enum ActorMessage<T> {
120 Send(T),
121 Feed(T),
122 Flush,
123 Shutdown,
124}
125
126async fn actor<S, T>(mut actor_msg_rx: mpsc::Receiver<ActorMessage<T>>, mut sink: S) -> WriteActorShutdownReason
127where
128 S: Sink<T> + Unpin,
129 S::Error: Debug,
130{
131 let shutdown_reason = loop {
132 match actor_msg_rx.next().await {
133 Some(msg) => match msg {
134 ActorMessage::Send(item) => {
135 if let Err(err) = sink.send(item).await {
136 log::warn!("Sink error {err:?}");
137 break WriteActorShutdownReason::SinkError;
138 }
139 }
140 ActorMessage::Feed(item) => {
141 if let Err(err) = sink.feed(item).await {
142 log::warn!("Sink error {err:?}");
143 break WriteActorShutdownReason::SinkError;
144 }
145 }
146 ActorMessage::Flush => {
147 if let Err(err) = sink.flush().await {
148 log::warn!("Sink error {err:?}");
149 break WriteActorShutdownReason::SinkError;
150 }
151 }
152 ActorMessage::Shutdown => {
153 break WriteActorShutdownReason::Signalled;
154 }
155 },
156 None => {
157 break WriteActorShutdownReason::ActorHandlesDropped;
159 }
160 }
161 };
162
163 if let Err(err) = sink.close().await {
165 log::warn!("Sink error {err:?}");
166 }
167
168 log::debug!("Shutting down: {shutdown_reason:?}");
169 shutdown_reason
170}