use std::collections::HashMap;
use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::message::message_queue_for_c::MessageQueueForC;
use serde::Deserialize;
use serde::Serialize;
use serde_json_any_key::*;
use crate::protocol::admin::consume_stats::append_message_queue_object_key;
use crate::protocol::admin::consume_stats::normalize_nonstandard_offset_table_keys;
use crate::protocol::RemotingDeserializable;
/// Body carrying a table of offsets keyed by [`MessageQueue`].
///
/// With `rename_all = "camelCase"` the derived serde implementations expose
/// the single field under the JSON key `offsetTable`.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ResetOffsetBody {
/// Offset value associated with each message queue.
///
/// The derived (de)serializers route this map through
/// `serde_json_any_key::any_key_map`, which is what lets a non-string key
/// type such as `MessageQueue` take part in a JSON object.
#[serde(with = "any_key_map")]
pub offset_table: HashMap<MessageQueue, i64>,
}
impl ResetOffsetBody {
    /// Creates a body whose offset table is empty.
    pub fn new() -> Self {
        Self {
            offset_table: HashMap::new(),
        }
    }

    /// Encodes the body as bytes using [`Self::encode_java_compatible`].
    ///
    /// This is a best-effort wrapper: if encoding fails, the error is
    /// discarded and an empty `Vec` is returned.
    pub fn encode(&self) -> Vec<u8> {
        match self.encode_java_compatible() {
            Ok(bytes) => bytes,
            Err(_) => Vec::new(),
        }
    }

    /// Encodes the body as the UTF-8 bytes of [`Self::to_java_compatible_json`].
    ///
    /// # Errors
    ///
    /// Returns whatever error `to_java_compatible_json` reports.
    pub fn encode_java_compatible(&self) -> rocketmq_error::RocketMQResult<Vec<u8>> {
        self.to_java_compatible_json().map(String::into_bytes)
    }

    /// Hand-builds the text `{"offsetTable":{<key>:<offset>,...}}`.
    ///
    /// Each `<key>` is written by `append_message_queue_object_key`; the
    /// tests in this file show it to be a bare JSON object
    /// (`{"topic":..,"brokerName":..,"queueId":..}`) rather than a quoted
    /// string. Entries appear in the map's iteration order, which `HashMap`
    /// does not specify.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by
    /// `append_message_queue_object_key`.
    pub fn to_java_compatible_json(&self) -> rocketmq_error::RocketMQResult<String> {
        let mut json = String::from("{\"offsetTable\":{");

        // Set after the first entry so every later entry is preceded by a comma.
        let mut needs_comma = false;
        for (queue, offset) in &self.offset_table {
            if needs_comma {
                json.push(',');
            }
            needs_comma = true;

            append_message_queue_object_key(&mut json, queue)?;
            json.push(':');
            json.push_str(&offset.to_string());
        }

        json.push_str("}}");
        Ok(json)
    }

    /// Decodes a body from raw bytes, returning `None` on failure.
    ///
    /// The bytes are first handed to `RemotingDeserializable::decode`. If that
    /// fails, they are read as UTF-8 and passed through
    /// `normalize_nonstandard_offset_table_keys`; when that actually changes
    /// the text, the rewritten text is parsed with
    /// `RemotingDeserializable::decode_str`.
    ///
    /// `None` is returned when the bytes are not valid UTF-8, when
    /// normalization leaves the text unchanged, or when the second parse
    /// fails as well.
    pub fn decode(body: &[u8]) -> Option<Self> {
        // Fast path: the payload is already in a form the trait decoder accepts.
        if let Ok(parsed) = <Self as RemotingDeserializable>::decode(body) {
            return Some(parsed);
        }

        let text = std::str::from_utf8(body).ok()?;
        let rewritten = normalize_nonstandard_offset_table_keys(text);
        if rewritten != text {
            <Self as RemotingDeserializable>::decode_str(&rewritten).ok()
        } else {
            // Nothing was rewritten, so a retry would fail the same way.
            None
        }
    }
}
/// Variant of the reset-offset body that stores its table as a flat list of
/// [`MessageQueueForC`] entries instead of a map.
///
/// With `rename_all = "camelCase"` the list is exposed under the JSON key
/// `offsetTable`.
// NOTE(review): the `ForC` suffix suggests this shape targets C/C++ clients;
// that is inferred from the name only — confirm against the callers.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ResetOffsetBodyForC {
/// One entry per queue; `from_offset_table` builds each entry from a
/// `(MessageQueue, offset)` pair.
pub offset_table: Vec<MessageQueueForC>,
}
impl ResetOffsetBodyForC {
    /// Creates a body with an empty entry list.
    pub fn new() -> Self {
        Self {
            offset_table: Vec::new(),
        }
    }

    /// Builds a body from a queue-to-offset map.
    ///
    /// Every `(queue, offset)` pair becomes one entry via
    /// `MessageQueueForC::from_message_queue`. Entries follow the map's
    /// iteration order, which `HashMap` does not specify.
    pub fn from_offset_table(offset_table: &HashMap<MessageQueue, i64>) -> Self {
        Self {
            offset_table: offset_table
                .iter()
                .map(|(queue, &offset)| MessageQueueForC::from_message_queue(queue, offset))
                .collect(),
        }
    }

    /// Serializes the body to JSON bytes with `serde_json`.
    ///
    /// This is best-effort: a serialization error is discarded and an empty
    /// `Vec` is returned instead.
    pub fn encode(&self) -> Vec<u8> {
        match serde_json::to_vec(self) {
            Ok(bytes) => bytes,
            Err(_) => Vec::new(),
        }
    }

    /// Parses a body from JSON bytes, returning `None` if parsing fails.
    pub fn decode(body: &[u8]) -> Option<Self> {
        match serde_json::from_slice::<Self>(body) {
            Ok(parsed) => Some(parsed),
            Err(_) => None,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed body has no entries.
    #[test]
    fn test_reset_offset_body_new() {
        let body = ResetOffsetBody::new();
        assert!(body.offset_table.is_empty());
    }

    /// Encoding and then decoding a body yields the entry that was inserted.
    #[test]
    fn test_reset_offset_body_encode_decode() {
        let mut body = ResetOffsetBody::new();
        let mq = MessageQueue::from_parts("test_topic", "broker-a", 1);
        body.offset_table.insert(mq.clone(), 100);

        let encoded = body.encode();
        assert!(!encoded.is_empty());
        let json_str = std::str::from_utf8(&encoded).unwrap();
        assert!(json_str.contains("offsetTable"));

        // Complete the round trip: the encoded bytes must decode to the same entry.
        let decoded = ResetOffsetBody::decode(&encoded).expect("decode encoded reset offset body");
        assert_eq!(decoded.offset_table.len(), 1);
        assert_eq!(decoded.offset_table.get(&mq), Some(&100));
    }

    /// The encoder writes queue keys as bare JSON objects, not as quoted strings.
    #[test]
    fn reset_offset_body_encode_uses_java_object_keys() {
        let mut body = ResetOffsetBody::new();
        body.offset_table
            .insert(MessageQueue::from_parts("test_topic", "broker-a", 1), 100);
        let encoded = String::from_utf8(body.encode()).expect("reset offset body should be utf8");
        assert!(encoded.contains(
            r#""offsetTable":{{"topic":"test_topic","brokerName":"broker-a","queueId":1}:100}"#
        ));
        assert!(!encoded.contains(r#""{\"topic\""#));
    }

    /// The decoder accepts payloads whose map keys are bare JSON objects.
    #[test]
    fn reset_offset_body_decode_accepts_java_object_keys() {
        let body = r#"{"offsetTable":{{"topic":"test_topic","brokerName":"broker-a","queueId":1}:100}}"#;
        let decoded = ResetOffsetBody::decode(body.as_bytes()).expect("decode reset offset body");
        let queue = MessageQueue::from_parts("test_topic", "broker-a", 1);
        assert_eq!(decoded.offset_table.get(&queue), Some(&100));
    }

    /// Every map entry is converted, and each offset is carried over exactly once.
    #[test]
    fn test_reset_offset_body_for_c_from_offset_table() {
        let mut offset_table = HashMap::new();
        let mq1 = MessageQueue::from_parts("topic1", "broker-a", 0);
        let mq2 = MessageQueue::from_parts("topic1", "broker-a", 1);
        offset_table.insert(mq1, 100);
        offset_table.insert(mq2, 200);

        let body = ResetOffsetBodyForC::from_offset_table(&offset_table);
        assert_eq!(body.offset_table.len(), 2);

        // Map iteration order is unspecified, so compare the sorted offsets.
        // This also rejects a result in which both entries carry the same offset.
        let mut offsets: Vec<_> = body.offset_table.iter().map(|entry| entry.offset).collect();
        offsets.sort_unstable();
        assert_eq!(offsets, vec![100, 200]);
    }

    /// The list-based body survives a JSON encode/decode round trip.
    #[test]
    fn test_reset_offset_body_for_c_encode_decode() {
        let mut body = ResetOffsetBodyForC::new();
        body.offset_table
            .push(MessageQueueForC::new("test_topic", "broker-a", 1, 100));
        let encoded = body.encode();
        let decoded = ResetOffsetBodyForC::decode(&encoded).unwrap();
        assert_eq!(decoded.offset_table.len(), 1);
        assert_eq!(decoded.offset_table[0].offset, 100);
    }
}