rabbitmq-backup-core 0.1.0

Core engine for RabbitMQ backup and restore operations
Documentation
//! Stream protocol client for RabbitMQ Streams (x-queue-type: stream).
//!
//! Uses the `rabbitmq-stream-client` crate for native stream protocol access.
//! Streams are append-only logs supporting non-destructive offset-based reads.

use rabbitmq_stream_client::types::OffsetSpecification;
use rabbitmq_stream_client::Environment;
use tracing::{debug, info};

use crate::error::{Error, Result};

/// Client for the RabbitMQ Stream Protocol (port 5552).
///
/// Wraps a single `rabbitmq_stream_client` [`Environment`]. Stream consumers
/// are built from that environment, and callers can borrow it directly
/// through the `environment()` accessor.
pub struct StreamClient {
    /// Environment built when the client connects; every consumer this
    /// client creates is derived from it.
    environment: Environment,
}

impl StreamClient {
    /// Connect to the RabbitMQ stream protocol endpoint.
    pub async fn connect(host: &str, port: u16, username: &str, password: &str) -> Result<Self> {
        Self::connect_with_vhost(host, port, username, password, "/").await
    }

    /// Connect to the RabbitMQ stream protocol endpoint for a specific vhost.
    pub async fn connect_with_vhost(
        host: &str,
        port: u16,
        username: &str,
        password: &str,
        vhost: &str,
    ) -> Result<Self> {
        info!("Connecting to stream protocol at {}:{}", host, port);

        let environment = Environment::builder()
            .host(host)
            .port(port)
            .username(username)
            .password(password)
            .virtual_host(vhost)
            .build()
            .await
            .map_err(|e| Error::Stream(format!("Failed to connect: {:?}", e)))?;

        debug!("Stream protocol connection established");
        Ok(Self { environment })
    }

    /// Create a consumer for the given stream starting from the specified offset.
    pub async fn create_consumer(
        &self,
        stream: &str,
        offset: OffsetSpecification,
    ) -> Result<rabbitmq_stream_client::Consumer> {
        let consumer = self
            .environment
            .consumer()
            .offset(offset)
            .build(stream)
            .await
            .map_err(|e| {
                Error::Stream(format!("Failed to create consumer for {}: {:?}", stream, e))
            })?;

        debug!("Stream consumer created for {}", stream);
        Ok(consumer)
    }

    /// Get the underlying environment for advanced operations.
    pub fn environment(&self) -> &Environment {
        &self.environment
    }
}