Struct RedisError

Source
pub struct RedisError {
    pub error: RedisErrorKind,
    /* private fields */
}

Fields§

§error: RedisErrorKind

Implementations§

Source§

impl RedisError

Source

pub fn new(error: RedisErrorKind, desc: String) -> RedisError

Examples found in repository?
examples/stream_consumer.rs (lines 26-27)
21    fn from_redis_value(value: &RedisValue) -> RedisResult<Self> {
22        match value {
23            RedisValue::BulkString(data) => {
24                let string = String::from_utf8(data.clone())
25                    .map_err(|err|
26                        RedisError::new(RedisErrorKind::ParseError,
27                                        format!("Could not parse message: {}", err))
28                    )?;
29                // Construct a Message from received data
30                Ok(Message(string))
31            }
32            _ => Err(RedisError::new(RedisErrorKind::ParseError,
33                                     format!("Could not parse message from invalid RedisValue {:?}", value)))
34        }
35    }
36}
37
38impl Message {
39    /// Tries to build a Message from a HashMap<String, RedisValue> (which represents a Redis Stream entry).
40    /// The entry should have the following structure:
41    /// "type Message data \"Some data\""
42    fn from_redis_stream_entry(key_values: &HashMap<String, RedisValue>) -> RedisResult<Self> {
43        if key_values.len() != 2 {
44            return Err(RedisError::new(RedisErrorKind::ParseError,
45                                       "Invalid packet".to_string()));
46        }
47
48        let get_value =
49            |key: &str| match key_values.get(key) {
50                Some(x) => Ok(x),
51                _ => Err(RedisError::new(RedisErrorKind::ParseError,
52                                         "Invalid packet".to_string()))
53            };
54
55        let packet_type: String = from_redis_value(get_value("type")?)?;
56        match packet_type.as_str() {
57            "Message" => {
58                let data: Message = from_redis_value(get_value("data")?)?;
59                Ok(data)
60            }
61            _ => Err(RedisError::new(RedisErrorKind::ParseError,
62                                     "Unknown message type".to_string()))
63        }
64    }
65}
66
67fn main() {
68    println!("Consumer example has started");
69    println!("Please enter a STREAM to listen on");
70    let stream_name = read_stdin();
71    println!("Please enter a GROUP");
72    let group_name = read_stdin();
73    println!("Please enter a CONSUMER");
74    let consumer_name = read_stdin();
75
76    let redis_address = env::var("REDIS_URL")
77        .unwrap_or("127.0.0.1:6379".to_string())
78        .parse::<SocketAddr>().expect("Couldn't parse Redis URl");
79
80    let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
81
82    // Try to create a group.
83    // If the group already exists, the future will not resolve to an error.
84    // The create_group variable is a Future whose result type is RedisResult<()>.
85    let create_group = RedisStream::connect(&redis_address)
86        .and_then(move |con|
87            // Create the group if it does not exist yet
88            con.touch_group(touch_options))
89        .then(|_| -> RedisResult<()> { Ok(()) });
90
91    // Start consuming after the group has been checked.
92    //
93    // Note that nothing will happen if the previous future has failed.
94    // The resulting consumer variable is a Future<Item=(), Error=()>.
95    // These Item and Error types are required by tokio::run().
96    let consumer = create_group
97        .and_then(move |_| {
98            // Create two connections to the Redis Server:
99            // one (manager) will be used for managing (sending Acknowledge requests),
100            // the other (consumer) will be used for receiving entries from the Redis Server.
101            let manager = RedisStream::connect(&redis_address);
102            let consumer = RedisStream::connect(&redis_address);
103            consumer.join(manager)
104        })
105        .and_then(move |(connection, manager)| {
106            // Create an unbounded channel to allow the consumer to notify the manager
107            // about entries that have been received but not yet acknowledged.
108            let (tx, rx) = unbounded::<EntryId>();
109
110            // Copies of stream_name and group_name to move into the ack_entry future.
111            let stream = stream_name.clone();
112            let group = group_name.clone();
113
114            let ack_entry = rx
115                .map_err(|_|
116                    RedisError::new(RedisErrorKind::InternalError,
117                                    "Something went wrong with UnboundedChannel".to_string()))
118                // Use fold() to redirect notifications from the channel receiver (rx) to the manager.
119                .fold(manager, move |manager, id_to_ack|
120                    ack_stream_entry(manager, stream.clone(), group.clone(), id_to_ack))
121                .map(|_| ())
122                .map_err(|_| ());
123
124            // Spawn the ack_entry future to be handled separately from the process_entry future.
125            tokio::spawn(ack_entry);
126
127            let group = RedisGroup::new(group_name, consumer_name);
128            let options = SubscribeOptions::with_group(vec![stream_name], group);
129
130            // Subscribe to a Redis stream, process any incoming entries and send the
131            // entry ids of successfully processed entries to the manager via the channel sender (tx).
132            let process_entry =
133                connection.subscribe(options)
134                    .and_then(move |subscribe|
135                        subscribe.for_each(move |entries|
136                            process_stream_entries(tx.clone(), entries)));
137            // Return the process_entry future so that it is run later.
138            process_entry
139        })
140        .map_err(|err| eprintln!("Something went wrong: {:?}", err));
141
142    tokio::run(consumer);
143}
More examples
Hide additional examples
examples/stream_producer.rs (lines 99-100)
71fn start_producer(rx: UnboundedReceiver<Message>,
72                  stream_name: String,
73                  group_name: String,
74                  redis_address: SocketAddr) {
75    let touch_options = TouchGroupOptions::new(stream_name.clone(), group_name.clone());
76
77    // Try to create a group.
78    // If the group already exists, the future will not resolve to an error.
79    // The create_group variable is a Future whose result type is RedisResult<()>.
80    let create_group = RedisStream::connect(&redis_address)
81        .and_then(move |con|
82            // Create the group if it does not exist yet.
83            con.touch_group(touch_options))
84        .then(|_| -> RedisResult<()> { Ok(()) });
85
86    // Creates and holds a connection to the Redis Server, waits for new messages from
87    // the channel receiver (rx) and sends them to a Redis stream.
88    //
89    // Note that nothing will happen if the previous future has failed.
90    // The resulting producer variable is a Future<Item=(), Error=()>.
91    // These Item and Error types are required by tokio::run().
92    let producer = create_group
93        .and_then(move |_| {
94            RedisStream::connect(&redis_address)
95        })
96        .and_then(move |producer| {
97            rx
98                .map_err(|_|
99                    RedisError::new(RedisErrorKind::InternalError,
100                                    "Something went wrong with UnboundedChannel".to_string()))
101                // Use fold() to redirect messages from the channel receiver (rx) to the Redis stream.
102                .fold(producer, move |producer, message| {
103                    let options = SendEntryOptions::new(stream_name.clone());
104
105                    // Serialize the message into key-value pairs.
106                    let data = message.into_redis_stream_entry();
107
108                    producer
109                        .send_entry(options, data)
110                        .map(|(producer, added_entry_id)| {
111                            println!("{:?} has sent", added_entry_id.to_string());
112                            println!("Please enter a message");
113                            producer
114                        })
115                })
116        })
117        .map(|_| ())
118        .map_err(|err| println!("{}", err));
119
120    tokio::run(producer);
121}

Trait Implementations§

Source§

impl Clone for RedisError

Source§

fn clone(&self) -> RedisError

Returns a copy of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for RedisError

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Display for RedisError

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl Error for RedisError

Source§

fn description(&self) -> &str

👎Deprecated since 1.42.0: use the Display impl or to_string()
1.30.0 · Source§

fn source(&self) -> Option<&(dyn Error + 'static)>

Returns the lower-level source of this error, if any. Read more
1.0.0 · Source§

fn cause(&self) -> Option<&dyn Error>

👎Deprecated since 1.33.0: replaced by Error::source, which can support downcasting
Source§

fn provide<'a>(&'a self, request: &mut Request<'a>)

🔬This is a nightly-only experimental API. (error_generic_member_access)
Provides type-based access to context intended for error reports. Read more
Source§

impl From<Error> for RedisError

Source§

fn from(err: Error) -> Self

Converts to this type from the input type.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T> ToString for T
where T: Display + ?Sized,

Source§

fn to_string(&self) -> String

Converts the given value to a String. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.