1use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use tokio::sync::{broadcast, mpsc, oneshot, RwLock};
11use uuid::Uuid;
12
13use crate::database::models::FuseMount;
14use crate::database::types::MountStatus;
15use crate::database::Database;
16use crate::fuse::cache::FileCacheConfig;
17use crate::fuse::jax_fs::JaxFs;
18use crate::fuse::sync_events::{SaveRequest, SyncEvent};
19use crate::fuse::FileCache;
20use common::mount::{ConflictFile, Mount};
21use common::peer::Peer;
22
/// Tunables for [`MountManager`].
#[derive(Debug, Clone)]
pub struct MountManagerConfig {
    /// Capacity handed to the broadcast channel that carries sync events
    /// (see `MountManager::new`).
    pub sync_event_capacity: usize,
}

impl Default for MountManagerConfig {
    /// Builds the default configuration: a sync-event channel with 256 slots.
    fn default() -> Self {
        const DEFAULT_SYNC_EVENT_CAPACITY: usize = 256;

        MountManagerConfig {
            sync_event_capacity: DEFAULT_SYNC_EVENT_CAPACITY,
        }
    }
}
37
38pub struct SessionHandle {
43 unmount_tx: Option<oneshot::Sender<()>>,
45}
46
47impl SessionHandle {
48 fn spawn(session: fuser::BackgroundSession) -> Self {
50 let (unmount_tx, unmount_rx) = oneshot::channel();
51
52 std::thread::spawn(move || {
55 let _session = session;
57 let _ = unmount_rx.blocking_recv();
59 });
61
62 Self {
63 unmount_tx: Some(unmount_tx),
64 }
65 }
66
67 fn unmount(&mut self) {
69 if let Some(tx) = self.unmount_tx.take() {
70 let _ = tx.send(());
71 }
72 }
73}
74
75pub struct LiveMount {
77 pub mount: Arc<RwLock<Mount>>,
79 pub session: Option<SessionHandle>,
81 pub cache: FileCache,
83 pub config: FuseMount,
85}
86
87impl std::fmt::Debug for LiveMount {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 f.debug_struct("LiveMount")
90 .field("config", &self.config)
91 .field("has_session", &self.session.is_some())
92 .finish()
93 }
94}
95
/// Owns the set of live FUSE mounts and the persistence/peer handles needed
/// to create, start, stop and refresh them.
pub struct MountManager {
    /// Live mounts keyed by mount id. Entries are inserted by `start` and
    /// removed by `delete`.
    mounts: RwLock<HashMap<Uuid, LiveMount>>,
    /// Database handle used for mount configuration rows and bucket lookups.
    db: Database,
    /// Peer used to load bucket mounts, save them, and reach the blob store.
    peer: Peer<Database>,
    /// Sending half of the broadcast channel that carries `SyncEvent`s.
    sync_tx: broadcast::Sender<SyncEvent>,
}
109
110impl MountManager {
111 pub fn new(db: Database, peer: Peer<Database>, config: MountManagerConfig) -> Self {
113 let (sync_tx, _) = broadcast::channel(config.sync_event_capacity);
114
115 Self {
116 mounts: RwLock::new(HashMap::new()),
117 db,
118 peer,
119 sync_tx,
120 }
121 }
122
123 pub fn subscribe_sync_events(&self) -> broadcast::Receiver<SyncEvent> {
125 self.sync_tx.subscribe()
126 }
127
128 pub fn sync_sender(&self) -> broadcast::Sender<SyncEvent> {
130 self.sync_tx.clone()
131 }
132
133 pub async fn on_bucket_synced(&self, bucket_id: Uuid) -> Result<(), MountError> {
139 let mounts = self.mounts.read().await;
140
141 for (mount_id, live_mount) in mounts.iter() {
142 if *live_mount.config.bucket_id == bucket_id {
143 let has_local_changes = {
145 let mount_guard = live_mount.mount.read().await;
146 let inner = mount_guard.inner().await;
147 !inner.ops_log().is_empty()
148 };
149
150 if has_local_changes {
151 tracing::info!(
152 "Mount {} has local changes, merging with incoming sync",
153 mount_id
154 );
155
156 let incoming = self
158 .peer
159 .mount(bucket_id)
160 .await
161 .map_err(|e| MountError::MountLoad(e.into()))?;
162
163 let resolver = ConflictFile::new();
165
166 let mut mount_guard = live_mount.mount.write().await;
168 match mount_guard
169 .merge_from(&incoming, &resolver, self.peer.blobs())
170 .await
171 {
172 Ok((result, link)) => {
173 for resolved in &result.conflicts_resolved {
175 tracing::info!(
176 "Resolved conflict for {:?}: {:?}",
177 resolved.conflict.path,
178 resolved.resolution
179 );
180 }
181
182 tracing::info!(
183 "Merged {} operations, {} conflicts resolved, new link: {}",
184 result.operations_added,
185 result.conflicts_resolved.len(),
186 link.hash()
187 );
188
189 if let Err(e) = self.peer.save_mount(&mount_guard, false).await {
191 tracing::error!("Failed to save merged mount {}: {}", mount_id, e);
192 }
193 }
194 Err(e) => {
195 tracing::error!(
196 "Failed to merge mount {} with incoming changes: {}",
197 mount_id,
198 e
199 );
200 let new_mount = self
202 .peer
203 .mount(bucket_id)
204 .await
205 .map_err(|e| MountError::MountLoad(e.into()))?;
206 *mount_guard = new_mount;
207 }
208 }
209 } else {
210 let new_mount = self
212 .peer
213 .mount(bucket_id)
214 .await
215 .map_err(|e| MountError::MountLoad(e.into()))?;
216
217 *live_mount.mount.write().await = new_mount;
218 }
219
220 live_mount.cache.invalidate_all();
222
223 let _ = self.sync_tx.send(SyncEvent::MountInvalidated {
225 mount_id: *mount_id,
226 });
227
228 tracing::debug!(
229 "Mount {} cache invalidated after bucket {} sync",
230 mount_id,
231 bucket_id
232 );
233 }
234 }
235
236 Ok(())
237 }
238
239 pub async fn create_mount(
241 &self,
242 bucket_id: Uuid,
243 mount_point: &str,
244 auto_mount: bool,
245 read_only: bool,
246 cache_size_mb: Option<u32>,
247 cache_ttl_secs: Option<u32>,
248 ) -> Result<FuseMount, MountError> {
249 let bucket_info = self
251 .db
252 .get_bucket_info(&bucket_id)
253 .await
254 .map_err(MountError::Database)?;
255
256 if bucket_info.is_none() {
257 return Err(MountError::BucketNotFound(bucket_id));
258 }
259
260 let mount_path = PathBuf::from(mount_point);
262 if !mount_path.exists() {
263 std::fs::create_dir_all(&mount_path).map_err(|e| {
264 MountError::MountPointNotFound(format!("{} (failed to create: {})", mount_point, e))
265 })?;
266 }
267
268 if !mount_path.is_dir() {
269 return Err(MountError::MountPointNotDirectory(mount_point.to_string()));
270 }
271
272 let mount = FuseMount::create(
274 bucket_id,
275 mount_point,
276 auto_mount,
277 read_only,
278 cache_size_mb.map(|v| v as i64),
279 cache_ttl_secs.map(|v| v as i64),
280 &self.db,
281 )
282 .await
283 .map_err(MountError::Database)?;
284
285 tracing::info!(
286 "Created mount {} for bucket {} at {}",
287 mount.mount_id,
288 mount.bucket_id,
289 mount.mount_point
290 );
291
292 Ok(mount)
293 }
294
295 pub async fn get(&self, mount_id: &Uuid) -> Result<Option<FuseMount>, MountError> {
297 FuseMount::get(*mount_id, &self.db)
298 .await
299 .map_err(MountError::Database)
300 }
301
302 pub async fn list(&self) -> Result<Vec<FuseMount>, MountError> {
304 FuseMount::list(&self.db)
305 .await
306 .map_err(MountError::Database)
307 }
308
309 #[allow(clippy::too_many_arguments)]
311 pub async fn update(
312 &self,
313 mount_id: &Uuid,
314 mount_point: Option<&str>,
315 enabled: Option<bool>,
316 auto_mount: Option<bool>,
317 read_only: Option<bool>,
318 cache_size_mb: Option<u32>,
319 cache_ttl_secs: Option<u32>,
320 ) -> Result<Option<FuseMount>, MountError> {
321 FuseMount::update(
322 *mount_id,
323 mount_point,
324 enabled,
325 auto_mount,
326 read_only,
327 cache_size_mb.map(|v| v as i64),
328 cache_ttl_secs.map(|v| v as i64),
329 &self.db,
330 )
331 .await
332 .map_err(MountError::Database)
333 }
334
335 pub async fn delete(&self, mount_id: &Uuid) -> Result<bool, MountError> {
337 let _ = self.stop(mount_id).await;
339
340 self.mounts.write().await.remove(mount_id);
342
343 FuseMount::delete(*mount_id, &self.db)
345 .await
346 .map_err(MountError::Database)
347 }
348
349 pub async fn start(&self, mount_id: &Uuid) -> Result<(), MountError> {
351 let mount_config = FuseMount::get(*mount_id, &self.db)
353 .await
354 .map_err(MountError::Database)?
355 .ok_or(MountError::MountNotFound(*mount_id))?;
356
357 {
359 let mounts = self.mounts.read().await;
360 if let Some(live) = mounts.get(mount_id) {
361 if live.session.is_some() {
362 return Err(MountError::AlreadyRunning(*mount_id));
363 }
364 }
365 }
366
367 FuseMount::update_status(*mount_id, MountStatus::Starting, None, &self.db)
369 .await
370 .map_err(MountError::Database)?;
371
372 let bucket_mount = self
374 .peer
375 .mount(*mount_config.bucket_id)
376 .await
377 .map_err(|e| MountError::MountLoad(e.into()))?;
378
379 let cache = FileCache::new(crate::fuse::cache::FileCacheConfig {
381 max_size_mb: mount_config.cache_size_mb as u32,
382 ttl_secs: mount_config.cache_ttl_secs as u32,
383 });
384
385 let mount_arc = Arc::new(RwLock::new(bucket_mount));
387 let sync_rx = self.subscribe_sync_events();
388
389 let (save_tx, save_rx) = mpsc::channel::<SaveRequest>(32);
391
392 let fs = JaxFs::new(
393 tokio::runtime::Handle::current(),
394 mount_arc.clone(),
395 *mount_id,
396 *mount_config.bucket_id,
397 FileCacheConfig {
398 max_size_mb: mount_config.cache_size_mb as u32,
399 ttl_secs: mount_config.cache_ttl_secs as u32,
400 },
401 *mount_config.read_only,
402 Some(sync_rx),
403 Some(save_tx),
404 );
405
406 self.spawn_save_handler(save_rx, mount_arc.clone());
408
409 #[cfg(target_os = "linux")]
411 let options = vec![
412 fuser::MountOption::FSName("jax".to_string()),
413 fuser::MountOption::AutoUnmount,
414 fuser::MountOption::AllowOther,
415 ];
416
417 #[cfg(target_os = "macos")]
418 let options = {
419 let bucket_name = self
421 .db
422 .get_bucket_info(&mount_config.bucket_id)
423 .await
424 .ok()
425 .flatten()
426 .map(|info| info.name)
427 .unwrap_or_else(|| "jax".to_string());
428
429 vec![
430 fuser::MountOption::FSName("jax".to_string()),
431 fuser::MountOption::AutoUnmount,
432 fuser::MountOption::CUSTOM(format!("volname={}", bucket_name)),
433 fuser::MountOption::CUSTOM("local".to_string()),
434 fuser::MountOption::CUSTOM("noappledouble".to_string()),
435 ]
436 };
437
438 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
439 let options = vec![
440 fuser::MountOption::FSName("jax".to_string()),
441 fuser::MountOption::AutoUnmount,
442 ];
443
444 let mount_path = std::path::Path::new(&mount_config.mount_point);
446 tracing::info!("Mounting FUSE filesystem at {:?}", mount_path);
447
448 let session = fuser::spawn_mount2(fs, mount_path, &options).map_err(|e| {
449 MountError::SpawnFailed(format!(
450 "Failed to mount at {}: {}",
451 mount_config.mount_point, e
452 ))
453 })?;
454
455 let session_handle = SessionHandle::spawn(session);
457
458 let live_mount = LiveMount {
460 mount: mount_arc,
461 session: Some(session_handle),
462 cache,
463 config: mount_config.clone(),
464 };
465
466 self.mounts.write().await.insert(*mount_id, live_mount);
468
469 FuseMount::update_status(*mount_id, MountStatus::Running, None, &self.db)
471 .await
472 .map_err(MountError::Database)?;
473
474 tracing::info!("Started mount {} at {}", mount_id, mount_config.mount_point);
475
476 Ok(())
477 }
478
479 pub async fn stop(&self, mount_id: &Uuid) -> Result<(), MountError> {
481 let _ = FuseMount::update_status(*mount_id, MountStatus::Stopping, None, &self.db).await;
483
484 let mount_point = {
485 let mut mounts = self.mounts.write().await;
486 if let Some(live) = mounts.get_mut(mount_id) {
487 if let Some(ref mut session) = live.session {
489 session.unmount();
490 }
491 live.session.take();
492
493 live.config.mount_point.clone()
494 } else {
495 match FuseMount::get(*mount_id, &self.db).await {
497 Ok(Some(config)) => config.mount_point,
498 _ => return Ok(()),
499 }
500 }
501 };
502
503 self.unmount_path(&mount_point).await?;
505
506 FuseMount::update_status(*mount_id, MountStatus::Stopped, None, &self.db)
508 .await
509 .map_err(MountError::Database)?;
510
511 tracing::info!("Stopped mount {} at {}", mount_id, mount_point);
512
513 Ok(())
514 }
515
516 pub async fn stop_all(&self) -> Result<(), MountError> {
518 let mount_ids: Vec<Uuid> = {
519 let mounts = self.mounts.read().await;
520 mounts.keys().copied().collect()
521 };
522
523 for mount_id in mount_ids {
524 if let Err(e) = self.stop(&mount_id).await {
525 tracing::error!("Failed to stop mount {}: {}", mount_id, e);
526 }
527 }
528
529 Ok(())
530 }
531
532 pub async fn start_auto(&self) -> Result<(), MountError> {
534 let auto_mounts = FuseMount::auto_list(&self.db)
535 .await
536 .map_err(MountError::Database)?;
537
538 tracing::info!("Starting {} auto-mount(s)", auto_mounts.len());
539
540 for mount in auto_mounts {
541 if let Err(e) = self.start(&mount.mount_id).await {
542 tracing::error!(
543 "Failed to auto-mount {} at {}: {}",
544 mount.mount_id,
545 mount.mount_point,
546 e
547 );
548
549 let _ = FuseMount::update_status(
551 *mount.mount_id,
552 MountStatus::Error,
553 Some(&e.to_string()),
554 &self.db,
555 )
556 .await;
557 }
558 }
559
560 Ok(())
561 }
562
563 pub async fn get_live_mount(&self, mount_id: &Uuid) -> Option<Arc<RwLock<Mount>>> {
565 let mounts = self.mounts.read().await;
566 mounts.get(mount_id).map(|m| m.mount.clone())
567 }
568
569 pub async fn get_mount_cache(&self, mount_id: &Uuid) -> Option<FileCache> {
571 let mounts = self.mounts.read().await;
572 mounts.get(mount_id).map(|m| m.cache.clone())
573 }
574
575 fn spawn_save_handler(
577 &self,
578 mut save_rx: mpsc::Receiver<SaveRequest>,
579 mount: Arc<RwLock<Mount>>,
580 ) {
581 let peer = self.peer.clone();
582
583 tokio::spawn(async move {
584 while let Some(request) = save_rx.recv().await {
585 tracing::debug!("Received save request for mount {}", request.mount_id);
586
587 let mount_guard = mount.read().await;
589 match peer.save_mount(&mount_guard, false).await {
590 Ok(link) => {
591 tracing::info!(
592 "Successfully saved mount {} to {}",
593 request.mount_id,
594 link.hash()
595 );
596 }
597 Err(e) => {
598 tracing::error!("Failed to save mount {}: {}", request.mount_id, e);
599 }
600 }
601 }
602
603 tracing::debug!("Save handler shutting down");
604 });
605 }
606
607 async fn unmount_path(&self, mount_point: &str) -> Result<(), MountError> {
609 use std::process::Command;
610
611 #[cfg(target_os = "macos")]
612 {
613 let status = Command::new("umount")
614 .arg(mount_point)
615 .status()
616 .map_err(|e| MountError::UnmountFailed(e.to_string()))?;
617
618 if !status.success() {
619 let _ = Command::new("diskutil")
621 .args(["unmount", "force", mount_point])
622 .status();
623 }
624 }
625
626 #[cfg(target_os = "linux")]
627 {
628 let status = Command::new("fusermount")
629 .args(["-u", mount_point])
630 .status()
631 .map_err(|e| MountError::UnmountFailed(e.to_string()))?;
632
633 if !status.success() {
634 let _ = Command::new("fusermount")
636 .args(["-uz", mount_point])
637 .status();
638 }
639 }
640
641 #[cfg(not(any(target_os = "macos", target_os = "linux")))]
642 {
643 tracing::warn!("Unmount not implemented for this platform");
644 }
645
646 Ok(())
647 }
648}
649
650impl std::fmt::Debug for MountManager {
651 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
652 f.debug_struct("MountManager")
653 .field("peer_id", &self.peer.id())
654 .finish()
655 }
656}
657
/// Errors reported by [`MountManager`] operations.
#[derive(Debug, thiserror::Error)]
pub enum MountError {
    /// A database call failed; also produced by `?` on a `sqlx::Error`.
    #[error("database error: {0}")]
    Database(#[from] sqlx::Error),

    /// The bucket a mount was requested for is not known to the database.
    #[error("bucket not found: {0}")]
    BucketNotFound(Uuid),

    /// No persisted mount configuration exists for the given mount id.
    #[error("mount not found: {0}")]
    MountNotFound(Uuid),

    /// The mount point directory is missing and could not be created; the
    /// payload carries the path and the creation error.
    #[error("mount point not found: {0}")]
    MountPointNotFound(String),

    /// The mount point path exists but is not a directory.
    #[error("mount point is not a directory: {0}")]
    MountPointNotDirectory(String),

    /// A live session is already attached to the mount being started.
    #[error("mount already running: {0}")]
    AlreadyRunning(Uuid),

    /// Loading the bucket mount from the peer failed.
    #[error("failed to load bucket mount: {0}")]
    MountLoad(#[source] anyhow::Error),

    /// Spawning the FUSE mount failed; the payload describes the mount point
    /// and the underlying error.
    #[error("failed to spawn FUSE process: {0}")]
    SpawnFailed(String),

    /// The platform unmount step could not be carried out.
    #[error("failed to unmount: {0}")]
    UnmountFailed(String),
}