1use feldera_types::config::StorageBackendConfig;
2use object_store::Error as ObjectStoreError;
3use serde::ser::SerializeStruct;
4use serde::{Serialize, Serializer};
5use std::io::ErrorKind;
6use std::path::PathBuf;
7use thiserror::Error;
8use uuid::Uuid;
9
10#[derive(Clone, Error, Debug, Serialize)]
12pub enum StorageError {
13 #[error("{}: {operation} failed: {kind}", path.as_ref().map_or("(unknown file)", |path| path.as_str()))]
15 #[serde(serialize_with = "serialize_io_error")]
16 StdIo {
17 kind: ErrorKind,
18 operation: &'static str,
19 path: Option<String>,
20 },
21
22 #[error("A process with PID {0} is already using the storage directory {1:?}.")]
27 StorageLocked(u32, PathBuf),
28
29 #[error("Couldn't find the specified checkpoint ({0:?}).")]
31 CheckpointNotFound(Uuid),
32
33 #[error("Internal error: operator {0} has not been assigned a persistent id.")]
35 NoPersistentId(String),
36
37 #[error("Cannot perform operation because storage is not enabled.")]
39 StorageDisabled,
40 #[error("Failed to serialize/deserialize bloom filter.")]
42 BloomFilter,
43
44 #[error("Path is not valid in storage: {}", .0.display())]
49 InvalidPath(PathBuf),
50
51 #[error("Unable to parse URL {0:?}")]
53 InvalidURL(String),
54
55 #[error("Error accessing object store: {message}")]
57 #[serde(serialize_with = "serialize_object_store_error")]
58 ObjectStore { kind: ErrorKind, message: String },
59
60 #[error(
62 "The requested storage backend ({0:?}) is not available in the open-source version of feldera"
63 )]
64 BackendNotSupported(Box<StorageBackendConfig>),
65
66 #[error("The requested storage backend ({backend}) cannot be configured with {config:?}.")]
68 InvalidBackendConfig {
69 backend: String,
70 config: Box<StorageBackendConfig>,
71 },
72
73 #[error("Error deserializing JSON: {0}")]
74 JsonError(String),
75}
76
77impl From<ObjectStoreError> for StorageError {
78 fn from(value: ObjectStoreError) -> Self {
79 let kind = match value {
80 ObjectStoreError::NotFound { .. } => ErrorKind::NotFound,
81 ObjectStoreError::NotSupported { .. } => ErrorKind::Unsupported,
82 ObjectStoreError::AlreadyExists { .. } => ErrorKind::AlreadyExists,
83 ObjectStoreError::NotImplemented => ErrorKind::Unsupported,
84 ObjectStoreError::PermissionDenied { .. }
85 | ObjectStoreError::Unauthenticated { .. } => ErrorKind::PermissionDenied,
86 ObjectStoreError::InvalidPath { .. } => {
87 ErrorKind::Other
89 }
90 ObjectStoreError::Generic { .. }
91 | ObjectStoreError::JoinError { .. }
92 | ObjectStoreError::Precondition { .. }
93 | ObjectStoreError::NotModified { .. }
94 | ObjectStoreError::UnknownConfigurationKey { .. }
95 | _ => ErrorKind::Other,
96 };
97 Self::ObjectStore {
98 kind,
99 message: value.to_string(),
100 }
101 }
102}
103
104fn serialize_io_error<S>(
105 kind: &ErrorKind,
106 operation: &str,
107 path: &Option<String>,
108 serializer: S,
109) -> Result<S::Ok, S::Error>
110where
111 S: Serializer,
112{
113 let mut ser = serializer.serialize_struct("IOError", 3)?;
114 ser.serialize_field("kind", &kind.to_string())?;
115 ser.serialize_field("operation", operation)?;
116 ser.serialize_field("path", &path)?;
117 ser.end()
118}
119
120fn serialize_object_store_error<S>(
121 kind: &ErrorKind,
122 message: &String,
123 serializer: S,
124) -> Result<S::Ok, S::Error>
125where
126 S: Serializer,
127{
128 let mut ser = serializer.serialize_struct("ObjectStoreError", 2)?;
129 ser.serialize_field("kind", &kind.to_string())?;
130 ser.serialize_field("message", message)?;
131 ser.end()
132}
133
134impl StorageError {
135 pub fn stdio(kind: ErrorKind, operation: &'static str, path: impl ToString) -> Self {
136 Self::StdIo {
137 kind,
138 operation,
139 path: Some(path.to_string()),
140 }
141 }
142
143 pub fn kind(&self) -> ErrorKind {
144 match self {
145 StorageError::StdIo { kind, .. } => *kind,
146 StorageError::StorageLocked(..) => ErrorKind::ResourceBusy,
147 StorageError::NoPersistentId(_) => ErrorKind::Other,
148 StorageError::CheckpointNotFound(_) => ErrorKind::NotFound,
149 StorageError::StorageDisabled => ErrorKind::Other,
150 StorageError::BloomFilter => ErrorKind::Other,
151 StorageError::InvalidPath(_) => ErrorKind::Other,
152 StorageError::InvalidURL(_) => ErrorKind::Other,
153 StorageError::ObjectStore { kind, .. } => *kind,
154 StorageError::BackendNotSupported(_) => ErrorKind::Other,
155 StorageError::InvalidBackendConfig { .. } => ErrorKind::Other,
156 StorageError::JsonError(_) => ErrorKind::Other,
157 }
158 }
159
160 pub fn ignore_notfound<T>(result: Result<T, Self>) -> Result<(), Self> {
161 match result {
162 Ok(_) => Ok(()),
163 Err(error) if error.kind() == ErrorKind::NotFound => Ok(()),
164 Err(error) => Err(error),
165 }
166 }
167}