use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use async_trait::async_trait;
use greentic_deploy_spec::ids::{BundleId, DeploymentId, RevisionId};
use tracing::{info, warn};
use crate::revision_dispatcher::RevisionDispatcher;
use crate::rollout_telemetry::emit_drain_transition;
use greentic_telemetry::RolloutEvent;
/// Closes the WS connections bound to a revision that is being drained.
///
/// [`RevisionDrainCoordinator::run`] calls this after the drain window has
/// elapsed and after the revision has been evicted from dispatch.
#[async_trait]
pub trait WsRevisionCloser: Send + Sync {
    /// Expected to close the WS connections associated with `revision_id` of
    /// `deployment_id`.
    async fn close_revision(&self, deployment_id: DeploymentId, revision_id: RevisionId);
}

/// [`WsRevisionCloser`] that does nothing.
///
/// Installed by [`RevisionDrainCoordinator::with_noop_ws`].
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopWsRevisionCloser;

#[async_trait]
impl WsRevisionCloser for NoopWsRevisionCloser {
    async fn close_revision(&self, _deployment_id: DeploymentId, _revision_id: RevisionId) {}
}
/// Removes the runtime backing a revision once its drain window has elapsed.
pub trait RevisionTeardown: Send + Sync {
    /// Tears down the runtime for `revision_id` of `deployment_id`.
    ///
    /// Returns `true` if an active runtime was found and removed, and `false`
    /// if none was found. The drain coordinator logs a warning on `false`.
    fn remove_revision(
        &self,
        tenant: &str,
        deployment_id: DeploymentId,
        bundle_id: BundleId,
        revision_id: RevisionId,
    ) -> bool;
}

/// [`RevisionTeardown`] that removes nothing and always reports `false`.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopRevisionTeardown;

impl RevisionTeardown for NoopRevisionTeardown {
    fn remove_revision(
        &self,
        _tenant: &str,
        _deployment_id: DeploymentId,
        _bundle_id: BundleId,
        _revision_id: RevisionId,
    ) -> bool {
        false
    }
}
/// Reports whether a revision that was just evicted from dispatch has been
/// re-added to a newer activation.
pub trait RevisionLivenessProbe: Send + Sync {
    /// Returns `true` if `revision_id` is live elsewhere. In that case the drain
    /// coordinator suppresses the `RevisionEvicted` telemetry event.
    fn is_live_elsewhere(&self, deployment_id: DeploymentId, revision_id: RevisionId) -> bool;
}

/// [`RevisionLivenessProbe`] that never reports a revision as live elsewhere.
///
/// This is the probe installed by [`RevisionDrainCoordinator::new`].
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopRevisionLiveness;

impl RevisionLivenessProbe for NoopRevisionLiveness {
    fn is_live_elsewhere(&self, _deployment_id: DeploymentId, _revision_id: RevisionId) -> bool {
        false
    }
}
/// Parameters for a single [`RevisionDrainCoordinator::run`] invocation.
#[derive(Debug, Clone)]
pub struct DrainRequest<'a> {
    /// Tenant owning the deployment; forwarded to telemetry, logs and teardown.
    pub tenant: &'a str,
    /// Deployment the revision belongs to.
    pub deployment_id: DeploymentId,
    /// Bundle of the revision; forwarded to telemetry and teardown.
    pub bundle_id: BundleId,
    /// Revision being drained.
    pub revision_id: RevisionId,
    /// Seconds to keep the revision in the draining state before evicting it.
    pub drain_seconds: u32,
}
pub struct RevisionDrainCoordinator {
dispatcher: Arc<RevisionDispatcher>,
teardown: Arc<dyn RevisionTeardown>,
ws_closer: Arc<dyn WsRevisionCloser>,
liveness: Arc<dyn RevisionLivenessProbe>,
}
impl RevisionDrainCoordinator {
pub fn new(
dispatcher: Arc<RevisionDispatcher>,
teardown: Arc<dyn RevisionTeardown>,
ws_closer: Arc<dyn WsRevisionCloser>,
) -> Self {
Self {
dispatcher,
teardown,
ws_closer,
liveness: Arc::new(NoopRevisionLiveness),
}
}
pub fn with_liveness_probe(mut self, liveness: Arc<dyn RevisionLivenessProbe>) -> Self {
self.liveness = liveness;
self
}
pub fn with_noop_ws(
dispatcher: Arc<RevisionDispatcher>,
teardown: Arc<dyn RevisionTeardown>,
) -> Self {
Self::new(dispatcher, teardown, Arc::new(NoopWsRevisionCloser))
}
pub async fn run(&self, req: DrainRequest<'_>) -> Result<DrainReport> {
let DrainRequest {
tenant,
deployment_id,
bundle_id,
revision_id,
drain_seconds,
} = req;
let newly_marked = self.dispatcher.mark_draining(deployment_id, revision_id);
if newly_marked {
emit_drain_transition(
RolloutEvent::RevisionDraining,
tenant,
self.dispatcher.env_id(),
deployment_id,
&bundle_id,
revision_id,
);
} else {
info!(
deployment_id = %deployment_id,
revision_id = %revision_id,
"revision already draining or unknown to dispatcher; \
proceeding with teardown anyway",
);
}
tokio::time::sleep(Duration::from_secs(u64::from(drain_seconds))).await;
let evicted = self.dispatcher.evict_revision(deployment_id, revision_id);
let live_elsewhere = evicted && self.liveness.is_live_elsewhere(deployment_id, revision_id);
let eviction_event_emitted = evicted && !live_elsewhere;
if eviction_event_emitted {
emit_drain_transition(
RolloutEvent::RevisionEvicted,
tenant,
self.dispatcher.env_id(),
deployment_id,
&bundle_id,
revision_id,
);
} else if live_elsewhere {
info!(
deployment_id = %deployment_id,
revision_id = %revision_id,
"revision re-added into a newer activation before the drain \
window elapsed; suppressing stale RevisionEvicted telemetry",
);
}
self.ws_closer
.close_revision(deployment_id, revision_id)
.await;
let removed = self
.teardown
.remove_revision(tenant, deployment_id, bundle_id, revision_id);
if !removed {
warn!(
tenant = %tenant,
deployment_id = %deployment_id,
revision_id = %revision_id,
"no active runtime found for revision at drain completion; \
either never warmed or already torn down",
);
}
Ok(DrainReport {
newly_marked,
evicted_from_dispatch: evicted,
eviction_event_emitted,
removed_runtime: removed,
})
}
}
/// Outcome of one [`RevisionDrainCoordinator::run`] call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DrainReport {
    /// `true` if this run set the draining flag. `false` if the revision was
    /// already draining or was unknown to the dispatcher.
    pub newly_marked: bool,
    /// `true` if the dispatcher reported that it evicted the revision.
    pub evicted_from_dispatch: bool,
    /// `true` if a `RevisionEvicted` telemetry event was emitted, i.e. the
    /// revision was evicted and the liveness probe did not report it live
    /// elsewhere.
    pub eviction_event_emitted: bool,
    /// `true` if the teardown hook reported that it removed a runtime.
    pub removed_runtime: bool,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::revision_dispatcher::{RevisionDispatcher, RevisionDispatcherConfig, RevisionEntry};
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Dispatcher config for the `local` env with a fixed 32-byte value
    /// (presumably the cookie-sealing key).
    fn cfg() -> RevisionDispatcherConfig {
        RevisionDispatcherConfig::new("local", [7u8; 32])
    }

    /// Bundle shared by every revision in these tests.
    fn bundle() -> BundleId {
        BundleId::new("customer.support")
    }

    /// Traffic-split entry for `rev` carrying `w` basis points of weight.
    fn entry(rev: RevisionId, w: u32) -> RevisionEntry {
        RevisionEntry {
            revision_id: rev,
            bundle_id: bundle(),
            weight_bps: w,
        }
    }

    /// Drain request for `revision_id` under the shared test tenant and bundle.
    fn drain_request(
        deployment_id: DeploymentId,
        revision_id: RevisionId,
        drain_seconds: u32,
    ) -> DrainRequest<'static> {
        DrainRequest {
            tenant: "acme",
            deployment_id,
            bundle_id: bundle(),
            revision_id,
            drain_seconds,
        }
    }

    /// Teardown double that records every call and returns a fixed result.
    #[derive(Default)]
    struct RecordingTeardown {
        calls: Mutex<Vec<(String, DeploymentId, BundleId, RevisionId)>>,
        return_value: bool,
    }

    impl RecordingTeardown {
        fn new(return_value: bool) -> Self {
            Self {
                calls: Mutex::new(Vec::new()),
                return_value,
            }
        }
    }

    impl RevisionTeardown for RecordingTeardown {
        fn remove_revision(
            &self,
            tenant: &str,
            deployment_id: DeploymentId,
            bundle_id: BundleId,
            revision_id: RevisionId,
        ) -> bool {
            self.calls.lock().unwrap().push((
                tenant.to_string(),
                deployment_id,
                bundle_id,
                revision_id,
            ));
            self.return_value
        }
    }

    /// WS closer double that counts `close_revision` calls.
    #[derive(Default)]
    struct CountingWsCloser {
        closes: AtomicUsize,
    }

    #[async_trait]
    impl WsRevisionCloser for CountingWsCloser {
        async fn close_revision(&self, _deployment_id: DeploymentId, _revision_id: RevisionId) {
            self.closes.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Builds a dispatcher with `revisions` applied as the initial traffic split.
    fn dispatcher_with(
        deployment: DeploymentId,
        revisions: Vec<RevisionEntry>,
    ) -> Arc<RevisionDispatcher> {
        let d = RevisionDispatcher::new(cfg());
        let bid = revisions[0].bundle_id.clone();
        d.apply_traffic_split(deployment, revisions, bid, 0)
            .expect("apply_traffic_split");
        Arc::new(d)
    }

    #[tokio::test]
    async fn drain_marks_then_tears_down() {
        let dep_id = DeploymentId::new();
        let r1 = RevisionId::new();
        let r2 = RevisionId::new();
        let dispatcher = dispatcher_with(dep_id, vec![entry(r1, 5000), entry(r2, 5000)]);
        let teardown = Arc::new(RecordingTeardown::new(true));
        let ws_closer = Arc::new(CountingWsCloser::default());
        let coord = RevisionDrainCoordinator::new(
            Arc::clone(&dispatcher),
            teardown.clone(),
            ws_closer.clone(),
        );

        let report = coord
            .run(drain_request(dep_id, r2, 0))
            .await
            .expect("drain ran");

        assert!(report.newly_marked);
        assert!(report.evicted_from_dispatch);
        assert!(report.eviction_event_emitted);
        assert!(report.removed_runtime);
        assert!(!dispatcher.is_draining(dep_id, r2));
        assert!(dispatcher.draining_revisions(dep_id).is_empty());
        assert_eq!(ws_closer.closes.load(Ordering::SeqCst), 1);
        let calls = teardown.calls.lock().unwrap().clone();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].0, "acme");
        assert_eq!(calls[0].1, dep_id);
        assert_eq!(calls[0].2, bundle());
        assert_eq!(calls[0].3, r2);
    }

    /// Liveness probe that reports every revision as live elsewhere.
    struct AlwaysLiveElsewhere;

    impl RevisionLivenessProbe for AlwaysLiveElsewhere {
        fn is_live_elsewhere(&self, _: DeploymentId, _: RevisionId) -> bool {
            true
        }
    }

    #[tokio::test]
    async fn drain_suppresses_eviction_event_when_revision_live_elsewhere() {
        let dep_id = DeploymentId::new();
        let r1 = RevisionId::new();
        let dispatcher = dispatcher_with(dep_id, vec![entry(r1, 10_000)]);
        let teardown = Arc::new(RecordingTeardown::new(false));
        let coord = RevisionDrainCoordinator::with_noop_ws(Arc::clone(&dispatcher), teardown)
            .with_liveness_probe(Arc::new(AlwaysLiveElsewhere));

        let report = coord
            .run(drain_request(dep_id, r1, 0))
            .await
            .expect("drain ran");

        assert!(report.newly_marked, "draining flag was set at drain start");
        assert!(
            report.evicted_from_dispatch,
            "dispatcher entry is still evicted (cleanup on the superseded table)"
        );
        assert!(
            !report.eviction_event_emitted,
            "RevisionEvicted telemetry must be suppressed for a re-added revision"
        );
        assert!(!dispatcher.contains_revision(dep_id, r1));
    }

    #[tokio::test]
    async fn drain_is_idempotent() {
        let dep_id = DeploymentId::new();
        let r1 = RevisionId::new();
        let dispatcher = dispatcher_with(dep_id, vec![entry(r1, 10_000)]);
        let teardown = Arc::new(RecordingTeardown::new(false));
        let coord =
            RevisionDrainCoordinator::with_noop_ws(Arc::clone(&dispatcher), teardown.clone());

        let first = coord.run(drain_request(dep_id, r1, 0)).await.unwrap();
        let second = coord.run(drain_request(dep_id, r1, 0)).await.unwrap();

        assert!(first.newly_marked);
        assert!(first.evicted_from_dispatch);
        assert!(!second.newly_marked);
        assert!(!second.evicted_from_dispatch);
        assert!(!first.removed_runtime);
        assert!(!second.removed_runtime);
    }

    #[tokio::test]
    async fn drain_unknown_revision_proceeds_with_teardown() {
        let dep_id = DeploymentId::new();
        let r1 = RevisionId::new();
        let dispatcher = dispatcher_with(dep_id, vec![entry(r1, 10_000)]);
        let teardown = Arc::new(RecordingTeardown::new(true));
        let coord =
            RevisionDrainCoordinator::with_noop_ws(Arc::clone(&dispatcher), teardown.clone());
        let ghost = RevisionId::new();

        let report = coord.run(drain_request(dep_id, ghost, 0)).await.unwrap();

        assert!(!report.newly_marked);
        assert!(!report.evicted_from_dispatch);
        assert!(report.removed_runtime);
        assert_eq!(teardown.calls.lock().unwrap().len(), 1);
    }

    #[tokio::test]
    async fn drain_waits_the_window() {
        let dep_id = DeploymentId::new();
        let r1 = RevisionId::new();
        let dispatcher = dispatcher_with(dep_id, vec![entry(r1, 10_000)]);
        let teardown = Arc::new(RecordingTeardown::new(true));
        let coord =
            RevisionDrainCoordinator::with_noop_ws(Arc::clone(&dispatcher), teardown.clone());

        // Paused clock: the 2 s drain window elapses only when time is advanced.
        tokio::time::pause();
        let start = tokio::time::Instant::now();
        let handle =
            tokio::spawn(async move { coord.run(drain_request(dep_id, r1, 2)).await });
        tokio::task::yield_now().await;
        tokio::time::advance(Duration::from_secs(2)).await;
        handle.await.unwrap().unwrap();

        assert!(start.elapsed() >= Duration::from_secs(2));
    }

    #[tokio::test]
    async fn drain_then_stale_cookie_reselects_healthy_revision() {
        use crate::revision_dispatcher::{DispatchRequest, SelectionReason};
        use crate::revision_pin::now_secs;
        use rand::SeedableRng;
        use rand::rngs::StdRng;

        let dep_id = DeploymentId::new();
        let r1 = RevisionId::new();
        let r2 = RevisionId::new();
        let dispatcher = dispatcher_with(dep_id, vec![entry(r1, 5000), entry(r2, 5000)]);
        let cookie = dispatcher.seal_cookie("local", "t", dep_id, r2, 1, now_secs() + 3600);
        let mut rng = StdRng::seed_from_u64(0);

        let pre = dispatcher
            .dispatch(
                &DispatchRequest {
                    env_id: "local",
                    tenant: "t",
                    deployment_id: dep_id,
                    session_hint: None,
                    trusted: false,
                    header_revision: None,
                    cookie: Some(&cookie),
                },
                &mut rng,
            )
            .await
            .unwrap();
        assert_eq!(pre.revision_id, r2);
        assert_eq!(pre.reason, SelectionReason::Cookie);

        let teardown = Arc::new(RecordingTeardown::new(true));
        let coord =
            RevisionDrainCoordinator::with_noop_ws(Arc::clone(&dispatcher), teardown.clone());
        coord.run(drain_request(dep_id, r2, 0)).await.unwrap();

        let post = dispatcher
            .dispatch(
                &DispatchRequest {
                    env_id: "local",
                    tenant: "t",
                    deployment_id: dep_id,
                    session_hint: None,
                    trusted: false,
                    header_revision: None,
                    cookie: Some(&cookie),
                },
                &mut rng,
            )
            .await
            .unwrap();
        assert_eq!(
            post.revision_id, r1,
            "stale cookie must re-dispatch to healthy r1"
        );
        assert_eq!(post.reason, SelectionReason::Weighted);
        assert!(
            post.set_cookie.is_some(),
            "fresh cookie migrates the session"
        );
    }

    #[tokio::test]
    async fn drain_then_stale_pin_reselects_healthy_revision() {
        use crate::revision_dispatcher::{DispatchRequest, SelectionReason};
        use rand::SeedableRng;
        use rand::rngs::StdRng;

        let dep_id = DeploymentId::new();
        let r1 = RevisionId::new();
        let r2 = RevisionId::new();
        let dispatcher = dispatcher_with(dep_id, vec![entry(r1, 0), entry(r2, 10_000)]);
        let mut rng = StdRng::seed_from_u64(0);

        let pre = dispatcher
            .dispatch(
                &DispatchRequest {
                    env_id: "local",
                    tenant: "t",
                    deployment_id: dep_id,
                    session_hint: Some("sess-pin"),
                    trusted: false,
                    header_revision: None,
                    cookie: None,
                },
                &mut rng,
            )
            .await
            .unwrap();
        assert_eq!(pre.revision_id, r2);

        dispatcher
            .apply_traffic_split(dep_id, vec![entry(r1, 5000), entry(r2, 5000)], bundle(), 1)
            .unwrap();
        let _ = dispatcher
            .dispatch(
                &DispatchRequest {
                    env_id: "local",
                    tenant: "t",
                    deployment_id: dep_id,
                    session_hint: Some("sess-pin"),
                    trusted: false,
                    header_revision: None,
                    cookie: None,
                },
                &mut rng,
            )
            .await
            .unwrap();

        let teardown = Arc::new(RecordingTeardown::new(true));
        let coord =
            RevisionDrainCoordinator::with_noop_ws(Arc::clone(&dispatcher), teardown.clone());
        coord.run(drain_request(dep_id, r2, 0)).await.unwrap();

        let post = dispatcher
            .dispatch(
                &DispatchRequest {
                    env_id: "local",
                    tenant: "t",
                    deployment_id: dep_id,
                    session_hint: Some("sess-pin"),
                    trusted: false,
                    header_revision: None,
                    cookie: None,
                },
                &mut rng,
            )
            .await
            .unwrap();
        assert_eq!(
            post.revision_id, r1,
            "stale pin must re-dispatch to healthy r1"
        );
        assert_eq!(post.reason, SelectionReason::Weighted);
    }

    #[tokio::test]
    async fn drain_does_not_make_pinned_session_flap_across_survivors() {
        use crate::revision_dispatcher::{DispatchRequest, SelectionReason};
        use rand::SeedableRng;
        use rand::rngs::StdRng;

        let dep_id = DeploymentId::new();
        let r1 = RevisionId::new();
        let r2 = RevisionId::new();
        let r3 = RevisionId::new();
        let dispatcher =
            dispatcher_with(dep_id, vec![entry(r1, 0), entry(r2, 0), entry(r3, 10_000)]);
        let mut rng = StdRng::seed_from_u64(0);

        let pre = dispatcher
            .dispatch(
                &DispatchRequest {
                    env_id: "local",
                    tenant: "t",
                    deployment_id: dep_id,
                    session_hint: Some("sess-flap"),
                    trusted: false,
                    header_revision: None,
                    cookie: None,
                },
                &mut rng,
            )
            .await
            .unwrap();
        assert_eq!(pre.revision_id, r3);

        dispatcher
            .apply_traffic_split(
                dep_id,
                vec![entry(r1, 5000), entry(r2, 5000), entry(r3, 0)],
                bundle(),
                1,
            )
            .unwrap();
        dispatcher.mark_draining(dep_id, r3);

        let teardown = Arc::new(RecordingTeardown::new(true));
        let coord =
            RevisionDrainCoordinator::with_noop_ws(Arc::clone(&dispatcher), teardown.clone());
        coord.run(drain_request(dep_id, r3, 0)).await.unwrap();

        let first = dispatcher
            .dispatch(
                &DispatchRequest {
                    env_id: "local",
                    tenant: "t",
                    deployment_id: dep_id,
                    session_hint: Some("sess-flap"),
                    trusted: false,
                    header_revision: None,
                    cookie: None,
                },
                &mut rng,
            )
            .await
            .unwrap();
        assert!(first.revision_id == r1 || first.revision_id == r2);
        let anchored = first.revision_id;

        for _ in 0..50 {
            let next = dispatcher
                .dispatch(
                    &DispatchRequest {
                        env_id: "local",
                        tenant: "t",
                        deployment_id: dep_id,
                        session_hint: Some("sess-flap"),
                        trusted: false,
                        header_revision: None,
                        cookie: None,
                    },
                    &mut rng,
                )
                .await
                .unwrap();
            assert_eq!(
                next.revision_id, anchored,
                "session must stay anchored to one survivor, not flap"
            );
            assert_eq!(next.reason, SelectionReason::Pin);
        }
    }
}