cobble 0.1.0

A flexible embedded key-value storage engine for distributed systems as well as single-node applications.
Documentation
use crate::VolumeDescriptor;
use crate::error::Result;
use crate::file::files::{RandomAccessFile, SequentialWriteFile};
use crate::file::opendal_fs::OpendalFileSystem;
#[cfg(unix)]
use crate::file::posix_fs::PosixFileSystem;
use crate::util::normalize_storage_path_to_url;
use dashmap::DashMap;
use std::sync::Arc;
use url::Url;

pub trait FileSystem: Send + Sync {
    fn init(url: &Url, access_id: Option<String>, access_key: Option<String>) -> Result<Self>
    where
        Self: Sized;

    fn create_dir(&self, path: &str) -> Result<()>;

    fn exists(&self, path: &str) -> Result<bool>;

    fn delete(&self, path: &str) -> Result<()>;

    fn delete_async(&self, path: &str) -> Result<()>;

    fn rename(&self, from: &str, to: &str) -> Result<()>;

    fn list(&self, path: &str) -> Result<Vec<String>>;

    fn open_read(&self, path: &str) -> Result<Box<dyn RandomAccessFile>>;

    fn open_write(&self, path: &str) -> Result<Box<dyn SequentialWriteFile>>;

    fn last_modified(&self, path: &str) -> Result<Option<u64>>;
}

pub struct FileSystemRegistry {
    registered: DashMap<String, Arc<dyn FileSystem>>,
}

impl FileSystemRegistry {
    pub fn new() -> Self {
        Self {
            registered: DashMap::new(),
        }
    }

    pub fn get_or_register(&self, name: impl AsRef<str>) -> Result<Arc<dyn FileSystem>> {
        let normalized = normalize_storage_path_to_url(name.as_ref())?;
        let url = Url::parse(&normalized)?;
        let name = url.to_string();
        if let Some(fs) = self.registered.get(&name) {
            return Ok(Arc::clone(&fs));
        }

        #[cfg(unix)]
        if url.scheme().eq_ignore_ascii_case("file") {
            let fs: Arc<dyn FileSystem> = Arc::new(PosixFileSystem::init(&url, None, None)?);
            self.registered.insert(name.clone(), Arc::clone(&fs));
            return Ok(fs);
        }

        let fs: Arc<dyn FileSystem> = Arc::new(OpendalFileSystem::init(&url, None, None)?);
        self.registered.insert(name.clone(), Arc::clone(&fs));
        Ok(fs)
    }

    pub fn get_or_register_volume(&self, volume: &VolumeDescriptor) -> Result<Arc<dyn FileSystem>> {
        let normalized = normalize_storage_path_to_url(&volume.base_dir)?;
        let url = Url::parse(&normalized)?;
        let name = url.to_string();
        if let Some(fs) = self.registered.get(&name) {
            return Ok(Arc::clone(&fs));
        }

        #[cfg(unix)]
        if url.scheme().eq_ignore_ascii_case("file") {
            let fs: Arc<dyn FileSystem> = Arc::new(PosixFileSystem::init(
                &url,
                volume.access_id.clone(),
                volume.secret_key.clone(),
            )?);
            self.registered.insert(name.clone(), Arc::clone(&fs));
            return Ok(fs);
        }

        let fs: Arc<dyn FileSystem> = Arc::new(OpendalFileSystem::init(
            &url,
            volume.access_id.clone(),
            volume.secret_key.clone(),
        )?);
        self.registered.insert(name.clone(), Arc::clone(&fs));
        Ok(fs)
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use std::path::PathBuf;
    #[test]
    fn test_filesystem_registry() {
        let registry = FileSystemRegistry::new();
        let fs1 = registry.get_or_register("file:///tmp/checkpoint");
        assert!(fs1.is_ok());
        let fs2 = registry.get_or_register("file:///tmp/checkpoint");
        assert!(fs2.is_ok());
        assert!(Arc::ptr_eq(&fs1.unwrap(), &fs2.unwrap()));
    }

    #[test]
    fn test_filesystem_registry_normalizes_absolute_local_path() {
        let registry = FileSystemRegistry::new();
        let path = std::env::temp_dir().join("cobble-filesystem-registry-normalize");
        let path_str = path.to_string_lossy().to_string();
        let url = url::Url::from_file_path(PathBuf::from(&path_str))
            .expect("absolute local path should convert to file URL")
            .to_string();
        let from_path = registry.get_or_register(&path_str).unwrap();
        let from_url = registry.get_or_register(&url).unwrap();
        assert!(Arc::ptr_eq(&from_path, &from_url));
    }
}