postgres_notify/
config.rs1use {
2 crate::PGMessage,
3 std::{collections::BTreeSet, sync::Arc, time::Duration},
4 tokio_postgres::{Socket, tls::MakeTlsConnect},
5};
6
7#[derive(Clone)]
8pub struct PGRobustClientConfig<TLS> {
9 pub(crate) database_url: String,
10 pub(crate) make_tls: TLS,
11 pub(crate) subscriptions: BTreeSet<String>,
12 pub(crate) callback: Arc<dyn Fn(PGMessage) + Send + Sync + 'static>,
13 pub(crate) max_reconnect_attempts: u32,
14 pub(crate) default_timeout: Duration,
15 pub(crate) connect_script: Option<String>,
16 pub(crate) application_name: Option<String>,
17}
18
19impl<TLS> PGRobustClientConfig<TLS>
20where
21 TLS: MakeTlsConnect<Socket> + Clone,
22 <TLS as MakeTlsConnect<Socket>>::Stream: Send + Sync + 'static,
23{
24 pub fn new(database_url: impl Into<String>, make_tls: TLS) -> PGRobustClientConfig<TLS> {
25 PGRobustClientConfig {
26 database_url: database_url.into(),
27 make_tls,
28 subscriptions: BTreeSet::new(),
29 callback: Arc::new(|_| {}),
30 max_reconnect_attempts: 10,
31 default_timeout: Duration::from_secs(3600),
32 connect_script: None,
33 application_name: None,
34 }
35 }
36
37 pub fn subscriptions(
38 mut self,
39 subscriptions: impl IntoIterator<Item = impl Into<String>>,
40 ) -> Self {
41 self.subscriptions
42 .extend(subscriptions.into_iter().map(Into::into));
43 self
44 }
45
46 pub fn with_subscriptions(
47 &mut self,
48 subscriptions: impl IntoIterator<Item = impl Into<String>>,
49 ) {
50 self.subscriptions
51 .extend(subscriptions.into_iter().map(Into::into));
52 }
53
54 pub fn without_subscriptions(
55 &mut self,
56 subscriptions: impl IntoIterator<Item = impl Into<String>>,
57 ) {
58 for s in subscriptions.into_iter().map(Into::into) {
59 self.subscriptions.remove(&s);
60 }
61 }
62
63 pub fn callback(mut self, callback: impl Fn(PGMessage) + Send + Sync + 'static) -> Self {
64 self.callback = Arc::new(callback);
65 self
66 }
67
68 pub fn with_callback(&mut self, callback: impl Fn(PGMessage) + Send + Sync + 'static) {
69 self.callback = Arc::new(callback);
70 }
71
72 pub fn max_reconnect_attempts(mut self, max_reconnect_attempts: u32) -> Self {
73 self.max_reconnect_attempts = max_reconnect_attempts;
74 self
75 }
76
77 pub fn with_max_reconnect_attempts(&mut self, max_reconnect_attempts: Option<u32>) {
78 if let Some(n) = max_reconnect_attempts {
79 self.max_reconnect_attempts = n;
80 }
81 }
82
83 pub fn default_timeout(mut self, default_timeout: Duration) -> Self {
84 self.default_timeout = default_timeout;
85 self
86 }
87
88 pub fn with_default_timeout(&mut self, default_timeout: Option<Duration>) {
89 if let Some(d) = default_timeout {
90 self.default_timeout = d;
91 }
92 }
93
94 pub fn connect_script(mut self, connect_script: impl Into<String>) -> Self {
102 self.connect_script = Some(connect_script.into());
103 self
104 }
105
106 pub fn with_connect_script(&mut self, connect_script: Option<impl Into<String>>) {
114 self.connect_script = connect_script.map(Into::into);
115 }
116
117 pub fn application_name(mut self, application_name: impl Into<String>) -> Self {
125 self.application_name = Some(application_name.into());
126 self
127 }
128
129 pub fn with_application_name(&mut self, application_name: Option<impl Into<String>>) {
137 self.application_name = application_name.map(Into::into);
138 }
139
140 pub fn full_connect_script(&self) -> Option<String> {
141 if self.application_name.is_none()
142 && self.connect_script.is_none()
143 && self.subscriptions.is_empty()
144 {
145 return None;
146 }
147
148 let mut script = String::with_capacity(512);
149 if let Some(name) = self.application_name.as_ref() {
150 script.push_str("SET application_name = '");
151 script.push_str(name);
152 script.push_str("';\n");
153 }
154 if let Some(sql) = self.connect_script.as_ref() {
155 script.push_str(sql);
156 }
157 for sub in &self.subscriptions {
158 script.push_str("LISTEN ");
159 script.push_str(sub);
160 script.push_str(";\n");
161 }
162 Some(script)
163 }
164}