// asupersync/messaging/kafka.rs
1//! Kafka producer with Cx integration for cancel-correct message publishing.
2//!
3//! This module provides a Kafka producer with exactly-once semantics and
4//! transactional support, integrated with the Asupersync `Cx` context for
5//! proper cancellation handling.
6//!
7//! # Design
8//!
9//! The implementation wraps the rdkafka crate (when available) with a Cx
10//! integration layer. In Phase 0, this provides the API shape as stubs.
11//!
12//! # Exactly-Once Semantics
13//!
14//! Kafka supports exactly-once via:
15//! - Idempotent producers (deduplication via sequence numbers)
16//! - Transactional producers (atomic batch commits)
17//!
18//! # Cancel-Correct Behavior
19//!
20//! - In-flight sends are tracked as obligations
21//! - Cancellation waits for pending acks (with bounded timeout)
22//! - Uncommitted transactions abort on cancellation
23
24// Phase 0 stubs return errors immediately; async is for API consistency
25// with eventual rdkafka integration.
26#![allow(clippy::unused_async)]
27
28use crate::cx::Cx;
29use std::fmt;
30use std::io;
31use std::time::Duration;
32
33#[cfg(feature = "kafka")]
34use parking_lot::Mutex;
35#[cfg(feature = "kafka")]
36use rdkafka::{
37    client::ClientContext,
38    config::ClientConfig,
39    error::{KafkaError as RdKafkaError, RDKafkaErrorCode},
40    message::{BorrowedMessage, DeliveryResult, Header, Message, OwnedHeaders},
41    producer::{BaseRecord, ProducerContext, ThreadedProducer},
42};
43#[cfg(feature = "kafka")]
44use std::future::Future;
45#[cfg(feature = "kafka")]
46use std::pin::Pin;
47#[cfg(feature = "kafka")]
48use std::sync::Arc;
49#[cfg(feature = "kafka")]
50use std::task::{Context, Poll, Waker};
51
52/// Error type for Kafka operations.
53#[derive(Debug)]
/// Error type for Kafka operations.
#[derive(Debug)]
pub enum KafkaError {
    /// Underlying I/O failure while talking to the cluster.
    Io(io::Error),
    /// Malformed or unexpected Kafka response.
    Protocol(String),
    /// Error reported by a Kafka broker.
    Broker(String),
    /// The producer's local send queue is at capacity.
    QueueFull,
    /// Payload exceeds the configured maximum size.
    MessageTooLarge {
        /// Size of the message.
        size: usize,
        /// Maximum allowed size.
        max_size: usize,
    },
    /// Topic name was rejected or unknown.
    InvalidTopic(String),
    /// Failure inside a transactional operation.
    Transaction(String),
    /// The operation was cancelled via `Cx`.
    Cancelled,
    /// Invalid producer/transaction configuration.
    Config(String),
}

impl fmt::Display for KafkaError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            KafkaError::Io(e) => write!(f, "Kafka I/O error: {e}"),
            KafkaError::Protocol(msg) => write!(f, "Kafka protocol error: {msg}"),
            KafkaError::Broker(msg) => write!(f, "Kafka broker error: {msg}"),
            KafkaError::QueueFull => f.write_str("Kafka producer queue is full"),
            KafkaError::MessageTooLarge { size, max_size } => {
                write!(f, "Kafka message too large: {size} bytes (max: {max_size})")
            }
            KafkaError::InvalidTopic(topic) => write!(f, "Invalid Kafka topic: {topic}"),
            KafkaError::Transaction(msg) => write!(f, "Kafka transaction error: {msg}"),
            KafkaError::Cancelled => f.write_str("Kafka operation cancelled"),
            KafkaError::Config(msg) => write!(f, "Kafka configuration error: {msg}"),
        }
    }
}

impl std::error::Error for KafkaError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Only the I/O variant wraps another error value.
        if let Self::Io(inner) = self {
            Some(inner)
        } else {
            None
        }
    }
}

impl From<io::Error> for KafkaError {
    fn from(err: io::Error) -> Self {
        KafkaError::Io(err)
    }
}
112
/// rdkafka client/producer context that routes delivery reports back into
/// this module's one-shot delivery channels.
#[cfg(feature = "kafka")]
#[derive(Debug)]
struct KafkaContext;

#[cfg(feature = "kafka")]
impl ClientContext for KafkaContext {}

#[cfg(feature = "kafka")]
impl ProducerContext for KafkaContext {
    // Each in-flight record carries its DeliverySender as the opaque payload.
    type DeliveryOpaque = Box<DeliverySender>;

    /// Delivery-report callback for one record.
    ///
    /// NOTE(review): presumably invoked from the producer's poll thread, so
    /// it must stay non-blocking — `complete` only takes a short lock.
    fn delivery(
        &self,
        delivery_result: &DeliveryResult<'_>,
        delivery_opaque: Self::DeliveryOpaque,
    ) {
        let mapped = map_delivery_result(delivery_result);
        delivery_opaque.complete(mapped);
    }
}
133
/// Shared one-shot slot between librdkafka's delivery callback (sender side)
/// and the task awaiting the acknowledgment (receiver side).
#[cfg(feature = "kafka")]
#[derive(Debug)]
struct DeliveryState {
    /// Delivery outcome, once reported; taken by the receiver on poll.
    value: Option<Result<RecordMetadata, KafkaError>>,
    /// Waker of the task currently awaiting the result, if any.
    waker: Option<Waker>,
    /// Set when the receiver is dropped or cancelled; late results are dropped.
    closed: bool,
}

#[cfg(feature = "kafka")]
impl DeliveryState {
    /// Fresh slot: no value, no waiter, still open.
    fn new() -> Self {
        Self {
            value: None,
            waker: None,
            closed: false,
        }
    }
}

/// Sending half of the delivery channel; handed to rdkafka as the record's
/// opaque and completed exactly once from the delivery callback.
#[cfg(feature = "kafka")]
#[derive(Debug)]
struct DeliverySender {
    inner: Arc<Mutex<DeliveryState>>,
}

#[cfg(feature = "kafka")]
impl DeliverySender {
    /// Store `value` and wake the waiting task.
    ///
    /// No-op when the receiver already closed (cancelled/dropped) or a value
    /// was already stored — completion is at-most-once by construction.
    fn complete(self, value: Result<RecordMetadata, KafkaError>) {
        let mut state = self.inner.lock();
        if state.closed || state.value.is_some() {
            return;
        }
        state.value = Some(value);
        if let Some(waker) = state.waker.take() {
            waker.wake();
        }
    }
}

/// Receiving half of the delivery channel: a future resolving to the
/// delivery outcome, cancellation-aware via the captured `Cx`.
#[cfg(feature = "kafka")]
#[derive(Debug)]
struct DeliveryReceiver {
    inner: Arc<Mutex<DeliveryState>>,
    cx: Cx,
}

#[cfg(feature = "kafka")]
impl Drop for DeliveryReceiver {
    fn drop(&mut self) {
        // Mark the slot closed so a late delivery callback neither stores a
        // value nor tries to wake a task that no longer exists.
        let mut state = self.inner.lock();
        state.closed = true;
        state.waker = None;
    }
}
188
#[cfg(feature = "kafka")]
impl Future for DeliveryReceiver {
    type Output = Result<RecordMetadata, KafkaError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Cancellation is checked before looking at the delivery slot, so a
        // cancelled Cx wins even if the ack arrived in the same instant.
        if self.cx.checkpoint().is_err() {
            let mut state = self.inner.lock();
            // Close the slot so the sender discards any late result.
            state.closed = true;
            state.waker = None;
            return Poll::Ready(Err(KafkaError::Cancelled));
        }

        let mut state = self.inner.lock();
        if let Some(value) = state.value.take() {
            Poll::Ready(value)
        } else {
            // Only replace the stored waker when it would not wake the
            // current task — avoids cloning the waker on every poll.
            if !state
                .waker
                .as_ref()
                .is_some_and(|w| w.will_wake(cx.waker()))
            {
                state.waker = Some(cx.waker().clone());
            }
            Poll::Pending
        }
    }
}
216
/// Create a connected one-shot (sender, receiver) pair for a single delivery
/// report; the receiver observes cancellation through the cloned `cx`.
#[cfg(feature = "kafka")]
fn delivery_channel(cx: &Cx) -> (DeliverySender, DeliveryReceiver) {
    let inner = Arc::new(Mutex::new(DeliveryState::new()));
    (
        DeliverySender {
            inner: Arc::clone(&inner),
        },
        DeliveryReceiver {
            inner,
            cx: cx.clone(),
        },
    )
}
230
/// Convert an rdkafka delivery report into this module's result type.
#[cfg(feature = "kafka")]
fn map_delivery_result(delivery_result: &DeliveryResult<'_>) -> Result<RecordMetadata, KafkaError> {
    match delivery_result {
        Ok(message) => Ok(record_metadata_from_message(message)),
        Err((err, message)) => Err(map_rdkafka_error(err, Some(message))),
    }
}

/// Extract topic/partition/offset/timestamp from a delivered message.
#[cfg(feature = "kafka")]
fn record_metadata_from_message(message: &BorrowedMessage<'_>) -> RecordMetadata {
    RecordMetadata {
        topic: message.topic().to_string(),
        partition: message.partition(),
        offset: message.offset(),
        // `to_millis` yields None for messages without a broker timestamp.
        timestamp: message.timestamp().to_millis(),
    }
}
248
/// Translate an rdkafka error into [`KafkaError`]; `message`, when present,
/// supplies topic context for production failures.
#[cfg(feature = "kafka")]
fn map_rdkafka_error(err: &RdKafkaError, message: Option<&BorrowedMessage<'_>>) -> KafkaError {
    match err {
        RdKafkaError::ClientConfig(_, _, _, msg) => KafkaError::Config(msg.clone()),
        RdKafkaError::MessageProduction(code) => {
            map_error_code(*code, message.map(|msg| msg.topic()))
        }
        RdKafkaError::Canceled => KafkaError::Cancelled,
        // Everything else is surfaced as a broker-side error string.
        _ => KafkaError::Broker(err.to_string()),
    }
}

/// Map a low-level librdkafka error code onto the crate's error variants.
#[cfg(feature = "kafka")]
fn map_error_code(code: RDKafkaErrorCode, topic: Option<&str>) -> KafkaError {
    match code {
        RDKafkaErrorCode::QueueFull => KafkaError::QueueFull,
        RDKafkaErrorCode::InvalidTopic | RDKafkaErrorCode::UnknownTopic => {
            KafkaError::InvalidTopic(topic.unwrap_or("unknown").to_string())
        }
        _ => KafkaError::Broker(format!("{code:?}")),
    }
}
271
/// librdkafka `compression.type` config value for each [`Compression`] variant.
#[cfg(feature = "kafka")]
fn compression_to_str(compression: Compression) -> &'static str {
    match compression {
        Compression::None => "none",
        Compression::Gzip => "gzip",
        Compression::Snappy => "snappy",
        Compression::Lz4 => "lz4",
        Compression::Zstd => "zstd",
    }
}

/// librdkafka `acks` config value for each [`Acks`] level.
#[cfg(feature = "kafka")]
fn acks_to_str(acks: Acks) -> &'static str {
    match acks {
        Acks::None => "0",
        Acks::Leader => "1",
        Acks::All => "all",
    }
}
291
/// Assemble the librdkafka `ClientConfig` from `config`, layering
/// transaction-specific settings on top when `transactional` is provided.
///
/// Currently infallible; the `Result` return leaves room for future
/// validation without changing callers.
#[cfg(feature = "kafka")]
fn build_client_config(
    config: &ProducerConfig,
    transactional: Option<&TransactionalConfig>,
) -> Result<ClientConfig, KafkaError> {
    let mut client = ClientConfig::new();
    client.set("bootstrap.servers", config.bootstrap_servers.join(","));
    if let Some(client_id) = &config.client_id {
        client.set("client.id", client_id);
    }
    client.set("batch.size", config.batch_size.to_string());
    client.set("linger.ms", config.linger_ms.to_string());
    client.set("compression.type", compression_to_str(config.compression));
    client.set("enable.idempotence", config.enable_idempotence.to_string());
    client.set("acks", acks_to_str(config.acks));
    client.set("retries", config.retries.to_string());
    client.set(
        "request.timeout.ms",
        config.request_timeout.as_millis().to_string(),
    );
    client.set("message.max.bytes", config.max_message_size.to_string());

    if let Some(tx) = transactional {
        client.set("transactional.id", tx.transaction_id.as_str());
        client.set(
            "transaction.timeout.ms",
            tx.transaction_timeout.as_millis().to_string(),
        );
        // Transactional producers require idempotence; override whatever
        // the base config chose.
        client.set("enable.idempotence", "true");
    }

    Ok(client)
}
325
/// Build a `ThreadedProducer` wired to [`KafkaContext`] so delivery reports
/// flow back through the delivery channels.
#[cfg(feature = "kafka")]
fn build_producer(
    config: &ProducerConfig,
    transactional: Option<&TransactionalConfig>,
) -> Result<ThreadedProducer<KafkaContext>, KafkaError> {
    let client = build_client_config(config, transactional)?;
    client
        .create_with_context(KafkaContext)
        .map_err(|err| map_rdkafka_error(&err, None))
}
336
/// Enqueue one record and await its delivery report.
///
/// Checks cancellation and the configured size limit up front, then hands
/// the record to rdkafka with a [`DeliverySender`] as its opaque payload;
/// the future resolves when the delivery callback completes the channel.
#[cfg(feature = "kafka")]
async fn send_with_producer(
    producer: &ThreadedProducer<KafkaContext>,
    cx: &Cx,
    config: &ProducerConfig,
    topic: &str,
    key: Option<&[u8]>,
    payload: &[u8],
    partition: Option<i32>,
    headers: Option<&[(&str, &[u8])]>,
) -> Result<RecordMetadata, KafkaError> {
    cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;

    // Enforce the local size limit before touching the producer queue.
    if payload.len() > config.max_message_size {
        return Err(KafkaError::MessageTooLarge {
            size: payload.len(),
            max_size: config.max_message_size,
        });
    }

    let (sender, receiver) = delivery_channel(cx);

    let mut record = BaseRecord::with_opaque_to(topic, Box::new(sender)).payload(payload);
    if let Some(key) = key {
        record = record.key(key);
    }
    if let Some(partition) = partition {
        record = record.partition(partition);
    }
    if let Some(headers) = headers {
        let mut owned_headers = OwnedHeaders::new();
        for (key, value) in headers {
            owned_headers = owned_headers.insert(Header {
                key,
                value: Some(*value),
            });
        }
        record = record.headers(owned_headers);
    }

    match producer.send(record) {
        // Enqueued: wait for the delivery callback to complete the channel.
        Ok(()) => receiver.await,
        // Rejected synchronously (e.g. queue full); rdkafka returns the
        // record (and its opaque) with the error, so no callback will fire.
        Err((err, _)) => Err(map_rdkafka_error(&err, None)),
    }
}
382
/// Compression algorithm applied to outgoing Kafka message batches.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Compression {
    /// No compression (the default).
    #[default]
    None,
    /// Gzip (zlib) compression.
    Gzip,
    /// Snappy compression.
    Snappy,
    /// LZ4 compression.
    Lz4,
    /// Zstandard compression.
    Zstd,
}
398
/// Acknowledgment level for producer requests.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Acks {
    /// Fire-and-forget: no broker acknowledgment.
    None,
    /// Acknowledged by the partition leader only.
    Leader,
    /// Acknowledged by all in-sync replicas (the default).
    #[default]
    All,
}

impl Acks {
    /// Wire value used by the Kafka protocol (`-1` means "all replicas").
    #[must_use]
    pub const fn as_i16(&self) -> i16 {
        match *self {
            Acks::None => 0,
            Acks::Leader => 1,
            Acks::All => -1,
        }
    }
}
422
423/// Configuration for Kafka producer.
424#[derive(Debug, Clone)]
425pub struct ProducerConfig {
426    /// Bootstrap server addresses (host:port).
427    pub bootstrap_servers: Vec<String>,
428    /// Client identifier.
429    pub client_id: Option<String>,
430    /// Batch size in bytes (default: 16KB).
431    pub batch_size: usize,
432    /// Linger time before sending batch (default: 5ms).
433    pub linger_ms: u64,
434    /// Compression algorithm.
435    pub compression: Compression,
436    /// Enable idempotent producer (exactly-once without transactions).
437    pub enable_idempotence: bool,
438    /// Acknowledgment level.
439    pub acks: Acks,
440    /// Maximum retries for transient failures.
441    pub retries: u32,
442    /// Request timeout.
443    pub request_timeout: Duration,
444    /// Maximum message size in bytes.
445    pub max_message_size: usize,
446}
447
448impl Default for ProducerConfig {
449    fn default() -> Self {
450        Self {
451            bootstrap_servers: vec!["localhost:9092".to_string()],
452            client_id: None,
453            batch_size: 16_384, // 16KB
454            linger_ms: 5,       // 5ms
455            compression: Compression::None,
456            enable_idempotence: true,
457            acks: Acks::All,
458            retries: 3,
459            request_timeout: Duration::from_secs(30),
460            max_message_size: 1_048_576, // 1MB
461        }
462    }
463}
464
465impl ProducerConfig {
466    /// Create a new producer configuration.
467    #[must_use]
468    pub fn new(bootstrap_servers: Vec<String>) -> Self {
469        Self {
470            bootstrap_servers,
471            ..Default::default()
472        }
473    }
474
475    /// Set the client identifier.
476    #[must_use]
477    pub fn client_id(mut self, client_id: &str) -> Self {
478        self.client_id = Some(client_id.to_string());
479        self
480    }
481
482    /// Set the batch size in bytes.
483    #[must_use]
484    pub const fn batch_size(mut self, size: usize) -> Self {
485        self.batch_size = size;
486        self
487    }
488
489    /// Set the linger time in milliseconds.
490    #[must_use]
491    pub const fn linger_ms(mut self, ms: u64) -> Self {
492        self.linger_ms = ms;
493        self
494    }
495
496    /// Set the compression algorithm.
497    #[must_use]
498    pub const fn compression(mut self, compression: Compression) -> Self {
499        self.compression = compression;
500        self
501    }
502
503    /// Enable or disable idempotent producer.
504    #[must_use]
505    pub const fn enable_idempotence(mut self, enable: bool) -> Self {
506        self.enable_idempotence = enable;
507        self
508    }
509
510    /// Set the acknowledgment level.
511    #[must_use]
512    pub const fn acks(mut self, acks: Acks) -> Self {
513        self.acks = acks;
514        self
515    }
516
517    /// Set the maximum number of retries.
518    #[must_use]
519    pub const fn retries(mut self, retries: u32) -> Self {
520        self.retries = retries;
521        self
522    }
523
524    /// Validate the configuration.
525    pub fn validate(&self) -> Result<(), KafkaError> {
526        if self.bootstrap_servers.is_empty() {
527            return Err(KafkaError::Config(
528                "bootstrap_servers cannot be empty".to_string(),
529            ));
530        }
531        if self.batch_size == 0 {
532            return Err(KafkaError::Config("batch_size must be > 0".to_string()));
533        }
534        if self.max_message_size == 0 {
535            return Err(KafkaError::Config(
536                "max_message_size must be > 0".to_string(),
537            ));
538        }
539        Ok(())
540    }
541}
542
/// Metadata returned after successfully sending a message.
#[derive(Debug, Clone)]
pub struct RecordMetadata {
    /// Topic the message was sent to.
    pub topic: String,
    /// Partition the message was written to.
    pub partition: i32,
    /// Offset within the partition.
    pub offset: i64,
    /// Timestamp of the message (milliseconds since epoch);
    /// `None` when no broker timestamp was reported.
    pub timestamp: Option<i64>,
}
555
556/// Kafka producer (Phase 0 stub).
557///
558/// Provides the API shape for a Kafka producer with Cx integration.
559/// Full implementation requires rdkafka integration.
560#[derive(Debug)]
561pub struct KafkaProducer {
562    config: ProducerConfig,
563}
564
565impl KafkaProducer {
566    /// Create a new Kafka producer.
567    pub fn new(config: ProducerConfig) -> Result<Self, KafkaError> {
568        config.validate()?;
569        Ok(Self { config })
570    }
571
572    /// Send a message to a topic.
573    ///
574    /// # Arguments
575    /// * `cx` - Cancellation context
576    /// * `topic` - Target topic name
577    /// * `key` - Optional message key for partitioning
578    /// * `payload` - Message payload
579    /// * `partition` - Optional partition override
580    ///
581    /// # Errors
582    /// Returns an error if the message cannot be sent.
583    #[allow(unused_variables)]
584    pub async fn send(
585        &self,
586        cx: &Cx,
587        topic: &str,
588        key: Option<&[u8]>,
589        payload: &[u8],
590        partition: Option<i32>,
591    ) -> Result<RecordMetadata, KafkaError> {
592        cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
593
594        // Check message size
595        if payload.len() > self.config.max_message_size {
596            return Err(KafkaError::MessageTooLarge {
597                size: payload.len(),
598                max_size: self.config.max_message_size,
599            });
600        }
601
602        // Phase 0: stub implementation
603        Err(KafkaError::Io(io::Error::other(
604            "Phase 0: requires rdkafka integration",
605        )))
606    }
607
608    /// Send a message with headers.
609    ///
610    /// # Arguments
611    /// * `cx` - Cancellation context
612    /// * `topic` - Target topic name
613    /// * `key` - Optional message key for partitioning
614    /// * `payload` - Message payload
615    /// * `headers` - Key-value header pairs
616    #[allow(unused_variables)]
617    pub async fn send_with_headers(
618        &self,
619        cx: &Cx,
620        topic: &str,
621        key: Option<&[u8]>,
622        payload: &[u8],
623        headers: &[(&str, &[u8])],
624    ) -> Result<RecordMetadata, KafkaError> {
625        cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
626
627        if payload.len() > self.config.max_message_size {
628            return Err(KafkaError::MessageTooLarge {
629                size: payload.len(),
630                max_size: self.config.max_message_size,
631            });
632        }
633
634        Err(KafkaError::Io(io::Error::other(
635            "Phase 0: requires rdkafka integration",
636        )))
637    }
638
639    /// Flush all pending messages.
640    ///
641    /// Blocks until all messages in the queue are sent or the timeout expires.
642    #[allow(unused_variables)]
643    pub async fn flush(&self, cx: &Cx, timeout: Duration) -> Result<(), KafkaError> {
644        cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
645
646        Err(KafkaError::Io(io::Error::other(
647            "Phase 0: requires rdkafka integration",
648        )))
649    }
650
651    /// Get the current configuration.
652    #[must_use]
653    pub const fn config(&self) -> &ProducerConfig {
654        &self.config
655    }
656}
657
658/// Configuration for transactional producer.
659#[derive(Debug, Clone)]
660pub struct TransactionalConfig {
661    /// Base producer configuration.
662    pub producer: ProducerConfig,
663    /// Transaction ID (must be unique per producer instance).
664    pub transaction_id: String,
665    /// Transaction timeout.
666    pub transaction_timeout: Duration,
667}
668
669impl TransactionalConfig {
670    /// Create a new transactional configuration.
671    #[must_use]
672    pub fn new(producer: ProducerConfig, transaction_id: String) -> Self {
673        Self {
674            producer,
675            transaction_id,
676            transaction_timeout: Duration::from_mins(1),
677        }
678    }
679
680    /// Set the transaction timeout.
681    #[must_use]
682    pub const fn transaction_timeout(mut self, timeout: Duration) -> Self {
683        self.transaction_timeout = timeout;
684        self
685    }
686}
687
688/// Transactional Kafka producer for exactly-once semantics.
689///
690/// Provides atomic message publishing across multiple topics/partitions.
691#[derive(Debug)]
692pub struct TransactionalProducer {
693    config: TransactionalConfig,
694}
695
696impl TransactionalProducer {
697    /// Create a new transactional producer.
698    pub fn new(config: TransactionalConfig) -> Result<Self, KafkaError> {
699        config.producer.validate()?;
700
701        if config.transaction_id.is_empty() {
702            return Err(KafkaError::Config(
703                "transaction_id cannot be empty".to_string(),
704            ));
705        }
706
707        Ok(Self { config })
708    }
709
710    /// Begin a new transaction.
711    ///
712    /// Returns a `Transaction` that must be committed or aborted.
713    #[allow(unused_variables)]
714    pub async fn begin_transaction(&self, cx: &Cx) -> Result<Transaction<'_>, KafkaError> {
715        cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;
716
717        // Phase 0: stub implementation
718        Err(KafkaError::Io(io::Error::other(
719            "Phase 0: requires rdkafka integration",
720        )))
721    }
722
723    /// Get the transaction ID.
724    #[must_use]
725    pub fn transaction_id(&self) -> &str {
726        &self.config.transaction_id
727    }
728
729    /// Get the current configuration.
730    #[must_use]
731    pub const fn config(&self) -> &TransactionalConfig {
732        &self.config
733    }
734}
735
/// An active Kafka transaction.
///
/// Messages sent within a transaction are atomically committed or aborted.
/// The transaction must be explicitly committed or aborted before being dropped.
#[derive(Debug)]
pub struct Transaction<'a> {
    /// Owning producer (not yet used by the Phase 0 stub bodies).
    producer: &'a TransactionalProducer,
    /// Set once `commit`/`abort` has run; inspected by `Drop`.
    committed: bool,
}
745
impl Transaction<'_> {
    /// Send a message within the transaction.
    ///
    /// # Errors
    /// Phase 0 always fails; cancellation maps to `KafkaError::Cancelled`.
    #[allow(unused_variables)]
    pub async fn send(
        &self,
        cx: &Cx,
        topic: &str,
        key: Option<&[u8]>,
        payload: &[u8],
    ) -> Result<(), KafkaError> {
        cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;

        Err(KafkaError::Io(io::Error::other(
            "Phase 0: requires rdkafka integration",
        )))
    }

    /// Commit the transaction.
    ///
    /// Atomically publishes all messages sent within this transaction.
    ///
    /// # Errors
    /// Phase 0 always fails; cancellation maps to `KafkaError::Cancelled`.
    #[allow(unused_variables)]
    pub async fn commit(mut self, cx: &Cx) -> Result<(), KafkaError> {
        // NOTE(review): a cancelled checkpoint returns before `committed` is
        // set, so Drop will see the transaction as abandoned — confirm that
        // is the intended cancel behavior.
        cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;

        // Phase 0: stub - mark as committed to prevent drop warning
        self.committed = true;

        Err(KafkaError::Io(io::Error::other(
            "Phase 0: requires rdkafka integration",
        )))
    }

    /// Abort the transaction.
    ///
    /// Discards all messages sent within this transaction.
    ///
    /// # Errors
    /// Phase 0 always fails; cancellation maps to `KafkaError::Cancelled`.
    #[allow(unused_variables)]
    pub async fn abort(mut self, cx: &Cx) -> Result<(), KafkaError> {
        cx.checkpoint().map_err(|_| KafkaError::Cancelled)?;

        // Phase 0: stub - `committed` here really means "resolved"; setting
        // it keeps Drop from flagging an abandoned transaction.
        self.committed = true;

        Err(KafkaError::Io(io::Error::other(
            "Phase 0: requires rdkafka integration",
        )))
    }
}
793
impl Drop for Transaction<'_> {
    fn drop(&mut self) {
        // `committed` is set by both `commit` and `abort`; reaching here
        // with it unset means the caller leaked an open transaction.
        if !self.committed {
            // In production, this should log a warning about uncommitted transaction
            // The broker will abort it after transaction.timeout.ms expires
        }
    }
}
802
803#[cfg(test)]
804mod tests {
805    use super::*;
806
807    #[test]
808    fn test_acks_values() {
809        assert_eq!(Acks::None.as_i16(), 0);
810        assert_eq!(Acks::Leader.as_i16(), 1);
811        assert_eq!(Acks::All.as_i16(), -1);
812    }
813
814    #[test]
815    fn test_config_defaults() {
816        let config = ProducerConfig::default();
817        assert_eq!(config.batch_size, 16_384);
818        assert_eq!(config.linger_ms, 5);
819        assert!(config.enable_idempotence);
820        assert_eq!(config.acks, Acks::All);
821    }
822
823    #[test]
824    fn test_config_builder() {
825        let config = ProducerConfig::new(vec!["kafka:9092".to_string()])
826            .client_id("my-producer")
827            .batch_size(32_768)
828            .compression(Compression::Snappy)
829            .acks(Acks::Leader);
830
831        assert_eq!(config.bootstrap_servers, vec!["kafka:9092"]);
832        assert_eq!(config.client_id, Some("my-producer".to_string()));
833        assert_eq!(config.batch_size, 32_768);
834        assert_eq!(config.compression, Compression::Snappy);
835        assert_eq!(config.acks, Acks::Leader);
836    }
837
838    #[test]
839    fn test_config_validation() {
840        let empty_servers = ProducerConfig {
841            bootstrap_servers: vec![],
842            ..Default::default()
843        };
844        assert!(empty_servers.validate().is_err());
845
846        let valid = ProducerConfig::default();
847        assert!(valid.validate().is_ok());
848    }
849
850    #[test]
851    fn test_producer_creation() {
852        let config = ProducerConfig::default();
853        let producer = KafkaProducer::new(config);
854        assert!(producer.is_ok());
855    }
856
857    #[test]
858    fn test_transactional_config() {
859        let config =
860            TransactionalConfig::new(ProducerConfig::default(), "my-transaction-id".to_string())
861                .transaction_timeout(Duration::from_mins(2));
862
863        assert_eq!(config.transaction_id, "my-transaction-id");
864        assert_eq!(config.transaction_timeout, Duration::from_mins(2));
865    }
866
867    #[test]
868    fn test_transactional_producer_empty_id() {
869        let config = TransactionalConfig::new(ProducerConfig::default(), String::new());
870        let producer = TransactionalProducer::new(config);
871        assert!(producer.is_err());
872    }
873
874    #[test]
875    fn test_error_display() {
876        let io_err = KafkaError::Io(io::Error::other("test"));
877        assert!(io_err.to_string().contains("I/O error"));
878
879        let msg_err = KafkaError::MessageTooLarge {
880            size: 2_000_000,
881            max_size: 1_000_000,
882        };
883        assert!(msg_err.to_string().contains("2000000"));
884        assert!(msg_err.to_string().contains("1000000"));
885
886        let cancelled = KafkaError::Cancelled;
887        assert!(cancelled.to_string().contains("cancelled"));
888    }
889
890    #[test]
891    fn test_record_metadata() {
892        let meta = RecordMetadata {
893            topic: "test-topic".to_string(),
894            partition: 0,
895            offset: 42,
896            timestamp: Some(1_234_567_890),
897        };
898        assert_eq!(meta.topic, "test-topic");
899        assert_eq!(meta.partition, 0);
900        assert_eq!(meta.offset, 42);
901        assert_eq!(meta.timestamp, Some(1_234_567_890));
902    }
903
904    // Pure data-type tests (wave 13 – CyanBarn)
905
906    #[test]
907    fn kafka_error_display_all_variants() {
908        assert!(
909            KafkaError::Io(io::Error::other("e"))
910                .to_string()
911                .contains("I/O error")
912        );
913        assert!(
914            KafkaError::Protocol("p".into())
915                .to_string()
916                .contains("protocol error")
917        );
918        assert!(
919            KafkaError::Broker("b".into())
920                .to_string()
921                .contains("broker error")
922        );
923        assert!(KafkaError::QueueFull.to_string().contains("queue is full"));
924        assert!(
925            KafkaError::MessageTooLarge {
926                size: 10,
927                max_size: 5
928            }
929            .to_string()
930            .contains("10")
931        );
932        assert!(
933            KafkaError::InvalidTopic("bad".into())
934                .to_string()
935                .contains("bad")
936        );
937        assert!(
938            KafkaError::Transaction("tx".into())
939                .to_string()
940                .contains("transaction error")
941        );
942        assert!(KafkaError::Cancelled.to_string().contains("cancelled"));
943        assert!(
944            KafkaError::Config("cfg".into())
945                .to_string()
946                .contains("configuration error")
947        );
948    }
949
950    #[test]
951    fn kafka_error_debug() {
952        let err = KafkaError::QueueFull;
953        let dbg = format!("{err:?}");
954        assert!(dbg.contains("QueueFull"));
955    }
956
957    #[test]
958    fn kafka_error_source_io() {
959        let err = KafkaError::Io(io::Error::other("disk"));
960        let src = std::error::Error::source(&err);
961        assert!(src.is_some());
962    }
963
964    #[test]
965    fn kafka_error_source_none_for_others() {
966        let err = KafkaError::Cancelled;
967        assert!(std::error::Error::source(&err).is_none());
968    }
969
970    #[test]
971    fn kafka_error_from_io() {
972        let io_err = io::Error::other("net");
973        let err: KafkaError = KafkaError::from(io_err);
974        assert!(matches!(err, KafkaError::Io(_)));
975    }
976
977    #[test]
978    fn compression_default_is_none() {
979        assert_eq!(Compression::default(), Compression::None);
980    }
981
982    #[test]
983    fn compression_debug_clone_copy_eq() {
984        let c = Compression::Snappy;
985        let dbg = format!("{c:?}");
986        assert!(dbg.contains("Snappy"));
987
988        let copy = c;
989        assert_eq!(c, copy);
990    }
991
992    #[test]
993    fn compression_all_variants_ne() {
994        let variants = [
995            Compression::None,
996            Compression::Gzip,
997            Compression::Snappy,
998            Compression::Lz4,
999            Compression::Zstd,
1000        ];
1001        for (i, a) in variants.iter().enumerate() {
1002            for (j, b) in variants.iter().enumerate() {
1003                if i != j {
1004                    assert_ne!(a, b);
1005                }
1006            }
1007        }
1008    }
1009
1010    #[test]
1011    fn acks_default_is_all() {
1012        assert_eq!(Acks::default(), Acks::All);
1013    }
1014
1015    #[test]
1016    fn acks_debug_clone_copy_eq() {
1017        let a = Acks::Leader;
1018        let dbg = format!("{a:?}");
1019        assert!(dbg.contains("Leader"));
1020
1021        let copy = a;
1022        assert_eq!(a, copy);
1023    }
1024
1025    #[test]
1026    fn acks_as_i16_all_variants() {
1027        assert_eq!(Acks::None.as_i16(), 0);
1028        assert_eq!(Acks::Leader.as_i16(), 1);
1029        assert_eq!(Acks::All.as_i16(), -1);
1030    }
1031
1032    #[test]
1033    fn producer_config_default_values() {
1034        let cfg = ProducerConfig::default();
1035        assert_eq!(cfg.bootstrap_servers, vec!["localhost:9092".to_string()]);
1036        assert!(cfg.client_id.is_none());
1037        assert_eq!(cfg.batch_size, 16_384);
1038        assert_eq!(cfg.linger_ms, 5);
1039        assert_eq!(cfg.compression, Compression::None);
1040        assert!(cfg.enable_idempotence);
1041        assert_eq!(cfg.acks, Acks::All);
1042        assert_eq!(cfg.retries, 3);
1043        assert_eq!(cfg.request_timeout, Duration::from_secs(30));
1044        assert_eq!(cfg.max_message_size, 1_048_576);
1045    }
1046
1047    #[test]
1048    fn producer_config_debug_clone() {
1049        let cfg = ProducerConfig::default();
1050        let dbg = format!("{cfg:?}");
1051        assert!(dbg.contains("ProducerConfig"));
1052
1053        let cloned = cfg;
1054        assert_eq!(cloned.batch_size, 16_384);
1055    }
1056
1057    #[test]
1058    fn producer_config_builder_linger_retries() {
1059        let cfg = ProducerConfig::new(vec!["k:9092".into()])
1060            .linger_ms(100)
1061            .retries(10)
1062            .enable_idempotence(false);
1063        assert_eq!(cfg.linger_ms, 100);
1064        assert_eq!(cfg.retries, 10);
1065        assert!(!cfg.enable_idempotence);
1066    }
1067
1068    #[test]
1069    fn producer_config_validate_zero_batch_size() {
1070        let cfg = ProducerConfig {
1071            batch_size: 0,
1072            ..Default::default()
1073        };
1074        assert!(cfg.validate().is_err());
1075    }
1076
1077    #[test]
1078    fn producer_config_validate_zero_max_message() {
1079        let cfg = ProducerConfig {
1080            max_message_size: 0,
1081            ..Default::default()
1082        };
1083        assert!(cfg.validate().is_err());
1084    }
1085
1086    #[test]
1087    fn record_metadata_debug_clone() {
1088        let meta = RecordMetadata {
1089            topic: "t".into(),
1090            partition: 1,
1091            offset: 99,
1092            timestamp: None,
1093        };
1094        let dbg = format!("{meta:?}");
1095        assert!(dbg.contains("RecordMetadata"));
1096
1097        let cloned = meta;
1098        assert_eq!(cloned.partition, 1);
1099        assert!(cloned.timestamp.is_none());
1100    }
1101
1102    #[test]
1103    fn kafka_producer_config_accessor() {
1104        let cfg = ProducerConfig::new(vec!["host:9092".into()]).batch_size(999);
1105        let producer = KafkaProducer::new(cfg).unwrap();
1106        assert_eq!(producer.config().batch_size, 999);
1107    }
1108
1109    #[test]
1110    fn kafka_producer_debug() {
1111        let producer = KafkaProducer::new(ProducerConfig::default()).unwrap();
1112        let dbg = format!("{producer:?}");
1113        assert!(dbg.contains("KafkaProducer"));
1114    }
1115
1116    #[test]
1117    fn kafka_producer_reject_empty_servers() {
1118        let cfg = ProducerConfig {
1119            bootstrap_servers: vec![],
1120            ..Default::default()
1121        };
1122        assert!(KafkaProducer::new(cfg).is_err());
1123    }
1124
1125    #[test]
1126    fn transactional_config_debug_clone() {
1127        let tc = TransactionalConfig::new(ProducerConfig::default(), "tx-1".into());
1128        let dbg = format!("{tc:?}");
1129        assert!(dbg.contains("TransactionalConfig"));
1130
1131        let cloned = tc;
1132        assert_eq!(cloned.transaction_id, "tx-1");
1133    }
1134
1135    #[test]
1136    fn transactional_config_default_timeout() {
1137        let tc = TransactionalConfig::new(ProducerConfig::default(), "tx-2".into());
1138        assert_eq!(tc.transaction_timeout, Duration::from_mins(1));
1139    }
1140
1141    #[test]
1142    fn transactional_producer_debug() {
1143        let tc = TransactionalConfig::new(ProducerConfig::default(), "tx-3".into());
1144        let producer = TransactionalProducer::new(tc).unwrap();
1145        let dbg = format!("{producer:?}");
1146        assert!(dbg.contains("TransactionalProducer"));
1147    }
1148
1149    #[test]
1150    fn transactional_producer_accessors() {
1151        let tc = TransactionalConfig::new(ProducerConfig::default(), "tx-4".into());
1152        let producer = TransactionalProducer::new(tc).unwrap();
1153        assert_eq!(producer.transaction_id(), "tx-4");
1154        assert_eq!(producer.config().transaction_id, "tx-4");
1155    }
1156
1157    #[test]
1158    fn compression_debug_clone_copy_default_eq() {
1159        let c = Compression::default();
1160        assert_eq!(c, Compression::None);
1161        let dbg = format!("{c:?}");
1162        assert!(dbg.contains("None"), "{dbg}");
1163        let copied: Compression = c;
1164        let cloned = c;
1165        assert_eq!(copied, cloned);
1166        assert_ne!(c, Compression::Zstd);
1167    }
1168
1169    #[test]
1170    fn acks_debug_clone_copy_default_eq() {
1171        let a = Acks::default();
1172        assert_eq!(a, Acks::All);
1173        let dbg = format!("{a:?}");
1174        assert!(dbg.contains("All"), "{dbg}");
1175        let copied: Acks = a;
1176        let cloned = a;
1177        assert_eq!(copied, cloned);
1178        assert_ne!(a, Acks::Leader);
1179    }
1180}