interlink/ext/sink.rs
1//! # Sink Support
2//!
3//! This module provides support for attaching futures [`Sink`]'s to your
4//! services to handle sending messages easily
5//!
6//! Use [`ServiceContext::attach_sink`] to attach a sink to your service
7
8use crate::{
9 envelope::ErrorEnvelope,
10 link::{Link, LinkError},
11 msg::ErrorHandler,
12 service::{Service, ServiceContext},
13};
14use futures_sink::Sink;
15use std::{
16 future::Future,
17 pin::Pin,
18 task::{ready, Context, Poll},
19};
20use tokio::sync::mpsc;
21
22impl<S: Service> ServiceContext<S> {
23 /// Attaches a sink to this service and provides a link to the
24 /// service so that it can be used to write messages
25 ///
26 /// `sink` The sink to attach
27 pub fn attach_sink<Si, I>(&self, sink: Si) -> SinkLink<I>
28 where
29 S: ErrorHandler<Si::Error>,
30 Si: Sink<I> + Send + Unpin + 'static,
31 Si::Error: Send + 'static,
32 I: Send + 'static,
33 {
34 SinkService::start(sink, self.link())
35 }
36}
37
38/// Dedicated link type for sinks.
39///
40/// This is cheaply clonable so you can clone
41/// it to use it in multiple places.
42pub struct SinkLink<I>(mpsc::UnboundedSender<SinkMessage<I>>);
43
44impl<I> Clone for SinkLink<I> {
45 fn clone(&self) -> Self {
46 Self(self.0.clone())
47 }
48}
49
50/// Messages used to communicate internally with the sink
51enum SinkMessage<I> {
52 /// Feed an item into the sink optionally flushing
53 Feed { item: I, flush: bool },
54 /// Flush all the items in the sink
55 Flush,
56 /// Tells the sink to stop processing values
57 Stop,
58}
59
60/// Additional logic for sink links
61impl<I> SinkLink<I>
62where
63 I: Send,
64{
65 /// Sends a new item to the sink. This item will be written
66 /// to the underlying target as soon as the previous messages
67 /// are written.
68 ///
69 /// This does not required calling flush.
70 ///
71 /// Any previous calls to feed will be flushed with this message
72 ///
73 /// `item` The item to write
74 pub fn sink(&self, item: I) -> Result<(), LinkError> {
75 self.0
76 .send(SinkMessage::Feed { item, flush: true })
77 .map_err(|_| LinkError::Send)
78 }
79
80 /// Feeds a new item to the sink this item will not be written
81 /// until flush is called.
82 ///
83 /// This requires flush be called at some point to send
84 ///
85 /// `item` The item to feed
86 pub fn feed(&self, item: I) -> Result<(), LinkError> {
87 self.0
88 .send(SinkMessage::Feed { item, flush: false })
89 .map_err(|_| LinkError::Send)
90 }
91
92 /// Requests that the Sink service flush any messages that have
93 /// been fed to the sink
94 pub fn flush(&self) -> Result<(), LinkError> {
95 self.0.send(SinkMessage::Flush).map_err(|_| LinkError::Send)
96 }
97
98 /// Requests that the sink shutdown and stop accepting new
99 /// messages. This will drop the underlying write target
100 pub fn stop(&self) -> Result<(), LinkError> {
101 self.0.send(SinkMessage::Stop).map_err(|_| LinkError::Send)
102 }
103}
104
105/// Service for handling a Sink and its writing this is a
106/// lightweight service which has its own link type and doesn't
107/// implement normal service logic to be more lightweight
108struct SinkService<S, Si, I> {
109 /// The underlying sink to write to
110 sink: Si,
111 /// The link to the attached service
112 link: Link<S>,
113 /// Receiver for sink messages
114 rx: mpsc::UnboundedReceiver<SinkMessage<I>>,
115 /// The current sink service action
116 action: Option<FutState<I>>,
117}
118
119impl<S, Si, I> SinkService<S, Si, I>
120where
121 S: Service + ErrorHandler<Si::Error>,
122 Si: Sink<I> + Send + Unpin + 'static,
123 Si::Error: Send + 'static,
124 I: Send + 'static,
125{
126 /// Starts a new sink service. You should attach this
127 ///
128 /// `sink` The sink to send and feed the items into
129 /// `link` Link to the service that will handle the items
130 pub(crate) fn start(sink: Si, link: Link<S>) -> SinkLink<I> {
131 let (tx, rx) = mpsc::unbounded_channel();
132 let sink_link = SinkLink(tx);
133 let service = SinkService {
134 sink,
135 link,
136 rx,
137 action: None,
138 };
139 tokio::spawn(service);
140 sink_link
141 }
142
143 /// Handlings polling the provided action on the provided sink
144 ///
145 /// `sink` The sink to poll against
146 /// `state` The state to poll
147 /// `cx` The polling context
148 fn poll_sink(
149 mut sink: Pin<&mut Si>,
150 state: &mut FutState<I>,
151 cx: &mut Context<'_>,
152 ) -> Poll<Result<(), Si::Error>> {
153 let flush = match state {
154 FutState::Feed { item, flush } => {
155 if item.is_some() {
156 // Poll the sink until its ready
157 ready!(sink.as_mut().poll_ready(cx))?;
158
159 // Send the item to the sink
160 let item = item.take().expect("polled feed after completion");
161 sink.as_mut().start_send(item)?;
162 }
163 *flush
164 }
165 FutState::Flush => true,
166 };
167
168 // Flush the sink if required
169 if flush {
170 sink.poll_flush(cx)
171 } else {
172 Poll::Ready(Ok(()))
173 }
174 }
175}
176
177/// State for what the service is currently doing
178enum FutState<I> {
179 /// Feeding into the sink
180 Feed {
181 /// The item to feed
182 item: Option<I>,
183 /// Whether to flush after feeding the item
184 flush: bool,
185 },
186 /// Flushing the sink
187 Flush,
188}
189
190impl<I> Unpin for FutState<I> {}
191
192impl<S, Si, I> Future for SinkService<S, Si, I>
193where
194 S: Service + ErrorHandler<Si::Error>,
195 Si: Sink<I> + Send + Unpin + 'static,
196 Si::Error: Send + 'static,
197 I: Send + 'static,
198{
199 type Output = ();
200
201 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
202 let this = self.get_mut();
203
204 loop {
205 this.action = if let Some(action) = &mut this.action {
206 let sink = Pin::new(&mut this.sink);
207 if let Err(err) = ready!(Self::poll_sink(sink, action, cx)) {
208 if this.link.tx(ErrorEnvelope::new(err)).is_err() {
209 // If the error message couldn't be sent the service is stopped
210 break;
211 }
212 }
213 None
214 } else {
215 let msg = match ready!(this.rx.poll_recv(cx)) {
216 Some(value) => value,
217 // Nothing left to recv
218 None => break,
219 };
220
221 let state = match msg {
222 SinkMessage::Feed { item, flush } => FutState::Feed {
223 item: Some(item),
224 flush,
225 },
226 SinkMessage::Flush => FutState::Flush,
227 // Nothing left to recv stop processing
228 SinkMessage::Stop => break,
229 };
230 Some(state)
231 }
232 }
233
234 Poll::Ready(())
235 }
236}