embedded_redis/subscription/
mod.rs

1//! # Subscription client
2//!
3//! This crate supports subscribing to one or multiple channels (see [Redis Pub/Sub](https://redis.io/docs/manual/pubsub/)).
4//!
5//! A regular client can be turned into a [Subscription] in the following way.
6//!
7//! ```
8//!# use core::str::FromStr;
9//!# use core::net::SocketAddr;
10//!# use std_embedded_nal::Stack;
11//!# use std_embedded_time::StandardClock;
12//!# use embedded_redis::network::ConnectionHandler;
13//!#
14//!# let mut stack = Stack::default();
15//!# let clock = StandardClock::default();
16//!#
17//!# let server_address = SocketAddr::from_str("127.0.0.1:6379").unwrap();
18//!# let mut connection_handler = ConnectionHandler::resp3(server_address);
19//! let client = connection_handler
20//!                 .connect(&mut stack, Some(&clock)).unwrap()
21//!                 .subscribe(["first_channel".into(), "second_channel".into()])
22//!                 .unwrap();
23//! ```
24//!
25//! If the subscription fails, it's recommended to close the connection, as the
26//! state is undefined. Further reuse of the connection could cause subsequent errors.
27//!
28//! ## Receiving messages
29//!
30//! Messages can be received using the `receive()` method, which returns [Some(Message)](Message) in case a message is pending.
31//!
32//! ```
33//!# use core::str::FromStr;
34//!# use std::{thread, time};
35//!# use core::net::SocketAddr;
36//!# use std_embedded_nal::Stack;
37//!# use std_embedded_time::StandardClock;
38//!# use embedded_redis::network::ConnectionHandler;
39//!#
40//!# thread::spawn(|| {
41//!#     let mut stack = Stack::default();
42//!#     let clock = StandardClock::default();
43//!#
44//!#     let server_address = SocketAddr::from_str("127.0.0.1:6379").unwrap();
45//!#     let mut connection_handler = ConnectionHandler::resp3(server_address);
46//!#     let mut  client = connection_handler.connect(&mut stack, Some(&clock)).unwrap();
47//!#
48//!#     loop {
49//!#         client.publish("first_channel", "example payload").unwrap().wait().unwrap();
50//!#         thread::sleep(time::Duration::from_millis(10));
51//!#     }
52//!# });
53//!#
54//!# let mut stack = Stack::default();
55//!# let clock = StandardClock::default();
56//!#
57//!# let server_address = SocketAddr::from_str("127.0.0.1:6379").unwrap();
58//!# let mut connection_handler = ConnectionHandler::resp3(server_address);
59//!# let mut  client = connection_handler
60//!#                 .connect(&mut stack, Some(&clock)).unwrap()
61//!#                 .subscribe(["first_channel".into(), "second_channel".into()])
62//!#                 .unwrap();
63//!#
64//! loop {
65//!     let message = client.receive().unwrap();
66//!
67//!     if let Some(message) = message {
68//!         assert_eq!("first_channel", core::str::from_utf8(&message.channel[..]).unwrap());
69//!         assert_eq!("example payload", core::str::from_utf8(&message.payload[..]).unwrap());
70//!         break;
71//!     }
72//! }
73//! ```
74//!
75//! ## Unsubscribing
76//!
77//! To leave a clean connection state, unsubscribe from all channels at the end.
78//!
79//! ```
80//!# use core::str::FromStr;
81//!# use core::net::SocketAddr;
82//!# use std_embedded_nal::Stack;
83//!# use std_embedded_time::StandardClock;
84//!# use embedded_redis::network::ConnectionHandler;
85//!#
86//!# let mut stack = Stack::default();
87//!# let clock = StandardClock::default();
88//!#
89//!# let server_address = SocketAddr::from_str("127.0.0.1:6379").unwrap();
90//!# let mut connection_handler = ConnectionHandler::resp3(server_address);
91//!# let client = connection_handler
92//!#                 .connect(&mut stack, Some(&clock)).unwrap()
93//!#                 .subscribe(["first_channel".into(), "second_channel".into()])
94//!#                 .unwrap();
95//!#
96//! client.unsubscribe().unwrap();
97//! ```
98//!
99//! *Note: `unsubscribe()` is called automatically when the client is dropped*
100pub use client::{Error, Message, Subscription};
101
102pub(crate) mod client;
103pub(crate) mod messages;
104
105#[cfg(test)]
106mod tests;