mnemara-server 0.1.0

Local-first, explainable AI memory engine for embedded and service-based systems
Documentation
use std::collections::VecDeque;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

use mnemara_core::{OperationTrace, TraceListRequest};

#[derive(Debug, Clone, serde::Serialize)]
pub struct TraceRegistrySnapshot {
    pub stored_traces: u64,
    pub trace_capacity: u64,
    pub evicted_traces: u64,
    pub oldest_started_at_unix_ms: Option<u64>,
    pub newest_started_at_unix_ms: Option<u64>,
}

#[derive(Debug)]
pub struct TraceRegistry {
    capacity: usize,
    sequence: AtomicU64,
    evicted: AtomicU64,
    traces: Mutex<VecDeque<OperationTrace>>,
}

impl TraceRegistry {
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity: capacity.max(1),
            sequence: AtomicU64::new(1),
            evicted: AtomicU64::new(0),
            traces: Mutex::new(VecDeque::new()),
        }
    }

    pub fn next_id(&self, prefix: &str) -> String {
        let now = now_unix_ms();
        let sequence = self.sequence.fetch_add(1, Ordering::Relaxed);
        format!("{prefix}-{now}-{sequence}")
    }

    pub fn record(&self, trace: OperationTrace) -> bool {
        let mut traces = self.traces.lock().expect("trace registry poisoned");
        traces.push_front(trace);
        let mut evicted = false;
        while traces.len() > self.capacity {
            traces.pop_back();
            self.evicted.fetch_add(1, Ordering::Relaxed);
            evicted = true;
        }
        evicted
    }

    pub fn list(&self, request: &TraceListRequest) -> Vec<OperationTrace> {
        let limit = request.limit.unwrap_or(self.capacity).min(self.capacity);
        let operation = request.operation.clone();
        let status = request.status.clone();
        let before_started_at_unix_ms = request.before_started_at_unix_ms;
        self.traces
            .lock()
            .expect("trace registry poisoned")
            .iter()
            .filter(|trace| {
                request
                    .tenant_id
                    .as_deref()
                    .is_none_or(|tenant_id| trace.tenant_id.as_deref() == Some(tenant_id))
            })
            .filter(|trace| {
                request
                    .namespace
                    .as_deref()
                    .is_none_or(|namespace| trace.namespace.as_deref() == Some(namespace))
            })
            .filter(|trace| {
                operation
                    .as_ref()
                    .is_none_or(|value| &trace.operation == value)
            })
            .filter(|trace| status.as_ref().is_none_or(|value| &trace.status == value))
            .filter(|trace| {
                before_started_at_unix_ms.is_none_or(|before| trace.started_at_unix_ms <= before)
            })
            .take(limit)
            .cloned()
            .collect()
    }

    pub fn get(&self, trace_id: &str) -> Option<OperationTrace> {
        self.traces
            .lock()
            .expect("trace registry poisoned")
            .iter()
            .find(|trace| trace.trace_id == trace_id)
            .cloned()
    }

    pub fn snapshot(&self) -> TraceRegistrySnapshot {
        let traces = self.traces.lock().expect("trace registry poisoned");
        TraceRegistrySnapshot {
            stored_traces: traces.len() as u64,
            trace_capacity: self.capacity as u64,
            evicted_traces: self.evicted.load(Ordering::Relaxed),
            oldest_started_at_unix_ms: traces.back().map(|trace| trace.started_at_unix_ms),
            newest_started_at_unix_ms: traces.front().map(|trace| trace.started_at_unix_ms),
        }
    }
}

pub fn now_unix_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_millis() as u64)
        .unwrap_or(0)
}