capnp_futures/
write_queue.rs

1// Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
2//
3// Permission is hereby granted, free of charge, to any person obtaining a copy
4// of this software and associated documentation files (the "Software"), to deal
5// in the Software without restriction, including without limitation the rights
6// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7// copies of the Software, and to permit persons to whom the Software is
8// furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19// THE SOFTWARE.
20
21use std::future::Future;
22
23use futures_channel::oneshot;
24use futures_util::{AsyncWrite, AsyncWriteExt, StreamExt, TryFutureExt};
25
26use capnp::Error;
27
28use crate::serialize::AsOutputSegments;
29
30enum Item<M>
31where
32    M: AsOutputSegments,
33{
34    Message(M, oneshot::Sender<M>),
35    Done(Result<(), Error>, oneshot::Sender<()>),
36}
37
38/// A handle that allows messages to be sent to a write queue.
39pub struct Sender<M>
40where
41    M: AsOutputSegments,
42{
43    sender: futures_channel::mpsc::UnboundedSender<Item<M>>,
44    in_flight: std::sync::Arc<std::sync::atomic::AtomicI32>,
45}
46
47impl<M> Clone for Sender<M>
48where
49    M: AsOutputSegments,
50{
51    fn clone(&self) -> Self {
52        Self {
53            sender: self.sender.clone(),
54            in_flight: self.in_flight.clone(),
55        }
56    }
57}
58
59/// Creates a new write queue that wraps the given `AsyncWrite`.
60///
61/// Returns `(sender, task)`, where `sender` can be used to push writes onto the
62/// queue, and `task` is a future that performs the work of the writes. The queue
63/// will run as long as `task` is polled, until either `sender.terminate()` is
64/// called or `sender` and all of its clones are dropped.
65pub fn write_queue<W, M>(mut writer: W) -> (Sender<M>, impl Future<Output = Result<(), Error>>)
66where
67    W: AsyncWrite + Unpin,
68    M: AsOutputSegments,
69{
70    let (tx, mut rx) = futures_channel::mpsc::unbounded::<Item<M>>();
71
72    let in_flight = std::sync::Arc::new(std::sync::atomic::AtomicI32::new(0));
73
74    let sender = Sender {
75        sender: tx,
76        in_flight: in_flight.clone(),
77    };
78
79    let queue = async move {
80        while let Some(item) = rx.next().await {
81            match item {
82                Item::Message(m, returner) => {
83                    let result = crate::serialize::write_message(&mut writer, &m).await;
84                    in_flight.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
85                    result?;
86                    writer.flush().await?;
87                    let _ = returner.send(m);
88                }
89                Item::Done(r, finisher) => {
90                    let _ = finisher.send(());
91                    return r;
92                }
93            }
94        }
95        Ok(())
96    };
97
98    (sender, queue)
99}
100
101fn _assert_kinds() {
102    fn _assert_send<T: Send>(_x: T) {}
103    fn _assert_sync<T: Sync>() {}
104    fn _assert_write_queue_send<W: AsyncWrite + Unpin + Send, M: AsOutputSegments + Sync + Send>(
105        w: W,
106    ) {
107        let (s, f) = write_queue::<W, M>(w);
108        _assert_send(s);
109        _assert_send(f);
110    }
111    fn _assert_write_queue_send_2<W: AsyncWrite + Unpin + Send>(w: W) {
112        let (s, f) = write_queue::<W, capnp::message::Builder<capnp::message::HeapAllocator>>(w);
113        _assert_send(s);
114        _assert_send(f);
115    }
116}
117
118impl<M> Sender<M>
119where
120    M: AsOutputSegments,
121{
122    /// Enqueues a message to be written. Returns the message once the write
123    /// has completed. Dropping the returned future does *not* cancel the write.
124    pub fn send(&mut self, message: M) -> impl Future<Output = Result<M, Error>> + Unpin {
125        self.in_flight
126            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
127        let (complete, oneshot) = oneshot::channel();
128
129        let _ = self.sender.unbounded_send(Item::Message(message, complete));
130
131        oneshot.map_err(|oneshot::Canceled| Error::disconnected("WriteQueue has terminated".into()))
132    }
133
134    /// Returns the number of messages queued to be written.
135    pub fn len(&self) -> usize {
136        let result = self.in_flight.load(std::sync::atomic::Ordering::SeqCst);
137        assert!(result >= 0);
138        result as usize
139    }
140
141    pub fn is_empty(&self) -> bool {
142        self.len() == 0
143    }
144
145    /// Commands the queue to stop writing messages once it is empty. After this method has been called,
146    /// any new calls to `send()` will return a future that immediately resolves to an error.
147    /// If the passed-in `result` is an error, then the `WriteQueue` will resolve to that error.
148    pub fn terminate(
149        &mut self,
150        result: Result<(), Error>,
151    ) -> impl Future<Output = Result<(), Error>> + Unpin {
152        let (complete, receiver) = oneshot::channel();
153
154        let _ = self.sender.unbounded_send(Item::Done(result, complete));
155
156        receiver
157            .map_err(|oneshot::Canceled| Error::disconnected("WriteQueue has terminated".into()))
158    }
159}