// qrusty 0.20.7
//
// A trusty priority queue server built with Rust
// Documentation
// src/message.rs

//! # Message Types
//!
//! This module defines the core data structures used throughout Qrusty:
//! messages and their priorities, per-queue configuration and statistics,
//! and the result types returned by batch ACK/NACK operations.

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fmt;

// Re-export PayloadRef so it can be used alongside Message.
pub use crate::payload_store::PayloadRef;

/// Priority ordering for queues.
///
/// Determines whether higher or lower priority values are processed first.
/// This affects the order in which messages are retrieved from the queue.
///
/// [`Default`] is derived and yields [`PriorityOrdering::MaxFirst`], which is
/// kept as the default to maintain backward compatibility.
///
/// # Examples
///
/// ```rust
/// use qrusty::message::PriorityOrdering;
///
/// // Higher priority values (e.g., 100) processed before lower values (e.g., 10)
/// let max_first = PriorityOrdering::MaxFirst;
///
/// // Lower priority values (e.g., 10) processed before higher values (e.g., 100)
/// let min_first = PriorityOrdering::MinFirst;
///
/// // First-in-first-out ordering, ignoring priority values
/// let fifo = PriorityOrdering::Fifo;
///
/// // `MaxFirst` is the default ordering
/// assert_eq!(PriorityOrdering::default(), max_first);
/// ```
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PriorityOrdering {
    /// Higher priority values are processed first (default behavior)
    ///
    /// In this mode:
    /// - Priority 100 is processed before priority 50
    /// - Priority 50 is processed before priority 10
    /// - This is the traditional "high priority = urgent" model
    ///
    /// Marked `#[default]` so that `PriorityOrdering::default()` returns this
    /// variant, maintaining backward compatibility.
    #[default]
    MaxFirst,

    /// Lower priority values are processed first
    ///
    /// In this mode:
    /// - Priority 10 is processed before priority 50
    /// - Priority 50 is processed before priority 100
    /// - This follows the Unix process priority model where lower = higher priority
    MinFirst,

    /// First-in-first-out ordering, priority values are ignored
    ///
    /// In this mode:
    /// - Messages are processed in arrival order regardless of priority
    /// - The first message added is the first message processed
    /// - Priority values are still stored but don't affect ordering
    Fifo,
}

/// Priority value for a message — either a numeric u64 or a lexicographic string key.
///
/// Numeric priorities use zero-padded decimal for storage key ordering.
/// Text priorities use raw UTF-8 for MinFirst/Fifo and byte-complement hex
/// encoding for MaxFirst, ensuring correct lexicographic sort in both directions.
///
/// Deserialized with `#[serde(untagged)]` so JSON accepts both:
/// - `"priority": 100`   → `Priority::Numeric(100)`
/// - `"priority": "abc"` → `Priority::Text("abc")`
///
/// The same untagged representation applies when serializing: the value is
/// written as a bare number or a bare string, with no variant name around it.
/// Serde tries untagged variants in declaration order, so `Numeric` is
/// attempted before `Text`; keep that order when adding variants.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum Priority {
    /// Numeric priority, carried as an unsigned 64-bit integer.
    Numeric(u64),
    /// Text priority, carried as an owned string key.
    Text(String),
}

impl Default for Priority {
    fn default() -> Self {
        Priority::Numeric(0)
    }
}

impl fmt::Display for Priority {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Priority::Numeric(n) => write!(f, "{}", n),
            Priority::Text(s) => write!(f, "{}", s),
        }
    }
}

impl Priority {
    /// Reports which [`PriorityKind`] this value belongs to: `Numeric` for a
    /// numeric priority, `Text` for a text priority.
    pub fn kind(&self) -> PriorityKind {
        match *self {
            Self::Numeric(_) => PriorityKind::Numeric,
            Self::Text(_) => PriorityKind::Text,
        }
    }
}

/// Distinguishes numeric vs. text priority for queue-level enforcement.
///
/// This is the payload-free counterpart of [`Priority`]: it names the variant
/// without carrying a value. `Numeric` is the derived [`Default`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PriorityKind {
    /// Priorities are numeric (`u64`). This is the default kind.
    #[default]
    Numeric,
    /// Priorities are text (`String`).
    Text,
}

/// Configuration for a queue's behavior.
///
/// Defines how a queue should handle message ordering and other behavioral settings.
/// This configuration is set when a queue is first created and affects all messages
/// in that queue.
///
/// # Deserialization
///
/// - `ordering` carries no serde default, so it must be present in the input.
/// - `allow_duplicates` falls back to `true` when absent.
/// - `priority_kind` falls back to `PriorityKind::default()` (numeric) when absent.
///
/// # Examples
///
/// ```rust
/// use qrusty::message::{QueueConfig, PriorityOrdering};
///
/// // Traditional high-priority-first queue (with defaults)
/// let high_first_config = QueueConfig {
///     ordering: PriorityOrdering::MaxFirst,
///     ..Default::default()
/// };
///
/// // Unix-style low-priority-first queue
/// let low_first_config = QueueConfig {
///     ordering: PriorityOrdering::MinFirst,
///     allow_duplicates: true,
///     ..Default::default()
/// };
///
/// // Queue that rejects duplicate payloads
/// let no_dups_config = QueueConfig {
///     ordering: PriorityOrdering::MaxFirst,
///     allow_duplicates: false,
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueConfig {
    /// How messages should be ordered by priority.
    /// Required when deserializing (no serde default is declared for it).
    pub ordering: PriorityOrdering,
    /// Whether to allow messages with duplicate payloads (default: true).
    /// When the field is missing from serialized input, serde fills it in by
    /// calling `default_allow_duplicates`.
    #[serde(default = "default_allow_duplicates")]
    pub allow_duplicates: bool,
    /// Whether priorities are numeric (u64) or text (string).
    /// Defaults to Numeric for backward compatibility with existing queues:
    /// serialized configs that lack this field deserialize as numeric.
    #[serde(default)]
    pub priority_kind: PriorityKind,
}

/// Serde default for [`QueueConfig::allow_duplicates`].
///
/// Referenced by that field's `#[serde(default = "...")]` attribute, so a
/// serialized config that omits `allow_duplicates` deserializes with
/// duplicates allowed (`true`).
fn default_allow_duplicates() -> bool {
    true
}

impl Default for QueueConfig {
    /// Returns default configuration with `MaxFirst` ordering for backward compatibility.
    fn default() -> Self {
        QueueConfig {
            ordering: PriorityOrdering::MaxFirst,
            allow_duplicates: true,
            priority_kind: PriorityKind::Numeric,
        }
    }
}

/// A message in the priority queue system.
///
/// Messages are the fundamental unit of work in Qrusty. Each message has a priority,
/// payload, and metadata for tracking its lifecycle through the queue system.
///
/// # Key Design Features
///
/// - **Priority**: Numeric (u64) or text (String) for lexicographic ordering
/// - **Persistence**: All fields are serializable for storage
/// - **Locking**: Messages can be locked by consumers with timeouts
/// - **Retry Logic**: Built-in retry counting with configurable limits
/// - **Dead Letter Queue**: Messages exceeding max_retries are moved to DLQ
///
/// # Serialized form
///
/// `payload_ref` and `payload_hash` are optional on the wire: they are left out
/// of the serialized output when `None`, and default to `None` when missing from
/// the input. Every other field is always written and must be present on input.
///
/// # Examples
///
/// ```rust
/// use chrono::Utc;
/// use qrusty::message::{Message, Priority};
///
/// let msg = Message {
///     id: "msg-123".to_string(),
///     queue: "orders".to_string(),
///     priority: Priority::Numeric(100),
///     payload: r#"{"order_id": 456}"#.to_string(),
///     created_at: Utc::now(),
///     locked_until: None,
///     locked_by: None,
///     retry_count: 0,
///     max_retries: 3,
///     payload_ref: None,
///     payload_hash: None,
/// };
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
    /// Unique identifier for this message (UUID v4)
    pub id: String,

    /// Name of the queue this message belongs to
    pub queue: String,

    /// Priority value — numeric (u64) or text (String) for lexicographic ordering
    pub priority: Priority,

    /// Message payload as a JSON string.
    /// Can contain any valid JSON data structure.
    /// Left empty when the content is stored externally (see `payload_ref`).
    pub payload: String,

    /// Timestamp when the message was first created
    pub created_at: DateTime<Utc>,

    /// When this message's lock expires (None if not locked)
    /// Messages are automatically unlocked when this time passes
    pub locked_until: Option<DateTime<Utc>>,

    /// ID of the consumer that currently has this message locked
    /// None if the message is not locked
    pub locked_by: Option<String>,

    /// Number of times this message has been consumed
    /// Incremented each time a consumer pops this message
    pub retry_count: u32,

    /// Maximum number of retries before moving to dead letter queue
    /// Default is 3, configurable per message
    /// (this struct declares no default for the field itself, so the value is
    /// supplied by whatever constructs the message)
    pub max_retries: u32,

    /// Optional reference to an externally-stored payload (PER-0016).
    /// When present, the `payload` field is empty and the actual content
    /// lives in the PayloadStore mmap'd file. Old messages without this
    /// field use the inline `payload` string (backward compatible):
    /// a missing field deserializes to `None`, and `None` is not serialized.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub payload_ref: Option<PayloadRef>,

    /// Cached xxh3_128 hash of the original payload (PER-0014).
    /// Stored alongside externalized payloads so dedup set cleanup
    /// doesn't require reading the payload back from the store.
    /// Held as 16 raw bytes; a missing field deserializes to `None`, and
    /// `None` is not serialized.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub payload_hash: Option<[u8; 16]>,
}

/// Statistics for a specific queue.
///
/// Provides insight into queue health and performance, including message
/// counts broken down by availability status.
///
/// # Examples
///
/// ```rust
/// use qrusty::message::{QueueStats, QueueConfig};
///
/// let stats = QueueStats {
///     name: "orders".to_string(),
///     available: 150,
///     locked: 25,
///     total: 175,
///     config: QueueConfig::default(),
/// };
///
/// println!("Queue {} has {} messages ready for processing",
///          stats.name, stats.available);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueStats {
    /// Name of the queue
    pub name: String,

    /// Number of messages available for consumption (not locked)
    pub available: usize,

    /// Number of messages currently locked by consumers
    pub locked: usize,

    /// Total number of messages in the queue (available + locked).
    /// Stored as its own field; this type does not recompute or check the sum.
    pub total: usize,

    /// Queue configuration (ordering, duplicate policy, and priority kind)
    pub config: QueueConfig,
}

/// Result of a batch ACK operation.
///
/// Reports which message IDs were successfully deleted and which were not
/// found (either missing or locked by a different consumer).
///
/// The derived [`Default`] produces a result with both lists empty.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BatchAckResult {
    /// IDs that were found, ownership-verified, and permanently deleted.
    pub acked: Vec<String>,
    /// IDs that were not found or were locked by a different consumer.
    /// The two cases are not distinguished in this result.
    pub not_found: Vec<String>,
}

/// Result of a batch NACK operation.
///
/// Reports the outcome for each message ID in the batch, sorted into one list
/// per outcome.
///
/// The derived [`Default`] produces a result with all four lists empty.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BatchNackResult {
    /// IDs unlocked and returned to the queue for retry.
    pub unlocked: Vec<String>,
    /// IDs that exceeded `max_retries` and were moved to the dead-letter queue.
    pub dead_lettered: Vec<String>,
    /// IDs deleted because a duplicate payload already exists in the queue
    /// (only applies when `allow_duplicates = false`).
    pub dropped: Vec<String>,
    /// IDs not found or locked by a different consumer.
    /// The two cases are not distinguished in this result.
    pub not_found: Vec<String>,
}