reactive_graph/actions/
action.rs

1use crate::{
2    computed::{ArcMemo, Memo, ScopedFuture},
3    diagnostics::is_suppressing_resource_load,
4    graph::untrack,
5    owner::{ArcStoredValue, ArenaItem, Owner},
6    send_wrapper_ext::SendOption,
7    signal::{ArcMappedSignal, ArcRwSignal, MappedSignal, RwSignal},
8    traits::{DefinedAt, Dispose, Get, GetUntracked, GetValue, Update, Write},
9    unwrap_signal,
10};
11use any_spawner::Executor;
12use futures::{channel::oneshot, select, FutureExt};
13use send_wrapper::SendWrapper;
14use std::{
15    future::Future,
16    ops::{Deref, DerefMut},
17    panic::Location,
18    pin::Pin,
19    sync::Arc,
20};
21
22/// An action runs some asynchronous code when you dispatch a new value to it, and gives you
23/// reactive access to the result.
24///
25/// Actions are intended for mutating or updating data, not for loading data. If you find yourself
26/// creating an action and immediately dispatching a value to it, this is probably the wrong
27/// primitive.
28///
29/// The arena-allocated, `Copy` version of an `ArcAction` is an [`Action`].
30///
31/// ```rust
32/// # use reactive_graph::actions::*;
33/// # use reactive_graph::prelude::*;
34/// # tokio_test::block_on(async move {
35/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
36/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
37/// async fn send_new_todo_to_api(task: String) -> usize {
38///     // do something...
39///     // return a task id
40///     42
41/// }
42/// let save_data = ArcAction::new(|task: &String| {
43///   // `task` is given as `&String` because its value is available in `input`
44///   send_new_todo_to_api(task.clone())
45/// });
46///
47/// // the argument currently running
48/// let input = save_data.input();
49/// // the most recent returned result
50/// let result_of_call = save_data.value();
51/// // whether the call is pending
52/// let pending = save_data.pending();
53/// // how many times the action has run
54/// // useful for reactively updating something else in response to a `dispatch` and response
55/// let version = save_data.version();
56///
57/// // before we do anything
58/// assert_eq!(input.get(), None); // no argument yet
59/// assert_eq!(pending.get(), false); // isn't pending a response
60/// assert_eq!(result_of_call.get(), None); // there's no "last value"
61/// assert_eq!(version.get(), 0);
62///
63/// // dispatch the action
64/// save_data.dispatch("My todo".to_string());
65///
66/// // when we're making the call
67/// assert_eq!(input.get(), Some("My todo".to_string()));
68/// assert_eq!(pending.get(), true); // is pending
69/// assert_eq!(result_of_call.get(), None); // has not yet gotten a response
70///
71/// # any_spawner::Executor::tick().await;
72///
73/// // after call has resolved
74/// assert_eq!(input.get(), None); // input clears out after resolved
75/// assert_eq!(pending.get(), false); // no longer pending
76/// assert_eq!(result_of_call.get(), Some(42));
77/// assert_eq!(version.get(), 1);
78/// # });
79/// ```
80///
81/// The input to the `async` function should always be a single value,
82/// but it can be of any type. The argument is always passed by reference to the
83/// function, because it is stored in [Action::input] as well.
84///
85/// ```rust
86/// # use reactive_graph::actions::*;
87/// // if there's a single argument, just use that
88/// let action1 = ArcAction::new(|input: &String| {
89///     let input = input.clone();
90///     async move { todo!() }
91/// });
92///
93/// // if there are no arguments, use the unit type `()`
94/// let action2 = ArcAction::new(|input: &()| async { todo!() });
95///
96/// // if there are multiple arguments, use a tuple
97/// let action3 = ArcAction::new(|input: &(usize, String)| async { todo!() });
98/// ```
99pub struct ArcAction<I, O> {
100    in_flight: ArcRwSignal<usize>,
101    input: ArcRwSignal<SendOption<I>>,
102    value: ArcRwSignal<SendOption<O>>,
103    version: ArcRwSignal<usize>,
104    dispatched: ArcStoredValue<usize>,
105    #[allow(clippy::complexity)]
106    action_fn: Arc<
107        dyn Fn(&I) -> Pin<Box<dyn Future<Output = O> + Send>> + Send + Sync,
108    >,
109    #[cfg(any(debug_assertions, leptos_debuginfo))]
110    defined_at: &'static Location<'static>,
111}
112
113impl<I, O> Clone for ArcAction<I, O> {
114    fn clone(&self) -> Self {
115        Self {
116            in_flight: self.in_flight.clone(),
117            input: self.input.clone(),
118            value: self.value.clone(),
119            version: self.version.clone(),
120            dispatched: self.dispatched.clone(),
121            action_fn: self.action_fn.clone(),
122            #[cfg(any(debug_assertions, leptos_debuginfo))]
123            defined_at: self.defined_at,
124        }
125    }
126}
127
128impl<I, O> ArcAction<I, O>
129where
130    I: 'static,
131    O: 'static,
132{
133    /// Creates a new action. This is lazy: it does not run the action function until some value
134    /// is dispatched.
135    ///
136    /// The constructor takes a function which will create a new `Future` from some input data.
137    /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
138    /// be spawned.
139    ///
140    /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
141    /// `Future` must be `Send` so that it can be moved across threads by the async executor as
142    /// needed.
143    ///
144    /// ```rust
145    /// # use reactive_graph::actions::*;
146    /// # use reactive_graph::prelude::*;
147    /// # tokio_test::block_on(async move {
148    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
149    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
150    /// let act = ArcAction::new(|n: &u8| {
151    ///     let n = n.to_owned();
152    ///     async move { n * 2 }
153    /// });
154    ///
155    /// act.dispatch(3);
156    /// assert_eq!(act.input().get(), Some(3));
157    ///
158    /// // Remember that async functions already return a future if they are
159    /// // not `await`ed. You can save keystrokes by leaving out the `async move`
160    ///
161    /// let act2 = Action::new(|n: &String| yell(n.to_owned()));
162    /// act2.dispatch(String::from("i'm in a doctest"));
163    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
164    ///
165    /// // after it resolves
166    /// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
167    ///
168    /// async fn yell(n: String) -> String {
169    ///     n.to_uppercase()
170    /// }
171    /// # });
172    /// ```
173    #[track_caller]
174    pub fn new<F, Fu>(action_fn: F) -> Self
175    where
176        F: Fn(&I) -> Fu + Send + Sync + 'static,
177        Fu: Future<Output = O> + Send + 'static,
178        I: Send + Sync,
179        O: Send + Sync,
180    {
181        Self::new_with_value(None, action_fn)
182    }
183
184    /// Creates a new action, initializing it with the given value.
185    ///
186    /// This is lazy: it does not run the action function until some value is dispatched.
187    ///
188    /// The constructor takes a function which will create a new `Future` from some input data.
189    /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
190    /// be spawned.
191    ///
192    /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
193    /// `Future` must be `Send` so that it can be moved across threads by the async executor as
194    /// needed.
195    #[track_caller]
196    pub fn new_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
197    where
198        F: Fn(&I) -> Fu + Send + Sync + 'static,
199        Fu: Future<Output = O> + Send + 'static,
200        I: Send + Sync,
201        O: Send + Sync,
202    {
203        let owner = Owner::current().unwrap_or_default();
204        ArcAction {
205            in_flight: ArcRwSignal::new(0),
206            input: ArcRwSignal::new(SendOption::new(None)),
207            value: ArcRwSignal::new(SendOption::new(value)),
208            version: Default::default(),
209            dispatched: Default::default(),
210            action_fn: Arc::new(move |input| {
211                Box::pin(owner.with(|| {
212                    ScopedFuture::new_untracked(untrack(|| action_fn(input)))
213                }))
214            }),
215            #[cfg(any(debug_assertions, leptos_debuginfo))]
216            defined_at: Location::caller(),
217        }
218    }
219
220    /// Clears the value of the action, setting its current value to `None`.
221    ///
222    /// This has no other effect: i.e., it will not cancel in-flight actions, set the
223    /// input, etc.
224    #[track_caller]
225    pub fn clear(&self) {
226        if let Some(mut guard) = self.value.try_write() {
227            **guard = None;
228        }
229    }
230}
231
232/// A handle that allows aborting an in-flight action. It is returned from [`Action::dispatch`] or
233/// [`ArcAction::dispatch`].
234#[derive(Debug)]
235pub struct ActionAbortHandle(oneshot::Sender<()>);
236
237impl ActionAbortHandle {
238    /// Aborts the action.
239    ///
240    /// This will cause the dispatched task to complete, without updating the action's value. The
241    /// dispatched action's `Future` will no longer be polled. This does not guarantee that side
242    /// effects created by that `Future` no longer run: for example, if the action dispatches an
243    /// HTTP request, whether that request is actually canceled or not depends on whether the
244    /// request library actually cancels a request when its `Future` is dropped.
245    pub fn abort(self) {
246        let _ = self.0.send(());
247    }
248}
249
250impl<I, O> ArcAction<I, O>
251where
252    I: Send + Sync + 'static,
253    O: Send + Sync + 'static,
254{
255    /// Calls the `async` function with a reference to the input type as its argument.
256    #[track_caller]
257    pub fn dispatch(&self, input: I) -> ActionAbortHandle {
258        let (abort_tx, mut abort_rx) = oneshot::channel();
259        if !is_suppressing_resource_load() {
260            let mut fut = (self.action_fn)(&input).fuse();
261
262            // Update the state before loading
263            self.in_flight.update(|n| *n += 1);
264            let current_version = self.dispatched.get_value();
265            self.input.try_update(|inp| **inp = Some(input));
266
267            // Spawn the task
268            crate::spawn({
269                let input = self.input.clone();
270                let version = self.version.clone();
271                let dispatched = self.dispatched.clone();
272                let value = self.value.clone();
273                let in_flight = self.in_flight.clone();
274                async move {
275                    select! {
276                        // if the abort message has been sent, bail and do nothing
277                        _ = abort_rx => {
278                            in_flight.update(|n| *n = n.saturating_sub(1));
279                        },
280                        // otherwise, update the value
281                        result = fut => {
282                            in_flight.update(|n| *n = n.saturating_sub(1));
283                            let is_latest = dispatched.get_value() <= current_version;
284                            if is_latest {
285                                version.update(|n| *n += 1);
286                                value.update(|n| **n = Some(result));
287                            }
288                        }
289                    }
290                    if in_flight.get_untracked() == 0 {
291                        input.update(|inp| **inp = None);
292                    }
293                }
294            });
295        }
296
297        ActionAbortHandle(abort_tx)
298    }
299}
300
301impl<I, O> ArcAction<I, O>
302where
303    I: 'static,
304    O: 'static,
305{
306    /// Calls the `async` function with a reference to the input type as its argument,
307    /// ensuring that it is spawned on the current thread.
308    #[track_caller]
309    pub fn dispatch_local(&self, input: I) -> ActionAbortHandle {
310        let (abort_tx, mut abort_rx) = oneshot::channel();
311        if !is_suppressing_resource_load() {
312            let mut fut = (self.action_fn)(&input).fuse();
313
314            // Update the state before loading
315            self.in_flight.update(|n| *n += 1);
316            let current_version = self.dispatched.get_value();
317            self.input.try_update(|inp| **inp = Some(input));
318
319            // Spawn the task
320            Executor::spawn_local({
321                let input = self.input.clone();
322                let version = self.version.clone();
323                let value = self.value.clone();
324                let dispatched = self.dispatched.clone();
325                let in_flight = self.in_flight.clone();
326                async move {
327                    select! {
328                        // if the abort message has been sent, bail and do nothing
329                        _ = abort_rx => {
330                            in_flight.update(|n| *n = n.saturating_sub(1));
331                        },
332                        // otherwise, update the value
333                        result = fut => {
334                            in_flight.update(|n| *n = n.saturating_sub(1));
335                            let is_latest = dispatched.get_value() <= current_version;
336                            if is_latest {
337                                version.update(|n| *n += 1);
338                                value.update(|n| **n = Some(result));
339                            }
340                        }
341                    }
342                    if in_flight.get_untracked() == 0 {
343                        input.update(|inp| **inp = None);
344                    }
345                }
346            });
347        }
348        ActionAbortHandle(abort_tx)
349    }
350}
351
352impl<I, O> ArcAction<I, O>
353where
354    I: 'static,
355    O: 'static,
356{
357    /// Creates a new action, which will only be run on the thread in which it is created.
358    ///
359    /// In all other ways, this is identical to [`ArcAction::new`].
360    #[track_caller]
361    pub fn new_unsync<F, Fu>(action_fn: F) -> Self
362    where
363        F: Fn(&I) -> Fu + 'static,
364        Fu: Future<Output = O> + 'static,
365    {
366        let action_fn = move |inp: &I| SendWrapper::new(action_fn(inp));
367        Self::new_unsync_with_value(None, action_fn)
368    }
369
370    /// Creates a new action that will only run on the current thread, initializing it with the given value.
371    ///
372    /// In all other ways, this is identical to [`ArcAction::new_with_value`].
373    #[track_caller]
374    pub fn new_unsync_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
375    where
376        F: Fn(&I) -> Fu + 'static,
377        Fu: Future<Output = O> + 'static,
378    {
379        let owner = Owner::current().unwrap_or_default();
380        let action_fn = SendWrapper::new(action_fn);
381        ArcAction {
382            in_flight: ArcRwSignal::new(0),
383            input: ArcRwSignal::new(SendOption::new_local(None)),
384            value: ArcRwSignal::new(SendOption::new_local(value)),
385            version: Default::default(),
386            dispatched: Default::default(),
387            action_fn: Arc::new(move |input| {
388                Box::pin(SendWrapper::new(owner.with(|| {
389                    ScopedFuture::new_untracked(untrack(|| action_fn(input)))
390                })))
391            }),
392            #[cfg(any(debug_assertions, leptos_debuginfo))]
393            defined_at: Location::caller(),
394        }
395    }
396}
397
398impl<I, O> ArcAction<I, O>
399where
400    I: 'static,
401    O: 'static,
402{
403    /// The number of times the action has successfully completed.
404    ///
405    /// ```rust
406    /// # use reactive_graph::actions::*;
407    /// # use reactive_graph::prelude::*;
408    /// # tokio_test::block_on(async move {
409    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
410    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
411    /// let act = ArcAction::new(|n: &u8| {
412    ///     let n = n.to_owned();
413    ///     async move { n * 2 }
414    /// });
415    ///
416    /// let version = act.version();
417    /// act.dispatch(3);
418    /// assert_eq!(version.get(), 0);
419    ///
420    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
421    /// // after it resolves
422    /// assert_eq!(version.get(), 1);
423    /// # });
424    /// ```
425    #[track_caller]
426    pub fn version(&self) -> ArcRwSignal<usize> {
427        self.version.clone()
428    }
429
430    /// The current argument that was dispatched to the async function. This value will
431    /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
432    ///
433    /// ```rust
434    /// # use reactive_graph::actions::*;
435    /// # use reactive_graph::prelude::*;
436    /// # tokio_test::block_on(async move {
437    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
438    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
439    /// let act = ArcAction::new(|n: &u8| {
440    ///     let n = n.to_owned();
441    ///     async move { n * 2 }
442    /// });
443    ///
444    /// let input = act.input();
445    /// assert_eq!(input.get(), None);
446    /// act.dispatch(3);
447    /// assert_eq!(input.get(), Some(3));
448    ///
449    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
450    /// // after it resolves
451    /// assert_eq!(input.get(), None);
452    /// # });
453    /// ```
454    #[track_caller]
455    pub fn input(&self) -> ArcMappedSignal<Option<I>> {
456        ArcMappedSignal::new(
457            self.input.clone(),
458            |n| n.deref(),
459            |n| n.deref_mut(),
460        )
461    }
462
463    /// The most recent return value of the `async` function. This will be `None` before
464    /// the action has ever run successfully, and subsequently will always be `Some(_)`,
465    /// holding the old value until a new value has been received.
466    ///
467    /// ```rust
468    /// # use reactive_graph::actions::*;
469    /// # use reactive_graph::prelude::*;
470    /// # tokio_test::block_on(async move {
471    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
472    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
473    /// let act = ArcAction::new(|n: &u8| {
474    ///     let n = n.to_owned();
475    ///     async move { n * 2 }
476    /// });
477    ///
478    /// let value = act.value();
479    /// assert_eq!(value.get(), None);
480    /// act.dispatch(3);
481    /// assert_eq!(value.get(), None);
482    ///
483    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
484    /// // after it resolves
485    /// assert_eq!(value.get(), Some(6));
486    /// // dispatch another value, and it still holds the old value
487    /// act.dispatch(3);
488    /// assert_eq!(value.get(), Some(6));
489    /// # });
490    /// ```
491    #[track_caller]
492    pub fn value(&self) -> ArcMappedSignal<Option<O>> {
493        ArcMappedSignal::new(
494            self.value.clone(),
495            |n| n.deref(),
496            |n| n.deref_mut(),
497        )
498    }
499
500    /// Whether the action has been dispatched and is currently waiting to resolve.
501    ///
502    /// ```rust
503    /// # use reactive_graph::actions::*;
504    /// # use reactive_graph::prelude::*;
505    /// # tokio_test::block_on(async move {
506    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
507    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
508    /// let act = ArcAction::new(|n: &u8| {
509    ///     let n = n.to_owned();
510    ///     async move { n * 2 }
511    /// });
512    ///
513    /// let pending = act.pending();
514    /// assert_eq!(pending.get(), false);
515    /// act.dispatch(3);
516    /// assert_eq!(pending.get(), true);
517    ///
518    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
519    /// // after it resolves
520    /// assert_eq!(pending.get(), false);
521    /// # });
522    /// ```
523    #[track_caller]
524    pub fn pending(&self) -> ArcMemo<bool> {
525        let in_flight = self.in_flight.clone();
526        ArcMemo::new(move |_| in_flight.get() > 0)
527    }
528}
529
530impl<I, O> DefinedAt for ArcAction<I, O>
531where
532    I: 'static,
533    O: 'static,
534{
535    fn defined_at(&self) -> Option<&'static Location<'static>> {
536        #[cfg(any(debug_assertions, leptos_debuginfo))]
537        {
538            Some(self.defined_at)
539        }
540        #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
541        {
542            None
543        }
544    }
545}
546
547/// An action runs some asynchronous code when you dispatch a new value to it, and gives you
548/// reactive access to the result.
549///
550/// Actions are intended for mutating or updating data, not for loading data. If you find yourself
551/// creating an action and immediately dispatching a value to it, this is probably the wrong
552/// primitive.
553///
554/// The reference-counted, `Clone` (but not `Copy` version of an `Action` is an [`ArcAction`].
555///
556/// ```rust
557/// # use reactive_graph::actions::*;
558/// # use reactive_graph::prelude::*;
559/// # tokio_test::block_on(async move {
560/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
561/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
562/// async fn send_new_todo_to_api(task: String) -> usize {
563///     // do something...
564///     // return a task id
565///     42
566/// }
567/// let save_data = Action::new(|task: &String| {
568///   // `task` is given as `&String` because its value is available in `input`
569///   send_new_todo_to_api(task.clone())
570/// });
571///
572/// // the argument currently running
573/// let input = save_data.input();
574/// // the most recent returned result
575/// let result_of_call = save_data.value();
576/// // whether the call is pending
577/// let pending = save_data.pending();
578/// // how many times the action has run
579/// // useful for reactively updating something else in response to a `dispatch` and response
580/// let version = save_data.version();
581///
582/// // before we do anything
583/// assert_eq!(input.get(), None); // no argument yet
584/// assert_eq!(pending.get(), false); // isn't pending a response
585/// assert_eq!(result_of_call.get(), None); // there's no "last value"
586/// assert_eq!(version.get(), 0);
587///
588/// // dispatch the action
589/// save_data.dispatch("My todo".to_string());
590///
591/// // when we're making the call
592/// assert_eq!(input.get(), Some("My todo".to_string()));
593/// assert_eq!(pending.get(), true); // is pending
594/// assert_eq!(result_of_call.get(), None); // has not yet gotten a response
595///
596/// # any_spawner::Executor::tick().await;
597///
598/// // after call has resolved
599/// assert_eq!(input.get(), None); // input clears out after resolved
600/// assert_eq!(pending.get(), false); // no longer pending
601/// assert_eq!(result_of_call.get(), Some(42));
602/// assert_eq!(version.get(), 1);
603/// # });
604/// ```
605///
606/// The input to the `async` function should always be a single value,
607/// but it can be of any type. The argument is always passed by reference to the
608/// function, because it is stored in [Action::input] as well.
609///
610/// ```rust
611/// # use reactive_graph::actions::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
612/// // if there's a single argument, just use that
613/// let action1 = Action::new(|input: &String| {
614///     let input = input.clone();
615///     async move { todo!() }
616/// });
617///
618/// // if there are no arguments, use the unit type `()`
619/// let action2 = Action::new(|input: &()| async { todo!() });
620///
621/// // if there are multiple arguments, use a tuple
622/// let action3 = Action::new(|input: &(usize, String)| async { todo!() });
623/// ```
624pub struct Action<I, O> {
625    inner: ArenaItem<ArcAction<I, O>>,
626    #[cfg(any(debug_assertions, leptos_debuginfo))]
627    defined_at: &'static Location<'static>,
628}
629
630impl<I, O> Dispose for Action<I, O> {
631    fn dispose(self) {
632        self.inner.dispose()
633    }
634}
635
636impl<I, O> Action<I, O>
637where
638    I: Send + Sync + 'static,
639    O: Send + Sync + 'static,
640{
641    /// Creates a new action. This is lazy: it does not run the action function until some value
642    /// is dispatched.
643    ///
644    /// The constructor takes a function which will create a new `Future` from some input data.
645    /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
646    /// be spawned.
647    ///
648    /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
649    /// `Future` must be `Send` so that it can be moved across threads by the async executor as
650    /// needed. In order to be stored in the `Copy` arena, the input and output types should also
651    /// be `Send + Sync`.
652    ///
653    /// ```rust
654    /// # use reactive_graph::actions::*;
655    /// # use reactive_graph::prelude::*;
656    /// # tokio_test::block_on(async move {
657    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
658    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
659    /// let act = Action::new(|n: &u8| {
660    ///     let n = n.to_owned();
661    ///     async move { n * 2 }
662    /// });
663    ///
664    /// act.dispatch(3);
665    /// assert_eq!(act.input().get(), Some(3));
666    ///
667    /// // Remember that async functions already return a future if they are
668    /// // not `await`ed. You can save keystrokes by leaving out the `async move`
669    ///
670    /// let act2 = Action::new(|n: &String| yell(n.to_owned()));
671    /// act2.dispatch(String::from("i'm in a doctest"));
672    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
673    ///
674    /// // after it resolves
675    /// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
676    ///
677    /// async fn yell(n: String) -> String {
678    ///     n.to_uppercase()
679    /// }
680    /// # });
681    /// ```
682    #[track_caller]
683    pub fn new<F, Fu>(action_fn: F) -> Self
684    where
685        F: Fn(&I) -> Fu + Send + Sync + 'static,
686        Fu: Future<Output = O> + Send + 'static,
687    {
688        Self {
689            inner: ArenaItem::new(ArcAction::new(action_fn)),
690            #[cfg(any(debug_assertions, leptos_debuginfo))]
691            defined_at: Location::caller(),
692        }
693    }
694
695    /// Creates a new action, initializing it with the given value.
696    ///
697    /// This is lazy: it does not run the action function until some value is dispatched.
698    ///
699    /// The constructor takes a function which will create a new `Future` from some input data.
700    /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
701    /// be spawned.
702    ///
703    /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
704    /// `Future` must be `Send` so that it can be moved across threads by the async executor as
705    /// needed. In order to be stored in the `Copy` arena, the input and output types should also
706    /// be `Send + Sync`.
707    #[track_caller]
708    pub fn new_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
709    where
710        F: Fn(&I) -> Fu + Send + Sync + 'static,
711        Fu: Future<Output = O> + Send + 'static,
712    {
713        Self {
714            inner: ArenaItem::new(ArcAction::new_with_value(value, action_fn)),
715            #[cfg(any(debug_assertions, leptos_debuginfo))]
716            defined_at: Location::caller(),
717        }
718    }
719}
720
721impl<I, O> Action<I, O>
722where
723    I: 'static,
724    O: 'static,
725{
726    /// Clears the value of the action, setting its current value to `None`.
727    ///
728    /// This has no other effect: i.e., it will not cancel in-flight actions, set the
729    /// input, etc.
730    #[track_caller]
731    pub fn clear(&self) {
732        self.inner.try_with_value(|inner| inner.clear());
733    }
734}
735
736impl<I, O> Action<I, O>
737where
738    I: 'static,
739    O: 'static,
740{
741    /// Creates a new action, which does not require its inputs or outputs to be `Send`. In all other
742    /// ways, this is the same as [`Action::new`]. If this action is accessed from outside the
743    /// thread on which it was created, it panics.
744    #[track_caller]
745    pub fn new_local<F, Fu>(action_fn: F) -> Self
746    where
747        F: Fn(&I) -> Fu + 'static,
748        Fu: Future<Output = O> + 'static,
749    {
750        Self {
751            inner: ArenaItem::new(ArcAction::new_unsync(action_fn)),
752            #[cfg(any(debug_assertions, leptos_debuginfo))]
753            defined_at: Location::caller(),
754        }
755    }
756
757    /// Creates a new action with the initial value, which does not require its inputs or outputs to be `Send`. In all other
758    /// ways, this is the same as [`Action::new_with_value`]. If this action is accessed from outside the
759    /// thread on which it was created, it panics.
760    #[track_caller]
761    pub fn new_local_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
762    where
763        F: Fn(&I) -> Fu + 'static,
764        Fu: Future<Output = O> + Send + 'static,
765    {
766        Self {
767            inner: ArenaItem::new(ArcAction::new_unsync_with_value(
768                value, action_fn,
769            )),
770            #[cfg(any(debug_assertions, leptos_debuginfo))]
771            defined_at: Location::caller(),
772        }
773    }
774}
775
776impl<I, O> Action<I, O>
777where
778    I: 'static,
779    O: 'static,
780{
781    /// The number of times the action has successfully completed.
782    ///
783    /// ```rust
784    /// # use reactive_graph::actions::*;
785    /// # use reactive_graph::prelude::*;
786    /// # tokio_test::block_on(async move {
787    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
788    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
789    /// let act = Action::new(|n: &u8| {
790    ///     let n = n.to_owned();
791    ///     async move { n * 2 }
792    /// });
793    ///
794    /// let version = act.version();
795    /// act.dispatch(3);
796    /// assert_eq!(version.get(), 0);
797    ///
798    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
799    /// // after it resolves
800    /// assert_eq!(version.get(), 1);
801    /// # });
802    /// ```
803    #[track_caller]
804    pub fn version(&self) -> RwSignal<usize> {
805        let inner = self
806            .inner
807            .try_with_value(|inner| inner.version())
808            .unwrap_or_else(unwrap_signal!(self));
809        inner.into()
810    }
811
812    /// Whether the action has been dispatched and is currently waiting to resolve.
813    ///
814    /// ```rust
815    /// # use reactive_graph::actions::*;
816    /// # use reactive_graph::prelude::*;
817    /// # tokio_test::block_on(async move {
818    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
819    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
820    /// let act = Action::new(|n: &u8| {
821    ///     let n = n.to_owned();
822    ///     async move { n * 2 }
823    /// });
824    ///
825    /// let pending = act.pending();
826    /// assert_eq!(pending.get(), false);
827    /// act.dispatch(3);
828    /// assert_eq!(pending.get(), true);
829    ///
830    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
831    /// // after it resolves
832    /// assert_eq!(pending.get(), false);
833    /// # });
834    /// ```
835    #[track_caller]
836    pub fn pending(&self) -> Memo<bool> {
837        let inner = self
838            .inner
839            .try_with_value(|inner| inner.pending())
840            .unwrap_or_else(unwrap_signal!(self));
841        inner.into()
842    }
843}
844
845impl<I, O> Action<I, O>
846where
847    I: 'static,
848    O: 'static,
849{
850    /// The current argument that was dispatched to the async function. This value will
851    /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
852    ///
853    /// ```rust
854    /// # use reactive_graph::actions::*;
855    /// # use reactive_graph::prelude::*;
856    /// # tokio_test::block_on(async move {
857    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
858    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
859    /// let act = Action::new(|n: &u8| {
860    ///     let n = n.to_owned();
861    ///     async move { n * 2 }
862    /// });
863    ///
864    /// let input = act.input();
865    /// assert_eq!(input.get(), None);
866    /// act.dispatch(3);
867    /// assert_eq!(input.get(), Some(3));
868    ///
869    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
870    /// // after it resolves
871    /// assert_eq!(input.get(), None);
872    /// # });
873    /// ```
874    #[track_caller]
875    pub fn input(&self) -> MappedSignal<Option<I>> {
876        self.inner
877            .try_with_value(|inner| inner.input())
878            .unwrap_or_else(unwrap_signal!(self))
879            .into()
880    }
881
882    /// The current argument that was dispatched to the async function. This value will
883    /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
884    ///
885    /// Returns a thread-local signal using [`LocalStorage`].
886    #[track_caller]
887    #[deprecated = "You can now use .input() for any value, whether it's \
888                    thread-safe or not."]
889    pub fn input_local(&self) -> MappedSignal<Option<I>> {
890        self.inner
891            .try_with_value(|inner| inner.input())
892            .unwrap_or_else(unwrap_signal!(self))
893            .into()
894    }
895}
896
897impl<I, O> Action<I, O>
898where
899    I: 'static,
900    O: 'static,
901{
902    /// The most recent return value of the `async` function. This will be `None` before
903    /// the action has ever run successfully, and subsequently will always be `Some(_)`,
904    /// holding the old value until a new value has been received.
905    ///
906    /// ```rust
907    /// # use reactive_graph::actions::*;
908    /// # use reactive_graph::prelude::*;
909    /// # tokio_test::block_on(async move {
910    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
911    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
912    /// let act = Action::new(|n: &u8| {
913    ///     let n = n.to_owned();
914    ///     async move { n * 2 }
915    /// });
916    ///
917    /// let value = act.value();
918    /// assert_eq!(value.get(), None);
919    /// act.dispatch(3);
920    /// assert_eq!(value.get(), None);
921    ///
922    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
923    /// // after it resolves
924    /// assert_eq!(value.get(), Some(6));
925    /// // dispatch another value, and it still holds the old value
926    /// act.dispatch(3);
927    /// assert_eq!(value.get(), Some(6));
928    /// # });
929    /// ```
930    #[track_caller]
931    pub fn value(&self) -> MappedSignal<Option<O>> {
932        self.inner
933            .try_with_value(|inner| inner.value())
934            .unwrap_or_else(unwrap_signal!(self))
935            .into()
936    }
937
938    /// The most recent return value of the `async` function. This will be `None` before
939    /// the action has ever run successfully, and subsequently will always be `Some(_)`,
940    /// holding the old value until a new value has been received.
941    ///
942    /// Returns a thread-local signal using [`LocalStorage`].
943    #[deprecated = "You can now use .value() for any value, whether it's \
944                    thread-safe or not."]
945    #[track_caller]
946    pub fn value_local(&self) -> MappedSignal<Option<O>>
947    where
948        O: Send + Sync,
949    {
950        self.inner
951            .try_with_value(|inner| inner.value())
952            .unwrap_or_else(unwrap_signal!(self))
953            .into()
954    }
955}
956
957impl<I, O> Action<I, O>
958where
959    I: Send + Sync + 'static,
960    O: Send + Sync + 'static,
961{
962    /// Calls the `async` function with a reference to the input type as its argument.
963    #[track_caller]
964    pub fn dispatch(&self, input: I) -> ActionAbortHandle {
965        self.inner
966            .try_get_value()
967            .map(|inner| inner.dispatch(input))
968            .unwrap_or_else(unwrap_signal!(self))
969    }
970}
971
972impl<I, O> Action<I, O>
973where
974    I: 'static,
975    O: 'static,
976{
977    /// Calls the `async` function with a reference to the input type as its argument.
978    #[track_caller]
979    pub fn dispatch_local(&self, input: I) -> ActionAbortHandle {
980        self.inner
981            .try_get_value()
982            .map(|inner| inner.dispatch_local(input))
983            .unwrap_or_else(unwrap_signal!(self))
984    }
985}
986
987impl<I, O> Action<I, O>
988where
989    I: Send + Sync + 'static,
990    O: Send + Sync + 'static,
991{
992    /// Creates a new action, which does not require the action itself to be `Send`, but will run
993    /// it on the same thread it was created on.
994    ///
995    /// In all other ways, this is identical to [`Action::new`].
996    #[track_caller]
997    pub fn new_unsync<F, Fu>(action_fn: F) -> Self
998    where
999        F: Fn(&I) -> Fu + 'static,
1000        Fu: Future<Output = O> + 'static,
1001    {
1002        Self {
1003            inner: ArenaItem::new_with_storage(ArcAction::new_unsync(
1004                action_fn,
1005            )),
1006            #[cfg(any(debug_assertions, leptos_debuginfo))]
1007            defined_at: Location::caller(),
1008        }
1009    }
1010
1011    /// Creates a new action, which does not require the action itself to be `Send`, but will run
1012    /// it on the same thread it was created on, and gives an initial value.
1013    ///
1014    /// In all other ways, this is identical to [`Action::new`].
1015    #[track_caller]
1016    pub fn new_unsync_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
1017    where
1018        F: Fn(&I) -> Fu + 'static,
1019        Fu: Future<Output = O> + 'static,
1020    {
1021        Self {
1022            inner: ArenaItem::new_with_storage(
1023                ArcAction::new_unsync_with_value(value, action_fn),
1024            ),
1025            #[cfg(any(debug_assertions, leptos_debuginfo))]
1026            defined_at: Location::caller(),
1027        }
1028    }
1029}
1030
1031impl<I, O> Action<I, O>
1032where
1033    I: 'static,
1034    O: 'static,
1035{
1036    /// Creates a new action, which neither requires the action itself nor the
1037    /// value it returns to be `Send`. If this action is accessed from outside the
1038    /// thread on which it was created, it panics.
1039    ///
1040    /// This combines the features of [`Action::new_local`] and [`Action::new_unsync`].
1041    #[track_caller]
1042    pub fn new_unsync_local<F, Fu>(action_fn: F) -> Self
1043    where
1044        F: Fn(&I) -> Fu + 'static,
1045        Fu: Future<Output = O> + 'static,
1046    {
1047        Self {
1048            inner: ArenaItem::new_with_storage(ArcAction::new_unsync(
1049                action_fn,
1050            )),
1051            #[cfg(any(debug_assertions, leptos_debuginfo))]
1052            defined_at: Location::caller(),
1053        }
1054    }
1055
1056    /// Creates a new action, which neither requires the action itself nor the
1057    /// value it returns to be `Send`, and provides it with an initial value.
1058    /// If this action is accessed from outside the thread on which it was created, it panics.
1059    ///
1060    /// This combines the features of [`Action::new_local_with_value`] and
1061    /// [`Action::new_unsync_with_value`].
1062    #[track_caller]
1063    pub fn new_unsync_local_with_value<F, Fu>(
1064        value: Option<O>,
1065        action_fn: F,
1066    ) -> Self
1067    where
1068        F: Fn(&I) -> Fu + 'static,
1069        Fu: Future<Output = O> + 'static,
1070    {
1071        Self {
1072            inner: ArenaItem::new_with_storage(
1073                ArcAction::new_unsync_with_value(value, action_fn),
1074            ),
1075            #[cfg(any(debug_assertions, leptos_debuginfo))]
1076            defined_at: Location::caller(),
1077        }
1078    }
1079}
1080
1081impl<I, O> DefinedAt for Action<I, O> {
1082    fn defined_at(&self) -> Option<&'static Location<'static>> {
1083        #[cfg(any(debug_assertions, leptos_debuginfo))]
1084        {
1085            Some(self.defined_at)
1086        }
1087        #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
1088        {
1089            None
1090        }
1091    }
1092}
1093
1094impl<I, O> Clone for Action<I, O> {
1095    fn clone(&self) -> Self {
1096        *self
1097    }
1098}
1099
1100impl<I, O> Copy for Action<I, O> {}
1101
1102/// Creates a new action. This is lazy: it does not run the action function until some value
1103/// is dispatched.
1104///
1105/// The constructor takes a function which will create a new `Future` from some input data.
1106/// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
1107/// be spawned.
1108///
1109/// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
1110/// `Future` must be `Send` so that it can be moved across threads by the async executor as
1111/// needed. In order to be stored in the `Copy` arena, the input and output types should also
1112/// be `Send + Sync`.
1113///
1114/// ```rust
1115/// # use reactive_graph::actions::*;
1116/// # use reactive_graph::prelude::*;
1117/// # tokio_test::block_on(async move {
1118/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
1119/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
1120/// let act = Action::new(|n: &u8| {
1121///     let n = n.to_owned();
1122///     async move { n * 2 }
1123/// });
1124///
1125/// act.dispatch(3);
1126/// assert_eq!(act.input().get(), Some(3));
1127///
1128/// // Remember that async functions already return a future if they are
1129/// // not `await`ed. You can save keystrokes by leaving out the `async move`
1130///
1131/// let act2 = Action::new(|n: &String| yell(n.to_owned()));
1132/// act2.dispatch(String::from("i'm in a doctest"));
1133/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1134///
1135/// // after it resolves
1136/// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
1137///
1138/// async fn yell(n: String) -> String {
1139///     n.to_uppercase()
1140/// }
1141/// # });
1142/// ```
1143#[inline(always)]
1144#[track_caller]
1145#[deprecated = "This function is being removed to conform to Rust idioms. \
1146                Please use `Action::new()` instead."]
1147pub fn create_action<I, O, F, Fu>(action_fn: F) -> Action<I, O>
1148where
1149    I: Send + Sync + 'static,
1150    O: Send + Sync + 'static,
1151    F: Fn(&I) -> Fu + Send + Sync + 'static,
1152    Fu: Future<Output = O> + Send + 'static,
1153{
1154    Action::new(action_fn)
1155}