use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::{Debug, Display};
/// Identifier of a data version, stored as a plain string.
///
/// The wrapped string is public and is not validated by any constructor in
/// this file: `VersionId::new` fills it from `uuid_v4()`, while
/// `VersionId::from_str` stores whatever text it is given.
/// Equality and hashing are derived, so they compare the wrapped string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct VersionId(pub String);
impl VersionId {
pub fn new() -> Self {
VersionId(uuid_v4())
}
pub fn from_str(s: &str) -> Self {
VersionId(s.to_string())
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl Default for VersionId {
fn default() -> Self {
Self::new()
}
}
impl Display for VersionId {
    /// Writes the wrapped string verbatim.
    ///
    /// Width, fill and precision requested by the caller are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text: &str = &self.0;
        f.write_str(text)
    }
}
/// Generates a UUID-version-4-shaped identifier using only the standard library.
///
/// The result is 36 lowercase characters in the canonical `8-4-4-4-12` layout.
/// The version nibble is fixed to `4` and the variant bits to `10`, so the
/// fourth group starts with `8`, `9`, `a` or `b`.
///
/// The remaining 122 bits come from two hashers built by independently
/// created [`std::collections::hash_map::RandomState`] values, which the
/// standard library initialises with random keys. A process-wide counter, the
/// wall clock and the calling thread's id are fed into each hasher so that
/// every call hashes a distinct message.
///
/// Unlike a clock-derived value, the output does not panic when the system
/// clock is set before the Unix epoch, and consecutive ids do not share their
/// leading groups.
///
/// NOTE(review): this is not a cryptographically secure generator; do not use
/// these ids as secrets or tokens.
fn uuid_v4() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hash, Hasher};
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let sequence = COUNTER.fetch_add(1, Ordering::SeqCst);
    // A clock set before the epoch is not worth a panic: the timestamp is only
    // one of several hash inputs, so fall back to zero.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    let thread_id = std::thread::current().id();

    // Each draw uses a freshly keyed hasher; `lane` separates the two draws
    // made within a single call.
    let draw = |lane: u8| -> u64 {
        let mut hasher = RandomState::new().build_hasher();
        lane.hash(&mut hasher);
        sequence.hash(&mut hasher);
        nanos.hash(&mut hasher);
        thread_id.hash(&mut hasher);
        hasher.finish()
    };
    let high = draw(0);
    let low = draw(1);

    format!(
        "{:08x}-{:04x}-4{:03x}-{:04x}-{:012x}",
        (high >> 32) as u32,
        // Truncating cast on purpose: keeps bits 16..32 of `high`.
        (high >> 16) as u16,
        (high & 0x0FFF) as u16,
        // Force the two most significant bits to `10` (RFC 4122 variant).
        ((low >> 48) as u16 & 0x3FFF) | 0x8000,
        low & 0xFFFF_FFFF_FFFF
    )
}
/// The kind of step recorded in an [`Operation`], together with the
/// parameters that describe it.
///
/// All parameters are plain strings, string lists or [`VersionId`]s; nothing
/// in this file interprets them. The `Display` impl renders each variant as an
/// upper-case tag followed by a summary of some of its fields (the form is
/// noted on each variant below).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum OperationType {
/// No parameters. Displayed as `CREATE`.
Create,
/// Displayed as `LOAD(source, format)`.
Load { source: String, format: String },
/// Displayed as `SELECT(col, col, ...)`.
Select { columns: Vec<String> },
/// Displayed as `FILTER(condition)`; the condition is free-form text here.
Filter { condition: String },
/// Displayed as `SORT(col, col, ...)`; `ascending` is not shown.
/// NOTE(review): `ascending` presumably holds one flag per entry of
/// `columns` — nothing here enforces equal lengths; confirm with callers.
Sort {
columns: Vec<String>,
ascending: Vec<bool>,
},
/// Displayed as `ADD_COLUMN(column_name)`.
AddColumn { column_name: String },
/// Displayed as `DROP(col, col, ...)`.
DropColumn { columns: Vec<String> },
/// Displayed as `RENAME(old_name -> new_name)`.
Rename { old_name: String, new_name: String },
/// Displayed as `AGGREGATE(BY: ..., AGG: ...)` with both lists comma-joined.
Aggregate {
group_by: Vec<String>,
aggregations: Vec<String>,
},
/// Displayed as `JOIN(join_type, ON: col, ...)`; `other_version` is not shown.
Join {
other_version: VersionId,
join_type: String,
on: Vec<String>,
},
/// Displayed as `CONCAT(<count> DataFrames)`, where the count is the length
/// of `other_versions`.
Concat { other_versions: Vec<VersionId> },
/// Displayed as `FILL_NA(strategy)`.
FillNA { strategy: String },
/// Displayed as `CAST(column -> to_type)`.
Cast { column: String, to_type: String },
/// Displayed as `TRANSFORM(name)`; `description` is not shown.
Transform { name: String, description: String },
/// Displayed as `SAVE(destination, format)`.
Save { destination: String, format: String },
/// Displayed as `CUSTOM(name)`; `params` is not shown.
Custom {
name: String,
params: HashMap<String, String>,
},
}
impl Display for OperationType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OperationType::Create => write!(f, "CREATE"),
OperationType::Load { source, format } => write!(f, "LOAD({}, {})", source, format),
OperationType::Select { columns } => write!(f, "SELECT({})", columns.join(", ")),
OperationType::Filter { condition } => write!(f, "FILTER({})", condition),
OperationType::Sort { columns, .. } => write!(f, "SORT({})", columns.join(", ")),
OperationType::AddColumn { column_name } => write!(f, "ADD_COLUMN({})", column_name),
OperationType::DropColumn { columns } => write!(f, "DROP({})", columns.join(", ")),
OperationType::Rename { old_name, new_name } => {
write!(f, "RENAME({} -> {})", old_name, new_name)
}
OperationType::Aggregate {
group_by,
aggregations,
} => {
write!(
f,
"AGGREGATE(BY: {}, AGG: {})",
group_by.join(", "),
aggregations.join(", ")
)
}
OperationType::Join { join_type, on, .. } => {
write!(f, "JOIN({}, ON: {})", join_type, on.join(", "))
}
OperationType::Concat { other_versions } => {
write!(f, "CONCAT({} DataFrames)", other_versions.len())
}
OperationType::FillNA { strategy } => write!(f, "FILL_NA({})", strategy),
OperationType::Cast { column, to_type } => {
write!(f, "CAST({} -> {})", column, to_type)
}
OperationType::Transform { name, .. } => write!(f, "TRANSFORM({})", name),
OperationType::Save {
destination,
format,
} => {
write!(f, "SAVE({}, {})", destination, format)
}
OperationType::Custom { name, .. } => write!(f, "CUSTOM({})", name),
}
}
}
/// A recorded operation: what was done, when, and which versions it links.
///
/// Built with `Operation::new` and then optionally extended with the
/// consuming `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Operation {
/// Identifier of this operation; `Operation::new` fills it from `uuid_v4()`.
pub id: String,
/// What kind of operation this is, with its parameters.
pub operation_type: OperationType,
/// Set to `Utc::now()` by `Operation::new`, i.e. the time the record was built.
pub timestamp: DateTime<Utc>,
/// Versions passed to `Operation::new` as `inputs`
/// (presumably the versions the operation read — confirm with callers).
pub inputs: Vec<VersionId>,
/// Version passed to `Operation::new` as `output`
/// (presumably the version the operation produced — confirm with callers).
pub output: VersionId,
/// Free-form key/value annotations; empty after `new`, added to by `with_metadata`.
pub metadata: HashMap<String, String>,
/// `None` after `new`; set by `with_user`.
pub user: Option<String>,
/// `None` after `new`; set by `with_duration`. The name suggests milliseconds,
/// but nothing in this file measures or converts the value.
pub duration_ms: Option<u64>,
}
impl Operation {
    /// Records a new operation linking `inputs` to `output`.
    ///
    /// The id comes from `uuid_v4()` and the timestamp from `Utc::now()`.
    /// Metadata starts empty; `user` and `duration_ms` start as `None`.
    pub fn new(operation_type: OperationType, inputs: Vec<VersionId>, output: VersionId) -> Self {
        let id = uuid_v4();
        let timestamp = Utc::now();
        Self {
            id,
            operation_type,
            timestamp,
            inputs,
            output,
            metadata: HashMap::new(),
            user: None,
            duration_ms: None,
        }
    }

    /// Adds (or overwrites) one metadata entry and returns the updated operation.
    pub fn with_metadata(mut self, key: &str, value: &str) -> Self {
        let owned_key = String::from(key);
        let owned_value = String::from(value);
        self.metadata.insert(owned_key, owned_value);
        self
    }

    /// Sets the `user` field and returns the updated operation.
    pub fn with_user(self, user: &str) -> Self {
        Self {
            user: Some(String::from(user)),
            ..self
        }
    }

    /// Sets the `duration_ms` field and returns the updated operation.
    pub fn with_duration(self, duration_ms: u64) -> Self {
        Self {
            duration_ms: Some(duration_ms),
            ..self
        }
    }
}
/// Shape of a data version: its columns, their type names and the row count.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSchema {
/// Column names. Order is significant: `is_compatible` compares this list
/// element by element.
pub columns: Vec<String>,
/// Type name per column, looked up by column name. A column may have no
/// entry; lookups in this file treat that as "no type recorded".
pub types: HashMap<String, String>,
/// Number of rows. Not considered by `is_compatible`; used by
/// `VersionDiff::from_schemas` to compute the row-count difference.
pub row_count: usize,
}
impl DataSchema {
pub fn new(columns: Vec<String>, types: HashMap<String, String>, row_count: usize) -> Self {
DataSchema {
columns,
types,
row_count,
}
}
pub fn is_compatible(&self, other: &DataSchema) -> bool {
if self.columns != other.columns {
return false;
}
for col in &self.columns {
if self.types.get(col) != other.types.get(col) {
return false;
}
}
true
}
}
/// One version of a dataset: its identity, schema, parents and annotations.
///
/// Built with `DataVersion::new` and then optionally extended with the
/// consuming `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataVersion {
/// Identifier of this version; `DataVersion::new` generates it with `VersionId::new()`.
pub id: VersionId,
/// `None` after `new`; set by `with_name`.
pub name: Option<String>,
/// `None` after `new`; set by `with_description`.
pub description: Option<String>,
/// Set to `Utc::now()` by `DataVersion::new`.
pub created_at: DateTime<Utc>,
/// Schema supplied to `DataVersion::new`.
pub schema: DataSchema,
/// Empty after `new`; replaced wholesale by `with_parents`.
pub parents: Vec<VersionId>,
/// Empty after `new`; `with_tag` appends one tag per call (duplicates are kept).
pub tags: Vec<String>,
/// Free-form key/value annotations; empty after `new`, added to by `with_metadata`.
pub metadata: HashMap<String, String>,
/// `None` after `new`. No builder in this file sets it
/// (presumably a content hash filled in by storage code — confirm).
pub data_hash: Option<String>,
/// `None` after `new`. No builder in this file sets it
/// (the name suggests a size in bytes — confirm).
pub size_bytes: Option<usize>,
}
impl DataVersion {
    /// Creates a version for `schema` with a generated id and the current
    /// UTC time as `created_at`.
    ///
    /// Every optional field starts as `None` and every collection starts empty.
    pub fn new(schema: DataSchema) -> Self {
        let id = VersionId::new();
        let created_at = Utc::now();
        Self {
            id,
            name: None,
            description: None,
            created_at,
            schema,
            parents: Vec::new(),
            tags: Vec::new(),
            metadata: HashMap::new(),
            data_hash: None,
            size_bytes: None,
        }
    }

    /// Sets the `name` field and returns the updated version.
    pub fn with_name(self, name: &str) -> Self {
        Self {
            name: Some(String::from(name)),
            ..self
        }
    }

    /// Sets the `description` field and returns the updated version.
    pub fn with_description(self, description: &str) -> Self {
        Self {
            description: Some(String::from(description)),
            ..self
        }
    }

    /// Replaces the whole parent list and returns the updated version.
    pub fn with_parents(self, parents: Vec<VersionId>) -> Self {
        Self { parents, ..self }
    }

    /// Appends one tag (duplicates are kept) and returns the updated version.
    pub fn with_tag(mut self, tag: &str) -> Self {
        let owned_tag = String::from(tag);
        self.tags.push(owned_tag);
        self
    }

    /// Adds (or overwrites) one metadata entry and returns the updated version.
    pub fn with_metadata(mut self, key: &str, value: &str) -> Self {
        let owned_key = String::from(key);
        let owned_value = String::from(value);
        self.metadata.insert(owned_key, owned_value);
        self
    }
}
/// Schema-level difference between two versions.
///
/// `VersionDiff::new` produces an empty diff; `VersionDiff::from_schemas`
/// fills in the column, type and row-count fields from two `DataVersion`s.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionDiff {
/// Id of the version the diff starts from.
pub from_version: VersionId,
/// Id of the version the diff leads to.
pub to_version: VersionId,
/// Columns listed by the `to` schema but not by the `from` schema.
pub columns_added: Vec<String>,
/// Columns listed by the `from` schema but not by the `to` schema.
pub columns_removed: Vec<String>,
/// For columns present in both schemas whose recorded types differ:
/// column name -> (type in `from`, type in `to`). A side with no recorded
/// type is stored as an empty string.
pub type_changes: HashMap<String, (String, String)>,
/// `to` row count minus `from` row count; negative when rows were lost.
pub row_count_diff: i64,
/// Starts empty; neither constructor in this file adds to it, and
/// `has_changes` does not look at it.
pub operations: Vec<Operation>,
}
impl VersionDiff {
    /// Creates an empty diff between `from` and `to`.
    ///
    /// All lists and maps start empty and `row_count_diff` starts at zero.
    pub fn new(from: VersionId, to: VersionId) -> Self {
        VersionDiff {
            from_version: from,
            to_version: to,
            columns_added: Vec::new(),
            columns_removed: Vec::new(),
            type_changes: HashMap::new(),
            row_count_diff: 0,
            operations: Vec::new(),
        }
    }

    /// Computes the schema-level difference between two versions.
    ///
    /// * `columns_added` lists the columns of `to` that `from` lacks, in
    ///   `to`'s column order.
    /// * `columns_removed` lists the columns of `from` that `to` lacks, in
    ///   `from`'s column order.
    /// * `type_changes` records every shared column whose recorded type
    ///   differs. A side with no recorded type is stored as an empty string.
    /// * `row_count_diff` is `to`'s row count minus `from`'s. It saturates at
    ///   the `i64` bounds instead of wrapping when the counts are too large.
    ///
    /// `operations` is left empty.
    pub fn from_schemas(from: &DataVersion, to: &DataVersion) -> Self {
        // Hash sets give constant-time membership tests, so the diff is linear
        // in the number of columns rather than quadratic.
        let from_columns: std::collections::HashSet<&str> =
            from.schema.columns.iter().map(String::as_str).collect();
        let to_columns: std::collections::HashSet<&str> =
            to.schema.columns.iter().map(String::as_str).collect();

        let mut diff = VersionDiff::new(from.id.clone(), to.id.clone());

        diff.columns_added.extend(
            to.schema
                .columns
                .iter()
                .filter(|col| !from_columns.contains(col.as_str()))
                .cloned(),
        );

        // A single pass over `from` classifies each column as either removed
        // or shared; only shared columns can have a type change.
        for col in &from.schema.columns {
            if !to_columns.contains(col.as_str()) {
                diff.columns_removed.push(col.clone());
                continue;
            }
            let from_type = from.schema.types.get(col);
            let to_type = to.schema.types.get(col);
            if from_type != to_type {
                diff.type_changes.insert(
                    col.clone(),
                    (
                        from_type.cloned().unwrap_or_default(),
                        to_type.cloned().unwrap_or_default(),
                    ),
                );
            }
        }

        // `usize -> i128` is lossless on every supported target, so the
        // subtraction cannot overflow. Clamping then keeps the sign correct
        // even for row counts beyond `i64::MAX`.
        let delta = to.schema.row_count as i128 - from.schema.row_count as i128;
        diff.row_count_diff = delta.clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64;

        diff
    }

    /// Returns `true` if any column was added or removed, any type changed,
    /// or the row count differs. `operations` is not considered.
    pub fn has_changes(&self) -> bool {
        !self.columns_added.is_empty()
            || !self.columns_removed.is_empty()
            || !self.type_changes.is_empty()
            || self.row_count_diff != 0
    }
}
/// Errors reported by the versioning code.
///
/// Each variant carries the detail that its `Display` message appends after a
/// fixed prefix (noted per variant).
#[derive(Debug, Clone)]
pub enum VersioningError {
/// Carries the id that could not be found. Message prefix: `Version not found: `.
VersionNotFound(VersionId),
/// Carries the operation id as text. Message prefix: `Operation not found: `.
OperationNotFound(String),
/// Carries a free-form explanation. Message prefix: `Invalid operation: `.
InvalidOperation(String),
/// Carries a free-form explanation. Message prefix: `Storage error: `.
StorageError(String),
/// Carries a free-form explanation. Message prefix: `Serialization error: `.
SerializationError(String),
}
impl std::fmt::Display for VersioningError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
VersioningError::VersionNotFound(id) => {
write!(f, "Version not found: {}", id)
}
VersioningError::OperationNotFound(id) => {
write!(f, "Operation not found: {}", id)
}
VersioningError::InvalidOperation(msg) => {
write!(f, "Invalid operation: {}", msg)
}
VersioningError::StorageError(msg) => {
write!(f, "Storage error: {}", msg)
}
VersioningError::SerializationError(msg) => {
write!(f, "Serialization error: {}", msg)
}
}
}
}
// Marker impl: makes `VersioningError` usable as a `std::error::Error`.
// The body is empty, so only the trait's provided methods apply (no `source`
// override); the required `Debug` and `Display` come from the derive and the
// hand-written impl in this file.
impl std::error::Error for VersioningError {}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a schema whose columns follow the order of `spec`, each paired
    /// with its type name.
    fn schema_of(spec: &[(&str, &str)], row_count: usize) -> DataSchema {
        let columns: Vec<String> = spec.iter().map(|pair| String::from(pair.0)).collect();
        let types: HashMap<String, String> = spec
            .iter()
            .map(|pair| (String::from(pair.0), String::from(pair.1)))
            .collect();
        DataSchema::new(columns, types, row_count)
    }

    /// Two freshly generated ids must differ.
    #[test]
    fn test_version_id_creation() {
        let first = VersionId::new();
        let second = VersionId::new();
        assert_ne!(first, second);
    }

    /// The rendered `Select` mentions its tag and every column.
    #[test]
    fn test_operation_type_display() {
        let select = OperationType::Select {
            columns: vec![String::from("a"), String::from("b")],
        };
        for needle in &["SELECT", "a", "b"] {
            assert!(select.to_string().contains(*needle));
        }
    }

    /// Schemas that differ only in row count are compatible.
    #[test]
    fn test_data_schema_compatibility() {
        let layout = [("a", "f64"), ("b", "String")];
        let smaller = schema_of(&layout, 100);
        let larger = schema_of(&layout, 200);
        assert!(smaller.is_compatible(&larger));
    }

    /// Replacing column `b` with `c` and growing by 50 rows shows up in the diff.
    #[test]
    fn test_version_diff() {
        let before = DataVersion::new(schema_of(&[("a", "f64"), ("b", "String")], 100));
        let after = DataVersion::new(schema_of(&[("a", "f64"), ("c", "i64")], 150));

        let diff = VersionDiff::from_schemas(&before, &after);

        assert!(diff.columns_added.contains(&String::from("c")));
        assert!(diff.columns_removed.contains(&String::from("b")));
        assert_eq!(diff.row_count_diff, 50);
    }
}