feldera_storage/lib.rs

//! Common types and trait definitions for storage in Feldera.

use std::collections::HashSet;
use std::fmt::Debug;
use std::io::{Cursor, ErrorKind};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::AtomicI64;

use feldera_types::checkpoint::{CheckpointMetadata, PSpineBatches};
use feldera_types::config::{StorageBackendConfig, StorageConfig, StorageOptions};
use feldera_types::constants::CREATE_FILE_EXTENSION;
use serde::de::DeserializeOwned;
use tracing::warn;
use uuid::Uuid;

use crate::block::BlockLocation;
use crate::error::StorageError;
use crate::fbuf::FBuf;
use crate::file::FileId;

pub use object_store::path::{Path as StoragePath, PathPart as StoragePathPart};

pub mod block;
pub mod checkpoint_synchronizer;
pub mod error;
pub mod fbuf;
pub mod file;
pub mod histogram;
pub mod metrics;
pub mod tokio;
/// Helper function that appends the string `s` directly to the end of a
/// [`PathBuf`], extending its final component rather than adding a new one.
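///
/// A small usage sketch:
///
/// ```
/// # use std::path::PathBuf;
/// # use feldera_storage::append_to_path;
/// let p = append_to_path(PathBuf::from("storage/file"), ".mut");
/// assert_eq!(p, PathBuf::from("storage/file.mut"));
/// ```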
pub fn append_to_path(p: PathBuf, s: &str) -> PathBuf {
    let mut p = p.into_os_string();
    p.push(s);
    p.into()
}

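/// A factory for creating a particular kind of [StorageBackend].
///
/// Factories are registered with the [`inventory`] crate and looked up by
/// name when a backend is instantiated via `<dyn StorageBackend>::new`. A
/// registration sketch, with a hypothetical `MyBackendFactory` standing in
/// for a real backend's factory:
///
/// ```ignore
/// struct MyBackendFactory;
///
/// impl StorageBackendFactory for MyBackendFactory {
///     fn backend(&self) -> &'static str {
///         "my-backend"
///     }
///
///     fn create(
///         &self,
///         storage_config: &StorageConfig,
///         backend_config: &StorageBackendConfig,
///     ) -> Result<Arc<dyn StorageBackend>, StorageError> {
///         Ok(Arc::new(MyBackend::new(storage_config, backend_config)?))
///     }
/// }
///
/// inventory::submit!(&MyBackendFactory as &'static dyn StorageBackendFactory);
/// ```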
pub trait StorageBackendFactory: Sync {
    /// Returns the name of the kind of backend that this factory creates.
    fn backend(&self) -> &'static str;

    /// Creates a backend from the given configuration.
    fn create(
        &self,
        storage_config: &StorageConfig,
        backend_config: &StorageBackendConfig,
    ) -> Result<Arc<dyn StorageBackend>, StorageError>;
}

inventory::collect!(&'static dyn StorageBackendFactory);

/// A storage backend.
pub trait StorageBackend: Send + Sync {
    /// Creates a new file with the given `name`, automatically creating any
    /// parent directories within `name` that don't already exist.
    fn create_named(&self, name: &StoragePath) -> Result<Box<dyn FileWriter>, StorageError>;

    /// Creates a new persistent file for writing data. The backend selects
    /// the file's name.
    fn create(&self) -> Result<Box<dyn FileWriter>, StorageError> {
        self.create_with_prefix(&StoragePath::default())
    }

    /// Creates a new persistent file for writing data, giving the file's
    /// name the specified `prefix`. See also [`create`](Self::create).
    fn create_with_prefix(
        &self,
        prefix: &StoragePath,
    ) -> Result<Box<dyn FileWriter>, StorageError> {
        let uuid = Uuid::now_v7();
        let name = format!("{}{}{}", prefix, uuid, CREATE_FILE_EXTENSION);
        self.create_named(&name.into())
    }

    /// Opens `name` for reading.
    fn open(&self, name: &StoragePath) -> Result<Arc<dyn FileReader>, StorageError>;

    /// Returns the base directory path on the local file system, if the
    /// storage backend uses local disk.
    fn file_system_path(&self) -> Option<&Path> {
        None
    }

    /// Calls `cb` with the name of each of the files under `parent`. This is a
    /// non-recursive list: it does not include files under sub-directories of
    /// `parent`.
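    ///
    /// A sketch of collecting the regular files directly under a directory
    /// (the path is illustrative):
    ///
    /// ```ignore
    /// let mut files = Vec::new();
    /// backend.list(&"checkpoints".into(), &mut |path, file_type| {
    ///     if let StorageFileType::File { size } = file_type {
    ///         files.push((path.clone(), size));
    ///     }
    /// })?;
    /// ```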
    fn list(
        &self,
        parent: &StoragePath,
        cb: &mut dyn FnMut(&StoragePath, StorageFileType),
    ) -> Result<(), StorageError>;

    /// Deletes the file `name`.
    fn delete(&self, name: &StoragePath) -> Result<(), StorageError>;

    /// Deletes `name` and, recursively, everything under it.
    fn delete_recursive(&self, name: &StoragePath) -> Result<(), StorageError>;

    /// Deletes the file `name`, treating a file that does not exist as a
    /// successful deletion.
    fn delete_if_exists(&self, name: &StoragePath) -> Result<(), StorageError> {
        match self.delete(name) {
            Err(error) if error.kind() == ErrorKind::NotFound => Ok(()),
            other => other,
        }
    }

    /// Returns whether the file `name` exists.
    fn exists(&self, name: &StoragePath) -> Result<bool, StorageError> {
        match self.open(name) {
            Ok(_) => Ok(true),
            Err(error) if error.kind() == ErrorKind::NotFound => Ok(false),
            Err(error) => Err(error),
        }
    }

    /// Reads `name` and returns its contents.  The file `name` is relative to
    /// the base of the storage backend.
    fn read(&self, name: &StoragePath) -> Result<Arc<FBuf>, StorageError> {
        let reader = self.open(name)?;
        let size = reader.get_size()?.try_into().unwrap();
        reader.read_block(BlockLocation { offset: 0, size })
    }

    /// Writes `content` to `name`, automatically creating any parent
    /// directories within `name` that don't already exist.
    ///
    /// The caller must call `commit` on the returned file if it wants to make
    /// sure that the file is committed to stable storage.
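    ///
    /// A sketch of the write-then-commit pattern (the path is illustrative):
    ///
    /// ```ignore
    /// let committer = backend.write(&"state/manifest.json".into(), content)?;
    /// committer.commit()?; // the data is durable only after this succeeds
    /// ```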
    fn write(
        &self,
        name: &StoragePath,
        content: FBuf,
    ) -> Result<Arc<dyn FileCommitter>, StorageError> {
        let mut writer = self.create_named(name)?;
        writer.write_block(content)?;
        let reader = writer.complete()?;
        reader.mark_for_checkpoint();
        Ok(reader)
    }

    /// Returns a value that represents the number of bytes of storage in use.
    /// The storage backend updates this value when its own functions cause more
    /// or less storage to be used:
    ///
    /// - Writing to a file.
    ///
    /// - Deleting a file (by dropping a [FileWriter] without completing it, by
    ///   dropping a [FileReader] without marking it for a checkpoint, or by
    ///   calling functions to delete files).
    ///
    /// The backend is *not* required to:
    ///
    /// - Initially report how much storage is in use. Instead, the value just
    ///   starts out at zero. The client can traverse the storage itself and
    ///   store the correct initial value.
    ///
    /// - Detect changes made by a different backend or outside any backend.
    ///
    /// The value is signed because the limitations above can cause it to become
    /// negative.
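    ///
    /// A sketch of sampling the counter (the memory ordering is an
    /// illustrative choice):
    ///
    /// ```ignore
    /// use std::sync::atomic::Ordering;
    ///
    /// let bytes_in_use: i64 = backend.usage().load(Ordering::Relaxed);
    /// ```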
    fn usage(&self) -> Arc<AtomicI64>;
}

impl dyn StorageBackend {
    /// Creates and returns a new backend configured according to `config` and
    /// `options`.
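    ///
    /// A sketch of typical use, assuming `config` and `options` were obtained
    /// from the pipeline configuration:
    ///
    /// ```ignore
    /// let backend = <dyn StorageBackend>::new(&config, &options)?;
    /// let mut writer = backend.create()?;
    /// ```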
    pub fn new(
        config: &StorageConfig,
        options: &StorageOptions,
    ) -> Result<Arc<Self>, StorageError> {
        Self::warn_about_tmpfs(config.path());
        let backend_name = options.backend.to_string();
        for factory in inventory::iter::<&dyn StorageBackendFactory> {
            if factory.backend() == backend_name {
                return factory.create(config, &options.backend);
            }
        }
        Err(StorageError::BackendNotSupported(Box::new(
            options.backend.clone(),
        )))
    }

    fn is_tmpfs(_path: &Path) -> bool {
        #[cfg(target_os = "linux")]
        {
            use nix::sys::statfs;
            statfs::statfs(_path).is_ok_and(|s| s.filesystem_type() == statfs::TMPFS_MAGIC)
        }

        #[cfg(not(target_os = "linux"))]
        false
    }

    fn warn_about_tmpfs(path: &Path) {
        if Self::is_tmpfs(path) {
            static ONCE: std::sync::Once = std::sync::Once::new();
            ONCE.call_once(|| {
                warn!("initializing storage on in-memory tmpfs filesystem at {}; consider configuring physical storage", path.display())
            });
        }
    }

    /// Returns the set of batch files referenced by the checkpoint with UUID
    /// `cpm`, gathered by reading the `pspine-batches` files in the
    /// checkpoint's directory.
    pub fn gather_batches_for_checkpoint_uuid(
        &self,
        cpm: uuid::Uuid,
    ) -> Result<HashSet<StoragePath>, StorageError> {
        assert!(!cpm.is_nil());

        let mut spines = Vec::new();
        self.list(&cpm.to_string().into(), &mut |path, _file_type| {
            if path
                .filename()
                .is_some_and(|filename| filename.starts_with("pspine-batches"))
            {
                spines.push(path.clone());
            }
        })?;

        let mut batch_files_in_commit: HashSet<StoragePath> = HashSet::new();
        for spine in spines {
            let pspine_batches = self.read_json::<PSpineBatches>(&spine)?;
            for file in pspine_batches.files {
                batch_files_in_commit.insert(file.into());
            }
        }

        Ok(batch_files_in_commit)
    }

    /// Returns the set of batch files referenced by the checkpoint described
    /// by `cpm`.
    pub fn gather_batches_for_checkpoint(
        &self,
        cpm: &CheckpointMetadata,
    ) -> Result<HashSet<StoragePath>, StorageError> {
        self.gather_batches_for_checkpoint_uuid(cpm.uuid)
    }
}

// For an explanation of the `+ '_` here, see:
// https://stackoverflow.com/questions/73495603/trait-problem-borrowed-data-escapes-outside-of-associated-function
impl dyn StorageBackend + '_ {
    /// Writes `value` to `name` as JSON, automatically creating any parent
    /// directories within `name` that don't already exist.
    ///
    /// The caller must call `commit` on the returned file if it wants to make
    /// sure that the file is committed to stable storage.
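    ///
    /// A sketch of a JSON round trip through this method and
    /// [`read_json`](Self::read_json), with an illustrative value type:
    ///
    /// ```ignore
    /// #[derive(serde::Serialize, serde::Deserialize)]
    /// struct Manifest {
    ///     files: Vec<String>,
    /// }
    ///
    /// let path: StoragePath = "checkpoint/manifest.json".into();
    /// backend.write_json(&path, &Manifest { files: Vec::new() })?.commit()?;
    /// let manifest: Manifest = backend.read_json(&path)?;
    /// ```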
    pub fn write_json<V>(
        &self,
        name: &StoragePath,
        value: &V,
    ) -> Result<Arc<dyn FileCommitter>, StorageError>
    where
        V: serde::Serialize,
    {
        let mut content = FBuf::new();
        serde_json::to_writer(&mut content, value).unwrap();
        self.write(name, content)
    }

    /// Reads `name` and deserializes its contents from JSON.
    pub fn read_json<V>(&self, name: &StoragePath) -> Result<V, StorageError>
    where
        V: DeserializeOwned,
    {
        let content = self.read(name)?;
        serde_json::from_reader(Cursor::new(content.as_ref()))
            .map_err(|e| StorageError::JsonError(e.to_string()))
    }
}

/// A file being read or written.
pub trait FileRw {
    /// Returns the file's unique ID.
    fn file_id(&self) -> FileId;

    /// Returns the file's path.
    fn path(&self) -> &StoragePath;
}

/// A file being written.
///
/// The file can't be read until it is completed with
/// [FileWriter::complete]. Until then, the file is temporary and will be
/// deleted if it is dropped.
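///
/// A sketch of the full lifecycle (`block` is an [FBuf] whose length is a
/// multiple of 512, per [FileWriter::write_block]):
///
/// ```ignore
/// let mut writer = backend.create()?;
/// writer.write_block(block)?;      // may be called repeatedly
/// let reader = writer.complete()?; // the file becomes readable
/// reader.mark_for_checkpoint();    // keep the file past the reader's drop
/// reader.commit()?;                // make it durable
/// ```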
pub trait FileWriter: Send + Sync + FileRw {
    /// Writes `data` at the end of the file. `data.len()` must be a multiple
    /// of 512. Returns the data that was written, encapsulated in an `Arc`.
    fn write_block(&mut self, data: FBuf) -> Result<Arc<FBuf>, StorageError>;

    /// Completes writing of a file and returns a reader for the file.
    ///
    /// The file will be deleted if the reader is dropped without calling
    /// [FileReader::mark_for_checkpoint].
    ///
    /// The file is not necessarily committed to stable storage before calling
    /// `commit` on the returned file.
    fn complete(self: Box<Self>) -> Result<Arc<dyn FileReader>, StorageError>;
}

/// Allows a file to be committed to stable storage.
///
/// This is a supertrait of [FileReader] that only allows the commit operation.
/// It's somewhat surprising that a file that can't be written can be committed,
/// but it makes sense in the context of [FileWriter::complete] returning a
/// [FileReader] that isn't necessarily committed yet.  Making this a separate
/// trait allows code to split off a `FileCommitter` to hand to a piece of code
/// that only needs to be able to commit it.
pub trait FileCommitter: Send + Sync + Debug + FileRw {
    /// Commits the file to stable storage.
    fn commit(&self) -> Result<(), StorageError>;
}

/// A readable file.
pub trait FileReader: Send + Sync + Debug + FileRw + FileCommitter {
    /// Marks a file to be part of a checkpoint.
    ///
    /// This is used to prevent the file from being deleted when it is dropped.
    /// It is only useful for files obtained via [FileWriter::complete],
    /// because files that were opened with [StorageBackend::open] are never
    /// deleted on drop.
    fn mark_for_checkpoint(&self);

    /// Reads data at `location` from the file.  If successful, the result will
    /// be exactly the requested length; that is, this API treats a read past
    /// EOF as an error.
    fn read_block(&self, location: BlockLocation) -> Result<Arc<FBuf>, StorageError>;

    /// Initiates an asynchronous read.  When the read completes, `callback`
    /// will be called.
    ///
    /// The default implementation is not actually asynchronous.
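    ///
    /// A sketch of issuing an asynchronous read (the block location is
    /// illustrative):
    ///
    /// ```ignore
    /// reader.read_async(
    ///     vec![BlockLocation { offset: 0, size: 512 }],
    ///     Box::new(|results| {
    ///         for block in results {
    ///             // each entry is a Result<Arc<FBuf>, StorageError>
    ///         }
    ///     }),
    /// );
    /// ```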
    #[allow(clippy::type_complexity)]
    fn read_async(
        &self,
        blocks: Vec<BlockLocation>,
        callback: Box<dyn FnOnce(Vec<Result<Arc<FBuf>, StorageError>>) + Send>,
    ) {
        default_read_async(self, blocks, callback);
    }

    /// Returns the file's size in bytes.
    fn get_size(&self) -> Result<u64, StorageError>;
}

/// Default implementation for [FileReader::read_async].
///
/// This implementation is not actually asynchronous.
#[allow(clippy::type_complexity)]
pub fn default_read_async<R>(
    reader: &R,
    blocks: Vec<BlockLocation>,
    callback: Box<dyn FnOnce(Vec<Result<Arc<FBuf>, StorageError>>) + Send>,
) where
    R: FileReader + ?Sized,
{
    callback(
        blocks
            .into_iter()
            .map(|location| reader.read_block(location))
            .collect(),
    )
}

/// The type of a file, as reported by [StorageBackend::list].
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum StorageFileType {
    /// A regular file.
    File {
        /// File size in bytes.
        size: u64,
    },

    /// A directory.
    ///
    /// Only some kinds of storage backends support directories. The ones that
    /// don't still allow files to be named hierarchically, but they don't
    /// support creating or deleting directories independently from the files in
    /// them. That is, with such a backend, a directory is effectively created
    /// by creating a file in it, and is effectively deleted when the last file
    /// in it is deleted.
    Directory,

    /// Something else.
    Other,
}