kade_proto/clients/
blocking_client.rs1use bytes::Bytes;
2use std::time::Duration;
3use tokio::net::ToSocketAddrs;
4use tokio::runtime::Runtime;
5
6pub use crate::clients::Message;
7
8pub struct BlockingClient {
9 rt: Runtime,
10 inner: crate::clients::Client,
11}
12
13pub struct BlockingSubscriber {
14 rt: Runtime,
15 inner: crate::clients::Subscriber,
16}
17
18struct SubscriberIterator {
19 rt: Runtime,
20 inner: crate::clients::Subscriber,
21}
22
23impl BlockingClient {
24 pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
25 let rt = tokio::runtime::Builder::new_current_thread().enable_all().build()?;
26
27 let inner = rt.block_on(crate::clients::Client::connect(addr))?;
28
29 Ok(BlockingClient { inner, rt })
30 }
31
32 pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> { self.rt.block_on(self.inner.get(key)) }
33
34 pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> { self.rt.block_on(self.inner.set(key, value)) }
35
36 pub fn set_expires(&mut self, key: &str, value: Bytes, expiration: Duration) -> crate::Result<()> { self.rt.block_on(self.inner.set_expires(key, value, expiration)) }
37
38 pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> { self.rt.block_on(self.inner.publish(channel, message)) }
39
40 pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
41 let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
42 Ok(BlockingSubscriber { inner: subscriber, rt: self.rt })
43 }
44}
45
46impl BlockingSubscriber {
47 pub fn get_subscribed(&self) -> &[String] { self.inner.get_subscribed() }
48
49 pub fn next_message(&mut self) -> crate::Result<Option<Message>> { self.rt.block_on(self.inner.next_message()) }
50
51 pub fn into_iter(self) -> impl Iterator<Item = crate::Result<Message>> { SubscriberIterator { inner: self.inner, rt: self.rt } }
52
53 pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> { self.rt.block_on(self.inner.subscribe(channels)) }
54
55 pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> { self.rt.block_on(self.inner.unsubscribe(channels)) }
56}
57
58impl Iterator for SubscriberIterator {
59 type Item = crate::Result<Message>;
60
61 fn next(&mut self) -> Option<crate::Result<Message>> { self.rt.block_on(self.inner.next_message()).transpose() }
62}