use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use serde::{Deserialize, Serialize};
use ts_rs::TS;
use yrs::updates::decoder::Decode;
use yrs::{Doc, Map, ReadTxn, StateVector, Transact, Update};
use super::storage::{CrdtStorage, StorageResult};
use super::types::FileMetadata;
use crate::error::DiaryxError;
/// Maximum number of cached snapshots kept per document.
const SNAPSHOT_CACHE_MAX_SIZE: usize = 10;
/// Every this-many replayed updates, the rebuilt state becomes a cache candidate.
const SNAPSHOT_INTERVAL: i64 = 100;
/// Name of the Yrs map that holds per-file metadata in the workspace doc.
const FILES_MAP_NAME: &str = "files";
/// One entry in a document's change history (exported as a TS binding).
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[ts(export, export_to = "bindings/")]
pub struct HistoryEntry {
    /// Storage-assigned id of the update this entry describes
    /// (ids increase with insertion order).
    pub update_id: i64,
    /// Timestamp recorded by storage for the update.
    pub timestamp: i64,
    /// Stringified origin of the update (e.g. local vs remote).
    pub origin: String,
    /// Paths of files affected by this update.
    pub files_changed: Vec<String>,
    /// Id of the device that produced the update, if known.
    pub device_id: Option<String>,
    /// Human-readable name of that device, if known.
    pub device_name: Option<String>,
}
/// Kind of change a file underwent between two workspace states.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TS)]
#[ts(export, export_to = "bindings/")]
pub enum ChangeType {
    /// File present in the new state but not the old.
    Added,
    /// File present in both states with differing metadata.
    Modified,
    /// File removed from the map, or its metadata flagged `deleted`.
    Deleted,
    /// File's `deleted` flag was cleared between the two states.
    Restored,
}
/// Per-file difference between two points in workspace history.
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[ts(export, export_to = "bindings/")]
pub struct FileDiff {
    /// Path of the file, as keyed in the workspace `files` map.
    pub path: String,
    /// How the file changed between the two states.
    pub change_type: ChangeType,
    /// Metadata before the change (`None` when the file was added).
    pub old_metadata: Option<FileMetadata>,
    /// Metadata after the change (`None` when the file was removed).
    pub new_metadata: Option<FileMetadata>,
}
/// A cached, fully-encoded document state used to shortcut history replay.
#[derive(Clone)]
struct CachedSnapshot {
    /// Id of the last update included in `state`.
    update_id: i64,
    /// Yrs v1-encoded full document state as of `update_id`.
    state: Vec<u8>,
}
/// Reconstructs document history from persisted CRDT updates, caching
/// intermediate snapshots so repeated lookups avoid replaying from scratch.
pub struct HistoryManager {
    /// Backing store for documents and their update logs.
    storage: Arc<dyn CrdtStorage>,
    /// Per-document snapshot cache, keyed by doc name.
    snapshot_cache: RwLock<HashMap<String, Vec<CachedSnapshot>>>,
}
impl HistoryManager {
    /// Creates a history manager backed by the given CRDT storage.
    pub fn new(storage: Arc<dyn CrdtStorage>) -> Self {
        Self {
            storage,
            snapshot_cache: RwLock::new(HashMap::new()),
        }
    }

    /// Returns the change history for `doc_name`, newest first, optionally
    /// truncated to `limit` entries.
    pub fn get_history(
        &self,
        doc_name: &str,
        limit: Option<usize>,
    ) -> StorageResult<Vec<HistoryEntry>> {
        self.get_history_with_files_changed(doc_name, limit)
    }

    /// Builds history entries with a per-update `files_changed` list.
    ///
    /// Non-workspace docs take a fast path: every update is assumed to touch
    /// only the doc itself. For the "workspace" doc, updates are replayed in
    /// order and the `files` map is diffed before/after each one.
    fn get_history_with_files_changed(
        &self,
        doc_name: &str,
        limit: Option<usize>,
    ) -> StorageResult<Vec<HistoryEntry>> {
        let updates = self.storage.get_all_updates(doc_name)?;
        if doc_name != "workspace" {
            let entries: Vec<HistoryEntry> = updates
                .into_iter()
                .rev()
                .take(limit.unwrap_or(usize::MAX))
                .map(|u| HistoryEntry {
                    update_id: u.update_id,
                    timestamp: u.timestamp,
                    origin: u.origin.to_string(),
                    files_changed: vec![doc_name.to_string()],
                    device_id: u.device_id,
                    device_name: u.device_name,
                })
                .collect();
            return Ok(entries);
        }
        let mut entries = Vec::new();
        let doc = Doc::new();
        let files_map = doc.get_or_insert_map(FILES_MAP_NAME);
        let mut prev_files: HashMap<String, String> = HashMap::new();
        for update in &updates {
            // Best effort: a corrupt update is skipped rather than aborting
            // the whole history listing.
            if let Ok(decoded) = Update::decode_v1(&update.data) {
                let mut txn = doc.transact_mut();
                let _ = txn.apply_update(decoded);
            }
            let txn = doc.transact();
            let mut current_files: HashMap<String, String> = HashMap::new();
            for (key, value) in files_map.iter(&txn) {
                current_files.insert(key.to_string(), value.to_string(&txn));
            }
            let mut files_changed = Vec::new();
            // Files added or whose serialized metadata changed.
            // (Fix: the original source had `&current_files` mangled into the
            // `&curren;` HTML entity, which does not compile.)
            for (path, new_value) in &current_files {
                match prev_files.get(path) {
                    None => files_changed.push(path.clone()),
                    Some(old_value) if old_value != new_value => {
                        files_changed.push(path.clone())
                    }
                    _ => {}
                }
            }
            // Files that disappeared from the map in this update.
            for path in prev_files.keys() {
                if !current_files.contains_key(path) {
                    files_changed.push(path.clone());
                }
            }
            files_changed.sort();
            entries.push(HistoryEntry {
                update_id: update.update_id,
                timestamp: update.timestamp,
                origin: update.origin.to_string(),
                files_changed,
                device_id: update.device_id.clone(),
                device_name: update.device_name.clone(),
            });
            prev_files = current_files;
        }
        // Replay built oldest-first; callers expect newest-first.
        entries.reverse();
        if let Some(limit) = limit {
            entries.truncate(limit);
        }
        Ok(entries)
    }

    /// Returns the combined history of a single file: workspace updates that
    /// touched it plus updates to its own doc, sorted newest-first by
    /// timestamp and optionally truncated to `limit`.
    pub fn get_file_history(
        &self,
        file_path: &str,
        limit: Option<usize>,
    ) -> StorageResult<Vec<HistoryEntry>> {
        let workspace_history = self.get_history_with_files_changed("workspace", None)?;
        let filtered_workspace: Vec<HistoryEntry> = workspace_history
            .into_iter()
            .filter(|e| e.files_changed.contains(&file_path.to_string()))
            .collect();
        let body_history = self.get_history_with_files_changed(file_path, None)?;
        let mut combined: Vec<HistoryEntry> =
            filtered_workspace.into_iter().chain(body_history).collect();
        combined.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
        if let Some(limit) = limit {
            combined.truncate(limit);
        }
        Ok(combined)
    }

    /// Reconstructs the encoded state of `doc_name` as it was right after
    /// `update_id` was applied.
    ///
    /// Starts from the nearest cached snapshot and replays the persisted
    /// update log up to `update_id`, caching intermediate states every
    /// `SNAPSHOT_INTERVAL` updates so later calls replay less.
    pub fn get_state_at(&self, doc_name: &str, update_id: i64) -> StorageResult<Option<Vec<u8>>> {
        let all_updates = self.storage.get_all_updates(doc_name)?;
        if all_updates.is_empty() {
            return self.storage.load_doc(doc_name);
        }
        // At or past the newest update, the answer is just the current doc.
        if let Some(last) = all_updates.last()
            && update_id >= last.update_id
        {
            return self.storage.load_doc(doc_name);
        }
        let (start_update_id, base_state) = self.find_nearest_snapshot(doc_name, update_id);
        let doc = Doc::new();
        let _files_map = doc.get_or_insert_map(FILES_MAP_NAME);
        if let Some(state) = base_state
            && let Ok(decoded) = Update::decode_v1(&state)
        {
            let mut txn = doc.transact_mut();
            let _ = txn.apply_update(decoded);
        }
        let mut updates_applied_since_cache = 0i64;
        for update in &all_updates {
            if update.update_id <= start_update_id {
                // Already contained in the base snapshot.
                continue;
            }
            if update.update_id > update_id {
                // Past the requested point in history.
                break;
            }
            let decoded = Update::decode_v1(&update.data).map_err(|e| {
                DiaryxError::Crdt(format!(
                    "Failed to decode update {}: {}",
                    update.update_id, e
                ))
            })?;
            {
                let mut txn = doc.transact_mut();
                txn.apply_update(decoded).map_err(|e| {
                    DiaryxError::Crdt(format!(
                        "Failed to apply update {}: {}",
                        update.update_id, e
                    ))
                })?;
            }
            updates_applied_since_cache += 1;
            // Periodically cache the state *as it stands right now*, keyed by
            // the id of the update just applied. (Fix: the original recorded
            // the interval id but cached the FINAL state — built from updates
            // up to `update_id` — under that earlier id, poisoning the cache
            // with states containing too-new changes.)
            if updates_applied_since_cache % SNAPSHOT_INTERVAL == 0 {
                let snapshot = {
                    let txn = doc.transact();
                    txn.encode_state_as_update_v1(&StateVector::default())
                };
                self.cache_snapshot(doc_name, update.update_id, &snapshot);
            }
        }
        let txn = doc.transact();
        let state = txn.encode_state_as_update_v1(&StateVector::default());
        Ok(Some(state))
    }

    /// Finds the cached snapshot with the largest update id not exceeding
    /// `target_id`. Returns `(0, None)` when nothing usable is cached.
    fn find_nearest_snapshot(&self, doc_name: &str, target_id: i64) -> (i64, Option<Vec<u8>>) {
        let cache = self.snapshot_cache.read().unwrap();
        if let Some(snapshots) = cache.get(doc_name) {
            if let Some(snapshot) = snapshots
                .iter()
                .filter(|s| s.update_id <= target_id)
                .max_by_key(|s| s.update_id)
            {
                return (snapshot.update_id, Some(snapshot.state.clone()));
            }
        }
        (0, None)
    }

    /// Inserts a snapshot into the per-doc cache, keeping it sorted by update
    /// id and thinned to at most `SNAPSHOT_CACHE_MAX_SIZE` entries.
    fn cache_snapshot(&self, doc_name: &str, update_id: i64, state: &[u8]) {
        let mut cache = self.snapshot_cache.write().unwrap();
        let snapshots = cache.entry(doc_name.to_string()).or_default();
        if snapshots.iter().any(|s| s.update_id == update_id) {
            // Already cached for this id; keep the existing entry.
            return;
        }
        snapshots.push(CachedSnapshot {
            update_id,
            state: state.to_vec(),
        });
        snapshots.sort_by_key(|s| s.update_id);
        if snapshots.len() > SNAPSHOT_CACHE_MAX_SIZE {
            // Thin evenly across the range instead of dropping oldest-first,
            // so lookups anywhere in history keep a nearby starting point.
            let step = snapshots.len() / SNAPSHOT_CACHE_MAX_SIZE;
            let keep_indices: Vec<usize> =
                (0..SNAPSHOT_CACHE_MAX_SIZE).map(|i| i * step).collect();
            let kept: Vec<CachedSnapshot> = snapshots
                .iter()
                .enumerate()
                .filter(|(i, _)| keep_indices.contains(i))
                .map(|(_, s)| s.clone())
                .collect();
            *snapshots = kept;
        }
    }

    /// Drops all cached snapshots for `doc_name`.
    pub fn clear_cache(&self, doc_name: &str) {
        let mut cache = self.snapshot_cache.write().unwrap();
        cache.remove(doc_name);
    }

    /// Decodes an encoded doc state and extracts its `files` map as
    /// path -> `FileMetadata`. Entries whose JSON fails to parse are skipped.
    fn get_files_from_state(&self, state: &[u8]) -> StorageResult<HashMap<String, FileMetadata>> {
        let doc = Doc::new();
        let update = Update::decode_v1(state)
            .map_err(|e| DiaryxError::Crdt(format!("Failed to decode state: {}", e)))?;
        {
            let mut txn = doc.transact_mut();
            txn.apply_update(update)
                .map_err(|e| DiaryxError::Crdt(format!("Failed to apply state: {}", e)))?;
        }
        let files_map = doc.get_or_insert_map(FILES_MAP_NAME);
        let txn = doc.transact();
        let mut files = HashMap::new();
        for (key, value) in files_map.iter(&txn) {
            let path = key.to_string();
            let json = value.to_string(&txn);
            if let Ok(metadata) = serde_json::from_str::<FileMetadata>(&json) {
                files.insert(path, metadata);
            }
        }
        Ok(files)
    }

    /// Computes per-file differences between the states at `from_id` and
    /// `to_id`, sorted by path. A file flipping its `deleted` flag is
    /// reported as `Deleted`/`Restored` rather than `Modified`.
    pub fn diff(&self, doc_name: &str, from_id: i64, to_id: i64) -> StorageResult<Vec<FileDiff>> {
        let from_state = self.get_state_at(doc_name, from_id)?;
        let to_state = self.get_state_at(doc_name, to_id)?;
        let from_files = match &from_state {
            Some(state) => self.get_files_from_state(state)?,
            None => HashMap::new(),
        };
        let to_files = match &to_state {
            Some(state) => self.get_files_from_state(state)?,
            None => HashMap::new(),
        };
        let mut diffs = Vec::new();
        for (path, new_meta) in &to_files {
            match from_files.get(path) {
                None => {
                    diffs.push(FileDiff {
                        path: path.clone(),
                        change_type: ChangeType::Added,
                        old_metadata: None,
                        new_metadata: Some(new_meta.clone()),
                    });
                }
                Some(old_meta) => {
                    if old_meta != new_meta {
                        let change_type = if old_meta.deleted && !new_meta.deleted {
                            ChangeType::Restored
                        } else if !old_meta.deleted && new_meta.deleted {
                            ChangeType::Deleted
                        } else {
                            ChangeType::Modified
                        };
                        diffs.push(FileDiff {
                            path: path.clone(),
                            change_type,
                            old_metadata: Some(old_meta.clone()),
                            new_metadata: Some(new_meta.clone()),
                        });
                    }
                }
            }
        }
        // Files present before but absent after.
        for (path, old_meta) in &from_files {
            if !to_files.contains_key(path) {
                diffs.push(FileDiff {
                    path: path.clone(),
                    change_type: ChangeType::Deleted,
                    old_metadata: Some(old_meta.clone()),
                    new_metadata: None,
                });
            }
        }
        diffs.sort_by(|a, b| a.path.cmp(&b.path));
        Ok(diffs)
    }

    /// Builds an encoded update from a fresh doc whose `files` map contains
    /// the metadata exactly as it was at `update_id`.
    ///
    /// Returns an error when no historical state exists for `update_id`.
    pub fn create_restore_update(&self, doc_name: &str, update_id: i64) -> StorageResult<Vec<u8>> {
        let historical_state = self.get_state_at(doc_name, update_id)?.ok_or_else(|| {
            DiaryxError::Crdt(format!("No state found at update ID {}", update_id))
        })?;
        let historical_files = self.get_files_from_state(&historical_state)?;
        let doc = Doc::new();
        let files_map = doc.get_or_insert_map(FILES_MAP_NAME);
        {
            let mut txn = doc.transact_mut();
            for (path, metadata) in historical_files {
                let json = serde_json::to_string(&metadata).unwrap_or_default();
                files_map.insert(&mut txn, path.as_str(), json);
            }
        }
        let txn = doc.transact();
        Ok(txn.encode_state_as_update_v1(&StateVector::default()))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crdt::{MemoryStorage, UpdateOrigin};

    /// Saves a minimal workspace-style doc (one "test.md" entry in the
    /// `files` map) under `doc_name` in the given storage.
    fn create_test_doc(storage: &Arc<dyn CrdtStorage>, doc_name: &str) {
        let doc = Doc::new();
        let files_map = doc.get_or_insert_map(FILES_MAP_NAME);
        {
            let mut txn = doc.transact_mut();
            let meta = FileMetadata::new(Some("Test".to_string()));
            let json = serde_json::to_string(&meta).unwrap();
            files_map.insert(&mut txn, "test.md", json);
        }
        let txn = doc.transact();
        let state = txn.encode_state_as_update_v1(&StateVector::default());
        storage.save_doc(doc_name, &state).unwrap();
    }

    // A doc with no stored updates yields an empty history, not an error.
    #[test]
    fn test_get_history_empty() {
        let storage: Arc<dyn CrdtStorage> = Arc::new(MemoryStorage::new());
        let history = HistoryManager::new(storage);
        let entries = history.get_history("test", None).unwrap();
        assert!(entries.is_empty());
    }

    // Non-workspace fast path: entries come back newest-first
    // (update ids strictly decreasing).
    #[test]
    fn test_get_history_with_updates() {
        let storage: Arc<dyn CrdtStorage> = Arc::new(MemoryStorage::new());
        storage
            .append_update("test", b"update1", UpdateOrigin::Local)
            .unwrap();
        storage
            .append_update("test", b"update2", UpdateOrigin::Remote)
            .unwrap();
        storage
            .append_update("test", b"update3", UpdateOrigin::Local)
            .unwrap();
        let history = HistoryManager::new(storage);
        let entries = history.get_history("test", None).unwrap();
        assert_eq!(entries.len(), 3);
        assert!(entries[0].update_id > entries[1].update_id);
        assert!(entries[1].update_id > entries[2].update_id);
    }

    // `limit` truncates the result to the most recent N entries.
    #[test]
    fn test_get_history_with_limit() {
        let storage: Arc<dyn CrdtStorage> = Arc::new(MemoryStorage::new());
        for i in 0..10 {
            storage
                .append_update(
                    "test",
                    format!("update{}", i).as_bytes(),
                    UpdateOrigin::Local,
                )
                .unwrap();
        }
        let history = HistoryManager::new(storage);
        let entries = history.get_history("test", Some(3)).unwrap();
        assert_eq!(entries.len(), 3);
    }

    // With no update log, get_state_at falls back to the saved doc.
    #[test]
    fn test_get_state_at() {
        let storage: Arc<dyn CrdtStorage> = Arc::new(MemoryStorage::new());
        create_test_doc(&storage, "workspace");
        let history = HistoryManager::new(Arc::clone(&storage));
        let state = history.get_state_at("workspace", 0).unwrap();
        assert!(state.is_some());
    }

    // A file appearing between two states is reported as Added with
    // no old metadata.
    #[test]
    fn test_diff_added_file() {
        let storage: Arc<dyn CrdtStorage> = Arc::new(MemoryStorage::new());
        // State 1: empty files map.
        let doc1 = Doc::new();
        let _files_map1 = doc1.get_or_insert_map(FILES_MAP_NAME);
        let txn1 = doc1.transact();
        let state1 = txn1.encode_state_as_update_v1(&StateVector::default());
        storage.save_doc("workspace", &state1).unwrap();
        let id1 = storage
            .append_update("workspace", &state1, UpdateOrigin::Local)
            .unwrap();
        // State 2: one new file.
        let doc2 = Doc::new();
        let files_map2 = doc2.get_or_insert_map(FILES_MAP_NAME);
        {
            let mut txn2 = doc2.transact_mut();
            let meta = FileMetadata::new(Some("New File".to_string()));
            let json = serde_json::to_string(&meta).unwrap();
            files_map2.insert(&mut txn2, "new.md", json);
        }
        let txn2 = doc2.transact();
        let state2 = txn2.encode_state_as_update_v1(&StateVector::default());
        storage.save_doc("workspace", &state2).unwrap();
        let id2 = storage
            .append_update("workspace", &state2, UpdateOrigin::Local)
            .unwrap();
        let history = HistoryManager::new(storage);
        let diffs = history.diff("workspace", id1, id2).unwrap();
        assert_eq!(diffs.len(), 1);
        assert_eq!(diffs[0].path, "new.md");
        assert_eq!(diffs[0].change_type, ChangeType::Added);
        assert!(diffs[0].old_metadata.is_none());
        assert!(diffs[0].new_metadata.is_some());
    }

    // Setting the `deleted` flag between two states is reported as Deleted.
    #[test]
    fn test_diff_deleted_file() {
        let storage: Arc<dyn CrdtStorage> = Arc::new(MemoryStorage::new());
        // State 1: file present and live.
        let doc1 = Doc::new();
        let files_map1 = doc1.get_or_insert_map(FILES_MAP_NAME);
        {
            let mut txn1 = doc1.transact_mut();
            let meta = FileMetadata::new(Some("To Delete".to_string()));
            let json = serde_json::to_string(&meta).unwrap();
            files_map1.insert(&mut txn1, "delete.md", json);
        }
        let txn1 = doc1.transact();
        let state1 = txn1.encode_state_as_update_v1(&StateVector::default());
        storage.save_doc("workspace", &state1).unwrap();
        let id1 = storage
            .append_update("workspace", &state1, UpdateOrigin::Local)
            .unwrap();
        // State 2: same file with `deleted` set.
        let doc2 = Doc::new();
        let files_map2 = doc2.get_or_insert_map(FILES_MAP_NAME);
        {
            let mut txn2 = doc2.transact_mut();
            let mut meta = FileMetadata::new(Some("To Delete".to_string()));
            meta.deleted = true;
            let json = serde_json::to_string(&meta).unwrap();
            files_map2.insert(&mut txn2, "delete.md", json);
        }
        let txn2 = doc2.transact();
        let state2 = txn2.encode_state_as_update_v1(&StateVector::default());
        storage.save_doc("workspace", &state2).unwrap();
        let id2 = storage
            .append_update("workspace", &state2, UpdateOrigin::Local)
            .unwrap();
        let history = HistoryManager::new(storage);
        let diffs = history.diff("workspace", id1, id2).unwrap();
        assert_eq!(diffs.len(), 1);
        assert_eq!(diffs[0].path, "delete.md");
        assert_eq!(diffs[0].change_type, ChangeType::Deleted);
    }

    // A restore update built from a historical state is non-empty and
    // decodes as a valid Yrs v1 update.
    #[test]
    fn test_create_restore_update() {
        let storage: Arc<dyn CrdtStorage> = Arc::new(MemoryStorage::new());
        let doc1 = Doc::new();
        let files_map1 = doc1.get_or_insert_map(FILES_MAP_NAME);
        {
            let mut txn1 = doc1.transact_mut();
            let meta = FileMetadata::new(Some("Original".to_string()));
            let json = serde_json::to_string(&meta).unwrap();
            files_map1.insert(&mut txn1, "file.md", json);
        }
        let txn1 = doc1.transact();
        let state1 = txn1.encode_state_as_update_v1(&StateVector::default());
        drop(txn1); storage.save_doc("workspace", &state1).unwrap();
        let id1 = storage
            .append_update("workspace", &state1, UpdateOrigin::Local)
            .unwrap();
        let history = HistoryManager::new(storage);
        let restore_update = history.create_restore_update("workspace", id1).unwrap();
        assert!(!restore_update.is_empty());
        let decoded = Update::decode_v1(&restore_update);
        assert!(decoded.is_ok());
    }
}