use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use rocketmq_error::RocketMQResult;
use crate::net::channel::Channel;
use crate::protocol::remoting_command::RemotingCommand;
use crate::runtime::connection_handler_context::ConnectionHandlerContext;
pub trait RequestProcessorV2 {
type Fut<'a>: Future<Output = RocketMQResult<Option<RemotingCommand>>> + Send
where
Self: 'a;
fn process_request<'a>(
&'a mut self,
channel: Channel,
ctx: ConnectionHandlerContext,
request: &'a mut RemotingCommand,
) -> Self::Fut<'a>;
fn reject_request(&self, _code: i32) -> (bool, Option<RemotingCommand>) {
(false, None)
}
}
#[derive(Clone)]
pub enum CoreProcessor<SendProc, PullProc, AdminProc> {
Send(SendProc),
Pull(PullProc),
Admin(AdminProc),
}
impl<SendProc, PullProc, AdminProc> RequestProcessorV2 for CoreProcessor<SendProc, PullProc, AdminProc>
where
SendProc: RequestProcessorV2,
PullProc: RequestProcessorV2,
AdminProc: RequestProcessorV2,
{
type Fut<'a>
= CoreProcessorFuture<'a, SendProc, PullProc, AdminProc>
where
Self: 'a;
fn process_request<'a>(
&'a mut self,
channel: Channel,
ctx: ConnectionHandlerContext,
request: &'a mut RemotingCommand,
) -> Self::Fut<'a> {
match self {
CoreProcessor::Send(p) => CoreProcessorFuture::Send(p.process_request(channel, ctx, request)),
CoreProcessor::Pull(p) => CoreProcessorFuture::Pull(p.process_request(channel, ctx, request)),
CoreProcessor::Admin(p) => CoreProcessorFuture::Admin(p.process_request(channel, ctx, request)),
}
}
fn reject_request(&self, code: i32) -> (bool, Option<RemotingCommand>) {
match self {
CoreProcessor::Send(p) => p.reject_request(code),
CoreProcessor::Pull(p) => p.reject_request(code),
CoreProcessor::Admin(p) => p.reject_request(code),
}
}
}
pub enum CoreProcessorFuture<'a, SendProc, PullProc, AdminProc>
where
SendProc: RequestProcessorV2 + 'a,
PullProc: RequestProcessorV2 + 'a,
AdminProc: RequestProcessorV2 + 'a,
{
Send(SendProc::Fut<'a>),
Pull(PullProc::Fut<'a>),
Admin(AdminProc::Fut<'a>),
}
impl<'a, SendProc, PullProc, AdminProc> Future for CoreProcessorFuture<'a, SendProc, PullProc, AdminProc>
where
SendProc: RequestProcessorV2 + 'a,
PullProc: RequestProcessorV2 + 'a,
AdminProc: RequestProcessorV2 + 'a,
{
type Output = RocketMQResult<Option<RemotingCommand>>;
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
unsafe {
match self.get_unchecked_mut() {
CoreProcessorFuture::Send(fut) => Pin::new_unchecked(fut).poll(cx),
CoreProcessorFuture::Pull(fut) => Pin::new_unchecked(fut).poll(cx),
CoreProcessorFuture::Admin(fut) => Pin::new_unchecked(fut).poll(cx),
}
}
}
}
type DynProcessor = Box<
dyn for<'a> Fn(
Channel,
ConnectionHandlerContext,
&'a mut RemotingCommand,
) -> Pin<Box<dyn Future<Output = RocketMQResult<Option<RemotingCommand>>> + Send + 'a>>
+ Send
+ Sync,
>;
pub struct PluginProcessorRegistry {
processors: HashMap<i32, DynProcessor>,
}
impl PluginProcessorRegistry {
pub fn new() -> Self {
Self {
processors: HashMap::new(),
}
}
pub fn register<F, Fut>(&mut self, request_code: i32, processor: F)
where
F: Fn(Channel, ConnectionHandlerContext, &mut RemotingCommand) -> Fut + Send + Sync + 'static,
Fut: Future<Output = RocketMQResult<Option<RemotingCommand>>> + Send + 'static,
{
let boxed: DynProcessor = Box::new(move |channel, ctx, request| Box::pin(processor(channel, ctx, request)));
self.processors.insert(request_code, boxed);
}
pub async fn process_request(
&self,
request_code: i32,
channel: Channel,
ctx: ConnectionHandlerContext,
request: &mut RemotingCommand,
) -> Option<RocketMQResult<Option<RemotingCommand>>> {
if let Some(processor) = self.processors.get(&request_code) {
Some(processor(channel, ctx, request).await)
} else {
None
}
}
pub fn contains(&self, request_code: i32) -> bool {
self.processors.contains_key(&request_code)
}
}
impl Default for PluginProcessorRegistry {
fn default() -> Self {
Self::new()
}
}
pub struct ProcessorDispatcher<SendProc, PullProc, AdminProc> {
core_mapping: HashMap<i32, CoreProcessorVariant>,
send_processor: SendProc,
pull_processor: PullProc,
admin_processor: AdminProc,
plugin_registry: PluginProcessorRegistry,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CoreProcessorVariant {
Send,
Pull,
Admin,
}
impl<SendProc, PullProc, AdminProc> ProcessorDispatcher<SendProc, PullProc, AdminProc>
where
SendProc: RequestProcessorV2,
PullProc: RequestProcessorV2,
AdminProc: RequestProcessorV2,
{
pub fn new(send_processor: SendProc, pull_processor: PullProc, admin_processor: AdminProc) -> Self {
Self {
core_mapping: HashMap::new(),
send_processor,
pull_processor,
admin_processor,
plugin_registry: PluginProcessorRegistry::new(),
}
}
pub fn register_core(&mut self, request_code: i32, variant: CoreProcessorVariant) {
self.core_mapping.insert(request_code, variant);
}
pub fn register_plugin<F, Fut>(&mut self, request_code: i32, processor: F)
where
F: Fn(Channel, ConnectionHandlerContext, &mut RemotingCommand) -> Fut + Send + Sync + 'static,
Fut: Future<Output = RocketMQResult<Option<RemotingCommand>>> + Send + 'static,
{
self.plugin_registry.register(request_code, processor);
}
pub async fn dispatch(
&mut self,
request_code: i32,
channel: Channel,
ctx: ConnectionHandlerContext,
request: &mut RemotingCommand,
) -> RocketMQResult<Option<RemotingCommand>> {
if let Some(variant) = self.core_mapping.get(&request_code) {
return match variant {
CoreProcessorVariant::Send => self.send_processor.process_request(channel, ctx, request).await,
CoreProcessorVariant::Pull => self.pull_processor.process_request(channel, ctx, request).await,
CoreProcessorVariant::Admin => self.admin_processor.process_request(channel, ctx, request).await,
};
}
if let Some(result) = self
.plugin_registry
.process_request(request_code, channel, ctx, request)
.await
{
return result;
}
Err(rocketmq_error::RocketMQError::broker_operation_failed(
"ProcessorDispatcher",
-1,
format!("Unsupported request code: {}", request_code),
))
}
}
pub struct SendMessageProcessorExample {
}
impl Default for SendMessageProcessorExample {
fn default() -> Self {
Self::new()
}
}
impl SendMessageProcessorExample {
pub fn new() -> Self {
Self {}
}
async fn process_internal(
&mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
_request: &mut RemotingCommand,
) -> RocketMQResult<Option<RemotingCommand>> {
Ok(None)
}
}
impl RequestProcessorV2 for SendMessageProcessorExample {
type Fut<'a>
= impl Future<Output = RocketMQResult<Option<RemotingCommand>>> + Send + 'a
where
Self: 'a;
fn process_request<'a>(
&'a mut self,
channel: Channel,
ctx: ConnectionHandlerContext,
request: &'a mut RemotingCommand,
) -> Self::Fut<'a> {
self.process_internal(channel, ctx, request)
}
}
pub struct PullMessageProcessorExample {
}
impl Default for PullMessageProcessorExample {
fn default() -> Self {
Self::new()
}
}
impl PullMessageProcessorExample {
pub fn new() -> Self {
Self {}
}
async fn process_internal(
&mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
_request: &mut RemotingCommand,
) -> RocketMQResult<Option<RemotingCommand>> {
Ok(None)
}
}
impl RequestProcessorV2 for PullMessageProcessorExample {
type Fut<'a>
= impl Future<Output = RocketMQResult<Option<RemotingCommand>>> + Send + 'a
where
Self: 'a;
fn process_request<'a>(
&'a mut self,
channel: Channel,
ctx: ConnectionHandlerContext,
request: &'a mut RemotingCommand,
) -> Self::Fut<'a> {
self.process_internal(channel, ctx, request)
}
}
pub struct AdminProcessorExample {
}
impl Default for AdminProcessorExample {
fn default() -> Self {
Self::new()
}
}
impl AdminProcessorExample {
pub fn new() -> Self {
Self {}
}
async fn process_internal(
&mut self,
_channel: Channel,
_ctx: ConnectionHandlerContext,
_request: &mut RemotingCommand,
) -> RocketMQResult<Option<RemotingCommand>> {
Ok(None)
}
}
impl RequestProcessorV2 for AdminProcessorExample {
type Fut<'a>
= impl Future<Output = RocketMQResult<Option<RemotingCommand>>> + Send + 'a
where
Self: 'a;
fn process_request<'a>(
&'a mut self,
channel: Channel,
ctx: ConnectionHandlerContext,
request: &'a mut RemotingCommand,
) -> Self::Fut<'a> {
self.process_internal(channel, ctx, request)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_processor_dispatcher() {
let send_processor = SendMessageProcessorExample::new();
let pull_processor = PullMessageProcessorExample::new();
let admin_processor = AdminProcessorExample::new();
let mut dispatcher = ProcessorDispatcher::new(send_processor, pull_processor, admin_processor);
dispatcher.register_core(10, CoreProcessorVariant::Send); dispatcher.register_core(11, CoreProcessorVariant::Pull); dispatcher.register_core(50, CoreProcessorVariant::Admin);
dispatcher.register_plugin(9999, |_channel, _ctx, _request| async move {
Ok(Some(RemotingCommand::create_response_command()))
});
}
}