netidx_protocols/pack_channel/
server.rs

1use crate::channel::server;
2use anyhow::Result;
3use bytes::{Buf, Bytes, BytesMut};
4use netidx::{
5    pack::Pack,
6    path::Path,
7    protocol::resolver::UserInfo,
8    publisher::{PublishFlags, Publisher, Value},
9};
10use parking_lot::Mutex;
11use std::{mem, time::Duration};
12use tokio::sync::Mutex as AsyncMutex;
13
14pub struct Batch {
15    data: BytesMut,
16}
17
18impl Batch {
19    /// Queue a packable thing in the batch.
20    pub fn queue<S: Pack>(&mut self, t: &S) -> Result<()> {
21        Ok(Pack::encode(t, &mut self.data)?)
22    }
23
24    /// Returns the number of bytes queued in this batch
25    pub fn len(&self) -> usize {
26	self.data.len()
27    }
28}
29
30/// A bidirectional channel between two end points.
31pub struct Connection {
32    inner: server::Connection,
33    buf: Mutex<BytesMut>,
34    queue: AsyncMutex<Bytes>,
35}
36
37impl Connection {
38    /// Return true if the connection is dead.
39    pub fn is_dead(&self) -> bool {
40        self.inner.is_dead()
41    }
42
43    /// Start a new batch of messages to be sent to the other side
44    pub fn start_batch(&self) -> Batch {
45        Batch { data: mem::replace(&mut *self.buf.lock(), BytesMut::new()) }
46    }
47
48    /// Send a batch of messages to the other side.
49    pub async fn send(&self, mut batch: Batch) -> Result<()> {
50        let v = Value::from(batch.data.split().freeze());
51        self.inner.send_one(v).await?;
52        *self.buf.lock() = batch.data;
53        Ok(())
54    }
55
56    /// Send one message to the other side
57    pub async fn send_one<S: Pack>(&self, t: &S) -> Result<()> {
58        let mut b = self.start_batch();
59        b.queue(t)?;
60        self.send(b).await
61    }
62
63    async fn try_fill_queue(&self, queue: &mut Bytes) -> Result<()> {
64        if !queue.has_remaining() {
65            match self.inner.try_recv_one().await? {
66                Some(Value::Bytes(buf)) => *queue = (*buf).clone(),
67                Some(_) => bail!("unexpected response"),
68                None => (),
69            }
70        }
71        Ok(())
72    }
73
74    async fn fill_queue(&self, queue: &mut Bytes) -> Result<()> {
75        if !queue.has_remaining() {
76            match self.inner.recv_one().await? {
77                Value::Bytes(buf) => *queue = (*buf).clone(),
78                _ => bail!("unexpected response"),
79            }
80        }
81        Ok(())
82    }
83
84    /// Receive a batch of messages from the other side. Recv will
85    /// repeatedly call the specified closure with new messages until
86    /// either,
87    ///
88    /// - the closure returns false
89    /// - there are no more messages
90    ///
91    pub async fn recv<R: Pack + 'static, F: FnMut(R) -> bool>(
92        &self,
93        mut f: F,
94    ) -> Result<()> {
95        let mut queue = self.queue.lock().await;
96        self.fill_queue(&mut *queue).await?;
97        loop {
98            if queue.has_remaining() {
99                if !f(Pack::decode(&mut *queue)?) {
100                    break;
101                }
102            } else {
103                self.try_fill_queue(&mut *queue).await?;
104                if !queue.has_remaining() {
105                    break;
106                }
107            }
108        }
109        Ok(())
110    }
111
112    /// Receive all available messages, but do not wait for at least 1
113    /// message to arrive. If no messages are available return
114    /// immediatly. This will only block if a concurrent receive is in
115    /// progress.
116    pub async fn try_recv<R: Pack + 'static, F: FnMut(R) -> bool>(
117        &self,
118        mut f: F,
119    ) -> Result<()> {
120        let mut queue = self.queue.lock().await;
121        self.try_fill_queue(&mut *queue).await?;
122        loop {
123            if queue.has_remaining() {
124                if !f(Pack::decode(&mut *queue)?) {
125                    break;
126                }
127            } else {
128                self.try_fill_queue(&mut *queue).await?;
129                if !queue.has_remaining() {
130                    break;
131                }
132            }
133        }
134        Ok(())
135    }
136
137    /// Wait for one message from the other side, and return it when
138    /// it is available.
139    pub async fn recv_one<R: Pack + 'static>(&self) -> Result<R> {
140        let mut queue = self.queue.lock().await;
141        self.fill_queue(&mut *queue).await?;
142        Ok(<R as Pack>::decode(&mut *queue)?)
143    }
144
145    /// Receive a message if one is available, otherwise return
146    /// Ok(None).
147    pub async fn try_recv_one<R: Pack + 'static>(&self) -> Result<Option<R>> {
148        let mut queue = self.queue.lock().await;
149        self.try_fill_queue(&mut *queue).await?;
150        if queue.remaining() > 0 {
151            Ok(Some(Pack::decode(&mut *queue)?))
152        } else {
153            Ok(None)
154        }
155    }
156
157    /// Return the user connected to this channel, if known
158    pub fn user(&self) -> Option<UserInfo> {
159        self.inner.user()
160    }
161}
162
163/// A single pending connection. You must call wait_connected to
164/// finish the handshake.
165pub struct Singleton(server::Singleton);
166
167impl Singleton {
168    /// Wait for the client to connect and complete the handshake
169    pub async fn wait_connected(&mut self) -> Result<Connection> {
170        let inner = self.0.wait_connected().await?;
171        Ok(Connection {
172            inner,
173            buf: Mutex::new(BytesMut::new()),
174            queue: AsyncMutex::new(Bytes::new()),
175        })
176    }
177}
178
179/// Create a single waiting connection at the specified path. This
180/// will allow one client to connect and form a bidirectional channel.
181pub async fn singleton(
182    publisher: &Publisher,
183    timeout: Option<Duration>,
184    path: Path,
185) -> Result<Singleton> {
186    Ok(Singleton(server::singleton(publisher, timeout, path).await?))
187}
188
189pub async fn singleton_with_flags(
190    publisher: &Publisher,
191    flags: PublishFlags,
192    timeout: Option<Duration>,
193    path: Path,
194) -> Result<Singleton> {
195    Ok(Singleton(server::singleton_with_flags(publisher, flags, timeout, path).await?))
196}
197
198/// A listener can accept connections from muliple clients and produce
199/// a channel to talk to each one.
200pub struct Listener(server::Listener);
201
202impl Listener {
203    pub async fn new(
204        publisher: &Publisher,
205        timeout: Option<Duration>,
206        path: Path,
207    ) -> Result<Self> {
208        let inner = server::Listener::new(publisher, timeout, path).await?;
209        Ok(Self(inner))
210    }
211
212    pub async fn new_with_flags(
213        publisher: &Publisher,
214        flags: PublishFlags,
215        timeout: Option<Duration>,
216        path: Path,
217    ) -> Result<Self> {
218        let inner =
219            server::Listener::new_with_flags(publisher, flags, timeout, path).await?;
220        Ok(Self(inner))
221    }
222
223    /// Wait for a client to connect, and return a singleton
224    /// connection to the new client.
225    pub async fn accept(&mut self) -> Result<Connection> {
226        let inner = self.0.accept().await?;
227        Ok(Connection {
228            inner,
229            buf: Mutex::new(BytesMut::new()),
230            queue: AsyncMutex::new(Bytes::new()),
231        })
232    }
233}