postgres_notify/
messages.rs

1#[cfg(feature = "chrono")]
2use chrono::{DateTime, SecondsFormat, Utc};
3#[cfg(not(feature = "chrono"))]
4use std::time::SystemTime;
5
6#[cfg(feature = "chrono")]
7pub type Timestamp = DateTime<Utc>;
8#[cfg(not(feature = "chrono"))]
9pub type Timestamp = SystemTime;
10
11use {
12    std::{
13        fmt::{self, Display},
14        str::FromStr,
15        time::Duration,
16    },
17    tokio_postgres::error::DbError,
18};
19
20///
21/// Type used to represent any of the messages that can be received
22/// by the client callback.
23///
24#[derive(Debug, Clone)]
25#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
26pub enum PGMessage {
27    Notify(PGNotifyMessage),
28    Raise(PGRaiseMessage),
29    Reconnect {
30        timestamp: Timestamp,
31        attempts: u32,
32        max_attempts: u32,
33    },
34    Connected {
35        timestamp: Timestamp,
36    },
37    Timeout {
38        timestamp: Timestamp,
39        duration: Duration,
40    },
41    Cancelled {
42        timestamp: Timestamp,
43        success: bool,
44    },
45    FailedToReconnect {
46        timestamp: Timestamp,
47        attempts: u32,
48    },
49}
50
51impl PGMessage {
52    pub fn reconnect(attempts: u32, max_attempts: u32) -> Self {
53        Self::Reconnect {
54            timestamp: current_timestamp(),
55            attempts,
56            max_attempts,
57        }
58    }
59
60    pub fn connected() -> Self {
61        Self::Connected {
62            timestamp: current_timestamp(),
63        }
64    }
65
66    pub fn timeout(duration: Duration) -> Self {
67        Self::Timeout {
68            timestamp: current_timestamp(),
69            duration,
70        }
71    }
72
73    pub fn cancelled(success: bool) -> Self {
74        Self::Cancelled {
75            timestamp: current_timestamp(),
76            success,
77        }
78    }
79    pub fn failed_to_reconnect(attempts: u32) -> Self {
80        Self::FailedToReconnect {
81            timestamp: current_timestamp(),
82            attempts,
83        }
84    }
85}
86
87impl Display for PGMessage {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        use PGMessage::*;
90        match self {
91            Notify(m) => m.fmt(f),
92            Raise(m) => m.fmt(f),
93            Reconnect {
94                timestamp,
95                attempts,
96                max_attempts,
97            } => {
98                let ts = format_timestamp(*timestamp);
99                if *max_attempts != u32::MAX {
100                    write!(
101                        f,
102                        "{}{:>12}: attempt #{} of {}",
103                        &ts, "RECONNECT", attempts, max_attempts
104                    )
105                } else {
106                    write!(f, "{}{:>12}: attempt #{}", &ts, "RECONNECT", attempts)
107                }
108            }
109            Connected { timestamp } => {
110                let ts = format_timestamp(*timestamp);
111                write!(f, "{}{:>12}: connection established", &ts, "CONNECTED")
112            }
113            Timeout {
114                timestamp,
115                duration,
116            } => {
117                let ts = format_timestamp(*timestamp);
118                write!(
119                    f,
120                    "{}{:>12}: timeout after {}ms",
121                    &ts,
122                    "TIMEOUT",
123                    duration.as_millis(),
124                )
125            }
126            Cancelled { timestamp, success } => {
127                let ts = format_timestamp(*timestamp);
128                let could = if *success { "" } else { "could not be " };
129                write!(
130                    f,
131                    "{}{:>12}: server-side query {}cancelled",
132                    &ts, "CANCELLED", could
133                )
134            }
135            FailedToReconnect {
136                timestamp,
137                attempts,
138            } => {
139                let ts = format_timestamp(*timestamp);
140                write!(
141                    f,
142                    "{}{:>12}: failed to reconnect after {} attempts",
143                    &ts, "FAILURE", attempts
144                )
145            }
146        }
147    }
148}
149
150impl From<tokio_postgres::Notification> for PGMessage {
151    fn from(note: tokio_postgres::Notification) -> Self {
152        Self::Notify(note.into())
153    }
154}
155
156impl From<tokio_postgres::error::DbError> for PGMessage {
157    fn from(raise: tokio_postgres::error::DbError) -> Self {
158        Self::Raise(raise.into())
159    }
160}
161
162///
163/// Message received when a `NOTIFY [channel] [payload]` is issued on PostgreSQL.
164///
165/// Postgres notifications contain a string payload on a named channel.
166/// It also specifies the server-side process id of the notifying process.
167/// To this we add a timestamp, which is either an [`chrono::DateTime<Utc>`]
168/// or [`std::time::SystemTime`] depending on the `chrono` feature flag.
169///
170/// More details on the postgres notification can be found
171/// [here](https://www.postgresql.org/docs/current/sql-notify.html).
172///
173#[derive(Debug, Clone)]
174#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
175pub struct PGNotifyMessage {
176    pub timestamp: Timestamp,
177    pub process_id: i32,
178    pub channel: String,
179    pub payload: String,
180}
181
182impl From<tokio_postgres::Notification> for PGNotifyMessage {
183    fn from(note: tokio_postgres::Notification) -> Self {
184        Self {
185            timestamp: current_timestamp(),
186            process_id: note.process_id(),
187            channel: note.channel().into(),
188            payload: note.payload().into(),
189        }
190    }
191}
192
193impl Display for PGNotifyMessage {
194    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195        let ts = format_timestamp(self.timestamp);
196        write!(
197            f,
198            "{}{:>12}: pid={} sent {}={}",
199            &ts, "NOTIFY", self.process_id, &self.channel, &self.payload
200        )
201    }
202}
203
204///
205/// # Message received when a `raise <level> <message>` is issued on PostgreSQL.
206///
207/// Postgres logs are created by issuing `RAISE <level> <message>` commands
208/// within your functions, stored procedures and scripts. When such a command is
209/// issued, [`PGNotifyingClient`] receives a notification even if the call is in
210/// progress, which allows the caller to capture the execution log in realtime.
211///
212/// Here we extract the level and message fields but all of the detailed
213/// location information can be found in the `details` field.
214///
215/// More details on how to raise log messages can be found
216/// (here)[https://www.postgresql.org/docs/current/plpgsql-errors-and-messages.html].
217///
218#[derive(Debug, Clone)]
219#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
220pub struct PGRaiseMessage {
221    pub timestamp: Timestamp,
222    pub level: PGRaiseLevel,
223    pub message: String,
224    #[serde(skip)]
225    pub details: DbError,
226}
227
228impl From<DbError> for PGRaiseMessage {
229    fn from(raise: DbError) -> Self {
230        PGRaiseMessage {
231            timestamp: current_timestamp(),
232            level: PGRaiseLevel::from_str(raise.severity()).unwrap_or(PGRaiseLevel::Error),
233            message: raise.message().into(),
234            details: raise,
235        }
236    }
237}
238
239impl Display for PGRaiseMessage {
240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241        #[cfg(feature = "chrono")]
242        let ts = self.timestamp.to_rfc3339_opts(SecondsFormat::Millis, true);
243
244        #[cfg(not(feature = "chrono"))]
245        let ts = {
246            let duration = self
247                .timestamp
248                .duration_since(SystemTime::UNIX_EPOCH)
249                .unwrap();
250            let millis = duration.as_millis();
251            format!("{}", millis)
252        };
253
254        write!(f, "{}{:>12}: {}", &ts, &self.level.as_ref(), self.message)
255    }
256}
257
258#[derive(Debug, Clone, Copy)]
259#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
260#[cfg_attr(any(feature = "serde", test), serde(rename_all = "UPPERCASE"))]
261pub enum PGRaiseLevel {
262    Debug,
263    Log,
264    Info,
265    Notice,
266    Warning,
267    Error,
268    Fatal,
269    Panic,
270}
271
272impl AsRef<str> for PGRaiseLevel {
273    fn as_ref(&self) -> &str {
274        use PGRaiseLevel::*;
275        match self {
276            Debug => "DEBUG",
277            Log => "LOG",
278            Info => "INFO",
279            Notice => "NOTICE",
280            Warning => "WARNING",
281            Error => "ERROR",
282            Fatal => "FATAL",
283            Panic => "PANIC",
284        }
285    }
286}
287
288impl fmt::Display for PGRaiseLevel {
289    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290        write!(f, "{}", self.as_ref())
291    }
292}
293
294impl FromStr for PGRaiseLevel {
295    type Err = ();
296    fn from_str(s: &str) -> Result<Self, Self::Err> {
297        match s {
298            "DEBUG" => Ok(PGRaiseLevel::Debug),
299            "LOG" => Ok(PGRaiseLevel::Log),
300            "INFO" => Ok(PGRaiseLevel::Info),
301            "NOTICE" => Ok(PGRaiseLevel::Notice),
302            "WARNING" => Ok(PGRaiseLevel::Warning),
303            "ERROR" => Ok(PGRaiseLevel::Error),
304            "FATAL" => Ok(PGRaiseLevel::Fatal),
305            "PANIC" => Ok(PGRaiseLevel::Panic),
306            _ => Err(()),
307        }
308    }
309}
310
311///
312/// Returns the current time either as a DateTime<Utc> or SystemTime
313/// depending on the `chrono` feature flag.
314///
315#[inline(always)]
316fn current_timestamp() -> Timestamp {
317    #[cfg(feature = "chrono")]
318    return Utc::now();
319    #[cfg(not(feature = "chrono"))]
320    return SystemTime::now();
321}
322
323fn format_timestamp(ts: Timestamp) -> String {
324    #[cfg(feature = "chrono")]
325    return ts.to_rfc3339_opts(SecondsFormat::Millis, true);
326
327    #[cfg(not(feature = "chrono"))]
328    {
329        let duration = self
330            .timestamp
331            .duration_since(SystemTime::UNIX_EPOCH)
332            .unwrap();
333        let millis = duration.as_millis();
334        return format!("{}", millis);
335    }
336}