postgres_notify/
config.rs

1use {
2    crate::PGMessage,
3    std::{collections::BTreeSet, sync::Arc, time::Duration},
4    tokio_postgres::{Socket, tls::MakeTlsConnect},
5};
6
/// Configuration values for a robust PostgreSQL notification client.
///
/// Instances are created with `PGRobustClientConfig::new` and adjusted
/// through the builder/setter methods of the `impl` block. Cloning copies
/// the owned settings and shares the callback through its `Arc`.
#[derive(Clone)]
pub struct PGRobustClientConfig<TLS> {
    /// Database connection string, stored exactly as handed to `new`.
    pub(crate) database_url: String,
    /// TLS connector value handed to `new`.
    pub(crate) make_tls: TLS,
    /// Channel names; `full_connect_script` emits one `LISTEN <name>;`
    /// statement per entry, in the set's sorted order.
    pub(crate) subscriptions: BTreeSet<String>,
    /// Shared handler taking a `PGMessage`. It is only stored here; when it
    /// gets invoked is decided by code outside this file.
    pub(crate) callback: Arc<dyn Fn(PGMessage) + Send + Sync + 'static>,
    /// Reconnect attempt limit (defaults to 10 in `new`).
    /// NOTE(review): the exact retry semantics live outside this file.
    pub(crate) max_reconnect_attempts: u32,
    /// Default timeout (defaults to 3600 seconds in `new`).
    /// NOTE(review): which operations it applies to is not visible here.
    pub(crate) default_timeout: Duration,
    /// Optional user supplied SQL that `full_connect_script` includes in the
    /// generated script.
    pub(crate) connect_script: Option<String>,
    /// Optional name that `full_connect_script` emits as a
    /// `SET application_name` statement.
    pub(crate) application_name: Option<String>,
}
18
19impl<TLS> PGRobustClientConfig<TLS>
20where
21    TLS: MakeTlsConnect<Socket> + Clone,
22    <TLS as MakeTlsConnect<Socket>>::Stream: Send + Sync + 'static,
23{
24    pub fn new(database_url: impl Into<String>, make_tls: TLS) -> PGRobustClientConfig<TLS> {
25        PGRobustClientConfig {
26            database_url: database_url.into(),
27            make_tls,
28            subscriptions: BTreeSet::new(),
29            callback: Arc::new(|_| {}),
30            max_reconnect_attempts: 10,
31            default_timeout: Duration::from_secs(3600),
32            connect_script: None,
33            application_name: None,
34        }
35    }
36
37    pub fn subscriptions(
38        mut self,
39        subscriptions: impl IntoIterator<Item = impl Into<String>>,
40    ) -> Self {
41        self.subscriptions
42            .extend(subscriptions.into_iter().map(Into::into));
43        self
44    }
45
46    pub fn with_subscriptions(
47        &mut self,
48        subscriptions: impl IntoIterator<Item = impl Into<String>>,
49    ) {
50        self.subscriptions
51            .extend(subscriptions.into_iter().map(Into::into));
52    }
53
54    pub fn without_subscriptions(
55        &mut self,
56        subscriptions: impl IntoIterator<Item = impl Into<String>>,
57    ) {
58        for s in subscriptions.into_iter().map(Into::into) {
59            self.subscriptions.remove(&s);
60        }
61    }
62
63    pub fn callback(mut self, callback: impl Fn(PGMessage) + Send + Sync + 'static) -> Self {
64        self.callback = Arc::new(callback);
65        self
66    }
67
68    pub fn with_callback(&mut self, callback: impl Fn(PGMessage) + Send + Sync + 'static) {
69        self.callback = Arc::new(callback);
70    }
71
72    pub fn max_reconnect_attempts(mut self, max_reconnect_attempts: u32) -> Self {
73        self.max_reconnect_attempts = max_reconnect_attempts;
74        self
75    }
76
77    pub fn with_max_reconnect_attempts(&mut self, max_reconnect_attempts: Option<u32>) {
78        if let Some(n) = max_reconnect_attempts {
79            self.max_reconnect_attempts = n;
80        }
81    }
82
83    pub fn default_timeout(mut self, default_timeout: Duration) -> Self {
84        self.default_timeout = default_timeout;
85        self
86    }
87
88    pub fn with_default_timeout(&mut self, default_timeout: Option<Duration>) {
89        if let Some(d) = default_timeout {
90            self.default_timeout = d;
91        }
92    }
93
94    pub fn connect_script(mut self, connect_script: impl Into<String>) -> Self {
95        self.connect_script = Some(connect_script.into());
96        self
97    }
98
99    pub fn with_connect_script(&mut self, connect_script: Option<impl Into<String>>) {
100        self.connect_script = connect_script.map(Into::into);
101    }
102
103    pub fn application_name(mut self, application_name: impl Into<String>) -> Self {
104        self.application_name = Some(application_name.into());
105        self
106    }
107
108    pub fn with_application_name(&mut self, application_name: Option<impl Into<String>>) {
109        self.application_name = application_name.map(Into::into);
110    }
111
112    pub fn full_connect_script(&self) -> Option<String> {
113        if self.application_name.is_none()
114            && self.connect_script.is_none()
115            && self.subscriptions.is_empty()
116        {
117            return None;
118        }
119
120        let mut script = String::with_capacity(512);
121        if let Some(name) = self.application_name.as_ref() {
122            script.push_str("SET application_name = '");
123            script.push_str(name);
124            script.push_str("';\n");
125        }
126        if let Some(sql) = self.connect_script.as_ref() {
127            script.push_str(sql);
128        }
129        for sub in &self.subscriptions {
130            script.push_str("LISTEN ");
131            script.push_str(sub);
132            script.push_str(";\n");
133        }
134        Some(script)
135    }
136}