Skip to main content

libgrite_git/
wal.rs

1//! WAL (Write-Ahead Log) operations via `refs/grite/wal`
2//!
3//! The WAL stores events as git commits with CBOR-encoded chunks.
4//! Each commit contains:
5//! - meta.json with commit metadata
6//! - events/YYYY/MM/DD/<chunk_hash>.bin with the actual events
7
8use chrono::{DateTime, Datelike, Utc};
9use git2::{Oid, Repository, Signature};
10use libgrite_core::types::event::Event;
11use libgrite_core::types::ids::ActorId;
12use serde::{Deserialize, Serialize};
13use std::path::Path;
14
15use crate::chunk::{chunk_hash, decode_chunk, encode_chunk};
16use crate::GitError;
17
/// Git reference that points at the newest WAL commit.
///
/// [`WalManager::append`] creates each new commit with the previous target of
/// this ref as its parent and then moves the ref forward, so following it
/// backwards visits the whole WAL chain.
pub const WAL_REF: &str = "refs/grite/wal";
20
21/// Metadata stored in each WAL commit
22#[derive(Debug, Serialize, Deserialize)]
23pub struct WalMeta {
24    pub schema_version: u32,
25    pub actor_id: String,
26    pub chunk_hash: String,
27    pub prev_wal: Option<String>,
28}
29
30/// Information about a WAL commit
31#[derive(Debug, Clone)]
32pub struct WalCommit {
33    pub oid: Oid,
34    pub actor_id: String,
35    pub chunk_hash: String,
36    pub prev_wal: Option<Oid>,
37}
38
/// Reads and appends WAL commits under [`WAL_REF`] in one git repository.
///
/// All operations go through the single open [`Repository`] held here.
pub struct WalManager {
    // Repository opened by `WalManager::open`; every WAL read/write uses it.
    repo: Repository,
}
43
44impl WalManager {
45    /// Open a WAL manager for the repository at the given path
46    pub fn open(git_dir: &Path) -> Result<Self, GitError> {
47        // git_dir is .git, so parent is the repo root
48        let repo_path = git_dir.parent().ok_or(GitError::NotARepo)?;
49        let repo = Repository::open(repo_path)?;
50        Ok(Self { repo })
51    }
52
53    /// Get the current WAL head commit OID, if any
54    pub fn head(&self) -> Result<Option<Oid>, GitError> {
55        match self.repo.find_reference(WAL_REF) {
56            Ok(reference) => {
57                let oid = reference
58                    .target()
59                    .ok_or_else(|| GitError::Wal("WAL ref has no target".to_string()))?;
60                Ok(Some(oid))
61            }
62            Err(e) if e.code() == git2::ErrorCode::NotFound => Ok(None),
63            Err(e) => Err(e.into()),
64        }
65    }
66
67    /// Append events to the WAL, creating a new commit
68    pub fn append(&self, actor_id: &ActorId, events: &[Event]) -> Result<Oid, GitError> {
69        if events.is_empty() {
70            return Err(GitError::Wal("Cannot append empty events".to_string()));
71        }
72
73        // Encode events to chunk
74        let chunk_data = encode_chunk(events)?;
75        let hash = chunk_hash(&chunk_data);
76        let hash_hex = hex::encode(hash);
77
78        // Get current head (will be parent)
79        let parent_commit = self
80            .head()?
81            .map(|oid| self.repo.find_commit(oid))
82            .transpose()?;
83        let prev_wal = parent_commit.as_ref().map(|c| c.id());
84
85        // Determine chunk path based on timestamp of first event
86        let ts = events[0].ts_unix_ms;
87        let dt: DateTime<Utc> = DateTime::from_timestamp_millis(ts as i64).unwrap_or_else(Utc::now);
88        let chunk_path = format!(
89            "events/{:04}/{:02}/{:02}/{}.bin",
90            dt.year(),
91            dt.month(),
92            dt.day(),
93            hash_hex
94        );
95
96        // Create meta.json
97        let actor_id_hex = hex::encode(actor_id);
98        let meta = WalMeta {
99            schema_version: 1,
100            actor_id: actor_id_hex.clone(),
101            chunk_hash: hash_hex.clone(),
102            prev_wal: prev_wal.map(|oid| oid.to_string()),
103        };
104        let meta_json = serde_json::to_string_pretty(&meta)?;
105
106        // Build tree - each commit has only its own chunk, not parent's
107        let mut tree_builder = self.repo.treebuilder(None)?;
108
109        // Add meta.json blob
110        let meta_blob = self.repo.blob(meta_json.as_bytes())?;
111        tree_builder.insert("meta.json", meta_blob, 0o100644)?;
112
113        // Add chunk blob at the nested path
114        // We need to create the nested directory structure
115        let chunk_blob = self.repo.blob(&chunk_data)?;
116        let tree_oid = self.insert_nested_blob(&mut tree_builder, &chunk_path, chunk_blob)?;
117
118        // Create commit
119        let tree = self.repo.find_tree(tree_oid)?;
120        let sig = Signature::now("grite", "grit@local")?;
121        let message = format!("WAL: {} events from {}", events.len(), &actor_id_hex[..8]);
122
123        let parents: Vec<&git2::Commit> =
124            parent_commit.as_ref().map(|c| vec![c]).unwrap_or_default();
125        let commit_oid = self
126            .repo
127            .commit(Some(WAL_REF), &sig, &sig, &message, &tree, &parents)?;
128
129        Ok(commit_oid)
130    }
131
132    /// Read all events from the WAL
133    pub fn read_all(&self) -> Result<Vec<Event>, GitError> {
134        let head = match self.head()? {
135            Some(oid) => oid,
136            None => return Ok(vec![]),
137        };
138        self.read_since_impl(head, None)
139    }
140
141    /// Read events since a given commit (exclusive)
142    pub fn read_since(&self, since_oid: Oid) -> Result<Vec<Event>, GitError> {
143        let head = match self.head()? {
144            Some(oid) => oid,
145            None => return Ok(vec![]),
146        };
147        self.read_since_impl(head, Some(since_oid))
148    }
149
150    /// Read all events from a specific commit OID (useful for reading orphaned commits)
151    pub fn read_from_oid(&self, oid: Oid) -> Result<Vec<Event>, GitError> {
152        self.read_since_impl(oid, None)
153    }
154
155    /// Internal implementation for reading events
156    fn read_since_impl(&self, head: Oid, stop_at: Option<Oid>) -> Result<Vec<Event>, GitError> {
157        let mut all_events = Vec::new();
158        let mut current_oid = Some(head);
159
160        // Walk backwards through commits
161        while let Some(oid) = current_oid {
162            if Some(oid) == stop_at {
163                break;
164            }
165
166            let commit = self.repo.find_commit(oid)?;
167            let tree = commit.tree()?;
168
169            // Read meta.json to get chunk path
170            let meta_entry = tree
171                .get_name("meta.json")
172                .ok_or_else(|| GitError::Wal("Missing meta.json in WAL commit".to_string()))?;
173            let meta_blob = self.repo.find_blob(meta_entry.id())?;
174            let meta: WalMeta = serde_json::from_slice(meta_blob.content())?;
175
176            // Find and decode the chunk
177            let events = self.find_chunk_in_tree(&tree)?;
178            all_events.extend(events);
179
180            // Move to parent
181            current_oid = meta
182                .prev_wal
183                .as_ref()
184                .map(|s| Oid::from_str(s))
185                .transpose()?;
186        }
187
188        // Events are in reverse order (newest first), reverse to get chronological
189        all_events.reverse();
190        Ok(all_events)
191    }
192
193    /// Find and decode chunk from tree
194    fn find_chunk_in_tree(&self, tree: &git2::Tree) -> Result<Vec<Event>, GitError> {
195        // Walk the tree to find .bin files
196        let mut events = Vec::new();
197        self.walk_tree_for_chunks(tree, &mut events)?;
198        Ok(events)
199    }
200
201    /// Recursively walk tree looking for .bin chunks
202    fn walk_tree_for_chunks(
203        &self,
204        tree: &git2::Tree,
205        events: &mut Vec<Event>,
206    ) -> Result<(), GitError> {
207        for entry in tree.iter() {
208            let name = entry.name().unwrap_or("");
209            match entry.kind() {
210                Some(git2::ObjectType::Blob) if name.ends_with(".bin") => {
211                    let blob = self.repo.find_blob(entry.id())?;
212                    let chunk_events = decode_chunk(blob.content())?;
213                    events.extend(chunk_events);
214                }
215                Some(git2::ObjectType::Tree) => {
216                    let subtree = self.repo.find_tree(entry.id())?;
217                    self.walk_tree_for_chunks(&subtree, events)?;
218                }
219                _ => {}
220            }
221        }
222        Ok(())
223    }
224
225    /// Insert a blob at a nested path, creating intermediate trees
226    fn insert_nested_blob(
227        &self,
228        root_builder: &mut git2::TreeBuilder,
229        path: &str,
230        blob_oid: Oid,
231    ) -> Result<Oid, GitError> {
232        let parts: Vec<&str> = path.split('/').collect();
233        if parts.len() == 1 {
234            // Direct insertion
235            root_builder.insert(parts[0], blob_oid, 0o100644)?;
236            return Ok(root_builder.write()?);
237        }
238
239        // Need to create nested structure
240        self.insert_nested_recursive(root_builder, &parts, blob_oid)
241    }
242
243    fn insert_nested_recursive(
244        &self,
245        builder: &mut git2::TreeBuilder,
246        parts: &[&str],
247        blob_oid: Oid,
248    ) -> Result<Oid, GitError> {
249        if parts.len() == 1 {
250            builder.insert(parts[0], blob_oid, 0o100644)?;
251            return Ok(builder.write()?);
252        }
253
254        let dir_name = parts[0];
255        let remaining = &parts[1..];
256
257        // Check if directory already exists
258        let existing_tree = builder.get(dir_name)?.map(|e| e.id());
259
260        let mut sub_builder = if let Some(tree_oid) = existing_tree {
261            let tree = self.repo.find_tree(tree_oid)?;
262            self.repo.treebuilder(Some(&tree))?
263        } else {
264            self.repo.treebuilder(None)?
265        };
266
267        self.insert_nested_recursive(&mut sub_builder, remaining, blob_oid)?;
268        let sub_tree_oid = sub_builder.write()?;
269        builder.insert(dir_name, sub_tree_oid, 0o040000)?;
270
271        Ok(builder.write()?)
272    }
273}
274
275#[cfg(test)]
276mod tests {
277    use super::*;
278    use libgrite_core::hash::compute_event_id;
279    use libgrite_core::types::event::EventKind;
280    use libgrite_core::types::ids::generate_issue_id;
281    use std::process::Command;
282    use tempfile::TempDir;
283
284    fn setup_test_repo() -> (TempDir, Repository) {
285        let temp = TempDir::new().unwrap();
286
287        // Initialize git repo
288        Command::new("git")
289            .args(["init"])
290            .current_dir(temp.path())
291            .output()
292            .unwrap();
293
294        let repo = Repository::open(temp.path()).unwrap();
295        (temp, repo)
296    }
297
298    fn make_test_event(kind: EventKind) -> Event {
299        let issue_id = generate_issue_id();
300        let actor = [1u8; 16];
301        let ts_unix_ms = 1700000000000u64;
302        let event_id = compute_event_id(&issue_id, &actor, ts_unix_ms, None, &kind);
303        Event::new(event_id, issue_id, actor, ts_unix_ms, None, kind)
304    }
305
306    #[test]
307    fn test_wal_append_and_read() {
308        let (temp, _repo) = setup_test_repo();
309        let git_dir = temp.path().join(".git");
310
311        let wal = WalManager::open(&git_dir).unwrap();
312
313        // Initially empty
314        assert!(wal.head().unwrap().is_none());
315
316        // Append an event
317        let event = make_test_event(EventKind::IssueCreated {
318            title: "Test".to_string(),
319            body: "Body".to_string(),
320            labels: vec![],
321        });
322        let actor = [1u8; 16];
323
324        let oid = wal.append(&actor, std::slice::from_ref(&event)).unwrap();
325        assert!(wal.head().unwrap().is_some());
326        assert_eq!(wal.head().unwrap().unwrap(), oid);
327
328        // Read back
329        let events = wal.read_all().unwrap();
330        assert_eq!(events.len(), 1);
331        assert_eq!(events[0].event_id, event.event_id);
332    }
333
334    #[test]
335    fn test_wal_multiple_appends() {
336        let (temp, _repo) = setup_test_repo();
337        let git_dir = temp.path().join(".git");
338
339        let wal = WalManager::open(&git_dir).unwrap();
340        let actor = [1u8; 16];
341
342        // Append first event
343        let event1 = make_test_event(EventKind::IssueCreated {
344            title: "First".to_string(),
345            body: "Body 1".to_string(),
346            labels: vec![],
347        });
348        let oid1 = wal.append(&actor, std::slice::from_ref(&event1)).unwrap();
349
350        // Append second event
351        let event2 = make_test_event(EventKind::CommentAdded {
352            body: "A comment".to_string(),
353        });
354        let _oid2 = wal.append(&actor, std::slice::from_ref(&event2)).unwrap();
355
356        // Read all - should get both in order
357        let events = wal.read_all().unwrap();
358        assert_eq!(events.len(), 2);
359        assert_eq!(events[0].event_id, event1.event_id);
360        assert_eq!(events[1].event_id, event2.event_id);
361
362        // Read since first - should only get second
363        let events_since = wal.read_since(oid1).unwrap();
364        assert_eq!(events_since.len(), 1);
365        assert_eq!(events_since[0].event_id, event2.event_id);
366    }
367}