klieo-mcp-server 2.2.0

Expose any klieo ToolInvoker or Agent as an MCP server over stdio or HTTP. The inverse of klieo-tools-mcp.
Documentation
//! Bounded drop-oldest ring used by `HttpFrameSink`.
//!
//! Producer pushes synchronously; if the ring is full, the oldest
//! item is dropped to make room (operator-visible via the per-call
//! `usize` return + the metric emitted at the sink call site).
//! Consumer awaits via `recv`.
//!
//! Multi-producer / single-consumer. `RingSender` is `Clone` so the
//! same ring can be wired into more than one producer site (the
//! `OnceCell`-cached HTTP `GET /mcp` outbound clones the sender to
//! hand a copy to the SSE response builder). `RingReceiver` is NOT
//! `Clone` — a second consumer racing on `recv` is undefined.
//!
//! Storage is `Mutex<VecDeque<T>>` + `Notify`; the mutex is
//! `std::sync::Mutex` because the critical section never awaits.

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

use tokio::sync::Notify;

/// Build a bounded drop-oldest ring and hand back its two endpoints:
/// the producer handle first, the consumer handle second. Both share
/// one reference-counted `RingInner`.
///
/// # Panics
///
/// Panics when `capacity` is zero. A zero-capacity ring would drop
/// every push, which is never the intended configuration.
pub(crate) fn bounded_ring<T>(capacity: usize) -> (RingSender<T>, RingReceiver<T>) {
    assert!(capacity > 0, "BoundedRing capacity must be > 0");
    let shared = Arc::new(RingInner::<T>::new(capacity));
    // Built as a struct literal (not via `Clone`), so the sender count
    // stays at the initial value set by `RingInner::new`.
    let sender = RingSender {
        inner: Arc::clone(&shared),
    };
    let receiver = RingReceiver { inner: shared };
    (sender, receiver)
}

/// State shared between every `RingSender` clone and the single
/// `RingReceiver`.
struct RingInner<T> {
    /// Maximum number of buffered values; a push at this length evicts
    /// the oldest entry first.
    capacity: usize,
    /// FIFO storage: producers append at the back, the consumer pops
    /// from the front.
    buf: Mutex<VecDeque<T>>,
    /// Signalled after every push and when the last sender is dropped.
    notify: Notify,
    /// Running total of values evicted because the ring was full.
    dropped_oldest: AtomicUsize,
    /// How many `RingSender` handles are alive. Once it reaches zero
    /// and the buffer is empty, `recv` yields `None`.
    sender_count: AtomicUsize,
    /// Set when the receiver is dropped. Pushing afterwards does not
    /// panic; the value is discarded and is not added to the
    /// dropped-oldest counter.
    receiver_dropped: AtomicBool,
}

impl<T> RingInner<T> {
    /// Create the shared state for a ring holding at most `capacity`
    /// values. The sender count starts at 1 to account for the handle
    /// that `bounded_ring` creates alongside this state.
    fn new(capacity: usize) -> Self {
        // Allocate the whole buffer up front.
        let storage = VecDeque::with_capacity(capacity);
        Self {
            capacity,
            buf: Mutex::new(storage),
            notify: Notify::new(),
            dropped_oldest: AtomicUsize::new(0),
            sender_count: AtomicUsize::new(1),
            receiver_dropped: AtomicBool::new(false),
        }
    }
}

/// Producer side of the ring. Every clone refers to the same
/// underlying ring, and the number of live clones is tracked so the
/// receiver can tell when the stream has ended.
pub(crate) struct RingSender<T> {
    inner: Arc<RingInner<T>>,
}

impl<T> Clone for RingSender<T> {
    /// Create another producer handle for the same ring, bumping the
    /// live-sender count that `Drop` later decrements.
    fn clone(&self) -> Self {
        let shared = &self.inner;
        shared.sender_count.fetch_add(1, Ordering::SeqCst);
        Self {
            inner: Arc::clone(shared),
        }
    }
}

impl<T> Drop for RingSender<T> {
    /// Decrement the live-sender count and, if this was the final
    /// handle, signal the receiver.
    fn drop(&mut self) {
        let live_before = self.inner.sender_count.fetch_sub(1, Ordering::SeqCst);
        if live_before != 1 {
            // Other senders are still alive; the stream is not over.
            return;
        }
        // That was the last sender: wake a pending `recv` so it can
        // observe the zero count and return `None`.
        self.inner.notify.notify_one();
    }
}

impl<T> RingSender<T> {
    /// Append `value` to the ring without blocking.
    ///
    /// When the ring already holds `capacity` values, the oldest one
    /// is evicted to make room. The return value is the number of
    /// evictions caused by this call: 0 normally, 1 when the ring was
    /// full.
    ///
    /// After the receiver has been dropped the value is discarded and
    /// 0 is returned, because the count reports capacity-overflow
    /// evictions only, not receiver-closed losses.
    pub(crate) fn push(&self, value: T) -> usize {
        let ring = &*self.inner;
        if ring.receiver_dropped.load(Ordering::SeqCst) {
            return 0;
        }
        let evicted = {
            // A poisoned mutex is recovered rather than propagated.
            let mut queue = ring
                .buf
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let was_full = queue.len() >= ring.capacity;
            if was_full {
                queue.pop_front();
                ring.dropped_oldest.fetch_add(1, Ordering::SeqCst);
            }
            queue.push_back(value);
            usize::from(was_full)
        };
        // The guard is released above, so the receiver is signalled
        // without the lock held.
        ring.notify.notify_one();
        evicted
    }

    /// Cumulative number of drop-oldest evictions on this ring.
    /// Compiled for tests only; production code uses the per-call
    /// count returned by [`RingSender::push`].
    #[cfg(test)]
    pub(crate) fn dropped_oldest_count(&self) -> usize {
        let ring = &*self.inner;
        ring.dropped_oldest.load(Ordering::SeqCst)
    }

    /// Reports whether the consumer handle has been dropped, so a
    /// producer can surface `TransportClosed` rather than silently
    /// discarding frames.
    pub(crate) fn is_receiver_dropped(&self) -> bool {
        let ring = &*self.inner;
        ring.receiver_dropped.load(Ordering::SeqCst)
    }
}

/// Consumer handle. Single-consumer expected; cloning is not exposed.
pub(crate) struct RingReceiver<T> {
    inner: Arc<RingInner<T>>,
}

impl<T> Drop for RingReceiver<T> {
    /// Mark the ring as receiver-closed and release anything still
    /// buffered.
    ///
    /// Once the only consumer is gone, buffered values can never be
    /// delivered. Without draining, they would stay alive until the
    /// last `RingSender` is dropped, which may be much later for a
    /// long-lived producer handle.
    fn drop(&mut self) {
        // Set the flag first so pushes that start after this point
        // bail out before touching the buffer.
        self.inner.receiver_dropped.store(true, Ordering::SeqCst);
        let undelivered = {
            let mut buf = self
                .inner
                .buf
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            std::mem::take(&mut *buf)
        };
        // Run the value destructors only after the lock is released,
        // so a slow `T::drop` never stalls a concurrent `push`. A push
        // that passed its flag check just before the store above may
        // still leave one value behind; it is freed with the ring.
        drop(undelivered);
    }
}

impl<T> RingReceiver<T> {
    /// Await the next value in FIFO order.
    ///
    /// Returns `Some(value)` as soon as the buffer holds at least one
    /// item. Returns `None` once the buffer is empty AND every
    /// `RingSender` has been dropped. The buffer is checked before the
    /// sender count, so values still queued when the last sender goes
    /// away are delivered before `None` is reported.
    pub(crate) async fn recv(&mut self) -> Option<T> {
        loop {
            // Create the `Notified` future BEFORE inspecting the buffer
            // so that a push (or the final sender drop) landing between
            // the check and the await below is not missed.
            // NOTE(review): this relies on `Notify::notify_one` leaving
            // a permit when no task is waiting yet — confirm against the
            // tokio documentation for the version in use.
            let notified = self.inner.notify.notified();
            tokio::pin!(notified);
            {
                // Inner scope: the std mutex guard must be released
                // before the await. A poisoned mutex is recovered rather
                // than propagated.
                let mut buf = self
                    .inner
                    .buf
                    .lock()
                    .unwrap_or_else(|poisoned| poisoned.into_inner());
                if let Some(value) = buf.pop_front() {
                    return Some(value);
                }
                // Buffer is empty: the stream is over only when no
                // sender remains to refill it.
                if self.inner.sender_count.load(Ordering::SeqCst) == 0 {
                    return None;
                }
            }
            // Sleep until the next push or last-sender drop, then loop
            // around and re-check both conditions.
            notified.as_mut().await;
        }
    }
}

// Unit tests for the bounded ring: FIFO delivery, drop-oldest on
// overflow, close semantics on sender/receiver drop, and wake-ups.
#[cfg(test)]
mod tests {
    use super::*;

    // Values pushed below capacity come out in push order and no push
    // reports an eviction.
    #[tokio::test]
    async fn push_then_recv_delivers_value_in_order() {
        let (tx, mut rx) = bounded_ring::<i32>(4);
        assert_eq!(tx.push(1), 0);
        assert_eq!(tx.push(2), 0);
        assert_eq!(rx.recv().await, Some(1));
        assert_eq!(rx.recv().await, Some(2));
    }

    // The third push into a capacity-2 ring evicts the oldest value,
    // reports 1, and bumps the cumulative counter.
    #[tokio::test]
    async fn push_at_capacity_drops_oldest() {
        let (tx, mut rx) = bounded_ring::<i32>(2);
        assert_eq!(tx.push(1), 0);
        assert_eq!(tx.push(2), 0);
        assert_eq!(tx.push(3), 1); // capacity hit — drops 1
        assert_eq!(rx.recv().await, Some(2));
        assert_eq!(rx.recv().await, Some(3));
        assert_eq!(tx.dropped_oldest_count(), 1);
    }

    // With the only sender gone and nothing buffered, recv resolves to
    // `None` instead of waiting forever.
    #[tokio::test]
    async fn recv_returns_none_when_sender_dropped_and_ring_empty() {
        let (tx, mut rx) = bounded_ring::<i32>(2);
        drop(tx);
        assert_eq!(rx.recv().await, None);
    }

    // A recv started on an empty ring is woken by a later push and
    // yields the pushed value.
    #[tokio::test]
    async fn recv_awaits_then_completes_on_push() {
        let (tx, mut rx) = bounded_ring::<i32>(2);
        let handle = tokio::spawn(async move { rx.recv().await });
        // Let the spawned task run so it has a chance to start waiting
        // before the push.
        tokio::task::yield_now().await;
        assert_eq!(tx.push(42), 0);
        assert_eq!(handle.await.unwrap(), Some(42));
    }

    #[tokio::test]
    async fn push_after_recv_close_signals_dropped_oldest_zero() {
        // Receiver dropped — sender keeps pushing but everything is lost,
        // not counted as dropped-oldest (different code path).
        let (tx, rx) = bounded_ring::<i32>(2);
        drop(rx);
        assert_eq!(tx.push(1), 0);
        assert_eq!(tx.dropped_oldest_count(), 0);
    }

    // Constructing a zero-capacity ring trips the assertion in
    // `bounded_ring` with the expected message.
    #[test]
    #[should_panic(expected = "BoundedRing capacity must be > 0")]
    fn bounded_ring_panics_on_zero_capacity() {
        let _ = bounded_ring::<i32>(0);
    }

    // Dropping one of two senders must not end the stream; the
    // surviving clone can still deliver a value.
    #[tokio::test]
    async fn recv_returns_none_only_after_last_sender_clone_drops() {
        let (tx, mut rx) = bounded_ring::<i32>(2);
        let tx_clone = tx.clone();

        // Receiver awaits in a background task.
        let recv_handle = tokio::spawn(async move { rx.recv().await });
        tokio::task::yield_now().await;

        // Drop the original sender — one clone still alive, so recv must
        // NOT yet resolve. Give the runtime a chance to wake the task
        // (it shouldn't) and confirm the future is still pending.
        drop(tx);
        tokio::task::yield_now().await;
        tokio::task::yield_now().await;

        // Push something through the surviving clone — this must wake
        // recv and resolve with the value.
        let dropped = tx_clone.push(7);
        assert_eq!(dropped, 0);
        // Bounded wait so a missed wake-up fails the test instead of
        // hanging it.
        let value = tokio::time::timeout(std::time::Duration::from_millis(100), recv_handle)
            .await
            .expect("recv resolved within timeout")
            .expect("recv task did not panic");
        assert_eq!(value, Some(7));
    }

    // Once every sender clone is gone and the ring is empty, recv
    // reports end-of-stream.
    #[tokio::test]
    async fn recv_returns_none_when_all_clones_drop_and_ring_empty() {
        let (tx, mut rx) = bounded_ring::<i32>(2);
        let tx_clone = tx.clone();
        drop(tx);
        drop(tx_clone);
        assert_eq!(rx.recv().await, None);
    }
}