1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
//! Subscribes to the controller's metadata-image watch channel and
//! on each apply:
//!
//! 1. **Materializes the local on-disk partition** for any
//! `(topic, partition)` where this broker is in `replicas`,
//! regardless of leader/follower role. The `CreateTopics` handler
//! used to do this itself, but with round-robin replica placement
//! the broker that handles the request usually isn't the partition
//! leader — so the lazy supervisor-driven path is the only one
//! that materializes the partition on the leader broker reliably.
//!
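//! 2. **Spawns a `replicator::run` task** per `(topic, partition)`
//! where this broker is in `replicas` but is NOT the leader. A task is
//! cancelled once its partition leaves that set (removed from the image,
//! reassigned away, or now led by this broker), and is cancelled and
//! respawned when the partition's `(leader, leader_epoch)` changes.
//!
//! Each apply also pushes topic-config overrides onto local partitions,
//! refreshes the txn/share coordinators' view of locally-led partitions,
//! and reports changed log-dir assignments (KIP-858) to the controller.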
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use dashmap::DashMap;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use crabka_log::{Log, LogConfig};
use crabka_metadata::MetadataImage;
use crabka_raft::NodeId;
use crate::broker::spawn_partition;
use crate::partition_registry::PartitionRegistry;
use crate::replicator;
use crate::throttle::ThrottleState;
use crate::txn::coordinator::TxnCoordinator;
/// Every `(topic, partition)` in `image` that lists `node_id` among its
/// `replicas` while some other node is the leader. Each entry is a
/// partition for which this broker should run a follower replicator.
pub(crate) fn desired_follower_set(
    node_id: NodeId,
    image: &MetadataImage,
) -> HashSet<(String, i32)> {
    let mut followers = HashSet::new();
    for topic in image.topics() {
        for rec in image.partitions_of(&topic.name) {
            // Skip partitions we don't replicate, and those we lead.
            if !rec.replicas.contains(&node_id) || rec.leader == node_id {
                continue;
            }
            followers.insert((rec.topic.clone(), rec.partition));
        }
    }
    followers
}
/// Every `(topic, partition)` in `image` whose `replicas` include
/// `node_id`, whether this node leads it or follows it. These are the
/// partitions whose on-disk `Partition` this broker must materialize.
pub(crate) fn desired_local_set(node_id: NodeId, image: &MetadataImage) -> HashSet<(String, i32)> {
    let mut hosted = HashSet::new();
    for topic in image.topics() {
        for rec in image.partitions_of(&topic.name) {
            if !rec.replicas.contains(&node_id) {
                continue;
            }
            hosted.insert((rec.topic.clone(), rec.partition));
        }
    }
    hosted
}
/// Open (or recover) the on-disk `Partition` for `(topic, partition)` and
/// register it in `partitions` via `PartitionRegistry::materialize_if_vacant`.
///
/// This is the shared materialization helper. Both the
/// `ReplicatorSupervisor` reconcile loop and the `InitProducerId` handler
/// (first-touch path) call it. The builder closure runs under the
/// registry's per-key lock, so two concurrent callers for the same key
/// cannot both spawn independent writer tasks.
///
/// # Errors
/// Returns `Err(String)` prefixed with `mkdir:` if the partition directory
/// cannot be created, or with `Log::open:` if the log cannot be opened.
/// A partition that is already registered is a no-op returning `Ok(())`.
///
/// # Panics
/// If the placed partition directory has no parent path.
pub(crate) fn materialize_partition(
    partitions: &PartitionRegistry,
    topic: &str,
    partition: i32,
    log_dirs: &[PathBuf],
    log_config: &LogConfig,
    log_dir_status: &crate::log_dir_status::LogDirRegistry,
) -> Result<(), String> {
    // Everything below runs under the per-key write lock. That replaces the
    // racy `contains_key` + `insert` check, and it means JBOD placement
    // (KIP-113) is decided exactly once per partition.
    partitions.materialize_if_vacant(topic, partition, || {
        let partition_dir = crate::log_dir::place_partition_dir(log_dirs, topic, partition);
        std::fs::create_dir_all(&partition_dir).map_err(|err| format!("mkdir: {err}"))?;
        let opened_log = Log::open(&partition_dir, log_config.clone())
            .map_err(|err| format!("Log::open: {err}"))?;
        // The owning log.dir is the directory the partition was placed under.
        let owning_dir = partition_dir
            .parent()
            .map(std::path::Path::to_path_buf)
            .expect("placed partition dir always has a parent log.dir");
        let handle = spawn_partition(
            topic.to_owned(),
            partition,
            owning_dir,
            opened_log,
            log_dir_status.clone(),
        );
        Ok(handle)
    })
}
/// Apply each topic's config overrides from `image` to the matching
/// locally-registered partitions in `desired`.
///
/// Partitions not (yet) present in `partitions` are skipped. A topic with
/// no override record in `image` is sent an empty override map. Re-sending
/// identical overrides is cheap, so this runs on every reconcile. A failure
/// on one partition is logged with `warn!` and does not stop the others.
pub(crate) async fn push_topic_configs(
    desired: &HashSet<(String, i32)>,
    partitions: &PartitionRegistry,
    image: &MetadataImage,
) {
    let no_overrides = std::collections::BTreeMap::<String, String>::new();
    for (topic, partition) in desired {
        if let Some(part) = partitions.get(topic, *partition) {
            let overrides = image.topic_config(topic).unwrap_or(&no_overrides);
            if let Err(e) = part.apply_log_config_overrides(overrides).await {
                warn!(
                    topic = %topic, partition = partition, error = %e,
                    "supervisor: apply_log_config_overrides failed"
                );
            }
        }
    }
}
/// Compute the log-dir assignments that differ from what was last reported.
///
/// For each `(topic, partition)` in `local_set` it emits one entry when all
/// of the following hold:
/// - the partition is registered locally;
/// - its current owning dir maps to a known dir UUID;
/// - the topic exists in `image`;
/// - that UUID differs from the one in `reported_dirs`.
///
/// Returns `(wire_assignments, tracker_updates)`:
/// - `wire_assignments`: `(topic_id, partition, dir_uuid)` for `build_request`;
/// - `tracker_updates`: `(topic_name, partition, dir_uuid)` for the caller to
///   write into `reported_dirs` once the send succeeds.
///
/// Read-only: each partition's owning dir is loaded exactly once, and
/// nothing here mutates `reported_dirs`.
#[allow(clippy::type_complexity)]
pub(crate) fn collect_changed_assignments(
    local_set: &std::collections::HashSet<(String, i32)>,
    partitions: &PartitionRegistry,
    log_dir_ids: &crate::log_dir_id::LogDirIds,
    image: &MetadataImage,
    reported_dirs: &dashmap::DashMap<(String, i32), uuid::Uuid>,
) -> (
    Vec<(uuid::Uuid, i32, uuid::Uuid)>,
    Vec<(String, i32, uuid::Uuid)>,
) {
    local_set
        .iter()
        .filter_map(|(topic, partition)| {
            let part = partitions.get(topic, *partition)?;
            let dir_uuid = log_dir_ids.id_for(&part.log_dir.load())?;
            let topic_id = image.topic(topic)?.topic_id;
            let already_reported = reported_dirs
                .get(&(topic.clone(), *partition))
                .is_some_and(|entry| *entry.value() == dir_uuid);
            if already_reported {
                return None;
            }
            Some((
                (topic_id, *partition, dir_uuid),
                (topic.clone(), *partition, dir_uuid),
            ))
        })
        .unzip()
}
/// Watches the controller's metadata image and keeps this broker's local
/// partitions, follower replicators, and coordinator views in line with it.
pub(crate) struct ReplicatorSupervisor {
    /// This broker's node id, matched against partition leaders/replicas.
    node_id: NodeId,
    /// Broker id stamped into `AssignReplicasToDirs` requests.
    broker_id: i32,
    /// Metadata source: supplies the image watch channel, is handed to each
    /// replicator, and carries dir-assignment reports.
    controller: Arc<dyn crate::metadata_source::MetadataSource>,
    /// Shared registry of locally materialized partitions.
    partitions: Arc<PartitionRegistry>,
    /// Configured log directories used for partition placement.
    log_dirs: Vec<PathBuf>,
    /// Base log configuration used when opening partitions.
    log_config: LogConfig,
    /// Client id for outbound requests made by replicators and reports.
    client_id: String,
    /// Cancellation token of each running follower replicator, keyed by
    /// `(topic, partition)`.
    tasks: DashMap<(String, i32), CancellationToken>,
    /// The `(leader, leader_epoch)` each follower task was spawned against.
    /// When reconcile sees a different pair, the task is cancelled and a new
    /// one is spawned toward the current leader.
    task_targets: DashMap<(String, i32), (NodeId, i32)>,
    /// Broker-wide shutdown signal; ends the `run` loop.
    shutdown: CancellationToken,
    /// Transaction coordinator whose locally-led `__transaction_state`
    /// partitions are refreshed on every reconcile.
    txn_coordinator: Option<Arc<TxnCoordinator>>,
    /// KIP-932 share coordinator. Its view of locally-led
    /// `__share_group_state` partitions is refreshed on each reconcile,
    /// in the same way as the txn coordinator.
    share_coordinator: Option<Arc<crate::share_coordinator::coordinator::ShareCoordinator>>,
    /// Shared outbound dialer (TLS + SASL when configured, plain TCP
    /// otherwise), cloned into every spawned replicator.
    inter_broker_client: Arc<crate::network::client::InterBrokerClient>,
    /// Protocol of the inter-broker listener; decides whether dials use
    /// TLS / SASL.
    inter_broker_listener_protocol: crabka_security::ListenerProtocol,
    /// Listener name whose endpoint is looked up in the image when dialing
    /// a leader.
    inter_broker_listener_name: String,
    /// KIP-73 broker-wide throttle state; each replicator consults its
    /// follower-in token bucket.
    throttle_state: Arc<ThrottleState>,
    /// KIP-113 runtime offline-dir registry. Passed into
    /// `materialize_partition` and every `replicator::Config` so the storage
    /// failure path can mark a dir offline broker-wide.
    log_dir_status: crate::log_dir_status::LogDirRegistry,
    /// Broker-wide metrics handle. Replicators use it to bump
    /// `replication_bytes_in` after a successful follower append.
    metrics: crate::metrics::BrokerMetrics,
    /// KIP-858 stable UUID per configured log.dir, used to build
    /// `AssignReplicasToDirs` reports.
    log_dir_ids: crate::log_dir_id::LogDirIds,
    /// KIP-858: last dir UUID successfully reported per `(topic, partition)`.
    /// Suppresses re-sending unchanged assignments; a new report goes out
    /// on first materialization or after a KIP-113 log-dir swap.
    reported_dirs: DashMap<(String, i32), uuid::Uuid>,
}
impl ReplicatorSupervisor {
    /// Create a supervisor with empty task and report tracking.
    ///
    /// Nothing is materialized or spawned until `run` / `reconcile` runs.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        node_id: NodeId,
        broker_id: i32,
        controller: Arc<dyn crate::metadata_source::MetadataSource>,
        partitions: Arc<PartitionRegistry>,
        log_dirs: Vec<PathBuf>,
        log_config: LogConfig,
        client_id: String,
        shutdown: CancellationToken,
        txn_coordinator: Option<Arc<TxnCoordinator>>,
        share_coordinator: Option<Arc<crate::share_coordinator::coordinator::ShareCoordinator>>,
        inter_broker_client: Arc<crate::network::client::InterBrokerClient>,
        inter_broker_listener_protocol: crabka_security::ListenerProtocol,
        inter_broker_listener_name: String,
        throttle_state: Arc<ThrottleState>,
        log_dir_status: crate::log_dir_status::LogDirRegistry,
        metrics: crate::metrics::BrokerMetrics,
        log_dir_ids: crate::log_dir_id::LogDirIds,
    ) -> Self {
        Self {
            node_id,
            broker_id,
            controller,
            partitions,
            log_dirs,
            log_config,
            client_id,
            tasks: DashMap::new(),
            task_targets: DashMap::new(),
            shutdown,
            txn_coordinator,
            share_coordinator,
            inter_broker_client,
            inter_broker_listener_protocol,
            inter_broker_listener_name,
            throttle_state,
            log_dir_status,
            metrics,
            log_dir_ids,
            reported_dirs: DashMap::new(),
        }
    }

    /// Bring local state in line with `image`.
    ///
    /// Steps, in order:
    /// 0. Materialize every partition this broker replicates and sync its
    ///    cached leader/epoch. Where this broker leads, install the image's
    ///    ISR. Then push topic-config overrides.
    /// 1. Cancel follower tasks that are no longer wanted or whose
    ///    `(leader, leader_epoch)` target changed.
    /// 2. Spawn any missing follower replicators.
    /// 3. Refresh the txn and share coordinators' leader views.
    /// 4. Report changed log-dir assignments (KIP-858).
    pub(crate) async fn reconcile(&self, image: &MetadataImage) {
        let local_set = desired_local_set(self.node_id, image);

        // 0. Local partitions + leader/ISR sync.
        self.sync_local_partitions(&local_set, image).await;
        // Pushes are idempotent (sending the same `LogConfig` is a cheap
        // no-op write inside `Log::set_config`). Reconcile runs on every
        // image change, so AlterConfigs propagation is bounded to one tick.
        push_topic_configs(&local_set, &self.partitions, image).await;

        // 1 + 2. Follower replicator lifecycle.
        let desired = desired_follower_set(self.node_id, image);
        self.cancel_stale_followers(&desired, image);
        self.spawn_missing_followers(desired, image);

        // 3. Refresh the txn coordinator's view of locally-led
        //    __transaction_state partitions.
        if let Some(coord) = &self.txn_coordinator {
            coord.refresh_leader_partitions(image).await;
        }
        // 3b. Same for the share coordinator's __share_group_state
        //     partitions (KIP-932).
        if let Some(coord) = &self.share_coordinator {
            coord.refresh_leader_partitions(image).await;
        }

        // 4. KIP-858: report owning-dir changes. The report submits a
        //    non-clobbering V1PartitionDirAssignment delta (it merges one
        //    replica's `directories` slot), so it cannot revert a concurrent
        //    reassignment.
        self.report_dir_assignments(&local_set, image).await;
    }

    /// Step 0 of `reconcile`.
    ///
    /// For each partition in `local_set`: materialize it, sync its cached
    /// leader + epoch, and, if this broker leads it, install the image's
    /// ISR. A partition whose materialization fails is logged and skipped.
    async fn sync_local_partitions(
        &self,
        local_set: &HashSet<(String, i32)>,
        image: &MetadataImage,
    ) {
        for key in local_set {
            if let Err(e) = self.materialize_local_partition(&key.0, key.1) {
                warn!(
                    topic = %key.0, partition = key.1, error = %e,
                    "failed to materialize local partition"
                );
                continue;
            }
            let Some(record) = image.partition(&key.0, key.1) else {
                continue;
            };
            let Some(part) = self.partitions.get(&key.0, key.1) else {
                continue;
            };
            // `Partition::install_leader_change` is idempotent (atomic stores
            // no-op on equal writes), so it is safe to call every tick.
            part.install_leader_change(record.leader, record.leader_epoch)
                .await;
            if record.leader == self.node_id {
                // Install the *current* ISR from the image, not the full
                // replica set. Using `replicas` would undo any shrink applied
                // via AlterPartition, so isr_maintenance's shrink would never
                // stick and acks=-1 producers would stay blocked on lagging
                // followers. `replicas` is passed separately so follower
                // progress tracking survives across reconciles for replicas
                // catching up toward ISR re-admission.
                part.install_isr(&record.isr, &record.replicas, record.leader)
                    .await;
            }
        }
    }

    /// Step 1 of `reconcile`: cancel follower tasks that are no longer in
    /// `desired`, or whose `(leader, leader_epoch)` differs from `image`.
    fn cancel_stale_followers(&self, desired: &HashSet<(String, i32)>, image: &MetadataImage) {
        // Snapshot the keys first so no DashMap iterator guard is held while
        // removing entries (holding a guard while mutating the same map can
        // deadlock).
        let running: Vec<(String, i32)> = self.tasks.iter().map(|e| e.key().clone()).collect();
        for key in running {
            if !desired.contains(&key) {
                self.cancel_follower(&key);
            }
        }
        for key in desired {
            let Some(record) = image.partition(&key.0, key.1) else {
                continue;
            };
            let target = (record.leader, record.leader_epoch);
            // The `get` guard is dropped at the end of this statement, before
            // `cancel_follower` mutates `task_targets`.
            let retarget = self
                .task_targets
                .get(key)
                .is_some_and(|prev| *prev.value() != target);
            if retarget {
                self.cancel_follower(key);
            }
        }
    }

    /// Stop the follower task for `key`, if any, and forget its target.
    fn cancel_follower(&self, key: &(String, i32)) {
        if let Some((_, token)) = self.tasks.remove(key) {
            self.task_targets.remove(key);
            token.cancel();
        }
    }

    /// Step 2 of `reconcile`: spawn a replicator for each partition in
    /// `desired` that has no running task.
    ///
    /// Spawning is deferred (logged and skipped) while the leader broker or
    /// the topic record is missing from `image`.
    fn spawn_missing_followers(&self, desired: HashSet<(String, i32)>, image: &MetadataImage) {
        for k in desired {
            if self.tasks.contains_key(&k) {
                continue;
            }
            let Some(part) = image.partition(&k.0, k.1) else {
                continue;
            };
            let leader = part.leader;
            let leader_epoch = part.leader_epoch;
            let Some(broker) = image.broker(leader) else {
                warn!(
                    topic = %k.0, partition = k.1, leader,
                    "leader broker not yet registered in MetadataImage; deferring"
                );
                continue;
            };
            // The replicator needs the topic id for the v13+ Fetch wire
            // format; without it the leader cannot resolve the topic name and
            // answers UNKNOWN_TOPIC_OR_PARTITION.
            let Some(topic_rec) = image.topic(&k.0) else {
                warn!(
                    topic = %k.0, partition = k.1,
                    "topic record missing from MetadataImage; deferring"
                );
                continue;
            };
            let topic_id = crabka_protocol::primitives::uuid::Uuid(topic_rec.topic_id.into_bytes());

            // Child of the broker shutdown token. Shutdown therefore reaches
            // the replicator even if this supervisor task is aborted before
            // its own exit-path cancellation loop runs.
            let token = self.shutdown.child_token();
            self.tasks.insert(k.clone(), token.clone());
            self.task_targets.insert(k.clone(), (leader, leader_epoch));

            // Prefer the inter-broker listener's endpoint when the leader has
            // projected one onto its registration record. Otherwise fall back
            // to the legacy top-level host/port (brokers that haven't
            // projected per-listener endpoints yet, or PLAINTEXT-only setups
            // where the two match anyway).
            let (leader_host, leader_port) = broker
                .endpoints
                .iter()
                .find(|e| e.name == self.inter_broker_listener_name)
                .map_or_else(
                    || (broker.host.clone(), broker.port),
                    |e| (e.host.clone(), e.port),
                );
            tokio::spawn(replicator::run(replicator::Config {
                node_id: self.node_id,
                topic: k.0,
                topic_id,
                partition: k.1,
                leader_node_id: leader,
                leader_host,
                leader_port,
                partitions: self.partitions.clone(),
                log_dirs: self.log_dirs.clone(),
                log_config: self.log_config.clone(),
                client_id: self.client_id.clone(),
                shutdown: token,
                inter_broker_client: self.inter_broker_client.clone(),
                inter_broker_listener_protocol: self.inter_broker_listener_protocol,
                throttle_state: self.throttle_state.clone(),
                controller: self.controller.clone(),
                log_dir_status: self.log_dir_status.clone(),
                metrics: self.metrics.clone(),
            }));
        }
    }

    /// Send `AssignReplicasToDirs` for every `(topic_id, partition, dir_uuid)`
    /// in `local_set` that changed since the last successful send.
    ///
    /// On send failure the tracker is left untouched, so the same
    /// assignments are retried on the next reconcile.
    async fn report_dir_assignments(
        &self,
        local_set: &HashSet<(String, i32)>,
        image: &MetadataImage,
    ) {
        // Forget partitions this broker no longer replicates. Without this, a
        // partition reassigned away and later back would still look
        // "already reported" and its dir would never be re-sent. Entries for
        // deleted topics would also accumulate forever.
        self.reported_dirs.retain(|key, _| local_set.contains(key));

        let (wire, updates) = collect_changed_assignments(
            local_set,
            &self.partitions,
            &self.log_dir_ids,
            image,
            &self.reported_dirs,
        );
        if wire.is_empty() {
            return;
        }
        let req = crate::assign_dirs::build_request(self.broker_id, &wire);
        match crate::assign_dirs::send_assignments(&self.controller, &self.client_id, req).await {
            Ok(()) => {
                for (topic, partition, dir_uuid) in updates {
                    self.reported_dirs.insert((topic, partition), dir_uuid);
                }
            }
            Err(e) => {
                warn!(
                    error = %e,
                    "assign_replicas_to_dirs report failed; will retry next reconcile"
                );
            }
        }
    }

    /// Open (or recover) the on-disk `Partition` for `(topic, partition)`
    /// and insert it into the broker's shared `partitions` map.
    /// Idempotent: a no-op if the partition is already present.
    fn materialize_local_partition(&self, topic: &str, partition: i32) -> Result<(), String> {
        materialize_partition(
            &self.partitions,
            topic,
            partition,
            &self.log_dirs,
            &self.log_config,
            &self.log_dir_status,
        )
    }

    /// Reconcile against the current image, then again after every image
    /// change, until `shutdown` fires or the watch channel closes.
    ///
    /// On exit every follower task is cancelled.
    pub(crate) async fn run(self) {
        let mut rx = self.controller.watch_image();
        loop {
            // `borrow_and_update` marks this version as seen. A change that
            // lands after `changed()` returned but before this read therefore
            // doesn't cause a second reconcile of the same image.
            let image = rx.borrow_and_update().clone();
            self.reconcile(&image).await;
            tokio::select! {
                () = self.shutdown.cancelled() => break,
                res = rx.changed() => {
                    if res.is_err() {
                        break;
                    }
                }
            }
        }
        for entry in &self.tasks {
            entry.value().cancel();
        }
    }

    /// Run the supervisor on its own tokio task.
    pub(crate) fn spawn(self) -> JoinHandle<()> {
        tokio::spawn(self.run())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use crabka_metadata::{
        MetadataImage, MetadataRecord, PartitionRecord, TopicConfigRecord, TopicRecord,
    };
    use std::collections::BTreeMap;
    use tempfile::{TempDir, tempdir};
    use uuid::Uuid;

    /// Expands to a fresh `PartitionRecord`: epoch 0, ISR equal to the
    /// replica list, no reassignment in flight. It is a macro rather than a
    /// fn so the integer literals keep whatever field types `PartitionRecord`
    /// declares.
    macro_rules! partition {
        ($topic:expr, $partition:expr, leader: $leader:expr, replicas: [$($r:expr),* $(,)?]) => {
            PartitionRecord {
                topic: $topic.into(),
                partition: $partition,
                leader: $leader,
                replicas: vec![$($r),*],
                isr: vec![$($r),*],
                leader_epoch: 0,
                adding_replicas: vec![],
                removing_replicas: vec![],
                directories: vec![],
                partition_epoch: 0,
            }
        };
    }

    /// Topic record with one partition and replication factor 3. Tests that
    /// need other values override fields with struct-update syntax.
    fn topic(name: &'static str, topic_id: Uuid) -> TopicRecord {
        TopicRecord {
            name: name.into(),
            topic_id,
            partitions: 1,
            replication_factor: 3,
        }
    }

    /// Apply `records` in order to an empty image.
    fn image_with(records: &[MetadataRecord]) -> MetadataImage {
        let mut img = MetadataImage::new(Uuid::nil());
        for record in records {
            img.apply(record);
        }
        img
    }

    /// Image with `t-0` led by node 1, replicas/ISR `[1, 2, 3]`.
    fn t0_led_by_node_1() -> MetadataImage {
        image_with(&[
            MetadataRecord::V1Topic(topic("t", Uuid::new_v4())),
            MetadataRecord::V1Partition(partition!("t", 0, leader: 1, replicas: [1, 2, 3])),
        ])
    }

    /// Records for an RF-1 topic `t` whose only partition is hosted and led
    /// by node 1.
    fn single_replica_t0(topic_id: Uuid) -> Vec<MetadataRecord> {
        vec![
            MetadataRecord::V1Topic(TopicRecord {
                replication_factor: 1,
                ..topic("t", topic_id)
            }),
            MetadataRecord::V1Partition(partition!("t", 0, leader: 1, replicas: [1])),
        ]
    }

    /// Materialize `t-0` under a fresh temp dir. The returned `TempDir` must
    /// stay alive while the registry is in use, because dropping it deletes
    /// the directory.
    fn materialize_t0() -> (TempDir, Arc<PartitionRegistry>) {
        let dir = tempdir().expect("tempdir");
        let partitions = Arc::new(PartitionRegistry::new());
        materialize_partition(
            &partitions,
            "t",
            0,
            &[dir.path().to_path_buf()],
            &LogConfig::default(),
            &crate::log_dir_status::LogDirRegistry::default(),
        )
        .expect("materialize");
        (dir, partitions)
    }

    #[test]
    fn includes_partition_where_self_is_follower() {
        let followers = desired_follower_set(2, &t0_led_by_node_1());
        assert!(followers.contains(&("t".into(), 0)));
        assert!(followers.len() == 1);
    }

    #[test]
    fn excludes_partition_where_self_is_leader() {
        assert!(desired_follower_set(1, &t0_led_by_node_1()).is_empty());
    }

    #[test]
    fn excludes_partition_where_self_is_not_a_replica() {
        assert!(desired_follower_set(99, &t0_led_by_node_1()).is_empty());
    }

    #[tokio::test]
    async fn materialize_partition_helper_supports_isr_install() {
        let (_dir, partitions) = materialize_t0();
        let part = partitions.get("t", 0).expect("part");
        // Mirror what reconcile does for leader partitions.
        part.install_isr(&[1, 2, 3], &[1, 2, 3], 1).await;
        let st = part.replica_state.lock().await;
        assert!(st.isr.len() == 3);
    }

    #[test]
    fn multiple_topics_aggregated() {
        let img = image_with(&[
            MetadataRecord::V1Topic(topic("a", Uuid::new_v4())),
            MetadataRecord::V1Partition(partition!("a", 0, leader: 1, replicas: [1, 2, 3])),
            MetadataRecord::V1Topic(TopicRecord {
                partitions: 2,
                ..topic("b", Uuid::new_v4())
            }),
            MetadataRecord::V1Partition(partition!("b", 0, leader: 3, replicas: [1, 2, 3])),
            MetadataRecord::V1Partition(partition!("b", 1, leader: 2, replicas: [1, 2, 3])),
        ]);
        let followers = desired_follower_set(2, &img);
        assert!(followers.contains(&("a".into(), 0)));
        assert!(followers.contains(&("b".into(), 0)));
        assert!(!followers.contains(&("b".into(), 1))); // self is leader for b/1
        assert!(followers.len() == 2);
    }

    #[tokio::test]
    async fn push_topic_configs_pushes_overrides_to_local_partition() {
        let mut records = single_replica_t0(Uuid::new_v4());
        records.push(MetadataRecord::V1TopicConfig(TopicConfigRecord {
            topic: "t".into(),
            overrides: BTreeMap::from([("retention.ms".to_string(), "60000".to_string())]),
        }));
        let img = image_with(&records);

        let (_dir, partitions) = materialize_t0();
        let desired = HashSet::from([("t".to_string(), 0)]);
        push_topic_configs(&desired, &partitions, &img).await;
        // Give the writer actor a moment to apply the SetLogConfig message.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // The partition's Log should now carry retention.ms = 60s.
        let part = partitions.get("t", 0).expect("partition materialized");
        let snap = part.log.lock().expect("log lock").config_snapshot();
        assert!(snap.retention_ms == Some(std::time::Duration::from_mins(1)));
    }

    #[tokio::test]
    async fn push_topic_configs_with_no_overrides_uses_defaults() {
        let img = image_with(&single_replica_t0(Uuid::new_v4()));

        let (_dir, partitions) = materialize_t0();
        let desired = HashSet::from([("t".to_string(), 0)]);
        push_topic_configs(&desired, &partitions, &img).await;
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // No overrides → default retention applies.
        let part = partitions.get("t", 0).expect("partition");
        let snap = part.log.lock().expect("log lock").config_snapshot();
        assert!(snap.retention_ms == LogConfig::default().retention_ms);
    }

    #[tokio::test]
    async fn collect_changed_assignments_reports_new_then_skips_unchanged() {
        let topic_id = Uuid::new_v4();
        let img = image_with(&single_replica_t0(topic_id));

        let (dir, partitions) = materialize_t0();
        let log_dir_ids = crate::log_dir_id::LogDirIds::resolve(&[dir.path().to_path_buf()]);

        // The partition's owning dir is the temp dir itself (the parent of
        // the placed partition sub-dir).
        let part = partitions.get("t", 0).expect("part present");
        let loaded_dir = part.log_dir.load();
        assert!(**loaded_dir == dir.path().to_path_buf());
        let dir_uuid = log_dir_ids.id_for(dir.path()).expect("dir uuid resolvable");

        let local_set = HashSet::from([("t".to_string(), 0)]);
        let reported_dirs: DashMap<(String, i32), Uuid> = DashMap::new();

        // First pass: nothing reported yet → one wire entry + one update.
        let (wire, updates) = collect_changed_assignments(
            &local_set,
            &partitions,
            &log_dir_ids,
            &img,
            &reported_dirs,
        );
        assert!(wire.len() == 1);
        assert!(updates.len() == 1);
        let (w_topic_id, w_partition, w_dir_uuid) = wire[0];
        assert!(w_topic_id == topic_id);
        assert!(w_partition == 0);
        assert!(w_dir_uuid == dir_uuid);
        let (u_topic, u_partition, u_dir_uuid) = &updates[0];
        assert!(u_topic == "t");
        assert!(*u_partition == 0);
        assert!(*u_dir_uuid == dir_uuid);

        // Simulate a successful send by recording the updates.
        for (t, p, id) in updates {
            reported_dirs.insert((t, p), id);
        }

        // Second pass: already reported → nothing to send.
        let (wire2, updates2) = collect_changed_assignments(
            &local_set,
            &partitions,
            &log_dir_ids,
            &img,
            &reported_dirs,
        );
        assert!(wire2.is_empty());
        assert!(updates2.is_empty());
    }
}