rust_pgdatadiff/diff/table/
table_differ.rs

1use crate::diff::diff_payload::DiffPayload;
2use crate::diff::table::query::input::{
3    QueryHashDataInput, QueryPrimaryKeysInput, QueryTableCountInput, QueryTableNamesInput,
4};
5use crate::diff::table::query::output::{TableCountDiff, TableDiffOutput, TableSource};
6
7use crate::diff::table::query::table_query_executor::{
8    TableDualSourceQueryExecutor, TableSingleSourceQueryExecutor,
9};
10use crate::diff::table::query::table_types::{
11    TableName, TableOffset, TablePosition, TablePrimaryKeys,
12};
13use anyhow::Result;
14use colored::Colorize;
15use tracing::{debug, info};
16
17use crate::diff::diff_output::DiffOutput;
18use crate::diff::types::SchemaName;
19use std::time::Instant;
20
21pub struct TableDiffer<TQE: TableSingleSourceQueryExecutor, DTQE: TableDualSourceQueryExecutor> {
22    single_table_query_executor: TQE,
23    dual_table_query_executor: DTQE,
24}
25
26impl<TQE: TableSingleSourceQueryExecutor, DTQE: TableDualSourceQueryExecutor>
27    TableDiffer<TQE, DTQE>
28{
29    pub fn new(single_table_query_executor: TQE, dual_table_query_executor: DTQE) -> Self {
30        Self {
31            single_table_query_executor,
32            dual_table_query_executor,
33        }
34    }
35
36    pub async fn diff_all_table_data(&self, diff_payload: &DiffPayload) -> Result<Vec<DiffOutput>> {
37        info!("{}", "Starting data analysis…".yellow().bold());
38
39        let mut tables = self.get_all_tables(diff_payload).await?;
40
41        tables.sort_by_key(|s| s.to_lowercase());
42
43        let sorted_tables = tables.to_owned();
44
45        let futures = sorted_tables.iter().map(|table_name| async {
46            let start = Instant::now();
47
48            // Start loading counts for table from both DBs
49            let query_count_input = QueryTableCountInput::new(
50                SchemaName::new(diff_payload.schema_name().to_string()),
51                TableName::new(table_name.to_string()),
52            );
53
54            let table_counts_start = Instant::now();
55            let (first_result, second_result) = self
56                .dual_table_query_executor
57                .query_table_count(query_count_input)
58                .await;
59
60            let table_counts_elapsed = table_counts_start.elapsed();
61            debug!(
62                "Table counts for {} loaded in: {}ms",
63                table_name.clone(),
64                table_counts_elapsed.as_millis()
65            );
66
67            info!(
68                "{}",
69                format!("Analyzing table: {}", table_name.clone())
70                    .yellow()
71                    .bold()
72            );
73
74            // Start counts comparison
75            let table_diff_result = Self::extract_result(table_name, first_result, second_result);
76
77            let elapsed = start.elapsed();
78            debug!(
79                "{}",
80                format!("Table analysis completed in: {}ms", elapsed.as_millis())
81            );
82
83            debug!("##############################################");
84
85            // If we only care about counts, return the result
86            if diff_payload.only_count() {
87                return table_diff_result;
88            }
89
90            // If the diff result permits us to skip data comparison, return the result
91            if table_diff_result.skip_table_diff() {
92                return table_diff_result;
93            }
94
95            let query_primary_keys_input = QueryPrimaryKeysInput::new(table_name.clone());
96
97            let primary_keys = self
98                .single_table_query_executor
99                .query_primary_keys(query_primary_keys_input)
100                .await;
101
102            // If no primary keys found, return the result
103            if primary_keys.is_empty() {
104                let table_diff_result = TableDiffOutput::NoPrimaryKeyFound(table_name.clone());
105                return table_diff_result;
106            }
107
108            // Prepare the primary keys for the table
109            // Will be used for query ordering when hashing data
110            let primary_keys = primary_keys.as_slice().join(",");
111
112            let total_rows = match table_diff_result {
113                TableDiffOutput::NoCountDiff(_, rows) => rows,
114                _ => {
115                    // Since we do not expect to reach here, print the result and panic
116                    panic!("Unexpected table diff result")
117                }
118            };
119
120            let schema_name = SchemaName::new(diff_payload.schema_name().to_string());
121            let query_table_name = TableName::new(table_name.clone());
122            let table_offset = TableOffset::new(diff_payload.chunk_size());
123            let table_primary_keys = TablePrimaryKeys::new(primary_keys);
124
125            let start = Instant::now();
126
127            if let Some(value) = self
128                .diff_table_data(
129                    diff_payload,
130                    schema_name,
131                    query_table_name,
132                    table_offset,
133                    table_primary_keys,
134                    total_rows,
135                    start,
136                )
137                .await
138            {
139                return value;
140            }
141
142            let elapsed = start.elapsed();
143
144            TableDiffOutput::NoDiffWithDuration(table_name.clone(), elapsed)
145        });
146
147        info!(
148            "{}",
149            "Waiting for table analysis to complete…".yellow().bold()
150        );
151        let start = Instant::now();
152        let analysed_tables = futures::future::join_all(futures).await;
153        let elapsed = start.elapsed();
154        info!(
155            "{}",
156            format!(
157                "Total table analysis completed in: {}ms",
158                elapsed.as_millis()
159            )
160            .yellow()
161            .bold(),
162        );
163        info!(
164            "{}",
165            format!("Total tables for row count check: {}", tables.len())
166                .bright_blue()
167                .bold()
168        );
169
170        info!("##############################################");
171        info!("{}", "Table analysis results 👇".bright_magenta().bold());
172
173        for table_diff_result in &analysed_tables {
174            info!("{}", table_diff_result.to_string());
175        }
176
177        info!("##############################################");
178
179        Ok(analysed_tables
180            .into_iter()
181            .map(|diff| diff.into())
182            .collect())
183    }
184
185    pub async fn get_all_tables(&self, diff_payload: &DiffPayload) -> Result<Vec<String>> {
186        let input = QueryTableNamesInput::new(
187            SchemaName::new(diff_payload.schema_name().to_string()),
188            diff_payload.included_tables().to_vec(),
189            diff_payload.excluded_tables().to_vec(),
190        );
191
192        let tables = self
193            .single_table_query_executor
194            .query_table_names(input)
195            .await;
196
197        Ok(tables)
198    }
199
200    fn extract_result(
201        table_name: &str,
202        first_result: Result<i64>,
203        second_result: Result<i64>,
204    ) -> TableDiffOutput {
205        match (first_result, second_result) {
206            (Ok(first_total_rows), Ok(second_total_rows)) => {
207                if first_total_rows != second_total_rows {
208                    TableDiffOutput::Diff(
209                        table_name.to_owned(),
210                        TableCountDiff::new(first_total_rows, second_total_rows),
211                    )
212                } else {
213                    TableDiffOutput::NoCountDiff(table_name.to_owned(), first_total_rows)
214                }
215            }
216            (Err(_e), _) => TableDiffOutput::NotExists(table_name.to_owned(), TableSource::First),
217            (_, Err(_e)) => TableDiffOutput::NotExists(table_name.to_owned(), TableSource::Second),
218        }
219    }
220    #[allow(clippy::too_many_arguments)]
221    async fn diff_table_data(
222        &self,
223        diff_payload: &DiffPayload,
224        schema_name: SchemaName,
225        query_table_name: TableName,
226        table_offset: TableOffset,
227        table_primary_keys: TablePrimaryKeys,
228        total_rows: i64,
229        start: Instant,
230    ) -> Option<TableDiffOutput> {
231        // Start data comparison
232        let mut position = diff_payload.start_position();
233        while position <= total_rows {
234            let input = QueryHashDataInput::new(
235                schema_name.clone(),
236                query_table_name.clone(),
237                table_primary_keys.clone(),
238                TablePosition::new(position),
239                table_offset.clone(),
240            );
241
242            let hash_fetch_start = Instant::now();
243            let (first_hash, second_hash) =
244                self.dual_table_query_executor.query_hash_data(input).await;
245            let hash_fetch_elapsed = hash_fetch_start.elapsed();
246            debug!(
247                "Hashes for {} loaded in: {}ms",
248                query_table_name.name(),
249                hash_fetch_elapsed.as_millis()
250            );
251
252            // If hashes are different, return the result
253            if first_hash != second_hash {
254                let elapsed = start.elapsed();
255                return Some(TableDiffOutput::DataDiffWithDuration(
256                    query_table_name.name().to_string(),
257                    position,
258                    position + diff_payload.chunk_size(),
259                    elapsed,
260                ));
261            }
262
263            // Increase the position for the next iteration
264            position += diff_payload.chunk_size();
265        }
266
267        None
268    }
269}