1use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::atomic::AtomicU64;
20use std::sync::{Arc, Mutex};
21
22use async_trait::async_trait;
23use bytes::Bytes;
24use futures_util::stream::{Stream, unfold};
25use futures_util::{StreamExt, stream};
26use tokio::sync::{broadcast, mpsc};
27
28use crate::error::MetadataLogError;
29
/// One record read back from a metadata event log.
#[derive(Debug, Clone)]
pub struct MetadataEventRecord {
    /// Partition the record was published to.
    pub partition: i32,
    /// Position of the record within its partition. The in-process
    /// implementation assigns consecutive offsets per partition, starting at 0.
    pub offset: i64,
    /// Opaque event payload, exactly as passed to `publish`.
    pub payload: Bytes,
}
40
/// Boxed, `Send` stream of records returned by [`MetadataEventLog::subscribe`].
pub type MetadataEventStream = Pin<Box<dyn Stream<Item = MetadataEventRecord> + Send + 'static>>;
43
/// A partition to consume, together with the first offset to deliver from it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionStart {
    /// Partition index.
    pub partition: i32,
    /// First offset to deliver. The in-process implementation treats negative
    /// values as 0; an offset at or past the end of the partition delivers no
    /// history, only records appended afterwards.
    pub start_offset: i64,
}
52
/// Controls which partitions a live subscription consumes.
pub trait AssignmentHandle: Send + Sync {
    /// Starts consuming `start.partition` from `start.start_offset`.
    ///
    /// In the in-process implementation this is a no-op when the partition is
    /// already assigned or does not exist.
    fn add(&self, start: PartitionStart);

    /// Stops consuming `partition`; a no-op (in the in-process implementation)
    /// when it is not assigned.
    fn remove(&self, partition: i32);

    /// Partitions currently assigned to the subscription (sorted ascending by
    /// the in-process implementation).
    fn assigned(&self) -> Vec<i32>;
}
68
#[async_trait]
/// A partitioned, append-only log of opaque metadata events.
///
/// Each partition is an independent sequence of records addressed by offset.
/// Consumers call [`subscribe`](Self::subscribe) to read a chosen set of
/// partitions from chosen start offsets and then follow new appends.
pub trait MetadataEventLog: Send + Sync {
    /// Number of partitions. For the in-process implementation, valid
    /// partition indices are `0..partition_count()`.
    fn partition_count(&self) -> i32;

    /// Appends `event` to `partition` and returns the offset it was assigned.
    ///
    /// # Errors
    ///
    /// Implementation-specific. The in-process implementation returns
    /// `MetadataLogError::PartitionOutOfRange` for a partition outside
    /// `0..partition_count()`.
    async fn publish(&self, partition: i32, event: Bytes) -> Result<i64, MetadataLogError>;

    /// Starts consuming `assignment`.
    ///
    /// Returns the record stream together with a handle that can add or remove
    /// partitions while the stream is live. In the in-process implementation,
    /// the stream yields each assigned partition's stored records (from its
    /// start offset) before that partition's live appends. Dropping the handle
    /// releases the subscription's bookkeeping.
    fn subscribe(
        &self,
        assignment: Vec<PartitionStart>,
    ) -> (MetadataEventStream, Arc<dyn AssignmentHandle>);

    /// Per-partition high-water marks, indexed by partition.
    ///
    /// For the in-process implementation this is the number of records in each
    /// partition, i.e. the offset the next `publish` to it will return.
    async fn high_water_marks(&self) -> Result<Vec<i64>, MetadataLogError>;
}
121
/// [`MetadataEventLog`] whose partitions live entirely in process memory.
///
/// Construct it with [`InProcessMetadataEventLog::new`].
pub struct InProcessMetadataEventLog {
    // Shared with every assignment handle created by `subscribe`.
    inner: Arc<InProcessInner>,
}
128
/// Per-partition delivery state for one subscription.
#[derive(Debug, Clone, Copy)]
struct PartitionCursor {
    /// Live broadcast records for this partition with an offset below `next`
    /// are discarded, because records before that point were already handed
    /// to the subscriber (e.g. via the subscribe snapshot or an `add` backlog).
    next: i64,
    /// When `true`, live records for this partition are pushed onto the
    /// subscription's inject channel instead of being yielded directly, so
    /// they queue behind records that were sent to that channel earlier.
    via_inject: bool,
}
145
/// State shared between one subscription's stream and its assignment handle.
struct SubscriptionState {
    /// Assigned partitions and their delivery cursors.
    assigned: Mutex<HashMap<i32, PartitionCursor>>,
    /// Sender for the unbounded channel merged into the subscription's stream.
    /// It carries `add` backlogs and the live records of `via_inject` partitions.
    inject: mpsc::UnboundedSender<MetadataEventRecord>,
}
156
/// Shared state behind [`InProcessMetadataEventLog`] and its assignment handles.
struct InProcessInner {
    /// One payload vector per partition; a payload's index is its offset.
    log: Mutex<Vec<Vec<Bytes>>>,
    /// Fans every published record out to all live subscriptions.
    tx: broadcast::Sender<MetadataEventRecord>,
    /// Number of partitions (checked to be positive in `new`).
    partition_count: i32,
    /// Subscription state keyed by subscription id. An entry is removed when
    /// its assignment handle is dropped.
    subscriptions: Mutex<HashMap<u64, Arc<SubscriptionState>>>,
    /// Source of subscription ids.
    next_sub_id: AtomicU64,
}
170
171impl InProcessMetadataEventLog {
172 #[must_use]
178 pub fn new(partition_count: i32) -> Arc<Self> {
179 assert!(partition_count > 0, "partition_count must be positive");
180 let cap = usize::try_from(partition_count).expect("partition_count fits in usize");
181 let (tx, _rx) = broadcast::channel(1024);
182 Arc::new(Self {
183 inner: Arc::new(InProcessInner {
184 log: Mutex::new(vec![Vec::new(); cap]),
185 tx,
186 partition_count,
187 subscriptions: Mutex::new(HashMap::new()),
188 next_sub_id: AtomicU64::new(0),
189 }),
190 })
191 }
192}
193
194#[async_trait]
195impl MetadataEventLog for InProcessMetadataEventLog {
196 fn partition_count(&self) -> i32 {
197 self.inner.partition_count
198 }
199
200 async fn publish(&self, partition: i32, event: Bytes) -> Result<i64, MetadataLogError> {
201 if partition < 0 || partition >= self.inner.partition_count {
202 return Err(MetadataLogError::PartitionOutOfRange {
203 partition,
204 count: self.inner.partition_count,
205 });
206 }
207 let mut guard = self.inner.log.lock().expect("metadata-log mutex poisoned");
211 let idx = usize::try_from(partition).expect("partition non-negative");
212 let log_for_p = &mut guard[idx];
213 let offset = i64::try_from(log_for_p.len()).expect("offset fits in i64");
214 log_for_p.push(event.clone());
215 let record = MetadataEventRecord {
216 partition,
217 offset,
218 payload: event,
219 };
220 let _ = self.inner.tx.send(record);
224 Ok(offset)
225 }
226
227 fn subscribe(
228 &self,
229 assignment: Vec<PartitionStart>,
230 ) -> (MetadataEventStream, Arc<dyn AssignmentHandle>) {
231 use std::sync::atomic::Ordering;
232
233 let guard = self.inner.log.lock().expect("metadata-log mutex poisoned");
236 let rx = self.inner.tx.subscribe();
237
238 let mut assigned: HashMap<i32, PartitionCursor> = HashMap::new();
242 let mut snapshot: Vec<MetadataEventRecord> = Vec::new();
243 for ps in &assignment {
244 let Ok(idx) = usize::try_from(ps.partition) else {
245 continue;
246 };
247 if idx >= guard.len() {
248 continue;
249 }
250 let records = &guard[idx];
251 let begin = usize::try_from(ps.start_offset.max(0)).unwrap_or(usize::MAX);
252 for (offset, payload) in records.iter().enumerate().skip(begin) {
253 snapshot.push(MetadataEventRecord {
254 partition: ps.partition,
255 offset: i64::try_from(offset).expect("offset fits in i64"),
256 payload: payload.clone(),
257 });
258 }
259 assigned.insert(
260 ps.partition,
261 PartitionCursor {
262 next: i64::try_from(records.len()).expect("len fits in i64"),
263 via_inject: false,
266 },
267 );
268 }
269
270 let (inject_tx, inject_rx) = mpsc::unbounded_channel::<MetadataEventRecord>();
271 let state = Arc::new(SubscriptionState {
272 assigned: Mutex::new(assigned),
273 inject: inject_tx,
274 });
275 let sub_id = self.inner.next_sub_id.fetch_add(1, Ordering::Relaxed);
276 self.inner
277 .subscriptions
278 .lock()
279 .expect("metadata-log subscriptions mutex poisoned")
280 .insert(sub_id, state.clone());
281 drop(guard);
282
283 let snapshot_stream = stream::iter(snapshot);
284 let inject_stream = unfold(inject_rx, |mut rx| async move {
285 rx.recv().await.map(|r| (r, rx))
286 });
287 let live = filtered_broadcast(rx, state.clone());
288 let merged = stream::select(inject_stream, live);
291 let stream = snapshot_stream.chain(merged).boxed();
292
293 let handle: Arc<dyn AssignmentHandle> = Arc::new(InProcessAssignmentHandle {
294 inner: self.inner.clone(),
295 sub_id,
296 });
297 (stream, handle)
298 }
299
300 async fn high_water_marks(&self) -> Result<Vec<i64>, MetadataLogError> {
301 let guard = self.inner.log.lock().expect("metadata-log mutex poisoned");
302 Ok(guard
303 .iter()
304 .map(|v| i64::try_from(v.len()).expect("hwm fits in i64"))
305 .collect())
306 }
307}
308
309struct InProcessAssignmentHandle {
310 inner: Arc<InProcessInner>,
311 sub_id: u64,
312}
313
314impl Drop for InProcessAssignmentHandle {
315 fn drop(&mut self) {
316 if let Ok(mut subs) = self.inner.subscriptions.lock() {
323 subs.remove(&self.sub_id);
324 }
325 }
326}
327
328impl AssignmentHandle for InProcessAssignmentHandle {
329 fn add(&self, start: PartitionStart) {
330 let subs = self
331 .inner
332 .subscriptions
333 .lock()
334 .expect("metadata-log subscriptions mutex poisoned");
335 let Some(state) = subs.get(&self.sub_id).cloned() else {
336 return;
337 };
338 drop(subs);
339 let log = self.inner.log.lock().expect("metadata-log mutex poisoned");
344 let mut assigned = state.assigned.lock().expect("assigned mutex poisoned");
345 if assigned.contains_key(&start.partition) {
346 return; }
348 let idx = match usize::try_from(start.partition) {
349 Ok(i) if i < log.len() => i,
350 _ => return, };
352 let records = &log[idx];
353 let begin = usize::try_from(start.start_offset.max(0)).unwrap_or(usize::MAX);
354 for (offset, payload) in records.iter().enumerate().skip(begin) {
355 let _ = state.inject.send(MetadataEventRecord {
356 partition: start.partition,
357 offset: i64::try_from(offset).expect("offset fits in i64"),
358 payload: payload.clone(),
359 });
360 }
361 let next_live = i64::try_from(records.len()).expect("len fits in i64");
368 assigned.insert(
369 start.partition,
370 PartitionCursor {
371 next: next_live,
372 via_inject: true,
373 },
374 );
375 }
376
377 fn remove(&self, partition: i32) {
378 let subs = self
379 .inner
380 .subscriptions
381 .lock()
382 .expect("metadata-log subscriptions mutex poisoned");
383 if let Some(state) = subs.get(&self.sub_id) {
384 state
385 .assigned
386 .lock()
387 .expect("assigned mutex poisoned")
388 .remove(&partition);
389 }
390 }
391
392 fn assigned(&self) -> Vec<i32> {
393 let subs = self
394 .inner
395 .subscriptions
396 .lock()
397 .expect("metadata-log subscriptions mutex poisoned");
398 let Some(state) = subs.get(&self.sub_id) else {
399 return Vec::new();
400 };
401 let mut v: Vec<i32> = state
402 .assigned
403 .lock()
404 .expect("assigned mutex poisoned")
405 .keys()
406 .copied()
407 .collect();
408 v.sort_unstable();
409 v
410 }
411}
412
413enum Forward {
415 Emit,
417 Inject,
419 Drop,
421}
422
423fn filtered_broadcast(
432 rx: broadcast::Receiver<MetadataEventRecord>,
433 state: Arc<SubscriptionState>,
434) -> MetadataEventStream {
435 unfold((rx, state), |(mut rx, state)| async move {
436 loop {
437 match rx.recv().await {
438 Ok(record) => {
439 let action = {
440 let assigned = state.assigned.lock().expect("assigned mutex poisoned");
441 match assigned.get(&record.partition) {
442 Some(cur) if record.offset >= cur.next => {
443 if cur.via_inject {
444 Forward::Inject
445 } else {
446 Forward::Emit
447 }
448 }
449 _ => Forward::Drop,
450 }
451 };
452 match action {
453 Forward::Emit => return Some((record, (rx, state))),
454 Forward::Inject => {
455 let _ = state.inject.send(record);
458 }
459 Forward::Drop => {}
460 }
461 }
462 Err(broadcast::error::RecvError::Lagged(_)) => {
463 }
468 Err(broadcast::error::RecvError::Closed) => return None,
469 }
470 }
471 })
472 .boxed()
473}
474
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use futures_util::StreamExt;

    /// Shorthand for a `PartitionStart` on `partition` beginning at `start_offset`.
    fn at(partition: i32, start_offset: i64) -> PartitionStart {
        PartitionStart {
            partition,
            start_offset,
        }
    }

    /// Awaits the next `n` records as `(partition, offset, payload)` tuples in
    /// arrival order. Panics if the stream ends early.
    async fn take_n(stream: &mut MetadataEventStream, n: usize) -> Vec<(i32, i64, Vec<u8>)> {
        let mut out = Vec::with_capacity(n);
        for _ in 0..n {
            let record = stream.next().await.unwrap();
            out.push((record.partition, record.offset, record.payload.to_vec()));
        }
        out
    }

    #[tokio::test]
    async fn publish_assigns_monotonic_offsets() {
        let log = InProcessMetadataEventLog::new(2);
        let cases: [(i32, &'static [u8], i64); 3] = [(0, b"a", 0), (0, b"b", 1), (1, b"c", 0)];
        for (partition, payload, expected_offset) in cases {
            let offset = log.publish(partition, Bytes::from_static(payload)).await.unwrap();
            assert!(offset == expected_offset);
        }
        assert!(log.high_water_marks().await.unwrap() == vec![2, 1]);
    }

    #[tokio::test]
    async fn subscribe_replays_history_then_forwards_new_writes() {
        let log = InProcessMetadataEventLog::new(1);
        for payload in [b"a", b"b"] {
            log.publish(0, Bytes::from_static(payload)).await.unwrap();
        }
        let (mut stream, _h) = log.subscribe(vec![at(0, 0)]);
        let history: Vec<Vec<u8>> = take_n(&mut stream, 2)
            .await
            .into_iter()
            .map(|(_, _, payload)| payload)
            .collect();
        assert!(history == vec![b"a".to_vec(), b"b".to_vec()]);

        log.publish(0, Bytes::from_static(b"c")).await.unwrap();
        assert!(take_n(&mut stream, 1).await == vec![(0, 2, b"c".to_vec())]);
    }

    #[tokio::test]
    async fn subscribe_attached_after_history_still_sees_history() {
        let log = InProcessMetadataEventLog::new(1);
        for byte in 0..5u8 {
            log.publish(0, Bytes::copy_from_slice(&[byte])).await.unwrap();
        }
        let (mut stream, _h) = log.subscribe(vec![at(0, 0)]);
        let got: Vec<(i64, Vec<u8>)> = take_n(&mut stream, 5)
            .await
            .into_iter()
            .map(|(_, offset, payload)| (offset, payload))
            .collect();
        let expected: Vec<(i64, Vec<u8>)> = (0..5u8).map(|b| (i64::from(b), vec![b])).collect();
        assert!(got == expected);
    }

    #[tokio::test]
    async fn publish_out_of_range_is_rejected() {
        let log = InProcessMetadataEventLog::new(2);
        let result = log.publish(5, Bytes::from_static(b"x")).await;
        assert!(matches!(
            result,
            Err(MetadataLogError::PartitionOutOfRange { .. })
        ));
    }

    #[tokio::test]
    async fn two_subscribers_see_the_same_history() {
        let log = InProcessMetadataEventLog::new(1);
        log.publish(0, Bytes::from_static(b"a")).await.unwrap();
        let (mut s1, _h1) = log.subscribe(vec![at(0, 0)]);
        let (mut s2, _h2) = log.subscribe(vec![at(0, 0)]);
        log.publish(0, Bytes::from_static(b"b")).await.unwrap();
        for s in [&mut s1, &mut s2] {
            let payloads: Vec<Vec<u8>> = take_n(s, 2)
                .await
                .into_iter()
                .map(|(_, _, payload)| payload)
                .collect();
            assert!(payloads == vec![b"a".to_vec(), b"b".to_vec()]);
        }
    }

    #[tokio::test]
    async fn subscribe_delivers_only_assigned_partitions_from_start_offset() {
        let log = InProcessMetadataEventLog::new(3);
        let writes: [(i32, &[u8]); 6] = [
            (0, b"a"),
            (0, b"b"),
            (0, b"c"),
            (1, b"x"),
            (1, b"y"),
            (2, b"z"),
        ];
        for (partition, payload) in writes {
            log.publish(partition, Bytes::copy_from_slice(payload)).await.unwrap();
        }

        // Partition 2 is not assigned; partition 0 starts at offset 1.
        let (mut stream, _h) = log.subscribe(vec![at(0, 1), at(1, 0)]);

        let mut first_three = take_n(&mut stream, 3).await;
        first_three.sort();
        assert!(
            first_three
                == vec![
                    (0, 1, b"b".to_vec()),
                    (0, 2, b"c".to_vec()),
                    (1, 0, b"x".to_vec()),
                ]
        );
        assert!(take_n(&mut stream, 1).await == vec![(1, 1, b"y".to_vec())]);
    }

    #[tokio::test]
    async fn live_appends_only_for_assigned_partitions() {
        let log = InProcessMetadataEventLog::new(2);
        let (mut stream, _h) = log.subscribe(vec![at(0, 0)]);
        log.publish(1, Bytes::from_static(b"skip")).await.unwrap();
        log.publish(0, Bytes::from_static(b"keep")).await.unwrap();
        let got: Vec<(i32, Vec<u8>)> = take_n(&mut stream, 1)
            .await
            .into_iter()
            .map(|(partition, _, payload)| (partition, payload))
            .collect();
        assert!(got == vec![(0, b"keep".to_vec())]);
    }

    #[tokio::test]
    async fn add_mid_stream_delivers_backlog_then_live() {
        let log = InProcessMetadataEventLog::new(2);
        for payload in [b"old0", b"old1", b"old2"] {
            log.publish(1, Bytes::copy_from_slice(payload)).await.unwrap();
        }
        let (mut stream, handle) = log.subscribe(vec![at(0, 0)]);
        handle.add(at(1, 0));
        log.publish(1, Bytes::from_static(b"new")).await.unwrap();

        let got = take_n(&mut stream, 4).await;
        assert!(
            got == vec![
                (1, 0, b"old0".to_vec()),
                (1, 1, b"old1".to_vec()),
                (1, 2, b"old2".to_vec()),
                (1, 3, b"new".to_vec()),
            ],
            "backlog must drain fully (in offset order) before the live append"
        );
        assert!(handle.assigned().contains(&1));
    }

    #[tokio::test]
    async fn dropping_handle_evicts_subscription_state() {
        let log = InProcessMetadataEventLog::new(1);
        let registered = || log.inner.subscriptions.lock().unwrap().len();
        let (_stream, handle) = log.subscribe(vec![at(0, 0)]);
        assert!(registered() == 1);
        drop(handle);
        assert!(
            registered() == 0,
            "subscription state must be evicted when the handle drops"
        );
    }

    #[tokio::test]
    async fn remove_stops_delivery() {
        let log = InProcessMetadataEventLog::new(2);
        let (mut stream, handle) = log.subscribe(vec![at(0, 0), at(1, 0)]);
        handle.remove(1);
        assert!(handle.assigned() == vec![0]);
        log.publish(1, Bytes::from_static(b"gone")).await.unwrap();
        log.publish(0, Bytes::from_static(b"here")).await.unwrap();
        let got: Vec<(i32, Vec<u8>)> = take_n(&mut stream, 1)
            .await
            .into_iter()
            .map(|(partition, _, payload)| (partition, payload))
            .collect();
        assert!(got == vec![(0, b"here".to_vec())]);
    }
}