use crate::proto::{
producer_service_client::ProducerServiceClient, MessageRequest, MessageResponse,
ProducerAccessMode, ProducerRequest, ProducerResponse,
};
use crate::{
errors::{decode_error_details, DanubeError, Result},
message::{MessageMetadata, SendMessage},
schema::{Schema, SchemaType},
DanubeClient,
};
use std::collections::HashMap;
use std::sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::time::{sleep, Duration};
use tonic::{transport::Uri, Code, Response, Status};
use tracing::warn;
/// A message producer bound to a single topic.
///
/// A value is assembled with [`Producer::new`] (typically through
/// `ProducerBuilder`), registered with a broker by `create`, and then used to
/// publish messages with `send`.
#[derive(Debug)]
// NOTE(review): blanket `dead_code` allowance; presumably present because some
// fields (e.g. `producer_options`) are stored but never read by the code in
// this file — confirm before removing.
#[allow(dead_code)]
pub struct Producer {
/// Client handle providing the broker URI plus the connection manager,
/// lookup service and health-check service used by this producer.
client: DanubeClient,
/// Name of the topic the producer publishes to.
topic: String,
/// Name sent to the broker on creation and stamped into message metadata.
producer_name: String,
/// Id returned by the broker; `None` until `create` succeeds.
producer_id: Option<u64>,
/// Counter used to stamp outgoing requests (creation and send requests).
request_id: AtomicU64,
/// Counter used to stamp the `sequence_id` of each outgoing message.
message_sequence_id: AtomicU64,
/// Schema declared for the topic; when `None`, `create` falls back to a
/// built-in default schema.
schema: Option<Schema>,
/// Additional options supplied at construction time.
producer_options: ProducerOptions,
/// gRPC client for the producer service; `None` until `connect` has run.
stream_client: Option<ProducerServiceClient<tonic::transport::Channel>>,
/// Flag shared with the health-check service started by `create`.
stop_signal: Arc<AtomicBool>,
}
impl Producer {
pub fn new(
client: DanubeClient,
topic: String,
producer_name: String,
schema: Option<Schema>,
producer_options: ProducerOptions,
) -> Self {
Producer {
client,
topic,
producer_name,
producer_id: None,
request_id: AtomicU64::new(0),
message_sequence_id: AtomicU64::new(0),
schema,
producer_options,
stream_client: None,
stop_signal: Arc::new(AtomicBool::new(false)),
}
}
pub async fn create(&mut self) -> Result<u64> {
self.connect(&self.client.uri.clone()).await?;
let mut schema = Schema::new("bytes_schema".into(), SchemaType::String);
if let Some(sch) = self.schema.clone() {
schema = sch;
}
let req = ProducerRequest {
request_id: self.request_id.fetch_add(1, Ordering::SeqCst),
producer_name: self.producer_name.clone(),
topic_name: self.topic.clone(),
schema: Some(schema.into()),
producer_access_mode: ProducerAccessMode::Shared.into(),
};
let max_retries = 4;
let mut attempts = 0;
let mut broker_addr = self.client.uri.clone();
loop {
let request = tonic::Request::new(req.clone());
let mut client = self.stream_client.as_mut().unwrap().clone();
let response: std::result::Result<Response<ProducerResponse>, Status> =
client.create_producer(request).await;
match response {
Ok(resp) => {
let response = resp.into_inner();
self.producer_id = Some(response.producer_id);
let stop_signal = Arc::clone(&self.stop_signal);
let _ = self
.client
.health_check_service
.start_health_check(&broker_addr, 0, response.producer_id, stop_signal)
.await;
return Ok(response.producer_id);
}
Err(status) => {
let error_message = decode_error_details(&status);
if status.code() == Code::AlreadyExists {
return Err(DanubeError::FromStatus(status, error_message));
}
attempts += 1;
if attempts >= max_retries {
return Err(DanubeError::FromStatus(status, error_message));
}
if let Some(error_m) = &error_message {
if error_m.error_type != 3 {
return Err(DanubeError::FromStatus(status, error_message));
}
}
sleep(Duration::from_secs(2)).await;
match self
.client
.lookup_service
.handle_lookup(&broker_addr, &self.topic)
.await
{
Ok(addr) => {
broker_addr = addr.clone();
self.connect(&addr).await?;
self.client.uri = addr;
}
Err(err) => {
if let Some(status) = err.extract_status() {
if let Some(error_message) = decode_error_details(status) {
if error_message.error_type != 3 {
return Err(DanubeError::FromStatus(
status.to_owned(),
Some(error_message),
));
}
}
} else {
warn!("Lookup request failed with error: {}", err);
return Err(DanubeError::Unrecoverable(format!(
"Lookup failed with error: {}",
err
)));
}
}
}
}
};
}
}
/// Publishes one message and returns the `sequence_id` carried in the
/// broker's response.
///
/// `data` is the message payload. `attributes` are optional key/value pairs
/// attached to the message metadata; `None` is sent as an empty map. The
/// metadata also carries the producer name, the next message sequence id and
/// the publish time in milliseconds since the UNIX epoch.
///
/// # Errors
///
/// * `DanubeError::Unrecoverable` when the producer has no id or no gRPC
///   client yet (i.e. `create` has not completed successfully), or when the
///   system clock reports a time before the UNIX epoch.
/// * `DanubeError::FromStatus` when the broker rejects the message.
pub async fn send(
    &self,
    data: Vec<u8>,
    attributes: Option<HashMap<String, String>>,
) -> Result<u64> {
    // Validate state first so that a premature call neither panics nor
    // consumes a sequence/request id.
    let producer_id = self.producer_id.ok_or_else(|| {
        DanubeError::Unrecoverable(
            "Producer ID should be set before sending messages".to_string(),
        )
    })?;
    let mut client = self.stream_client.clone().ok_or_else(|| {
        DanubeError::Unrecoverable(
            "producer has no active connection to the broker".to_string(),
        )
    })?;

    // Milliseconds since the epoch fit in a u64 for any realistic date, so the
    // narrowing cast from u128 cannot truncate in practice.
    let publish_time = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|err| {
            DanubeError::Unrecoverable(format!(
                "system clock is set before the UNIX epoch: {}",
                err
            ))
        })?
        .as_millis() as u64;

    let meta_data = MessageMetadata {
        producer_name: self.producer_name.clone(),
        sequence_id: self.message_sequence_id.fetch_add(1, Ordering::SeqCst),
        publish_time,
        attributes: attributes.unwrap_or_default(),
    };
    let send_message = SendMessage {
        request_id: self.request_id.fetch_add(1, Ordering::SeqCst),
        producer_id,
        metadata: Some(meta_data),
        message: data,
    };
    let req: MessageRequest = send_message.to_proto();

    let response: std::result::Result<Response<MessageResponse>, Status> =
        client.send_message(tonic::Request::new(req)).await;
    match response {
        Ok(resp) => Ok(resp.into_inner().sequence_id),
        Err(status) => {
            // Decode before moving `status` into the error value.
            let decoded_message = decode_error_details(&status);
            Err(DanubeError::FromStatus(status, decoded_message))
        }
    }
}
/// Obtains a connection for `addr` from the client's connection manager and
/// installs a new producer service client built on it, replacing any previous
/// one.
///
/// `addr` is passed to the connection manager for both of its address
/// parameters. Any error from the connection manager is returned unchanged.
async fn connect(&mut self, addr: &Uri) -> Result<()> {
    let connection = self.client.cnx_manager.get_connection(addr, addr).await?;
    self.stream_client = Some(ProducerServiceClient::new(connection.grpc_cnx.clone()));
    Ok(())
}
}
/// Step-by-step configuration for a `Producer`.
///
/// The topic and the producer name are mandatory: `build` panics when either
/// is missing. Schema and options are optional.
#[derive(Debug, Clone)]
pub struct ProducerBuilder {
/// Client handle (a clone of the one given to `new`) handed to the producer.
client: DanubeClient,
/// Topic to publish to; must be set before `build`.
topic: Option<String>,
/// Producer name; must be set before `build`.
producer_name: Option<String>,
/// Optional schema forwarded to the producer.
schema: Option<Schema>,
/// Producer options; defaults to `ProducerOptions::default()`.
producer_options: ProducerOptions,
}
impl ProducerBuilder {
    /// Starts a builder holding a clone of `client`, with no topic, name or
    /// schema and with default producer options.
    pub fn new(client: &DanubeClient) -> Self {
        Self {
            client: client.clone(),
            topic: None,
            producer_name: None,
            schema: None,
            producer_options: ProducerOptions::default(),
        }
    }

    /// Sets the topic the producer will publish to.
    pub fn with_topic(self, topic: impl Into<String>) -> Self {
        Self {
            topic: Some(topic.into()),
            ..self
        }
    }

    /// Sets the producer's name.
    pub fn with_name(self, producer_name: impl Into<String>) -> Self {
        Self {
            producer_name: Some(producer_name.into()),
            ..self
        }
    }

    /// Sets the schema, built from `schema_name` and `schema_type`.
    pub fn with_schema(self, schema_name: String, schema_type: SchemaType) -> Self {
        let schema = Schema::new(schema_name, schema_type);
        Self {
            schema: Some(schema),
            ..self
        }
    }

    /// Replaces the producer options.
    pub fn with_options(self, options: ProducerOptions) -> Self {
        Self {
            producer_options: options,
            ..self
        }
    }

    /// Consumes the builder and constructs the `Producer`.
    ///
    /// # Panics
    ///
    /// Panics when no topic was set, or (checked second) when no producer
    /// name was set.
    pub fn build(self) -> Producer {
        let Self {
            client,
            topic,
            producer_name,
            schema,
            producer_options,
        } = self;

        let topic = topic.expect("can't create a producer without assigning to a topic");
        let producer_name =
            producer_name.expect("you should provide a name to the created producer");

        Producer::new(client, topic, producer_name, schema, producer_options)
    }
}
/// Extra configuration carried by a `Producer`.
///
/// The default value has an empty `others` string.
#[derive(Debug, Clone, Default)]
pub struct ProducerOptions {
/// Free-form option value. NOTE(review): not read by any code visible in
/// this file — presumably a placeholder for future options; confirm.
pub others: String,
}