postgres_notify/lib.rs
1//!
2//! `postgres-notify` started out as an easy way to receive PostgreSQL
3//! notifications but has since evolved into a much more useful client
4//! and is able to handle the following:
5//!
6//! - Receive `NOTIFY <channel> <payload>` pub/sub style notifications
7//!
8//! - Receive `RAISE` messages and collects execution logs
9//!
10//! - Applies a timeout to all queries. If a query timesout then the
11//! client will attempt to cancel the ongoing query before returning
12//! an error.
13//!
14//! - Supports cancelling an ongoing query.
15//!
16//! - Automatically reconnects if the connection is lost and uses
17//! exponential backoff with jitter to avoid thundering herd effect.
18//!
19//! - Supports an `connect_script`, which can be executed on connect.
20//!
21//! - Has a familiar API with an additional `timeout` argument.
22//!
23//!
24//! # BREAKING CHANGE in v0.3.2
25//!
26//! Configuration is done through the [`PGRobustClientConfig`] struct.
27//!
28//!
29//! # BREAKING CHANGE in v0.3.0
30//!
31//! This latest version is a breaking change. The `PGNotifyingClient` has
32//! been renamed `PGRobustClient` and queries don't need to be made through
33//! the inner client anymore. Furthermore, a single callback handles all
34//! of the notifications: NOTIFY, RAISE, TIMOUT, RECONNECT.
35//!
36//!
37//!
38//! # LISTEN/NOTIFY
39//!
40//! For a very long time (at least since version 7.1) postgres has supported
41//! asynchronous notifications based on LISTEN/NOTIFY commands. This allows
42//! the database to send notifications to the client in an "out-of-band"
43//! channel.
44//!
45//! Once the client has issued a `LISTEN <channel>` command, the database will
46//! send notifications to the client whenever a `NOTIFY <channel> <payload>`
47//! is issued on the database regardless of which session has issued it.
48//! This can act as a cheap alternative to a pub/sub system though without
49//! mailboxes or persistence.
50//!
51//! When calling `subscribe_notify` with a list of channel names, [`PGRobustClient`]
52//! will the client callback any time a `NOTIFY` message is received for any of
53//! the subscribed channels.
54//!
55//! ```rust
56//! use postgres_notify::{PGRobustClientConfig, PGRobustClient, PGMessage};
57//! use tokio_postgres::NoTls;
58//! use std::time::Duration;
59//!
60//! let rt = tokio::runtime::Builder::new_current_thread()
61//! .enable_io()
62//! .enable_time()
63//! .build()
64//! .expect("could not start tokio runtime");
65//!
66//! rt.block_on(async move {
67//!
68//! let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
69//! let config = PGRobustClientConfig::new(database_url, NoTls)
70//! .callback(|msg:PGMessage| println!("{:?}", &msg));
71//!
72//! let mut client = PGRobustClient::spawn(config)
73//! .await.expect("Could not connect to postgres");
74//!
75//! client.subscribe_notify(&["test"], Some(Duration::from_millis(100)))
76//! .await.expect("Could not subscribe to channels");
77//! });
78//! ```
79//!
80//!
81//!
82//! # RAISE/LOGS
83//!
84//! Logs in PostgreSQL are created by writing `RAISE <level> <message>` statements
85//! within your functions, stored procedures and scripts. When such a command is
86//! issued, [`PGRobustClient`] receives a notification even if the call is still
87//! in progress. This allows the caller to capture the execution log in realtime
88//! if needed.
89//!
90//! [`PGRobustClient`] simplifies log collection in two ways. Firstly it provides
91//! the [`with_captured_log`](PGRobustClient::with_captured_log) functions,
92//! which collects the execution log and returns it along with the query result.
93//! This is probably what most people will want to use.
94//!
95//! If your needs are more complex or if you want to propagate realtime logs,
96//! then using client callback can be used to forwand the message on an
97//! asynchonous channel.
98//!
99//! ```rust
100//! use postgres_notify::{PGRobustClient, PGRobustClientConfig, PGMessage};
101//! use tokio_postgres::NoTls;
102//! use std::time::Duration;
103//!
104//! let rt = tokio::runtime::Builder::new_current_thread()
105//! .enable_io()
106//! .enable_time()
107//! .build()
108//! .expect("could not start tokio runtime");
109//!
110//! rt.block_on(async move {
111//!
112//! let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
113//! let config = PGRobustClientConfig::new(database_url, NoTls)
114//! .callback(|msg:PGMessage| println!("{:?}", &msg));
115//!
116//! let mut client = PGRobustClient::spawn(config)
117//! .await.expect("Could not connect to postgres");
118//!
119//! // Will capture the notices in a Vec
120//! let (_, log) = client.with_captured_log(async |client| {
121//! client.simple_query("
122//! do $$
123//! begin
124//! raise debug 'this is a DEBUG notification';
125//! raise log 'this is a LOG notification';
126//! raise info 'this is a INFO notification';
127//! raise notice 'this is a NOTICE notification';
128//! raise warning 'this is a WARNING notification';
129//! end;
130//! $$",
131//! Some(Duration::from_secs(1))
132//! ).await.expect("Error during query execution");
133//! Ok(())
134//! }).await.expect("Error during captur log");
135//!
136//! println!("{:#?}", &log);
137//! });
138//! ```
139//!
140//! Note that the client passed to the async callback is `&mut self`, which
141//! means that all queries within that block are subject to the same timeout
142//! and reconnect handling.
143//!
144//! You can look at the unit tests for a more in-depth example.
145//!
146//!
147//!
148//! # TIMEOUT
149//!
150//! All of the query functions in [`PGRobustClient`] have a `timeout` argument.
151//! If the query takes longer than the timeout, then an error is returned.
152//! If not specified, the default timeout is 1 hour.
153//!
154//!
155//! # RECONNECT
156//!
157//! If the connection to the database is lost, then [`PGRobustClient`] will
158//! attempt to reconnect to the database automatically. If the maximum number
159//! of reconnect attempts is reached then an error is returned. Furthermore,
160//! it uses a exponential backoff with jitter in order to avoid thundering
161//! herd effect.
162//!
163
164mod error;
165mod messages;
166mod notify;
167mod config;
168mod inner;
169
170pub use error::*;
171pub use messages::*;
172use inner::*;
173pub use config::*;
174
175use tokio_postgres::{SimpleQueryMessage, ToStatement};
176
177use {
178 futures::TryFutureExt,
179 std::{
180 time::Duration,
181 },
182 tokio::{
183 time::{sleep, timeout},
184 },
185 tokio_postgres::{
186 Row, RowStream, Socket, Statement, Transaction,
187 tls::MakeTlsConnect,
188 types::{BorrowToSql, ToSql, Type},
189 },
190};
191
192/// Shorthand for Result with tokio_postgres::Error
193pub type PGResult<T> = Result<T, PGError>;
194
195
196
197pub struct PGRobustClient<TLS>
198{
199 config: PGRobustClientConfig<TLS>,
200 inner: PGClient,
201}
202
203#[allow(unused)]
204impl<TLS> PGRobustClient<TLS>
205where
206 TLS: MakeTlsConnect<Socket> + Clone,
207 <TLS as MakeTlsConnect<Socket>>::Stream: Send + Sync + 'static,
208{
209 ///
210 /// Connects to the database and returns a new client.
211 ///
212 pub async fn spawn(config: PGRobustClientConfig<TLS>) -> PGResult<PGRobustClient<TLS>> {
213 let inner = PGClient::connect(&config).await?;
214 Ok(PGRobustClient { config, inner })
215 }
216
217 ///
218 /// Returns a reference to the config object used to create this client.
219 ///
220 pub fn config(&self) -> &PGRobustClientConfig<TLS> {
221 &self.config
222 }
223
224 ///
225 /// Returns a mutable reference to the config object used to create this client.
226 /// Some changes only take effect on the next connection. Others are immediate.
227 ///
228 pub fn config_mut(&mut self) -> &mut PGRobustClientConfig<TLS> {
229 &mut self.config
230 }
231
232 ///
233 /// Cancels any query in-progress.
234 ///
235 /// This is the only function that does not take a timeout nor does it
236 /// attempt to reconnect if the connection is lost. It will simply
237 /// return the original error.
238 ///
239 pub async fn cancel_query(&mut self) -> PGResult<()> {
240 self.inner
241 .cancel_token
242 .cancel_query(self.config.make_tls.clone())
243 .await
244 .map_err(Into::into)
245 }
246
247 ///
248 /// Returns the log messages captured since the last call to this function.
249 /// It also clears the log.
250 ///
251 pub fn capture_and_clear_log(&mut self) -> Vec<PGMessage> {
252 if let Ok(mut guard) = self.inner.log.write() {
253 let empty_log = Vec::default();
254 std::mem::replace(&mut *guard, empty_log)
255 } else {
256 Vec::default()
257 }
258 }
259
260 ///
261 /// Given an async closure taking the postgres client, returns the result
262 /// of said closure along with the accumulated log since the beginning of
263 /// the closure.
264 ///
265 /// If you use query pipelining then collect the logs for all queries in
266 /// the pipeline. Otherwise, the logs might not be what you expect.
267 ///
268 pub async fn with_captured_log<F, T>(&mut self, f: F) -> PGResult<(T, Vec<PGMessage>)>
269 where
270 F: AsyncFn(&mut Self) -> PGResult<T>,
271 {
272 self.capture_and_clear_log(); // clear the log just in case...
273 let result = f(self).await?;
274 let log = self.capture_and_clear_log();
275 Ok((result, log))
276 }
277
278 ///
279 /// Attempts to reconnect after a connection loss.
280 ///
281 /// Reconnection applies an exponention backoff with jitter in order to
282 /// avoid thundering herd effect. If the maximum number of attempts is
283 /// reached then an error is returned.
284 ///
285 /// If an error unrelated to establishing a new connection is returned
286 /// when trying to connect then that error is returned.
287 ///
288 async fn reconnect(&mut self) -> PGResult<()> {
289 //
290 use std::cmp::{max, min};
291 let mut attempts = 1;
292 let mut k = 500;
293
294 while attempts <= self.config.max_reconnect_attempts {
295 //
296 // Implement exponential backoff + jitter
297 // Initial delay will be 500ms, max delay is 1h.
298 //
299 sleep(Duration::from_millis(k + rand::random_range(0..k / 2))).await;
300 k = min(k * 2, 60000);
301
302 tracing::info!("Reconnect attempt #{}", attempts);
303 (self.config.callback)(PGMessage::reconnect(attempts, self.config.max_reconnect_attempts));
304
305 attempts += 1;
306
307 match PGClient::connect(&self.config).await {
308 Ok(inner) => {
309
310 self.inner = inner;
311
312 let subs: Vec<_> = self.config.subscriptions.iter().map(String::to_owned).collect();
313 match self.inner.issue_listen(&subs).await {
314 Ok(_) => {
315 return Ok(());
316 }
317 Err(e) if is_pg_connection_issue(&e) => {
318 continue;
319 }
320 Err(e) => {
321 return Err(e.into());
322 }
323 }
324 }
325 Err(e) if e.is_pg_connection_issue() => {
326 continue;
327 }
328 Err(e) => {
329 return Err(e);
330 }
331 }
332 }
333
334 // Issue the failed to reconnect message
335 (self.config.callback)(PGMessage::failed_to_reconnect(self.config.max_reconnect_attempts));
336 // Return the error
337 Err(PGError::FailedToReconnect(self.config.max_reconnect_attempts))
338 }
339
340
341 ///
342 /// Wraps most calls that use the client with a timeout and reconnect loop.
343 ///
344 /// If you loose the connection during a query, the client will automatically
345 /// reconnect and retry the query.
346 ///
347 pub async fn wrap_reconnect<T>(
348 &mut self,
349 max_dur: Option<Duration>,
350 factory: impl AsyncFn(&mut PGClient) -> Result<T, tokio_postgres::Error>,
351 ) -> PGResult<T> {
352 let max_dur = max_dur.unwrap_or(self.config.default_timeout);
353 loop {
354 match timeout(max_dur, factory(&mut self.inner)).await {
355 // Query succeeded so return the result
356 Ok(Ok(o)) => return Ok(o),
357 // Query failed because of connection issues
358 Ok(Err(e)) if is_pg_connection_issue(&e) => {
359 self.reconnect().await?;
360 }
361 // Query failed for some other reason
362 Ok(Err(e)) => {
363 return Err(e.into());
364 }
365 // Query timed out!
366 Err(_) => {
367 // Callback with timeout message
368 (self.config.callback)(PGMessage::timeout(max_dur));
369 // Cancel the ongoing query
370 let status = self.inner.cancel_token.cancel_query(self.config.make_tls.clone()).await;
371 // Callback with cancelled message
372 (self.config.callback)(PGMessage::cancelled(!status.is_err()));
373 // Return the timeout error
374 return Err(PGError::Timeout(max_dur));
375 }
376 }
377 }
378 }
379
380 pub async fn subscribe_notify(
381 &mut self,
382 channels: &[impl AsRef<str> + Send + Sync + 'static],
383 timeout: Option<Duration>,
384 ) -> PGResult<()> {
385
386 if !channels.is_empty() {
387 // Issue the `LISTEN` commands with protection
388 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
389 PGClient::issue_listen(client, channels).await
390 })
391 .await?;
392
393 // Add to our subscriptions
394 self.config.with_subscriptions(channels.iter().map(AsRef::as_ref));
395 }
396 Ok(())
397 }
398
399
400
401 pub async fn unsubscribe_notify(
402 &mut self,
403 channels: &[impl AsRef<str> + Send + Sync + 'static],
404 timeout: Option<Duration>,
405 ) -> PGResult<()> {
406 if !channels.is_empty() {
407 // Issue the `UNLISTEN` commands with protection
408 self.wrap_reconnect(timeout, async move |client: &mut PGClient| {
409 PGClient::issue_unlisten(client, channels).await
410 })
411 .await?;
412
413 // Remove subscriptions
414 self.config.without_subscriptions(channels.iter().map(AsRef::as_ref));
415 }
416 Ok(())
417 }
418
419 ///
420 /// Unsubscribes from all channels.
421 ///
422 pub async fn unsubscribe_notify_all(&mut self, timeout: Option<Duration>) -> PGResult<()> {
423 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
424 // Tell the world we are about to unsubscribe
425 #[cfg(feature = "tracing")]
426 tracing::info!("Unsubscribing from channels: *");
427 // Issue the `UNLISTEN` commands
428 client.simple_query("UNLISTEN *").await?;
429 Ok(())
430 })
431 .await
432 }
433
434
435 /// Like [`Client::execute_raw`].
436 pub async fn execute_raw<P, I, T>(
437 &mut self,
438 statement: &T,
439 params: I,
440 timeout: Option<Duration>,
441 ) -> PGResult<u64>
442 where
443 T: ?Sized + ToStatement + Sync + Send,
444 P: BorrowToSql + Clone + Send + Sync,
445 I: IntoIterator<Item = P> + Sync + Send,
446 I::IntoIter: ExactSizeIterator,
447 {
448 let params: Vec<_> = params.into_iter().collect();
449 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
450 client.execute_raw(statement, params.clone()).await
451 })
452 .await
453 }
454
455 /// Like [`Client::query`].
456 pub async fn query<T>(
457 &mut self,
458 query: &T,
459 params: &[&(dyn ToSql + Sync)],
460 timeout: Option<Duration>,
461 ) -> PGResult<Vec<Row>>
462 where
463 T: ?Sized + ToStatement + Sync + Send,
464 {
465 let params = params.to_vec();
466 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
467 client.query(query, ¶ms).await
468 })
469 .await
470 }
471
472 /// Like [`Client::query_one`].
473 pub async fn query_one<T>(
474 &mut self,
475 statement: &T,
476 params: &[&(dyn ToSql + Sync)],
477 timeout: Option<Duration>,
478 ) -> PGResult<Row>
479 where
480 T: ?Sized + ToStatement + Sync + Send,
481 {
482 let params = params.to_vec();
483 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
484 client.query_one(statement, ¶ms).await
485 })
486 .await
487 }
488
489 /// Like [`Client::query_opt`].
490 pub async fn query_opt<T>(
491 &mut self,
492 statement: &T,
493 params: &[&(dyn ToSql + Sync)],
494 timeout: Option<Duration>,
495 ) -> PGResult<Option<Row>>
496 where
497 T: ?Sized + ToStatement + Sync + Send,
498 {
499 let params = params.to_vec();
500 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
501 client.query_opt(statement, ¶ms).await
502 })
503 .await
504 }
505
506 /// Like [`Client::query_raw`].
507 pub async fn query_raw<T, P, I>(
508 &mut self,
509 statement: &T,
510 params: I,
511 timeout: Option<Duration>,
512 ) -> PGResult<RowStream>
513 where
514 T: ?Sized + ToStatement + Sync + Send,
515 P: BorrowToSql + Clone + Send + Sync,
516 I: IntoIterator<Item = P> + Sync + Send,
517 I::IntoIter: ExactSizeIterator,
518 {
519 let params: Vec<_> = params.into_iter().collect();
520 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
521 client.query_raw(statement, params.clone()).await
522 })
523 .await
524 }
525
526 /// Like [`Client::query_typed`]
527 pub async fn query_typed(
528 &mut self,
529 statement: &str,
530 params: &[(&(dyn ToSql + Sync), Type)],
531 timeout: Option<Duration>,
532 ) -> PGResult<Vec<Row>> {
533 let params = params.to_vec();
534 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
535 client.query_typed(statement, ¶ms).await
536 })
537 .await
538 }
539
540 /// Like [`Client::query_typed_raw`]
541 pub async fn query_typed_raw<P, I>(
542 &mut self,
543 statement: &str,
544 params: I,
545 timeout: Option<Duration>,
546 ) -> PGResult<RowStream>
547 where
548 P: BorrowToSql + Clone + Send + Sync,
549 I: IntoIterator<Item = (P, Type)> + Sync + Send,
550 {
551 let params: Vec<_> = params.into_iter().collect();
552 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
553 client.query_typed_raw(statement, params.clone()).await
554 })
555 .await
556 }
557
558 /// Like [`Client::prepare`].
559 pub async fn prepare(&mut self, query: &str, timeout: Option<Duration>) -> PGResult<Statement> {
560 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
561 client.prepare(query).map_err(Into::into).await
562 })
563 .await
564 }
565
566 /// Like [`Client::prepare_typed`].
567 pub async fn prepare_typed(
568 &mut self,
569 query: &str,
570 parameter_types: &[Type],
571 timeout: Option<Duration>,
572 ) -> PGResult<Statement> {
573 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
574 client.prepare_typed(query, parameter_types).await
575 })
576 .await
577 }
578
579 //
580 /// Similar but not quite the same as [`Client::transaction`].
581 ///
582 /// Executes the closure as a single transaction.
583 /// Commit is automatically called after the closure. If any connection
584 /// issues occur during the transaction then the transaction is rolled
585 /// back (on drop) and retried a new with the new connection subject to
586 /// the maximum number of reconnect attempts.
587 ///
588 pub async fn transaction<F>(&mut self, timeout: Option<Duration>, f: F) -> PGResult<()>
589 where
590 for<'a> F: AsyncFn(&'a mut Transaction) -> Result<(), tokio_postgres::Error>,
591 {
592 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
593 let mut tx = client.transaction().await?;
594 f(&mut tx).await?;
595 tx.commit().await?;
596 Ok(())
597 })
598 .await
599 }
600
601 /// Like [`Client::batch_execute`].
602 pub async fn batch_execute(&mut self, query: &str, timeout: Option<Duration>) -> PGResult<()> {
603 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
604 client.batch_execute(query).await
605 })
606 .await
607 }
608
609 /// Like [`Client::simple_query`].
610 pub async fn simple_query(
611 &mut self,
612 query: &str,
613 timeout: Option<Duration>,
614 ) -> PGResult<Vec<SimpleQueryMessage>> {
615 self.wrap_reconnect(timeout, async |client: &mut PGClient| {
616 client.simple_query(query).await
617 })
618 .await
619 }
620
621 /// Returns a reference to the underlying [`tokio_postgres::Client`].
622 pub fn client(&self) -> &tokio_postgres::Client {
623 &self.inner
624 }
625}
626
627///
628/// Wraps any future in a tokio timeout and maps the Elapsed error to a PGError::Timeout.
629///
630pub async fn wrap_timeout<T>(dur: Duration, fut: impl Future<Output = PGResult<T>>) -> PGResult<T> {
631 match timeout(dur, fut).await {
632 Ok(out) => out,
633 Err(_) => Err(PGError::Timeout(dur)),
634 }
635}
636
637#[cfg(test)]
638mod tests {
639
640 use {
641 super::{PGError, PGMessage, PGRaiseLevel, PGRobustClient, PGRobustClientConfig},
642 insta::*,
643 std::{
644 sync::{Arc, RwLock},
645 time::Duration,
646 },
647 testcontainers::{ImageExt, runners::AsyncRunner},
648 testcontainers_modules::postgres::Postgres,
649 };
650
651 fn sql_for_log_and_notify_test(level: PGRaiseLevel) -> String {
652 format!(
653 r#"
654 set client_min_messages to '{}';
655 do $$
656 begin
657 raise debug 'this is a DEBUG notification';
658 notify test, 'test#1';
659 raise log 'this is a LOG notification';
660 notify test, 'test#2';
661 raise info 'this is a INFO notification';
662 notify test, 'test#3';
663 raise notice 'this is a NOTICE notification';
664 notify test, 'test#4';
665 raise warning 'this is a WARNING notification';
666 notify test, 'test#5';
667 end;
668 $$;
669 "#,
670 level
671 )
672 }
673
674 #[tokio::test]
675 async fn test_integration() {
676 //
677 // --------------------------------------------------------------------
678 // Setup Postgres Server
679 // --------------------------------------------------------------------
680
681 let pg_server = Postgres::default()
682 .with_tag("16.4")
683 .start()
684 .await
685 .expect("could not start postgres server");
686
687 // NOTE: this stuff with Box::leak allows us to create a static string
688 let database_url = format!(
689 "postgres://postgres:postgres@{}:{}/postgres",
690 pg_server.get_host().await.unwrap(),
691 pg_server.get_host_port_ipv4(5432).await.unwrap()
692 );
693
694 // let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
695
696 // --------------------------------------------------------------------
697 // Connect to the server
698 // --------------------------------------------------------------------
699
700 let notices = Arc::new(RwLock::new(Vec::new()));
701 let notices_clone = notices.clone();
702
703 let callback = move |msg: PGMessage| {
704 if let Ok(mut guard) = notices_clone.write() {
705 guard.push(msg.to_string());
706 }
707 };
708
709 let config = PGRobustClientConfig::new(database_url, tokio_postgres::NoTls);
710
711 let mut admin = PGRobustClient::spawn(config.clone())
712 .await
713 .expect("could not create initial client");
714
715 let mut client = PGRobustClient::spawn(config.callback(callback).max_reconnect_attempts(2))
716 .await
717 .expect("could not create initial client");
718
719 // --------------------------------------------------------------------
720 // Subscribe to notify and raise
721 // --------------------------------------------------------------------
722
723 client
724 .subscribe_notify(&["test"], None)
725 .await
726 .expect("could not subscribe");
727
728 let (_, execution_log) = client
729 .with_captured_log(async |client: &mut PGRobustClient<_>| {
730 client
731 .simple_query(&sql_for_log_and_notify_test(PGRaiseLevel::Debug), None)
732 .await
733 })
734 .await
735 .expect("could not execute queries on postgres");
736
737 assert_json_snapshot!("subscribed-executionlog", &execution_log, {
738 "[].timestamp" => "<timestamp>",
739 "[].process_id" => "<pid>",
740 });
741
742 assert_snapshot!("subscribed-notify", extract_and_clear_logs(¬ices));
743
744 // --------------------------------------------------------------------
745 // Unsubscribe
746 // --------------------------------------------------------------------
747
748 client
749 .unsubscribe_notify(&["test"], None)
750 .await
751 .expect("could not unsubscribe");
752
753 let (_, execution_log) = client
754 .with_captured_log(async |client| {
755 client
756 .simple_query(&sql_for_log_and_notify_test(PGRaiseLevel::Warning), None)
757 .await
758 })
759 .await
760 .expect("could not execute queries on postgres");
761
762 assert_json_snapshot!("unsubscribed-executionlog", &execution_log, {
763 "[].timestamp" => "<timestamp>",
764 "[].process_id" => "<pid>",
765 });
766
767 assert_snapshot!("unsubscribed-notify", extract_and_clear_logs(¬ices));
768
769 // --------------------------------------------------------------------
770 // Timeout
771 // --------------------------------------------------------------------
772
773 let result = client
774 .simple_query(
775 "
776 do $$
777 begin
778 raise info 'before sleep';
779 perform pg_sleep(3);
780 raise info 'after sleep';
781 end;
782 $$
783 ",
784 Some(Duration::from_secs(1)),
785 )
786 .await;
787
788 assert!(matches!(result, Err(PGError::Timeout(_))));
789 assert_snapshot!("timeout-messages", extract_and_clear_logs(¬ices));
790
791 // --------------------------------------------------------------------
792 // Reconnect (before query)
793 // --------------------------------------------------------------------
794
795 admin.simple_query("select pg_terminate_backend(pid) from pg_stat_activity where pid != pg_backend_pid()", None)
796 .await.expect("could not kill other client");
797
798 let result = client
799 .simple_query(
800 "
801 do $$
802 begin
803 raise info 'before sleep';
804 perform pg_sleep(1);
805 raise info 'after sleep';
806 end;
807 $$
808 ",
809 Some(Duration::from_secs(10)),
810 )
811 .await;
812
813 assert!(matches!(result, Ok(_)));
814 assert_snapshot!("reconnect-before", extract_and_clear_logs(¬ices));
815
816 // --------------------------------------------------------------------
817 // Reconnect (during query)
818 // --------------------------------------------------------------------
819
820 let query = client.simple_query(
821 "
822 do $$
823 begin
824 raise info 'before sleep';
825 perform pg_sleep(1);
826 raise info 'after sleep';
827 end;
828 $$
829 ",
830 None,
831 );
832
833 let kill_later =
834 admin.simple_query("
835 select pg_sleep(0.5);
836 select pg_terminate_backend(pid) from pg_stat_activity where pid != pg_backend_pid()",
837 None
838 );
839
840 let (_, result) = tokio::join!(kill_later, query);
841
842 assert!(matches!(result, Ok(_)));
843 assert_snapshot!("reconnect-during", extract_and_clear_logs(¬ices));
844
845 // --------------------------------------------------------------------
846 // Reconnect (failure)
847 // --------------------------------------------------------------------
848
849 pg_server.stop().await.expect("could not stop server");
850
851 let result = client.simple_query(
852 "
853 do $$
854 begin
855 raise info 'before sleep';
856 perform pg_sleep(1);
857 raise info 'after sleep';
858 end;
859 $$
860 ",
861 None,
862 ).await;
863
864 eprintln!("result: {result:?}");
865 assert!(matches!(result, Err(PGError::FailedToReconnect(2))));
866 assert_snapshot!("reconnect-failure", extract_and_clear_logs(¬ices));
867
868
869 }
870
871 fn extract_and_clear_logs(logs: &Arc<RwLock<Vec<String>>>) -> String {
872 let mut guard = logs.write().expect("could not read notices");
873 let emtpy_log = Vec::default();
874 let log = std::mem::replace(&mut *guard, emtpy_log);
875 redact_pids(&redact_timestamps(&log.join("\n")))
876 }
877
878 fn redact_timestamps(text: &str) -> String {
879 use regex::Regex;
880 use std::sync::OnceLock;
881 pub static TIMESTAMP_PATTERN: OnceLock<Regex> = OnceLock::new();
882 let pat = TIMESTAMP_PATTERN.get_or_init(|| {
883 Regex::new(r"\d{4}-\d{2}-\d{2}.?\d{2}:\d{2}:\d{2}(\.\d{3,9})?(Z| UTC|[+-]\d{2}:\d{2})?")
884 .unwrap()
885 });
886 pat.replace_all(text, "<timestamp>").to_string()
887 }
888
889 fn redact_pids(text: &str) -> String {
890 use regex::Regex;
891 use std::sync::OnceLock;
892 pub static TIMESTAMP_PATTERN: OnceLock<Regex> = OnceLock::new();
893 let pat = TIMESTAMP_PATTERN.get_or_init(|| Regex::new(r"pid=\d+").unwrap());
894 pat.replace_all(text, "<pid>").to_string()
895 }
896}