use std::sync::Arc;
use oxirs_core::sla::SlaClass;
use super::admission::{ClusterAdmissionController, SlaError};
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReaderOutcome {
Admitted,
Rejected(SlaError),
}
#[derive(Debug, Clone)]
pub struct SlaReaderGate {
controller: Arc<ClusterAdmissionController>,
}
impl SlaReaderGate {
pub fn new(controller: Arc<ClusterAdmissionController>) -> Self {
Self { controller }
}
pub fn controller(&self) -> &Arc<ClusterAdmissionController> {
&self.controller
}
pub fn try_acquire(&self, class: SlaClass) -> Result<(), SlaError> {
self.controller.acquire_permit(class)
}
pub fn admit(&self, class: SlaClass) -> ReaderOutcome {
match self.controller.acquire_permit(class) {
Ok(()) => ReaderOutcome::Admitted,
Err(e) => ReaderOutcome::Rejected(e),
}
}
pub fn try_acquire_with_cost(&self, class: SlaClass, cost: f64) -> Result<(), SlaError> {
self.controller.acquire_permit_with_cost(class, cost)
}
pub fn release(&self, class: SlaClass) -> Result<(), SlaError> {
self.controller.release_permit(class)
}
pub fn scoped<F, T>(&self, class: SlaClass, f: F) -> Result<T, SlaError>
where
F: FnOnce() -> T,
{
self.controller.scoped_admit(class, f)
}
pub fn in_flight(&self, class: SlaClass) -> Result<usize, SlaError> {
self.controller.in_flight(class)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sla::admission::{SlaAdmissionConfig, SlaClassQuota};
use std::collections::HashMap;
fn small_gate() -> SlaReaderGate {
let mut quotas = HashMap::new();
quotas.insert(
SlaClass::Silver,
SlaClassQuota {
max_qps: None,
max_concurrent: 2,
token_cost: 1.0,
},
);
let controller = Arc::new(ClusterAdmissionController::new(SlaAdmissionConfig {
quotas,
}));
SlaReaderGate::new(controller)
}
#[test]
fn admit_succeeds_within_cap() {
let gate = small_gate();
assert_eq!(gate.admit(SlaClass::Silver), ReaderOutcome::Admitted);
assert_eq!(gate.admit(SlaClass::Silver), ReaderOutcome::Admitted);
}
#[test]
fn admit_blocked_after_cap() {
let gate = small_gate();
gate.try_acquire(SlaClass::Silver).expect("1");
gate.try_acquire(SlaClass::Silver).expect("2");
let outcome = gate.admit(SlaClass::Silver);
match outcome {
ReaderOutcome::Rejected(SlaError::ConcurrencyCapExceeded { limit, .. }) => {
assert_eq!(limit, 2);
}
other => panic!("unexpected outcome: {:?}", other),
}
}
#[test]
fn cost_aware_admit_drains_bucket() {
let gate = small_gate();
gate.try_acquire_with_cost(SlaClass::Silver, 10.0)
.expect("expensive read");
assert_eq!(gate.in_flight(SlaClass::Silver).expect("count"), 1);
gate.release(SlaClass::Silver).expect("release");
}
#[test]
fn cost_aware_admit_rejects_unregistered() {
let gate = small_gate();
let res = gate.try_acquire_with_cost(SlaClass::Bronze, 1.0);
assert!(matches!(res, Err(SlaError::ClassNotRegistered { .. })));
}
#[test]
fn scoped_releases_on_normal_return() {
let gate = small_gate();
gate.scoped(SlaClass::Silver, || ()).expect("scoped");
assert_eq!(gate.in_flight(SlaClass::Silver).expect("count"), 0);
}
}