mockforge_mqtt/
lib.rs

1//! MQTT protocol support for MockForge
2//!
3//! This crate provides a complete MQTT 3.1.1 broker implementation for IoT and pub/sub
4//! testing scenarios.
5//!
6//! ## Features
7//!
8//! - **Full MQTT 3.1.1 Protocol Support**: Handles all control packet types including
9//!   CONNECT, PUBLISH, SUBSCRIBE, and their acknowledgments
10//! - **QoS 0, 1, 2 Support**: Fire-and-forget, at-least-once, and exactly-once delivery
11//! - **Session Management**: Clean and persistent sessions with subscription restoration
12//! - **Topic Wildcards**: Supports + and # wildcards for flexible subscriptions
13//! - **Retained Messages**: Messages are stored and delivered to new subscribers
14//!
15//! ## Metrics and Observability
16//!
17//! The MQTT broker includes built-in metrics collection for monitoring:
18//! - Connection counts (total and active)
19//! - Message publish/delivery rates
20//! - Subscription tracking
21//! - QoS level distribution
22//! - Error rates and latency
23//!
24//! Use [`MqttMetrics`] to collect metrics and [`MqttMetricsExporter`] to export
25//! in Prometheus format.
26//!
27//! ## Usage
28//!
29//! ```no_run
30//! use mockforge_mqtt::{MqttConfig, start_mqtt_server};
31//!
32//! #[tokio::main]
33//! async fn main() {
34//!     let config = MqttConfig::default();
35//!     start_mqtt_server(config).await.expect("Failed to start MQTT server");
36//! }
37//! ```
38
39pub mod broker;
40pub mod fixtures;
41pub mod metrics;
42pub mod protocol;
43pub mod qos;
44pub mod server;
45pub mod session;
46pub mod spec_registry;
47pub mod tls;
48pub mod topics;
49
50pub use broker::{MqttBroker, MqttConfig};
51pub use fixtures::{AutoPublishConfig, MqttFixture, MqttFixtureRegistry, MqttResponse};
52pub use metrics::{MqttMetrics, MqttMetricsExporter, MqttMetricsSnapshot};
53pub use protocol::{
54    ConnackCode, ConnackPacket, ConnectPacket, Packet, PacketDecoder, PacketEncoder, ProtocolError,
55    PublishPacket, QoS as ProtocolQoS, SubackPacket, SubackReturnCode, SubscribePacket,
56    UnsubscribePacket,
57};
58pub use server::{
59    start_mqtt_dual_server, start_mqtt_server, start_mqtt_server_with_metrics,
60    start_mqtt_tls_server, MqttServer,
61};
62pub use session::SessionManager;
63pub use spec_registry::MqttSpecRegistry;
64pub use tls::{create_tls_acceptor, create_tls_acceptor_with_client_auth, TlsError};
65pub use topics::TopicTree;
66
#[cfg(test)]
mod tests {
    use super::*;
    use mockforge_core::protocol_abstraction::SpecRegistry;
    use std::sync::Arc;

    /// Builds a baseline fixture for the tests below: QoS 0, not retained,
    /// an empty JSON object as payload, and no auto-publish settings.
    /// Tests needing other values override individual fields with
    /// struct-update syntax.
    fn fixture_for(identifier: &str, name: &str, topic_pattern: &str) -> MqttFixture {
        MqttFixture {
            identifier: identifier.to_string(),
            name: name.to_string(),
            topic_pattern: topic_pattern.to_string(),
            qos: 0,
            retained: false,
            response: MqttResponse {
                payload: serde_json::json!({}),
            },
            auto_publish: None,
        }
    }

    // ---- Re-export smoke tests -------------------------------------------

    #[test]
    fn test_mqtt_broker_export() {
        // Constructing the broker is the whole check: it proves the
        // re-exported type and its constructor are reachable from the root.
        let broker_config = MqttConfig::default();
        let registry = Arc::new(MqttSpecRegistry::new());
        let _broker = MqttBroker::new(broker_config, registry);
    }

    #[test]
    fn test_mqtt_config_export() {
        assert_eq!(MqttConfig::default().port, 1883);
    }

    #[test]
    fn test_mqtt_fixture_export() {
        let fixture = fixture_for("test", "Test", "test");
        assert_eq!(fixture.identifier, "test");
    }

    #[test]
    fn test_mqtt_response_export() {
        let payload = serde_json::json!({"test": "data"});
        let response = MqttResponse { payload };
        assert_eq!(response.payload["test"], "data");
    }

    #[test]
    fn test_auto_publish_config_export() {
        let auto_publish = AutoPublishConfig {
            enabled: true,
            interval_ms: 1000,
            count: Some(5),
        };
        assert!(auto_publish.enabled);
        assert_eq!(auto_publish.interval_ms, 1000);
    }

    #[test]
    fn test_mqtt_fixture_registry_export() {
        // A freshly created fixture registry holds no fixtures.
        let fresh = MqttFixtureRegistry::new();
        assert_eq!(fresh.fixtures().count(), 0);
    }

    #[test]
    fn test_mqtt_spec_registry_export() {
        // A freshly created spec registry reports no operations.
        let fresh = MqttSpecRegistry::new();
        assert_eq!(fresh.operations().len(), 0);
    }

    #[test]
    fn test_topic_tree_export() {
        let tree = TopicTree::new();
        assert_eq!(tree.stats().total_subscriptions, 0);
    }

    #[test]
    fn test_all_modules_accessible() {
        // Reach one item through each public module path (rather than the
        // root re-exports) to confirm the modules themselves are visible.
        let _via_broker = broker::MqttConfig::default();
        let _via_fixtures = fixtures::MqttFixtureRegistry::new();
        let _via_spec_registry = spec_registry::MqttSpecRegistry::new();
        let _via_topics = topics::TopicTree::new();
        let _via_qos = qos::QoS::AtMostOnce;
    }

    #[test]
    fn test_qos_levels_accessible() {
        use qos::QoS;

        // Each QoS level paired with the numeric value `as_u8` must yield.
        let expected = [
            (QoS::AtMostOnce, 0),
            (QoS::AtLeastOnce, 1),
            (QoS::ExactlyOnce, 2),
        ];
        for (level, numeric) in expected {
            assert_eq!(level.as_u8(), numeric);
        }
    }

    // ---- Basic usage through the re-exported API -------------------------

    #[tokio::test]
    async fn test_broker_basic_usage() {
        const CLIENT_ID: &str = "test-client";

        let broker_config = MqttConfig::default();
        let registry = Arc::new(MqttSpecRegistry::new());
        let broker = MqttBroker::new(broker_config, registry);

        // After connecting, exactly one client is reported.
        broker.client_connect(CLIENT_ID, true).await.unwrap();
        let connected = broker.get_connected_clients().await;
        assert_eq!(connected.len(), 1);

        // After disconnecting, none are reported.
        broker.client_disconnect(CLIENT_ID).await.unwrap();
        let connected = broker.get_connected_clients().await;
        assert_eq!(connected.len(), 0);
    }

    #[test]
    fn test_fixture_registry_basic_usage() {
        const TOPIC: &str = "test/topic";

        // Same baseline as the other fixtures, but QoS 1 and a non-empty payload.
        let fixture = MqttFixture {
            qos: 1,
            response: MqttResponse {
                payload: serde_json::json!({"message": "test"}),
            },
            ..fixture_for("test-fixture", "Test Fixture", TOPIC)
        };

        let mut registry = MqttFixtureRegistry::new();
        registry.add_fixture(fixture);
        assert_eq!(registry.fixtures().count(), 1);

        assert!(registry.find_by_topic(TOPIC).is_some());
    }

    #[test]
    fn test_topic_tree_basic_usage() {
        const TOPIC: &str = "sensor/temp";
        const CLIENT_ID: &str = "client-1";

        let mut tree = TopicTree::new();

        // Subscribing makes the topic match once.
        tree.subscribe(TOPIC, 1, CLIENT_ID);
        assert_eq!(tree.match_topic(TOPIC).len(), 1);

        // Unsubscribing removes that match again.
        tree.unsubscribe(TOPIC, CLIENT_ID);
        assert_eq!(tree.match_topic(TOPIC).len(), 0);
    }

    #[test]
    fn test_spec_registry_basic_usage() {
        const TOPIC: &str = "spec/test";

        let mut registry = MqttSpecRegistry::new();
        registry.add_fixture(fixture_for("spec-test", "Spec Test", TOPIC));

        // The fixture registered above must be found by its topic.
        let lookup = registry.find_fixture_by_topic(TOPIC);
        assert!(lookup.is_some());
        let located = lookup.unwrap();
        assert_eq!(located.identifier, "spec-test");
    }

    #[test]
    fn test_module_documentation() {
        // Intentionally empty: nothing is asserted here.
        // It stands as a reminder that the crate-level doc comment is
        // expected to mention MQTT, the broker, and IoT.
    }
}