use indexmap::IndexMap;
use meerkat_core::error::AgentError;
use meerkat_core::service::SessionError;
use meerkat_core::types::SessionId;
use std::sync::Mutex;
#[cfg(target_arch = "wasm32")]
use crate::tokio::sync::{OwnedSemaphorePermit, Semaphore};
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use std::sync::Arc;
/// Publicly visible lifecycle phase of a session tracked by [`StagedSessionRegistry`].
///
/// This is the permit-free projection of the registry's internal record; it is
/// what [`StagedSessionRegistry::status`] reports.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MaterializationStatus {
/// The session has a staged record (written by `record_staged`, or restored by
/// `finish_promotion` when a promotion was not completed).
Staged,
/// `begin_promotion` has claimed the staged record and the promotion has not
/// yet been completed or rolled back.
Promoting,
/// The session was admitted directly (`reserve` / `record_active`) or its
/// promotion was completed via `complete_promotion`.
Active,
}
/// Registry-internal per-session state.
///
/// Mirrors [`MaterializationStatus`] variant for variant, except that the
/// staged variant can additionally own the capacity permit for the session.
enum MaterializationRecord {
/// Staged session, together with whatever permit was handed to `record_staged`
/// or `finish_promotion`.
Staged {
/// Permit parked with the record; `None` when the session was staged
/// without one (for example on an unbounded registry).
permit: Option<OwnedSemaphorePermit>,
},
/// A promotion is in flight; the permit (if any) was moved out by `begin_promotion`.
Promoting,
/// The session is active; the record itself holds no permit.
Active,
}
impl MaterializationRecord {
fn status(&self) -> MaterializationStatus {
match self {
Self::Staged { .. } => MaterializationStatus::Staged,
Self::Promoting => MaterializationStatus::Promoting,
Self::Active => MaterializationStatus::Active,
}
}
}
/// Result of a successful [`StagedSessionRegistry::reserve`] call.
pub struct AdmissionOutcome {
/// Status recorded for the session; `reserve` always records `Active`.
pub status: MaterializationStatus,
/// Permit acquired from the capacity gate, or `None` when the registry is
/// unbounded. The caller owns it from here on.
pub permit: Option<OwnedSemaphorePermit>,
}
/// Handed to the caller that wins [`StagedSessionRegistry::begin_promotion`].
pub struct StagedPromotion {
/// Permit moved out of the staged record, or `None` if the record held none.
pub permit: Option<OwnedSemaphorePermit>,
}
/// Pairs a registry handle with the id of a session whose promotion is being
/// resolved; consumed by `commit` or `settle`.
pub(crate) struct PromotionTicket {
/// Registry that owns the session's record.
registry: Arc<StagedSessionRegistry>,
/// Session this ticket refers to.
id: SessionId,
}
impl PromotionTicket {
pub(crate) fn new(registry: Arc<StagedSessionRegistry>, id: SessionId) -> Self {
Self { registry, id }
}
pub(crate) fn commit(self) {
self.registry.complete_promotion(&self.id);
}
pub(crate) fn settle(self, permit: Option<OwnedSemaphorePermit>) {
self.registry.finish_promotion(&self.id, permit);
}
}
/// Tracks the materialization status of sessions and, when bounded, gates
/// admission through a semaphore.
pub struct StagedSessionRegistry {
/// Per-session records, guarded by a standard (blocking) mutex.
inner: Mutex<RegistryInner>,
/// Capacity gate; `None` for an unbounded registry.
capacity: Option<Arc<Semaphore>>,
/// Permit count the gate was created with; used when building the
/// capacity-exhausted error message. `None` for an unbounded registry.
max_sessions: Option<usize>,
}
/// Mutable state kept behind the registry mutex.
#[derive(Default)]
struct RegistryInner {
/// One record per known session id.
records: IndexMap<SessionId, MaterializationRecord>,
}
impl StagedSessionRegistry {
pub fn bounded(max_sessions: usize) -> Self {
Self {
inner: Mutex::new(RegistryInner::default()),
capacity: Some(Arc::new(Semaphore::new(max_sessions))),
max_sessions: Some(max_sessions),
}
}
pub fn unbounded() -> Self {
Self {
inner: Mutex::new(RegistryInner::default()),
capacity: None,
max_sessions: None,
}
}
fn lock(&self) -> std::sync::MutexGuard<'_, RegistryInner> {
self.inner
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
}
fn capacity_exhausted_error(&self) -> SessionError {
let max_sessions = self.max_sessions.unwrap_or(0);
let available = self
.capacity
.as_ref()
.map(|c| c.available_permits())
.unwrap_or(0);
let active = max_sessions.saturating_sub(available);
SessionError::Agent(AgentError::InternalError(format!(
"Max sessions reached ({active}/{max_sessions})"
)))
}
fn try_acquire_permit(&self) -> Result<Option<OwnedSemaphorePermit>, SessionError> {
match self.capacity.as_ref() {
Some(capacity) => match Arc::clone(capacity).try_acquire_owned() {
Ok(permit) => Ok(Some(permit)),
Err(_) => Err(self.capacity_exhausted_error()),
},
None => Ok(None),
}
}
pub fn reserve(&self, id: &SessionId) -> Result<AdmissionOutcome, SessionError> {
let mut inner = self.lock();
let permit = self.try_acquire_permit()?;
inner
.records
.insert(id.clone(), MaterializationRecord::Active);
Ok(AdmissionOutcome {
status: MaterializationStatus::Active,
permit,
})
}
pub fn reserve_capacity(&self) -> Result<Option<OwnedSemaphorePermit>, SessionError> {
let _inner = self.lock();
self.try_acquire_permit()
}
pub fn record_staged(&self, id: &SessionId, permit: Option<OwnedSemaphorePermit>) {
self.lock()
.records
.insert(id.clone(), MaterializationRecord::Staged { permit });
}
pub fn record_active(&self, id: &SessionId) {
self.lock()
.records
.insert(id.clone(), MaterializationRecord::Active);
}
pub fn begin_promotion(&self, id: &SessionId) -> Option<StagedPromotion> {
let mut inner = self.lock();
let record = inner.records.get_mut(id)?;
match record {
MaterializationRecord::Staged { permit } => {
let permit = permit.take();
*record = MaterializationRecord::Promoting;
Some(StagedPromotion { permit })
}
_ => None,
}
}
pub fn complete_promotion(&self, id: &SessionId) {
let mut inner = self.lock();
if let Some(record) = inner.records.get_mut(id)
&& matches!(record, MaterializationRecord::Promoting)
{
*record = MaterializationRecord::Active;
}
}
pub fn finish_promotion(&self, id: &SessionId, permit: Option<OwnedSemaphorePermit>) {
let mut inner = self.lock();
if let Some(record) = inner.records.get_mut(id)
&& matches!(record, MaterializationRecord::Promoting)
{
*record = MaterializationRecord::Staged { permit };
return;
}
drop(permit);
}
pub fn status(&self, id: &SessionId) -> Option<MaterializationStatus> {
self.lock()
.records
.get(id)
.map(MaterializationRecord::status)
}
pub fn forget(&self, id: &SessionId) {
self.lock().records.swap_remove(id);
}
pub fn ensure_capacity_available(&self) -> Result<(), SessionError> {
let Some(capacity) = self.capacity.as_ref() else {
return Ok(());
};
if capacity.available_permits() > 0 {
return Ok(());
}
Err(self.capacity_exhausted_error())
}
}
#[cfg(test)]
// Tests assert through `expect`/`expect_err`, so the panic-related lints are relaxed here.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// Mints a fresh session id.
    fn sid() -> SessionId {
        SessionId::new()
    }

    /// Reserves capacity for a brand-new id and parks the permit under a staged record.
    fn stage_new_session(registry: &StagedSessionRegistry) -> SessionId {
        let id = sid();
        let permit = registry.reserve_capacity().expect("capacity reserved");
        registry.record_staged(&id, permit);
        id
    }

    /// `reserve` hands out a permit and records the session as active.
    #[test]
    fn reserve_consumes_permit_and_records_active_status() {
        let registry = StagedSessionRegistry::bounded(1);
        let session = sid();

        let admission = registry
            .reserve(&session)
            .expect("first reservation admitted");

        assert_eq!(admission.status, MaterializationStatus::Active);
        assert!(
            admission.permit.is_some(),
            "bounded registry hands back the consumed permit"
        );
        assert_eq!(
            registry.status(&session),
            Some(MaterializationStatus::Active)
        );
    }

    /// A second reservation is rejected while the only permit is held.
    #[test]
    fn reserve_fails_closed_when_capacity_exhausted() {
        let registry = StagedSessionRegistry::bounded(1);

        let first = registry
            .reserve(&sid())
            .expect("first reservation admitted");
        assert!(first.permit.is_some());

        let err = registry
            .reserve(&sid())
            .err()
            .expect("second reservation must be rejected, not laundered to success");
        let msg = format!("{err}");
        assert!(
            msg.contains("Max sessions reached"),
            "expected typed capacity-exhaustion error, got: {msg}"
        );
    }

    /// Dropping the admission outcome frees its permit for the next caller.
    #[test]
    fn permit_returns_to_gate_when_outcome_dropped() {
        let registry = StagedSessionRegistry::bounded(1);

        drop(
            registry
                .reserve(&sid())
                .expect("first reservation admitted"),
        );

        registry
            .reserve(&sid())
            .expect("capacity freed after permit drop");
    }

    /// A staged record keeps the permit until the record is forgotten.
    #[test]
    fn record_staged_takes_permit_custody() {
        let registry = StagedSessionRegistry::bounded(1);
        let session = sid();

        let reserved = registry.reserve_capacity().expect("capacity reserved");
        assert!(reserved.is_some());
        registry.record_staged(&session, reserved);
        assert_eq!(
            registry.status(&session),
            Some(MaterializationStatus::Staged)
        );

        registry
            .ensure_capacity_available()
            .expect_err("staged custody keeps the gate closed");

        registry.forget(&session);
        registry
            .ensure_capacity_available()
            .expect("capacity freed when the staged record is forgotten");
    }

    /// Only the first `begin_promotion` on a staged session succeeds, and it receives the permit.
    #[test]
    fn begin_promotion_is_single_winner_and_transfers_custody() {
        let registry = StagedSessionRegistry::bounded(2);
        let session = stage_new_session(&registry);

        let winner = registry
            .begin_promotion(&session)
            .expect("first promotion of a staged reservation wins");
        assert!(
            winner.permit.is_some(),
            "the staged permit's custody transfers to the winner"
        );
        assert_eq!(
            registry.status(&session),
            Some(MaterializationStatus::Promoting)
        );

        assert!(
            registry.begin_promotion(&session).is_none(),
            "second concurrent promotion of the same reservation is rejected"
        );
    }

    /// Unknown and already-active sessions cannot be promoted.
    #[test]
    fn begin_promotion_rejects_non_staged_sessions() {
        let registry = StagedSessionRegistry::bounded(2);
        let session = sid();

        assert!(registry.begin_promotion(&session).is_none());

        let _held = registry.reserve(&session).expect("reservation admitted");
        assert!(registry.begin_promotion(&session).is_none());
    }

    /// Finishing a promotion that was never completed restores the staged record and its permit.
    #[test]
    fn uncommitted_promotion_settles_back_to_staged() {
        let registry = StagedSessionRegistry::bounded(1);
        let session = stage_new_session(&registry);

        let promotion = registry.begin_promotion(&session).expect("promotion wins");
        registry.finish_promotion(&session, promotion.permit);

        assert_eq!(
            registry.status(&session),
            Some(MaterializationStatus::Staged)
        );
        registry
            .ensure_capacity_available()
            .expect_err("restored staged reservation re-takes permit custody");
    }

    /// Finishing after completion leaves the session active and releases the permit.
    #[test]
    fn committed_promotion_settles_to_released_capacity() {
        let registry = StagedSessionRegistry::bounded(1);
        let session = stage_new_session(&registry);

        let promotion = registry.begin_promotion(&session).expect("promotion wins");
        registry.complete_promotion(&session);
        assert_eq!(
            registry.status(&session),
            Some(MaterializationStatus::Active)
        );

        registry.finish_promotion(&session, promotion.permit);
        assert_eq!(
            registry.status(&session),
            Some(MaterializationStatus::Active)
        );
        registry
            .ensure_capacity_available()
            .expect("committed promotion returns capacity to the gate");
    }

    /// `complete_promotion` is a no-op for unknown and merely staged sessions.
    #[test]
    fn complete_promotion_only_transitions_promoting_sessions() {
        let registry = StagedSessionRegistry::bounded(2);
        let session = sid();

        registry.complete_promotion(&session);
        assert_eq!(registry.status(&session), None);

        let reserved = registry.reserve_capacity().expect("capacity reserved");
        registry.record_staged(&session, reserved);
        registry.complete_promotion(&session);
        assert_eq!(
            registry.status(&session),
            Some(MaterializationStatus::Staged)
        );
    }

    /// `forget` removes the session's status entirely.
    #[test]
    fn forget_clears_typed_status() {
        let registry = StagedSessionRegistry::bounded(2);
        let session = sid();

        let _held = registry.reserve(&session).expect("reservation admitted");
        registry.forget(&session);

        assert_eq!(registry.status(&session), None);
    }

    /// The capacity probe errors once the single permit is held.
    #[test]
    fn ensure_capacity_available_fails_closed() {
        let registry = StagedSessionRegistry::bounded(1);
        registry
            .ensure_capacity_available()
            .expect("spare capacity");

        let _held = registry.reserve(&sid()).expect("reservation admitted");

        registry
            .ensure_capacity_available()
            .expect_err("must fail closed once the gate is full");
    }

    /// An unbounded registry admits sessions without issuing permits.
    #[test]
    fn unbounded_registry_admits_without_permit() {
        let registry = StagedSessionRegistry::unbounded();

        let admission = registry
            .reserve(&sid())
            .expect("unbounded registry always admits");
        assert!(
            admission.permit.is_none(),
            "unbounded registry has no permit to hand back"
        );

        registry
            .ensure_capacity_available()
            .expect("unbounded registry never exhausts");
    }

    /// Without permits, promotion is still single-winner based on the recorded status.
    #[test]
    fn unbounded_staged_promotion_is_status_gated_not_permit_gated() {
        let registry = StagedSessionRegistry::unbounded();
        let session = sid();

        registry.record_staged(&session, None);
        assert_eq!(
            registry.status(&session),
            Some(MaterializationStatus::Staged)
        );

        let winner = registry
            .begin_promotion(&session)
            .expect("unbounded staged session promotes by status");
        assert!(winner.permit.is_none());
        assert!(registry.begin_promotion(&session).is_none());

        registry.complete_promotion(&session);
        assert_eq!(
            registry.status(&session),
            Some(MaterializationStatus::Active)
        );
    }
}