kade_proto/clients/
blocking_client.rs

1use bytes::Bytes;
2use std::time::Duration;
3use tokio::net::ToSocketAddrs;
4use tokio::runtime::Runtime;
5
6pub use crate::clients::Message;
7
/// Synchronous wrapper around the asynchronous [`crate::clients::Client`].
///
/// The wrapper owns its own Tokio runtime; the methods on this type run the
/// matching asynchronous call to completion on that runtime with `block_on`.
pub struct BlockingClient {
    /// Runtime used to drive the inner client's futures to completion.
    rt: Runtime,
    /// The asynchronous client that every call is forwarded to.
    inner: crate::clients::Client,
}
12
/// Synchronous wrapper around the asynchronous [`crate::clients::Subscriber`].
///
/// Created by `BlockingClient::subscribe`, which moves its runtime into this
/// value.
pub struct BlockingSubscriber {
    /// Runtime used to drive the inner subscriber's futures to completion.
    rt: Runtime,
    /// The asynchronous subscriber that every call is forwarded to.
    inner: crate::clients::Subscriber,
}
17
/// Private iterator over incoming messages.
///
/// Built by `BlockingSubscriber::into_iter` from the subscriber's own runtime
/// and inner subscriber, and exposed to callers only as `impl Iterator`.
struct SubscriberIterator {
    /// Runtime used to block on each `next_message` call.
    rt: Runtime,
    /// The asynchronous subscriber that messages are pulled from.
    inner: crate::clients::Subscriber,
}
22
23impl BlockingClient {
24    pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
25        let rt = tokio::runtime::Builder::new_current_thread().enable_all().build()?;
26
27        let inner = rt.block_on(crate::clients::Client::connect(addr))?;
28
29        Ok(BlockingClient { inner, rt })
30    }
31
32    pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> { self.rt.block_on(self.inner.get(key)) }
33
34    pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> { self.rt.block_on(self.inner.set(key, value)) }
35
36    pub fn set_expires(&mut self, key: &str, value: Bytes, expiration: Duration) -> crate::Result<()> { self.rt.block_on(self.inner.set_expires(key, value, expiration)) }
37
38    pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> { self.rt.block_on(self.inner.publish(channel, message)) }
39
40    pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
41        let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
42        Ok(BlockingSubscriber { inner: subscriber, rt: self.rt })
43    }
44}
45
46impl BlockingSubscriber {
47    pub fn get_subscribed(&self) -> &[String] { self.inner.get_subscribed() }
48
49    pub fn next_message(&mut self) -> crate::Result<Option<Message>> { self.rt.block_on(self.inner.next_message()) }
50
51    pub fn into_iter(self) -> impl Iterator<Item = crate::Result<Message>> { SubscriberIterator { inner: self.inner, rt: self.rt } }
52
53    pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> { self.rt.block_on(self.inner.subscribe(channels)) }
54
55    pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> { self.rt.block_on(self.inner.unsubscribe(channels)) }
56}
57
58impl Iterator for SubscriberIterator {
59    type Item = crate::Result<Message>;
60
61    fn next(&mut self) -> Option<crate::Result<Message>> { self.rt.block_on(self.inner.next_message()).transpose() }
62}