// pulsar_binary_protocol_spec/client_handler/handle_error.rs

use crate::{commands::ErrorCommand, protos::protobuf::pulsar_api::BaseCommand};

use super::{
    HandlerHandleError, HandlerHandleOutput, OnResponded, PendingRequestValue, PendingRequests,
};

/// Routes an error command to the pending request it answers.
///
/// Reads `base_command.error`, wraps it in an [`ErrorCommand`], and removes
/// the entry keyed by the command's request id from `pending_requests`. The
/// removed entry decides which [`OnResponded`] variant is produced; in every
/// case the variant carries an `Err` converted from the command's
/// `(error, message)` pair.
///
/// # Errors
///
/// * [`HandlerHandleError::BaseCommandInvalid`] when `base_command.error` is
///   `None`.
/// * [`HandlerHandleError::PendingRequestNotFount`] when no pending request is
///   registered under the command's request id.
///
/// Both errors carry an owned copy of `base_command`.
pub(super) fn handle_error(
    base_command: &BaseCommand,
    pending_requests: &mut PendingRequests,
) -> Result<HandlerHandleOutput, HandlerHandleError> {
    // Guard: the base command must actually carry an error payload.
    let inner = base_command
        .error
        .as_ref()
        .ok_or_else(|| HandlerHandleError::BaseCommandInvalid(base_command.to_owned()))?;
    let error_command = ErrorCommand {
        inner_command: inner.to_owned(),
    };

    // Guard: the error must answer a request that is still being tracked.
    // The entry is removed, so it cannot be answered a second time.
    let pending = pending_requests
        .remove(&error_command.get_request_id())
        .ok_or_else(|| HandlerHandleError::PendingRequestNotFount(base_command.to_owned()))?;

    // Built once; exactly one match arm below consumes it.
    let failure = (error_command.get_error(), error_command.get_message());

    let responded = match pending {
        PendingRequestValue::SessionCreateProducer(producer_command, s) => {
            OnResponded::SessionCreateProducer(producer_command, s, Err(failure.into()))
        }
        PendingRequestValue::SessionCreateConsumer(subscribe_command, s) => {
            OnResponded::SessionCreateConsumer(subscribe_command, s, Err(failure.into()))
        }
        PendingRequestValue::ConsumerAck(s) => OnResponded::ConsumerAck(s, Err(failure.into())),
    };

    Ok(HandlerHandleOutput::OnResponded(Box::new(responded)))
}

/// Converts an error command into a failed connect response.
///
/// Unlike [`handle_error`], no pending-request lookup is performed: when
/// `base_command.error` is present, the result is always
/// [`HandlerHandleOutput::OnConnectResponded`] holding an `Err` converted from
/// the command's `(error, message)` pair.
///
/// # Errors
///
/// Returns [`HandlerHandleError::BaseCommandInvalid`], carrying an owned copy
/// of `base_command`, when `base_command.error` is `None`.
pub(super) fn handle_error_with_connect(
    base_command: &BaseCommand,
) -> Result<HandlerHandleOutput, HandlerHandleError> {
    match base_command.error.as_ref() {
        None => Err(HandlerHandleError::BaseCommandInvalid(
            base_command.to_owned(),
        )),
        Some(inner) => {
            let error_command = ErrorCommand {
                inner_command: inner.to_owned(),
            };
            let failure = (error_command.get_error(), error_command.get_message());
            Ok(HandlerHandleOutput::OnConnectResponded(Err(failure.into())))
        }
    }
}