pub struct ReplicationManager {
pub next_generation: AtomicU64,
pub dropped_total: AtomicU64,
/* private fields */
}
In-memory manager of per-bucket replication configurations and per-(bucket, key) replication statuses.
Fields
next_generation: AtomicU64
v0.8.2 #61: monotonic per-source-PUT generation counter. Each
put_object (or complete_multipart_upload) on a replicated
source bucket calls Self::next_generation before spawning
its detached replication task. The dispatcher carries the
generation through to Self::record_status_if_newer, which
drops the stamp + the destination write when a newer
generation has already won, guaranteeing the destination
can’t be rolled back by a slow retry.
dropped_total: AtomicU64
Bumped each time the dispatcher exhausts its retry budget on a destination PUT. Exposed publicly so the metrics layer can poll without taking the configuration lock.
Implementations
impl ReplicationManager
pub fn new() -> Self
Empty manager — no bucket has any replication rules. The
generation counter starts at 1 so the first PUT-issued token is
1 (a stored entry’s generation = 0 from a pre-#61 snapshot
is then strictly less and the very next PUT wins the CAS).
pub fn next_generation(&self) -> u64
v0.8.2 #61: mint a fresh, monotonically-increasing generation
token. Caller is the per-source-PUT dispatcher fork (the body-
bearing put_object branch, the body-less put_object branch,
and complete_multipart_upload). The token is then carried
through replicate_object to Self::record_status_if_newer
so a stale retry can be detected and dropped.
Uses Relaxed ordering — we only need uniqueness +
monotonicity per atomic; the cross-thread happens-before
between PUT-A’s spawn and the dispatcher reading the body is
already established by tokio::spawn’s implicit
Acquire/Release on the task queue.
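The counter described above can be sketched roughly as follows. This is a minimal illustration and not the crate's actual code: `GenerationCounter` is a hypothetical stand-in for the counter half of `ReplicationManager`, and the use of `fetch_add` is an assumption consistent with the "starts at 1, first token is 1" behaviour documented for `new`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for the generation counter inside
/// `ReplicationManager` (name and layout are assumptions).
pub struct GenerationCounter {
    next_generation: AtomicU64,
}

impl GenerationCounter {
    /// Starts at 1 so that generation 0 stays reserved for entries
    /// restored from a pre-#61 snapshot.
    pub fn new() -> Self {
        Self { next_generation: AtomicU64::new(1) }
    }

    /// Returns the current value and advances the counter by one.
    /// `fetch_add` is a single atomic read-modify-write, so every caller
    /// gets a distinct token even with `Relaxed` ordering.
    pub fn next_generation(&self) -> u64 {
        self.next_generation.fetch_add(1, Ordering::Relaxed)
    }
}

fn main() {
    let counter = GenerationCounter::new();
    let first = counter.next_generation();
    let second = counter.next_generation();
    println!("first = {first}, second = {second}");
}
```

`Relaxed` is enough here only because the token itself is the sole piece of data being communicated through the atomic; any other shared state still needs its own synchronisation.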
pub fn put(&self, bucket: &str, config: ReplicationConfig)
put_bucket_replication handler entry. The bucket’s existing
configuration is fully replaced (S3 spec — PutBucketReplication
is upsert-style at the bucket scope, not per-rule patch).
pub fn get(&self, bucket: &str) -> Option<ReplicationConfig>
get_bucket_replication handler entry. Returns None when
nothing is registered (AWS S3 returns
ReplicationConfigurationNotFoundError in that case; the
service-layer handler maps None accordingly).
pub fn to_json(&self) -> Result<String, Error>
Serialise the entire manager state (configurations + per-key
statuses + next generation counter) to JSON. The status entries
are emitted in the v0.8.2 #61 schema (ReplicationStatusEntry);
readers built before #61 will encounter the embedded
{ status, generation } shape and reject the snapshot. The
production restart path, however, always runs the same binary
against its own snapshot.
pub fn from_json(s: &str) -> Result<Self, Error>
Restore a manager from a previously-emitted snapshot. The
dropped_total counter is reset to 0 — historical drops are
runtime metrics, not configuration.
Back-compat (v0.8.2 #61)
Pre-#61 snapshots store bare ReplicationStatus (no
generation). The untagged StatusOrEntry enum picks them up
and assigns generation = 0, which the CAS-style
Self::record_status_if_newer treats as “always overridable
by the next real PUT” — guaranteed loss-free migration. The
next_generation counter defaults to 1 when the snapshot
predates #61 (= serde(default) on the field).
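The normalisation step described above might look roughly like the sketch below. It deliberately leaves serde out so it stays self-contained. The variant names `Bare` and `Entry`, the `into_entry` helper, and the exact `ReplicationStatus` variant list are assumptions for illustration, not the crate's confirmed definitions.

```rust
/// Assumed variant set, based on the statuses named in this page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicationStatus {
    Pending,
    Completed,
    Failed,
    Replica,
}

/// Post-#61 shape: status plus the generation that stamped it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplicationStatusEntry {
    pub status: ReplicationStatus,
    pub generation: u64,
}

/// Either shape a snapshot may hold for one key.
/// Variant names are hypothetical.
pub enum StatusOrEntry {
    /// Pre-#61 snapshot: a bare status with no generation.
    Bare(ReplicationStatus),
    /// Post-#61 snapshot: the full entry.
    Entry(ReplicationStatusEntry),
}

impl StatusOrEntry {
    /// Normalise both shapes into the current entry type. A bare status
    /// gets generation 0, which is lower than any token minted by a
    /// counter that starts at 1.
    pub fn into_entry(self) -> ReplicationStatusEntry {
        match self {
            StatusOrEntry::Bare(status) => ReplicationStatusEntry { status, generation: 0 },
            StatusOrEntry::Entry(entry) => entry,
        }
    }
}

fn main() {
    let legacy = StatusOrEntry::Bare(ReplicationStatus::Completed).into_entry();
    println!("{legacy:?}");
}
```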
pub fn match_rule(
    &self,
    bucket: &str,
    key: &str,
    object_tags: &[(String, String)],
) -> Option<ReplicationRule>
Match an object against the bucket’s rules and return the
highest-priority enabled rule whose filter matches. Returns
None when no rule matches (or no configuration is registered
for the bucket). Ties on priority are broken by declaration
order — the first such rule wins.
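The selection logic can be sketched as below. The `Rule` struct is a hypothetical, simplified stand-in for `ReplicationRule`, whose fields are not shown on this page. The sketch also assumes that a larger `priority` number means higher priority and that a filter is a key prefix plus a set of required tags.

```rust
/// Hypothetical, simplified rule shape (not the real `ReplicationRule`).
#[derive(Debug, Clone, PartialEq)]
pub struct Rule {
    pub id: String,
    /// Assumption: a larger number means higher priority.
    pub priority: u32,
    pub enabled: bool,
    /// Assumed filter: key prefix plus tags that must all be present.
    pub prefix: String,
    pub required_tags: Vec<(String, String)>,
}

impl Rule {
    pub fn new(id: &str, priority: u32, enabled: bool, prefix: &str) -> Self {
        Self {
            id: id.to_owned(),
            priority,
            enabled,
            prefix: prefix.to_owned(),
            required_tags: Vec::new(),
        }
    }

    fn matches(&self, key: &str, object_tags: &[(String, String)]) -> bool {
        key.starts_with(&self.prefix)
            && self.required_tags.iter().all(|t| object_tags.contains(t))
    }
}

/// Returns the highest-priority enabled rule whose filter matches.
pub fn match_rule(rules: &[Rule], key: &str, object_tags: &[(String, String)]) -> Option<Rule> {
    let mut best: Option<&Rule> = None;
    for rule in rules.iter().filter(|r| r.enabled && r.matches(key, object_tags)) {
        // Strict `>` keeps the earlier rule on a priority tie, so
        // declaration order breaks ties.
        if best.map_or(true, |b| rule.priority > b.priority) {
            best = Some(rule);
        }
    }
    best.cloned()
}

fn main() {
    let rules = vec![
        Rule::new("logs-a", 5, true, "logs/"),
        Rule::new("logs-b", 5, true, "logs/"),
        Rule::new("disabled", 9, false, "logs/"),
    ];
    let hit = match_rule(&rules, "logs/2024/app.log", &[]);
    println!("{:?}", hit.map(|r| r.id));
}
```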
pub fn record_status(&self, bucket: &str, key: &str, status: ReplicationStatus)
Stamp the per-(bucket, key) replication status with no
generation guard. Replaces any previous entry. Generation is
reset to 0 (= overridable by the next real PUT) — callers
that hold a generation token must use
Self::record_status_if_newer instead.
Use cases (kept for back-compat + the eager Pending stamp the
service-layer dispatcher emits before spawning the actual
replication task):
- Eager Pending stamp synchronously alongside the source PUT so a HEAD between PUT-return and dispatcher-completion sees PENDING instead of None.
- Tests that don’t care about generation (legacy assertions).
pub fn record_status_if_newer(
    &self,
    bucket: &str,
    key: &str,
    generation: u64,
    status: ReplicationStatus,
) -> bool
v0.8.2 #61: CAS-style stamp. Only updates the entry when
generation >= entry.generation; rejects the update (returns
false) when generation < entry.generation because a newer
PUT has already won and we must not roll the source’s status
back to a stale terminal state.
Returns
- true: the stamp was accepted; the caller may proceed with the destination-bucket PUT (in replicate_object) / declare success.
- false: a strictly-newer generation has already stamped the entry; the caller must drop the destination write to avoid overwriting newer bytes with a stale retry’s body.
Equality (generation == entry.generation) is accepted because
the same generation may legitimately stamp twice across the
dispatcher’s retry budget (Pending → Completed on the same
task).
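A minimal sketch of the generation guard follows, under stated assumptions: `StatusTable`, its `Mutex<HashMap>` storage, and the trimmed-down `Status` enum are hypothetical and only model the accept/reject rule described above, not the manager's real internals.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Trimmed-down status set for illustration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Pending,
    Completed,
    Failed,
}

#[derive(Clone, Copy)]
struct Entry {
    status: Status,
    generation: u64,
}

/// Hypothetical status store keyed by (bucket, key).
#[derive(Default)]
pub struct StatusTable {
    entries: Mutex<HashMap<(String, String), Entry>>,
}

impl StatusTable {
    /// Accepts the stamp when `generation >= entry.generation` (or when no
    /// entry exists yet); rejects it when a strictly newer generation has
    /// already stamped the entry.
    pub fn record_status_if_newer(
        &self,
        bucket: &str,
        key: &str,
        generation: u64,
        status: Status,
    ) -> bool {
        let mut entries = self.entries.lock().unwrap();
        let slot = (bucket.to_owned(), key.to_owned());
        let is_stale = entries
            .get(&slot)
            .map_or(false, |existing| generation < existing.generation);
        if is_stale {
            return false;
        }
        entries.insert(slot, Entry { status, generation });
        true
    }

    pub fn lookup_status(&self, bucket: &str, key: &str) -> Option<Status> {
        let entries = self.entries.lock().unwrap();
        entries.get(&(bucket.to_owned(), key.to_owned())).map(|e| e.status)
    }
}

fn main() {
    let table = StatusTable::default();
    // Generation 2 stamps Pending, then Completed (equality is accepted).
    table.record_status_if_newer("src", "a.txt", 2, Status::Pending);
    table.record_status_if_newer("src", "a.txt", 2, Status::Completed);
    // A slow retry from generation 1 arrives late and is rejected.
    let accepted = table.record_status_if_newer("src", "a.txt", 1, Status::Failed);
    println!("stale accepted: {accepted}, status: {:?}", table.lookup_status("src", "a.txt"));
}
```

Holding the lock across the compare and the insert is what makes the check-then-write atomic in this sketch; the real manager may achieve the same effect differently.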
pub fn sweep_stale(&self, now: DateTime<Utc>, max_age: Duration) -> usize
v0.8.3 #66 (H-5 audit fix): drop terminal-state entries
(Completed / Failed) older than max_age. Pending entries
are never swept because they are still in-flight — the
dispatcher is racing toward a terminal stamp and dropping the
Pending would lose the eventual outcome (and let the entry
re-emerge under the original key with no recorded history).
Replica entries can theoretically appear here through legacy
paths and are likewise preserved (the destination-side stamp is
not produced by record_status_if_newer in the current code,
but the conservative filter keeps any future use loss-free).
Cutoff is now - max_age rather than Utc::now() - max_age so
callers can drive the clock deterministically in tests.
Returns the number of entries removed (surfaced to operator
dashboards as s4_replication_status_swept_total).
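The sweep filter can be illustrated with the sketch below. To stay dependency-free it uses `std::time::SystemTime` and `std::time::Duration` in place of the chrono types in the real signature. The `recorded_at` field is a hypothetical name for whatever timestamp the manager compares against the cutoff, and the free-function shape over a plain `HashMap` is likewise an assumption.

```rust
use std::collections::HashMap;
use std::time::{Duration, SystemTime};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Pending,
    Completed,
    Failed,
    Replica,
}

pub struct Entry {
    pub status: Status,
    /// Hypothetical field: when the status was stamped.
    pub recorded_at: SystemTime,
}

/// Removes Completed / Failed entries stamped before `now - max_age` and
/// returns how many were removed. Pending and Replica entries are kept.
pub fn sweep_stale(
    entries: &mut HashMap<String, Entry>,
    now: SystemTime,
    max_age: Duration,
) -> usize {
    // If `now - max_age` is not representable, no entry can be older
    // than the cutoff, so there is nothing to sweep.
    let Some(cutoff) = now.checked_sub(max_age) else {
        return 0;
    };
    let before = entries.len();
    entries.retain(|_, e| {
        let terminal = matches!(e.status, Status::Completed | Status::Failed);
        !(terminal && e.recorded_at < cutoff)
    });
    before - entries.len()
}

fn main() {
    // The caller supplies `now`, so the clock is fully deterministic.
    let now = SystemTime::UNIX_EPOCH + Duration::from_secs(10_000);
    let mut entries = HashMap::new();
    entries.insert(
        "bucket/old-done".to_string(),
        Entry { status: Status::Completed, recorded_at: now - Duration::from_secs(500) },
    );
    entries.insert(
        "bucket/old-pending".to_string(),
        Entry { status: Status::Pending, recorded_at: now - Duration::from_secs(500) },
    );
    let removed = sweep_stale(&mut entries, now, Duration::from_secs(100));
    println!("removed {removed}, remaining {}", entries.len());
}
```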
pub fn lookup_status(
    &self,
    bucket: &str,
    key: &str,
) -> Option<ReplicationStatus>
Look up the recorded replication status for (bucket, key).
Returns None when no PUT to this key has triggered
replication (= the object is not under any replication rule, or
it predates the rule’s creation).
The generation field of the entry is intentionally not
surfaced here — it’s an internal CAS guard, not part of the
AWS wire shape.
Trait Implementations
impl Debug for ReplicationManager
Auto Trait Implementations
impl !Freeze for ReplicationManager
impl RefUnwindSafe for ReplicationManager
impl Send for ReplicationManager
impl Sync for ReplicationManager
impl Unpin for ReplicationManager
impl UnsafeUnpin for ReplicationManager
impl UnwindSafe for ReplicationManager
Blanket Implementations
impl<'a, T, E> AsTaggedExplicit<'a, E> for T where T: 'a,
impl<'a, T, E> AsTaggedImplicit<'a, E> for T where T: 'a,
impl<T> BorrowMut<T> for T where T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.