#[cfg(any(
feature = "smtp",
feature = "mqtt",
feature = "kafka",
feature = "amqp"
))]
use axum::extract::Path;
#[cfg(any(feature = "mqtt", feature = "kafka"))]
use axum::extract::Query;
#[cfg(any(feature = "mqtt", feature = "kafka"))]
use axum::response::sse::{Event, Sse};
use axum::{
extract::State,
http::StatusCode,
response::{IntoResponse, Json},
};
#[cfg(any(feature = "mqtt", feature = "kafka"))]
use futures::stream::{self, Stream};
#[cfg(any(feature = "mqtt", feature = "kafka", feature = "amqp"))]
use serde::{Deserialize, Serialize};
#[cfg(any(feature = "mqtt", feature = "kafka"))]
use std::convert::Infallible;
#[cfg(any(feature = "mqtt", feature = "kafka"))]
use tokio::sync::broadcast;
#[cfg(any(feature = "mqtt", feature = "kafka"))]
use tracing::*;
use super::ManagementState;
#[cfg(any(feature = "mqtt", feature = "kafka"))]
use super::MessageEvent;
#[cfg(feature = "mqtt")]
use super::MqttMessageEvent;
#[cfg(feature = "smtp")]
use mockforge_smtp::EmailSearchFilters;
#[cfg(feature = "smtp")]
pub(crate) async fn list_smtp_emails(State(state): State<ManagementState>) -> impl IntoResponse {
if let Some(ref smtp_registry) = state.smtp_registry {
match smtp_registry.get_emails() {
Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"error": "Failed to retrieve emails",
"message": e.to_string()
})),
),
}
} else {
(
StatusCode::NOT_IMPLEMENTED,
Json(serde_json::json!({
"error": "SMTP mailbox management not available",
"message": "SMTP server is not enabled or registry not available."
})),
)
}
}
#[cfg(feature = "smtp")]
pub(crate) async fn get_smtp_email(
State(state): State<ManagementState>,
Path(id): Path<String>,
) -> impl IntoResponse {
if let Some(ref smtp_registry) = state.smtp_registry {
match smtp_registry.get_email_by_id(&id) {
Ok(Some(email)) => (StatusCode::OK, Json(serde_json::json!(email))),
Ok(None) => (
StatusCode::NOT_FOUND,
Json(serde_json::json!({
"error": "Email not found",
"id": id
})),
),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"error": "Failed to retrieve email",
"message": e.to_string()
})),
),
}
} else {
(
StatusCode::NOT_IMPLEMENTED,
Json(serde_json::json!({
"error": "SMTP mailbox management not available",
"message": "SMTP server is not enabled or registry not available."
})),
)
}
}
#[cfg(feature = "smtp")]
pub(crate) async fn clear_smtp_mailbox(State(state): State<ManagementState>) -> impl IntoResponse {
if let Some(ref smtp_registry) = state.smtp_registry {
match smtp_registry.clear_mailbox() {
Ok(()) => (
StatusCode::OK,
Json(serde_json::json!({
"message": "Mailbox cleared successfully"
})),
),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"error": "Failed to clear mailbox",
"message": e.to_string()
})),
),
}
} else {
(
StatusCode::NOT_IMPLEMENTED,
Json(serde_json::json!({
"error": "SMTP mailbox management not available",
"message": "SMTP server is not enabled or registry not available."
})),
)
}
}
#[cfg(feature = "smtp")]
pub(crate) async fn export_smtp_mailbox(
Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
let format = params.get("format").unwrap_or(&"json".to_string()).clone();
(
StatusCode::NOT_IMPLEMENTED,
Json(serde_json::json!({
"error": "SMTP mailbox management not available via HTTP API",
"message": "SMTP server runs separately from HTTP server. Use CLI commands to access mailbox.",
"requested_format": format
})),
)
}
#[cfg(feature = "smtp")]
pub(crate) async fn search_smtp_emails(
State(state): State<ManagementState>,
Query(params): Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
if let Some(ref smtp_registry) = state.smtp_registry {
let filters = EmailSearchFilters {
sender: params.get("sender").cloned(),
recipient: params.get("recipient").cloned(),
subject: params.get("subject").cloned(),
body: params.get("body").cloned(),
since: params
.get("since")
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&chrono::Utc)),
until: params
.get("until")
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&chrono::Utc)),
use_regex: params.get("regex").map(|s| s == "true").unwrap_or(false),
case_sensitive: params.get("case_sensitive").map(|s| s == "true").unwrap_or(false),
};
match smtp_registry.search_emails(filters) {
Ok(emails) => (StatusCode::OK, Json(serde_json::json!(emails))),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"error": "Failed to search emails",
"message": e.to_string()
})),
),
}
} else {
(
StatusCode::NOT_IMPLEMENTED,
Json(serde_json::json!({
"error": "SMTP mailbox management not available",
"message": "SMTP server is not enabled or registry not available."
})),
)
}
}
#[cfg(feature = "mqtt")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttBrokerStats {
pub connected_clients: usize,
pub active_topics: usize,
pub retained_messages: usize,
pub total_subscriptions: usize,
}
#[cfg(feature = "mqtt")]
pub(crate) async fn get_mqtt_stats(State(state): State<ManagementState>) -> impl IntoResponse {
if let Some(broker) = &state.mqtt_sessions {
let connected_clients = broker.get_connected_clients().await.len();
let active_topics = broker.get_active_topics().await.len();
let stats = broker.get_topic_stats().await;
let broker_stats = MqttBrokerStats {
connected_clients,
active_topics,
retained_messages: stats.retained_messages,
total_subscriptions: stats.total_subscriptions,
};
Json(broker_stats).into_response()
} else {
(StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
}
}
#[cfg(feature = "mqtt")]
pub(crate) async fn get_mqtt_clients(State(state): State<ManagementState>) -> impl IntoResponse {
if let Some(broker) = &state.mqtt_sessions {
let clients = broker.get_connected_clients().await;
Json(serde_json::json!({
"clients": clients
}))
.into_response()
} else {
(StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
}
}
#[cfg(feature = "mqtt")]
pub(crate) async fn get_mqtt_topics(State(state): State<ManagementState>) -> impl IntoResponse {
if let Some(broker) = &state.mqtt_sessions {
let topics = broker.get_active_topics().await;
Json(serde_json::json!({
"topics": topics
}))
.into_response()
} else {
(StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
}
}
#[cfg(feature = "mqtt")]
pub(crate) async fn disconnect_mqtt_client(
State(state): State<ManagementState>,
Path(client_id): Path<String>,
) -> impl IntoResponse {
if let Some(sessions) = &state.mqtt_sessions {
sessions.disconnect(&client_id, true).await;
(StatusCode::OK, format!("Client '{}' disconnected", client_id)).into_response()
} else {
(StatusCode::SERVICE_UNAVAILABLE, "MQTT broker not available").into_response()
}
}
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct MqttPublishRequest {
pub topic: String,
pub payload: String,
#[serde(default = "default_qos")]
pub qos: u8,
#[serde(default)]
pub retain: bool,
}
#[cfg(feature = "mqtt")]
#[allow(dead_code)]
fn default_qos() -> u8 {
0
}
#[cfg(feature = "mqtt")]
pub(crate) async fn publish_mqtt_message_handler(
State(state): State<ManagementState>,
Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
let topic = request.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
let payload = request.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
let qos = request.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
let retain = request.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
if topic.is_none() || payload.is_none() {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": "Invalid request",
"message": "Missing required fields: topic and payload"
})),
);
}
let topic = topic.unwrap();
let payload = payload.unwrap();
if let Some(broker) = &state.mqtt_sessions {
if qos > 2 {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": "Invalid QoS",
"message": "QoS must be 0, 1, or 2"
})),
);
}
let payload_bytes = payload.as_bytes().to_vec();
let client_id = "mockforge-management-api".to_string();
let qos_level = mockforge_mqtt::ProtocolQoS::try_from(qos).unwrap_or_default();
broker.publish_raw(&client_id, &topic, payload_bytes, qos_level, retain).await;
let event = MessageEvent::Mqtt(MqttMessageEvent {
topic: topic.clone(),
payload: payload.clone(),
qos,
retain,
timestamp: chrono::Utc::now().to_rfc3339(),
});
let _ = state.message_events.send(event);
(
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"message": format!("Message published to topic '{}'", topic),
"topic": topic,
"qos": qos,
"retain": retain
})),
)
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "MQTT broker not available",
"message": "MQTT broker is not enabled or not available."
})),
)
}
}
#[cfg(not(feature = "mqtt"))]
pub(crate) async fn publish_mqtt_message_handler(
State(_state): State<ManagementState>,
Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "MQTT feature not enabled",
"message": "MQTT support is not compiled into this build"
})),
)
}
#[cfg(feature = "mqtt")]
#[derive(Debug, Deserialize)]
#[allow(dead_code)]
pub struct MqttBatchPublishRequest {
pub messages: Vec<MqttPublishRequest>,
#[serde(default = "default_delay")]
pub delay_ms: u64,
}
#[cfg(feature = "mqtt")]
#[allow(dead_code)]
fn default_delay() -> u64 {
100
}
#[cfg(feature = "mqtt")]
pub(crate) async fn publish_mqtt_batch_handler(
State(state): State<ManagementState>,
Json(request): Json<serde_json::Value>,
) -> impl IntoResponse {
let messages_json = request.get("messages").and_then(|v| v.as_array());
let delay_ms = request.get("delay_ms").and_then(|v| v.as_u64()).unwrap_or(100);
if messages_json.is_none() {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": "Invalid request",
"message": "Missing required field: messages"
})),
);
}
let messages_json = messages_json.unwrap();
if let Some(broker) = &state.mqtt_sessions {
if messages_json.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": "Empty batch",
"message": "At least one message is required"
})),
);
}
let mut results = Vec::new();
let client_id = "mockforge-management-api".to_string();
for (index, msg_json) in messages_json.iter().enumerate() {
let topic = msg_json.get("topic").and_then(|v| v.as_str()).map(|s| s.to_string());
let payload = msg_json.get("payload").and_then(|v| v.as_str()).map(|s| s.to_string());
let qos = msg_json.get("qos").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
let retain = msg_json.get("retain").and_then(|v| v.as_bool()).unwrap_or(false);
if topic.is_none() || payload.is_none() {
results.push(serde_json::json!({
"index": index,
"success": false,
"error": "Missing required fields: topic and payload"
}));
continue;
}
let topic = topic.unwrap();
let payload = payload.unwrap();
if qos > 2 {
results.push(serde_json::json!({
"index": index,
"success": false,
"error": "Invalid QoS (must be 0, 1, or 2)"
}));
continue;
}
let payload_bytes = payload.as_bytes().to_vec();
let qos_level = mockforge_mqtt::ProtocolQoS::try_from(qos).unwrap_or_default();
broker.publish_raw(&client_id, &topic, payload_bytes, qos_level, retain).await;
let event = MessageEvent::Mqtt(MqttMessageEvent {
topic: topic.clone(),
payload: payload.clone(),
qos,
retain,
timestamp: chrono::Utc::now().to_rfc3339(),
});
let _ = state.message_events.send(event);
results.push(serde_json::json!({
"index": index,
"success": true,
"topic": topic,
"qos": qos
}));
if index < messages_json.len() - 1 && delay_ms > 0 {
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
}
}
let success_count =
results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
(
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"total": messages_json.len(),
"succeeded": success_count,
"failed": messages_json.len() - success_count,
"results": results
})),
)
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "MQTT broker not available",
"message": "MQTT broker is not enabled or not available."
})),
)
}
}
#[cfg(not(feature = "mqtt"))]
pub(crate) async fn publish_mqtt_batch_handler(
State(_state): State<ManagementState>,
Json(_request): Json<serde_json::Value>,
) -> impl IntoResponse {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "MQTT feature not enabled",
"message": "MQTT support is not compiled into this build"
})),
)
}
#[cfg(feature = "mqtt")]
pub(crate) async fn mqtt_messages_stream(
State(state): State<ManagementState>,
Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let rx = state.message_events.subscribe();
let topic_filter = params.get("topic").cloned();
let stream = stream::unfold(rx, move |mut rx| {
let topic_filter = topic_filter.clone();
async move {
loop {
match rx.recv().await {
Ok(MessageEvent::Mqtt(event)) => {
if let Some(filter) = &topic_filter {
if !event.topic.contains(filter) {
continue;
}
}
let event_json = serde_json::json!({
"protocol": "mqtt",
"topic": event.topic,
"payload": event.payload,
"qos": event.qos,
"retain": event.retain,
"timestamp": event.timestamp,
});
if let Ok(event_data) = serde_json::to_string(&event_json) {
let sse_event = Event::default().event("mqtt_message").data(event_data);
return Some((Ok(sse_event), rx));
}
}
#[cfg(feature = "kafka")]
Ok(MessageEvent::Kafka(_)) => {
continue;
}
Err(broadcast::error::RecvError::Closed) => {
return None;
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
warn!("MQTT message stream lagged, skipped {} messages", skipped);
continue;
}
}
}
}
});
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(std::time::Duration::from_secs(15))
.text("keep-alive-text"),
)
}
#[cfg(feature = "kafka")]
use super::KafkaMessageEvent;
#[cfg(feature = "kafka")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaBrokerStats {
pub topics: usize,
pub partitions: usize,
pub consumer_groups: usize,
pub messages_produced: u64,
pub messages_consumed: u64,
}
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicInfo {
pub name: String,
pub partitions: usize,
pub replication_factor: i32,
}
#[cfg(feature = "kafka")]
#[allow(missing_docs)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaConsumerGroupInfo {
pub group_id: String,
pub members: usize,
pub state: String,
}
#[cfg(feature = "kafka")]
pub(crate) async fn get_kafka_stats(State(state): State<ManagementState>) -> impl IntoResponse {
if let Some(broker) = &state.kafka_broker {
let topics = broker.topics.read().await;
let consumer_groups = broker.consumer_groups.read().await;
let total_partitions: usize = topics.values().map(|t| t.partitions.len()).sum();
let metrics_snapshot = broker.metrics().snapshot();
let stats = KafkaBrokerStats {
topics: topics.len(),
partitions: total_partitions,
consumer_groups: consumer_groups.groups().len(),
messages_produced: metrics_snapshot.messages_produced_total,
messages_consumed: metrics_snapshot.messages_consumed_total,
};
Json(stats).into_response()
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "Kafka broker not available",
"message": "Kafka broker is not enabled or not available."
})),
)
.into_response()
}
}
#[cfg(feature = "kafka")]
pub(crate) async fn get_kafka_topics(State(state): State<ManagementState>) -> impl IntoResponse {
if let Some(broker) = &state.kafka_broker {
let topics = broker.topics.read().await;
let topic_list: Vec<KafkaTopicInfo> = topics
.iter()
.map(|(name, topic)| KafkaTopicInfo {
name: name.clone(),
partitions: topic.partitions.len(),
replication_factor: topic.config.replication_factor as i32,
})
.collect();
Json(serde_json::json!({
"topics": topic_list
}))
.into_response()
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "Kafka broker not available",
"message": "Kafka broker is not enabled or not available."
})),
)
.into_response()
}
}
#[cfg(feature = "kafka")]
pub(crate) async fn get_kafka_topic(
State(state): State<ManagementState>,
Path(topic_name): Path<String>,
) -> impl IntoResponse {
if let Some(broker) = &state.kafka_broker {
let topics = broker.topics.read().await;
if let Some(topic) = topics.get(&topic_name) {
Json(serde_json::json!({
"name": topic_name,
"partitions": topic.partitions.len(),
"replication_factor": topic.config.replication_factor,
"partitions_detail": topic.partitions.iter().enumerate().map(|(idx, partition)| serde_json::json!({
"id": idx as i32,
"leader": 0,
"replicas": vec![0],
"message_count": partition.messages.len()
})).collect::<Vec<_>>()
})).into_response()
} else {
(
StatusCode::NOT_FOUND,
Json(serde_json::json!({
"error": "Topic not found",
"topic": topic_name
})),
)
.into_response()
}
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "Kafka broker not available",
"message": "Kafka broker is not enabled or not available."
})),
)
.into_response()
}
}
#[cfg(feature = "kafka")]
pub(crate) async fn get_kafka_groups(State(state): State<ManagementState>) -> impl IntoResponse {
if let Some(broker) = &state.kafka_broker {
let consumer_groups = broker.consumer_groups.read().await;
let groups: Vec<KafkaConsumerGroupInfo> = consumer_groups
.groups()
.iter()
.map(|(group_id, group)| KafkaConsumerGroupInfo {
group_id: group_id.clone(),
members: group.members.len(),
state: "Stable".to_string(), })
.collect();
Json(serde_json::json!({
"groups": groups
}))
.into_response()
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "Kafka broker not available",
"message": "Kafka broker is not enabled or not available."
})),
)
.into_response()
}
}
#[cfg(feature = "kafka")]
pub(crate) async fn get_kafka_group(
State(state): State<ManagementState>,
Path(group_id): Path<String>,
) -> impl IntoResponse {
if let Some(broker) = &state.kafka_broker {
let consumer_groups = broker.consumer_groups.read().await;
if let Some(group) = consumer_groups.groups().get(&group_id) {
Json(serde_json::json!({
"group_id": group_id,
"members": group.members.len(),
"state": "Stable",
"members_detail": group.members.iter().map(|(member_id, member)| serde_json::json!({
"member_id": member_id,
"client_id": member.client_id,
"assignments": member.assignment.iter().map(|a| serde_json::json!({
"topic": a.topic,
"partitions": a.partitions
})).collect::<Vec<_>>()
})).collect::<Vec<_>>(),
"offsets": group.offsets.iter().map(|((topic, partition), offset)| serde_json::json!({
"topic": topic,
"partition": partition,
"offset": offset
})).collect::<Vec<_>>()
})).into_response()
} else {
(
StatusCode::NOT_FOUND,
Json(serde_json::json!({
"error": "Consumer group not found",
"group_id": group_id
})),
)
.into_response()
}
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "Kafka broker not available",
"message": "Kafka broker is not enabled or not available."
})),
)
.into_response()
}
}
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaProduceRequest {
pub topic: String,
#[serde(default)]
pub key: Option<String>,
pub value: String,
#[serde(default)]
pub partition: Option<i32>,
#[serde(default)]
pub headers: Option<std::collections::HashMap<String, String>>,
}
#[cfg(feature = "kafka")]
pub(crate) async fn produce_kafka_message(
State(state): State<ManagementState>,
Json(request): Json<KafkaProduceRequest>,
) -> impl IntoResponse {
if let Some(broker) = &state.kafka_broker {
let mut topics = broker.topics.write().await;
let topic_entry = topics.entry(request.topic.clone()).or_insert_with(|| {
mockforge_kafka::topics::Topic::new(
request.topic.clone(),
mockforge_kafka::topics::TopicConfig::default(),
)
});
let partition_id = if let Some(partition) = request.partition {
partition
} else {
topic_entry.assign_partition(request.key.as_ref().map(|k| k.as_bytes()))
};
if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": "Invalid partition",
"message": format!("Partition {} does not exist (topic has {} partitions)", partition_id, topic_entry.partitions.len())
})),
)
.into_response();
}
let key_clone = request.key.clone();
let headers_clone = request.headers.clone();
let message = mockforge_kafka::partitions::KafkaMessage {
offset: 0, timestamp: chrono::Utc::now().timestamp_millis(),
key: key_clone.clone().map(|k| k.as_bytes().to_vec()),
value: request.value.as_bytes().to_vec(),
headers: headers_clone
.clone()
.unwrap_or_default()
.into_iter()
.map(|(k, v)| (k, v.as_bytes().to_vec()))
.collect(),
};
match topic_entry.produce(partition_id, message).await {
Ok(offset) => {
if let Some(broker) = &state.kafka_broker {
broker.metrics().record_messages_produced(1);
}
#[cfg(feature = "kafka")]
{
let event = MessageEvent::Kafka(KafkaMessageEvent {
topic: request.topic.clone(),
key: key_clone,
value: request.value.clone(),
partition: partition_id,
offset,
headers: headers_clone,
timestamp: chrono::Utc::now().to_rfc3339(),
});
let _ = state.message_events.send(event);
}
Json(serde_json::json!({
"success": true,
"message": format!("Message produced to topic '{}'", request.topic),
"topic": request.topic,
"partition": partition_id,
"offset": offset
}))
.into_response()
}
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"error": "Failed to produce message",
"message": e.to_string()
})),
)
.into_response(),
}
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "Kafka broker not available",
"message": "Kafka broker is not enabled or not available."
})),
)
.into_response()
}
}
#[cfg(feature = "kafka")]
#[derive(Debug, Deserialize)]
pub struct KafkaBatchProduceRequest {
pub messages: Vec<KafkaProduceRequest>,
#[serde(default = "kafka_default_delay")]
pub delay_ms: u64,
}
#[cfg(feature = "kafka")]
fn kafka_default_delay() -> u64 {
100
}
#[cfg(feature = "kafka")]
pub(crate) async fn produce_kafka_batch(
State(state): State<ManagementState>,
Json(request): Json<KafkaBatchProduceRequest>,
) -> impl IntoResponse {
if let Some(broker) = &state.kafka_broker {
if request.messages.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"error": "Empty batch",
"message": "At least one message is required"
})),
)
.into_response();
}
let mut results = Vec::new();
for (index, msg_request) in request.messages.iter().enumerate() {
let mut topics = broker.topics.write().await;
let topic_entry = topics.entry(msg_request.topic.clone()).or_insert_with(|| {
mockforge_kafka::topics::Topic::new(
msg_request.topic.clone(),
mockforge_kafka::topics::TopicConfig::default(),
)
});
let partition_id = if let Some(partition) = msg_request.partition {
partition
} else {
topic_entry.assign_partition(msg_request.key.as_ref().map(|k| k.as_bytes()))
};
if partition_id < 0 || partition_id >= topic_entry.partitions.len() as i32 {
results.push(serde_json::json!({
"index": index,
"success": false,
"error": format!("Invalid partition {} (topic has {} partitions)", partition_id, topic_entry.partitions.len())
}));
continue;
}
let message = mockforge_kafka::partitions::KafkaMessage {
offset: 0,
timestamp: chrono::Utc::now().timestamp_millis(),
key: msg_request.key.clone().map(|k| k.as_bytes().to_vec()),
value: msg_request.value.as_bytes().to_vec(),
headers: msg_request
.headers
.clone()
.unwrap_or_default()
.into_iter()
.map(|(k, v)| (k, v.as_bytes().to_vec()))
.collect(),
};
match topic_entry.produce(partition_id, message).await {
Ok(offset) => {
if let Some(broker) = &state.kafka_broker {
broker.metrics().record_messages_produced(1);
}
let event = MessageEvent::Kafka(KafkaMessageEvent {
topic: msg_request.topic.clone(),
key: msg_request.key.clone(),
value: msg_request.value.clone(),
partition: partition_id,
offset,
headers: msg_request.headers.clone(),
timestamp: chrono::Utc::now().to_rfc3339(),
});
let _ = state.message_events.send(event);
results.push(serde_json::json!({
"index": index,
"success": true,
"topic": msg_request.topic,
"partition": partition_id,
"offset": offset
}));
}
Err(e) => {
results.push(serde_json::json!({
"index": index,
"success": false,
"error": e.to_string()
}));
}
}
if index < request.messages.len() - 1 && request.delay_ms > 0 {
tokio::time::sleep(std::time::Duration::from_millis(request.delay_ms)).await;
}
}
let success_count =
results.iter().filter(|r| r["success"].as_bool().unwrap_or(false)).count();
Json(serde_json::json!({
"success": true,
"total": request.messages.len(),
"succeeded": success_count,
"failed": request.messages.len() - success_count,
"results": results
}))
.into_response()
} else {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(serde_json::json!({
"error": "Kafka broker not available",
"message": "Kafka broker is not enabled or not available."
})),
)
.into_response()
}
}
#[cfg(feature = "kafka")]
pub(crate) async fn kafka_messages_stream(
State(state): State<ManagementState>,
Query(params): Query<std::collections::HashMap<String, String>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let rx = state.message_events.subscribe();
let topic_filter = params.get("topic").cloned();
let stream = stream::unfold(rx, move |mut rx| {
let topic_filter = topic_filter.clone();
async move {
loop {
match rx.recv().await {
#[cfg(feature = "mqtt")]
Ok(MessageEvent::Mqtt(_)) => {
continue;
}
Ok(MessageEvent::Kafka(event)) => {
if let Some(filter) = &topic_filter {
if !event.topic.contains(filter) {
continue;
}
}
let event_json = serde_json::json!({
"protocol": "kafka",
"topic": event.topic,
"key": event.key,
"value": event.value,
"partition": event.partition,
"offset": event.offset,
"headers": event.headers,
"timestamp": event.timestamp,
});
if let Ok(event_data) = serde_json::to_string(&event_json) {
let sse_event =
Event::default().event("kafka_message").data(event_data);
return Some((Ok(sse_event), rx));
}
}
Err(broadcast::error::RecvError::Closed) => {
return None;
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
warn!("Kafka message stream lagged, skipped {} messages", skipped);
continue;
}
}
}
}
});
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(std::time::Duration::from_secs(15))
.text("keep-alive-text"),
)
}
#[cfg(feature = "amqp")]
mod amqp {
use super::*;
pub(crate) use mockforge_amqp::bindings::Binding;
use mockforge_amqp::exchanges::ExchangeType;
use mockforge_amqp::messages::Message;
pub(crate) use mockforge_amqp::messages::QueuedMessage;
#[derive(Debug, Clone, Serialize)]
pub(crate) struct AmqpBrokerStats {
pub exchanges: usize,
pub queues: usize,
pub bindings: usize,
pub buffered_messages: usize,
pub connections_active: u64,
pub channels_active: u64,
pub messages_published_total: u64,
pub messages_consumed_total: u64,
}
#[derive(Debug, Clone, Serialize)]
pub(crate) struct AmqpBindingInfo {
pub queue: String,
pub routing_key: String,
}
#[derive(Debug, Clone, Serialize)]
pub(crate) struct AmqpExchangeInfo {
pub name: String,
#[serde(rename = "type")]
pub exchange_type: String,
pub durable: bool,
pub auto_delete: bool,
pub bindings: Vec<AmqpBindingInfo>,
}
#[derive(Debug, Clone, Serialize)]
pub(crate) struct AmqpQueueInfo {
pub name: String,
pub durable: bool,
pub exclusive: bool,
pub auto_delete: bool,
pub message_count: usize,
pub consumer_count: usize,
}
#[derive(Debug, Deserialize)]
pub(crate) struct DeclareExchangeRequest {
pub name: String,
#[serde(default = "default_exchange_type")]
pub r#type: String,
#[serde(default)]
pub durable: bool,
#[serde(default)]
pub auto_delete: bool,
}
fn default_exchange_type() -> String {
"direct".to_string()
}
#[derive(Debug, Deserialize)]
pub(crate) struct DeclareQueueRequest {
pub name: String,
#[serde(default)]
pub durable: bool,
#[serde(default)]
pub exclusive: bool,
#[serde(default)]
pub auto_delete: bool,
}
#[derive(Debug, Deserialize)]
pub(crate) struct AddBindingRequest {
pub queue: String,
#[serde(default)]
pub routing_key: String,
}
#[derive(Debug, Deserialize)]
pub(crate) struct PublishRequest {
#[serde(default)]
pub exchange: String,
#[serde(default)]
pub routing_key: String,
#[serde(default)]
pub payload: String,
}
pub(crate) fn exchange_type_str(t: &ExchangeType) -> &'static str {
match t {
ExchangeType::Direct => "direct",
ExchangeType::Fanout => "fanout",
ExchangeType::Topic => "topic",
ExchangeType::Headers => "headers",
}
}
pub(crate) fn parse_exchange_type(s: &str) -> Option<ExchangeType> {
match s.to_ascii_lowercase().as_str() {
"direct" => Some(ExchangeType::Direct),
"fanout" => Some(ExchangeType::Fanout),
"topic" => Some(ExchangeType::Topic),
"headers" => Some(ExchangeType::Headers),
_ => None,
}
}
pub(crate) fn build_message(routing_key: String, payload: String) -> Message {
Message {
properties: Default::default(),
body: payload.into_bytes(),
routing_key,
}
}
}
#[cfg(feature = "amqp")]
use amqp::*;
#[cfg(feature = "amqp")]
pub(crate) async fn get_amqp_stats(State(state): State<ManagementState>) -> impl IntoResponse {
let Some(broker) = &state.amqp_broker else {
return (StatusCode::SERVICE_UNAVAILABLE, "AMQP broker not available").into_response();
};
let snap = broker.metrics.snapshot();
let (exchanges, bindings) = {
let mgr = broker.exchanges.read().await;
let list = mgr.list_exchanges();
let bindings = list.iter().map(|e| e.bindings.len()).sum();
(list.len(), bindings)
};
let (queues, buffered_messages) = {
let mgr = broker.queues.read().await;
let list = mgr.list_queues();
let buffered = list.iter().map(|q| q.messages.len()).sum();
(list.len(), buffered)
};
Json(AmqpBrokerStats {
exchanges,
queues,
bindings,
buffered_messages,
connections_active: snap.connections_active,
channels_active: snap.channels_active,
messages_published_total: snap.messages_published_total,
messages_consumed_total: snap.messages_consumed_total,
})
.into_response()
}
#[cfg(feature = "amqp")]
pub(crate) async fn get_amqp_exchanges(State(state): State<ManagementState>) -> impl IntoResponse {
let Some(broker) = &state.amqp_broker else {
return (StatusCode::SERVICE_UNAVAILABLE, "AMQP broker not available").into_response();
};
let mgr = broker.exchanges.read().await;
let exchanges: Vec<AmqpExchangeInfo> = mgr
.list_exchanges()
.into_iter()
.map(|e| AmqpExchangeInfo {
name: e.name.clone(),
exchange_type: exchange_type_str(&e.exchange_type).to_string(),
durable: e.durable,
auto_delete: e.auto_delete,
bindings: e
.bindings
.iter()
.map(|b| AmqpBindingInfo {
queue: b.queue.clone(),
routing_key: b.routing_key.clone(),
})
.collect(),
})
.collect();
Json(serde_json::json!({ "exchanges": exchanges })).into_response()
}
#[cfg(feature = "amqp")]
pub(crate) async fn declare_amqp_exchange(
State(state): State<ManagementState>,
Json(req): Json<DeclareExchangeRequest>,
) -> impl IntoResponse {
let Some(broker) = &state.amqp_broker else {
return (StatusCode::SERVICE_UNAVAILABLE, "AMQP broker not available").into_response();
};
if req.name.trim().is_empty() {
return (StatusCode::BAD_REQUEST, "exchange name is required").into_response();
}
let Some(exchange_type) = parse_exchange_type(&req.r#type) else {
return (StatusCode::BAD_REQUEST, "type must be one of: direct, fanout, topic, headers")
.into_response();
};
broker.exchanges.write().await.declare_exchange(
req.name.clone(),
exchange_type,
req.durable,
req.auto_delete,
);
(StatusCode::OK, Json(serde_json::json!({ "declared": req.name }))).into_response()
}
#[cfg(feature = "amqp")]
pub(crate) async fn delete_amqp_exchange(
State(state): State<ManagementState>,
Path(name): Path<String>,
) -> impl IntoResponse {
let Some(broker) = &state.amqp_broker else {
return (StatusCode::SERVICE_UNAVAILABLE, "AMQP broker not available").into_response();
};
if broker.exchanges.write().await.delete_exchange(&name) {
(StatusCode::OK, Json(serde_json::json!({ "deleted": name }))).into_response()
} else {
(StatusCode::NOT_FOUND, format!("exchange '{}' not found", name)).into_response()
}
}
#[cfg(feature = "amqp")]
pub(crate) async fn add_amqp_binding(
State(state): State<ManagementState>,
Path(name): Path<String>,
Json(req): Json<AddBindingRequest>,
) -> impl IntoResponse {
let Some(broker) = &state.amqp_broker else {
return (StatusCode::SERVICE_UNAVAILABLE, "AMQP broker not available").into_response();
};
if req.queue.trim().is_empty() {
return (StatusCode::BAD_REQUEST, "queue is required").into_response();
}
let binding = Binding::new(name.clone(), req.queue.clone(), req.routing_key.clone());
if broker.exchanges.write().await.add_binding(&name, binding) {
(
StatusCode::OK,
Json(serde_json::json!({
"exchange": name,
"queue": req.queue,
"routing_key": req.routing_key,
})),
)
.into_response()
} else {
(StatusCode::NOT_FOUND, format!("exchange '{}' not found", name)).into_response()
}
}
#[cfg(feature = "amqp")]
pub(crate) async fn get_amqp_queues(State(state): State<ManagementState>) -> impl IntoResponse {
let Some(broker) = &state.amqp_broker else {
return (StatusCode::SERVICE_UNAVAILABLE, "AMQP broker not available").into_response();
};
let mgr = broker.queues.read().await;
let queues: Vec<AmqpQueueInfo> = mgr
.list_queues()
.into_iter()
.map(|q| AmqpQueueInfo {
name: q.name.clone(),
durable: q.durable,
exclusive: q.exclusive,
auto_delete: q.auto_delete,
message_count: q.messages.len(),
consumer_count: q.consumers.len(),
})
.collect();
Json(serde_json::json!({ "queues": queues })).into_response()
}
#[cfg(feature = "amqp")]
pub(crate) async fn declare_amqp_queue(
State(state): State<ManagementState>,
Json(req): Json<DeclareQueueRequest>,
) -> impl IntoResponse {
let Some(broker) = &state.amqp_broker else {
return (StatusCode::SERVICE_UNAVAILABLE, "AMQP broker not available").into_response();
};
if req.name.trim().is_empty() {
return (StatusCode::BAD_REQUEST, "queue name is required").into_response();
}
broker.queues.write().await.declare_queue(
req.name.clone(),
req.durable,
req.exclusive,
req.auto_delete,
);
(StatusCode::OK, Json(serde_json::json!({ "declared": req.name }))).into_response()
}
#[cfg(feature = "amqp")]
pub(crate) async fn publish_amqp_message(
State(state): State<ManagementState>,
Json(req): Json<PublishRequest>,
) -> impl IntoResponse {
let Some(broker) = &state.amqp_broker else {
return (StatusCode::SERVICE_UNAVAILABLE, "AMQP broker not available").into_response();
};
let message = build_message(req.routing_key.clone(), req.payload.clone());
let targets = {
let exchanges = broker.exchanges.read().await;
if req.exchange.is_empty() {
vec![req.routing_key.clone()]
} else if let Some(exchange) = exchanges.get_exchange(&req.exchange) {
exchange.route_message(&message, &req.routing_key)
} else {
return (StatusCode::NOT_FOUND, format!("exchange '{}' not found", req.exchange))
.into_response();
}
};
let mut delivered = Vec::new();
{
let mut queues = broker.queues.write().await;
for queue_name in targets {
let queued = QueuedMessage::new(message.clone());
match queues.enqueue_and_notify(&queue_name, queued) {
Ok(()) => delivered.push(queue_name),
Err(e) => {
tracing::warn!(
"AMQP admin publish: failed to enqueue to {}: {}",
queue_name,
e
);
}
}
}
}
Json(serde_json::json!({
"success": true,
"exchange": req.exchange,
"routing_key": req.routing_key,
"queued_to": delivered,
}))
.into_response()
}
#[cfg(all(test, feature = "amqp"))]
mod amqp_tests {
use super::amqp::{build_message, exchange_type_str, parse_exchange_type};
use mockforge_amqp::bindings::Binding;
use mockforge_amqp::exchanges::{ExchangeManager, ExchangeType};
use mockforge_amqp::messages::QueuedMessage;
use mockforge_amqp::queues::QueueManager;
#[test]
fn parse_and_render_exchange_types_round_trip() {
for (s, t) in [
("direct", ExchangeType::Direct),
("FANOUT", ExchangeType::Fanout),
("topic", ExchangeType::Topic),
("Headers", ExchangeType::Headers),
] {
assert_eq!(parse_exchange_type(s).expect("known type"), t);
}
assert!(parse_exchange_type("bogus").is_none());
assert_eq!(exchange_type_str(&ExchangeType::Topic), "topic");
assert_eq!(exchange_type_str(&ExchangeType::Fanout), "fanout");
}
#[test]
fn publish_routes_through_direct_exchange_to_bound_queue() {
let mut exchanges = ExchangeManager::new();
exchanges.declare_exchange("orders".into(), ExchangeType::Direct, false, false);
assert!(exchanges.add_binding(
"orders",
Binding::new("orders".into(), "q.orders".into(), "order.created".into()),
));
let msg = build_message("order.created".into(), "{\"id\":1}".into());
let targets =
exchanges.get_exchange("orders").unwrap().route_message(&msg, "order.created");
assert_eq!(targets, vec!["q.orders".to_string()]);
let mut queues = QueueManager::new();
queues.declare_queue("q.orders".into(), false, false, false);
queues.enqueue_and_notify("q.orders", QueuedMessage::new(msg)).expect("enqueue");
assert_eq!(queues.get_queue("q.orders").unwrap().messages.len(), 1);
}
#[test]
fn unknown_exchange_type_is_rejected() {
assert!(parse_exchange_type("").is_none());
assert!(parse_exchange_type("fan-out").is_none());
}
}