pub struct EmbeddedClient { /* private fields */ }Expand description
Zero-overhead KV client for embedded mode.
Directly calls Raft core without gRPC overhead.
Implementations§
Source§impl EmbeddedClient
impl EmbeddedClient
Sourcepub async fn put(
&self,
key: impl AsRef<[u8]>,
value: impl AsRef<[u8]>,
) -> ClientApiResult<()>
pub async fn put( &self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>, ) -> ClientApiResult<()>
Store a key-value pair with strong consistency.
§Errors
Returns an error if the node is not the leader, the channel is closed, the operation times out, or the state machine returns a server error.
Sourcepub async fn get_linearizable(
&self,
key: impl AsRef<[u8]>,
) -> ClientApiResult<Option<Bytes>>
pub async fn get_linearizable( &self, key: impl AsRef<[u8]>, ) -> ClientApiResult<Option<Bytes>>
Strongly consistent read (linearizable).
Guarantees reading the latest committed value by querying the Leader. Use for critical reads where staleness is unacceptable.
§Performance
- Latency: ~1-5ms (network RTT to Leader)
- Throughput: Limited by Leader capacity
§Raft Protocol
Implements linearizable read per Raft §8.
§Example
let client = engine.client();
let value = client.get_linearizable(b"critical-config").await?;Sourcepub async fn get_eventual(
&self,
key: impl AsRef<[u8]>,
) -> ClientApiResult<Option<Bytes>>
pub async fn get_eventual( &self, key: impl AsRef<[u8]>, ) -> ClientApiResult<Option<Bytes>>
Eventually consistent read (stale OK).
Reads from local state machine without Leader coordination. Fast but may return stale data if replication is lagging.
§Performance
- Latency: ~0.1ms (local memory access)
- Throughput: High (no Leader bottleneck)
§Use Cases
- Read-heavy workloads
- Analytics/reporting (staleness acceptable)
- Caching scenarios
§Example
let client = engine.client();
let cached_value = client.get_eventual(b"user-preference").await?;Sourcepub async fn get_with_consistency(
&self,
key: impl AsRef<[u8]>,
consistency: ReadConsistencyPolicy,
) -> ClientApiResult<Option<Bytes>>
pub async fn get_with_consistency( &self, key: impl AsRef<[u8]>, consistency: ReadConsistencyPolicy, ) -> ClientApiResult<Option<Bytes>>
Advanced: Read with explicit consistency policy.
For fine-grained control over read consistency vs performance trade-off.
§Consistency Policies
LinearizableRead: Read from Leader (strong consistency, may be slower)EventualConsistency: Read from local node (fast, may be stale)LeaseRead: Optimized Leader read using lease mechanism
§Example
use d_engine_proto::client::ReadConsistencyPolicy;
let value = client.get_with_consistency(
b"key",
ReadConsistencyPolicy::LeaseRead,
).await?;Sourcepub async fn get_multi_linearizable(
&self,
keys: &[Bytes],
) -> ClientApiResult<Vec<Option<Bytes>>>
pub async fn get_multi_linearizable( &self, keys: &[Bytes], ) -> ClientApiResult<Vec<Option<Bytes>>>
Sourcepub async fn get_multi_eventual(
&self,
keys: &[Bytes],
) -> ClientApiResult<Vec<Option<Bytes>>>
pub async fn get_multi_eventual( &self, keys: &[Bytes], ) -> ClientApiResult<Vec<Option<Bytes>>>
Sourcepub async fn get_multi_with_consistency(
&self,
keys: &[Bytes],
consistency: ReadConsistencyPolicy,
) -> ClientApiResult<Vec<Option<Bytes>>>
pub async fn get_multi_with_consistency( &self, keys: &[Bytes], consistency: ReadConsistencyPolicy, ) -> ClientApiResult<Vec<Option<Bytes>>>
Advanced: Get multiple keys with explicit consistency policy.
Sourcepub async fn delete(&self, key: impl AsRef<[u8]>) -> ClientApiResult<()>
pub async fn delete(&self, key: impl AsRef<[u8]>) -> ClientApiResult<()>
Delete a key-value pair with strong consistency.
§Errors
Returns an error if the node is not the leader, the channel is closed, the operation times out, or the state machine returns a server error.
Sourcepub fn watch(&self, key: impl AsRef<[u8]>) -> ClientApiResult<WatcherHandle>
pub fn watch(&self, key: impl AsRef<[u8]>) -> ClientApiResult<WatcherHandle>
Watch for changes to a specific key
Returns a WatcherHandle that yields watch events when the key’s value changes.
The stream will continue until explicitly dropped or a connection error occurs.
§Arguments
key- The key to watch
§Returns
A WatcherHandle that can be used to receive watch events
§Errors
Returns error if watch feature is not enabled or WatchRegistry not initialized
§Example
let client = engine.client();
let mut watcher = client.watch(b"config/timeout").await?;
while let Some(event) = watcher.next().await {
println!("Value changed: {:?}", event);
}Sourcepub fn watch_with_options(
&self,
key: impl AsRef<[u8]>,
prev_kv: bool,
) -> ClientApiResult<WatcherHandle>
pub fn watch_with_options( &self, key: impl AsRef<[u8]>, prev_kv: bool, ) -> ClientApiResult<WatcherHandle>
Watch a key and opt in to receiving the previous value on each mutation.
This is the lower-level form of watch. Use it when you need
event.prev_value — for example to detect what a key held before a write, or
to implement an audit log.
§Arguments
key- The exact key to watch.prev_kv- Whentrue, everyPutandDeleteevent carries the value the key held before the mutation inevent.prev_value. Whenfalse(the default viawatch),prev_valueis always empty.
§Performance note
When at least one watcher has prev_kv = true, the state machine reads the old
value from storage before each apply_chunk. The read is amortised across the
whole write batch — cost scales with write rate, not watcher count. Disable
when you don’t need the previous value.
§prev_value is empty in these cases
- The watcher was registered with
prev_kv = false. - The event type is
ProgressorCanceled(not a data mutation). - The key did not exist before a
Put(i.e. it was a fresh insert).
§Example
// Distributed-lock audit: know who held the lock before it changed hands.
let watcher = client.watch_with_options(b"lock/resource_a", true)?;
let (_, _, mut rx) = watcher.into_receiver();
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
match event.event_type {
WatchEventType::Put => println!(
"lock acquired by {:?}, was held by {:?}",
event.value, event.prev_value
),
WatchEventType::Delete => println!(
"lock released, was held by {:?}",
event.prev_value
),
WatchEventType::Canceled => { /* re-register */ break; }
WatchEventType::Progress => {}
}
}
});Sourcepub fn watch_prefix(
&self,
prefix: impl AsRef<[u8]>,
) -> ClientApiResult<WatcherHandle>
pub fn watch_prefix( &self, prefix: impl AsRef<[u8]>, ) -> ClientApiResult<WatcherHandle>
Register a prefix watcher — notified on every key under the given path prefix.
prefix must start with ‘/’ and end with ‘/’. E.g. b"/services/" watches
all keys whose path begins with /services/.
Sourcepub fn watch_prefix_with_options(
&self,
prefix: impl AsRef<[u8]>,
prev_kv: bool,
) -> ClientApiResult<WatcherHandle>
pub fn watch_prefix_with_options( &self, prefix: impl AsRef<[u8]>, prev_kv: bool, ) -> ClientApiResult<WatcherHandle>
Register a prefix watcher and opt in to receiving the previous value on each mutation.
Prefix form of watch_with_options. Every key whose
path starts with prefix triggers an event; event.key is the full child key.
See watch_with_options for the prev_kv semantics,
performance note, and the cases where prev_value is empty.
§Example
// Track every endpoint change under /services/ together with the old address.
let watcher = client.watch_prefix_with_options(b"/services/", true)?;
let (_, _, mut rx) = watcher.into_receiver();
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
match event.event_type {
WatchEventType::Put => println!(
"{:?} moved from {:?} → {:?}",
event.key, event.prev_value, event.value
),
WatchEventType::Delete => println!(
"{:?} removed (was {:?})",
event.key, event.prev_value
),
WatchEventType::Canceled => { /* buffer overflow — re-register */ break; }
WatchEventType::Progress => {}
}
}
});Sourcepub async fn scan_prefix(
&self,
prefix: impl AsRef<[u8]>,
) -> ClientApiResult<ScanResult>
pub async fn scan_prefix( &self, prefix: impl AsRef<[u8]>, ) -> ClientApiResult<ScanResult>
Scan all keys under prefix with linearizable consistency.
Returns a ScanResult containing all matching (key, value) pairs and the
revision (applied index) at scan time. Use revision to filter watch events
during reconnection: skip events where event.revision <= revision.
Trait Implementations§
Source§impl ClientApi for EmbeddedClient
impl ClientApi for EmbeddedClient
Source§fn put<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
value: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn put<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
value: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn put_with_ttl<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
value: impl 'async_trait + AsRef<[u8]> + Send,
ttl_secs: u64,
) -> Pin<Box<dyn Future<Output = ClientApiResult<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn put_with_ttl<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
value: impl 'async_trait + AsRef<[u8]> + Send,
ttl_secs: u64,
) -> Pin<Box<dyn Future<Output = ClientApiResult<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn get<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Option<Bytes>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn get<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Option<Bytes>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn get_multi<'life0, 'life1, 'async_trait>(
&'life0 self,
keys: &'life1 [Bytes],
) -> Pin<Box<dyn Future<Output = ClientApiResult<Vec<Option<Bytes>>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_multi<'life0, 'life1, 'async_trait>(
&'life0 self,
keys: &'life1 [Bytes],
) -> Pin<Box<dyn Future<Output = ClientApiResult<Vec<Option<Bytes>>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Source§fn delete<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn delete<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn compare_and_swap<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
expected_value: Option<impl 'async_trait + AsRef<[u8]> + Send>,
new_value: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<bool>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn compare_and_swap<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
expected_value: Option<impl 'async_trait + AsRef<[u8]> + Send>,
new_value: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<bool>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn list_members<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Vec<NodeMeta>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_members<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Vec<NodeMeta>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn get_leader_id<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Option<u32>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn get_leader_id<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Option<u32>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn get_multi_with_policy<'life0, 'life1, 'async_trait>(
&'life0 self,
keys: &'life1 [Bytes],
consistency_policy: Option<ReadConsistencyPolicy>,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Vec<Option<Bytes>>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_multi_with_policy<'life0, 'life1, 'async_trait>(
&'life0 self,
keys: &'life1 [Bytes],
consistency_policy: Option<ReadConsistencyPolicy>,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Vec<Option<Bytes>>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Source§fn get_linearizable<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Option<Bytes>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn get_linearizable<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Option<Bytes>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn get_lease<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Option<Bytes>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn get_lease<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Option<Bytes>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn get_eventual<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Option<Bytes>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn get_eventual<'life0, 'async_trait>(
&'life0 self,
key: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<Option<Bytes>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn scan_prefix<'life0, 'async_trait>(
&'life0 self,
prefix: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<ScanResult>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn scan_prefix<'life0, 'async_trait>(
&'life0 self,
prefix: impl 'async_trait + AsRef<[u8]> + Send,
) -> Pin<Box<dyn Future<Output = ClientApiResult<ScanResult>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
(key, value) pairs whose key starts with prefix, plus
a revision that anchors watch event deduplication. Read moreSource§impl Clone for EmbeddedClient
impl Clone for EmbeddedClient
Source§fn clone(&self) -> EmbeddedClient
fn clone(&self) -> EmbeddedClient
1.0.0 (const: unstable) · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read moreAuto Trait Implementations§
impl Freeze for EmbeddedClient
impl !RefUnwindSafe for EmbeddedClient
impl Send for EmbeddedClient
impl Sync for EmbeddedClient
impl Unpin for EmbeddedClient
impl UnsafeUnpin for EmbeddedClient
impl !UnwindSafe for EmbeddedClient
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T in a tonic::Request