Skip to main content

ember_core/
engine.rs

//! The engine: coordinator for the sharded keyspace.
//!
//! Routes single-key operations to the correct shard based on a hash
//! of the key. Each shard is an independent tokio task — no locks on
//! the hot path.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use crate::dropper::DropHandle;
use crate::error::ShardError;
use crate::keyspace::ShardConfig;
use crate::shard::{self, ShardHandle, ShardPersistenceConfig, ShardRequest, ShardResponse};
/// Request-channel capacity for each shard. 256 is large enough to
/// absorb bursts without putting meaningful back-pressure on
/// connections.
const SHARD_BUFFER: usize = 256;
18
/// Configuration for the engine, passed down to each shard.
#[derive(Debug, Clone, Default)]
pub struct EngineConfig {
    /// Per-shard configuration (memory limits, eviction policy).
    /// Cloned once per shard at spawn time, with `shard_id` filled in.
    pub shard: ShardConfig,
    /// Optional persistence configuration. When set, each shard gets
    /// its own AOF and snapshot files under this directory.
    pub persistence: Option<ShardPersistenceConfig>,
    /// Optional schema registry for protobuf value validation.
    /// When set, enables PROTO.* commands.
    /// Only present when the crate is built with the `protobuf` feature.
    #[cfg(feature = "protobuf")]
    pub schema_registry: Option<crate::schema::SharedSchemaRegistry>,
}
32
/// The sharded engine. Owns handles to all shard tasks and routes
/// requests by key hash.
///
/// `Clone` is cheap — it just clones the `Vec<ShardHandle>` (which are
/// mpsc senders under the hood).
#[derive(Debug, Clone)]
pub struct Engine {
    /// One handle per shard task; the index into this Vec is the shard id.
    shards: Vec<ShardHandle>,
    /// Kept on the engine (in addition to being handed to each shard)
    /// so `schema_registry()` can expose it without asking a shard.
    #[cfg(feature = "protobuf")]
    schema_registry: Option<crate::schema::SharedSchemaRegistry>,
}
44
45impl Engine {
46    /// Creates an engine with `shard_count` shards using default config.
47    ///
48    /// Each shard is spawned as a tokio task immediately.
49    /// Panics if `shard_count` is zero.
50    pub fn new(shard_count: usize) -> Self {
51        Self::with_config(shard_count, EngineConfig::default())
52    }
53
54    /// Creates an engine with `shard_count` shards and the given config.
55    ///
56    /// Spawns a single background drop thread shared by all shards for
57    /// lazy-freeing large values.
58    ///
59    /// Panics if `shard_count` is zero.
60    pub fn with_config(shard_count: usize, config: EngineConfig) -> Self {
61        assert!(shard_count > 0, "shard count must be at least 1");
62
63        let drop_handle = DropHandle::spawn();
64
65        let shards = (0..shard_count)
66            .map(|i| {
67                let mut shard_config = config.shard.clone();
68                shard_config.shard_id = i as u16;
69                shard::spawn_shard(
70                    SHARD_BUFFER,
71                    shard_config,
72                    config.persistence.clone(),
73                    Some(drop_handle.clone()),
74                    #[cfg(feature = "protobuf")]
75                    config.schema_registry.clone(),
76                )
77            })
78            .collect();
79
80        Self {
81            shards,
82            #[cfg(feature = "protobuf")]
83            schema_registry: config.schema_registry,
84        }
85    }
86
87    /// Creates an engine with one shard per available CPU core.
88    ///
89    /// Falls back to a single shard if the core count can't be determined.
90    pub fn with_available_cores() -> Self {
91        Self::with_available_cores_config(EngineConfig::default())
92    }
93
94    /// Creates an engine with one shard per available CPU core and the
95    /// given config.
96    pub fn with_available_cores_config(config: EngineConfig) -> Self {
97        let cores = std::thread::available_parallelism()
98            .map(|n| n.get())
99            .unwrap_or(1);
100        Self::with_config(cores, config)
101    }
102
103    /// Returns a reference to the schema registry, if protobuf is enabled.
104    #[cfg(feature = "protobuf")]
105    pub fn schema_registry(&self) -> Option<&crate::schema::SharedSchemaRegistry> {
106        self.schema_registry.as_ref()
107    }
108
109    /// Returns the number of shards.
110    pub fn shard_count(&self) -> usize {
111        self.shards.len()
112    }
113
114    /// Sends a request to a specific shard by index.
115    ///
116    /// Used by SCAN to iterate through shards sequentially.
117    pub async fn send_to_shard(
118        &self,
119        shard_idx: usize,
120        request: ShardRequest,
121    ) -> Result<ShardResponse, ShardError> {
122        if shard_idx >= self.shards.len() {
123            return Err(ShardError::Unavailable);
124        }
125        self.shards[shard_idx].send(request).await
126    }
127
128    /// Routes a request to the shard that owns `key`.
129    pub async fn route(
130        &self,
131        key: &str,
132        request: ShardRequest,
133    ) -> Result<ShardResponse, ShardError> {
134        let idx = self.shard_for_key(key);
135        self.shards[idx].send(request).await
136    }
137
138    /// Sends a request to every shard and collects all responses.
139    ///
140    /// Dispatches to all shards first (so they start processing in
141    /// parallel), then collects the replies. Used for commands like
142    /// DBSIZE and INFO that need data from all shards.
143    pub async fn broadcast<F>(&self, make_req: F) -> Result<Vec<ShardResponse>, ShardError>
144    where
145        F: Fn() -> ShardRequest,
146    {
147        // dispatch to all shards without waiting for responses
148        let mut receivers = Vec::with_capacity(self.shards.len());
149        for shard in &self.shards {
150            receivers.push(shard.dispatch(make_req()).await?);
151        }
152
153        // now collect all responses
154        let mut results = Vec::with_capacity(receivers.len());
155        for rx in receivers {
156            results.push(rx.await.map_err(|_| ShardError::Unavailable)?);
157        }
158        Ok(results)
159    }
160
161    /// Routes requests for multiple keys concurrently.
162    ///
163    /// Dispatches all requests without waiting, then collects responses.
164    /// The response order matches the key order. Used for multi-key
165    /// commands like DEL and EXISTS.
166    pub async fn route_multi<F>(
167        &self,
168        keys: &[String],
169        make_req: F,
170    ) -> Result<Vec<ShardResponse>, ShardError>
171    where
172        F: Fn(String) -> ShardRequest,
173    {
174        let mut receivers = Vec::with_capacity(keys.len());
175        for key in keys {
176            let idx = self.shard_for_key(key);
177            let rx = self.shards[idx].dispatch(make_req(key.clone())).await?;
178            receivers.push(rx);
179        }
180
181        let mut results = Vec::with_capacity(receivers.len());
182        for rx in receivers {
183            results.push(rx.await.map_err(|_| ShardError::Unavailable)?);
184        }
185        Ok(results)
186    }
187
188    /// Determines which shard owns a given key.
189    fn shard_for_key(&self, key: &str) -> usize {
190        shard_index(key, self.shards.len())
191    }
192}
193
/// Pure function: maps a key to a shard index.
///
/// Hashes the key with `DefaultHasher` (SipHash) and reduces modulo
/// `shard_count`. Deterministic within a single process, which is all
/// local sharding needs; CRC16 will replace this when cluster-level
/// slot assignment arrives.
fn shard_index(key: &str, shard_count: usize) -> usize {
    let digest = {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        hasher.finish()
    };
    (digest as usize) % shard_count
}
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use bytes::Bytes;

    /// Hashing the same key twice must land on the same shard.
    #[test]
    fn same_key_same_shard() {
        assert_eq!(shard_index("foo", 8), shard_index("foo", 8));
    }

    /// With enough distinct keys we expect more than one shard to be hit.
    #[test]
    fn keys_spread_across_shards() {
        let seen: std::collections::HashSet<usize> = (0..100)
            .map(|i| shard_index(&format!("key:{i}"), 4))
            .collect();
        assert!(seen.len() > 1, "expected keys to spread across shards");
    }

    /// A single-shard engine maps every key to shard 0.
    #[test]
    fn single_shard_always_zero() {
        for key in ["anything", "other"] {
            assert_eq!(shard_index(key, 1), 0);
        }
    }

    /// SET then GET through the router returns the stored value.
    #[tokio::test]
    async fn engine_round_trip() {
        let engine = Engine::new(4);

        let set_resp = engine
            .route(
                "greeting",
                ShardRequest::Set {
                    key: "greeting".into(),
                    value: Bytes::from("hello"),
                    expire: None,
                    nx: false,
                    xx: false,
                },
            )
            .await
            .unwrap();
        assert!(matches!(set_resp, ShardResponse::Ok));

        let get_resp = engine
            .route(
                "greeting",
                ShardRequest::Get {
                    key: "greeting".into(),
                },
            )
            .await
            .unwrap();
        match get_resp {
            ShardResponse::Value(Some(Value::String(data))) => {
                assert_eq!(data, Bytes::from("hello"));
            }
            other => panic!("expected Value(Some(String)), got {other:?}"),
        }
    }

    /// DEL across keys on (likely) different shards counts only real hits.
    #[tokio::test]
    async fn multi_shard_del() {
        let engine = Engine::new(4);

        // Set several keys (likely landing on different shards).
        for key in ["a", "b", "c", "d"] {
            engine
                .route(
                    key,
                    ShardRequest::Set {
                        key: key.to_string(),
                        value: Bytes::from("v"),
                        expire: None,
                        nx: false,
                        xx: false,
                    },
                )
                .await
                .unwrap();
        }

        // Delete them all (plus one that was never set) and tally successes.
        let mut deleted = 0i64;
        for key in ["a", "b", "c", "d", "missing"] {
            let resp = engine
                .route(
                    key,
                    ShardRequest::Del {
                        key: key.to_string(),
                    },
                )
                .await
                .unwrap();
            if matches!(resp, ShardResponse::Bool(true)) {
                deleted += 1;
            }
        }
        assert_eq!(deleted, 4);
    }

    /// Construction with zero shards is a programming error and panics.
    #[test]
    #[should_panic(expected = "shard count must be at least 1")]
    fn zero_shards_panics() {
        let _ = Engine::new(0);
    }
}