redis_driver/clients/
monitor_stream.rs

1use crate::{
2    network::MonitorReceiver,
3    resp::{FromValue, Value},
4    Client, ClientPreparedCommand, ConnectionCommands, Error, Result,
5};
6use futures::{Stream, StreamExt};
7use std::{
8    net::SocketAddr,
9    pin::Pin,
10    task::{Context, Poll},
11};
12
13/// Stream to get [`MONITOR`](https://redis.io/commands/monitor/) command events
14/// when the stream is dropped or closed, a reset command is sent to the Redis server
15pub struct MonitorStream {
16    closed: bool,
17    receiver: MonitorReceiver,
18    client: Client,
19}
20
21impl MonitorStream {
22    pub(crate) fn new(receiver: MonitorReceiver, client: Client) -> Self {
23        Self {
24            closed: false,
25            receiver,
26            client,
27        }
28    }
29
30    pub async fn close(&mut self) -> Result<()> {
31        self.client.reset().await?;
32        self.closed = true;
33        Ok(())
34    }
35}
36
37impl Stream for MonitorStream {
38    type Item = MonitoredCommandInfo;
39
40    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
41        if self.closed {
42            Poll::Ready(None)
43        } else {
44            match self.get_mut().receiver.poll_next_unpin(cx) {
45                Poll::Ready(value) => match value {
46                    Some(value) => match value {
47                        Ok(value) => match value.into() {
48                            Ok(str) => Poll::Ready(Some(str)),
49                            Err(_) => Poll::Ready(None),
50                        },
51                        Err(_) => Poll::Ready(None),
52                    },
53                    None => Poll::Ready(None),
54                },
55                Poll::Pending => Poll::Pending,
56            }
57        }
58    }
59}
60
61impl Drop for MonitorStream {
62    fn drop(&mut self) {
63        if self.closed {
64            return;
65        }
66
67        let _result = self.client.reset().forget();
68    }
69}
70
/// One command event reported by the
/// [`MONITOR`](https://redis.io/commands/monitor/) command.
#[derive(Debug, Clone, PartialEq)]
pub struct MonitoredCommandInfo {
    /// Timestamp of the event, parsed as-is from the first field of the event line.
    ///
    /// NOTE(review): no unit conversion is applied when parsing; confirm that the
    /// server really reports milliseconds as the field name suggests.
    pub unix_timestamp_millis: f64,
    /// Database index reported in the event.
    pub database: usize,
    /// Socket address reported in the event.
    ///
    /// NOTE(review): the field name says "server", confirm which peer this address
    /// identifies.
    pub server_addr: SocketAddr,
    /// Name of the monitored command.
    pub command: String,
    /// Arguments of the monitored command, in order.
    pub command_args: Vec<String>,
}
79
80impl FromValue for MonitoredCommandInfo {
81    fn from_value(value: Value) -> Result<Self> {
82        let line: String = value.into()?;
83        let mut parts = line.split(' ');
84
85        let info = match (parts.next(), parts.next(), parts.next(), parts.next()) {
86            (Some(unix_timestamp_millis), Some(database), Some(server_addr), Some(command)) => {
87                let database = &database[1..];
88                let server_addr = &server_addr[..server_addr.len() - 1];
89                match (
90                    unix_timestamp_millis.parse::<f64>(),
91                    server_addr.parse::<SocketAddr>(),
92                    database.parse::<usize>(),
93                ) {
94                    (Ok(unix_timestamp_millis), Ok(server_addr), Ok(database)) => Some(Self {
95                        unix_timestamp_millis,
96                        database,
97                        server_addr,
98                        command: command[1..command.len() - 1].to_owned(),
99                        command_args: parts.map(|a| a[1..a.len() - 1].to_owned()).collect(),
100                    }),
101                    _ => None,
102                }
103            }
104            _ => None,
105        };
106
107        info.ok_or_else(|| Error::Client(format!("Cannot parse result from MONITOR event: {line}")))
108    }
109}