1use std::error::Error;
4use std::path::{Path, PathBuf};
5
6use deadpool_diesel::PoolError;
7use deadpool_diesel::{
8 Runtime,
9 sqlite::{Manager, Pool},
10};
11use diesel::prelude::*;
12use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
13use futures::future::try_join;
14use thiserror::Error;
15use tokio::sync::RwLock;
16use uuid::Uuid;
17
18use brume::concrete::{local::LocalSyncInfo, nextcloud::NextcloudSyncInfo};
19use brume_daemon_proto::{
20 AnyFsCreationInfo, AnyFsRef, AnySynchroCreationInfo, AnySynchroRef, SynchroId, SynchroState,
21 SynchroStatus,
22};
23
24use crate::{
25 schema::{filesystems, synchros},
26 synchro_list::{CreatedSynchro, SynchroList},
27};
28
29pub mod vfs;
30
31#[derive(Queryable, Selectable, Identifiable)]
33#[diesel(table_name = filesystems)]
34#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
35struct DbFileSystem {
36 id: i32,
37 uuid: Vec<u8>,
38 creation_info: Vec<u8>,
39}
40
41#[derive(Insertable)]
43#[diesel(table_name = filesystems)]
44struct DbNewFileSystem<'a> {
45 uuid: &'a [u8],
46 creation_info: &'a [u8],
47 root_node: i32,
48}
49
50#[derive(Queryable, Selectable, Identifiable)]
52#[diesel(table_name = synchros)]
53#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
54struct DbSynchro {
55 id: i32,
56 uuid: Vec<u8>,
57 name: String,
58 local_fs: i32,
59 remote_fs: i32,
60 state: String,
61 status: String,
62}
63
64#[derive(Insertable)]
66#[diesel(table_name = synchros)]
67struct DbNewSynchro<'a> {
68 uuid: &'a [u8],
69 name: &'a str,
70 local_fs: i32,
71 remote_fs: i32,
72 status: &'a str,
73 state: &'a str,
74}
75
76#[derive(Clone)]
80pub struct LoadedFileSystem {
81 uuid: Uuid,
82 creation_info: AnyFsCreationInfo,
83}
84
85impl From<LoadedFileSystem> for AnyFsRef {
86 fn from(value: LoadedFileSystem) -> Self {
87 Self::new(value.uuid, value.creation_info.into())
88 }
89}
90
91impl From<LoadedFileSystem> for AnyFsCreationInfo {
92 fn from(value: LoadedFileSystem) -> Self {
93 value.creation_info
94 }
95}
96
97pub struct Database {
99 pool: Pool,
100}
101
102const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
104
105#[derive(Clone)]
107pub enum DatabaseConfig {
108 InMemory,
109 OnDisk(PathBuf),
110}
111
112impl DatabaseConfig {
113 pub fn as_str(&self) -> Option<&str> {
114 match self {
115 DatabaseConfig::InMemory => Some(":memory:"),
116 DatabaseConfig::OnDisk(path_buf) => path_buf.as_os_str().to_str(),
117 }
118 }
119
120 fn to_string_lossy(&self) -> String {
121 match self {
122 DatabaseConfig::InMemory => String::from(":memory:"),
123 DatabaseConfig::OnDisk(path_buf) => path_buf.as_os_str().to_string_lossy().to_string(),
124 }
125 }
126
127 pub fn new_ondisk<P: AsRef<Path>>(path: P) -> Self {
128 Self::OnDisk(path.as_ref().to_path_buf())
129 }
130
131 pub fn new_inmemory() -> Self {
132 Self::InMemory
133 }
134}
135
136#[derive(Error, Debug)]
137#[error("Failed to create database")]
138pub enum DatabaseCreationError {
139 #[error("Invalid path: {0}")]
140 InvalidDbPath(String),
141 #[error("Failed to update db to the latest schema")]
142 MigrationError(#[source] Box<dyn Error + Send + Sync>),
143}
144
145#[derive(Error, Debug)]
146#[error("Failed to create database")]
147pub enum DatabaseError {
148 #[error("failed to connect to the database")]
149 ConnectionError(#[from] PoolError),
150 #[error("Invalid database state")]
151 InvalidState(#[from] diesel::result::Error),
152 #[error("Data value found in the database in table {table} column {column} is not valid")]
153 InvalidData {
154 table: String,
155 column: String,
156 source: Option<Box<dyn Error + Send + Sync>>,
157 },
158 #[error("Failed to serialize data before database insert")]
159 SerializationError(#[from] bincode::Error),
160}
161
162impl DatabaseError {
163 fn invalid_data(
164 table: &str,
165 column: &str,
166 source: Option<Box<dyn Error + Send + Sync>>,
167 ) -> Self {
168 Self::InvalidData {
169 table: table.to_string(),
170 column: column.to_string(),
171 source,
172 }
173 }
174}
175
176impl Database {
177 pub async fn new(config: &DatabaseConfig) -> Result<Self, DatabaseCreationError> {
179 let db_str = config
180 .as_str()
181 .ok_or_else(|| DatabaseCreationError::InvalidDbPath(config.to_string_lossy()))?;
182 let manager = Manager::new(db_str, Runtime::Tokio1);
183
184 let pool = Pool::builder(manager).max_size(1).build().unwrap();
188
189 let conn = pool
190 .get()
191 .await
192 .map_err(|_| DatabaseCreationError::InvalidDbPath(config.to_string_lossy()))?;
193 conn.interact(|conn| conn.run_pending_migrations(MIGRATIONS).map(|_| ()))
194 .await
195 .unwrap() .map_err(DatabaseCreationError::MigrationError)?;
197
198 Ok(Self { pool })
199 }
200
201 #[cfg(test)]
203 async fn load_all_filesystems(&self) -> Result<Vec<LoadedFileSystem>, DatabaseError> {
204 use crate::schema::filesystems::dsl::*;
205
206 let results = {
207 let conn = self.pool.get().await?;
208 conn.interact(|conn| filesystems.select(DbFileSystem::as_select()).load(conn))
209 .await
210 .unwrap() }?;
212
213 results
214 .into_iter()
215 .map(|db_fs| {
216 Ok(LoadedFileSystem {
217 uuid: Uuid::from_slice(&db_fs.uuid).map_err(|e| {
218 DatabaseError::invalid_data("filesystems", "uuid", Some(Box::new(e)))
219 })?,
220 creation_info: bincode::deserialize(&db_fs.creation_info).map_err(|e| {
221 DatabaseError::invalid_data(
222 "filesystems",
223 "creation_info",
224 Some(Box::new(e)),
225 )
226 })?,
227 })
228 })
229 .collect()
230 }
231
232 async fn load_filesystem_from_id(&self, fs_id: i32) -> Result<LoadedFileSystem, DatabaseError> {
234 use crate::schema::filesystems::dsl::*;
235
236 let db_fs = {
237 let conn = self.pool.get().await?;
238 conn.interact(move |conn| {
239 filesystems
240 .filter(id.eq(fs_id))
241 .select(DbFileSystem::as_select())
242 .first(conn)
243 })
244 .await
245 .unwrap() }?;
247
248 Ok(LoadedFileSystem {
249 uuid: Uuid::from_slice(&db_fs.uuid).unwrap(),
250 creation_info: bincode::deserialize(&db_fs.creation_info).unwrap(),
251 })
252 }
253
254 pub async fn insert_new_filesystem(
256 &self,
257 fs_uuid: Uuid,
258 fs: &AnyFsCreationInfo,
259 ) -> Result<i32, DatabaseError> {
260 use crate::schema::filesystems::dsl::*;
261
262 let info = bincode::serialize(fs)?;
263
264 let vfs_root = self.insert_vfs_root().await?;
265
266 let conn = self.pool.get().await?;
267 conn.interact(move |conn| {
268 let new_fs = DbNewFileSystem {
269 uuid: fs_uuid.as_bytes(),
270 creation_info: &info,
271 root_node: vfs_root,
272 };
273
274 diesel::insert_into(filesystems)
275 .values(&new_fs)
276 .returning(id)
277 .get_result(conn)
278 })
279 .await
280 .unwrap() .map_err(|e| e.into())
282 }
283
284 pub async fn delete_filesystem(&self, fs: &AnyFsRef) -> Result<(), DatabaseError> {
286 use crate::schema::filesystems::dsl::*;
287
288 let fs = fs.clone();
289
290 let conn = self.pool.get().await?;
291 conn.interact(move |conn| {
292 diesel::delete(filesystems.filter(uuid.eq(fs.id().as_bytes()))).execute(conn)
293 })
294 .await
295 .unwrap() .map_err(|e| e.into())
297 .map(|_| ())
298 }
299
300 pub async fn set_synchro_status(
302 &self,
303 synchro: SynchroId,
304 synchro_status: SynchroStatus,
305 ) -> Result<(), DatabaseError> {
306 use crate::schema::synchros::dsl::*;
307
308 let conn = self.pool.get().await?;
309 conn.interact(move |conn| {
310 diesel::update(synchros)
311 .filter(uuid.eq(synchro.id().as_bytes()))
312 .set(status.eq(format!("{}", synchro_status)))
313 .execute(conn)
314 })
315 .await
316 .unwrap() .map_err(|e| e.into())
318 .map(|_| ())
319 }
320
321 pub async fn set_synchro_state(
323 &self,
324 synchro: SynchroId,
325 synchro_state: SynchroState,
326 ) -> Result<(), DatabaseError> {
327 use crate::schema::synchros::dsl::*;
328
329 let conn = self.pool.get().await?;
330 conn.interact(move |conn| {
331 diesel::update(synchros)
332 .filter(uuid.eq(synchro.id().as_bytes()))
333 .set(state.eq(format!("{}", synchro_state)))
334 .execute(conn)
335 })
336 .await
337 .unwrap() .map_err(|e| e.into())
339 .map(|_| ())
340 }
341
342 async fn load_fs_to_list(
344 &self,
345 fs: &LoadedFileSystem,
346 synchro_list: &mut SynchroList,
347 ) -> Result<(), DatabaseError> {
348 let fs_info: AnyFsCreationInfo = fs.clone().into();
349 match fs_info {
350 AnyFsCreationInfo::LocalDir(_) => {
351 let vfs = self.load_vfs::<LocalSyncInfo>(fs.uuid).await?;
352 synchro_list
353 .insert_existing_fs(fs_info, &vfs, fs.uuid)
354 .map_err(|e| DatabaseError::invalid_data("nodes", "*", Some(Box::new(e))))
355 }
356 AnyFsCreationInfo::Nextcloud(_) => {
357 let vfs = self.load_vfs::<NextcloudSyncInfo>(fs.uuid).await?;
358 synchro_list
359 .insert_existing_fs(fs_info, &vfs, fs.uuid)
360 .map_err(|e| DatabaseError::invalid_data("nodes", "*", Some(Box::new(e))))
361 }
362 }
363 }
364
365 async fn load_synchro_to_list(
367 &self,
368 synchro: &DbSynchro,
369 synchro_list: &mut SynchroList,
370 ) -> Result<(), DatabaseError> {
371 let local = self.load_filesystem_from_id(synchro.local_fs).await?;
372 self.load_fs_to_list(&local, synchro_list).await?;
373 let remote = self.load_filesystem_from_id(synchro.remote_fs).await?;
374 self.load_fs_to_list(&remote, synchro_list).await?;
375
376 let mut synchro_ref = AnySynchroRef::new(local.into(), remote.into(), synchro.name.clone());
377 synchro_ref.set_state(
378 synchro
379 .state
380 .as_str()
381 .try_into()
382 .map_err(|_| DatabaseError::invalid_data("synchros", "state", None))?,
383 );
384 synchro_ref.set_status(
385 synchro
386 .status
387 .as_str()
388 .try_into()
389 .map_err(|_| DatabaseError::invalid_data("synchros", "state", None))?,
390 );
391
392 let synchro_id = Uuid::from_slice(synchro.uuid.as_slice())
393 .map_err(|e| DatabaseError::invalid_data("synchros", "uuid", Some(Box::new(e))))?;
394
395 synchro_list
396 .synchro_ref_list_mut()
397 .insert(synchro_id.into(), RwLock::new(synchro_ref));
398 Ok(())
399 }
400
401 pub async fn load_all_synchros(&self) -> Result<SynchroList, DatabaseError> {
403 use crate::schema::synchros::dsl::*;
404
405 let mut list = SynchroList::new();
406
407 let db_synchros = {
408 let conn = self.pool.get().await?;
409 conn.interact(|conn| synchros.select(DbSynchro::as_select()).load(conn))
410 .await
411 .unwrap() }?;
413
414 for synchro in db_synchros {
415 self.load_synchro_to_list(&synchro, &mut list).await?;
420 }
421
422 Ok(list)
423 }
424
425 pub async fn insert_synchro(
427 &self,
428 created: CreatedSynchro,
429 info: AnySynchroCreationInfo,
430 ) -> Result<i32, DatabaseError> {
431 use crate::schema::synchros::dsl::*;
432
433 let (local_db_id, remote_db_id) = try_join(
434 self.insert_new_filesystem(created.local_id(), info.local()),
435 self.insert_new_filesystem(created.remote_id(), info.remote()),
436 )
437 .await?;
438
439 let created_id = created.id();
440
441 let conn = self.pool.get().await?;
442 conn.interact(move |conn| {
443 let new_synchro = DbNewSynchro {
444 uuid: created_id.as_bytes(),
445 name: created.name(),
446 local_fs: local_db_id,
447 remote_fs: remote_db_id,
448 status: &SynchroStatus::default().to_string(),
449 state: &SynchroState::default().to_string(),
450 };
451
452 diesel::insert_into(synchros)
453 .values(&new_synchro)
454 .returning(id)
455 .get_result(conn)
456 })
457 .await
458 .unwrap() .map_err(|e| e.into())
460 }
461
462 pub async fn delete_synchro(&self, synchro: SynchroId) -> Result<(), DatabaseError> {
464 use crate::schema::synchros::dsl::*;
465
466 let conn = self.pool.get().await?;
467 conn.interact(move |conn| {
468 let db_synchro = &synchros
469 .filter(uuid.eq(synchro.as_bytes()))
470 .select(DbSynchro::as_select())
471 .get_result(conn)?;
472
473 diesel::delete(filesystems::table.filter(filesystems::id.eq(db_synchro.local_fs)))
474 .execute(conn)?;
475 diesel::delete(filesystems::table.filter(filesystems::id.eq(db_synchro.remote_fs)))
476 .execute(conn)?;
477
478 diesel::delete(synchros.filter(uuid.eq(synchro.as_bytes()))).execute(conn)
479 })
480 .await
481 .unwrap() .map_err(|e| e.into())
483 .map(|_| ())
484 }
485}
486
487#[cfg(test)]
488mod test {
489 use brume_daemon_proto::{AnyFsCreationInfo, LocalDirCreationInfo, NextcloudFsCreationInfo};
490
491 use crate::synchro_list::SynchroList;
492
493 use super::*;
494
495 #[tokio::test]
496 async fn test_db_filystem() {
497 let db = Database::new(&DatabaseConfig::InMemory).await.unwrap();
498
499 db.load_all_filesystems().await.unwrap();
500
501 let fs_info = AnyFsCreationInfo::LocalDir(LocalDirCreationInfo::new("/tmp/test"));
502 let fs_ref = AnyFsRef::from(fs_info.clone());
503 db.insert_new_filesystem(fs_ref.id(), &fs_info)
504 .await
505 .unwrap();
506
507 let fs_list = db.load_all_filesystems().await.unwrap();
508 assert_eq!(fs_list.len(), 1);
509
510 db.delete_filesystem(&fs_ref).await.unwrap();
511
512 let fs_list = db.load_all_filesystems().await.unwrap();
513 assert_eq!(fs_list.len(), 0);
514 }
515
516 #[tokio::test]
517 async fn test_db_synchro() {
518 let db = Database::new(&DatabaseConfig::InMemory).await.unwrap();
519
520 let mut list = SynchroList::new();
521
522 let loc_1 = LocalDirCreationInfo::new("/a");
523 let rem_1 = NextcloudFsCreationInfo::new("http://localhost", "admin", "admin");
524 let sync1 = AnySynchroCreationInfo::new(
525 AnyFsCreationInfo::LocalDir(loc_1),
526 AnyFsCreationInfo::Nextcloud(rem_1),
527 None,
528 );
529
530 let created1 = list.insert(sync1.clone()).await.unwrap();
531
532 db.insert_synchro(created1.clone(), sync1).await.unwrap();
533
534 let fs_list = db.load_all_filesystems().await.unwrap();
535 assert_eq!(fs_list.len(), 2);
536 let sync_list = db.load_all_synchros().await.unwrap();
537 assert_eq!(sync_list.len(), 1);
538
539 let loc_2 = LocalDirCreationInfo::new("/b");
540 let rem_2 = NextcloudFsCreationInfo::new("http://remote.dir", "admin", "admin");
541 let sync2 = AnySynchroCreationInfo::new(
542 AnyFsCreationInfo::LocalDir(loc_2),
543 AnyFsCreationInfo::Nextcloud(rem_2),
544 Some(String::from("2")),
545 );
546
547 let created2 = list.insert(sync2.clone()).await.unwrap();
548 db.insert_synchro(created2, sync2).await.unwrap();
549
550 let fs_list = db.load_all_filesystems().await.unwrap();
551 assert_eq!(fs_list.len(), 4);
552 let sync_list = db.load_all_synchros().await.unwrap();
553 assert_eq!(sync_list.len(), 2);
554
555 db.delete_synchro(created1.id()).await.unwrap();
556 list.remove(created1.id()).unwrap();
557
558 let fs_list = db.load_all_filesystems().await.unwrap();
559 assert_eq!(fs_list.len(), 2);
560 let sync_list = db.load_all_synchros().await.unwrap();
561 assert_eq!(sync_list.len(), 1);
562 }
563}