use std::{
sync::{
Arc, Weak,
atomic::{AtomicBool, Ordering},
},
thread::{self, ThreadId},
};
use crate::{Request, RequestHandle, Resolvable, ResolveError, capability::Operation};
use super::Layer;
type ResolveFn<Output> = Box<dyn FnMut(&mut RequestHandle<Output>, Output) + Send>;
pub struct EffectResolver<Output: Send + 'static> {
handle: RequestHandle<Output>,
resolve_fn: ResolveFn<Output>,
active: Arc<AtomicBool>,
calling_thread: ThreadId,
}
impl<Output: Send + 'static> EffectResolver<Output> {
pub fn resolve(&mut self, output: Output) {
assert!(
!(self.active.load(Ordering::Acquire) && thread::current().id() == self.calling_thread),
"EffectMiddleware::try_process_effect must not call resolve() synchronously. \
Dispatch work asynchronously (thread, spawn_local, channel, etc.). \
See https://github.com/redbadger/crux/issues/492"
);
(self.resolve_fn)(&mut self.handle, output);
}
}
pub trait EffectMiddleware: Send + Sync {
type Op: Operation;
fn try_process_effect(
&self,
operation: Self::Op,
resolver: EffectResolver<<Self::Op as Operation>::Output>,
);
}
struct EffectMiddlewareLayerInner<Next, EM>
where
Next: Layer + Sync + Send + 'static,
Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
EM: EffectMiddleware,
{
next: Next,
middleware: EM,
}
pub struct HandleEffectLayer<Next, EM>
where
Next: Layer + Sync + Send + 'static,
Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
EM: EffectMiddleware,
{
inner: Arc<EffectMiddlewareLayerInner<Next, EM>>,
}
impl<Next, EM> Layer for HandleEffectLayer<Next, EM>
where
Next: Layer,
Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
EM: EffectMiddleware + 'static,
{
type Event = Next::Event;
type Effect = Next::Effect;
type ViewModel = Next::ViewModel;
fn update<F: Fn(Vec<Self::Effect>) + Send + Sync + 'static>(
&self,
event: Self::Event,
effect_callback: F,
) -> Vec<Self::Effect> {
self.update(event, effect_callback)
}
fn resolve<Output, F: Fn(Vec<Self::Effect>) + Send + Sync + 'static>(
&self,
request: &mut impl Resolvable<Output>,
output: Output,
effect_callback: F,
) -> Result<Vec<Self::Effect>, ResolveError> {
self.resolve(request, output, effect_callback)
}
fn view(&self) -> Self::ViewModel {
self.view()
}
fn process_tasks<F>(&self, effect_callback: F) -> Vec<Self::Effect>
where
F: Fn(Vec<Self::Effect>) + Sync + Send + 'static,
{
self.process_tasks(effect_callback)
}
}
impl<Next, EM> HandleEffectLayer<Next, EM>
where
Next: Layer,
Next::Effect: TryInto<Request<EM::Op>, Error = Next::Effect>,
EM: EffectMiddleware + 'static,
{
pub fn new(next: Next, middleware: EM) -> Self {
Self {
inner: Arc::new(EffectMiddlewareLayerInner { next, middleware }),
}
}
fn update(
&self,
event: Next::Event,
return_effects: impl Fn(Vec<Next::Effect>) + Send + Sync + 'static,
) -> Vec<Next::Effect> {
let inner = Arc::downgrade(&self.inner);
let return_effects = Arc::new(return_effects);
let return_effects_copy = return_effects.clone();
let effects = self
.inner
.next
.update(event, move |later_effects_from_next| {
Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
});
Self::process_known_effects(&Arc::downgrade(&self.inner), effects, &return_effects_copy)
}
fn resolve<Output>(
&self,
request: &mut impl Resolvable<Output>,
result: Output,
return_effects: impl Fn(Vec<Next::Effect>) + Send + Sync + 'static,
) -> Result<Vec<Next::Effect>, ResolveError> {
let inner = Arc::downgrade(&self.inner);
let return_effects = Arc::new(return_effects);
let return_effects_copy = return_effects.clone();
let effects = self
.inner
.next
.resolve(request, result, move |later_effects_from_next| {
Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
})?;
Ok(Self::process_known_effects(
&Arc::downgrade(&self.inner),
effects,
&return_effects_copy,
))
}
fn view(&self) -> Next::ViewModel {
self.inner.next.view()
}
fn process_tasks<F>(&self, return_effects: F) -> Vec<Next::Effect>
where
F: Fn(Vec<Next::Effect>) + Sync + Send + 'static,
{
let inner = Arc::downgrade(&self.inner);
let return_effects = Arc::new(return_effects);
let return_effects_copy = return_effects.clone();
let effects = self
.inner
.next
.process_tasks(move |later_effects_from_next| {
Self::process_known_effects_with(&inner, later_effects_from_next, &return_effects);
});
Self::process_known_effects(&Arc::downgrade(&self.inner), effects, &return_effects_copy)
}
fn process_known_effects(
inner: &Weak<EffectMiddlewareLayerInner<Next, EM>>,
effects: Vec<Next::Effect>,
return_effects: &Arc<impl Fn(Vec<Next::Effect>) + Send + Sync + 'static>,
) -> Vec<Next::Effect> {
effects
.into_iter()
.filter_map(|effect| {
let request: Request<EM::Op> = match effect.try_into() {
Ok(req) => req,
Err(effect) => return Some(effect),
};
let (operation, handle) = request.split();
let resolve_fn = {
let return_effects = return_effects.clone();
let inner = inner.clone();
move |req_handle: &mut RequestHandle<<EM::Op as Operation>::Output>, output| {
let Some(strong_inner) = inner.upgrade() else {
eprintln!("Inner can't be upgraded after resolving effect");
return;
};
if let Ok(immediate_effects) =
strong_inner.next.resolve(req_handle, output, {
let return_effects = return_effects.clone();
let future_inner = inner.clone();
move |eventual_effects| {
Self::process_known_effects_with(
&future_inner,
eventual_effects,
&return_effects,
);
}
})
{
Self::process_known_effects_with(
&inner,
immediate_effects,
&return_effects,
);
}
}
};
let Some(strong_inner) = inner.upgrade() else {
eprintln!("Inner can't be upgraded to process effect");
return None;
};
let active = Arc::new(AtomicBool::new(true));
let resolver = EffectResolver {
handle,
resolve_fn: Box::new(resolve_fn),
active: active.clone(),
calling_thread: thread::current().id(),
};
strong_inner
.middleware
.try_process_effect(operation, resolver);
active.store(false, Ordering::Release);
None
})
.collect()
}
fn process_known_effects_with(
inner: &Weak<EffectMiddlewareLayerInner<Next, EM>>,
effects: Vec<<Next as Layer>::Effect>,
return_effects: &Arc<impl Fn(Vec<<Next as Layer>::Effect>) + Send + Sync + 'static>,
) {
let unknown_effects = Self::process_known_effects(inner, effects, return_effects);
if !unknown_effects.is_empty() {
return_effects(unknown_effects);
}
}
}