pub mod core;
pub mod tracker;
pub use core::{
DataSchema, DataVersion, Operation, OperationType, VersionDiff, VersionId, VersioningError,
};
pub use tracker::{LineageConfig, LineageTracker, SharedLineageTracker, TrackerStats};
use crate::DataFrame;
use std::collections::HashMap;
/// Versioning hooks for a data container: describe it as a [`DataSchema`] and
/// register that description with a [`LineageTracker`].
///
/// This file implements the trait for `DataFrame`.
pub trait DataFrameVersioning {
/// Returns a [`DataSchema`] describing the current state of `self`.
fn to_schema(&self) -> DataSchema;
/// Creates a version for `self` through `tracker` and returns its [`VersionId`].
///
/// The `DataFrame` implementation in this file builds the version from
/// `to_schema()` and passes it to `LineageTracker::register_version`.
fn create_version(&self, tracker: &mut LineageTracker) -> VersionId;
/// Same as [`create_version`](Self::create_version), but the version also
/// carries `name` (the `DataFrame` implementation attaches it with `with_name`).
fn create_named_version(&self, tracker: &mut LineageTracker, name: &str) -> VersionId;
}
impl DataFrameVersioning for DataFrame {
    /// Derives a [`DataSchema`] from the frame's column names, a type label
    /// per column, and the frame's row count.
    ///
    /// A column is labelled `"f64"` when `is_numeric_column` reports true;
    /// otherwise `"Categorical"` when `is_categorical` reports true; and
    /// `"String"` in every remaining case. The checks run in that order and
    /// stop at the first match.
    fn to_schema(&self) -> DataSchema {
        let columns = self.column_names();

        let mut types: HashMap<String, String> = HashMap::new();
        for col in columns.iter() {
            // Guards are evaluated top to bottom, so the categorical check is
            // only consulted for columns that are not numeric.
            let label = match col {
                c if self.is_numeric_column(c) => "f64",
                c if self.is_categorical(c) => "Categorical",
                _ => "String",
            };
            types.insert(col.clone(), String::from(label));
        }

        DataSchema::new(columns, types, self.row_count())
    }

    /// Registers an unnamed version built from the current schema and
    /// returns the id handed back by the tracker.
    fn create_version(&self, tracker: &mut LineageTracker) -> VersionId {
        tracker.register_version(DataVersion::new(self.to_schema()))
    }

    /// Registers a version built from the current schema with `name`
    /// attached, and returns the id handed back by the tracker.
    fn create_named_version(&self, tracker: &mut LineageTracker, name: &str) -> VersionId {
        let named = DataVersion::new(self.to_schema()).with_name(name);
        tracker.register_version(named)
    }
}
/// Builder that collects [`OperationType`] entries against one input version
/// and, on `commit`, registers an output version and records the collected
/// operations with the borrowed [`LineageTracker`].
pub struct VersionedTransform<'a> {
// Tracker that receives the output version and the recorded operations.
tracker: &'a mut LineageTracker,
// Version the queued operations start from; used as the output's parent.
input_version: VersionId,
// Operations queued so far, in the order the builder methods were called.
operations: Vec<OperationType>,
}
impl<'a> VersionedTransform<'a> {
pub fn new(tracker: &'a mut LineageTracker, input_version: VersionId) -> Self {
VersionedTransform {
tracker,
input_version,
operations: Vec::new(),
}
}
pub fn select(mut self, columns: Vec<String>) -> Self {
self.operations.push(OperationType::Select { columns });
self
}
pub fn filter(mut self, condition: &str) -> Self {
self.operations.push(OperationType::Filter {
condition: condition.to_string(),
});
self
}
pub fn sort(mut self, columns: Vec<String>, ascending: Vec<bool>) -> Self {
self.operations
.push(OperationType::Sort { columns, ascending });
self
}
pub fn add_column(mut self, column_name: &str) -> Self {
self.operations.push(OperationType::AddColumn {
column_name: column_name.to_string(),
});
self
}
pub fn drop_columns(mut self, columns: Vec<String>) -> Self {
self.operations.push(OperationType::DropColumn { columns });
self
}
pub fn transform(mut self, name: &str, description: &str) -> Self {
self.operations.push(OperationType::Transform {
name: name.to_string(),
description: description.to_string(),
});
self
}
pub fn commit(self, output_schema: DataSchema) -> VersionId {
let output_version =
DataVersion::new(output_schema).with_parents(vec![self.input_version.clone()]);
let output_id = self.tracker.register_version(output_version);
for op_type in self.operations {
let op = Operation::new(op_type, vec![self.input_version.clone()], output_id.clone());
self.tracker.record_operation(op);
}
output_id
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Series;

    /// Builds a three-row frame with a string column `name` and a float
    /// column `value`.
    fn create_test_dataframe() -> DataFrame {
        let name_series = Series::new(
            vec![
                "Alice".to_string(),
                "Bob".to_string(),
                "Charlie".to_string(),
            ],
            Some("name".to_string()),
        )
        .expect("operation should succeed");
        let value_series = Series::new(vec![1.0, 2.0, 3.0], Some("value".to_string()))
            .expect("operation should succeed");

        let mut frame = DataFrame::new();
        frame
            .add_column("name".to_string(), name_series)
            .expect("operation should succeed");
        frame
            .add_column("value".to_string(), value_series)
            .expect("operation should succeed");
        frame
    }

    /// The schema reports both columns and all three rows.
    #[test]
    fn test_dataframe_to_schema() {
        let frame = create_test_dataframe();
        let described = frame.to_schema();

        assert_eq!(described.columns.len(), 2);
        assert_eq!(described.row_count, 3);
    }

    /// A version registered from a frame can be looked up by its id.
    #[test]
    fn test_dataframe_create_version() {
        let frame = create_test_dataframe();
        let mut lineage_tracker = LineageTracker::new();

        let id = frame.create_version(&mut lineage_tracker);
        let stored = lineage_tracker
            .get_version(&id)
            .expect("operation should succeed");

        assert_eq!(stored.schema.row_count, 3);
    }

    /// Committing one queued `select` yields a two-entry lineage and exactly
    /// one operation producing the output version.
    #[test]
    fn test_versioned_transform() {
        let frame = create_test_dataframe();
        let mut lineage_tracker = LineageTracker::new();
        let original_id = frame.create_named_version(&mut lineage_tracker, "original");

        let projected_types =
            std::collections::HashMap::from([("name".to_string(), "String".to_string())]);
        let projected_schema = DataSchema::new(vec!["name".to_string()], projected_types, 3);

        let builder = VersionedTransform::new(&mut lineage_tracker, original_id.clone());
        let projected_id = builder
            .select(vec!["name".to_string()])
            .commit(projected_schema);

        let lineage = lineage_tracker.get_lineage(&projected_id);
        assert_eq!(lineage.len(), 2);

        let producing_ops = lineage_tracker.get_operations_producing(&projected_id);
        assert_eq!(producing_ops.len(), 1);
    }
}