1#[cfg(feature = "chrono")]
2use chrono::{DateTime, SecondsFormat, Utc};
3#[cfg(not(feature = "chrono"))]
4use std::time::SystemTime;
5
6#[cfg(feature = "chrono")]
7pub type Timestamp = DateTime<Utc>;
8#[cfg(not(feature = "chrono"))]
9pub type Timestamp = SystemTime;
10
11use {
12 std::{
13 fmt::{self, Display},
14 str::FromStr,
15 time::Duration,
16 },
17 tokio_postgres::error::DbError,
18};
19
20#[derive(Debug, Clone)]
25#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
26pub enum PGMessage {
27 Notify(PGNotifyMessage),
28 Raise(PGRaiseMessage),
29 Reconnect {
30 timestamp: Timestamp,
31 attempts: u32,
32 max_attempts: u32,
33 },
34 Connected {
35 timestamp: Timestamp,
36 },
37 Timeout {
38 timestamp: Timestamp,
39 duration: Duration,
40 },
41 Cancelled {
42 timestamp: Timestamp,
43 success: bool,
44 },
45 FailedToReconnect {
46 timestamp: Timestamp,
47 attempts: u32,
48 },
49 Disconnected {
50 timestamp: Timestamp,
51 reason: String,
52 },
53}
54
55impl PGMessage {
56 pub fn reconnect(attempts: u32, max_attempts: u32) -> Self {
57 Self::Reconnect {
58 timestamp: current_timestamp(),
59 attempts,
60 max_attempts,
61 }
62 }
63
64 pub fn connected() -> Self {
65 Self::Connected {
66 timestamp: current_timestamp(),
67 }
68 }
69
70 pub fn timeout(duration: Duration) -> Self {
71 Self::Timeout {
72 timestamp: current_timestamp(),
73 duration,
74 }
75 }
76
77 pub fn cancelled(success: bool) -> Self {
78 Self::Cancelled {
79 timestamp: current_timestamp(),
80 success,
81 }
82 }
83 pub fn failed_to_reconnect(attempts: u32) -> Self {
84 Self::FailedToReconnect {
85 timestamp: current_timestamp(),
86 attempts,
87 }
88 }
89
90 pub fn disconnected(reason: impl Into<String>) -> Self {
91 Self::Disconnected {
92 timestamp: current_timestamp(),
93 reason: reason.into(),
94 }
95 }
96}
97
98impl Display for PGMessage {
99 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100 use PGMessage::*;
101 match self {
102 Notify(m) => m.fmt(f),
103 Raise(m) => m.fmt(f),
104 Reconnect {
105 timestamp,
106 attempts,
107 max_attempts,
108 } => {
109 let ts = format_timestamp(*timestamp);
110 if *max_attempts != u32::MAX {
111 write!(
112 f,
113 "{}{:>12}: attempt #{} of {}",
114 &ts, "RECONNECT", attempts, max_attempts
115 )
116 } else {
117 write!(f, "{}{:>12}: attempt #{}", &ts, "RECONNECT", attempts)
118 }
119 }
120 Connected { timestamp } => {
121 let ts = format_timestamp(*timestamp);
122 write!(f, "{}{:>12}: connection established", &ts, "CONNECTED")
123 }
124 Timeout {
125 timestamp,
126 duration,
127 } => {
128 let ts = format_timestamp(*timestamp);
129 write!(
130 f,
131 "{}{:>12}: timeout after {}ms",
132 &ts,
133 "TIMEOUT",
134 duration.as_millis(),
135 )
136 }
137 Cancelled { timestamp, success } => {
138 let ts = format_timestamp(*timestamp);
139 let could = if *success { "" } else { "could not be " };
140 write!(
141 f,
142 "{}{:>12}: server-side query {}cancelled",
143 &ts, "CANCELLED", could
144 )
145 }
146 FailedToReconnect {
147 timestamp,
148 attempts,
149 } => {
150 let ts = format_timestamp(*timestamp);
151 write!(
152 f,
153 "{}{:>12}: failed to reconnect after {} attempts",
154 &ts, "FAILURE", attempts
155 )
156 }
157 Disconnected { timestamp, reason } => {
158 let ts = format_timestamp(*timestamp);
159 write!(f, "{}{:>12}: {}", &ts, "DISCONNECTED", reason)
160 }
161 }
162 }
163}
164
165impl From<tokio_postgres::Notification> for PGMessage {
166 fn from(note: tokio_postgres::Notification) -> Self {
167 Self::Notify(note.into())
168 }
169}
170
171impl From<tokio_postgres::error::DbError> for PGMessage {
172 fn from(raise: tokio_postgres::error::DbError) -> Self {
173 Self::Raise(raise.into())
174 }
175}
176
177#[derive(Debug, Clone)]
189#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
190pub struct PGNotifyMessage {
191 pub timestamp: Timestamp,
192 pub process_id: i32,
193 pub channel: String,
194 pub payload: String,
195}
196
197impl From<tokio_postgres::Notification> for PGNotifyMessage {
198 fn from(note: tokio_postgres::Notification) -> Self {
199 Self {
200 timestamp: current_timestamp(),
201 process_id: note.process_id(),
202 channel: note.channel().into(),
203 payload: note.payload().into(),
204 }
205 }
206}
207
208impl Display for PGNotifyMessage {
209 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210 let ts = format_timestamp(self.timestamp);
211 write!(
212 f,
213 "{}{:>12}: pid={} sent {}={}",
214 &ts, "NOTIFY", self.process_id, &self.channel, &self.payload
215 )
216 }
217}
218
219#[derive(Debug, Clone)]
234#[cfg_attr(any(feature = "serde", test), derive(serde::Serialize))]
235pub struct PGRaiseMessage {
236 pub timestamp: Timestamp,
237 pub level: PGRaiseLevel,
238 pub message: String,
239 #[cfg_attr(any(feature = "serde", test), serde(skip))]
240 pub details: DbError,
241}
242
243impl From<DbError> for PGRaiseMessage {
244 fn from(raise: DbError) -> Self {
245 PGRaiseMessage {
246 timestamp: current_timestamp(),
247 level: PGRaiseLevel::from_str(raise.severity()).unwrap_or(PGRaiseLevel::Error),
248 message: raise.message().into(),
249 details: raise,
250 }
251 }
252}
253
254impl Display for PGRaiseMessage {
255 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256 #[cfg(feature = "chrono")]
257 let ts = self.timestamp.to_rfc3339_opts(SecondsFormat::Millis, true);
258
259 #[cfg(not(feature = "chrono"))]
260 let ts = {
261 let duration = self
262 .timestamp
263 .duration_since(SystemTime::UNIX_EPOCH)
264 .unwrap();
265 let millis = duration.as_millis();
266 format!("{}", millis)
267 };
268
269 write!(f, "{}{:>12}: {}", &ts, &self.level.as_ref(), self.message)
270 }
271}
272
/// Severity level attached to a `PGRaiseMessage`.
///
/// With serialization enabled the variants are written in upper case,
/// matching the strings used by `AsRef<str>` and `FromStr`.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(
    any(feature = "serde", test),
    derive(serde::Serialize),
    serde(rename_all = "UPPERCASE")
)]
pub enum PGRaiseLevel {
    /// `DEBUG`
    Debug,
    /// `LOG`
    Log,
    /// `INFO`
    Info,
    /// `NOTICE`
    Notice,
    /// `WARNING`
    Warning,
    /// `ERROR`
    Error,
    /// `FATAL`
    Fatal,
    /// `PANIC`
    Panic,
}
286
287impl AsRef<str> for PGRaiseLevel {
288 fn as_ref(&self) -> &str {
289 use PGRaiseLevel::*;
290 match self {
291 Debug => "DEBUG",
292 Log => "LOG",
293 Info => "INFO",
294 Notice => "NOTICE",
295 Warning => "WARNING",
296 Error => "ERROR",
297 Fatal => "FATAL",
298 Panic => "PANIC",
299 }
300 }
301}
302
303impl fmt::Display for PGRaiseLevel {
304 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
305 write!(f, "{}", self.as_ref())
306 }
307}
308
309impl FromStr for PGRaiseLevel {
310 type Err = ();
311 fn from_str(s: &str) -> Result<Self, Self::Err> {
312 match s {
313 "DEBUG" => Ok(PGRaiseLevel::Debug),
314 "LOG" => Ok(PGRaiseLevel::Log),
315 "INFO" => Ok(PGRaiseLevel::Info),
316 "NOTICE" => Ok(PGRaiseLevel::Notice),
317 "WARNING" => Ok(PGRaiseLevel::Warning),
318 "ERROR" => Ok(PGRaiseLevel::Error),
319 "FATAL" => Ok(PGRaiseLevel::Fatal),
320 "PANIC" => Ok(PGRaiseLevel::Panic),
321 _ => Err(()),
322 }
323 }
324}
325
326#[inline(always)]
331fn current_timestamp() -> Timestamp {
332 #[cfg(feature = "chrono")]
333 return Utc::now();
334 #[cfg(not(feature = "chrono"))]
335 return SystemTime::now();
336}
337
338fn format_timestamp(ts: Timestamp) -> String {
339 #[cfg(feature = "chrono")]
340 return ts.to_rfc3339_opts(SecondsFormat::Millis, true);
341
342 #[cfg(not(feature = "chrono"))]
343 {
344 let duration = ts
345 .duration_since(SystemTime::UNIX_EPOCH)
346 .unwrap();
347 let millis = duration.as_millis();
348 return format!("{}", millis);
349 }
350}