1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
//! Minimal blocking MTProto client implementation
//!
//! Provides a blocking connect and methods for issuing the supported commands.
use bytes::Bytes;
use std::time::Duration;
use tokio::net::ToSocketAddrs;
use tokio::runtime::Runtime;
pub use crate::client::Message;
/// Established connection with an MTProto server.
///
/// Backed by a single `TcpStream`, `BlockingClient` provides basic network
/// client functionality (no pooling, retrying, ...). Connections are
/// established using the [`connect`](fn@connect) function.
///
/// Internally this pairs the asynchronous client with the Tokio runtime that
/// drives it; every request method runs the corresponding asynchronous
/// operation to completion on that runtime before returning.
///
/// Requests are issued using the various methods of `BlockingClient`.
pub struct BlockingClient {
/// The asynchronous `Client` that performs the actual requests.
inner: crate::client::Client,
/// A `current_thread` runtime for executing operations on the asynchronous
/// client in a blocking manner.
rt: Runtime,
}
/// A client that has entered pub/sub mode.
///
/// Once clients subscribe to a channel, they may only perform pub/sub related
/// commands. The `BlockingClient` type is transitioned to a
/// `BlockingSubscriber` type in order to prevent non-pub/sub methods from being
/// called.
///
/// Values of this type are produced by `BlockingClient::subscribe`, which
/// hands over the runtime that was driving the original client.
pub struct BlockingSubscriber {
/// The asynchronous `Subscriber` that performs the actual pub/sub operations.
inner: crate::client::Subscriber,
/// A `current_thread` runtime for executing operations on the asynchronous
/// `Subscriber` in a blocking manner.
rt: Runtime,
}
/// The iterator returned by `BlockingSubscriber::into_iter`.
///
/// Each call to `next` blocks on the runtime until the asynchronous
/// `Subscriber` produces its next message, reports an error, or signals the
/// end of the subscription.
struct SubscriberIterator {
/// The asynchronous `Subscriber` the messages are pulled from.
inner: crate::client::Subscriber,
/// A `current_thread` runtime for executing operations on the asynchronous
/// `Subscriber` in a blocking manner.
rt: Runtime,
}
/// Open a blocking connection to the MTProto server located at `addr`.
///
/// A dedicated `current_thread` Tokio runtime is built first; the asynchronous
/// connect is then driven to completion on it, and both the resulting client
/// and the runtime are bundled into the returned [`BlockingClient`].
///
/// `addr` may be any type that can be asynchronously converted to a
/// `SocketAddr`, which includes `SocketAddr` itself and strings. Note that the
/// `ToSocketAddrs` bound refers to the Tokio trait, not the `std` one.
///
/// # Errors
///
/// Fails if the runtime cannot be built or if the asynchronous connect
/// reports an error.
///
/// # Examples
///
/// ```no_run
/// use mini_telegram::blocking_client;
///
/// fn main() {
///     let client = match blocking_client::connect("localhost:6379") {
///         Ok(client) => client,
///         Err(_) => panic!("failed to establish connection"),
///     };
/// # drop(client);
/// }
/// ```
pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
    // Single-threaded runtime with every driver enabled; it lives for as long
    // as the client it drives.
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    let runtime = builder.build()?;

    // Drive the asynchronous connect to completion on the calling thread.
    let client = runtime.block_on(crate::client::connect(addr))?;

    Ok(BlockingClient {
        inner: client,
        rt: runtime,
    })
}
impl BlockingClient {
    /// Get the value of `key`.
    ///
    /// Blocks the calling thread until the asynchronous client has completed
    /// the request.
    ///
    /// If the key does not exist the special value `None` is returned.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the asynchronous client.
    ///
    /// # Examples
    ///
    /// Demonstrates basic usage.
    ///
    /// ```no_run
    /// use mini_telegram::blocking_client;
    ///
    /// fn main() {
    ///     let mut client = blocking_client::connect("localhost:6379").unwrap();
    ///
    ///     let val = client.get("foo").unwrap();
    ///     println!("Got = {:?}", val);
    /// }
    /// ```
    pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
        self.rt.block_on(self.inner.get(key))
    }

    /// Set `key` to hold the given `value`.
    ///
    /// The `value` is associated with `key` until it is overwritten by the next
    /// call to `set` or it is removed.
    ///
    /// If key already holds a value, it is overwritten. Any previous time to
    /// live associated with the key is discarded on a successful SET operation.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the asynchronous client.
    ///
    /// # Examples
    ///
    /// Demonstrates basic usage.
    ///
    /// ```no_run
    /// use mini_telegram::blocking_client;
    ///
    /// fn main() {
    ///     let mut client = blocking_client::connect("localhost:6379").unwrap();
    ///
    ///     client.set("foo", "bar".into()).unwrap();
    ///
    ///     // Getting the value immediately works
    ///     let val = client.get("foo").unwrap().unwrap();
    ///     assert_eq!(val, "bar");
    /// }
    /// ```
    pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
        self.rt.block_on(self.inner.set(key, value))
    }

    /// Set `key` to hold the given `value`. The value expires after
    /// `expiration`.
    ///
    /// The `value` is associated with `key` until one of the following:
    /// - it expires.
    /// - it is overwritten by the next call to `set`.
    /// - it is removed.
    ///
    /// If key already holds a value, it is overwritten. Any previous time to
    /// live associated with the key is discarded on a successful SET operation.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the asynchronous client.
    ///
    /// # Examples
    ///
    /// Demonstrates basic usage. This example is not **guaranteed** to always
    /// work as it relies on time based logic and assumes the client and server
    /// stay relatively synchronized in time. The real world tends to not be so
    /// favorable.
    ///
    /// ```no_run
    /// use mini_telegram::blocking_client;
    /// use std::thread;
    /// use std::time::Duration;
    ///
    /// fn main() {
    ///     let ttl = Duration::from_millis(500);
    ///     let mut client = blocking_client::connect("localhost:6379").unwrap();
    ///
    ///     client.set_expires("foo", "bar".into(), ttl).unwrap();
    ///
    ///     // Getting the value immediately works
    ///     let val = client.get("foo").unwrap().unwrap();
    ///     assert_eq!(val, "bar");
    ///
    ///     // Wait for the TTL to expire
    ///     thread::sleep(ttl);
    ///
    ///     // The value has expired, so the key no longer resolves
    ///     let val = client.get("foo").unwrap();
    ///     assert!(val.is_none());
    /// }
    /// ```
    pub fn set_expires(
        &mut self,
        key: &str,
        value: Bytes,
        expiration: Duration,
    ) -> crate::Result<()> {
        self.rt
            .block_on(self.inner.set_expires(key, value, expiration))
    }

    /// Posts `message` to the given `channel`.
    ///
    /// Returns the number of subscribers currently listening on the channel.
    /// There is no guarantee that these subscribers receive the message as they
    /// may disconnect at any time.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the asynchronous client.
    ///
    /// # Examples
    ///
    /// Demonstrates basic usage.
    ///
    /// ```no_run
    /// use mini_telegram::blocking_client;
    ///
    /// fn main() {
    ///     let mut client = blocking_client::connect("localhost:6379").unwrap();
    ///
    ///     let val = client.publish("foo", "bar".into()).unwrap();
    ///     println!("Got = {:?}", val);
    /// }
    /// ```
    pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
        self.rt.block_on(self.inner.publish(channel, message))
    }

    /// Subscribes the client to the specified channels.
    ///
    /// Once a client issues a subscribe command, it may no longer issue any
    /// non-pub/sub commands. The function consumes `self` and returns a
    /// `BlockingSubscriber`.
    ///
    /// The `BlockingSubscriber` value is used to receive messages as well as
    /// manage the list of channels the client is subscribed to.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by the asynchronous client. Because
    /// `self` is consumed, the client is not handed back on failure.
    pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
        let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
        // The runtime that drove the client keeps driving the subscriber.
        Ok(BlockingSubscriber {
            inner: subscriber,
            rt: self.rt,
        })
    }
}
impl BlockingSubscriber {
/// Returns the set of channels currently subscribed to.
pub fn get_subscribed(&self) -> &[String] {
self.inner.get_subscribed()
}
/// Receive the next message published on a subscribed channel, waiting if
/// necessary.
///
/// `None` indicates the subscription has been terminated.
pub fn next_message(&mut self) -> crate::Result<Option<Message>> {
self.rt.block_on(self.inner.next_message())
}
/// Convert the subscriber into an `Iterator` yielding new messages published
/// on subscribed channels.
pub fn into_iter(self) -> impl Iterator<Item = crate::Result<Message>> {
SubscriberIterator {
inner: self.inner,
rt: self.rt,
}
}
/// Subscribe to a list of new channels
pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
self.rt.block_on(self.inner.subscribe(channels))
}
/// Unsubscribe to a list of new channels
pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
self.rt.block_on(self.inner.unsubscribe(channels))
}
}
impl Iterator for SubscriberIterator {
    type Item = crate::Result<Message>;

    /// Blocks until the subscriber yields its next message.
    ///
    /// A received message or an error is yielded as `Some`; iteration ends
    /// (`None`) once the subscriber reports that no further messages will
    /// arrive.
    fn next(&mut self) -> Option<Self::Item> {
        let pending = self.inner.next_message();
        match self.rt.block_on(pending) {
            Ok(Some(message)) => Some(Ok(message)),
            Ok(None) => None,
            Err(err) => Some(Err(err)),
        }
    }
}