rust_pgdatadiff/diff/sequence/
sequence_differ.rs

1use anyhow::Result;
2use colored::Colorize;
3
4use tracing::{debug, info};
5
6use crate::diff::diff_output::DiffOutput;
7use crate::diff::sequence::query::input::{QueryAllSequencesInput, QueryLastValuesInput};
8use crate::diff::sequence::query::output::{SequenceCountDiff, SequenceDiffOutput, SequenceSource};
9use tokio::time::Instant;
10
11use crate::diff::sequence::query::sequence_query_executor::{
12    SequenceDualSourceQueryExecutor, SequenceSingleSourceQueryExecutor,
13};
14use crate::diff::sequence::query::sequence_types::SequenceName;
15use crate::diff::types::SchemaName;
16
17pub struct SequenceDiffer<
18    SQE: SequenceSingleSourceQueryExecutor,
19    DSQE: SequenceDualSourceQueryExecutor,
20> {
21    single_sequence_query_executor: SQE,
22    dual_sequence_query_executor: DSQE,
23}
24
25impl<SQE: SequenceSingleSourceQueryExecutor, DSQE: SequenceDualSourceQueryExecutor>
26    SequenceDiffer<SQE, DSQE>
27{
28    pub fn new(single_sequence_query_executor: SQE, dual_sequence_query_executor: DSQE) -> Self {
29        Self {
30            single_sequence_query_executor,
31            dual_sequence_query_executor,
32        }
33    }
34
35    pub async fn diff_all_sequences(&self, schema_name: String) -> Result<Vec<DiffOutput>> {
36        info!("{}", "Starting sequence analysis…".bold().yellow());
37        let mut sequences = self.get_all_sequences(schema_name.to_owned()).await?;
38
39        sequences.sort_by_key(|s| s.to_lowercase());
40
41        let sorted_sequences = sequences.to_owned();
42
43        let futures = sorted_sequences.iter().map(|sequence_name| async {
44            let start = Instant::now();
45
46            let schema_name = SchemaName::new(schema_name.to_owned());
47            let sequence_name = SequenceName::new(sequence_name.to_owned());
48            let input = QueryLastValuesInput::new(schema_name, sequence_name.to_owned());
49            let (first_result, second_result) = self
50                .dual_sequence_query_executor
51                .query_sequence_last_values(input)
52                .await;
53
54            debug!(
55                "{}",
56                format!("Analyzing sequence: {}", &sequence_name.name())
57                    .yellow()
58                    .bold()
59            );
60
61            let sequence_diff_result =
62                Self::extract_result(sequence_name.name(), first_result, second_result);
63
64            let elapsed = start.elapsed();
65            debug!(
66                "{}",
67                format!("Sequence analysis completed in: {}ms", elapsed.as_millis())
68            );
69            debug!("##############################################");
70
71            sequence_diff_result
72        });
73
74        info!(
75            "{}",
76            "Waiting for total sequence analysis to complete…"
77                .yellow()
78                .bold()
79        );
80        let start = Instant::now();
81        let sequences_analysed = futures::future::join_all(futures).await;
82        let elapsed = start.elapsed();
83        debug!(
84            "{}",
85            format!(
86                "Total sequence analysis completed in: {}ms",
87                elapsed.as_millis()
88            )
89            .yellow()
90            .bold(),
91        );
92
93        for sequence_diff_result in &sequences_analysed {
94            info!("{}", sequence_diff_result.to_string());
95        }
96
97        Ok(sequences_analysed
98            .into_iter()
99            .map(|diff| diff.into())
100            .collect())
101    }
102
103    pub async fn get_all_sequences(&self, schema_name: String) -> Result<Vec<String>> {
104        let input = QueryAllSequencesInput::new(SchemaName::new(schema_name));
105        let query_result = self
106            .single_sequence_query_executor
107            .query_sequence_names(input)
108            .await;
109        Ok(query_result)
110    }
111
112    fn extract_result(
113        sequence_name: String,
114        first_result: Result<i64>,
115        second_result: Result<i64>,
116    ) -> SequenceDiffOutput {
117        match (first_result, second_result) {
118            (Ok(first_value), Ok(second_value)) => {
119                if first_value != second_value {
120                    SequenceDiffOutput::Diff(
121                        sequence_name,
122                        SequenceCountDiff::new(first_value, second_value),
123                    )
124                } else {
125                    SequenceDiffOutput::NoDiff(sequence_name)
126                }
127            }
128            (Err(_e), _) => SequenceDiffOutput::NotExists(sequence_name, SequenceSource::First),
129            (_, Err(_e)) => SequenceDiffOutput::NotExists(sequence_name, SequenceSource::Second),
130        }
131    }
132}