slipstream/stores.rs
1use async_trait::async_trait;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::kv::{KvError, KvPurge, KvReader, KvWatcher, KvWriter};
6
7/// Storage type for a store.
8#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
9pub enum StorageType {
10 /// In-memory storage (fast, lost on restart).
11 Memory,
12 /// Persistent storage (survives restarts).
13 #[default]
14 Persistent,
15}
16
17/// What a bounded bucket does when it reaches `max_bytes` (NATS-specific).
18///
19/// This is a real semantic choice, not a tuning knob — it decides what the bucket
20/// gives up at capacity:
21#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
22pub enum DiscardPolicy {
23 /// **Reject** new writes when full, preserving every existing entry (NATS
24 /// `discard:new`). Correct for **config** buckets (certs, configs read as the
25 /// source of truth) where silently dropping a live value is unacceptable — but
26 /// it FREEZES writes at capacity (err 10077). Pair it with `max_age` so the
27 /// bucket is trimmed before it ever fills.
28 #[default]
29 New,
30 /// **Evict the oldest** messages when full (NATS `discard:old`): a hard size
31 /// ceiling that never rejects. Correct for high-churn **log** buckets whose
32 /// consumers hold the durable fold (e.g. routing origins): the bucket is a
33 /// bounded change-feed, not the source of truth, so an evicted entry is
34 /// recovered from the consumer's fold (and the `CursorExpired` resync path),
35 /// while writers never freeze.
36 Old,
37}
38
39impl DiscardPolicy {
40 pub(crate) fn as_nats(self) -> &'static str {
41 match self {
42 DiscardPolicy::New => "new",
43 DiscardPolicy::Old => "old",
44 }
45 }
46}
47
48/// Configuration for creating a store.
49#[derive(Debug, Clone, Default)]
50pub struct StoreConfig {
51 /// Store name/bucket identifier.
52 pub name: String,
53 /// Storage type (memory or persistent).
54 pub storage: StorageType,
55 /// Maximum history/versions to keep (NATS-specific, ignored by other stores).
56 pub max_history: Option<u32>,
57 /// Maximum age for entries in the bucket (bucket-level TTL).
58 /// Entries older than this are automatically removed.
59 /// NATS: maps to `max_age` on bucket config.
60 pub max_age: Option<Duration>,
61 /// Maximum bytes for the bucket (required by Synadia Cloud).
62 /// NATS: maps to `max_bytes` on bucket config.
63 pub max_bytes: Option<i64>,
64 /// Number of stream replicas for the bucket (NATS cluster mode).
65 /// Defaults to 1 (single replica). Set to 3 for production HA clusters.
66 pub num_replicas: Option<usize>,
67 /// Behavior at `max_bytes` (NATS-specific, ignored by other stores). Defaults
68 /// to [`DiscardPolicy::New`] (reject) so config buckets never silently drop a
69 /// live value; set [`DiscardPolicy::Old`] for size-bounded log buckets (routing
70 /// origins) that must never freeze writers. Only applied at bucket *creation* —
71 /// an existing bucket's policy is left untouched.
72 pub discard: DiscardPolicy,
73}
74
75/// A named KV store (bucket/namespace/database).
76pub trait KvStore: Send + Sync {
77 /// The store's name/bucket identifier.
78 fn name(&self) -> &str;
79
80 /// Get the reader interface.
81 fn reader(&self) -> Arc<dyn KvReader>;
82
83 /// Get the watcher interface (if supported).
84 fn watcher(&self) -> Option<Arc<dyn KvWatcher>> {
85 None
86 }
87
88 /// Get the writer interface (if supported).
89 fn writer(&self) -> Option<Arc<dyn KvWriter>> {
90 None
91 }
92
93 /// Get the purge interface (if supported).
94 ///
95 /// Purge reclaims a key's bytes, unlike `writer().delete()` which only
96 /// writes a marker. See [`KvPurge`]. Returns `None` for backends without
97 /// byte-reclaiming purge.
98 fn purge_writer(&self) -> Option<Arc<dyn KvPurge>> {
99 None
100 }
101}
102
103/// Capabilities a store connection may support.
104#[derive(Debug, Clone, Default)]
105pub struct ConnectionCapabilities {
106 /// Supports streaming watch (continuous updates). NATS: true, FDB: false.
107 pub streaming_watch: bool,
108 /// Supports native prefix watch. NATS: true, FDB: false (uses sentinel pattern).
109 pub prefix_watch: bool,
110 /// Supports TTL on keys.
111 pub ttl: bool,
112 /// Supports byte-reclaiming purge (rollup delete). NATS: true.
113 pub purge: bool,
114 /// Supports atomic compare-and-swap.
115 pub cas: bool,
116 /// Supports multi-key transactions.
117 pub transactions: bool,
118 /// Maximum value size in bytes (0 = unlimited).
119 pub max_value_size: usize,
120 /// Global ordering via versionstamps. FDB: true, NATS: false.
121 pub global_ordering: bool,
122}
123
124/// Store connection lifecycle and store factory.
125#[async_trait]
126pub trait Connection: Send + Sync {
127 /// Connect to the store.
128 async fn connect(&self) -> Result<(), KvError>;
129
130 /// Graceful shutdown.
131 async fn shutdown(&self) -> Result<(), KvError>;
132
133 /// Health check - fast, non-blocking.
134 fn is_healthy(&self) -> bool;
135
136 /// Get or create a named store with default configuration.
137 async fn store(&self, name: &str) -> Result<Arc<dyn KvStore>, KvError>;
138
139 /// Get or create a named store with custom configuration.
140 ///
141 /// Use this when you need to specify bucket-level settings like TTL or history.
142 ///
143 /// Config applies only at **creation**. If the bucket already exists, the
144 /// existing one is returned as-is and `config` (max_bytes, num_replicas,
145 /// max_history, max_age, …) is ignored — there is no reconciliation. To change
146 /// settings on a live bucket (e.g. raising `num_replicas` for HA), alter the
147 /// underlying stream out-of-band; calling this with new values is a no-op.
148 async fn store_with_config(&self, config: StoreConfig) -> Result<Arc<dyn KvStore>, KvError>;
149
150 /// Store capabilities for runtime feature detection.
151 fn capabilities(&self) -> ConnectionCapabilities;
152}