postgres_notify/
messages.rs

1#[cfg(feature = "chrono")]
2use chrono::{DateTime, SecondsFormat, Utc};
3#[cfg(not(feature = "chrono"))]
4use std::time::SystemTime;
5
6#[cfg(feature = "chrono")]
7pub type Timestamp = DateTime<Utc>;
8#[cfg(not(feature = "chrono"))]
9pub type Timestamp = SystemTime;
10
11use {
12    std::{
13        fmt::{self, Display},
14        str::FromStr,
15        time::Duration,
16    },
17    tokio_postgres::error::DbError,
18};
19
20///
21/// Type used to represent any of the messages that can be received
22/// by the client callback.
23///
24#[derive(Debug, Clone)]
25#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
26pub enum PGMessage {
27    Notify(PGNotifyMessage),
28    Raise(PGRaiseMessage),
29    Reconnect {
30        timestamp: Timestamp,
31        attempts: u32,
32        max_attempts: u32,
33    },
34    Timeout {
35        timestamp: Timestamp,
36        duration: Duration,
37    },
38    Cancelled {
39        timestamp: Timestamp,
40        success: bool,
41    },
42    FailedToReconnect {
43        timestamp: Timestamp,
44        attempts: u32,
45    },
46}
47
48impl PGMessage {
49    pub fn reconnect(attempts: u32, max_attempts: u32) -> Self {
50        Self::Reconnect {
51            timestamp: current_timestamp(),
52            attempts,
53            max_attempts,
54        }
55    }
56
57    pub fn timeout(duration: Duration) -> Self {
58        Self::Timeout {
59            timestamp: current_timestamp(),
60            duration,
61        }
62    }
63
64    pub fn cancelled(success: bool) -> Self {
65        Self::Cancelled {
66            timestamp: current_timestamp(),
67            success,
68        }
69    }
70    pub fn failed_to_reconnect(attempts: u32) -> Self {
71        Self::FailedToReconnect {
72            timestamp: current_timestamp(),
73            attempts,
74        }
75    }
76}
77
78impl Display for PGMessage {
79    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80        use PGMessage::*;
81        match self {
82            Notify(m) => m.fmt(f),
83            Raise(m) => m.fmt(f),
84            Reconnect {
85                timestamp,
86                attempts,
87                max_attempts,
88            } => {
89                let ts = format_timestamp(*timestamp);
90                if *max_attempts != u32::MAX {
91                    write!(
92                        f,
93                        "{}{:>12}: attempt #{} of {}",
94                        &ts, "RECONNECT", attempts, max_attempts
95                    )
96                } else {
97                    write!(f, "{}{:>12}: attempt #{}", &ts, "RECONNECT", attempts)
98                }
99            }
100            Timeout {
101                timestamp,
102                duration,
103            } => {
104                let ts = format_timestamp(*timestamp);
105                write!(
106                    f,
107                    "{}{:>12}: timeout after {}ms",
108                    &ts,
109                    "TIMEOUT",
110                    duration.as_millis(),
111                )
112            }
113            Cancelled { timestamp, success } => {
114                let ts = format_timestamp(*timestamp);
115                let could = if *success { "" } else { "could not be " };
116                write!(
117                    f,
118                    "{}{:>12}: server-side query {}cancelled",
119                    &ts, "CANCELLED", could
120                )
121            }
122            FailedToReconnect {
123                timestamp,
124                attempts,
125            } => {
126                let ts = format_timestamp(*timestamp);
127                write!(
128                    f,
129                    "{}{:>12}: failed to reconnect after {} attempts",
130                    &ts, "FAILURE", attempts
131                )
132            }
133        }
134    }
135}
136
137impl From<tokio_postgres::Notification> for PGMessage {
138    fn from(note: tokio_postgres::Notification) -> Self {
139        Self::Notify(note.into())
140    }
141}
142
143impl From<tokio_postgres::error::DbError> for PGMessage {
144    fn from(raise: tokio_postgres::error::DbError) -> Self {
145        Self::Raise(raise.into())
146    }
147}
148
149///
150/// Message received when a `NOTIFY [channel] [payload]` is issued on PostgreSQL.
151///
152/// Postgres notifications contain a string payload on a named channel.
153/// It also specifies the server-side process id of the notifying process.
154/// To this we add a timestamp, which is either an [`chrono::DateTime<Utc>`]
155/// or [`std::time::SystemTime`] depending on the `chrono` feature flag.
156///
157/// More details on the postgres notification can be found
158/// [here](https://www.postgresql.org/docs/current/sql-notify.html).
159///
160#[derive(Debug, Clone)]
161#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
162pub struct PGNotifyMessage {
163    pub timestamp: Timestamp,
164    pub process_id: i32,
165    pub channel: String,
166    pub payload: String,
167}
168
169impl From<tokio_postgres::Notification> for PGNotifyMessage {
170    fn from(note: tokio_postgres::Notification) -> Self {
171        Self {
172            timestamp: current_timestamp(),
173            process_id: note.process_id(),
174            channel: note.channel().into(),
175            payload: note.payload().into(),
176        }
177    }
178}
179
180impl Display for PGNotifyMessage {
181    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182        let ts = format_timestamp(self.timestamp);
183        write!(
184            f,
185            "{}{:>12}: pid={} sent {}={}",
186            &ts, "NOTIFY", self.process_id, &self.channel, &self.payload
187        )
188    }
189}
190
191///
192/// # Message received when a `raise <level> <message>` is issued on PostgreSQL.
193///
194/// Postgres logs are created by issuing `RAISE <level> <message>` commands
195/// within your functions, stored procedures and scripts. When such a command is
196/// issued, [`PGNotifyingClient`] receives a notification even if the call is in
197/// progress, which allows the caller to capture the execution log in realtime.
198///
199/// Here we extract the level and message fields but all of the detailed
200/// location information can be found in the `details` field.
201///
202/// More details on how to raise log messages can be found
203/// (here)[https://www.postgresql.org/docs/current/plpgsql-errors-and-messages.html].
204///
205#[derive(Debug, Clone)]
206#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
207pub struct PGRaiseMessage {
208    pub timestamp: Timestamp,
209    pub level: PGRaiseLevel,
210    pub message: String,
211    #[serde(skip)]
212    pub details: DbError,
213}
214
215impl From<DbError> for PGRaiseMessage {
216    fn from(raise: DbError) -> Self {
217        PGRaiseMessage {
218            timestamp: current_timestamp(),
219            level: PGRaiseLevel::from_str(raise.severity()).unwrap_or(PGRaiseLevel::Error),
220            message: raise.message().into(),
221            details: raise,
222        }
223    }
224}
225
226impl Display for PGRaiseMessage {
227    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228        #[cfg(feature = "chrono")]
229        let ts = self.timestamp.to_rfc3339_opts(SecondsFormat::Millis, true);
230
231        #[cfg(not(feature = "chrono"))]
232        let ts = {
233            let duration = self
234                .timestamp
235                .duration_since(SystemTime::UNIX_EPOCH)
236                .unwrap();
237            let millis = duration.as_millis();
238            format!("{}", millis)
239        };
240
241        write!(f, "{}{:>12}: {}", &ts, &self.level.as_ref(), self.message)
242    }
243}
244
245#[derive(Debug, Clone, Copy)]
246#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
247#[cfg_attr(any(feature = "serde", test), serde(rename_all = "UPPERCASE"))]
248pub enum PGRaiseLevel {
249    Debug,
250    Log,
251    Info,
252    Notice,
253    Warning,
254    Error,
255    Fatal,
256    Panic,
257}
258
259impl AsRef<str> for PGRaiseLevel {
260    fn as_ref(&self) -> &str {
261        use PGRaiseLevel::*;
262        match self {
263            Debug => "DEBUG",
264            Log => "LOG",
265            Info => "INFO",
266            Notice => "NOTICE",
267            Warning => "WARNING",
268            Error => "ERROR",
269            Fatal => "FATAL",
270            Panic => "PANIC",
271        }
272    }
273}
274
275impl fmt::Display for PGRaiseLevel {
276    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277        write!(f, "{}", self.as_ref())
278    }
279}
280
281impl FromStr for PGRaiseLevel {
282    type Err = ();
283    fn from_str(s: &str) -> Result<Self, Self::Err> {
284        match s {
285            "DEBUG" => Ok(PGRaiseLevel::Debug),
286            "LOG" => Ok(PGRaiseLevel::Log),
287            "INFO" => Ok(PGRaiseLevel::Info),
288            "NOTICE" => Ok(PGRaiseLevel::Notice),
289            "WARNING" => Ok(PGRaiseLevel::Warning),
290            "ERROR" => Ok(PGRaiseLevel::Error),
291            "FATAL" => Ok(PGRaiseLevel::Fatal),
292            "PANIC" => Ok(PGRaiseLevel::Panic),
293            _ => Err(()),
294        }
295    }
296}
297
298///
299/// Returns the current time either as a DateTime<Utc> or SystemTime
300/// depending on the `chrono` feature flag.
301///
302#[inline(always)]
303fn current_timestamp() -> Timestamp {
304    #[cfg(feature = "chrono")]
305    return Utc::now();
306    #[cfg(not(feature = "chrono"))]
307    return SystemTime::now();
308}
309
310fn format_timestamp(ts: Timestamp) -> String {
311    #[cfg(feature = "chrono")]
312    return ts.to_rfc3339_opts(SecondsFormat::Millis, true);
313
314    #[cfg(not(feature = "chrono"))]
315    {
316        let duration = self
317            .timestamp
318            .duration_since(SystemTime::UNIX_EPOCH)
319            .unwrap();
320        let millis = duration.as_millis();
321        return format!("{}", millis);
322    }
323}