use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use query_flow::{AssetKey, Db, DurabilityLevel, Query, QueryError, QueryRuntime};
/// Query key wrapping the single `i32` input used by the doubling query.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct DoubleQuery {
    /// Integer input carried by this key.
    value: i32,
}
/// Produces twice the wrapped `value`; the database handle is not consulted.
impl Query for DoubleQuery {
    type Output = i32;

    /// Returns `self.value * 2` wrapped in an `Arc`.
    fn query(self, _db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
        let doubled = self.value * 2;
        Ok(Arc::new(doubled))
    }

    /// Two outputs are considered equal when the integers match.
    fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
        *old == *new
    }
}
/// Asset key wrapping a `String` name (the tests use names such as `app.json`).
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct ConfigFile(String);
/// Associates each `ConfigFile` key with a `String` asset.
impl AssetKey for ConfigFile {
    type Asset = String;

    /// Two asset values are considered equal when the strings are identical.
    fn asset_eq(old: &Self::Asset, new: &Self::Asset) -> bool {
        *old == *new
    }
}
/// Field-less query key used to list the names of all `ConfigFile` asset keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ListConfigsQuery;
/// Lists the names of every `ConfigFile` key reported by the database, sorted.
impl Query for ListConfigsQuery {
    type Output = Vec<String>;

    /// Copies the inner name out of each key returned by
    /// `list_asset_keys::<ConfigFile>()` and returns the names in sorted order.
    fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
        let mut names: Vec<String> = Vec::new();
        for key in db.list_asset_keys::<ConfigFile>().iter() {
            names.push(key.0.clone());
        }
        names.sort();
        Ok(Arc::new(names))
    }

    /// Two listings are equal when they hold the same names in the same order.
    fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
        *old == *new
    }
}
/// Runs three `DoubleQuery` instances, then checks that a query iterating
/// `list_queries::<DoubleQuery>()` yields all three doubled values.
#[test]
fn test_list_queries_basic() {
    // Gathers the output of every listed `DoubleQuery`, sorted ascending.
    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct AggregateQuery;

    impl Query for AggregateQuery {
        type Output = Vec<i32>;

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let mut doubled = Vec::new();
            for pending in db.list_queries::<DoubleQuery>() {
                let output = db.query(pending)?;
                doubled.push(*output);
            }
            doubled.sort();
            Ok(Arc::new(doubled))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();

    // Execute the three queries up front, in the order 1, 2, 3.
    for value in 1..=3 {
        runtime.query(DoubleQuery { value }).unwrap();
    }

    let aggregated = runtime.query(AggregateQuery).unwrap();
    assert_eq!(*aggregated, vec![2, 4, 6]);
}
/// Checks that the aggregating query body runs once for an unchanged set of
/// `DoubleQuery` instances and runs again after a new instance is queried.
#[test]
fn test_list_queries_invalidation_on_add() {
    // Counts how many times `TrackedAggregateQuery::query` has executed.
    static AGGREGATE_COUNT: AtomicU32 = AtomicU32::new(0);

    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct TrackedAggregateQuery;

    impl Query for TrackedAggregateQuery {
        type Output = Vec<i32>;

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            AGGREGATE_COUNT.fetch_add(1, Ordering::SeqCst);
            // Stops at the first failing sub-query, like a `?` inside a loop.
            let mut doubled = db
                .list_queries::<DoubleQuery>()
                .into_iter()
                .map(|pending| db.query(pending).map(|output| *output))
                .collect::<Result<Vec<i32>, _>>()?;
            doubled.sort();
            Ok(Arc::new(doubled))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();
    AGGREGATE_COUNT.store(0, Ordering::SeqCst);
    let executions = || AGGREGATE_COUNT.load(Ordering::SeqCst);

    runtime.query(DoubleQuery { value: 1 }).unwrap();
    runtime.query(DoubleQuery { value: 2 }).unwrap();

    // First aggregate call: the body runs once.
    let first = runtime.query(TrackedAggregateQuery).unwrap();
    assert_eq!(*first, vec![2, 4]);
    assert_eq!(executions(), 1);

    // Second call with nothing changed: the counter must not move.
    let second = runtime.query(TrackedAggregateQuery).unwrap();
    assert_eq!(*second, vec![2, 4]);
    assert_eq!(executions(), 1);

    // A new `DoubleQuery` is run; the aggregate is expected to run again.
    runtime.query(DoubleQuery { value: 3 }).unwrap();
    let third = runtime.query(TrackedAggregateQuery).unwrap();
    assert_eq!(*third, vec![2, 4, 6]);
    assert_eq!(executions(), 2);
}
/// Checks that a query which only counts the listed `DoubleQuery` instances
/// is not executed again after one of those instances is invalidated.
#[test]
fn test_list_queries_no_invalidation_on_value_change() {
    // Counts how many times `ListOnlyQuery::query` has executed.
    static AGGREGATE_COUNT: AtomicU32 = AtomicU32::new(0);

    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct ListOnlyQuery;

    impl Query for ListOnlyQuery {
        type Output = usize;

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            AGGREGATE_COUNT.fetch_add(1, Ordering::SeqCst);
            // Only the listing is consulted; no individual query is run here.
            let registered = db.list_queries::<DoubleQuery>().len();
            Ok(Arc::new(registered))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();
    AGGREGATE_COUNT.store(0, Ordering::SeqCst);
    let executions = || AGGREGATE_COUNT.load(Ordering::SeqCst);

    runtime.query(DoubleQuery { value: 1 }).unwrap();

    let before = runtime.query(ListOnlyQuery).unwrap();
    assert_eq!(*before, 1);
    assert_eq!(executions(), 1);

    // Invalidate the one known query; the listing query must not run again.
    runtime.invalidate(&DoubleQuery { value: 1 });

    let after = runtime.query(ListOnlyQuery).unwrap();
    assert_eq!(*after, 1);
    assert_eq!(executions(), 1);
}
/// Checks that the aggregate over listed `DoubleQuery` instances yields the
/// same values after one instance is invalidated and queried again.
#[test]
fn test_list_queries_individual_dependency() {
    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct AggregateQuery2;

    impl Query for AggregateQuery2 {
        type Output = Vec<i32>;

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            let listed = db.list_queries::<DoubleQuery>();
            let mut outputs = Vec::new();
            for entry in listed {
                let doubled = *db.query(entry)?;
                outputs.push(doubled);
            }
            outputs.sort();
            Ok(Arc::new(outputs))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();
    for value in 1..=2 {
        runtime.query(DoubleQuery { value }).unwrap();
    }

    let initial = runtime.query(AggregateQuery2).unwrap();
    assert_eq!(*initial, vec![2, 4]);

    // Invalidate one member, run it again, and confirm the aggregate output.
    runtime.invalidate(&DoubleQuery { value: 1 });
    runtime.query(DoubleQuery { value: 1 }).unwrap();

    let refreshed = runtime.query(AggregateQuery2).unwrap();
    assert_eq!(*refreshed, vec![2, 4]);
}
/// Resolves two `ConfigFile` assets and checks that `ListConfigsQuery`
/// returns both names in sorted order.
#[test]
fn test_list_asset_keys_basic() {
    let runtime = QueryRuntime::new();

    // Both assets share the same body and durability; only the name differs.
    for name in ["app.json", "db.json"] {
        runtime.resolve_asset(
            ConfigFile(name.to_string()),
            "{}".to_string(),
            DurabilityLevel::Volatile,
        );
    }

    let listed = runtime.query(ListConfigsQuery).unwrap();
    assert_eq!(*listed, vec!["app.json", "db.json"]);
}
/// Checks that a query listing `ConfigFile` keys runs once while the key set
/// is unchanged and runs again after an asset is removed.
#[test]
fn test_list_asset_keys_invalidation_on_remove() {
    // Counts how many times `TrackedListConfigs::query` has executed.
    static LIST_COUNT: AtomicU32 = AtomicU32::new(0);

    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct TrackedListConfigs;

    impl Query for TrackedListConfigs {
        type Output = Vec<String>;

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            LIST_COUNT.fetch_add(1, Ordering::SeqCst);
            let mut names: Vec<String> = Vec::new();
            for key in db.list_asset_keys::<ConfigFile>().iter() {
                names.push(key.0.clone());
            }
            names.sort();
            Ok(Arc::new(names))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();
    LIST_COUNT.store(0, Ordering::SeqCst);
    let executions = || LIST_COUNT.load(Ordering::SeqCst);

    // Publishes an empty-object body under the given file name.
    let publish = |name: &str| {
        runtime.resolve_asset(
            ConfigFile(name.to_string()),
            "{}".to_string(),
            DurabilityLevel::Volatile,
        );
    };
    publish("app.json");
    publish("db.json");

    let first = runtime.query(TrackedListConfigs).unwrap();
    assert_eq!(*first, vec!["app.json", "db.json"]);
    assert_eq!(executions(), 1);

    // Nothing changed: the counter must stay at 1.
    let second = runtime.query(TrackedListConfigs).unwrap();
    assert_eq!(*second, vec!["app.json", "db.json"]);
    assert_eq!(executions(), 1);

    // Removing a key changes the set, so the listing is expected to re-run.
    runtime.remove_asset(&ConfigFile("db.json".to_string()));
    let third = runtime.query(TrackedListConfigs).unwrap();
    assert_eq!(*third, vec!["app.json"]);
    assert_eq!(executions(), 2);
}
/// Checks that a query which only lists `ConfigFile` keys does not run again
/// when an existing key is resolved with a different value.
#[test]
fn test_list_asset_keys_no_invalidation_on_value_change() {
    // Counts how many times `TrackedListConfigs2::query` has executed.
    static LIST_COUNT: AtomicU32 = AtomicU32::new(0);

    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct TrackedListConfigs2;

    impl Query for TrackedListConfigs2 {
        type Output = Vec<String>;

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            LIST_COUNT.fetch_add(1, Ordering::SeqCst);
            let listed = db.list_asset_keys::<ConfigFile>();
            let mut names: Vec<String> = Vec::new();
            for key in listed.iter() {
                names.push(key.0.clone());
            }
            names.sort();
            Ok(Arc::new(names))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();
    LIST_COUNT.store(0, Ordering::SeqCst);
    let executions = || LIST_COUNT.load(Ordering::SeqCst);

    // Stores `body` under the single key used by this test.
    let publish = |body: &str| {
        runtime.resolve_asset(
            ConfigFile("app.json".to_string()),
            body.to_string(),
            DurabilityLevel::Volatile,
        );
    };

    publish("v1");
    let before = runtime.query(TrackedListConfigs2).unwrap();
    assert_eq!(*before, vec!["app.json"]);
    assert_eq!(executions(), 1);

    // Same key, new value: the key set is unchanged, so no second execution.
    publish("v2");
    let after = runtime.query(TrackedListConfigs2).unwrap();
    assert_eq!(*after, vec!["app.json"]);
    assert_eq!(executions(), 1);
}
/// Checks that a query which lists `ConfigFile` keys AND reads each asset's
/// value runs again when the value stored under an existing key changes.
#[test]
fn test_list_asset_keys_with_individual_asset_dependency() {
    // Counts how many times `AllConfigContents::query` has executed.
    static CONTENT_COUNT: AtomicU32 = AtomicU32::new(0);

    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct AllConfigContents;

    impl Query for AllConfigContents {
        type Output = Vec<(String, String)>;

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            CONTENT_COUNT.fetch_add(1, Ordering::SeqCst);
            let mut pairs = Vec::new();
            for key in db.list_asset_keys::<ConfigFile>() {
                // Copy the name first: `asset_state` takes the key by value.
                let label = key.0.clone();
                let state = db.asset_state(key)?;
                // Keys whose state yields no value are skipped.
                if let Some(content) = state.get() {
                    pairs.push((label, (**content).clone()));
                }
            }
            pairs.sort();
            Ok(Arc::new(pairs))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();
    CONTENT_COUNT.store(0, Ordering::SeqCst);
    let executions = || CONTENT_COUNT.load(Ordering::SeqCst);

    // Stores `body` under the single key used by this test.
    let publish = |body: &str| {
        runtime.resolve_asset(
            ConfigFile("app.json".to_string()),
            body.to_string(),
            DurabilityLevel::Volatile,
        );
    };

    publish("v1");
    let first = runtime.query(AllConfigContents).unwrap();
    assert_eq!(*first, vec![("app.json".to_string(), "v1".to_string())]);
    assert_eq!(executions(), 1);

    // The value under the key changes, so the query is expected to re-run.
    publish("v2");
    let second = runtime.query(AllConfigContents).unwrap();
    assert_eq!(*second, vec![("app.json".to_string(), "v2".to_string())]);
    assert_eq!(executions(), 2);
}
/// Checks that aggregating over `list_queries::<DoubleQuery>()` on a fresh
/// runtime, where no `DoubleQuery` has been run, yields an empty vector.
#[test]
fn test_list_queries_empty() {
    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct AggregateQuery3;

    impl Query for AggregateQuery3 {
        type Output = Vec<i32>;

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            // Stops at the first failing sub-query, like a `?` inside a loop.
            let mut doubled = db
                .list_queries::<DoubleQuery>()
                .into_iter()
                .map(|pending| db.query(pending).map(|output| *output))
                .collect::<Result<Vec<i32>, _>>()?;
            doubled.sort();
            Ok(Arc::new(doubled))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();
    let aggregated = runtime.query(AggregateQuery3).unwrap();
    assert_eq!(*aggregated, Vec::<i32>::new());
}
/// Checks that `ListConfigsQuery` on a fresh runtime, with no assets resolved,
/// yields an empty list of names.
#[test]
fn test_list_asset_keys_empty() {
    let runtime = QueryRuntime::new();
    let listed = runtime.query(ListConfigsQuery).unwrap();
    assert_eq!(*listed, Vec::<String>::new());
}
/// Checks that a query listing `ConfigFile` keys runs again after an asset
/// with a previously unseen key is resolved.
#[test]
fn test_list_asset_keys_invalidation_on_add() {
    // Counts how many times `TrackedListConfigs3::query` has executed.
    static LIST_COUNT: AtomicU32 = AtomicU32::new(0);

    #[derive(Clone, Debug, Hash, PartialEq, Eq)]
    struct TrackedListConfigs3;

    impl Query for TrackedListConfigs3 {
        type Output = Vec<String>;

        fn query(self, db: &impl Db) -> Result<Arc<Self::Output>, QueryError> {
            LIST_COUNT.fetch_add(1, Ordering::SeqCst);
            let listed = db.list_asset_keys::<ConfigFile>();
            let mut names: Vec<String> = Vec::new();
            for key in listed.iter() {
                names.push(key.0.clone());
            }
            names.sort();
            Ok(Arc::new(names))
        }

        fn output_eq(old: &Self::Output, new: &Self::Output) -> bool {
            *old == *new
        }
    }

    let runtime = QueryRuntime::new();
    LIST_COUNT.store(0, Ordering::SeqCst);
    let executions = || LIST_COUNT.load(Ordering::SeqCst);

    // Publishes an empty-object body under the given file name.
    let publish = |name: &str| {
        runtime.resolve_asset(
            ConfigFile(name.to_string()),
            "{}".to_string(),
            DurabilityLevel::Volatile,
        );
    };

    publish("app.json");
    let before = runtime.query(TrackedListConfigs3).unwrap();
    assert_eq!(*before, vec!["app.json"]);
    assert_eq!(executions(), 1);

    // A second, new key appears: the listing is expected to run again.
    publish("db.json");
    let after = runtime.query(TrackedListConfigs3).unwrap();
    assert_eq!(*after, vec!["app.json", "db.json"]);
    assert_eq!(executions(), 2);
}