1use std::path::Path;
9use git2::{Oid, Repository, Signature};
10use serde::{Deserialize, Serialize};
11use chrono::{DateTime, Utc, Datelike};
12use libgrite_core::types::event::Event;
13use libgrite_core::types::ids::ActorId;
14
15use crate::chunk::{encode_chunk, decode_chunk, chunk_hash};
16use crate::GitError;
17
/// Name of the git reference whose target is the newest WAL commit.
pub const WAL_REF: &str = "refs/grite/wal";
20
21#[derive(Debug, Serialize, Deserialize)]
23pub struct WalMeta {
24 pub schema_version: u32,
25 pub actor_id: String,
26 pub chunk_hash: String,
27 pub prev_wal: Option<String>,
28}
29
30#[derive(Debug, Clone)]
32pub struct WalCommit {
33 pub oid: Oid,
34 pub actor_id: String,
35 pub chunk_hash: String,
36 pub prev_wal: Option<Oid>,
37}
38
39pub struct WalManager {
41 repo: Repository,
42}
43
44impl WalManager {
45 pub fn open(git_dir: &Path) -> Result<Self, GitError> {
47 let repo_path = git_dir.parent().ok_or(GitError::NotARepo)?;
49 let repo = Repository::open(repo_path)?;
50 Ok(Self { repo })
51 }
52
53 pub fn head(&self) -> Result<Option<Oid>, GitError> {
55 match self.repo.find_reference(WAL_REF) {
56 Ok(reference) => {
57 let oid = reference.target().ok_or_else(|| {
58 GitError::Wal("WAL ref has no target".to_string())
59 })?;
60 Ok(Some(oid))
61 }
62 Err(e) if e.code() == git2::ErrorCode::NotFound => Ok(None),
63 Err(e) => Err(e.into()),
64 }
65 }
66
67 pub fn append(&self, actor_id: &ActorId, events: &[Event]) -> Result<Oid, GitError> {
69 if events.is_empty() {
70 return Err(GitError::Wal("Cannot append empty events".to_string()));
71 }
72
73 let chunk_data = encode_chunk(events)?;
75 let hash = chunk_hash(&chunk_data);
76 let hash_hex = hex::encode(hash);
77
78 let parent_commit = self.head()?.map(|oid| self.repo.find_commit(oid)).transpose()?;
80 let prev_wal = parent_commit.as_ref().map(|c| c.id());
81
82 let ts = events[0].ts_unix_ms;
84 let dt: DateTime<Utc> = DateTime::from_timestamp_millis(ts as i64)
85 .unwrap_or_else(|| Utc::now());
86 let chunk_path = format!(
87 "events/{:04}/{:02}/{:02}/{}.bin",
88 dt.year(),
89 dt.month(),
90 dt.day(),
91 hash_hex
92 );
93
94 let actor_id_hex = hex::encode(actor_id);
96 let meta = WalMeta {
97 schema_version: 1,
98 actor_id: actor_id_hex.clone(),
99 chunk_hash: hash_hex.clone(),
100 prev_wal: prev_wal.map(|oid| oid.to_string()),
101 };
102 let meta_json = serde_json::to_string_pretty(&meta)?;
103
104 let mut tree_builder = self.repo.treebuilder(None)?;
106
107 let meta_blob = self.repo.blob(meta_json.as_bytes())?;
109 tree_builder.insert("meta.json", meta_blob, 0o100644)?;
110
111 let chunk_blob = self.repo.blob(&chunk_data)?;
114 let tree_oid = self.insert_nested_blob(&mut tree_builder, &chunk_path, chunk_blob)?;
115
116 let tree = self.repo.find_tree(tree_oid)?;
118 let sig = Signature::now("grite", "grit@local")?;
119 let message = format!("WAL: {} events from {}", events.len(), &actor_id_hex[..8]);
120
121 let parents: Vec<&git2::Commit> = parent_commit.as_ref().map(|c| vec![c]).unwrap_or_default();
122 let commit_oid = self.repo.commit(
123 Some(WAL_REF),
124 &sig,
125 &sig,
126 &message,
127 &tree,
128 &parents,
129 )?;
130
131 Ok(commit_oid)
132 }
133
134 pub fn read_all(&self) -> Result<Vec<Event>, GitError> {
136 let head = match self.head()? {
137 Some(oid) => oid,
138 None => return Ok(vec![]),
139 };
140 self.read_since_impl(head, None)
141 }
142
143 pub fn read_since(&self, since_oid: Oid) -> Result<Vec<Event>, GitError> {
145 let head = match self.head()? {
146 Some(oid) => oid,
147 None => return Ok(vec![]),
148 };
149 self.read_since_impl(head, Some(since_oid))
150 }
151
152 pub fn read_from_oid(&self, oid: Oid) -> Result<Vec<Event>, GitError> {
154 self.read_since_impl(oid, None)
155 }
156
157 fn read_since_impl(&self, head: Oid, stop_at: Option<Oid>) -> Result<Vec<Event>, GitError> {
159 let mut all_events = Vec::new();
160 let mut current_oid = Some(head);
161
162 while let Some(oid) = current_oid {
164 if Some(oid) == stop_at {
165 break;
166 }
167
168 let commit = self.repo.find_commit(oid)?;
169 let tree = commit.tree()?;
170
171 let meta_entry = tree.get_name("meta.json")
173 .ok_or_else(|| GitError::Wal("Missing meta.json in WAL commit".to_string()))?;
174 let meta_blob = self.repo.find_blob(meta_entry.id())?;
175 let meta: WalMeta = serde_json::from_slice(meta_blob.content())?;
176
177 let events = self.find_chunk_in_tree(&tree)?;
179 all_events.extend(events);
180
181 current_oid = meta.prev_wal
183 .as_ref()
184 .map(|s| Oid::from_str(s))
185 .transpose()?;
186 }
187
188 all_events.reverse();
190 Ok(all_events)
191 }
192
193 fn find_chunk_in_tree(&self, tree: &git2::Tree) -> Result<Vec<Event>, GitError> {
195 let mut events = Vec::new();
197 self.walk_tree_for_chunks(tree, &mut events)?;
198 Ok(events)
199 }
200
201 fn walk_tree_for_chunks(&self, tree: &git2::Tree, events: &mut Vec<Event>) -> Result<(), GitError> {
203 for entry in tree.iter() {
204 let name = entry.name().unwrap_or("");
205 match entry.kind() {
206 Some(git2::ObjectType::Blob) if name.ends_with(".bin") => {
207 let blob = self.repo.find_blob(entry.id())?;
208 let chunk_events = decode_chunk(blob.content())?;
209 events.extend(chunk_events);
210 }
211 Some(git2::ObjectType::Tree) => {
212 let subtree = self.repo.find_tree(entry.id())?;
213 self.walk_tree_for_chunks(&subtree, events)?;
214 }
215 _ => {}
216 }
217 }
218 Ok(())
219 }
220
221 fn insert_nested_blob(
223 &self,
224 root_builder: &mut git2::TreeBuilder,
225 path: &str,
226 blob_oid: Oid,
227 ) -> Result<Oid, GitError> {
228 let parts: Vec<&str> = path.split('/').collect();
229 if parts.len() == 1 {
230 root_builder.insert(parts[0], blob_oid, 0o100644)?;
232 return Ok(root_builder.write()?);
233 }
234
235 self.insert_nested_recursive(root_builder, &parts, blob_oid)
237 }
238
239 fn insert_nested_recursive(
240 &self,
241 builder: &mut git2::TreeBuilder,
242 parts: &[&str],
243 blob_oid: Oid,
244 ) -> Result<Oid, GitError> {
245 if parts.len() == 1 {
246 builder.insert(parts[0], blob_oid, 0o100644)?;
247 return Ok(builder.write()?);
248 }
249
250 let dir_name = parts[0];
251 let remaining = &parts[1..];
252
253 let existing_tree = builder.get(dir_name)?.map(|e| e.id());
255
256 let mut sub_builder = if let Some(tree_oid) = existing_tree {
257 let tree = self.repo.find_tree(tree_oid)?;
258 self.repo.treebuilder(Some(&tree))?
259 } else {
260 self.repo.treebuilder(None)?
261 };
262
263 self.insert_nested_recursive(&mut sub_builder, remaining, blob_oid)?;
264 let sub_tree_oid = sub_builder.write()?;
265 builder.insert(dir_name, sub_tree_oid, 0o040000)?;
266
267 Ok(builder.write()?)
268 }
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use libgrite_core::hash::compute_event_id;
    use libgrite_core::types::event::EventKind;
    use libgrite_core::types::ids::generate_issue_id;
    use tempfile::TempDir;

    /// Creates a fresh repository in a temporary directory.
    ///
    /// Uses git2 directly so the tests do not depend on a `git` binary being
    /// installed, and so an initialization failure surfaces immediately.
    /// The `TempDir` is returned so the directory outlives the test body.
    fn setup_test_repo() -> (TempDir, Repository) {
        let temp = TempDir::new().unwrap();
        let repo = Repository::init(temp.path()).unwrap();
        (temp, repo)
    }

    /// Builds an event of the given kind for a freshly generated issue, with
    /// a fixed actor and timestamp.
    fn make_test_event(kind: EventKind) -> Event {
        let issue_id = generate_issue_id();
        let actor = [1u8; 16];
        let ts_unix_ms = 1700000000000u64;
        let event_id = compute_event_id(&issue_id, &actor, ts_unix_ms, None, &kind);
        Event::new(event_id, issue_id, actor, ts_unix_ms, None, kind)
    }

    #[test]
    fn test_wal_append_and_read() {
        let (temp, _repo) = setup_test_repo();
        let git_dir = temp.path().join(".git");

        let wal = WalManager::open(&git_dir).unwrap();

        // A new repository has no WAL reference yet.
        assert!(wal.head().unwrap().is_none());

        let event = make_test_event(EventKind::IssueCreated {
            title: "Test".to_string(),
            body: "Body".to_string(),
            labels: vec![],
        });
        let actor = [1u8; 16];

        let oid = wal.append(&actor, &[event.clone()]).unwrap();
        assert!(wal.head().unwrap().is_some());
        assert_eq!(wal.head().unwrap().unwrap(), oid);

        let events = wal.read_all().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id, event.event_id);
    }

    #[test]
    fn test_wal_multiple_appends() {
        let (temp, _repo) = setup_test_repo();
        let git_dir = temp.path().join(".git");

        let wal = WalManager::open(&git_dir).unwrap();
        let actor = [1u8; 16];

        let event1 = make_test_event(EventKind::IssueCreated {
            title: "First".to_string(),
            body: "Body 1".to_string(),
            labels: vec![],
        });
        let oid1 = wal.append(&actor, &[event1.clone()]).unwrap();

        let event2 = make_test_event(EventKind::CommentAdded {
            body: "A comment".to_string(),
        });
        let _oid2 = wal.append(&actor, &[event2.clone()]).unwrap();

        // Full read returns both events in append order.
        let events = wal.read_all().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].event_id, event1.event_id);
        assert_eq!(events[1].event_id, event2.event_id);

        // Reading since the first commit returns only the second event.
        let events_since = wal.read_since(oid1).unwrap();
        assert_eq!(events_since.len(), 1);
        assert_eq!(events_since[0].event_id, event2.event_id);
    }
}