Skip to main content

fluss/
error.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub use crate::rpc::RpcError;
19pub use crate::rpc::{ApiError, FlussError};
20
21use arrow_schema::ArrowError;
22use snafu::Snafu;
23use std::{io, result};
24use strum::ParseError;
25
/// Shorthand for `std::result::Result` with this module's [`Error`] as the error type.
26pub type Result<T> = result::Result<T, Error>;
27
/// Error type used by this module's [`Result`] alias.
///
/// The `Snafu` derive reads the `#[snafu(...)]` attribute on each variant; the
/// `display(...)` entry there is the variant's message text. Variants marked
/// `visibility(pub(crate))` request crate-visible snafu helpers.
28#[derive(Debug, Snafu)]
29pub enum Error {
    /// Catch-all error (snafu `whatever` variant): a free-form `message` plus an
    /// optional boxed source error.
30    #[snafu(
31        whatever,
32        display("Fluss hitting unexpected error {}: {:?}", message, source)
33    )]
34    UnexpectedError {
35        message: String,
36        /// see <https://github.com/shepmaster/snafu/issues/446>
37        #[snafu(source(from(Box<dyn std::error::Error + Send + Sync + 'static>, Some)))]
38        source: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
39    },
40
    /// I/O failure: a `message` plus the originating `io::Error`.
    /// `From<io::Error>` in this file builds this variant.
41    #[snafu(
42        visibility(pub(crate)),
43        display("Fluss hitting unexpected io error {}: {:?}", message, source)
44    )]
45    IoUnexpectedError { message: String, source: io::Error },
46
    /// Remote-storage failure: a `message` plus the originating `opendal::Error`.
    /// `From<opendal::Error>` in this file builds this variant.
47    #[snafu(
48        visibility(pub(crate)),
49        display(
50            "Fluss hitting remote storage unexpected error {}: {:?}",
51            message,
52            source
53        )
54    )]
55    RemoteStorageUnexpectedError {
56        message: String,
57        source: opendal::Error,
58    },
59
    /// JSON serde failure described only by `message`; no source error is stored.
60    #[snafu(
61        visibility(pub(crate)),
62        display("Fluss hitting json serde error {}.", message)
63    )]
64    JsonSerdeError { message: String },
65
    /// RPC failure wrapping the re-exported `RpcError` type.
    /// `Error::is_retriable` always reports this variant as retriable.
66    #[snafu(
67        visibility(pub(crate)),
68        display("Fluss hitting unexpected rpc error {}: {:?}", message, source)
69    )]
70    RpcError { message: String, source: RpcError },
71
    /// Row conversion failure described only by `message`.
72    #[snafu(
73        visibility(pub(crate)),
74        display("Fluss hitting row convert error {}.", message)
75    )]
76    RowConvertError { message: String },
77
    /// Arrow failure: a `message` plus the originating `arrow_schema::ArrowError`.
    /// `From<ArrowError>` in this file builds this variant.
78    #[snafu(
79        visibility(pub(crate)),
80        display("Fluss hitting Arrow error {}: {:?}.", message, source)
81    )]
82    ArrowError { message: String, source: ArrowError },
83
    /// Illegal argument described only by `message`.
    /// `From<ParseError>` in this file also maps `strum` parse errors here.
84    #[snafu(
85        visibility(pub(crate)),
86        display("Fluss hitting illegal argument error {}.", message)
87    )]
88    IllegalArgument { message: String },
89
    /// "IO not supported" condition described only by `message`.
90    #[snafu(
91        visibility(pub(crate)),
92        display("Fluss hitting IO not supported error {}.", message)
93    )]
94    IoUnsupported { message: String },
95
    /// Wakeup error described only by `message`.
96    #[snafu(
97        visibility(pub(crate)),
98        display("Fluss hitting wakeup error {}.", message)
99    )]
100    WakeupError { message: String },
    /// Unsupported operation described only by `message`.
101    #[snafu(
102        visibility(pub(crate)),
103        display("Fluss hitting unsupported operation error {}.", message)
104    )]
105    UnsupportedOperation { message: String },
106
    /// "Writer closed" condition described only by `message`.
107    #[snafu(visibility(pub(crate)), display("Fluss writer closed: {}.", message))]
108    WriterClosed { message: String },
109
    /// "Buffer exhausted" condition described only by `message`.
110    #[snafu(
111        visibility(pub(crate)),
112        display("Fluss buffer exhausted: {}.", message)
113    )]
114    BufferExhausted { message: String },
115
    /// Error described by an `ApiError` (a code and a message).
    /// `Error::api_error` maps the code to a `FlussError`, and
    /// `Error::is_retriable` delegates to `ApiError::is_retriable` for this variant.
116    #[snafu(visibility(pub(crate)), display("Fluss API Error: {}.", api_error))]
117    FlussAPIError { api_error: ApiError },
118}
119
120/// Convenience constructors for API errors that may be raised client-side.
121/// These create `FlussAPIError` with the correct protocol error code,
122/// consistent with Java where e.g. `InvalidTableException` always carries code 15.
123impl Error {
124    pub fn table_not_exist(message: impl Into<String>) -> Self {
125        Error::FlussAPIError {
126            api_error: ApiError {
127                code: FlussError::TableNotExist.code(),
128                message: message.into(),
129            },
130        }
131    }
132
133    pub fn invalid_table(message: impl Into<String>) -> Self {
134        Error::FlussAPIError {
135            api_error: ApiError {
136                code: FlussError::InvalidTableException.code(),
137                message: message.into(),
138            },
139        }
140    }
141
142    pub fn partition_not_exist(message: impl Into<String>) -> Self {
143        Error::FlussAPIError {
144            api_error: ApiError {
145                code: FlussError::PartitionNotExists.code(),
146                message: message.into(),
147            },
148        }
149    }
150
151    pub fn invalid_partition(message: impl Into<String>) -> Self {
152        Error::FlussAPIError {
153            api_error: ApiError {
154                code: FlussError::PartitionSpecInvalidException.code(),
155                message: message.into(),
156            },
157        }
158    }
159
160    pub fn leader_not_available(message: impl Into<String>) -> Self {
161        Error::FlussAPIError {
162            api_error: ApiError {
163                code: FlussError::LeaderNotAvailableException.code(),
164                message: message.into(),
165            },
166        }
167    }
168
169    /// Returns the API error kind if this is an API error, for ergonomic pattern matching.
170    pub fn api_error(&self) -> Option<FlussError> {
171        if let Error::FlussAPIError { api_error } = self {
172            Some(FlussError::for_code(api_error.code))
173        } else {
174            None
175        }
176    }
177
178    /// Returns `true` if retrying the request may succeed.
179    /// [`Error::RpcError`] is always retriable; [`Error::FlussAPIError`] delegates to
180    /// [`ApiError::is_retriable`]; all other variants are not.
181    pub fn is_retriable(&self) -> bool {
182        match self {
183            Error::RpcError { .. } => true,
184            Error::FlussAPIError { api_error } => api_error.is_retriable(),
185            _ => false,
186        }
187    }
188}
189
190impl From<ArrowError> for Error {
191    fn from(value: ArrowError) -> Self {
192        Error::ArrowError {
193            message: format!("{value}"),
194            source: value,
195        }
196    }
197}
198
199impl From<RpcError> for Error {
200    fn from(value: RpcError) -> Self {
201        Error::RpcError {
202            message: format!("{value}"),
203            source: value,
204        }
205    }
206}
207
208impl From<io::Error> for Error {
209    fn from(value: io::Error) -> Self {
210        Error::IoUnexpectedError {
211            message: format!("{value}"),
212            source: value,
213        }
214    }
215}
216
217impl From<opendal::Error> for Error {
218    fn from(value: opendal::Error) -> Self {
219        Error::RemoteStorageUnexpectedError {
220            message: format!("{value}"),
221            source: value,
222        }
223    }
224}
225
226impl From<ApiError> for Error {
227    fn from(value: ApiError) -> Self {
228        Error::FlussAPIError { api_error: value }
229    }
230}
231
232impl From<ParseError> for Error {
233    fn from(value: ParseError) -> Self {
234        Error::IllegalArgument {
235            message: value.to_string(),
236        }
237    }
238}