1use {
2 prost::{DecodeError, Message},
3 richat_proto::richat::{
4 QuicSubscribeClose, QuicSubscribeCloseError, QuicSubscribeResponse,
5 QuicSubscribeResponseError,
6 },
7 std::io,
8 thiserror::Error,
9 tokio::io::{AsyncRead, AsyncReadExt},
10};
11
12#[derive(Debug, Error)]
13pub enum SubscribeError {
14 #[error("failed to send/recv data: {0}")]
15 Io(#[from] io::Error),
16 #[error("failed to send data: {0}")]
17 QuicWrite(#[from] quinn::WriteError),
18 #[error("connection lost: {0}")]
19 QuicConnection(#[from] quinn::ConnectionError),
20 #[error("failed to decode response: {0}")]
21 Decode(#[from] DecodeError),
22 #[error("unknown subscribe response error: {0}")]
23 Unknown(i32),
24 #[error("recv stream should be greater than zero")]
25 ZeroRecvStreams,
26 #[error("exceed max number of recv streams: {0}")]
27 ExceedRecvStreams(u32),
28 #[error("stream not initialized yet")]
29 NotInitialized,
30 #[error("replay from slot is not available, lowest available: {0}")]
31 ReplayFromSlotNotAvailable(u64),
32 #[error("request is too large")]
33 RequestSizeTooLarge,
34 #[error("x-token required")]
35 XTokenRequired,
36 #[error("x-token invalid")]
37 XTokenInvalid,
38}
39
40impl SubscribeError {
41 pub(crate) async fn parse_quic_response<R: AsyncRead + Unpin>(
42 recv: &mut R,
43 ) -> Result<String, Self> {
44 let size = recv.read_u64().await?;
45 let mut buf = vec![0; size as usize];
46 recv.read_exact(buf.as_mut_slice()).await?;
47
48 let response = QuicSubscribeResponse::decode(buf.as_slice())?;
49 if let Some(error) = response.error {
50 Err(match QuicSubscribeResponseError::try_from(error) {
51 Ok(QuicSubscribeResponseError::ZeroRecvStreams) => SubscribeError::ZeroRecvStreams,
52 Ok(QuicSubscribeResponseError::ExceedRecvStreams) => {
53 SubscribeError::ExceedRecvStreams(response.max_recv_streams())
54 }
55 Ok(QuicSubscribeResponseError::NotInitialized) => SubscribeError::NotInitialized,
56 Ok(QuicSubscribeResponseError::SlotNotAvailable) => {
57 SubscribeError::ReplayFromSlotNotAvailable(response.first_available_slot())
58 }
59 Ok(QuicSubscribeResponseError::RequestSizeTooLarge) => {
60 SubscribeError::RequestSizeTooLarge
61 }
62 Ok(QuicSubscribeResponseError::XTokenRequired) => SubscribeError::XTokenRequired,
63 Ok(QuicSubscribeResponseError::XTokenInvalid) => SubscribeError::XTokenInvalid,
64 Err(_error) => SubscribeError::Unknown(error),
65 })
66 } else {
67 Ok(response.version)
68 }
69 }
70}
71
72#[derive(Debug, Error)]
73pub enum ReceiveError {
74 #[error("failed to recv data: {0}")]
75 Io(#[from] io::Error),
76 #[error("failed to recv data: {0}")]
77 QuicRecv(#[from] quinn::ReadExactError),
78 #[error("failed to decode response: {0}")]
79 Decode(#[from] DecodeError),
80 #[error("stream failed: {0}")]
81 Status(#[from] tonic::Status),
82 #[error("unknown close error: {0}")]
83 Unknown(i32),
84 #[error("stream lagged")]
85 Lagged,
86 #[error("internal geyser stream is closed")]
87 Closed,
88}
89
90impl From<QuicSubscribeClose> for ReceiveError {
91 fn from(close: QuicSubscribeClose) -> Self {
92 match QuicSubscribeCloseError::try_from(close.error) {
93 Ok(QuicSubscribeCloseError::Lagged) => Self::Lagged,
94 Ok(QuicSubscribeCloseError::Closed) => Self::Closed,
95 Err(_error) => Self::Unknown(close.error),
96 }
97 }
98}
99
100impl ReceiveError {
101 pub fn is_eof(&self) -> bool {
102 if let Self::Io(error) = self {
103 error.kind() == io::ErrorKind::UnexpectedEof
104 } else {
105 false
106 }
107 }
108}