reactive_graph/computed/async_derived/
mod.rs

1mod arc_async_derived;
2pub use arc_async_derived::*;
3#[allow(clippy::module_inception)] // not a pub mod, who cares?
4mod async_derived;
5mod future_impls;
6mod inner;
7use crate::{
8    graph::{AnySubscriber, Observer, WithObserver},
9    owner::Owner,
10};
11pub use async_derived::*;
12pub use future_impls::*;
13use futures::Future;
14use pin_project_lite::pin_project;
15use std::{
16    pin::Pin,
17    task::{Context, Poll},
18};
19
20pin_project! {
21    /// A [`Future`] wrapper that sets the [`Owner`] and [`Observer`] before polling the inner
22    /// `Future`.
23    #[derive(Clone)]
24    #[allow(missing_docs)]
25    pub struct ScopedFuture<Fut> {
26        pub owner: Owner,
27        pub observer: Option<AnySubscriber>,
28        #[pin]
29        pub fut: Fut,
30    }
31}
32
33impl<Fut> ScopedFuture<Fut> {
34    /// Wraps the given `Future` by taking the current [`Owner`] and [`Observer`] and re-setting
35    /// them as the active owner and observer every time the inner `Future` is polled.
36    pub fn new(fut: Fut) -> Self {
37        let owner = Owner::current().unwrap_or_default();
38        let observer = Observer::get();
39        Self {
40            owner,
41            observer,
42            fut,
43        }
44    }
45
46    /// Wraps the given `Future` by taking the current [`Owner`] re-setting it as the
47    /// active owner every time the inner `Future` is polled. Always untracks, i.e., clears
48    /// the active [`Observer`] when polled.
49    pub fn new_untracked(fut: Fut) -> Self {
50        let owner = Owner::current().unwrap_or_default();
51        Self {
52            owner,
53            observer: None,
54            fut,
55        }
56    }
57
58    #[doc(hidden)]
59    #[track_caller]
60    pub fn new_untracked_with_diagnostics(
61        fut: Fut,
62    ) -> ScopedFutureUntrackedWithDiagnostics<Fut> {
63        let owner = Owner::current().unwrap_or_default();
64        ScopedFutureUntrackedWithDiagnostics {
65            owner,
66            observer: None,
67            fut,
68        }
69    }
70}
71
72impl<Fut: Future> Future for ScopedFuture<Fut> {
73    type Output = Fut::Output;
74
75    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
76        let this = self.project();
77        this.owner.with(|| {
78            #[cfg(debug_assertions)]
79            let _maybe_guard = if this.observer.is_none() {
80                Some(crate::diagnostics::SpecialNonReactiveZone::enter())
81            } else {
82                None
83            };
84            this.observer.with_observer(|| this.fut.poll(cx))
85        })
86    }
87}
88
89pin_project! {
90    /// A [`Future`] wrapper that sets the [`Owner`] and [`Observer`] before polling the inner
91    /// `Future`, output of [`ScopedFuture::new_untracked_with_diagnostics`].
92    ///
93    /// In leptos 0.9 this will be replaced with `ScopedFuture` itself.
94    #[derive(Clone)]
95    pub struct ScopedFutureUntrackedWithDiagnostics<Fut> {
96        owner: Owner,
97        observer: Option<AnySubscriber>,
98        #[pin]
99        fut: Fut,
100    }
101}
102
103impl<Fut: Future> Future for ScopedFutureUntrackedWithDiagnostics<Fut> {
104    type Output = Fut::Output;
105
106    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107        let this = self.project();
108        this.owner
109            .with(|| this.observer.with_observer(|| this.fut.poll(cx)))
110    }
111}
112
113/// Utilities used to track whether asynchronous computeds are currently loading.
114pub mod suspense {
115    use crate::{
116        signal::ArcRwSignal,
117        traits::{Update, Write},
118    };
119    use futures::channel::oneshot::Sender;
120    use or_poisoned::OrPoisoned;
121    use slotmap::{DefaultKey, SlotMap};
122    use std::sync::{Arc, Mutex};
123
124    /// Sends a one-time notification that the resource being read from is "local only," i.e.,
125    /// that it will only run on the client, not the server.
126    #[derive(Clone, Debug)]
127    pub struct LocalResourceNotifier(Arc<Mutex<Option<Sender<()>>>>);
128
129    impl LocalResourceNotifier {
130        /// Send the notification. If the inner channel has already been used, this does nothing.
131        pub fn notify(&mut self) {
132            if let Some(tx) = self.0.lock().or_poisoned().take() {
133                tx.send(()).unwrap();
134            }
135        }
136    }
137
138    impl From<Sender<()>> for LocalResourceNotifier {
139        fn from(value: Sender<()>) -> Self {
140            Self(Arc::new(Mutex::new(Some(value))))
141        }
142    }
143
144    /// Tracks the collection of active async tasks.
145    #[derive(Clone, Debug)]
146    pub struct SuspenseContext {
147        /// The set of active tasks.
148        pub tasks: ArcRwSignal<SlotMap<DefaultKey, ()>>,
149    }
150
151    impl SuspenseContext {
152        /// Generates a unique task ID.
153        pub fn task_id(&self) -> TaskHandle {
154            let key = self.tasks.write().insert(());
155            TaskHandle {
156                tasks: self.tasks.clone(),
157                key,
158            }
159        }
160    }
161
162    /// A unique identifier that removes itself from the set of tasks when it is dropped.
163    #[derive(Debug)]
164    pub struct TaskHandle {
165        tasks: ArcRwSignal<SlotMap<DefaultKey, ()>>,
166        key: DefaultKey,
167    }
168
169    impl Drop for TaskHandle {
170        fn drop(&mut self) {
171            self.tasks.update(|tasks| {
172                tasks.remove(self.key);
173            });
174        }
175    }
176}