shardmap 0.3.0

Sharded embedded in-memory map with optional cache, protocol, and server internals
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
//! Runtime configuration for the embedded store and optional server.
//!
//! [`ShardCacheConfig`] can be loaded from TOML with
//! [`ShardCacheConfig::load_from_path`] or serialized with
//! [`ShardCacheConfig::store_to_path`]. The defaults are suitable for local
//! development and derive the shard count and tier sizes from the host when
//! possible.

use std::path::{Path, PathBuf};
use std::time::Duration;

use serde::{Deserialize, Serialize};

mod geometry;
mod validation;

use crate::Result;
use crate::cuda::CudaConfig;

use geometry::{CacheGeometryDetector, DefaultShardCount, HotTierCapacity};
use validation::ConfigValidator;

/// Top-level configuration for `shardcache`.
///
/// The struct is annotated with `#[serde(default)]`, so fields omitted from a
/// serialized configuration fall back to the values produced by this type's
/// `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ShardCacheConfig {
    /// Socket address the server should bind, for example `127.0.0.1:6380`.
    pub bind_addr: String,
    /// Maximum number of accepted client connections.
    pub max_connections: usize,
    /// Number of storage shards to create. Must be a non-zero power of two.
    pub shard_count: usize,
    /// Global memory budget in bytes. `0` disables memory-limit eviction.
    ///
    /// `per_shard_memory_limit_bytes` and `total_memory_limit_bytes` both
    /// return `None` while this is `0`.
    pub max_memory_bytes: u64,
    /// Policy used when a memory limit is configured.
    pub eviction_policy: EvictionPolicy,
    /// Interval between TTL maintenance sweeps, in milliseconds.
    ///
    /// The `ttl_sweep_interval` accessor raises values below 1 ms to 1 ms.
    pub ttl_sweep_interval_ms: u64,
    /// Interval between periodic stats reports, in milliseconds.
    ///
    /// The `stats_interval` accessor raises values below 250 ms to 250 ms.
    pub stats_interval_ms: u64,
    /// Per-shard tier sizing.
    pub tiers: TierConfig,
    /// GPU-facing configuration values.
    pub cuda: CudaConfig,
    /// WAL and snapshot configuration.
    pub persistence: PersistenceConfig,
    /// Native mutation-stream replication configuration.
    pub replication: ReplicationConfig,
    /// Redis transaction execution mode.
    pub transaction_mode: TransactionMode,
    /// Public server endpoint topology.
    ///
    /// `Fanout` exposes one listener. Caller-owned embedded stores route
    /// fanout requests to shard owners; standalone direct servers keep their
    /// compatibility fanout behavior. `DirectShard` also exposes shard-owned
    /// ports for clients that can route directly.
    pub server_endpoint_mode: ServerEndpointMode,
}

/// Memory-limit eviction policy.
///
/// Variants are (de)serialized in `snake_case`, e.g. `"none"`, `"lru"`,
/// `"lfu"`. The derived `Default` is [`EvictionPolicy::None`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum EvictionPolicy {
    /// Do not evict entries because of memory pressure.
    #[default]
    None,
    /// Evict least-recently-used entries first.
    Lru,
    /// Evict least-frequently-used entries first.
    Lfu,
    /// Evict cold prefix groups first, then cold suffix blocks inside that group.
    ///
    /// Only compiled in when the `prefix-eviction` feature is enabled.
    #[cfg(feature = "prefix-eviction")]
    Prefix,
}

/// Redis-compatible transaction execution policy.
///
/// Variants are (de)serialized in `snake_case`: `"disabled"`,
/// `"shard_local"`, `"coordinated_cross_shard"`. The derived `Default` is
/// [`TransactionMode::ShardLocal`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum TransactionMode {
    /// Reject MULTI/EXEC/DISCARD.
    Disabled,
    /// Allow transactions only when all queued keys route to one shard.
    #[default]
    ShardLocal,
    /// Coordinate transactions across all affected shards using router-level gates.
    CoordinatedCrossShard,
}

/// Public server endpoint topology.
///
/// Variants are (de)serialized in `snake_case`: `"fanout"` and
/// `"direct_shard"`. The derived `Default` is [`ServerEndpointMode::Fanout`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum ServerEndpointMode {
    /// One public listener.
    #[default]
    Fanout,
    /// Expose shard-owned direct ports in addition to the fanout listener.
    DirectShard,
}

/// Capacity settings for the hot, warm, and cold in-memory tiers.
///
/// With `#[serde(default)]`, omitted fields take the values of the `Default`
/// implementation, which derives them through `TierConfig::from_geometry`
/// from the detected host cache geometry and the default shard count.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct TierConfig {
    /// Target number of entries in the CPU-cache-sized hot tier.
    pub hot_capacity: usize,
    /// Target number of entries in the warm tier.
    pub warm_capacity: usize,
    /// Target number of entries in the cold tier.
    pub cold_capacity: usize,
    /// Maximum number of entries promoted during one maintenance pass.
    ///
    /// `TierConfig::from_geometry` always sets this to `256`.
    pub promotion_batch: usize,
}

/// WAL and snapshot persistence settings.
///
/// With `#[serde(default)]`, omitted fields take the values of the `Default`
/// implementation (persistence enabled, data under `./var/shardcache`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct PersistenceConfig {
    /// Enable WAL and snapshot persistence.
    pub enabled: bool,
    /// Directory used for WAL segments and snapshots.
    ///
    /// Created by `ensure_paths` when `enabled` is true.
    pub data_dir: PathBuf,
    /// Approximate WAL segment size before rotation.
    pub segment_size_bytes: u64,
    /// Maximum interval between WAL fsync calls.
    pub fsync_interval_ms: u64,
    /// Snapshot cadence in seconds.
    ///
    /// `ShardCacheConfig::snapshot_interval` raises `0` to one second.
    pub snapshot_every_seconds: u64,
    /// Minimum writes before a periodic snapshot is considered.
    pub snapshot_min_writes: u64,
    /// Compress snapshot files.
    pub compress_snapshots: bool,
    /// Compress WAL segments.
    pub compress_wal: bool,
    /// Bounded channel capacity for WAL append requests.
    pub wal_channel_capacity: usize,
    /// Optional live WAL export over TCP.
    pub tcp_export: WalTcpExportConfig,
}

/// Optional live WAL export settings.
///
/// The TCP exporter streams the same framed WAL records used on disk. It is a
/// live feed, not a replay service; disk WAL segments remain the authoritative
/// recovery source.
///
/// With `#[serde(default)]`, omitted fields take the values of the `Default`
/// implementation, in which the exporter is disabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WalTcpExportConfig {
    /// Enable the TCP WAL exporter.
    pub enabled: bool,
    /// TCP export mode.
    pub mode: WalTcpExportMode,
    /// Address the exporter connects to or listens on, for example `127.0.0.1:7630`.
    pub addr: String,
    /// Optional plaintext authentication token.
    ///
    /// In `connect` mode, the token is sent to the configured collector before
    /// WAL frames. In `listen` mode, subscribers must send the token before they
    /// receive WAL frames.
    pub auth_token: Option<String>,
    /// Bounded queue between the disk WAL writer and the TCP exporter.
    pub channel_capacity: usize,
    /// Maximum accepted subscribers in `listen` mode.
    pub max_subscribers: usize,
    /// Maximum time spent opening one TCP connection attempt, in milliseconds.
    pub connect_timeout_ms: u64,
    /// Maximum time spent writing a frame before reconnecting, in milliseconds.
    pub write_timeout_ms: u64,
    /// Delay between reconnect attempts after connect or write failure, in
    /// milliseconds.
    pub reconnect_backoff_ms: u64,
    /// If true, a full TCP export queue backpressures the WAL writer.
    ///
    /// If false, frames are dropped from the live TCP export when the exporter
    /// cannot keep up; disk WAL append still continues.
    pub backpressure_on_full: bool,
}

/// Native replication configuration.
///
/// With `#[serde(default)]`, omitted fields take the values of the `Default`
/// implementation, in which replication is disabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ReplicationConfig {
    /// Enable native mutation-stream replication.
    pub enabled: bool,
    /// Runtime role for this process.
    pub role: ReplicationRole,
    /// Address a primary listens on for replicas and service subscribers.
    pub bind_addr: String,
    /// Primary address a replica connects to.
    pub replica_of: Option<String>,
    /// Optional plaintext authentication token for native replication.
    pub auth_token: Option<String>,
    /// Compression algorithm for mutation batches and snapshot chunks.
    pub compression: ReplicationCompression,
    /// zstd compression level used when `compression = "zstd"`.
    pub zstd_level: i32,
    /// Send policy for primary mutation flushes.
    pub send_policy: ReplicationSendPolicy,
    /// Maximum records in one mutation batch.
    pub batch_max_records: usize,
    /// Maximum uncompressed bytes in one mutation batch.
    pub batch_max_bytes: usize,
    /// Maximum time, in microseconds, a non-empty batch may wait before flush.
    pub batch_max_delay_us: u64,
    /// Approximate retained in-memory backlog size for partial catch-up.
    pub backlog_bytes: usize,
    /// Snapshot chunk size before compression.
    pub snapshot_chunk_bytes: usize,
    /// Per-shard bounded queue capacity for ready replication batches.
    ///
    /// The shard worker builds ordered mutation batches locally. When this
    /// queue is full, that shard's emitting thread blocks until its exporter
    /// drains a batch. Increase this if export lanes cannot keep up with
    /// bursty writes.
    pub queue_capacity: usize,
    /// Maximum simultaneously-connected replicas.
    ///
    /// NOTE(review): this doc previously said "in `listen` mode", but
    /// replication only defines the `primary` and `replica` roles; presumably
    /// the limit applies to a primary accepting replicas. Confirm.
    pub max_replicas: usize,
    /// Maximum time, in milliseconds, spent opening one TCP connect attempt
    /// from a replica.
    pub connect_timeout_ms: u64,
    /// Per-write timeout for replication TCP I/O, in milliseconds.
    pub write_timeout_ms: u64,
    /// Delay, in milliseconds, between reconnect attempts after a replica
    /// disconnect.
    pub reconnect_backoff_ms: u64,
    /// Per-subscriber outbound channel capacity.
    pub subscriber_channel_capacity: usize,
}

/// Native replication role.
///
/// Variants are (de)serialized in `snake_case`: `"primary"` and `"replica"`.
/// The derived `Default` is [`ReplicationRole::Primary`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum ReplicationRole {
    /// Emit local writes and serve replicas.
    #[default]
    Primary,
    /// Receive and apply writes from a primary.
    Replica,
}

/// Native replication compression.
///
/// Variants are (de)serialized in `snake_case`: `"none"` and `"zstd"`.
///
/// NOTE(review): the derived `Default` for this enum is
/// [`ReplicationCompression::Zstd`], while `ReplicationConfig::default()` sets
/// `compression` to `None`. Since `ReplicationConfig` uses `#[serde(default)]`
/// at the struct level, an omitted `compression` key resolves to `None`, not to
/// this enum's default. Confirm which of the two is intended.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum ReplicationCompression {
    /// Do not compress replication payloads.
    None,
    /// Compress replication payloads with zstd.
    #[default]
    Zstd,
}

/// Native replication send policy.
///
/// Variants are (de)serialized in `snake_case`: `"immediate"` and `"batch"`.
/// The derived `Default` is [`ReplicationSendPolicy::Batch`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum ReplicationSendPolicy {
    /// Flush each mutation as a one-record batch.
    Immediate,
    /// Accumulate mutations until a record, byte, or delay threshold is reached.
    #[default]
    Batch,
}

/// TCP WAL export topology.
///
/// Variants are (de)serialized in `snake_case`: `"connect"` and `"listen"`.
/// This enum does not derive `Default`; `WalTcpExportConfig::default()` picks
/// [`WalTcpExportMode::Connect`] explicitly.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WalTcpExportMode {
    /// Connect to a single downstream collector and push live WAL frames.
    Connect,
    /// Listen for authenticated subscribers and fan out live WAL frames.
    Listen,
}

/// Host CPU cache geometry used to derive tier defaults.
///
/// `TierConfig::from_geometry` reads `l1d_bytes` for the hot tier, `l2_bytes`
/// for the warm tier, and `l3_bytes` (divided by the shard count) for the
/// cold tier.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct CacheGeometry {
    /// L1 data cache size in bytes.
    pub l1d_bytes: usize,
    /// L2 cache size in bytes.
    pub l2_bytes: usize,
    /// L3 cache size in bytes.
    pub l3_bytes: usize,
}

/// Borrowed handle to a configuration file on disk.
///
/// Used by `ShardCacheConfig::load_from_path` and
/// `ShardCacheConfig::store_to_path` to read and write TOML.
struct ConfigFile<'a> {
    /// Location of the configuration file.
    path: &'a Path,
}

/// Result of looking up the parent directory of a configuration file path.
enum ConfigFileParent<'a> {
    /// `Path::parent` returned a directory (possibly the empty path for a bare
    /// file name).
    Present(&'a Path),
    /// `Path::parent` returned `None`, so there is no directory to create.
    Missing,
}

impl Default for ShardCacheConfig {
    fn default() -> Self {
        let shard_count = Self::default_shard_count();
        Self {
            bind_addr: "127.0.0.1:6380".to_string(),
            max_connections: 4_096,
            shard_count,
            max_memory_bytes: 0,
            eviction_policy: EvictionPolicy::None,
            ttl_sweep_interval_ms: 1_000,
            stats_interval_ms: 5_000,
            tiers: TierConfig::from_geometry(CacheGeometry::detect_current_host(), shard_count),
            cuda: CudaConfig::default(),
            persistence: PersistenceConfig::default(),
            replication: ReplicationConfig::default(),
            transaction_mode: TransactionMode::default(),
            server_endpoint_mode: ServerEndpointMode::default(),
        }
    }
}

impl Default for TierConfig {
    fn default() -> Self {
        Self::from_geometry(
            CacheGeometry::detect_current_host(),
            ShardCacheConfig::default_shard_count(),
        )
    }
}

impl TierConfig {
    /// Builds tier capacities from CPU cache geometry and shard count.
    pub fn from_geometry(geometry: CacheGeometry, shard_count: usize) -> Self {
        let hot_capacity = HotTierCapacity::from_l1(geometry.l1d_bytes);
        let warm_capacity = (geometry.l2_bytes / 160).clamp(1_024, 131_072);
        let cold_bytes_per_shard = usize::max(
            geometry.l3_bytes / usize::max(shard_count, 1),
            2 * 1024 * 1024,
        );
        let cold_capacity = (cold_bytes_per_shard / 192).clamp(8_192, 1_000_000);

        Self {
            hot_capacity,
            warm_capacity,
            cold_capacity,
            promotion_batch: 256,
        }
    }
}

impl Default for PersistenceConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            data_dir: PathBuf::from("./var/shardcache"),
            segment_size_bytes: 64 * 1024 * 1024,
            fsync_interval_ms: 100,
            snapshot_every_seconds: 300,
            snapshot_min_writes: 1_000,
            compress_snapshots: true,
            compress_wal: true,
            wal_channel_capacity: 16_384,
            tcp_export: WalTcpExportConfig::default(),
        }
    }
}

impl Default for WalTcpExportConfig {
    fn default() -> Self {
        Self {
            enabled: false,
            mode: WalTcpExportMode::Connect,
            addr: "127.0.0.1:7630".to_string(),
            auth_token: None,
            channel_capacity: 16_384,
            max_subscribers: 64,
            connect_timeout_ms: 250,
            write_timeout_ms: 250,
            reconnect_backoff_ms: 100,
            backpressure_on_full: false,
        }
    }
}

impl Default for ReplicationConfig {
    fn default() -> Self {
        Self {
            enabled: false,
            role: ReplicationRole::Primary,
            bind_addr: "127.0.0.1:7631".to_string(),
            replica_of: None,
            auth_token: None,
            compression: ReplicationCompression::None,
            zstd_level: 3,
            send_policy: ReplicationSendPolicy::Batch,
            batch_max_records: 512,
            batch_max_bytes: 1024 * 1024,
            batch_max_delay_us: 750,
            backlog_bytes: 64 * 1024 * 1024,
            snapshot_chunk_bytes: 1024 * 1024,
            queue_capacity: 16_384,
            max_replicas: 16,
            connect_timeout_ms: 500,
            write_timeout_ms: 500,
            reconnect_backoff_ms: 200,
            subscriber_channel_capacity: 1_024,
        }
    }
}

impl ShardCacheConfig {
    /// Returns the default shard count for the current host.
    ///
    /// Delegates to `DefaultShardCount::current`.
    pub fn default_shard_count() -> usize {
        DefaultShardCount::current()
    }

    /// Loads and validates a TOML configuration file.
    ///
    /// # Errors
    ///
    /// Returns an error when the file cannot be read or parsed, or when the
    /// parsed configuration fails [`ShardCacheConfig::validate`].
    pub fn load_from_path(path: &Path) -> Result<Self> {
        let config = ConfigFile::new(path).load()?;
        config.validate()?;
        Ok(config)
    }

    /// Writes the configuration as pretty TOML, creating parent directories.
    ///
    /// The configuration is written as-is; it is not validated first.
    ///
    /// # Errors
    ///
    /// Returns an error when a parent directory cannot be created, the
    /// configuration cannot be serialized, or the file cannot be written.
    pub fn store_to_path(&self, path: &Path) -> Result<()> {
        ConfigFile::new(path).store(self)
    }

    /// Creates directories required by the enabled persistence settings.
    ///
    /// # Errors
    ///
    /// Returns an error when the persistence data directory cannot be created.
    pub fn ensure_paths(&self) -> Result<()> {
        self.persistence.ensure_paths()
    }

    /// Validates cross-field constraints.
    ///
    /// # Errors
    ///
    /// Returns whatever error `ConfigValidator` reports for this configuration.
    pub fn validate(&self) -> Result<()> {
        ConfigValidator::new(self).validate()
    }

    /// Returns the TTL sweep interval, clamped to at least 1 ms.
    pub fn ttl_sweep_interval(&self) -> Duration {
        Duration::from_millis(self.ttl_sweep_interval_ms.max(1))
    }

    /// Returns the stats interval, clamped to at least 250 ms.
    pub fn stats_interval(&self) -> Duration {
        Duration::from_millis(self.stats_interval_ms.max(250))
    }

    /// Returns the per-shard memory limit implied by `max_memory_bytes`.
    ///
    /// Returns `None` when no global limit is configured
    /// (`max_memory_bytes == 0`). Otherwise the global budget is divided by
    /// the shard count, rounding up.
    ///
    /// This accessor may be called on a configuration that has not been
    /// validated, so a `shard_count` of `0` is treated as a single shard
    /// instead of panicking with a division by zero. A result that does not
    /// fit in `usize` saturates at `usize::MAX` rather than wrapping.
    pub fn per_shard_memory_limit_bytes(&self) -> Option<usize> {
        if self.max_memory_bytes == 0 {
            return None;
        }
        // `usize` is at most 64 bits wide on supported targets, so this
        // widening cast is lossless.
        let shard_count = (self.shard_count as u64).max(1);
        let per_shard = self.max_memory_bytes.div_ceil(shard_count);
        Some(Self::saturating_usize(per_shard))
    }

    /// Returns the total memory limit when one is configured.
    ///
    /// Returns `None` when `max_memory_bytes` is `0`. A limit that does not
    /// fit in `usize` saturates at `usize::MAX` rather than wrapping.
    pub fn total_memory_limit_bytes(&self) -> Option<usize> {
        match self.max_memory_bytes {
            0 => None,
            bytes => Some(Self::saturating_usize(bytes)),
        }
    }

    /// Returns the snapshot interval, clamped to at least 1 second.
    pub fn snapshot_interval(&self) -> Duration {
        Duration::from_secs(self.persistence.snapshot_every_seconds.max(1))
    }

    /// Narrows a byte count to `usize`, saturating instead of truncating.
    ///
    /// A plain `as usize` cast would silently wrap on targets where `usize`
    /// is narrower than 64 bits, which could turn a large limit into a tiny
    /// one (or into `0`).
    fn saturating_usize(bytes: u64) -> usize {
        if bytes > usize::MAX as u64 {
            usize::MAX
        } else {
            bytes as usize
        }
    }
}

impl CacheGeometry {
    /// Detects CPU cache geometry from the current host when possible.
    ///
    /// Delegates to `CacheGeometryDetector::detect`. The return type is not
    /// fallible, so some geometry is always returned; presumably the detector
    /// supplies fallback sizes when detection is not possible — confirm in
    /// the `geometry` module.
    pub fn detect_current_host() -> Self {
        CacheGeometryDetector::detect()
    }
}

impl<'a> ConfigFile<'a> {
    /// Wraps `path` without touching the filesystem.
    fn new(path: &'a Path) -> Self {
        Self { path }
    }

    /// Reads the file and parses it as a TOML `ShardCacheConfig`.
    ///
    /// The parsed value is not validated here.
    fn load(&self) -> Result<ShardCacheConfig> {
        let raw = std::fs::read_to_string(self.path)?;
        let parsed = toml::from_str::<ShardCacheConfig>(&raw)?;
        Ok(parsed)
    }

    /// Serializes `config` as pretty TOML and writes it to the file.
    ///
    /// The parent directory is created before serialization is attempted.
    fn store(&self, config: &ShardCacheConfig) -> Result<()> {
        self.ensure_parent()?;
        let rendered = toml::to_string_pretty(config)?;
        std::fs::write(self.path, rendered)?;
        Ok(())
    }

    /// Creates the file's parent directory (and ancestors) when the path has one.
    fn ensure_parent(&self) -> Result<()> {
        if let ConfigFileParent::Present(dir) = ConfigFileParent::from_path(self.path) {
            std::fs::create_dir_all(dir)?;
        }
        Ok(())
    }
}

impl<'a> ConfigFileParent<'a> {
    /// Classifies `path` by whether `Path::parent` yields a directory.
    ///
    /// A bare file name yields `Present` with an empty path; a root or an
    /// empty path yields `Missing`.
    fn from_path(path: &'a Path) -> Self {
        path.parent().map_or(Self::Missing, Self::Present)
    }
}

impl PersistenceConfig {
    /// Creates `data_dir` (and any missing ancestors) when persistence is
    /// enabled; does nothing when it is disabled.
    fn ensure_paths(&self) -> Result<()> {
        if self.enabled {
            std::fs::create_dir_all(&self.data_dir)?;
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::{ServerEndpointMode, ShardCacheConfig, geometry::CacheSizeParser};

    /// Default configuration with only the shard count overridden.
    fn with_shard_count(shard_count: usize) -> ShardCacheConfig {
        ShardCacheConfig {
            shard_count,
            ..ShardCacheConfig::default()
        }
    }

    #[test]
    fn parses_cache_sizes() {
        let kib_suffix = CacheSizeParser::parse("32K");
        let mib_suffix = CacheSizeParser::parse("4M");
        let plain_bytes = CacheSizeParser::parse("65536");

        assert_eq!(kib_suffix, Some(32 * 1024));
        assert_eq!(mib_suffix, Some(4 * 1024 * 1024));
        assert_eq!(plain_bytes, Some(65_536));
    }

    #[test]
    fn validates_power_of_two_shard_count() {
        // Zero and non-powers-of-two must be rejected.
        for shard_count in [0, 3, 10, 12] {
            let outcome = with_shard_count(shard_count).validate();
            assert!(outcome.is_err(), "{shard_count} should fail");
        }

        // Powers of two must be accepted.
        for shard_count in [1, 2, 4, 8, 16, 32] {
            let outcome = with_shard_count(shard_count).validate();
            assert!(outcome.is_ok(), "{shard_count} should pass");
        }
    }

    #[test]
    fn default_shard_count_is_power_of_two() {
        let detected = ShardCacheConfig::default_shard_count();
        assert!(detected > 0);
        assert!(detected.is_power_of_two());
    }

    #[test]
    fn default_server_endpoint_mode_is_fanout() {
        let defaults = ShardCacheConfig::default();
        assert_eq!(defaults.server_endpoint_mode, ServerEndpointMode::Fanout);
    }

    #[test]
    fn parses_server_endpoint_mode_from_toml() {
        // Every other field is omitted and must come from the serde defaults.
        let source = r#"server_endpoint_mode = "direct_shard""#;
        let parsed: ShardCacheConfig = toml::from_str(source).unwrap();

        assert_eq!(parsed.server_endpoint_mode, ServerEndpointMode::DirectShard);
    }
}