// alimentar/format/piracy.rs
//! Piracy detection and watermarking for .ald format (§9.3)
2//!
3//! Provides first-class support for detecting stolen datasets and tracing
4//! leaks.
5//!
6//! # Features
7//!
8//! - **Entropy Analysis**: Detect watermark presence without seller key
9//! - **Watermark Embedding**: Buyer-specific fingerprints in LSB of floats
10//! - **Watermark Extraction**: Recover buyer identity with seller key
11//! - **Legal Evidence**: Generate cryptographic proof for proceedings
12
13// Statistical calculations require f64 casts which are acceptable for this use case
14#![allow(clippy::cast_precision_loss)]
15
16use arrow::array::{Array, Float32Array, Float64Array, RecordBatch};
17
18use crate::error::{Error, Result};
19
/// Natural LSB entropy threshold for clean data
///
/// Real-world float data is expected to have near-random LSBs, so its LSB
/// entropy sits close to 1.0.
/// NOTE(review): not referenced by any code visible in this module —
/// confirm external users before removing.
pub const LSB_NATURAL_THRESHOLD: f64 = 0.97;

/// Autocorrelation threshold for pattern detection
/// Watermarks repeat every 256 bits, creating detectable autocorrelation
pub const AUTOCORRELATION_THRESHOLD: f64 = 0.7;

/// Minimum confidence for watermark detection
/// (extraction results below this are discarded — see `WatermarkEmbedder::extract`)
pub const DETECTION_CONFIDENCE_THRESHOLD: f64 = 0.80;
29
/// Result of watermark detection analysis
#[derive(Debug, Clone)]
pub struct DetectionResult {
    /// Whether the dataset is likely watermarked (overall lag-256
    /// autocorrelation exceeds `AUTOCORRELATION_THRESHOLD`)
    pub likely_watermarked: bool,
    /// Confidence level (0.0 - 1.0)
    pub confidence: f64,
    /// Columns with suspicious LSB patterns
    pub suspicious_columns: Vec<String>,
}

/// Entropy analysis results for a column
#[derive(Debug, Clone)]
pub struct ColumnEntropy {
    /// Column name
    pub name: String,
    /// Shannon entropy of values (bits)
    /// NOTE(review): `analyze_entropy` always fills this with 0.0 —
    /// full-value entropy is not computed during detection.
    pub shannon_entropy: f64,
    /// Shannon entropy of LSB bits only (1.0 = perfectly random)
    pub lsb_entropy: f64,
    /// Kolmogorov-Smirnov test p-value against uniform
    pub ks_pvalue: f64,
    /// Chi-square test result for LSB uniformity
    pub chi_square_pvalue: f64,
    /// Autocorrelation at lag 256 (watermark period)
    pub autocorrelation_256: f64,
}
57
/// Full entropy analysis of a dataset
#[derive(Debug, Clone)]
pub struct EntropyAnalysis {
    /// Per-column entropy results (float columns only)
    pub columns: Vec<ColumnEntropy>,
    /// Overall LSB entropy (mean over analyzed columns; 1.0 when none)
    pub overall_lsb_entropy: f64,
    /// Overall autocorrelation at lag 256 (mean over analyzed columns)
    pub overall_autocorrelation: f64,
    /// Detection confidence
    pub confidence: f64,
    /// Anomalous columns (high autocorrelation = repeating pattern)
    pub anomalous_columns: Vec<String>,
}

/// Watermark configuration for embedding
#[derive(Debug, Clone)]
pub struct Watermark {
    /// Buyer identifier (hashed with seller secret)
    pub buyer_hash: [u8; 32],
    /// Embedding strength (0.0001 - 0.001 typical)
    /// NOTE(review): the current LSB scheme ignores this value — see
    /// `embed_bit_f32`/`embed_bit_f64`, which accept but never use it.
    pub strength: f32,
    /// Columns to watermark (indices)
    pub column_indices: Vec<usize>,
    /// Redundancy factor (0.0 - 1.0, survives N% row deletion)
    /// NOTE(review): not read anywhere in this module — confirm intent.
    pub redundancy: f32,
}
85
/// Extracted buyer identity
#[derive(Debug, Clone)]
pub struct BuyerIdentity {
    /// Recovered buyer hash
    pub buyer_hash: [u8; 32],
    /// Extraction confidence (0.0 - 1.0; at least
    /// `DETECTION_CONFIDENCE_THRESHOLD`, since weaker results are dropped)
    pub confidence: f64,
}

/// Legal evidence for proceedings
#[derive(Debug, Clone)]
pub struct LegalEvidence {
    /// Dataset hash (see `hash_batches` for exactly what is covered)
    pub dataset_hash: [u8; 32],
    /// Extracted buyer hash
    pub buyer_hash: [u8; 32],
    /// Statistical confidence (0.0-1.0)
    pub confidence: f64,
    /// Timestamp of analysis (RFC 3339)
    pub analyzed_at: String,
    /// Column-level evidence
    pub column_evidence: Vec<ColumnEvidence>,
}

/// Per-column evidence
#[derive(Debug, Clone)]
pub struct ColumnEvidence {
    /// Column name
    pub name: String,
    /// LSB entropy (lower = more likely watermarked)
    pub lsb_entropy: f64,
    /// Chi-square p-value
    pub chi_square_pvalue: f64,
    /// Bits extracted matching buyer
    /// NOTE(review): `generate_evidence` currently always writes 0 here
    /// (determining matches requires the seller key).
    pub matching_bits: usize,
    /// Total bits analyzed (currently always 0 — see above)
    pub total_bits: usize,
}
124
/// Piracy detector for analyzing datasets
///
/// Stateless namespace type: all functionality is exposed through
/// associated functions and needs no seller key.
pub struct PiracyDetector;
127
128impl PiracyDetector {
129    /// Detect if dataset likely contains watermarks (no seller key needed)
130    ///
131    /// Uses statistical analysis of LSB autocorrelation to detect repeating
132    /// patterns. Watermarks repeat every 256 bits, creating detectable
133    /// autocorrelation.
134    pub fn detect_watermark_presence(batches: &[RecordBatch]) -> DetectionResult {
135        let analysis = Self::analyze_entropy(batches);
136
137        // High autocorrelation indicates repeating pattern (watermark)
138        let likely_watermarked = analysis.overall_autocorrelation > AUTOCORRELATION_THRESHOLD;
139
140        DetectionResult {
141            likely_watermarked,
142            confidence: analysis.confidence,
143            suspicious_columns: analysis.anomalous_columns,
144        }
145    }
146
147    /// Perform entropy analysis on numeric columns
148    pub fn analyze_entropy(batches: &[RecordBatch]) -> EntropyAnalysis {
149        if batches.is_empty() {
150            return EntropyAnalysis {
151                columns: vec![],
152                overall_lsb_entropy: 1.0,
153                overall_autocorrelation: 0.0,
154                confidence: 0.0,
155                anomalous_columns: vec![],
156            };
157        }
158
159        let schema = batches[0].schema();
160        let mut column_results = Vec::new();
161        let mut anomalous = Vec::new();
162
163        for (col_idx, field) in schema.fields().iter().enumerate() {
164            // Only analyze float columns (watermarks embedded in LSB)
165            if !is_float_type(field.data_type()) {
166                continue;
167            }
168
169            let lsb_bits = collect_lsb_bits(batches, col_idx);
170            if lsb_bits.is_empty() {
171                continue;
172            }
173
174            let lsb_entropy = shannon_entropy_bits(&lsb_bits);
175            let chi_pvalue = chi_square_uniformity(&lsb_bits);
176            let ks_pvalue = ks_test_uniform(&lsb_bits);
177            let autocorr = autocorrelation_lag_256(&lsb_bits);
178
179            let col_entropy = ColumnEntropy {
180                name: field.name().clone(),
181                shannon_entropy: 0.0, // Not needed for detection
182                lsb_entropy,
183                ks_pvalue,
184                chi_square_pvalue: chi_pvalue,
185                autocorrelation_256: autocorr,
186            };
187
188            // High autocorrelation indicates repeating pattern (watermark)
189            if autocorr > AUTOCORRELATION_THRESHOLD {
190                anomalous.push(field.name().clone());
191            }
192
193            column_results.push(col_entropy);
194        }
195
196        // Calculate overall metrics
197        let overall_lsb = if column_results.is_empty() {
198            1.0
199        } else {
200            column_results.iter().map(|c| c.lsb_entropy).sum::<f64>() / column_results.len() as f64
201        };
202
203        let overall_autocorr = if column_results.is_empty() {
204            0.0
205        } else {
206            column_results
207                .iter()
208                .map(|c| c.autocorrelation_256)
209                .sum::<f64>()
210                / column_results.len() as f64
211        };
212
213        // Confidence based on autocorrelation strength
214        let confidence = if overall_autocorr > 0.9 {
215            0.99
216        } else if overall_autocorr > AUTOCORRELATION_THRESHOLD {
217            (overall_autocorr - AUTOCORRELATION_THRESHOLD).mul_add(0.6, 0.80)
218        } else if overall_autocorr > 0.3 {
219            overall_autocorr.mul_add(0.5, 0.50)
220        } else {
221            overall_autocorr
222        }
223        .clamp(0.0, 1.0);
224
225        EntropyAnalysis {
226            columns: column_results,
227            overall_lsb_entropy: overall_lsb,
228            overall_autocorrelation: overall_autocorr,
229            confidence,
230            anomalous_columns: anomalous,
231        }
232    }
233
234    /// Generate legal evidence package
235    pub fn generate_evidence(
236        batches: &[RecordBatch],
237        buyer_hash: &[u8; 32],
238    ) -> Result<LegalEvidence> {
239        let analysis = Self::analyze_entropy(batches);
240
241        // Compute dataset hash
242        let dataset_hash = hash_batches(batches);
243
244        // Build column evidence
245        let column_evidence: Vec<ColumnEvidence> = analysis
246            .columns
247            .iter()
248            .map(|col| ColumnEvidence {
249                name: col.name.clone(),
250                lsb_entropy: col.lsb_entropy,
251                chi_square_pvalue: col.chi_square_pvalue,
252                matching_bits: 0, // Would need seller key to determine
253                total_bits: 0,
254            })
255            .collect();
256
257        // Get current timestamp
258        let analyzed_at = chrono_lite_now();
259
260        Ok(LegalEvidence {
261            dataset_hash,
262            buyer_hash: *buyer_hash,
263            confidence: analysis.confidence,
264            analyzed_at,
265            column_evidence,
266        })
267    }
268}
269
/// Watermark embedder for protecting datasets
///
/// Holds the seller's 32-byte secret; the same key is required later to
/// extract or verify a buyer watermark.
pub struct WatermarkEmbedder {
    /// Seller secret, XORed with the buyer hash to derive the bit sequence.
    seller_key: [u8; 32],
}
274
275impl WatermarkEmbedder {
276    /// Create a new embedder with seller secret key
277    #[must_use]
278    pub fn new(seller_key: [u8; 32]) -> Self {
279        Self { seller_key }
280    }
281
282    /// Embed watermark into dataset batches
283    ///
284    /// Modifies LSB of float values to encode buyer identity.
285    pub fn embed(
286        &self,
287        batches: &[RecordBatch],
288        watermark: &Watermark,
289    ) -> Result<Vec<RecordBatch>> {
290        let mut result = Vec::with_capacity(batches.len());
291
292        // Generate deterministic bit sequence from buyer hash + seller key
293        let bit_sequence = generate_watermark_bits(&watermark.buyer_hash, &self.seller_key);
294
295        for batch in batches {
296            let modified = Self::embed_batch(batch, watermark, &bit_sequence)?;
297            result.push(modified);
298        }
299
300        Ok(result)
301    }
302
303    fn embed_batch(
304        batch: &RecordBatch,
305        watermark: &Watermark,
306        bits: &[bool],
307    ) -> Result<RecordBatch> {
308        use std::sync::Arc;
309
310        let schema = batch.schema();
311        let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());
312
313        for col_idx in 0..batch.num_columns() {
314            let col = batch.column(col_idx);
315
316            if watermark.column_indices.contains(&col_idx) {
317                // Embed watermark in this column
318                let modified = embed_in_column(col.as_ref(), bits, watermark.strength)?;
319                new_columns.push(modified);
320            } else {
321                new_columns.push(Arc::clone(col));
322            }
323        }
324
325        RecordBatch::try_new(schema, new_columns).map_err(Error::Arrow)
326    }
327
328    /// Extract watermark from dataset (requires seller key)
329    pub fn extract(&self, batches: &[RecordBatch]) -> Option<BuyerIdentity> {
330        if batches.is_empty() {
331            return None;
332        }
333
334        // Collect LSB bits from all float columns
335        let schema = batches[0].schema();
336        let mut all_bits = Vec::new();
337
338        for (col_idx, field) in schema.fields().iter().enumerate() {
339            if is_float_type(field.data_type()) {
340                let bits = collect_lsb_bits(batches, col_idx);
341                all_bits.extend(bits);
342            }
343        }
344
345        if all_bits.len() < 256 {
346            return None; // Not enough data
347        }
348
349        // Try to decode buyer hash using seller key
350        let decoded = decode_watermark_bits(&all_bits, &self.seller_key)?;
351
352        // Calculate confidence based on bit correlation
353        let confidence = calculate_extraction_confidence(&all_bits, &decoded, &self.seller_key);
354
355        if confidence < DETECTION_CONFIDENCE_THRESHOLD {
356            return None;
357        }
358
359        Some(BuyerIdentity {
360            buyer_hash: decoded,
361            confidence,
362        })
363    }
364
365    /// Verify if dataset contains specific buyer's watermark
366    pub fn verify(&self, batches: &[RecordBatch], buyer_hash: &[u8; 32]) -> bool {
367        self.extract(batches)
368            .is_some_and(|id| &id.buyer_hash == buyer_hash)
369    }
370}
371
372// === Helper functions ===
373
374fn is_float_type(dtype: &arrow::datatypes::DataType) -> bool {
375    matches!(
376        dtype,
377        arrow::datatypes::DataType::Float32 | arrow::datatypes::DataType::Float64
378    )
379}
380
381fn collect_lsb_bits(batches: &[RecordBatch], col_idx: usize) -> Vec<bool> {
382    let mut bits = Vec::new();
383
384    for batch in batches {
385        if col_idx >= batch.num_columns() {
386            continue;
387        }
388        collect_column_lsb_bits(batch.column(col_idx), &mut bits);
389    }
390
391    bits
392}
393
394fn collect_column_lsb_bits(col: &dyn arrow::array::Array, bits: &mut Vec<bool>) {
395    if let Some(f32_arr) = col.as_any().downcast_ref::<Float32Array>() {
396        for i in 0..f32_arr.len() {
397            if !f32_arr.is_null(i) {
398                bits.push(f32_arr.value(i).to_bits() & 1 == 1);
399            }
400        }
401    } else if let Some(f64_arr) = col.as_any().downcast_ref::<Float64Array>() {
402        for i in 0..f64_arr.len() {
403            if !f64_arr.is_null(i) {
404                bits.push(f64_arr.value(i).to_bits() & 1 == 1);
405            }
406        }
407    }
408}
409
/// Shannon entropy (in bits) of a boolean sequence; 1.0 for a 50/50 split.
///
/// Empty input is treated as maximally random and returns 1.0.
fn shannon_entropy_bits(bits: &[bool]) -> f64 {
    if bits.is_empty() {
        return 1.0;
    }

    let total = bits.len() as f64;
    let ones = bits.iter().filter(|&&b| b).count();
    let zeros = bits.len() - ones;

    // H = -Σ p·log2(p) over the two symbol probabilities; zero-probability
    // symbols contribute nothing (lim p→0 of p·log2 p is 0).
    [ones, zeros]
        .into_iter()
        .map(|count| count as f64 / total)
        .filter(|&p| p > 0.0)
        .map(|p| -p * p.log2())
        .sum()
}
432
/// Approximate p-value of a chi-square test for 0/1 balance of `bits`.
///
/// Uses the large-value approximation `p ≈ e^(-chi²/2)` for one degree of
/// freedom. Empty input returns 1.0 (no evidence against uniformity).
fn chi_square_uniformity(bits: &[bool]) -> f64 {
    if bits.is_empty() {
        return 1.0;
    }

    // Count once; the previous version filtered the slice twice.
    let ones = bits.iter().filter(|&&b| b).count();
    let zeros = bits.len() - ones;
    let expected = bits.len() as f64 / 2.0;

    let chi_sq = (ones as f64 - expected).powi(2) / expected
        + (zeros as f64 - expected).powi(2) / expected;

    // Approximate p-value for chi-square with 1 df:
    // p ≈ e^(-chi_sq/2) for large values.
    (-chi_sq / 2.0).exp().clamp(0.0, 1.0)
}
448
/// Crude KS-style p-value for the proportion of set bits.
///
/// Measures the deviation of the ones-ratio from the expected 0.5, scales it
/// by sqrt(n), and maps it through the asymptotic tail `e^(-2·ks²)`.
/// Empty input returns 1.0.
fn ks_test_uniform(bits: &[bool]) -> f64 {
    if bits.is_empty() {
        return 1.0;
    }

    let n = bits.len() as f64;
    let set = bits.iter().filter(|&&b| b).count() as f64;

    // Max deviation of the observed ones-ratio from the ideal 0.5.
    let deviation = (set / n - 0.5).abs();

    // KS statistic.
    let statistic = deviation * n.sqrt();

    // Approximate p-value from the asymptotic distribution tail.
    (-2.0 * statistic.powi(2)).exp().clamp(0.0, 1.0)
}
467
/// Compute autocorrelation at lag 256 (watermark period)
///
/// Watermarks repeat every 256 bits, so watermarked data will have
/// high correlation between bits that are 256 apart.
fn autocorrelation_lag_256(bits: &[bool]) -> f64 {
    const LAG: usize = 256;

    // Need at least two full periods for a meaningful estimate.
    if bits.len() < 2 * LAG {
        return 0.0;
    }

    // Map bits to a ±1 signal so correlation math behaves as expected.
    let signal: Vec<f64> = bits.iter().map(|&b| if b { 1.0 } else { -1.0 }).collect();
    let len = signal.len() as f64;

    let mean = signal.iter().sum::<f64>() / len;
    let variance = signal.iter().map(|&v| (v - mean).powi(2)).sum::<f64>() / len;

    // A (near-)constant signal has no defined correlation; report none.
    if variance < 1e-10 {
        return 0.0;
    }

    // Normalized covariance between the signal and itself shifted by LAG.
    let pairs = signal.len() - LAG;
    let covariance: f64 = signal
        .iter()
        .zip(signal.iter().skip(LAG))
        .map(|(&a, &b)| (a - mean) * (b - mean))
        .sum();

    (covariance / (pairs as f64 * variance)).clamp(-1.0, 1.0)
}
501
502fn hash_batches(batches: &[RecordBatch]) -> [u8; 32] {
503    use std::{
504        collections::hash_map::DefaultHasher,
505        hash::{Hash, Hasher},
506    };
507
508    let mut hasher = DefaultHasher::new();
509
510    for batch in batches {
511        batch.num_rows().hash(&mut hasher);
512        batch.num_columns().hash(&mut hasher);
513
514        for col_idx in 0..batch.num_columns() {
515            let col = batch.column(col_idx);
516            col.len().hash(&mut hasher);
517        }
518    }
519
520    let hash64 = hasher.finish();
521
522    // Expand to 32 bytes by hashing again
523    let mut result = [0u8; 32];
524    result[..8].copy_from_slice(&hash64.to_le_bytes());
525    result[8..16].copy_from_slice(&hash64.to_be_bytes());
526    result[16..24].copy_from_slice(&(!hash64).to_le_bytes());
527    result[24..32].copy_from_slice(&hash64.rotate_left(32).to_le_bytes());
528
529    result
530}
531
/// Current UTC time as an RFC 3339 string (e.g. `2024-01-31T12:00:00Z`),
/// without a chrono dependency.
///
/// The previous version returned raw Unix seconds, contradicting the
/// "RFC 3339" contract documented on `LegalEvidence::analyzed_at`.
fn chrono_lite_now() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    format_rfc3339_utc(secs)
}

/// Format seconds since the Unix epoch as an RFC 3339 UTC timestamp.
///
/// Date conversion follows the days-to-civil algorithm from Howard Hinnant's
/// "chrono-Compatible Low-Level Date Algorithms" (era of 146097 days).
fn format_rfc3339_utc(unix_secs: u64) -> String {
    let days = (unix_secs / 86_400) as i64;
    let rem = unix_secs % 86_400;
    let (hour, min, sec) = (rem / 3600, (rem % 3600) / 60, rem % 60);

    // civil_from_days: shift so the era starts on 0000-03-01.
    let z = days + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day of era [0, 146096]
    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
    let mut year = yoe + era * 400;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, March-based
    let mp = (5 * doy + 2) / 153;
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    if month <= 2 {
        year += 1; // Jan/Feb belong to the next civil year
    }

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year, month, day, hour, min, sec
    )
}
541
/// Derive the 256-bit watermark sequence from buyer hash XOR seller key.
///
/// Each combined byte is emitted LSB-first, giving a deterministic
/// 256-element sequence shared by embedding, decoding and verification.
fn generate_watermark_bits(buyer_hash: &[u8; 32], seller_key: &[u8; 32]) -> Vec<bool> {
    buyer_hash
        .iter()
        .zip(seller_key.iter())
        .flat_map(|(buyer, seller)| {
            let mixed = buyer ^ seller;
            (0..8).map(move |pos| (mixed >> pos) & 1 == 1)
        })
        .collect()
}
559
560#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
561fn embed_in_column(
562    col: &dyn Array,
563    bits: &[bool],
564    strength: f32,
565) -> Result<std::sync::Arc<dyn Array>> {
566    use std::sync::Arc;
567
568    if let Some(f32_arr) = col.as_any().downcast_ref::<Float32Array>() {
569        let mut values: Vec<f32> = Vec::with_capacity(f32_arr.len());
570
571        for (i, val) in f32_arr.iter().enumerate() {
572            if let Some(v) = val {
573                let bit_idx = i % bits.len();
574                let modified = embed_bit_f32(v, bits[bit_idx], strength);
575                values.push(modified);
576            } else {
577                values.push(f32::NAN);
578            }
579        }
580
581        Ok(Arc::new(Float32Array::from(values)))
582    } else if let Some(f64_arr) = col.as_any().downcast_ref::<Float64Array>() {
583        let mut values: Vec<f64> = Vec::with_capacity(f64_arr.len());
584
585        for (i, val) in f64_arr.iter().enumerate() {
586            if let Some(v) = val {
587                let bit_idx = i % bits.len();
588                let modified = embed_bit_f64(v, bits[bit_idx], f64::from(strength));
589                values.push(modified);
590            } else {
591                values.push(f64::NAN);
592            }
593        }
594
595        Ok(Arc::new(Float64Array::from(values)))
596    } else {
597        Err(Error::Format("Column is not a float type".to_string()))
598    }
599}
600
/// Force the least-significant mantissa bit of `value` to `bit`.
///
/// `_strength` is accepted for signature compatibility but unused: an LSB
/// overwrite is already the minimal possible perturbation.
fn embed_bit_f32(value: f32, bit: bool, _strength: f32) -> f32 {
    let raw = value.to_bits();
    let stamped = if bit { raw | 1 } else { raw & !1 };
    f32::from_bits(stamped)
}
610
/// Force the least-significant mantissa bit of `value` to `bit` (f64 variant).
///
/// `_strength` is accepted for signature compatibility but unused.
fn embed_bit_f64(value: f64, bit: bool, _strength: f64) -> f64 {
    let raw = value.to_bits();
    let stamped = if bit { raw | 1 } else { raw & !1 };
    f64::from_bits(stamped)
}
620
/// Recover the buyer hash from the first 256 LSB bits using the seller key.
///
/// Packs the bits LSB-first into 32 bytes, then XORs with the seller key —
/// the inverse of `generate_watermark_bits`. Returns `None` when fewer than
/// 256 bits are available.
fn decode_watermark_bits(bits: &[bool], seller_key: &[u8; 32]) -> Option<[u8; 32]> {
    if bits.len() < 256 {
        return None;
    }

    let mut buyer_hash = [0u8; 32];

    // Zipping against the 32-byte key caps the iteration at 32 chunks.
    for (slot, (chunk, key)) in bits.chunks(8).zip(seller_key.iter()).enumerate() {
        let packed = chunk
            .iter()
            .enumerate()
            .fold(0u8, |acc, (pos, &bit)| acc | (u8::from(bit) << pos));
        buyer_hash[slot] = packed ^ key;
    }

    Some(buyer_hash)
}
646
647fn calculate_extraction_confidence(
648    observed_bits: &[bool],
649    decoded_buyer: &[u8; 32],
650    seller_key: &[u8; 32],
651) -> f64 {
652    // Generate expected bit sequence
653    let expected_bits = generate_watermark_bits(decoded_buyer, seller_key);
654
655    if observed_bits.len() < expected_bits.len() {
656        return 0.0;
657    }
658
659    // Count matching bits
660    let matches = observed_bits
661        .iter()
662        .zip(expected_bits.iter().cycle())
663        .filter(|(a, b)| a == b)
664        .count();
665
666    let total = observed_bits.len().min(expected_bits.len() * 4); // Check multiple cycles
667    let match_ratio = matches as f64 / total as f64;
668
669    // Convert to confidence (0.5 = random, 1.0 = perfect match)
670    ((match_ratio - 0.5) * 2.0).clamp(0.0, 1.0)
671}
672
673#[cfg(test)]
674mod tests {
675    use std::sync::Arc;
676
677    use arrow::datatypes::{DataType, Field, Schema};
678
679    use super::*;
680
681    fn create_test_batch_with_size(size: usize) -> RecordBatch {
682        let schema = Arc::new(Schema::new(vec![
683            Field::new("price", DataType::Float64, false),
684            Field::new("quantity", DataType::Float64, false),
685        ]));
686
687        // Use simple LCG to generate pseudo-random floats with varied LSBs
688        // This simulates real-world data where LSBs are essentially random
689        let mut seed: u64 = 12345;
690        let prices: Vec<f64> = (0..size)
691            .map(|_| {
692                seed = seed.wrapping_mul(6364136223846793005).wrapping_add(1);
693                let mantissa = (seed >> 11) as f64 / (1u64 << 53) as f64;
694                10.0 + mantissa * 100.0
695            })
696            .collect();
697
698        seed = 67890;
699        let quantities: Vec<f64> = (0..size)
700            .map(|_| {
701                seed = seed.wrapping_mul(6364136223846793005).wrapping_add(1);
702                let mantissa = (seed >> 11) as f64 / (1u64 << 53) as f64;
703                1.0 + mantissa * 50.0
704            })
705            .collect();
706
707        RecordBatch::try_new(
708            schema,
709            vec![
710                Arc::new(Float64Array::from(prices)),
711                Arc::new(Float64Array::from(quantities)),
712            ],
713        )
714        .expect("create batch")
715    }
716
717    #[test]
718    fn test_entropy_analysis_clean_data() {
719        // Need enough data for autocorrelation at lag 256
720        let batch = create_test_batch_with_size(1000);
721        let analysis = PiracyDetector::analyze_entropy(&[batch]);
722
723        // Clean data should have LOW autocorrelation (no repeating pattern)
724        assert!(
725            analysis.overall_autocorrelation < AUTOCORRELATION_THRESHOLD,
726            "Clean data autocorrelation should be low: {}",
727            analysis.overall_autocorrelation
728        );
729
730        // Clean data should have high entropy (random LSBs)
731        assert!(
732            analysis.overall_lsb_entropy > 0.9,
733            "Clean data LSB entropy should be high: {}",
734            analysis.overall_lsb_entropy
735        );
736    }
737
738    #[test]
739    fn test_watermark_embed_extract() {
740        // Need enough data for autocorrelation detection
741        let batch = create_test_batch_with_size(1000);
742        let seller_key = [42u8; 32];
743        let buyer_hash = [7u8; 32];
744
745        let embedder = WatermarkEmbedder::new(seller_key);
746
747        let watermark = Watermark {
748            buyer_hash,
749            strength: 0.001,
750            column_indices: vec![0, 1],
751            redundancy: 0.5,
752        };
753
754        // Embed watermark
755        let watermarked = embedder.embed(&[batch], &watermark).expect("embed failed");
756
757        // Watermarked data should have HIGH autocorrelation (repeating pattern)
758        let analysis = PiracyDetector::analyze_entropy(&watermarked);
759        assert!(
760            analysis.overall_autocorrelation > AUTOCORRELATION_THRESHOLD,
761            "Watermarked data autocorrelation should be high: {}",
762            analysis.overall_autocorrelation
763        );
764
765        // Extract watermark
766        let extracted = embedder.extract(&watermarked);
767        assert!(extracted.is_some(), "Should extract watermark");
768
769        let identity = extracted.expect("identity");
770        assert_eq!(identity.buyer_hash, buyer_hash);
771    }
772
773    #[test]
774    fn test_detection_without_key() {
775        // Need enough data for autocorrelation detection
776        let batch = create_test_batch_with_size(1000);
777        let seller_key = [42u8; 32];
778        let buyer_hash = [7u8; 32];
779
780        let embedder = WatermarkEmbedder::new(seller_key);
781
782        let watermark = Watermark {
783            buyer_hash,
784            strength: 0.001,
785            column_indices: vec![0, 1],
786            redundancy: 0.5,
787        };
788
789        let watermarked = embedder.embed(&[batch], &watermark).expect("embed failed");
790
791        // Detection should work without seller key (based on autocorrelation)
792        let detection = PiracyDetector::detect_watermark_presence(&watermarked);
793        assert!(
794            detection.likely_watermarked,
795            "Should detect watermark presence"
796        );
797        assert!(
798            detection.confidence > 0.5,
799            "Confidence: {}",
800            detection.confidence
801        );
802    }
803
804    #[test]
805    fn test_verify_buyer() {
806        // Need enough data for watermark extraction
807        let batch = create_test_batch_with_size(1000);
808        let seller_key = [42u8; 32];
809        let buyer_hash = [7u8; 32];
810        let wrong_buyer = [99u8; 32];
811
812        let embedder = WatermarkEmbedder::new(seller_key);
813
814        let watermark = Watermark {
815            buyer_hash,
816            strength: 0.001,
817            column_indices: vec![0, 1],
818            redundancy: 0.5,
819        };
820
821        let watermarked = embedder.embed(&[batch], &watermark).expect("embed failed");
822
823        assert!(
824            embedder.verify(&watermarked, &buyer_hash),
825            "Should verify correct buyer"
826        );
827        assert!(
828            !embedder.verify(&watermarked, &wrong_buyer),
829            "Should reject wrong buyer"
830        );
831    }
832
833    #[test]
834    fn test_shannon_entropy() {
835        // Uniform distribution should have entropy ~1.0
836        let uniform: Vec<bool> = (0..1000).map(|i| i % 2 == 0).collect();
837        let entropy = shannon_entropy_bits(&uniform);
838        assert!((entropy - 1.0).abs() < 0.01, "Uniform entropy: {}", entropy);
839
840        // All zeros should have entropy 0
841        let zeros = vec![false; 1000];
842        let entropy = shannon_entropy_bits(&zeros);
843        assert!(entropy < 0.01, "Zero entropy: {}", entropy);
844    }
845
846    #[test]
847    fn test_generate_evidence() {
848        let batch = create_test_batch_with_size(1000);
849        let buyer_hash = [7u8; 32];
850
851        let evidence =
852            PiracyDetector::generate_evidence(&[batch], &buyer_hash).expect("generate failed");
853
854        assert_eq!(evidence.buyer_hash, buyer_hash);
855        assert!(!evidence.column_evidence.is_empty());
856    }
857
858    #[test]
859    fn test_autocorrelation_detection() {
860        // Test that autocorrelation correctly distinguishes clean vs watermarked data
861        let clean_batch = create_test_batch_with_size(1000);
862        let seller_key = [42u8; 32];
863        let buyer_hash = [7u8; 32];
864
865        let embedder = WatermarkEmbedder::new(seller_key);
866        let watermark = Watermark {
867            buyer_hash,
868            strength: 0.001,
869            column_indices: vec![0, 1],
870            redundancy: 0.5,
871        };
872
873        let watermarked = embedder
874            .embed(&[clean_batch.clone()], &watermark)
875            .expect("embed");
876
877        // Analyze both
878        let clean_analysis = PiracyDetector::analyze_entropy(&[clean_batch]);
879        let watermarked_analysis = PiracyDetector::analyze_entropy(&watermarked);
880
881        // Clean should have low autocorrelation
882        assert!(
883            clean_analysis.overall_autocorrelation < 0.3,
884            "Clean autocorr: {}",
885            clean_analysis.overall_autocorrelation
886        );
887
888        // Watermarked should have high autocorrelation
889        assert!(
890            watermarked_analysis.overall_autocorrelation > 0.9,
891            "Watermarked autocorr: {}",
892            watermarked_analysis.overall_autocorrelation
893        );
894    }
895
896    #[test]
897    fn test_detection_result_default() {
898        let result = DetectionResult {
899            likely_watermarked: false,
900            confidence: 0.0,
901            suspicious_columns: Vec::new(),
902        };
903        let debug = format!("{:?}", result);
904        assert!(debug.contains("DetectionResult"));
905    }
906
907    #[test]
908    fn test_column_entropy_debug() {
909        let entropy = ColumnEntropy {
910            name: "col".to_string(),
911            shannon_entropy: 7.5,
912            lsb_entropy: 0.99,
913            ks_pvalue: 0.5,
914            chi_square_pvalue: 0.5,
915            autocorrelation_256: 0.1,
916        };
917        let debug = format!("{:?}", entropy);
918        assert!(debug.contains("ColumnEntropy"));
919        assert!(debug.contains("col"));
920    }
921
922    #[test]
923    fn test_watermark_clone() {
924        let watermark = Watermark {
925            buyer_hash: [1u8; 32],
926            strength: 0.001,
927            column_indices: vec![0],
928            redundancy: 0.5,
929        };
930        let cloned = watermark.clone();
931        assert_eq!(cloned.buyer_hash, watermark.buyer_hash);
932        assert_eq!(cloned.strength, watermark.strength);
933    }
934
935    #[test]
936    fn test_entropy_analysis_clone() {
937        let analysis = EntropyAnalysis {
938            columns: Vec::new(),
939            overall_lsb_entropy: 0.99,
940            overall_autocorrelation: 0.1,
941            confidence: 0.0,
942            anomalous_columns: Vec::new(),
943        };
944        let cloned = analysis.clone();
945        assert_eq!(cloned.overall_lsb_entropy, 0.99);
946    }
947
948    #[test]
949    fn test_entropy_analysis_empty_batches() {
950        let analysis = PiracyDetector::analyze_entropy(&[]);
951        assert_eq!(analysis.overall_lsb_entropy, 1.0);
952        assert_eq!(analysis.overall_autocorrelation, 0.0);
953        assert_eq!(analysis.confidence, 0.0);
954        assert!(analysis.columns.is_empty());
955    }
956
957    #[test]
958    fn test_is_float_type() {
959        assert!(is_float_type(&DataType::Float32));
960        assert!(is_float_type(&DataType::Float64));
961        assert!(!is_float_type(&DataType::Int32));
962        assert!(!is_float_type(&DataType::Utf8));
963    }
964
965    #[test]
966    fn test_chi_square_empty() {
967        let bits: Vec<bool> = vec![];
968        assert_eq!(chi_square_uniformity(&bits), 1.0);
969    }
970
971    #[test]
972    fn test_ks_test_empty() {
973        let bits: Vec<bool> = vec![];
974        assert_eq!(ks_test_uniform(&bits), 1.0);
975    }
976
977    #[test]
978    fn test_shannon_entropy_empty() {
979        let bits: Vec<bool> = vec![];
980        assert_eq!(shannon_entropy_bits(&bits), 1.0);
981    }
982
983    #[test]
984    fn test_autocorrelation_short_data() {
985        // Data shorter than 2*LAG should return 0
986        let bits: Vec<bool> = (0..500).map(|i| i % 2 == 0).collect();
987        let autocorr = autocorrelation_lag_256(&bits);
988        assert_eq!(autocorr, 0.0);
989    }
990
991    #[test]
992    fn test_hash_batches() {
993        let batch = create_test_batch_with_size(100);
994        let hash1 = hash_batches(&[batch.clone()]);
995        let hash2 = hash_batches(&[batch]);
996        assert_eq!(hash1, hash2); // Same data should produce same hash
997    }
998
999    #[test]
1000    fn test_generate_watermark_bits() {
1001        let buyer_hash = [1u8; 32];
1002        let seller_key = [2u8; 32];
1003        let bits = generate_watermark_bits(&buyer_hash, &seller_key);
1004        assert_eq!(bits.len(), 256);
1005    }
1006
1007    #[test]
1008    fn test_decode_watermark_bits_short() {
1009        let bits: Vec<bool> = vec![true; 100]; // Too short
1010        let result = decode_watermark_bits(&bits, &[0u8; 32]);
1011        assert!(result.is_none());
1012    }
1013
1014    #[test]
1015    fn test_decode_watermark_bits_roundtrip() {
1016        let buyer_hash = [42u8; 32];
1017        let seller_key = [99u8; 32];
1018        let bits = generate_watermark_bits(&buyer_hash, &seller_key);
1019
1020        let decoded = decode_watermark_bits(&bits, &seller_key);
1021        assert!(decoded.is_some());
1022        assert_eq!(decoded.unwrap(), buyer_hash);
1023    }
1024
1025    #[test]
1026    fn test_embed_bit_f32() {
1027        let val = 1.0f32;
1028        let embedded_1 = embed_bit_f32(val, true, 0.001);
1029        let embedded_0 = embed_bit_f32(val, false, 0.001);
1030
1031        // LSB should be set correctly
1032        assert_eq!(embedded_1.to_bits() & 1, 1);
1033        assert_eq!(embedded_0.to_bits() & 1, 0);
1034    }
1035
1036    #[test]
1037    fn test_embed_bit_f64() {
1038        let val = 1.0f64;
1039        let embedded_1 = embed_bit_f64(val, true, 0.001);
1040        let embedded_0 = embed_bit_f64(val, false, 0.001);
1041
1042        // LSB should be set correctly
1043        assert_eq!(embedded_1.to_bits() & 1, 1);
1044        assert_eq!(embedded_0.to_bits() & 1, 0);
1045    }
1046
1047    #[test]
1048    fn test_extraction_confidence_short_data() {
1049        let observed_bits: Vec<bool> = vec![true; 100]; // Too short
1050        let decoded = [0u8; 32];
1051        let seller_key = [0u8; 32];
1052        let confidence = calculate_extraction_confidence(&observed_bits, &decoded, &seller_key);
1053        assert_eq!(confidence, 0.0);
1054    }
1055
1056    #[test]
1057    fn test_extract_empty_batches() {
1058        let embedder = WatermarkEmbedder::new([0u8; 32]);
1059        let result = embedder.extract(&[]);
1060        assert!(result.is_none());
1061    }
1062
1063    #[test]
1064    fn test_collect_lsb_bits_f32() {
1065        let schema = Arc::new(Schema::new(vec![Field::new(
1066            "value",
1067            DataType::Float32,
1068            false,
1069        )]));
1070        let values: Vec<f32> = (0..100).map(|i| i as f32).collect();
1071        let batch =
1072            RecordBatch::try_new(schema, vec![Arc::new(Float32Array::from(values))]).unwrap();
1073
1074        let bits = collect_lsb_bits(&[batch], 0);
1075        assert_eq!(bits.len(), 100);
1076    }
1077
1078    #[test]
1079    fn test_collect_lsb_bits_column_out_of_range() {
1080        let batch = create_test_batch_with_size(10);
1081        let bits = collect_lsb_bits(&[batch], 999); // Out of range
1082        assert!(bits.is_empty());
1083    }
1084
1085    #[test]
1086    fn test_legal_evidence_clone() {
1087        let evidence = LegalEvidence {
1088            dataset_hash: [0u8; 32],
1089            buyer_hash: [1u8; 32],
1090            confidence: 0.95,
1091            analyzed_at: "2024-01-01".to_string(),
1092            column_evidence: vec![],
1093        };
1094        let cloned = evidence.clone();
1095        assert_eq!(cloned.confidence, 0.95);
1096    }
1097
1098    #[test]
1099    fn test_buyer_identity_clone() {
1100        let identity = BuyerIdentity {
1101            buyer_hash: [42u8; 32],
1102            confidence: 0.9,
1103        };
1104        let cloned = identity.clone();
1105        assert_eq!(cloned.buyer_hash, identity.buyer_hash);
1106    }
1107
1108    #[test]
1109    fn test_column_evidence_clone() {
1110        let evidence = ColumnEvidence {
1111            name: "test".to_string(),
1112            lsb_entropy: 0.98,
1113            chi_square_pvalue: 0.5,
1114            matching_bits: 100,
1115            total_bits: 200,
1116        };
1117        let cloned = evidence.clone();
1118        assert_eq!(cloned.name, "test");
1119    }
1120
1121    #[test]
1122    fn test_detection_result_clone() {
1123        let result = DetectionResult {
1124            likely_watermarked: true,
1125            confidence: 0.9,
1126            suspicious_columns: vec!["col1".to_string()],
1127        };
1128        let cloned = result.clone();
1129        assert!(cloned.likely_watermarked);
1130    }
1131
1132    #[test]
1133    fn test_chrono_lite_now() {
1134        let timestamp = chrono_lite_now();
1135        // Should be a parseable number (seconds since epoch)
1136        assert!(!timestamp.is_empty());
1137        let _: u64 = timestamp.parse().expect("Should be a number");
1138    }
1139
1140    #[test]
1141    fn test_confidence_calculation_ranges() {
1142        // Test various autocorrelation ranges for confidence
1143        let _analysis_high = EntropyAnalysis {
1144            columns: vec![],
1145            overall_lsb_entropy: 0.9,
1146            overall_autocorrelation: 0.95, // Very high
1147            confidence: 0.0,
1148            anomalous_columns: vec![],
1149        };
1150
1151        // Confidence should be high for high autocorrelation
1152        let analysis = PiracyDetector::analyze_entropy(&[]);
1153        assert!(analysis.confidence >= 0.0 && analysis.confidence <= 1.0);
1154    }
1155}