use crate::{
errors::{DanubeError, Result},
retry_manager::{status_to_danube_error, RetryManager},
DanubeClient, ProducerOptions,
};
use danube_core::proto::{
health_check_request::ClientType, producer_service_client::ProducerServiceClient,
schema_reference::VersionRef, DispatchStrategy as ProtoDispatchStrategy, ProducerAccessMode,
ProducerRequest, SchemaReference, StreamMessage as ProtoStreamMessage,
};
use danube_core::{
dispatch_strategy::ConfigDispatchStrategy,
message::{MessageID, StreamMessage},
};
use std::collections::HashMap;
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
};
use std::time::{SystemTime, UNIX_EPOCH};
use tonic::transport::Uri;
use tracing::warn;
/// Connection lifecycle of a [`TopicProducer`].
///
/// Bundling the gRPC client and the broker-assigned producer id inside the
/// `Ready` variant makes a half-initialized state unrepresentable: both
/// exist only after registration succeeded.
#[derive(Debug)]
enum ProducerState {
    /// Not yet registered with a broker (initial state).
    Disconnected,
    /// Registered and able to send messages.
    Ready {
        stream_client: ProducerServiceClient<tonic::transport::Channel>,
        producer_id: u64,
    },
}
/// Client-side handle for a single producer bound to one topic.
///
/// Constructed disconnected via `new()`; `create()` registers it with the
/// broker (switching `state` to `Ready`), after which `send()` can publish.
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) struct TopicProducer {
    /// Shared client providing lookup, auth, health-check and connection services.
    pub(crate) client: DanubeClient,
    /// Topic this producer publishes to.
    pub(crate) topic: String,
    producer_name: String,
    /// Monotonic counter used to stamp each outgoing request with a unique id.
    request_id: AtomicU64,
    /// Optional schema subject/version requested by the user.
    schema_ref: Option<SchemaReference>,
    /// Resolved schema id, cached during `create()` so sends need no lookup.
    schema_id: Option<u64>,
    /// Resolved schema version, cached alongside `schema_id`.
    schema_version: Option<u32>,
    dispatch_strategy: ConfigDispatchStrategy,
    /// User options; also the source of the retry/backoff policy below.
    producer_options: ProducerOptions,
    state: ProducerState,
    /// Address of the broker owning the topic (updated after lookup).
    pub(crate) broker_addr: Uri,
    /// Address actually dialed; differs from `broker_addr` when proxied.
    pub(crate) connect_url: Uri,
    /// Whether requests go through a proxy (set from lookup results).
    pub(crate) proxy: bool,
    /// Shared flag used to stop the background health-check task.
    stop_signal: Arc<AtomicBool>,
    retry_manager: RetryManager,
}
impl TopicProducer {
pub(crate) fn new(
client: DanubeClient,
topic: String,
producer_name: String,
schema_ref: Option<SchemaReference>,
dispatch_strategy: ConfigDispatchStrategy,
producer_options: ProducerOptions,
) -> Self {
let retry_manager = RetryManager::new(
producer_options.max_retries,
producer_options.base_backoff_ms,
producer_options.max_backoff_ms,
);
let broker_addr = client.uri.clone();
let connect_url = broker_addr.clone();
TopicProducer {
client,
topic,
producer_name,
request_id: AtomicU64::new(0),
schema_ref,
schema_id: None,
schema_version: None,
dispatch_strategy,
producer_options,
state: ProducerState::Disconnected,
broker_addr,
connect_url,
proxy: false,
stop_signal: Arc::new(AtomicBool::new(false)),
retry_manager,
}
}
/// Registers the producer with the broker, retrying on transient failures.
///
/// Each retry re-resolves the topic owner (the broker may have moved) and
/// sleeps an exponentially growing backoff. Non-retryable errors, and
/// errors remaining after `max_retries` attempts, are returned to the
/// caller unchanged.
pub(crate) async fn create(&mut self) -> Result<u64> {
    let mut retries_used = 0;
    loop {
        // Extract the error so the happy path exits immediately.
        let error = match self.try_create().await {
            Ok(producer_id) => return Ok(producer_id),
            Err(e) => e,
        };
        // Give up on permanent errors or once the retry budget is spent.
        if !self.retry_manager.is_retryable_error(&error)
            || retries_used >= self.retry_manager.max_retries()
        {
            return Err(error);
        }
        // The topic may have migrated; refresh the broker address first.
        self.lookup_new_broker().await;
        let backoff = self.retry_manager.calculate_backoff(retries_used);
        retries_used += 1;
        tokio::time::sleep(backoff).await;
    }
}
/// Performs a single producer-registration attempt against the currently
/// selected broker.
///
/// On success the producer transitions to `ProducerState::Ready`, a
/// health check is started for the new producer id, and any configured
/// schema reference is resolved and cached for subsequent sends.
///
/// # Errors
/// Returns the mapped gRPC error when connection, registration, or schema
/// resolution fails; retry decisions are left to the caller (`create`).
async fn try_create(&mut self) -> Result<u64> {
    // Fresh gRPC client for the current broker_addr/connect_url pair.
    let mut stream_client = self.connect().await?;
    let producer_request = ProducerRequest {
        request_id: self.request_id.fetch_add(1, Ordering::SeqCst),
        producer_name: self.producer_name.clone(),
        topic_name: self.topic.clone(),
        schema_ref: self.schema_ref.clone(),
        producer_access_mode: ProducerAccessMode::Shared.into(),
        // Map the client-side strategy enum onto the proto wire enum.
        dispatch_strategy: match &self.dispatch_strategy {
            ConfigDispatchStrategy::NonReliable => ProtoDispatchStrategy::NonReliable as i32,
            ConfigDispatchStrategy::Reliable => ProtoDispatchStrategy::Reliable as i32,
        },
    };
    let mut request = tonic::Request::new(producer_request);
    // Attach the auth token and, when proxied, the target-broker header.
    RetryManager::insert_auth_token(&self.client, &mut request, &self.connect_url).await?;
    RetryManager::insert_proxy_header(&mut request, &self.broker_addr, self.proxy);
    let response = stream_client
        .create_producer(request)
        .await
        .map_err(|status| {
            // Duplicate registration gets a dedicated log line before the
            // status is converted like any other error.
            if status.code() == tonic::Code::AlreadyExists {
                warn!("producer already exists, not allowed to create the same producer twice");
            }
            status_to_danube_error(status)
        })?
        .into_inner();
    let stop_signal = Arc::clone(&self.stop_signal);
    // Health-check start failures are deliberately ignored (`let _`): the
    // producer remains usable even if the watchdog could not be started.
    let _ = self
        .client
        .health_check_service
        .start_health_check(
            &self.connect_url,
            &self.broker_addr,
            self.proxy,
            ClientType::Producer,
            response.producer_id,
            stop_signal,
        )
        .await;
    // Resolve and cache schema metadata once so `send()` does not have to
    // consult the schema service per message.
    if let Some(schema_ref) = self.schema_ref.clone() {
        let (schema_id, schema_version) = self.resolve_schema_metadata(&schema_ref).await?;
        self.schema_id = Some(schema_id);
        self.schema_version = Some(schema_version);
        tracing::debug!(
            "Producer '{}' cached schema: id={}, version={}",
            self.producer_name,
            schema_id,
            schema_version
        );
    }
    self.state = ProducerState::Ready {
        stream_client,
        producer_id: response.producer_id,
    };
    Ok(response.producer_id)
}
/// Re-resolves the topic's owning broker via the lookup service and, on
/// success, updates the broker/dial addresses and proxy flag in place.
///
/// Lookup failures are swallowed on purpose: the previous addresses are
/// kept and the surrounding retry loop simply tries again later.
async fn lookup_new_broker(&mut self) {
    let lookup_result = self
        .client
        .lookup_service
        .handle_lookup(&self.connect_url, &self.topic)
        .await;
    let Ok(resolved) = lookup_result else {
        return;
    };
    self.broker_addr = resolved.broker_url;
    self.connect_url = resolved.connect_url;
    self.proxy = resolved.proxy;
}
/// Publishes one message payload to the topic and returns the request id
/// acknowledged by the broker.
///
/// # Arguments
/// * `data` - raw message payload, copied into the outgoing message.
/// * `attributes` - optional user metadata attached to the message.
///
/// # Errors
/// `DanubeError::Unrecoverable` if `create()` has not completed (producer
/// is still `Disconnected`); otherwise any gRPC failure mapped through
/// `status_to_danube_error`.
pub(crate) async fn send(
    &mut self,
    data: &[u8],
    attributes: Option<&HashMap<String, String>>,
) -> Result<u64> {
    // Bind the client as `mut` so it can be used directly below; the
    // original code cloned this already-owned clone a second time just to
    // obtain a mutable receiver (clippy: redundant_clone).
    let (mut stream_client, producer_id) = match &self.state {
        ProducerState::Ready {
            stream_client,
            producer_id,
        } => (stream_client.clone(), *producer_id),
        ProducerState::Disconnected => {
            return Err(DanubeError::Unrecoverable(
                "Send: producer is not connected".into(),
            ));
        }
    };
    // Publish timestamp in milliseconds since the epoch. The expect can
    // only fire if the system clock reads earlier than 1970.
    let publish_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
        .as_millis() as u64;
    // topic_offset is assigned broker-side; 0 is a placeholder here.
    let msg_id = MessageID {
        producer_id,
        topic_name: self.topic.clone(),
        broker_addr: self.broker_addr.to_string(),
        topic_offset: 0,
    };
    let send_message = StreamMessage {
        request_id: self.request_id.fetch_add(1, Ordering::SeqCst),
        msg_id,
        payload: data.to_vec(),
        publish_time,
        producer_name: self.producer_name.clone(),
        subscription_name: None,
        attributes: attributes.cloned().unwrap_or_default(),
        // Schema metadata cached by `try_create()`, if a schema was set.
        schema_id: self.schema_id,
        schema_version: self.schema_version,
    };
    let req: ProtoStreamMessage = send_message.into();
    let mut request = tonic::Request::new(req);
    RetryManager::insert_auth_token(&self.client, &mut request, &self.connect_url).await?;
    RetryManager::insert_proxy_header(&mut request, &self.broker_addr, self.proxy);
    let response = stream_client
        .send_message(request)
        .await
        .map_err(status_to_danube_error)?;
    Ok(response.into_inner().request_id)
}
/// Obtains (or reuses) a channel to the current broker from the connection
/// manager and wraps it in a producer service client.
async fn connect(&self) -> Result<ProducerServiceClient<tonic::transport::Channel>> {
    let connection = self
        .client
        .cnx_manager
        .get_connection(&self.broker_addr, &self.connect_url)
        .await?;
    // Channels are cheap to clone; each clone shares the same transport.
    let channel = connection.grpc_cnx.clone();
    Ok(ProducerServiceClient::new(channel))
}
/// Translates a `SchemaReference` into the concrete `(schema_id, version)`
/// pair to stamp on outgoing messages.
///
/// * `PinnedVersion(v)` — errors if `v` is newer than the registry's
///   latest; otherwise resolves exactly version `v`.
/// * `MinVersion(v)` — uses the latest version, erroring if it is below `v`.
/// * `UseLatest` / no version ref — always uses the latest version.
///
/// # Errors
/// `DanubeError::SchemaError` when the version constraint cannot be
/// satisfied; schema-service failures are propagated unchanged.
async fn resolve_schema_metadata(
    &mut self,
    schema_ref: &SchemaReference,
) -> Result<(u64, u32)> {
    let schema_client = self.client.schema();
    let subject = &schema_ref.subject;
    match &schema_ref.version_ref {
        Some(VersionRef::PinnedVersion(pinned_version)) => {
            tracing::debug!(
                "Resolving pinned version {} for subject '{}'",
                pinned_version,
                subject
            );
            let latest = schema_client.get_latest_schema(subject).await?;
            // A pin beyond the newest registered version can never resolve.
            if *pinned_version > latest.version {
                return Err(DanubeError::SchemaError(format!(
                    "Pinned version {} does not exist for subject '{}'. Latest version is {}",
                    pinned_version, subject, latest.version
                )));
            }
            if *pinned_version == latest.version {
                // Pin matches the newest version; skip the extra fetch.
                Ok((latest.schema_id, latest.version))
            } else {
                let pinned = schema_client
                    .get_schema_version(latest.schema_id, Some(*pinned_version))
                    .await?;
                Ok((pinned.schema_id, pinned.version))
            }
        }
        Some(VersionRef::MinVersion(min_version)) => {
            tracing::debug!(
                "Resolving minimum version {} for subject '{}'",
                min_version,
                subject
            );
            let latest = schema_client.get_latest_schema(subject).await?;
            if latest.version >= *min_version {
                return Ok((latest.schema_id, latest.version));
            }
            Err(DanubeError::SchemaError(format!(
                "Latest version {} does not meet minimum version requirement {} for subject '{}'",
                latest.version, min_version, subject
            )))
        }
        Some(VersionRef::UseLatest(_)) | None => {
            tracing::debug!(
                "Resolving latest version for subject '{}'",
                subject
            );
            let latest = schema_client.get_latest_schema(subject).await?;
            Ok((latest.schema_id, latest.version))
        }
    }
}
}