// postgres_notify/messages.rs

1#[cfg(feature = "chrono")]
2use chrono::{DateTime, SecondsFormat, Utc};
3#[cfg(not(feature = "chrono"))]
4use std::time::SystemTime;
5
6#[cfg(feature = "chrono")]
7pub type Timestamp = DateTime<Utc>;
8#[cfg(not(feature = "chrono"))]
9pub type Timestamp = SystemTime;
10
11use {
12    std::{
13        fmt::{self, Display},
14        str::FromStr,
15        time::Duration,
16    },
17    tokio_postgres::error::DbError,
18};
19
///
/// Type used to represent any of the messages that can be received
/// by the client callback.
///
/// `Notify` and `Raise` wrap messages that carry their own timestamp; every
/// other variant stores the [`Timestamp`] it was created with directly.
///
#[derive(Debug, Clone)]
#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
pub enum PGMessage {
    /// A `NOTIFY` notification; see [`PGNotifyMessage`].
    Notify(PGNotifyMessage),
    /// A `RAISE` log message; see [`PGRaiseMessage`].
    Raise(PGRaiseMessage),
    /// A reconnection attempt, displayed as "attempt #`attempts` of
    /// `max_attempts`". When `max_attempts` is `u32::MAX` the "of N" part is
    /// omitted from the display.
    Reconnect {
        timestamp: Timestamp,
        attempts: u32,
        max_attempts: u32,
    },
    /// Displayed as "connection established".
    Connected {
        timestamp: Timestamp,
    },
    /// A timeout; `duration` is displayed in milliseconds.
    Timeout {
        timestamp: Timestamp,
        duration: Duration,
    },
    /// Outcome of cancelling a server-side query: `success` selects between
    /// "cancelled" and "could not be cancelled" in the display.
    Cancelled {
        timestamp: Timestamp,
        success: bool,
    },
    /// Displayed as "failed to reconnect after `attempts` attempts".
    FailedToReconnect {
        timestamp: Timestamp,
        attempts: u32,
    },
    /// A disconnection together with a free-form textual `reason`.
    Disconnected {
        timestamp: Timestamp,
        reason: String,
    },
}
54
55impl PGMessage {
56    pub fn reconnect(attempts: u32, max_attempts: u32) -> Self {
57        Self::Reconnect {
58            timestamp: current_timestamp(),
59            attempts,
60            max_attempts,
61        }
62    }
63
64    pub fn connected() -> Self {
65        Self::Connected {
66            timestamp: current_timestamp(),
67        }
68    }
69
70    pub fn timeout(duration: Duration) -> Self {
71        Self::Timeout {
72            timestamp: current_timestamp(),
73            duration,
74        }
75    }
76
77    pub fn cancelled(success: bool) -> Self {
78        Self::Cancelled {
79            timestamp: current_timestamp(),
80            success,
81        }
82    }
83    pub fn failed_to_reconnect(attempts: u32) -> Self {
84        Self::FailedToReconnect {
85            timestamp: current_timestamp(),
86            attempts,
87        }
88    }
89
90    pub fn disconnected(reason: impl Into<String>) -> Self {
91        Self::Disconnected {
92            timestamp: current_timestamp(),
93            reason: reason.into(),
94        }
95    }
96}
97
98impl Display for PGMessage {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        use PGMessage::*;
101        match self {
102            Notify(m) => m.fmt(f),
103            Raise(m) => m.fmt(f),
104            Reconnect {
105                timestamp,
106                attempts,
107                max_attempts,
108            } => {
109                let ts = format_timestamp(*timestamp);
110                if *max_attempts != u32::MAX {
111                    write!(
112                        f,
113                        "{}{:>12}: attempt #{} of {}",
114                        &ts, "RECONNECT", attempts, max_attempts
115                    )
116                } else {
117                    write!(f, "{}{:>12}: attempt #{}", &ts, "RECONNECT", attempts)
118                }
119            }
120            Connected { timestamp } => {
121                let ts = format_timestamp(*timestamp);
122                write!(f, "{}{:>12}: connection established", &ts, "CONNECTED")
123            }
124            Timeout {
125                timestamp,
126                duration,
127            } => {
128                let ts = format_timestamp(*timestamp);
129                write!(
130                    f,
131                    "{}{:>12}: timeout after {}ms",
132                    &ts,
133                    "TIMEOUT",
134                    duration.as_millis(),
135                )
136            }
137            Cancelled { timestamp, success } => {
138                let ts = format_timestamp(*timestamp);
139                let could = if *success { "" } else { "could not be " };
140                write!(
141                    f,
142                    "{}{:>12}: server-side query {}cancelled",
143                    &ts, "CANCELLED", could
144                )
145            }
146            FailedToReconnect {
147                timestamp,
148                attempts,
149            } => {
150                let ts = format_timestamp(*timestamp);
151                write!(
152                    f,
153                    "{}{:>12}: failed to reconnect after {} attempts",
154                    &ts, "FAILURE", attempts
155                )
156            }
157            Disconnected { timestamp, reason } => {
158                let ts = format_timestamp(*timestamp);
159                write!(f, "{}{:>12}: {}", &ts, "DISCONNECTED", reason)
160            }
161        }
162    }
163}
164
165impl From<tokio_postgres::Notification> for PGMessage {
166    fn from(note: tokio_postgres::Notification) -> Self {
167        Self::Notify(note.into())
168    }
169}
170
171impl From<tokio_postgres::error::DbError> for PGMessage {
172    fn from(raise: tokio_postgres::error::DbError) -> Self {
173        Self::Raise(raise.into())
174    }
175}
176
///
/// Message received when a `NOTIFY [channel] [payload]` is issued on PostgreSQL.
///
/// Postgres notifications contain a string payload on a named channel.
/// They also specify the server-side process id of the notifying process.
/// To this we add a timestamp, which is either a [`chrono::DateTime<Utc>`]
/// or a [`std::time::SystemTime`] depending on the `chrono` feature flag.
///
/// More details on the postgres notification can be found
/// [here](https://www.postgresql.org/docs/current/sql-notify.html).
///
#[derive(Debug, Clone)]
#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
pub struct PGNotifyMessage {
    /// Time at which this message value was created (not a server-side time).
    pub timestamp: Timestamp,
    /// Server-side process id of the notifying process.
    pub process_id: i32,
    /// Name of the channel the notification was sent on.
    pub channel: String,
    /// Payload string attached to the notification.
    pub payload: String,
}
196
197impl From<tokio_postgres::Notification> for PGNotifyMessage {
198    fn from(note: tokio_postgres::Notification) -> Self {
199        Self {
200            timestamp: current_timestamp(),
201            process_id: note.process_id(),
202            channel: note.channel().into(),
203            payload: note.payload().into(),
204        }
205    }
206}
207
208impl Display for PGNotifyMessage {
209    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210        let ts = format_timestamp(self.timestamp);
211        write!(
212            f,
213            "{}{:>12}: pid={} sent {}={}",
214            &ts, "NOTIFY", self.process_id, &self.channel, &self.payload
215        )
216    }
217}
218
///
/// Message received when a `RAISE <level> <message>` is issued on PostgreSQL.
///
/// Postgres logs are created by issuing `RAISE <level> <message>` commands
/// within your functions, stored procedures and scripts. When such a command is
/// issued, [`PGNotifyingClient`] receives a notification even if the call is in
/// progress, which allows the caller to capture the execution log in realtime.
///
/// Here we extract the level and message fields but all of the detailed
/// location information can be found in the `details` field.
///
/// More details on how to raise log messages can be found
/// [here](https://www.postgresql.org/docs/current/plpgsql-errors-and-messages.html).
///
#[derive(Debug, Clone)]
#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
pub struct PGRaiseMessage {
    /// Time at which this message value was created (not a server-side time).
    pub timestamp: Timestamp,
    /// Severity level of the message; [`PGRaiseLevel::Error`] is used when the
    /// reported severity is not recognized.
    pub level: PGRaiseLevel,
    /// Primary message text taken from the underlying `DbError`.
    pub message: String,
    /// The complete `DbError` this message was built from. Skipped when
    /// serializing.
    #[cfg_attr(any(feature = "serde", test), serde(skip))]
    pub details: DbError,
}
242
243impl From<DbError> for PGRaiseMessage {
244    fn from(raise: DbError) -> Self {
245        PGRaiseMessage {
246            timestamp: current_timestamp(),
247            level: PGRaiseLevel::from_str(raise.severity()).unwrap_or(PGRaiseLevel::Error),
248            message: raise.message().into(),
249            details: raise,
250        }
251    }
252}
253
254impl Display for PGRaiseMessage {
255    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256        #[cfg(feature = "chrono")]
257        let ts = self.timestamp.to_rfc3339_opts(SecondsFormat::Millis, true);
258
259        #[cfg(not(feature = "chrono"))]
260        let ts = {
261            let duration = self
262                .timestamp
263                .duration_since(SystemTime::UNIX_EPOCH)
264                .unwrap();
265            let millis = duration.as_millis();
266            format!("{}", millis)
267        };
268
269        write!(f, "{}{:>12}: {}", &ts, &self.level.as_ref(), self.message)
270    }
271}
272
///
/// Severity level attached to a [`PGRaiseMessage`].
///
/// Each variant corresponds to the upper-case PostgreSQL severity name
/// returned by `as_ref()` (for example `Warning` is `"WARNING"`).
///
/// The type derives `PartialEq`, `Eq` and `Hash` so levels can be compared
/// with `==` and used as map or set keys.
///
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
#[cfg_attr(any(feature = "serde", test), serde(rename_all = "UPPERCASE"))]
pub enum PGRaiseLevel {
    Debug,
    Log,
    Info,
    Notice,
    Warning,
    Error,
    Fatal,
    Panic,
}
286
287impl AsRef<str> for PGRaiseLevel {
288    fn as_ref(&self) -> &str {
289        use PGRaiseLevel::*;
290        match self {
291            Debug => "DEBUG",
292            Log => "LOG",
293            Info => "INFO",
294            Notice => "NOTICE",
295            Warning => "WARNING",
296            Error => "ERROR",
297            Fatal => "FATAL",
298            Panic => "PANIC",
299        }
300    }
301}
302
303impl fmt::Display for PGRaiseLevel {
304    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
305        write!(f, "{}", self.as_ref())
306    }
307}
308
309impl FromStr for PGRaiseLevel {
310    type Err = ();
311    fn from_str(s: &str) -> Result<Self, Self::Err> {
312        match s {
313            "DEBUG" => Ok(PGRaiseLevel::Debug),
314            "LOG" => Ok(PGRaiseLevel::Log),
315            "INFO" => Ok(PGRaiseLevel::Info),
316            "NOTICE" => Ok(PGRaiseLevel::Notice),
317            "WARNING" => Ok(PGRaiseLevel::Warning),
318            "ERROR" => Ok(PGRaiseLevel::Error),
319            "FATAL" => Ok(PGRaiseLevel::Fatal),
320            "PANIC" => Ok(PGRaiseLevel::Panic),
321            _ => Err(()),
322        }
323    }
324}
325
326///
327/// Returns the current time either as a DateTime<Utc> or SystemTime
328/// depending on the `chrono` feature flag.
329///
330#[inline(always)]
331fn current_timestamp() -> Timestamp {
332    #[cfg(feature = "chrono")]
333    return Utc::now();
334    #[cfg(not(feature = "chrono"))]
335    return SystemTime::now();
336}
337
338fn format_timestamp(ts: Timestamp) -> String {
339    #[cfg(feature = "chrono")]
340    return ts.to_rfc3339_opts(SecondsFormat::Millis, true);
341
342    #[cfg(not(feature = "chrono"))]
343    {
344        let duration = ts
345            .duration_since(SystemTime::UNIX_EPOCH)
346            .unwrap();
347        let millis = duration.as_millis();
348        return format!("{}", millis);
349    }
350}