rocketmq_remoting/protocol/body/
consumer_connection.rs1use std::collections::HashSet;
19use std::sync::Arc;
20
21use cheetah_string::CheetahString;
22use dashmap::DashMap;
23use parking_lot::RwLock;
24use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
25use serde::ser::SerializeStruct;
26use serde::Serialize;
27use serde::Serializer;
28
29use crate::protocol::body::connection::Connection;
30use crate::protocol::heartbeat::consume_type::ConsumeType;
31use crate::protocol::heartbeat::message_model::MessageModel;
32use crate::protocol::heartbeat::subscription_data::SubscriptionData;
33
34#[derive(Debug, Clone, Default)]
35pub struct ConsumerConnection {
36 connection_set: HashSet<Connection>,
37 subscription_table: Arc<DashMap<CheetahString, SubscriptionData>>,
38 consume_type: Arc<RwLock<ConsumeType>>,
39 message_model: Arc<RwLock<MessageModel>>,
40 consume_from_where: Arc<RwLock<ConsumeFromWhere>>,
41}
42
43impl ConsumerConnection {
44 pub fn new() -> Self {
45 ConsumerConnection {
46 connection_set: HashSet::new(),
47 subscription_table: Arc::new(DashMap::new()),
48 consume_type: Arc::new(RwLock::new(ConsumeType::default())),
49 message_model: Arc::new(RwLock::new(MessageModel::default())),
50 consume_from_where: Arc::new(RwLock::new(ConsumeFromWhere::default())),
51 }
52 }
53}
54
55impl Serialize for ConsumerConnection {
56 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
57 where
58 S: Serializer,
59 {
60 let mut s = serializer.serialize_struct("ConsumerConnection", 5)?;
61 s.serialize_field("connection_set", &self.connection_set)?;
62 s.serialize_field("subscription_table", &*self.subscription_table)?;
63 s.serialize_field("consume_type", &*self.consume_type.read())?;
64 s.serialize_field("message_model", &*self.message_model.read())?;
65 s.serialize_field("consume_from_where", &*self.consume_from_where.read())?;
66 s.end()
67 }
68}
69
70impl ConsumerConnection {
71 pub fn get_connection_set(&self) -> HashSet<Connection> {
72 self.connection_set.clone()
73 }
74
75 pub fn connection_set_insert(&mut self, connection: Connection) {
76 self.connection_set.insert(connection);
77 }
78
79 pub fn set_connection_set(&mut self, connection_set: HashSet<Connection>) {
80 self.connection_set = connection_set;
81 }
82
83 pub fn get_subscription_table(&self) -> Arc<DashMap<CheetahString, SubscriptionData>> {
84 self.subscription_table.clone()
85 }
86
87 pub fn set_subscription_table(
88 &mut self,
89 subscription_table: DashMap<CheetahString, SubscriptionData>,
90 ) {
91 self.subscription_table = Arc::new(subscription_table);
92 }
93
94 pub fn get_consume_type(&self) -> ConsumeType {
95 *self.consume_type.read()
96 }
97
98 pub fn set_consume_type(&mut self, consume_type: ConsumeType) {
99 *self.consume_type.write() = consume_type;
100 }
101
102 pub fn get_message_model(&self) -> MessageModel {
103 *self.message_model.read()
104 }
105
106 pub fn set_message_model(&mut self, message_model: MessageModel) {
107 *self.message_model.write() = message_model;
108 }
109
110 pub fn get_consume_from_where(&self) -> ConsumeFromWhere {
111 *self.consume_from_where.read()
112 }
113
114 pub fn set_consume_from_where(&mut self, consume_from_where: ConsumeFromWhere) {
115 *self.consume_from_where.write() = consume_from_where;
116 }
117}