postgres_notify/
lib.rs

1//!
2//! `postgres-notify` started out as an easy way to receive PostgreSQL
3//! notifications but has since evolved into a much more useful client
4//! and is able to handle the following:
5//!
6//! - Receive `NOTIFY <channel> <payload>` pub/sub style notifications
7//!
8//! - Receive `RAISE` messages and collects execution logs
9//!
10//! - Applies a timeout to all queries. If a query timesout then the
11//!   client will attempt to cancel the ongoing query before returning
12//!   an error.
13//!
14//! - Supports cancelling an ongoing query.
15//!
16//! - Automatically reconnects if the connection is lost and uses
17//!   exponential backoff with jitter to avoid thundering herd effect.
18//!
19//! - Supports an `connect_script`, which can be executed on connect.
20//! 
21//! - Has a familiar API with an additional `timeout` argument.
22//!
23//!
24//! # BREAKING CHANGE in v0.3.2
25//!
26//! Configuration is done through the [`PGRobustClientConfig`] struct.
27//! 
28//!
29//! # BREAKING CHANGE in v0.3.0
30//!
31//! This latest version is a breaking change. The `PGNotifyingClient` has
32//! been renamed `PGRobustClient` and queries don't need to be made through
33//! the inner client anymore. Furthermore, a single callback handles all
34//! of the notifications: NOTIFY, RAISE, TIMOUT, RECONNECT.
35//!
36//!
37//!
38//! # LISTEN/NOTIFY
39//!
40//! For a very long time (at least since version 7.1) postgres has supported
41//! asynchronous notifications based on LISTEN/NOTIFY commands. This allows
42//! the database to send notifications to the client in an "out-of-band"
43//! channel.
44//!
45//! Once the client has issued a `LISTEN <channel>` command, the database will
46//! send notifications to the client whenever a `NOTIFY <channel> <payload>`
47//! is issued on the database regardless of which session has issued it.
48//! This can act as a cheap alternative to a pub/sub system though without
49//! mailboxes or persistence.
50//!
51//! When calling `subscribe_notify` with a list of channel names, [`PGRobustClient`]
52//! will the client callback any time a `NOTIFY` message is received for any of
53//! the subscribed channels.
54//!
55//! ```rust
56//! use postgres_notify::{PGRobustClientConfig, PGRobustClient, PGMessage};
57//! use tokio_postgres::NoTls;
58//! use std::time::Duration;
59//!
60//! let rt = tokio::runtime::Builder::new_current_thread()
61//!     .enable_io()
62//!     .enable_time()
63//!     .build()
64//!     .expect("could not start tokio runtime");
65//!
66//! rt.block_on(async move {
67//!     
68//!     let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
69//!     let config = PGRobustClientConfig::new(database_url, NoTls)
70//!         .callback(|msg:PGMessage| println!("{:?}", &msg));
71//!
72//!     let mut client = PGRobustClient::spawn(config)
73//!         .await.expect("Could not connect to postgres");
74//!
75//!     client.subscribe_notify(&["test"], Some(Duration::from_millis(100)))
76//!         .await.expect("Could not subscribe to channels");
77//! });
78//! ```
79//!
80//!
81//!
82//! # RAISE/LOGS
83//!
84//! Logs in PostgreSQL are created by writing `RAISE <level> <message>` statements
85//! within your functions, stored procedures and scripts. When such a command is
86//! issued, [`PGRobustClient`] receives a notification even if the call is still
87//! in progress. This allows the caller to capture the execution log in realtime
88//! if needed.
89//!
90//! [`PGRobustClient`] simplifies log collection in two ways. Firstly it provides
91//! the [`with_captured_log`](PGRobustClient::with_captured_log) functions,
92//! which collects the execution log and returns it along with the query result.
93//! This is probably what most people will want to use.
94//!
95//! If your needs are more complex or if you want to propagate realtime logs,
96//! then using client callback can be used to forwand the message on an
97//! asynchonous channel.
98//!
99//! ```rust
100//! use postgres_notify::{PGRobustClient, PGRobustClientConfig, PGMessage};
101//! use tokio_postgres::NoTls;
102//! use std::time::Duration;
103//!
104//! let rt = tokio::runtime::Builder::new_current_thread()
105//!     .enable_io()
106//!     .enable_time()
107//!     .build()
108//!     .expect("could not start tokio runtime");
109//!
110//! rt.block_on(async move {
111//!
112//!     let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
113//!     let config = PGRobustClientConfig::new(database_url, NoTls)
114//!         .callback(|msg:PGMessage| println!("{:?}", &msg));
115//! 
116//!     let mut client = PGRobustClient::spawn(config)
117//!         .await.expect("Could not connect to postgres");
118//!
119//!     // Will capture the notices in a Vec
120//!     let (_, log) = client.with_captured_log(async |client| {
121//!         client.simple_query("
122//!             do $$
123//!             begin
124//!                 raise debug 'this is a DEBUG notification';
125//!                 raise log 'this is a LOG notification';
126//!                 raise info 'this is a INFO notification';
127//!                 raise notice 'this is a NOTICE notification';
128//!                 raise warning 'this is a WARNING notification';
129//!             end;
130//!             $$",
131//!             Some(Duration::from_secs(1))
132//!         ).await.expect("Error during query execution");
133//!         Ok(())
134//!     }).await.expect("Error during captur log");
135//!
136//!     println!("{:#?}", &log);
137//!  });
138//! ```
139//!
140//! Note that the client passed to the async callback is `&mut self`, which
141//! means that all queries within that block are subject to the same timeout
142//! and reconnect handling.
143//!
144//! You can look at the unit tests for a more in-depth example.
145//!
146//!
147//!
148//! # TIMEOUT
149//!
150//! All of the query functions in [`PGRobustClient`] have a `timeout` argument.
151//! If the query takes longer than the timeout, then an error is returned.
152//! If not specified, the default timeout is 1 hour.
153//!
154//!
155//! # RECONNECT
156//!
157//! If the connection to the database is lost, then [`PGRobustClient`] will
158//! attempt to reconnect to the database automatically. If the maximum number
159//! of reconnect attempts is reached then an error is returned. Furthermore,
160//! it uses a exponential backoff with jitter in order to avoid thundering
161//! herd effect.
162//!
163
164mod error;
165mod messages;
166mod notify;
167mod config;
168mod inner;
169
170pub use error::*;
171pub use messages::*;
172use inner::*;
173pub use config::*;
174
175use tokio_postgres::{SimpleQueryMessage, ToStatement};
176
177use {
178    futures::TryFutureExt,
179    std::{
180        time::Duration,
181    },
182    tokio::{
183        time::{sleep, timeout},
184    },
185    tokio_postgres::{
186        Row, RowStream, Socket, Statement, Transaction,
187        tls::MakeTlsConnect,
188        types::{BorrowToSql, ToSql, Type},
189    },
190};
191
192/// Shorthand for Result with tokio_postgres::Error
193pub type PGResult<T> = Result<T, PGError>;
194
195
196
197pub struct PGRobustClient<TLS>
198{
199    config: PGRobustClientConfig<TLS>,
200    inner: PGClient,
201}
202
203#[allow(unused)]
204impl<TLS> PGRobustClient<TLS>
205where
206    TLS: MakeTlsConnect<Socket> + Clone,
207    <TLS as MakeTlsConnect<Socket>>::Stream: Send + Sync + 'static,
208{
209    ///
210    /// Connects to the database and returns a new client.
211    /// 
212    pub async fn spawn(config: PGRobustClientConfig<TLS>) -> PGResult<PGRobustClient<TLS>> {
213        let inner = PGClient::connect(&config).await?;
214        Ok(PGRobustClient { config, inner })
215    }
216
217    ///
218    /// Returns a reference to the config object used to create this client.
219    /// 
220    pub fn config(&self) -> &PGRobustClientConfig<TLS> {
221        &self.config
222    }
223
224    ///
225    /// Returns a mutable reference to the config object used to create this client.
226    /// Some changes only take effect on the next connection. Others are immediate.
227    ///
228    pub fn config_mut(&mut self) -> &mut PGRobustClientConfig<TLS> {
229        &mut self.config
230    }   
231    
232    ///
233    /// Cancels any query in-progress.
234    ///
235    /// This is the only function that does not take a timeout nor does it
236    /// attempt to reconnect if the connection is lost. It will simply
237    /// return the original error.
238    ///
239    pub async fn cancel_query(&mut self) -> PGResult<()> {
240        self.inner
241            .cancel_token
242            .cancel_query(self.config.make_tls.clone())
243            .await
244            .map_err(Into::into)
245    }
246
247    ///
248    /// Returns the log messages captured since the last call to this function.
249    /// It also clears the log.
250    ///
251    pub fn capture_and_clear_log(&mut self) -> Vec<PGMessage> {
252        if let Ok(mut guard) = self.inner.log.write() {
253            let empty_log = Vec::default();
254            std::mem::replace(&mut *guard, empty_log)
255        } else {
256            Vec::default()
257        }
258    }
259
260    ///
261    /// Given an async closure taking the postgres client, returns the result
262    /// of said closure along with the accumulated log since the beginning of
263    /// the closure.
264    ///
265    /// If you use query pipelining then collect the logs for all queries in
266    /// the pipeline. Otherwise, the logs might not be what you expect.
267    ///
268    pub async fn with_captured_log<F, T>(&mut self, f: F) -> PGResult<(T, Vec<PGMessage>)>
269    where
270        F: AsyncFn(&mut Self) -> PGResult<T>,
271    {
272        self.capture_and_clear_log(); // clear the log just in case...
273        let result = f(self).await?;
274        let log = self.capture_and_clear_log();
275        Ok((result, log))
276    }
277
278    ///
279    /// Attempts to reconnect after a connection loss.
280    ///
281    /// Reconnection applies an exponention backoff with jitter in order to
282    /// avoid thundering herd effect. If the maximum number of attempts is
283    /// reached then an error is returned.
284    ///
285    /// If an error unrelated to establishing a new connection is returned
286    /// when trying to connect then that error is returned.
287    ///
288    async fn reconnect(&mut self) -> PGResult<()> {
289        //
290        use std::cmp::{max, min};
291        let mut attempts = 1;
292        let mut k = 500;
293
294        while attempts <= self.config.max_reconnect_attempts {
295            //
296            // Implement exponential backoff + jitter
297            // Initial delay will be 500ms, max delay is 1h.
298            //
299            sleep(Duration::from_millis(k + rand::random_range(0..k / 2))).await;
300            k = min(k * 2, 60000);
301
302            tracing::info!("Reconnect attempt #{}", attempts);
303            (self.config.callback)(PGMessage::reconnect(attempts, self.config.max_reconnect_attempts));
304
305            attempts += 1;
306
307            match PGClient::connect(&self.config).await {
308                Ok(inner) => {
309
310                    self.inner = inner;
311
312                    (self.config.callback)(PGMessage::connected());
313                    
314                    if let Some(sql) = self.config.full_connect_script() {
315                        match self.inner.simple_query(&sql).await {
316                            Ok(_) => {
317                                return Ok(());
318                            }
319                            Err(e) if is_pg_connection_issue(&e) => {
320                                continue;
321                            }
322                            Err(e) => {
323                                return Err(e.into());
324                            }
325                        }
326                    } else {
327                        return Ok(());
328                    }
329                }
330                Err(e) if e.is_pg_connection_issue() => {
331                    continue;
332                }
333                Err(e) => {
334                    return Err(e);
335                }
336            }
337        }
338
339        // Issue the failed to reconnect message
340        (self.config.callback)(PGMessage::failed_to_reconnect(self.config.max_reconnect_attempts));
341        // Return the error
342        Err(PGError::FailedToReconnect(self.config.max_reconnect_attempts))
343    }
344
345
346    ///
347    /// Wraps most calls that use the client with a timeout and reconnect loop.
348    /// 
349    /// If you loose the connection during a query, the client will automatically 
350    /// reconnect and retry the query.
351    /// 
352    pub async fn wrap_reconnect<T>(
353        &mut self,
354        max_dur: Option<Duration>,
355        factory: impl AsyncFn(&mut PGClient) -> Result<T, tokio_postgres::Error>,
356    ) -> PGResult<T> {
357        let max_dur = max_dur.unwrap_or(self.config.default_timeout);
358        loop {
359            match timeout(max_dur, factory(&mut self.inner)).await {
360                // Query succeeded so return the result
361                Ok(Ok(o)) => return Ok(o),
362                // Query failed because of connection issues
363                Ok(Err(e)) if is_pg_connection_issue(&e) => {
364                    self.reconnect().await?;
365                }
366                // Query failed for some other reason
367                Ok(Err(e)) => {
368                    return Err(e.into());
369                }
370                // Query timed out!
371                Err(_) => {
372                    // Callback with timeout message
373                    (self.config.callback)(PGMessage::timeout(max_dur));
374                    // Cancel the ongoing query
375                    let status = self.inner.cancel_token.cancel_query(self.config.make_tls.clone()).await;
376                    // Callback with cancelled message
377                    (self.config.callback)(PGMessage::cancelled(!status.is_err()));
378                    // Return the timeout error
379                    return Err(PGError::Timeout(max_dur));
380                }
381            }
382        }
383    }
384
385    pub async fn subscribe_notify(
386        &mut self,
387        channels: &[impl AsRef<str> + Send + Sync + 'static],
388        timeout: Option<Duration>,
389    ) -> PGResult<()> {
390
391        if !channels.is_empty() {
392            // Issue the `LISTEN` commands with protection
393            self.wrap_reconnect(timeout, async |client: &mut PGClient| {
394                PGClient::issue_listen(client, channels).await
395            })
396            .await?;
397
398            // Add to our subscriptions
399            self.config.with_subscriptions(channels.iter().map(AsRef::as_ref));
400        }
401        Ok(())
402    }
403
404
405
406    pub async fn unsubscribe_notify(
407        &mut self,
408        channels: &[impl AsRef<str> + Send + Sync + 'static],
409        timeout: Option<Duration>,
410    ) -> PGResult<()> {
411        if !channels.is_empty() {
412            // Issue the `UNLISTEN` commands with protection
413            self.wrap_reconnect(timeout, async move |client: &mut PGClient| {
414                PGClient::issue_unlisten(client, channels).await
415            })
416            .await?;
417
418            // Remove subscriptions
419            self.config.without_subscriptions(channels.iter().map(AsRef::as_ref));
420        }
421        Ok(())
422    }
423
424    ///
425    /// Unsubscribes from all channels.
426    ///
427    pub async fn unsubscribe_notify_all(&mut self, timeout: Option<Duration>) -> PGResult<()> {
428        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
429            // Tell the world we are about to unsubscribe
430            #[cfg(feature = "tracing")]
431            tracing::info!("Unsubscribing from channels: *");
432            // Issue the `UNLISTEN` commands
433            client.simple_query("UNLISTEN *").await?;
434            Ok(())
435        })
436        .await
437    }
438
439
440    /// Like [`Client::execute_raw`].
441    pub async fn execute_raw<P, I, T>(
442        &mut self,
443        statement: &T,
444        params: I,
445        timeout: Option<Duration>,
446    ) -> PGResult<u64>
447    where
448        T: ?Sized + ToStatement + Sync + Send,
449        P: BorrowToSql + Clone + Send + Sync,
450        I: IntoIterator<Item = P> + Sync + Send,
451        I::IntoIter: ExactSizeIterator,
452    {
453        let params: Vec<_> = params.into_iter().collect();
454        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
455            client.execute_raw(statement, params.clone()).await
456        })
457        .await
458    }
459
460    /// Like [`Client::query`].
461    pub async fn query<T>(
462        &mut self,
463        query: &T,
464        params: &[&(dyn ToSql + Sync)],
465        timeout: Option<Duration>,
466    ) -> PGResult<Vec<Row>>
467    where
468        T: ?Sized + ToStatement + Sync + Send,
469    {
470        let params = params.to_vec();
471        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
472            client.query(query, &params).await
473        })
474        .await
475    }
476
477    /// Like [`Client::query_one`].
478    pub async fn query_one<T>(
479        &mut self,
480        statement: &T,
481        params: &[&(dyn ToSql + Sync)],
482        timeout: Option<Duration>,
483    ) -> PGResult<Row>
484    where
485        T: ?Sized + ToStatement + Sync + Send,
486    {
487        let params = params.to_vec();
488        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
489            client.query_one(statement, &params).await
490        })
491        .await
492    }
493
494    /// Like [`Client::query_opt`].
495    pub async fn query_opt<T>(
496        &mut self,
497        statement: &T,
498        params: &[&(dyn ToSql + Sync)],
499        timeout: Option<Duration>,
500    ) -> PGResult<Option<Row>>
501    where
502        T: ?Sized + ToStatement + Sync + Send,
503    {
504        let params = params.to_vec();
505        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
506            client.query_opt(statement, &params).await
507        })
508        .await
509    }
510
511    /// Like [`Client::query_raw`].
512    pub async fn query_raw<T, P, I>(
513        &mut self,
514        statement: &T,
515        params: I,
516        timeout: Option<Duration>,
517    ) -> PGResult<RowStream>
518    where
519        T: ?Sized + ToStatement + Sync + Send,
520        P: BorrowToSql + Clone + Send + Sync,
521        I: IntoIterator<Item = P> + Sync + Send,
522        I::IntoIter: ExactSizeIterator,
523    {
524        let params: Vec<_> = params.into_iter().collect();
525        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
526            client.query_raw(statement, params.clone()).await
527        })
528        .await
529    }
530
531    /// Like [`Client::query_typed`]
532    pub async fn query_typed(
533        &mut self,
534        statement: &str,
535        params: &[(&(dyn ToSql + Sync), Type)],
536        timeout: Option<Duration>,
537    ) -> PGResult<Vec<Row>> {
538        let params = params.to_vec();
539        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
540            client.query_typed(statement, &params).await
541        })
542        .await
543    }
544
545    /// Like [`Client::query_typed_raw`]
546    pub async fn query_typed_raw<P, I>(
547        &mut self,
548        statement: &str,
549        params: I,
550        timeout: Option<Duration>,
551    ) -> PGResult<RowStream>
552    where
553        P: BorrowToSql + Clone + Send + Sync,
554        I: IntoIterator<Item = (P, Type)> + Sync + Send,
555    {
556        let params: Vec<_> = params.into_iter().collect();
557        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
558            client.query_typed_raw(statement, params.clone()).await
559        })
560        .await
561    }
562
563    /// Like [`Client::prepare`].
564    pub async fn prepare(&mut self, query: &str, timeout: Option<Duration>) -> PGResult<Statement> {
565        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
566            client.prepare(query).map_err(Into::into).await
567        })
568        .await
569    }
570
571    /// Like [`Client::prepare_typed`].
572    pub async fn prepare_typed(
573        &mut self,
574        query: &str,
575        parameter_types: &[Type],
576        timeout: Option<Duration>,
577    ) -> PGResult<Statement> {
578        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
579            client.prepare_typed(query, parameter_types).await
580        })
581        .await
582    }
583
584    //
585    /// Similar but not quite the same as [`Client::transaction`].
586    ///
587    /// Executes the closure as a single transaction.
588    /// Commit is automatically called after the closure. If any connection
589    /// issues occur during the transaction then the transaction is rolled
590    /// back (on drop) and retried a new with the new connection subject to
591    /// the maximum number of reconnect attempts.
592    ///
593    pub async fn transaction<F>(&mut self, timeout: Option<Duration>, f: F) -> PGResult<()>
594    where
595        for<'a> F: AsyncFn(&'a mut Transaction) -> Result<(), tokio_postgres::Error>,
596    {
597        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
598            let mut tx = client.transaction().await?;
599            f(&mut tx).await?;
600            tx.commit().await?;
601            Ok(())
602        })
603        .await
604    }
605
606    /// Like [`Client::batch_execute`].
607    pub async fn batch_execute(&mut self, query: &str, timeout: Option<Duration>) -> PGResult<()> {
608        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
609            client.batch_execute(query).await
610        })
611        .await
612    }
613
614    /// Like [`Client::simple_query`].
615    pub async fn simple_query(
616        &mut self,
617        query: &str,
618        timeout: Option<Duration>,
619    ) -> PGResult<Vec<SimpleQueryMessage>> {
620        self.wrap_reconnect(timeout, async |client: &mut PGClient| {
621            client.simple_query(query).await
622        })
623        .await
624    }
625
626    /// Returns a reference to the underlying [`tokio_postgres::Client`].
627    pub fn client(&self) -> &tokio_postgres::Client {
628        &self.inner
629    }
630}
631
632///
633/// Wraps any future in a tokio timeout and maps the Elapsed error to a PGError::Timeout.
634///
635pub async fn wrap_timeout<T>(dur: Duration, fut: impl Future<Output = PGResult<T>>) -> PGResult<T> {
636    match timeout(dur, fut).await {
637        Ok(out) => out,
638        Err(_) => Err(PGError::Timeout(dur)),
639    }
640}
641
642#[cfg(test)]
643mod tests {
644
645    use {
646        super::{PGError, PGMessage, PGRaiseLevel, PGRobustClient, PGRobustClientConfig},
647        insta::*,
648        std::{
649            sync::{Arc, RwLock},
650            time::Duration,
651        },
652        testcontainers::{ImageExt, runners::AsyncRunner},
653        testcontainers_modules::postgres::Postgres,
654    };
655
656    fn sql_for_log_and_notify_test(level: PGRaiseLevel) -> String {
657        format!(
658            r#"
659                    set client_min_messages to '{}';
660                    do $$
661                    begin
662                        raise debug 'this is a DEBUG notification';
663                        notify test, 'test#1';
664                        raise log 'this is a LOG notification';
665                        notify test, 'test#2';
666                        raise info 'this is a INFO notification';
667                        notify test, 'test#3';
668                        raise notice 'this is a NOTICE notification';
669                        notify test, 'test#4';
670                        raise warning 'this is a WARNING notification';
671                        notify test, 'test#5';
672                    end;
673                    $$;
674                "#,
675            level
676        )
677    }
678
679    #[tokio::test]
680    async fn test_integration() {
681        //
682        // --------------------------------------------------------------------
683        // Setup Postgres Server
684        // --------------------------------------------------------------------
685
686        let pg_server = Postgres::default()
687            .with_tag("16.4")
688            .start()
689            .await
690            .expect("could not start postgres server");
691
692        // NOTE: this stuff with Box::leak allows us to create a static string
693        let database_url = format!(
694            "postgres://postgres:postgres@{}:{}/postgres",
695            pg_server.get_host().await.unwrap(),
696            pg_server.get_host_port_ipv4(5432).await.unwrap()
697        );
698
699        // let database_url = "postgres://postgres:postgres@localhost:5432/postgres";
700
701        // --------------------------------------------------------------------
702        // Connect to the server
703        // --------------------------------------------------------------------
704
705        let notices = Arc::new(RwLock::new(Vec::new()));
706        let notices_clone = notices.clone();
707
708        let callback = move |msg: PGMessage| {
709            if let Ok(mut guard) = notices_clone.write() {
710                guard.push(msg.to_string());
711            }
712        };
713
714        let config = PGRobustClientConfig::new(database_url, tokio_postgres::NoTls);
715
716        let mut admin = PGRobustClient::spawn(config.clone())
717            .await
718            .expect("could not create initial client");
719
720        let mut client = PGRobustClient::spawn(config.callback(callback).max_reconnect_attempts(2))
721            .await
722            .expect("could not create initial client");
723
724        // --------------------------------------------------------------------
725        // Subscribe to notify and raise
726        // --------------------------------------------------------------------
727
728        client
729            .subscribe_notify(&["test"], None)
730            .await
731            .expect("could not subscribe");
732
733        let (_, execution_log) = client
734            .with_captured_log(async |client: &mut PGRobustClient<_>| {
735                client
736                    .simple_query(&sql_for_log_and_notify_test(PGRaiseLevel::Debug), None)
737                    .await
738            })
739            .await
740            .expect("could not execute queries on postgres");
741
742        assert_json_snapshot!("subscribed-executionlog", &execution_log, {
743            "[].timestamp" => "<timestamp>",
744            "[].process_id" => "<pid>",
745        });
746
747        assert_snapshot!("subscribed-notify", extract_and_clear_logs(&notices));
748
749        // --------------------------------------------------------------------
750        // Unsubscribe
751        // --------------------------------------------------------------------
752
753        client
754            .unsubscribe_notify(&["test"], None)
755            .await
756            .expect("could not unsubscribe");
757
758        let (_, execution_log) = client
759            .with_captured_log(async |client| {
760                client
761                    .simple_query(&sql_for_log_and_notify_test(PGRaiseLevel::Warning), None)
762                    .await
763            })
764            .await
765            .expect("could not execute queries on postgres");
766
767        assert_json_snapshot!("unsubscribed-executionlog", &execution_log, {
768            "[].timestamp" => "<timestamp>",
769            "[].process_id" => "<pid>",
770        });
771
772        assert_snapshot!("unsubscribed-notify", extract_and_clear_logs(&notices));
773
774        // --------------------------------------------------------------------
775        // Timeout
776        // --------------------------------------------------------------------
777
778        let result = client
779            .simple_query(
780                "
781                    do $$
782                    begin
783                        raise info 'before sleep';
784                        perform pg_sleep(3);
785                        raise info 'after sleep';
786                    end;
787                    $$
788                ",
789                Some(Duration::from_secs(1)),
790            )
791            .await;
792
793        assert!(matches!(result, Err(PGError::Timeout(_))));
794        assert_snapshot!("timeout-messages", extract_and_clear_logs(&notices));
795
796        // --------------------------------------------------------------------
797        // Reconnect (before query)
798        // --------------------------------------------------------------------
799
800        admin.simple_query("select pg_terminate_backend(pid) from pg_stat_activity where pid != pg_backend_pid()", None)
801            .await.expect("could not kill other client");
802
803        let result = client
804            .simple_query(
805                "
806                    do $$
807                    begin
808                        raise info 'before sleep';
809                        perform pg_sleep(1);
810                        raise info 'after sleep';
811                    end;
812                    $$
813                ",
814                Some(Duration::from_secs(10)),
815            )
816            .await;
817
818        assert!(matches!(result, Ok(_)));
819        assert_snapshot!("reconnect-before", extract_and_clear_logs(&notices));
820
821        // --------------------------------------------------------------------
822        // Reconnect (during query)
823        // --------------------------------------------------------------------
824
825        let query = client.simple_query(
826            "
827                    do $$
828                    begin
829                        raise info 'before sleep';
830                        perform pg_sleep(1);
831                        raise info 'after sleep';
832                    end;
833                    $$
834                ",
835            None,
836        );
837
838        let kill_later = 
839            admin.simple_query("
840                select pg_sleep(0.5); 
841                select pg_terminate_backend(pid) from pg_stat_activity where pid != pg_backend_pid()", 
842                None
843            );
844
845        let (_, result) = tokio::join!(kill_later, query);
846
847        assert!(matches!(result, Ok(_)));
848        assert_snapshot!("reconnect-during", extract_and_clear_logs(&notices));
849
850        // --------------------------------------------------------------------
851        // Reconnect (failure)
852        // --------------------------------------------------------------------
853
854        pg_server.stop().await.expect("could not stop server");
855
856        let result = client.simple_query(
857            "
858                do $$
859                begin
860                    raise info 'before sleep';
861                    perform pg_sleep(1);
862                    raise info 'after sleep';
863                end;
864                $$
865            ",
866            None,
867        ).await;
868
869        eprintln!("result: {result:?}");
870        assert!(matches!(result, Err(PGError::FailedToReconnect(2))));
871        assert_snapshot!("reconnect-failure", extract_and_clear_logs(&notices));
872
873
874    }
875
876    fn extract_and_clear_logs(logs: &Arc<RwLock<Vec<String>>>) -> String {
877        let mut guard = logs.write().expect("could not read notices");
878        let emtpy_log = Vec::default();
879        let log = std::mem::replace(&mut *guard, emtpy_log);
880        redact_pids(&redact_timestamps(&log.join("\n")))
881    }
882
883    fn redact_timestamps(text: &str) -> String {
884        use regex::Regex;
885        use std::sync::OnceLock;
886        pub static TIMESTAMP_PATTERN: OnceLock<Regex> = OnceLock::new();
887        let pat = TIMESTAMP_PATTERN.get_or_init(|| {
888            Regex::new(r"\d{4}-\d{2}-\d{2}.?\d{2}:\d{2}:\d{2}(\.\d{3,9})?(Z| UTC|[+-]\d{2}:\d{2})?")
889                .unwrap()
890        });
891        pat.replace_all(text, "<timestamp>").to_string()
892    }
893
894    fn redact_pids(text: &str) -> String {
895        use regex::Regex;
896        use std::sync::OnceLock;
897        pub static TIMESTAMP_PATTERN: OnceLock<Regex> = OnceLock::new();
898        let pat = TIMESTAMP_PATTERN.get_or_init(|| Regex::new(r"pid=\d+").unwrap());
899        pat.replace_all(text, "<pid>").to_string()
900    }
901}