tokio_memq/lib.rs
1//! # Tokio MemQ
2//!
3//! High-performance, feature-rich in-memory async message queue powered by Tokio.
4//! Designed for high-throughput local messaging with advanced features like backpressure,
5//! TTL, consumer groups, and pluggable serialization.
6//!
7//! ## Features
8//!
9//! - **Async & Stream API**: Built on Tokio, supporting `Stream` trait for idiomatic async consumption.
10//! - **Backpressure & Flow Control**: Bounded channels, LRU eviction, and lag monitoring.
11//! - **Advanced Consumption**:
12//!   - **Batch Operations**: High-throughput `publish_batch` and `recv_batch`.
13//!   - **Consumer Groups**: Support for `Earliest`, `Latest`, `Offset`, and `LastOffset` consumption modes.
14//!   - **Filtering**: Server-side filtering with `recv_filter`.
15//!   - **Manual Commit/Seek**: Precise offset control.
16//! - **Serialization Pipeline**:
17//!   - Pluggable formats (JSON, MessagePack, Bincode).
18//!   - Compression support (Gzip, Zstd).
19//!   - Per-topic and per-publisher configuration overrides.
20//!   - Auto-format detection via Magic Headers.
21//! - **Management & Monitoring**:
22//!   - Topic deletion and creation options (TTL, Max Messages).
23//!   - Real-time metrics (depth, subscriber count, lag).
24//!
25//! ## Usage Examples
26//!
27//! ### 1. Basic Publish & Subscribe
28//!
29//! The simplest way to use the queue with default settings.
30//!
31//! ```rust
32//! use tokio_memq::mq::MessageQueue;
33//! use tokio_memq::{MessageSubscriber, AsyncMessagePublisher}; // Import traits for recv() and publish()
34//!
35//! #[tokio::main]
36//! async fn main() -> anyhow::Result<()> {
37//! let mq = MessageQueue::new();
38//! let topic = "demo_topic";
39//!
40//! // Subscriber
41//! let sub = mq.subscriber(topic.to_string()).await?;
42//!
43//! // Publisher
44//! let pub1 = mq.publisher(topic.to_string());
45//!
46//! // Publish asynchronously so subscriber can receive it
47//! tokio::spawn(async move {
48//! pub1.publish("Hello World".to_string()).await.unwrap();
49//! });
50//!
51//! let msg = sub.recv().await?;
52//! let payload: String = msg.deserialize()?;
53//! println!("Received: {}", payload);
54//!
55//! Ok(())
56//! }
57//! ```
58//!
59//! ### 2. Stream API
60//!
61//! Consume messages as an async stream, ideal for continuous processing loops.
62//!
63//! ```rust
64//! # use tokio_memq::mq::MessageQueue;
65//! # use tokio_memq::{MessageSubscriber, AsyncMessagePublisher};
66//! # #[tokio::main]
67//! # async fn main() -> anyhow::Result<()> {
68//! # let mq = MessageQueue::new();
69//! # let topic = "stream_topic";
70//! # let sub = mq.subscriber(topic.to_string()).await?;
71//! # let pub1 = mq.publisher(topic.to_string());
72//! use tokio_stream::StreamExt;
73//! use tokio::pin;
74//!
75//! // Create a stream from the subscriber
76//! let stream = sub.stream();
77//! pin!(stream);
78//!
79//! // Publish a message to trigger the stream
80//! tokio::spawn(async move {
81//! pub1.publish("Stream Message".to_string()).await.unwrap();
82//! });
83//!
84//! while let Some(msg_res) = stream.next().await {
85//! match msg_res {
86//! Ok(msg) => println!("Received: {:?}", msg),
87//! Err(e) => eprintln!("Error: {}", e),
88//! }
89//! break; // Stop after the first message so this example terminates
90//! }
91//! # Ok(())
92//! # }
93//! ```
94//!
95//! ### 3. Batch Operations
96//!
97//! Improve throughput by processing messages in batches.
98//!
99//! ```rust
100//! # use tokio_memq::mq::MessageQueue;
101//! # use tokio_memq::{MessageSubscriber, AsyncMessagePublisher};
102//! # #[tokio::main]
103//! # async fn main() -> anyhow::Result<()> {
104//! # let mq = MessageQueue::new();
105//! # let topic = "batch_topic";
106//! # let publisher = mq.publisher(topic.to_string());
107//! # let sub = mq.subscriber(topic.to_string()).await?;
108//! // Batch Publish
109//! let messages = vec![1, 2, 3, 4, 5];
110//! publisher.publish_batch(messages).await?;
111//!
112//! // Batch Receive
113//! // Returns a vector of up to 10 messages
114//! let batch = sub.recv_batch(10).await?;
115//! for msg in batch {
116//! println!("Batch msg: {:?}", msg);
117//! }
118//! # Ok(())
119//! # }
120//! ```
121//!
122//! ### 4. Advanced Consumption (Filter, Timeout, Metadata)
123//!
124//! ```rust
125//! # use tokio_memq::mq::MessageQueue;
126//! # use tokio_memq::{MessageSubscriber, AsyncMessagePublisher};
127//! # #[tokio::main]
128//! # async fn main() -> anyhow::Result<()> {
129//! # let mq = MessageQueue::new();
130//! # let sub = mq.subscriber("adv_topic".to_string()).await?;
131//! use std::time::Duration;
132//!
133//! // Receive with Timeout
134//! match sub.recv_timeout(Duration::from_millis(500)).await? {
135//! Some(msg) => println!("Got msg: {:?}", msg),
136//! None => println!("Timed out"),
137//! }
138//!
139//! // Receive with Filter (Server-side filtering)
140//! // Only receive messages where payload size > 100 bytes
141//! // Note: This will block until a matching message arrives or the channel closes
142//! // let large_msg = sub.recv_filter(|msg| msg.payload.len() > 100).await?;
143//!
144//! // Metadata-only Mode (Avoids full payload clone/deserialization)
145//! // let msg = sub.recv().await?;
146//! // let meta = msg.metadata();
147//! // println!("Offset: {}, Timestamp: {:?}", meta.offset, meta.created_at);
148//! # Ok(())
149//! # }
150//! ```
151//!
152//! ### 5. Consumer Groups & Offsets
153//!
154//! Manage offsets manually or use consumer groups for persistent state.
155//!
156//! ```rust
157//! use tokio_memq::mq::{ConsumptionMode, TopicOptions, MessageQueue};
158//! use tokio_memq::{MessageSubscriber, AsyncMessagePublisher};
159//!
160//! # #[tokio::main]
161//! # async fn main() -> anyhow::Result<()> {
162//! # let mq = MessageQueue::new();
163//! // Configure topic with retention limits
164//! let options = TopicOptions {
165//! max_messages: Some(1000),
166//! message_ttl: None,
167//! lru_enabled: true,
168//! ..Default::default()
169//! };
170//!
171//! // Subscribe as part of a Consumer Group
172//! // Modes: Earliest, Latest, Offset(n), LastOffset
173//! let sub_group = mq.subscriber_group_with_options(
174//! "topic_name".to_string(),
175//! options,
176//! "group_id_1".to_string(),
177//! ConsumptionMode::LastOffset
178//! ).await?;
179//!
180//! // Manual Commit
181//! // let msg = sub_group.recv().await?;
182//! // sub_group.commit(msg.offset); // Save progress
183//! # Ok(())
184//! # }
185//! ```
186//!
187//! ### 6. Serialization Configuration
188//!
189//! Flexible serialization with per-topic and per-publisher overrides.
190//!
191//! ```rust
192//! use tokio_memq::mq::{
193//! SerializationFactory, SerializationFormat, SerializationConfig,
194//! JsonConfig, PipelineConfig, CompressionConfig
195//! };
196//!
197//! let topic = "compressed_logs";
198//!
199//! // Configure compression pipeline
200//! let pipeline = PipelineConfig {
201//! compression: CompressionConfig::Gzip { level: Some(6) },
202//! pre: None,
203//! post: None,
204//! use_magic_header: true, // Auto-detect format on receive
205//! };
206//!
207//! // Register defaults for a topic
208//! SerializationFactory::register_topic_defaults(
209//! topic,
210//! SerializationFormat::Json,
211//! SerializationConfig::Json(JsonConfig { pretty: false }),
212//! Some(pipeline),
213//! );
214//! ```
215//!
216//! ### 7. Partitioned Topics (Sharding)
217//!
218//! Create a partitioned topic, choose one of the available routing strategies (RoundRobin, Hash, Random, Fixed),
219//! publish messages with automatic routing, subscribe to a specific partition, and inspect per-partition stats.
220//!
221//! ```rust
222//! use tokio_memq::mq::{MessageQueue, TopicOptions, ConsumptionMode, PartitionRouting};
223//! # use tokio_memq::mq::TopicMessage;
224//! # use tokio_memq::MessageSubscriber;
225//! use std::time::Duration;
226//!
227//! #[tokio::main]
228//! async fn main() -> anyhow::Result<()> {
229//! let mq = MessageQueue::new();
230//! let topic = "partitioned_events".to_string();
231//!
232//! // Create a topic with 4 partitions
233//! let options = TopicOptions {
234//! max_messages: Some(1000),
235//! partitions: Some(4),
236//! ..Default::default()
237//! };
238//! mq.create_partitioned_topic(topic.clone(), options, 4).await?;
239//!
240//! // Set routing strategy
241//! mq.set_partition_routing(topic.clone(), PartitionRouting::RoundRobin).await?;
242//!
243//! // Publish a few messages (auto-routed)
244//! for i in 1..=8 {
245//! let message = tokio_memq::TopicMessage::new(topic.clone(), format!("msg {}", i))?;
246//! mq.publish_to_partitioned(message).await?;
247//! }
248//!
249//! // Subscribe to a specific partition (0) and consume one message
250//! let sub0 = mq.subscribe_partition(
251//! topic.clone(),
252//! 0,
253//! Some("partition_demo".to_string()),
254//! ConsumptionMode::Earliest
255//! ).await?;
256//!
257//! if let Some(msg) = sub0.recv_timeout(Duration::from_millis(500)).await? {
258//! println!("Partition 0 got: {}", msg.payload_str());
259//! }
260//!
261//! // Inspect partition stats
262//! if let Some(stats) = mq.get_partition_stats(topic.clone(), 0).await {
263//! println!("Partition 0: {} messages, {} subscribers", stats.message_count, stats.subscriber_count);
264//! }
265//!
266//! Ok(())
267//! }
268//! ```
269
// Declares the `mq` module. Every item re-exported below comes from `mq` or one of its submodules.
270pub mod mq;
271
// Traits re-exported at the crate root so callers can write
// `use tokio_memq::{MessageSubscriber, AsyncMessagePublisher};` as the crate-level examples do.
272pub use mq::traits::{MessagePublisher, AsyncMessagePublisher, MessageSubscriber, QueueManager};
// Items from `mq::message`, including `TopicMessage` (used as `tokio_memq::TopicMessage` in the
// partitioned-topic example) and the `TopicOptions` / `ConsumptionMode` configuration types.
273pub use mq::message::{TopicMessage, TopicOptions, TimestampedMessage, ConsumptionMode};
// Serialization items from `mq::serializer`: format and config types plus the pipeline and
// compression settings shown in the "Serialization Configuration" example.
274pub use mq::serializer::{
275 SerializationFormat, SerializationHelper, Serializer, BincodeSerializer,
276 SerializationFactory, SerializationConfig, JsonConfig, MessagePackConfig,
277 PipelineConfig, CompressionConfig
278};
// The `Publisher` and `Subscriber` items from their respective submodules.
279pub use mq::publisher::Publisher;
280pub use mq::subscriber::Subscriber;
// Items from `mq::broker`, including the partition routing and statistics types.
281pub use mq::broker::{
282 TopicManager, PartitionedTopicChannel, PartitionRouting, PartitionStats, TopicStats
283};
// `MessageQueue`, which the crate-level examples construct with `MessageQueue::new()`.
284pub use mq::MessageQueue;