use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex;
use thiserror::Error;
use crate::conf::{ConfDynSeed, DataStore};
use crate::msg::{Msg, MsgType};
use crate::seeds::{
dns::DnsSeedsProvider as InnerDnsSeedsProvider,
florida::FloridaSeedsProvider as InnerFloridaSeedsProvider,
simple::SimpleSeedsProvider as InnerSimpleSeedsProvider, SeedsError, SeedsProvider as RawSeeds,
};
use crate::stats::{describe_stats, MetricSpec, Snapshot};
/// Heap-allocated, pinned, type-erased future that is `Send`, resolves to `T`,
/// and may borrow data for the lifetime `'a`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
/// Errors returned by [`Datastore`] operations.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must keep a wildcard
/// arm when matching on it.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum DatastoreError {
/// The operation is not supported; carries the [`MsgType`] it was reported for.
#[error("unsupported request: {0:?}")]
Unsupported(MsgType),
/// A failure reported by the datastore itself, carried as a message string.
#[error("datastore error: {0}")]
Backend(String),
/// An I/O failure, carried as a message string.
#[error("io error: {0}")]
Io(String),
}
/// Protocol family reported by a [`Datastore`] through [`Datastore::protocol`].
///
/// Marked `#[non_exhaustive]`, so code outside this crate must keep a wildcard
/// arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum Protocol {
/// Redis.
Redis,
/// Memcache.
Memcache,
/// Anything else; `DataStore::Noxu` and the in-memory datastore map here.
Custom,
}
/// Converts the configuration-level [`DataStore`] selector into a [`Protocol`].
impl From<DataStore> for Protocol {
    /// `Redis` and `Memcache` map to the variants of the same name;
    /// `Noxu` maps to [`Protocol::Custom`].
    fn from(store: DataStore) -> Self {
        match store {
            DataStore::Redis => Self::Redis,
            DataStore::Memcache => Self::Memcache,
            DataStore::Noxu => Self::Custom,
        }
    }
}
/// A backend that answers request messages and, optionally, bucket/key style
/// operations.
///
/// Only [`Datastore::protocol`] and [`Datastore::dispatch`] are required. Every
/// other method has a default: `supports` accepts everything, and the listing
/// and `riak_*` operations report
/// `DatastoreError::Unsupported(MsgType::Unknown)`.
pub trait Datastore: Send + Sync {
/// Returns the protocol family this datastore reports.
fn protocol(&self) -> Protocol;
/// Reports whether the datastore handles the given message type.
///
/// The default accepts every message type.
fn supports(&self, _cmd: MsgType) -> bool {
true
}
/// Handles `req`, resolving to the response message or a [`DatastoreError`].
fn dispatch(&self, req: Msg) -> BoxFuture<'_, Result<Msg, DatastoreError>>;
/// Streams byte items for a bucket listing.
///
/// The default returns the stream built by `unsupported_byte_stream()`.
fn list_buckets_stream(&self) -> DatastoreByteStream {
Box::pin(unsupported_byte_stream())
}
/// Streams byte items for a listing of the keys in `_bucket`.
///
/// The default returns the stream built by `unsupported_byte_stream()`.
fn list_keys_stream(&self, _bucket: &[u8]) -> DatastoreByteStream {
Box::pin(unsupported_byte_stream())
}
/// Looks up `_key` in `_bucket`.
///
/// NOTE(review): `Ok(None)` presumably means "key not found" — confirm against
/// implementors. The default resolves to
/// `Err(DatastoreError::Unsupported(MsgType::Unknown))`.
fn riak_get<'a>(
&'a self,
_bucket: &'a [u8],
_key: &'a [u8],
) -> BoxFuture<'a, Result<Option<Vec<u8>>, DatastoreError>> {
Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
}
/// Stores `_value` under `_key` in `_bucket`, together with `_indexes`.
///
/// NOTE(review): `_indexes` looks like (index name, index value) pairs — confirm.
/// The default resolves to `Err(DatastoreError::Unsupported(MsgType::Unknown))`.
fn riak_put<'a>(
&'a self,
_bucket: &'a [u8],
_key: &'a [u8],
_value: &'a [u8],
_indexes: &'a [(Vec<u8>, Vec<u8>)],
) -> BoxFuture<'a, Result<(), DatastoreError>> {
Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
}
/// Deletes `_key` from `_bucket`.
///
/// NOTE(review): the `bool` presumably reports whether a value was removed —
/// confirm. The default resolves to
/// `Err(DatastoreError::Unsupported(MsgType::Unknown))`.
fn riak_delete<'a>(
&'a self,
_bucket: &'a [u8],
_key: &'a [u8],
) -> BoxFuture<'a, Result<bool, DatastoreError>> {
Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
}
/// Index query in `_bucket` on `_index_name` for a single `_value`.
///
/// NOTE(review): the returned byte vectors are presumably the matching keys —
/// confirm. The default resolves to
/// `Err(DatastoreError::Unsupported(MsgType::Unknown))`.
fn riak_index_eq<'a>(
&'a self,
_bucket: &'a [u8],
_index_name: &'a [u8],
_value: &'a [u8],
) -> BoxFuture<'a, Result<Vec<Vec<u8>>, DatastoreError>> {
Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
}
/// Index query in `_bucket` on `_index_name` between `_min` and `_max`.
///
/// NOTE(review): whether the bounds are inclusive is not visible here — confirm.
/// The default resolves to `Err(DatastoreError::Unsupported(MsgType::Unknown))`.
fn riak_index_range<'a>(
&'a self,
_bucket: &'a [u8],
_index_name: &'a [u8],
_min: &'a [u8],
_max: &'a [u8],
) -> BoxFuture<'a, Result<Vec<Vec<u8>>, DatastoreError>> {
Box::pin(async move { Err(DatastoreError::Unsupported(MsgType::Unknown)) })
}
}
/// In-memory [`Datastore`] that counts how many requests it has been asked to
/// dispatch.
///
/// Clones share the same underlying state, so a count observed through one
/// clone includes dispatches made through any other.
#[derive(Debug, Default, Clone)]
pub struct MemoryDatastore {
    inner: Arc<Mutex<MemoryStore>>,
}

/// Mutable state shared by every clone of a [`MemoryDatastore`].
#[derive(Debug, Default)]
struct MemoryStore {
    /// Number of dispatched requests recorded so far.
    calls: u64,
}

impl MemoryDatastore {
    /// Creates a datastore whose dispatch counter starts at zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::default(),
        }
    }

    /// Returns the number of dispatches recorded so far.
    #[must_use]
    pub fn dispatch_count(&self) -> u64 {
        let store = self.inner.lock();
        store.calls
    }
}
impl Datastore for MemoryDatastore {
fn protocol(&self) -> Protocol {
Protocol::Custom
}
fn dispatch(&self, req: Msg) -> BoxFuture<'_, Result<Msg, DatastoreError>> {
let inner = self.inner.clone();
Box::pin(async move {
inner.lock().calls += 1;
let mut rsp = Msg::new(req.id(), MsgType::Unknown, false);
rsp.set_parent_id(req.id());
Ok(rsp)
})
}
fn list_buckets_stream(&self) -> DatastoreByteStream {
let snapshot = self.list_buckets_snapshot();
Box::pin(VecByteStream {
items: snapshot.into_iter(),
})
}
fn list_keys_stream(&self, bucket: &[u8]) -> DatastoreByteStream {
let snapshot = self.list_keys_snapshot(bucket);
Box::pin(VecByteStream {
items: snapshot.into_iter(),
})
}
}
/// Redis-flavoured datastore description holding the target it was created for.
#[derive(Debug, Clone)]
pub struct RedisDatastore {
    target: String,
}

impl RedisDatastore {
    /// Creates a datastore for `target`, accepting anything convertible into a
    /// `String`.
    pub fn new(target: impl Into<String>) -> Self {
        let target = target.into();
        Self { target }
    }

    /// Returns the target string supplied at construction.
    #[must_use]
    pub fn target(&self) -> &str {
        self.target.as_str()
    }
}
/// [`Datastore`] behaviour of [`RedisDatastore`].
impl Datastore for RedisDatastore {
    /// Always [`Protocol::Redis`].
    fn protocol(&self) -> Protocol {
        Protocol::Redis
    }

    /// Answers with a `MsgType::RspRedisStatus` message built from the request
    /// id, with the request id also set as its parent id.
    ///
    /// The visible implementation never reads `target`.
    fn dispatch(&self, req: Msg) -> BoxFuture<'_, Result<Msg, DatastoreError>> {
        let respond = async move {
            let mut reply = Msg::new(req.id(), MsgType::RspRedisStatus, false);
            reply.set_parent_id(req.id());
            Ok(reply)
        };
        Box::pin(respond)
    }
}
/// Memcache-flavoured datastore description holding the target it was created
/// for.
#[derive(Debug, Clone)]
pub struct MemcacheDatastore {
    target: String,
}

impl MemcacheDatastore {
    /// Creates a datastore for `target`, accepting anything convertible into a
    /// `String`.
    pub fn new(target: impl Into<String>) -> Self {
        let target = target.into();
        Self { target }
    }

    /// Returns the target string supplied at construction.
    #[must_use]
    pub fn target(&self) -> &str {
        self.target.as_str()
    }
}
/// [`Datastore`] behaviour of [`MemcacheDatastore`].
impl Datastore for MemcacheDatastore {
    /// Always [`Protocol::Memcache`].
    fn protocol(&self) -> Protocol {
        Protocol::Memcache
    }

    /// Answers with a `MsgType::RspMcEnd` message built from the request id,
    /// with the request id also set as its parent id.
    ///
    /// The visible implementation never reads `target`.
    fn dispatch(&self, req: Msg) -> BoxFuture<'_, Result<Msg, DatastoreError>> {
        let respond = async move {
            let mut reply = Msg::new(req.id(), MsgType::RspMcEnd, false);
            reply.set_parent_id(req.id());
            Ok(reply)
        };
        Box::pin(respond)
    }
}
/// Source of dynamic seed entries.
pub trait SeedsProvider: Send + Sync {
    /// Fetches the current seed list.
    ///
    /// # Errors
    ///
    /// Returns the provider's [`SeedsError`] when the seeds cannot be obtained.
    fn fetch(&self) -> Result<Vec<ConfDynSeed>, SeedsError>;

    /// Interval reported for refreshing the seeds; 30 seconds unless overridden.
    fn refresh_interval(&self) -> Duration {
        const DEFAULT_REFRESH_SECS: u64 = 30;
        Duration::from_secs(DEFAULT_REFRESH_SECS)
    }
}
/// Wrapper around the simple seeds provider from `crate::seeds::simple`.
#[derive(Debug, Clone, Default)]
pub struct SimpleSeedsProvider {
    inner: InnerSimpleSeedsProvider,
}

impl SimpleSeedsProvider {
    /// Builds a provider by handing `seeds` to the inner provider's constructor.
    #[must_use]
    pub fn new(seeds: Vec<ConfDynSeed>) -> Self {
        let inner = InnerSimpleSeedsProvider::new(seeds);
        Self { inner }
    }
}
/// Exposes the wrapped simple provider through this module's [`SeedsProvider`].
impl SeedsProvider for SimpleSeedsProvider {
/// Delegates to `get_seeds` on the inner provider and returns its result as is.
fn fetch(&self) -> Result<Vec<ConfDynSeed>, SeedsError> {
self.inner.get_seeds()
}
}
/// Public alias for the DNS seeds provider defined in `crate::seeds::dns`.
pub type DnsSeedsProvider = InnerDnsSeedsProvider;
/// Exposes the DNS provider through this module's [`SeedsProvider`].
impl SeedsProvider for DnsSeedsProvider {
/// Delegates to the provider's own `get_seeds` and returns its result as is.
fn fetch(&self) -> Result<Vec<ConfDynSeed>, SeedsError> {
self.get_seeds()
}
}
/// Public alias for the Florida seeds provider defined in `crate::seeds::florida`.
pub type FloridaSeedsProvider = InnerFloridaSeedsProvider;
/// Exposes the Florida provider through this module's [`SeedsProvider`].
impl SeedsProvider for FloridaSeedsProvider {
/// Delegates to the provider's own `get_seeds` and returns its result as is.
fn fetch(&self) -> Result<Vec<ConfDynSeed>, SeedsError> {
self.get_seeds()
}
}
/// Errors returned by [`CryptoProvider`] operations.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must keep a wildcard
/// arm when matching on it.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum CryptoProviderError {
/// An encryption operation failed; carries the underlying error's message.
#[error("encryption failed: {0}")]
Encrypt(String),
/// A decryption operation failed; carries the underlying error's message.
#[error("decryption failed: {0}")]
Decrypt(String),
/// The provider is misconfigured; carries a description of the problem.
#[error("misconfiguration: {0}")]
Misconfigured(String),
}
/// Encryption backend offering AES and RSA encrypt/decrypt operations.
pub trait CryptoProvider: Send + Sync {
/// Returns the RSA size.
///
/// NOTE(review): the unit (bytes vs. bits) is not visible here — confirm
/// against `crate::crypto::Crypto::rsa_size`.
fn rsa_size(&self) -> usize;
/// Returns a copy of the AES key as an array of `AES_KEYLEN` bytes.
fn aes_key(&self) -> [u8; crate::crypto::AES_KEYLEN];
/// Encrypts `plaintext` with AES, returning the ciphertext bytes.
fn aes_encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, CryptoProviderError>;
/// Decrypts `ciphertext` with AES, returning the plaintext bytes.
fn aes_decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, CryptoProviderError>;
/// Encrypts `plaintext` with RSA, returning the ciphertext bytes.
fn rsa_encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, CryptoProviderError>;
/// Decrypts `ciphertext` with RSA, returning the plaintext bytes.
fn rsa_decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, CryptoProviderError>;
}
/// [`CryptoProvider`] backed by a shared `crate::crypto::Crypto` instance.
#[derive(Debug)]
pub struct RustCryptoProvider {
    crypto: Arc<crate::crypto::Crypto>,
}

impl RustCryptoProvider {
    /// Takes ownership of `crypto` and wraps it in a fresh `Arc`.
    #[must_use]
    pub fn new(crypto: crate::crypto::Crypto) -> Self {
        Self::from_arc(Arc::new(crypto))
    }

    /// Builds a provider around an already shared `Crypto`.
    #[must_use]
    pub fn from_arc(crypto: Arc<crate::crypto::Crypto>) -> Self {
        Self { crypto }
    }
}
/// Forwards every operation to the wrapped `Crypto`, converting its errors into
/// [`CryptoProviderError`] by message.
impl CryptoProvider for RustCryptoProvider {
    /// Returns whatever the wrapped `Crypto` reports as its RSA size.
    fn rsa_size(&self) -> usize {
        self.crypto.rsa_size()
    }

    /// Returns a copy of the wrapped `Crypto`'s AES key.
    fn aes_key(&self) -> [u8; crate::crypto::AES_KEYLEN] {
        let key = self.crypto.aes_key();
        *key
    }

    /// AES-encrypts `plaintext` under the wrapped key.
    ///
    /// # Errors
    ///
    /// Failures become [`CryptoProviderError::Encrypt`] carrying the message.
    fn aes_encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, CryptoProviderError> {
        let key = self.crypto.aes_key();
        let sealed = crate::crypto::Crypto::aes_encrypt(plaintext, key);
        sealed.map_err(|err| CryptoProviderError::Encrypt(err.to_string()))
    }

    /// AES-decrypts `ciphertext` under the wrapped key.
    ///
    /// # Errors
    ///
    /// Failures become [`CryptoProviderError::Decrypt`] carrying the message.
    fn aes_decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, CryptoProviderError> {
        let key = self.crypto.aes_key();
        let opened = crate::crypto::Crypto::aes_decrypt(ciphertext, key);
        opened.map_err(|err| CryptoProviderError::Decrypt(err.to_string()))
    }

    /// RSA-encrypts `plaintext` through the wrapped `Crypto`.
    ///
    /// # Errors
    ///
    /// Failures become [`CryptoProviderError::Encrypt`] carrying the message.
    fn rsa_encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, CryptoProviderError> {
        let sealed = self.crypto.rsa_encrypt(plaintext);
        sealed.map_err(|err| CryptoProviderError::Encrypt(err.to_string()))
    }

    /// RSA-decrypts `ciphertext` through the wrapped `Crypto`.
    ///
    /// # Errors
    ///
    /// Failures become [`CryptoProviderError::Decrypt`] carrying the message.
    fn rsa_decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, CryptoProviderError> {
        let opened = self.crypto.rsa_decrypt(ciphertext);
        opened.map_err(|err| CryptoProviderError::Decrypt(err.to_string()))
    }
}
/// Errors returned by [`MetricsSink`] operations.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must keep a wildcard
/// arm when matching on it.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum MetricsError {
/// Flushing metrics failed; carries a description of the failure.
#[error("metrics flush failed: {0}")]
Flush(String),
}
/// Destination for statistics snapshots.
pub trait MetricsSink: Send + Sync {
    /// Emits `snapshot`, resolving once the sink has handled it.
    ///
    /// # Errors
    ///
    /// Resolves to a [`MetricsError`] when the sink cannot accept the snapshot.
    fn emit<'a>(&'a self, snapshot: &'a Snapshot) -> BoxFuture<'a, Result<(), MetricsError>>;

    /// Interval reported for flushing; 10 seconds unless overridden.
    fn flush_interval(&self) -> Duration {
        const DEFAULT_FLUSH_SECS: u64 = 10;
        Duration::from_secs(DEFAULT_FLUSH_SECS)
    }

    /// Metric specifications this sink reports; empty unless overridden.
    fn manifest(&self) -> Vec<MetricSpec> {
        Vec::new()
    }
}
/// [`MetricsSink`] that logs each flush and counts how many it has seen.
///
/// Clones share the same counter.
#[derive(Debug, Clone)]
pub struct LoggingMetricsSink {
    name: String,
    counter: Arc<Mutex<u64>>,
}

impl LoggingMetricsSink {
    /// Creates a sink labelled `name` whose flush counter starts at zero.
    #[must_use]
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        let counter = Arc::new(Mutex::new(0));
        Self { name, counter }
    }

    /// Returns the number of flushes recorded so far.
    #[must_use]
    pub fn flush_count(&self) -> u64 {
        let flushes = self.counter.lock();
        *flushes
    }
}
impl MetricsSink for LoggingMetricsSink {
fn emit<'a>(&'a self, snapshot: &'a Snapshot) -> BoxFuture<'a, Result<(), MetricsError>> {
let counter = self.counter.clone();
let name = self.name.clone();
let pool_name = snapshot.pool.name.clone();
Box::pin(async move {
*counter.lock() += 1;
tracing::info!(sink = %name, pool = %pool_name, "metrics flush");
Ok(())
})
}
fn manifest(&self) -> Vec<MetricSpec> {
let json = describe_stats();
tracing::debug!(bytes = json.len(), "metrics manifest");
Vec::new()
}
}
use bytes::Bytes;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::OnceLock;
use std::task::{Context, Poll};
/// Heap-allocated, pinned, type-erased `Send` stream whose items are either a
/// chunk of [`Bytes`] or a [`DatastoreError`].
pub type DatastoreByteStream =
Pin<Box<dyn futures_core::Stream<Item = Result<Bytes, DatastoreError>> + Send>>;
/// Builds a stream that yields a single
/// `Err(DatastoreError::Unsupported(MsgType::Unknown))` and then ends.
fn unsupported_byte_stream(
) -> impl futures_core::Stream<Item = Result<Bytes, DatastoreError>> + Send {
    UnsupportedListStream { emitted: false }
}

/// Stream state behind [`unsupported_byte_stream`].
struct UnsupportedListStream {
    /// Set once the single error item has been handed out.
    emitted: bool,
}

impl futures_core::Stream for UnsupportedListStream {
    type Item = Result<Bytes, DatastoreError>;

    /// Never returns `Pending`: the first poll yields the error, and every
    /// later poll reports the end of the stream.
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if !self.emitted {
            self.emitted = true;
            return Poll::Ready(Some(Err(DatastoreError::Unsupported(MsgType::Unknown))));
        }
        Poll::Ready(None)
    }
}
/// Stream that hands out the elements of an already materialised `Vec<Bytes>`.
struct VecByteStream {
    /// Remaining items, yielded front to back.
    items: std::vec::IntoIter<Bytes>,
}

impl futures_core::Stream for VecByteStream {
    type Item = Result<Bytes, DatastoreError>;

    /// Never returns `Pending`: yields the next item wrapped in `Ok`, or ends
    /// the stream once the vector is exhausted.
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let next = self.items.next();
        Poll::Ready(next.map(Ok))
    }
}
/// Bucket/key listing kept for a `MemoryDatastore`.
#[derive(Debug, Default)]
struct MemoryListing {
// Maps each bucket name to the ordered set of keys inserted under it.
buckets: BTreeMap<Vec<u8>, BTreeSet<Vec<u8>>>,
}
/// Cloneable handle to a [`MemoryListing`]; clones share the same listing
/// behind a mutex.
#[derive(Debug, Default, Clone)]
struct ListingHandle {
// Shared, lock-protected listing state.
inner: Arc<Mutex<MemoryListing>>,
}
fn listing_for(ds: &MemoryDatastore) -> ListingHandle {
static REGISTRY: OnceLock<Mutex<Vec<(usize, ListingHandle)>>> = OnceLock::new();
let registry = REGISTRY.get_or_init(|| Mutex::new(Vec::new()));
let id = Arc::as_ptr(&ds.inner) as usize;
let mut g = registry.lock();
if let Some((_, h)) = g.iter().find(|(k, _)| *k == id) {
return h.clone();
}
let h = ListingHandle::default();
g.push((id, h.clone()));
h
}
/// Bucket/key listing operations for the in-memory datastore.
impl MemoryDatastore {
    /// Records `key` under `bucket` in this datastore's listing, creating the
    /// bucket entry if it does not exist yet. Re-inserting a key has no effect.
    pub fn insert(&self, bucket: &[u8], key: &[u8]) {
        let handle = listing_for(self);
        let mut listing = handle.inner.lock();
        let keys = listing.buckets.entry(bucket.to_vec()).or_default();
        keys.insert(key.to_vec());
    }

    /// Returns a copy of every bucket name currently listed, in the map's
    /// sorted order.
    #[must_use]
    pub fn list_buckets_snapshot(&self) -> Vec<Bytes> {
        let handle = listing_for(self);
        let listing = handle.inner.lock();
        let mut names = Vec::with_capacity(listing.buckets.len());
        for name in listing.buckets.keys() {
            names.push(Bytes::copy_from_slice(name));
        }
        names
    }

    /// Returns a copy of every key listed under `bucket`, in the set's sorted
    /// order, or an empty vector when the bucket is not listed.
    #[must_use]
    pub fn list_keys_snapshot(&self, bucket: &[u8]) -> Vec<Bytes> {
        let handle = listing_for(self);
        let listing = handle.inner.lock();
        match listing.buckets.get(bucket) {
            Some(keys) => keys.iter().map(|key| Bytes::copy_from_slice(key)).collect(),
            None => Vec::new(),
        }
    }
}