syntax = "proto3";
package d_engine.client;
import "proto/error.proto";
option go_package = "github.com/deventlab/d-engine/proto/client";
// A single write operation, submitted through ClientWriteRequest.command.
message WriteCommand {
  // Writes `value` under `key`.
  message Insert {
    bytes key = 1;
    bytes value = 2;
    // Expiration in seconds, counted from the moment of insertion.
    // 0 (the proto3 default) means the entry never expires.
    uint64 ttl_secs = 3;
  }

  // Removes `key`.
  message Delete {
    bytes key = 1;
  }

  // Conditional write: `new_value` is stored under `key` only when the
  // comparison against `expected_value` succeeds.
  message CompareAndSwap {
    bytes key = 1;
    // Value `key` must currently hold. Leave the field unset (no presence)
    // to require instead that `key` does not exist.
    optional bytes expected_value = 2;
    // Value stored when the comparison succeeds.
    bytes new_value = 3;
  }

  // The operation to perform. Being a oneof, at most one variant is set.
  oneof operation {
    Insert insert = 1;
    Delete delete = 2;
    CompareAndSwap compare_and_swap = 3;
  }
}
// Request for RaftClientService.HandleClientWrite.
message ClientWriteRequest {
  // Caller-supplied client identifier.
  uint32 client_id = 1;
  // The one write to perform; each request carries exactly one command.
  //
  // History: field 2 was formerly `repeated commands`. Singular and repeated
  // message fields share a wire encoding, so an old payload holding several
  // commands still parses, but the parser merges those instances into one
  // WriteCommand instead of rejecting the request.
  WriteCommand command = 2;
}
// Consistency level a client may request for an individual read.
//
// Lets callers trade consistency against latency per request, where the
// cluster configuration allows it.
//
// NOTE: the zero value is LEASE_READ, not an UNSPECIFIED sentinel. proto3
// decodes an absent enum as 0, so a field of this type needs explicit
// presence (`optional`, as ClientReadRequest.consistency_policy uses) to tell
// "not set" apart from LEASE_READ. The numbering is part of the wire contract
// and must not be changed.
enum ReadConsistencyPolicy {
  // Lease-based read: while its lease is valid, the leader answers from local
  // state without contacting followers. Lower latency, with slightly weaker
  // consistency guarantees than a linearizable read.
  READ_CONSISTENCY_POLICY_LEASE_READ = 0;

  // Linearizable read: the leader confirms its leadership with a quorum
  // before answering. The result is strictly linearizable and reflects the
  // most recent committed value in the cluster.
  READ_CONSISTENCY_POLICY_LINEARIZABLE_READ = 1;

  // Eventually consistent read: any node (leader, follower, or candidate)
  // answers without additional consistency checks. Best read performance and
  // availability, but the data may be stale. Use only where eventual
  // consistency is acceptable.
  READ_CONSISTENCY_POLICY_EVENTUAL_CONSISTENCY = 2;
}
// Request for RaftClientService.HandleClientRead.
message ClientReadRequest {
  // Caller-supplied client identifier.
  uint32 client_id = 1;
  // Keys to read.
  repeated bytes keys = 2;
  // Consistency policy for this particular read.
  //
  //   set:   the client's explicit consistency requirement applies.
  //   unset: the cluster's configured default policy applies.
  optional ReadConsistencyPolicy consistency_policy = 3;
}
// Outcome of a write, carried in ClientResponse.write_result.
message WriteResult {
  // Whether the write succeeded.
  // NOTE(review): presumably false when a CompareAndSwap comparison fails —
  // confirm against the server implementation.
  bool succeeded = 1;

  // Field numbers earmarked for planned additions:
  //   uint64 version = 2;
  //   bytes prev_value = 3;
}
// Reply shared by RaftClientService.HandleClientWrite and HandleClientRead.
message ClientResponse {
  // Outcome code for the request (see d_engine.error.ErrorCode).
  d_engine.error.ErrorCode error = 1;

  // Operation-specific payload: `write_result` answers a write and
  // `read_data` answers a read.
  // NOTE(review): presumably left unset when `error` reports a failure —
  // confirm against the server implementation.
  oneof success_result {
    WriteResult write_result = 2;
    ReadResults read_data = 3;
  }

  // Supplementary error details (see d_engine.error.ErrorMetadata).
  d_engine.error.ErrorMetadata metadata = 4;
}
// One key-value pair in a read result (an element of ReadResults.results).
// Formerly named ClientGetResult. Messages are identified by field numbers on
// the wire, so the rename affected generated code only.
message ClientResult {
  bytes key = 1;
  bytes value = 2;
}
// Entries produced by a read, carried in ClientResponse.read_data.
message ReadResults {
  // Key-value pairs returned for the request.
  repeated ClientResult results = 1;
}
// One key-value pair produced by a prefix scan (an element of
// ScanResponse.entries).
message KvEntry {
  bytes key = 1;
  bytes value = 2;
}
// Request for RaftClientService.HandleClientScan: list every key under a
// namespace prefix.
//
// The scan is served from the leader's state machine at last_applied.
// Choosing a consistency_policy is not supported yet (tracked in #381).
message ScanRequest {
  // Caller-supplied client identifier.
  uint32 client_id = 1;
  // Prefix to scan. Must begin with '/', e.g. b"/services/".
  bytes prefix = 2;
}
// Result of a prefix scan.
message ScanResponse {
  // Every (key, value) pair whose key begins with the requested prefix.
  repeated KvEntry entries = 1;
  // Raft applied index at which the scan was taken.
  //
  // Serves as the anchor for a follow-up watch: skip watch events whose
  // revision is <= this value.
  uint64 revision = 2;
}
// Request for RaftClientService.WatchMembership: opens a server-side stream of
// MembershipSnapshot messages.
message WatchMembershipRequest {
  // Caller-supplied client identifier.
  uint32 client_id = 1;
}
// Point-in-time view of the committed cluster membership.
//
// The WatchMembership stream pushes the current snapshot on connect and then
// delivers a new one for every committed ConfChange entry.
// `committed_index` increases strictly monotonically and acts as an
// idempotency key for schedulers.
message MembershipSnapshot {
  // Current voting members (Follower / Leader role).
  repeated uint32 members = 1;
  // Current non-voting learners.
  repeated uint32 learners = 2;
  // Raft log index of the ConfChange entry that produced this snapshot.
  uint64 committed_index = 3;
}
// Kind of change reported in WatchResponse.event_type.
//
// NOTE: the zero value is PUT rather than an UNSPECIFIED sentinel, so a
// WatchResponse whose event_type was never set decodes as PUT. The numbering
// is part of the wire contract and must not be changed.
enum WatchEventType {
  // A key was inserted or updated.
  WATCH_EVENT_TYPE_PUT = 0;
  // A key was explicitly deleted.
  WATCH_EVENT_TYPE_DELETE = 1;
  // The server forcibly canceled the watcher (e.g. buffer overflow).
  // The client should re-sync through the Read API and register the watch
  // again.
  WATCH_EVENT_TYPE_CANCELED = 2;
  // Periodic heartbeat with no data change. Carries the current revision so
  // clients can confirm the stream is still alive.
  WATCH_EVENT_TYPE_PROGRESS = 3;
}
// Request for RaftClientService.Watch: subscribe to changes on one key, or on
// every key under a prefix.
message WatchRequest {
  // Caller-supplied client identifier.
  uint32 client_id = 1;
  // What to watch:
  //   prefix = false  exact match on this key.
  //   prefix = true   every key beginning with this value. The key must
  //                   start and end with "/"; e.g. "/config/" watches
  //                   everything under /config/.
  bytes key = 2;
  // Treat `key` as a path prefix; it must then end with "/" (see `key`).
  // The default, false, keeps the backwards-compatible exact-match behaviour.
  bool prefix = 3;
  // Request that each WatchResponse include prev_value, the value held before
  // the mutation. Defaults to false; the server skips the extra read when no
  // watcher asks for it.
  bool prev_kv = 4;
}
// One event delivered on a Watch stream.
message WatchResponse {
  // The key that changed.
  bytes key = 1;
  // The new value (empty for DELETE events).
  bytes value = 2;
  // Kind of change that occurred.
  WatchEventType event_type = 3;
  // Error information if the watch failed.
  d_engine.error.ErrorCode error = 4;
  // Raft applied index at the time this event was produced.
  // Monotonically increasing.
  //
  // Recovery after CANCELED:
  //   1. Re-read the watched state. HandleClientScan returns the read's
  //      applied index in ScanResponse.revision.
  //   2. Register a new watch.
  //   3. Discard incoming events whose revision is <= that anchor.
  // WatchRequest has no start-revision field, so a watch cannot be asked to
  // resume from revision + 1. Skipping already-seen events is the client's
  // responsibility.
  uint64 revision = 5;
  // Value before this mutation. Empty when:
  //   - the key did not exist before the write,
  //   - event_type is PROGRESS or CANCELED, or
  //   - the watcher was registered with prev_kv = false.
  // NOTE: this field has proto3 implicit presence. A previous value that was
  // itself empty bytes is indistinguishable from the cases above.
  bytes prev_value = 6;
}
// Client-facing RPC API.
service RaftClientService {
  // Performs the single write in ClientWriteRequest.command. The successful
  // outcome is reported through ClientResponse.write_result.
  rpc HandleClientWrite(ClientWriteRequest) returns (ClientResponse);

  // Reads ClientReadRequest.keys under the requested consistency policy, or
  // the cluster default when none is given. The entries are reported through
  // ClientResponse.read_data.
  rpc HandleClientRead(ClientReadRequest) returns (ClientResponse);

  // Lists every entry under a key prefix, served from the leader's state
  // machine at last_applied. ScanResponse.revision is the anchor for a
  // follow-up Watch.
  rpc HandleClientScan(ScanRequest) returns (ScanResponse);

  // Watch for changes to a single key, or to every key under a prefix when
  // WatchRequest.prefix is true.
  //
  // Returns a stream of WatchResponse events whenever a watched key changes.
  // The stream remains open until the client cancels or disconnects.
  //
  // Performance characteristics:
  // - Event notification latency: typically < 100μs
  // - Minimal overhead on write path (< 0.01% with 100+ watchers)
  //
  // Error handling:
  // - If the internal event buffer is full, events may be dropped, and the
  //   server may cancel the watcher with a WATCH_EVENT_TYPE_CANCELED event.
  // - After CANCELED, or on detecting a gap, re-sync via the Read/Scan API and
  //   register a new watch.
  // - WatchRequest has no start-revision field. Use the revision returned by
  //   HandleClientScan as an anchor, and skip events whose revision is <= it.
  rpc Watch(WatchRequest) returns (stream WatchResponse);

  // Watch for committed cluster membership changes.
  //
  // Immediately pushes the current MembershipSnapshot on connect, then pushes
  // a new snapshot on every committed ConfChange (AddNode, BatchPromote,
  // BatchRemove). All nodes (leader, follower, learner) emit the event after
  // the entry commits.
  //
  // Stream lifecycle:
  // - Stays open until the client cancels or the server shuts down.
  // - On server shutdown the stream closes with Status::UNAVAILABLE.
  //   Clients should reconnect and re-subscribe.
  //
  // Idempotency:
  // - Use `committed_index` as an idempotency key in the receiver.
  rpc WatchMembership(WatchMembershipRequest) returns (stream MembershipSnapshot);
}