1use std::{collections::BTreeSet, convert::Infallible};
2
3use ankurah_proto::{CollectionId, DecodeError, EntityId, EventId};
4use thiserror::Error;
5
6use crate::{connector::SendError, policy::AccessDenied};
7
8#[derive(Error, Debug)]
9pub enum RetrievalError {
10 #[error("access denied")]
11 AccessDenied(AccessDenied),
12 #[error("Parse error: {0}")]
13 ParseError(ankql::error::ParseError),
14 #[error("Entity not found: {0:?}")]
15 EntityNotFound(EntityId),
16 #[error("Event not found: {0:?}")]
17 EventNotFound(EventId),
18 #[error("Storage error: {0}")]
19 StorageError(Box<dyn std::error::Error + Send + Sync + 'static>),
20 #[error("Collection not found: {0}")]
21 CollectionNotFound(CollectionId),
22 #[error("Update failed: {0}")]
23 FailedUpdate(Box<dyn std::error::Error + Send + Sync + 'static>),
24 #[error("Deserialization error: {0}")]
25 DeserializationError(bincode::Error),
26 #[error("No durable peers available for fetch operation")]
27 NoDurablePeers,
28 #[error("Other error: {0}")]
29 Other(String),
30 #[error("bucket name must only contain valid characters")]
31 InvalidBucketName,
32 #[error("ankql filter: {0}")]
33 AnkqlFilter(ankql::selection::filter::Error),
34 #[error("Future join: {0}")]
35 FutureJoin(tokio::task::JoinError),
36 #[error("{0}")]
37 Anyhow(anyhow::Error),
38 #[error("Decode error: {0}")]
39 DecodeError(DecodeError),
40 #[error("State error: {0}")]
41 StateError(StateError),
42 #[error("Mutation error: {0}")]
43 MutationError(Box<MutationError>),
44 #[error("Property error: {0}")]
45 PropertyError(Box<crate::property::PropertyError>),
46 #[error("Request error: {0}")]
47 RequestError(RequestError),
48 #[error("Apply error: {0}")]
49 ApplyError(ApplyError),
50}
51
52impl From<RequestError> for RetrievalError {
53 fn from(err: RequestError) -> Self { RetrievalError::RequestError(err) }
54}
55
56impl From<crate::property::PropertyError> for RetrievalError {
57 fn from(err: crate::property::PropertyError) -> Self { RetrievalError::PropertyError(Box::new(err)) }
58}
59
60impl From<tokio::task::JoinError> for RetrievalError {
61 fn from(err: tokio::task::JoinError) -> Self { RetrievalError::FutureJoin(err) }
62}
63
64impl From<MutationError> for RetrievalError {
65 fn from(err: MutationError) -> Self { RetrievalError::MutationError(Box::new(err)) }
66}
67
68impl RetrievalError {
69 pub fn storage(err: impl std::error::Error + Send + Sync + 'static) -> Self { RetrievalError::StorageError(Box::new(err)) }
70}
71
72impl From<bincode::Error> for RetrievalError {
73 fn from(e: bincode::Error) -> Self { RetrievalError::DeserializationError(e) }
74}
75
76impl From<ankql::selection::filter::Error> for RetrievalError {
77 fn from(err: ankql::selection::filter::Error) -> Self { RetrievalError::AnkqlFilter(err) }
78}
79
80impl From<anyhow::Error> for RetrievalError {
81 fn from(err: anyhow::Error) -> Self { RetrievalError::Anyhow(err) }
82}
83
84impl From<Infallible> for RetrievalError {
85 fn from(_: Infallible) -> Self { unreachable!("Infallible can never be constructed") }
86}
87
88#[derive(Error, Debug)]
89pub enum RequestError {
90 #[error("Peer not connected")]
91 PeerNotConnected,
92 #[error("Connection lost")]
93 ConnectionLost,
94 #[error("Server error: {0}")]
95 ServerError(String),
96 #[error("Send error: {0}")]
97 SendError(SendError),
98 #[error("Internal channel closed")]
99 InternalChannelClosed,
100 #[error("Unexpected response: {0:?}")]
101 UnexpectedResponse(ankurah_proto::NodeResponseBody),
102}
103
104impl From<SendError> for RequestError {
105 fn from(err: SendError) -> Self { RequestError::SendError(err) }
106}
107
108#[derive(Error, Debug)]
109pub enum SubscriptionError {
110 #[error("predicate not found")]
111 PredicateNotFound,
112 #[error("already subscribed to predicate")]
113 PredicateAlreadySubscribed,
114 #[error("subscription not found")]
115 SubscriptionNotFound,
116}
117
118impl From<DecodeError> for RetrievalError {
119 fn from(err: DecodeError) -> Self { RetrievalError::DecodeError(err) }
120}
121
122#[derive(Error, Debug)]
123pub enum MutationError {
124 #[error("access denied")]
125 AccessDenied(AccessDenied),
126 #[error("already exists")]
127 AlreadyExists,
128 #[error("retrieval error: {0}")]
129 RetrievalError(RetrievalError),
130 #[error("state error: {0}")]
131 StateError(StateError),
132 #[error("failed update: {0}")]
133 UpdateFailed(Box<dyn std::error::Error + Send + Sync + 'static>),
134 #[error("failed step: {0}: {1}")]
135 FailedStep(&'static str, String),
136 #[error("failed to set property: {0}: {1}")]
137 FailedToSetProperty(&'static str, String),
138 #[error("general error: {0}")]
139 General(Box<dyn std::error::Error + Send + Sync + 'static>),
140 #[error("no durable peers available")]
141 NoDurablePeers,
142 #[error("decode error: {0}")]
143 DecodeError(DecodeError),
144 #[error("lineage error: {0}")]
145 LineageError(LineageError),
146 #[error("peer rejected transaction")]
147 PeerRejected,
148 #[error("invalid event")]
149 InvalidEvent,
150 #[error("invalid update")]
151 InvalidUpdate(&'static str),
152 #[error("property error: {0}")]
153 PropertyError(crate::property::PropertyError),
154 #[error("future join: {0}")]
155 FutureJoin(tokio::task::JoinError),
156 #[error("anyhow error: {0}")]
157 Anyhow(anyhow::Error),
158}
159
160impl From<tokio::task::JoinError> for MutationError {
161 fn from(err: tokio::task::JoinError) -> Self { MutationError::FutureJoin(err) }
162}
163
164impl From<anyhow::Error> for MutationError {
165 fn from(err: anyhow::Error) -> Self { MutationError::Anyhow(err) }
166}
167
168#[derive(Debug)]
169pub enum LineageError {
170 Incomparable,
171 PartiallyDescends { meet: Vec<EventId> },
172 BudgetExceeded { original_budget: usize, subject_frontier: BTreeSet<EventId>, other_frontier: BTreeSet<EventId> },
173}
174
175impl std::fmt::Display for LineageError {
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 match self {
178 LineageError::Incomparable => write!(f, "incomparable"),
179 LineageError::PartiallyDescends { meet } => {
180 write!(f, "partially descends: [")?;
181 let meets: Vec<_> = meet.iter().map(|id| id.to_base64_short()).collect();
182 write!(f, "{}]", meets.join(", "))
183 }
184 LineageError::BudgetExceeded { original_budget, subject_frontier, other_frontier } => {
185 let subject: Vec<_> = subject_frontier.iter().map(|id| id.to_base64_short()).collect();
186 let other: Vec<_> = other_frontier.iter().map(|id| id.to_base64_short()).collect();
187 write!(f, "budget exceeded ({}): subject[{}] other[{}]", original_budget, subject.join(", "), other.join(", "))
188 }
189 }
190 }
191}
192
193impl std::error::Error for LineageError {}
194
195impl From<LineageError> for MutationError {
196 fn from(err: LineageError) -> Self { MutationError::LineageError(err) }
197}
198
199impl From<DecodeError> for MutationError {
200 fn from(err: DecodeError) -> Self { MutationError::DecodeError(err) }
201}
202
/// Converts a `MutationError` into a JS string value holding its display
/// message (only compiled with the `wasm` feature).
#[cfg(feature = "wasm")]
impl From<MutationError> for wasm_bindgen::JsValue {
    fn from(err: MutationError) -> Self {
        let message: String = err.to_string();
        Self::from(message)
    }
}
/// Converts a `RetrievalError` into a JS string value holding its display
/// message (only compiled with the `wasm` feature).
#[cfg(feature = "wasm")]
impl From<RetrievalError> for wasm_bindgen::JsValue {
    fn from(err: RetrievalError) -> Self {
        let message: String = err.to_string();
        Self::from(message)
    }
}
211
212impl From<AccessDenied> for MutationError {
213 fn from(err: AccessDenied) -> Self { MutationError::AccessDenied(err) }
214}
215
216impl From<bincode::Error> for MutationError {
217 fn from(e: bincode::Error) -> Self { MutationError::StateError(StateError::SerializationError(e)) }
218}
219
220impl From<RetrievalError> for MutationError {
221 fn from(err: RetrievalError) -> Self {
222 match err {
223 RetrievalError::AccessDenied(a) => MutationError::AccessDenied(a),
224 _ => MutationError::RetrievalError(err),
225 }
226 }
227}
228impl From<AccessDenied> for RetrievalError {
229 fn from(err: AccessDenied) -> Self { RetrievalError::AccessDenied(err) }
230}
231
232impl From<SubscriptionError> for RetrievalError {
233 fn from(err: SubscriptionError) -> Self { RetrievalError::Anyhow(anyhow::anyhow!("Subscription error: {:?}", err)) }
234}
235
236#[derive(Error, Debug)]
237pub enum StateError {
238 #[error("serialization error: {0}")]
239 SerializationError(Box<dyn std::error::Error + Send + Sync + 'static>),
240 #[error("DDL error: {0}")]
241 DDLError(Box<dyn std::error::Error + Send + Sync + 'static>),
242 #[error("DMLError: {0}")]
243 DMLError(Box<dyn std::error::Error + Send + Sync + 'static>),
244}
245
246impl From<bincode::Error> for StateError {
247 fn from(e: bincode::Error) -> Self { StateError::SerializationError(Box::new(e)) }
248}
249
250impl From<StateError> for MutationError {
251 fn from(err: StateError) -> Self { MutationError::StateError(err) }
252}
253
254impl From<crate::property::PropertyError> for MutationError {
255 fn from(err: crate::property::PropertyError) -> Self { MutationError::PropertyError(err) }
256}
257
258impl From<StateError> for RetrievalError {
259 fn from(err: StateError) -> Self { RetrievalError::StateError(err) }
260}
261
262#[derive(Error, Debug)]
263pub enum ValidationError {
264 #[error("Deserialization error: {0}")]
265 Deserialization(Box<dyn std::error::Error + Send + Sync + 'static>),
266 #[error("Validation failed: {0}")]
267 ValidationFailed(String),
268 #[error("Serialization error: {0}")]
269 Serialization(String),
270 #[error("Rejected: {0}")]
271 Rejected(&'static str),
272}
273
274#[derive(Debug)]
276pub enum ApplyError {
277 Items(Vec<ApplyErrorItem>),
278 CollectionNotFound(CollectionId),
279 RetrievalError(Box<RetrievalError>),
280 MutationError(Box<MutationError>),
281}
282
283impl std::fmt::Display for ApplyError {
284 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
285 match self {
286 ApplyError::Items(errors) => {
287 write!(f, "Failed to apply {} delta(s)", errors.len())?;
288 for (i, err) in errors.iter().enumerate() {
289 write!(f, "\n [{}] {}", i + 1, err)?;
290 }
291 Ok(())
292 }
293 ApplyError::CollectionNotFound(id) => write!(f, "Collection not found: {}", id),
294 ApplyError::RetrievalError(e) => write!(f, "Retrieval error: {}", e),
295 ApplyError::MutationError(e) => write!(f, "Mutation error: {}", e),
296 }
297 }
298}
299
300impl std::error::Error for ApplyError {
301 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
302 match self {
303 ApplyError::RetrievalError(e) => Some(e),
304 ApplyError::MutationError(e) => Some(e),
305 _ => None,
306 }
307 }
308}
309
310#[derive(Debug)]
312pub struct ApplyErrorItem {
313 pub entity_id: EntityId,
314 pub collection: CollectionId,
315 pub cause: MutationError,
316}
317
318impl std::fmt::Display for ApplyErrorItem {
319 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
320 write!(f, "Failed to apply delta for entity {} in collection {}: {}", self.entity_id.to_base64_short(), self.collection, self.cause)
321 }
322}
323
324impl From<RetrievalError> for ApplyError {
325 fn from(err: RetrievalError) -> Self { ApplyError::RetrievalError(Box::new(err)) }
326}
327
328impl From<MutationError> for ApplyError {
329 fn from(err: MutationError) -> Self { ApplyError::MutationError(Box::new(err)) }
330}
331
332impl From<ApplyError> for RetrievalError {
333 fn from(err: ApplyError) -> Self { RetrievalError::ApplyError(err) }
334}