use crate::cbconfig::TerseConfig;
use crate::error::ErrorKind;
use crate::error::Result;
use crate::memdx::error::ServerErrorKind;
use crate::memdx::response::TryFromClientResponse;
use crate::nmvbhandler::NotMyVbucketConfigHandler;
use crate::vbucketmap::VbucketMap;
use arc_swap::{ArcSwap, Guard};
use std::future::Future;
use std::sync::Arc;
use tracing::debug;
pub(crate) trait VbucketRouter: Send + Sync {
fn update_vbucket_info(&self, info: VbucketRoutingInfo);
fn dispatch_by_key(&self, key: &[u8], vbucket_server_idx: u32) -> Result<(Arc<str>, u16)>;
fn dispatch_to_vbucket(&self, vb_id: u16) -> Result<Arc<str>>;
}
#[derive(Clone, Debug)]
pub(crate) struct VbucketRoutingInfo {
pub vbucket_info: Option<VbucketMap>,
pub server_list: Vec<Arc<str>>,
pub bucket_selected: bool,
}
#[derive(Clone, Debug)]
pub(crate) struct VbucketRouterOptions {}
pub(crate) struct StdVbucketRouter {
routing_info: ArcSwap<VbucketRoutingInfo>,
}
impl StdVbucketRouter {
pub(crate) fn new(info: VbucketRoutingInfo, _opts: VbucketRouterOptions) -> Self {
Self {
routing_info: ArcSwap::new(Arc::new(info)),
}
}
fn get_vbucket_info(info: &Guard<Arc<VbucketRoutingInfo>>) -> Result<&VbucketMap> {
if let Some(i) = &info.vbucket_info {
return Ok(i);
}
Err(ErrorKind::NoVbucketMap.into())
}
}
impl VbucketRouter for StdVbucketRouter {
fn update_vbucket_info(&self, info: VbucketRoutingInfo) {
self.routing_info.store(Arc::new(info));
}
fn dispatch_by_key(&self, key: &[u8], vbucket_server_idx: u32) -> Result<(Arc<str>, u16)> {
let info = self.routing_info.load();
if !info.bucket_selected {
return Err(ErrorKind::NoBucket.into());
}
let vbucket_info = Self::get_vbucket_info(&info)?;
let vb_id = vbucket_info.vbucket_by_key(key);
let idx = vbucket_info.node_by_vbucket(vb_id, vbucket_server_idx)?;
if idx >= 0 {
if let Some(server) = info.server_list.get(idx as usize) {
return Ok((server.clone(), vb_id));
}
}
Err(ErrorKind::NoServerAssigned {
requested_vb_id: vb_id,
}
.into())
}
fn dispatch_to_vbucket(&self, vb_id: u16) -> Result<Arc<str>> {
let info = self.routing_info.load();
if !info.bucket_selected {
return Err(ErrorKind::NoBucket.into());
}
let idx = Self::get_vbucket_info(&info)?.node_by_vbucket(vb_id, 0)?;
if idx > 0 {
if let Some(server) = info.server_list.get(idx as usize) {
return Ok(server.clone());
}
}
Err(ErrorKind::NoServerAssigned {
requested_vb_id: vb_id,
}
.into())
}
}
pub(crate) async fn orchestrate_memd_routing<V, Resp, Fut>(
vb: Arc<V>,
nmvb_handler: Arc<impl NotMyVbucketConfigHandler>,
key: &[u8],
vb_server_idx: u32,
mut operation: impl Fn(Arc<str>, u16) -> Fut,
) -> Result<Resp>
where
V: VbucketRouter,
Fut: Future<Output = Result<Resp>> + Send,
{
let (mut endpoint, vb_id) = vb.dispatch_by_key(key, vb_server_idx)?;
let err = match operation(endpoint.clone(), vb_id).await {
Ok(r) => return Ok(r),
Err(e) => e,
};
let config = if let Some(memdx_err) = err.is_memdx_error() {
if memdx_err.is_server_error_kind(ServerErrorKind::NotMyVbucket) {
if let Some(config) = memdx_err.has_server_config() {
config
} else {
debug!("Received empty NMVB response");
return Err(err);
}
} else {
return Err(err);
}
} else {
return Err(err);
};
if config.is_empty() {
return Err(err);
}
let value = match std::str::from_utf8(config.as_slice()) {
Ok(v) => v.to_string(),
Err(_e) => "".to_string(),
};
let config = value.replace("$HOST", endpoint.as_ref());
let config_json: TerseConfig = match serde_json::from_str(&config) {
Ok(c) => c,
Err(_) => {
return Err(err);
}
};
nmvb_handler
.clone()
.not_my_vbucket_config(config_json, &endpoint)
.await;
Err(err)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::cbconfig::TerseConfig;
use crate::vbucketmap::VbucketMap;
use crate::vbucketrouter::{
NotMyVbucketConfigHandler, StdVbucketRouter, VbucketRouter, VbucketRouterOptions,
VbucketRoutingInfo,
};
struct NVMBHandler {}
impl NotMyVbucketConfigHandler for NVMBHandler {
async fn not_my_vbucket_config(&self, config: TerseConfig, source_hostname: &str) {}
}
#[test]
fn dispatch_to_key() {
let routing_info = VbucketRoutingInfo {
vbucket_info: Option::from(
VbucketMap::new(
vec![vec![0, 1], vec![1, 0], vec![0, 1], vec![0, 1], vec![1, 0]],
1,
)
.unwrap(),
),
server_list: vec![Arc::from("endpoint1"), Arc::from("endpoint2")],
bucket_selected: true,
};
let dispatcher = StdVbucketRouter::new(routing_info, VbucketRouterOptions {});
let (endpoint, vb_id) = dispatcher.dispatch_by_key(b"key1", 0).unwrap();
assert_eq!("endpoint2", &*endpoint);
assert_eq!(1, vb_id);
let (endpoint, vb_id) = dispatcher.dispatch_by_key(b"key2", 0).unwrap();
assert_eq!("endpoint1", &*endpoint);
assert_eq!(3, vb_id);
let (endpoint, vb_id) = dispatcher.dispatch_by_key(b"key2", 1).unwrap();
assert_eq!("endpoint2", &*endpoint);
assert_eq!(3, vb_id);
}
}