1#[cfg(feature = "chrono")]
2use chrono::{DateTime, SecondsFormat, Utc};
3#[cfg(not(feature = "chrono"))]
4use std::time::SystemTime;
5
6#[cfg(feature = "chrono")]
7pub type Timestamp = DateTime<Utc>;
8#[cfg(not(feature = "chrono"))]
9pub type Timestamp = SystemTime;
10
11use {
12 std::{
13 fmt::{self, Display},
14 str::FromStr,
15 time::Duration,
16 },
17 tokio_postgres::error::DbError,
18};
19
20#[derive(Debug, Clone)]
25#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
26pub enum PGMessage {
27 Notify(PGNotifyMessage),
28 Raise(PGRaiseMessage),
29 Reconnect {
30 timestamp: Timestamp,
31 attempts: u32,
32 max_attempts: u32,
33 },
34 Timeout {
35 timestamp: Timestamp,
36 duration: Duration,
37 },
38 Cancelled {
39 timestamp: Timestamp,
40 success: bool,
41 },
42 FailedToReconnect {
43 timestamp: Timestamp,
44 attempts: u32,
45 },
46}
47
48impl PGMessage {
49 pub fn reconnect(attempts: u32, max_attempts: u32) -> Self {
50 Self::Reconnect {
51 timestamp: current_timestamp(),
52 attempts,
53 max_attempts,
54 }
55 }
56
57 pub fn timeout(duration: Duration) -> Self {
58 Self::Timeout {
59 timestamp: current_timestamp(),
60 duration,
61 }
62 }
63
64 pub fn cancelled(success: bool) -> Self {
65 Self::Cancelled {
66 timestamp: current_timestamp(),
67 success,
68 }
69 }
70 pub fn failed_to_reconnect(attempts: u32) -> Self {
71 Self::FailedToReconnect {
72 timestamp: current_timestamp(),
73 attempts,
74 }
75 }
76}
77
78impl Display for PGMessage {
79 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
80 use PGMessage::*;
81 match self {
82 Notify(m) => m.fmt(f),
83 Raise(m) => m.fmt(f),
84 Reconnect {
85 timestamp,
86 attempts,
87 max_attempts,
88 } => {
89 let ts = format_timestamp(*timestamp);
90 if *max_attempts != u32::MAX {
91 write!(
92 f,
93 "{}{:>12}: attempt #{} of {}",
94 &ts, "RECONNECT", attempts, max_attempts
95 )
96 } else {
97 write!(f, "{}{:>12}: attempt #{}", &ts, "RECONNECT", attempts)
98 }
99 }
100 Timeout {
101 timestamp,
102 duration,
103 } => {
104 let ts = format_timestamp(*timestamp);
105 write!(
106 f,
107 "{}{:>12}: timeout after {}ms",
108 &ts,
109 "TIMEOUT",
110 duration.as_millis(),
111 )
112 }
113 Cancelled { timestamp, success } => {
114 let ts = format_timestamp(*timestamp);
115 let could = if *success { "" } else { "could not be " };
116 write!(
117 f,
118 "{}{:>12}: server-side query {}cancelled",
119 &ts, "CANCELLED", could
120 )
121 }
122 FailedToReconnect {
123 timestamp,
124 attempts,
125 } => {
126 let ts = format_timestamp(*timestamp);
127 write!(
128 f,
129 "{}{:>12}: failed to reconnect after {} attempts",
130 &ts, "FAILURE", attempts
131 )
132 }
133 }
134 }
135}
136
137impl From<tokio_postgres::Notification> for PGMessage {
138 fn from(note: tokio_postgres::Notification) -> Self {
139 Self::Notify(note.into())
140 }
141}
142
143impl From<tokio_postgres::error::DbError> for PGMessage {
144 fn from(raise: tokio_postgres::error::DbError) -> Self {
145 Self::Raise(raise.into())
146 }
147}
148
149#[derive(Debug, Clone)]
161#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
162pub struct PGNotifyMessage {
163 pub timestamp: Timestamp,
164 pub process_id: i32,
165 pub channel: String,
166 pub payload: String,
167}
168
169impl From<tokio_postgres::Notification> for PGNotifyMessage {
170 fn from(note: tokio_postgres::Notification) -> Self {
171 Self {
172 timestamp: current_timestamp(),
173 process_id: note.process_id(),
174 channel: note.channel().into(),
175 payload: note.payload().into(),
176 }
177 }
178}
179
180impl Display for PGNotifyMessage {
181 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182 let ts = format_timestamp(self.timestamp);
183 write!(
184 f,
185 "{}{:>12}: pid={} sent {}={}",
186 &ts, "NOTIFY", self.process_id, &self.channel, &self.payload
187 )
188 }
189}
190
191#[derive(Debug, Clone)]
206#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
207pub struct PGRaiseMessage {
208 pub timestamp: Timestamp,
209 pub level: PGRaiseLevel,
210 pub message: String,
211 #[serde(skip)]
212 pub details: DbError,
213}
214
215impl From<DbError> for PGRaiseMessage {
216 fn from(raise: DbError) -> Self {
217 PGRaiseMessage {
218 timestamp: current_timestamp(),
219 level: PGRaiseLevel::from_str(raise.severity()).unwrap_or(PGRaiseLevel::Error),
220 message: raise.message().into(),
221 details: raise,
222 }
223 }
224}
225
226impl Display for PGRaiseMessage {
227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228 #[cfg(feature = "chrono")]
229 let ts = self.timestamp.to_rfc3339_opts(SecondsFormat::Millis, true);
230
231 #[cfg(not(feature = "chrono"))]
232 let ts = {
233 let duration = self
234 .timestamp
235 .duration_since(SystemTime::UNIX_EPOCH)
236 .unwrap();
237 let millis = duration.as_millis();
238 format!("{}", millis)
239 };
240
241 write!(f, "{}{:>12}: {}", &ts, &self.level.as_ref(), self.message)
242 }
243}
244
245#[derive(Debug, Clone, Copy)]
246#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
247#[cfg_attr(any(feature = "serde", test), serde(rename_all = "UPPERCASE"))]
248pub enum PGRaiseLevel {
249 Debug,
250 Log,
251 Info,
252 Notice,
253 Warning,
254 Error,
255 Fatal,
256 Panic,
257}
258
259impl AsRef<str> for PGRaiseLevel {
260 fn as_ref(&self) -> &str {
261 use PGRaiseLevel::*;
262 match self {
263 Debug => "DEBUG",
264 Log => "LOG",
265 Info => "INFO",
266 Notice => "NOTICE",
267 Warning => "WARNING",
268 Error => "ERROR",
269 Fatal => "FATAL",
270 Panic => "PANIC",
271 }
272 }
273}
274
275impl fmt::Display for PGRaiseLevel {
276 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277 write!(f, "{}", self.as_ref())
278 }
279}
280
281impl FromStr for PGRaiseLevel {
282 type Err = ();
283 fn from_str(s: &str) -> Result<Self, Self::Err> {
284 match s {
285 "DEBUG" => Ok(PGRaiseLevel::Debug),
286 "LOG" => Ok(PGRaiseLevel::Log),
287 "INFO" => Ok(PGRaiseLevel::Info),
288 "NOTICE" => Ok(PGRaiseLevel::Notice),
289 "WARNING" => Ok(PGRaiseLevel::Warning),
290 "ERROR" => Ok(PGRaiseLevel::Error),
291 "FATAL" => Ok(PGRaiseLevel::Fatal),
292 "PANIC" => Ok(PGRaiseLevel::Panic),
293 _ => Err(()),
294 }
295 }
296}
297
298#[inline(always)]
303fn current_timestamp() -> Timestamp {
304 #[cfg(feature = "chrono")]
305 return Utc::now();
306 #[cfg(not(feature = "chrono"))]
307 return SystemTime::now();
308}
309
310fn format_timestamp(ts: Timestamp) -> String {
311 #[cfg(feature = "chrono")]
312 return ts.to_rfc3339_opts(SecondsFormat::Millis, true);
313
314 #[cfg(not(feature = "chrono"))]
315 {
316 let duration = self
317 .timestamp
318 .duration_since(SystemTime::UNIX_EPOCH)
319 .unwrap();
320 let millis = duration.as_millis();
321 return format!("{}", millis);
322 }
323}