mountpoint_s3_fs/
config.rs1use anyhow::Context as _;
2use futures::executor::block_on;
3use mountpoint_s3_client::ObjectClient;
4
5use crate::data_cache::{DataCacheConfig, DiskDataCache, ExpressDataCache, MultilevelDataCache};
6use crate::fuse::config::FuseSessionConfig;
7use crate::fuse::session::FuseSession;
8use crate::fuse::{ErrorLogger, S3FuseFilesystem};
9use crate::memory::PagedPool;
10use crate::metablock::Metablock;
11use crate::prefetch::{Prefetcher, PrefetcherBuilder};
12use crate::sync::Arc;
13use crate::{Runtime, S3Filesystem, S3FilesystemConfig};
14
15#[derive(Debug)]
17pub struct MountpointConfig {
18 fuse_session_config: FuseSessionConfig,
19 data_cache_config: DataCacheConfig,
20 filesystem_config: S3FilesystemConfig,
21 error_logger: Option<Box<dyn ErrorLogger + Send + Sync>>,
22}
23
24impl MountpointConfig {
25 pub fn new(
26 fuse_session_config: FuseSessionConfig,
27 filesystem_config: S3FilesystemConfig,
28 data_cache_config: DataCacheConfig,
29 ) -> Self {
30 Self {
31 fuse_session_config,
32 data_cache_config,
33 filesystem_config,
34 error_logger: None,
35 }
36 }
37
38 pub fn error_logger(mut self, error_logger: impl ErrorLogger + Send + Sync + 'static) -> Self {
40 self.error_logger = Some(Box::new(error_logger));
41 self
42 }
43
44 pub fn create_fuse_session<Client>(
46 self,
47 metablock: impl Metablock + 'static,
48 client: Client,
49 runtime: Runtime,
50 memory_pool: PagedPool,
51 ) -> anyhow::Result<FuseSession>
52 where
53 Client: ObjectClient + Clone + Send + Sync + 'static,
54 {
55 let prefetcher_builder =
56 create_prefetcher_builder(self.data_cache_config, &client, &runtime, memory_pool.clone())?;
57 tracing::trace!(filesystem_config=?self.filesystem_config, "creating file system");
58 let fs = S3Filesystem::new(
59 client,
60 prefetcher_builder,
61 memory_pool,
62 runtime,
63 metablock,
64 self.filesystem_config,
65 );
66
67 let fuse_fs = S3FuseFilesystem::new(fs, self.error_logger);
68 let session = FuseSession::new(fuse_fs, self.fuse_session_config)?;
69 ctrlc::set_handler(session.shutdown_fn()).context("failed to set interrupt handler")?;
70 Ok(session)
71 }
72}
73
74fn create_prefetcher_builder<Client>(
75 data_cache_config: DataCacheConfig,
76 client: &Client,
77 runtime: &Runtime,
78 memory_pool: PagedPool,
79) -> anyhow::Result<PrefetcherBuilder<Client>>
80where
81 Client: ObjectClient + Clone + Send + Sync + 'static,
82{
83 let disk_cache = data_cache_config
84 .disk_cache_config
85 .map(|config| DiskDataCache::new(config, memory_pool));
86 let express_cache = match data_cache_config.express_cache_config {
87 None => None,
88 Some(config) => {
89 let cache_bucket_name = config.bucket_name.clone();
90 let express_cache = ExpressDataCache::new(client.clone(), config);
91 block_on(express_cache.verify_cache_valid())
92 .with_context(|| format!("initial PutObject failed for shared cache bucket {cache_bucket_name}"))?;
93 Some(express_cache)
94 }
95 };
96 let client = client.clone();
97 let builder = match (disk_cache, express_cache) {
98 (None, Some(express_cache)) => Prefetcher::caching_builder(express_cache, client),
99 (Some(disk_cache), None) => Prefetcher::caching_builder(disk_cache, client),
100 (Some(disk_cache), Some(express_cache)) => {
101 let cache = MultilevelDataCache::new(Arc::new(disk_cache), express_cache, runtime.clone());
102 Prefetcher::caching_builder(cache, client)
103 }
104 _ => Prefetcher::default_builder(client),
105 };
106 Ok(builder)
107}