Skip to main content

fluss/rpc/
fluss_api_error.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::proto::ErrorResponse;
19use std::fmt::{Debug, Display, Formatter};
20
/// An error reported by a Fluss server in response to an RPC.
///
/// Carries the raw protocol error `code` together with a human-readable `message`.
/// Use [`FlussError::for_code`] to interpret the code.
pub struct ApiError {
    /// Protocol error code; the known values are the discriminants of [`FlussError`].
    pub code: i32,
    /// Human-readable description of the failure.
    pub message: String,
}
26
27impl Debug for ApiError {
28    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
29        f.debug_struct("ApiError")
30            .field("code", &self.code)
31            .field("message", &self.message)
32            .finish()
33    }
34}
35
36impl Display for ApiError {
37    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38        Debug::fmt(self, f)
39    }
40}
41
42impl ApiError {
43    /// Returns `true` if retrying the request may succeed. Delegates to [`FlussError::is_retriable`].
44    pub fn is_retriable(&self) -> bool {
45        FlussError::for_code(self.code).is_retriable()
46    }
47}
48
/// Fluss protocol errors, shared by client and server.
///
/// Each variant's discriminant is its wire error code (see [`FlussError::code`]). The numeric
/// codes are part of the client-server protocol and must never change; the variant names may.
///
/// Do not add errors here that can only occur on the client, or only on the server.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(i32)]
pub enum FlussError {
    /// The server experienced an unexpected error when processing the request.
    UnknownServerError = -1,
    /// No error occurred.
    None = 0,
    /// The server disconnected before a response was received.
    NetworkException = 1,
    /// The API version is not supported.
    UnsupportedVersion = 2,
    /// The message failed its CRC checksum, exceeds the valid size, has a null key for a
    /// primary key table, or is otherwise corrupt.
    CorruptMessage = 3,
    /// The database does not exist.
    DatabaseNotExist = 4,
    /// The database is not empty.
    DatabaseNotEmpty = 5,
    /// The database already exists.
    DatabaseAlreadyExist = 6,
    /// The table does not exist.
    TableNotExist = 7,
    /// The table already exists.
    TableAlreadyExist = 8,
    /// The schema does not exist.
    SchemaNotExist = 9,
    /// An exception occurred while storing log data on the server.
    LogStorageException = 10,
    /// An exception occurred while storing kv data on the server.
    KvStorageException = 11,
    /// Not leader or follower.
    NotLeaderOrFollower = 12,
    /// The record is too large.
    RecordTooLargeException = 13,
    /// The record is corrupt.
    CorruptRecordException = 14,
    /// The client attempted to perform an operation on an invalid table.
    InvalidTableException = 15,
    /// The client attempted to perform an operation on an invalid database.
    InvalidDatabaseException = 16,
    /// The replication factor is larger than the number of available tablet servers.
    InvalidReplicationFactor = 17,
    /// A produce request specified an invalid value for required acks.
    InvalidRequiredAcks = 18,
    /// The log offset is out of range.
    LogOffsetOutOfRangeException = 19,
    /// The table is not a primary key table.
    NonPrimaryKeyTableException = 20,
    /// The table or bucket does not exist.
    UnknownTableOrBucketException = 21,
    /// The update version is invalid.
    InvalidUpdateVersionException = 22,
    /// The coordinator is invalid.
    InvalidCoordinatorException = 23,
    /// The leader epoch is invalid.
    FencedLeaderEpochException = 24,
    /// The request timed out.
    RequestTimeOut = 25,
    /// A general storage exception.
    StorageException = 26,
    /// The server did not attempt to execute this operation.
    OperationNotAttemptedException = 27,
    /// Records were written to the server, but to fewer in-sync replicas than required.
    NotEnoughReplicasAfterAppendException = 28,
    /// Messages were rejected because there are fewer in-sync replicas than required.
    NotEnoughReplicasException = 29,
    /// An error occurred while obtaining a file-access security token.
    SecurityTokenException = 30,
    /// The tablet server received an out-of-order sequence batch.
    OutOfOrderSequenceException = 31,
    /// The tablet server received a duplicate sequence batch.
    DuplicateSequenceException = 32,
    /// The tablet server could not locate the writer metadata.
    UnknownWriterIdException = 33,
    /// The requested column projection is invalid.
    InvalidColumnProjection = 34,
    /// The requested target column to write is invalid.
    InvalidTargetColumn = 35,
    /// The partition does not exist.
    PartitionNotExists = 36,
    /// The table is not partitioned.
    TableNotPartitionedException = 37,
    /// The timestamp is invalid.
    InvalidTimestampException = 38,
    /// The config is invalid.
    InvalidConfigException = 39,
    /// The lake storage is not configured.
    LakeStorageNotConfiguredException = 40,
    /// The kv snapshot does not exist.
    KvSnapshotNotExist = 41,
    /// The partition already exists.
    PartitionAlreadyExists = 42,
    /// The partition spec is invalid.
    PartitionSpecInvalidException = 43,
    /// There is currently no available leader for the given partition.
    LeaderNotAvailableException = 44,
    /// The maximum number of partitions was exceeded.
    PartitionMaxNumException = 45,
    /// Authentication failed.
    AuthenticateException = 46,
    /// Security is disabled.
    SecurityDisabledException = 47,
    /// Authorization failed.
    AuthorizationException = 48,
    /// The maximum number of buckets was exceeded.
    BucketMaxNumException = 49,
    /// The tiering epoch is invalid.
    FencedTieringEpochException = 50,
    /// Authentication failed with a retriable exception.
    ///
    /// Note: despite the name, [`FlussError::is_retriable`] returns `false` for this variant.
    RetriableAuthenticateException = 51,
    /// The server rack info is invalid.
    InvalidServerRackInfoException = 52,
    /// The lake snapshot does not exist.
    LakeSnapshotNotExist = 53,
    /// The lake table already exists.
    LakeTableAlreadyExist = 54,
    /// The new ISR contains at least one ineligible replica.
    IneligibleReplicaException = 55,
    /// The alter-table request is invalid.
    InvalidAlterTableException = 56,
    /// Deletion operations are disabled on this table.
    DeletionDisabledException = 57,
}
175
176impl FlussError {
177    /// Returns the error code for this error.
178    pub fn code(&self) -> i32 {
179        *self as i32
180    }
181
182    pub fn is_retriable(&self) -> bool {
183        matches!(
184            self,
185            FlussError::NetworkException
186                | FlussError::CorruptMessage
187                | FlussError::SchemaNotExist
188                | FlussError::LogStorageException
189                | FlussError::KvStorageException
190                | FlussError::NotLeaderOrFollower
191                | FlussError::CorruptRecordException
192                | FlussError::UnknownTableOrBucketException
193                | FlussError::RequestTimeOut
194                | FlussError::StorageException
195                | FlussError::NotEnoughReplicasAfterAppendException
196                | FlussError::NotEnoughReplicasException
197                | FlussError::LeaderNotAvailableException
198        )
199    }
200
201    /// Returns a friendly description of the error.
202    pub fn message(&self) -> &'static str {
203        match self {
204            FlussError::UnknownServerError => {
205                "The server experienced an unexpected error when processing the request."
206            }
207            FlussError::None => "No error",
208            FlussError::NetworkException => {
209                "The server disconnected before a response was received."
210            }
211            FlussError::UnsupportedVersion => "The version of API is not supported.",
212            FlussError::CorruptMessage => {
213                "This message has failed its CRC checksum, exceeds the valid size, has a null key for a primary key table, or is otherwise corrupt."
214            }
215            FlussError::DatabaseNotExist => "The database does not exist.",
216            FlussError::DatabaseNotEmpty => "The database is not empty.",
217            FlussError::DatabaseAlreadyExist => "The database already exists.",
218            FlussError::TableNotExist => "The table does not exist.",
219            FlussError::TableAlreadyExist => "The table already exists.",
220            FlussError::SchemaNotExist => "The schema does not exist.",
221            FlussError::LogStorageException => {
222                "Exception occur while storage data for log in server."
223            }
224            FlussError::KvStorageException => {
225                "Exception occur while storage data for kv in server."
226            }
227            FlussError::NotLeaderOrFollower => "Not leader or follower.",
228            FlussError::RecordTooLargeException => "The record is too large.",
229            FlussError::CorruptRecordException => "The record is corrupt.",
230            FlussError::InvalidTableException => {
231                "The client has attempted to perform an operation on an invalid table."
232            }
233            FlussError::InvalidDatabaseException => {
234                "The client has attempted to perform an operation on an invalid database."
235            }
236            FlussError::InvalidReplicationFactor => {
237                "The replication factor is larger then the number of available tablet servers."
238            }
239            FlussError::InvalidRequiredAcks => {
240                "Produce request specified an invalid value for required acks."
241            }
242            FlussError::LogOffsetOutOfRangeException => "The log offset is out of range.",
243            FlussError::NonPrimaryKeyTableException => "The table is not primary key table.",
244            FlussError::UnknownTableOrBucketException => "The table or bucket does not exist.",
245            FlussError::InvalidUpdateVersionException => "The update version is invalid.",
246            FlussError::InvalidCoordinatorException => "The coordinator is invalid.",
247            FlussError::FencedLeaderEpochException => "The leader epoch is invalid.",
248            FlussError::RequestTimeOut => "The request time out.",
249            FlussError::StorageException => "The general storage exception.",
250            FlussError::OperationNotAttemptedException => {
251                "The server did not attempt to execute this operation."
252            }
253            FlussError::NotEnoughReplicasAfterAppendException => {
254                "Records are written to the server already, but to fewer in-sync replicas than required."
255            }
256            FlussError::NotEnoughReplicasException => {
257                "Messages are rejected since there are fewer in-sync replicas than required."
258            }
259            FlussError::SecurityTokenException => "Get file access security token exception.",
260            FlussError::OutOfOrderSequenceException => {
261                "The tablet server received an out of order sequence batch."
262            }
263            FlussError::DuplicateSequenceException => {
264                "The tablet server received a duplicate sequence batch."
265            }
266            FlussError::UnknownWriterIdException => {
267                "This exception is raised by the tablet server if it could not locate the writer metadata."
268            }
269            FlussError::InvalidColumnProjection => "The requested column projection is invalid.",
270            FlussError::InvalidTargetColumn => "The requested target column to write is invalid.",
271            FlussError::PartitionNotExists => "The partition does not exist.",
272            FlussError::TableNotPartitionedException => "The table is not partitioned.",
273            FlussError::InvalidTimestampException => "The timestamp is invalid.",
274            FlussError::InvalidConfigException => "The config is invalid.",
275            FlussError::LakeStorageNotConfiguredException => "The lake storage is not configured.",
276            FlussError::KvSnapshotNotExist => "The kv snapshot does not exist.",
277            FlussError::PartitionAlreadyExists => "The partition already exists.",
278            FlussError::PartitionSpecInvalidException => "The partition spec is invalid.",
279            FlussError::LeaderNotAvailableException => {
280                "There is no currently available leader for the given partition."
281            }
282            FlussError::PartitionMaxNumException => "Exceed the maximum number of partitions.",
283            FlussError::AuthenticateException => "Authentication failed.",
284            FlussError::SecurityDisabledException => "Security is disabled.",
285            FlussError::AuthorizationException => "Authorization failed.",
286            FlussError::BucketMaxNumException => "Exceed the maximum number of buckets.",
287            FlussError::FencedTieringEpochException => "The tiering epoch is invalid.",
288            FlussError::RetriableAuthenticateException => {
289                "Authentication failed with retriable exception."
290            }
291            FlussError::InvalidServerRackInfoException => "The server rack info is invalid.",
292            FlussError::LakeSnapshotNotExist => "The lake snapshot does not exist.",
293            FlussError::LakeTableAlreadyExist => "The lake table already exists.",
294            FlussError::IneligibleReplicaException => {
295                "The new ISR contains at least one ineligible replica."
296            }
297            FlussError::InvalidAlterTableException => "The alter table is invalid.",
298            FlussError::DeletionDisabledException => {
299                "Deletion operations are disabled on this table."
300            }
301        }
302    }
303
304    /// Create an ApiError from this error with the default message.
305    pub fn to_api_error(&self, message: Option<String>) -> ApiError {
306        ApiError {
307            code: self.code(),
308            message: message.unwrap_or(self.message().to_string()),
309        }
310    }
311
312    /// Get the FlussError for the given error code.
313    /// Returns `UnknownServerError` if the code is not recognized.
314    pub fn for_code(code: i32) -> Self {
315        match code {
316            -1 => FlussError::UnknownServerError,
317            0 => FlussError::None,
318            1 => FlussError::NetworkException,
319            2 => FlussError::UnsupportedVersion,
320            3 => FlussError::CorruptMessage,
321            4 => FlussError::DatabaseNotExist,
322            5 => FlussError::DatabaseNotEmpty,
323            6 => FlussError::DatabaseAlreadyExist,
324            7 => FlussError::TableNotExist,
325            8 => FlussError::TableAlreadyExist,
326            9 => FlussError::SchemaNotExist,
327            10 => FlussError::LogStorageException,
328            11 => FlussError::KvStorageException,
329            12 => FlussError::NotLeaderOrFollower,
330            13 => FlussError::RecordTooLargeException,
331            14 => FlussError::CorruptRecordException,
332            15 => FlussError::InvalidTableException,
333            16 => FlussError::InvalidDatabaseException,
334            17 => FlussError::InvalidReplicationFactor,
335            18 => FlussError::InvalidRequiredAcks,
336            19 => FlussError::LogOffsetOutOfRangeException,
337            20 => FlussError::NonPrimaryKeyTableException,
338            21 => FlussError::UnknownTableOrBucketException,
339            22 => FlussError::InvalidUpdateVersionException,
340            23 => FlussError::InvalidCoordinatorException,
341            24 => FlussError::FencedLeaderEpochException,
342            25 => FlussError::RequestTimeOut,
343            26 => FlussError::StorageException,
344            27 => FlussError::OperationNotAttemptedException,
345            28 => FlussError::NotEnoughReplicasAfterAppendException,
346            29 => FlussError::NotEnoughReplicasException,
347            30 => FlussError::SecurityTokenException,
348            31 => FlussError::OutOfOrderSequenceException,
349            32 => FlussError::DuplicateSequenceException,
350            33 => FlussError::UnknownWriterIdException,
351            34 => FlussError::InvalidColumnProjection,
352            35 => FlussError::InvalidTargetColumn,
353            36 => FlussError::PartitionNotExists,
354            37 => FlussError::TableNotPartitionedException,
355            38 => FlussError::InvalidTimestampException,
356            39 => FlussError::InvalidConfigException,
357            40 => FlussError::LakeStorageNotConfiguredException,
358            41 => FlussError::KvSnapshotNotExist,
359            42 => FlussError::PartitionAlreadyExists,
360            43 => FlussError::PartitionSpecInvalidException,
361            44 => FlussError::LeaderNotAvailableException,
362            45 => FlussError::PartitionMaxNumException,
363            46 => FlussError::AuthenticateException,
364            47 => FlussError::SecurityDisabledException,
365            48 => FlussError::AuthorizationException,
366            49 => FlussError::BucketMaxNumException,
367            50 => FlussError::FencedTieringEpochException,
368            51 => FlussError::RetriableAuthenticateException,
369            52 => FlussError::InvalidServerRackInfoException,
370            53 => FlussError::LakeSnapshotNotExist,
371            54 => FlussError::LakeTableAlreadyExist,
372            55 => FlussError::IneligibleReplicaException,
373            56 => FlussError::InvalidAlterTableException,
374            57 => FlussError::DeletionDisabledException,
375            _ => FlussError::UnknownServerError,
376        }
377    }
378}
379
380impl Display for FlussError {
381    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
382        write!(f, "{}", self.message())
383    }
384}
385
386impl From<ErrorResponse> for ApiError {
387    fn from(error_response: ErrorResponse) -> Self {
388        let fluss_error = FlussError::for_code(error_response.error_code);
389        fluss_error.to_api_error(error_response.error_message)
390    }
391}
392
393impl From<ApiError> for FlussError {
394    fn from(api_error: ApiError) -> Self {
395        FlussError::for_code(api_error.code)
396    }
397}
398
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn for_code_maps_known_and_unknown() {
        assert_eq!(FlussError::for_code(0), FlussError::None);
        assert_eq!(
            FlussError::for_code(FlussError::AuthorizationException.code()),
            FlussError::AuthorizationException
        );
        assert_eq!(FlussError::for_code(9999), FlussError::UnknownServerError);
    }

    /// Guards against drift between the enum discriminants and the `for_code` table.
    #[test]
    fn for_code_round_trips_every_known_code() {
        for code in -1..=57 {
            assert_eq!(
                FlussError::for_code(code).code(),
                code,
                "code {code} did not round-trip"
            );
        }
    }

    #[test]
    fn to_api_error_uses_message() {
        let err = FlussError::InvalidTableException.to_api_error(None);
        assert_eq!(err.code, FlussError::InvalidTableException.code());
        assert!(err.message.contains("invalid table"));
    }

    #[test]
    fn to_api_error_prefers_explicit_message() {
        let err = FlussError::TableNotExist.to_api_error(Some("custom".to_string()));
        assert_eq!(err.code, FlussError::TableNotExist.code());
        assert_eq!(err.message, "custom");
    }

    #[test]
    fn error_response_conversion_round_trip() {
        let response = ErrorResponse {
            error_code: FlussError::TableNotExist.code(),
            error_message: Some("missing".to_string()),
        };
        let api_error = ApiError::from(response);
        assert_eq!(api_error.code, FlussError::TableNotExist.code());
        assert_eq!(api_error.message, "missing");
        let fluss_error = FlussError::from(api_error);
        assert_eq!(fluss_error, FlussError::TableNotExist);
    }

    #[test]
    fn error_response_without_message_uses_default() {
        let response = ErrorResponse {
            error_code: FlussError::TableNotExist.code(),
            error_message: None,
        };
        let api_error = ApiError::from(response);
        assert_eq!(api_error.message, FlussError::TableNotExist.message());
    }

    #[test]
    fn error_response_with_unknown_code_maps_to_unknown_server_error() {
        let response = ErrorResponse {
            error_code: 9999,
            error_message: None,
        };
        let api_error = ApiError::from(response);
        assert_eq!(api_error.code, FlussError::UnknownServerError.code());
        assert_eq!(api_error.message, FlussError::UnknownServerError.message());
        assert!(!api_error.is_retriable());
    }

    #[test]
    fn display_uses_friendly_message() {
        assert_eq!(
            FlussError::TableNotExist.to_string(),
            "The table does not exist."
        );
    }

    #[test]
    fn is_retriable_known_retriable_errors() {
        let retriable = [
            FlussError::NetworkException,
            FlussError::CorruptMessage,
            FlussError::SchemaNotExist,
            FlussError::LogStorageException,
            FlussError::KvStorageException,
            FlussError::NotLeaderOrFollower,
            FlussError::CorruptRecordException,
            FlussError::UnknownTableOrBucketException,
            FlussError::RequestTimeOut,
            FlussError::StorageException,
            FlussError::NotEnoughReplicasAfterAppendException,
            FlussError::NotEnoughReplicasException,
            FlussError::LeaderNotAvailableException,
        ];
        for err in &retriable {
            assert!(err.is_retriable(), "{err:?} should be retriable");
        }
    }

    #[test]
    fn is_retriable_known_non_retriable_errors() {
        let non_retriable = [
            FlussError::UnknownServerError,
            FlussError::None,
            FlussError::TableNotExist,
            FlussError::AuthenticateException,
            FlussError::AuthorizationException,
            FlussError::RecordTooLargeException,
            FlussError::DeletionDisabledException,
            FlussError::InvalidCoordinatorException,
            FlussError::FencedLeaderEpochException,
            FlussError::FencedTieringEpochException,
            FlussError::RetriableAuthenticateException,
        ];
        for err in &non_retriable {
            assert!(!err.is_retriable(), "{err:?} should not be retriable");
        }
    }

    #[test]
    fn api_error_is_retriable_delegates_to_fluss_error() {
        let retriable_api = FlussError::RequestTimeOut.to_api_error(None);
        assert!(retriable_api.is_retriable());

        let permanent_api = FlussError::TableNotExist.to_api_error(None);
        assert!(!permanent_api.is_retriable());
    }
}
479
480        let permanent_api = FlussError::TableNotExist.to_api_error(None);
481        assert!(!permanent_api.is_retriable());
482    }
483}