# Transactions
Transactional producers use Kafka's transaction protocol v2 path and require
Kafka 4.0 or newer.
Use transactions when records must commit or abort together, or when consumed
offsets must be committed atomically with produced output records.
## Producer Transaction
```rust,no_run
use kafkit_client::{KafkaProducer, ProduceRecord, ProducerConfig};
#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
let producer = KafkaProducer::connect(
ProducerConfig::new("localhost:9092")
.with_acks(-1)
.with_transactional_id("orders-tx-producer"),
)
.await?;
producer.begin_transaction().await?;
producer
.send(ProduceRecord::new("orders-output", 0, "created"))
.await?;
producer.commit_transaction().await?;
producer.shutdown().await?;
Ok(())
}
```
If processing fails after `begin_transaction()`, call `abort_transaction()` and
decide whether to retry the work.
## Read-Process-Write
Use `send_offsets_to_transaction(...)` when output records and consumed offsets
must be committed atomically.
```rust,no_run
use kafkit_client::{
AutoOffsetReset, CommitOffset, ConsumerGroupMetadata, KafkaClient, KafkaProducer,
ProduceRecord, ProducerConfig,
};
#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
let consumer = KafkaClient::new("localhost:9092")
.topic("orders-input")
.consumer("orders-worker")
.with_auto_offset_reset(AutoOffsetReset::Earliest)
.connect()
.await?;
let producer = KafkaProducer::connect(
ProducerConfig::new("localhost:9092")
.with_acks(-1)
.with_transactional_id("orders-worker-1"),
)
.await?;
let records = consumer.poll().await?;
let offsets: Vec<CommitOffset> = records.commit_offsets();
let group_metadata: ConsumerGroupMetadata = consumer.group_metadata().await?;
producer.begin_transaction().await?;
for record in records.iter() {
let value = record.value.clone().unwrap_or_default();
producer
.send(ProduceRecord::new("orders-output", 0, value))
.await?;
}
producer
.send_offsets_to_transaction(offsets, group_metadata)
.await?;
producer.commit_transaction().await?;
consumer.shutdown().await?;
producer.shutdown().await?;
Ok(())
}
```
## Requirements And Behavior
- A stable `transactional_id` is required.
- `begin_transaction().await` initializes the transactional producer on first
use, then starts a transaction.
- `init_transactions().await` is optional. Use it only when an application wants
eager startup-time transaction readiness and fencing.
- `commit_transaction().await` commits produced records and transaction offsets.
- `abort_transaction().await` aborts the open transaction.
- `send_offsets_to_transaction(...)` is for atomic offset commits.
- Transactional and idempotent producers require `acks = -1`.
- Ambiguous transaction completion failures, such as a lost commit or abort
response, make that producer instance fatal. Create a new producer with the
same stable `transactional_id` to continue after the broker has recovered.
- Abort-required failures must be followed by `abort_transaction().await` before
the producer can start another transaction.
## When Not To Use Transactions
- Use plain producing when duplicates and atomicity are handled elsewhere.
- Use idempotent non-transactional producing when duplicate protection is useful
but offset commits do not need to be atomic with output records.
- Avoid transactions for fire-and-forget telemetry or best-effort workloads.