mod client;
use {
crate::{
Ident,
database::{
DynPartition,
Partition,
Partitions,
query_results::QueryResults,
},
prelude::PartitionKey,
},
std::{
future::poll_fn,
marker::PhantomData,
task::Poll,
time::{
Duration,
Instant,
},
},
thiserror::Error,
};
pub use client::QueryClient;
#[derive(Debug, Error)]
pub enum QueryError {
#[error("Query wait condition timed out after {elapsed:?}")]
WaitTimeout { elapsed: Duration },
#[error("Query cancelled: {reason}")]
Cancelled { reason: String },
}
type WaitCondition<P> = Box<dyn Fn(&QueryResults<P>) -> bool + Send + Sync>;
#[derive(Clone)]
pub enum SortKeyCondition<K> {
Exact(K),
BeginsWith(K),
Between(K, K),
LessThan(K),
LessThanOrEqual(K),
GreaterThan(K),
GreaterThanOrEqual(K),
All,
Never,
}
impl<K: crate::database::partitions::PartitionSortKey> SortKeyCondition<K> {
pub(crate) fn matches(&self, sort_key: &K) -> bool {
match self {
Self::All => true,
Self::Never => false,
Self::Exact(sk) => sort_key == sk,
Self::BeginsWith(prefix) => prefix.is_prefix_of(sort_key),
Self::Between(from, to) => sort_key >= from && sort_key <= to,
Self::LessThan(v) => sort_key < v,
Self::LessThanOrEqual(v) => sort_key <= v,
Self::GreaterThan(v) => sort_key > v,
Self::GreaterThanOrEqual(v) => sort_key >= v,
}
}
}
pub struct QueryBuilder<'a, P: Partitions> {
client: &'a mut QueryClient<P>,
partition_key: Ident,
sort_key_condition: SortKeyCondition<P::SortKey>,
}
impl<'a, P: Partitions> QueryBuilder<'a, P> {
pub(crate) fn new(client: &'a mut QueryClient<P>, partition_key: Ident) -> Self {
Self {
client,
partition_key,
sort_key_condition: SortKeyCondition::All,
}
}
pub fn sort_key(mut self, sort_key: P::SortKey) -> Self {
self.sort_key_condition = SortKeyCondition::Exact(sort_key);
self
}
pub fn sort_key_begins_with(mut self, prefix: P::SortKey) -> Self {
self.sort_key_condition = SortKeyCondition::BeginsWith(prefix);
self
}
pub fn sort_key_between(mut self, from: P::SortKey, to: P::SortKey) -> Self {
self.sort_key_condition = SortKeyCondition::Between(from, to);
self
}
pub fn sort_key_less_than(mut self, value: P::SortKey) -> Self {
self.sort_key_condition = SortKeyCondition::LessThan(value);
self
}
pub fn sort_key_less_than_or_equal(mut self, value: P::SortKey) -> Self {
self.sort_key_condition = SortKeyCondition::LessThanOrEqual(value);
self
}
pub fn sort_key_greater_than(mut self, value: P::SortKey) -> Self {
self.sort_key_condition = SortKeyCondition::GreaterThan(value);
self
}
pub fn sort_key_greater_than_or_equal(mut self, value: P::SortKey) -> Self {
self.sort_key_condition = SortKeyCondition::GreaterThanOrEqual(value);
self
}
pub fn never(mut self) -> Self {
self.sort_key_condition = SortKeyCondition::Never;
self
}
pub fn wait_until<F>(self, condition: F) -> WaitingQueryBuilder<'a, P>
where
F: Fn(&QueryResults<P>) -> bool + Send + Sync + 'static,
{
WaitingQueryBuilder {
client: self.client,
partition_key: self.partition_key,
sort_key_condition: self.sort_key_condition,
wait_condition: Box::new(condition),
timeout: None,
}
}
pub async fn execute(self) -> QueryResults<P> {
match self.sort_key_condition {
SortKeyCondition::Exact(sort_key) => {
self
.client
.get_record_internal(self.partition_key, Some(sort_key))
.await
}
SortKeyCondition::BeginsWith(prefix) => {
self
.client
.prefix_internal(self.partition_key, prefix)
.await
}
SortKeyCondition::Between(from, to) => {
self
.client
.between_internal(self.partition_key, from, to)
.await
}
SortKeyCondition::LessThan(value) => {
self
.client
.less_than_internal(self.partition_key, value, false)
.await
}
SortKeyCondition::LessThanOrEqual(value) => {
self
.client
.less_than_internal(self.partition_key, value, true)
.await
}
SortKeyCondition::GreaterThan(value) => {
self
.client
.greater_than_internal(self.partition_key, value, false)
.await
}
SortKeyCondition::GreaterThanOrEqual(value) => {
self
.client
.greater_than_internal(self.partition_key, value, true)
.await
}
SortKeyCondition::All => {
self
.client
.get_record_internal(self.partition_key, None)
.await
}
SortKeyCondition::Never => {
QueryResults::new(Vec::new(), std::sync::Arc::clone(&self.client.db.cas))
}
}
}
}
pub struct WaitingQueryBuilder<'a, P: Partitions> {
client: &'a mut QueryClient<P>,
partition_key: Ident,
sort_key_condition: SortKeyCondition<P::SortKey>,
wait_condition: WaitCondition<P>,
timeout: Option<Duration>,
}
impl<'a, P: Partitions> WaitingQueryBuilder<'a, P> {
pub fn timeout(mut self, duration: Duration) -> Self {
self.timeout = Some(duration);
self
}
pub async fn execute(self) -> Result<QueryResults<P>, QueryError> {
let start = Instant::now();
let timeout = self.timeout.unwrap_or(Duration::from_secs(30));
loop {
let elapsed = start.elapsed();
if elapsed >= timeout {
return Err(QueryError::WaitTimeout { elapsed });
}
self.client.refresh_snapshot();
let results = match &self.sort_key_condition {
SortKeyCondition::Exact(sort_key) => {
self
.client
.get_record_internal(self.partition_key, Some(sort_key.clone()))
.await
}
SortKeyCondition::BeginsWith(prefix) => {
self
.client
.prefix_internal(self.partition_key, prefix.clone())
.await
}
SortKeyCondition::Between(from, to) => {
self
.client
.between_internal(self.partition_key, from.clone(), to.clone())
.await
}
SortKeyCondition::LessThan(value) => {
self
.client
.less_than_internal(self.partition_key, value.clone(), false)
.await
}
SortKeyCondition::LessThanOrEqual(value) => {
self
.client
.less_than_internal(self.partition_key, value.clone(), true)
.await
}
SortKeyCondition::GreaterThan(value) => {
self
.client
.greater_than_internal(self.partition_key, value.clone(), false)
.await
}
SortKeyCondition::GreaterThanOrEqual(value) => {
self
.client
.greater_than_internal(self.partition_key, value.clone(), true)
.await
}
SortKeyCondition::All => {
self
.client
.get_record_internal(self.partition_key, None)
.await
}
SortKeyCondition::Never => {
QueryResults::new(Vec::new(), std::sync::Arc::clone(&self.client.db.cas))
}
};
let condition_met = (self.wait_condition)(&results);
if condition_met {
return Ok(results);
}
poll_fn(|cx| {
self.client.db.register_partition_waker(
self.partition_key,
self.sort_key_condition.clone(),
cx.waker().clone(),
);
Poll::<()>::Pending
})
.await;
}
}
}
pub struct PartitionQueryBuilder<
'a,
P: Partitions,
Part: DynPartition + PartitionKey,
> {
inner: QueryBuilder<'a, P>,
_partition: PhantomData<Part>,
}
impl<'a, P: Partitions, Part: DynPartition + PartitionKey>
PartitionQueryBuilder<'a, P, Part>
{
pub(crate) fn new(client: &'a mut QueryClient<P>) -> Self {
Self {
inner: client.query_any(Part::KEY),
_partition: PhantomData,
}
}
fn wrap(key: Part::DynSortKey) -> Option<P::SortKey> {
P::wrap_dyn_sort_key(<Part as PartitionKey>::KEY, Box::new(key))
}
pub fn sort_key(mut self, key: Part::DynSortKey) -> Self {
self.inner = match Self::wrap(key) {
Some(k) => self.inner.sort_key(k),
None => self.inner.never(),
};
self
}
pub fn sort_key_begins_with(mut self, prefix: Part::DynSortKey) -> Self {
self.inner = match Self::wrap(prefix) {
Some(k) => self.inner.sort_key_begins_with(k),
None => self.inner.never(),
};
self
}
pub fn sort_key_between(
mut self,
from: Part::DynSortKey,
to: Part::DynSortKey,
) -> Self {
self.inner = match (Self::wrap(from), Self::wrap(to)) {
(Some(f), Some(t)) => self.inner.sort_key_between(f, t),
_ => self.inner.never(),
};
self
}
pub fn sort_key_less_than(mut self, value: Part::DynSortKey) -> Self {
self.inner = match Self::wrap(value) {
Some(k) => self.inner.sort_key_less_than(k),
None => self.inner.never(),
};
self
}
pub fn sort_key_less_than_or_equal(mut self, value: Part::DynSortKey) -> Self {
self.inner = match Self::wrap(value) {
Some(k) => self.inner.sort_key_less_than_or_equal(k),
None => self.inner.never(),
};
self
}
pub fn sort_key_greater_than(mut self, value: Part::DynSortKey) -> Self {
self.inner = match Self::wrap(value) {
Some(k) => self.inner.sort_key_greater_than(k),
None => self.inner.never(),
};
self
}
pub fn sort_key_greater_than_or_equal(
mut self,
value: Part::DynSortKey,
) -> Self {
self.inner = match Self::wrap(value) {
Some(k) => self.inner.sort_key_greater_than_or_equal(k),
None => self.inner.never(),
};
self
}
pub fn wait_until<F>(
self,
condition: F,
) -> PartitionWaitingQueryBuilder<'a, P, Part>
where
F: Fn(&QueryResults<P>) -> bool + Send + Sync + 'static,
{
PartitionWaitingQueryBuilder {
inner: self.inner.wait_until(condition),
_partition: PhantomData,
}
}
pub async fn execute(self) -> QueryResults<P> {
self.inner.execute().await
}
}
pub struct PartitionWaitingQueryBuilder<
'a,
P: Partitions,
Part: DynPartition + PartitionKey,
> {
inner: WaitingQueryBuilder<'a, P>,
_partition: PhantomData<Part>,
}
impl<'a, P: Partitions, Part: DynPartition + PartitionKey>
PartitionWaitingQueryBuilder<'a, P, Part>
{
pub fn timeout(mut self, duration: Duration) -> Self {
self.inner = self.inner.timeout(duration);
self
}
pub async fn execute(self) -> Result<QueryResults<P>, QueryError> {
self.inner.execute().await
}
}
pub struct TypedPartitionQueryBuilder<
'a,
P: Partitions,
Part: Partition + PartitionKey,
> {
inner: QueryBuilder<'a, P>,
_partition: PhantomData<Part>,
}
impl<'a, P: Partitions, Part: Partition + PartitionKey + crate::database::partitions::SortKeyOf<P>>
TypedPartitionQueryBuilder<'a, P, Part>
{
pub(crate) fn new(client: &'a mut QueryClient<P>) -> Self {
Self {
inner: client.query_any(Part::KEY),
_partition: PhantomData,
}
}
pub fn sort_key(mut self, key: Part::SortKey) -> Self {
self.inner = self.inner.sort_key(Part::wrap_sort_key(key));
self
}
pub fn sort_key_begins_with(mut self, prefix: Part::SortKey) -> Self {
self.inner = self.inner.sort_key_begins_with(Part::wrap_sort_key(prefix));
self
}
pub fn sort_key_between(mut self, from: Part::SortKey, to: Part::SortKey) -> Self {
self.inner = self.inner.sort_key_between(Part::wrap_sort_key(from), Part::wrap_sort_key(to));
self
}
pub fn sort_key_less_than(mut self, value: Part::SortKey) -> Self {
self.inner = self.inner.sort_key_less_than(Part::wrap_sort_key(value));
self
}
pub fn sort_key_less_than_or_equal(mut self, value: Part::SortKey) -> Self {
self.inner = self.inner.sort_key_less_than_or_equal(Part::wrap_sort_key(value));
self
}
pub fn sort_key_greater_than(mut self, value: Part::SortKey) -> Self {
self.inner = self.inner.sort_key_greater_than(Part::wrap_sort_key(value));
self
}
pub fn sort_key_greater_than_or_equal(mut self, value: Part::SortKey) -> Self {
self.inner = self.inner.sort_key_greater_than_or_equal(Part::wrap_sort_key(value));
self
}
pub fn wait_until<F>(
self,
condition: F,
) -> TypedPartitionWaitingQueryBuilder<'a, P, Part>
where
F: Fn(&QueryResults<P>) -> bool + Send + Sync + 'static,
{
TypedPartitionWaitingQueryBuilder {
inner: self.inner.wait_until(condition),
_partition: PhantomData,
}
}
pub async fn execute(self) -> QueryResults<P> {
self.inner.execute().await
}
}
pub struct TypedPartitionWaitingQueryBuilder<
'a,
P: Partitions,
Part: Partition + PartitionKey,
> {
inner: WaitingQueryBuilder<'a, P>,
_partition: PhantomData<Part>,
}
impl<'a, P: Partitions, Part: Partition + PartitionKey>
TypedPartitionWaitingQueryBuilder<'a, P, Part>
{
pub fn timeout(mut self, duration: Duration) -> Self {
self.inner = self.inner.timeout(duration);
self
}
pub async fn execute(self) -> Result<QueryResults<P>, QueryError> {
self.inner.execute().await
}
}
#[cfg(test)]
mod tests {
use super::SortKeyCondition;
#[test]
fn test_matches_all() {
assert!(SortKeyCondition::<String>::All.matches(&"anything".to_string()));
assert!(SortKeyCondition::<String>::All.matches(&String::new()));
}
#[test]
fn test_matches_exact() {
let cond = SortKeyCondition::Exact("foo".to_string());
assert!(cond.matches(&"foo".to_string()));
assert!(!cond.matches(&"bar".to_string()));
assert!(!cond.matches(&"foobar".to_string()));
}
#[test]
fn test_matches_begins_with() {
let cond = SortKeyCondition::BeginsWith("prefix".to_string());
assert!(cond.matches(&"prefix_foo".to_string()));
assert!(cond.matches(&"prefix".to_string()));
assert!(!cond.matches(&"other".to_string()));
}
#[test]
fn test_matches_between() {
let cond = SortKeyCondition::Between("b".to_string(), "d".to_string());
assert!(cond.matches(&"b".to_string()));
assert!(cond.matches(&"c".to_string()));
assert!(cond.matches(&"d".to_string()));
assert!(!cond.matches(&"a".to_string()));
assert!(!cond.matches(&"e".to_string()));
}
#[test]
fn test_matches_less_than() {
let cond = SortKeyCondition::LessThan("c".to_string());
assert!(cond.matches(&"a".to_string()));
assert!(cond.matches(&"b".to_string()));
assert!(!cond.matches(&"c".to_string()));
assert!(!cond.matches(&"d".to_string()));
}
#[test]
fn test_matches_less_than_or_equal() {
let cond = SortKeyCondition::LessThanOrEqual("c".to_string());
assert!(cond.matches(&"a".to_string()));
assert!(cond.matches(&"c".to_string()));
assert!(!cond.matches(&"d".to_string()));
}
#[test]
fn test_matches_greater_than() {
let cond = SortKeyCondition::GreaterThan("b".to_string());
assert!(!cond.matches(&"a".to_string()));
assert!(!cond.matches(&"b".to_string()));
assert!(cond.matches(&"c".to_string()));
}
#[test]
fn test_matches_greater_than_or_equal() {
let cond = SortKeyCondition::GreaterThanOrEqual("b".to_string());
assert!(!cond.matches(&"a".to_string()));
assert!(cond.matches(&"b".to_string()));
assert!(cond.matches(&"c".to_string()));
}
}