emberkv-core 0.4.8

Core engine for ember: keyspace, data types, sharding
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
//! The engine: coordinator for the sharded keyspace.
//!
//! Routes single-key operations to the correct shard based on a hash
//! of the key. Each shard is an independent tokio task — no locks on
//! the hot path.

use tokio::sync::broadcast;

use crate::dropper::DropHandle;
use crate::error::ShardError;
use crate::keyspace::ShardConfig;
use crate::shard::{
    self, PreparedShard, ReplicationEvent, ShardHandle, ShardPersistenceConfig, ShardRequest,
    ShardResponse,
};

/// Default channel buffer size per shard.
///
/// With batch dispatch, a single pipeline of 16 commands targeting 8 shards
/// consumes ~8 channel slots (one batch per shard) instead of 16. 4096 gives
/// generous headroom even under pathological key distribution, at ~400KB per
/// shard on 64-bit.
const DEFAULT_SHARD_BUFFER: usize = 4096;

/// Configuration for the engine, passed down to each shard.
#[derive(Debug, Clone, Default)]
pub struct EngineConfig {
    /// Per-shard configuration (memory limits, eviction policy).
    pub shard: ShardConfig,
    /// Optional persistence configuration. When set, each shard gets
    /// its own AOF and snapshot files under this directory.
    pub persistence: Option<ShardPersistenceConfig>,
    /// Optional broadcast sender for replication events.
    ///
    /// When set, every successful mutation is published as a
    /// [`ReplicationEvent`] so replication clients can stream it to
    /// replicas.
    pub replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
    /// Optional schema registry for protobuf value validation.
    /// When set, enables PROTO.* commands.
    #[cfg(feature = "protobuf")]
    pub schema_registry: Option<crate::schema::SharedSchemaRegistry>,
    /// Channel buffer size per shard. 0 means use the default
    /// (`DEFAULT_SHARD_BUFFER`, currently 4096).
    pub shard_channel_buffer: usize,
}

/// The sharded engine. Owns handles to all shard tasks and routes
/// requests by key hash.
///
/// `Clone` is cheap — it just clones the `Vec<ShardHandle>` (which are
/// mpsc senders under the hood).
#[derive(Debug, Clone)]
pub struct Engine {
    // One handle per shard; vector index == shard id (ids are assigned
    // sequentially at construction time).
    shards: Vec<ShardHandle>,
    // Retained so `subscribe_replication` can hand out new receivers.
    replication_tx: Option<broadcast::Sender<ReplicationEvent>>,
    // Exposed read-only via `schema_registry()`.
    #[cfg(feature = "protobuf")]
    schema_registry: Option<crate::schema::SharedSchemaRegistry>,
}

impl Engine {
    /// Creates an engine with `shard_count` shards using default config.
    ///
    /// Each shard is spawned as a tokio task immediately.
    /// Panics if `shard_count` is zero.
    pub fn new(shard_count: usize) -> Self {
        Self::with_config(shard_count, EngineConfig::default())
    }

    /// Creates an engine with `shard_count` shards and the given config.
    ///
    /// Spawns a single background drop thread shared by all shards for
    /// lazy-freeing large values.
    ///
    /// Panics if `shard_count` is zero.
    pub fn with_config(shard_count: usize, config: EngineConfig) -> Self {
        assert!(shard_count > 0, "shard count must be at least 1");
        assert!(
            shard_count <= u16::MAX as usize,
            "shard count must fit in u16"
        );

        let drop_handle = DropHandle::spawn();
        let buffer = if config.shard_channel_buffer == 0 {
            DEFAULT_SHARD_BUFFER
        } else {
            config.shard_channel_buffer
        };

        let shards = (0..shard_count)
            .map(|i| {
                let mut shard_config = config.shard.clone();
                shard_config.shard_id = i as u16;
                shard::spawn_shard(
                    buffer,
                    shard_config,
                    config.persistence.clone(),
                    Some(drop_handle.clone()),
                    config.replication_tx.clone(),
                    #[cfg(feature = "protobuf")]
                    config.schema_registry.clone(),
                )
            })
            .collect();

        Self {
            shards,
            replication_tx: config.replication_tx,
            #[cfg(feature = "protobuf")]
            schema_registry: config.schema_registry,
        }
    }

    /// Creates the engine and prepared shards without spawning any tasks.
    ///
    /// The caller is responsible for running each [`PreparedShard`] on the
    /// desired runtime via [`shard::run_prepared`]. This is the entry
    /// point for thread-per-core deployment where each OS thread runs its
    /// own single-threaded tokio runtime and one shard.
    ///
    /// Panics if `shard_count` is zero.
    pub fn prepare(shard_count: usize, config: EngineConfig) -> (Self, Vec<PreparedShard>) {
        assert!(shard_count > 0, "shard count must be at least 1");
        assert!(
            shard_count <= u16::MAX as usize,
            "shard count must fit in u16"
        );

        let drop_handle = DropHandle::spawn();
        let buffer = if config.shard_channel_buffer == 0 {
            DEFAULT_SHARD_BUFFER
        } else {
            config.shard_channel_buffer
        };

        let mut handles = Vec::with_capacity(shard_count);
        let mut prepared = Vec::with_capacity(shard_count);

        for i in 0..shard_count {
            let mut shard_config = config.shard.clone();
            shard_config.shard_id = i as u16;
            let (handle, shard) = shard::prepare_shard(
                buffer,
                shard_config,
                config.persistence.clone(),
                Some(drop_handle.clone()),
                config.replication_tx.clone(),
                #[cfg(feature = "protobuf")]
                config.schema_registry.clone(),
            );
            handles.push(handle);
            prepared.push(shard);
        }

        let engine = Self {
            shards: handles,
            replication_tx: config.replication_tx,
            #[cfg(feature = "protobuf")]
            schema_registry: config.schema_registry,
        };

        (engine, prepared)
    }

    /// Creates an engine with one shard per available CPU core.
    ///
    /// Falls back to a single shard if the core count can't be determined.
    pub fn with_available_cores() -> Self {
        Self::with_available_cores_config(EngineConfig::default())
    }

    /// Creates an engine with one shard per available CPU core and the
    /// given config.
    pub fn with_available_cores_config(config: EngineConfig) -> Self {
        let cores = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
        Self::with_config(cores, config)
    }

    /// Returns a reference to the schema registry, if protobuf is enabled.
    #[cfg(feature = "protobuf")]
    pub fn schema_registry(&self) -> Option<&crate::schema::SharedSchemaRegistry> {
        self.schema_registry.as_ref()
    }

    /// Returns the number of shards.
    pub fn shard_count(&self) -> usize {
        self.shards.len()
    }

    /// Creates a new broadcast receiver for replication events.
    ///
    /// Returns `None` if no replication channel was configured. Each
    /// caller gets an independent receiver starting from the current
    /// broadcast position — not from the beginning of the stream.
    pub fn subscribe_replication(&self) -> Option<broadcast::Receiver<ReplicationEvent>> {
        self.replication_tx.as_ref().map(|tx| tx.subscribe())
    }

    /// Sends a request to a specific shard by index.
    ///
    /// Used by SCAN to iterate through shards sequentially.
    pub async fn send_to_shard(
        &self,
        shard_idx: usize,
        request: ShardRequest,
    ) -> Result<ShardResponse, ShardError> {
        if shard_idx >= self.shards.len() {
            return Err(ShardError::Unavailable);
        }
        self.shards[shard_idx].send(request).await
    }

    /// Routes a request to the shard that owns `key`.
    pub async fn route(
        &self,
        key: &str,
        request: ShardRequest,
    ) -> Result<ShardResponse, ShardError> {
        let idx = self.shard_for_key(key);
        self.shards[idx].send(request).await
    }

    /// Sends a request to every shard and collects all responses.
    ///
    /// Dispatches to all shards first (so they start processing in
    /// parallel), then collects the replies. Used for commands like
    /// DBSIZE and INFO that need data from all shards.
    pub async fn broadcast<F>(&self, make_req: F) -> Result<Vec<ShardResponse>, ShardError>
    where
        F: Fn() -> ShardRequest,
    {
        // dispatch to all shards without waiting for responses
        let mut receivers = Vec::with_capacity(self.shards.len());
        for shard in &self.shards {
            receivers.push(shard.dispatch(make_req()).await?);
        }

        // now collect all responses
        let mut results = Vec::with_capacity(receivers.len());
        for rx in receivers {
            results.push(rx.await.map_err(|_| ShardError::Unavailable)?);
        }
        Ok(results)
    }

    /// Routes requests for multiple keys concurrently.
    ///
    /// Dispatches all requests without waiting, then collects responses.
    /// The response order matches the key order. Used for multi-key
    /// commands like DEL and EXISTS.
    pub async fn route_multi<F>(
        &self,
        keys: &[String],
        make_req: F,
    ) -> Result<Vec<ShardResponse>, ShardError>
    where
        F: Fn(String) -> ShardRequest,
    {
        let mut receivers = Vec::with_capacity(keys.len());
        for key in keys {
            let idx = self.shard_for_key(key);
            let rx = self.shards[idx].dispatch(make_req(key.clone())).await?;
            receivers.push(rx);
        }

        let mut results = Vec::with_capacity(receivers.len());
        for rx in receivers {
            results.push(rx.await.map_err(|_| ShardError::Unavailable)?);
        }
        Ok(results)
    }

    /// Returns true if both keys are owned by the same shard.
    pub fn same_shard(&self, key1: &str, key2: &str) -> bool {
        self.shard_for_key(key1) == self.shard_for_key(key2)
    }

    /// Determines which shard owns a given key.
    pub fn shard_for_key(&self, key: &str) -> usize {
        shard_index(key, self.shards.len())
    }

    /// Sends a request to a shard and returns the reply channel without
    /// waiting for the response. Used by the connection handler to
    /// dispatch commands and collect responses separately.
    pub async fn dispatch_to_shard(
        &self,
        shard_idx: usize,
        request: ShardRequest,
    ) -> Result<tokio::sync::oneshot::Receiver<ShardResponse>, ShardError> {
        if shard_idx >= self.shards.len() {
            return Err(ShardError::Unavailable);
        }
        self.shards[shard_idx].dispatch(request).await
    }

    /// Sends a request to a shard using a caller-owned mpsc reply channel.
    ///
    /// Avoids the per-command oneshot allocation on the P=1 path.
    pub async fn dispatch_reusable_to_shard(
        &self,
        shard_idx: usize,
        request: ShardRequest,
        reply: tokio::sync::mpsc::Sender<ShardResponse>,
    ) -> Result<(), ShardError> {
        if shard_idx >= self.shards.len() {
            return Err(ShardError::Unavailable);
        }
        self.shards[shard_idx]
            .dispatch_reusable(request, reply)
            .await
    }

    /// Sends a batch of requests to a single shard as one channel message.
    ///
    /// Returns one receiver per request, preserving order. This is the
    /// pipeline batching optimization: N commands targeting the same shard
    /// consume 1 channel slot instead of N, eliminating head-of-line
    /// blocking under high pipeline depths.
    pub async fn dispatch_batch_to_shard(
        &self,
        shard_idx: usize,
        requests: Vec<ShardRequest>,
    ) -> Result<Vec<tokio::sync::oneshot::Receiver<ShardResponse>>, ShardError> {
        if shard_idx >= self.shards.len() {
            return Err(ShardError::Unavailable);
        }
        self.shards[shard_idx].dispatch_batch(requests).await
    }
}

/// Pure function: maps a key to a shard index.
///
/// Hashing is FNV-1a (64-bit), chosen because it is completely
/// deterministic across processes and restarts — critical for
/// AOF/snapshot recovery, where every key must hash to the same shard
/// it was originally written from, or recovered data lands in the
/// wrong shard. It is also simple and fast for short keys.
///
/// Randomized, DoS-resistant hashing is deliberately not used here:
/// shard routing is trusted internal logic.
fn shard_index(key: &str, shard_count: usize) -> usize {
    // FNV-1a 64-bit parameters.
    const FNV_OFFSET: u64 = 0xcbf29ce484222325;
    const FNV_PRIME: u64 = 0x100000001b3;

    // FNV-1a folds each byte in by xor first, then multiplies.
    let digest = key.bytes().fold(FNV_OFFSET, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });
    digest as usize % shard_count
}

#[cfg(test)]
mod tests {
    //! Unit tests: the pure routing function directly, plus end-to-end
    //! round trips through a real multi-shard engine on a tokio runtime.
    use super::*;
    use crate::types::Value;
    use bytes::Bytes;

    // Routing must be deterministic: same key, same shard, every call.
    #[test]
    fn same_key_same_shard() {
        let idx1 = shard_index("foo", 8);
        let idx2 = shard_index("foo", 8);
        assert_eq!(idx1, idx2);
    }

    // Sanity check on distribution — not a uniformity test, just that
    // the hash doesn't collapse everything onto one shard.
    #[test]
    fn keys_spread_across_shards() {
        let mut seen = std::collections::HashSet::new();
        // with enough keys, we should hit more than one shard
        for i in 0..100 {
            let key = format!("key:{i}");
            seen.insert(shard_index(&key, 4));
        }
        assert!(seen.len() > 1, "expected keys to spread across shards");
    }

    // With one shard, every key must route to index 0 (x % 1 == 0).
    #[test]
    fn single_shard_always_zero() {
        assert_eq!(shard_index("anything", 1), 0);
        assert_eq!(shard_index("other", 1), 0);
    }

    // SET then GET through the routing layer: the value written for a
    // key must come back from whichever shard owns it.
    #[tokio::test]
    async fn engine_round_trip() {
        let engine = Engine::new(4);

        let resp = engine
            .route(
                "greeting",
                ShardRequest::Set {
                    key: "greeting".into(),
                    value: Bytes::from("hello"),
                    expire: None,
                    nx: false,
                    xx: false,
                },
            )
            .await
            .unwrap();
        assert!(matches!(resp, ShardResponse::Ok));

        let resp = engine
            .route(
                "greeting",
                ShardRequest::Get {
                    key: "greeting".into(),
                },
            )
            .await
            .unwrap();
        match resp {
            ShardResponse::Value(Some(Value::String(data))) => {
                assert_eq!(data, Bytes::from("hello"));
            }
            other => panic!("expected Value(Some(String)), got {other:?}"),
        }
    }

    // DEL across keys that land on different shards: exactly the keys
    // that were set report deletion; the missing key does not.
    #[tokio::test]
    async fn multi_shard_del() {
        let engine = Engine::new(4);

        // set several keys (likely landing on different shards)
        for key in &["a", "b", "c", "d"] {
            engine
                .route(
                    key,
                    ShardRequest::Set {
                        key: key.to_string(),
                        value: Bytes::from("v"),
                        expire: None,
                        nx: false,
                        xx: false,
                    },
                )
                .await
                .unwrap();
        }

        // delete them all and count successes
        let mut count = 0i64;
        for key in &["a", "b", "c", "d", "missing"] {
            let resp = engine
                .route(
                    key,
                    ShardRequest::Del {
                        key: key.to_string(),
                    },
                )
                .await
                .unwrap();
            if let ShardResponse::Bool(true) = resp {
                count += 1;
            }
        }
        assert_eq!(count, 4);
    }

    // Constructor contract: zero shards is a programming error and
    // must panic with the documented message.
    #[test]
    #[should_panic(expected = "shard count must be at least 1")]
    fn zero_shards_panics() {
        Engine::new(0);
    }
}