use crate::core::errors::DataProfilerError;
use crate::database::{validate_base_query, validate_sql_identifier};
/// Returns the canonical error for operations attempted before a database
/// connection has been established.
pub fn not_connected_error() -> DataProfilerError {
    DataProfilerError::database_connection("Not connected to database")
}
/// Returns the canonical error for a database feature (`feature`) that is
/// compiled out or disabled for the named backend (`db_name`).
// Kept for connectors that don't currently reference it.
#[allow(dead_code)]
pub fn feature_not_enabled_error(db_name: &str, feature: &str) -> DataProfilerError {
    DataProfilerError::database_feature_disabled(db_name, feature)
}
/// Streams a query's result set in LIMIT/OFFSET batches and collects every
/// batch into a column-name -> `Vec<String>` map, merged at the end via
/// `merge_column_batches`.
///
/// Arguments:
/// * `$pool`       - sqlx executor handed to `fetch_all`
/// * `$query`      - base SELECT statement or table name (paged by `build_batch_query`)
/// * `$batch_size` - rows fetched per page; a shorter page ends the loop
/// * `$total_rows` - expected row total, used only for progress logging
/// * `$db_name`    - string literal naming the backend in log output
///
/// Must expand inside an `async` context whose return type can absorb
/// `DataProfilerError` through `?` (both `.await` and `?` appear in the body).
#[macro_export]
macro_rules! streaming_profile_loop {
    ($pool:expr, $query:expr, $batch_size:expr, $total_rows:expr, $db_name:literal) => {{
        use sqlx::{Column, Row};
        use $crate::database::connectors::common::build_batch_query;
        use $crate::database::streaming::{StreamingProgress, merge_column_batches};
        let mut progress = StreamingProgress::new(Some($total_rows as u64));
        // One entry per fetched page; merged into a single map after the loop.
        let mut all_batches: Vec<std::collections::HashMap<String, Vec<String>>> = Vec::new();
        let mut offset = 0usize;
        loop {
            let batch_query = build_batch_query($query, $batch_size, offset)?;
            let rows = sqlx::query(&batch_query)
                .fetch_all($pool)
                .await
                .map_err(
                    |e| $crate::core::errors::DataProfilerError::DatabaseQueryError {
                        message: format!("Batch query execution failed: {}", e),
                    },
                )?;
            // An empty page means the previous page was exactly full and we
            // have read everything.
            if rows.is_empty() {
                break;
            }
            // Column metadata is identical for every row of a result set, so
            // it is taken from the first row only.
            let columns = rows[0].columns();
            let mut batch_result: std::collections::HashMap<String, Vec<String>> =
                std::collections::HashMap::with_capacity(columns.len());
            for col in columns {
                batch_result.insert(col.name().to_string(), Vec::with_capacity(rows.len()));
            }
            for row in &rows {
                for (i, col) in columns.iter().enumerate() {
                    // NULLs and values that fail String decoding both collapse
                    // to the empty string.
                    let value: Option<String> = row.try_get(i).ok();
                    if let Some(column_data) = batch_result.get_mut(col.name()) {
                        column_data.push(value.unwrap_or_default());
                    }
                }
            }
            let batch_size_actual = rows.len();
            all_batches.push(batch_result);
            progress.update(batch_size_actual as u64);
            if let Some(percentage) = progress.percentage() {
                log::info!(
                    "{} streaming progress: {:.1}% ({}/{} rows)",
                    $db_name,
                    percentage,
                    progress.processed_rows,
                    $total_rows
                );
            }
            offset += $batch_size;
            // A short page is the final page; stop without issuing the extra
            // (empty) query.
            if batch_size_actual < $batch_size {
                break;
            }
        }
        merge_column_batches(all_batches)
    }};
}
/// Converts a `Vec` of sqlx rows (`$rows`) into a column-name ->
/// `Vec<String>` map; NULLs and values that fail String decoding become
/// empty strings. Evaluates to an empty map when `$rows` is empty.
#[macro_export]
macro_rules! process_rows_to_columns {
    ($rows:expr) => {{
        use sqlx::{Column, Row};
        if $rows.is_empty() {
            std::collections::HashMap::new()
        } else {
            // Column metadata is the same for every row; read it once from
            // the first row.
            let columns = $rows[0].columns();
            let mut result: std::collections::HashMap<String, Vec<String>> =
                std::collections::HashMap::with_capacity(columns.len());
            for col in columns {
                result.insert(col.name().to_string(), Vec::with_capacity($rows.len()));
            }
            for row in &$rows {
                for (i, col) in columns.iter().enumerate() {
                    // Fetch by index; decoding failures are treated as NULL.
                    let value: Option<String> = row.try_get(i).ok();
                    if let Some(column_data) = result.get_mut(col.name()) {
                        column_data.push(value.unwrap_or_default());
                    }
                }
            }
            result
        }
    }};
}
/// Builds a `SELECT COUNT(*)` query from either a full SELECT statement or a
/// bare table name.
///
/// If `query` starts with `SELECT` (case-insensitive), it is validated with
/// `validate_base_query` and wrapped as a counted subquery; otherwise it is
/// treated as a table identifier, validated with `validate_sql_identifier`,
/// and counted directly.
///
/// # Errors
/// Returns a `DataProfilerError` when query/identifier validation fails.
pub fn build_count_query(query: &str) -> Result<String, DataProfilerError> {
    // Case-insensitive prefix test without allocating an uppercased copy of
    // the whole (potentially large) query string. `get(..6)` is None for
    // inputs shorter than 6 bytes or split on a non-ASCII boundary, neither
    // of which can be a SELECT statement.
    let is_select = query
        .trim_start()
        .get(..6)
        .map_or(false, |prefix| prefix.eq_ignore_ascii_case("SELECT"));
    if is_select {
        let validated_query = validate_base_query(query)?;
        Ok(format!(
            "SELECT COUNT(*) FROM ({}) as count_subquery",
            validated_query
        ))
    } else {
        validate_sql_identifier(query)?;
        Ok(format!("SELECT COUNT(*) FROM {}", query))
    }
}
/// Builds a paginated query (`LIMIT`/`OFFSET`) from either a full SELECT
/// statement or a bare table name.
///
/// A `SELECT` input (case-insensitive) is validated with
/// `validate_base_query` and paged as-is; anything else is validated as an
/// identifier and expanded to `SELECT * FROM <table>` first.
///
/// # Errors
/// Returns a `DataProfilerError` when query/identifier validation fails.
pub fn build_batch_query(
    query: &str,
    batch_size: usize,
    offset: usize,
) -> Result<String, DataProfilerError> {
    // Case-insensitive prefix test without allocating an uppercased copy of
    // the whole query string (mirrors `build_count_query`).
    let is_select = query
        .trim_start()
        .get(..6)
        .map_or(false, |prefix| prefix.eq_ignore_ascii_case("SELECT"));
    let validated_query = if is_select {
        validate_base_query(query)?
    } else {
        validate_sql_identifier(query)?;
        format!("SELECT * FROM {}", query)
    };
    Ok(format!(
        "{} LIMIT {} OFFSET {}",
        validated_query, batch_size, offset
    ))
}
#[cfg(test)]
mod tests {
    use super::*;

    // A bare table name takes the identifier path: no subquery wrapping.
    #[test]
    fn test_build_count_query_table() {
        let result = build_count_query("users").unwrap();
        assert_eq!(result, "SELECT COUNT(*) FROM users");
    }

    // A SELECT statement is wrapped as a counted subquery.
    #[test]
    fn test_build_count_query_select() {
        let result = build_count_query("SELECT * FROM users WHERE active = true").unwrap();
        assert!(result.contains("SELECT COUNT(*) FROM"));
        assert!(result.contains("count_subquery"));
    }

    // A bare table name is expanded to SELECT * and paginated.
    #[test]
    fn test_build_batch_query() {
        let result = build_batch_query("users", 100, 0).unwrap();
        assert_eq!(result, "SELECT * FROM users LIMIT 100 OFFSET 0");
    }
}