pub struct ReplicationManager {
pub next_generation: AtomicU64,
pub dropped_total: AtomicU64,
/* private fields */
}

In-memory manager of per-bucket replication configurations + per-(bucket, key) replication statuses.
Fields

next_generation: AtomicU64
v0.8.2 #61: monotonic per-source-PUT generation counter. Each
put_object (or complete_multipart_upload) on a replicated
source bucket calls Self::next_generation before spawning
its detached replication task. The dispatcher carries the
generation through to Self::record_status_if_newer, which
drops the stamp + the destination write when a newer
generation has already won — guaranteeing the destination
can’t be rolled back by a slow retry.
dropped_total: AtomicU64
Bumped each time the dispatcher exhausts its retry budget on a destination PUT. Exposed publicly so the metrics layer can poll without taking the configuration lock.
Implementations

impl ReplicationManager

pub fn new() -> Self
Empty manager — no bucket has any replication rules. The
generation counter starts at 1 so the first PUT-issued token is
1 (a stored entry’s generation = 0 from a pre-#61 snapshot
is then strictly less and the very next PUT wins the CAS).
pub fn next_generation(&self) -> u64
v0.8.2 #61: mint a fresh, monotonically-increasing generation
token. Caller is the per-source-PUT dispatcher fork (the body-
bearing put_object branch, the body-less put_object branch,
and complete_multipart_upload). The token is then carried
through replicate_object to Self::record_status_if_newer
so a stale retry can be detected and dropped.
Uses Relaxed ordering — we only need uniqueness +
monotonicity per atomic; the cross-thread happens-before
between PUT-A’s spawn and the dispatcher reading the body is
already established by tokio::spawn’s implicit
Acquire/Release on the task queue.
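The counter behaviour can be sketched stdlib-only. `GenCounter` below is an illustrative stand-in, not the crate's type; it only shows why `fetch_add(1, Relaxed)` on a counter seeded at 1 yields the token sequence the docs describe:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative stand-in for the manager's counter: fetch_add returns the
// previous value, so seeding at 1 makes the first PUT-issued token 1 and
// leaves generation 0 free for pre-#61 snapshot entries.
struct GenCounter {
    next: AtomicU64,
}

impl GenCounter {
    fn new() -> Self {
        Self { next: AtomicU64::new(1) }
    }

    // Relaxed suffices here: only per-atomic uniqueness and monotonicity
    // are needed; cross-thread ordering comes from the spawn machinery.
    fn next_generation(&self) -> u64 {
        self.next.fetch_add(1, Ordering::Relaxed)
    }
}

fn main() {
    let c = GenCounter::new();
    assert_eq!(c.next_generation(), 1); // first PUT-issued token
    assert_eq!(c.next_generation(), 2); // strictly increasing
    println!("ok");
}
```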
pub fn put(&self, bucket: &str, config: ReplicationConfig)
put_bucket_replication handler entry. The bucket’s existing
configuration is fully replaced (S3 spec — PutBucketReplication
is upsert-style at the bucket scope, not per-rule patch).
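The upsert-at-bucket-scope semantics can be modelled with a plain map insert. This is a sketch with illustrative types (string rule ids rather than the real `ReplicationConfig`):

```rust
use std::collections::HashMap;

// Sketch: PutBucketReplication replaces the whole configuration, so a
// second insert for the same bucket drops the earlier rules entirely;
// nothing is merged per rule.
fn main() {
    let mut configs: HashMap<String, Vec<&'static str>> = HashMap::new();

    configs.insert("photos".to_string(), vec!["rule-a", "rule-b"]);
    configs.insert("photos".to_string(), vec!["rule-c"]); // full replacement

    assert_eq!(configs["photos"], vec!["rule-c"]);
    println!("rules for photos: {:?}", configs["photos"]);
}
```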
pub fn get(&self, bucket: &str) -> Option<ReplicationConfig>
get_bucket_replication handler entry. Returns None when
nothing is registered (AWS S3 returns
ReplicationConfigurationNotFoundError in that case; the
service-layer handler maps None accordingly).
pub fn to_json(&self) -> Result<String, Error>
Serialise the entire manager state (configurations + per-key
statuses + next generation counter) to JSON. The status entries
are emitted in the v0.8.2 #61 schema (ReplicationStatusEntry);
readers built before #61 will see the embedded
{ status, generation } shape via the untagged enum and
(older binaries) reject — but the production restart path always
runs the same binary against its own snapshot.
pub fn from_json(s: &str) -> Result<Self, Error>
Restore a manager from a previously-emitted snapshot. The
dropped_total counter is reset to 0 — historical drops are
runtime metrics, not configuration.
Back-compat (v0.8.2 #61)
Pre-#61 snapshots store bare ReplicationStatus (no
generation). The untagged StatusOrEntry enum picks them up
and assigns generation = 0, which the CAS-style
Self::record_status_if_newer treats as “always overridable
by the next real PUT” — guaranteed loss-free migration. The
next_generation counter defaults to 1 when the snapshot
predates #61 (= serde(default) on the field).
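The migration rule can be shown without serde. `StatusOrEntry` below is a plain-enum stand-in for the untagged serde enum (the real deserialization path is not reproduced); it demonstrates only the normalization: a bare pre-#61 status gets generation 0:

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum ReplicationStatus {
    Pending,
    Completed,
    Failed,
}

// Stand-in for the untagged enum: pre-#61 snapshots carry a bare status,
// post-#61 snapshots carry { status, generation }.
enum StatusOrEntry {
    Bare(ReplicationStatus),
    Entry { status: ReplicationStatus, generation: u64 },
}

fn normalize(e: StatusOrEntry) -> (ReplicationStatus, u64) {
    match e {
        // Pre-#61: generation 0, i.e. overridable by the next real PUT.
        StatusOrEntry::Bare(status) => (status, 0),
        StatusOrEntry::Entry { status, generation } => (status, generation),
    }
}

fn main() {
    assert_eq!(
        normalize(StatusOrEntry::Bare(ReplicationStatus::Completed)),
        (ReplicationStatus::Completed, 0)
    );
    assert_eq!(
        normalize(StatusOrEntry::Entry { status: ReplicationStatus::Pending, generation: 7 }),
        (ReplicationStatus::Pending, 7)
    );
}
```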
pub fn match_rule(
    &self,
    bucket: &str,
    key: &str,
    object_tags: &[(String, String)],
) -> Option<ReplicationRule>
Match an object against the bucket’s rules and return the
highest-priority enabled rule whose filter matches. Returns
None when no rule matches (or no configuration is registered
for the bucket). Ties on priority are broken by declaration
order — the first such rule wins.
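The selection logic can be sketched with an assumed prefix-only filter (the real filter also covers tags; `Rule` here is illustrative). Scanning in declaration order and replacing the candidate only on strictly greater priority makes ties resolve to the first-declared rule:

```rust
#[derive(Debug, PartialEq)]
struct Rule {
    id: &'static str,
    priority: i32,
    enabled: bool,
    prefix: &'static str, // stand-in for the real prefix/tag filter
}

// Highest-priority enabled match wins; on equal priority the earlier
// (first-declared) rule is kept because we only replace on strictly
// greater priority.
fn match_rule<'a>(rules: &'a [Rule], key: &str) -> Option<&'a Rule> {
    let mut best: Option<&Rule> = None;
    for rule in rules {
        if !rule.enabled || !key.starts_with(rule.prefix) {
            continue;
        }
        match best {
            Some(b) if rule.priority <= b.priority => {} // tie: earlier rule wins
            _ => best = Some(rule),
        }
    }
    best
}

fn main() {
    let rules = [
        Rule { id: "logs-low", priority: 1, enabled: true, prefix: "logs/" },
        Rule { id: "logs-high", priority: 5, enabled: true, prefix: "logs/app/" },
        Rule { id: "disabled", priority: 9, enabled: false, prefix: "logs/" },
    ];
    assert_eq!(match_rule(&rules, "logs/app/a.txt").unwrap().id, "logs-high");
    assert_eq!(match_rule(&rules, "logs/other.txt").unwrap().id, "logs-low");
    assert!(match_rule(&rules, "images/x.png").is_none());
}
```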
pub fn record_status(&self, bucket: &str, key: &str, status: ReplicationStatus)
Stamp the per-(bucket, key) replication status with no
generation guard. Replaces any previous entry. Generation is
reset to 0 (= overridable by the next real PUT) — callers
that hold a generation token must use
Self::record_status_if_newer instead.
Use cases (kept for back-compat + the eager Pending stamp the
service-layer dispatcher emits before spawning the actual
replication task):
- Eager Pending stamp, written synchronously alongside the source PUT so a HEAD between PUT-return and dispatcher-completion sees PENDING instead of None.
- Tests that don’t care about generation (legacy assertions).
pub fn record_status_if_newer(
    &self,
    bucket: &str,
    key: &str,
    generation: u64,
    status: ReplicationStatus,
) -> bool
v0.8.2 #61: CAS-style stamp. Only updates the entry when
generation >= entry.generation; rejects the update (returns
false) when generation < entry.generation because a newer
PUT has already won and we must not roll the source’s status
back to a stale terminal state.
Returns
- true: the stamp was accepted; the caller may proceed with the destination-bucket PUT (in replicate_object) / declare success.
- false: a strictly-newer generation has already stamped the entry; the caller must drop the destination write to avoid overwriting newer bytes with a stale retry’s body.
Equality (generation == entry.generation) is accepted because
the same generation may legitimately stamp twice across the
dispatcher’s retry budget (Pending → Completed on the same
task).
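A single-threaded sketch of the `>=` guard (the real manager holds its map behind a lock, keyed by (bucket, key); `Statuses` and the plain `String` key are illustrative simplifications):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Debug)]
enum ReplicationStatus {
    Pending,
    Completed,
}

// Simplified stand-in: entries map key -> (generation, status).
struct Statuses {
    entries: HashMap<String, (u64, ReplicationStatus)>,
}

impl Statuses {
    fn record_status_if_newer(&mut self, key: &str, generation: u64, status: ReplicationStatus) -> bool {
        match self.entries.get(key) {
            // Strictly older token: a newer PUT already won, reject.
            Some(&(existing, _)) if generation < existing => false,
            // Equal or newer (or no entry yet): accept the stamp.
            _ => {
                self.entries.insert(key.to_string(), (generation, status));
                true
            }
        }
    }
}

fn main() {
    let mut s = Statuses { entries: HashMap::new() };
    assert!(s.record_status_if_newer("a.txt", 2, ReplicationStatus::Pending));    // newer PUT stamps
    assert!(!s.record_status_if_newer("a.txt", 1, ReplicationStatus::Completed)); // stale retry rejected
    assert!(s.record_status_if_newer("a.txt", 2, ReplicationStatus::Completed));  // same generation re-stamps
    assert_eq!(s.entries["a.txt"], (2, ReplicationStatus::Completed));
}
```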
pub fn sweep_stale(&self, now: DateTime<Utc>, max_age: Duration) -> usize
v0.8.3 #66 (H-5 audit fix): drop terminal-state entries
(Completed / Failed) older than max_age. Pending entries
are never swept because they are still in-flight — the
dispatcher is racing toward a terminal stamp and dropping the
Pending would lose the eventual outcome (and let the entry
re-emerge under the original key with no recorded history).
Replica entries can theoretically appear here through legacy
paths and are likewise preserved (the destination-side stamp is
not produced by record_status_if_newer in the current code,
but the conservative filter keeps any future use loss-free).
Cutoff is now - max_age rather than Utc::now() - max_age so
callers can drive the clock deterministically in tests.
Returns the number of entries removed (surfaced on the operators’
dashboard via s4_replication_status_swept_total).
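The sweep policy can be sketched stdlib-only. The real method takes chrono `DateTime<Utc>`/`Duration`; here timestamps are plain epoch seconds and the entry shape is an illustrative simplification:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Debug)]
enum ReplicationStatus {
    Pending,
    Completed,
    Failed,
    Replica,
}

// Only terminal states (Completed/Failed) older than the cutoff are
// dropped; Pending (still in-flight) and Replica entries always survive.
fn sweep_stale(
    entries: &mut HashMap<String, (ReplicationStatus, u64)>,
    now: u64,
    max_age: u64,
) -> usize {
    let cutoff = now.saturating_sub(max_age); // caller-supplied clock, testable
    let before = entries.len();
    entries.retain(|_, &mut (status, stamped_at)| {
        let terminal = matches!(status, ReplicationStatus::Completed | ReplicationStatus::Failed);
        !(terminal && stamped_at < cutoff)
    });
    before - entries.len()
}

fn main() {
    let mut m = HashMap::new();
    m.insert("old-done".to_string(), (ReplicationStatus::Completed, 100));
    m.insert("old-pending".to_string(), (ReplicationStatus::Pending, 100));
    m.insert("fresh-done".to_string(), (ReplicationStatus::Completed, 950));
    assert_eq!(sweep_stale(&mut m, 1000, 500), 1); // only old-done removed
    assert!(m.contains_key("old-pending"));
    assert!(m.contains_key("fresh-done"));
}
```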
pub fn lookup_status(
    &self,
    bucket: &str,
    key: &str,
) -> Option<ReplicationStatus>
Look up the recorded replication status for (bucket, key).
Returns None when no PUT to this key has triggered
replication (= the object is not under any replication rule, or
it predates the rule’s creation).
The generation field of the entry is intentionally not
surfaced here — it’s an internal CAS guard, not part of the
AWS wire shape.