use std::fs;
use std::path::PathBuf;
use lago_core::event::{EventEnvelope, EventPayload};
use lago_core::{BranchId, LagoResult, Projection};
use lago_store::BlobStore;
use tracing::{debug, warn};
/// Projection that mirrors file events from a single branch onto a local
/// directory, so the on-disk tree tracks the event log for that branch.
pub struct LakeFsSync {
// Root directory that receives the mirrored files.
target_dir: PathBuf,
// Content-addressed store used to resolve blob hashes into file bytes.
blob_store: BlobStore,
// Only events whose `branch_id` matches this branch are applied.
target_branch: BranchId,
}
impl LakeFsSync {
pub fn new(target_dir: PathBuf, blob_store: BlobStore, target_branch: BranchId) -> Self {
Self {
target_dir,
blob_store,
target_branch,
}
}
fn resolve_path(&self, virtual_path: &str) -> PathBuf {
let relative = virtual_path.trim_start_matches('/');
self.target_dir.join(relative)
}
}
impl Projection for LakeFsSync {
fn on_event(&mut self, event: &EventEnvelope) -> LagoResult<()> {
if event.branch_id != self.target_branch {
return Ok(());
}
match &event.payload {
EventPayload::FileWrite {
path, blob_hash, ..
} => {
let fs_path = self.resolve_path(path);
if let Some(parent) = fs_path.parent() {
let _ = fs::create_dir_all(parent);
}
let lago_hash = lago_core::BlobHash::from_hex(blob_hash.as_str());
match self.blob_store.get(&lago_hash) {
Ok(data) => {
let _ = fs::write(&fs_path, &data);
debug!(path = %path, "synced file write to disk");
}
Err(e) => {
warn!(path = %path, error = %e, "failed to get blob for sync");
}
}
}
EventPayload::FileDelete { path } => {
let fs_path = self.resolve_path(path);
let _ = fs::remove_file(&fs_path);
debug!(path = %path, "synced file delete to disk");
}
EventPayload::FileRename { old_path, new_path } => {
let old_fs_path = self.resolve_path(old_path);
let new_fs_path = self.resolve_path(new_path);
if let Some(parent) = new_fs_path.parent() {
let _ = fs::create_dir_all(parent);
}
let _ = fs::rename(&old_fs_path, &new_fs_path);
debug!(old = %old_path, new = %new_path, "synced file rename to disk");
}
_ => {}
}
Ok(())
}
fn name(&self) -> &str {
"lake_fs_sync"
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use lago_core::id::{EventId, SeqNo, SessionId};
    use std::collections::HashMap;

    /// Builds a minimal envelope carrying `payload` on `branch` at sequence `seq`.
    fn make_envelope(seq: SeqNo, payload: EventPayload, branch: &BranchId) -> EventEnvelope {
        EventEnvelope {
            event_id: EventId::new(),
            session_id: SessionId::new(),
            branch_id: branch.clone(),
            run_id: None,
            seq,
            timestamp: 1000 + seq,
            parent_id: None,
            payload,
            metadata: HashMap::new(),
            schema_version: 1,
        }
    }

    /// A FileWrite on the target branch materializes the blob at the mapped path.
    #[test]
    fn syncs_writes_to_disk() {
        let tmp = tempfile::tempdir().unwrap();
        let workspace = tmp.path().join("workspace");
        fs::create_dir_all(&workspace).unwrap();

        let store = BlobStore::open(tmp.path().join("blobs")).unwrap();
        let content = b"hello sync";
        let hash = store.put(content).unwrap();

        let branch = BranchId::from_string("main");
        let mut sync = LakeFsSync::new(workspace.clone(), store, branch.clone());

        let write = EventPayload::FileWrite {
            path: "/docs/readme.txt".to_string(),
            blob_hash: hash.to_string().into(),
            size_bytes: content.len() as u64,
            content_type: None,
        };
        sync.on_event(&make_envelope(1, write, &branch)).unwrap();

        let on_disk = workspace.join("docs/readme.txt");
        assert!(on_disk.exists());
        assert_eq!(fs::read(&on_disk).unwrap(), b"hello sync");
    }

    /// Events for a different branch are a no-op and must not error.
    #[test]
    fn ignores_other_branches() {
        let tmp = tempfile::tempdir().unwrap();
        let store = BlobStore::open(tmp.path().join("blobs")).unwrap();
        let mut sync = LakeFsSync::new(
            tmp.path().join("ws"),
            store,
            BranchId::from_string("main"),
        );

        let delete = EventPayload::FileDelete {
            path: "/some.txt".to_string(),
        };
        let event = make_envelope(1, delete, &BranchId::from_string("feature"));
        sync.on_event(&event).unwrap();
    }
}