use crate::backend::BackendType;
use crate::cfg::{Cfg, InstanceRole};
use crate::core::file_cache::FILE_CACHE;
use crate::core::sync::AsyncRwLock;
use crate::storage::block_manager::BLOCK_INDEX_FILE;
use crate::storage::proto::folder_map::Item;
use crate::storage::proto::FolderMap;
use log::warn;
use prost::Message;
use reduct_base::error::ReductError;
use reduct_base::internal_server_error;
use std::io::SeekFrom::Start;
use std::io::{Read, Write};
use std::path::PathBuf;
/// Controls how deep `FolderKeeper` looks into the directory tree when it
/// builds its folder map from the file system.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(super) enum DiscoveryDepth {
/// Only the immediate sub-directories of the root are discovered; nested
/// names are also filtered out when listing folders.
FirstLevel,
/// The whole tree below the root is walked.
Recursive,
}
/// Tracks the folders that live below a root directory and mirrors that list
/// into a `.folder` file inside the root.
pub(super) struct FolderKeeper {
/// Root directory; all folder names in the map are relative to it.
path: PathBuf,
/// `false` when the instance role is `Replica`; the map is then never
/// written back to the `.folder` file.
full_access: bool,
/// Discovery depth used when scanning the file system and when listing.
depth: DiscoveryDepth,
/// In-memory folder map guarded by an async read/write lock.
map: AsyncRwLock<FolderMap>,
}
impl FolderKeeper {
/// Creates a keeper rooted at `path` that discovers folders recursively.
///
/// This is a shorthand for [`Self::new_with_depth`] with
/// [`DiscoveryDepth::Recursive`].
pub async fn new(path: PathBuf, cfg: &Cfg) -> Self {
    let depth = DiscoveryDepth::Recursive;
    Self::new_with_depth(path, cfg, depth).await
}
pub async fn new_with_depth(path: PathBuf, cfg: &Cfg, depth: DiscoveryDepth) -> Self {
let list_path = path.join(".folder");
let full_access = cfg.role != InstanceRole::Replica;
let proto = if cfg.cs_config.backend_type == BackendType::Filesystem {
let proto = Self::build_from_fs(&path, depth).await;
if full_access {
if let Err(err) =
Self::save_static(&list_path, &AsyncRwLock::new(proto.clone())).await
{
warn!("Failed to persist folder map at {:?}: {}", list_path, err);
}
}
proto
} else {
Self::read_or_build_map(&path, &list_path, full_access, depth).await
};
FolderKeeper {
path,
full_access,
depth,
map: AsyncRwLock::new(proto),
}
}
/// Loads the folder map from `list_path`, falling back to a file-system scan.
///
/// * If `list_path` exists and decodes, the stored map is returned.
/// * If it exists but cannot be read or decoded, a warning is logged and the
///   map is rebuilt from `path` (the rebuilt map is not written back here).
/// * If it does not exist, the map is rebuilt from `path` and, when
///   `save_on_change` is set, persisted to `list_path`.
///
/// Persisting is best-effort: a failure is logged and the freshly built map
/// is still returned, matching how `new_with_depth` treats the same failure.
async fn read_or_build_map(
    path: &PathBuf,
    list_path: &PathBuf,
    save_on_change: bool,
    depth: DiscoveryDepth,
) -> FolderMap {
    if FILE_CACHE.try_exists(list_path).await.unwrap_or(false) {
        return match Self::read_folder_map(list_path).await {
            Ok(map) => map,
            Err(err) => {
                warn!(
                    "Failed to decode folder map at {:?}: {}. Rebuilding cache.",
                    list_path, err
                );
                Self::build_from_fs(path, depth).await
            }
        };
    }

    let proto = Self::build_from_fs(path, depth).await;
    if save_on_change {
        // An I/O error while writing the cache file must not bring the process down.
        if let Err(err) = Self::save_static(list_path, &AsyncRwLock::new(proto.clone())).await {
            warn!("Failed to persist folder map at {:?}: {}", list_path, err);
        }
    }
    proto
}
/// Reads the whole file at `list_path` and decodes it as a `FolderMap`.
///
/// # Errors
/// Propagates errors from opening or reading the file; a decode failure is
/// reported as an internal server error carrying the decoder's message.
async fn read_folder_map(list_path: &PathBuf) -> Result<FolderMap, ReductError> {
    let mut file = FILE_CACHE.read(list_path, Start(0)).await?;
    let mut bytes = Vec::new();
    file.read_to_end(&mut bytes)?;
    match FolderMap::decode(&bytes[..]) {
        Ok(map) => Ok(map),
        Err(err) => Err(internal_server_error!("{}", err)),
    }
}
/// Returns the full path (`root.join(folder_name)`) of every folder in the map.
///
/// With [`DiscoveryDepth::FirstLevel`], entries whose name contains a `/` or
/// `\` separator are left out.
///
/// # Errors
/// Fails if the read lock on the map cannot be acquired.
pub async fn list_folders(&self) -> Result<Vec<PathBuf>, ReductError> {
    let first_level_only = self.depth == DiscoveryDepth::FirstLevel;
    let map = self.map.read().await?;
    let folders = map
        .items
        .iter()
        .filter(|item| {
            let is_nested = || item.folder_name.contains('/') || item.folder_name.contains('\\');
            !(first_level_only && is_nested())
        })
        .map(|item| self.path.join(&item.folder_name))
        .collect();
    Ok(folders)
}
/// Creates `folder_name` (and any missing parents) below the root and records
/// the folder together with every parent prefix in the map.
///
/// For `"a/b/c"` the map gains `"a"`, `"a/b"` and `"a/b/c"` unless they are
/// already present. Empty segments (a leading, trailing or doubled `/`, or an
/// empty `folder_name`) are skipped so that no entry with an empty name or a
/// dangling slash is stored.
///
/// # Errors
/// Fails if the directory cannot be created, the write lock cannot be
/// acquired, or the map cannot be persisted.
pub async fn add_folder(&self, folder_name: &str) -> Result<(), ReductError> {
    let folder_path = self.path.join(folder_name);
    FILE_CACHE.create_dir_all(&folder_path).await?;
    {
        // Scope the write guard so it is released before `save` takes a read lock.
        let mut map = self.map.write().await?;
        let mut current = String::with_capacity(folder_name.len());
        for segment in folder_name.split('/').filter(|segment| !segment.is_empty()) {
            if !current.is_empty() {
                current.push('/');
            }
            current.push_str(segment);
            if !map.items.iter().any(|item| item.folder_name == current) {
                map.items.push(Item {
                    name: current.clone(),
                    folder_name: current.clone(),
                });
            }
        }
    }
    self.save().await
}
/// Removes the directory `folder_name` below the root and drops it, together
/// with every entry nested under it (`"<folder_name>/..."`), from the map.
///
/// # Errors
/// Fails if the directory cannot be removed, the write lock cannot be
/// acquired, or the map cannot be persisted.
pub async fn remove_folder(&self, folder_name: &str) -> Result<(), ReductError> {
    let folder_path = self.path.join(folder_name);
    FILE_CACHE.remove_dir(&folder_path).await?;
    {
        // Scope the write guard so it is released before `save` takes a read lock.
        let mut map = self.map.write().await?;
        // Built once instead of once per item; the trailing slash prevents
        // "ab" from being treated as a child of "a".
        let descendant_prefix = format!("{folder_name}/");
        map.items.retain(|item| {
            item.folder_name != folder_name && !item.folder_name.starts_with(&descendant_prefix)
        });
    }
    self.save().await
}
/// Renames the directory `old_name` to `new_name` below the root and rewrites
/// the matching map entry as well as every entry nested under it.
///
/// An entry `"<old_name>/rest"` becomes `"<new_name>/rest"`; both `name` and
/// `folder_name` of an item are updated.
///
/// # Errors
/// Fails if the rename on disk fails, the write lock cannot be acquired, or
/// the map cannot be persisted.
pub async fn rename_folder(&self, old_name: &str, new_name: &str) -> Result<(), ReductError> {
    let old_path = self.path.join(old_name);
    let new_path = self.path.join(new_name);
    FILE_CACHE.rename(&old_path, &new_path).await?;
    {
        // Scope the write guard so it is released before `save` takes a read lock.
        let mut map = self.map.write().await?;
        // Built once instead of once per item; the trailing slash prevents
        // "ab" from being treated as a child of "a".
        let old_prefix = format!("{old_name}/");
        for item in map.items.iter_mut() {
            let renamed = if item.folder_name == old_name {
                Some(new_name.to_string())
            } else {
                item.folder_name
                    .strip_prefix(old_prefix.as_str())
                    .map(|rest| format!("{new_name}/{rest}"))
            };
            if let Some(renamed) = renamed {
                item.name = renamed.clone();
                item.folder_name = renamed;
            }
        }
    }
    self.save().await
}
/// Replaces the in-memory map with a freshly loaded one.
///
/// The locally cached copy of `<root>/.folder` is invalidated first, then the
/// map is read again (or rebuilt from the file system) without persisting it.
///
/// # Errors
/// Fails if the cache invalidation fails or the write lock cannot be acquired.
pub async fn reload(&self) -> Result<(), ReductError> {
    let list_path = self.path.join(".folder");
    FILE_CACHE.invalidate_local_cache_file(&list_path).await?;

    let refreshed = Self::read_or_build_map(&self.path, &list_path, false, self.depth).await;
    *self.map.write().await? = refreshed;
    Ok(())
}
/// Persists the current map to `<root>/.folder`.
///
/// Does nothing (and succeeds) when the keeper has no full access, i.e. the
/// instance runs as a replica.
async fn save(&self) -> Result<(), ReductError> {
    if self.full_access {
        let list_path = self.path.join(".folder");
        Self::save_static(&list_path, &self.map).await?;
    }
    Ok(())
}
/// Encodes `map` and writes it to `path`, replacing any previous content.
///
/// The file is truncated before writing and synced afterwards.
///
/// # Errors
/// Fails if the read lock cannot be acquired, encoding fails, or any of the
/// file operations fail.
async fn save_static(path: &PathBuf, map: &AsyncRwLock<FolderMap>) -> Result<(), ReductError> {
    let mut encoded = Vec::new();
    {
        // Keep the read guard only for the duration of the encoding.
        let snapshot = map.read().await?;
        snapshot
            .encode(&mut encoded)
            .map_err(|e| internal_server_error!("Failed to encode folder map: {}", e))?;
    }

    let mut file = FILE_CACHE.write_or_create(path, Start(0)).await?;
    file.set_len(0)?;
    file.write_all(&encoded)?;
    file.sync_all().await?;
    Ok(())
}
/// Builds a folder map by scanning the directory tree below `path`.
///
/// Directory listing errors are treated as "no entries". The resulting items
/// are sorted by `folder_name`.
///
/// * [`DiscoveryDepth::FirstLevel`]: every immediate sub-directory whose name
///   is valid UTF-8 and does not start with `.` becomes an item.
/// * [`DiscoveryDepth::Recursive`]: the tree is walked depth-first. Child
///   directories starting with `.` are ignored, and a child named `wal` is
///   ignored unless a sibling `.wal` exists. A directory (other than the root)
///   is recorded when it contains `BLOCK_INDEX_FILE` or has no remaining child
///   directories; its name is the path relative to `path` with `\` replaced
///   by `/`.
async fn build_from_fs(path: &PathBuf, depth: DiscoveryDepth) -> FolderMap {
    let mut proto = FolderMap { items: vec![] };

    if depth == DiscoveryDepth::FirstLevel {
        for item in FILE_CACHE.read_dir(path).await.unwrap_or_default() {
            if !item.is_dir() {
                continue;
            }
            // A name that is not valid UTF-8 cannot be stored in the map.
            let Some(name) = item.file_name().and_then(|n| n.to_str()) else {
                continue;
            };
            if !name.starts_with('.') {
                proto.items.push(Item {
                    name: name.to_string(),
                    folder_name: name.to_string(),
                });
            }
        }
    } else {
        let mut stack = vec![path.clone()];
        while let Some(current) = stack.pop() {
            let mut child_dirs = Vec::new();
            for item in FILE_CACHE.read_dir(&current).await.unwrap_or_default() {
                if !item.is_dir() {
                    continue;
                }
                let skip_dir = match item.file_name().and_then(|n| n.to_str()) {
                    Some(name) if name.starts_with('.') => true,
                    // A plain `wal` directory is skipped unless a sibling
                    // `.wal` directory exists next to it.
                    Some("wal") => !FILE_CACHE
                        .try_exists(&current.join(".wal"))
                        .await
                        .unwrap_or(false),
                    _ => false,
                };
                if !skip_dir {
                    child_dirs.push(item);
                }
            }

            let has_block_index = FILE_CACHE
                .try_exists(&current.join(BLOCK_INDEX_FILE))
                .await
                .unwrap_or(false);
            // Record directories holding a block index, and leaf directories.
            if current != *path && (has_block_index || child_dirs.is_empty()) {
                if let Ok(relative) = current.strip_prefix(path) {
                    let name = relative.to_string_lossy().replace('\\', "/");
                    proto.items.push(Item {
                        name: name.clone(),
                        folder_name: name,
                    });
                }
            }
            stack.extend(child_dirs);
        }
    }

    proto
        .items
        .sort_by(|a, b| a.folder_name.cmp(&b.folder_name));
    proto
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::BackendType;
use crate::cfg::{Cfg, InstanceRole};
use crate::core::file_cache::FILE_CACHE;
use crate::storage::block_manager::BLOCK_INDEX_FILE;
use rstest::{fixture, rstest};
use std::io::SeekFrom;
use tempfile::tempdir;
/// Fixture: a fresh temporary directory that is kept on disk after the test.
#[fixture]
pub async fn path() -> PathBuf {
    tempdir().unwrap().keep()
}
/// A non-filesystem backend must trust an existing `.folder` file instead of
/// scanning the directory tree.
#[rstest]
#[tokio::test]
async fn reads_folder_map_from_cache_for_non_filesystem_backend(#[future] path: PathBuf) {
    let bucket_dir = path.await.join("s3_bucket");
    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();
    FILE_CACHE
        .create_dir_all(&bucket_dir.join("entry_1"))
        .await
        .unwrap();

    // Persist a map that deliberately disagrees with the directory content.
    let cached_item = Item {
        name: "cached_entry".to_string(),
        folder_name: "cached_entry".to_string(),
    };
    let cached_map = AsyncRwLock::new(FolderMap {
        items: vec![cached_item],
    });
    let list_path = bucket_dir.join(".folder");
    FolderKeeper::save_static(&list_path, &cached_map)
        .await
        .unwrap();

    let mut cfg = Cfg::default();
    cfg.cs_config.backend_type = BackendType::S3;
    let keeper = FolderKeeper::new(bucket_dir.clone(), &cfg).await;

    let folders = keeper.list_folders().await.unwrap();
    let listed = |name: &str| folders.iter().any(|p| p.ends_with(name));
    assert!(
        listed("cached_entry"),
        "Should read cached_entry from .folder file"
    );
    assert!(
        !listed("entry_1"),
        "Should not include entry_1 from filesystem scan"
    );
}
/// Without a `.folder` file a non-filesystem backend scans the directory and
/// then writes the file.
#[rstest]
#[tokio::test]
async fn builds_folder_map_when_cache_missing_for_non_filesystem_backend(
    #[future] path: PathBuf,
) {
    let bucket_dir = path.await.join("s3_bucket_no_cache");
    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();
    FILE_CACHE
        .create_dir_all(&bucket_dir.join("entry_from_fs"))
        .await
        .unwrap();

    let mut cfg = Cfg::default();
    cfg.cs_config.backend_type = BackendType::S3;
    let keeper = FolderKeeper::new(bucket_dir.clone(), &cfg).await;

    let folders = keeper.list_folders().await.unwrap();
    let found_on_disk = folders.iter().any(|p| p.ends_with("entry_from_fs"));
    assert!(
        found_on_disk,
        "Should rebuild from filesystem when cache is missing"
    );

    let list_path = bucket_dir.join(".folder");
    let cache_written = FILE_CACHE.try_exists(&list_path).await.unwrap_or(false);
    assert!(cache_written, ".folder should be created after rebuild");
}
/// A nested directory containing a block index is discovered under its
/// relative path.
#[rstest]
#[tokio::test]
async fn scans_nested_entry_paths_for_filesystem_backend(#[future] path: PathBuf) {
    let bucket_dir = path.await.join("fs_bucket_nested");
    let nested_dir = bucket_dir.join("entry").join("a");
    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();
    FILE_CACHE.create_dir_all(&nested_dir).await.unwrap();
    FILE_CACHE
        .write_or_create(&nested_dir.join(BLOCK_INDEX_FILE), SeekFrom::Start(0))
        .await
        .unwrap();

    let mut cfg = Cfg::default();
    cfg.cs_config.backend_type = BackendType::Filesystem;
    let keeper = FolderKeeper::new(bucket_dir.clone(), &cfg).await;

    let folders = keeper.list_folders().await.unwrap();
    let has_nested = folders.iter().any(|p| p.ends_with("entry/a"));
    assert!(has_nested, "Should include nested entry path");
}
/// When `.wal` exists, it is hidden while a sibling `wal` directory is treated
/// as a regular folder.
#[rstest]
#[tokio::test]
async fn ignores_dot_wal_but_not_regular_wal_dirs_for_filesystem_backend(
    #[future] path: PathBuf,
) {
    let bucket_dir = path.await.join("fs_bucket_ignore_wal");
    let entry_dir = bucket_dir.join("entry");
    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();
    FILE_CACHE
        .create_dir_all(&entry_dir.join(".wal"))
        .await
        .unwrap();
    FILE_CACHE
        .create_dir_all(&entry_dir.join("wal"))
        .await
        .unwrap();

    let mut cfg = Cfg::default();
    cfg.cs_config.backend_type = BackendType::Filesystem;
    let keeper = FolderKeeper::new(bucket_dir.clone(), &cfg).await;

    let folders = keeper.list_folders().await.unwrap();
    let listed = |name: &str| folders.iter().any(|p| p.ends_with(name));
    assert!(
        !listed("entry/.wal"),
        "Should ignore internal .wal directory"
    );
    assert!(
        listed("entry/wal"),
        "Regular wal directory is not reserved and should be discoverable"
    );
}
/// Both a parent and its child are listed when each holds a block index.
#[rstest]
#[tokio::test]
async fn scans_parent_and_nested_entries_when_both_have_index(#[future] path: PathBuf) {
    let bucket_dir = path.await.join("fs_bucket_parent_and_nested");
    let parent_dir = bucket_dir.join("entry");
    let child_dir = parent_dir.join("a");

    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();
    FILE_CACHE.create_dir_all(&parent_dir).await.unwrap();
    FILE_CACHE
        .write_or_create(&parent_dir.join(BLOCK_INDEX_FILE), SeekFrom::Start(0))
        .await
        .unwrap();
    FILE_CACHE.create_dir_all(&child_dir).await.unwrap();
    FILE_CACHE
        .write_or_create(&child_dir.join(BLOCK_INDEX_FILE), SeekFrom::Start(0))
        .await
        .unwrap();

    let mut cfg = Cfg::default();
    cfg.cs_config.backend_type = BackendType::Filesystem;
    let keeper = FolderKeeper::new(bucket_dir.clone(), &cfg).await;

    let folders = keeper.list_folders().await.unwrap();
    for expected in ["entry", "entry/a"] {
        assert!(folders.iter().any(|p| p.ends_with(expected)));
    }
}
/// A `wal` directory without a sibling `.wal` is treated as internal and is
/// not listed.
#[rstest]
#[tokio::test]
async fn ignores_legacy_wal_dir_when_dot_wal_is_missing(#[future] path: PathBuf) {
    let bucket_dir = path.await.join("fs_bucket_legacy_wal");
    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();
    FILE_CACHE
        .create_dir_all(&bucket_dir.join("entry").join("wal"))
        .await
        .unwrap();

    let mut cfg = Cfg::default();
    cfg.cs_config.backend_type = BackendType::Filesystem;
    let keeper = FolderKeeper::new(bucket_dir.clone(), &cfg).await;

    let folders = keeper.list_folders().await.unwrap();
    let wal_listed = folders.iter().any(|p| p.ends_with("entry/wal"));
    assert!(
        !wal_listed,
        "Should ignore legacy internal wal directory when .wal is absent"
    );
}
/// First-level discovery lists only direct children of the root.
#[rstest]
#[tokio::test]
async fn first_level_discovery_filters_nested_paths(#[future] path: PathBuf) {
    let root_dir = path.await.join("fs_first_level_only");
    let bucket_dir = root_dir.join("bucket-1");
    FILE_CACHE.create_dir_all(&root_dir).await.unwrap();
    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();
    FILE_CACHE
        .create_dir_all(&bucket_dir.join("entry"))
        .await
        .unwrap();

    let mut cfg = Cfg::default();
    cfg.cs_config.backend_type = BackendType::Filesystem;
    let depth = DiscoveryDepth::FirstLevel;
    let keeper = FolderKeeper::new_with_depth(root_dir.clone(), &cfg, depth).await;

    let folders = keeper.list_folders().await.unwrap();
    let listed = |name: &str| folders.iter().any(|p| p.ends_with(name));
    assert!(listed("bucket-1"));
    assert!(!listed("bucket-1/entry"));
}
/// A corrupt `.folder` file must not break construction; with an empty
/// directory the resulting folder list is empty.
#[rstest]
#[tokio::test]
async fn ignores_invalid_folder_map(#[future] path: PathBuf) {
    let bucket_dir = path.await.join("bucket");
    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();

    {
        // Scope the file handle so it is released before the keeper is built.
        let list_path = bucket_dir.join(".folder");
        let mut file = FILE_CACHE
            .write_or_create(&list_path, SeekFrom::Start(0))
            .await
            .unwrap();
        file.write_all(b"invalid-folder-map").unwrap();
        file.flush().unwrap();
    }

    let cfg = Cfg::default();
    let keeper = FolderKeeper::new(bucket_dir.clone(), &cfg).await;
    let folders = keeper.list_folders().await.unwrap();
    assert!(folders.is_empty());
}
/// A replica must never write the `.folder` file, even when it has to build
/// the map itself.
#[rstest]
#[tokio::test]
async fn does_not_persist_folder_map_in_replica_mode(#[future] path: PathBuf) {
    let path = path.await;
    let base_path = path.join("replica_bucket");
    FILE_CACHE.create_dir_all(&base_path).await.unwrap();

    let mut cfg = Cfg::default();
    cfg.role = InstanceRole::Replica;
    let _ = FolderKeeper::new(base_path.clone(), &cfg).await;

    let list_path = base_path.join(".folder");
    assert!(
        !FILE_CACHE.try_exists(&list_path).await.unwrap_or(false),
        ".folder should not be created in replica mode"
    );
}
/// With the default configuration a stale `.folder` file is overridden by what
/// is actually on disk.
#[rstest]
#[tokio::test]
async fn rebuilds_folder_map_from_fs_for_filesystem_backend(#[future] path: PathBuf) {
    let bucket_dir = path.await.join("bucket");
    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();

    // Store an empty map first, then create a directory the map does not know.
    let stale_map = AsyncRwLock::new(FolderMap { items: vec![] });
    let list_path = bucket_dir.join(".folder");
    FolderKeeper::save_static(&list_path, &stale_map)
        .await
        .unwrap();
    FILE_CACHE
        .create_dir_all(&bucket_dir.join("entry_1"))
        .await
        .unwrap();

    let cfg = Cfg::default();
    let keeper = FolderKeeper::new(bucket_dir.clone(), &cfg).await;
    let folders = keeper.list_folders().await.unwrap();
    assert!(folders.iter().any(|p| p.ends_with("entry_1")));
}
/// Adding a nested folder also registers each of its parent prefixes.
#[rstest]
#[tokio::test]
async fn add_folder_adds_parent_prefixes(#[future] path: PathBuf) {
    let bucket_dir = path.await.join("bucket_add_parents");
    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();

    let keeper = FolderKeeper::new(bucket_dir.clone(), &Cfg::default()).await;
    keeper.add_folder("a/b/c").await.unwrap();

    let folders = keeper.list_folders().await.unwrap();
    for expected in ["a", "a/b", "a/b/c"] {
        assert!(folders.iter().any(|p| p.ends_with(expected)));
    }
}
/// Renaming a folder rewrites the entry itself and everything nested under it.
#[rstest]
#[tokio::test]
async fn rename_folder_renames_descendants(#[future] path: PathBuf) {
    let bucket_dir = path.await.join("bucket_rename_descendants");
    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();

    let keeper = FolderKeeper::new(bucket_dir.clone(), &Cfg::default()).await;
    keeper.add_folder("a/b/c").await.unwrap();
    keeper.rename_folder("a", "renamed").await.unwrap();

    let folders = keeper.list_folders().await.unwrap();
    let listed = |name: &str| folders.iter().any(|p| p.ends_with(name));
    for gone in ["a", "a/b", "a/b/c"] {
        assert!(!listed(gone));
    }
    for present in ["renamed", "renamed/b", "renamed/b/c"] {
        assert!(listed(present));
    }
}
/// Removing a folder also drops every entry nested under it.
#[rstest]
#[tokio::test]
async fn remove_folder_removes_descendants(#[future] path: PathBuf) {
    let bucket_dir = path.await.join("bucket_remove_descendants");
    FILE_CACHE.create_dir_all(&bucket_dir).await.unwrap();

    let keeper = FolderKeeper::new(bucket_dir.clone(), &Cfg::default()).await;
    keeper.add_folder("a/b/c").await.unwrap();
    keeper.remove_folder("a").await.unwrap();

    let folders = keeper.list_folders().await.unwrap();
    for gone in ["a", "a/b", "a/b/c"] {
        assert!(!folders.iter().any(|p| p.ends_with(gone)));
    }
}
}