netidx_protocols/pack_channel/
client.rs1use crate::channel::client;
2use anyhow::Result;
3use bytes::{Buf, Bytes, BytesMut};
4use netidx::{
5 pack::Pack,
6 path::Path,
7 subscriber::{Subscriber, Value},
8};
9use parking_lot::Mutex;
10use std::mem;
11use tokio::sync::Mutex as AsyncMutex;
12
13pub struct Batch {
14 data: BytesMut,
15}
16
17impl Batch {
18 pub fn queue<S: Pack>(&mut self, t: &S) -> Result<()> {
20 Ok(Pack::encode(t, &mut self.data)?)
21 }
22
23 pub fn len(&self) -> usize {
25 self.data.len()
26 }
27}
28
29pub struct Connection {
30 inner: client::Connection,
31 buf: Mutex<BytesMut>,
32 queue: AsyncMutex<Bytes>,
33}
34
35impl Connection {
36 pub async fn connect(subscriber: &Subscriber, path: Path) -> Result<Connection> {
39 let inner = client::Connection::connect(subscriber, path).await?;
40 Ok(Connection {
41 inner,
42 buf: Mutex::new(BytesMut::new()),
43 queue: AsyncMutex::new(Bytes::new()),
44 })
45 }
46
47 pub fn is_dead(&self) -> bool {
49 self.inner.is_dead()
50 }
51
52 pub fn start_batch(&self) -> Batch {
54 Batch { data: mem::replace(&mut *self.buf.lock(), BytesMut::new()) }
55 }
56
57 pub fn send(&self, mut batch: Batch) -> Result<()> {
59 let v = Value::from(batch.data.split().freeze());
60 self.inner.send(v)?;
61 Ok(*self.buf.lock() = batch.data)
62 }
63
64 pub fn send_one<S: Pack>(&self, t: &S) -> Result<()> {
66 let mut b = self.start_batch();
67 b.queue(t)?;
68 self.send(b)
69 }
70
71 pub fn dirty(&self) -> bool {
74 self.inner.dirty()
75 }
76
77 pub async fn flush(&self) -> Result<()> {
79 self.inner.flush().await
80 }
81
82 async fn try_fill_queue(&self, queue: &mut Bytes) -> Result<()> {
83 if !queue.has_remaining() {
84 match self.inner.try_recv_one().await? {
85 Some(Value::Bytes(buf)) => *queue = (*buf).clone(),
86 Some(v) => bail!("unexpected response {}", v),
87 None => (),
88 }
89 }
90 Ok(())
91 }
92
93 async fn fill_queue(&self, queue: &mut Bytes) -> Result<()> {
94 if !queue.has_remaining() {
95 match self.inner.recv_one().await? {
96 Value::Bytes(buf) => *queue = (*buf).clone(),
97 v => bail!("unexpected response {}", v),
98 }
99 }
100 Ok(())
101 }
102
103 pub async fn recv<R: Pack + 'static, F: FnMut(R) -> bool>(
109 &self,
110 mut f: F,
111 ) -> Result<()> {
112 let mut queue = self.queue.lock().await;
113 self.fill_queue(&mut *queue).await?;
114 loop {
115 if queue.has_remaining() {
116 if !f(Pack::decode(&mut *queue)?) {
117 break;
118 }
119 } else {
120 self.try_fill_queue(&mut *queue).await?;
121 if !queue.has_remaining() {
122 break;
123 }
124 }
125 }
126 Ok(())
127 }
128
129 pub async fn try_recv<R: Pack + 'static, F: FnMut(R) -> bool>(
134 &self,
135 mut f: F,
136 ) -> Result<()> {
137 let mut queue = self.queue.lock().await;
138 self.try_fill_queue(&mut *queue).await?;
139 loop {
140 if queue.has_remaining() {
141 if !f(Pack::decode(&mut *queue)?) {
142 break;
143 }
144 } else {
145 self.try_fill_queue(&mut *queue).await?;
146 if !queue.has_remaining() {
147 break;
148 }
149 }
150 }
151 Ok(())
152 }
153
154 pub async fn recv_one<R: Pack + 'static>(&self) -> Result<R> {
157 let mut queue = self.queue.lock().await;
158 self.fill_queue(&mut *queue).await?;
159 Ok(<R as Pack>::decode(&mut *queue)?)
160 }
161
162 pub async fn try_recv_one<R: Pack + 'static>(&self) -> Result<Option<R>> {
165 let mut queue = self.queue.lock().await;
166 self.try_fill_queue(&mut *queue).await?;
167 if queue.remaining() > 0 {
168 Ok(Some(Pack::decode(&mut *queue)?))
169 } else {
170 Ok(None)
171 }
172 }
173}