redis_driver/clients/
monitor_stream.rs1use crate::{
2 network::MonitorReceiver,
3 resp::{FromValue, Value},
4 Client, ClientPreparedCommand, ConnectionCommands, Error, Result,
5};
6use futures::{Stream, StreamExt};
7use std::{
8 net::SocketAddr,
9 pin::Pin,
10 task::{Context, Poll},
11};
12
13pub struct MonitorStream {
16 closed: bool,
17 receiver: MonitorReceiver,
18 client: Client,
19}
20
21impl MonitorStream {
22 pub(crate) fn new(receiver: MonitorReceiver, client: Client) -> Self {
23 Self {
24 closed: false,
25 receiver,
26 client,
27 }
28 }
29
30 pub async fn close(&mut self) -> Result<()> {
31 self.client.reset().await?;
32 self.closed = true;
33 Ok(())
34 }
35}
36
37impl Stream for MonitorStream {
38 type Item = MonitoredCommandInfo;
39
40 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
41 if self.closed {
42 Poll::Ready(None)
43 } else {
44 match self.get_mut().receiver.poll_next_unpin(cx) {
45 Poll::Ready(value) => match value {
46 Some(value) => match value {
47 Ok(value) => match value.into() {
48 Ok(str) => Poll::Ready(Some(str)),
49 Err(_) => Poll::Ready(None),
50 },
51 Err(_) => Poll::Ready(None),
52 },
53 None => Poll::Ready(None),
54 },
55 Poll::Pending => Poll::Pending,
56 }
57 }
58 }
59}
60
61impl Drop for MonitorStream {
62 fn drop(&mut self) {
63 if self.closed {
64 return;
65 }
66
67 let _result = self.client.reset().forget();
68 }
69}
70
71#[derive(Debug)]
72pub struct MonitoredCommandInfo {
73 pub unix_timestamp_millis: f64,
74 pub database: usize,
75 pub server_addr: SocketAddr,
76 pub command: String,
77 pub command_args: Vec<String>,
78}
79
80impl FromValue for MonitoredCommandInfo {
81 fn from_value(value: Value) -> Result<Self> {
82 let line: String = value.into()?;
83 let mut parts = line.split(' ');
84
85 let info = match (parts.next(), parts.next(), parts.next(), parts.next()) {
86 (Some(unix_timestamp_millis), Some(database), Some(server_addr), Some(command)) => {
87 let database = &database[1..];
88 let server_addr = &server_addr[..server_addr.len() - 1];
89 match (
90 unix_timestamp_millis.parse::<f64>(),
91 server_addr.parse::<SocketAddr>(),
92 database.parse::<usize>(),
93 ) {
94 (Ok(unix_timestamp_millis), Ok(server_addr), Ok(database)) => Some(Self {
95 unix_timestamp_millis,
96 database,
97 server_addr,
98 command: command[1..command.len() - 1].to_owned(),
99 command_args: parts.map(|a| a[1..a.len() - 1].to_owned()).collect(),
100 }),
101 _ => None,
102 }
103 }
104 _ => None,
105 };
106
107 info.ok_or_else(|| Error::Client(format!("Cannot parse result from MONITOR event: {line}")))
108 }
109}