Skip to main content

razor_stream/
error.rs

1//! Error handling for razor-stream RPC framework.
2//!
3//! This module provides the error types and traits used for RPC error handling.
4//! It supports both internal RPC errors and user-defined custom errors.
5//!
6//! # Default Supported Error Types
7//!
8//! The following types have built-in `RpcErrCodec` implementations:
9//!
10//! - **Numeric types**: `i8`, `u8`, `i16`, `u16`, `i32`, `u32`
11//!   - Encoded as `u32` values for efficient transport
12//!   - Useful for errno-style error codes
13//!
14//! - **`()` (unit type)**
15//!   - Encoded as `0u32`
16//!   - Used when no additional error information is needed
17//!
18//! - `String` and &str
19//!   - Encoded as UTF-8 bytes
20//!   - Useful for descriptive error messages
21//!
22//! - `nix::errno::Errno`
23//!   - Encoded as `u32` values
24//!   - For system-level error codes
25//!
26//! # Custom Error Types
27//!
28//! To use your own error type in RPC methods, implement the [`RpcErrCodec`] trait.
29//!
30//! Keep in mind that your type should can be serialized into or deserialized from one of the
31//! variants of [EncodedErr].
32//! You have to choose in-between numeric or string:
33//!
34//! ## Approach 1: Numeric Encoding (errno-style)
35//!
36//! Use this when you want compact, efficient numeric error codes.
37//!
38//! ```rust
39//! use num_enum::TryFromPrimitive;
40//! use razor_stream::{Codec, error::{RpcErrCodec, RpcIntErr, EncodedErr}};
41//!
42//! #[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
43//! #[repr(u32)]
44//! pub enum MyErrorCode {
45//!     NotFound = 1,
46//!     PermissionDenied = 2,
47//!     Timeout = 3,
48//! }
49//!
50//! impl RpcErrCodec for MyErrorCode {
51//!     fn encode<C: Codec>(&self, _codec: &C) -> EncodedErr {
52//!         EncodedErr::Num(*self as u32)
53//!     }
54//!
55//!     fn decode<C: Codec>(_codec: &C, buf: Result<u32, &[u8]>) -> Result<Self, ()> {
56//!         if let Ok(code) = buf {
57//!             return MyErrorCode::try_from(code).map_err(|_| ());
58//!         }
59//!         Err(())
60//!     }
61//!
62//!     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
63//!         std::fmt::Debug::fmt(self, f)
64//!     }
65//! }
66//! ```
67//!
68//! ## Approach 2: String Encoding (with strum)
69//!
70//! Use this when you want human-readable error strings that can be serialized/deserialized.
71//! Uses `EncodedErr::Static` to avoid heap allocation during encoding.
72//!
73//! ```rust
74//! use razor_stream::{Codec, error::{RpcErrCodec, RpcIntErr, EncodedErr}};
75//! use std::str::FromStr;
76//! use strum::{Display, EnumString, IntoStaticStr};
77//!
78//! #[derive(Debug, Clone, Display, EnumString, IntoStaticStr, PartialEq)]
79//! pub enum MyStringError {
80//!     #[strum(serialize = "not_found")]
81//!     NotFound,
82//!     #[strum(serialize = "permission_denied")]
83//!     PermissionDenied,
84//!     #[strum(serialize = "timeout")]
85//!     Timeout,
86//! }
87//!
88//! impl RpcErrCodec for MyStringError {
89//!     fn encode<C: Codec>(&self, _codec: &C) -> EncodedErr {
90//!         // Use EncodedErr::Static to avoid heap allocation, with the help of strum::IntoStaticStr
91//!         EncodedErr::Static(self.into())
92//!     }
93//!
94//!     fn decode<C: Codec>(_codec: &C, buf: Result<u32, &[u8]>) -> Result<Self, ()> {
95//!         // Decode with zero-copy: directly parse from &[u8] without allocating
96//!         if let Err(bytes) = buf {
97//!             // Safety: error strings are valid ASCII (subset of UTF-8), so unchecked is safe
98//!             let s = unsafe { std::str::from_utf8_unchecked(bytes) };
99//!             // Use strum's EnumString derive to parse from string
100//!             return MyStringError::from_str(s).map_err(|_| ());
101//!         }
102//!         Err(())
103//!     }
104//!
105//!     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
106//!         std::fmt::Display::fmt(self, f)
107//!     }
108//! }
109//! ```
110
111use crate::Codec;
112use std::fmt;
113
114/// "rpc_" prefix is reserved for internal error, you should avoid conflict with it
115pub const RPC_ERR_PREFIX: &str = "rpc_";
116
117/// A error type defined by client-side user logic
118///
119/// Due to possible decode
120#[derive(thiserror::Error)]
121pub enum RpcError<E: RpcErrCodec> {
122    User(#[from] E),
123    Rpc(#[from] RpcIntErr),
124}
125
126impl<E: RpcErrCodec> fmt::Display for RpcError<E> {
127    #[inline]
128    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
129        match self {
130            Self::User(e) => RpcErrCodec::fmt(e, f),
131            Self::Rpc(e) => fmt::Display::fmt(e, f),
132        }
133    }
134}
135
136impl<E: RpcErrCodec> fmt::Debug for RpcError<E> {
137    #[inline]
138    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
139        fmt::Display::fmt(self, f)
140    }
141}
142
143impl<E: RpcErrCodec> std::cmp::PartialEq<RpcIntErr> for RpcError<E> {
144    #[inline]
145    fn eq(&self, other: &RpcIntErr) -> bool {
146        if let Self::Rpc(r) = self
147            && r == other
148        {
149            return true;
150        }
151        false
152    }
153}
154
155impl<E: RpcErrCodec + PartialEq> std::cmp::PartialEq<E> for RpcError<E> {
156    #[inline]
157    fn eq(&self, other: &E) -> bool {
158        if let Self::User(r) = self {
159            return r == other;
160        }
161        false
162    }
163}
164
165impl<E: RpcErrCodec + PartialEq> std::cmp::PartialEq<RpcError<E>> for RpcError<E> {
166    #[inline]
167    fn eq(&self, other: &Self) -> bool {
168        match self {
169            Self::Rpc(r) => {
170                if let Self::Rpc(o) = other {
171                    return r == o;
172                }
173            }
174            Self::User(r) => {
175                if let Self::User(o) = other {
176                    return r == o;
177                }
178            }
179        }
180        false
181    }
182}
183
184impl From<&str> for RpcError<String> {
185    #[inline]
186    fn from(e: &str) -> Self {
187        Self::User(e.to_string())
188    }
189}
190
191/// Serialize and Deserialize trait for custom RpcError
192///
193/// There is only two forms for rpc transport layer, u32 and String, choose one of them.
194///
195/// Because Rust does not allow overlapping impl, we only imple RpcErrCodec trait by default for the following types:
196/// - ()
197/// - from i8 to u32
198/// - String
199/// - nix::errno::Errno
200///
201/// If you use other type as error, you can implement manually:
202///
203/// # Example with serde_derive
204/// ```rust
205/// use serde_derive::{Serialize, Deserialize};
206/// use razor_stream::{Codec, error::{RpcErrCodec, RpcIntErr, EncodedErr}};
207/// use strum::Display;
208/// #[derive(Serialize, Deserialize, Debug)]
209/// pub enum MyError {
210///     NoSuchFile,
211///     TooManyRequest,
212/// }
213///
214/// impl RpcErrCodec for MyError {
215///     #[inline(always)]
216///     fn encode<C: Codec>(&self, codec: &C) -> EncodedErr {
217///         match codec.encode(self) {
218///             Ok(buf)=>EncodedErr::Buf(buf),
219///             Err(())=>EncodedErr::Rpc(RpcIntErr::Encode),
220///         }
221///     }
222///
223///     #[inline(always)]
224///     fn decode<C: Codec>(codec: &C, buf: Result<u32, &[u8]>) -> Result<Self, ()> {
225///         if let Err(b) = buf {
226///             return codec.decode(b);
227///         } else {
228///             Err(())
229///         }
230///     }
231///     #[inline(always)]
232///     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
233///         std::fmt::Debug::fmt(self, f)
234///     }
235/// }
236/// ```
237///
238/// # Example with num_enum
239///
240/// ```rust
241/// use num_enum::TryFromPrimitive;
242/// use razor_stream::{Codec, error::{RpcErrCodec, RpcIntErr, EncodedErr}};
243///
244/// // Define your error codes as a C-like enum with explicit values
245/// // You can use num_enum's TryFromPrimitive for safer deserialization
246/// #[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
247/// #[repr(u32)]
248/// pub enum MyRpcErrorCode {
249///     /// Service is not available
250///     ServiceUnavailable = 1,
251///     /// Request timed out
252///     RequestTimeout = 2,
253///     /// Invalid parameter
254///     InvalidParameter = 3,
255///     /// Resource not found
256///     NotFound = 4,
257/// }
258///
259/// impl RpcErrCodec for MyRpcErrorCode {
260///     #[inline(always)]
261///     fn encode<C: Codec>(&self, _codec: &C) -> EncodedErr {
262///         // Manual conversion to u32 (no IntoPrimitive needed)
263///         let code: u32 = *self as u32;
264///         EncodedErr::Num(code)
265///     }
266///
267///     #[inline(always)]
268///     fn decode<C: Codec>(_codec: &C, buf: Result<u32, &[u8]>) -> Result<Self, ()> {
269///         if let Ok(code) = buf {
270///             // Using num_enum for safe deserialization (TryFromPrimitive)
271///             return MyRpcErrorCode::try_from(code).map_err(|_| ());
272///         }
273///         Err(())
274///     }
275///
276///     #[inline(always)]
277///     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
278///         std::fmt::Debug::fmt(self, f)
279///     }
280/// }
281/// ```
282pub trait RpcErrCodec: Send + Sized + 'static + Unpin {
283    fn encode<C: Codec>(&self, codec: &C) -> EncodedErr;
284
285    fn decode<C: Codec>(codec: &C, buf: Result<u32, &[u8]>) -> Result<Self, ()>;
286
287    /// You can choose to use std::fmt::Debug or std::fmt::Display for the type.
288    ///
289    /// NOTE that this method exists because rust does not have Display for ().
290    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result;
291
292    /// Check if this error should trigger a failover/retry and return the redirect address.
293    ///
294    /// This is used by FailoverPool to implement leader redirection for multi-node
295    /// master-slave services. When a follower node receives a write request, it can
296    /// return a redirect error with the leader's address.
297    ///
298    /// Returns:
299    /// - `Ok(Some(addr))`: Retry to the specific address
300    /// - `Ok(None)`: Retry to next available node (round-robin or leader election)
301    /// - `Err(())`: Don't retry, return error to user
302    ///
303    /// Default implementation returns `Err(())`, meaning no retry.
304    ///
305    /// # Example
306    /// ```rust
307    /// use razor_stream::{Codec, error::{RpcErrCodec, EncodedErr}};
308    ///
309    /// #[derive(Debug, Clone, PartialEq)]
310    /// pub enum MyError {
311    ///     Redirect(String),
312    ///     NotLeader,
313    ///     OtherError,
314    /// }
315    ///
316    /// impl RpcErrCodec for MyError {
317    ///     fn encode<C: Codec>(&self, _codec: &C) -> EncodedErr {
318    ///         // ... encode implementation
319    ///         # EncodedErr::Static("error")
320    ///     }
321    ///
322    ///     fn decode<C: Codec>(_codec: &C, _buf: Result<u32, &[u8]>) -> Result<Self, ()> {
323    ///         // ... decode implementation
324    ///         # Err(())
325    ///     }
326    ///
327    ///     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
328    ///         std::fmt::Debug::fmt(self, f)
329    ///     }
330    ///
331    ///     fn should_failover(&self) -> Result<Option<&str>, ()> {
332    ///         match self {
333    ///             Self::Redirect(addr) => Ok(Some(addr)),
334    ///             Self::NotLeader => Ok(None),  // Retry to next node
335    ///             Self::OtherError => Err(()),  // Don't retry
336    ///         }
337    ///     }
338    /// }
339    /// ```
340    #[inline(always)]
341    fn should_failover(&self) -> Result<Option<&str>, ()> {
342        Err(())
343    }
344}
345
346macro_rules! impl_rpc_error_for_num {
347    ($t: tt) => {
348        impl RpcErrCodec for $t {
349            #[inline(always)]
350            fn encode<C: Codec>(&self, _codec: &C) -> EncodedErr {
351                EncodedErr::Num(*self as u32)
352            }
353
354            #[inline(always)]
355            fn decode<C: Codec>(_codec: &C, buf: Result<u32, &[u8]>) -> Result<Self, ()> {
356                if let Ok(i) = buf {
357                    if i <= $t::MAX as u32 {
358                        return Ok(i as Self);
359                    }
360                }
361                Err(())
362            }
363
364            #[inline(always)]
365            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
366                write!(f, "errno {}", self)
367            }
368        }
369    };
370}
371
372impl_rpc_error_for_num!(i8);
373impl_rpc_error_for_num!(u8);
374impl_rpc_error_for_num!(i16);
375impl_rpc_error_for_num!(u16);
376impl_rpc_error_for_num!(i32);
377impl_rpc_error_for_num!(u32);
378
379impl RpcErrCodec for nix::errno::Errno {
380    #[inline(always)]
381    fn encode<C: Codec>(&self, _codec: &C) -> EncodedErr {
382        EncodedErr::Num(*self as u32)
383    }
384
385    #[inline(always)]
386    fn decode<C: Codec>(_codec: &C, buf: Result<u32, &[u8]>) -> Result<Self, ()> {
387        if let Ok(i) = buf
388            && i <= i32::MAX as u32
389        {
390            return Ok(Self::from_raw(i as i32));
391        }
392        Err(())
393    }
394
395    #[inline(always)]
396    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
397        write!(f, "{:?}", self)
398    }
399}
400
401impl RpcErrCodec for () {
402    #[inline(always)]
403    fn encode<C: Codec>(&self, _codec: &C) -> EncodedErr {
404        EncodedErr::Num(0u32)
405    }
406
407    #[inline(always)]
408    fn decode<C: Codec>(_codec: &C, buf: Result<u32, &[u8]>) -> Result<Self, ()> {
409        if let Ok(i) = buf
410            && i == 0
411        {
412            return Ok(());
413        }
414        Err(())
415    }
416
417    #[inline(always)]
418    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
419        write!(f, "err")
420    }
421}
422
423impl RpcErrCodec for String {
424    #[inline(always)]
425    fn encode<C: Codec>(&self, _codec: &C) -> EncodedErr {
426        EncodedErr::Buf(Vec::from(self.as_bytes()))
427    }
428    #[inline(always)]
429    fn decode<C: Codec>(_codec: &C, buf: Result<u32, &[u8]>) -> Result<Self, ()> {
430        if let Err(s) = buf
431            && let Ok(s) = str::from_utf8(s)
432        {
433            return Ok(s.to_string());
434        }
435        Err(())
436    }
437
438    #[inline(always)]
439    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
440        write!(f, "{}", self)
441    }
442}
443
444/// RpcIntErr represent internal error from the framework
445///
446/// **NOTE**:
447/// - This error type is serialized in string, "rpc_" prefix is reserved for internal error, you
448///   should avoid conflict with it.
449/// - We presume the variants less than RpcIntErr::Method is retriable errors
450#[derive(
451    strum::Display,
452    strum::EnumString,
453    strum::AsRefStr,
454    PartialEq,
455    PartialOrd,
456    Clone,
457    thiserror::Error,
458)]
459#[repr(u8)]
460pub enum RpcIntErr {
461    /// Ping or connect error
462    #[strum(serialize = "rpc_unreachable")]
463    Unreachable = 0,
464    /// IO error
465    #[strum(serialize = "rpc_io_err")]
466    IO = 1,
467    /// Task timeout
468    #[strum(serialize = "rpc_timeout")]
469    Timeout = 2,
470    /// Method not found
471    #[strum(serialize = "rpc_method_notfound")]
472    Method = 3,
473    /// service notfound
474    #[strum(serialize = "rpc_service_notfound")]
475    Service = 4,
476    /// Encode Error
477    #[strum(serialize = "rpc_encode")]
478    Encode = 5,
479    /// Decode Error
480    #[strum(serialize = "rpc_decode")]
481    Decode = 6,
482    /// Internal error
483    #[strum(serialize = "rpc_internal_err")]
484    Internal = 7,
485    /// invalid version number in rpc header
486    #[strum(serialize = "rpc_invalid_ver")]
487    Version = 8,
488}
489
490// The default Debug derive just ignore strum customized string, by strum only have a Display derive
491impl fmt::Debug for RpcIntErr {
492    #[inline]
493    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
494        fmt::Display::fmt(self, f)
495    }
496}
497
498impl RpcIntErr {
499    #[inline]
500    pub fn as_bytes(&self) -> &[u8] {
501        self.as_ref().as_bytes()
502    }
503}
504
505impl From<std::io::Error> for RpcIntErr {
506    #[inline(always)]
507    fn from(_e: std::io::Error) -> Self {
508        Self::IO
509    }
510}
511
512/// A container for error message parse from / send into transport
513#[derive(Debug)]
514pub enum EncodedErr {
515    /// The ClientTransport should try the best to parse it from string with "rpc_" prefix
516    Rpc(RpcIntErr),
517    /// For nix errno and the like
518    Num(u32),
519    /// Only for server-side to encode err.
520    ///
521    /// The ClientTransport cannot decode into static type
522    Static(&'static str),
523    /// The ClientTransport will fallback to `Vec<u8>` after try to parse  RpcIntErr and  num
524    Buf(Vec<u8>),
525}
526
527impl EncodedErr {
528    #[inline]
529    pub fn try_as_str(&self) -> Result<&str, ()> {
530        match self {
531            Self::Static(s) => return Ok(s),
532            Self::Buf(b) => {
533                if let Ok(s) = str::from_utf8(b) {
534                    return Ok(s);
535                }
536            }
537            _ => {}
538        }
539        Err(())
540    }
541}
542
543/// Just for macro test
544impl std::cmp::PartialEq<EncodedErr> for EncodedErr {
545    fn eq(&self, other: &EncodedErr) -> bool {
546        match self {
547            Self::Rpc(e) => {
548                if let Self::Rpc(o) = other {
549                    return e == o;
550                }
551            }
552            Self::Num(e) => {
553                if let Self::Num(o) = other {
554                    return e == o;
555                }
556            }
557            Self::Static(s) => {
558                if let Ok(o) = other.try_as_str() {
559                    return *s == o;
560                }
561            }
562            Self::Buf(s) => {
563                if let Self::Buf(o) = other {
564                    return s == o;
565                } else if let Ok(o) = other.try_as_str() {
566                    // other's type is not Buf
567                    if let Ok(_s) = str::from_utf8(s) {
568                        return _s == o;
569                    }
570                }
571            }
572        }
573        false
574    }
575}
576
577impl fmt::Display for EncodedErr {
578    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
579        match self {
580            Self::Rpc(e) => e.fmt(f),
581            Self::Num(no) => write!(f, "errno {}", no),
582            Self::Static(s) => write!(f, "{}", s),
583            Self::Buf(b) => match str::from_utf8(b) {
584                Ok(s) => {
585                    write!(f, "{}", s)
586                }
587                Err(_) => {
588                    write!(f, "err blob {} length", b.len())
589                }
590            },
591        }
592    }
593}
594
595impl From<RpcIntErr> for EncodedErr {
596    #[inline(always)]
597    fn from(e: RpcIntErr) -> Self {
598        Self::Rpc(e)
599    }
600}
601
602#[cfg(test)]
603mod tests {
604    use super::*;
605    use nix::errno::Errno;
606    use std::str::FromStr;
607
608    #[test]
609    fn test_internal_error() {
610        println!("{}", RpcIntErr::Internal);
611        println!("{:?}", RpcIntErr::Internal);
612        let s = RpcIntErr::Timeout.as_ref();
613        println!("RpcIntErr::Timeout as {}", s);
614        let e = RpcIntErr::from_str(s).expect("parse");
615        assert_eq!(e, RpcIntErr::Timeout);
616        assert!(RpcIntErr::from_str("timeoutss").is_err());
617        assert!(RpcIntErr::Timeout < RpcIntErr::Method);
618        assert!(RpcIntErr::IO < RpcIntErr::Method);
619        assert!(RpcIntErr::Unreachable < RpcIntErr::Method);
620    }
621
622    #[test]
623    fn test_rpc_error_default() {
624        let e = RpcError::<i32>::from(1i32);
625        println!("err {:?} {}", e, e);
626
627        let e = RpcError::<Errno>::from(Errno::EIO);
628        println!("err {:?} {}", e, e);
629
630        let e = RpcError::<String>::from("err_str");
631        println!("err {:?} {}", e, e);
632        let e2 = RpcError::<String>::from("err_str".to_string());
633        assert_eq!(e, e2);
634
635        let e = RpcError::<String>::from(RpcIntErr::IO);
636        println!("err {:?} {}", e, e);
637
638        let _e: Result<(), RpcIntErr> = Err(RpcIntErr::IO);
639
640        // let e: Result<(), RpcError::<String>> = _e.into();
641        // Not allow by rust, and orphan rule prevent we do
642        // `impl<E: RpcErrCodec> From<Result<(), RpcIntErr>> for Result<(), RpcError<E>>`
643
644        // it's ok to use map_err
645        let e: Result<(), RpcError<String>> = _e.map_err(|e| e.into());
646        println!("err {:?}", e);
647    }
648
649    //#[test]
650    //fn test_rpc_error_string_enum() {
651    //    not supported by default, should provide a derive
652    //    #[derive(
653    //        Debug, strum::Display, strum::EnumString, strum::AsRefStr, PartialEq, Clone, thiserror::Error,
654    //    )]
655    //    enum MyError {
656    //        OhMyGod,
657    //    }
658    //    let e = RpcError::<MyError>::from(MyError::OhMyGod);
659    //    println!("err {:?} {}", e, e);
660    //}
661}