tokio_memq/
lib.rs

1//! # Tokio MemQ
2//!
3//! High-performance, feature-rich in-memory async message queue powered by Tokio.
4//! Designed for high-throughput local messaging with advanced features like backpressure,
5//! TTL, consumer groups, and pluggable serialization.
6//!
7//! ## Features
8//!
9//! - **Async & Stream API**: Built on Tokio, supporting `Stream` trait for idiomatic async consumption.
10//! - **Backpressure & Flow Control**: Bounded channels, LRU eviction, and lag monitoring.
11//! - **Advanced Consumption**:
12//!   - **Batch Operations**: High-throughput `publish_batch` and `recv_batch`.
13//!   - **Consumer Groups**: Support for `Earliest`, `Latest`, `Offset(n)`, and `LastOffset` consumption modes.
14//!   - **Filtering**: Predicate-based filtering with `recv_filter`.
15//!   - **Manual Commit/Seek**: Precise offset control.
16//! - **Serialization Pipeline**:
17//!   - Pluggable formats (JSON, MessagePack, Bincode).
18//!   - Compression support (Gzip, Zstd).
19//!   - Per-topic and per-publisher configuration overrides.
20//!   - Auto-format detection via Magic Headers.
21//! - **Management & Monitoring**:
22//!   - Topic creation options (TTL, max messages) and topic deletion.
23//!   - Real-time metrics (depth, subscriber count, lag).
24//!
25//! ## Usage Examples
26//!
27//! ### 1. Basic Publish & Subscribe
28//!
29//! The simplest way to use the queue with default settings.
30//!
31//! ```rust
32//! use tokio_memq::mq::MessageQueue;
33//! use tokio_memq::{MessageSubscriber, AsyncMessagePublisher}; // Import traits for recv() and publish()
34//!
35//! #[tokio::main]
36//! async fn main() -> anyhow::Result<()> {
37//!     let mq = MessageQueue::new();
38//!     let topic = "demo_topic";
39//!
40//!     // Subscriber
41//!     let sub = mq.subscriber(topic.to_string()).await?;
42//!
43//!     // Publisher
44//!     let pub1 = mq.publisher(topic.to_string());
45//!     
46//!     // Publish asynchronously so subscriber can receive it
47//!     tokio::spawn(async move {
48//!         pub1.publish("Hello World".to_string()).await.unwrap();
49//!     });
50//!
51//!     let msg = sub.recv().await?;
52//!     let payload: String = msg.deserialize()?;
53//!     println!("Received: {}", payload);
54//!
55//!     Ok(())
56//! }
57//! ```
58//!
59//! ### 2. Stream API
60//!
61//! Consume messages as an async stream, ideal for continuous processing loops.
62//!
63//! ```rust
64//! # use tokio_memq::mq::MessageQueue;
65//! # use tokio_memq::{MessageSubscriber, AsyncMessagePublisher};
66//! # #[tokio::main]
67//! # async fn main() -> anyhow::Result<()> {
68//! # let mq = MessageQueue::new();
69//! # let topic = "stream_topic";
70//! # let sub = mq.subscriber(topic.to_string()).await?;
71//! # let pub1 = mq.publisher(topic.to_string());
72//! use tokio_stream::StreamExt;
73//! use tokio::pin;
74//!
75//! // Create a stream from the subscriber
76//! let stream = sub.stream();
77//! pin!(stream);
78//!
79//! // Publish a message to trigger the stream
80//! tokio::spawn(async move {
81//!     pub1.publish("Stream Message".to_string()).await.unwrap();
82//! });
83//!
84//! while let Some(msg_res) = stream.next().await {
85//!     match msg_res {
86//!         Ok(msg) => println!("Received: {:?}", msg),
87//!         Err(e) => eprintln!("Error: {}", e),
88//!     }
89//!     break; // Stop after the first message so this example terminates
90//! }
91//! # Ok(())
92//! # }
93//! ```
94//!
95//! ### 3. Batch Operations
96//!
97//! Improve throughput by processing messages in batches.
98//!
99//! ```rust
100//! # use tokio_memq::mq::MessageQueue;
101//! # use tokio_memq::{MessageSubscriber, AsyncMessagePublisher};
102//! # #[tokio::main]
103//! # async fn main() -> anyhow::Result<()> {
104//! # let mq = MessageQueue::new();
105//! # let topic = "batch_topic";
106//! # let publisher = mq.publisher(topic.to_string());
107//! # let sub = mq.subscriber(topic.to_string()).await?;
108//! // Batch Publish
109//! let messages = vec![1, 2, 3, 4, 5];
110//! publisher.publish_batch(messages).await?;
111//!
112//! // Batch Receive
113//! // Returns a vector of up to 10 messages
114//! let batch = sub.recv_batch(10).await?; 
115//! for msg in batch {
116//!     println!("Batch msg: {:?}", msg);
117//! }
118//! # Ok(())
119//! # }
120//! ```
121//!
122//! ### 4. Advanced Consumption (Filter, Timeout, Metadata)
123//!
124//! ```rust
125//! # use tokio_memq::mq::MessageQueue;
126//! # use tokio_memq::{MessageSubscriber, AsyncMessagePublisher};
127//! # #[tokio::main]
128//! # async fn main() -> anyhow::Result<()> {
129//! # let mq = MessageQueue::new();
130//! # let sub = mq.subscriber("adv_topic".to_string()).await?;
131//! use std::time::Duration;
132//!
133//! // Receive with Timeout
134//! match sub.recv_timeout(Duration::from_millis(500)).await? {
135//!     Some(msg) => println!("Got msg: {:?}", msg),
136//!     None => println!("Timed out"),
137//! }
138//!
139//! // Receive with Filter (predicate-based filtering)
140//! // Only receive messages where payload size > 100 bytes
141//! // Note: This waits until a matching message arrives or the channel closes
142//! // let large_msg = sub.recv_filter(|msg| msg.payload.len() > 100).await?;
143//!
144//! // Metadata-only Mode (Avoids full payload clone/deserialization)
145//! // let msg = sub.recv().await?;
146//! // let meta = msg.metadata();
147//! // println!("Offset: {}, Timestamp: {:?}", meta.offset, meta.created_at);
148//! # Ok(())
149//! # }
150//! ```
151//!
152//! ### 5. Consumer Groups & Offsets
153//!
154//! Manage offsets manually or use consumer groups for persistent state.
155//!
156//! ```rust
157//! use tokio_memq::mq::{ConsumptionMode, TopicOptions, MessageQueue};
158//! use tokio_memq::{MessageSubscriber, AsyncMessagePublisher};
159//!
160//! # #[tokio::main]
161//! # async fn main() -> anyhow::Result<()> {
162//! # let mq = MessageQueue::new();
163//! // Configure topic with retention limits
164//! let options = TopicOptions {
165//!     max_messages: Some(1000),
166//!     message_ttl: None,
167//!     lru_enabled: true,
168//!     ..Default::default()
169//! };
170//!
171//! // Subscribe as part of a Consumer Group
172//! // Modes: Earliest, Latest, Offset(n), LastOffset
173//! let sub_group = mq.subscriber_group_with_options(
174//!     "topic_name".to_string(),
175//!     options,
176//!     "group_id_1".to_string(),
177//!     ConsumptionMode::LastOffset
178//! ).await?;
179//!
180//! // Manual Commit
181//! // let msg = sub_group.recv().await?;
182//! // sub_group.commit(msg.offset); // Save progress
183//! # Ok(())
184//! # }
185//! ```
186//!
187//! ### 6. Serialization Configuration
188//!
189//! Flexible serialization with per-topic and per-publisher overrides.
190//!
191//! ```rust
192//! use tokio_memq::mq::{
193//!     SerializationFactory, SerializationFormat, SerializationConfig, 
194//!     JsonConfig, PipelineConfig, CompressionConfig
195//! };
196//!
197//! let topic = "compressed_logs";
198//!
199//! // Configure compression pipeline
200//! let pipeline = PipelineConfig {
201//!     compression: CompressionConfig::Gzip { level: Some(6) },
202//!     pre: None, 
203//!     post: None,
204//!     use_magic_header: true, // Auto-detect format on receive
205//! };
206//!
207//! // Register defaults for a topic
208//! SerializationFactory::register_topic_defaults(
209//!     topic,
210//!     SerializationFormat::Json,
211//!     SerializationConfig::Json(JsonConfig { pretty: false }),
212//!     Some(pipeline),
213//! );
214//! ```
215//!
216//! ### 7. Partitioned Topics (Sharding)
217//! 
218//! Create a partitioned topic, select one of the routing strategies (RoundRobin, Hash, Random, Fixed),
219//! publish messages with automatic routing, subscribe to a specific partition, and inspect per-partition stats.
220//!
221//! ```rust
222//! use tokio_memq::mq::{MessageQueue, TopicOptions, ConsumptionMode, PartitionRouting};
223//! # use tokio_memq::mq::TopicMessage;
224//! # use tokio_memq::MessageSubscriber;
225//! use std::time::Duration;
226//!
227//! #[tokio::main]
228//! async fn main() -> anyhow::Result<()> {
229//!     let mq = MessageQueue::new();
230//!     let topic = "partitioned_events".to_string();
231//!
232//!     // Create a topic with 4 partitions
233//!     let options = TopicOptions {
234//!         max_messages: Some(1000),
235//!         partitions: Some(4),
236//!         ..Default::default()
237//!     };
238//!     mq.create_partitioned_topic(topic.clone(), options, 4).await?;
239//!
240//!     // Set routing strategy
241//!     mq.set_partition_routing(topic.clone(), PartitionRouting::RoundRobin).await?;
242//!
243//!     // Publish a few messages (auto-routed)
244//!     for i in 1..=8 {
245//!         let message = tokio_memq::TopicMessage::new(topic.clone(), format!("msg {}", i))?;
246//!         mq.publish_to_partitioned(message).await?;
247//!     }
248//!
249//!     // Subscribe to a specific partition (0) and consume one message
250//!     let sub0 = mq.subscribe_partition(
251//!         topic.clone(), 
252//!         0, 
253//!         Some("partition_demo".to_string()),
254//!         ConsumptionMode::Earliest
255//!     ).await?;
256//!
257//!     if let Some(msg) = sub0.recv_timeout(Duration::from_millis(500)).await? {
258//!         println!("Partition 0 got: {}", msg.payload_str());
259//!     }
260//!
261//!     // Inspect partition stats
262//!     if let Some(stats) = mq.get_partition_stats(topic.clone(), 0).await {
263//!         println!("Partition 0: {} messages, {} subscribers", stats.message_count, stats.subscriber_count);
264//!     }
265//!
266//!     Ok(())
267//! }
268//! ```
269
270pub mod mq;
271
272pub use mq::traits::{MessagePublisher, AsyncMessagePublisher, MessageSubscriber, QueueManager};
273pub use mq::message::{TopicMessage, TopicOptions, TimestampedMessage, ConsumptionMode};
274pub use mq::serializer::{
275    SerializationFormat, SerializationHelper, Serializer, BincodeSerializer,
276    SerializationFactory, SerializationConfig, JsonConfig, MessagePackConfig,
277    PipelineConfig, CompressionConfig
278};
279pub use mq::publisher::Publisher;
280pub use mq::subscriber::Subscriber;
281pub use mq::broker::{
282    TopicManager, PartitionedTopicChannel, PartitionRouting, PartitionStats, TopicStats
283};
284pub use mq::MessageQueue;