Skip to main content

d_engine_core/client/
client_api.rs

1//! KV client trait - unified interface for key-value operations.
2//!
3//! Provides a common abstraction for both remote (gRPC) and embedded (local) access
4//! to d-engine's key-value store.
5//!
6//! # Implementations
7//!
8//! - `GrpcClient` (d-engine-client): Remote access via gRPC protocol
9//! - `EmbeddedClient` (d-engine-server): Zero-overhead embedded access
10//!
11//! # Design Principles
12//!
13//! - **Unified Interface**: Same API for remote and embedded modes
14//! - **Async-first**: All operations are async for non-blocking I/O
15//! - **Type Safety**: Strong typing with clear error handling
16//! - **Performance**: Thin, statically dispatched abstraction; the only added cost is the boxed future `#[async_trait]` creates per call
17//!
18//! # Example
19//!
20//! ```rust,ignore
21//! async fn store_config<C: ClientApi>(client: &C) -> ClientApiResult<()> {
22//!     client.put(b"config:timeout", b"30s").await?;
23//!     let value = client.get(b"config:timeout").await?;
24//!     Ok(())
25//! }
26//! ```
27
28use bytes::Bytes;
29use d_engine_proto::client::ReadConsistencyPolicy;
30
31use crate::client::client_api_error::ClientApiResult;
32
33/// Unified key-value store and cluster-inspection interface.
34///
35/// This trait abstracts over different client implementations, allowing applications to write
36/// generic code that works with both remote (gRPC) and embedded (local) access. The key-taking methods use `impl AsRef<[u8]>` arguments and are therefore generic, so consume the trait through a type parameter (`C: ClientApi`) rather than as `dyn ClientApi`.
37///
38/// # Consistency Guarantees
39///
40/// - [`put()`](Self::put): Strong consistency, linearizable writes
41/// - [`get()`](Self::get): Linearizable reads by default; see [`get_lease()`](Self::get_lease) and [`get_eventual()`](Self::get_eventual) for the other read policies
42/// - [`delete()`](Self::delete): Strong consistency, linearizable deletes
43///
44/// # Thread Safety
45///
46/// `Send + Sync` are supertraits of this trait, so every implementation must be safe to share and call concurrently.
47///
48/// # Performance Characteristics
49///
50/// - `GrpcClient`: 1-2ms latency (network + serialization)
51/// - `EmbeddedClient`: <0.1ms latency (direct function call)
52#[async_trait::async_trait]
53pub trait ClientApi: Send + Sync {
54    /// Stores a key-value pair with strong consistency.
55    ///
56    /// The write is replicated to a quorum of nodes before returning,
57    /// ensuring durability and linearizability.
58    ///
59    /// # Arguments
60    ///
61    /// * `key` - The key to store; any value viewable as bytes (e.g. `&[u8]`, `Vec<u8>`, `Bytes`)
62    /// * `value` - The value to store; accepts the same kinds of byte containers as `key`
63    ///
64    /// # Errors
65    ///
66    /// - [`ClientApiError::Network`] if node is shutting down or timeout occurs
67    /// - [`ClientApiError::Business`] for server-side errors (e.g., not leader)
68    ///
69    /// # Example
70    ///
71    /// ```rust,ignore
72    /// client.put(b"user:1001", b"Alice").await?;
73    /// ```
74    async fn put(
75        &self,
76        key: impl AsRef<[u8]> + Send,
77        value: impl AsRef<[u8]> + Send,
78    ) -> ClientApiResult<()>;
79
80    /// Stores a key-value pair with time-to-live (TTL).
81    ///
82    /// The key will automatically expire after `ttl_secs` seconds.
83    ///
84    /// # Arguments
85    ///
86    /// * `key` - The key to store
87    /// * `value` - The value to store
88    /// * `ttl_secs` - Time-to-live in seconds. NOTE(review): behavior for `ttl_secs == 0` is not specified here — confirm with the implementations.
89    ///
90    /// # Errors
91    ///
92    /// Same as [`put()`](Self::put)
93    ///
94    /// # Example
95    ///
96    /// ```rust,ignore
97    /// // Session expires after 1 hour
98    /// client.put_with_ttl(b"session:abc", b"user_data", 3600).await?;
99    /// ```
100    async fn put_with_ttl(
101        &self,
102        key: impl AsRef<[u8]> + Send,
103        value: impl AsRef<[u8]> + Send,
104        ttl_secs: u64,
105    ) -> ClientApiResult<()>;
106
107    /// Retrieves the value associated with a key.
108    ///
109    /// Uses linearizable reads by default, ensuring the returned value
110    /// reflects all previously committed writes.
111    ///
112    /// # Arguments
113    ///
114    /// * `key` - The key to retrieve; any value viewable as bytes
115    ///
116    /// # Returns
117    ///
118    /// * `Ok(Some(value))` if key exists
119    /// * `Ok(None)` if key does not exist or has expired
120    /// * `Err(_)` if operation failed
121    ///
122    /// # Errors
123    ///
124    /// - [`ClientApiError::Network`] if node is shutting down or timeout occurs
125    /// - [`ClientApiError::Business`] for server-side errors
126    ///
127    /// # Example
128    ///
129    /// ```rust,ignore
130    /// match client.get(b"user:1001").await? {
131    ///     Some(value) => println!("User: {:?}", value),
132    ///     None => println!("User not found"),
133    /// }
134    /// ```
135    async fn get(
136        &self,
137        key: impl AsRef<[u8]> + Send,
138    ) -> ClientApiResult<Option<Bytes>>;
139
140    /// Retrieves multiple keys in a single request.
141    ///
142    /// More efficient than multiple individual `get()` calls when fetching
143    /// multiple keys, as it batches the requests. NOTE(review): the consistency policy used is not stated here — presumably the same default as `get()`; confirm.
144    ///
145    /// # Arguments
146    ///
147    /// * `keys` - Slice of keys to retrieve
148    ///
149    /// # Returns
150    ///
151    /// Vector of results in the same order as input keys.
152    /// `None` for keys that don't exist.
153    ///
154    /// # Errors
155    ///
156    /// Same as [`get()`](Self::get)
157    ///
158    /// # Example
159    ///
160    /// ```rust,ignore
161    /// let keys = vec![
162    ///     Bytes::from("user:1001"),
163    ///     Bytes::from("user:1002"),
164    /// ];
165    /// let results = client.get_multi(&keys).await?;
166    /// ```
167    async fn get_multi(
168        &self,
169        keys: &[Bytes],
170    ) -> ClientApiResult<Vec<Option<Bytes>>>;
171
172    /// Deletes a key-value pair with strong consistency.
173    ///
174    /// The deletion is replicated to a quorum before returning.
175    /// Returns successfully even if the key does not exist (idempotent).
176    ///
177    /// # Arguments
178    ///
179    /// * `key` - The key to delete; any value viewable as bytes
180    ///
181    /// # Errors
182    ///
183    /// - [`ClientApiError::Network`] if node is shutting down or timeout occurs
184    /// - [`ClientApiError::Business`] for server-side errors
185    ///
186    /// # Example
187    ///
188    /// ```rust,ignore
189    /// client.delete(b"temp:session_123").await?;
190    /// ```
191    async fn delete(
192        &self,
193        key: impl AsRef<[u8]> + Send,
194    ) -> ClientApiResult<()>;
195
196    /// Atomically replaces the value of `key` with `new_value` if its current value equals `expected_value`.
197    ///
198    /// Returns `Ok(true)` if the swap happened, `Ok(false)` if the current value differs from `expected_value`, and `Err(e)` on an operation error (timeout, cluster unavailable, etc.).
199    ///
200    /// NOTE(review): the meaning of `expected_value = None` is not documented here —
201    /// presumably "the key must currently be absent"; confirm with the implementations.
202    async fn compare_and_swap(
203        &self,
204        key: impl AsRef<[u8]> + Send,
205        expected_value: Option<impl AsRef<[u8]> + Send>,
206        new_value: impl AsRef<[u8]> + Send,
207    ) -> ClientApiResult<bool>;
208
209    // ==================== Cluster Management Operations ====================
210
211    /// Lists all cluster members with metadata.
212    ///
213    /// Each returned `NodeMeta` describes one node, including:
214    /// - Node ID
215    /// - Address
216    /// - Role (Leader/Follower/Learner)
217    /// - Status
218    ///
219    /// # Errors
220    ///
221    /// Returns error if unable to retrieve cluster metadata
222    async fn list_members(&self)
223    -> ClientApiResult<Vec<d_engine_proto::server::cluster::NodeMeta>>;
224
225    /// Gets the current leader's node ID.
226    ///
227    /// Returns `Ok(Some(id))` with the leader node ID if known, or `Ok(None)` if no leader is currently elected.
228    ///
229    /// # Errors
230    ///
231    /// Returns error if unable to determine leader status
232    async fn get_leader_id(&self) -> ClientApiResult<Option<u32>>;
233
234    // ==================== Advanced Read Operations with Consistency Control ====================
235
236    /// Retrieves multiple keys with an explicit consistency policy.
237    ///
238    /// Batch counterpart of the single-key policy getters below.
239    /// More efficient than multiple individual calls when fetching multiple keys.
240    ///
241    /// # Arguments
242    ///
243    /// * `keys` - Slice of keys to retrieve
244    /// * `consistency_policy` - Consistency policy for this batch request. NOTE(review): the behavior of `None` is not specified here — presumably a default policy applies; confirm.
245    ///
246    /// # Returns
247    ///
248    /// Vector of results in the same order as input keys.
249    /// `None` for keys that don't exist.
250    ///
251    /// # Errors
252    ///
253    /// Same as [`get()`](Self::get)
254    ///
255    /// # Example
256    ///
257    /// ```rust,ignore
258    /// use d_engine_proto::client::ReadConsistencyPolicy;
259    ///
260    /// let keys = vec![
261    ///     Bytes::from("key1"),
262    ///     Bytes::from("key2"),
263    /// ];
264    /// let values = client.get_multi_with_policy(
265    ///     &keys,
266    ///     Some(ReadConsistencyPolicy::EventualConsistency)
267    /// ).await?;
268    /// ```
269    async fn get_multi_with_policy(
270        &self,
271        keys: &[Bytes],
272        consistency_policy: Option<ReadConsistencyPolicy>,
273    ) -> ClientApiResult<Vec<Option<Bytes>>>;
274
275    /// Retrieves a key with linearizable (strong) consistency.
276    ///
277    /// Single-key counterpart of [`get_multi_with_policy()`](Self::get_multi_with_policy) with the
278    /// `LinearizableRead` policy. Guarantees reading the latest committed value.
279    ///
280    /// # Arguments
281    ///
282    /// * `key` - The key to retrieve; any value viewable as bytes
283    ///
284    /// # Returns
285    ///
286    /// * `Ok(Some(value))` if key exists
287    /// * `Ok(None)` if key does not exist or has expired
288    ///
289    /// # Errors
290    ///
291    /// Same as [`get()`](Self::get)
292    ///
293    /// # Example
294    ///
295    /// ```rust,ignore
296    /// // Guaranteed to read latest value
297    /// let value = client.get_linearizable(b"critical-config").await?;
298    /// ```
299    async fn get_linearizable(
300        &self,
301        key: impl AsRef<[u8]> + Send,
302    ) -> ClientApiResult<Option<Bytes>>;
303
304    /// Retrieves a key with lease-based consistency.
305    ///
306    /// Single-key counterpart of [`get_multi_with_policy()`](Self::get_multi_with_policy) with the
307    /// `LeaseRead` policy. Optimized linearizable read using Leader lease mechanism.
308    ///
309    /// # Arguments
310    ///
311    /// * `key` - The key to retrieve; any value viewable as bytes
312    ///
313    /// # Returns
314    ///
315    /// * `Ok(Some(value))` if key exists
316    /// * `Ok(None)` if key does not exist or has expired
317    ///
318    /// # Errors
319    ///
320    /// Same as [`get()`](Self::get)
321    ///
322    /// # Example
323    ///
324    /// ```rust,ignore
325    /// // Lease-based read (faster than linearizable, still strong consistency)
326    /// let value = client.get_lease(b"config").await?;
327    /// ```
328    async fn get_lease(
329        &self,
330        key: impl AsRef<[u8]> + Send,
331    ) -> ClientApiResult<Option<Bytes>>;
332
333    /// Retrieves a key with eventual consistency.
334    ///
335    /// Single-key counterpart of [`get_multi_with_policy()`](Self::get_multi_with_policy) with the
336    /// `EventualConsistency` policy. Fast but may return stale data if replication is lagging.
337    ///
338    /// # Arguments
339    ///
340    /// * `key` - The key to retrieve; any value viewable as bytes
341    ///
342    /// # Returns
343    ///
344    /// * `Ok(Some(value))` if key exists (may be stale)
345    /// * `Ok(None)` if key does not exist or has expired
346    ///
347    /// # Errors
348    ///
349    /// Same as [`get()`](Self::get)
350    ///
351    /// # Use Cases
352    ///
353    /// - Read-heavy workloads with acceptable staleness
354    /// - Analytics/reporting (staleness acceptable)
355    /// - Caching scenarios
356    ///
357    /// # Example
358    ///
359    /// ```rust,ignore
360    /// // Fast stale read
361    /// let cached_value = client.get_eventual(b"user-preference").await?;
362    /// ```
363    async fn get_eventual(
364        &self,
365        key: impl AsRef<[u8]> + Send,
366    ) -> ClientApiResult<Option<Bytes>>;
367}