mod client;
use {
crate::{
Ident,
database::{
DynPartition,
Partition,
Partitions,
query_results::QueryResults,
},
prelude::PartitionKey,
},
std::{
future::poll_fn,
marker::PhantomData,
task::Poll,
time::{
Duration,
Instant,
},
},
thiserror::Error,
};
pub use client::QueryClient;
#[derive(Debug, Error)]
pub enum QueryError {
#[error("Query wait condition timed out after {elapsed:?}")]
WaitTimeout { elapsed: Duration },
#[error("Query cancelled: {reason}")]
Cancelled { reason: String },
}
type WaitCondition<P> = Box<dyn Fn(&QueryResults<P>) -> bool + Send + Sync>;
#[derive(Clone)]
pub enum SortKeyCondition {
Exact(String),
BeginsWith(String),
Between(String, String),
LessThan(String),
LessThanOrEqual(String),
GreaterThan(String),
GreaterThanOrEqual(String),
All,
}
impl SortKeyCondition {
pub(crate) fn matches(&self, sort_key: &str) -> bool {
match self {
Self::All => true,
Self::Exact(sk) => sort_key == sk,
Self::BeginsWith(prefix) => sort_key.starts_with(prefix.as_str()),
Self::Between(from, to) => sort_key >= from.as_str() && sort_key <= to.as_str(),
Self::LessThan(v) => sort_key < v.as_str(),
Self::LessThanOrEqual(v) => sort_key <= v.as_str(),
Self::GreaterThan(v) => sort_key > v.as_str(),
Self::GreaterThanOrEqual(v) => sort_key >= v.as_str(),
}
}
}
pub struct QueryBuilder<'a, P: Partitions> {
client: &'a mut QueryClient<P>,
partition_key: Ident,
sort_key_condition: SortKeyCondition,
}
impl<'a, P: Partitions> QueryBuilder<'a, P> {
pub(crate) fn new(client: &'a mut QueryClient<P>, partition_key: Ident) -> Self {
Self {
client,
partition_key,
sort_key_condition: SortKeyCondition::All,
}
}
pub fn sort_key<SK: std::fmt::Display>(mut self, sort_key: SK) -> Self {
self.sort_key_condition = SortKeyCondition::Exact(sort_key.to_string());
self
}
pub fn sort_key_begins_with<SK: std::fmt::Display>(
mut self,
prefix: SK,
) -> Self {
self.sort_key_condition = SortKeyCondition::BeginsWith(prefix.to_string());
self
}
pub fn sort_key_between<SK: std::fmt::Display, TOSK: std::fmt::Display>(
mut self,
from: SK,
to: TOSK,
) -> Self {
self.sort_key_condition =
SortKeyCondition::Between(from.to_string(), to.to_string());
self
}
pub fn sort_key_less_than<SK: std::fmt::Display>(
mut self,
value: SK,
) -> Self {
self.sort_key_condition = SortKeyCondition::LessThan(value.to_string());
self
}
pub fn sort_key_less_than_or_equal<SK: std::fmt::Display>(
mut self,
value: SK,
) -> Self {
self.sort_key_condition =
SortKeyCondition::LessThanOrEqual(value.to_string());
self
}
pub fn sort_key_greater_than<SK: std::fmt::Display>(
mut self,
value: SK,
) -> Self {
self.sort_key_condition = SortKeyCondition::GreaterThan(value.to_string());
self
}
pub fn sort_key_greater_than_or_equal<SK: std::fmt::Display>(
mut self,
value: SK,
) -> Self {
self.sort_key_condition =
SortKeyCondition::GreaterThanOrEqual(value.to_string());
self
}
pub fn wait_until<F>(self, condition: F) -> WaitingQueryBuilder<'a, P>
where
F: Fn(&QueryResults<P>) -> bool + Send + Sync + 'static,
{
WaitingQueryBuilder {
client: self.client,
partition_key: self.partition_key,
sort_key_condition: self.sort_key_condition,
wait_condition: Box::new(condition),
timeout: None,
}
}
pub async fn execute(self) -> QueryResults<P> {
match self.sort_key_condition {
SortKeyCondition::Exact(sort_key) => {
self
.client
.get_record_internal(self.partition_key, Some(sort_key))
.await
}
SortKeyCondition::BeginsWith(prefix) => {
self
.client
.prefix_internal(self.partition_key, prefix)
.await
}
SortKeyCondition::Between(from, to) => {
self
.client
.between_internal(self.partition_key, from, to)
.await
}
SortKeyCondition::LessThan(value) => {
self
.client
.less_than_internal(self.partition_key, value, false)
.await
}
SortKeyCondition::LessThanOrEqual(value) => {
self
.client
.less_than_internal(self.partition_key, value, true)
.await
}
SortKeyCondition::GreaterThan(value) => {
self
.client
.greater_than_internal(self.partition_key, value, false)
.await
}
SortKeyCondition::GreaterThanOrEqual(value) => {
self
.client
.greater_than_internal(self.partition_key, value, true)
.await
}
SortKeyCondition::All => {
self
.client
.get_record_internal::<String>(self.partition_key, None)
.await
}
}
}
}
pub struct WaitingQueryBuilder<'a, P: Partitions> {
client: &'a mut QueryClient<P>,
partition_key: Ident,
sort_key_condition: SortKeyCondition,
wait_condition: WaitCondition<P>,
timeout: Option<Duration>,
}
impl<'a, P: Partitions> WaitingQueryBuilder<'a, P> {
pub fn timeout(mut self, duration: Duration) -> Self {
self.timeout = Some(duration);
self
}
pub async fn execute(self) -> Result<QueryResults<P>, QueryError> {
let start = Instant::now();
let timeout = self.timeout.unwrap_or(Duration::from_secs(30));
loop {
let elapsed = start.elapsed();
if elapsed >= timeout {
return Err(QueryError::WaitTimeout { elapsed });
}
self.client.refresh_snapshot();
let results = match &self.sort_key_condition {
SortKeyCondition::Exact(sort_key) => {
self
.client
.get_record_internal(self.partition_key, Some(sort_key.clone()))
.await
}
SortKeyCondition::BeginsWith(prefix) => {
self
.client
.prefix_internal(self.partition_key, prefix.clone())
.await
}
SortKeyCondition::Between(from, to) => {
self
.client
.between_internal(self.partition_key, from.clone(), to.clone())
.await
}
SortKeyCondition::LessThan(value) => {
self
.client
.less_than_internal(self.partition_key, value.clone(), false)
.await
}
SortKeyCondition::LessThanOrEqual(value) => {
self
.client
.less_than_internal(self.partition_key, value.clone(), true)
.await
}
SortKeyCondition::GreaterThan(value) => {
self
.client
.greater_than_internal(self.partition_key, value.clone(), false)
.await
}
SortKeyCondition::GreaterThanOrEqual(value) => {
self
.client
.greater_than_internal(self.partition_key, value.clone(), true)
.await
}
SortKeyCondition::All => {
self
.client
.get_record_internal::<String>(self.partition_key, None)
.await
}
};
let condition_met = (self.wait_condition)(&results);
if condition_met {
return Ok(results);
}
poll_fn(|cx| {
self.client.db.register_partition_waker(
self.partition_key,
self.sort_key_condition.clone(),
cx.waker().clone(),
);
Poll::<()>::Pending
})
.await;
}
}
}
pub struct PartitionQueryBuilder<
'a,
P: Partitions,
Part: DynPartition + PartitionKey,
> {
inner: QueryBuilder<'a, P>,
_partition: PhantomData<Part>,
}
impl<'a, P: Partitions, Part: DynPartition + PartitionKey>
PartitionQueryBuilder<'a, P, Part>
{
pub(crate) fn new(client: &'a mut QueryClient<P>) -> Self {
Self {
inner: client.query_any(Part::KEY),
_partition: PhantomData,
}
}
pub fn sort_key(mut self, key: Part::DynSortKey) -> Self {
self.inner = self.inner.sort_key(key);
self
}
pub fn sort_key_begins_with(mut self, prefix: Part::DynSortKey) -> Self {
self.inner = self.inner.sort_key_begins_with(prefix.to_string());
self
}
pub fn sort_key_between(
mut self,
from: Part::DynSortKey,
to: Part::DynSortKey,
) -> Self {
self.inner = self.inner.sort_key_between(from, to);
self
}
pub fn sort_key_less_than(mut self, value: Part::DynSortKey) -> Self {
self.inner = self.inner.sort_key_less_than(value);
self
}
pub fn sort_key_less_than_or_equal(mut self, value: Part::DynSortKey) -> Self {
self.inner = self.inner.sort_key_less_than_or_equal(value);
self
}
pub fn sort_key_greater_than(mut self, value: Part::DynSortKey) -> Self {
self.inner = self.inner.sort_key_greater_than(value);
self
}
pub fn sort_key_greater_than_or_equal(
mut self,
value: Part::DynSortKey,
) -> Self {
self.inner = self.inner.sort_key_greater_than_or_equal(value);
self
}
pub fn wait_until<F>(
self,
condition: F,
) -> PartitionWaitingQueryBuilder<'a, P, Part>
where
F: Fn(&QueryResults<P>) -> bool + Send + Sync + 'static,
{
PartitionWaitingQueryBuilder {
inner: self.inner.wait_until(condition),
_partition: PhantomData,
}
}
pub async fn execute(self) -> QueryResults<P> {
self.inner.execute().await
}
}
pub struct PartitionWaitingQueryBuilder<
'a,
P: Partitions,
Part: DynPartition + PartitionKey,
> {
inner: WaitingQueryBuilder<'a, P>,
_partition: PhantomData<Part>,
}
impl<'a, P: Partitions, Part: DynPartition + PartitionKey>
PartitionWaitingQueryBuilder<'a, P, Part>
{
pub fn timeout(mut self, duration: Duration) -> Self {
self.inner = self.inner.timeout(duration);
self
}
pub async fn execute(self) -> Result<QueryResults<P>, QueryError> {
self.inner.execute().await
}
}
pub struct TypedPartitionQueryBuilder<
'a,
P: Partitions,
Part: Partition + PartitionKey,
> {
inner: QueryBuilder<'a, P>,
_partition: PhantomData<Part>,
}
impl<'a, P: Partitions, Part: Partition + PartitionKey>
TypedPartitionQueryBuilder<'a, P, Part>
{
pub(crate) fn new(client: &'a mut QueryClient<P>) -> Self {
Self {
inner: client.query_any(Part::KEY),
_partition: PhantomData,
}
}
pub fn sort_key(mut self, key: Part::SortKey) -> Self {
self.inner = self.inner.sort_key(key);
self
}
pub fn sort_key_begins_with(mut self, prefix: Part::SortKey) -> Self {
self.inner = self.inner.sort_key_begins_with(prefix.to_string());
self
}
pub fn sort_key_between(mut self, from: Part::SortKey, to: Part::SortKey) -> Self {
self.inner = self.inner.sort_key_between(from, to);
self
}
pub fn sort_key_less_than(mut self, value: Part::SortKey) -> Self {
self.inner = self.inner.sort_key_less_than(value);
self
}
pub fn sort_key_less_than_or_equal(mut self, value: Part::SortKey) -> Self {
self.inner = self.inner.sort_key_less_than_or_equal(value);
self
}
pub fn sort_key_greater_than(mut self, value: Part::SortKey) -> Self {
self.inner = self.inner.sort_key_greater_than(value);
self
}
pub fn sort_key_greater_than_or_equal(mut self, value: Part::SortKey) -> Self {
self.inner = self.inner.sort_key_greater_than_or_equal(value);
self
}
pub fn wait_until<F>(
self,
condition: F,
) -> TypedPartitionWaitingQueryBuilder<'a, P, Part>
where
F: Fn(&QueryResults<P>) -> bool + Send + Sync + 'static,
{
TypedPartitionWaitingQueryBuilder {
inner: self.inner.wait_until(condition),
_partition: PhantomData,
}
}
pub async fn execute(self) -> QueryResults<P> {
self.inner.execute().await
}
}
pub struct TypedPartitionWaitingQueryBuilder<
'a,
P: Partitions,
Part: Partition + PartitionKey,
> {
inner: WaitingQueryBuilder<'a, P>,
_partition: PhantomData<Part>,
}
impl<'a, P: Partitions, Part: Partition + PartitionKey>
TypedPartitionWaitingQueryBuilder<'a, P, Part>
{
pub fn timeout(mut self, duration: Duration) -> Self {
self.inner = self.inner.timeout(duration);
self
}
pub async fn execute(self) -> Result<QueryResults<P>, QueryError> {
self.inner.execute().await
}
}
#[cfg(test)]
mod tests {
use super::SortKeyCondition;
#[test]
fn test_matches_all() {
assert!(SortKeyCondition::All.matches("anything"));
assert!(SortKeyCondition::All.matches(""));
}
#[test]
fn test_matches_exact() {
let cond = SortKeyCondition::Exact("foo".to_string());
assert!(cond.matches("foo"));
assert!(!cond.matches("bar"));
assert!(!cond.matches("foobar"));
}
#[test]
fn test_matches_begins_with() {
let cond = SortKeyCondition::BeginsWith("prefix".to_string());
assert!(cond.matches("prefix_foo"));
assert!(cond.matches("prefix"));
assert!(!cond.matches("other"));
}
#[test]
fn test_matches_between() {
let cond = SortKeyCondition::Between("b".to_string(), "d".to_string());
assert!(cond.matches("b"));
assert!(cond.matches("c"));
assert!(cond.matches("d"));
assert!(!cond.matches("a"));
assert!(!cond.matches("e"));
}
#[test]
fn test_matches_less_than() {
let cond = SortKeyCondition::LessThan("c".to_string());
assert!(cond.matches("a"));
assert!(cond.matches("b"));
assert!(!cond.matches("c"));
assert!(!cond.matches("d"));
}
#[test]
fn test_matches_less_than_or_equal() {
let cond = SortKeyCondition::LessThanOrEqual("c".to_string());
assert!(cond.matches("a"));
assert!(cond.matches("c"));
assert!(!cond.matches("d"));
}
#[test]
fn test_matches_greater_than() {
let cond = SortKeyCondition::GreaterThan("b".to_string());
assert!(!cond.matches("a"));
assert!(!cond.matches("b"));
assert!(cond.matches("c"));
}
#[test]
fn test_matches_greater_than_or_equal() {
let cond = SortKeyCondition::GreaterThanOrEqual("b".to_string());
assert!(!cond.matches("a"));
assert!(cond.matches("b"));
assert!(cond.matches("c"));
}
}