#![allow(incomplete_features)]
#![feature(impl_trait_in_assoc_type)]
use std::future::Future;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use rocketmq_error::RocketMQResult;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::connection_handler_context::ConnectionHandlerContext;
use rocketmq_remoting::runtime::processor_v2::CoreProcessor;
use rocketmq_remoting::runtime::processor_v2::CoreProcessorVariant;
use rocketmq_remoting::runtime::processor_v2::PluginProcessorRegistry;
use rocketmq_remoting::runtime::processor_v2::ProcessorDispatcher;
use rocketmq_remoting::runtime::processor_v2::RequestProcessorV2;
struct EchoProcessor;
impl RequestProcessorV2 for EchoProcessor {
type Fut<'a>
= impl Future<Output = RocketMQResult<Option<RemotingCommand>>> + Send + 'a
where
Self: 'a;
fn process_request<'a>(
&'a mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
_request: &'a mut RemotingCommand,
) -> Self::Fut<'a> {
async move {
let response = RemotingCommand::create_response_command()
.set_code(100)
.set_remark("Echo from EchoProcessor");
Ok(Some(response))
}
}
}
#[derive(Clone)]
struct MetricsProcessor {
request_count: Arc<AtomicU64>,
total_bytes: Arc<AtomicU64>,
}
impl MetricsProcessor {
fn new() -> Self {
Self {
request_count: Arc::new(AtomicU64::new(0)),
total_bytes: Arc::new(AtomicU64::new(0)),
}
}
fn get_metrics(&self) -> (u64, u64) {
(
self.request_count.load(Ordering::Relaxed),
self.total_bytes.load(Ordering::Relaxed),
)
}
}
impl RequestProcessorV2 for MetricsProcessor {
type Fut<'a>
= impl Future<Output = RocketMQResult<Option<RemotingCommand>>> + Send + 'a
where
Self: 'a;
fn process_request<'a>(
&'a mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
request: &'a mut RemotingCommand,
) -> Self::Fut<'a> {
async move {
self.request_count.fetch_add(1, Ordering::Relaxed);
if let Some(body) = request.body() {
self.total_bytes.fetch_add(body.len() as u64, Ordering::Relaxed);
}
let response = RemotingCommand::create_response_command().set_remark(format!(
"Processed {} requests",
self.request_count.load(Ordering::Relaxed)
));
Ok(Some(response))
}
}
}
#[test]
fn test_metrics_processor_creation() {
let processor = MetricsProcessor::new();
let (count, bytes) = processor.get_metrics();
assert_eq!(count, 0);
assert_eq!(bytes, 0);
}
type MyAppCoreProcessor = CoreProcessor<EchoProcessor, MetricsProcessor, EchoProcessor>;
#[test]
fn test_core_processor_types() {
let _echo = MyAppCoreProcessor::Send(EchoProcessor);
let _metrics = MyAppCoreProcessor::Pull(MetricsProcessor::new());
let _admin = MyAppCoreProcessor::Admin(EchoProcessor);
}
#[test]
fn test_plugin_registry_creation() {
let mut registry = PluginProcessorRegistry::new();
registry.register(1001, |_channel, _ctx, _request| async move {
let response = RemotingCommand::create_response_command().set_remark("Plugin 1001");
Ok(Some(response))
});
registry.register(1002, |_channel, _ctx, request| {
let code = request.code();
async move {
let response =
RemotingCommand::create_response_command().set_remark(format!("Plugin 1002 handled code {}", code));
Ok(Some(response))
}
});
assert!(registry.contains(1001));
assert!(registry.contains(1002));
assert!(!registry.contains(9999));
}
#[test]
fn test_dispatcher_creation() {
let send_processor = EchoProcessor;
let pull_processor = MetricsProcessor::new();
let admin_processor = EchoProcessor;
let mut dispatcher = ProcessorDispatcher::new(send_processor, pull_processor, admin_processor);
dispatcher.register_core(100, CoreProcessorVariant::Send);
dispatcher.register_core(200, CoreProcessorVariant::Pull);
dispatcher.register_core(300, CoreProcessorVariant::Admin);
dispatcher.register_plugin(9001, |_channel, _ctx, _request| async move {
let response = RemotingCommand::create_response_command().set_remark("Experimental Feature");
Ok(Some(response))
});
}
struct AsyncIoProcessor {
db_client: Arc<MockDatabase>,
}
struct MockDatabase;
impl MockDatabase {
async fn query(&self, _key: &str) -> String {
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
"mock_value".to_string()
}
}
impl RequestProcessorV2 for AsyncIoProcessor {
type Fut<'a>
= impl Future<Output = RocketMQResult<Option<RemotingCommand>>> + Send + 'a
where
Self: 'a;
fn process_request<'a>(
&'a mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
request: &'a mut RemotingCommand,
) -> Self::Fut<'a> {
async move {
let key = request.remark().as_ref().map(|s| s.as_str()).unwrap_or("default");
let value = self.db_client.query(key).await;
let response = RemotingCommand::create_response_command().set_remark(format!("DB result: {}", value));
Ok(Some(response))
}
}
}
#[tokio::test]
async fn test_async_io_processor() {
let processor = AsyncIoProcessor {
db_client: Arc::new(MockDatabase),
};
let _request = RemotingCommand::create_remoting_command(400).set_remark("test_key");
let _result = async {
let _value = processor.db_client.query("test_key").await;
Ok::<(), rocketmq_error::RocketMQError>(())
}
.await;
}
#[test]
#[ignore = "Manual verification required"]
fn verify_zero_allocation() {
}
#[test]
fn test_plugin_hot_reload() {
let send_processor = EchoProcessor;
let pull_processor = MetricsProcessor::new();
let admin_processor = EchoProcessor;
let mut dispatcher = ProcessorDispatcher::new(send_processor, pull_processor, admin_processor);
dispatcher.register_plugin(5000, |_channel, _ctx, _request| async move {
let response = RemotingCommand::create_response_command().set_remark("Plugin v1");
Ok(Some(response))
});
dispatcher.register_plugin(5000, |_channel, _ctx, _request| async move {
let response = RemotingCommand::create_response_command().set_remark("Plugin v2 - Updated!");
Ok(Some(response))
});
}
#[allow(dead_code)]
fn api_surface_documentation() {}
#[test]
fn test_core_mapping_routing() {
let send = MetricsProcessor::new();
let pull = MetricsProcessor::new();
let admin = MetricsProcessor::new();
let send_ref = send.clone();
let pull_ref = pull.clone();
let admin_ref = admin.clone();
let mut dispatcher = ProcessorDispatcher::new(send, pull, admin);
dispatcher.register_core(100, CoreProcessorVariant::Pull);
dispatcher.register_core(200, CoreProcessorVariant::Admin);
dispatcher.register_core(300, CoreProcessorVariant::Send);
assert_eq!(send_ref.get_metrics().0, 0);
assert_eq!(pull_ref.get_metrics().0, 0);
assert_eq!(admin_ref.get_metrics().0, 0);
}