reactive_graph/computed/async_derived/
mod.rs1mod arc_async_derived;
2pub use arc_async_derived::*;
3#[allow(clippy::module_inception)] mod async_derived;
5mod future_impls;
6mod inner;
7use crate::{
8 graph::{AnySubscriber, Observer, WithObserver},
9 owner::Owner,
10};
11pub use async_derived::*;
12pub use future_impls::*;
13use futures::Future;
14use pin_project_lite::pin_project;
15use std::{
16 pin::Pin,
17 task::{Context, Poll},
18};
19
20pin_project! {
21 #[derive(Clone)]
24 #[allow(missing_docs)]
25 pub struct ScopedFuture<Fut> {
26 pub owner: Owner,
27 pub observer: Option<AnySubscriber>,
28 #[pin]
29 pub fut: Fut,
30 }
31}
32
33impl<Fut> ScopedFuture<Fut> {
34 pub fn new(fut: Fut) -> Self {
37 let owner = Owner::current().unwrap_or_default();
38 let observer = Observer::get();
39 Self {
40 owner,
41 observer,
42 fut,
43 }
44 }
45
46 pub fn new_untracked(fut: Fut) -> Self {
50 let owner = Owner::current().unwrap_or_default();
51 Self {
52 owner,
53 observer: None,
54 fut,
55 }
56 }
57
58 #[doc(hidden)]
59 #[track_caller]
60 pub fn new_untracked_with_diagnostics(
61 fut: Fut,
62 ) -> ScopedFutureUntrackedWithDiagnostics<Fut> {
63 let owner = Owner::current().unwrap_or_default();
64 ScopedFutureUntrackedWithDiagnostics {
65 owner,
66 observer: None,
67 fut,
68 }
69 }
70}
71
72impl<Fut: Future> Future for ScopedFuture<Fut> {
73 type Output = Fut::Output;
74
75 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
76 let this = self.project();
77 this.owner.with(|| {
78 #[cfg(debug_assertions)]
79 let _maybe_guard = if this.observer.is_none() {
80 Some(crate::diagnostics::SpecialNonReactiveZone::enter())
81 } else {
82 None
83 };
84 this.observer.with_observer(|| this.fut.poll(cx))
85 })
86 }
87}
88
89pin_project! {
90 #[derive(Clone)]
95 pub struct ScopedFutureUntrackedWithDiagnostics<Fut> {
96 owner: Owner,
97 observer: Option<AnySubscriber>,
98 #[pin]
99 fut: Fut,
100 }
101}
102
103impl<Fut: Future> Future for ScopedFutureUntrackedWithDiagnostics<Fut> {
104 type Output = Fut::Output;
105
106 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107 let this = self.project();
108 this.owner
109 .with(|| this.observer.with_observer(|| this.fut.poll(cx)))
110 }
111}
112
113pub mod suspense {
115 use crate::{
116 signal::ArcRwSignal,
117 traits::{Update, Write},
118 };
119 use futures::channel::oneshot::Sender;
120 use or_poisoned::OrPoisoned;
121 use slotmap::{DefaultKey, SlotMap};
122 use std::sync::{Arc, Mutex};
123
124 #[derive(Clone, Debug)]
127 pub struct LocalResourceNotifier(Arc<Mutex<Option<Sender<()>>>>);
128
129 impl LocalResourceNotifier {
130 pub fn notify(&mut self) {
132 if let Some(tx) = self.0.lock().or_poisoned().take() {
133 tx.send(()).unwrap();
134 }
135 }
136 }
137
138 impl From<Sender<()>> for LocalResourceNotifier {
139 fn from(value: Sender<()>) -> Self {
140 Self(Arc::new(Mutex::new(Some(value))))
141 }
142 }
143
144 #[derive(Clone, Debug)]
146 pub struct SuspenseContext {
147 pub tasks: ArcRwSignal<SlotMap<DefaultKey, ()>>,
149 }
150
151 impl SuspenseContext {
152 pub fn task_id(&self) -> TaskHandle {
154 let key = self.tasks.write().insert(());
155 TaskHandle {
156 tasks: self.tasks.clone(),
157 key,
158 }
159 }
160 }
161
162 #[derive(Debug)]
164 pub struct TaskHandle {
165 tasks: ArcRwSignal<SlotMap<DefaultKey, ()>>,
166 key: DefaultKey,
167 }
168
169 impl Drop for TaskHandle {
170 fn drop(&mut self) {
171 self.tasks.update(|tasks| {
172 tasks.remove(self.key);
173 });
174 }
175 }
176}