use anyhow::{Context, Result};
use std::path::PathBuf;
use thiserror::Error;
use tokio::fs;
use crate::models::{Message, MessageId, QueuedMessage, Report};
#[allow(dead_code)]
#[derive(Debug, Error)]
pub enum QueueError {
#[error("I/O error: {operation} at {path}: {source}")]
Io {
operation: &'static str,
path: String,
#[source]
source: std::io::Error,
},
#[error("YAML parsing error for {file_type} at {path}: {details}")]
YamlParsing {
file_type: &'static str,
path: String,
details: String,
},
#[error("YAML serialization error for {file_type}: {details}")]
YamlSerialization {
file_type: &'static str,
details: String,
},
#[error("Message validation error: {field}: {reason}")]
Validation { field: &'static str, reason: String },
#[error("Message not found: {message_id}")]
MessageNotFound { message_id: String },
#[error("Queue directory error: {path}: {reason}")]
QueueDirectory { path: String, reason: String },
#[error("Atomic write failed for {path}: {reason}")]
AtomicWrite { path: String, reason: String },
}
#[allow(dead_code)]
impl QueueError {
pub fn io(operation: &'static str, path: impl Into<String>, source: std::io::Error) -> Self {
Self::Io {
operation,
path: path.into(),
source,
}
}
pub fn yaml_parsing(
file_type: &'static str,
path: impl Into<String>,
details: impl Into<String>,
) -> Self {
Self::YamlParsing {
file_type,
path: path.into(),
details: details.into(),
}
}
pub fn validation(field: &'static str, reason: impl Into<String>) -> Self {
Self::Validation {
field,
reason: reason.into(),
}
}
}
#[allow(dead_code)]
pub type QueueResult<T> = std::result::Result<T, QueueError>;
pub struct QueueManager {
base_path: PathBuf,
}
impl QueueManager {
pub fn new(queue_path: PathBuf) -> Self {
Self {
base_path: queue_path,
}
}
fn reports_path(&self) -> PathBuf {
self.base_path.join("reports")
}
fn messages_path(&self) -> PathBuf {
self.base_path.join("messages")
}
fn queue_path(&self) -> PathBuf {
self.messages_path().join("queue")
}
fn outbox_path(&self) -> PathBuf {
self.messages_path().join("outbox")
}
fn status_path(&self) -> PathBuf {
self.base_path.join("status")
}
#[allow(dead_code)]
fn report_file(&self, expert_id: u32) -> PathBuf {
self.reports_path()
.join(format!("expert{expert_id}_report.yaml"))
}
fn message_file(&self, message_id: &str) -> PathBuf {
self.queue_path().join(format!("{message_id}.yaml"))
}
pub async fn init(&self) -> Result<()> {
fs::create_dir_all(self.reports_path()).await?;
fs::create_dir_all(self.status_path()).await?;
self.init_message_queue().await?;
Ok(())
}
pub async fn init_message_queue(&self) -> Result<()> {
fs::create_dir_all(self.queue_path()).await?;
fs::create_dir_all(self.outbox_path()).await?;
Ok(())
}
#[allow(dead_code)]
pub async fn write_report(&self, report: &Report) -> Result<()> {
let path = self.report_file(report.expert_id);
let content = serde_yaml::to_string(report)?;
fs::write(&path, content)
.await
.context("Failed to write report file")?;
Ok(())
}
#[allow(dead_code)]
pub async fn read_report(&self, expert_id: u32) -> Result<Option<Report>> {
let path = self.report_file(expert_id);
if !path.exists() {
return Ok(None);
}
let content = fs::read_to_string(&path)
.await
.context("Failed to read report file")?;
let report: Report = serde_yaml::from_str(&content)?;
Ok(Some(report))
}
#[allow(dead_code)]
pub async fn clear_report(&self, expert_id: u32) -> Result<()> {
let path = self.report_file(expert_id);
if path.exists() {
fs::remove_file(&path).await?;
}
Ok(())
}
pub async fn list_reports(&self) -> Result<Vec<Report>> {
let mut reports = Vec::new();
let reports_path = self.reports_path();
if !reports_path.exists() {
return Ok(reports);
}
let mut entries = fs::read_dir(&reports_path).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().is_some_and(|e| e == "yaml") {
match fs::read_to_string(&path).await {
Ok(content) => match serde_yaml::from_str::<Report>(&content) {
Ok(report) => {
if let Err(validation_errors) = report.validate() {
tracing::warn!(
"Report {} has validation warnings: {:?}",
path.display(),
validation_errors
);
}
reports.push(report);
}
Err(e) => {
tracing::error!(
"Failed to parse report file {}: {}",
path.display(),
e
);
}
},
Err(e) => {
tracing::error!("Failed to read report file {}: {}", path.display(), e);
}
}
}
}
reports.sort_by_key(|r| r.started_at);
Ok(reports)
}
pub async fn enqueue(&self, message: &Message) -> Result<()> {
let queued_message = QueuedMessage::new(message.clone());
let path = self.message_file(&message.message_id);
let yaml = serde_yaml::to_string(&queued_message)
.context("Failed to serialize message to YAML")?;
let temp_path = path.with_extension("yaml.tmp");
fs::write(&temp_path, yaml)
.await
.context("Failed to write message to temp file")?;
fs::rename(&temp_path, &path)
.await
.context("Failed to atomically move message file")?;
tracing::debug!("Enqueued message {} to queue", message.message_id);
Ok(())
}
pub async fn read_queue(&self) -> Result<Vec<QueuedMessage>> {
let mut messages = Vec::new();
let queue = self.queue_path();
if !queue.exists() {
return Ok(messages);
}
let mut entries = fs::read_dir(&queue).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().is_some_and(|e| e == "yaml") {
match fs::read_to_string(&path).await {
Ok(content) => match serde_yaml::from_str::<QueuedMessage>(&content) {
Ok(mut queued_msg) => {
if queued_msg.message.is_expired() {
queued_msg.mark_expired();
}
messages.push(queued_msg);
}
Err(e) => {
if let Some(raw_id) = Self::detect_misplaced_message(&content) {
tracing::error!(
"Misplaced raw Message detected in queue directory \
(id: {}, file: {}). This file appears to be a plain \
Message written directly to the queue instead of the \
outbox. Write messages to .macot/messages/outbox/ and \
let the control tower process them.",
raw_id,
path.display()
);
} else {
tracing::error!(
"Failed to parse message file {}: {}",
path.display(),
e
);
}
}
},
Err(e) => {
tracing::error!("Failed to read message file {}: {}", path.display(), e);
}
}
}
}
messages.sort_by(|a, b| {
b.message
.priority
.cmp(&a.message.priority)
.then_with(|| a.message.created_at.cmp(&b.message.created_at))
});
Ok(messages)
}
pub async fn dequeue(&self, message_id: &str) -> Result<()> {
let path = self.message_file(message_id);
if path.exists() {
fs::remove_file(&path)
.await
.context("Failed to remove message file")?;
tracing::debug!("Dequeued message {} from queue", message_id);
}
Ok(())
}
#[allow(dead_code)]
pub async fn queue_len(&self) -> Result<usize> {
Ok(self.read_queue().await?.len())
}
#[allow(dead_code)]
pub async fn update_delivery_attempts(&self, message_id: &str, attempts: u32) -> Result<()> {
let path = self.message_file(message_id);
if !path.exists() {
return Ok(());
}
let content = fs::read_to_string(&path)
.await
.context("Failed to read message file for update")?;
let mut queued_message: QueuedMessage =
serde_yaml::from_str(&content).context("Failed to parse message file for update")?;
queued_message.attempts = attempts;
queued_message.message.delivery_attempts = attempts;
let yaml = serde_yaml::to_string(&queued_message)
.context("Failed to serialize updated message")?;
let temp_path = path.with_extension("yaml.tmp");
fs::write(&temp_path, yaml)
.await
.context("Failed to write updated message to temp file")?;
fs::rename(&temp_path, &path)
.await
.context("Failed to atomically update message file")?;
tracing::debug!(
"Updated delivery attempts for message {} to {}",
message_id,
attempts
);
Ok(())
}
pub async fn update_message_status(
&self,
message_id: &str,
queued_message: &QueuedMessage,
) -> Result<()> {
let path = self.message_file(message_id);
if !path.exists() {
return Ok(());
}
let yaml = serde_yaml::to_string(queued_message)
.context("Failed to serialize message status update")?;
let temp_path = path.with_extension("yaml.tmp");
fs::write(&temp_path, yaml)
.await
.context("Failed to write message status to temp file")?;
fs::rename(&temp_path, &path)
.await
.context("Failed to atomically update message status")?;
tracing::debug!("Updated status for message {}", message_id);
Ok(())
}
pub async fn cleanup_expired_messages(&self) -> Result<Vec<MessageId>> {
let messages = self.read_queue().await?;
let mut removed_messages = Vec::new();
for queued_msg in messages {
let should_remove = if queued_msg.message.is_expired() {
tracing::info!(
"Removing expired message: {}",
queued_msg.message.message_id
);
true
} else if queued_msg.message.has_exceeded_max_attempts() {
tracing::warn!(
"Removing message {} after {} delivery attempts",
queued_msg.message.message_id,
queued_msg.message.delivery_attempts
);
true
} else {
false
};
if should_remove {
self.dequeue(&queued_msg.message.message_id).await?;
removed_messages.push(queued_msg.message.message_id);
}
}
if !removed_messages.is_empty() {
tracing::info!(
"Cleaned up {} expired/failed messages",
removed_messages.len()
);
}
Ok(removed_messages)
}
fn detect_misplaced_message(content: &str) -> Option<String> {
if serde_yaml::from_str::<QueuedMessage>(content).is_ok() {
return None;
}
match serde_yaml::from_str::<Message>(content) {
Ok(msg) if !msg.message_id.is_empty() => Some(msg.message_id),
_ => None,
}
}
pub async fn get_pending_messages(&self) -> Result<Vec<QueuedMessage>> {
let messages = self.read_queue().await?;
Ok(messages
.into_iter()
.filter(|msg| msg.should_retry())
.collect())
}
pub async fn process_outbox(&self) -> Result<Vec<MessageId>> {
let mut processed_messages = Vec::new();
let outbox = self.outbox_path();
if !outbox.exists() {
return Ok(processed_messages);
}
let mut entries = fs::read_dir(&outbox).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().is_some_and(|e| e == "yaml") {
match self.process_outbox_file(&path).await {
Ok(message_id) => {
processed_messages.push(message_id);
if let Err(e) = fs::remove_file(&path).await {
tracing::warn!(
"Failed to remove processed outbox file {}: {}",
path.display(),
e
);
}
}
Err(e) => {
tracing::error!("Failed to process outbox file {}: {}", path.display(), e);
}
}
}
}
if !processed_messages.is_empty() {
tracing::info!(
"Processed {} messages from outbox",
processed_messages.len()
);
}
Ok(processed_messages)
}
async fn process_outbox_file(&self, file_path: &std::path::Path) -> Result<MessageId> {
let content = fs::read_to_string(file_path)
.await
.context("Failed to read outbox file")?;
let message: Message =
serde_yaml::from_str(&content).context("Failed to parse message YAML from outbox")?;
self.validate_message(&message)?;
self.enqueue(&message).await?;
tracing::debug!("Processed outbox message: {}", message.message_id);
Ok(message.message_id)
}
fn validate_message(&self, message: &Message) -> Result<()> {
if message.message_id.is_empty() {
return Err(anyhow::anyhow!("Message ID is required"));
}
if message.content.subject.is_empty() {
return Err(anyhow::anyhow!("Message subject is required"));
}
if message.content.body.is_empty() {
return Err(anyhow::anyhow!("Message body is required"));
}
Ok(())
}
#[allow(dead_code)]
pub async fn cleanup(&self) -> Result<()> {
if self.reports_path().exists() {
fs::remove_dir_all(self.reports_path()).await?;
}
if self.messages_path().exists() {
fs::remove_dir_all(self.messages_path()).await?;
}
self.init().await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::models::TaskStatus;
use tempfile::TempDir;
async fn create_test_manager() -> (QueueManager, TempDir) {
let temp_dir = TempDir::new().unwrap();
let manager = QueueManager::new(temp_dir.path().to_path_buf());
manager.init().await.unwrap();
(manager, temp_dir)
}
#[tokio::test]
async fn queue_manager_init_creates_directories() {
let (manager, _temp) = create_test_manager().await;
assert!(manager.reports_path().exists());
assert!(manager.status_path().exists());
}
#[tokio::test]
async fn queue_manager_write_and_read_report() {
let (manager, _temp) = create_test_manager().await;
let report = Report::new("task-001".to_string(), 0, "architect".to_string())
.complete("Done".to_string());
manager.write_report(&report).await.unwrap();
let loaded = manager.read_report(0).await.unwrap();
assert!(loaded.is_some());
let loaded = loaded.unwrap();
assert_eq!(loaded.task_id, "task-001");
assert_eq!(loaded.status, TaskStatus::Done);
}
#[tokio::test]
async fn queue_manager_list_reports_returns_all() {
let (manager, _temp) = create_test_manager().await;
let report1 = Report::new("task-001".to_string(), 0, "architect".to_string());
let report2 = Report::new("task-002".to_string(), 1, "frontend".to_string());
manager.write_report(&report1).await.unwrap();
manager.write_report(&report2).await.unwrap();
let reports = manager.list_reports().await.unwrap();
assert_eq!(reports.len(), 2);
}
#[tokio::test]
async fn queue_manager_cleanup_removes_all() {
let (manager, _temp) = create_test_manager().await;
let report = Report::new("task-001".to_string(), 0, "architect".to_string());
manager.write_report(&report).await.unwrap();
manager.cleanup().await.unwrap();
let reports = manager.list_reports().await.unwrap();
assert!(reports.is_empty());
}
use crate::models::{MessageContent, MessagePriority, MessageRecipient, MessageType};
fn create_test_message() -> Message {
let content = MessageContent {
subject: "Test Subject".to_string(),
body: "Test Body".to_string(),
};
let recipient = MessageRecipient::expert_id(1);
Message::new(0, recipient, MessageType::Query, content)
}
#[tokio::test]
async fn queue_manager_init_creates_message_directories() {
let (manager, _temp) = create_test_manager().await;
assert!(manager.messages_path().exists());
assert!(manager.queue_path().exists());
assert!(manager.outbox_path().exists());
}
#[tokio::test]
async fn queue_manager_enqueue_and_read_message() {
let (manager, _temp) = create_test_manager().await;
let message = create_test_message();
manager.enqueue(&message).await.unwrap();
let messages = manager.read_queue().await.unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].message.message_id, message.message_id);
assert_eq!(messages[0].message.content.subject, "Test Subject");
}
#[tokio::test]
async fn queue_manager_dequeue_removes_message() {
let (manager, _temp) = create_test_manager().await;
let message = create_test_message();
manager.enqueue(&message).await.unwrap();
assert_eq!(manager.queue_len().await.unwrap(), 1);
manager.dequeue(&message.message_id).await.unwrap();
assert_eq!(manager.queue_len().await.unwrap(), 0);
}
#[tokio::test]
async fn queue_manager_priority_ordering() {
let (manager, _temp) = create_test_manager().await;
let content1 = MessageContent {
subject: "Low Priority".to_string(),
body: "Low priority message".to_string(),
};
let low_msg = Message::new(
0,
MessageRecipient::expert_id(1),
MessageType::Query,
content1,
)
.with_priority(MessagePriority::Low);
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
let content2 = MessageContent {
subject: "High Priority".to_string(),
body: "High priority message".to_string(),
};
let high_msg = Message::new(
0,
MessageRecipient::expert_id(2),
MessageType::Query,
content2,
)
.with_priority(MessagePriority::High);
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
let content3 = MessageContent {
subject: "Normal Priority".to_string(),
body: "Normal priority message".to_string(),
};
let normal_msg = Message::new(
0,
MessageRecipient::expert_id(3),
MessageType::Query,
content3,
)
.with_priority(MessagePriority::Normal);
manager.enqueue(&low_msg).await.unwrap();
manager.enqueue(&high_msg).await.unwrap();
manager.enqueue(&normal_msg).await.unwrap();
let messages = manager.read_queue().await.unwrap();
assert_eq!(messages.len(), 3);
assert_eq!(messages[0].message.priority, MessagePriority::High);
assert_eq!(messages[1].message.priority, MessagePriority::Normal);
assert_eq!(messages[2].message.priority, MessagePriority::Low);
}
#[tokio::test]
async fn queue_manager_update_delivery_attempts() {
let (manager, _temp) = create_test_manager().await;
let message = create_test_message();
manager.enqueue(&message).await.unwrap();
manager
.update_delivery_attempts(&message.message_id, 5)
.await
.unwrap();
let messages = manager.read_queue().await.unwrap();
assert_eq!(messages[0].attempts, 5);
assert_eq!(messages[0].message.delivery_attempts, 5);
}
#[tokio::test]
async fn queue_manager_cleanup_expired_messages() {
let (manager, _temp) = create_test_manager().await;
let expired_content = MessageContent {
subject: "Expired Message".to_string(),
body: "This message will expire".to_string(),
};
let expired_msg = Message::new(
0,
MessageRecipient::expert_id(1),
MessageType::Query,
expired_content,
)
.with_ttl_seconds(0);
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
let normal_content = MessageContent {
subject: "Normal Message".to_string(),
body: "This message is normal".to_string(),
};
let normal_msg = Message::new(
0,
MessageRecipient::expert_id(2),
MessageType::Query,
normal_content,
);
manager.enqueue(&expired_msg).await.unwrap();
manager.enqueue(&normal_msg).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let removed = manager.cleanup_expired_messages().await.unwrap();
assert_eq!(removed.len(), 1);
assert_eq!(removed[0], expired_msg.message_id);
let remaining = manager.read_queue().await.unwrap();
assert_eq!(remaining.len(), 1);
assert_eq!(remaining[0].message.message_id, normal_msg.message_id);
}
#[tokio::test]
async fn queue_manager_get_pending_messages_filters_correctly() {
let (manager, _temp) = create_test_manager().await;
let pending_content = MessageContent {
subject: "Pending Message".to_string(),
body: "This message is pending".to_string(),
};
let pending_msg = Message::new(
0,
MessageRecipient::expert_id(1),
MessageType::Query,
pending_content,
);
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
let expired_content = MessageContent {
subject: "Expired Message".to_string(),
body: "This message will expire".to_string(),
};
let expired_msg = Message::new(
0,
MessageRecipient::expert_id(2),
MessageType::Query,
expired_content,
)
.with_ttl_seconds(0);
manager.enqueue(&pending_msg).await.unwrap();
manager.enqueue(&expired_msg).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let pending = manager.get_pending_messages().await.unwrap();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].message.message_id, pending_msg.message_id);
}
#[tokio::test]
async fn queue_manager_process_outbox_valid_message() {
let (manager, _temp) = create_test_manager().await;
let message = create_test_message();
let outbox_path = manager.outbox_path();
let message_file = outbox_path.join(format!("{}.yaml", message.message_id));
let yaml_content = serde_yaml::to_string(&message).unwrap();
fs::write(&message_file, yaml_content).await.unwrap();
let processed = manager.process_outbox().await.unwrap();
assert_eq!(processed.len(), 1);
assert_eq!(processed[0], message.message_id);
let messages = manager.read_queue().await.unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].message.message_id, message.message_id);
assert!(!message_file.exists());
}
#[test]
fn detect_misplaced_message_identifies_raw_message() {
let content = MessageContent {
subject: "Test".to_string(),
body: "Body".to_string(),
};
let msg = Message::new(
0,
MessageRecipient::expert_id(1),
MessageType::Query,
content,
);
let yaml = serde_yaml::to_string(&msg).unwrap();
let result = QueueManager::detect_misplaced_message(&yaml);
assert!(
result.is_some(),
"detect_misplaced_message: should identify a raw Message YAML"
);
assert_eq!(
result.unwrap(),
msg.message_id,
"detect_misplaced_message: should return the message_id"
);
}
#[test]
fn detect_misplaced_message_returns_none_for_queued_message() {
let content = MessageContent {
subject: "Test".to_string(),
body: "Body".to_string(),
};
let msg = Message::new(
0,
MessageRecipient::expert_id(1),
MessageType::Query,
content,
);
let queued = QueuedMessage::new(msg);
let yaml = serde_yaml::to_string(&queued).unwrap();
let result = QueueManager::detect_misplaced_message(&yaml);
assert!(
result.is_none(),
"detect_misplaced_message: should return None for valid QueuedMessage"
);
}
#[test]
fn detect_misplaced_message_returns_none_for_garbage() {
let result = QueueManager::detect_misplaced_message("not valid yaml at all {{{");
assert!(
result.is_none(),
"detect_misplaced_message: should return None for unparseable content"
);
}
#[test]
fn detect_misplaced_message_returns_none_for_empty_id() {
let yaml = r#"
message_id: ""
from_expert_id: 0
to:
expert_id: 1
message_type: query
priority: normal
created_at: "2024-01-15T10:30:00Z"
content:
subject: "Test"
body: "Body"
"#;
let result = QueueManager::detect_misplaced_message(yaml);
assert!(
result.is_none(),
"detect_misplaced_message: should return None when message_id is empty"
);
}
#[tokio::test]
async fn read_queue_skips_raw_message_in_queue_directory() {
let (manager, _temp) = create_test_manager().await;
let content = MessageContent {
subject: "Misplaced".to_string(),
body: "This was written directly to queue".to_string(),
};
let raw_msg = Message::new(
3,
MessageRecipient::expert_id(2),
MessageType::Notify,
content,
);
let yaml = serde_yaml::to_string(&raw_msg).unwrap();
let bad_file = manager
.queue_path()
.join("msg_expert3_to_expert2_20260220.yaml");
fs::write(&bad_file, &yaml).await.unwrap();
let valid_msg = create_test_message();
manager.enqueue(&valid_msg).await.unwrap();
let messages = manager.read_queue().await.unwrap();
assert_eq!(
messages.len(),
1,
"read_queue: should skip raw Message files in queue directory"
);
assert_eq!(messages[0].message.message_id, valid_msg.message_id);
}
#[tokio::test]
async fn queue_manager_process_outbox_invalid_message() {
let (manager, _temp) = create_test_manager().await;
let outbox_path = manager.outbox_path();
let invalid_file = outbox_path.join("invalid.yaml");
fs::write(&invalid_file, "invalid: yaml: content: [")
.await
.unwrap();
let processed = manager.process_outbox().await.unwrap();
assert_eq!(processed.len(), 0);
let messages = manager.read_queue().await.unwrap();
assert_eq!(messages.len(), 0);
assert!(invalid_file.exists());
}
}
#[cfg(test)]
mod property_tests {
use super::*;
use crate::models::{MessageContent, MessagePriority, MessageRecipient, MessageType};
use proptest::prelude::*;
fn arbitrary_message_content() -> impl Strategy<Value = MessageContent> {
("[a-zA-Z0-9 ]{1,100}", "[a-zA-Z0-9 \n]{1,1000}")
.prop_map(|(subject, body)| MessageContent { subject, body })
}
fn arbitrary_message_recipient() -> impl Strategy<Value = MessageRecipient> {
prop_oneof![
(0u32..100).prop_map(MessageRecipient::expert_id),
"[a-zA-Z0-9-]{1,50}".prop_map(MessageRecipient::role),
]
}
fn arbitrary_message_type() -> impl Strategy<Value = MessageType> {
prop_oneof![
Just(MessageType::Query),
Just(MessageType::Response),
Just(MessageType::Notify),
Just(MessageType::Delegate),
]
}
fn arbitrary_message_priority() -> impl Strategy<Value = MessagePriority> {
prop_oneof![
Just(MessagePriority::Low),
Just(MessagePriority::Normal),
Just(MessagePriority::High),
]
}
fn arbitrary_valid_message() -> impl Strategy<Value = Message> {
(
0u32..100,
arbitrary_message_recipient(),
arbitrary_message_type(),
arbitrary_message_content(),
arbitrary_message_priority(),
1u64..86400,
)
.prop_map(
|(from_expert_id, to, message_type, content, priority, ttl_seconds)| {
Message::new(from_expert_id, to, message_type, content)
.with_priority(priority)
.with_ttl_seconds(ttl_seconds)
},
)
}
async fn create_test_manager_for_props() -> (QueueManager, tempfile::TempDir) {
let temp_dir = tempfile::TempDir::new().unwrap();
let manager = QueueManager::new(temp_dir.path().to_path_buf());
manager.init().await.unwrap();
(manager, temp_dir)
}
proptest! {
#[test]
fn message_validation_consistency(
message in arbitrary_valid_message()
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
let result = manager.enqueue(&message).await;
assert!(result.is_ok(), "Valid message should be accepted: {:?}", result);
let messages = manager.read_queue().await.unwrap();
assert!(!messages.is_empty(), "Enqueued message should be in queue");
let found_message = messages.iter()
.find(|m| m.message.message_id == message.message_id);
assert!(found_message.is_some(), "Enqueued message should be findable in queue");
let found = found_message.unwrap();
assert_eq!(found.message.content.subject, message.content.subject);
assert_eq!(found.message.content.body, message.content.body);
assert_eq!(found.message.priority, message.priority);
assert_eq!(found.message.message_type, message.message_type);
});
}
#[test]
fn message_queue_accepts_valid_outbox_messages(
message in arbitrary_valid_message()
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
let outbox_path = manager.outbox_path();
let message_file = outbox_path.join(format!("{}.yaml", message.message_id));
let yaml_content = serde_yaml::to_string(&message).unwrap();
fs::write(&message_file, yaml_content).await.unwrap();
let processed = manager.process_outbox().await.unwrap();
assert_eq!(processed.len(), 1, "One message should be processed from outbox");
assert_eq!(processed[0], message.message_id, "Processed message ID should match");
let queued_messages = manager.read_queue().await.unwrap();
assert_eq!(queued_messages.len(), 1, "One message should be in queue");
let queued = &queued_messages[0];
assert_eq!(queued.message.message_id, message.message_id, "Message ID should be preserved");
assert!(!queued.message.message_id.is_empty(), "Message should have unique ID");
assert!(queued.message.created_at <= chrono::Utc::now(), "Message should have valid timestamp");
assert_eq!(queued.message.content.subject, message.content.subject);
assert_eq!(queued.message.content.body, message.content.body);
assert_eq!(queued.message.priority, message.priority);
assert_eq!(queued.message.message_type, message.message_type);
assert_eq!(queued.message.to, message.to);
assert_eq!(queued.message.from_expert_id, message.from_expert_id);
assert!(!message_file.exists(), "Outbox file should be removed after processing");
});
}
#[test]
fn message_queue_operations_are_consistent(
messages in prop::collection::vec(arbitrary_valid_message(), 1..10)
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
let mut unique_messages = Vec::new();
for (i, mut message) in messages.into_iter().enumerate() {
message.message_id = format!("{}-{}", message.message_id, i);
unique_messages.push(message);
if i < 9 {
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
}
}
for message in &unique_messages {
manager.enqueue(message).await.unwrap();
}
let queue_len = manager.queue_len().await.unwrap();
assert_eq!(queue_len, unique_messages.len());
let queued_messages = manager.read_queue().await.unwrap();
assert_eq!(queued_messages.len(), unique_messages.len());
for original in &unique_messages {
let found = queued_messages.iter()
.find(|q| q.message.message_id == original.message_id);
assert!(found.is_some(), "Message {} should be in queue", original.message_id);
}
});
}
#[test]
fn message_dequeue_removes_correctly(
message in arbitrary_valid_message()
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
manager.enqueue(&message).await.unwrap();
assert_eq!(manager.queue_len().await.unwrap(), 1);
manager.dequeue(&message.message_id).await.unwrap();
assert_eq!(manager.queue_len().await.unwrap(), 0);
let messages = manager.read_queue().await.unwrap();
assert!(messages.is_empty());
});
}
#[test]
fn message_priority_ordering_is_consistent(
messages in prop::collection::vec(arbitrary_valid_message(), 2..5)
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
let mut unique_messages = Vec::new();
for (i, mut message) in messages.into_iter().enumerate() {
message.message_id = format!("{}-{}", message.message_id, i);
unique_messages.push(message);
if i < 4 {
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
}
}
for message in &unique_messages {
manager.enqueue(message).await.unwrap();
}
let queued_messages = manager.read_queue().await.unwrap();
assert_eq!(queued_messages.len(), unique_messages.len());
for i in 1..queued_messages.len() {
let prev = &queued_messages[i-1];
let curr = &queued_messages[i];
assert!(
prev.message.priority > curr.message.priority ||
(prev.message.priority == curr.message.priority &&
prev.message.created_at <= curr.message.created_at),
"Messages should be ordered by priority then timestamp. Prev: {:?} {:?}, Curr: {:?} {:?}",
prev.message.priority, prev.message.created_at,
curr.message.priority, curr.message.created_at
);
}
});
}
#[test]
fn message_lifecycle_ttl_expiration(
message in arbitrary_valid_message()
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
let mut expired_message = message.clone();
expired_message.message_id = format!("{}-expired", expired_message.message_id);
expired_message.expires_at = Some(chrono::Utc::now() - chrono::Duration::seconds(1));
manager.enqueue(&expired_message).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
let mut valid_message = message;
valid_message.message_id = format!("{}-valid", valid_message.message_id);
valid_message.expires_at = Some(chrono::Utc::now() + chrono::Duration::hours(24));
manager.enqueue(&valid_message).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
let removed = manager.cleanup_expired_messages().await.unwrap();
assert!(
removed.contains(&expired_message.message_id),
"Expired message should be removed during cleanup"
);
assert!(
!removed.contains(&valid_message.message_id),
"Valid message should not be removed during cleanup"
);
let remaining = manager.read_queue().await.unwrap();
assert_eq!(remaining.len(), 1, "Only valid message should remain");
assert_eq!(
remaining[0].message.message_id,
valid_message.message_id,
"Remaining message should be the valid one"
);
});
}
#[test]
fn message_lifecycle_max_attempts_cleanup(
message in arbitrary_valid_message()
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
let mut max_attempts_message = message.clone();
max_attempts_message.message_id = format!("{}-maxattempts", max_attempts_message.message_id);
max_attempts_message.delivery_attempts = crate::models::MAX_DELIVERY_ATTEMPTS;
let mut valid_message = message;
valid_message.message_id = format!("{}-valid", valid_message.message_id);
valid_message.delivery_attempts = 0;
manager.enqueue(&max_attempts_message).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
manager.enqueue(&valid_message).await.unwrap();
let removed = manager.cleanup_expired_messages().await.unwrap();
assert!(
removed.contains(&max_attempts_message.message_id),
"Message with max delivery attempts should be removed"
);
assert!(
!removed.contains(&valid_message.message_id),
"Valid message should not be removed"
);
let remaining = manager.read_queue().await.unwrap();
assert_eq!(remaining.len(), 1, "Only valid message should remain");
assert_eq!(
remaining[0].message.message_id,
valid_message.message_id,
"Remaining message should be the valid one"
);
});
}
#[test]
fn message_lifecycle_get_pending_filters_correctly(
messages in prop::collection::vec(arbitrary_valid_message(), 2..5)
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
let mut pending_count = 0;
let mut expired_count = 0;
let mut max_attempts_count = 0;
for (i, mut message) in messages.into_iter().enumerate() {
message.message_id = format!("{}-{}", message.message_id, i);
match i % 3 {
0 => {
message.expires_at = Some(chrono::Utc::now() + chrono::Duration::hours(24));
message.delivery_attempts = 0;
pending_count += 1;
},
1 => {
message.expires_at = Some(chrono::Utc::now() - chrono::Duration::seconds(1));
expired_count += 1;
},
_ => {
message.delivery_attempts = crate::models::MAX_DELIVERY_ATTEMPTS;
max_attempts_count += 1;
}
}
manager.enqueue(&message).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
}
tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
let pending = manager.get_pending_messages().await.unwrap();
assert_eq!(
pending.len(),
pending_count,
"Only pending messages should be returned. Expected {}, got {}. Expired: {}, MaxAttempts: {}",
pending_count, pending.len(), expired_count, max_attempts_count
);
for queued_msg in &pending {
assert!(
!queued_msg.message.is_expired(),
"Pending messages should not be expired"
);
assert!(
!queued_msg.message.has_exceeded_max_attempts(),
"Pending messages should not have exceeded max attempts"
);
assert!(
queued_msg.should_retry(),
"Pending messages should be retryable"
);
}
});
}
#[test]
fn delivery_attempt_tracking_increments_correctly(
message in arbitrary_valid_message(),
attempts in 1u32..10
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
manager.enqueue(&message).await.unwrap();
manager.update_delivery_attempts(&message.message_id, attempts).await.unwrap();
let messages = manager.read_queue().await.unwrap();
assert_eq!(messages.len(), 1);
let queued_msg = &messages[0];
assert_eq!(
queued_msg.attempts, attempts,
"QueuedMessage.attempts should be updated to {}", attempts
);
assert_eq!(
queued_msg.message.delivery_attempts, attempts,
"Message.delivery_attempts should be updated to {}", attempts
);
});
}
#[test]
fn delivery_attempt_tracking_status_update(
message in arbitrary_valid_message()
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
manager.enqueue(&message).await.unwrap();
let mut messages = manager.read_queue().await.unwrap();
assert_eq!(messages.len(), 1);
let mut queued_msg = messages.remove(0);
queued_msg.mark_delivery_attempt();
assert_eq!(queued_msg.attempts, 1);
assert!(queued_msg.is_delivering());
assert!(queued_msg.last_attempt.is_some());
manager.update_message_status(&message.message_id, &queued_msg).await.unwrap();
let updated_messages = manager.read_queue().await.unwrap();
assert_eq!(updated_messages.len(), 1);
let persisted = &updated_messages[0];
assert_eq!(persisted.attempts, 1, "Attempts should be persisted");
assert!(persisted.last_attempt.is_some(), "Last attempt time should be persisted");
});
}
#[test]
fn delivery_attempt_tracking_failure_reason(
message in arbitrary_valid_message(),
failure_reason in "[a-zA-Z0-9 ]{1,100}"
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
manager.enqueue(&message).await.unwrap();
let mut messages = manager.read_queue().await.unwrap();
let mut queued_msg = messages.remove(0);
queued_msg.mark_failed(failure_reason.clone());
assert!(queued_msg.is_failed());
assert_eq!(queued_msg.get_failure_reason(), Some(failure_reason.as_str()));
manager.update_message_status(&message.message_id, &queued_msg).await.unwrap();
let updated_messages = manager.read_queue().await.unwrap();
assert_eq!(updated_messages.len(), 1);
let persisted = &updated_messages[0];
assert!(persisted.is_failed(), "Failed status should be persisted");
assert_eq!(
persisted.get_failure_reason(),
Some(failure_reason.as_str()),
"Failure reason should be persisted"
);
});
}
#[test]
fn delivery_attempt_tracking_retry_logic(
message in arbitrary_valid_message()
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
let mut test_message = message;
test_message.expires_at = Some(chrono::Utc::now() + chrono::Duration::hours(24));
test_message.delivery_attempts = 0;
manager.enqueue(&test_message).await.unwrap();
let messages = manager.read_queue().await.unwrap();
let queued_msg = &messages[0];
assert!(queued_msg.should_retry(), "Fresh message should be retryable");
let mut updated_msg = queued_msg.clone();
updated_msg.mark_delivery_attempt();
updated_msg.mark_failed("Temporary error".to_string());
updated_msg.reset_to_pending();
assert!(updated_msg.is_pending(), "Reset message should be pending");
assert!(updated_msg.should_retry(), "Reset message should be retryable");
manager.update_message_status(&test_message.message_id, &updated_msg).await.unwrap();
let pending = manager.get_pending_messages().await.unwrap();
assert_eq!(pending.len(), 1, "Retryable message should appear in pending");
assert!(pending[0].should_retry(), "Pending message should be retryable");
});
}
#[test]
fn error_logging_invalid_yaml_parsing(
invalid_content in "[^{}]*\\{[^{}]*" ) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
let queue_path = manager.queue_path();
let invalid_file = queue_path.join("invalid-msg.yaml");
fs::write(&invalid_file, &invalid_content).await.unwrap();
let messages = manager.read_queue().await.unwrap();
assert!(
messages.is_empty() || messages.iter().all(|m| !m.message.message_id.contains("invalid")),
"Invalid messages should be isolated and not included in queue"
);
let _ = fs::remove_file(&invalid_file).await;
});
}
#[test]
fn error_logging_message_validation_failures(
from_expert_id in 0u32..100,
recipient in arbitrary_message_recipient()
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
let invalid_content1 = MessageContent {
subject: "".to_string(), body: "Valid body content".to_string(),
};
let message1 = Message::new(from_expert_id, recipient.clone(), MessageType::Query, invalid_content1);
let validation_result = manager.validate_message(&message1);
assert!(
validation_result.is_err(),
"Empty subject should fail validation"
);
let invalid_content2 = MessageContent {
subject: "Valid subject".to_string(),
body: "".to_string(), };
let message2 = Message::new(from_expert_id, recipient.clone(), MessageType::Query, invalid_content2);
let validation_result2 = manager.validate_message(&message2);
assert!(
validation_result2.is_err(),
"Empty body should fail validation"
);
let valid_content = MessageContent {
subject: "Valid subject".to_string(),
body: "Valid body".to_string(),
};
let mut message3 = Message::new(from_expert_id, recipient, MessageType::Query, valid_content);
message3.message_id = "".to_string();
let validation_result3 = manager.validate_message(&message3);
assert!(
validation_result3.is_err(),
"Empty message ID should fail validation"
);
});
}
#[test]
fn error_logging_file_system_isolation(
message in arbitrary_valid_message()
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
manager.enqueue(&message).await.unwrap();
let queue_path = manager.queue_path();
let non_yaml_file = queue_path.join("not-a-yaml.txt");
fs::write(&non_yaml_file, "This is not YAML").await.unwrap();
let dir_in_queue = queue_path.join("subdir");
fs::create_dir_all(&dir_in_queue).await.unwrap();
let messages = manager.read_queue().await.unwrap();
assert_eq!(
messages.len(), 1,
"Only valid YAML message files should be read"
);
assert_eq!(
messages[0].message.message_id, message.message_id,
"The valid message should be returned"
);
let _ = fs::remove_file(&non_yaml_file).await;
let _ = fs::remove_dir(&dir_in_queue).await;
});
}
#[test]
fn error_logging_outbox_error_isolation(
valid_message in arbitrary_valid_message()
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
let outbox_path = manager.outbox_path();
let valid_file = outbox_path.join(format!("{}.yaml", valid_message.message_id));
let valid_yaml = serde_yaml::to_string(&valid_message).unwrap();
fs::write(&valid_file, valid_yaml).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
let invalid_file = outbox_path.join("invalid-outbox.yaml");
fs::write(&invalid_file, "invalid: yaml: [\ncontent").await.unwrap();
let incomplete_file = outbox_path.join("incomplete-outbox.yaml");
fs::write(&incomplete_file, "message_id: incomplete\n").await.unwrap();
let processed = manager.process_outbox().await.unwrap();
assert_eq!(
processed.len(), 1,
"Only valid messages should be processed from outbox"
);
assert_eq!(
processed[0], valid_message.message_id,
"The valid message should be processed"
);
let queued = manager.read_queue().await.unwrap();
assert_eq!(queued.len(), 1);
assert_eq!(queued[0].message.message_id, valid_message.message_id);
assert!(invalid_file.exists(), "Invalid file should not be deleted on error");
assert!(incomplete_file.exists(), "Incomplete file should not be deleted on error");
let _ = fs::remove_file(&invalid_file).await;
let _ = fs::remove_file(&incomplete_file).await;
});
}
#[test]
fn error_logging_delivery_lifecycle_events(
message in arbitrary_valid_message()
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
manager.enqueue(&message).await.unwrap();
let messages = manager.read_queue().await.unwrap();
assert_eq!(messages.len(), 1);
manager.update_delivery_attempts(&message.message_id, 1).await.unwrap();
let mut queued_msg = messages[0].clone();
queued_msg.mark_delivery_attempt();
manager.update_message_status(&message.message_id, &queued_msg).await.unwrap();
manager.dequeue(&message.message_id).await.unwrap();
let remaining = manager.read_queue().await.unwrap();
assert!(remaining.is_empty());
});
}
#[test]
fn error_logging_cleanup_events(
messages in prop::collection::vec(arbitrary_valid_message(), 2..5)
) {
tokio_test::block_on(async {
let (manager, _temp) = create_test_manager_for_props().await;
let mut expired_count = 0;
let mut max_attempts_count = 0;
let mut valid_count = 0;
for (i, mut message) in messages.into_iter().enumerate() {
message.message_id = format!("{}-{}", message.message_id, i);
match i % 3 {
0 => {
message.expires_at = Some(chrono::Utc::now() - chrono::Duration::seconds(1));
expired_count += 1;
},
1 => {
message.delivery_attempts = crate::models::MAX_DELIVERY_ATTEMPTS;
max_attempts_count += 1;
},
_ => {
message.expires_at = Some(chrono::Utc::now() + chrono::Duration::hours(24));
message.delivery_attempts = 0;
valid_count += 1;
}
}
manager.enqueue(&message).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(2)).await;
}
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let removed = manager.cleanup_expired_messages().await.unwrap();
let expected_removed = expired_count + max_attempts_count;
assert_eq!(
removed.len(), expected_removed,
"Should remove {} expired/max-attempts messages, removed {}",
expected_removed, removed.len()
);
let remaining = manager.read_queue().await.unwrap();
assert_eq!(
remaining.len(), valid_count,
"Should have {} valid messages remaining",
valid_count
);
});
}
#[test]
fn error_type_coverage(
path in "[a-zA-Z0-9/]{1,50}",
message_id in "[a-zA-Z0-9-]{1,30}",
_field in "[a-zA-Z]{1,20}",
reason in "[a-zA-Z0-9 ]{1,100}"
) {
let io_error = QueueError::io(
"read",
path.clone(),
std::io::Error::new(std::io::ErrorKind::NotFound, "test error")
);
let io_msg = io_error.to_string();
assert!(io_msg.contains("I/O error"));
assert!(io_msg.contains(&path));
let yaml_error = QueueError::yaml_parsing(
"message",
path.clone(),
"invalid syntax"
);
let yaml_msg = yaml_error.to_string();
assert!(yaml_msg.contains("YAML parsing error"));
assert!(yaml_msg.contains(&path));
let validation_error = QueueError::validation(
"subject",
reason.clone()
);
let validation_msg = validation_error.to_string();
assert!(validation_msg.contains("validation error"));
assert!(validation_msg.contains(&reason));
let not_found = QueueError::MessageNotFound {
message_id: message_id.clone()
};
let not_found_msg = not_found.to_string();
assert!(not_found_msg.contains("not found"));
assert!(not_found_msg.contains(&message_id));
let dir_error = QueueError::QueueDirectory {
path: path.clone(),
reason: reason.clone()
};
let dir_msg = dir_error.to_string();
assert!(dir_msg.contains("Queue directory error"));
assert!(dir_msg.contains(&path));
let atomic_error = QueueError::AtomicWrite {
path: path.clone(),
reason: reason.clone()
};
let atomic_msg = atomic_error.to_string();
assert!(atomic_msg.contains("Atomic write failed"));
assert!(atomic_msg.contains(&path));
}
}
}