pub struct RedisError {
pub error: RedisErrorKind,
/* private fields */
}
Fields§
§error: RedisErrorKind
Implementations§
Source§impl RedisError
impl RedisError
Sourcepub fn new(error: RedisErrorKind, desc: String) -> RedisError
pub fn new(error: RedisErrorKind, desc: String) -> RedisError
Examples found in repository?
examples/stream_consumer.rs (lines 26-27)
21 fn from_redis_value(value: &RedisValue) -> RedisResult<Self> {
22 match value {
23 RedisValue::BulkString(data) => {
24 let string = String::from_utf8(data.clone())
25 .map_err(|err|
26 RedisError::new(RedisErrorKind::ParseError,
27 format!("Could not parse message: {}", err))
28 )?;
29 // Construct a Message from received data
30 Ok(Message(string))
31 }
32 _ => Err(RedisError::new(RedisErrorKind::ParseError,
33 format!("Could not parse message from invalid RedisValue {:?}", value)))
34 }
35 }
36}
37
38impl Message {
39 /// Tries to convert a Message from HashMap<String, RedisValue> (represents a Redis Stream entry).
40 /// The entry should have the following structure:
41 /// "type Message data \"Some data\""
42 fn from_redis_stream_entry(key_values: &HashMap<String, RedisValue>) -> RedisResult<Self> {
43 if key_values.len() != 2 {
44 return Err(RedisError::new(RedisErrorKind::ParseError,
45 "Invalid packet".to_string()));
46 }
47
48 let get_value =
49 |key: &str| match key_values.get(key) {
50 Some(x) => Ok(x),
51 _ => Err(RedisError::new(RedisErrorKind::ParseError,
52 "Invalid packet".to_string()))
53 };
54
55 let packet_type: String = from_redis_value(get_value("type")?)?;
56 match packet_type.as_str() {
57 "Message" => {
58 let data: Message = from_redis_value(get_value("data")?)?;
59 Ok(data)
60 }
61 _ => Err(RedisError::new(RedisErrorKind::ParseError,
62 "Unknown message type".to_string()))
63 }
64 }
65}
66
67fn main() {
68 println!("Consumer example has started");
69 println!("Please enter a STREAM to listen on");
70 let stream_name = read_stdin();
71 println!("Please enter a GROUP");
72 let group_name = read_stdin();
73 println!("Please enter a CONSUMER");
74 let consumer_name = read_stdin();
75
76 let redis_address = env::var("REDIS_URL")
77 .unwrap_or("127.0.0.1:6379".to_string())
78 .parse::<SocketAddr>().expect("Couldn't parse Redis URl");
79
80 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
81
82 // Try to create a group.
83 // If the group already exists, the future will not resolve to an error.
84 // The create_group variable is a Future<Item=(), Error=RedisError>.
85 let create_group = RedisStream::connect(&redis_address)
86 .and_then(move |con|
87 // Create the group if it does not exist yet
88 con.touch_group(touch_options))
89 .then(|_| -> RedisResult<()> { Ok(()) });
90
91 // Start the consuming after the group has been checked.
92 //
93 // Note nothing will happen if the previous future has failed.
94 // The resulting consumer variable is a Future<Item=(), Error=()>.
95 // The Item and Error are required by tokio::run().
96 let consumer = create_group
97 .and_then(move |_| {
98 // Create two connections to the Redis Server:
99 // the first will be used for managing (sending acknowledge requests),
100 // the second will be used for receiving entries from the Redis Server.
101 let manager = RedisStream::connect(&redis_address);
102 let consumer = RedisStream::connect(&redis_address);
103 consumer.join(manager)
104 })
105 .and_then(move |(connection, manager)| {
106 // Create an unbounded channel to allow the consumer to notify the manager
107 // about entries that have been received but not yet acknowledged.
108 let (tx, rx) = unbounded::<EntryId>();
109
110 // Copies of stream_name and group_name to move into the ack_entry future.
111 let stream = stream_name.clone();
112 let group = group_name.clone();
113
114 let ack_entry = rx
115 .map_err(|_|
116 RedisError::new(RedisErrorKind::InternalError,
117 "Something went wrong with UnboundedChannel".to_string()))
118 // Use fold() to redirect notification from the channel receiver (rx) to the manager.
119 .fold(manager, move |manager, id_to_ack|
120 ack_stream_entry(manager, stream.clone(), group.clone(), id_to_ack))
121 .map(|_| ())
122 .map_err(|_| ());
123
124 // Spawn the ack_entry future to be handled separately from the process_entry future.
125 tokio::spawn(ack_entry);
126
127 let group = RedisGroup::new(group_name, consumer_name);
128 let options = SubscribeOptions::with_group(vec![stream_name], group);
129
130 // Subscribe to a Redis stream, process any incoming entries and send the
131 // entry ids of successfully processed entries to the manager via the channel sender (tx).
132 let process_entry =
133 connection.subscribe(options)
134 .and_then(move |subscribe|
135 subscribe.for_each(move |entries|
136 process_stream_entries(tx.clone(), entries)));
137 // Return the process_entry future so that it is run later.
138 process_entry
139 })
140 .map_err(|err| eprintln!("Something went wrong: {:?}", err));
141
142 tokio::run(consumer);
143}
More examples
examples/stream_producer.rs (lines 99-100)
71fn start_producer(rx: UnboundedReceiver<Message>,
72 stream_name: String,
73 group_name: String,
74 redis_address: SocketAddr) {
75 let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
76
77 // Try to create a group.
78 // If the group already exists, the future will not resolve to an error.
79 // The create_group variable is a Future<Item=(), Error=RedisError>.
80 let create_group = RedisStream::connect(&redis_address)
81 .and_then(move |con|
82 // Create the group if it does not exist yet.
83 con.touch_group(touch_options))
84 .then(|_| -> RedisResult<()> { Ok(()) });
85
86 // Creates and holds a connection to the Redis Server, waits for new messages from
87 // the channel receiver (rx) and sends them to a Redis stream.
88 //
89 // Note nothing will happen if the previous future has failed.
90 // The resulting producer variable is a Future<Item=(), Error=()>.
91 // The Item and Error are required by tokio::run().
92 let producer = create_group
93 .and_then(move |_| {
94 RedisStream::connect(&redis_address)
95 })
96 .and_then(move |producer| {
97 rx
98 .map_err(|_|
99 RedisError::new(RedisErrorKind::InternalError,
100 "Something went wrong with UnboundedChannel".to_string()))
101 // Use fold() to redirect messages from the channel receiver (rx) to the Redis stream.
102 .fold(producer, move |producer, message| {
103 let options = SendEntryOptions::new(stream_name.clone());
104
105 // Serialize the message to pairs of key-value.
106 let data = message.into_redis_stream_entry();
107
108 producer
109 .send_entry(options, data)
110 .map(|(producer, added_entry_id)| {
111 println!("{:?} has sent", added_entry_id.to_string());
112 println!("Please enter a message");
113 producer
114 })
115 })
116 })
117 .map(|_| ())
118 .map_err(|err| println!("{}", err));
119
120 tokio::run(producer);
121}
Trait Implementations§
Source§impl Clone for RedisError
impl Clone for RedisError
Source§fn clone(&self) -> RedisError
fn clone(&self) -> RedisError
Returns a copy of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from `source`. Read more
Source§impl Debug for RedisError
impl Debug for RedisError
Source§impl Display for RedisError
impl Display for RedisError
Source§impl Error for RedisError
impl Error for RedisError
Source§fn description(&self) -> &str
fn description(&self) -> &str
👎Deprecated since 1.42.0: use the Display impl or to_string()
1.30.0 · Source§fn source(&self) -> Option<&(dyn Error + 'static)>
fn source(&self) -> Option<&(dyn Error + 'static)>
Returns the lower-level source of this error, if any. Read more
Auto Trait Implementations§
impl Freeze for RedisError
impl RefUnwindSafe for RedisError
impl Send for RedisError
impl Sync for RedisError
impl Unpin for RedisError
impl UnwindSafe for RedisError
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more