capnp_futures/
write_queue.rs1use std::future::Future;
22
23use futures_channel::oneshot;
24use futures_util::{AsyncWrite, AsyncWriteExt, StreamExt, TryFutureExt};
25
26use capnp::Error;
27
28use crate::serialize::AsOutputSegments;
29
30enum Item<M>
31where
32 M: AsOutputSegments,
33{
34 Message(M, oneshot::Sender<M>),
35 Done(Result<(), Error>, oneshot::Sender<()>),
36}
37
38pub struct Sender<M>
40where
41 M: AsOutputSegments,
42{
43 sender: futures_channel::mpsc::UnboundedSender<Item<M>>,
44 in_flight: std::sync::Arc<std::sync::atomic::AtomicI32>,
45}
46
47impl<M> Clone for Sender<M>
48where
49 M: AsOutputSegments,
50{
51 fn clone(&self) -> Self {
52 Self {
53 sender: self.sender.clone(),
54 in_flight: self.in_flight.clone(),
55 }
56 }
57}
58
59pub fn write_queue<W, M>(mut writer: W) -> (Sender<M>, impl Future<Output = Result<(), Error>>)
66where
67 W: AsyncWrite + Unpin,
68 M: AsOutputSegments,
69{
70 let (tx, mut rx) = futures_channel::mpsc::unbounded::<Item<M>>();
71
72 let in_flight = std::sync::Arc::new(std::sync::atomic::AtomicI32::new(0));
73
74 let sender = Sender {
75 sender: tx,
76 in_flight: in_flight.clone(),
77 };
78
79 let queue = async move {
80 while let Some(item) = rx.next().await {
81 match item {
82 Item::Message(m, returner) => {
83 let result = crate::serialize::write_message(&mut writer, &m).await;
84 in_flight.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
85 result?;
86 writer.flush().await?;
87 let _ = returner.send(m);
88 }
89 Item::Done(r, finisher) => {
90 let _ = finisher.send(());
91 return r;
92 }
93 }
94 }
95 Ok(())
96 };
97
98 (sender, queue)
99}
100
101fn _assert_kinds() {
102 fn _assert_send<T: Send>(_x: T) {}
103 fn _assert_sync<T: Sync>() {}
104 fn _assert_write_queue_send<W: AsyncWrite + Unpin + Send, M: AsOutputSegments + Sync + Send>(
105 w: W,
106 ) {
107 let (s, f) = write_queue::<W, M>(w);
108 _assert_send(s);
109 _assert_send(f);
110 }
111 fn _assert_write_queue_send_2<W: AsyncWrite + Unpin + Send>(w: W) {
112 let (s, f) = write_queue::<W, capnp::message::Builder<capnp::message::HeapAllocator>>(w);
113 _assert_send(s);
114 _assert_send(f);
115 }
116}
117
118impl<M> Sender<M>
119where
120 M: AsOutputSegments,
121{
122 pub fn send(&mut self, message: M) -> impl Future<Output = Result<M, Error>> + Unpin {
125 self.in_flight
126 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
127 let (complete, oneshot) = oneshot::channel();
128
129 let _ = self.sender.unbounded_send(Item::Message(message, complete));
130
131 oneshot.map_err(|oneshot::Canceled| Error::disconnected("WriteQueue has terminated".into()))
132 }
133
134 pub fn len(&self) -> usize {
136 let result = self.in_flight.load(std::sync::atomic::Ordering::SeqCst);
137 assert!(result >= 0);
138 result as usize
139 }
140
141 pub fn is_empty(&self) -> bool {
142 self.len() == 0
143 }
144
145 pub fn terminate(
149 &mut self,
150 result: Result<(), Error>,
151 ) -> impl Future<Output = Result<(), Error>> + Unpin {
152 let (complete, receiver) = oneshot::channel();
153
154 let _ = self.sender.unbounded_send(Item::Done(result, complete));
155
156 receiver
157 .map_err(|oneshot::Canceled| Error::disconnected("WriteQueue has terminated".into()))
158 }
159}