#![allow(incomplete_features)]
#![feature(impl_trait_in_assoc_type)]
use std::future::Future;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use cheetah_string::CheetahString;
use rocketmq_error::RocketMQResult;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_remoting::runtime::processor_v2::CoreProcessorVariant;
use rocketmq_remoting::runtime::processor_v2::ProcessorDispatcher;
use rocketmq_remoting::runtime::processor_v2::RequestProcessorV2;
/// Processor for send-type requests that keeps a running count of the
/// requests it has handled.
///
/// The count lives behind an [`Arc`], so every clone of the processor reads
/// and updates the same value.
#[derive(Clone)]
pub struct SendMessageProcessor {
    /// Handled-request counter, shared by all clones of this processor.
    send_count: Arc<AtomicU64>,
}

impl Default for SendMessageProcessor {
    /// Same as [`SendMessageProcessor::new`].
    fn default() -> Self {
        SendMessageProcessor::new()
    }
}

impl SendMessageProcessor {
    /// Creates a processor whose counter starts at zero.
    pub fn new() -> Self {
        let send_count = Arc::new(AtomicU64::new(0));
        Self { send_count }
    }

    /// Returns the current counter value (relaxed atomic load).
    pub fn get_send_count(&self) -> u64 {
        let counter: &AtomicU64 = &self.send_count;
        counter.load(Ordering::Relaxed)
    }
}
impl RequestProcessorV2 for SendMessageProcessor {
    type Fut<'a>
        = impl Future<Output = RocketMQResult<Option<RemotingCommand>>> + Send + 'a
    where
        Self: 'a;

    /// Handles one request.
    ///
    /// Bumps the send counter, reads the `topic` ext field of `request`
    /// (falling back to `DefaultTopic` when it is absent), prints a log line
    /// to stdout and resolves to a response with code `0` and a remark that
    /// names the topic. The channel and context arguments are not used.
    fn process_request<'a>(
        &'a mut self,
        _channel: Channel,
        _ctx: ConnectionHandlerContext,
        request: &'a mut RemotingCommand,
    ) -> Self::Fut<'a> {
        async move {
            self.send_count.fetch_add(1, Ordering::Relaxed);

            let topic = match request.ext_fields().and_then(|fields| fields.get("topic")) {
                Some(value) => value.clone(),
                None => CheetahString::from("DefaultTopic"),
            };
            println!("[SendMessageProcessor] Processing send to topic: {}", topic);

            let reply = RemotingCommand::create_response_command().set_code(0);
            let reply = reply.set_remark(format!("Message sent to {}", topic));
            Ok(Some(reply))
        }
    }
}
/// Processor for pull-type requests that keeps a running count of the
/// requests it has handled.
///
/// Clones share one counter because it is stored behind an [`Arc`].
#[derive(Clone)]
pub struct PullMessageProcessor {
    /// Handled-request counter, shared by all clones of this processor.
    pull_count: Arc<AtomicU64>,
}

impl Default for PullMessageProcessor {
    /// Same as [`PullMessageProcessor::new`].
    fn default() -> Self {
        PullMessageProcessor::new()
    }
}

impl PullMessageProcessor {
    /// Creates a processor whose counter starts at zero.
    pub fn new() -> Self {
        let pull_count = Arc::new(AtomicU64::new(0));
        Self { pull_count }
    }

    /// Returns the current counter value (relaxed atomic load).
    pub fn get_pull_count(&self) -> u64 {
        let counter: &AtomicU64 = &self.pull_count;
        counter.load(Ordering::Relaxed)
    }
}
impl RequestProcessorV2 for PullMessageProcessor {
    type Fut<'a>
        = impl Future<Output = RocketMQResult<Option<RemotingCommand>>> + Send + 'a
    where
        Self: 'a;

    /// Handles one request.
    ///
    /// Bumps the pull counter, parses the `queueId` ext field of `request` as
    /// an `i32` (using `0` when the field is missing or does not parse),
    /// prints a log line to stdout and resolves to a response with code `0`,
    /// a 1024-byte zero-filled body and a remark that names the queue. The
    /// channel and context arguments are not used.
    fn process_request<'a>(
        &'a mut self,
        _channel: Channel,
        _ctx: ConnectionHandlerContext,
        request: &'a mut RemotingCommand,
    ) -> Self::Fut<'a> {
        async move {
            self.pull_count.fetch_add(1, Ordering::Relaxed);

            let queue_id = match request.ext_fields().and_then(|fields| fields.get("queueId")) {
                Some(raw) => raw.parse::<i32>().unwrap_or(0),
                None => 0,
            };
            println!("[PullMessageProcessor] Processing pull from queue: {}", queue_id);

            let reply = RemotingCommand::create_response_command()
                .set_code(0)
                .set_body(vec![0u8; 1024])
                .set_remark(format!("Pulled from queue {}", queue_id));
            Ok(Some(reply))
        }
    }
}
/// Processor for admin-type requests that keeps a running count of the
/// requests it has handled.
///
/// Clones share one counter because it is stored behind an [`Arc`].
#[derive(Clone)]
pub struct AdminProcessor {
    /// Handled-request counter, shared by all clones of this processor.
    admin_count: Arc<AtomicU64>,
}

impl Default for AdminProcessor {
    /// Same as [`AdminProcessor::new`].
    fn default() -> Self {
        AdminProcessor::new()
    }
}

impl AdminProcessor {
    /// Creates a processor whose counter starts at zero.
    pub fn new() -> Self {
        let admin_count = Arc::new(AtomicU64::new(0));
        Self { admin_count }
    }

    /// Returns the current counter value (relaxed atomic load).
    pub fn get_admin_count(&self) -> u64 {
        let counter: &AtomicU64 = &self.admin_count;
        counter.load(Ordering::Relaxed)
    }
}
impl RequestProcessorV2 for AdminProcessor {
    type Fut<'a>
        = impl Future<Output = RocketMQResult<Option<RemotingCommand>>> + Send + 'a
    where
        Self: 'a;

    /// Handles one request.
    ///
    /// Bumps the admin counter, reads the `operation` ext field of `request`
    /// (falling back to `status` when it is absent), prints a log line to
    /// stdout and resolves to a response with code `0` and a remark that
    /// names the operation. The channel and context arguments are not used.
    fn process_request<'a>(
        &'a mut self,
        _channel: Channel,
        _ctx: ConnectionHandlerContext,
        request: &'a mut RemotingCommand,
    ) -> Self::Fut<'a> {
        async move {
            self.admin_count.fetch_add(1, Ordering::Relaxed);

            let operation = match request.ext_fields().and_then(|fields| fields.get("operation")) {
                Some(value) => value.clone(),
                None => CheetahString::from("status"),
            };
            println!("[AdminProcessor] Processing admin operation: {}", operation);

            let reply = RemotingCommand::create_response_command().set_code(0);
            let reply = reply.set_remark(format!("Admin operation {} completed", operation));
            Ok(Some(reply))
        }
    }
}
/// Application-level front for a [`ProcessorDispatcher`] wired with the three
/// core processors.
///
/// Besides the dispatcher itself, a clone of each core processor is kept so
/// that their counters can still be read after the dispatcher has taken
/// ownership of its own copies.
pub struct AppProcessorDispatcher {
    /// Dispatcher that owns one instance of each core processor.
    dispatcher: ProcessorDispatcher<SendMessageProcessor, PullMessageProcessor, AdminProcessor>,
    /// Clone of the send processor, used to read its counter.
    send_processor_ref: SendMessageProcessor,
    /// Clone of the pull processor, used to read its counter.
    pull_processor_ref: PullMessageProcessor,
    /// Clone of the admin processor, used to read its counter.
    admin_processor_ref: AdminProcessor,
}

impl Default for AppProcessorDispatcher {
    /// Same as [`AppProcessorDispatcher::new`].
    fn default() -> Self {
        AppProcessorDispatcher::new()
    }
}

impl AppProcessorDispatcher {
    /// Builds the dispatcher and registers the core request codes.
    ///
    /// Send, SendV2 and SendBatch go to the send processor; Pull and LitePull
    /// go to the pull processor; GetBrokerConfig and UpdateBrokerConfig go to
    /// the admin processor.
    pub fn new() -> Self {
        // The retained handles are created first; the dispatcher receives
        // clones of them.
        let send_processor_ref = SendMessageProcessor::new();
        let pull_processor_ref = PullMessageProcessor::new();
        let admin_processor_ref = AdminProcessor::new();

        let mut dispatcher = ProcessorDispatcher::new(
            send_processor_ref.clone(),
            pull_processor_ref.clone(),
            admin_processor_ref.clone(),
        );

        // Request code -> core processor routing table, registered in order.
        let core_routes = [
            (RequestCode::SendMessage, CoreProcessorVariant::Send),
            (RequestCode::SendMessageV2, CoreProcessorVariant::Send),
            (RequestCode::SendBatchMessage, CoreProcessorVariant::Send),
            (RequestCode::PullMessage, CoreProcessorVariant::Pull),
            (RequestCode::LitePullMessage, CoreProcessorVariant::Pull),
            (RequestCode::GetBrokerConfig, CoreProcessorVariant::Admin),
            (RequestCode::UpdateBrokerConfig, CoreProcessorVariant::Admin),
        ];
        for (code, variant) in core_routes {
            dispatcher.register_core(code as i32, variant);
        }

        Self {
            dispatcher,
            send_processor_ref,
            pull_processor_ref,
            admin_processor_ref,
        }
    }

    /// Registers three plugin handlers under codes 9001, 9002 and 9003.
    ///
    /// * 9001 prints the request's code and opaque value and replies with the
    ///   remark `Trace logged`.
    /// * 9002 prints a line, sleeps for 10 ms and replies with the remark
    ///   `New algorithm result`.
    /// * 9003 prints a line and replies with the remark
    ///   `Third-party processed`.
    pub fn register_experimental_features(&mut self) {
        self.dispatcher.register_plugin(9001, |_channel, _ctx, request| {
            // Copy what the log line needs before entering the future, so the
            // future does not capture `request` itself.
            let (code, opaque) = (request.code(), request.opaque());
            async move {
                println!("[Plugin:Trace] Request code: {}, opaque: {}", code, opaque);
                let reply = RemotingCommand::create_response_command().set_remark("Trace logged".to_string());
                Ok(Some(reply))
            }
        });

        self.dispatcher
            .register_plugin(9002, |_channel, _ctx, _request| async move {
                println!("[Plugin:ABTest] Testing new algorithm");
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                let reply = RemotingCommand::create_response_command().set_remark("New algorithm result".to_string());
                Ok(Some(reply))
            });

        self.dispatcher
            .register_plugin(9003, |_channel, _ctx, _request| async move {
                println!("[Plugin:ThirdParty] External processor");
                let reply = RemotingCommand::create_response_command().set_remark("Third-party processed".to_string());
                Ok(Some(reply))
            });
    }

    /// Hands the request to the inner dispatcher under `request_code` and
    /// returns whatever the dispatcher resolves to.
    pub async fn process(
        &mut self,
        request_code: i32,
        channel: Channel,
        ctx: ConnectionHandlerContext,
        request: &mut RemotingCommand,
    ) -> RocketMQResult<Option<RemotingCommand>> {
        self.dispatcher
            .dispatch(request_code, channel, ctx, request)
            .await
    }

    /// Returns the current `(send, pull, admin)` counters, in that order.
    pub fn get_metrics(&self) -> (u64, u64, u64) {
        let sent = self.send_processor_ref.get_send_count();
        let pulled = self.pull_processor_ref.get_pull_count();
        let administered = self.admin_processor_ref.get_admin_count();
        (sent, pulled, administered)
    }
}
// Entry point of the demo binary.
//
// It prints a banner and then panics through `unimplemented!`, so nothing in
// this file is actually dispatched from here. The `allow` attribute silences
// the lints triggered by the statement that follows the panic.
#[allow(dead_code, unreachable_code, unused_variables)]
#[tokio::main]
async fn main() -> RocketMQResult<()> {
println!("=== RocketMQ Processor Registry V2 Demo ===\n");
// Deliberate stub: always panics with the message below.
// NOTE(review): the message points at the tests, but the tests visible in
// this file are `#[ignore]`d and begin with `todo!()` — confirm where the
// working examples live.
unimplemented!("This example requires actual runtime initialization. See tests for working examples.");
// Never reached; presumably kept so the body still ends in a value of the
// declared return type — TODO confirm before removing.
Ok(())
}
#[cfg(test)]
mod benchmarks {
    use super::*;

    /// Requests per second for `iterations` requests completed in `elapsed`.
    fn requests_per_second(iterations: u32, elapsed: std::time::Duration) -> f64 {
        f64::from(iterations) / elapsed.as_secs_f64()
    }

    /// Times 100 000 sequential `SendMessage` dispatches through the core
    /// path and asserts the measured rate exceeds 500 000 requests/second.
    ///
    /// Ignored by default: the channel and context are `todo!()` placeholders,
    /// so the body panics before any request is dispatched.
    #[tokio::test]
    #[ignore = "Requires actual runtime initialization"]
    async fn benchmark_core_processor_throughput() {
        let mut dispatcher = AppProcessorDispatcher::new();
        let channel: Channel = todo!();
        let ctx: ConnectionHandlerContext = todo!();

        let iterations: u32 = 100_000;
        let send_code = RequestCode::SendMessage as i32;

        let started = std::time::Instant::now();
        for _ in 0..iterations {
            let mut request = RemotingCommand::create_remoting_command(send_code);
            // The outcome of each dispatch is intentionally discarded.
            let _ = dispatcher
                .process(send_code, channel.clone(), ctx.clone(), &mut request)
                .await;
        }
        let elapsed = started.elapsed();

        let qps = requests_per_second(iterations, elapsed);
        println!("Core processor QPS: {:.0}", qps);
        println!("Average latency: {:?}", elapsed / iterations);
        assert!(qps > 500_000.0, "QPS too low: {}", qps);
    }

    /// Times 10 000 sequential dispatches to plugin code 9001 and asserts the
    /// measured rate exceeds 50 000 requests/second.
    ///
    /// Ignored by default: the channel and context are `todo!()` placeholders,
    /// so the body panics before any request is dispatched.
    #[tokio::test]
    #[ignore = "Requires actual runtime initialization"]
    async fn benchmark_plugin_processor_throughput() {
        let mut dispatcher = AppProcessorDispatcher::new();
        dispatcher.register_experimental_features();
        let channel: Channel = todo!();
        let ctx: ConnectionHandlerContext = todo!();

        let iterations: u32 = 10_000;

        let started = std::time::Instant::now();
        for _ in 0..iterations {
            let mut request = RemotingCommand::create_remoting_command(9001);
            // The outcome of each dispatch is intentionally discarded.
            let _ = dispatcher
                .process(9001, channel.clone(), ctx.clone(), &mut request)
                .await;
        }
        let elapsed = started.elapsed();

        let qps = requests_per_second(iterations, elapsed);
        println!("Plugin processor QPS: {:.0}", qps);
        println!("Average latency: {:?}", elapsed / iterations);
        assert!(qps > 50_000.0, "Plugin QPS too low: {}", qps);
    }
}