rocketmq_remoting/protocol/body/
consumer_connection.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::collections::HashSet;
19use std::sync::Arc;
20
21use cheetah_string::CheetahString;
22use dashmap::DashMap;
23use parking_lot::RwLock;
24use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
25use serde::ser::SerializeStruct;
26use serde::Serialize;
27use serde::Serializer;
28
29use crate::protocol::body::connection::Connection;
30use crate::protocol::heartbeat::consume_type::ConsumeType;
31use crate::protocol::heartbeat::message_model::MessageModel;
32use crate::protocol::heartbeat::subscription_data::SubscriptionData;
33
34#[derive(Debug, Clone, Default)]
35pub struct ConsumerConnection {
36    connection_set: HashSet<Connection>,
37    subscription_table: Arc<DashMap<CheetahString, SubscriptionData>>,
38    consume_type: Arc<RwLock<ConsumeType>>,
39    message_model: Arc<RwLock<MessageModel>>,
40    consume_from_where: Arc<RwLock<ConsumeFromWhere>>,
41}
42
43impl ConsumerConnection {
44    pub fn new() -> Self {
45        ConsumerConnection {
46            connection_set: HashSet::new(),
47            subscription_table: Arc::new(DashMap::new()),
48            consume_type: Arc::new(RwLock::new(ConsumeType::default())),
49            message_model: Arc::new(RwLock::new(MessageModel::default())),
50            consume_from_where: Arc::new(RwLock::new(ConsumeFromWhere::default())),
51        }
52    }
53}
54
55impl Serialize for ConsumerConnection {
56    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
57    where
58        S: Serializer,
59    {
60        let mut s = serializer.serialize_struct("ConsumerConnection", 5)?;
61        s.serialize_field("connection_set", &self.connection_set)?;
62        s.serialize_field("subscription_table", &*self.subscription_table)?;
63        s.serialize_field("consume_type", &*self.consume_type.read())?;
64        s.serialize_field("message_model", &*self.message_model.read())?;
65        s.serialize_field("consume_from_where", &*self.consume_from_where.read())?;
66        s.end()
67    }
68}
69
70impl ConsumerConnection {
71    pub fn get_connection_set(&self) -> HashSet<Connection> {
72        self.connection_set.clone()
73    }
74
75    pub fn connection_set_insert(&mut self, connection: Connection) {
76        self.connection_set.insert(connection);
77    }
78
79    pub fn set_connection_set(&mut self, connection_set: HashSet<Connection>) {
80        self.connection_set = connection_set;
81    }
82
83    pub fn get_subscription_table(&self) -> Arc<DashMap<CheetahString, SubscriptionData>> {
84        self.subscription_table.clone()
85    }
86
87    pub fn set_subscription_table(
88        &mut self,
89        subscription_table: DashMap<CheetahString, SubscriptionData>,
90    ) {
91        self.subscription_table = Arc::new(subscription_table);
92    }
93
94    pub fn get_consume_type(&self) -> ConsumeType {
95        *self.consume_type.read()
96    }
97
98    pub fn set_consume_type(&mut self, consume_type: ConsumeType) {
99        *self.consume_type.write() = consume_type;
100    }
101
102    pub fn get_message_model(&self) -> MessageModel {
103        *self.message_model.read()
104    }
105
106    pub fn set_message_model(&mut self, message_model: MessageModel) {
107        *self.message_model.write() = message_model;
108    }
109
110    pub fn get_consume_from_where(&self) -> ConsumeFromWhere {
111        *self.consume_from_where.read()
112    }
113
114    pub fn set_consume_from_where(&mut self, consume_from_where: ConsumeFromWhere) {
115        *self.consume_from_where.write() = consume_from_where;
116    }
117}