use std::path::PathBuf;
use std::time::Duration;
use super::backend_registry::BackendKey;
pub const DEFAULT_BROADCAST_ACK_TIMEOUT: Duration = Duration::from_secs(5);
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum BroadcastOperation {
ReleaseHandles {
path_prefix: PathBuf,
},
Quiesce {
reason: QuiesceReason,
},
}
impl BroadcastOperation {
pub fn release_handles(path_prefix: impl Into<PathBuf>) -> Self {
Self::ReleaseHandles {
path_prefix: path_prefix.into(),
}
}
pub fn quiesce(reason: QuiesceReason) -> Self {
Self::Quiesce { reason }
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QuiesceReason {
IdleTimeout,
BrokerShutdown,
Maintenance,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct BroadcastPolicy {
pub ack_timeout: Duration,
}
impl BroadcastPolicy {
pub fn new(ack_timeout: Duration) -> Self {
Self {
ack_timeout: if ack_timeout.is_zero() {
Duration::from_millis(1)
} else {
ack_timeout
},
}
}
}
impl Default for BroadcastPolicy {
fn default() -> Self {
Self {
ack_timeout: DEFAULT_BROADCAST_ACK_TIMEOUT,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BroadcastBackend {
pub key: BackendKey,
live: bool,
response: BroadcastBackendResponse,
received: Vec<BroadcastOperation>,
}
impl BroadcastBackend {
pub fn live(key: BackendKey) -> Self {
Self {
key,
live: true,
response: BroadcastBackendResponse::Ack,
received: Vec::new(),
}
}
pub fn dead(key: BackendKey) -> Self {
Self {
key,
live: false,
response: BroadcastBackendResponse::Ack,
received: Vec::new(),
}
}
pub fn with_response(mut self, response: BroadcastBackendResponse) -> Self {
self.response = response;
self
}
pub fn set_live(&mut self, live: bool) {
self.live = live;
}
pub fn is_live(&self) -> bool {
self.live
}
pub fn received_operations(&self) -> &[BroadcastOperation] {
&self.received
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BroadcastBackendResponse {
Ack,
Timeout,
Failure(BroadcastFailureReason),
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BroadcastFailureReason {
UnsupportedOperation,
Rejected,
BackendError,
}
#[derive(Debug)]
pub struct LifecycleBroadcastModel {
policy: BroadcastPolicy,
backends: Vec<BroadcastBackend>,
}
impl LifecycleBroadcastModel {
pub fn new() -> Self {
Self::with_policy(BroadcastPolicy::default())
}
pub fn with_policy(policy: BroadcastPolicy) -> Self {
Self {
policy,
backends: Vec::new(),
}
}
pub fn register_backend(&mut self, backend: BroadcastBackend) -> Option<BroadcastBackend> {
if let Some(existing) = self
.backends
.iter_mut()
.find(|existing| existing.key == backend.key)
{
return Some(std::mem::replace(existing, backend));
}
self.backends.push(backend);
None
}
pub fn backend(&self, key: &BackendKey) -> Option<&BroadcastBackend> {
self.backends.iter().find(|backend| &backend.key == key)
}
pub fn backends(&self) -> &[BroadcastBackend] {
&self.backends
}
pub fn release_handles_under_path(
&mut self,
path_prefix: impl Into<PathBuf>,
) -> BroadcastResult {
self.broadcast(BroadcastOperation::release_handles(path_prefix))
}
pub fn quiesce(&mut self, reason: QuiesceReason) -> BroadcastResult {
self.broadcast(BroadcastOperation::quiesce(reason))
}
pub fn broadcast(&mut self, operation: BroadcastOperation) -> BroadcastResult {
let mut result = BroadcastResult::new(operation.clone());
for backend in &mut self.backends {
if !backend.live {
result.skipped_dead.push(backend.key.clone());
continue;
}
backend.received.push(operation.clone());
match backend.response {
BroadcastBackendResponse::Ack => {
result.acks.push(BroadcastAck {
key: backend.key.clone(),
});
}
BroadcastBackendResponse::Timeout => {
result.timeouts.push(BroadcastTimeout {
key: backend.key.clone(),
timeout: self.policy.ack_timeout,
});
}
BroadcastBackendResponse::Failure(reason) => {
result.failures.push(BroadcastFailure {
key: backend.key.clone(),
reason,
});
}
}
}
result
}
}
impl Default for LifecycleBroadcastModel {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BroadcastResult {
pub operation: BroadcastOperation,
pub acks: Vec<BroadcastAck>,
pub timeouts: Vec<BroadcastTimeout>,
pub failures: Vec<BroadcastFailure>,
pub skipped_dead: Vec<BackendKey>,
}
impl BroadcastResult {
fn new(operation: BroadcastOperation) -> Self {
Self {
operation,
acks: Vec::new(),
timeouts: Vec::new(),
failures: Vec::new(),
skipped_dead: Vec::new(),
}
}
pub fn sent_count(&self) -> usize {
self.acks.len() + self.timeouts.len() + self.failures.len()
}
pub fn all_live_backends_acked(&self) -> bool {
self.timeouts.is_empty() && self.failures.is_empty()
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BroadcastAck {
pub key: BackendKey,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BroadcastTimeout {
pub key: BackendKey,
pub timeout: Duration,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BroadcastFailure {
pub key: BackendKey,
pub reason: BroadcastFailureReason,
}