Handler

Trait Handler 

Source
pub trait Handler:
    Send
    + Sync
    + 'static {
    // Required method
    fn handle_message<'life0, 'async_trait>(
        &'life0 self,
        message: ProtocolMessage,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
}
Expand description

消息处理器 trait

实现此 trait 来处理从 NSQ 接收的消息。

§自动响应模式(默认)

ConsumerConfig::disable_auto_responsefalse 时(默认):

  • 如果 handle_message 返回 Ok(()),消息会自动发送 FIN 命令
  • 如果 handle_message 返回 Err(_),消息会自动发送 REQ 命令

§手动响应模式

ConsumerConfig::disable_auto_responsetrue 时:

  • 消息不会自动发送 FIN/REQ
  • 需要在 handler 中手动调用:
    • message.finish() 完成消息处理
    • message.requeue(delay) 重新入队消息
    • message.touch() 重置消息超时

或者,在自动响应模式下,也可以在单个消息上调用 message.disable_auto_response() 来禁用该消息的自动响应。

§示例

§自动响应模式

struct MyHandler;

#[async_trait]
impl Handler for MyHandler {
    async fn handle_message(&self, message: Message) -> Result<()> {
        // 处理消息
        println!("收到消息: {:?}", String::from_utf8_lossy(&message.body));
         
        // 返回 Ok 会自动发送 FIN
        Ok(())
    }
}

§手动响应模式

struct MyHandler;

#[async_trait]
impl Handler for MyHandler {
    async fn handle_message(&self, message: Message) -> Result<()> {
        // 处理消息
        match process(&message.body) {
            Ok(_) => {
                // 手动发送 FIN
                message.finish().await?;
            }
            Err(_) => {
                // 手动重新入队,延迟 5 秒
                message.requeue(5000).await?;
            }
        }
         
        Ok(())
    }
}

Required Methods§

Source

fn handle_message<'life0, 'async_trait>( &'life0 self, message: ProtocolMessage, ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Implementors§