reactive_graph/actions/action.rs
1use crate::{
2 computed::{ArcMemo, Memo},
3 diagnostics::is_suppressing_resource_load,
4 owner::{ArcStoredValue, ArenaItem},
5 send_wrapper_ext::SendOption,
6 signal::{ArcMappedSignal, ArcRwSignal, MappedSignal, RwSignal},
7 traits::{DefinedAt, Dispose, Get, GetUntracked, GetValue, Update, Write},
8 unwrap_signal,
9};
10use any_spawner::Executor;
11use futures::{channel::oneshot, select, FutureExt};
12use send_wrapper::SendWrapper;
13use std::{
14 future::Future,
15 ops::{Deref, DerefMut},
16 panic::Location,
17 pin::Pin,
18 sync::Arc,
19};
20
21/// An action runs some asynchronous code when you dispatch a new value to it, and gives you
22/// reactive access to the result.
23///
24/// Actions are intended for mutating or updating data, not for loading data. If you find yourself
25/// creating an action and immediately dispatching a value to it, this is probably the wrong
26/// primitive.
27///
28/// The arena-allocated, `Copy` version of an `ArcAction` is an [`Action`].
29///
30/// ```rust
31/// # use reactive_graph::actions::*;
32/// # use reactive_graph::prelude::*;
33/// # tokio_test::block_on(async move {
34/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
35/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
36/// async fn send_new_todo_to_api(task: String) -> usize {
37/// // do something...
38/// // return a task id
39/// 42
40/// }
41/// let save_data = ArcAction::new(|task: &String| {
42/// // `task` is given as `&String` because its value is available in `input`
43/// send_new_todo_to_api(task.clone())
44/// });
45///
46/// // the argument currently running
47/// let input = save_data.input();
48/// // the most recent returned result
49/// let result_of_call = save_data.value();
50/// // whether the call is pending
51/// let pending = save_data.pending();
52/// // how many times the action has run
53/// // useful for reactively updating something else in response to a `dispatch` and response
54/// let version = save_data.version();
55///
56/// // before we do anything
57/// assert_eq!(input.get(), None); // no argument yet
58/// assert_eq!(pending.get(), false); // isn't pending a response
59/// assert_eq!(result_of_call.get(), None); // there's no "last value"
60/// assert_eq!(version.get(), 0);
61///
62/// // dispatch the action
63/// save_data.dispatch("My todo".to_string());
64///
65/// // when we're making the call
66/// assert_eq!(input.get(), Some("My todo".to_string()));
67/// assert_eq!(pending.get(), true); // is pending
68/// assert_eq!(result_of_call.get(), None); // has not yet gotten a response
69///
70/// # any_spawner::Executor::tick().await;
71///
72/// // after call has resolved
73/// assert_eq!(input.get(), None); // input clears out after resolved
74/// assert_eq!(pending.get(), false); // no longer pending
75/// assert_eq!(result_of_call.get(), Some(42));
76/// assert_eq!(version.get(), 1);
77/// # });
78/// ```
79///
80/// The input to the `async` function should always be a single value,
81/// but it can be of any type. The argument is always passed by reference to the
82/// function, because it is stored in [Action::input] as well.
83///
84/// ```rust
85/// # use reactive_graph::actions::*;
86/// // if there's a single argument, just use that
87/// let action1 = ArcAction::new(|input: &String| {
88/// let input = input.clone();
89/// async move { todo!() }
90/// });
91///
92/// // if there are no arguments, use the unit type `()`
93/// let action2 = ArcAction::new(|input: &()| async { todo!() });
94///
95/// // if there are multiple arguments, use a tuple
96/// let action3 = ArcAction::new(|input: &(usize, String)| async { todo!() });
97/// ```
98pub struct ArcAction<I, O> {
99 in_flight: ArcRwSignal<usize>,
100 input: ArcRwSignal<SendOption<I>>,
101 value: ArcRwSignal<SendOption<O>>,
102 version: ArcRwSignal<usize>,
103 dispatched: ArcStoredValue<usize>,
104 #[allow(clippy::complexity)]
105 action_fn: Arc<
106 dyn Fn(&I) -> Pin<Box<dyn Future<Output = O> + Send>> + Send + Sync,
107 >,
108 #[cfg(any(debug_assertions, leptos_debuginfo))]
109 defined_at: &'static Location<'static>,
110}
111
112impl<I, O> Clone for ArcAction<I, O> {
113 fn clone(&self) -> Self {
114 Self {
115 in_flight: self.in_flight.clone(),
116 input: self.input.clone(),
117 value: self.value.clone(),
118 version: self.version.clone(),
119 dispatched: self.dispatched.clone(),
120 action_fn: self.action_fn.clone(),
121 #[cfg(any(debug_assertions, leptos_debuginfo))]
122 defined_at: self.defined_at,
123 }
124 }
125}
126
127impl<I, O> ArcAction<I, O>
128where
129 I: 'static,
130 O: 'static,
131{
132 /// Creates a new action. This is lazy: it does not run the action function until some value
133 /// is dispatched.
134 ///
135 /// The constructor takes a function which will create a new `Future` from some input data.
136 /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
137 /// be spawned.
138 ///
139 /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
140 /// `Future` must be `Send` so that it can be moved across threads by the async executor as
141 /// needed.
142 ///
143 /// ```rust
144 /// # use reactive_graph::actions::*;
145 /// # use reactive_graph::prelude::*;
146 /// # tokio_test::block_on(async move {
147 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
148 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
149 /// let act = ArcAction::new(|n: &u8| {
150 /// let n = n.to_owned();
151 /// async move { n * 2 }
152 /// });
153 ///
154 /// act.dispatch(3);
155 /// assert_eq!(act.input().get(), Some(3));
156 ///
157 /// // Remember that async functions already return a future if they are
158 /// // not `await`ed. You can save keystrokes by leaving out the `async move`
159 ///
160 /// let act2 = Action::new(|n: &String| yell(n.to_owned()));
161 /// act2.dispatch(String::from("i'm in a doctest"));
162 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
163 ///
164 /// // after it resolves
165 /// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
166 ///
167 /// async fn yell(n: String) -> String {
168 /// n.to_uppercase()
169 /// }
170 /// # });
171 /// ```
172 #[track_caller]
173 pub fn new<F, Fu>(action_fn: F) -> Self
174 where
175 F: Fn(&I) -> Fu + Send + Sync + 'static,
176 Fu: Future<Output = O> + Send + 'static,
177 I: Send + Sync,
178 O: Send + Sync,
179 {
180 Self::new_with_value(None, action_fn)
181 }
182
183 /// Creates a new action, initializing it with the given value.
184 ///
185 /// This is lazy: it does not run the action function until some value is dispatched.
186 ///
187 /// The constructor takes a function which will create a new `Future` from some input data.
188 /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
189 /// be spawned.
190 ///
191 /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
192 /// `Future` must be `Send` so that it can be moved across threads by the async executor as
193 /// needed.
194 #[track_caller]
195 pub fn new_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
196 where
197 F: Fn(&I) -> Fu + Send + Sync + 'static,
198 Fu: Future<Output = O> + Send + 'static,
199 I: Send + Sync,
200 O: Send + Sync,
201 {
202 ArcAction {
203 in_flight: ArcRwSignal::new(0),
204 input: ArcRwSignal::new(SendOption::new(None)),
205 value: ArcRwSignal::new(SendOption::new(value)),
206 version: Default::default(),
207 dispatched: Default::default(),
208 action_fn: Arc::new(move |input| Box::pin(action_fn(input))),
209 #[cfg(any(debug_assertions, leptos_debuginfo))]
210 defined_at: Location::caller(),
211 }
212 }
213
214 /// Clears the value of the action, setting its current value to `None`.
215 ///
216 /// This has no other effect: i.e., it will not cancel in-flight actions, set the
217 /// input, etc.
218 #[track_caller]
219 pub fn clear(&self) {
220 if let Some(mut guard) = self.value.try_write() {
221 **guard = None;
222 }
223 }
224}
225
226/// A handle that allows aborting an in-flight action. It is returned from [`Action::dispatch`] or
227/// [`ArcAction::dispatch`].
228#[derive(Debug)]
229pub struct ActionAbortHandle(oneshot::Sender<()>);
230
231impl ActionAbortHandle {
232 /// Aborts the action.
233 ///
234 /// This will cause the dispatched task to complete, without updating the action's value. The
235 /// dispatched action's `Future` will no longer be polled. This does not guarantee that side
236 /// effects created by that `Future` no longer run: for example, if the action dispatches an
237 /// HTTP request, whether that request is actually canceled or not depends on whether the
238 /// request library actually cancels a request when its `Future` is dropped.
239 pub fn abort(self) {
240 let _ = self.0.send(());
241 }
242}
243
244impl<I, O> ArcAction<I, O>
245where
246 I: Send + Sync + 'static,
247 O: Send + Sync + 'static,
248{
249 /// Calls the `async` function with a reference to the input type as its argument.
250 #[track_caller]
251 pub fn dispatch(&self, input: I) -> ActionAbortHandle {
252 let (abort_tx, mut abort_rx) = oneshot::channel();
253 if !is_suppressing_resource_load() {
254 let mut fut = (self.action_fn)(&input).fuse();
255
256 // Update the state before loading
257 self.in_flight.update(|n| *n += 1);
258 let current_version = self.dispatched.get_value();
259 self.input.try_update(|inp| **inp = Some(input));
260
261 // Spawn the task
262 crate::spawn({
263 let input = self.input.clone();
264 let version = self.version.clone();
265 let dispatched = self.dispatched.clone();
266 let value = self.value.clone();
267 let in_flight = self.in_flight.clone();
268 async move {
269 select! {
270 // if the abort message has been sent, bail and do nothing
271 _ = abort_rx => {
272 in_flight.update(|n| *n = n.saturating_sub(1));
273 },
274 // otherwise, update the value
275 result = fut => {
276 in_flight.update(|n| *n = n.saturating_sub(1));
277 let is_latest = dispatched.get_value() <= current_version;
278 if is_latest {
279 version.update(|n| *n += 1);
280 value.update(|n| **n = Some(result));
281 }
282 }
283 }
284 if in_flight.get_untracked() == 0 {
285 input.update(|inp| **inp = None);
286 }
287 }
288 });
289 }
290
291 ActionAbortHandle(abort_tx)
292 }
293}
294
295impl<I, O> ArcAction<I, O>
296where
297 I: 'static,
298 O: 'static,
299{
300 /// Calls the `async` function with a reference to the input type as its argument,
301 /// ensuring that it is spawned on the current thread.
302 #[track_caller]
303 pub fn dispatch_local(&self, input: I) -> ActionAbortHandle {
304 let (abort_tx, mut abort_rx) = oneshot::channel();
305 if !is_suppressing_resource_load() {
306 let mut fut = (self.action_fn)(&input).fuse();
307
308 // Update the state before loading
309 self.in_flight.update(|n| *n += 1);
310 let current_version = self.dispatched.get_value();
311 self.input.try_update(|inp| **inp = Some(input));
312
313 // Spawn the task
314 Executor::spawn_local({
315 let input = self.input.clone();
316 let version = self.version.clone();
317 let value = self.value.clone();
318 let dispatched = self.dispatched.clone();
319 let in_flight = self.in_flight.clone();
320 async move {
321 select! {
322 // if the abort message has been sent, bail and do nothing
323 _ = abort_rx => {
324 in_flight.update(|n| *n = n.saturating_sub(1));
325 },
326 // otherwise, update the value
327 result = fut => {
328 in_flight.update(|n| *n = n.saturating_sub(1));
329 let is_latest = dispatched.get_value() <= current_version;
330 if is_latest {
331 version.update(|n| *n += 1);
332 value.update(|n| **n = Some(result));
333 }
334 }
335 }
336 if in_flight.get_untracked() == 0 {
337 input.update(|inp| **inp = None);
338 }
339 }
340 });
341 }
342 ActionAbortHandle(abort_tx)
343 }
344}
345
346impl<I, O> ArcAction<I, O>
347where
348 I: 'static,
349 O: 'static,
350{
351 /// Creates a new action, which will only be run on the thread in which it is created.
352 ///
353 /// In all other ways, this is identical to [`ArcAction::new`].
354 #[track_caller]
355 pub fn new_unsync<F, Fu>(action_fn: F) -> Self
356 where
357 F: Fn(&I) -> Fu + 'static,
358 Fu: Future<Output = O> + 'static,
359 {
360 let action_fn = move |inp: &I| SendWrapper::new(action_fn(inp));
361 Self::new_unsync_with_value(None, action_fn)
362 }
363
364 /// Creates a new action that will only run on the current thread, initializing it with the given value.
365 ///
366 /// In all other ways, this is identical to [`ArcAction::new_with_value`].
367 #[track_caller]
368 pub fn new_unsync_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
369 where
370 F: Fn(&I) -> Fu + 'static,
371 Fu: Future<Output = O> + 'static,
372 {
373 let action_fn = SendWrapper::new(action_fn);
374 ArcAction {
375 in_flight: ArcRwSignal::new(0),
376 input: ArcRwSignal::new(SendOption::new_local(None)),
377 value: ArcRwSignal::new(SendOption::new_local(value)),
378 version: Default::default(),
379 dispatched: Default::default(),
380 action_fn: Arc::new(move |input| {
381 Box::pin(SendWrapper::new(action_fn(input)))
382 }),
383 #[cfg(any(debug_assertions, leptos_debuginfo))]
384 defined_at: Location::caller(),
385 }
386 }
387}
388
389impl<I, O> ArcAction<I, O>
390where
391 I: 'static,
392 O: 'static,
393{
394 /// The number of times the action has successfully completed.
395 ///
396 /// ```rust
397 /// # use reactive_graph::actions::*;
398 /// # use reactive_graph::prelude::*;
399 /// # tokio_test::block_on(async move {
400 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
401 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
402 /// let act = ArcAction::new(|n: &u8| {
403 /// let n = n.to_owned();
404 /// async move { n * 2 }
405 /// });
406 ///
407 /// let version = act.version();
408 /// act.dispatch(3);
409 /// assert_eq!(version.get(), 0);
410 ///
411 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
412 /// // after it resolves
413 /// assert_eq!(version.get(), 1);
414 /// # });
415 /// ```
416 #[track_caller]
417 pub fn version(&self) -> ArcRwSignal<usize> {
418 self.version.clone()
419 }
420
421 /// The current argument that was dispatched to the async function. This value will
422 /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
423 ///
424 /// ```rust
425 /// # use reactive_graph::actions::*;
426 /// # use reactive_graph::prelude::*;
427 /// # tokio_test::block_on(async move {
428 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
429 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
430 /// let act = ArcAction::new(|n: &u8| {
431 /// let n = n.to_owned();
432 /// async move { n * 2 }
433 /// });
434 ///
435 /// let input = act.input();
436 /// assert_eq!(input.get(), None);
437 /// act.dispatch(3);
438 /// assert_eq!(input.get(), Some(3));
439 ///
440 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
441 /// // after it resolves
442 /// assert_eq!(input.get(), None);
443 /// # });
444 /// ```
445 #[track_caller]
446 pub fn input(&self) -> ArcMappedSignal<Option<I>> {
447 ArcMappedSignal::new(
448 self.input.clone(),
449 |n| n.deref(),
450 |n| n.deref_mut(),
451 )
452 }
453
454 /// The most recent return value of the `async` function. This will be `None` before
455 /// the action has ever run successfully, and subsequently will always be `Some(_)`,
456 /// holding the old value until a new value has been received.
457 ///
458 /// ```rust
459 /// # use reactive_graph::actions::*;
460 /// # use reactive_graph::prelude::*;
461 /// # tokio_test::block_on(async move {
462 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
463 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
464 /// let act = ArcAction::new(|n: &u8| {
465 /// let n = n.to_owned();
466 /// async move { n * 2 }
467 /// });
468 ///
469 /// let value = act.value();
470 /// assert_eq!(value.get(), None);
471 /// act.dispatch(3);
472 /// assert_eq!(value.get(), None);
473 ///
474 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
475 /// // after it resolves
476 /// assert_eq!(value.get(), Some(6));
477 /// // dispatch another value, and it still holds the old value
478 /// act.dispatch(3);
479 /// assert_eq!(value.get(), Some(6));
480 /// # });
481 /// ```
482 #[track_caller]
483 pub fn value(&self) -> ArcMappedSignal<Option<O>> {
484 ArcMappedSignal::new(
485 self.value.clone(),
486 |n| n.deref(),
487 |n| n.deref_mut(),
488 )
489 }
490
491 /// Whether the action has been dispatched and is currently waiting to resolve.
492 ///
493 /// ```rust
494 /// # use reactive_graph::actions::*;
495 /// # use reactive_graph::prelude::*;
496 /// # tokio_test::block_on(async move {
497 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
498 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
499 /// let act = ArcAction::new(|n: &u8| {
500 /// let n = n.to_owned();
501 /// async move { n * 2 }
502 /// });
503 ///
504 /// let pending = act.pending();
505 /// assert_eq!(pending.get(), false);
506 /// act.dispatch(3);
507 /// assert_eq!(pending.get(), true);
508 ///
509 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
510 /// // after it resolves
511 /// assert_eq!(pending.get(), false);
512 /// # });
513 /// ```
514 #[track_caller]
515 pub fn pending(&self) -> ArcMemo<bool> {
516 let in_flight = self.in_flight.clone();
517 ArcMemo::new(move |_| in_flight.get() > 0)
518 }
519}
520
521impl<I, O> DefinedAt for ArcAction<I, O>
522where
523 I: 'static,
524 O: 'static,
525{
526 fn defined_at(&self) -> Option<&'static Location<'static>> {
527 #[cfg(any(debug_assertions, leptos_debuginfo))]
528 {
529 Some(self.defined_at)
530 }
531 #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
532 {
533 None
534 }
535 }
536}
537
538/// An action runs some asynchronous code when you dispatch a new value to it, and gives you
539/// reactive access to the result.
540///
541/// Actions are intended for mutating or updating data, not for loading data. If you find yourself
542/// creating an action and immediately dispatching a value to it, this is probably the wrong
543/// primitive.
544///
545/// The reference-counted, `Clone` (but not `Copy` version of an `Action` is an [`ArcAction`].
546///
547/// ```rust
548/// # use reactive_graph::actions::*;
549/// # use reactive_graph::prelude::*;
550/// # tokio_test::block_on(async move {
551/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
552/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
553/// async fn send_new_todo_to_api(task: String) -> usize {
554/// // do something...
555/// // return a task id
556/// 42
557/// }
558/// let save_data = Action::new(|task: &String| {
559/// // `task` is given as `&String` because its value is available in `input`
560/// send_new_todo_to_api(task.clone())
561/// });
562///
563/// // the argument currently running
564/// let input = save_data.input();
565/// // the most recent returned result
566/// let result_of_call = save_data.value();
567/// // whether the call is pending
568/// let pending = save_data.pending();
569/// // how many times the action has run
570/// // useful for reactively updating something else in response to a `dispatch` and response
571/// let version = save_data.version();
572///
573/// // before we do anything
574/// assert_eq!(input.get(), None); // no argument yet
575/// assert_eq!(pending.get(), false); // isn't pending a response
576/// assert_eq!(result_of_call.get(), None); // there's no "last value"
577/// assert_eq!(version.get(), 0);
578///
579/// // dispatch the action
580/// save_data.dispatch("My todo".to_string());
581///
582/// // when we're making the call
583/// assert_eq!(input.get(), Some("My todo".to_string()));
584/// assert_eq!(pending.get(), true); // is pending
585/// assert_eq!(result_of_call.get(), None); // has not yet gotten a response
586///
587/// # any_spawner::Executor::tick().await;
588///
589/// // after call has resolved
590/// assert_eq!(input.get(), None); // input clears out after resolved
591/// assert_eq!(pending.get(), false); // no longer pending
592/// assert_eq!(result_of_call.get(), Some(42));
593/// assert_eq!(version.get(), 1);
594/// # });
595/// ```
596///
597/// The input to the `async` function should always be a single value,
598/// but it can be of any type. The argument is always passed by reference to the
599/// function, because it is stored in [Action::input] as well.
600///
601/// ```rust
602/// # use reactive_graph::actions::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
603/// // if there's a single argument, just use that
604/// let action1 = Action::new(|input: &String| {
605/// let input = input.clone();
606/// async move { todo!() }
607/// });
608///
609/// // if there are no arguments, use the unit type `()`
610/// let action2 = Action::new(|input: &()| async { todo!() });
611///
612/// // if there are multiple arguments, use a tuple
613/// let action3 = Action::new(|input: &(usize, String)| async { todo!() });
614/// ```
615pub struct Action<I, O> {
616 inner: ArenaItem<ArcAction<I, O>>,
617 #[cfg(any(debug_assertions, leptos_debuginfo))]
618 defined_at: &'static Location<'static>,
619}
620
621impl<I, O> Dispose for Action<I, O> {
622 fn dispose(self) {
623 self.inner.dispose()
624 }
625}
626
627impl<I, O> Action<I, O>
628where
629 I: Send + Sync + 'static,
630 O: Send + Sync + 'static,
631{
632 /// Creates a new action. This is lazy: it does not run the action function until some value
633 /// is dispatched.
634 ///
635 /// The constructor takes a function which will create a new `Future` from some input data.
636 /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
637 /// be spawned.
638 ///
639 /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
640 /// `Future` must be `Send` so that it can be moved across threads by the async executor as
641 /// needed. In order to be stored in the `Copy` arena, the input and output types should also
642 /// be `Send + Sync`.
643 ///
644 /// ```rust
645 /// # use reactive_graph::actions::*;
646 /// # use reactive_graph::prelude::*;
647 /// # tokio_test::block_on(async move {
648 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
649 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
650 /// let act = Action::new(|n: &u8| {
651 /// let n = n.to_owned();
652 /// async move { n * 2 }
653 /// });
654 ///
655 /// act.dispatch(3);
656 /// assert_eq!(act.input().get(), Some(3));
657 ///
658 /// // Remember that async functions already return a future if they are
659 /// // not `await`ed. You can save keystrokes by leaving out the `async move`
660 ///
661 /// let act2 = Action::new(|n: &String| yell(n.to_owned()));
662 /// act2.dispatch(String::from("i'm in a doctest"));
663 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
664 ///
665 /// // after it resolves
666 /// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
667 ///
668 /// async fn yell(n: String) -> String {
669 /// n.to_uppercase()
670 /// }
671 /// # });
672 /// ```
673 #[track_caller]
674 pub fn new<F, Fu>(action_fn: F) -> Self
675 where
676 F: Fn(&I) -> Fu + Send + Sync + 'static,
677 Fu: Future<Output = O> + Send + 'static,
678 {
679 Self {
680 inner: ArenaItem::new(ArcAction::new(action_fn)),
681 #[cfg(any(debug_assertions, leptos_debuginfo))]
682 defined_at: Location::caller(),
683 }
684 }
685
686 /// Creates a new action, initializing it with the given value.
687 ///
688 /// This is lazy: it does not run the action function until some value is dispatched.
689 ///
690 /// The constructor takes a function which will create a new `Future` from some input data.
691 /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
692 /// be spawned.
693 ///
694 /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
695 /// `Future` must be `Send` so that it can be moved across threads by the async executor as
696 /// needed. In order to be stored in the `Copy` arena, the input and output types should also
697 /// be `Send + Sync`.
698 #[track_caller]
699 pub fn new_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
700 where
701 F: Fn(&I) -> Fu + Send + Sync + 'static,
702 Fu: Future<Output = O> + Send + 'static,
703 {
704 Self {
705 inner: ArenaItem::new(ArcAction::new_with_value(value, action_fn)),
706 #[cfg(any(debug_assertions, leptos_debuginfo))]
707 defined_at: Location::caller(),
708 }
709 }
710}
711
712impl<I, O> Action<I, O>
713where
714 I: 'static,
715 O: 'static,
716{
717 /// Clears the value of the action, setting its current value to `None`.
718 ///
719 /// This has no other effect: i.e., it will not cancel in-flight actions, set the
720 /// input, etc.
721 #[track_caller]
722 pub fn clear(&self) {
723 self.inner.try_with_value(|inner| inner.clear());
724 }
725}
726
727impl<I, O> Action<I, O>
728where
729 I: 'static,
730 O: 'static,
731{
732 /// Creates a new action, which does not require its inputs or outputs to be `Send`. In all other
733 /// ways, this is the same as [`Action::new`]. If this action is accessed from outside the
734 /// thread on which it was created, it panics.
735 #[track_caller]
736 pub fn new_local<F, Fu>(action_fn: F) -> Self
737 where
738 F: Fn(&I) -> Fu + 'static,
739 Fu: Future<Output = O> + 'static,
740 {
741 Self {
742 inner: ArenaItem::new(ArcAction::new_unsync(action_fn)),
743 #[cfg(any(debug_assertions, leptos_debuginfo))]
744 defined_at: Location::caller(),
745 }
746 }
747
748 /// Creates a new action with the initial value, which does not require its inputs or outputs to be `Send`. In all other
749 /// ways, this is the same as [`Action::new_with_value`]. If this action is accessed from outside the
750 /// thread on which it was created, it panics.
751 #[track_caller]
752 pub fn new_local_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
753 where
754 F: Fn(&I) -> Fu + 'static,
755 Fu: Future<Output = O> + Send + 'static,
756 {
757 Self {
758 inner: ArenaItem::new(ArcAction::new_unsync_with_value(
759 value, action_fn,
760 )),
761 #[cfg(any(debug_assertions, leptos_debuginfo))]
762 defined_at: Location::caller(),
763 }
764 }
765}
766
767impl<I, O> Action<I, O>
768where
769 I: 'static,
770 O: 'static,
771{
772 /// The number of times the action has successfully completed.
773 ///
774 /// ```rust
775 /// # use reactive_graph::actions::*;
776 /// # use reactive_graph::prelude::*;
777 /// # tokio_test::block_on(async move {
778 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
779 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
780 /// let act = Action::new(|n: &u8| {
781 /// let n = n.to_owned();
782 /// async move { n * 2 }
783 /// });
784 ///
785 /// let version = act.version();
786 /// act.dispatch(3);
787 /// assert_eq!(version.get(), 0);
788 ///
789 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
790 /// // after it resolves
791 /// assert_eq!(version.get(), 1);
792 /// # });
793 /// ```
794 #[track_caller]
795 pub fn version(&self) -> RwSignal<usize> {
796 let inner = self
797 .inner
798 .try_with_value(|inner| inner.version())
799 .unwrap_or_else(unwrap_signal!(self));
800 inner.into()
801 }
802
803 /// Whether the action has been dispatched and is currently waiting to resolve.
804 ///
805 /// ```rust
806 /// # use reactive_graph::actions::*;
807 /// # use reactive_graph::prelude::*;
808 /// # tokio_test::block_on(async move {
809 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
810 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
811 /// let act = Action::new(|n: &u8| {
812 /// let n = n.to_owned();
813 /// async move { n * 2 }
814 /// });
815 ///
816 /// let pending = act.pending();
817 /// assert_eq!(pending.get(), false);
818 /// act.dispatch(3);
819 /// assert_eq!(pending.get(), true);
820 ///
821 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
822 /// // after it resolves
823 /// assert_eq!(pending.get(), false);
824 /// # });
825 /// ```
826 #[track_caller]
827 pub fn pending(&self) -> Memo<bool> {
828 let inner = self
829 .inner
830 .try_with_value(|inner| inner.pending())
831 .unwrap_or_else(unwrap_signal!(self));
832 inner.into()
833 }
834}
835
836impl<I, O> Action<I, O>
837where
838 I: 'static,
839 O: 'static,
840{
841 /// The current argument that was dispatched to the async function. This value will
842 /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
843 ///
844 /// ```rust
845 /// # use reactive_graph::actions::*;
846 /// # use reactive_graph::prelude::*;
847 /// # tokio_test::block_on(async move {
848 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
849 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
850 /// let act = Action::new(|n: &u8| {
851 /// let n = n.to_owned();
852 /// async move { n * 2 }
853 /// });
854 ///
855 /// let input = act.input();
856 /// assert_eq!(input.get(), None);
857 /// act.dispatch(3);
858 /// assert_eq!(input.get(), Some(3));
859 ///
860 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
861 /// // after it resolves
862 /// assert_eq!(input.get(), None);
863 /// # });
864 /// ```
865 #[track_caller]
866 pub fn input(&self) -> MappedSignal<Option<I>> {
867 self.inner
868 .try_with_value(|inner| inner.input())
869 .unwrap_or_else(unwrap_signal!(self))
870 .into()
871 }
872
873 /// The current argument that was dispatched to the async function. This value will
874 /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
875 ///
876 /// Returns a thread-local signal using [`LocalStorage`].
877 #[track_caller]
878 #[deprecated = "You can now use .input() for any value, whether it's \
879 thread-safe or not."]
880 pub fn input_local(&self) -> MappedSignal<Option<I>> {
881 self.inner
882 .try_with_value(|inner| inner.input())
883 .unwrap_or_else(unwrap_signal!(self))
884 .into()
885 }
886}
887
888impl<I, O> Action<I, O>
889where
890 I: 'static,
891 O: 'static,
892{
893 /// The most recent return value of the `async` function. This will be `None` before
894 /// the action has ever run successfully, and subsequently will always be `Some(_)`,
895 /// holding the old value until a new value has been received.
896 ///
897 /// ```rust
898 /// # use reactive_graph::actions::*;
899 /// # use reactive_graph::prelude::*;
900 /// # tokio_test::block_on(async move {
901 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
902 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
903 /// let act = Action::new(|n: &u8| {
904 /// let n = n.to_owned();
905 /// async move { n * 2 }
906 /// });
907 ///
908 /// let value = act.value();
909 /// assert_eq!(value.get(), None);
910 /// act.dispatch(3);
911 /// assert_eq!(value.get(), None);
912 ///
913 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
914 /// // after it resolves
915 /// assert_eq!(value.get(), Some(6));
916 /// // dispatch another value, and it still holds the old value
917 /// act.dispatch(3);
918 /// assert_eq!(value.get(), Some(6));
919 /// # });
920 /// ```
921 #[track_caller]
922 pub fn value(&self) -> MappedSignal<Option<O>> {
923 self.inner
924 .try_with_value(|inner| inner.value())
925 .unwrap_or_else(unwrap_signal!(self))
926 .into()
927 }
928
929 /// The most recent return value of the `async` function. This will be `None` before
930 /// the action has ever run successfully, and subsequently will always be `Some(_)`,
931 /// holding the old value until a new value has been received.
932 ///
933 /// Returns a thread-local signal using [`LocalStorage`].
934 #[deprecated = "You can now use .value() for any value, whether it's \
935 thread-safe or not."]
936 #[track_caller]
937 pub fn value_local(&self) -> MappedSignal<Option<O>>
938 where
939 O: Send + Sync,
940 {
941 self.inner
942 .try_with_value(|inner| inner.value())
943 .unwrap_or_else(unwrap_signal!(self))
944 .into()
945 }
946}
947
948impl<I, O> Action<I, O>
949where
950 I: Send + Sync + 'static,
951 O: Send + Sync + 'static,
952{
953 /// Calls the `async` function with a reference to the input type as its argument.
954 #[track_caller]
955 pub fn dispatch(&self, input: I) -> ActionAbortHandle {
956 self.inner
957 .try_get_value()
958 .map(|inner| inner.dispatch(input))
959 .unwrap_or_else(unwrap_signal!(self))
960 }
961}
962
963impl<I, O> Action<I, O>
964where
965 I: 'static,
966 O: 'static,
967{
968 /// Calls the `async` function with a reference to the input type as its argument.
969 #[track_caller]
970 pub fn dispatch_local(&self, input: I) -> ActionAbortHandle {
971 self.inner
972 .try_get_value()
973 .map(|inner| inner.dispatch_local(input))
974 .unwrap_or_else(unwrap_signal!(self))
975 }
976}
977
978impl<I, O> Action<I, O>
979where
980 I: Send + Sync + 'static,
981 O: Send + Sync + 'static,
982{
983 /// Creates a new action, which does not require the action itself to be `Send`, but will run
984 /// it on the same thread it was created on.
985 ///
986 /// In all other ways, this is identical to [`Action::new`].
987 #[track_caller]
988 pub fn new_unsync<F, Fu>(action_fn: F) -> Self
989 where
990 F: Fn(&I) -> Fu + 'static,
991 Fu: Future<Output = O> + 'static,
992 {
993 Self {
994 inner: ArenaItem::new_with_storage(ArcAction::new_unsync(
995 action_fn,
996 )),
997 #[cfg(any(debug_assertions, leptos_debuginfo))]
998 defined_at: Location::caller(),
999 }
1000 }
1001
1002 /// Creates a new action, which does not require the action itself to be `Send`, but will run
1003 /// it on the same thread it was created on, and gives an initial value.
1004 ///
1005 /// In all other ways, this is identical to [`Action::new`].
1006 #[track_caller]
1007 pub fn new_unsync_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
1008 where
1009 F: Fn(&I) -> Fu + 'static,
1010 Fu: Future<Output = O> + 'static,
1011 {
1012 Self {
1013 inner: ArenaItem::new_with_storage(
1014 ArcAction::new_unsync_with_value(value, action_fn),
1015 ),
1016 #[cfg(any(debug_assertions, leptos_debuginfo))]
1017 defined_at: Location::caller(),
1018 }
1019 }
1020}
1021
1022impl<I, O> Action<I, O>
1023where
1024 I: 'static,
1025 O: 'static,
1026{
1027 /// Creates a new action, which neither requires the action itself nor the
1028 /// value it returns to be `Send`. If this action is accessed from outside the
1029 /// thread on which it was created, it panics.
1030 ///
1031 /// This combines the features of [`Action::new_local`] and [`Action::new_unsync`].
1032 #[track_caller]
1033 pub fn new_unsync_local<F, Fu>(action_fn: F) -> Self
1034 where
1035 F: Fn(&I) -> Fu + 'static,
1036 Fu: Future<Output = O> + 'static,
1037 {
1038 Self {
1039 inner: ArenaItem::new_with_storage(ArcAction::new_unsync(
1040 action_fn,
1041 )),
1042 #[cfg(any(debug_assertions, leptos_debuginfo))]
1043 defined_at: Location::caller(),
1044 }
1045 }
1046
1047 /// Creates a new action, which neither requires the action itself nor the
1048 /// value it returns to be `Send`, and provides it with an initial value.
1049 /// If this action is accessed from outside the thread on which it was created, it panics.
1050 ///
1051 /// This combines the features of [`Action::new_local_with_value`] and
1052 /// [`Action::new_unsync_with_value`].
1053 #[track_caller]
1054 pub fn new_unsync_local_with_value<F, Fu>(
1055 value: Option<O>,
1056 action_fn: F,
1057 ) -> Self
1058 where
1059 F: Fn(&I) -> Fu + 'static,
1060 Fu: Future<Output = O> + 'static,
1061 {
1062 Self {
1063 inner: ArenaItem::new_with_storage(
1064 ArcAction::new_unsync_with_value(value, action_fn),
1065 ),
1066 #[cfg(any(debug_assertions, leptos_debuginfo))]
1067 defined_at: Location::caller(),
1068 }
1069 }
1070}
1071
1072impl<I, O> DefinedAt for Action<I, O> {
1073 fn defined_at(&self) -> Option<&'static Location<'static>> {
1074 #[cfg(any(debug_assertions, leptos_debuginfo))]
1075 {
1076 Some(self.defined_at)
1077 }
1078 #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
1079 {
1080 None
1081 }
1082 }
1083}
1084
1085impl<I, O> Clone for Action<I, O> {
1086 fn clone(&self) -> Self {
1087 *self
1088 }
1089}
1090
1091impl<I, O> Copy for Action<I, O> {}
1092
1093/// Creates a new action. This is lazy: it does not run the action function until some value
1094/// is dispatched.
1095///
1096/// The constructor takes a function which will create a new `Future` from some input data.
1097/// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
1098/// be spawned.
1099///
1100/// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
1101/// `Future` must be `Send` so that it can be moved across threads by the async executor as
1102/// needed. In order to be stored in the `Copy` arena, the input and output types should also
1103/// be `Send + Sync`.
1104///
1105/// ```rust
1106/// # use reactive_graph::actions::*;
1107/// # use reactive_graph::prelude::*;
1108/// # tokio_test::block_on(async move {
1109/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
1110/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
1111/// let act = Action::new(|n: &u8| {
1112/// let n = n.to_owned();
1113/// async move { n * 2 }
1114/// });
1115///
1116/// act.dispatch(3);
1117/// assert_eq!(act.input().get(), Some(3));
1118///
1119/// // Remember that async functions already return a future if they are
1120/// // not `await`ed. You can save keystrokes by leaving out the `async move`
1121///
1122/// let act2 = Action::new(|n: &String| yell(n.to_owned()));
1123/// act2.dispatch(String::from("i'm in a doctest"));
1124/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1125///
1126/// // after it resolves
1127/// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
1128///
1129/// async fn yell(n: String) -> String {
1130/// n.to_uppercase()
1131/// }
1132/// # });
1133/// ```
1134#[inline(always)]
1135#[track_caller]
1136#[deprecated = "This function is being removed to conform to Rust idioms. \
1137 Please use `Action::new()` instead."]
1138pub fn create_action<I, O, F, Fu>(action_fn: F) -> Action<I, O>
1139where
1140 I: Send + Sync + 'static,
1141 O: Send + Sync + 'static,
1142 F: Fn(&I) -> Fu + Send + Sync + 'static,
1143 Fu: Future<Output = O> + Send + 'static,
1144{
1145 Action::new(action_fn)
1146}