nsq-async-rs 0.1.8

An asynchronous Rust NSQ client library with support for concurrent message processing
Documentation

nsq-async-rs

License Crates.io

Other language versions: English, 简体中文

nsq-async-rs is a high-performance, reliable NSQ client library written in Rust. The project is inspired by the official go-nsq implementation and provides similar functionality and interfaces in the Rust ecosystem.

Features

  • ✨ Async I/O support (based on tokio)
  • 🚀 High-performance message processing
  • 🔄 Automatic reconnection and error retry
  • 🔍 Support for nsqlookupd service discovery
  • 🛡️ Graceful shutdown support
  • 📊 Built-in message statistics
  • ⚡ Support for delayed publishing
  • 📦 Support for batch publishing
  • 🔀 Support for concurrent message processing
  • 🏊‍♂️ Built-in connection pool for producers
  • 🎯 Manual message acknowledgement support
  • 💫 Feature parity with official go-nsq

Installation

Add the following to your Cargo.toml:

[dependencies]
nsq-async-rs = "0.1.8"

Quick Start

Basic Consumer Example

use nsq_async_rs::consumer::{Consumer, ConsumerConfig, Handler};
use nsq_async_rs::error::Result;
use nsq_async_rs::protocol::Message;

#[derive(Default)]
struct MessageHandler;

#[async_trait::async_trait]
impl Handler for MessageHandler {
    async fn handle_message(&self, message: Message) -> Result<()> {
        println!("Received message: {:?}", String::from_utf8_lossy(&message.body));
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let config = ConsumerConfig::default();
    let consumer = Consumer::new(
        "test_topic".to_string(),
        "test_channel".to_string(),
        config,
        MessageHandler::default(),
    )?;

    consumer.connect_to_nsqlookupd("http://127.0.0.1:4161".to_string()).await?;
    consumer.start().await?;

    tokio::signal::ctrl_c().await?;
    consumer.stop().await?;
    Ok(())
}

Concurrent Consumer Example

use async_trait::async_trait;
use log::{error, info};
use nsq_async_rs::consumer::{Consumer, ConsumerConfig, Handler};
use nsq_async_rs::error::Result;
use nsq_async_rs::protocol::Message;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};

/// Concurrent message handler
struct ConcurrentMessageHandler {
    worker_count: usize,
    sender: Arc<Mutex<mpsc::Sender<Message>>>,
}

impl ConcurrentMessageHandler {
    pub fn new(worker_count: usize) -> Self {
        // Create message channel with buffer size 10x worker count
        let (tx, rx) = mpsc::channel(worker_count * 10);
        let sender = Arc::new(Mutex::new(tx));
        let receiver = Arc::new(Mutex::new(rx));

        let handler = Self {
            worker_count,
            sender,
        };

        // Start worker threads
        handler.start_workers(receiver);

        handler
    }

    fn start_workers(&self, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) {
        for i in 0..self.worker_count {
            let worker_id = i + 1;
            let rx = receiver.clone();

            tokio::spawn(async move {
                info!("Worker {} started", worker_id);

                loop {
                    // Get message from channel
                    let msg = {
                        let mut rx_guard = rx.lock().await;
                        match rx_guard.recv().await {
                            Some(msg) => msg,
                            None => break,
                        }
                    };

                    // Process message
                    let msg_id = String::from_utf8_lossy(&msg.id).to_string();
                    info!("Worker {} processing message: {}", worker_id, msg_id);
                    
                    // Add your message processing logic here
                    
                    info!("Worker {} finished processing message: {}", worker_id, msg_id);
                }
            });
        }
    }
}

#[async_trait]
impl Handler for ConcurrentMessageHandler {
    async fn handle_message(&self, message: Message) -> Result<()> {
        let msg_id = String::from_utf8_lossy(&message.id).to_string();
        let sender = self.sender.lock().await;

        // Try non-blocking send first
        let send_result = sender.try_send(message.clone());
        match send_result {
            Ok(_) => {
                info!("Message sent to worker channel: ID={}", msg_id);
            }
            Err(mpsc::error::TrySendError::Full(msg)) => {
                // Channel full, use blocking send
                if let Err(e) = sender.send(msg).await {
                    error!("Failed to send message to worker channel: {}", e);
                    return Err(nsq_async_rs::error::Error::Other(e.to_string()));
                }
            }
            Err(mpsc::error::TrySendError::Closed(_)) => {
                error!("Worker channel closed: ID={}", msg_id);
                return Err(nsq_async_rs::error::Error::Other("Worker channel closed".into()));
            }
        }

        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Create consumer config
    let config = ConsumerConfig {
        max_in_flight: 100, // Increase for higher throughput
        max_attempts: 5,
        // Other config options...
        ..Default::default()
    };

    // Create concurrent handler with 20 worker threads
    let handler = ConcurrentMessageHandler::new(20);

    // Create consumer
    let consumer = Consumer::new(
        "test_topic".to_string(),
        "test_channel".to_string(),
        config,
        handler,
    )?;

    consumer.connect_to_nsqlookupd("http://127.0.0.1:4161".to_string()).await?;
    consumer.start().await?;

    tokio::signal::ctrl_c().await?;
    consumer.stop().await?;
    Ok(())
}

Basic Producer Example

use nsq_async_rs::producer::Producer;
use nsq_async_rs::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let producer = Producer::connect("127.0.0.1:4150").await?;
    
    producer.publish("test_topic", "Hello, NSQ!".as_bytes()).await?;
    Ok(())
}

Producer with Connection Pool Example

use log::{error, info};
use nsq_async_rs::{
    producer::{new_producer, NsqProducer},
    Producer, ProducerConfig,
};
use std::error::Error;
use std::time::Duration;
use nsq_async_rs::pool::{Pool, PoolConfig, PoolError};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Configure the connection pool
    let pool_config = PoolConfig {
        initial_cap: 3,            // Initial connections
        max_cap: 10,               // Maximum connections
        max_idle: 5,               // Maximum idle connections
        idle_timeout: Duration::from_secs(30),  // Idle timeout
        max_lifetime: Duration::from_secs(300), // Connection lifetime (5 minutes)
    };

    // Create NSQ producer connection pool
    let pool = Pool::new(
        pool_config,
        // Factory function to create new connections
        || {
            let p_cfg = ProducerConfig {
                nsqd_addresses: vec!["127.0.0.1:4150".to_string()],
                ..Default::default()
            };
            Ok(new_producer(p_cfg))
        },
        // Close function (NSQ producers don't need explicit closing)
        |_producer| Ok(()),
        // Optional ping function to check connection health
        Some(|_producer| Ok(())),
    ).await?;

    // Get a connection from the pool
    let topic = "test_topic";
    let pooled_conn = pool.get().await?;
    
    // Use the connection
    match pooled_conn.conn.publish(topic, "Hello from connection pool!").await {
        Ok(_) => info!("Message published successfully"),
        Err(e) => error!("Failed to publish message: {}", e),
    }
    
    // Return the connection to the pool
    pool.put(pooled_conn).await?;
    
    // Close the pool when done
    pool.release().await?;
    
    Ok(())
}

Batch Publishing Example

use chrono::Local;
use nsq_async_rs::producer::{new_producer, ProducerConfig};
use std::error::Error;
use std::time::Instant;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // Create producer config
    let mut config = ProducerConfig::default();
    config.nsqd_addresses = vec!["127.0.0.1:4150".to_string()];

    // Create NSQ producer
    let producer = new_producer(config);
    let topic = "test_topic";
    
    // Prepare multiple messages
    let mut messages = vec![];
    for i in 0..100 {
        messages.push(format!(
            "Message #{}, Time:{}",
            i + 1,
            Local::now().to_string()
        ));
    }

    // Measure batch publishing performance
    let start = Instant::now();
    producer.publish_multi(topic, messages).await?;
    let elapsed = start.elapsed();

    println!("Batch publishing 100 messages took: {:?}", elapsed);
    println!("Average time per message: {:?}", elapsed / 100);

    Ok(())
}

Configuration Options

Consumer Configuration

ConsumerConfig {
    max_in_flight: 100,                  // Maximum number of messages to process simultaneously
    max_attempts: 5,                     // Maximum number of retries
    dial_timeout: Duration::from_secs(1),  // Connection timeout
    read_timeout: Duration::from_secs(60), // Read timeout
    write_timeout: Duration::from_secs(1), // Write timeout
    lookup_poll_interval: Duration::from_secs(60), // nsqlookupd polling interval
    lookup_poll_jitter: 0.3,              // Polling jitter coefficient
    max_requeue_delay: Duration::from_secs(15 * 60), // Maximum requeue delay
    default_requeue_delay: Duration::from_secs(90),  // Default requeue delay
    shutdown_timeout: Duration::from_secs(30),       // Shutdown timeout
    backoff_strategy: true,                // Enable exponential backoff reconnection strategy
}

Connection Pool Configuration

PoolConfig {
    initial_cap: 5,                    // Initial connections to create
    max_cap: 20,                       // Maximum connections allowed
    max_idle: 10,                      // Maximum idle connections to keep
    idle_timeout: Duration::from_secs(30), // How long connections can remain idle
    max_lifetime: Duration::from_secs(300), // Maximum connection lifetime (5 minutes)
}

Advanced Features

Connection Health Check (Ping)

// Ping with default timeout (5 seconds)
let result = producer.ping(None, None).await;

// Ping with custom address and timeout
let result = producer.ping(
    Some("127.0.0.1:4150"),
    Some(Duration::from_millis(500))
).await;

// Check ping result before proceeding
if let Err(err) = result {
    println!("NSQ server connection error: {}", err);
    // Handle connection error...
}

Delayed Publishing

producer.publish_with_delay("test_topic", "Delayed message".as_bytes(), Duration::from_secs(60)).await?;

Batch Publishing

let messages = vec![
    "Message 1".as_bytes().to_vec(),
    "Message 2".as_bytes().to_vec(),
    "Message 3".as_bytes().to_vec(),
];
producer.publish_multiple("test_topic", messages).await?;

Manual Message Acknowledgement

nsq-async-rs supports manual message acknowledgement, giving you full control over when messages are acknowledged. This is particularly useful for concurrent processing, batch processing, or complex error handling scenarios.

When to Use Manual Acknowledgement

Manual acknowledgement is ideal for:

  1. Concurrent message processing - When you need to send messages to a worker thread pool for async processing
  2. Batch processing - Accumulating multiple messages before processing them together
  3. Complex error handling - Deciding whether to retry or discard based on different error types
  4. Long-running processing - Using TOUCH to extend timeout for messages that take longer to process

Enabling Manual Acknowledgement

let config = ConsumerConfig {
    max_in_flight: 100,
    disable_auto_response: true, // Enable manual acknowledgement
    ..Default::default()
};

Manual Acknowledgement API

#[async_trait]
impl Handler for MyHandler {
    async fn handle_message(&self, message: Message) -> Result<()> {
        // Send to worker thread for async processing
        self.worker_tx.send(message).await?;
        
        // Return Ok, but won't auto-FIN (because manual ack is enabled)
        Ok(())
    }
}

// In worker thread
async fn worker(message: Message) {
    match process_message(&message.body).await {
        Ok(_) => {
            // Manually send FIN
            message.finish().await.unwrap();
        }
        Err(_) => {
            // Manually requeue with 5 second delay
            message.requeue(5000).await.unwrap();
        }
    }
}

API Methods

  • message.finish() - Send FIN command to mark message as successfully processed
  • message.requeue(delay) - Send REQ command to requeue the message with a delay (in milliseconds)
  • message.touch() - Send TOUCH command to reset message timeout
  • message.disable_auto_response() - Disable auto-response for a specific message

Complete Example

Here's a complete example showing manual acknowledgement with concurrent message processing:

use async_trait::async_trait;
use nsq_async_rs::consumer::{Consumer, ConsumerConfig, Handler};
use nsq_async_rs::protocol::Message;
use tokio::sync::mpsc;

struct ConcurrentHandler {
    worker_tx: mpsc::Sender<Message>,
}

impl ConcurrentHandler {
    fn new(worker_count: usize) -> Self {
        let (tx, mut rx) = mpsc::channel(100);
        
        // Start worker threads
        for worker_id in 0..worker_count {
            let mut worker_rx = rx.clone();
            tokio::spawn(async move {
                while let Some(msg) = worker_rx.recv().await {
                    // Process message
                    match process_message(&msg.body).await {
                        Ok(_) => {
                            msg.finish().await.ok();
                        }
                        Err(_) => {
                            if msg.attempts < 3 {
                                msg.requeue(5000).await.ok();
                            } else {
                                msg.finish().await.ok(); // Give up retrying
                            }
                        }
                    }
                }
            });
        }
        
        Self { worker_tx: tx }
    }
}

#[async_trait]
impl Handler for ConcurrentHandler {
    async fn handle_message(&self, message: Message) -> Result<()> {
        self.worker_tx.send(message).await?;
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let config = ConsumerConfig {
        max_in_flight: 100,
        disable_auto_response: true, // Enable manual acknowledgement
        ..Default::default()
    };
    
    let handler = ConcurrentHandler::new(20);
    let consumer = Consumer::new("topic", "channel", config, handler)?;
    
    consumer.connect_to_nsqlookupd("http://127.0.0.1:4161").await?;
    consumer.start().await?;
    
    tokio::signal::ctrl_c().await?;
    consumer.stop().await?;
    
    Ok(())
}

Running Examples

# Run manual acknowledgement example
cargo run --example manual_ack_consume

# Run automatic acknowledgement example (for comparison)
cargo run --example simple_consume

Best Practices

  1. Ensure every message is acknowledged - Use proper error handling to guarantee messages are acknowledged even when processing fails
  2. Set appropriate max_in_flight - Can be set higher (e.g., 100) with manual ack, but ensure it doesn't exceed worker capacity
  3. Handle retry logic - Check message.attempts to avoid infinite retries and use exponential backoff
  4. Avoid deadlocks - Ensure message channels have sufficient buffer and use try_send or timeout mechanisms
  5. Monitor statistics - Regularly call consumer.stats() to monitor messages_received, messages_finished, and messages_requeued

Troubleshooting

Message Timeout Issues

  • Symptom: Messages keep retrying, attempts keeps increasing
  • Solution: Use message.touch() to reset timeout or increase IdentifyConfig.msg_timeout

Message Loss

  • Symptom: Messages received but not processed
  • Solution: Ensure manual acknowledgement is properly called in all code paths

Memory Leaks

  • Symptom: Memory usage continuously grows
  • Solution: Limit channel buffer size, monitor channel length, implement backpressure

Contributing

Contributions are welcome! Please feel free to submit issues and pull requests.

License

MIT License