// slipstream/stores.rs

1use async_trait::async_trait;
2use std::sync::Arc;
3use std::time::Duration;
4
5use crate::kv::{KvError, KvPurge, KvReader, KvWatcher, KvWriter};
6
/// Where a store keeps its data.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum StorageType {
    /// Held in memory: fast, but the contents are lost on restart.
    Memory,
    /// Persisted, so the contents survive restarts. This is the default.
    #[default]
    Persistent,
}
16
/// Full-bucket behavior: what a size-bounded bucket does once it reaches
/// `max_bytes` (NATS-specific).
///
/// This is a real semantic choice rather than a tuning knob, because it decides
/// what the bucket gives up at capacity: either new writes are refused, or the
/// oldest data is dropped.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum DiscardPolicy {
    /// NATS `discard:new` — **reject** new writes once full, preserving every
    /// existing entry. This is the default. Correct for **config** buckets (certs,
    /// configs read as the source of truth), where silently dropping a live value
    /// is unacceptable. The cost is that writes FREEZE at capacity (err 10077), so
    /// pair it with `max_age` to have the bucket trimmed before it ever fills.
    #[default]
    New,
    /// NATS `discard:old` — **evict the oldest** messages once full, giving a hard
    /// size ceiling that never rejects a write. Correct for high-churn **log**
    /// buckets whose consumers hold the durable fold (e.g. routing origins): the
    /// bucket is a bounded change-feed, not the source of truth, so an evicted
    /// entry is recovered from the consumer's fold (and the `CursorExpired`
    /// resync path), and writers never freeze.
    Old,
}
38
39impl DiscardPolicy {
40    pub(crate) fn as_nats(self) -> &'static str {
41        match self {
42            DiscardPolicy::New => "new",
43            DiscardPolicy::Old => "old",
44        }
45    }
46}
47
48/// Configuration for creating a store.
49#[derive(Debug, Clone, Default)]
50pub struct StoreConfig {
51    /// Store name/bucket identifier.
52    pub name: String,
53    /// Storage type (memory or persistent).
54    pub storage: StorageType,
55    /// Maximum history/versions to keep (NATS-specific, ignored by other stores).
56    pub max_history: Option<u32>,
57    /// Maximum age for entries in the bucket (bucket-level TTL).
58    /// Entries older than this are automatically removed.
59    /// NATS: maps to `max_age` on bucket config.
60    pub max_age: Option<Duration>,
61    /// Maximum bytes for the bucket (required by Synadia Cloud).
62    /// NATS: maps to `max_bytes` on bucket config.
63    pub max_bytes: Option<i64>,
64    /// Number of stream replicas for the bucket (NATS cluster mode).
65    /// Defaults to 1 (single replica). Set to 3 for production HA clusters.
66    pub num_replicas: Option<usize>,
67    /// Behavior at `max_bytes` (NATS-specific, ignored by other stores). Defaults
68    /// to [`DiscardPolicy::New`] (reject) so config buckets never silently drop a
69    /// live value; set [`DiscardPolicy::Old`] for size-bounded log buckets (routing
70    /// origins) that must never freeze writers. Only applied at bucket *creation* —
71    /// an existing bucket's policy is left untouched.
72    pub discard: DiscardPolicy,
73}
74
75/// A named KV store (bucket/namespace/database).
76pub trait KvStore: Send + Sync {
77    /// The store's name/bucket identifier.
78    fn name(&self) -> &str;
79
80    /// Get the reader interface.
81    fn reader(&self) -> Arc<dyn KvReader>;
82
83    /// Get the watcher interface (if supported).
84    fn watcher(&self) -> Option<Arc<dyn KvWatcher>> {
85        None
86    }
87
88    /// Get the writer interface (if supported).
89    fn writer(&self) -> Option<Arc<dyn KvWriter>> {
90        None
91    }
92
93    /// Get the purge interface (if supported).
94    ///
95    /// Purge reclaims a key's bytes, unlike `writer().delete()` which only
96    /// writes a marker. See [`KvPurge`]. Returns `None` for backends without
97    /// byte-reclaiming purge.
98    fn purge_writer(&self) -> Option<Arc<dyn KvPurge>> {
99        None
100    }
101}
102
/// Feature flags describing what a store connection may support.
#[derive(Clone, Debug, Default)]
pub struct ConnectionCapabilities {
    /// Streaming watch (continuous updates) is available. NATS: true, FDB: false.
    pub streaming_watch: bool,
    /// Native prefix watch is available. NATS: true, FDB: false (uses sentinel
    /// pattern).
    pub prefix_watch: bool,
    /// Keys may carry a TTL.
    pub ttl: bool,
    /// Byte-reclaiming purge (rollup delete) is available. NATS: true.
    pub purge: bool,
    /// Atomic compare-and-swap is available.
    pub cas: bool,
    /// Multi-key transactions are available.
    pub transactions: bool,
    /// Largest allowed value, in bytes (0 = unlimited).
    pub max_value_size: usize,
    /// Global ordering via versionstamps is available. FDB: true, NATS: false.
    pub global_ordering: bool,
}
123
124/// Store connection lifecycle and store factory.
125#[async_trait]
126pub trait Connection: Send + Sync {
127    /// Connect to the store.
128    async fn connect(&self) -> Result<(), KvError>;
129
130    /// Graceful shutdown.
131    async fn shutdown(&self) -> Result<(), KvError>;
132
133    /// Health check - fast, non-blocking.
134    fn is_healthy(&self) -> bool;
135
136    /// Get or create a named store with default configuration.
137    async fn store(&self, name: &str) -> Result<Arc<dyn KvStore>, KvError>;
138
139    /// Get or create a named store with custom configuration.
140    ///
141    /// Use this when you need to specify bucket-level settings like TTL or history.
142    ///
143    /// Config applies only at **creation**. If the bucket already exists, the
144    /// existing one is returned as-is and `config` (max_bytes, num_replicas,
145    /// max_history, max_age, …) is ignored — there is no reconciliation. To change
146    /// settings on a live bucket (e.g. raising `num_replicas` for HA), alter the
147    /// underlying stream out-of-band; calling this with new values is a no-op.
148    async fn store_with_config(&self, config: StoreConfig) -> Result<Arc<dyn KvStore>, KvError>;
149
150    /// Store capabilities for runtime feature detection.
151    fn capabilities(&self) -> ConnectionCapabilities;
152}