reactive_graph/actions/action.rs
1use crate::{
2 computed::{ArcMemo, Memo, ScopedFuture},
3 diagnostics::is_suppressing_resource_load,
4 graph::untrack,
5 owner::{ArcStoredValue, ArenaItem, Owner},
6 send_wrapper_ext::SendOption,
7 signal::{ArcMappedSignal, ArcRwSignal, MappedSignal, RwSignal},
8 traits::{DefinedAt, Dispose, Get, GetUntracked, GetValue, Update, Write},
9 unwrap_signal,
10};
11use any_spawner::Executor;
12use futures::{channel::oneshot, select, FutureExt};
13use send_wrapper::SendWrapper;
14use std::{
15 future::Future,
16 ops::{Deref, DerefMut},
17 panic::Location,
18 pin::Pin,
19 sync::Arc,
20};
21
22/// An action runs some asynchronous code when you dispatch a new value to it, and gives you
23/// reactive access to the result.
24///
25/// Actions are intended for mutating or updating data, not for loading data. If you find yourself
26/// creating an action and immediately dispatching a value to it, this is probably the wrong
27/// primitive.
28///
29/// The arena-allocated, `Copy` version of an `ArcAction` is an [`Action`].
30///
31/// ```rust
32/// # use reactive_graph::actions::*;
33/// # use reactive_graph::prelude::*;
34/// # tokio_test::block_on(async move {
35/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
36/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
37/// async fn send_new_todo_to_api(task: String) -> usize {
38/// // do something...
39/// // return a task id
40/// 42
41/// }
42/// let save_data = ArcAction::new(|task: &String| {
43/// // `task` is given as `&String` because its value is available in `input`
44/// send_new_todo_to_api(task.clone())
45/// });
46///
47/// // the argument currently running
48/// let input = save_data.input();
49/// // the most recent returned result
50/// let result_of_call = save_data.value();
51/// // whether the call is pending
52/// let pending = save_data.pending();
53/// // how many times the action has run
54/// // useful for reactively updating something else in response to a `dispatch` and response
55/// let version = save_data.version();
56///
57/// // before we do anything
58/// assert_eq!(input.get(), None); // no argument yet
59/// assert_eq!(pending.get(), false); // isn't pending a response
60/// assert_eq!(result_of_call.get(), None); // there's no "last value"
61/// assert_eq!(version.get(), 0);
62///
63/// // dispatch the action
64/// save_data.dispatch("My todo".to_string());
65///
66/// // when we're making the call
67/// assert_eq!(input.get(), Some("My todo".to_string()));
68/// assert_eq!(pending.get(), true); // is pending
69/// assert_eq!(result_of_call.get(), None); // has not yet gotten a response
70///
71/// # any_spawner::Executor::tick().await;
72///
73/// // after call has resolved
74/// assert_eq!(input.get(), None); // input clears out after resolved
75/// assert_eq!(pending.get(), false); // no longer pending
76/// assert_eq!(result_of_call.get(), Some(42));
77/// assert_eq!(version.get(), 1);
78/// # });
79/// ```
80///
81/// The input to the `async` function should always be a single value,
82/// but it can be of any type. The argument is always passed by reference to the
83/// function, because it is stored in [Action::input] as well.
84///
85/// ```rust
86/// # use reactive_graph::actions::*;
87/// // if there's a single argument, just use that
88/// let action1 = ArcAction::new(|input: &String| {
89/// let input = input.clone();
90/// async move { todo!() }
91/// });
92///
93/// // if there are no arguments, use the unit type `()`
94/// let action2 = ArcAction::new(|input: &()| async { todo!() });
95///
96/// // if there are multiple arguments, use a tuple
97/// let action3 = ArcAction::new(|input: &(usize, String)| async { todo!() });
98/// ```
99pub struct ArcAction<I, O> {
100 in_flight: ArcRwSignal<usize>,
101 input: ArcRwSignal<SendOption<I>>,
102 value: ArcRwSignal<SendOption<O>>,
103 version: ArcRwSignal<usize>,
104 dispatched: ArcStoredValue<usize>,
105 #[allow(clippy::complexity)]
106 action_fn: Arc<
107 dyn Fn(&I) -> Pin<Box<dyn Future<Output = O> + Send>> + Send + Sync,
108 >,
109 #[cfg(any(debug_assertions, leptos_debuginfo))]
110 defined_at: &'static Location<'static>,
111}
112
113impl<I, O> Clone for ArcAction<I, O> {
114 fn clone(&self) -> Self {
115 Self {
116 in_flight: self.in_flight.clone(),
117 input: self.input.clone(),
118 value: self.value.clone(),
119 version: self.version.clone(),
120 dispatched: self.dispatched.clone(),
121 action_fn: self.action_fn.clone(),
122 #[cfg(any(debug_assertions, leptos_debuginfo))]
123 defined_at: self.defined_at,
124 }
125 }
126}
127
128impl<I, O> ArcAction<I, O>
129where
130 I: 'static,
131 O: 'static,
132{
133 /// Creates a new action. This is lazy: it does not run the action function until some value
134 /// is dispatched.
135 ///
136 /// The constructor takes a function which will create a new `Future` from some input data.
137 /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
138 /// be spawned.
139 ///
140 /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
141 /// `Future` must be `Send` so that it can be moved across threads by the async executor as
142 /// needed.
143 ///
144 /// ```rust
145 /// # use reactive_graph::actions::*;
146 /// # use reactive_graph::prelude::*;
147 /// # tokio_test::block_on(async move {
148 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
149 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
150 /// let act = ArcAction::new(|n: &u8| {
151 /// let n = n.to_owned();
152 /// async move { n * 2 }
153 /// });
154 ///
155 /// act.dispatch(3);
156 /// assert_eq!(act.input().get(), Some(3));
157 ///
158 /// // Remember that async functions already return a future if they are
159 /// // not `await`ed. You can save keystrokes by leaving out the `async move`
160 ///
161 /// let act2 = Action::new(|n: &String| yell(n.to_owned()));
162 /// act2.dispatch(String::from("i'm in a doctest"));
163 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
164 ///
165 /// // after it resolves
166 /// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
167 ///
168 /// async fn yell(n: String) -> String {
169 /// n.to_uppercase()
170 /// }
171 /// # });
172 /// ```
173 #[track_caller]
174 pub fn new<F, Fu>(action_fn: F) -> Self
175 where
176 F: Fn(&I) -> Fu + Send + Sync + 'static,
177 Fu: Future<Output = O> + Send + 'static,
178 I: Send + Sync,
179 O: Send + Sync,
180 {
181 Self::new_with_value(None, action_fn)
182 }
183
184 /// Creates a new action, initializing it with the given value.
185 ///
186 /// This is lazy: it does not run the action function until some value is dispatched.
187 ///
188 /// The constructor takes a function which will create a new `Future` from some input data.
189 /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
190 /// be spawned.
191 ///
192 /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
193 /// `Future` must be `Send` so that it can be moved across threads by the async executor as
194 /// needed.
195 #[track_caller]
196 pub fn new_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
197 where
198 F: Fn(&I) -> Fu + Send + Sync + 'static,
199 Fu: Future<Output = O> + Send + 'static,
200 I: Send + Sync,
201 O: Send + Sync,
202 {
203 let owner = Owner::current().unwrap_or_default();
204 ArcAction {
205 in_flight: ArcRwSignal::new(0),
206 input: ArcRwSignal::new(SendOption::new(None)),
207 value: ArcRwSignal::new(SendOption::new(value)),
208 version: Default::default(),
209 dispatched: Default::default(),
210 action_fn: Arc::new(move |input| {
211 Box::pin(owner.with(|| {
212 ScopedFuture::new_untracked(untrack(|| action_fn(input)))
213 }))
214 }),
215 #[cfg(any(debug_assertions, leptos_debuginfo))]
216 defined_at: Location::caller(),
217 }
218 }
219
220 /// Clears the value of the action, setting its current value to `None`.
221 ///
222 /// This has no other effect: i.e., it will not cancel in-flight actions, set the
223 /// input, etc.
224 #[track_caller]
225 pub fn clear(&self) {
226 if let Some(mut guard) = self.value.try_write() {
227 **guard = None;
228 }
229 }
230}
231
232/// A handle that allows aborting an in-flight action. It is returned from [`Action::dispatch`] or
233/// [`ArcAction::dispatch`].
234#[derive(Debug)]
235pub struct ActionAbortHandle(oneshot::Sender<()>);
236
237impl ActionAbortHandle {
238 /// Aborts the action.
239 ///
240 /// This will cause the dispatched task to complete, without updating the action's value. The
241 /// dispatched action's `Future` will no longer be polled. This does not guarantee that side
242 /// effects created by that `Future` no longer run: for example, if the action dispatches an
243 /// HTTP request, whether that request is actually canceled or not depends on whether the
244 /// request library actually cancels a request when its `Future` is dropped.
245 pub fn abort(self) {
246 let _ = self.0.send(());
247 }
248}
249
250impl<I, O> ArcAction<I, O>
251where
252 I: Send + Sync + 'static,
253 O: Send + Sync + 'static,
254{
255 /// Calls the `async` function with a reference to the input type as its argument.
256 #[track_caller]
257 pub fn dispatch(&self, input: I) -> ActionAbortHandle {
258 let (abort_tx, mut abort_rx) = oneshot::channel();
259 if !is_suppressing_resource_load() {
260 let mut fut = (self.action_fn)(&input).fuse();
261
262 // Update the state before loading
263 self.in_flight.update(|n| *n += 1);
264 let current_version = self.dispatched.get_value();
265 self.input.try_update(|inp| **inp = Some(input));
266
267 // Spawn the task
268 crate::spawn({
269 let input = self.input.clone();
270 let version = self.version.clone();
271 let dispatched = self.dispatched.clone();
272 let value = self.value.clone();
273 let in_flight = self.in_flight.clone();
274 async move {
275 select! {
276 // if the abort message has been sent, bail and do nothing
277 _ = abort_rx => {
278 in_flight.update(|n| *n = n.saturating_sub(1));
279 },
280 // otherwise, update the value
281 result = fut => {
282 in_flight.update(|n| *n = n.saturating_sub(1));
283 let is_latest = dispatched.get_value() <= current_version;
284 if is_latest {
285 version.update(|n| *n += 1);
286 value.update(|n| **n = Some(result));
287 }
288 }
289 }
290 if in_flight.get_untracked() == 0 {
291 input.update(|inp| **inp = None);
292 }
293 }
294 });
295 }
296
297 ActionAbortHandle(abort_tx)
298 }
299}
300
301impl<I, O> ArcAction<I, O>
302where
303 I: 'static,
304 O: 'static,
305{
306 /// Calls the `async` function with a reference to the input type as its argument,
307 /// ensuring that it is spawned on the current thread.
308 #[track_caller]
309 pub fn dispatch_local(&self, input: I) -> ActionAbortHandle {
310 let (abort_tx, mut abort_rx) = oneshot::channel();
311 if !is_suppressing_resource_load() {
312 let mut fut = (self.action_fn)(&input).fuse();
313
314 // Update the state before loading
315 self.in_flight.update(|n| *n += 1);
316 let current_version = self.dispatched.get_value();
317 self.input.try_update(|inp| **inp = Some(input));
318
319 // Spawn the task
320 Executor::spawn_local({
321 let input = self.input.clone();
322 let version = self.version.clone();
323 let value = self.value.clone();
324 let dispatched = self.dispatched.clone();
325 let in_flight = self.in_flight.clone();
326 async move {
327 select! {
328 // if the abort message has been sent, bail and do nothing
329 _ = abort_rx => {
330 in_flight.update(|n| *n = n.saturating_sub(1));
331 },
332 // otherwise, update the value
333 result = fut => {
334 in_flight.update(|n| *n = n.saturating_sub(1));
335 let is_latest = dispatched.get_value() <= current_version;
336 if is_latest {
337 version.update(|n| *n += 1);
338 value.update(|n| **n = Some(result));
339 }
340 }
341 }
342 if in_flight.get_untracked() == 0 {
343 input.update(|inp| **inp = None);
344 }
345 }
346 });
347 }
348 ActionAbortHandle(abort_tx)
349 }
350}
351
352impl<I, O> ArcAction<I, O>
353where
354 I: 'static,
355 O: 'static,
356{
357 /// Creates a new action, which will only be run on the thread in which it is created.
358 ///
359 /// In all other ways, this is identical to [`ArcAction::new`].
360 #[track_caller]
361 pub fn new_unsync<F, Fu>(action_fn: F) -> Self
362 where
363 F: Fn(&I) -> Fu + 'static,
364 Fu: Future<Output = O> + 'static,
365 {
366 let action_fn = move |inp: &I| SendWrapper::new(action_fn(inp));
367 Self::new_unsync_with_value(None, action_fn)
368 }
369
370 /// Creates a new action that will only run on the current thread, initializing it with the given value.
371 ///
372 /// In all other ways, this is identical to [`ArcAction::new_with_value`].
373 #[track_caller]
374 pub fn new_unsync_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
375 where
376 F: Fn(&I) -> Fu + 'static,
377 Fu: Future<Output = O> + 'static,
378 {
379 let owner = Owner::current().unwrap_or_default();
380 let action_fn = SendWrapper::new(action_fn);
381 ArcAction {
382 in_flight: ArcRwSignal::new(0),
383 input: ArcRwSignal::new(SendOption::new_local(None)),
384 value: ArcRwSignal::new(SendOption::new_local(value)),
385 version: Default::default(),
386 dispatched: Default::default(),
387 action_fn: Arc::new(move |input| {
388 Box::pin(SendWrapper::new(owner.with(|| {
389 ScopedFuture::new_untracked(untrack(|| action_fn(input)))
390 })))
391 }),
392 #[cfg(any(debug_assertions, leptos_debuginfo))]
393 defined_at: Location::caller(),
394 }
395 }
396}
397
398impl<I, O> ArcAction<I, O>
399where
400 I: 'static,
401 O: 'static,
402{
403 /// The number of times the action has successfully completed.
404 ///
405 /// ```rust
406 /// # use reactive_graph::actions::*;
407 /// # use reactive_graph::prelude::*;
408 /// # tokio_test::block_on(async move {
409 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
410 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
411 /// let act = ArcAction::new(|n: &u8| {
412 /// let n = n.to_owned();
413 /// async move { n * 2 }
414 /// });
415 ///
416 /// let version = act.version();
417 /// act.dispatch(3);
418 /// assert_eq!(version.get(), 0);
419 ///
420 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
421 /// // after it resolves
422 /// assert_eq!(version.get(), 1);
423 /// # });
424 /// ```
425 #[track_caller]
426 pub fn version(&self) -> ArcRwSignal<usize> {
427 self.version.clone()
428 }
429
430 /// The current argument that was dispatched to the async function. This value will
431 /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
432 ///
433 /// ```rust
434 /// # use reactive_graph::actions::*;
435 /// # use reactive_graph::prelude::*;
436 /// # tokio_test::block_on(async move {
437 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
438 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
439 /// let act = ArcAction::new(|n: &u8| {
440 /// let n = n.to_owned();
441 /// async move { n * 2 }
442 /// });
443 ///
444 /// let input = act.input();
445 /// assert_eq!(input.get(), None);
446 /// act.dispatch(3);
447 /// assert_eq!(input.get(), Some(3));
448 ///
449 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
450 /// // after it resolves
451 /// assert_eq!(input.get(), None);
452 /// # });
453 /// ```
454 #[track_caller]
455 pub fn input(&self) -> ArcMappedSignal<Option<I>> {
456 ArcMappedSignal::new(
457 self.input.clone(),
458 |n| n.deref(),
459 |n| n.deref_mut(),
460 )
461 }
462
463 /// The most recent return value of the `async` function. This will be `None` before
464 /// the action has ever run successfully, and subsequently will always be `Some(_)`,
465 /// holding the old value until a new value has been received.
466 ///
467 /// ```rust
468 /// # use reactive_graph::actions::*;
469 /// # use reactive_graph::prelude::*;
470 /// # tokio_test::block_on(async move {
471 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
472 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
473 /// let act = ArcAction::new(|n: &u8| {
474 /// let n = n.to_owned();
475 /// async move { n * 2 }
476 /// });
477 ///
478 /// let value = act.value();
479 /// assert_eq!(value.get(), None);
480 /// act.dispatch(3);
481 /// assert_eq!(value.get(), None);
482 ///
483 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
484 /// // after it resolves
485 /// assert_eq!(value.get(), Some(6));
486 /// // dispatch another value, and it still holds the old value
487 /// act.dispatch(3);
488 /// assert_eq!(value.get(), Some(6));
489 /// # });
490 /// ```
491 #[track_caller]
492 pub fn value(&self) -> ArcMappedSignal<Option<O>> {
493 ArcMappedSignal::new(
494 self.value.clone(),
495 |n| n.deref(),
496 |n| n.deref_mut(),
497 )
498 }
499
500 /// Whether the action has been dispatched and is currently waiting to resolve.
501 ///
502 /// ```rust
503 /// # use reactive_graph::actions::*;
504 /// # use reactive_graph::prelude::*;
505 /// # tokio_test::block_on(async move {
506 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
507 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
508 /// let act = ArcAction::new(|n: &u8| {
509 /// let n = n.to_owned();
510 /// async move { n * 2 }
511 /// });
512 ///
513 /// let pending = act.pending();
514 /// assert_eq!(pending.get(), false);
515 /// act.dispatch(3);
516 /// assert_eq!(pending.get(), true);
517 ///
518 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
519 /// // after it resolves
520 /// assert_eq!(pending.get(), false);
521 /// # });
522 /// ```
523 #[track_caller]
524 pub fn pending(&self) -> ArcMemo<bool> {
525 let in_flight = self.in_flight.clone();
526 ArcMemo::new(move |_| in_flight.get() > 0)
527 }
528}
529
530impl<I, O> DefinedAt for ArcAction<I, O>
531where
532 I: 'static,
533 O: 'static,
534{
535 fn defined_at(&self) -> Option<&'static Location<'static>> {
536 #[cfg(any(debug_assertions, leptos_debuginfo))]
537 {
538 Some(self.defined_at)
539 }
540 #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
541 {
542 None
543 }
544 }
545}
546
547/// An action runs some asynchronous code when you dispatch a new value to it, and gives you
548/// reactive access to the result.
549///
550/// Actions are intended for mutating or updating data, not for loading data. If you find yourself
551/// creating an action and immediately dispatching a value to it, this is probably the wrong
552/// primitive.
553///
554/// The reference-counted, `Clone` (but not `Copy` version of an `Action` is an [`ArcAction`].
555///
556/// ```rust
557/// # use reactive_graph::actions::*;
558/// # use reactive_graph::prelude::*;
559/// # tokio_test::block_on(async move {
560/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
561/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
562/// async fn send_new_todo_to_api(task: String) -> usize {
563/// // do something...
564/// // return a task id
565/// 42
566/// }
567/// let save_data = Action::new(|task: &String| {
568/// // `task` is given as `&String` because its value is available in `input`
569/// send_new_todo_to_api(task.clone())
570/// });
571///
572/// // the argument currently running
573/// let input = save_data.input();
574/// // the most recent returned result
575/// let result_of_call = save_data.value();
576/// // whether the call is pending
577/// let pending = save_data.pending();
578/// // how many times the action has run
579/// // useful for reactively updating something else in response to a `dispatch` and response
580/// let version = save_data.version();
581///
582/// // before we do anything
583/// assert_eq!(input.get(), None); // no argument yet
584/// assert_eq!(pending.get(), false); // isn't pending a response
585/// assert_eq!(result_of_call.get(), None); // there's no "last value"
586/// assert_eq!(version.get(), 0);
587///
588/// // dispatch the action
589/// save_data.dispatch("My todo".to_string());
590///
591/// // when we're making the call
592/// assert_eq!(input.get(), Some("My todo".to_string()));
593/// assert_eq!(pending.get(), true); // is pending
594/// assert_eq!(result_of_call.get(), None); // has not yet gotten a response
595///
596/// # any_spawner::Executor::tick().await;
597///
598/// // after call has resolved
599/// assert_eq!(input.get(), None); // input clears out after resolved
600/// assert_eq!(pending.get(), false); // no longer pending
601/// assert_eq!(result_of_call.get(), Some(42));
602/// assert_eq!(version.get(), 1);
603/// # });
604/// ```
605///
606/// The input to the `async` function should always be a single value,
607/// but it can be of any type. The argument is always passed by reference to the
608/// function, because it is stored in [Action::input] as well.
609///
610/// ```rust
611/// # use reactive_graph::actions::*; let owner = reactive_graph::owner::Owner::new(); owner.set();
612/// // if there's a single argument, just use that
613/// let action1 = Action::new(|input: &String| {
614/// let input = input.clone();
615/// async move { todo!() }
616/// });
617///
618/// // if there are no arguments, use the unit type `()`
619/// let action2 = Action::new(|input: &()| async { todo!() });
620///
621/// // if there are multiple arguments, use a tuple
622/// let action3 = Action::new(|input: &(usize, String)| async { todo!() });
623/// ```
624pub struct Action<I, O> {
625 inner: ArenaItem<ArcAction<I, O>>,
626 #[cfg(any(debug_assertions, leptos_debuginfo))]
627 defined_at: &'static Location<'static>,
628}
629
630impl<I, O> Dispose for Action<I, O> {
631 fn dispose(self) {
632 self.inner.dispose()
633 }
634}
635
636impl<I, O> Action<I, O>
637where
638 I: Send + Sync + 'static,
639 O: Send + Sync + 'static,
640{
641 /// Creates a new action. This is lazy: it does not run the action function until some value
642 /// is dispatched.
643 ///
644 /// The constructor takes a function which will create a new `Future` from some input data.
645 /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
646 /// be spawned.
647 ///
648 /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
649 /// `Future` must be `Send` so that it can be moved across threads by the async executor as
650 /// needed. In order to be stored in the `Copy` arena, the input and output types should also
651 /// be `Send + Sync`.
652 ///
653 /// ```rust
654 /// # use reactive_graph::actions::*;
655 /// # use reactive_graph::prelude::*;
656 /// # tokio_test::block_on(async move {
657 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
658 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
659 /// let act = Action::new(|n: &u8| {
660 /// let n = n.to_owned();
661 /// async move { n * 2 }
662 /// });
663 ///
664 /// act.dispatch(3);
665 /// assert_eq!(act.input().get(), Some(3));
666 ///
667 /// // Remember that async functions already return a future if they are
668 /// // not `await`ed. You can save keystrokes by leaving out the `async move`
669 ///
670 /// let act2 = Action::new(|n: &String| yell(n.to_owned()));
671 /// act2.dispatch(String::from("i'm in a doctest"));
672 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
673 ///
674 /// // after it resolves
675 /// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
676 ///
677 /// async fn yell(n: String) -> String {
678 /// n.to_uppercase()
679 /// }
680 /// # });
681 /// ```
682 #[track_caller]
683 pub fn new<F, Fu>(action_fn: F) -> Self
684 where
685 F: Fn(&I) -> Fu + Send + Sync + 'static,
686 Fu: Future<Output = O> + Send + 'static,
687 {
688 Self {
689 inner: ArenaItem::new(ArcAction::new(action_fn)),
690 #[cfg(any(debug_assertions, leptos_debuginfo))]
691 defined_at: Location::caller(),
692 }
693 }
694
695 /// Creates a new action, initializing it with the given value.
696 ///
697 /// This is lazy: it does not run the action function until some value is dispatched.
698 ///
699 /// The constructor takes a function which will create a new `Future` from some input data.
700 /// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
701 /// be spawned.
702 ///
703 /// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
704 /// `Future` must be `Send` so that it can be moved across threads by the async executor as
705 /// needed. In order to be stored in the `Copy` arena, the input and output types should also
706 /// be `Send + Sync`.
707 #[track_caller]
708 pub fn new_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
709 where
710 F: Fn(&I) -> Fu + Send + Sync + 'static,
711 Fu: Future<Output = O> + Send + 'static,
712 {
713 Self {
714 inner: ArenaItem::new(ArcAction::new_with_value(value, action_fn)),
715 #[cfg(any(debug_assertions, leptos_debuginfo))]
716 defined_at: Location::caller(),
717 }
718 }
719}
720
721impl<I, O> Action<I, O>
722where
723 I: 'static,
724 O: 'static,
725{
726 /// Clears the value of the action, setting its current value to `None`.
727 ///
728 /// This has no other effect: i.e., it will not cancel in-flight actions, set the
729 /// input, etc.
730 #[track_caller]
731 pub fn clear(&self) {
732 self.inner.try_with_value(|inner| inner.clear());
733 }
734}
735
736impl<I, O> Action<I, O>
737where
738 I: 'static,
739 O: 'static,
740{
741 /// Creates a new action, which does not require its inputs or outputs to be `Send`. In all other
742 /// ways, this is the same as [`Action::new`]. If this action is accessed from outside the
743 /// thread on which it was created, it panics.
744 #[track_caller]
745 pub fn new_local<F, Fu>(action_fn: F) -> Self
746 where
747 F: Fn(&I) -> Fu + 'static,
748 Fu: Future<Output = O> + 'static,
749 {
750 Self {
751 inner: ArenaItem::new(ArcAction::new_unsync(action_fn)),
752 #[cfg(any(debug_assertions, leptos_debuginfo))]
753 defined_at: Location::caller(),
754 }
755 }
756
757 /// Creates a new action with the initial value, which does not require its inputs or outputs to be `Send`. In all other
758 /// ways, this is the same as [`Action::new_with_value`]. If this action is accessed from outside the
759 /// thread on which it was created, it panics.
760 #[track_caller]
761 pub fn new_local_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
762 where
763 F: Fn(&I) -> Fu + 'static,
764 Fu: Future<Output = O> + Send + 'static,
765 {
766 Self {
767 inner: ArenaItem::new(ArcAction::new_unsync_with_value(
768 value, action_fn,
769 )),
770 #[cfg(any(debug_assertions, leptos_debuginfo))]
771 defined_at: Location::caller(),
772 }
773 }
774}
775
776impl<I, O> Action<I, O>
777where
778 I: 'static,
779 O: 'static,
780{
781 /// The number of times the action has successfully completed.
782 ///
783 /// ```rust
784 /// # use reactive_graph::actions::*;
785 /// # use reactive_graph::prelude::*;
786 /// # tokio_test::block_on(async move {
787 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
788 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
789 /// let act = Action::new(|n: &u8| {
790 /// let n = n.to_owned();
791 /// async move { n * 2 }
792 /// });
793 ///
794 /// let version = act.version();
795 /// act.dispatch(3);
796 /// assert_eq!(version.get(), 0);
797 ///
798 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
799 /// // after it resolves
800 /// assert_eq!(version.get(), 1);
801 /// # });
802 /// ```
803 #[track_caller]
804 pub fn version(&self) -> RwSignal<usize> {
805 let inner = self
806 .inner
807 .try_with_value(|inner| inner.version())
808 .unwrap_or_else(unwrap_signal!(self));
809 inner.into()
810 }
811
812 /// Whether the action has been dispatched and is currently waiting to resolve.
813 ///
814 /// ```rust
815 /// # use reactive_graph::actions::*;
816 /// # use reactive_graph::prelude::*;
817 /// # tokio_test::block_on(async move {
818 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
819 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
820 /// let act = Action::new(|n: &u8| {
821 /// let n = n.to_owned();
822 /// async move { n * 2 }
823 /// });
824 ///
825 /// let pending = act.pending();
826 /// assert_eq!(pending.get(), false);
827 /// act.dispatch(3);
828 /// assert_eq!(pending.get(), true);
829 ///
830 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
831 /// // after it resolves
832 /// assert_eq!(pending.get(), false);
833 /// # });
834 /// ```
835 #[track_caller]
836 pub fn pending(&self) -> Memo<bool> {
837 let inner = self
838 .inner
839 .try_with_value(|inner| inner.pending())
840 .unwrap_or_else(unwrap_signal!(self));
841 inner.into()
842 }
843}
844
845impl<I, O> Action<I, O>
846where
847 I: 'static,
848 O: 'static,
849{
850 /// The current argument that was dispatched to the async function. This value will
851 /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
852 ///
853 /// ```rust
854 /// # use reactive_graph::actions::*;
855 /// # use reactive_graph::prelude::*;
856 /// # tokio_test::block_on(async move {
857 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
858 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
859 /// let act = Action::new(|n: &u8| {
860 /// let n = n.to_owned();
861 /// async move { n * 2 }
862 /// });
863 ///
864 /// let input = act.input();
865 /// assert_eq!(input.get(), None);
866 /// act.dispatch(3);
867 /// assert_eq!(input.get(), Some(3));
868 ///
869 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
870 /// // after it resolves
871 /// assert_eq!(input.get(), None);
872 /// # });
873 /// ```
874 #[track_caller]
875 pub fn input(&self) -> MappedSignal<Option<I>> {
876 self.inner
877 .try_with_value(|inner| inner.input())
878 .unwrap_or_else(unwrap_signal!(self))
879 .into()
880 }
881
882 /// The current argument that was dispatched to the async function. This value will
883 /// be `Some` while we are waiting for it to resolve, and `None` after it has resolved.
884 ///
885 /// Returns a thread-local signal using [`LocalStorage`].
886 #[track_caller]
887 #[deprecated = "You can now use .input() for any value, whether it's \
888 thread-safe or not."]
889 pub fn input_local(&self) -> MappedSignal<Option<I>> {
890 self.inner
891 .try_with_value(|inner| inner.input())
892 .unwrap_or_else(unwrap_signal!(self))
893 .into()
894 }
895}
896
897impl<I, O> Action<I, O>
898where
899 I: 'static,
900 O: 'static,
901{
902 /// The most recent return value of the `async` function. This will be `None` before
903 /// the action has ever run successfully, and subsequently will always be `Some(_)`,
904 /// holding the old value until a new value has been received.
905 ///
906 /// ```rust
907 /// # use reactive_graph::actions::*;
908 /// # use reactive_graph::prelude::*;
909 /// # tokio_test::block_on(async move {
910 /// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
911 /// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
912 /// let act = Action::new(|n: &u8| {
913 /// let n = n.to_owned();
914 /// async move { n * 2 }
915 /// });
916 ///
917 /// let value = act.value();
918 /// assert_eq!(value.get(), None);
919 /// act.dispatch(3);
920 /// assert_eq!(value.get(), None);
921 ///
922 /// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
923 /// // after it resolves
924 /// assert_eq!(value.get(), Some(6));
925 /// // dispatch another value, and it still holds the old value
926 /// act.dispatch(3);
927 /// assert_eq!(value.get(), Some(6));
928 /// # });
929 /// ```
930 #[track_caller]
931 pub fn value(&self) -> MappedSignal<Option<O>> {
932 self.inner
933 .try_with_value(|inner| inner.value())
934 .unwrap_or_else(unwrap_signal!(self))
935 .into()
936 }
937
938 /// The most recent return value of the `async` function. This will be `None` before
939 /// the action has ever run successfully, and subsequently will always be `Some(_)`,
940 /// holding the old value until a new value has been received.
941 ///
942 /// Returns a thread-local signal using [`LocalStorage`].
943 #[deprecated = "You can now use .value() for any value, whether it's \
944 thread-safe or not."]
945 #[track_caller]
946 pub fn value_local(&self) -> MappedSignal<Option<O>>
947 where
948 O: Send + Sync,
949 {
950 self.inner
951 .try_with_value(|inner| inner.value())
952 .unwrap_or_else(unwrap_signal!(self))
953 .into()
954 }
955}
956
957impl<I, O> Action<I, O>
958where
959 I: Send + Sync + 'static,
960 O: Send + Sync + 'static,
961{
962 /// Calls the `async` function with a reference to the input type as its argument.
963 #[track_caller]
964 pub fn dispatch(&self, input: I) -> ActionAbortHandle {
965 self.inner
966 .try_get_value()
967 .map(|inner| inner.dispatch(input))
968 .unwrap_or_else(unwrap_signal!(self))
969 }
970}
971
972impl<I, O> Action<I, O>
973where
974 I: 'static,
975 O: 'static,
976{
977 /// Calls the `async` function with a reference to the input type as its argument.
978 #[track_caller]
979 pub fn dispatch_local(&self, input: I) -> ActionAbortHandle {
980 self.inner
981 .try_get_value()
982 .map(|inner| inner.dispatch_local(input))
983 .unwrap_or_else(unwrap_signal!(self))
984 }
985}
986
987impl<I, O> Action<I, O>
988where
989 I: Send + Sync + 'static,
990 O: Send + Sync + 'static,
991{
992 /// Creates a new action, which does not require the action itself to be `Send`, but will run
993 /// it on the same thread it was created on.
994 ///
995 /// In all other ways, this is identical to [`Action::new`].
996 #[track_caller]
997 pub fn new_unsync<F, Fu>(action_fn: F) -> Self
998 where
999 F: Fn(&I) -> Fu + 'static,
1000 Fu: Future<Output = O> + 'static,
1001 {
1002 Self {
1003 inner: ArenaItem::new_with_storage(ArcAction::new_unsync(
1004 action_fn,
1005 )),
1006 #[cfg(any(debug_assertions, leptos_debuginfo))]
1007 defined_at: Location::caller(),
1008 }
1009 }
1010
1011 /// Creates a new action, which does not require the action itself to be `Send`, but will run
1012 /// it on the same thread it was created on, and gives an initial value.
1013 ///
1014 /// In all other ways, this is identical to [`Action::new`].
1015 #[track_caller]
1016 pub fn new_unsync_with_value<F, Fu>(value: Option<O>, action_fn: F) -> Self
1017 where
1018 F: Fn(&I) -> Fu + 'static,
1019 Fu: Future<Output = O> + 'static,
1020 {
1021 Self {
1022 inner: ArenaItem::new_with_storage(
1023 ArcAction::new_unsync_with_value(value, action_fn),
1024 ),
1025 #[cfg(any(debug_assertions, leptos_debuginfo))]
1026 defined_at: Location::caller(),
1027 }
1028 }
1029}
1030
1031impl<I, O> Action<I, O>
1032where
1033 I: 'static,
1034 O: 'static,
1035{
1036 /// Creates a new action, which neither requires the action itself nor the
1037 /// value it returns to be `Send`. If this action is accessed from outside the
1038 /// thread on which it was created, it panics.
1039 ///
1040 /// This combines the features of [`Action::new_local`] and [`Action::new_unsync`].
1041 #[track_caller]
1042 pub fn new_unsync_local<F, Fu>(action_fn: F) -> Self
1043 where
1044 F: Fn(&I) -> Fu + 'static,
1045 Fu: Future<Output = O> + 'static,
1046 {
1047 Self {
1048 inner: ArenaItem::new_with_storage(ArcAction::new_unsync(
1049 action_fn,
1050 )),
1051 #[cfg(any(debug_assertions, leptos_debuginfo))]
1052 defined_at: Location::caller(),
1053 }
1054 }
1055
1056 /// Creates a new action, which neither requires the action itself nor the
1057 /// value it returns to be `Send`, and provides it with an initial value.
1058 /// If this action is accessed from outside the thread on which it was created, it panics.
1059 ///
1060 /// This combines the features of [`Action::new_local_with_value`] and
1061 /// [`Action::new_unsync_with_value`].
1062 #[track_caller]
1063 pub fn new_unsync_local_with_value<F, Fu>(
1064 value: Option<O>,
1065 action_fn: F,
1066 ) -> Self
1067 where
1068 F: Fn(&I) -> Fu + 'static,
1069 Fu: Future<Output = O> + 'static,
1070 {
1071 Self {
1072 inner: ArenaItem::new_with_storage(
1073 ArcAction::new_unsync_with_value(value, action_fn),
1074 ),
1075 #[cfg(any(debug_assertions, leptos_debuginfo))]
1076 defined_at: Location::caller(),
1077 }
1078 }
1079}
1080
1081impl<I, O> DefinedAt for Action<I, O> {
1082 fn defined_at(&self) -> Option<&'static Location<'static>> {
1083 #[cfg(any(debug_assertions, leptos_debuginfo))]
1084 {
1085 Some(self.defined_at)
1086 }
1087 #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
1088 {
1089 None
1090 }
1091 }
1092}
1093
1094impl<I, O> Clone for Action<I, O> {
1095 fn clone(&self) -> Self {
1096 *self
1097 }
1098}
1099
1100impl<I, O> Copy for Action<I, O> {}
1101
1102/// Creates a new action. This is lazy: it does not run the action function until some value
1103/// is dispatched.
1104///
1105/// The constructor takes a function which will create a new `Future` from some input data.
1106/// When the action is dispatched, this `action_fn` will run, and the `Future` it returns will
1107/// be spawned.
1108///
1109/// The `action_fn` must be `Send + Sync` so that the `ArcAction` is `Send + Sync`. The
1110/// `Future` must be `Send` so that it can be moved across threads by the async executor as
1111/// needed. In order to be stored in the `Copy` arena, the input and output types should also
1112/// be `Send + Sync`.
1113///
1114/// ```rust
1115/// # use reactive_graph::actions::*;
1116/// # use reactive_graph::prelude::*;
1117/// # tokio_test::block_on(async move {
1118/// # any_spawner::Executor::init_tokio(); let owner = reactive_graph::owner::Owner::new(); owner.set();
1119/// # let _guard = reactive_graph::diagnostics::SpecialNonReactiveZone::enter();
1120/// let act = Action::new(|n: &u8| {
1121/// let n = n.to_owned();
1122/// async move { n * 2 }
1123/// });
1124///
1125/// act.dispatch(3);
1126/// assert_eq!(act.input().get(), Some(3));
1127///
1128/// // Remember that async functions already return a future if they are
1129/// // not `await`ed. You can save keystrokes by leaving out the `async move`
1130///
1131/// let act2 = Action::new(|n: &String| yell(n.to_owned()));
1132/// act2.dispatch(String::from("i'm in a doctest"));
1133/// # tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1134///
1135/// // after it resolves
1136/// assert_eq!(act2.value().get(), Some("I'M IN A DOCTEST".to_string()));
1137///
1138/// async fn yell(n: String) -> String {
1139/// n.to_uppercase()
1140/// }
1141/// # });
1142/// ```
1143#[inline(always)]
1144#[track_caller]
1145#[deprecated = "This function is being removed to conform to Rust idioms. \
1146 Please use `Action::new()` instead."]
1147pub fn create_action<I, O, F, Fu>(action_fn: F) -> Action<I, O>
1148where
1149 I: Send + Sync + 'static,
1150 O: Send + Sync + 'static,
1151 F: Fn(&I) -> Fu + Send + Sync + 'static,
1152 Fu: Future<Output = O> + Send + 'static,
1153{
1154 Action::new(action_fn)
1155}