use core::fmt;
use std::{
cell::{
Ref,
RefCell,
},
collections::HashMap,
future::Future,
hash::Hash,
mem,
rc::Rc,
time::{
Duration,
Instant,
},
};
use async_io::Timer;
use freya_core::{
integration::FxHashSet,
lifecycle::context::{
consume_context,
provide_context_for_scope_id,
try_consume_context,
},
prelude::*,
scope_id::ScopeId,
};
use futures_util::stream::{
FuturesUnordered,
StreamExt,
};
pub trait QueryCapability
where
Self: 'static + Clone + PartialEq + Hash + Eq,
{
type Ok;
type Err;
type Keys: Hash + PartialEq + Clone;
fn run(&self, keys: &Self::Keys) -> impl Future<Output = Result<Self::Ok, Self::Err>>;
fn matches(&self, _keys: &Self::Keys) -> bool {
true
}
}
pub enum QueryStateData<Q: QueryCapability> {
Pending,
Loading { res: Option<Result<Q::Ok, Q::Err>> },
Settled {
res: Result<Q::Ok, Q::Err>,
settlement_instant: Instant,
},
}
impl<Q: QueryCapability> TryFrom<QueryStateData<Q>> for Result<Q::Ok, Q::Err> {
type Error = ();
fn try_from(value: QueryStateData<Q>) -> Result<Self, Self::Error> {
match value {
QueryStateData::Loading { res: Some(res) } => Ok(res),
QueryStateData::Settled { res, .. } => Ok(res),
_ => Err(()),
}
}
}
impl<Q> fmt::Debug for QueryStateData<Q>
where
Q: QueryCapability,
Q::Ok: fmt::Debug,
Q::Err: fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Pending => f.write_str("Pending"),
Self::Loading { res } => write!(f, "Loading {{ {res:?} }}"),
Self::Settled { res, .. } => write!(f, "Settled {{ {res:?} }}"),
}
}
}
impl<Q: QueryCapability> QueryStateData<Q> {
pub fn is_ok(&self) -> bool {
matches!(self, QueryStateData::Settled { res: Ok(_), .. })
}
pub fn is_err(&self) -> bool {
matches!(self, QueryStateData::Settled { res: Err(_), .. })
}
pub fn is_loading(&self) -> bool {
matches!(self, QueryStateData::Loading { .. })
}
pub fn is_pending(&self) -> bool {
matches!(self, QueryStateData::Pending)
}
pub fn is_stale(&self, query: &Query<Q>) -> bool {
match self {
QueryStateData::Pending => true,
QueryStateData::Loading { .. } => true,
QueryStateData::Settled {
settlement_instant, ..
} => Instant::now().duration_since(*settlement_instant) >= query.stale_time,
}
}
pub fn ok(&self) -> Option<&Q::Ok> {
match self {
Self::Settled { res: Ok(res), .. } => Some(res),
Self::Loading { res: Some(Ok(res)) } => Some(res),
_ => None,
}
}
pub fn unwrap(&self) -> &Result<Q::Ok, Q::Err> {
match self {
Self::Loading { res: Some(v) } => v,
Self::Settled { res, .. } => res,
_ => unreachable!(),
}
}
fn into_loading(self) -> QueryStateData<Q> {
match self {
QueryStateData::Pending => QueryStateData::Loading { res: None },
QueryStateData::Loading { res } => QueryStateData::Loading { res },
QueryStateData::Settled { res, .. } => QueryStateData::Loading { res: Some(res) },
}
}
}
pub struct QueriesStorage<Q: QueryCapability> {
storage: State<HashMap<Query<Q>, QueryData<Q>>>,
}
impl<Q: QueryCapability> Copy for QueriesStorage<Q> {}
impl<Q: QueryCapability> Clone for QueriesStorage<Q> {
fn clone(&self) -> Self {
*self
}
}
pub struct QueryData<Q: QueryCapability> {
state: Rc<RefCell<QueryStateData<Q>>>,
reactive_contexts: Rc<RefCell<FxHashSet<ReactiveContext>>>,
interval_task: Rc<RefCell<Option<(Duration, TaskHandle)>>>,
clean_task: Rc<RefCell<Option<TaskHandle>>>,
}
impl<Q: QueryCapability> Clone for QueryData<Q> {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
reactive_contexts: self.reactive_contexts.clone(),
interval_task: self.interval_task.clone(),
clean_task: self.clean_task.clone(),
}
}
}
impl<Q: QueryCapability> QueriesStorage<Q> {
fn new_in_root() -> Self {
Self {
storage: State::create_global(HashMap::default()),
}
}
fn insert_or_get_query(&mut self, query: Query<Q>) -> QueryData<Q> {
let query_clone = query.clone();
let mut storage = self.storage.write_unchecked();
let query_data = storage.entry(query).or_insert_with(|| QueryData {
state: Rc::new(RefCell::new(QueryStateData::Pending)),
reactive_contexts: Rc::new(RefCell::new(FxHashSet::default())),
interval_task: Rc::default(),
clean_task: Rc::default(),
});
let query_data_clone = query_data.clone();
if let Some(clean_task) = query_data.clean_task.take() {
clean_task.cancel();
}
let interval = query_clone.interval_time;
let interval_enabled = query_clone.interval_time != Duration::MAX;
let interval_task = &mut *query_data.interval_task.borrow_mut();
let create_interval_task = match interval_task {
None if interval_enabled => true,
Some((current_interval, current_interval_task)) if interval_enabled => {
let new_interval_is_shorter = *current_interval > interval;
if new_interval_is_shorter {
current_interval_task.cancel();
*interval_task = None;
}
new_interval_is_shorter
}
_ => false,
};
if create_interval_task {
let task = spawn_forever(async move {
loop {
Timer::after(interval).await;
QueriesStorage::<Q>::run_queries(&[(&query_clone, &query_data_clone)]).await;
}
});
*interval_task = Some((interval, task));
}
query_data.clone()
}
fn update_tasks(&mut self, query: Query<Q>) {
let storage_clone = self.storage;
let mut storage = self.storage.write_unchecked();
let query_data = storage.get_mut(&query).unwrap();
if let Some((_, interval_task)) = query_data.interval_task.take() {
interval_task.cancel();
}
if query_data.reactive_contexts.borrow().len() == 1 {
*query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
Timer::after(query.clean_time).await;
let mut storage = storage_clone.write_unchecked();
storage.remove(&query);
}));
}
}
pub async fn get(get_query: GetQuery<Q>) -> QueryReader<Q> {
let query: Query<Q> = get_query.into();
let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
Some(storage) => storage,
None => {
provide_context_for_scope_id(
QueriesStorage::<Q>::new_in_root(),
Some(ScopeId::ROOT),
);
try_consume_context::<QueriesStorage<Q>>().unwrap()
}
};
let mut map = storage.storage.write();
let query_data = map
.entry(query.clone())
.or_insert_with(|| QueryData {
state: Rc::new(RefCell::new(QueryStateData::Pending)),
reactive_contexts: Rc::new(RefCell::new(FxHashSet::default())),
interval_task: Rc::default(),
clean_task: Rc::default(),
})
.clone();
if query_data.state.borrow().is_stale(&query) {
let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
.into_loading();
*query_data.state.borrow_mut() = res;
for reactive_context in query_data.reactive_contexts.borrow().iter() {
reactive_context.notify();
}
let res = query.query.run(&query.keys).await;
*query_data.state.borrow_mut() = QueryStateData::Settled {
res,
settlement_instant: Instant::now(),
};
for reactive_context in query_data.reactive_contexts.borrow().iter() {
reactive_context.notify();
}
}
if query_data.reactive_contexts.borrow().is_empty() {
*query_data.clean_task.borrow_mut() = Some(spawn_forever(async move {
Timer::after(query.clean_time).await;
let mut storage = storage.storage.write_unchecked();
storage.remove(&query);
}));
}
QueryReader {
state: query_data.state,
}
}
pub async fn invalidate_all() {
let storage = consume_context::<QueriesStorage<Q>>();
storage.inner_invalidate_all().await;
}
pub async fn try_invalidate_all() {
let Some(storage) = try_consume_context::<QueriesStorage<Q>>() else {
return;
};
storage.inner_invalidate_all().await;
}
async fn inner_invalidate_all(self) {
let matching_queries = self.storage.read().clone().into_iter().collect::<Vec<_>>();
let matching_queries = matching_queries
.iter()
.map(|(q, d)| (q, d))
.collect::<Vec<_>>();
Self::run_queries(&matching_queries).await
}
pub async fn invalidate_matching(matching_keys: Q::Keys) {
let storage = consume_context::<QueriesStorage<Q>>();
storage.inner_invalidate_matching(matching_keys).await;
}
pub async fn try_invalidate_matching(matching_keys: Q::Keys) {
let Some(storage) = try_consume_context::<QueriesStorage<Q>>() else {
return;
};
storage.inner_invalidate_matching(matching_keys).await;
}
async fn inner_invalidate_matching(self, matching_keys: Q::Keys) {
let mut matching_queries = Vec::new();
for (query, data) in self.storage.read().iter() {
if query.query.matches(&matching_keys) {
matching_queries.push((query.clone(), data.clone()));
}
}
let matching_queries = matching_queries
.iter()
.map(|(q, d)| (q, d))
.collect::<Vec<_>>();
Self::run_queries(&matching_queries).await
}
async fn run_queries(queries: &[(&Query<Q>, &QueryData<Q>)]) {
let tasks = FuturesUnordered::new();
for (query, query_data) in queries {
let res = mem::replace(&mut *query_data.state.borrow_mut(), QueryStateData::Pending)
.into_loading();
*query_data.state.borrow_mut() = res;
for reactive_context in query_data.reactive_contexts.borrow().iter() {
reactive_context.notify();
}
tasks.push(Box::pin(async move {
let res = query.query.run(&query.keys).await;
*query_data.state.borrow_mut() = QueryStateData::Settled {
res,
settlement_instant: Instant::now(),
};
for reactive_context in query_data.reactive_contexts.borrow().iter() {
reactive_context.notify();
}
}));
}
tasks.count().await;
}
}
pub struct GetQuery<Q: QueryCapability> {
query: Q,
keys: Q::Keys,
stale_time: Duration,
clean_time: Duration,
}
impl<Q: QueryCapability> GetQuery<Q> {
pub fn new(keys: Q::Keys, query: Q) -> Self {
Self {
query,
keys,
stale_time: Duration::ZERO,
clean_time: Duration::ZERO,
}
}
pub fn stale_time(self, stale_time: Duration) -> Self {
Self { stale_time, ..self }
}
pub fn clean_time(self, clean_time: Duration) -> Self {
Self { clean_time, ..self }
}
}
impl<Q: QueryCapability> From<GetQuery<Q>> for Query<Q> {
fn from(value: GetQuery<Q>) -> Self {
Query {
query: value.query,
keys: value.keys,
enabled: true,
stale_time: value.stale_time,
clean_time: value.clean_time,
interval_time: Duration::MAX,
}
}
}
#[derive(PartialEq, Clone)]
pub struct Query<Q: QueryCapability> {
query: Q,
keys: Q::Keys,
enabled: bool,
stale_time: Duration,
clean_time: Duration,
interval_time: Duration,
}
impl<Q: QueryCapability> Eq for Query<Q> {}
impl<Q: QueryCapability> Hash for Query<Q> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.query.hash(state);
self.keys.hash(state);
self.enabled.hash(state);
self.stale_time.hash(state);
self.clean_time.hash(state);
}
}
impl<Q: QueryCapability> Query<Q> {
pub fn new(keys: Q::Keys, query: Q) -> Self {
Self {
query,
keys,
enabled: true,
stale_time: Duration::ZERO,
clean_time: Duration::from_secs(5 * 60),
interval_time: Duration::MAX,
}
}
pub fn enable(self, enabled: bool) -> Self {
Self { enabled, ..self }
}
pub fn stale_time(self, stale_time: Duration) -> Self {
Self { stale_time, ..self }
}
pub fn clean_time(self, clean_time: Duration) -> Self {
Self { clean_time, ..self }
}
pub fn interval_time(self, interval_time: Duration) -> Self {
Self {
interval_time,
..self
}
}
}
pub struct QueryReader<Q: QueryCapability> {
state: Rc<RefCell<QueryStateData<Q>>>,
}
impl<Q: QueryCapability> QueryReader<Q> {
pub fn state(&'_ self) -> Ref<'_, QueryStateData<Q>> {
self.state.borrow()
}
pub fn as_settled(&'_ self) -> Ref<'_, Result<Q::Ok, Q::Err>> {
Ref::map(self.state.borrow(), |state| match state {
QueryStateData::Settled { res, .. } => res,
_ => panic!("Query is not settled."),
})
}
}
pub struct UseQuery<Q: QueryCapability> {
query: State<Query<Q>>,
}
impl<Q: QueryCapability> Clone for UseQuery<Q> {
fn clone(&self) -> Self {
*self
}
}
impl<Q: QueryCapability> Copy for UseQuery<Q> {}
impl<Q: QueryCapability> UseQuery<Q> {
pub fn read(&self) -> QueryReader<Q> {
let storage = consume_context::<QueriesStorage<Q>>();
let map = storage.storage.peek();
let query_data = map.get(&self.query.peek()).cloned().unwrap();
if let Some(mut reactive_context) = ReactiveContext::try_current() {
reactive_context.subscribe(&query_data.reactive_contexts);
}
QueryReader {
state: query_data.state,
}
}
pub fn peek(&self) -> QueryReader<Q> {
let storage = consume_context::<QueriesStorage<Q>>();
let map = storage.storage.peek();
let query_data = map.get(&self.query.peek()).cloned().unwrap();
QueryReader {
state: query_data.state,
}
}
pub async fn invalidate_async(&self) -> QueryReader<Q> {
let storage = consume_context::<QueriesStorage<Q>>();
let query = self.query.peek().clone();
let map = storage.storage.peek();
let query_data = map.get(&query).cloned().unwrap();
QueriesStorage::run_queries(&[(&query, &query_data)]).await;
QueryReader {
state: query_data.state.clone(),
}
}
pub fn invalidate(&self) {
let storage = consume_context::<QueriesStorage<Q>>();
let query = self.query.peek().clone();
let map = storage.storage.peek();
let query_data = map.get(&query).cloned().unwrap();
spawn_forever(async move { QueriesStorage::run_queries(&[(&query, &query_data)]).await });
}
}
pub fn use_query<Q: QueryCapability>(query: Query<Q>) -> UseQuery<Q> {
let mut storage = match try_consume_context::<QueriesStorage<Q>>() {
Some(storage) => storage,
None => {
provide_context_for_scope_id(QueriesStorage::<Q>::new_in_root(), Some(ScopeId::ROOT));
try_consume_context::<QueriesStorage<Q>>().unwrap()
}
};
let mut make_query = |query: &Query<Q>, mut prev_query: Option<Query<Q>>| {
let query_data = storage.insert_or_get_query(query.clone());
if let Some(prev_query) = prev_query.take() {
storage.update_tasks(prev_query);
}
if query.enabled && query_data.state.borrow().is_stale(query) {
let query = query.clone();
spawn_forever(async move {
QueriesStorage::run_queries(&[(&query, &query_data)]).await;
});
}
};
let mut current_query = use_hook(|| {
make_query(&query, None);
State::create(query.clone())
});
if *current_query.read() != query {
let prev = mem::replace(&mut *current_query.write(), query.clone());
make_query(&query, Some(prev));
}
use_drop({
move || {
storage.update_tasks(current_query.peek().clone());
}
});
let query = UseQuery {
query: current_query,
};
use_side_effect(move || {
let _ = query.read();
});
query
}