1mod anomaly_extractor;
100pub mod approver_extractor;
101pub mod banking_extractor;
102pub mod behavioral_extractor;
103pub mod coa_extractor;
104mod correlation_extractor;
105mod integrity_extractor;
106pub mod manual_extractor;
107pub mod pii_denylist;
108pub mod reference_extractor;
109mod rules_extractor;
110mod schema_extractor;
111mod stats_extractor;
112pub mod streaming;
113pub mod tb_extractor;
114pub mod text_extractor;
115pub mod user_extractor;
116
117pub use anomaly_extractor::*;
118pub use approver_extractor::extract_approver_prior_from_parquet;
119pub use banking_extractor::*;
120pub use coa_extractor::extract_coa_semantic_from_parquet;
121pub use correlation_extractor::*;
122pub use integrity_extractor::*;
123pub use manual_extractor::extract_manual_share_from_parquet;
124pub use pii_denylist::PiiDenylist;
125pub use reference_extractor::{extract_reference_formats, fill_template, tokenize_reference};
126pub use rules_extractor::*;
127pub use schema_extractor::*;
128pub use stats_extractor::*;
129pub use streaming::{StreamingCategoricalStats, StreamingNumericStats};
130pub use tb_extractor::extract_tb_anchor_from_parquet;
131pub use text_extractor::{
132 extract_text_taxonomy, extract_text_taxonomy_checked, extract_text_taxonomy_from_records,
133 TextTaxonomyRecord,
134};
135
136use std::path::Path;
137
138use crate::error::{FingerprintError, FingerprintResult};
139use crate::models::{
140 Fingerprint, Manifest, PrivacyLevel, PrivacyMetadata, SchemaFingerprint, SourceMetadata,
141 StatisticsFingerprint,
142};
143use crate::privacy::{PrivacyConfig, PrivacyEngine};
144
145#[derive(Debug, Clone)]
147pub struct ExtractionConfig {
148 pub privacy: PrivacyConfig,
150 pub extract_correlations: bool,
152 pub extract_integrity: bool,
154 pub extract_rules: bool,
156 pub extract_anomalies: bool,
158 pub max_sample_size: Option<usize>,
160 pub min_rows: usize,
162 pub streaming: bool,
167 pub stream_batch_size: usize,
172}
173
174impl Default for ExtractionConfig {
175 fn default() -> Self {
176 Self {
177 privacy: PrivacyConfig::from_level(PrivacyLevel::Standard),
178 extract_correlations: true,
179 extract_integrity: true,
180 extract_rules: true,
181 extract_anomalies: true,
182 max_sample_size: None,
183 min_rows: 10,
184 streaming: false,
185 stream_batch_size: 10_000,
186 }
187 }
188}
189
190impl ExtractionConfig {
191 pub fn with_privacy_level(level: PrivacyLevel) -> Self {
193 Self {
194 privacy: PrivacyConfig::from_level(level),
195 ..Default::default()
196 }
197 }
198
199 pub fn with_streaming(mut self, batch_size: usize) -> Self {
204 self.streaming = true;
205 self.stream_batch_size = batch_size;
206 self
207 }
208}
209
210pub trait Extractor: Send + Sync {
212 fn name(&self) -> &'static str;
214
215 fn extract(
217 &self,
218 data: &DataSource,
219 config: &ExtractionConfig,
220 privacy: &mut PrivacyEngine,
221 ) -> FingerprintResult<ExtractedComponent>;
222}
223
224#[derive(Debug)]
226pub enum DataSource {
227 Csv(CsvDataSource),
229 Parquet(ParquetDataSource),
231 Json(JsonDataSource),
233 Directory(DirectoryDataSource),
235 Memory(MemoryDataSource),
237}
238
/// Location and parsing options of a CSV input.
#[derive(Debug)]
pub struct CsvDataSource {
    /// Path of the CSV file.
    pub path: std::path::PathBuf,
    /// Whether the first record is a header row.
    pub has_headers: bool,
    /// Field delimiter as a single byte.
    pub delimiter: u8,
}

impl CsvDataSource {
    /// Describes the CSV file at `path`, assuming a header row and a comma
    /// delimiter.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let path = std::path::PathBuf::from(path.as_ref());
        CsvDataSource {
            path,
            has_headers: true,
            delimiter: b',',
        }
    }
}
260
/// Location of a Parquet input plus optional read restrictions.
#[derive(Debug)]
pub struct ParquetDataSource {
    /// Path of the Parquet file.
    pub path: std::path::PathBuf,
    /// Row-group indices to read; `None` when no restriction was set.
    pub row_groups: Option<Vec<usize>>,
    /// Column names to read; `None` when no restriction was set.
    pub columns: Option<Vec<String>>,
}

impl ParquetDataSource {
    /// Describes the Parquet file at `path` with no row-group or column
    /// restriction.
    pub fn new(path: impl AsRef<Path>) -> Self {
        ParquetDataSource {
            path: std::path::PathBuf::from(path.as_ref()),
            row_groups: None,
            columns: None,
        }
    }

    /// Sets the row-group selection to `groups`.
    pub fn with_row_groups(self, groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(groups),
            ..self
        }
    }

    /// Sets the column selection to `columns`.
    pub fn with_columns(self, columns: Vec<String>) -> Self {
        Self {
            columns: Some(columns),
            ..self
        }
    }
}
294
/// Location and layout of a JSON input.
#[derive(Debug)]
pub struct JsonDataSource {
    /// Path of the JSON file.
    pub path: std::path::PathBuf,
    /// `true` for a single JSON array, `false` for line-delimited records.
    pub is_array: bool,
}

impl JsonDataSource {
    /// Describes the JSON file at `path`, inferring the layout from its
    /// extension.
    ///
    /// A `jsonl` or `ndjson` extension selects the line-delimited layout. Any
    /// other extension, or no extension at all, selects the array layout. The
    /// comparison ignores ASCII case so that `events.JSONL` is classified the
    /// same way as `events.jsonl`, matching the case-insensitive extension
    /// handling used when scanning directories.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let path = path.as_ref().to_path_buf();
        let is_line_delimited = path.extension().map_or(false, |ext| {
            ext.eq_ignore_ascii_case("jsonl") || ext.eq_ignore_ascii_case("ndjson")
        });
        Self {
            path,
            is_array: !is_line_delimited,
        }
    }

    /// Describes `path` as a single JSON array, regardless of its extension.
    pub fn json_array(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_path_buf(),
            is_array: true,
        }
    }

    /// Describes `path` as line-delimited JSON, regardless of its extension.
    pub fn jsonl(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_path_buf(),
            is_array: false,
        }
    }
}
331
/// A directory to scan for input files.
#[derive(Debug)]
pub struct DirectoryDataSource {
    /// Directory to scan.
    pub path: std::path::PathBuf,
    /// Extensions (without the dot) to accept, compared case-insensitively.
    /// An empty list accepts any file that has an extension.
    pub extensions: Vec<String>,
    /// Whether sub-directories are scanned as well.
    pub recursive: bool,
}

impl DirectoryDataSource {
    /// Describes the directory at `path`, accepting `csv`, `parquet`, `json`
    /// and `jsonl` files and not descending into sub-directories.
    // NOTE(review): the directory extractor also knows how to read `ndjson`
    // files, but that extension is not in this default list — confirm whether
    // the omission is intentional.
    pub fn new(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_path_buf(),
            extensions: vec![
                "csv".to_string(),
                "parquet".to_string(),
                "json".to_string(),
                "jsonl".to_string(),
            ],
            recursive: false,
        }
    }

    /// Replaces the accepted extension list.
    pub fn with_extensions(mut self, extensions: Vec<String>) -> Self {
        self.extensions = extensions;
        self
    }

    /// Enables scanning of sub-directories.
    pub fn recursive(mut self) -> Self {
        self.recursive = true;
        self
    }

    /// Lists the matching files under [`Self::path`], sorted by path.
    ///
    /// `std::fs::read_dir` yields entries in a platform- and
    /// filesystem-dependent order, so the result is sorted to give callers the
    /// same sequence on every run.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if a directory cannot be read.
    pub fn files(&self) -> std::io::Result<Vec<std::path::PathBuf>> {
        let mut files = Vec::new();
        self.collect_files(&self.path, &mut files)?;
        files.sort();
        Ok(files)
    }

    /// Appends the matching files found in `dir` to `files`, descending into
    /// sub-directories only when [`Self::recursive`] is set.
    fn collect_files(
        &self,
        dir: &Path,
        files: &mut Vec<std::path::PathBuf>,
    ) -> std::io::Result<()> {
        for entry in std::fs::read_dir(dir)? {
            let path = entry?.path();

            if path.is_dir() {
                if self.recursive {
                    self.collect_files(&path, files)?;
                }
                continue;
            }

            // Files without an extension, or with a non-UTF-8 one, never match.
            let accepted = path
                .extension()
                .and_then(|e| e.to_str())
                .map_or(false, |ext| self.accepts_extension(ext));
            if accepted {
                files.push(path);
            }
        }
        Ok(())
    }

    /// Returns `true` when `ext` is accepted: either the list is empty or it
    /// contains `ext` under a case-insensitive comparison.
    fn accepts_extension(&self, ext: &str) -> bool {
        if self.extensions.is_empty() {
            return true;
        }
        let wanted = ext.to_lowercase();
        self.extensions.iter().any(|e| e.to_lowercase() == wanted)
    }
}
405
/// Tabular data held in memory as strings.
#[derive(Debug)]
pub struct MemoryDataSource {
    /// Column names.
    pub columns: Vec<String>,
    /// Data rows, each a list of cell values.
    pub rows: Vec<Vec<String>>,
}

impl MemoryDataSource {
    /// Wraps the given column names and rows without validating them.
    pub fn new(columns: Vec<String>, rows: Vec<Vec<String>>) -> Self {
        MemoryDataSource { columns, rows }
    }

    /// Number of rows held.
    pub fn row_count(&self) -> usize {
        let rows = &self.rows;
        rows.len()
    }

    /// Number of column names held.
    pub fn column_count(&self) -> usize {
        let columns = &self.columns;
        columns.len()
    }
}
431
432#[derive(Debug)]
434pub enum ExtractedComponent {
435 Schema(SchemaFingerprint),
436 Statistics(StatisticsFingerprint),
437 Correlations(crate::models::CorrelationFingerprint),
438 Integrity(crate::models::IntegrityFingerprint),
439 Rules(crate::models::RulesFingerprint),
440 Anomalies(crate::models::AnomalyFingerprint),
441}
442
443pub struct FingerprintExtractor {
445 config: ExtractionConfig,
446}
447
448impl FingerprintExtractor {
449 pub fn new() -> Self {
451 Self {
452 config: ExtractionConfig::default(),
453 }
454 }
455
456 pub fn with_privacy_level(level: PrivacyLevel) -> Self {
458 Self {
459 config: ExtractionConfig::with_privacy_level(level),
460 }
461 }
462
463 pub fn with_config(config: ExtractionConfig) -> Self {
465 Self { config }
466 }
467
468 pub fn extract_from_csv(&self, path: impl AsRef<Path>) -> FingerprintResult<Fingerprint> {
470 let source = DataSource::Csv(CsvDataSource::new(path));
471 self.extract(&source)
472 }
473
474 pub fn extract_streaming_csv(&self, path: impl AsRef<Path>) -> FingerprintResult<Fingerprint> {
490 use std::collections::HashMap;
491 use streaming::{StreamingCategoricalStats, StreamingNumericStats};
492
493 let path = path.as_ref();
494 let mut reader = csv::ReaderBuilder::new()
495 .has_headers(true)
496 .from_path(path)?;
497
498 let headers: Vec<String> = reader
499 .headers()?
500 .iter()
501 .map(std::string::ToString::to_string)
502 .collect();
503
504 let mut numeric_accumulators: HashMap<usize, StreamingNumericStats> = HashMap::new();
506 let mut categorical_accumulators: HashMap<usize, StreamingCategoricalStats> =
507 HashMap::new();
508 let mut column_is_numeric: HashMap<usize, bool> = HashMap::new();
509 let mut row_count: u64 = 0;
510
511 for result in reader.records() {
513 let record = result?;
514 row_count += 1;
515
516 for (i, field) in record.iter().enumerate() {
517 if i >= headers.len() {
518 continue;
519 }
520
521 let is_numeric = column_is_numeric
523 .entry(i)
524 .or_insert_with(|| field.parse::<f64>().is_ok() || field.is_empty());
525
526 if *is_numeric {
527 if let Ok(value) = field.parse::<f64>() {
528 let acc = numeric_accumulators
529 .entry(i)
530 .or_insert_with(|| StreamingNumericStats::new(10000));
531 acc.add(value);
532 }
533 } else {
534 let acc = categorical_accumulators
535 .entry(i)
536 .or_insert_with(|| StreamingCategoricalStats::new(1000));
537 acc.add(field.to_string());
538 }
539 }
540
541 if let Some(max) = self.config.max_sample_size {
543 if row_count >= max as u64 {
544 break;
545 }
546 }
547 }
548
549 if row_count < self.config.min_rows as u64 {
551 return Err(FingerprintError::InsufficientData {
552 required: self.config.min_rows,
553 actual: row_count as usize,
554 });
555 }
556
557 let mut schema = SchemaFingerprint::new();
559 let table_name = path.file_stem().and_then(|s| s.to_str()).unwrap_or("data");
560
561 let mut table = crate::models::TableSchema::new(table_name, row_count);
562 for (i, header) in headers.iter().enumerate() {
563 let is_numeric = column_is_numeric.get(&i).copied().unwrap_or(false);
564 let data_type = if is_numeric {
565 crate::models::DataType::Float64
566 } else {
567 crate::models::DataType::String
568 };
569 let field = crate::models::FieldSchema::new(header.clone(), data_type);
570 table.add_column(field);
571 }
572 schema.add_table(table_name.to_string(), table);
573
574 let mut statistics = StatisticsFingerprint::new();
576
577 for (i, acc) in numeric_accumulators {
578 let header = &headers[i];
579 let numeric_stats = crate::models::NumericStats {
580 count: acc.count(),
581 min: acc.min(),
582 max: acc.max(),
583 mean: acc.mean(),
584 std_dev: acc.std_dev(),
585 percentiles: acc.percentiles(),
586 distribution: crate::models::DistributionType::Unknown,
587 distribution_params: crate::models::DistributionParams::empty(),
588 zero_rate: acc.zero_rate(),
589 negative_rate: acc.negative_rate(),
590 benford_first_digit: Some(acc.benford_distribution()),
591 log_magnitude_percentiles: acc.log_magnitude_percentiles(),
592 };
593 statistics.add_numeric(table_name, header, numeric_stats);
594 }
595
596 for (i, acc) in categorical_accumulators {
597 let header = &headers[i];
598 let top_values: Vec<crate::models::CategoryFrequency> = acc
599 .top_values(100)
600 .into_iter()
601 .map(|(value, count)| {
602 let frequency = count as f64 / acc.count() as f64;
603 crate::models::CategoryFrequency::new(value, frequency)
604 })
605 .collect();
606
607 let categorical_stats = crate::models::CategoricalStats {
608 count: acc.count(),
609 cardinality: acc.cardinality(),
610 top_values,
611 rare_values_suppressed: true,
612 suppressed_count: 0,
613 entropy: acc.entropy(),
614 };
615 statistics.add_categorical(table_name, header, categorical_stats);
616 }
617
618 let source_meta = SourceMetadata::new(
620 format!("CSV file: {} (streaming extraction)", path.display()),
621 vec![table_name.to_string()],
622 row_count,
623 );
624 let privacy_meta = PrivacyMetadata::from_level(self.config.privacy.level);
625 let manifest = Manifest::new(source_meta, privacy_meta);
626
627 let privacy_audit = crate::models::PrivacyAudit::new(
629 self.config.privacy.epsilon,
630 self.config.privacy.k_anonymity,
631 );
632
633 let fingerprint = Fingerprint::new(manifest, schema, statistics, privacy_audit);
634
635 Ok(fingerprint)
636 }
637
638 pub fn extract_from_memory(
640 &self,
641 columns: Vec<String>,
642 rows: Vec<Vec<String>>,
643 ) -> FingerprintResult<Fingerprint> {
644 let source = DataSource::Memory(MemoryDataSource::new(columns, rows));
645 self.extract(&source)
646 }
647
648 pub fn extract_from_directory(&self, path: impl AsRef<Path>) -> FingerprintResult<Fingerprint> {
650 let source = DataSource::Directory(DirectoryDataSource::new(path));
651 self.extract(&source)
652 }
653
654 pub fn extract(&self, source: &DataSource) -> FingerprintResult<Fingerprint> {
656 if let DataSource::Directory(dir) = source {
658 return self.extract_from_directory_impl(dir);
659 }
660
661 let mut privacy = PrivacyEngine::new(self.config.privacy.clone());
662
663 let schema_extractor = SchemaExtractor;
665 let schema = match schema_extractor.extract(source, &self.config, &mut privacy)? {
666 ExtractedComponent::Schema(s) => s,
667 _ => {
668 return Err(FingerprintError::extraction(
669 "schema",
670 "Unexpected component type",
671 ))
672 }
673 };
674
675 let stats_extractor = StatsExtractor;
677 let statistics = match stats_extractor.extract(source, &self.config, &mut privacy)? {
678 ExtractedComponent::Statistics(s) => s,
679 _ => {
680 return Err(FingerprintError::extraction(
681 "statistics",
682 "Unexpected component type",
683 ))
684 }
685 };
686
687 let correlations = if self.config.extract_correlations {
689 let extractor = CorrelationExtractor;
690 match extractor.extract(source, &self.config, &mut privacy) {
691 Ok(ExtractedComponent::Correlations(c)) => Some(c),
692 Ok(_) => None,
693 Err(e) => {
694 tracing::warn!("Optional correlations extraction failed: {}", e);
695 None
696 }
697 }
698 } else {
699 None
700 };
701
702 let integrity = if self.config.extract_integrity {
703 let extractor = IntegrityExtractor;
704 match extractor.extract(source, &self.config, &mut privacy) {
705 Ok(ExtractedComponent::Integrity(i)) => Some(i),
706 Ok(_) => None,
707 Err(e) => {
708 tracing::warn!("Optional integrity extraction failed: {}", e);
709 None
710 }
711 }
712 } else {
713 None
714 };
715
716 let rules = if self.config.extract_rules {
717 let extractor = RulesExtractor;
718 match extractor.extract(source, &self.config, &mut privacy) {
719 Ok(ExtractedComponent::Rules(r)) => Some(r),
720 Ok(_) => None,
721 Err(e) => {
722 tracing::warn!("Optional rules extraction failed: {}", e);
723 None
724 }
725 }
726 } else {
727 None
728 };
729
730 let anomalies = if self.config.extract_anomalies {
731 let extractor = AnomalyExtractor;
732 match extractor.extract(source, &self.config, &mut privacy) {
733 Ok(ExtractedComponent::Anomalies(a)) => Some(a),
734 Ok(_) => None,
735 Err(e) => {
736 tracing::warn!("Optional anomalies extraction failed: {}", e);
737 None
738 }
739 }
740 } else {
741 None
742 };
743
744 let source_meta = build_source_metadata(source, &schema);
746 let privacy_meta = privacy.build_privacy_metadata();
747 let manifest = Manifest::new(source_meta, privacy_meta);
748
749 let privacy_audit = privacy.into_audit();
751
752 let mut fingerprint = Fingerprint::new(manifest, schema, statistics, privacy_audit);
754
755 if let Some(c) = correlations {
756 fingerprint = fingerprint.with_correlations(c);
757 }
758 if let Some(i) = integrity {
759 fingerprint = fingerprint.with_integrity(i);
760 }
761 if let Some(r) = rules {
762 fingerprint = fingerprint.with_rules(r);
763 }
764 if let Some(a) = anomalies {
765 fingerprint = fingerprint.with_anomalies(a);
766 }
767
768 Ok(fingerprint)
769 }
770
771 fn extract_from_directory_impl(
773 &self,
774 dir: &DirectoryDataSource,
775 ) -> FingerprintResult<Fingerprint> {
776 let files = dir.files()?;
777
778 if files.is_empty() {
779 return Err(FingerprintError::InvalidFormat(format!(
780 "No supported files found in directory: {}",
781 dir.path.display()
782 )));
783 }
784
785 let mut merged_schema = SchemaFingerprint::new();
787 let mut merged_stats = StatisticsFingerprint::new();
788 let mut total_rows: u64 = 0;
789 let mut table_names: Vec<String> = Vec::new();
790
791 let mut total_epsilon_spent = 0.0;
793 let mut all_actions = Vec::new();
794
795 let num_files = files.len();
797 let per_file_epsilon = self.config.privacy.epsilon / num_files as f64;
798
799 for file_path in &files {
800 let ext = file_path
802 .extension()
803 .and_then(|e| e.to_str())
804 .map(str::to_lowercase)
805 .unwrap_or_default();
806
807 let source = match ext.as_str() {
808 "csv" => DataSource::Csv(CsvDataSource::new(file_path)),
809 "parquet" => DataSource::Parquet(ParquetDataSource::new(file_path)),
810 "json" => DataSource::Json(JsonDataSource::json_array(file_path)),
811 "jsonl" | "ndjson" => DataSource::Json(JsonDataSource::jsonl(file_path)),
812 _ => continue, };
814
815 let mut per_file_config = self.config.privacy.clone();
817 per_file_config.epsilon = per_file_epsilon;
818 let mut file_privacy = PrivacyEngine::new(per_file_config);
819
820 let schema_extractor = SchemaExtractor;
822 if let Ok(ExtractedComponent::Schema(schema)) =
823 schema_extractor.extract(&source, &self.config, &mut file_privacy)
824 {
825 for (name, table) in schema.tables {
826 total_rows += table.row_count;
827 table_names.push(name.clone());
828 merged_schema.add_table(name, table);
829 }
830 }
831
832 let stats_extractor = StatsExtractor;
834 if let Ok(ExtractedComponent::Statistics(stats)) =
835 stats_extractor.extract(&source, &self.config, &mut file_privacy)
836 {
837 for (key, numeric) in stats.numeric_columns {
839 merged_stats.numeric_columns.insert(key, numeric);
840 }
841 for (key, categorical) in stats.categorical_columns {
842 merged_stats.categorical_columns.insert(key, categorical);
843 }
844 }
845
846 let file_audit = file_privacy.into_audit();
848 total_epsilon_spent += file_audit.total_epsilon_spent;
849 all_actions.extend(file_audit.actions);
850 }
851
852 let description = format!("Directory: {} ({} files)", dir.path.display(), files.len());
854 let source_meta = SourceMetadata::new(description, table_names, total_rows);
855 let privacy_meta = PrivacyMetadata::from_level(self.config.privacy.level);
856 let manifest = Manifest::new(source_meta, privacy_meta);
857
858 let mut privacy_audit = crate::models::PrivacyAudit::new(
860 self.config.privacy.epsilon,
861 self.config.privacy.k_anonymity,
862 );
863 privacy_audit.total_epsilon_spent = total_epsilon_spent;
864 privacy_audit.actions = all_actions;
865
866 let fingerprint = Fingerprint::new(manifest, merged_schema, merged_stats, privacy_audit);
868
869 Ok(fingerprint)
870 }
871}
872
873impl Default for FingerprintExtractor {
874 fn default() -> Self {
875 Self::new()
876 }
877}
878
879fn build_source_metadata(source: &DataSource, schema: &SchemaFingerprint) -> SourceMetadata {
881 let (description, tables, total_rows) = match source {
882 DataSource::Csv(csv) => {
883 let name = csv
884 .path
885 .file_name()
886 .and_then(|n| n.to_str())
887 .unwrap_or("unknown")
888 .to_string();
889 let rows = schema.tables.values().map(|t| t.row_count).sum();
890 (format!("CSV file: {name}"), vec![name], rows)
891 }
892 DataSource::Parquet(pq) => {
893 let name = pq
894 .path
895 .file_name()
896 .and_then(|n| n.to_str())
897 .unwrap_or("unknown")
898 .to_string();
899 let rows = schema.tables.values().map(|t| t.row_count).sum();
900 (format!("Parquet file: {name}"), vec![name], rows)
901 }
902 DataSource::Json(json) => {
903 let name = json
904 .path
905 .file_name()
906 .and_then(|n| n.to_str())
907 .unwrap_or("unknown")
908 .to_string();
909 let rows = schema.tables.values().map(|t| t.row_count).sum();
910 let format_type = if json.is_array { "JSON" } else { "JSONL" };
911 (format!("{format_type} file: {name}"), vec![name], rows)
912 }
913 DataSource::Memory(mem) => {
914 let rows = mem.row_count() as u64;
915 (
916 "In-memory data".to_string(),
917 vec!["memory".to_string()],
918 rows,
919 )
920 }
921 DataSource::Directory(dir) => {
922 let name = dir.path.display().to_string();
924 (format!("Directory: {name}"), vec![], 0)
925 }
926 };
927
928 SourceMetadata::new(description, tables, total_rows)
929}