Skip to main content

brume_daemon/
daemon.rs

1//! Handles the list of [`brume::synchro::Synchro`] in the background
2//!
3//! The daemon can be queried through the [`Server`] by multiple client applications to add or
4//! remove filesystem pairs to synchronize. It regularly synchronizes them.
5
6use std::{
7    future::Future,
8    io,
9    path::PathBuf,
10    sync::{
11        Arc,
12        atomic::{AtomicBool, Ordering},
13    },
14    time::Duration,
15};
16
17use anyhow::{Context, Result, anyhow};
18
19use brume::vfs::VirtualPathBuf;
20use futures::StreamExt;
21use interprocess::local_socket::{
22    GenericNamespaced, ListenerOptions, ToNsName, tokio::Listener, traits::tokio::Listener as _,
23};
24use log::{error, info, warn};
25use tarpc::{
26    serde_transport,
27    server::{BaseChannel, Channel},
28    tokio_serde::formats::Bincode,
29    tokio_util::{
30        codec::{LengthDelimitedCodec, length_delimited::Builder},
31        sync::CancellationToken,
32    },
33};
34use tokio::{
35    sync::{
36        Mutex,
37        mpsc::{UnboundedReceiver, unbounded_channel},
38    },
39    task::JoinHandle,
40    time,
41};
42
43use brume_daemon_proto::{
44    AnySynchroCreationInfo, BRUME_SOCK_NAME, BrumeService, SynchroId, SynchroSide, SynchroState,
45};
46
47use crate::{
48    db::{Database, DatabaseConfig},
49    server::Server,
50    synchro_list::ReadWriteSynchroList,
51};
52
53/// Configuration of a [`Daemon`]
54#[derive(Clone)]
55pub struct DaemonConfig {
56    /// Time between two synchronizations
57    sync_interval: Duration,
58    /// How internal errors are handled
59    error_mode: ErrorMode,
60    /// Name of the unix socket used to communicate with the daemon
61    sock_name: String,
62    /// Config of the sqlite database
63    db: DatabaseConfig,
64}
65
66impl Default for DaemonConfig {
67    fn default() -> Self {
68        Self {
69            sync_interval: Duration::from_secs(10),
70            error_mode: ErrorMode::default(),
71            sock_name: BRUME_SOCK_NAME.to_string(),
72            db: DatabaseConfig::OnDisk(PathBuf::from("./dev.db")), // TODO: change this
73        }
74    }
75}
76
77impl DaemonConfig {
78    pub fn with_sync_interval(self, sync_interval: Duration) -> Self {
79        Self {
80            sync_interval,
81            ..self
82        }
83    }
84
85    pub fn with_error_mode(self, error_mode: ErrorMode) -> Self {
86        Self { error_mode, ..self }
87    }
88
89    pub fn with_sock_name(self, sock_name: &str) -> Self {
90        Self {
91            sock_name: sock_name.to_string(),
92            ..self
93        }
94    }
95
96    pub fn with_db_config(self, db_config: DatabaseConfig) -> Self {
97        Self {
98            db: db_config,
99            ..self
100        }
101    }
102}
103
/// How errors should be handled by the daemon
///
/// This applies to failed synchronizations and to failed user commands.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub enum ErrorMode {
    /// Errors are logged and the daemon keeps running
    #[default]
    Log,
    /// Errors are logged, then the daemon stops and returns the error
    Exit,
}
111
112/// The different commands that can be received from user applications
113#[derive(Debug)]
114pub enum UserCommand {
115    SynchroCreation(SynchroCreationRequest),
116    SynchroDeletion(SynchroDeletionRequest),
117    StateChange(StateChangeRequest),
118    ConflictResolution(ConflictResolutionRequest),
119}
120
121/// A command to create a new synchro
122#[derive(Debug)]
123pub struct SynchroCreationRequest {
124    info: AnySynchroCreationInfo,
125}
126
127impl SynchroCreationRequest {
128    pub fn new(info: AnySynchroCreationInfo) -> Self {
129        Self { info }
130    }
131}
132
133/// A command to delete an existing synchro
134#[derive(Debug)]
135pub struct SynchroDeletionRequest {
136    id: SynchroId,
137}
138
139impl SynchroDeletionRequest {
140    pub fn new(id: SynchroId) -> Self {
141        Self { id }
142    }
143}
144
145/// A user request to change the [`SynchroState`] of a synchro
146#[derive(Debug)]
147pub struct StateChangeRequest {
148    id: SynchroId,
149    state: SynchroState,
150}
151
152impl StateChangeRequest {
153    pub fn new(id: SynchroId, state: SynchroState) -> Self {
154        Self { id, state }
155    }
156}
157
158/// A user request for a conflict resolution
159#[derive(Debug)]
160pub struct ConflictResolutionRequest {
161    id: SynchroId,
162    path: VirtualPathBuf,
163    side: SynchroSide,
164}
165
166impl ConflictResolutionRequest {
167    pub fn new(id: SynchroId, path: VirtualPathBuf, side: SynchroSide) -> Self {
168        Self { id, path, side }
169    }
170}
171
172/// The daemon holds the list of the synchronized folders, and synchronize them regularly.
173///
174/// It can be queried by client applications through the [`Server`].
175pub struct Daemon {
176    codec_builder: Builder,
177    rpc_listener: Listener,
178    synchro_list: ReadWriteSynchroList,
179    server: Server,
180    commands_chan: Mutex<UnboundedReceiver<UserCommand>>,
181    is_running: AtomicBool,
182    cancellation_token: CancellationToken,
183    database: Database,
184    config: DaemonConfig,
185}
186
187impl Daemon {
188    pub async fn new(config: DaemonConfig) -> Result<Self> {
189        let name = config
190            .sock_name
191            .as_str()
192            .to_ns_name::<GenericNamespaced>()
193            .context("Invalid name for sock")?;
194        let opts = ListenerOptions::new().name(name);
195        let listener = match opts.create_tokio() {
196            Err(e) if e.kind() == io::ErrorKind::AddrInUse => {
197                error!(
198                    "Error: could not start server because the socket file is occupied. \
199Please check if {} is in use by another process and try again.",
200                    config.sock_name
201                );
202                return Err(e).context("Failed to start server");
203            }
204            x => x?,
205        };
206
207        let codec_builder = LengthDelimitedCodec::builder();
208        let (commands_to_daemon, commands_from_server) = unbounded_channel();
209
210        info!("Loading db: {}", config.db.as_str().unwrap());
211        let database = Database::new(&config.db).await?;
212
213        let synchro_list = ReadWriteSynchroList::from(database.load_all_synchros().await.unwrap());
214
215        info!("Server running at {}", config.sock_name);
216        let server = Server::new(commands_to_daemon, synchro_list.as_read_only());
217
218        Ok(Self {
219            codec_builder,
220            rpc_listener: listener,
221            synchro_list,
222            server,
223            commands_chan: Mutex::new(commands_from_server),
224            is_running: AtomicBool::new(false),
225            cancellation_token: CancellationToken::new(),
226            config,
227            database,
228        })
229    }
230
231    /// Requests to stop the running daemon
232    pub fn stop(&self) {
233        self.cancellation_token.cancel()
234    }
235
236    /// Spawns a new tokio task and runs the daemon inside it
237    pub async fn spawn(self: &Arc<Self>) -> JoinHandle<Result<()>> {
238        let daemon = self.clone();
239        tokio::spawn(async move { daemon.run().await })
240    }
241
242    /// Handles client connections and periodically synchronize filesystems
243    pub async fn run(self: Arc<Self>) -> Result<()> {
244        if self
245            .is_running
246            .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
247            .is_err()
248        {
249            return Err(anyhow!("Daemon is already running"));
250        }
251        // Handle connections from client apps
252        {
253            let server = self.clone();
254            tokio::spawn(async move { server.serve().await });
255        }
256
257        // Synchronize all filesystems
258        let mut interval = time::interval(self.config.sync_interval);
259        interval.tick().await; // The first interval is immediate
260
261        loop {
262            info!("Starting full sync for all filesystems");
263            let synchro_list = self.synchro_list();
264
265            let results = synchro_list.sync_all(&self.database).await;
266
267            for res in results {
268                if let Err(err) = res {
269                    let wrapped_err = anyhow!(err);
270                    error!("Failed to synchronize filesystems: {wrapped_err:?}");
271                    if self.config.error_mode == ErrorMode::Exit {
272                        self.is_running.store(false, Ordering::Relaxed);
273                        return Err(wrapped_err);
274                    }
275                }
276            }
277
278            // Wait and update synchro list with any new sync from user
279            loop {
280                tokio::select! {
281                    _ = interval.tick() => break,
282                    Some(command) = self.recv_user_commands() => {
283                        let res = self.handle_user_commands(command).await;
284                        res.inspect_err(|_| {
285                            self.is_running.store(false, Ordering::Relaxed);
286                        })?
287                    }
288                    _ = self.cancellation_token.cancelled() => {
289                        self.is_running.store(false, Ordering::Relaxed);
290                        return Ok(())
291                    }
292                }
293            }
294        }
295    }
296
297    /// Starts a new rpc server that will handle incoming requests from client applications
298    pub async fn serve(&self) {
299        async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
300            tokio::spawn(fut);
301        }
302
303        loop {
304            let res = tokio::select! {
305                res = self.rpc_listener.accept() => res,
306                _ = self.cancellation_token.cancelled() => {
307                    return
308                }
309            };
310
311            let conn = match res {
312                Ok(c) => c,
313                Err(e) => {
314                    warn!("There was an error with an incoming connection: {e}");
315                    continue;
316                }
317            };
318
319            let transport =
320                serde_transport::new(self.codec_builder.new_framed(conn), Bincode::default());
321
322            let fut = BaseChannel::with_defaults(transport)
323                .execute(self.server.clone().serve())
324                .for_each(spawn);
325
326            let token = self.cancellation_token.child_token();
327            tokio::spawn(async move {
328                tokio::select! {
329                    _ = fut => {},
330                    _ = token.cancelled() => {}
331                }
332            });
333        }
334    }
335
336    /// Returns true if the server is running
337    pub fn is_running(&self) -> bool {
338        self.is_running.load(Ordering::Relaxed)
339    }
340
341    /// The list of the synchronized fs
342    pub fn synchro_list(&self) -> ReadWriteSynchroList {
343        self.synchro_list.clone()
344    }
345
346    /// Receives messages of client applications
347    pub async fn recv_user_commands(&self) -> Option<UserCommand> {
348        let mut receiver = self.commands_chan.lock().await;
349        receiver.recv().await
350    }
351
352    /// Creates a new synchro
353    pub async fn create_synchro(&self, synchro: SynchroCreationRequest) -> Result<()> {
354        let info = synchro.info;
355        let created = self.synchro_list.insert(info.clone()).await?;
356
357        self.database.insert_synchro(created, info).await?;
358
359        Ok(())
360    }
361
362    /// Deletes a synchro
363    pub async fn delete_synchro(&self, synchro: SynchroDeletionRequest) -> Result<()> {
364        self.synchro_list.remove(synchro.id).await?;
365        self.database.delete_synchro(synchro.id).await?;
366
367        Ok(())
368    }
369
370    pub async fn update_synchro_state(&self, state_request: StateChangeRequest) -> Result<()> {
371        self.synchro_list
372            .read()
373            .await
374            .set_state(state_request.id, state_request.state)
375            .await?;
376        self.database
377            .set_synchro_state(state_request.id, state_request.state)
378            .await?;
379
380        Ok(())
381    }
382
383    /// Handles messages of client applications
384    pub async fn handle_user_commands(&self, command: UserCommand) -> Result<()> {
385        match command {
386            UserCommand::SynchroCreation(new_synchro) => {
387                if let Err(err) = self.create_synchro(new_synchro).await {
388                    let wrapped_err = anyhow!(err);
389                    error!("Failed to insert new synchro: {wrapped_err:?}");
390                    if self.config.error_mode == ErrorMode::Exit {
391                        return Err(wrapped_err);
392                    }
393                }
394            }
395
396            UserCommand::SynchroDeletion(to_delete) => {
397                if let Err(err) = self.delete_synchro(to_delete).await {
398                    let wrapped_err = anyhow!(err);
399                    error!("Failed to delete synchro: {wrapped_err:?}");
400                    if self.config.error_mode == ErrorMode::Exit {
401                        return Err(wrapped_err);
402                    }
403                }
404            }
405            UserCommand::StateChange(state_request) => {
406                if let Err(err) = self.update_synchro_state(state_request).await {
407                    let wrapped_err = anyhow!(err);
408                    error!("Failed to set synchro state: {wrapped_err:?}");
409                    if self.config.error_mode == ErrorMode::Exit {
410                        return Err(wrapped_err);
411                    }
412                }
413            }
414            UserCommand::ConflictResolution(conflict) => {
415                // TODO: resolve conflict can be slow and should be performed in another task
416                // to not block the receiver thread
417                let res = self
418                    .synchro_list
419                    .resolve_conflict(conflict.id, &conflict.path, conflict.side, &self.database)
420                    .await;
421                if let Err(err) = res {
422                    let wrapped_err = anyhow!(err);
423                    error!("Failed resolve conflict: {wrapped_err:?}");
424                    if self.config.error_mode == ErrorMode::Exit {
425                        return Err(wrapped_err);
426                    }
427                }
428            }
429        }
430
431        Ok(())
432    }
433}