// sierradb_cluster/write/error.rs

1use std::time::Duration;
2
3use kameo::prelude::*;
4use serde::{Deserialize, Serialize};
5use sierradb::{
6    StreamId,
7    bucket::PartitionId,
8    database::{CurrentVersion, ExpectedVersion},
9};
10use thiserror::Error;
11use uuid::Uuid;
12
13#[derive(Clone, Debug, Error, Serialize, Deserialize)]
14pub enum WriteError {
15    #[error("all replica nodes failed to process the write request")]
16    AllReplicasFailed,
17
18    #[error("replicate write buffer evicted to make room for lower write")]
19    BufferEvicted,
20
21    #[error("replicate write buffer is full")]
22    BufferFull,
23
24    #[error("circuit breaker open: estimated recovery time: {estimated_recovery_time:?}")]
25    CircuitBreakerOpen {
26        estimated_recovery_time: Option<Duration>,
27    },
28
29    #[error("failed to update coordinator confirmation count: {0}")]
30    ConfirmationFailed(String),
31
32    #[error("failed to update replication confirmation count: {0}")]
33    ReplicationConfirmationFailed(String),
34
35    #[error("database operation failed: {0}")]
36    DatabaseOperationFailed(String),
37
38    #[error(
39        "insufficient healthy replicas for write quorum ({available}/{required} replicas available)"
40    )]
41    InsufficientHealthyReplicas { available: u8, required: u8 },
42
43    #[error("coordinator is not healthy")]
44    InvalidSender,
45
46    #[error("stale write: coordinator has been alive longer than our local record of it")]
47    StaleWrite,
48
49    #[error("transaction is missing an expected partition sequence")]
50    MissingExpectedPartitionSequence,
51
52    #[error("partition {partition_id} is not owned by this node")]
53    PartitionNotOwned { partition_id: PartitionId },
54
55    #[error(
56        "write replication quorum not achieved ({confirmed}/{required} confirmations received)"
57    )]
58    ReplicationQuorumFailed { confirmed: u8, required: u8 },
59
60    #[error("write operation timed out")]
61    RequestTimeout,
62
63    #[error("write request exceeded maximum forward hops ({max} allowed)")]
64    MaximumForwardsExceeded { max: u8 },
65
66    #[error("remote operation failed: {0}")]
67    RemoteOperationFailed(String),
68
69    #[error("sequence is already being processed")]
70    SequenceConflict,
71
72    #[error(
73        "current partition sequence is {current} but expected {expected} for partition {partition_id}"
74    )]
75    WrongExpectedSequence {
76        partition_id: PartitionId,
77        current: CurrentVersion,
78        expected: ExpectedVersion,
79    },
80
81    #[error(
82        "current stream version is {current} but expected {expected} for partition key {partition_key} and stream id {stream_id}"
83    )]
84    WrongExpectedVersion {
85        partition_key: Uuid,
86        stream_id: StreamId,
87        current: CurrentVersion,
88        expected: ExpectedVersion,
89    },
90}
91
92impl WriteError {
93    pub fn code(&self) -> &'static str {
94        match self {
95            WriteError::AllReplicasFailed => "ALLREPLICASFAILED",
96            WriteError::BufferEvicted => "BUFFEREVICTED",
97            WriteError::BufferFull => "BUFFERFULL",
98            WriteError::CircuitBreakerOpen { .. } => "CIRCUITOPEN",
99            WriteError::ConfirmationFailed(_) => "CONFIRMFAILED",
100            WriteError::ReplicationConfirmationFailed(_) => "REPLCONFIRMFAILED",
101            WriteError::DatabaseOperationFailed(_) => "DBOPFAILED",
102            WriteError::InsufficientHealthyReplicas { .. } => "INSUFFICIENTREPLICAS",
103            WriteError::InvalidSender => "INVALIDSENDER",
104            WriteError::StaleWrite => "STALEWRITE",
105            WriteError::MissingExpectedPartitionSequence => "MISSINGPARTSEQ",
106            WriteError::PartitionNotOwned { .. } => "PARTNOTOWNED",
107            WriteError::ReplicationQuorumFailed { .. } => "QUORUMFAILED",
108            WriteError::RequestTimeout => "TIMEOUT",
109            WriteError::MaximumForwardsExceeded { .. } => "MAXFORWARDS",
110            WriteError::RemoteOperationFailed(_) => "REMOTEOPFAILED",
111            WriteError::SequenceConflict => "SEQCONFLICT",
112            WriteError::WrongExpectedSequence { .. } => "WRONGSEQ",
113            WriteError::WrongExpectedVersion { .. } => "WRONGVER",
114        }
115    }
116}
117
118impl From<sierradb::error::WriteError> for WriteError {
119    fn from(err: sierradb::error::WriteError) -> Self {
120        match err {
121            sierradb::error::WriteError::WrongExpectedVersion {
122                partition_key,
123                stream_id,
124                current,
125                expected,
126            } => WriteError::WrongExpectedVersion {
127                partition_key,
128                stream_id,
129                current,
130                expected,
131            },
132            err => WriteError::DatabaseOperationFailed(err.to_string()),
133        }
134    }
135}
136
137impl From<RemoteSendError<WriteError>> for WriteError {
138    fn from(err: RemoteSendError<WriteError>) -> Self {
139        WriteError::RemoteOperationFailed(err.to_string())
140    }
141}
142
143#[derive(Debug, Error, Serialize, Deserialize)]
144pub enum ConfirmTransactionError {
145    #[error("events length mismatch")]
146    EventsLengthMismatch,
147    #[error("event id mismatch")]
148    EventIdMismatch,
149    #[error("partition sequence mismatch (expected {expected}, got {actual})")]
150    PartitionSequenceMismatch { expected: u64, actual: u64 },
151    #[error("transaction not found")]
152    TransactionNotFound,
153    #[error("read error: {0}")]
154    Read(String),
155    #[error("write error: {0}")]
156    Write(String),
157}