Skip to main content

d_engine_server/api/
embedded_client.rs

1//! Zero-overhead KV client for embedded d-engine.
2//!
3//! [`EmbeddedClient`] provides direct access to Raft state machine
4//! without gRPC serialization or network traversal.
5//!
6//! # Performance
7//! - **10-20x faster** than gRPC (localhost)
8//! - **<0.1ms latency** per operation
9//! - Zero serialization overhead
10//!
11//! # Usage
12//! ```rust,ignore
13//! let engine = EmbeddedEngine::start("./data").await?;
14//! let client = engine.client();
15//! client.put(b"key", b"value").await?;
16//! ```
17
18use std::time::Duration;
19
20#[cfg(feature = "watch")]
21use std::sync::Arc;
22
23use bytes::Bytes;
24use d_engine_core::MaybeCloneOneshot;
25use d_engine_core::RaftEvent;
26use d_engine_core::RaftOneshot;
27use d_engine_core::ScanResult;
28use d_engine_core::client::{
29    ClientApi, ClientApiError, ClientApiResult, ClientReadRequest, ClientResponsePayload,
30    ClientWriteRequest, ErrorCode, LeaderHint, ReadResults, WriteOperation,
31};
32use d_engine_core::config::ReadConsistencyPolicy;
33use tokio::sync::mpsc;
34
35#[cfg(feature = "watch")]
36use d_engine_core::watch::WatchRegistry;
37
38// ============================================================================
39// Error helpers - simplify ClientApiError construction for embedded client
40// ============================================================================
41
42fn channel_closed_error() -> ClientApiError {
43    ClientApiError::Network {
44        code: ErrorCode::ConnectionTimeout,
45        message: "Channel closed, node may be shutting down".to_string(),
46        retry_after_ms: None,
47        leader_hint: None,
48    }
49}
50
51fn timeout_error(duration: Duration) -> ClientApiError {
52    ClientApiError::Network {
53        code: ErrorCode::ConnectionTimeout,
54        message: format!("Operation timed out after {duration:?}"),
55        retry_after_ms: Some(1000),
56        leader_hint: None,
57    }
58}
59
60fn not_leader_error(
61    leader_id: Option<String>,
62    leader_address: Option<String>,
63    retry_after_ms: Option<u64>,
64) -> ClientApiError {
65    let message = match (&leader_address, &leader_id) {
66        (Some(addr), _) => format!("Not leader, try leader at: {addr}"),
67        (None, Some(id)) => format!("Not leader, leader_id: {id}"),
68        (None, None) => "Not leader".to_string(),
69    };
70
71    let leader_hint = match (&leader_id, &leader_address) {
72        (Some(id_str), Some(addr)) => id_str.parse::<u32>().ok().map(|id| LeaderHint {
73            leader_id: id,
74            address: addr.clone(),
75        }),
76        _ => None,
77    };
78
79    ClientApiError::Network {
80        code: ErrorCode::NotLeader,
81        message,
82        retry_after_ms: retry_after_ms.or(Some(100)),
83        leader_hint,
84    }
85}
86
87fn server_error(msg: String) -> ClientApiError {
88    ClientApiError::Business {
89        code: ErrorCode::Uncategorized,
90        message: msg,
91        required_action: None,
92    }
93}
94
95/// Unwrap a `ClientResponsePayload` as a `ReadResults`, returning a
96/// `Protocol { InvalidResponse }` error for any other variant.
97///
98/// Centralises the match so both `get_with_consistency` and
99/// `get_multi_with_consistency` share identical error semantics, and so
100/// the logic can be unit-tested without standing up a full Raft channel.
101fn extract_read_payload(result: Option<ClientResponsePayload>) -> ClientApiResult<ReadResults> {
102    match result {
103        Some(ClientResponsePayload::Read(r)) => Ok(r),
104        Some(ClientResponsePayload::Write(_)) => Err(ClientApiError::Protocol {
105            code: ErrorCode::InvalidResponse,
106            message: "expected ReadData payload, got WriteResult".to_string(),
107            supported_versions: None,
108        }),
109        None => Err(ClientApiError::Protocol {
110            code: ErrorCode::InvalidResponse,
111            message: "expected ReadData payload, got None".to_string(),
112            supported_versions: None,
113        }),
114    }
115}
116
117/// Zero-overhead KV client for embedded mode.
118///
119/// Directly calls Raft core without gRPC overhead.
120#[derive(Clone)]
121pub struct EmbeddedClient {
122    event_tx: mpsc::Sender<RaftEvent>,
123    cmd_tx: mpsc::Sender<d_engine_core::ClientCmd>,
124    client_id: u32,
125    timeout: Duration,
126    #[cfg(feature = "watch")]
127    watch_registry: Option<Arc<WatchRegistry>>,
128}
129
130impl EmbeddedClient {
131    /// Internal constructor (used by EmbeddedEngine)
132    pub(crate) fn new_internal(
133        event_tx: mpsc::Sender<RaftEvent>,
134        cmd_tx: mpsc::Sender<d_engine_core::ClientCmd>,
135        client_id: u32,
136        timeout: Duration,
137    ) -> Self {
138        Self {
139            event_tx,
140            cmd_tx,
141            client_id,
142            timeout,
143            #[cfg(feature = "watch")]
144            watch_registry: None,
145        }
146    }
147
148    /// Set watch registry for watch operations
149    #[cfg(feature = "watch")]
150    pub(crate) fn with_watch_registry(
151        mut self,
152        registry: Arc<WatchRegistry>,
153    ) -> Self {
154        self.watch_registry = Some(registry);
155        self
156    }
157
158    fn map_error_response(
159        error: ErrorCode,
160        leader_hint: Option<LeaderHint>,
161        retry_after_ms: Option<u64>,
162    ) -> ClientApiError {
163        match error {
164            ErrorCode::NotLeader => {
165                let (leader_id, leader_address) = if let Some(hint) = leader_hint {
166                    (Some(hint.leader_id.to_string()), Some(hint.address))
167                } else {
168                    (None, None)
169                };
170                not_leader_error(leader_id, leader_address, retry_after_ms)
171            }
172            _ => server_error(format!("Error code: {error:?}")),
173        }
174    }
175
176    /// Store a key-value pair with strong consistency.
177    ///
178    /// # Errors
179    /// Returns an error if the node is not the leader, the channel is closed,
180    /// the operation times out, or the state machine returns a server error.
181    pub async fn put(
182        &self,
183        key: impl AsRef<[u8]>,
184        value: impl AsRef<[u8]>,
185    ) -> ClientApiResult<()> {
186        let request = ClientWriteRequest {
187            client_id: self.client_id,
188            command: Some(WriteOperation::Insert {
189                key: Bytes::copy_from_slice(key.as_ref()),
190                value: Bytes::copy_from_slice(value.as_ref()),
191                ttl_secs: None,
192            }),
193        };
194
195        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
196
197        self.cmd_tx
198            .send(d_engine_core::ClientCmd::Propose(request, resp_tx))
199            .await
200            .map_err(|_| channel_closed_error())?;
201
202        let result = tokio::time::timeout(self.timeout, resp_rx)
203            .await
204            .map_err(|_| timeout_error(self.timeout))?
205            .map_err(|_| channel_closed_error())?;
206
207        let response =
208            result.map_err(|status| server_error(format!("RPC error: {}", status.message())))?;
209
210        if response.error != ErrorCode::Success {
211            return Err(Self::map_error_response(
212                response.error,
213                response.leader_hint,
214                response.retry_after_ms,
215            ));
216        }
217
218        Ok(())
219    }
220
221    /// Strongly consistent read (linearizable).
222    ///
223    /// Guarantees reading the latest committed value by querying the Leader.
224    /// Use for critical reads where staleness is unacceptable.
225    ///
226    /// # Performance
227    /// - Latency: ~1-5ms (network RTT to Leader)
228    /// - Throughput: Limited by Leader capacity
229    ///
230    /// # Raft Protocol
231    /// Implements linearizable read per Raft §8.
232    ///
233    /// # Example
234    /// ```ignore
235    /// let client = engine.client();
236    /// let value = client.get_linearizable(b"critical-config").await?;
237    /// ```
238    pub async fn get_linearizable(
239        &self,
240        key: impl AsRef<[u8]>,
241    ) -> ClientApiResult<Option<Bytes>> {
242        self.get_with_consistency(key, ReadConsistencyPolicy::LinearizableRead).await
243    }
244
245    /// Eventually consistent read (stale OK).
246    ///
247    /// Reads from local state machine without Leader coordination.
248    /// Fast but may return stale data if replication is lagging.
249    ///
250    /// # Performance
251    /// - Latency: ~0.1ms (local memory access)
252    /// - Throughput: High (no Leader bottleneck)
253    ///
254    /// # Use Cases
255    /// - Read-heavy workloads
256    /// - Analytics/reporting (staleness acceptable)
257    /// - Caching scenarios
258    ///
259    /// # Example
260    /// ```ignore
261    /// let client = engine.client();
262    /// let cached_value = client.get_eventual(b"user-preference").await?;
263    /// ```
264    pub async fn get_eventual(
265        &self,
266        key: impl AsRef<[u8]>,
267    ) -> ClientApiResult<Option<Bytes>> {
268        self.get_with_consistency(key, ReadConsistencyPolicy::EventualConsistency).await
269    }
270
271    /// Advanced: Read with explicit consistency policy.
272    ///
273    /// For fine-grained control over read consistency vs performance trade-off.
274    ///
275    /// # Consistency Policies
276    /// - `LinearizableRead`: Read from Leader (strong consistency, may be slower)
277    /// - `EventualConsistency`: Read from local node (fast, may be stale)
278    /// - `LeaseRead`: Optimized Leader read using lease mechanism
279    ///
280    /// # Example
281    /// ```ignore
282    /// use d_engine_proto::client::ReadConsistencyPolicy;
283    ///
284    /// let value = client.get_with_consistency(
285    ///     b"key",
286    ///     ReadConsistencyPolicy::LeaseRead,
287    /// ).await?;
288    /// ```
289    pub async fn get_with_consistency(
290        &self,
291        key: impl AsRef<[u8]>,
292        consistency: ReadConsistencyPolicy,
293    ) -> ClientApiResult<Option<Bytes>> {
294        let request = ClientReadRequest {
295            client_id: self.client_id,
296            keys: vec![Bytes::copy_from_slice(key.as_ref())],
297            consistency_policy: Some(consistency),
298        };
299
300        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
301
302        self.cmd_tx
303            .send(d_engine_core::ClientCmd::Read(request, resp_tx))
304            .await
305            .map_err(|_| channel_closed_error())?;
306
307        let result = tokio::time::timeout(self.timeout, resp_rx)
308            .await
309            .map_err(|_| timeout_error(self.timeout))?
310            .map_err(|_| channel_closed_error())?;
311
312        let response =
313            result.map_err(|status| server_error(format!("RPC error: {}", status.message())))?;
314
315        if response.error != ErrorCode::Success {
316            return Err(Self::map_error_response(
317                response.error,
318                response.leader_hint,
319                response.retry_after_ms,
320            ));
321        }
322
323        let read_results = extract_read_payload(response.result)?;
324        Ok(read_results.entries.first().map(|e| e.value.clone()))
325    }
326
327    /// Get multiple keys with linearizable consistency.
328    ///
329    /// Reads multiple keys from the Leader with strong consistency guarantee.
330    ///
331    /// # Example
332    /// ```ignore
333    /// let keys = vec![Bytes::from("key1"), Bytes::from("key2")];
334    /// let values = client.get_multi_linearizable(&keys).await?;
335    /// ```
336    pub async fn get_multi_linearizable(
337        &self,
338        keys: &[Bytes],
339    ) -> ClientApiResult<Vec<Option<Bytes>>> {
340        self.get_multi_with_consistency(keys, ReadConsistencyPolicy::LinearizableRead)
341            .await
342    }
343
344    /// Get multiple keys with eventual consistency.
345    ///
346    /// Reads multiple keys from local state machine (fast, may be stale).
347    ///
348    /// # Example
349    /// ```ignore
350    /// let keys = vec![Bytes::from("key1"), Bytes::from("key2")];
351    /// let values = client.get_multi_eventual(&keys).await?;
352    /// ```
353    pub async fn get_multi_eventual(
354        &self,
355        keys: &[Bytes],
356    ) -> ClientApiResult<Vec<Option<Bytes>>> {
357        self.get_multi_with_consistency(keys, ReadConsistencyPolicy::EventualConsistency)
358            .await
359    }
360
361    /// Advanced: Get multiple keys with explicit consistency policy.
362    pub async fn get_multi_with_consistency(
363        &self,
364        keys: &[Bytes],
365        consistency: ReadConsistencyPolicy,
366    ) -> ClientApiResult<Vec<Option<Bytes>>> {
367        let request = ClientReadRequest {
368            client_id: self.client_id,
369            keys: keys.to_vec(),
370            consistency_policy: Some(consistency),
371        };
372
373        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
374
375        self.cmd_tx
376            .send(d_engine_core::ClientCmd::Read(request, resp_tx))
377            .await
378            .map_err(|_| channel_closed_error())?;
379
380        let result = tokio::time::timeout(self.timeout, resp_rx)
381            .await
382            .map_err(|_| timeout_error(self.timeout))?
383            .map_err(|_| channel_closed_error())?;
384
385        let response =
386            result.map_err(|status| server_error(format!("RPC error: {}", status.message())))?;
387
388        if response.error != ErrorCode::Success {
389            return Err(Self::map_error_response(
390                response.error,
391                response.leader_hint,
392                response.retry_after_ms,
393            ));
394        }
395
396        let read_results = extract_read_payload(response.result)?;
397        // Reconstruct result vector in requested key order.
398        // Server only returns results for keys that exist, so we must
399        // map by key to preserve positional correspondence with input.
400        let results_by_key: std::collections::HashMap<_, _> =
401            read_results.entries.into_iter().map(|e| (e.key, e.value)).collect();
402        Ok(keys.iter().map(|k| results_by_key.get(k).cloned()).collect())
403    }
404
405    /// Delete a key-value pair with strong consistency.
406    ///
407    /// # Errors
408    /// Returns an error if the node is not the leader, the channel is closed,
409    /// the operation times out, or the state machine returns a server error.
410    pub async fn delete(
411        &self,
412        key: impl AsRef<[u8]>,
413    ) -> ClientApiResult<()> {
414        let request = ClientWriteRequest {
415            client_id: self.client_id,
416            command: Some(WriteOperation::Delete {
417                key: Bytes::copy_from_slice(key.as_ref()),
418            }),
419        };
420
421        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
422
423        self.cmd_tx
424            .send(d_engine_core::ClientCmd::Propose(request, resp_tx))
425            .await
426            .map_err(|_| channel_closed_error())?;
427
428        let result = tokio::time::timeout(self.timeout, resp_rx)
429            .await
430            .map_err(|_| timeout_error(self.timeout))?
431            .map_err(|_| channel_closed_error())?;
432
433        let response =
434            result.map_err(|status| server_error(format!("RPC error: {}", status.message())))?;
435
436        if response.error != ErrorCode::Success {
437            return Err(Self::map_error_response(
438                response.error,
439                response.leader_hint,
440                response.retry_after_ms,
441            ));
442        }
443
444        Ok(())
445    }
446
447    /// Returns the client ID assigned to this local client
448    pub fn client_id(&self) -> u32 {
449        self.client_id
450    }
451
452    /// Returns the configured timeout duration for operations
453    pub fn timeout(&self) -> Duration {
454        self.timeout
455    }
456
457    /// Watch for changes to a specific key
458    ///
459    /// Returns a `WatcherHandle` that yields watch events when the key's value changes.
460    /// The stream will continue until explicitly dropped or a connection error occurs.
461    ///
462    /// # Arguments
463    ///
464    /// * `key` - The key to watch
465    ///
466    /// # Returns
467    ///
468    /// A `WatcherHandle` that can be used to receive watch events
469    ///
470    /// # Errors
471    ///
472    /// Returns error if watch feature is not enabled or WatchRegistry not initialized
473    ///
474    /// # Example
475    ///
476    /// ```ignore
477    /// let client = engine.client();
478    /// let mut watcher = client.watch(b"config/timeout").await?;
479    /// while let Some(event) = watcher.next().await {
480    ///     println!("Value changed: {:?}", event);
481    /// }
482    /// ```
483    #[cfg(feature = "watch")]
484    pub fn watch(
485        &self,
486        key: impl AsRef<[u8]>,
487    ) -> ClientApiResult<d_engine_core::watch::WatcherHandle> {
488        self.watch_with_options(key, false)
489    }
490
491    /// Watch a key and opt in to receiving the previous value on each mutation.
492    ///
493    /// This is the lower-level form of [`watch`](Self::watch). Use it when you need
494    /// `event.prev_value` — for example to detect what a key held before a write, or
495    /// to implement an audit log.
496    ///
497    /// # Arguments
498    ///
499    /// * `key`     - The exact key to watch.
500    /// * `prev_kv` - When `true`, every `Put` and `Delete` event carries the value the
501    ///   key held **before** the mutation in `event.prev_value`. When
502    ///   `false` (the default via [`watch`](Self::watch)), `prev_value` is
503    ///   always empty.
504    ///
505    /// # Performance note
506    ///
507    /// When at least one watcher has `prev_kv = true`, the state machine reads the old
508    /// value from storage before each `apply_chunk`.  The read is amortised across the
509    /// whole write batch — cost scales with **write rate**, not watcher count.  Disable
510    /// when you don't need the previous value.
511    ///
512    /// # `prev_value` is empty in these cases
513    ///
514    /// - The watcher was registered with `prev_kv = false`.
515    /// - The event type is `Progress` or `Canceled` (not a data mutation).
516    /// - The key did not exist before a `Put` (i.e. it was a fresh insert).
517    ///
518    /// # Example
519    ///
520    /// ```ignore
521    /// // Distributed-lock audit: know who held the lock before it changed hands.
522    /// let watcher = client.watch_with_options(b"lock/resource_a", true)?;
523    /// let (_, _, mut rx) = watcher.into_receiver();
524    ///
525    /// tokio::spawn(async move {
526    ///     while let Some(event) = rx.recv().await {
527    ///         match event.event_type {
528    ///             WatchEventType::Put => println!(
529    ///                 "lock acquired by {:?}, was held by {:?}",
530    ///                 event.value, event.prev_value
531    ///             ),
532    ///             WatchEventType::Delete => println!(
533    ///                 "lock released, was held by {:?}",
534    ///                 event.prev_value
535    ///             ),
536    ///             WatchEventType::Canceled => { /* re-register */ break; }
537    ///             WatchEventType::Progress => {}
538    ///         }
539    ///     }
540    /// });
541    /// ```
542    #[cfg(feature = "watch")]
543    pub fn watch_with_options(
544        &self,
545        key: impl AsRef<[u8]>,
546        prev_kv: bool,
547    ) -> ClientApiResult<d_engine_core::watch::WatcherHandle> {
548        let registry = self.watch_registry.as_ref().ok_or_else(|| ClientApiError::Business {
549            code: ErrorCode::Uncategorized,
550            message: "Watch feature disabled (WatchRegistry not initialized)".to_string(),
551            required_action: None,
552        })?;
553
554        let key_bytes = Bytes::copy_from_slice(key.as_ref());
555        registry.register(key_bytes, prev_kv).map_err(|e| ClientApiError::Business {
556            code: ErrorCode::Uncategorized,
557            message: e.to_string(),
558            required_action: None,
559        })
560    }
561
562    /// Register a prefix watcher — notified on every key under the given path prefix.
563    ///
564    /// `prefix` must start with '/' and end with '/'.  E.g. `b"/services/"` watches
565    /// all keys whose path begins with `/services/`.
566    #[cfg(feature = "watch")]
567    pub fn watch_prefix(
568        &self,
569        prefix: impl AsRef<[u8]>,
570    ) -> ClientApiResult<d_engine_core::watch::WatcherHandle> {
571        self.watch_prefix_with_options(prefix, false)
572    }
573
574    /// Register a prefix watcher and opt in to receiving the previous value on each mutation.
575    ///
576    /// Prefix form of [`watch_with_options`](Self::watch_with_options).  Every key whose
577    /// path starts with `prefix` triggers an event; `event.key` is the full child key.
578    ///
579    /// See [`watch_with_options`](Self::watch_with_options) for the `prev_kv` semantics,
580    /// performance note, and the cases where `prev_value` is empty.
581    ///
582    /// # Example
583    ///
584    /// ```ignore
585    /// // Track every endpoint change under /services/ together with the old address.
586    /// let watcher = client.watch_prefix_with_options(b"/services/", true)?;
587    /// let (_, _, mut rx) = watcher.into_receiver();
588    ///
589    /// tokio::spawn(async move {
590    ///     while let Some(event) = rx.recv().await {
591    ///         match event.event_type {
592    ///             WatchEventType::Put => println!(
593    ///                 "{:?} moved from {:?} → {:?}",
594    ///                 event.key, event.prev_value, event.value
595    ///             ),
596    ///             WatchEventType::Delete => println!(
597    ///                 "{:?} removed (was {:?})",
598    ///                 event.key, event.prev_value
599    ///             ),
600    ///             WatchEventType::Canceled => { /* buffer overflow — re-register */ break; }
601    ///             WatchEventType::Progress => {}
602    ///         }
603    ///     }
604    /// });
605    /// ```
606    #[cfg(feature = "watch")]
607    pub fn watch_prefix_with_options(
608        &self,
609        prefix: impl AsRef<[u8]>,
610        prev_kv: bool,
611    ) -> ClientApiResult<d_engine_core::watch::WatcherHandle> {
612        let registry = self.watch_registry.as_ref().ok_or_else(|| ClientApiError::Business {
613            code: ErrorCode::Uncategorized,
614            message: "Watch feature disabled (WatchRegistry not initialized)".to_string(),
615            required_action: None,
616        })?;
617
618        let prefix_bytes = Bytes::copy_from_slice(prefix.as_ref());
619        registry
620            .register_prefix(prefix_bytes, prev_kv)
621            .map_err(|e| ClientApiError::Business {
622                code: ErrorCode::Uncategorized,
623                message: e.to_string(),
624                required_action: None,
625            })
626    }
627
628    /// Scan all keys under `prefix` with linearizable consistency.
629    ///
630    /// Returns a `ScanResult` containing all matching `(key, value)` pairs and the
631    /// `revision` (applied index) at scan time. Use `revision` to filter watch events
632    /// during reconnection: skip events where `event.revision <= revision`.
633    pub async fn scan_prefix(
634        &self,
635        prefix: impl AsRef<[u8]>,
636    ) -> ClientApiResult<ScanResult> {
637        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
638
639        self.cmd_tx
640            .send(d_engine_core::ClientCmd::Scan(
641                Bytes::copy_from_slice(prefix.as_ref()),
642                resp_tx,
643            ))
644            .await
645            .map_err(|_| channel_closed_error())?;
646
647        let result = tokio::time::timeout(self.timeout, resp_rx)
648            .await
649            .map_err(|_| timeout_error(self.timeout))?
650            .map_err(|_| channel_closed_error())?;
651
652        result.map_err(|status| server_error(format!("RPC error: {}", status.message())))
653    }
654
655    /// Internal helper: Get cluster membership via ClusterConf event
656    async fn get_cluster_membership(
657        &self
658    ) -> ClientApiResult<d_engine_proto::server::cluster::ClusterMembership> {
659        let request = d_engine_proto::server::cluster::MetadataRequest {};
660
661        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
662
663        self.event_tx
664            .send(RaftEvent::ClusterConf(request, resp_tx))
665            .await
666            .map_err(|_| channel_closed_error())?;
667
668        let result = tokio::time::timeout(self.timeout, resp_rx)
669            .await
670            .map_err(|_| timeout_error(self.timeout))?
671            .map_err(|_| channel_closed_error())?;
672
673        result.map_err(|status| server_error(format!("ClusterConf error: {}", status.message())))
674    }
675}
676
677impl std::fmt::Debug for EmbeddedClient {
678    fn fmt(
679        &self,
680        f: &mut std::fmt::Formatter<'_>,
681    ) -> std::fmt::Result {
682        f.debug_struct("EmbeddedClient")
683            .field("client_id", &self.client_id)
684            .field("timeout", &self.timeout)
685            .finish()
686    }
687}
688
689// Implement ClientApi trait
690#[async_trait::async_trait]
691impl ClientApi for EmbeddedClient {
692    async fn put(
693        &self,
694        key: impl AsRef<[u8]> + Send,
695        value: impl AsRef<[u8]> + Send,
696    ) -> ClientApiResult<()> {
697        self.put(key, value).await
698    }
699
700    async fn put_with_ttl(
701        &self,
702        key: impl AsRef<[u8]> + Send,
703        value: impl AsRef<[u8]> + Send,
704        ttl_secs: u64,
705    ) -> ClientApiResult<()> {
706        let request = ClientWriteRequest {
707            client_id: self.client_id,
708            command: Some(WriteOperation::Insert {
709                key: Bytes::copy_from_slice(key.as_ref()),
710                value: Bytes::copy_from_slice(value.as_ref()),
711                ttl_secs: Some(ttl_secs),
712            }),
713        };
714
715        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
716
717        self.cmd_tx
718            .send(d_engine_core::ClientCmd::Propose(request, resp_tx))
719            .await
720            .map_err(|_| channel_closed_error())?;
721
722        let result = tokio::time::timeout(self.timeout, resp_rx)
723            .await
724            .map_err(|_| timeout_error(self.timeout))?
725            .map_err(|_| channel_closed_error())?;
726
727        let response =
728            result.map_err(|status| server_error(format!("RPC error: {}", status.message())))?;
729
730        if response.error != ErrorCode::Success {
731            return Err(Self::map_error_response(
732                response.error,
733                response.leader_hint,
734                response.retry_after_ms,
735            ));
736        }
737
738        Ok(())
739    }
740
741    async fn get(
742        &self,
743        key: impl AsRef<[u8]> + Send,
744    ) -> ClientApiResult<Option<Bytes>> {
745        self.get_linearizable(key).await
746    }
747
748    async fn get_multi(
749        &self,
750        keys: &[Bytes],
751    ) -> ClientApiResult<Vec<Option<Bytes>>> {
752        self.get_multi_linearizable(keys).await
753    }
754
755    async fn delete(
756        &self,
757        key: impl AsRef<[u8]> + Send,
758    ) -> ClientApiResult<()> {
759        self.delete(key).await
760    }
761
762    async fn compare_and_swap(
763        &self,
764        key: impl AsRef<[u8]> + Send,
765        expected_value: Option<impl AsRef<[u8]> + Send>,
766        new_value: impl AsRef<[u8]> + Send,
767    ) -> ClientApiResult<bool> {
768        let request = ClientWriteRequest {
769            client_id: self.client_id,
770            command: Some(WriteOperation::CompareAndSwap {
771                key: Bytes::copy_from_slice(key.as_ref()),
772                expected: expected_value.map(|v| Bytes::copy_from_slice(v.as_ref())),
773                new_value: Bytes::copy_from_slice(new_value.as_ref()),
774            }),
775        };
776
777        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
778
779        self.cmd_tx
780            .send(d_engine_core::ClientCmd::Propose(request, resp_tx))
781            .await
782            .map_err(|_| channel_closed_error())?;
783
784        let result = tokio::time::timeout(self.timeout, resp_rx)
785            .await
786            .map_err(|_| timeout_error(self.timeout))?
787            .map_err(|_| channel_closed_error())?;
788
789        let response =
790            result.map_err(|status| server_error(format!("RPC error: {}", status.message())))?;
791
792        if response.error != ErrorCode::Success {
793            return Err(Self::map_error_response(
794                response.error,
795                response.leader_hint,
796                response.retry_after_ms,
797            ));
798        }
799
800        match response.result {
801            Some(ClientResponsePayload::Write(result)) => Ok(result.succeeded),
802            _ => Err(server_error("Invalid CAS response".to_string())),
803        }
804    }
805
806    async fn list_members(
807        &self
808    ) -> ClientApiResult<Vec<d_engine_proto::server::cluster::NodeMeta>> {
809        let cluster_membership = self.get_cluster_membership().await?;
810        Ok(cluster_membership.nodes)
811    }
812
813    async fn get_leader_id(&self) -> ClientApiResult<Option<u32>> {
814        let cluster_membership = self.get_cluster_membership().await?;
815        Ok(cluster_membership.current_leader_id)
816    }
817
818    async fn get_multi_with_policy(
819        &self,
820        keys: &[Bytes],
821        consistency_policy: Option<d_engine_core::config::ReadConsistencyPolicy>,
822    ) -> ClientApiResult<Vec<Option<Bytes>>> {
823        self.get_multi_with_consistency(
824            keys,
825            consistency_policy
826                .unwrap_or(d_engine_core::config::ReadConsistencyPolicy::LinearizableRead),
827        )
828        .await
829    }
830
831    async fn get_linearizable(
832        &self,
833        key: impl AsRef<[u8]> + Send,
834    ) -> ClientApiResult<Option<Bytes>> {
835        self.get_linearizable(key).await
836    }
837
838    async fn get_lease(
839        &self,
840        key: impl AsRef<[u8]> + Send,
841    ) -> ClientApiResult<Option<Bytes>> {
842        self.get_with_consistency(key, ReadConsistencyPolicy::LeaseRead).await
843    }
844
845    async fn get_eventual(
846        &self,
847        key: impl AsRef<[u8]> + Send,
848    ) -> ClientApiResult<Option<Bytes>> {
849        self.get_eventual(key).await
850    }
851
852    async fn scan_prefix(
853        &self,
854        prefix: impl AsRef<[u8]> + Send,
855    ) -> ClientApiResult<ScanResult> {
856        self.scan_prefix(prefix).await
857    }
858}
859
#[cfg(test)]
mod error_helper_tests {
    // Unit tests for the error-construction helpers (`not_leader_error`,
    // `EmbeddedClient::map_error_response`) and for `extract_read_payload`.
    //
    // Each test pulls the fields of interest out of the returned value first
    // and asserts on them afterwards, so a wrong variant fails with the
    // "expected Network error" panic before any field comparison runs.

    use d_engine_core::client::KvEntry;

    use super::*;

    // ─── not_leader_error ────────────────────────────────────────────────────

    #[test]
    fn test_not_leader_uses_server_retry_after_ms_when_provided() {
        let hint_id = Some("1".to_string());
        let hint_addr = Some("127.0.0.1:5001".to_string());

        let retry = match not_leader_error(hint_id, hint_addr, Some(500)) {
            ClientApiError::Network { retry_after_ms, .. } => retry_after_ms,
            _ => panic!("expected Network error"),
        };

        assert_eq!(retry, Some(500));
    }

    #[test]
    fn test_not_leader_falls_back_to_100ms_when_server_provides_none() {
        let retry = match not_leader_error(None, None, None) {
            ClientApiError::Network { retry_after_ms, .. } => retry_after_ms,
            _ => panic!("expected Network error"),
        };

        assert_eq!(retry, Some(100));
    }

    #[test]
    fn test_not_leader_zero_is_not_treated_as_none() {
        // An explicit Some(0) is a value supplied by the server, not a missing
        // one, so it must come back untouched instead of being replaced by the
        // fallback.
        let retry = match not_leader_error(None, None, Some(0)) {
            ClientApiError::Network { retry_after_ms, .. } => retry_after_ms,
            _ => panic!("expected Network error"),
        };

        assert_eq!(retry, Some(0));
    }

    // ─── map_error_response ──────────────────────────────────────────────────

    #[test]
    fn test_map_error_response_not_leader_forwards_retry_after_ms() {
        let hint = LeaderHint {
            leader_id: 2,
            address: "10.0.0.2:5002".into(),
        };

        let mapped =
            EmbeddedClient::map_error_response(ErrorCode::NotLeader, Some(hint), Some(250));

        let (code, retry, leader_hint) = match mapped {
            ClientApiError::Network {
                code,
                retry_after_ms,
                leader_hint,
                ..
            } => (code, retry_after_ms, leader_hint),
            _ => panic!("expected Network error"),
        };

        assert_eq!(code, ErrorCode::NotLeader);
        assert_eq!(retry, Some(250));
        assert_eq!(leader_hint.unwrap().leader_id, 2);
    }

    #[test]
    fn test_map_error_response_not_leader_falls_back_to_100ms_when_none() {
        let mapped = EmbeddedClient::map_error_response(ErrorCode::NotLeader, None, None);

        let retry = match mapped {
            ClientApiError::Network { retry_after_ms, .. } => retry_after_ms,
            _ => panic!("expected Network error"),
        };

        assert_eq!(retry, Some(100));
    }

    // ─── extract_read_payload ────────────────────────────────────────────────
    //
    // The tests below invoke `extract_read_payload` itself rather than a copy
    // of its match logic, so the error behaviour is checked where it is
    // implemented and cannot silently drift from it.

    #[test]
    fn test_extract_read_payload_returns_read_results_on_success() {
        // Happy path: a well-formed Read payload is unwrapped without error.
        let read = ReadResults {
            entries: vec![KvEntry {
                key: Bytes::from("k"),
                value: Bytes::from("v"),
            }],
        };

        let result = extract_read_payload(Some(ClientResponsePayload::Read(read))).unwrap();

        assert_eq!(result.entries.len(), 1);
        let only = &result.entries[0];
        assert_eq!(only.key, Bytes::from("k"));
        assert_eq!(only.value, Bytes::from("v"));
    }

    #[test]
    fn test_extract_read_payload_rejects_write_result_payload() {
        // A Write payload handed to the read extractor must be reported as
        // InvalidResponse rather than being mistaken for "key not found".
        let write = d_engine_core::client::WriteResult { succeeded: true };

        let err = extract_read_payload(Some(ClientResponsePayload::Write(write))).unwrap_err();

        assert_eq!(err.code(), ErrorCode::InvalidResponse);
        let message = err.message();
        assert!(
            message.contains("WriteResult"),
            "error message should identify the unexpected variant; got: {}",
            message
        );
    }

    #[test]
    fn test_extract_read_payload_rejects_none_payload() {
        // A missing payload must be reported as InvalidResponse rather than
        // being mistaken for "key not found".
        let err = extract_read_payload(None).unwrap_err();

        assert_eq!(err.code(), ErrorCode::InvalidResponse);
        let message = err.message();
        assert!(
            message.contains("None"),
            "error message should identify missing payload; got: {}",
            message
        );
    }

    #[test]
    fn test_extract_read_payload_empty_entries_is_valid() {
        // ReadResults with zero entries (no keys matched) is a legitimate
        // answer and must not be turned into an error.
        let empty = ReadResults {
            entries: Vec::new(),
        };

        let result = extract_read_payload(Some(ClientResponsePayload::Read(empty))).unwrap();

        assert!(result.entries.is_empty());
    }

    #[test]
    fn test_extract_read_payload_multiple_entries_are_preserved() {
        // Every entry of the ReadResults has to come back, in the same order.
        let pairs = [("k1", "v1"), ("k2", "v2")];
        let read = ReadResults {
            entries: pairs
                .iter()
                .map(|&(k, v)| KvEntry {
                    key: Bytes::from(k),
                    value: Bytes::from(v),
                })
                .collect(),
        };

        let result = extract_read_payload(Some(ClientResponsePayload::Read(read))).unwrap();

        assert_eq!(result.entries.len(), 2);
        assert_eq!(result.entries[1].key, Bytes::from("k2"));
    }
}