1#![allow(clippy::unused_async)]
27
28use crate::cx::Cx;
29use std::fmt;
30use std::io;
31use std::time::Duration;
32
33#[cfg(feature = "kafka")]
34use parking_lot::Mutex;
35#[cfg(feature = "kafka")]
36use rdkafka::{
37 client::ClientContext,
38 config::ClientConfig,
39 error::{KafkaError as RdKafkaError, RDKafkaErrorCode},
40 message::{BorrowedMessage, DeliveryResult, Header, Message, OwnedHeaders},
41 producer::{BaseRecord, ProducerContext, ThreadedProducer},
42};
43#[cfg(feature = "kafka")]
44use std::future::Future;
45#[cfg(feature = "kafka")]
46use std::pin::Pin;
47#[cfg(feature = "kafka")]
48use std::sync::Arc;
49#[cfg(feature = "kafka")]
50use std::task::{Context, Poll, Waker};
51
/// Errors surfaced by this module's Kafka producer layer.
#[derive(Debug)]
pub enum KafkaError {
    /// Underlying I/O failure.
    Io(io::Error),
    /// Protocol-level failure reported by the client library.
    Protocol(String),
    /// Error reported by a broker.
    Broker(String),
    /// The producer's in-memory queue is full; back off and retry.
    QueueFull,
    /// Payload exceeded the configured maximum message size.
    MessageTooLarge {
        /// Actual payload size in bytes.
        size: usize,
        /// Configured maximum in bytes.
        max_size: usize,
    },
    /// The topic name was rejected or unknown.
    InvalidTopic(String),
    /// A transactional operation failed.
    Transaction(String),
    /// The operation was cancelled.
    Cancelled,
    /// Invalid producer or transaction configuration.
    Config(String),
}

impl fmt::Display for KafkaError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            KafkaError::Io(e) => write!(f, "Kafka I/O error: {e}"),
            KafkaError::Protocol(msg) => write!(f, "Kafka protocol error: {msg}"),
            KafkaError::Broker(msg) => write!(f, "Kafka broker error: {msg}"),
            KafkaError::Config(msg) => write!(f, "Kafka configuration error: {msg}"),
            KafkaError::Transaction(msg) => write!(f, "Kafka transaction error: {msg}"),
            KafkaError::InvalidTopic(topic) => write!(f, "Invalid Kafka topic: {topic}"),
            KafkaError::QueueFull => write!(f, "Kafka producer queue is full"),
            KafkaError::Cancelled => write!(f, "Kafka operation cancelled"),
            KafkaError::MessageTooLarge { size, max_size } => {
                write!(f, "Kafka message too large: {size} bytes (max: {max_size})")
            }
        }
    }
}

impl std::error::Error for KafkaError {
    /// Only the `Io` variant carries a chained source error.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        if let KafkaError::Io(e) = self {
            Some(e)
        } else {
            None
        }
    }
}

impl From<io::Error> for KafkaError {
    fn from(err: io::Error) -> Self {
        KafkaError::Io(err)
    }
}
112
/// rdkafka client/producer context that forwards per-message delivery
/// callbacks to the matching in-flight `DeliverySender`.
#[cfg(feature = "kafka")]
#[derive(Debug)]
struct KafkaContext;

#[cfg(feature = "kafka")]
impl ClientContext for KafkaContext {}

#[cfg(feature = "kafka")]
impl ProducerContext for KafkaContext {
    // One-shot sender attached to each produced record; boxed so it fits
    // rdkafka's opaque-pointer mechanism.
    type DeliveryOpaque = Box<DeliverySender>;

    /// Invoked by rdkafka when a delivery result for a record is available;
    /// translates the result and completes the per-record channel so the
    /// awaiting future wakes.
    fn delivery(
        &self,
        delivery_result: &DeliveryResult<'_>,
        delivery_opaque: Self::DeliveryOpaque,
    ) {
        let mapped = map_delivery_result(delivery_result);
        delivery_opaque.complete(mapped);
    }
}
133
/// Shared state behind a one-shot delivery channel.
#[cfg(feature = "kafka")]
#[derive(Debug)]
struct DeliveryState {
    // Delivery outcome; set at most once by the sender.
    value: Option<Result<RecordMetadata, KafkaError>>,
    // Waker of the task currently awaiting the receiver, if any.
    waker: Option<Waker>,
    // Set when the receiver is dropped or cancelled; makes a late
    // `complete` call a no-op.
    closed: bool,
}

#[cfg(feature = "kafka")]
impl DeliveryState {
    /// Fresh state: no value, no registered waker, channel open.
    fn new() -> Self {
        Self {
            value: None,
            waker: None,
            closed: false,
        }
    }
}
152
/// Sending half of the one-shot delivery channel; consumed on completion.
#[cfg(feature = "kafka")]
#[derive(Debug)]
struct DeliverySender {
    inner: Arc<Mutex<DeliveryState>>,
}

#[cfg(feature = "kafka")]
impl DeliverySender {
    /// Publishes the delivery outcome and wakes the waiting task.
    ///
    /// Does nothing when the receiver already closed the channel or a
    /// value was published before.
    fn complete(self, value: Result<RecordMetadata, KafkaError>) {
        let mut guard = self.inner.lock();
        let already_finished = guard.closed || guard.value.is_some();
        if !already_finished {
            guard.value = Some(value);
            if let Some(waker) = guard.waker.take() {
                waker.wake();
            }
        }
    }
}
172
/// Receiving half of the one-shot delivery channel: a future that resolves
/// to the record's delivery outcome.
#[cfg(feature = "kafka")]
#[derive(Debug)]
struct DeliveryReceiver {
    inner: Arc<Mutex<DeliveryState>>,
    // Cancellation context; checkpointed on every poll.
    cx: Cx,
}

#[cfg(feature = "kafka")]
impl Drop for DeliveryReceiver {
    fn drop(&mut self) {
        // Close the channel so a late `complete` becomes a no-op, and drop
        // any stored waker so it cannot outlive the receiver.
        let mut state = self.inner.lock();
        state.closed = true;
        state.waker = None;
    }
}
188
#[cfg(feature = "kafka")]
impl Future for DeliveryReceiver {
    type Output = Result<RecordMetadata, KafkaError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Cancellation first: close the channel (so the delivery callback
        // won't complete it later) and finish with `Cancelled`.
        if self.cx.checkpoint().is_err() {
            let mut state = self.inner.lock();
            state.closed = true;
            state.waker = None;
            return Poll::Ready(Err(KafkaError::Cancelled));
        }

        let mut state = self.inner.lock();
        if let Some(value) = state.value.take() {
            Poll::Ready(value)
        } else {
            // Register (or refresh) the waker; skip the clone when the
            // stored waker would already wake this task.
            if !state
                .waker
                .as_ref()
                .is_some_and(|w| w.will_wake(cx.waker()))
            {
                state.waker = Some(cx.waker().clone());
            }
            Poll::Pending
        }
    }
}
216
/// Builds a connected one-shot (sender, receiver) pair carrying a single
/// record's delivery notification.
#[cfg(feature = "kafka")]
fn delivery_channel(cx: &Cx) -> (DeliverySender, DeliveryReceiver) {
    let shared = Arc::new(Mutex::new(DeliveryState::new()));
    let sender = DeliverySender {
        inner: Arc::clone(&shared),
    };
    let receiver = DeliveryReceiver {
        inner: shared,
        cx: cx.clone(),
    };
    (sender, receiver)
}
230
/// Translates rdkafka's per-message delivery result into this module's
/// metadata/error types.
#[cfg(feature = "kafka")]
fn map_delivery_result(delivery_result: &DeliveryResult<'_>) -> Result<RecordMetadata, KafkaError> {
    delivery_result
        .as_ref()
        .map(record_metadata_from_message)
        .map_err(|(err, message)| map_rdkafka_error(err, Some(message)))
}
238
/// Copies topic/partition/offset/timestamp out of a delivered message.
#[cfg(feature = "kafka")]
fn record_metadata_from_message(message: &BorrowedMessage<'_>) -> RecordMetadata {
    let topic = message.topic().to_string();
    let timestamp = message.timestamp().to_millis();
    RecordMetadata {
        topic,
        partition: message.partition(),
        offset: message.offset(),
        timestamp,
    }
}
248
/// Maps an rdkafka error (plus the message it concerned, when available)
/// onto [`KafkaError`].
#[cfg(feature = "kafka")]
fn map_rdkafka_error(err: &RdKafkaError, message: Option<&BorrowedMessage<'_>>) -> KafkaError {
    match err {
        RdKafkaError::Canceled => KafkaError::Cancelled,
        RdKafkaError::ClientConfig(_, _, _, detail) => KafkaError::Config(detail.clone()),
        RdKafkaError::MessageProduction(code) => {
            // Attach the topic name so topic-related codes carry context.
            map_error_code(*code, message.map(|m| m.topic()))
        }
        other => KafkaError::Broker(other.to_string()),
    }
}
260
/// Maps a raw librdkafka error code to [`KafkaError`], attaching the topic
/// name (or `"unknown"`) for topic-related failures.
#[cfg(feature = "kafka")]
fn map_error_code(code: RDKafkaErrorCode, topic: Option<&str>) -> KafkaError {
    match code {
        RDKafkaErrorCode::QueueFull => KafkaError::QueueFull,
        RDKafkaErrorCode::InvalidTopic | RDKafkaErrorCode::UnknownTopic => {
            let name = topic.unwrap_or("unknown");
            KafkaError::InvalidTopic(name.to_string())
        }
        other => KafkaError::Broker(format!("{other:?}")),
    }
}
271
/// Maps [`Compression`] to the librdkafka `compression.type` config value.
#[cfg(feature = "kafka")]
fn compression_to_str(compression: Compression) -> &'static str {
    match compression {
        Compression::None => "none",
        Compression::Gzip => "gzip",
        Compression::Snappy => "snappy",
        Compression::Lz4 => "lz4",
        Compression::Zstd => "zstd",
    }
}
282
/// Maps [`Acks`] to the librdkafka `acks` config value.
#[cfg(feature = "kafka")]
fn acks_to_str(acks: Acks) -> &'static str {
    match acks {
        Acks::None => "0",
        Acks::Leader => "1",
        Acks::All => "all",
    }
}
291
/// Translates a [`ProducerConfig`] (and optional transactional settings)
/// into an rdkafka `ClientConfig`.
///
/// Transactional mode forces `enable.idempotence=true`, overriding the
/// base setting.
///
/// # Errors
/// Currently infallible; the `Result` return matches the module's fallible
/// construction pattern.
#[cfg(feature = "kafka")]
fn build_client_config(
    config: &ProducerConfig,
    transactional: Option<&TransactionalConfig>,
) -> Result<ClientConfig, KafkaError> {
    let mut client = ClientConfig::new();
    client
        .set("bootstrap.servers", config.bootstrap_servers.join(","))
        .set("batch.size", config.batch_size.to_string())
        .set("linger.ms", config.linger_ms.to_string())
        .set("compression.type", compression_to_str(config.compression))
        .set("enable.idempotence", config.enable_idempotence.to_string())
        .set("acks", acks_to_str(config.acks))
        .set("retries", config.retries.to_string())
        .set(
            "request.timeout.ms",
            config.request_timeout.as_millis().to_string(),
        )
        .set("message.max.bytes", config.max_message_size.to_string());
    if let Some(client_id) = &config.client_id {
        client.set("client.id", client_id);
    }

    if let Some(tx) = transactional {
        client
            .set("transactional.id", tx.transaction_id.as_str())
            .set(
                "transaction.timeout.ms",
                tx.transaction_timeout.as_millis().to_string(),
            )
            .set("enable.idempotence", "true");
    }

    Ok(client)
}
325
/// Creates a `ThreadedProducer` with this module's [`KafkaContext`] from
/// the given configuration.
///
/// # Errors
/// Returns the mapped rdkafka error when producer creation fails.
#[cfg(feature = "kafka")]
fn build_producer(
    config: &ProducerConfig,
    transactional: Option<&TransactionalConfig>,
) -> Result<ThreadedProducer<KafkaContext>, KafkaError> {
    let client_config = build_client_config(config, transactional)?;
    match client_config.create_with_context(KafkaContext) {
        Ok(producer) => Ok(producer),
        Err(err) => Err(map_rdkafka_error(&err, None)),
    }
}
336
/// Produces one record via the threaded producer and awaits its delivery
/// report.
///
/// Order matters: cancellation is checked first, then the payload size is
/// validated against `config.max_message_size`, then the record is
/// enqueued with a one-shot delivery channel boxed as the per-message
/// opaque. The returned future resolves when the delivery callback fires
/// (or the receiver's `Cx` is cancelled).
#[cfg(feature = "kafka")]
async fn send_with_producer(
    producer: &ThreadedProducer<KafkaContext>,
    cx: &Cx,
    config: &ProducerConfig,
    topic: &str,
    key: Option<&[u8]>,
    payload: &[u8],
    partition: Option<i32>,
    headers: Option<&[(&str, &[u8])]>,
) -> Result<RecordMetadata, KafkaError> {
    cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;

    if payload.len() > config.max_message_size {
        return Err(KafkaError::MessageTooLarge {
            size: payload.len(),
            max_size: config.max_message_size,
        });
    }

    let (sender, receiver) = delivery_channel(cx);

    let mut record = BaseRecord::with_opaque_to(topic, Box::new(sender)).payload(payload);
    if let Some(key) = key {
        record = record.key(key);
    }
    if let Some(partition) = partition {
        record = record.partition(partition);
    }
    if let Some(headers) = headers {
        // Copy borrowed header pairs into rdkafka's owned header list.
        let mut owned_headers = OwnedHeaders::new();
        for (key, value) in headers {
            owned_headers = owned_headers.insert(Header {
                key,
                value: Some(*value),
            });
        }
        record = record.headers(owned_headers);
    }

    match producer.send(record) {
        // Enqueued: wait for the delivery callback to complete the channel.
        Ok(()) => receiver.await,
        // Rejected synchronously (e.g. queue full); the boxed sender inside
        // the returned record is dropped, so the receiver is never resolved.
        Err((err, _)) => Err(map_rdkafka_error(&err, None)),
    }
}
382
/// Compression codec applied to produced batches.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Compression {
    /// No compression (default).
    #[default]
    None,
    Gzip,
    Snappy,
    Lz4,
    Zstd,
}
398
/// Broker acknowledgement level requested when producing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Acks {
    /// Fire-and-forget: no acknowledgement.
    None,
    /// Wait for the partition leader only.
    Leader,
    /// Wait for the full in-sync replica set (default).
    #[default]
    All,
}

impl Acks {
    /// Numeric encoding used by the Kafka protocol (`-1` means "all").
    #[must_use]
    pub const fn as_i16(&self) -> i16 {
        match *self {
            Acks::None => 0,
            Acks::Leader => 1,
            Acks::All => -1,
        }
    }
}
422
/// Configuration for a [`KafkaProducer`].
#[derive(Debug, Clone)]
pub struct ProducerConfig {
    /// Broker addresses (`host:port`) used for the initial connection.
    pub bootstrap_servers: Vec<String>,
    /// Optional client identifier reported to brokers.
    pub client_id: Option<String>,
    /// Maximum batch size in bytes.
    pub batch_size: usize,
    /// How long (ms) to wait for more records before sending a batch.
    pub linger_ms: u64,
    /// Compression codec applied to batches.
    pub compression: Compression,
    /// Whether idempotent production is enabled.
    pub enable_idempotence: bool,
    /// Broker acknowledgement level.
    pub acks: Acks,
    /// Number of send retries.
    pub retries: u32,
    /// Per-request timeout.
    pub request_timeout: Duration,
    /// Maximum allowed message payload size in bytes.
    pub max_message_size: usize,
}
447
448impl Default for ProducerConfig {
449 fn default() -> Self {
450 Self {
451 bootstrap_servers: vec!["localhost:9092".to_string()],
452 client_id: None,
453 batch_size: 16_384, linger_ms: 5, compression: Compression::None,
456 enable_idempotence: true,
457 acks: Acks::All,
458 retries: 3,
459 request_timeout: Duration::from_secs(30),
460 max_message_size: 1_048_576, }
462 }
463}
464
465impl ProducerConfig {
466 #[must_use]
468 pub fn new(bootstrap_servers: Vec<String>) -> Self {
469 Self {
470 bootstrap_servers,
471 ..Default::default()
472 }
473 }
474
475 #[must_use]
477 pub fn client_id(mut self, client_id: &str) -> Self {
478 self.client_id = Some(client_id.to_string());
479 self
480 }
481
482 #[must_use]
484 pub const fn batch_size(mut self, size: usize) -> Self {
485 self.batch_size = size;
486 self
487 }
488
489 #[must_use]
491 pub const fn linger_ms(mut self, ms: u64) -> Self {
492 self.linger_ms = ms;
493 self
494 }
495
496 #[must_use]
498 pub const fn compression(mut self, compression: Compression) -> Self {
499 self.compression = compression;
500 self
501 }
502
503 #[must_use]
505 pub const fn enable_idempotence(mut self, enable: bool) -> Self {
506 self.enable_idempotence = enable;
507 self
508 }
509
510 #[must_use]
512 pub const fn acks(mut self, acks: Acks) -> Self {
513 self.acks = acks;
514 self
515 }
516
517 #[must_use]
519 pub const fn retries(mut self, retries: u32) -> Self {
520 self.retries = retries;
521 self
522 }
523
524 pub fn validate(&self) -> Result<(), KafkaError> {
526 if self.bootstrap_servers.is_empty() {
527 return Err(KafkaError::Config(
528 "bootstrap_servers cannot be empty".to_string(),
529 ));
530 }
531 if self.batch_size == 0 {
532 return Err(KafkaError::Config("batch_size must be > 0".to_string()));
533 }
534 if self.max_message_size == 0 {
535 return Err(KafkaError::Config(
536 "max_message_size must be > 0".to_string(),
537 ));
538 }
539 Ok(())
540 }
541}
542
/// Metadata describing where a produced record landed.
#[derive(Debug, Clone)]
pub struct RecordMetadata {
    /// Destination topic.
    pub topic: String,
    /// Partition the record was written to.
    pub partition: i32,
    /// Offset assigned by the broker.
    pub offset: i64,
    /// Record timestamp in milliseconds, when available.
    pub timestamp: Option<i64>,
}
555
/// High-level Kafka producer.
///
/// Phase 0: holds validated configuration only; the send/flush paths
/// return an `Io` error until the rdkafka integration is wired in.
#[derive(Debug)]
pub struct KafkaProducer {
    config: ProducerConfig,
}
564
565impl KafkaProducer {
566 pub fn new(config: ProducerConfig) -> Result<Self, KafkaError> {
568 config.validate()?;
569 Ok(Self { config })
570 }
571
572 #[allow(unused_variables)]
584 pub async fn send(
585 &self,
586 cx: &Cx,
587 topic: &str,
588 key: Option<&[u8]>,
589 payload: &[u8],
590 partition: Option<i32>,
591 ) -> Result<RecordMetadata, KafkaError> {
592 cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
593
594 if payload.len() > self.config.max_message_size {
596 return Err(KafkaError::MessageTooLarge {
597 size: payload.len(),
598 max_size: self.config.max_message_size,
599 });
600 }
601
602 Err(KafkaError::Io(io::Error::other(
604 "Phase 0: requires rdkafka integration",
605 )))
606 }
607
608 #[allow(unused_variables)]
617 pub async fn send_with_headers(
618 &self,
619 cx: &Cx,
620 topic: &str,
621 key: Option<&[u8]>,
622 payload: &[u8],
623 headers: &[(&str, &[u8])],
624 ) -> Result<RecordMetadata, KafkaError> {
625 cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
626
627 if payload.len() > self.config.max_message_size {
628 return Err(KafkaError::MessageTooLarge {
629 size: payload.len(),
630 max_size: self.config.max_message_size,
631 });
632 }
633
634 Err(KafkaError::Io(io::Error::other(
635 "Phase 0: requires rdkafka integration",
636 )))
637 }
638
639 #[allow(unused_variables)]
643 pub async fn flush(&self, cx: &Cx, timeout: Duration) -> Result<(), KafkaError> {
644 cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
645
646 Err(KafkaError::Io(io::Error::other(
647 "Phase 0: requires rdkafka integration",
648 )))
649 }
650
651 #[must_use]
653 pub const fn config(&self) -> &ProducerConfig {
654 &self.config
655 }
656}
657
/// Configuration for a [`TransactionalProducer`].
#[derive(Debug, Clone)]
pub struct TransactionalConfig {
    /// Base producer configuration.
    pub producer: ProducerConfig,
    /// Transactional id; must be non-empty and stable per producer.
    pub transaction_id: String,
    /// Maximum lifetime of a single transaction.
    pub transaction_timeout: Duration,
}
668
669impl TransactionalConfig {
670 #[must_use]
672 pub fn new(producer: ProducerConfig, transaction_id: String) -> Self {
673 Self {
674 producer,
675 transaction_id,
676 transaction_timeout: Duration::from_mins(1),
677 }
678 }
679
680 #[must_use]
682 pub const fn transaction_timeout(mut self, timeout: Duration) -> Self {
683 self.transaction_timeout = timeout;
684 self
685 }
686}
687
/// Kafka producer with transactional semantics.
///
/// Phase 0: holds validated configuration only; beginning a transaction
/// returns an `Io` error until the rdkafka integration is wired in.
#[derive(Debug)]
pub struct TransactionalProducer {
    config: TransactionalConfig,
}
695
696impl TransactionalProducer {
697 pub fn new(config: TransactionalConfig) -> Result<Self, KafkaError> {
699 config.producer.validate()?;
700
701 if config.transaction_id.is_empty() {
702 return Err(KafkaError::Config(
703 "transaction_id cannot be empty".to_string(),
704 ));
705 }
706
707 Ok(Self { config })
708 }
709
710 #[allow(unused_variables)]
714 pub async fn begin_transaction(&self, cx: &Cx) -> Result<Transaction<'_>, KafkaError> {
715 cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
716
717 Err(KafkaError::Io(io::Error::other(
719 "Phase 0: requires rdkafka integration",
720 )))
721 }
722
723 #[must_use]
725 pub fn transaction_id(&self) -> &str {
726 &self.config.transaction_id
727 }
728
729 #[must_use]
731 pub const fn config(&self) -> &TransactionalConfig {
732 &self.config
733 }
734}
735
/// An in-flight Kafka transaction tied to its producer.
///
/// Phase 0 stub: all operations currently return an `Io` error.
#[derive(Debug)]
pub struct Transaction<'a> {
    // Producer this transaction belongs to (unused in Phase 0).
    producer: &'a TransactionalProducer,
    // Set once commit/abort ran; inspected by `Drop`.
    committed: bool,
}
745
746impl Transaction<'_> {
747 #[allow(unused_variables)]
749 pub async fn send(
750 &self,
751 cx: &Cx,
752 topic: &str,
753 key: Option<&[u8]>,
754 payload: &[u8],
755 ) -> Result<(), KafkaError> {
756 cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
757
758 Err(KafkaError::Io(io::Error::other(
759 "Phase 0: requires rdkafka integration",
760 )))
761 }
762
763 #[allow(unused_variables)]
767 pub async fn commit(mut self, cx: &Cx) -> Result<(), KafkaError> {
768 cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
769
770 self.committed = true;
772
773 Err(KafkaError::Io(io::Error::other(
774 "Phase 0: requires rdkafka integration",
775 )))
776 }
777
778 #[allow(unused_variables)]
782 pub async fn abort(mut self, cx: &Cx) -> Result<(), KafkaError> {
783 cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
784
785 self.committed = true;
787
788 Err(KafkaError::Io(io::Error::other(
789 "Phase 0: requires rdkafka integration",
790 )))
791 }
792}
793
impl Drop for Transaction<'_> {
    fn drop(&mut self) {
        if !self.committed {
            // Phase 0 placeholder: a dropped, unfinished transaction is not
            // aborted here yet — presumably an abort (or at least a log)
            // belongs here once rdkafka integration exists. TODO confirm
            // the intended drop behavior.
        }
    }
}
802
#[cfg(test)]
mod tests {
    use super::*;

    // ---- config and builder behavior ----

    #[test]
    fn test_acks_values() {
        assert_eq!(Acks::None.as_i16(), 0);
        assert_eq!(Acks::Leader.as_i16(), 1);
        assert_eq!(Acks::All.as_i16(), -1);
    }

    #[test]
    fn test_config_defaults() {
        let config = ProducerConfig::default();
        assert_eq!(config.batch_size, 16_384);
        assert_eq!(config.linger_ms, 5);
        assert!(config.enable_idempotence);
        assert_eq!(config.acks, Acks::All);
    }

    #[test]
    fn test_config_builder() {
        let config = ProducerConfig::new(vec!["kafka:9092".to_string()])
            .client_id("my-producer")
            .batch_size(32_768)
            .compression(Compression::Snappy)
            .acks(Acks::Leader);

        assert_eq!(config.bootstrap_servers, vec!["kafka:9092"]);
        assert_eq!(config.client_id, Some("my-producer".to_string()));
        assert_eq!(config.batch_size, 32_768);
        assert_eq!(config.compression, Compression::Snappy);
        assert_eq!(config.acks, Acks::Leader);
    }

    #[test]
    fn test_config_validation() {
        let empty_servers = ProducerConfig {
            bootstrap_servers: vec![],
            ..Default::default()
        };
        assert!(empty_servers.validate().is_err());

        let valid = ProducerConfig::default();
        assert!(valid.validate().is_ok());
    }

    #[test]
    fn test_producer_creation() {
        let config = ProducerConfig::default();
        let producer = KafkaProducer::new(config);
        assert!(producer.is_ok());
    }

    // NOTE(review): `Duration::from_mins` requires a recent Rust toolchain
    // (`duration_constructors_lite` stabilization) — confirm the project's
    // MSRV covers it.
    #[test]
    fn test_transactional_config() {
        let config =
            TransactionalConfig::new(ProducerConfig::default(), "my-transaction-id".to_string())
                .transaction_timeout(Duration::from_mins(2));

        assert_eq!(config.transaction_id, "my-transaction-id");
        assert_eq!(config.transaction_timeout, Duration::from_mins(2));
    }

    #[test]
    fn test_transactional_producer_empty_id() {
        let config = TransactionalConfig::new(ProducerConfig::default(), String::new());
        let producer = TransactionalProducer::new(config);
        assert!(producer.is_err());
    }

    // ---- error type coverage ----

    #[test]
    fn test_error_display() {
        let io_err = KafkaError::Io(io::Error::other("test"));
        assert!(io_err.to_string().contains("I/O error"));

        let msg_err = KafkaError::MessageTooLarge {
            size: 2_000_000,
            max_size: 1_000_000,
        };
        assert!(msg_err.to_string().contains("2000000"));
        assert!(msg_err.to_string().contains("1000000"));

        let cancelled = KafkaError::Cancelled;
        assert!(cancelled.to_string().contains("cancelled"));
    }

    #[test]
    fn test_record_metadata() {
        let meta = RecordMetadata {
            topic: "test-topic".to_string(),
            partition: 0,
            offset: 42,
            timestamp: Some(1_234_567_890),
        };
        assert_eq!(meta.topic, "test-topic");
        assert_eq!(meta.partition, 0);
        assert_eq!(meta.offset, 42);
        assert_eq!(meta.timestamp, Some(1_234_567_890));
    }

    // Exhaustive Display coverage for every KafkaError variant.
    #[test]
    fn kafka_error_display_all_variants() {
        assert!(
            KafkaError::Io(io::Error::other("e"))
                .to_string()
                .contains("I/O error")
        );
        assert!(
            KafkaError::Protocol("p".into())
                .to_string()
                .contains("protocol error")
        );
        assert!(
            KafkaError::Broker("b".into())
                .to_string()
                .contains("broker error")
        );
        assert!(KafkaError::QueueFull.to_string().contains("queue is full"));
        assert!(
            KafkaError::MessageTooLarge {
                size: 10,
                max_size: 5
            }
            .to_string()
            .contains("10")
        );
        assert!(
            KafkaError::InvalidTopic("bad".into())
                .to_string()
                .contains("bad")
        );
        assert!(
            KafkaError::Transaction("tx".into())
                .to_string()
                .contains("transaction error")
        );
        assert!(KafkaError::Cancelled.to_string().contains("cancelled"));
        assert!(
            KafkaError::Config("cfg".into())
                .to_string()
                .contains("configuration error")
        );
    }

    #[test]
    fn kafka_error_debug() {
        let err = KafkaError::QueueFull;
        let dbg = format!("{err:?}");
        assert!(dbg.contains("QueueFull"));
    }

    #[test]
    fn kafka_error_source_io() {
        let err = KafkaError::Io(io::Error::other("disk"));
        let src = std::error::Error::source(&err);
        assert!(src.is_some());
    }

    #[test]
    fn kafka_error_source_none_for_others() {
        let err = KafkaError::Cancelled;
        assert!(std::error::Error::source(&err).is_none());
    }

    #[test]
    fn kafka_error_from_io() {
        let io_err = io::Error::other("net");
        let err: KafkaError = KafkaError::from(io_err);
        assert!(matches!(err, KafkaError::Io(_)));
    }

    // ---- enum derive coverage ----

    #[test]
    fn compression_default_is_none() {
        assert_eq!(Compression::default(), Compression::None);
    }

    #[test]
    fn compression_debug_clone_copy_eq() {
        let c = Compression::Snappy;
        let dbg = format!("{c:?}");
        assert!(dbg.contains("Snappy"));

        let copy = c;
        assert_eq!(c, copy);
    }

    #[test]
    fn compression_all_variants_ne() {
        let variants = [
            Compression::None,
            Compression::Gzip,
            Compression::Snappy,
            Compression::Lz4,
            Compression::Zstd,
        ];
        for (i, a) in variants.iter().enumerate() {
            for (j, b) in variants.iter().enumerate() {
                if i != j {
                    assert_ne!(a, b);
                }
            }
        }
    }

    #[test]
    fn acks_default_is_all() {
        assert_eq!(Acks::default(), Acks::All);
    }

    #[test]
    fn acks_debug_clone_copy_eq() {
        let a = Acks::Leader;
        let dbg = format!("{a:?}");
        assert!(dbg.contains("Leader"));

        let copy = a;
        assert_eq!(a, copy);
    }

    #[test]
    fn acks_as_i16_all_variants() {
        assert_eq!(Acks::None.as_i16(), 0);
        assert_eq!(Acks::Leader.as_i16(), 1);
        assert_eq!(Acks::All.as_i16(), -1);
    }

    // ---- ProducerConfig details ----

    #[test]
    fn producer_config_default_values() {
        let cfg = ProducerConfig::default();
        assert_eq!(cfg.bootstrap_servers, vec!["localhost:9092".to_string()]);
        assert!(cfg.client_id.is_none());
        assert_eq!(cfg.batch_size, 16_384);
        assert_eq!(cfg.linger_ms, 5);
        assert_eq!(cfg.compression, Compression::None);
        assert!(cfg.enable_idempotence);
        assert_eq!(cfg.acks, Acks::All);
        assert_eq!(cfg.retries, 3);
        assert_eq!(cfg.request_timeout, Duration::from_secs(30));
        assert_eq!(cfg.max_message_size, 1_048_576);
    }

    #[test]
    fn producer_config_debug_clone() {
        let cfg = ProducerConfig::default();
        let dbg = format!("{cfg:?}");
        assert!(dbg.contains("ProducerConfig"));

        let cloned = cfg;
        assert_eq!(cloned.batch_size, 16_384);
    }

    #[test]
    fn producer_config_builder_linger_retries() {
        let cfg = ProducerConfig::new(vec!["k:9092".into()])
            .linger_ms(100)
            .retries(10)
            .enable_idempotence(false);
        assert_eq!(cfg.linger_ms, 100);
        assert_eq!(cfg.retries, 10);
        assert!(!cfg.enable_idempotence);
    }

    #[test]
    fn producer_config_validate_zero_batch_size() {
        let cfg = ProducerConfig {
            batch_size: 0,
            ..Default::default()
        };
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn producer_config_validate_zero_max_message() {
        let cfg = ProducerConfig {
            max_message_size: 0,
            ..Default::default()
        };
        assert!(cfg.validate().is_err());
    }

    #[test]
    fn record_metadata_debug_clone() {
        let meta = RecordMetadata {
            topic: "t".into(),
            partition: 1,
            offset: 99,
            timestamp: None,
        };
        let dbg = format!("{meta:?}");
        assert!(dbg.contains("RecordMetadata"));

        let cloned = meta;
        assert_eq!(cloned.partition, 1);
        assert!(cloned.timestamp.is_none());
    }

    // ---- producer construction and accessors ----

    #[test]
    fn kafka_producer_config_accessor() {
        let cfg = ProducerConfig::new(vec!["host:9092".into()]).batch_size(999);
        let producer = KafkaProducer::new(cfg).unwrap();
        assert_eq!(producer.config().batch_size, 999);
    }

    #[test]
    fn kafka_producer_debug() {
        let producer = KafkaProducer::new(ProducerConfig::default()).unwrap();
        let dbg = format!("{producer:?}");
        assert!(dbg.contains("KafkaProducer"));
    }

    #[test]
    fn kafka_producer_reject_empty_servers() {
        let cfg = ProducerConfig {
            bootstrap_servers: vec![],
            ..Default::default()
        };
        assert!(KafkaProducer::new(cfg).is_err());
    }

    // ---- transactional types ----

    #[test]
    fn transactional_config_debug_clone() {
        let tc = TransactionalConfig::new(ProducerConfig::default(), "tx-1".into());
        let dbg = format!("{tc:?}");
        assert!(dbg.contains("TransactionalConfig"));

        let cloned = tc;
        assert_eq!(cloned.transaction_id, "tx-1");
    }

    #[test]
    fn transactional_config_default_timeout() {
        let tc = TransactionalConfig::new(ProducerConfig::default(), "tx-2".into());
        assert_eq!(tc.transaction_timeout, Duration::from_mins(1));
    }

    #[test]
    fn transactional_producer_debug() {
        let tc = TransactionalConfig::new(ProducerConfig::default(), "tx-3".into());
        let producer = TransactionalProducer::new(tc).unwrap();
        let dbg = format!("{producer:?}");
        assert!(dbg.contains("TransactionalProducer"));
    }

    #[test]
    fn transactional_producer_accessors() {
        let tc = TransactionalConfig::new(ProducerConfig::default(), "tx-4".into());
        let producer = TransactionalProducer::new(tc).unwrap();
        assert_eq!(producer.transaction_id(), "tx-4");
        assert_eq!(producer.config().transaction_id, "tx-4");
    }

    #[test]
    fn compression_debug_clone_copy_default_eq() {
        let c = Compression::default();
        assert_eq!(c, Compression::None);
        let dbg = format!("{c:?}");
        assert!(dbg.contains("None"), "{dbg}");
        let copied: Compression = c;
        let cloned = c;
        assert_eq!(copied, cloned);
        assert_ne!(c, Compression::Zstd);
    }

    #[test]
    fn acks_debug_clone_copy_default_eq() {
        let a = Acks::default();
        assert_eq!(a, Acks::All);
        let dbg = format!("{a:?}");
        assert!(dbg.contains("All"), "{dbg}");
        let copied: Acks = a;
        let cloned = a;
        assert_eq!(copied, cloned);
        assert_ne!(a, Acks::Leader);
    }
}