d_engine_server/node/client/local_kv.rs

1//! Zero-overhead KV client for embedded d-engine.
2//!
3//! [`LocalKvClient`] provides direct access to Raft state machine
4//! without gRPC serialization or network traversal.
5//!
6//! # Performance
7//! - **10-20x faster** than gRPC (localhost)
//! - **<0.1ms latency** for eventually consistent local reads (linearizable reads and writes also wait on Raft)
9//! - Zero serialization overhead
10//!
11//! # Usage
12//! ```rust,ignore
13//! let node = NodeBuilder::new(config).start().await?;
14//! let client = node.local_client();
15//! client.put(b"key", b"value").await?;
16//! ```
17
18use std::fmt;
19use std::time::Duration;
20
21use bytes::Bytes;
22use d_engine_client::KvClient;
23use d_engine_client::KvClientError;
24use d_engine_client::KvResult;
25use d_engine_core::MaybeCloneOneshot;
26use d_engine_core::RaftEvent;
27use d_engine_core::RaftOneshot;
28use d_engine_proto::client::ClientReadRequest;
29use d_engine_proto::client::ClientWriteRequest;
30use d_engine_proto::client::ReadConsistencyPolicy;
31use d_engine_proto::client::WriteCommand;
32use d_engine_proto::error::ErrorCode;
33use tokio::sync::mpsc;
34
/// Errors returned by [`LocalKvClient`] operations.
///
/// Derives `Clone`/`PartialEq`/`Eq` so callers can compare errors directly
/// (e.g. in tests or retry logic) and fan a single error out to several waiters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LocalClientError {
    /// The Raft event channel or the per-request reply channel was closed
    /// (for example because the node is shutting down).
    ChannelClosed,
    /// The operation did not complete within the given duration.
    Timeout(Duration),
    /// This node is not the leader; the request should be sent to the leader.
    NotLeader {
        /// Leader's node ID (if known)
        leader_id: Option<String>,
        /// Leader's address (if known)
        leader_address: Option<String>,
    },
    /// The server reported an error; the string carries the details.
    ServerError(String),
}

impl fmt::Display for LocalClientError {
    fn fmt(
        &self,
        f: &mut fmt::Formatter<'_>,
    ) -> fmt::Result {
        match self {
            Self::ChannelClosed => f.write_str("Channel closed, node may be shutting down"),
            Self::Timeout(d) => write!(f, "Operation timeout after {d:?}"),
            Self::NotLeader {
                leader_id,
                leader_address,
            } => {
                // Leader hints are appended only when known.
                f.write_str("Not leader")?;
                if let Some(id) = leader_id {
                    write!(f, " (leader_id: {id})")?;
                }
                if let Some(addr) = leader_address {
                    write!(f, " (leader_address: {addr})")?;
                }
                Ok(())
            }
            Self::ServerError(s) => write!(f, "Server error: {s}"),
        }
    }
}

impl std::error::Error for LocalClientError {}

/// Result alias used by all [`LocalKvClient`] operations.
pub type Result<T> = std::result::Result<T, LocalClientError>;
84
85// Convert LocalClientError to KvClientError
86impl From<LocalClientError> for KvClientError {
87    fn from(err: LocalClientError) -> Self {
88        match err {
89            LocalClientError::ChannelClosed => KvClientError::ChannelClosed,
90            LocalClientError::Timeout(_) => KvClientError::Timeout,
91            LocalClientError::NotLeader {
92                leader_id,
93                leader_address,
94            } => {
95                let msg = if let Some(addr) = leader_address {
96                    format!("Not leader, try leader at: {addr}")
97                } else if let Some(id) = leader_id {
98                    format!("Not leader, leader_id: {id}")
99                } else {
100                    "Not leader".to_string()
101                };
102                KvClientError::ServerError(msg)
103            }
104            LocalClientError::ServerError(msg) => KvClientError::ServerError(msg),
105        }
106    }
107}
108
109/// Zero-overhead KV client for embedded mode.
110///
111/// Directly calls Raft core without gRPC overhead.
112#[derive(Clone)]
113pub struct LocalKvClient {
114    event_tx: mpsc::Sender<RaftEvent>,
115    client_id: u32,
116    timeout: Duration,
117}
118
119impl LocalKvClient {
120    /// Internal constructor (used by Node::local_client())
121    pub(crate) fn new_internal(
122        event_tx: mpsc::Sender<RaftEvent>,
123        client_id: u32,
124        timeout: Duration,
125    ) -> Self {
126        Self {
127            event_tx,
128            client_id,
129            timeout,
130        }
131    }
132
133    /// Map ErrorCode and ErrorMetadata to LocalClientError
134    fn map_error_response(
135        error_code: i32,
136        metadata: Option<d_engine_proto::error::ErrorMetadata>,
137    ) -> LocalClientError {
138        use d_engine_proto::error::ErrorCode;
139
140        match ErrorCode::try_from(error_code) {
141            Ok(ErrorCode::NotLeader) => {
142                let (leader_id, leader_address) = if let Some(meta) = metadata {
143                    (meta.leader_id, meta.leader_address)
144                } else {
145                    (None, None)
146                };
147                LocalClientError::NotLeader {
148                    leader_id,
149                    leader_address,
150                }
151            }
152            _ => LocalClientError::ServerError(format!("Error code: {error_code}")),
153        }
154    }
155
156    /// Store a key-value pair with strong consistency.
157    pub async fn put(
158        &self,
159        key: impl AsRef<[u8]>,
160        value: impl AsRef<[u8]>,
161    ) -> Result<()> {
162        let command = WriteCommand::insert(
163            Bytes::copy_from_slice(key.as_ref()),
164            Bytes::copy_from_slice(value.as_ref()),
165        );
166
167        let request = ClientWriteRequest {
168            client_id: self.client_id,
169            commands: vec![command],
170        };
171
172        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
173
174        self.event_tx
175            .send(RaftEvent::ClientPropose(request, resp_tx))
176            .await
177            .map_err(|_| LocalClientError::ChannelClosed)?;
178
179        let result = tokio::time::timeout(self.timeout, resp_rx)
180            .await
181            .map_err(|_| LocalClientError::Timeout(self.timeout))?
182            .map_err(|_| LocalClientError::ChannelClosed)?;
183
184        let response = result.map_err(|status| {
185            LocalClientError::ServerError(format!("RPC error: {}", status.message()))
186        })?;
187
188        if response.error != ErrorCode::Success as i32 {
189            return Err(Self::map_error_response(response.error, response.metadata));
190        }
191
192        Ok(())
193    }
194
195    /// Strongly consistent read (linearizable).
196    ///
197    /// Guarantees reading the latest committed value by querying the Leader.
198    /// Use for critical reads where staleness is unacceptable.
199    ///
200    /// # Performance
201    /// - Latency: ~1-5ms (network RTT to Leader)
202    /// - Throughput: Limited by Leader capacity
203    ///
204    /// # Raft Protocol
205    /// Implements linearizable read per Raft ยง8.
206    ///
207    /// # Example
208    /// ```ignore
209    /// let client = node.local_client();
210    /// let value = client.get_linearizable(b"critical-config").await?;
211    /// ```
212    pub async fn get_linearizable(
213        &self,
214        key: impl AsRef<[u8]>,
215    ) -> Result<Option<Bytes>> {
216        self.get_with_consistency(key, ReadConsistencyPolicy::LinearizableRead).await
217    }
218
219    /// Eventually consistent read (stale OK).
220    ///
221    /// Reads from local state machine without Leader coordination.
222    /// Fast but may return stale data if replication is lagging.
223    ///
224    /// # Performance
225    /// - Latency: ~0.1ms (local memory access)
226    /// - Throughput: High (no Leader bottleneck)
227    ///
228    /// # Use Cases
229    /// - Read-heavy workloads
230    /// - Analytics/reporting (staleness acceptable)
231    /// - Caching scenarios
232    ///
233    /// # Example
234    /// ```ignore
235    /// let client = node.local_client();
236    /// let cached_value = client.get_eventual(b"user-preference").await?;
237    /// ```
238    pub async fn get_eventual(
239        &self,
240        key: impl AsRef<[u8]>,
241    ) -> Result<Option<Bytes>> {
242        self.get_with_consistency(key, ReadConsistencyPolicy::EventualConsistency).await
243    }
244
245    /// Advanced: Read with explicit consistency policy.
246    ///
247    /// For fine-grained control over read consistency vs performance trade-off.
248    ///
249    /// # Consistency Policies
250    /// - `LinearizableRead`: Read from Leader (strong consistency, may be slower)
251    /// - `EventualConsistency`: Read from local node (fast, may be stale)
252    /// - `LeaseRead`: Optimized Leader read using lease mechanism
253    ///
254    /// # Example
255    /// ```ignore
256    /// use d_engine_proto::client::ReadConsistencyPolicy;
257    ///
258    /// let value = client.get_with_consistency(
259    ///     b"key",
260    ///     ReadConsistencyPolicy::LeaseRead,
261    /// ).await?;
262    /// ```
263    pub async fn get_with_consistency(
264        &self,
265        key: impl AsRef<[u8]>,
266        consistency: ReadConsistencyPolicy,
267    ) -> Result<Option<Bytes>> {
268        let request = ClientReadRequest {
269            client_id: self.client_id,
270            keys: vec![Bytes::copy_from_slice(key.as_ref())],
271            consistency_policy: Some(consistency as i32),
272        };
273
274        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
275
276        self.event_tx
277            .send(RaftEvent::ClientReadRequest(request, resp_tx))
278            .await
279            .map_err(|_| LocalClientError::ChannelClosed)?;
280
281        let result = tokio::time::timeout(self.timeout, resp_rx)
282            .await
283            .map_err(|_| LocalClientError::Timeout(self.timeout))?
284            .map_err(|_| LocalClientError::ChannelClosed)?;
285
286        let response = result.map_err(|status| {
287            LocalClientError::ServerError(format!("RPC error: {}", status.message()))
288        })?;
289
290        if response.error != ErrorCode::Success as i32 {
291            return Err(Self::map_error_response(response.error, response.metadata));
292        }
293
294        match response.success_result {
295            Some(d_engine_proto::client::client_response::SuccessResult::ReadData(
296                read_results,
297            )) => {
298                // If results list is empty, key doesn't exist
299                // Otherwise, return the value (even if empty bytes)
300                Ok(read_results.results.first().map(|r| r.value.clone()))
301            }
302            _ => Ok(None),
303        }
304    }
305
306    /// Get multiple keys with linearizable consistency.
307    ///
308    /// Reads multiple keys from the Leader with strong consistency guarantee.
309    ///
310    /// # Example
311    /// ```ignore
312    /// let keys = vec![Bytes::from("key1"), Bytes::from("key2")];
313    /// let values = client.get_multi_linearizable(&keys).await?;
314    /// ```
315    pub async fn get_multi_linearizable(
316        &self,
317        keys: &[Bytes],
318    ) -> Result<Vec<Option<Bytes>>> {
319        self.get_multi_with_consistency(keys, ReadConsistencyPolicy::LinearizableRead)
320            .await
321    }
322
323    /// Get multiple keys with eventual consistency.
324    ///
325    /// Reads multiple keys from local state machine (fast, may be stale).
326    ///
327    /// # Example
328    /// ```ignore
329    /// let keys = vec![Bytes::from("key1"), Bytes::from("key2")];
330    /// let values = client.get_multi_eventual(&keys).await?;
331    /// ```
332    pub async fn get_multi_eventual(
333        &self,
334        keys: &[Bytes],
335    ) -> Result<Vec<Option<Bytes>>> {
336        self.get_multi_with_consistency(keys, ReadConsistencyPolicy::EventualConsistency)
337            .await
338    }
339
340    /// Advanced: Get multiple keys with explicit consistency policy.
341    pub async fn get_multi_with_consistency(
342        &self,
343        keys: &[Bytes],
344        consistency: ReadConsistencyPolicy,
345    ) -> Result<Vec<Option<Bytes>>> {
346        let request = ClientReadRequest {
347            client_id: self.client_id,
348            keys: keys.to_vec(),
349            consistency_policy: Some(consistency as i32),
350        };
351
352        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
353
354        self.event_tx
355            .send(RaftEvent::ClientReadRequest(request, resp_tx))
356            .await
357            .map_err(|_| LocalClientError::ChannelClosed)?;
358
359        let result = tokio::time::timeout(self.timeout, resp_rx)
360            .await
361            .map_err(|_| LocalClientError::Timeout(self.timeout))?
362            .map_err(|_| LocalClientError::ChannelClosed)?;
363
364        let response = result.map_err(|status| {
365            LocalClientError::ServerError(format!("RPC error: {}", status.message()))
366        })?;
367
368        if response.error != ErrorCode::Success as i32 {
369            return Err(Self::map_error_response(response.error, response.metadata));
370        }
371
372        match response.success_result {
373            Some(d_engine_proto::client::client_response::SuccessResult::ReadData(
374                read_results,
375            )) => {
376                // Reconstruct result vector in requested key order.
377                // Server only returns results for keys that exist, so we must
378                // map by key to preserve positional correspondence with input.
379                let results_by_key: std::collections::HashMap<_, _> =
380                    read_results.results.into_iter().map(|r| (r.key, r.value)).collect();
381
382                Ok(keys.iter().map(|k| results_by_key.get(k).cloned()).collect())
383            }
384            _ => Ok(vec![None; keys.len()]),
385        }
386    }
387
388    /// Delete a key-value pair.
389    pub async fn delete(
390        &self,
391        key: impl AsRef<[u8]>,
392    ) -> Result<()> {
393        let command = WriteCommand::delete(Bytes::copy_from_slice(key.as_ref()));
394
395        let request = ClientWriteRequest {
396            client_id: self.client_id,
397            commands: vec![command],
398        };
399
400        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
401
402        self.event_tx
403            .send(RaftEvent::ClientPropose(request, resp_tx))
404            .await
405            .map_err(|_| LocalClientError::ChannelClosed)?;
406
407        let result = tokio::time::timeout(self.timeout, resp_rx)
408            .await
409            .map_err(|_| LocalClientError::Timeout(self.timeout))?
410            .map_err(|_| LocalClientError::ChannelClosed)?;
411
412        let response = result.map_err(|status| {
413            LocalClientError::ServerError(format!("RPC error: {}", status.message()))
414        })?;
415
416        if response.error != ErrorCode::Success as i32 {
417            return Err(Self::map_error_response(response.error, response.metadata));
418        }
419
420        Ok(())
421    }
422
423    /// Returns the client ID assigned to this local client
424    pub fn client_id(&self) -> u32 {
425        self.client_id
426    }
427
428    /// Returns the configured timeout duration for operations
429    pub fn timeout(&self) -> Duration {
430        self.timeout
431    }
432
433    /// Returns the node ID for testing purposes
434    pub fn node_id(&self) -> u32 {
435        self.client_id
436    }
437}
438
439impl std::fmt::Debug for LocalKvClient {
440    fn fmt(
441        &self,
442        f: &mut std::fmt::Formatter<'_>,
443    ) -> std::fmt::Result {
444        f.debug_struct("LocalKvClient")
445            .field("client_id", &self.client_id)
446            .field("timeout", &self.timeout)
447            .finish()
448    }
449}
450
451// Implement KvClient trait
452#[async_trait::async_trait]
453impl KvClient for LocalKvClient {
454    async fn put(
455        &self,
456        key: impl AsRef<[u8]> + Send,
457        value: impl AsRef<[u8]> + Send,
458    ) -> KvResult<()> {
459        self.put(key, value).await.map_err(Into::into)
460    }
461
462    async fn put_with_ttl(
463        &self,
464        key: impl AsRef<[u8]> + Send,
465        value: impl AsRef<[u8]> + Send,
466        ttl_secs: u64,
467    ) -> KvResult<()> {
468        // Create command with TTL
469        let command = WriteCommand::insert_with_ttl(
470            Bytes::copy_from_slice(key.as_ref()),
471            Bytes::copy_from_slice(value.as_ref()),
472            ttl_secs,
473        );
474
475        let request = ClientWriteRequest {
476            client_id: self.client_id,
477            commands: vec![command],
478        };
479
480        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
481
482        self.event_tx
483            .send(RaftEvent::ClientPropose(request, resp_tx))
484            .await
485            .map_err(|_| KvClientError::ChannelClosed)?;
486
487        let result = tokio::time::timeout(self.timeout, resp_rx)
488            .await
489            .map_err(|_| KvClientError::Timeout)?
490            .map_err(|_| KvClientError::ChannelClosed)?;
491
492        let response = result.map_err(|status| {
493            KvClientError::ServerError(format!("RPC error: {}", status.message()))
494        })?;
495
496        if response.error != ErrorCode::Success as i32 {
497            let local_err = LocalKvClient::map_error_response(response.error, response.metadata);
498            return Err(local_err.into());
499        }
500
501        Ok(())
502    }
503
504    async fn get(
505        &self,
506        key: impl AsRef<[u8]> + Send,
507    ) -> KvResult<Option<Bytes>> {
508        self.get_linearizable(key).await.map_err(Into::into)
509    }
510
511    async fn get_multi(
512        &self,
513        keys: &[Bytes],
514    ) -> KvResult<Vec<Option<Bytes>>> {
515        self.get_multi_linearizable(keys).await.map_err(Into::into)
516    }
517
518    async fn delete(
519        &self,
520        key: impl AsRef<[u8]> + Send,
521    ) -> KvResult<()> {
522        self.delete(key).await.map_err(Into::into)
523    }
524}