Skip to main content

oxirs_stream/
producer.rs

1//! # Stream Producer
2//!
3//! Producer types and configuration for streaming backends.
4
5use serde::{Deserialize, Serialize};
6
7/// Producer configuration
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct ProducerConfig {
10    pub client_id: String,
11    pub acks: AckLevel,
12    pub retries: u32,
13    pub batch_size: usize,
14    pub linger_ms: u64,
15    pub buffer_memory: usize,
16    pub compression_type: CompressionType,
17    pub request_timeout_ms: u64,
18    pub delivery_timeout_ms: u64,
19    pub enable_idempotence: bool,
20    pub transactional_id: Option<String>,
21}
22
23impl Default for ProducerConfig {
24    fn default() -> Self {
25        Self {
26            client_id: "oxirs-producer".to_string(),
27            acks: AckLevel::All,
28            retries: 3,
29            batch_size: 16384,
30            linger_ms: 10,
31            buffer_memory: 33554432, // 32MB
32            compression_type: CompressionType::None,
33            request_timeout_ms: 30000,
34            delivery_timeout_ms: 120000,
35            enable_idempotence: true,
36            transactional_id: None,
37        }
38    }
39}
40
41/// Acknowledgment level for producers
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
43pub enum AckLevel {
44    /// No acknowledgment
45    None,
46    /// Leader acknowledgment only
47    Leader,
48    /// All in-sync replicas acknowledgment
49    All,
50}
51
52/// Compression types
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
54pub enum CompressionType {
55    None,
56    Gzip,
57    Snappy,
58    Lz4,
59    Zstd,
60}
61
62/// Producer record with key and headers
63#[derive(Debug, Clone)]
64pub struct ProducerRecord {
65    pub topic: String,
66    pub partition: Option<u32>,
67    pub key: Option<Vec<u8>>,
68    pub value: Vec<u8>,
69    pub headers: Vec<(String, Vec<u8>)>,
70    pub timestamp: Option<u64>,
71}