1use std::collections::HashMap;
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use tokio::sync::{broadcast, oneshot, RwLock};
11use uuid::Uuid;
12
13use crate::database::models::FuseMount;
14use crate::database::types::MountStatus;
15use crate::database::Database;
16use crate::fuse::cache::FileCacheConfig;
17use crate::fuse::jax_fs::JaxFs;
18use crate::fuse::sync_events::SyncEvent;
19use crate::fuse::FileCache;
20use crate::http_server::api::client::ApiClient;
21use common::mount::{ConflictFile, Mount};
22use common::peer::Peer;
23
24#[derive(Debug, Clone)]
26pub struct MountManagerConfig {
27 pub sync_event_capacity: usize,
29 pub api_port: u16,
31}
32
33impl Default for MountManagerConfig {
34 fn default() -> Self {
35 Self {
36 sync_event_capacity: 256,
37 api_port: 5001,
38 }
39 }
40}
41
42pub struct SessionHandle {
47 unmount_tx: Option<oneshot::Sender<()>>,
49}
50
51impl SessionHandle {
52 fn spawn(session: fuser::BackgroundSession) -> Self {
54 let (unmount_tx, unmount_rx) = oneshot::channel();
55
56 std::thread::spawn(move || {
59 let _session = session;
61 let _ = unmount_rx.blocking_recv();
63 });
65
66 Self {
67 unmount_tx: Some(unmount_tx),
68 }
69 }
70
71 fn unmount(&mut self) {
73 if let Some(tx) = self.unmount_tx.take() {
74 let _ = tx.send(());
75 }
76 }
77}
78
79pub struct LiveMount {
81 pub mount: Arc<RwLock<Mount>>,
83 pub session: Option<SessionHandle>,
85 pub cache: FileCache,
87 pub config: FuseMount,
89}
90
91impl std::fmt::Debug for LiveMount {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 f.debug_struct("LiveMount")
94 .field("config", &self.config)
95 .field("has_session", &self.session.is_some())
96 .finish()
97 }
98}
99
100pub struct MountManager {
104 mounts: RwLock<HashMap<Uuid, LiveMount>>,
106 db: Database,
108 peer: Peer<Database>,
110 sync_tx: broadcast::Sender<SyncEvent>,
112 api_port: u16,
114}
115
116impl MountManager {
117 pub fn new(db: Database, peer: Peer<Database>, config: MountManagerConfig) -> Self {
119 let (sync_tx, _) = broadcast::channel(config.sync_event_capacity);
120
121 Self {
122 mounts: RwLock::new(HashMap::new()),
123 db,
124 peer,
125 sync_tx,
126 api_port: config.api_port,
127 }
128 }
129
130 pub fn subscribe_sync_events(&self) -> broadcast::Receiver<SyncEvent> {
132 self.sync_tx.subscribe()
133 }
134
135 pub fn sync_sender(&self) -> broadcast::Sender<SyncEvent> {
137 self.sync_tx.clone()
138 }
139
140 pub async fn on_bucket_synced(&self, bucket_id: Uuid) -> Result<(), MountError> {
146 let mounts = self.mounts.read().await;
147
148 for (mount_id, live_mount) in mounts.iter() {
149 if *live_mount.config.bucket_id == bucket_id {
150 let has_local_changes = {
152 let mount_guard = live_mount.mount.read().await;
153 let inner = mount_guard.inner().await;
154 !inner.ops_log().is_empty()
155 };
156
157 if has_local_changes {
158 tracing::info!(
159 "Mount {} has local changes, merging with incoming sync",
160 mount_id
161 );
162
163 let incoming = self
165 .peer
166 .mount(bucket_id)
167 .await
168 .map_err(|e| MountError::MountLoad(e.into()))?;
169
170 let resolver = ConflictFile::new();
172
173 let mut mount_guard = live_mount.mount.write().await;
175 match mount_guard
176 .merge_from(&incoming, &resolver, self.peer.blobs())
177 .await
178 {
179 Ok((result, link)) => {
180 for resolved in &result.conflicts_resolved {
182 tracing::info!(
183 "Resolved conflict for {:?}: {:?}",
184 resolved.conflict.path,
185 resolved.resolution
186 );
187 }
188
189 tracing::info!(
190 "Merged {} operations, {} conflicts resolved, new link: {}",
191 result.operations_added,
192 result.conflicts_resolved.len(),
193 link.hash()
194 );
195
196 if let Err(e) = self.peer.save_mount(&mount_guard, None).await {
198 tracing::error!("Failed to save merged mount {}: {}", mount_id, e);
199 }
200 }
201 Err(e) => {
202 tracing::error!(
203 "Failed to merge mount {} with incoming changes: {}",
204 mount_id,
205 e
206 );
207 let new_mount = self
209 .peer
210 .mount(bucket_id)
211 .await
212 .map_err(|e| MountError::MountLoad(e.into()))?;
213 *mount_guard = new_mount;
214 }
215 }
216 } else {
217 let new_mount = self
219 .peer
220 .mount(bucket_id)
221 .await
222 .map_err(|e| MountError::MountLoad(e.into()))?;
223
224 *live_mount.mount.write().await = new_mount;
225 }
226
227 live_mount.cache.invalidate_all();
229
230 let _ = self.sync_tx.send(SyncEvent::MountInvalidated {
232 mount_id: *mount_id,
233 });
234
235 tracing::debug!(
236 "Mount {} cache invalidated after bucket {} sync",
237 mount_id,
238 bucket_id
239 );
240 }
241 }
242
243 Ok(())
244 }
245
246 pub async fn create_mount(
248 &self,
249 bucket_id: Uuid,
250 mount_point: &str,
251 auto_mount: bool,
252 read_only: bool,
253 cache_size_mb: Option<u32>,
254 cache_ttl_secs: Option<u32>,
255 ) -> Result<FuseMount, MountError> {
256 let bucket_info = self
258 .db
259 .get_bucket_info(&bucket_id)
260 .await
261 .map_err(MountError::Database)?;
262
263 if bucket_info.is_none() {
264 return Err(MountError::BucketNotFound(bucket_id));
265 }
266
267 let mount_path = PathBuf::from(mount_point);
269 if !mount_path.exists() {
270 std::fs::create_dir_all(&mount_path).map_err(|e| {
271 MountError::MountPointNotFound(format!("{} (failed to create: {})", mount_point, e))
272 })?;
273 }
274
275 if !mount_path.is_dir() {
276 return Err(MountError::MountPointNotDirectory(mount_point.to_string()));
277 }
278
279 let mount = FuseMount::create(
281 bucket_id,
282 mount_point,
283 auto_mount,
284 read_only,
285 cache_size_mb.map(|v| v as i64),
286 cache_ttl_secs.map(|v| v as i64),
287 &self.db,
288 )
289 .await
290 .map_err(MountError::Database)?;
291
292 tracing::info!(
293 "Created mount {} for bucket {} at {}",
294 mount.mount_id,
295 mount.bucket_id,
296 mount.mount_point
297 );
298
299 Ok(mount)
300 }
301
302 pub async fn get(&self, mount_id: &Uuid) -> Result<Option<FuseMount>, MountError> {
304 FuseMount::get(*mount_id, &self.db)
305 .await
306 .map_err(MountError::Database)
307 }
308
309 pub async fn list(&self) -> Result<Vec<FuseMount>, MountError> {
311 FuseMount::list(&self.db)
312 .await
313 .map_err(MountError::Database)
314 }
315
316 #[allow(clippy::too_many_arguments)]
318 pub async fn update(
319 &self,
320 mount_id: &Uuid,
321 mount_point: Option<&str>,
322 enabled: Option<bool>,
323 auto_mount: Option<bool>,
324 read_only: Option<bool>,
325 cache_size_mb: Option<u32>,
326 cache_ttl_secs: Option<u32>,
327 ) -> Result<Option<FuseMount>, MountError> {
328 FuseMount::update(
329 *mount_id,
330 mount_point,
331 enabled,
332 auto_mount,
333 read_only,
334 cache_size_mb.map(|v| v as i64),
335 cache_ttl_secs.map(|v| v as i64),
336 &self.db,
337 )
338 .await
339 .map_err(MountError::Database)
340 }
341
342 pub async fn delete(&self, mount_id: &Uuid) -> Result<bool, MountError> {
344 let _ = self.stop(mount_id).await;
346
347 self.mounts.write().await.remove(mount_id);
349
350 FuseMount::delete(*mount_id, &self.db)
352 .await
353 .map_err(MountError::Database)
354 }
355
356 pub async fn start(&self, mount_id: &Uuid) -> Result<(), MountError> {
358 let mount_config = FuseMount::get(*mount_id, &self.db)
360 .await
361 .map_err(MountError::Database)?
362 .ok_or(MountError::MountNotFound(*mount_id))?;
363
364 {
366 let mounts = self.mounts.read().await;
367 if let Some(live) = mounts.get(mount_id) {
368 if live.session.is_some() {
369 return Err(MountError::AlreadyRunning(*mount_id));
370 }
371 }
372 }
373
374 FuseMount::update_status(*mount_id, MountStatus::Starting, None, &self.db)
376 .await
377 .map_err(MountError::Database)?;
378
379 let bucket_mount = self
381 .peer
382 .mount(*mount_config.bucket_id)
383 .await
384 .map_err(|e| MountError::MountLoad(e.into()))?;
385
386 let cache = FileCache::new(FileCacheConfig::from_basic(
388 mount_config.cache_size_mb as u32,
389 mount_config.cache_ttl_secs as u32,
390 ));
391
392 let mount_arc = Arc::new(RwLock::new(bucket_mount));
395 let sync_rx = self.subscribe_sync_events();
396
397 let api_base_url = url::Url::parse(&format!("http://localhost:{}", self.api_port))
398 .expect("valid localhost URL");
399 let api_client = ApiClient::new(&api_base_url)
400 .map_err(|e| MountError::SpawnFailed(format!("Failed to create API client: {}", e)))?;
401
402 let fs = JaxFs::new(
403 tokio::runtime::Handle::current(),
404 mount_arc.clone(),
405 *mount_id,
406 *mount_config.bucket_id,
407 FileCacheConfig::from_basic(
408 mount_config.cache_size_mb as u32,
409 mount_config.cache_ttl_secs as u32,
410 ),
411 *mount_config.read_only,
412 Some(sync_rx),
413 api_client,
414 );
415
416 #[cfg(target_os = "linux")]
418 let options = vec![
419 fuser::MountOption::FSName("jax".to_string()),
420 fuser::MountOption::AutoUnmount,
421 fuser::MountOption::AllowOther,
422 ];
423
424 #[cfg(target_os = "macos")]
425 let options = {
426 let bucket_name = self
428 .db
429 .get_bucket_info(&mount_config.bucket_id)
430 .await
431 .ok()
432 .flatten()
433 .map(|info| info.name)
434 .unwrap_or_else(|| "jax".to_string());
435
436 vec![
437 fuser::MountOption::FSName("jax".to_string()),
438 fuser::MountOption::AutoUnmount,
439 fuser::MountOption::CUSTOM(format!("volname={}", bucket_name)),
440 fuser::MountOption::CUSTOM("local".to_string()),
441 fuser::MountOption::CUSTOM("noappledouble".to_string()),
442 ]
443 };
444
445 #[cfg(not(any(target_os = "linux", target_os = "macos")))]
446 let options = vec![
447 fuser::MountOption::FSName("jax".to_string()),
448 fuser::MountOption::AutoUnmount,
449 ];
450
451 let mount_path = std::path::Path::new(&mount_config.mount_point);
453 tracing::info!("Mounting FUSE filesystem at {:?}", mount_path);
454
455 let session = fuser::spawn_mount2(fs, mount_path, &options).map_err(|e| {
456 MountError::SpawnFailed(format!(
457 "Failed to mount at {}: {}",
458 mount_config.mount_point, e
459 ))
460 })?;
461
462 let session_handle = SessionHandle::spawn(session);
464
465 let live_mount = LiveMount {
467 mount: mount_arc,
468 session: Some(session_handle),
469 cache,
470 config: mount_config.clone(),
471 };
472
473 self.mounts.write().await.insert(*mount_id, live_mount);
475
476 FuseMount::update_status(*mount_id, MountStatus::Running, None, &self.db)
478 .await
479 .map_err(MountError::Database)?;
480
481 tracing::info!("Started mount {} at {}", mount_id, mount_config.mount_point);
482
483 Ok(())
484 }
485
486 pub async fn stop(&self, mount_id: &Uuid) -> Result<(), MountError> {
488 let _ = FuseMount::update_status(*mount_id, MountStatus::Stopping, None, &self.db).await;
490
491 let mount_point = {
492 let mut mounts = self.mounts.write().await;
493 if let Some(live) = mounts.get_mut(mount_id) {
494 if let Some(ref mut session) = live.session {
496 session.unmount();
497 }
498 live.session.take();
499
500 live.config.mount_point.clone()
501 } else {
502 match FuseMount::get(*mount_id, &self.db).await {
504 Ok(Some(config)) => config.mount_point,
505 _ => return Ok(()),
506 }
507 }
508 };
509
510 self.unmount_path(&mount_point).await?;
512
513 FuseMount::update_status(*mount_id, MountStatus::Stopped, None, &self.db)
515 .await
516 .map_err(MountError::Database)?;
517
518 tracing::info!("Stopped mount {} at {}", mount_id, mount_point);
519
520 Ok(())
521 }
522
523 pub async fn stop_all(&self) -> Result<(), MountError> {
525 let mount_ids: Vec<Uuid> = {
526 let mounts = self.mounts.read().await;
527 mounts.keys().copied().collect()
528 };
529
530 for mount_id in mount_ids {
531 if let Err(e) = self.stop(&mount_id).await {
532 tracing::error!("Failed to stop mount {}: {}", mount_id, e);
533 }
534 }
535
536 Ok(())
537 }
538
539 pub async fn start_auto(&self) -> Result<(), MountError> {
541 let auto_mounts = FuseMount::auto_list(&self.db)
542 .await
543 .map_err(MountError::Database)?;
544
545 tracing::info!("Starting {} auto-mount(s)", auto_mounts.len());
546
547 for mount in auto_mounts {
548 if let Err(e) = self.start(&mount.mount_id).await {
549 tracing::error!(
550 "Failed to auto-mount {} at {}: {}",
551 mount.mount_id,
552 mount.mount_point,
553 e
554 );
555
556 let _ = FuseMount::update_status(
558 *mount.mount_id,
559 MountStatus::Error,
560 Some(&e.to_string()),
561 &self.db,
562 )
563 .await;
564 }
565 }
566
567 Ok(())
568 }
569
570 pub async fn get_live_mount(&self, mount_id: &Uuid) -> Option<Arc<RwLock<Mount>>> {
572 let mounts = self.mounts.read().await;
573 mounts.get(mount_id).map(|m| m.mount.clone())
574 }
575
576 pub async fn get_mount_cache(&self, mount_id: &Uuid) -> Option<FileCache> {
578 let mounts = self.mounts.read().await;
579 mounts.get(mount_id).map(|m| m.cache.clone())
580 }
581
582 async fn unmount_path(&self, mount_point: &str) -> Result<(), MountError> {
584 use std::process::Command;
585
586 #[cfg(target_os = "macos")]
587 {
588 let status = Command::new("umount")
589 .arg(mount_point)
590 .status()
591 .map_err(|e| MountError::UnmountFailed(e.to_string()))?;
592
593 if !status.success() {
594 let _ = Command::new("diskutil")
596 .args(["unmount", "force", mount_point])
597 .status();
598 }
599 }
600
601 #[cfg(target_os = "linux")]
602 {
603 let status = Command::new("fusermount")
604 .args(["-u", mount_point])
605 .status()
606 .map_err(|e| MountError::UnmountFailed(e.to_string()))?;
607
608 if !status.success() {
609 let _ = Command::new("fusermount")
611 .args(["-uz", mount_point])
612 .status();
613 }
614 }
615
616 #[cfg(not(any(target_os = "macos", target_os = "linux")))]
617 {
618 tracing::warn!("Unmount not implemented for this platform");
619 }
620
621 Ok(())
622 }
623}
624
625impl std::fmt::Debug for MountManager {
626 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
627 f.debug_struct("MountManager")
628 .field("peer_id", &self.peer.id())
629 .finish()
630 }
631}
632
633#[derive(Debug, thiserror::Error)]
635pub enum MountError {
636 #[error("database error: {0}")]
637 Database(#[from] sqlx::Error),
638
639 #[error("bucket not found: {0}")]
640 BucketNotFound(Uuid),
641
642 #[error("mount not found: {0}")]
643 MountNotFound(Uuid),
644
645 #[error("mount point not found: {0}")]
646 MountPointNotFound(String),
647
648 #[error("mount point is not a directory: {0}")]
649 MountPointNotDirectory(String),
650
651 #[error("mount already running: {0}")]
652 AlreadyRunning(Uuid),
653
654 #[error("failed to load bucket mount: {0}")]
655 MountLoad(#[source] anyhow::Error),
656
657 #[error("failed to spawn FUSE process: {0}")]
658 SpawnFailed(String),
659
660 #[error("failed to unmount: {0}")]
661 UnmountFailed(String),
662}