1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
//! A single partition's runtime handle. Owned by the partition registry
//! inside `Broker`. The handle gives any task:
//!
//! - read access to the partition's [`Log`] via `Arc<Mutex<Log>>`
//! - write access via a `mpsc::Sender<WriterMessage>` (a single writer task
//! drains the channel; see `partition_writer.rs`)
//! - a [`Notify`] that fires after every successful append, used by
//! long-poll Fetch to wake when new data arrives.
// Fields (`log`, `writer_tx`, `append_notify`) are consumed by the Produce
// + Fetch handlers landing in Tasks 15-16; keep this allow until then.
#![allow(dead_code)]
use std::path::PathBuf;
use std::sync::atomic::{AtomicI32, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use arc_swap::ArcSwap;
use crabka_log::{AbortedTxn, Log, ReadOutput};
use crabka_protocol::records::RecordBatch;
use tokio::sync::{Notify, mpsc, oneshot};
use tokio::task::JoinHandle;
// `std::sync::Mutex` is kept for `log` (sync hot-path callers);
// `replica_state` uses `tokio::sync::Mutex` to avoid blocking worker threads.
use crate::error::BrokerError;
use crate::replica_state::ReplicaState;
/// One produce request travelling from the Produce handler to the
/// partition's writer task.
///
/// Offset assignment belongs to the writer: whatever `base_offset` the
/// handler left on the batch is overwritten, and the value that was
/// actually assigned is reported back through `ack`.
#[derive(Debug)]
pub struct ProduceJob {
    /// Batch to append. The writer rewrites its `base_offset` before the
    /// append happens.
    pub batch: RecordBatch,
    /// Reply channel: `Ok(base_offset)` once the append succeeded, or the
    /// error that prevented it.
    pub ack: oneshot::Sender<Result<i64, BrokerError>>,
}
/// Every request the partition's writer task understands.
///
/// All variants share one `mpsc::Sender` and are drained by a single
/// consumer, so replication appends stay ordered with produce appends
/// (and with every other request listed here).
#[derive(Debug)]
pub enum WriterMessage {
    /// Produce-path append. The writer picks `base_offset` from the log.
    /// Sent by the `Produce` handler.
    Produce(ProduceJob),
    /// Follower-side append. The batch keeps the offset the partition's
    /// leader already assigned. Sent by the per-(topic, partition)
    /// replicator on a follower broker.
    Replicate {
        batch: RecordBatch,
        ack: oneshot::Sender<Result<(), BrokerError>>,
    },
    /// Remove every record at offset `>= offset`. Part of the
    /// replicator's `OFFSET_OUT_OF_RANGE` recovery path.
    Truncate {
        offset: i64,
        ack: oneshot::Sender<Result<(), BrokerError>>,
    },
    /// Discard all segments and start a fresh active segment at
    /// `new_base`. Needed by the replicator's `OFFSET_OUT_OF_RANGE`
    /// recovery when the follower is behind the leader's `log_start`:
    /// the follower's own `log_start` has to jump *forward* over records
    /// it never received, which `Truncate` cannot express.
    ResetTo {
        new_base: i64,
        ack: oneshot::Sender<Result<(), BrokerError>>,
    },
    /// Replace the partition's `LogConfig` in one step. Because the
    /// writer task serializes this with appends, no in-flight
    /// `RecordBatch` observes a half-applied config. Sent by
    /// `ReplicatorSupervisor::reconcile` whenever a `V1TopicConfig`
    /// record changes the topic's overrides.
    SetLogConfig {
        config: crabka_log::LogConfig,
        ack: oneshot::Sender<()>,
    },
    /// Perform a single compaction pass. Routed through the writer actor
    /// so the single-writer invariant on `Log` holds.
    Compact {
        ack: oneshot::Sender<Result<(), BrokerError>>,
    },
    /// Trim the head of the log: sealed segments whose last offset is
    /// `< new_start` are dropped, and `log_start_offset` advances when
    /// `new_start` lands inside the active segment. The reply carries the
    /// resulting `log_start_offset`, which can be lower than `new_start`
    /// if `new_start` falls between segment boundaries (Kafka semantics).
    /// Sent by the `DeleteRecords` handler.
    TrimToOffset {
        new_start: i64,
        ack: oneshot::Sender<Result<i64, BrokerError>>,
    },
    /// Test-only: move the in-memory `log_start_offset` without touching
    /// segments on disk. Imitates retention-driven truncation for the
    /// `out_of_range_truncates_and_recovers` replication integration
    /// test.
    #[cfg(any(test, feature = "test-helpers"))]
    TestSetLogStart {
        new_start: i64,
        ack: oneshot::Sender<Result<(), BrokerError>>,
    },
    /// Replace the partition's `Log` with a future log that has fully
    /// caught up. Sent by the KIP-113 move task in `future_log.rs` once
    /// `future_log.LEO == current_log.LEO`. The writer re-validates that
    /// invariant under its own lock and then:
    /// 1. drops the current `Log`,
    /// 2. `fs::rename`s `future_path` → `target_partition_path`,
    /// 3. removes the source partition directory,
    /// 4. re-opens `Log` at `target_partition_path` and stores it,
    /// 5. updates `Partition.log_dir` to `target_log_dir`.
    ///
    /// When the future log slipped behind during the request hop the
    /// reply is `Ok(SwapOutcome::NotCaughtUp)`, letting the caller loop
    /// once more.
    SwapFutureLog {
        target_log_dir: PathBuf,
        future_log: Arc<Mutex<Log>>,
        future_path: PathBuf,
        target_partition_path: PathBuf,
        ack: oneshot::Sender<Result<SwapOutcome, BrokerError>>,
    },
}
/// What happened when the writer handled a
/// [`WriterMessage::SwapFutureLog`] request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SwapOutcome {
    /// The swap went through: the partition now serves from the target
    /// log dir and the source dir is gone.
    Swapped,
    /// On re-check the future log was still behind the current log. The
    /// caller is expected to resume replication and try again.
    NotCaughtUp,
}
/// Error value of `await_hw_at_least`: the deadline passed while the High
/// Watermark was still below the requested offset.
#[derive(Debug)]
pub struct HwTimeout;
/// Runtime handle for one partition.
///
/// Cloning is meant to be cheap: apart from `topic` (a `String`) and
/// `partition_id`, every field is an `Arc` or a channel sender, so clones
/// share the same log, writer channel, notifiers and replica state. The
/// writer's join handle is shared through `Arc<JoinHandle<()>>` rather
/// than duplicated.
#[derive(Clone)]
// `partition_id` mirrors Kafka's wire naming and is the conventional term
// used throughout the broker; renaming to `id` would shadow `Partition`'s
// own identity at every call site.
#[allow(clippy::struct_field_names)]
pub struct Partition {
    pub topic: String,
    pub partition_id: i32,
    /// The configured `log.dir` that currently owns this partition, i.e.
    /// the parent of `log.lock().dir()` and not the
    /// `<topic>-<partition>` subdirectory itself. Rewritten by
    /// [`WriterMessage::SwapFutureLog`] as the final step of a KIP-113
    /// move. Stored in an `ArcSwap` so readers such as `DescribeLogDirs`
    /// and `AlterReplicaLogDirs` validation observe the change atomically
    /// without locking `log`.
    pub log_dir: Arc<ArcSwap<PathBuf>>,
    /// The partition's log behind a synchronous mutex.
    pub log: Arc<Mutex<Log>>,
    /// Sending half of the channel drained by the writer task.
    pub writer_tx: mpsc::Sender<WriterMessage>,
    /// Notifier for appends (see the module docs).
    pub append_notify: Arc<Notify>,
    /// Replication bookkeeping (ISR, per-follower progress, HW) behind an
    /// async mutex.
    pub(crate) replica_state: Arc<tokio::sync::Mutex<ReplicaState>>,
    /// Notified by the methods on this type after they may have moved the
    /// High Watermark or changed leadership.
    pub hw_advance_notify: Arc<Notify>,
    /// `NodeId` of the current leader, taken from the metadata image.
    /// Atomic so the Produce/Fetch hot paths can read it without a lock.
    pub current_leader: Arc<AtomicU64>,
    /// Current `leader_epoch`, taken from the metadata image. Stamped on
    /// every appended batch and validated on every follower Fetch.
    pub current_leader_epoch: Arc<AtomicI32>,
    /// Join handle of the writer task, shared by all clones and never
    /// read after construction.
    ///
    /// NOTE(review): dropping a tokio `JoinHandle` detaches the task
    /// rather than aborting it, so the writer presumably ends because the
    /// last `writer_tx` sender closes the channel — confirm against
    /// `partition_writer.rs`.
    #[allow(clippy::pub_underscore_fields)]
    pub _writer_handle: Arc<JoinHandle<()>>,
}
impl Partition {
/// The offset the underlying [`Log`] will hand out next. Only holds the
/// `Arc<Mutex<Log>>` for the duration of the lookup. Replicators call
/// this ahead of each Fetch to derive `fetch_offset`.
///
/// A poisoned log mutex (i.e. the writer task panicked) yields 0. Callers
/// read that as "not making progress"; the writer-died path eventually
/// surfaces a clearer error.
#[must_use]
pub fn log_end_offset(&self) -> i64 {
    self.log.lock().map_or(0, |log| log.log_end_offset())
}
/// Last Stable Offset: the highest offset at or before which every record
/// of every in-flight transaction has been resolved (committed or
/// aborted). Only holds the `Arc<Mutex<Log>>` for the duration of the
/// lookup.
///
/// A poisoned log mutex (i.e. the writer task panicked) yields 0. Callers
/// read that as "not making progress"; the writer-died path eventually
/// surfaces a clearer error.
#[must_use]
pub fn lso(&self) -> i64 {
    let Ok(log) = self.log.lock() else {
        return 0;
    };
    log.lso()
}
/// Route `overrides` (already validated; see `config_keys`) through the
/// writer actor so the partition's `Log` sees the new `retention.ms` /
/// `retention.bytes` / `segment.bytes` on its next retention/roll tick.
/// Intended to be idempotent: sending the same map twice should be a
/// cheap noop. `ReplicatorSupervisor::reconcile` calls this each time the
/// metadata image changes.
///
/// The overrides are layered on top of `LogConfig::default()`, not on top
/// of whatever config is currently installed.
///
/// # Errors
///
/// Returns `BrokerError::Replication` if the writer is dead or the ack is
/// dropped.
pub(crate) async fn apply_log_config_overrides(
    &self,
    overrides: &std::collections::BTreeMap<String, String>,
) -> Result<(), BrokerError> {
    let defaults = crabka_log::LogConfig::default();
    let config = crate::config_keys::apply_to_log_config(overrides, &defaults);

    let (ack, applied) = oneshot::channel();
    let request = WriterMessage::SetLogConfig { config, ack };
    if self.writer_tx.send(request).await.is_err() {
        return Err(BrokerError::Replication("partition writer dead".into()));
    }

    match applied.await {
        Ok(()) => Ok(()),
        Err(_) => Err(BrokerError::Replication("ack dropped".into())),
    }
}
/// Append a batch whose `base_offset` was already assigned by the leader,
/// keeping that offset. Called by the per-partition replicator on a
/// follower broker. The batch goes through the writer task so it stays
/// ordered with produce appends; on a follower the produce handler rejects
/// those anyway, but channel ordering remains part of the invariant.
///
/// # Errors
///
/// Returns `BrokerError::Replication` if the writer channel is closed or
/// the ack is dropped; otherwise forwards the writer's own result.
pub async fn replicate_batch(&self, batch: RecordBatch) -> Result<(), BrokerError> {
    let (ack, reply) = oneshot::channel();
    let request = WriterMessage::Replicate { batch, ack };
    self.writer_tx
        .send(request)
        .await
        .map_err(|_| BrokerError::Replication("partition writer dead".into()))?;
    match reply.await {
        Ok(outcome) => outcome,
        Err(_) => Err(BrokerError::Replication("ack dropped".into())),
    }
}
/// Cut the log back to `offset`, discarding every record at offsets
/// `>= offset`. Serves the replicator's `OFFSET_OUT_OF_RANGE` recovery
/// path and the KIP-320 in-band `diverging_epoch` truncation path (which
/// passes the leader's epoch boundary, not just 0).
///
/// # Errors
///
/// Returns `BrokerError::Replication` if the writer channel is closed or
/// the ack is dropped; otherwise forwards the writer's own result.
pub async fn truncate_to(&self, offset: i64) -> Result<(), BrokerError> {
    let (ack, reply) = oneshot::channel();
    if self
        .writer_tx
        .send(WriterMessage::Truncate { offset, ack })
        .await
        .is_err()
    {
        return Err(BrokerError::Replication("partition writer dead".into()));
    }
    reply
        .await
        .unwrap_or_else(|_| Err(BrokerError::Replication("ack dropped".into())))
}
/// Discard all segments and recreate the active segment at `new_base`.
/// Sent via the writer task so it stays ordered with appends.
///
/// # Errors
///
/// Returns `BrokerError::Replication` if the writer channel is closed or
/// the ack is dropped; otherwise forwards the writer's own result.
pub async fn reset_to(&self, new_base: i64) -> Result<(), BrokerError> {
    let (ack, reply) = oneshot::channel();
    let request = WriterMessage::ResetTo { new_base, ack };
    if self.writer_tx.send(request).await.is_err() {
        return Err(BrokerError::Replication("partition writer dead".into()));
    }
    match reply.await {
        Ok(outcome) => outcome,
        Err(_) => Err(BrokerError::Replication("ack dropped".into())),
    }
}
/// Ask the writer actor to trim the head of the log and hand back the
/// resulting `log_start_offset`. Called by the `DeleteRecords` handler.
///
/// # Errors
///
/// Returns `BrokerError` if the writer is dead, the ack is dropped, or
/// the underlying `Log::trim_to_offset` fails (negative offset).
pub async fn trim_to_offset(&self, new_start: i64) -> Result<i64, BrokerError> {
    let (ack, reply) = oneshot::channel();
    let request = WriterMessage::TrimToOffset { new_start, ack };
    self.writer_tx
        .send(request)
        .await
        .map_err(|_| BrokerError::Replication("partition writer dead".into()))?;
    reply
        .await
        .unwrap_or_else(|_| Err(BrokerError::Replication("ack dropped".into())))
}
/// Send a `WriterMessage::Compact` to the partition's writer
/// actor and await the ack. Used by the broker-wide [`Cleaner`]
/// ticker.
pub async fn compact_log(&self) -> Result<(), BrokerError> {
let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
self.writer_tx
.send(WriterMessage::Compact { ack: ack_tx })
.await
.map_err(|_| BrokerError::Replication("partition writer dead".into()))?;
ack_rx
.await
.map_err(|_| BrokerError::Replication("compact ack dropped".into()))?
}
/// The first absolute offset the underlying [`Log`] still holds. Only
/// holds the `Arc<Mutex<Log>>` for the duration of the lookup.
///
/// A poisoned log mutex (i.e. the writer task panicked) yields 0.
/// `TxnCoordinator::recover` uses this to seed its replay scan offset.
#[must_use]
pub(crate) fn log_start_offset(&self) -> i64 {
    self.log.lock().map_or(0, |log| log.log_start_offset())
}
/// Aborted transactions, read from the active segment's `.txnindex`,
/// whose offset range overlaps `[start, end)`.
///
/// Holds the `Arc<Mutex<Log>>` only for the lookup. A poisoned mutex
/// yields an empty `Vec`.
#[must_use]
pub fn aborted_in_range(&self, start: i64, end: i64) -> Vec<AbortedTxn> {
    let Ok(log) = self.log.lock() else {
        return Vec::new();
    };
    log.aborted_in_range(start, end)
}
/// Read batches from the underlying [`Log`], beginning at `offset` and
/// returning at most `max_bytes` of data.
///
/// The `Arc<Mutex<Log>>` is held while the read runs. Used by
/// `TxnCoordinator::recover` to replay `__transaction_state` records.
///
/// # Errors
///
/// Returns [`BrokerError::Txn`] if the log mutex is poisoned, and the
/// `BrokerError` converted from the log's error (documented as
/// [`BrokerError::Log`]) if the underlying [`Log::read`] fails, e.g.
/// `offset < log_start_offset()`.
pub(crate) fn read_log(
    &self,
    offset: i64,
    max_bytes: usize,
) -> Result<ReadOutput, BrokerError> {
    // The guard is a temporary of this statement, so the lock is released
    // before the error conversion below.
    let outcome = self
        .log
        .lock()
        .map_err(|_| BrokerError::Txn("log mutex poisoned".into()))?
        .read(offset, max_bytes);
    outcome.map_err(BrokerError::from)
}
/// Append `batch` at the next offset the log assigns, routed through the
/// partition's writer task so it is ordered with every other produce
/// append. Yields the assigned `base_offset`.
///
/// `TxnCoordinator::put` uses this to persist `__transaction_state`
/// records.
///
/// # Errors
///
/// Returns [`BrokerError::Txn`] if the writer task is dead or the ack
/// channel closes before the writer replies; otherwise forwards the
/// writer's own result.
pub(crate) async fn produce_batch(&self, batch: RecordBatch) -> Result<i64, BrokerError> {
    let (ack, reply) = oneshot::channel();
    let job = ProduceJob { batch, ack };
    if self
        .writer_tx
        .send(WriterMessage::Produce(job))
        .await
        .is_err()
    {
        return Err(BrokerError::Txn("partition writer dead".into()));
    }
    reply
        .await
        .unwrap_or_else(|_| Err(BrokerError::Txn("ack dropped".into())))
}
/// The cached High Watermark. `replica_state` is awaited cooperatively,
/// so tokio worker threads are never blocked on it.
#[must_use]
pub async fn high_watermark(&self) -> i64 {
    let state = self.replica_state.lock().await;
    state.hw
}
/// KIP-392: remember the high watermark the leader reported in a follower
/// Fetch response, so consumer reads served by this follower are bounded
/// correctly.
///
/// The reported value is capped at the local log end (records not yet
/// replicated are never exposed) and `hw` only ever moves forward (HW is
/// monotonic). When it does move, `hw_advance_notify` fires so a consumer
/// parked at the old HW wakes up.
pub async fn set_follower_hw(&self, reported_hw: i64) {
    let capped = reported_hw.min(self.log_end_offset());

    let mut state = self.replica_state.lock().await;
    if capped <= state.hw {
        return;
    }
    state.hw = capped;
    // Release the lock before waking waiters.
    drop(state);

    self.hw_advance_notify.notify_waiters();
}
/// Install (or reinstall) the ISR membership, seeding non-leader follower
/// entries at 0. The replicator supervisor calls this when this broker
/// materializes a partition it leads. Idempotent: installing the same
/// `(isr, replicas, leader)` again keeps existing follower progress.
///
/// `isr` is the committed in-sync set and `replicas` the full replica
/// assignment. Follower progress is keyed on `replicas`, so a replica
/// catching up toward ISR re-admission keeps its progress across
/// reconciles — see [`crate::replica_state::ReplicaState::install_isr`].
///
/// HW is recomputed under the new ISR and `hw_advance_notify` fires if it
/// moved forward. This matters because an `AlterPartition` shrink may
/// have just removed a lagging follower: the remaining followers' LEOs
/// (already kept current by fetches) can then satisfy a previously
/// blocked acks=-1 produce without waiting for another fetch round.
pub async fn install_isr(
    &self,
    isr: &[crabka_raft::NodeId],
    replicas: &[crabka_raft::NodeId],
    leader: crabka_raft::NodeId,
) {
    // Read the leader's LEO before taking the replica-state lock.
    let leader_leo = self.log_end_offset();

    let (hw_before, hw_after) = {
        let mut state = self.replica_state.lock().await;
        let before = state.hw;
        state.install_isr(isr, replicas, leader);
        let after = state.recompute_hw_for_leader_append(leader_leo);
        (before, after)
    };

    if hw_after > hw_before {
        self.hw_advance_notify.notify_waiters();
    }
}
/// Apply a leader change seen in the metadata image by updating the cached
/// `current_leader` and `current_leader_epoch`.
///
/// Per-follower stats are cleared only when the leader or the epoch really
/// changed, since they are stale under the new leader's view. An
/// idempotent re-install (same leader and epoch) keeps them: the
/// supervisor calls this on every reconcile, and clearing unconditionally
/// would reset follower LEOs each time, dropping HW back to 0 and blocking
/// acks=-1 producers until followers re-fetch.
///
/// `hw_advance_notify` always fires so waiting Produce gates re-check.
pub async fn install_leader_change(&self, new_leader: u64, new_epoch: i32) {
    let old_leader = self.current_leader.swap(new_leader, Ordering::AcqRel);
    let old_epoch = self.current_leader_epoch.swap(new_epoch, Ordering::AcqRel);
    let unchanged = old_leader == new_leader && old_epoch == new_epoch;

    {
        let mut state = self.replica_state.lock().await;
        if !unchanged {
            state.per_follower.clear();
        }
        state.current_leader_epoch = new_epoch;
    }

    self.hw_advance_notify.notify_waiters();
}
/// Wait until `replica_state.hw >= target_offset` or `deadline`
/// elapses. Used by the Produce handler for `acks == -1` to gate
/// the response on full replication.
///
/// Each loop iteration checks the HW, creates a `Notified` future on
/// `hw_advance_notify`, checks the HW again, and only then parks on
/// either the notification or the deadline. Any wake-up loops back to
/// the check, so notifications that did not raise the HW far enough
/// (e.g. the unconditional one sent by `install_leader_change`) are
/// harmless.
///
/// # Errors
///
/// Returns `Err(HwTimeout)` if the deadline elapses before the HW
/// advances. Returns `Ok(())` on the first re-check that satisfies
/// the target.
pub async fn await_hw_at_least(
&self,
target_offset: i64,
deadline: std::time::Instant,
) -> Result<(), HwTimeout> {
loop {
// Fast path: target already reached, no waiter needed.
if self.high_watermark().await >= target_offset {
return Ok(());
}
// Subscribe to the notify BEFORE re-reading HW so we don't
// miss an advance that happens between read and await.
// NOTE(review): this relies on tokio's documented guarantee that a
// `Notified` future observes `notify_waiters()` calls made after its
// creation even before its first poll — confirm for the pinned tokio
// version.
let waiter = self.hw_advance_notify.notified();
tokio::pin!(waiter);
if self.high_watermark().await >= target_offset {
return Ok(());
}
// Park until either a notification arrives (then loop and re-check)
// or the deadline passes. `deadline.into()` converts the
// `std::time::Instant` argument into the type `sleep_until` expects.
tokio::select! {
() = &mut waiter => {},
() = tokio::time::sleep_until(deadline.into()) => return Err(HwTimeout),
}
}
}
/// Test-only: overwrite the partition's `current_leader_epoch` directly,
/// bypassing the supervisor's metadata-image-driven path.
/// `tests/leader_epoch.rs` uses this to simulate split-brain by forcing an
/// epoch bump mid-Produce.
#[cfg(any(test, feature = "test-helpers"))]
pub fn test_set_leader_epoch(&self, epoch: i32) {
    let epoch_cell = &self.current_leader_epoch;
    epoch_cell.store(epoch, Ordering::Release);
}
/// Test-only: move the partition's in-memory `log_start_offset` to
/// `new_start`. Routed through the writer task so the single-writer
/// invariant on the underlying `Log` holds.
///
/// # Errors
///
/// Returns `BrokerError::Replication` if the writer channel is closed or
/// the ack is dropped; otherwise forwards the writer's own result.
#[cfg(any(test, feature = "test-helpers"))]
pub async fn test_set_log_start(&self, new_start: i64) -> Result<(), BrokerError> {
    let (ack, reply) = oneshot::channel();
    let request = WriterMessage::TestSetLogStart { new_start, ack };
    if self.writer_tx.send(request).await.is_err() {
        return Err(BrokerError::Replication("partition writer dead".into()));
    }
    reply
        .await
        .unwrap_or_else(|_| Err(BrokerError::Replication("ack dropped".into())))
}
}
impl std::fmt::Debug for Partition {
    /// Prints only `topic` and `partition_id`, marking the output as
    /// non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `log` is left out on purpose: formatting a `Mutex<Log>` would
        // block on the mutex and spill internal segment state into
        // tracing output.
        let mut out = f.debug_struct("Partition");
        out.field("topic", &self.topic);
        out.field("partition_id", &self.partition_id);
        out.finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
    use assert2::assert;
    use std::sync::atomic::{AtomicI32, AtomicU64};

    use super::*;
    use crabka_log::LogConfig;
    use tempfile::{TempDir, tempdir};

    /// A freshly built [`Partition`] together with the resources that have
    /// to outlive it. Fields drop in declaration order, so the partition
    /// (and its `Log`) goes away before the temp dir is removed.
    struct Fixture {
        partition: Partition,
        /// Receiving half of the writer channel. Nothing drains it; it is
        /// held only so `writer_tx` stays open for the whole test.
        _rx: mpsc::Receiver<WriterMessage>,
        /// Directory backing the log; deleted when the fixture drops.
        _dir: TempDir,
    }

    /// Build partition `t`/0 over an empty log in a fresh temp dir, with a
    /// default `ReplicaState`, leader 0, epoch 0 and a no-op writer task.
    ///
    /// Must be called from inside a tokio runtime because it spawns the
    /// placeholder writer task.
    fn fixture() -> Fixture {
        let dir = tempdir().expect("tempdir");
        let log = Log::open(dir.path(), LogConfig::default()).expect("open log");
        let (tx, rx) = mpsc::channel::<WriterMessage>(1);
        let writer = tokio::spawn(async {});
        let partition = Partition {
            topic: "t".into(),
            partition_id: 0,
            log_dir: Arc::new(ArcSwap::from_pointee(dir.path().to_path_buf())),
            log: Arc::new(Mutex::new(log)),
            writer_tx: tx,
            append_notify: Arc::new(Notify::new()),
            replica_state: Arc::new(tokio::sync::Mutex::new(
                crate::replica_state::ReplicaState::new(),
            )),
            hw_advance_notify: Arc::new(Notify::new()),
            current_leader: Arc::new(AtomicU64::new(0)),
            current_leader_epoch: Arc::new(AtomicI32::new(0)),
            _writer_handle: Arc::new(writer),
        };
        Fixture {
            partition,
            _rx: rx,
            _dir: dir,
        }
    }

    #[test]
    fn partition_is_clone_and_send() {
        // Compile-time check.
        fn assert_send<T: Send>() {}
        fn assert_clone<T: Clone>() {}
        assert_send::<Partition>();
        assert_clone::<Partition>();
    }

    #[tokio::test]
    async fn debug_does_not_dump_log() {
        let fx = fixture();
        let p = &fx.partition;
        let s = format!("{p:?}");
        assert!(s.contains("topic"));
        assert!(s.contains("partition_id"));
        // The mutex/log internals must NOT appear in Debug output.
        assert!(!s.contains("Mutex"));
        assert!(!s.contains("segments"));
    }

    #[tokio::test]
    async fn high_watermark_reads_cached_value() {
        let fx = fixture();
        let p = &fx.partition;
        p.replica_state.lock().await.hw = 42;
        assert!(p.high_watermark().await == 42);
    }

    #[tokio::test]
    async fn install_isr_populates_replica_state() {
        let fx = fixture();
        let p = &fx.partition;
        p.install_isr(&[1, 2, 3], &[1, 2, 3], 1).await;
        let st = p.replica_state.lock().await;
        assert!(st.isr.len() == 3);
        assert!(st.isr.contains(&1) && st.isr.contains(&2) && st.isr.contains(&3));
        assert!(st.per_follower.get(&2).map(|f| f.leo) == Some(0));
    }

    #[tokio::test]
    async fn await_hw_returns_immediately_if_already_satisfied() {
        let fx = fixture();
        let p = &fx.partition;
        p.replica_state.lock().await.hw = 100;
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
        p.await_hw_at_least(50, deadline).await.expect("immediate");
    }

    #[tokio::test]
    async fn await_hw_returns_timeout_when_unreached() {
        let fx = fixture();
        let p = &fx.partition;
        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(50);
        let result = p.await_hw_at_least(100, deadline).await;
        assert!(matches!(result, Err(HwTimeout)));
    }

    #[tokio::test]
    async fn set_follower_hw_clamps_advances_and_notifies() {
        use crabka_protocol::records::{Attributes, Record, RecordBatch};

        let fx = fixture();
        let p = &fx.partition;
        let hw_advance_notify = p.hw_advance_notify.clone();

        // Append a 3-record batch so log_end_offset() == 3.
        let mut batch = RecordBatch {
            base_offset: 0,
            partition_leader_epoch: -1,
            attributes: Attributes::default(),
            last_offset_delta: 2,
            base_timestamp: 1_700_000_000,
            max_timestamp: 1_700_000_000,
            producer_id: -1,
            producer_epoch: -1,
            base_sequence: -1,
            records: (0..3)
                .map(|i| Record {
                    attributes: 0,
                    offset_delta: i,
                    timestamp_delta: 0,
                    key: None,
                    value: Some(bytes::Bytes::from_static(b"v")),
                    headers: vec![],
                })
                .collect(),
        };
        p.log
            .lock()
            .expect("log mutex")
            .append(&mut batch)
            .expect("append");
        assert!(p.log_end_offset() == 3);

        // reported_hw below log_end: stored verbatim, notify fires.
        // Poll the waiter once (it must be Pending) BEFORE advancing HW, so
        // the readiness observed afterwards can only come from the
        // `notify_waiters()` call made by `set_follower_hw`.
        let waiter = hw_advance_notify.notified();
        tokio::pin!(waiter);
        assert!(
            futures_util::poll!(&mut waiter).is_pending(),
            "waiter registers on first poll"
        );
        p.set_follower_hw(2).await;
        assert!(p.high_watermark().await == 2);
        assert!(
            futures_util::poll!(&mut waiter).is_ready(),
            "notify should fire when HW advances"
        );

        // reported_hw above log_end: clamped to log_end (3).
        p.set_follower_hw(100).await;
        assert!(p.high_watermark().await == 3);

        // reported_hw below current HW: no regression.
        p.set_follower_hw(1).await;
        assert!(p.high_watermark().await == 3);
    }

    #[tokio::test]
    async fn await_hw_wakes_on_advance() {
        let fx = fixture();
        let p = &fx.partition;
        let replica_state = p.replica_state.clone();
        let hw_advance_notify = p.hw_advance_notify.clone();
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
            replica_state.lock().await.hw = 100;
            hw_advance_notify.notify_waiters();
        });
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
        p.await_hw_at_least(50, deadline)
            .await
            .expect("woke on advance");
    }
}