reactive_graph/computed/async_derived/
async_derived.rs1use super::{ArcAsyncDerived, AsyncDerivedReadyFuture, BlockingLock};
2use crate::{
3 graph::{
4 AnySource, AnySubscriber, ReactiveNode, Source, Subscriber,
5 ToAnySource, ToAnySubscriber,
6 },
7 owner::{ArenaItem, FromLocal, LocalStorage, Storage, SyncStorage},
8 signal::guards::{AsyncPlain, ReadGuard, WriteGuard},
9 traits::{
10 DefinedAt, Dispose, IsDisposed, Notify, ReadUntracked,
11 UntrackableGuard, Write,
12 },
13 unwrap_signal,
14};
15use core::fmt::Debug;
16use send_wrapper::SendWrapper;
17use std::{future::Future, ops::DerefMut, panic::Location};
18
19pub struct AsyncDerived<T, S = SyncStorage> {
86 #[cfg(any(debug_assertions, leptos_debuginfo))]
87 defined_at: &'static Location<'static>,
88 pub(crate) inner: ArenaItem<ArcAsyncDerived<T>, S>,
89}
90
91impl<T, S> Dispose for AsyncDerived<T, S> {
92 fn dispose(self) {
93 self.inner.dispose()
94 }
95}
96
97impl<T> From<ArcAsyncDerived<T>> for AsyncDerived<T>
98where
99 T: Send + Sync + 'static,
100{
101 fn from(value: ArcAsyncDerived<T>) -> Self {
102 #[cfg(any(debug_assertions, leptos_debuginfo))]
103 let defined_at = value.defined_at;
104 Self {
105 #[cfg(any(debug_assertions, leptos_debuginfo))]
106 defined_at,
107 inner: ArenaItem::new_with_storage(value),
108 }
109 }
110}
111
112impl<T> From<AsyncDerived<T>> for ArcAsyncDerived<T>
113where
114 T: Send + Sync + 'static,
115{
116 #[track_caller]
117 fn from(value: AsyncDerived<T>) -> Self {
118 value
119 .inner
120 .try_get_value()
121 .unwrap_or_else(unwrap_signal!(value))
122 }
123}
124
125impl<T> FromLocal<ArcAsyncDerived<T>> for AsyncDerived<T, LocalStorage>
126where
127 T: 'static,
128{
129 fn from_local(value: ArcAsyncDerived<T>) -> Self {
130 #[cfg(any(debug_assertions, leptos_debuginfo))]
131 let defined_at = value.defined_at;
132 Self {
133 #[cfg(any(debug_assertions, leptos_debuginfo))]
134 defined_at,
135 inner: ArenaItem::new_with_storage(value),
136 }
137 }
138}
139
140impl<T> AsyncDerived<T>
141where
142 T: 'static,
143{
144 #[track_caller]
149 pub fn new<Fut>(fun: impl Fn() -> Fut + Send + Sync + 'static) -> Self
150 where
151 T: Send + Sync + 'static,
152 Fut: Future<Output = T> + Send + 'static,
153 {
154 Self {
155 #[cfg(any(debug_assertions, leptos_debuginfo))]
156 defined_at: Location::caller(),
157 inner: ArenaItem::new_with_storage(ArcAsyncDerived::new(fun)),
158 }
159 }
160
161 pub fn new_with_initial<Fut>(
165 initial_value: Option<T>,
166 fun: impl Fn() -> Fut + Send + Sync + 'static,
167 ) -> Self
168 where
169 T: Send + Sync + 'static,
170 Fut: Future<Output = T> + Send + 'static,
171 {
172 Self {
173 #[cfg(any(debug_assertions, leptos_debuginfo))]
174 defined_at: Location::caller(),
175 inner: ArenaItem::new_with_storage(
176 ArcAsyncDerived::new_with_initial(initial_value, fun),
177 ),
178 }
179 }
180}
181
182impl<T> AsyncDerived<SendWrapper<T>> {
183 #[doc(hidden)]
184 pub fn new_mock<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
185 where
186 T: 'static,
187 Fut: Future<Output = T> + 'static,
188 {
189 Self {
190 #[cfg(any(debug_assertions, leptos_debuginfo))]
191 defined_at: Location::caller(),
192 inner: ArenaItem::new_with_storage(ArcAsyncDerived::new_mock(fun)),
193 }
194 }
195}
196
197impl<T> AsyncDerived<T, LocalStorage>
198where
199 T: 'static,
200{
201 pub fn new_unsync<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
207 where
208 T: 'static,
209 Fut: Future<Output = T> + 'static,
210 {
211 Self {
212 #[cfg(any(debug_assertions, leptos_debuginfo))]
213 defined_at: Location::caller(),
214 inner: ArenaItem::new_with_storage(ArcAsyncDerived::new_unsync(
215 fun,
216 )),
217 }
218 }
219
220 pub fn new_unsync_with_initial<Fut>(
225 initial_value: Option<T>,
226 fun: impl Fn() -> Fut + 'static,
227 ) -> Self
228 where
229 T: 'static,
230 Fut: Future<Output = T> + 'static,
231 {
232 Self {
233 #[cfg(any(debug_assertions, leptos_debuginfo))]
234 defined_at: Location::caller(),
235 inner: ArenaItem::new_with_storage(
236 ArcAsyncDerived::new_unsync_with_initial(initial_value, fun),
237 ),
238 }
239 }
240}
241
242impl<T, S> AsyncDerived<T, S>
243where
244 T: 'static,
245 S: Storage<ArcAsyncDerived<T>>,
246{
247 #[track_caller]
249 pub fn ready(&self) -> AsyncDerivedReadyFuture {
250 let this = self
251 .inner
252 .try_get_value()
253 .unwrap_or_else(unwrap_signal!(self));
254 this.ready()
255 }
256}
257
258impl<T, S> Copy for AsyncDerived<T, S> {}
259
260impl<T, S> Clone for AsyncDerived<T, S> {
261 fn clone(&self) -> Self {
262 *self
263 }
264}
265
266impl<T, S> Debug for AsyncDerived<T, S>
267where
268 S: Debug,
269{
270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 f.debug_struct("AsyncDerived")
272 .field("type", &std::any::type_name::<T>())
273 .field("store", &self.inner)
274 .finish()
275 }
276}
277
278impl<T, S> DefinedAt for AsyncDerived<T, S> {
279 #[inline(always)]
280 fn defined_at(&self) -> Option<&'static Location<'static>> {
281 #[cfg(any(debug_assertions, leptos_debuginfo))]
282 {
283 Some(self.defined_at)
284 }
285 #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
286 {
287 None
288 }
289 }
290}
291
292impl<T, S> ReadUntracked for AsyncDerived<T, S>
293where
294 T: 'static,
295 S: Storage<ArcAsyncDerived<T>>,
296{
297 type Value = ReadGuard<Option<T>, AsyncPlain<Option<T>>>;
298
299 fn try_read_untracked(&self) -> Option<Self::Value> {
300 self.inner
301 .try_get_value()
302 .map(|inner| inner.read_untracked())
303 }
304}
305
306impl<T, S> Notify for AsyncDerived<T, S>
307where
308 T: 'static,
309 S: Storage<ArcAsyncDerived<T>>,
310{
311 fn notify(&self) {
312 self.inner.try_with_value(|inner| inner.notify());
313 }
314}
315
316impl<T, S> Write for AsyncDerived<T, S>
317where
318 T: 'static,
319 S: Storage<ArcAsyncDerived<T>>,
320{
321 type Value = Option<T>;
322
323 fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
324 let guard = self
325 .inner
326 .try_with_value(|n| n.value.blocking_write_arc())?;
327 Some(WriteGuard::new(*self, guard))
328 }
329
330 fn try_write_untracked(
331 &self,
332 ) -> Option<impl DerefMut<Target = Self::Value>> {
333 self.inner.try_with_value(|n| n.value.blocking_write_arc())
334 }
335}
336
337impl<T, S> IsDisposed for AsyncDerived<T, S>
338where
339 T: 'static,
340 S: Storage<ArcAsyncDerived<T>>,
341{
342 fn is_disposed(&self) -> bool {
343 self.inner.is_disposed()
344 }
345}
346
347impl<T, S> ToAnySource for AsyncDerived<T, S>
348where
349 T: 'static,
350 S: Storage<ArcAsyncDerived<T>>,
351{
352 fn to_any_source(&self) -> AnySource {
353 self.inner
354 .try_get_value()
355 .map(|inner| inner.to_any_source())
356 .unwrap_or_else(unwrap_signal!(self))
357 }
358}
359
360impl<T, S> ToAnySubscriber for AsyncDerived<T, S>
361where
362 T: 'static,
363 S: Storage<ArcAsyncDerived<T>>,
364{
365 fn to_any_subscriber(&self) -> AnySubscriber {
366 self.inner
367 .try_get_value()
368 .map(|inner| inner.to_any_subscriber())
369 .unwrap_or_else(unwrap_signal!(self))
370 }
371}
372
373impl<T, S> Source for AsyncDerived<T, S>
374where
375 T: 'static,
376 S: Storage<ArcAsyncDerived<T>>,
377{
378 fn add_subscriber(&self, subscriber: AnySubscriber) {
379 if let Some(inner) = self.inner.try_get_value() {
380 inner.add_subscriber(subscriber);
381 }
382 }
383
384 fn remove_subscriber(&self, subscriber: &AnySubscriber) {
385 if let Some(inner) = self.inner.try_get_value() {
386 inner.remove_subscriber(subscriber);
387 }
388 }
389
390 fn clear_subscribers(&self) {
391 if let Some(inner) = self.inner.try_get_value() {
392 inner.clear_subscribers();
393 }
394 }
395}
396
397impl<T, S> ReactiveNode for AsyncDerived<T, S>
398where
399 T: 'static,
400 S: Storage<ArcAsyncDerived<T>>,
401{
402 fn mark_dirty(&self) {
403 if let Some(inner) = self.inner.try_get_value() {
404 inner.mark_dirty();
405 }
406 }
407
408 fn mark_check(&self) {
409 if let Some(inner) = self.inner.try_get_value() {
410 inner.mark_check();
411 }
412 }
413
414 fn mark_subscribers_check(&self) {
415 if let Some(inner) = self.inner.try_get_value() {
416 inner.mark_subscribers_check();
417 }
418 }
419
420 fn update_if_necessary(&self) -> bool {
421 if let Some(inner) = self.inner.try_get_value() {
422 inner.update_if_necessary()
423 } else {
424 false
425 }
426 }
427}
428
429impl<T, S> Subscriber for AsyncDerived<T, S>
430where
431 T: 'static,
432 S: Storage<ArcAsyncDerived<T>>,
433{
434 fn add_source(&self, source: AnySource) {
435 if let Some(inner) = self.inner.try_get_value() {
436 inner.add_source(source);
437 }
438 }
439
440 fn clear_sources(&self, subscriber: &AnySubscriber) {
441 if let Some(inner) = self.inner.try_get_value() {
442 inner.clear_sources(subscriber);
443 }
444 }
445}