Skip to main content

crabka_remote_storage_topic/
log.rs

//! [`MetadataEventLog`]: the publish/subscribe seam between the
//! [`TopicBasedRemoteLogMetadataManager`](crate::TopicBasedRemoteLogMetadataManager)
//! and the underlying durable event store.
//!
//! The in-process implementation, [`InProcessMetadataEventLog`], is an
//! in-memory broadcast-channel fixture used by unit tests. It also
//! serves as a single-process model of the multi-broker case: multiple
//! manager instances that share the same fixture observe each other's
//! writes. The production Kafka-backed adapter implements the same
//! trait.
//!
//! Rather than consuming every partition from offset 0,
//! [`MetadataEventLog::subscribe`] takes an explicit [`PartitionStart`]
//! assignment (a subset of partitions, each with its own start offset)
//! and returns an [`AssignmentHandle`] that can mutate the live
//! assignment at runtime.
16
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::atomic::AtomicU64;
20use std::sync::{Arc, Mutex};
21
22use async_trait::async_trait;
23use bytes::Bytes;
24use futures_util::stream::{Stream, unfold};
25use futures_util::{StreamExt, stream};
26use tokio::sync::{broadcast, mpsc};
27
28use crate::error::MetadataLogError;
29
30/// One event read from the metadata log.
31#[derive(Debug, Clone)]
32pub struct MetadataEventRecord {
33    /// Metadata-topic partition the event came from.
34    pub partition: i32,
35    /// Offset within that partition.
36    pub offset: i64,
37    /// Encoded event payload (see [`crate::serde`]).
38    pub payload: Bytes,
39}
40
41/// Boxed event stream the [`MetadataEventLog`] hands to subscribers.
42pub type MetadataEventStream = Pin<Box<dyn Stream<Item = MetadataEventRecord> + Send + 'static>>;
43
/// Where to begin consuming a single metadata-topic partition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionStart {
    /// The metadata-topic partition to read.
    pub partition: i32,
    /// Inclusive offset of the first record to deliver; `0` replays
    /// the partition from its beginning.
    pub start_offset: i64,
}
52
53/// Runtime control over a live [`MetadataEventLog`] subscription's
54/// assigned partition set. Returned alongside the stream by
55/// [`MetadataEventLog::subscribe`].
56pub trait AssignmentHandle: Send + Sync {
57    /// Begin consuming `start.partition` from `start.start_offset`;
58    /// no-op if already assigned. A newly-added partition emits its
59    /// backlog (from `start_offset`) into the existing stream, then
60    /// live records.
61    fn add(&self, start: PartitionStart);
62    /// Stop consuming `partition` and stop emitting its events. No-op
63    /// if not currently assigned.
64    fn remove(&self, partition: i32);
65    /// Current assigned partition set (unordered).
66    fn assigned(&self) -> Vec<i32>;
67}
68
69/// Publish/subscribe transport that backs the `__remote_log_metadata`
70/// topic.
71///
72/// Implementations must guarantee:
73///
74/// - `publish(p, _)` resolves to a monotonically-increasing offset
75///   within partition `p`, and the assigned offset is also what every
76///   subscriber observes for that record.
77/// - The stream returned by `subscribe` replays each assigned
78///   partition's backlog from its `start_offset` and then forwards new
79///   records as they are published for currently-assigned partitions.
80///   Subscribers attached after some records were already published
81///   still see the history at/after their start offset.
82/// - Records are delivered in publish order on a per-partition basis.
83#[async_trait]
84pub trait MetadataEventLog: Send + Sync {
85    /// Number of partitions the log holds. Stable for the lifetime of
86    /// the log; the manager hashes user partitions into
87    /// `[0, partition_count())`.
88    fn partition_count(&self) -> i32;
89
90    /// Append `event` to `partition`. Resolves to the assigned offset.
91    ///
92    /// # Errors
93    ///
94    /// Returns [`MetadataLogError`] if the transport refused the
95    /// write — e.g. the partition is out of range, or the log has
96    /// been closed.
97    async fn publish(&self, partition: i32, event: Bytes) -> Result<i64, MetadataLogError>;
98
99    /// Start consuming the given partitions, each from its start
100    /// offset (inclusive). Returns the event stream plus a handle to
101    /// mutate the live assignment.
102    ///
103    /// The stream replays each assigned partition's backlog from its
104    /// `start_offset`, then forwards live appends for the currently
105    /// assigned partitions. Records are delivered in publish order on
106    /// a per-partition basis.
107    fn subscribe(
108        &self,
109        assignment: Vec<PartitionStart>,
110    ) -> (MetadataEventStream, Arc<dyn AssignmentHandle>);
111
112    /// One past the highest written offset for each partition,
113    /// indexed by partition.
114    ///
115    /// # Errors
116    ///
117    /// Returns [`MetadataLogError`] only on an underlying store
118    /// failure; an empty partition is `0`, not an error.
119    async fn high_water_marks(&self) -> Result<Vec<i64>, MetadataLogError>;
120}
121
122/// Single-process [`MetadataEventLog`] used by unit tests and as the
123/// multi-broker fixture (multiple manager instances cloning the same
124/// `Arc` observe each other's writes).
125pub struct InProcessMetadataEventLog {
126    inner: Arc<InProcessInner>,
127}
128
/// Per-partition cursor kept for each partition assigned to a
/// subscription.
#[derive(Debug, Clone, Copy)]
struct PartitionCursor {
    /// Lowest offset the live path is still allowed to deliver.
    /// Broadcast records with a smaller offset are filtered out
    /// because the backlog path already covers them.
    next: i64,
    /// Selects how live records of this partition reach the stream.
    ///
    /// `true`: live records are pushed into the subscription's
    /// `inject` FIFO instead of being emitted straight from the
    /// broadcast path. Partitions added mid-stream use this so that
    /// live appends line up *behind* the backlog that was injected for
    /// them; without it `stream::select` could surface a live record
    /// before the backlog has drained, breaking per-partition publish
    /// order.
    ///
    /// `false`: used for partitions assigned at subscribe time. Their
    /// backlog travels through the chained snapshot stream, which is
    /// exhausted before any live record is polled.
    via_inject: bool,
}
145
146/// Per-subscription live assignment + a sender to inject backlog when a
147/// partition is added mid-stream. Keyed by a monotonically-increasing
148/// subscription id so multiple subscribers stay independent.
149struct SubscriptionState {
150    /// partition -> cursor. Presence in the map == assigned.
151    assigned: Mutex<HashMap<i32, PartitionCursor>>,
152    /// Inject backlog (and, for `add`-ed partitions, live) records in
153    /// FIFO publish order.
154    inject: mpsc::UnboundedSender<MetadataEventRecord>,
155}
156
157struct InProcessInner {
158    /// `log[partition][offset] = encoded event payload`.
159    log: Mutex<Vec<Vec<Bytes>>>,
160    /// Notify subscribers of new writes.
161    tx: broadcast::Sender<MetadataEventRecord>,
162    /// Constant for the life of the log.
163    partition_count: i32,
164    /// Live subscriptions, keyed by id, for assignment filtering and
165    /// mid-stream backlog injection.
166    subscriptions: Mutex<HashMap<u64, Arc<SubscriptionState>>>,
167    /// Allocates subscription ids.
168    next_sub_id: AtomicU64,
169}
170
171impl InProcessMetadataEventLog {
172    /// Construct an empty log with `partition_count` partitions.
173    ///
174    /// # Panics
175    ///
176    /// Panics when `partition_count <= 0`.
177    #[must_use]
178    pub fn new(partition_count: i32) -> Arc<Self> {
179        assert!(partition_count > 0, "partition_count must be positive");
180        let cap = usize::try_from(partition_count).expect("partition_count fits in usize");
181        let (tx, _rx) = broadcast::channel(1024);
182        Arc::new(Self {
183            inner: Arc::new(InProcessInner {
184                log: Mutex::new(vec![Vec::new(); cap]),
185                tx,
186                partition_count,
187                subscriptions: Mutex::new(HashMap::new()),
188                next_sub_id: AtomicU64::new(0),
189            }),
190        })
191    }
192}
193
194#[async_trait]
195impl MetadataEventLog for InProcessMetadataEventLog {
196    fn partition_count(&self) -> i32 {
197        self.inner.partition_count
198    }
199
200    async fn publish(&self, partition: i32, event: Bytes) -> Result<i64, MetadataLogError> {
201        if partition < 0 || partition >= self.inner.partition_count {
202            return Err(MetadataLogError::PartitionOutOfRange {
203                partition,
204                count: self.inner.partition_count,
205            });
206        }
207        // Hold the partition lock across the broadcast.send so that any
208        // concurrent subscribe() observes either the appended record in
209        // its snapshot or as a forwarded broadcast — never both.
210        let mut guard = self.inner.log.lock().expect("metadata-log mutex poisoned");
211        let idx = usize::try_from(partition).expect("partition non-negative");
212        let log_for_p = &mut guard[idx];
213        let offset = i64::try_from(log_for_p.len()).expect("offset fits in i64");
214        log_for_p.push(event.clone());
215        let record = MetadataEventRecord {
216            partition,
217            offset,
218            payload: event,
219        };
220        // `send` only errors when there are no active receivers; that
221        // is fine — the record is still durable in the in-memory log
222        // and any later subscriber's snapshot will see it.
223        let _ = self.inner.tx.send(record);
224        Ok(offset)
225    }
226
227    fn subscribe(
228        &self,
229        assignment: Vec<PartitionStart>,
230    ) -> (MetadataEventStream, Arc<dyn AssignmentHandle>) {
231        use std::sync::atomic::Ordering;
232
233        // Bracket snapshot + broadcast subscribe under the log lock so each
234        // published record is seen exactly once (snapshot xor live).
235        let guard = self.inner.log.lock().expect("metadata-log mutex poisoned");
236        let rx = self.inner.tx.subscribe();
237
238        // Initial assigned set: partition -> next live offset (= current
239        // len), so the broadcast path forwards only records published after
240        // subscribe; everything earlier comes from the snapshot below.
241        let mut assigned: HashMap<i32, PartitionCursor> = HashMap::new();
242        let mut snapshot: Vec<MetadataEventRecord> = Vec::new();
243        for ps in &assignment {
244            let Ok(idx) = usize::try_from(ps.partition) else {
245                continue;
246            };
247            if idx >= guard.len() {
248                continue;
249            }
250            let records = &guard[idx];
251            let begin = usize::try_from(ps.start_offset.max(0)).unwrap_or(usize::MAX);
252            for (offset, payload) in records.iter().enumerate().skip(begin) {
253                snapshot.push(MetadataEventRecord {
254                    partition: ps.partition,
255                    offset: i64::try_from(offset).expect("offset fits in i64"),
256                    payload: payload.clone(),
257                });
258            }
259            assigned.insert(
260                ps.partition,
261                PartitionCursor {
262                    next: i64::try_from(records.len()).expect("len fits in i64"),
263                    // Initially-assigned: backlog rides the chained
264                    // snapshot stream, so live records can go direct.
265                    via_inject: false,
266                },
267            );
268        }
269
270        let (inject_tx, inject_rx) = mpsc::unbounded_channel::<MetadataEventRecord>();
271        let state = Arc::new(SubscriptionState {
272            assigned: Mutex::new(assigned),
273            inject: inject_tx,
274        });
275        let sub_id = self.inner.next_sub_id.fetch_add(1, Ordering::Relaxed);
276        self.inner
277            .subscriptions
278            .lock()
279            .expect("metadata-log subscriptions mutex poisoned")
280            .insert(sub_id, state.clone());
281        drop(guard);
282
283        let snapshot_stream = stream::iter(snapshot);
284        let inject_stream = unfold(inject_rx, |mut rx| async move {
285            rx.recv().await.map(|r| (r, rx))
286        });
287        let live = filtered_broadcast(rx, state.clone());
288        // Snapshot first (subscribe-time backlog), then a merge of injected
289        // backlog (from `add`) and assignment-filtered live records.
290        let merged = stream::select(inject_stream, live);
291        let stream = snapshot_stream.chain(merged).boxed();
292
293        let handle: Arc<dyn AssignmentHandle> = Arc::new(InProcessAssignmentHandle {
294            inner: self.inner.clone(),
295            sub_id,
296        });
297        (stream, handle)
298    }
299
300    async fn high_water_marks(&self) -> Result<Vec<i64>, MetadataLogError> {
301        let guard = self.inner.log.lock().expect("metadata-log mutex poisoned");
302        Ok(guard
303            .iter()
304            .map(|v| i64::try_from(v.len()).expect("hwm fits in i64"))
305            .collect())
306    }
307}
308
309struct InProcessAssignmentHandle {
310    inner: Arc<InProcessInner>,
311    sub_id: u64,
312}
313
314impl Drop for InProcessAssignmentHandle {
315    fn drop(&mut self) {
316        // Evict this subscription's state so the map does not grow
317        // without bound as subscriptions come and go. The stream's live
318        // filter holds its own `Arc<SubscriptionState>`, so dropping the
319        // map entry never affects an in-flight stream — only `add`/
320        // `remove`/`assigned` (which go through the handle) stop working,
321        // and the handle is gone.
322        if let Ok(mut subs) = self.inner.subscriptions.lock() {
323            subs.remove(&self.sub_id);
324        }
325    }
326}
327
328impl AssignmentHandle for InProcessAssignmentHandle {
329    fn add(&self, start: PartitionStart) {
330        let subs = self
331            .inner
332            .subscriptions
333            .lock()
334            .expect("metadata-log subscriptions mutex poisoned");
335        let Some(state) = subs.get(&self.sub_id).cloned() else {
336            return;
337        };
338        drop(subs);
339        // Hold the log lock so the backlog snapshot + the assigned
340        // insert bracket every concurrent publish exactly once: a
341        // publish either lands in the snapshot we inject here, or it is
342        // forwarded live (because `assigned` already contains it).
343        let log = self.inner.log.lock().expect("metadata-log mutex poisoned");
344        let mut assigned = state.assigned.lock().expect("assigned mutex poisoned");
345        if assigned.contains_key(&start.partition) {
346            return; // already assigned: no-op
347        }
348        let idx = match usize::try_from(start.partition) {
349            Ok(i) if i < log.len() => i,
350            _ => return, // out of range: ignore
351        };
352        let records = &log[idx];
353        let begin = usize::try_from(start.start_offset.max(0)).unwrap_or(usize::MAX);
354        for (offset, payload) in records.iter().enumerate().skip(begin) {
355            let _ = state.inject.send(MetadataEventRecord {
356                partition: start.partition,
357                offset: i64::try_from(offset).expect("offset fits in i64"),
358                payload: payload.clone(),
359            });
360        }
361        // Live records at or after the current end are forwarded by the
362        // broadcast path once `assigned` contains the partition. They are
363        // routed through `inject` (via_inject) so they queue *behind* the
364        // backlog we just pushed above, preserving per-partition publish
365        // order: stream::select must not interleave a live record ahead of
366        // undrained backlog.
367        let next_live = i64::try_from(records.len()).expect("len fits in i64");
368        assigned.insert(
369            start.partition,
370            PartitionCursor {
371                next: next_live,
372                via_inject: true,
373            },
374        );
375    }
376
377    fn remove(&self, partition: i32) {
378        let subs = self
379            .inner
380            .subscriptions
381            .lock()
382            .expect("metadata-log subscriptions mutex poisoned");
383        if let Some(state) = subs.get(&self.sub_id) {
384            state
385                .assigned
386                .lock()
387                .expect("assigned mutex poisoned")
388                .remove(&partition);
389        }
390    }
391
392    fn assigned(&self) -> Vec<i32> {
393        let subs = self
394            .inner
395            .subscriptions
396            .lock()
397            .expect("metadata-log subscriptions mutex poisoned");
398        let Some(state) = subs.get(&self.sub_id) else {
399            return Vec::new();
400        };
401        let mut v: Vec<i32> = state
402            .assigned
403            .lock()
404            .expect("assigned mutex poisoned")
405            .keys()
406            .copied()
407            .collect();
408        v.sort_unstable();
409        v
410    }
411}
412
/// Decision [`filtered_broadcast`] takes for each live record it
/// receives.
enum Forward {
    /// Yield the record straight from the broadcast stream.
    Emit,
    /// Hand the record to the `inject` FIFO instead; used for
    /// partitions that were added mid-stream.
    Inject,
    /// Discard the record: its partition is not assigned, or its
    /// offset is below the partition's cursor.
    Drop,
}
422
423/// Forward a broadcast record only when its partition is currently
424/// assigned and its offset is at/after the recorded live cursor for
425/// that partition.
426///
427/// For a partition added mid-stream (`via_inject`), a passing record is
428/// re-routed into the `inject` FIFO instead of being emitted here, so it
429/// is ordered behind that partition's already-injected backlog rather
430/// than racing it through `stream::select`.
431fn filtered_broadcast(
432    rx: broadcast::Receiver<MetadataEventRecord>,
433    state: Arc<SubscriptionState>,
434) -> MetadataEventStream {
435    unfold((rx, state), |(mut rx, state)| async move {
436        loop {
437            match rx.recv().await {
438                Ok(record) => {
439                    let action = {
440                        let assigned = state.assigned.lock().expect("assigned mutex poisoned");
441                        match assigned.get(&record.partition) {
442                            Some(cur) if record.offset >= cur.next => {
443                                if cur.via_inject {
444                                    Forward::Inject
445                                } else {
446                                    Forward::Emit
447                                }
448                            }
449                            _ => Forward::Drop,
450                        }
451                    };
452                    match action {
453                        Forward::Emit => return Some((record, (rx, state))),
454                        Forward::Inject => {
455                            // Queue behind backlog; if the receiver is gone
456                            // the stream is being dropped anyway.
457                            let _ = state.inject.send(record);
458                        }
459                        Forward::Drop => {}
460                    }
461                }
462                Err(broadcast::error::RecvError::Lagged(_)) => {
463                    // The in-memory snapshot already supplied earlier
464                    // records; a lag only happens when the consumer
465                    // pump fell behind a single-process write burst
466                    // that overflowed the broadcast capacity (1024).
467                }
468                Err(broadcast::error::RecvError::Closed) => return None,
469            }
470        }
471    })
472    .boxed()
473}
474
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use futures_util::StreamExt;

    /// Shorthand for building a [`PartitionStart`].
    fn from(partition: i32, start_offset: i64) -> PartitionStart {
        PartitionStart {
            partition,
            start_offset,
        }
    }

    /// Publish `payload` to `partition`, panicking on failure, and
    /// return the assigned offset.
    async fn put(log: &InProcessMetadataEventLog, partition: i32, payload: &[u8]) -> i64 {
        log.publish(partition, Bytes::copy_from_slice(payload))
            .await
            .unwrap()
    }

    /// Pull the next `n` records as `(partition, offset, payload)`.
    async fn take(stream: &mut MetadataEventStream, n: usize) -> Vec<(i32, i64, Vec<u8>)> {
        let mut out = Vec::with_capacity(n);
        for _ in 0..n {
            let r = stream.next().await.unwrap();
            out.push((r.partition, r.offset, r.payload.to_vec()));
        }
        out
    }

    #[tokio::test]
    async fn publish_assigns_monotonic_offsets() {
        let log = InProcessMetadataEventLog::new(2);
        let first = put(&log, 0, b"a").await;
        let second = put(&log, 0, b"b").await;
        let other = put(&log, 1, b"c").await;
        assert!(first == 0);
        assert!(second == 1);
        assert!(other == 0);
        let hwms = log.high_water_marks().await.unwrap();
        assert!(hwms == vec![2, 1]);
    }

    #[tokio::test]
    async fn subscribe_replays_history_then_forwards_new_writes() {
        let log = InProcessMetadataEventLog::new(1);
        put(&log, 0, b"a").await;
        put(&log, 0, b"b").await;
        let (mut stream, _handle) = log.subscribe(vec![from(0, 0)]);
        let a = stream.next().await.unwrap();
        let b = stream.next().await.unwrap();
        assert!(a.payload.as_ref() == b"a");
        assert!(b.payload.as_ref() == b"b");
        put(&log, 0, b"c").await;
        let c = stream.next().await.unwrap();
        assert!(c.payload.as_ref() == b"c");
        assert!((c.partition, c.offset) == (0, 2));
    }

    #[tokio::test]
    async fn subscribe_attached_after_history_still_sees_history() {
        let log = InProcessMetadataEventLog::new(1);
        for i in 0..5u8 {
            put(&log, 0, &[i]).await;
        }
        let (mut stream, _handle) = log.subscribe(vec![from(0, 0)]);
        for i in 0..5u8 {
            let r = stream.next().await.unwrap();
            assert!(r.payload.as_ref() == &[i]);
            assert!(r.offset == i64::from(i));
        }
    }

    #[tokio::test]
    async fn publish_out_of_range_is_rejected() {
        let log = InProcessMetadataEventLog::new(2);
        let err = log.publish(5, Bytes::from_static(b"x")).await.unwrap_err();
        assert!(matches!(err, MetadataLogError::PartitionOutOfRange { .. }));
    }

    #[tokio::test]
    async fn two_subscribers_see_the_same_history() {
        let log = InProcessMetadataEventLog::new(1);
        put(&log, 0, b"a").await;
        let (mut s1, _h1) = log.subscribe(vec![from(0, 0)]);
        let (mut s2, _h2) = log.subscribe(vec![from(0, 0)]);
        put(&log, 0, b"b").await;
        for s in [&mut s1, &mut s2] {
            assert!(s.next().await.unwrap().payload.as_ref() == b"a");
            assert!(s.next().await.unwrap().payload.as_ref() == b"b");
        }
    }

    #[tokio::test]
    async fn subscribe_delivers_only_assigned_partitions_from_start_offset() {
        let log = InProcessMetadataEventLog::new(3);
        // partition 0: a,b,c ; partition 1: x,y ; partition 2: z
        for payload in [b"a", b"b", b"c"] {
            put(&log, 0, payload).await;
        }
        for payload in [b"x", b"y"] {
            put(&log, 1, payload).await;
        }
        put(&log, 2, b"z").await;

        // Partition 0 is read from offset 1 and partition 1 from
        // offset 0; partition 2 stays unassigned.
        let (mut stream, _handle) = log.subscribe(vec![from(0, 1), from(1, 0)]);

        let mut got = take(&mut stream, 3).await;
        got.sort();
        assert!(
            got == vec![
                (0, 1, b"b".to_vec()),
                (0, 2, b"c".to_vec()),
                (1, 0, b"x".to_vec()),
            ]
        );
        // partition 1 offset 1 ("y") is the only remaining assigned record.
        let r = stream.next().await.unwrap();
        assert!((r.partition, r.offset, r.payload.as_ref()) == (1, 1, b"y".as_ref()));
    }

    #[tokio::test]
    async fn live_appends_only_for_assigned_partitions() {
        let log = InProcessMetadataEventLog::new(2);
        let (mut stream, _handle) = log.subscribe(vec![from(0, 0)]);
        // A write to the unassigned partition must not appear.
        put(&log, 1, b"skip").await;
        put(&log, 0, b"keep").await;
        let r = stream.next().await.unwrap();
        assert!((r.partition, r.payload.as_ref()) == (0, b"keep".as_ref()));
    }

    #[tokio::test]
    async fn add_mid_stream_delivers_backlog_then_live() {
        let log = InProcessMetadataEventLog::new(2);
        // Three backlog records on partition 1 (offsets 0,1,2).
        for payload in [b"old0", b"old1", b"old2"] {
            put(&log, 1, payload).await;
        }
        let (mut stream, handle) = log.subscribe(vec![from(0, 0)]);
        // Add partition 1 from offset 0, then publish a live append
        // IMMEDIATELY, before the injected backlog has been drained.
        //
        // This is the ordering trap. The merged stream is
        // `stream::select(inject_stream, live)`, which alternates
        // between its inputs when both have a ready item. If the live
        // "new" (offset 3) were emitted by the `live` input directly,
        // select would interleave it between backlog items:
        // old0(inject), new(live), old1(inject), old2(inject), letting
        // "new" jump ahead of old1/old2 and violating per-partition
        // publish order. Routing a just-added partition's live records
        // through the SAME inject FIFO queues them strictly behind the
        // backlog: old0, old1, old2, new.
        handle.add(from(1, 0));
        put(&log, 1, b"new").await;

        let got = take(&mut stream, 4).await;
        assert!(
            got == vec![
                (1, 0, b"old0".to_vec()),
                (1, 1, b"old1".to_vec()),
                (1, 2, b"old2".to_vec()),
                (1, 3, b"new".to_vec()),
            ],
            "backlog must drain fully (in offset order) before the live append"
        );
        assert!(handle.assigned().contains(&1));
    }

    #[tokio::test]
    async fn dropping_handle_evicts_subscription_state() {
        let log = InProcessMetadataEventLog::new(1);
        let (_stream, handle) = log.subscribe(vec![from(0, 0)]);
        assert!(log.inner.subscriptions.lock().unwrap().len() == 1);
        drop(handle);
        assert!(
            log.inner.subscriptions.lock().unwrap().is_empty(),
            "subscription state must be evicted when the handle drops"
        );
    }

    #[tokio::test]
    async fn remove_stops_delivery() {
        let log = InProcessMetadataEventLog::new(2);
        let (mut stream, handle) = log.subscribe(vec![from(0, 0), from(1, 0)]);
        handle.remove(1);
        assert!(handle.assigned() == vec![0]);
        put(&log, 1, b"gone").await;
        put(&log, 0, b"here").await;
        let r = stream.next().await.unwrap();
        assert!((r.partition, r.payload.as_ref()) == (0, b"here".as_ref()));
    }
}