1pub use crate::rpc::RpcError;
19pub use crate::rpc::{ApiError, FlussError};
20
21use arrow_schema::ArrowError;
22use snafu::Snafu;
23use std::{io, result};
24use strum::ParseError;
25
/// Shorthand for `std::result::Result` with this module's [`Error`] as the error type.
pub type Result<T> = result::Result<T, Error>;
27
/// Error type of this module.
///
/// The `Display` text of every variant is the one spelled out in its
/// `#[snafu(display(...))]` attribute; variants that carry a `source` field
/// print that source with `{:?}`.
#[derive(Debug, Snafu)]
pub enum Error {
    /// Catch-all variant (marked `whatever` for snafu): a free-form message
    /// plus an optional boxed source error.
    #[snafu(
        whatever,
        display("Fluss hitting unexpected error {}: {:?}", message, source)
    )]
    UnexpectedError {
        message: String,
        // A boxed error supplied as the source is stored wrapped in `Some`.
        #[snafu(source(from(Box<dyn std::error::Error + Send + Sync + 'static>, Some)))]
        source: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
    },

    /// A message together with the `io::Error` that caused it.
    #[snafu(
        visibility(pub(crate)),
        display("Fluss hitting unexpected io error {}: {:?}", message, source)
    )]
    IoUnexpectedError { message: String, source: io::Error },

    /// A message together with the `opendal::Error` that caused it.
    #[snafu(
        visibility(pub(crate)),
        display(
            "Fluss hitting remote storage unexpected error {}: {:?}",
            message,
            source
        )
    )]
    RemoteStorageUnexpectedError {
        message: String,
        source: opendal::Error,
    },

    /// JSON serde failure described only by a message (no source is kept).
    #[snafu(
        visibility(pub(crate)),
        display("Fluss hitting json serde error {}.", message)
    )]
    JsonSerdeError { message: String },

    /// A message together with the `RpcError` that caused it.
    #[snafu(
        visibility(pub(crate)),
        display("Fluss hitting unexpected rpc error {}: {:?}", message, source)
    )]
    RpcError { message: String, source: RpcError },

    /// Row conversion failure described only by a message.
    #[snafu(
        visibility(pub(crate)),
        display("Fluss hitting row convert error {}.", message)
    )]
    RowConvertError { message: String },

    /// A message together with the `arrow_schema::ArrowError` that caused it.
    #[snafu(
        visibility(pub(crate)),
        display("Fluss hitting Arrow error {}: {:?}.", message, source)
    )]
    ArrowError { message: String, source: ArrowError },

    /// Illegal-argument failure described only by a message.
    #[snafu(
        visibility(pub(crate)),
        display("Fluss hitting illegal argument error {}.", message)
    )]
    IllegalArgument { message: String },

    /// "IO not supported" failure described only by a message.
    #[snafu(
        visibility(pub(crate)),
        display("Fluss hitting IO not supported error {}.", message)
    )]
    IoUnsupported { message: String },

    /// Wakeup failure described only by a message.
    #[snafu(
        visibility(pub(crate)),
        display("Fluss hitting wakeup error {}.", message)
    )]
    WakeupError { message: String },
    /// Unsupported-operation failure described only by a message.
    #[snafu(
        visibility(pub(crate)),
        display("Fluss hitting unsupported operation error {}.", message)
    )]
    UnsupportedOperation { message: String },

    /// "Writer closed" failure described only by a message.
    #[snafu(visibility(pub(crate)), display("Fluss writer closed: {}.", message))]
    WriterClosed { message: String },

    /// "Buffer exhausted" failure described only by a message.
    #[snafu(
        visibility(pub(crate)),
        display("Fluss buffer exhausted: {}.", message)
    )]
    BufferExhausted { message: String },

    /// Wraps an `ApiError`; its `Display` output is embedded in the message.
    #[snafu(visibility(pub(crate)), display("Fluss API Error: {}.", api_error))]
    FlussAPIError { api_error: ApiError },
}
119
120impl Error {
124 pub fn table_not_exist(message: impl Into<String>) -> Self {
125 Error::FlussAPIError {
126 api_error: ApiError {
127 code: FlussError::TableNotExist.code(),
128 message: message.into(),
129 },
130 }
131 }
132
133 pub fn invalid_table(message: impl Into<String>) -> Self {
134 Error::FlussAPIError {
135 api_error: ApiError {
136 code: FlussError::InvalidTableException.code(),
137 message: message.into(),
138 },
139 }
140 }
141
142 pub fn partition_not_exist(message: impl Into<String>) -> Self {
143 Error::FlussAPIError {
144 api_error: ApiError {
145 code: FlussError::PartitionNotExists.code(),
146 message: message.into(),
147 },
148 }
149 }
150
151 pub fn invalid_partition(message: impl Into<String>) -> Self {
152 Error::FlussAPIError {
153 api_error: ApiError {
154 code: FlussError::PartitionSpecInvalidException.code(),
155 message: message.into(),
156 },
157 }
158 }
159
160 pub fn leader_not_available(message: impl Into<String>) -> Self {
161 Error::FlussAPIError {
162 api_error: ApiError {
163 code: FlussError::LeaderNotAvailableException.code(),
164 message: message.into(),
165 },
166 }
167 }
168
169 pub fn api_error(&self) -> Option<FlussError> {
171 if let Error::FlussAPIError { api_error } = self {
172 Some(FlussError::for_code(api_error.code))
173 } else {
174 None
175 }
176 }
177
178 pub fn is_retriable(&self) -> bool {
182 match self {
183 Error::RpcError { .. } => true,
184 Error::FlussAPIError { api_error } => api_error.is_retriable(),
185 _ => false,
186 }
187 }
188}
189
190impl From<ArrowError> for Error {
191 fn from(value: ArrowError) -> Self {
192 Error::ArrowError {
193 message: format!("{value}"),
194 source: value,
195 }
196 }
197}
198
199impl From<RpcError> for Error {
200 fn from(value: RpcError) -> Self {
201 Error::RpcError {
202 message: format!("{value}"),
203 source: value,
204 }
205 }
206}
207
208impl From<io::Error> for Error {
209 fn from(value: io::Error) -> Self {
210 Error::IoUnexpectedError {
211 message: format!("{value}"),
212 source: value,
213 }
214 }
215}
216
217impl From<opendal::Error> for Error {
218 fn from(value: opendal::Error) -> Self {
219 Error::RemoteStorageUnexpectedError {
220 message: format!("{value}"),
221 source: value,
222 }
223 }
224}
225
226impl From<ApiError> for Error {
227 fn from(value: ApiError) -> Self {
228 Error::FlussAPIError { api_error: value }
229 }
230}
231
232impl From<ParseError> for Error {
233 fn from(value: ParseError) -> Self {
234 Error::IllegalArgument {
235 message: value.to_string(),
236 }
237 }
238}