rabbitmq-backup-core 0.1.0

Core engine for RabbitMQ backup and restore operations
Documentation
//! Stream protocol reader for RabbitMQ Streams.
//!
//! Reads messages from RabbitMQ Streams using the native stream protocol.
//! Streams are append-only logs — reads are inherently non-destructive.

use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
use rabbitmq_stream_client::types::{Delivery, OffsetSpecification};
use tokio::sync::{broadcast, Mutex};
use tracing::{debug, info, warn};

use crate::compression;
use crate::config::{BackupOptions, CompressionType, QueueType};
use crate::error::Result;
use crate::manifest::{BackupManifest, BackupProperties, BackupRecord};
use crate::offset_store::{QueueProgressUpdate, SqliteOffsetStore};
use crate::segment::SegmentWriter;
use crate::storage::StorageBackend;
use crate::stream::StreamClient;

/// Checkpoint target used to persist stream backup progress.
///
/// Bundles the shared offset store with an optional storage key that decides
/// how [`StreamCheckpoint::sync`] flushes the store.
#[derive(Clone)]
pub struct StreamCheckpoint {
    /// Shared offset store that receives progress updates.
    pub store: Arc<SqliteOffsetStore>,
    /// Key handed to `sync_to_storage`; when `None`, `checkpoint` is called
    /// on the store instead.
    pub remote_key: Option<String>,
}

impl StreamCheckpoint {
    /// Flush the offset store after a progress update.
    ///
    /// With a `remote_key` set, the store is synced to `storage` under that
    /// key; without one, the store's `checkpoint` is invoked.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying store call.
    async fn sync(&self, storage: &Arc<dyn StorageBackend>) -> Result<()> {
        match &self.remote_key {
            Some(key) => {
                self.store.sync_to_storage(storage, key).await?;
            }
            None => {
                self.store.checkpoint().await?;
            }
        }
        Ok(())
    }
}

/// Reads messages from RabbitMQ Streams using the native stream protocol.
/// Streams are append-only logs supporting non-destructive offset-based reads.
pub struct StreamReader;

impl StreamReader {
    /// Back up a stream by reading from the given offset to the end.
    /// Writes messages to segments and updates the manifest.
    #[allow(clippy::too_many_arguments)]
    pub async fn backup_stream(
        client: &StreamClient,
        stream_name: &str,
        vhost: &str,
        start_offset: OffsetSpecification,
        backup_id: &str,
        backup_opts: &BackupOptions,
        storage: &Arc<dyn StorageBackend>,
        manifest: &Arc<Mutex<BackupManifest>>,
        checkpoint: Option<StreamCheckpoint>,
        target_message_count: u64,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) -> Result<u64> {
        info!("Backing up stream {} (vhost={})", stream_name, vhost);

        let mut consumer = client.create_consumer(stream_name, start_offset).await?;

        let mut segment_writer = SegmentWriter::new(1);
        let mut received_count = 0u64;
        let mut last_offset = None;
        let mut last_segment_sequence = 0u64;
        let read_timeout = Duration::from_secs(10);
        let mut shutdown_closed = false;

        loop {
            tokio::select! {
                shutdown = shutdown_rx.recv(), if !shutdown_closed => {
                    match shutdown {
                        Ok(()) => {
                            info!("Shutdown signal received for stream {}", stream_name);
                            break;
                        }
                        Err(broadcast::error::RecvError::Closed) => {
                            shutdown_closed = true;
                        }
                        Err(broadcast::error::RecvError::Lagged(_)) => {
                            info!("Shutdown signal received for stream {}", stream_name);
                            break;
                        }
                    }
                }
                delivery = tokio::time::timeout(read_timeout, consumer.next()) => {
                    match delivery {
                        Ok(Some(Ok(delivery))) => {
                            last_offset = Some(delivery.offset());
                            let record = delivery_to_record(&delivery, stream_name, vhost);

                            segment_writer.add_record(&record)?;
                            received_count += 1;

                            if received_count.is_multiple_of(1000) {
                                debug!("Stream {}: {} messages received", stream_name, received_count);
                            }

                            // Check segment rotation
                            if segment_writer.should_rotate(
                                backup_opts.segment_max_bytes,
                                backup_opts.segment_max_interval_ms,
                            ) {
                                let key = stream_segment_key(
                                    backup_id, vhost, stream_name,
                                    segment_writer.sequence(), backup_opts.compression,
                                );
                                let finalized = segment_writer.finalize(
                                    backup_opts.compression, backup_opts.compression_level, key,
                                )?;
                                storage.put(&finalized.metadata.key, finalized.data).await?;
                                last_segment_sequence = finalized.metadata.sequence;
                                manifest.lock().await.add_segment(
                                    vhost, stream_name, QueueType::Stream, finalized.metadata,
                                );
                                set_stream_progress(
                                    &checkpoint,
                                    backup_id,
                                    vhost,
                                    stream_name,
                                    last_offset.unwrap_or(0),
                                    last_segment_sequence,
                                    target_message_count,
                                    false,
                                    storage,
                                )
                                .await?;
                            }
                        }
                        Ok(Some(Err(e))) => {
                            warn!("Stream delivery error: {:?}", e);
                            break;
                        }
                        Ok(None) => {
                            // Stream ended
                            debug!("Stream {} consumer ended", stream_name);
                            break;
                        }
                        Err(_) => {
                            // Timeout — no more messages
                            debug!("Stream {}: no more messages after {}", stream_name, received_count);
                            break;
                        }
                    }
                }
            }
        }

        // Flush final segment
        if segment_writer.has_records() {
            let key = stream_segment_key(
                backup_id,
                vhost,
                stream_name,
                segment_writer.sequence(),
                backup_opts.compression,
            );
            let finalized = segment_writer.finalize(
                backup_opts.compression,
                backup_opts.compression_level,
                key,
            )?;
            storage.put(&finalized.metadata.key, finalized.data).await?;
            last_segment_sequence = finalized.metadata.sequence;
            manifest.lock().await.add_segment(
                vhost,
                stream_name,
                QueueType::Stream,
                finalized.metadata,
            );
        }

        set_stream_progress(
            &checkpoint,
            backup_id,
            vhost,
            stream_name,
            last_offset.unwrap_or(0),
            last_segment_sequence,
            target_message_count,
            true,
            storage,
        )
        .await?;

        info!(
            "Stream {} backed up: {} messages",
            stream_name, received_count
        );
        Ok(received_count)
    }
}

/// Record backup progress for a stream and flush the checkpoint.
///
/// Does nothing when `checkpoint` is `None`. Otherwise a progress update is
/// written to the checkpoint's store and the checkpoint is synced against
/// `storage`. The stream's `last_offset` is stored in the update's
/// `messages_backed_up` field.
///
/// # Errors
///
/// Propagates failures from writing the progress state or syncing the
/// checkpoint.
#[allow(clippy::too_many_arguments)]
async fn set_stream_progress(
    checkpoint: &Option<StreamCheckpoint>,
    backup_id: &str,
    vhost: &str,
    stream_name: &str,
    last_offset: u64,
    last_segment_sequence: u64,
    target_message_count: u64,
    completed: bool,
    storage: &Arc<dyn StorageBackend>,
) -> Result<()> {
    // Checkpointing disabled: nothing to persist.
    let Some(active) = checkpoint.as_ref() else {
        return Ok(());
    };

    let update = QueueProgressUpdate {
        backup_id,
        vhost,
        queue_name: stream_name,
        messages_backed_up: last_offset,
        last_segment_sequence,
        target_message_count,
        completed,
    };
    active.store.set_progress_state(update).await?;
    active.sync(storage).await?;

    Ok(())
}

/// Build a [`BackupRecord`] from a stream delivery.
///
/// The body is copied from the message data. Message properties, when
/// present, are mapped onto [`BackupProperties`] (ids are rendered with their
/// `Debug` representation; the timestamp is left unset). Application
/// properties become headers whose values are the `Debug` rendering wrapped
/// in `LongString`. The delivery offset is stored as the record's
/// `delivery_tag`, and exchange / routing key are left empty.
fn delivery_to_record(delivery: &Delivery, stream_name: &str, vhost: &str) -> BackupRecord {
    let message = delivery.message();

    // Payload bytes, if the message carries any.
    let body = message.data().map(|bytes| bytes.to_vec());

    // AMQP 1.0 properties mapped onto the backup representation.
    let properties = message
        .properties()
        .map(|p| BackupProperties {
            content_type: p.content_type.as_ref().map(|s| s.to_string()),
            content_encoding: p.content_encoding.as_ref().map(|s| s.to_string()),
            message_id: p.message_id.as_ref().map(|v| format!("{:?}", v)),
            correlation_id: p.correlation_id.as_ref().map(|v| format!("{:?}", v)),
            reply_to: p.reply_to.as_ref().map(|s| s.to_string()),
            timestamp: None, // Stream timestamps require DateTime conversion
            ..Default::default()
        })
        .unwrap_or_default();

    // Application properties are carried over as headers.
    let headers = match message.application_properties() {
        Some(app_props) => app_props
            .iter()
            .map(|(name, value)| {
                let rendered = format!("{:?}", value);
                (
                    name.to_string(),
                    crate::manifest::BackupHeaderValue::LongString(rendered),
                )
            })
            .collect(),
        None => Default::default(),
    };

    BackupRecord {
        body,
        properties,
        headers,
        exchange: String::new(),
        routing_key: String::new(),
        delivery_tag: delivery.offset(),
        redelivered: false,
        backed_up_at: chrono::Utc::now().timestamp_millis(),
        source_queue: stream_name.to_string(),
        source_vhost: vhost.to_string(),
    }
}

/// Build the storage key for one segment of a stream backup.
///
/// Layout: `<backup_id>/queues/<vhost>/<stream>/segment-<seq><ext>`, where
/// the vhost `/` is written as `_default`, `seq` is zero-padded to at least
/// four digits, and `<ext>` is whatever `compression::extension` returns for
/// `compression_type`.
fn stream_segment_key(
    backup_id: &str,
    vhost: &str,
    stream: &str,
    seq: u64,
    compression_type: CompressionType,
) -> String {
    // The default vhost "/" would otherwise produce an empty path component.
    let vhost_dir = match vhost {
        "/" => "_default",
        other => other,
    };
    let suffix = compression::extension(compression_type);

    format!("{backup_id}/queues/{vhost_dir}/{stream}/segment-{seq:04}{suffix}")
}