Skip to main content

feldera_storage/
error.rs

1use feldera_types::config::StorageBackendConfig;
2use object_store::Error as ObjectStoreError;
3use serde::ser::SerializeStruct;
4use serde::{Serialize, Serializer};
5use std::io::ErrorKind;
6use std::path::PathBuf;
7use thiserror::Error;
8use uuid::Uuid;
9
10/// An error that can occur when using the storage backend.
11#[derive(Clone, Error, Debug, Serialize)]
12pub enum StorageError {
13    /// I/O error.
14    #[error("{}: {operation} failed: {kind}", path.as_ref().map_or("(unknown file)", |path| path.as_str()))]
15    #[serde(serialize_with = "serialize_io_error")]
16    StdIo {
17        kind: ErrorKind,
18        operation: &'static str,
19        path: Option<String>,
20    },
21
22    /// A process already locked the provided storage directory.
23    ///
24    /// If this is not expected, please remove the lock file manually, after verifying
25    /// that the process with the given PID no longer exists.
26    #[error("A process with PID {0} is already using the storage directory {1:?}.")]
27    StorageLocked(u32, PathBuf),
28
29    /// Unknown checkpoint specified in configuration.
30    #[error("Couldn't find the specified checkpoint ({0:?}).")]
31    CheckpointNotFound(Uuid),
32
33    /// The operator wasn't assigned a persistent ID when the circuit was constructed.
34    #[error("Internal error: operator {0} has not been assigned a persistent id.")]
35    NoPersistentId(String),
36
37    /// Cannot perform operation because storage is not enabled.
38    #[error("Cannot perform operation because storage is not enabled.")]
39    StorageDisabled,
40    /// Error while creating a bloom filter.
41    #[error("Failed to serialize/deserialize bloom filter.")]
42    BloomFilter,
43
44    /// Path is not valid in storage.
45    ///
46    /// Storage paths may not be absolute, may not start with a drive letter (on
47    /// Windows), and may not contain `.` or `..` components.
48    #[error("Path is not valid in storage: {}", .0.display())]
49    InvalidPath(PathBuf),
50
51    /// Unable to parse URL.
52    #[error("Unable to parse URL {0:?}")]
53    InvalidURL(String),
54
55    /// Error accessing object store.
56    #[error("Error accessing object store: {message}")]
57    #[serde(serialize_with = "serialize_object_store_error")]
58    ObjectStore { kind: ErrorKind, message: String },
59
60    /// The requested storage backend is not available.
61    #[error(
62        "The requested storage backend ({0:?}) is not available in the open-source version of feldera"
63    )]
64    BackendNotSupported(Box<StorageBackendConfig>),
65
66    /// The requested storage backend ({backend}) cannot be configured with {}.
67    #[error("The requested storage backend ({backend}) cannot be configured with {config:?}.")]
68    InvalidBackendConfig {
69        backend: String,
70        config: Box<StorageBackendConfig>,
71    },
72
73    #[error("Error deserializing JSON: {0}")]
74    JsonError(String),
75}
76
77impl From<ObjectStoreError> for StorageError {
78    fn from(value: ObjectStoreError) -> Self {
79        let kind = match value {
80            ObjectStoreError::NotFound { .. } => ErrorKind::NotFound,
81            ObjectStoreError::NotSupported { .. } => ErrorKind::Unsupported,
82            ObjectStoreError::AlreadyExists { .. } => ErrorKind::AlreadyExists,
83            ObjectStoreError::NotImplemented => ErrorKind::Unsupported,
84            ObjectStoreError::PermissionDenied { .. }
85            | ObjectStoreError::Unauthenticated { .. } => ErrorKind::PermissionDenied,
86            ObjectStoreError::InvalidPath { .. } => {
87                // Should be `ErrorKind::InvalidFilename` (once stabilized).
88                ErrorKind::Other
89            }
90            ObjectStoreError::Generic { .. }
91            | ObjectStoreError::JoinError { .. }
92            | ObjectStoreError::Precondition { .. }
93            | ObjectStoreError::NotModified { .. }
94            | ObjectStoreError::UnknownConfigurationKey { .. }
95            | _ => ErrorKind::Other,
96        };
97        Self::ObjectStore {
98            kind,
99            message: value.to_string(),
100        }
101    }
102}
103
104fn serialize_io_error<S>(
105    kind: &ErrorKind,
106    operation: &str,
107    path: &Option<String>,
108    serializer: S,
109) -> Result<S::Ok, S::Error>
110where
111    S: Serializer,
112{
113    let mut ser = serializer.serialize_struct("IOError", 3)?;
114    ser.serialize_field("kind", &kind.to_string())?;
115    ser.serialize_field("operation", operation)?;
116    ser.serialize_field("path", &path)?;
117    ser.end()
118}
119
120fn serialize_object_store_error<S>(
121    kind: &ErrorKind,
122    message: &String,
123    serializer: S,
124) -> Result<S::Ok, S::Error>
125where
126    S: Serializer,
127{
128    let mut ser = serializer.serialize_struct("ObjectStoreError", 2)?;
129    ser.serialize_field("kind", &kind.to_string())?;
130    ser.serialize_field("message", message)?;
131    ser.end()
132}
133
134impl StorageError {
135    pub fn stdio(kind: ErrorKind, operation: &'static str, path: impl ToString) -> Self {
136        Self::StdIo {
137            kind,
138            operation,
139            path: Some(path.to_string()),
140        }
141    }
142
143    pub fn kind(&self) -> ErrorKind {
144        match self {
145            StorageError::StdIo { kind, .. } => *kind,
146            StorageError::StorageLocked(..) => ErrorKind::ResourceBusy,
147            StorageError::NoPersistentId(_) => ErrorKind::Other,
148            StorageError::CheckpointNotFound(_) => ErrorKind::NotFound,
149            StorageError::StorageDisabled => ErrorKind::Other,
150            StorageError::BloomFilter => ErrorKind::Other,
151            StorageError::InvalidPath(_) => ErrorKind::Other,
152            StorageError::InvalidURL(_) => ErrorKind::Other,
153            StorageError::ObjectStore { kind, .. } => *kind,
154            StorageError::BackendNotSupported(_) => ErrorKind::Other,
155            StorageError::InvalidBackendConfig { .. } => ErrorKind::Other,
156            StorageError::JsonError(_) => ErrorKind::Other,
157        }
158    }
159
160    pub fn ignore_notfound<T>(result: Result<T, Self>) -> Result<(), Self> {
161        match result {
162            Ok(_) => Ok(()),
163            Err(error) if error.kind() == ErrorKind::NotFound => Ok(()),
164            Err(error) => Err(error),
165        }
166    }
167}