1use std::fmt::Debug;
3
4use diesel::prelude::*;
5use futures::future::Either;
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use uuid::Uuid;
8
9use brume::{
10 update::VfsUpdate,
11 vfs::{DirTree, FileMeta, NodeKind, NodeState, Vfs, VfsNode},
12};
13use brume_daemon_proto::VirtualPath;
14
15use crate::schema::{filesystems, nodes};
16
17use super::{Database, DatabaseError};
18
19#[derive(Clone, Debug, Queryable, Selectable, Identifiable, Associations)]
21#[diesel(table_name = nodes)]
22#[diesel(belongs_to(DbVfsNode, foreign_key = parent))]
23#[diesel(check_for_backend(diesel::sqlite::Sqlite))]
24struct DbVfsNode {
25 id: i32,
26 name: String,
27 kind: String,
28 size: Option<i64>,
29 state: Option<Vec<u8>>,
30 parent: Option<i32>,
31}
32
33impl<'a, SyncInfo> From<&'a DbVfsNode> for VfsNode<SyncInfo>
34where
35 SyncInfo: Deserialize<'a>,
36{
37 fn from(value: &'a DbVfsNode) -> Self {
38 let kind = NodeKind::try_from(value.kind.as_str()).unwrap();
39 let state = value
40 .state
41 .as_ref()
42 .map(|st| bincode::deserialize(st).unwrap())
43 .unwrap_or(NodeState::NeedResync);
44
45 match kind {
46 NodeKind::Dir => Self::Dir(DirTree::new_with_state(&value.name, state)),
47 NodeKind::File => Self::File(FileMeta::new_with_state(
48 &value.name,
49 value.size.unwrap() as u64,
50 state,
51 )),
52 }
53 }
54}
55
56#[derive(Insertable)]
58#[diesel(table_name = nodes)]
59struct DbNewVfsNode<'a> {
60 name: &'a str,
61 kind: &'a str,
62 size: Option<i64>,
63 state: Option<&'a [u8]>,
64 parent: Option<i32>,
65}
66
67#[derive(AsChangeset)]
69#[diesel(table_name = nodes)]
70struct DbUpdatedVfsNode<'a> {
71 size: Option<i64>,
72 state: Option<&'a [u8]>,
73}
74
75impl Default for DbNewVfsNode<'static> {
76 fn default() -> Self {
77 Self::root()
78 }
79}
80
81impl DbNewVfsNode<'static> {
82 pub(super) fn root() -> Self {
83 Self {
84 name: "",
85 kind: NodeKind::Dir.as_str(),
86 size: None,
87 state: None,
88 parent: None,
89 }
90 }
91}
92
93impl Database {
94 pub async fn insert_vfs_root(&self) -> Result<i32, DatabaseError> {
95 use crate::schema::nodes::dsl::*;
96
97 let conn = self.pool.get().await?;
98 conn.interact(move |conn| {
99 let vfs_root = DbNewVfsNode::root();
100 diesel::insert_into(nodes)
101 .values(&vfs_root)
102 .returning(id)
103 .get_result(conn)
104 })
105 .await
106 .unwrap() .map_err(|e| e.into())
108 }
109
110 #[cfg(test)]
111 async fn list_all_nodes(&self) -> Result<Vec<DbVfsNode>, DatabaseError> {
112 use crate::schema::nodes::dsl::*;
113
114 let conn = self.pool.get().await?;
115 conn.interact(|conn| nodes.select(DbVfsNode::as_select()).load(conn))
116 .await
117 .unwrap() .map_err(|e| e.into())
119 }
120
121 async fn load_nodes_rec<SyncInfo: DeserializeOwned>(
123 &self,
124 parent_node: &DbVfsNode,
125 ) -> Result<VfsNode<SyncInfo>, DatabaseError> {
126 let mut vfs_node = parent_node.into();
127
128 if let VfsNode::Dir(dir) = &mut vfs_node {
129 let parent_node = parent_node.to_owned();
130
131 let db_children = {
132 let conn = self.pool.get().await?;
133 conn.interact(move |conn| {
134 DbVfsNode::belonging_to(&parent_node)
135 .select(DbVfsNode::as_select())
136 .load(conn)
137 })
138 .await
139 .unwrap()
140 }?;
141
142 for db_child in db_children {
143 let child = Box::pin(self.load_nodes_rec(&db_child)).await?;
144
145 dir.insert_child(child);
146 }
147 }
148 Ok(vfs_node)
149 }
150
151 pub async fn load_vfs<SyncInfo: DeserializeOwned + Debug>(
153 &self,
154 fs_uuid: Uuid,
155 ) -> Result<Vfs<SyncInfo>, DatabaseError> {
156 let vfs_root = self.get_vfs_root(fs_uuid).await?;
157
158 let loaded_root = self.load_nodes_rec(&vfs_root).await?;
159
160 Ok(Vfs::new(loaded_root))
161 }
162
163 async fn find_node(
165 &self,
166 root: &DbVfsNode,
167 path: &VirtualPath,
168 ) -> Result<DbVfsNode, DatabaseError> {
169 use crate::schema::nodes::dsl::*;
170
171 let mut current_node = root.clone();
172
173 for cur_name in path.iter() {
174 let conn = self.pool.get().await?;
175
176 let cur_name = cur_name.to_owned();
177 current_node = conn
178 .interact(move |conn| {
179 DbVfsNode::belonging_to(¤t_node)
180 .select(DbVfsNode::as_select())
181 .filter(name.eq(cur_name))
182 .first(conn)
183 })
184 .await
185 .unwrap() ?
187 }
188
189 Ok(current_node)
190 }
191
192 async fn insert_node_child<SyncInfo: Serialize>(
194 &self,
195 parent_node: i32,
196 node: &VfsNode<SyncInfo>,
197 ) -> Result<(), DatabaseError> {
198 match node {
199 VfsNode::Dir(dir_tree) => Box::pin(self.insert_dir_child(parent_node, dir_tree)).await,
200 VfsNode::File(file_meta) => self.insert_file_child(parent_node, file_meta).await,
201 }
202 }
203
204 async fn insert_file_child<SyncInfo: Serialize>(
206 &self,
207 parent_node: i32,
208 file: &FileMeta<SyncInfo>,
209 ) -> Result<(), DatabaseError> {
210 use crate::schema::nodes::dsl::*;
211
212 let file_name = file.name().to_string();
213 let file_size = file.size() as i64;
214 let file_state = bincode::serialize(file.state()).unwrap();
215
216 let conn = self.pool.get().await?;
217
218 conn.interact(move |conn| {
219 let db_node = DbNewVfsNode {
220 name: &file_name,
221 kind: NodeKind::File.as_str(),
222 size: Some(file_size),
223 state: Some(&file_state),
224 parent: Some(parent_node),
225 };
226
227 diesel::insert_into(nodes).values(&db_node).execute(conn)
228 })
229 .await
230 .unwrap() .map_err(|e| e.into())
232 .map(|_| ())
233 }
234
235 async fn insert_dir_child<SyncInfo: Serialize>(
237 &self,
238 parent_node: i32,
239 tree: &DirTree<SyncInfo>,
240 ) -> Result<(), DatabaseError> {
241 use crate::schema::nodes::dsl::*;
242
243 let dir_name = tree.name().to_string();
244 let dir_state = bincode::serialize(tree.state()).unwrap();
245
246 let new_id = {
247 let conn = self.pool.get().await?;
248
249 conn.interact(move |conn| {
250 let db_dir = DbNewVfsNode {
251 name: &dir_name,
252 kind: NodeKind::Dir.as_str(),
253 size: None,
254 state: Some(&dir_state),
255 parent: Some(parent_node),
256 };
257
258 diesel::insert_into(nodes)
259 .values(&db_dir)
260 .returning(id)
261 .get_result(conn)
262 })
263 .await
264 .unwrap() ?
266 };
267
268 for child in tree.children().iter() {
269 self.insert_node_child(new_id, child).await?;
270 }
271 Ok(())
272 }
273
274 async fn insert_dir<SyncInfo: Serialize>(
276 &self,
277 root: &DbVfsNode,
278 path: &VirtualPath,
279 tree: &DirTree<SyncInfo>,
280 ) -> Result<(), DatabaseError> {
281 let parent_node = path
282 .parent()
283 .map(|parent_path| Either::Left(self.find_node(root, parent_path)))
284 .unwrap_or_else(|| Either::Right(async { Ok(root.clone()) }))
285 .await?;
286
287 self.insert_dir_child(parent_node.id, tree).await
288 }
289
290 async fn update_file<SyncInfo: Serialize>(
292 &self,
293 root: &DbVfsNode,
294 path: &VirtualPath,
295 file: &FileMeta<SyncInfo>,
296 ) -> Result<(), DatabaseError> {
297 use crate::schema::nodes::dsl::*;
298
299 let file_size = file.size() as i64;
300 let file_state = bincode::serialize(file.state())?;
301
302 let node = self.find_node(root, path).await?;
303
304 let conn = self.pool.get().await?;
305
306 conn.interact(move |conn| {
307 let update = DbUpdatedVfsNode {
308 size: Some(file_size),
309 state: Some(&file_state),
310 };
311
312 diesel::update(nodes)
313 .filter(id.eq(node.id))
314 .set(update)
315 .execute(conn)
316 })
317 .await
318 .unwrap() .map_err(|e| e.into())
320 .map(|_| ())
321 }
322
323 async fn insert_file<SyncInfo: Serialize>(
325 &self,
326 root: &DbVfsNode,
327 path: &VirtualPath,
328 file: &FileMeta<SyncInfo>,
329 ) -> Result<(), DatabaseError> {
330 let parent_node = path
331 .parent()
332 .map(|parent_path| Either::Left(self.find_node(root, parent_path)))
333 .unwrap_or_else(|| Either::Right(async { Ok(root.clone()) }))
334 .await?;
335
336 self.insert_file_child(parent_node.id, file).await
337 }
338
339 async fn delete_node(&self, root: &DbVfsNode, path: &VirtualPath) -> Result<(), DatabaseError> {
341 let node = self.find_node(root, path).await?;
342
343 let conn = self.pool.get().await?;
344 conn.interact(move |conn| diesel::delete(&node).execute(conn))
345 .await
346 .unwrap() .map_err(|e| e.into())
348 .map(|_| ())
349 }
350
351 async fn set_node_state<SyncInfo: Serialize>(
353 &self,
354 root: &DbVfsNode,
355 path: &VirtualPath,
356 node_state: &NodeState<SyncInfo>,
357 ) -> Result<(), DatabaseError> {
358 use crate::schema::nodes::dsl::*;
359
360 let db_state = bincode::serialize(&node_state)?;
361
362 let node = self.find_node(root, path).await?;
363
364 let conn = self.pool.get().await?;
365
366 conn.interact(move |conn| {
367 diesel::update(nodes)
368 .filter(id.eq(node.id))
369 .set(state.eq(db_state))
370 .execute(conn)
371 })
372 .await
373 .unwrap() .map_err(|e| e.into())
375 .map(|_| ())
376 }
377
378 async fn get_vfs_root(&self, fs_uuid: Uuid) -> Result<DbVfsNode, DatabaseError> {
380 let conn = self.pool.get().await?;
381 conn.interact(move |conn| {
382 nodes::table
383 .inner_join(filesystems::table)
384 .filter(filesystems::uuid.eq(fs_uuid.as_bytes()))
385 .select(DbVfsNode::as_select())
386 .first(conn)
387 })
388 .await
389 .unwrap() .map_err(|e| e.into())
391 }
392
393 pub async fn update_vfs_node_state<SyncInfo: Serialize>(
395 &self,
396 fs_uuid: Uuid,
397 path: &VirtualPath,
398 node_state: &NodeState<SyncInfo>,
399 ) -> Result<(), DatabaseError> {
400 let vfs_root = self.get_vfs_root(fs_uuid).await?;
401
402 self.set_node_state(&vfs_root, path, node_state).await
403 }
404
405 pub async fn delete_vfs_node(
407 &self,
408 fs_uuid: Uuid,
409 path: &VirtualPath,
410 ) -> Result<(), DatabaseError> {
411 let vfs_root = self.get_vfs_root(fs_uuid).await?;
412
413 self.delete_node(&vfs_root, path).await
414 }
415
416 pub async fn update_vfs<SyncInfo: Serialize>(
418 &self,
419 fs_uuid: Uuid,
420 update: &VfsUpdate<SyncInfo>,
421 ) -> Result<(), DatabaseError> {
422 let vfs_root = self.get_vfs_root(fs_uuid).await?;
423
424 match update {
425 VfsUpdate::DirCreated(dir_creation) => {
426 self.insert_dir(&vfs_root, dir_creation.path(), dir_creation.dir())
427 .await
428 }
429 VfsUpdate::DirRemoved(path) => self.delete_node(&vfs_root, path).await,
430 VfsUpdate::FileCreated(file_creation) => {
431 let name = file_creation.path().name().to_owned();
432 let path = file_creation.path().to_owned();
433 let file =
434 FileMeta::new(&name, file_creation.file_size(), file_creation.sync_info());
435 self.insert_file(&vfs_root, &path, &file).await
436 }
437 VfsUpdate::FileModified(file_modification) => {
438 let name = file_modification.path().name().to_owned();
439 let path = file_modification.path().to_owned();
440 let file = FileMeta::new(
441 &name,
442 file_modification.file_size(),
443 file_modification.sync_info(),
444 );
445
446 self.update_file(&vfs_root, &path, &file).await
447 }
448 VfsUpdate::FileRemoved(path) => self.delete_node(&vfs_root, path).await,
449 VfsUpdate::FailedApplication(failed_update) => {
450 let path = failed_update.path().to_owned();
451 let state = NodeState::<SyncInfo>::Error(failed_update.clone());
452 self.set_node_state(&vfs_root, &path, &state).await
453 }
454 VfsUpdate::Conflict(vfs_diff) => {
455 let path = vfs_diff.path().to_owned();
456 let state = NodeState::<SyncInfo>::Conflict(vfs_diff.clone());
457 self.set_node_state(&vfs_root, &path, &state).await
458 }
459 }
460 }
461}
462
463#[cfg(test)]
464mod test {
465 use std::io::{self, ErrorKind};
466
467 use brume::{
468 concrete::local::LocalDirError,
469 update::{FailedUpdateApplication, VfsDiff, VfsDirCreation, VfsFileUpdate, VfsUpdate},
470 vfs::{DirTree, FileMeta, NodeKind, VfsNode},
471 };
472 use brume_daemon_proto::{
473 AnyFsCreationInfo, AnyFsRef, LocalDirCreationInfo, VirtualPath, VirtualPathBuf,
474 };
475
476 use crate::db::{Database, DatabaseConfig};
477
478 #[tokio::test]
479 async fn test_update_db_vfs() {
480 let db = Database::new(&DatabaseConfig::InMemory).await.unwrap();
481
482 let mut a = DirTree::new("a", ());
483 let f1 = FileMeta::new("f1", 10, ());
484 let mut b = DirTree::new("b", ());
485 let c = DirTree::new("c", ());
486 let d = DirTree::new("d", ());
487
488 b.insert_child(VfsNode::Dir(c));
489 a.insert_child(VfsNode::File(f1));
490 a.insert_child(VfsNode::Dir(b));
491
492 let mut base_root = DirTree::new("", ());
493 base_root.insert_child(VfsNode::Dir(a.clone()));
494 let base_vfs = VfsNode::Dir(base_root);
495
496 let creation_update = VfsUpdate::DirCreated(VfsDirCreation::new(VirtualPath::root(), a));
497
498 let fs_info = AnyFsCreationInfo::LocalDir(LocalDirCreationInfo::new("/tmp/test"));
499 let fs_ref = AnyFsRef::from(fs_info.clone());
500 db.insert_new_filesystem(fs_ref.id(), &fs_info)
501 .await
502 .unwrap();
503
504 let nodes = db.list_all_nodes().await.unwrap();
505
506 assert_eq!(nodes.len(), 1);
508
509 db.update_vfs(fs_ref.id(), &creation_update).await.unwrap();
510
511 let nodes = db.list_all_nodes().await.unwrap();
512 assert_eq!(nodes.len(), 5);
513
514 let root = db.get_vfs_root(fs_ref.id()).await.unwrap();
515
516 let node = db
517 .find_node(&root, &VirtualPathBuf::new("/a/f1").unwrap())
518 .await
519 .unwrap();
520
521 assert_eq!(node.kind, NodeKind::File.as_str());
522
523 let node = db
524 .find_node(&root, &VirtualPathBuf::new("/a/b").unwrap())
525 .await
526 .unwrap();
527
528 assert_eq!(node.kind, NodeKind::Dir.as_str());
529
530 let vfs_root = db.load_nodes_rec::<()>(&root).await.unwrap();
531 assert!(vfs_root.structural_eq(&base_vfs));
532
533 let creation_update =
534 VfsUpdate::DirCreated(VfsDirCreation::new(&VirtualPathBuf::new("/a").unwrap(), d));
535 db.update_vfs(fs_ref.id(), &creation_update).await.unwrap();
536
537 let nodes = db.list_all_nodes().await.unwrap();
538 assert_eq!(nodes.len(), 6);
539
540 let node = db
541 .find_node(&root, &VirtualPathBuf::new("/a/d").unwrap())
542 .await
543 .unwrap();
544
545 assert_eq!(node.kind, NodeKind::Dir.as_str());
546
547 let creation_update = VfsUpdate::FileCreated(VfsFileUpdate::new(
548 &VirtualPathBuf::new("/a/b/f2").unwrap(),
549 42,
550 (),
551 ));
552 db.update_vfs(fs_ref.id(), &creation_update).await.unwrap();
553
554 let nodes = db.list_all_nodes().await.unwrap();
555 assert_eq!(nodes.len(), 7);
556
557 let node = db
558 .find_node(&root, &VirtualPathBuf::new("/a/b/f2").unwrap())
559 .await
560 .unwrap();
561
562 assert_eq!(node.kind, NodeKind::File.as_str());
563 assert_eq!(node.size, Some(42));
564
565 let creation_update = VfsUpdate::FileModified(VfsFileUpdate::new(
566 &VirtualPathBuf::new("/a/b/f2").unwrap(),
567 54,
568 (),
569 ));
570 db.update_vfs(fs_ref.id(), &creation_update).await.unwrap();
571
572 let nodes = db.list_all_nodes().await.unwrap();
573 assert_eq!(nodes.len(), 7);
574
575 let node = db
576 .find_node(&root, &VirtualPathBuf::new("/a/b/f2").unwrap())
577 .await
578 .unwrap();
579
580 assert_eq!(node.kind, NodeKind::File.as_str());
581 assert_eq!(node.size, Some(54));
582
583 let diff = VfsDiff::file_modified(VirtualPathBuf::new("/a/b/f2").unwrap());
584 let failed_diff = FailedUpdateApplication::new(
585 diff.clone(),
586 LocalDirError::io(
587 &VirtualPathBuf::new("/a/b/f2").unwrap(),
588 io::Error::new(ErrorKind::AlreadyExists, "Error"),
589 )
590 .into(),
591 );
592
593 let failed_update: VfsUpdate<()> = VfsUpdate::FailedApplication(failed_diff);
594 db.update_vfs(fs_ref.id(), &failed_update).await.unwrap();
595
596 let nodes = db.list_all_nodes().await.unwrap();
597 assert_eq!(nodes.len(), 7);
598
599 let node = db
600 .find_node(&root, &VirtualPathBuf::new("/a/b/f2").unwrap())
601 .await
602 .unwrap();
603
604 let vfs_node = VfsNode::<()>::from(&node);
605 assert!(vfs_node.state().is_err());
606
607 let conflict: VfsUpdate<()> = VfsUpdate::Conflict(diff);
608 db.update_vfs(fs_ref.id(), &conflict).await.unwrap();
609
610 let nodes = db.list_all_nodes().await.unwrap();
611 assert_eq!(nodes.len(), 7);
612
613 let node = db
614 .find_node(&root, &VirtualPathBuf::new("/a/b/f2").unwrap())
615 .await
616 .unwrap();
617
618 let vfs_node = VfsNode::<()>::from(&node);
619 assert!(vfs_node.state().is_conflict());
620 }
621}