use std::fmt::{Display, Formatter};
use flv_util::string_helper::upper_cammel_case_to_sentence;
use fluvio_protocol::{Encoder, Decoder};
use crate::smartmodule::SmartModuleRuntimeError;
#[repr(i16)]
#[derive(thiserror::Error, Encoder, Decoder, PartialEq, Debug, Clone)]
#[non_exhaustive]
pub enum ErrorCode {
#[fluvio(tag = -1)]
#[error("An unknown server error occurred")]
UnknownServerError,
#[fluvio(tag = 0)]
#[error("ErrorCode indicated success. If you see this it is likely a bug.")]
None,
#[fluvio(tag = 2)]
#[error("Other error: {0}")]
Other(String),
#[fluvio(tag = 1)]
#[error("Offset out of range")]
OffsetOutOfRange,
#[fluvio(tag = 6)]
#[error("the given SPU is not the leader for the partition")]
NotLeaderForPartition,
#[fluvio(tag = 7)]
#[error("the request '{kind}' exceeded the timeout {timeout_ms} ms")]
RequestTimedOut { timeout_ms: u64, kind: RequestKind },
#[fluvio(tag = 10)]
#[error("the message is too large to send")]
MessageTooLarge,
#[fluvio(tag = 13)]
#[error("permission denied")]
PermissionDenied,
#[fluvio(tag = 56)]
#[error("a storage error occurred")]
StorageError,
#[fluvio(tag = 60)]
#[error("invalid create request")]
InvalidCreateRequest,
#[fluvio(tag = 61)]
#[error("invalid Delete request")]
InvalidDeleteRequest,
#[fluvio(tag = 1000)]
#[error("an error occurred on the SPU")]
SpuError,
#[fluvio(tag = 1001)]
#[error("failed to register an SPU")]
SpuRegisterationFailed,
#[fluvio(tag = 1002)]
#[error("the SPU is offline")]
SpuOffline,
#[fluvio(tag = 1003)]
#[error("the SPU was not found")]
SpuNotFound,
#[fluvio(tag = 1004)]
#[error("the SPU already exists")]
SpuAlreadyExists,
#[fluvio(tag = 2000)]
#[error("a topic error occurred")]
TopicError,
#[fluvio(tag = 2001)]
#[error("the topic was not found")]
TopicNotFound,
#[fluvio(tag = 2002)]
#[error("the topic already exists")]
TopicAlreadyExists,
#[fluvio(tag = 2003)]
#[error("the topic has not been initialized")]
TopicPendingInitialization,
#[fluvio(tag = 2004)]
#[error("the topic configuration is invalid")]
TopicInvalidConfiguration,
#[fluvio(tag = 2005)]
#[error("the topic is not provisioned")]
TopicNotProvisioned,
#[fluvio(tag = 2006)]
#[error("the topic name is invalid")]
TopicInvalidName,
#[fluvio(tag = 3000)]
#[error("the partition is not initialized")]
PartitionPendingInitialization,
#[fluvio(tag = 3001)]
#[error("the partition is not a leader")]
PartitionNotLeader,
#[fluvio(tag = 3002)]
#[error("the fetch session was not found")]
FetchSessionNotFoud,
#[deprecated(since = "0.9.13")]
#[fluvio(tag = 4000)]
#[error("a legacy SmartModule error occurred")]
LegacySmartModuleError(#[from] LegacySmartModuleError),
#[fluvio(tag = 5000)]
#[error("an error occurred while managing a connector")]
ManagedConnectorError,
#[fluvio(tag = 5001)]
#[error("the managed connector was not found")]
ManagedConnectorNotFound,
#[fluvio(tag = 5002)]
#[error("an error occurred while managing a connector")]
ManagedConnectorAlreadyExists,
#[fluvio(tag = 6000)]
#[error("an error occurred while managing a SmartModule")]
SmartModuleError,
#[fluvio(tag = 6001)]
#[error("SmartModule {name} was not found")]
SmartModuleNotFound { name: String },
#[fluvio(tag = 6002)]
#[error("SmartModule is invalid: {error}")]
SmartModuleInvalid { error: String, name: Option<String> },
#[fluvio(tag = 6003)]
#[error("SmartModule is not a valid '{kind}' SmartModule due to {error}. Are you missing a #[smartmodule({kind})] attribute?")]
SmartModuleInvalidExports { error: String, kind: String },
#[fluvio(tag = 6004)]
#[error("SmartModule runtime error {0}")]
SmartModuleRuntimeError(SmartModuleRuntimeError),
#[fluvio(tag = 7000)]
#[error("a tableformat error occurred")]
TableFormatError,
#[fluvio(tag = 7001)]
#[error("the tableformat was not found")]
TableFormatNotFound,
#[fluvio(tag = 7002)]
#[error("the tableformat already exists")]
TableFormatAlreadyExists,
#[fluvio(tag = 8000)]
#[error("DerivedStream object error")]
DerivedStreamObjectError,
#[fluvio(tag = 8001)]
#[error("the derivedstream was not found")]
DerivedStreamNotFound(String),
#[fluvio(tag = 8002)]
#[error("the derivedstream join data cannot be fetched")]
DerivedStreamJoinFetchError,
#[fluvio(tag = 8003)]
#[error("the derivedstream {0} is invalid")]
DerivedStreamInvalid(String),
#[error("can't do recursive derivedstream yet: {0}->{1}")]
DerivedStreamRecursion(String, String),
#[fluvio(tag = 9000)]
#[error("a compression error occurred in the SPU")]
CompressionError,
}
impl Default for ErrorCode {
fn default() -> ErrorCode {
ErrorCode::None
}
}
impl ErrorCode {
    /// Returns `true` only for [`ErrorCode::None`], the success marker.
    pub fn is_ok(&self) -> bool {
        matches!(self, Self::None)
    }

    /// Produces a sentence-style rendering of this code.
    ///
    /// [`ErrorCode::None`] yields an empty string. Any other variant has its
    /// `Debug` representation passed through `upper_cammel_case_to_sentence`.
    pub fn to_sentence(&self) -> String {
        if self.is_ok() {
            return String::new();
        }
        let debug_text = format!("{:?}", self);
        upper_cammel_case_to_sentence(debug_text, true)
    }

    /// Returns `true` for every variant other than [`ErrorCode::None`].
    pub fn is_error(&self) -> bool {
        !matches!(self, Self::None)
    }
}
/// SmartModule failures in their legacy form.
///
/// Values of this type are wrapped by the deprecated
/// `ErrorCode::LegacySmartModuleError` variant.
// NOTE(review): no variant here declares an explicit `#[fluvio(tag = ...)]`, so
// the encoded tag presumably depends on declaration order -- confirm before
// reordering or inserting variants.
#[derive(thiserror::Error, Debug, Clone, PartialEq, Encoder, Decoder)]
pub enum LegacySmartModuleError {
/// Wraps a `SmartModuleRuntimeError`; `#[from]` also generates the `From` conversion.
#[error("Runtime error")]
Runtime(#[from] SmartModuleRuntimeError),
/// The WASM module was rejected; the string is shown in the message.
#[error("WASM Module error: {0}")]
InvalidWasmModule(String),
/// The WASM module is not a valid module of the kind named by the string.
#[error("WASM module is not a valid '{0}' DerivedStream. Are you missing a #[smartmodule({0})] attribute?")]
InvalidDerivedStreamModule(String),
}
impl Default for LegacySmartModuleError {
fn default() -> Self {
Self::Runtime(Default::default())
}
}
/// Kind of request named by an `ErrorCode::RequestTimedOut` error.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must match it with a
/// wildcard arm.
#[derive(Debug, Clone, PartialEq, Encoder, Decoder)]
#[non_exhaustive]
pub enum RequestKind {
/// A produce request.
Produce,
}
impl Default for RequestKind {
fn default() -> Self {
RequestKind::Produce
}
}
/// Displays the request kind using its `Debug` representation.
impl Display for RequestKind {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Formats through fresh arguments, so the caller's width and fill
        // flags are not applied to the output.
        f.write_fmt(format_args!("{:?}", self))
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Checks that each listed variant still encodes to its expected `i16` tag
    /// and decodes back to an equal value.
    #[test]
    fn test_protocol_tags_unchanged() {
        // Encodes `$variant` at `$version`, checks that the first two bytes
        // are the big-endian `$tag`, then decodes into a default `ErrorCode`
        // and checks that the result equals the input.
        macro_rules! assert_tag {
            ($variant:expr, $tag:expr, $version:expr) => {{
                let mut data = Vec::new();
                let mut value = ErrorCode::default();
                // `unwrap_or_else` keeps the failure message lazy: it is only
                // built when the call actually fails.
                fluvio_protocol::Encoder::encode(&$variant, &mut data, $version).unwrap_or_else(
                    |err| panic!("Failed to encode {}: {:?}", stringify!($variant), err),
                );
                assert_eq!(
                    data[..2],
                    ($tag as i16).to_be_bytes(),
                    "Data check failed for {}",
                    stringify!($variant)
                );
                fluvio_protocol::Decoder::decode(
                    &mut value,
                    &mut std::io::Cursor::new(&data),
                    $version,
                )
                .unwrap_or_else(|err| {
                    panic!("Failed to decode {}: {:?}", stringify!($variant), err)
                });
                assert_eq!(
                    &value,
                    &$variant,
                    "Value check failed for {}",
                    stringify!($variant)
                );
            }};
        }

        // Generic errors
        assert_tag!(ErrorCode::UnknownServerError, -1, 0);
        assert_tag!(ErrorCode::None, 0, 0);
        assert_tag!(ErrorCode::OffsetOutOfRange, 1, 0);
        assert_tag!(ErrorCode::NotLeaderForPartition, 6, 0);
        assert_tag!(
            ErrorCode::RequestTimedOut {
                kind: RequestKind::Produce,
                timeout_ms: 1
            },
            7,
            0
        );
        assert_tag!(ErrorCode::MessageTooLarge, 10, 0);
        assert_tag!(ErrorCode::PermissionDenied, 13, 0);
        assert_tag!(ErrorCode::StorageError, 56, 0);
        assert_tag!(ErrorCode::InvalidCreateRequest, 60, 0);
        assert_tag!(ErrorCode::InvalidDeleteRequest, 61, 0);

        // SPU errors
        assert_tag!(ErrorCode::SpuError, 1000, 0);
        assert_tag!(ErrorCode::SpuRegisterationFailed, 1001, 0);
        assert_tag!(ErrorCode::SpuOffline, 1002, 0);
        assert_tag!(ErrorCode::SpuNotFound, 1003, 0);
        assert_tag!(ErrorCode::SpuAlreadyExists, 1004, 0);

        // Topic errors
        assert_tag!(ErrorCode::TopicError, 2000, 0);
        assert_tag!(ErrorCode::TopicNotFound, 2001, 0);
        assert_tag!(ErrorCode::TopicAlreadyExists, 2002, 0);
        assert_tag!(ErrorCode::TopicPendingInitialization, 2003, 0);
        assert_tag!(ErrorCode::TopicInvalidConfiguration, 2004, 0);
        assert_tag!(ErrorCode::TopicNotProvisioned, 2005, 0);
        assert_tag!(ErrorCode::TopicInvalidName, 2006, 0);

        // Partition errors
        assert_tag!(ErrorCode::PartitionPendingInitialization, 3000, 0);
        assert_tag!(ErrorCode::PartitionNotLeader, 3001, 0);
        assert_tag!(ErrorCode::FetchSessionNotFoud, 3002, 0);

        // Managed connector errors
        assert_tag!(ErrorCode::ManagedConnectorError, 5000, 0);
        assert_tag!(ErrorCode::ManagedConnectorNotFound, 5001, 0);
        assert_tag!(ErrorCode::ManagedConnectorAlreadyExists, 5002, 0);

        // SmartModule errors (unit variant only)
        assert_tag!(ErrorCode::SmartModuleError, 6000, 0);

        // TableFormat errors
        assert_tag!(ErrorCode::TableFormatError, 7000, 0);
        assert_tag!(ErrorCode::TableFormatNotFound, 7001, 0);
        assert_tag!(ErrorCode::TableFormatAlreadyExists, 7002, 0);

        // DerivedStream errors (unit variants only)
        assert_tag!(ErrorCode::DerivedStreamObjectError, 8000, 0);
        assert_tag!(ErrorCode::DerivedStreamJoinFetchError, 8002, 0);

        // Compression errors
        assert_tag!(ErrorCode::CompressionError, 9000, 0);
    }

    /// Compile-time check that `ErrorCode` implements `std::error::Error`.
    #[test]
    fn test_errorcode_impls_error() {
        fn assert_error<E: std::error::Error>() {}
        assert_error::<ErrorCode>();
    }
}