use std::sync::Arc;
use std::time::Duration;
use futures::StreamExt;
use rabbitmq_stream_client::types::{Delivery, OffsetSpecification};
use tokio::sync::{broadcast, Mutex};
use tracing::{debug, info, warn};
use crate::compression;
use crate::config::{BackupOptions, CompressionType, QueueType};
use crate::error::Result;
use crate::manifest::{BackupManifest, BackupProperties, BackupRecord};
use crate::offset_store::{QueueProgressUpdate, SqliteOffsetStore};
use crate::segment::SegmentWriter;
use crate::storage::StorageBackend;
use crate::stream::StreamClient;
/// Checkpoint handle passed to the stream backup so it can persist progress.
///
/// Bundles the offset store with an optional key that selects how `sync`
/// persists the store after a progress update.
#[derive(Clone)]
pub struct StreamCheckpoint {
/// Offset store that receives the progress updates for the stream.
pub store: Arc<SqliteOffsetStore>,
/// When `Some`, `sync` calls `sync_to_storage` on the store with this key;
/// when `None`, `sync` calls the store's `checkpoint` instead.
pub remote_key: Option<String>,
}
impl StreamCheckpoint {
    /// Persists the offset store after a progress update.
    ///
    /// With a `remote_key` configured the store is synced to `storage` under
    /// that key; without one only the store's own `checkpoint` is invoked.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the chosen store operation returns.
    async fn sync(&self, storage: &Arc<dyn StorageBackend>) -> Result<()> {
        match &self.remote_key {
            Some(key) => {
                self.store.sync_to_storage(storage, key).await?;
            }
            None => {
                self.store.checkpoint().await?;
            }
        }
        Ok(())
    }
}
pub struct StreamReader;
impl StreamReader {
#[allow(clippy::too_many_arguments)]
pub async fn backup_stream(
client: &StreamClient,
stream_name: &str,
vhost: &str,
start_offset: OffsetSpecification,
backup_id: &str,
backup_opts: &BackupOptions,
storage: &Arc<dyn StorageBackend>,
manifest: &Arc<Mutex<BackupManifest>>,
checkpoint: Option<StreamCheckpoint>,
target_message_count: u64,
mut shutdown_rx: broadcast::Receiver<()>,
) -> Result<u64> {
info!("Backing up stream {} (vhost={})", stream_name, vhost);
let mut consumer = client.create_consumer(stream_name, start_offset).await?;
let mut segment_writer = SegmentWriter::new(1);
let mut received_count = 0u64;
let mut last_offset = None;
let mut last_segment_sequence = 0u64;
let read_timeout = Duration::from_secs(10);
let mut shutdown_closed = false;
loop {
tokio::select! {
shutdown = shutdown_rx.recv(), if !shutdown_closed => {
match shutdown {
Ok(()) => {
info!("Shutdown signal received for stream {}", stream_name);
break;
}
Err(broadcast::error::RecvError::Closed) => {
shutdown_closed = true;
}
Err(broadcast::error::RecvError::Lagged(_)) => {
info!("Shutdown signal received for stream {}", stream_name);
break;
}
}
}
delivery = tokio::time::timeout(read_timeout, consumer.next()) => {
match delivery {
Ok(Some(Ok(delivery))) => {
last_offset = Some(delivery.offset());
let record = delivery_to_record(&delivery, stream_name, vhost);
segment_writer.add_record(&record)?;
received_count += 1;
if received_count.is_multiple_of(1000) {
debug!("Stream {}: {} messages received", stream_name, received_count);
}
if segment_writer.should_rotate(
backup_opts.segment_max_bytes,
backup_opts.segment_max_interval_ms,
) {
let key = stream_segment_key(
backup_id, vhost, stream_name,
segment_writer.sequence(), backup_opts.compression,
);
let finalized = segment_writer.finalize(
backup_opts.compression, backup_opts.compression_level, key,
)?;
storage.put(&finalized.metadata.key, finalized.data).await?;
last_segment_sequence = finalized.metadata.sequence;
manifest.lock().await.add_segment(
vhost, stream_name, QueueType::Stream, finalized.metadata,
);
set_stream_progress(
&checkpoint,
backup_id,
vhost,
stream_name,
last_offset.unwrap_or(0),
last_segment_sequence,
target_message_count,
false,
storage,
)
.await?;
}
}
Ok(Some(Err(e))) => {
warn!("Stream delivery error: {:?}", e);
break;
}
Ok(None) => {
debug!("Stream {} consumer ended", stream_name);
break;
}
Err(_) => {
debug!("Stream {}: no more messages after {}", stream_name, received_count);
break;
}
}
}
}
}
if segment_writer.has_records() {
let key = stream_segment_key(
backup_id,
vhost,
stream_name,
segment_writer.sequence(),
backup_opts.compression,
);
let finalized = segment_writer.finalize(
backup_opts.compression,
backup_opts.compression_level,
key,
)?;
storage.put(&finalized.metadata.key, finalized.data).await?;
last_segment_sequence = finalized.metadata.sequence;
manifest.lock().await.add_segment(
vhost,
stream_name,
QueueType::Stream,
finalized.metadata,
);
}
set_stream_progress(
&checkpoint,
backup_id,
vhost,
stream_name,
last_offset.unwrap_or(0),
last_segment_sequence,
target_message_count,
true,
storage,
)
.await?;
info!(
"Stream {} backed up: {} messages",
stream_name, received_count
);
Ok(received_count)
}
}
/// Records stream backup progress in the checkpoint store, if one is configured.
///
/// With `checkpoint` set to `None` this is a no-op. Otherwise a progress
/// update is written for (`backup_id`, `vhost`, `stream_name`) and the store
/// is then synced via [`StreamCheckpoint::sync`].
///
/// Note that `last_offset` is stored in the update's `messages_backed_up`
/// field.
///
/// # Errors
///
/// Propagates errors from writing the progress state or syncing the store.
// Each value maps onto one field of the progress update; bundling them into a
// struct here would only duplicate `QueueProgressUpdate`.
#[allow(clippy::too_many_arguments)]
async fn set_stream_progress(
    checkpoint: &Option<StreamCheckpoint>,
    backup_id: &str,
    vhost: &str,
    stream_name: &str,
    last_offset: u64,
    last_segment_sequence: u64,
    target_message_count: u64,
    completed: bool,
    storage: &Arc<dyn StorageBackend>,
) -> Result<()> {
    // Nothing to persist when checkpointing is disabled.
    let Some(active) = checkpoint else {
        return Ok(());
    };

    let update = QueueProgressUpdate {
        backup_id,
        vhost,
        queue_name: stream_name,
        messages_backed_up: last_offset,
        last_segment_sequence,
        target_message_count,
        completed,
    };
    active.store.set_progress_state(update).await?;
    active.sync(storage).await?;
    Ok(())
}
/// Converts one stream delivery into a [`BackupRecord`].
///
/// - `body` is a copy of the message data, if any.
/// - `message_id` and `correlation_id` are stored using their `Debug`
///   formatting; `timestamp` is always `None`.
/// - Every application property becomes a `LongString` header holding the
///   `Debug` formatting of its value.
/// - `exchange` and `routing_key` are left empty, `redelivered` is `false`,
///   and `delivery_tag` carries the delivery's stream offset.
/// - `backed_up_at` is the current UTC time in milliseconds.
fn delivery_to_record(delivery: &Delivery, stream_name: &str, vhost: &str) -> BackupRecord {
    let message = delivery.message();

    let properties = message
        .properties()
        .map(|p| BackupProperties {
            content_type: p.content_type.as_ref().map(|s| s.to_string()),
            content_encoding: p.content_encoding.as_ref().map(|s| s.to_string()),
            message_id: p.message_id.as_ref().map(|v| format!("{:?}", v)),
            correlation_id: p.correlation_id.as_ref().map(|v| format!("{:?}", v)),
            reply_to: p.reply_to.as_ref().map(|s| s.to_string()),
            timestamp: None,
            ..Default::default()
        })
        .unwrap_or_default();

    let headers = match message.application_properties() {
        Some(app_props) => app_props
            .iter()
            .map(|(name, value)| {
                let rendered = format!("{:?}", value);
                (
                    name.to_string(),
                    crate::manifest::BackupHeaderValue::LongString(rendered),
                )
            })
            .collect(),
        None => Default::default(),
    };

    BackupRecord {
        body: message.data().map(|bytes| bytes.to_vec()),
        properties,
        headers,
        exchange: String::new(),
        routing_key: String::new(),
        delivery_tag: delivery.offset(),
        redelivered: false,
        backed_up_at: chrono::Utc::now().timestamp_millis(),
        source_queue: stream_name.to_string(),
        source_vhost: vhost.to_string(),
    }
}
/// Builds the storage key for one segment of a stream backup.
///
/// Layout: `<backup_id>/queues/<vhost>/<stream>/segment-<seq><ext>`, where
/// the vhost `/` is written as `_default`, `seq` is zero-padded to at least
/// four digits, and `ext` comes from `compression::extension`.
fn stream_segment_key(
    backup_id: &str,
    vhost: &str,
    stream: &str,
    seq: u64,
    compression_type: CompressionType,
) -> String {
    // The default vhost is written as `_default` in the key.
    let vhost_segment = match vhost {
        "/" => "_default",
        named => named,
    };
    let extension = compression::extension(compression_type);
    format!(
        "{}/queues/{}/{}/segment-{:04}{}",
        backup_id, vhost_segment, stream, seq, extension
    )
}