use std::collections::HashSet;
use std::fmt::Debug;
use std::io::{Cursor, ErrorKind};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::AtomicI64;

use feldera_types::checkpoint::{CheckpointMetadata, PSpineBatches};
use feldera_types::config::{StorageBackendConfig, StorageConfig, StorageOptions};
use feldera_types::constants::CREATE_FILE_EXTENSION;
use serde::de::DeserializeOwned;
use tracing::warn;
use uuid::Uuid;

use crate::block::BlockLocation;
use crate::error::StorageError;
use crate::fbuf::FBuf;
use crate::file::FileId;

pub use object_store::path::{Path as StoragePath, PathPart as StoragePathPart};

pub mod block;
pub mod checkpoint_synchronizer;
pub mod error;
pub mod fbuf;
pub mod file;
pub mod histogram;
pub mod metrics;
pub mod tokio;

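/// Appends `s` to `p` as a suffix of its final component, without adding a
/// path separator: appending `".tmp"` to `dir/file` yields `dir/file.tmp`.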
pub fn append_to_path(p: PathBuf, s: &str) -> PathBuf {
    let mut p = p.into_os_string();
    p.push(s);
    p.into()
}

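/// Factory for a [`StorageBackend`] implementation, registered with the
/// [`inventory`] crate. [`StorageBackend::new`] instantiates the registered
/// factory whose [`backend`](Self::backend) name matches the configured
/// backend.
///
/// A minimal registration sketch; `MemBackend` and `MemBackendFactory` are
/// illustrative names, not part of this crate:
///
/// ```ignore
/// struct MemBackendFactory;
///
/// impl StorageBackendFactory for MemBackendFactory {
///     fn backend(&self) -> &'static str {
///         "memory"
///     }
///
///     fn create(
///         &self,
///         storage_config: &StorageConfig,
///         backend_config: &StorageBackendConfig,
///     ) -> Result<Arc<dyn StorageBackend>, StorageError> {
///         Ok(Arc::new(MemBackend::new(storage_config, backend_config)?))
///     }
/// }
///
/// inventory::submit!(&MemBackendFactory as &dyn StorageBackendFactory);
/// ```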
pub trait StorageBackendFactory: Sync {
    fn backend(&self) -> &'static str;
    fn create(
        &self,
        storage_config: &StorageConfig,
        backend_config: &StorageBackendConfig,
    ) -> Result<Arc<dyn StorageBackend>, StorageError>;
}

inventory::collect!(&'static dyn StorageBackendFactory);

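/// An API for file storage: creating, opening, reading, writing, listing, and
/// deleting files addressed by [`StoragePath`]s.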
pub trait StorageBackend: Send + Sync {
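    /// Creates a new file named `name` and returns a writer for it.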
    fn create_named(&self, name: &StoragePath) -> Result<Box<dyn FileWriter>, StorageError>;

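    /// Creates a new file with a unique name in the backend's root and
    /// returns a writer for it.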
    fn create(&self) -> Result<Box<dyn FileWriter>, StorageError> {
        self.create_with_prefix(&StoragePath::default())
    }

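    /// Creates a new file whose unique name begins with `prefix`, and returns
    /// a writer for it. The name is completed with a fresh UUIDv7 and
    /// `CREATE_FILE_EXTENSION`.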
    fn create_with_prefix(
        &self,
        prefix: &StoragePath,
    ) -> Result<Box<dyn FileWriter>, StorageError> {
        let uuid = Uuid::now_v7();
        let name = format!("{}{}{}", prefix, uuid, CREATE_FILE_EXTENSION);
        self.create_named(&name.into())
    }

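    /// Opens the existing file `name` for reading.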
    fn open(&self, name: &StoragePath) -> Result<Arc<dyn FileReader>, StorageError>;

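    /// Returns the backend's base path within the host file system, if it has
    /// one. The default implementation returns `None`, which suits backends
    /// (such as object stores) that are not directories in the local file
    /// system.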
    fn file_system_path(&self) -> Option<&Path> {
        None
    }

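    /// Lists the entries under `parent`, calling `cb` once per entry with the
    /// entry's path and file type.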
    fn list(
        &self,
        parent: &StoragePath,
        cb: &mut dyn FnMut(&StoragePath, StorageFileType),
    ) -> Result<(), StorageError>;

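    /// Deletes the file `name`.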
    fn delete(&self, name: &StoragePath) -> Result<(), StorageError>;

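    /// Deletes `name` and everything under it.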
    fn delete_recursive(&self, name: &StoragePath) -> Result<(), StorageError>;

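    /// Deletes the file `name`, reporting success if the file did not exist.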
    fn delete_if_exists(&self, name: &StoragePath) -> Result<(), StorageError> {
        match self.delete(name) {
            Err(error) if error.kind() == ErrorKind::NotFound => Ok(()),
            other => other,
        }
    }

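    /// Reports whether the file `name` exists, by trying to open it.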
    fn exists(&self, name: &StoragePath) -> Result<bool, StorageError> {
        match self.open(name) {
            Ok(_) => Ok(true),
            Err(error) if error.kind() == ErrorKind::NotFound => Ok(false),
            Err(error) => Err(error),
        }
    }

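    /// Reads the entire file `name` and returns its contents.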
    fn read(&self, name: &StoragePath) -> Result<Arc<FBuf>, StorageError> {
        let reader = self.open(name)?;
        let size = reader.get_size()?.try_into().unwrap();
        reader.read_block(BlockLocation { offset: 0, size })
    }

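    /// Creates the file `name` with contents `content`, marks it for
    /// checkpointing, and returns a committer handle for the new file.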
    fn write(
        &self,
        name: &StoragePath,
        content: FBuf,
    ) -> Result<Arc<dyn FileCommitter>, StorageError> {
        let mut writer = self.create_named(name)?;
        writer.write_block(content)?;
        let reader = writer.complete()?;
        reader.mark_for_checkpoint();
        Ok(reader)
    }

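    /// Returns a shared counter that tracks this backend's storage usage.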
    fn usage(&self) -> Arc<AtomicI64>;
}

impl dyn StorageBackend {
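    /// Creates a storage backend from `config` and `options` by searching the
    /// registered [`StorageBackendFactory`] instances for one whose name
    /// matches the configured backend, warning once if storage is placed on
    /// `tmpfs`.
    ///
    /// A minimal usage sketch; the configuration values are illustrative:
    ///
    /// ```ignore
    /// let backend = <dyn StorageBackend>::new(&storage_config, &storage_options)?;
    /// let content = backend.read(&StoragePath::from("steps.bin"))?;
    /// ```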
    pub fn new(
        config: &StorageConfig,
        options: &StorageOptions,
    ) -> Result<Arc<Self>, StorageError> {
        Self::warn_about_tmpfs(config.path());
        let backend_name = options.backend.to_string();
        for factory in inventory::iter::<&dyn StorageBackendFactory> {
            if factory.backend() == backend_name {
                return factory.create(config, &options.backend);
            }
        }
        Err(StorageError::BackendNotSupported(Box::new(
            options.backend.clone(),
        )))
    }

    fn is_tmpfs(_path: &Path) -> bool {
        #[cfg(target_os = "linux")]
        {
            use nix::sys::statfs;
            statfs::statfs(_path).is_ok_and(|s| s.filesystem_type() == statfs::TMPFS_MAGIC)
        }

        #[cfg(not(target_os = "linux"))]
        false
    }

    fn warn_about_tmpfs(path: &Path) {
        if Self::is_tmpfs(path) {
            static ONCE: std::sync::Once = std::sync::Once::new();
            ONCE.call_once(|| {
                warn!(
                    "initializing storage on in-memory tmpfs filesystem at {}; consider configuring physical storage",
                    path.display()
                )
            });
        }
    }

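    /// Returns the set of batch files referenced by the checkpoint with the
    /// given (non-nil) UUID, gathered by reading every `pspine-batches` file
    /// in the checkpoint's directory.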
    pub fn gather_batches_for_checkpoint_uuid(
        &self,
        cpm: uuid::Uuid,
    ) -> Result<HashSet<StoragePath>, StorageError> {
        assert!(!cpm.is_nil());

        let mut spines = Vec::new();
        self.list(&cpm.to_string().into(), &mut |path, _file_type| {
            if path
                .filename()
                .is_some_and(|filename| filename.starts_with("pspine-batches"))
            {
                spines.push(path.clone());
            }
        })?;

        let mut batch_files_in_commit: HashSet<StoragePath> = HashSet::new();
        for spine in spines {
            let pspine_batches = self.read_json::<PSpineBatches>(&spine)?;
            for file in pspine_batches.files {
                batch_files_in_commit.insert(file.into());
            }
        }

        Ok(batch_files_in_commit)
    }

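    /// Returns the set of batch files referenced by checkpoint `cpm`.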
    pub fn gather_batches_for_checkpoint(
        &self,
        cpm: &CheckpointMetadata,
    ) -> Result<HashSet<StoragePath>, StorageError> {
        self.gather_batches_for_checkpoint_uuid(cpm.uuid)
    }
}

impl dyn StorageBackend + '_ {
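    /// Serializes `value` as JSON and writes it to a new file `name`.
    ///
    /// Together with [`read_json`](Self::read_json) this gives a simple way
    /// to persist small metadata records. A round-trip sketch; `Manifest` is
    /// an illustrative type, not part of this crate:
    ///
    /// ```ignore
    /// #[derive(serde::Serialize, serde::Deserialize)]
    /// struct Manifest {
    ///     files: Vec<String>,
    /// }
    ///
    /// let manifest = Manifest { files: vec!["batch-0".into()] };
    /// backend.write_json(&StoragePath::from("manifest.json"), &manifest)?;
    /// let roundtrip: Manifest = backend.read_json(&StoragePath::from("manifest.json"))?;
    /// ```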
    pub fn write_json<V>(
        &self,
        name: &StoragePath,
        value: &V,
    ) -> Result<Arc<dyn FileCommitter>, StorageError>
    where
        V: serde::Serialize,
    {
        let mut content = FBuf::new();
        serde_json::to_writer(&mut content, value).unwrap();
        self.write(name, content)
    }

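    /// Reads the file `name` and deserializes its contents from JSON.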
    pub fn read_json<V>(&self, name: &StoragePath) -> Result<V, StorageError>
    where
        V: DeserializeOwned,
    {
        let content = self.read(name)?;
        serde_json::from_reader(Cursor::new(content.as_ref()))
            .map_err(|e| StorageError::JsonError(e.to_string()))
    }
}

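/// Operations common to files open for reading or writing.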
pub trait FileRw {
    fn file_id(&self) -> FileId;

    fn path(&self) -> &StoragePath;
}

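/// A file being written, as created by [`StorageBackend::create_named`] and
/// the other `create` methods on [`StorageBackend`].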
pub trait FileWriter: Send + Sync + FileRw {
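    /// Appends `data` to the file, returning a shared reference to the
    /// written block.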
    fn write_block(&mut self, data: FBuf) -> Result<Arc<FBuf>, StorageError>;

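    /// Finishes writing the file and converts this writer into a reader for
    /// the data just written.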
    fn complete(self: Box<Self>) -> Result<Arc<dyn FileReader>, StorageError>;
}

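/// A written file whose contents can be committed to make them durable.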
pub trait FileCommitter: Send + Sync + Debug + FileRw {
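    /// Ensures that the file's contents are durably stored.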
    fn commit(&self) -> Result<(), StorageError>;
}

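/// A file open for reading, as returned by [`StorageBackend::open`] and
/// [`FileWriter::complete`].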
pub trait FileReader: Send + Sync + Debug + FileRw + FileCommitter {
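    /// Marks the file as belonging to a checkpoint, so that it is retained
    /// rather than treated as temporary data.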
    fn mark_for_checkpoint(&self);

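    /// Reads the block of the file at `location`.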
    fn read_block(&self, location: BlockLocation) -> Result<Arc<FBuf>, StorageError>;

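    /// Reads multiple `blocks` and passes the results, in the same order, to
    /// `callback`, which may be invoked from another thread. The default
    /// implementation reads each block synchronously with
    /// [`read_block`](Self::read_block).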
    #[allow(clippy::type_complexity)]
    fn read_async(
        &self,
        blocks: Vec<BlockLocation>,
        callback: Box<dyn FnOnce(Vec<Result<Arc<FBuf>, StorageError>>) + Send>,
    ) {
        default_read_async(self, blocks, callback);
    }

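    /// Returns the file's size in bytes.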
    fn get_size(&self) -> Result<u64, StorageError>;
}

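/// Implements [`FileReader::read_async`] in terms of synchronous
/// [`FileReader::read_block`] calls, for readers without a native
/// asynchronous read path.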
#[allow(clippy::type_complexity)]
pub fn default_read_async<R>(
    reader: &R,
    blocks: Vec<BlockLocation>,
    callback: Box<dyn FnOnce(Vec<Result<Arc<FBuf>, StorageError>>) + Send>,
) where
    R: FileReader + ?Sized,
{
    callback(
        blocks
            .into_iter()
            .map(|location| reader.read_block(location))
            .collect(),
    )
}

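/// The type of a file, as reported by [`StorageBackend::list`].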
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StorageFileType {
    File {
        size: u64,
    },

    Directory,

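    /// Any other kind of object, such as a symbolic link or device file on a
    /// local file system.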
    Other,
}