mod anomaly_extractor;
pub mod approver_extractor;
pub mod banking_extractor;
pub mod behavioral_extractor;
pub mod coa_extractor;
mod correlation_extractor;
mod integrity_extractor;
pub mod manual_extractor;
pub mod pii_denylist;
pub mod reference_extractor;
mod rules_extractor;
mod schema_extractor;
mod stats_extractor;
pub mod streaming;
pub mod tb_extractor;
pub mod text_extractor;
pub mod user_extractor;
pub use anomaly_extractor::*;
pub use approver_extractor::extract_approver_prior_from_parquet;
pub use banking_extractor::*;
pub use coa_extractor::extract_coa_semantic_from_parquet;
pub use correlation_extractor::*;
pub use integrity_extractor::*;
pub use manual_extractor::extract_manual_share_from_parquet;
pub use pii_denylist::PiiDenylist;
pub use reference_extractor::{extract_reference_formats, fill_template, tokenize_reference};
pub use rules_extractor::*;
pub use schema_extractor::*;
pub use stats_extractor::*;
pub use streaming::{StreamingCategoricalStats, StreamingNumericStats};
pub use tb_extractor::extract_tb_anchor_from_parquet;
pub use text_extractor::{
extract_text_taxonomy, extract_text_taxonomy_checked, extract_text_taxonomy_from_records,
TextTaxonomyRecord,
};
use std::path::Path;
use crate::error::{FingerprintError, FingerprintResult};
use crate::models::{
Fingerprint, Manifest, PrivacyLevel, PrivacyMetadata, SchemaFingerprint, SourceMetadata,
StatisticsFingerprint,
};
use crate::privacy::{PrivacyConfig, PrivacyEngine};
/// Options controlling which fingerprint components are extracted and how.
#[derive(Debug, Clone)]
pub struct ExtractionConfig {
    /// Privacy settings (level, epsilon budget, k-anonymity) used during extraction.
    pub privacy: PrivacyConfig,
    /// Run the optional correlation extractor.
    pub extract_correlations: bool,
    /// Run the optional integrity extractor.
    pub extract_integrity: bool,
    /// Run the optional rules extractor.
    pub extract_rules: bool,
    /// Run the optional anomaly extractor.
    pub extract_anomalies: bool,
    /// Optional cap on the number of rows sampled; `None` means no cap.
    pub max_sample_size: Option<usize>,
    /// Minimum number of rows a source must provide.
    pub min_rows: usize,
    /// Whether streaming extraction was requested (see [`ExtractionConfig::with_streaming`]).
    pub streaming: bool,
    /// Batch size for streaming extraction (presumably rows per batch — confirm with consumers).
    pub stream_batch_size: usize,
}
impl Default for ExtractionConfig {
    /// Standard privacy, every optional component enabled, no sampling cap, a
    /// 10-row minimum, and streaming disabled with a batch size of 10 000.
    fn default() -> Self {
        let privacy = PrivacyConfig::from_level(PrivacyLevel::Standard);
        Self {
            privacy,
            extract_correlations: true,
            extract_integrity: true,
            extract_rules: true,
            extract_anomalies: true,
            max_sample_size: None,
            min_rows: 10,
            streaming: false,
            stream_batch_size: 10_000,
        }
    }
}
impl ExtractionConfig {
    /// The default configuration with privacy settings derived from `level`.
    pub fn with_privacy_level(level: PrivacyLevel) -> Self {
        let privacy = PrivacyConfig::from_level(level);
        Self {
            privacy,
            ..Self::default()
        }
    }
    /// Turns streaming on with the given batch size, returning the updated config.
    pub fn with_streaming(self, batch_size: usize) -> Self {
        Self {
            streaming: true,
            stream_batch_size: batch_size,
            ..self
        }
    }
}
/// A pluggable step that derives one [`ExtractedComponent`] from a [`DataSource`].
pub trait Extractor: Send + Sync {
    /// Short, static identifier for this extractor.
    fn name(&self) -> &'static str;

    /// Produces this extractor's component for `data` under `config`.
    ///
    /// `privacy` is borrowed mutably and is the same engine whose audit ends up in
    /// the fingerprint, presumably so implementations can record budget spend and
    /// privacy actions — confirm against implementors.
    fn extract(
        &self,
        data: &DataSource,
        config: &ExtractionConfig,
        privacy: &mut PrivacyEngine,
    ) -> FingerprintResult<ExtractedComponent>;
}
/// Where the data to fingerprint comes from.
#[derive(Debug)]
pub enum DataSource {
    /// A single CSV file.
    Csv(CsvDataSource),
    /// A single Parquet file.
    Parquet(ParquetDataSource),
    /// A single JSON-array or line-delimited JSON file.
    Json(JsonDataSource),
    /// A directory whose supported files are fingerprinted one by one and merged.
    Directory(DirectoryDataSource),
    /// String rows already held in memory.
    Memory(MemoryDataSource),
}
/// A CSV file on disk together with its parsing options.
#[derive(Debug)]
pub struct CsvDataSource {
    /// Location of the CSV file.
    pub path: std::path::PathBuf,
    /// Whether the first record is a header row.
    pub has_headers: bool,
    /// Field delimiter byte.
    pub delimiter: u8,
}
impl CsvDataSource {
    /// Creates a source for `path`, assuming a header row and `,` as the delimiter.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let path = path.as_ref().to_owned();
        Self {
            delimiter: b',',
            has_headers: true,
            path,
        }
    }
}
/// A Parquet file with optional row-group and column selections.
#[derive(Debug)]
pub struct ParquetDataSource {
    /// Location of the Parquet file.
    pub path: std::path::PathBuf,
    /// Row groups to read, if restricted via `with_row_groups`.
    pub row_groups: Option<Vec<usize>>,
    /// Columns to read, if restricted via `with_columns`.
    pub columns: Option<Vec<String>>,
}
impl ParquetDataSource {
    /// Creates a source for `path` with no row-group or column selection.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let path = path.as_ref().to_owned();
        Self {
            columns: None,
            row_groups: None,
            path,
        }
    }
    /// Sets the row-group selection, replacing any previous one.
    pub fn with_row_groups(self, groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(groups),
            ..self
        }
    }
    /// Sets the column selection, replacing any previous one.
    pub fn with_columns(self, columns: Vec<String>) -> Self {
        Self {
            columns: Some(columns),
            ..self
        }
    }
}
/// A JSON file holding either a single top-level array of records or
/// line-delimited records (JSON Lines / NDJSON).
#[derive(Debug)]
pub struct JsonDataSource {
    /// Location of the JSON file.
    pub path: std::path::PathBuf,
    /// `true` for a single JSON array, `false` for line-delimited records.
    pub is_array: bool,
}
impl JsonDataSource {
    /// Creates a source for `path`, inferring the layout from its extension.
    ///
    /// `.jsonl` and `.ndjson` select the line-delimited layout. They are matched
    /// ASCII-case-insensitively, consistent with directory scanning, which lowercases
    /// extensions. Any other extension, or none, selects the array layout.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let path = path.as_ref();
        let line_delimited = path.extension().is_some_and(|ext| {
            ext.eq_ignore_ascii_case("jsonl") || ext.eq_ignore_ascii_case("ndjson")
        });
        if line_delimited {
            Self::jsonl(path)
        } else {
            Self::json_array(path)
        }
    }
    /// Creates a source that treats `path` as one JSON array, regardless of extension.
    pub fn json_array(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_path_buf(),
            is_array: true,
        }
    }
    /// Creates a source that treats `path` as line-delimited JSON, regardless of extension.
    pub fn jsonl(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_path_buf(),
            is_array: false,
        }
    }
}
/// A directory whose data files are fingerprinted individually and then merged.
#[derive(Debug)]
pub struct DirectoryDataSource {
    /// Directory to scan.
    pub path: std::path::PathBuf,
    /// File extensions to include, compared case-insensitively; a leading `.` is
    /// ignored. An empty list accepts every file that has an extension.
    pub extensions: Vec<String>,
    /// Whether to descend into sub-directories.
    pub recursive: bool,
}
impl DirectoryDataSource {
    /// Creates a non-recursive scan of `path` for CSV, Parquet, JSON, JSON Lines and
    /// NDJSON files.
    pub fn new(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_path_buf(),
            // `ndjson` is listed because directory extraction and `JsonDataSource`
            // both understand it.
            extensions: vec![
                "csv".to_string(),
                "parquet".to_string(),
                "json".to_string(),
                "jsonl".to_string(),
                "ndjson".to_string(),
            ],
            recursive: false,
        }
    }
    /// Replaces the extension filter (e.g. `vec!["csv".into()]` or `vec![".csv".into()]`).
    pub fn with_extensions(mut self, extensions: Vec<String>) -> Self {
        self.extensions = extensions;
        self
    }
    /// Makes the scan descend into sub-directories.
    pub fn recursive(mut self) -> Self {
        self.recursive = true;
        self
    }
    /// Lists the matching files, sorted by path so the result does not depend on the
    /// platform's directory iteration order.
    ///
    /// When scanning recursively, symlinked directories are followed, but each
    /// directory (identified by its canonical path) is visited at most once, so a
    /// symlink cycle terminates instead of recursing forever.
    ///
    /// # Errors
    /// Returns the first I/O error hit while reading or canonicalizing a directory.
    pub fn files(&self) -> std::io::Result<Vec<std::path::PathBuf>> {
        // Normalize once: lower-case and without a leading dot, so `.CSV` matches `csv`.
        let wanted: Vec<String> = self
            .extensions
            .iter()
            .map(|e| e.trim_start_matches('.').to_lowercase())
            .collect();
        let mut visited = std::collections::HashSet::new();
        let mut files = Vec::new();
        self.collect_files(&self.path, &wanted, &mut visited, &mut files)?;
        files.sort();
        Ok(files)
    }
    /// Appends the matching files under `dir` to `files`, recursing when enabled.
    /// `wanted` is the normalized extension filter and `visited` holds the canonical
    /// paths of directories already scanned in recursive mode.
    fn collect_files(
        &self,
        dir: &Path,
        wanted: &[String],
        visited: &mut std::collections::HashSet<std::path::PathBuf>,
        files: &mut Vec<std::path::PathBuf>,
    ) -> std::io::Result<()> {
        // Only recursion can revisit a directory, so only then pay for canonicalize.
        if self.recursive && !visited.insert(std::fs::canonicalize(dir)?) {
            return Ok(());
        }
        for entry in std::fs::read_dir(dir)? {
            let path = entry?.path();
            // `is_dir` follows symlinks; `visited` guards against the cycles that allows.
            if path.is_dir() {
                if self.recursive {
                    self.collect_files(&path, wanted, visited, files)?;
                }
            } else if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
                let ext = ext.to_lowercase();
                if wanted.is_empty() || wanted.iter().any(|w| w == &ext) {
                    files.push(path);
                }
            }
        }
        Ok(())
    }
}
/// Tabular string data already held in memory: column names plus row values.
#[derive(Debug)]
pub struct MemoryDataSource {
    /// Column names.
    pub columns: Vec<String>,
    /// Rows of cell values; each inner vector is one row.
    pub rows: Vec<Vec<String>>,
}
impl MemoryDataSource {
    /// Wraps the given columns and rows as-is; no width validation is performed.
    pub fn new(columns: Vec<String>, rows: Vec<Vec<String>>) -> Self {
        MemoryDataSource { rows, columns }
    }
    /// Number of rows held.
    pub fn row_count(&self) -> usize {
        Vec::len(&self.rows)
    }
    /// Number of column names held.
    pub fn column_count(&self) -> usize {
        Vec::len(&self.columns)
    }
}
/// One piece of a fingerprint, as produced by a single [`Extractor`].
#[derive(Debug)]
pub enum ExtractedComponent {
    /// Table and column structure.
    Schema(SchemaFingerprint),
    /// Per-column numeric and categorical statistics.
    Statistics(StatisticsFingerprint),
    /// Output of the correlation extractor.
    Correlations(crate::models::CorrelationFingerprint),
    /// Output of the integrity extractor.
    Integrity(crate::models::IntegrityFingerprint),
    /// Output of the rules extractor.
    Rules(crate::models::RulesFingerprint),
    /// Output of the anomaly extractor.
    Anomalies(crate::models::AnomalyFingerprint),
}
/// Runs the individual extractors over a [`DataSource`] and assembles the
/// resulting [`Fingerprint`].
pub struct FingerprintExtractor {
    /// Options applied to every extraction this instance performs.
    config: ExtractionConfig,
}
impl FingerprintExtractor {
    /// Creates an extractor with the default [`ExtractionConfig`].
    pub fn new() -> Self {
        Self {
            config: ExtractionConfig::default(),
        }
    }

    /// Creates an extractor whose privacy settings derive from `level`; all other
    /// options keep their defaults.
    pub fn with_privacy_level(level: PrivacyLevel) -> Self {
        Self {
            config: ExtractionConfig::with_privacy_level(level),
        }
    }

    /// Creates an extractor that uses `config` verbatim.
    pub fn with_config(config: ExtractionConfig) -> Self {
        Self { config }
    }

    /// Fingerprints a single CSV file via [`Self::extract`].
    ///
    /// # Errors
    /// Propagates any error returned by [`Self::extract`].
    pub fn extract_from_csv(&self, path: impl AsRef<Path>) -> FingerprintResult<Fingerprint> {
        let source = DataSource::Csv(CsvDataSource::new(path));
        self.extract(&source)
    }

    /// Fingerprints a headered CSV file in a single pass using streaming accumulators.
    ///
    /// Each column is typed from its first non-empty value. If that value parses as
    /// `f64`, the column is numeric and later non-parseable values are skipped.
    /// Otherwise it is categorical and every later value, including empty ones, is
    /// counted. Empty values seen before a column's type is known are not recorded.
    /// When `max_sample_size` is set, at most that many records are read.
    ///
    /// Only schema and statistics are produced on this path.
    ///
    /// # Errors
    /// Returns errors from opening or parsing the CSV file. Returns
    /// [`FingerprintError::InsufficientData`] when fewer than `min_rows` records
    /// were read.
    pub fn extract_streaming_csv(&self, path: impl AsRef<Path>) -> FingerprintResult<Fingerprint> {
        use std::collections::HashMap;
        use streaming::{StreamingCategoricalStats, StreamingNumericStats};
        let path = path.as_ref();
        let mut reader = csv::ReaderBuilder::new()
            .has_headers(true)
            .from_path(path)?;
        let headers: Vec<String> = reader
            .headers()?
            .iter()
            .map(std::string::ToString::to_string)
            .collect();
        let mut numeric_accumulators: HashMap<usize, StreamingNumericStats> = HashMap::new();
        let mut categorical_accumulators: HashMap<usize, StreamingCategoricalStats> =
            HashMap::new();
        // Column index -> classified as numeric? A column has no entry until its
        // first non-empty value has been seen.
        let mut column_is_numeric: HashMap<usize, bool> = HashMap::new();
        let mut row_count: u64 = 0;
        // `take` applies the cap before a record is consumed, so a cap of 0 reads nothing.
        let row_limit = self.config.max_sample_size.unwrap_or(usize::MAX);
        for result in reader.records().take(row_limit) {
            let record = result?;
            row_count += 1;
            // Fields beyond the header row have no column name and are ignored.
            for (i, field) in record.iter().enumerate().take(headers.len()) {
                let is_numeric = if let Some(&known) = column_is_numeric.get(&i) {
                    known
                } else if field.is_empty() {
                    // An empty value carries no type information; wait for a real one
                    // instead of locking the column in as numeric.
                    continue;
                } else {
                    let numeric = field.parse::<f64>().is_ok();
                    column_is_numeric.insert(i, numeric);
                    numeric
                };
                if is_numeric {
                    if let Ok(value) = field.parse::<f64>() {
                        numeric_accumulators
                            .entry(i)
                            .or_insert_with(|| StreamingNumericStats::new(10000))
                            .add(value);
                    }
                } else {
                    categorical_accumulators
                        .entry(i)
                        .or_insert_with(|| StreamingCategoricalStats::new(1000))
                        .add(field.to_string());
                }
            }
        }
        if row_count < self.config.min_rows as u64 {
            return Err(FingerprintError::InsufficientData {
                required: self.config.min_rows,
                actual: row_count as usize,
            });
        }
        let mut schema = SchemaFingerprint::new();
        let table_name = path.file_stem().and_then(|s| s.to_str()).unwrap_or("data");
        let mut table = crate::models::TableSchema::new(table_name, row_count);
        for (i, header) in headers.iter().enumerate() {
            // Columns that never had a non-empty value default to String.
            let is_numeric = column_is_numeric.get(&i).copied().unwrap_or(false);
            let data_type = if is_numeric {
                crate::models::DataType::Float64
            } else {
                crate::models::DataType::String
            };
            let field = crate::models::FieldSchema::new(header.clone(), data_type);
            table.add_column(field);
        }
        schema.add_table(table_name.to_string(), table);
        let mut statistics = StatisticsFingerprint::new();
        for (i, acc) in numeric_accumulators {
            let header = &headers[i];
            let numeric_stats = crate::models::NumericStats {
                count: acc.count(),
                min: acc.min(),
                max: acc.max(),
                mean: acc.mean(),
                std_dev: acc.std_dev(),
                percentiles: acc.percentiles(),
                distribution: crate::models::DistributionType::Unknown,
                distribution_params: crate::models::DistributionParams::empty(),
                zero_rate: acc.zero_rate(),
                negative_rate: acc.negative_rate(),
                benford_first_digit: Some(acc.benford_distribution()),
                log_magnitude_percentiles: acc.log_magnitude_percentiles(),
            };
            statistics.add_numeric(table_name, header, numeric_stats);
        }
        for (i, acc) in categorical_accumulators {
            let header = &headers[i];
            // Accumulators only exist after at least one `add`, so `total` is non-zero.
            let total = acc.count() as f64;
            let top_values: Vec<crate::models::CategoryFrequency> = acc
                .top_values(100)
                .into_iter()
                .map(|(value, count)| {
                    crate::models::CategoryFrequency::new(value, count as f64 / total)
                })
                .collect();
            let categorical_stats = crate::models::CategoricalStats {
                count: acc.count(),
                cardinality: acc.cardinality(),
                top_values,
                rare_values_suppressed: true,
                suppressed_count: 0,
                entropy: acc.entropy(),
            };
            statistics.add_categorical(table_name, header, categorical_stats);
        }
        // NOTE(review): unlike `extract`, this path never constructs a `PrivacyEngine`.
        // The statistics above, including raw top values, are not passed through it,
        // and the audit records no epsilon spend. Confirm that this is acceptable for
        // the configured privacy level.
        let source_meta = SourceMetadata::new(
            format!("CSV file: {} (streaming extraction)", path.display()),
            vec![table_name.to_string()],
            row_count,
        );
        let privacy_meta = PrivacyMetadata::from_level(self.config.privacy.level);
        let manifest = Manifest::new(source_meta, privacy_meta);
        let privacy_audit = crate::models::PrivacyAudit::new(
            self.config.privacy.epsilon,
            self.config.privacy.k_anonymity,
        );
        let fingerprint = Fingerprint::new(manifest, schema, statistics, privacy_audit);
        Ok(fingerprint)
    }

    /// Fingerprints in-memory string rows via [`Self::extract`].
    ///
    /// # Errors
    /// Propagates any error returned by [`Self::extract`].
    pub fn extract_from_memory(
        &self,
        columns: Vec<String>,
        rows: Vec<Vec<String>>,
    ) -> FingerprintResult<Fingerprint> {
        let source = DataSource::Memory(MemoryDataSource::new(columns, rows));
        self.extract(&source)
    }

    /// Fingerprints every supported file in `path` (non-recursively) and merges them.
    ///
    /// # Errors
    /// Propagates any error returned by [`Self::extract`].
    pub fn extract_from_directory(&self, path: impl AsRef<Path>) -> FingerprintResult<Fingerprint> {
        let source = DataSource::Directory(DirectoryDataSource::new(path));
        self.extract(&source)
    }

    /// Builds a complete fingerprint from `source`.
    ///
    /// Directory sources are delegated to the per-file merge path. For every other
    /// source, schema and statistics are required. The correlation, integrity, rules
    /// and anomaly components run only when enabled; a failure or unexpected result
    /// from any of them is logged and that component is omitted. All extractors share
    /// one `PrivacyEngine`, whose metadata and audit are stored in the fingerprint.
    ///
    /// # Errors
    /// Returns any error from schema or statistics extraction, or an extraction
    /// error if either yields an unexpected component type.
    pub fn extract(&self, source: &DataSource) -> FingerprintResult<Fingerprint> {
        if let DataSource::Directory(dir) = source {
            return self.extract_from_directory_impl(dir);
        }
        let mut privacy = PrivacyEngine::new(self.config.privacy.clone());
        let schema_extractor = SchemaExtractor;
        let schema = match schema_extractor.extract(source, &self.config, &mut privacy)? {
            ExtractedComponent::Schema(s) => s,
            _ => {
                return Err(FingerprintError::extraction(
                    "schema",
                    "Unexpected component type",
                ))
            }
        };
        let stats_extractor = StatsExtractor;
        let statistics = match stats_extractor.extract(source, &self.config, &mut privacy)? {
            ExtractedComponent::Statistics(s) => s,
            _ => {
                return Err(FingerprintError::extraction(
                    "statistics",
                    "Unexpected component type",
                ))
            }
        };
        let correlations = Self::optional_component(
            self.config.extract_correlations,
            "correlations",
            || CorrelationExtractor.extract(source, &self.config, &mut privacy),
            |component| match component {
                ExtractedComponent::Correlations(c) => Some(c),
                _ => None,
            },
        );
        let integrity = Self::optional_component(
            self.config.extract_integrity,
            "integrity",
            || IntegrityExtractor.extract(source, &self.config, &mut privacy),
            |component| match component {
                ExtractedComponent::Integrity(i) => Some(i),
                _ => None,
            },
        );
        let rules = Self::optional_component(
            self.config.extract_rules,
            "rules",
            || RulesExtractor.extract(source, &self.config, &mut privacy),
            |component| match component {
                ExtractedComponent::Rules(r) => Some(r),
                _ => None,
            },
        );
        let anomalies = Self::optional_component(
            self.config.extract_anomalies,
            "anomalies",
            || AnomalyExtractor.extract(source, &self.config, &mut privacy),
            |component| match component {
                ExtractedComponent::Anomalies(a) => Some(a),
                _ => None,
            },
        );
        let source_meta = build_source_metadata(source, &schema);
        let privacy_meta = privacy.build_privacy_metadata();
        let manifest = Manifest::new(source_meta, privacy_meta);
        let privacy_audit = privacy.into_audit();
        let mut fingerprint = Fingerprint::new(manifest, schema, statistics, privacy_audit);
        if let Some(c) = correlations {
            fingerprint = fingerprint.with_correlations(c);
        }
        if let Some(i) = integrity {
            fingerprint = fingerprint.with_integrity(i);
        }
        if let Some(r) = rules {
            fingerprint = fingerprint.with_rules(r);
        }
        if let Some(a) = anomalies {
            fingerprint = fingerprint.with_anomalies(a);
        }
        Ok(fingerprint)
    }

    /// Runs one optional extractor on a best-effort basis.
    ///
    /// Returns `None` without calling `run` when `enabled` is false. Returns `None`
    /// with a warning when `run` fails or `select` rejects the component it produced.
    fn optional_component<T>(
        enabled: bool,
        label: &str,
        run: impl FnOnce() -> FingerprintResult<ExtractedComponent>,
        select: impl FnOnce(ExtractedComponent) -> Option<T>,
    ) -> Option<T> {
        if !enabled {
            return None;
        }
        match run() {
            Ok(component) => {
                let selected = select(component);
                if selected.is_none() {
                    tracing::warn!(
                        "Optional {} extraction returned an unexpected component type",
                        label
                    );
                }
                selected
            }
            Err(e) => {
                tracing::warn!("Optional {} extraction failed: {}", label, e);
                None
            }
        }
    }

    /// Maps a file to the data source matching its (case-insensitive) extension, or
    /// `None` when the format is not supported for directory extraction.
    fn source_for_file(file_path: &Path) -> Option<DataSource> {
        let ext = file_path
            .extension()
            .and_then(|e| e.to_str())
            .map(str::to_lowercase)
            .unwrap_or_default();
        match ext.as_str() {
            "csv" => Some(DataSource::Csv(CsvDataSource::new(file_path))),
            "parquet" => Some(DataSource::Parquet(ParquetDataSource::new(file_path))),
            "json" => Some(DataSource::Json(JsonDataSource::json_array(file_path))),
            "jsonl" | "ndjson" => Some(DataSource::Json(JsonDataSource::jsonl(file_path))),
            _ => None,
        }
    }

    /// Fingerprints each supported file in `dir` with its own `PrivacyEngine` and
    /// merges the schemas, statistics and privacy audits.
    ///
    /// The configured epsilon is split evenly across the files actually processed.
    /// Per-file schema/statistics failures are logged and that file's component is
    /// skipped.
    ///
    /// # Errors
    /// Returns I/O errors from listing the directory, or `InvalidFormat` when it
    /// contains no file of a supported format.
    fn extract_from_directory_impl(
        &self,
        dir: &DirectoryDataSource,
    ) -> FingerprintResult<Fingerprint> {
        let files = dir.files()?;
        // Resolve formats up front so unsupported files neither dilute the per-file
        // privacy budget nor hide the "nothing to extract" error.
        let sources: Vec<(&Path, DataSource)> = files
            .iter()
            .filter_map(|file_path| {
                Self::source_for_file(file_path).map(|source| (file_path.as_path(), source))
            })
            .collect();
        if sources.is_empty() {
            return Err(FingerprintError::InvalidFormat(format!(
                "No supported files found in directory: {}",
                dir.path.display()
            )));
        }
        let mut merged_schema = SchemaFingerprint::new();
        let mut merged_stats = StatisticsFingerprint::new();
        let mut total_rows: u64 = 0;
        let mut table_names: Vec<String> = Vec::new();
        let mut total_epsilon_spent = 0.0;
        let mut all_actions = Vec::new();
        let per_file_epsilon = self.config.privacy.epsilon / sources.len() as f64;
        for (file_path, source) in &sources {
            let mut per_file_config = self.config.privacy.clone();
            per_file_config.epsilon = per_file_epsilon;
            let mut file_privacy = PrivacyEngine::new(per_file_config);
            let schema_extractor = SchemaExtractor;
            match schema_extractor.extract(source, &self.config, &mut file_privacy) {
                Ok(ExtractedComponent::Schema(schema)) => {
                    for (name, table) in schema.tables {
                        if table_names.contains(&name) {
                            tracing::warn!(
                                "Table name '{}' from {} was already produced by another file; \
                                 merged entries may overwrite each other",
                                name,
                                file_path.display()
                            );
                        }
                        total_rows += table.row_count;
                        table_names.push(name.clone());
                        merged_schema.add_table(name, table);
                    }
                }
                Ok(_) => {
                    tracing::warn!(
                        "Schema extraction for {} returned an unexpected component type",
                        file_path.display()
                    );
                }
                Err(e) => {
                    tracing::warn!("Schema extraction failed for {}: {}", file_path.display(), e);
                }
            }
            let stats_extractor = StatsExtractor;
            match stats_extractor.extract(source, &self.config, &mut file_privacy) {
                Ok(ExtractedComponent::Statistics(stats)) => {
                    for (key, numeric) in stats.numeric_columns {
                        merged_stats.numeric_columns.insert(key, numeric);
                    }
                    for (key, categorical) in stats.categorical_columns {
                        merged_stats.categorical_columns.insert(key, categorical);
                    }
                }
                Ok(_) => {
                    tracing::warn!(
                        "Statistics extraction for {} returned an unexpected component type",
                        file_path.display()
                    );
                }
                Err(e) => {
                    tracing::warn!(
                        "Statistics extraction failed for {}: {}",
                        file_path.display(),
                        e
                    );
                }
            }
            let file_audit = file_privacy.into_audit();
            total_epsilon_spent += file_audit.total_epsilon_spent;
            all_actions.extend(file_audit.actions);
        }
        let description = format!(
            "Directory: {} ({} files)",
            dir.path.display(),
            sources.len()
        );
        let source_meta = SourceMetadata::new(description, table_names, total_rows);
        let privacy_meta = PrivacyMetadata::from_level(self.config.privacy.level);
        let manifest = Manifest::new(source_meta, privacy_meta);
        let mut privacy_audit = crate::models::PrivacyAudit::new(
            self.config.privacy.epsilon,
            self.config.privacy.k_anonymity,
        );
        privacy_audit.total_epsilon_spent = total_epsilon_spent;
        privacy_audit.actions = all_actions;
        let fingerprint = Fingerprint::new(manifest, merged_schema, merged_stats, privacy_audit);
        Ok(fingerprint)
    }
}
impl Default for FingerprintExtractor {
fn default() -> Self {
Self::new()
}
}
fn build_source_metadata(source: &DataSource, schema: &SchemaFingerprint) -> SourceMetadata {
let (description, tables, total_rows) = match source {
DataSource::Csv(csv) => {
let name = csv
.path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown")
.to_string();
let rows = schema.tables.values().map(|t| t.row_count).sum();
(format!("CSV file: {name}"), vec![name], rows)
}
DataSource::Parquet(pq) => {
let name = pq
.path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown")
.to_string();
let rows = schema.tables.values().map(|t| t.row_count).sum();
(format!("Parquet file: {name}"), vec![name], rows)
}
DataSource::Json(json) => {
let name = json
.path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unknown")
.to_string();
let rows = schema.tables.values().map(|t| t.row_count).sum();
let format_type = if json.is_array { "JSON" } else { "JSONL" };
(format!("{format_type} file: {name}"), vec![name], rows)
}
DataSource::Memory(mem) => {
let rows = mem.row_count() as u64;
(
"In-memory data".to_string(),
vec!["memory".to_string()],
rows,
)
}
DataSource::Directory(dir) => {
let name = dir.path.display().to_string();
(format!("Directory: {name}"), vec![], 0)
}
};
SourceMetadata::new(description, tables, total_rows)
}