rust_pgdatadiff/diff/sequence/query/sequence_query_executor.rs
1/// This module contains the implementation of query executors for sequence-related operations.
2/// It provides traits and structs for executing queries on a single data source and on dual data sources.
3/// The single data source executor is responsible for querying sequence names.
4/// The dual data source executor is responsible for querying sequence last values.
5/// Both executors use the `sqlx` crate for interacting with the database.
6///
7/// # Examples
8///
9/// ```no_run
10/// use rust_pgdatadiff::diff::sequence::query::sequence_query_executor::SequenceSingleSourceQueryExecutorImpl;
11/// use rust_pgdatadiff::diff::sequence::query::sequence_query_executor::SequenceSingleSourceQueryExecutor;
12/// use rust_pgdatadiff::diff::sequence::query::input::QueryAllSequencesInput;
13/// use rust_pgdatadiff::diff::types::SchemaName;
14/// use rust_pgdatadiff::diff::sequence::query::sequence_query_executor::SequenceDualSourceQueryExecutorImpl;
15/// use rust_pgdatadiff::diff::sequence::query::sequence_query_executor::SequenceDualSourceQueryExecutor;
16/// use rust_pgdatadiff::diff::sequence::query::sequence_types::SequenceName;
17/// use rust_pgdatadiff::diff::sequence::query::input::QueryLastValuesInput;
18///
19/// #[tokio::main]
20/// async fn main() {
21///
22/// let mut cfg = Config::new();
23/// cfg.url = Some(String::from("postgres://user:password@localhost:5432/database"));
24///
25/// let db_pool: Pool = cfg
26/// .create_pool(Some(Runtime::Tokio1), NoTls)
27/// .unwrap();
28///
29/// // Create a single data source executor
30/// let single_source_executor = SequenceSingleSourceQueryExecutorImpl::new(db_pool);
31///
32/// // Query sequence names
33/// let schema_name = SchemaName::new("public".to_string());
34/// let table_names = single_source_executor
35/// .query_sequence_names(QueryAllSequencesInput::new(schema_name))
36/// .await;
37///
38/// // Create a dual data source executor
39/// let mut first_cfg = Config::new();
40/// first_cfg.url = Some(String::from("postgres://user:password@localhost:5432/database"));
41///
42/// let mut second_cfg = Config::new();
43/// second_cfg.url = Some(String::from("postgres://user:password@localhost:5432/database2"));
44///
45/// let first_db_pool: Pool = first_cfg
46/// .create_pool(Some(Runtime::Tokio1), NoTls)
47/// .unwrap();
48///
49/// let second_db_pool: Pool = second_cfg
50/// .create_pool(Some(Runtime::Tokio1), NoTls)
51/// .unwrap();
52///
53/// let dual_source_executor = SequenceDualSourceQueryExecutorImpl::new(first_db_pool, second_db_pool);
54///
55/// // Query sequence last values
56/// let sequence_name = SequenceName::new("public".to_string());
57/// let schema_name = SchemaName::new("public".to_string());
58/// let (first_count, second_count) = dual_source_executor
59/// .query_sequence_last_values(QueryLastValuesInput::new(schema_name, sequence_name))
60/// .await;
61/// }
62/// ```
63use crate::diff::sequence::query::input::{QueryAllSequencesInput, QueryLastValuesInput};
64use crate::diff::sequence::query::sequence_query::SequenceQuery;
65
66use anyhow::Result;
67use async_trait::async_trait;
68use deadpool_postgres::Pool;
69use tracing::error;
70
71#[cfg_attr(test, mockall::automock)]
72#[async_trait]
73pub trait SequenceSingleSourceQueryExecutor {
74 /// Queries the sequence names from the database.
75 ///
76 /// # Returns
77 ///
78 /// A vector of sequence names.
79 async fn query_sequence_names(&self, input: QueryAllSequencesInput) -> Vec<String>;
80}
81
82pub struct SequenceSingleSourceQueryExecutorImpl {
83 db_pool: Pool,
84}
85
86impl SequenceSingleSourceQueryExecutorImpl {
87 pub fn new(db_pool: Pool) -> Self {
88 Self { db_pool }
89 }
90}
91
92#[async_trait]
93impl SequenceSingleSourceQueryExecutor for SequenceSingleSourceQueryExecutorImpl {
94 async fn query_sequence_names(&self, input: QueryAllSequencesInput) -> Vec<String> {
95 // Clone the database client
96 let client = self.db_pool.get().await.unwrap();
97
98 let schema_name = input.schema_name();
99 let sequence_query = SequenceQuery::AllSequences(schema_name);
100
101 let query_binding = sequence_query.to_string();
102
103 client
104 .query(&query_binding, &[])
105 .await
106 .unwrap()
107 .into_iter()
108 .map(|row| row.get("sequence_name"))
109 .collect::<Vec<String>>()
110 }
111}
112
113#[cfg_attr(test, mockall::automock)]
114#[async_trait]
115pub trait SequenceDualSourceQueryExecutor {
116 /// Executes a query to retrieve the last value of a sequence.
117 ///
118 /// # Arguments
119 ///
120 /// * `input` - The input parameters for the query.
121 ///
122 /// # Returns
123 ///
124 /// A tuple containing the result of the query as a `Result<i64>`.
125 async fn query_sequence_last_values(
126 &self,
127 input: QueryLastValuesInput,
128 ) -> (Result<i64>, Result<i64>);
129}
130
131pub struct SequenceDualSourceQueryExecutorImpl {
132 first_db_pool: Pool,
133 second_db_pool: Pool,
134}
135
136impl SequenceDualSourceQueryExecutorImpl {
137 pub fn new(first_db_pool: Pool, second_db_pool: Pool) -> Self {
138 Self {
139 first_db_pool,
140 second_db_pool,
141 }
142 }
143}
144
145#[async_trait]
146impl SequenceDualSourceQueryExecutor for SequenceDualSourceQueryExecutorImpl {
147 async fn query_sequence_last_values(
148 &self,
149 input: QueryLastValuesInput,
150 ) -> (Result<i64>, Result<i64>) {
151 // Clone the database clients
152 let first_client = self.first_db_pool.get().await.unwrap();
153 let second_client = self.second_db_pool.get().await.unwrap();
154
155 let sequence_query = SequenceQuery::LastValue(
156 input.schema_name().to_owned(),
157 input.sequence_name().to_owned(),
158 );
159
160 let query_binding = sequence_query.to_string();
161
162 let first_result = first_client.query_one(&query_binding, &[]);
163 let second_result = second_client.query_one(&query_binding, &[]);
164
165 let (first_result, second_result) =
166 futures::future::join(first_result, second_result).await;
167
168 let first_count: Result<i64> = match first_result {
169 Ok(pg_row) => Ok(pg_row.try_get("last_value").unwrap()),
170 Err(e) => {
171 error!("Error while fetching first sequence: {}", e);
172 Err(anyhow::anyhow!("Failed to fetch count for first sequence"))
173 }
174 };
175
176 let second_count: Result<i64> = match second_result {
177 Ok(pg_row) => Ok(pg_row.try_get("last_value").unwrap()),
178 Err(e) => {
179 error!("Error while fetching second sequence: {}", e);
180 Err(anyhow::anyhow!("Failed to fetch count for second sequence"))
181 }
182 };
183
184 (first_count, second_count)
185 }
186}