// klieo-core 0.4.0
//
// Core traits + runtime for the klieo agent framework.
//! Inter-agent bus traits — Pubsub, RequestReply, KvStore, JobQueue.
//!
//! Trait shapes mirror NATS JetStream + KV semantics so the production
//! impl (`klieo-bus-nats`) is a thin adapter, and the in-process impl
//! (`klieo-bus-memory`) can faithfully simulate them. See the spec for
//! reliability invariants.

use crate::error::BusError;
use crate::ids::{DurableName, JobId};
use async_trait::async_trait;
use bytes::Bytes;
use futures_core::Stream;
use std::collections::HashMap;
use std::pin::Pin;
use std::time::Duration;

/// String key/value headers accompanying a bus message.
pub type Headers = HashMap<String, String>;

/// One message delivered to a subscriber.
pub struct Msg {
    /// Subject the message was published on.
    pub subject: String,
    /// Payload bytes.
    pub payload: Bytes,
    /// Headers published alongside the payload.
    pub headers: Headers,
    /// Acknowledgement handle. The bus implementation supplies the
    /// underlying mechanism; callers must invoke exactly one of
    /// `ack` / `nak` / `term` per message or rely on visibility-timeout
    /// redelivery.
    pub ack: AckHandle,
}

impl std::fmt::Debug for Msg {
    /// Summarises the payload by its length instead of printing its bytes.
    /// The `ack` handle is not printed; `finish_non_exhaustive` renders a
    /// trailing `..` so the omission is visible in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Msg")
            .field("subject", &self.subject)
            .field("payload_len", &self.payload.len())
            .field("headers", &self.headers)
            .finish_non_exhaustive()
    }
}

/// Acknowledgement handle attached to a delivered [`Msg`].
///
/// Every resolution method (`ack` / `nak` / `term`) takes `self` by value,
/// so a handle can be used to resolve its message at most once.
pub struct AckHandle(Box<dyn AckHandleImpl>);

impl AckHandle {
    /// Construct from an impl. Bus implementations call this when delivering messages.
    pub fn new(inner: Box<dyn AckHandleImpl>) -> Self {
        Self(inner)
    }

    /// Acknowledge successful processing.
    pub async fn ack(self) -> Result<(), BusError> {
        self.0.ack().await
    }

    /// Negative-ack; the backend redelivers the message after `delay`.
    pub async fn nak(self, delay: Duration) -> Result<(), BusError> {
        self.0.nak(delay).await
    }

    /// Terminate the message so it is not redelivered.
    pub async fn term(self) -> Result<(), BusError> {
        self.0.term().await
    }
}

impl std::fmt::Debug for AckHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The backend is a trait object without a `Debug` bound, so only the
        // wrapper's name is shown.
        f.debug_struct("AckHandle").finish_non_exhaustive()
    }
}

/// Implementor side of an ack handle. Exposed so impls can supply backend
/// behaviour without callers reaching into private types.
#[async_trait]
pub trait AckHandleImpl: Send + Sync {
    /// Acknowledge successful processing.
    async fn ack(self: Box<Self>) -> Result<(), BusError>;
    /// Negative-ack. Triggers redelivery after `delay`.
    async fn nak(self: Box<Self>, delay: Duration) -> Result<(), BusError>;
    /// Terminate the message (do not redeliver).
    async fn term(self: Box<Self>) -> Result<(), BusError>;
}

/// Boxed, `Send` stream yielding each delivered [`Msg`] (or a bus error)
/// for one subscription.
pub type MsgStream = Pin<Box<dyn Stream<Item = Result<Msg, BusError>> + Send + 'static>>;

/// Subject-based publish/subscribe with durable consumers.
///
/// ```
/// # tokio_test::block_on(async {
/// use klieo_core::test_utils::noop_bus;
/// use klieo_core::{Headers, Pubsub};
/// use bytes::Bytes;
/// let (pubsub, ..) = noop_bus();
/// pubsub
///     .publish("subject.demo", Bytes::from_static(b"hi"), Headers::new())
///     .await
///     .unwrap();
/// # });
/// ```
#[async_trait]
pub trait Pubsub: Send + Sync {
    /// Publish `payload` together with `headers` on `subject`.
    async fn publish(
        &self,
        subject: &str,
        payload: Bytes,
        headers: Headers,
    ) -> Result<(), BusError>;

    /// Subscribe to `subject` through the durable consumer `durable`.
    /// Repeated calls using the same `durable` join one competing-consumer
    /// group that shares replays.
    async fn subscribe(&self, subject: &str, durable: DurableName) -> Result<MsgStream, BusError>;
}

/// Request/reply over the bus: send one request and wait for a single reply.
///
/// ```
/// # tokio_test::block_on(async {
/// use klieo_core::test_utils::noop_bus;
/// use klieo_core::{BusError, RequestReply};
/// use bytes::Bytes;
/// use std::time::Duration;
/// let (_, request_reply, ..) = noop_bus();
/// let outcome = request_reply
///     .request("svc.add", Bytes::from_static(b"1"), Duration::from_secs(1))
///     .await;
/// assert!(matches!(outcome, Err(BusError::NotFound(_))));
/// # });
/// ```
#[async_trait]
pub trait RequestReply: Send + Sync {
    /// Send `payload` to `subject` and await exactly one reply, giving up
    /// once `timeout` has elapsed.
    async fn request(
        &self,
        subject: &str,
        payload: Bytes,
        timeout: Duration,
    ) -> Result<Bytes, BusError>;
}

/// CAS revision returned by KV writes.
pub type Revision = u64;

/// One KV entry: the stored value plus the revision it was written at.
///
/// Implements `PartialEq`/`Eq` so entries (for example those returned by
/// [`KvStore::get`]) can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KvEntry {
    /// Stored value.
    pub value: Bytes,
    /// Revision number after the last write.
    pub revision: Revision,
}

/// Bucket-keyed durable KV store with compare-and-set (CAS) writes.
///
/// ```
/// # tokio_test::block_on(async {
/// use klieo_core::test_utils::noop_bus;
/// use klieo_core::KvStore;
/// use bytes::Bytes;
/// let (_, _, kv, _) = noop_bus();
/// let rev = kv.put("bucket", "key", Bytes::from_static(b"v")).await.unwrap();
/// assert_eq!(rev, 1);
/// # });
/// ```
#[async_trait]
pub trait KvStore: Send + Sync {
    /// Read the entry stored under `key` in `bucket`; `Ok(None)` if there is none.
    async fn get(&self, bucket: &str, key: &str) -> Result<Option<KvEntry>, BusError>;

    /// Unconditional write. Returns the new revision.
    async fn put(&self, bucket: &str, key: &str, value: Bytes) -> Result<Revision, BusError>;

    /// Compare-and-set. `expected = None` requires the key to be absent;
    /// `Some(rev)` requires the current revision to equal `rev`. Returns
    /// the new revision on success.
    async fn cas(
        &self,
        bucket: &str,
        key: &str,
        value: Bytes,
        expected: Option<Revision>,
    ) -> Result<Revision, BusError>;

    /// Delete `key` from `bucket`.
    async fn delete(&self, bucket: &str, key: &str) -> Result<(), BusError>;

    /// Acquire an exclusive lease over `key` for `ttl`.
    ///
    /// Implementations should hold the lease for as long as the returned
    /// [`Lease`] is live. The *caller* extends it by calling
    /// [`Lease::heartbeat`] before `ttl` runs out.
    async fn lease(&self, bucket: &str, key: &str, ttl: Duration) -> Result<Lease, BusError>;
}

/// Lease handle granted by a [`KvStore`] or attached to a [`ClaimedJob`].
///
/// Call [`Lease::heartbeat`] to extend the TTL. Dropping the handle drops
/// the boxed [`LeaseImpl`]. This wrapper has no `Drop` logic of its own, so
/// releasing the lease on drop is the backend implementation's job.
pub struct Lease(Box<dyn LeaseImpl>);

impl Lease {
    /// Construct from an impl. KvStore implementations call this when granting a lease.
    pub fn new(inner: Box<dyn LeaseImpl>) -> Self {
        Self(inner)
    }

    /// Extend the TTL.
    pub async fn heartbeat(&self) -> Result<(), BusError> {
        self.0.heartbeat().await
    }
}

impl std::fmt::Debug for Lease {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The backend is a trait object without a `Debug` bound.
        f.debug_struct("Lease").finish_non_exhaustive()
    }
}

/// Implementor side of a lease.
#[async_trait]
pub trait LeaseImpl: Send + Sync {
    /// Extend the TTL.
    async fn heartbeat(&self) -> Result<(), BusError>;
}

/// Job enqueued for durable processing.
///
/// Build one with [`Job::new`] and optionally chain
/// [`Job::with_dedup_key`] / [`Job::with_max_attempts`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Job {
    /// Job payload.
    pub payload: Bytes,
    /// Optional dedup key — if set, the impl writes a `dedup.<queue>`
    /// idempotency record before invoking the handler.
    pub dedup_key: Option<String>,
    /// Maximum redelivery attempts before routing to the DLQ subject.
    /// `None` = use queue default (5).
    pub max_attempts: Option<u32>,
}

impl Job {
    /// Build a job with default settings from raw bytes.
    pub fn new(payload: impl Into<Bytes>) -> Self {
        Self {
            payload: payload.into(),
            dedup_key: None,
            max_attempts: None,
        }
    }

    /// Return this job with `dedup_key` set to `key`.
    #[must_use]
    pub fn with_dedup_key(mut self, key: impl Into<String>) -> Self {
        self.dedup_key = Some(key.into());
        self
    }

    /// Return this job with `max_attempts` set, overriding the queue default.
    #[must_use]
    pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
        self.max_attempts = Some(max_attempts);
        self
    }
}

/// Job claimed by a worker.
pub struct ClaimedJob {
    /// Job id.
    pub id: JobId,
    /// Job payload.
    pub payload: Bytes,
    /// Lease handle. Caller heartbeats; on drop without ack/nak/dlq the
    /// lease expires and the impl redelivers.
    pub lease: Lease,
    /// Internal handle the impl uses to mark the claim resolved.
    pub claim: ClaimHandle,
}

impl std::fmt::Debug for ClaimedJob {
    /// Shows the id and payload length. The `lease` and `claim` handles are
    /// not printed; `finish_non_exhaustive` marks that with a trailing `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ClaimedJob")
            .field("id", &self.id)
            .field("payload_len", &self.payload.len())
            .finish_non_exhaustive()
    }
}

/// Handle the impl uses to resolve a claim.
///
/// Consumed by [`ClaimedJob`]'s `ack`, `nak` and `dead_letter` methods.
pub struct ClaimHandle(Box<dyn ClaimHandleImpl>);

impl ClaimHandle {
    /// Construct from an impl. JobQueue implementations call this when claiming a job.
    pub fn new(inner: Box<dyn ClaimHandleImpl>) -> Self {
        Self(inner)
    }
}

impl std::fmt::Debug for ClaimHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The backend is a trait object without a `Debug` bound.
        f.debug_struct("ClaimHandle").finish_non_exhaustive()
    }
}

/// Implementor side of a claim handle.
#[async_trait]
pub trait ClaimHandleImpl: Send + Sync {
    /// Successful completion. Releases the lease, marks the job done.
    async fn ack(self: Box<Self>) -> Result<(), BusError>;
    /// Retry with backoff.
    async fn nak(self: Box<Self>, delay: Duration) -> Result<(), BusError>;
    /// Send to DLQ.
    async fn dead_letter(self: Box<Self>, reason: &str) -> Result<(), BusError>;
}

impl ClaimedJob {
    /// Heartbeat the lease.
    pub async fn heartbeat(&self) -> Result<(), BusError> {
        self.lease.heartbeat().await
    }
    /// Acknowledge successful completion.
    pub async fn ack(self) -> Result<(), BusError> {
        self.claim.0.ack().await
    }
    /// Negative-ack; retry after `delay`.
    pub async fn nak(self, delay: Duration) -> Result<(), BusError> {
        self.claim.0.nak(delay).await
    }
    /// Route to DLQ.
    pub async fn dead_letter(self, reason: &str) -> Result<(), BusError> {
        self.claim.0.dead_letter(reason).await
    }
}

/// Durable job queue served by competing consumers.
///
/// **Jobs are not delivered in any guaranteed order.** If a workload needs
/// ordering, use [`Pubsub`] on a partitioned subject with a single consumer
/// per partition instead.
///
/// ```
/// # tokio_test::block_on(async {
/// use klieo_core::test_utils::noop_bus;
/// use klieo_core::{Job, JobQueue};
/// use bytes::Bytes;
/// let (.., jobs) = noop_bus();
/// let job = Job::new(Bytes::from_static(b"payload"));
/// let id = jobs.enqueue("queue.work", job).await.unwrap();
/// assert_eq!(id.0, "noop-0");
/// # });
/// ```
#[async_trait]
pub trait JobQueue: Send + Sync {
    /// Add `job` to `queue`, returning a stable identifier for it.
    async fn enqueue(&self, queue: &str, job: Job) -> Result<JobId, BusError>;

    /// Claim the next available job from `queue` on behalf of `worker_id`,
    /// holding it under a lease of `lease_ttl`. Yields `None` when the queue
    /// is empty; implementations may long-poll for a small bounded time
    /// before giving up.
    async fn claim(
        &self,
        queue: &str,
        worker_id: &str,
        lease_ttl: Duration,
    ) -> Result<Option<ClaimedJob>, BusError>;
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles only when `T: Send + Sync`. Because `T` may be unsized,
    /// passing `dyn Trait` additionally requires `Trait` to be dyn-compatible.
    fn assert_send_sync<T: ?Sized + Send + Sync>() {}

    #[test]
    fn bus_traits_are_dyn_compatible_and_thread_safe() {
        assert_send_sync::<dyn Pubsub>();
        assert_send_sync::<dyn RequestReply>();
        assert_send_sync::<dyn KvStore>();
        assert_send_sync::<dyn JobQueue>();
    }

    #[test]
    fn messages_and_handles_are_thread_safe() {
        assert_send_sync::<Msg>();
        assert_send_sync::<AckHandle>();
        assert_send_sync::<Lease>();
        assert_send_sync::<ClaimHandle>();
        assert_send_sync::<ClaimedJob>();
        assert_send_sync::<Job>();
        assert_send_sync::<KvEntry>();
    }
}