postgres_notify/
lib.rs

1//!
2//! `postgres-notify` started out as an easy way to receive PostgreSQL
3//! notifications but has since evolved into a much more useful client
4//! and is able to handle the following:
5//!
6//! - Receives `NOTIFY <channel> <payload>` pub/sub style notifications
7//!
8//! - Receives `RAISE` messages and collects execution logs
9//!
10//! - Applies a timeout to all queries. If a query times out then the
11//!   client will attempt to cancel the ongoing query before returning
12//!   an error.
13//!
14//! - Supports cancelling an ongoing query.
15//!
16//! - Automatically reconnects if the connection is lost and uses
17//!   exponential backoff with jitter to avoid thundering herd effect.
18//!
19//! - Supports a `connect_script`, which can be executed on connect.
20//! 
21//! - Has a familiar API with an additional `timeout` argument.
22//!
23//!
24//! # BREAKING CHANGE in v0.3.2
25//!
26//! Configuration is done through the [`PGRobustClientConfig`] struct.
27//! 
28//!
29//! # BREAKING CHANGE in v0.3.0
30//!
31//! This latest version is a breaking change. The `PGNotifyingClient` has
32//! been renamed `PGRobustClient` and queries don't need to be made through
33//! the inner client anymore. Furthermore, a single callback handles all
34//! of the notifications: NOTIFY, RAISE, TIMEOUT, RECONNECT.
35//!
36//!
37//!
38//! # LISTEN/NOTIFY
39//!
40//! For a very long time (at least since version 7.1) postgres has supported
41//! asynchronous notifications based on LISTEN/NOTIFY commands. This allows
42//! the database to send notifications to the client in an "out-of-band"
43//! channel.
44//!
45//! Once the client has issued a `LISTEN <channel>` command, the database will
46//! send notifications to the client whenever a `NOTIFY <channel> <payload>`
47//! is issued on the database regardless of which session has issued it.
48//! This can act as a cheap alternative to a pub/sub system though without
49//! mailboxes or persistence.
50//!
51//! When `subscribe_notify` is called with a list of channel names, [`PGRobustClient`]
52//! will invoke the client callback any time a `NOTIFY` message is received for any of
53//! the subscribed channels.
54//!
55//! ```rust
56//! use postgres_notify::{PGRobustClientConfig, PGRobustClient, PGMessage};
57//! use tokio_postgres::NoTls;
58//! use std::time::Duration;
59//!
60//! let rt = tokio::runtime::Builder::new_current_thread()
61//!     .enable_io()
62//!     .enable_time()
63//!     .build()
64//!     .expect("could not start tokio runtime");
65//!
66//! rt.block_on(async move {
67//!     
68//!     let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
69//!     let config = PGRobustClientConfig::new(database_url, NoTls)
70//!         .callback(|msg:PGMessage| println!("{:?}", &msg));
71//!
72//!     let mut client = PGRobustClient::spawn(config)
73//!         .await.expect("Could not connect to postgres");
74//!
75//!     client.subscribe_notify(&["test"], Some(Duration::from_millis(100)))
76//!         .await.expect("Could not subscribe to channels");
77//! });
78//! ```
79//!
80//!
81//!
82//! # RAISE/LOGS
83//!
84//! Logs in PostgreSQL are created by writing `RAISE <level> <message>` statements
85//! within your functions, stored procedures and scripts. When such a command is
86//! issued, [`PGRobustClient`] receives a notification even if the call is still
87//! in progress. This allows the caller to capture the execution log in realtime
88//! if needed.
89//!
90//! [`PGRobustClient`] simplifies log collection in two ways. Firstly it provides
91//! the [`with_captured_log`](PGRobustClient::with_captured_log) functions,
92//! which collects the execution log and returns it along with the query result.
93//! This is probably what most people will want to use.
94//!
95//! If your needs are more complex or if you want to propagate realtime logs,
96//! then the client callback can be used to forward the messages on an
97//! asynchronous channel.
98//!
99//! ```rust
100//! use postgres_notify::{PGRobustClient, PGRobustClientConfig, PGMessage};
101//! use tokio_postgres::NoTls;
102//! use std::time::Duration;
103//!
104//! let rt = tokio::runtime::Builder::new_current_thread()
105//!     .enable_io()
106//!     .enable_time()
107//!     .build()
108//!     .expect("could not start tokio runtime");
109//!
110//! rt.block_on(async move {
111//!
112//!     let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
113//!     let config = PGRobustClientConfig::new(database_url, NoTls)
114//!         .callback(|msg:PGMessage| println!("{:?}", &msg));
115//! 
116//!     let mut client = PGRobustClient::spawn(config)
117//!         .await.expect("Could not connect to postgres");
118//!
119//!     // Will capture the notices in a Vec
120//!     let (_, log) = client.with_captured_log(async |client| {
121//!         client.simple_query("
122//!             do $$
123//!             begin
124//!                 raise debug 'this is a DEBUG notification';
125//!                 raise log 'this is a LOG notification';
126//!                 raise info 'this is a INFO notification';
127//!                 raise notice 'this is a NOTICE notification';
128//!                 raise warning 'this is a WARNING notification';
129//!             end;
130//!             $$",
131//!             Some(Duration::from_secs(1))
132//!         ).await.expect("Error during query execution");
133//!         Ok(())
134//!     }).await.expect("Error during capture log");
135//!
136//!     println!("{:#?}", &log);
137//!  });
138//! ```
139//!
140//! Note that the client passed to the async callback is `&mut self`, which
141//! means that all queries within that block are subject to the same timeout
142//! and reconnect handling.
143//!
144//! You can look at the unit tests for a more in-depth example.
145//!
146//!
147//!
148//! # TIMEOUT
149//!
150//! All of the query functions in [`PGRobustClient`] have a `timeout` argument.
151//! If the query takes longer than the timeout, then an error is returned.
152//! If not specified, the default timeout is 1 hour.
153//!
154//!
155//! # RECONNECT
156//!
157//! If the connection to the database is lost, then [`PGRobustClient`] will
158//! attempt to reconnect to the database automatically. If the maximum number
159//! of reconnect attempts is reached then an error is returned. Furthermore,
160//! it uses an exponential backoff with jitter in order to avoid thundering
161//! herd effect.
162//!
163
164mod error;
165mod messages;
166mod notify;
167mod config;
168mod inner;
169
170pub use error::*;
171pub use messages::*;
172use inner::*;
173pub use config::*;
174
175use tokio_postgres::{SimpleQueryMessage, ToStatement};
176
177use {
178    futures::TryFutureExt,
179    std::{
180        time::Duration,
181    },
182    tokio::{
183        time::{sleep, timeout},
184    },
185    tokio_postgres::{
186        Row, RowStream, Socket, Statement, Transaction,
187        tls::MakeTlsConnect,
188        types::{BorrowToSql, ToSql, Type},
189    },
190};
191
192/// Shorthand for Result with tokio_postgres::Error
193pub type PGResult<T> = Result<T, PGError>;
194
195
196
197pub struct PGRobustClient<TLS>
198{
199    config: PGRobustClientConfig<TLS>,
200    inner: PGClient,
201}
202
203#[allow(unused)]
204impl<TLS> PGRobustClient<TLS>
205where
206    TLS: MakeTlsConnect<Socket> + Clone,
207    <TLS as MakeTlsConnect<Socket>>::Stream: Send + Sync + 'static,
208{
209    ///
210    /// Connects to the database and returns a new client.
211    /// 
212    pub async fn spawn(config: PGRobustClientConfig<TLS>) -> PGResult<PGRobustClient<TLS>> {
213        let inner = PGClient::connect(&config).await?;
214        Ok(PGRobustClient { config, inner })
215    }
216
217    ///
218    /// Returns a reference to the config object used to create this client.
219    /// 
220    pub fn config(&self) -> &PGRobustClientConfig<TLS> {
221        &self.config
222    }
223
224    ///
225    /// Returns a mutable reference to the config object used to create this client.
226    /// Some changes only take effect on the next connection. Others are immediate.
227    ///
228    pub fn config_mut(&mut self) -> &mut PGRobustClientConfig<TLS> {
229        &mut self.config
230    }   
231    
232    ///
233    /// Cancels any query in-progress.
234    ///
235    /// This is the only function that does not take a timeout nor does it
236    /// attempt to reconnect if the connection is lost. It will simply
237    /// return the original error.
238    ///
239    pub async fn cancel_query(&mut self) -> PGResult<()> {
240        self.inner
241            .cancel_token
242            .cancel_query(self.config.make_tls.clone())
243            .await
244            .map_err(Into::into)
245    }
246
247    ///
248    /// Returns the log messages captured since the last call to this function.
249    /// It also clears the log.
250    ///
251    pub fn capture_and_clear_log(&mut self) -> Vec<PGMessage> {
252        if let Ok(mut guard) = self.inner.log.write() {
253            let empty_log = Vec::default();
254            std::mem::replace(&mut *guard, empty_log)
255        } else {
256            Vec::default()
257        }
258    }
259
260    ///
261    /// Given an async closure taking the postgres client, returns the result
262    /// of said closure along with the accumulated log since the beginning of
263    /// the closure.
264    ///
265    /// If you use query pipelining then collect the logs for all queries in
266    /// the pipeline. Otherwise, the logs might not be what you expect.
267    ///
268    pub async fn with_captured_log<F, T>(&mut self, f: F) -> PGResult<(T, Vec<PGMessage>)>
269    where
270        F: AsyncFn(&mut Self) -> PGResult<T>,
271    {
272        self.capture_and_clear_log(); // clear the log just in case...
273        let result = f(self).await?;
274        let log = self.capture_and_clear_log();
275        Ok((result, log))
276    }
277
278    ///
279    /// Attempts to reconnect after a connection loss.
280    ///
281    /// Reconnection applies an exponention backoff with jitter in order to
282    /// avoid thundering herd effect. If the maximum number of attempts is
283    /// reached then an error is returned.
284    ///
285    /// If an error unrelated to establishing a new connection is returned
286    /// when trying to connect then that error is returned.
287    ///
288    async fn reconnect(&mut self) -> PGResult<()> {
289        //
290        use std::cmp::{max, min};
291        let mut attempts = 1;
292        let mut k = 500;
293
294        while attempts <= self.config.max_reconnect_attempts {
295            //
296            // Implement exponential backoff + jitter
297            // Initial delay will be 500ms, max delay is 1h.
298            //
299            sleep(Duration::from_millis(k + rand::random_range(0..k / 2))).await;
300            k = min(k * 2, 60000);
301
302            tracing::info!("Reconnect attempt #{}", attempts);
303            (self.config.callback)(PGMessage::reconnect(attempts, self.config.max_reconnect_attempts));
304
305            attempts += 1;
306
307            match PGClient::connect(&self.config).await {
308                Ok(inner) => {
309
310                    self.inner = inner;
311
312                    let subs: Vec<_> = self.config.subscriptions.iter().map(String::to_owned).collect();
313                    match self.inner.issue_listen(&subs).await {
314                        Ok(_) => {
315                            return Ok(());
316                        }
317                        Err(e) if is_pg_connection_issue(&e) => {
318                            continue;
319                        }
320                        Err(e) => {
321                            return Err(e.into());
322                        }
323                    }
324                }
325                Err(e) if e.is_pg_connection_issue() => {
326                    continue;
327                }
328                Err(e) => {
329                    return Err(e);
330                }
331            }
332        }
333
334        // Issue the failed to reconnect message
335        (self.config.callback)(PGMessage::failed_to_reconnect(self.config.max_reconnect_attempts));
336        // Return the error
337        Err(PGError::FailedToReconnect(self.config.max_reconnect_attempts))
338    }
339
340
341    ///
342    /// Wraps most calls that use the client with a timeout and reconnect loop.
343    /// 
344    /// If you loose the connection during a query, the client will automatically 
345    /// reconnect and retry the query.
346    /// 
347    pub async fn wrap_reconnect<T>(
348        &mut self,
349        max_dur: Option<Duration>,
350        factory: impl AsyncFn(&mut PGClient) -> Result<T, tokio_postgres::Error>,
351    ) -> PGResult<T> {
352        let max_dur = max_dur.unwrap_or(self.config.default_timeout);
353        loop {
354            match timeout(max_dur, factory(&mut self.inner)).await {
355                // Query succeeded so return the result
356                Ok(Ok(o)) => return Ok(o),
357                // Query failed because of connection issues
358                Ok(Err(e)) if is_pg_connection_issue(&e) => {
359                    self.reconnect().await?;
360                }
361                // Query failed for some other reason
362                Ok(Err(e)) => {
363                    return Err(e.into());
364                }
365                // Query timed out!
366                Err(_) => {
367                    // Callback with timeout message
368                    (self.config.callback)(PGMessage::timeout(max_dur));
369                    // Cancel the ongoing query
370                    let status = self.inner.cancel_token.cancel_query(self.config.make_tls.clone()).await;
371                    // Callback with cancelled message
372                    (self.config.callback)(PGMessage::cancelled(!status.is_err()));
373                    // Return the timeout error
374                    return Err(PGError::Timeout(max_dur));
375                }
376            }
377        }
378    }
379
380    pub async fn subscribe_notify(
381        &mut self,
382        channels: &[impl AsRef<str> + Send + Sync + 'static],
383        timeout: Option<Duration>,
384    ) -> PGResult<()> {
385
386        if !channels.is_empty() {
387            // Issue the `LISTEN` commands with protection
388            self.wrap_reconnect(timeout, async |client: &mut PGClient| {
389                PGClient::issue_listen(client, channels).await
390            })
391            .await?;
392
393            // Add to our subscriptions
394            self.config.with_subscriptions(channels.iter().map(AsRef::as_ref));
395        }
396        Ok(())
397    }
398
399
400
401    pub async fn unsubscribe_notify(
402        &mut self,
403        channels: &[impl AsRef<str> + Send + Sync + 'static],
404        timeout: Option<Duration>,
405    ) -> PGResult<()> {
406        if !channels.is_empty() {
407            // Issue the `UNLISTEN` commands with protection
408            self.wrap_reconnect(timeout, async move |client: &mut PGClient| {
409                PGClient::issue_unlisten(client, channels).await
410            })
411            .await?;
412
413            // Remove subscriptions
414            self.config.without_subscriptions(channels.iter().map(AsRef::as_ref));
415        }
416        Ok(())
417    }
418
419    ///
420    /// Unsubscribes from all channels.
421    ///
422    pub async fn unsubscribe_notify_all(&mut self, timeout: Option<Duration>) -> PGResult<()> {
423        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
424            // Tell the world we are about to unsubscribe
425            #[cfg(feature = "tracing")]
426            tracing::info!("Unsubscribing from channels: *");
427            // Issue the `UNLISTEN` commands
428            client.simple_query("UNLISTEN *").await?;
429            Ok(())
430        })
431        .await
432    }
433
434
435    /// Like [`Client::execute_raw`].
436    pub async fn execute_raw<P, I, T>(
437        &mut self,
438        statement: &T,
439        params: I,
440        timeout: Option<Duration>,
441    ) -> PGResult<u64>
442    where
443        T: ?Sized + ToStatement + Sync + Send,
444        P: BorrowToSql + Clone + Send + Sync,
445        I: IntoIterator<Item = P> + Sync + Send,
446        I::IntoIter: ExactSizeIterator,
447    {
448        let params: Vec<_> = params.into_iter().collect();
449        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
450            client.execute_raw(statement, params.clone()).await
451        })
452        .await
453    }
454
455    /// Like [`Client::query`].
456    pub async fn query<T>(
457        &mut self,
458        query: &T,
459        params: &[&(dyn ToSql + Sync)],
460        timeout: Option<Duration>,
461    ) -> PGResult<Vec<Row>>
462    where
463        T: ?Sized + ToStatement + Sync + Send,
464    {
465        let params = params.to_vec();
466        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
467            client.query(query, &params).await
468        })
469        .await
470    }
471
472    /// Like [`Client::query_one`].
473    pub async fn query_one<T>(
474        &mut self,
475        statement: &T,
476        params: &[&(dyn ToSql + Sync)],
477        timeout: Option<Duration>,
478    ) -> PGResult<Row>
479    where
480        T: ?Sized + ToStatement + Sync + Send,
481    {
482        let params = params.to_vec();
483        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
484            client.query_one(statement, &params).await
485        })
486        .await
487    }
488
489    /// Like [`Client::query_opt`].
490    pub async fn query_opt<T>(
491        &mut self,
492        statement: &T,
493        params: &[&(dyn ToSql + Sync)],
494        timeout: Option<Duration>,
495    ) -> PGResult<Option<Row>>
496    where
497        T: ?Sized + ToStatement + Sync + Send,
498    {
499        let params = params.to_vec();
500        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
501            client.query_opt(statement, &params).await
502        })
503        .await
504    }
505
506    /// Like [`Client::query_raw`].
507    pub async fn query_raw<T, P, I>(
508        &mut self,
509        statement: &T,
510        params: I,
511        timeout: Option<Duration>,
512    ) -> PGResult<RowStream>
513    where
514        T: ?Sized + ToStatement + Sync + Send,
515        P: BorrowToSql + Clone + Send + Sync,
516        I: IntoIterator<Item = P> + Sync + Send,
517        I::IntoIter: ExactSizeIterator,
518    {
519        let params: Vec<_> = params.into_iter().collect();
520        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
521            client.query_raw(statement, params.clone()).await
522        })
523        .await
524    }
525
526    /// Like [`Client::query_typed`]
527    pub async fn query_typed(
528        &mut self,
529        statement: &str,
530        params: &[(&(dyn ToSql + Sync), Type)],
531        timeout: Option<Duration>,
532    ) -> PGResult<Vec<Row>> {
533        let params = params.to_vec();
534        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
535            client.query_typed(statement, &params).await
536        })
537        .await
538    }
539
540    /// Like [`Client::query_typed_raw`]
541    pub async fn query_typed_raw<P, I>(
542        &mut self,
543        statement: &str,
544        params: I,
545        timeout: Option<Duration>,
546    ) -> PGResult<RowStream>
547    where
548        P: BorrowToSql + Clone + Send + Sync,
549        I: IntoIterator<Item = (P, Type)> + Sync + Send,
550    {
551        let params: Vec<_> = params.into_iter().collect();
552        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
553            client.query_typed_raw(statement, params.clone()).await
554        })
555        .await
556    }
557
558    /// Like [`Client::prepare`].
559    pub async fn prepare(&mut self, query: &str, timeout: Option<Duration>) -> PGResult<Statement> {
560        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
561            client.prepare(query).map_err(Into::into).await
562        })
563        .await
564    }
565
566    /// Like [`Client::prepare_typed`].
567    pub async fn prepare_typed(
568        &mut self,
569        query: &str,
570        parameter_types: &[Type],
571        timeout: Option<Duration>,
572    ) -> PGResult<Statement> {
573        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
574            client.prepare_typed(query, parameter_types).await
575        })
576        .await
577    }
578
579    //
580    /// Similar but not quite the same as [`Client::transaction`].
581    ///
582    /// Executes the closure as a single transaction.
583    /// Commit is automatically called after the closure. If any connection
584    /// issues occur during the transaction then the transaction is rolled
585    /// back (on drop) and retried a new with the new connection subject to
586    /// the maximum number of reconnect attempts.
587    ///
588    pub async fn transaction<F>(&mut self, timeout: Option<Duration>, f: F) -> PGResult<()>
589    where
590        for<'a> F: AsyncFn(&'a mut Transaction) -> Result<(), tokio_postgres::Error>,
591    {
592        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
593            let mut tx = client.transaction().await?;
594            f(&mut tx).await?;
595            tx.commit().await?;
596            Ok(())
597        })
598        .await
599    }
600
601    /// Like [`Client::batch_execute`].
602    pub async fn batch_execute(&mut self, query: &str, timeout: Option<Duration>) -> PGResult<()> {
603        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
604            client.batch_execute(query).await
605        })
606        .await
607    }
608
609    /// Like [`Client::simple_query`].
610    pub async fn simple_query(
611        &mut self,
612        query: &str,
613        timeout: Option<Duration>,
614    ) -> PGResult<Vec<SimpleQueryMessage>> {
615        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
616            client.simple_query(query).await
617        })
618        .await
619    }
620
621    /// Returns a reference to the underlying [`tokio_postgres::Client`].
622    pub fn client(&self) -> &tokio_postgres::Client {
623        &self.inner
624    }
625}
626
627///
628/// Wraps any future in a tokio timeout and maps the Elapsed error to a PGError::Timeout.
629///
630pub async fn wrap_timeout<T>(dur: Duration, fut: impl Future<Output = PGResult<T>>) -> PGResult<T> {
631    match timeout(dur, fut).await {
632        Ok(out) => out,
633        Err(_) => Err(PGError::Timeout(dur)),
634    }
635}
636
637#[cfg(test)]
638mod tests {
639
640    use {
641        super::{PGError, PGMessage, PGRaiseLevel, PGRobustClient, PGRobustClientConfig},
642        insta::*,
643        std::{
644            sync::{Arc, RwLock},
645            time::Duration,
646        },
647        testcontainers::{ImageExt, runners::AsyncRunner},
648        testcontainers_modules::postgres::Postgres,
649    };
650
651    fn sql_for_log_and_notify_test(level: PGRaiseLevel) -> String {
652        format!(
653            r#"
654                    set client_min_messages to '{}';
655                    do $$
656                    begin
657                        raise debug 'this is a DEBUG notification';
658                        notify test, 'test#1';
659                        raise log 'this is a LOG notification';
660                        notify test, 'test#2';
661                        raise info 'this is a INFO notification';
662                        notify test, 'test#3';
663                        raise notice 'this is a NOTICE notification';
664                        notify test, 'test#4';
665                        raise warning 'this is a WARNING notification';
666                        notify test, 'test#5';
667                    end;
668                    $$;
669                "#,
670            level
671        )
672    }
673
674    #[tokio::test]
675    async fn test_integration() {
676        //
677        // --------------------------------------------------------------------
678        // Setup Postgres Server
679        // --------------------------------------------------------------------
680
681        let pg_server = Postgres::default()
682            .with_tag("16.4")
683            .start()
684            .await
685            .expect("could not start postgres server");
686
687        // NOTE: this stuff with Box::leak allows us to create a static string
688        let database_url = format!(
689            "postgres://postgres:postgres@{}:{}/postgres",
690            pg_server.get_host().await.unwrap(),
691            pg_server.get_host_port_ipv4(5432).await.unwrap()
692        );
693
694        // let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
695
696        // --------------------------------------------------------------------
697        // Connect to the server
698        // --------------------------------------------------------------------
699
700        let notices = Arc::new(RwLock::new(Vec::new()));
701        let notices_clone = notices.clone();
702
703        let callback = move |msg: PGMessage| {
704            if let Ok(mut guard) = notices_clone.write() {
705                guard.push(msg.to_string());
706            }
707        };
708
709        let config = PGRobustClientConfig::new(database_url, tokio_postgres::NoTls);
710
711        let mut admin = PGRobustClient::spawn(config.clone())
712            .await
713            .expect("could not create initial client");
714
715        let mut client = PGRobustClient::spawn(config.callback(callback).max_reconnect_attempts(2))
716            .await
717            .expect("could not create initial client");
718
719        // --------------------------------------------------------------------
720        // Subscribe to notify and raise
721        // --------------------------------------------------------------------
722
723        client
724            .subscribe_notify(&["test"], None)
725            .await
726            .expect("could not subscribe");
727
728        let (_, execution_log) = client
729            .with_captured_log(async |client: &mut PGRobustClient<_>| {
730                client
731                    .simple_query(&sql_for_log_and_notify_test(PGRaiseLevel::Debug), None)
732                    .await
733            })
734            .await
735            .expect("could not execute queries on postgres");
736
737        assert_json_snapshot!("subscribed-executionlog", &execution_log, {
738            "[].timestamp" => "<timestamp>",
739            "[].process_id" => "<pid>",
740        });
741
742        assert_snapshot!("subscribed-notify", extract_and_clear_logs(&notices));
743
744        // --------------------------------------------------------------------
745        // Unsubscribe
746        // --------------------------------------------------------------------
747
748        client
749            .unsubscribe_notify(&["test"], None)
750            .await
751            .expect("could not unsubscribe");
752
753        let (_, execution_log) = client
754            .with_captured_log(async |client| {
755                client
756                    .simple_query(&sql_for_log_and_notify_test(PGRaiseLevel::Warning), None)
757                    .await
758            })
759            .await
760            .expect("could not execute queries on postgres");
761
762        assert_json_snapshot!("unsubscribed-executionlog", &execution_log, {
763            "[].timestamp" => "<timestamp>",
764            "[].process_id" => "<pid>",
765        });
766
767        assert_snapshot!("unsubscribed-notify", extract_and_clear_logs(&notices));
768
769        // --------------------------------------------------------------------
770        // Timeout
771        // --------------------------------------------------------------------
772
773        let result = client
774            .simple_query(
775                "
776                    do $$
777                    begin
778                        raise info 'before sleep';
779                        perform pg_sleep(3);
780                        raise info 'after sleep';
781                    end;
782                    $$
783                ",
784                Some(Duration::from_secs(1)),
785            )
786            .await;
787
788        assert!(matches!(result, Err(PGError::Timeout(_))));
789        assert_snapshot!("timeout-messages", extract_and_clear_logs(&notices));
790
791        // --------------------------------------------------------------------
792        // Reconnect (before query)
793        // --------------------------------------------------------------------
794
795        admin.simple_query("select pg_terminate_backend(pid) from pg_stat_activity where pid != pg_backend_pid()", None)
796            .await.expect("could not kill other client");
797
798        let result = client
799            .simple_query(
800                "
801                    do $$
802                    begin
803                        raise info 'before sleep';
804                        perform pg_sleep(1);
805                        raise info 'after sleep';
806                    end;
807                    $$
808                ",
809                Some(Duration::from_secs(10)),
810            )
811            .await;
812
813        assert!(matches!(result, Ok(_)));
814        assert_snapshot!("reconnect-before", extract_and_clear_logs(&notices));
815
816        // --------------------------------------------------------------------
817        // Reconnect (during query)
818        // --------------------------------------------------------------------
819
820        let query = client.simple_query(
821            "
822                    do $$
823                    begin
824                        raise info 'before sleep';
825                        perform pg_sleep(1);
826                        raise info 'after sleep';
827                    end;
828                    $$
829                ",
830            None,
831        );
832
833        let kill_later = 
834            admin.simple_query("
835                select pg_sleep(0.5); 
836                select pg_terminate_backend(pid) from pg_stat_activity where pid != pg_backend_pid()", 
837                None
838            );
839
840        let (_, result) = tokio::join!(kill_later, query);
841
842        assert!(matches!(result, Ok(_)));
843        assert_snapshot!("reconnect-during", extract_and_clear_logs(&notices));
844
845        // --------------------------------------------------------------------
846        // Reconnect (failure)
847        // --------------------------------------------------------------------
848
849        pg_server.stop().await.expect("could not stop server");
850
851        let result = client.simple_query(
852            "
853                do $$
854                begin
855                    raise info 'before sleep';
856                    perform pg_sleep(1);
857                    raise info 'after sleep';
858                end;
859                $$
860            ",
861            None,
862        ).await;
863
864        eprintln!("result: {result:?}");
865        assert!(matches!(result, Err(PGError::FailedToReconnect(2))));
866        assert_snapshot!("reconnect-failure", extract_and_clear_logs(&notices));
867
868
869    }
870
871    fn extract_and_clear_logs(logs: &Arc<RwLock<Vec<String>>>) -> String {
872        let mut guard = logs.write().expect("could not read notices");
873        let emtpy_log = Vec::default();
874        let log = std::mem::replace(&mut *guard, emtpy_log);
875        redact_pids(&redact_timestamps(&log.join("\n")))
876    }
877
878    fn redact_timestamps(text: &str) -> String {
879        use regex::Regex;
880        use std::sync::OnceLock;
881        pub static TIMESTAMP_PATTERN: OnceLock<Regex> = OnceLock::new();
882        let pat = TIMESTAMP_PATTERN.get_or_init(|| {
883            Regex::new(r"\d{4}-\d{2}-\d{2}.?\d{2}:\d{2}:\d{2}(\.\d{3,9})?(Z| UTC|[+-]\d{2}:\d{2})?")
884                .unwrap()
885        });
886        pat.replace_all(text, "<timestamp>").to_string()
887    }
888
889    fn redact_pids(text: &str) -> String {
890        use regex::Regex;
891        use std::sync::OnceLock;
892        pub static TIMESTAMP_PATTERN: OnceLock<Regex> = OnceLock::new();
893        let pat = TIMESTAMP_PATTERN.get_or_init(|| Regex::new(r"pid=\d+").unwrap());
894        pat.replace_all(text, "<pid>").to_string()
895    }
896}