1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use thiserror::Error;
use crate::{
client_responds::{ConnectRespond, Respond},
command::CommandWithParsed,
commands::{MessageCommand, PingCommand, PongCommand},
protos::protobuf::pulsar_api::{BaseCommand, BaseCommand_Type as Type},
};
mod handle_ack_response;
mod handle_connected;
mod handle_error;
mod handle_message;
mod handle_ping;
mod handle_pong;
mod handle_producer_success;
mod handle_send_error;
mod handle_send_receipt;
mod handle_success;
pub mod errors;
pub mod on_responded;
pub mod pending_messages;
pub mod pending_requests;
pub mod pending_sequences;
pub use errors::{ReadCommandError, WriteCommandError};
pub use on_responded::OnResponded;
pub use pending_messages::PendingMessages;
pub use pending_requests::{PendingRequestValue, PendingRequests};
pub use pending_sequences::{PendingSequenceValue, PendingSequences};
/// Successful outcome of dispatching one parsed command through [`handle`] or
/// [`handle_with_connect`].
///
/// NOTE(review): which handler produces which variant is decided in the
/// private `handle_*` submodules and is not visible here; the per-variant
/// notes below that go beyond the payload type are inferred from the variant
/// names and should be confirmed against those modules.
#[derive(Debug)]
pub enum HandlerHandleOutput {
/// Carries a [`PingCommand`]; presumably a ping initiated by the broker.
BrokerPing(PingCommand),
/// Carries a [`PongCommand`]; presumably the reply to a ping sent by this client.
OnPingResponded(PongCommand),
/// Result of the connect exchange, expressed with the `Response` and `Error`
/// associated types of `ConnectRespond`'s `Respond` implementation.
OnConnectResponded(
Result<<ConnectRespond as Respond>::Response, <ConnectRespond as Respond>::Error>,
),
/// Boxed [`OnResponded`] value; presumably the reply matched to a pending
/// request or pending sequence.
OnResponded(Box<OnResponded>),
/// Boxed [`MessageCommand`]; presumably a message pushed by the broker.
BrokerPushMessage(Box<MessageCommand>),
}
/// Errors returned by [`handle`] and [`handle_with_connect`].
///
/// Every variant carries the `BaseCommand` it relates to, and its `Display`
/// text is the variant name followed by the `Debug` rendering of that command.
///
/// NOTE(review): "NotFount" looks like a misspelling of "NotFound". It is part
/// of the public variant names and of the emitted messages, so correcting it
/// would be a breaking change for callers.
#[derive(Error, Debug)]
pub enum HandlerHandleError {
// Not constructed in this file; presumably raised by the `handle_*` submodules
// when a command lacks the fields its type requires — TODO confirm.
#[error("BaseCommandInvalid {0:?}")]
BaseCommandInvalid(BaseCommand),
// Not constructed in this file; presumably raised when no pending request
// matches the command — TODO confirm.
#[error("PendingRequestNotFount {0:?}")]
PendingRequestNotFount(BaseCommand),
// Not constructed in this file; presumably raised when the matched pending
// request is of a different kind than the reply — TODO confirm.
#[error("PendingRequestMismatch {0:?}")]
PendingRequestMismatch(BaseCommand),
// Not constructed in this file; presumably raised when no pending sequence
// matches the command — TODO confirm.
#[error("PendingSequenceNotFount {0:?}")]
PendingSequenceNotFount(BaseCommand),
// Returned by `handle` / `handle_with_connect` when the command's type has no
// handler arm; holds an owned copy of the offending command.
#[error("Unsupported {0:?}")]
Unsupported(BaseCommand),
}
pub fn handle(
command: &CommandWithParsed,
pending_requests: &mut PendingRequests,
pending_sequences: &mut PendingSequences,
) -> Result<HandlerHandleOutput, HandlerHandleError> {
match command {
CommandWithParsed::Simple(c) => match &c.message.get_field_type() {
Type::PING => handle_ping::handle_ping(&c.message),
Type::PONG => handle_pong::handle_pong(&c.message),
Type::PRODUCER_SUCCESS => {
handle_producer_success::handle_producer_success(&c.message, pending_requests)
}
Type::SUCCESS => handle_success::handle_success(&c.message, pending_requests),
Type::ERROR => handle_error::handle_error(&c.message, pending_requests),
Type::SEND_RECEIPT => {
handle_send_receipt::handle_send_receipt(&c.message, pending_sequences)
}
Type::SEND_ERROR => handle_send_error::handle_send_error(&c.message, pending_sequences),
Type::ACK_RESPONSE => {
handle_ack_response::handle_ack_response(&c.message, pending_requests)
}
_ => Err(HandlerHandleError::Unsupported(c.message.to_owned())),
},
CommandWithParsed::Payload(c) => match &c.message.get_field_type() {
Type::MESSAGE => handle_message::handle_message(
&c.message,
&c.metadata,
&c.payload,
c.is_checksum_match,
),
_ => Err(HandlerHandleError::Unsupported(c.message.to_owned())),
},
}
}
pub fn handle_with_connect(
command: &CommandWithParsed,
) -> Result<HandlerHandleOutput, HandlerHandleError> {
match command {
CommandWithParsed::Simple(c) => match &c.message.get_field_type() {
Type::CONNECTED => handle_connected::handle_connected(&c.message),
Type::ERROR => handle_error::handle_error_with_connect(&c.message),
_ => Err(HandlerHandleError::Unsupported(c.message.to_owned())),
},
CommandWithParsed::Payload(c) => Err(HandlerHandleError::Unsupported(c.message.to_owned())),
}
}