1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use crate::{
network::MonitorReceiver,
resp::{FromValue, Value},
Client, ClientPreparedCommand, ConnectionCommands, Error, Result,
};
use futures::{Stream, StreamExt};
use std::{
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
/// Stream of [`MonitoredCommandInfo`] items read from a [`MonitorReceiver`].
///
/// The stream stops yielding items once `close` has succeeded. If it is dropped while
/// still open, its `Drop` implementation issues the client reset itself.
pub struct MonitorStream {
/// Set to `true` by a successful `close`; checked by `poll_next` and `drop`.
closed: bool,
/// Source of the raw values that `poll_next` converts into stream items.
receiver: MonitorReceiver,
/// Client on which `reset` is called by `close` and `drop`.
client: Client,
}
impl MonitorStream {
    /// Wraps `receiver` and `client` into a stream that starts in the open state.
    pub(crate) fn new(receiver: MonitorReceiver, client: Client) -> Self {
        let closed = false;
        Self {
            closed,
            receiver,
            client,
        }
    }

    /// Resets the underlying client and, on success, marks the stream as closed.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the client reset; in that case the stream is
    /// left open (`closed` stays `false`).
    pub async fn close(&mut self) -> Result<()> {
        match self.client.reset().await {
            Ok(_) => {
                // Only a successful reset flips the flag, so a failed close can be retried
                // and `Drop` will still attempt its own reset.
                self.closed = true;
                Ok(())
            }
            Err(err) => Err(err.into()),
        }
    }
}
impl Stream for MonitorStream {
    type Item = MonitoredCommandInfo;

    /// Polls the receiver and converts the next raw value into a [`MonitoredCommandInfo`].
    ///
    /// Yields `None` when the stream has been closed, when the receiver is exhausted,
    /// when the receiver reports an error, or when the value cannot be converted.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        if self.closed {
            return Poll::Ready(None);
        }

        let this = self.get_mut();
        match this.receiver.poll_next_unpin(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(Ok(raw))) => {
                // A value that fails to convert ends the stream instead of surfacing an
                // error, because `Item` has no error channel.
                let converted: Result<Self::Item> = raw.into();
                Poll::Ready(converted.ok())
            }
            Poll::Ready(Some(Err(_))) | Poll::Ready(None) => Poll::Ready(None),
        }
    }
}
impl Drop for MonitorStream {
    /// Resets the client when the stream is dropped without a prior successful `close`.
    fn drop(&mut self) {
        if !self.closed {
            // `drop` cannot `.await`, so the reset goes through `forget()` and whatever it
            // returns is deliberately discarded.
            let _outcome = self.client.reset().forget();
        }
    }
}
/// One command reported by `MONITOR`, as produced by this type's `FromValue` implementation
/// from a single space-separated event line.
#[derive(Debug)]
pub struct MonitoredCommandInfo {
/// First field of the line, parsed as `f64`.
/// NOTE(review): the name says milliseconds; the unit is not established here - confirm
/// against the server's MONITOR output format.
pub unix_timestamp_millis: f64,
/// Database index, parsed as `usize` from the second field of the line.
pub database: usize,
/// Socket address parsed from the third field of the line.
/// NOTE(review): whether this is the server's or the issuing client's address is not
/// established here - confirm.
pub server_addr: SocketAddr,
/// Fourth field of the line with its enclosing delimiter characters removed.
pub command: String,
/// Remaining fields of the line, each with its enclosing delimiter characters removed.
pub command_args: Vec<String>,
}
impl FromValue for MonitoredCommandInfo {
fn from_value(value: Value) -> Result<Self> {
let line: String = value.into()?;
let mut parts = line.split(' ');
let info = match (parts.next(), parts.next(), parts.next(), parts.next()) {
(Some(unix_timestamp_millis), Some(database), Some(server_addr), Some(command)) => {
let database = &database[1..];
let server_addr = &server_addr[..server_addr.len() - 1];
match (
unix_timestamp_millis.parse::<f64>(),
server_addr.parse::<SocketAddr>(),
database.parse::<usize>(),
) {
(Ok(unix_timestamp_millis), Ok(server_addr), Ok(database)) => Some(Self {
unix_timestamp_millis,
database,
server_addr,
command: command[1..command.len() - 1].to_owned(),
command_args: parts.map(|a| a[1..a.len() - 1].to_owned()).collect(),
}),
_ => None,
}
}
_ => None,
};
info.ok_or_else(|| Error::Client(format!("Cannot parse result from MONITOR event: {line}")))
}
}