// d_engine_core/client/client_api.rs
//! KV client trait - unified interface for key-value operations.
//!
//! Provides a common abstraction for both remote (gRPC) and embedded (local) access
//! to d-engine's key-value store.
//!
//! # Implementations
//!
//! - `GrpcClient` (d-engine-client): Remote access via gRPC protocol
//! - `EmbeddedClient` (d-engine-server): Zero-overhead embedded access
//!
//! # Design Principles
//!
//! - **Unified Interface**: Same API for remote and embedded modes
//! - **Async-first**: All operations are async for non-blocking I/O
//! - **Type Safety**: Strong typing with clear error handling
//! - **Performance**: Zero-cost abstractions, no runtime overhead
//!
//! # Example
//!
//! ```rust,ignore
//! async fn store_config<C: ClientApi>(client: &C) -> Result<()> {
//!     client.put(b"config:timeout", b"30s").await?;
//!     let value = client.get(b"config:timeout").await?;
//!     Ok(())
//! }
//! ```

28use bytes::Bytes;
29use d_engine_proto::client::ReadConsistencyPolicy;
30
31use crate::client::client_api_error::ClientApiResult;
32
33/// Unified key-value store interface.
34///
35/// This trait abstracts over different client implementations, allowing applications
36/// to write generic code that works with both remote (gRPC) and embedded (local) access.
37///
38/// # Consistency Guarantees
39///
40/// - **put()**: Strong consistency, linearizable writes
41/// - **get()**: Linearizable reads by default
42/// - **delete()**: Strong consistency, linearizable deletes
43///
44/// # Thread Safety
45///
46/// All implementations must be `Send + Sync`, safe for concurrent access.
47///
48/// # Performance Characteristics
49///
50/// - `GrpcClient`: 1-2ms latency (network + serialization)
51/// - `EmbeddedClient`: <0.1ms latency (direct function call)
52#[async_trait::async_trait]
53pub trait ClientApi: Send + Sync {
54 /// Stores a key-value pair with strong consistency.
55 ///
56 /// The write is replicated to a quorum of nodes before returning,
57 /// ensuring durability and linearizability.
58 ///
59 /// # Arguments
60 ///
61 /// * `key` - The key to store (arbitrary bytes)
62 /// * `value` - The value to store (arbitrary bytes)
63 ///
64 /// # Errors
65 ///
66 /// - [`ClientApiError::Network`] if node is shutting down or timeout occurs
67 /// - [`ClientApiError::Business`] for server-side errors (e.g., not leader)
68 ///
69 /// # Example
70 ///
71 /// ```rust,ignore
72 /// client.put(b"user:1001", b"Alice").await?;
73 /// ```
74 async fn put(
75 &self,
76 key: impl AsRef<[u8]> + Send,
77 value: impl AsRef<[u8]> + Send,
78 ) -> ClientApiResult<()>;
79
80 /// Stores a key-value pair with time-to-live (TTL).
81 ///
82 /// The key will automatically expire after `ttl_secs` seconds.
83 ///
84 /// # Arguments
85 ///
86 /// * `key` - The key to store
87 /// * `value` - The value to store
88 /// * `ttl_secs` - Time-to-live in seconds
89 ///
90 /// # Errors
91 ///
92 /// Same as [`put()`](Self::put)
93 ///
94 /// # Example
95 ///
96 /// ```rust,ignore
97 /// // Session expires after 1 hour
98 /// client.put_with_ttl(b"session:abc", b"user_data", 3600).await?;
99 /// ```
100 async fn put_with_ttl(
101 &self,
102 key: impl AsRef<[u8]> + Send,
103 value: impl AsRef<[u8]> + Send,
104 ttl_secs: u64,
105 ) -> ClientApiResult<()>;
106
107 /// Retrieves the value associated with a key.
108 ///
109 /// Uses linearizable reads by default, ensuring the returned value
110 /// reflects all previously committed writes.
111 ///
112 /// # Arguments
113 ///
114 /// * `key` - The key to retrieve
115 ///
116 /// # Returns
117 ///
118 /// * `Ok(Some(value))` if key exists
119 /// * `Ok(None)` if key does not exist or has expired
120 /// * `Err(_)` if operation failed
121 ///
122 /// # Errors
123 ///
124 /// - [`ClientApiError::Network`] if node is shutting down or timeout occurs
125 /// - [`ClientApiError::Business`] for server-side errors
126 ///
127 /// # Example
128 ///
129 /// ```rust,ignore
130 /// match client.get(b"user:1001").await? {
131 /// Some(value) => println!("User: {:?}", value),
132 /// None => println!("User not found"),
133 /// }
134 /// ```
135 async fn get(
136 &self,
137 key: impl AsRef<[u8]> + Send,
138 ) -> ClientApiResult<Option<Bytes>>;
139
140 /// Retrieves multiple keys in a single request.
141 ///
142 /// More efficient than multiple individual `get()` calls when fetching
143 /// multiple keys, as it batches the requests.
144 ///
145 /// # Arguments
146 ///
147 /// * `keys` - Slice of keys to retrieve
148 ///
149 /// # Returns
150 ///
151 /// Vector of results in the same order as input keys.
152 /// `None` for keys that don't exist.
153 ///
154 /// # Errors
155 ///
156 /// Same as [`get()`](Self::get)
157 ///
158 /// # Example
159 ///
160 /// ```rust,ignore
161 /// let keys = vec![
162 /// Bytes::from("user:1001"),
163 /// Bytes::from("user:1002"),
164 /// ];
165 /// let results = client.get_multi(&keys).await?;
166 /// ```
167 async fn get_multi(
168 &self,
169 keys: &[Bytes],
170 ) -> ClientApiResult<Vec<Option<Bytes>>>;
171
172 /// Deletes a key-value pair with strong consistency.
173 ///
174 /// The deletion is replicated to a quorum before returning.
175 /// Returns successfully even if the key does not exist (idempotent).
176 ///
177 /// # Arguments
178 ///
179 /// * `key` - The key to delete
180 ///
181 /// # Errors
182 ///
183 /// - [`ClientApiError::Network`] if node is shutting down or timeout occurs
184 /// - [`ClientApiError::Business`] for server-side errors
185 ///
186 /// # Example
187 ///
188 /// ```rust,ignore
189 /// client.delete(b"temp:session_123").await?;
190 /// ```
191 async fn delete(
192 &self,
193 key: impl AsRef<[u8]> + Send,
194 ) -> ClientApiResult<()>;
195
196 /// Atomically compare and swap a key's value
197 ///
198 /// Returns:
199 /// - Ok(true): CAS succeeded, value was updated
200 /// - Ok(false): CAS failed, current value != expected_value
201 /// - Err(e): Operation error (timeout, cluster unavailable, etc.)
202 async fn compare_and_swap(
203 &self,
204 key: impl AsRef<[u8]> + Send,
205 expected_value: Option<impl AsRef<[u8]> + Send>,
206 new_value: impl AsRef<[u8]> + Send,
207 ) -> ClientApiResult<bool>;
208
209 // ==================== Cluster Management Operations ====================
210
211 /// Lists all cluster members with metadata
212 ///
213 /// Returns node information including:
214 /// - Node ID
215 /// - Address
216 /// - Role (Leader/Follower/Learner)
217 /// - Status
218 ///
219 /// # Errors
220 ///
221 /// Returns error if unable to retrieve cluster metadata
222 async fn list_members(&self)
223 -> ClientApiResult<Vec<d_engine_proto::server::cluster::NodeMeta>>;
224
225 /// Get the current leader ID
226 ///
227 /// Returns the leader node ID if known, or None if no leader is currently elected.
228 ///
229 /// # Errors
230 ///
231 /// Returns error if unable to determine leader status
232 async fn get_leader_id(&self) -> ClientApiResult<Option<u32>>;
233
234 // ==================== Advanced Read Operations with Consistency Control ====================
235
236 /// Retrieves multiple keys with explicit consistency policy
237 ///
238 /// Batch retrieval with explicit consistency control.
239 /// More efficient than multiple individual calls when fetching multiple keys.
240 ///
241 /// # Arguments
242 ///
243 /// * `keys` - Slice of keys to retrieve
244 /// * `consistency_policy` - Explicit consistency policy for this batch request
245 ///
246 /// # Returns
247 ///
248 /// Vector of results in the same order as input keys.
249 /// `None` for keys that don't exist.
250 ///
251 /// # Errors
252 ///
253 /// Same as [`get()`](Self::get)
254 ///
255 /// # Example
256 ///
257 /// ```rust,ignore
258 /// use d_engine_proto::client::ReadConsistencyPolicy;
259 ///
260 /// let keys = vec![
261 /// Bytes::from("key1"),
262 /// Bytes::from("key2"),
263 /// ];
264 /// let values = client.get_multi_with_policy(
265 /// &keys,
266 /// Some(ReadConsistencyPolicy::EventualConsistency)
267 /// ).await?;
268 /// ```
269 async fn get_multi_with_policy(
270 &self,
271 keys: &[Bytes],
272 consistency_policy: Option<ReadConsistencyPolicy>,
273 ) -> ClientApiResult<Vec<Option<Bytes>>>;
274
275 /// Retrieves a key with linearizable (strong) consistency
276 ///
277 /// Convenience method for [`get_multi_with_policy()`](Self::get_multi_with_policy) with
278 /// `LinearizableRead` policy. Guarantees reading the latest committed value.
279 ///
280 /// # Arguments
281 ///
282 /// * `key` - The key to retrieve
283 ///
284 /// # Returns
285 ///
286 /// * `Ok(Some(value))` if key exists
287 /// * `Ok(None)` if key does not exist or has expired
288 ///
289 /// # Errors
290 ///
291 /// Same as [`get()`](Self::get)
292 ///
293 /// # Example
294 ///
295 /// ```rust,ignore
296 /// // Guaranteed to read latest value
297 /// let value = client.get_linearizable(b"critical-config").await?;
298 /// ```
299 async fn get_linearizable(
300 &self,
301 key: impl AsRef<[u8]> + Send,
302 ) -> ClientApiResult<Option<Bytes>>;
303
304 /// Retrieves a key with lease-based consistency
305 ///
306 /// Convenience method for [`get_multi_with_policy()`](Self::get_multi_with_policy) with
307 /// `LeaseRead` policy. Optimized linearizable read using Leader lease mechanism.
308 ///
309 /// # Arguments
310 ///
311 /// * `key` - The key to retrieve
312 ///
313 /// # Returns
314 ///
315 /// * `Ok(Some(value))` if key exists
316 /// * `Ok(None)` if key does not exist or has expired
317 ///
318 /// # Errors
319 ///
320 /// Same as [`get()`](Self::get)
321 ///
322 /// # Example
323 ///
324 /// ```rust,ignore
325 /// // Lease-based read (faster than linearizable, still strong consistency)
326 /// let value = client.get_lease(b"config").await?;
327 /// ```
328 async fn get_lease(
329 &self,
330 key: impl AsRef<[u8]> + Send,
331 ) -> ClientApiResult<Option<Bytes>>;
332
333 /// Retrieves a key with eventual consistency
334 ///
335 /// Convenience method for [`get_multi_with_policy()`](Self::get_multi_with_policy) with
336 /// `EventualConsistency` policy. Fast but may return stale data if replication is lagging.
337 ///
338 /// # Arguments
339 ///
340 /// * `key` - The key to retrieve
341 ///
342 /// # Returns
343 ///
344 /// * `Ok(Some(value))` if key exists (may be stale)
345 /// * `Ok(None)` if key does not exist or has expired
346 ///
347 /// # Errors
348 ///
349 /// Same as [`get()`](Self::get)
350 ///
351 /// # Use Cases
352 ///
353 /// - Read-heavy workloads with acceptable staleness
354 /// - Analytics/reporting (staleness acceptable)
355 /// - Caching scenarios
356 ///
357 /// # Example
358 ///
359 /// ```rust,ignore
360 /// // Fast stale read
361 /// let cached_value = client.get_eventual(b"user-preference").await?;
362 /// ```
363 async fn get_eventual(
364 &self,
365 key: impl AsRef<[u8]> + Send,
366 ) -> ClientApiResult<Option<Bytes>>;
367}