use std::collections::HashMap;
use brume::vfs::VirtualPathBuf;
use log::{info, warn};
use tarpc::context::Context;
use tokio::sync::mpsc::UnboundedSender;
use brume_daemon_proto::{
AnyFsCreationInfo, AnyFsDescription, AnySynchroCreationInfo, AnySynchroRef, BrumeService,
SynchroId, SynchroSide, SynchroState,
};
use crate::{
daemon::{
ConflictResolutionRequest, StateChangeRequest, SynchroCreationRequest,
SynchroDeletionRequest, UserCommand,
},
synchro_list::ReadOnlySynchroList,
};
/// RPC server handle of the daemon; this is the type that implements [`BrumeService`].
///
/// The struct is `Clone`, and every [`BrumeService`] method in this file takes `self` by value.
/// NOTE(review): presumably one clone is created per incoming request — confirm at the place
/// where the service is served.
#[derive(Clone)]
pub struct Server {
/// Sending half of the unbounded channel carrying [`UserCommand`]s. Every RPC in this file
/// that requests a change (creation, deletion, pause/resume, conflict resolution) only sends
/// a command through this channel and returns.
commands_chan: UnboundedSender<UserCommand>,
/// View of the known synchros. In this file it is only accessed through `read()`, to answer
/// queries and to check requests before a command is sent.
synchro_list: ReadOnlySynchroList,
}
impl Server {
pub(crate) fn new(
commands_chan: UnboundedSender<UserCommand>,
synchro_list: ReadOnlySynchroList,
) -> Self {
Self {
commands_chan,
synchro_list,
}
}
}
/// RPC entry points.
///
/// Handlers never modify the synchro list themselves: they check the request where needed,
/// then push a [`UserCommand`] on the command channel. Errors are reported to the caller as
/// plain strings.
impl BrumeService for Server {
    /// Requests the creation of a synchro between the `local` and `remote` filesystems,
    /// optionally labelled with `name`.
    ///
    /// # Errors
    ///
    /// Returns an error string when either filesystem fails `validate()` (`local` is checked
    /// first), when the synchro list reports the pair as already synchronized, or when the
    /// command cannot be sent on the command channel.
    async fn new_synchro(
        self,
        _context: Context,
        local: AnyFsCreationInfo,
        remote: AnyFsCreationInfo,
        name: Option<String>,
    ) -> Result<(), String> {
        // Descriptions are built from clones because the creation infos themselves are
        // moved into the creation request further down.
        let local_desc = AnyFsDescription::from(local.clone());
        let remote_desc = AnyFsDescription::from(remote.clone());
        info!("Received synchro creation request: local {local_desc}, remote {remote_desc}");

        // Validate local first, then remote; the first failure is logged and returned.
        for fs_info in [&local, &remote] {
            fs_info.validate().await.map_err(|e| {
                warn!("{e}, skipping");
                e
            })?;
        }

        // Hold the read guard only for the duplicate lookup.
        let synchros = self.synchro_list.read().await;
        let already_synchronized = synchros.is_synchronized(&local_desc, &remote_desc).await;
        drop(synchros);
        if already_synchronized {
            warn!("Duplicate sync request, skipping");
            return Err("Filesystems are already in sync".to_string());
        }

        let creation = SynchroCreationRequest::new(AnySynchroCreationInfo::new(local, remote, name));
        self.commands_chan
            .send(UserCommand::SynchroCreation(creation))
            .map_err(|err| err.to_string())
    }

    /// Returns a copy of every synchro reference in the list, keyed by its [`SynchroId`].
    ///
    /// Each entry is read and cloned one at a time while the list itself is held for reading.
    async fn list_synchros(self, _context: Context) -> HashMap<SynchroId, AnySynchroRef> {
        let synchros = self.synchro_list.read().await;
        let mut snapshot: HashMap<SynchroId, AnySynchroRef> = HashMap::new();
        for (synchro_id, synchro_ref) in synchros.synchro_ref_list() {
            let current = synchro_ref.read().await.clone();
            snapshot.insert(*synchro_id, current);
        }
        snapshot
    }

    /// Requests the deletion of the synchro `id`.
    ///
    /// The id is not checked here; the request is forwarded as-is.
    ///
    /// # Errors
    ///
    /// Returns an error string if the command cannot be sent on the command channel.
    async fn delete_synchro(self, _context: Context, id: SynchroId) -> Result<(), String> {
        info!("Received synchro deletion request: id {id:?}");
        self.commands_chan
            .send(UserCommand::SynchroDeletion(SynchroDeletionRequest::new(id)))
            .map_err(|err| err.to_string())
    }

    /// Requests that the synchro `id` be switched to [`SynchroState::Paused`].
    ///
    /// # Errors
    ///
    /// Returns an error string if the command cannot be sent on the command channel.
    async fn pause_synchro(self, _context: Context, id: SynchroId) -> Result<(), String> {
        info!("Received synchro pause request: id {id:?}");
        let pause = UserCommand::StateChange(StateChangeRequest::new(id, SynchroState::Paused));
        self.commands_chan
            .send(pause)
            .map_err(|err| err.to_string())
    }

    /// Requests that the synchro `id` be switched to [`SynchroState::Running`].
    ///
    /// # Errors
    ///
    /// Returns an error string if the command cannot be sent on the command channel.
    async fn resume_synchro(self, _context: Context, id: SynchroId) -> Result<(), String> {
        info!("Received synchro resume request: id {id:?}");
        let resume = UserCommand::StateChange(StateChangeRequest::new(id, SynchroState::Running));
        self.commands_chan
            .send(resume)
            .map_err(|err| err.to_string())
    }

    /// Requests that the conflict on `path` in synchro `id` be resolved in favour of `side`.
    ///
    /// The node is looked up in the local VFS first; the remote VFS is consulted only when
    /// the local one has no node at `path`. The request is forwarded only if the node found
    /// is in conflict.
    ///
    /// # Errors
    ///
    /// Returns an error string when either VFS cannot be obtained for `id`, when `path`
    /// exists on neither side (`"Invalid path"`), when the node is not in conflict, or when
    /// the command cannot be sent on the command channel.
    async fn resolve_conflict(
        self,
        _context: Context,
        id: SynchroId,
        path: VirtualPathBuf,
        side: SynchroSide,
    ) -> Result<(), String> {
        info!("Received conflict resolution request: id {id:?}, path {path:?}, side: {side:?}");

        // Fetch both VFS values under a single read guard, then release the list before
        // inspecting them.
        let synchros = self.synchro_list.read().await;
        let local_vfs = synchros
            .get_vfs(id, SynchroSide::Local)
            .await
            .map_err(|err| err.to_string())?;
        let remote_vfs = synchros
            .get_vfs(id, SynchroSide::Remote)
            .await
            .map_err(|err| err.to_string())?;
        drop(synchros);

        let lookup = local_vfs
            .find_node(&path)
            .or_else(|| remote_vfs.find_node(&path));
        let Some(node) = lookup else {
            return Err("Invalid path".to_string());
        };

        if node.state().is_conflict() {
            let resolution = ConflictResolutionRequest::new(id, path, side);
            self.commands_chan
                .send(UserCommand::ConflictResolution(resolution))
                .map_err(|err| err.to_string())
        } else {
            Err("Node is not in conflict".to_string())
        }
    }
}