rust_pgdatadiff/diff/
diff_ops.rs1use crate::diff::db_clients::DBClients;
2use anyhow::Result;
3use colored::Colorize;
4use deadpool_postgres::tokio_postgres::NoTls;
5use deadpool_postgres::{Config, ManagerConfig, PoolConfig, RecyclingMethod, Runtime};
6
7use crate::diff::diff_output::DiffOutput;
8use tracing::info;
9
10use crate::diff::diff_payload::DiffPayload;
11use crate::diff::sequence::query::sequence_query_executor::{
12 SequenceDualSourceQueryExecutorImpl, SequenceSingleSourceQueryExecutorImpl,
13};
14
15use crate::diff::sequence::sequence_differ::SequenceDiffer;
16use crate::diff::table::query::table_query_executor::{
17 TableDualSourceQueryExecutorImpl, TableSingleSourceQueryExecutorImpl,
18};
19
20use crate::diff::table::table_differ::TableDiffer;
21
22pub struct Differ;
26
27impl Differ {
28 pub async fn diff_dbs(diff_payload: DiffPayload) -> Result<Vec<DiffOutput>> {
29 info!("{}", "Initiating DB diffing…".bold().blue());
30
31 let tls_connector = if diff_payload.any_accept_invalid_certs() {
32 use native_tls::TlsConnector;
33 use postgres_native_tls::MakeTlsConnector;
34
35 let tls_connector = TlsConnector::builder()
36 .danger_accept_invalid_certs(true)
37 .build()
38 .unwrap();
39
40 Some(MakeTlsConnector::new(tls_connector))
41 } else {
42 None
43 };
44
45 let mut first_cfg = Config::new();
46 first_cfg.url = Some(diff_payload.first_db().to_string());
47 first_cfg.application_name = Some(String::from("rust-pgdatadiff"));
48 first_cfg.pool = Some(PoolConfig::new(diff_payload.max_connections() as usize));
49 first_cfg.manager = Some(ManagerConfig {
50 recycling_method: RecyclingMethod::Fast,
51 });
52
53 let mut second_cfg = Config::new();
54 second_cfg.url = Some(diff_payload.second_db().to_string());
55 second_cfg.application_name = Some(String::from("rust-pgdatadiff"));
56 second_cfg.pool = Some(PoolConfig::new(diff_payload.max_connections() as usize));
57 second_cfg.manager = Some(ManagerConfig {
58 recycling_method: RecyclingMethod::Fast,
59 });
60
61 info!("{}", "Connected to first DB".magenta().bold());
62 let first_db_pool = if diff_payload.accept_invalid_certs_first_db() {
63 first_cfg
64 .create_pool(Some(Runtime::Tokio1), tls_connector.clone().unwrap())
65 .unwrap()
66 } else {
67 first_cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap()
68 };
69
70 info!("{}", "Connected to second DB".magenta().bold());
71 let second_db_pool = if diff_payload.accept_invalid_certs_second_db() {
72 second_cfg
73 .create_pool(Some(Runtime::Tokio1), tls_connector.unwrap())
74 .unwrap()
75 } else {
76 second_cfg
77 .create_pool(Some(Runtime::Tokio1), NoTls)
78 .unwrap()
79 };
80
81 let db_clients = DBClients::new(first_db_pool, second_db_pool);
82
83 info!("{}", "Going for diff…".green().bold());
84
85 let single_table_query_executor =
87 TableSingleSourceQueryExecutorImpl::new(db_clients.first_db_pool());
88
89 let dual_source_table_query_executor = TableDualSourceQueryExecutorImpl::new(
91 db_clients.first_db_pool(),
92 db_clients.second_db_pool(),
93 );
94
95 let table_differ = TableDiffer::new(
97 single_table_query_executor,
98 dual_source_table_query_executor,
99 );
100
101 let single_sequence_query_executor =
103 SequenceSingleSourceQueryExecutorImpl::new(db_clients.first_db_pool());
104
105 let dual_source_sequence_query_executor = SequenceDualSourceQueryExecutorImpl::new(
107 db_clients.first_db_pool(),
108 db_clients.second_db_pool(),
109 );
110
111 let sequence_differ = SequenceDiffer::new(
113 single_sequence_query_executor,
114 dual_source_sequence_query_executor,
115 );
116
117 let diff_output = if diff_payload.only_tables() {
119 let original_table_diff = table_differ.diff_all_table_data(&diff_payload).await?;
121 original_table_diff.into_iter().collect::<Vec<DiffOutput>>()
122 } else if diff_payload.only_sequences() {
123 let original_sequence_diff = sequence_differ
125 .diff_all_sequences(diff_payload.schema_name().into())
126 .await?;
127 original_sequence_diff
128 .into_iter()
129 .collect::<Vec<DiffOutput>>()
130 } else {
131 let original_sequence_diff =
133 sequence_differ.diff_all_sequences(diff_payload.schema_name().into());
134
135 let original_table_diff = table_differ.diff_all_table_data(&diff_payload);
136
137 let (table_diff, sequence_diff) =
138 futures::future::join(original_table_diff, original_sequence_diff).await;
139
140 let table_diff: Vec<DiffOutput> = table_diff.unwrap();
141 let sequence_diff: Vec<DiffOutput> = sequence_diff.unwrap();
142
143 table_diff
144 .into_iter()
145 .chain(sequence_diff.into_iter())
146 .collect::<Vec<DiffOutput>>()
147 };
148
149 Ok(diff_output)
150 }
151}