use std::collections::HashMap;
use std::time::Duration;
use rdkafka::client::ClientContext;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use uuid::Uuid;
use crate::backend::PublisherImpl;
use crate::error::Result;
use crate::metrics;
use crate::publisher_internal::validate_headers;
use crate::retry::Backoff;
use crate::topic::Topic;
use crate::{QueueTopology, ShoveError};
use super::client::KafkaClient;
use super::constants::{MESSAGE_ID_HEADER, RETRY_COUNT_HEADER};
/// Attempt budget handed to `publish_with_retry` by the single-message publish
/// paths (`KafkaPublisher::publish` and `KafkaPublisher::publish_with_headers`).
const MAX_PUBLISH_ATTEMPTS: u32 = 3;
/// Timeout passed to `FutureProducer::send` on every attempt made by
/// `publish_with_retry`.
///
/// NOTE(review): per the rdkafka documentation this argument bounds how long
/// `send` waits when the local producer queue is full; it is not an end-to-end
/// delivery deadline. Confirm against the rdkafka version in use.
const PRODUCE_TIMEOUT: Duration = Duration::from_secs(5);
pub(super) async fn publish_with_retry<C>(
producer: &FutureProducer<C>,
topic: &str,
key: Option<&[u8]>,
headers: OwnedHeaders,
payload: &[u8],
max_attempts: u32,
label: &str,
) -> Result<()>
where
C: ClientContext + 'static,
{
let mut backoff = Backoff::new(Duration::from_millis(100), Duration::from_secs(2));
for attempt in 1..=max_attempts {
let mut record = FutureRecord::to(topic)
.payload(payload)
.headers(headers.clone());
if let Some(k) = key {
record = record.key(k);
}
match producer.send(record, PRODUCE_TIMEOUT).await {
Ok(_) => return Ok(()),
Err((e, _)) => {
if attempt == max_attempts {
metrics::record_backend_error(
metrics::BackendLabel::Kafka,
metrics::BackendErrorKind::Publish,
);
return Err(ShoveError::Connection(format!(
"{label} failed after {max_attempts} attempts: {e}"
)));
}
let delay = backoff.next().unwrap_or(Duration::from_secs(2));
tracing::warn!(attempt, error = %e, "{label} failed, retrying");
tokio::time::sleep(delay).await;
}
}
}
unreachable!()
}
/// Publisher that sends topic messages to Kafka through a [`KafkaClient`].
///
/// Cloning yields another publisher holding a clone of the wrapped client.
#[derive(Clone)]
pub struct KafkaPublisher {
// Client every publish call goes through (`publish_with_retry` is invoked on it).
client: KafkaClient,
}
impl KafkaPublisher {
    /// Creates a publisher that sends through `client`.
    ///
    /// The body performs no I/O and always returns `Ok`.
    pub async fn new(client: KafkaClient) -> Result<Self> {
        let publisher = Self { client };
        Ok(publisher)
    }

    /// Works out where a message goes and which record key it carries.
    ///
    /// Returns the name reported by `topology.queue()` together with the bytes
    /// of the key derived by `T::SEQUENCE_KEY_FN`, or `None` for the key when
    /// the topic defines no such function.
    fn resolve_topic_and_key<T: Topic>(
        topology: &'static QueueTopology,
        message: &T::Message,
    ) -> (String, Option<Vec<u8>>) {
        let destination = topology.queue().to_string();
        let record_key = T::SEQUENCE_KEY_FN
            .map(|derive_key| derive_key(message))
            .map(|sequence_key| sequence_key.into_bytes());
        (destination, record_key)
    }

    /// Assembles the headers attached to an outgoing record.
    ///
    /// Every record gets a freshly generated v4 UUID under `MESSAGE_ID_HEADER`
    /// and the value `0` under `RETRY_COUNT_HEADER`. Entries of `extra`, when
    /// present, are appended afterwards in the map's iteration order.
    fn build_headers(extra: Option<&HashMap<String, String>>) -> OwnedHeaders {
        let message_id = Uuid::new_v4().to_string();
        let base = OwnedHeaders::new()
            .insert(Header {
                key: MESSAGE_ID_HEADER,
                value: Some(message_id.as_bytes()),
            })
            .insert(Header {
                key: RETRY_COUNT_HEADER,
                value: Some(b"0"),
            });

        // `Option::into_iter` + `flatten` walks the map's entries, or nothing
        // at all when no extra headers were supplied.
        extra
            .into_iter()
            .flatten()
            .fold(base, |headers, (name, value)| {
                headers.insert(Header {
                    key: name.as_str(),
                    value: Some(value.as_bytes()),
                })
            })
    }
}
impl KafkaPublisher {
    /// Serializes `message` as JSON and publishes it to the topic's queue.
    ///
    /// The record carries only the default headers and is sent with up to
    /// `MAX_PUBLISH_ATTEMPTS` attempts.
    ///
    /// # Errors
    /// Fails if JSON serialization fails or if the client reports that the
    /// send ultimately failed.
    pub async fn publish<T: Topic>(&self, message: &T::Message) -> Result<()> {
        let body = serde_json::to_vec(message)?;
        let (destination, record_key) = Self::resolve_topic_and_key::<T>(T::topology(), message);
        let headers = Self::build_headers(None);

        self.client
            .publish_with_retry(
                &destination,
                record_key.as_deref(),
                headers,
                &body,
                MAX_PUBLISH_ATTEMPTS,
                "publish",
            )
            .await
    }

    /// Same as [`KafkaPublisher::publish`], but appends `extra_headers` to the
    /// default headers.
    ///
    /// # Errors
    /// Fails if `validate_headers` rejects `extra_headers` (checked before
    /// anything else happens), if JSON serialization fails, or if the client
    /// reports that the send ultimately failed.
    pub async fn publish_with_headers<T: Topic>(
        &self,
        message: &T::Message,
        extra_headers: HashMap<String, String>,
    ) -> Result<()> {
        validate_headers(&extra_headers)?;

        let body = serde_json::to_vec(message)?;
        let (destination, record_key) = Self::resolve_topic_and_key::<T>(T::topology(), message);
        let headers = Self::build_headers(Some(&extra_headers));

        self.client
            .publish_with_retry(
                &destination,
                record_key.as_deref(),
                headers,
                &body,
                MAX_PUBLISH_ATTEMPTS,
                "publish_with_headers",
            )
            .await
    }

    /// Publishes every message in `messages`, driving all sends concurrently.
    ///
    /// All messages are serialized first; if any of them fails to serialize,
    /// nothing is sent and `(0, Err(..))` is returned. Otherwise each message
    /// is sent with a single attempt, and the result is the number of sends
    /// that succeeded paired with `Ok(())` or the first error in message order.
    pub async fn publish_batch<T: Topic>(&self, messages: &[T::Message]) -> (u64, Result<()>) {
        use futures_util::future::join_all;

        let topology = T::topology();

        // Phase 1: prepare every record up front so a serialization failure
        // aborts the batch before anything reaches the broker.
        let mut prepared = Vec::with_capacity(messages.len());
        for msg in messages {
            let payload = match serde_json::to_vec(msg) {
                Ok(bytes) => bytes,
                Err(e) => return (0, Err(e.into())),
            };
            let (topic, key) = Self::resolve_topic_and_key::<T>(topology, msg);
            let headers = Self::build_headers(None);
            prepared.push((topic, key, headers, payload));
        }

        // Phase 2: one future per record, each owning its prepared data.
        let client = &self.client;
        let sends = prepared
            .into_iter()
            .map(move |(topic, key, headers, payload)| async move {
                client
                    .publish_with_retry(
                        &topic,
                        key.as_deref(),
                        headers,
                        &payload,
                        1,
                        "batch publish",
                    )
                    .await
            });
        let outcomes: Vec<Result<()>> = join_all(sends).await;

        // Phase 3: count successes and keep only the earliest error.
        let (succeeded, first_err) = outcomes.into_iter().fold(
            (0u64, None::<ShoveError>),
            |(ok_count, earliest), outcome| match outcome {
                Ok(()) => (ok_count + 1, earliest),
                Err(e) => (ok_count, earliest.or(Some(e))),
            },
        );

        (succeeded, first_err.map_or(Ok(()), Err))
    }
}
// Backend trait implementation: every method forwards to the inherent
// `KafkaPublisher` method of the same name and returns its future.
impl PublisherImpl for KafkaPublisher {
/// Forwards to the inherent `KafkaPublisher::publish`.
fn publish<T: Topic>(&self, msg: &T::Message) -> impl Future<Output = Result<()>> + Send {
// The inherent and trait methods share a name; the explicit type path picks
// the inherent one (inherent items take precedence), so this does not recurse.
KafkaPublisher::publish::<T>(self, msg)
}
/// Forwards to the inherent `KafkaPublisher::publish_with_headers`.
fn publish_with_headers<T: Topic>(
&self,
msg: &T::Message,
headers: HashMap<String, String>,
) -> impl Future<Output = Result<()>> + Send {
KafkaPublisher::publish_with_headers::<T>(self, msg, headers)
}
/// Forwards to the inherent `KafkaPublisher::publish_batch`; the `u64` in the
/// output is the number of messages that were sent successfully.
fn publish_batch<T: Topic>(
&self,
msgs: &[T::Message],
) -> impl Future<Output = (u64, Result<()>)> + Send {
KafkaPublisher::publish_batch::<T>(self, msgs)
}
}