danube_client/producer.rs
1use crate::{
2 errors::{DanubeError, Result},
3 message_router::MessageRouter,
4 retry_manager::RetryManager,
5 topic_producer::TopicProducer,
6 DanubeClient,
7};
8
9use danube_core::dispatch_strategy::ConfigDispatchStrategy;
10use danube_core::proto::schema_reference::VersionRef;
11use danube_core::proto::SchemaReference;
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::Mutex;
15use tracing::{error, info, warn};
16
17/// Represents a message producer responsible for sending messages to partitioned or non-partitioned topics distributed across message brokers.
18///
19/// The `Producer` struct is designed to handle the creation and management of a producer instance that sends messages to either partitioned or non-partitioned topics.
20/// It manages the producer's state and ensures that messages are sent according to the configured settings.
21#[derive(Debug)]
22pub struct Producer {
23 client: DanubeClient,
24 topic_name: String,
25 schema_ref: Option<SchemaReference>,
26 dispatch_strategy: ConfigDispatchStrategy,
27 producer_name: String,
28 partitions: Option<usize>,
29 message_router: Option<MessageRouter>,
30 producers: Arc<Mutex<Vec<TopicProducer>>>,
31 producer_options: ProducerOptions,
32}
33
34impl Producer {
35 pub(crate) fn new(
36 client: DanubeClient,
37 topic_name: String,
38 schema_ref: Option<SchemaReference>,
39 dispatch_strategy: Option<ConfigDispatchStrategy>,
40 producer_name: String,
41 partitions: Option<usize>,
42 message_router: Option<MessageRouter>,
43 producer_options: ProducerOptions,
44 ) -> Self {
45 let dispatch_strategy = dispatch_strategy.unwrap_or_default();
46
47 Producer {
48 client,
49 topic_name,
50 schema_ref,
51 dispatch_strategy,
52 producer_name,
53 partitions,
54 message_router,
55 producers: Arc::new(Mutex::new(Vec::new())),
56 producer_options,
57 }
58 }
59
60 /// Initializes the producer and registers it with the message brokers.
61 ///
62 /// This asynchronous method sets up the producer by establishing connections with the message brokers and configuring it for sending messages to the specified topic.
63 /// It is responsible for creating the necessary resources for producers handling partitioned topics.
64 pub async fn create(&mut self) -> Result<()> {
65 let mut topic_producers: Vec<_> = match self.partitions {
66 None => {
67 // Create a single TopicProducer for non-partitioned topic
68 vec![TopicProducer::new(
69 self.client.clone(),
70 self.topic_name.clone(),
71 self.producer_name.clone(),
72 self.schema_ref.clone(),
73 self.dispatch_strategy.clone(),
74 self.producer_options.clone(),
75 )]
76 }
77 Some(partitions) => {
78 if self.message_router.is_none() {
79 self.message_router = Some(MessageRouter::new(partitions));
80 };
81
82 (0..partitions)
83 .map(|partition_id| {
84 let topic = format!("{}-part-{}", self.topic_name, partition_id);
85 TopicProducer::new(
86 self.client.clone(),
87 topic,
88 format!("{}-{}", self.producer_name, partition_id),
89 self.schema_ref.clone(),
90 self.dispatch_strategy.clone(),
91 self.producer_options.clone(),
92 )
93 })
94 .collect()
95 }
96 };
97
98 for topic_producer in &mut topic_producers {
99 let _prod_id = topic_producer.create().await?;
100 }
101
102 // ensure that the producers are added only if all topic_producers are succesfully created
103 let mut producers = self.producers.lock().await;
104 *producers = topic_producers;
105
106 Ok(())
107 }
108
109 /// Sends a message to the topic associated with this producer.
110 ///
111 /// It handles the serialization of the payload and any user-defined attributes. This method assumes that the producer has been successfully initialized and is ready to send messages.
112 ///
113 /// # Parameters
114 ///
115 /// - `data`: The message payload to be sent. This should be a `Vec<u8>` representing the content of the message.
116 /// - `attributes`: Optional user-defined properties or attributes associated with the message. This is a `HashMap<String, String>` where keys and values represent the attribute names and values, respectively.
117 ///
118 /// # Returns
119 ///
120 /// - `Ok(u64)`: The sequence ID of the sent message if the operation is successful. This ID can be used for tracking and acknowledging the message.
121 /// - `Err(e)`: An error if message sending fails. Possible reasons for failure include network issues, serialization errors, or broker-related problems.
122 pub async fn send(
123 &self,
124 data: Vec<u8>,
125 attributes: Option<HashMap<String, String>>,
126 ) -> Result<u64> {
127 let partition = self.select_partition();
128 let retry_manager = RetryManager::new(
129 self.producer_options.max_retries,
130 self.producer_options.base_backoff_ms,
131 self.producer_options.max_backoff_ms,
132 );
133
134 let mut attempts = 0;
135
136 loop {
137 let send_result = {
138 let mut producers = self.producers.lock().await;
139 producers[partition].send(&data, attributes.as_ref()).await
140 };
141
142 match send_result {
143 Ok(sequence_id) => return Ok(sequence_id),
144
145 // Unrecoverable: attempt full recreation
146 Err(ref error) if matches!(error, DanubeError::Unrecoverable(_)) => {
147 warn!(error = ?error, "unrecoverable error, attempting producer recreation");
148 self.recreate_producer(partition).await?;
149 attempts = 0;
150 }
151
152 // Retryable: backoff, then escalate to lookup+recreate after max retries
153 Err(error) if retry_manager.is_retryable_error(&error) => {
154 attempts += 1;
155 if attempts > retry_manager.max_retries() {
156 warn!("max retries exceeded, attempting broker lookup and recreation");
157 self.lookup_and_recreate(partition, error).await?;
158 attempts = 0;
159 continue;
160 }
161 let backoff = retry_manager.calculate_backoff(attempts - 1);
162 tokio::time::sleep(backoff).await;
163 }
164
165 // Non-retryable: bail
166 Err(error) => {
167 error!(error = ?error, "non-retryable error in producer send");
168 return Err(error);
169 }
170 }
171 }
172 }
173
174 /// Select the next partition using round-robin, or 0 for non-partitioned topics.
175 fn select_partition(&self) -> usize {
176 match self.partitions {
177 Some(_) => self
178 .message_router
179 .as_ref()
180 .expect("message_router must be initialized for partitioned topics")
181 .round_robin(),
182 None => 0,
183 }
184 }
185
186 /// Recreate a single topic producer (e.g., after an unrecoverable error).
187 async fn recreate_producer(&self, partition: usize) -> Result<()> {
188 let mut producers = self.producers.lock().await;
189 producers[partition].create().await?;
190 info!("producer recreation successful");
191 Ok(())
192 }
193
194 /// Look up a new broker and recreate the topic producer on the new connection.
195 /// On lookup failure, returns the `original_error` from the failed send.
196 async fn lookup_and_recreate(
197 &self,
198 partition: usize,
199 original_error: DanubeError,
200 ) -> Result<()> {
201 let mut producers = self.producers.lock().await;
202 let producer = &mut producers[partition];
203
204 let new_addr = producer
205 .client
206 .lookup_service
207 .handle_lookup(&producer.broker_addr, &producer.topic)
208 .await
209 .map_err(|_| original_error)?;
210
211 producer.broker_addr = new_addr;
212 producer.create().await?;
213 info!("broker lookup and producer recreation successful");
214 Ok(())
215 }
216}
217
218/// A builder for creating a new `Producer` instance.
219///
220/// `ProducerBuilder` provides a fluent API for configuring and instantiating a `Producer`.
221/// It allows you to set various properties that define how the producer will behave and interact with the message broker.
222#[derive(Debug, Clone)]
223pub struct ProducerBuilder {
224 client: DanubeClient,
225 topic: Option<String>,
226 num_partitions: Option<usize>,
227 producer_name: Option<String>,
228 // TODO Phase 4: schema removed
229 // schema: Option<Schema>,
230 // Phase 5: Schema registry support
231 schema_ref: Option<SchemaReference>,
232 dispatch_strategy: Option<ConfigDispatchStrategy>,
233 producer_options: ProducerOptions,
234}
235
236impl ProducerBuilder {
237 pub fn new(client: &DanubeClient) -> Self {
238 ProducerBuilder {
239 client: client.clone(),
240 topic: None,
241 num_partitions: None,
242 producer_name: None,
243 schema_ref: None,
244 dispatch_strategy: None,
245 producer_options: ProducerOptions::default(),
246 }
247 }
248
249 /// Sets the topic name for the producer. This is a required field.
250 ///
251 /// This method specifies the topic that the producer will send messages to. It must be set before creating the producer.
252 ///
253 /// # Parameters
254 ///
255 /// - `topic`: The name of the topic for the producer. This should be a non-empty string that corresponds to an existing or new topic.
256 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
257 self.topic = Some(topic.into());
258 self
259 }
260
261 /// Sets the name of the producer. This is a required field.
262 ///
263 /// This method specifies the name to be assigned to the producer instance. It must be set before creating the producer.
264 ///
265 /// # Parameters
266 ///
267 /// - `producer_name`: The name assigned to the producer instance. This should be a non-empty string used for identifying the producer.
268 pub fn with_name(mut self, producer_name: impl Into<String>) -> Self {
269 self.producer_name = Some(producer_name.into());
270 self
271 }
272
273 // ===== Schema Registry Methods =====
274
275 /// Set schema by subject name (uses latest version)
276 ///
277 /// The producer will reference the latest schema version for the given subject.
278 /// The schema must be registered in the schema registry before use.
279 ///
280 /// # Example
281 /// ```no_run
282 /// # use danube_client::DanubeClient;
283 /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
284 /// let mut producer = client.new_producer()
285 /// .with_topic("user-events")
286 /// .with_name("my-producer")
287 /// .with_schema_subject("user-events-value") // Uses latest version
288 /// .build()?;
289 /// # Ok(())
290 /// # }
291 /// ```
292 pub fn with_schema_subject(mut self, subject: impl Into<String>) -> Self {
293 self.schema_ref = Some(SchemaReference {
294 subject: subject.into(),
295 version_ref: Some(VersionRef::UseLatest(true)),
296 });
297 self
298 }
299
300 /// Set schema with a pinned version
301 ///
302 /// The producer will use a specific schema version and won't automatically
303 /// upgrade to newer versions.
304 ///
305 /// # Example
306 /// ```no_run
307 /// # use danube_client::DanubeClient;
308 /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
309 /// let mut producer = client.new_producer()
310 /// .with_topic("user-events")
311 /// .with_name("my-producer")
312 /// .with_schema_version("user-events-value", 2) // Pin to version 2
313 /// .build()?;
314 /// # Ok(())
315 /// # }
316 /// ```
317 pub fn with_schema_version(mut self, subject: impl Into<String>, version: u32) -> Self {
318 self.schema_ref = Some(SchemaReference {
319 subject: subject.into(),
320 version_ref: Some(VersionRef::PinnedVersion(version)),
321 });
322 self
323 }
324
325 /// Set schema with a minimum version requirement
326 ///
327 /// The producer will use the specified version or any newer compatible version.
328 ///
329 /// # Example
330 /// ```no_run
331 /// # use danube_client::DanubeClient;
332 /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
333 /// let mut producer = client.new_producer()
334 /// .with_topic("user-events")
335 /// .with_name("my-producer")
336 /// .with_schema_min_version("user-events-value", 2) // Use v2 or newer
337 /// .build()?;
338 /// # Ok(())
339 /// # }
340 /// ```
341 pub fn with_schema_min_version(mut self, subject: impl Into<String>, min_version: u32) -> Self {
342 self.schema_ref = Some(SchemaReference {
343 subject: subject.into(),
344 version_ref: Some(VersionRef::MinVersion(min_version)),
345 });
346 self
347 }
348
349 /// Set schema with a custom SchemaReference (advanced use)
350 ///
351 /// This allows full control over schema versioning. For most use cases,
352 /// prefer `with_schema_subject()`, `with_schema_version()`, or `with_schema_min_version()`.
353 ///
354 /// # Example
355 /// ```no_run
356 /// # use danube_client::{DanubeClient, SchemaReference, VersionRef};
357 /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
358 /// let mut producer = client.new_producer()
359 /// .with_topic("user-events")
360 /// .with_name("my-producer")
361 /// .with_schema_reference(SchemaReference {
362 /// subject: "user-events-value".to_string(),
363 /// version_ref: Some(VersionRef::PinnedVersion(2)),
364 /// })
365 /// .build()?;
366 /// # Ok(())
367 /// # }
368 /// ```
369 pub fn with_schema_reference(mut self, schema_ref: SchemaReference) -> Self {
370 self.schema_ref = Some(schema_ref);
371 self
372 }
373
374 /// Sets the reliable dispatch options for the producer.
375 /// This method configures the dispatch strategy for the producer, which determines how messages are stored and managed.
376 /// The dispatch strategy defines how long messages are retained and how they are managed in the message broker.
377 ///
378 /// # Parameters
379 ///
380 /// No parameters; broker uses defaults for reliable topics.
381 pub fn with_reliable_dispatch(mut self) -> Self {
382 let dispatch_strategy = ConfigDispatchStrategy::Reliable;
383 self.dispatch_strategy = Some(dispatch_strategy);
384 self
385 }
386
387 /// Sets the configuration options for the producer, allowing customization of producer behavior.
388 ///
389 /// This method allows you to specify various configuration options that affect how the producer operates.
390 /// These options can control aspects such as retries, timeouts, and other producer-specific settings.
391 ///
392 /// # Parameters
393 ///
394 /// - `options`: A `ProducerOptions` instance containing the configuration options for the producer. This should be configured according to the desired behavior and requirements of the producer.
395 pub fn with_options(mut self, options: ProducerOptions) -> Self {
396 self.producer_options = options;
397 self
398 }
399
400 /// Sets the number of partitions for the topic.
401 ///
402 /// This method specifies how many partitions the topic should have. Partitions are used to distribute the load of messages across multiple Danube brokers, which can help with parallel processing and scalability.
403 ///
404 /// # Parameters
405 ///
406 /// - `partitions`: The number of partitions for the topic. This should be a positive integer representing the desired number of partitions. More partitions can improve parallelism and throughput. Default is 0 = non-partitioned topic.
407 pub fn with_partitions(mut self, partitions: usize) -> Self {
408 self.num_partitions = Some(partitions);
409 self
410 }
411
412 /// Creates a new `Producer` instance using the settings configured in the `ProducerBuilder`.
413 ///
414 /// This method performs validation to ensure that all required fields are set before creating the `Producer`. Once validation is successful, it constructs and returns a new `Producer` instance configured with the specified settings.
415 ///
416 /// # Returns
417 ///
418 /// - A `Producer` instance if the builder configuration is valid and the producer is created successfully.
419 ///
420 /// # Example
421 /// ```no_run
422 /// # use danube_client::DanubeClient;
423 /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
424 /// let mut producer = client.new_producer()
425 /// .with_topic("my-topic")
426 /// .with_name("my-producer")
427 /// .with_partitions(3)
428 /// .build()?;
429 /// # Ok(())
430 /// # }
431 /// ```
432 pub fn build(self) -> Result<Producer> {
433 let topic_name = self.topic.ok_or_else(|| {
434 DanubeError::Unrecoverable("topic is required to build a Producer".into())
435 })?;
436 let producer_name = self.producer_name.ok_or_else(|| {
437 DanubeError::Unrecoverable("producer name is required to build a Producer".into())
438 })?;
439
440 if let Some(0) = self.num_partitions {
441 return Err(DanubeError::Unrecoverable(
442 "partitions must be > 0 or omitted for non-partitioned topic".into(),
443 ));
444 }
445
446 Ok(Producer::new(
447 self.client,
448 topic_name,
449 self.schema_ref,
450 self.dispatch_strategy,
451 producer_name,
452 self.num_partitions,
453 None,
454 self.producer_options,
455 ))
456 }
457}
458
/// Retry and backoff configuration for producers.
///
/// `Default` sets every field to `0`. Because the struct is
/// `#[non_exhaustive]`, code outside this crate should start from
/// `ProducerOptions::default()` and assign the fields it needs.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ProducerOptions {
    /// Maximum number of retries for operations such as create/send on transient failures.
    pub max_retries: usize,
    /// Base delay, in milliseconds, for exponential backoff between retries.
    pub base_backoff_ms: u64,
    /// Upper bound, in milliseconds, on the backoff delay.
    pub max_backoff_ms: u64,
}