1use super::{ArcAsyncDerived, AsyncDerivedReadyFuture, BlockingLock};
2use crate::{
3 graph::{
4 AnySource, AnySubscriber, ReactiveNode, Source, Subscriber,
5 ToAnySource, ToAnySubscriber,
6 },
7 owner::{ArenaItem, FromLocal, LocalStorage, Storage, SyncStorage},
8 send_wrapper_ext::SendOption,
9 signal::guards::{AsyncPlain, Mapped, MappedMut, ReadGuard, WriteGuard},
10 traits::{
11 DefinedAt, Dispose, IsDisposed, Notify, ReadUntracked,
12 UntrackableGuard, Write,
13 },
14 unwrap_signal,
15};
16use core::fmt::Debug;
17use or_poisoned::OrPoisoned;
18use std::{
19 future::Future,
20 mem,
21 ops::{Deref, DerefMut},
22 panic::Location,
23};
24
25pub struct AsyncDerived<T, S = SyncStorage> {
92 #[cfg(any(debug_assertions, leptos_debuginfo))]
93 defined_at: &'static Location<'static>,
94 pub(crate) inner: ArenaItem<ArcAsyncDerived<T>, S>,
95}
96
97impl<T, S> Dispose for AsyncDerived<T, S> {
98 fn dispose(self) {
99 self.inner.dispose()
100 }
101}
102
103impl<T, S> From<ArcAsyncDerived<T>> for AsyncDerived<T, S>
104where
105 T: 'static,
106 S: Storage<ArcAsyncDerived<T>>,
107{
108 fn from(value: ArcAsyncDerived<T>) -> Self {
109 #[cfg(any(debug_assertions, leptos_debuginfo))]
110 let defined_at = value.defined_at;
111 Self {
112 #[cfg(any(debug_assertions, leptos_debuginfo))]
113 defined_at,
114 inner: ArenaItem::new_with_storage(value),
115 }
116 }
117}
118
119impl<T, S> From<AsyncDerived<T, S>> for ArcAsyncDerived<T>
120where
121 T: 'static,
122 S: Storage<ArcAsyncDerived<T>>,
123{
124 #[track_caller]
125 fn from(value: AsyncDerived<T, S>) -> Self {
126 value
127 .inner
128 .try_get_value()
129 .unwrap_or_else(unwrap_signal!(value))
130 }
131}
132
133impl<T> FromLocal<ArcAsyncDerived<T>> for AsyncDerived<T, LocalStorage>
134where
135 T: 'static,
136{
137 fn from_local(value: ArcAsyncDerived<T>) -> Self {
138 #[cfg(any(debug_assertions, leptos_debuginfo))]
139 let defined_at = value.defined_at;
140 Self {
141 #[cfg(any(debug_assertions, leptos_debuginfo))]
142 defined_at,
143 inner: ArenaItem::new_with_storage(value),
144 }
145 }
146}
147
148impl<T> AsyncDerived<T>
149where
150 T: 'static,
151{
152 #[track_caller]
157 pub fn new<Fut>(fun: impl Fn() -> Fut + Send + Sync + 'static) -> Self
158 where
159 T: Send + Sync + 'static,
160 Fut: Future<Output = T> + Send + 'static,
161 {
162 Self {
163 #[cfg(any(debug_assertions, leptos_debuginfo))]
164 defined_at: Location::caller(),
165 inner: ArenaItem::new_with_storage(ArcAsyncDerived::new(fun)),
166 }
167 }
168
169 pub fn new_with_initial<Fut>(
173 initial_value: Option<T>,
174 fun: impl Fn() -> Fut + Send + Sync + 'static,
175 ) -> Self
176 where
177 T: Send + Sync + 'static,
178 Fut: Future<Output = T> + Send + 'static,
179 {
180 Self {
181 #[cfg(any(debug_assertions, leptos_debuginfo))]
182 defined_at: Location::caller(),
183 inner: ArenaItem::new_with_storage(
184 ArcAsyncDerived::new_with_initial(initial_value, fun),
185 ),
186 }
187 }
188}
189
190impl<T> AsyncDerived<T> {
191 #[doc(hidden)]
192 pub fn new_mock<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
193 where
194 T: 'static,
195 Fut: Future<Output = T> + 'static,
196 {
197 Self {
198 #[cfg(any(debug_assertions, leptos_debuginfo))]
199 defined_at: Location::caller(),
200 inner: ArenaItem::new_with_storage(ArcAsyncDerived::new_mock(fun)),
201 }
202 }
203
204 pub fn new_unsync_threadsafe_storage<Fut>(
207 fun: impl Fn() -> Fut + 'static,
208 ) -> Self
209 where
210 T: 'static,
211 Fut: Future<Output = T> + 'static,
212 {
213 Self {
214 #[cfg(any(debug_assertions, leptos_debuginfo))]
215 defined_at: Location::caller(),
216 inner: ArenaItem::new_with_storage(ArcAsyncDerived::new_unsync(
217 fun,
218 )),
219 }
220 }
221}
222
223impl<T> AsyncDerived<T, LocalStorage>
224where
225 T: 'static,
226{
227 pub fn new_unsync<Fut>(fun: impl Fn() -> Fut + 'static) -> Self
233 where
234 T: 'static,
235 Fut: Future<Output = T> + 'static,
236 {
237 Self {
238 #[cfg(any(debug_assertions, leptos_debuginfo))]
239 defined_at: Location::caller(),
240 inner: ArenaItem::new_with_storage(ArcAsyncDerived::new_unsync(
241 fun,
242 )),
243 }
244 }
245
246 pub fn new_unsync_with_initial<Fut>(
251 initial_value: Option<T>,
252 fun: impl Fn() -> Fut + 'static,
253 ) -> Self
254 where
255 T: 'static,
256 Fut: Future<Output = T> + 'static,
257 {
258 Self {
259 #[cfg(any(debug_assertions, leptos_debuginfo))]
260 defined_at: Location::caller(),
261 inner: ArenaItem::new_with_storage(
262 ArcAsyncDerived::new_unsync_with_initial(initial_value, fun),
263 ),
264 }
265 }
266}
267
268impl<T, S> AsyncDerived<T, S>
269where
270 T: 'static,
271 S: Storage<ArcAsyncDerived<T>>,
272{
273 #[track_caller]
275 pub fn ready(&self) -> AsyncDerivedReadyFuture {
276 let this = self
277 .inner
278 .try_get_value()
279 .unwrap_or_else(unwrap_signal!(self));
280 this.ready()
281 }
282}
283
284impl<T, S> Copy for AsyncDerived<T, S> {}
285
286impl<T, S> Clone for AsyncDerived<T, S> {
287 fn clone(&self) -> Self {
288 *self
289 }
290}
291
292impl<T, S> Debug for AsyncDerived<T, S>
293where
294 S: Debug,
295{
296 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297 f.debug_struct("AsyncDerived")
298 .field("type", &std::any::type_name::<T>())
299 .field("store", &self.inner)
300 .finish()
301 }
302}
303
304impl<T, S> DefinedAt for AsyncDerived<T, S> {
305 #[inline(always)]
306 fn defined_at(&self) -> Option<&'static Location<'static>> {
307 #[cfg(any(debug_assertions, leptos_debuginfo))]
308 {
309 Some(self.defined_at)
310 }
311 #[cfg(not(any(debug_assertions, leptos_debuginfo)))]
312 {
313 None
314 }
315 }
316}
317
318impl<T, S> ReadUntracked for AsyncDerived<T, S>
319where
320 T: 'static,
321 S: Storage<ArcAsyncDerived<T>>,
322{
323 type Value =
324 ReadGuard<Option<T>, Mapped<AsyncPlain<SendOption<T>>, Option<T>>>;
325
326 fn try_read_untracked(&self) -> Option<Self::Value> {
327 self.inner
328 .try_get_value()
329 .map(|inner| inner.read_untracked())
330 }
331}
332
333impl<T, S> Notify for AsyncDerived<T, S>
334where
335 T: 'static,
336 S: Storage<ArcAsyncDerived<T>>,
337{
338 fn notify(&self) {
339 self.inner.try_with_value(|inner| inner.notify());
340 }
341}
342
343impl<T, S> Write for AsyncDerived<T, S>
344where
345 T: 'static,
346 S: Storage<ArcAsyncDerived<T>>,
347{
348 type Value = Option<T>;
349
350 fn try_write(&self) -> Option<impl UntrackableGuard<Target = Self::Value>> {
351 let guard = self
352 .inner
353 .try_with_value(|n| n.value.blocking_write_arc())?;
354
355 self.inner.try_with_value(|n| {
356 let mut guard = n.inner.write().or_poisoned();
357 guard.version += 1;
360
361 drop(mem::take(&mut guard.pending_suspenses));
363 });
364
365 Some(MappedMut::new(
366 WriteGuard::new(*self, guard),
367 |v| v.deref(),
368 |v| v.deref_mut(),
369 ))
370 }
371
372 fn try_write_untracked(
373 &self,
374 ) -> Option<impl DerefMut<Target = Self::Value>> {
375 self.inner.try_with_value(|n| {
376 let mut guard = n.inner.write().or_poisoned();
377 guard.version += 1;
380
381 drop(mem::take(&mut guard.pending_suspenses));
383 });
384
385 self.inner
386 .try_with_value(|n| n.value.blocking_write_arc())
387 .map(|inner| {
388 MappedMut::new(inner, |v| v.deref(), |v| v.deref_mut())
389 })
390 }
391}
392
393impl<T, S> IsDisposed for AsyncDerived<T, S>
394where
395 T: 'static,
396 S: Storage<ArcAsyncDerived<T>>,
397{
398 fn is_disposed(&self) -> bool {
399 self.inner.is_disposed()
400 }
401}
402
403impl<T, S> ToAnySource for AsyncDerived<T, S>
404where
405 T: 'static,
406 S: Storage<ArcAsyncDerived<T>>,
407{
408 fn to_any_source(&self) -> AnySource {
409 self.inner
410 .try_get_value()
411 .map(|inner| inner.to_any_source())
412 .unwrap_or_else(unwrap_signal!(self))
413 }
414}
415
416impl<T, S> ToAnySubscriber for AsyncDerived<T, S>
417where
418 T: 'static,
419 S: Storage<ArcAsyncDerived<T>>,
420{
421 fn to_any_subscriber(&self) -> AnySubscriber {
422 self.inner
423 .try_get_value()
424 .map(|inner| inner.to_any_subscriber())
425 .unwrap_or_else(unwrap_signal!(self))
426 }
427}
428
429impl<T, S> Source for AsyncDerived<T, S>
430where
431 T: 'static,
432 S: Storage<ArcAsyncDerived<T>>,
433{
434 fn add_subscriber(&self, subscriber: AnySubscriber) {
435 if let Some(inner) = self.inner.try_get_value() {
436 inner.add_subscriber(subscriber);
437 }
438 }
439
440 fn remove_subscriber(&self, subscriber: &AnySubscriber) {
441 if let Some(inner) = self.inner.try_get_value() {
442 inner.remove_subscriber(subscriber);
443 }
444 }
445
446 fn clear_subscribers(&self) {
447 if let Some(inner) = self.inner.try_get_value() {
448 inner.clear_subscribers();
449 }
450 }
451}
452
453impl<T, S> ReactiveNode for AsyncDerived<T, S>
454where
455 T: 'static,
456 S: Storage<ArcAsyncDerived<T>>,
457{
458 fn mark_dirty(&self) {
459 if let Some(inner) = self.inner.try_get_value() {
460 inner.mark_dirty();
461 }
462 }
463
464 fn mark_check(&self) {
465 if let Some(inner) = self.inner.try_get_value() {
466 inner.mark_check();
467 }
468 }
469
470 fn mark_subscribers_check(&self) {
471 if let Some(inner) = self.inner.try_get_value() {
472 inner.mark_subscribers_check();
473 }
474 }
475
476 fn update_if_necessary(&self) -> bool {
477 if let Some(inner) = self.inner.try_get_value() {
478 inner.update_if_necessary()
479 } else {
480 false
481 }
482 }
483}
484
485impl<T, S> Subscriber for AsyncDerived<T, S>
486where
487 T: 'static,
488 S: Storage<ArcAsyncDerived<T>>,
489{
490 fn add_source(&self, source: AnySource) {
491 if let Some(inner) = self.inner.try_get_value() {
492 inner.add_source(source);
493 }
494 }
495
496 fn clear_sources(&self, subscriber: &AnySubscriber) {
497 if let Some(inner) = self.inner.try_get_value() {
498 inner.clear_sources(subscriber);
499 }
500 }
501}