// mountpoint_s3_fs/config.rs

1use anyhow::Context as _;
2use futures::executor::block_on;
3use mountpoint_s3_client::ObjectClient;
4
5use crate::data_cache::{DataCacheConfig, DiskDataCache, ExpressDataCache, MultilevelDataCache};
6use crate::fuse::config::FuseSessionConfig;
7use crate::fuse::session::FuseSession;
8use crate::fuse::{ErrorLogger, S3FuseFilesystem};
9use crate::memory::PagedPool;
10use crate::metablock::Metablock;
11use crate::prefetch::{Prefetcher, PrefetcherBuilder};
12use crate::sync::Arc;
13use crate::{Runtime, S3Filesystem, S3FilesystemConfig};
14
15/// Configuration for a Mountpoint session
16#[derive(Debug)]
17pub struct MountpointConfig {
18    fuse_session_config: FuseSessionConfig,
19    data_cache_config: DataCacheConfig,
20    filesystem_config: S3FilesystemConfig,
21    error_logger: Option<Box<dyn ErrorLogger + Send + Sync>>,
22}
23
24impl MountpointConfig {
25    pub fn new(
26        fuse_session_config: FuseSessionConfig,
27        filesystem_config: S3FilesystemConfig,
28        data_cache_config: DataCacheConfig,
29    ) -> Self {
30        Self {
31            fuse_session_config,
32            data_cache_config,
33            filesystem_config,
34            error_logger: None,
35        }
36    }
37
38    /// Set the [Self::error_logger] field
39    pub fn error_logger(mut self, error_logger: impl ErrorLogger + Send + Sync + 'static) -> Self {
40        self.error_logger = Some(Box::new(error_logger));
41        self
42    }
43
44    /// Create a new FUSE session
45    pub fn create_fuse_session<Client>(
46        self,
47        metablock: impl Metablock + 'static,
48        client: Client,
49        runtime: Runtime,
50        memory_pool: PagedPool,
51    ) -> anyhow::Result<FuseSession>
52    where
53        Client: ObjectClient + Clone + Send + Sync + 'static,
54    {
55        let prefetcher_builder =
56            create_prefetcher_builder(self.data_cache_config, &client, &runtime, memory_pool.clone())?;
57        tracing::trace!(filesystem_config=?self.filesystem_config, "creating file system");
58        let fs = S3Filesystem::new(
59            client,
60            prefetcher_builder,
61            memory_pool,
62            runtime,
63            metablock,
64            self.filesystem_config,
65        );
66
67        let fuse_fs = S3FuseFilesystem::new(fs, self.error_logger);
68        let session = FuseSession::new(fuse_fs, self.fuse_session_config)?;
69        ctrlc::set_handler(session.shutdown_fn()).context("failed to set interrupt handler")?;
70        Ok(session)
71    }
72}
73
74fn create_prefetcher_builder<Client>(
75    data_cache_config: DataCacheConfig,
76    client: &Client,
77    runtime: &Runtime,
78    memory_pool: PagedPool,
79) -> anyhow::Result<PrefetcherBuilder<Client>>
80where
81    Client: ObjectClient + Clone + Send + Sync + 'static,
82{
83    let disk_cache = data_cache_config
84        .disk_cache_config
85        .map(|config| DiskDataCache::new(config, memory_pool));
86    let express_cache = match data_cache_config.express_cache_config {
87        None => None,
88        Some(config) => {
89            let cache_bucket_name = config.bucket_name.clone();
90            let express_cache = ExpressDataCache::new(client.clone(), config);
91            block_on(express_cache.verify_cache_valid())
92                .with_context(|| format!("initial PutObject failed for shared cache bucket {cache_bucket_name}"))?;
93            Some(express_cache)
94        }
95    };
96    let client = client.clone();
97    let builder = match (disk_cache, express_cache) {
98        (None, Some(express_cache)) => Prefetcher::caching_builder(express_cache, client),
99        (Some(disk_cache), None) => Prefetcher::caching_builder(disk_cache, client),
100        (Some(disk_cache), Some(express_cache)) => {
101            let cache = MultilevelDataCache::new(Arc::new(disk_cache), express_cache, runtime.clone());
102            Prefetcher::caching_builder(cache, client)
103        }
104        _ => Prefetcher::default_builder(client),
105    };
106    Ok(builder)
107}