danube_client/
producer.rs

1use crate::{
2    errors::Result, message_router::MessageRouter, topic_producer::TopicProducer, DanubeClient,
3    Schema, SchemaType,
4};
5
6use danube_core::dispatch_strategy::ConfigDispatchStrategy;
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10
11/// Represents a message producer responsible for sending messages to partitioned or non-partitioned topics distributed across message brokers.
12///
13/// The `Producer` struct is designed to handle the creation and management of a producer instance that sends messages to either partitioned or non-partitioned topics.
14/// It manages the producer's state and ensures that messages are sent according to the configured settings.
15#[derive(Debug)]
16pub struct Producer {
17    client: DanubeClient,
18    topic_name: String,
19    schema: Schema,
20    dispatch_strategy: ConfigDispatchStrategy,
21    producer_name: String,
22    partitions: Option<usize>,
23    message_router: Option<MessageRouter>,
24    producers: Arc<Mutex<Vec<TopicProducer>>>,
25    producer_options: ProducerOptions,
26}
27
28impl Producer {
29    pub(crate) fn new(
30        client: DanubeClient,
31        topic_name: String,
32        schema: Option<Schema>,
33        dispatch_strategy: Option<ConfigDispatchStrategy>,
34        producer_name: String,
35        partitions: Option<usize>,
36        message_router: Option<MessageRouter>,
37        producer_options: ProducerOptions,
38    ) -> Self {
39        // default schema is String if not specified
40        let schema = if let Some(sch) = schema {
41            sch
42        } else {
43            Schema::new("string_schema".into(), SchemaType::String)
44        };
45
46        let dispatch_strategy = if let Some(retention) = dispatch_strategy {
47            retention
48        } else {
49            ConfigDispatchStrategy::default()
50        };
51
52        Producer {
53            client,
54            topic_name,
55            schema,
56            dispatch_strategy,
57            producer_name,
58            partitions,
59            message_router,
60            producers: Arc::new(Mutex::new(Vec::new())),
61            producer_options,
62        }
63    }
64
65    /// Initializes the producer and registers it with the message brokers.
66    ///
67    /// This asynchronous method sets up the producer by establishing connections with the message brokers and configuring it for sending messages to the specified topic.
68    /// It is responsible for creating the necessary resources for producers handling partitioned topics.
69    pub async fn create(&mut self) -> Result<()> {
70        let mut topic_producers: Vec<_> = match self.partitions {
71            None => {
72                // Create a single TopicProducer for non-partitioned topic
73                vec![TopicProducer::new(
74                    self.client.clone(),
75                    self.topic_name.clone(),
76                    self.producer_name.clone(),
77                    self.schema.clone(),
78                    self.dispatch_strategy.clone(),
79                    self.producer_options.clone(),
80                )]
81            }
82            Some(partitions) => {
83                if self.message_router.is_none() {
84                    self.message_router = Some(MessageRouter::new(partitions));
85                };
86
87                (0..partitions)
88                    .map(|partition_id| {
89                        let topic = format!("{}-part-{}", self.topic_name, partition_id);
90                        TopicProducer::new(
91                            self.client.clone(),
92                            topic,
93                            format!("{}-{}", self.producer_name, partition_id),
94                            self.schema.clone(),
95                            self.dispatch_strategy.clone(),
96                            self.producer_options.clone(),
97                        )
98                    })
99                    .collect()
100            }
101        };
102
103        for topic_producer in &mut topic_producers {
104            let _prod_id = topic_producer.create().await?;
105        }
106
107        // ensure that the producers are added only if all topic_producers are succesfully created
108        let mut producers = self.producers.lock().await;
109        *producers = topic_producers;
110
111        Ok(())
112    }
113
114    /// Sends a message to the topic associated with this producer.
115    ///
116    /// It handles the serialization of the payload and any user-defined attributes. This method assumes that the producer has been successfully initialized and is ready to send messages.
117    ///
118    /// # Parameters
119    ///
120    /// - `data`: The message payload to be sent. This should be a `Vec<u8>` representing the content of the message.
121    /// - `attributes`: Optional user-defined properties or attributes associated with the message. This is a `HashMap<String, String>` where keys and values represent the attribute names and values, respectively.
122    ///
123    /// # Returns
124    ///
125    /// - `Ok(u64)`: The sequence ID of the sent message if the operation is successful. This ID can be used for tracking and acknowledging the message.
126    /// - `Err(e)`: An error if message sending fails. Possible reasons for failure include network issues, serialization errors, or broker-related problems.
127    pub async fn send(
128        &self,
129        data: Vec<u8>,
130        attributes: Option<HashMap<String, String>>,
131    ) -> Result<u64> {
132        use crate::retry_manager::RetryManager;
133        use crate::errors::DanubeError;
134
135        let next_partition = match self.partitions {
136            Some(_) => self
137                .message_router
138                .as_ref()
139                .expect("already initialized")
140                .round_robin(),
141
142            None => 0,
143        };
144
145        // Create retry manager for producers
146        let retry_manager = RetryManager::new(
147            self.producer_options.max_retries,
148            self.producer_options.base_backoff_ms,
149            self.producer_options.max_backoff_ms,
150        );
151
152        let mut attempts = 0;
153        let max_retries = if self.producer_options.max_retries == 0 { 5 } else { self.producer_options.max_retries };
154
155        loop {
156            let send_result = {
157                let mut producers = self.producers.lock().await;
158                producers[next_partition].send(data.clone(), attributes.clone()).await
159            };
160
161            match send_result {
162                Ok(sequence_id) => return Ok(sequence_id),
163                Err(error) => {
164                    // Check if this is an unrecoverable error (e.g., stream client not initialized)
165                    if matches!(error, DanubeError::Unrecoverable(_)) {
166                        eprintln!("Unrecoverable error detected in producer send, attempting recreation: {:?}", error);
167                        
168                        // Attempt to recreate the producer for unrecoverable errors
169                        let recreate_result = {
170                            let mut producers = self.producers.lock().await;
171                            producers[next_partition].create().await
172                        };
173                        
174                        match recreate_result {
175                            Ok(_) => {
176                                eprintln!("Producer recreation successful after unrecoverable error, continuing...");
177                                attempts = 0; // Reset attempts after successful recreation
178                                continue; // Go back to sending
179                            }
180                            Err(e) => {
181                                eprintln!("Producer recreation failed after unrecoverable error: {:?}", e);
182                                return Err(e); // Return error if recreation fails
183                            }
184                        }
185                    }
186                    
187                    // Failed to send, check if retryable
188                    if retry_manager.is_retryable_error(&error) {
189                        attempts += 1;
190                        if attempts > max_retries {
191                            eprintln!("Max retries exceeded for producer send, attempting broker lookup and recreation");
192                            
193                            // Attempt broker lookup and producer recreation
194                            let lookup_and_recreate_result = {
195                                let mut producers = self.producers.lock().await;
196                                let producer = &mut producers[next_partition];
197                                
198                                // Perform lookup and reconnect
199                                if let Ok(new_addr) = producer.client.lookup_service.handle_lookup(&producer.client.uri, &producer.topic).await {
200                                    producer.client.uri = new_addr;
201                                    producer.connect(&producer.client.uri.clone()).await?;
202                                    // Recreate producer on new connection
203                                    producer.create().await
204                                } else {
205                                    Err(error)
206                                }
207                            };
208                            
209                            match lookup_and_recreate_result {
210                                Ok(_) => {
211                                    eprintln!("Broker lookup and producer recreation successful, continuing...");
212                                    attempts = 0; // Reset attempts after successful recreation
213                                    continue; // Go back to sending
214                                }
215                                Err(e) => {
216                                    eprintln!("Broker lookup and producer recreation failed: {:?}", e);
217                                    return Err(e); // Return error if recreation fails
218                                }
219                            }
220                        }
221                        let backoff = retry_manager.calculate_backoff(attempts - 1);
222                        tokio::time::sleep(backoff).await;
223                    } else {
224                        eprintln!("Non-retryable error in producer send: {:?}", error);
225                        return Err(error); // Non-retryable error
226                    }
227                }
228            }
229        }
230    }
231}
232
233/// A builder for creating a new `Producer` instance.
234///
235/// `ProducerBuilder` provides a fluent API for configuring and instantiating a `Producer`.
236/// It allows you to set various properties that define how the producer will behave and interact with the message broker.
237#[derive(Debug, Clone)]
238pub struct ProducerBuilder {
239    client: DanubeClient,
240    topic: Option<String>,
241    num_partitions: Option<usize>,
242    producer_name: Option<String>,
243    schema: Option<Schema>,
244    dispatch_strategy: Option<ConfigDispatchStrategy>,
245    producer_options: ProducerOptions,
246}
247
248impl ProducerBuilder {
249    pub fn new(client: &DanubeClient) -> Self {
250        ProducerBuilder {
251            client: client.clone(),
252            topic: None,
253            num_partitions: None,
254            producer_name: None,
255            schema: None,
256            dispatch_strategy: None,
257            producer_options: ProducerOptions::default(),
258        }
259    }
260
261    /// Sets the topic name for the producer. This is a required field.
262    ///
263    /// This method specifies the topic that the producer will send messages to. It must be set before creating the producer.
264    ///
265    /// # Parameters
266    ///
267    /// - `topic`: The name of the topic for the producer. This should be a non-empty string that corresponds to an existing or new topic.
268    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
269        self.topic = Some(topic.into());
270        self
271    }
272
273    /// Sets the name of the producer. This is a required field.
274    ///
275    /// This method specifies the name to be assigned to the producer instance. It must be set before creating the producer.
276    ///
277    /// # Parameters
278    ///
279    /// - `producer_name`: The name assigned to the producer instance. This should be a non-empty string used for identifying the producer.
280    pub fn with_name(mut self, producer_name: impl Into<String>) -> Self {
281        self.producer_name = Some(producer_name.into());
282        self
283    }
284
285    /// Sets the schema for the producer, defining the structure of the messages.
286    ///
287    /// This method configures the schema used by the producer to serialize messages. The schema specifies how messages are structured and interpreted.
288    /// It is especially important for ensuring that messages adhere to a specific format and can be properly deserialized by consumers.
289    ///
290    /// # Parameters
291    ///
292    /// - `schema_name`: The name of the schema. This should be a non-empty string that identifies the schema.
293    ///
294    /// - `schema_type`: The type of the schema, which determines the format of the data:
295    ///   - `SchemaType::Bytes`: Indicates that the schema uses raw byte data.
296    ///   - `SchemaType::String`: Indicates that the schema uses string data.
297    ///   - `SchemaType::Int64`: Indicates that the schema uses 64-bit integer data.
298    ///   - `SchemaType::Json(String)`: Indicates that the schema uses JSON data. The `String` contains the JSON schema definition.
299    pub fn with_schema(mut self, schema_name: String, schema_type: SchemaType) -> Self {
300        self.schema = Some(Schema::new(schema_name, schema_type));
301        self
302    }
303
304    /// Sets the reliable dispatch options for the producer.
305    /// This method configures the dispatch strategy for the producer, which determines how messages are stored and managed.
306    /// The dispatch strategy defines how long messages are retained and how they are managed in the message broker.
307    ///
308    /// # Parameters
309    ///
310    /// No parameters; broker uses defaults for reliable topics.
311    pub fn with_reliable_dispatch(mut self) -> Self {
312        let dispatch_strategy = ConfigDispatchStrategy::Reliable;
313        self.dispatch_strategy = Some(dispatch_strategy);
314        self
315    }
316
317    /// Sets the configuration options for the producer, allowing customization of producer behavior.
318    ///
319    /// This method allows you to specify various configuration options that affect how the producer operates.
320    /// These options can control aspects such as retries, timeouts, and other producer-specific settings.
321    ///
322    /// # Parameters
323    ///
324    /// - `options`: A `ProducerOptions` instance containing the configuration options for the producer. This should be configured according to the desired behavior and requirements of the producer.
325    pub fn with_options(mut self, options: ProducerOptions) -> Self {
326        self.producer_options = options;
327        self
328    }
329
330    /// Sets the number of partitions for the topic.
331    ///
332    /// This method specifies how many partitions the topic should have. Partitions are used to distribute the load of messages across multiple Danube brokers, which can help with parallel processing and scalability.
333    ///
334    /// # Parameters
335    ///
336    /// - `partitions`: The number of partitions for the topic. This should be a positive integer representing the desired number of partitions. More partitions can improve parallelism and throughput. Default is 0 = non-partitioned topic.
337    pub fn with_partitions(mut self, partitions: usize) -> Self {
338        self.num_partitions = Some(partitions);
339        self
340    }
341
342    /// Creates a new `Producer` instance using the settings configured in the `ProducerBuilder`.
343    ///
344    /// This method performs validation to ensure that all required fields are set before creating the `Producer`. Once validation is successful, it constructs and returns a new `Producer` instance configured with the specified settings.
345    ///
346    /// # Returns
347    ///
348    /// - A `Producer` instance if the builder configuration is valid and the producer is created successfully.
349    ///
350    /// # Example
351    ///
352    /// let producer = ProducerBuilder::new()
353    ///     .with_topic("my-topic")
354    ///     .with_name("my-producer")
355    ///     .with_partitions(3)
356    ///     .with_schema("my-schema".to_string(), SchemaType::Json("schema-definition".to_string()))
357    ///     .build()?;
358    ///
359    pub fn build(self) -> Producer {
360        let topic_name = self
361            .topic
362            .expect("can't create a producer without assigning to a topic");
363        let producer_name = self
364            .producer_name
365            .expect("you should provide a name to the created producer");
366
367        Producer::new(
368            self.client,
369            topic_name,
370            self.schema,
371            self.dispatch_strategy,
372            producer_name,
373            self.num_partitions,
374            None,
375            self.producer_options,
376        )
377    }
378}
379
/// Tunable settings for a producer, mainly its retry and backoff behaviour.
///
/// `Default` yields an empty `others` string and `0` for every numeric field.
#[derive(Clone, Debug, Default)]
pub struct ProducerOptions {
    /// Reserved for future use.
    pub others: String,
    /// Maximum number of retries for operations like create/send on transient failures.
    /// `Producer::send` treats `0` as `5` for its own attempt counter.
    pub max_retries: usize,
    /// Base backoff in milliseconds for exponential backoff.
    pub base_backoff_ms: u64,
    /// Maximum backoff cap in milliseconds.
    pub max_backoff_ms: u64,
}