use {
crate::{
ContentHash,
database::{
DynPartition, HasPartition, Partition, Partitions, RecordRef,
partitions::SortKeyOf,
query::{
PartitionQueryBuilder, QueryClient, SortKeyCondition,
TypedPartitionQueryBuilder,
},
query_results::QueryResults,
},
prelude::PartitionKey,
},
std::{future::Future, marker::PhantomData, sync::Arc},
};
/// Read operations common to the query clients defined in this module.
///
/// Both `EventQueryClient` and `WatcherQueryClient` implement this trait via
/// `impl_query_reads!`, which forwards every method to the client's inherent
/// method of the same name. Generic code can therefore read through either
/// client. None of these methods touches `WatcherQueryClient`'s subscription
/// list; only `WatcherQueryClient::query(..).execute()` does.
pub trait QueryReads {
/// The `Partitions` implementation this client reads from.
type Storage: Partitions;
/// Starts a typed query against `partition` and returns its builder.
///
/// The builder borrows the client mutably for `'a`.
fn query_once<'a, Part>(
&'a mut self,
partition: Part,
) -> TypedPartitionQueryBuilder<'a, Self::Storage, Part>
where
Part: Partition + PartitionKey + SortKeyOf<Self::Storage>;
/// Reads from the partition named by `partition_key` using `sort_key`.
///
/// The returned future is `Send` and resolves to the `QueryResults`.
/// NOTE(review): presumably an exact sort-key match — confirm against
/// `QueryClient::get_record`.
fn get_record(
&mut self,
partition_key: crate::SpannedIdent,
sort_key: <Self::Storage as Partitions>::SortKey,
) -> impl Future<Output = QueryResults<Self::Storage>> + Send;
/// Looks up the index entry of partition `Part` for `sort_key`, if any.
fn index_get<Part>(
&self,
sort_key: &Part::SortKey,
) -> Option<Part::IndexEntry>
where
Part: Partition + PartitionKey,
<Self::Storage as Partitions>::Stores: HasPartition<Part>;
/// Returns `(sort key, index entry)` pairs of `Part` selected by `prefix`.
///
/// NOTE(review): presumably the entries whose sort key starts with
/// `prefix` — confirm against the engine's `index_range`.
fn index_range<Part>(
&self,
prefix: &Part::SortKey,
) -> Vec<(Part::SortKey, Part::IndexEntry)>
where
Part: Partition + PartitionKey,
<Self::Storage as Partitions>::Stores: HasPartition<Part>;
/// Returns `(sort key, index entry)` pairs of `Part` without any filter
/// argument (presumably every entry in the partition's index).
fn index_entries<Part>(&self) -> Vec<(Part::SortKey, Part::IndexEntry)>
where
Part: Partition + PartitionKey,
<Self::Storage as Partitions>::Stores: HasPartition<Part>;
/// Looks up a record of partition `Part` by its content hash.
fn get<Part>(&self, hash: ContentHash) -> Option<RecordRef<'_, Part>>
where
Part: Partition + 'static,
<Self::Storage as Partitions>::Stores: HasPartition<Part>;
/// Looks up a record by content hash, returning the storage-wide record
/// reference type rather than a partition-typed one.
///
/// NOTE(review): `D` presumably selects the partition to search — confirm.
fn get_by_hash<D: PartitionKey>(
&self,
hash: ContentHash,
) -> Option<<Self::Storage as Partitions>::RecordRef<'_>>;
/// Looks up a record through the span index using a source location
/// (`uri` plus `byte_offset`).
///
/// NOTE(review): whether `byte_offset` must fall inside a span or match its
/// start is not visible from this file — confirm against the engine.
fn span_index_get<D: PartitionKey>(
&self,
uri: &crate::Uri,
byte_offset: u64,
) -> Option<<Self::Storage as Partitions>::RecordRef<'_>>;
/// Same location-based lookup as `span_index_get`, but yields a
/// `ContentHash` instead of a record reference (presumably the hash of the
/// record that `span_index_get` would return).
fn span_index_hash<D: PartitionKey>(
&self,
uri: &crate::Uri,
byte_offset: u64,
) -> Option<ContentHash>;
}
// Implements `QueryReads` for the client type `$client<P>` by forwarding each
// trait method to the inherent method of the same name (those inherent
// methods are generated by `impl_shared_reads!`).
//
// The bodies look self-recursive but are not: method-call syntax prefers an
// inherent method over a trait method with the same name and receiver, so
// e.g. `self.get::<Part>(hash)` resolves to `$client::get`, not to
// `QueryReads::get`. Removing or renaming one of the inherent methods would
// silently turn the matching forwarder into unbounded recursion, so the two
// macros must be kept in sync.
macro_rules! impl_query_reads {
($client:ident) => {
impl<P: Partitions> QueryReads for $client<P> {
type Storage = P;
fn query_once<'a, Part>(
&'a mut self,
partition: Part,
) -> TypedPartitionQueryBuilder<'a, P, Part>
where
Part: Partition + PartitionKey + SortKeyOf<P>,
{
self.query_once(partition)
}
// The inherent `get_record` is an `async fn`; its future is returned
// as-is, so it must be `Send` to satisfy the trait signature.
fn get_record(
&mut self,
partition_key: crate::SpannedIdent,
sort_key: P::SortKey,
) -> impl Future<Output = QueryResults<P>> + Send {
self.get_record(partition_key, sort_key)
}
fn index_get<Part>(
&self,
sort_key: &Part::SortKey,
) -> Option<Part::IndexEntry>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
self.index_get::<Part>(sort_key)
}
fn index_range<Part>(
&self,
prefix: &Part::SortKey,
) -> Vec<(Part::SortKey, Part::IndexEntry)>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
self.index_range::<Part>(prefix)
}
fn index_entries<Part>(&self) -> Vec<(Part::SortKey, Part::IndexEntry)>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
self.index_entries::<Part>()
}
fn get<Part>(&self, hash: ContentHash) -> Option<RecordRef<'_, Part>>
where
Part: Partition + 'static,
P::Stores: HasPartition<Part>,
{
self.get::<Part>(hash)
}
fn get_by_hash<D: PartitionKey>(
&self,
hash: ContentHash,
) -> Option<P::RecordRef<'_>> {
self.get_by_hash::<D>(hash)
}
fn span_index_get<D: PartitionKey>(
&self,
uri: &crate::Uri,
byte_offset: u64,
) -> Option<P::RecordRef<'_>> {
self.span_index_get::<D>(uri, byte_offset)
}
fn span_index_hash<D: PartitionKey>(
&self,
uri: &crate::Uri,
byte_offset: u64,
) -> Option<ContentHash> {
self.span_index_hash::<D>(uri, byte_offset)
}
}
};
}
// Both client types expose the same read surface through `QueryReads`.
impl_query_reads!(EventQueryClient);
impl_query_reads!(WatcherQueryClient);
/// Hands over the subscriptions a query client has accumulated.
///
/// In this module `WatcherQueryClient` drains the list it recorded while
/// executing `query(..)` builders, and `EventQueryClient` always reports an
/// empty list. Implementors must be `Send`.
pub trait QueryClientSubscriptions<P: Partitions>: Send {
/// Returns the pending `(partition key, sort-key condition)` pairs.
///
/// Takes `&mut self` so that an implementation can empty its own list while
/// handing the entries to the caller.
fn take_subscriptions(
&mut self,
) -> Vec<(crate::SpannedIdent, SortKeyCondition<P::SortKey>)>;
}
// Generates the inherent read API shared by every client type `$client<P>`
// that owns an `engine: QueryClient<P>` field. Every method is a direct
// delegation to the engine; none of them records a subscription.
//
// `impl_query_reads!` forwards the `QueryReads` trait methods to the methods
// generated here by name, so names and signatures must stay aligned.
macro_rules! impl_shared_reads {
($client:ident) => {
impl<P: Partitions> $client<P> {
/// Starts a typed query on `partition` by handing it to the engine's
/// `query`, and returns the resulting builder.
pub fn query_once<'a, Part>(
&'a mut self,
partition: Part,
) -> TypedPartitionQueryBuilder<'a, P, Part>
where
Part: Partition + PartitionKey + SortKeyOf<P>,
{
self.engine.query(partition)
}
/// Reads partition `Part` with no sort-key bound.
///
/// `_partition` only selects the type; the read uses the partition key
/// constant and passes `None` as the sort key.
pub async fn scan<Part>(&mut self, _partition: Part) -> QueryResults<P>
where
Part: Partition + PartitionKey + SortKeyOf<P>,
{
self
.engine
.get_record_internal(<Part as PartitionKey>::KEY, None)
.await
}
/// Starts a query on a `DynPartition` via the engine's `query_partition`.
pub fn query_partition<'a, Part: DynPartition + PartitionKey>(
&'a mut self,
partition: Part,
) -> PartitionQueryBuilder<'a, P, Part> {
self.engine.query_partition(partition)
}
/// Reads from the partition named by `partition_key` using `sort_key`.
pub async fn get_record(
&mut self,
partition_key: crate::SpannedIdent,
sort_key: P::SortKey,
) -> QueryResults<P> {
self.engine.get_record(partition_key, sort_key).await
}
/// Variant of `get_record` for a `DynPartition`, keyed by its
/// `DynSortKey`.
pub async fn get_record_dyn<D: DynPartition + PartitionKey>(
&mut self,
partition: D,
key: D::DynSortKey,
) -> QueryResults<P> {
self.engine.get_record_dyn(partition, key).await
}
/// Reads several `(partition key, sort key)` items in one engine call and
/// returns them as a single `QueryResults`.
pub async fn batch_get_items(
&mut self,
items: Vec<(crate::SpannedIdent, P::SortKey)>,
) -> QueryResults<P> {
self.engine.batch_get_items(items).await
}
/// Looks up the index entry of `Part` for `sort_key`, if any.
pub fn index_get<Part>(
&self,
sort_key: &Part::SortKey,
) -> Option<Part::IndexEntry>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
self.engine.index_get::<Part>(sort_key)
}
/// Returns the index entries of `Part` selected by `prefix`
/// (presumably those whose sort key starts with it — confirm in the engine).
pub fn index_range<Part>(
&self,
prefix: &Part::SortKey,
) -> Vec<(Part::SortKey, Part::IndexEntry)>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
self.engine.index_range::<Part>(prefix)
}
/// Returns the index entries of `Part` that the engine's
/// `index_less_than` selects for `value`; `inclusive` is forwarded
/// unchanged (presumably choosing `<=` over `<` — confirm in the engine).
pub fn index_less_than<Part>(
&self,
value: &Part::SortKey,
inclusive: bool,
) -> Vec<(Part::SortKey, Part::IndexEntry)>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
self.engine.index_less_than::<Part>(value, inclusive)
}
/// Returns the index entries of `Part` that the engine's
/// `index_greater_than` selects for `value`; `inclusive` is forwarded
/// unchanged (presumably choosing `>=` over `>` — confirm in the engine).
pub fn index_greater_than<Part>(
&self,
value: &Part::SortKey,
inclusive: bool,
) -> Vec<(Part::SortKey, Part::IndexEntry)>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
self.engine.index_greater_than::<Part>(value, inclusive)
}
/// Returns the index entries of `Part` between `from` and `to`.
///
/// NOTE(review): bound inclusivity is decided by the engine and is not
/// visible here.
pub fn index_between<Part>(
&self,
from: &Part::SortKey,
to: &Part::SortKey,
) -> Vec<(Part::SortKey, Part::IndexEntry)>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
self.engine.index_between::<Part>(from, to)
}
/// Returns the index entries of `Part` without any filter argument.
pub fn index_entries<Part>(
&self,
) -> Vec<(Part::SortKey, Part::IndexEntry)>
where
Part: Partition + PartitionKey,
P::Stores: HasPartition<Part>,
{
self.engine.index_entries::<Part>()
}
/// Looks up a record of partition `Part` by its content hash.
pub fn get<Part>(&self, hash: ContentHash) -> Option<RecordRef<'_, Part>>
where
Part: Partition + 'static,
P::Stores: HasPartition<Part>,
{
self.engine.get::<Part>(hash)
}
/// Looks up a record by content hash, returning the storage-wide
/// `P::RecordRef`.
pub fn get_by_hash<D: PartitionKey>(
&self,
hash: ContentHash,
) -> Option<P::RecordRef<'_>> {
self.engine.get_by_hash::<D>(hash)
}
/// Looks up a record through the span index by `uri` and `byte_offset`.
pub fn span_index_get<D: PartitionKey>(
&self,
uri: &crate::Uri,
byte_offset: u64,
) -> Option<P::RecordRef<'_>> {
self.engine.span_index_get::<D>(uri, byte_offset)
}
/// Span-index lookup by `uri` and `byte_offset` that yields a
/// `ContentHash` rather than a record reference.
pub fn span_index_hash<D: PartitionKey>(
&self,
uri: &crate::Uri,
byte_offset: u64,
) -> Option<ContentHash> {
self.engine.span_index_hash::<D>(uri, byte_offset)
}
}
};
}
/// Query client that only reads.
///
/// It wraps a `QueryClient` engine and keeps no subscription state: its
/// `QueryClientSubscriptions::take_subscriptions` implementation always
/// returns an empty list.
pub struct EventQueryClient<P: Partitions> {
// Engine that every read is delegated to.
engine: QueryClient<P>,
}
impl<P: Partitions> EventQueryClient<P> {
    /// Wraps `engine` in a client that never records subscriptions.
    pub(crate) fn new(engine: QueryClient<P>) -> Self {
        Self { engine }
    }

    /// Gives crate-internal callers mutable access to the wrapped engine.
    pub(crate) fn engine_mut(&mut self) -> &mut QueryClient<P> {
        let Self { engine } = self;
        engine
    }
}
// Inherent read API (`query_once`, `scan`, `get_record`, index lookups, ...).
impl_shared_reads!(EventQueryClient);
/// Query client that can record what it has read as subscriptions.
///
/// Reads issued through `query(..)` append a `(partition key, sort-key
/// condition)` pair to `subscriptions` when the builder is executed;
/// `take_subscriptions` drains that list. The shared read methods
/// (`query_once`, `scan`, ...) do not record anything.
pub struct WatcherQueryClient<P: Partitions> {
// Engine that every read is delegated to.
engine: QueryClient<P>,
// Subscriptions recorded since the last `take_subscriptions` call.
subscriptions: Vec<(crate::SpannedIdent, SortKeyCondition<P::SortKey>)>,
}
impl<P: Partitions> WatcherQueryClient<P> {
    /// Wraps `engine` in a client that starts with no recorded subscriptions.
    pub(crate) fn new(engine: QueryClient<P>) -> Self {
        let subscriptions = Vec::new();
        Self {
            engine,
            subscriptions,
        }
    }

    /// Starts a subscribing query on partition `Part`.
    ///
    /// `_partition` only selects the type. The returned builder starts out
    /// unbounded (`SortKeyCondition::All`) and borrows this client mutably;
    /// nothing is read or recorded until the builder's `execute()` runs.
    pub fn query<Part>(
        &mut self,
        _partition: Part,
    ) -> SubscribingQueryBuilder<'_, P, Part>
    where
        Part: Partition + PartitionKey + SortKeyOf<P>,
    {
        let unbounded = SortKeyCondition::All;
        SubscribingQueryBuilder {
            client: self,
            condition: unbounded,
            _part: PhantomData,
        }
    }
}
// Inherent read API (`query_once`, `scan`, `get_record`, index lookups, ...).
impl_shared_reads!(WatcherQueryClient);
impl<P: Partitions> QueryClientSubscriptions<P> for WatcherQueryClient<P> {
    /// Returns every subscription recorded so far and leaves the client's
    /// list empty, so a later call only reports subscriptions made since.
    fn take_subscriptions(
        &mut self,
    ) -> Vec<(crate::SpannedIdent, SortKeyCondition<P::SortKey>)> {
        let recorded = &mut self.subscriptions;
        std::mem::take(recorded)
    }
}
impl<P: Partitions> QueryClientSubscriptions<P> for EventQueryClient<P> {
    /// Always returns an empty list: this client has no subscription state.
    fn take_subscriptions(
        &mut self,
    ) -> Vec<(crate::SpannedIdent, SortKeyCondition<P::SortKey>)> {
        vec![]
    }
}
/// Builder for a read on partition `Part` that also records a subscription.
///
/// Created by `WatcherQueryClient::query`. The `sort_key*` methods replace
/// the sort-key bound; `execute()` records `(partition key, condition)` on
/// the owning client and then performs the read. The builder holds the
/// client's mutable borrow for `'a`.
///
/// Marked `#[must_use]` because both the read and the subscription happen
/// only inside `execute()`: a builder that is dropped has no effect at all.
#[must_use = "a subscribing query reads nothing and records no subscription \
              until `.execute()` is awaited"]
pub struct SubscribingQueryBuilder<'a, P: Partitions, Part> {
    /// Client whose engine runs the read and whose list receives the
    /// subscription.
    client: &'a mut WatcherQueryClient<P>,
    /// Current sort-key bound; starts as `SortKeyCondition::All`.
    condition: SortKeyCondition<P::SortKey>,
    /// Ties the builder to the partition type without storing a value.
    _part: PhantomData<Part>,
}
impl<'a, P: Partitions, Part> SubscribingQueryBuilder<'a, P, Part>
where
Part: Partition + PartitionKey + SortKeyOf<P>,
{
pub fn sort_key(mut self, key: Part::SortKey) -> Self {
self.condition =
SortKeyCondition::Exact(<Part as SortKeyOf<P>>::wrap_sort_key(key));
self
}
pub fn sort_key_begins_with(mut self, prefix: Part::SortKey) -> Self {
self.condition = SortKeyCondition::BeginsWith(
<Part as SortKeyOf<P>>::wrap_sort_key(prefix),
);
self
}
pub fn sort_key_between(
mut self,
from: Part::SortKey,
to: Part::SortKey,
) -> Self {
self.condition = SortKeyCondition::Between(
<Part as SortKeyOf<P>>::wrap_sort_key(from),
<Part as SortKeyOf<P>>::wrap_sort_key(to),
);
self
}
pub fn sort_key_less_than(mut self, value: Part::SortKey) -> Self {
self.condition =
SortKeyCondition::LessThan(<Part as SortKeyOf<P>>::wrap_sort_key(value));
self
}
pub fn sort_key_less_than_or_equal(mut self, value: Part::SortKey) -> Self {
self.condition =
SortKeyCondition::LessThanOrEqual(<Part as SortKeyOf<P>>::wrap_sort_key(
value,
));
self
}
pub fn sort_key_greater_than(mut self, value: Part::SortKey) -> Self {
self.condition = SortKeyCondition::GreaterThan(
<Part as SortKeyOf<P>>::wrap_sort_key(value),
);
self
}
pub fn sort_key_greater_than_or_equal(
mut self,
value: Part::SortKey,
) -> Self {
self.condition = SortKeyCondition::GreaterThanOrEqual(
<Part as SortKeyOf<P>>::wrap_sort_key(value),
);
self
}
pub async fn execute(self) -> QueryResults<P> {
debug_assert!(
!matches!(self.condition, SortKeyCondition::All),
"WatcherQueryClient::query(...) with no sort-key bound is a whole-partition \
subscription; use scan() for a whole-partition read, or bound the query"
);
let pk = <Part as PartitionKey>::KEY;
self.client.subscriptions.push((pk, self.condition.clone()));
let engine = &mut self.client.engine;
match self.condition {
| SortKeyCondition::Exact(k) => {
engine.get_record_internal(pk, Some(k)).await
},
| SortKeyCondition::BeginsWith(p) => engine.prefix_internal(pk, p).await,
| SortKeyCondition::Between(from, to) => {
engine.between_internal(pk, from, to).await
},
| SortKeyCondition::LessThan(v) => {
engine.less_than_internal(pk, v, false).await
},
| SortKeyCondition::LessThanOrEqual(v) => {
engine.less_than_internal(pk, v, true).await
},
| SortKeyCondition::GreaterThan(v) => {
engine.greater_than_internal(pk, v, false).await
},
| SortKeyCondition::GreaterThanOrEqual(v) => {
engine.greater_than_internal(pk, v, true).await
},
| SortKeyCondition::All => engine.get_record_internal(pk, None).await,
| SortKeyCondition::Never => {
QueryResults::new(Vec::new(), Arc::clone(&engine.db.cas))
},
}
}
}
#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::{
            Ident,
            database::{
                Database,
                chunk::RecordWriter,
                tests::storage::{Test2Partition, TestPartitions, TestRecordData},
            },
            record::LaburnumRecord,
        },
        macro_rules_attribute::apply,
        smol_macros::test,
    };

    // Builds a `WorkspaceConfig` test record carrying `value`.
    fn make_record(value: &str) -> TestRecordData {
        TestRecordData::Laburnum(LaburnumRecord::WorkspaceConfig {
            value: value.to_string(),
        })
    }

    // Commits one `Test2Partition` record under `sort_key` to `db`.
    fn commit(db: &Database<TestPartitions>, sort_key: &str, value: &str) {
        let sc =
            crate::source::cache::reporter::SourceCacheReader::new_empty_for_test();
        let mut writer = RecordWriter::new(Ident::new("test-writer"));
        writer.insert::<Test2Partition>(sort_key.to_string(), make_record(value));
        db.commit_chunk(writer.build(), &sc);
    }

    // Wraps `db` in a fresh watcher client.
    fn watcher(
        db: Database<TestPartitions>,
    ) -> WatcherQueryClient<TestPartitions> {
        WatcherQueryClient::new(QueryClient::new(db))
    }

    // A subscribing query records its condition even when nothing matches.
    #[apply(test!)]
    async fn query_records_subscription_even_when_empty() {
        let db = Database::<TestPartitions>::new();
        let mut wc = watcher(db);
        let results = wc
            .query(Test2Partition)
            .sort_key_begins_with("dep:".to_string())
            .execute()
            .await;
        assert_eq!(results.len(), 0, "empty db yields no rows");
        let subs = wc.take_subscriptions();
        assert_eq!(subs.len(), 1, "an empty query still subscribes");
        assert_eq!(subs[0].0, <Test2Partition as PartitionKey>::KEY);
        let expected = SortKeyCondition::BeginsWith(
            <Test2Partition as SortKeyOf<TestPartitions>>::wrap_sort_key(
                "dep:".to_string(),
            ),
        );
        assert!(
            subs[0].1 == expected,
            "subscription carries the queried condition"
        );
    }

    // A subscribing query returns matching rows and records one subscription,
    // which `take_subscriptions` then removes from the client.
    #[apply(test!)]
    async fn query_returns_rows_and_subscribes() {
        let db = Database::<TestPartitions>::new();
        commit(&db, "dep:a", "v");
        let mut wc = watcher(db);
        let results = wc
            .query(Test2Partition)
            .sort_key_begins_with("dep:".to_string())
            .execute()
            .await;
        assert_eq!(results.len(), 1);
        assert_eq!(wc.take_subscriptions().len(), 1);
        assert!(
            wc.take_subscriptions().is_empty(),
            "take_subscriptions drains the recorded list"
        );
    }

    // The shared read methods on a watcher never record a subscription.
    #[apply(test!)]
    async fn query_once_and_scan_do_not_subscribe() {
        let db = Database::<TestPartitions>::new();
        commit(&db, "dep:a", "v");
        let mut wc = watcher(db);
        let _ = wc
            .query_once(Test2Partition)
            .sort_key_begins_with("dep:".to_string())
            .execute()
            .await;
        let scanned = wc.scan(Test2Partition).await;
        assert_eq!(scanned.len(), 1, "scan reads the whole partition");
        assert!(
            wc.take_subscriptions().is_empty(),
            "query_once and scan never subscribe"
        );
    }

    // An event client serves the same reads and reports no subscriptions.
    #[apply(test!)]
    async fn event_client_reads_without_subscribing() {
        let db = Database::<TestPartitions>::new();
        commit(&db, "dep:a", "v");
        let mut ec = EventQueryClient::new(QueryClient::new(db));
        let results = ec
            .query_once(Test2Partition)
            .sort_key_begins_with("dep:".to_string())
            .execute()
            .await;
        assert_eq!(results.len(), 1);
        let scanned = ec.scan(Test2Partition).await;
        assert_eq!(scanned.len(), 1);
        // Without this check the test never verified the "without
        // subscribing" half of its name.
        assert!(
            ec.take_subscriptions().is_empty(),
            "an event client never reports subscriptions"
        );
    }
}