pub struct TopicBasedRemoteLogMetadataManager { /* private fields */ }
Production RemoteLogMetadataManager backed by the
__remote_log_metadata topic (via a MetadataEventLog
adapter).
Construct with Self::start; it loads any on-disk snapshot but
consumes no metadata partitions until Self::reconcile_assignment
adds the broker’s leader/follower-derived set.
Implementations§
impl TopicBasedRemoteLogMetadataManager
pub async fn start(
    log: Arc<dyn MetadataEventLog>,
    runtime: Handle,
    snapshot_dir: PathBuf,
    snapshot_interval: Duration,
) -> Result<Arc<Self>, RemoteStorageError>
Load any on-disk snapshot into the cache and spawn the consumer
pump with an empty assignment. The manager consumes nothing until
Self::reconcile_assignment is driven (by the broker).
runtime must be a Tokio runtime handle that lives at least
as long as the returned manager. The synchronous
RemoteLogMetadataManager methods bridge to this handle via
block_on, so they must NOT be called from a task running on
this same runtime — the broker only invokes them through
spawn_blocking, which is the only supported call pattern.
§Errors
Currently infallible (the consumed set starts empty), but returns a
Result so the bootstrap contract stays stable if start regains a
fallible step.
pub fn shutdown(&self)
Cancel the consumer pump. Read methods continue to work against whatever was applied before shutdown; mutation methods will time out or fail to make progress.
pub async fn shutdown_and_flush(&self)
Cancel the pump and the snapshotter, then write a final snapshot capturing everything applied so far. Safe to call once on graceful shutdown.
pub fn committed_offset(&self, partition: i32) -> i64
Committed offset loaded from the snapshot for a single metadata
partition, or -1 when the partition is out of range or had no
committed event (full replay). The assignment reconciler uses
this to start a dynamically-added partition at committed + 1.
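The -1 convention can be illustrated with a minimal sketch. It models the snapshot's committed offsets as a plain slice indexed by metadata partition; the free functions `committed_offset` and `start_offset` and the slice representation are assumptions made for this example, not the manager's internals.

```rust
/// Hypothetical stand-in for the snapshot lookup: `offsets[p]` holds the last
/// committed offset of metadata partition `p`, or -1 if nothing was committed.
fn committed_offset(offsets: &[i64], partition: i32) -> i64 {
    if partition < 0 {
        return -1; // negative ids are out of range
    }
    // `get` returns None past the end of the slice, which also maps to -1.
    offsets.get(partition as usize).copied().unwrap_or(-1)
}

/// Offset at which a newly added partition would begin consuming.
/// With the -1 sentinel this yields 0, i.e. a full replay.
fn start_offset(committed: i64) -> i64 {
    committed + 1
}
```

With a snapshot of `[41, -1, 7]`, partition 0 would resume at 42, while partition 1 (no committed event) and partition 3 (out of range) would both start from 0.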
pub fn assigned_metadata_partitions(&self) -> Vec<i32>
The metadata partitions this manager is currently assigned (tracked for readiness). Sorted ascending.
pub async fn reconcile_assignment(&self, desired: &[i32])
Diff desired against the current assignment and drive the
AssignmentHandle: add newly-needed partitions (seeded from the
snapshot committed offset + 1, falling back to 0 when there is
no committed event) and remove ones no longer needed. Records each
added partition’s assignment-time HWM so reads gate on NotReady
until the pump catches up.
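The add/remove diff described above can be sketched with ordered sets. This is an illustration under stated assumptions, not the manager's code: `AssignmentDiff`, `diff_assignment`, and the `committed` closure (which returns -1 when a partition has no committed event) are hypothetical names, and the real method additionally drives the AssignmentHandle and records high-water marks.

```rust
use std::collections::BTreeSet;

/// Result of diffing the desired assignment against the current one.
#[derive(Debug, PartialEq)]
struct AssignmentDiff {
    /// Partitions to add, paired with the offset to start consuming from.
    to_add: Vec<(i32, i64)>,
    /// Partitions that are assigned but no longer desired.
    to_remove: Vec<i32>,
}

/// `committed` is a hypothetical lookup returning -1 when the snapshot holds
/// no committed event for the partition.
fn diff_assignment(
    current: &BTreeSet<i32>,
    desired: &[i32],
    committed: impl Fn(i32) -> i64,
) -> AssignmentDiff {
    let desired: BTreeSet<i32> = desired.iter().copied().collect();
    let to_add = desired
        .difference(current)
        .map(|&p| (p, committed(p) + 1)) // -1 + 1 == 0, so no commit means full replay
        .collect();
    let to_remove = current.difference(&desired).copied().collect();
    AssignmentDiff { to_add, to_remove }
}
```

For a current assignment of {0, 1, 2} and a desired set of [2, 5, 1, 7], where only partition 5 has a committed offset (99), the sketch adds (5, 100) and (7, 0) and removes partition 0.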
HWM-fetch failure fails CLOSED: a partition whose real high-water
mark could not be obtained is recorded with the HWM_UNKNOWN
sentinel target so the gate returns NotReady (retryable), never a
false Ok(None). Such partitions are re-attempted on every
subsequent reconcile (which the broker drives on each image change /
reconciler tick), so a transient high_water_marks failure
self-heals: the sentinel is replaced with the real target as soon as
the fetch succeeds.
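The fail-closed behaviour can be modelled in a few lines. Everything here is a sketch under assumptions: the value chosen for `HWM_UNKNOWN`, the `Gate` enum, the plain `HashMap` standing in for the `ready_targets` map, and the choice to report partitions without a recorded target as not ready are all hypothetical. It also assumes the high-water mark is the next offset to be written, so a partition is caught up once the next offset to apply has reached the target.

```rust
use std::collections::HashMap;

/// Hypothetical sentinel value; the real `HWM_UNKNOWN` constant is not shown in these docs.
const HWM_UNKNOWN: i64 = i64::MAX;

#[derive(Debug, PartialEq)]
enum Gate {
    Ready,
    NotReady,
}

/// Record the assignment-time target for each partition.
/// A failed fetch (`None`) stores the sentinel instead of guessing a value.
fn record_targets(targets: &mut HashMap<i32, i64>, fetched: &[(i32, Option<i64>)]) {
    for &(partition, hwm) in fetched {
        targets.insert(partition, hwm.unwrap_or(HWM_UNKNOWN));
    }
}

/// Partitions still carrying the sentinel, to be retried on the next reconcile.
fn needs_refetch(targets: &HashMap<i32, i64>) -> Vec<i32> {
    let mut pending: Vec<i32> = targets
        .iter()
        .filter(|(_, target)| **target == HWM_UNKNOWN)
        .map(|(&partition, _)| partition)
        .collect();
    pending.sort_unstable();
    pending
}

/// Reads pass only when a real target is known and the pump has reached it.
/// Unknown targets and (in this sketch) unassigned partitions stay closed.
fn gate(targets: &HashMap<i32, i64>, partition: i32, next_offset_to_apply: i64) -> Gate {
    match targets.get(&partition) {
        Some(&target) if target != HWM_UNKNOWN && next_offset_to_apply >= target => Gate::Ready,
        _ => Gate::NotReady,
    }
}
```

In this model a partition recorded with a failed fetch reports `NotReady` no matter how far the pump has progressed, and flips to `Ready` only after a later `record_targets` call replaces the sentinel with a real target that the pump has reached.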
MUST be driven by a SINGLE task. This method is not internally
serialized — it interleaves .await points with reads/writes of the
ready_targets map under short, non-overlapping locks — so two
concurrent callers could race the add/remove/refresh logic.
Correctness relies on the broker invoking it from exactly one
reconciler task.
Async because it reads the log’s high-water marks; the broker calls
it from its reconciler task (on the runtime), never from a
spawn_blocking thread.
Trait Implementations§
impl RemoteLogMetadataManager for TopicBasedRemoteLogMetadataManager
fn add_remote_log_segment_metadata(
    &self,
    metadata: RemoteLogSegmentMetadata,
) -> Result<(), RemoteStorageError>
CopySegmentStarted and its id must not already be known.
fn update_remote_log_segment_metadata(
    &self,
    update: RemoteLogSegmentMetadataUpdate,
) -> Result<(), RemoteStorageError>
fn remote_log_segment_metadata(
    &self,
    topic_id_partition: &TopicIdPartition,
    leader_epoch: i32,
    offset: i64,
) -> Result<Option<RemoteLogSegmentMetadata>, RemoteStorageError>
fn highest_offset_for_epoch(
    &self,
    topic_id_partition: &TopicIdPartition,
    leader_epoch: i32,
) -> Result<Option<i64>, RemoteStorageError>
leader_epoch (the max end_offset across finished segments carrying that epoch).
fn list_remote_log_segments(
    &self,
    topic_id_partition: &TopicIdPartition,
) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError>
start_offset (regardless of state).
fn list_remote_log_segments_by_epoch(
    &self,
    topic_id_partition: &TopicIdPartition,
    leader_epoch: i32,
) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError>
fn put_remote_partition_delete_metadata(
    &self,
    metadata: RemotePartitionDeleteMetadata,
) -> Result<(), RemoteStorageError>
Auto Trait Implementations§
impl !Freeze for TopicBasedRemoteLogMetadataManager
impl !RefUnwindSafe for TopicBasedRemoteLogMetadataManager
impl !UnwindSafe for TopicBasedRemoteLogMetadataManager
impl Send for TopicBasedRemoteLogMetadataManager
impl Sync for TopicBasedRemoteLogMetadataManager
impl Unpin for TopicBasedRemoteLogMetadataManager
impl UnsafeUnpin for TopicBasedRemoteLogMetadataManager
Blanket Implementations§
impl<'a, T, E> AsTaggedExplicit<'a, E> for T
where
    T: 'a,
impl<'a, T, E> AsTaggedImplicit<'a, E> for T
where
    T: 'a,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.