mnemara-server 0.1.0

Local-first, explainable AI memory engine for embedded and service-based systems
Documentation
use std::collections::{BTreeMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use tokio::sync::{OwnedSemaphorePermit, Semaphore};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdmissionClass {
    Read,
    Write,
    Admin,
}

#[derive(Debug, Clone, serde::Serialize)]
pub struct RuntimeAdmissionStatus {
    pub max_inflight_reads: u64,
    pub max_inflight_writes: u64,
    pub max_inflight_admin: u64,
    pub max_queued_requests: u64,
    pub max_tenant_inflight: u64,
    pub queued_total: u64,
    pub inflight_total: u64,
    pub queued_reads: u64,
    pub queued_writes: u64,
    pub queued_admin: u64,
    pub inflight_reads: u64,
    pub inflight_writes: u64,
    pub inflight_admin: u64,
    pub tenant_inflight: BTreeMap<String, u64>,
    pub queue_wait_timeout_ms: u64,
    pub average_queue_wait_ms: u64,
    pub max_queue_wait_ms: u64,
    pub oldest_queue_wait_ms: u64,
    pub oldest_read_queue_wait_ms: u64,
    pub oldest_write_queue_wait_ms: u64,
    pub oldest_admin_queue_wait_ms: u64,
    pub fair_queue_policy: String,
}

#[derive(Debug, Clone)]
pub struct AdmissionConfig {
    pub max_inflight_reads: usize,
    pub max_inflight_writes: usize,
    pub max_inflight_admin: usize,
    pub max_queued_requests: usize,
    pub max_tenant_inflight: usize,
    pub queue_wait_timeout_ms: u64,
}

#[derive(Debug)]
pub struct AdmissionController {
    config: AdmissionConfig,
    reads: Arc<Semaphore>,
    writes: Arc<Semaphore>,
    admin: Arc<Semaphore>,
    state: Mutex<AdmissionState>,
}

#[derive(Debug, Default)]
struct AdmissionState {
    queued_total: u64,
    inflight_total: u64,
    queued_reads: u64,
    queued_writes: u64,
    queued_admin: u64,
    inflight_reads: u64,
    inflight_writes: u64,
    inflight_admin: u64,
    tenant_inflight: BTreeMap<String, u64>,
    queued_read_since: VecDeque<u64>,
    queued_write_since: VecDeque<u64>,
    queued_admin_since: VecDeque<u64>,
    total_queue_wait_ms: u64,
    completed_queue_waits: u64,
    max_queue_wait_ms: u64,
}

#[derive(Debug)]
pub enum AdmissionError {
    QueueFull,
    TimedOut,
    TenantBusy,
}

#[derive(Debug)]
pub struct AdmissionGuard {
    controller: Arc<AdmissionController>,
    class: AdmissionClass,
    tenant_id: Option<String>,
    _permit: OwnedSemaphorePermit,
}

impl AdmissionController {
    pub fn new(config: AdmissionConfig) -> Self {
        Self {
            reads: Arc::new(Semaphore::new(config.max_inflight_reads.max(1))),
            writes: Arc::new(Semaphore::new(config.max_inflight_writes.max(1))),
            admin: Arc::new(Semaphore::new(config.max_inflight_admin.max(1))),
            config,
            state: Mutex::new(AdmissionState::default()),
        }
    }

    pub async fn acquire(
        self: &Arc<Self>,
        class: AdmissionClass,
        tenant_id: Option<&str>,
    ) -> Result<AdmissionGuard, AdmissionError> {
        {
            let mut state = self.state.lock().expect("admission state poisoned");
            if state.queued_total as usize >= self.config.max_queued_requests {
                return Err(AdmissionError::QueueFull);
            }
            let enqueued_at = now_unix_ms();
            state.queued_total += 1;
            match class {
                AdmissionClass::Read => {
                    state.queued_reads += 1;
                    state.queued_read_since.push_back(enqueued_at);
                }
                AdmissionClass::Write => {
                    state.queued_writes += 1;
                    state.queued_write_since.push_back(enqueued_at);
                }
                AdmissionClass::Admin => {
                    state.queued_admin += 1;
                    state.queued_admin_since.push_back(enqueued_at);
                }
            }
        }

        let permit_result = tokio::time::timeout(
            Duration::from_millis(self.config.queue_wait_timeout_ms.max(1)),
            match class {
                AdmissionClass::Read => self.reads.clone().acquire_owned(),
                AdmissionClass::Write => self.writes.clone().acquire_owned(),
                AdmissionClass::Admin => self.admin.clone().acquire_owned(),
            },
        )
        .await;

        let permit = match permit_result {
            Ok(Ok(permit)) => permit,
            Ok(Err(_)) | Err(_) => {
                self.release_queue(class);
                return Err(AdmissionError::TimedOut);
            }
        };

        let tenant_id = tenant_id.map(ToOwned::to_owned);
        self.release_queue(class);
        {
            let mut state = self.state.lock().expect("admission state poisoned");
            if let Some(tenant_id) = tenant_id.as_ref() {
                let current = state.tenant_inflight.get(tenant_id).copied().unwrap_or(0);
                if current as usize >= self.config.max_tenant_inflight {
                    return Err(AdmissionError::TenantBusy);
                }
                state.tenant_inflight.insert(tenant_id.clone(), current + 1);
            }

            state.inflight_total += 1;
            match class {
                AdmissionClass::Read => state.inflight_reads += 1,
                AdmissionClass::Write => state.inflight_writes += 1,
                AdmissionClass::Admin => state.inflight_admin += 1,
            }
        }

        Ok(AdmissionGuard {
            controller: Arc::clone(self),
            class,
            tenant_id,
            _permit: permit,
        })
    }

    pub fn snapshot(&self) -> RuntimeAdmissionStatus {
        let state = self.state.lock().expect("admission state poisoned");
        let now = now_unix_ms();
        RuntimeAdmissionStatus {
            max_inflight_reads: self.config.max_inflight_reads as u64,
            max_inflight_writes: self.config.max_inflight_writes as u64,
            max_inflight_admin: self.config.max_inflight_admin as u64,
            max_queued_requests: self.config.max_queued_requests as u64,
            max_tenant_inflight: self.config.max_tenant_inflight as u64,
            queued_total: state.queued_total,
            inflight_total: state.inflight_total,
            queued_reads: state.queued_reads,
            queued_writes: state.queued_writes,
            queued_admin: state.queued_admin,
            inflight_reads: state.inflight_reads,
            inflight_writes: state.inflight_writes,
            inflight_admin: state.inflight_admin,
            tenant_inflight: state.tenant_inflight.clone(),
            queue_wait_timeout_ms: self.config.queue_wait_timeout_ms,
            average_queue_wait_ms: if state.completed_queue_waits == 0 {
                0
            } else {
                state.total_queue_wait_ms / state.completed_queue_waits
            },
            max_queue_wait_ms: state.max_queue_wait_ms,
            oldest_queue_wait_ms: [
                state
                    .queued_read_since
                    .front()
                    .copied()
                    .map(|started| now.saturating_sub(started)),
                state
                    .queued_write_since
                    .front()
                    .copied()
                    .map(|started| now.saturating_sub(started)),
                state
                    .queued_admin_since
                    .front()
                    .copied()
                    .map(|started| now.saturating_sub(started)),
            ]
            .into_iter()
            .flatten()
            .max()
            .unwrap_or(0),
            oldest_read_queue_wait_ms: state
                .queued_read_since
                .front()
                .copied()
                .map(|started| now.saturating_sub(started))
                .unwrap_or(0),
            oldest_write_queue_wait_ms: state
                .queued_write_since
                .front()
                .copied()
                .map(|started| now.saturating_sub(started))
                .unwrap_or(0),
            oldest_admin_queue_wait_ms: state
                .queued_admin_since
                .front()
                .copied()
                .map(|started| now.saturating_sub(started))
                .unwrap_or(0),
            fair_queue_policy: "fifo-per-class".to_string(),
        }
    }

    fn release_queue(&self, class: AdmissionClass) {
        let mut state = self.state.lock().expect("admission state poisoned");
        let started_at = match class {
            AdmissionClass::Read => {
                state.queued_reads = state.queued_reads.saturating_sub(1);
                state.queued_read_since.pop_front()
            }
            AdmissionClass::Write => {
                state.queued_writes = state.queued_writes.saturating_sub(1);
                state.queued_write_since.pop_front()
            }
            AdmissionClass::Admin => {
                state.queued_admin = state.queued_admin.saturating_sub(1);
                state.queued_admin_since.pop_front()
            }
        };
        state.queued_total = state.queued_total.saturating_sub(1);
        if let Some(started_at) = started_at {
            let waited = now_unix_ms().saturating_sub(started_at);
            state.total_queue_wait_ms = state.total_queue_wait_ms.saturating_add(waited);
            state.completed_queue_waits = state.completed_queue_waits.saturating_add(1);
            state.max_queue_wait_ms = state.max_queue_wait_ms.max(waited);
        }
    }
}

impl Drop for AdmissionGuard {
    fn drop(&mut self) {
        let mut state = self
            .controller
            .state
            .lock()
            .expect("admission state poisoned");
        state.inflight_total = state.inflight_total.saturating_sub(1);
        match self.class {
            AdmissionClass::Read => state.inflight_reads = state.inflight_reads.saturating_sub(1),
            AdmissionClass::Write => {
                state.inflight_writes = state.inflight_writes.saturating_sub(1)
            }
            AdmissionClass::Admin => state.inflight_admin = state.inflight_admin.saturating_sub(1),
        }
        if let Some(tenant_id) = self.tenant_id.as_ref() {
            let current = state.tenant_inflight.get(tenant_id).copied().unwrap_or(1);
            if current <= 1 {
                state.tenant_inflight.remove(tenant_id);
            } else {
                state.tenant_inflight.insert(tenant_id.clone(), current - 1);
            }
        }
    }
}

fn now_unix_ms() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|duration| duration.as_millis() as u64)
        .unwrap_or(0)
}