use std::collections::BTreeMap;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use ulid::Ulid;
use crate::core::schema::{DataType, EdgeTypeMeta, LabelMeta};
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ForkId(pub Ulid);
impl ForkId {
#[must_use]
pub fn new() -> Self {
Self(Ulid::new())
}
pub fn parse(s: &str) -> Result<Self, ulid::DecodeError> {
Ulid::from_string(s).map(Self)
}
}
impl Default for ForkId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for ForkId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[non_exhaustive]
pub enum ForkStatus {
Pending,
Active,
Tombstoned,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ForkInfo {
pub id: ForkId,
pub name: String,
#[serde(default)]
pub parent_fork_id: Option<ForkId>,
pub parent_snapshot_id: String,
pub created_at: DateTime<Utc>,
#[serde(default)]
pub ttl_expires_at: Option<DateTime<Utc>>,
pub schema_version_at_creation: u32,
pub datasets: BTreeMap<String, String>,
pub status: ForkStatus,
}
impl ForkInfo {
#[must_use]
pub fn new_pending(
id: ForkId,
name: impl Into<String>,
parent_snapshot_id: impl Into<String>,
schema_version: u32,
) -> Self {
Self {
id,
name: name.into(),
parent_fork_id: None,
parent_snapshot_id: parent_snapshot_id.into(),
created_at: Utc::now(),
ttl_expires_at: None,
schema_version_at_creation: schema_version,
datasets: BTreeMap::new(),
status: ForkStatus::Pending,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyAddition {
pub owner: String,
pub owner_kind: PropertyOwnerKind,
pub property: String,
pub data_type: DataType,
pub nullable: bool,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PropertyOwnerKind {
Label,
EdgeType,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct SchemaDelta {
#[serde(default)]
pub added_labels: Vec<(String, LabelMeta)>,
#[serde(default)]
pub added_edge_types: Vec<(String, EdgeTypeMeta)>,
#[serde(default)]
pub added_properties: Vec<PropertyAddition>,
}
impl SchemaDelta {
#[must_use]
pub fn empty() -> Self {
Self::default()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.added_labels.is_empty()
&& self.added_edge_types.is_empty()
&& self.added_properties.is_empty()
}
#[must_use]
pub fn merge_atop(&self, base: &SchemaDelta) -> SchemaDelta {
use std::collections::BTreeMap;
let mut labels: BTreeMap<String, LabelMeta> = BTreeMap::new();
for (name, meta) in &base.added_labels {
labels.insert(name.clone(), meta.clone());
}
for (name, meta) in &self.added_labels {
labels.insert(name.clone(), meta.clone());
}
let mut edge_types: BTreeMap<String, EdgeTypeMeta> = BTreeMap::new();
for (name, meta) in &base.added_edge_types {
edge_types.insert(name.clone(), meta.clone());
}
for (name, meta) in &self.added_edge_types {
edge_types.insert(name.clone(), meta.clone());
}
let mut properties: BTreeMap<(String, String), PropertyAddition> = BTreeMap::new();
for add in &base.added_properties {
properties.insert((add.owner.clone(), add.property.clone()), add.clone());
}
for add in &self.added_properties {
properties.insert((add.owner.clone(), add.property.clone()), add.clone());
}
SchemaDelta {
added_labels: labels.into_iter().collect(),
added_edge_types: edge_types.into_iter().collect(),
added_properties: properties.into_values().collect(),
}
}
}
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct ForkRegistryFile {
#[serde(default)]
pub forks: BTreeMap<String, ForkInfo>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn fork_id_roundtrip() {
let id = ForkId::new();
let s = id.to_string();
let parsed = ForkId::parse(&s).unwrap();
assert_eq!(id, parsed);
}
#[test]
fn fork_info_serde_roundtrip() {
let info = ForkInfo::new_pending(ForkId::new(), "scenario_1", "snap-abc", 17);
let json = serde_json::to_string(&info).unwrap();
let parsed: ForkInfo = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.id, info.id);
assert_eq!(parsed.name, "scenario_1");
assert_eq!(parsed.parent_snapshot_id, "snap-abc");
assert_eq!(parsed.schema_version_at_creation, 17);
assert_eq!(parsed.status, ForkStatus::Pending);
assert!(parsed.datasets.is_empty());
assert!(parsed.parent_fork_id.is_none());
assert!(parsed.ttl_expires_at.is_none());
}
#[test]
fn registry_file_default_empty() {
let file = ForkRegistryFile::default();
let json = serde_json::to_string(&file).unwrap();
let parsed: ForkRegistryFile = serde_json::from_str(&json).unwrap();
assert!(parsed.forks.is_empty());
}
#[test]
fn schema_delta_default_is_empty() {
let d = SchemaDelta::default();
assert!(d.is_empty());
}
fn label_meta(id: u16) -> LabelMeta {
use crate::core::schema::SchemaElementState;
LabelMeta {
id,
created_at: chrono::Utc::now(),
state: SchemaElementState::Active,
description: None,
}
}
fn edge_type_meta(id: u32) -> EdgeTypeMeta {
use crate::core::schema::SchemaElementState;
EdgeTypeMeta {
id,
src_labels: vec!["A".into()],
dst_labels: vec!["A".into()],
state: SchemaElementState::Active,
description: None,
}
}
#[test]
fn merge_atop_unions_disjoint_labels_and_edge_types() {
let base = SchemaDelta {
added_labels: vec![("A".into(), label_meta(1))],
added_edge_types: vec![("E1".into(), edge_type_meta(10))],
..Default::default()
};
let top = SchemaDelta {
added_labels: vec![("B".into(), label_meta(2))],
added_edge_types: vec![("E2".into(), edge_type_meta(20))],
..Default::default()
};
let merged = top.merge_atop(&base);
let label_names: Vec<&str> = merged
.added_labels
.iter()
.map(|(n, _)| n.as_str())
.collect();
assert!(label_names.contains(&"A") && label_names.contains(&"B"));
let edge_names: Vec<&str> = merged
.added_edge_types
.iter()
.map(|(n, _)| n.as_str())
.collect();
assert!(edge_names.contains(&"E1") && edge_names.contains(&"E2"));
}
#[test]
fn merge_atop_self_wins_on_collision() {
let base = SchemaDelta {
added_labels: vec![("A".into(), label_meta(100))],
..Default::default()
};
let top = SchemaDelta {
added_labels: vec![("A".into(), label_meta(200))],
..Default::default()
};
let merged = top.merge_atop(&base);
assert_eq!(merged.added_labels.len(), 1);
assert_eq!(merged.added_labels[0].1.id, 200, "self must win");
}
#[test]
fn merge_atop_empty_base_is_self() {
let top = SchemaDelta {
added_labels: vec![("A".into(), label_meta(1))],
..Default::default()
};
let merged = top.merge_atop(&SchemaDelta::empty());
assert_eq!(merged.added_labels.len(), 1);
assert_eq!(merged.added_labels[0].0, "A");
}
#[test]
fn merge_atop_empty_self_is_base() {
let base = SchemaDelta {
added_labels: vec![("A".into(), label_meta(1))],
..Default::default()
};
let merged = SchemaDelta::empty().merge_atop(&base);
assert_eq!(merged.added_labels.len(), 1);
assert_eq!(merged.added_labels[0].0, "A");
}
#[test]
fn merge_atop_dedupes_properties_by_owner_and_name() {
let base_add = PropertyAddition {
owner: "Person".into(),
owner_kind: PropertyOwnerKind::Label,
property: "age".into(),
data_type: DataType::Int64,
nullable: true,
};
let top_add = PropertyAddition {
owner: "Person".into(),
owner_kind: PropertyOwnerKind::Label,
property: "age".into(),
data_type: DataType::String, nullable: false,
};
let base = SchemaDelta {
added_properties: vec![base_add],
..Default::default()
};
let top = SchemaDelta {
added_properties: vec![top_add],
..Default::default()
};
let merged = top.merge_atop(&base);
assert_eq!(merged.added_properties.len(), 1);
assert!(matches!(
merged.added_properties[0].data_type,
DataType::String
));
assert!(!merged.added_properties[0].nullable);
}
}