rabbitmq-backup-core 0.1.0

Core engine for RabbitMQ backup and restore operations
Documentation
//! Backup manifest and message record types.

use bytes::Bytes;
use serde::{Deserialize, Serialize};

use crate::config::QueueType;
use crate::error::Result;

/// Top-level backup manifest stored at `{prefix}/{backup_id}/manifest.json`.
///
/// Built with [`BackupManifest::new`], filled in while the backup runs
/// (`set_source_info`, `set_definitions`, `add_segment`) and sealed by
/// `finalize`, which stamps `completed_at` and computes the `total_*` fields.
/// Converted to and from JSON with `to_json` / `from_json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupManifest {
    /// Identifier of this backup run, supplied by the caller of `new`; it is
    /// also the `{backup_id}` component of the manifest path.
    pub backup_id: String,
    /// UTC epoch milliseconds at which `new` created the manifest.
    pub created_at: i64,
    /// UTC epoch milliseconds at which `finalize` ran; `None` until then.
    pub completed_at: Option<i64>,
    /// Name of the source cluster, as passed to `set_source_info`.
    pub source_cluster: Option<String>,
    /// RabbitMQ version of the source, as passed to `set_source_info`.
    pub rabbitmq_version: Option<String>,
    /// Version of the backup tool, supplied by the caller of `new`.
    pub backup_tool_version: String,
    /// Definitions metadata, present once `set_definitions` has been called.
    pub definitions: Option<DefinitionsBackup>,
    /// One entry per `(vhost, name)` pair, in the order the queues were first
    /// seen by `get_or_create_queue`.
    pub queues: Vec<QueueBackup>,
    /// Sum of `message_count` over `queues`; zero until `finalize` computes it.
    pub total_messages: u64,
    /// Sum of `size_bytes` over every segment of every queue; zero until
    /// `finalize` computes it.
    pub total_bytes: u64,
    /// Number of segments across all queues; zero until `finalize` computes it.
    pub total_segments: u64,
}

impl BackupManifest {
    /// Build an empty manifest for a new backup run.
    ///
    /// `created_at` is stamped with the current UTC time in epoch
    /// milliseconds. Every optional field starts out as `None`, the queue
    /// list is empty and all totals are zero until [`Self::finalize`] runs.
    pub fn new(backup_id: String, tool_version: String) -> Self {
        let started_at = chrono::Utc::now().timestamp_millis();

        Self {
            backup_id,
            backup_tool_version: tool_version,
            created_at: started_at,
            completed_at: None,
            source_cluster: None,
            rabbitmq_version: None,
            definitions: None,
            queues: Vec::new(),
            total_messages: 0,
            total_bytes: 0,
            total_segments: 0,
        }
    }

    /// Record where the backup came from (values taken from the Management
    /// API overview).
    ///
    /// Both fields are overwritten unconditionally, so passing `None` clears
    /// a previously stored value.
    pub fn set_source_info(
        &mut self,
        cluster_name: Option<String>,
        rabbitmq_version: Option<String>,
    ) {
        self.rabbitmq_version = rabbitmq_version;
        self.source_cluster = cluster_name;
    }

    /// Attach the definitions metadata, replacing any earlier value.
    pub fn set_definitions(&mut self, definitions: DefinitionsBackup) {
        self.definitions = Some(definitions);
    }

    /// Return the manifest entry for `(vhost, name)`, appending a fresh empty
    /// one when the queue has not been seen before.
    ///
    /// `queue_type` is only used when a new entry is created; an existing
    /// entry keeps the type it was created with.
    pub fn get_or_create_queue(
        &mut self,
        vhost: &str,
        name: &str,
        queue_type: QueueType,
    ) -> &mut QueueBackup {
        let existing = self
            .queues
            .iter()
            .position(|entry| entry.vhost == vhost && entry.name == name);

        let index = match existing {
            Some(found) => found,
            None => {
                self.queues.push(QueueBackup {
                    vhost: vhost.to_owned(),
                    name: name.to_owned(),
                    queue_type,
                    segments: Vec::new(),
                    message_count: 0,
                    first_message_timestamp: None,
                    last_message_timestamp: None,
                });
                // The entry just pushed sits at the end of the list.
                self.queues.len() - 1
            }
        };

        &mut self.queues[index]
    }

    /// Register a finished segment under its queue.
    ///
    /// The queue entry is created on demand. Its message count grows by the
    /// segment's `record_count`, and its first/last message timestamps are
    /// widened to cover the segment's range. A segment timestamp of `None`
    /// leaves the corresponding queue timestamp untouched.
    pub fn add_segment(
        &mut self,
        vhost: &str,
        queue: &str,
        queue_type: QueueType,
        metadata: SegmentMetadata,
    ) {
        let entry = self.get_or_create_queue(vhost, queue, queue_type);
        entry.message_count += metadata.record_count;

        // Keep the earliest "first" timestamp seen so far.
        if let Some(first) = metadata.first_timestamp {
            let earliest = entry
                .first_message_timestamp
                .map_or(first, |known| known.min(first));
            entry.first_message_timestamp = Some(earliest);
        }

        // Keep the latest "last" timestamp seen so far.
        if let Some(last) = metadata.last_timestamp {
            let latest = entry
                .last_message_timestamp
                .map_or(last, |known| known.max(last));
            entry.last_message_timestamp = Some(latest);
        }

        entry.segments.push(metadata);
    }

    /// Seal the manifest once the backup has finished.
    ///
    /// Stamps `completed_at` with the current UTC time in epoch milliseconds
    /// and recomputes `total_messages`, `total_bytes` and `total_segments`
    /// from the queue entries, discarding whatever those fields held before.
    pub fn finalize(&mut self) {
        self.completed_at = Some(chrono::Utc::now().timestamp_millis());

        let mut messages: u64 = 0;
        let mut bytes: u64 = 0;
        let mut segments: u64 = 0;

        for queue in &self.queues {
            messages += queue.message_count;
            segments += queue.segments.len() as u64;
            for segment in &queue.segments {
                bytes += segment.size_bytes;
            }
        }

        self.total_messages = messages;
        self.total_bytes = bytes;
        self.total_segments = segments;
    }

    /// Encode the manifest as pretty-printed JSON.
    ///
    /// # Errors
    ///
    /// Returns the crate error converted from `serde_json` if serialization
    /// fails.
    pub fn to_json(&self) -> Result<Bytes> {
        serde_json::to_vec_pretty(self)
            .map(Bytes::from)
            .map_err(Into::into)
    }

    /// Decode a manifest from JSON bytes.
    ///
    /// # Errors
    ///
    /// Returns the crate error converted from `serde_json` if `data` is not
    /// valid JSON for a manifest.
    pub fn from_json(data: &[u8]) -> Result<Self> {
        serde_json::from_slice(data).map_err(Into::into)
    }
}

/// Metadata about definitions included in the backup.
///
/// Stored in the manifest's `definitions` field. Nothing visible in this part
/// of the file populates these fields, so the per-field notes below are
/// inferred from the names and should be confirmed against the code that
/// builds this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DefinitionsBackup {
    /// Presumably the storage key of the definitions object — TODO confirm.
    pub key: String,
    /// Presumably the number of vhosts in the exported definitions.
    pub vhost_count: usize,
    /// Presumably the number of queues in the exported definitions.
    pub queue_count: usize,
    /// Presumably the number of exchanges in the exported definitions.
    pub exchange_count: usize,
    /// Presumably the number of users in the exported definitions.
    pub user_count: usize,
    /// Presumably the size of the stored definitions object in bytes.
    /// NOTE(review): `BackupManifest::finalize` does not add this to
    /// `total_bytes`; only segment sizes are summed there.
    pub size_bytes: u64,
}

/// Per-queue backup metadata.
///
/// Entries are created by `BackupManifest::get_or_create_queue` and updated
/// by `BackupManifest::add_segment`; the pair `(vhost, name)` identifies an
/// entry within a manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueBackup {
    /// Virtual host the queue belongs to.
    pub vhost: String,
    /// Queue name.
    pub name: String,
    /// Queue type recorded when the entry was first created; later
    /// `add_segment` calls for the same queue do not change it.
    pub queue_type: QueueType,
    /// Segments in the order they were added.
    pub segments: Vec<SegmentMetadata>,
    /// Running sum of `record_count` over all added segments.
    pub message_count: u64,
    /// Smallest `first_timestamp` among added segments, or `None` if no
    /// segment carried one.
    pub first_message_timestamp: Option<i64>,
    /// Largest `last_timestamp` among added segments, or `None` if no
    /// segment carried one.
    pub last_message_timestamp: Option<i64>,
}

/// Per-segment metadata.
///
/// Passed to `BackupManifest::add_segment`, which folds `record_count` and
/// the timestamps into the owning `QueueBackup` and then stores the value in
/// its `segments` list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentMetadata {
    /// Presumably the storage key of the segment object — TODO confirm.
    pub key: String,
    /// Presumably the segment's position within its queue's backup; the
    /// manifest code in this file never reads it — TODO confirm numbering.
    pub sequence: u64,
    /// Number of records in the segment; added to the queue's
    /// `message_count`.
    pub record_count: u64,
    /// Size counted towards the manifest's `total_bytes` by `finalize`;
    /// presumably the stored (compressed) size — TODO confirm.
    pub size_bytes: u64,
    /// Presumably the payload size before compression — TODO confirm.
    pub uncompressed_bytes: u64,
    /// Earliest message timestamp in the segment, if known.
    /// NOTE(review): units are not established in this file — confirm.
    pub first_timestamp: Option<i64>,
    /// Latest message timestamp in the segment, if known (same units as
    /// `first_timestamp`).
    pub last_timestamp: Option<i64>,
    /// Checksum of the segment. NOTE(review): algorithm and encoding are not
    /// visible here — confirm against the segment writer.
    pub checksum: String,
}

/// A single backed-up message with all AMQP metadata.
///
/// Plain serde data type: it carries the payload, the AMQP properties and
/// headers, the delivery information and the origin (queue and vhost) of one
/// message. No behavior is attached to it in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupRecord {
    /// Message body (opaque bytes)
    ///
    /// NOTE(review): the meaning of `None` versus an empty vector is not
    /// established in this file — confirm with the reader/writer code.
    pub body: Option<Vec<u8>>,

    /// AMQP basic properties
    pub properties: BackupProperties,

    /// Message headers (from basic.properties.headers)
    ///
    /// Kept as a list of `(name, value)` pairs rather than a map, so entry
    /// order and any duplicate names are representable.
    pub headers: Vec<(String, BackupHeaderValue)>,

    /// Exchange the message was published to
    pub exchange: String,

    /// Routing key
    pub routing_key: String,

    /// Delivery tag from the broker
    pub delivery_tag: u64,

    /// Whether the message was redelivered
    pub redelivered: bool,

    /// Epoch millis when this record was captured
    pub backed_up_at: i64,

    /// Queue name this message was read from
    pub source_queue: String,

    /// Vhost of the source queue
    pub source_vhost: String,
}

/// AMQP basic properties preserved from the original message.
///
/// Every field is optional, and the derived `Default` yields a value with all
/// of them `None`. The field names appear to follow the AMQP 0-9-1
/// `basic.properties` names — confirm against the code that populates them.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BackupProperties {
    /// Content type of the body, if the message carried one.
    pub content_type: Option<String>,
    /// Content encoding of the body, if the message carried one.
    pub content_encoding: Option<String>,
    /// Raw delivery-mode value. NOTE(review): presumably the AMQP
    /// transient/persistent flag — confirm the accepted values.
    pub delivery_mode: Option<u8>,
    /// Raw message priority value.
    pub priority: Option<u8>,
    /// Correlation identifier, if set.
    pub correlation_id: Option<String>,
    /// Reply-to address, if set.
    pub reply_to: Option<String>,
    /// Expiration, stored as the string it was received as (not parsed here).
    pub expiration: Option<String>,
    /// Message identifier, if set.
    pub message_id: Option<String>,
    /// Message timestamp property.
    /// NOTE(review): units are not established in this file — confirm whether
    /// this holds seconds or milliseconds.
    pub timestamp: Option<i64>,
    /// Presumably the AMQP `type` property; `type` is a reserved word in
    /// Rust, hence the `_field` suffix. With no serde rename attribute it is
    /// serialized under the name `type_field`.
    pub type_field: Option<String>,
    /// User identifier property, if set.
    pub user_id: Option<String>,
    /// Application identifier property, if set.
    pub app_id: Option<String>,
    /// Cluster identifier property, if set.
    pub cluster_id: Option<String>,
}

/// AMQP header value types.
///
/// Recursive value type used for the entries of `BackupRecord::headers`:
/// `Table` and `Array` nest further `BackupHeaderValue`s.
///
/// The serde impls are derived without rename attributes, so the variant
/// names (or, for index-based formats, their declaration order) are part of
/// the serialized form; renaming or reordering variants can make existing
/// backups unreadable.
///
/// NOTE(review): several variants carry the same payload type (`Short` /
/// `ShortInt`, `Long` / `LongLongInt`, `LongStringBytes` / `Bytes`). Which
/// wire type each one corresponds to is not visible in this file — confirm
/// against the conversion code before merging or removing any of them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BackupHeaderValue {
    /// Long string held as a Rust `String`.
    LongString(String),
    /// Long string held as raw bytes; presumably used when the content is
    /// not valid UTF-8 — TODO confirm.
    LongStringBytes(Vec<u8>),
    /// Short string.
    ShortString(String),
    /// 64-bit signed integer (same payload type as `LongLongInt`).
    Long(i64),
    /// 16-bit signed integer (same payload type as `ShortInt`).
    Short(i16),
    /// 8-bit signed integer.
    ShortShortInt(i8),
    /// 8-bit unsigned integer.
    ShortShortUInt(u8),
    /// 16-bit signed integer.
    ShortInt(i16),
    /// 16-bit unsigned integer.
    ShortUInt(u16),
    /// 32-bit signed integer.
    LongInt(i32),
    /// 32-bit unsigned integer.
    LongUInt(u32),
    /// 64-bit signed integer.
    LongLongInt(i64),
    /// Boolean.
    Bool(bool),
    /// Raw byte array.
    Bytes(Vec<u8>),
    /// Timestamp value. NOTE(review): units are not established here.
    Timestamp(i64),
    /// 32-bit floating point number.
    Float(f32),
    /// 64-bit floating point number.
    Double(f64),
    /// Decimal value; presumably `value` scaled by `10^-scale` as in the AMQP
    /// decimal type — TODO confirm.
    Decimal { scale: u8, value: u32 },
    /// Value-less entry.
    Void,
    /// Nested table as a list of `(name, value)` pairs.
    Table(Vec<(String, BackupHeaderValue)>),
    /// Nested array of values.
    Array(Vec<BackupHeaderValue>),
}