use crate::object_store::GitObjectStore;
use arrow::array::{ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use nusy_arrow_core::Namespace;
use nusy_arrow_core::schema::TRIPLES_SCHEMA_VERSION;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::fs;
use std::sync::Arc;
#[derive(Debug, thiserror::Error)]
pub enum CommitError {
#[error("Arrow error: {0}")]
Arrow(#[from] arrow::error::ArrowError),
#[error("Parquet error: {0}")]
Parquet(#[from] parquet::errors::ParquetError),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Commit not found: {0}")]
NotFound(String),
}
pub type Result<T> = std::result::Result<T, CommitError>;
pub fn commits_schema() -> Schema {
Schema::new(vec![
Field::new("commit_id", DataType::Utf8, false),
Field::new(
"parent_ids",
DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
false,
),
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
false,
),
Field::new("message", DataType::Utf8, false),
Field::new("author", DataType::Utf8, false),
])
}
#[derive(Debug, Clone)]
pub struct Commit {
pub commit_id: String,
pub parent_ids: Vec<String>,
pub timestamp_ms: i64,
pub message: String,
pub author: String,
}
pub struct CommitsTable {
schema: Arc<Schema>,
commits: Vec<Commit>,
}
impl CommitsTable {
pub fn new() -> Self {
CommitsTable {
schema: Arc::new(commits_schema()),
commits: Vec::new(),
}
}
pub fn append(&mut self, commit: Commit) {
self.commits.push(commit);
}
pub fn get(&self, commit_id: &str) -> Option<&Commit> {
self.commits.iter().find(|c| c.commit_id == commit_id)
}
pub fn all(&self) -> &[Commit] {
&self.commits
}
pub fn len(&self) -> usize {
self.commits.len()
}
pub fn is_empty(&self) -> bool {
self.commits.is_empty()
}
pub fn to_record_batch(&self) -> Result<RecordBatch> {
let n = self.commits.len();
if n == 0 {
return Ok(RecordBatch::new_empty(self.schema.clone()));
}
let ids: Vec<&str> = self.commits.iter().map(|c| c.commit_id.as_str()).collect();
let timestamps: Vec<i64> = self.commits.iter().map(|c| c.timestamp_ms).collect();
let messages: Vec<&str> = self.commits.iter().map(|c| c.message.as_str()).collect();
let authors: Vec<&str> = self.commits.iter().map(|c| c.author.as_str()).collect();
let parent_ids_list = build_parent_ids_list(&self.commits);
let batch = RecordBatch::try_new(
self.schema.clone(),
vec![
Arc::new(StringArray::from(ids)),
parent_ids_list,
Arc::new(TimestampMillisecondArray::from(timestamps).with_timezone("UTC")),
Arc::new(StringArray::from(messages)),
Arc::new(StringArray::from(authors)),
],
)?;
Ok(batch)
}
}
impl Default for CommitsTable {
fn default() -> Self {
Self::new()
}
}
fn build_parent_ids_list(commits: &[Commit]) -> ArrayRef {
use arrow::array::ListBuilder;
use arrow::array::StringBuilder;
let mut builder = ListBuilder::new(StringBuilder::new());
for commit in commits {
for pid in &commit.parent_ids {
builder.values().append_value(pid);
}
builder.append(true);
}
Arc::new(builder.finish())
}
pub fn create_commit(
obj_store: &GitObjectStore,
commits_table: &mut CommitsTable,
parent_ids: Vec<String>,
message: &str,
author: &str,
) -> Result<Commit> {
let commit_id = uuid::Uuid::new_v4().to_string();
let now_ms = chrono::Utc::now().timestamp_millis();
let snap_dir = obj_store.commit_snapshot_dir(&commit_id);
fs::create_dir_all(&snap_dir)?;
for ns in Namespace::ALL {
let batches = obj_store.store.get_namespace_batches(ns);
if batches.is_empty() {
continue;
}
let path = obj_store.namespace_parquet_path(&commit_id, ns.as_str());
let schema = obj_store.store.schema().clone();
let file = fs::File::create(&path)?;
let props = WriterProperties::builder()
.set_key_value_metadata(Some(vec![parquet::file::metadata::KeyValue {
key: "nusy_schema_version".to_string(),
value: Some(TRIPLES_SCHEMA_VERSION.to_string()),
}]))
.build();
let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
for batch in batches {
writer.write(batch)?;
}
writer.close()?;
}
let commit = Commit {
commit_id,
parent_ids,
timestamp_ms: now_ms,
message: message.to_string(),
author: author.to_string(),
};
commits_table.append(commit.clone());
Ok(commit)
}
#[cfg(test)]
mod tests {
use super::*;
use nusy_arrow_core::{Namespace, Triple, YLayer};
fn sample_triple(subj: &str) -> Triple {
Triple {
subject: subj.to_string(),
predicate: "rdf:type".to_string(),
object: "Thing".to_string(),
graph: None,
confidence: Some(0.9),
source_document: None,
source_chunk_id: None,
extracted_by: None,
caused_by: None,
derived_from: None,
consolidated_at: None,
certifiability_class: None,
}
}
#[test]
fn test_commit_creates_parquet_files() {
let tmp = tempfile::tempdir().unwrap();
let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
let mut commits = CommitsTable::new();
for i in 0..10 {
obj.store
.add_triple(
&sample_triple(&format!("s{i}")),
Namespace::World,
YLayer::Semantic,
)
.unwrap();
}
let commit = create_commit(&obj, &mut commits, vec![], "initial", "DGX").unwrap();
let parquet_path = obj.namespace_parquet_path(&commit.commit_id, "world");
assert!(parquet_path.exists(), "Parquet file should exist");
assert_eq!(commits.len(), 1);
assert_eq!(commits.get(&commit.commit_id).unwrap().message, "initial");
}
#[test]
fn test_multiple_commits_form_chain() {
let tmp = tempfile::tempdir().unwrap();
let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
let mut commits = CommitsTable::new();
obj.store
.add_triple(&sample_triple("s1"), Namespace::World, YLayer::Semantic)
.unwrap();
let c1 = create_commit(&obj, &mut commits, vec![], "first", "DGX").unwrap();
obj.store
.add_triple(&sample_triple("s2"), Namespace::World, YLayer::Semantic)
.unwrap();
let c2 = create_commit(
&obj,
&mut commits,
vec![c1.commit_id.clone()],
"second",
"DGX",
)
.unwrap();
assert_eq!(commits.len(), 2);
assert_eq!(c2.parent_ids, vec![c1.commit_id]);
}
#[test]
fn test_commits_table_to_record_batch() {
let mut table = CommitsTable::new();
table.append(Commit {
commit_id: "c1".to_string(),
parent_ids: vec![],
timestamp_ms: 1000,
message: "init".to_string(),
author: "DGX".to_string(),
});
table.append(Commit {
commit_id: "c2".to_string(),
parent_ids: vec!["c1".to_string()],
timestamp_ms: 2000,
message: "second".to_string(),
author: "DGX".to_string(),
});
let batch = table.to_record_batch().unwrap();
assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 5);
}
}