use tokio::sync::mpsc::{self, Receiver, Sender};
use crate::common::{debug, error, info, Result};
use crate::config::filepath;
use crate::core::middleware::{Dispatcher, MiddlewareChain};
use crate::core::table::Table;
use crate::core::{Config, UnitOfWork};
#[derive(Default)]
pub(crate) struct Builder {
config: Option<Config>,
request_channel_buffer: usize,
}
impl Builder {
pub(crate) fn from_config(config: Config) -> Self {
let mut builder = Builder::new();
builder.config = Some(config);
builder
}
pub(crate) async fn build(mut self) -> Result<Kvsd> {
let (send, recv) = mpsc::channel(self.request_channel_buffer);
let dispatcher = self.build_dispatcher().await?;
let mw = MiddlewareChain::new(&self.config.unwrap_or_default(), dispatcher);
Ok(Kvsd {
request_send: send,
request_recv: recv,
middlewares: mw,
})
}
async fn build_dispatcher(&mut self) -> Result<Dispatcher> {
let (tx, rx) = mpsc::channel(1024);
let root_dir = self.config.as_ref().unwrap().root_dir.as_ref().unwrap();
let default_table = root_dir
.join(filepath::NAMESPACES)
.join(filepath::NS_DEFAULT)
.join("default/default.kvsd");
debug!("Open default table file {}", default_table.display());
let default_table = Table::from_path(rx, default_table).await?;
tokio::spawn(default_table.run());
let mut dispatcher = Dispatcher::new();
dispatcher.add_table("default", "default", tx);
Ok(dispatcher)
}
fn new() -> Self {
Self {
request_channel_buffer: 1024,
..Default::default()
}
}
}
pub(crate) struct Kvsd {
request_recv: Receiver<UnitOfWork>,
request_send: Sender<UnitOfWork>,
middlewares: MiddlewareChain,
}
impl Kvsd {
pub fn request_channel(&self) -> Sender<UnitOfWork> {
self.request_send.clone()
}
pub(crate) async fn run(mut self) {
info!("Kvsd running");
while let Some(request) = self.request_recv.recv().await {
if let Err(err) = self.handle_request(request).await {
error!("Handle request {}", err);
}
}
}
pub(crate) async fn handle_request(&mut self, uow: UnitOfWork) -> Result<()> {
self.middlewares.apply(uow).await
}
}