rust_pgdatadiff/diff/table/query/table_query_executor.rs

//! This module contains the implementation of query executors for table-related operations.
//! It provides traits and structs for executing queries against a single data source and against dual data sources.
//! The single data source executor is responsible for querying table names and primary keys.
//! The dual data source executor is responsible for querying table counts and hash data.
//! Both executors use the `deadpool-postgres` crate for interacting with the database.
//!
//! # Examples
//!
//! ```no_run
//! use deadpool_postgres::tokio_postgres::NoTls;
//! use deadpool_postgres::{Config, Pool, Runtime};
//! use rust_pgdatadiff::diff::table::query::table_query_executor::{
//!     TableSingleSourceQueryExecutor, TableSingleSourceQueryExecutorImpl,
//!     TableDualSourceQueryExecutor, TableDualSourceQueryExecutorImpl,
//! };
//! use rust_pgdatadiff::diff::table::query::input::{
//!     QueryHashDataInput, QueryPrimaryKeysInput, QueryTableCountInput, QueryTableNamesInput,
//! };
//! use rust_pgdatadiff::diff::table::query::table_types::{TableName, TableOffset, TablePosition, TablePrimaryKeys};
//! use rust_pgdatadiff::diff::types::SchemaName;
//!
//! // Illustrative helper: one way to build a `deadpool-postgres` pool for the executors.
//! fn create_pool(dbname: &str) -> Pool {
//!     let mut cfg = Config::new();
//!     cfg.host = Some("localhost".to_string());
//!     cfg.port = Some(5432);
//!     cfg.user = Some("user".to_string());
//!     cfg.password = Some("password".to_string());
//!     cfg.dbname = Some(dbname.to_string());
//!     cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap()
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     // Create a single data source executor
//!     let db_pool: Pool = create_pool("database");
//!     let single_source_executor = TableSingleSourceQueryExecutorImpl::new(db_pool);
//!
//!     // Query table names
//!     let schema_name = SchemaName::new("public".to_string());
//!     let included_tables = vec!["table1".to_string(), "table2".to_string()];
//!     let excluded_tables: Vec<String> = vec![];
//!     let table_names = single_source_executor
//!         .query_table_names(QueryTableNamesInput::new(schema_name, included_tables, excluded_tables))
//!         .await;
//!
//!     // Query primary keys
//!     let primary_keys = single_source_executor
//!         .query_primary_keys(QueryPrimaryKeysInput::new("table1".to_string()))
//!         .await;
//!
//!     // Create a dual data source executor
//!     let first_db_pool: Pool = create_pool("database1");
//!     let second_db_pool: Pool = create_pool("database2");
//!     let dual_source_executor = TableDualSourceQueryExecutorImpl::new(first_db_pool, second_db_pool);
//!
//!     // Query table counts
//!     let schema_name = SchemaName::new("public");
//!     let table_name = TableName::new("table1");
//!     let (first_count, second_count) = dual_source_executor
//!         .query_table_count(QueryTableCountInput::new(schema_name, table_name))
//!         .await;
//!
//!     // Query hash data
//!     let schema_name = SchemaName::new("public");
//!     let table_name = TableName::new("table1");
//!     let primary_keys = TablePrimaryKeys::new("id");
//!     let table_position = TablePosition::new(0);
//!     let table_offset = TableOffset::new(100);
//!     let (first_hash, second_hash) = dual_source_executor
//!         .query_hash_data(QueryHashDataInput::new(schema_name, table_name, primary_keys, table_position, table_offset))
//!         .await;
//! }
//! ```
use anyhow::Result;
use async_trait::async_trait;
use deadpool_postgres::Pool;

use crate::diff::table::query::input::{
    QueryHashDataInput, QueryPrimaryKeysInput, QueryTableCountInput, QueryTableNamesInput,
};
use crate::diff::table::query::table_query::TableQuery;
use crate::diff::table::query::table_types::{IncludedExcludedTables, TableName};

#[cfg(test)]
use mockall::automock;
#[cfg_attr(test, automock)]
#[async_trait]
/// This trait represents a query executor for a single data source.
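/// # Examples
///
/// Because the trait is annotated with `#[cfg_attr(test, automock)]`, unit tests can use the
/// `mockall`-generated `MockTableSingleSourceQueryExecutor` instead of a live database.
/// A minimal sketch (the returned values are illustrative):
///
/// ```ignore
/// let mut mock = MockTableSingleSourceQueryExecutor::new();
/// mock.expect_query_table_names()
///     .returning(|_input| vec!["table1".to_string()]);
/// mock.expect_query_primary_keys()
///     .returning(|_input| vec!["id".to_string()]);
/// ```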
pub trait TableSingleSourceQueryExecutor {
    /// Queries the table names from the database.
    ///
    /// # Arguments
    ///
    /// * `input` - The input parameters for the query.
    ///
    /// # Returns
    ///
    /// A vector of table names.
    async fn query_table_names(&self, input: QueryTableNamesInput) -> Vec<String>;

    /// Queries the primary keys of a table from the database.
    ///
    /// # Arguments
    ///
    /// * `input` - The input parameters for the query.
    ///
    /// # Returns
    ///
    /// A vector of primary key column names.
    async fn query_primary_keys(&self, input: QueryPrimaryKeysInput) -> Vec<String>;
}

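/// Default implementation of [`TableSingleSourceQueryExecutor`] backed by a
/// `deadpool-postgres` connection [`Pool`].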
pub struct TableSingleSourceQueryExecutorImpl {
    db_pool: Pool,
}

impl TableSingleSourceQueryExecutorImpl {
    pub fn new(db_pool: Pool) -> Self {
        Self { db_pool }
    }
}

#[async_trait]
impl TableSingleSourceQueryExecutor for TableSingleSourceQueryExecutorImpl {
    async fn query_table_names(&self, input: QueryTableNamesInput) -> Vec<String> {
        // Acquire a database client from the pool
        let client = self.db_pool.get().await.unwrap();

        // Prepare the query for fetching table names
        let all_tables_query = TableQuery::AllTablesForSchema(
            input.schema_name().to_owned(),
            IncludedExcludedTables::new(input.included_tables(), input.excluded_tables()),
        );

        // Fetch table names
        let query_result = client
            .query(&all_tables_query.to_string(), &[])
            .await
            .unwrap();

        // Map query results to [Vec<String>]
        query_result
            .iter()
            .map(|row| row.get("table_name"))
            .collect::<Vec<String>>()
    }

    async fn query_primary_keys(&self, input: QueryPrimaryKeysInput) -> Vec<String> {
        // Acquire a database client from the pool
        let client = self.db_pool.get().await.unwrap();

        // Prepare the query for fetching primary keys
        let find_primary_key_query =
            TableQuery::FindPrimaryKeyForTable(TableName::new(input.table_name()));

        // Fetch primary keys for the table
        let query_result = client
            .query(&find_primary_key_query.to_string(), &[])
            .await
            .unwrap();

        // Map query results to [Vec<String>]
        query_result
            .iter()
            .map(|row| row.get("attname"))
            .collect::<Vec<String>>()
    }
}

#[cfg_attr(test, automock)]
#[async_trait]
/// This trait defines the methods for executing the same query against two data sources.
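/// # Examples
///
/// A minimal sketch of consuming the counts returned by [`Self::query_table_count`]
/// (the `executor` and `input` values are assumed to exist; see the module-level example):
///
/// ```ignore
/// let (first_count, second_count) = executor.query_table_count(input).await;
/// match (first_count, second_count) {
///     (Ok(first), Ok(second)) if first == second => println!("row counts match: {first}"),
///     (Ok(first), Ok(second)) => println!("row count mismatch: {first} vs {second}"),
///     _ => println!("failed to fetch the row count from at least one database"),
/// }
/// ```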
pub trait TableDualSourceQueryExecutor {
    /// Executes a row-count query for a table against both databases.
    ///
    /// # Arguments
    ///
    /// * `input` - The input parameters for the query.
    ///
    /// # Returns
    ///
    /// A tuple containing the row count from each database as a `Result<i64>`.
    async fn query_table_count(&self, input: QueryTableCountInput) -> (Result<i64>, Result<i64>);

    /// Executes a query to retrieve the hash data of a table from both databases.
    ///
    /// # Arguments
    ///
    /// * `input` - The input parameters for the query.
    ///
    /// # Returns
    ///
    /// A tuple containing the hash data from each database as a `String`.
    async fn query_hash_data(&self, input: QueryHashDataInput) -> (String, String);
}

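/// Default implementation of [`TableDualSourceQueryExecutor`] backed by two
/// `deadpool-postgres` connection [`Pool`]s, one per compared database.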
pub struct TableDualSourceQueryExecutorImpl {
    first_db_client: Pool,
    second_db_client: Pool,
}

impl TableDualSourceQueryExecutorImpl {
    pub fn new(first_db_client: Pool, second_db_client: Pool) -> Self {
        Self {
            first_db_client,
            second_db_client,
        }
    }
}

#[async_trait]
impl TableDualSourceQueryExecutor for TableDualSourceQueryExecutorImpl {
    async fn query_table_count(&self, input: QueryTableCountInput) -> (Result<i64>, Result<i64>) {
        // Acquire a client from each connection pool
        let first_client = self.first_db_client.get().await.unwrap();
        let second_client = self.second_db_client.get().await.unwrap();

        // Prepare the query for counting rows
        let count_rows_query = TableQuery::CountRowsForTable(
            input.schema_name().to_owned(),
            input.table_name().to_owned(),
        );

        let count_query_binding = count_rows_query.to_string();

        // Prepare count queries for both databases
        let first_count = first_client.query_one(&count_query_binding, &[]);
        let second_count = second_client.query_one(&count_query_binding, &[]);

        // Fetch counts from both databases concurrently
        let count_fetch_futures = futures::future::join_all(vec![first_count, second_count]).await;

        let first_count = count_fetch_futures.first().unwrap();
        let second_count = count_fetch_futures.get(1).unwrap();

        // Map count results to [anyhow::Result<i64>]
        let first_count: Result<i64> = match first_count {
            Ok(pg_row) => Ok(pg_row.get("count")),
            Err(e) => Err(anyhow::anyhow!("Failed to fetch count for first table: {e}")),
        };

        let second_count: Result<i64> = match second_count {
            Ok(pg_row) => Ok(pg_row.get("count")),
            Err(e) => Err(anyhow::anyhow!("Failed to fetch count for second table: {e}")),
        };

        (first_count, second_count)
    }

    async fn query_hash_data(&self, input: QueryHashDataInput) -> (String, String) {
        // Acquire a client from each connection pool
        let first_client = self.first_db_client.get().await.unwrap();
        let second_client = self.second_db_client.get().await.unwrap();

        // Prepare the query for fetching data hashes
        let hash_query = TableQuery::HashQuery(
            input.schema_name(),
            input.table_name(),
            input.primary_keys(),
            input.position(),
            input.offset(),
        );

        let hash_query_binding = hash_query.to_string();

        // Prepare hash queries for both databases
        let first_hash = first_client.query_one(&hash_query_binding, &[]);
        let second_hash = second_client.query_one(&hash_query_binding, &[]);

        // Fetch hashes from both databases concurrently
        let hash_fetch_futures = futures::future::join_all(vec![first_hash, second_hash]).await;

        let first_hash = hash_fetch_futures.first().unwrap();
        let second_hash = hash_fetch_futures.get(1).unwrap();

        // Map hash results to [String]
        let first_hash = match first_hash {
            Ok(pg_row) => pg_row.try_get("md5").unwrap_or("not_available".to_string()),
            Err(e) => e.to_string(),
        };
        let second_hash = match second_hash {
            Ok(pg_row) => pg_row.try_get("md5").unwrap_or("not_available".to_string()),
            Err(e) => e.to_string(),
        };

        (first_hash, second_hash)
    }
}