Skip to main content

klieo_core/
bus.rs

//! Inter-agent bus traits — [`Pubsub`], [`RequestReply`], [`KvStore`], [`JobQueue`].
//!
//! The trait shapes mirror NATS JetStream + KV semantics, so the production
//! impl (`klieo-bus-nats`) is a thin adapter and the in-process impl
//! (`klieo-bus-memory`) can simulate them faithfully. See the spec for the
//! reliability invariants.
7
8use crate::error::BusError;
9use crate::ids::{DurableName, JobId};
10use async_trait::async_trait;
11use bytes::Bytes;
12use futures_core::Stream;
13use std::collections::HashMap;
14use std::pin::Pin;
15use std::time::Duration;
16
/// Key/value headers carried alongside a bus message.
///
/// Both keys and values are plain strings; this module attaches no meaning
/// to any particular key.
pub type Headers = HashMap<String, String>;
19
20/// One message delivered to a subscriber.
21pub struct Msg {
22    /// Subject the message was published on.
23    pub subject: String,
24    /// Payload bytes.
25    pub payload: Bytes,
26    /// Headers.
27    pub headers: Headers,
28    /// Acknowledgement handle. The impl provides the underlying mechanism;
29    /// callers must invoke exactly one of `ack` / `nak` / `term` per
30    /// message or rely on visibility-timeout redelivery.
31    pub ack: AckHandle,
32}
33
34impl std::fmt::Debug for Msg {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        f.debug_struct("Msg")
37            .field("subject", &self.subject)
38            .field("payload_len", &self.payload.len())
39            .field("headers", &self.headers)
40            .finish()
41    }
42}
43
44/// Acknowledgement handle attached to a delivered [`Msg`].
45pub struct AckHandle(Box<dyn AckHandleImpl>);
46
47impl AckHandle {
48    /// Construct from an impl. Bus implementations call this when delivering messages.
49    pub fn new(inner: Box<dyn AckHandleImpl>) -> Self {
50        Self(inner)
51    }
52}
53
54/// Implementor side of an ack handle. Exposed so impls can supply backend
55/// behaviour without callers reaching into private types.
56#[async_trait]
57pub trait AckHandleImpl: Send + Sync {
58    /// Acknowledge successful processing.
59    async fn ack(self: Box<Self>) -> Result<(), BusError>;
60    /// Negative-ack with optional redelivery delay. Triggers redelivery
61    /// after `delay`.
62    async fn nak(self: Box<Self>, delay: Duration) -> Result<(), BusError>;
63    /// Terminate the message (do not redeliver).
64    async fn term(self: Box<Self>) -> Result<(), BusError>;
65}
66
67impl AckHandle {
68    /// Acknowledge.
69    pub async fn ack(self) -> Result<(), BusError> {
70        self.0.ack().await
71    }
72    /// Negative-ack.
73    pub async fn nak(self, delay: Duration) -> Result<(), BusError> {
74        self.0.nak(delay).await
75    }
76    /// Terminate.
77    pub async fn term(self) -> Result<(), BusError> {
78        self.0.term().await
79    }
80}
81
82/// Stream of messages delivered to a subscriber.
83pub type MsgStream = Pin<Box<dyn Stream<Item = Result<Msg, BusError>> + Send + 'static>>;
84
85/// Pub/sub interface (subject-based, durable consumers).
86///
87/// ```
88/// # tokio_test::block_on(async {
89/// use klieo_core::test_utils::noop_bus;
90/// use klieo_core::{Headers, Pubsub};
91/// use bytes::Bytes;
92/// let (pubsub, _, _, _) = noop_bus();
93/// pubsub.publish("subject.demo", Bytes::from_static(b"hi"), Headers::new())
94///     .await.unwrap();
95/// # });
96/// ```
97#[async_trait]
98pub trait Pubsub: Send + Sync {
99    /// Publish `payload` on `subject` with `headers`.
100    async fn publish(
101        &self,
102        subject: &str,
103        payload: Bytes,
104        headers: Headers,
105    ) -> Result<(), BusError>;
106
107    /// Subscribe with a durable consumer name. Multiple calls with the
108    /// same `durable` form a competing-consumer group sharing replays.
109    async fn subscribe(&self, subject: &str, durable: DurableName) -> Result<MsgStream, BusError>;
110}
111
112/// Synchronous request/response over the bus.
113///
114/// ```
115/// # tokio_test::block_on(async {
116/// use klieo_core::test_utils::noop_bus;
117/// use klieo_core::{BusError, RequestReply};
118/// use bytes::Bytes;
119/// use std::time::Duration;
120/// let (_, request_reply, _, _) = noop_bus();
121/// let err = request_reply
122///     .request("svc.add", Bytes::from_static(b"1"), Duration::from_secs(1))
123///     .await
124///     .unwrap_err();
125/// assert!(matches!(err, BusError::NotFound(_)));
126/// # });
127/// ```
128#[async_trait]
129pub trait RequestReply: Send + Sync {
130    /// Send a request and await one reply, bounded by `timeout`.
131    async fn request(
132        &self,
133        subject: &str,
134        payload: Bytes,
135        timeout: Duration,
136    ) -> Result<Bytes, BusError>;
137}
138
/// Revision number produced by KV writes and checked by compare-and-set.
pub type Revision = u64;
141
142/// One KV entry.
143#[derive(Debug, Clone)]
144pub struct KvEntry {
145    /// Stored value.
146    pub value: Bytes,
147    /// Revision number after the last write.
148    pub revision: Revision,
149}
150
151/// Bucket-keyed durable KV store with CAS.
152///
153/// ```
154/// # tokio_test::block_on(async {
155/// use klieo_core::test_utils::noop_bus;
156/// use klieo_core::KvStore;
157/// use bytes::Bytes;
158/// let (_, _, kv, _) = noop_bus();
159/// let rev = kv.put("bucket", "key", Bytes::from_static(b"v")).await.unwrap();
160/// assert_eq!(rev, 1);
161/// # });
162/// ```
163#[async_trait]
164pub trait KvStore: Send + Sync {
165    /// Read a value.
166    async fn get(&self, bucket: &str, key: &str) -> Result<Option<KvEntry>, BusError>;
167
168    /// Unconditional write. Returns the new revision.
169    async fn put(&self, bucket: &str, key: &str, value: Bytes) -> Result<Revision, BusError>;
170
171    /// Compare-and-set. `expected = None` requires the key to be absent;
172    /// `Some(rev)` requires the current revision to equal `rev`. Returns
173    /// the new revision on success.
174    async fn cas(
175        &self,
176        bucket: &str,
177        key: &str,
178        value: Bytes,
179        expected: Option<Revision>,
180    ) -> Result<Revision, BusError>;
181
182    /// Delete a key.
183    async fn delete(&self, bucket: &str, key: &str) -> Result<(), BusError>;
184
185    /// Acquire an exclusive lease over `key` for `ttl`. Implementations
186    /// should hold the lease as long as the returned [`Lease`] is live
187    /// and call its `heartbeat` to extend.
188    async fn lease(&self, bucket: &str, key: &str, ttl: Duration) -> Result<Lease, BusError>;
189}
190
191/// Lease handle. Heartbeat to extend; drop to release.
192pub struct Lease(Box<dyn LeaseImpl>);
193
194impl Lease {
195    /// Construct from an impl. KvStore implementations call this when granting a lease.
196    pub fn new(inner: Box<dyn LeaseImpl>) -> Self {
197        Self(inner)
198    }
199}
200
201/// Implementor side of a lease.
202#[async_trait]
203pub trait LeaseImpl: Send + Sync {
204    /// Extend the TTL.
205    async fn heartbeat(&self) -> Result<(), BusError>;
206}
207
208impl Lease {
209    /// Extend the TTL.
210    pub async fn heartbeat(&self) -> Result<(), BusError> {
211        self.0.heartbeat().await
212    }
213}
214
215/// Job enqueued for durable processing.
216#[derive(Debug, Clone)]
217pub struct Job {
218    /// Job payload.
219    pub payload: Bytes,
220    /// Optional dedup key — if set, the impl writes a `dedup.<queue>`
221    /// idempotency record before invoking the handler.
222    pub dedup_key: Option<String>,
223    /// Maximum redelivery attempts before routing to the DLQ subject.
224    /// `None` = use queue default (5).
225    pub max_attempts: Option<u32>,
226}
227
228impl Job {
229    /// Build a job with default settings from raw bytes.
230    pub fn new(payload: impl Into<Bytes>) -> Self {
231        Self {
232            payload: payload.into(),
233            dedup_key: None,
234            max_attempts: None,
235        }
236    }
237}
238
239/// Job claimed by a worker.
240pub struct ClaimedJob {
241    /// Job id.
242    pub id: JobId,
243    /// Job payload.
244    pub payload: Bytes,
245    /// Lease handle. Caller heartbeats; on drop without ack/nak/dlq the
246    /// lease expires and the impl redelivers.
247    pub lease: Lease,
248    /// Internal handle the impl uses to mark the claim resolved.
249    pub claim: ClaimHandle,
250}
251
252impl std::fmt::Debug for ClaimedJob {
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        f.debug_struct("ClaimedJob")
255            .field("id", &self.id)
256            .field("payload_len", &self.payload.len())
257            .finish()
258    }
259}
260
261/// Handle the impl uses to resolve a claim.
262pub struct ClaimHandle(Box<dyn ClaimHandleImpl>);
263
264impl ClaimHandle {
265    /// Construct from an impl. JobQueue implementations call this when claiming a job.
266    pub fn new(inner: Box<dyn ClaimHandleImpl>) -> Self {
267        Self(inner)
268    }
269}
270
271/// Implementor side of a claim handle.
272#[async_trait]
273pub trait ClaimHandleImpl: Send + Sync {
274    /// Successful completion. Releases the lease, marks the job done.
275    async fn ack(self: Box<Self>) -> Result<(), BusError>;
276    /// Retry with backoff.
277    async fn nak(self: Box<Self>, delay: Duration) -> Result<(), BusError>;
278    /// Send to DLQ.
279    async fn dead_letter(self: Box<Self>, reason: &str) -> Result<(), BusError>;
280}
281
282impl ClaimedJob {
283    /// Heartbeat the lease.
284    pub async fn heartbeat(&self) -> Result<(), BusError> {
285        self.lease.heartbeat().await
286    }
287    /// Acknowledge successful completion.
288    pub async fn ack(self) -> Result<(), BusError> {
289        self.claim.0.ack().await
290    }
291    /// Negative-ack; retry after `delay`.
292    pub async fn nak(self, delay: Duration) -> Result<(), BusError> {
293        self.claim.0.nak(delay).await
294    }
295    /// Route to DLQ.
296    pub async fn dead_letter(self, reason: &str) -> Result<(), BusError> {
297        self.claim.0.dead_letter(reason).await
298    }
299}
300
301/// Durable competing-consumer job queue. **No ordering guarantee** —
302/// workloads requiring order must use [`Pubsub`] on a partitioned subject
303/// with one consumer per partition instead.
304///
305/// ```
306/// # tokio_test::block_on(async {
307/// use klieo_core::test_utils::noop_bus;
308/// use klieo_core::{Job, JobQueue};
309/// use bytes::Bytes;
310/// let (_, _, _, jobs) = noop_bus();
311/// let id = jobs.enqueue("queue.work", Job::new(Bytes::from_static(b"payload"))).await.unwrap();
312/// assert_eq!(id.0, "noop-0");
313/// # });
314/// ```
315#[async_trait]
316pub trait JobQueue: Send + Sync {
317    /// Enqueue a job. Returns a stable id.
318    async fn enqueue(&self, queue: &str, job: Job) -> Result<JobId, BusError>;
319
320    /// Claim the next available job. Returns `None` when the queue is
321    /// empty. Implementations may long-poll up to a small bounded
322    /// duration before returning `None`.
323    async fn claim(
324        &self,
325        queue: &str,
326        worker_id: &str,
327        lease_ttl: Duration,
328    ) -> Result<Option<ClaimedJob>, BusError>;
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    // Compile-time object-safety checks. Each helper takes a bus trait as
    // `&dyn Trait`, so this module stops compiling if a trait ever loses
    // dyn-compatibility. The helpers are never called, hence `dead_code`.
    #[allow(dead_code)]
    fn _pubsub_is_object_safe(_: &dyn Pubsub) {}
    #[allow(dead_code)]
    fn _request_reply_is_object_safe(_: &dyn RequestReply) {}
    #[allow(dead_code)]
    fn _kv_store_is_object_safe(_: &dyn KvStore) {}
    #[allow(dead_code)]
    fn _job_queue_is_object_safe(_: &dyn JobQueue) {}
}