rust_pgdatadiff/diff/table/
table_differ.rs1use crate::diff::diff_payload::DiffPayload;
2use crate::diff::table::query::input::{
3 QueryHashDataInput, QueryPrimaryKeysInput, QueryTableCountInput, QueryTableNamesInput,
4};
5use crate::diff::table::query::output::{TableCountDiff, TableDiffOutput, TableSource};
6
7use crate::diff::table::query::table_query_executor::{
8 TableDualSourceQueryExecutor, TableSingleSourceQueryExecutor,
9};
10use crate::diff::table::query::table_types::{
11 TableName, TableOffset, TablePosition, TablePrimaryKeys,
12};
13use anyhow::Result;
14use colored::Colorize;
15use tracing::{debug, info};
16
17use crate::diff::diff_output::DiffOutput;
18use crate::diff::types::SchemaName;
19use std::time::Instant;
20
21pub struct TableDiffer<TQE: TableSingleSourceQueryExecutor, DTQE: TableDualSourceQueryExecutor> {
22 single_table_query_executor: TQE,
23 dual_table_query_executor: DTQE,
24}
25
26impl<TQE: TableSingleSourceQueryExecutor, DTQE: TableDualSourceQueryExecutor>
27 TableDiffer<TQE, DTQE>
28{
29 pub fn new(single_table_query_executor: TQE, dual_table_query_executor: DTQE) -> Self {
30 Self {
31 single_table_query_executor,
32 dual_table_query_executor,
33 }
34 }
35
36 pub async fn diff_all_table_data(&self, diff_payload: &DiffPayload) -> Result<Vec<DiffOutput>> {
37 info!("{}", "Starting data analysis…".yellow().bold());
38
39 let mut tables = self.get_all_tables(diff_payload).await?;
40
41 tables.sort_by_key(|s| s.to_lowercase());
42
43 let sorted_tables = tables.to_owned();
44
45 let futures = sorted_tables.iter().map(|table_name| async {
46 let start = Instant::now();
47
48 let query_count_input = QueryTableCountInput::new(
50 SchemaName::new(diff_payload.schema_name().to_string()),
51 TableName::new(table_name.to_string()),
52 );
53
54 let table_counts_start = Instant::now();
55 let (first_result, second_result) = self
56 .dual_table_query_executor
57 .query_table_count(query_count_input)
58 .await;
59
60 let table_counts_elapsed = table_counts_start.elapsed();
61 debug!(
62 "Table counts for {} loaded in: {}ms",
63 table_name.clone(),
64 table_counts_elapsed.as_millis()
65 );
66
67 info!(
68 "{}",
69 format!("Analyzing table: {}", table_name.clone())
70 .yellow()
71 .bold()
72 );
73
74 let table_diff_result = Self::extract_result(table_name, first_result, second_result);
76
77 let elapsed = start.elapsed();
78 debug!(
79 "{}",
80 format!("Table analysis completed in: {}ms", elapsed.as_millis())
81 );
82
83 debug!("##############################################");
84
85 if diff_payload.only_count() {
87 return table_diff_result;
88 }
89
90 if table_diff_result.skip_table_diff() {
92 return table_diff_result;
93 }
94
95 let query_primary_keys_input = QueryPrimaryKeysInput::new(table_name.clone());
96
97 let primary_keys = self
98 .single_table_query_executor
99 .query_primary_keys(query_primary_keys_input)
100 .await;
101
102 if primary_keys.is_empty() {
104 let table_diff_result = TableDiffOutput::NoPrimaryKeyFound(table_name.clone());
105 return table_diff_result;
106 }
107
108 let primary_keys = primary_keys.as_slice().join(",");
111
112 let total_rows = match table_diff_result {
113 TableDiffOutput::NoCountDiff(_, rows) => rows,
114 _ => {
115 panic!("Unexpected table diff result")
117 }
118 };
119
120 let schema_name = SchemaName::new(diff_payload.schema_name().to_string());
121 let query_table_name = TableName::new(table_name.clone());
122 let table_offset = TableOffset::new(diff_payload.chunk_size());
123 let table_primary_keys = TablePrimaryKeys::new(primary_keys);
124
125 let start = Instant::now();
126
127 if let Some(value) = self
128 .diff_table_data(
129 diff_payload,
130 schema_name,
131 query_table_name,
132 table_offset,
133 table_primary_keys,
134 total_rows,
135 start,
136 )
137 .await
138 {
139 return value;
140 }
141
142 let elapsed = start.elapsed();
143
144 TableDiffOutput::NoDiffWithDuration(table_name.clone(), elapsed)
145 });
146
147 info!(
148 "{}",
149 "Waiting for table analysis to complete…".yellow().bold()
150 );
151 let start = Instant::now();
152 let analysed_tables = futures::future::join_all(futures).await;
153 let elapsed = start.elapsed();
154 info!(
155 "{}",
156 format!(
157 "Total table analysis completed in: {}ms",
158 elapsed.as_millis()
159 )
160 .yellow()
161 .bold(),
162 );
163 info!(
164 "{}",
165 format!("Total tables for row count check: {}", tables.len())
166 .bright_blue()
167 .bold()
168 );
169
170 info!("##############################################");
171 info!("{}", "Table analysis results 👇".bright_magenta().bold());
172
173 for table_diff_result in &analysed_tables {
174 info!("{}", table_diff_result.to_string());
175 }
176
177 info!("##############################################");
178
179 Ok(analysed_tables
180 .into_iter()
181 .map(|diff| diff.into())
182 .collect())
183 }
184
185 pub async fn get_all_tables(&self, diff_payload: &DiffPayload) -> Result<Vec<String>> {
186 let input = QueryTableNamesInput::new(
187 SchemaName::new(diff_payload.schema_name().to_string()),
188 diff_payload.included_tables().to_vec(),
189 diff_payload.excluded_tables().to_vec(),
190 );
191
192 let tables = self
193 .single_table_query_executor
194 .query_table_names(input)
195 .await;
196
197 Ok(tables)
198 }
199
200 fn extract_result(
201 table_name: &str,
202 first_result: Result<i64>,
203 second_result: Result<i64>,
204 ) -> TableDiffOutput {
205 match (first_result, second_result) {
206 (Ok(first_total_rows), Ok(second_total_rows)) => {
207 if first_total_rows != second_total_rows {
208 TableDiffOutput::Diff(
209 table_name.to_owned(),
210 TableCountDiff::new(first_total_rows, second_total_rows),
211 )
212 } else {
213 TableDiffOutput::NoCountDiff(table_name.to_owned(), first_total_rows)
214 }
215 }
216 (Err(_e), _) => TableDiffOutput::NotExists(table_name.to_owned(), TableSource::First),
217 (_, Err(_e)) => TableDiffOutput::NotExists(table_name.to_owned(), TableSource::Second),
218 }
219 }
220 #[allow(clippy::too_many_arguments)]
221 async fn diff_table_data(
222 &self,
223 diff_payload: &DiffPayload,
224 schema_name: SchemaName,
225 query_table_name: TableName,
226 table_offset: TableOffset,
227 table_primary_keys: TablePrimaryKeys,
228 total_rows: i64,
229 start: Instant,
230 ) -> Option<TableDiffOutput> {
231 let mut position = diff_payload.start_position();
233 while position <= total_rows {
234 let input = QueryHashDataInput::new(
235 schema_name.clone(),
236 query_table_name.clone(),
237 table_primary_keys.clone(),
238 TablePosition::new(position),
239 table_offset.clone(),
240 );
241
242 let hash_fetch_start = Instant::now();
243 let (first_hash, second_hash) =
244 self.dual_table_query_executor.query_hash_data(input).await;
245 let hash_fetch_elapsed = hash_fetch_start.elapsed();
246 debug!(
247 "Hashes for {} loaded in: {}ms",
248 query_table_name.name(),
249 hash_fetch_elapsed.as_millis()
250 );
251
252 if first_hash != second_hash {
254 let elapsed = start.elapsed();
255 return Some(TableDiffOutput::DataDiffWithDuration(
256 query_table_name.name().to_string(),
257 position,
258 position + diff_payload.chunk_size(),
259 elapsed,
260 ));
261 }
262
263 position += diff_payload.chunk_size();
265 }
266
267 None
268 }
269}