use std::path::Path;
use git2::{Oid, Repository, Signature};
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc, Datelike};
use libgrite_core::types::event::Event;
use libgrite_core::types::ids::ActorId;
use crate::chunk::{encode_chunk, decode_chunk, chunk_hash};
use crate::GitError;
/// Name of the git reference that points at the newest WAL commit.
///
/// `WalManager::head` resolves this reference and `WalManager::append` passes it
/// to `Repository::commit` as the reference to update.
pub const WAL_REF: &str = "refs/grite/wal";
/// Metadata record stored as the `meta.json` blob at the root of every WAL commit tree.
///
/// `WalManager::append` serializes it as pretty-printed JSON; the read path parses it
/// back and follows `prev_wal` to walk the log towards older commits.
#[derive(Debug, Serialize, Deserialize)]
pub struct WalMeta {
/// Version of this record's layout; `append` currently writes `1`.
pub schema_version: u32,
/// Hex encoding of the id of the actor that appended the commit.
pub actor_id: String,
/// Hex encoding of `chunk_hash` computed over the encoded event chunk of the commit.
pub chunk_hash: String,
/// Object id (as a string) of the WAL head at the time of the append, or `None`
/// when the commit is the first one in the log.
pub prev_wal: Option<String>,
}
/// Description of a single WAL commit using git object ids.
///
/// NOTE(review): nothing in this file constructs or consumes this type; the field
/// names mirror `WalMeta`, so it presumably carries the same information with
/// parsed `Oid`s — confirm against callers.
#[derive(Debug, Clone)]
pub struct WalCommit {
/// Object id of the commit (presumably the WAL commit itself — TODO confirm).
pub oid: Oid,
/// Actor identifier as a string (presumably hex, as in `WalMeta` — TODO confirm).
pub actor_id: String,
/// Chunk hash as a string (presumably hex, as in `WalMeta` — TODO confirm).
pub chunk_hash: String,
/// Object id of the preceding WAL commit, if any.
pub prev_wal: Option<Oid>,
}
/// Reads and appends event chunks stored as a chain of git commits under `WAL_REF`.
pub struct WalManager {
// Handle to the git repository holding the WAL objects and reference.
repo: Repository,
}
impl WalManager {
/// Opens the repository identified by `git_dir` and wraps it in a `WalManager`.
///
/// `git_dir` itself is tried first, so bare repositories, linked-worktree git
/// directories and relative paths such as `.git` (whose `parent()` is the empty
/// path) can be opened. If that fails, the parent directory of `git_dir` is
/// opened instead, which is what this function did exclusively before.
///
/// # Errors
/// Returns `GitError::NotARepo` when `git_dir` cannot be opened and has no parent,
/// or the converted git2 error when the parent cannot be opened either.
pub fn open(git_dir: &Path) -> Result<Self, GitError> {
    // Only the object database and references are used by the WAL, so opening
    // the git directory directly is sufficient when it works.
    if let Ok(repo) = Repository::open(git_dir) {
        return Ok(Self { repo });
    }

    let repo_path = git_dir.parent().ok_or(GitError::NotARepo)?;
    let repo = Repository::open(repo_path)?;
    Ok(Self { repo })
}
/// Returns the commit id `WAL_REF` currently points at.
///
/// Yields `Ok(None)` when the reference does not exist. A reference without a
/// direct target is reported as `GitError::Wal`; any other lookup failure is
/// converted from the git2 error.
pub fn head(&self) -> Result<Option<Oid>, GitError> {
    let wal_ref = match self.repo.find_reference(WAL_REF) {
        Ok(found) => found,
        // A missing reference means the log has not been written yet.
        Err(err) if err.code() == git2::ErrorCode::NotFound => return Ok(None),
        Err(err) => return Err(err.into()),
    };

    match wal_ref.target() {
        Some(tip) => Ok(Some(tip)),
        None => Err(GitError::Wal("WAL ref has no target".to_string())),
    }
}
/// Appends `events` to the WAL as one new commit and returns that commit's id.
///
/// The events are encoded into a single chunk blob stored at
/// `events/YYYY/MM/DD/<chunk-hash>.bin`, where the date is taken from the first
/// event's `ts_unix_ms`. A `meta.json` blob describing the chunk is stored next to
/// it. The commit uses the current WAL head (if any) as its only parent and is
/// created with `WAL_REF` as the reference to update.
///
/// # Errors
/// Returns `GitError::Wal` when `events` is empty, and propagates chunk encoding,
/// JSON serialization and git2 failures.
pub fn append(&self, actor_id: &ActorId, events: &[Event]) -> Result<Oid, GitError> {
    let first_event = events
        .first()
        .ok_or_else(|| GitError::Wal("Cannot append empty events".to_string()))?;

    let chunk_data = encode_chunk(events)?;
    let hash_hex = hex::encode(chunk_hash(&chunk_data));

    let parent_commit = self.head()?.map(|oid| self.repo.find_commit(oid)).transpose()?;
    let prev_wal = parent_commit.as_ref().map(|c| c.id());

    // Checked conversion: a timestamp that does not fit in i64 must not wrap into
    // an unrelated date. Unrepresentable timestamps fall back to the current time.
    let dt: DateTime<Utc> = i64::try_from(first_event.ts_unix_ms)
        .ok()
        .and_then(DateTime::<Utc>::from_timestamp_millis)
        .unwrap_or_else(Utc::now);
    let chunk_path = format!(
        "events/{:04}/{:02}/{:02}/{}.bin",
        dt.year(),
        dt.month(),
        dt.day(),
        hash_hex
    );

    let actor_id_hex = hex::encode(actor_id);
    // Abbreviated actor id for the commit message; never panics on short ids.
    let actor_short = actor_id_hex.get(..8).unwrap_or(actor_id_hex.as_str());
    let message = format!("WAL: {} events from {}", events.len(), actor_short);

    let meta = WalMeta {
        schema_version: 1,
        actor_id: actor_id_hex,
        chunk_hash: hash_hex,
        prev_wal: prev_wal.map(|oid| oid.to_string()),
    };
    let meta_json = serde_json::to_string_pretty(&meta)?;

    // Every WAL commit gets a fresh tree holding just meta.json and the chunk.
    let mut tree_builder = self.repo.treebuilder(None)?;
    let meta_blob = self.repo.blob(meta_json.as_bytes())?;
    tree_builder.insert("meta.json", meta_blob, 0o100644)?;
    let chunk_blob = self.repo.blob(&chunk_data)?;
    let tree_oid = self.insert_nested_blob(&mut tree_builder, &chunk_path, chunk_blob)?;
    let tree = self.repo.find_tree(tree_oid)?;

    let sig = Signature::now("grite", "grit@local")?;
    let parents: Vec<&git2::Commit> = parent_commit.iter().collect();
    let commit_oid = self.repo.commit(
        Some(WAL_REF),
        &sig,
        &sig,
        &message,
        &tree,
        &parents,
    )?;
    Ok(commit_oid)
}
/// Reads every event reachable from the current WAL head.
///
/// Returns an empty vector when `WAL_REF` does not exist yet.
pub fn read_all(&self) -> Result<Vec<Event>, GitError> {
    if let Some(tip) = self.head()? {
        self.read_since_impl(tip, None)
    } else {
        Ok(vec![])
    }
}
/// Reads the events of all WAL commits newer than `since_oid`.
///
/// The walk starts at the current head and stops when it reaches `since_oid`,
/// whose own events are not included. If `since_oid` is never encountered the
/// walk continues to the start of the log. Returns an empty vector when
/// `WAL_REF` does not exist yet.
pub fn read_since(&self, since_oid: Oid) -> Result<Vec<Event>, GitError> {
    self.head()?.map_or_else(
        || Ok(vec![]),
        |tip| self.read_since_impl(tip, Some(since_oid)),
    )
}
/// Reads every event reachable from the WAL commit `oid`, independent of where
/// `WAL_REF` currently points.
pub fn read_from_oid(&self, oid: Oid) -> Result<Vec<Event>, GitError> {
    // No stop marker: follow the chain until a commit has no predecessor.
    let no_stop: Option<Oid> = None;
    self.read_since_impl(oid, no_stop)
}
/// Walks the WAL from `head` towards older commits and returns the collected
/// events ordered oldest commit first.
///
/// The walk follows the `prev_wal` field of each commit's `meta.json` and ends
/// when a commit has no predecessor or when the commit `stop_at` is reached
/// (that commit is not read).
///
/// Events inside one commit keep the order in which they were decoded; only the
/// order of commits is reversed. Reversing the flattened list instead would also
/// reverse the events within every multi-event chunk.
///
/// # Errors
/// Returns `GitError::Wal` when a commit has no `meta.json`, and propagates git2,
/// JSON and chunk decoding failures.
fn read_since_impl(&self, head: Oid, stop_at: Option<Oid>) -> Result<Vec<Event>, GitError> {
    // One entry per visited commit, newest commit first.
    let mut chunks_newest_first: Vec<Vec<Event>> = Vec::new();
    let mut current_oid = Some(head);

    while let Some(oid) = current_oid {
        if Some(oid) == stop_at {
            break;
        }

        let commit = self.repo.find_commit(oid)?;
        let tree = commit.tree()?;

        let meta_entry = tree.get_name("meta.json")
            .ok_or_else(|| GitError::Wal("Missing meta.json in WAL commit".to_string()))?;
        let meta_blob = self.repo.find_blob(meta_entry.id())?;
        let meta: WalMeta = serde_json::from_slice(meta_blob.content())?;

        chunks_newest_first.push(self.find_chunk_in_tree(&tree)?);

        current_oid = meta.prev_wal
            .as_ref()
            .map(|s| Oid::from_str(s))
            .transpose()?;
    }

    // Reverse at commit granularity so intra-chunk ordering is preserved.
    Ok(chunks_newest_first.into_iter().rev().flatten().collect())
}
/// Decodes all `.bin` chunk blobs found anywhere under `tree` and returns their
/// events in traversal order.
fn find_chunk_in_tree(&self, tree: &git2::Tree) -> Result<Vec<Event>, GitError> {
    let mut collected = Vec::new();
    self.walk_tree_for_chunks(tree, &mut collected)
        .map(|()| collected)
}
/// Recursively visits `tree`, appending to `events` the decoded contents of every
/// blob whose name ends in `.bin`.
///
/// Sub-trees are descended into; all other entries (including blobs with other
/// names or names that are not valid UTF-8) are skipped.
fn walk_tree_for_chunks(&self, tree: &git2::Tree, events: &mut Vec<Event>) -> Result<(), GitError> {
    for entry in tree.iter() {
        let kind = entry.kind();

        if kind == Some(git2::ObjectType::Tree) {
            let subtree = self.repo.find_tree(entry.id())?;
            self.walk_tree_for_chunks(&subtree, events)?;
            continue;
        }

        let is_chunk_blob = kind == Some(git2::ObjectType::Blob)
            && entry.name().unwrap_or("").ends_with(".bin");
        if is_chunk_blob {
            let blob = self.repo.find_blob(entry.id())?;
            events.extend(decode_chunk(blob.content())?);
        }
    }
    Ok(())
}
/// Inserts `blob_oid` into `root_builder` at the `/`-separated `path`, creating
/// intermediate trees as needed, and returns the id of the written root tree.
///
/// A path without any `/` is inserted directly into `root_builder`; longer paths
/// are delegated to `insert_nested_recursive`.
fn insert_nested_blob(
    &self,
    root_builder: &mut git2::TreeBuilder,
    path: &str,
    blob_oid: Oid,
) -> Result<Oid, GitError> {
    let segments: Vec<&str> = path.split('/').collect();

    match segments.as_slice() {
        [file_name] => {
            root_builder.insert(*file_name, blob_oid, 0o100644)?;
            Ok(root_builder.write()?)
        }
        _ => self.insert_nested_recursive(root_builder, &segments, blob_oid),
    }
}
fn insert_nested_recursive(
&self,
builder: &mut git2::TreeBuilder,
parts: &[&str],
blob_oid: Oid,
) -> Result<Oid, GitError> {
if parts.len() == 1 {
builder.insert(parts[0], blob_oid, 0o100644)?;
return Ok(builder.write()?);
}
let dir_name = parts[0];
let remaining = &parts[1..];
let existing_tree = builder.get(dir_name)?.map(|e| e.id());
let mut sub_builder = if let Some(tree_oid) = existing_tree {
let tree = self.repo.find_tree(tree_oid)?;
self.repo.treebuilder(Some(&tree))?
} else {
self.repo.treebuilder(None)?
};
self.insert_nested_recursive(&mut sub_builder, remaining, blob_oid)?;
let sub_tree_oid = sub_builder.write()?;
builder.insert(dir_name, sub_tree_oid, 0o040000)?;
Ok(builder.write()?)
}
}
#[cfg(test)]
mod tests {
use super::*;
use libgrite_core::hash::compute_event_id;
use libgrite_core::types::event::EventKind;
use libgrite_core::types::ids::generate_issue_id;
use tempfile::TempDir;
use std::process::Command;
/// Creates a temporary directory, runs `git init` in it and opens the result.
///
/// The `TempDir` is returned so the directory lives as long as the caller keeps it.
fn setup_test_repo() -> (TempDir, Repository) {
    let temp = TempDir::new().unwrap();

    let mut git_init = Command::new("git");
    git_init.arg("init").current_dir(temp.path());
    git_init.output().unwrap();

    let repo = Repository::open(temp.path()).unwrap();
    (temp, repo)
}
/// Builds an event of the given `kind` for a freshly generated issue, using a
/// fixed actor and a fixed timestamp.
fn make_test_event(kind: EventKind) -> Event {
    const TS_UNIX_MS: u64 = 1700000000000;
    let actor = [1u8; 16];
    let issue_id = generate_issue_id();

    let event_id = compute_event_id(&issue_id, &actor, TS_UNIX_MS, None, &kind);
    Event::new(event_id, issue_id, actor, TS_UNIX_MS, None, kind)
}
/// A single append moves the WAL head to the new commit and the event can be
/// read back.
#[test]
fn test_wal_append_and_read() {
    let (temp, _repo) = setup_test_repo();
    let wal = WalManager::open(&temp.path().join(".git")).unwrap();

    // Nothing has been written yet, so the reference must not exist.
    assert_eq!(wal.head().unwrap(), None);

    let actor = [1u8; 16];
    let event = make_test_event(EventKind::IssueCreated {
        title: "Test".to_string(),
        body: "Body".to_string(),
        labels: vec![],
    });
    let oid = wal.append(&actor, std::slice::from_ref(&event)).unwrap();

    // The head now points at the commit that was just created.
    assert_eq!(wal.head().unwrap(), Some(oid));

    let stored = wal.read_all().unwrap();
    assert_eq!(stored.len(), 1);
    assert_eq!(stored[0].event_id, event.event_id);
}
/// Two appends are read back oldest first, and `read_since` returns only what
/// was appended after the given commit.
#[test]
fn test_wal_multiple_appends() {
    let (temp, _repo) = setup_test_repo();
    let wal = WalManager::open(&temp.path().join(".git")).unwrap();
    let actor = [1u8; 16];

    let first = make_test_event(EventKind::IssueCreated {
        title: "First".to_string(),
        body: "Body 1".to_string(),
        labels: vec![],
    });
    let first_oid = wal.append(&actor, std::slice::from_ref(&first)).unwrap();

    let second = make_test_event(EventKind::CommentAdded {
        body: "A comment".to_string(),
    });
    wal.append(&actor, std::slice::from_ref(&second)).unwrap();

    // Full read: both events, in append order.
    let everything = wal.read_all().unwrap();
    assert_eq!(everything.len(), 2);
    assert_eq!(everything[0].event_id, first.event_id);
    assert_eq!(everything[1].event_id, second.event_id);

    // Incremental read: only the event appended after the first commit.
    let newer = wal.read_since(first_oid).unwrap();
    assert_eq!(newer.len(), 1);
    assert_eq!(newer[0].event_id, second.event_id);
}
}