use std::any::Any;
use std::collections::BTreeSet;
use std::hash::Hash;
use std::sync::{Arc, Weak};
use arc_swap::ArcSwap;
use futures::future::BoxFuture;
use parking_lot::Mutex;
use tokio::task::AbortHandle;
use tokio_util::sync::CancellationToken;
use crate::arc::addr::WeakAddr;
use crate::arc::dependencies::Dependencies;
use crate::arc::{Derived, Evaluation, Invalid, Observable, State, Version};
use crate::hashed::Hashed;
#[doc(hidden)]
pub struct Async<T>
where
T: Send + Sync + Hash + 'static,
{
body: Arc<AsyncBody<T>>,
}
impl<T> Clone for Async<T>
where
T: Send + Sync + Hash,
{
fn clone(&self) -> Self {
Self {
body: self.body.clone(),
}
}
}
impl<T: Send + Sync + Hash + 'static> From<Async<T>> for Arc<dyn Any> {
fn from(var: Async<T>) -> Self {
var.body
}
}
impl<T: Send + Sync + Hash + 'static> TryFrom<Arc<dyn Any + Send + Sync>> for Async<T> {
type Error = Arc<dyn Any + Send + Sync>;
fn try_from(value: Arc<dyn Any + Send + Sync>) -> Result<Self, Self::Error> {
Arc::downcast::<AsyncBody<T>>(value).map(|body| Async { body })
}
}
pub struct AsyncBody<T>
where
T: Send + Hash + Sync + 'static,
{
value: ArcSwap<Option<Hashed<T>>>,
inner: Mutex<AsyncInner<T>>,
}
struct AsyncEffect<
K: Hash + Send,
T,
H: Fn(&Evaluation) -> K + Send + 'static,
F: Fn(K, CancellationToken) -> BoxFuture<'static, T> + Send + 'static,
> {
handler: H,
func: F,
value: Option<Hashed<K>>,
}
pub trait AsyncEffecty<T>: Send {
fn compute(&mut self, cx: &Evaluation) -> u64;
fn invoke(&mut self, cancel: CancellationToken) -> BoxFuture<'static, T>;
}
impl<K, T, H, F> AsyncEffecty<T> for AsyncEffect<K, T, H, F>
where
K: Hash + Send,
H: Fn(&Evaluation) -> K + 'static + Send,
F: Fn(K, CancellationToken) -> BoxFuture<'static, T> + 'static + Send,
{
fn compute(&mut self, cx: &Evaluation) -> u64 {
let value = Hashed::new((self.handler)(cx));
self.value = Some(value);
return self.value.as_ref().unwrap().hash;
}
fn invoke(&mut self, cancel: CancellationToken) -> BoxFuture<'static, T> {
(self.func)(self.value.take().unwrap().value, cancel)
}
}
pub struct AsyncInner<T>
where
T: Send + Hash + Sync + 'static,
{
effect: Box<dyn AsyncEffecty<T>>,
future: Option<BoxFuture<'static, T>>,
revision: u64,
cancel: CancellationToken,
handle: Option<AbortHandle>,
state: State,
used_by: BTreeSet<WeakAddr<dyn Derived>>,
dependencies: Dependencies,
this: Weak<AsyncBody<T>>,
}
impl<T> Drop for AsyncInner<T>
where
T: Send + Sync + Hash + 'static,
{
fn drop(&mut self) {
let refr = self.this.clone() as Weak<dyn Derived>;
self.dependencies.drop(&refr);
}
}
impl<T> Async<T>
where
T: Send + Sync + Hash + 'static,
{
pub fn new<K: Hash + Send + 'static>(
handler: impl Fn(&Evaluation) -> K + 'static + Send,
func: impl Fn(K, CancellationToken) -> BoxFuture<'static, T> + 'static + Send,
) -> Self {
Async {
body: Arc::new_cyclic(|this| AsyncBody {
value: ArcSwap::new(Arc::new(None)),
inner: Mutex::new(AsyncInner {
effect: Box::new(AsyncEffect {
func,
handler,
value: None,
}) as Box<dyn AsyncEffecty<T>>,
revision: 0,
future: None,
handle: None,
cancel: CancellationToken::new(),
state: State::Invalid(Invalid::Definitely),
used_by: BTreeSet::new(),
dependencies: Dependencies::new(),
this: this.clone(),
}),
}),
}
}
#[inline]
pub fn get<'a>(&'a self, cx: &'a impl AsRef<Evaluation>) -> u64 {
self.body.get(cx.as_ref())
}
}
impl<T> AsyncBody<T>
where
T: Send + Sync + Hash + 'static,
{
pub fn get<'a>(&'a self, eval: &'_ Evaluation) -> u64 {
{
let mut self_mut = self.inner.lock();
self.inner_update(&mut self_mut);
eval.based_on(
self_mut.this.upgrade().unwrap(),
Version::Hash(self_mut.revision),
);
self_mut.used_by(eval.parent());
self_mut.revision
}
}
pub(crate) fn used_by(&self, observable: Weak<dyn Derived>) {
self.inner.lock().used_by(observable);
}
fn not_used_by(&self, derived: &Weak<dyn Derived>) {
self.inner.lock().not_used_by(derived);
}
pub fn inner_update(&self, inner_mut: &mut AsyncInner<T>) {
if inner_mut.state == State::Valid {
return;
}
let is_valid = match inner_mut.state {
State::Valid => true,
State::Invalid(Invalid::Definitely) => false,
State::Invalid(Invalid::Maybe) => inner_mut.dependencies.are_valid(),
};
if is_valid {
inner_mut.state = State::Valid;
return;
}
let this = inner_mut.this.clone();
let evaluation = Evaluation::new(this.clone() as Weak<dyn Derived>);
let revision = inner_mut.effect.compute(&evaluation);
if revision != inner_mut.revision {
inner_mut.revision = revision;
let future = inner_mut.effect.invoke(inner_mut.cancel.clone());
inner_mut.handle = Some(
tokio::spawn(async move {
let value = future.await;
let Some(this) = this.upgrade() else {
return;
};
let value = Some(Hashed::new(value));
if **this.value.load() != value {
this.value.swap(Arc::new(value));
let mut inner = this.inner.lock();
inner.used_by.retain(|item| {
if let Some(item) = item.upgrade() {
item.invalidate(Invalid::Maybe);
true
} else {
false
}
});
}
})
.abort_handle(),
);
}
inner_mut.state = State::Valid;
let parent = inner_mut.this.clone() as Weak<dyn Derived>;
inner_mut.dependencies.swap(evaluation.take(), &parent);
}
}
impl<T> AsyncInner<T>
where
T: Send + Sync + Hash + 'static,
{
pub(crate) fn used_by(&mut self, observable: Weak<dyn Derived>) {
self.used_by.insert(WeakAddr::new(observable));
}
fn not_used_by(&mut self, derived: &Weak<dyn Derived>) {
self.used_by.remove(&WeakAddr::new(derived.clone()));
}
}
impl<T> Observable for AsyncBody<T>
where
T: Send + Sync + Hash + 'static,
{
fn update(&self) -> Version {
self.inner_update(&mut self.inner.lock());
self.version()
}
fn version(&self) -> Version {
Version::Hash(0)
}
fn used_by(&self, derived: Weak<dyn Derived>) {
AsyncBody::used_by(self, derived)
}
fn not_used_by(&self, derived: &Weak<dyn Derived>) {
AsyncBody::not_used_by(self, derived)
}
}
impl<T: 'static> Derived for AsyncBody<T>
where
T: Send + Sync + Hash + 'static,
{
fn invalidate(self: Arc<Self>, invalid: crate::arc::Invalid) {
let mut self_mut = self.inner.lock();
if matches!(self_mut.state, State::Valid) {
self_mut.state = State::Invalid(invalid);
self_mut.used_by.retain(|item| {
if let Some(item) = item.upgrade() {
item.invalidate(Invalid::Maybe);
true
} else {
false
}
});
}
}
}