Skip to main content

libgrite_git/
wal.rs

1//! WAL (Write-Ahead Log) operations via `refs/grite/wal`
2//!
3//! The WAL stores events as git commits with CBOR-encoded chunks.
4//! Each commit contains:
5//! - meta.json with commit metadata
6//! - events/YYYY/MM/DD/<chunk_hash>.bin with the actual events
7
8use std::path::Path;
9use git2::{Oid, Repository, Signature};
10use serde::{Deserialize, Serialize};
11use chrono::{DateTime, Utc, Datelike};
12use libgrite_core::types::event::Event;
13use libgrite_core::types::ids::ActorId;
14
15use crate::chunk::{encode_chunk, decode_chunk, chunk_hash};
16use crate::GitError;
17
/// Name of the git reference whose commit history forms the write-ahead log.
///
/// [`WalManager::append`] advances this ref by exactly one commit per call.
pub const WAL_REF: &str = "refs/grite/wal";
20
21/// Metadata stored in each WAL commit
22#[derive(Debug, Serialize, Deserialize)]
23pub struct WalMeta {
24    pub schema_version: u32,
25    pub actor_id: String,
26    pub chunk_hash: String,
27    pub prev_wal: Option<String>,
28}
29
30/// Information about a WAL commit
31#[derive(Debug, Clone)]
32pub struct WalCommit {
33    pub oid: Oid,
34    pub actor_id: String,
35    pub chunk_hash: String,
36    pub prev_wal: Option<Oid>,
37}
38
39/// Manager for WAL operations
40pub struct WalManager {
41    repo: Repository,
42}
43
44impl WalManager {
45    /// Open a WAL manager for the repository at the given path
46    pub fn open(git_dir: &Path) -> Result<Self, GitError> {
47        // git_dir is .git, so parent is the repo root
48        let repo_path = git_dir.parent().ok_or(GitError::NotARepo)?;
49        let repo = Repository::open(repo_path)?;
50        Ok(Self { repo })
51    }
52
53    /// Get the current WAL head commit OID, if any
54    pub fn head(&self) -> Result<Option<Oid>, GitError> {
55        match self.repo.find_reference(WAL_REF) {
56            Ok(reference) => {
57                let oid = reference.target().ok_or_else(|| {
58                    GitError::Wal("WAL ref has no target".to_string())
59                })?;
60                Ok(Some(oid))
61            }
62            Err(e) if e.code() == git2::ErrorCode::NotFound => Ok(None),
63            Err(e) => Err(e.into()),
64        }
65    }
66
67    /// Append events to the WAL, creating a new commit
68    pub fn append(&self, actor_id: &ActorId, events: &[Event]) -> Result<Oid, GitError> {
69        if events.is_empty() {
70            return Err(GitError::Wal("Cannot append empty events".to_string()));
71        }
72
73        // Encode events to chunk
74        let chunk_data = encode_chunk(events)?;
75        let hash = chunk_hash(&chunk_data);
76        let hash_hex = hex::encode(hash);
77
78        // Get current head (will be parent)
79        let parent_commit = self.head()?.map(|oid| self.repo.find_commit(oid)).transpose()?;
80        let prev_wal = parent_commit.as_ref().map(|c| c.id());
81
82        // Determine chunk path based on timestamp of first event
83        let ts = events[0].ts_unix_ms;
84        let dt: DateTime<Utc> = DateTime::from_timestamp_millis(ts as i64)
85            .unwrap_or_else(|| Utc::now());
86        let chunk_path = format!(
87            "events/{:04}/{:02}/{:02}/{}.bin",
88            dt.year(),
89            dt.month(),
90            dt.day(),
91            hash_hex
92        );
93
94        // Create meta.json
95        let actor_id_hex = hex::encode(actor_id);
96        let meta = WalMeta {
97            schema_version: 1,
98            actor_id: actor_id_hex.clone(),
99            chunk_hash: hash_hex.clone(),
100            prev_wal: prev_wal.map(|oid| oid.to_string()),
101        };
102        let meta_json = serde_json::to_string_pretty(&meta)?;
103
104        // Build tree - each commit has only its own chunk, not parent's
105        let mut tree_builder = self.repo.treebuilder(None)?;
106
107        // Add meta.json blob
108        let meta_blob = self.repo.blob(meta_json.as_bytes())?;
109        tree_builder.insert("meta.json", meta_blob, 0o100644)?;
110
111        // Add chunk blob at the nested path
112        // We need to create the nested directory structure
113        let chunk_blob = self.repo.blob(&chunk_data)?;
114        let tree_oid = self.insert_nested_blob(&mut tree_builder, &chunk_path, chunk_blob)?;
115
116        // Create commit
117        let tree = self.repo.find_tree(tree_oid)?;
118        let sig = Signature::now("grite", "grit@local")?;
119        let message = format!("WAL: {} events from {}", events.len(), &actor_id_hex[..8]);
120
121        let parents: Vec<&git2::Commit> = parent_commit.as_ref().map(|c| vec![c]).unwrap_or_default();
122        let commit_oid = self.repo.commit(
123            Some(WAL_REF),
124            &sig,
125            &sig,
126            &message,
127            &tree,
128            &parents,
129        )?;
130
131        Ok(commit_oid)
132    }
133
134    /// Read all events from the WAL
135    pub fn read_all(&self) -> Result<Vec<Event>, GitError> {
136        let head = match self.head()? {
137            Some(oid) => oid,
138            None => return Ok(vec![]),
139        };
140        self.read_since_impl(head, None)
141    }
142
143    /// Read events since a given commit (exclusive)
144    pub fn read_since(&self, since_oid: Oid) -> Result<Vec<Event>, GitError> {
145        let head = match self.head()? {
146            Some(oid) => oid,
147            None => return Ok(vec![]),
148        };
149        self.read_since_impl(head, Some(since_oid))
150    }
151
152    /// Read all events from a specific commit OID (useful for reading orphaned commits)
153    pub fn read_from_oid(&self, oid: Oid) -> Result<Vec<Event>, GitError> {
154        self.read_since_impl(oid, None)
155    }
156
157    /// Internal implementation for reading events
158    fn read_since_impl(&self, head: Oid, stop_at: Option<Oid>) -> Result<Vec<Event>, GitError> {
159        let mut all_events = Vec::new();
160        let mut current_oid = Some(head);
161
162        // Walk backwards through commits
163        while let Some(oid) = current_oid {
164            if Some(oid) == stop_at {
165                break;
166            }
167
168            let commit = self.repo.find_commit(oid)?;
169            let tree = commit.tree()?;
170
171            // Read meta.json to get chunk path
172            let meta_entry = tree.get_name("meta.json")
173                .ok_or_else(|| GitError::Wal("Missing meta.json in WAL commit".to_string()))?;
174            let meta_blob = self.repo.find_blob(meta_entry.id())?;
175            let meta: WalMeta = serde_json::from_slice(meta_blob.content())?;
176
177            // Find and decode the chunk
178            let events = self.find_chunk_in_tree(&tree)?;
179            all_events.extend(events);
180
181            // Move to parent
182            current_oid = meta.prev_wal
183                .as_ref()
184                .map(|s| Oid::from_str(s))
185                .transpose()?;
186        }
187
188        // Events are in reverse order (newest first), reverse to get chronological
189        all_events.reverse();
190        Ok(all_events)
191    }
192
193    /// Find and decode chunk from tree
194    fn find_chunk_in_tree(&self, tree: &git2::Tree) -> Result<Vec<Event>, GitError> {
195        // Walk the tree to find .bin files
196        let mut events = Vec::new();
197        self.walk_tree_for_chunks(tree, &mut events)?;
198        Ok(events)
199    }
200
201    /// Recursively walk tree looking for .bin chunks
202    fn walk_tree_for_chunks(&self, tree: &git2::Tree, events: &mut Vec<Event>) -> Result<(), GitError> {
203        for entry in tree.iter() {
204            let name = entry.name().unwrap_or("");
205            match entry.kind() {
206                Some(git2::ObjectType::Blob) if name.ends_with(".bin") => {
207                    let blob = self.repo.find_blob(entry.id())?;
208                    let chunk_events = decode_chunk(blob.content())?;
209                    events.extend(chunk_events);
210                }
211                Some(git2::ObjectType::Tree) => {
212                    let subtree = self.repo.find_tree(entry.id())?;
213                    self.walk_tree_for_chunks(&subtree, events)?;
214                }
215                _ => {}
216            }
217        }
218        Ok(())
219    }
220
221    /// Insert a blob at a nested path, creating intermediate trees
222    fn insert_nested_blob(
223        &self,
224        root_builder: &mut git2::TreeBuilder,
225        path: &str,
226        blob_oid: Oid,
227    ) -> Result<Oid, GitError> {
228        let parts: Vec<&str> = path.split('/').collect();
229        if parts.len() == 1 {
230            // Direct insertion
231            root_builder.insert(parts[0], blob_oid, 0o100644)?;
232            return Ok(root_builder.write()?);
233        }
234
235        // Need to create nested structure
236        self.insert_nested_recursive(root_builder, &parts, blob_oid)
237    }
238
239    fn insert_nested_recursive(
240        &self,
241        builder: &mut git2::TreeBuilder,
242        parts: &[&str],
243        blob_oid: Oid,
244    ) -> Result<Oid, GitError> {
245        if parts.len() == 1 {
246            builder.insert(parts[0], blob_oid, 0o100644)?;
247            return Ok(builder.write()?);
248        }
249
250        let dir_name = parts[0];
251        let remaining = &parts[1..];
252
253        // Check if directory already exists
254        let existing_tree = builder.get(dir_name)?.map(|e| e.id());
255
256        let mut sub_builder = if let Some(tree_oid) = existing_tree {
257            let tree = self.repo.find_tree(tree_oid)?;
258            self.repo.treebuilder(Some(&tree))?
259        } else {
260            self.repo.treebuilder(None)?
261        };
262
263        self.insert_nested_recursive(&mut sub_builder, remaining, blob_oid)?;
264        let sub_tree_oid = sub_builder.write()?;
265        builder.insert(dir_name, sub_tree_oid, 0o040000)?;
266
267        Ok(builder.write()?)
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use libgrite_core::hash::compute_event_id;
    use libgrite_core::types::event::EventKind;
    use libgrite_core::types::ids::generate_issue_id;
    use tempfile::TempDir;

    /// Create an empty, non-bare repository in a fresh temp dir.
    ///
    /// Uses libgit2 instead of shelling out to `git init`. The tests then need no `git`
    /// binary on `PATH`, and a failed init panics here instead of having its exit status
    /// ignored.
    fn setup_test_repo() -> (TempDir, Repository) {
        let temp = TempDir::new().unwrap();
        let repo = Repository::init(temp.path()).unwrap();
        (temp, repo)
    }

    /// Open a `WalManager` on the test repository's `.git` directory.
    fn open_wal(temp: &TempDir) -> WalManager {
        WalManager::open(&temp.path().join(".git")).unwrap()
    }

    /// Parse the `meta.json` of WAL commit `oid` through an independent repository handle.
    fn meta_of(repo: &Repository, oid: Oid) -> WalMeta {
        let tree = repo.find_commit(oid).unwrap().tree().unwrap();
        let entry = tree.get_name("meta.json").unwrap();
        let blob = repo.find_blob(entry.id()).unwrap();
        serde_json::from_slice(blob.content()).unwrap()
    }

    /// Build an event with a fixed actor and timestamp (1_700_000_000_000 ms = 2023-11-14 UTC).
    fn make_test_event(kind: EventKind) -> Event {
        let issue_id = generate_issue_id();
        let actor = [1u8; 16];
        let ts_unix_ms = 1700000000000u64;
        let event_id = compute_event_id(&issue_id, &actor, ts_unix_ms, None, &kind);
        Event::new(event_id, issue_id, actor, ts_unix_ms, None, kind)
    }

    #[test]
    fn test_wal_append_and_read() {
        let (temp, repo) = setup_test_repo();
        let wal = open_wal(&temp);

        // Initially empty
        assert!(wal.head().unwrap().is_none());

        let event = make_test_event(EventKind::IssueCreated {
            title: "Test".to_string(),
            body: "Body".to_string(),
            labels: vec![],
        });
        let actor = [1u8; 16];

        let oid = wal.append(&actor, &[event.clone()]).unwrap();
        assert_eq!(wal.head().unwrap(), Some(oid));

        // Commit layout: meta.json plus a chunk under the first event's UTC date.
        let meta = meta_of(&repo, oid);
        assert_eq!(meta.schema_version, 1);
        assert_eq!(meta.actor_id, hex::encode(actor));
        assert_eq!(meta.prev_wal, None);
        let tree = repo.find_commit(oid).unwrap().tree().unwrap();
        assert!(tree.get_path(Path::new("events/2023/11/14")).is_ok());

        // Read back
        let events = wal.read_all().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id, event.event_id);
    }

    #[test]
    fn test_wal_multiple_appends() {
        let (temp, repo) = setup_test_repo();
        let wal = open_wal(&temp);
        let actor = [1u8; 16];

        let event1 = make_test_event(EventKind::IssueCreated {
            title: "First".to_string(),
            body: "Body 1".to_string(),
            labels: vec![],
        });
        let oid1 = wal.append(&actor, &[event1.clone()]).unwrap();

        let event2 = make_test_event(EventKind::CommentAdded {
            body: "A comment".to_string(),
        });
        let oid2 = wal.append(&actor, &[event2.clone()]).unwrap();

        // The second commit links back to the first.
        assert_eq!(meta_of(&repo, oid2).prev_wal, Some(oid1.to_string()));

        // Read all - both, in order
        let events = wal.read_all().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(events[0].event_id, event1.event_id);
        assert_eq!(events[1].event_id, event2.event_id);

        // Read since first - only the second
        let events_since = wal.read_since(oid1).unwrap();
        assert_eq!(events_since.len(), 1);
        assert_eq!(events_since[0].event_id, event2.event_id);

        // Reading from the first commit ignores anything appended after it.
        let events_at_first = wal.read_from_oid(oid1).unwrap();
        assert_eq!(events_at_first.len(), 1);
        assert_eq!(events_at_first[0].event_id, event1.event_id);
    }

    #[test]
    fn test_wal_empty_log_reads_nothing() {
        let (temp, _repo) = setup_test_repo();
        let wal = open_wal(&temp);

        assert!(wal.head().unwrap().is_none());
        assert!(wal.read_all().unwrap().is_empty());
    }

    #[test]
    fn test_wal_append_rejects_empty_events() {
        let (temp, _repo) = setup_test_repo();
        let wal = open_wal(&temp);

        assert!(wal.append(&[1u8; 16], &[]).is_err());
        // A rejected append must not create the WAL ref.
        assert!(wal.head().unwrap().is_none());
    }
}