#![allow(clippy::ptr_arg)]
#![allow(clippy::needless_pass_by_value)]
use atomic_immut::AtomicImmut;
use cannyls::deadline::Deadline;
use cannyls::lump::LumpId;
use cannyls_rpc::DeviceId;
use frugalos_segment::{ObjectValue, SegmentStatistics};
use futures::{self, Future};
use libfrugalos::consistency::ReadConsistency;
use libfrugalos::entity::bucket::{BucketId, BucketKind};
use libfrugalos::entity::object::{
DeleteObjectsByPrefixSummary, FragmentsSummary, ObjectId, ObjectPrefix, ObjectSummary,
ObjectVersion,
};
use libfrugalos::expect::Expect;
use rustracing_jaeger::span::{Span, SpanHandle};
use std::collections::HashMap;
use std::fmt;
use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;
use trackable::error::ErrorKindExt;
use crate::bucket::Bucket;
use crate::{Error, ErrorKind};
/// Boxed, `Send + 'static` future that resolves to `T` or fails with this crate's `Error`.
///
/// Every asynchronous method of `Request` returns this type.
type BoxFuture<T> = Box<dyn Future<Item = T, Error = Error> + Send + 'static>;
/// Handle used to issue requests against the buckets known to this process.
///
/// Cloning the handle clones the inner `Arc`, so all clones share the same
/// bucket table.
#[derive(Clone)]
pub struct FrugalosClient {
// Shared table of buckets keyed by bucket id; it is read through
// `AtomicImmut::load` every time a lookup is needed.
buckets: Arc<AtomicImmut<HashMap<BucketId, Bucket>>>,
}
impl FrugalosClient {
    /// Creates a client handle on top of the shared bucket table.
    pub(crate) fn new(buckets: Arc<AtomicImmut<HashMap<BucketId, Bucket>>>) -> Self {
        FrugalosClient { buckets }
    }

    /// Starts a request against the bucket `bucket_id`.
    ///
    /// The bucket is not looked up here; each operation on the returned
    /// `Request` looks it up when it is invoked.
    pub fn request(&self, bucket_id: BucketId) -> Request {
        Request::new(self, bucket_id)
    }

    /// Returns the number of segments of the bucket `bucket_id`, or `None`
    /// when the bucket table has no entry for that id.
    ///
    /// A count that does not fit in `u16` is reported as `u16::max_value()`.
    pub fn segment_count(&self, bucket_id: &BucketId) -> Option<u16> {
        self.buckets.load().get(bucket_id).map(|b| {
            let count = b.segments().len();
            // A bare `as u16` keeps only the low 16 bits, so e.g. 65536
            // segments would be reported as 0. Clamp first so the cast
            // below can never truncate.
            count.min(usize::from(u16::max_value())) as u16
        })
    }

    /// Returns `Bucket::effectiveness_ratio` of the bucket `bucket_id`, or
    /// `None` when the bucket table has no entry for that id.
    pub fn effectiveness_ratio(&self, bucket_id: &BucketId) -> Option<f64> {
        self.buckets
            .load()
            .get(bucket_id)
            .map(|b| b.effectiveness_ratio())
    }

    /// Returns `Bucket::redundance_ratio` of the bucket `bucket_id`, or
    /// `None` when the bucket table has no entry for that id.
    pub fn redundance_ratio(&self, bucket_id: &BucketId) -> Option<f64> {
        self.buckets
            .load()
            .get(bucket_id)
            .map(|b| b.redundance_ratio())
    }

    /// Returns the kind of the bucket `bucket_id`, or `None` when the bucket
    /// table has no entry for that id.
    pub fn bucket_kind(&self, bucket_id: &BucketId) -> Option<BucketKind> {
        self.buckets.load().get(bucket_id).map(|b| b.kind())
    }
}
impl fmt::Debug for FrugalosClient {
    /// Writes a fixed placeholder; the contents of the bucket table are
    /// deliberately not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("FrugalosClient { .. }")
    }
}
// Evaluates to the bucket stored under `$bucket_id` in `$buckets`.
//
// When there is no such entry, the *enclosing function* returns a boxed,
// already-failed future carrying `ErrorKind::NotFound`, so this macro may
// only be used in functions whose return type such a box coerces to.
macro_rules! try_get_bucket {
    ($buckets:expr, $bucket_id:expr) => {
        match $buckets.get(&$bucket_id) {
            Some(found) => found,
            None => {
                let not_found =
                    ErrorKind::NotFound.cause(format!("No such bucket: {:?}", $bucket_id));
                return Box::new(futures::failed(not_found.into()));
            }
        }
    };
}
/// A request against one bucket, configured through the setter methods and
/// then executed through one of the operation methods.
///
/// The request borrows the `FrugalosClient` it was created from.
pub struct Request<'a> {
// Client whose bucket table is consulted when an operation is invoked.
client: &'a FrugalosClient,
// Id of the bucket every operation of this request targets.
bucket_id: BucketId,
// Deadline forwarded to the segment calls that accept one.
deadline: Deadline,
// Expectation forwarded to `put` and `delete`.
expect: Expect,
// Handle of the parent span forwarded to the segment calls that accept one.
parent: SpanHandle,
}
impl<'a> Request<'a> {
/// Creates a request for the bucket `bucket_id` with default settings:
/// a deadline of 5000 milliseconds, `Expect::Any`, and the handle of an
/// inactive span as the parent.
pub fn new(client: &'a FrugalosClient, bucket_id: BucketId) -> Self {
    let deadline = Deadline::Within(Duration::from_millis(5000));
    let parent = Span::inactive().handle();
    Request {
        client,
        bucket_id,
        deadline,
        expect: Expect::Any,
        parent,
    }
}
/// Replaces the deadline of this request and returns `self` so that
/// setter calls can be chained.
pub fn deadline(&mut self, deadline: Deadline) -> &mut Self {
self.deadline = deadline;
self
}
/// Replaces the expectation that `put` and `delete` forward to the segment
/// and returns `self` so that setter calls can be chained.
pub fn expect(&mut self, expect: Expect) -> &mut Self {
self.expect = expect;
self
}
/// Stores a handle to `span` as the parent span of this request and
/// returns `self` so that setter calls can be chained.
///
/// The stored handle is what the operation methods pass to the segment
/// calls that accept a parent span.
pub fn span(&mut self, span: &Span) -> &mut Self {
self.parent = span.handle();
self
}
pub fn get(
&self,
object_id: ObjectId,
consistency: ReadConsistency,
) -> BoxFuture<Option<ObjectValue>> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
let segment = bucket.get_segment(&object_id);
let future = segment.get(object_id, self.deadline, consistency, self.parent.clone());
Box::new(future.map_err(|e| track!(Error::from(e))))
}
pub fn head(
&self,
object_id: ObjectId,
consistency: ReadConsistency,
) -> BoxFuture<Option<ObjectVersion>> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
let segment = bucket.get_segment(&object_id);
let future = segment.head(object_id, consistency, self.parent.clone());
Box::new(future.map_err(|e| track!(Error::from(e))))
}
pub fn head_storage(
&self,
object_id: ObjectId,
consistency: ReadConsistency,
) -> BoxFuture<Option<ObjectVersion>> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
let segment = bucket.get_segment(&object_id);
let future =
segment.head_storage(object_id, self.deadline, consistency, self.parent.clone());
Box::new(future.map_err(|e| track!(Error::from(e))))
}
pub fn count_fragments(
&self,
object_id: ObjectId,
consistency: ReadConsistency,
) -> BoxFuture<Option<FragmentsSummary>> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
let segment = bucket.get_segment(&object_id);
let future =
segment.count_fragments(object_id, self.deadline, consistency, self.parent.clone());
Box::new(future.map_err(|e| track!(Error::from(e))))
}
pub fn put(&self, object_id: ObjectId, content: Vec<u8>) -> BoxFuture<(ObjectVersion, bool)> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
let segment = bucket.get_segment(&object_id);
let future = segment.put(
object_id,
content,
self.deadline,
self.expect.clone(),
self.parent.clone(),
);
Box::new(future.map_err(|e| track!(Error::from(e))))
}
pub fn delete(&self, object_id: ObjectId) -> BoxFuture<Option<ObjectVersion>> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
let segment = bucket.get_segment(&object_id);
let future = segment.delete(
object_id,
self.deadline,
self.expect.clone(),
self.parent.clone(),
);
Box::new(future.map_err(|e| track!(Error::from(e))))
}
pub fn delete_by_version(
&self,
segment: usize,
object_version: ObjectVersion,
) -> BoxFuture<Option<ObjectVersion>> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
if segment < bucket.segments().len() {
let segment = &bucket.segments()[segment];
let future =
segment.delete_by_version(object_version, self.deadline, self.parent.clone());
Box::new(future.map_err(|e| track!(Error::from(e))))
} else {
let e = ErrorKind::InvalidInput.cause(format!("Too large segment number: {}", segment));
Box::new(futures::failed(e.into()))
}
}
pub fn delete_by_range(
&self,
segment: usize,
targets: Range<ObjectVersion>,
) -> BoxFuture<Vec<ObjectSummary>> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
if segment < bucket.segments().len() {
let segment = &bucket.segments()[segment];
let future = segment.delete_by_range(targets, self.deadline, self.parent.clone());
Box::new(future.map_err(|e| track!(Error::from(e))))
} else {
let e = ErrorKind::InvalidInput.cause(format!("Too large segment number: {}", segment));
Box::new(futures::failed(e.into()))
}
}
pub fn delete_by_prefix(
&self,
prefix: ObjectPrefix,
) -> BoxFuture<DeleteObjectsByPrefixSummary> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
let mut futures = Vec::new();
for segment in bucket.segments() {
futures.push(
segment
.delete_by_prefix(prefix.clone(), self.deadline, self.parent.clone())
.map_err(|e| track!(Error::from(e))),
);
}
Box::new(futures::future::join_all(futures).map(|summaries| {
let total = summaries.iter().map(|summary| summary.total).sum();
DeleteObjectsByPrefixSummary { total }
}))
}
#[allow(clippy::type_complexity)]
pub fn delete_fragment(
&self,
object_id: ObjectId,
index: usize,
) -> BoxFuture<Option<(ObjectVersion, Option<(bool, DeviceId, LumpId)>)>> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
let segment = bucket.get_segment(&object_id);
let future = segment.delete_fragment(object_id, self.deadline, self.parent.clone(), index);
Box::new(future.map_err(|e| track!(Error::from(e))))
}
pub fn list(&self, segment: usize) -> BoxFuture<Vec<ObjectSummary>> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
if segment < bucket.segments().len() {
let future = bucket.segments()[segment].list();
Box::new(future.map_err(|e| track!(Error::from(e))))
} else {
let e = ErrorKind::InvalidInput.cause(format!("Too large segment number: {}", segment));
Box::new(futures::failed(e.into()))
}
}
pub fn list_by_prefix(&self, prefix: ObjectPrefix) -> BoxFuture<Vec<ObjectSummary>> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
let mut futures = Vec::new();
for segment in bucket.segments() {
futures.push(
segment
.list_by_prefix(prefix.clone(), self.deadline, self.parent.clone())
.map_err(|e| track!(Error::from(e))),
);
}
Box::new(
futures::future::join_all(futures)
.map(|summaries| summaries.into_iter().flatten().collect()),
)
}
pub fn latest(&self, segment: usize) -> BoxFuture<Option<ObjectSummary>> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
if segment < bucket.segments().len() {
let future = bucket.segments()[segment].latest();
Box::new(future.map_err(|e| track!(Error::from(e))))
} else {
let e = ErrorKind::InvalidInput.cause(format!("Too large segment number: {}", segment));
Box::new(futures::failed(e.into()))
}
}
pub fn object_count(&self, segment: usize) -> BoxFuture<u64> {
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
if segment < bucket.segments().len() {
let future = bucket.segments()[segment].object_count();
Box::new(future.map_err(|e| track!(Error::from(e))))
} else {
let e = ErrorKind::InvalidInput.cause(format!("Too large segment number: {}", segment));
Box::new(futures::failed(e.into()))
}
}
pub fn segment_stats(&self, segment: u16) -> BoxFuture<SegmentStatistics> {
let segment = segment as usize;
let buckets = self.client.buckets.load();
let bucket = try_get_bucket!(buckets, self.bucket_id);
if segment < bucket.segments().len() {
let future = bucket.segments()[segment].stats(self.parent.clone());
Box::new(future.map_err(|e| track!(Error::from(e))))
} else {
let e = ErrorKind::InvalidInput.cause(format!("Too large segment number: {}", segment));
Box::new(futures::failed(e.into()))
}
}
}