pulsar_binary_protocol_spec/client_handler/
handle_send_error.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use crate::{commands::SendErrorCommand, protos::protobuf::pulsar_api::BaseCommand};

use super::{HandlerHandleError, HandlerHandleOutput, OnResponded, PendingSequences};

/// Resolves a pending producer send using the `send_error` payload of `base_command`.
///
/// The payload is wrapped in a [`SendErrorCommand`], and the entry stored under its
/// sequence id is removed from `pending_sequences`. On success the removed entry is
/// returned inside `HandlerHandleOutput::OnResponded`, paired with an `Err` built from
/// the command's error and message.
///
/// # Errors
///
/// * `HandlerHandleError::BaseCommandInvalid` when `base_command` has no `send_error`
///   payload.
/// * `HandlerHandleError::PendingSequenceNotFount` when nothing is registered under the
///   command's sequence id.
///
/// Both errors carry an owned copy of `base_command`.
pub(super) fn handle_send_error(
    base_command: &BaseCommand,
    pending_sequences: &mut PendingSequences,
) -> Result<HandlerHandleOutput, HandlerHandleError> {
    // Guard: without a `send_error` payload there is nothing to handle.
    let send_error = match base_command.send_error.as_ref() {
        Some(inner) => SendErrorCommand {
            inner_command: inner.to_owned(),
        },
        None => {
            return Err(HandlerHandleError::BaseCommandInvalid(
                base_command.to_owned(),
            ));
        }
    };

    // The entry is taken out of the map, not just looked up.
    match pending_sequences.remove(&send_error.get_sequence_id()) {
        Some(pending_sequence) => {
            let failure = Err((send_error.get_error(), send_error.get_message()).into());
            let responded = OnResponded::ProducerSend(pending_sequence, failure);
            Ok(HandlerHandleOutput::OnResponded(Box::new(responded)))
        }
        None => Err(HandlerHandleError::PendingSequenceNotFount(
            base_command.to_owned(),
        )),
    }
}