1use crate::object_store::GitObjectStore;
8use arrow::array::{ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray};
9use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
10use arrow_graph_core::schema::TRIPLES_SCHEMA_VERSION;
11use parquet::arrow::ArrowWriter;
12use parquet::file::properties::WriterProperties;
13use std::fs;
14use std::sync::Arc;
15
/// Errors that can occur while creating or materializing commits.
///
/// All lower-level errors (Arrow, Parquet, IO) are wrapped via `#[from]`
/// so `?` propagation works throughout this module.
#[derive(Debug, thiserror::Error)]
pub enum CommitError {
    /// An Arrow array/record-batch construction failure.
    #[error("Arrow error: {0}")]
    Arrow(#[from] arrow::error::ArrowError),

    /// A Parquet encoding or writer failure.
    #[error("Parquet error: {0}")]
    Parquet(#[from] parquet::errors::ParquetError),

    /// A filesystem failure (snapshot dir or file creation/write).
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Lookup of a commit id that is not present. The payload is the id.
    #[error("Commit not found: {0}")]
    NotFound(String),
}
31
32pub type Result<T> = std::result::Result<T, CommitError>;
33
34pub fn commits_schema() -> Schema {
36 Schema::new(vec![
37 Field::new("commit_id", DataType::Utf8, false),
38 Field::new(
39 "parent_ids",
40 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
41 false,
42 ),
43 Field::new(
44 "timestamp",
45 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
46 false,
47 ),
48 Field::new("message", DataType::Utf8, false),
49 Field::new("author", DataType::Utf8, false),
50 ])
51}
52
/// Metadata for a single commit; one row of the commits table.
#[derive(Debug, Clone)]
pub struct Commit {
    /// Unique identifier (a UUIDv4 string — see `create_commit`).
    pub commit_id: String,
    /// Ids of parent commits; empty for a root commit, >1 for a merge.
    pub parent_ids: Vec<String>,
    /// Creation time as UTC milliseconds since the Unix epoch.
    pub timestamp_ms: i64,
    /// Human-readable commit message.
    pub message: String,
    /// Author identifier as provided by the caller.
    pub author: String,
}
62
/// In-memory, append-only table of commits, convertible to an Arrow
/// `RecordBatch` via [`CommitsTable::to_record_batch`].
pub struct CommitsTable {
    // Cached Arc of `commits_schema()` so batch conversion doesn't rebuild it.
    schema: Arc<Schema>,
    // Commits in insertion (append) order.
    commits: Vec<Commit>,
}
68
69impl CommitsTable {
70 pub fn new() -> Self {
71 CommitsTable {
72 schema: Arc::new(commits_schema()),
73 commits: Vec::new(),
74 }
75 }
76
77 pub fn append(&mut self, commit: Commit) {
79 self.commits.push(commit);
80 }
81
82 pub fn get(&self, commit_id: &str) -> Option<&Commit> {
84 self.commits.iter().find(|c| c.commit_id == commit_id)
85 }
86
87 pub fn all(&self) -> &[Commit] {
89 &self.commits
90 }
91
92 pub fn len(&self) -> usize {
94 self.commits.len()
95 }
96
97 pub fn is_empty(&self) -> bool {
98 self.commits.is_empty()
99 }
100
101 pub fn to_record_batch(&self) -> Result<RecordBatch> {
103 let n = self.commits.len();
104 if n == 0 {
105 return Ok(RecordBatch::new_empty(self.schema.clone()));
106 }
107
108 let ids: Vec<&str> = self.commits.iter().map(|c| c.commit_id.as_str()).collect();
109 let timestamps: Vec<i64> = self.commits.iter().map(|c| c.timestamp_ms).collect();
110 let messages: Vec<&str> = self.commits.iter().map(|c| c.message.as_str()).collect();
111 let authors: Vec<&str> = self.commits.iter().map(|c| c.author.as_str()).collect();
112
113 let parent_ids_list = build_parent_ids_list(&self.commits);
115
116 let batch = RecordBatch::try_new(
117 self.schema.clone(),
118 vec![
119 Arc::new(StringArray::from(ids)),
120 parent_ids_list,
121 Arc::new(TimestampMillisecondArray::from(timestamps).with_timezone("UTC")),
122 Arc::new(StringArray::from(messages)),
123 Arc::new(StringArray::from(authors)),
124 ],
125 )?;
126 Ok(batch)
127 }
128}
129
130impl Default for CommitsTable {
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
136fn build_parent_ids_list(commits: &[Commit]) -> ArrayRef {
138 use arrow::array::ListBuilder;
139 use arrow::array::StringBuilder;
140
141 let mut builder = ListBuilder::new(StringBuilder::new());
142 for commit in commits {
143 for pid in &commit.parent_ids {
144 builder.values().append_value(pid);
145 }
146 builder.append(true);
147 }
148 Arc::new(builder.finish())
149}
150
151pub fn create_commit(
155 obj_store: &GitObjectStore,
156 commits_table: &mut CommitsTable,
157 parent_ids: Vec<String>,
158 message: &str,
159 author: &str,
160) -> Result<Commit> {
161 let commit_id = uuid::Uuid::new_v4().to_string();
162 let now_ms = chrono::Utc::now().timestamp_millis();
163
164 let snap_dir = obj_store.commit_snapshot_dir(&commit_id);
166 fs::create_dir_all(&snap_dir)?;
167
168 for ns in obj_store.store.namespaces() {
170 let batches = obj_store.store.get_namespace_batches(ns);
171 if batches.is_empty() {
172 continue;
173 }
174
175 let path = obj_store.namespace_parquet_path(&commit_id, ns);
176 let schema = obj_store.store.schema().clone();
177 let file = fs::File::create(&path)?;
178 let props = WriterProperties::builder()
179 .set_key_value_metadata(Some(vec![parquet::format::KeyValue {
180 key: "schema_version".to_string(),
181 value: Some(TRIPLES_SCHEMA_VERSION.to_string()),
182 }]))
183 .build();
184 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
185
186 for batch in batches {
187 writer.write(batch)?;
188 }
189 writer.close()?;
190 }
191
192 let commit = Commit {
193 commit_id,
194 parent_ids,
195 timestamp_ms: now_ms,
196 message: message.to_string(),
197 author: author.to_string(),
198 };
199
200 commits_table.append(commit.clone());
201 Ok(commit)
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_graph_core::Triple;

    // Builds a minimal triple with the given subject; all optional
    // provenance fields are left unset.
    fn sample_triple(subj: &str) -> Triple {
        Triple {
            subject: subj.to_string(),
            predicate: "rdf:type".to_string(),
            object: "Thing".to_string(),
            graph: None,
            confidence: Some(0.9),
            source_document: None,
            source_chunk_id: None,
            extracted_by: None,
            caused_by: None,
            derived_from: None,
            consolidated_at: None,
        }
    }

    // A commit over a populated namespace should write a Parquet snapshot
    // file and register the commit in the table.
    #[test]
    fn test_commit_creates_parquet_files() {
        let tmp = tempfile::tempdir().unwrap();
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
        let mut commits = CommitsTable::new();

        // Populate the "world" namespace with a handful of triples.
        for i in 0..10 {
            obj.store
                .add_triple(&sample_triple(&format!("s{i}")), "world", Some(1u8))
                .unwrap();
        }

        let commit = create_commit(&obj, &mut commits, vec![], "initial", "test").unwrap();

        // Snapshot file for the namespace must exist on disk.
        let parquet_path = obj.namespace_parquet_path(&commit.commit_id, "world");
        assert!(parquet_path.exists(), "Parquet file should exist");

        // Commit metadata must be retrievable from the table.
        assert_eq!(commits.len(), 1);
        assert_eq!(commits.get(&commit.commit_id).unwrap().message, "initial");
    }

    // Two sequential commits should be linked parent -> child via
    // `parent_ids`.
    #[test]
    fn test_multiple_commits_form_chain() {
        let tmp = tempfile::tempdir().unwrap();
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
        let mut commits = CommitsTable::new();

        obj.store
            .add_triple(&sample_triple("s1"), "world", Some(1u8))
            .unwrap();

        // Root commit: no parents.
        let c1 = create_commit(&obj, &mut commits, vec![], "first", "test").unwrap();

        obj.store
            .add_triple(&sample_triple("s2"), "world", Some(1u8))
            .unwrap();

        // Second commit points back at the first.
        let c2 = create_commit(
            &obj,
            &mut commits,
            vec![c1.commit_id.clone()],
            "second",
            "test",
        )
        .unwrap();

        assert_eq!(commits.len(), 2);
        assert_eq!(c2.parent_ids, vec![c1.commit_id]);
    }

    // Round-trips the in-memory table into an Arrow RecordBatch and
    // checks row/column counts against the 5-column schema.
    #[test]
    fn test_commits_table_to_record_batch() {
        let mut table = CommitsTable::new();
        table.append(Commit {
            commit_id: "c1".to_string(),
            parent_ids: vec![],
            timestamp_ms: 1000,
            message: "init".to_string(),
            author: "test".to_string(),
        });
        table.append(Commit {
            commit_id: "c2".to_string(),
            parent_ids: vec!["c1".to_string()],
            timestamp_ms: 2000,
            message: "second".to_string(),
            author: "test".to_string(),
        });

        let batch = table.to_record_batch().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 5);
    }
}