1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
use protobuf::SingularPtrField;

use crate::{
    command::{Command, SimpleCommand},
    protos::{
        protobuf::pulsar_api::{BaseCommand, BaseCommand_Type as Type, CommandSubscribe},
        utils::convert_tuple_slice_to_key_value_vector,
    },
    types::{ConsumerId, RequestId, SubscribeType},
};

/// Wrapper around the protobuf `CommandSubscribe` message.
///
/// The wrapped message is publicly accessible only when the
/// `with-hacking-commands` feature is enabled; otherwise it is visible
/// inside this crate only.
#[derive(Clone, Debug)]
pub struct SubscribeCommand {
    #[cfg(feature = "with-hacking-commands")]
    pub inner_command: CommandSubscribe,
    #[cfg(not(feature = "with-hacking-commands"))]
    pub(crate) inner_command: CommandSubscribe,
}

impl SubscribeCommand {
    /// Creates a subscribe command with its topic, subscription and
    /// subscription type filled in.
    ///
    /// Every other field of the wrapped message keeps whatever
    /// `CommandSubscribe::new()` initialises it to.
    pub fn new(topic: &str, subscription: &str, subscribe_type: SubscribeType) -> Self {
        let mut subscribe = CommandSubscribe::new();
        subscribe.set_topic(topic.into());
        subscribe.set_subscription(subscription.into());
        subscribe.set_subType(subscribe_type.into());

        Self {
            inner_command: subscribe,
        }
    }

    /// Stores `consumer_id` in the wrapped message and returns `self` so
    /// further setters can be chained.
    pub fn set_consumer_id(&mut self, consumer_id: ConsumerId) -> &mut Self {
        let raw_id = consumer_id.into();
        self.inner_command.set_consumer_id(raw_id);
        self
    }

    /// Returns the consumer id currently held by the wrapped message,
    /// wrapped in a [`ConsumerId`].
    pub fn get_consumer_id(&self) -> ConsumerId {
        let raw_id = self.inner_command.get_consumer_id();
        ConsumerId::new(raw_id)
    }

    /// Stores `request_id` in the wrapped message and returns `self` so
    /// further setters can be chained.
    pub fn set_request_id(&mut self, request_id: RequestId) -> &mut Self {
        let raw_id = request_id.into();
        self.inner_command.set_request_id(raw_id);
        self
    }

    /// Returns the request id currently held by the wrapped message,
    /// wrapped in a [`RequestId`].
    pub fn get_request_id(&self) -> RequestId {
        let raw_id = self.inner_command.get_request_id();
        RequestId::new(raw_id)
    }

    /// Stores `consumer_name` in the wrapped message and returns `self` so
    /// further setters can be chained.
    pub fn set_consumer_name(&mut self, consumer_name: &str) -> &mut Self {
        let name = consumer_name.into();
        self.inner_command.set_consumer_name(name);
        self
    }

    /// Appends the given `(key, value)` pairs to the command's metadata.
    ///
    /// The pairs are converted with
    /// `convert_tuple_slice_to_key_value_vector` and pushed one by one, in
    /// the order the conversion yields them. Existing entries are kept.
    pub fn append_metadata(&mut self, metadata: &[(&str, &str)]) -> &mut Self {
        let entries = convert_tuple_slice_to_key_value_vector(metadata);
        let target = &mut self.inner_command.metadata;
        entries.into_iter().for_each(|entry| target.push(entry));
        self
    }

    /// Removes every metadata entry from the command and returns `self` so
    /// further setters can be chained.
    pub fn clear_metadata(&mut self) -> &mut Self {
        self.inner_command.metadata.clear();
        self
    }
}

/// Converts a borrowed [`SubscribeCommand`] into a `Command::Simple` whose
/// base command has type `SUBSCRIBE` and carries a copy of the wrapped
/// message.
impl From<&SubscribeCommand> for Command {
    fn from(c: &SubscribeCommand) -> Self {
        // The source is only borrowed, so the wrapped message is cloned.
        let payload = c.inner_command.clone();

        let mut message = BaseCommand::new();
        message.set_field_type(Type::SUBSCRIBE);
        message.subscribe = SingularPtrField::some(payload);

        Command::Simple(SimpleCommand { message })
    }
}