netidx_protocols/pack_channel/
client.rs

1use crate::channel::client;
2use anyhow::Result;
3use bytes::{Buf, Bytes, BytesMut};
4use netidx::{
5    pack::Pack,
6    path::Path,
7    subscriber::{Subscriber, Value},
8};
9use parking_lot::Mutex;
10use std::mem;
11use tokio::sync::Mutex as AsyncMutex;
12
13pub struct Batch {
14    data: BytesMut,
15}
16
17impl Batch {
18    /// Queue a packable thing in the batch.
19    pub fn queue<S: Pack>(&mut self, t: &S) -> Result<()> {
20        Ok(Pack::encode(t, &mut self.data)?)
21    }
22
23    /// returns the number of bytes queued in this batch
24    pub fn len(&self) -> usize {
25	self.data.len()
26    }
27}
28
29pub struct Connection {
30    inner: client::Connection,
31    buf: Mutex<BytesMut>,
32    queue: AsyncMutex<Bytes>,
33}
34
35impl Connection {
36    /// Connect to the channel at the the specified path. The channel
37    /// may either be a listener, or a singleton.
38    pub async fn connect(subscriber: &Subscriber, path: Path) -> Result<Connection> {
39        let inner = client::Connection::connect(subscriber, path).await?;
40        Ok(Connection {
41            inner,
42            buf: Mutex::new(BytesMut::new()),
43            queue: AsyncMutex::new(Bytes::new()),
44        })
45    }
46
47    /// return true if the connection is dead
48    pub fn is_dead(&self) -> bool {
49        self.inner.is_dead()
50    }
51
52    /// start a new batch of messages
53    pub fn start_batch(&self) -> Batch {
54        Batch { data: mem::replace(&mut *self.buf.lock(), BytesMut::new()) }
55    }
56
57    /// send a batch of messages
58    pub fn send(&self, mut batch: Batch) -> Result<()> {
59        let v = Value::from(batch.data.split().freeze());
60        self.inner.send(v)?;
61        Ok(*self.buf.lock() = batch.data)
62    }
63
64    /// send just one message
65    pub fn send_one<S: Pack>(&self, t: &S) -> Result<()> {
66        let mut b = self.start_batch();
67        b.queue(t)?;
68        self.send(b)
69    }
70
71    /// return true if messages have been sent but not flushed. Flush
72    /// is only required for pushback.
73    pub fn dirty(&self) -> bool {
74        self.inner.dirty()
75    }
76
77    /// Wait for sent messages to flush to the OS
78    pub async fn flush(&self) -> Result<()> {
79        self.inner.flush().await
80    }
81
82    async fn try_fill_queue(&self, queue: &mut Bytes) -> Result<()> {
83        if !queue.has_remaining() {
84            match self.inner.try_recv_one().await? {
85                Some(Value::Bytes(buf)) => *queue = (*buf).clone(),
86                Some(v) => bail!("unexpected response {}", v),
87                None => (),
88            }
89        }
90        Ok(())
91    }
92
93    async fn fill_queue(&self, queue: &mut Bytes) -> Result<()> {
94        if !queue.has_remaining() {
95            match self.inner.recv_one().await? {
96                Value::Bytes(buf) => *queue = (*buf).clone(),
97                v => bail!("unexpected response {}", v),
98            }
99        }
100        Ok(())
101    }
102
103    /// Receive all avaliable messages, waiting for at least one
104    /// message to arrive before returning. Each message will be
105    /// passed to f, if f returns false, then processing the batch
106    /// will be halted and any remaining messages will stay in the
107    /// channel.
108    pub async fn recv<R: Pack + 'static, F: FnMut(R) -> bool>(
109        &self,
110        mut f: F,
111    ) -> Result<()> {
112        let mut queue = self.queue.lock().await;
113        self.fill_queue(&mut *queue).await?;
114        loop {
115            if queue.has_remaining() {
116                if !f(Pack::decode(&mut *queue)?) {
117                    break;
118                }
119            } else {
120                self.try_fill_queue(&mut *queue).await?;
121                if !queue.has_remaining() {
122                    break;
123                }
124            }
125        }
126        Ok(())
127    }
128
129    /// Receive all available messages, but do not wait for at least 1
130    /// message to arrive. If no messages are available return
131    /// immediatly. This will only block if a concurrent receive is in
132    /// progress.
133    pub async fn try_recv<R: Pack + 'static, F: FnMut(R) -> bool>(
134        &self,
135        mut f: F,
136    ) -> Result<()> {
137        let mut queue = self.queue.lock().await;
138        self.try_fill_queue(&mut *queue).await?;
139        loop {
140            if queue.has_remaining() {
141                if !f(Pack::decode(&mut *queue)?) {
142                    break;
143                }
144            } else {
145                self.try_fill_queue(&mut *queue).await?;
146                if !queue.has_remaining() {
147                    break;
148                }
149            }
150        }
151        Ok(())
152    }
153
154    /// Receive one message, waiting for at least one message to
155    /// arrive if none are queued.
156    pub async fn recv_one<R: Pack + 'static>(&self) -> Result<R> {
157        let mut queue = self.queue.lock().await;
158        self.fill_queue(&mut *queue).await?;
159        Ok(<R as Pack>::decode(&mut *queue)?)
160    }
161
162    /// Receive a message if one is available, otherwise return
163    /// Ok(None).
164    pub async fn try_recv_one<R: Pack + 'static>(&self) -> Result<Option<R>> {
165        let mut queue = self.queue.lock().await;
166        self.try_fill_queue(&mut *queue).await?;
167        if queue.remaining() > 0 {
168            Ok(Some(Pack::decode(&mut *queue)?))
169        } else {
170            Ok(None)
171        }
172    }
173}