pulsar_binary_protocol_spec/client_handler/
handle_message.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
use crate::{
    command::PayloadCommandPayloadWithParsed,
    commands::MessageCommand,
    protos::protobuf::pulsar_api::{BaseCommand, MessageMetadata},
};

use super::{HandlerHandleError, HandlerHandleOutput};

pub(super) fn handle_message(
    base_command: &BaseCommand,
    message_metadata: &MessageMetadata,
    payload: &PayloadCommandPayloadWithParsed,
    is_checksum_match: Option<bool>,
) -> Result<HandlerHandleOutput, HandlerHandleError> {
    if let Some(c) = base_command.message.as_ref() {
        let c = MessageCommand {
            inner_command: c.to_owned(),
            message_metadata: message_metadata.to_owned(),
            payload: payload.to_owned(),
            is_checksum_match,
        };
        Ok(HandlerHandleOutput::BrokerPushMessage(Box::new(c)))
    } else {
        Err(HandlerHandleError::BaseCommandInvalid(
            base_command.to_owned(),
        ))
    }
}