use crate::Commands;
use crate::message::{Inbound, PubSubReg};
use crate::shard::Shard;
use kevy_map::KevyMap;
use kevy_persist::{Aof, Fsync};
use kevy_ring::{Consumer, Producer};
use kevy_store::Store;
use kevy_sys::{Poller, Waker, tcp_listen_reuseport, waker};
use std::collections::{HashMap, VecDeque};
use std::io;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
/// Capacity argument handed to `kevy_ring::ring` for every inter-shard ring
/// built in `Runtime::run`.
// NOTE(review): presumably a slot/message count rather than bytes — confirm against `kevy_ring`.
const RING_CAPACITY: usize = 1024;
/// Configuration for a multi-shard server: where to listen, how many shards
/// to start, and how each shard's append-only file is set up.
///
/// Built with [`Runtime::new`], adjusted through the `with_*` methods, and
/// consumed by [`Runtime::run`].
pub struct Runtime<C: Commands> {
    /// Address handed to `tcp_listen_reuseport` for every shard's listener.
    ip: [u8; 4],
    /// Port handed to `tcp_listen_reuseport` for every shard's listener.
    port: u16,
    /// Number of shards (one thread each); `new` raises zero to one.
    nshards: usize,
    /// Command implementation; initialises each shard's store and is cloned
    /// into every shard.
    commands: C,
    /// Directory in which each shard's `aof-{id}.aof` file is opened.
    data_dir: PathBuf,
    /// When `false`, shards are started without an `Aof`.
    enable_aof: bool,
    /// Fsync policy passed to `Aof::open`.
    appendfsync: Fsync,
    /// Copied verbatim into every shard.
    // NOTE(review): presumably a growth percentage that triggers an AOF rewrite — confirm in `Shard`.
    auto_aof_rewrite_pct: u32,
    /// Copied verbatim into every shard.
    // NOTE(review): presumably the minimum AOF size (bytes) before auto-rewrite applies — confirm in `Shard`.
    auto_aof_rewrite_min_size: u64,
}
impl<C: Commands> Runtime<C> {
    /// Creates a runtime that will listen on `ip:port` with `nshards` shards.
    ///
    /// A shard count of zero is raised to one. Defaults that the `with_*`
    /// methods can override: data directory `"."`, AOF enabled with
    /// `Fsync::EverySec`, auto-rewrite percentage `100`, and auto-rewrite
    /// minimum size `64 * 1024 * 1024`.
    pub fn new(ip: [u8; 4], port: u16, nshards: usize, commands: C) -> Self {
        Runtime {
            ip,
            port,
            nshards: nshards.max(1),
            commands,
            data_dir: PathBuf::from("."),
            enable_aof: true,
            appendfsync: Fsync::EverySec,
            auto_aof_rewrite_pct: 100,
            auto_aof_rewrite_min_size: 64 * 1024 * 1024,
        }
    }

    /// Sets the directory in which the per-shard `aof-{id}.aof` files are opened.
    pub fn with_data_dir(mut self, dir: impl Into<PathBuf>) -> Self {
        self.data_dir = dir.into();
        self
    }

    /// Enables or disables opening an append-only file for each shard.
    pub fn with_aof(mut self, on: bool) -> Self {
        self.enable_aof = on;
        self
    }

    /// Sets the fsync policy passed to `Aof::open`.
    pub fn with_appendfsync(mut self, fsync: Fsync) -> Self {
        self.appendfsync = fsync;
        self
    }

    /// Sets the two auto-AOF-rewrite settings that are copied into every shard.
    pub fn with_auto_aof_rewrite(mut self, pct: u32, min_size: u64) -> Self {
        self.auto_aof_rewrite_pct = pct;
        self.auto_aof_rewrite_min_size = min_size;
        self
    }

    /// Builds every shard, runs each on its own thread, and blocks until all
    /// shard threads have finished.
    ///
    /// `stop` is cloned into every shard's run loop.
    // NOTE(review): presumably the shared shutdown flag the shards poll — confirm in `Shard::run`.
    ///
    /// On Linux, shards use `run_uring` instead of `run` when the
    /// `KEVY_IO_URING` environment variable is set (to any value).
    ///
    /// # Errors
    ///
    /// Returns the first error produced while creating a waker, a listener,
    /// an AOF, or a poller during setup. Once the threads are running, a
    /// shard that returns an error or panics is reported on stderr and this
    /// function still returns `Ok(())` after every thread has been joined.
    pub fn run(self, stop: Arc<AtomicBool>) -> io::Result<()> {
        let n = self.nshards;

        // Full mesh of rings: for every ordered pair (i, j) with i != j,
        // shard i holds the producer in `outboxes[i][j]` and shard j holds
        // the matching consumer in `inboxes[j][i]`. Diagonal slots stay `None`.
        let mut outboxes: Vec<Vec<Option<Producer<Inbound>>>> =
            (0..n).map(|_| (0..n).map(|_| None).collect()).collect();
        let mut inboxes: Vec<Vec<Option<Consumer<Inbound>>>> =
            (0..n).map(|_| (0..n).map(|_| None).collect()).collect();
        for i in 0..n {
            for j in 0..n {
                if i == j {
                    continue;
                }
                let (p, c) = kevy_ring::ring::<Inbound>(RING_CAPACITY);
                outboxes[i][j] = Some(p);
                inboxes[j][i] = Some(c);
            }
        }

        // One waker per shard; every shard also receives the full list.
        let mut wakers: Vec<Arc<Waker>> = Vec::with_capacity(n);
        for _ in 0..n {
            wakers.push(Arc::new(waker()?));
        }
        // One flag per shard, shared with every shard.
        let parked: Vec<Arc<AtomicBool>> =
            (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
        // A single pub/sub registry shared by all shards.
        let pubsub: PubSubReg = Arc::new(RwLock::new(HashMap::new()));

        let mut shards = Vec::with_capacity(n);
        for id in 0..n {
            // Each shard opens its own listener on the same ip/port.
            let listener = tcp_listen_reuseport(self.ip, self.port, 1024)?;
            let aof = if self.enable_aof {
                Some(Aof::open(
                    &self.data_dir.join(format!("aof-{id}.aof")),
                    self.appendfsync,
                )?)
            } else {
                None
            };
            let mut store = Store::new();
            self.commands.on_shard_init(&mut store);
            shards.push(Shard {
                id,
                nshards: n,
                store,
                commands: self.commands.clone(),
                poller: Poller::new()?,
                listener,
                waker: wakers[id].clone(),
                // Move this shard's row out of each matrix, leaving an empty Vec behind.
                inboxes: std::mem::take(&mut inboxes[id]),
                outboxes: std::mem::take(&mut outboxes[id]),
                backlog: (0..n).map(|_| VecDeque::new()).collect(),
                wakers: wakers.clone(),
                conns: KevyMap::new(),
                fd_to_conn: KevyMap::new(),
                next_conn_id: 1,
                events: Vec::with_capacity(1024),
                read_buf: vec![0u8; 64 * 1024],
                pending_wakes: vec![false; n],
                parked: parked.clone(),
                data_dir: self.data_dir.clone(),
                aof,
                auto_aof_rewrite_pct: self.auto_aof_rewrite_pct,
                auto_aof_rewrite_min_size: self.auto_aof_rewrite_min_size,
                dirty: Vec::new(),
                pubsub: pubsub.clone(),
                publish_batch: (0..n).map(|_| Vec::new()).collect(),
                request_batch: (0..n).map(|_| Vec::new()).collect(),
                scratch_argv: kevy_resp::Argv::default(),
            });
        }

        #[cfg(target_os = "linux")]
        let use_uring = std::env::var_os("KEVY_IO_URING").is_some();

        // Keep each shard id next to its join handle so that a panic can be
        // attributed to the shard that raised it.
        let mut handles = Vec::with_capacity(n);
        for shard in shards {
            let stop = stop.clone();
            let id = shard.id;
            let handle = std::thread::spawn(move || {
                #[cfg(target_os = "linux")]
                let res = if use_uring { shard.run_uring(stop) } else { shard.run(stop) };
                #[cfg(not(target_os = "linux"))]
                let res = shard.run(stop);
                if let Err(e) = res {
                    eprintln!("kevy: shard {id} exited with error: {e}");
                }
            });
            handles.push((id, handle));
        }

        for (id, handle) in handles {
            // `join` returns `Err` with the panic payload when the thread
            // panicked; report it instead of discarding it.
            if let Err(payload) = handle.join() {
                let reason: &str = if let Some(s) = payload.downcast_ref::<&str>() {
                    *s
                } else if let Some(s) = payload.downcast_ref::<String>() {
                    s.as_str()
                } else {
                    "non-string panic payload"
                };
                eprintln!("kevy: shard {id} panicked: {reason}");
            }
        }
        Ok(())
    }
}