use std::path::PathBuf;
#[cfg(feature = "crdt")]
use std::sync::Arc;
use indexmap::IndexMap;
use serde_yaml::Value;
use crate::error::{DiaryxError, Result};
use crate::frontmatter;
use crate::fs::AsyncFileSystem;
#[cfg(feature = "crdt")]
use crate::crdt::{BodyDocManager, CrdtStorage, WorkspaceCrdt};
/// Converts a `serde_yaml::Value` into the equivalent `serde_json::Value`.
///
/// * Numbers are tried as `i64`, then `u64`, then `f64`; a float that
///   `serde_json::Number::from_f64` rejects becomes JSON `null`.
/// * Mapping entries whose key is not a string (per `Value::as_str`) are
///   silently dropped, because JSON object keys must be strings.
/// * Tagged values are converted from their inner value; the tag is discarded.
pub(crate) fn yaml_to_json(yaml: Value) -> serde_json::Value {
    match yaml {
        Value::Null => serde_json::Value::Null,
        Value::Bool(flag) => serde_json::Value::Bool(flag),
        Value::String(text) => serde_json::Value::String(text),
        Value::Tagged(tagged) => yaml_to_json(tagged.value),
        Value::Number(num) => num
            .as_i64()
            .map(|signed| serde_json::Value::Number(signed.into()))
            .or_else(|| {
                num.as_u64()
                    .map(|unsigned| serde_json::Value::Number(unsigned.into()))
            })
            .or_else(|| {
                num.as_f64()
                    .and_then(serde_json::Number::from_f64)
                    .map(serde_json::Value::Number)
            })
            .unwrap_or(serde_json::Value::Null),
        Value::Sequence(items) => {
            let mut converted = Vec::with_capacity(items.len());
            for item in items {
                converted.push(yaml_to_json(item));
            }
            serde_json::Value::Array(converted)
        }
        Value::Mapping(entries) => {
            let mut object: serde_json::Map<String, serde_json::Value> = serde_json::Map::new();
            for (key, value) in entries {
                // Only string keys survive; anything else has no JSON representation here.
                if let Some(name) = key.as_str() {
                    object.insert(name.to_string(), yaml_to_json(value));
                }
            }
            serde_json::Value::Object(object)
        }
    }
}
/// Converts a `serde_json::Value` into the equivalent `serde_yaml::Value`.
///
/// Numbers are tried as `i64`, then `u64`, then `f64`; if none applies the
/// result is YAML null. Object keys become YAML string keys.
pub(crate) fn json_to_yaml(json: serde_json::Value) -> Value {
    match json {
        serde_json::Value::Null => Value::Null,
        serde_json::Value::Bool(flag) => Value::Bool(flag),
        serde_json::Value::String(text) => Value::String(text),
        serde_json::Value::Number(num) => num
            .as_i64()
            .map(|signed| Value::Number(signed.into()))
            .or_else(|| num.as_u64().map(|unsigned| Value::Number(unsigned.into())))
            .or_else(|| {
                num.as_f64()
                    .map(|float| Value::Number(serde_yaml::Number::from(float)))
            })
            .unwrap_or(Value::Null),
        serde_json::Value::Array(items) => {
            let mut converted = Vec::with_capacity(items.len());
            for item in items {
                converted.push(json_to_yaml(item));
            }
            Value::Sequence(converted)
        }
        serde_json::Value::Object(fields) => {
            let mut mapping = serde_yaml::Mapping::new();
            for (name, value) in fields {
                mapping.insert(Value::String(name), json_to_yaml(value));
            }
            Value::Mapping(mapping)
        }
    }
}
/// Central handle that bundles a filesystem backend with workspace settings
/// and, when the `crdt` feature is enabled, optional CRDT/sync components.
///
/// Domain-specific operations are reached through accessor methods such as
/// `entry()` and `workspace()`, which return lightweight borrowing wrappers.
pub struct Diaryx<FS: AsyncFileSystem> {
/// Filesystem backend used for all reads and writes.
fs: FS,
/// Optional workspace root; when set, entry paths are joined onto it.
workspace_root: std::sync::RwLock<Option<PathBuf>>,
/// Link format handed to `Workspace` when a workspace root is set.
link_format: crate::link_parser::LinkFormat,
/// Workspace-level CRDT document (`None` when built via `new`).
#[cfg(feature = "crdt")]
workspace_crdt: Option<Arc<WorkspaceCrdt>>,
/// Manager for per-document body CRDTs (`None` when built via `new`).
#[cfg(feature = "crdt")]
body_doc_manager: Option<Arc<BodyDocManager>>,
/// Sync handler constructed from a clone of `fs` (`None` when built via `new`).
#[cfg(feature = "crdt")]
sync_handler: Option<Arc<crate::crdt::SyncHandler<FS>>>,
/// Sync manager tying the CRDTs and the sync handler together (`None` when built via `new`).
#[cfg(feature = "crdt")]
sync_manager: Option<Arc<crate::crdt::RustSyncManager<FS>>>,
}
impl<FS: AsyncFileSystem> Diaryx<FS> {
/// Creates a `Diaryx` over `fs` with no workspace root, the default link
/// format and, under the `crdt` feature, every CRDT/sync component unset.
pub fn new(fs: FS) -> Self {
Self {
fs,
workspace_root: std::sync::RwLock::new(None),
link_format: crate::link_parser::LinkFormat::default(),
#[cfg(feature = "crdt")]
workspace_crdt: None,
#[cfg(feature = "crdt")]
body_doc_manager: None,
#[cfg(feature = "crdt")]
sync_handler: None,
#[cfg(feature = "crdt")]
sync_manager: None,
}
}
/// Replaces the stored link format with `format`.
pub fn set_link_format(&mut self, format: crate::link_parser::LinkFormat) {
self.link_format = format;
}
/// Returns a clone of the currently stored workspace root, if one is set.
///
/// # Panics
///
/// Panics if the internal `RwLock` has been poisoned.
pub fn workspace_root(&self) -> Option<PathBuf> {
self.workspace_root.read().unwrap().clone()
}
/// Returns the stored link format by value.
pub fn link_format(&self) -> crate::link_parser::LinkFormat {
self.link_format
}
/// Creates a `Diaryx` with fresh CRDT components backed by `storage`.
///
/// A new `WorkspaceCrdt` and `BodyDocManager` are built on the same storage,
/// a `SyncHandler` is built from a clone of `fs`, and all three are wired into
/// a `RustSyncManager`. The workspace root starts unset and the link format is
/// the default.
#[cfg(feature = "crdt")]
pub fn with_crdt(fs: FS, storage: Arc<dyn CrdtStorage>) -> Self
where
    FS: Clone,
{
    let handler = Arc::new(crate::crdt::SyncHandler::new(fs.clone()));
    let workspace = Arc::new(WorkspaceCrdt::new(storage.clone()));
    let bodies = Arc::new(BodyDocManager::new(storage));
    let manager = Arc::new(crate::crdt::RustSyncManager::new(
        workspace.clone(),
        bodies.clone(),
        handler.clone(),
    ));

    Self {
        fs,
        workspace_root: std::sync::RwLock::new(None),
        link_format: crate::link_parser::LinkFormat::default(),
        workspace_crdt: Some(workspace),
        body_doc_manager: Some(bodies),
        sync_handler: Some(handler),
        sync_manager: Some(manager),
    }
}
/// Creates a `Diaryx` whose workspace CRDT is loaded from `storage`.
///
/// Works like `with_crdt`, except the workspace document comes from
/// `WorkspaceCrdt::load`.
///
/// # Errors
///
/// Returns the error produced by `WorkspaceCrdt::load` if loading fails.
#[cfg(feature = "crdt")]
pub fn with_crdt_load(fs: FS, storage: Arc<dyn CrdtStorage>) -> Result<Self>
where
    FS: Clone,
{
    let workspace = Arc::new(WorkspaceCrdt::load(storage.clone())?);
    let bodies = Arc::new(BodyDocManager::new(storage));
    let handler = Arc::new(crate::crdt::SyncHandler::new(fs.clone()));
    let manager = Arc::new(crate::crdt::RustSyncManager::new(
        workspace.clone(),
        bodies.clone(),
        handler.clone(),
    ));

    let diaryx = Self {
        fs,
        workspace_root: std::sync::RwLock::new(None),
        link_format: crate::link_parser::LinkFormat::default(),
        workspace_crdt: Some(workspace),
        body_doc_manager: Some(bodies),
        sync_handler: Some(handler),
        sync_manager: Some(manager),
    };
    Ok(diaryx)
}
/// Creates a `Diaryx` around already-constructed CRDT components.
///
/// The supplied `workspace_crdt` and `body_doc_manager` are stored as-is; a
/// new `SyncHandler` (from a clone of `fs`) and a `RustSyncManager` are built
/// around them.
#[cfg(feature = "crdt")]
pub fn with_crdt_instances(
    fs: FS,
    workspace_crdt: Arc<WorkspaceCrdt>,
    body_doc_manager: Arc<BodyDocManager>,
) -> Self
where
    FS: Clone,
{
    let handler = Arc::new(crate::crdt::SyncHandler::new(fs.clone()));
    let manager = Arc::new(crate::crdt::RustSyncManager::new(
        workspace_crdt.clone(),
        body_doc_manager.clone(),
        handler.clone(),
    ));

    Self {
        fs,
        workspace_root: std::sync::RwLock::new(None),
        link_format: crate::link_parser::LinkFormat::default(),
        workspace_crdt: Some(workspace_crdt),
        body_doc_manager: Some(body_doc_manager),
        sync_handler: Some(handler),
        sync_manager: Some(manager),
    }
}
/// Borrows the underlying filesystem backend.
pub fn fs(&self) -> &FS {
&self.fs
}
/// Returns an `EntryOps` wrapper that borrows this instance.
pub fn entry(&self) -> EntryOps<'_, FS> {
EntryOps { diaryx: self }
}
/// Returns a `WorkspaceOps` wrapper that borrows this instance.
pub fn workspace(&self) -> WorkspaceOps<'_, FS> {
WorkspaceOps { diaryx: self }
}
/// Returns a `CrdtOps` wrapper when both the workspace CRDT and the body
/// document manager are present; otherwise returns `None`.
#[cfg(feature = "crdt")]
pub fn crdt(&self) -> Option<CrdtOps<'_, FS>> {
    // Either component missing short-circuits to `None`.
    let crdt = self.workspace_crdt.as_ref()?;
    let body_docs = self.body_doc_manager.as_ref()?;
    Some(CrdtOps {
        _diaryx: self,
        crdt,
        body_docs,
    })
}
/// Returns `true` when both the workspace CRDT and the body document manager are set.
#[cfg(feature = "crdt")]
pub fn has_crdt(&self) -> bool {
self.workspace_crdt.is_some() && self.body_doc_manager.is_some()
}
/// Crate-internal accessor for the body document manager, if configured.
#[cfg(feature = "crdt")]
pub(crate) fn body_doc_manager(&self) -> Option<&Arc<BodyDocManager>> {
self.body_doc_manager.as_ref()
}
/// Crate-internal accessor for the sync handler, if configured.
#[cfg(feature = "crdt")]
pub(crate) fn sync_handler(&self) -> Option<&Arc<crate::crdt::SyncHandler<FS>>> {
self.sync_handler.as_ref()
}
/// Returns the sync manager, if configured.
#[cfg(feature = "crdt")]
pub fn sync_manager(&self) -> Option<&Arc<crate::crdt::RustSyncManager<FS>>> {
self.sync_manager.as_ref()
}
/// Forwards `callback` to the sync manager's `set_event_callback`.
///
/// Does nothing when no sync manager is configured.
#[cfg(feature = "crdt")]
pub fn set_sync_event_callback(
    &self,
    callback: Arc<dyn Fn(&crate::fs::FileSystemEvent) + Send + Sync>,
) {
    if let Some(manager) = self.sync_manager.as_ref() {
        manager.set_event_callback(callback);
    }
}
/// Stores `root` as the workspace root and, when a sync handler is
/// configured, forwards the same root to it.
///
/// NOTE(review): this setter only exists with the `crdt` feature, yet the
/// `workspace_root` field is also read by non-CRDT code paths in this file;
/// confirm that leaving the root unsettable without `crdt` is intended.
///
/// # Panics
///
/// Panics if the internal `RwLock` has been poisoned.
#[cfg(feature = "crdt")]
pub fn set_workspace_root(&self, root: std::path::PathBuf) {
    let stored = Some(root.clone());
    *self.workspace_root.write().unwrap() = stored;
    if let Some(handler) = self.sync_handler.as_ref() {
        handler.set_workspace_root(root);
    }
}
}
/// Accessors that require `FS: Clone`, because the wrappers they return
/// clone the filesystem when building their inner helpers.
impl<FS: AsyncFileSystem + Clone> Diaryx<FS> {
/// Returns a `SearchOps` wrapper that borrows this instance.
pub fn search(&self) -> SearchOps<'_, FS> {
SearchOps { diaryx: self }
}
/// Returns an `ExportOps` wrapper that borrows this instance.
pub fn export(&self) -> ExportOps<'_, FS> {
ExportOps { diaryx: self }
}
/// Returns a `ValidateOps` wrapper that borrows this instance.
pub fn validate(&self) -> ValidateOps<'_, FS> {
ValidateOps { diaryx: self }
}
}
/// Borrowing wrapper exposing per-file operations (frontmatter, body,
/// attachments) on top of a `Diaryx` instance.
pub struct EntryOps<'a, FS: AsyncFileSystem> {
/// The `Diaryx` instance whose filesystem and workspace root are used.
diaryx: &'a Diaryx<FS>,
}
impl<'a, FS: AsyncFileSystem> EntryOps<'a, FS> {
/// Resolves `path` against the workspace root.
///
/// With a root set the result is `root.join(path)`; otherwise `path` is
/// converted to a `PathBuf` unchanged.
///
/// # Panics
///
/// Panics if the workspace-root lock has been poisoned.
fn resolve_path(&self, path: &str) -> PathBuf {
    let root_guard = self.diaryx.workspace_root.read().unwrap();
    if let Some(root) = &*root_guard {
        return root.join(path);
    }
    PathBuf::from(path)
}
/// Reads the file at `path` and returns its parsed frontmatter.
///
/// A file without frontmatter yields an empty map rather than an error.
///
/// # Errors
///
/// Returns the read error from `read_raw`, or any parse error other than
/// `DiaryxError::NoFrontmatter`.
pub async fn get_frontmatter(&self, path: &str) -> Result<IndexMap<String, Value>> {
    let raw = self.read_raw(path).await?;
    frontmatter::parse(&raw)
        .map(|parsed| parsed.frontmatter)
        .or_else(|err| match err {
            DiaryxError::NoFrontmatter(_) => Ok(IndexMap::new()),
            other => Err(other),
        })
}
/// Returns a clone of the frontmatter value stored under `key`, or `None`
/// when the key is absent.
///
/// # Errors
///
/// Propagates any error from `get_frontmatter`.
pub async fn get_frontmatter_property(&self, path: &str, key: &str) -> Result<Option<Value>> {
    self.get_frontmatter(path)
        .await
        .map(|fields| fields.get(key).cloned())
}
/// Sets frontmatter `key` to `value` in the file at `path` and writes the
/// file back.
///
/// A missing file is treated as empty content, so the write can create it.
///
/// # Errors
///
/// Returns read, parse, serialization or write errors from the helpers used.
pub async fn set_frontmatter_property(
    &self,
    path: &str,
    key: &str,
    value: Value,
) -> Result<()> {
    let raw = self.read_raw_or_empty(path).await?;
    let mut file = frontmatter::parse_or_empty(&raw)?;
    frontmatter::set_property(&mut file.frontmatter, key, value);
    self.write_parsed(path, &file).await
}
/// Removes `key` from the frontmatter of the file at `path` and writes the
/// file back.
///
/// A missing file, or a file without frontmatter, is a no-op that returns
/// `Ok(())`.
///
/// # Errors
///
/// Returns any read failure other than "file not found" (for example a
/// permission error), any parse error other than
/// `DiaryxError::NoFrontmatter`, and any serialization or write error.
pub async fn remove_frontmatter_property(&self, path: &str, key: &str) -> Result<()> {
    let content = match self.read_raw(path).await {
        Ok(text) => text,
        // Only a missing file means "nothing to remove". Other read failures
        // must surface instead of being reported as success, matching how
        // `read_raw_or_empty` distinguishes `NotFound` from real errors.
        Err(DiaryxError::FileRead { source, .. })
            if source.kind() == std::io::ErrorKind::NotFound =>
        {
            return Ok(());
        }
        Err(err) => return Err(err),
    };
    let mut parsed = match frontmatter::parse(&content) {
        Ok(file) => file,
        Err(DiaryxError::NoFrontmatter(_)) => return Ok(()),
        Err(err) => return Err(err),
    };
    frontmatter::remove_property(&mut parsed.frontmatter, key);
    self.write_parsed(path, &parsed).await
}
/// Returns the body (everything outside the frontmatter) of the file at `path`.
///
/// A missing file is read as empty content before parsing.
///
/// # Errors
///
/// Returns read errors other than "not found", and parse errors from
/// `frontmatter::parse_or_empty`.
pub async fn get_content(&self, path: &str) -> Result<String> {
    let raw = self.read_raw_or_empty(path).await?;
    frontmatter::parse_or_empty(&raw).map(|file| file.body)
}
/// Replaces the body of the file at `path` with `body`, keeping whatever
/// frontmatter was parsed from the existing content.
///
/// A missing file is treated as empty content, so the write can create it.
///
/// # Errors
///
/// Returns read, parse, serialization or write errors from the helpers used.
pub async fn set_content(&self, path: &str, body: &str) -> Result<()> {
    let raw = self.read_raw_or_empty(path).await?;
    let mut file = frontmatter::parse_or_empty(&raw)?;
    file.body = String::from(body);
    self.write_parsed(path, &file).await
}
/// Writes `body` via `set_content` and then refreshes the `updated`
/// frontmatter property via `touch_updated`.
///
/// # Errors
///
/// Returns the first error from either step; the timestamp is not touched
/// when writing the body fails.
pub async fn save_content(&self, path: &str, body: &str) -> Result<()> {
    match self.set_content(path, body).await {
        Ok(()) => self.touch_updated(path).await,
        Err(err) => Err(err),
    }
}
/// Sets the `updated` frontmatter property to the current UTC time,
/// formatted as `%Y-%m-%dT%H:%M:%SZ`.
///
/// # Errors
///
/// Propagates any error from `set_frontmatter_property`.
pub async fn touch_updated(&self, path: &str) -> Result<()> {
    let now = chrono::Utc::now();
    let stamp = now.format("%Y-%m-%dT%H:%M:%SZ").to_string();
    let value = Value::String(stamp);
    self.set_frontmatter_property(path, "updated", value).await
}
/// Appends `content` to the body of the file at `path`.
///
/// A newline separator is inserted only when the existing body is non-empty
/// and does not already end with `'\n'`.
///
/// # Errors
///
/// Returns read, parse, serialization or write errors from the helpers used.
pub async fn append_content(&self, path: &str, content: &str) -> Result<()> {
    let raw = self.read_raw_or_empty(path).await?;
    let mut file = frontmatter::parse_or_empty(&raw)?;

    let needs_separator = !file.body.is_empty() && !file.body.ends_with('\n');
    if needs_separator {
        file.body.push('\n');
    }
    file.body.push_str(content);

    self.write_parsed(path, &file).await
}
/// Reads the whole file at `path` (resolved against the workspace root) as a string.
///
/// # Errors
///
/// Any filesystem error is wrapped in `DiaryxError::FileRead` together with
/// the resolved path.
pub async fn read_raw(&self, path: &str) -> Result<String> {
    let resolved = self.resolve_path(path);
    match self.diaryx.fs.read_to_string(&resolved).await {
        Ok(text) => Ok(text),
        Err(source) => Err(DiaryxError::FileRead {
            path: resolved,
            source,
        }),
    }
}
/// Like `read_raw`, but a file that does not exist yields an empty string.
///
/// # Errors
///
/// Filesystem errors other than `ErrorKind::NotFound` are wrapped in
/// `DiaryxError::FileRead` together with the resolved path.
async fn read_raw_or_empty(&self, path: &str) -> Result<String> {
    let resolved = self.resolve_path(path);
    self.diaryx
        .fs
        .read_to_string(&resolved)
        .await
        .or_else(|source| {
            if source.kind() == std::io::ErrorKind::NotFound {
                Ok(String::new())
            } else {
                Err(DiaryxError::FileRead {
                    path: resolved,
                    source,
                })
            }
        })
}
/// Serializes `parsed` (frontmatter plus body) and writes it to `path`,
/// resolved against the workspace root.
///
/// # Errors
///
/// Returns the serialization error from `frontmatter::serialize`, or a
/// `DiaryxError::FileWrite` wrapping the filesystem error and resolved path.
async fn write_parsed(&self, path: &str, parsed: &frontmatter::ParsedFile) -> Result<()> {
    let serialized = frontmatter::serialize(&parsed.frontmatter, &parsed.body)?;
    let target = self.resolve_path(path);
    match self.diaryx.fs.write_file(&target, &serialized).await {
        Ok(written) => Ok(written),
        Err(source) => Err(DiaryxError::FileWrite {
            path: target,
            source,
        }),
    }
}
/// Returns the `attachments` frontmatter entry as a list of strings, as
/// extracted by `frontmatter::get_string_array`.
///
/// # Errors
///
/// Propagates any error from `get_frontmatter`.
pub async fn get_attachments(&self, path: &str) -> Result<Vec<String>> {
    self.get_frontmatter(path)
        .await
        .map(|fields| frontmatter::get_string_array(&fields, "attachments"))
}
/// Adds `attachment_path` to the `attachments` frontmatter list of the file
/// at `path`, creating the list if it is missing and skipping duplicates.
///
/// If `attachments` exists but is not a sequence it is left untouched; the
/// file is written back in every case.
///
/// # Errors
///
/// Returns read, parse, serialization or write errors from the helpers used.
pub async fn add_attachment(&self, path: &str, attachment_path: &str) -> Result<()> {
    let raw = self.read_raw_or_empty(path).await?;
    let mut file = frontmatter::parse_or_empty(&raw)?;

    let slot = file
        .frontmatter
        .entry("attachments".to_string())
        .or_insert_with(|| Value::Sequence(Vec::new()));
    if let Value::Sequence(items) = slot {
        let candidate = Value::String(attachment_path.to_string());
        let already_listed = items.iter().any(|existing| *existing == candidate);
        if !already_listed {
            items.push(candidate);
        }
    }

    self.write_parsed(path, &file).await
}
/// Removes every string entry equal to `attachment_path` from the
/// `attachments` frontmatter list of the file at `path`.
///
/// When the list becomes empty the `attachments` key is removed entirely.
/// A missing file, or a file without frontmatter, is a no-op that returns
/// `Ok(())`.
///
/// # Errors
///
/// Returns any read failure other than "file not found" (for example a
/// permission error), any parse error other than
/// `DiaryxError::NoFrontmatter`, and any serialization or write error.
pub async fn remove_attachment(&self, path: &str, attachment_path: &str) -> Result<()> {
    let content = match self.read_raw(path).await {
        Ok(text) => text,
        // Only a missing file means "nothing to remove". Other read failures
        // must surface instead of being reported as success, matching how
        // `read_raw_or_empty` distinguishes `NotFound` from real errors.
        Err(DiaryxError::FileRead { source, .. })
            if source.kind() == std::io::ErrorKind::NotFound =>
        {
            return Ok(());
        }
        Err(err) => return Err(err),
    };
    let mut parsed = match frontmatter::parse(&content) {
        Ok(file) => file,
        Err(DiaryxError::NoFrontmatter(_)) => return Ok(()),
        Err(err) => return Err(err),
    };
    if let Some(Value::Sequence(list)) = parsed.frontmatter.get_mut("attachments") {
        // Non-string items are always kept; strings are kept unless they match.
        list.retain(|item| !matches!(item, Value::String(s) if s == attachment_path));
        if list.is_empty() {
            parsed.frontmatter.shift_remove("attachments");
        }
    }
    self.write_parsed(path, &parsed).await
}
/// Reorders the frontmatter keys of the file at `path` and writes it back.
///
/// With `Some(pattern)` the order comes from `frontmatter::sort_by_pattern`;
/// with `None` it comes from `frontmatter::sort_alphabetically`. A missing
/// file, or a file without frontmatter, is a no-op that returns `Ok(())`.
///
/// # Errors
///
/// Returns any read failure other than "file not found" (for example a
/// permission error), any parse error other than
/// `DiaryxError::NoFrontmatter`, and any serialization or write error.
pub async fn sort_frontmatter(&self, path: &str, pattern: Option<&str>) -> Result<()> {
    let content = match self.read_raw(path).await {
        Ok(text) => text,
        // Only a missing file means "nothing to sort". Other read failures
        // must surface instead of being reported as success, matching how
        // `read_raw_or_empty` distinguishes `NotFound` from real errors.
        Err(DiaryxError::FileRead { source, .. })
            if source.kind() == std::io::ErrorKind::NotFound =>
        {
            return Ok(());
        }
        Err(err) => return Err(err),
    };
    let parsed = match frontmatter::parse(&content) {
        Ok(file) => file,
        Err(DiaryxError::NoFrontmatter(_)) => return Ok(()),
        Err(err) => return Err(err),
    };
    let sorted_frontmatter = match pattern {
        Some(p) => frontmatter::sort_by_pattern(parsed.frontmatter, p),
        None => frontmatter::sort_alphabetically(parsed.frontmatter),
    };
    let sorted_parsed = frontmatter::ParsedFile {
        frontmatter: sorted_frontmatter,
        body: parsed.body,
    };
    self.write_parsed(path, &sorted_parsed).await
}
}
/// Borrowing wrapper that exposes workspace-level operations of a `Diaryx`.
pub struct WorkspaceOps<'a, FS: AsyncFileSystem> {
    /// The `Diaryx` instance whose filesystem and settings are used.
    diaryx: &'a Diaryx<FS>,
}

impl<'a, FS: AsyncFileSystem> WorkspaceOps<'a, FS> {
    /// Builds a `Workspace` that borrows the filesystem.
    ///
    /// With a workspace root set, the workspace is created with that root and
    /// the configured link format; otherwise `Workspace::new` is used.
    pub fn inner(&self) -> crate::workspace::Workspace<&'a FS> {
        match self.diaryx.workspace_root() {
            Some(root) => crate::workspace::Workspace::with_link_format(
                &self.diaryx.fs,
                root,
                self.diaryx.link_format,
            ),
            None => crate::workspace::Workspace::new(&self.diaryx.fs),
        }
    }
}
/// Borrowing wrapper that exposes search operations of a `Diaryx`.
pub struct SearchOps<'a, FS: AsyncFileSystem> {
/// The `Diaryx` instance whose filesystem is cloned for each searcher.
diaryx: &'a Diaryx<FS>,
}
impl<'a, FS: AsyncFileSystem + Clone> SearchOps<'a, FS> {
/// Builds a new `Searcher` over a clone of the filesystem.
pub fn inner(&self) -> crate::search::Searcher<FS> {
crate::search::Searcher::new(self.diaryx.fs.clone())
}
/// Delegates to `Searcher::search_workspace` on a freshly built searcher.
pub async fn search_workspace(
&self,
workspace_root: &std::path::Path,
query: &crate::search::SearchQuery,
) -> crate::error::Result<crate::search::SearchResults> {
self.inner().search_workspace(workspace_root, query).await
}
/// Delegates to `Searcher::search_file` on a freshly built searcher.
pub async fn search_file(
&self,
path: &std::path::Path,
query: &crate::search::SearchQuery,
) -> crate::error::Result<Option<crate::search::FileSearchResult>> {
self.inner().search_file(path, query).await
}
}
/// Borrowing wrapper that exposes export operations of a `Diaryx`.
pub struct ExportOps<'a, FS: AsyncFileSystem> {
/// The `Diaryx` instance whose filesystem is cloned for each exporter.
diaryx: &'a Diaryx<FS>,
}
impl<'a, FS: AsyncFileSystem + Clone> ExportOps<'a, FS> {
/// Builds a new `Exporter` over a clone of the filesystem.
pub fn inner(&self) -> crate::export::Exporter<FS> {
crate::export::Exporter::new(self.diaryx.fs.clone())
}
/// Delegates to `Exporter::plan_export` on a freshly built exporter.
pub async fn plan_export(
&self,
workspace_root: &std::path::Path,
audience: &str,
destination: &std::path::Path,
) -> crate::error::Result<crate::export::ExportPlan> {
self.inner()
.plan_export(workspace_root, audience, destination)
.await
}
/// Delegates to `Exporter::execute_export`; only compiled on non-`wasm32` targets.
#[cfg(not(target_arch = "wasm32"))]
pub async fn execute_export(
&self,
plan: &crate::export::ExportPlan,
options: &crate::export::ExportOptions,
) -> crate::error::Result<crate::export::ExportStats> {
self.inner().execute_export(plan, options).await
}
}
/// Borrowing wrapper that exposes validation operations of a `Diaryx`.
pub struct ValidateOps<'a, FS: AsyncFileSystem> {
/// The `Diaryx` instance whose filesystem is cloned for each validator/fixer.
diaryx: &'a Diaryx<FS>,
}
impl<'a, FS: AsyncFileSystem + Clone> ValidateOps<'a, FS> {
/// Builds a new `Validator` over a clone of the filesystem.
pub fn inner(&self) -> crate::validate::Validator<FS> {
crate::validate::Validator::new(self.diaryx.fs.clone())
}
/// Delegates to `Validator::validate_workspace` on a freshly built validator.
pub async fn validate_workspace(
&self,
root_path: &std::path::Path,
max_depth: Option<usize>,
) -> crate::error::Result<crate::validate::ValidationResult> {
self.inner().validate_workspace(root_path, max_depth).await
}
/// Delegates to `Validator::validate_file` on a freshly built validator.
pub async fn validate_file(
&self,
file_path: &std::path::Path,
) -> crate::error::Result<crate::validate::ValidationResult> {
self.inner().validate_file(file_path).await
}
/// Builds a new `ValidationFixer` over a clone of the filesystem.
pub fn fixer(&self) -> crate::validate::ValidationFixer<FS> {
crate::validate::ValidationFixer::new(self.diaryx.fs.clone())
}
}
/// Borrowing wrapper that exposes CRDT and sync-protocol operations.
///
/// Obtained from `Diaryx::crdt`, which only constructs it when both the
/// workspace CRDT and the body document manager are present.
#[cfg(feature = "crdt")]
pub struct CrdtOps<'a, FS: AsyncFileSystem> {
/// Owning `Diaryx`; stored but not read by any method visible in this file.
_diaryx: &'a Diaryx<FS>,
/// Workspace-level CRDT document.
crdt: &'a Arc<WorkspaceCrdt>,
/// Manager for per-document body CRDTs.
body_docs: &'a Arc<BodyDocManager>,
}
#[cfg(feature = "crdt")]
impl<'a, FS: AsyncFileSystem> CrdtOps<'a, FS> {
/// Returns the workspace CRDT's encoded state vector.
pub fn get_state_vector(&self) -> Vec<u8> {
self.crdt.encode_state_vector()
}
/// Returns the workspace CRDT's state encoded as a single update.
pub fn get_full_state(&self) -> Vec<u8> {
self.crdt.encode_state_as_update()
}
/// Returns the workspace CRDT's encoded diff relative to `remote_state_vector`
/// (delegates to `WorkspaceCrdt::encode_diff`).
pub fn get_missing_updates(&self, remote_state_vector: &[u8]) -> Result<Vec<u8>> {
self.crdt.encode_diff(remote_state_vector)
}
/// Applies `update` to the workspace CRDT, tagged with `origin`.
///
/// Returns whatever `WorkspaceCrdt::apply_update` returns; the `Option<i64>`
/// is presumably an id of the stored update — confirm against that method.
pub fn apply_update(
&self,
update: &[u8],
origin: crate::crdt::UpdateOrigin,
) -> Result<Option<i64>> {
self.crdt.apply_update(update, origin)
}
/// Applies `update` to the workspace CRDT and reports what changed
/// (delegates to `WorkspaceCrdt::apply_update_tracking_changes`).
///
/// Elsewhere in this file the second tuple element is used as the list of
/// changed files and the third is bound as `_renames`.
pub fn apply_update_tracking_changes(
&self,
update: &[u8],
origin: crate::crdt::UpdateOrigin,
) -> Result<(Option<i64>, Vec<String>, Vec<(String, String)>)> {
self.crdt.apply_update_tracking_changes(update, origin)
}
/// Looks up the metadata stored for `path` in the workspace CRDT.
pub fn get_file(&self, path: &str) -> Option<crate::crdt::FileMetadata> {
self.crdt.get_file(path)
}
/// Stores `metadata` for `path` in the workspace CRDT.
pub fn set_file(&self, path: &str, metadata: crate::crdt::FileMetadata) -> Result<()> {
self.crdt.set_file(path, metadata)
}
/// Delegates to `WorkspaceCrdt::delete_file` for `path`.
pub fn delete_file(&self, path: &str) -> Result<()> {
self.crdt.delete_file(path)
}
/// Returns the `(path, metadata)` pairs reported by `WorkspaceCrdt::list_files`.
pub fn list_files(&self) -> Vec<(String, crate::crdt::FileMetadata)> {
self.crdt.list_files()
}
/// Returns the `(path, metadata)` pairs reported by
/// `WorkspaceCrdt::list_active_files` (presumably excluding deleted
/// entries — confirm against that method).
pub fn list_active_files(&self) -> Vec<(String, crate::crdt::FileMetadata)> {
self.crdt.list_active_files()
}
/// Returns the update history reported by `WorkspaceCrdt::get_history`.
pub fn get_history(&self) -> Result<Vec<crate::crdt::CrdtUpdate>> {
self.crdt.get_history()
}
/// Returns the updates reported by `WorkspaceCrdt::get_updates_since` for `since_id`.
pub fn get_updates_since(&self, since_id: i64) -> Result<Vec<crate::crdt::CrdtUpdate>> {
self.crdt.get_updates_since(since_id)
}
/// Borrows the storage backend held by the workspace CRDT.
pub fn storage(&self) -> &std::sync::Arc<dyn crate::crdt::CrdtStorage> {
self.crdt.storage()
}
/// Delegates to `WorkspaceCrdt::save`.
pub fn save(&self) -> Result<()> {
self.crdt.save()
}
/// Returns the file count reported by the workspace CRDT.
pub fn file_count(&self) -> usize {
self.crdt.file_count()
}
/// Returns the body document named `doc_name` via `BodyDocManager::get_or_create`.
pub fn get_or_create_body_doc(&self, doc_name: &str) -> std::sync::Arc<crate::crdt::BodyDoc> {
self.body_docs.get_or_create(doc_name)
}
/// Returns the body document named `doc_name` if `BodyDocManager::get` finds one.
pub fn get_body_doc(&self, doc_name: &str) -> Option<std::sync::Arc<crate::crdt::BodyDoc>> {
self.body_docs.get(doc_name)
}
/// Returns the body text of `doc_name`, or `None` when
/// `BodyDocManager::get` finds no such document.
pub fn get_body_content(&self, doc_name: &str) -> Option<String> {
self.body_docs.get(doc_name).map(|doc| doc.get_body())
}
/// Sets the body text of `doc_name` to `content`, obtaining the document
/// through `BodyDocManager::get_or_create` first.
pub fn set_body_content(&self, doc_name: &str, content: &str) -> Result<()> {
let doc = self.body_docs.get_or_create(doc_name);
doc.set_body(content)
}
/// Calls `BodyDocManager::create` for `doc_name` and discards its result.
///
/// NOTE(review): the method name suggests this replaces any existing
/// document with a fresh one — confirm against `BodyDocManager::create`.
pub fn reset_body_doc(&self, doc_name: &str) {
self.body_docs.create(doc_name);
}
/// Returns the result of `BodyDocManager::get_sync_state` for `doc_name`.
pub fn get_body_sync_state(&self, doc_name: &str) -> Option<Vec<u8>> {
self.body_docs.get_sync_state(doc_name)
}
/// Returns the result of `BodyDocManager::get_full_state` for `doc_name`.
pub fn get_body_full_state(&self, doc_name: &str) -> Option<Vec<u8>> {
self.body_docs.get_full_state(doc_name)
}
/// Applies `update` to the body document `doc_name`, tagged with `origin`
/// (delegates to `BodyDocManager::apply_update`).
pub fn apply_body_update(
&self,
doc_name: &str,
update: &[u8],
origin: crate::crdt::UpdateOrigin,
) -> Result<Option<i64>> {
self.body_docs.apply_update(doc_name, update, origin)
}
/// Returns the diff of body document `doc_name` relative to
/// `remote_state_vector` (delegates to `BodyDocManager::get_diff`).
pub fn get_body_missing_updates(
&self,
doc_name: &str,
remote_state_vector: &[u8],
) -> Result<Vec<u8>> {
self.body_docs.get_diff(doc_name, remote_state_vector)
}
/// Delegates to `BodyDocManager::save` for `doc_name`.
///
/// The meaning of the returned `bool` is defined by that method (presumably
/// whether a document was found and saved — confirm).
pub fn save_body_doc(&self, doc_name: &str) -> Result<bool> {
self.body_docs.save(doc_name)
}
/// Delegates to `BodyDocManager::save_all`.
pub fn save_all_body_docs(&self) -> Result<()> {
self.body_docs.save_all()
}
/// Returns the document names reported by `BodyDocManager::loaded_docs`.
pub fn loaded_body_docs(&self) -> Vec<String> {
self.body_docs.loaded_docs()
}
/// Delegates to `BodyDocManager::unload` for `doc_name`.
pub fn unload_body_doc(&self, doc_name: &str) {
self.body_docs.unload(doc_name);
}
/// Returns `true` when `doc_name` addresses the workspace document: either
/// exactly `"workspace"` or any name ending in `":workspace"`.
fn is_workspace_doc(doc_name: &str) -> bool {
    match doc_name.strip_suffix("workspace") {
        // What precedes the suffix must be empty or end with the ':' separator.
        Some(prefix) => prefix.is_empty() || prefix.ends_with(':'),
        None => false,
    }
}
/// Builds an encoded `SyncStep1` message carrying the local state vector
/// for `doc_name`.
///
/// Workspace document names use the workspace CRDT; any other name uses the
/// body document obtained through `get_or_create`.
pub fn create_sync_step1(&self, doc_name: &str) -> Vec<u8> {
    log::debug!("[Y-sync] create_sync_step1 for doc: {}", doc_name);
    let state_vector = if Self::is_workspace_doc(doc_name) {
        let sv = self.crdt.encode_state_vector();
        log::debug!("[Y-sync] Workspace state_vector {} bytes", sv.len());
        sv
    } else {
        let doc = self.body_docs.get_or_create(doc_name);
        let sv = doc.encode_state_vector();
        log::debug!("[Y-sync] Body doc state_vector {} bytes", sv.len());
        sv
    };
    crate::crdt::SyncMessage::SyncStep1(state_vector).encode()
}
/// Decodes every sync message contained in `message` and handles each one
/// in order against the document named `doc_name`.
///
/// Returns the concatenation of all per-message responses, or `None` when
/// no message produced a response (including when nothing was decoded).
///
/// # Errors
///
/// Returns the decode error, or the first error raised while handling a message.
pub fn handle_sync_message(&self, doc_name: &str, message: &[u8]) -> Result<Option<Vec<u8>>> {
    log::debug!(
        "[Y-sync] handle_sync_message for doc: {}, {} bytes",
        doc_name,
        message.len()
    );
    let decoded = crate::crdt::SyncMessage::decode_all(message)?;
    if decoded.is_empty() {
        log::debug!("[Y-sync] No messages decoded, returning None");
        return Ok(None);
    }
    log::debug!("[Y-sync] Decoded {} messages", decoded.len());

    let targets_workspace = Self::is_workspace_doc(doc_name);
    let mut combined: Option<Vec<u8>> = None;
    for sync_msg in decoded {
        let reply = if targets_workspace {
            self.handle_workspace_sync_message(sync_msg)?
        } else {
            self.handle_body_sync_message(doc_name, sync_msg)?
        };
        if let Some(bytes) = reply {
            // The first reply creates the buffer; later replies are appended.
            combined
                .get_or_insert_with(Vec::new)
                .extend_from_slice(&bytes);
        }
    }
    Ok(combined)
}
/// Like `handle_sync_message`, but also collects the changed file paths
/// reported while applying workspace updates.
///
/// Body-document messages contribute a response only; they never add
/// entries to the changed-file list.
///
/// # Errors
///
/// Returns the decode error, or the first error raised while handling a message.
pub fn handle_sync_message_with_changes(
    &self,
    doc_name: &str,
    message: &[u8],
) -> Result<(Option<Vec<u8>>, Vec<String>)> {
    log::debug!(
        "[Y-sync] handle_sync_message_with_changes for doc: {}, {} bytes",
        doc_name,
        message.len()
    );
    let decoded = crate::crdt::SyncMessage::decode_all(message)?;
    if decoded.is_empty() {
        log::debug!("[Y-sync] No messages decoded, returning None");
        return Ok((None, Vec::new()));
    }
    log::debug!("[Y-sync] Decoded {} messages", decoded.len());

    let targets_workspace = Self::is_workspace_doc(doc_name);
    let mut combined: Option<Vec<u8>> = None;
    let mut all_changed_files = Vec::new();
    for sync_msg in decoded {
        let (reply, changed_files) = if targets_workspace {
            self.handle_workspace_sync_message_with_changes(sync_msg)?
        } else {
            (
                self.handle_body_sync_message(doc_name, sync_msg)?,
                Vec::new(),
            )
        };
        all_changed_files.extend(changed_files);
        if let Some(bytes) = reply {
            // The first reply creates the buffer; later replies are appended.
            combined
                .get_or_insert_with(Vec::new)
                .extend_from_slice(&bytes);
        }
    }
    Ok((combined, all_changed_files))
}
/// Handles one sync message for the workspace document and reports which
/// files changed.
///
/// * `SyncStep1` — replies with our diff as `SyncStep2` followed by our own
///   `SyncStep1`; no files change.
/// * `SyncStep2` / `Update` — applies a non-empty payload (origin `Sync` or
///   `Remote` respectively) and returns the changed file list; no reply.
fn handle_workspace_sync_message_with_changes(
    &self,
    msg: crate::crdt::SyncMessage,
) -> Result<(Option<Vec<u8>>, Vec<String>)> {
    match msg {
        crate::crdt::SyncMessage::SyncStep1(remote_sv) => {
            log::debug!(
                "[Y-sync] Workspace: Received SyncStep1, remote_sv {} bytes",
                remote_sv.len()
            );
            let diff = self.crdt.encode_diff(&remote_sv)?;
            log::debug!("[Y-sync] Workspace: Encoded diff {} bytes", diff.len());
            let mut reply = crate::crdt::SyncMessage::SyncStep2(diff).encode();
            let local_sv = self.crdt.encode_state_vector();
            log::debug!(
                "[Y-sync] Workspace: Our state_vector {} bytes",
                local_sv.len()
            );
            let step1 = crate::crdt::SyncMessage::SyncStep1(local_sv).encode();
            reply.extend_from_slice(&step1);
            log::debug!(
                "[Y-sync] Workspace: Returning combined response {} bytes (Step2 + Step1)",
                reply.len()
            );
            Ok((Some(reply), Vec::new()))
        }
        crate::crdt::SyncMessage::SyncStep2(update) => {
            log::debug!(
                "[Y-sync] Workspace: Received SyncStep2, update {} bytes",
                update.len()
            );
            if update.is_empty() {
                log::debug!("[Y-sync] Workspace: Update is empty, skipping apply");
                return Ok((None, Vec::new()));
            }
            let (_, changed_files, _renames) = self
                .crdt
                .apply_update_tracking_changes(&update, crate::crdt::UpdateOrigin::Sync)?;
            log::debug!(
                "[Y-sync] Workspace: Applied update successfully, {} files changed",
                changed_files.len()
            );
            Ok((None, changed_files))
        }
        crate::crdt::SyncMessage::Update(update) => {
            log::debug!(
                "[Y-sync] Workspace: Received Update, {} bytes",
                update.len()
            );
            if update.is_empty() {
                log::debug!("[Y-sync] Workspace: Update is empty, skipping apply");
                return Ok((None, Vec::new()));
            }
            let (_, changed_files, _renames) = self
                .crdt
                .apply_update_tracking_changes(&update, crate::crdt::UpdateOrigin::Remote)?;
            log::debug!(
                "[Y-sync] Workspace: Applied remote update successfully, {} files changed",
                changed_files.len()
            );
            Ok((None, changed_files))
        }
    }
}
/// Handles one sync message for the workspace document.
///
/// * `SyncStep1` — replies with our diff as `SyncStep2` followed by our own
///   `SyncStep1`.
/// * `SyncStep2` / `Update` — applies a non-empty payload (origin `Sync` or
///   `Remote` respectively) and produces no reply.
fn handle_workspace_sync_message(
    &self,
    msg: crate::crdt::SyncMessage,
) -> Result<Option<Vec<u8>>> {
    match msg {
        crate::crdt::SyncMessage::SyncStep1(remote_sv) => {
            log::debug!(
                "[Y-sync] Workspace: Received SyncStep1, remote_sv {} bytes",
                remote_sv.len()
            );
            let diff = self.crdt.encode_diff(&remote_sv)?;
            log::debug!("[Y-sync] Workspace: Encoded diff {} bytes", diff.len());
            let mut reply = crate::crdt::SyncMessage::SyncStep2(diff).encode();
            let local_sv = self.crdt.encode_state_vector();
            log::debug!(
                "[Y-sync] Workspace: Our state_vector {} bytes",
                local_sv.len()
            );
            let step1 = crate::crdt::SyncMessage::SyncStep1(local_sv).encode();
            reply.extend_from_slice(&step1);
            log::debug!(
                "[Y-sync] Workspace: Returning combined response {} bytes (Step2 + Step1)",
                reply.len()
            );
            Ok(Some(reply))
        }
        crate::crdt::SyncMessage::SyncStep2(update) => {
            log::debug!(
                "[Y-sync] Workspace: Received SyncStep2, update {} bytes",
                update.len()
            );
            if update.is_empty() {
                log::debug!("[Y-sync] Workspace: Update is empty, skipping apply");
                return Ok(None);
            }
            self.crdt
                .apply_update(&update, crate::crdt::UpdateOrigin::Sync)?;
            log::debug!("[Y-sync] Workspace: Applied update successfully");
            Ok(None)
        }
        crate::crdt::SyncMessage::Update(update) => {
            log::debug!(
                "[Y-sync] Workspace: Received Update, {} bytes",
                update.len()
            );
            if update.is_empty() {
                log::debug!("[Y-sync] Workspace: Update is empty, skipping apply");
                return Ok(None);
            }
            self.crdt
                .apply_update(&update, crate::crdt::UpdateOrigin::Remote)?;
            log::debug!("[Y-sync] Workspace: Applied remote update successfully");
            Ok(None)
        }
    }
}
/// Handles one sync message for the body document `doc_name`, obtaining the
/// document through `get_or_create` first.
///
/// * `SyncStep1` — replies with our diff as `SyncStep2` followed by our own
///   `SyncStep1`.
/// * `SyncStep2` / `Update` — applies a non-empty payload (origin `Sync` or
///   `Remote` respectively) and produces no reply.
fn handle_body_sync_message(
    &self,
    doc_name: &str,
    msg: crate::crdt::SyncMessage,
) -> Result<Option<Vec<u8>>> {
    log::debug!("[Y-sync] Body doc '{}': handling message", doc_name);
    let doc = self.body_docs.get_or_create(doc_name);
    match msg {
        crate::crdt::SyncMessage::SyncStep1(remote_sv) => {
            log::debug!(
                "[Y-sync] Body '{}': Received SyncStep1, remote_sv {} bytes",
                doc_name,
                remote_sv.len()
            );
            let diff = doc.encode_diff(&remote_sv)?;
            log::debug!(
                "[Y-sync] Body '{}': Encoded diff {} bytes",
                doc_name,
                diff.len()
            );
            let mut reply = crate::crdt::SyncMessage::SyncStep2(diff).encode();
            let local_sv = doc.encode_state_vector();
            let step1 = crate::crdt::SyncMessage::SyncStep1(local_sv).encode();
            reply.extend_from_slice(&step1);
            log::debug!(
                "[Y-sync] Body '{}': Returning combined {} bytes",
                doc_name,
                reply.len()
            );
            Ok(Some(reply))
        }
        crate::crdt::SyncMessage::SyncStep2(update) => {
            log::debug!(
                "[Y-sync] Body '{}': Received SyncStep2, {} bytes",
                doc_name,
                update.len()
            );
            if update.is_empty() {
                return Ok(None);
            }
            doc.apply_update(&update, crate::crdt::UpdateOrigin::Sync)?;
            log::debug!("[Y-sync] Body '{}': Applied update", doc_name);
            Ok(None)
        }
        crate::crdt::SyncMessage::Update(update) => {
            log::debug!(
                "[Y-sync] Body '{}': Received Update, {} bytes",
                doc_name,
                update.len()
            );
            if update.is_empty() {
                return Ok(None);
            }
            doc.apply_update(&update, crate::crdt::UpdateOrigin::Remote)?;
            log::debug!("[Y-sync] Body '{}': Applied remote update", doc_name);
            Ok(None)
        }
    }
}
/// Wraps `update` in an encoded `SyncMessage::Update`.
///
/// `doc_name` is accepted but intentionally unused.
pub fn create_update_message(&self, doc_name: &str, update: &[u8]) -> Vec<u8> {
    let _ = doc_name;
    let payload = update.to_vec();
    crate::crdt::SyncMessage::Update(payload).encode()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::fs::SyncToAsyncFs;
use crate::test_utils::MockFileSystem;
// Reads the body of a file with frontmatter, replaces it, and reads it back.
#[test]
fn test_entry_get_set_content() {
let fs =
MockFileSystem::new().with_file("test.md", "---\ntitle: Test\n---\n\nOriginal content");
let diaryx = Diaryx::new(SyncToAsyncFs::new(fs));
let content = crate::fs::block_on_test(diaryx.entry().get_content("test.md")).unwrap();
// Bodies are compared after trimming, so surrounding blank lines do not matter.
assert_eq!(content.trim(), "Original content");
crate::fs::block_on_test(diaryx.entry().set_content("test.md", "\nNew content")).unwrap();
let content = crate::fs::block_on_test(diaryx.entry().get_content("test.md")).unwrap();
assert_eq!(content.trim(), "New content");
}
// Parses two string properties out of a file's frontmatter.
#[test]
fn test_entry_get_frontmatter() {
let fs = MockFileSystem::new()
.with_file("test.md", "---\ntitle: My Title\nauthor: John\n---\n\nBody");
let diaryx = Diaryx::new(SyncToAsyncFs::new(fs));
let fm = crate::fs::block_on_test(diaryx.entry().get_frontmatter("test.md")).unwrap();
assert_eq!(fm.get("title").unwrap().as_str().unwrap(), "My Title");
assert_eq!(fm.get("author").unwrap().as_str().unwrap(), "John");
}
// Overwrites an existing frontmatter property and verifies the new value is read back.
#[test]
fn test_entry_set_frontmatter_property() {
let fs = MockFileSystem::new().with_file("test.md", "---\ntitle: Original\n---\n\nBody");
let diaryx = Diaryx::new(SyncToAsyncFs::new(fs));
crate::fs::block_on_test(diaryx.entry().set_frontmatter_property(
"test.md",
"title",
Value::String("Updated".to_string()),
))
.unwrap();
let fm = crate::fs::block_on_test(diaryx.entry().get_frontmatter("test.md")).unwrap();
assert_eq!(fm.get("title").unwrap().as_str().unwrap(), "Updated");
}
#[cfg(feature = "crdt")]
mod crdt_tests {
use super::*;
use crate::crdt::MemoryStorage;
// A CRDT-enabled instance exposes `crdt()` and round-trips file metadata.
#[test]
fn test_diaryx_with_crdt() {
let fs = MockFileSystem::new();
let storage = Arc::new(MemoryStorage::new());
let diaryx = Diaryx::with_crdt(SyncToAsyncFs::new(fs), storage);
assert!(diaryx.has_crdt());
let crdt = diaryx.crdt().unwrap();
let metadata = crate::crdt::FileMetadata::new(Some("Test File".to_string()));
crdt.set_file("test.md", metadata).unwrap();
let retrieved = crdt.get_file("test.md");
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().title, Some("Test File".to_string()));
assert_eq!(crdt.file_count(), 1);
}
// An instance built with `new` has no CRDT components.
#[test]
fn test_diaryx_without_crdt() {
let fs = MockFileSystem::new();
let diaryx = Diaryx::new(SyncToAsyncFs::new(fs));
assert!(!diaryx.has_crdt());
assert!(diaryx.crdt().is_none());
}
// Full state exported from one instance can be applied to another, making
// the file metadata visible there.
#[test]
fn test_diaryx_crdt_sync() {
let fs1 = MockFileSystem::new();
let fs2 = MockFileSystem::new();
let storage1 = Arc::new(MemoryStorage::new());
let storage2 = Arc::new(MemoryStorage::new());
let diaryx1 = Diaryx::with_crdt(SyncToAsyncFs::new(fs1), storage1);
let diaryx2 = Diaryx::with_crdt(SyncToAsyncFs::new(fs2), storage2);
let crdt1 = diaryx1.crdt().unwrap();
let crdt2 = diaryx2.crdt().unwrap();
let metadata = crate::crdt::FileMetadata::new(Some("Shared File".to_string()));
crdt1.set_file("shared.md", metadata).unwrap();
// Transfer the complete state of the first instance as a single update.
let state = crdt1.get_full_state();
crdt2
.apply_update(&state, crate::crdt::UpdateOrigin::Remote)
.unwrap();
let retrieved = crdt2.get_file("shared.md");
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().title, Some("Shared File".to_string()));
}
}
}