1use feldera_types::config::StorageBackendConfig;
2use object_store::Error as ObjectStoreError;
3use serde::ser::SerializeStruct;
4use serde::{Serialize, Serializer};
5use std::io::ErrorKind;
6use std::path::PathBuf;
7use thiserror::Error;
8use uuid::Uuid;
9
10#[derive(Clone, Error, Debug, Serialize)]
12pub enum StorageError {
13 #[error("{}: {operation} failed: {kind}", path.as_ref().map_or("(unknown file)", |path| path.as_str()))]
15 #[serde(serialize_with = "serialize_io_error")]
16 StdIo {
17 kind: ErrorKind,
18 operation: &'static str,
19 path: Option<String>,
20 },
21
22 #[error("A process with PID {0} is already using the storage directory {1:?}.")]
27 StorageLocked(u32, PathBuf),
28
29 #[error("Couldn't find the specified checkpoint ({0:?}).")]
31 CheckpointNotFound(Uuid),
32
33 #[error("Internal error: operator {0} has not been assigned a persistent id.")]
35 NoPersistentId(String),
36
37 #[error("Cannot perform operation because storage is not enabled.")]
39 StorageDisabled,
40 #[error("Failed to serialize/deserialize batch key filter.")]
42 BloomFilter,
43
44 #[error("Failed to serialize roaring bitmap batch key filter.")]
46 RoaringBitmapFilter,
47
48 #[error("Path is not valid in storage: {}", .0.display())]
53 InvalidPath(PathBuf),
54
55 #[error("Unable to parse URL {0:?}")]
57 InvalidURL(String),
58
59 #[error("Error accessing object store: {message}")]
61 #[serde(serialize_with = "serialize_object_store_error")]
62 ObjectStore { kind: ErrorKind, message: String },
63
64 #[error(
66 "The requested storage backend ({0:?}) is not available in the open-source version of feldera"
67 )]
68 BackendNotSupported(Box<StorageBackendConfig>),
69
70 #[error("The requested storage backend ({backend}) cannot be configured with {config:?}.")]
72 InvalidBackendConfig {
73 backend: String,
74 config: Box<StorageBackendConfig>,
75 },
76
77 #[error("Error deserializing JSON: {0}")]
78 JsonError(String),
79}
80
81impl From<ObjectStoreError> for StorageError {
82 fn from(value: ObjectStoreError) -> Self {
83 let kind = match value {
84 ObjectStoreError::NotFound { .. } => ErrorKind::NotFound,
85 ObjectStoreError::NotSupported { .. } => ErrorKind::Unsupported,
86 ObjectStoreError::AlreadyExists { .. } => ErrorKind::AlreadyExists,
87 ObjectStoreError::NotImplemented => ErrorKind::Unsupported,
88 ObjectStoreError::PermissionDenied { .. }
89 | ObjectStoreError::Unauthenticated { .. } => ErrorKind::PermissionDenied,
90 ObjectStoreError::InvalidPath { .. } => {
91 ErrorKind::Other
93 }
94 ObjectStoreError::Generic { .. }
95 | ObjectStoreError::JoinError { .. }
96 | ObjectStoreError::Precondition { .. }
97 | ObjectStoreError::NotModified { .. }
98 | ObjectStoreError::UnknownConfigurationKey { .. }
99 | _ => ErrorKind::Other,
100 };
101 Self::ObjectStore {
102 kind,
103 message: value.to_string(),
104 }
105 }
106}
107
108fn serialize_io_error<S>(
109 kind: &ErrorKind,
110 operation: &str,
111 path: &Option<String>,
112 serializer: S,
113) -> Result<S::Ok, S::Error>
114where
115 S: Serializer,
116{
117 let mut ser = serializer.serialize_struct("IOError", 3)?;
118 ser.serialize_field("kind", &kind.to_string())?;
119 ser.serialize_field("operation", operation)?;
120 ser.serialize_field("path", &path)?;
121 ser.end()
122}
123
124fn serialize_object_store_error<S>(
125 kind: &ErrorKind,
126 message: &String,
127 serializer: S,
128) -> Result<S::Ok, S::Error>
129where
130 S: Serializer,
131{
132 let mut ser = serializer.serialize_struct("ObjectStoreError", 2)?;
133 ser.serialize_field("kind", &kind.to_string())?;
134 ser.serialize_field("message", message)?;
135 ser.end()
136}
137
138impl StorageError {
139 pub fn stdio(kind: ErrorKind, operation: &'static str, path: impl ToString) -> Self {
140 Self::StdIo {
141 kind,
142 operation,
143 path: Some(path.to_string()),
144 }
145 }
146
147 pub fn kind(&self) -> ErrorKind {
148 match self {
149 StorageError::StdIo { kind, .. } => *kind,
150 StorageError::StorageLocked(..) => ErrorKind::ResourceBusy,
151 StorageError::NoPersistentId(_) => ErrorKind::Other,
152 StorageError::CheckpointNotFound(_) => ErrorKind::NotFound,
153 StorageError::StorageDisabled => ErrorKind::Other,
154 StorageError::BloomFilter | StorageError::RoaringBitmapFilter => ErrorKind::Other,
155 StorageError::InvalidPath(_) => ErrorKind::Other,
156 StorageError::InvalidURL(_) => ErrorKind::Other,
157 StorageError::ObjectStore { kind, .. } => *kind,
158 StorageError::BackendNotSupported(_) => ErrorKind::Other,
159 StorageError::InvalidBackendConfig { .. } => ErrorKind::Other,
160 StorageError::JsonError(_) => ErrorKind::Other,
161 }
162 }
163
164 pub fn ignore_notfound<T>(result: Result<T, Self>) -> Result<(), Self> {
165 match result {
166 Ok(_) => Ok(()),
167 Err(error) if error.kind() == ErrorKind::NotFound => Ok(()),
168 Err(error) => Err(error),
169 }
170 }
171}