use super::identity::NodeIdentity;
use super::ownership::{
CatalogError, CatalogVersion, CollectionId, OwnershipEpoch, RangeId, RangeOwnership,
ShardOwnershipCatalog,
};
/// Administrative token that must accompany a forced ownership transition.
///
/// [`force_transition`] denies any request that does not carry one. The token
/// only records which operator it was granted to.
///
/// NOTE(review): `granted_to` is public and performs no verification, so holding
/// this value is not by itself proof of authorisation. Confirm that minting is
/// restricted by whoever constructs it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForceTransitionCapability {
    /// Identity of the operator the capability was granted to.
    operator: NodeIdentity,
}

impl ForceTransitionCapability {
    /// Creates a capability held by `operator`.
    pub fn granted_to(operator: NodeIdentity) -> Self {
        ForceTransitionCapability { operator }
    }

    /// Returns the identity of the operator holding this capability.
    pub fn operator(&self) -> &NodeIdentity {
        let Self { operator } = self;
        operator
    }
}
/// Free-text justification an operator must attach to a forced transition.
///
/// Instances are only built through [`OperatorReason::new`]. They are therefore
/// never empty and carry no leading or trailing whitespace.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OperatorReason(String);

/// Error returned by [`OperatorReason::new`] when the text is empty or
/// consists solely of whitespace.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmptyOperatorReason;

impl std::fmt::Display for EmptyOperatorReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("forced transition operator reason must not be empty")
    }
}

impl std::error::Error for EmptyOperatorReason {}

impl OperatorReason {
    /// Builds a reason from `text`, stripping surrounding whitespace.
    ///
    /// Interior whitespace (including newlines) is preserved.
    ///
    /// # Errors
    ///
    /// Returns [`EmptyOperatorReason`] if nothing remains after trimming.
    pub fn new(text: impl Into<String>) -> Result<Self, EmptyOperatorReason> {
        let owned: String = text.into();
        match owned.trim() {
            "" => Err(EmptyOperatorReason),
            kept => Ok(OperatorReason(kept.to_owned())),
        }
    }

    /// Borrows the trimmed reason text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for OperatorReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
/// A request to move ownership of one range to `target`, applied directly to
/// the catalog by [`force_transition`].
///
/// The request is built with [`ForcedTransitionRequest::new`] plus the `with_*`
/// builder methods. A request without a [`ForceTransitionCapability`] or an
/// [`OperatorReason`] can still be constructed, but `force_transition` will
/// deny it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForcedTransitionRequest {
    /// Collection containing the range.
    collection: CollectionId,
    /// Range whose ownership is being forced.
    range_id: RangeId,
    /// Node that should become the owner.
    target: NodeIdentity,
    /// Replica set handed to the successor record; empty unless set.
    new_replicas: Vec<NodeIdentity>,
    /// Operator authorisation, if presented.
    capability: Option<ForceTransitionCapability>,
    /// Operator justification, if supplied.
    reason: Option<OperatorReason>,
}

impl ForcedTransitionRequest {
    /// Starts a request with no replicas, no capability and no reason.
    pub fn new(collection: CollectionId, range_id: RangeId, target: NodeIdentity) -> Self {
        Self {
            new_replicas: vec![],
            capability: None,
            reason: None,
            collection,
            range_id,
            target,
        }
    }

    /// Attaches (or replaces) the operator's force capability.
    pub fn with_capability(self, capability: ForceTransitionCapability) -> Self {
        Self {
            capability: Some(capability),
            ..self
        }
    }

    /// Attaches (or replaces) the operator's reason.
    pub fn with_reason(self, reason: OperatorReason) -> Self {
        Self {
            reason: Some(reason),
            ..self
        }
    }

    /// Replaces the replica set used for the successor ownership record.
    pub fn with_replicas(self, replicas: impl IntoIterator<Item = NodeIdentity>) -> Self {
        Self {
            new_replicas: replicas.into_iter().collect(),
            ..self
        }
    }

    /// Collection the range belongs to.
    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    /// Identifier of the targeted range.
    pub fn range_id(&self) -> RangeId {
        self.range_id
    }

    /// Node that should become the owner.
    pub fn target(&self) -> &NodeIdentity {
        &self.target
    }
}
/// Why a forced transition was refused before the catalog was consulted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForceDenial {
    /// The request carried no force capability.
    MissingCapability,
    /// The request carried no operator reason.
    MissingReason,
}

impl ForceDenial {
    /// Fixed human-readable description used by the `Display` impl.
    fn label(self) -> &'static str {
        match self {
            Self::MissingCapability => "no administrative force capability presented",
            Self::MissingReason => "no explicit operator reason supplied",
        }
    }
}

impl std::fmt::Display for ForceDenial {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = ForceDenial::label(*self);
        f.write_str(text)
    }
}
/// Why an authorised forced transition could not be applied.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ForceFailure {
    /// The catalog holds no range for the requested collection and range id.
    UnknownRange,
    /// The catalog rejected the successor ownership record.
    Catalog(CatalogError),
}

impl std::fmt::Display for ForceFailure {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ForceFailure::UnknownRange => f.write_str("no such range in the catalog"),
            // Render the catalog's own message unchanged.
            ForceFailure::Catalog(inner) => write!(f, "{inner}"),
        }
    }
}
/// Outcome of a single [`force_transition`] attempt.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ForcedTransitionDisposition {
    /// The successor ownership record was accepted by the catalog.
    Allowed {
        /// Owner recorded for the range before the transition.
        previous_owner: NodeIdentity,
        /// The request's target node.
        new_owner: NodeIdentity,
        /// Epoch of the range before the transition.
        previous_epoch: OwnershipEpoch,
        /// Epoch of the successor record that was applied.
        new_epoch: OwnershipEpoch,
        /// Version reported by the range record before the transition.
        previous_version: CatalogVersion,
        /// Version of the successor record that was applied.
        new_version: CatalogVersion,
    },
    /// The request lacked a capability or a reason; the catalog was not read.
    Denied(ForceDenial),
    /// The request was authorised, but the range was missing or the catalog
    /// rejected the update.
    Failed(ForceFailure),
}
/// Audit record describing one [`force_transition`] attempt.
///
/// A record is produced for every outcome (allowed, denied or failed). It
/// captures who asked, why, for which range, and what happened.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForcedTransitionAudit {
    /// Caller-supplied timestamp of the attempt, in milliseconds.
    attempted_at_ms: u64,
    /// Operator named by the presented capability; `None` if none was presented.
    operator: Option<NodeIdentity>,
    /// Operator's reason text; `None` if none was supplied.
    reason: Option<String>,
    /// Collection the range belongs to.
    collection: CollectionId,
    /// Targeted range.
    range_id: RangeId,
    /// Requested new owner.
    target: NodeIdentity,
    /// What happened to the request.
    disposition: ForcedTransitionDisposition,
}

impl ForcedTransitionAudit {
    /// Timestamp (ms) passed to `force_transition` for this attempt.
    pub fn attempted_at_ms(&self) -> u64 {
        self.attempted_at_ms
    }

    /// Operator who presented a capability, if any.
    pub fn operator(&self) -> Option<&NodeIdentity> {
        self.operator.as_ref()
    }

    /// Reason text supplied with the request, if any.
    pub fn reason(&self) -> Option<&str> {
        self.reason.as_ref().map(String::as_str)
    }

    /// Collection of the targeted range.
    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    /// Identifier of the targeted range.
    pub fn range_id(&self) -> RangeId {
        self.range_id
    }

    /// Requested new owner.
    pub fn target(&self) -> &NodeIdentity {
        &self.target
    }

    /// Full outcome of the attempt.
    pub fn disposition(&self) -> &ForcedTransitionDisposition {
        &self.disposition
    }

    /// `true` iff the catalog accepted the forced transition.
    pub fn is_allowed(&self) -> bool {
        match self.disposition {
            ForcedTransitionDisposition::Allowed { .. } => true,
            ForcedTransitionDisposition::Denied(_) | ForcedTransitionDisposition::Failed(_) => {
                false
            }
        }
    }

    /// `true` iff the transition was allowed and the new epoch is strictly
    /// greater than the previous one.
    pub fn fenced_old_owner(&self) -> bool {
        if let ForcedTransitionDisposition::Allowed {
            previous_epoch,
            new_epoch,
            ..
        } = &self.disposition
        {
            new_epoch > previous_epoch
        } else {
            false
        }
    }
}
impl std::fmt::Display for ForcedTransitionAudit {
    /// Renders the record as a single human-readable audit line.
    ///
    /// The operator reason is free text. `OperatorReason::new` trims only the
    /// surrounding whitespace, so interior newlines, carriage returns and other
    /// control characters (e.g. terminal escape sequences) reach this record.
    /// Written verbatim, such a reason could split one record across several
    /// lines of a line-oriented audit log, or forge a fake trailing record such
    /// as a fabricated `ALLOWED` line.
    ///
    /// Control characters and the Unicode line/paragraph separators in the
    /// reason are therefore escaped. For example, a newline becomes the two
    /// characters `\` and `n`. A reason without such characters is rendered
    /// unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Display adapter that escapes line-breaking and control characters.
        struct EscapeControl<'a>(&'a str);

        impl std::fmt::Display for EscapeControl<'_> {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                for c in self.0.chars() {
                    if c.is_control() || matches!(c, '\u{2028}' | '\u{2029}') {
                        // `escape_default` yields `\n`, `\r`, `\t` or `\u{..}` here.
                        write!(f, "{}", c.escape_default())?;
                    } else {
                        write!(f, "{c}")?;
                    }
                }
                Ok(())
            }
        }

        let operator = self
            .operator
            .as_ref()
            .map(|o| o.to_string())
            .unwrap_or_else(|| "<none>".to_string());
        let reason = EscapeControl(self.reason.as_deref().unwrap_or("<none>"));
        write!(
            f,
            "forced ownership transition @ {} ms by operator {} for {}/{} -> {} (reason: {}): ",
            self.attempted_at_ms, operator, self.collection, self.range_id, self.target, reason,
        )?;
        match &self.disposition {
            ForcedTransitionDisposition::Allowed {
                previous_owner,
                new_owner,
                previous_epoch,
                new_epoch,
                previous_version,
                new_version,
            } => write!(
                f,
                "ALLOWED: {} (epoch {}, version {}) -> {} (epoch {}, version {})",
                previous_owner, previous_epoch, previous_version, new_owner, new_epoch, new_version,
            ),
            ForcedTransitionDisposition::Denied(reason) => write!(f, "DENIED: {reason}"),
            ForcedTransitionDisposition::Failed(failure) => write!(f, "FAILED: {failure}"),
        }
    }
}
pub fn force_transition(
catalog: &mut ShardOwnershipCatalog,
request: &ForcedTransitionRequest,
now_ms: u64,
) -> ForcedTransitionAudit {
let operator = request.capability.as_ref().map(|c| c.operator().clone());
let reason = request.reason.as_ref().map(|r| r.as_str().to_string());
let audit = |disposition| ForcedTransitionAudit {
attempted_at_ms: now_ms,
operator: operator.clone(),
reason: reason.clone(),
collection: request.collection.clone(),
range_id: request.range_id,
target: request.target.clone(),
disposition,
};
if request.capability.is_none() {
return audit(ForcedTransitionDisposition::Denied(
ForceDenial::MissingCapability,
));
}
if request.reason.is_none() {
return audit(ForcedTransitionDisposition::Denied(
ForceDenial::MissingReason,
));
}
let Some(current) = catalog.range(&request.collection, request.range_id) else {
return audit(ForcedTransitionDisposition::Failed(
ForceFailure::UnknownRange,
));
};
let previous_owner = current.owner().clone();
let previous_epoch = current.epoch();
let previous_version = current.version();
let next = current.transfer_to(request.target.clone(), request.new_replicas.clone());
let new_epoch = next.epoch();
let new_version = next.version();
match catalog.apply_update(next) {
Ok(_) => audit(ForcedTransitionDisposition::Allowed {
previous_owner,
new_owner: request.target.clone(),
previous_epoch,
new_epoch,
previous_version,
new_version,
}),
Err(err) => audit(ForcedTransitionDisposition::Failed(ForceFailure::Catalog(
err,
))),
}
}
#[cfg(test)]
mod tests {
    // Each test seeds a `ShardOwnershipCatalog` with range 1 of `orders`, runs
    // `force_transition`, and checks both the returned audit record and the
    // catalog state left behind.
    use super::*;
    use crate::cluster::ownership::{
        OwnershipEpoch, PlacementMetadata, RangeBounds, RangeWriteReject, ShardKeyMode,
    };

    /// Builds a collection id, panicking if `name` is rejected.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    /// Builds a node identity from a certificate subject such as `"CN=node-a"`.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// Returns a catalog holding range 1 of collection `orders`, owned by
    /// `owner` with `replicas`, together with the `orders` collection id.
    /// The range is established with `ShardKeyMode::Hash`, `RangeBounds::full()`
    /// and a replication factor of 3.
    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
        let orders = collection("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        catalog
            .apply_update(RangeOwnership::establish(
                orders.clone(),
                RangeId::new(1),
                ShardKeyMode::Hash,
                RangeBounds::full(),
                ident(owner),
                replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();
        (catalog, orders)
    }

    /// A force capability granted to `operator`.
    fn capability(operator: &str) -> ForceTransitionCapability {
        ForceTransitionCapability::granted_to(ident(operator))
    }

    /// The fixed, valid operator reason shared by the tests.
    fn reason() -> OperatorReason {
        OperatorReason::new("primary AZ lost, promoting surviving copy").unwrap()
    }

    /// A request moving range 1 of `orders` to `target`, carrying both a
    /// capability held by `CN=operator-root` and the shared reason.
    fn authorised_request(orders: &CollectionId, target: &str) -> ForcedTransitionRequest {
        ForcedTransitionRequest::new(orders.clone(), RangeId::new(1), ident(target))
            .with_capability(capability("CN=operator-root"))
            .with_reason(reason())
    }

    // Empty and whitespace-only reasons are refused.
    #[test]
    fn operator_reason_rejects_blank_input() {
        assert_eq!(OperatorReason::new(""), Err(EmptyOperatorReason));
        assert_eq!(OperatorReason::new(" "), Err(EmptyOperatorReason));
        assert_eq!(OperatorReason::new("\t\n "), Err(EmptyOperatorReason));
    }

    // Surrounding whitespace is removed; interior text is kept as-is.
    #[test]
    fn operator_reason_trims_surrounding_whitespace() {
        let r = OperatorReason::new(" recover orders/1 ").unwrap();
        assert_eq!(r.as_str(), "recover orders/1");
    }

    // A reason without a capability is denied, timestamped, and leaves the
    // range owned by node-a at the initial epoch.
    #[test]
    fn force_denied_without_capability() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = ForcedTransitionRequest::new(orders.clone(), RangeId::new(1), ident("CN=node-b"))
            .with_reason(reason());
        let audit = force_transition(&mut catalog, &req, 1_000);
        assert!(!audit.is_allowed());
        assert_eq!(
            audit.disposition(),
            &ForcedTransitionDisposition::Denied(ForceDenial::MissingCapability)
        );
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-a"));
        assert_eq!(range.epoch(), OwnershipEpoch::initial());
        assert!(audit.to_string().contains("DENIED"));
        assert_eq!(audit.attempted_at_ms(), 1_000);
    }

    // A capability without a reason is denied; the audit still names the
    // operator, and ownership is unchanged.
    #[test]
    fn force_denied_without_reason() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = ForcedTransitionRequest::new(orders.clone(), RangeId::new(1), ident("CN=node-b"))
            .with_capability(capability("CN=operator-root"));
        let audit = force_transition(&mut catalog, &req, 2_000);
        assert!(!audit.is_allowed());
        assert_eq!(
            audit.disposition(),
            &ForcedTransitionDisposition::Denied(ForceDenial::MissingReason)
        );
        assert_eq!(audit.operator(), Some(&ident("CN=operator-root")));
        assert_eq!(audit.reason(), None);
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-a"));
    }

    // With neither capability nor reason, the capability denial wins.
    #[test]
    fn missing_capability_is_reported_before_missing_reason() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = ForcedTransitionRequest::new(orders, RangeId::new(1), ident("CN=node-b"));
        let audit = force_transition(&mut catalog, &req, 0);
        assert_eq!(
            audit.disposition(),
            &ForcedTransitionDisposition::Denied(ForceDenial::MissingCapability)
        );
    }

    // An authorised force to node-z (outside the replica set) succeeds. The
    // owner moves to node-z, the epoch goes from the initial epoch to 2, and
    // the version goes from 1 to 2. The catalog reflects the change.
    #[test]
    fn authorised_force_bumps_epoch_and_moves_owner() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = ForcedTransitionRequest::new(orders.clone(), RangeId::new(1), ident("CN=node-z"))
            .with_capability(capability("CN=operator-root"))
            .with_reason(reason());
        let audit = force_transition(&mut catalog, &req, 5_000);
        assert!(audit.is_allowed());
        assert!(audit.fenced_old_owner());
        match audit.disposition() {
            ForcedTransitionDisposition::Allowed {
                previous_owner,
                new_owner,
                previous_epoch,
                new_epoch,
                previous_version,
                new_version,
            } => {
                assert_eq!(previous_owner, &ident("CN=node-a"));
                assert_eq!(new_owner, &ident("CN=node-z"));
                assert_eq!(*previous_epoch, OwnershipEpoch::initial());
                assert_eq!(new_epoch.value(), 2);
                assert_eq!(previous_version.value(), 1);
                assert_eq!(new_version.value(), 2);
            }
            other => panic!("expected Allowed, got {other:?}"),
        }
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-z"));
        assert_eq!(range.epoch().value(), 2);
    }

    // The audit accessors and the rendered line carry the operator, reason,
    // timestamp, target and previous owner.
    #[test]
    fn audit_evidence_records_operator_reason_and_boundary() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = authorised_request(&orders, "CN=node-b");
        let audit = force_transition(&mut catalog, &req, 7_777);
        assert_eq!(audit.operator(), Some(&ident("CN=operator-root")));
        assert_eq!(
            audit.reason(),
            Some("primary AZ lost, promoting surviving copy")
        );
        assert_eq!(audit.attempted_at_ms(), 7_777);
        assert_eq!(audit.target(), &ident("CN=node-b"));
        let line = audit.to_string();
        assert!(line.contains("ALLOWED"));
        assert!(line.contains("CN=operator-root"));
        assert!(line.contains("primary AZ lost"));
        assert!(line.contains("CN=node-a"));
        assert!(line.contains("CN=node-b"));
    }

    // Before the force, node-a may write at the initial epoch. Afterwards the
    // same write is rejected with `NotOwner`, even though node-a stays a
    // replica, while the new owner node-b may write at the current epoch.
    #[test]
    fn reappearing_old_owner_is_fenced_after_force() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        assert!(catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial()
            )
            .is_ok());
        let req = authorised_request(&orders, "CN=node-b").with_replicas([ident("CN=node-a")]);
        let audit = force_transition(&mut catalog, &req, 1_000);
        assert!(audit.is_allowed());
        let err = catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        assert!(matches!(err, RangeWriteReject::NotOwner { .. }));
        let current_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
        assert!(catalog
            .admit_public_write(&ident("CN=node-b"), &orders, b"k", current_epoch)
            .is_ok());
    }

    // On an empty catalog an authorised force fails with `UnknownRange`, and
    // the failure is still audited with operator and reason.
    #[test]
    fn force_against_unknown_range_fails_and_is_audited() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let req = authorised_request(&orders, "CN=node-b");
        let audit = force_transition(&mut catalog, &req, 3_000);
        assert!(!audit.is_allowed());
        assert_eq!(
            audit.disposition(),
            &ForcedTransitionDisposition::Failed(ForceFailure::UnknownRange)
        );
        assert_eq!(audit.operator(), Some(&ident("CN=operator-root")));
        assert!(audit.reason().is_some());
        assert!(audit.to_string().contains("FAILED"));
    }

    // After a force, an ordinary `Promote` built from the stale pre-force view
    // is still rejected with `OwnerMismatch`. That stale view has node-a as
    // owner at the initial epoch and version. The force path does not loosen
    // the regular transition checks.
    #[test]
    fn ordinary_safety_checks_are_untouched_by_the_force_path() {
        use crate::cluster::ownership_transition::{
            run_transition, CommitWatermark, TransitionError, TransitionKind, TransitionRejection,
            TransitionRequest,
        };
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let forced = force_transition(&mut catalog, &authorised_request(&orders, "CN=node-b"), 10);
        assert!(forced.is_allowed());
        let stale = TransitionRequest::new(
            TransitionKind::Promote,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-a"),
            OwnershipEpoch::initial(),
            CatalogVersion::initial(),
            ident("CN=node-b"),
            CommitWatermark::new(1, 10),
        )
        .with_evidence(crate::cluster::ownership_transition::CatchUpEvidence::new(
            ident("CN=node-b"),
            1,
            10,
        ));
        let err = run_transition(&mut catalog, &stale).unwrap_err();
        assert!(matches!(
            err,
            TransitionError::Rejected(TransitionRejection::OwnerMismatch { .. })
        ));
    }
}