// lcpfs 2026.1.102
//
// LCP File System - A ZFS-inspired copy-on-write filesystem for Rust
// Copyright 2025 LunaOS Contributors
// SPDX-License-Identifier: Apache-2.0

//! Data Lineage Tracking for LCPFS.
//!
//! This module provides tracking of data provenance and lineage,
//! recording where data came from and what transformations were applied.
//!
//! # Features
//!
//! - **Lineage Graph**: Directed acyclic graph of data relationships
//! - **Relationship Types**: Copy, Derived, Merged, Import, etc.
//! - **Graph Traversal**: Find ancestors and descendants
//! - **Search**: Query nodes by dataset, path, creator, etc.
//! - **DOT Export**: Visualize with GraphViz
//!
//! # Example
//!
//! ```ignore
//! use lcpfs::lineage::{record_create, record_derivation, get_ancestors};
//!
//! // Record initial file creation
//! let source_id = record_create("pool/data", 100, "/raw/input.csv",
//!     checksum, "importer", timestamp)?;
//!
//! // Record derived file
//! let output_id = record_create("pool/data", 101, "/processed/output.csv",
//!     new_checksum, "etl_job", timestamp)?;
//!
//! // Link the derivation
//! record_derivation(&[source_id], output_id, "csv_transform", timestamp)?;
//!
//! // Query ancestors
//! let ancestors = get_ancestors(output_id, 10)?;
//! println!("Lineage graph:\n{}", ancestors.to_dot());
//! ```
//!
//! # Relationship Types
//!
//! - **Copy**: Direct copy with no modifications
//! - **Derived**: Transformed from source(s)
//! - **Merged**: Combined from multiple sources
//! - **Import**: Imported from external source
//! - **Updated**: New version of existing object
//! - **Snapshot**: Point-in-time snapshot
//! - **Clone**: Clone (similar to snapshot)
//! - **Renamed**: Renamed file
//!
//! # Use Cases
//!
//! - **Compliance**: Track data for regulatory requirements
//! - **Debugging**: Understand data flow issues
//! - **Impact Analysis**: See what's affected by changes
//! - **Auditing**: Record all data operations

pub mod store;
pub mod types;

// Re-exports
pub use store::{
    LineageStats, add_edge, add_node, clear_all, delete_node, get_ancestors, get_by_object,
    get_by_path, get_descendants, get_edges_from, get_edges_to, get_latest_version, get_node,
    get_stats, record_copy, record_create, record_derivation, record_import, record_merge,
    record_update, search, update_node,
};
pub use types::{
    LineageEdge, LineageError, LineageGraph, LineageNode, LineageQuery, LineageRelation,
    LineageResult,
};

// ═══════════════════════════════════════════════════════════════════════════════
// INTEGRATION TESTS
// ═══════════════════════════════════════════════════════════════════════════════

#[cfg(test)]
mod tests {
    use super::*;

    /// Sanity check that the re-exported types are reachable through this
    /// module's public surface.
    #[test]
    fn test_exports_accessible() {
        // Verify all re-exports are accessible
        let _ = LineageRelation::Copy;
        let _ = LineageQuery::new();
    }

    /// End-to-end workflow: create -> derive -> derive -> copy, then walk the
    /// graph in both directions and export it to DOT.
    #[test]
    fn test_complete_lineage_workflow() {
        // Use unique object IDs to avoid conflicts with other tests
        // (the lineage store is shared, global state within the test binary).
        let base_obj = 10000u64;

        // Record the original (root) file.
        let original = record_create(
            "workflow_test",
            base_obj,
            "/raw/data.csv",
            [0x1234, 0x5678, 0, 0],
            "data_import",
            1000,
        )
        .unwrap();

        // First processing step, derived from the original.
        let step1 = record_create(
            "workflow_test",
            base_obj + 1,
            "/processed/step1.csv",
            [0xabcd, 0xef01, 0, 0],
            "etl_job",
            2000,
        )
        .unwrap();
        record_derivation(&[original], step1, "filter_nulls", 2000).unwrap();

        // Second processing step, derived from step1.
        let step2 = record_create(
            "workflow_test",
            base_obj + 2,
            "/processed/step2.csv",
            [0xfeed, 0xface, 0, 0],
            "etl_job",
            3000,
        )
        .unwrap();
        record_derivation(&[step1], step2, "aggregate", 3000).unwrap();

        // A verbatim copy of step2 into a separate backup dataset
        // (same checksum, different dataset/path).
        let backup = record_create(
            "workflow_backup",
            base_obj + 100,
            "/backup/step2_backup.csv",
            [0xfeed, 0xface, 0, 0],
            "backup_job",
            4000,
        )
        .unwrap();
        record_copy(step2, backup, 4000).unwrap();

        // Ancestors of step2: the chain original -> step1 -> step2.
        let ancestors = get_ancestors(step2, 10).unwrap();
        assert_eq!(ancestors.node_count(), 3); // original, step1, step2
        assert_eq!(ancestors.edge_count(), 2);

        // Descendants of original include the copy as well.
        let descendants = get_descendants(original, 10).unwrap();
        assert_eq!(descendants.node_count(), 4); // original, step1, step2, backup

        // DOT export should produce a GraphViz digraph.
        let dot = ancestors.to_dot();
        assert!(dot.contains("digraph"));
        // Labels are transform names ("filter_nulls", "aggregate"), not relation type
        assert!(dot.contains("filter_nulls") || dot.contains("aggregate"));
    }

    /// Merging two sources into one output records one Merged edge per source.
    #[test]
    fn test_merge_lineage() {
        // Use unique object IDs
        let base = 20000u64;

        // Create two source files
        let src1 =
            record_create("merge_ds", base, "/source1.csv", [1, 0, 0, 0], "user", 0).unwrap();
        let src2 = record_create(
            "merge_ds",
            base + 1,
            "/source2.csv",
            [2, 0, 0, 0],
            "user",
            0,
        )
        .unwrap();

        // Create merged output
        let merged = record_create(
            "merge_ds",
            base + 2,
            "/merged.csv",
            [3, 0, 0, 0],
            "user",
            100,
        )
        .unwrap();
        record_merge(&[src1, src2], merged, 100).unwrap();

        // Both incoming edges must exist and carry the Merged relation.
        let edges = get_edges_to(merged);
        assert_eq!(edges.len(), 2);
        assert!(edges.iter().all(|e| e.relation == LineageRelation::Merged));
    }

    /// Importing from an external URI records a single Import edge.
    #[test]
    fn test_import_lineage() {
        // Use unique object ID
        let obj = 30000u64;

        // Create file from external source
        let file = record_create(
            "import_ds",
            obj,
            "/imported.csv",
            [0; 4],
            "import_job",
            1000,
        )
        .unwrap();
        record_import(file, "s3://bucket/external/file.csv", 1000).unwrap();

        // Should have an edge from external source
        let edges = get_edges_to(file);
        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0].relation, LineageRelation::Import);
    }

    /// Recording an update creates a new version; get_latest_version must
    /// resolve to the most recent one.
    #[test]
    fn test_version_lineage() {
        // Use unique object ID
        let obj = 40000u64;

        // Create version 1
        let v1 = record_create("version_ds", obj, "/file.txt", [1, 0, 0, 0], "user", 1000).unwrap();

        // Create version 2 manually via add_node, since record_create would
        // start at version 1. The node id is assigned by the store on insert.
        let mut v2_node = LineageNode::new(
            0, // Will be assigned
            obj,
            2,
            "/file.txt",
            [2, 0, 0, 0],
            2000,
            "user",
            "version_ds",
        );
        v2_node.id = 1000; // Temporary
        let v2 = add_node(v2_node.clone()).unwrap();

        // Write the store-assigned id back into the node.
        let mut final_node = v2_node;
        final_node.id = v2;
        update_node(final_node).unwrap();

        record_update(v1, v2, 2000).unwrap();

        // Get latest version
        let latest = get_latest_version(obj).unwrap();
        assert_eq!(latest.version, 2);
    }

    /// created_between is a half-open/inclusive time filter (exact semantics
    /// defined by the store); only the middle node falls inside the window.
    #[test]
    fn test_search_by_time_range() {
        // Use unique object IDs and timestamps
        let base = 50000u64;
        let ts_base = 50000000u64;

        record_create(
            "time_ds",
            base,
            "/time_a",
            [0; 4],
            "time_user",
            ts_base + 1000,
        )
        .unwrap();
        record_create(
            "time_ds",
            base + 1,
            "/time_b",
            [0; 4],
            "time_user",
            ts_base + 2000,
        )
        .unwrap();
        record_create(
            "time_ds",
            base + 2,
            "/time_c",
            [0; 4],
            "time_user",
            ts_base + 3000,
        )
        .unwrap();

        // Only /time_b (ts_base + 2000) lies within [+1500, +2500].
        let query = LineageQuery::new().created_between(ts_base + 1500, ts_base + 2500);
        let results = search(&query);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].path, "/time_b");
    }

    /// limit/offset pagination returns exactly `limit` results when enough
    /// nodes match the dataset filter.
    #[test]
    fn test_search_with_pagination() {
        // Use unique dataset name and object IDs
        let base = 60000u64;

        for i in 0..10 {
            record_create(
                "pagination_ds",
                base + i,
                &alloc::format!("/paged_file{}", i),
                [0; 4],
                "paged_user",
                i,
            )
            .unwrap();
        }

        let query = LineageQuery::new()
            .dataset("pagination_ds")
            .limit(3)
            .offset(2);
        let results = search(&query);
        assert_eq!(results.len(), 3);
    }

    /// Global statistics include everything recorded so far, so assertions
    /// are lower bounds rather than exact counts.
    #[test]
    fn test_statistics() {
        // Use unique dataset/creator names and object IDs
        let base = 70000u64;

        let n1 = record_create("stats_ds1", base, "/stats_a", [0; 4], "stats_user1", 0).unwrap();
        let n2 =
            record_create("stats_ds1", base + 1, "/stats_b", [0; 4], "stats_user2", 0).unwrap();
        record_create("stats_ds2", base + 2, "/stats_c", [0; 4], "stats_user1", 0).unwrap();

        // Fail loudly if the copy edge cannot be recorded. Previously the
        // Result was silently discarded (`let _ = ...`), which could mask a
        // store regression while the lower-bound assertions still passed.
        record_copy(n1, n2, 0).unwrap();

        let stats = get_stats();
        // Stats are global, so we just verify they're >= what we added
        assert!(stats.total_nodes >= 3);
        assert!(stats.datasets >= 2);
        assert!(stats.creators >= 2);
    }

    /// DOT export contains the digraph header, both node paths, and the
    /// transform name used as the edge label.
    #[test]
    fn test_graph_dot_export() {
        // Use unique object IDs
        let base = 80000u64;

        let n1 = record_create("dot_ds", base, "/dot_input.csv", [0; 4], "dot_etl", 0).unwrap();
        let n2 = record_create(
            "dot_ds",
            base + 1,
            "/dot_output.csv",
            [0; 4],
            "dot_etl",
            100,
        )
        .unwrap();
        record_derivation(&[n1], n2, "dot_transform", 100).unwrap();

        let graph = get_descendants(n1, 10).unwrap();
        let dot = graph.to_dot();

        assert!(dot.contains("digraph lineage"));
        assert!(dot.contains("dot_input.csv"));
        assert!(dot.contains("dot_output.csv"));
        assert!(dot.contains("dot_transform"));
    }
}