Skip to main content

feldera_storage/
error.rs

1use feldera_types::config::StorageBackendConfig;
2use object_store::Error as ObjectStoreError;
3use serde::ser::SerializeStruct;
4use serde::{Serialize, Serializer};
5use std::io::ErrorKind;
6use std::path::PathBuf;
7use thiserror::Error;
8use uuid::Uuid;
9
10/// An error that can occur when using the storage backend.
11#[derive(Clone, Error, Debug, Serialize)]
12pub enum StorageError {
13    /// I/O error.
14    #[error("{}: {operation} failed: {kind}", path.as_ref().map_or("(unknown file)", |path| path.as_str()))]
15    #[serde(serialize_with = "serialize_io_error")]
16    StdIo {
17        kind: ErrorKind,
18        operation: &'static str,
19        path: Option<String>,
20    },
21
22    /// A process already locked the provided storage directory.
23    ///
24    /// If this is not expected, please remove the lock file manually, after verifying
25    /// that the process with the given PID no longer exists.
26    #[error("A process with PID {0} is already using the storage directory {1:?}.")]
27    StorageLocked(u32, PathBuf),
28
29    /// Unknown checkpoint specified in configuration.
30    #[error("Couldn't find the specified checkpoint ({0:?}).")]
31    CheckpointNotFound(Uuid),
32
33    /// The operator wasn't assigned a persistent ID when the circuit was constructed.
34    #[error("Internal error: operator {0} has not been assigned a persistent id.")]
35    NoPersistentId(String),
36
37    /// Cannot perform operation because storage is not enabled.
38    #[error("Cannot perform operation because storage is not enabled.")]
39    StorageDisabled,
40    /// Error while creating a batch key filter.
41    #[error("Failed to serialize/deserialize batch key filter.")]
42    BloomFilter,
43
44    /// Error while serializing a roaring bitmap batch key filter.
45    #[error("Failed to serialize roaring bitmap batch key filter.")]
46    RoaringBitmapFilter,
47
48    /// Path is not valid in storage.
49    ///
50    /// Storage paths may not be absolute, may not start with a drive letter (on
51    /// Windows), and may not contain `.` or `..` components.
52    #[error("Path is not valid in storage: {}", .0.display())]
53    InvalidPath(PathBuf),
54
55    /// Unable to parse URL.
56    #[error("Unable to parse URL {0:?}")]
57    InvalidURL(String),
58
59    /// Error accessing object store.
60    #[error("Error accessing object store: {message}")]
61    #[serde(serialize_with = "serialize_object_store_error")]
62    ObjectStore { kind: ErrorKind, message: String },
63
64    /// The requested storage backend is not available.
65    #[error(
66        "The requested storage backend ({0:?}) is not available in the open-source version of feldera"
67    )]
68    BackendNotSupported(Box<StorageBackendConfig>),
69
70    /// The requested storage backend ({backend}) cannot be configured with {}.
71    #[error("The requested storage backend ({backend}) cannot be configured with {config:?}.")]
72    InvalidBackendConfig {
73        backend: String,
74        config: Box<StorageBackendConfig>,
75    },
76
77    #[error("Error deserializing JSON: {0}")]
78    JsonError(String),
79}
80
81impl From<ObjectStoreError> for StorageError {
82    fn from(value: ObjectStoreError) -> Self {
83        let kind = match value {
84            ObjectStoreError::NotFound { .. } => ErrorKind::NotFound,
85            ObjectStoreError::NotSupported { .. } => ErrorKind::Unsupported,
86            ObjectStoreError::AlreadyExists { .. } => ErrorKind::AlreadyExists,
87            ObjectStoreError::NotImplemented => ErrorKind::Unsupported,
88            ObjectStoreError::PermissionDenied { .. }
89            | ObjectStoreError::Unauthenticated { .. } => ErrorKind::PermissionDenied,
90            ObjectStoreError::InvalidPath { .. } => {
91                // Should be `ErrorKind::InvalidFilename` (once stabilized).
92                ErrorKind::Other
93            }
94            ObjectStoreError::Generic { .. }
95            | ObjectStoreError::JoinError { .. }
96            | ObjectStoreError::Precondition { .. }
97            | ObjectStoreError::NotModified { .. }
98            | ObjectStoreError::UnknownConfigurationKey { .. }
99            | _ => ErrorKind::Other,
100        };
101        Self::ObjectStore {
102            kind,
103            message: value.to_string(),
104        }
105    }
106}
107
108fn serialize_io_error<S>(
109    kind: &ErrorKind,
110    operation: &str,
111    path: &Option<String>,
112    serializer: S,
113) -> Result<S::Ok, S::Error>
114where
115    S: Serializer,
116{
117    let mut ser = serializer.serialize_struct("IOError", 3)?;
118    ser.serialize_field("kind", &kind.to_string())?;
119    ser.serialize_field("operation", operation)?;
120    ser.serialize_field("path", &path)?;
121    ser.end()
122}
123
124fn serialize_object_store_error<S>(
125    kind: &ErrorKind,
126    message: &String,
127    serializer: S,
128) -> Result<S::Ok, S::Error>
129where
130    S: Serializer,
131{
132    let mut ser = serializer.serialize_struct("ObjectStoreError", 2)?;
133    ser.serialize_field("kind", &kind.to_string())?;
134    ser.serialize_field("message", message)?;
135    ser.end()
136}
137
138impl StorageError {
139    pub fn stdio(kind: ErrorKind, operation: &'static str, path: impl ToString) -> Self {
140        Self::StdIo {
141            kind,
142            operation,
143            path: Some(path.to_string()),
144        }
145    }
146
147    pub fn kind(&self) -> ErrorKind {
148        match self {
149            StorageError::StdIo { kind, .. } => *kind,
150            StorageError::StorageLocked(..) => ErrorKind::ResourceBusy,
151            StorageError::NoPersistentId(_) => ErrorKind::Other,
152            StorageError::CheckpointNotFound(_) => ErrorKind::NotFound,
153            StorageError::StorageDisabled => ErrorKind::Other,
154            StorageError::BloomFilter | StorageError::RoaringBitmapFilter => ErrorKind::Other,
155            StorageError::InvalidPath(_) => ErrorKind::Other,
156            StorageError::InvalidURL(_) => ErrorKind::Other,
157            StorageError::ObjectStore { kind, .. } => *kind,
158            StorageError::BackendNotSupported(_) => ErrorKind::Other,
159            StorageError::InvalidBackendConfig { .. } => ErrorKind::Other,
160            StorageError::JsonError(_) => ErrorKind::Other,
161        }
162    }
163
164    pub fn ignore_notfound<T>(result: Result<T, Self>) -> Result<(), Self> {
165        match result {
166            Ok(_) => Ok(()),
167            Err(error) if error.kind() == ErrorKind::NotFound => Ok(()),
168            Err(error) => Err(error),
169        }
170    }
171}