reactive_graph/actions/
action.rs

1use crate::{
2    computed::{ArcMemo, Memo},
3    diagnostics::is_suppressing_resource_load,
4    owner::{ArcStoredValue, ArenaItem},
5    send_wrapper_ext::SendOption,
6    signal::{ArcMappedSignal, ArcRwSignal, MappedSignal, RwSignal},
7    traits::{DefinedAt, Dispose, Get, GetUntracked, GetValue, Update, Write},
8    unwrap_signal,
9};
10use any_spawner::Executor;
11use futures::{channel::oneshot, select, FutureExt};
12use send_wrapper::SendWrapper;
13use std::{
14    future::Future,
15    ops::{Deref, DerefMut},
16    panic::Location,
17    pin::Pin,
18    sync::Arc,
19};
20
21/// An action runs some asynchronous code when you dispatch a new value to it, and gives you
22/// reactive access to the result.
23///
24/// Actions are intended for mutating or updating data, not for loading data. If you find yourself
25/// creating an action and immediately dispatching a value to it, this is probably the wrong
26/// primitive.
27///
28/// The arena-allocated, `Copy` version of an `ArcAction` is an [`Action`].
29///
30/// ```rust
31/// # use reactive_graph::actions::*;
32/// # use reactive_graph::prelude::*;
33/// # tokio_test::block_on(async move {
34/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
35/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
36/// async fn send_new_todo_to_api(task: String) -> usize {
37///     // do something...
38///     // return a task id
39///     42
40/// }
41/// let save_data = ArcAction::new(|task: &String| {
42///   // `task` is given as `&String` because its value is available in `input`
43///   send_new_todo_to_api(task.clone())
44/// });
45///
46/// // the argument currently running
47/// let input = save_data.input();
48/// // the most recent returned result
49/// let result_of_call = save_data.value();
50/// // whether the call is pending
51/// let pending = save_data.pending();
52/// // how many times the action has run
53/// // useful for reactively updating something else in response to a `dispatch` and response
54/// let version = save_data.version();
55///
56/// // before we do anything
57/// assert_eq!(input.get(), None); // no argument yet
58/// assert_eq!(pending.get(), false); // isn't pending a response
59/// assert_eq!(result_of_call.get(), None); // there's no "last value"
60/// assert_eq!(version.get(), 0);
61///
62/// // dispatch the action
63/// save_data.dispatch("My todo".to_string());
64///
65/// // when we're making the call
66/// assert_eq!(input.get(), Some("My todo".to_string()));
67/// assert_eq!(pending.get(), true); // is pending
68/// assert_eq!(result_of_call.get(), None); // has not yet gotten a response
69///
70/// # any_spawner::Executor::tick().await;
71///
72/// // after call has resolved
73/// assert_eq!(input.get(), None); // input clears out after resolved
74/// assert_eq!(pending.get(), false); // no longer pending
75/// assert_eq!(result_of_call.get(), Some(42));
76/// assert_eq!(version.get(), 1);
77/// # });
78/// ```
79///
80/// The input to the `async` function should always be a single value,
81/// but it can be of any type. The argument is always passed by reference to the
/// function, because it is stored in [`ArcAction::input`] as well.
83///
84/// ```rust
85/// # use reactive_graph::actions::*;
86/// // if there's a single argument, just use that
87/// let action1 = ArcAction::new(|input: &String| {
88///     let input = input.clone();
89///     async move { todo!() }
90/// });
91///
92/// // if there are no arguments, use the unit type `()`
93/// let action2 = ArcAction::new(|input: &()| async { todo!() });
94///
95/// // if there are multiple arguments, use a tuple
96/// let action3 = ArcAction::new(|input: &(usize, String)| async { todo!() });
97/// ```
pub struct ArcAction<I, O> {
    /// Count of dispatched calls that have neither resolved nor been aborted yet; incremented
    /// when a value is dispatched and decremented when its task finishes.
    in_flight: ArcRwSignal<usize>,
    /// The most recently dispatched argument. Reset to `None` by a finishing task once no
    /// calls remain in flight.
    input: ArcRwSignal<SendOption<I>>,
    /// The latest result stored by a completed call, or the initial value the action was
    /// constructed with.
    value: ArcRwSignal<SendOption<O>>,
    /// Incremented each time a completed call stores its result in `value`.
    version: ArcRwSignal<usize>,
    /// Counter sampled when a call is dispatched and compared again when it completes, to
    /// decide whether its result should still be applied.
    dispatched: ArcStoredValue<usize>,
    /// Type-erased action function: maps a reference to the input to a boxed `Future`.
    #[allow(clippy::complexity)]
    action_fn: Arc<
        dyn Fn(&I) -> Pin<Box<dyn Future<Output = O> + Send>> + Send + Sync,
    >,
    /// Source location of the constructor call; only stored in debug builds or when
    /// `leptos_debuginfo` is set.
    #[cfg(any(debug_assertions, leptos_debuginfo))]
    defined_at: &'static Location<'static>,
}
111
112impl<I, O> Clone for ArcAction<I, O> {
113    fn clone(&self) -> Self {
114        Self {
115            in_flight: self.in_flight.clone(),
116            input: self.input.clone(),
117            value: self.value.clone(),
118            version: self.version.clone(),
119            dispatched: self.dispatched.clone(),
120            action_fn: self.action_fn.clone(),
121            #[cfg(any(debug_assertions, leptos_debuginfo))]
122            defined_at: self.defined_at,
123        }
124    }
125}
126
127impl<I, O> ArcAction<I, O>
128where
129    I: 'static,
130    O: 'static,
131{
132    /// Creates a new action. This is lazy: it does not run the action function until some value
133    /// is dispatched.
134    ///
135    /// The constructor takes a function which will create a new `Future` from some input data.
136    /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
137    /// be spawned.
138    ///
139    /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
140    /// `Future` must be `Send` so that it can be moved across threads by the async executor as
141    /// needed.
142    ///
143    /// ```rust
144    /// # use reactive_graph::actions::*;
145    /// # use reactive_graph::prelude::*;
146    /// # tokio_test::block_on(async move {
147    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
148    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
149    /// let act = ArcAction::new(|n: &u8| {
150    ///     let n = n.to_owned();
151    ///     async move { n * 2 }
152    /// });
153    ///
154    /// act.dispatch(3);
155    /// assert_eq!(act.input().get(), Some(3));
156    ///
157    /// // Remember that async functions already return a future if they are
158    /// // not `await`ed. You can save keystrokes by leaving out the `async move`
159    ///
160    /// let act2 = Action::new(|n: &String| yell(n.to_owned()));
161    /// act2.dispatch(String::from("i'm in a doctest"));
162    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
163    ///
164    /// // after it resolves
165    /// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
166    ///
167    /// async fn yell(n: String) -> String {
168    ///     n.to_uppercase()
169    /// }
170    /// # });
171    /// ```
172    #[track_caller]
173    pub fn new<F, Fu>(action_fn: F) -> Self
174    where
175        F: Fn(&I) -> Fu + Send + Sync + 'static,
176        Fu: Future<Output = O> + Send + 'static,
177        I: Send + Sync,
178        O: Send + Sync,
179    {
180        Self::new_with_value(None, action_fn)
181    }
182
183    /// Creates a new action, initializing it with the given value.
184    ///
185    /// This is lazy: it does not run the action function until some value is dispatched.
186    ///
187    /// The constructor takes a function which will create a new `Future` from some input data.
188    /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
189    /// be spawned.
190    ///
191    /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
192    /// `Future` must be `Send` so that it can be moved across threads by the async executor as
193    /// needed.
194    #[track_caller]
195    pub fn new_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
196    where
197        F: Fn(&I) -> Fu + Send + Sync + 'static,
198        Fu: Future<Output = O> + Send + 'static,
199        I: Send + Sync,
200        O: Send + Sync,
201    {
202        ArcAction {
203            in_flight: ArcRwSignal::new(0),
204            input: ArcRwSignal::new(SendOption::new(None)),
205            value: ArcRwSignal::new(SendOption::new(value)),
206            version: Default::default(),
207            dispatched: Default::default(),
208            action_fn: Arc::new(move |input| Box::pin(action_fn(input))),
209            #[cfg(any(debug_assertions, leptos_debuginfo))]
210            defined_at: Location::caller(),
211        }
212    }
213
214    /// Clears the value of the action, setting its current value to `None`.
215    ///
216    /// This has no other effect: i.e., it will not cancel in-flight actions, set the
217    /// input, etc.
218    #[track_caller]
219    pub fn clear(&self) {
220        if let Some(mut guard) = self.value.try_write() {
221            **guard = None;
222        }
223    }
224}
225
226/// A handle that allows aborting an in-flight action. It is returned from [`Action::dispatch`] or
227/// [`ArcAction::dispatch`].
228#[derive(Debug)]
229pub struct ActionAbortHandle(oneshot::Sender<()>);
230
231impl ActionAbortHandle {
232    /// Aborts the action.
233    ///
234    /// This will cause the dispatched task to complete, without updating the action's value. The
235    /// dispatched action's `Future` will no longer be polled. This does not guarantee that side
236    /// effects created by that `Future` no longer run: for example, if the action dispatches an
237    /// HTTP request, whether that request is actually canceled or not depends on whether the
238    /// request library actually cancels a request when its `Future` is dropped.
239    pub fn abort(self) {
240        let _ = self.0.send(());
241    }
242}
243
244impl<I, O> ArcAction<I, O>
245where
246    I: Send + Sync + 'static,
247    O: Send + Sync + 'static,
248{
249    /// Calls the `async` function with a reference to the input type as its argument.
250    #[track_caller]
251    pub fn dispatch(&self, input: I) -> ActionAbortHandle {
252        let (abort_tx, mut abort_rx) = oneshot::channel();
253        if !is_suppressing_resource_load() {
254            let mut fut = (self.action_fn)(&input).fuse();
255
256            // Update the state before loading
257            self.in_flight.update(|n| *n += 1);
258            let current_version = self.dispatched.get_value();
259            self.input.try_update(|inp| **inp = Some(input));
260
261            // Spawn the task
262            crate::spawn({
263                let input = self.input.clone();
264                let version = self.version.clone();
265                let dispatched = self.dispatched.clone();
266                let value = self.value.clone();
267                let in_flight = self.in_flight.clone();
268                async move {
269                    select! {
270                        // if the abort message has been sent, bail and do nothing
271                        _ = abort_rx => {
272                            in_flight.update(|n| *n = n.saturating_sub(1));
273                        },
274                        // otherwise, update the value
275                        result = fut => {
276                            in_flight.update(|n| *n = n.saturating_sub(1));
277                            let is_latest = dispatched.get_value() <= current_version;
278                            if is_latest {
279                                version.update(|n| *n += 1);
280                                value.update(|n| **n = Some(result));
281                            }
282                        }
283                    }
284                    if in_flight.get_untracked() == 0 {
285                        input.update(|inp| **inp = None);
286                    }
287                }
288            });
289        }
290
291        ActionAbortHandle(abort_tx)
292    }
293}
294
295impl<I, O> ArcAction<I, O>
296where
297    I: 'static,
298    O: 'static,
299{
300    /// Calls the `async` function with a reference to the input type as its argument,
301    /// ensuring that it is spawned on the current thread.
302    #[track_caller]
303    pub fn dispatch_local(&self, input: I) -> ActionAbortHandle {
304        let (abort_tx, mut abort_rx) = oneshot::channel();
305        if !is_suppressing_resource_load() {
306            let mut fut = (self.action_fn)(&input).fuse();
307
308            // Update the state before loading
309            self.in_flight.update(|n| *n += 1);
310            let current_version = self.dispatched.get_value();
311            self.input.try_update(|inp| **inp = Some(input));
312
313            // Spawn the task
314            Executor::spawn_local({
315                let input = self.input.clone();
316                let version = self.version.clone();
317                let value = self.value.clone();
318                let dispatched = self.dispatched.clone();
319                let in_flight = self.in_flight.clone();
320                async move {
321                    select! {
322                        // if the abort message has been sent, bail and do nothing
323                        _ = abort_rx => {
324                            in_flight.update(|n| *n = n.saturating_sub(1));
325                        },
326                        // otherwise, update the value
327                        result = fut => {
328                            in_flight.update(|n| *n = n.saturating_sub(1));
329                            let is_latest = dispatched.get_value() <= current_version;
330                            if is_latest {
331                                version.update(|n| *n += 1);
332                                value.update(|n| **n = Some(result));
333                            }
334                        }
335                    }
336                    if in_flight.get_untracked() == 0 {
337                        input.update(|inp| **inp = None);
338                    }
339                }
340            });
341        }
342        ActionAbortHandle(abort_tx)
343    }
344}
345
346impl<I, O> ArcAction<I, O>
347where
348    I: 'static,
349    O: 'static,
350{
351    /// Creates a new action, which will only be run on the thread in which it is created.
352    ///
353    /// In all other ways, this is identical to [`ArcAction::new`].
354    #[track_caller]
355    pub fn new_unsync<F, Fu>(action_fn: F) -> Self
356    where
357        F: Fn(&I) -> Fu + 'static,
358        Fu: Future<Output = O> + 'static,
359    {
360        let action_fn = move |inp: &I| SendWrapper::new(action_fn(inp));
361        Self::new_unsync_with_value(None, action_fn)
362    }
363
364    /// Creates a new action that will only run on the current thread, initializing it with the given value.
365    ///
366    /// In all other ways, this is identical to [`ArcAction::new_with_value`].
367    #[track_caller]
368    pub fn new_unsync_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
369    where
370        F: Fn(&I) -> Fu + 'static,
371        Fu: Future<Output = O> + 'static,
372    {
373        let action_fn = SendWrapper::new(action_fn);
374        ArcAction {
375            in_flight: ArcRwSignal::new(0),
376            input: ArcRwSignal::new(SendOption::new_local(None)),
377            value: ArcRwSignal::new(SendOption::new_local(value)),
378            version: Default::default(),
379            dispatched: Default::default(),
380            action_fn: Arc::new(move |input| {
381                Box::pin(SendWrapper::new(action_fn(input)))
382            }),
383            #[cfg(any(debug_assertions, leptos_debuginfo))]
384            defined_at: Location::caller(),
385        }
386    }
387}
388
389impl<I, O> ArcAction<I, O>
390where
391    I: 'static,
392    O: 'static,
393{
394    /// The number of times the action has successfully completed.
395    ///
396    /// ```rust
397    /// # use reactive_graph::actions::*;
398    /// # use reactive_graph::prelude::*;
399    /// # tokio_test::block_on(async move {
400    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
401    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
402    /// let act = ArcAction::new(|n: &u8| {
403    ///     let n = n.to_owned();
404    ///     async move { n * 2 }
405    /// });
406    ///
407    /// let version = act.version();
408    /// act.dispatch(3);
409    /// assert_eq!(version.get(), 0);
410    ///
411    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
412    /// // after it resolves
413    /// assert_eq!(version.get(), 1);
414    /// # });
415    /// ```
416    #[track_caller]
417    pub fn version(&self) -> ArcRwSignal<usize> {
418        self.version.clone()
419    }
420
421    /// The current argument that was dispatched to the async function. This value will
422    /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
423    ///
424    /// ```rust
425    /// # use reactive_graph::actions::*;
426    /// # use reactive_graph::prelude::*;
427    /// # tokio_test::block_on(async move {
428    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
429    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
430    /// let act = ArcAction::new(|n: &u8| {
431    ///     let n = n.to_owned();
432    ///     async move { n * 2 }
433    /// });
434    ///
435    /// let input = act.input();
436    /// assert_eq!(input.get(), None);
437    /// act.dispatch(3);
438    /// assert_eq!(input.get(), Some(3));
439    ///
440    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
441    /// // after it resolves
442    /// assert_eq!(input.get(), None);
443    /// # });
444    /// ```
445    #[track_caller]
446    pub fn input(&self) -> ArcMappedSignal<Option<I>> {
447        ArcMappedSignal::new(
448            self.input.clone(),
449            |n| n.deref(),
450            |n| n.deref_mut(),
451        )
452    }
453
454    /// The most recent return value of the `async` function. This will be `None` before
455    /// the action has ever run successfully, and subsequently will always be `Some(_)`,
456    /// holding the old value until a new value has been received.
457    ///
458    /// ```rust
459    /// # use reactive_graph::actions::*;
460    /// # use reactive_graph::prelude::*;
461    /// # tokio_test::block_on(async move {
462    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
463    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
464    /// let act = ArcAction::new(|n: &u8| {
465    ///     let n = n.to_owned();
466    ///     async move { n * 2 }
467    /// });
468    ///
469    /// let value = act.value();
470    /// assert_eq!(value.get(), None);
471    /// act.dispatch(3);
472    /// assert_eq!(value.get(), None);
473    ///
474    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
475    /// // after it resolves
476    /// assert_eq!(value.get(), Some(6));
477    /// // dispatch another value, and it still holds the old value
478    /// act.dispatch(3);
479    /// assert_eq!(value.get(), Some(6));
480    /// # });
481    /// ```
482    #[track_caller]
483    pub fn value(&self) -> ArcMappedSignal<Option<O>> {
484        ArcMappedSignal::new(
485            self.value.clone(),
486            |n| n.deref(),
487            |n| n.deref_mut(),
488        )
489    }
490
491    /// Whether the action has been dispatched and is currently waiting to resolve.
492    ///
493    /// ```rust
494    /// # use reactive_graph::actions::*;
495    /// # use reactive_graph::prelude::*;
496    /// # tokio_test::block_on(async move {
497    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
498    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
499    /// let act = ArcAction::new(|n: &u8| {
500    ///     let n = n.to_owned();
501    ///     async move { n * 2 }
502    /// });
503    ///
504    /// let pending = act.pending();
505    /// assert_eq!(pending.get(), false);
506    /// act.dispatch(3);
507    /// assert_eq!(pending.get(), true);
508    ///
509    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
510    /// // after it resolves
511    /// assert_eq!(pending.get(), false);
512    /// # });
513    /// ```
514    #[track_caller]
515    pub fn pending(&self) -> ArcMemo<bool> {
516        let in_flight = self.in_flight.clone();
517        ArcMemo::new(move |_| in_flight.get() > 0)
518    }
519}
520
521impl<I, O> DefinedAt for ArcAction<I, O>
522where
523    I: 'static,
524    O: 'static,
525{
526    fn defined_at(&self) -> Option<&'static Location<'static>> {
527        #[cfg(any(debug_assertions, leptos_debuginfo))]
528        {
529            Some(self.defined_at)
530        }
531        #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
532        {
533            None
534        }
535    }
536}
537
538/// An action runs some asynchronous code when you dispatch a new value to it, and gives you
539/// reactive access to the result.
540///
541/// Actions are intended for mutating or updating data, not for loading data. If you find yourself
542/// creating an action and immediately dispatching a value to it, this is probably the wrong
543/// primitive.
544///
/// The reference-counted, `Clone` (but not `Copy`) version of an `Action` is an [`ArcAction`].
546///
547/// ```rust
548/// # use reactive_graph::actions::*;
549/// # use reactive_graph::prelude::*;
550/// # tokio_test::block_on(async move {
551/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
552/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
553/// async fn send_new_todo_to_api(task: String) -> usize {
554///     // do something...
555///     // return a task id
556///     42
557/// }
558/// let save_data = Action::new(|task: &String| {
559///   // `task` is given as `&String` because its value is available in `input`
560///   send_new_todo_to_api(task.clone())
561/// });
562///
563/// // the argument currently running
564/// let input = save_data.input();
565/// // the most recent returned result
566/// let result_of_call = save_data.value();
567/// // whether the call is pending
568/// let pending = save_data.pending();
569/// // how many times the action has run
570/// // useful for reactively updating something else in response to a `dispatch` and response
571/// let version = save_data.version();
572///
573/// // before we do anything
574/// assert_eq!(input.get(), None); // no argument yet
575/// assert_eq!(pending.get(), false); // isn't pending a response
576/// assert_eq!(result_of_call.get(), None); // there's no "last value"
577/// assert_eq!(version.get(), 0);
578///
579/// // dispatch the action
580/// save_data.dispatch("My todo".to_string());
581///
582/// // when we're making the call
583/// assert_eq!(input.get(), Some("My todo".to_string()));
584/// assert_eq!(pending.get(), true); // is pending
585/// assert_eq!(result_of_call.get(), None); // has not yet gotten a response
586///
587/// # any_spawner::Executor::tick().await;
588///
589/// // after call has resolved
590/// assert_eq!(input.get(), None); // input clears out after resolved
591/// assert_eq!(pending.get(), false); // no longer pending
592/// assert_eq!(result_of_call.get(), Some(42));
593/// assert_eq!(version.get(), 1);
594/// # });
595/// ```
596///
597/// The input to the `async` function should always be a single value,
598/// but it can be of any type. The argument is always passed by reference to the
599/// function, because it is stored in [Action::input] as well.
600///
601/// ```rust
602/// # use reactive_graph::actions::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
603/// // if there's a single argument, just use that
604/// let action1 = Action::new(|input: &String| {
605///     let input = input.clone();
606///     async move { todo!() }
607/// });
608///
609/// // if there are no arguments, use the unit type `()`
610/// let action2 = Action::new(|input: &()| async { todo!() });
611///
612/// // if there are multiple arguments, use a tuple
613/// let action3 = Action::new(|input: &(usize, String)| async { todo!() });
614/// ```
pub struct Action<I, O> {
    /// Arena handle to the [`ArcAction`] that holds the action's state and function.
    inner: ArenaItem<ArcAction<I, O>>,
    /// Source location of the constructor call; only stored in debug builds or when
    /// `leptos_debuginfo` is set.
    #[cfg(any(debug_assertions, leptos_debuginfo))]
    defined_at: &'static Location<'static>,
}
620
621impl<I, O> Dispose for Action<I, O> {
622    fn dispose(self) {
623        self.inner.dispose()
624    }
625}
626
627impl<I, O> Action<I, O>
628where
629    I: Send + Sync + 'static,
630    O: Send + Sync + 'static,
631{
632    /// Creates a new action. This is lazy: it does not run the action function until some value
633    /// is dispatched.
634    ///
635    /// The constructor takes a function which will create a new `Future` from some input data.
636    /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
637    /// be spawned.
638    ///
639    /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
640    /// `Future` must be `Send` so that it can be moved across threads by the async executor as
641    /// needed. In order to be stored in the `Copy` arena, the input and output types should also
642    /// be `Send + Sync`.
643    ///
644    /// ```rust
645    /// # use reactive_graph::actions::*;
646    /// # use reactive_graph::prelude::*;
647    /// # tokio_test::block_on(async move {
648    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
649    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
650    /// let act = Action::new(|n: &u8| {
651    ///     let n = n.to_owned();
652    ///     async move { n * 2 }
653    /// });
654    ///
655    /// act.dispatch(3);
656    /// assert_eq!(act.input().get(), Some(3));
657    ///
658    /// // Remember that async functions already return a future if they are
659    /// // not `await`ed. You can save keystrokes by leaving out the `async move`
660    ///
661    /// let act2 = Action::new(|n: &String| yell(n.to_owned()));
662    /// act2.dispatch(String::from("i'm in a doctest"));
663    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
664    ///
665    /// // after it resolves
666    /// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
667    ///
668    /// async fn yell(n: String) -> String {
669    ///     n.to_uppercase()
670    /// }
671    /// # });
672    /// ```
673    #[track_caller]
674    pub fn new<F, Fu>(action_fn: F) -> Self
675    where
676        F: Fn(&I) -> Fu + Send + Sync + 'static,
677        Fu: Future<Output = O> + Send + 'static,
678    {
679        Self {
680            inner: ArenaItem::new(ArcAction::new(action_fn)),
681            #[cfg(any(debug_assertions, leptos_debuginfo))]
682            defined_at: Location::caller(),
683        }
684    }
685
686    /// Creates a new action, initializing it with the given value.
687    ///
688    /// This is lazy: it does not run the action function until some value is dispatched.
689    ///
690    /// The constructor takes a function which will create a new `Future` from some input data.
691    /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
692    /// be spawned.
693    ///
694    /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
695    /// `Future` must be `Send` so that it can be moved across threads by the async executor as
696    /// needed. In order to be stored in the `Copy` arena, the input and output types should also
697    /// be `Send + Sync`.
698    #[track_caller]
699    pub fn new_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
700    where
701        F: Fn(&I) -> Fu + Send + Sync + 'static,
702        Fu: Future<Output = O> + Send + 'static,
703    {
704        Self {
705            inner: ArenaItem::new(ArcAction::new_with_value(value, action_fn)),
706            #[cfg(any(debug_assertions, leptos_debuginfo))]
707            defined_at: Location::caller(),
708        }
709    }
710}
711
712impl<I, O> Action<I, O>
713where
714    I: 'static,
715    O: 'static,
716{
717    /// Clears the value of the action, setting its current value to `None`.
718    ///
719    /// This has no other effect: i.e., it will not cancel in-flight actions, set the
720    /// input, etc.
721    #[track_caller]
722    pub fn clear(&self) {
723        self.inner.try_with_value(|inner| inner.clear());
724    }
725}
726
727impl<I, O> Action<I, O>
728where
729    I: 'static,
730    O: 'static,
731{
732    /// Creates a new action, which does not require its inputs or outputs to be `Send`. In all other
733    /// ways, this is the same as [`Action::new`]. If this action is accessed from outside the
734    /// thread on which it was created, it panics.
735    #[track_caller]
736    pub fn new_local<F, Fu>(action_fn: F) -> Self
737    where
738        F: Fn(&I) -> Fu + 'static,
739        Fu: Future<Output = O> + 'static,
740    {
741        Self {
742            inner: ArenaItem::new(ArcAction::new_unsync(action_fn)),
743            #[cfg(any(debug_assertions, leptos_debuginfo))]
744            defined_at: Location::caller(),
745        }
746    }
747
748    /// Creates a new action with the initial value, which does not require its inputs or outputs to be `Send`. In all other
749    /// ways, this is the same as [`Action::new_with_value`]. If this action is accessed from outside the
750    /// thread on which it was created, it panics.
751    #[track_caller]
752    pub fn new_local_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
753    where
754        F: Fn(&I) -> Fu + 'static,
755        Fu: Future<Output = O> + Send + 'static,
756    {
757        Self {
758            inner: ArenaItem::new(ArcAction::new_unsync_with_value(
759                value, action_fn,
760            )),
761            #[cfg(any(debug_assertions, leptos_debuginfo))]
762            defined_at: Location::caller(),
763        }
764    }
765}
766
767impl<I, O> Action<I, O>
768where
769    I: 'static,
770    O: 'static,
771{
772    /// The number of times the action has successfully completed.
773    ///
774    /// ```rust
775    /// # use reactive_graph::actions::*;
776    /// # use reactive_graph::prelude::*;
777    /// # tokio_test::block_on(async move {
778    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
779    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
780    /// let act = Action::new(|n: &u8| {
781    ///     let n = n.to_owned();
782    ///     async move { n * 2 }
783    /// });
784    ///
785    /// let version = act.version();
786    /// act.dispatch(3);
787    /// assert_eq!(version.get(), 0);
788    ///
789    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
790    /// // after it resolves
791    /// assert_eq!(version.get(), 1);
792    /// # });
793    /// ```
794    #[track_caller]
795    pub fn version(&self) -> RwSignal<usize> {
796        let inner = self
797            .inner
798            .try_with_value(|inner| inner.version())
799            .unwrap_or_else(unwrap_signal!(self));
800        inner.into()
801    }
802
803    /// Whether the action has been dispatched and is currently waiting to resolve.
804    ///
805    /// ```rust
806    /// # use reactive_graph::actions::*;
807    /// # use reactive_graph::prelude::*;
808    /// # tokio_test::block_on(async move {
809    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
810    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
811    /// let act = Action::new(|n: &u8| {
812    ///     let n = n.to_owned();
813    ///     async move { n * 2 }
814    /// });
815    ///
816    /// let pending = act.pending();
817    /// assert_eq!(pending.get(), false);
818    /// act.dispatch(3);
819    /// assert_eq!(pending.get(), true);
820    ///
821    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
822    /// // after it resolves
823    /// assert_eq!(pending.get(), false);
824    /// # });
825    /// ```
826    #[track_caller]
827    pub fn pending(&self) -> Memo<bool> {
828        let inner = self
829            .inner
830            .try_with_value(|inner| inner.pending())
831            .unwrap_or_else(unwrap_signal!(self));
832        inner.into()
833    }
834}
835
836impl<I, O> Action<I, O>
837where
838    I: 'static,
839    O: 'static,
840{
841    /// The current argument that was dispatched to the async function. This value will
842    /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
843    ///
844    /// ```rust
845    /// # use reactive_graph::actions::*;
846    /// # use reactive_graph::prelude::*;
847    /// # tokio_test::block_on(async move {
848    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
849    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
850    /// let act = Action::new(|n: &u8| {
851    ///     let n = n.to_owned();
852    ///     async move { n * 2 }
853    /// });
854    ///
855    /// let input = act.input();
856    /// assert_eq!(input.get(), None);
857    /// act.dispatch(3);
858    /// assert_eq!(input.get(), Some(3));
859    ///
860    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
861    /// // after it resolves
862    /// assert_eq!(input.get(), None);
863    /// # });
864    /// ```
865    #[track_caller]
866    pub fn input(&self) -> MappedSignal<Option<I>> {
867        self.inner
868            .try_with_value(|inner| inner.input())
869            .unwrap_or_else(unwrap_signal!(self))
870            .into()
871    }
872
873    /// The current argument that was dispatched to the async function. This value will
874    /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
875    ///
876    /// Returns a thread-local signal using [`LocalStorage`].
877    #[track_caller]
878    #[deprecated = "You can now use .input() for any value, whether it's \
879                    thread-safe or not."]
880    pub fn input_local(&self) -> MappedSignal<Option<I>> {
881        self.inner
882            .try_with_value(|inner| inner.input())
883            .unwrap_or_else(unwrap_signal!(self))
884            .into()
885    }
886}
887
888impl<I, O> Action<I, O>
889where
890    I: 'static,
891    O: 'static,
892{
893    /// The most recent return value of the `async` function. This will be `None` before
894    /// the action has ever run successfully, and subsequently will always be `Some(_)`,
895    /// holding the old value until a new value has been received.
896    ///
897    /// ```rust
898    /// # use reactive_graph::actions::*;
899    /// # use reactive_graph::prelude::*;
900    /// # tokio_test::block_on(async move {
901    /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
902    /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
903    /// let act = Action::new(|n: &u8| {
904    ///     let n = n.to_owned();
905    ///     async move { n * 2 }
906    /// });
907    ///
908    /// let value = act.value();
909    /// assert_eq!(value.get(), None);
910    /// act.dispatch(3);
911    /// assert_eq!(value.get(), None);
912    ///
913    /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
914    /// // after it resolves
915    /// assert_eq!(value.get(), Some(6));
916    /// // dispatch another value, and it still holds the old value
917    /// act.dispatch(3);
918    /// assert_eq!(value.get(), Some(6));
919    /// # });
920    /// ```
921    #[track_caller]
922    pub fn value(&self) -> MappedSignal<Option<O>> {
923        self.inner
924            .try_with_value(|inner| inner.value())
925            .unwrap_or_else(unwrap_signal!(self))
926            .into()
927    }
928
929    /// The most recent return value of the `async` function. This will be `None` before
930    /// the action has ever run successfully, and subsequently will always be `Some(_)`,
931    /// holding the old value until a new value has been received.
932    ///
933    /// Returns a thread-local signal using [`LocalStorage`].
934    #[deprecated = "You can now use .value() for any value, whether it's \
935                    thread-safe or not."]
936    #[track_caller]
937    pub fn value_local(&self) -> MappedSignal<Option<O>>
938    where
939        O: Send + Sync,
940    {
941        self.inner
942            .try_with_value(|inner| inner.value())
943            .unwrap_or_else(unwrap_signal!(self))
944            .into()
945    }
946}
947
948impl<I, O> Action<I, O>
949where
950    I: Send + Sync + 'static,
951    O: Send + Sync + 'static,
952{
953    /// Calls the `async` function with a reference to the input type as its argument.
954    #[track_caller]
955    pub fn dispatch(&self, input: I) -> ActionAbortHandle {
956        self.inner
957            .try_get_value()
958            .map(|inner| inner.dispatch(input))
959            .unwrap_or_else(unwrap_signal!(self))
960    }
961}
962
963impl<I, O> Action<I, O>
964where
965    I: 'static,
966    O: 'static,
967{
968    /// Calls the `async` function with a reference to the input type as its argument.
969    #[track_caller]
970    pub fn dispatch_local(&self, input: I) -> ActionAbortHandle {
971        self.inner
972            .try_get_value()
973            .map(|inner| inner.dispatch_local(input))
974            .unwrap_or_else(unwrap_signal!(self))
975    }
976}
977
978impl<I, O> Action<I, O>
979where
980    I: Send + Sync + 'static,
981    O: Send + Sync + 'static,
982{
983    /// Creates a new action, which does not require the action itself to be `Send`, but will run
984    /// it on the same thread it was created on.
985    ///
986    /// In all other ways, this is identical to [`Action::new`].
987    #[track_caller]
988    pub fn new_unsync<F, Fu>(action_fn: F) -> Self
989    where
990        F: Fn(&I) -> Fu + 'static,
991        Fu: Future<Output = O> + 'static,
992    {
993        Self {
994            inner: ArenaItem::new_with_storage(ArcAction::new_unsync(
995                action_fn,
996            )),
997            #[cfg(any(debug_assertions, leptos_debuginfo))]
998            defined_at: Location::caller(),
999        }
1000    }
1001
1002    /// Creates a new action, which does not require the action itself to be `Send`, but will run
1003    /// it on the same thread it was created on, and gives an initial value.
1004    ///
1005    /// In all other ways, this is identical to [`Action::new`].
1006    #[track_caller]
1007    pub fn new_unsync_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
1008    where
1009        F: Fn(&I) -> Fu + 'static,
1010        Fu: Future<Output = O> + 'static,
1011    {
1012        Self {
1013            inner: ArenaItem::new_with_storage(
1014                ArcAction::new_unsync_with_value(value, action_fn),
1015            ),
1016            #[cfg(any(debug_assertions, leptos_debuginfo))]
1017            defined_at: Location::caller(),
1018        }
1019    }
1020}
1021
1022impl<I, O> Action<I, O>
1023where
1024    I: 'static,
1025    O: 'static,
1026{
1027    /// Creates a new action, which neither requires the action itself nor the
1028    /// value it returns to be `Send`. If this action is accessed from outside the
1029    /// thread on which it was created, it panics.
1030    ///
1031    /// This combines the features of [`Action::new_local`] and [`Action::new_unsync`].
1032    #[track_caller]
1033    pub fn new_unsync_local<F, Fu>(action_fn: F) -> Self
1034    where
1035        F: Fn(&I) -> Fu + 'static,
1036        Fu: Future<Output = O> + 'static,
1037    {
1038        Self {
1039            inner: ArenaItem::new_with_storage(ArcAction::new_unsync(
1040                action_fn,
1041            )),
1042            #[cfg(any(debug_assertions, leptos_debuginfo))]
1043            defined_at: Location::caller(),
1044        }
1045    }
1046
1047    /// Creates a new action, which neither requires the action itself nor the
1048    /// value it returns to be `Send`, and provides it with an initial value.
1049    /// If this action is accessed from outside the thread on which it was created, it panics.
1050    ///
1051    /// This combines the features of [`Action::new_local_with_value`] and
1052    /// [`Action::new_unsync_with_value`].
1053    #[track_caller]
1054    pub fn new_unsync_local_with_value<F, Fu>(
1055        value: Option<O>,
1056        action_fn: F,
1057    ) -> Self
1058    where
1059        F: Fn(&I) -> Fu + 'static,
1060        Fu: Future<Output = O> + 'static,
1061    {
1062        Self {
1063            inner: ArenaItem::new_with_storage(
1064                ArcAction::new_unsync_with_value(value, action_fn),
1065            ),
1066            #[cfg(any(debug_assertions, leptos_debuginfo))]
1067            defined_at: Location::caller(),
1068        }
1069    }
1070}
1071
1072impl<I, O> DefinedAt for Action<I, O> {
1073    fn defined_at(&self) -> Option<&'static Location<'static>> {
1074        #[cfg(any(debug_assertions, leptos_debuginfo))]
1075        {
1076            Some(self.defined_at)
1077        }
1078        #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
1079        {
1080            None
1081        }
1082    }
1083}
1084
// Hand-written `Clone` that places no bounds on `I` or `O`.
impl<I, O> Clone for Action<I, O> {
    // Copies the handle by dereferencing `self`, relying on `Action` being `Copy`.
    fn clone(&self) -> Self {
        *self
    }
}
1090
// Marker impl: `Action` is `Copy` for every `I` and `O`, with no bounds on either.
impl<I, O> Copy for Action<I, O> {}
1092
1093/// Creates a new action. This is lazy: it does not run the action function until some value
1094/// is dispatched.
1095///
1096/// The constructor takes a function which will create a new `Future` from some input data.
1097/// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
1098/// be spawned.
1099///
1100/// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
1101/// `Future` must be `Send` so that it can be moved across threads by the async executor as
1102/// needed. In order to be stored in the `Copy` arena, the input and output types should also
1103/// be `Send + Sync`.
1104///
1105/// ```rust
1106/// # use reactive_graph::actions::*;
1107/// # use reactive_graph::prelude::*;
1108/// # tokio_test::block_on(async move {
1109/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
1110/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
1111/// let act = Action::new(|n: &u8| {
1112///     let n = n.to_owned();
1113///     async move { n * 2 }
1114/// });
1115///
1116/// act.dispatch(3);
1117/// assert_eq!(act.input().get(), Some(3));
1118///
1119/// // Remember that async functions already return a future if they are
1120/// // not `await`ed. You can save keystrokes by leaving out the `async move`
1121///
1122/// let act2 = Action::new(|n: &String| yell(n.to_owned()));
1123/// act2.dispatch(String::from("i'm in a doctest"));
1124/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1125///
1126/// // after it resolves
1127/// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
1128///
1129/// async fn yell(n: String) -> String {
1130///     n.to_uppercase()
1131/// }
1132/// # });
1133/// ```
1134#[inline(always)]
1135#[track_caller]
1136#[deprecated = "This function is being removed to conform to Rust idioms. \
1137                Please use `Action::new()` instead."]
1138pub fn create_action<I, O, F, Fu>(action_fn: F) -> Action<I, O>
1139where
1140    I: Send + Sync + 'static,
1141    O: Send + Sync + 'static,
1142    F: Fn(&I) -> Fu + Send + Sync + 'static,
1143    Fu: Future<Output = O> + Send + 'static,
1144{
1145    Action::new(action_fn)
1146}