1use ursula_shard::{CoreId, RaftGroupId, ShardMapError, ShardPlacement};
2use ursula_stream::StreamErrorCode;
3
4use crate::engine::{GroupEngineError, GroupLeaderHint};
5
6#[derive(Debug, Clone, PartialEq, Eq)]
7pub enum RuntimeError {
8 InvalidConfig(ShardMapError),
9 InvalidRaftGroup {
10 raft_group_id: RaftGroupId,
11 raft_group_count: u32,
12 },
13 SnapshotPlacementMismatch {
14 expected: ShardPlacement,
15 actual: ShardPlacement,
16 },
17 EmptyAppend,
18 ColdStoreConfig {
19 message: String,
20 },
21 ColdStoreIo {
22 message: String,
23 },
24 LiveReadBackpressure {
25 core_id: CoreId,
26 current_waiters: u64,
27 limit: u64,
28 },
29 GroupEngine {
30 core_id: CoreId,
31 raft_group_id: RaftGroupId,
32 message: String,
33 next_offset: Option<u64>,
34 leader_hint: Option<GroupLeaderHint>,
35 },
36 MailboxClosed {
37 core_id: CoreId,
38 },
39 ResponseDropped {
40 core_id: CoreId,
41 },
42 SpawnCoreThread {
43 core_id: CoreId,
44 message: String,
45 },
46}
47
48impl RuntimeError {
49 pub(crate) fn group_engine(placement: ShardPlacement, err: GroupEngineError) -> Self {
50 Self::GroupEngine {
51 core_id: placement.core_id,
52 raft_group_id: placement.raft_group_id,
53 message: err.message().to_owned(),
54 next_offset: err.next_offset(),
55 leader_hint: err.leader_hint().cloned(),
56 }
57 }
58}
59
60impl std::fmt::Display for RuntimeError {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 match self {
63 Self::InvalidConfig(err) => write!(f, "invalid shard runtime config: {err}"),
64 Self::InvalidRaftGroup {
65 raft_group_id,
66 raft_group_count,
67 } => write!(
68 f,
69 "raft group {} is outside configured range 0..{}",
70 raft_group_id.0, raft_group_count
71 ),
72 Self::SnapshotPlacementMismatch { expected, actual } => write!(
73 f,
74 "snapshot placement for raft group {} is core {}, expected core {}",
75 actual.raft_group_id.0, actual.core_id.0, expected.core_id.0
76 ),
77 Self::EmptyAppend => f.write_str("append payload must be non-empty"),
78 Self::ColdStoreConfig { message } => {
79 write!(f, "invalid cold store config: {message}")
80 }
81 Self::ColdStoreIo { message } => write!(f, "cold store IO error: {message}"),
82 Self::LiveReadBackpressure {
83 core_id,
84 current_waiters,
85 limit,
86 } => write!(
87 f,
88 "core {} live read waiters at {} would exceed limit {}",
89 core_id.0, current_waiters, limit
90 ),
91 Self::GroupEngine {
92 core_id,
93 raft_group_id,
94 message,
95 ..
96 } => write!(
97 f,
98 "core {} raft group {} append failed: {message}",
99 core_id.0, raft_group_id.0
100 ),
101 Self::MailboxClosed { core_id } => {
102 write!(f, "core {} mailbox is closed", core_id.0)
103 }
104 Self::ResponseDropped { core_id } => {
105 write!(f, "core {} dropped append response", core_id.0)
106 }
107 Self::SpawnCoreThread { core_id, message } => {
108 write!(f, "failed to spawn core {} thread: {message}", core_id.0)
109 }
110 }
111 }
112}
113
114impl std::error::Error for RuntimeError {}
115
116impl From<ShardMapError> for RuntimeError {
117 fn from(value: ShardMapError) -> Self {
118 Self::InvalidConfig(value)
119 }
120}
121
122pub(crate) fn map_fork_source_ref_error(
123 err: RuntimeError,
124 placement: ShardPlacement,
125) -> RuntimeError {
126 if let RuntimeError::GroupEngine { message, .. } = &err
127 && message.contains("StreamGone")
128 {
129 return RuntimeError::group_engine(
130 placement,
131 GroupEngineError::stream(
132 StreamErrorCode::StreamAlreadyExistsConflict,
133 "source stream is gone and cannot be forked",
134 ),
135 );
136 }
137 err
138}