netidx_protocols/pack_channel/
server.rs1use crate::channel::server;
2use anyhow::Result;
3use bytes::{Buf, Bytes, BytesMut};
4use netidx::{
5 pack::Pack,
6 path::Path,
7 protocol::resolver::UserInfo,
8 publisher::{PublishFlags, Publisher, Value},
9};
10use parking_lot::Mutex;
11use std::{mem, time::Duration};
12use tokio::sync::Mutex as AsyncMutex;
13
14pub struct Batch {
15 data: BytesMut,
16}
17
18impl Batch {
19 pub fn queue<S: Pack>(&mut self, t: &S) -> Result<()> {
21 Ok(Pack::encode(t, &mut self.data)?)
22 }
23
24 pub fn len(&self) -> usize {
26 self.data.len()
27 }
28}
29
30pub struct Connection {
32 inner: server::Connection,
33 buf: Mutex<BytesMut>,
34 queue: AsyncMutex<Bytes>,
35}
36
37impl Connection {
38 pub fn is_dead(&self) -> bool {
40 self.inner.is_dead()
41 }
42
43 pub fn start_batch(&self) -> Batch {
45 Batch { data: mem::replace(&mut *self.buf.lock(), BytesMut::new()) }
46 }
47
48 pub async fn send(&self, mut batch: Batch) -> Result<()> {
50 let v = Value::from(batch.data.split().freeze());
51 self.inner.send_one(v).await?;
52 *self.buf.lock() = batch.data;
53 Ok(())
54 }
55
56 pub async fn send_one<S: Pack>(&self, t: &S) -> Result<()> {
58 let mut b = self.start_batch();
59 b.queue(t)?;
60 self.send(b).await
61 }
62
63 async fn try_fill_queue(&self, queue: &mut Bytes) -> Result<()> {
64 if !queue.has_remaining() {
65 match self.inner.try_recv_one().await? {
66 Some(Value::Bytes(buf)) => *queue = (*buf).clone(),
67 Some(_) => bail!("unexpected response"),
68 None => (),
69 }
70 }
71 Ok(())
72 }
73
74 async fn fill_queue(&self, queue: &mut Bytes) -> Result<()> {
75 if !queue.has_remaining() {
76 match self.inner.recv_one().await? {
77 Value::Bytes(buf) => *queue = (*buf).clone(),
78 _ => bail!("unexpected response"),
79 }
80 }
81 Ok(())
82 }
83
84 pub async fn recv<R: Pack + 'static, F: FnMut(R) -> bool>(
92 &self,
93 mut f: F,
94 ) -> Result<()> {
95 let mut queue = self.queue.lock().await;
96 self.fill_queue(&mut *queue).await?;
97 loop {
98 if queue.has_remaining() {
99 if !f(Pack::decode(&mut *queue)?) {
100 break;
101 }
102 } else {
103 self.try_fill_queue(&mut *queue).await?;
104 if !queue.has_remaining() {
105 break;
106 }
107 }
108 }
109 Ok(())
110 }
111
112 pub async fn try_recv<R: Pack + 'static, F: FnMut(R) -> bool>(
117 &self,
118 mut f: F,
119 ) -> Result<()> {
120 let mut queue = self.queue.lock().await;
121 self.try_fill_queue(&mut *queue).await?;
122 loop {
123 if queue.has_remaining() {
124 if !f(Pack::decode(&mut *queue)?) {
125 break;
126 }
127 } else {
128 self.try_fill_queue(&mut *queue).await?;
129 if !queue.has_remaining() {
130 break;
131 }
132 }
133 }
134 Ok(())
135 }
136
137 pub async fn recv_one<R: Pack + 'static>(&self) -> Result<R> {
140 let mut queue = self.queue.lock().await;
141 self.fill_queue(&mut *queue).await?;
142 Ok(<R as Pack>::decode(&mut *queue)?)
143 }
144
145 pub async fn try_recv_one<R: Pack + 'static>(&self) -> Result<Option<R>> {
148 let mut queue = self.queue.lock().await;
149 self.try_fill_queue(&mut *queue).await?;
150 if queue.remaining() > 0 {
151 Ok(Some(Pack::decode(&mut *queue)?))
152 } else {
153 Ok(None)
154 }
155 }
156
157 pub fn user(&self) -> Option<UserInfo> {
159 self.inner.user()
160 }
161}
162
163pub struct Singleton(server::Singleton);
166
167impl Singleton {
168 pub async fn wait_connected(&mut self) -> Result<Connection> {
170 let inner = self.0.wait_connected().await?;
171 Ok(Connection {
172 inner,
173 buf: Mutex::new(BytesMut::new()),
174 queue: AsyncMutex::new(Bytes::new()),
175 })
176 }
177}
178
179pub async fn singleton(
182 publisher: &Publisher,
183 timeout: Option<Duration>,
184 path: Path,
185) -> Result<Singleton> {
186 Ok(Singleton(server::singleton(publisher, timeout, path).await?))
187}
188
189pub async fn singleton_with_flags(
190 publisher: &Publisher,
191 flags: PublishFlags,
192 timeout: Option<Duration>,
193 path: Path,
194) -> Result<Singleton> {
195 Ok(Singleton(server::singleton_with_flags(publisher, flags, timeout, path).await?))
196}
197
198pub struct Listener(server::Listener);
201
202impl Listener {
203 pub async fn new(
204 publisher: &Publisher,
205 timeout: Option<Duration>,
206 path: Path,
207 ) -> Result<Self> {
208 let inner = server::Listener::new(publisher, timeout, path).await?;
209 Ok(Self(inner))
210 }
211
212 pub async fn new_with_flags(
213 publisher: &Publisher,
214 flags: PublishFlags,
215 timeout: Option<Duration>,
216 path: Path,
217 ) -> Result<Self> {
218 let inner =
219 server::Listener::new_with_flags(publisher, flags, timeout, path).await?;
220 Ok(Self(inner))
221 }
222
223 pub async fn accept(&mut self) -> Result<Connection> {
226 let inner = self.0.accept().await?;
227 Ok(Connection {
228 inner,
229 buf: Mutex::new(BytesMut::new()),
230 queue: AsyncMutex::new(Bytes::new()),
231 })
232 }
233}