use tracing::{debug, info};
use crate::amqp::{AmqpClient, ConfirmStats};
use crate::backup::queue_reader::{to_amqp_properties, to_field_table};
use crate::error::Result;
use crate::manifest::BackupRecord;
const RESTORE_PUBLISH_MANDATORY: bool = true;
fn restore_publish_mandatory() -> bool {
RESTORE_PUBLISH_MANDATORY
}
pub struct Publisher {
client: AmqpClient,
channel_id: u16,
confirms_enabled: bool,
}
impl Publisher {
pub fn new(client: AmqpClient, channel_id: u16) -> Self {
Self {
client,
channel_id,
confirms_enabled: false,
}
}
pub async fn enable_confirms(&mut self) -> Result<()> {
self.client.confirm_select(self.channel_id).await?;
self.confirms_enabled = true;
Ok(())
}
pub async fn publish_record(
&mut self,
record: &BackupRecord,
exchange: &str,
routing_key: &str,
) -> Result<u64> {
let mut props = to_amqp_properties(&record.properties);
if !record.headers.is_empty() {
props = props.with_headers(to_field_table(&record.headers));
}
let body = record.body.as_deref().unwrap_or(&[]);
self.client
.basic_publish(
self.channel_id,
exchange,
routing_key,
restore_publish_mandatory(),
&props,
body,
)
.await
}
pub async fn wait_for_confirms(&mut self, up_to_tag: u64) -> Result<ConfirmStats> {
if self.confirms_enabled && up_to_tag > 0 {
self.client
.wait_for_confirms(self.channel_id, up_to_tag)
.await
} else {
Ok(ConfirmStats::default())
}
}
pub async fn publish_batch(
&mut self,
records: &[BackupRecord],
exchange: &str,
routing_key: &str,
) -> Result<(u64, u64)> {
if records.is_empty() {
return Ok((0, 0));
}
let mut last_tag = 0u64;
for record in records {
last_tag = self.publish_record(record, exchange, routing_key).await?;
}
let confirm_stats = self.wait_for_confirms(last_tag).await?;
let failed = confirm_stats.failed();
let confirmed = (records.len() as u64).saturating_sub(failed);
debug!(
"Batch published: {} confirmed, {} failed ({:?})",
confirmed, failed, confirm_stats
);
Ok((confirmed, failed))
}
pub async fn close(mut self) {
self.client.close_channel(self.channel_id).await.ok();
self.client.close().await.ok();
info!("Publisher closed");
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_restore_publish_uses_mandatory_routing() {
assert!(restore_publish_mandatory());
}
}