pulsar_binary_protocol_spec/client_handler/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use thiserror::Error;

use crate::{
    client_responds::{ConnectRespond, Respond},
    command::CommandWithParsed,
    commands::{MessageCommand, PingCommand, PongCommand},
    protos::protobuf::pulsar_api::{BaseCommand, BaseCommand_Type as Type},
};

mod handle_ack_response;
mod handle_connected;
mod handle_error;
mod handle_message;
mod handle_ping;
mod handle_pong;
mod handle_producer_success;
mod handle_send_error;
mod handle_send_receipt;
mod handle_success;

pub mod errors;
pub mod on_responded;
pub mod pending_messages;
pub mod pending_requests;
pub mod pending_sequences;

pub use errors::{ReadCommandError, WriteCommandError};
pub use on_responded::OnResponded;
pub use pending_messages::PendingMessages;
pub use pending_requests::{PendingRequestValue, PendingRequests};
pub use pending_sequences::{PendingSequenceValue, PendingSequences};

#[derive(Debug)]
pub enum HandlerHandleOutput {
    BrokerPing(PingCommand),
    OnPingResponded(PongCommand),
    OnConnectResponded(
        Result<<ConnectRespond as Respond>::Response, <ConnectRespond as Respond>::Error>,
    ),
    OnResponded(Box<OnResponded>),
    BrokerPushMessage(Box<MessageCommand>),
}

#[derive(Error, Debug)]
pub enum HandlerHandleError {
    #[error("BaseCommandInvalid {0:?}")]
    BaseCommandInvalid(BaseCommand),

    #[error("PendingRequestNotFount {0:?}")]
    PendingRequestNotFount(BaseCommand),
    #[error("PendingRequestMismatch {0:?}")]
    PendingRequestMismatch(BaseCommand),

    #[error("PendingSequenceNotFount {0:?}")]
    PendingSequenceNotFount(BaseCommand),

    #[error("Unsupported {0:?}")]
    Unsupported(BaseCommand),
}

pub fn handle(
    command: &CommandWithParsed,
    pending_requests: &mut PendingRequests,
    pending_sequences: &mut PendingSequences,
) -> Result<HandlerHandleOutput, HandlerHandleError> {
    match command {
        CommandWithParsed::Simple(c) => match &c.message.get_field_type() {
            Type::PING => handle_ping::handle_ping(&c.message),
            Type::PONG => handle_pong::handle_pong(&c.message),
            //
            Type::PRODUCER_SUCCESS => {
                handle_producer_success::handle_producer_success(&c.message, pending_requests)
            }
            Type::SUCCESS => handle_success::handle_success(&c.message, pending_requests),
            Type::ERROR => handle_error::handle_error(&c.message, pending_requests),

            //
            Type::SEND_RECEIPT => {
                handle_send_receipt::handle_send_receipt(&c.message, pending_sequences)
            }
            Type::SEND_ERROR => handle_send_error::handle_send_error(&c.message, pending_sequences),

            //
            Type::ACK_RESPONSE => {
                handle_ack_response::handle_ack_response(&c.message, pending_requests)
            }

            //
            _ => Err(HandlerHandleError::Unsupported(c.message.to_owned())),
        },
        CommandWithParsed::Payload(c) => match &c.message.get_field_type() {
            Type::MESSAGE => handle_message::handle_message(
                &c.message,
                &c.metadata,
                &c.payload,
                c.is_checksum_match,
            ),
            _ => Err(HandlerHandleError::Unsupported(c.message.to_owned())),
        },
    }
}

pub fn handle_with_connect(
    command: &CommandWithParsed,
) -> Result<HandlerHandleOutput, HandlerHandleError> {
    match command {
        CommandWithParsed::Simple(c) => match &c.message.get_field_type() {
            Type::CONNECTED => handle_connected::handle_connected(&c.message),
            Type::ERROR => handle_error::handle_error_with_connect(&c.message),
            _ => Err(HandlerHandleError::Unsupported(c.message.to_owned())),
        },
        CommandWithParsed::Payload(c) => Err(HandlerHandleError::Unsupported(c.message.to_owned())),
    }
}