Skip to main content

ursula_runtime/
error.rs

1use ursula_shard::{CoreId, RaftGroupId, ShardMapError, ShardPlacement};
2use ursula_stream::StreamErrorCode;
3
4use crate::engine::{GroupEngineError, GroupLeaderHint};
5
6#[derive(Debug, Clone, PartialEq, Eq)]
7pub enum RuntimeError {
8    InvalidConfig(ShardMapError),
9    InvalidRaftGroup {
10        raft_group_id: RaftGroupId,
11        raft_group_count: u32,
12    },
13    SnapshotPlacementMismatch {
14        expected: ShardPlacement,
15        actual: ShardPlacement,
16    },
17    EmptyAppend,
18    ColdStoreConfig {
19        message: String,
20    },
21    ColdStoreIo {
22        message: String,
23    },
24    LiveReadBackpressure {
25        core_id: CoreId,
26        current_waiters: u64,
27        limit: u64,
28    },
29    GroupEngine {
30        core_id: CoreId,
31        raft_group_id: RaftGroupId,
32        message: String,
33        next_offset: Option<u64>,
34        leader_hint: Option<GroupLeaderHint>,
35    },
36    MailboxClosed {
37        core_id: CoreId,
38    },
39    ResponseDropped {
40        core_id: CoreId,
41    },
42    SpawnCoreThread {
43        core_id: CoreId,
44        message: String,
45    },
46}
47
48impl RuntimeError {
49    pub(crate) fn group_engine(placement: ShardPlacement, err: GroupEngineError) -> Self {
50        Self::GroupEngine {
51            core_id: placement.core_id,
52            raft_group_id: placement.raft_group_id,
53            message: err.message().to_owned(),
54            next_offset: err.next_offset(),
55            leader_hint: err.leader_hint().cloned(),
56        }
57    }
58}
59
60impl std::fmt::Display for RuntimeError {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        match self {
63            Self::InvalidConfig(err) => write!(f, "invalid shard runtime config: {err}"),
64            Self::InvalidRaftGroup {
65                raft_group_id,
66                raft_group_count,
67            } => write!(
68                f,
69                "raft group {} is outside configured range 0..{}",
70                raft_group_id.0, raft_group_count
71            ),
72            Self::SnapshotPlacementMismatch { expected, actual } => write!(
73                f,
74                "snapshot placement for raft group {} is core {}, expected core {}",
75                actual.raft_group_id.0, actual.core_id.0, expected.core_id.0
76            ),
77            Self::EmptyAppend => f.write_str("append payload must be non-empty"),
78            Self::ColdStoreConfig { message } => {
79                write!(f, "invalid cold store config: {message}")
80            }
81            Self::ColdStoreIo { message } => write!(f, "cold store IO error: {message}"),
82            Self::LiveReadBackpressure {
83                core_id,
84                current_waiters,
85                limit,
86            } => write!(
87                f,
88                "core {} live read waiters at {} would exceed limit {}",
89                core_id.0, current_waiters, limit
90            ),
91            Self::GroupEngine {
92                core_id,
93                raft_group_id,
94                message,
95                ..
96            } => write!(
97                f,
98                "core {} raft group {} append failed: {message}",
99                core_id.0, raft_group_id.0
100            ),
101            Self::MailboxClosed { core_id } => {
102                write!(f, "core {} mailbox is closed", core_id.0)
103            }
104            Self::ResponseDropped { core_id } => {
105                write!(f, "core {} dropped append response", core_id.0)
106            }
107            Self::SpawnCoreThread { core_id, message } => {
108                write!(f, "failed to spawn core {} thread: {message}", core_id.0)
109            }
110        }
111    }
112}
113
114impl std::error::Error for RuntimeError {}
115
116impl From<ShardMapError> for RuntimeError {
117    fn from(value: ShardMapError) -> Self {
118        Self::InvalidConfig(value)
119    }
120}
121
122pub(crate) fn map_fork_source_ref_error(
123    err: RuntimeError,
124    placement: ShardPlacement,
125) -> RuntimeError {
126    if let RuntimeError::GroupEngine { message, .. } = &err
127        && message.contains("StreamGone")
128    {
129        return RuntimeError::group_engine(
130            placement,
131            GroupEngineError::stream(
132                StreamErrorCode::StreamAlreadyExistsConflict,
133                "source stream is gone and cannot be forked",
134            ),
135        );
136    }
137    err
138}