cnc_rs/db/
postgresdb.rs

1use anyhow::{anyhow, Context, Error};
2use chrono::Local;
3use log::{info, warn};
4use postgres::{Client, NoTls};
5use std::process::{Command, Output};
6
7use super::db_operations::DatabaseOperations;
8use crate::config::{email::SMTP, postgres::Postgres, Config};
9
10pub struct PostgresDB {
11    config_postgres: Postgres,
12    config_email: Option<SMTP>,
13    origin_client: Option<Client>,
14    target_client: Option<Client>,
15}
16
17impl PostgresDB {
18    pub fn new(conf: Config) -> Self {
19        Self {
20            config_postgres: conf.postgres,
21            config_email: conf.smtp,
22            origin_client: None,
23            target_client: None,
24        }
25    }
26}
27
28impl DatabaseOperations for PostgresDB {
29    fn connect(&mut self) -> Result<(), Error> {
30        info!("Connecting to {}", self.config_postgres.origin_host);
31        self.origin_client = Some(
32            Client::connect(
33                &format!(
34                    "host={} user={} password={} port={}",
35                    self.config_postgres.origin_host,
36                    self.config_postgres.origin_user,
37                    self.config_postgres.origin_password,
38                    self.config_postgres.origin_port,
39                ),
40                NoTls,
41            )
42            .context("Cannot connect to Postgres Origin Database")?,
43        );
44        if let Some(ref mut origin_client) = self.origin_client {
45            origin_client
46                .simple_query("SELECT 1")
47                .context("Cannot ping the Postgres Origin Database")?;
48        } else {
49            return Err(anyhow::anyhow!("Origin client is not connected"));
50        }
51
52        self.target_client = Some(
53            Client::connect(
54                &format!(
55                    "host={} user={} password={} port={}",
56                    self.config_postgres.target_host,
57                    self.config_postgres.target_user,
58                    self.config_postgres.target_password,
59                    self.config_postgres.target_port,
60                ),
61                NoTls,
62            )
63            .context("Cannot connect to Postgres Target Database")?,
64        );
65        if let Some(ref mut target_client) = self.target_client {
66            target_client
67                .simple_query("SELECT 1")
68                .context("Cannot ping the Postgres Target Database")?;
69        } else {
70            return Err(anyhow::anyhow!("Target client is not connected"));
71        }
72
73        Ok(())
74    }
75
76    fn close(&mut self) -> Result<(), Error> {
77        info!("Disconnecting from {}", self.config_postgres.origin_host);
78        if let Some(origin_client) = self.origin_client.take() {
79            origin_client.close()?
80        }
81
82        if let Some(target_client) = self.target_client.take() {
83            target_client.close()?
84        }
85
86        Ok(())
87    }
88
89    fn replicate(&mut self) -> Result<(), Error> {
90        let mut output_log = String::new();
91
92        info!(
93            "Replicating all data from {} to {}",
94            self.config_postgres.origin_host, self.config_postgres.target_host
95        );
96
97        // Ensure the clients are connected
98        if self.origin_client.is_none() {
99            return Err(anyhow::anyhow!("Origin client is not connected"));
100        }
101        if self.target_client.is_none() {
102            return Err(anyhow::anyhow!("Target client is not connected"));
103        }
104
105        let dump_file_path = "/tmp/clean_dump.sql";
106
107        let dump_output: Output = Command::new("pg_dump")
108            .env("PGPASSWORD", &self.config_postgres.origin_password)
109            .arg("-U")
110            .arg(&self.config_postgres.origin_user)
111            .arg("-h")
112            .arg(&self.config_postgres.origin_host)
113            .arg("-p")
114            .arg(&self.config_postgres.origin_port)
115            .arg(&self.config_postgres.origin_database)
116            .arg("-v")
117            .arg("--clean")
118            .arg("-f")
119            .arg(dump_file_path)
120            .output()
121            .context("Failed to start pg_dump process")?;
122
123        if !dump_output.stdout.is_empty() {
124            let stdout = String::from_utf8_lossy(&dump_output.stdout);
125            info!("pg_dump stdout:\n{}", stdout);
126            output_log.push_str(&format!("\npg_dump stdout:\n{}", stdout));
127        }
128        if !dump_output.stderr.is_empty() {
129            let stderr = String::from_utf8_lossy(&dump_output.stderr);
130            warn!("pg_dump stderr:\n{}", stderr);
131            output_log.push_str(&format!("\npg_dump stderr:\n{}", stderr));
132        }
133        if !dump_output.status.success() {
134            return Err(anyhow!("pg_dump process exited with an error"));
135        }
136
137        let dump_msg = format!("Data successfully dumped to {}\n", dump_file_path);
138        info!("{}", dump_msg);
139        output_log.push_str(&dump_msg);
140
141        let restore_output: Output = Command::new("psql")
142            .arg("-h")
143            .arg(&self.config_postgres.target_host)
144            .arg("-p")
145            .arg(&self.config_postgres.target_port)
146            .arg("-U")
147            .arg(&self.config_postgres.target_user)
148            .arg("-d")
149            .arg(&self.config_postgres.origin_database)
150            .arg("-f")
151            .arg(dump_file_path)
152            .env("PGPASSWORD", &self.config_postgres.target_password)
153            .output()
154            .context("Failed to start psql process for restoring")?;
155        if !restore_output.stdout.is_empty() {
156            let stdout = String::from_utf8_lossy(&restore_output.stdout);
157            info!("psql stdout:\n{}", stdout);
158            output_log.push_str(&format!("\npsql stdout:\n{}", stdout));
159        }
160        if !restore_output.stderr.is_empty() {
161            let stderr = String::from_utf8_lossy(&restore_output.stderr);
162            warn!("psql stderr:\n{}", stderr);
163            output_log.push_str(&format!("\npsql stderr:\n{}", stderr));
164        }
165        if !restore_output.status.success() {
166            return Err(anyhow!("psql process exited with an error"));
167        }
168
169        let success_msg = format!(
170            "Successfully replicated all data from {} to {}\n",
171            self.config_postgres.origin_host, self.config_postgres.target_host,
172        );
173        info!("{}", success_msg);
174        output_log.push_str(&success_msg);
175
176        std::fs::remove_file(dump_file_path)?;
177
178        if let Some(smtp_config) = &self.config_email {
179            if smtp_config.enabled {
180                smtp_config
181                    .send_email(format!("POSTGRES Replication {}", Local::now()), output_log)?;
182            }
183        }
184        Ok(())
185    }
186}