#![deny(unsafe_op_in_unsafe_fn)]
mod block_xshard;
mod blocked;
mod cluster;
mod conn;
mod exec;
mod exec_build;
mod exec_dispatch;
mod exec_notify;
mod exec_op;
mod exec_pubsub;
mod exec_pubsub_pattern;
mod exec_rename;
mod exec_slowlog;
mod exec_watch;
mod inbox;
mod persist_worker;
mod message;
mod reduce;
mod replica_inbox;
mod replication;
mod replication_apply;
mod replication_gate;
mod replication_io;
mod replication_pump;
mod reshard;
mod route;
mod runtime;
mod runtime_builders;
mod shard;
mod shard_flush;
mod shard_lifecycle;
mod shard_tick;
#[cfg(target_os = "linux")]
mod uring_conn;
#[cfg(target_os = "linux")]
mod uring_inbox;
#[cfg(target_os = "linux")]
mod uring_io;
#[cfg(target_os = "linux")]
mod uring_park;
#[cfg(target_os = "linux")]
mod uring_reactor;
pub use blocked::{BlockHint, BlockKind};
pub use cluster::shard_slot_range;
pub use exec_slowlog::{SlowlogSub, parse_slowlog_sub};
pub use kevy_config::NotificationFlags;
pub use kevy_persist::Fsync;
pub use kevy_resp::{Argv, ArgvBorrowed, ArgvView, RespVersion};
pub use kevy_store::Store;
pub use replica_inbox::{ReplicaApply, ReplicaInboxReceiver, ReplicaInboxSender, replica_inbox_pair};
pub use replication_gate::ReplicatedApplyGuard;
pub use route::{Route, XGroupCtx};
pub use runtime::Runtime;
pub trait Commands: Clone + Send + 'static {
fn route<A: ArgvView + ?Sized>(&self, args: &A) -> Route;
fn dispatch<A: ArgvView + ?Sized>(&self, store: &mut Store, args: &A) -> Vec<u8>;
fn dispatch_resp3<A: ArgvView + ?Sized>(&self, store: &mut Store, args: &A) -> Vec<u8> {
self.dispatch(store, args)
}
fn dispatch_into<A: ArgvView + ?Sized>(&self, store: &mut Store, args: &A, out: &mut Vec<u8>) {
out.extend_from_slice(&self.dispatch(store, args));
}
fn dispatch_into_resp3<A: ArgvView + ?Sized>(
&self,
store: &mut Store,
args: &A,
out: &mut Vec<u8>,
) {
self.dispatch_into(store, args, out);
}
fn notify_class<A: ArgvView + ?Sized>(&self, _args: &A) -> Option<NotifyClass> {
None
}
fn hello_reply<A: ArgvView + ?Sized>(
&self,
_args: &A,
current_proto: RespVersion,
) -> (RespVersion, Vec<u8>) {
(current_proto, b"+OK\r\n".to_vec())
}
fn is_quit<A: ArgvView + ?Sized>(&self, args: &A) -> bool;
fn is_write<A: ArgvView + ?Sized>(&self, args: &A) -> bool;
fn txn_kind<A: ArgvView + ?Sized>(&self, args: &A) -> TxnKind;
fn on_shard_init(&self, _store: &mut Store) {}
fn on_shard_start(&self, _shard: usize) {}
fn on_persist_stats(&self, _in_flight: bool, _aof_rewrites_total: u64) {}
fn on_replication_view(
&self,
_master_repl_offset: u64,
_replicas: Vec<(std::net::Ipv4Addr, u16, u64)>,
) {}
fn on_shard_tick(&self, _store: &mut Store) {}
fn on_command(&self) {}
fn on_connection(&self) {}
fn shard_tick_interval_ms(&self) -> u64 {
100
}
fn live_runtime_config(&self) -> LiveRuntimeConfig {
LiveRuntimeConfig::default()
}
fn wake_idx<A: ArgvView + ?Sized>(&self, _args: &A) -> Option<u8> {
None
}
fn block_hint<A: ArgvView + ?Sized>(&self, _args: &A) -> BlockHint {
BlockHint::None
}
fn resolve_block_argv<A: ArgvView + ?Sized>(
&self,
_store: &mut Store,
args: &A,
_kind: BlockKind,
) -> Argv {
args.to_argv()
}
fn block_serve_argv<A: ArgvView + ?Sized>(
&self,
args: &A,
_kind: BlockKind,
_key: &[u8],
) -> Argv {
args.to_argv()
}
fn block_ready<A: ArgvView + ?Sized>(
&self,
_store: &mut Store,
_serve_argv: &A,
_kind: BlockKind,
) -> bool {
false
}
fn resolve<A: ArgvView + ?Sized>(&self, args: &A) -> ResolvedCmd {
ResolvedCmd {
txn_kind: self.txn_kind(args),
route: self.route(args),
is_quit: self.is_quit(args),
is_write: self.is_write(args),
block_hint: self.block_hint(args),
wake_idx: None,
}
}
}
pub struct ResolvedCmd {
pub txn_kind: TxnKind,
pub route: Route,
pub is_quit: bool,
pub is_write: bool,
pub block_hint: BlockHint,
pub wake_idx: Option<u8>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NotifyClass {
Generic,
String,
List,
Set,
Hash,
Zset,
Stream,
}
impl NotifyClass {
#[inline]
pub fn enabled_in(self, flags: &NotificationFlags) -> bool {
match self {
NotifyClass::Generic => flags.generic,
NotifyClass::String => flags.string,
NotifyClass::List => flags.list,
NotifyClass::Set => flags.set,
NotifyClass::Hash => flags.hash,
NotifyClass::Zset => flags.zset,
NotifyClass::Stream => flags.stream,
}
}
}
pub enum TxnKind {
Multi,
Exec,
Discard,
Watch,
Other,
}
#[derive(Debug, Default, Clone, Copy)]
pub struct LiveRuntimeConfig {
pub appendfsync: Option<Fsync>,
pub auto_aof_rewrite_pct: Option<u32>,
pub auto_aof_rewrite_min_size: Option<u64>,
pub tick_interval_ms: Option<u64>,
pub notify_flags: Option<NotificationFlags>,
pub slowlog_slower_than_micros: Option<i64>,
pub slowlog_max_len: Option<u32>,
}