rust_pgdatadiff/diff/table/query/table_query_executor.rs
/// This module contains the implementation of query executors for table-related operations.
/// It provides traits and structs for executing queries on a single data source and on dual data sources.
/// The single data source executor is responsible for querying table names and primary keys.
/// The dual data source executor is responsible for querying table counts and hash data.
/// Both executors use `deadpool-postgres` connection pools (backed by `tokio-postgres`) for interacting with the database.
///
/// # Examples
///
/// ```no_run
/// use deadpool_postgres::{Manager, Pool};
/// use tokio_postgres::NoTls;
/// use rust_pgdatadiff::diff::table::query::table_query_executor::{
///     TableSingleSourceQueryExecutor, TableSingleSourceQueryExecutorImpl,
///     TableDualSourceQueryExecutor, TableDualSourceQueryExecutorImpl,
/// };
/// use rust_pgdatadiff::diff::table::query::input::{
///     QueryHashDataInput, QueryPrimaryKeysInput, QueryTableCountInput, QueryTableNamesInput,
/// };
/// use rust_pgdatadiff::diff::table::query::table_types::{TableName, TableOffset, TablePosition, TablePrimaryKeys};
/// use rust_pgdatadiff::diff::types::SchemaName;
///
/// // Illustrative helper: build a `deadpool-postgres` pool from a connection URL.
/// fn pool_from_url(url: &str) -> Pool {
///     let pg_config: tokio_postgres::Config = url.parse().unwrap();
///     let manager = Manager::new(pg_config, NoTls);
///     Pool::builder(manager).max_size(16).build().unwrap()
/// }
///
/// #[tokio::main]
/// async fn main() {
///     // Create a single data source executor
///     let db_client: Pool = pool_from_url("postgres://user:password@localhost:5432/database");
///     let single_source_executor = TableSingleSourceQueryExecutorImpl::new(db_client);
///
///     // Query table names
///     let schema_name = SchemaName::new("public".to_string());
///     let included_tables = vec!["table1", "table2"];
///     let excluded_tables: Vec<String> = vec![];
///     let table_names = single_source_executor
///         .query_table_names(QueryTableNamesInput::new(schema_name, included_tables, excluded_tables))
///         .await;
///
///     // Query primary keys
///     let primary_keys = single_source_executor
///         .query_primary_keys(QueryPrimaryKeysInput::new("table1".to_string()))
///         .await;
///
///     // Create a dual data source executor
///     let first_db_client: Pool = pool_from_url("postgres://user:password@localhost:5432/database1");
///     let second_db_client: Pool = pool_from_url("postgres://user:password@localhost:5432/database2");
///     let dual_source_executor = TableDualSourceQueryExecutorImpl::new(first_db_client, second_db_client);
///
///     // Query table counts
///     let schema_name = SchemaName::new("public");
///     let table_name = TableName::new("table1");
///     let (first_count, second_count) = dual_source_executor
///         .query_table_count(QueryTableCountInput::new(schema_name, table_name))
///         .await;
///
///     // Query hash data
///     let schema_name = SchemaName::new("public");
///     let table_name = TableName::new("table1");
///     let primary_keys = TablePrimaryKeys::new("id");
///     let table_position = TablePosition::new(0);
///     let table_offset = TableOffset::new(100);
///     let (first_hash, second_hash) = dual_source_executor
///         .query_hash_data(QueryHashDataInput::new(schema_name, table_name, primary_keys, table_position, table_offset))
///         .await;
/// }
/// ```
use anyhow::Result;
use async_trait::async_trait;
use deadpool_postgres::Pool;

use crate::diff::table::query::input::{
    QueryHashDataInput, QueryPrimaryKeysInput, QueryTableCountInput, QueryTableNamesInput,
};
use crate::diff::table::query::table_query::TableQuery;
use crate::diff::table::query::table_types::{IncludedExcludedTables, TableName};

#[cfg(test)]
use mockall::automock;

#[cfg_attr(test, automock)]
#[async_trait]
/// This trait represents a query executor for a single data source.
pub trait TableSingleSourceQueryExecutor {
    /// Queries the table names from the database.
    ///
    /// # Arguments
    ///
    /// * `input` - The input parameters for the query.
    ///
    /// # Returns
    ///
    /// A vector of table names.
    async fn query_table_names(&self, input: QueryTableNamesInput) -> Vec<String>;

    /// Queries the primary keys of a table from the database.
    ///
    /// # Arguments
    ///
    /// * `input` - The input parameters for the query.
    ///
    /// # Returns
    ///
    /// A vector of primary key column names.
    async fn query_primary_keys(&self, input: QueryPrimaryKeysInput) -> Vec<String>;
}

pub struct TableSingleSourceQueryExecutorImpl {
    db_pool: Pool,
}

impl TableSingleSourceQueryExecutorImpl {
    pub fn new(db_pool: Pool) -> Self {
        Self { db_pool }
    }
}

#[async_trait]
impl TableSingleSourceQueryExecutor for TableSingleSourceQueryExecutorImpl {
    async fn query_table_names(&self, input: QueryTableNamesInput) -> Vec<String> {
        // Acquire the database client
        let client = self.db_pool.get().await.unwrap();

        // Prepare the query for fetching table names
        let all_tables_query = TableQuery::AllTablesForSchema(
            input.schema_name().to_owned(),
            IncludedExcludedTables::new(input.included_tables(), input.excluded_tables()),
        );

        // Fetch table names
        let query_result = client
            .query(&all_tables_query.to_string(), &[])
            .await
            .unwrap();

        // Map query results to [Vec<String>]
        query_result
            .iter()
            .map(|row| row.get("table_name"))
            .collect::<Vec<String>>()
    }

    async fn query_primary_keys(&self, input: QueryPrimaryKeysInput) -> Vec<String> {
        // Acquire the database client
        let client = self.db_pool.get().await.unwrap();

        // Prepare the query for fetching primary keys
        let find_primary_key_query =
            TableQuery::FindPrimaryKeyForTable(TableName::new(input.table_name()));

        // Fetch primary keys for the table
        let query_result = client
            .query(&find_primary_key_query.to_string(), &[])
            .await
            .unwrap();

        // Map query results to [Vec<String>]
        query_result
            .iter()
            .map(|row| row.get("attname"))
            .collect::<Vec<String>>()
    }
}

#[cfg_attr(test, automock)]
#[async_trait]
/// This trait defines the methods for executing queries against a table on two data sources.
pub trait TableDualSourceQueryExecutor {
    /// Executes a query to retrieve the count of rows in a table on both data sources.
    ///
    /// # Arguments
    ///
    /// * `input` - The input parameters for the query.
    ///
    /// # Returns
    ///
    /// A tuple with the row count from the first and second data sources, each as a `Result<i64>`.
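    ///
    /// # Example
    ///
    /// A minimal sketch of consuming the returned tuple (illustrative only; the executor and
    /// input values are assumed to be built as in the module-level example):
    ///
    /// ```ignore
    /// let (first_count, second_count) = dual_source_executor
    ///     .query_table_count(QueryTableCountInput::new(schema_name, table_name))
    ///     .await;
    /// match (first_count, second_count) {
    ///     (Ok(first), Ok(second)) if first == second => println!("row counts match: {first}"),
    ///     (Ok(first), Ok(second)) => println!("row counts differ: {first} vs {second}"),
    ///     (first, second) => println!("failed to fetch counts: {first:?} / {second:?}"),
    /// }
    /// ```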
    async fn query_table_count(&self, input: QueryTableCountInput) -> (Result<i64>, Result<i64>);

    /// Executes a query to retrieve the hash of a block of rows in a table on both data sources.
    ///
    /// # Arguments
    ///
    /// * `input` - The input parameters for the query.
    ///
    /// # Returns
    ///
    /// A tuple with the hash from the first and second data sources, each as a `String`.
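    ///
    /// # Example
    ///
    /// A minimal sketch of comparing the two hashes (illustrative only; the executor and
    /// input are assumed to be built as in the module-level example):
    ///
    /// ```ignore
    /// let (first_hash, second_hash) = dual_source_executor.query_hash_data(input).await;
    /// if first_hash != second_hash {
    ///     println!("rows in this block differ between the two databases");
    /// }
    /// ```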
    async fn query_hash_data(&self, input: QueryHashDataInput) -> (String, String);
}

pub struct TableDualSourceQueryExecutorImpl {
    first_db_client: Pool,
    second_db_client: Pool,
}

impl TableDualSourceQueryExecutorImpl {
    pub fn new(first_db_client: Pool, second_db_client: Pool) -> Self {
        Self {
            first_db_client,
            second_db_client,
        }
    }
}

#[async_trait]
impl TableDualSourceQueryExecutor for TableDualSourceQueryExecutorImpl {
    async fn query_table_count(&self, input: QueryTableCountInput) -> (Result<i64>, Result<i64>) {
        // Acquire a client from each database pool
        let first_client = self.first_db_client.get().await.unwrap();
        let second_client = self.second_db_client.get().await.unwrap();

        // Prepare the query for counting rows
        let count_rows_query = TableQuery::CountRowsForTable(
            input.schema_name().to_owned(),
            input.table_name().to_owned(),
        );

        let count_query_binding = count_rows_query.to_string();

        // Prepare count queries for both databases
        let first_count = first_client.query_one(&count_query_binding, &[]);
        let second_count = second_client.query_one(&count_query_binding, &[]);

        // Fetch counts from both databases concurrently
        let count_fetch_futures = futures::future::join_all(vec![first_count, second_count]).await;

        let first_count = count_fetch_futures.first().unwrap();
        let second_count = count_fetch_futures.get(1).unwrap();

        // Map count results to [anyhow::Result<i64>]
        let first_count: Result<i64> = match first_count {
            Ok(pg_row) => Ok(pg_row.get("count")),
            Err(_e) => Err(anyhow::anyhow!("Failed to fetch count for first table")),
        };

        let second_count: Result<i64> = match second_count {
            Ok(pg_row) => Ok(pg_row.get("count")),
            Err(_e) => Err(anyhow::anyhow!("Failed to fetch count for second table")),
        };

        (first_count, second_count)
    }

    async fn query_hash_data(&self, input: QueryHashDataInput) -> (String, String) {
        // Acquire a client from each database pool
        let first_client = self.first_db_client.get().await.unwrap();
        let second_client = self.second_db_client.get().await.unwrap();

        // Prepare the query for fetching data hashes
        let hash_query = TableQuery::HashQuery(
            input.schema_name(),
            input.table_name(),
            input.primary_keys(),
            input.position(),
            input.offset(),
        );

        let hash_query_binding = hash_query.to_string();

        // Prepare hash queries for both databases
        let first_hash = first_client.query_one(&hash_query_binding, &[]);
        let second_hash = second_client.query_one(&hash_query_binding, &[]);

        // Fetch hashes from both databases concurrently
        let hash_fetch_futures = futures::future::join_all(vec![first_hash, second_hash]).await;

        let first_hash = hash_fetch_futures.first().unwrap();
        let second_hash = hash_fetch_futures.get(1).unwrap();

        // Map hash results to [String]
        let first_hash = match first_hash {
            Ok(pg_row) => pg_row.try_get("md5").unwrap_or("not_available".to_string()),
            Err(e) => e.to_string(),
        };
        let second_hash = match second_hash {
            Ok(pg_row) => pg_row.try_get("md5").unwrap_or("not_available".to_string()),
            Err(e) => e.to_string(),
        };

        (first_hash, second_hash)
    }
}
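
// The `#[cfg_attr(test, automock)]` attributes above generate `MockTableSingleSourceQueryExecutor`
// and `MockTableDualSourceQueryExecutor` for consumers of these traits. Below is a minimal sketch
// of exercising one of those mocks. Assumptions: `tokio` with the `macros` feature is available as
// a dev-dependency, the `QueryTableNamesInput::new` / `SchemaName::new` calls mirror the
// module-level example above, and the stubbed table names are purely illustrative.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::diff::types::SchemaName;

    #[tokio::test]
    async fn mocked_single_source_executor_returns_stubbed_table_names() {
        // Build a mock and stub `query_table_names` to return a fixed list of tables.
        let mut executor = MockTableSingleSourceQueryExecutor::new();
        executor
            .expect_query_table_names()
            .returning(|_| vec!["table1".to_string(), "table2".to_string()]);

        // Call the mocked trait method with an input built as in the module-level example.
        let table_names = executor
            .query_table_names(QueryTableNamesInput::new(
                SchemaName::new("public".to_string()),
                vec!["table1", "table2"],
                Vec::<String>::new(),
            ))
            .await;

        assert_eq!(
            table_names,
            vec!["table1".to_string(), "table2".to_string()]
        );
    }
}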