postgresql_commands/
pg_recvlogical.rs

1use crate::Settings;
2use crate::traits::CommandBuilder;
3use std::convert::AsRef;
4use std::ffi::{OsStr, OsString};
5use std::path::PathBuf;
6
7/// `pg_recvlogical` controls `PostgreSQL` logical decoding streams.
8#[derive(Clone, Debug, Default)]
9pub struct PgRecvLogicalBuilder {
10    program_dir: Option<PathBuf>,
11    envs: Vec<(OsString, OsString)>,
12    create_slot: bool,
13    drop_slot: bool,
14    start: bool,
15    endpos: Option<OsString>,
16    file: Option<OsString>,
17    fsync_interval: Option<OsString>,
18    if_not_exists: bool,
19    startpos: Option<OsString>,
20    no_loop: bool,
21    option: Option<OsString>,
22    plugin: Option<OsString>,
23    status_interval: Option<OsString>,
24    slot: Option<OsString>,
25    two_phase: bool,
26    verbose: bool,
27    version: bool,
28    help: bool,
29    dbname: Option<OsString>,
30    host: Option<OsString>,
31    port: Option<u16>,
32    username: Option<OsString>,
33    no_password: bool,
34    password: bool,
35    pg_password: Option<OsString>,
36}
37
38impl PgRecvLogicalBuilder {
39    /// Create a new [`PgRecvLogicalBuilder`]
40    #[must_use]
41    pub fn new() -> Self {
42        Self::default()
43    }
44
45    /// Create a new [`PgRecvLogicalBuilder`] from [Settings]
46    pub fn from(settings: &dyn Settings) -> Self {
47        Self::new()
48            .program_dir(settings.get_binary_dir())
49            .host(settings.get_host())
50            .port(settings.get_port())
51            .username(settings.get_username())
52            .pg_password(settings.get_password())
53    }
54
55    /// Location of the program binary
56    #[must_use]
57    pub fn program_dir<P: Into<PathBuf>>(mut self, path: P) -> Self {
58        self.program_dir = Some(path.into());
59        self
60    }
61
62    /// create a new replication slot
63    #[must_use]
64    pub fn create_slot(mut self) -> Self {
65        self.create_slot = true;
66        self
67    }
68
69    /// drop the replication slot
70    #[must_use]
71    pub fn drop_slot(mut self) -> Self {
72        self.drop_slot = true;
73        self
74    }
75
76    /// start streaming in a replication slot
77    #[must_use]
78    pub fn start(mut self) -> Self {
79        self.start = true;
80        self
81    }
82
83    /// exit after receiving the specified LSN
84    #[must_use]
85    pub fn endpos<S: AsRef<OsStr>>(mut self, endpos: S) -> Self {
86        self.endpos = Some(endpos.as_ref().to_os_string());
87        self
88    }
89
90    /// receive log into this file, - for stdout
91    #[must_use]
92    pub fn file<S: AsRef<OsStr>>(mut self, file: S) -> Self {
93        self.file = Some(file.as_ref().to_os_string());
94        self
95    }
96
97    /// time between fsyncs to the output file (default: 10)
98    #[must_use]
99    pub fn fsync_interval<S: AsRef<OsStr>>(mut self, fsync_interval: S) -> Self {
100        self.fsync_interval = Some(fsync_interval.as_ref().to_os_string());
101        self
102    }
103
104    /// do not error if slot already exists when creating a slot
105    #[must_use]
106    pub fn if_not_exists(mut self) -> Self {
107        self.if_not_exists = true;
108        self
109    }
110
111    /// where in an existing slot should the streaming start
112    #[must_use]
113    pub fn startpos<S: AsRef<OsStr>>(mut self, startpos: S) -> Self {
114        self.startpos = Some(startpos.as_ref().to_os_string());
115        self
116    }
117
118    /// do not loop on connection lost
119    #[must_use]
120    pub fn no_loop(mut self) -> Self {
121        self.no_loop = true;
122        self
123    }
124
125    /// pass option NAME with optional value VALUE to the output plugin
126    #[must_use]
127    pub fn option<S: AsRef<OsStr>>(mut self, option: S) -> Self {
128        self.option = Some(option.as_ref().to_os_string());
129        self
130    }
131
132    /// use output plugin PLUGIN (default: `test_decoding`)
133    #[must_use]
134    pub fn plugin<S: AsRef<OsStr>>(mut self, plugin: S) -> Self {
135        self.plugin = Some(plugin.as_ref().to_os_string());
136        self
137    }
138
139    /// time between status packets sent to server (default: 10)
140    #[must_use]
141    pub fn status_interval<S: AsRef<OsStr>>(mut self, status_interval: S) -> Self {
142        self.status_interval = Some(status_interval.as_ref().to_os_string());
143        self
144    }
145
146    /// name of the logical replication slot
147    #[must_use]
148    pub fn slot<S: AsRef<OsStr>>(mut self, slot: S) -> Self {
149        self.slot = Some(slot.as_ref().to_os_string());
150        self
151    }
152
153    /// enable decoding of prepared transactions when creating a slot
154    #[must_use]
155    pub fn two_phase(mut self) -> Self {
156        self.two_phase = true;
157        self
158    }
159
160    /// output verbose messages
161    #[must_use]
162    pub fn verbose(mut self) -> Self {
163        self.verbose = true;
164        self
165    }
166
167    /// output version information, then exit
168    #[must_use]
169    pub fn version(mut self) -> Self {
170        self.version = true;
171        self
172    }
173
174    /// show help, then exit
175    #[must_use]
176    pub fn help(mut self) -> Self {
177        self.help = true;
178        self
179    }
180
181    /// database to connect to
182    #[must_use]
183    pub fn dbname<S: AsRef<OsStr>>(mut self, dbname: S) -> Self {
184        self.dbname = Some(dbname.as_ref().to_os_string());
185        self
186    }
187
188    /// database server host or socket directory
189    #[must_use]
190    pub fn host<S: AsRef<OsStr>>(mut self, host: S) -> Self {
191        self.host = Some(host.as_ref().to_os_string());
192        self
193    }
194
195    /// database server port number
196    #[must_use]
197    pub fn port(mut self, port: u16) -> Self {
198        self.port = Some(port);
199        self
200    }
201
202    /// connect as specified database user
203    #[must_use]
204    pub fn username<S: AsRef<OsStr>>(mut self, username: S) -> Self {
205        self.username = Some(username.as_ref().to_os_string());
206        self
207    }
208
209    /// never prompt for password
210    #[must_use]
211    pub fn no_password(mut self) -> Self {
212        self.no_password = true;
213        self
214    }
215
216    /// force password prompt (should happen automatically)
217    #[must_use]
218    pub fn password(mut self) -> Self {
219        self.password = true;
220        self
221    }
222
223    /// user password
224    #[must_use]
225    pub fn pg_password<S: AsRef<OsStr>>(mut self, pg_password: S) -> Self {
226        self.pg_password = Some(pg_password.as_ref().to_os_string());
227        self
228    }
229}
230
231impl CommandBuilder for PgRecvLogicalBuilder {
232    /// Get the program name
233    fn get_program(&self) -> &'static OsStr {
234        "pg_recvlogical".as_ref()
235    }
236
237    /// Location of the program binary
238    fn get_program_dir(&self) -> &Option<PathBuf> {
239        &self.program_dir
240    }
241
242    /// Get the arguments for the command
243    fn get_args(&self) -> Vec<OsString> {
244        let mut args: Vec<OsString> = Vec::new();
245
246        if self.create_slot {
247            args.push("--create-slot".into());
248        }
249
250        if self.drop_slot {
251            args.push("--drop-slot".into());
252        }
253
254        if self.start {
255            args.push("--start".into());
256        }
257
258        if let Some(endpos) = &self.endpos {
259            args.push("--endpos".into());
260            args.push(endpos.into());
261        }
262
263        if let Some(file) = &self.file {
264            args.push("--file".into());
265            args.push(file.into());
266        }
267
268        if let Some(fsync_interval) = &self.fsync_interval {
269            args.push("--fsync-interval".into());
270            args.push(fsync_interval.into());
271        }
272
273        if self.if_not_exists {
274            args.push("--if-not-exists".into());
275        }
276
277        if let Some(startpos) = &self.startpos {
278            args.push("--startpos".into());
279            args.push(startpos.into());
280        }
281
282        if self.no_loop {
283            args.push("--no-loop".into());
284        }
285
286        if let Some(option) = &self.option {
287            args.push("--option".into());
288            args.push(option.into());
289        }
290
291        if let Some(plugin) = &self.plugin {
292            args.push("--plugin".into());
293            args.push(plugin.into());
294        }
295
296        if let Some(status_interval) = &self.status_interval {
297            args.push("--status-interval".into());
298            args.push(status_interval.into());
299        }
300
301        if let Some(slot) = &self.slot {
302            args.push("--slot".into());
303            args.push(slot.into());
304        }
305
306        if self.two_phase {
307            args.push("--two-phase".into());
308        }
309
310        if self.verbose {
311            args.push("--verbose".into());
312        }
313
314        if self.version {
315            args.push("--version".into());
316        }
317
318        if self.help {
319            args.push("--help".into());
320        }
321
322        if let Some(dbname) = &self.dbname {
323            args.push("--dbname".into());
324            args.push(dbname.into());
325        }
326
327        if let Some(host) = &self.host {
328            args.push("--host".into());
329            args.push(host.into());
330        }
331
332        if let Some(port) = &self.port {
333            args.push("--port".into());
334            args.push(port.to_string().into());
335        }
336
337        if let Some(username) = &self.username {
338            args.push("--username".into());
339            args.push(username.into());
340        }
341
342        if self.no_password {
343            args.push("--no-password".into());
344        }
345
346        if self.password {
347            args.push("--password".into());
348        }
349
350        args
351    }
352
353    /// Get the environment variables for the command
354    fn get_envs(&self) -> Vec<(OsString, OsString)> {
355        let mut envs: Vec<(OsString, OsString)> = self.envs.clone();
356
357        if let Some(password) = &self.pg_password {
358            envs.push(("PGPASSWORD".into(), password.into()));
359        }
360
361        envs
362    }
363
364    /// Set an environment variable for the command
365    fn env<S: AsRef<OsStr>>(mut self, key: S, value: S) -> Self {
366        self.envs
367            .push((key.as_ref().to_os_string(), value.as_ref().to_os_string()));
368        self
369    }
370}
371
372#[cfg(test)]
373mod tests {
374    use super::*;
375    use crate::TestSettings;
376    use crate::traits::CommandToString;
377    use test_log::test;
378
379    #[test]
380    fn test_builder_new() {
381        let command = PgRecvLogicalBuilder::new().program_dir(".").build();
382        assert_eq!(
383            PathBuf::from(".").join("pg_recvlogical"),
384            PathBuf::from(command.to_command_string().replace('"', ""))
385        );
386    }
387
388    #[test]
389    fn test_builder_from() {
390        let command = PgRecvLogicalBuilder::from(&TestSettings).build();
391        #[cfg(not(target_os = "windows"))]
392        let command_prefix = r#"PGPASSWORD="password" "./pg_recvlogical" "#;
393        #[cfg(target_os = "windows")]
394        let command_prefix = r#"".\\pg_recvlogical" "#;
395
396        assert_eq!(
397            format!(
398                r#"{command_prefix}"--host" "localhost" "--port" "5432" "--username" "postgres""#
399            ),
400            command.to_command_string()
401        );
402    }
403
404    #[test]
405    fn test_builder() {
406        let command = PgRecvLogicalBuilder::new()
407            .env("PGDATABASE", "database")
408            .create_slot()
409            .drop_slot()
410            .start()
411            .endpos("endpos")
412            .file("file")
413            .fsync_interval("fsync_interval")
414            .if_not_exists()
415            .startpos("startpos")
416            .no_loop()
417            .option("option")
418            .plugin("plugin")
419            .status_interval("status_interval")
420            .slot("slot")
421            .two_phase()
422            .verbose()
423            .version()
424            .help()
425            .dbname("dbname")
426            .host("localhost")
427            .port(5432)
428            .username("username")
429            .no_password()
430            .password()
431            .pg_password("password")
432            .build();
433        #[cfg(not(target_os = "windows"))]
434        let command_prefix = r#"PGDATABASE="database" PGPASSWORD="password" "#;
435        #[cfg(target_os = "windows")]
436        let command_prefix = String::new();
437
438        assert_eq!(
439            format!(
440                r#"{command_prefix}"pg_recvlogical" "--create-slot" "--drop-slot" "--start" "--endpos" "endpos" "--file" "file" "--fsync-interval" "fsync_interval" "--if-not-exists" "--startpos" "startpos" "--no-loop" "--option" "option" "--plugin" "plugin" "--status-interval" "status_interval" "--slot" "slot" "--two-phase" "--verbose" "--version" "--help" "--dbname" "dbname" "--host" "localhost" "--port" "5432" "--username" "username" "--no-password" "--password""#
441            ),
442            command.to_command_string()
443        );
444    }
445}