danube_client/producer.rs
1use crate::{
2 errors::Result,
3 message_router::MessageRouter,
4 topic_producer::TopicProducer,
5 DanubeClient,
6 // TODO Phase 4: Schema removed
7 // Schema, SchemaType,
8};
9
10use danube_core::dispatch_strategy::ConfigDispatchStrategy;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::Mutex;
14
15/// Represents a message producer responsible for sending messages to partitioned or non-partitioned topics distributed across message brokers.
16///
17/// The `Producer` struct is designed to handle the creation and management of a producer instance that sends messages to either partitioned or non-partitioned topics.
18/// It manages the producer's state and ensures that messages are sent according to the configured settings.
19use danube_core::proto::SchemaReference as ProtoSchemaReference;
20
21#[derive(Debug)]
22pub struct Producer {
23 client: DanubeClient,
24 topic_name: String,
25 schema_ref: Option<ProtoSchemaReference>,
26 dispatch_strategy: ConfigDispatchStrategy,
27 producer_name: String,
28 partitions: Option<usize>,
29 message_router: Option<MessageRouter>,
30 producers: Arc<Mutex<Vec<TopicProducer>>>,
31 producer_options: ProducerOptions,
32}
33
34impl Producer {
35 pub(crate) fn new(
36 client: DanubeClient,
37 topic_name: String,
38 schema_ref: Option<ProtoSchemaReference>,
39 dispatch_strategy: Option<ConfigDispatchStrategy>,
40 producer_name: String,
41 partitions: Option<usize>,
42 message_router: Option<MessageRouter>,
43 producer_options: ProducerOptions,
44 ) -> Self {
45 let dispatch_strategy = if let Some(retention) = dispatch_strategy {
46 retention
47 } else {
48 ConfigDispatchStrategy::default()
49 };
50
51 Producer {
52 client,
53 topic_name,
54 schema_ref,
55 dispatch_strategy,
56 producer_name,
57 partitions,
58 message_router,
59 producers: Arc::new(Mutex::new(Vec::new())),
60 producer_options,
61 }
62 }
63
64 /// Initializes the producer and registers it with the message brokers.
65 ///
66 /// This asynchronous method sets up the producer by establishing connections with the message brokers and configuring it for sending messages to the specified topic.
67 /// It is responsible for creating the necessary resources for producers handling partitioned topics.
68 pub async fn create(&mut self) -> Result<()> {
69 let mut topic_producers: Vec<_> = match self.partitions {
70 None => {
71 // Create a single TopicProducer for non-partitioned topic
72 vec![TopicProducer::new(
73 self.client.clone(),
74 self.topic_name.clone(),
75 self.producer_name.clone(),
76 self.schema_ref.clone(),
77 self.dispatch_strategy.clone(),
78 self.producer_options.clone(),
79 )]
80 }
81 Some(partitions) => {
82 if self.message_router.is_none() {
83 self.message_router = Some(MessageRouter::new(partitions));
84 };
85
86 (0..partitions)
87 .map(|partition_id| {
88 let topic = format!("{}-part-{}", self.topic_name, partition_id);
89 TopicProducer::new(
90 self.client.clone(),
91 topic,
92 format!("{}-{}", self.producer_name, partition_id),
93 self.schema_ref.clone(),
94 self.dispatch_strategy.clone(),
95 self.producer_options.clone(),
96 )
97 })
98 .collect()
99 }
100 };
101
102 for topic_producer in &mut topic_producers {
103 let _prod_id = topic_producer.create().await?;
104 }
105
106 // ensure that the producers are added only if all topic_producers are succesfully created
107 let mut producers = self.producers.lock().await;
108 *producers = topic_producers;
109
110 Ok(())
111 }
112
113 /// Sends a message to the topic associated with this producer.
114 ///
115 /// It handles the serialization of the payload and any user-defined attributes. This method assumes that the producer has been successfully initialized and is ready to send messages.
116 ///
117 /// # Parameters
118 ///
119 /// - `data`: The message payload to be sent. This should be a `Vec<u8>` representing the content of the message.
120 /// - `attributes`: Optional user-defined properties or attributes associated with the message. This is a `HashMap<String, String>` where keys and values represent the attribute names and values, respectively.
121 ///
122 /// # Returns
123 ///
124 /// - `Ok(u64)`: The sequence ID of the sent message if the operation is successful. This ID can be used for tracking and acknowledging the message.
125 /// - `Err(e)`: An error if message sending fails. Possible reasons for failure include network issues, serialization errors, or broker-related problems.
126 pub async fn send(
127 &self,
128 data: Vec<u8>,
129 attributes: Option<HashMap<String, String>>,
130 ) -> Result<u64> {
131 use crate::errors::DanubeError;
132 use crate::retry_manager::RetryManager;
133
134 let next_partition = match self.partitions {
135 Some(_) => self
136 .message_router
137 .as_ref()
138 .expect("already initialized")
139 .round_robin(),
140
141 None => 0,
142 };
143
144 // Create retry manager for producers
145 let retry_manager = RetryManager::new(
146 self.producer_options.max_retries,
147 self.producer_options.base_backoff_ms,
148 self.producer_options.max_backoff_ms,
149 );
150
151 let mut attempts = 0;
152 let max_retries = if self.producer_options.max_retries == 0 {
153 5
154 } else {
155 self.producer_options.max_retries
156 };
157
158 loop {
159 let send_result = {
160 let mut producers = self.producers.lock().await;
161 producers[next_partition]
162 .send(data.clone(), attributes.clone())
163 .await
164 };
165
166 match send_result {
167 Ok(sequence_id) => return Ok(sequence_id),
168 Err(error) => {
169 // Check if this is an unrecoverable error (e.g., stream client not initialized)
170 if matches!(error, DanubeError::Unrecoverable(_)) {
171 eprintln!("Unrecoverable error detected in producer send, attempting recreation: {:?}", error);
172
173 // Attempt to recreate the producer for unrecoverable errors
174 let recreate_result = {
175 let mut producers = self.producers.lock().await;
176 producers[next_partition].create().await
177 };
178
179 match recreate_result {
180 Ok(_) => {
181 eprintln!("Producer recreation successful after unrecoverable error, continuing...");
182 attempts = 0; // Reset attempts after successful recreation
183 continue; // Go back to sending
184 }
185 Err(e) => {
186 eprintln!(
187 "Producer recreation failed after unrecoverable error: {:?}",
188 e
189 );
190 return Err(e); // Return error if recreation fails
191 }
192 }
193 }
194
195 // Failed to send, check if retryable
196 if retry_manager.is_retryable_error(&error) {
197 attempts += 1;
198 if attempts > max_retries {
199 eprintln!("Max retries exceeded for producer send, attempting broker lookup and recreation");
200
201 // Attempt broker lookup and producer recreation
202 let lookup_and_recreate_result = {
203 let mut producers = self.producers.lock().await;
204 let producer = &mut producers[next_partition];
205
206 // Perform lookup and reconnect
207 if let Ok(new_addr) = producer
208 .client
209 .lookup_service
210 .handle_lookup(&producer.client.uri, &producer.topic)
211 .await
212 {
213 producer.client.uri = new_addr;
214 producer.connect(&producer.client.uri.clone()).await?;
215 // Recreate producer on new connection
216 producer.create().await
217 } else {
218 Err(error)
219 }
220 };
221
222 match lookup_and_recreate_result {
223 Ok(_) => {
224 eprintln!("Broker lookup and producer recreation successful, continuing...");
225 attempts = 0; // Reset attempts after successful recreation
226 continue; // Go back to sending
227 }
228 Err(e) => {
229 eprintln!(
230 "Broker lookup and producer recreation failed: {:?}",
231 e
232 );
233 return Err(e); // Return error if recreation fails
234 }
235 }
236 }
237 let backoff = retry_manager.calculate_backoff(attempts - 1);
238 tokio::time::sleep(backoff).await;
239 } else {
240 eprintln!("Non-retryable error in producer send: {:?}", error);
241 return Err(error); // Non-retryable error
242 }
243 }
244 }
245 }
246 }
247}
248
249/// A builder for creating a new `Producer` instance.
250///
251/// `ProducerBuilder` provides a fluent API for configuring and instantiating a `Producer`.
252/// It allows you to set various properties that define how the producer will behave and interact with the message broker.
253use danube_core::proto::SchemaReference;
254
255#[derive(Debug, Clone)]
256pub struct ProducerBuilder {
257 client: DanubeClient,
258 topic: Option<String>,
259 num_partitions: Option<usize>,
260 producer_name: Option<String>,
261 // TODO Phase 4: schema removed
262 // schema: Option<Schema>,
263 // Phase 5: Schema registry support
264 schema_ref: Option<SchemaReference>,
265 dispatch_strategy: Option<ConfigDispatchStrategy>,
266 producer_options: ProducerOptions,
267}
268
269impl ProducerBuilder {
270 pub fn new(client: &DanubeClient) -> Self {
271 ProducerBuilder {
272 client: client.clone(),
273 topic: None,
274 num_partitions: None,
275 producer_name: None,
276 schema_ref: None,
277 dispatch_strategy: None,
278 producer_options: ProducerOptions::default(),
279 }
280 }
281
282 /// Sets the topic name for the producer. This is a required field.
283 ///
284 /// This method specifies the topic that the producer will send messages to. It must be set before creating the producer.
285 ///
286 /// # Parameters
287 ///
288 /// - `topic`: The name of the topic for the producer. This should be a non-empty string that corresponds to an existing or new topic.
289 pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
290 self.topic = Some(topic.into());
291 self
292 }
293
294 /// Sets the name of the producer. This is a required field.
295 ///
296 /// This method specifies the name to be assigned to the producer instance. It must be set before creating the producer.
297 ///
298 /// # Parameters
299 ///
300 /// - `producer_name`: The name assigned to the producer instance. This should be a non-empty string used for identifying the producer.
301 pub fn with_name(mut self, producer_name: impl Into<String>) -> Self {
302 self.producer_name = Some(producer_name.into());
303 self
304 }
305
306 // ===== Schema Registry Methods =====
307
308 /// Set schema by subject name (uses latest version)
309 ///
310 /// The producer will reference the latest schema version for the given subject.
311 /// The schema must be registered in the schema registry before use.
312 ///
313 /// # Example
314 /// ```no_run
315 /// # use danube_client::DanubeClient;
316 /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
317 /// let producer = client.producer()
318 /// .with_topic("user-events")
319 /// .with_schema_subject("user-events-value") // Uses latest version
320 /// .build();
321 /// # Ok(())
322 /// # }
323 /// ```
324 pub fn with_schema_subject(mut self, subject: impl Into<String>) -> Self {
325 use danube_core::proto::schema_reference::VersionRef;
326
327 self.schema_ref = Some(SchemaReference {
328 subject: subject.into(),
329 version_ref: Some(VersionRef::UseLatest(true)),
330 });
331 self
332 }
333
334 /// Set schema with a pinned version
335 ///
336 /// The producer will use a specific schema version and won't automatically
337 /// upgrade to newer versions.
338 ///
339 /// # Example
340 /// ```no_run
341 /// # use danube_client::DanubeClient;
342 /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
343 /// let producer = client.producer()
344 /// .with_topic("user-events")
345 /// .with_schema_version("user-events-value", 2) // Pin to version 2
346 /// .build();
347 /// # Ok(())
348 /// # }
349 /// ```
350 pub fn with_schema_version(mut self, subject: impl Into<String>, version: u32) -> Self {
351 use danube_core::proto::schema_reference::VersionRef;
352
353 self.schema_ref = Some(SchemaReference {
354 subject: subject.into(),
355 version_ref: Some(VersionRef::PinnedVersion(version)),
356 });
357 self
358 }
359
360 /// Set schema with a minimum version requirement
361 ///
362 /// The producer will use the specified version or any newer compatible version.
363 ///
364 /// # Example
365 /// ```no_run
366 /// # use danube_client::DanubeClient;
367 /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
368 /// let producer = client.producer()
369 /// .with_topic("user-events")
370 /// .with_schema_min_version("user-events-value", 2) // Use v2 or newer
371 /// .build();
372 /// # Ok(())
373 /// # }
374 /// ```
375 pub fn with_schema_min_version(mut self, subject: impl Into<String>, min_version: u32) -> Self {
376 use danube_core::proto::schema_reference::VersionRef;
377
378 self.schema_ref = Some(SchemaReference {
379 subject: subject.into(),
380 version_ref: Some(VersionRef::MinVersion(min_version)),
381 });
382 self
383 }
384
385 /// Set schema with a custom SchemaReference (advanced use)
386 ///
387 /// This allows full control over schema versioning. For most use cases,
388 /// prefer `with_schema_subject()`, `with_schema_version()`, or `with_schema_min_version()`.
389 ///
390 /// # Example
391 /// ```no_run
392 /// # use danube_client::{DanubeClient, SchemaReference, VersionRef};
393 /// # async fn example(client: DanubeClient) -> Result<(), Box<dyn std::error::Error>> {
394 /// let producer = client.producer()
395 /// .with_topic("user-events")
396 /// .with_schema_reference(SchemaReference {
397 /// subject: "user-events-value".to_string(),
398 /// version_ref: Some(VersionRef::PinnedVersion(2)),
399 /// })
400 /// .build();
401 /// # Ok(())
402 /// # }
403 /// ```
404 pub fn with_schema_reference(mut self, schema_ref: SchemaReference) -> Self {
405 self.schema_ref = Some(schema_ref);
406 self
407 }
408
409 /// Sets the reliable dispatch options for the producer.
410 /// This method configures the dispatch strategy for the producer, which determines how messages are stored and managed.
411 /// The dispatch strategy defines how long messages are retained and how they are managed in the message broker.
412 ///
413 /// # Parameters
414 ///
415 /// No parameters; broker uses defaults for reliable topics.
416 pub fn with_reliable_dispatch(mut self) -> Self {
417 let dispatch_strategy = ConfigDispatchStrategy::Reliable;
418 self.dispatch_strategy = Some(dispatch_strategy);
419 self
420 }
421
422 /// Sets the configuration options for the producer, allowing customization of producer behavior.
423 ///
424 /// This method allows you to specify various configuration options that affect how the producer operates.
425 /// These options can control aspects such as retries, timeouts, and other producer-specific settings.
426 ///
427 /// # Parameters
428 ///
429 /// - `options`: A `ProducerOptions` instance containing the configuration options for the producer. This should be configured according to the desired behavior and requirements of the producer.
430 pub fn with_options(mut self, options: ProducerOptions) -> Self {
431 self.producer_options = options;
432 self
433 }
434
435 /// Sets the number of partitions for the topic.
436 ///
437 /// This method specifies how many partitions the topic should have. Partitions are used to distribute the load of messages across multiple Danube brokers, which can help with parallel processing and scalability.
438 ///
439 /// # Parameters
440 ///
441 /// - `partitions`: The number of partitions for the topic. This should be a positive integer representing the desired number of partitions. More partitions can improve parallelism and throughput. Default is 0 = non-partitioned topic.
442 pub fn with_partitions(mut self, partitions: usize) -> Self {
443 self.num_partitions = Some(partitions);
444 self
445 }
446
447 /// Creates a new `Producer` instance using the settings configured in the `ProducerBuilder`.
448 ///
449 /// This method performs validation to ensure that all required fields are set before creating the `Producer`. Once validation is successful, it constructs and returns a new `Producer` instance configured with the specified settings.
450 ///
451 /// # Returns
452 ///
453 /// - A `Producer` instance if the builder configuration is valid and the producer is created successfully.
454 ///
455 /// # Example
456 ///
457 /// let producer = ProducerBuilder::new()
458 /// .with_topic("my-topic")
459 /// .with_name("my-producer")
460 /// .with_partitions(3)
461 /// .with_schema("my-schema".to_string(), SchemaType::Json("schema-definition".to_string()))
462 /// .build()?;
463 ///
464 pub fn build(self) -> Producer {
465 let topic_name = self
466 .topic
467 .expect("can't create a producer without assigning to a topic");
468 let producer_name = self
469 .producer_name
470 .expect("you should provide a name to the created producer");
471
472 Producer::new(
473 self.client,
474 topic_name,
475 self.schema_ref, // Phase 5: SchemaReference support
476 self.dispatch_strategy,
477 producer_name,
478 self.num_partitions,
479 None,
480 self.producer_options,
481 )
482 }
483}
484
/// Configuration options for producers.
///
/// The derived `Default` leaves every field at its zero value (`0` or an empty string).
#[derive(Debug, Clone, Default)]
pub struct ProducerOptions {
    /// Reserved for future use.
    pub others: String,
    /// Maximum number of retries for operations like create/send on transient failures.
    ///
    /// A value of `0` makes `Producer::send` fall back to 5 retries.
    pub max_retries: usize,
    /// Base backoff in milliseconds for exponential backoff.
    pub base_backoff_ms: u64,
    /// Maximum backoff cap in milliseconds.
    pub max_backoff_ms: u64,
}