use std::collections::HashMap;
use std::collections::HashSet;
use cheetah_string::CheetahString;
use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere;
use serde::Deserialize;
use serde::Serialize;
use crate::protocol::body::connection::Connection;
use crate::protocol::heartbeat::consume_type::ConsumeType;
use crate::protocol::heartbeat::message_model::MessageModel;
use crate::protocol::heartbeat::subscription_data::SubscriptionData;
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConsumerConnection {
connection_set: HashSet<Connection>,
#[serde(default)]
subscription_table: HashMap<CheetahString, SubscriptionData>,
#[serde(skip_serializing_if = "Option::is_none")]
consume_type: Option<ConsumeType>,
#[serde(skip_serializing_if = "Option::is_none")]
message_model: Option<MessageModel>,
#[serde(skip_serializing_if = "Option::is_none")]
consume_from_where: Option<ConsumeFromWhere>,
}
impl ConsumerConnection {
pub fn new() -> Self {
ConsumerConnection {
connection_set: HashSet::new(),
subscription_table: HashMap::new(),
consume_type: None,
message_model: None,
consume_from_where: None,
}
}
pub fn compute_min_version(&self) -> i32 {
let mut min_version = i32::MAX;
for connection in &self.connection_set {
let version = connection.get_version();
if version < min_version {
min_version = version;
}
}
min_version
}
pub fn get_connection_set(&self) -> &HashSet<Connection> {
&self.connection_set
}
pub fn set_connection_set(&mut self, connection_set: HashSet<Connection>) {
self.connection_set = connection_set;
}
pub fn insert_connection(&mut self, connection: Connection) -> bool {
self.connection_set.insert(connection)
}
pub fn get_subscription_table(&self) -> &HashMap<CheetahString, SubscriptionData> {
&self.subscription_table
}
pub fn get_subscription_table_mut(&mut self) -> &mut HashMap<CheetahString, SubscriptionData> {
&mut self.subscription_table
}
pub fn set_subscription_table(&mut self, subscription_table: HashMap<CheetahString, SubscriptionData>) {
self.subscription_table = subscription_table;
}
pub fn get_consume_type(&self) -> Option<ConsumeType> {
self.consume_type
}
pub fn set_consume_type(&mut self, consume_type: ConsumeType) {
self.consume_type = Some(consume_type);
}
pub fn get_message_model(&self) -> Option<MessageModel> {
self.message_model
}
pub fn set_message_model(&mut self, message_model: MessageModel) {
self.message_model = Some(message_model);
}
pub fn get_consume_from_where(&self) -> Option<ConsumeFromWhere> {
self.consume_from_where
}
pub fn set_consume_from_where(&mut self, consume_from_where: ConsumeFromWhere) {
self.consume_from_where = Some(consume_from_where);
}
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_connection() -> Connection {
let mut conn = Connection::new();
conn.set_client_id(CheetahString::from_static_str("test_client_1"));
conn.set_client_addr(CheetahString::from_static_str("127.0.0.1:9876"));
conn.set_version(1);
conn
}
fn create_test_subscription() -> SubscriptionData {
SubscriptionData {
topic: CheetahString::from_static_str("test_topic"),
sub_string: CheetahString::from_static_str("*"),
..Default::default()
}
}
#[test]
fn consumer_connection_serializes_correctly() {
let mut consumer_conn = ConsumerConnection::new();
let conn = create_test_connection();
consumer_conn.insert_connection(conn);
consumer_conn.set_consume_type(ConsumeType::ConsumeActively);
consumer_conn.set_message_model(MessageModel::Clustering);
let json_str = serde_json::to_string(&consumer_conn).unwrap();
assert!(json_str.contains("connectionSet"));
assert!(json_str.contains("consumeType"));
assert!(json_str.contains("messageModel"));
}
#[test]
fn consumer_connection_deserializes_correctly() {
let json_str = r#"{
"connectionSet": [
{
"clientId": "test_client",
"clientAddr": "127.0.0.1:9876",
"language": "RUST",
"version": 1
}
],
"subscriptionTable": {
"test_topic": {
"classFilterMode": false,
"topic": "test_topic",
"subString": "*",
"tagsSet": [],
"codeSet": [],
"subVersion": 123456,
"expressionType": "TAG"
}
},
"consumeType": "CONSUME_ACTIVELY",
"messageModel": "CLUSTERING",
"consumeFromWhere": "CONSUME_FROM_LAST_OFFSET"
}"#;
let consumer_conn: ConsumerConnection = serde_json::from_str(json_str).unwrap();
assert_eq!(consumer_conn.get_connection_set().len(), 1);
assert_eq!(consumer_conn.get_subscription_table().len(), 1);
assert!(consumer_conn.get_consume_type().is_some());
assert!(consumer_conn.get_message_model().is_some());
assert!(consumer_conn.get_consume_from_where().is_some());
}
#[test]
fn consumer_connection_default_and_new() {
let consumer_conn = ConsumerConnection::default();
assert!(consumer_conn.get_connection_set().is_empty());
assert!(consumer_conn.get_subscription_table().is_empty());
assert!(consumer_conn.get_consume_type().is_none());
assert!(consumer_conn.get_message_model().is_none());
assert!(consumer_conn.get_consume_from_where().is_none());
let consumer_conn = ConsumerConnection::new();
assert!(consumer_conn.get_connection_set().is_empty());
assert!(consumer_conn.get_subscription_table().is_empty());
assert!(consumer_conn.get_consume_type().is_none());
assert!(consumer_conn.get_message_model().is_none());
assert!(consumer_conn.get_consume_from_where().is_none());
}
#[test]
fn consumer_connection_compute_min_version() {
let consumer_conn = ConsumerConnection::new();
assert_eq!(consumer_conn.compute_min_version(), i32::MAX);
let mut consumer_conn = ConsumerConnection::new();
let mut conn = Connection::new();
conn.set_version(42);
consumer_conn.insert_connection(conn);
assert_eq!(consumer_conn.compute_min_version(), 42);
let mut consumer_conn = ConsumerConnection::new();
let mut conn1 = Connection::new();
conn1.set_client_id(CheetahString::from_static_str("client1"));
conn1.set_version(100);
let mut conn2 = Connection::new();
conn2.set_client_id(CheetahString::from_static_str("client2"));
conn2.set_version(50);
let mut conn3 = Connection::new();
conn3.set_client_id(CheetahString::from_static_str("client3"));
conn3.set_version(75);
consumer_conn.insert_connection(conn1);
consumer_conn.insert_connection(conn2);
consumer_conn.insert_connection(conn3);
assert_eq!(consumer_conn.compute_min_version(), 50);
}
#[test]
fn consumer_connection_set_connection_set() {
let mut consumer_conn = ConsumerConnection::new();
let mut conn_set = HashSet::new();
conn_set.insert(create_test_connection());
consumer_conn.set_connection_set(conn_set.clone());
assert_eq!(consumer_conn.get_connection_set().len(), 1);
}
#[test]
fn consumer_connection_insert_connection() {
let mut consumer_conn = ConsumerConnection::new();
let conn = create_test_connection();
let result = consumer_conn.insert_connection(conn.clone());
assert!(result);
let result = consumer_conn.insert_connection(conn.clone());
assert!(!result);
assert_eq!(consumer_conn.get_connection_set().len(), 1);
assert!(consumer_conn.get_connection_set().contains(&conn));
}
#[test]
fn consumer_connection_subscription_table_operations() {
let mut consumer_conn = ConsumerConnection::new();
let mut sub_table = HashMap::new();
let sub_data = create_test_subscription();
sub_table.insert(CheetahString::from_static_str("test_topic"), sub_data.clone());
consumer_conn.set_subscription_table(sub_table);
assert_eq!(consumer_conn.get_subscription_table().len(), 1);
assert!(consumer_conn
.get_subscription_table()
.contains_key(&CheetahString::from_static_str("test_topic")));
let mut consumer_conn = ConsumerConnection::new();
let sub_data = create_test_subscription();
{
let sub_table = consumer_conn.get_subscription_table_mut();
sub_table.insert(CheetahString::from_static_str("test_topic"), sub_data.clone());
}
assert_eq!(consumer_conn.get_subscription_table().len(), 1);
let retrieved_sub = consumer_conn
.get_subscription_table()
.get(&CheetahString::from_static_str("test_topic"));
assert!(retrieved_sub.is_some());
}
#[test]
fn consumer_connection_consume_type_operations() {
let mut consumer_conn = ConsumerConnection::new();
assert!(consumer_conn.get_consume_type().is_none());
consumer_conn.set_consume_type(ConsumeType::ConsumeActively);
assert_eq!(consumer_conn.get_consume_type(), Some(ConsumeType::ConsumeActively));
}
#[test]
fn consumer_connection_message_model_operations() {
let mut consumer_conn = ConsumerConnection::new();
assert!(consumer_conn.get_message_model().is_none());
consumer_conn.set_message_model(MessageModel::Clustering);
assert_eq!(consumer_conn.get_message_model(), Some(MessageModel::Clustering));
}
#[test]
fn consumer_connection_consume_from_where_operations() {
let mut consumer_conn = ConsumerConnection::new();
assert!(consumer_conn.get_consume_from_where().is_none());
consumer_conn.set_consume_from_where(ConsumeFromWhere::ConsumeFromLastOffset);
assert_eq!(
consumer_conn.get_consume_from_where(),
Some(ConsumeFromWhere::ConsumeFromLastOffset)
);
}
}