1use chrono::{DateTime, Datelike, Utc};
9use git2::{Oid, Repository, Signature};
10use libgrite_core::types::event::Event;
11use libgrite_core::types::ids::ActorId;
12use serde::{Deserialize, Serialize};
13use std::path::Path;
14
15use crate::chunk::{chunk_hash, decode_chunk, encode_chunk};
16use crate::GitError;
17
/// Name of the Git reference that points at the newest WAL commit.
///
/// `WalManager::head` resolves this reference and `WalManager::append`
/// advances it with every new commit.
pub const WAL_REF: &str = "refs/grite/wal";
20
/// Metadata stored as `meta.json` at the root of every WAL commit tree.
///
/// Field order is part of the serialized JSON layout, so keep it stable.
#[derive(Debug, Serialize, Deserialize)]
pub struct WalMeta {
    /// Version of this metadata layout; `append` currently writes `1`.
    pub schema_version: u32,
    /// Hex encoding of the actor id that appended the commit.
    pub actor_id: String,
    /// Hex encoding of the hash of the chunk stored in the same commit.
    pub chunk_hash: String,
    /// OID (as a string) of the WAL commit that preceded this one, or `None`
    /// for the first commit in the chain. Readers follow this link, not the
    /// Git parent, when walking the WAL backwards.
    pub prev_wal: Option<String>,
}
29
/// In-memory description of a single WAL commit.
///
/// NOTE(review): nothing in the visible part of this file constructs this
/// type; the field meanings below mirror `WalMeta` and should be confirmed
/// against the code that produces it.
#[derive(Debug, Clone)]
pub struct WalCommit {
    /// OID of the commit itself.
    pub oid: Oid,
    /// Actor id associated with the commit (presumably hex-encoded, as in `WalMeta`).
    pub actor_id: String,
    /// Hash of the commit's chunk (presumably hex-encoded, as in `WalMeta`).
    pub chunk_hash: String,
    /// OID of the preceding WAL commit, if any.
    pub prev_wal: Option<Oid>,
}
38
/// Appends event chunks to, and reads them back from, the commit chain
/// referenced by `WAL_REF` in a Git repository.
pub struct WalManager {
    /// Repository handle used for every object and reference operation.
    repo: Repository,
}
43
44impl WalManager {
45 pub fn open(git_dir: &Path) -> Result<Self, GitError> {
47 let repo_path = git_dir.parent().ok_or(GitError::NotARepo)?;
49 let repo = Repository::open(repo_path)?;
50 Ok(Self { repo })
51 }
52
53 pub fn head(&self) -> Result<Option<Oid>, GitError> {
55 match self.repo.find_reference(WAL_REF) {
56 Ok(reference) => {
57 let oid = reference
58 .target()
59 .ok_or_else(|| GitError::Wal("WAL ref has no target".to_string()))?;
60 Ok(Some(oid))
61 }
62 Err(e) if e.code() == git2::ErrorCode::NotFound => Ok(None),
63 Err(e) => Err(e.into()),
64 }
65 }
66
67 pub fn append(&self, actor_id: &ActorId, events: &[Event]) -> Result<Oid, GitError> {
69 if events.is_empty() {
70 return Err(GitError::Wal("Cannot append empty events".to_string()));
71 }
72
73 let chunk_data = encode_chunk(events)?;
75 let hash = chunk_hash(&chunk_data);
76 let hash_hex = hex::encode(hash);
77
78 let parent_commit = self
80 .head()?
81 .map(|oid| self.repo.find_commit(oid))
82 .transpose()?;
83 let prev_wal = parent_commit.as_ref().map(|c| c.id());
84
85 let ts = events[0].ts_unix_ms;
87 let dt: DateTime<Utc> = DateTime::from_timestamp_millis(ts as i64).unwrap_or_else(Utc::now);
88 let chunk_path = format!(
89 "events/{:04}/{:02}/{:02}/{}.bin",
90 dt.year(),
91 dt.month(),
92 dt.day(),
93 hash_hex
94 );
95
96 let actor_id_hex = hex::encode(actor_id);
98 let meta = WalMeta {
99 schema_version: 1,
100 actor_id: actor_id_hex.clone(),
101 chunk_hash: hash_hex.clone(),
102 prev_wal: prev_wal.map(|oid| oid.to_string()),
103 };
104 let meta_json = serde_json::to_string_pretty(&meta)?;
105
106 let mut tree_builder = self.repo.treebuilder(None)?;
108
109 let meta_blob = self.repo.blob(meta_json.as_bytes())?;
111 tree_builder.insert("meta.json", meta_blob, 0o100644)?;
112
113 let chunk_blob = self.repo.blob(&chunk_data)?;
116 let tree_oid = self.insert_nested_blob(&mut tree_builder, &chunk_path, chunk_blob)?;
117
118 let tree = self.repo.find_tree(tree_oid)?;
120 let sig = Signature::now("grite", "grit@local")?;
121 let message = format!("WAL: {} events from {}", events.len(), &actor_id_hex[..8]);
122
123 let parents: Vec<&git2::Commit> =
124 parent_commit.as_ref().map(|c| vec![c]).unwrap_or_default();
125 let commit_oid = self
126 .repo
127 .commit(Some(WAL_REF), &sig, &sig, &message, &tree, &parents)?;
128
129 Ok(commit_oid)
130 }
131
132 pub fn read_all(&self) -> Result<Vec<Event>, GitError> {
134 let head = match self.head()? {
135 Some(oid) => oid,
136 None => return Ok(vec![]),
137 };
138 self.read_since_impl(head, None)
139 }
140
141 pub fn read_since(&self, since_oid: Oid) -> Result<Vec<Event>, GitError> {
143 let head = match self.head()? {
144 Some(oid) => oid,
145 None => return Ok(vec![]),
146 };
147 self.read_since_impl(head, Some(since_oid))
148 }
149
    /// Reads every event reachable from the WAL commit `oid`, walking the
    /// chain back to its first commit.
    ///
    /// Unlike `read_all`, the starting point is the given commit rather than
    /// the current target of `WAL_REF`.
    ///
    /// # Errors
    ///
    /// Forwards any failure raised while reading the commits.
    pub fn read_from_oid(&self, oid: Oid) -> Result<Vec<Event>, GitError> {
        self.read_since_impl(oid, None)
    }
154
155 fn read_since_impl(&self, head: Oid, stop_at: Option<Oid>) -> Result<Vec<Event>, GitError> {
157 let mut all_events = Vec::new();
158 let mut current_oid = Some(head);
159
160 while let Some(oid) = current_oid {
162 if Some(oid) == stop_at {
163 break;
164 }
165
166 let commit = self.repo.find_commit(oid)?;
167 let tree = commit.tree()?;
168
169 let meta_entry = tree
171 .get_name("meta.json")
172 .ok_or_else(|| GitError::Wal("Missing meta.json in WAL commit".to_string()))?;
173 let meta_blob = self.repo.find_blob(meta_entry.id())?;
174 let meta: WalMeta = serde_json::from_slice(meta_blob.content())?;
175
176 let events = self.find_chunk_in_tree(&tree)?;
178 all_events.extend(events);
179
180 current_oid = meta
182 .prev_wal
183 .as_ref()
184 .map(|s| Oid::from_str(s))
185 .transpose()?;
186 }
187
188 all_events.reverse();
190 Ok(all_events)
191 }
192
193 fn find_chunk_in_tree(&self, tree: &git2::Tree) -> Result<Vec<Event>, GitError> {
195 let mut events = Vec::new();
197 self.walk_tree_for_chunks(tree, &mut events)?;
198 Ok(events)
199 }
200
201 fn walk_tree_for_chunks(
203 &self,
204 tree: &git2::Tree,
205 events: &mut Vec<Event>,
206 ) -> Result<(), GitError> {
207 for entry in tree.iter() {
208 let name = entry.name().unwrap_or("");
209 match entry.kind() {
210 Some(git2::ObjectType::Blob) if name.ends_with(".bin") => {
211 let blob = self.repo.find_blob(entry.id())?;
212 let chunk_events = decode_chunk(blob.content())?;
213 events.extend(chunk_events);
214 }
215 Some(git2::ObjectType::Tree) => {
216 let subtree = self.repo.find_tree(entry.id())?;
217 self.walk_tree_for_chunks(&subtree, events)?;
218 }
219 _ => {}
220 }
221 }
222 Ok(())
223 }
224
225 fn insert_nested_blob(
227 &self,
228 root_builder: &mut git2::TreeBuilder,
229 path: &str,
230 blob_oid: Oid,
231 ) -> Result<Oid, GitError> {
232 let parts: Vec<&str> = path.split('/').collect();
233 if parts.len() == 1 {
234 root_builder.insert(parts[0], blob_oid, 0o100644)?;
236 return Ok(root_builder.write()?);
237 }
238
239 self.insert_nested_recursive(root_builder, &parts, blob_oid)
241 }
242
243 fn insert_nested_recursive(
244 &self,
245 builder: &mut git2::TreeBuilder,
246 parts: &[&str],
247 blob_oid: Oid,
248 ) -> Result<Oid, GitError> {
249 if parts.len() == 1 {
250 builder.insert(parts[0], blob_oid, 0o100644)?;
251 return Ok(builder.write()?);
252 }
253
254 let dir_name = parts[0];
255 let remaining = &parts[1..];
256
257 let existing_tree = builder.get(dir_name)?.map(|e| e.id());
259
260 let mut sub_builder = if let Some(tree_oid) = existing_tree {
261 let tree = self.repo.find_tree(tree_oid)?;
262 self.repo.treebuilder(Some(&tree))?
263 } else {
264 self.repo.treebuilder(None)?
265 };
266
267 self.insert_nested_recursive(&mut sub_builder, remaining, blob_oid)?;
268 let sub_tree_oid = sub_builder.write()?;
269 builder.insert(dir_name, sub_tree_oid, 0o040000)?;
270
271 Ok(builder.write()?)
272 }
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use libgrite_core::hash::compute_event_id;
    use libgrite_core::types::event::EventKind;
    use libgrite_core::types::ids::generate_issue_id;
    use tempfile::TempDir;

    /// Creates an empty, non-bare repository in a fresh temporary directory.
    ///
    /// The repository is initialised through `git2` so the tests do not
    /// depend on a `git` executable being installed. The `TempDir` is
    /// returned so the directory outlives the test body.
    fn setup_test_repo() -> (TempDir, Repository) {
        let temp = TempDir::new().unwrap();
        let repo = Repository::init(temp.path()).unwrap();
        (temp, repo)
    }

    /// Builds an event of the given kind with a fixed actor and timestamp.
    fn make_test_event(kind: EventKind) -> Event {
        let issue_id = generate_issue_id();
        let actor = [1u8; 16];
        let ts_unix_ms = 1700000000000u64;
        let event_id = compute_event_id(&issue_id, &actor, ts_unix_ms, None, &kind);
        Event::new(event_id, issue_id, actor, ts_unix_ms, None, kind)
    }

    #[test]
    fn test_wal_append_and_read() {
        let (temp, _repo) = setup_test_repo();
        let git_dir = temp.path().join(".git");

        let wal = WalManager::open(&git_dir).unwrap();

        // A fresh repository has no WAL reference yet.
        assert!(wal.head().unwrap().is_none());

        let event = make_test_event(EventKind::IssueCreated {
            title: "Test".to_string(),
            body: "Body".to_string(),
            labels: vec![],
        });
        let actor = [1u8; 16];

        let oid = wal.append(&actor, std::slice::from_ref(&event)).unwrap();
        assert!(wal.head().unwrap().is_some());
        assert_eq!(wal.head().unwrap().unwrap(), oid);

        let events = wal.read_all().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id, event.event_id);
    }

    #[test]
    fn test_wal_multiple_appends() {
        let (temp, _repo) = setup_test_repo();
        let git_dir = temp.path().join(".git");

        let wal = WalManager::open(&git_dir).unwrap();
        let actor = [1u8; 16];

        let event1 = make_test_event(EventKind::IssueCreated {
            title: "First".to_string(),
            body: "Body 1".to_string(),
            labels: vec![],
        });
        let oid1 = wal.append(&actor, std::slice::from_ref(&event1)).unwrap();

        let event2 = make_test_event(EventKind::CommentAdded {
            body: "A comment".to_string(),
        });
        let _oid2 = wal.append(&actor, std::slice::from_ref(&event2)).unwrap();

        // Commits come back oldest first.
        let events = wal.read_all().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].event_id, event1.event_id);
        assert_eq!(events[1].event_id, event2.event_id);

        // Only the commit after `oid1` is returned.
        let events_since = wal.read_since(oid1).unwrap();
        assert_eq!(events_since.len(), 1);
        assert_eq!(events_since[0].event_id, event2.event_id);
    }

    #[test]
    fn test_wal_read_all_on_empty_wal() {
        let (temp, _repo) = setup_test_repo();
        let git_dir = temp.path().join(".git");

        let wal = WalManager::open(&git_dir).unwrap();

        assert!(wal.read_all().unwrap().is_empty());
    }

    #[test]
    fn test_wal_append_rejects_empty_events() {
        let (temp, _repo) = setup_test_repo();
        let git_dir = temp.path().join(".git");

        let wal = WalManager::open(&git_dir).unwrap();
        let actor = [1u8; 16];

        assert!(wal.append(&actor, &[]).is_err());
        // A rejected append must not create the WAL reference.
        assert!(wal.head().unwrap().is_none());
    }
}