use bb8_redis::bb8::Pool;
use bb8_redis::redis::{AsyncCommands, ErrorKind};
use bb8_redis::RedisConnectionManager;
use serde_json::{self, Error as SerdeJSONError};
use std::collections::HashSet;
use std::sync::RwLock;
use std::time::{Duration, Instant, SystemTime};
use super::cache::STORE_CACHE;
use super::key::StoreKey;
use crate::dns::record::{
RecordBlackhole, RecordName, RecordRegions, RecordType, RecordValue, RecordValues,
};
use crate::dns::zone::ZoneName;
use crate::APP_CONF;
// Redis hash field names under which record attributes are stored
static KEY_TYPE: &str = "t";
static KEY_NAME: &str = "n";
static KEY_TTL: &str = "e";
static KEY_FLATTEN: &str = "m";
static KEY_BLACKHOLE: &str = "b";
static KEY_REGION: &str = "r";
static KEY_RESCUE: &str = "f";
static KEY_VALUE: &str = "v";

// External-origin remote reads may spend at most the allowance threshold of
// cumulated time per timespan chunk before being temporarily denied
const LIMITS_GET_REMOTE_TIMESPAN_TOTAL: Duration = Duration::from_secs(10);
const LIMITS_GET_REMOTE_ALLOWANCE_THRESHOLD: Duration = Duration::from_secs(8);
/// Raw tuple read back from a record hash, in the same field order as the
/// `hget` call in `raw_get_remote`: type, name, TTL, flatten, blackhole,
/// regions, rescue, values. Fields that may be absent come back as `None`.
type StoreGetType = (
    // Record type (field KEY_TYPE)
    String,
    // Record name (field KEY_NAME)
    String,
    // TTL; 0 encodes "no TTL configured" (field KEY_TTL)
    u32,
    // Flatten marker, "1" when enabled (field KEY_FLATTEN)
    Option<String>,
    // Blackhole, JSON-encoded (field KEY_BLACKHOLE)
    Option<String>,
    // Regions, JSON-encoded (field KEY_REGION)
    Option<String>,
    // Rescue values, JSON-encoded (field KEY_RESCUE)
    Option<String>,
    // Record values, JSON-encoded (field KEY_VALUE)
    String,
);
/// Factory for [`Store`]: connects the configured Redis backends at boot.
pub struct StoreBuilder;
/// Store frontend over one or more Redis backends.
pub struct Store {
    // Connection pools: master first, then any configured rescue backends
    pools: Vec<StorePool>,
    // Rate-limit accounting for remote reads from external origins
    limits: StoreLimits,
}
/// A single Redis backend with its connection pool.
pub struct StorePool {
    // bb8 pool of managed Redis connections
    connection: Pool<RedisConnectionManager>,
    // Connection URL this pool was built from (may embed credentials)
    target: String,
    // NOTE(review): never read or written in this file; presumably marks the
    // backend as unavailable until the given instant — confirm against the
    // pool selection logic (likely the get_cache_store_client! macro)
    delinquent_until: RwLock<Option<Instant>>,
}
/// Rate-limiting state for remote store access.
pub struct StoreLimits {
    // Time-spent accounting, guarded for shared access across tasks
    rate: RwLock<StoreLimitsRate>,
}
/// Accounting of time spent against the remote store within the current
/// timespan chunk (see the `LIMITS_GET_REMOTE_*` constants).
pub struct StoreLimitsRate {
    // Instant at which the current chunk started
    time_last: Instant,
    // Cumulated time spent on remote reads within the current chunk
    time_spent: Duration,
}
/// A fully-decoded DNS record, as written to and read back from the store.
#[derive(Debug, Clone)]
pub struct StoreRecord {
    // Record type
    pub kind: RecordType,
    // Record name
    pub name: RecordName,
    // Optional TTL; `None` when not configured (stored as 0)
    pub ttl: Option<u32>,
    // Flatten marker (stored as "1" when enabled)
    pub flatten: Option<bool>,
    // Optional blackhole configuration (stored JSON-encoded)
    pub blackhole: Option<RecordBlackhole>,
    // Optional per-region value lists (stored JSON-encoded)
    pub regions: Option<RecordRegions>,
    // Optional rescue values (stored JSON-encoded)
    pub rescue: Option<RecordValues>,
    // Base record values (stored JSON-encoded)
    pub values: RecordValues,
}
/// Errors returned by store operations.
///
/// Derives `Debug` so failures can be reported and asserted on; public error
/// types should be debuggable.
#[derive(Debug)]
pub enum StoreError {
    /// The stored payload exists but could not be decoded back into a record
    Corrupted,
    /// A record attribute could not be JSON-encoded for storage
    Encoding,
    /// The backend connector failed while executing the command
    Connector,
    /// No record exists for the requested key
    NotFound,
    /// No backend is available, or the external read allowance is exhausted
    Disconnected,
}
/// Origin of a store read; external reads are rate-limited while internal
/// reads bypass the limiter (see `Store::get`).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum StoreAccessOrigin {
    /// Read performed on behalf of an external source (rate-limited)
    External,
    /// Read performed internally (not rate-limited)
    Internal,
}
impl StoreBuilder {
    /// Constructs the store: connects to the master Redis backend plus any
    /// configured rescue backends, and initializes rate-limiting state.
    ///
    /// Panics (via `pool_bind`) if any configured backend cannot be set up,
    /// failing the service fast at boot time.
    #[tokio::main]
    pub async fn new() -> Store {
        let mut pools = Vec::new();

        // The master backend always comes first in the pool list
        Self::pool_bind(
            &mut pools,
            &APP_CONF.redis.master.host,
            APP_CONF.redis.master.port,
            &APP_CONF.redis.master.password,
        )
        .await;

        // Rescue backends (if any) are appended after the master
        if let Some(ref rescue_items) = APP_CONF.redis.rescue {
            for rescue in rescue_items {
                Self::pool_bind(&mut pools, &rescue.host, rescue.port, &rescue.password).await;
            }
        }

        let limits = StoreLimits {
            rate: RwLock::new(StoreLimitsRate::default()),
        };

        Store { pools, limits }
    }

    /// Connects a single backend and appends its pool to `pools`.
    ///
    /// Panics when the connection cannot be established: the store is not
    /// usable without all of its configured backends.
    async fn pool_bind(
        pools: &mut Vec<StorePool>,
        host: &str,
        port: u16,
        password: &Option<String>,
    ) {
        match Self::pool_connect(host, port, password).await {
            Ok(master_pool) => pools.push(master_pool),
            Err(err) => panic!("store error: {}", err),
        }
    }

    /// Builds a bb8 connection pool against one Redis backend.
    ///
    /// Returns the `StorePool` wrapper on success, or a static error message
    /// when the connection manager or the pool cannot be created.
    async fn pool_connect(
        host: &str,
        port: u16,
        password: &Option<String>,
    ) -> Result<StorePool, &'static str> {
        info!("binding to store backend at {}:{}", host, port);

        // Credentials (if any) are embedded in the connection URL
        let addr_auth = match password {
            Some(ref password) => format!(":{}@", password),
            None => "".to_string(),
        };

        let tcp_addr_raw = format!(
            "redis://{}{}:{}/{}",
            &addr_auth, host, port, APP_CONF.redis.database,
        );

        // Security: never log 'tcp_addr_raw', as it may embed the backend
        // password; log a credential-free variant of the URL instead
        let tcp_addr_safe = format!("redis://{}:{}/{}", host, port, APP_CONF.redis.database);

        debug!("will connect to redis at: {}", tcp_addr_safe);

        match RedisConnectionManager::new(tcp_addr_raw.as_ref()) {
            Ok(manager) => {
                let builder = Pool::builder()
                    .test_on_check_out(true)
                    .max_size(APP_CONF.redis.pool_size)
                    .max_lifetime(Some(Duration::from_secs(
                        APP_CONF.redis.max_lifetime_seconds,
                    )))
                    .idle_timeout(Some(Duration::from_secs(
                        APP_CONF.redis.idle_timeout_seconds,
                    )))
                    .connection_timeout(Duration::from_secs(
                        APP_CONF.redis.connection_timeout_seconds,
                    ));

                match builder.build(manager).await {
                    Ok(pool) => {
                        info!("connected to redis at: {}", tcp_addr_safe);

                        Ok(StorePool {
                            connection: pool,
                            target: tcp_addr_raw,
                            delinquent_until: RwLock::new(None),
                        })
                    }
                    Err(_) => Err("could not spawn redis pool"),
                }
            }
            Err(_) => Err("could not create redis connection manager"),
        }
    }
}
impl Store {
    /// Reads a record for the given zone, name and type.
    ///
    /// The local cache is consulted first (a cached `None` marks a record
    /// known to be absent). On a miss the remote store is queried; reads on
    /// behalf of external origins are rate-limited on the cumulated time
    /// spent against the remote store per timespan chunk, so that a slow
    /// backend cannot stall external-facing request handling.
    pub async fn get(
        &self,
        zone_name: &ZoneName,
        record_name: &RecordName,
        record_type: &RecordType,
        origin: StoreAccessOrigin,
    ) -> Result<StoreRecord, StoreError> {
        let store_key = StoreKey::to_key(zone_name, record_name, record_type);

        // Cache hit? (negative results are cached as 'None')
        if let Ok(cached_records) = STORE_CACHE.get(&store_key) {
            debug!(
                "get from local store from any on type: {:?}, zone: {:?}, record: {:?}",
                record_type, zone_name, record_name
            );

            return match cached_records {
                Some(cached_records) => Ok(cached_records),
                None => Err(StoreError::NotFound),
            };
        }

        // Internal reads bypass the rate limiter
        if origin == StoreAccessOrigin::Internal {
            debug!(
                "get from remote store from internal on type: {:?}, zone: {:?}, record: {:?}",
                record_type, zone_name, record_name
            );

            return self.raw_get_remote(&store_key, None).await;
        }

        debug!(
            "get from remote store from external on type: {:?}, zone: {:?}, record: {:?}",
            record_type, zone_name, record_name
        );

        // Snapshot the time already spent in the current chunk, rolling over
        // to a fresh chunk once the previous one has fully elapsed. The write
        // lock is confined to this block so it is released before awaiting.
        let (time_spent_current_timespan, start_instant) = {
            let mut limits_rate_write = self.limits.rate.write().unwrap();

            let now_instant = Instant::now();

            if now_instant.duration_since(limits_rate_write.time_last)
                >= LIMITS_GET_REMOTE_TIMESPAN_TOTAL
            {
                limits_rate_write.time_last = now_instant;
                limits_rate_write.time_spent = Duration::new(0, 0);

                debug!(
                    "started a new time spent chunk in remote store from external ({:?} chunks)",
                    LIMITS_GET_REMOTE_TIMESPAN_TOTAL
                );
            }

            (limits_rate_write.time_spent, now_instant)
        };

        // Deny the read once the allowance for this chunk is exhausted
        if time_spent_current_timespan >= LIMITS_GET_REMOTE_ALLOWANCE_THRESHOLD {
            error!(
                "limited remote store get from external on type: {:?}, zone: {:?}, record: {:?}",
                record_type, zone_name, record_name
            );

            return Err(StoreError::Disconnected);
        }

        let result_remote = self.raw_get_remote(&store_key, None).await;

        // Charge the elapsed time to the current chunk
        {
            let mut limits_rate_write = self.limits.rate.write().unwrap();

            limits_rate_write.time_spent += start_instant.elapsed();

            debug!(
                "updated time spent in remote store from external to: {:?} in current chunk",
                limits_rate_write.time_spent
            );
        }

        result_remote
    }

    /// Writes (or replaces) a record in the remote store, invalidating any
    /// local cache entry for its key first.
    ///
    /// Optional attributes are JSON-encoded; an empty string encodes an
    /// absent attribute. Returns `StoreError::Encoding` when any attribute
    /// fails to serialize, `StoreError::Connector` when the write fails.
    pub async fn set(&self, zone_name: &ZoneName, record: StoreRecord) -> Result<(), StoreError> {
        get_cache_store_client!(&self.pools, StoreError::Disconnected, client {
            // Flatten is stored as a plain "1" marker; this never fails, but
            // keeping the 'Result' shape aligns it with the encoders below
            let flatten_encoder: Result<String, SerdeJSONError> = Ok(match record.flatten {
                Some(true) => "1".to_owned(),
                _ => "".to_owned()
            });

            let blackhole_encoder = match record.blackhole {
                Some(ref blackhole) if blackhole.has_items() => serde_json::to_string(blackhole),
                _ => Ok("".to_owned())
            };

            let region_encoder = match record.regions {
                Some(ref regions) => serde_json::to_string(regions),
                None => Ok("".to_owned())
            };

            let rescue_encoder = match record.rescue {
                Some(ref rescue) if !rescue.is_empty() => serde_json::to_string(rescue),
                _ => Ok("".to_owned())
            };

            match (
                serde_json::to_string(&record.values),
                flatten_encoder,
                blackhole_encoder,
                region_encoder,
                rescue_encoder
            ) {
                (Ok(values), Ok(flatten), Ok(blackhole), Ok(regions), Ok(rescue)) => {
                    let store_key = StoreKey::to_key(zone_name, &record.name, &record.kind);

                    // Invalidate the cached entry before the remote write
                    STORE_CACHE.pop(&store_key);

                    client.hset_multiple(
                        store_key, &[
                            (KEY_TYPE, record.kind.to_str()),
                            (KEY_NAME, record.name.to_str()),
                            (KEY_TTL, &record.ttl.unwrap_or(0).to_string()),
                            (KEY_FLATTEN, &flatten),
                            (KEY_BLACKHOLE, &blackhole),
                            (KEY_REGION, &regions),
                            (KEY_RESCUE, &rescue),
                            (KEY_VALUE, &values),
                        ]
                    ).await.or(Err(StoreError::Connector))
                },
                // At least one attribute failed to JSON-encode
                _ => Err(StoreError::Encoding)
            }
        })
    }

    /// Deletes a record from the remote store, dropping any local cache
    /// entry for its key first.
    pub async fn remove(
        &self,
        zone_name: &ZoneName,
        record_name: &RecordName,
        record_type: &RecordType,
    ) -> Result<(), StoreError> {
        get_cache_store_client!(&self.pools, StoreError::Disconnected, client {
            let store_key = StoreKey::to_key(zone_name, record_name, record_type);

            // Invalidate the cached entry before the remote delete
            STORE_CACHE.pop(&store_key);

            client.del(store_key).await.or(Err(StoreError::Connector))
        })
    }

    /// Fetches a record straight from the remote store (bypassing the local
    /// cache for the read), decodes it, and refreshes the cache with the
    /// outcome. `cache_accessed_at` is forwarded to the cache push.
    ///
    /// A Redis type error is treated as a missing record and cached as a
    /// negative entry; any other read error maps to `StoreError::NotFound`
    /// without caching. An undecodable mandatory field yields
    /// `StoreError::Corrupted`.
    pub async fn raw_get_remote(
        &self,
        store_key: &str,
        cache_accessed_at: Option<SystemTime>,
    ) -> Result<StoreRecord, StoreError> {
        get_cache_store_client!(&self.pools, StoreError::Disconnected, client {
            match client.hget::<_, _, StoreGetType>(
                store_key,
                (
                    KEY_TYPE,
                    KEY_NAME,
                    KEY_TTL,
                    KEY_FLATTEN,
                    KEY_BLACKHOLE,
                    KEY_REGION,
                    KEY_RESCUE,
                    KEY_VALUE
                ),
            ).await {
                Ok(values) => {
                    // Type, name and values are mandatory; bail out as
                    // corrupted if any of them cannot be decoded
                    if let (Some(kind_value), Some(name_value), Ok(value_value)) = (
                        RecordType::from_str(&values.0),
                        RecordName::from_str(&values.1),
                        serde_json::from_str(&values.7)
                    ) {
                        // A zero TTL encodes "no TTL configured"
                        let ttl = if values.2 > 0 {
                            Some(values.2)
                        } else {
                            None
                        };

                        let flatten = values.3.and_then(|flatten_raw| {
                            if flatten_raw == "1" {
                                Some(true)
                            } else {
                                None
                            }
                        });

                        // Empty strings are not valid JSON, hence decode to
                        // 'None' (absent attribute) via '.ok()'
                        let blackhole = values.4.and_then(|blackhole_raw| {
                            serde_json::from_str::<RecordBlackhole>(&blackhole_raw).ok()
                        });

                        let regions = values.5.and_then(|region_raw| {
                            serde_json::from_str::<RecordRegions>(&region_raw).ok()
                        });

                        let rescue = values.6.and_then(|rescue_raw| {
                            serde_json::from_str::<RecordValues>(&rescue_raw).ok()
                        });

                        debug!(
                            "read store record with kind: {:?}, name: {:?} and values: {:?}",
                            kind_value,
                            name_value,
                            value_value
                        );

                        if flatten.is_some() {
                            debug!(
                                "store record with kind: {:?}, name: {:?} has flatten: {:?}",
                                kind_value,
                                name_value,
                                flatten
                            );
                        }

                        if blackhole.is_some() {
                            debug!(
                                "store record with kind: {:?}, name: {:?} has blackhole: {:?}",
                                kind_value,
                                name_value,
                                blackhole
                            );
                        }

                        if regions.is_some() {
                            debug!(
                                "store record with kind: {:?}, name: {:?} has regions: {:?}",
                                kind_value,
                                name_value,
                                regions
                            );
                        }

                        if rescue.is_some() {
                            debug!(
                                "store record with kind: {:?}, name: {:?} has rescue: {:?}",
                                kind_value,
                                name_value,
                                rescue
                            );
                        }

                        let record = StoreRecord {
                            kind: kind_value,
                            name: name_value,
                            ttl,
                            flatten,
                            blackhole,
                            regions,
                            rescue,
                            values: value_value,
                        };

                        // Refresh the cache with the freshly-read record
                        STORE_CACHE.push(store_key, Some(record.clone()), cache_accessed_at);

                        Ok(record)
                    } else {
                        Err(StoreError::Corrupted)
                    }
                },
                Err(err) => {
                    debug!("could not read store record at key: {}, because: {}", store_key, err);

                    // A type error is treated as "record does not exist";
                    // cache that negative outcome so the next read is local
                    if err.kind() == ErrorKind::TypeError {
                        STORE_CACHE.push(store_key, None, cache_accessed_at);
                    }

                    Err(StoreError::NotFound)
                },
            }
        })
    }
}
impl Default for StoreLimitsRate {
fn default() -> Self {
Self {
time_last: Instant::now(),
time_spent: Duration::new(0, 0),
}
}
}
impl StoreRecord {
pub fn list_record_values<'a>(&'a self) -> HashSet<&'a RecordValue> {
let mut unique_values = HashSet::new();
for value in self.values.iter() {
unique_values.insert(value);
}
if let Some(ref regions) = self.regions {
self.insert_record_values(®ions.nnam, &mut unique_values);
self.insert_record_values(®ions.snam, &mut unique_values);
self.insert_record_values(®ions.nsam, &mut unique_values);
self.insert_record_values(®ions.ssam, &mut unique_values);
self.insert_record_values(®ions.weu, &mut unique_values);
self.insert_record_values(®ions.ceu, &mut unique_values);
self.insert_record_values(®ions.eeu, &mut unique_values);
self.insert_record_values(®ions.ru, &mut unique_values);
self.insert_record_values(®ions.me, &mut unique_values);
self.insert_record_values(®ions.naf, &mut unique_values);
self.insert_record_values(®ions.maf, &mut unique_values);
self.insert_record_values(®ions.saf, &mut unique_values);
self.insert_record_values(®ions.seas, &mut unique_values);
self.insert_record_values(®ions.neas, &mut unique_values);
self.insert_record_values(®ions.oc, &mut unique_values);
self.insert_record_values(®ions._in, &mut unique_values);
}
unique_values
}
fn insert_record_values<'a>(
&'a self,
record_values: &'a Option<RecordValues>,
unique_values: &mut HashSet<&'a RecordValue>,
) {
if let Some(record_values) = record_values {
for value in record_values.iter() {
unique_values.insert(value);
}
}
}
}