Skip to main content

crabka_remote_storage_topic/
manager.rs

1//! [`TopicBasedRemoteLogMetadataManager`] — production
2//! [`RemoteLogMetadataManager`] implementation backed by a publish /
3//! subscribe [`MetadataEventLog`].
4//!
5//! The manager keeps the canonical in-memory view in an
6//! [`InmemoryRemoteLogMetadataManager`] (so the lifecycle state
7//! machine is the single source of truth for cache mutation) and uses
8//! the [`MetadataEventLog`] as the durable event log.
9//!
10//! Lifecycle:
11//!
12//! - [`TopicBasedRemoteLogMetadataManager::start`]: load any on-disk
13//!   snapshot into the cache and spawn the consumer pump subscribed to
14//!   NOTHING. The broker then drives the consumed set via
15//!   [`TopicBasedRemoteLogMetadataManager::reconcile_assignment`], adding
16//!   only the `__remote_log_metadata` partitions covering the
17//!   user-partitions this broker leads or follows. A newly-added partition
18//!   is gated by `NotReady` until the pump reaches the HWM observed at
19//!   assignment time; a partition this broker does not consume is a genuine
20//!   `Ok(None)` (never served from any stale cache).
21//! - Mutation calls (`add`/`update`/`put_partition_delete`):
22//!   serialize, publish, and wait until the consumer pump has applied
23//!   the published offset to the inner cache. The sync return implies
24//!   "the event has been recorded and is visible to local reads".
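//! - Read calls: local lookups against the inner cache, gated per
//!   metadata partition: unassigned → `Ok(None)` (or an empty list),
//!   assigned but still catching up → `Err(NotReady)`, caught up → the
//!   cached answer.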
26//! - Drop / [`TopicBasedRemoteLogMetadataManager::shutdown`]: cancel the consumer pump.
27
28use std::sync::Arc;
29
30use bytes::Bytes;
31use futures_util::StreamExt;
32use tokio::runtime::Handle;
33use tokio::sync::watch;
34use tokio::task::JoinHandle;
35use tokio_util::sync::CancellationToken;
36use tracing::warn;
37
38use crabka_remote_storage::{
39    InmemoryRemoteLogMetadataManager, RemoteLogMetadataManager, RemoteLogSegmentMetadata,
40    RemoteLogSegmentMetadataUpdate, RemoteLogSegmentState, RemotePartitionDeleteMetadata,
41    RemoteStorageError, TopicIdPartition,
42};
43
44use crate::error::MetadataLogError;
45use crate::log::{AssignmentHandle, MetadataEventLog, MetadataEventStream, PartitionStart};
46use crate::partitioning::metadata_partition_for;
47use crate::serde::MetadataEvent;
48
/// Placeholder readiness target recorded for a metadata partition that is
/// assigned but whose high-water mark could not be determined: either the
/// `high_water_marks` call failed, or its result had no entry for that
/// partition.
///
/// No applied offset can ever reach `i64::MAX`, so a partition carrying this
/// target keeps gating reads with `NotReady`. The next
/// `reconcile_assignment` retries the fetch and swaps in the real target.
const HWM_UNKNOWN: i64 = i64::MAX;
56
/// Three-way answer of the per-metadata-partition readiness check that
/// guards the gated read methods of [`RemoteLogMetadataManager`].
enum ReadGate {
    /// The metadata partition is not consumed by this broker (it neither
    /// leads nor follows any user-partition hashing into it). The read is a
    /// genuine miss: `Ok(None)` or an empty list.
    Unassigned,
    /// The metadata partition is consumed, but the pump has not yet reached
    /// the high-water mark observed at assignment time. The read fails with
    /// the retryable `Err(NotReady)`.
    NotReady,
    /// The metadata partition is consumed and caught up, so the read is
    /// answered from the inner cache.
    Ready,
}
72
/// Production [`RemoteLogMetadataManager`] backed by the
/// `__remote_log_metadata` topic (via a [`MetadataEventLog`]
/// adapter).
///
/// Construct with [`Self::start`]; it loads any on-disk snapshot but
/// consumes no metadata partitions until [`Self::reconcile_assignment`]
/// adds the broker's leader/follower-derived set.
pub struct TopicBasedRemoteLogMetadataManager {
    /// Event log that mutations are published to and that the consumer
    /// pump reads back from.
    log: Arc<dyn MetadataEventLog>,
    /// Canonical in-memory cache. Within this module it is written only by
    /// the snapshot import in `start` and by the consumer pump; the read
    /// methods consult it once the readiness gate allows.
    inner: Arc<InmemoryRemoteLogMetadataManager>,
    /// Highest event offset applied to `inner`, indexed by metadata
    /// partition (`-1` == nothing applied). Pre-seeded from the snapshot's
    /// committed offsets; only the pump advances it.
    applied: Arc<std::sync::Mutex<Vec<i64>>>,
    /// Signalled by the pump after every record it processes, so waiters
    /// can re-check `applied` instead of polling.
    applied_tx: watch::Sender<u64>,
    /// Runtime the pump and snapshotter are spawned on. The synchronous
    /// trait methods `block_on` this handle.
    runtime: Handle,
    /// Cancelled on `shutdown`, `shutdown_and_flush` and `Drop`. The pump and
    /// the snapshotter both stop when it fires.
    shutdown: CancellationToken,
    /// Handle of the consumer pump task; aborted on `Drop`.
    pump: std::sync::Mutex<Option<JoinHandle<()>>>,
    /// Directory the on-disk RLMM cache snapshot is written to (one
    /// [`SNAPSHOT_FILE_NAME`](crate::snapshot::SNAPSHOT_FILE_NAME) file).
    snapshot_dir: std::path::PathBuf,
    /// Handle of the background snapshotter task; aborted on `Drop`,
    /// joined on [`Self::shutdown_and_flush`].
    snapshotter: std::sync::Mutex<Option<JoinHandle<()>>>,
    /// Live assignment handle for the metadata-log subscription. Held so the
    /// consumed set of metadata partitions can be changed at runtime (each
    /// added partition resumes at its snapshot offset). Driven by
    /// [`Self::reconcile_assignment`].
    assignment: Arc<dyn AssignmentHandle>,
    /// Per-metadata-partition committed offsets loaded from the snapshot
    /// at `start()`, indexed by metadata partition (`-1` == no committed
    /// event for that partition / full replay). Retained as the single
    /// canonical source for resume-offset lookups; the assignment
    /// reconciler reads it via [`Self::committed_offset`] when it
    /// dynamically adds a partition (to start at `committed + 1`).
    committed_offsets: Vec<i64>,
    /// Metadata partition → target HWM observed at assignment time.
    /// Presence == this manager is currently assigned that partition;
    /// reads for a user-partition hashing into it return
    /// [`RemoteStorageError::NotReady`] until `applied[mp] >= target - 1`.
    /// Empty for managers that never call [`Self::reconcile_assignment`];
    /// every gated read then takes the "unassigned" path (`Ok(None)` / an
    /// empty list) rather than consulting `inner`.
    ready_targets: Arc<std::sync::Mutex<std::collections::HashMap<i32, i64>>>,
}
114
115impl TopicBasedRemoteLogMetadataManager {
116    /// Load any on-disk snapshot into the cache and spawn the consumer
117    /// pump with an empty assignment. The manager consumes nothing until
118    /// [`Self::reconcile_assignment`] is driven (by the broker).
119    ///
120    /// `runtime` must be a Tokio runtime handle that lives at least
121    /// as long as the returned manager. The synchronous
122    /// [`RemoteLogMetadataManager`] methods bridge to this handle via
123    /// `block_on`, so they must NOT be called from a task running on
124    /// this same runtime — the broker only invokes them through
125    /// `spawn_blocking`, which is the only supported call pattern.
126    ///
127    /// # Errors
128    ///
129    /// Currently infallible (the consumed set starts empty), but returns a
130    /// `Result` so the bootstrap contract stays stable if `start` regains a
131    /// fallible step.
132    // Kept `async` for the established 4-arg bootstrap contract the broker
133    // awaits; bootstrap no longer blocks on catch-up (it consumes nothing
134    // until `reconcile_assignment`), so there is no internal `.await`.
135    #[allow(clippy::unused_async)]
136    pub async fn start(
137        log: Arc<dyn MetadataEventLog>,
138        runtime: Handle,
139        snapshot_dir: std::path::PathBuf,
140        snapshot_interval: std::time::Duration,
141    ) -> Result<Arc<Self>, RemoteStorageError> {
142        let n = usize::try_from(log.partition_count()).expect("partition_count fits in usize");
143        let (applied_tx, _) = watch::channel(0u64);
144        let inner = Arc::new(InmemoryRemoteLogMetadataManager::new());
145        let shutdown = CancellationToken::new();
146
147        // Load the snapshot (if any) ONCE and seed the cache from its
148        // dump. `resume_from_snapshot` is the single canonical place that
149        // turns a loaded snapshot into the per-partition committed offsets.
150        // On absence/corruption, committed[] is all -1 (full replay) and the
151        // cache stays empty — never fatal.
152        let snapshot = match crate::snapshot::Snapshot::load(
153            &snapshot_dir.join(crate::snapshot::SNAPSHOT_FILE_NAME),
154        ) {
155            Ok(snap) => snap,
156            Err(e) => {
157                warn!(error = ?e, "topic-based RLMM: snapshot corrupt; starting from empty cache");
158                None
159            }
160        };
161        if let Some(snap) = &snapshot {
162            inner.import(snap.dump.clone());
163        }
164        // A freshly-started manager consumes NOTHING. The broker drives
165        // the consumed set via [`Self::reconcile_assignment`], adding only the
166        // metadata partitions covering user-partitions this broker leads or
167        // follows (each resumed at its snapshot `committed + 1`). This is what
168        // makes an unassigned partition a genuine `Ok(None)` rather than a
169        // false hit from globally-replayed state.
170        let (committed, _assignment) = Self::resume_from_snapshot(snapshot.as_ref(), n);
171
172        // Pre-seed `applied` to the committed offsets so readiness checks for
173        // a later-added partition only block on the delta from committed+1 to
174        // the assignment-time HWM.
175        let applied = Arc::new(std::sync::Mutex::new(committed.clone()));
176
177        let (stream, assignment_handle) = log.subscribe(Vec::new());
178        let pump = runtime.spawn(pump_loop(
179            stream,
180            inner.clone(),
181            applied.clone(),
182            applied_tx.clone(),
183            shutdown.clone(),
184        ));
185
186        let manager = Arc::new(Self {
187            log,
188            inner,
189            applied,
190            applied_tx,
191            runtime,
192            shutdown,
193            pump: std::sync::Mutex::new(Some(pump)),
194            snapshot_dir,
195            snapshotter: std::sync::Mutex::new(None),
196            assignment: assignment_handle,
197            committed_offsets: committed,
198            ready_targets: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
199        });
200
201        // Spawn the periodic snapshotter: flush whenever the cache
202        // advanced since the last write, plus a final flush on shutdown.
203        let snapshotter = {
204            let weak = Arc::downgrade(&manager);
205            let shutdown = manager.shutdown.clone();
206            manager.runtime.spawn(async move {
207                let mut last_written: i64 = -1;
208                loop {
209                    tokio::select! {
210                        biased;
211                        () = shutdown.cancelled() => return,
212                        () = tokio::time::sleep(snapshot_interval) => {}
213                    }
214                    let Some(m) = weak.upgrade() else { return };
215                    // Only write when the cache advanced since the last snapshot.
216                    let highest = {
217                        let applied = m.applied.lock().expect("applied mutex poisoned");
218                        applied.iter().copied().max().unwrap_or(-1)
219                    };
220                    if highest > last_written {
221                        match m.write_snapshot() {
222                            Ok(written) => last_written = written,
223                            Err(e) => {
224                                warn!(error = ?e, "topic-based RLMM: periodic snapshot failed");
225                            }
226                        }
227                    }
228                }
229            })
230        };
231        *manager
232            .snapshotter
233            .lock()
234            .expect("snapshotter mutex poisoned") = Some(snapshotter);
235
236        // Nothing is consumed at bootstrap (empty assignment), so the manager
237        // is immediately ready. Per-partition catch-up after a later
238        // `reconcile_assignment` is governed by `metadata_partition_ready`,
239        // which gates reads with `NotReady` until the pump reaches the
240        // assignment-time HWM.
241        Ok(manager)
242    }
243
244    /// Cancel the consumer pump. Read methods continue to work
245    /// against whatever was applied before shutdown; mutation methods
246    /// will time out / fail to make progress.
247    pub fn shutdown(&self) {
248        self.shutdown.cancel();
249    }
250
251    /// Cancel the pump + snapshotter, then write a final snapshot
252    /// capturing everything applied so far. Safe to call once on
253    /// graceful shutdown.
254    pub async fn shutdown_and_flush(&self) {
255        self.shutdown.cancel();
256        // Take the handle out of the lock BEFORE awaiting it, so the
257        // (sync) mutex is not held across the await point.
258        let handle = self
259            .snapshotter
260            .lock()
261            .expect("snapshotter mutex poisoned")
262            .take();
263        // Let the snapshotter observe cancellation and stop touching
264        // `applied` before we take the final consistent capture.
265        if let Some(h) = handle {
266            let _ = h.await;
267        }
268        if let Err(e) = self.write_snapshot() {
269            warn!(error = ?e, "topic-based RLMM: final snapshot flush failed");
270        }
271    }
272
273    /// Capture the pump's committed offsets together with a cache
274    /// export under a consistent lock, and write a snapshot. The
275    /// `applied` lock is held only long enough to clone the offsets and
276    /// run `export()` (which takes the inner partitions lock); no Kafka
277    /// round-trips happen inside, so the hold is bounded. Returns the
278    /// highest committed offset written (for the "advanced since last"
279    /// check).
280    fn write_snapshot(&self) -> Result<i64, crate::error::SnapshotError> {
281        // Benign-replay invariant: the pump updates `inner` BEFORE bumping
282        // `applied`, so the captured cache may lead the captured committed
283        // offset by at most one event (the in-flight one). On resume that
284        // single event is replayed from committed+1 and harmlessly
285        // re-rejected: a re-applied AddSegment hits already-exists, and a
286        // re-applied finished→finished update is a no-op. The dangerous
287        // direction — cache BEHIND committed, which would skip an event on
288        // resume — cannot occur because inner is always updated first.
289        let (committed_offsets, dump) = {
290            let applied = self.applied.lock().expect("applied mutex poisoned");
291            let dump = self.inner.export();
292            (applied.clone(), dump)
293        };
294        let max = committed_offsets.iter().copied().max().unwrap_or(-1);
295        let snap = crate::snapshot::Snapshot {
296            committed_offsets,
297            dump,
298        };
299        let path = self.snapshot_dir.join(crate::snapshot::SNAPSHOT_FILE_NAME);
300        snap.write_atomic(&path)?;
301        Ok(max)
302    }
303
304    /// Canonical resume-from-snapshot computation, shared by `start()` and
305    /// the resume tests. Given an already-loaded snapshot (or `None` for a
306    /// missing/corrupt one) and the metadata-partition count `n`, produce:
307    ///
308    /// - the per-partition committed offsets, indexed by metadata
309    ///   partition and padded/truncated to `n` (`-1` == no committed
310    ///   event → full replay for that partition), and
311    /// - the metadata-consumer assignment that resumes each partition at
312    ///   `committed + 1`.
313    ///
314    /// This is the ONLY place the `committed + 1` resume policy lives; do
315    /// not recompute it elsewhere.
316    fn resume_from_snapshot(
317        snapshot: Option<&crate::snapshot::Snapshot>,
318        n: usize,
319    ) -> (Vec<i64>, Vec<PartitionStart>) {
320        let mut committed = vec![-1i64; n];
321        if let Some(snap) = snapshot {
322            for (i, &off) in snap.committed_offsets.iter().take(n).enumerate() {
323                committed[i] = off;
324            }
325        }
326        let assignment = (0..n)
327            .map(|i| PartitionStart {
328                partition: i32::try_from(i).expect("partition fits in i32"),
329                start_offset: committed[i] + 1,
330            })
331            .collect();
332        (committed, assignment)
333    }
334
335    /// Committed offset loaded from the snapshot for a single metadata
336    /// partition, or `-1` when the partition is out of range or had no
337    /// committed event (full replay). The assignment reconciler uses
338    /// this to start a dynamically-added partition at `committed + 1`.
339    #[must_use]
340    pub fn committed_offset(&self, partition: i32) -> i64 {
341        usize::try_from(partition)
342            .ok()
343            .and_then(|i| self.committed_offsets.get(i).copied())
344            .unwrap_or(-1)
345    }
346
347    /// The read decision for metadata partition `mp`, used to gate
348    /// [`RemoteLogMetadataManager::remote_log_segment_metadata`].
349    fn metadata_partition_gate(&self, mp: i32) -> ReadGate {
350        let target = {
351            let guard = self.ready_targets.lock().expect("ready_targets poisoned");
352            match guard.get(&mp) {
353                Some(&t) => t,
354                // Not assigned: this broker neither leads nor follows any
355                // user-partition in `mp`, so it must not answer from any
356                // stale cache it happened to consume earlier. A genuine
357                // miss — `Ok(None)`, NOT `NotReady`.
358                None => return ReadGate::Unassigned,
359            }
360        };
361        if target == 0 {
362            return ReadGate::Ready; // empty partition: nothing to catch up to
363        }
364        // A sentinel target means the real HWM is not yet known (the
365        // assignment-time fetch failed); the partition is assigned but the
366        // answer is unknown → retryable, never a false `Ok(None)`.
367        if target == HWM_UNKNOWN {
368            return ReadGate::NotReady;
369        }
370        let Ok(idx) = usize::try_from(mp) else {
371            // Defensive: a metadata partition index that doesn't fit in
372            // usize is nonsensical, but if it ever happens we must NOT
373            // fail open into `Ready` (which would serve a possibly-stale
374            // or false-miss answer). Treat it as still catching up.
375            return ReadGate::NotReady;
376        };
377        let applied = self.applied.lock().expect("applied mutex poisoned");
378        if idx < applied.len() && applied[idx] >= target - 1 {
379            ReadGate::Ready
380        } else {
381            ReadGate::NotReady
382        }
383    }
384
385    /// `true` when metadata partition `mp` is assigned and caught up to its
386    /// assignment-time HWM. Used by tests to poll for catch-up.
387    #[cfg(test)]
388    fn metadata_partition_ready(&self, mp: i32) -> bool {
389        matches!(self.metadata_partition_gate(mp), ReadGate::Ready)
390    }
391
392    /// The metadata partitions this manager is currently assigned (tracked
393    /// for readiness). Sorted ascending.
394    #[must_use]
395    pub fn assigned_metadata_partitions(&self) -> Vec<i32> {
396        let mut v: Vec<i32> = self
397            .ready_targets
398            .lock()
399            .expect("ready_targets poisoned")
400            .keys()
401            .copied()
402            .collect();
403        v.sort_unstable();
404        v
405    }
406
407    /// Diff `desired` against the current assignment and drive the
408    /// [`AssignmentHandle`]: add newly-needed partitions (seeded from the
409    /// snapshot committed offset + 1, falling back to 0 when there is
410    /// no committed event) and remove ones no longer needed. Records each
411    /// added partition's assignment-time HWM so reads gate on `NotReady`
412    /// until the pump catches up.
413    ///
414    /// HWM-fetch failure fails CLOSED: a partition whose real high-water
415    /// mark could not be obtained is recorded with the `HWM_UNKNOWN`
416    /// sentinel target so the gate returns `NotReady` (retryable), never a
417    /// false `Ok(None)`. Such partitions are re-attempted on every
418    /// subsequent reconcile (which the broker drives on each image change /
419    /// reconciler tick), so a transient `high_water_marks` failure
420    /// self-heals: the sentinel is replaced with the real target as soon as
421    /// the fetch succeeds.
422    ///
423    /// MUST be driven by a SINGLE task. This method is not internally
424    /// serialized — it interleaves `.await` points with reads/writes of the
425    /// `ready_targets` map under short, non-overlapping locks — so two
426    /// concurrent callers could race the add/remove/refresh logic.
427    /// Correctness relies on the broker invoking it from exactly one
428    /// reconciler task.
429    ///
430    /// Async because it reads the log's high-water marks; the broker calls
431    /// it from its reconciler task (on the runtime), never from a
432    /// `spawn_blocking` thread.
433    pub async fn reconcile_assignment(&self, desired: &[i32]) {
434        use std::collections::HashSet;
435        let want: HashSet<i32> = desired.iter().copied().collect();
436        // Snapshot the current per-partition targets so we can both diff the
437        // assigned set and find partitions still carrying the HWM-unknown
438        // sentinel (which need a refresh). Lock released before the `.await`.
439        let current: std::collections::HashMap<i32, i64> = self
440            .ready_targets
441            .lock()
442            .expect("ready_targets poisoned")
443            .clone();
444        let have: HashSet<i32> = current.keys().copied().collect();
445
446        let needs_add = want.difference(&have).copied().collect::<Vec<_>>();
447        // Partitions still assigned (in want) whose recorded target is the
448        // sentinel: their HWM is still unknown, so re-attempt the fetch.
449        let needs_refresh = want
450            .iter()
451            .copied()
452            .filter(|mp| current.get(mp) == Some(&HWM_UNKNOWN))
453            .collect::<Vec<_>>();
454
455        // One HWM snapshot covers both additions and sentinel refreshes.
456        let needs_hwm = !needs_add.is_empty() || !needs_refresh.is_empty();
457        let hwms = if needs_hwm {
458            match self.log.high_water_marks().await {
459                Ok(h) => Some(h),
460                Err(e) => {
461                    warn!(error = ?e, "topic-based RLMM: high_water_marks fetch failed; \
462                          assigned partitions gate NotReady until a later reconcile refreshes");
463                    None
464                }
465            }
466        } else {
467            None
468        };
469
470        // Resolve a partition's target HWM from the (maybe-missing) snapshot.
471        // A failed fetch (`None`) or a missing per-partition entry both yield
472        // the sentinel so the gate stays NotReady — never fail open to 0.
473        let target_for = |mp: i32| -> i64 {
474            match &hwms {
475                Some(h) => usize::try_from(mp)
476                    .ok()
477                    .and_then(|i| h.get(i).copied())
478                    .unwrap_or(HWM_UNKNOWN),
479                None => HWM_UNKNOWN,
480            }
481        };
482
483        for mp in needs_add {
484            // `committed_offset` is `-1` when there is no committed event
485            // (full replay), so `+ 1` lands on the resume start offset (0).
486            let start_offset = self.committed_offset(mp) + 1;
487            self.assignment.add(PartitionStart {
488                partition: mp,
489                start_offset,
490            });
491            // Assign-but-NotReady when the HWM is unknown: the broker DOES
492            // own this partition, so leaving it Unassigned would wrongly
493            // return Ok(None). The sentinel makes the gate return NotReady.
494            self.ready_targets
495                .lock()
496                .expect("ready_targets poisoned")
497                .insert(mp, target_for(mp));
498        }
499        // Replace the sentinel for already-assigned partitions whose HWM is
500        // now known (the partition stays assigned; only its target changes).
501        for mp in needs_refresh {
502            let target = target_for(mp);
503            if target != HWM_UNKNOWN {
504                let mut guard = self.ready_targets.lock().expect("ready_targets poisoned");
505                // Only refresh if still assigned with the sentinel (a
506                // concurrent remove would have dropped it — see the
507                // single-task contract above).
508                if guard.get(&mp) == Some(&HWM_UNKNOWN) {
509                    guard.insert(mp, target);
510                }
511            }
512        }
513        for mp in have.difference(&want).copied() {
514            self.assignment.remove(mp);
515            self.ready_targets
516                .lock()
517                .expect("ready_targets poisoned")
518                .remove(&mp);
519        }
520    }
521
522    async fn wait_for_offset(&self, partition: i32, offset: i64) {
523        let idx = usize::try_from(partition).expect("partition non-negative");
524        let mut rx = self.applied_tx.subscribe();
525        loop {
526            {
527                let applied = self.applied.lock().expect("applied mutex poisoned");
528                if applied[idx] >= offset {
529                    return;
530                }
531            }
532            if rx.changed().await.is_err() {
533                return;
534            }
535        }
536    }
537
538    fn publish_and_wait(
539        &self,
540        tp: &TopicIdPartition,
541        event: Bytes,
542    ) -> Result<(), RemoteStorageError> {
543        let partition = metadata_partition_for(tp, self.log.partition_count());
544        let log = self.log.clone();
545        // Caller is on a non-runtime (spawn_blocking) thread; block_on
546        // is safe and gives us the assigned offset to wait on.
547        self.runtime.block_on(async {
548            let offset = log
549                .publish(partition, event)
550                .await
551                .map_err(MetadataLogError::into_storage)?;
552            self.wait_for_offset(partition, offset).await;
553            Ok::<_, RemoteStorageError>(())
554        })
555    }
556}
557
558impl Drop for TopicBasedRemoteLogMetadataManager {
559    fn drop(&mut self) {
560        self.shutdown.cancel();
561        if let Some(handle) = self.pump.lock().expect("pump mutex poisoned").take() {
562            handle.abort();
563        }
564        if let Some(handle) = self
565            .snapshotter
566            .lock()
567            .expect("snapshotter mutex poisoned")
568            .take()
569        {
570            handle.abort();
571        }
572    }
573}
574
575impl RemoteLogMetadataManager for TopicBasedRemoteLogMetadataManager {
576    fn add_remote_log_segment_metadata(
577        &self,
578        metadata: RemoteLogSegmentMetadata,
579    ) -> Result<(), RemoteStorageError> {
580        // Mirror the in-memory manager's eager precondition: fail
581        // fast before paying a round trip through Kafka.
582        if metadata.state() != RemoteLogSegmentState::CopySegmentStarted {
583            return Err(RemoteStorageError::InvalidAdd {
584                id: metadata.remote_log_segment_id().clone(),
585                reason: format!(
586                    "starting state must be CopySegmentStarted, got {:?}",
587                    metadata.state()
588                ),
589            });
590        }
591        let tp = metadata.remote_log_segment_id().topic_id_partition.clone();
592        let event = MetadataEvent::AddSegment(metadata).encode();
593        self.publish_and_wait(&tp, event)
594    }
595
596    fn update_remote_log_segment_metadata(
597        &self,
598        update: RemoteLogSegmentMetadataUpdate,
599    ) -> Result<(), RemoteStorageError> {
600        let tp = update.remote_log_segment_id.topic_id_partition.clone();
601        let event = MetadataEvent::UpdateSegment(update).encode();
602        self.publish_and_wait(&tp, event)
603    }
604
605    fn remote_log_segment_metadata(
606        &self,
607        topic_id_partition: &TopicIdPartition,
608        leader_epoch: i32,
609        offset: i64,
610    ) -> Result<Option<RemoteLogSegmentMetadata>, RemoteStorageError> {
611        let mp = metadata_partition_for(topic_id_partition, self.log.partition_count());
612        match self.metadata_partition_gate(mp) {
613            // Not this broker's partition → genuine miss, do NOT serve any
614            // stale cache.
615            ReadGate::Unassigned => Ok(None),
616            // Assigned but not caught up → retryable, distinct from a miss.
617            ReadGate::NotReady => Err(RemoteStorageError::NotReady { partition: mp }),
618            ReadGate::Ready => {
619                self.inner
620                    .remote_log_segment_metadata(topic_id_partition, leader_epoch, offset)
621            }
622        }
623    }
624
625    fn highest_offset_for_epoch(
626        &self,
627        topic_id_partition: &TopicIdPartition,
628        leader_epoch: i32,
629    ) -> Result<Option<i64>, RemoteStorageError> {
630        let mp = metadata_partition_for(topic_id_partition, self.log.partition_count());
631        match self.metadata_partition_gate(mp) {
632            ReadGate::Unassigned => Ok(None),
633            ReadGate::NotReady => Err(RemoteStorageError::NotReady { partition: mp }),
634            ReadGate::Ready => self
635                .inner
636                .highest_offset_for_epoch(topic_id_partition, leader_epoch),
637        }
638    }
639
640    fn list_remote_log_segments(
641        &self,
642        topic_id_partition: &TopicIdPartition,
643    ) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError> {
644        let mp = metadata_partition_for(topic_id_partition, self.log.partition_count());
645        match self.metadata_partition_gate(mp) {
646            // Not this broker's partition → it does not own it, so it must
647            // not serve any stale segments it happened to consume earlier.
648            ReadGate::Unassigned => Ok(Vec::new()),
649            ReadGate::NotReady => Err(RemoteStorageError::NotReady { partition: mp }),
650            ReadGate::Ready => self.inner.list_remote_log_segments(topic_id_partition),
651        }
652    }
653
654    fn list_remote_log_segments_by_epoch(
655        &self,
656        topic_id_partition: &TopicIdPartition,
657        leader_epoch: i32,
658    ) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError> {
659        self.inner
660            .list_remote_log_segments_by_epoch(topic_id_partition, leader_epoch)
661    }
662
663    fn put_remote_partition_delete_metadata(
664        &self,
665        metadata: RemotePartitionDeleteMetadata,
666    ) -> Result<(), RemoteStorageError> {
667        let tp = metadata.topic_id_partition.clone();
668        let event = MetadataEvent::PartitionDelete(metadata).encode();
669        self.publish_and_wait(&tp, event)
670    }
671}
672
673async fn pump_loop(
674    mut stream: MetadataEventStream,
675    inner: Arc<InmemoryRemoteLogMetadataManager>,
676    applied: Arc<std::sync::Mutex<Vec<i64>>>,
677    applied_tx: watch::Sender<u64>,
678    shutdown: CancellationToken,
679) {
680    let mut version: u64 = 0;
681    loop {
682        let next = tokio::select! {
683            biased;
684            () = shutdown.cancelled() => return,
685            n = stream.next() => n,
686        };
687        let Some(record) = next else { return };
688        match MetadataEvent::decode(&record.payload) {
689            Ok(MetadataEvent::AddSegment(md)) => {
690                if let Err(e) = inner.add_remote_log_segment_metadata(md) {
691                    warn!(error = ?e, partition = record.partition, offset = record.offset,
692                          "topic-based RLMM: add replay rejected");
693                }
694            }
695            Ok(MetadataEvent::UpdateSegment(u)) => {
696                if let Err(e) = inner.update_remote_log_segment_metadata(u) {
697                    warn!(error = ?e, partition = record.partition, offset = record.offset,
698                          "topic-based RLMM: update replay rejected");
699                }
700            }
701            Ok(MetadataEvent::PartitionDelete(d)) => {
702                if let Err(e) = inner.put_remote_partition_delete_metadata(d) {
703                    warn!(error = ?e, partition = record.partition, offset = record.offset,
704                          "topic-based RLMM: partition-delete replay rejected");
705                }
706            }
707            Err(e) => {
708                warn!(error = ?e, partition = record.partition, offset = record.offset,
709                      "topic-based RLMM: failed to decode event");
710            }
711        }
712        if let Ok(idx) = usize::try_from(record.partition) {
713            let mut a = applied.lock().expect("applied mutex poisoned");
714            if idx < a.len() && record.offset > a[idx] {
715                a[idx] = record.offset;
716            }
717        }
718        version = version.wrapping_add(1);
719        let _ = applied_tx.send(version);
720    }
721}
722
723#[cfg(test)]
724mod tests {
725    use super::*;
726    use assert2::assert;
727    use std::collections::BTreeMap;
728    use uuid::Uuid;
729
730    use crabka_remote_storage::{CustomMetadata, RemoteLogSegmentId, RemotePartitionDeleteState};
731
732    use crate::error::MetadataLogError;
733    use crate::log::{AssignmentHandle, InProcessMetadataEventLog, MetadataEventStream};
734
735    /// Test double that delegates to an inner [`InProcessMetadataEventLog`]
736    /// but can be told to fail `high_water_marks()` on demand. The
737    /// in-process fixture's HWM RPC always succeeds, which is why the rest
738    /// of the suite cannot exercise the C1 fail-closed path.
739    struct HwmFlakyLog {
740        inner: Arc<InProcessMetadataEventLog>,
741        fail_hwm: std::sync::atomic::AtomicBool,
742    }
743
744    impl HwmFlakyLog {
745        fn new(partition_count: i32) -> Arc<Self> {
746            Arc::new(Self {
747                inner: InProcessMetadataEventLog::new(partition_count),
748                fail_hwm: std::sync::atomic::AtomicBool::new(false),
749            })
750        }
751        fn set_fail_hwm(&self, fail: bool) {
752            self.fail_hwm
753                .store(fail, std::sync::atomic::Ordering::SeqCst);
754        }
755    }
756
757    #[async_trait::async_trait]
758    impl MetadataEventLog for HwmFlakyLog {
759        fn partition_count(&self) -> i32 {
760            self.inner.partition_count()
761        }
762        async fn publish(&self, partition: i32, event: Bytes) -> Result<i64, MetadataLogError> {
763            self.inner.publish(partition, event).await
764        }
765        fn subscribe(
766            &self,
767            assignment: Vec<PartitionStart>,
768        ) -> (MetadataEventStream, Arc<dyn AssignmentHandle>) {
769            self.inner.subscribe(assignment)
770        }
771        async fn high_water_marks(&self) -> Result<Vec<i64>, MetadataLogError> {
772            if self.fail_hwm.load(std::sync::atomic::Ordering::SeqCst) {
773                return Err(MetadataLogError::Other("injected HWM failure".into()));
774            }
775            self.inner.high_water_marks().await
776        }
777    }
778
779    static SNAP_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
780
781    fn snapshot_test_dir(label: &str) -> std::path::PathBuf {
782        std::env::temp_dir().join(format!(
783            "crabka-rlmm-{label}-{}-{}",
784            std::process::id(),
785            SNAP_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
786        ))
787    }
788
789    fn tp() -> TopicIdPartition {
790        TopicIdPartition::new(Uuid::from_u128(1), "orders", 0)
791    }
792
793    fn started(id: u128, start: i64, end: i64) -> RemoteLogSegmentMetadata {
794        RemoteLogSegmentMetadata::new(
795            RemoteLogSegmentId::new(tp(), Uuid::from_u128(id)),
796            start,
797            end,
798            end + 1,
799            1,
800            100,
801            2048,
802            RemoteLogSegmentState::CopySegmentStarted,
803            BTreeMap::from([(0, start)]),
804        )
805        .unwrap()
806    }
807
808    fn finish(id: u128) -> RemoteLogSegmentMetadataUpdate {
809        RemoteLogSegmentMetadataUpdate {
810            remote_log_segment_id: RemoteLogSegmentId::new(tp(), Uuid::from_u128(id)),
811            event_timestamp_ms: 200,
812            custom_metadata: Some(CustomMetadata(vec![7])),
813            state: RemoteLogSegmentState::CopySegmentFinished,
814            broker_id: 1,
815        }
816    }
817
818    /// Run the sync RLMM trait method on the blocking pool, exactly
819    /// like the broker does.
820    async fn on_blocking<T, F>(f: F) -> T
821    where
822        F: FnOnce() -> T + Send + 'static,
823        T: Send + 'static,
824    {
825        tokio::task::spawn_blocking(f).await.unwrap()
826    }
827
828    /// Poll until `tp` reads `Ok(Some)` (assigned + caught up), or panic.
829    async fn wait_ready(m: &Arc<TopicBasedRemoteLogMetadataManager>, tp: &TopicIdPartition) {
830        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
831        loop {
832            if matches!(m.remote_log_segment_metadata(tp, 0, 42), Ok(Some(_))) {
833                return;
834            }
835            assert!(
836                std::time::Instant::now() < deadline,
837                "partition never became ready"
838            );
839            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
840        }
841    }
842
843    /// Start a manager that consumes NOTHING until the caller drives
844    /// `reconcile_assignment`. Used by the assignment/readiness tests, which
845    /// assert pre-assignment reads are a genuine miss.
846    async fn start_manager(
847        log: Arc<dyn MetadataEventLog>,
848    ) -> Arc<TopicBasedRemoteLogMetadataManager> {
849        TopicBasedRemoteLogMetadataManager::start(
850            log,
851            Handle::current(),
852            snapshot_test_dir("test"),
853            std::time::Duration::from_hours(1),
854        )
855        .await
856        .unwrap()
857    }
858
859    /// Start a manager and assign EVERY metadata partition (the eager
860    /// "consume all" behavior). Used by tests that publish through the
861    /// manager and read the result back, and by the multi-broker pre-seed
862    /// writers. Blocks until each non-empty partition has caught up to its
863    /// assignment-time HWM so a subsequent read does not race the pump.
864    async fn start_manager_all(
865        log: Arc<dyn MetadataEventLog>,
866    ) -> Arc<TopicBasedRemoteLogMetadataManager> {
867        let n = log.partition_count();
868        let m = start_manager(log).await;
869        let all: Vec<i32> = (0..n).collect();
870        m.reconcile_assignment(&all).await;
871        // Wait for the pump to catch up to every assigned partition's HWM so
872        // the manager is "ready" for all partitions, mirroring the old
873        // bootstrap contract.
874        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
875        while !all.iter().all(|&mp| m.metadata_partition_ready(mp)) {
876            assert!(
877                std::time::Instant::now() < deadline,
878                "manager did not catch up on all partitions within 5s"
879            );
880            tokio::time::sleep(std::time::Duration::from_millis(2)).await;
881        }
882        m
883    }
884
885    #[tokio::test(flavor = "multi_thread")]
886    async fn add_finish_query_round_trip() {
887        let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
888        let m = start_manager_all(log).await;
889        let m2 = m.clone();
890        on_blocking(move || {
891            m2.add_remote_log_segment_metadata(started(10, 0, 99))
892                .unwrap();
893        })
894        .await;
895        let m2 = m.clone();
896        on_blocking(move || m2.update_remote_log_segment_metadata(finish(10)).unwrap()).await;
897
898        let got = m
899            .remote_log_segment_metadata(&tp(), 0, 42)
900            .unwrap()
901            .expect("segment found");
902        assert!(got.remote_log_segment_id().id == Uuid::from_u128(10));
903        assert!(got.custom_metadata() == Some(&CustomMetadata(vec![7])));
904        assert!(m.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(99));
905        m.shutdown();
906    }
907
908    #[tokio::test(flavor = "multi_thread")]
909    async fn add_with_wrong_state_is_rejected_eagerly() {
910        let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(2);
911        let m = start_manager(log.clone()).await;
912        // Force a non-Started state via the lifecycle helper.
913        let bad = started(10, 0, 9).with_update(&finish(10)).unwrap();
914        let m2 = m.clone();
915        let err = on_blocking(move || m2.add_remote_log_segment_metadata(bad).unwrap_err()).await;
916        assert!(matches!(err, RemoteStorageError::InvalidAdd { .. }));
917        // Eager rejection means nothing was published.
918        assert!(log.high_water_marks().await.unwrap() == vec![0; 2]);
919        m.shutdown();
920    }
921
922    #[tokio::test(flavor = "multi_thread")]
923    async fn two_managers_sharing_a_log_converge() {
924        let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
925        let a = start_manager_all(log.clone()).await;
926        let b = start_manager_all(log.clone()).await;
927
928        let a2 = a.clone();
929        on_blocking(move || {
930            a2.add_remote_log_segment_metadata(started(10, 0, 99))
931                .unwrap();
932        })
933        .await;
934        let a2 = a.clone();
935        on_blocking(move || a2.update_remote_log_segment_metadata(finish(10)).unwrap()).await;
936
937        // `b` must observe `a`'s writes once its pump has applied
938        // them. Poll up to 2s for the in-process broadcast to fan out.
939        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
940        while b.highest_offset_for_epoch(&tp(), 0).unwrap() != Some(99) {
941            assert!(
942                std::time::Instant::now() < deadline,
943                "manager B did not converge within 2s"
944            );
945            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
946        }
947        assert!(b.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(99));
948        let got = b
949            .remote_log_segment_metadata(&tp(), 0, 50)
950            .unwrap()
951            .unwrap();
952        assert!(got.remote_log_segment_id().id == Uuid::from_u128(10));
953
954        a.shutdown();
955        b.shutdown();
956    }
957
958    #[tokio::test(flavor = "multi_thread")]
959    async fn restart_rehydrates_from_log() {
960        let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
961        {
962            let m = start_manager_all(log.clone()).await;
963            for (id, start, end) in [(10u128, 0, 99), (11, 100, 199), (12, 200, 299)] {
964                let m2 = m.clone();
965                on_blocking(move || {
966                    m2.add_remote_log_segment_metadata(started(id, start, end))
967                        .unwrap();
968                })
969                .await;
970                let m2 = m.clone();
971                on_blocking(move || m2.update_remote_log_segment_metadata(finish(id)).unwrap())
972                    .await;
973            }
974            m.shutdown();
975        }
976
977        // Fresh manager against the same log: assigning all partitions
978        // replays the full history before the read below.
979        let fresh = start_manager_all(log).await;
980        let listed = fresh.list_remote_log_segments(&tp()).unwrap();
981        assert!(listed.len() == 3);
982        assert!(listed[0].start_offset() == 0);
983        assert!(listed[2].end_offset() == 299);
984        assert!(fresh.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(299));
985        fresh.shutdown();
986    }
987
988    #[tokio::test(flavor = "multi_thread")]
989    async fn partition_delete_lifecycle_round_trip() {
990        let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(2);
991        let m = start_manager_all(log).await;
992        for state in [
993            RemotePartitionDeleteState::DeletePartitionMarked,
994            RemotePartitionDeleteState::DeletePartitionStarted,
995            RemotePartitionDeleteState::DeletePartitionFinished,
996        ] {
997            let m2 = m.clone();
998            on_blocking(move || {
999                m2.put_remote_partition_delete_metadata(RemotePartitionDeleteMetadata {
1000                    topic_id_partition: tp(),
1001                    state,
1002                    event_timestamp_ms: 500,
1003                    broker_id: 1,
1004                })
1005                .unwrap();
1006            })
1007            .await;
1008        }
1009        m.shutdown();
1010    }
1011
1012    #[tokio::test(flavor = "multi_thread")]
1013    async fn shutdown_flushes_a_snapshot_covering_applied_events() {
1014        let dir = snapshot_test_dir("mgr-snap");
1015        let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
1016        let m = TopicBasedRemoteLogMetadataManager::start(
1017            log.clone(),
1018            Handle::current(),
1019            dir.clone(),
1020            std::time::Duration::from_hours(1), // long interval: only shutdown flushes
1021        )
1022        .await
1023        .unwrap();
1024        m.reconcile_assignment(&(0..log.partition_count()).collect::<Vec<_>>())
1025            .await;
1026        let m2 = m.clone();
1027        on_blocking(move || {
1028            m2.add_remote_log_segment_metadata(started(10, 0, 99))
1029                .unwrap();
1030        })
1031        .await;
1032        let m2 = m.clone();
1033        on_blocking(move || m2.update_remote_log_segment_metadata(finish(10)).unwrap()).await;
1034
1035        m.shutdown_and_flush().await;
1036
1037        let path = dir.join(crate::snapshot::SNAPSHOT_FILE_NAME);
1038        let snap = crate::snapshot::Snapshot::load(&path)
1039            .unwrap()
1040            .expect("snapshot written");
1041        // The orders partition's committed offset covers both events.
1042        let p = crate::partitioning::metadata_partition_for(&tp(), 4);
1043        let idx = usize::try_from(p).unwrap();
1044        assert!(
1045            snap.committed_offsets[idx] >= 1,
1046            "committed >= last applied offset"
1047        );
1048        // The dump contains the finished segment.
1049        assert!(snap.dump.partitions.len() == 1);
1050        assert!(snap.dump.partitions[0].segments.len() == 1);
1051        std::fs::remove_dir_all(&dir).ok();
1052    }
1053
1054    #[tokio::test(flavor = "multi_thread")]
1055    async fn restart_resumes_from_snapshot_without_replaying_from_zero() {
1056        let dir = snapshot_test_dir("resume");
1057        let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
1058        let interval = std::time::Duration::from_hours(1);
1059
1060        // First lifetime: seed three finished segments, then shutdown-flush.
1061        let pre_cache;
1062        {
1063            let m = TopicBasedRemoteLogMetadataManager::start(
1064                log.clone(),
1065                Handle::current(),
1066                dir.clone(),
1067                interval,
1068            )
1069            .await
1070            .unwrap();
1071            m.reconcile_assignment(&(0..log.partition_count()).collect::<Vec<_>>())
1072                .await;
1073            for (id, start, end) in [(10u128, 0, 99), (11, 100, 199), (12, 200, 299)] {
1074                let m2 = m.clone();
1075                on_blocking(move || {
1076                    m2.add_remote_log_segment_metadata(started(id, start, end))
1077                        .unwrap();
1078                })
1079                .await;
1080                let m2 = m.clone();
1081                on_blocking(move || m2.update_remote_log_segment_metadata(finish(id)).unwrap())
1082                    .await;
1083            }
1084            pre_cache = m.list_remote_log_segments(&tp()).unwrap();
1085            m.shutdown_and_flush().await;
1086        }
1087
1088        // Snapshot now records committed offset N for the orders partition.
1089        let p = crate::partitioning::metadata_partition_for(&tp(), 4);
1090        let idx = usize::try_from(p).unwrap();
1091        let snap = crate::snapshot::Snapshot::load(&dir.join(crate::snapshot::SNAPSHOT_FILE_NAME))
1092            .unwrap()
1093            .expect("snapshot present");
1094        let committed = snap.committed_offsets[idx];
1095        assert!(
1096            committed >= 5,
1097            "6 events (3 add + 3 finish) → committed >= 5"
1098        );
1099
1100        // The canonical resume computation resumes the orders partition at
1101        // committed + 1 (same path start() uses).
1102        let (resumed_committed, assignment) =
1103            TopicBasedRemoteLogMetadataManager::resume_from_snapshot(Some(&snap), 4);
1104        let orders_start = assignment
1105            .iter()
1106            .find(|s| s.partition == p)
1107            .map(|s| s.start_offset)
1108            .unwrap();
1109        assert!(orders_start == committed + 1, "resume from N+1, not 0");
1110        assert!(resumed_committed[idx] == committed);
1111
1112        // Second lifetime against the SAME log + dir: must resume, not replay.
1113        let fresh = TopicBasedRemoteLogMetadataManager::start(
1114            log.clone(),
1115            Handle::current(),
1116            dir.clone(),
1117            interval,
1118        )
1119        .await
1120        .unwrap();
1121        // The manager exposes the same committed offset via its canonical
1122        // accessor used by the assignment reconciler.
1123        assert!(fresh.committed_offset(p) == committed);
1124        // Assign every partition and wait for catch-up so the gated read
1125        // methods delegate to the (snapshot-seeded) inner cache. The orders
1126        // partition has no backlog past `committed`, so it is ready as soon
1127        // as the assignment-time HWM is recorded.
1128        fresh
1129            .reconcile_assignment(&(0..log.partition_count()).collect::<Vec<_>>())
1130            .await;
1131        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
1132        while !fresh.metadata_partition_ready(p) {
1133            assert!(
1134                std::time::Instant::now() < deadline,
1135                "fresh manager did not catch up on the orders partition"
1136            );
1137            tokio::time::sleep(std::time::Duration::from_millis(2)).await;
1138        }
1139        let post_cache = fresh.list_remote_log_segments(&tp()).unwrap();
1140        assert!(
1141            post_cache == pre_cache,
1142            "post-load cache equals pre-restart cache"
1143        );
1144        assert!(fresh.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(299));
1145        fresh.shutdown();
1146        std::fs::remove_dir_all(&dir).ok();
1147    }
1148
1149    #[tokio::test(flavor = "multi_thread")]
1150    async fn add_then_remove_drives_assignment_and_readiness() {
1151        use crate::partitioning::metadata_partition_for;
1152
1153        let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
1154        // Pre-seed a finished segment for `tp()` so a ready read returns Some.
1155        {
1156            let writer = start_manager_all(log.clone()).await;
1157            let w2 = writer.clone();
1158            on_blocking(move || {
1159                w2.add_remote_log_segment_metadata(started(10, 0, 99))
1160                    .unwrap();
1161            })
1162            .await;
1163            let w2 = writer.clone();
1164            on_blocking(move || w2.update_remote_log_segment_metadata(finish(10)).unwrap()).await;
1165            writer.shutdown();
1166        }
1167
1168        let mp = metadata_partition_for(&tp(), log.partition_count());
1169        let m = start_manager(log).await;
1170
1171        // Before assignment: the partition is not consumed → genuine miss.
1172        assert!(matches!(
1173            m.remote_log_segment_metadata(&tp(), 0, 42),
1174            Ok(None)
1175        ));
1176
1177        // Assign it. add() must enqueue a PartitionStart for `mp`, and the
1178        // pump catches up; once applied >= HWM-1 the read returns Some.
1179        m.reconcile_assignment(&[mp]).await;
1180        assert!(m.assigned_metadata_partitions() == vec![mp]);
1181
1182        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
1183        loop {
1184            match m.remote_log_segment_metadata(&tp(), 0, 42) {
1185                Ok(Some(md)) => {
1186                    assert!(md.remote_log_segment_id().id == Uuid::from_u128(10));
1187                    break;
1188                }
1189                Err(RemoteStorageError::NotReady { partition }) => {
1190                    assert!(partition == mp, "NotReady names the catching-up partition");
1191                    assert!(
1192                        std::time::Instant::now() < deadline,
1193                        "metadata partition never became ready"
1194                    );
1195                    tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1196                }
1197                other => panic!("unexpected read outcome: {other:?}"),
1198            }
1199        }
1200
1201        // Remove it: assignment drops, and subsequent reads are a genuine
1202        // miss (Ok(None)) — the partition is no longer consumed.
1203        m.reconcile_assignment(&[]).await;
1204        assert!(m.assigned_metadata_partitions().is_empty());
1205        assert!(matches!(
1206            m.remote_log_segment_metadata(&tp(), 0, 42),
1207            Ok(None)
1208        ));
1209        m.shutdown();
1210    }
1211
1212    #[tokio::test(flavor = "multi_thread")]
1213    async fn unknown_partition_query_is_none() {
1214        let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(2);
1215        let m = start_manager(log).await;
1216        let other = TopicIdPartition::new(Uuid::from_u128(999), "nope", 0);
1217        assert!(m.remote_log_segment_metadata(&other, 0, 0).unwrap() == None);
1218        assert!(m.highest_offset_for_epoch(&other, 0).unwrap() == None);
1219        assert!(m.list_remote_log_segments(&other).unwrap().is_empty());
1220        m.shutdown();
1221    }
1222
1223    #[tokio::test(flavor = "multi_thread")]
1224    async fn two_brokers_split_metadata_partitions() {
1225        use crate::partitioning::metadata_partition_for;
1226
1227        // Use a wide metadata topic so two user-partitions land in distinct
1228        // buckets.
1229        let n = 16;
1230        let topic_id = Uuid::from_u128(0xFEED);
1231        let tp_a = TopicIdPartition::new(topic_id, "orders", 0);
1232        let tp_b = TopicIdPartition::new(topic_id, "orders", 1);
1233        let mp_a = metadata_partition_for(&tp_a, n);
1234        let mp_b = metadata_partition_for(&tp_b, n);
1235        assert!(
1236            mp_a != mp_b,
1237            "test needs the two partitions in distinct buckets"
1238        );
1239
1240        let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(n);
1241
1242        // Seed one finished segment for each user-partition via a transient
1243        // writer (consumes all partitions, no assignment gating).
1244        for (tp, id) in [(tp_a.clone(), 100u128), (tp_b.clone(), 200)] {
1245            let w = start_manager_all(log.clone()).await;
1246            let started = RemoteLogSegmentMetadata::new(
1247                RemoteLogSegmentId::new(tp.clone(), Uuid::from_u128(id)),
1248                0,
1249                99,
1250                100,
1251                1,
1252                100,
1253                2048,
1254                RemoteLogSegmentState::CopySegmentStarted,
1255                BTreeMap::from([(0, 0)]),
1256            )
1257            .unwrap();
1258            let w2 = w.clone();
1259            on_blocking(move || w2.add_remote_log_segment_metadata(started).unwrap()).await;
1260            let upd = RemoteLogSegmentMetadataUpdate {
1261                remote_log_segment_id: RemoteLogSegmentId::new(tp, Uuid::from_u128(id)),
1262                event_timestamp_ms: 200,
1263                custom_metadata: None,
1264                state: RemoteLogSegmentState::CopySegmentFinished,
1265                broker_id: 1,
1266            };
1267            let w2 = w.clone();
1268            on_blocking(move || w2.update_remote_log_segment_metadata(upd).unwrap()).await;
1269            w.shutdown();
1270        }
1271
1272        // Broker A consumes mp_a only; Broker B consumes mp_b only.
1273        let a = start_manager(log.clone()).await;
1274        let b = start_manager(log).await;
1275        a.reconcile_assignment(&[mp_a]).await;
1276        b.reconcile_assignment(&[mp_b]).await;
1277
1278        assert!(a.assigned_metadata_partitions() == vec![mp_a]);
1279        assert!(b.assigned_metadata_partitions() == vec![mp_b]);
1280        // Disjoint shares.
1281        assert!(
1282            a.assigned_metadata_partitions()
1283                .iter()
1284                .all(|p| !b.assigned_metadata_partitions().contains(p)),
1285            "shares must be disjoint"
1286        );
1287
1288        // Poll until each is caught up and serves its own partition.
1289        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
1290        loop {
1291            let a_own = a.remote_log_segment_metadata(&tp_a, 0, 42);
1292            let b_own = b.remote_log_segment_metadata(&tp_b, 0, 42);
1293            if matches!(a_own, Ok(Some(_))) && matches!(b_own, Ok(Some(_))) {
1294                break;
1295            }
1296            assert!(
1297                std::time::Instant::now() < deadline,
1298                "managers did not catch up: a={a_own:?} b={b_own:?}"
1299            );
1300            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1301        }
1302
1303        // Cross reads (partition the broker does NOT consume) are a genuine
1304        // miss, not NotReady.
1305        assert!(
1306            matches!(a.remote_log_segment_metadata(&tp_b, 0, 42), Ok(None)),
1307            "A does not consume mp_b → genuine miss"
1308        );
1309        assert!(
1310            matches!(b.remote_log_segment_metadata(&tp_a, 0, 42), Ok(None)),
1311            "B does not consume mp_a → genuine miss"
1312        );
1313
1314        a.shutdown();
1315        b.shutdown();
1316    }
1317
1318    /// Runtime `remove` then `add` reassignment must not
1319    /// double-deliver a metadata partition's events into the cache. A
1320    /// re-applied `AddSegment` is harmlessly rejected by the lifecycle state
1321    /// machine, so the segment list stays at exactly one entry — proving no
1322    /// duplicate corruption after remove + re-add.
1323    #[tokio::test(flavor = "multi_thread")]
1324    async fn reassignment_remove_then_readd_applies_no_duplicates() {
1325        use crate::partitioning::metadata_partition_for;
1326
1327        let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(4);
1328        // Pre-seed a single finished segment for `tp()`.
1329        {
1330            let writer = start_manager_all(log.clone()).await;
1331            let w2 = writer.clone();
1332            on_blocking(move || {
1333                w2.add_remote_log_segment_metadata(started(10, 0, 99))
1334                    .unwrap();
1335            })
1336            .await;
1337            let w2 = writer.clone();
1338            on_blocking(move || w2.update_remote_log_segment_metadata(finish(10)).unwrap()).await;
1339            writer.shutdown();
1340        }
1341
1342        let mp = metadata_partition_for(&tp(), log.partition_count());
1343        let m = start_manager(log).await;
1344
1345        // Add → catch up → exactly one segment.
1346        m.reconcile_assignment(&[mp]).await;
1347        wait_ready(&m, &tp()).await;
1348        assert!(
1349            m.list_remote_log_segments(&tp()).unwrap().len() == 1,
1350            "one segment after first assignment"
1351        );
1352
1353        // Remove (drops the live fetch task mid-flight if one is running) …
1354        m.reconcile_assignment(&[]).await;
1355        assert!(m.assigned_metadata_partitions().is_empty());
1356
1357        // … then re-add. The pump re-injects the backlog from the resume
1358        // offset; the re-applied AddSegment is rejected by the lifecycle
1359        // machine, so NO duplicate lands in the cache.
1360        m.reconcile_assignment(&[mp]).await;
1361        wait_ready(&m, &tp()).await;
1362
1363        let listed = m.list_remote_log_segments(&tp()).unwrap();
1364        assert!(
1365            listed.len() == 1,
1366            "remove + re-add must not duplicate the segment, got {listed:?}"
1367        );
1368        assert!(listed[0].remote_log_segment_id().id == Uuid::from_u128(10));
1369        // The finished state survived (no half-applied duplicate update).
1370        assert!(m.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(99));
1371
1372        m.shutdown();
1373    }
1374
1375    /// C1: a HWM-fetch failure must fail CLOSED. When `high_water_marks`
1376    /// errors at assignment time, the newly-added partition must gate
1377    /// `NotReady` (retryable) — NEVER `Ok(None)` (a false end-of-tier) —
1378    /// and the sentinel must self-heal on a later reconcile once the HWM
1379    /// fetch succeeds.
1380    #[tokio::test(flavor = "multi_thread")]
1381    async fn hwm_fetch_failure_gates_not_ready_then_self_heals() {
1382        use crate::partitioning::metadata_partition_for;
1383
1384        let flaky = HwmFlakyLog::new(4);
1385        let log: Arc<dyn MetadataEventLog> = flaky.clone();
1386
1387        // Pre-seed a finished segment for `tp()` via a healthy writer (HWM
1388        // not failing yet), so a ready read would return Some.
1389        {
1390            let writer = start_manager_all(log.clone()).await;
1391            let w2 = writer.clone();
1392            on_blocking(move || {
1393                w2.add_remote_log_segment_metadata(started(10, 0, 99))
1394                    .unwrap();
1395            })
1396            .await;
1397            let w2 = writer.clone();
1398            on_blocking(move || w2.update_remote_log_segment_metadata(finish(10)).unwrap()).await;
1399            writer.shutdown();
1400        }
1401
1402        let mp = metadata_partition_for(&tp(), log.partition_count());
1403        let m = start_manager(log).await;
1404
1405        // Assign the partition WHILE the HWM RPC is failing. The partition
1406        // must be added (the broker owns it) but recorded with the sentinel
1407        // target so the gate returns NotReady, not Ok(None).
1408        flaky.set_fail_hwm(true);
1409        m.reconcile_assignment(&[mp]).await;
1410        assert!(
1411            m.assigned_metadata_partitions() == vec![mp],
1412            "partition is assigned even though HWM is unknown (broker owns it)"
1413        );
1414
1415        // Give the pump ample time to drain the backlog. Even fully caught
1416        // up, the read must stay NotReady because the real HWM is unknown —
1417        // it must NEVER collapse to Ok(None).
1418        let deadline = std::time::Instant::now() + std::time::Duration::from_millis(300);
1419        while std::time::Instant::now() < deadline {
1420            match m.remote_log_segment_metadata(&tp(), 0, 42) {
1421                Err(RemoteStorageError::NotReady { partition }) => assert!(partition == mp),
1422                other => panic!("HWM-unknown partition must read NotReady, got {other:?}"),
1423            }
1424            // The list path is gated the same way.
1425            match m.list_remote_log_segments(&tp()) {
1426                Err(RemoteStorageError::NotReady { partition }) => assert!(partition == mp),
1427                other => panic!("HWM-unknown partition list must be NotReady, got {other:?}"),
1428            }
1429            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1430        }
1431
1432        // Recover: HWM fetch now succeeds. A subsequent reconcile (which the
1433        // broker drives on each image change / tick) must replace the
1434        // sentinel with the real target. Once the pump has caught up the read
1435        // returns Some.
1436        flaky.set_fail_hwm(false);
1437        m.reconcile_assignment(&[mp]).await;
1438        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
1439        loop {
1440            match m.remote_log_segment_metadata(&tp(), 0, 42) {
1441                Ok(Some(md)) => {
1442                    assert!(md.remote_log_segment_id().id == Uuid::from_u128(10));
1443                    break;
1444                }
1445                Err(RemoteStorageError::NotReady { partition }) => {
1446                    assert!(partition == mp);
1447                    assert!(
1448                        std::time::Instant::now() < deadline,
1449                        "partition never became ready after HWM recovered"
1450                    );
1451                    tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1452                }
1453                other => panic!("unexpected read outcome after recovery: {other:?}"),
1454            }
1455        }
1456        // The list path is now Ready too.
1457        assert!(m.list_remote_log_segments(&tp()).unwrap().len() == 1);
1458        assert!(m.highest_offset_for_epoch(&tp(), 0).unwrap() == Some(99));
1459
1460        m.shutdown();
1461    }
1462}