1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use thiserror::Error;

use crate::{
    client_responds::{ConnectRespond, Respond},
    command::CommandWithParsed,
    commands::{MessageCommand, PingCommand, PongCommand},
    protos::protobuf::pulsar_api::{BaseCommand, BaseCommand_Type as Type},
};

mod handle_ack_response;
mod handle_connected;
mod handle_error;
mod handle_message;
mod handle_ping;
mod handle_pong;
mod handle_producer_success;
mod handle_send_error;
mod handle_send_receipt;
mod handle_success;

pub mod errors;
pub mod on_responded;
pub mod pending_messages;
pub mod pending_requests;
pub mod pending_sequences;

pub use errors::{ReadCommandError, WriteCommandError};
pub use on_responded::OnResponded;
pub use pending_messages::PendingMessages;
pub use pending_requests::{PendingRequestValue, PendingRequests};
pub use pending_sequences::{PendingSequenceValue, PendingSequences};

/// Successful outcome of handling one parsed command.
///
/// This is the `Ok` type returned by [`handle`] and [`handle_with_connect`];
/// the per-command handler modules decide which variant is produced.
#[derive(Debug)]
pub enum HandlerHandleOutput {
    /// Carries a [`PingCommand`].
    // NOTE(review): presumably a ping initiated by the broker that the caller
    // is expected to answer; confirm against `handle_ping`.
    BrokerPing(PingCommand),
    /// Carries a [`PongCommand`].
    // NOTE(review): presumably the broker's reply to a ping this client sent;
    // confirm against `handle_pong`.
    OnPingResponded(PongCommand),
    /// Outcome of the connect exchange, expressed with the `Response` and
    /// `Error` types that [`ConnectRespond`] declares through [`Respond`].
    OnConnectResponded(
        Result<<ConnectRespond as Respond>::Response, <ConnectRespond as Respond>::Error>,
    ),
    /// Carries a boxed [`OnResponded`] value.
    // NOTE(review): presumably the resolution of a previously registered
    // pending request or sequence; confirm in the `on_responded` module.
    OnResponded(Box<OnResponded>),
    /// Carries a boxed [`MessageCommand`].
    // NOTE(review): presumably a message pushed by the broker to a consumer;
    // confirm against `handle_message`.
    BrokerPushMessage(Box<MessageCommand>),
}

/// Failure modes of [`handle`] and [`handle_with_connect`].
///
/// Every variant carries the [`BaseCommand`] that triggered it, and its
/// `Display` output is the variant name followed by the command's `Debug`
/// representation.
///
/// The `NotFount` spelling (sic) appears in both the variant identifiers and
/// the messages; it is part of the public API, so it is documented here rather
/// than renamed.
#[derive(Error, Debug)]
pub enum HandlerHandleError {
    /// The base command was rejected as invalid.
    // NOTE(review): produced by handler modules outside this file; presumably
    // raised when an expected field is missing. Confirm there.
    #[error("BaseCommandInvalid {0:?}")]
    BaseCommandInvalid(BaseCommand),

    /// No pending request was found for the command.
    // NOTE(review): presumably keyed by request id; confirm in
    // `pending_requests` and the handlers that use it.
    #[error("PendingRequestNotFount {0:?}")]
    PendingRequestNotFount(BaseCommand),
    /// A pending request was found but did not match the command.
    // NOTE(review): presumably a response of an unexpected kind for the
    // stored request; confirm in the handlers.
    #[error("PendingRequestMismatch {0:?}")]
    PendingRequestMismatch(BaseCommand),

    /// No pending sequence was found for the command.
    // NOTE(review): presumably keyed by producer/sequence id; confirm in
    // `pending_sequences` and the send handlers.
    #[error("PendingSequenceNotFount {0:?}")]
    PendingSequenceNotFount(BaseCommand),

    /// The command's type has no handler in the dispatcher that received it;
    /// this is what the fallback arms of [`handle`] and
    /// [`handle_with_connect`] return.
    #[error("Unsupported {0:?}")]
    Unsupported(BaseCommand),
}

pub fn handle(
    command: &CommandWithParsed,
    pending_requests: &mut PendingRequests,
    pending_sequences: &mut PendingSequences,
) -> Result<HandlerHandleOutput, HandlerHandleError> {
    match command {
        CommandWithParsed::Simple(c) => match &c.message.get_field_type() {
            Type::PING => handle_ping::handle_ping(&c.message),
            Type::PONG => handle_pong::handle_pong(&c.message),
            //
            Type::PRODUCER_SUCCESS => {
                handle_producer_success::handle_producer_success(&c.message, pending_requests)
            }
            Type::SUCCESS => handle_success::handle_success(&c.message, pending_requests),
            Type::ERROR => handle_error::handle_error(&c.message, pending_requests),

            //
            Type::SEND_RECEIPT => {
                handle_send_receipt::handle_send_receipt(&c.message, pending_sequences)
            }
            Type::SEND_ERROR => handle_send_error::handle_send_error(&c.message, pending_sequences),

            //
            Type::ACK_RESPONSE => {
                handle_ack_response::handle_ack_response(&c.message, pending_requests)
            }

            //
            _ => Err(HandlerHandleError::Unsupported(c.message.to_owned())),
        },
        CommandWithParsed::Payload(c) => match &c.message.get_field_type() {
            Type::MESSAGE => handle_message::handle_message(
                &c.message,
                &c.metadata,
                &c.payload,
                c.is_checksum_match,
            ),
            _ => Err(HandlerHandleError::Unsupported(c.message.to_owned())),
        },
    }
}

pub fn handle_with_connect(
    command: &CommandWithParsed,
) -> Result<HandlerHandleOutput, HandlerHandleError> {
    match command {
        CommandWithParsed::Simple(c) => match &c.message.get_field_type() {
            Type::CONNECTED => handle_connected::handle_connected(&c.message),
            Type::ERROR => handle_error::handle_error_with_connect(&c.message),
            _ => Err(HandlerHandleError::Unsupported(c.message.to_owned())),
        },
        CommandWithParsed::Payload(c) => Err(HandlerHandleError::Unsupported(c.message.to_owned())),
    }
}