1use anyhow::{anyhow, Context, Error};
2use chrono::Local;
3use log::{info, warn};
4use postgres::{Client, NoTls};
5use std::process::{Command, Output};
6
7use super::db_operations::DatabaseOperations;
8use crate::config::{email::SMTP, postgres::Postgres, Config};
9
10pub struct PostgresDB {
11 config_postgres: Postgres,
12 config_email: Option<SMTP>,
13 origin_client: Option<Client>,
14 target_client: Option<Client>,
15}
16
17impl PostgresDB {
18 pub fn new(conf: Config) -> Self {
19 Self {
20 config_postgres: conf.postgres,
21 config_email: conf.smtp,
22 origin_client: None,
23 target_client: None,
24 }
25 }
26}
27
28impl DatabaseOperations for PostgresDB {
29 fn connect(&mut self) -> Result<(), Error> {
30 info!("Connecting to {}", self.config_postgres.origin_host);
31 self.origin_client = Some(
32 Client::connect(
33 &format!(
34 "host={} user={} password={} port={}",
35 self.config_postgres.origin_host,
36 self.config_postgres.origin_user,
37 self.config_postgres.origin_password,
38 self.config_postgres.origin_port,
39 ),
40 NoTls,
41 )
42 .context("Cannot connect to Postgres Origin Database")?,
43 );
44 if let Some(ref mut origin_client) = self.origin_client {
45 origin_client
46 .simple_query("SELECT 1")
47 .context("Cannot ping the Postgres Origin Database")?;
48 } else {
49 return Err(anyhow::anyhow!("Origin client is not connected"));
50 }
51
52 self.target_client = Some(
53 Client::connect(
54 &format!(
55 "host={} user={} password={} port={}",
56 self.config_postgres.target_host,
57 self.config_postgres.target_user,
58 self.config_postgres.target_password,
59 self.config_postgres.target_port,
60 ),
61 NoTls,
62 )
63 .context("Cannot connect to Postgres Target Database")?,
64 );
65 if let Some(ref mut target_client) = self.target_client {
66 target_client
67 .simple_query("SELECT 1")
68 .context("Cannot ping the Postgres Target Database")?;
69 } else {
70 return Err(anyhow::anyhow!("Target client is not connected"));
71 }
72
73 Ok(())
74 }
75
76 fn close(&mut self) -> Result<(), Error> {
77 info!("Disconnecting from {}", self.config_postgres.origin_host);
78 if let Some(origin_client) = self.origin_client.take() {
79 origin_client.close()?
80 }
81
82 if let Some(target_client) = self.target_client.take() {
83 target_client.close()?
84 }
85
86 Ok(())
87 }
88
89 fn replicate(&mut self) -> Result<(), Error> {
90 let mut output_log = String::new();
91
92 info!(
93 "Replicating all data from {} to {}",
94 self.config_postgres.origin_host, self.config_postgres.target_host
95 );
96
97 if self.origin_client.is_none() {
99 return Err(anyhow::anyhow!("Origin client is not connected"));
100 }
101 if self.target_client.is_none() {
102 return Err(anyhow::anyhow!("Target client is not connected"));
103 }
104
105 let dump_file_path = "/tmp/clean_dump.sql";
106
107 let dump_output: Output = Command::new("pg_dump")
108 .env("PGPASSWORD", &self.config_postgres.origin_password)
109 .arg("-U")
110 .arg(&self.config_postgres.origin_user)
111 .arg("-h")
112 .arg(&self.config_postgres.origin_host)
113 .arg("-p")
114 .arg(&self.config_postgres.origin_port)
115 .arg(&self.config_postgres.origin_database)
116 .arg("-v")
117 .arg("--clean")
118 .arg("-f")
119 .arg(dump_file_path)
120 .output()
121 .context("Failed to start pg_dump process")?;
122
123 if !dump_output.stdout.is_empty() {
124 let stdout = String::from_utf8_lossy(&dump_output.stdout);
125 info!("pg_dump stdout:\n{}", stdout);
126 output_log.push_str(&format!("\npg_dump stdout:\n{}", stdout));
127 }
128 if !dump_output.stderr.is_empty() {
129 let stderr = String::from_utf8_lossy(&dump_output.stderr);
130 warn!("pg_dump stderr:\n{}", stderr);
131 output_log.push_str(&format!("\npg_dump stderr:\n{}", stderr));
132 }
133 if !dump_output.status.success() {
134 return Err(anyhow!("pg_dump process exited with an error"));
135 }
136
137 let dump_msg = format!("Data successfully dumped to {}\n", dump_file_path);
138 info!("{}", dump_msg);
139 output_log.push_str(&dump_msg);
140
141 let restore_output: Output = Command::new("psql")
142 .arg("-h")
143 .arg(&self.config_postgres.target_host)
144 .arg("-p")
145 .arg(&self.config_postgres.target_port)
146 .arg("-U")
147 .arg(&self.config_postgres.target_user)
148 .arg("-d")
149 .arg(&self.config_postgres.origin_database)
150 .arg("-f")
151 .arg(dump_file_path)
152 .env("PGPASSWORD", &self.config_postgres.target_password)
153 .output()
154 .context("Failed to start psql process for restoring")?;
155 if !restore_output.stdout.is_empty() {
156 let stdout = String::from_utf8_lossy(&restore_output.stdout);
157 info!("psql stdout:\n{}", stdout);
158 output_log.push_str(&format!("\npsql stdout:\n{}", stdout));
159 }
160 if !restore_output.stderr.is_empty() {
161 let stderr = String::from_utf8_lossy(&restore_output.stderr);
162 warn!("psql stderr:\n{}", stderr);
163 output_log.push_str(&format!("\npsql stderr:\n{}", stderr));
164 }
165 if !restore_output.status.success() {
166 return Err(anyhow!("psql process exited with an error"));
167 }
168
169 let success_msg = format!(
170 "Successfully replicated all data from {} to {}\n",
171 self.config_postgres.origin_host, self.config_postgres.target_host,
172 );
173 info!("{}", success_msg);
174 output_log.push_str(&success_msg);
175
176 std::fs::remove_file(dump_file_path)?;
177
178 if let Some(smtp_config) = &self.config_email {
179 if smtp_config.enabled {
180 smtp_config
181 .send_email(format!("POSTGRES Replication {}", Local::now()), output_log)?;
182 }
183 }
184 Ok(())
185 }
186}