use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use super::body_doc::{BodyDoc, SyncCallback};
use super::storage::{CrdtStorage, StorageResult};
use super::types::UpdateOrigin;
use crate::fs::FileSystemEvent;
/// Shared, thread-safe callback that receives file system events.
type EventCallback = Arc<dyn Fn(&FileSystemEvent) + Send + Sync>;

/// Keeps the set of currently loaded [`BodyDoc`]s, keyed by document name.
///
/// Documents are handed out as `Arc<BodyDoc>`, so a handle obtained from the
/// manager stays usable even after the document is unloaded, replaced, or the
/// cache is cleared.
pub struct BodyDocManager {
    /// Backing store passed to every document this manager loads or creates.
    storage: Arc<dyn CrdtStorage>,
    /// Loaded documents, keyed by the name they are cached under.
    docs: RwLock<HashMap<String, Arc<BodyDoc>>>,
    /// Event callback installed on documents that are loaded or created after
    /// it has been set; `None` until `set_event_callback` is called.
    event_callback: RwLock<Option<EventCallback>>,
    /// Manager-level sync callback; `None` until `set_sync_callback` is called.
    sync_callback: RwLock<Option<SyncCallback>>,
}
impl BodyDocManager {
pub fn new(storage: Arc<dyn CrdtStorage>) -> Self {
Self {
storage,
docs: RwLock::new(HashMap::new()),
event_callback: RwLock::new(None),
sync_callback: RwLock::new(None),
}
}
pub fn set_event_callback(&self, callback: Arc<dyn Fn(&FileSystemEvent) + Send + Sync>) {
let mut cb = self.event_callback.write().unwrap();
*cb = Some(callback);
}
pub fn set_sync_callback(&self, callback: SyncCallback) {
{
let mut cb = self.sync_callback.write().unwrap();
*cb = Some(callback.clone());
}
let docs = self.docs.read().unwrap();
log::trace!(
"[BodyDocManager] set_sync_callback: applying to {} currently loaded docs",
docs.len()
);
for (doc_name, doc) in docs.iter() {
log::trace!(
"[BodyDocManager] set_sync_callback: registering observer for '{}'",
doc_name
);
doc.set_sync_callback(Arc::new({
let callback = callback.clone();
let doc_name = doc_name.clone();
move |_name: &str, update: &[u8]| {
callback(&doc_name, update);
}
}));
}
}
fn apply_event_callback(&self, doc: &mut BodyDoc) {
let cb = self.event_callback.read().unwrap();
if let Some(ref callback) = *cb {
doc.set_event_callback(Arc::clone(callback));
}
}
fn apply_sync_callback(&self, doc: &BodyDoc, doc_name: &str) {
let cb = self.sync_callback.read().unwrap();
if let Some(ref callback) = *cb {
log::trace!(
"[BodyDocManager] apply_sync_callback: registering observer for '{}'",
doc_name
);
let callback = callback.clone();
let doc_name = doc_name.to_string();
doc.set_sync_callback(Arc::new(move |_name: &str, update: &[u8]| {
callback(&doc_name, update);
}));
} else {
log::trace!(
"[BodyDocManager] apply_sync_callback: no callback set, skipping '{}'",
doc_name
);
}
}
pub fn get(&self, doc_name: &str) -> Option<Arc<BodyDoc>> {
{
let docs = self.docs.read().unwrap();
if let Some(doc) = docs.get(doc_name) {
return Some(Arc::clone(doc));
}
}
match self.storage.load_doc(doc_name) {
Ok(Some(_)) => {
let mut docs = self.docs.write().unwrap();
if let Some(doc) = docs.get(doc_name) {
return Some(Arc::clone(doc));
}
match BodyDoc::load(Arc::clone(&self.storage), doc_name.to_string()) {
Ok(mut doc) => {
self.apply_event_callback(&mut doc);
let doc = Arc::new(doc);
self.apply_sync_callback(&doc, doc_name);
docs.insert(doc_name.to_string(), Arc::clone(&doc));
Some(doc)
}
Err(_) => None,
}
}
_ => None,
}
}
pub fn get_or_create(&self, doc_name: &str) -> Arc<BodyDoc> {
{
let docs = self.docs.read().unwrap();
if let Some(doc) = docs.get(doc_name) {
return Arc::clone(doc);
}
}
let mut docs = self.docs.write().unwrap();
if let Some(doc) = docs.get(doc_name) {
return Arc::clone(doc);
}
let mut doc = match BodyDoc::load(Arc::clone(&self.storage), doc_name.to_string()) {
Ok(doc) => doc,
Err(_) => BodyDoc::new(Arc::clone(&self.storage), doc_name.to_string()),
};
self.apply_event_callback(&mut doc);
let doc = Arc::new(doc);
self.apply_sync_callback(&doc, doc_name);
docs.insert(doc_name.to_string(), Arc::clone(&doc));
doc
}
pub fn create(&self, doc_name: &str) -> Arc<BodyDoc> {
let mut doc = BodyDoc::new(Arc::clone(&self.storage), doc_name.to_string());
self.apply_event_callback(&mut doc);
let doc = Arc::new(doc);
self.apply_sync_callback(&doc, doc_name);
let mut docs = self.docs.write().unwrap();
docs.insert(doc_name.to_string(), Arc::clone(&doc));
doc
}
pub fn is_loaded(&self, doc_name: &str) -> bool {
let docs = self.docs.read().unwrap();
docs.contains_key(doc_name)
}
pub fn unload(&self, doc_name: &str) -> Option<Arc<BodyDoc>> {
let mut docs = self.docs.write().unwrap();
docs.remove(doc_name)
}
pub fn loaded_docs(&self) -> Vec<String> {
let docs = self.docs.read().unwrap();
docs.keys().cloned().collect()
}
pub fn save_all(&self) -> StorageResult<()> {
let docs = self.docs.read().unwrap();
for doc in docs.values() {
doc.save()?;
}
Ok(())
}
pub fn save(&self, doc_name: &str) -> StorageResult<bool> {
let docs = self.docs.read().unwrap();
if let Some(doc) = docs.get(doc_name) {
doc.save()?;
Ok(true)
} else {
Ok(false)
}
}
pub fn apply_update(
&self,
doc_name: &str,
update: &[u8],
origin: UpdateOrigin,
) -> StorageResult<Option<i64>> {
let doc = self.get_or_create(doc_name);
doc.apply_update(update, origin)
}
pub fn get_sync_state(&self, doc_name: &str) -> Option<Vec<u8>> {
self.get(doc_name).map(|doc| doc.encode_state_vector())
}
pub fn get_full_state(&self, doc_name: &str) -> Option<Vec<u8>> {
self.get(doc_name).map(|doc| doc.encode_state_as_update())
}
pub fn get_diff(&self, doc_name: &str, remote_state_vector: &[u8]) -> StorageResult<Vec<u8>> {
let doc = self.get_or_create(doc_name);
doc.encode_diff(remote_state_vector)
}
pub fn loaded_count(&self) -> usize {
let docs = self.docs.read().unwrap();
docs.len()
}
pub fn clear(&self) {
let mut docs = self.docs.write().unwrap();
docs.clear();
}
pub fn rename(&self, old_name: &str, new_name: &str) -> StorageResult<()> {
log::debug!("BodyDocManager: renaming {} to {}", old_name, new_name);
self.storage.rename_doc(old_name, new_name)?;
let mut docs = self.docs.write().unwrap();
if let Some(doc) = docs.remove(old_name) {
doc.set_doc_name(new_name.to_string());
docs.insert(new_name.to_string(), doc);
}
log::debug!(
"BodyDocManager: rename complete {} -> {}",
old_name,
new_name
);
Ok(())
}
pub fn delete(&self, doc_name: &str) -> StorageResult<()> {
log::debug!("BodyDocManager: deleting {}", doc_name);
self.storage.delete_doc(doc_name)?;
let mut docs = self.docs.write().unwrap();
docs.remove(doc_name);
Ok(())
}
}
/// Debug output lists only the names of the currently loaded documents.
impl std::fmt::Debug for BodyDocManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Formatting must not panic: a poisoned lock still holds a readable
        // map, so recover the guard instead of unwrapping.
        let docs = self
            .docs
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // Hash map iteration order is arbitrary; sort for stable output.
        let mut loaded: Vec<&String> = docs.keys().collect();
        loaded.sort_unstable();
        f.debug_struct("BodyDocManager")
            .field("loaded_docs", &loaded)
            .finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crdt::MemoryStorage;

    /// Returns a manager backed by its own fresh in-memory storage.
    fn create_manager() -> BodyDocManager {
        BodyDocManager::new(Arc::new(MemoryStorage::new()))
    }

    /// Returns a fresh in-memory storage as a trait object, for tests that
    /// need to inspect the store alongside the manager.
    fn shared_storage() -> Arc<dyn CrdtStorage> {
        Arc::new(MemoryStorage::new())
    }

    /// A document that does not exist yet is created empty under its name.
    #[test]
    fn test_get_or_create_new_doc() {
        let mgr = create_manager();
        let created = mgr.get_or_create("test.md");
        assert_eq!(created.doc_name(), "test.md");
        assert_eq!(created.get_body(), "");
    }

    /// `get` hands back the very same instance that `get_or_create` cached.
    #[test]
    fn test_get_returns_cached_doc() {
        let mgr = create_manager();
        let first = mgr.get_or_create("test.md");
        let _ = first.set_body("Hello");
        let second = mgr.get("test.md").unwrap();
        assert_eq!(second.get_body(), "Hello");
        assert!(Arc::ptr_eq(&first, &second));
    }

    /// `get` never creates: an unknown name yields `None`.
    #[test]
    fn test_get_nonexistent_returns_none() {
        let mgr = create_manager();
        assert!(mgr.get("nonexistent.md").is_none());
    }

    /// `create` swaps in a brand-new empty document for an existing name.
    #[test]
    fn test_create_replaces_existing() {
        let mgr = create_manager();
        let original = mgr.get_or_create("test.md");
        let _ = original.set_body("Original");
        let replacement = mgr.create("test.md");
        assert_eq!(replacement.get_body(), "");
        assert!(!Arc::ptr_eq(&original, &replacement));
    }

    /// `is_loaded` flips to true once the document has been cached.
    #[test]
    fn test_is_loaded() {
        let mgr = create_manager();
        assert!(!mgr.is_loaded("test.md"));
        mgr.get_or_create("test.md");
        assert!(mgr.is_loaded("test.md"));
    }

    /// `unload` removes the document from the cache.
    #[test]
    fn test_unload() {
        let mgr = create_manager();
        mgr.get_or_create("test.md");
        assert!(mgr.is_loaded("test.md"));
        mgr.unload("test.md");
        assert!(!mgr.is_loaded("test.md"));
    }

    /// `loaded_docs` reports every cached name.
    #[test]
    fn test_loaded_docs() {
        let mgr = create_manager();
        let names = ["doc1.md", "doc2.md", "doc3.md"];
        for name in &names {
            mgr.get_or_create(name);
        }
        let loaded = mgr.loaded_docs();
        assert_eq!(loaded.len(), 3);
        for name in &names {
            assert!(loaded.contains(&name.to_string()));
        }
    }

    /// Content saved through the manager survives clearing the cache.
    #[test]
    fn test_save_and_reload() {
        let store = shared_storage();
        let mgr = BodyDocManager::new(Arc::clone(&store));
        let written = mgr.get_or_create("test.md");
        let _ = written.set_body("Persistent content");
        mgr.save_all().unwrap();
        mgr.clear();
        assert!(!mgr.is_loaded("test.md"));
        let reloaded = mgr.get("test.md").unwrap();
        assert_eq!(reloaded.get_body(), "Persistent content");
    }

    /// Applying an update to an unknown name creates that document.
    #[test]
    fn test_apply_update_creates_doc() {
        let store = shared_storage();
        let source = BodyDoc::new(Arc::clone(&store), "source.md".to_string());
        let _ = source.set_body("Synced content");
        let payload = source.encode_state_as_update();
        let mgr = BodyDocManager::new(Arc::clone(&store));
        mgr.apply_update("target.md", &payload, UpdateOrigin::Remote)
            .unwrap();
        let target = mgr.get("target.md").unwrap();
        assert_eq!(target.get_body(), "Synced content");
    }

    /// `loaded_count` tracks loads and unloads.
    #[test]
    fn test_loaded_count() {
        let mgr = create_manager();
        assert_eq!(mgr.loaded_count(), 0);
        mgr.get_or_create("doc1.md");
        assert_eq!(mgr.loaded_count(), 1);
        mgr.get_or_create("doc2.md");
        assert_eq!(mgr.loaded_count(), 2);
        mgr.unload("doc1.md");
        assert_eq!(mgr.loaded_count(), 1);
    }

    /// `clear` empties the cache.
    #[test]
    fn test_clear() {
        let mgr = create_manager();
        mgr.get_or_create("doc1.md");
        mgr.get_or_create("doc2.md");
        assert_eq!(mgr.loaded_count(), 2);
        mgr.clear();
        assert_eq!(mgr.loaded_count(), 0);
    }

    /// Sync state is absent for unknown documents and present once loaded.
    #[test]
    fn test_get_sync_state() {
        let mgr = create_manager();
        assert!(mgr.get_sync_state("nonexistent.md").is_none());
        mgr.get_or_create("test.md");
        let state_vector = mgr.get_sync_state("test.md");
        assert!(state_vector.is_some());
    }

    /// A full-state update from one manager reproduces the body in another.
    #[test]
    fn test_sync_between_managers() {
        let sender = BodyDocManager::new(Arc::new(MemoryStorage::new()));
        let receiver = BodyDocManager::new(Arc::new(MemoryStorage::new()));
        let local = sender.get_or_create("shared.md");
        let _ = local.set_body("Hello from manager1");
        let payload = sender.get_full_state("shared.md").unwrap();
        receiver
            .apply_update("shared.md", &payload, UpdateOrigin::Remote)
            .unwrap();
        let remote = receiver.get("shared.md").unwrap();
        assert_eq!(remote.get_body(), "Hello from manager1");
    }

    /// Renaming a cached document moves it in both cache and storage.
    #[test]
    fn test_rename_preserves_content() {
        let store = shared_storage();
        let mgr = BodyDocManager::new(Arc::clone(&store));
        let original = mgr.get_or_create("old_name.md");
        let _ = original.set_body("Important content");
        let _ = original.save();
        mgr.rename("old_name.md", "new_name.md").unwrap();
        assert!(!mgr.is_loaded("old_name.md"));
        assert!(store.load_doc("old_name.md").unwrap().is_none());
        assert!(mgr.is_loaded("new_name.md"));
        let renamed = mgr.get("new_name.md").unwrap();
        assert_eq!(renamed.get_body(), "Important content");
        assert_eq!(renamed.doc_name(), "new_name.md");
    }

    /// Renaming a document that is not cached still renames it in storage.
    #[test]
    fn test_rename_uncached_doc() {
        let store = shared_storage();
        let mgr = BodyDocManager::new(Arc::clone(&store));
        let original = mgr.get_or_create("old_name.md");
        let _ = original.set_body("Persisted content");
        let _ = original.save();
        mgr.clear();
        assert!(!mgr.is_loaded("old_name.md"));
        mgr.rename("old_name.md", "new_name.md").unwrap();
        assert!(store.load_doc("old_name.md").unwrap().is_none());
        assert!(store.load_doc("new_name.md").unwrap().is_some());
    }

    /// `delete` removes the document from storage and from the cache.
    #[test]
    fn test_delete_removes_from_storage_and_cache() {
        let store = shared_storage();
        let mgr = BodyDocManager::new(Arc::clone(&store));
        let doomed = mgr.get_or_create("to_delete.md");
        let _ = doomed.set_body("Soon to be deleted");
        let _ = doomed.save();
        assert!(mgr.is_loaded("to_delete.md"));
        assert!(store.load_doc("to_delete.md").unwrap().is_some());
        mgr.delete("to_delete.md").unwrap();
        assert!(!mgr.is_loaded("to_delete.md"));
        assert!(store.load_doc("to_delete.md").unwrap().is_none());
    }

    /// A sync callback set on the manager fires for local edits and receives
    /// the document name together with non-empty update bytes.
    #[test]
    fn test_sync_callback_fires_on_local_changes() {
        use std::sync::Mutex;

        let store = shared_storage();
        let mgr = BodyDocManager::new(Arc::clone(&store));
        let recorded: Arc<Mutex<Vec<(String, Vec<u8>)>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&recorded);
        mgr.set_sync_callback(Arc::new(move |doc_name: &str, update: &[u8]| {
            let entry = (doc_name.to_string(), update.to_vec());
            sink.lock().unwrap().push(entry);
        }));
        let edited = mgr.get_or_create("test.md");
        let _ = edited.set_body("Hello World");
        let seen = recorded.lock().unwrap();
        assert!(
            !seen.is_empty(),
            "Sync callback should have been called when content was set"
        );
        let first = &seen[0];
        assert_eq!(first.0, "test.md", "Callback should receive doc name");
        assert!(!first.1.is_empty(), "Update bytes should not be empty");
    }
}