use std::sync::Arc;
use yrs::updates::decoder::Decode;
use yrs::updates::encoder::Encode;
use yrs::{Doc, Map, MapRef, Observable, ReadTxn, StateVector, Transact, Update};
use super::storage::{CrdtStorage, StorageResult};
use super::types::{CrdtUpdate, FileMetadata, UpdateOrigin};
use crate::error::DiaryxError;
/// Name of the root-level map in the document that holds the per-file entries.
const FILES_MAP_NAME: &str = "files";
/// Document name used by `WorkspaceCrdt::new` and `WorkspaceCrdt::load`.
const WORKSPACE_DOC_NAME: &str = "workspace";
/// A `yrs` document that maps file paths to JSON-serialized [`FileMetadata`],
/// paired with the storage backend used to persist it.
pub struct WorkspaceCrdt {
/// The underlying `yrs` document.
doc: Doc,
/// Handle to the root map named `FILES_MAP_NAME`; keys are file paths and
/// values are JSON strings.
files_map: MapRef,
/// Backend that receives document snapshots and individual updates.
storage: Arc<dyn CrdtStorage>,
/// Name under which this document is read from and written to `storage`.
doc_name: String,
}
impl WorkspaceCrdt {
/// Creates an empty workspace CRDT under the default document name.
///
/// Nothing is read from `storage` here; use [`WorkspaceCrdt::load`] to restore
/// previously persisted state.
pub fn new(storage: Arc<dyn CrdtStorage>) -> Self {
    let default_name = String::from(WORKSPACE_DOC_NAME);
    Self::with_name(storage, default_name)
}
/// Creates an empty workspace CRDT that will be persisted under `doc_name`.
///
/// Nothing is read from `storage` during construction.
pub fn with_name(storage: Arc<dyn CrdtStorage>, doc_name: String) -> Self {
    let fresh_doc = Doc::new();
    // Register the root map up front so its handle can live on the struct.
    let files = fresh_doc.get_or_insert_map(FILES_MAP_NAME);

    Self {
        doc: fresh_doc,
        files_map: files,
        storage,
        doc_name,
    }
}
/// Restores the workspace CRDT stored under the default document name.
///
/// # Errors
///
/// Propagates any error returned by [`WorkspaceCrdt::load_with_name`].
pub fn load(storage: Arc<dyn CrdtStorage>) -> StorageResult<Self> {
    let default_name = String::from(WORKSPACE_DOC_NAME);
    Self::load_with_name(storage, default_name)
}
/// Restores the workspace CRDT stored under `doc_name`.
///
/// The stored snapshot (if any) is applied first, then every update returned
/// by `CrdtStorage::get_all_updates` is replayed on top of it.
///
/// # Errors
///
/// Fails when storage access fails, or when the snapshot cannot be decoded or
/// applied. Individual logged updates that fail to decode or apply are
/// skipped without reporting an error.
pub fn load_with_name(storage: Arc<dyn CrdtStorage>, doc_name: String) -> StorageResult<Self> {
    let doc = Doc::new();
    {
        let mut txn = doc.transact_mut();

        // The snapshot is strict: a bad snapshot aborts the load.
        if let Some(snapshot) = storage.load_doc(&doc_name)? {
            let decoded = Update::decode_v1(&snapshot).map_err(|e| {
                DiaryxError::Unsupported(format!("Failed to decode CRDT state: {}", e))
            })?;
            txn.apply_update(decoded).map_err(|e| {
                DiaryxError::Unsupported(format!("Failed to apply snapshot: {}", e))
            })?;
        }

        // The update log is best-effort: unreadable or unappliable entries are dropped.
        let history = storage.get_all_updates(&doc_name)?;
        history
            .iter()
            .filter_map(|entry| Update::decode_v1(&entry.data).ok())
            .for_each(|logged| {
                let _ = txn.apply_update(logged);
            });
    }

    let files = doc.get_or_insert_map(FILES_MAP_NAME);
    Ok(Self {
        doc,
        files_map: files,
        storage,
        doc_name,
    })
}
/// Returns a reference to the underlying `yrs` document.
pub fn doc(&self) -> &Doc {
&self.doc
}
/// Returns the name under which this document is kept in storage.
pub fn doc_name(&self) -> &str {
&self.doc_name
}
/// Returns the shared handle to the storage backend used by this CRDT.
pub fn storage(&self) -> &Arc<dyn CrdtStorage> {
&self.storage
}
/// Looks up the metadata stored for `path`.
///
/// Returns `None` when the map has no entry for `path`, or when the stored
/// value does not parse as [`FileMetadata`] JSON.
pub fn get_file(&self, path: &str) -> Option<FileMetadata> {
    let txn = self.doc.transact();
    let stored = self.files_map.get(&txn, path)?;
    let json = stored.to_string(&txn);
    serde_json::from_str(&json).ok()
}
pub fn set_file(&self, path: &str, metadata: FileMetadata) -> StorageResult<()> {
let sv_before = {
let txn = self.doc.transact();
txn.state_vector()
};
{
let mut txn = self.doc.transact_mut();
let json = serde_json::to_string(&metadata).unwrap_or_default();
self.files_map.insert(&mut txn, path, json);
}
let update = {
let txn = self.doc.transact();
txn.encode_state_as_update_v1(&sv_before)
};
if !update.is_empty() {
self.storage
.append_update(&self.doc_name, &update, UpdateOrigin::Local)?;
}
Ok(())
}
/// Soft-deletes `path`: the entry stays in the map, flagged via
/// `FileMetadata::mark_deleted`.
///
/// Does nothing when [`WorkspaceCrdt::get_file`] finds no metadata for `path`.
///
/// # Errors
///
/// Propagates any error from [`WorkspaceCrdt::set_file`].
pub fn delete_file(&self, path: &str) -> StorageResult<()> {
    match self.get_file(path) {
        Some(mut metadata) => {
            metadata.mark_deleted();
            self.set_file(path, metadata)
        }
        None => Ok(()),
    }
}
/// Removes the entry for `path` from the files map and records the change.
///
/// Unlike [`WorkspaceCrdt::delete_file`], the entry itself is removed rather
/// than flagged. The change, encoded relative to the state vector taken
/// before the removal, is appended to storage with [`UpdateOrigin::Local`].
///
/// # Errors
///
/// Fails when appending the update to storage fails.
pub fn remove_file(&self, path: &str) -> StorageResult<()> {
    let baseline = {
        let read = self.doc.transact();
        read.state_vector()
    };

    {
        let mut write = self.doc.transact_mut();
        self.files_map.remove(&mut write, path);
    }

    let delta = {
        let read = self.doc.transact();
        read.encode_state_as_update_v1(&baseline)
    };

    if delta.is_empty() {
        return Ok(());
    }
    self.storage
        .append_update(&self.doc_name, &delta, UpdateOrigin::Local)?;
    Ok(())
}
/// Returns every `(path, metadata)` pair in the files map, including entries
/// flagged as deleted.
///
/// Entries whose stored value does not parse as [`FileMetadata`] JSON are
/// left out.
pub fn list_files(&self) -> Vec<(String, FileMetadata)> {
    let txn = self.doc.transact();
    let mut files = Vec::new();

    for (key, value) in self.files_map.iter(&txn) {
        let json = value.to_string(&txn);
        if let Ok(metadata) = serde_json::from_str::<FileMetadata>(&json) {
            files.push((key.to_string(), metadata));
        }
    }

    files
}
/// Returns the `(path, metadata)` pairs whose `deleted` flag is not set.
pub fn list_active_files(&self) -> Vec<(String, FileMetadata)> {
    let mut files = self.list_files();
    files.retain(|(_, meta)| !meta.deleted);
    files
}
/// Returns the number of entries in the files map.
///
/// This is the raw map length, so entries flagged as deleted are counted too.
pub fn file_count(&self) -> usize {
    let txn = self.doc.transact();
    let entries = self.files_map.len(&txn);
    entries as usize
}
/// Returns the document's current state vector in v1 encoding.
pub fn encode_state_vector(&self) -> Vec<u8> {
    let txn = self.doc.transact();
    let state_vector = txn.state_vector();
    state_vector.encode_v1()
}
/// Encodes the whole document as a single v1 update.
///
/// The update is computed against a default (empty) state vector.
pub fn encode_state_as_update(&self) -> Vec<u8> {
    let empty_sv = StateVector::default();
    let txn = self.doc.transact();
    txn.encode_state_as_update_v1(&empty_sv)
}
/// Encodes a v1 update of this document relative to a peer's state vector.
///
/// `remote_state_vector` is a v1-encoded state vector, such as the output of
/// [`WorkspaceCrdt::encode_state_vector`] on another instance.
///
/// # Errors
///
/// Fails when `remote_state_vector` cannot be decoded.
pub fn encode_diff(&self, remote_state_vector: &[u8]) -> StorageResult<Vec<u8>> {
    let remote_sv = StateVector::decode_v1(remote_state_vector).map_err(|e| {
        DiaryxError::Unsupported(format!("Failed to decode state vector: {}", e))
    })?;

    let diff = {
        let txn = self.doc.transact();
        txn.encode_state_as_update_v1(&remote_sv)
    };
    Ok(diff)
}
/// Applies a v1-encoded update to the document, then appends it to storage.
///
/// `origin` is passed through to storage unchanged. On success the returned
/// option is always `Some`, holding the id returned by
/// `CrdtStorage::append_update`.
///
/// # Errors
///
/// Fails when the update cannot be decoded or applied, or when appending it
/// to storage fails. The update is applied before it is stored, so a storage
/// failure leaves the in-memory document already modified.
pub fn apply_update(&self, update: &[u8], origin: UpdateOrigin) -> StorageResult<Option<i64>> {
    let parsed = Update::decode_v1(update)
        .map_err(|e| DiaryxError::Unsupported(format!("Failed to decode update: {}", e)))?;

    let outcome = {
        let mut txn = self.doc.transact_mut();
        txn.apply_update(parsed)
    };
    outcome.map_err(|e| DiaryxError::Unsupported(format!("Failed to apply update: {}", e)))?;

    let stored_id = self.storage.append_update(&self.doc_name, update, origin)?;
    Ok(Some(stored_id))
}
/// Persists the full document state as the stored snapshot for this document.
///
/// # Errors
///
/// Propagates any error from `CrdtStorage::save_doc`.
pub fn save(&self) -> StorageResult<()> {
    self.storage
        .save_doc(&self.doc_name, &self.encode_state_as_update())
}
/// Replaces the in-memory document with the snapshot stored for this document.
///
/// When storage holds no snapshot the current document is left as it is.
///
/// The replacement document is built and populated before it is installed,
/// so a decode or apply failure leaves `self` unchanged rather than pointing
/// at an empty or half-populated document.
///
/// NOTE(review): only the snapshot from `load_doc` is applied; the update log
/// is not replayed, unlike `load_with_name`. Confirm that this is intended.
///
/// NOTE(review): subscriptions returned by `observe_updates` and
/// `observe_files` were registered on the previous document instance and are
/// presumably not carried over to the new one. Verify.
///
/// # Errors
///
/// Fails when storage access fails, or when the snapshot cannot be decoded
/// or applied.
pub fn reload(&mut self) -> StorageResult<()> {
    if let Some(state) = self.storage.load_doc(&self.doc_name)? {
        let update = Update::decode_v1(&state).map_err(|e| {
            DiaryxError::Unsupported(format!("Failed to decode CRDT state: {}", e))
        })?;

        // Populate the replacement separately; install it only on success.
        let fresh_doc = Doc::new();
        let fresh_files_map = fresh_doc.get_or_insert_map(FILES_MAP_NAME);
        {
            let mut txn = fresh_doc.transact_mut();
            txn.apply_update(update).map_err(|e| {
                DiaryxError::Unsupported(format!("Failed to apply update: {}", e))
            })?;
        }

        self.doc = fresh_doc;
        self.files_map = fresh_files_map;
    }
    Ok(())
}
/// Returns all updates that storage holds for this document.
///
/// Delegates to `CrdtStorage::get_all_updates` with this document's name.
pub fn get_history(&self) -> StorageResult<Vec<CrdtUpdate>> {
self.storage.get_all_updates(&self.doc_name)
}
/// Returns the updates storage reports for this document relative to `since_id`.
///
/// Delegates to `CrdtStorage::get_updates_since`; whether `since_id` itself is
/// included is decided by the storage backend.
pub fn get_updates_since(&self, since_id: i64) -> StorageResult<Vec<CrdtUpdate>> {
self.storage.get_updates_since(&self.doc_name, since_id)
}
/// Returns the latest update id that storage reports for this document.
///
/// Delegates to `CrdtStorage::get_latest_update_id` with this document's name.
pub fn get_latest_update_id(&self) -> StorageResult<i64> {
self.storage.get_latest_update_id(&self.doc_name)
}
/// Registers `callback` to receive the v1-encoded bytes of each document
/// update event.
///
/// The returned subscription handle is what the caller holds on to for the
/// registration.
///
/// # Panics
///
/// Panics if the document refuses to register the observer.
pub fn observe_updates<F>(&self, callback: F) -> yrs::Subscription
where
    F: Fn(&[u8]) + 'static,
{
    let registration = self
        .doc
        .observe_update_v1(move |_txn, event| callback(&event.update));
    registration.expect("Failed to observe document updates")
}
/// Registers `callback` to receive per-key changes to the files map.
///
/// Each change is reported as `(path, metadata)`:
/// - an inserted or updated entry yields `Some(metadata)`, or `None` when the
///   new value is not a string holding valid [`FileMetadata`] JSON;
/// - a removed entry yields `None`.
///
/// The callback is not invoked for events that carry no key changes.
pub fn observe_files<F>(&self, callback: F) -> yrs::Subscription
where
    F: Fn(Vec<(String, Option<FileMetadata>)>) + 'static,
{
    self.files_map.observe(move |txn, event| {
        let mut changes: Vec<(String, Option<FileMetadata>)> = Vec::new();

        for (key, change) in event.keys(txn).iter() {
            let metadata = match change {
                yrs::types::EntryChange::Removed(_) => None,
                yrs::types::EntryChange::Inserted(value)
                | yrs::types::EntryChange::Updated(_, value) => {
                    // A non-string value becomes "", which fails to parse below.
                    let json = value.clone().cast::<String>().unwrap_or_default();
                    serde_json::from_str::<FileMetadata>(&json).ok()
                }
            };
            changes.push((key.to_string(), metadata));
        }

        if !changes.is_empty() {
            callback(changes);
        }
    })
}
}
/// Debug output shows the document name and the current entry count, not the
/// document contents.
impl std::fmt::Debug for WorkspaceCrdt {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("WorkspaceCrdt");
        builder.field("doc_name", &self.doc_name);
        let entries = self.file_count();
        builder.field("file_count", &entries);
        builder.finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::crdt::MemoryStorage;
/// Builds an empty workspace CRDT backed by a fresh in-memory store.
fn create_test_crdt() -> WorkspaceCrdt {
    let backend: Arc<dyn CrdtStorage> = Arc::new(MemoryStorage::new());
    WorkspaceCrdt::new(backend)
}
/// A freshly created workspace holds no entries.
#[test]
fn test_new_workspace_is_empty() {
    let workspace = create_test_crdt();

    let entry_count = workspace.file_count();
    assert_eq!(entry_count, 0);

    let listed = workspace.list_files();
    assert!(listed.is_empty());
}
/// Metadata written with `set_file` is returned by `get_file`.
#[test]
fn test_set_and_get_file() {
    let workspace = create_test_crdt();
    let original = FileMetadata::new(Some("Test File".to_string()));

    workspace.set_file("test.md", original.clone()).unwrap();

    let stored = workspace.get_file("test.md").unwrap();
    assert_eq!(stored.title, Some("Test File".to_string()));
}
/// Looking up a path that was never written yields `None`.
#[test]
fn test_get_nonexistent_file() {
    let workspace = create_test_crdt();
    let missing = workspace.get_file("nonexistent.md");
    assert!(missing.is_none());
}
/// Writing the same path twice replaces the entry instead of adding one.
#[test]
fn test_update_file() {
    let workspace = create_test_crdt();

    let mut metadata = FileMetadata::new(Some("Original".to_string()));
    workspace.set_file("test.md", metadata.clone()).unwrap();

    metadata.title = Some("Updated".to_string());
    workspace.set_file("test.md", metadata).unwrap();

    assert_eq!(workspace.file_count(), 1);
    let stored = workspace.get_file("test.md").unwrap();
    assert_eq!(stored.title, Some("Updated".to_string()));
}
/// `delete_file` flags the entry as deleted but keeps it in the map.
#[test]
fn test_delete_file() {
    let workspace = create_test_crdt();
    workspace
        .set_file("test.md", FileMetadata::new(Some("To Delete".to_string())))
        .unwrap();

    workspace.delete_file("test.md").unwrap();

    let tombstone = workspace.get_file("test.md").unwrap();
    assert!(tombstone.deleted);
    assert_eq!(workspace.file_count(), 1);
}
/// `list_active_files` leaves out soft-deleted entries; `list_files` keeps them.
#[test]
fn test_list_active_files() {
    let workspace = create_test_crdt();

    let kept = FileMetadata::new(Some("Active".to_string()));
    workspace.set_file("active.md", kept).unwrap();

    let doomed = FileMetadata::new(Some("Deleted".to_string()));
    workspace.set_file("deleted.md", doomed).unwrap();
    workspace.delete_file("deleted.md").unwrap();

    assert_eq!(workspace.list_files().len(), 2);

    let active = workspace.list_active_files();
    assert_eq!(active.len(), 1);
    assert_eq!(active[0].0, "active.md");
}
/// `remove_file` drops the entry from the map entirely.
#[test]
fn test_remove_file() {
    let workspace = create_test_crdt();
    let metadata = FileMetadata::new(Some("Test".to_string()));
    workspace.set_file("test.md", metadata).unwrap();
    assert_eq!(workspace.file_count(), 1);

    workspace.remove_file("test.md").unwrap();

    assert_eq!(workspace.file_count(), 0);
    assert!(workspace.get_file("test.md").is_none());
}
/// A full-state update from one instance reproduces its files in another.
#[test]
fn test_encode_and_apply_update() {
    let source = create_test_crdt();
    let target = create_test_crdt();

    let file1 = FileMetadata::new(Some("File 1".to_string()));
    source.set_file("file1.md", file1).unwrap();
    let file2 = FileMetadata::new(Some("File 2".to_string()));
    source.set_file("file2.md", file2).unwrap();

    let full_state = source.encode_state_as_update();
    target
        .apply_update(&full_state, UpdateOrigin::Remote)
        .unwrap();

    assert_eq!(target.file_count(), 2);
    assert!(target.get_file("file1.md").is_some());
    assert!(target.get_file("file2.md").is_some());
}
/// A diff computed against a peer's state vector brings that peer up to date.
#[test]
fn test_encode_diff() {
    let source = create_test_crdt();
    let target = create_test_crdt();

    // First round: sync the initial file with a full-state update.
    let file1 = FileMetadata::new(Some("File 1".to_string()));
    source.set_file("file1.md", file1).unwrap();
    let full_state = source.encode_state_as_update();
    target.apply_update(&full_state, UpdateOrigin::Sync).unwrap();

    // Second round: only the new file travels, as a diff.
    let file2 = FileMetadata::new(Some("File 2".to_string()));
    source.set_file("file2.md", file2).unwrap();
    let target_sv = target.encode_state_vector();
    let missing = source.encode_diff(&target_sv).unwrap();
    target.apply_update(&missing, UpdateOrigin::Remote).unwrap();

    assert_eq!(target.file_count(), 2);
}
/// State saved through one instance is restored by `load` on the same storage.
#[test]
fn test_save_and_load() {
    let shared: Arc<dyn CrdtStorage> = Arc::new(MemoryStorage::new());

    {
        // The writer goes out of scope before the reader is created.
        let writer = WorkspaceCrdt::new(Arc::clone(&shared));
        let file1 = FileMetadata::new(Some("File 1".to_string()));
        writer.set_file("file1.md", file1).unwrap();
        let file2 = FileMetadata::new(Some("File 2".to_string()));
        writer.set_file("file2.md", file2).unwrap();
        writer.save().unwrap();
    }

    let reader = WorkspaceCrdt::load(shared).unwrap();
    assert_eq!(reader.file_count(), 2);

    let restored = reader.get_file("file1.md").unwrap();
    assert_eq!(restored.title, Some("File 1".to_string()));
}
/// Two instances that each add a different file hold both after exchanging state.
#[test]
fn test_concurrent_edits_merge() {
    let left_storage: Arc<dyn CrdtStorage> = Arc::new(MemoryStorage::new());
    let right_storage: Arc<dyn CrdtStorage> = Arc::new(MemoryStorage::new());
    let left = WorkspaceCrdt::new(left_storage);
    let right = WorkspaceCrdt::new(right_storage);

    let from_left = FileMetadata::new(Some("From CRDT1".to_string()));
    left.set_file("file1.md", from_left).unwrap();
    let from_right = FileMetadata::new(Some("From CRDT2".to_string()));
    right.set_file("file2.md", from_right).unwrap();

    // Capture both states before either side applies the other's.
    let left_state = left.encode_state_as_update();
    let right_state = right.encode_state_as_update();
    left.apply_update(&right_state, UpdateOrigin::Remote).unwrap();
    right.apply_update(&left_state, UpdateOrigin::Remote).unwrap();

    assert_eq!(left.file_count(), 2);
    assert_eq!(right.file_count(), 2);
    for path in ["file1.md", "file2.md"].iter() {
        assert!(left.get_file(path).is_some());
    }
    for path in ["file1.md", "file2.md"].iter() {
        assert!(right.get_file(path).is_some());
    }
}
/// The `contents` and `audience` lists survive a write/read round trip.
#[test]
fn test_file_metadata_with_contents() {
    let workspace = create_test_crdt();

    let mut index = FileMetadata::new(Some("Index".to_string()));
    index.part_of = None;
    index.contents = Some(vec!["child1.md".to_string(), "child2.md".to_string()]);
    index.audience = Some(vec!["public".to_string()]);
    workspace.set_file("index.md", index).unwrap();

    let stored = workspace.get_file("index.md").unwrap();
    let children = stored.contents.unwrap();
    assert_eq!(children.len(), 2);
    let audience = stored.audience.unwrap();
    assert_eq!(audience, vec!["public"]);
}
/// A files observer is called once, with the written path, after `set_file`.
#[test]
fn test_observer_fires_on_change() {
    use std::cell::RefCell;
    use std::rc::Rc;

    let workspace = create_test_crdt();

    // Shared sink the observer pushes into; read back after the write.
    let seen = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&seen);
    let _subscription = workspace.observe_files(move |batch| {
        sink.borrow_mut().extend(batch);
    });

    let metadata = FileMetadata::new(Some("Test".to_string()));
    workspace.set_file("test.md", metadata).unwrap();

    let recorded = seen.borrow();
    assert_eq!(recorded.len(), 1);
    assert_eq!(recorded[0].0, "test.md");
}
}