interlink/ext/
sink.rs

1//! # Sink Support
2//!
3//! This module provides support for attaching futures [`Sink`]'s to your
4//! services to handle sending messages easily
5//!
6//! Use [`ServiceContext::attach_sink`] to attach a sink to your service
7
8use crate::{
9    envelope::ErrorEnvelope,
10    link::{Link, LinkError},
11    msg::ErrorHandler,
12    service::{Service, ServiceContext},
13};
14use futures_sink::Sink;
15use std::{
16    future::Future,
17    pin::Pin,
18    task::{ready, Context, Poll},
19};
20use tokio::sync::mpsc;
21
22impl<S: Service> ServiceContext<S> {
23    /// Attaches a sink to this service and provides a link to the
24    /// service so that it can be used to write messages
25    ///
26    /// `sink` The sink to attach
27    pub fn attach_sink<Si, I>(&self, sink: Si) -> SinkLink<I>
28    where
29        S: ErrorHandler<Si::Error>,
30        Si: Sink<I> + Send + Unpin + 'static,
31        Si::Error: Send + 'static,
32        I: Send + 'static,
33    {
34        SinkService::start(sink, self.link())
35    }
36}
37
38/// Dedicated link type for sinks.
39///
40/// This is cheaply clonable so you can clone
41/// it to use it in multiple places.
42pub struct SinkLink<I>(mpsc::UnboundedSender<SinkMessage<I>>);
43
44impl<I> Clone for SinkLink<I> {
45    fn clone(&self) -> Self {
46        Self(self.0.clone())
47    }
48}
49
50/// Messages used to communicate internally with the sink
51enum SinkMessage<I> {
52    /// Feed an item into the sink optionally flushing
53    Feed { item: I, flush: bool },
54    /// Flush all the items in the sink
55    Flush,
56    /// Tells the sink to stop processing values
57    Stop,
58}
59
60/// Additional logic for sink links
61impl<I> SinkLink<I>
62where
63    I: Send,
64{
65    /// Sends a new item to the sink. This item will be written
66    /// to the underlying target as soon as the previous messages
67    /// are written.
68    ///
69    /// This does not required calling flush.
70    ///
71    /// Any previous calls to feed will be flushed with this message
72    ///
73    /// `item` The item to write
74    pub fn sink(&self, item: I) -> Result<(), LinkError> {
75        self.0
76            .send(SinkMessage::Feed { item, flush: true })
77            .map_err(|_| LinkError::Send)
78    }
79
80    /// Feeds a new item to the sink this item will not be written
81    /// until flush is called.
82    ///
83    /// This requires flush be called at some point to send
84    ///
85    /// `item` The item to feed
86    pub fn feed(&self, item: I) -> Result<(), LinkError> {
87        self.0
88            .send(SinkMessage::Feed { item, flush: false })
89            .map_err(|_| LinkError::Send)
90    }
91
92    /// Requests that the Sink service flush any messages that have
93    /// been fed to the sink
94    pub fn flush(&self) -> Result<(), LinkError> {
95        self.0.send(SinkMessage::Flush).map_err(|_| LinkError::Send)
96    }
97
98    /// Requests that the sink shutdown and stop accepting new
99    /// messages. This will drop the underlying write target
100    pub fn stop(&self) -> Result<(), LinkError> {
101        self.0.send(SinkMessage::Stop).map_err(|_| LinkError::Send)
102    }
103}
104
105/// Service for handling a Sink and its writing this is a
106/// lightweight service which has its own link type and doesn't
107/// implement normal service logic to be more lightweight
108struct SinkService<S, Si, I> {
109    /// The underlying sink to write to
110    sink: Si,
111    /// The link to the attached service
112    link: Link<S>,
113    /// Receiver for sink messages
114    rx: mpsc::UnboundedReceiver<SinkMessage<I>>,
115    /// The current sink service action
116    action: Option<FutState<I>>,
117}
118
119impl<S, Si, I> SinkService<S, Si, I>
120where
121    S: Service + ErrorHandler<Si::Error>,
122    Si: Sink<I> + Send + Unpin + 'static,
123    Si::Error: Send + 'static,
124    I: Send + 'static,
125{
126    /// Starts a new sink service. You should attach this
127    ///
128    /// `sink` The sink to send and feed the items into
129    /// `link` Link to the service that will handle the items
130    pub(crate) fn start(sink: Si, link: Link<S>) -> SinkLink<I> {
131        let (tx, rx) = mpsc::unbounded_channel();
132        let sink_link = SinkLink(tx);
133        let service = SinkService {
134            sink,
135            link,
136            rx,
137            action: None,
138        };
139        tokio::spawn(service);
140        sink_link
141    }
142
143    /// Handlings polling the provided action on the provided sink
144    ///
145    /// `sink`   The sink to poll against
146    /// `state`  The state to poll
147    /// `cx`     The polling context
148    fn poll_sink(
149        mut sink: Pin<&mut Si>,
150        state: &mut FutState<I>,
151        cx: &mut Context<'_>,
152    ) -> Poll<Result<(), Si::Error>> {
153        let flush = match state {
154            FutState::Feed { item, flush } => {
155                if item.is_some() {
156                    // Poll the sink until its ready
157                    ready!(sink.as_mut().poll_ready(cx))?;
158
159                    // Send the item to the sink
160                    let item = item.take().expect("polled feed after completion");
161                    sink.as_mut().start_send(item)?;
162                }
163                *flush
164            }
165            FutState::Flush => true,
166        };
167
168        // Flush the sink if required
169        if flush {
170            sink.poll_flush(cx)
171        } else {
172            Poll::Ready(Ok(()))
173        }
174    }
175}
176
177/// State for what the service is currently doing
178enum FutState<I> {
179    /// Feeding into the sink
180    Feed {
181        /// The item to feed
182        item: Option<I>,
183        /// Whether to flush after feeding the item
184        flush: bool,
185    },
186    /// Flushing the sink
187    Flush,
188}
189
190impl<I> Unpin for FutState<I> {}
191
192impl<S, Si, I> Future for SinkService<S, Si, I>
193where
194    S: Service + ErrorHandler<Si::Error>,
195    Si: Sink<I> + Send + Unpin + 'static,
196    Si::Error: Send + 'static,
197    I: Send + 'static,
198{
199    type Output = ();
200
201    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
202        let this = self.get_mut();
203
204        loop {
205            this.action = if let Some(action) = &mut this.action {
206                let sink = Pin::new(&mut this.sink);
207                if let Err(err) = ready!(Self::poll_sink(sink, action, cx)) {
208                    if this.link.tx(ErrorEnvelope::new(err)).is_err() {
209                        // If the error message couldn't be sent the service is stopped
210                        break;
211                    }
212                }
213                None
214            } else {
215                let msg = match ready!(this.rx.poll_recv(cx)) {
216                    Some(value) => value,
217                    // Nothing left to recv
218                    None => break,
219                };
220
221                let state = match msg {
222                    SinkMessage::Feed { item, flush } => FutState::Feed {
223                        item: Some(item),
224                        flush,
225                    },
226                    SinkMessage::Flush => FutState::Flush,
227                    // Nothing left to recv stop processing
228                    SinkMessage::Stop => break,
229                };
230                Some(state)
231            }
232        }
233
234        Poll::Ready(())
235    }
236}