use crate::workspace_persistence::WorkspacePersistence;
use crate::{Error, Result};
use notify::{Config, Event, RecommendedWatcher, Watcher};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio::time::{timeout, Duration};
use tracing::{debug, error, info, warn};
pub struct SyncWatcher {
watchers: HashMap<String, RecommendedWatcher>,
running: Arc<Mutex<bool>>,
persistence: Arc<WorkspacePersistence>,
}
#[derive(Debug, Clone)]
pub enum SyncEvent {
FileCreated {
workspace_id: String,
path: PathBuf,
content: String,
},
FileModified {
workspace_id: String,
path: PathBuf,
content: String,
},
FileDeleted {
workspace_id: String,
path: PathBuf,
},
DirectoryChanged {
workspace_id: String,
changes: Vec<FileChange>,
},
}
#[derive(Debug, Clone)]
pub struct FileChange {
pub path: PathBuf,
pub kind: ChangeKind,
pub content: Option<String>,
}
#[derive(Debug, Clone)]
pub enum ChangeKind {
Created,
Modified,
Deleted,
}
impl SyncWatcher {
pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
let persistence = Arc::new(WorkspacePersistence::new(workspace_dir));
Self {
watchers: HashMap::new(),
running: Arc::new(Mutex::new(false)),
persistence,
}
}
pub async fn start_monitoring(&mut self, workspace_id: &str, directory: &str) -> Result<()> {
let directory_path = PathBuf::from(directory);
if !directory_path.exists() {
std::fs::create_dir_all(&directory_path)
.map_err(|e| Error::io_with_context("create sync directory", e.to_string()))?;
}
let (tx, mut rx) = mpsc::channel(100);
let workspace_id_string = workspace_id.to_string();
let workspace_id_for_watcher = workspace_id_string.clone();
let workspace_id_for_processing = workspace_id_string.clone();
let directory_path_clone = directory_path.clone();
let directory_path_for_processing = directory_path.clone();
let directory_str = directory.to_string();
let config = Config::default()
.with_poll_interval(Duration::from_secs(1))
.with_compare_contents(true);
let mut watcher = RecommendedWatcher::new(
move |res: notify::Result<Event>| {
if let Ok(event) = res {
debug!("File system event: {:?}", event);
let tx_clone = tx.clone();
let workspace_id_clone = workspace_id_string.clone();
let dir_clone = directory_path_clone.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_fs_event(
&tx_clone,
&workspace_id_clone,
&dir_clone,
&event,
)
.await
{
error!("Failed to handle file system event: {}", e);
}
});
}
},
config,
)
.map_err(|e| Error::io_with_context("create file watcher", e.to_string()))?;
watcher
.watch(&directory_path, notify::RecursiveMode::Recursive)
.map_err(|e| Error::io_with_context("watch directory", e.to_string()))?;
self.watchers.insert(workspace_id_for_watcher, watcher);
let persistence_clone = self.persistence.clone();
let is_running = self.running.clone();
tokio::spawn(async move {
info!(
"Started monitoring workspace {} in directory {}",
workspace_id_for_processing, directory_str
);
info!(
workspace_id = %workspace_id_for_processing,
directory = %directory_str,
"Monitoring workspace directory"
);
while *is_running.lock().await {
match timeout(Duration::from_millis(100), rx.recv()).await {
Ok(Some(event)) => {
if let Err(e) = Self::process_sync_event(
&persistence_clone,
&workspace_id_for_processing,
&directory_path_for_processing,
event,
)
.await
{
error!("Failed to process sync event: {}", e);
}
}
Ok(None) => break, Err(_) => continue, }
}
info!(
"Stopped monitoring workspace {} in directory {}",
workspace_id_for_processing, directory_str
);
info!(
workspace_id = %workspace_id_for_processing,
directory = %directory_str,
"Stopped monitoring workspace directory"
);
});
Ok(())
}
pub async fn stop_monitoring(&mut self, workspace_id: &str) -> Result<()> {
if let Some(watcher) = self.watchers.remove(workspace_id) {
drop(watcher);
}
Ok(())
}
pub async fn stop_all(&mut self) -> Result<()> {
*self.running.lock().await = false;
self.watchers.clear();
Ok(())
}
async fn handle_fs_event(
tx: &mpsc::Sender<SyncEvent>,
workspace_id: &str,
base_dir: &Path,
event: &Event,
) -> Result<()> {
let mut changes = Vec::new();
for path in &event.paths {
let relative_path = path.strip_prefix(base_dir).unwrap_or(path);
if relative_path.starts_with(".")
|| relative_path
.file_name()
.map(|n| n.to_string_lossy().starts_with("."))
.unwrap_or(false)
{
continue;
}
if let Some(extension) = path.extension() {
if extension != "yaml" && extension != "yml" {
continue;
}
}
match event.kind {
notify::EventKind::Create(_) => {
if let Ok(content) = tokio::fs::read_to_string(&path).await {
changes.push(FileChange {
path: relative_path.to_path_buf(),
kind: ChangeKind::Created,
content: Some(content),
});
}
}
notify::EventKind::Modify(_) => {
if let Ok(content) = tokio::fs::read_to_string(&path).await {
changes.push(FileChange {
path: relative_path.to_path_buf(),
kind: ChangeKind::Modified,
content: Some(content),
});
}
}
notify::EventKind::Remove(_) => {
changes.push(FileChange {
path: relative_path.to_path_buf(),
kind: ChangeKind::Deleted,
content: None,
});
}
_ => {}
}
}
if !changes.is_empty() {
let _ = tx
.send(SyncEvent::DirectoryChanged {
workspace_id: workspace_id.to_string(),
changes,
})
.await;
}
Ok(())
}
async fn process_sync_event(
persistence: &WorkspacePersistence,
_workspace_id: &str,
_directory: &Path,
event: SyncEvent,
) -> Result<()> {
if let SyncEvent::DirectoryChanged {
workspace_id,
changes,
} = event
{
info!("Processing {} file changes for workspace {}", changes.len(), workspace_id);
if !changes.is_empty() {
info!(
workspace_id = %workspace_id,
count = changes.len(),
"Detected file changes in workspace"
);
}
for change in changes {
match change.kind {
ChangeKind::Created => {
info!(path = %change.path.display(), "File created");
if let Some(content) = change.content {
if let Err(e) = Self::import_yaml_content(
persistence,
&workspace_id,
&change.path,
&content,
)
.await
{
warn!("Failed to import file {}: {}", change.path.display(), e);
} else {
info!(path = %change.path.display(), "Successfully imported");
}
}
}
ChangeKind::Modified => {
info!(path = %change.path.display(), "File modified");
if let Some(content) = change.content {
if let Err(e) = Self::import_yaml_content(
persistence,
&workspace_id,
&change.path,
&content,
)
.await
{
warn!("Failed to import file {}: {}", change.path.display(), e);
} else {
info!(path = %change.path.display(), "Successfully updated");
}
}
}
ChangeKind::Deleted => {
info!(
path = %change.path.display(),
workspace_id = %workspace_id,
"File deleted from watched directory — workspace may be out of sync. \
Re-export workspace or restart sync to reconcile."
);
}
}
}
}
Ok(())
}
async fn import_yaml_content(
persistence: &WorkspacePersistence,
workspace_id: &str,
path: &Path,
content: &str,
) -> Result<()> {
let workspace = persistence.load_workspace(workspace_id).await?;
if !matches!(workspace.get_sync_direction(), crate::workspace::SyncDirection::Bidirectional)
{
debug!("Workspace {} is not configured for bidirectional sync", workspace_id);
return Ok(());
}
if let Ok(_export) =
serde_yaml::from_str::<crate::workspace_persistence::WorkspaceExport>(content)
{
info!(
"Detected workspace export for {}, skipping full import to avoid conflicts",
workspace_id
);
debug!("Skipping workspace export to avoid conflicts");
return Ok(());
}
if let Ok(request) = serde_yaml::from_str::<crate::workspace::MockRequest>(content) {
debug!("Importing request {} from {}", request.name, path.display());
let mut workspace = persistence.load_workspace(workspace_id).await?;
workspace.add_request(request)?;
persistence.save_workspace(&workspace).await?;
info!(
"Successfully imported request from {} into workspace {}",
path.display(),
workspace_id
);
} else {
debug!("Content in {} is not a recognized format, skipping", path.display());
return Err(Error::io_with_context(
"import YAML content",
"File is not a recognized format (expected MockRequest YAML)",
));
}
Ok(())
}
pub async fn is_monitoring(&self, workspace_id: &str) -> bool {
self.watchers.contains_key(workspace_id)
}
pub fn get_monitored_workspaces(&self) -> Vec<String> {
self.watchers.keys().cloned().collect()
}
}
impl Drop for SyncWatcher {
fn drop(&mut self) {
}
}
pub struct SyncService {
watcher: Arc<Mutex<SyncWatcher>>,
running: Arc<Mutex<bool>>,
}
impl SyncService {
pub fn new<P: AsRef<Path>>(workspace_dir: P) -> Self {
let watcher = Arc::new(Mutex::new(SyncWatcher::new(workspace_dir)));
Self {
watcher,
running: Arc::new(Mutex::new(false)),
}
}
pub async fn start(&self) -> Result<()> {
let mut running = self.running.lock().await;
*running = true;
info!("Sync service started");
Ok(())
}
pub async fn stop(&self) -> Result<()> {
let mut running = self.running.lock().await;
*running = false;
let mut watcher = self.watcher.lock().await;
watcher.stop_all().await?;
info!("Sync service stopped");
Ok(())
}
pub async fn monitor_workspace(&self, workspace_id: &str, directory: &str) -> Result<()> {
let mut watcher = self.watcher.lock().await;
watcher.start_monitoring(workspace_id, directory).await?;
Ok(())
}
pub async fn stop_monitoring_workspace(&self, workspace_id: &str) -> Result<()> {
let mut watcher = self.watcher.lock().await;
watcher.stop_monitoring(workspace_id).await?;
Ok(())
}
pub async fn is_workspace_monitored(&self, workspace_id: &str) -> bool {
let watcher = self.watcher.lock().await;
watcher.is_monitoring(workspace_id).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_sync_service_creation() {
let temp_dir = TempDir::new().unwrap();
let service = SyncService::new(temp_dir.path());
assert!(!*service.running.lock().await);
}
#[tokio::test]
async fn test_sync_service_lifecycle() {
let temp_dir = TempDir::new().unwrap();
let service = SyncService::new(temp_dir.path());
service.start().await.unwrap();
assert!(*service.running.lock().await);
service.stop().await.unwrap();
assert!(!*service.running.lock().await);
}
}