Skip to main content

datasynth_fingerprint/extraction/
mod.rs

1//! Extraction engine for fingerprinting.
2//!
3//! This module provides extractors that analyze data and produce
4//! fingerprint components while applying privacy mechanisms.
5//!
6//! # Overview
7//!
8//! The extraction process analyzes real data and produces a [`Fingerprint`]
9//! that captures statistical properties without storing individual records.
10//!
11//! # Basic Usage
12//!
13//! ```ignore
14//! use datasynth_fingerprint::extraction::FingerprintExtractor;
15//! use datasynth_fingerprint::models::PrivacyLevel;
16//! use std::path::Path;
17//!
18//! // Create extractor with standard privacy
//! let extractor = FingerprintExtractor::with_privacy_level(PrivacyLevel::Standard);
20//!
21//! // Extract from CSV file
22//! let fingerprint = extractor.extract_from_csv(Path::new("data.csv"))?;
23//! ```
24//!
25//! # Data Sources
26//!
27//! Multiple data source types are supported:
28//!
29//! ```ignore
//! use datasynth_fingerprint::extraction::{
//!     CsvDataSource, DataSource, FingerprintExtractor, JsonDataSource, ParquetDataSource,
//! };
//! use std::path::Path;
//!
//! // CSV files
//! let csv_source = DataSource::Csv(CsvDataSource::new("data.csv"));
//!
//! // Parquet files
//! let parquet_source = DataSource::Parquet(ParquetDataSource::new("data.parquet"));
//!
//! // JSON files (array or newline-delimited)
//! let json_source = DataSource::Json(JsonDataSource::json_array("data.json"));
//! let jsonl_source = DataSource::Json(JsonDataSource::jsonl("data.jsonl"));
//!
//! // Multi-table from directory
//! let extractor = FingerprintExtractor::new();
//! let fingerprint = extractor.extract_from_directory(Path::new("./data_folder/"))?;
44//! ```
45//!
46//! # Streaming Extraction
47//!
48//! For large files that don't fit in memory, use streaming extraction:
49//!
50//! ```ignore
51//! use datasynth_fingerprint::extraction::{FingerprintExtractor, ExtractionConfig};
52//!
53//! let config = ExtractionConfig {
54//!     streaming: true,
55//!     stream_batch_size: 100_000,  // Process 100k rows at a time
56//!     ..ExtractionConfig::default()
57//! };
58//!
59//! let extractor = FingerprintExtractor::with_config(config);
60//! let fingerprint = extractor.extract_streaming_csv(Path::new("large_file.csv"))?;
61//! ```
62//!
63//! # Component Extractors
64//!
65//! Individual extractors handle different fingerprint components:
66//!
67//! | Extractor | Output | Description |
68//! |-----------|--------|-------------|
69//! | [`SchemaExtractor`] | [`SchemaFingerprint`] | Column types, constraints |
70//! | [`StatsExtractor`] | [`StatisticsFingerprint`] | Distributions, percentiles |
71//! | [`CorrelationExtractor`] | [`CorrelationFingerprint`] | Correlation matrices |
72//! | [`IntegrityExtractor`] | [`IntegrityFingerprint`] | FK relationships |
73//! | [`RulesExtractor`] | [`RulesFingerprint`] | Business rules |
74//! | [`AnomalyExtractor`] | [`AnomalyFingerprint`] | Anomaly patterns |
75//!
76//! # Streaming Statistics
77//!
78//! The [`StreamingNumericStats`] and [`StreamingCategoricalStats`] types
79//! provide memory-efficient online algorithms for computing statistics:
80//!
81//! ```ignore
82//! use datasynth_fingerprint::extraction::StreamingNumericStats;
83//!
//! let mut stats = StreamingNumericStats::new(10_000);
//! for value in data_iterator {
//!     stats.add(value);
//! }
//! let (mean, std_dev) = (stats.mean(), stats.std_dev());
89//! ```
90//!
91//! [`Fingerprint`]: crate::models::Fingerprint
92//! [`SchemaFingerprint`]: crate::models::SchemaFingerprint
93//! [`StatisticsFingerprint`]: crate::models::StatisticsFingerprint
94//! [`CorrelationFingerprint`]: crate::models::CorrelationFingerprint
95//! [`IntegrityFingerprint`]: crate::models::IntegrityFingerprint
96//! [`RulesFingerprint`]: crate::models::RulesFingerprint
97//! [`AnomalyFingerprint`]: crate::models::AnomalyFingerprint
98
99mod anomaly_extractor;
100pub mod approver_extractor;
101pub mod banking_extractor;
102pub mod behavioral_extractor;
103pub mod coa_extractor;
104mod correlation_extractor;
105mod integrity_extractor;
106pub mod manual_extractor;
107pub mod pii_denylist;
108pub mod reference_extractor;
109mod rules_extractor;
110mod schema_extractor;
111mod stats_extractor;
112pub mod streaming;
113pub mod tb_extractor;
114pub mod text_extractor;
115pub mod user_extractor;
116
117pub use anomaly_extractor::*;
118pub use approver_extractor::extract_approver_prior_from_parquet;
119pub use banking_extractor::*;
120pub use coa_extractor::extract_coa_semantic_from_parquet;
121pub use correlation_extractor::*;
122pub use integrity_extractor::*;
123pub use manual_extractor::extract_manual_share_from_parquet;
124pub use pii_denylist::PiiDenylist;
125pub use reference_extractor::{extract_reference_formats, fill_template, tokenize_reference};
126pub use rules_extractor::*;
127pub use schema_extractor::*;
128pub use stats_extractor::*;
129pub use streaming::{StreamingCategoricalStats, StreamingNumericStats};
130pub use tb_extractor::extract_tb_anchor_from_parquet;
131pub use text_extractor::{
132    extract_text_taxonomy, extract_text_taxonomy_checked, extract_text_taxonomy_from_records,
133    TextTaxonomyRecord,
134};
135
136use std::path::Path;
137
138use crate::error::{FingerprintError, FingerprintResult};
139use crate::models::{
140    Fingerprint, Manifest, PrivacyLevel, PrivacyMetadata, SchemaFingerprint, SourceMetadata,
141    StatisticsFingerprint,
142};
143use crate::privacy::{PrivacyConfig, PrivacyEngine};
144
145/// Configuration for fingerprint extraction.
146#[derive(Debug, Clone)]
147pub struct ExtractionConfig {
148    /// Privacy configuration.
149    pub privacy: PrivacyConfig,
150    /// Whether to extract correlations.
151    pub extract_correlations: bool,
152    /// Whether to extract integrity constraints.
153    pub extract_integrity: bool,
154    /// Whether to extract business rules.
155    pub extract_rules: bool,
156    /// Whether to extract anomaly patterns.
157    pub extract_anomalies: bool,
158    /// Maximum sample size for large datasets.
159    pub max_sample_size: Option<usize>,
160    /// Minimum rows required for extraction.
161    pub min_rows: usize,
162    /// Enable streaming extraction for large files.
163    ///
164    /// When enabled, uses online algorithms for statistics computation
165    /// to reduce memory usage. Set `stream_batch_size` to control memory.
166    pub streaming: bool,
167    /// Batch size for streaming extraction (number of rows per batch).
168    ///
169    /// Smaller values reduce memory but may increase computation time.
170    /// Default is 10,000 rows.
171    pub stream_batch_size: usize,
172}
173
174impl Default for ExtractionConfig {
175    fn default() -> Self {
176        Self {
177            privacy: PrivacyConfig::from_level(PrivacyLevel::Standard),
178            extract_correlations: true,
179            extract_integrity: true,
180            extract_rules: true,
181            extract_anomalies: true,
182            max_sample_size: None,
183            min_rows: 10,
184            streaming: false,
185            stream_batch_size: 10_000,
186        }
187    }
188}
189
190impl ExtractionConfig {
191    /// Create with a specific privacy level.
192    pub fn with_privacy_level(level: PrivacyLevel) -> Self {
193        Self {
194            privacy: PrivacyConfig::from_level(level),
195            ..Default::default()
196        }
197    }
198
199    /// Enable streaming mode for large datasets.
200    ///
201    /// Streaming mode uses online algorithms to compute statistics
202    /// without loading all data into memory.
203    pub fn with_streaming(mut self, batch_size: usize) -> Self {
204        self.streaming = true;
205        self.stream_batch_size = batch_size;
206        self
207    }
208}
209
210/// Trait for data extractors.
211pub trait Extractor: Send + Sync {
212    /// Name of this extractor.
213    fn name(&self) -> &'static str;
214
215    /// Extract component from data.
216    fn extract(
217        &self,
218        data: &DataSource,
219        config: &ExtractionConfig,
220        privacy: &mut PrivacyEngine,
221    ) -> FingerprintResult<ExtractedComponent>;
222}
223
224/// Source of data for extraction.
225#[derive(Debug)]
226pub enum DataSource {
227    /// CSV file.
228    Csv(CsvDataSource),
229    /// Parquet file.
230    Parquet(ParquetDataSource),
231    /// JSON/JSONL file.
232    Json(JsonDataSource),
233    /// Directory with multiple files.
234    Directory(DirectoryDataSource),
235    /// In-memory data.
236    Memory(MemoryDataSource),
237}
238
/// A delimited-text (CSV) file to extract from.
#[derive(Debug)]
pub struct CsvDataSource {
    /// Location of the CSV file on disk.
    pub path: std::path::PathBuf,
    /// `true` when the first record holds the column names.
    pub has_headers: bool,
    /// Field separator byte.
    pub delimiter: u8,
}

impl CsvDataSource {
    /// Describe the CSV file at `path`, assuming a header row and comma separators.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let path = std::path::PathBuf::from(path.as_ref());
        Self {
            delimiter: b',',
            has_headers: true,
            path,
        }
    }
}
260
/// A Parquet file to extract from, optionally restricted to some row groups and columns.
#[derive(Debug)]
pub struct ParquetDataSource {
    /// Location of the Parquet file on disk.
    pub path: std::path::PathBuf,
    /// Indices of the row groups to read; `None` reads every row group.
    pub row_groups: Option<Vec<usize>>,
    /// Names of the columns to read; `None` reads every column.
    pub columns: Option<Vec<String>>,
}

impl ParquetDataSource {
    /// Describe the whole Parquet file at `path` (all row groups, all columns).
    pub fn new(path: impl AsRef<Path>) -> Self {
        let path = path.as_ref().to_owned();
        Self {
            path,
            columns: None,
            row_groups: None,
        }
    }

    /// Restrict reading to the given row-group indices.
    pub fn with_row_groups(self, groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(groups),
            ..self
        }
    }

    /// Restrict reading to the named columns.
    pub fn with_columns(self, columns: Vec<String>) -> Self {
        Self {
            columns: Some(columns),
            ..self
        }
    }
}
294
/// A JSON file to extract from: either a single top-level array or JSON Lines.
#[derive(Debug)]
pub struct JsonDataSource {
    /// Path to the JSON or JSONL file.
    pub path: std::path::PathBuf,
    /// Layout: `true` for a JSON array, `false` for JSONL (one object per line).
    pub is_array: bool,
}

impl JsonDataSource {
    /// Create from a path, auto-detecting the layout from the file extension.
    ///
    /// `.jsonl` and `.ndjson` select the line-delimited layout. The comparison ignores
    /// ASCII case (so `data.JSONL` is line-delimited too), consistent with the
    /// lower-casing applied when scanning directories. Any other extension, a missing
    /// extension, or a non-UTF-8 extension selects the JSON-array layout.
    pub fn new(path: impl AsRef<Path>) -> Self {
        let path = path.as_ref().to_path_buf();
        let is_line_delimited = path
            .extension()
            .and_then(|ext| ext.to_str())
            .map(|ext| ext.eq_ignore_ascii_case("jsonl") || ext.eq_ignore_ascii_case("ndjson"))
            .unwrap_or(false);
        Self {
            path,
            is_array: !is_line_delimited,
        }
    }

    /// Create a source that is read as a single JSON array, regardless of extension.
    pub fn json_array(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_path_buf(),
            is_array: true,
        }
    }

    /// Create a source that is read as JSONL (newline-delimited), regardless of extension.
    pub fn jsonl(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_path_buf(),
            is_array: false,
        }
    }
}
331
/// Directory data source for multi-table extraction.
#[derive(Debug)]
pub struct DirectoryDataSource {
    /// Path to the directory.
    pub path: std::path::PathBuf,
    /// File extensions to include, compared case-insensitively.
    ///
    /// When empty, every file that has a (UTF-8) extension is returned, supported or not.
    pub extensions: Vec<String>,
    /// Whether to recurse into subdirectories.
    pub recursive: bool,
}

impl DirectoryDataSource {
    /// Create from a directory path.
    ///
    /// By default only the top level is scanned, for the extensions `csv`, `parquet`,
    /// `json`, `jsonl` and `ndjson` — every format the directory extractor can read.
    pub fn new(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_path_buf(),
            extensions: vec![
                "csv".to_string(),
                "parquet".to_string(),
                "json".to_string(),
                "jsonl".to_string(),
                // Line-delimited JSON is also commonly saved as `.ndjson`; the directory
                // extractor and `JsonDataSource::new` both handle it.
                "ndjson".to_string(),
            ],
            recursive: false,
        }
    }

    /// Set file extensions to include.
    pub fn with_extensions(mut self, extensions: Vec<String>) -> Self {
        self.extensions = extensions;
        self
    }

    /// Enable recursive directory traversal.
    pub fn recursive(mut self) -> Self {
        self.recursive = true;
        self
    }

    /// Get all matching files in the directory, sorted by path.
    ///
    /// `std::fs::read_dir` yields entries in a platform-dependent order, so the result is
    /// sorted to make table order (and anything derived from it) deterministic.
    ///
    /// # Errors
    /// Returns any I/O error from reading the directory (or a subdirectory when recursive).
    pub fn files(&self) -> std::io::Result<Vec<std::path::PathBuf>> {
        // Normalise the wanted extensions once rather than once per directory entry.
        let wanted: Vec<String> = self.extensions.iter().map(|e| e.to_lowercase()).collect();
        let mut files = Vec::new();
        self.collect_files(&self.path, &wanted, &mut files)?;
        files.sort();
        Ok(files)
    }

    /// Append to `files` every file under `dir` whose lower-cased extension is in `wanted`
    /// (or every file with an extension when `wanted` is empty).
    fn collect_files(
        &self,
        dir: &Path,
        wanted: &[String],
        files: &mut Vec<std::path::PathBuf>,
    ) -> std::io::Result<()> {
        for entry in std::fs::read_dir(dir)? {
            let path = entry?.path();

            if path.is_dir() {
                // NOTE(review): `Path::is_dir` follows symlinks, so a symlink cycle could make
                // recursive traversal loop — confirm whether that matters for callers.
                if self.recursive {
                    self.collect_files(&path, wanted, files)?;
                }
            } else if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
                let ext = ext.to_lowercase();
                if wanted.is_empty() || wanted.contains(&ext) {
                    files.push(path);
                }
            }
        }
        Ok(())
    }
}
405
/// Tabular data already held in memory, stored as one `Vec<String>` per row.
#[derive(Debug)]
pub struct MemoryDataSource {
    /// Column headers, in order.
    pub columns: Vec<String>,
    /// Cell values; each inner `Vec` is one row.
    pub rows: Vec<Vec<String>>,
}

impl MemoryDataSource {
    /// Wrap the given headers and rows as-is (no shape validation is performed).
    pub fn new(columns: Vec<String>, rows: Vec<Vec<String>>) -> Self {
        Self { rows, columns }
    }

    /// Number of rows held.
    pub fn row_count(&self) -> usize {
        Vec::len(&self.rows)
    }

    /// Number of column headers held.
    pub fn column_count(&self) -> usize {
        Vec::len(&self.columns)
    }
}
431
432/// Result of extraction from a single extractor.
433#[derive(Debug)]
434pub enum ExtractedComponent {
435    Schema(SchemaFingerprint),
436    Statistics(StatisticsFingerprint),
437    Correlations(crate::models::CorrelationFingerprint),
438    Integrity(crate::models::IntegrityFingerprint),
439    Rules(crate::models::RulesFingerprint),
440    Anomalies(crate::models::AnomalyFingerprint),
441}
442
443/// Main fingerprint extractor that coordinates all extraction.
444pub struct FingerprintExtractor {
445    config: ExtractionConfig,
446}
447
448impl FingerprintExtractor {
449    /// Create a new extractor with default configuration.
450    pub fn new() -> Self {
451        Self {
452            config: ExtractionConfig::default(),
453        }
454    }
455
456    /// Create with a specific privacy level.
457    pub fn with_privacy_level(level: PrivacyLevel) -> Self {
458        Self {
459            config: ExtractionConfig::with_privacy_level(level),
460        }
461    }
462
463    /// Create with custom configuration.
464    pub fn with_config(config: ExtractionConfig) -> Self {
465        Self { config }
466    }
467
468    /// Extract fingerprint from a CSV file.
469    pub fn extract_from_csv(&self, path: impl AsRef<Path>) -> FingerprintResult<Fingerprint> {
470        let source = DataSource::Csv(CsvDataSource::new(path));
471        self.extract(&source)
472    }
473
474    /// Extract fingerprint from a large CSV file using streaming.
475    ///
476    /// This method processes the CSV in batches to reduce memory usage,
477    /// using online algorithms for statistics computation.
478    ///
479    /// # Arguments
480    /// * `path` - Path to the CSV file
481    ///
482    /// # Example
483    /// ```no_run
484    /// use datasynth_fingerprint::extraction::FingerprintExtractor;
485    ///
486    /// let extractor = FingerprintExtractor::new();
487    /// let fingerprint = extractor.extract_streaming_csv("large_data.csv").unwrap();
488    /// ```
489    pub fn extract_streaming_csv(&self, path: impl AsRef<Path>) -> FingerprintResult<Fingerprint> {
490        use std::collections::HashMap;
491        use streaming::{StreamingCategoricalStats, StreamingNumericStats};
492
493        let path = path.as_ref();
494        let mut reader = csv::ReaderBuilder::new()
495            .has_headers(true)
496            .from_path(path)?;
497
498        let headers: Vec<String> = reader
499            .headers()?
500            .iter()
501            .map(std::string::ToString::to_string)
502            .collect();
503
504        // Initialize streaming accumulators for each column
505        let mut numeric_accumulators: HashMap<usize, StreamingNumericStats> = HashMap::new();
506        let mut categorical_accumulators: HashMap<usize, StreamingCategoricalStats> =
507            HashMap::new();
508        let mut column_is_numeric: HashMap<usize, bool> = HashMap::new();
509        let mut row_count: u64 = 0;
510
511        // Process rows in streaming fashion
512        for result in reader.records() {
513            let record = result?;
514            row_count += 1;
515
516            for (i, field) in record.iter().enumerate() {
517                if i >= headers.len() {
518                    continue;
519                }
520
521                // Determine if column is numeric (on first batch)
522                let is_numeric = column_is_numeric
523                    .entry(i)
524                    .or_insert_with(|| field.parse::<f64>().is_ok() || field.is_empty());
525
526                if *is_numeric {
527                    if let Ok(value) = field.parse::<f64>() {
528                        let acc = numeric_accumulators
529                            .entry(i)
530                            .or_insert_with(|| StreamingNumericStats::new(10000));
531                        acc.add(value);
532                    }
533                } else {
534                    let acc = categorical_accumulators
535                        .entry(i)
536                        .or_insert_with(|| StreamingCategoricalStats::new(1000));
537                    acc.add(field.to_string());
538                }
539            }
540
541            // Optional: limit rows if max_sample_size is set
542            if let Some(max) = self.config.max_sample_size {
543                if row_count >= max as u64 {
544                    break;
545                }
546            }
547        }
548
549        // Check minimum rows
550        if row_count < self.config.min_rows as u64 {
551            return Err(FingerprintError::InsufficientData {
552                required: self.config.min_rows,
553                actual: row_count as usize,
554            });
555        }
556
557        // Build schema
558        let mut schema = SchemaFingerprint::new();
559        let table_name = path.file_stem().and_then(|s| s.to_str()).unwrap_or("data");
560
561        let mut table = crate::models::TableSchema::new(table_name, row_count);
562        for (i, header) in headers.iter().enumerate() {
563            let is_numeric = column_is_numeric.get(&i).copied().unwrap_or(false);
564            let data_type = if is_numeric {
565                crate::models::DataType::Float64
566            } else {
567                crate::models::DataType::String
568            };
569            let field = crate::models::FieldSchema::new(header.clone(), data_type);
570            table.add_column(field);
571        }
572        schema.add_table(table_name.to_string(), table);
573
574        // Build statistics from accumulators
575        let mut statistics = StatisticsFingerprint::new();
576
577        for (i, acc) in numeric_accumulators {
578            let header = &headers[i];
579            let numeric_stats = crate::models::NumericStats {
580                count: acc.count(),
581                min: acc.min(),
582                max: acc.max(),
583                mean: acc.mean(),
584                std_dev: acc.std_dev(),
585                percentiles: acc.percentiles(),
586                distribution: crate::models::DistributionType::Unknown,
587                distribution_params: crate::models::DistributionParams::empty(),
588                zero_rate: acc.zero_rate(),
589                negative_rate: acc.negative_rate(),
590                benford_first_digit: Some(acc.benford_distribution()),
591                log_magnitude_percentiles: acc.log_magnitude_percentiles(),
592            };
593            statistics.add_numeric(table_name, header, numeric_stats);
594        }
595
596        for (i, acc) in categorical_accumulators {
597            let header = &headers[i];
598            let top_values: Vec<crate::models::CategoryFrequency> = acc
599                .top_values(100)
600                .into_iter()
601                .map(|(value, count)| {
602                    let frequency = count as f64 / acc.count() as f64;
603                    crate::models::CategoryFrequency::new(value, frequency)
604                })
605                .collect();
606
607            let categorical_stats = crate::models::CategoricalStats {
608                count: acc.count(),
609                cardinality: acc.cardinality(),
610                top_values,
611                rare_values_suppressed: true,
612                suppressed_count: 0,
613                entropy: acc.entropy(),
614            };
615            statistics.add_categorical(table_name, header, categorical_stats);
616        }
617
618        // Build manifest
619        let source_meta = SourceMetadata::new(
620            format!("CSV file: {} (streaming extraction)", path.display()),
621            vec![table_name.to_string()],
622            row_count,
623        );
624        let privacy_meta = PrivacyMetadata::from_level(self.config.privacy.level);
625        let manifest = Manifest::new(source_meta, privacy_meta);
626
627        // Build fingerprint (minimal privacy audit for streaming mode)
628        let privacy_audit = crate::models::PrivacyAudit::new(
629            self.config.privacy.epsilon,
630            self.config.privacy.k_anonymity,
631        );
632
633        let fingerprint = Fingerprint::new(manifest, schema, statistics, privacy_audit);
634
635        Ok(fingerprint)
636    }
637
638    /// Extract fingerprint from in-memory data.
639    pub fn extract_from_memory(
640        &self,
641        columns: Vec<String>,
642        rows: Vec<Vec<String>>,
643    ) -> FingerprintResult<Fingerprint> {
644        let source = DataSource::Memory(MemoryDataSource::new(columns, rows));
645        self.extract(&source)
646    }
647
648    /// Extract fingerprint from a directory.
649    pub fn extract_from_directory(&self, path: impl AsRef<Path>) -> FingerprintResult<Fingerprint> {
650        let source = DataSource::Directory(DirectoryDataSource::new(path));
651        self.extract(&source)
652    }
653
654    /// Extract fingerprint from a data source.
655    pub fn extract(&self, source: &DataSource) -> FingerprintResult<Fingerprint> {
656        // Handle directory sources specially by extracting from each file and merging
657        if let DataSource::Directory(dir) = source {
658            return self.extract_from_directory_impl(dir);
659        }
660
661        let mut privacy = PrivacyEngine::new(self.config.privacy.clone());
662
663        // Extract schema
664        let schema_extractor = SchemaExtractor;
665        let schema = match schema_extractor.extract(source, &self.config, &mut privacy)? {
666            ExtractedComponent::Schema(s) => s,
667            _ => {
668                return Err(FingerprintError::extraction(
669                    "schema",
670                    "Unexpected component type",
671                ))
672            }
673        };
674
675        // Extract statistics
676        let stats_extractor = StatsExtractor;
677        let statistics = match stats_extractor.extract(source, &self.config, &mut privacy)? {
678            ExtractedComponent::Statistics(s) => s,
679            _ => {
680                return Err(FingerprintError::extraction(
681                    "statistics",
682                    "Unexpected component type",
683                ))
684            }
685        };
686
687        // Extract optional components
688        let correlations = if self.config.extract_correlations {
689            let extractor = CorrelationExtractor;
690            match extractor.extract(source, &self.config, &mut privacy) {
691                Ok(ExtractedComponent::Correlations(c)) => Some(c),
692                Ok(_) => None,
693                Err(e) => {
694                    tracing::warn!("Optional correlations extraction failed: {}", e);
695                    None
696                }
697            }
698        } else {
699            None
700        };
701
702        let integrity = if self.config.extract_integrity {
703            let extractor = IntegrityExtractor;
704            match extractor.extract(source, &self.config, &mut privacy) {
705                Ok(ExtractedComponent::Integrity(i)) => Some(i),
706                Ok(_) => None,
707                Err(e) => {
708                    tracing::warn!("Optional integrity extraction failed: {}", e);
709                    None
710                }
711            }
712        } else {
713            None
714        };
715
716        let rules = if self.config.extract_rules {
717            let extractor = RulesExtractor;
718            match extractor.extract(source, &self.config, &mut privacy) {
719                Ok(ExtractedComponent::Rules(r)) => Some(r),
720                Ok(_) => None,
721                Err(e) => {
722                    tracing::warn!("Optional rules extraction failed: {}", e);
723                    None
724                }
725            }
726        } else {
727            None
728        };
729
730        let anomalies = if self.config.extract_anomalies {
731            let extractor = AnomalyExtractor;
732            match extractor.extract(source, &self.config, &mut privacy) {
733                Ok(ExtractedComponent::Anomalies(a)) => Some(a),
734                Ok(_) => None,
735                Err(e) => {
736                    tracing::warn!("Optional anomalies extraction failed: {}", e);
737                    None
738                }
739            }
740        } else {
741            None
742        };
743
744        // Build manifest with composition metadata from the engine
745        let source_meta = build_source_metadata(source, &schema);
746        let privacy_meta = privacy.build_privacy_metadata();
747        let manifest = Manifest::new(source_meta, privacy_meta);
748
749        // Get privacy audit (includes composition method and RDP alpha)
750        let privacy_audit = privacy.into_audit();
751
752        // Build fingerprint
753        let mut fingerprint = Fingerprint::new(manifest, schema, statistics, privacy_audit);
754
755        if let Some(c) = correlations {
756            fingerprint = fingerprint.with_correlations(c);
757        }
758        if let Some(i) = integrity {
759            fingerprint = fingerprint.with_integrity(i);
760        }
761        if let Some(r) = rules {
762            fingerprint = fingerprint.with_rules(r);
763        }
764        if let Some(a) = anomalies {
765            fingerprint = fingerprint.with_anomalies(a);
766        }
767
768        Ok(fingerprint)
769    }
770
771    /// Extract fingerprint from a directory by processing each file.
772    fn extract_from_directory_impl(
773        &self,
774        dir: &DirectoryDataSource,
775    ) -> FingerprintResult<Fingerprint> {
776        let files = dir.files()?;
777
778        if files.is_empty() {
779            return Err(FingerprintError::InvalidFormat(format!(
780                "No supported files found in directory: {}",
781                dir.path.display()
782            )));
783        }
784
785        // Extract from each file and merge
786        let mut merged_schema = SchemaFingerprint::new();
787        let mut merged_stats = StatisticsFingerprint::new();
788        let mut total_rows: u64 = 0;
789        let mut table_names: Vec<String> = Vec::new();
790
791        // Track total epsilon spent across all files
792        let mut total_epsilon_spent = 0.0;
793        let mut all_actions = Vec::new();
794
795        // Divide epsilon budget among files to ensure each file gets some budget
796        let num_files = files.len();
797        let per_file_epsilon = self.config.privacy.epsilon / num_files as f64;
798
799        for file_path in &files {
800            // Determine file type
801            let ext = file_path
802                .extension()
803                .and_then(|e| e.to_str())
804                .map(str::to_lowercase)
805                .unwrap_or_default();
806
807            let source = match ext.as_str() {
808                "csv" => DataSource::Csv(CsvDataSource::new(file_path)),
809                "parquet" => DataSource::Parquet(ParquetDataSource::new(file_path)),
810                "json" => DataSource::Json(JsonDataSource::json_array(file_path)),
811                "jsonl" | "ndjson" => DataSource::Json(JsonDataSource::jsonl(file_path)),
812                _ => continue, // Skip unknown file types
813            };
814
815            // Create a fresh privacy engine for each file with proportional budget
816            let mut per_file_config = self.config.privacy.clone();
817            per_file_config.epsilon = per_file_epsilon;
818            let mut file_privacy = PrivacyEngine::new(per_file_config);
819
820            // Extract schema
821            let schema_extractor = SchemaExtractor;
822            if let Ok(ExtractedComponent::Schema(schema)) =
823                schema_extractor.extract(&source, &self.config, &mut file_privacy)
824            {
825                for (name, table) in schema.tables {
826                    total_rows += table.row_count;
827                    table_names.push(name.clone());
828                    merged_schema.add_table(name, table);
829                }
830            }
831
832            // Extract statistics
833            let stats_extractor = StatsExtractor;
834            if let Ok(ExtractedComponent::Statistics(stats)) =
835                stats_extractor.extract(&source, &self.config, &mut file_privacy)
836            {
837                // Merge statistics
838                for (key, numeric) in stats.numeric_columns {
839                    merged_stats.numeric_columns.insert(key, numeric);
840                }
841                for (key, categorical) in stats.categorical_columns {
842                    merged_stats.categorical_columns.insert(key, categorical);
843                }
844            }
845
846            // Collect privacy audit from this file
847            let file_audit = file_privacy.into_audit();
848            total_epsilon_spent += file_audit.total_epsilon_spent;
849            all_actions.extend(file_audit.actions);
850        }
851
852        // Build source metadata
853        let description = format!("Directory: {} ({} files)", dir.path.display(), files.len());
854        let source_meta = SourceMetadata::new(description, table_names, total_rows);
855        let privacy_meta = PrivacyMetadata::from_level(self.config.privacy.level);
856        let manifest = Manifest::new(source_meta, privacy_meta);
857
858        // Build combined privacy audit
859        let mut privacy_audit = crate::models::PrivacyAudit::new(
860            self.config.privacy.epsilon,
861            self.config.privacy.k_anonymity,
862        );
863        privacy_audit.total_epsilon_spent = total_epsilon_spent;
864        privacy_audit.actions = all_actions;
865
866        // Build fingerprint
867        let fingerprint = Fingerprint::new(manifest, merged_schema, merged_stats, privacy_audit);
868
869        Ok(fingerprint)
870    }
871}
872
873impl Default for FingerprintExtractor {
874    fn default() -> Self {
875        Self::new()
876    }
877}
878
879/// Build source metadata from data source and schema.
880fn build_source_metadata(source: &DataSource, schema: &SchemaFingerprint) -> SourceMetadata {
881    let (description, tables, total_rows) = match source {
882        DataSource::Csv(csv) => {
883            let name = csv
884                .path
885                .file_name()
886                .and_then(|n| n.to_str())
887                .unwrap_or("unknown")
888                .to_string();
889            let rows = schema.tables.values().map(|t| t.row_count).sum();
890            (format!("CSV file: {name}"), vec![name], rows)
891        }
892        DataSource::Parquet(pq) => {
893            let name = pq
894                .path
895                .file_name()
896                .and_then(|n| n.to_str())
897                .unwrap_or("unknown")
898                .to_string();
899            let rows = schema.tables.values().map(|t| t.row_count).sum();
900            (format!("Parquet file: {name}"), vec![name], rows)
901        }
902        DataSource::Json(json) => {
903            let name = json
904                .path
905                .file_name()
906                .and_then(|n| n.to_str())
907                .unwrap_or("unknown")
908                .to_string();
909            let rows = schema.tables.values().map(|t| t.row_count).sum();
910            let format_type = if json.is_array { "JSON" } else { "JSONL" };
911            (format!("{format_type} file: {name}"), vec![name], rows)
912        }
913        DataSource::Memory(mem) => {
914            let rows = mem.row_count() as u64;
915            (
916                "In-memory data".to_string(),
917                vec!["memory".to_string()],
918                rows,
919            )
920        }
921        DataSource::Directory(dir) => {
922            // This shouldn't be called for directories - they're handled separately
923            let name = dir.path.display().to_string();
924            (format!("Directory: {name}"), vec![], 0)
925        }
926    };
927
928    SourceMetadata::new(description, tables, total_rows)
929}