Skip to main content

rhei_datafusion/
storage.rs

1//! Pluggable storage modes for the DataFusion OLAP engine.
2//!
3//! - `InMemory`: default, data stored as `Vec<RecordBatch>` (lost on shutdown)
4//! - `Vortex { url }`: durable Vortex columnar storage, auto-routing between
5//!   local filesystem and S3-compatible object store based on the URL scheme.
6//!   - Local: `file://...`, `/abs/path`, `./rel/path`, or schemeless paths
7//!   - S3-compatible: `s3://bucket/prefix` (requires `cloud-storage` feature)
8
9use std::path::PathBuf;
10
/// Storage mode for the DataFusion OLAP engine.
///
/// Building a variant performs no validation. The `Vortex` URL is interpreted
/// and checked by [`StorageMode::classify`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum StorageMode {
    /// In-memory storage (default). Data is lost on shutdown.
    #[default]
    InMemory,
    /// Vortex columnar file storage — local filesystem or S3-compatible.
    ///
    /// The URL scheme determines the backend:
    ///
    /// - **Local** (`file://...`, `/abs/path`, `./rel/path`, or schemeless):
    ///   data is written as `.vortex` files under `<resolved_base>/<table>/`.
    ///   Survives restarts.
    ///
    /// - **S3-compatible** (`s3://bucket/prefix`, requires `cloud-storage`
    ///   feature): data is written as `.vortex` objects under
    ///   `<url>/<table>/`. Credentials are resolved from environment variables
    ///   (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`, etc.)
    ///   following `object_store` conventions. S3-compatible services (MinIO,
    ///   Cloudflare R2, Wasabi, Ceph RGW) work via `AWS_ENDPOINT_URL`.
    ///
    /// Any other `<scheme>://` prefix (e.g. `gs://`, `https://`) is rejected
    /// with a clear error when the URL is classified. It is not rejected when
    /// this value is constructed.
    Vortex {
        /// Filesystem path or object-store URL for the base storage location.
        url: String,
    },
}
39
/// Classification of a [`StorageMode::Vortex`] URL: local path or
/// S3-compatible object store, as produced by [`StorageMode::classify`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VortexLocation {
    /// Base directory for local storage.
    ///
    /// Derived from the URL with any `file://` prefix removed. It is not
    /// canonicalized, and relative paths stay relative.
    Local { base_path: PathBuf },
    /// S3-compatible location. `url` is the full `s3://bucket/prefix` string.
    #[cfg(feature = "cloud-storage")]
    S3 { url: String },
}
49
50impl StorageMode {
51    /// Returns `true` if this mode uses S3-compatible cloud object storage.
52    ///
53    /// Always `false` when the `cloud-storage` feature is not enabled.
54    pub fn is_cloud(&self) -> bool {
55        match self {
56            Self::InMemory => false,
57            Self::Vortex { url } => {
58                let _ = url; // avoid unused-var warning on non-cloud builds
59                #[cfg(feature = "cloud-storage")]
60                {
61                    url.starts_with("s3://")
62                }
63                #[cfg(not(feature = "cloud-storage"))]
64                {
65                    false
66                }
67            }
68        }
69    }
70
71    /// Classify the Vortex URL and return a [`VortexLocation`].
72    ///
73    /// Returns `None` for [`StorageMode::InMemory`].
74    ///
75    /// # Errors
76    ///
77    /// Returns `Err` if the URL scheme is unsupported (e.g. `gs://`).
78    pub fn classify(&self) -> Result<Option<VortexLocation>, String> {
79        match self {
80            Self::InMemory => Ok(None),
81            Self::Vortex { url } => {
82                if url.starts_with("s3://") {
83                    #[cfg(feature = "cloud-storage")]
84                    {
85                        return Ok(Some(VortexLocation::S3 { url: url.clone() }));
86                    }
87                    #[cfg(not(feature = "cloud-storage"))]
88                    {
89                        return Err(format!(
90                            "S3 URL '{url}' requires the 'cloud-storage' feature"
91                        ));
92                    }
93                }
94                // Reject unsupported schemes (gs://, az://, http://, https://, etc.).
95                // Only `file://` is acceptable for local paths — anything else with a
96                // `://` separator is silently classified as a remote backend that we
97                // don't support. Bailing out at config-parse time prevents
98                // misconfigurations from creating local directories named after the
99                // URL string (e.g. `./https:` from `https://...`).
100                if let Some(colon) = url.find("://") {
101                    let scheme = &url[..colon];
102                    if scheme != "file" {
103                        return Err(format!(
104                            "unsupported storage URL scheme '{scheme}://' in '{url}'. \
105                             Use a local path, 'file://', or (with cloud-storage feature) 's3://'"
106                        ));
107                    }
108                }
109                // Local: strip file:// prefix if present, canonicalize to PathBuf.
110                let path = if let Some(rest) = url.strip_prefix("file://") {
111                    PathBuf::from(rest)
112                } else {
113                    PathBuf::from(url)
114                };
115                Ok(Some(VortexLocation::Local { base_path: path }))
116            }
117        }
118    }
119
120    /// Returns the local base path for local Vortex storage, or `None`
121    /// for in-memory and cloud modes.
122    pub fn local_base_path(&self) -> Option<PathBuf> {
123        match self.classify() {
124            Ok(Some(VortexLocation::Local { base_path })) => Some(base_path),
125            _ => None,
126        }
127    }
128
129    /// Returns the cloud base URL for S3-backed Vortex storage, or `None`
130    /// for in-memory and local modes.
131    ///
132    /// Available on crate feature `cloud-storage` only.
133    #[cfg(feature = "cloud-storage")]
134    pub fn cloud_base_url(&self) -> Option<&str> {
135        if let Self::Vortex { url } = self {
136            if url.starts_with("s3://") {
137                return Some(url.as_str());
138            }
139        }
140        None
141    }
142
143    /// Returns the file extension for data files in this storage mode.
144    ///
145    /// - `InMemory`: `""` (no files written)
146    /// - `Vortex`: `"vortex"`
147    pub fn file_extension(&self) -> &'static str {
148        match self {
149            Self::InMemory => "",
150            Self::Vortex { .. } => "vortex",
151        }
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::*;

    /// Classifies `url` as Vortex storage and returns the resolved local base
    /// path, panicking with the actual classification if it was not local.
    fn local_path_of(url: &str) -> PathBuf {
        let mode = StorageMode::Vortex {
            url: url.to_string(),
        };
        match mode.classify() {
            Ok(Some(VortexLocation::Local { base_path })) => base_path,
            other => panic!("expected Local for '{url}', got {other:?}"),
        }
    }

    #[test]
    fn inmemory_is_not_cloud() {
        assert!(!StorageMode::InMemory.is_cloud());
    }

    #[test]
    fn inmemory_classify_is_none() {
        assert!(StorageMode::InMemory.classify().unwrap().is_none());
    }

    #[test]
    fn inmemory_has_no_local_base_path() {
        assert_eq!(StorageMode::InMemory.local_base_path(), None);
    }

    #[test]
    fn vortex_local_abs_path() {
        let mode = StorageMode::Vortex {
            url: "/tmp/rhei-data".to_string(),
        };
        assert!(!mode.is_cloud());
        assert_eq!(mode.file_extension(), "vortex");
        assert_eq!(local_path_of("/tmp/rhei-data"), PathBuf::from("/tmp/rhei-data"));
        assert_eq!(mode.local_base_path(), Some(PathBuf::from("/tmp/rhei-data")));
    }

    #[test]
    fn vortex_local_rel_path() {
        assert_eq!(local_path_of("./data/olap"), PathBuf::from("./data/olap"));
    }

    #[test]
    fn vortex_file_url() {
        assert_eq!(
            local_path_of("file:///tmp/rhei-vortex"),
            PathBuf::from("/tmp/rhei-vortex")
        );
    }

    #[test]
    fn vortex_unsupported_scheme_rejects() {
        // gs://, http://, https://, az://, azure://, ftp://, anything-but-`s3` and
        // `file` must error at parse time. http/https in particular are easy
        // misconfigurations (someone pastes a console URL) — silently treating
        // them as local paths would create directories named e.g. `./https:`
        // from `https://example.com/data`.
        for url in [
            "gs://my-bucket/prefix",
            "http://example.com/data",
            "https://example.com/data",
            "az://account/container",
            "azure://account/container",
            "ftp://server/path",
        ] {
            let mode = StorageMode::Vortex { url: url.into() };
            // Format the failure message lazily, and show what came back.
            let err = match mode.classify() {
                Ok(loc) => panic!("expected reject for '{url}', got Ok({loc:?})"),
                Err(err) => err,
            };
            assert!(
                err.contains("unsupported storage URL scheme"),
                "expected 'unsupported storage URL scheme' in error for '{url}', got: {err}"
            );
            // Rejected URLs must not leak through as a local path.
            assert_eq!(mode.local_base_path(), None, "for '{url}'");
        }
    }

    #[cfg(not(feature = "cloud-storage"))]
    #[test]
    fn vortex_s3_url_without_cloud_feature_rejects() {
        let mode = StorageMode::Vortex {
            url: "s3://my-bucket/rhei-data".to_string(),
        };
        assert!(!mode.is_cloud());
        assert_eq!(mode.local_base_path(), None);
        let err = match mode.classify() {
            Ok(loc) => panic!("expected s3:// to be rejected without cloud-storage, got Ok({loc:?})"),
            Err(err) => err,
        };
        assert!(
            err.contains("requires the 'cloud-storage' feature"),
            "unexpected error: {err}"
        );
    }

    #[cfg(feature = "cloud-storage")]
    #[test]
    fn vortex_s3_url() {
        let mode = StorageMode::Vortex {
            url: "s3://my-bucket/rhei-data".to_string(),
        };
        assert!(mode.is_cloud());
        assert_eq!(mode.file_extension(), "vortex");
        assert_eq!(mode.cloud_base_url(), Some("s3://my-bucket/rhei-data"));
        assert_eq!(mode.local_base_path(), None);
        match mode.classify().unwrap().unwrap() {
            VortexLocation::S3 { url } => assert_eq!(url, "s3://my-bucket/rhei-data"),
            VortexLocation::Local { .. } => panic!("expected S3"),
        }
    }

    #[test]
    fn vortex_file_extension() {
        assert_eq!(StorageMode::InMemory.file_extension(), "");
        assert_eq!(
            StorageMode::Vortex {
                url: "/tmp/test".to_string()
            }
            .file_extension(),
            "vortex"
        );
    }
}