rust_pgdatadiff/diff/
diff_ops.rs

1use crate::diff::db_clients::DBClients;
2use anyhow::Result;
3use colored::Colorize;
4use deadpool_postgres::tokio_postgres::NoTls;
5use deadpool_postgres::{Config, ManagerConfig, PoolConfig, RecyclingMethod, Runtime};
6
7use crate::diff::diff_output::DiffOutput;
8use tracing::info;
9
10use crate::diff::diff_payload::DiffPayload;
11use crate::diff::sequence::query::sequence_query_executor::{
12    SequenceDualSourceQueryExecutorImpl, SequenceSingleSourceQueryExecutorImpl,
13};
14
15use crate::diff::sequence::sequence_differ::SequenceDiffer;
16use crate::diff::table::query::table_query_executor::{
17    TableDualSourceQueryExecutorImpl, TableSingleSourceQueryExecutorImpl,
18};
19
20use crate::diff::table::table_differ::TableDiffer;
21
22/// The `Differ` struct represents a database differ.
23///
24/// It provides a method `diff_dbs` that performs the diffing operation between two databases.
25pub struct Differ;
26
27impl Differ {
28    pub async fn diff_dbs(diff_payload: DiffPayload) -> Result<Vec<DiffOutput>> {
29        info!("{}", "Initiating DB diffing…".bold().blue());
30
31        let tls_connector = if diff_payload.any_accept_invalid_certs() {
32            use native_tls::TlsConnector;
33            use postgres_native_tls::MakeTlsConnector;
34
35            let tls_connector = TlsConnector::builder()
36                .danger_accept_invalid_certs(true)
37                .build()
38                .unwrap();
39
40            Some(MakeTlsConnector::new(tls_connector))
41        } else {
42            None
43        };
44
45        let mut first_cfg = Config::new();
46        first_cfg.url = Some(diff_payload.first_db().to_string());
47        first_cfg.application_name = Some(String::from("rust-pgdatadiff"));
48        first_cfg.pool = Some(PoolConfig::new(diff_payload.max_connections() as usize));
49        first_cfg.manager = Some(ManagerConfig {
50            recycling_method: RecyclingMethod::Fast,
51        });
52
53        let mut second_cfg = Config::new();
54        second_cfg.url = Some(diff_payload.second_db().to_string());
55        second_cfg.application_name = Some(String::from("rust-pgdatadiff"));
56        second_cfg.pool = Some(PoolConfig::new(diff_payload.max_connections() as usize));
57        second_cfg.manager = Some(ManagerConfig {
58            recycling_method: RecyclingMethod::Fast,
59        });
60
61        info!("{}", "Connected to first DB".magenta().bold());
62        let first_db_pool = if diff_payload.accept_invalid_certs_first_db() {
63            first_cfg
64                .create_pool(Some(Runtime::Tokio1), tls_connector.clone().unwrap())
65                .unwrap()
66        } else {
67            first_cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap()
68        };
69
70        info!("{}", "Connected to second DB".magenta().bold());
71        let second_db_pool = if diff_payload.accept_invalid_certs_second_db() {
72            second_cfg
73                .create_pool(Some(Runtime::Tokio1), tls_connector.unwrap())
74                .unwrap()
75        } else {
76            second_cfg
77                .create_pool(Some(Runtime::Tokio1), NoTls)
78                .unwrap()
79        };
80
81        let db_clients = DBClients::new(first_db_pool, second_db_pool);
82
83        info!("{}", "Going for diff…".green().bold());
84
85        // Create a single source query executor for tables
86        let single_table_query_executor =
87            TableSingleSourceQueryExecutorImpl::new(db_clients.first_db_pool());
88
89        // Create a dual source query executor for tables
90        let dual_source_table_query_executor = TableDualSourceQueryExecutorImpl::new(
91            db_clients.first_db_pool(),
92            db_clients.second_db_pool(),
93        );
94
95        // Create a table differ
96        let table_differ = TableDiffer::new(
97            single_table_query_executor,
98            dual_source_table_query_executor,
99        );
100
101        // Create a single source query executor for sequences
102        let single_sequence_query_executor =
103            SequenceSingleSourceQueryExecutorImpl::new(db_clients.first_db_pool());
104
105        // Create a dual source query executor for sequences
106        let dual_source_sequence_query_executor = SequenceDualSourceQueryExecutorImpl::new(
107            db_clients.first_db_pool(),
108            db_clients.second_db_pool(),
109        );
110
111        // Create a sequence differ
112        let sequence_differ = SequenceDiffer::new(
113            single_sequence_query_executor,
114            dual_source_sequence_query_executor,
115        );
116
117        // Prepare diff output
118        let diff_output = if diff_payload.only_tables() {
119            // Load only tables diff
120            let original_table_diff = table_differ.diff_all_table_data(&diff_payload).await?;
121            original_table_diff.into_iter().collect::<Vec<DiffOutput>>()
122        } else if diff_payload.only_sequences() {
123            // Load only sequences diff
124            let original_sequence_diff = sequence_differ
125                .diff_all_sequences(diff_payload.schema_name().into())
126                .await?;
127            original_sequence_diff
128                .into_iter()
129                .collect::<Vec<DiffOutput>>()
130        } else {
131            // Load both tables and sequences diff
132            let original_sequence_diff =
133                sequence_differ.diff_all_sequences(diff_payload.schema_name().into());
134
135            let original_table_diff = table_differ.diff_all_table_data(&diff_payload);
136
137            let (table_diff, sequence_diff) =
138                futures::future::join(original_table_diff, original_sequence_diff).await;
139
140            let table_diff: Vec<DiffOutput> = table_diff.unwrap();
141            let sequence_diff: Vec<DiffOutput> = sequence_diff.unwrap();
142
143            table_diff
144                .into_iter()
145                .chain(sequence_diff.into_iter())
146                .collect::<Vec<DiffOutput>>()
147        };
148
149        Ok(diff_output)
150    }
151}