Skip to main content

brume_daemon/db/
vfs.rs

1//! How the VFS nodes are stored in the DB
2use std::fmt::Debug;
3
4use diesel::prelude::*;
5use futures::future::Either;
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use uuid::Uuid;
8
9use brume::{
10    update::VfsUpdate,
11    vfs::{DirTree, FileMeta, NodeKind, NodeState, Vfs, VfsNode},
12};
13use brume_daemon_proto::VirtualPath;
14
15use crate::schema::{filesystems, nodes};
16
17use super::{Database, DatabaseError};
18
19/// Information loaded from a VFS node in the db
20#[derive(Clone, Debug, Queryable, Selectable, Identifiable, Associations)]
21#[diesel(table_name = nodes)]
22#[diesel(belongs_to(DbVfsNode, foreign_key = parent))]
23#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
24struct DbVfsNode {
25    id: i32,
26    name: String,
27    kind: String,
28    size: Option<i64>,
29    state: Option<Vec<u8>>,
30    parent: Option<i32>,
31}
32
33impl<'a, SyncInfo> From<&'a DbVfsNode> for VfsNode<SyncInfo>
34where
35    SyncInfo: Deserialize<'a>,
36{
37    fn from(value: &'a DbVfsNode) -> Self {
38        let kind = NodeKind::try_from(value.kind.as_str()).unwrap();
39        let state = value
40            .state
41            .as_ref()
42            .map(|st| bincode::deserialize(st).unwrap())
43            .unwrap_or(NodeState::NeedResync);
44
45        match kind {
46            NodeKind::Dir => Self::Dir(DirTree::new_with_state(&value.name, state)),
47            NodeKind::File => Self::File(FileMeta::new_with_state(
48                &value.name,
49                value.size.unwrap() as u64,
50                state,
51            )),
52        }
53    }
54}
55
56/// Information needed to insert a new VFS node in the db
57#[derive(Insertable)]
58#[diesel(table_name = nodes)]
59struct DbNewVfsNode<'a> {
60    name: &'a str,
61    kind: &'a str,
62    size: Option<i64>,
63    state: Option<&'a [u8]>,
64    parent: Option<i32>,
65}
66
67/// Information returned by an update of a VFS node in the db
68#[derive(AsChangeset)]
69#[diesel(table_name = nodes)]
70struct DbUpdatedVfsNode<'a> {
71    size: Option<i64>,
72    state: Option<&'a [u8]>,
73}
74
75impl Default for DbNewVfsNode<'static> {
76    fn default() -> Self {
77        Self::root()
78    }
79}
80
81impl DbNewVfsNode<'static> {
82    pub(super) fn root() -> Self {
83        Self {
84            name: "",
85            kind: NodeKind::Dir.as_str(),
86            size: None,
87            state: None,
88            parent: None,
89        }
90    }
91}
92
93impl Database {
94    pub async fn insert_vfs_root(&self) -> Result<i32, DatabaseError> {
95        use crate::schema::nodes::dsl::*;
96
97        let conn = self.pool.get().await?;
98        conn.interact(move |conn| {
99            let vfs_root = DbNewVfsNode::root();
100            diesel::insert_into(nodes)
101                .values(&vfs_root)
102                .returning(id)
103                .get_result(conn)
104        })
105        .await
106        .unwrap() // This should never fail unless the inner closure panics
107        .map_err(|e| e.into())
108    }
109
110    #[cfg(test)]
111    async fn list_all_nodes(&self) -> Result<Vec<DbVfsNode>, DatabaseError> {
112        use crate::schema::nodes::dsl::*;
113
114        let conn = self.pool.get().await?;
115        conn.interact(|conn| nodes.select(DbVfsNode::as_select()).load(conn))
116            .await
117            .unwrap() // This should never fail unless the inner closure panics
118            .map_err(|e| e.into())
119    }
120
121    /// Loads recursively a node and its children
122    async fn load_nodes_rec<SyncInfo: DeserializeOwned>(
123        &self,
124        parent_node: &DbVfsNode,
125    ) -> Result<VfsNode<SyncInfo>, DatabaseError> {
126        let mut vfs_node = parent_node.into();
127
128        if let VfsNode::Dir(dir) = &mut vfs_node {
129            let parent_node = parent_node.to_owned();
130
131            let db_children = {
132                let conn = self.pool.get().await?;
133                conn.interact(move |conn| {
134                    DbVfsNode::belonging_to(&parent_node)
135                        .select(DbVfsNode::as_select())
136                        .load(conn)
137                })
138                .await
139                .unwrap()
140            }?;
141
142            for db_child in db_children {
143                let child = Box::pin(self.load_nodes_rec(&db_child)).await?;
144
145                dir.insert_child(child);
146            }
147        }
148        Ok(vfs_node)
149    }
150
151    /// Loads a vfs with all its node from the DB
152    pub async fn load_vfs<SyncInfo: DeserializeOwned + Debug>(
153        &self,
154        fs_uuid: Uuid,
155    ) -> Result<Vfs<SyncInfo>, DatabaseError> {
156        let vfs_root = self.get_vfs_root(fs_uuid).await?;
157
158        let loaded_root = self.load_nodes_rec(&vfs_root).await?;
159
160        Ok(Vfs::new(loaded_root))
161    }
162
163    /// Loads the node at `path` from the DB
164    async fn find_node(
165        &self,
166        root: &DbVfsNode,
167        path: &VirtualPath,
168    ) -> Result<DbVfsNode, DatabaseError> {
169        use crate::schema::nodes::dsl::*;
170
171        let mut current_node = root.clone();
172
173        for cur_name in path.iter() {
174            let conn = self.pool.get().await?;
175
176            let cur_name = cur_name.to_owned();
177            current_node = conn
178                .interact(move |conn| {
179                    DbVfsNode::belonging_to(&current_node)
180                        .select(DbVfsNode::as_select())
181                        .filter(name.eq(cur_name))
182                        .first(conn)
183                })
184                .await
185                .unwrap() // This should never fail unless the inner closure panics
186                ?
187        }
188
189        Ok(current_node)
190    }
191
192    /// Inserts a child node with the provided parent
193    async fn insert_node_child<SyncInfo: Serialize>(
194        &self,
195        parent_node: i32,
196        node: &VfsNode<SyncInfo>,
197    ) -> Result<(), DatabaseError> {
198        match node {
199            VfsNode::Dir(dir_tree) => Box::pin(self.insert_dir_child(parent_node, dir_tree)).await,
200            VfsNode::File(file_meta) => self.insert_file_child(parent_node, file_meta).await,
201        }
202    }
203
204    /// Inserts a file node with the provided parent
205    async fn insert_file_child<SyncInfo: Serialize>(
206        &self,
207        parent_node: i32,
208        file: &FileMeta<SyncInfo>,
209    ) -> Result<(), DatabaseError> {
210        use crate::schema::nodes::dsl::*;
211
212        let file_name = file.name().to_string();
213        let file_size = file.size() as i64;
214        let file_state = bincode::serialize(file.state()).unwrap();
215
216        let conn = self.pool.get().await?;
217
218        conn.interact(move |conn| {
219            let db_node = DbNewVfsNode {
220                name: &file_name,
221                kind: NodeKind::File.as_str(),
222                size: Some(file_size),
223                state: Some(&file_state),
224                parent: Some(parent_node),
225            };
226
227            diesel::insert_into(nodes).values(&db_node).execute(conn)
228        })
229        .await
230        .unwrap() // This should never fail unless the inner closure panics
231        .map_err(|e| e.into())
232        .map(|_| ())
233    }
234
235    /// Inserts a dir node with the provided parent
236    async fn insert_dir_child<SyncInfo: Serialize>(
237        &self,
238        parent_node: i32,
239        tree: &DirTree<SyncInfo>,
240    ) -> Result<(), DatabaseError> {
241        use crate::schema::nodes::dsl::*;
242
243        let dir_name = tree.name().to_string();
244        let dir_state = bincode::serialize(tree.state()).unwrap();
245
246        let new_id = {
247            let conn = self.pool.get().await?;
248
249            conn.interact(move |conn| {
250                let db_dir = DbNewVfsNode {
251                    name: &dir_name,
252                    kind: NodeKind::Dir.as_str(),
253                    size: None,
254                    state: Some(&dir_state),
255                    parent: Some(parent_node),
256                };
257
258                diesel::insert_into(nodes)
259                    .values(&db_dir)
260                    .returning(id)
261                    .get_result(conn)
262            })
263								.await
264								.unwrap() // This should never fail unless the inner closure panics
265								?
266        };
267
268        for child in tree.children().iter() {
269            self.insert_node_child(new_id, child).await?;
270        }
271        Ok(())
272    }
273
274    /// Inserts a dir node at the provided path
275    async fn insert_dir<SyncInfo: Serialize>(
276        &self,
277        root: &DbVfsNode,
278        path: &VirtualPath,
279        tree: &DirTree<SyncInfo>,
280    ) -> Result<(), DatabaseError> {
281        let parent_node = path
282            .parent()
283            .map(|parent_path| Either::Left(self.find_node(root, parent_path)))
284            .unwrap_or_else(|| Either::Right(async { Ok(root.clone()) }))
285            .await?;
286
287        self.insert_dir_child(parent_node.id, tree).await
288    }
289
290    /// Updates the metadata of the file node at the provided path
291    async fn update_file<SyncInfo: Serialize>(
292        &self,
293        root: &DbVfsNode,
294        path: &VirtualPath,
295        file: &FileMeta<SyncInfo>,
296    ) -> Result<(), DatabaseError> {
297        use crate::schema::nodes::dsl::*;
298
299        let file_size = file.size() as i64;
300        let file_state = bincode::serialize(file.state())?;
301
302        let node = self.find_node(root, path).await?;
303
304        let conn = self.pool.get().await?;
305
306        conn.interact(move |conn| {
307            let update = DbUpdatedVfsNode {
308                size: Some(file_size),
309                state: Some(&file_state),
310            };
311
312            diesel::update(nodes)
313                .filter(id.eq(node.id))
314                .set(update)
315                .execute(conn)
316        })
317        .await
318        .unwrap() // This should never fail unless the inner closure panics
319        .map_err(|e| e.into())
320        .map(|_| ())
321    }
322
323    /// Inserts a file node at the provided path
324    async fn insert_file<SyncInfo: Serialize>(
325        &self,
326        root: &DbVfsNode,
327        path: &VirtualPath,
328        file: &FileMeta<SyncInfo>,
329    ) -> Result<(), DatabaseError> {
330        let parent_node = path
331            .parent()
332            .map(|parent_path| Either::Left(self.find_node(root, parent_path)))
333            .unwrap_or_else(|| Either::Right(async { Ok(root.clone()) }))
334            .await?;
335
336        self.insert_file_child(parent_node.id, file).await
337    }
338
339    /// Removes a node from DB at the provided path
340    async fn delete_node(&self, root: &DbVfsNode, path: &VirtualPath) -> Result<(), DatabaseError> {
341        let node = self.find_node(root, path).await?;
342
343        let conn = self.pool.get().await?;
344        conn.interact(move |conn| diesel::delete(&node).execute(conn))
345            .await
346            .unwrap() // This should never fail unless the inner closure panics
347            .map_err(|e| e.into())
348            .map(|_| ())
349    }
350
351    /// Updates the state of the node at the provided path
352    async fn set_node_state<SyncInfo: Serialize>(
353        &self,
354        root: &DbVfsNode,
355        path: &VirtualPath,
356        node_state: &NodeState<SyncInfo>,
357    ) -> Result<(), DatabaseError> {
358        use crate::schema::nodes::dsl::*;
359
360        let db_state = bincode::serialize(&node_state)?;
361
362        let node = self.find_node(root, path).await?;
363
364        let conn = self.pool.get().await?;
365
366        conn.interact(move |conn| {
367            diesel::update(nodes)
368                .filter(id.eq(node.id))
369                .set(state.eq(db_state))
370                .execute(conn)
371        })
372        .await
373        .unwrap() // This should never fail unless the inner closure panics
374        .map_err(|e| e.into())
375        .map(|_| ())
376    }
377
378    /// Returns the root node of the FS
379    async fn get_vfs_root(&self, fs_uuid: Uuid) -> Result<DbVfsNode, DatabaseError> {
380        let conn = self.pool.get().await?;
381        conn.interact(move |conn| {
382            nodes::table
383                .inner_join(filesystems::table)
384                .filter(filesystems::uuid.eq(fs_uuid.as_bytes()))
385                .select(DbVfsNode::as_select())
386                .first(conn)
387        })
388        .await
389        .unwrap() // This should never fail unless the inner closure panics
390        .map_err(|e| e.into())
391    }
392
393    /// Updates the state of the node at `path` inside the FS with id `fs_uuid`
394    pub async fn update_vfs_node_state<SyncInfo: Serialize>(
395        &self,
396        fs_uuid: Uuid,
397        path: &VirtualPath,
398        node_state: &NodeState<SyncInfo>,
399    ) -> Result<(), DatabaseError> {
400        let vfs_root = self.get_vfs_root(fs_uuid).await?;
401
402        self.set_node_state(&vfs_root, path, node_state).await
403    }
404
405    /// Removes from the DB the node at `path` inside the FS with id `fs_uuid`
406    pub async fn delete_vfs_node(
407        &self,
408        fs_uuid: Uuid,
409        path: &VirtualPath,
410    ) -> Result<(), DatabaseError> {
411        let vfs_root = self.get_vfs_root(fs_uuid).await?;
412
413        self.delete_node(&vfs_root, path).await
414    }
415
416    /// Applies a VFS update to the nodes in the DB
417    pub async fn update_vfs<SyncInfo: Serialize>(
418        &self,
419        fs_uuid: Uuid,
420        update: &VfsUpdate<SyncInfo>,
421    ) -> Result<(), DatabaseError> {
422        let vfs_root = self.get_vfs_root(fs_uuid).await?;
423
424        match update {
425            VfsUpdate::DirCreated(dir_creation) => {
426                self.insert_dir(&vfs_root, dir_creation.path(), dir_creation.dir())
427                    .await
428            }
429            VfsUpdate::DirRemoved(path) => self.delete_node(&vfs_root, path).await,
430            VfsUpdate::FileCreated(file_creation) => {
431                let name = file_creation.path().name().to_owned();
432                let path = file_creation.path().to_owned();
433                let file =
434                    FileMeta::new(&name, file_creation.file_size(), file_creation.sync_info());
435                self.insert_file(&vfs_root, &path, &file).await
436            }
437            VfsUpdate::FileModified(file_modification) => {
438                let name = file_modification.path().name().to_owned();
439                let path = file_modification.path().to_owned();
440                let file = FileMeta::new(
441                    &name,
442                    file_modification.file_size(),
443                    file_modification.sync_info(),
444                );
445
446                self.update_file(&vfs_root, &path, &file).await
447            }
448            VfsUpdate::FileRemoved(path) => self.delete_node(&vfs_root, path).await,
449            VfsUpdate::FailedApplication(failed_update) => {
450                let path = failed_update.path().to_owned();
451                let state = NodeState::<SyncInfo>::Error(failed_update.clone());
452                self.set_node_state(&vfs_root, &path, &state).await
453            }
454            VfsUpdate::Conflict(vfs_diff) => {
455                let path = vfs_diff.path().to_owned();
456                let state = NodeState::<SyncInfo>::Conflict(vfs_diff.clone());
457                self.set_node_state(&vfs_root, &path, &state).await
458            }
459        }
460    }
461}
462
463#[cfg(test)]
464mod test {
465    use std::io::{self, ErrorKind};
466
467    use brume::{
468        concrete::local::LocalDirError,
469        update::{FailedUpdateApplication, VfsDiff, VfsDirCreation, VfsFileUpdate, VfsUpdate},
470        vfs::{DirTree, FileMeta, NodeKind, VfsNode},
471    };
472    use brume_daemon_proto::{
473        AnyFsCreationInfo, AnyFsRef, LocalDirCreationInfo, VirtualPath, VirtualPathBuf,
474    };
475
476    use crate::db::{Database, DatabaseConfig};
477
478    #[tokio::test]
479    async fn test_update_db_vfs() {
480        let db = Database::new(&DatabaseConfig::InMemory).await.unwrap();
481
482        let mut a = DirTree::new("a", ());
483        let f1 = FileMeta::new("f1", 10, ());
484        let mut b = DirTree::new("b", ());
485        let c = DirTree::new("c", ());
486        let d = DirTree::new("d", ());
487
488        b.insert_child(VfsNode::Dir(c));
489        a.insert_child(VfsNode::File(f1));
490        a.insert_child(VfsNode::Dir(b));
491
492        let mut base_root = DirTree::new("", ());
493        base_root.insert_child(VfsNode::Dir(a.clone()));
494        let base_vfs = VfsNode::Dir(base_root);
495
496        let creation_update = VfsUpdate::DirCreated(VfsDirCreation::new(VirtualPath::root(), a));
497
498        let fs_info = AnyFsCreationInfo::LocalDir(LocalDirCreationInfo::new("/tmp/test"));
499        let fs_ref = AnyFsRef::from(fs_info.clone());
500        db.insert_new_filesystem(fs_ref.id(), &fs_info)
501            .await
502            .unwrap();
503
504        let nodes = db.list_all_nodes().await.unwrap();
505
506        // Only the root is present
507        assert_eq!(nodes.len(), 1);
508
509        db.update_vfs(fs_ref.id(), &creation_update).await.unwrap();
510
511        let nodes = db.list_all_nodes().await.unwrap();
512        assert_eq!(nodes.len(), 5);
513
514        let root = db.get_vfs_root(fs_ref.id()).await.unwrap();
515
516        let node = db
517            .find_node(&root, &VirtualPathBuf::new("/a/f1").unwrap())
518            .await
519            .unwrap();
520
521        assert_eq!(node.kind, NodeKind::File.as_str());
522
523        let node = db
524            .find_node(&root, &VirtualPathBuf::new("/a/b").unwrap())
525            .await
526            .unwrap();
527
528        assert_eq!(node.kind, NodeKind::Dir.as_str());
529
530        let vfs_root = db.load_nodes_rec::<()>(&root).await.unwrap();
531        assert!(vfs_root.structural_eq(&base_vfs));
532
533        let creation_update =
534            VfsUpdate::DirCreated(VfsDirCreation::new(&VirtualPathBuf::new("/a").unwrap(), d));
535        db.update_vfs(fs_ref.id(), &creation_update).await.unwrap();
536
537        let nodes = db.list_all_nodes().await.unwrap();
538        assert_eq!(nodes.len(), 6);
539
540        let node = db
541            .find_node(&root, &VirtualPathBuf::new("/a/d").unwrap())
542            .await
543            .unwrap();
544
545        assert_eq!(node.kind, NodeKind::Dir.as_str());
546
547        let creation_update = VfsUpdate::FileCreated(VfsFileUpdate::new(
548            &VirtualPathBuf::new("/a/b/f2").unwrap(),
549            42,
550            (),
551        ));
552        db.update_vfs(fs_ref.id(), &creation_update).await.unwrap();
553
554        let nodes = db.list_all_nodes().await.unwrap();
555        assert_eq!(nodes.len(), 7);
556
557        let node = db
558            .find_node(&root, &VirtualPathBuf::new("/a/b/f2").unwrap())
559            .await
560            .unwrap();
561
562        assert_eq!(node.kind, NodeKind::File.as_str());
563        assert_eq!(node.size, Some(42));
564
565        let creation_update = VfsUpdate::FileModified(VfsFileUpdate::new(
566            &VirtualPathBuf::new("/a/b/f2").unwrap(),
567            54,
568            (),
569        ));
570        db.update_vfs(fs_ref.id(), &creation_update).await.unwrap();
571
572        let nodes = db.list_all_nodes().await.unwrap();
573        assert_eq!(nodes.len(), 7);
574
575        let node = db
576            .find_node(&root, &VirtualPathBuf::new("/a/b/f2").unwrap())
577            .await
578            .unwrap();
579
580        assert_eq!(node.kind, NodeKind::File.as_str());
581        assert_eq!(node.size, Some(54));
582
583        let diff = VfsDiff::file_modified(VirtualPathBuf::new("/a/b/f2").unwrap());
584        let failed_diff = FailedUpdateApplication::new(
585            diff.clone(),
586            LocalDirError::io(
587                &VirtualPathBuf::new("/a/b/f2").unwrap(),
588                io::Error::new(ErrorKind::AlreadyExists, "Error"),
589            )
590            .into(),
591        );
592
593        let failed_update: VfsUpdate<()> = VfsUpdate::FailedApplication(failed_diff);
594        db.update_vfs(fs_ref.id(), &failed_update).await.unwrap();
595
596        let nodes = db.list_all_nodes().await.unwrap();
597        assert_eq!(nodes.len(), 7);
598
599        let node = db
600            .find_node(&root, &VirtualPathBuf::new("/a/b/f2").unwrap())
601            .await
602            .unwrap();
603
604        let vfs_node = VfsNode::<()>::from(&node);
605        assert!(vfs_node.state().is_err());
606
607        let conflict: VfsUpdate<()> = VfsUpdate::Conflict(diff);
608        db.update_vfs(fs_ref.id(), &conflict).await.unwrap();
609
610        let nodes = db.list_all_nodes().await.unwrap();
611        assert_eq!(nodes.len(), 7);
612
613        let node = db
614            .find_node(&root, &VirtualPathBuf::new("/a/b/f2").unwrap())
615            .await
616            .unwrap();
617
618        let vfs_node = VfsNode::<()>::from(&node);
619        assert!(vfs_node.state().is_conflict());
620    }
621}