rust_pgdatadiff/diff/sequence/query/
sequence_query_executor.rs

1/// This module contains the implementation of query executors for sequence-related operations.
2/// It provides traits and structs for executing queries on a single data source and on dual data sources.
3/// The single data source executor is responsible for querying sequence names.
4/// The dual data source executor is responsible for querying sequence last values.
5/// Both executors use the `sqlx` crate for interacting with the database.
6///
7/// # Examples
8///
9/// ```no_run
10/// use rust_pgdatadiff::diff::sequence::query::sequence_query_executor::SequenceSingleSourceQueryExecutorImpl;
11/// use rust_pgdatadiff::diff::sequence::query::sequence_query_executor::SequenceSingleSourceQueryExecutor;
12/// use rust_pgdatadiff::diff::sequence::query::input::QueryAllSequencesInput;
13/// use rust_pgdatadiff::diff::types::SchemaName;
14/// use rust_pgdatadiff::diff::sequence::query::sequence_query_executor::SequenceDualSourceQueryExecutorImpl;
15/// use rust_pgdatadiff::diff::sequence::query::sequence_query_executor::SequenceDualSourceQueryExecutor;
16/// use rust_pgdatadiff::diff::sequence::query::sequence_types::SequenceName;
17/// use rust_pgdatadiff::diff::sequence::query::input::QueryLastValuesInput;
18///
19/// #[tokio::main]
20/// async fn main() {
21///
22///     let mut cfg = Config::new();
23///     cfg.url = Some(String::from("postgres://user:password@localhost:5432/database"));
24///
25///     let db_pool: Pool = cfg
26///         .create_pool(Some(Runtime::Tokio1), NoTls)
27///         .unwrap();
28///
29///     // Create a single data source executor
30///     let single_source_executor = SequenceSingleSourceQueryExecutorImpl::new(db_pool);
31///
32///     // Query sequence names
33///     let schema_name = SchemaName::new("public".to_string());
34///     let table_names = single_source_executor
35///         .query_sequence_names(QueryAllSequencesInput::new(schema_name))
36///         .await;
37///
38///     // Create a dual data source executor
39///     let mut first_cfg = Config::new();
40///     first_cfg.url = Some(String::from("postgres://user:password@localhost:5432/database"));
41///
42///     let mut second_cfg = Config::new();
43///     second_cfg.url = Some(String::from("postgres://user:password@localhost:5432/database2"));
44///
45///     let first_db_pool: Pool = first_cfg
46///         .create_pool(Some(Runtime::Tokio1), NoTls)
47///         .unwrap();
48///
49///     let second_db_pool: Pool = second_cfg
50///         .create_pool(Some(Runtime::Tokio1), NoTls)
51///         .unwrap();
52///
53///     let dual_source_executor = SequenceDualSourceQueryExecutorImpl::new(first_db_pool, second_db_pool);
54///
55///     // Query sequence last values
56///     let sequence_name = SequenceName::new("public".to_string());
57///     let schema_name = SchemaName::new("public".to_string());
58///     let (first_count, second_count) = dual_source_executor
59///         .query_sequence_last_values(QueryLastValuesInput::new(schema_name, sequence_name))
60///         .await;
61/// }
62/// ```
63use crate::diff::sequence::query::input::{QueryAllSequencesInput, QueryLastValuesInput};
64use crate::diff::sequence::query::sequence_query::SequenceQuery;
65
66use anyhow::Result;
67use async_trait::async_trait;
68use deadpool_postgres::Pool;
69use tracing::error;
70
71#[cfg_attr(test, mockall::automock)]
72#[async_trait]
73pub trait SequenceSingleSourceQueryExecutor {
74    /// Queries the sequence names from the database.
75    ///
76    /// # Returns
77    ///
78    /// A vector of sequence names.
79    async fn query_sequence_names(&self, input: QueryAllSequencesInput) -> Vec<String>;
80}
81
82pub struct SequenceSingleSourceQueryExecutorImpl {
83    db_pool: Pool,
84}
85
86impl SequenceSingleSourceQueryExecutorImpl {
87    pub fn new(db_pool: Pool) -> Self {
88        Self { db_pool }
89    }
90}
91
92#[async_trait]
93impl SequenceSingleSourceQueryExecutor for SequenceSingleSourceQueryExecutorImpl {
94    async fn query_sequence_names(&self, input: QueryAllSequencesInput) -> Vec<String> {
95        // Clone the database client
96        let client = self.db_pool.get().await.unwrap();
97
98        let schema_name = input.schema_name();
99        let sequence_query = SequenceQuery::AllSequences(schema_name);
100
101        let query_binding = sequence_query.to_string();
102
103        client
104            .query(&query_binding, &[])
105            .await
106            .unwrap()
107            .into_iter()
108            .map(|row| row.get("sequence_name"))
109            .collect::<Vec<String>>()
110    }
111}
112
113#[cfg_attr(test, mockall::automock)]
114#[async_trait]
115pub trait SequenceDualSourceQueryExecutor {
116    /// Executes a query to retrieve the last value of a sequence.
117    ///
118    /// # Arguments
119    ///
120    /// * `input` - The input parameters for the query.
121    ///
122    /// # Returns
123    ///
124    /// A tuple containing the result of the query as a `Result<i64>`.
125    async fn query_sequence_last_values(
126        &self,
127        input: QueryLastValuesInput,
128    ) -> (Result<i64>, Result<i64>);
129}
130
131pub struct SequenceDualSourceQueryExecutorImpl {
132    first_db_pool: Pool,
133    second_db_pool: Pool,
134}
135
136impl SequenceDualSourceQueryExecutorImpl {
137    pub fn new(first_db_pool: Pool, second_db_pool: Pool) -> Self {
138        Self {
139            first_db_pool,
140            second_db_pool,
141        }
142    }
143}
144
145#[async_trait]
146impl SequenceDualSourceQueryExecutor for SequenceDualSourceQueryExecutorImpl {
147    async fn query_sequence_last_values(
148        &self,
149        input: QueryLastValuesInput,
150    ) -> (Result<i64>, Result<i64>) {
151        // Clone the database clients
152        let first_client = self.first_db_pool.get().await.unwrap();
153        let second_client = self.second_db_pool.get().await.unwrap();
154
155        let sequence_query = SequenceQuery::LastValue(
156            input.schema_name().to_owned(),
157            input.sequence_name().to_owned(),
158        );
159
160        let query_binding = sequence_query.to_string();
161
162        let first_result = first_client.query_one(&query_binding, &[]);
163        let second_result = second_client.query_one(&query_binding, &[]);
164
165        let (first_result, second_result) =
166            futures::future::join(first_result, second_result).await;
167
168        let first_count: Result<i64> = match first_result {
169            Ok(pg_row) => Ok(pg_row.try_get("last_value").unwrap()),
170            Err(e) => {
171                error!("Error while fetching first sequence: {}", e);
172                Err(anyhow::anyhow!("Failed to fetch count for first sequence"))
173            }
174        };
175
176        let second_count: Result<i64> = match second_result {
177            Ok(pg_row) => Ok(pg_row.try_get("last_value").unwrap()),
178            Err(e) => {
179                error!("Error while fetching second sequence: {}", e);
180                Err(anyhow::anyhow!("Failed to fetch count for second sequence"))
181            }
182        };
183
184        (first_count, second_count)
185    }
186}