use bytes::Bytes;
use serde::{Deserialize, Serialize};
use crate::config::QueueType;
use crate::error::Result;
/// Top-level record describing one backup: identity, timing, optional source
/// information, and the per-queue segment inventory.
///
/// Serialized to and parsed from JSON by `BackupManifest::to_json` and
/// `BackupManifest::from_json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupManifest {
/// Identifier of this backup, supplied by the caller of `new`.
pub backup_id: String,
/// Creation time in milliseconds since the Unix epoch (UTC), captured in `new`.
pub created_at: i64,
/// Completion time in milliseconds since the Unix epoch (UTC); `None` until
/// `finalize` has been called.
pub completed_at: Option<i64>,
/// Cluster name recorded by `set_source_info`, if any.
pub source_cluster: Option<String>,
/// RabbitMQ version recorded by `set_source_info`, if any.
pub rabbitmq_version: Option<String>,
/// Version string passed to `new` as `tool_version`.
pub backup_tool_version: String,
/// Definitions summary recorded by `set_definitions`, if any.
pub definitions: Option<DefinitionsBackup>,
/// One entry per `(vhost, name)` pair, appended by `get_or_create_queue`.
pub queues: Vec<QueueBackup>,
/// Sum of `message_count` over all queues; only recomputed by `finalize`.
pub total_messages: u64,
/// Sum of `size_bytes` over every segment of every queue; only recomputed by `finalize`.
pub total_bytes: u64,
/// Number of segments across all queues; only recomputed by `finalize`.
pub total_segments: u64,
}
impl BackupManifest {
    /// Creates an empty manifest for the backup identified by `backup_id`.
    ///
    /// `created_at` is stamped with the current UTC time in milliseconds since
    /// the Unix epoch and `tool_version` is stored as `backup_tool_version`.
    /// Every optional field starts as `None`, the queue list is empty and all
    /// totals are zero.
    pub fn new(backup_id: String, tool_version: String) -> Self {
        let created_at = chrono::Utc::now().timestamp_millis();
        Self {
            backup_id,
            created_at,
            completed_at: None,
            source_cluster: None,
            rabbitmq_version: None,
            backup_tool_version: tool_version,
            definitions: None,
            queues: Vec::new(),
            total_messages: 0,
            total_bytes: 0,
            total_segments: 0,
        }
    }

    /// Overwrites the recorded source cluster name and RabbitMQ version.
    ///
    /// Passing `None` for either argument clears the corresponding field.
    pub fn set_source_info(
        &mut self,
        cluster_name: Option<String>,
        rabbitmq_version: Option<String>,
    ) {
        self.rabbitmq_version = rabbitmq_version;
        self.source_cluster = cluster_name;
    }

    /// Records the definitions summary, replacing any previous one.
    pub fn set_definitions(&mut self, definitions: DefinitionsBackup) {
        self.definitions = Some(definitions);
    }

    /// Returns the entry for the queue `name` in `vhost`, appending a fresh,
    /// empty entry when none exists yet.
    ///
    /// The lookup is a linear scan over `queues`. `queue_type` is only used
    /// when a new entry is created; an existing entry keeps its stored type.
    pub fn get_or_create_queue(
        &mut self,
        vhost: &str,
        name: &str,
        queue_type: QueueType,
    ) -> &mut QueueBackup {
        // Resolve to an index first so the shared borrow taken by the search
        // has ended before the vector is (possibly) grown.
        let existing = self
            .queues
            .iter()
            .position(|candidate| candidate.name == name && candidate.vhost == vhost);

        let slot = match existing {
            Some(found) => found,
            None => {
                let fresh = QueueBackup {
                    vhost: vhost.to_string(),
                    name: name.to_string(),
                    queue_type,
                    segments: Vec::new(),
                    message_count: 0,
                    first_message_timestamp: None,
                    last_message_timestamp: None,
                };
                self.queues.push(fresh);
                // The entry just pushed is always the last element.
                self.queues.len() - 1
            }
        };

        &mut self.queues[slot]
    }

    /// Appends `metadata` to the given queue's segment list, creating the
    /// queue entry if necessary.
    ///
    /// The queue's `message_count` grows by `metadata.record_count`, and its
    /// first/last message timestamps are widened to include the segment's
    /// timestamps when those are present. Manifest-level totals are not
    /// touched here; they are recomputed by `finalize`.
    pub fn add_segment(
        &mut self,
        vhost: &str,
        queue: &str,
        queue_type: QueueType,
        metadata: SegmentMetadata,
    ) {
        let entry = self.get_or_create_queue(vhost, queue, queue_type);

        entry.message_count += metadata.record_count;

        // Keep the earliest "first" timestamp seen so far.
        if let Some(segment_first) = metadata.first_timestamp {
            let earliest = entry
                .first_message_timestamp
                .map_or(segment_first, |seen| seen.min(segment_first));
            entry.first_message_timestamp = Some(earliest);
        }

        // Keep the latest "last" timestamp seen so far.
        if let Some(segment_last) = metadata.last_timestamp {
            let latest = entry
                .last_message_timestamp
                .map_or(segment_last, |seen| seen.max(segment_last));
            entry.last_message_timestamp = Some(latest);
        }

        entry.segments.push(metadata);
    }

    /// Marks the backup as complete and recomputes the manifest totals.
    ///
    /// `completed_at` is set to the current UTC time in milliseconds since the
    /// Unix epoch. `total_messages`, `total_bytes` and `total_segments` are
    /// rebuilt from scratch out of `queues`, so calling this again after more
    /// segments were added yields up-to-date totals.
    pub fn finalize(&mut self) {
        let finished_at = chrono::Utc::now().timestamp_millis();
        self.completed_at = Some(finished_at);

        self.total_messages = self
            .queues
            .iter()
            .fold(0u64, |acc, queue| acc + queue.message_count);

        self.total_bytes = self
            .queues
            .iter()
            .map(|queue| {
                queue
                    .segments
                    .iter()
                    .map(|segment| segment.size_bytes)
                    .sum::<u64>()
            })
            .sum();

        self.total_segments = self
            .queues
            .iter()
            .fold(0u64, |acc, queue| acc + queue.segments.len() as u64);
    }

    /// Serializes the manifest as pretty-printed JSON.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `serde_json::to_vec_pretty`.
    pub fn to_json(&self) -> Result<Bytes> {
        let encoded: Vec<u8> = serde_json::to_vec_pretty(self)?;
        Ok(Bytes::from(encoded))
    }

    /// Parses a manifest from JSON bytes.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `serde_json::from_slice`, e.g. when
    /// `data` is not valid JSON or does not match the manifest's fields.
    pub fn from_json(data: &[u8]) -> Result<Self> {
        Ok(serde_json::from_slice::<Self>(data)?)
    }
}
/// Summary of a definitions export referenced from `BackupManifest::definitions`.
///
/// NOTE(review): nothing in this file populates these fields; the per-field
/// descriptions below are inferred from the names — confirm against the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DefinitionsBackup {
/// Presumably the storage key under which the definitions were written — TODO confirm.
pub key: String,
/// Presumably the number of vhosts in the export.
pub vhost_count: usize,
/// Presumably the number of queues in the export.
pub queue_count: usize,
/// Presumably the number of exchanges in the export.
pub exchange_count: usize,
/// Presumably the number of users in the export.
pub user_count: usize,
/// Presumably the size of the stored definitions in bytes.
pub size_bytes: u64,
}
/// Per-queue entry of a `BackupManifest`, identified by `(vhost, name)`.
///
/// Entries are created by `BackupManifest::get_or_create_queue` and updated by
/// `BackupManifest::add_segment`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueBackup {
/// Virtual host the queue belongs to; part of the lookup key.
pub vhost: String,
/// Queue name; part of the lookup key.
pub name: String,
/// Queue type recorded when the entry was first created.
pub queue_type: QueueType,
/// Segments recorded for this queue, in the order they were added.
pub segments: Vec<SegmentMetadata>,
/// Running sum of `record_count` over all added segments.
pub message_count: u64,
/// Smallest `first_timestamp` among added segments that carried one.
pub first_message_timestamp: Option<i64>,
/// Largest `last_timestamp` among added segments that carried one.
pub last_message_timestamp: Option<i64>,
}
/// Metadata for one stored segment of a queue's backup.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentMetadata {
/// Presumably the storage key of the segment object — TODO confirm.
pub key: String,
/// Presumably the segment's position within its queue — TODO confirm numbering.
pub sequence: u64,
/// Number of records in the segment; added to the owning queue's
/// `message_count` by `BackupManifest::add_segment`.
pub record_count: u64,
/// Segment size in bytes; summed into `BackupManifest::total_bytes` by `finalize`.
/// NOTE(review): likely the stored (compressed) size given `uncompressed_bytes` — confirm.
pub size_bytes: u64,
/// Presumably the size of the segment before compression.
pub uncompressed_bytes: u64,
/// Timestamp used to lower the owning queue's `first_message_timestamp`.
/// NOTE(review): unit is not established in this file — confirm.
pub first_timestamp: Option<i64>,
/// Timestamp used to raise the owning queue's `last_message_timestamp`.
/// NOTE(review): unit is not established in this file — confirm.
pub last_timestamp: Option<i64>,
/// Checksum of the segment; algorithm and encoding are not visible here — TODO confirm.
pub checksum: String,
}
/// One backed-up message together with its properties, headers and routing
/// information.
///
/// NOTE(review): this type is not used elsewhere in this file; the per-field
/// descriptions are inferred from names and AMQP terminology — confirm against
/// the code that writes records.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupRecord {
/// Message payload bytes; the meaning of `None` is not established here — TODO confirm.
pub body: Option<Vec<u8>>,
/// Message properties.
pub properties: BackupProperties,
/// Message headers as ordered `(name, value)` pairs.
pub headers: Vec<(String, BackupHeaderValue)>,
/// Presumably the exchange the message was published to.
pub exchange: String,
/// Presumably the routing key the message was published with.
pub routing_key: String,
/// Presumably the broker-assigned delivery tag at the time of backup.
pub delivery_tag: u64,
/// Presumably the broker's redelivered flag at the time of backup.
pub redelivered: bool,
/// Presumably the time the record was captured; unit not established here — TODO confirm.
pub backed_up_at: i64,
/// Presumably the name of the queue the message was read from.
pub source_queue: String,
/// Presumably the vhost of the queue the message was read from.
pub source_vhost: String,
}
/// Optional per-message properties carried by a `BackupRecord`.
///
/// Every field is optional and the derived `Default` yields all `None`.
/// NOTE(review): the field names appear to mirror the AMQP 0-9-1 basic
/// properties — confirm the mapping against the code that fills this struct.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BackupProperties {
pub content_type: Option<String>,
pub content_encoding: Option<String>,
pub delivery_mode: Option<u8>,
pub priority: Option<u8>,
pub correlation_id: Option<String>,
pub reply_to: Option<String>,
pub expiration: Option<String>,
pub message_id: Option<String>,
// NOTE(review): unit (seconds vs. milliseconds) is not established in this file — confirm.
pub timestamp: Option<i64>,
// Presumably the AMQP `type` property, renamed because `type` is a Rust keyword — TODO confirm.
pub type_field: Option<String>,
pub user_id: Option<String>,
pub app_id: Option<String>,
pub cluster_id: Option<String>,
}
/// A single header value stored in `BackupRecord::headers`.
///
/// `Table` and `Array` are recursive, so header values can nest.
/// NOTE(review): the variants appear to mirror AMQP field-table value types —
/// confirm against the conversion code. Several variants share a payload type
/// (`Short` / `ShortInt` are both `i16`; `Long` / `LongLongInt` / `Timestamp`
/// are all `i64`; `LongStringBytes` / `Bytes` are both `Vec<u8>`), so the
/// variant name, not the payload type, distinguishes them when serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BackupHeaderValue {
LongString(String),
// Presumably a long string whose bytes are not valid UTF-8 — TODO confirm.
LongStringBytes(Vec<u8>),
ShortString(String),
Long(i64),
Short(i16),
ShortShortInt(i8),
ShortShortUInt(u8),
ShortInt(i16),
ShortUInt(u16),
LongInt(i32),
LongUInt(u32),
LongLongInt(i64),
Bool(bool),
Bytes(Vec<u8>),
Timestamp(i64),
Float(f32),
Double(f64),
// NOTE(review): how `scale` and `value` combine into a number is not shown here — confirm.
Decimal { scale: u8, value: u32 },
Void,
// Nested table as ordered `(name, value)` pairs.
Table(Vec<(String, BackupHeaderValue)>),
// Nested list of values.
Array(Vec<BackupHeaderValue>),
}