Crate tokio_memq

§Tokio MemQ

High-performance, feature-rich in-memory async message queue powered by Tokio. Designed for high-throughput local messaging with advanced features like backpressure, TTL, consumer groups, and pluggable serialization.

§Features

  • Async & Stream API: Built on Tokio, supporting Stream trait for idiomatic async consumption.
  • Backpressure & Flow Control: Bounded channels, LRU eviction, and lag monitoring.
  • Advanced Consumption:
    • Batch Operations: High-throughput publish_batch and recv_batch.
    • Consumer Groups: Support for Earliest, Latest, and Offset seeking.
    • Filtering: Server-side filtering with recv_filter.
    • Manual Commit/Seek: Precise offset control.
  • Serialization Pipeline:
    • Pluggable formats (JSON, MessagePack, Bincode).
    • Compression support (Gzip, Zstd).
    • Per-topic and per-publisher configuration overrides.
    • Auto-format detection via Magic Headers.
  • Management & Monitoring:
    • Topic deletion and creation options (TTL, Max Messages).
    • Real-time metrics (depth, subscriber count, lag).

§Usage Examples

§1. Basic Publish & Subscribe

The simplest way to use the queue with default settings.

use tokio_memq::mq::MessageQueue;
use tokio_memq::{MessageSubscriber, AsyncMessagePublisher}; // Import traits for recv() and publish()

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mq = MessageQueue::new();
    let topic = "demo_topic";

    // Create a subscriber on the topic
    let sub = mq.subscriber(topic.to_string()).await?;

    // Create a publisher for the same topic
    let pub1 = mq.publisher(topic.to_string());

    // Publish from a separate task so the pending recv() below can receive it
    tokio::spawn(async move {
        pub1.publish("Hello World".to_string()).await.unwrap();
    });

    let msg = sub.recv().await?;
    let payload: String = msg.deserialize()?;
    println!("Received: {}", payload);

    Ok(())
}
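
Payloads are not limited to strings. Below is a minimal sketch of round-tripping a typed value, assuming publish() accepts any serde::Serialize payload and deserialize() any serde::de::DeserializeOwned target (the String round-trip above suggests generic bounds; verify against the trait definitions):

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct Event {
    id: u64,
    kind: String,
}

// mq, topic, and sub as in Example 1; create a fresh publisher.
let pub2 = mq.publisher(topic.to_string());

// Assumption: publish() serializes any Serialize payload with the
// topic's configured format (JSON by default).
pub2.publish(Event { id: 7, kind: "signup".into() }).await?;

// Assumption: deserialize() can target any DeserializeOwned type.
let event: Event = sub.recv().await?.deserialize()?;
println!("Received event: {:?}", event);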

§2. Stream API

Consume messages as an async stream, ideal for continuous processing loops.

use tokio_stream::StreamExt;
use tokio::pin;

// Create a stream from the subscriber (sub and pub1 are set up as in Example 1)
let stream = sub.stream();
pin!(stream);

// Publish a message to trigger the stream
tokio::spawn(async move {
    pub1.publish("Stream Message".to_string()).await.unwrap();
});

while let Some(msg_res) = stream.next().await {
    match msg_res {
        Ok(msg) => println!("Received: {:?}", msg),
        Err(e) => eprintln!("Error: {}", e),
    }
    break; // stop after the first message in this example
}
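
Note on pinning: stream() returns an opaque stream type which, as the tokio::pin! call above suggests, is not Unpin; pinning it on the stack satisfies the Unpin bound that StreamExt::next requires.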

§3. Batch Operations

Improve throughput by processing messages in batches.

// Batch Publish (publisher and sub are created as in Example 1)
let messages = vec![1, 2, 3, 4, 5];
publisher.publish_batch(messages).await?;

// Batch Receive
// Returns a vector of up to 10 messages
let batch = sub.recv_batch(10).await?; 
for msg in batch {
    println!("Batch msg: {:?}", msg);
}
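
For continuous draining, recv_batch can be called in a loop. A minimal sketch, assuming recv_batch(n) waits until at least one message is available and returns at most n per call (the exact blocking and end-of-stream semantics are crate-specific, so verify them first):

// Hypothetical drain loop: handle up to 100 messages per iteration.
for _ in 0..10 {
    let batch = sub.recv_batch(100).await?;
    println!("processing {} messages", batch.len());
    for msg in batch {
        let text: String = msg.deserialize()?;
        // ... handle text ...
    }
}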

§4. Advanced Consumption (Filter, Timeout, Metadata)

use std::time::Duration;

// Receive with Timeout
match sub.recv_timeout(Duration::from_millis(500)).await? {
    Some(msg) => println!("Got msg: {:?}", msg),
    None => println!("Timed out"),
}

// Receive with Filter (Server-side filtering)
// Only receive messages where payload size > 100 bytes
// Note: This will block until a matching message arrives or channel closes
// let large_msg = sub.recv_filter(|msg| msg.payload.len() > 100).await?;

// Metadata-only mode: inspect offset/timestamp without deserializing the payload
// let msg = sub.recv().await?;
// let meta = msg.metadata();
// println!("Offset: {}, Timestamp: {:?}", meta.offset, meta.created_at);

§5. Consumer Groups & Offsets

Manage offsets manually or use consumer groups for persistent state.

use tokio_memq::mq::{ConsumptionMode, TopicOptions, MessageQueue};
use tokio_memq::{MessageSubscriber, AsyncMessagePublisher};

// Configure the topic with retention limits (mq is a MessageQueue as in Example 1)
let options = TopicOptions {
    max_messages: Some(1000),
    message_ttl: None,
    lru_enabled: true,
    ..Default::default()
};

// Subscribe as part of a Consumer Group
// Modes: Earliest, Latest, Offset(n), LastOffset
let sub_group = mq.subscriber_group_with_options(
    "topic_name".to_string(),
    options,
    "group_id_1".to_string(),
    ConsumptionMode::LastOffset
).await?;

// Manual Commit
// let msg = sub_group.recv().await?;
// sub_group.commit(msg.offset); // Save progress
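
To replay from a fixed position rather than the group's saved offset, use the Offset mode from the list above. A minimal sketch with a hypothetical second group id, assuming ConsumptionMode::Offset carries the starting offset as Offset(n):

// Start a second group at offset 42 and replay from there.
let sub_replay = mq.subscriber_group_with_options(
    "topic_name".to_string(),
    TopicOptions::default(),
    "group_id_2".to_string(), // hypothetical group id
    ConsumptionMode::Offset(42),
).await?;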

§6. Serialization Configuration

Flexible serialization with per-topic and per-publisher overrides.

use tokio_memq::mq::{
    SerializationFactory, SerializationFormat, SerializationConfig, 
    JsonConfig, PipelineConfig, CompressionConfig
};

let topic = "compressed_logs";

// Configure compression pipeline
let pipeline = PipelineConfig {
    compression: CompressionConfig::Gzip { level: Some(6) },
    pre: None, 
    post: None,
    use_magic_header: true, // Auto-detect format on receive
};

// Register defaults for a topic
SerializationFactory::register_topic_defaults(
    topic,
    SerializationFormat::Json,
    SerializationConfig::Json(JsonConfig { pretty: false }),
    Some(pipeline),
);
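
Once defaults are registered, ordinary publishers and subscribers on the topic should pick them up. A minimal sketch reusing the Example 1 API, assuming registered defaults are applied transparently by publish() and deserialize():

// mq is a MessageQueue as in Example 1. Messages on "compressed_logs"
// are now JSON-encoded and Gzip-compressed (assumption: topic defaults
// apply automatically to publish() and deserialize()).
let sub_logs = mq.subscriber(topic.to_string()).await?;
let pub_logs = mq.publisher(topic.to_string());

pub_logs.publish("a fairly compressible log line".to_string()).await?;
let line: String = sub_logs.recv().await?.deserialize()?;
println!("Round-tripped: {}", line);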

§7. Partitioned Topics (Sharding)

This example creates a topic with four partitions, selects one of the available routing strategies (RoundRobin here; Hash, Random, and Fixed are also supported), publishes messages that are routed automatically, subscribes to a specific partition, and inspects per-partition stats.

use tokio_memq::mq::{MessageQueue, TopicOptions, ConsumptionMode, PartitionRouting};
use std::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mq = MessageQueue::new();
    let topic = "partitioned_events".to_string();

    // Create a topic with 4 partitions
    let options = TopicOptions {
        max_messages: Some(1000),
        partitions: Some(4),
        ..Default::default()
    };
    mq.create_partitioned_topic(topic.clone(), options, 4).await?;

    // Set routing strategy
    mq.set_partition_routing(topic.clone(), PartitionRouting::RoundRobin).await?;

    // Publish a few messages (auto-routed)
    for i in 1..=8 {
        let message = tokio_memq::TopicMessage::new(topic.clone(), format!("msg {}", i))?;
        mq.publish_to_partitioned(message).await?;
    }

    // Subscribe to a specific partition (0) and consume one message
    let sub0 = mq.subscribe_partition(
        topic.clone(), 
        0, 
        Some("partition_demo".to_string()),
        ConsumptionMode::Earliest
    ).await?;

    if let Some(msg) = sub0.recv_timeout(Duration::from_millis(500)).await? {
        println!("Partition 0 got: {}", msg.payload_str());
    }

    // Inspect partition stats
    if let Some(stats) = mq.get_partition_stats(topic.clone(), 0).await {
        println!("Partition 0: {} messages, {} subscribers", stats.message_count, stats.subscriber_count);
    }

    Ok(())
}
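
Choosing a routing strategy: RoundRobin spreads load evenly but gives no ordering relationship between messages that share a key; hash-based routing typically keeps related messages on one partition, preserving per-key order; Random trades ordering for simplicity; Fixed pins all traffic to a single partition. See PartitionRouting for the exact variant signatures.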

§Re-exports

pub use mq::traits::MessagePublisher;
pub use mq::traits::AsyncMessagePublisher;
pub use mq::traits::MessageSubscriber;
pub use mq::traits::QueueManager;
pub use mq::message::TopicMessage;
pub use mq::message::TopicOptions;
pub use mq::message::TimestampedMessage;
pub use mq::message::ConsumptionMode;
pub use mq::serializer::SerializationFormat;
pub use mq::serializer::SerializationHelper;
pub use mq::serializer::Serializer;
pub use mq::serializer::BincodeSerializer;
pub use mq::serializer::SerializationFactory;
pub use mq::serializer::SerializationConfig;
pub use mq::serializer::JsonConfig;
pub use mq::serializer::MessagePackConfig;
pub use mq::serializer::PipelineConfig;
pub use mq::serializer::CompressionConfig;
pub use mq::publisher::Publisher;
pub use mq::subscriber::Subscriber;
pub use mq::broker::TopicManager;
pub use mq::broker::PartitionedTopicChannel;
pub use mq::broker::PartitionRouting;
pub use mq::broker::PartitionStats;
pub use mq::broker::TopicStats;
pub use mq::MessageQueue;

§Modules

mq