Skip to main content

richat_client/
error.rs

1use {
2    prost::{DecodeError, Message},
3    richat_proto::richat::{
4        QuicSubscribeClose, QuicSubscribeCloseError, QuicSubscribeResponse,
5        QuicSubscribeResponseError,
6    },
7    std::io,
8    thiserror::Error,
9    tokio::io::{AsyncRead, AsyncReadExt},
10};
11
12#[derive(Debug, Error)]
13pub enum SubscribeError {
14    #[error("failed to send/recv data: {0}")]
15    Io(#[from] io::Error),
16    #[error("failed to send data: {0}")]
17    QuicWrite(#[from] quinn::WriteError),
18    #[error("connection lost: {0}")]
19    QuicConnection(#[from] quinn::ConnectionError),
20    #[error("failed to decode response: {0}")]
21    Decode(#[from] DecodeError),
22    #[error("unknown subscribe response error: {0}")]
23    Unknown(i32),
24    #[error("recv stream should be greater than zero")]
25    ZeroRecvStreams,
26    #[error("exceed max number of recv streams: {0}")]
27    ExceedRecvStreams(u32),
28    #[error("stream not initialized yet")]
29    NotInitialized,
30    #[error("replay from slot is not available, lowest available: {0}")]
31    ReplayFromSlotNotAvailable(u64),
32    #[error("request is too large")]
33    RequestSizeTooLarge,
34    #[error("x-token required")]
35    XTokenRequired,
36    #[error("x-token invalid")]
37    XTokenInvalid,
38}
39
40impl SubscribeError {
41    pub(crate) async fn parse_quic_response<R: AsyncRead + Unpin>(
42        recv: &mut R,
43    ) -> Result<String, Self> {
44        let size = recv.read_u64().await?;
45        let mut buf = vec![0; size as usize];
46        recv.read_exact(buf.as_mut_slice()).await?;
47
48        let response = QuicSubscribeResponse::decode(buf.as_slice())?;
49        if let Some(error) = response.error {
50            Err(match QuicSubscribeResponseError::try_from(error) {
51                Ok(QuicSubscribeResponseError::ZeroRecvStreams) => SubscribeError::ZeroRecvStreams,
52                Ok(QuicSubscribeResponseError::ExceedRecvStreams) => {
53                    SubscribeError::ExceedRecvStreams(response.max_recv_streams())
54                }
55                Ok(QuicSubscribeResponseError::NotInitialized) => SubscribeError::NotInitialized,
56                Ok(QuicSubscribeResponseError::SlotNotAvailable) => {
57                    SubscribeError::ReplayFromSlotNotAvailable(response.first_available_slot())
58                }
59                Ok(QuicSubscribeResponseError::RequestSizeTooLarge) => {
60                    SubscribeError::RequestSizeTooLarge
61                }
62                Ok(QuicSubscribeResponseError::XTokenRequired) => SubscribeError::XTokenRequired,
63                Ok(QuicSubscribeResponseError::XTokenInvalid) => SubscribeError::XTokenInvalid,
64                Err(_error) => SubscribeError::Unknown(error),
65            })
66        } else {
67            Ok(response.version)
68        }
69    }
70}
71
72#[derive(Debug, Error)]
73pub enum ReceiveError {
74    #[error("failed to recv data: {0}")]
75    Io(#[from] io::Error),
76    #[error("failed to recv data: {0}")]
77    QuicRecv(#[from] quinn::ReadExactError),
78    #[error("failed to decode response: {0}")]
79    Decode(#[from] DecodeError),
80    #[error("stream failed: {0}")]
81    Status(#[from] tonic::Status),
82    #[error("unknown close error: {0}")]
83    Unknown(i32),
84    #[error("stream lagged")]
85    Lagged,
86    #[error("internal geyser stream is closed")]
87    Closed,
88}
89
90impl From<QuicSubscribeClose> for ReceiveError {
91    fn from(close: QuicSubscribeClose) -> Self {
92        match QuicSubscribeCloseError::try_from(close.error) {
93            Ok(QuicSubscribeCloseError::Lagged) => Self::Lagged,
94            Ok(QuicSubscribeCloseError::Closed) => Self::Closed,
95            Err(_error) => Self::Unknown(close.error),
96        }
97    }
98}
99
100impl ReceiveError {
101    pub fn is_eof(&self) -> bool {
102        if let Self::Io(error) = self {
103            error.kind() == io::ErrorKind::UnexpectedEof
104        } else {
105            false
106        }
107    }
108}