use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use super::sync_manager::RustSyncManager;
use super::transport::{ConnectionStatus, MessageCallback, SyncConfig, SyncTransport};
use super::{frame_body_message, unframe_body_message};
use crate::error::Result;
use crate::fs::{AsyncFileSystem, FileSystemEvent};
/// A sync message waiting to be delivered over one of the sync connections.
#[derive(Debug, Clone)]
pub struct OutgoingSyncMessage {
    /// Name of the document the message belongs to.
    pub doc_name: String,
    /// Raw message bytes.
    pub message: Vec<u8>,
    /// `true` for a body message, `false` for a workspace message.
    pub is_body: bool,
}

impl OutgoingSyncMessage {
    /// Builds a workspace message; its document name is always `"workspace"`.
    pub fn workspace(message: Vec<u8>) -> Self {
        let doc_name = String::from("workspace");
        Self {
            doc_name,
            message,
            is_body: false,
        }
    }

    /// Builds a body message for the document `doc_name`.
    pub fn body(doc_name: String, message: Vec<u8>) -> Self {
        Self {
            is_body: true,
            doc_name,
            message,
        }
    }

    /// Copies the payload out of a [`FileSystemEvent::SendSyncMessage`] event.
    ///
    /// Returns `None` for every other kind of event.
    pub fn from_event(event: &FileSystemEvent) -> Option<Self> {
        if let FileSystemEvent::SendSyncMessage {
            doc_name,
            message,
            is_body,
        } = event
        {
            Some(Self {
                doc_name: doc_name.to_owned(),
                message: message.to_vec(),
                is_body: *is_body,
            })
        } else {
            None
        }
    }
}
/// Sending half of the outgoing sync message queue.
pub type OutgoingSender = std::sync::mpsc::Sender<OutgoingSyncMessage>;
/// Receiving half of the outgoing sync message queue.
type OutgoingReceiver = std::sync::mpsc::Receiver<OutgoingSyncMessage>;
/// Shared, thread-safe callback that is handed a reference to each
/// [`FileSystemEvent`] it should inspect.
pub type SyncEventBridge = Arc<dyn Fn(&FileSystemEvent) + Send + Sync>;
/// Creates a bridge that forwards sync messages carried by filesystem events
/// into the outgoing queue behind `sender`.
///
/// Events for which [`OutgoingSyncMessage::from_event`] returns `None` are
/// ignored. A failed send is logged as a warning and otherwise swallowed; a
/// successful one is logged at debug level.
pub fn create_sync_event_bridge(sender: OutgoingSender) -> SyncEventBridge {
    Arc::new(move |event: &FileSystemEvent| {
        let outgoing = match OutgoingSyncMessage::from_event(event) {
            Some(outgoing) => outgoing,
            None => return,
        };

        // `send` consumes the message, so capture what the log lines need first.
        let doc_name = outgoing.doc_name.clone();
        let is_body = outgoing.is_body;
        let msg_len = outgoing.message.len();

        match sender.send(outgoing) {
            Ok(()) => log::debug!(
                "[SyncEventBridge] Queued outgoing sync message: doc={}, is_body={}",
                doc_name,
                is_body
            ),
            Err(e) => log::warn!(
                "[SyncEventBridge] Failed to queue sync message: doc={}, is_body={}, msg_len={}, error={:#?}",
                doc_name,
                is_body,
                msg_len,
                e
            ),
        }
    })
}
/// Settings for a sync client: where to connect, how to authenticate, and
/// how persistently to retry.
#[derive(Debug, Clone)]
pub struct SyncClientConfig {
    /// URL of the sync server.
    pub server_url: String,
    /// Identifier of the workspace being synced.
    pub workspace_id: String,
    /// Optional authentication token; `None` means no token is attached.
    pub auth_token: Option<String>,
    /// Root path of the workspace.
    pub workspace_root: PathBuf,
    /// Flag forwarded to the transport configs and the message handlers.
    pub write_to_disk: bool,
    /// Upper bound on reconnect attempts.
    pub max_reconnect_attempts: u32,
}

impl SyncClientConfig {
    /// Creates a config with no auth token, `write_to_disk` enabled and a
    /// limit of 10 reconnect attempts.
    pub fn new(server_url: String, workspace_id: String, workspace_root: PathBuf) -> Self {
        Self {
            auth_token: None,
            write_to_disk: true,
            max_reconnect_attempts: 10,
            server_url,
            workspace_id,
            workspace_root,
        }
    }

    /// Returns the config with `token` set as the auth token.
    pub fn with_auth(self, token: String) -> Self {
        Self {
            auth_token: Some(token),
            ..self
        }
    }

    /// Returns the config with `write_to_disk` replaced by `write`.
    pub fn with_write_to_disk(self, write: bool) -> Self {
        Self {
            write_to_disk: write,
            ..self
        }
    }

    /// Returns the config with the reconnect limit replaced by `max`.
    pub fn with_max_reconnects(self, max: u32) -> Self {
        Self {
            max_reconnect_attempts: max,
            ..self
        }
    }

    /// Builds the transport config for the metadata connection.
    fn metadata_config(&self) -> SyncConfig {
        let base = SyncConfig::metadata(self.server_url.clone(), self.workspace_id.clone());
        // The token is attached only when one has been configured.
        let authed = match self.auth_token.as_ref() {
            Some(token) => base.with_auth(token.clone()),
            None => base,
        };
        authed.with_write_to_disk(self.write_to_disk)
    }

    /// Builds the transport config for the body connection.
    fn body_config(&self) -> SyncConfig {
        let base = SyncConfig::body(self.server_url.clone(), self.workspace_id.clone());
        // The token is attached only when one has been configured.
        let authed = match self.auth_token.as_ref() {
            Some(token) => base.with_auth(token.clone()),
            None => base,
        };
        authed.with_write_to_disk(self.write_to_disk)
    }
}
/// Shared, thread-safe listener that receives each [`SyncEvent`] by value.
pub type SyncEventCallback = Arc<dyn Fn(SyncEvent) + Send + Sync>;
/// Notifications delivered to the registered [`SyncEventCallback`].
#[derive(Debug, Clone)]
pub enum SyncEvent {
/// The client's connection status changed; carries the new status.
StatusChanged(ConnectionStatus),
/// NOTE(review): not emitted anywhere in this file — presumably signals that
/// the metadata sync finished for `file_count` files; confirm against the emitter.
MetadataSynced {
file_count: usize,
},
/// NOTE(review): not emitted anywhere in this file — presumably signals that
/// the body sync finished for `file_count` files; confirm against the emitter.
BodySynced {
file_count: usize,
},
/// An incoming workspace message reported a non-empty list of changed files.
FilesChanged {
paths: Vec<String>,
},
/// An incoming body message that was not flagged as an echo produced
/// content for the document at `path`.
BodyChanged {
path: String,
content: String,
},
/// NOTE(review): not emitted anywhere in this file — presumably progress of
/// a multi-step operation (`completed` out of `total`); confirm against the emitter.
Progress {
completed: usize,
total: usize,
},
/// Handling an incoming message failed; `message` is the error rendered
/// with `to_string()`.
Error {
message: String,
},
/// NOTE(review): not emitted anywhere in this file — presumably carries the
/// new list of focused files; confirm against the emitter.
FocusListChanged {
files: Vec<String>,
},
}
/// Sync client that drives two transports of the same type: one for
/// workspace metadata and one for document bodies.
#[deprecated(
note = "Use direct WebSocket with v2 protocol instead. See CLI sync/client.rs for reference."
)]
pub struct SyncClient<T: SyncTransport, FS: AsyncFileSystem + Send + Sync + 'static> {
// Connection settings; also the source of both transport configs.
config: SyncClientConfig,
// Transport carrying workspace (non-body) messages.
metadata_transport: T,
// Transport carrying framed body messages and focus/unfocus text messages.
body_transport: T,
// Handles incoming messages and produces outgoing updates / sync step 1 messages.
sync_manager: Arc<RustSyncManager<FS>>,
// Set by `start`, cleared by `stop`.
running: AtomicBool,
// Set once the metadata transport has connected; cleared by `stop`.
metadata_connected: AtomicBool,
// Set once the body transport has connected; cleared by `stop`.
body_connected: AtomicBool,
// Reconnect attempt counter used by the reconnect helper methods.
reconnect_attempts: AtomicU32,
// Most recent status passed to `set_status`.
status: RwLock<ConnectionStatus>,
// Optional listener for `SyncEvent`s.
event_callback: RwLock<Option<SyncEventCallback>>,
// Sending half of the outgoing queue; clones are handed out by `outgoing_sender`.
outgoing_tx: OutgoingSender,
// Receiving half of the outgoing queue; drained by `process_outgoing`.
outgoing_rx: Mutex<Option<OutgoingReceiver>>,
}
impl<T: SyncTransport, FS: AsyncFileSystem + Send + Sync + 'static> SyncClient<T, FS> {
/// Creates a client that is stopped and disconnected.
///
/// A fresh outgoing queue is created; both connection flags are cleared, the
/// reconnect counter is zero, the status is `Disconnected` and no event
/// callback is registered.
pub fn new(
    config: SyncClientConfig,
    metadata_transport: T,
    body_transport: T,
    sync_manager: Arc<RustSyncManager<FS>>,
) -> Self {
    let (sender, receiver) = std::sync::mpsc::channel();
    Self {
        outgoing_tx: sender,
        outgoing_rx: Mutex::new(Some(receiver)),
        status: RwLock::new(ConnectionStatus::Disconnected),
        event_callback: RwLock::new(None),
        reconnect_attempts: AtomicU32::new(0),
        running: AtomicBool::new(false),
        metadata_connected: AtomicBool::new(false),
        body_connected: AtomicBool::new(false),
        config,
        metadata_transport,
        body_transport,
        sync_manager,
    }
}
/// Returns a new handle to the sending half of the outgoing queue.
pub fn outgoing_sender(&self) -> OutgoingSender {
    Clone::clone(&self.outgoing_tx)
}
/// Drains the outgoing queue and tries to transmit every pending message.
///
/// Returns the number of messages that were actually handed to a transport.
/// A message that cannot be transmitted — because its connection is down or
/// the transport reports an error — is discarded, logged, and not counted.
pub async fn process_outgoing(&self) -> usize {
    // Collect under the lock and send after releasing it: a std `Mutex`
    // guard must not be held across an `.await`.
    let pending_messages: Vec<OutgoingSyncMessage> = {
        let guard = self.outgoing_rx.lock().unwrap();
        match &*guard {
            // `try_iter` yields queued messages without blocking and stops
            // as soon as the queue is empty or disconnected.
            Some(rx) => rx.try_iter().collect(),
            None => Vec::new(),
        }
    };

    let mut count = 0;
    for msg in pending_messages {
        match self.send_outgoing_message(&msg).await {
            Ok(true) => count += 1,
            // Dropped because the connection is down; already logged by the helper.
            Ok(false) => {}
            Err(e) => log::warn!(
                "[SyncClient] Failed to send queued message for {}: {:?}",
                msg.doc_name,
                e
            ),
        }
    }
    count
}

/// Sends one queued message over the transport selected by `msg.is_body`.
///
/// Body messages are framed with their document name first; workspace
/// messages are sent as-is.
///
/// Returns `Ok(true)` when the message was handed to the transport and
/// `Ok(false)` when the matching connection is down, in which case the
/// message is dropped with a warning.
///
/// # Errors
///
/// Propagates any error returned by the transport's `send`.
async fn send_outgoing_message(&self, msg: &OutgoingSyncMessage) -> Result<bool> {
    if msg.is_body {
        if !self.body_connected.load(Ordering::SeqCst) {
            log::warn!(
                "[SyncClient] Cannot send body message for {} - not connected",
                msg.doc_name
            );
            return Ok(false);
        }
        let framed = frame_body_message(&msg.doc_name, &msg.message);
        self.body_transport.send(&framed).await?;
        log::debug!(
            "[SyncClient] Sent body message for {}, {} bytes",
            msg.doc_name,
            msg.message.len()
        );
    } else {
        if !self.metadata_connected.load(Ordering::SeqCst) {
            log::warn!("[SyncClient] Cannot send workspace message - not connected");
            return Ok(false);
        }
        self.metadata_transport.send(&msg.message).await?;
        log::debug!(
            "[SyncClient] Sent workspace message, {} bytes",
            msg.message.len()
        );
    }
    Ok(true)
}
/// Registers `callback` as the event listener, replacing any previous one.
///
/// # Panics
///
/// Panics if the callback lock is poisoned.
pub fn set_event_callback(&self, callback: SyncEventCallback) {
    // The write guard is a temporary and is released at the end of the statement.
    *self.event_callback.write().unwrap() = Some(callback);
}
/// Delivers `event` to the registered callback, if there is one.
///
/// The callback handle is cloned out of the lock and the lock is released
/// before the callback runs. Invoking it while the read guard was still held
/// meant a callback that called `set_event_callback` (which takes the write
/// lock on the same thread) could deadlock or panic.
///
/// # Panics
///
/// Panics if the callback lock is poisoned.
fn emit_event(&self, event: SyncEvent) {
    // The read guard is a temporary: it is dropped at the end of this statement.
    let callback = self.event_callback.read().unwrap().clone();
    if let Some(cb) = callback {
        cb(event);
    }
}
/// Stores `status` as the current connection status and then emits
/// [`SyncEvent::StatusChanged`] carrying it.
fn set_status(&self, status: ConnectionStatus) {
    let announced = status.clone();
    // The write guard is a temporary, so the lock is released before the
    // event is emitted.
    *self.status.write().unwrap() = status;
    self.emit_event(SyncEvent::StatusChanged(announced));
}
pub fn status(&self) -> ConnectionStatus {
self.status.read().unwrap().clone()
}
/// Reports whether the client has been started and not yet stopped.
pub fn is_running(&self) -> bool {
    let running = &self.running;
    running.load(Ordering::SeqCst)
}
/// Reports whether both the metadata and the body connection are up.
pub fn is_connected(&self) -> bool {
    let metadata_up = self.metadata_connected.load(Ordering::SeqCst);
    metadata_up && self.body_connected.load(Ordering::SeqCst)
}
pub async fn start(&self) -> Result<()> {
if self.running.swap(true, Ordering::SeqCst) {
return Ok(());
}
log::info!(
"[SyncClient] Starting sync to {} for workspace {}",
self.config.server_url,
self.config.workspace_id
);
self.set_status(ConnectionStatus::Connecting);
self.reconnect_attempts.store(0, Ordering::SeqCst);
self.connect_metadata().await?;
self.connect_body().await?;
Ok(())
}
/// Installs the incoming-message handler on the metadata transport,
/// connects it, marks the metadata connection as up and sends the workspace
/// sync step 1 message.
///
/// The event callback is captured once, when this method runs; replacing it
/// later through `set_event_callback` does not affect the handler installed
/// here.
///
/// # Errors
///
/// Propagates errors from connecting the transport or sending step 1. The
/// connected flag is set before step 1 is sent and is not cleared if that
/// send fails.
async fn connect_metadata(&self) -> Result<()> {
    let manager = Arc::clone(&self.sync_manager);
    let persist = self.config.write_to_disk;
    let listener = self.event_callback.read().unwrap().clone();

    let on_message: MessageCallback = Arc::new(move |message: &[u8]| {
        // The handler is synchronous, so the async manager call is driven
        // to completion right here.
        let outcome = futures_lite::future::block_on(
            manager.handle_workspace_message(message, persist),
        );
        let sync_result = match outcome {
            Ok(sync_result) => sync_result,
            Err(e) => {
                log::error!("[SyncClient] Metadata message error: {:?}", e);
                if let Some(cb) = listener.as_ref() {
                    cb(SyncEvent::Error {
                        message: e.to_string(),
                    });
                }
                return None;
            }
        };

        // Listeners are only told about messages that changed something.
        if !sync_result.changed_files.is_empty() {
            if let Some(cb) = listener.as_ref() {
                cb(SyncEvent::FilesChanged {
                    paths: sync_result.changed_files,
                });
            }
        }
        sync_result.response
    });
    self.metadata_transport.set_on_message(on_message);

    let transport_config = self.config.metadata_config();
    self.metadata_transport.connect(&transport_config).await?;
    self.metadata_connected.store(true, Ordering::SeqCst);

    let step1 = self.sync_manager.create_workspace_sync_step1();
    self.metadata_transport.send(&step1).await?;

    log::info!("[SyncClient] Metadata connection established");
    Ok(())
}
/// Installs the incoming-message handler on the body transport, connects
/// it, marks the body connection as up and reports `Connected`.
///
/// The handler unframes each message into a file path and payload, lets the
/// sync manager process it, emits [`SyncEvent::BodyChanged`] for non-echo
/// content, and returns the manager's response re-framed with the same path.
/// The event callback is captured once, when this method runs.
///
/// # Errors
///
/// Propagates the error from connecting the transport.
async fn connect_body(&self) -> Result<()> {
    let manager = Arc::clone(&self.sync_manager);
    let persist = self.config.write_to_disk;
    let listener = self.event_callback.read().unwrap().clone();

    let on_message: MessageCallback = Arc::new(move |message: &[u8]| {
        let (file_path, body_msg) = if let Some(parts) = unframe_body_message(message) {
            parts
        } else {
            log::warn!("[SyncClient] Failed to unframe body message");
            return None;
        };

        // The handler is synchronous, so the async manager call is driven
        // to completion right here.
        let outcome = futures_lite::future::block_on(manager.handle_body_message(
            &file_path,
            &body_msg,
            persist,
        ));
        let body_result = match outcome {
            Ok(body_result) => body_result,
            Err(e) => {
                log::error!("[SyncClient] Body message error for {}: {:?}", file_path, e);
                if let Some(cb) = listener.as_ref() {
                    cb(SyncEvent::Error {
                        message: e.to_string(),
                    });
                }
                return None;
            }
        };

        // Echoes are not reported to the listener.
        if let Some(content) = body_result.content {
            if !body_result.is_echo {
                if let Some(cb) = listener.as_ref() {
                    cb(SyncEvent::BodyChanged {
                        path: file_path.clone(),
                        content,
                    });
                }
            }
        }

        body_result
            .response
            .map(|resp| frame_body_message(&file_path, &resp))
    });
    self.body_transport.set_on_message(on_message);

    let transport_config = self.config.body_config();
    self.body_transport.connect(&transport_config).await?;
    self.body_connected.store(true, Ordering::SeqCst);

    log::info!("[SyncClient] Body connection established");
    self.set_status(ConnectionStatus::Connected);
    Ok(())
}
/// Asks the sync manager for a workspace update and sends it over the
/// metadata transport when it is non-empty.
///
/// When the metadata connection is down this only logs a warning and
/// returns `Ok(())`.
///
/// # Errors
///
/// Propagates errors from creating the update or from the transport.
pub async fn send_workspace_update(&self) -> Result<()> {
    let connected = self.metadata_connected.load(Ordering::SeqCst);
    if connected {
        let update = self.sync_manager.create_workspace_update(None)?;
        if !update.is_empty() {
            self.metadata_transport.send(&update).await?;
        }
    } else {
        log::warn!("[SyncClient] Cannot send workspace update: not connected");
    }
    Ok(())
}
/// Asks the sync manager for a body update for `doc_name` built from
/// `content` and, when it is non-empty, sends it framed with the document
/// name over the body transport.
///
/// When the body connection is down this only logs a warning and returns
/// `Ok(())`.
///
/// # Errors
///
/// Propagates errors from creating the update or from the transport.
pub async fn send_body_update(&self, doc_name: &str, content: &str) -> Result<()> {
    let connected = self.body_connected.load(Ordering::SeqCst);
    if connected {
        let update = self.sync_manager.create_body_update(doc_name, content)?;
        if !update.is_empty() {
            let framed = frame_body_message(doc_name, &update);
            self.body_transport.send(&framed).await?;
        }
    } else {
        log::warn!("[SyncClient] Cannot send body update: not connected");
    }
    Ok(())
}
/// Stops syncing: disconnects the metadata transport, then the body
/// transport, clears both connection flags and reports `Disconnected`.
///
/// Does nothing when the client is not marked as running. Disconnect
/// results are deliberately ignored.
pub async fn stop(&self) {
    let was_running = self.running.swap(false, Ordering::SeqCst);
    if !was_running {
        return;
    }
    log::info!("[SyncClient] Stopping sync");

    // Best effort: a failed disconnect must not prevent the state reset below.
    let _ = self.metadata_transport.disconnect().await;
    let _ = self.body_transport.disconnect().await;

    for flag in [&self.metadata_connected, &self.body_connected].iter() {
        flag.store(false, Ordering::SeqCst);
    }
    self.set_status(ConnectionStatus::Disconnected);
}
/// Returns the delay before the next reconnect attempt, in milliseconds.
///
/// The delay starts at 1000 ms, doubles with every recorded attempt and is
/// capped at 32000 ms.
///
/// The computation uses checked arithmetic. The attempt counter is a `u32`
/// that callers can increment without bound, and a plain
/// `1000 * 2u64.pow(attempts)` overflows for large counts: it panics in debug
/// builds and can wrap to a zero delay in release builds. Any overflow is
/// treated as "past the cap".
pub fn reconnect_delay(&self) -> u64 {
    const BASE_DELAY_MS: u64 = 1000;
    const MAX_DELAY_MS: u64 = 32000;

    let attempts = self.reconnect_attempts.load(Ordering::SeqCst);
    2u64.checked_pow(attempts)
        .and_then(|factor| BASE_DELAY_MS.checked_mul(factor))
        .map_or(MAX_DELAY_MS, |delay| delay.min(MAX_DELAY_MS))
}
/// Reports whether another reconnect attempt is allowed: the client must be
/// running and the attempt counter must be below the configured maximum.
pub fn should_reconnect(&self) -> bool {
    let still_running = self.running.load(Ordering::SeqCst);
    still_running
        && self.reconnect_attempts.load(Ordering::SeqCst) < self.config.max_reconnect_attempts
}
/// Records one more reconnect attempt and returns the updated count.
pub fn increment_reconnect(&self) -> u32 {
    // `fetch_add` hands back the value from before the increment.
    let previous = self.reconnect_attempts.fetch_add(1, Ordering::SeqCst);
    previous + 1
}
/// Resets the reconnect attempt counter to zero.
pub fn reset_reconnect(&self) {
    let counter = &self.reconnect_attempts;
    counter.store(0, Ordering::SeqCst);
}
/// Sends a `{"type": "focus", "files": [...]}` text message over the body
/// transport.
///
/// Returns `Ok(())` without sending when the body connection is down (a
/// warning is logged) or when `files` is empty.
///
/// # Errors
///
/// Propagates the error from the transport's `send_text`.
pub async fn focus_files(&self, files: &[String]) -> Result<()> {
    let connected = self.body_connected.load(Ordering::SeqCst);
    if !connected {
        log::warn!("[SyncClient] Cannot focus files: body not connected");
        return Ok(());
    }
    if !files.is_empty() {
        let text = serde_json::json!({
            "type": "focus",
            "files": files
        })
        .to_string();
        self.body_transport.send_text(&text).await?;
        log::debug!("[SyncClient] Sent focus for {} files", files.len());
    }
    Ok(())
}
/// Sends a `{"type": "unfocus", "files": [...]}` text message over the body
/// transport.
///
/// Returns `Ok(())` without sending when the body connection is down (a
/// warning is logged) or when `files` is empty.
///
/// # Errors
///
/// Propagates the error from the transport's `send_text`.
pub async fn unfocus_files(&self, files: &[String]) -> Result<()> {
    let connected = self.body_connected.load(Ordering::SeqCst);
    if !connected {
        log::warn!("[SyncClient] Cannot unfocus files: body not connected");
        return Ok(());
    }
    if !files.is_empty() {
        let text = serde_json::json!({
            "type": "unfocus",
            "files": files
        })
        .to_string();
        self.body_transport.send_text(&text).await?;
        log::debug!("[SyncClient] Sent unfocus for {} files", files.len());
    }
    Ok(())
}
/// Sends a body sync step 1 message for each document in `files` whose body
/// state the sync manager reports as changed.
///
/// For every such document the body content is loaded first. A load failure
/// or a send failure is logged as a warning and does not stop the loop.
/// Returns `Ok(())` without doing anything else when the body connection is
/// down (a warning is logged).
pub async fn subscribe_bodies(&self, files: &[String]) -> Result<()> {
    let connected = self.body_connected.load(Ordering::SeqCst);
    if !connected {
        log::warn!("[SyncClient] Cannot subscribe bodies: not connected");
        return Ok(());
    }
    log::info!(
        "[SyncClient] Subscribing to {} body docs from focus list",
        files.len()
    );

    for doc_name in files {
        if self.sync_manager.body_state_changed(doc_name) {
            // A failed load is only logged; step 1 is still sent afterwards.
            let load_result = self.sync_manager.ensure_body_content_loaded(doc_name).await;
            if let Err(e) = load_result {
                log::warn!(
                    "[SyncClient] Failed to load body content for {}: {:?}",
                    doc_name,
                    e
                );
            }

            let step1 = self.sync_manager.create_body_sync_step1(doc_name);
            let framed = frame_body_message(doc_name, &step1);
            let send_result = self.body_transport.send(&framed).await;
            if let Err(e) = send_result {
                log::warn!(
                    "[SyncClient] Failed to subscribe body {}: {:?}",
                    doc_name,
                    e
                );
            }
        } else {
            log::debug!("[SyncClient] Skipping unchanged body doc: {}", doc_name);
        }
    }
    Ok(())
}
/// Sends a body sync step 1 message for every file path known to the sync
/// manager whose body state it reports as changed.
///
/// Files are processed in batches of 20. With the `native-sync` feature
/// enabled, the method sleeps 10 ms between batches, and only between them:
/// there is no pause after the final batch.
///
/// Load failures and send failures are logged as warnings and do not stop
/// the loop. Returns `Ok(())` without doing anything else when the body
/// connection is down (a warning is logged).
pub async fn subscribe_all_bodies(&self) -> Result<()> {
    if !self.body_connected.load(Ordering::SeqCst) {
        log::warn!("[SyncClient] Cannot subscribe bodies: not connected");
        return Ok(());
    }
    let file_paths = self.sync_manager.get_all_file_paths();
    log::info!(
        "[SyncClient] Subscribing to {} body docs (loading content from disk)",
        file_paths.len()
    );
    let mut loaded_count = 0;
    let mut empty_count = 0;
    let mut skipped_count = 0;
    const BATCH_SIZE: usize = 20;
    // `batch_idx` is only read by the `native-sync` pacing code at the end of
    // the loop body, so it is unused when that feature is disabled.
    #[allow(unused_variables)]
    for (batch_idx, chunk) in file_paths.chunks(BATCH_SIZE).enumerate() {
        for doc_name in chunk {
            if !self.sync_manager.body_state_changed(doc_name) {
                log::debug!("[SyncClient] Skipping unchanged body doc: {}", doc_name);
                skipped_count += 1;
                continue;
            }
            // A failed load is only logged; step 1 is still sent afterwards.
            match self.sync_manager.ensure_body_content_loaded(doc_name).await {
                Ok(true) => loaded_count += 1,
                Ok(false) => empty_count += 1,
                Err(e) => {
                    log::warn!(
                        "[SyncClient] Failed to load body content for {}: {:?}",
                        doc_name,
                        e
                    );
                }
            }
            let step1 = self.sync_manager.create_body_sync_step1(doc_name);
            let framed = frame_body_message(doc_name, &step1);
            if let Err(e) = self.body_transport.send(&framed).await {
                log::warn!(
                    "[SyncClient] Failed to subscribe body {}: {:?}",
                    doc_name,
                    e
                );
            }
        }
        // Pause only when more files remain after this batch. The previous
        // check (`batch_idx < len / BATCH_SIZE`) also slept after the final
        // batch whenever the file count was an exact multiple of the batch size.
        #[cfg(feature = "native-sync")]
        if (batch_idx + 1) * BATCH_SIZE < file_paths.len() {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    }
    log::info!(
        "[SyncClient] Body subscription complete: loaded {} files, {} already had content or empty, {} skipped (unchanged)",
        loaded_count,
        empty_count,
        skipped_count
    );
    Ok(())
}
}
/// Debug output lists the workspace id, the running flag, both connection
/// flags and the current status; transports and the sync manager are omitted.
impl<T: SyncTransport, FS: AsyncFileSystem + Send + Sync + 'static> std::fmt::Debug
    for SyncClient<T, FS>
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Snapshot the state first so the builder calls below stay short.
        let running = self.running.load(Ordering::SeqCst);
        let metadata_connected = self.metadata_connected.load(Ordering::SeqCst);
        let body_connected = self.body_connected.load(Ordering::SeqCst);
        let status = self.status();

        let mut out = f.debug_struct("SyncClient");
        out.field("workspace_id", &self.config.workspace_id);
        out.field("running", &running);
        out.field("metadata_connected", &metadata_connected);
        out.field("body_connected", &body_connected);
        out.field("status", &status);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Config shared by the tests that only care about the derived values.
    fn example_config() -> SyncClientConfig {
        SyncClientConfig::new(
            "wss://sync.example.com".to_string(),
            "ws123".to_string(),
            PathBuf::from("/workspace"),
        )
    }

    /// Builder methods override exactly the fields they name.
    #[test]
    fn test_sync_client_config() {
        let config = example_config()
            .with_auth("token".to_string())
            .with_max_reconnects(5);

        assert_eq!(config.server_url, "wss://sync.example.com");
        assert_eq!(config.workspace_id, "ws123");
        assert_eq!(config.auth_token, Some("token".to_string()));
        assert_eq!(config.max_reconnect_attempts, 5);
    }

    /// The metadata transport config is not multiplexed and targets the workspace id.
    #[test]
    fn test_metadata_config() {
        let meta_config = example_config().metadata_config();

        assert!(!meta_config.multiplexed);
        assert_eq!(meta_config.doc_id, "ws123");
    }

    /// The body transport config is multiplexed.
    #[test]
    fn test_body_config() {
        let body_config = example_config().body_config();

        assert!(body_config.multiplexed);
    }

    /// Checks the backoff formula directly: doubling from 1000 ms, capped at 32000 ms.
    #[test]
    fn test_reconnect_delay() {
        let expected_delays: [(u32, u64); 7] = [
            (0, 1000),
            (1, 2000),
            (2, 4000),
            (3, 8000),
            (4, 16000),
            (5, 32000),
            (6, 32000),
        ];
        for &(attempts, expected) in expected_delays.iter() {
            assert_eq!(std::cmp::min(1000 * 2u64.pow(attempts), 32000), expected);
        }
    }

    /// Each constructed event matches its own variant.
    #[test]
    fn test_sync_event_variants() {
        let status_event = SyncEvent::StatusChanged(ConnectionStatus::Connected);
        assert!(matches!(status_event, SyncEvent::StatusChanged(_)));

        let files_event = SyncEvent::FilesChanged {
            paths: vec!["file.md".to_string()],
        };
        assert!(matches!(files_event, SyncEvent::FilesChanged { .. }));

        let error_event = SyncEvent::Error {
            message: "test".to_string(),
        };
        assert!(matches!(error_event, SyncEvent::Error { .. }));
    }

    /// Only `SendSyncMessage` events convert into outgoing messages.
    #[test]
    fn test_outgoing_sync_message_from_event() {
        let body_event = FileSystemEvent::SendSyncMessage {
            doc_name: "notes/test.md".to_string(),
            message: vec![1, 2, 3],
            is_body: true,
        };
        let converted = OutgoingSyncMessage::from_event(&body_event);
        assert!(converted.is_some());
        let converted = converted.unwrap();
        assert_eq!(converted.doc_name, "notes/test.md");
        assert_eq!(converted.message, vec![1, 2, 3]);
        assert!(converted.is_body);

        let workspace_event = FileSystemEvent::SendSyncMessage {
            doc_name: "workspace".to_string(),
            message: vec![4, 5, 6],
            is_body: false,
        };
        let converted = OutgoingSyncMessage::from_event(&workspace_event).unwrap();
        assert_eq!(converted.doc_name, "workspace");
        assert!(!converted.is_body);

        let created = FileSystemEvent::file_created(PathBuf::from("test.md"));
        assert!(OutgoingSyncMessage::from_event(&created).is_none());

        let status_changed = FileSystemEvent::sync_status_changed("synced", None);
        assert!(OutgoingSyncMessage::from_event(&status_changed).is_none());
    }

    /// The bridge queues sync messages and ignores every other event.
    #[test]
    fn test_create_sync_event_bridge() {
        let (tx, rx) = std::sync::mpsc::channel::<OutgoingSyncMessage>();
        let bridge = create_sync_event_bridge(tx);

        let sync_event = FileSystemEvent::SendSyncMessage {
            doc_name: "test.md".to_string(),
            message: vec![1, 2, 3],
            is_body: true,
        };
        bridge(&sync_event);

        let received = rx.try_recv().unwrap();
        assert_eq!(received.doc_name, "test.md");
        assert_eq!(received.message, vec![1, 2, 3]);
        assert!(received.is_body);

        let unrelated = FileSystemEvent::file_created(PathBuf::from("other.md"));
        bridge(&unrelated);
        assert!(rx.try_recv().is_err());
    }
}