// rabbitmq-backup-core 0.1.0
//
// Core engine for RabbitMQ backup and restore operations.
// Documentation
//! AMQP message publisher for restore operations.
//!
//! Publishes backed-up messages to a target RabbitMQ cluster with publisher confirms.

use tracing::{debug, info};

use crate::amqp::{AmqpClient, ConfirmStats};
use crate::backup::queue_reader::{to_amqp_properties, to_field_table};
use crate::error::Result;
use crate::manifest::BackupRecord;

/// Value of the `mandatory` argument used for every publish issued during a
/// restore.
///
/// NOTE(review): in AMQP 0-9-1 the `mandatory` flag asks the broker to return
/// unroutable messages instead of silently dropping them — confirm that
/// `AmqpClient::basic_publish` forwards this flag unchanged.
const RESTORE_PUBLISH_MANDATORY: bool = true;

/// Returns the `mandatory` flag that restore publishes must use.
///
/// Declared `const fn` so the value can also be consumed in constant
/// contexts, not only at run time.
const fn restore_publish_mandatory() -> bool {
    RESTORE_PUBLISH_MANDATORY
}

/// Publishes messages to a target RabbitMQ cluster with publisher confirms.
///
/// Bundles an AMQP client with the channel id that every operation of this
/// publisher is issued on, plus a flag tracking whether confirms were enabled.
pub struct Publisher {
    /// AMQP client through which all channel operations are issued.
    client: AmqpClient,
    /// Channel id handed to each client call made by this publisher.
    channel_id: u16,
    /// Set to `true` once `enable_confirms` succeeds; while `false`,
    /// `wait_for_confirms` returns default stats without waiting.
    confirms_enabled: bool,
}

impl Publisher {
    /// Create a new publisher from an existing AMQP connection and channel.
    pub fn new(client: AmqpClient, channel_id: u16) -> Self {
        Self {
            client,
            channel_id,
            confirms_enabled: false,
        }
    }

    /// Turn on publisher confirms for this publisher's channel.
    ///
    /// Issues `confirm_select` on the channel and, only once that call has
    /// succeeded, marks confirms as enabled so that `wait_for_confirms`
    /// delegates to the client instead of returning default stats.
    ///
    /// # Errors
    ///
    /// Propagates any error from the client's `confirm_select`; the confirm
    /// flag is left untouched in that case.
    pub async fn enable_confirms(&mut self) -> Result<()> {
        let channel = self.channel_id;
        self.client.confirm_select(channel).await?;

        self.confirms_enabled = true;
        Ok(())
    }

    /// Publish one backed-up record to `exchange` using `routing_key`.
    ///
    /// Message properties are rebuilt from the record's stored properties,
    /// headers are attached only when the record carries any, and a record
    /// without a body is published with an empty payload.
    ///
    /// Returns the delivery tag reported by the client's `basic_publish`,
    /// for use in confirm tracking.
    ///
    /// # Errors
    ///
    /// Propagates any error from the client's `basic_publish`.
    pub async fn publish_record(
        &mut self,
        record: &BackupRecord,
        exchange: &str,
        routing_key: &str,
    ) -> Result<u64> {
        // Start from the stored properties; add the header table only if
        // there is something to put in it.
        let base_props = to_amqp_properties(&record.properties);
        let props = if record.headers.is_empty() {
            base_props
        } else {
            base_props.with_headers(to_field_table(&record.headers))
        };

        // A missing body becomes an empty slice.
        let payload = record.body.as_deref().unwrap_or_default();
        let channel = self.channel_id;
        let mandatory = restore_publish_mandatory();

        self.client
            .basic_publish(channel, exchange, routing_key, mandatory, &props, payload)
            .await
    }

    /// Wait for publisher confirms on this channel up to `up_to_tag`.
    ///
    /// Delegates to the client's `wait_for_confirms` for this publisher's
    /// channel. When confirms have not been enabled via `enable_confirms`, or
    /// when `up_to_tag` is `0`, no wait is performed and
    /// `ConfirmStats::default()` is returned instead.
    ///
    /// # Errors
    ///
    /// Propagates any error from the client's `wait_for_confirms`.
    pub async fn wait_for_confirms(&mut self, up_to_tag: u64) -> Result<ConfirmStats> {
        let nothing_to_wait_for = !self.confirms_enabled || up_to_tag == 0;
        if nothing_to_wait_for {
            return Ok(ConfirmStats::default());
        }

        let channel = self.channel_id;
        self.client.wait_for_confirms(channel, up_to_tag).await
    }

    /// Publish every record in `records` to `exchange` / `routing_key`, then
    /// wait for the confirms covering the batch.
    ///
    /// Returns `(confirmed, failed)`: `failed` is the failure count reported
    /// by the confirm stats, and `confirmed` is the batch size minus `failed`,
    /// saturating at zero. An empty batch returns `(0, 0)` without touching
    /// the channel.
    ///
    /// # Errors
    ///
    /// Stops at the first publish that fails and returns that error; confirms
    /// for records already published in this call are then not awaited. Errors
    /// from waiting for confirms are propagated as well.
    pub async fn publish_batch(
        &mut self,
        records: &[BackupRecord],
        exchange: &str,
        routing_key: &str,
    ) -> Result<(u64, u64)> {
        // usize -> u64 is a widening conversion here, so `as` cannot truncate.
        let batch_len = records.len() as u64;
        if batch_len == 0 {
            return Ok((0, 0));
        }

        // Only the most recent delivery tag is kept: it is the bound handed
        // to the confirm wait below.
        let mut highest_tag: u64 = 0;
        for item in records.iter() {
            highest_tag = self.publish_record(item, exchange, routing_key).await?;
        }

        let stats = self.wait_for_confirms(highest_tag).await?;
        let failed = stats.failed();
        let confirmed = batch_len.saturating_sub(failed);

        debug!(
            "Batch published: {} confirmed, {} failed ({:?})",
            confirmed, failed, stats
        );

        Ok((confirmed, failed))
    }

    /// Shut the publisher down: close its channel, then the connection.
    ///
    /// Both steps are best-effort — an error from either close call is
    /// discarded, and the connection close is attempted even if closing the
    /// channel failed. Consumes the publisher.
    pub async fn close(mut self) {
        let channel = self.channel_id;

        // Deliberately ignore failures; there is nothing left to recover.
        let _ = self.client.close_channel(channel).await;
        let _ = self.client.close().await;

        info!("Publisher closed");
    }
}

#[cfg(test)]
mod tests {
    use super::restore_publish_mandatory;

    /// Restore publishes must keep the `mandatory` flag set.
    #[test]
    fn test_restore_publish_uses_mandatory_routing() {
        assert!(restore_publish_mandatory());
    }
}