rust_pgdatadiff/diff/sequence/
sequence_differ.rs1use anyhow::Result;
2use colored::Colorize;
3
4use tracing::{debug, info};
5
6use crate::diff::diff_output::DiffOutput;
7use crate::diff::sequence::query::input::{QueryAllSequencesInput, QueryLastValuesInput};
8use crate::diff::sequence::query::output::{SequenceCountDiff, SequenceDiffOutput, SequenceSource};
9use tokio::time::Instant;
10
11use crate::diff::sequence::query::sequence_query_executor::{
12 SequenceDualSourceQueryExecutor, SequenceSingleSourceQueryExecutor,
13};
14use crate::diff::sequence::query::sequence_types::SequenceName;
15use crate::diff::types::SchemaName;
16
17pub struct SequenceDiffer<
18 SQE: SequenceSingleSourceQueryExecutor,
19 DSQE: SequenceDualSourceQueryExecutor,
20> {
21 single_sequence_query_executor: SQE,
22 dual_sequence_query_executor: DSQE,
23}
24
25impl<SQE: SequenceSingleSourceQueryExecutor, DSQE: SequenceDualSourceQueryExecutor>
26 SequenceDiffer<SQE, DSQE>
27{
28 pub fn new(single_sequence_query_executor: SQE, dual_sequence_query_executor: DSQE) -> Self {
29 Self {
30 single_sequence_query_executor,
31 dual_sequence_query_executor,
32 }
33 }
34
35 pub async fn diff_all_sequences(&self, schema_name: String) -> Result<Vec<DiffOutput>> {
36 info!("{}", "Starting sequence analysis…".bold().yellow());
37 let mut sequences = self.get_all_sequences(schema_name.to_owned()).await?;
38
39 sequences.sort_by_key(|s| s.to_lowercase());
40
41 let sorted_sequences = sequences.to_owned();
42
43 let futures = sorted_sequences.iter().map(|sequence_name| async {
44 let start = Instant::now();
45
46 let schema_name = SchemaName::new(schema_name.to_owned());
47 let sequence_name = SequenceName::new(sequence_name.to_owned());
48 let input = QueryLastValuesInput::new(schema_name, sequence_name.to_owned());
49 let (first_result, second_result) = self
50 .dual_sequence_query_executor
51 .query_sequence_last_values(input)
52 .await;
53
54 debug!(
55 "{}",
56 format!("Analyzing sequence: {}", &sequence_name.name())
57 .yellow()
58 .bold()
59 );
60
61 let sequence_diff_result =
62 Self::extract_result(sequence_name.name(), first_result, second_result);
63
64 let elapsed = start.elapsed();
65 debug!(
66 "{}",
67 format!("Sequence analysis completed in: {}ms", elapsed.as_millis())
68 );
69 debug!("##############################################");
70
71 sequence_diff_result
72 });
73
74 info!(
75 "{}",
76 "Waiting for total sequence analysis to complete…"
77 .yellow()
78 .bold()
79 );
80 let start = Instant::now();
81 let sequences_analysed = futures::future::join_all(futures).await;
82 let elapsed = start.elapsed();
83 debug!(
84 "{}",
85 format!(
86 "Total sequence analysis completed in: {}ms",
87 elapsed.as_millis()
88 )
89 .yellow()
90 .bold(),
91 );
92
93 for sequence_diff_result in &sequences_analysed {
94 info!("{}", sequence_diff_result.to_string());
95 }
96
97 Ok(sequences_analysed
98 .into_iter()
99 .map(|diff| diff.into())
100 .collect())
101 }
102
103 pub async fn get_all_sequences(&self, schema_name: String) -> Result<Vec<String>> {
104 let input = QueryAllSequencesInput::new(SchemaName::new(schema_name));
105 let query_result = self
106 .single_sequence_query_executor
107 .query_sequence_names(input)
108 .await;
109 Ok(query_result)
110 }
111
112 fn extract_result(
113 sequence_name: String,
114 first_result: Result<i64>,
115 second_result: Result<i64>,
116 ) -> SequenceDiffOutput {
117 match (first_result, second_result) {
118 (Ok(first_value), Ok(second_value)) => {
119 if first_value != second_value {
120 SequenceDiffOutput::Diff(
121 sequence_name,
122 SequenceCountDiff::new(first_value, second_value),
123 )
124 } else {
125 SequenceDiffOutput::NoDiff(sequence_name)
126 }
127 }
128 (Err(_e), _) => SequenceDiffOutput::NotExists(sequence_name, SequenceSource::First),
129 (_, Err(_e)) => SequenceDiffOutput::NotExists(sequence_name, SequenceSource::Second),
130 }
131 }
132}