#![allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AmqpDeliveryMode {
NonPersistent,
Persistent,
}
#[derive(Debug, Clone)]
pub struct AmqpMessage {
pub exchange: String,
pub routing_key: String,
pub body: Vec<u8>,
pub delivery_mode: AmqpDeliveryMode,
pub content_type: String,
}
#[derive(Debug, Default)]
pub struct AmqpExport {
pub messages: Vec<AmqpMessage>,
pub connection_url: String,
}
pub fn new_amqp_export(connection_url: &str) -> AmqpExport {
AmqpExport {
messages: Vec::new(),
connection_url: connection_url.to_owned(),
}
}
pub fn publish_amqp_text(
export: &mut AmqpExport,
exchange: &str,
routing_key: &str,
body: &str,
persistent: bool,
) {
export.messages.push(AmqpMessage {
exchange: exchange.to_owned(),
routing_key: routing_key.to_owned(),
body: body.as_bytes().to_vec(),
delivery_mode: if persistent {
AmqpDeliveryMode::Persistent
} else {
AmqpDeliveryMode::NonPersistent
},
content_type: "text/plain".to_owned(),
});
}
pub fn publish_amqp_json(export: &mut AmqpExport, exchange: &str, routing_key: &str, json: &str) {
export.messages.push(AmqpMessage {
exchange: exchange.to_owned(),
routing_key: routing_key.to_owned(),
body: json.as_bytes().to_vec(),
delivery_mode: AmqpDeliveryMode::Persistent,
content_type: "application/json".to_owned(),
});
}
pub fn amqp_message_count(export: &AmqpExport) -> usize {
export.messages.len()
}
pub fn persistent_message_count(export: &AmqpExport) -> usize {
export
.messages
.iter()
.filter(|m| m.delivery_mode == AmqpDeliveryMode::Persistent)
.count()
}
pub fn find_by_routing_key<'a>(export: &'a AmqpExport, key: &str) -> Option<&'a AmqpMessage> {
export.messages.iter().find(|m| m.routing_key == key)
}
pub fn total_amqp_bytes(export: &AmqpExport) -> usize {
export.messages.iter().map(|m| m.body.len()).sum()
}
pub fn amqp_export_to_json(export: &AmqpExport) -> String {
format!(
r#"{{"url":"{}", "message_count":{}, "total_bytes":{}}}"#,
export.connection_url,
amqp_message_count(export),
total_amqp_bytes(export)
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn new_export_empty() {
let e = new_amqp_export("amqp://localhost");
assert_eq!(amqp_message_count(&e), 0);
}
#[test]
fn publish_text_increments_count() {
let mut e = new_amqp_export("amqp://localhost");
publish_amqp_text(&mut e, "ex", "rk", "body", false);
assert_eq!(amqp_message_count(&e), 1);
}
#[test]
fn persistent_count_correct() {
let mut e = new_amqp_export("amqp://localhost");
publish_amqp_text(&mut e, "ex", "rk1", "a", true);
publish_amqp_text(&mut e, "ex", "rk2", "b", false);
assert_eq!(persistent_message_count(&e), 1);
}
#[test]
fn find_by_routing_key_success() {
let mut e = new_amqp_export("amqp://localhost");
publish_amqp_text(&mut e, "ex", "mesh.pos", "data", false);
assert!(find_by_routing_key(&e, "mesh.pos").is_some());
}
#[test]
fn find_missing_key_returns_none() {
let e = new_amqp_export("amqp://localhost");
assert!(find_by_routing_key(&e, "gone").is_none());
}
#[test]
fn total_bytes_correct() {
let mut e = new_amqp_export("amqp://localhost");
publish_amqp_text(&mut e, "ex", "rk", "test", false);
assert_eq!(total_amqp_bytes(&e), 4);
}
#[test]
fn json_content_type_set_for_json_publish() {
let mut e = new_amqp_export("amqp://localhost");
publish_amqp_json(&mut e, "ex", "rk.json", r#"{"a":1}"#);
assert_eq!(e.messages[0].content_type, "application/json");
}
#[test]
fn json_export_contains_url() {
let e = new_amqp_export("amqp://broker.example.com");
assert!(amqp_export_to_json(&e).contains("broker.example.com"));
}
#[test]
fn multiple_messages_counted() {
let mut e = new_amqp_export("amqp://localhost");
for i in 0..3 {
publish_amqp_text(&mut e, "ex", &format!("rk.{i}"), "x", false);
}
assert_eq!(amqp_message_count(&e), 3);
}
}