1#[cfg(feature = "chrono")]
2use chrono::{DateTime, SecondsFormat, Utc};
3#[cfg(not(feature = "chrono"))]
4use std::time::SystemTime;
5
6#[cfg(feature = "chrono")]
7pub type Timestamp = DateTime<Utc>;
8#[cfg(not(feature = "chrono"))]
9pub type Timestamp = SystemTime;
10
11use {
12 std::{
13 fmt::{self, Display},
14 str::FromStr,
15 time::Duration,
16 },
17 tokio_postgres::error::DbError,
18};
19
20#[derive(Debug, Clone)]
25#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
26pub enum PGMessage {
27 Notify(PGNotifyMessage),
28 Raise(PGRaiseMessage),
29 Reconnect {
30 timestamp: Timestamp,
31 attempts: u32,
32 max_attempts: u32,
33 },
34 Connected {
35 timestamp: Timestamp,
36 },
37 Timeout {
38 timestamp: Timestamp,
39 duration: Duration,
40 },
41 Cancelled {
42 timestamp: Timestamp,
43 success: bool,
44 },
45 FailedToReconnect {
46 timestamp: Timestamp,
47 attempts: u32,
48 },
49}
50
51impl PGMessage {
52 pub fn reconnect(attempts: u32, max_attempts: u32) -> Self {
53 Self::Reconnect {
54 timestamp: current_timestamp(),
55 attempts,
56 max_attempts,
57 }
58 }
59
60 pub fn connected() -> Self {
61 Self::Connected {
62 timestamp: current_timestamp(),
63 }
64 }
65
66 pub fn timeout(duration: Duration) -> Self {
67 Self::Timeout {
68 timestamp: current_timestamp(),
69 duration,
70 }
71 }
72
73 pub fn cancelled(success: bool) -> Self {
74 Self::Cancelled {
75 timestamp: current_timestamp(),
76 success,
77 }
78 }
79 pub fn failed_to_reconnect(attempts: u32) -> Self {
80 Self::FailedToReconnect {
81 timestamp: current_timestamp(),
82 attempts,
83 }
84 }
85}
86
87impl Display for PGMessage {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 use PGMessage::*;
90 match self {
91 Notify(m) => m.fmt(f),
92 Raise(m) => m.fmt(f),
93 Reconnect {
94 timestamp,
95 attempts,
96 max_attempts,
97 } => {
98 let ts = format_timestamp(*timestamp);
99 if *max_attempts != u32::MAX {
100 write!(
101 f,
102 "{}{:>12}: attempt #{} of {}",
103 &ts, "RECONNECT", attempts, max_attempts
104 )
105 } else {
106 write!(f, "{}{:>12}: attempt #{}", &ts, "RECONNECT", attempts)
107 }
108 }
109 Connected { timestamp } => {
110 let ts = format_timestamp(*timestamp);
111 write!(f, "{}{:>12}: connection established", &ts, "CONNECTED")
112 }
113 Timeout {
114 timestamp,
115 duration,
116 } => {
117 let ts = format_timestamp(*timestamp);
118 write!(
119 f,
120 "{}{:>12}: timeout after {}ms",
121 &ts,
122 "TIMEOUT",
123 duration.as_millis(),
124 )
125 }
126 Cancelled { timestamp, success } => {
127 let ts = format_timestamp(*timestamp);
128 let could = if *success { "" } else { "could not be " };
129 write!(
130 f,
131 "{}{:>12}: server-side query {}cancelled",
132 &ts, "CANCELLED", could
133 )
134 }
135 FailedToReconnect {
136 timestamp,
137 attempts,
138 } => {
139 let ts = format_timestamp(*timestamp);
140 write!(
141 f,
142 "{}{:>12}: failed to reconnect after {} attempts",
143 &ts, "FAILURE", attempts
144 )
145 }
146 }
147 }
148}
149
150impl From<tokio_postgres::Notification> for PGMessage {
151 fn from(note: tokio_postgres::Notification) -> Self {
152 Self::Notify(note.into())
153 }
154}
155
156impl From<tokio_postgres::error::DbError> for PGMessage {
157 fn from(raise: tokio_postgres::error::DbError) -> Self {
158 Self::Raise(raise.into())
159 }
160}
161
162#[derive(Debug, Clone)]
174#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
175pub struct PGNotifyMessage {
176 pub timestamp: Timestamp,
177 pub process_id: i32,
178 pub channel: String,
179 pub payload: String,
180}
181
182impl From<tokio_postgres::Notification> for PGNotifyMessage {
183 fn from(note: tokio_postgres::Notification) -> Self {
184 Self {
185 timestamp: current_timestamp(),
186 process_id: note.process_id(),
187 channel: note.channel().into(),
188 payload: note.payload().into(),
189 }
190 }
191}
192
193impl Display for PGNotifyMessage {
194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195 let ts = format_timestamp(self.timestamp);
196 write!(
197 f,
198 "{}{:>12}: pid={} sent {}={}",
199 &ts, "NOTIFY", self.process_id, &self.channel, &self.payload
200 )
201 }
202}
203
204#[derive(Debug, Clone)]
219#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
220pub struct PGRaiseMessage {
221 pub timestamp: Timestamp,
222 pub level: PGRaiseLevel,
223 pub message: String,
224 #[serde(skip)]
225 pub details: DbError,
226}
227
228impl From<DbError> for PGRaiseMessage {
229 fn from(raise: DbError) -> Self {
230 PGRaiseMessage {
231 timestamp: current_timestamp(),
232 level: PGRaiseLevel::from_str(raise.severity()).unwrap_or(PGRaiseLevel::Error),
233 message: raise.message().into(),
234 details: raise,
235 }
236 }
237}
238
239impl Display for PGRaiseMessage {
240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241 #[cfg(feature = "chrono")]
242 let ts = self.timestamp.to_rfc3339_opts(SecondsFormat::Millis, true);
243
244 #[cfg(not(feature = "chrono"))]
245 let ts = {
246 let duration = self
247 .timestamp
248 .duration_since(SystemTime::UNIX_EPOCH)
249 .unwrap();
250 let millis = duration.as_millis();
251 format!("{}", millis)
252 };
253
254 write!(f, "{}{:>12}: {}", &ts, &self.level.as_ref(), self.message)
255 }
256}
257
258#[derive(Debug, Clone, Copy)]
259#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
260#[cfg_attr(any(feature = "serde", test), serde(rename_all = "UPPERCASE"))]
261pub enum PGRaiseLevel {
262 Debug,
263 Log,
264 Info,
265 Notice,
266 Warning,
267 Error,
268 Fatal,
269 Panic,
270}
271
272impl AsRef<str> for PGRaiseLevel {
273 fn as_ref(&self) -> &str {
274 use PGRaiseLevel::*;
275 match self {
276 Debug => "DEBUG",
277 Log => "LOG",
278 Info => "INFO",
279 Notice => "NOTICE",
280 Warning => "WARNING",
281 Error => "ERROR",
282 Fatal => "FATAL",
283 Panic => "PANIC",
284 }
285 }
286}
287
288impl fmt::Display for PGRaiseLevel {
289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290 write!(f, "{}", self.as_ref())
291 }
292}
293
294impl FromStr for PGRaiseLevel {
295 type Err = ();
296 fn from_str(s: &str) -> Result<Self, Self::Err> {
297 match s {
298 "DEBUG" => Ok(PGRaiseLevel::Debug),
299 "LOG" => Ok(PGRaiseLevel::Log),
300 "INFO" => Ok(PGRaiseLevel::Info),
301 "NOTICE" => Ok(PGRaiseLevel::Notice),
302 "WARNING" => Ok(PGRaiseLevel::Warning),
303 "ERROR" => Ok(PGRaiseLevel::Error),
304 "FATAL" => Ok(PGRaiseLevel::Fatal),
305 "PANIC" => Ok(PGRaiseLevel::Panic),
306 _ => Err(()),
307 }
308 }
309}
310
311#[inline(always)]
316fn current_timestamp() -> Timestamp {
317 #[cfg(feature = "chrono")]
318 return Utc::now();
319 #[cfg(not(feature = "chrono"))]
320 return SystemTime::now();
321}
322
323fn format_timestamp(ts: Timestamp) -> String {
324 #[cfg(feature = "chrono")]
325 return ts.to_rfc3339_opts(SecondsFormat::Millis, true);
326
327 #[cfg(not(feature = "chrono"))]
328 {
329 let duration = self
330 .timestamp
331 .duration_since(SystemTime::UNIX_EPOCH)
332 .unwrap();
333 let millis = duration.as_millis();
334 return format!("{}", millis);
335 }
336}