klieo_core/bus.rs
1//! Inter-agent bus traits — Pubsub, RequestReply, KvStore, JobQueue.
2//!
3//! Trait shapes mirror NATS JetStream + KV semantics so the production
4//! impl (`klieo-bus-nats`) is a thin adapter, and the in-process impl
5//! (`klieo-bus-memory`) can faithfully simulate them. See the spec for
6//! reliability invariants.
7
8use crate::error::BusError;
9use crate::ids::{DurableName, JobId};
10use async_trait::async_trait;
11use bytes::Bytes;
12use futures_core::Stream;
13use std::collections::HashMap;
14use std::pin::Pin;
15use std::time::Duration;
16
17/// Opaque headers accompanying a bus message.
18pub type Headers = HashMap<String, String>;
19
20/// One message delivered to a subscriber.
21pub struct Msg {
22 /// Subject the message was published on.
23 pub subject: String,
24 /// Payload bytes.
25 pub payload: Bytes,
26 /// Headers.
27 pub headers: Headers,
28 /// Acknowledgement handle. The impl provides the underlying mechanism;
29 /// callers must invoke exactly one of `ack` / `nak` / `term` per
30 /// message or rely on visibility-timeout redelivery.
31 pub ack: AckHandle,
32}
33
34impl std::fmt::Debug for Msg {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 f.debug_struct("Msg")
37 .field("subject", &self.subject)
38 .field("payload_len", &self.payload.len())
39 .field("headers", &self.headers)
40 .finish()
41 }
42}
43
44/// Acknowledgement handle attached to a delivered [`Msg`].
45pub struct AckHandle(Box<dyn AckHandleImpl>);
46
47impl AckHandle {
48 /// Construct from an impl. Bus implementations call this when delivering messages.
49 pub fn new(inner: Box<dyn AckHandleImpl>) -> Self {
50 Self(inner)
51 }
52}
53
54/// Implementor side of an ack handle. Exposed so impls can supply backend
55/// behaviour without callers reaching into private types.
56#[async_trait]
57pub trait AckHandleImpl: Send + Sync {
58 /// Acknowledge successful processing.
59 async fn ack(self: Box<Self>) -> Result<(), BusError>;
60 /// Negative-ack with optional redelivery delay. Triggers redelivery
61 /// after `delay`.
62 async fn nak(self: Box<Self>, delay: Duration) -> Result<(), BusError>;
63 /// Terminate the message (do not redeliver).
64 async fn term(self: Box<Self>) -> Result<(), BusError>;
65}
66
67impl AckHandle {
68 /// Acknowledge.
69 pub async fn ack(self) -> Result<(), BusError> {
70 self.0.ack().await
71 }
72 /// Negative-ack.
73 pub async fn nak(self, delay: Duration) -> Result<(), BusError> {
74 self.0.nak(delay).await
75 }
76 /// Terminate.
77 pub async fn term(self) -> Result<(), BusError> {
78 self.0.term().await
79 }
80}
81
82/// Stream of messages delivered to a subscriber.
83pub type MsgStream = Pin<Box<dyn Stream<Item = Result<Msg, BusError>> + Send + 'static>>;
84
85/// Pub/sub interface (subject-based, durable consumers).
86///
87/// ```
88/// # tokio_test::block_on(async {
89/// use klieo_core::test_utils::noop_bus;
90/// use klieo_core::{Headers, Pubsub};
91/// use bytes::Bytes;
92/// let (pubsub, _, _, _) = noop_bus();
93/// pubsub.publish("subject.demo", Bytes::from_static(b"hi"), Headers::new())
94/// .await.unwrap();
95/// # });
96/// ```
97#[async_trait]
98pub trait Pubsub: Send + Sync {
99 /// Publish `payload` on `subject` with `headers`.
100 async fn publish(
101 &self,
102 subject: &str,
103 payload: Bytes,
104 headers: Headers,
105 ) -> Result<(), BusError>;
106
107 /// Subscribe with a durable consumer name. Multiple calls with the
108 /// same `durable` form a competing-consumer group sharing replays.
109 async fn subscribe(&self, subject: &str, durable: DurableName) -> Result<MsgStream, BusError>;
110}
111
112/// Synchronous request/response over the bus.
113///
114/// ```
115/// # tokio_test::block_on(async {
116/// use klieo_core::test_utils::noop_bus;
117/// use klieo_core::{BusError, RequestReply};
118/// use bytes::Bytes;
119/// use std::time::Duration;
120/// let (_, request_reply, _, _) = noop_bus();
121/// let err = request_reply
122/// .request("svc.add", Bytes::from_static(b"1"), Duration::from_secs(1))
123/// .await
124/// .unwrap_err();
125/// assert!(matches!(err, BusError::NotFound(_)));
126/// # });
127/// ```
128#[async_trait]
129pub trait RequestReply: Send + Sync {
130 /// Send a request and await one reply, bounded by `timeout`.
131 async fn request(
132 &self,
133 subject: &str,
134 payload: Bytes,
135 timeout: Duration,
136 ) -> Result<Bytes, BusError>;
137}
138
139/// CAS revision returned by KV writes.
140pub type Revision = u64;
141
142/// One KV entry.
143#[derive(Debug, Clone)]
144pub struct KvEntry {
145 /// Stored value.
146 pub value: Bytes,
147 /// Revision number after the last write.
148 pub revision: Revision,
149}
150
151/// Bucket-keyed durable KV store with CAS.
152///
153/// ```
154/// # tokio_test::block_on(async {
155/// use klieo_core::test_utils::noop_bus;
156/// use klieo_core::KvStore;
157/// use bytes::Bytes;
158/// let (_, _, kv, _) = noop_bus();
159/// let rev = kv.put("bucket", "key", Bytes::from_static(b"v")).await.unwrap();
160/// assert_eq!(rev, 1);
161/// # });
162/// ```
163#[async_trait]
164pub trait KvStore: Send + Sync {
165 /// Read a value.
166 async fn get(&self, bucket: &str, key: &str) -> Result<Option<KvEntry>, BusError>;
167
168 /// Unconditional write. Returns the new revision.
169 async fn put(&self, bucket: &str, key: &str, value: Bytes) -> Result<Revision, BusError>;
170
171 /// Compare-and-set. `expected = None` requires the key to be absent;
172 /// `Some(rev)` requires the current revision to equal `rev`. Returns
173 /// the new revision on success.
174 async fn cas(
175 &self,
176 bucket: &str,
177 key: &str,
178 value: Bytes,
179 expected: Option<Revision>,
180 ) -> Result<Revision, BusError>;
181
182 /// Delete a key.
183 async fn delete(&self, bucket: &str, key: &str) -> Result<(), BusError>;
184
185 /// Acquire an exclusive lease over `key` for `ttl`. Implementations
186 /// should hold the lease as long as the returned [`Lease`] is live
187 /// and call its `heartbeat` to extend.
188 async fn lease(&self, bucket: &str, key: &str, ttl: Duration) -> Result<Lease, BusError>;
189}
190
191/// Lease handle. Heartbeat to extend; drop to release.
192pub struct Lease(Box<dyn LeaseImpl>);
193
194impl Lease {
195 /// Construct from an impl. KvStore implementations call this when granting a lease.
196 pub fn new(inner: Box<dyn LeaseImpl>) -> Self {
197 Self(inner)
198 }
199}
200
201/// Implementor side of a lease.
202#[async_trait]
203pub trait LeaseImpl: Send + Sync {
204 /// Extend the TTL.
205 async fn heartbeat(&self) -> Result<(), BusError>;
206}
207
208impl Lease {
209 /// Extend the TTL.
210 pub async fn heartbeat(&self) -> Result<(), BusError> {
211 self.0.heartbeat().await
212 }
213}
214
215/// Job enqueued for durable processing.
216#[derive(Debug, Clone)]
217pub struct Job {
218 /// Job payload.
219 pub payload: Bytes,
220 /// Optional dedup key — if set, the impl writes a `dedup.<queue>`
221 /// idempotency record before invoking the handler.
222 pub dedup_key: Option<String>,
223 /// Maximum redelivery attempts before routing to the DLQ subject.
224 /// `None` = use queue default (5).
225 pub max_attempts: Option<u32>,
226}
227
228impl Job {
229 /// Build a job with default settings from raw bytes.
230 pub fn new(payload: impl Into<Bytes>) -> Self {
231 Self {
232 payload: payload.into(),
233 dedup_key: None,
234 max_attempts: None,
235 }
236 }
237}
238
239/// Job claimed by a worker.
240pub struct ClaimedJob {
241 /// Job id.
242 pub id: JobId,
243 /// Job payload.
244 pub payload: Bytes,
245 /// Lease handle. Caller heartbeats; on drop without ack/nak/dlq the
246 /// lease expires and the impl redelivers.
247 pub lease: Lease,
248 /// Internal handle the impl uses to mark the claim resolved.
249 pub claim: ClaimHandle,
250}
251
252impl std::fmt::Debug for ClaimedJob {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 f.debug_struct("ClaimedJob")
255 .field("id", &self.id)
256 .field("payload_len", &self.payload.len())
257 .finish()
258 }
259}
260
261/// Handle the impl uses to resolve a claim.
262pub struct ClaimHandle(Box<dyn ClaimHandleImpl>);
263
264impl ClaimHandle {
265 /// Construct from an impl. JobQueue implementations call this when claiming a job.
266 pub fn new(inner: Box<dyn ClaimHandleImpl>) -> Self {
267 Self(inner)
268 }
269}
270
271/// Implementor side of a claim handle.
272#[async_trait]
273pub trait ClaimHandleImpl: Send + Sync {
274 /// Successful completion. Releases the lease, marks the job done.
275 async fn ack(self: Box<Self>) -> Result<(), BusError>;
276 /// Retry with backoff.
277 async fn nak(self: Box<Self>, delay: Duration) -> Result<(), BusError>;
278 /// Send to DLQ.
279 async fn dead_letter(self: Box<Self>, reason: &str) -> Result<(), BusError>;
280}
281
282impl ClaimedJob {
283 /// Heartbeat the lease.
284 pub async fn heartbeat(&self) -> Result<(), BusError> {
285 self.lease.heartbeat().await
286 }
287 /// Acknowledge successful completion.
288 pub async fn ack(self) -> Result<(), BusError> {
289 self.claim.0.ack().await
290 }
291 /// Negative-ack; retry after `delay`.
292 pub async fn nak(self, delay: Duration) -> Result<(), BusError> {
293 self.claim.0.nak(delay).await
294 }
295 /// Route to DLQ.
296 pub async fn dead_letter(self, reason: &str) -> Result<(), BusError> {
297 self.claim.0.dead_letter(reason).await
298 }
299}
300
301/// Durable competing-consumer job queue. **No ordering guarantee** —
302/// workloads requiring order must use [`Pubsub`] on a partitioned subject
303/// with one consumer per partition instead.
304///
305/// ```
306/// # tokio_test::block_on(async {
307/// use klieo_core::test_utils::noop_bus;
308/// use klieo_core::{Job, JobQueue};
309/// use bytes::Bytes;
310/// let (_, _, _, jobs) = noop_bus();
311/// let id = jobs.enqueue("queue.work", Job::new(Bytes::from_static(b"payload"))).await.unwrap();
312/// assert_eq!(id.0, "noop-0");
313/// # });
314/// ```
315#[async_trait]
316pub trait JobQueue: Send + Sync {
317 /// Enqueue a job. Returns a stable id.
318 async fn enqueue(&self, queue: &str, job: Job) -> Result<JobId, BusError>;
319
320 /// Claim the next available job. Returns `None` when the queue is
321 /// empty. Implementations may long-poll up to a small bounded
322 /// duration before returning `None`.
323 async fn claim(
324 &self,
325 queue: &str,
326 worker_id: &str,
327 lease_ttl: Duration,
328 ) -> Result<Option<ClaimedJob>, BusError>;
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles only if `T` is `Send + Sync`. `?Sized` admits `dyn Trait`.
    fn assert_send_sync<T: Send + Sync + ?Sized>() {}

    /// Naming `dyn Trait` fails to compile when the trait is not
    /// object-safe, so this guards both object safety and the `Send + Sync`
    /// supertraits of every bus trait.
    #[test]
    fn bus_traits_are_object_safe_and_thread_safe() {
        assert_send_sync::<dyn Pubsub>();
        assert_send_sync::<dyn RequestReply>();
        assert_send_sync::<dyn KvStore>();
        assert_send_sync::<dyn JobQueue>();
    }

    /// The handle wrappers box `Send + Sync` trait objects; guard that they
    /// keep inheriting those auto traits.
    #[test]
    fn handle_wrappers_are_thread_safe() {
        assert_send_sync::<Msg>();
        assert_send_sync::<AckHandle>();
        assert_send_sync::<Lease>();
        assert_send_sync::<ClaimHandle>();
    }
}