sierradb_cluster/write/
error.rs1use std::time::Duration;
2
3use kameo::prelude::*;
4use serde::{Deserialize, Serialize};
5use sierradb::{
6 StreamId,
7 bucket::PartitionId,
8 database::{CurrentVersion, ExpectedVersion},
9};
10use thiserror::Error;
11use uuid::Uuid;
12
13#[derive(Clone, Debug, Error, Serialize, Deserialize)]
14pub enum WriteError {
15 #[error("all replica nodes failed to process the write request")]
16 AllReplicasFailed,
17
18 #[error("replicate write buffer evicted to make room for lower write")]
19 BufferEvicted,
20
21 #[error("replicate write buffer is full")]
22 BufferFull,
23
24 #[error("circuit breaker open: estimated recovery time: {estimated_recovery_time:?}")]
25 CircuitBreakerOpen {
26 estimated_recovery_time: Option<Duration>,
27 },
28
29 #[error("failed to update coordinator confirmation count: {0}")]
30 ConfirmationFailed(String),
31
32 #[error("failed to update replication confirmation count: {0}")]
33 ReplicationConfirmationFailed(String),
34
35 #[error("database operation failed: {0}")]
36 DatabaseOperationFailed(String),
37
38 #[error(
39 "insufficient healthy replicas for write quorum ({available}/{required} replicas available)"
40 )]
41 InsufficientHealthyReplicas { available: u8, required: u8 },
42
43 #[error("coordinator is not healthy")]
44 InvalidSender,
45
46 #[error("stale write: coordinator has been alive longer than our local record of it")]
47 StaleWrite,
48
49 #[error("transaction is missing an expected partition sequence")]
50 MissingExpectedPartitionSequence,
51
52 #[error("partition {partition_id} is not owned by this node")]
53 PartitionNotOwned { partition_id: PartitionId },
54
55 #[error(
56 "write replication quorum not achieved ({confirmed}/{required} confirmations received)"
57 )]
58 ReplicationQuorumFailed { confirmed: u8, required: u8 },
59
60 #[error("write operation timed out")]
61 RequestTimeout,
62
63 #[error("write request exceeded maximum forward hops ({max} allowed)")]
64 MaximumForwardsExceeded { max: u8 },
65
66 #[error("remote operation failed: {0}")]
67 RemoteOperationFailed(String),
68
69 #[error("sequence is already being processed")]
70 SequenceConflict,
71
72 #[error(
73 "current partition sequence is {current} but expected {expected} for partition {partition_id}"
74 )]
75 WrongExpectedSequence {
76 partition_id: PartitionId,
77 current: CurrentVersion,
78 expected: ExpectedVersion,
79 },
80
81 #[error(
82 "current stream version is {current} but expected {expected} for partition key {partition_key} and stream id {stream_id}"
83 )]
84 WrongExpectedVersion {
85 partition_key: Uuid,
86 stream_id: StreamId,
87 current: CurrentVersion,
88 expected: ExpectedVersion,
89 },
90}
91
92impl WriteError {
93 pub fn code(&self) -> &'static str {
94 match self {
95 WriteError::AllReplicasFailed => "ALLREPLICASFAILED",
96 WriteError::BufferEvicted => "BUFFEREVICTED",
97 WriteError::BufferFull => "BUFFERFULL",
98 WriteError::CircuitBreakerOpen { .. } => "CIRCUITOPEN",
99 WriteError::ConfirmationFailed(_) => "CONFIRMFAILED",
100 WriteError::ReplicationConfirmationFailed(_) => "REPLCONFIRMFAILED",
101 WriteError::DatabaseOperationFailed(_) => "DBOPFAILED",
102 WriteError::InsufficientHealthyReplicas { .. } => "INSUFFICIENTREPLICAS",
103 WriteError::InvalidSender => "INVALIDSENDER",
104 WriteError::StaleWrite => "STALEWRITE",
105 WriteError::MissingExpectedPartitionSequence => "MISSINGPARTSEQ",
106 WriteError::PartitionNotOwned { .. } => "PARTNOTOWNED",
107 WriteError::ReplicationQuorumFailed { .. } => "QUORUMFAILED",
108 WriteError::RequestTimeout => "TIMEOUT",
109 WriteError::MaximumForwardsExceeded { .. } => "MAXFORWARDS",
110 WriteError::RemoteOperationFailed(_) => "REMOTEOPFAILED",
111 WriteError::SequenceConflict => "SEQCONFLICT",
112 WriteError::WrongExpectedSequence { .. } => "WRONGSEQ",
113 WriteError::WrongExpectedVersion { .. } => "WRONGVER",
114 }
115 }
116}
117
118impl From<sierradb::error::WriteError> for WriteError {
119 fn from(err: sierradb::error::WriteError) -> Self {
120 match err {
121 sierradb::error::WriteError::WrongExpectedVersion {
122 partition_key,
123 stream_id,
124 current,
125 expected,
126 } => WriteError::WrongExpectedVersion {
127 partition_key,
128 stream_id,
129 current,
130 expected,
131 },
132 err => WriteError::DatabaseOperationFailed(err.to_string()),
133 }
134 }
135}
136
137impl From<RemoteSendError<WriteError>> for WriteError {
138 fn from(err: RemoteSendError<WriteError>) -> Self {
139 WriteError::RemoteOperationFailed(err.to_string())
140 }
141}
142
143#[derive(Debug, Error, Serialize, Deserialize)]
144pub enum ConfirmTransactionError {
145 #[error("events length mismatch")]
146 EventsLengthMismatch,
147 #[error("event id mismatch")]
148 EventIdMismatch,
149 #[error("partition sequence mismatch (expected {expected}, got {actual})")]
150 PartitionSequenceMismatch { expected: u64, actual: u64 },
151 #[error("transaction not found")]
152 TransactionNotFound,
153 #[error("read error: {0}")]
154 Read(String),
155 #[error("write error: {0}")]
156 Write(String),
157}