// brume_daemon/db/mod.rs

//! The sqlite database that is used to persist the state of the daemon
2
3use std::error::Error;
4use std::path::{Path, PathBuf};
5
6use deadpool_diesel::PoolError;
7use deadpool_diesel::{
8    Runtime,
9    sqlite::{Manager, Pool},
10};
11use diesel::prelude::*;
12use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
13use futures::future::try_join;
14use thiserror::Error;
15use tokio::sync::RwLock;
16use uuid::Uuid;
17
18use brume::concrete::{local::LocalSyncInfo, nextcloud::NextcloudSyncInfo};
19use brume_daemon_proto::{
20    AnyFsCreationInfo, AnyFsRef, AnySynchroCreationInfo, AnySynchroRef, SynchroId, SynchroState,
21    SynchroStatus,
22};
23
24use crate::{
25    schema::{filesystems, synchros},
26    synchro_list::{CreatedSynchro, SynchroList},
27};
28
29pub mod vfs;
30
31/// Information loaded from a filesystem in the db
32#[derive(Queryable, Selectable, Identifiable)]
33#[diesel(table_name = filesystems)]
34#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
35struct DbFileSystem {
36    id: i32,
37    uuid: Vec<u8>,
38    creation_info: Vec<u8>,
39}
40
41/// Information needed to insert a new filesystem in the db
42#[derive(Insertable)]
43#[diesel(table_name = filesystems)]
44struct DbNewFileSystem<'a> {
45    uuid: &'a [u8],
46    creation_info: &'a [u8],
47    root_node: i32,
48}
49
50/// Information loaded from a synchro in the db
51#[derive(Queryable, Selectable, Identifiable)]
52#[diesel(table_name = synchros)]
53#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
54struct DbSynchro {
55    id: i32,
56    uuid: Vec<u8>,
57    name: String,
58    local_fs: i32,
59    remote_fs: i32,
60    state: String,
61    status: String,
62}
63
64/// Information needed to insert a new synchro in the db
65#[derive(Insertable)]
66#[diesel(table_name = synchros)]
67struct DbNewSynchro<'a> {
68    uuid: &'a [u8],
69    name: &'a str,
70    local_fs: i32,
71    remote_fs: i32,
72    status: &'a str,
73    state: &'a str,
74}
75
76/// Metadata retrevied from a loaded filesystem in the db, allows to re-create the concrete FS.
77///
78/// Does not hold the Vfs
79#[derive(Clone)]
80pub struct LoadedFileSystem {
81    uuid: Uuid,
82    creation_info: AnyFsCreationInfo,
83}
84
85impl From<LoadedFileSystem> for AnyFsRef {
86    fn from(value: LoadedFileSystem) -> Self {
87        Self::new(value.uuid, value.creation_info.into())
88    }
89}
90
91impl From<LoadedFileSystem> for AnyFsCreationInfo {
92    fn from(value: LoadedFileSystem) -> Self {
93        value.creation_info
94    }
95}
96
97/// A connection to the database
98pub struct Database {
99    pool: Pool,
100}
101
102// Loads migrations from the sql in crates/brume-daemon/migrations
103const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
104
/// The config used for the daemon database
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DatabaseConfig {
    /// Transient database, addressed with the `:memory:` connection string
    InMemory,
    /// Database stored in the file at the given path
    OnDisk(PathBuf),
}

impl DatabaseConfig {
    /// Returns the connection string for this config.
    ///
    /// Returns `None` when the on-disk path is not valid UTF-8.
    pub fn as_str(&self) -> Option<&str> {
        match self {
            Self::InMemory => Some(":memory:"),
            Self::OnDisk(path) => path.to_str(),
        }
    }

    /// Returns a printable form of the connection string, invalid UTF-8 sequences in the path
    /// are replaced. Only meant for error reporting.
    fn to_string_lossy(&self) -> String {
        match self {
            Self::InMemory => String::from(":memory:"),
            Self::OnDisk(path) => path.to_string_lossy().into_owned(),
        }
    }

    /// Creates a config for a database stored in the file at `path`
    pub fn new_ondisk<P: AsRef<Path>>(path: P) -> Self {
        Self::OnDisk(path.as_ref().to_path_buf())
    }

    /// Creates a config for an in-memory database
    pub fn new_inmemory() -> Self {
        Self::InMemory
    }
}
135
136#[derive(Error, Debug)]
137#[error("Failed to create database")]
138pub enum DatabaseCreationError {
139    #[error("Invalid path: {0}")]
140    InvalidDbPath(String),
141    #[error("Failed to update db to the latest schema")]
142    MigrationError(#[source] Box<dyn Error + Send + Sync>),
143}
144
145#[derive(Error, Debug)]
146#[error("Failed to create database")]
147pub enum DatabaseError {
148    #[error("failed to connect to the database")]
149    ConnectionError(#[from] PoolError),
150    #[error("Invalid database state")]
151    InvalidState(#[from] diesel::result::Error),
152    #[error("Data value found in the database in table {table} column {column} is not valid")]
153    InvalidData {
154        table: String,
155        column: String,
156        source: Option<Box<dyn Error + Send + Sync>>,
157    },
158    #[error("Failed to serialize data before database insert")]
159    SerializationError(#[from] bincode::Error),
160}
161
162impl DatabaseError {
163    fn invalid_data(
164        table: &str,
165        column: &str,
166        source: Option<Box<dyn Error + Send + Sync>>,
167    ) -> Self {
168        Self::InvalidData {
169            table: table.to_string(),
170            column: column.to_string(),
171            source,
172        }
173    }
174}
175
176impl Database {
177    /// Creates a new empty database from the config
178    pub async fn new(config: &DatabaseConfig) -> Result<Self, DatabaseCreationError> {
179        let db_str = config
180            .as_str()
181            .ok_or_else(|| DatabaseCreationError::InvalidDbPath(config.to_string_lossy()))?;
182        let manager = Manager::new(db_str, Runtime::Tokio1);
183
184        // Pool size is set to 1 because most operations will be writes, and sqlite is not
185        // well suited for concurrent accesses
186        // Ok to unwrap because this cannot fail if a runtime is provided
187        let pool = Pool::builder(manager).max_size(1).build().unwrap();
188
189        let conn = pool
190            .get()
191            .await
192            .map_err(|_| DatabaseCreationError::InvalidDbPath(config.to_string_lossy()))?;
193        conn.interact(|conn| conn.run_pending_migrations(MIGRATIONS).map(|_| ()))
194            .await
195            .unwrap() // This should never fail unless the inner closure panics
196            .map_err(DatabaseCreationError::MigrationError)?;
197
198        Ok(Self { pool })
199    }
200
201    /// Loads all the filesystems, regardless of the synchro they belong to
202    #[cfg(test)]
203    async fn load_all_filesystems(&self) -> Result<Vec<LoadedFileSystem>, DatabaseError> {
204        use crate::schema::filesystems::dsl::*;
205
206        let results = {
207            let conn = self.pool.get().await?;
208            conn.interact(|conn| filesystems.select(DbFileSystem::as_select()).load(conn))
209                .await
210                .unwrap() // This should never fail unless the inner closure panics
211        }?;
212
213        results
214            .into_iter()
215            .map(|db_fs| {
216                Ok(LoadedFileSystem {
217                    uuid: Uuid::from_slice(&db_fs.uuid).map_err(|e| {
218                        DatabaseError::invalid_data("filesystems", "uuid", Some(Box::new(e)))
219                    })?,
220                    creation_info: bincode::deserialize(&db_fs.creation_info).map_err(|e| {
221                        DatabaseError::invalid_data(
222                            "filesystems",
223                            "creation_info",
224                            Some(Box::new(e)),
225                        )
226                    })?,
227                })
228            })
229            .collect()
230    }
231
232    /// Loads a single filesystem from its id
233    async fn load_filesystem_from_id(&self, fs_id: i32) -> Result<LoadedFileSystem, DatabaseError> {
234        use crate::schema::filesystems::dsl::*;
235
236        let db_fs = {
237            let conn = self.pool.get().await?;
238            conn.interact(move |conn| {
239                filesystems
240                    .filter(id.eq(fs_id))
241                    .select(DbFileSystem::as_select())
242                    .first(conn)
243            })
244            .await
245            .unwrap() // This should never fail unless the inner closure panics
246        }?;
247
248        Ok(LoadedFileSystem {
249            uuid: Uuid::from_slice(&db_fs.uuid).unwrap(),
250            creation_info: bincode::deserialize(&db_fs.creation_info).unwrap(),
251        })
252    }
253
254    /// Inserts a new filesystem in the db
255    pub async fn insert_new_filesystem(
256        &self,
257        fs_uuid: Uuid,
258        fs: &AnyFsCreationInfo,
259    ) -> Result<i32, DatabaseError> {
260        use crate::schema::filesystems::dsl::*;
261
262        let info = bincode::serialize(fs)?;
263
264        let vfs_root = self.insert_vfs_root().await?;
265
266        let conn = self.pool.get().await?;
267        conn.interact(move |conn| {
268            let new_fs = DbNewFileSystem {
269                uuid: fs_uuid.as_bytes(),
270                creation_info: &info,
271                root_node: vfs_root,
272            };
273
274            diesel::insert_into(filesystems)
275                .values(&new_fs)
276                .returning(id)
277                .get_result(conn)
278        })
279        .await
280        .unwrap() // This should never fail unless the inner closure panics
281        .map_err(|e| e.into())
282    }
283
284    /// Deletes a single filesystem from the db
285    pub async fn delete_filesystem(&self, fs: &AnyFsRef) -> Result<(), DatabaseError> {
286        use crate::schema::filesystems::dsl::*;
287
288        let fs = fs.clone();
289
290        let conn = self.pool.get().await?;
291        conn.interact(move |conn| {
292            diesel::delete(filesystems.filter(uuid.eq(fs.id().as_bytes()))).execute(conn)
293        })
294        .await
295        .unwrap() // This should never fail unless the inner closure panics
296        .map_err(|e| e.into())
297        .map(|_| ())
298    }
299
300    /// Updates the (status)[`SynchroStatus`] of a synchro
301    pub async fn set_synchro_status(
302        &self,
303        synchro: SynchroId,
304        synchro_status: SynchroStatus,
305    ) -> Result<(), DatabaseError> {
306        use crate::schema::synchros::dsl::*;
307
308        let conn = self.pool.get().await?;
309        conn.interact(move |conn| {
310            diesel::update(synchros)
311                .filter(uuid.eq(synchro.id().as_bytes()))
312                .set(status.eq(format!("{}", synchro_status)))
313                .execute(conn)
314        })
315        .await
316        .unwrap() // This should never fail unless the inner closure panics
317        .map_err(|e| e.into())
318        .map(|_| ())
319    }
320
321    /// Updates the [state](`SynchroState`) of a synchro
322    pub async fn set_synchro_state(
323        &self,
324        synchro: SynchroId,
325        synchro_state: SynchroState,
326    ) -> Result<(), DatabaseError> {
327        use crate::schema::synchros::dsl::*;
328
329        let conn = self.pool.get().await?;
330        conn.interact(move |conn| {
331            diesel::update(synchros)
332                .filter(uuid.eq(synchro.id().as_bytes()))
333                .set(state.eq(format!("{}", synchro_state)))
334                .execute(conn)
335        })
336        .await
337        .unwrap() // This should never fail unless the inner closure panics
338        .map_err(|e| e.into())
339        .map(|_| ())
340    }
341
342    /// Loads a single filesystem into the [`SynchroList`]
343    async fn load_fs_to_list(
344        &self,
345        fs: &LoadedFileSystem,
346        synchro_list: &mut SynchroList,
347    ) -> Result<(), DatabaseError> {
348        let fs_info: AnyFsCreationInfo = fs.clone().into();
349        match fs_info {
350            AnyFsCreationInfo::LocalDir(_) => {
351                let vfs = self.load_vfs::<LocalSyncInfo>(fs.uuid).await?;
352                synchro_list
353                    .insert_existing_fs(fs_info, &vfs, fs.uuid)
354                    .map_err(|e| DatabaseError::invalid_data("nodes", "*", Some(Box::new(e))))
355            }
356            AnyFsCreationInfo::Nextcloud(_) => {
357                let vfs = self.load_vfs::<NextcloudSyncInfo>(fs.uuid).await?;
358                synchro_list
359                    .insert_existing_fs(fs_info, &vfs, fs.uuid)
360                    .map_err(|e| DatabaseError::invalid_data("nodes", "*", Some(Box::new(e))))
361            }
362        }
363    }
364
365    /// Loads a single synchro into the [`SynchroList`]
366    async fn load_synchro_to_list(
367        &self,
368        synchro: &DbSynchro,
369        synchro_list: &mut SynchroList,
370    ) -> Result<(), DatabaseError> {
371        let local = self.load_filesystem_from_id(synchro.local_fs).await?;
372        self.load_fs_to_list(&local, synchro_list).await?;
373        let remote = self.load_filesystem_from_id(synchro.remote_fs).await?;
374        self.load_fs_to_list(&remote, synchro_list).await?;
375
376        let mut synchro_ref = AnySynchroRef::new(local.into(), remote.into(), synchro.name.clone());
377        synchro_ref.set_state(
378            synchro
379                .state
380                .as_str()
381                .try_into()
382                .map_err(|_| DatabaseError::invalid_data("synchros", "state", None))?,
383        );
384        synchro_ref.set_status(
385            synchro
386                .status
387                .as_str()
388                .try_into()
389                .map_err(|_| DatabaseError::invalid_data("synchros", "state", None))?,
390        );
391
392        let synchro_id = Uuid::from_slice(synchro.uuid.as_slice())
393            .map_err(|e| DatabaseError::invalid_data("synchros", "uuid", Some(Box::new(e))))?;
394
395        synchro_list
396            .synchro_ref_list_mut()
397            .insert(synchro_id.into(), RwLock::new(synchro_ref));
398        Ok(())
399    }
400
401    /// Loads all the Synchros from the DB and create a new [`SynchroList`]
402    pub async fn load_all_synchros(&self) -> Result<SynchroList, DatabaseError> {
403        use crate::schema::synchros::dsl::*;
404
405        let mut list = SynchroList::new();
406
407        let db_synchros = {
408            let conn = self.pool.get().await?;
409            conn.interact(|conn| synchros.select(DbSynchro::as_select()).load(conn))
410                .await
411                .unwrap() // This should never fail unless the inner closure panics
412        }?;
413
414        for synchro in db_synchros {
415            // TODO: check for situations where the loaded synchro might be invalid:
416            // - SyncInProgress
417            // - ?
418
419            self.load_synchro_to_list(&synchro, &mut list).await?;
420        }
421
422        Ok(list)
423    }
424
425    /// Insert a new Synchro in the DB
426    pub async fn insert_synchro(
427        &self,
428        created: CreatedSynchro,
429        info: AnySynchroCreationInfo,
430    ) -> Result<i32, DatabaseError> {
431        use crate::schema::synchros::dsl::*;
432
433        let (local_db_id, remote_db_id) = try_join(
434            self.insert_new_filesystem(created.local_id(), info.local()),
435            self.insert_new_filesystem(created.remote_id(), info.remote()),
436        )
437        .await?;
438
439        let created_id = created.id();
440
441        let conn = self.pool.get().await?;
442        conn.interact(move |conn| {
443            let new_synchro = DbNewSynchro {
444                uuid: created_id.as_bytes(),
445                name: created.name(),
446                local_fs: local_db_id,
447                remote_fs: remote_db_id,
448                status: &SynchroStatus::default().to_string(),
449                state: &SynchroState::default().to_string(),
450            };
451
452            diesel::insert_into(synchros)
453                .values(&new_synchro)
454                .returning(id)
455                .get_result(conn)
456        })
457        .await
458        .unwrap() // This should never fail unless the inner closure panics
459        .map_err(|e| e.into())
460    }
461
462    /// Deletes a Synchro from the DB
463    pub async fn delete_synchro(&self, synchro: SynchroId) -> Result<(), DatabaseError> {
464        use crate::schema::synchros::dsl::*;
465
466        let conn = self.pool.get().await?;
467        conn.interact(move |conn| {
468            let db_synchro = &synchros
469                .filter(uuid.eq(synchro.as_bytes()))
470                .select(DbSynchro::as_select())
471                .get_result(conn)?;
472
473            diesel::delete(filesystems::table.filter(filesystems::id.eq(db_synchro.local_fs)))
474                .execute(conn)?;
475            diesel::delete(filesystems::table.filter(filesystems::id.eq(db_synchro.remote_fs)))
476                .execute(conn)?;
477
478            diesel::delete(synchros.filter(uuid.eq(synchro.as_bytes()))).execute(conn)
479        })
480        .await
481        .unwrap() // This should never fail unless the inner closure panics
482        .map_err(|e| e.into())
483        .map(|_| ())
484    }
485}
486
#[cfg(test)]
mod test {
    use brume_daemon_proto::{AnyFsCreationInfo, LocalDirCreationInfo, NextcloudFsCreationInfo};

    use crate::synchro_list::SynchroList;

    use super::*;

    /// Asserts the number of filesystems and of synchros that can be loaded from the db
    async fn assert_db_counts(db: &Database, expected_fs: usize, expected_synchros: usize) {
        let fs_list = db.load_all_filesystems().await.unwrap();
        assert_eq!(fs_list.len(), expected_fs);
        let sync_list = db.load_all_synchros().await.unwrap();
        assert_eq!(sync_list.len(), expected_synchros);
    }

    /// A filesystem can be inserted, listed and deleted
    #[tokio::test]
    async fn test_db_filystem() {
        let db = Database::new(&DatabaseConfig::InMemory).await.unwrap();

        // Loading from an empty db must succeed
        db.load_all_filesystems().await.unwrap();

        let fs_info = AnyFsCreationInfo::LocalDir(LocalDirCreationInfo::new("/tmp/test"));
        let fs_ref = AnyFsRef::from(fs_info.clone());
        db.insert_new_filesystem(fs_ref.id(), &fs_info)
            .await
            .unwrap();

        let after_insert = db.load_all_filesystems().await.unwrap();
        assert_eq!(after_insert.len(), 1);

        db.delete_filesystem(&fs_ref).await.unwrap();

        let after_delete = db.load_all_filesystems().await.unwrap();
        assert_eq!(after_delete.len(), 0);
    }

    /// Synchros can be inserted and deleted, their filesystems follow
    #[tokio::test]
    async fn test_db_synchro() {
        let db = Database::new(&DatabaseConfig::InMemory).await.unwrap();

        let mut list = SynchroList::new();

        // First synchro, without a name
        let sync1 = AnySynchroCreationInfo::new(
            AnyFsCreationInfo::LocalDir(LocalDirCreationInfo::new("/a")),
            AnyFsCreationInfo::Nextcloud(NextcloudFsCreationInfo::new(
                "http://localhost",
                "admin",
                "admin",
            )),
            None,
        );

        let created1 = list.insert(sync1.clone()).await.unwrap();

        db.insert_synchro(created1.clone(), sync1).await.unwrap();

        assert_db_counts(&db, 2, 1).await;

        // Second synchro, with a name
        let sync2 = AnySynchroCreationInfo::new(
            AnyFsCreationInfo::LocalDir(LocalDirCreationInfo::new("/b")),
            AnyFsCreationInfo::Nextcloud(NextcloudFsCreationInfo::new(
                "http://remote.dir",
                "admin",
                "admin",
            )),
            Some(String::from("2")),
        );

        let created2 = list.insert(sync2.clone()).await.unwrap();
        db.insert_synchro(created2, sync2).await.unwrap();

        assert_db_counts(&db, 4, 2).await;

        // Deleting the first synchro also removes its two filesystems
        db.delete_synchro(created1.id()).await.unwrap();
        list.remove(created1.id()).unwrap();

        assert_db_counts(&db, 2, 1).await;
    }
}
563}