danube_client/
producer.rs

1use crate::{
2    errors::Result,
3    message_router::MessageRouter,
4    topic_producer::TopicProducer,
5    DanubeClient,
6    // TODO Phase 4: Schema removed
7    // Schema, SchemaType,
8};
9
10use danube_core::dispatch_strategy::ConfigDispatchStrategy;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::Mutex;
14
15/// Represents a message producer responsible for sending messages to partitioned or non-partitioned topics distributed across message brokers.
16///
17/// The `Producer` struct is designed to handle the creation and management of a producer instance that sends messages to either partitioned or non-partitioned topics.
18/// It manages the producer's state and ensures that messages are sent according to the configured settings.
19use danube_core::proto::SchemaReference as ProtoSchemaReference;
20
21#[derive(Debug)]
22pub struct Producer {
23    client: DanubeClient,
24    topic_name: String,
25    schema_ref: Option<ProtoSchemaReference>,
26    dispatch_strategy: ConfigDispatchStrategy,
27    producer_name: String,
28    partitions: Option<usize>,
29    message_router: Option<MessageRouter>,
30    producers: Arc<Mutex<Vec<TopicProducer>>>,
31    producer_options: ProducerOptions,
32}
33
34impl Producer {
35    pub(crate) fn new(
36        client: DanubeClient,
37        topic_name: String,
38        schema_ref: Option<ProtoSchemaReference>,
39        dispatch_strategy: Option<ConfigDispatchStrategy>,
40        producer_name: String,
41        partitions: Option<usize>,
42        message_router: Option<MessageRouter>,
43        producer_options: ProducerOptions,
44    ) -> Self {
45        let dispatch_strategy = if let Some(retention) = dispatch_strategy {
46            retention
47        } else {
48            ConfigDispatchStrategy::default()
49        };
50
51        Producer {
52            client,
53            topic_name,
54            schema_ref,
55            dispatch_strategy,
56            producer_name,
57            partitions,
58            message_router,
59            producers: Arc::new(Mutex::new(Vec::new())),
60            producer_options,
61        }
62    }
63
64    /// Initializes the producer and registers it with the message brokers.
65    ///
66    /// This asynchronous method sets up the producer by establishing connections with the message brokers and configuring it for sending messages to the specified topic.
67    /// It is responsible for creating the necessary resources for producers handling partitioned topics.
68    pub async fn create(&mut self) -> Result<()> {
69        let mut topic_producers: Vec<_> = match self.partitions {
70            None => {
71                // Create a single TopicProducer for non-partitioned topic
72                vec![TopicProducer::new(
73                    self.client.clone(),
74                    self.topic_name.clone(),
75                    self.producer_name.clone(),
76                    self.schema_ref.clone(),
77                    self.dispatch_strategy.clone(),
78                    self.producer_options.clone(),
79                )]
80            }
81            Some(partitions) => {
82                if self.message_router.is_none() {
83                    self.message_router = Some(MessageRouter::new(partitions));
84                };
85
86                (0..partitions)
87                    .map(|partition_id| {
88                        let topic = format!("{}-part-{}", self.topic_name, partition_id);
89                        TopicProducer::new(
90                            self.client.clone(),
91                            topic,
92                            format!("{}-{}", self.producer_name, partition_id),
93                            self.schema_ref.clone(),
94                            self.dispatch_strategy.clone(),
95                            self.producer_options.clone(),
96                        )
97                    })
98                    .collect()
99            }
100        };
101
102        for topic_producer in &mut topic_producers {
103            let _prod_id = topic_producer.create().await?;
104        }
105
106        // ensure that the producers are added only if all topic_producers are succesfully created
107        let mut producers = self.producers.lock().await;
108        *producers = topic_producers;
109
110        Ok(())
111    }
112
113    /// Sends a message to the topic associated with this producer.
114    ///
115    /// It handles the serialization of the payload and any user-defined attributes. This method assumes that the producer has been successfully initialized and is ready to send messages.
116    ///
117    /// # Parameters
118    ///
119    /// - `data`: The message payload to be sent. This should be a `Vec<u8>` representing the content of the message.
120    /// - `attributes`: Optional user-defined properties or attributes associated with the message. This is a `HashMap<String, String>` where keys and values represent the attribute names and values, respectively.
121    ///
122    /// # Returns
123    ///
124    /// - `Ok(u64)`: The sequence ID of the sent message if the operation is successful. This ID can be used for tracking and acknowledging the message.
125    /// - `Err(e)`: An error if message sending fails. Possible reasons for failure include network issues, serialization errors, or broker-related problems.
126    pub async fn send(
127        &self,
128        data: Vec<u8>,
129        attributes: Option<HashMap<String, String>>,
130    ) -> Result<u64> {
131        use crate::errors::DanubeError;
132        use crate::retry_manager::RetryManager;
133
134        let next_partition = match self.partitions {
135            Some(_) => self
136                .message_router
137                .as_ref()
138                .expect("already initialized")
139                .round_robin(),
140
141            None => 0,
142        };
143
144        // Create retry manager for producers
145        let retry_manager = RetryManager::new(
146            self.producer_options.max_retries,
147            self.producer_options.base_backoff_ms,
148            self.producer_options.max_backoff_ms,
149        );
150
151        let mut attempts = 0;
152        let max_retries = if self.producer_options.max_retries == 0 {
153            5
154        } else {
155            self.producer_options.max_retries
156        };
157
158        loop {
159            let send_result = {
160                let mut producers = self.producers.lock().await;
161                producers[next_partition]
162                    .send(data.clone(), attributes.clone())
163                    .await
164            };
165
166            match send_result {
167                Ok(sequence_id) => return Ok(sequence_id),
168                Err(error) => {
169                    // Check if this is an unrecoverable error (e.g., stream client not initialized)
170                    if matches!(error, DanubeError::Unrecoverable(_)) {
171                        eprintln!("Unrecoverable error detected in producer send, attempting recreation: {:?}", error);
172
173                        // Attempt to recreate the producer for unrecoverable errors
174                        let recreate_result = {
175                            let mut producers = self.producers.lock().await;
176                            producers[next_partition].create().await
177                        };
178
179                        match recreate_result {
180                            Ok(_) => {
181                                eprintln!("Producer recreation successful after unrecoverable error, continuing...");
182                                attempts = 0; // Reset attempts after successful recreation
183                                continue; // Go back to sending
184                            }
185                            Err(e) => {
186                                eprintln!(
187                                    "Producer recreation failed after unrecoverable error: {:?}",
188                                    e
189                                );
190                                return Err(e); // Return error if recreation fails
191                            }
192                        }
193                    }
194
195                    // Failed to send, check if retryable
196                    if retry_manager.is_retryable_error(&error) {
197                        attempts += 1;
198                        if attempts > max_retries {
199                            eprintln!("Max retries exceeded for producer send, attempting broker lookup and recreation");
200
201                            // Attempt broker lookup and producer recreation
202                            let lookup_and_recreate_result = {
203                                let mut producers = self.producers.lock().await;
204                                let producer = &mut producers[next_partition];
205
206                                // Perform lookup and reconnect
207                                if let Ok(new_addr) = producer
208                                    .client
209                                    .lookup_service
210                                    .handle_lookup(&producer.client.uri, &producer.topic)
211                                    .await
212                                {
213                                    producer.client.uri = new_addr;
214                                    producer.connect(&producer.client.uri.clone()).await?;
215                                    // Recreate producer on new connection
216                                    producer.create().await
217                                } else {
218                                    Err(error)
219                                }
220                            };
221
222                            match lookup_and_recreate_result {
223                                Ok(_) => {
224                                    eprintln!("Broker lookup and producer recreation successful, continuing...");
225                                    attempts = 0; // Reset attempts after successful recreation
226                                    continue; // Go back to sending
227                                }
228                                Err(e) => {
229                                    eprintln!(
230                                        "Broker lookup and producer recreation failed: {:?}",
231                                        e
232                                    );
233                                    return Err(e); // Return error if recreation fails
234                                }
235                            }
236                        }
237                        let backoff = retry_manager.calculate_backoff(attempts - 1);
238                        tokio::time::sleep(backoff).await;
239                    } else {
240                        eprintln!("Non-retryable error in producer send: {:?}", error);
241                        return Err(error); // Non-retryable error
242                    }
243                }
244            }
245        }
246    }
247}
248
249/// A builder for creating a new `Producer` instance.
250///
251/// `ProducerBuilder` provides a fluent API for configuring and instantiating a `Producer`.
252/// It allows you to set various properties that define how the producer will behave and interact with the message broker.
253use danube_core::proto::SchemaReference;
254
255#[derive(Debug, Clone)]
256pub struct ProducerBuilder {
257    client: DanubeClient,
258    topic: Option<String>,
259    num_partitions: Option<usize>,
260    producer_name: Option<String>,
261    // TODO Phase 4: schema removed
262    // schema: Option<Schema>,
263    // Phase 5: Schema registry support
264    schema_ref: Option<SchemaReference>,
265    dispatch_strategy: Option<ConfigDispatchStrategy>,
266    producer_options: ProducerOptions,
267}
268
269impl ProducerBuilder {
270    pub fn new(client: &DanubeClient) -> Self {
271        ProducerBuilder {
272            client: client.clone(),
273            topic: None,
274            num_partitions: None,
275            producer_name: None,
276            schema_ref: None,
277            dispatch_strategy: None,
278            producer_options: ProducerOptions::default(),
279        }
280    }
281
282    /// Sets the topic name for the producer. This is a required field.
283    ///
284    /// This method specifies the topic that the producer will send messages to. It must be set before creating the producer.
285    ///
286    /// # Parameters
287    ///
288    /// - `topic`: The name of the topic for the producer. This should be a non-empty string that corresponds to an existing or new topic.
289    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
290        self.topic = Some(topic.into());
291        self
292    }
293
294    /// Sets the name of the producer. This is a required field.
295    ///
296    /// This method specifies the name to be assigned to the producer instance. It must be set before creating the producer.
297    ///
298    /// # Parameters
299    ///
300    /// - `producer_name`: The name assigned to the producer instance. This should be a non-empty string used for identifying the producer.
301    pub fn with_name(mut self, producer_name: impl Into<String>) -> Self {
302        self.producer_name = Some(producer_name.into());
303        self
304    }
305
306    // ===== Schema Registry Methods =====
307
308    /// Set schema by subject name (uses latest version)
309    ///
310    /// The producer will reference the latest schema version for the given subject.
311    /// The schema must be registered in the schema registry before use.
312    ///
313    /// # Example
314    /// ```no_run
315    /// # use danube_client::DanubeClient;
316    /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
317    /// let producer = client.producer()
318    ///     .with_topic("user-events")
319    ///     .with_schema_subject("user-events-value")  // Uses latest version
320    ///     .build();
321    /// # Ok(())
322    /// # }
323    /// ```
324    pub fn with_schema_subject(mut self, subject: impl Into<String>) -> Self {
325        use danube_core::proto::schema_reference::VersionRef;
326
327        self.schema_ref = Some(SchemaReference {
328            subject: subject.into(),
329            version_ref: Some(VersionRef::UseLatest(true)),
330        });
331        self
332    }
333
334    /// Set schema with a pinned version
335    ///
336    /// The producer will use a specific schema version and won't automatically
337    /// upgrade to newer versions.
338    ///
339    /// # Example
340    /// ```no_run
341    /// # use danube_client::DanubeClient;
342    /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
343    /// let producer = client.producer()
344    ///     .with_topic("user-events")
345    ///     .with_schema_version("user-events-value", 2)  // Pin to version 2
346    ///     .build();
347    /// # Ok(())
348    /// # }
349    /// ```
350    pub fn with_schema_version(mut self, subject: impl Into<String>, version: u32) -> Self {
351        use danube_core::proto::schema_reference::VersionRef;
352
353        self.schema_ref = Some(SchemaReference {
354            subject: subject.into(),
355            version_ref: Some(VersionRef::PinnedVersion(version)),
356        });
357        self
358    }
359
360    /// Set schema with a minimum version requirement
361    ///
362    /// The producer will use the specified version or any newer compatible version.
363    ///
364    /// # Example
365    /// ```no_run
366    /// # use danube_client::DanubeClient;
367    /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
368    /// let producer = client.producer()
369    ///     .with_topic("user-events")
370    ///     .with_schema_min_version("user-events-value", 2)  // Use v2 or newer
371    ///     .build();
372    /// # Ok(())
373    /// # }
374    /// ```
375    pub fn with_schema_min_version(mut self, subject: impl Into<String>, min_version: u32) -> Self {
376        use danube_core::proto::schema_reference::VersionRef;
377
378        self.schema_ref = Some(SchemaReference {
379            subject: subject.into(),
380            version_ref: Some(VersionRef::MinVersion(min_version)),
381        });
382        self
383    }
384
385    /// Set schema with a custom SchemaReference (advanced use)
386    ///
387    /// This allows full control over schema versioning. For most use cases,
388    /// prefer `with_schema_subject()`, `with_schema_version()`, or `with_schema_min_version()`.
389    ///
390    /// # Example
391    /// ```no_run
392    /// # use danube_client::{DanubeClient, SchemaReference, VersionRef};
393    /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
394    /// let producer = client.producer()
395    ///     .with_topic("user-events")
396    ///     .with_schema_reference(SchemaReference {
397    ///         subject: "user-events-value".to_string(),
398    ///         version_ref: Some(VersionRef::PinnedVersion(2)),
399    ///     })
400    ///     .build();
401    /// # Ok(())
402    /// # }
403    /// ```
404    pub fn with_schema_reference(mut self, schema_ref: SchemaReference) -> Self {
405        self.schema_ref = Some(schema_ref);
406        self
407    }
408
409    /// Sets the reliable dispatch options for the producer.
410    /// This method configures the dispatch strategy for the producer, which determines how messages are stored and managed.
411    /// The dispatch strategy defines how long messages are retained and how they are managed in the message broker.
412    ///
413    /// # Parameters
414    ///
415    /// No parameters; broker uses defaults for reliable topics.
416    pub fn with_reliable_dispatch(mut self) -> Self {
417        let dispatch_strategy = ConfigDispatchStrategy::Reliable;
418        self.dispatch_strategy = Some(dispatch_strategy);
419        self
420    }
421
422    /// Sets the configuration options for the producer, allowing customization of producer behavior.
423    ///
424    /// This method allows you to specify various configuration options that affect how the producer operates.
425    /// These options can control aspects such as retries, timeouts, and other producer-specific settings.
426    ///
427    /// # Parameters
428    ///
429    /// - `options`: A `ProducerOptions` instance containing the configuration options for the producer. This should be configured according to the desired behavior and requirements of the producer.
430    pub fn with_options(mut self, options: ProducerOptions) -> Self {
431        self.producer_options = options;
432        self
433    }
434
435    /// Sets the number of partitions for the topic.
436    ///
437    /// This method specifies how many partitions the topic should have. Partitions are used to distribute the load of messages across multiple Danube brokers, which can help with parallel processing and scalability.
438    ///
439    /// # Parameters
440    ///
441    /// - `partitions`: The number of partitions for the topic. This should be a positive integer representing the desired number of partitions. More partitions can improve parallelism and throughput. Default is 0 = non-partitioned topic.
442    pub fn with_partitions(mut self, partitions: usize) -> Self {
443        self.num_partitions = Some(partitions);
444        self
445    }
446
447    /// Creates a new `Producer` instance using the settings configured in the `ProducerBuilder`.
448    ///
449    /// This method performs validation to ensure that all required fields are set before creating the `Producer`. Once validation is successful, it constructs and returns a new `Producer` instance configured with the specified settings.
450    ///
451    /// # Returns
452    ///
453    /// - A `Producer` instance if the builder configuration is valid and the producer is created successfully.
454    ///
455    /// # Example
456    ///
457    /// let producer = ProducerBuilder::new()
458    ///     .with_topic("my-topic")
459    ///     .with_name("my-producer")
460    ///     .with_partitions(3)
461    ///     .with_schema("my-schema".to_string(), SchemaType::Json("schema-definition".to_string()))
462    ///     .build()?;
463    ///
464    pub fn build(self) -> Producer {
465        let topic_name = self
466            .topic
467            .expect("can't create a producer without assigning to a topic");
468        let producer_name = self
469            .producer_name
470            .expect("you should provide a name to the created producer");
471
472        Producer::new(
473            self.client,
474            topic_name,
475            self.schema_ref, // Phase 5: SchemaReference support
476            self.dispatch_strategy,
477            producer_name,
478            self.num_partitions,
479            None,
480            self.producer_options,
481        )
482    }
483}
484
/// Configuration options for producers.
///
/// All fields are public and the type implements `Default`, so individual settings
/// can be overridden with struct-update syntax.
#[derive(Debug, Clone, Default)]
pub struct ProducerOptions {
    /// Reserved for future use.
    pub others: String,
    /// Maximum number of retries for operations like create/send on transient
    /// failures. `Producer::send` falls back to 5 when this is left at 0.
    pub max_retries: usize,
    /// Base backoff in milliseconds for exponential backoff.
    pub base_backoff_ms: u64,
    /// Maximum backoff cap in milliseconds.
    pub max_backoff_ms: u64,
}