Skip to main content

fast_cache/
config.rs

1//! Runtime configuration for the embedded store and optional server.
2//!
3//! [`FastCacheConfig`] can be loaded from TOML with
4//! [`FastCacheConfig::load_from_path`] or serialized with
5//! [`FastCacheConfig::store_to_path`]. The defaults are suitable for local
6//! development and derive the shard count and tier sizes from the host when
7//! possible.
8
9use std::path::{Path, PathBuf};
10use std::time::Duration;
11
12use serde::{Deserialize, Serialize};
13
14mod geometry;
15mod validation;
16
17use crate::Result;
18use crate::cuda::CudaConfig;
19
20use geometry::{CacheGeometryDetector, DefaultShardCount, HotTierCapacity};
21use validation::ConfigValidator;
22
23/// Top-level configuration for `fast-cache-server`.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25#[serde(default)]
26pub struct FastCacheConfig {
27    /// Socket address the server should bind, for example `127.0.0.1:6380`.
28    pub bind_addr: String,
29    /// Maximum number of accepted client connections.
30    pub max_connections: usize,
31    /// Number of storage shards to create. Must be a non-zero power of two.
32    pub shard_count: usize,
33    /// Global memory budget in bytes. `0` disables memory-limit eviction.
34    pub max_memory_bytes: u64,
35    /// Policy used when a memory limit is configured.
36    pub eviction_policy: EvictionPolicy,
37    /// Interval between TTL maintenance sweeps.
38    pub ttl_sweep_interval_ms: u64,
39    /// Interval between periodic stats reports.
40    pub stats_interval_ms: u64,
41    /// Per-shard tier sizing.
42    pub tiers: TierConfig,
43    /// GPU-facing configuration values.
44    pub cuda: CudaConfig,
45    /// WAL and snapshot configuration.
46    pub persistence: PersistenceConfig,
47    /// Native mutation-stream replication configuration.
48    pub replication: ReplicationConfig,
49}
50
51/// Memory-limit eviction policy.
52#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
53#[serde(rename_all = "snake_case")]
54pub enum EvictionPolicy {
55    /// Do not evict entries because of memory pressure.
56    #[default]
57    None,
58    /// Evict least-recently-used entries first.
59    Lru,
60    /// Evict least-frequently-used entries first.
61    Lfu,
62}
63
64/// Capacity settings for the hot, warm, and cold in-memory tiers.
65#[derive(Debug, Clone, Serialize, Deserialize)]
66#[serde(default)]
67pub struct TierConfig {
68    /// Target number of entries in the CPU-cache-sized hot tier.
69    pub hot_capacity: usize,
70    /// Target number of entries in the warm tier.
71    pub warm_capacity: usize,
72    /// Target number of entries in the cold tier.
73    pub cold_capacity: usize,
74    /// Maximum number of entries promoted during one maintenance pass.
75    pub promotion_batch: usize,
76}
77
78/// WAL and snapshot persistence settings.
79#[derive(Debug, Clone, Serialize, Deserialize)]
80#[serde(default)]
81pub struct PersistenceConfig {
82    /// Enable WAL and snapshot persistence.
83    pub enabled: bool,
84    /// Directory used for WAL segments and snapshots.
85    pub data_dir: PathBuf,
86    /// Approximate WAL segment size before rotation.
87    pub segment_size_bytes: u64,
88    /// Maximum interval between WAL fsync calls.
89    pub fsync_interval_ms: u64,
90    /// Snapshot cadence in seconds.
91    pub snapshot_every_seconds: u64,
92    /// Minimum writes before a periodic snapshot is considered.
93    pub snapshot_min_writes: u64,
94    /// Compress snapshot files.
95    pub compress_snapshots: bool,
96    /// Compress WAL segments.
97    pub compress_wal: bool,
98    /// Bounded channel capacity for WAL append requests.
99    pub wal_channel_capacity: usize,
100    /// Optional live WAL export over TCP.
101    pub tcp_export: WalTcpExportConfig,
102}
103
104/// Optional live WAL export settings.
105///
106/// The TCP exporter streams the same framed WAL records used on disk. It is a
107/// live feed, not a replay service; disk WAL segments remain the authoritative
108/// recovery source.
109#[derive(Debug, Clone, Serialize, Deserialize)]
110#[serde(default)]
111pub struct WalTcpExportConfig {
112    /// Enable the TCP WAL exporter.
113    pub enabled: bool,
114    /// TCP export mode.
115    pub mode: WalTcpExportMode,
116    /// Address the exporter connects to or listens on, for example `127.0.0.1:7630`.
117    pub addr: String,
118    /// Optional plaintext authentication token.
119    ///
120    /// In `connect` mode, the token is sent to the configured collector before
121    /// WAL frames. In `listen` mode, subscribers must send the token before they
122    /// receive WAL frames.
123    pub auth_token: Option<String>,
124    /// Bounded queue between the disk WAL writer and the TCP exporter.
125    pub channel_capacity: usize,
126    /// Maximum accepted subscribers in `listen` mode.
127    pub max_subscribers: usize,
128    /// Maximum time spent opening one TCP connection attempt.
129    pub connect_timeout_ms: u64,
130    /// Maximum time spent writing a frame before reconnecting.
131    pub write_timeout_ms: u64,
132    /// Delay between reconnect attempts after connect or write failure.
133    pub reconnect_backoff_ms: u64,
134    /// If true, a full TCP export queue backpressures the WAL writer.
135    ///
136    /// If false, frames are dropped from the live TCP export when the exporter
137    /// cannot keep up; disk WAL append still continues.
138    pub backpressure_on_full: bool,
139}
140
141/// Native replication configuration.
142#[derive(Debug, Clone, Serialize, Deserialize)]
143#[serde(default)]
144pub struct ReplicationConfig {
145    /// Enable native mutation-stream replication.
146    pub enabled: bool,
147    /// Runtime role for this process.
148    pub role: ReplicationRole,
149    /// Address a primary listens on for replicas and service subscribers.
150    pub bind_addr: String,
151    /// Primary address a replica connects to.
152    pub replica_of: Option<String>,
153    /// Optional plaintext authentication token for native replication.
154    pub auth_token: Option<String>,
155    /// Compression algorithm for mutation batches and snapshot chunks.
156    pub compression: ReplicationCompression,
157    /// zstd compression level used when `compression = "zstd"`.
158    pub zstd_level: i32,
159    /// Send policy for primary mutation flushes.
160    pub send_policy: ReplicationSendPolicy,
161    /// Maximum records in one mutation batch.
162    pub batch_max_records: usize,
163    /// Maximum uncompressed bytes in one mutation batch.
164    pub batch_max_bytes: usize,
165    /// Maximum time a non-empty batch may wait before flush.
166    pub batch_max_delay_us: u64,
167    /// Approximate retained in-memory backlog size for partial catch-up.
168    pub backlog_bytes: usize,
169    /// Snapshot chunk size before compression.
170    pub snapshot_chunk_bytes: usize,
171    /// Per-shard bounded queue capacity for ready replication batches.
172    ///
173    /// The shard worker builds ordered mutation batches locally. When this
174    /// queue is full, that shard's emitting thread blocks until its exporter
175    /// drains a batch. Increase this if export lanes cannot keep up with
176    /// bursty writes.
177    pub queue_capacity: usize,
178    /// Maximum simultaneously-connected replicas in `listen` mode.
179    pub max_replicas: usize,
180    /// Maximum time spent opening one TCP connect attempt from a replica.
181    pub connect_timeout_ms: u64,
182    /// Per-write timeout for replication TCP I/O.
183    pub write_timeout_ms: u64,
184    /// Delay between reconnect attempts after a replica disconnect.
185    pub reconnect_backoff_ms: u64,
186    /// Per-subscriber outbound channel capacity.
187    pub subscriber_channel_capacity: usize,
188}
189
190/// Native replication role.
191#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
192#[serde(rename_all = "snake_case")]
193pub enum ReplicationRole {
194    /// Emit local writes and serve replicas.
195    #[default]
196    Primary,
197    /// Receive and apply writes from a primary.
198    Replica,
199}
200
201/// Native replication compression.
202#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
203#[serde(rename_all = "snake_case")]
204pub enum ReplicationCompression {
205    /// Do not compress replication payloads.
206    None,
207    /// Compress replication payloads with zstd.
208    #[default]
209    Zstd,
210}
211
212/// Native replication send policy.
213#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
214#[serde(rename_all = "snake_case")]
215pub enum ReplicationSendPolicy {
216    /// Flush each mutation as a one-record batch.
217    Immediate,
218    /// Accumulate mutations until a record, byte, or delay threshold is reached.
219    #[default]
220    Batch,
221}
222
223/// TCP WAL export topology.
224#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
225#[serde(rename_all = "snake_case")]
226pub enum WalTcpExportMode {
227    /// Connect to a single downstream collector and push live WAL frames.
228    Connect,
229    /// Listen for authenticated subscribers and fan out live WAL frames.
230    Listen,
231}
232
233/// Host CPU cache geometry used to derive tier defaults.
234#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
235pub struct CacheGeometry {
236    /// L1 data cache size in bytes.
237    pub l1d_bytes: usize,
238    /// L2 cache size in bytes.
239    pub l2_bytes: usize,
240    /// L3 cache size in bytes.
241    pub l3_bytes: usize,
242}
243
/// Borrowed handle to the path of a TOML configuration file.
///
/// Used internally by `FastCacheConfig::load_from_path` and
/// `FastCacheConfig::store_to_path`.
struct ConfigFile<'a> {
    /// Location of the configuration file on disk.
    path: &'a Path,
}
247
/// Outcome of asking a configuration path for its parent directory.
enum ConfigFileParent<'a> {
    /// The path has a parent. For a bare file name this is the empty path.
    Present(&'a Path),
    /// The path has no parent, for example a filesystem root or an empty path.
    Missing,
}
252
253impl Default for FastCacheConfig {
254    fn default() -> Self {
255        let shard_count = Self::default_shard_count();
256        Self {
257            bind_addr: "127.0.0.1:6380".to_string(),
258            max_connections: 4_096,
259            shard_count,
260            max_memory_bytes: 0,
261            eviction_policy: EvictionPolicy::None,
262            ttl_sweep_interval_ms: 1_000,
263            stats_interval_ms: 5_000,
264            tiers: TierConfig::from_geometry(CacheGeometry::detect_current_host(), shard_count),
265            cuda: CudaConfig::default(),
266            persistence: PersistenceConfig::default(),
267            replication: ReplicationConfig::default(),
268        }
269    }
270}
271
272impl Default for TierConfig {
273    fn default() -> Self {
274        Self::from_geometry(
275            CacheGeometry::detect_current_host(),
276            FastCacheConfig::default_shard_count(),
277        )
278    }
279}
280
281impl TierConfig {
282    /// Builds tier capacities from CPU cache geometry and shard count.
283    pub fn from_geometry(geometry: CacheGeometry, shard_count: usize) -> Self {
284        let hot_capacity = HotTierCapacity::from_l1(geometry.l1d_bytes);
285        let warm_capacity = (geometry.l2_bytes / 160).clamp(1_024, 131_072);
286        let cold_bytes_per_shard = usize::max(
287            geometry.l3_bytes / usize::max(shard_count, 1),
288            2 * 1024 * 1024,
289        );
290        let cold_capacity = (cold_bytes_per_shard / 192).clamp(8_192, 1_000_000);
291
292        Self {
293            hot_capacity,
294            warm_capacity,
295            cold_capacity,
296            promotion_batch: 256,
297        }
298    }
299}
300
301impl Default for PersistenceConfig {
302    fn default() -> Self {
303        Self {
304            enabled: true,
305            data_dir: PathBuf::from("./var/fast-cache"),
306            segment_size_bytes: 64 * 1024 * 1024,
307            fsync_interval_ms: 100,
308            snapshot_every_seconds: 300,
309            snapshot_min_writes: 1_000,
310            compress_snapshots: true,
311            compress_wal: true,
312            wal_channel_capacity: 16_384,
313            tcp_export: WalTcpExportConfig::default(),
314        }
315    }
316}
317
318impl Default for WalTcpExportConfig {
319    fn default() -> Self {
320        Self {
321            enabled: false,
322            mode: WalTcpExportMode::Connect,
323            addr: "127.0.0.1:7630".to_string(),
324            auth_token: None,
325            channel_capacity: 16_384,
326            max_subscribers: 64,
327            connect_timeout_ms: 250,
328            write_timeout_ms: 250,
329            reconnect_backoff_ms: 100,
330            backpressure_on_full: false,
331        }
332    }
333}
334
335impl Default for ReplicationConfig {
336    fn default() -> Self {
337        Self {
338            enabled: false,
339            role: ReplicationRole::Primary,
340            bind_addr: "127.0.0.1:7631".to_string(),
341            replica_of: None,
342            auth_token: None,
343            compression: ReplicationCompression::None,
344            zstd_level: 3,
345            send_policy: ReplicationSendPolicy::Batch,
346            batch_max_records: 512,
347            batch_max_bytes: 1024 * 1024,
348            batch_max_delay_us: 750,
349            backlog_bytes: 64 * 1024 * 1024,
350            snapshot_chunk_bytes: 1024 * 1024,
351            queue_capacity: 16_384,
352            max_replicas: 16,
353            connect_timeout_ms: 500,
354            write_timeout_ms: 500,
355            reconnect_backoff_ms: 200,
356            subscriber_channel_capacity: 1_024,
357        }
358    }
359}
360
361impl FastCacheConfig {
362    /// Returns the default shard count for the current host.
363    pub fn default_shard_count() -> usize {
364        DefaultShardCount::current()
365    }
366
367    /// Loads and validates a TOML configuration file.
368    pub fn load_from_path(path: &Path) -> Result<Self> {
369        let config = ConfigFile::new(path).load()?;
370        config.validate()?;
371        Ok(config)
372    }
373
374    /// Writes the configuration as pretty TOML, creating parent directories.
375    pub fn store_to_path(&self, path: &Path) -> Result<()> {
376        ConfigFile::new(path).store(self)
377    }
378
379    /// Creates directories required by the enabled persistence settings.
380    pub fn ensure_paths(&self) -> Result<()> {
381        self.persistence.ensure_paths()
382    }
383
384    /// Validates cross-field constraints.
385    pub fn validate(&self) -> Result<()> {
386        ConfigValidator::new(self).validate()
387    }
388
389    /// Returns the TTL sweep interval, clamped to at least 1 ms.
390    pub fn ttl_sweep_interval(&self) -> Duration {
391        Duration::from_millis(self.ttl_sweep_interval_ms.max(1))
392    }
393
394    /// Returns the stats interval, clamped to at least 250 ms.
395    pub fn stats_interval(&self) -> Duration {
396        Duration::from_millis(self.stats_interval_ms.max(250))
397    }
398
399    /// Returns the per-shard memory limit implied by `max_memory_bytes`.
400    pub fn per_shard_memory_limit_bytes(&self) -> Option<usize> {
401        match self.max_memory_bytes {
402            0 => None,
403            bytes => {
404                let shard_count = self.shard_count as u64;
405                Some(bytes.div_ceil(shard_count) as usize)
406            }
407        }
408    }
409
410    /// Returns the snapshot interval, clamped to at least 1 second.
411    pub fn snapshot_interval(&self) -> Duration {
412        Duration::from_secs(self.persistence.snapshot_every_seconds.max(1))
413    }
414}
415
416impl CacheGeometry {
417    /// Detects CPU cache geometry from the current host when possible.
418    pub fn detect_current_host() -> Self {
419        CacheGeometryDetector::detect()
420    }
421}
422
423impl<'a> ConfigFile<'a> {
424    fn new(path: &'a Path) -> Self {
425        ConfigFile { path }
426    }
427
428    fn load(&self) -> Result<FastCacheConfig> {
429        let contents = std::fs::read_to_string(self.path)?;
430        Ok(toml::from_str(&contents)?)
431    }
432
433    fn store(&self, config: &FastCacheConfig) -> Result<()> {
434        self.ensure_parent()?;
435        let contents = toml::to_string_pretty(config)?;
436        std::fs::write(self.path, contents)?;
437        Ok(())
438    }
439
440    fn ensure_parent(&self) -> Result<()> {
441        match ConfigFileParent::from_path(self.path) {
442            ConfigFileParent::Present(parent) => {
443                std::fs::create_dir_all(parent)?;
444                Ok(())
445            }
446            ConfigFileParent::Missing => Ok(()),
447        }
448    }
449}
450
451impl<'a> ConfigFileParent<'a> {
452    fn from_path(path: &'a Path) -> Self {
453        match path.parent() {
454            Some(parent) => Self::Present(parent),
455            None => Self::Missing,
456        }
457    }
458}
459
460impl PersistenceConfig {
461    fn ensure_paths(&self) -> Result<()> {
462        match self.enabled {
463            true => {
464                std::fs::create_dir_all(&self.data_dir)?;
465                Ok(())
466            }
467            false => Ok(()),
468        }
469    }
470}
471
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{FastCacheConfig, geometry::CacheSizeParser};

    /// Builds a default config with only `shard_count` overridden.
    fn with_shards(shard_count: usize) -> FastCacheConfig {
        FastCacheConfig {
            shard_count,
            ..FastCacheConfig::default()
        }
    }

    #[test]
    fn parses_cache_sizes() {
        assert_eq!(CacheSizeParser::parse("32K"), Some(32 * 1024));
        assert_eq!(CacheSizeParser::parse("4M"), Some(4 * 1024 * 1024));
        assert_eq!(CacheSizeParser::parse("65536"), Some(65_536));
    }

    #[test]
    fn validates_power_of_two_shard_count() {
        for shard_count in [0, 3, 10, 12] {
            let config = with_shards(shard_count);
            assert!(config.validate().is_err(), "{shard_count} should fail");
        }

        for shard_count in [1, 2, 4, 8, 16, 32] {
            let config = with_shards(shard_count);
            assert!(config.validate().is_ok(), "{shard_count} should pass");
        }
    }

    #[test]
    fn default_shard_count_is_power_of_two() {
        let shard_count = FastCacheConfig::default_shard_count();
        assert!(shard_count > 0);
        assert!(shard_count.is_power_of_two());
    }

    #[test]
    fn per_shard_memory_limit_rounds_up() {
        let unlimited = FastCacheConfig {
            max_memory_bytes: 0,
            ..with_shards(4)
        };
        assert_eq!(unlimited.per_shard_memory_limit_bytes(), None);

        // 10 bytes over 4 shards rounds up to 3 bytes per shard.
        let limited = FastCacheConfig {
            max_memory_bytes: 10,
            ..with_shards(4)
        };
        assert_eq!(limited.per_shard_memory_limit_bytes(), Some(3));

        let exact = FastCacheConfig {
            max_memory_bytes: 1_024,
            ..with_shards(8)
        };
        assert_eq!(exact.per_shard_memory_limit_bytes(), Some(128));
    }

    #[test]
    fn intervals_are_clamped_to_minimums() {
        let mut config = FastCacheConfig {
            ttl_sweep_interval_ms: 0,
            stats_interval_ms: 10,
            ..FastCacheConfig::default()
        };
        config.persistence.snapshot_every_seconds = 0;

        assert_eq!(config.ttl_sweep_interval(), Duration::from_millis(1));
        assert_eq!(config.stats_interval(), Duration::from_millis(250));
        assert_eq!(config.snapshot_interval(), Duration::from_secs(1));
    }
}
504        let shard_count = FastCacheConfig::default_shard_count();
505        assert!(shard_count > 0);
506        assert!(shard_count.is_power_of_two());
507    }
508}