rhei_datafusion/storage.rs
1//! Pluggable storage modes for the DataFusion OLAP engine.
2//!
3//! - `InMemory`: default, data stored as `Vec<RecordBatch>` (lost on shutdown)
4//! - `Vortex { url }`: durable Vortex columnar storage, auto-routing between
5//! local filesystem and S3-compatible object store based on the URL scheme.
6//! - Local: `file://...`, `/abs/path`, `./rel/path`, or schemeless paths
7//! - S3-compatible: `s3://bucket/prefix` (requires `cloud-storage` feature)
8
9use std::path::PathBuf;
10
/// Storage mode for the DataFusion OLAP engine.
///
/// Two modes compare equal when they are the same variant and, for
/// [`StorageMode::Vortex`], carry byte-identical `url` strings (no URL
/// normalization is applied before comparing).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum StorageMode {
    /// In-memory storage (default). Data is lost on shutdown.
    #[default]
    InMemory,
    /// Vortex columnar file storage — local filesystem or S3-compatible.
    ///
    /// The URL scheme determines the backend:
    ///
    /// - **Local** (`file://...`, `/abs/path`, `./rel/path`, or schemeless):
    ///   data is written as `.vortex` files under `<resolved_base>/<table>/`.
    ///   Survives restarts.
    ///
    /// - **S3-compatible** (`s3://bucket/prefix`, requires `cloud-storage`
    ///   feature): data is written as `.vortex` objects under
    ///   `<url>/<table>/`. Credentials are resolved from environment variables
    ///   (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`, etc.)
    ///   following `object_store` conventions. S3-compatible services (MinIO,
    ///   Cloudflare R2, Wasabi, Ceph RGW) work via `AWS_ENDPOINT_URL`.
    ///
    /// Any other scheme (e.g. `gs://`) is rejected with a descriptive error
    /// when the mode is classified via `StorageMode::classify`; constructing
    /// the variant itself performs no validation.
    Vortex {
        /// Filesystem path or object-store URL for the base storage location.
        url: String,
    },
}
39
/// Internal classification of a Vortex URL: local path or S3-compatible.
///
/// Derives `PartialEq`/`Eq` so a classification result can be checked with
/// `==` / `assert_eq!` instead of a hand-written `match`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VortexLocation {
    /// Local filesystem storage.
    Local {
        /// Resolved base directory for local storage.
        base_path: PathBuf,
    },
    /// S3-compatible URL (bucket + prefix).
    #[cfg(feature = "cloud-storage")]
    S3 {
        /// The `s3://` URL of the base storage location.
        url: String,
    },
}
49
50impl StorageMode {
51 /// Returns `true` if this mode uses S3-compatible cloud object storage.
52 ///
53 /// Always `false` when the `cloud-storage` feature is not enabled.
54 pub fn is_cloud(&self) -> bool {
55 match self {
56 Self::InMemory => false,
57 Self::Vortex { url } => {
58 let _ = url; // avoid unused-var warning on non-cloud builds
59 #[cfg(feature = "cloud-storage")]
60 {
61 url.starts_with("s3://")
62 }
63 #[cfg(not(feature = "cloud-storage"))]
64 {
65 false
66 }
67 }
68 }
69 }
70
71 /// Classify the Vortex URL and return a [`VortexLocation`].
72 ///
73 /// Returns `None` for [`StorageMode::InMemory`].
74 ///
75 /// # Errors
76 ///
77 /// Returns `Err` if the URL scheme is unsupported (e.g. `gs://`).
78 pub fn classify(&self) -> Result<Option<VortexLocation>, String> {
79 match self {
80 Self::InMemory => Ok(None),
81 Self::Vortex { url } => {
82 if url.starts_with("s3://") {
83 #[cfg(feature = "cloud-storage")]
84 {
85 return Ok(Some(VortexLocation::S3 { url: url.clone() }));
86 }
87 #[cfg(not(feature = "cloud-storage"))]
88 {
89 return Err(format!(
90 "S3 URL '{url}' requires the 'cloud-storage' feature"
91 ));
92 }
93 }
94 // Reject unsupported schemes (gs://, az://, http://, https://, etc.).
95 // Only `file://` is acceptable for local paths — anything else with a
96 // `://` separator is silently classified as a remote backend that we
97 // don't support. Bailing out at config-parse time prevents
98 // misconfigurations from creating local directories named after the
99 // URL string (e.g. `./https:` from `https://...`).
100 if let Some(colon) = url.find("://") {
101 let scheme = &url[..colon];
102 if scheme != "file" {
103 return Err(format!(
104 "unsupported storage URL scheme '{scheme}://' in '{url}'. \
105 Use a local path, 'file://', or (with cloud-storage feature) 's3://'"
106 ));
107 }
108 }
109 // Local: strip file:// prefix if present, canonicalize to PathBuf.
110 let path = if let Some(rest) = url.strip_prefix("file://") {
111 PathBuf::from(rest)
112 } else {
113 PathBuf::from(url)
114 };
115 Ok(Some(VortexLocation::Local { base_path: path }))
116 }
117 }
118 }
119
120 /// Returns the local base path for local Vortex storage, or `None`
121 /// for in-memory and cloud modes.
122 pub fn local_base_path(&self) -> Option<PathBuf> {
123 match self.classify() {
124 Ok(Some(VortexLocation::Local { base_path })) => Some(base_path),
125 _ => None,
126 }
127 }
128
129 /// Returns the cloud base URL for S3-backed Vortex storage, or `None`
130 /// for in-memory and local modes.
131 ///
132 /// Available on crate feature `cloud-storage` only.
133 #[cfg(feature = "cloud-storage")]
134 pub fn cloud_base_url(&self) -> Option<&str> {
135 if let Self::Vortex { url } = self {
136 if url.starts_with("s3://") {
137 return Some(url.as_str());
138 }
139 }
140 None
141 }
142
143 /// Returns the file extension for data files in this storage mode.
144 ///
145 /// - `InMemory`: `""` (no files written)
146 /// - `Vortex`: `"vortex"`
147 pub fn file_extension(&self) -> &'static str {
148 match self {
149 Self::InMemory => "",
150 Self::Vortex { .. } => "vortex",
151 }
152 }
153}
154
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Vortex` storage mode for `url`.
    fn vortex(url: &str) -> StorageMode {
        StorageMode::Vortex {
            url: url.to_string(),
        }
    }

    /// Classifies `url` and returns its local base path, panicking with the
    /// actual classification result on any other outcome.
    fn classify_local(url: &str) -> PathBuf {
        match vortex(url).classify() {
            Ok(Some(VortexLocation::Local { base_path })) => base_path,
            other => panic!("expected Local for '{url}', got {other:?}"),
        }
    }

    #[test]
    fn inmemory_is_not_cloud() {
        assert!(!StorageMode::InMemory.is_cloud());
    }

    #[test]
    fn inmemory_classify_is_none() {
        assert!(StorageMode::InMemory.classify().unwrap().is_none());
    }

    #[test]
    fn vortex_local_abs_path() {
        let mode = vortex("/tmp/rhei-data");
        assert!(!mode.is_cloud());
        assert_eq!(mode.file_extension(), "vortex");
        assert_eq!(
            classify_local("/tmp/rhei-data"),
            PathBuf::from("/tmp/rhei-data")
        );
    }

    #[test]
    fn vortex_local_rel_path() {
        assert_eq!(classify_local("./data/olap"), PathBuf::from("./data/olap"));
    }

    #[test]
    fn vortex_file_url() {
        assert_eq!(
            classify_local("file:///tmp/rhei-vortex"),
            PathBuf::from("/tmp/rhei-vortex")
        );
    }

    #[test]
    fn vortex_unsupported_scheme_rejects() {
        // gs://, http://, https://, az://, azure://, ftp:// — anything but `s3`
        // and `file` must be rejected. http/https in particular are easy
        // misconfigurations (someone pastes a console URL); treating them as
        // local paths would create directories named e.g. `./https:` from
        // `https://example.com/data`.
        for url in [
            "gs://my-bucket/prefix",
            "http://example.com/data",
            "https://example.com/data",
            "az://account/container",
            "azure://account/container",
            "ftp://server/path",
        ] {
            // A `match` keeps the failure message lazy: it is only formatted
            // when the assertion actually fails.
            let err = match vortex(url).classify() {
                Err(err) => err,
                Ok(location) => panic!("expected reject for '{url}', got Ok({location:?})"),
            };
            assert!(
                err.contains("unsupported storage URL scheme"),
                "expected 'unsupported storage URL scheme' in error for '{url}', got: {err}"
            );
        }
    }

    #[cfg(feature = "cloud-storage")]
    #[test]
    fn vortex_s3_url() {
        let mode = vortex("s3://my-bucket/rhei-data");
        assert!(mode.is_cloud());
        assert_eq!(mode.file_extension(), "vortex");
        assert_eq!(mode.cloud_base_url(), Some("s3://my-bucket/rhei-data"));
        match mode.classify().unwrap().unwrap() {
            VortexLocation::S3 { url } => assert_eq!(url, "s3://my-bucket/rhei-data"),
            VortexLocation::Local { .. } => panic!("expected S3"),
        }
    }

    #[cfg(not(feature = "cloud-storage"))]
    #[test]
    fn vortex_s3_url_requires_feature() {
        // Without the feature an S3 URL is neither cloud nor local: it is a
        // configuration error that names the missing feature.
        let mode = vortex("s3://my-bucket/rhei-data");
        assert!(!mode.is_cloud());
        let err = match mode.classify() {
            Err(err) => err,
            Ok(location) => panic!("expected reject for S3 URL, got Ok({location:?})"),
        };
        assert!(
            err.contains("requires the 'cloud-storage' feature"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn local_base_path_only_for_local_vortex() {
        assert_eq!(StorageMode::InMemory.local_base_path(), None);
        assert_eq!(
            vortex("/tmp/rhei-data").local_base_path(),
            Some(PathBuf::from("/tmp/rhei-data"))
        );
        assert_eq!(
            vortex("file:///tmp/rhei-vortex").local_base_path(),
            Some(PathBuf::from("/tmp/rhei-vortex"))
        );
        // Rejected URLs and S3 URLs (with or without the feature) have no
        // local base path.
        assert_eq!(vortex("gs://my-bucket/prefix").local_base_path(), None);
        assert_eq!(vortex("s3://my-bucket/rhei-data").local_base_path(), None);
    }

    #[test]
    fn vortex_file_extension() {
        assert_eq!(StorageMode::InMemory.file_extension(), "");
        assert_eq!(vortex("/tmp/test").file_extension(), "vortex");
    }
}
265}