Skip to main content

nusy_arrow_git/
commit.rs

//! Commit — snapshot the current graph state to Parquet and record it in the `CommitsTable`.
//!
//! A commit captures:
//! - The RecordBatches of every non-empty namespace, written as one Parquet file per namespace
//! - A `CommitsTable` row with commit_id, parent_ids, timestamp, message, author
6
7use crate::object_store::GitObjectStore;
8use arrow::array::{ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray};
9use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
10use nusy_arrow_core::Namespace;
11use nusy_arrow_core::schema::TRIPLES_SCHEMA_VERSION;
12use parquet::arrow::ArrowWriter;
13use parquet::file::properties::WriterProperties;
14use std::fs;
15use std::sync::Arc;
16
17/// Errors from commit operations.
18#[derive(Debug, thiserror::Error)]
19pub enum CommitError {
20    #[error("Arrow error: {0}")]
21    Arrow(#[from] arrow::error::ArrowError),
22
23    #[error("Parquet error: {0}")]
24    Parquet(#[from] parquet::errors::ParquetError),
25
26    #[error("IO error: {0}")]
27    Io(#[from] std::io::Error),
28
29    #[error("Commit not found: {0}")]
30    NotFound(String),
31}
32
33pub type Result<T> = std::result::Result<T, CommitError>;
34
35/// Schema for the Commits table.
36pub fn commits_schema() -> Schema {
37    Schema::new(vec![
38        Field::new("commit_id", DataType::Utf8, false),
39        Field::new(
40            "parent_ids",
41            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
42            false,
43        ),
44        Field::new(
45            "timestamp",
46            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
47            false,
48        ),
49        Field::new("message", DataType::Utf8, false),
50        Field::new("author", DataType::Utf8, false),
51    ])
52}
53
/// A single commit record, mirroring one row of the commits table.
///
/// Derives `PartialEq`/`Eq` so commits can be compared by value (for example
/// in `assert_eq!`); two commits are equal when every field is equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Commit {
    /// Identifier of this commit.
    pub commit_id: String,
    /// Identifiers of the parent commits; empty when the commit has no parent.
    pub parent_ids: Vec<String>,
    /// Commit time in milliseconds.
    pub timestamp_ms: i64,
    /// Commit message.
    pub message: String,
    /// Author of the commit.
    pub author: String,
}
63
64/// The commits history table.
65pub struct CommitsTable {
66    schema: Arc<Schema>,
67    commits: Vec<Commit>,
68}
69
70impl CommitsTable {
71    pub fn new() -> Self {
72        CommitsTable {
73            schema: Arc::new(commits_schema()),
74            commits: Vec::new(),
75        }
76    }
77
78    /// Append a commit record.
79    pub fn append(&mut self, commit: Commit) {
80        self.commits.push(commit);
81    }
82
83    /// Get a commit by ID.
84    pub fn get(&self, commit_id: &str) -> Option<&Commit> {
85        self.commits.iter().find(|c| c.commit_id == commit_id)
86    }
87
88    /// Get all commits (ordered by insertion = chronological).
89    pub fn all(&self) -> &[Commit] {
90        &self.commits
91    }
92
93    /// Number of commits.
94    pub fn len(&self) -> usize {
95        self.commits.len()
96    }
97
98    pub fn is_empty(&self) -> bool {
99        self.commits.is_empty()
100    }
101
102    /// Convert to Arrow RecordBatch.
103    pub fn to_record_batch(&self) -> Result<RecordBatch> {
104        let n = self.commits.len();
105        if n == 0 {
106            return Ok(RecordBatch::new_empty(self.schema.clone()));
107        }
108
109        let ids: Vec<&str> = self.commits.iter().map(|c| c.commit_id.as_str()).collect();
110        let timestamps: Vec<i64> = self.commits.iter().map(|c| c.timestamp_ms).collect();
111        let messages: Vec<&str> = self.commits.iter().map(|c| c.message.as_str()).collect();
112        let authors: Vec<&str> = self.commits.iter().map(|c| c.author.as_str()).collect();
113
114        // Build parent_ids as List<Utf8>
115        let parent_ids_list = build_parent_ids_list(&self.commits);
116
117        let batch = RecordBatch::try_new(
118            self.schema.clone(),
119            vec![
120                Arc::new(StringArray::from(ids)),
121                parent_ids_list,
122                Arc::new(TimestampMillisecondArray::from(timestamps).with_timezone("UTC")),
123                Arc::new(StringArray::from(messages)),
124                Arc::new(StringArray::from(authors)),
125            ],
126        )?;
127        Ok(batch)
128    }
129}
130
131impl Default for CommitsTable {
132    fn default() -> Self {
133        Self::new()
134    }
135}
136
137/// Build a ListArray of parent_ids from commits.
138fn build_parent_ids_list(commits: &[Commit]) -> ArrayRef {
139    use arrow::array::ListBuilder;
140    use arrow::array::StringBuilder;
141
142    let mut builder = ListBuilder::new(StringBuilder::new());
143    for commit in commits {
144        for pid in &commit.parent_ids {
145            builder.values().append_value(pid);
146        }
147        builder.append(true);
148    }
149    Arc::new(builder.finish())
150}
151
152/// Create a commit: snapshot all namespaces to Parquet, record in CommitsTable.
153///
154/// Returns the new Commit.
155pub fn create_commit(
156    obj_store: &GitObjectStore,
157    commits_table: &mut CommitsTable,
158    parent_ids: Vec<String>,
159    message: &str,
160    author: &str,
161) -> Result<Commit> {
162    let commit_id = uuid::Uuid::new_v4().to_string();
163    let now_ms = chrono::Utc::now().timestamp_millis();
164
165    // Create snapshot directory
166    let snap_dir = obj_store.commit_snapshot_dir(&commit_id);
167    fs::create_dir_all(&snap_dir)?;
168
169    // Write each namespace to Parquet
170    for ns in Namespace::ALL {
171        let batches = obj_store.store.get_namespace_batches(ns);
172        if batches.is_empty() {
173            continue;
174        }
175
176        let path = obj_store.namespace_parquet_path(&commit_id, ns.as_str());
177        let schema = obj_store.store.schema().clone();
178        let file = fs::File::create(&path)?;
179        let props = WriterProperties::builder()
180            .set_key_value_metadata(Some(vec![parquet::file::metadata::KeyValue {
181                key: "nusy_schema_version".to_string(),
182                value: Some(TRIPLES_SCHEMA_VERSION.to_string()),
183            }]))
184            .build();
185        let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
186
187        for batch in batches {
188            writer.write(batch)?;
189        }
190        writer.close()?;
191    }
192
193    let commit = Commit {
194        commit_id,
195        parent_ids,
196        timestamp_ms: now_ms,
197        message: message.to_string(),
198        author: author.to_string(),
199    };
200
201    commits_table.append(commit.clone());
202    Ok(commit)
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use nusy_arrow_core::{Namespace, Triple, YLayer};

    /// Build an `rdf:type Thing` triple for `subj` with confidence 0.9 and
    /// every optional provenance field left unset.
    fn sample_triple(subj: &str) -> Triple {
        Triple {
            subject: subj.to_owned(),
            predicate: "rdf:type".to_owned(),
            object: "Thing".to_owned(),
            graph: None,
            confidence: Some(0.9),
            source_document: None,
            source_chunk_id: None,
            extracted_by: None,
            caused_by: None,
            derived_from: None,
            consolidated_at: None,
            certifiability_class: None,
        }
    }

    /// Insert a sample triple for `subj` into the World namespace, Semantic layer.
    fn add_world_triple(obj: &mut GitObjectStore, subj: &str) {
        obj.store
            .add_triple(&sample_triple(subj), Namespace::World, YLayer::Semantic)
            .unwrap();
    }

    #[test]
    fn test_commit_creates_parquet_files() {
        let scratch = tempfile::tempdir().unwrap();
        let mut obj = GitObjectStore::with_snapshot_dir(scratch.path());
        let mut history = CommitsTable::new();

        // Populate the world namespace so the commit has something to snapshot.
        for idx in 0..10 {
            add_world_triple(&mut obj, &format!("s{idx}"));
        }

        let created = create_commit(&obj, &mut history, vec![], "initial", "DGX").unwrap();

        // The world namespace must have been written to disk.
        let world_file = obj.namespace_parquet_path(&created.commit_id, "world");
        assert!(world_file.exists(), "Parquet file should exist");

        // Exactly one commit is recorded, retrievable by its ID.
        assert_eq!(history.len(), 1);
        let stored = history.get(&created.commit_id).unwrap();
        assert_eq!(stored.message, "initial");
    }

    #[test]
    fn test_multiple_commits_form_chain() {
        let scratch = tempfile::tempdir().unwrap();
        let mut obj = GitObjectStore::with_snapshot_dir(scratch.path());
        let mut history = CommitsTable::new();

        add_world_triple(&mut obj, "s1");
        let first = create_commit(&obj, &mut history, vec![], "first", "DGX").unwrap();

        add_world_triple(&mut obj, "s2");
        let parents = vec![first.commit_id.clone()];
        let second = create_commit(&obj, &mut history, parents, "second", "DGX").unwrap();

        assert_eq!(history.len(), 2);
        assert_eq!(second.parent_ids, vec![first.commit_id]);
    }

    #[test]
    fn test_commits_table_to_record_batch() {
        let root = Commit {
            commit_id: "c1".to_string(),
            parent_ids: vec![],
            timestamp_ms: 1000,
            message: "init".to_string(),
            author: "DGX".to_string(),
        };
        let child = Commit {
            commit_id: "c2".to_string(),
            parent_ids: vec!["c1".to_string()],
            timestamp_ms: 2000,
            message: "second".to_string(),
            author: "DGX".to_string(),
        };

        let mut table = CommitsTable::new();
        table.append(root);
        table.append(child);

        let batch = table.to_record_batch().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 5);
    }
}