use crate::{
computed::{ArcMemo, Memo},
diagnostics::is_suppressing_resource_load,
owner::{ArcStoredValue, ArenaItem},
send_wrapper_ext::SendOption,
signal::{ArcMappedSignal, ArcRwSignal, MappedSignal, RwSignal},
traits::{DefinedAt, Dispose, Get, GetUntracked, GetValue, Update, Write},
unwrap_signal,
};
use any_spawner::Executor;
use futures::{channel::oneshot, select, FutureExt};
use send_wrapper::SendWrapper;
use std::{
future::Future,
ops::{Deref, DerefMut},
panic::Location,
pin::Pin,
sync::Arc,
};
/// An asynchronous action whose state lives in reference-counted reactive
/// signals.
///
/// Dispatching an input runs `action_fn` on it inside a spawned task. The
/// fields below track how many runs are still in flight, the input that was
/// dispatched most recently, the latest stored output, and how many runs have
/// stored an output.
pub struct ArcAction<I, O> {
/// Number of dispatched runs that have neither completed nor been aborted.
in_flight: ArcRwSignal<usize>,
/// Input of the most recent dispatch; reset to `None` by a finishing task
/// once `in_flight` has dropped back to zero.
input: ArcRwSignal<SendOption<I>>,
/// Output stored by a completed run, or the initial value supplied to the
/// constructor. `clear` resets it to `None`.
value: ArcRwSignal<SendOption<O>>,
/// Incremented each time a completed run stores its output in `value`.
version: ArcRwSignal<usize>,
/// Counter sampled when a run is dispatched and compared again when it
/// completes, to decide whether the run may store its output.
dispatched: ArcStoredValue<usize>,
// The boxed, pinned `dyn Future` behind an `Arc<dyn Fn>` is deliberately
// verbose, hence the lint opt-out.
#[allow(clippy::complexity)]
/// Type-erased action function: borrows the input and returns the boxed
/// future that produces the output.
action_fn: Arc<
dyn Fn(&I) -> Pin<Box<dyn Future<Output = O> + Send>> + Send + Sync,
>,
/// Source location of the constructor call; only present when
/// `debug_assertions` or `leptos_debuginfo` is enabled.
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: &'static Location<'static>,
}
impl<I, O> Clone for ArcAction<I, O> {
    /// Builds a new `ArcAction` by cloning every field of this one.
    ///
    /// This impl places no `Clone` bound on `I` or `O`.
    fn clone(&self) -> Self {
        Self {
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at: self.defined_at,
            action_fn: Arc::clone(&self.action_fn),
            dispatched: self.dispatched.clone(),
            version: self.version.clone(),
            value: self.value.clone(),
            input: self.input.clone(),
            in_flight: self.in_flight.clone(),
        }
    }
}
impl<I, O> ArcAction<I, O>
where
I: 'static,
O: 'static,
{
#[track_caller]
pub fn new<F, Fu>(action_fn: F) -> Self
where
F: Fn(&I) -> Fu + Send + Sync + 'static,
Fu: Future<Output = O> + Send + 'static,
I: Send + Sync,
O: Send + Sync,
{
Self::new_with_value(None, action_fn)
}
#[track_caller]
pub fn new_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
where
F: Fn(&I) -> Fu + Send + Sync + 'static,
Fu: Future<Output = O> + Send + 'static,
I: Send + Sync,
O: Send + Sync,
{
ArcAction {
in_flight: ArcRwSignal::new(0),
input: ArcRwSignal::new(SendOption::new(None)),
value: ArcRwSignal::new(SendOption::new(value)),
version: Default::default(),
dispatched: Default::default(),
action_fn: Arc::new(move |input| Box::pin(action_fn(input))),
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: Location::caller(),
}
}
#[track_caller]
pub fn clear(&self) {
if let Some(mut guard) = self.value.try_write() {
**guard = None;
}
}
}
/// Handle returned by the dispatch methods that lets the caller abort the
/// dispatched run.
///
/// It owns the sending half of a oneshot channel; the spawned task selects on
/// the receiving half alongside the action's future.
#[derive(Debug)]
pub struct ActionAbortHandle(oneshot::Sender<()>);

impl ActionAbortHandle {
    /// Sends the abort message to the dispatched task.
    ///
    /// A failed send is ignored on purpose: it only means the receiving half
    /// is already gone, so there is nothing left to abort.
    pub fn abort(self) {
        let Self(sender) = self;
        let _ = sender.send(());
    }
}
impl<I, O> ArcAction<I, O>
where
    I: Send + Sync + 'static,
    O: Send + Sync + 'static,
{
    /// Runs the action on `input` in a task started with `crate::spawn` and
    /// returns a handle that can abort that run.
    ///
    /// When `is_suppressing_resource_load()` reports `true`, nothing is run
    /// and no state is touched; the handle is still returned.
    ///
    /// Otherwise the in-flight counter is incremented and `input` is stored
    /// before the task is spawned. The task waits for either the abort message
    /// or the action's future. In both cases it decrements the in-flight
    /// counter; only a completed future may bump `version` and store its
    /// output in `value`. Once the in-flight counter is back at zero the
    /// stored input is reset to `None`.
    #[track_caller]
    pub fn dispatch(&self, input: I) -> ActionAbortHandle {
        let (abort_tx, mut abort_rx) = oneshot::channel();
        let handle = ActionAbortHandle(abort_tx);
        if is_suppressing_resource_load() {
            return handle;
        }

        // The action function only borrows the input, so the future is built
        // before the input is moved into the signal below.
        let mut pending = (self.action_fn)(&input).fuse();

        // Publish the "running" state before the task is spawned.
        self.in_flight.update(|count| *count += 1);
        let dispatched_at = self.dispatched.get_value();
        self.input.try_update(|slot| **slot = Some(input));

        let input_signal = self.input.clone();
        let version = self.version.clone();
        let dispatched = self.dispatched.clone();
        let value = self.value.clone();
        let in_flight = self.in_flight.clone();

        crate::spawn(async move {
            select! {
                // Abort requested: count the run as finished, store nothing.
                _ = abort_rx => {
                    in_flight.update(|count| *count = count.saturating_sub(1));
                },
                // The action finished: store its output if still allowed to.
                outcome = pending => {
                    in_flight.update(|count| *count = count.saturating_sub(1));
                    // NOTE(review): nothing visible in this file ever writes
                    // to `dispatched`, so this comparison appears to always
                    // hold — confirm whether the counter was meant to be
                    // bumped on every dispatch.
                    if dispatched.get_value() <= dispatched_at {
                        version.update(|count| *count += 1);
                        value.update(|slot| **slot = Some(outcome));
                    }
                }
            }
            if in_flight.get_untracked() == 0 {
                input_signal.update(|slot| **slot = None);
            }
        });

        handle
    }
}
impl<I, O> ArcAction<I, O>
where
    I: 'static,
    O: 'static,
{
    /// Runs the action on `input` in a task started with
    /// `Executor::spawn_local` and returns a handle that can abort that run.
    ///
    /// Unlike `dispatch`, this places no `Send`/`Sync` bounds on `I` and `O`.
    ///
    /// When `is_suppressing_resource_load()` reports `true`, nothing is run
    /// and no state is touched; the handle is still returned.
    ///
    /// Otherwise the in-flight counter is incremented and `input` is stored
    /// before the task is spawned. The task waits for either the abort message
    /// or the action's future. In both cases it decrements the in-flight
    /// counter; only a completed future may bump `version` and store its
    /// output in `value`. Once the in-flight counter is back at zero the
    /// stored input is reset to `None`.
    #[track_caller]
    pub fn dispatch_local(&self, input: I) -> ActionAbortHandle {
        let (abort_tx, mut abort_rx) = oneshot::channel();
        let handle = ActionAbortHandle(abort_tx);
        if is_suppressing_resource_load() {
            return handle;
        }

        // The action function only borrows the input, so the future is built
        // before the input is moved into the signal below.
        let mut pending = (self.action_fn)(&input).fuse();

        // Publish the "running" state before the task is spawned.
        self.in_flight.update(|count| *count += 1);
        let dispatched_at = self.dispatched.get_value();
        self.input.try_update(|slot| **slot = Some(input));

        let input_signal = self.input.clone();
        let version = self.version.clone();
        let value = self.value.clone();
        let dispatched = self.dispatched.clone();
        let in_flight = self.in_flight.clone();

        Executor::spawn_local(async move {
            select! {
                // Abort requested: count the run as finished, store nothing.
                _ = abort_rx => {
                    in_flight.update(|count| *count = count.saturating_sub(1));
                },
                // The action finished: store its output if still allowed to.
                outcome = pending => {
                    in_flight.update(|count| *count = count.saturating_sub(1));
                    // NOTE(review): nothing visible in this file ever writes
                    // to `dispatched`, so this comparison appears to always
                    // hold — confirm whether the counter was meant to be
                    // bumped on every dispatch.
                    if dispatched.get_value() <= dispatched_at {
                        version.update(|count| *count += 1);
                        value.update(|slot| **slot = Some(outcome));
                    }
                }
            }
            if in_flight.get_untracked() == 0 {
                input_signal.update(|slot| **slot = None);
            }
        });

        handle
    }
}
impl<I, O> ArcAction<I, O>
where
    I: 'static,
    O: 'static,
{
    /// Creates an action from a function and futures that need not be `Send`
    /// or `Sync`.
    ///
    /// The stored value starts out as `None`; this simply forwards to
    /// [`ArcAction::new_unsync_with_value`].
    #[track_caller]
    pub fn new_unsync<F, Fu>(action_fn: F) -> Self
    where
        F: Fn(&I) -> Fu + 'static,
        Fu: Future<Output = O> + 'static,
    {
        // `new_unsync_with_value` already puts every returned future inside a
        // `SendWrapper`; wrapping it here as well would only nest a second,
        // redundant wrapper around each future.
        Self::new_unsync_with_value(None, action_fn)
    }

    /// Creates an action from a function and futures that need not be `Send`
    /// or `Sync`, with the stored value starting out as `value`.
    ///
    /// Both `action_fn` and each future it returns are placed in a
    /// `SendWrapper` so they satisfy the `Send + Sync` bounds of the stored,
    /// type-erased function. A `SendWrapper` may only be used from the thread
    /// that created it. The input and value signals are built with
    /// `SendOption::new_local`.
    #[track_caller]
    pub fn new_unsync_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
    where
        F: Fn(&I) -> Fu + 'static,
        Fu: Future<Output = O> + 'static,
    {
        let action_fn = SendWrapper::new(action_fn);
        ArcAction {
            in_flight: ArcRwSignal::new(0),
            input: ArcRwSignal::new(SendOption::new_local(None)),
            value: ArcRwSignal::new(SendOption::new_local(value)),
            version: Default::default(),
            dispatched: Default::default(),
            action_fn: Arc::new(move |input| {
                Box::pin(SendWrapper::new(action_fn(input)))
            }),
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at: Location::caller(),
        }
    }
}
impl<I, O> ArcAction<I, O>
where
I: 'static,
O: 'static,
{
#[track_caller]
pub fn version(&self) -> ArcRwSignal<usize> {
self.version.clone()
}
#[track_caller]
pub fn input(&self) -> ArcMappedSignal<Option<I>> {
ArcMappedSignal::new(
self.input.clone(),
|n| n.deref(),
|n| n.deref_mut(),
)
}
#[track_caller]
pub fn value(&self) -> ArcMappedSignal<Option<O>> {
ArcMappedSignal::new(
self.value.clone(),
|n| n.deref(),
|n| n.deref_mut(),
)
}
#[track_caller]
pub fn pending(&self) -> ArcMemo<bool> {
let in_flight = self.in_flight.clone();
ArcMemo::new(move |_| in_flight.get() > 0)
}
}
/// Reports where this action was created, when that information is compiled
/// in.
impl<I, O> DefinedAt for ArcAction<I, O>
where
I: 'static,
O: 'static,
{
/// Returns the location captured with `Location::caller()` by the
/// constructor, or `None` when neither `debug_assertions` nor
/// `leptos_debuginfo` is enabled (the field does not exist in that build).
fn defined_at(&self) -> Option<&'static Location<'static>> {
// Exactly one of the two cfg-gated blocks survives compilation and
// becomes the function's tail expression.
#[cfg(any(debug_assertions, leptos_debuginfo))]
{
Some(self.defined_at)
}
#[cfg(not(any(debug_assertions, leptos_debuginfo)))]
{
None
}
}
}
/// A `Copy` handle to an [`ArcAction`] that is stored in an `ArenaItem`.
///
/// Its methods look the inner action up through the arena item and forward to
/// it.
pub struct Action<I, O> {
/// Arena slot holding the underlying reference-counted action.
inner: ArenaItem<ArcAction<I, O>>,
/// Source location of the constructor call; only present when
/// `debug_assertions` or `leptos_debuginfo` is enabled.
#[cfg(any(debug_assertions, leptos_debuginfo))]
defined_at: &'static Location<'static>,
}
impl<I, O> Dispose for Action<I, O> {
    /// Disposes of the arena item that holds the underlying action.
    fn dispose(self) {
        let Self { inner, .. } = self;
        inner.dispose()
    }
}
impl<I, O> Action<I, O>
where
    I: Send + Sync + 'static,
    O: Send + Sync + 'static,
{
    /// Creates an action that runs `action_fn` on every dispatched input.
    ///
    /// Builds an [`ArcAction::new`] and stores it with `ArenaItem::new`.
    #[track_caller]
    pub fn new<F, Fu>(action_fn: F) -> Self
    where
        F: Fn(&I) -> Fu + Send + Sync + 'static,
        Fu: Future<Output = O> + Send + 'static,
    {
        let inner = ArenaItem::new(ArcAction::new(action_fn));
        Self {
            inner,
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at: Location::caller(),
        }
    }

    /// Creates an action that runs `action_fn` on every dispatched input and
    /// whose stored value starts out as `value`.
    ///
    /// Builds an [`ArcAction::new_with_value`] and stores it with
    /// `ArenaItem::new`.
    #[track_caller]
    pub fn new_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
    where
        F: Fn(&I) -> Fu + Send + Sync + 'static,
        Fu: Future<Output = O> + Send + 'static,
    {
        let inner = ArenaItem::new(ArcAction::new_with_value(value, action_fn));
        Self {
            inner,
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at: Location::caller(),
        }
    }
}
impl<I, O> Action<I, O>
where
    I: 'static,
    O: 'static,
{
    /// Resets the stored value of the underlying action to `None`.
    ///
    /// Silently does nothing when the arena item cannot be read
    /// (`try_with_value` yields `None`).
    #[track_caller]
    pub fn clear(&self) {
        let _ = self.inner.try_with_value(|action| action.clear());
    }
}
impl<I, O> Action<I, O>
where
    I: 'static,
    O: 'static,
{
    /// Creates an action from a function and futures that need not be `Send`
    /// or `Sync`.
    ///
    /// Builds an [`ArcAction::new_unsync`] and stores it with
    /// `ArenaItem::new`.
    #[track_caller]
    pub fn new_local<F, Fu>(action_fn: F) -> Self
    where
        F: Fn(&I) -> Fu + 'static,
        Fu: Future<Output = O> + 'static,
    {
        Self {
            inner: ArenaItem::new(ArcAction::new_unsync(action_fn)),
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at: Location::caller(),
        }
    }

    /// Creates an action from a function and futures that need not be `Send`
    /// or `Sync`, with the stored value starting out as `value`.
    ///
    /// Builds an [`ArcAction::new_unsync_with_value`] and stores it with
    /// `ArenaItem::new`.
    #[track_caller]
    pub fn new_local_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
    where
        F: Fn(&I) -> Fu + 'static,
        // No `Send` bound: the underlying constructor does not need one, and
        // requiring it would rule out exactly the futures a "local" action
        // exists for. This matches `new_local`.
        Fu: Future<Output = O> + 'static,
    {
        Self {
            inner: ArenaItem::new(ArcAction::new_unsync_with_value(
                value, action_fn,
            )),
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at: Location::caller(),
        }
    }
}
impl<I, O> Action<I, O>
where
    I: 'static,
    O: 'static,
{
    /// Returns the underlying action's version signal, converted into an
    /// `RwSignal`.
    ///
    /// If the arena item cannot be read, the `unwrap_signal!` fallback runs
    /// (presumably a panic with diagnostics — confirm against the macro).
    #[track_caller]
    pub fn version(&self) -> RwSignal<usize> {
        self.inner
            .try_with_value(|action| action.version())
            .unwrap_or_else(unwrap_signal!(self))
            .into()
    }

    /// Returns the underlying action's pending memo, converted into a `Memo`.
    ///
    /// If the arena item cannot be read, the `unwrap_signal!` fallback runs
    /// (presumably a panic with diagnostics — confirm against the macro).
    #[track_caller]
    pub fn pending(&self) -> Memo<bool> {
        self.inner
            .try_with_value(|action| action.pending())
            .unwrap_or_else(unwrap_signal!(self))
            .into()
    }
}
impl<I, O> Action<I, O>
where
    I: 'static,
    O: 'static,
{
    /// Returns the underlying action's input signal, converted into a
    /// `MappedSignal`.
    ///
    /// If the arena item cannot be read, the `unwrap_signal!` fallback runs
    /// (presumably a panic with diagnostics — confirm against the macro).
    #[track_caller]
    pub fn input(&self) -> MappedSignal<Option<I>> {
        let mapped = self
            .inner
            .try_with_value(|action| action.input())
            .unwrap_or_else(unwrap_signal!(self));
        mapped.into()
    }

    /// Deprecated alias that returns exactly what [`Action::input`] returns.
    #[track_caller]
    #[deprecated = "You can now use .input() for any value, whether it's \
                    thread-safe or not."]
    pub fn input_local(&self) -> MappedSignal<Option<I>> {
        self.input()
    }
}
impl<I, O> Action<I, O>
where
    I: 'static,
    O: 'static,
{
    /// Returns the underlying action's value signal, converted into a
    /// `MappedSignal`.
    ///
    /// If the arena item cannot be read, the `unwrap_signal!` fallback runs
    /// (presumably a panic with diagnostics — confirm against the macro).
    #[track_caller]
    pub fn value(&self) -> MappedSignal<Option<O>> {
        self.inner
            .try_with_value(|inner| inner.value())
            .unwrap_or_else(unwrap_signal!(self))
            .into()
    }

    /// Deprecated alias that returns exactly what [`Action::value`] returns.
    ///
    /// It carries no `Send`/`Sync` bound on `O`: the body never needed one,
    /// and requiring it made this "local" accessor unusable for the very
    /// values it was named for.
    #[deprecated = "You can now use .value() for any value, whether it's \
                    thread-safe or not."]
    #[track_caller]
    pub fn value_local(&self) -> MappedSignal<Option<O>> {
        self.value()
    }
}
impl<I, O> Action<I, O>
where
    I: Send + Sync + 'static,
    O: Send + Sync + 'static,
{
    /// Dispatches `input` on the underlying action and returns its abort
    /// handle.
    ///
    /// If the arena item cannot be read, the `unwrap_signal!` fallback runs
    /// (presumably a panic with diagnostics — confirm against the macro).
    #[track_caller]
    pub fn dispatch(&self, input: I) -> ActionAbortHandle {
        let stored = self.inner.try_get_value();
        stored
            .map(|action| action.dispatch(input))
            .unwrap_or_else(unwrap_signal!(self))
    }
}
impl<I, O> Action<I, O>
where
    I: 'static,
    O: 'static,
{
    /// Dispatches `input` on the underlying action through its
    /// `dispatch_local` method and returns the abort handle.
    ///
    /// If the arena item cannot be read, the `unwrap_signal!` fallback runs
    /// (presumably a panic with diagnostics — confirm against the macro).
    #[track_caller]
    pub fn dispatch_local(&self, input: I) -> ActionAbortHandle {
        let stored = self.inner.try_get_value();
        stored
            .map(|action| action.dispatch_local(input))
            .unwrap_or_else(unwrap_signal!(self))
    }
}
impl<I, O> Action<I, O>
where
    I: Send + Sync + 'static,
    O: Send + Sync + 'static,
{
    /// Creates an action whose function and futures need not be `Send` or
    /// `Sync`, while `I` and `O` themselves must be.
    ///
    /// Builds an [`ArcAction::new_unsync`] and stores it with
    /// `ArenaItem::new_with_storage`.
    #[track_caller]
    pub fn new_unsync<F, Fu>(action_fn: F) -> Self
    where
        F: Fn(&I) -> Fu + 'static,
        Fu: Future<Output = O> + 'static,
    {
        let inner = ArenaItem::new_with_storage(ArcAction::new_unsync(action_fn));
        Self {
            inner,
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at: Location::caller(),
        }
    }

    /// Same as [`Action::new_unsync`], with the stored value starting out as
    /// `value`.
    ///
    /// Builds an [`ArcAction::new_unsync_with_value`] and stores it with
    /// `ArenaItem::new_with_storage`.
    #[track_caller]
    pub fn new_unsync_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
    where
        F: Fn(&I) -> Fu + 'static,
        Fu: Future<Output = O> + 'static,
    {
        let arc_action = ArcAction::new_unsync_with_value(value, action_fn);
        let inner = ArenaItem::new_with_storage(arc_action);
        Self {
            inner,
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at: Location::caller(),
        }
    }
}
impl<I, O> Action<I, O>
where
    I: 'static,
    O: 'static,
{
    /// Creates an action where neither the function, its futures, nor `I` and
    /// `O` need to be `Send` or `Sync`.
    ///
    /// Builds an [`ArcAction::new_unsync`] and stores it with
    /// `ArenaItem::new_with_storage`.
    #[track_caller]
    pub fn new_unsync_local<F, Fu>(action_fn: F) -> Self
    where
        F: Fn(&I) -> Fu + 'static,
        Fu: Future<Output = O> + 'static,
    {
        let inner = ArenaItem::new_with_storage(ArcAction::new_unsync(action_fn));
        Self {
            inner,
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at: Location::caller(),
        }
    }

    /// Same as [`Action::new_unsync_local`], with the stored value starting
    /// out as `value`.
    ///
    /// Builds an [`ArcAction::new_unsync_with_value`] and stores it with
    /// `ArenaItem::new_with_storage`.
    #[track_caller]
    pub fn new_unsync_local_with_value<F, Fu>(
        value: Option<O>,
        action_fn: F,
    ) -> Self
    where
        F: Fn(&I) -> Fu + 'static,
        Fu: Future<Output = O> + 'static,
    {
        let arc_action = ArcAction::new_unsync_with_value(value, action_fn);
        let inner = ArenaItem::new_with_storage(arc_action);
        Self {
            inner,
            #[cfg(any(debug_assertions, leptos_debuginfo))]
            defined_at: Location::caller(),
        }
    }
}
/// Reports where this action handle was created, when that information is
/// compiled in.
impl<I, O> DefinedAt for Action<I, O> {
/// Returns the location captured with `Location::caller()` by the
/// constructor, or `None` when neither `debug_assertions` nor
/// `leptos_debuginfo` is enabled (the field does not exist in that build).
fn defined_at(&self) -> Option<&'static Location<'static>> {
// Exactly one of the two cfg-gated blocks survives compilation and
// becomes the function's tail expression.
#[cfg(any(debug_assertions, leptos_debuginfo))]
{
Some(self.defined_at)
}
#[cfg(not(any(debug_assertions, leptos_debuginfo)))]
{
None
}
}
}
// `Clone` and `Copy` are implemented by hand; these impls place no `Clone` or
// `Copy` bound on `I` or `O`.
impl<I, O> Clone for Action<I, O> {
/// Returns a bitwise copy of this handle (the type is `Copy`).
fn clone(&self) -> Self {
*self
}
}
impl<I, O> Copy for Action<I, O> {}
/// Deprecated free-function form of [`Action::new`]; forwards `action_fn` to
/// it unchanged.
#[inline(always)]
#[track_caller]
#[deprecated = "This function is being removed to conform to Rust idioms. \
                Please use `Action::new()` instead."]
pub fn create_action<I, O, F, Fu>(action_fn: F) -> Action<I, O>
where
    I: Send + Sync + 'static,
    O: Send + Sync + 'static,
    F: Fn(&I) -> Fu + Send + Sync + 'static,
    Fu: Future<Output = O> + Send + 'static,
{
    Action::<I, O>::new(action_fn)
}