1use crate::object_store::GitObjectStore;
8use arrow::array::{ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray};
9use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
10use nusy_arrow_core::Namespace;
11use nusy_arrow_core::schema::TRIPLES_SCHEMA_VERSION;
12use parquet::arrow::ArrowWriter;
13use parquet::file::properties::WriterProperties;
14use std::fs;
15use std::sync::Arc;
16
17#[derive(Debug, thiserror::Error)]
19pub enum CommitError {
20 #[error("Arrow error: {0}")]
21 Arrow(#[from] arrow::error::ArrowError),
22
23 #[error("Parquet error: {0}")]
24 Parquet(#[from] parquet::errors::ParquetError),
25
26 #[error("IO error: {0}")]
27 Io(#[from] std::io::Error),
28
29 #[error("Commit not found: {0}")]
30 NotFound(String),
31}
32
33pub type Result<T> = std::result::Result<T, CommitError>;
34
35pub fn commits_schema() -> Schema {
37 Schema::new(vec![
38 Field::new("commit_id", DataType::Utf8, false),
39 Field::new(
40 "parent_ids",
41 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
42 false,
43 ),
44 Field::new(
45 "timestamp",
46 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
47 false,
48 ),
49 Field::new("message", DataType::Utf8, false),
50 Field::new("author", DataType::Utf8, false),
51 ])
52}
53
/// One entry of the commit history.
#[derive(Debug, Clone)]
pub struct Commit {
    /// Identifier of this commit; `CommitsTable::get` looks commits up by it.
    pub commit_id: String,
    /// Identifiers of the parent commits; empty when there is no parent.
    pub parent_ids: Vec<String>,
    /// Commit time as a millisecond timestamp (exported as a UTC
    /// millisecond column by `CommitsTable::to_record_batch`).
    pub timestamp_ms: i64,
    /// Free-form commit message.
    pub message: String,
    /// Name of whoever created the commit.
    pub author: String,
}
63
64pub struct CommitsTable {
66 schema: Arc<Schema>,
67 commits: Vec<Commit>,
68}
69
70impl CommitsTable {
71 pub fn new() -> Self {
72 CommitsTable {
73 schema: Arc::new(commits_schema()),
74 commits: Vec::new(),
75 }
76 }
77
78 pub fn append(&mut self, commit: Commit) {
80 self.commits.push(commit);
81 }
82
83 pub fn get(&self, commit_id: &str) -> Option<&Commit> {
85 self.commits.iter().find(|c| c.commit_id == commit_id)
86 }
87
88 pub fn all(&self) -> &[Commit] {
90 &self.commits
91 }
92
93 pub fn len(&self) -> usize {
95 self.commits.len()
96 }
97
98 pub fn is_empty(&self) -> bool {
99 self.commits.is_empty()
100 }
101
102 pub fn to_record_batch(&self) -> Result<RecordBatch> {
104 let n = self.commits.len();
105 if n == 0 {
106 return Ok(RecordBatch::new_empty(self.schema.clone()));
107 }
108
109 let ids: Vec<&str> = self.commits.iter().map(|c| c.commit_id.as_str()).collect();
110 let timestamps: Vec<i64> = self.commits.iter().map(|c| c.timestamp_ms).collect();
111 let messages: Vec<&str> = self.commits.iter().map(|c| c.message.as_str()).collect();
112 let authors: Vec<&str> = self.commits.iter().map(|c| c.author.as_str()).collect();
113
114 let parent_ids_list = build_parent_ids_list(&self.commits);
116
117 let batch = RecordBatch::try_new(
118 self.schema.clone(),
119 vec![
120 Arc::new(StringArray::from(ids)),
121 parent_ids_list,
122 Arc::new(TimestampMillisecondArray::from(timestamps).with_timezone("UTC")),
123 Arc::new(StringArray::from(messages)),
124 Arc::new(StringArray::from(authors)),
125 ],
126 )?;
127 Ok(batch)
128 }
129}
130
131impl Default for CommitsTable {
132 fn default() -> Self {
133 Self::new()
134 }
135}
136
137fn build_parent_ids_list(commits: &[Commit]) -> ArrayRef {
139 use arrow::array::ListBuilder;
140 use arrow::array::StringBuilder;
141
142 let mut builder = ListBuilder::new(StringBuilder::new());
143 for commit in commits {
144 for pid in &commit.parent_ids {
145 builder.values().append_value(pid);
146 }
147 builder.append(true);
148 }
149 Arc::new(builder.finish())
150}
151
152pub fn create_commit(
156 obj_store: &GitObjectStore,
157 commits_table: &mut CommitsTable,
158 parent_ids: Vec<String>,
159 message: &str,
160 author: &str,
161) -> Result<Commit> {
162 let commit_id = uuid::Uuid::new_v4().to_string();
163 let now_ms = chrono::Utc::now().timestamp_millis();
164
165 let snap_dir = obj_store.commit_snapshot_dir(&commit_id);
167 fs::create_dir_all(&snap_dir)?;
168
169 for ns in Namespace::ALL {
171 let batches = obj_store.store.get_namespace_batches(ns);
172 if batches.is_empty() {
173 continue;
174 }
175
176 let path = obj_store.namespace_parquet_path(&commit_id, ns.as_str());
177 let schema = obj_store.store.schema().clone();
178 let file = fs::File::create(&path)?;
179 let props = WriterProperties::builder()
180 .set_key_value_metadata(Some(vec![parquet::file::metadata::KeyValue {
181 key: "nusy_schema_version".to_string(),
182 value: Some(TRIPLES_SCHEMA_VERSION.to_string()),
183 }]))
184 .build();
185 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
186
187 for batch in batches {
188 writer.write(batch)?;
189 }
190 writer.close()?;
191 }
192
193 let commit = Commit {
194 commit_id,
195 parent_ids,
196 timestamp_ms: now_ms,
197 message: message.to_string(),
198 author: author.to_string(),
199 };
200
201 commits_table.append(commit.clone());
202 Ok(commit)
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, ListArray};
    use nusy_arrow_core::{Namespace, Triple, YLayer};

    /// Builds a minimal `rdf:type Thing` triple for `subj` with all optional
    /// provenance fields left unset.
    fn sample_triple(subj: &str) -> Triple {
        Triple {
            subject: subj.to_string(),
            predicate: "rdf:type".to_string(),
            object: "Thing".to_string(),
            graph: None,
            confidence: Some(0.9),
            source_document: None,
            source_chunk_id: None,
            extracted_by: None,
            caused_by: None,
            derived_from: None,
            consolidated_at: None,
            certifiability_class: None,
        }
    }

    /// A commit over a populated namespace writes that namespace's Parquet
    /// file and registers the commit in the table.
    #[test]
    fn test_commit_creates_parquet_files() {
        let tmp = tempfile::tempdir().unwrap();
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
        let mut commits = CommitsTable::new();

        for i in 0..10 {
            obj.store
                .add_triple(
                    &sample_triple(&format!("s{i}")),
                    Namespace::World,
                    YLayer::Semantic,
                )
                .unwrap();
        }

        let commit = create_commit(&obj, &mut commits, vec![], "initial", "DGX").unwrap();

        let parquet_path = obj.namespace_parquet_path(&commit.commit_id, "world");
        assert!(parquet_path.exists(), "Parquet file should exist");

        assert_eq!(commits.len(), 1);
        assert!(!commits.is_empty());

        let stored = commits.get(&commit.commit_id).unwrap();
        assert_eq!(stored.message, "initial");
        assert_eq!(stored.author, "DGX");
        assert!(stored.parent_ids.is_empty());
        assert_eq!(stored.timestamp_ms, commit.timestamp_ms);

        // Lookups for ids that were never appended must miss.
        assert!(commits.get("no-such-commit").is_none());
    }

    /// Consecutive commits get distinct ids, keep insertion order, and the
    /// second one records the first as its parent.
    #[test]
    fn test_multiple_commits_form_chain() {
        let tmp = tempfile::tempdir().unwrap();
        let mut obj = GitObjectStore::with_snapshot_dir(tmp.path());
        let mut commits = CommitsTable::new();

        obj.store
            .add_triple(&sample_triple("s1"), Namespace::World, YLayer::Semantic)
            .unwrap();

        let c1 = create_commit(&obj, &mut commits, vec![], "first", "DGX").unwrap();

        obj.store
            .add_triple(&sample_triple("s2"), Namespace::World, YLayer::Semantic)
            .unwrap();

        let c2 = create_commit(
            &obj,
            &mut commits,
            vec![c1.commit_id.clone()],
            "second",
            "DGX",
        )
        .unwrap();

        assert_eq!(commits.len(), 2);
        assert!(c1.parent_ids.is_empty());
        assert_ne!(c1.commit_id, c2.commit_id);

        let history: Vec<&str> = commits.all().iter().map(|c| c.message.as_str()).collect();
        assert_eq!(history, ["first", "second"]);

        // Each commit owns its own snapshot file.
        assert!(obj.namespace_parquet_path(&c1.commit_id, "world").exists());
        assert!(obj.namespace_parquet_path(&c2.commit_id, "world").exists());

        assert_eq!(c2.parent_ids, vec![c1.commit_id]);
    }

    /// An empty table still exports a well-formed, zero-row batch.
    #[test]
    fn test_empty_commits_table_to_record_batch() {
        let table = CommitsTable::default();
        assert!(table.is_empty());

        let batch = table.to_record_batch().unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.num_columns(), 5);
        assert_eq!(batch.schema().as_ref(), &commits_schema());
    }

    /// The exported batch matches the declared schema and carries the commit
    /// data in the expected columns.
    #[test]
    fn test_commits_table_to_record_batch() {
        let mut table = CommitsTable::new();
        table.append(Commit {
            commit_id: "c1".to_string(),
            parent_ids: vec![],
            timestamp_ms: 1000,
            message: "init".to_string(),
            author: "DGX".to_string(),
        });
        table.append(Commit {
            commit_id: "c2".to_string(),
            parent_ids: vec!["c1".to_string()],
            timestamp_ms: 2000,
            message: "second".to_string(),
            author: "DGX".to_string(),
        });

        let batch = table.to_record_batch().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 5);
        assert_eq!(batch.schema().as_ref(), &commits_schema());

        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("commit_id should be a Utf8 column");
        assert_eq!(ids.value(0), "c1");
        assert_eq!(ids.value(1), "c2");

        let parents = batch
            .column(1)
            .as_any()
            .downcast_ref::<ListArray>()
            .expect("parent_ids should be a List column");
        assert_eq!(parents.null_count(), 0);
        assert_eq!(parents.value_length(0), 0);
        assert_eq!(parents.value_length(1), 1);
        let second_parents = parents.value(1);
        let second_parents = second_parents
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("parent_ids items should be Utf8");
        assert_eq!(second_parents.value(0), "c1");

        let timestamps = batch
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("timestamp should be a millisecond timestamp column");
        assert_eq!(timestamps.value(0), 1000);
        assert_eq!(timestamps.value(1), 2000);

        let messages = batch
            .column(3)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("message should be a Utf8 column");
        assert_eq!(messages.value(0), "init");
        assert_eq!(messages.value(1), "second");

        let authors = batch
            .column(4)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("author should be a Utf8 column");
        assert_eq!(authors.value(0), "DGX");
        assert_eq!(authors.value(1), "DGX");
    }
}