use {
crate::{
ContentHash, Ident,
database::{
Database, DynPartition, GenerationEpoch, HasPartition, Partition, Partitions, RecordRef,
query::{PartitionQueryBuilder, QueryBuilder, TypedPartitionQueryBuilder},
query_results::{QueryResults, RecordMetadata},
},
prelude::PartitionKey,
},
std::sync::Arc,
};
#[derive(Clone)]
pub struct QueryClient<P: Partitions> {
pub(crate) db: Database<P>,
snapshot_epoch: GenerationEpoch,
}
impl<P: Partitions> QueryClient<P> {
pub fn new(db: Database<P>) -> Self {
let snapshot_epoch = db.get_current_epoch();
Self {
db,
snapshot_epoch,
}
}
pub(crate) fn query_any<'a>(
&'a mut self,
partition_key: Ident,
) -> QueryBuilder<'a, P> {
QueryBuilder::new(self, partition_key)
}
pub fn query_partition<'a, Part: DynPartition + PartitionKey>(
&'a mut self,
_partition: Part,
) -> PartitionQueryBuilder<'a, P, Part> {
PartitionQueryBuilder::new(self)
}
pub fn query<'a, Part: Partition + PartitionKey>(
&'a mut self,
_partition: Part,
) -> TypedPartitionQueryBuilder<'a, P, Part> {
TypedPartitionQueryBuilder::new(self)
}
pub async fn get_record(
&mut self,
partition_key: Ident,
sort_key: String,
) -> QueryResults<P> {
self
.get_record_internal(partition_key, Some(sort_key))
.await
}
pub fn span_index_get<D: PartitionKey>(
&self,
uri: &crate::Uri,
byte_offset: u64,
) -> Option<P::RecordRef<'_>> {
let hash = self.db.span_index_query(D::KEY, uri, byte_offset)?;
self.db.cas.get_any(D::KEY, hash)
}
pub fn get_by_hash<D: PartitionKey>(
&self,
hash: ContentHash,
) -> Option<P::RecordRef<'_>> {
self.db.cas.get_any(D::KEY, hash)
}
pub async fn batch_get_items<T>(
&mut self,
items: Vec<(Ident, T)>,
) -> QueryResults<P>
where
T: Into<String> + Clone,
{
let mut all_records = Vec::new();
for (partition_key, sort_key) in items {
let results = self
.get_record_internal(partition_key, Some(sort_key))
.await;
all_records.extend(results.records);
}
all_records.sort_by(|a, b| a.sort_key.cmp(&b.sort_key));
QueryResults::new(all_records, Arc::clone(&self.db.cas))
}
pub(crate) async fn get_record_internal<T>(
&mut self,
partition_key: Ident,
sort_key: Option<T>,
) -> QueryResults<P>
where
T: Into<String> + Clone,
{
match sort_key {
| Some(sk) => {
let sk_string = sk.into();
match self.db.index_get(partition_key, &sk_string) {
| Some(content_hash) => {
let records = vec![RecordMetadata {
partition_key,
sort_key: sk_string,
content_hash,
}];
QueryResults::new(records, Arc::clone(&self.db.cas))
},
| None => {
QueryResults::new(Vec::new(), Arc::clone(&self.db.cas))
},
}
},
| None => {
let entries = self.db.index_range(partition_key, "");
if entries.is_empty() {
return QueryResults::new(Vec::new(), Arc::clone(&self.db.cas));
}
let records: Vec<RecordMetadata> = entries
.into_iter()
.map(|(_, sort_key, content_hash)| RecordMetadata {
partition_key,
sort_key,
content_hash,
})
.collect();
QueryResults::new(records, Arc::clone(&self.db.cas))
},
}
}
pub(crate) async fn less_than_internal(
&mut self,
partition_key: Ident,
value: String,
inclusive: bool,
) -> QueryResults<P> {
let entries = self.db.index_less_than(partition_key, &value, inclusive);
if entries.is_empty() {
return QueryResults::new(Vec::new(), Arc::clone(&self.db.cas));
}
let records: Vec<RecordMetadata> = entries
.into_iter()
.map(|(_, sort_key, content_hash)| RecordMetadata {
partition_key,
sort_key,
content_hash,
})
.collect();
QueryResults::new(records, Arc::clone(&self.db.cas))
}
pub(crate) async fn greater_than_internal(
&mut self,
partition_key: Ident,
value: String,
inclusive: bool,
) -> QueryResults<P> {
let entries = self.db.index_greater_than(partition_key, &value, inclusive);
if entries.is_empty() {
return QueryResults::new(Vec::new(), Arc::clone(&self.db.cas));
}
let records: Vec<RecordMetadata> = entries
.into_iter()
.map(|(_, sort_key, content_hash)| RecordMetadata {
partition_key,
sort_key,
content_hash,
})
.collect();
QueryResults::new(records, Arc::clone(&self.db.cas))
}
pub(crate) async fn between_internal(
&mut self,
partition_key: Ident,
from: String,
to: String,
) -> QueryResults<P> {
let entries = self.db.index_between(partition_key, &from, &to);
if entries.is_empty() {
return QueryResults::new(Vec::new(), Arc::clone(&self.db.cas));
}
let records: Vec<RecordMetadata> = entries
.into_iter()
.map(|(_, sort_key, content_hash)| RecordMetadata {
partition_key,
sort_key,
content_hash,
})
.collect();
QueryResults::new(records, Arc::clone(&self.db.cas))
}
pub(crate) async fn prefix_internal(
&mut self,
partition_key: Ident,
prefix: String,
) -> QueryResults<P> {
let entries = self.db.index_range(partition_key, &prefix);
if entries.is_empty() {
return QueryResults::new(Vec::new(), Arc::clone(&self.db.cas));
}
let records: Vec<RecordMetadata> = entries
.into_iter()
.map(|(_, sort_key, content_hash)| RecordMetadata {
partition_key,
sort_key,
content_hash,
})
.collect();
QueryResults::new(records, Arc::clone(&self.db.cas))
}
pub fn index_get<Part>(&self, sort_key: &str) -> Option<Part::IndexEntry>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
<P::Stores as HasPartition<Part>>::store(self.db.cas.stores())
.index_get(sort_key)
}
pub fn index_range<Part>(&self, prefix: &str) -> Vec<(String, Part::IndexEntry)>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
<P::Stores as HasPartition<Part>>::store(self.db.cas.stores())
.index_range(prefix)
}
pub fn index_less_than<Part>(
&self,
value: &str,
inclusive: bool,
) -> Vec<(String, Part::IndexEntry)>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
<P::Stores as HasPartition<Part>>::store(self.db.cas.stores())
.index_less_than(value, inclusive)
}
pub fn index_greater_than<Part>(
&self,
value: &str,
inclusive: bool,
) -> Vec<(String, Part::IndexEntry)>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
<P::Stores as HasPartition<Part>>::store(self.db.cas.stores())
.index_greater_than(value, inclusive)
}
pub fn index_between<Part>(
&self,
from: &str,
to: &str,
) -> Vec<(String, Part::IndexEntry)>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
<P::Stores as HasPartition<Part>>::store(self.db.cas.stores())
.index_between(from, to)
}
pub fn get<Part>(&self, hash: ContentHash) -> Option<RecordRef<'_, Part>>
where
Part: Partition + 'static,
P::Stores: HasPartition<Part>,
{
self.db.cas.get::<Part>(hash)
}
pub(crate) fn refresh_snapshot(&mut self) {
self.snapshot_epoch = self.db.get_current_epoch();
}
}