Skip to main content

danube_client/
producer.rs

1use crate::{
2    errors::{DanubeError, Result},
3    message_router::MessageRouter,
4    retry_manager::RetryManager,
5    topic_producer::TopicProducer,
6    DanubeClient,
7};
8
9use danube_core::dispatch_strategy::ConfigDispatchStrategy;
10use danube_core::proto::schema_reference::VersionRef;
11use danube_core::proto::SchemaReference;
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::Mutex;
15use tracing::{error, info, warn};
16
/// Represents a message producer responsible for sending messages to partitioned or non-partitioned topics distributed across message brokers.
///
/// The `Producer` struct is designed to handle the creation and management of a producer instance that sends messages to either partitioned or non-partitioned topics.
/// It manages the producer's state and ensures that messages are sent according to the configured settings.
///
/// A `Producer` is inert after construction: `create()` must be called to
/// register the underlying per-topic producers before `send()` is used.
#[derive(Debug)]
pub struct Producer {
    /// Client handle; cloned into every `TopicProducer` built by `create()`.
    client: DanubeClient,
    /// Base topic name. For partitioned topics each partition uses
    /// `"{topic_name}-part-{partition_id}"`.
    topic_name: String,
    /// Optional schema reference handed to every `TopicProducer`.
    schema_ref: Option<SchemaReference>,
    /// Dispatch strategy handed to every `TopicProducer`.
    dispatch_strategy: ConfigDispatchStrategy,
    /// Base producer name. For partitioned topics each partition uses
    /// `"{producer_name}-{partition_id}"`.
    producer_name: String,
    /// Number of partitions; `None` means a non-partitioned topic.
    partitions: Option<usize>,
    /// Router used to pick a partition per message (round-robin). Created by
    /// `create()` for partitioned topics when not supplied at construction.
    message_router: Option<MessageRouter>,
    /// One `TopicProducer` per partition (or a single one for non-partitioned
    /// topics). Empty until `create()` succeeds.
    producers: Arc<Mutex<Vec<TopicProducer>>>,
    /// Retry/backoff settings used by `send()` and passed to each `TopicProducer`.
    producer_options: ProducerOptions,
}
33
34impl Producer {
35    pub(crate) fn new(
36        client: DanubeClient,
37        topic_name: String,
38        schema_ref: Option<SchemaReference>,
39        dispatch_strategy: Option<ConfigDispatchStrategy>,
40        producer_name: String,
41        partitions: Option<usize>,
42        message_router: Option<MessageRouter>,
43        producer_options: ProducerOptions,
44    ) -> Self {
45        let dispatch_strategy = dispatch_strategy.unwrap_or_default();
46
47        Producer {
48            client,
49            topic_name,
50            schema_ref,
51            dispatch_strategy,
52            producer_name,
53            partitions,
54            message_router,
55            producers: Arc::new(Mutex::new(Vec::new())),
56            producer_options,
57        }
58    }
59
60    /// Initializes the producer and registers it with the message brokers.
61    ///
62    /// This asynchronous method sets up the producer by establishing connections with the message brokers and configuring it for sending messages to the specified topic.
63    /// It is responsible for creating the necessary resources for producers handling partitioned topics.
64    pub async fn create(&mut self) -> Result<()> {
65        let mut topic_producers: Vec<_> = match self.partitions {
66            None => {
67                // Create a single TopicProducer for non-partitioned topic
68                vec![TopicProducer::new(
69                    self.client.clone(),
70                    self.topic_name.clone(),
71                    self.producer_name.clone(),
72                    self.schema_ref.clone(),
73                    self.dispatch_strategy.clone(),
74                    self.producer_options.clone(),
75                )]
76            }
77            Some(partitions) => {
78                if self.message_router.is_none() {
79                    self.message_router = Some(MessageRouter::new(partitions));
80                };
81
82                (0..partitions)
83                    .map(|partition_id| {
84                        let topic = format!("{}-part-{}", self.topic_name, partition_id);
85                        TopicProducer::new(
86                            self.client.clone(),
87                            topic,
88                            format!("{}-{}", self.producer_name, partition_id),
89                            self.schema_ref.clone(),
90                            self.dispatch_strategy.clone(),
91                            self.producer_options.clone(),
92                        )
93                    })
94                    .collect()
95            }
96        };
97
98        for topic_producer in &mut topic_producers {
99            let _prod_id = topic_producer.create().await?;
100        }
101
102        // ensure that the producers are added only if all topic_producers are succesfully created
103        let mut producers = self.producers.lock().await;
104        *producers = topic_producers;
105
106        Ok(())
107    }
108
109    /// Sends a message to the topic associated with this producer.
110    ///
111    /// It handles the serialization of the payload and any user-defined attributes. This method assumes that the producer has been successfully initialized and is ready to send messages.
112    ///
113    /// # Parameters
114    ///
115    /// - `data`: The message payload to be sent. This should be a `Vec<u8>` representing the content of the message.
116    /// - `attributes`: Optional user-defined properties or attributes associated with the message. This is a `HashMap<String, String>` where keys and values represent the attribute names and values, respectively.
117    ///
118    /// # Returns
119    ///
120    /// - `Ok(u64)`: The sequence ID of the sent message if the operation is successful. This ID can be used for tracking and acknowledging the message.
121    /// - `Err(e)`: An error if message sending fails. Possible reasons for failure include network issues, serialization errors, or broker-related problems.
122    pub async fn send(
123        &self,
124        data: Vec<u8>,
125        attributes: Option<HashMap<String, String>>,
126    ) -> Result<u64> {
127        let partition = self.select_partition();
128        let retry_manager = RetryManager::new(
129            self.producer_options.max_retries,
130            self.producer_options.base_backoff_ms,
131            self.producer_options.max_backoff_ms,
132        );
133
134        let mut attempts = 0;
135
136        loop {
137            let send_result = {
138                let mut producers = self.producers.lock().await;
139                producers[partition].send(&data, attributes.as_ref()).await
140            };
141
142            match send_result {
143                Ok(sequence_id) => return Ok(sequence_id),
144
145                // Unrecoverable: attempt full recreation
146                Err(ref error) if matches!(error, DanubeError::Unrecoverable(_)) => {
147                    warn!(error = ?error, "unrecoverable error, attempting producer recreation");
148                    self.recreate_producer(partition).await?;
149                    attempts = 0;
150                }
151
152                // Retryable: backoff, then escalate to lookup+recreate after max retries
153                Err(error) if retry_manager.is_retryable_error(&error) => {
154                    attempts += 1;
155                    if attempts > retry_manager.max_retries() {
156                        warn!("max retries exceeded, attempting broker lookup and recreation");
157                        self.lookup_and_recreate(partition, error).await?;
158                        attempts = 0;
159                        continue;
160                    }
161                    let backoff = retry_manager.calculate_backoff(attempts - 1);
162                    tokio::time::sleep(backoff).await;
163                }
164
165                // Non-retryable: bail
166                Err(error) => {
167                    error!(error = ?error, "non-retryable error in producer send");
168                    return Err(error);
169                }
170            }
171        }
172    }
173
174    /// Select the next partition using round-robin, or 0 for non-partitioned topics.
175    fn select_partition(&self) -> usize {
176        match self.partitions {
177            Some(_) => self
178                .message_router
179                .as_ref()
180                .expect("message_router must be initialized for partitioned topics")
181                .round_robin(),
182            None => 0,
183        }
184    }
185
186    /// Recreate a single topic producer (e.g., after an unrecoverable error).
187    async fn recreate_producer(&self, partition: usize) -> Result<()> {
188        let mut producers = self.producers.lock().await;
189        producers[partition].create().await?;
190        info!("producer recreation successful");
191        Ok(())
192    }
193
194    /// Look up a new broker and recreate the topic producer on the new connection.
195    /// On lookup failure, returns the `original_error` from the failed send.
196    async fn lookup_and_recreate(
197        &self,
198        partition: usize,
199        original_error: DanubeError,
200    ) -> Result<()> {
201        let mut producers = self.producers.lock().await;
202        let producer = &mut producers[partition];
203
204        let new_addr = producer
205            .client
206            .lookup_service
207            .handle_lookup(&producer.broker_addr, &producer.topic)
208            .await
209            .map_err(|_| original_error)?;
210
211        producer.broker_addr = new_addr;
212        producer.create().await?;
213        info!("broker lookup and producer recreation successful");
214        Ok(())
215    }
216}
217
/// A builder for creating a new `Producer` instance.
///
/// `ProducerBuilder` provides a fluent API for configuring and instantiating a `Producer`.
/// It allows you to set various properties that define how the producer will behave and interact with the message broker.
///
/// `topic` and `producer_name` are mandatory: `build()` fails when either is unset.
#[derive(Debug, Clone)]
pub struct ProducerBuilder {
    /// Client the resulting producer will use (cloned from the one passed to `new`).
    client: DanubeClient,
    /// Target topic name; required by `build()`.
    topic: Option<String>,
    /// Number of partitions; `None` means a non-partitioned topic, `Some(0)` is rejected by `build()`.
    num_partitions: Option<usize>,
    /// Producer name; required by `build()`.
    producer_name: Option<String>,
    /// Schema registry reference set through the `with_schema_*` methods, if any.
    schema_ref: Option<SchemaReference>,
    /// Dispatch strategy; when `None` the producer falls back to the strategy's default.
    dispatch_strategy: Option<ConfigDispatchStrategy>,
    /// Retry/backoff options forwarded to the producer.
    producer_options: ProducerOptions,
}
235
236impl ProducerBuilder {
237    pub fn new(client: &DanubeClient) -> Self {
238        ProducerBuilder {
239            client: client.clone(),
240            topic: None,
241            num_partitions: None,
242            producer_name: None,
243            schema_ref: None,
244            dispatch_strategy: None,
245            producer_options: ProducerOptions::default(),
246        }
247    }
248
249    /// Sets the topic name for the producer. This is a required field.
250    ///
251    /// This method specifies the topic that the producer will send messages to. It must be set before creating the producer.
252    ///
253    /// # Parameters
254    ///
255    /// - `topic`: The name of the topic for the producer. This should be a non-empty string that corresponds to an existing or new topic.
256    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
257        self.topic = Some(topic.into());
258        self
259    }
260
261    /// Sets the name of the producer. This is a required field.
262    ///
263    /// This method specifies the name to be assigned to the producer instance. It must be set before creating the producer.
264    ///
265    /// # Parameters
266    ///
267    /// - `producer_name`: The name assigned to the producer instance. This should be a non-empty string used for identifying the producer.
268    pub fn with_name(mut self, producer_name: impl Into<String>) -> Self {
269        self.producer_name = Some(producer_name.into());
270        self
271    }
272
273    // ===== Schema Registry Methods =====
274
275    /// Set schema by subject name (uses latest version)
276    ///
277    /// The producer will reference the latest schema version for the given subject.
278    /// The schema must be registered in the schema registry before use.
279    ///
280    /// # Example
281    /// ```no_run
282    /// # use danube_client::DanubeClient;
283    /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
284    /// let mut producer = client.new_producer()
285    ///     .with_topic("user-events")
286    ///     .with_name("my-producer")
287    ///     .with_schema_subject("user-events-value")  // Uses latest version
288    ///     .build()?;
289    /// # Ok(())
290    /// # }
291    /// ```
292    pub fn with_schema_subject(mut self, subject: impl Into<String>) -> Self {
293        self.schema_ref = Some(SchemaReference {
294            subject: subject.into(),
295            version_ref: Some(VersionRef::UseLatest(true)),
296        });
297        self
298    }
299
300    /// Set schema with a pinned version
301    ///
302    /// The producer will use a specific schema version and won't automatically
303    /// upgrade to newer versions.
304    ///
305    /// # Example
306    /// ```no_run
307    /// # use danube_client::DanubeClient;
308    /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
309    /// let mut producer = client.new_producer()
310    ///     .with_topic("user-events")
311    ///     .with_name("my-producer")
312    ///     .with_schema_version("user-events-value", 2)  // Pin to version 2
313    ///     .build()?;
314    /// # Ok(())
315    /// # }
316    /// ```
317    pub fn with_schema_version(mut self, subject: impl Into<String>, version: u32) -> Self {
318        self.schema_ref = Some(SchemaReference {
319            subject: subject.into(),
320            version_ref: Some(VersionRef::PinnedVersion(version)),
321        });
322        self
323    }
324
325    /// Set schema with a minimum version requirement
326    ///
327    /// The producer will use the specified version or any newer compatible version.
328    ///
329    /// # Example
330    /// ```no_run
331    /// # use danube_client::DanubeClient;
332    /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
333    /// let mut producer = client.new_producer()
334    ///     .with_topic("user-events")
335    ///     .with_name("my-producer")
336    ///     .with_schema_min_version("user-events-value", 2)  // Use v2 or newer
337    ///     .build()?;
338    /// # Ok(())
339    /// # }
340    /// ```
341    pub fn with_schema_min_version(mut self, subject: impl Into<String>, min_version: u32) -> Self {
342        self.schema_ref = Some(SchemaReference {
343            subject: subject.into(),
344            version_ref: Some(VersionRef::MinVersion(min_version)),
345        });
346        self
347    }
348
349    /// Set schema with a custom SchemaReference (advanced use)
350    ///
351    /// This allows full control over schema versioning. For most use cases,
352    /// prefer `with_schema_subject()`, `with_schema_version()`, or `with_schema_min_version()`.
353    ///
354    /// # Example
355    /// ```no_run
356    /// # use danube_client::{DanubeClient, SchemaReference, VersionRef};
357    /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
358    /// let mut producer = client.new_producer()
359    ///     .with_topic("user-events")
360    ///     .with_name("my-producer")
361    ///     .with_schema_reference(SchemaReference {
362    ///         subject: "user-events-value".to_string(),
363    ///         version_ref: Some(VersionRef::PinnedVersion(2)),
364    ///     })
365    ///     .build()?;
366    /// # Ok(())
367    /// # }
368    /// ```
369    pub fn with_schema_reference(mut self, schema_ref: SchemaReference) -> Self {
370        self.schema_ref = Some(schema_ref);
371        self
372    }
373
374    /// Sets the reliable dispatch options for the producer.
375    /// This method configures the dispatch strategy for the producer, which determines how messages are stored and managed.
376    /// The dispatch strategy defines how long messages are retained and how they are managed in the message broker.
377    ///
378    /// # Parameters
379    ///
380    /// No parameters; broker uses defaults for reliable topics.
381    pub fn with_reliable_dispatch(mut self) -> Self {
382        let dispatch_strategy = ConfigDispatchStrategy::Reliable;
383        self.dispatch_strategy = Some(dispatch_strategy);
384        self
385    }
386
387    /// Sets the configuration options for the producer, allowing customization of producer behavior.
388    ///
389    /// This method allows you to specify various configuration options that affect how the producer operates.
390    /// These options can control aspects such as retries, timeouts, and other producer-specific settings.
391    ///
392    /// # Parameters
393    ///
394    /// - `options`: A `ProducerOptions` instance containing the configuration options for the producer. This should be configured according to the desired behavior and requirements of the producer.
395    pub fn with_options(mut self, options: ProducerOptions) -> Self {
396        self.producer_options = options;
397        self
398    }
399
400    /// Sets the number of partitions for the topic.
401    ///
402    /// This method specifies how many partitions the topic should have. Partitions are used to distribute the load of messages across multiple Danube brokers, which can help with parallel processing and scalability.
403    ///
404    /// # Parameters
405    ///
406    /// - `partitions`: The number of partitions for the topic. This should be a positive integer representing the desired number of partitions. More partitions can improve parallelism and throughput. Default is 0 = non-partitioned topic.
407    pub fn with_partitions(mut self, partitions: usize) -> Self {
408        self.num_partitions = Some(partitions);
409        self
410    }
411
412    /// Creates a new `Producer` instance using the settings configured in the `ProducerBuilder`.
413    ///
414    /// This method performs validation to ensure that all required fields are set before creating the `Producer`. Once validation is successful, it constructs and returns a new `Producer` instance configured with the specified settings.
415    ///
416    /// # Returns
417    ///
418    /// - A `Producer` instance if the builder configuration is valid and the producer is created successfully.
419    ///
420    /// # Example
421    /// ```no_run
422    /// # use danube_client::DanubeClient;
423    /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
424    /// let mut producer = client.new_producer()
425    ///     .with_topic("my-topic")
426    ///     .with_name("my-producer")
427    ///     .with_partitions(3)
428    ///     .build()?;
429    /// # Ok(())
430    /// # }
431    /// ```
432    pub fn build(self) -> Result<Producer> {
433        let topic_name = self.topic.ok_or_else(|| {
434            DanubeError::Unrecoverable("topic is required to build a Producer".into())
435        })?;
436        let producer_name = self.producer_name.ok_or_else(|| {
437            DanubeError::Unrecoverable("producer name is required to build a Producer".into())
438        })?;
439
440        if let Some(0) = self.num_partitions {
441            return Err(DanubeError::Unrecoverable(
442                "partitions must be > 0 or omitted for non-partitioned topic".into(),
443            ));
444        }
445
446        Ok(Producer::new(
447            self.client,
448            topic_name,
449            self.schema_ref,
450            self.dispatch_strategy,
451            producer_name,
452            self.num_partitions,
453            None,
454            self.producer_options,
455        ))
456    }
457}
458
/// Configuration options for producers
///
/// The derived `Default` sets every field to `0`.
/// NOTE(review): how `RetryManager` interprets zero values is not visible in this file — confirm before relying on the defaults.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ProducerOptions {
    /// Maximum number of retries for operations like create/send on transient failures.
    /// In `Producer::send`, exceeding this count triggers a broker lookup and producer recreation.
    pub max_retries: usize,
    /// Base backoff in milliseconds for exponential backoff
    pub base_backoff_ms: u64,
    /// Maximum backoff cap in milliseconds
    pub max_backoff_ms: u64,
}