nsq-async-rs
 

Other language versions: English, 简体中文
nsq-async-rs is a high-performance, reliable NSQ client library written in Rust. The project is inspired by the official go-nsq implementation and provides similar functionality and interfaces in the Rust ecosystem.
Features
- ✨ Async I/O support (based on tokio)
- 🚀 High-performance message processing
- 🔄 Automatic reconnection and error retry
- 🔍 Support for nsqlookupd service discovery
- 🛡️ Graceful shutdown support
- 📊 Built-in message statistics
- ⚡ Support for delayed publishing
- 📦 Support for batch publishing
- 🔀 Support for concurrent message processing
- 🏊‍♂️ Built-in connection pool for producers
- 🎯 Manual message acknowledgement support
- 💫 Feature parity with official go-nsq
Installation
Add the following to your Cargo.toml:
[dependencies]
nsq-async-rs = "0.1.8"
Quick Start
Basic Consumer Example
use nsq_async_rs::consumer::{Consumer, ConsumerConfig, Handler};
use nsq_async_rs::error::Result;
use nsq_async_rs::protocol::Message;
#[derive(Default)]
struct MessageHandler;
#[async_trait::async_trait]
impl Handler for MessageHandler {
    async fn handle_message(&self, message: Message) -> Result<()> {
        println!("Received message: {:?}", String::from_utf8_lossy(&message.body));
        Ok(())
    }
}
#[tokio::main]
async fn main() -> Result<()> {
    let config = ConsumerConfig::default();
    let consumer = Consumer::new(
        "test_topic".to_string(),
        "test_channel".to_string(),
        config,
        MessageHandler::default(),
    )?;
    consumer.connect_to_nsqlookupd("http://127.0.0.1:4161".to_string()).await?;
    consumer.start().await?;
    tokio::signal::ctrl_c().await?;
    consumer.stop().await?;
    Ok(())
}
Concurrent Consumer Example
use async_trait::async_trait;
use log::{error, info};
use nsq_async_rs::consumer::{Consumer, ConsumerConfig, Handler};
use nsq_async_rs::error::Result;
use nsq_async_rs::protocol::Message;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};
struct ConcurrentMessageHandler {
    worker_count: usize,
    sender: Arc<Mutex<mpsc::Sender<Message>>>,
}
impl ConcurrentMessageHandler {
    pub fn new(worker_count: usize) -> Self {
        // Bounded channel sized to give each worker some headroom.
        let (tx, rx) = mpsc::channel(worker_count * 10);
        let sender = Arc::new(Mutex::new(tx));
        let receiver = Arc::new(Mutex::new(rx));
        let handler = Self {
            worker_count,
            sender,
        };
        // Start the worker tasks up front.
        handler.start_workers(receiver);
        handler
    }

    fn start_workers(&self, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) {
        for i in 0..self.worker_count {
            let worker_id = i + 1;
            let rx = receiver.clone();
            tokio::spawn(async move {
                info!("Worker {} started", worker_id);
                loop {
                    // Take the next message; exit when the channel is closed.
                    let msg = {
                        let mut rx_guard = rx.lock().await;
                        match rx_guard.recv().await {
                            Some(msg) => msg,
                            None => break,
                        }
                    };
                    let msg_id = String::from_utf8_lossy(&msg.id).to_string();
                    info!("Worker {} processing message: {}", worker_id, msg_id);

                    // ... process the message body here ...

                    info!("Worker {} finished processing message: {}", worker_id, msg_id);
                }
            });
        }
    }
}
#[async_trait]
impl Handler for ConcurrentMessageHandler {
    async fn handle_message(&self, message: Message) -> Result<()> {
        let msg_id = String::from_utf8_lossy(&message.id).to_string();
        let sender = self.sender.lock().await;
        // Try a non-blocking send first so the consumer is not held up.
        let send_result = sender.try_send(message.clone());
        match send_result {
            Ok(_) => {
                info!("Message sent to worker channel: ID={}", msg_id);
            }
            Err(mpsc::error::TrySendError::Full(msg)) => {
                // Channel is full: fall back to an awaited send and wait for a worker.
                if let Err(e) = sender.send(msg).await {
                    error!("Failed to send message to worker channel: {}", e);
                    return Err(nsq_async_rs::error::Error::Other(e.to_string()));
                }
            }
            Err(mpsc::error::TrySendError::Closed(_)) => {
                error!("Worker channel closed: ID={}", msg_id);
                return Err(nsq_async_rs::error::Error::Other("Worker channel closed".into()));
            }
        }
        Ok(())
    }
}
#[tokio::main]
async fn main() -> Result<()> {
    // Allow up to 100 messages in flight and up to 5 delivery attempts.
    let config = ConsumerConfig {
        max_in_flight: 100,
        max_attempts: 5,
        ..Default::default()
    };

    // 20 worker tasks process messages concurrently.
    let handler = ConcurrentMessageHandler::new(20);

    let consumer = Consumer::new(
        "test_topic".to_string(),
        "test_channel".to_string(),
        config,
        handler,
    )?;
    consumer.connect_to_nsqlookupd("http://127.0.0.1:4161".to_string()).await?;
    consumer.start().await?;
    tokio::signal::ctrl_c().await?;
    consumer.stop().await?;
    Ok(())
}
Basic Producer Example
use nsq_async_rs::producer::Producer;
use nsq_async_rs::error::Result;
#[tokio::main]
async fn main() -> Result<()> {
    let producer = Producer::connect("127.0.0.1:4150").await?;
    
    producer.publish("test_topic", "Hello, NSQ!".as_bytes()).await?;
    Ok(())
}
Producer with Connection Pool Example
use log::{error, info};
use nsq_async_rs::{
    producer::{new_producer, NsqProducer},
    Producer, ProducerConfig,
};
use std::error::Error;
use std::time::Duration;
use nsq_async_rs::pool::{Pool, PoolConfig, PoolError};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Connection pool configuration.
    let pool_config = PoolConfig {
        initial_cap: 3,                         // initial number of connections
        max_cap: 10,                            // maximum number of connections
        max_idle: 5,                            // maximum idle connections
        idle_timeout: Duration::from_secs(30),  // idle connection timeout
        max_lifetime: Duration::from_secs(300), // maximum connection lifetime
    };

    // Create the pool with a producer factory and two no-op hooks.
    let pool = Pool::new(
        pool_config,
        // Factory: build a new producer for the pool.
        || {
            let p_cfg = ProducerConfig {
                nsqd_addresses: vec!["127.0.0.1:4150".to_string()],
                ..Default::default()
            };
            Ok(new_producer(p_cfg))
        },
        // Additional pool hooks (no-ops in this example).
        |_producer| Ok(()),
        Some(|_producer| Ok(())),
    ).await?;

    let topic = "test_topic";

    // Borrow a connection from the pool.
    let pooled_conn = pool.get().await?;

    // Publish using the pooled producer.
    match pooled_conn.conn.publish(topic, "Hello from connection pool!").await {
        Ok(_) => info!("Message published successfully"),
        Err(e) => error!("Failed to publish message: {}", e),
    }

    // Return the connection to the pool.
    pool.put(pooled_conn).await?;

    // Release all pool resources before exiting.
    pool.release().await?;

    Ok(())
}
Batch Publishing Example
use chrono::Local;
use nsq_async_rs::producer::{new_producer, ProducerConfig};
use std::error::Error;
use std::time::Instant;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Producer configuration pointing at a local nsqd.
    let mut config = ProducerConfig::default();
    config.nsqd_addresses = vec!["127.0.0.1:4150".to_string()];

    let producer = new_producer(config);
    let topic = "test_topic";

    // Build 100 messages.
    let mut messages = vec![];
    for i in 0..100 {
        messages.push(format!(
            "Message #{}, Time:{}",
            i + 1,
            Local::now().to_string()
        ));
    }
    // Time the batch publish.
    let start = Instant::now();
    producer.publish_multi(topic, messages).await?;
    let elapsed = start.elapsed();
    println!("Batch publishing 100 messages took: {:?}", elapsed);
    println!("Average time per message: {:?}", elapsed / 100);
    Ok(())
}
Configuration Options
Consumer Configuration
ConsumerConfig {
    max_in_flight: 100,                              // max messages in flight
    max_attempts: 5,                                 // max delivery attempts per message
    dial_timeout: Duration::from_secs(1),            // connection dial timeout
    read_timeout: Duration::from_secs(60),           // read timeout
    write_timeout: Duration::from_secs(1),           // write timeout
    lookup_poll_interval: Duration::from_secs(60),   // nsqlookupd poll interval
    lookup_poll_jitter: 0.3,                         // jitter applied to the poll interval
    max_requeue_delay: Duration::from_secs(15 * 60), // maximum requeue delay
    default_requeue_delay: Duration::from_secs(90),  // default requeue delay
    shutdown_timeout: Duration::from_secs(30),       // graceful shutdown timeout
    backoff_strategy: true,                          // enable backoff on repeated failures
}
Connection Pool Configuration
PoolConfig {
    initial_cap: 5,                         // initial number of connections
    max_cap: 20,                            // maximum number of connections
    max_idle: 10,                           // maximum idle connections
    idle_timeout: Duration::from_secs(30),  // idle connection timeout
    max_lifetime: Duration::from_secs(300), // maximum connection lifetime
}
Advanced Features
Connection Health Check (Ping)
// Ping using defaults (no explicit address or timeout).
let result = producer.ping(None, None).await;

// Ping a specific nsqd address with a custom timeout.
let result = producer.ping(
    Some("127.0.0.1:4150"),
    Some(Duration::from_millis(500)),
).await;

if let Err(err) = result {
    println!("NSQ server connection error: {}", err);
}
Delayed Publishing
producer.publish_with_delay("test_topic", "Delayed message".as_bytes(), Duration::from_secs(60)).await?;
Batch Publishing
let messages = vec![
    "Message 1".as_bytes().to_vec(),
    "Message 2".as_bytes().to_vec(),
    "Message 3".as_bytes().to_vec(),
];
producer.publish_multiple("test_topic", messages).await?;
Manual Message Acknowledgement
nsq-async-rs supports manual message acknowledgement, giving you full control over when messages are acknowledged. This is particularly useful for concurrent processing, batch processing, or complex error handling scenarios.
When to Use Manual Acknowledgement
Manual acknowledgement is ideal for:
- Concurrent message processing - When you need to send messages to a worker thread pool for async processing
- Batch processing - Accumulating multiple messages before processing them together
- Complex error handling - Deciding whether to retry or discard based on different error types
- Long-running processing - Using TOUCH to extend the timeout for messages that take longer to process (see the sketch after this list)
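For the long-running case, here is a minimal sketch of periodically sending TOUCH while work is in progress. It assumes message.touch() is async and returns a Result (like finish() and requeue() in the examples in this README) and that these methods borrow the message; long_running_job is a hypothetical placeholder for your own work.

use std::time::Duration;
use nsq_async_rs::protocol::Message;

// Hypothetical placeholder for slow application work.
async fn long_running_job(_body: Vec<u8>) -> std::result::Result<(), String> {
    tokio::time::sleep(Duration::from_secs(120)).await;
    Ok(())
}

// Keep the message alive on the server while the job runs, then FIN or REQ it.
async fn handle_long_running(message: Message) {
    let job = long_running_job(message.body.to_vec());
    tokio::pin!(job);
    loop {
        tokio::select! {
            // Reset the server-side timeout every 30 seconds while still working.
            _ = tokio::time::sleep(Duration::from_secs(30)) => {
                if message.touch().await.is_err() {
                    break;
                }
            }
            // The job finished: acknowledge accordingly and stop touching.
            result = &mut job => {
                match result {
                    Ok(_) => { let _ = message.finish().await; }
                    Err(_) => { let _ = message.requeue(5000).await; }
                }
                break;
            }
        }
    }
}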
Enabling Manual Acknowledgement
let config = ConsumerConfig {
    max_in_flight: 100,
    disable_auto_response: true, // turn off automatic acknowledgement
    ..Default::default()
};
Manual Acknowledgement API
#[async_trait]
impl Handler for MyHandler {
    async fn handle_message(&self, message: Message) -> Result<()> {
        // Hand the message off to a worker task; it will be acknowledged there.
        self.worker_tx.send(message).await?;

        // Returning Ok does not acknowledge the message when auto-response is disabled.
        Ok(())
    }
}

async fn worker(message: Message) {
    match process_message(&message.body).await {
        Ok(_) => {
            // Successfully processed: send FIN.
            message.finish().await.unwrap();
        }
        Err(_) => {
            // Processing failed: requeue with a 5000 ms delay.
            message.requeue(5000).await.unwrap();
        }
    }
}
API Methods
- message.finish() - Send the FIN command to mark the message as successfully processed
- message.requeue(delay) - Send the REQ command to requeue the message with a delay (in milliseconds)
- message.touch() - Send the TOUCH command to reset the message timeout
- message.disable_auto_response() - Disable auto-response for a specific message (see the sketch below)
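As a hedged illustration of per-message opt-out (the method name comes from the list above, but its exact signature is an assumption and it is written here as a synchronous call; needs_background_processing is a hypothetical helper), you can keep auto-response enabled globally and take manual control of selected messages only:

use async_trait::async_trait;
use nsq_async_rs::consumer::Handler;
use nsq_async_rs::error::Result;
use nsq_async_rs::protocol::Message;
use tokio::sync::mpsc;

struct SelectiveHandler {
    worker_tx: mpsc::Sender<Message>,
}

// Hypothetical predicate: decide which messages need slow, out-of-band handling.
fn needs_background_processing(body: &[u8]) -> bool {
    body.starts_with(b"slow:")
}

#[async_trait]
impl Handler for SelectiveHandler {
    async fn handle_message(&self, message: Message) -> Result<()> {
        let mut message = message;
        if needs_background_processing(&message.body) {
            // Opt only this message out of auto-response; a worker FINs/REQs it later.
            message.disable_auto_response();
            self.worker_tx
                .send(message)
                .await
                .map_err(|e| nsq_async_rs::error::Error::Other(e.to_string()))?;
        }
        // Messages that were not handed off are auto-responded when this returns Ok.
        Ok(())
    }
}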
Complete Example
Here's a complete example showing manual acknowledgement with concurrent message processing:
use async_trait::async_trait;
use nsq_async_rs::consumer::{Consumer, ConsumerConfig, Handler};
use nsq_async_rs::error::Result;
use nsq_async_rs::protocol::Message;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
struct ConcurrentHandler {
    worker_tx: mpsc::Sender<Message>,
}
// Placeholder for the application's own processing logic.
async fn process_message(_body: &[u8]) -> std::result::Result<(), ()> {
    Ok(())
}

impl ConcurrentHandler {
    fn new(worker_count: usize) -> Self {
        let (tx, rx) = mpsc::channel(100);
        // A tokio mpsc receiver cannot be cloned, so the workers share it behind a Mutex.
        let rx = Arc::new(Mutex::new(rx));

        // Spawn the worker pool.
        for _worker_id in 0..worker_count {
            let rx = rx.clone();
            tokio::spawn(async move {
                loop {
                    // Take the next message; exit when the channel is closed.
                    let msg = {
                        let mut rx = rx.lock().await;
                        match rx.recv().await {
                            Some(msg) => msg,
                            None => break,
                        }
                    };
                    match process_message(&msg.body).await {
                        Ok(_) => {
                            msg.finish().await.ok();
                        }
                        Err(_) => {
                            if msg.attempts < 3 {
                                // Retry after a 5000 ms delay.
                                msg.requeue(5000).await.ok();
                            } else {
                                // Give up after 3 attempts.
                                msg.finish().await.ok();
                            }
                        }
                    }
                }
            });
        }

        Self { worker_tx: tx }
    }
}
#[async_trait]
impl Handler for ConcurrentHandler {
    async fn handle_message(&self, message: Message) -> Result<()> {
        // Hand off to the worker pool; the workers send FIN/REQ.
        self.worker_tx
            .send(message)
            .await
            .map_err(|e| nsq_async_rs::error::Error::Other(e.to_string()))?;
        Ok(())
    }
}
#[tokio::main]
async fn main() -> Result<()> {
    let config = ConsumerConfig {
        max_in_flight: 100,
        disable_auto_response: true, // manual acknowledgement
        ..Default::default()
    };

    let handler = ConcurrentHandler::new(20);
    let consumer = Consumer::new(
        "topic".to_string(),
        "channel".to_string(),
        config,
        handler,
    )?;

    consumer.connect_to_nsqlookupd("http://127.0.0.1:4161".to_string()).await?;
    consumer.start().await?;
    
    tokio::signal::ctrl_c().await?;
    consumer.stop().await?;
    
    Ok(())
}
Running Examples
cargo run --example manual_ack_consume
cargo run --example simple_consume
Best Practices
- Ensure every message is acknowledged - Use proper error handling to guarantee messages are acknowledged even when processing fails
- Set an appropriate max_in_flight - It can be set higher (e.g., 100) with manual ack, but make sure it doesn't exceed worker capacity
- Handle retry logic - Check message.attempts to avoid infinite retries and use exponential backoff (see the sketch after this list)
- Avoid deadlocks - Ensure message channels have sufficient buffer and use try_send or timeout mechanisms
- Monitor statistics - Regularly call consumer.stats() to monitor messages_received, messages_finished, and messages_requeued
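A minimal sketch of the retry guidance above (the thresholds are arbitrary; it assumes attempts is an integer counter and requeue() takes a millisecond delay, as in the earlier examples):

use nsq_async_rs::protocol::Message;

// Requeue with a roughly exponential delay, and stop retrying after 5 attempts.
async fn ack_with_backoff(message: Message) {
    if message.attempts >= 5 {
        // Too many attempts: FIN so the message is not redelivered forever
        // (optionally divert it to a dead-letter topic first).
        let _ = message.finish().await;
        return;
    }
    // Backoff schedule in milliseconds: 1s, 1s, 5s, 15s, then 60s.
    let delay_ms = match message.attempts {
        0 | 1 => 1_000,
        2 => 5_000,
        3 => 15_000,
        _ => 60_000,
    };
    let _ = message.requeue(delay_ms).await;
}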
Troubleshooting
Message Timeout Issues
- Symptom: Messages keep retrying and attempts keeps increasing
- Solution: Use message.touch() to reset the timeout or increase IdentifyConfig.msg_timeout
Message Loss
- Symptom: Messages received but not processed
- Solution: Ensure manual acknowledgement is properly called in all code paths
Memory Leaks
- Symptom: Memory usage continuously grows
- Solution: Limit channel buffer sizes, monitor channel length, and implement backpressure (see the sketch below)
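For the backpressure advice above, here is a small sketch using only tokio primitives (none of this is nsq-async-rs API): try_send detects a full bounded channel, and a timeout turns a stalled worker pool into an explicit error instead of unbounded queuing.

use std::time::Duration;
use tokio::sync::mpsc;

// Forward an item into a bounded worker channel with backpressure and a deadline.
async fn forward_with_backpressure<T>(tx: &mpsc::Sender<T>, item: T) -> Result<(), String> {
    match tx.try_send(item) {
        Ok(()) => Ok(()),
        Err(mpsc::error::TrySendError::Full(item)) => {
            // Channel is full: wait for capacity, but only up to 5 seconds, so a
            // stalled worker pool surfaces as an error rather than unbounded queuing.
            tokio::time::timeout(Duration::from_secs(5), tx.send(item))
                .await
                .map_err(|_| "send timed out: workers are falling behind".to_string())?
                .map_err(|e| e.to_string())
        }
        Err(mpsc::error::TrySendError::Closed(_)) => Err("worker channel closed".to_string()),
    }
}

Create the worker channel bounded (e.g., mpsc::channel(100)), as in the concurrent examples above, so memory use stays capped.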
Contributing
Contributions are welcome! Please feel free to submit issues and pull requests.
License
MIT License