pub trait Handler:
Send
+ Sync
+ 'static {
// Required method
fn handle_message<'life0, 'async_trait>(
&'life0 self,
message: ProtocolMessage,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
}Expand description
消息处理器 trait
实现此 trait 来处理从 NSQ 接收的消息。
§自动响应模式(默认)
当 ConsumerConfig::disable_auto_response 为 false 时(默认):
- 如果
handle_message返回Ok(()),消息会自动发送 FIN 命令 - 如果
handle_message返回Err(_),消息会自动发送 REQ 命令
§手动响应模式
当 ConsumerConfig::disable_auto_response 为 true 时:
- 消息不会自动发送 FIN/REQ
- 需要在 handler 中手动调用:
message.finish()完成消息处理message.requeue(delay)重新入队消息message.touch()重置消息超时
或者,在自动响应模式下,也可以在单个消息上调用 message.disable_auto_response() 来禁用该消息的自动响应。
§示例
§自动响应模式
ⓘ
struct MyHandler;
#[async_trait]
impl Handler for MyHandler {
async fn handle_message(&self, message: Message) -> Result<()> {
// 处理消息
println!("收到消息: {:?}", String::from_utf8_lossy(&message.body));
// 返回 Ok 会自动发送 FIN
Ok(())
}
}§手动响应模式
ⓘ
struct MyHandler;
#[async_trait]
impl Handler for MyHandler {
async fn handle_message(&self, message: Message) -> Result<()> {
// 处理消息
match process(&message.body) {
Ok(_) => {
// 手动发送 FIN
message.finish().await?;
}
Err(_) => {
// 手动重新入队,延迟 5 秒
message.requeue(5000).await?;
}
}
Ok(())
}
}