// arrow_graph_git/commit.rs
1//! Commit — snapshot current graph state to Parquet + record in CommitsTable.
2//!
3//! A commit captures:
4//! - All namespace RecordBatches as Parquet files
5//! - A CommitsTable row with commit_id, parents, message, author, timestamp
6
7use crate::object_store::GitObjectStore;
8use arrow::array::{ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray};
9use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
10use arrow_graph_core::schema::TRIPLES_SCHEMA_VERSION;
11use parquet::arrow::ArrowWriter;
12use parquet::file::properties::WriterProperties;
13use std::fs;
14use std::sync::Arc;
15
/// Errors from commit operations.
#[derive(Debug, thiserror::Error)]
pub enum CommitError {
    /// Propagated from Arrow RecordBatch construction.
    #[error("Arrow error: {0}")]
    Arrow(#[from] arrow::error::ArrowError),

    /// Propagated from the Parquet writer.
    #[error("Parquet error: {0}")]
    Parquet(#[from] parquet::errors::ParquetError),

    /// Filesystem failures (snapshot directory or file creation/write).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Requested commit id is not present in the commits table.
    #[error("Commit not found: {0}")]
    NotFound(String),
}

/// Convenience alias: all commit operations fail with [`CommitError`].
pub type Result<T> = std::result::Result<T, CommitError>;
33
34/// Schema for the Commits table.
35pub fn commits_schema() -> Schema {
36    Schema::new(vec![
37        Field::new("commit_id", DataType::Utf8, false),
38        Field::new(
39            "parent_ids",
40            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
41            false,
42        ),
43        Field::new(
44            "timestamp",
45            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
46            false,
47        ),
48        Field::new("message", DataType::Utf8, false),
49        Field::new("author", DataType::Utf8, false),
50    ])
51}
52
/// A commit record.
#[derive(Debug, Clone)]
pub struct Commit {
    // Unique id for this commit (UUIDv4 string when created via `create_commit`).
    pub commit_id: String,
    // Ids of parent commits; empty for a root commit, multiple for a merge.
    pub parent_ids: Vec<String>,
    // Commit creation time, milliseconds since the Unix epoch (UTC).
    pub timestamp_ms: i64,
    // Human-readable commit message.
    pub message: String,
    // Author identifier (free-form string).
    pub author: String,
}
62
/// The commits history table.
pub struct CommitsTable {
    // Arrow schema for `to_record_batch`; built once from `commits_schema()`.
    schema: Arc<Schema>,
    // Commit records in insertion order (insertion order == chronological).
    commits: Vec<Commit>,
}
68
69impl CommitsTable {
70    pub fn new() -> Self {
71        CommitsTable {
72            schema: Arc::new(commits_schema()),
73            commits: Vec::new(),
74        }
75    }
76
77    /// Append a commit record.
78    pub fn append(&mut self, commit: Commit) {
79        self.commits.push(commit);
80    }
81
82    /// Get a commit by ID.
83    pub fn get(&self, commit_id: &str) -> Option<&Commit> {
84        self.commits.iter().find(|c| c.commit_id == commit_id)
85    }
86
87    /// Get all commits (ordered by insertion = chronological).
88    pub fn all(&self) -> &[Commit] {
89        &self.commits
90    }
91
92    /// Number of commits.
93    pub fn len(&self) -> usize {
94        self.commits.len()
95    }
96
97    pub fn is_empty(&self) -> bool {
98        self.commits.is_empty()
99    }
100
101    /// Convert to Arrow RecordBatch.
102    pub fn to_record_batch(&self) -> Result<RecordBatch> {
103        let n = self.commits.len();
104        if n == 0 {
105            return Ok(RecordBatch::new_empty(self.schema.clone()));
106        }
107
108        let ids: Vec<&str> = self.commits.iter().map(|c| c.commit_id.as_str()).collect();
109        let timestamps: Vec<i64> = self.commits.iter().map(|c| c.timestamp_ms).collect();
110        let messages: Vec<&str> = self.commits.iter().map(|c| c.message.as_str()).collect();
111        let authors: Vec<&str> = self.commits.iter().map(|c| c.author.as_str()).collect();
112
113        // Build parent_ids as List<Utf8>
114        let parent_ids_list = build_parent_ids_list(&self.commits);
115
116        let batch = RecordBatch::try_new(
117            self.schema.clone(),
118            vec![
119                Arc::new(StringArray::from(ids)),
120                parent_ids_list,
121                Arc::new(TimestampMillisecondArray::from(timestamps).with_timezone("UTC")),
122                Arc::new(StringArray::from(messages)),
123                Arc::new(StringArray::from(authors)),
124            ],
125        )?;
126        Ok(batch)
127    }
128}
129
130impl Default for CommitsTable {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136/// Build a ListArray of parent_ids from commits.
137fn build_parent_ids_list(commits: &[Commit]) -> ArrayRef {
138    use arrow::array::ListBuilder;
139    use arrow::array::StringBuilder;
140
141    let mut builder = ListBuilder::new(StringBuilder::new());
142    for commit in commits {
143        for pid in &commit.parent_ids {
144            builder.values().append_value(pid);
145        }
146        builder.append(true);
147    }
148    Arc::new(builder.finish())
149}
150
151/// Create a commit: snapshot all namespaces to Parquet, record in CommitsTable.
152///
153/// Returns the new Commit.
154pub fn create_commit(
155    obj_store: &GitObjectStore,
156    commits_table: &mut CommitsTable,
157    parent_ids: Vec<String>,
158    message: &str,
159    author: &str,
160) -> Result<Commit> {
161    let commit_id = uuid::Uuid::new_v4().to_string();
162    let now_ms = chrono::Utc::now().timestamp_millis();
163
164    // Create snapshot directory
165    let snap_dir = obj_store.commit_snapshot_dir(&commit_id);
166    fs::create_dir_all(&snap_dir)?;
167
168    // Write each namespace to Parquet
169    for ns in obj_store.store.namespaces() {
170        let batches = obj_store.store.get_namespace_batches(ns);
171        if batches.is_empty() {
172            continue;
173        }
174
175        let path = obj_store.namespace_parquet_path(&commit_id, ns);
176        let schema = obj_store.store.schema().clone();
177        let file = fs::File::create(&path)?;
178        let props = WriterProperties::builder()
179            .set_key_value_metadata(Some(vec![parquet::format::KeyValue {
180                key: "schema_version".to_string(),
181                value: Some(TRIPLES_SCHEMA_VERSION.to_string()),
182            }]))
183            .build();
184        let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
185
186        for batch in batches {
187            writer.write(batch)?;
188        }
189        writer.close()?;
190    }
191
192    let commit = Commit {
193        commit_id,
194        parent_ids,
195        timestamp_ms: now_ms,
196        message: message.to_string(),
197        author: author.to_string(),
198    };
199
200    commits_table.append(commit.clone());
201    Ok(commit)
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_graph_core::Triple;

    /// Minimal fixture: `subj rdf:type Thing` at 0.9 confidence, no provenance.
    fn sample_triple(subj: &str) -> Triple {
        Triple {
            subject: subj.to_string(),
            predicate: "rdf:type".to_string(),
            object: "Thing".to_string(),
            graph: None,
            confidence: Some(0.9),
            source_document: None,
            source_chunk_id: None,
            extracted_by: None,
            caused_by: None,
            derived_from: None,
            consolidated_at: None,
        }
    }

    #[test]
    fn test_commit_creates_parquet_files() {
        let dir = tempfile::tempdir().unwrap();
        let mut obj = GitObjectStore::with_snapshot_dir(dir.path());
        let mut history = CommitsTable::new();

        // Populate the "world" namespace with a handful of triples.
        for i in 0..10 {
            let triple = sample_triple(&format!("s{i}"));
            obj.store.add_triple(&triple, "world", Some(1u8)).unwrap();
        }

        let commit = create_commit(&obj, &mut history, vec![], "initial", "test").unwrap();

        // The snapshot must contain a Parquet file for the world namespace.
        let parquet_path = obj.namespace_parquet_path(&commit.commit_id, "world");
        assert!(parquet_path.exists(), "Parquet file should exist");

        // And the history table must record exactly this commit.
        assert_eq!(history.len(), 1);
        assert_eq!(history.get(&commit.commit_id).unwrap().message, "initial");
    }

    #[test]
    fn test_multiple_commits_form_chain() {
        let dir = tempfile::tempdir().unwrap();
        let mut obj = GitObjectStore::with_snapshot_dir(dir.path());
        let mut history = CommitsTable::new();

        obj.store
            .add_triple(&sample_triple("s1"), "world", Some(1u8))
            .unwrap();
        let first = create_commit(&obj, &mut history, vec![], "first", "test").unwrap();

        obj.store
            .add_triple(&sample_triple("s2"), "world", Some(1u8))
            .unwrap();
        let parents = vec![first.commit_id.clone()];
        let second = create_commit(&obj, &mut history, parents, "second", "test").unwrap();

        // Two commits, the second pointing at the first.
        assert_eq!(history.len(), 2);
        assert_eq!(second.parent_ids, vec![first.commit_id]);
    }

    #[test]
    fn test_commits_table_to_record_batch() {
        let mut table = CommitsTable::new();
        table.append(Commit {
            commit_id: "c1".to_string(),
            parent_ids: vec![],
            timestamp_ms: 1000,
            message: "init".to_string(),
            author: "test".to_string(),
        });
        table.append(Commit {
            commit_id: "c2".to_string(),
            parent_ids: vec!["c1".to_string()],
            timestamp_ms: 2000,
            message: "second".to_string(),
            author: "test".to_string(),
        });

        let batch = table.to_record_batch().unwrap();
        // Two rows, five columns per commits_schema().
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 5);
    }
}