postgres_notify/lib.rs
1//!
2//! `postgres-notify` started out as an easy way to receive PostgreSQL
3//! notifications but has since evolved into a much more useful client
4//! and is able to handle the following:
5//!
6//! - Receive `NOTIFY <channel> <payload>` pub/sub style notifications
7//!
8//! - Receive `RAISE` messages and collects execution logs
9//!
10//! - Applies a timeout to all queries. If a query timesout then the
11//! client will attempt to cancel the ongoing query before returning
12//! an error.
13//!
14//! - Supports cancelling an ongoing query.
15//!
16//! - Automatically reconnects if the connection is lost and uses
17//! exponential backoff with jitter to avoid thundering herd effect.
18//!
19//! - Supports an `connect_script`, which can be executed on connect.
20//!
21//! - Has a familiar API with an additional `timeout` argument.
22//!
23//!
24//! # BREAKING CHANGE in v0.3.2
25//!
26//! Configuration is done through the [`PGRobustClientConfig`] struct.
27//!
28//!
29//! # BREAKING CHANGE in v0.3.0
30//!
31//! This latest version is a breaking change. The `PGNotifyingClient` has
32//! been renamed `PGRobustClient` and queries don't need to be made through
33//! the inner client anymore. Furthermore, a single callback handles all
34//! of the notifications: NOTIFY, RAISE, TIMOUT, RECONNECT.
35//!
36//!
37//!
38//! # LISTEN/NOTIFY
39//!
40//! For a very long time (at least since version 7.1) postgres has supported
41//! asynchronous notifications based on LISTEN/NOTIFY commands. This allows
42//! the database to send notifications to the client in an "out-of-band"
43//! channel.
44//!
45//! Once the client has issued a `LISTEN <channel>` command, the database will
46//! send notifications to the client whenever a `NOTIFY <channel> <payload>`
47//! is issued on the database regardless of which session has issued it.
48//! This can act as a cheap alternative to a pub/sub system though without
49//! mailboxes or persistence.
50//!
51//! When calling `subscribe_notify` with a list of channel names, [`PGRobustClient`]
52//! will the client callback any time a `NOTIFY` message is received for any of
53//! the subscribed channels.
54//!
55//! ```rust
56//! use postgres_notify::{PGRobustClientConfig, PGRobustClient, PGMessage};
57//! use tokio_postgres::NoTls;
58//! use std::time::Duration;
59//!
60//! let rt = tokio::runtime::Builder::new_current_thread()
61//! .enable_io()
62//! .enable_time()
63//! .build()
64//! .expect("could not start tokio runtime");
65//!
66//! rt.block_on(async move {
67//!
68//! let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
69//! let config = PGRobustClientConfig::new(database_url, NoTls)
70//! .callback(|msg:PGMessage| println!("{:?}", &msg));
71//!
72//! let mut client = PGRobustClient::spawn(config)
73//! .await.expect("Could not connect to postgres");
74//!
75//! client.subscribe_notify(&["test"], Some(Duration::from_millis(100)))
76//! .await.expect("Could not subscribe to channels");
77//! });
78//! ```
79//!
80//!
81//!
82//! # RAISE/LOGS
83//!
84//! Logs in PostgreSQL are created by writing `RAISE <level> <message>` statements
85//! within your functions, stored procedures and scripts. When such a command is
86//! issued, [`PGRobustClient`] receives a notification even if the call is still
87//! in progress. This allows the caller to capture the execution log in realtime
88//! if needed.
89//!
90//! [`PGRobustClient`] simplifies log collection in two ways. Firstly it provides
91//! the [`with_captured_log`](PGRobustClient::with_captured_log) functions,
92//! which collects the execution log and returns it along with the query result.
93//! This is probably what most people will want to use.
94//!
95//! If your needs are more complex or if you want to propagate realtime logs,
96//! then using client callback can be used to forwand the message on an
97//! asynchonous channel.
98//!
99//! ```rust
100//! use postgres_notify::{PGRobustClient, PGRobustClientConfig, PGMessage};
101//! use tokio_postgres::NoTls;
102//! use std::time::Duration;
103//!
104//! let rt = tokio::runtime::Builder::new_current_thread()
105//! .enable_io()
106//! .enable_time()
107//! .build()
108//! .expect("could not start tokio runtime");
109//!
110//! rt.block_on(async move {
111//!
112//! let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
113//! let config = PGRobustClientConfig::new(database_url, NoTls)
114//! .callback(|msg:PGMessage| println!("{:?}", &msg));
115//!
116//! let mut client = PGRobustClient::spawn(config)
117//! .await.expect("Could not connect to postgres");
118//!
119//! // Will capture the notices in a Vec
120//! let (_, log) = client.with_captured_log(async |client| {
121//! client.simple_query("
122//! do $$
123//! begin
124//! raise debug 'this is a DEBUG notification';
125//! raise log 'this is a LOG notification';
126//! raise info 'this is a INFO notification';
127//! raise notice 'this is a NOTICE notification';
128//! raise warning 'this is a WARNING notification';
129//! end;
130//! $$",
131//! Some(Duration::from_secs(1))
132//! ).await.expect("Error during query execution");
133//! Ok(())
134//! }).await.expect("Error during captur log");
135//!
136//! println!("{:#?}", &log);
137//! });
138//! ```
139//!
140//! Note that the client passed to the async callback is `&mut self`, which
141//! means that all queries within that block are subject to the same timeout
142//! and reconnect handling.
143//!
144//! You can look at the unit tests for a more in-depth example.
145//!
146//!
147//!
148//! # TIMEOUT
149//!
150//! All of the query functions in [`PGRobustClient`] have a `timeout` argument.
151//! If the query takes longer than the timeout, then an error is returned.
152//! If not specified, the default timeout is 1 hour.
153//!
154//!
155//! # RECONNECT
156//!
157//! If the connection to the database is lost, then [`PGRobustClient`] will
158//! attempt to reconnect to the database automatically. If the maximum number
159//! of reconnect attempts is reached then an error is returned. Furthermore,
160//! it uses a exponential backoff with jitter in order to avoid thundering
161//! herd effect.
162//!
163
164mod error;
165mod messages;
166mod notify;
167mod config;
168mod inner;
169
170pub use error::*;
171pub use messages::*;
172use inner::*;
173pub use config::*;
174
175use tokio_postgres::{SimpleQueryMessage, ToStatement};
176
177use {
178 futures::TryFutureExt,
179 std::{
180 time::Duration,
181 },
182 tokio::{
183 time::{sleep, timeout},
184 },
185 tokio_postgres::{
186 Row, RowStream, Socket, Statement, Transaction,
187 tls::MakeTlsConnect,
188 types::{BorrowToSql, ToSql, Type},
189 },
190};
191
192/// Shorthand for Result with tokio_postgres::Error
193pub type PGResult<T> = Result<T, PGError>;
194
195
196
197pub struct PGRobustClient<TLS>
198{
199 config: PGRobustClientConfig<TLS>,
200 inner: PGClient,
201}
202
203#[allow(unused)]
204impl<TLS> PGRobustClient<TLS>
205where
206 TLS: MakeTlsConnect<Socket> + Clone,
207 <TLS as MakeTlsConnect<Socket>>::Stream: Send + Sync + 'static,
208{
209 ///
210 /// Connects to the database and returns a new client.
211 ///
212 pub async fn spawn(config: PGRobustClientConfig<TLS>) -> PGResult<PGRobustClient<TLS>> {
213 let inner = PGClient::connect(&config).await?;
214 Ok(PGRobustClient { config, inner })
215 }
216
217 ///
218 /// Returns a reference to the config object used to create this client.
219 ///
220 pub fn config(&self) -> &PGRobustClientConfig<TLS> {
221 &self.config
222 }
223
224 ///
225 /// Returns a mutable reference to the config object used to create this client.
226 /// Some changes only take effect on the next connection. Others are immediate.
227 ///
228 pub fn config_mut(&mut self) -> &mut PGRobustClientConfig<TLS> {
229 &mut self.config
230 }
231
232 ///
233 /// Cancels any query in-progress.
234 ///
235 /// This is the only function that does not take a timeout nor does it
236 /// attempt to reconnect if the connection is lost. It will simply
237 /// return the original error.
238 ///
239 pub async fn cancel_query(&mut self) -> PGResult<()> {
240 self.inner
241 .cancel_token
242 .cancel_query(self.config.make_tls.clone())
243 .await
244 .map_err(Into::into)
245 }
246
247 ///
248 /// Returns the log messages captured since the last call to this function.
249 /// It also clears the log.
250 ///
251 pub fn capture_and_clear_log(&mut self) -> Vec<PGMessage> {
252 if let Ok(mut guard) = self.inner.log.write() {
253 let empty_log = Vec::default();
254 std::mem::replace(&mut *guard, empty_log)
255 } else {
256 Vec::default()
257 }
258 }
259
260 ///
261 /// Given an async closure taking the postgres client, returns the result
262 /// of said closure along with the accumulated log since the beginning of
263 /// the closure.
264 ///
265 /// If you use query pipelining then collect the logs for all queries in
266 /// the pipeline. Otherwise, the logs might not be what you expect.
267 ///
268 pub async fn with_captured_log<F, T>(&mut self, f: F) -> PGResult<(T, Vec<PGMessage>)>
269 where
270 F: AsyncFn(&mut Self) -> PGResult<T>,
271 {
272 self.capture_and_clear_log(); // clear the log just in case...
273 let result = f(self).await?;
274 let log = self.capture_and_clear_log();
275 Ok((result, log))
276 }
277
278 ///
279 /// Attempts to reconnect after a connection loss.
280 ///
281 /// Reconnection applies an exponention backoff with jitter in order to
282 /// avoid thundering herd effect. If the maximum number of attempts is
283 /// reached then an error is returned.
284 ///
285 /// If an error unrelated to establishing a new connection is returned
286 /// when trying to connect then that error is returned.
287 ///
288 async fn reconnect(&mut self) -> PGResult<()> {
289 //
290 use std::cmp::{max, min};
291 let mut attempts = 1;
292 let mut k = 500;
293
294 while attempts <= self.config.max_reconnect_attempts {
295 //
296 // Implement exponential backoff + jitter
297 // Initial delay will be 500ms, max delay is 1h.
298 //
299 sleep(Duration::from_millis(k + rand::random_range(0..k / 2))).await;
300 k = min(k * 2, 60000);
301
302 tracing::info!("Reconnect attempt #{}", attempts);
303 (self.config.callback)(PGMessage::reconnect(attempts, self.config.max_reconnect_attempts));
304
305 attempts += 1;
306
307 match PGClient::connect(&self.config).await {
308 Ok(inner) => {
309
310 self.inner = inner;
311
312 (self.config.callback)(PGMessage::connected());
313
314 if let Some(sql) = self.config.full_connect_script() {
315 match self.inner.simple_query(&sql).await {
316 Ok(_) => {
317 return Ok(());
318 }
319 Err(e) if is_pg_connection_issue(&e) => {
320 continue;
321 }
322 Err(e) => {
323 return Err(e.into());
324 }
325 }
326 } else {
327 return Ok(());
328 }
329 }
330 Err(e) if e.is_pg_connection_issue() => {
331 continue;
332 }
333 Err(e) => {
334 return Err(e);
335 }
336 }
337 }
338
339 // Issue the failed to reconnect message
340 (self.config.callback)(PGMessage::failed_to_reconnect(self.config.max_reconnect_attempts));
341 // Return the error
342 Err(PGError::FailedToReconnect(self.config.max_reconnect_attempts))
343 }
344
345
346 ///
347 /// Wraps most calls that use the client with a timeout and reconnect loop.
348 ///
349 /// If you loose the connection during a query, the client will automatically
350 /// reconnect and retry the query.
351 ///
352 pub async fn wrap_reconnect<T>(
353 &mut self,
354 max_dur: Option<Duration>,
355 factory: impl AsyncFn(&mut PGClient) -> Result<T, tokio_postgres::Error>,
356 ) -> PGResult<T> {
357 let max_dur = max_dur.unwrap_or(self.config.default_timeout);
358 loop {
359 match timeout(max_dur, factory(&mut self.inner)).await {
360 // Query succeeded so return the result
361 Ok(Ok(o)) => return Ok(o),
362 // Query failed because of connection issues
363 Ok(Err(e)) if is_pg_connection_issue(&e) => {
364 self.reconnect().await?;
365 }
366 // Query failed for some other reason
367 Ok(Err(e)) => {
368 return Err(e.into());
369 }
370 // Query timed out!
371 Err(_) => {
372 // Callback with timeout message
373 (self.config.callback)(PGMessage::timeout(max_dur));
374 // Cancel the ongoing query
375 let status = self.inner.cancel_token.cancel_query(self.config.make_tls.clone()).await;
376 // Callback with cancelled message
377 (self.config.callback)(PGMessage::cancelled(!status.is_err()));
378 // Return the timeout error
379 return Err(PGError::Timeout(max_dur));
380 }
381 }
382 }
383 }
384
385 pub async fn subscribe_notify(
386 &mut self,
387 channels: &[impl AsRef<str> + Send + Sync + 'static],
388 timeout: Option<Duration>,
389 ) -> PGResult<()> {
390
391 if !channels.is_empty() {
392 // Issue the `LISTEN` commands with protection
393 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
394 PGClient::issue_listen(client, channels).await
395 })
396 .await?;
397
398 // Add to our subscriptions
399 self.config.with_subscriptions(channels.iter().map(AsRef::as_ref));
400 }
401 Ok(())
402 }
403
404
405
406 pub async fn unsubscribe_notify(
407 &mut self,
408 channels: &[impl AsRef<str> + Send + Sync + 'static],
409 timeout: Option<Duration>,
410 ) -> PGResult<()> {
411 if !channels.is_empty() {
412 // Issue the `UNLISTEN` commands with protection
413 self.wrap_reconnect(timeout, async move |client: &mut PGClient| {
414 PGClient::issue_unlisten(client, channels).await
415 })
416 .await?;
417
418 // Remove subscriptions
419 self.config.without_subscriptions(channels.iter().map(AsRef::as_ref));
420 }
421 Ok(())
422 }
423
424 ///
425 /// Unsubscribes from all channels.
426 ///
427 pub async fn unsubscribe_notify_all(&mut self, timeout: Option<Duration>) -> PGResult<()> {
428 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
429 // Tell the world we are about to unsubscribe
430 #[cfg(feature = "tracing")]
431 tracing::info!("Unsubscribing from channels: *");
432 // Issue the `UNLISTEN` commands
433 client.simple_query("UNLISTEN *").await?;
434 Ok(())
435 })
436 .await
437 }
438
439
440 /// Like [`Client::execute_raw`].
441 pub async fn execute_raw<P, I, T>(
442 &mut self,
443 statement: &T,
444 params: I,
445 timeout: Option<Duration>,
446 ) -> PGResult<u64>
447 where
448 T: ?Sized + ToStatement + Sync + Send,
449 P: BorrowToSql + Clone + Send + Sync,
450 I: IntoIterator<Item = P> + Sync + Send,
451 I::IntoIter: ExactSizeIterator,
452 {
453 let params: Vec<_> = params.into_iter().collect();
454 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
455 client.execute_raw(statement, params.clone()).await
456 })
457 .await
458 }
459
460 /// Like [`Client::query`].
461 pub async fn query<T>(
462 &mut self,
463 query: &T,
464 params: &[&(dyn ToSql + Sync)],
465 timeout: Option<Duration>,
466 ) -> PGResult<Vec<Row>>
467 where
468 T: ?Sized + ToStatement + Sync + Send,
469 {
470 let params = params.to_vec();
471 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
472 client.query(query, ¶ms).await
473 })
474 .await
475 }
476
477 /// Like [`Client::query_one`].
478 pub async fn query_one<T>(
479 &mut self,
480 statement: &T,
481 params: &[&(dyn ToSql + Sync)],
482 timeout: Option<Duration>,
483 ) -> PGResult<Row>
484 where
485 T: ?Sized + ToStatement + Sync + Send,
486 {
487 let params = params.to_vec();
488 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
489 client.query_one(statement, ¶ms).await
490 })
491 .await
492 }
493
494 /// Like [`Client::query_opt`].
495 pub async fn query_opt<T>(
496 &mut self,
497 statement: &T,
498 params: &[&(dyn ToSql + Sync)],
499 timeout: Option<Duration>,
500 ) -> PGResult<Option<Row>>
501 where
502 T: ?Sized + ToStatement + Sync + Send,
503 {
504 let params = params.to_vec();
505 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
506 client.query_opt(statement, ¶ms).await
507 })
508 .await
509 }
510
511 /// Like [`Client::query_raw`].
512 pub async fn query_raw<T, P, I>(
513 &mut self,
514 statement: &T,
515 params: I,
516 timeout: Option<Duration>,
517 ) -> PGResult<RowStream>
518 where
519 T: ?Sized + ToStatement + Sync + Send,
520 P: BorrowToSql + Clone + Send + Sync,
521 I: IntoIterator<Item = P> + Sync + Send,
522 I::IntoIter: ExactSizeIterator,
523 {
524 let params: Vec<_> = params.into_iter().collect();
525 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
526 client.query_raw(statement, params.clone()).await
527 })
528 .await
529 }
530
531 /// Like [`Client::query_typed`]
532 pub async fn query_typed(
533 &mut self,
534 statement: &str,
535 params: &[(&(dyn ToSql + Sync), Type)],
536 timeout: Option<Duration>,
537 ) -> PGResult<Vec<Row>> {
538 let params = params.to_vec();
539 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
540 client.query_typed(statement, ¶ms).await
541 })
542 .await
543 }
544
545 /// Like [`Client::query_typed_raw`]
546 pub async fn query_typed_raw<P, I>(
547 &mut self,
548 statement: &str,
549 params: I,
550 timeout: Option<Duration>,
551 ) -> PGResult<RowStream>
552 where
553 P: BorrowToSql + Clone + Send + Sync,
554 I: IntoIterator<Item = (P, Type)> + Sync + Send,
555 {
556 let params: Vec<_> = params.into_iter().collect();
557 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
558 client.query_typed_raw(statement, params.clone()).await
559 })
560 .await
561 }
562
563 /// Like [`Client::prepare`].
564 pub async fn prepare(&mut self, query: &str, timeout: Option<Duration>) -> PGResult<Statement> {
565 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
566 client.prepare(query).map_err(Into::into).await
567 })
568 .await
569 }
570
571 /// Like [`Client::prepare_typed`].
572 pub async fn prepare_typed(
573 &mut self,
574 query: &str,
575 parameter_types: &[Type],
576 timeout: Option<Duration>,
577 ) -> PGResult<Statement> {
578 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
579 client.prepare_typed(query, parameter_types).await
580 })
581 .await
582 }
583
584 //
585 /// Similar but not quite the same as [`Client::transaction`].
586 ///
587 /// Executes the closure as a single transaction.
588 /// Commit is automatically called after the closure. If any connection
589 /// issues occur during the transaction then the transaction is rolled
590 /// back (on drop) and retried a new with the new connection subject to
591 /// the maximum number of reconnect attempts.
592 ///
593 pub async fn transaction<F>(&mut self, timeout: Option<Duration>, f: F) -> PGResult<()>
594 where
595 for<'a> F: AsyncFn(&'a mut Transaction) -> Result<(), tokio_postgres::Error>,
596 {
597 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
598 let mut tx = client.transaction().await?;
599 f(&mut tx).await?;
600 tx.commit().await?;
601 Ok(())
602 })
603 .await
604 }
605
606 /// Like [`Client::batch_execute`].
607 pub async fn batch_execute(&mut self, query: &str, timeout: Option<Duration>) -> PGResult<()> {
608 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
609 client.batch_execute(query).await
610 })
611 .await
612 }
613
614 /// Like [`Client::simple_query`].
615 pub async fn simple_query(
616 &mut self,
617 query: &str,
618 timeout: Option<Duration>,
619 ) -> PGResult<Vec<SimpleQueryMessage>> {
620 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
621 client.simple_query(query).await
622 })
623 .await
624 }
625
626 /// Returns a reference to the underlying [`tokio_postgres::Client`].
627 pub fn client(&self) -> &tokio_postgres::Client {
628 &self.inner
629 }
630}
631
632///
633/// Wraps any future in a tokio timeout and maps the Elapsed error to a PGError::Timeout.
634///
635pub async fn wrap_timeout<T>(dur: Duration, fut: impl Future<Output = PGResult<T>>) -> PGResult<T> {
636 match timeout(dur, fut).await {
637 Ok(out) => out,
638 Err(_) => Err(PGError::Timeout(dur)),
639 }
640}
641
642#[cfg(test)]
643mod tests {
644
645 use {
646 super::{PGError, PGMessage, PGRaiseLevel, PGRobustClient, PGRobustClientConfig},
647 insta::*,
648 std::{
649 sync::{Arc, RwLock},
650 time::Duration,
651 },
652 testcontainers::{ImageExt, runners::AsyncRunner},
653 testcontainers_modules::postgres::Postgres,
654 };
655
656 fn sql_for_log_and_notify_test(level: PGRaiseLevel) -> String {
657 format!(
658 r#"
659 set client_min_messages to '{}';
660 do $$
661 begin
662 raise debug 'this is a DEBUG notification';
663 notify test, 'test#1';
664 raise log 'this is a LOG notification';
665 notify test, 'test#2';
666 raise info 'this is a INFO notification';
667 notify test, 'test#3';
668 raise notice 'this is a NOTICE notification';
669 notify test, 'test#4';
670 raise warning 'this is a WARNING notification';
671 notify test, 'test#5';
672 end;
673 $$;
674 "#,
675 level
676 )
677 }
678
679 #[tokio::test]
680 async fn test_integration() {
681 //
682 // --------------------------------------------------------------------
683 // Setup Postgres Server
684 // --------------------------------------------------------------------
685
686 let pg_server = Postgres::default()
687 .with_tag("16.4")
688 .start()
689 .await
690 .expect("could not start postgres server");
691
692 // NOTE: this stuff with Box::leak allows us to create a static string
693 let database_url = format!(
694 "postgres://postgres:postgres@{}:{}/postgres",
695 pg_server.get_host().await.unwrap(),
696 pg_server.get_host_port_ipv4(5432).await.unwrap()
697 );
698
699 // let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
700
701 // --------------------------------------------------------------------
702 // Connect to the server
703 // --------------------------------------------------------------------
704
705 let notices = Arc::new(RwLock::new(Vec::new()));
706 let notices_clone = notices.clone();
707
708 let callback = move |msg: PGMessage| {
709 if let Ok(mut guard) = notices_clone.write() {
710 guard.push(msg.to_string());
711 }
712 };
713
714 let config = PGRobustClientConfig::new(database_url, tokio_postgres::NoTls);
715
716 let mut admin = PGRobustClient::spawn(config.clone())
717 .await
718 .expect("could not create initial client");
719
720 let mut client = PGRobustClient::spawn(config.callback(callback).max_reconnect_attempts(2))
721 .await
722 .expect("could not create initial client");
723
724 // --------------------------------------------------------------------
725 // Subscribe to notify and raise
726 // --------------------------------------------------------------------
727
728 client
729 .subscribe_notify(&["test"], None)
730 .await
731 .expect("could not subscribe");
732
733 let (_, execution_log) = client
734 .with_captured_log(async |client: &mut PGRobustClient<_>| {
735 client
736 .simple_query(&sql_for_log_and_notify_test(PGRaiseLevel::Debug), None)
737 .await
738 })
739 .await
740 .expect("could not execute queries on postgres");
741
742 assert_json_snapshot!("subscribed-executionlog", &execution_log, {
743 "[].timestamp" => "<timestamp>",
744 "[].process_id" => "<pid>",
745 });
746
747 assert_snapshot!("subscribed-notify", extract_and_clear_logs(¬ices));
748
749 // --------------------------------------------------------------------
750 // Unsubscribe
751 // --------------------------------------------------------------------
752
753 client
754 .unsubscribe_notify(&["test"], None)
755 .await
756 .expect("could not unsubscribe");
757
758 let (_, execution_log) = client
759 .with_captured_log(async |client| {
760 client
761 .simple_query(&sql_for_log_and_notify_test(PGRaiseLevel::Warning), None)
762 .await
763 })
764 .await
765 .expect("could not execute queries on postgres");
766
767 assert_json_snapshot!("unsubscribed-executionlog", &execution_log, {
768 "[].timestamp" => "<timestamp>",
769 "[].process_id" => "<pid>",
770 });
771
772 assert_snapshot!("unsubscribed-notify", extract_and_clear_logs(¬ices));
773
774 // --------------------------------------------------------------------
775 // Timeout
776 // --------------------------------------------------------------------
777
778 let result = client
779 .simple_query(
780 "
781 do $$
782 begin
783 raise info 'before sleep';
784 perform pg_sleep(3);
785 raise info 'after sleep';
786 end;
787 $$
788 ",
789 Some(Duration::from_secs(1)),
790 )
791 .await;
792
793 assert!(matches!(result, Err(PGError::Timeout(_))));
794 assert_snapshot!("timeout-messages", extract_and_clear_logs(¬ices));
795
796 // --------------------------------------------------------------------
797 // Reconnect (before query)
798 // --------------------------------------------------------------------
799
800 admin.simple_query("select pg_terminate_backend(pid) from pg_stat_activity where pid != pg_backend_pid()", None)
801 .await.expect("could not kill other client");
802
803 let result = client
804 .simple_query(
805 "
806 do $$
807 begin
808 raise info 'before sleep';
809 perform pg_sleep(1);
810 raise info 'after sleep';
811 end;
812 $$
813 ",
814 Some(Duration::from_secs(10)),
815 )
816 .await;
817
818 assert!(matches!(result, Ok(_)));
819 assert_snapshot!("reconnect-before", extract_and_clear_logs(¬ices));
820
821 // --------------------------------------------------------------------
822 // Reconnect (during query)
823 // --------------------------------------------------------------------
824
825 let query = client.simple_query(
826 "
827 do $$
828 begin
829 raise info 'before sleep';
830 perform pg_sleep(1);
831 raise info 'after sleep';
832 end;
833 $$
834 ",
835 None,
836 );
837
838 let kill_later =
839 admin.simple_query("
840 select pg_sleep(0.5);
841 select pg_terminate_backend(pid) from pg_stat_activity where pid != pg_backend_pid()",
842 None
843 );
844
845 let (_, result) = tokio::join!(kill_later, query);
846
847 assert!(matches!(result, Ok(_)));
848 assert_snapshot!("reconnect-during", extract_and_clear_logs(¬ices));
849
850 // --------------------------------------------------------------------
851 // Reconnect (failure)
852 // --------------------------------------------------------------------
853
854 pg_server.stop().await.expect("could not stop server");
855
856 let result = client.simple_query(
857 "
858 do $$
859 begin
860 raise info 'before sleep';
861 perform pg_sleep(1);
862 raise info 'after sleep';
863 end;
864 $$
865 ",
866 None,
867 ).await;
868
869 eprintln!("result: {result:?}");
870 assert!(matches!(result, Err(PGError::FailedToReconnect(2))));
871 assert_snapshot!("reconnect-failure", extract_and_clear_logs(¬ices));
872
873
874 }
875
876 fn extract_and_clear_logs(logs: &Arc<RwLock<Vec<String>>>) -> String {
877 let mut guard = logs.write().expect("could not read notices");
878 let emtpy_log = Vec::default();
879 let log = std::mem::replace(&mut *guard, emtpy_log);
880 redact_pids(&redact_timestamps(&log.join("\n")))
881 }
882
883 fn redact_timestamps(text: &str) -> String {
884 use regex::Regex;
885 use std::sync::OnceLock;
886 pub static TIMESTAMP_PATTERN: OnceLock<Regex> = OnceLock::new();
887 let pat = TIMESTAMP_PATTERN.get_or_init(|| {
888 Regex::new(r"\d{4}-\d{2}-\d{2}.?\d{2}:\d{2}:\d{2}(\.\d{3,9})?(Z| UTC|[+-]\d{2}:\d{2})?")
889 .unwrap()
890 });
891 pat.replace_all(text, "<timestamp>").to_string()
892 }
893
894 fn redact_pids(text: &str) -> String {
895 use regex::Regex;
896 use std::sync::OnceLock;
897 pub static TIMESTAMP_PATTERN: OnceLock<Regex> = OnceLock::new();
898 let pat = TIMESTAMP_PATTERN.get_or_init(|| Regex::new(r"pid=\d+").unwrap());
899 pat.replace_all(text, "<pid>").to_string()
900 }
901}