d_engine_server/node/client/
local_kv.rs1use std::fmt;
19use std::time::Duration;
20
21use bytes::Bytes;
22use d_engine_client::KvClient;
23use d_engine_client::KvClientError;
24use d_engine_client::KvResult;
25use d_engine_core::MaybeCloneOneshot;
26use d_engine_core::RaftEvent;
27use d_engine_core::RaftOneshot;
28use d_engine_proto::client::ClientReadRequest;
29use d_engine_proto::client::ClientWriteRequest;
30use d_engine_proto::client::ReadConsistencyPolicy;
31use d_engine_proto::client::WriteCommand;
32use d_engine_proto::error::ErrorCode;
33use tokio::sync::mpsc;
34
35#[derive(Debug)]
37pub enum LocalClientError {
38 ChannelClosed,
40 Timeout(Duration),
42 NotLeader {
44 leader_id: Option<String>,
46 leader_address: Option<String>,
48 },
49 ServerError(String),
51}
52
53impl fmt::Display for LocalClientError {
54 fn fmt(
55 &self,
56 f: &mut fmt::Formatter<'_>,
57 ) -> fmt::Result {
58 match self {
59 LocalClientError::ChannelClosed => {
60 write!(f, "Channel closed, node may be shutting down")
61 }
62 LocalClientError::Timeout(d) => write!(f, "Operation timeout after {d:?}"),
63 LocalClientError::NotLeader {
64 leader_id,
65 leader_address,
66 } => {
67 write!(f, "Not leader")?;
68 if let Some(id) = leader_id {
69 write!(f, " (leader_id: {id})")?;
70 }
71 if let Some(addr) = leader_address {
72 write!(f, " (leader_address: {addr})")?;
73 }
74 Ok(())
75 }
76 LocalClientError::ServerError(s) => write!(f, "Server error: {s}"),
77 }
78 }
79}
80
81impl std::error::Error for LocalClientError {}
82
83pub type Result<T> = std::result::Result<T, LocalClientError>;
84
85impl From<LocalClientError> for KvClientError {
87 fn from(err: LocalClientError) -> Self {
88 match err {
89 LocalClientError::ChannelClosed => KvClientError::ChannelClosed,
90 LocalClientError::Timeout(_) => KvClientError::Timeout,
91 LocalClientError::NotLeader {
92 leader_id,
93 leader_address,
94 } => {
95 let msg = if let Some(addr) = leader_address {
96 format!("Not leader, try leader at: {addr}")
97 } else if let Some(id) = leader_id {
98 format!("Not leader, leader_id: {id}")
99 } else {
100 "Not leader".to_string()
101 };
102 KvClientError::ServerError(msg)
103 }
104 LocalClientError::ServerError(msg) => KvClientError::ServerError(msg),
105 }
106 }
107}
108
109#[derive(Clone)]
113pub struct LocalKvClient {
114 event_tx: mpsc::Sender<RaftEvent>,
115 client_id: u32,
116 timeout: Duration,
117}
118
119impl LocalKvClient {
120 pub(crate) fn new_internal(
122 event_tx: mpsc::Sender<RaftEvent>,
123 client_id: u32,
124 timeout: Duration,
125 ) -> Self {
126 Self {
127 event_tx,
128 client_id,
129 timeout,
130 }
131 }
132
133 fn map_error_response(
135 error_code: i32,
136 metadata: Option<d_engine_proto::error::ErrorMetadata>,
137 ) -> LocalClientError {
138 use d_engine_proto::error::ErrorCode;
139
140 match ErrorCode::try_from(error_code) {
141 Ok(ErrorCode::NotLeader) => {
142 let (leader_id, leader_address) = if let Some(meta) = metadata {
143 (meta.leader_id, meta.leader_address)
144 } else {
145 (None, None)
146 };
147 LocalClientError::NotLeader {
148 leader_id,
149 leader_address,
150 }
151 }
152 _ => LocalClientError::ServerError(format!("Error code: {error_code}")),
153 }
154 }
155
156 pub async fn put(
158 &self,
159 key: impl AsRef<[u8]>,
160 value: impl AsRef<[u8]>,
161 ) -> Result<()> {
162 let command = WriteCommand::insert(
163 Bytes::copy_from_slice(key.as_ref()),
164 Bytes::copy_from_slice(value.as_ref()),
165 );
166
167 let request = ClientWriteRequest {
168 client_id: self.client_id,
169 commands: vec![command],
170 };
171
172 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
173
174 self.event_tx
175 .send(RaftEvent::ClientPropose(request, resp_tx))
176 .await
177 .map_err(|_| LocalClientError::ChannelClosed)?;
178
179 let result = tokio::time::timeout(self.timeout, resp_rx)
180 .await
181 .map_err(|_| LocalClientError::Timeout(self.timeout))?
182 .map_err(|_| LocalClientError::ChannelClosed)?;
183
184 let response = result.map_err(|status| {
185 LocalClientError::ServerError(format!("RPC error: {}", status.message()))
186 })?;
187
188 if response.error != ErrorCode::Success as i32 {
189 return Err(Self::map_error_response(response.error, response.metadata));
190 }
191
192 Ok(())
193 }
194
195 pub async fn get_linearizable(
213 &self,
214 key: impl AsRef<[u8]>,
215 ) -> Result<Option<Bytes>> {
216 self.get_with_consistency(key, ReadConsistencyPolicy::LinearizableRead).await
217 }
218
219 pub async fn get_eventual(
239 &self,
240 key: impl AsRef<[u8]>,
241 ) -> Result<Option<Bytes>> {
242 self.get_with_consistency(key, ReadConsistencyPolicy::EventualConsistency).await
243 }
244
245 pub async fn get_with_consistency(
264 &self,
265 key: impl AsRef<[u8]>,
266 consistency: ReadConsistencyPolicy,
267 ) -> Result<Option<Bytes>> {
268 let request = ClientReadRequest {
269 client_id: self.client_id,
270 keys: vec![Bytes::copy_from_slice(key.as_ref())],
271 consistency_policy: Some(consistency as i32),
272 };
273
274 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
275
276 self.event_tx
277 .send(RaftEvent::ClientReadRequest(request, resp_tx))
278 .await
279 .map_err(|_| LocalClientError::ChannelClosed)?;
280
281 let result = tokio::time::timeout(self.timeout, resp_rx)
282 .await
283 .map_err(|_| LocalClientError::Timeout(self.timeout))?
284 .map_err(|_| LocalClientError::ChannelClosed)?;
285
286 let response = result.map_err(|status| {
287 LocalClientError::ServerError(format!("RPC error: {}", status.message()))
288 })?;
289
290 if response.error != ErrorCode::Success as i32 {
291 return Err(Self::map_error_response(response.error, response.metadata));
292 }
293
294 match response.success_result {
295 Some(d_engine_proto::client::client_response::SuccessResult::ReadData(
296 read_results,
297 )) => {
298 Ok(read_results.results.first().map(|r| r.value.clone()))
301 }
302 _ => Ok(None),
303 }
304 }
305
306 pub async fn get_multi_linearizable(
316 &self,
317 keys: &[Bytes],
318 ) -> Result<Vec<Option<Bytes>>> {
319 self.get_multi_with_consistency(keys, ReadConsistencyPolicy::LinearizableRead)
320 .await
321 }
322
323 pub async fn get_multi_eventual(
333 &self,
334 keys: &[Bytes],
335 ) -> Result<Vec<Option<Bytes>>> {
336 self.get_multi_with_consistency(keys, ReadConsistencyPolicy::EventualConsistency)
337 .await
338 }
339
340 pub async fn get_multi_with_consistency(
342 &self,
343 keys: &[Bytes],
344 consistency: ReadConsistencyPolicy,
345 ) -> Result<Vec<Option<Bytes>>> {
346 let request = ClientReadRequest {
347 client_id: self.client_id,
348 keys: keys.to_vec(),
349 consistency_policy: Some(consistency as i32),
350 };
351
352 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
353
354 self.event_tx
355 .send(RaftEvent::ClientReadRequest(request, resp_tx))
356 .await
357 .map_err(|_| LocalClientError::ChannelClosed)?;
358
359 let result = tokio::time::timeout(self.timeout, resp_rx)
360 .await
361 .map_err(|_| LocalClientError::Timeout(self.timeout))?
362 .map_err(|_| LocalClientError::ChannelClosed)?;
363
364 let response = result.map_err(|status| {
365 LocalClientError::ServerError(format!("RPC error: {}", status.message()))
366 })?;
367
368 if response.error != ErrorCode::Success as i32 {
369 return Err(Self::map_error_response(response.error, response.metadata));
370 }
371
372 match response.success_result {
373 Some(d_engine_proto::client::client_response::SuccessResult::ReadData(
374 read_results,
375 )) => {
376 let results_by_key: std::collections::HashMap<_, _> =
380 read_results.results.into_iter().map(|r| (r.key, r.value)).collect();
381
382 Ok(keys.iter().map(|k| results_by_key.get(k).cloned()).collect())
383 }
384 _ => Ok(vec![None; keys.len()]),
385 }
386 }
387
388 pub async fn delete(
390 &self,
391 key: impl AsRef<[u8]>,
392 ) -> Result<()> {
393 let command = WriteCommand::delete(Bytes::copy_from_slice(key.as_ref()));
394
395 let request = ClientWriteRequest {
396 client_id: self.client_id,
397 commands: vec![command],
398 };
399
400 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
401
402 self.event_tx
403 .send(RaftEvent::ClientPropose(request, resp_tx))
404 .await
405 .map_err(|_| LocalClientError::ChannelClosed)?;
406
407 let result = tokio::time::timeout(self.timeout, resp_rx)
408 .await
409 .map_err(|_| LocalClientError::Timeout(self.timeout))?
410 .map_err(|_| LocalClientError::ChannelClosed)?;
411
412 let response = result.map_err(|status| {
413 LocalClientError::ServerError(format!("RPC error: {}", status.message()))
414 })?;
415
416 if response.error != ErrorCode::Success as i32 {
417 return Err(Self::map_error_response(response.error, response.metadata));
418 }
419
420 Ok(())
421 }
422
423 pub fn client_id(&self) -> u32 {
425 self.client_id
426 }
427
428 pub fn timeout(&self) -> Duration {
430 self.timeout
431 }
432
433 pub fn node_id(&self) -> u32 {
435 self.client_id
436 }
437}
438
439impl std::fmt::Debug for LocalKvClient {
440 fn fmt(
441 &self,
442 f: &mut std::fmt::Formatter<'_>,
443 ) -> std::fmt::Result {
444 f.debug_struct("LocalKvClient")
445 .field("client_id", &self.client_id)
446 .field("timeout", &self.timeout)
447 .finish()
448 }
449}
450
451#[async_trait::async_trait]
453impl KvClient for LocalKvClient {
454 async fn put(
455 &self,
456 key: impl AsRef<[u8]> + Send,
457 value: impl AsRef<[u8]> + Send,
458 ) -> KvResult<()> {
459 self.put(key, value).await.map_err(Into::into)
460 }
461
462 async fn put_with_ttl(
463 &self,
464 key: impl AsRef<[u8]> + Send,
465 value: impl AsRef<[u8]> + Send,
466 ttl_secs: u64,
467 ) -> KvResult<()> {
468 let command = WriteCommand::insert_with_ttl(
470 Bytes::copy_from_slice(key.as_ref()),
471 Bytes::copy_from_slice(value.as_ref()),
472 ttl_secs,
473 );
474
475 let request = ClientWriteRequest {
476 client_id: self.client_id,
477 commands: vec![command],
478 };
479
480 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
481
482 self.event_tx
483 .send(RaftEvent::ClientPropose(request, resp_tx))
484 .await
485 .map_err(|_| KvClientError::ChannelClosed)?;
486
487 let result = tokio::time::timeout(self.timeout, resp_rx)
488 .await
489 .map_err(|_| KvClientError::Timeout)?
490 .map_err(|_| KvClientError::ChannelClosed)?;
491
492 let response = result.map_err(|status| {
493 KvClientError::ServerError(format!("RPC error: {}", status.message()))
494 })?;
495
496 if response.error != ErrorCode::Success as i32 {
497 let local_err = LocalKvClient::map_error_response(response.error, response.metadata);
498 return Err(local_err.into());
499 }
500
501 Ok(())
502 }
503
504 async fn get(
505 &self,
506 key: impl AsRef<[u8]> + Send,
507 ) -> KvResult<Option<Bytes>> {
508 self.get_linearizable(key).await.map_err(Into::into)
509 }
510
511 async fn get_multi(
512 &self,
513 keys: &[Bytes],
514 ) -> KvResult<Vec<Option<Bytes>>> {
515 self.get_multi_linearizable(keys).await.map_err(Into::into)
516 }
517
518 async fn delete(
519 &self,
520 key: impl AsRef<[u8]> + Send,
521 ) -> KvResult<()> {
522 self.delete(key).await.map_err(Into::into)
523 }
524}