// rhei-datafusion 2.0.0
//
// DataFusion OLAP backend for Rhei HTAP engine
// Documentation
//! Pluggable storage modes for the DataFusion OLAP engine.
//!
//! - `InMemory`: default, data stored as `Vec<RecordBatch>` (lost on shutdown)
//! - `Vortex { url }`: durable Vortex columnar storage, auto-routing between
//!   local filesystem and S3-compatible object store based on the URL scheme.
//!   - Local: `file://...`, `/abs/path`, `./rel/path`, or schemeless paths
//!   - S3-compatible: `s3://bucket/prefix` (requires `cloud-storage` feature)

use std::path::PathBuf;

/// Storage mode for the DataFusion OLAP engine.
///
/// `InMemory` is the `Default` variant. Building a `Vortex` value performs no
/// validation of `url`; the URL is only inspected when
/// [`StorageMode::classify`] (or a helper built on it) is called.
#[derive(Debug, Clone, Default)]
pub enum StorageMode {
    /// In-memory storage (default). Data is lost on shutdown.
    #[default]
    InMemory,
    /// Vortex columnar file storage — local filesystem or S3-compatible.
    ///
    /// The URL scheme determines the backend:
    ///
    /// - **Local** (`file://...`, `/abs/path`, `./rel/path`, or schemeless):
    ///   data is written as `.vortex` files under `<resolved_base>/<table>/`.
    ///   Survives restarts.
    ///
    /// - **S3-compatible** (`s3://bucket/prefix`, requires `cloud-storage`
    ///   feature): data is written as `.vortex` objects under
    ///   `<url>/<table>/`. Credentials are resolved from environment variables
    ///   (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`, etc.)
    ///   following `object_store` conventions. S3-compatible services (MinIO,
    ///   Cloudflare R2, Wasabi, Ceph RGW) work via `AWS_ENDPOINT_URL`.
    ///
    /// Any other `scheme://` prefix (e.g. `gs://`) is rejected with a clear
    /// error when the mode is classified via [`StorageMode::classify`]; the
    /// variant itself can still be constructed with such a URL.
    Vortex {
        /// Filesystem path or object-store URL for the base storage location.
        url: String,
    },
}

/// Internal classification of a Vortex URL: local path or S3-compatible.
///
/// Produced by [`StorageMode::classify`].
#[derive(Debug, Clone)]
pub enum VortexLocation {
    /// Local filesystem storage. `base_path` is the configured URL with any
    /// leading `file://` removed; it is built without touching the filesystem,
    /// so the directory is not guaranteed to exist.
    Local { base_path: PathBuf },
    /// S3-compatible object storage. `url` is the configured `s3://` URL
    /// (bucket + prefix), stored unchanged.
    ///
    /// Only present when the `cloud-storage` feature is enabled.
    #[cfg(feature = "cloud-storage")]
    S3 { url: String },
}

impl StorageMode {
    /// URL prefix that selects the S3-compatible object-store backend.
    const S3_PREFIX: &'static str = "s3://";
    /// URL prefix that explicitly selects the local-filesystem backend.
    const FILE_PREFIX: &'static str = "file://";

    /// Returns `true` if this mode uses S3-compatible cloud object storage,
    /// i.e. it is a `Vortex` mode whose URL starts with `s3://`.
    ///
    /// Always `false` when the `cloud-storage` feature is not enabled.
    pub fn is_cloud(&self) -> bool {
        // `cfg!` expands to a boolean constant, so the URL check is compiled in
        // every build and no unused-variable workaround is needed.
        cfg!(feature = "cloud-storage")
            && matches!(self, Self::Vortex { url } if url.starts_with(Self::S3_PREFIX))
    }

    /// Classify the Vortex URL and return a [`VortexLocation`].
    ///
    /// Rules, applied in order:
    ///
    /// 1. [`StorageMode::InMemory`] has no location and yields `Ok(None)`.
    /// 2. A URL starting with `s3://` is S3-compatible storage; the URL is
    ///    carried over unchanged.
    /// 3. A URL whose text before the first `://` is anything other than
    ///    `file` is rejected as an unsupported scheme — unless that text
    ///    contains a path separator (`/` or `\`), in which case the `://` is
    ///    part of a local path (e.g. `./data/a://b`) and not a scheme at all.
    /// 4. Everything else is a local path. A leading `file://` is stripped and
    ///    the remainder is used verbatim: nothing is canonicalised and the
    ///    filesystem is not touched.
    ///
    /// # Errors
    ///
    /// Returns `Err` with a human-readable message if
    ///
    /// - the URL starts with `s3://` but the `cloud-storage` feature is not
    ///   enabled, or
    /// - the URL scheme is unsupported (e.g. `gs://`, `https://`).
    pub fn classify(&self) -> Result<Option<VortexLocation>, String> {
        let url = match self {
            Self::InMemory => return Ok(None),
            Self::Vortex { url } => url,
        };

        if url.starts_with(Self::S3_PREFIX) {
            #[cfg(feature = "cloud-storage")]
            let classified = Ok(Some(VortexLocation::S3 { url: url.clone() }));
            #[cfg(not(feature = "cloud-storage"))]
            let classified = Err(format!(
                "S3 URL '{url}' requires the 'cloud-storage' feature"
            ));
            return classified;
        }

        // Reject unsupported schemes (gs://, az://, http://, https://, etc.).
        // Failing here, instead of falling through to the local branch, keeps a
        // misconfigured remote URL from being used as a directory name
        // (e.g. `./https:` from `https://...`).
        if let Some((scheme, _rest)) = url.split_once("://") {
            // A URL scheme can never contain a path separator, so a `://` that
            // first appears after one belongs to a local path, not to a URL.
            let inside_path = scheme.contains(|c: char| matches!(c, '/' | '\\'));
            if !inside_path && scheme != "file" {
                return Err(format!(
                    "unsupported storage URL scheme '{scheme}://' in '{url}'. \
                     Use a local path, 'file://', or (with cloud-storage feature) 's3://'"
                ));
            }
        }

        // Local: drop an explicit `file://` prefix, keep the rest as written.
        let local = url
            .strip_prefix(Self::FILE_PREFIX)
            .unwrap_or(url.as_str());
        Ok(Some(VortexLocation::Local {
            base_path: PathBuf::from(local),
        }))
    }

    /// Returns the local base path for local Vortex storage.
    ///
    /// Returns `None` for in-memory and cloud modes, and also when
    /// [`StorageMode::classify`] rejects the URL — the classification error is
    /// discarded, so call `classify` directly if the reason matters.
    pub fn local_base_path(&self) -> Option<PathBuf> {
        let location = self.classify().ok().flatten()?;
        match location {
            VortexLocation::Local { base_path } => Some(base_path),
            #[cfg(feature = "cloud-storage")]
            VortexLocation::S3 { .. } => None,
        }
    }

    /// Returns the cloud base URL for S3-backed Vortex storage, or `None`
    /// for in-memory and local modes.
    ///
    /// The returned string is the configured URL itself, borrowed from `self`.
    ///
    /// Available on crate feature `cloud-storage` only.
    #[cfg(feature = "cloud-storage")]
    pub fn cloud_base_url(&self) -> Option<&str> {
        match self {
            Self::Vortex { url } if url.starts_with(Self::S3_PREFIX) => Some(url.as_str()),
            _ => None,
        }
    }

    /// Returns the file extension for data files in this storage mode.
    ///
    /// - `InMemory`: `""` (no files written)
    /// - `Vortex`: `"vortex"`
    ///
    /// The extension is returned without a leading dot.
    pub fn file_extension(&self) -> &'static str {
        match self {
            Self::InMemory => "",
            Self::Vortex { .. } => "vortex",
        }
    }
}

/// Unit tests for [`StorageMode`] classification and its convenience getters.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Vortex` storage mode for `url`.
    fn vortex(url: &str) -> StorageMode {
        StorageMode::Vortex {
            url: url.to_string(),
        }
    }

    /// Classifies `mode` and returns its local base path, panicking when the
    /// mode is rejected, in-memory, or classified as anything but local.
    fn expect_local(mode: &StorageMode) -> PathBuf {
        match mode.classify().unwrap().unwrap() {
            VortexLocation::Local { base_path } => base_path,
            #[cfg(feature = "cloud-storage")]
            VortexLocation::S3 { .. } => panic!("expected Local"),
        }
    }

    #[test]
    fn default_mode_is_in_memory() {
        assert!(matches!(StorageMode::default(), StorageMode::InMemory));
    }

    #[test]
    fn inmemory_is_not_cloud() {
        assert!(!StorageMode::InMemory.is_cloud());
    }

    #[test]
    fn inmemory_classify_is_none() {
        assert!(StorageMode::InMemory.classify().unwrap().is_none());
    }

    #[test]
    fn vortex_local_abs_path() {
        let mode = vortex("/tmp/rhei-data");
        assert!(!mode.is_cloud());
        assert_eq!(mode.file_extension(), "vortex");
        assert_eq!(expect_local(&mode), PathBuf::from("/tmp/rhei-data"));
    }

    #[test]
    fn vortex_local_rel_path() {
        let mode = vortex("./data/olap");
        assert_eq!(expect_local(&mode), PathBuf::from("./data/olap"));
    }

    #[test]
    fn vortex_file_url() {
        // The `file://` prefix is stripped; the rest is used as the path.
        let mode = vortex("file:///tmp/rhei-vortex");
        assert_eq!(expect_local(&mode), PathBuf::from("/tmp/rhei-vortex"));
    }

    #[test]
    fn vortex_unsupported_scheme_rejects() {
        // gs://, http://, https://, az://, azure://, ftp://, anything-but-`s3` and
        // `file` must error at parse time. http/https in particular are easy
        // misconfigurations (someone pastes a console URL) — silently treating
        // them as local paths would create directories named e.g. `./https:`
        // from `https://example.com/data`.
        for url in [
            "gs://my-bucket/prefix",
            "http://example.com/data",
            "https://example.com/data",
            "az://account/container",
            "azure://account/container",
            "ftp://server/path",
        ] {
            // A `match` keeps the failure message lazy: nothing is formatted
            // unless the assertion actually fails.
            let err = match vortex(url).classify() {
                Err(err) => err,
                Ok(location) => panic!("expected reject for '{url}', got Ok: {location:?}"),
            };
            assert!(
                err.contains("unsupported storage URL scheme"),
                "expected 'unsupported storage URL scheme' in error for '{url}', got: {err}"
            );
        }
    }

    #[test]
    fn local_base_path_only_for_local_modes() {
        assert_eq!(
            vortex("/tmp/rhei-data").local_base_path(),
            Some(PathBuf::from("/tmp/rhei-data"))
        );
        assert_eq!(
            vortex("file:///tmp/rhei-vortex").local_base_path(),
            Some(PathBuf::from("/tmp/rhei-vortex"))
        );
        assert_eq!(StorageMode::InMemory.local_base_path(), None);
        // An S3 URL is either a cloud location (feature on) or a
        // classification error (feature off); neither has a local path.
        assert_eq!(vortex("s3://my-bucket/rhei-data").local_base_path(), None);
        // Classification errors are swallowed and reported as `None`.
        assert_eq!(vortex("gs://my-bucket/prefix").local_base_path(), None);
    }

    #[cfg(not(feature = "cloud-storage"))]
    #[test]
    fn vortex_s3_requires_cloud_feature() {
        let mode = vortex("s3://my-bucket/rhei-data");
        assert!(!mode.is_cloud());
        let err = mode
            .classify()
            .expect_err("S3 URL must be rejected without the cloud-storage feature");
        assert!(
            err.contains("requires the 'cloud-storage' feature"),
            "unexpected error: {err}"
        );
    }

    #[cfg(feature = "cloud-storage")]
    #[test]
    fn vortex_s3_url() {
        let mode = vortex("s3://my-bucket/rhei-data");
        assert!(mode.is_cloud());
        assert_eq!(mode.file_extension(), "vortex");
        assert_eq!(mode.cloud_base_url(), Some("s3://my-bucket/rhei-data"));
        match mode.classify().unwrap().unwrap() {
            VortexLocation::S3 { url } => assert_eq!(url, "s3://my-bucket/rhei-data"),
            VortexLocation::Local { .. } => panic!("expected S3"),
        }
    }

    #[test]
    fn vortex_file_extension() {
        assert_eq!(StorageMode::InMemory.file_extension(), "");
        assert_eq!(vortex("/tmp/test").file_extension(), "vortex");
    }
}