1use std::sync::Arc;
29
30use bytes::Bytes;
31use futures_util::StreamExt;
32use tokio::runtime::Handle;
33use tokio::sync::watch;
34use tokio::task::JoinHandle;
35use tokio_util::sync::CancellationToken;
36use tracing::warn;
37
38use crabka_remote_storage::{
39 InmemoryRemoteLogMetadataManager, RemoteLogMetadataManager, RemoteLogSegmentMetadata,
40 RemoteLogSegmentMetadataUpdate, RemoteLogSegmentState, RemotePartitionDeleteMetadata,
41 RemoteStorageError, TopicIdPartition,
42};
43
44use crate::error::MetadataLogError;
45use crate::log::{AssignmentHandle, MetadataEventLog, MetadataEventStream, PartitionStart};
46use crate::partitioning::metadata_partition_for;
47use crate::serde::MetadataEvent;
48
/// Sentinel readiness target meaning "high-water mark not known yet".
///
/// `reconcile_assignment` records it for a partition when the high-water-mark
/// fetch fails or yields no entry for that partition. `metadata_partition_gate`
/// treats a partition carrying this target as not ready, and a later reconcile
/// replaces it once a real high-water mark is available.
const HWM_UNKNOWN: i64 = i64::MAX;
56
/// Outcome of checking whether reads for one metadata partition may be served
/// from the local cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReadGate {
    /// No readiness target is recorded for the partition, i.e. it is not part
    /// of this manager's current assignment.
    Unassigned,
    /// The partition is assigned, but either its target is still unknown or
    /// the applied offset has not reached the target yet.
    NotReady,
    /// The partition is assigned and the cache has caught up to its target.
    Ready,
}
72
73pub struct TopicBasedRemoteLogMetadataManager {
81 log: Arc<dyn MetadataEventLog>,
82 inner: Arc<InmemoryRemoteLogMetadataManager>,
83 applied: Arc<std::sync::Mutex<Vec<i64>>>,
84 applied_tx: watch::Sender<u64>,
85 runtime: Handle,
86 shutdown: CancellationToken,
87 pump: std::sync::Mutex<Option<JoinHandle<()>>>,
88 snapshot_dir: std::path::PathBuf,
91 snapshotter: std::sync::Mutex<Option<JoinHandle<()>>>,
94 assignment: Arc<dyn AssignmentHandle>,
99 committed_offsets: Vec<i64>,
106 ready_targets: Arc<std::sync::Mutex<std::collections::HashMap<i32, i64>>>,
113}
114
115impl TopicBasedRemoteLogMetadataManager {
116 #[allow(clippy::unused_async)]
136 pub async fn start(
137 log: Arc<dyn MetadataEventLog>,
138 runtime: Handle,
139 snapshot_dir: std::path::PathBuf,
140 snapshot_interval: std::time::Duration,
141 ) -> Result<Arc<Self>, RemoteStorageError> {
142 let n = usize::try_from(log.partition_count()).expect("partition_count fits in usize");
143 let (applied_tx, _) = watch::channel(0u64);
144 let inner = Arc::new(InmemoryRemoteLogMetadataManager::new());
145 let shutdown = CancellationToken::new();
146
147 let snapshot = match crate::snapshot::Snapshot::load(
153 &snapshot_dir.join(crate::snapshot::SNAPSHOT_FILE_NAME),
154 ) {
155 Ok(snap) => snap,
156 Err(e) => {
157 warn!(error = ?e, "topic-based RLMM: snapshot corrupt; starting from empty cache");
158 None
159 }
160 };
161 if let Some(snap) = &snapshot {
162 inner.import(snap.dump.clone());
163 }
164 let (committed, _assignment) = Self::resume_from_snapshot(snapshot.as_ref(), n);
171
172 let applied = Arc::new(std::sync::Mutex::new(committed.clone()));
176
177 let (stream, assignment_handle) = log.subscribe(Vec::new());
178 let pump = runtime.spawn(pump_loop(
179 stream,
180 inner.clone(),
181 applied.clone(),
182 applied_tx.clone(),
183 shutdown.clone(),
184 ));
185
186 let manager = Arc::new(Self {
187 log,
188 inner,
189 applied,
190 applied_tx,
191 runtime,
192 shutdown,
193 pump: std::sync::Mutex::new(Some(pump)),
194 snapshot_dir,
195 snapshotter: std::sync::Mutex::new(None),
196 assignment: assignment_handle,
197 committed_offsets: committed,
198 ready_targets: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
199 });
200
201 let snapshotter = {
204 let weak = Arc::downgrade(&manager);
205 let shutdown = manager.shutdown.clone();
206 manager.runtime.spawn(async move {
207 let mut last_written: i64 = -1;
208 loop {
209 tokio::select! {
210 biased;
211 () = shutdown.cancelled() => return,
212 () = tokio::time::sleep(snapshot_interval) => {}
213 }
214 let Some(m) = weak.upgrade() else { return };
215 let highest = {
217 let applied = m.applied.lock().expect("applied mutex poisoned");
218 applied.iter().copied().max().unwrap_or(-1)
219 };
220 if highest > last_written {
221 match m.write_snapshot() {
222 Ok(written) => last_written = written,
223 Err(e) => {
224 warn!(error = ?e, "topic-based RLMM: periodic snapshot failed");
225 }
226 }
227 }
228 }
229 })
230 };
231 *manager
232 .snapshotter
233 .lock()
234 .expect("snapshotter mutex poisoned") = Some(snapshotter);
235
236 Ok(manager)
242 }
243
244 pub fn shutdown(&self) {
248 self.shutdown.cancel();
249 }
250
251 pub async fn shutdown_and_flush(&self) {
255 self.shutdown.cancel();
256 let handle = self
259 .snapshotter
260 .lock()
261 .expect("snapshotter mutex poisoned")
262 .take();
263 if let Some(h) = handle {
266 let _ = h.await;
267 }
268 if let Err(e) = self.write_snapshot() {
269 warn!(error = ?e, "topic-based RLMM: final snapshot flush failed");
270 }
271 }
272
273 fn write_snapshot(&self) -> Result<i64, crate::error::SnapshotError> {
281 let (committed_offsets, dump) = {
290 let applied = self.applied.lock().expect("applied mutex poisoned");
291 let dump = self.inner.export();
292 (applied.clone(), dump)
293 };
294 let max = committed_offsets.iter().copied().max().unwrap_or(-1);
295 let snap = crate::snapshot::Snapshot {
296 committed_offsets,
297 dump,
298 };
299 let path = self.snapshot_dir.join(crate::snapshot::SNAPSHOT_FILE_NAME);
300 snap.write_atomic(&path)?;
301 Ok(max)
302 }
303
304 fn resume_from_snapshot(
317 snapshot: Option<&crate::snapshot::Snapshot>,
318 n: usize,
319 ) -> (Vec<i64>, Vec<PartitionStart>) {
320 let mut committed = vec![-1i64; n];
321 if let Some(snap) = snapshot {
322 for (i, &off) in snap.committed_offsets.iter().take(n).enumerate() {
323 committed[i] = off;
324 }
325 }
326 let assignment = (0..n)
327 .map(|i| PartitionStart {
328 partition: i32::try_from(i).expect("partition fits in i32"),
329 start_offset: committed[i] + 1,
330 })
331 .collect();
332 (committed, assignment)
333 }
334
335 #[must_use]
340 pub fn committed_offset(&self, partition: i32) -> i64 {
341 usize::try_from(partition)
342 .ok()
343 .and_then(|i| self.committed_offsets.get(i).copied())
344 .unwrap_or(-1)
345 }
346
347 fn metadata_partition_gate(&self, mp: i32) -> ReadGate {
350 let target = {
351 let guard = self.ready_targets.lock().expect("ready_targets poisoned");
352 match guard.get(&mp) {
353 Some(&t) => t,
354 None => return ReadGate::Unassigned,
359 }
360 };
361 if target == 0 {
362 return ReadGate::Ready; }
364 if target == HWM_UNKNOWN {
368 return ReadGate::NotReady;
369 }
370 let Ok(idx) = usize::try_from(mp) else {
371 return ReadGate::NotReady;
376 };
377 let applied = self.applied.lock().expect("applied mutex poisoned");
378 if idx < applied.len() && applied[idx] >= target - 1 {
379 ReadGate::Ready
380 } else {
381 ReadGate::NotReady
382 }
383 }
384
/// Test-only shorthand: `true` exactly when the gate for `mp` is `Ready`.
#[cfg(test)]
fn metadata_partition_ready(&self, mp: i32) -> bool {
    match self.metadata_partition_gate(mp) {
        ReadGate::Ready => true,
        ReadGate::Unassigned | ReadGate::NotReady => false,
    }
}
391
392 #[must_use]
395 pub fn assigned_metadata_partitions(&self) -> Vec<i32> {
396 let mut v: Vec<i32> = self
397 .ready_targets
398 .lock()
399 .expect("ready_targets poisoned")
400 .keys()
401 .copied()
402 .collect();
403 v.sort_unstable();
404 v
405 }
406
407 pub async fn reconcile_assignment(&self, desired: &[i32]) {
434 use std::collections::HashSet;
435 let want: HashSet<i32> = desired.iter().copied().collect();
436 let current: std::collections::HashMap<i32, i64> = self
440 .ready_targets
441 .lock()
442 .expect("ready_targets poisoned")
443 .clone();
444 let have: HashSet<i32> = current.keys().copied().collect();
445
446 let needs_add = want.difference(&have).copied().collect::<Vec<_>>();
447 let needs_refresh = want
450 .iter()
451 .copied()
452 .filter(|mp| current.get(mp) == Some(&HWM_UNKNOWN))
453 .collect::<Vec<_>>();
454
455 let needs_hwm = !needs_add.is_empty() || !needs_refresh.is_empty();
457 let hwms = if needs_hwm {
458 match self.log.high_water_marks().await {
459 Ok(h) => Some(h),
460 Err(e) => {
461 warn!(error = ?e, "topic-based RLMM: high_water_marks fetch failed; \
462 assigned partitions gate NotReady until a later reconcile refreshes");
463 None
464 }
465 }
466 } else {
467 None
468 };
469
470 let target_for = |mp: i32| -> i64 {
474 match &hwms {
475 Some(h) => usize::try_from(mp)
476 .ok()
477 .and_then(|i| h.get(i).copied())
478 .unwrap_or(HWM_UNKNOWN),
479 None => HWM_UNKNOWN,
480 }
481 };
482
483 for mp in needs_add {
484 let start_offset = self.committed_offset(mp) + 1;
487 self.assignment.add(PartitionStart {
488 partition: mp,
489 start_offset,
490 });
491 self.ready_targets
495 .lock()
496 .expect("ready_targets poisoned")
497 .insert(mp, target_for(mp));
498 }
499 for mp in needs_refresh {
502 let target = target_for(mp);
503 if target != HWM_UNKNOWN {
504 let mut guard = self.ready_targets.lock().expect("ready_targets poisoned");
505 if guard.get(&mp) == Some(&HWM_UNKNOWN) {
509 guard.insert(mp, target);
510 }
511 }
512 }
513 for mp in have.difference(&want).copied() {
514 self.assignment.remove(mp);
515 self.ready_targets
516 .lock()
517 .expect("ready_targets poisoned")
518 .remove(&mp);
519 }
520 }
521
522 async fn wait_for_offset(&self, partition: i32, offset: i64) {
523 let idx = usize::try_from(partition).expect("partition non-negative");
524 let mut rx = self.applied_tx.subscribe();
525 loop {
526 {
527 let applied = self.applied.lock().expect("applied mutex poisoned");
528 if applied[idx] >= offset {
529 return;
530 }
531 }
532 if rx.changed().await.is_err() {
533 return;
534 }
535 }
536 }
537
538 fn publish_and_wait(
539 &self,
540 tp: &TopicIdPartition,
541 event: Bytes,
542 ) -> Result<(), RemoteStorageError> {
543 let partition = metadata_partition_for(tp, self.log.partition_count());
544 let log = self.log.clone();
545 self.runtime.block_on(async {
548 let offset = log
549 .publish(partition, event)
550 .await
551 .map_err(MetadataLogError::into_storage)?;
552 self.wait_for_offset(partition, offset).await;
553 Ok::<_, RemoteStorageError>(())
554 })
555 }
556}
557
558impl Drop for TopicBasedRemoteLogMetadataManager {
559 fn drop(&mut self) {
560 self.shutdown.cancel();
561 if let Some(handle) = self.pump.lock().expect("pump mutex poisoned").take() {
562 handle.abort();
563 }
564 if let Some(handle) = self
565 .snapshotter
566 .lock()
567 .expect("snapshotter mutex poisoned")
568 .take()
569 {
570 handle.abort();
571 }
572 }
573}
574
575impl RemoteLogMetadataManager for TopicBasedRemoteLogMetadataManager {
576 fn add_remote_log_segment_metadata(
577 &self,
578 metadata: RemoteLogSegmentMetadata,
579 ) -> Result<(), RemoteStorageError> {
580 if metadata.state() != RemoteLogSegmentState::CopySegmentStarted {
583 return Err(RemoteStorageError::InvalidAdd {
584 id: metadata.remote_log_segment_id().clone(),
585 reason: format!(
586 "starting state must be CopySegmentStarted, got {:?}",
587 metadata.state()
588 ),
589 });
590 }
591 let tp = metadata.remote_log_segment_id().topic_id_partition.clone();
592 let event = MetadataEvent::AddSegment(metadata).encode();
593 self.publish_and_wait(&tp, event)
594 }
595
596 fn update_remote_log_segment_metadata(
597 &self,
598 update: RemoteLogSegmentMetadataUpdate,
599 ) -> Result<(), RemoteStorageError> {
600 let tp = update.remote_log_segment_id.topic_id_partition.clone();
601 let event = MetadataEvent::UpdateSegment(update).encode();
602 self.publish_and_wait(&tp, event)
603 }
604
605 fn remote_log_segment_metadata(
606 &self,
607 topic_id_partition: &TopicIdPartition,
608 leader_epoch: i32,
609 offset: i64,
610 ) -> Result<Option<RemoteLogSegmentMetadata>, RemoteStorageError> {
611 let mp = metadata_partition_for(topic_id_partition, self.log.partition_count());
612 match self.metadata_partition_gate(mp) {
613 ReadGate::Unassigned => Ok(None),
616 ReadGate::NotReady => Err(RemoteStorageError::NotReady { partition: mp }),
618 ReadGate::Ready => {
619 self.inner
620 .remote_log_segment_metadata(topic_id_partition, leader_epoch, offset)
621 }
622 }
623 }
624
625 fn highest_offset_for_epoch(
626 &self,
627 topic_id_partition: &TopicIdPartition,
628 leader_epoch: i32,
629 ) -> Result<Option<i64>, RemoteStorageError> {
630 let mp = metadata_partition_for(topic_id_partition, self.log.partition_count());
631 match self.metadata_partition_gate(mp) {
632 ReadGate::Unassigned => Ok(None),
633 ReadGate::NotReady => Err(RemoteStorageError::NotReady { partition: mp }),
634 ReadGate::Ready => self
635 .inner
636 .highest_offset_for_epoch(topic_id_partition, leader_epoch),
637 }
638 }
639
640 fn list_remote_log_segments(
641 &self,
642 topic_id_partition: &TopicIdPartition,
643 ) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError> {
644 let mp = metadata_partition_for(topic_id_partition, self.log.partition_count());
645 match self.metadata_partition_gate(mp) {
646 ReadGate::Unassigned => Ok(Vec::new()),
649 ReadGate::NotReady => Err(RemoteStorageError::NotReady { partition: mp }),
650 ReadGate::Ready => self.inner.list_remote_log_segments(topic_id_partition),
651 }
652 }
653
654 fn list_remote_log_segments_by_epoch(
655 &self,
656 topic_id_partition: &TopicIdPartition,
657 leader_epoch: i32,
658 ) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError> {
659 self.inner
660 .list_remote_log_segments_by_epoch(topic_id_partition, leader_epoch)
661 }
662
663 fn put_remote_partition_delete_metadata(
664 &self,
665 metadata: RemotePartitionDeleteMetadata,
666 ) -> Result<(), RemoteStorageError> {
667 let tp = metadata.topic_id_partition.clone();
668 let event = MetadataEvent::PartitionDelete(metadata).encode();
669 self.publish_and_wait(&tp, event)
670 }
671}
672
673async fn pump_loop(
674 mut stream: MetadataEventStream,
675 inner: Arc<InmemoryRemoteLogMetadataManager>,
676 applied: Arc<std::sync::Mutex<Vec<i64>>>,
677 applied_tx: watch::Sender<u64>,
678 shutdown: CancellationToken,
679) {
680 let mut version: u64 = 0;
681 loop {
682 let next = tokio::select! {
683 biased;
684 () = shutdown.cancelled() => return,
685 n = stream.next() => n,
686 };
687 let Some(record) = next else { return };
688 match MetadataEvent::decode(&record.payload) {
689 Ok(MetadataEvent::AddSegment(md)) => {
690 if let Err(e) = inner.add_remote_log_segment_metadata(md) {
691 warn!(error = ?e, partition = record.partition, offset = record.offset,
692 "topic-based RLMM: add replay rejected");
693 }
694 }
695 Ok(MetadataEvent::UpdateSegment(u)) => {
696 if let Err(e) = inner.update_remote_log_segment_metadata(u) {
697 warn!(error = ?e, partition = record.partition, offset = record.offset,
698 "topic-based RLMM: update replay rejected");
699 }
700 }
701 Ok(MetadataEvent::PartitionDelete(d)) => {
702 if let Err(e) = inner.put_remote_partition_delete_metadata(d) {
703 warn!(error = ?e, partition = record.partition, offset = record.offset,
704 "topic-based RLMM: partition-delete replay rejected");
705 }
706 }
707 Err(e) => {
708 warn!(error = ?e, partition = record.partition, offset = record.offset,
709 "topic-based RLMM: failed to decode event");
710 }
711 }
712 if let Ok(idx) = usize::try_from(record.partition) {
713 let mut a = applied.lock().expect("applied mutex poisoned");
714 if idx < a.len() && record.offset > a[idx] {
715 a[idx] = record.offset;
716 }
717 }
718 version = version.wrapping_add(1);
719 let _ = applied_tx.send(version);
720 }
721}
722
723#[cfg(test)]
724mod tests {
725 use super::*;
726 use assert2::assert;
727 use std::collections::BTreeMap;
728 use uuid::Uuid;
729
730 use crabka_remote_storage::{CustomMetadata, RemoteLogSegmentId, RemotePartitionDeleteState};
731
732 use crate::error::MetadataLogError;
733 use crate::log::{AssignmentHandle, InProcessMetadataEventLog, MetadataEventStream};
734
735 struct HwmFlakyLog {
740 inner: Arc<InProcessMetadataEventLog>,
741 fail_hwm: std::sync::atomic::AtomicBool,
742 }
743
744 impl HwmFlakyLog {
745 fn new(partition_count: i32) -> Arc<Self> {
746 Arc::new(Self {
747 inner: InProcessMetadataEventLog::new(partition_count),
748 fail_hwm: std::sync::atomic::AtomicBool::new(false),
749 })
750 }
751 fn set_fail_hwm(&self, fail: bool) {
752 self.fail_hwm
753 .store(fail, std::sync::atomic::Ordering::SeqCst);
754 }
755 }
756
757 #[async_trait::async_trait]
758 impl MetadataEventLog for HwmFlakyLog {
759 fn partition_count(&self) -> i32 {
760 self.inner.partition_count()
761 }
762 async fn publish(&self, partition: i32, event: Bytes) -> Result<i64, MetadataLogError> {
763 self.inner.publish(partition, event).await
764 }
765 fn subscribe(
766 &self,
767 assignment: Vec<PartitionStart>,
768 ) -> (MetadataEventStream, Arc<dyn AssignmentHandle>) {
769 self.inner.subscribe(assignment)
770 }
771 async fn high_water_marks(&self) -> Result<Vec<i64>, MetadataLogError> {
772 if self.fail_hwm.load(std::sync::atomic::Ordering::SeqCst) {
773 return Err(MetadataLogError::Other("injected HWM failure".into()));
774 }
775 self.inner.high_water_marks().await
776 }
777 }
778
/// Process-wide counter that `snapshot_test_dir` uses to give each call a
/// distinct directory name.
static SNAP_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
780
781 fn snapshot_test_dir(label: &str) -> std::path::PathBuf {
782 std::env::temp_dir().join(format!(
783 "crabka-rlmm-{label}-{}-{}",
784 std::process::id(),
785 SNAP_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
786 ))
787 }
788
789 fn tp() -> TopicIdPartition {
790 TopicIdPartition::new(Uuid::from_u128(1), "orders", 0)
791 }
792
793 fn started(id: u128, start: i64, end: i64) -> RemoteLogSegmentMetadata {
794 RemoteLogSegmentMetadata::new(
795 RemoteLogSegmentId::new(tp(), Uuid::from_u128(id)),
796 start,
797 end,
798 end + 1,
799 1,
800 100,
801 2048,
802 RemoteLogSegmentState::CopySegmentStarted,
803 BTreeMap::from([(0, start)]),
804 )
805 .unwrap()
806 }
807
808 fn finish(id: u128) -> RemoteLogSegmentMetadataUpdate {
809 RemoteLogSegmentMetadataUpdate {
810 remote_log_segment_id: RemoteLogSegmentId::new(tp(), Uuid::from_u128(id)),
811 event_timestamp_ms: 200,
812 custom_metadata: Some(CustomMetadata(vec![7])),
813 state: RemoteLogSegmentState::CopySegmentFinished,
814 broker_id: 1,
815 }
816 }
817
818 async fn on_blocking<T, F>(f: F) -> T
821 where
822 F: FnOnce() -> T + Send + 'static,
823 T: Send + 'static,
824 {
825 tokio::task::spawn_blocking(f).await.unwrap()
826 }
827
828 async fn wait_ready(m: &Arc<TopicBasedRemoteLogMetadataManager>, tp: &TopicIdPartition) {
830 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
831 loop {
832 if matches!(m.remote_log_segment_metadata(tp, 0, 42), Ok(Some(_))) {
833 return;
834 }
835 assert!(
836 std::time::Instant::now() < deadline,
837 "partition never became ready"
838 );
839 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
840 }
841 }
842
843 async fn start_manager(
847 log: Arc<dyn MetadataEventLog>,
848 ) -> Arc<TopicBasedRemoteLogMetadataManager> {
849 TopicBasedRemoteLogMetadataManager::start(
850 log,
851 Handle::current(),
852 snapshot_test_dir("test"),
853 std::time::Duration::from_hours(1),
854 )
855 .await
856 .unwrap()
857 }
858
859 async fn start_manager_all(
865 log: Arc<dyn MetadataEventLog>,
866 ) -> Arc<TopicBasedRemoteLogMetadataManager> {
867 let n = log.partition_count();
868 let m = start_manager(log).await;
869 let all: Vec<i32> = (0..n).collect();
870 m.reconcile_assignment(&all).await;
871 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
875 while !all.iter().all(|&mp| m.metadata_partition_ready(mp)) {
876 assert!(
877 std::time::Instant::now() < deadline,
878 "manager did not catch up on all partitions within 5s"
879 );
880 tokio::time::sleep(std::time::Duration::from_millis(2)).await;
881 }
882 m
883 }
884
885 #[tokio::test(flavor = "multi_thread")]
886 async fn add_finish_query_round_trip() {
887 let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
888 let m = start_manager_all(log).await;
889 let m2 = m.clone();
890 on_blocking(move || {
891 m2.add_remote_log_segment_metadata(started(10, 0, 99))
892 .unwrap();
893 })
894 .await;
895 let m2 = m.clone();
896 on_blocking(move || m2.update_remote_log_segment_metadata(finish(10)).unwrap()).await;
897
898 let got = m
899 .remote_log_segment_metadata(&tp(), 0, 42)
900 .unwrap()
901 .expect("segment found");
902 assert!(got.remote_log_segment_id().id == Uuid::from_u128(10));
903 assert!(got.custom_metadata() == Some(&CustomMetadata(vec![7])));
904 assert!(m.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(99));
905 m.shutdown();
906 }
907
908 #[tokio::test(flavor = "multi_thread")]
909 async fn add_with_wrong_state_is_rejected_eagerly() {
910 let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(2);
911 let m = start_manager(log.clone()).await;
912 let bad = started(10, 0, 9).with_update(&finish(10)).unwrap();
914 let m2 = m.clone();
915 let err = on_blocking(move || m2.add_remote_log_segment_metadata(bad).unwrap_err()).await;
916 assert!(matches!(err, RemoteStorageError::InvalidAdd { .. }));
917 assert!(log.high_water_marks().await.unwrap() == vec![0; 2]);
919 m.shutdown();
920 }
921
922 #[tokio::test(flavor = "multi_thread")]
923 async fn two_managers_sharing_a_log_converge() {
924 let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
925 let a = start_manager_all(log.clone()).await;
926 let b = start_manager_all(log.clone()).await;
927
928 let a2 = a.clone();
929 on_blocking(move || {
930 a2.add_remote_log_segment_metadata(started(10, 0, 99))
931 .unwrap();
932 })
933 .await;
934 let a2 = a.clone();
935 on_blocking(move || a2.update_remote_log_segment_metadata(finish(10)).unwrap()).await;
936
937 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
940 while b.highest_offset_for_epoch(&tp(), 0).unwrap() != Some(99) {
941 assert!(
942 std::time::Instant::now() < deadline,
943 "manager B did not converge within 2s"
944 );
945 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
946 }
947 assert!(b.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(99));
948 let got = b
949 .remote_log_segment_metadata(&tp(), 0, 50)
950 .unwrap()
951 .unwrap();
952 assert!(got.remote_log_segment_id().id == Uuid::from_u128(10));
953
954 a.shutdown();
955 b.shutdown();
956 }
957
958 #[tokio::test(flavor = "multi_thread")]
959 async fn restart_rehydrates_from_log() {
960 let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
961 {
962 let m = start_manager_all(log.clone()).await;
963 for (id, start, end) in [(10u128, 0, 99), (11, 100, 199), (12, 200, 299)] {
964 let m2 = m.clone();
965 on_blocking(move || {
966 m2.add_remote_log_segment_metadata(started(id, start, end))
967 .unwrap();
968 })
969 .await;
970 let m2 = m.clone();
971 on_blocking(move || m2.update_remote_log_segment_metadata(finish(id)).unwrap())
972 .await;
973 }
974 m.shutdown();
975 }
976
977 let fresh = start_manager_all(log).await;
980 let listed = fresh.list_remote_log_segments(&tp()).unwrap();
981 assert!(listed.len() == 3);
982 assert!(listed[0].start_offset() == 0);
983 assert!(listed[2].end_offset() == 299);
984 assert!(fresh.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(299));
985 fresh.shutdown();
986 }
987
988 #[tokio::test(flavor = "multi_thread")]
989 async fn partition_delete_lifecycle_round_trip() {
990 let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(2);
991 let m = start_manager_all(log).await;
992 for state in [
993 RemotePartitionDeleteState::DeletePartitionMarked,
994 RemotePartitionDeleteState::DeletePartitionStarted,
995 RemotePartitionDeleteState::DeletePartitionFinished,
996 ] {
997 let m2 = m.clone();
998 on_blocking(move || {
999 m2.put_remote_partition_delete_metadata(RemotePartitionDeleteMetadata {
1000 topic_id_partition: tp(),
1001 state,
1002 event_timestamp_ms: 500,
1003 broker_id: 1,
1004 })
1005 .unwrap();
1006 })
1007 .await;
1008 }
1009 m.shutdown();
1010 }
1011
1012 #[tokio::test(flavor = "multi_thread")]
1013 async fn shutdown_flushes_a_snapshot_covering_applied_events() {
1014 let dir = snapshot_test_dir("mgr-snap");
1015 let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
1016 let m = TopicBasedRemoteLogMetadataManager::start(
1017 log.clone(),
1018 Handle::current(),
1019 dir.clone(),
1020 std::time::Duration::from_hours(1), )
1022 .await
1023 .unwrap();
1024 m.reconcile_assignment(&(0..log.partition_count()).collect::<Vec<_>>())
1025 .await;
1026 let m2 = m.clone();
1027 on_blocking(move || {
1028 m2.add_remote_log_segment_metadata(started(10, 0, 99))
1029 .unwrap();
1030 })
1031 .await;
1032 let m2 = m.clone();
1033 on_blocking(move || m2.update_remote_log_segment_metadata(finish(10)).unwrap()).await;
1034
1035 m.shutdown_and_flush().await;
1036
1037 let path = dir.join(crate::snapshot::SNAPSHOT_FILE_NAME);
1038 let snap = crate::snapshot::Snapshot::load(&path)
1039 .unwrap()
1040 .expect("snapshot written");
1041 let p = crate::partitioning::metadata_partition_for(&tp(), 4);
1043 let idx = usize::try_from(p).unwrap();
1044 assert!(
1045 snap.committed_offsets[idx] >= 1,
1046 "committed >= last applied offset"
1047 );
1048 assert!(snap.dump.partitions.len() == 1);
1050 assert!(snap.dump.partitions[0].segments.len() == 1);
1051 std::fs::remove_dir_all(&dir).ok();
1052 }
1053
1054 #[tokio::test(flavor = "multi_thread")]
1055 async fn restart_resumes_from_snapshot_without_replaying_from_zero() {
1056 let dir = snapshot_test_dir("resume");
1057 let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
1058 let interval = std::time::Duration::from_hours(1);
1059
1060 let pre_cache;
1062 {
1063 let m = TopicBasedRemoteLogMetadataManager::start(
1064 log.clone(),
1065 Handle::current(),
1066 dir.clone(),
1067 interval,
1068 )
1069 .await
1070 .unwrap();
1071 m.reconcile_assignment(&(0..log.partition_count()).collect::<Vec<_>>())
1072 .await;
1073 for (id, start, end) in [(10u128, 0, 99), (11, 100, 199), (12, 200, 299)] {
1074 let m2 = m.clone();
1075 on_blocking(move || {
1076 m2.add_remote_log_segment_metadata(started(id, start, end))
1077 .unwrap();
1078 })
1079 .await;
1080 let m2 = m.clone();
1081 on_blocking(move || m2.update_remote_log_segment_metadata(finish(id)).unwrap())
1082 .await;
1083 }
1084 pre_cache = m.list_remote_log_segments(&tp()).unwrap();
1085 m.shutdown_and_flush().await;
1086 }
1087
1088 let p = crate::partitioning::metadata_partition_for(&tp(), 4);
1090 let idx = usize::try_from(p).unwrap();
1091 let snap = crate::snapshot::Snapshot::load(&dir.join(crate::snapshot::SNAPSHOT_FILE_NAME))
1092 .unwrap()
1093 .expect("snapshot present");
1094 let committed = snap.committed_offsets[idx];
1095 assert!(
1096 committed >= 5,
1097 "6 events (3 add + 3 finish) → committed >= 5"
1098 );
1099
1100 let (resumed_committed, assignment) =
1103 TopicBasedRemoteLogMetadataManager::resume_from_snapshot(Some(&snap), 4);
1104 let orders_start = assignment
1105 .iter()
1106 .find(|s| s.partition == p)
1107 .map(|s| s.start_offset)
1108 .unwrap();
1109 assert!(orders_start == committed + 1, "resume from N+1, not 0");
1110 assert!(resumed_committed[idx] == committed);
1111
1112 let fresh = TopicBasedRemoteLogMetadataManager::start(
1114 log.clone(),
1115 Handle::current(),
1116 dir.clone(),
1117 interval,
1118 )
1119 .await
1120 .unwrap();
1121 assert!(fresh.committed_offset(p) == committed);
1124 fresh
1129 .reconcile_assignment(&(0..log.partition_count()).collect::<Vec<_>>())
1130 .await;
1131 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
1132 while !fresh.metadata_partition_ready(p) {
1133 assert!(
1134 std::time::Instant::now() < deadline,
1135 "fresh manager did not catch up on the orders partition"
1136 );
1137 tokio::time::sleep(std::time::Duration::from_millis(2)).await;
1138 }
1139 let post_cache = fresh.list_remote_log_segments(&tp()).unwrap();
1140 assert!(
1141 post_cache == pre_cache,
1142 "post-load cache equals pre-restart cache"
1143 );
1144 assert!(fresh.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(299));
1145 fresh.shutdown();
1146 std::fs::remove_dir_all(&dir).ok();
1147 }
1148
1149 #[tokio::test(flavor = "multi_thread")]
1150 async fn add_then_remove_drives_assignment_and_readiness() {
1151 use crate::partitioning::metadata_partition_for;
1152
1153 let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
1154 {
1156 let writer = start_manager_all(log.clone()).await;
1157 let w2 = writer.clone();
1158 on_blocking(move || {
1159 w2.add_remote_log_segment_metadata(started(10, 0, 99))
1160 .unwrap();
1161 })
1162 .await;
1163 let w2 = writer.clone();
1164 on_blocking(move || w2.update_remote_log_segment_metadata(finish(10)).unwrap()).await;
1165 writer.shutdown();
1166 }
1167
1168 let mp = metadata_partition_for(&tp(), log.partition_count());
1169 let m = start_manager(log).await;
1170
1171 assert!(matches!(
1173 m.remote_log_segment_metadata(&tp(), 0, 42),
1174 Ok(None)
1175 ));
1176
1177 m.reconcile_assignment(&[mp]).await;
1180 assert!(m.assigned_metadata_partitions() == vec![mp]);
1181
1182 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
1183 loop {
1184 match m.remote_log_segment_metadata(&tp(), 0, 42) {
1185 Ok(Some(md)) => {
1186 assert!(md.remote_log_segment_id().id == Uuid::from_u128(10));
1187 break;
1188 }
1189 Err(RemoteStorageError::NotReady { partition }) => {
1190 assert!(partition == mp, "NotReady names the catching-up partition");
1191 assert!(
1192 std::time::Instant::now() < deadline,
1193 "metadata partition never became ready"
1194 );
1195 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1196 }
1197 other => panic!("unexpected read outcome: {other:?}"),
1198 }
1199 }
1200
1201 m.reconcile_assignment(&[]).await;
1204 assert!(m.assigned_metadata_partitions().is_empty());
1205 assert!(matches!(
1206 m.remote_log_segment_metadata(&tp(), 0, 42),
1207 Ok(None)
1208 ));
1209 m.shutdown();
1210 }
1211
1212 #[tokio::test(flavor = "multi_thread")]
1213 async fn unknown_partition_query_is_none() {
1214 let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(2);
1215 let m = start_manager(log).await;
1216 let other = TopicIdPartition::new(Uuid::from_u128(999), "nope", 0);
1217 assert!(m.remote_log_segment_metadata(&other, 0, 0).unwrap() == None);
1218 assert!(m.highest_offset_for_epoch(&other, 0).unwrap() == None);
1219 assert!(m.list_remote_log_segments(&other).unwrap().is_empty());
1220 m.shutdown();
1221 }
1222
1223 #[tokio::test(flavor = "multi_thread")]
1224 async fn two_brokers_split_metadata_partitions() {
1225 use crate::partitioning::metadata_partition_for;
1226
1227 let n = 16;
1230 let topic_id = Uuid::from_u128(0xFEED);
1231 let tp_a = TopicIdPartition::new(topic_id, "orders", 0);
1232 let tp_b = TopicIdPartition::new(topic_id, "orders", 1);
1233 let mp_a = metadata_partition_for(&tp_a, n);
1234 let mp_b = metadata_partition_for(&tp_b, n);
1235 assert!(
1236 mp_a != mp_b,
1237 "test needs the two partitions in distinct buckets"
1238 );
1239
1240 let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(n);
1241
1242 for (tp, id) in [(tp_a.clone(), 100u128), (tp_b.clone(), 200)] {
1245 let w = start_manager_all(log.clone()).await;
1246 let started = RemoteLogSegmentMetadata::new(
1247 RemoteLogSegmentId::new(tp.clone(), Uuid::from_u128(id)),
1248 0,
1249 99,
1250 100,
1251 1,
1252 100,
1253 2048,
1254 RemoteLogSegmentState::CopySegmentStarted,
1255 BTreeMap::from([(0, 0)]),
1256 )
1257 .unwrap();
1258 let w2 = w.clone();
1259 on_blocking(move || w2.add_remote_log_segment_metadata(started).unwrap()).await;
1260 let upd = RemoteLogSegmentMetadataUpdate {
1261 remote_log_segment_id: RemoteLogSegmentId::new(tp, Uuid::from_u128(id)),
1262 event_timestamp_ms: 200,
1263 custom_metadata: None,
1264 state: RemoteLogSegmentState::CopySegmentFinished,
1265 broker_id: 1,
1266 };
1267 let w2 = w.clone();
1268 on_blocking(move || w2.update_remote_log_segment_metadata(upd).unwrap()).await;
1269 w.shutdown();
1270 }
1271
1272 let a = start_manager(log.clone()).await;
1274 let b = start_manager(log).await;
1275 a.reconcile_assignment(&[mp_a]).await;
1276 b.reconcile_assignment(&[mp_b]).await;
1277
1278 assert!(a.assigned_metadata_partitions() == vec![mp_a]);
1279 assert!(b.assigned_metadata_partitions() == vec![mp_b]);
1280 assert!(
1282 a.assigned_metadata_partitions()
1283 .iter()
1284 .all(|p| !b.assigned_metadata_partitions().contains(p)),
1285 "shares must be disjoint"
1286 );
1287
1288 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
1290 loop {
1291 let a_own = a.remote_log_segment_metadata(&tp_a, 0, 42);
1292 let b_own = b.remote_log_segment_metadata(&tp_b, 0, 42);
1293 if matches!(a_own, Ok(Some(_))) && matches!(b_own, Ok(Some(_))) {
1294 break;
1295 }
1296 assert!(
1297 std::time::Instant::now() < deadline,
1298 "managers did not catch up: a={a_own:?} b={b_own:?}"
1299 );
1300 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1301 }
1302
1303 assert!(
1306 matches!(a.remote_log_segment_metadata(&tp_b, 0, 42), Ok(None)),
1307 "A does not consume mp_b → genuine miss"
1308 );
1309 assert!(
1310 matches!(b.remote_log_segment_metadata(&tp_a, 0, 42), Ok(None)),
1311 "B does not consume mp_a → genuine miss"
1312 );
1313
1314 a.shutdown();
1315 b.shutdown();
1316 }
1317
1318 #[tokio::test(flavor = "multi_thread")]
1324 async fn reassignment_remove_then_readd_applies_no_duplicates() {
1325 use crate::partitioning::metadata_partition_for;
1326
1327 let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
1328 {
1330 let writer = start_manager_all(log.clone()).await;
1331 let w2 = writer.clone();
1332 on_blocking(move || {
1333 w2.add_remote_log_segment_metadata(started(10, 0, 99))
1334 .unwrap();
1335 })
1336 .await;
1337 let w2 = writer.clone();
1338 on_blocking(move || w2.update_remote_log_segment_metadata(finish(10)).unwrap()).await;
1339 writer.shutdown();
1340 }
1341
1342 let mp = metadata_partition_for(&tp(), log.partition_count());
1343 let m = start_manager(log).await;
1344
1345 m.reconcile_assignment(&[mp]).await;
1347 wait_ready(&m, &tp()).await;
1348 assert!(
1349 m.list_remote_log_segments(&tp()).unwrap().len() == 1,
1350 "one segment after first assignment"
1351 );
1352
1353 m.reconcile_assignment(&[]).await;
1355 assert!(m.assigned_metadata_partitions().is_empty());
1356
1357 m.reconcile_assignment(&[mp]).await;
1361 wait_ready(&m, &tp()).await;
1362
1363 let listed = m.list_remote_log_segments(&tp()).unwrap();
1364 assert!(
1365 listed.len() == 1,
1366 "remove + re-add must not duplicate the segment, got {listed:?}"
1367 );
1368 assert!(listed[0].remote_log_segment_id().id == Uuid::from_u128(10));
1369 assert!(m.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(99));
1371
1372 m.shutdown();
1373 }
1374
1375 #[tokio::test(flavor = "multi_thread")]
1381 async fn hwm_fetch_failure_gates_not_ready_then_self_heals() {
1382 use crate::partitioning::metadata_partition_for;
1383
1384 let flaky = HwmFlakyLog::new(4);
1385 let log: Arc<dyn MetadataEventLog> = flaky.clone();
1386
1387 {
1390 let writer = start_manager_all(log.clone()).await;
1391 let w2 = writer.clone();
1392 on_blocking(move || {
1393 w2.add_remote_log_segment_metadata(started(10, 0, 99))
1394 .unwrap();
1395 })
1396 .await;
1397 let w2 = writer.clone();
1398 on_blocking(move || w2.update_remote_log_segment_metadata(finish(10)).unwrap()).await;
1399 writer.shutdown();
1400 }
1401
1402 let mp = metadata_partition_for(&tp(), log.partition_count());
1403 let m = start_manager(log).await;
1404
1405 flaky.set_fail_hwm(true);
1409 m.reconcile_assignment(&[mp]).await;
1410 assert!(
1411 m.assigned_metadata_partitions() == vec![mp],
1412 "partition is assigned even though HWM is unknown (broker owns it)"
1413 );
1414
1415 let deadline = std::time::Instant::now() + std::time::Duration::from_millis(300);
1419 while std::time::Instant::now() < deadline {
1420 match m.remote_log_segment_metadata(&tp(), 0, 42) {
1421 Err(RemoteStorageError::NotReady { partition }) => assert!(partition == mp),
1422 other => panic!("HWM-unknown partition must read NotReady, got {other:?}"),
1423 }
1424 match m.list_remote_log_segments(&tp()) {
1426 Err(RemoteStorageError::NotReady { partition }) => assert!(partition == mp),
1427 other => panic!("HWM-unknown partition list must be NotReady, got {other:?}"),
1428 }
1429 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1430 }
1431
1432 flaky.set_fail_hwm(false);
1437 m.reconcile_assignment(&[mp]).await;
1438 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
1439 loop {
1440 match m.remote_log_segment_metadata(&tp(), 0, 42) {
1441 Ok(Some(md)) => {
1442 assert!(md.remote_log_segment_id().id == Uuid::from_u128(10));
1443 break;
1444 }
1445 Err(RemoteStorageError::NotReady { partition }) => {
1446 assert!(partition == mp);
1447 assert!(
1448 std::time::Instant::now() < deadline,
1449 "partition never became ready after HWM recovered"
1450 );
1451 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1452 }
1453 other => panic!("unexpected read outcome after recovery: {other:?}"),
1454 }
1455 }
1456 assert!(m.list_remote_log_segments(&tp()).unwrap().len() == 1);
1458 assert!(m.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(99));
1459
1460 m.shutdown();
1461 }
1462}