Skip to main content

brume_daemon/
server.rs

1//! The server provides rpc to remotely manipulate the list of synchronized Filesystems
2
3use std::collections::HashMap;
4
5use brume::vfs::VirtualPathBuf;
6use log::{info, warn};
7use tarpc::context::Context;
8use tokio::sync::mpsc::UnboundedSender;
9
10use brume_daemon_proto::{
11    AnyFsCreationInfo, AnyFsDescription, AnySynchroCreationInfo, AnySynchroRef, BrumeService,
12    SynchroId, SynchroSide, SynchroState,
13};
14
15use crate::{
16    daemon::{
17        ConflictResolutionRequest, StateChangeRequest, SynchroCreationRequest,
18        SynchroDeletionRequest, UserCommand,
19    },
20    synchro_list::ReadOnlySynchroList,
21};
22
23/// A Server that handle RPC connections from client applications
24///
25/// The server and the [`Daemon`] are running in separate tasks to be able to give a quick feedback
26/// to client applications even when a synchronization is in progress.
27///
28/// [`Daemon`]: crate::daemon::Daemon
29#[derive(Clone)]
30pub struct Server {
31    commands_chan: UnboundedSender<UserCommand>,
32    synchro_list: ReadOnlySynchroList,
33}
34
35impl Server {
36    pub(crate) fn new(
37        commands_chan: UnboundedSender<UserCommand>,
38        synchro_list: ReadOnlySynchroList,
39    ) -> Self {
40        Self {
41            commands_chan,
42            synchro_list,
43        }
44    }
45}
46
47impl BrumeService for Server {
48    async fn new_synchro(
49        self,
50        _context: Context,
51        local: AnyFsCreationInfo,
52        remote: AnyFsCreationInfo,
53        name: Option<String>,
54    ) -> Result<(), String> {
55        let local_desc = AnyFsDescription::from(local.clone());
56        let remote_desc = AnyFsDescription::from(remote.clone());
57        info!("Received synchro creation request: local {local_desc}, remote {remote_desc}");
58
59        // Check if the info are suitable for filesystem creation
60        local
61            .validate()
62            .await
63            .inspect_err(|e| warn!("{e}, skipping"))?;
64        remote
65            .validate()
66            .await
67            .inspect_err(|e| warn!("{e}, skipping"))?;
68
69        // Check if the fs pair is already in sync to return an error to the user
70        {
71            let list = self.synchro_list.read().await;
72
73            if list.is_synchronized(&local_desc, &remote_desc).await {
74                warn!("Duplicate sync request, skipping");
75                return Err("Filesystems are already in sync".to_string());
76            }
77        }
78
79        let synchro = AnySynchroCreationInfo::new(local, remote, name);
80        let command = UserCommand::SynchroCreation(SynchroCreationRequest::new(synchro));
81
82        self.commands_chan
83            .send(command)
84            .map_err(|e| e.to_string())?;
85
86        Ok(())
87    }
88
89    async fn list_synchros(self, _context: Context) -> HashMap<SynchroId, AnySynchroRef> {
90        let list = self.synchro_list.read().await;
91
92        let mut ret = HashMap::new();
93        for (key, val) in list.synchro_ref_list() {
94            ret.insert(*key, val.read().await.clone());
95        }
96
97        ret
98    }
99
100    async fn delete_synchro(self, _context: Context, id: SynchroId) -> Result<(), String> {
101        info!("Received synchro deletion request: id {id:?}");
102
103        let command = UserCommand::SynchroDeletion(SynchroDeletionRequest::new(id));
104        self.commands_chan.send(command).map_err(|e| e.to_string())
105    }
106
107    async fn pause_synchro(self, _context: Context, id: SynchroId) -> Result<(), String> {
108        info!("Received synchro pause request: id {id:?}");
109        let request = StateChangeRequest::new(id, SynchroState::Paused);
110        let command = UserCommand::StateChange(request);
111        self.commands_chan.send(command).map_err(|e| e.to_string())
112    }
113
114    async fn resume_synchro(self, _context: Context, id: SynchroId) -> Result<(), String> {
115        info!("Received synchro resume request: id {id:?}");
116        let request = StateChangeRequest::new(id, SynchroState::Running);
117        let command = UserCommand::StateChange(request);
118        self.commands_chan.send(command).map_err(|e| e.to_string())
119    }
120
121    async fn resolve_conflict(
122        self,
123        _context: Context,
124        id: SynchroId,
125        path: VirtualPathBuf,
126        side: SynchroSide,
127    ) -> Result<(), String> {
128        info!("Received conflict resolution request: id {id:?}, path {path:?}, side: {side:?}");
129        // Check if the synchro is valid and if the file exist to be able to return an early error
130        let (local_vfs, remote_vfs) = {
131            let list = self.synchro_list.read().await;
132
133            let local = list
134                .get_vfs(id, SynchroSide::Local)
135                .await
136                .map_err(|e| e.to_string())?;
137
138            let remote = list
139                .get_vfs(id, SynchroSide::Remote)
140                .await
141                .map_err(|e| e.to_string())?;
142
143            (local, remote)
144        };
145
146        let node = local_vfs
147            .find_node(&path)
148            .or_else(|| remote_vfs.find_node(&path))
149            .ok_or_else(|| "Invalid path".to_string())?;
150
151        if !node.state().is_conflict() {
152            return Err("Node is not in conflict".to_string());
153        }
154
155        let request = ConflictResolutionRequest::new(id, path, side);
156        let command = UserCommand::ConflictResolution(request);
157
158        self.commands_chan
159            .send(command)
160            .map_err(|e| e.to_string())?;
161        Ok(())
162    }
163}