1use crate::error::FetchError;
10use crate::executor::BoxFut;
11use crate::punnu::delta::DeltaResult;
12use crate::punnu::events::{EventReason, PunnuEvent};
13use crate::punnu::pool::Punnu;
14use crate::punnu::recovery::{RecoverySet, RecoverySnapshot};
15use crate::punnu::single_flight::{FetchErrorClone, into_clone};
16use crate::watermark::DeltaSyncCacheable;
17#[cfg(not(target_arch = "wasm32"))]
18use async_trait::async_trait;
19use futures::FutureExt;
20#[cfg(not(target_arch = "wasm32"))]
21use futures::future::BoxFuture;
22#[cfg(target_arch = "wasm32")]
23use futures::future::LocalBoxFuture;
24use futures::future::Shared;
25use std::collections::HashSet;
26use std::fmt;
27use std::future::Future;
28use std::num::NonZeroUsize;
29use std::panic::AssertUnwindSafe;
30use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
31use std::sync::{Arc, Mutex};
32use std::time::Duration;
33use tokio::sync::broadcast;
34use tokio::sync::watch;
35
/// User-supplied fetcher that retrieves incremental (delta) updates for a
/// pool of `T` (native build: implementations must be `Send + Sync`).
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
pub trait DeltaPunnuFetcher<T: DeltaSyncCacheable>: Send + Sync + 'static {
    /// Fetches changes since `query.since` (`None` means "everything" — a
    /// full refresh), plus fresh copies of any ids in `query.recover_ids`.
    async fn fetch_delta(
        &self,
        query: DeltaQuery<T>,
    ) -> Result<DeltaResult<T, T::Watermark>, FetchError>;
}
70
/// Wasm variant of [`DeltaPunnuFetcher`]: single-threaded targets drop the
/// `Send + Sync` bounds and use a non-`Send` `async_trait` expansion.
#[cfg(target_arch = "wasm32")]
#[async_trait::async_trait(?Send)]
pub trait DeltaPunnuFetcher<T: DeltaSyncCacheable>: 'static {
    /// Fetches changes since `query.since` (`None` means a full refresh),
    /// plus fresh copies of any ids in `query.recover_ids`.
    async fn fetch_delta(
        &self,
        query: DeltaQuery<T>,
    ) -> Result<DeltaResult<T, T::Watermark>, FetchError>;
}
104
/// Query handed to [`DeltaPunnuFetcher::fetch_delta`] on every refresh tick.
#[non_exhaustive]
pub struct DeltaQuery<T: DeltaSyncCacheable> {
    /// Watermark of the last applied result; `None` requests a full refresh
    /// (set to `None` for `TickKind::Full` ticks).
    pub since: Option<T::Watermark>,
    /// Ids the subscription wants re-fetched regardless of the watermark
    /// (populated from the eviction-recovery queue on delta ticks).
    pub recover_ids: HashSet<T::Id>,
}
119
/// Outcome of a single delta-refresh tick.
pub struct UpdateResult<T: DeltaSyncCacheable> {
    /// Number of items the tick applied to the pool.
    pub applied: usize,
    /// Watermark stored after the tick, if any has been observed so far.
    pub watermark: Option<T::Watermark>,
}
127
128impl<T: DeltaSyncCacheable> Clone for UpdateResult<T> {
129 fn clone(&self) -> Self {
130 Self {
131 applied: self.applied,
132 watermark: self.watermark.clone(),
133 }
134 }
135}
136
137impl<T> fmt::Debug for UpdateResult<T>
138where
139 T: DeltaSyncCacheable,
140 T::Watermark: fmt::Debug,
141{
142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143 f.debug_struct("UpdateResult")
144 .field("applied", &self.applied)
145 .field("watermark", &self.watermark)
146 .finish()
147 }
148}
149
150impl<T: DeltaSyncCacheable> PartialEq for UpdateResult<T> {
151 fn eq(&self, other: &Self) -> bool {
152 self.applied == other.applied && self.watermark == other.watermark
153 }
154}
155
// Marker impl: promotes the `PartialEq` above to a total equivalence
// (assumes `T::Watermark` equality is reflexive — no NaN-like values).
impl<T: DeltaSyncCacheable> Eq for UpdateResult<T> {}
157
/// Owning handle for a background delta-refresh subscription.
///
/// Dropping the handle broadcasts cancellation to the background tasks
/// (see the `Drop` impl below).
pub struct DeltaRefreshHandle<T: DeltaSyncCacheable> {
    // Shared with the periodic-refresh task and the eviction-event listener.
    pub(crate) inner: Arc<RefreshSubscription<T>>,
}
162
impl<T: DeltaSyncCacheable> DeltaRefreshHandle<T> {
    /// Runs (or joins an already in-flight) delta refresh tick and waits for
    /// its result.
    ///
    /// # Panics
    /// Panics when called without an active Tokio runtime on builds where
    /// the `runtime-tokio` feature enables that check.
    pub async fn update(&self) -> Result<UpdateResult<T>, FetchError> {
        assert_active_tokio_runtime("DeltaRefreshHandle::update");
        RefreshSubscription::update(self.inner.clone()).await
    }

    /// Forces a full refresh tick (chained after any in-flight delta tick)
    /// and waits for its result.
    ///
    /// # Panics
    /// Same runtime requirement as [`Self::update`].
    pub async fn update_full(&self) -> Result<UpdateResult<T>, FetchError> {
        assert_active_tokio_runtime("DeltaRefreshHandle::update_full");
        RefreshSubscription::update_full(self.inner.clone()).await
    }

    /// Requests cancellation of the background tasks. Best-effort: an error
    /// from `send` only means every listener has already exited.
    pub fn cancel(&self) {
        let _ = self.inner.cancel.send(true);
    }

    /// Enables or disables eviction recovery. Disabling also clears any
    /// queued recovery ids and marks every pending forced-full request as
    /// satisfied, so no stale promotion to a full refresh survives.
    pub fn with_eviction_recovery(self, enabled: bool) -> Self {
        self.inner
            .eviction_recovery_enabled
            .store(enabled, Ordering::Release);
        if !enabled {
            self.inner
                .recovery
                .lock()
                .expect("delta refresh recovery lock poisoned")
                .clear();
            // Catch the satisfied generation up to the current one so
            // `delta_should_promote_to_full` stops returning true.
            self.inner.satisfied_force_full_generation.store(
                self.inner.force_full_generation.load(Ordering::Acquire),
                Ordering::Release,
            );
        }
        self
    }

    /// Configures every Nth scheduled tick to be a full refresh
    /// (`None` disables; stored as 0). Resets the progress counter.
    pub fn with_periodic_full_refresh(self, every_n_ticks: Option<NonZeroUsize>) -> Self {
        self.inner.periodic_full_every.store(
            every_n_ticks.map(NonZeroUsize::get).unwrap_or(0),
            Ordering::Release,
        );
        self.inner
            .periodic_full_progress
            .store(0, Ordering::Release);
        self
    }

    /// Number of ids currently queued for eviction recovery; always 0 while
    /// recovery is disabled.
    pub fn pending_eviction_recovery_count(&self) -> usize {
        if !self.inner.eviction_recovery_enabled.load(Ordering::Acquire) {
            return 0;
        }
        self.inner
            .recovery
            .lock()
            .expect("delta refresh recovery lock poisoned")
            .len()
    }

    /// Progress toward the next periodic full refresh as
    /// `(ticks_done, every)`; `None` when periodic full refresh is disabled.
    pub fn periodic_full_refresh_progress(&self) -> Option<(usize, NonZeroUsize)> {
        let every = NonZeroUsize::new(self.inner.periodic_full_every.load(Ordering::Acquire))?;
        // Clamp: the two atomics are read independently, so progress could
        // momentarily exceed `every - 1` during reconfiguration.
        let progress = self
            .inner
            .periodic_full_progress
            .load(Ordering::Acquire)
            .min(every.get().saturating_sub(1));
        Some((progress, every))
    }

    /// Most recently stored watermark, if any tick has produced one.
    pub fn watermark(&self) -> Option<T::Watermark> {
        self.inner
            .last_watermark
            .lock()
            .expect("delta refresh watermark lock poisoned")
            .clone()
    }
}
276
277impl<T: DeltaSyncCacheable> Drop for DeltaRefreshHandle<T> {
278 fn drop(&mut self) {
279 let _ = self.inner.cancel.send(true);
280 }
281}
282
/// Shared state behind a [`DeltaRefreshHandle`]: the pool, the fetcher, and
/// all bookkeeping for watermarks, membership, eviction recovery, and the
/// single-flight in-flight slot.
pub(crate) struct RefreshSubscription<T: DeltaSyncCacheable> {
    punnu: Punnu<T>,
    fetcher: Arc<dyn DeltaPunnuFetcher<T>>,
    // Highest watermark applied so far; `since` for the next delta tick.
    last_watermark: Mutex<Option<T::Watermark>>,
    // Ids this subscription believes are currently in the pool; used to
    // attribute LRU evictions to this subscription.
    membership: Mutex<HashSet<T::Id>>,
    // Evicted ids awaiting re-fetch.
    recovery: Mutex<RecoverySet<T::Id>>,
    eviction_recovery_enabled: AtomicBool,
    // Incremented on event-stream lag; a tick promotes to full until
    // `satisfied_force_full_generation` catches up.
    force_full_generation: AtomicU64,
    satisfied_force_full_generation: AtomicU64,
    // One-shot flag so the LRU-eviction warning is logged only once.
    lru_warning_issued: AtomicBool,
    // Periodic full refresh: every N scheduled ticks (0 = disabled) and the
    // count of delta ticks since the last full one.
    periodic_full_every: AtomicUsize,
    periodic_full_progress: AtomicUsize,
    // Monotonic tick counter fed to the recovery set's eligibility logic.
    recovery_tick: AtomicU64,
    // Single-flight slot: at most one delta and one chained full in flight.
    slot: Mutex<InFlightSlot<T>>,
    next_operation_id: AtomicU64,
    // Broadcasts cancellation to both background tasks.
    cancel: watch::Sender<bool>,
}
300
/// State of the single-flight update slot.
enum InFlightSlot<T: DeltaSyncCacheable> {
    /// No tick running.
    Empty,
    /// A delta tick is running; a full tick may be queued to run after it.
    Delta {
        operation_id: u64,
        shared: SharedUpdate<T>,
        pending_full: Option<PendingFull<T>>,
    },
    /// A full tick is running.
    Full {
        operation_id: u64,
        shared: SharedUpdate<T>,
    },
}
313
/// A full tick chained to start once the current delta tick finishes.
struct PendingFull<T: DeltaSyncCacheable> {
    operation_id: u64,
    shared: SharedUpdate<T>,
}
318
/// Kind of refresh tick: incremental (`since` = last watermark) or full
/// (`since` = `None`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TickKind {
    Delta,
    Full,
}
324
/// Membership changes derived from one fetched delta, applied to the
/// subscription's membership set after the pool write succeeds.
struct MembershipUpdate<Id> {
    /// True for full ticks: membership is replaced rather than merged.
    full_refresh: bool,
    /// Ids present in the result (tombstoned ids excluded).
    item_ids: HashSet<Id>,
    /// Ids deleted upstream.
    tombstones: HashSet<Id>,
    /// Ids that were asked for via recovery on this tick.
    recovered_ids: HashSet<Id>,
}
331
/// What `update_full` must await, decided while holding the slot lock.
enum UpdateFullAction<T: DeltaSyncCacheable> {
    /// A full tick is (now) in flight; await it directly.
    AwaitFull(SharedUpdate<T>),
    /// A delta tick is running; await it, then the chained full tick.
    AwaitDeltaThenFull {
        delta: SharedUpdate<T>,
        full: SharedUpdate<T>,
    },
}
339
/// Output memoized by a shared in-flight update future; the error must be
/// `Clone` so every awaiter can receive it.
type SharedOutput<T> = Result<UpdateResult<T>, FetchErrorClone>;

// Boxed update future: `Send` on native targets, thread-local on wasm.
#[cfg(not(target_arch = "wasm32"))]
type UpdateFuture<T> = BoxFuture<'static, SharedOutput<T>>;
#[cfg(target_arch = "wasm32")]
type UpdateFuture<T> = LocalBoxFuture<'static, SharedOutput<T>>;

/// Clonable multi-waiter wrapper so several callers can await one tick.
type SharedUpdate<T> = Shared<UpdateFuture<T>>;
348
349impl<Id> MembershipUpdate<Id>
350where
351 Id: Eq + std::hash::Hash + Clone,
352{
353 fn from_delta<T>(
354 kind: TickKind,
355 delta: &DeltaResult<T, T::Watermark>,
356 recovery_snapshot: &RecoverySnapshot<T::Id>,
357 ) -> MembershipUpdate<T::Id>
358 where
359 T: DeltaSyncCacheable<Id = Id>,
360 {
361 let tombstones = delta.tombstones.clone();
362 let item_ids = delta
363 .items
364 .iter()
365 .map(|item| item.id())
366 .filter(|id| !tombstones.contains(id))
367 .collect();
368 MembershipUpdate {
369 full_refresh: matches!(kind, TickKind::Full),
370 item_ids,
371 tombstones,
372 recovered_ids: recovery_snapshot.ids(),
373 }
374 }
375}
376
impl<T: DeltaSyncCacheable> RefreshSubscription<T> {
    /// Builds the subscription and spawns two background tasks on the pool's
    /// executor: the eviction-event listener and the periodic refresh loop.
    ///
    /// # Panics
    /// Panics if `interval` is zero.
    pub(crate) fn spawn<F>(punnu: Punnu<T>, interval: Duration, fetcher: F) -> DeltaRefreshHandle<T>
    where
        F: DeltaPunnuFetcher<T>,
    {
        assert!(
            !interval.is_zero(),
            "delta refresh interval must be non-zero"
        );

        let (cancel_tx, cancel_rx) = watch::channel(false);
        let subscription = Arc::new(Self {
            // `recovery` is initialized first so `punnu` can still be
            // borrowed here before being moved into the struct below.
            recovery: Mutex::new(RecoverySet::new(punnu.config().lru_size)),
            punnu,
            fetcher: Arc::new(fetcher),
            last_watermark: Mutex::new(None),
            membership: Mutex::new(HashSet::new()),
            eviction_recovery_enabled: AtomicBool::new(false),
            force_full_generation: AtomicU64::new(0),
            satisfied_force_full_generation: AtomicU64::new(0),
            lru_warning_issued: AtomicBool::new(false),
            periodic_full_every: AtomicUsize::new(0),
            periodic_full_progress: AtomicUsize::new(0),
            recovery_tick: AtomicU64::new(1),
            slot: Mutex::new(InFlightSlot::Empty),
            next_operation_id: AtomicU64::new(1),
            cancel: cancel_tx,
        });

        let loop_subscription = subscription.clone();
        let executor = subscription.punnu.executor();
        let events_subscription = subscription.clone();
        let events = subscription.punnu.events();
        let event_cancel_rx = cancel_rx.clone();
        executor.spawn(box_spawn_future(async move {
            run_delta_recovery_event_listener(events_subscription, events, event_cancel_rx).await;
        }));
        executor.spawn(box_spawn_future(async move {
            run_periodic_delta_refresh(loop_subscription, interval, cancel_rx).await;
        }));

        DeltaRefreshHandle {
            inner: subscription,
        }
    }

    /// Runs (or joins) a tick and returns just its result.
    async fn update(subscription: Arc<Self>) -> Result<UpdateResult<T>, FetchError> {
        let (_, result) = Self::update_with_kind(subscription).await?;
        Ok(result)
    }

    /// Like `update`, but also reports which kind of tick actually ran
    /// (a delta tick may be promoted to a full refresh).
    async fn update_with_kind(
        subscription: Arc<Self>,
    ) -> Result<(TickKind, UpdateResult<T>), FetchError> {
        if subscription.delta_should_promote_to_full() {
            return Self::update_full(subscription)
                .await
                .map(|result| (TickKind::Full, result));
        }
        let (kind, shared) = subscription.shared_for_delta();
        shared
            .await
            .map(|result| (kind, result))
            .map_err(FetchError::from)
    }

    /// Ensures a full tick is in flight and awaits it. If a delta tick is
    /// currently running, the full tick is chained to start after it; the
    /// delta's own outcome is awaited but ignored.
    async fn update_full(subscription: Arc<Self>) -> Result<UpdateResult<T>, FetchError> {
        // Decide what to await while holding the slot lock, then drop the
        // lock before awaiting anything.
        let action = {
            let mut slot = subscription
                .slot
                .lock()
                .expect("delta refresh in-flight slot lock poisoned");
            match &mut *slot {
                InFlightSlot::Empty => {
                    let operation_id = subscription.next_operation_id();
                    let shared = subscription.build_and_spawn_tick(TickKind::Full, operation_id);
                    *slot = InFlightSlot::Full {
                        operation_id,
                        shared: shared.clone(),
                    };
                    UpdateFullAction::AwaitFull(shared)
                }
                InFlightSlot::Full { shared, .. } => {
                    let shared = shared.clone();
                    UpdateFullAction::AwaitFull(shared)
                }
                InFlightSlot::Delta {
                    shared,
                    pending_full,
                    ..
                } => {
                    let delta = shared.clone();
                    // Reuse an already-chained full tick, or chain one now.
                    let full = if let Some(pending) = pending_full {
                        pending.shared.clone()
                    } else {
                        let operation_id = subscription.next_operation_id();
                        let full =
                            subscription.build_and_spawn_chained_full(delta.clone(), operation_id);
                        *pending_full = Some(PendingFull {
                            operation_id,
                            shared: full.clone(),
                        });
                        full
                    };
                    UpdateFullAction::AwaitDeltaThenFull { delta, full }
                }
            }
        };

        match action {
            UpdateFullAction::AwaitFull(shared) => shared.await.map_err(FetchError::from),
            UpdateFullAction::AwaitDeltaThenFull { delta, full } => {
                let _ = delta.await;
                full.await.map_err(FetchError::from)
            }
        }
    }

    /// Returns the shared future of the current in-flight tick, starting a
    /// new delta tick when the slot is empty. Also reports the tick's kind
    /// (a caller joining an in-flight full tick gets `TickKind::Full`).
    fn shared_for_delta(self: &Arc<Self>) -> (TickKind, SharedUpdate<T>) {
        let mut slot = self
            .slot
            .lock()
            .expect("delta refresh in-flight slot lock poisoned");
        match &mut *slot {
            InFlightSlot::Empty => {
                let operation_id = self.next_operation_id();
                let shared = self.build_and_spawn_tick(TickKind::Delta, operation_id);
                *slot = InFlightSlot::Delta {
                    operation_id,
                    shared: shared.clone(),
                    pending_full: None,
                };
                (TickKind::Delta, shared)
            }
            InFlightSlot::Delta { shared, .. } => (TickKind::Delta, shared.clone()),
            InFlightSlot::Full { shared, .. } => (TickKind::Full, shared.clone()),
        }
    }

    /// Executes one refresh tick: snapshot recovery state, fetch, prime
    /// membership, apply the delta to the pool, then commit watermark,
    /// counters, and membership. Any failure (error or fetcher panic) rolls
    /// back the recovery and membership snapshots via `TickRollbackGuard`.
    async fn run_tick(&self, kind: TickKind) -> Result<UpdateResult<T>, FetchError> {
        let current_tick = self.recovery_tick.fetch_add(1, Ordering::Relaxed);
        // Capture the force-full generation this full tick will satisfy
        // BEFORE fetching, so requests raised mid-tick stay pending.
        let observed_force_generation = if matches!(kind, TickKind::Full) {
            self.force_full_generation.load(Ordering::Acquire)
        } else {
            0
        };
        let (recovery_snapshot, recover_ids) = self.prepare_recovery_query(kind, current_tick);
        let mut rollback = TickRollbackGuard::new(self, current_tick, recovery_snapshot);
        let since = match kind {
            TickKind::Delta => self
                .last_watermark
                .lock()
                .expect("delta refresh watermark lock poisoned")
                .clone(),
            TickKind::Full => None,
        };

        // Catch fetcher panics so a misbehaving fetcher can't poison the
        // subscription; they surface as `FetchError::FetcherPanic`.
        let fetch = AssertUnwindSafe(self.fetcher.fetch_delta(DeltaQuery { since, recover_ids }))
            .catch_unwind()
            .await;

        let delta = match fetch {
            Ok(Ok(delta)) => delta,
            Ok(Err(err)) => {
                rollback.restore_after_failed();
                return Err(err);
            }
            Err(panic_payload) => {
                rollback.restore_after_failed();
                return Err(FetchError::FetcherPanic {
                    type_name: std::any::type_name::<T>(),
                    message: panic_message(&panic_payload),
                });
            }
        };

        let membership_update =
            MembershipUpdate::<T::Id>::from_delta(kind, &delta, rollback.recovery_snapshot());
        // Prime membership before the pool write (snapshotting the old set
        // for rollback) — presumably so LRU evictions racing with the apply
        // are attributed to this subscription; confirm against pool events.
        let membership_before_prime = self.membership_snapshot();
        self.prime_membership_for_observed_items(&membership_update);
        rollback.record_membership_snapshot(membership_before_prime);
        let (applied, next_watermark) =
            match self.apply_delta_and_observed_watermark(delta, rollback.recovery_snapshot()) {
                Ok(applied) => applied,
                Err(err) => {
                    rollback.restore_after_failed();
                    return Err(err);
                }
            };
        // Re-queue observed items that didn't land in the pool unexpired.
        self.queue_missing_observed_items(&membership_update);
        let watermark = self.advance_watermark(next_watermark);
        self.note_successful_tick(kind, observed_force_generation);
        self.apply_membership_update(membership_update);
        rollback.note_success();

        Ok(UpdateResult { applied, watermark })
    }

    /// Takes the recovery snapshot for this tick and the ids to ask the
    /// fetcher to recover. Full ticks snapshot everything but request no
    /// explicit ids (the full fetch covers them); with recovery disabled a
    /// full tick still marks pending force-full requests satisfied.
    fn prepare_recovery_query(
        &self,
        kind: TickKind,
        current_tick: u64,
    ) -> (RecoverySnapshot<T::Id>, HashSet<T::Id>) {
        if !self.eviction_recovery_enabled.load(Ordering::Acquire) {
            if matches!(kind, TickKind::Full) {
                self.satisfied_force_full_generation.store(
                    self.force_full_generation.load(Ordering::Acquire),
                    Ordering::Release,
                );
            }
            return (RecoverySnapshot::empty(), HashSet::new());
        }

        let mut recovery = self
            .recovery
            .lock()
            .expect("delta refresh recovery lock poisoned");
        match kind {
            TickKind::Delta => {
                let snapshot = recovery.snapshot_eligible(current_tick);
                let recover_ids = snapshot.ids();
                (snapshot, recover_ids)
            }
            TickKind::Full => (recovery.snapshot_all(), HashSet::new()),
        }
    }

    /// Returns a taken recovery snapshot to the set after a failed tick.
    fn restore_recovery_after_failed(&self, snapshot: RecoverySnapshot<T::Id>, current_tick: u64) {
        self.recovery
            .lock()
            .expect("delta refresh recovery lock poisoned")
            .restore_after_failed(snapshot, current_tick);
    }

    /// True when the next delta tick should run as a full refresh instead:
    /// either a forced-full request is pending (event-stream lag) or the
    /// recovery queue has overflowed.
    fn delta_should_promote_to_full(&self) -> bool {
        if !self.eviction_recovery_enabled.load(Ordering::Acquire) {
            return false;
        }
        if self.force_full_generation.load(Ordering::Acquire)
            > self.satisfied_force_full_generation.load(Ordering::Acquire)
        {
            return true;
        }
        self.recovery
            .lock()
            .expect("delta refresh recovery lock poisoned")
            .is_overflowing()
    }

    /// True when the upcoming scheduled tick hits the periodic-full cadence.
    fn next_periodic_tick_is_full(&self) -> bool {
        let every = self.periodic_full_every.load(Ordering::Acquire);
        every != 0
            && self
                .periodic_full_progress
                .load(Ordering::Acquire)
                .saturating_add(1)
                >= every
    }

    /// Bookkeeping after a successful tick: a full tick satisfies the
    /// force-full generation it observed and resets periodic progress.
    fn note_successful_tick(&self, kind: TickKind, observed_force_generation: u64) {
        if matches!(kind, TickKind::Full) {
            // fetch_max: never move the satisfied generation backwards if a
            // concurrent full tick observed a newer generation.
            self.satisfied_force_full_generation
                .fetch_max(observed_force_generation, Ordering::AcqRel);
            if self.periodic_full_every.load(Ordering::Acquire) != 0 {
                self.periodic_full_progress.store(0, Ordering::Release);
            }
        }
    }

    /// Advances periodic-full progress after a scheduled delta tick,
    /// clamped to `every - 1` so the "tick is full" decision is made by the
    /// scheduler, not by the counter wrapping past the threshold.
    fn note_successful_scheduled_delta_tick(&self) {
        let every = self.periodic_full_every.load(Ordering::Acquire);
        if every == 0 {
            return;
        }
        let next = self
            .periodic_full_progress
            .load(Ordering::Acquire)
            .saturating_add(1)
            .min(every.saturating_sub(1));
        self.periodic_full_progress.store(next, Ordering::Release);
    }

    /// Applies the delta's membership changes (tombstone removals + new ids)
    /// ahead of the pool write; rolled back on failure via the guard.
    fn prime_membership_for_observed_items(&self, update: &MembershipUpdate<T::Id>) {
        if update.item_ids.is_empty() && update.tombstones.is_empty() {
            return;
        }
        let mut membership = self
            .membership
            .lock()
            .expect("delta refresh membership lock poisoned");
        for id in &update.tombstones {
            membership.remove(id);
        }
        membership.extend(update.item_ids.iter().cloned());
    }

    /// Clones the current membership set (used as a rollback snapshot).
    fn membership_snapshot(&self) -> HashSet<T::Id> {
        self.membership
            .lock()
            .expect("delta refresh membership lock poisoned")
            .clone()
    }

    /// Replaces the membership set with a previously taken snapshot.
    fn restore_membership(&self, snapshot: HashSet<T::Id>) {
        *self
            .membership
            .lock()
            .expect("delta refresh membership lock poisoned") = snapshot;
    }

    /// Queues recovery for ids the delta reported but which are not present
    /// unexpired in the pool (e.g. evicted while the tick was applying).
    fn queue_missing_observed_items(&self, update: &MembershipUpdate<T::Id>) {
        let missing = update
            .item_ids
            .iter()
            .filter(|id| !self.punnu.contains_unexpired(id))
            .cloned()
            .collect::<Vec<_>>();
        if missing.is_empty() {
            return;
        }
        let mut recovery = self
            .recovery
            .lock()
            .expect("delta refresh recovery lock poisoned");
        for id in missing {
            recovery.record_eviction(id);
        }
    }

    /// Commits the membership changes for a successful tick. Full ticks
    /// replace the set; delta ticks additionally drop recovered ids that the
    /// fetch neither returned nor tombstoned (they no longer exist upstream).
    fn apply_membership_update(&self, update: MembershipUpdate<T::Id>) {
        let mut membership = self
            .membership
            .lock()
            .expect("delta refresh membership lock poisoned");
        if update.full_refresh {
            *membership = update.item_ids;
        } else {
            for id in update.recovered_ids {
                if !update.item_ids.contains(&id) && !update.tombstones.contains(&id) {
                    membership.remove(&id);
                }
            }
            for id in update.tombstones {
                membership.remove(&id);
            }
            membership.extend(update.item_ids);
        }
    }

    /// Handles an LRU-eviction event: warns once per subscription, and
    /// queues the id for recovery if this subscription tracks it.
    fn note_lru_eviction(&self, id: T::Id) {
        if !self.lru_warning_issued.swap(true, Ordering::AcqRel) {
            tracing::warn!(
                "Punnu LRU eviction observed while a delta refresh subscription is active; \
                consider raising lru_size, enabling eviction recovery, or configuring periodic full refresh"
            );
        }

        let owned_by_subscription = self
            .membership
            .lock()
            .expect("delta refresh membership lock poisoned")
            .contains(&id);
        if owned_by_subscription {
            self.recovery
                .lock()
                .expect("delta refresh recovery lock poisoned")
                .record_eviction(id);
        }
    }

    /// Handles a lagged event stream: evictions may have been missed, so
    /// force the next recovery-capable tick to run as a full refresh.
    fn note_event_lag(&self, skipped: u64) {
        tracing::warn!(
            skipped,
            "delta refresh subscription event stream lagged; forcing next recovery tick to full refresh"
        );
        self.force_full_generation.fetch_add(1, Ordering::AcqRel);
    }

    /// Writes the delta into the pool and returns `(applied_count,
    /// observed_watermark)`. A strict-backend id reservation defers the
    /// whole tick (reported as a `Serialization` error).
    fn apply_delta_and_observed_watermark(
        &self,
        delta: DeltaResult<T, T::Watermark>,
        recovery_snapshot: &RecoverySnapshot<T::Id>,
    ) -> Result<(usize, Option<T::Watermark>), FetchError> {
        // Read the watermark before `without_high_watermark` consumes it.
        let next_watermark = delta.observed_watermark();
        let recovered_ids = recovery_snapshot.ids();
        let stats = self
            .punnu
            .apply_delta_recovering(delta.without_high_watermark(), &recovered_ids);
        if stats.backend_reserved_skips > 0 {
            return Err(FetchError::Serialization(
                "delta refresh deferred because a strict backend insert reserved one of its ids"
                    .to_owned(),
            ));
        }
        Ok((stats.applied_items, next_watermark))
    }

    /// Monotonically advances the stored watermark (never moves backwards)
    /// and returns the value now stored.
    fn advance_watermark(&self, next_watermark: Option<T::Watermark>) -> Option<T::Watermark> {
        let mut stored = self
            .last_watermark
            .lock()
            .expect("delta refresh watermark lock poisoned");
        if let Some(next_watermark) = next_watermark {
            match &*stored {
                Some(current) if current >= &next_watermark => {}
                _ => *stored = Some(next_watermark),
            }
        }
        stored.clone()
    }

    /// Clears this tick's slot entry once it finishes. The `operation_id`
    /// guard ignores stale completions; finishing a delta tick promotes any
    /// chained full tick into the active slot.
    fn finish_tick(self: Arc<Self>, kind: TickKind, operation_id: u64) {
        let mut slot = self
            .slot
            .lock()
            .expect("delta refresh in-flight slot lock poisoned");

        match (&mut *slot, kind) {
            (
                InFlightSlot::Delta {
                    operation_id: current_id,
                    pending_full,
                    ..
                },
                TickKind::Delta,
            ) if *current_id == operation_id => {
                if let Some(pending) = pending_full.take() {
                    *slot = InFlightSlot::Full {
                        operation_id: pending.operation_id,
                        shared: pending.shared,
                    };
                } else {
                    *slot = InFlightSlot::Empty;
                }
            }
            (
                InFlightSlot::Full {
                    operation_id: current_id,
                    ..
                },
                TickKind::Full,
            ) if *current_id == operation_id => {
                *slot = InFlightSlot::Empty;
            }
            _ => {}
        }
    }

    /// Builds a shareable tick future and spawns a detached driver so the
    /// tick completes even if every awaiter drops its `Shared` clone.
    fn build_and_spawn_tick(
        self: &Arc<Self>,
        kind: TickKind,
        operation_id: u64,
    ) -> SharedUpdate<T> {
        let future = box_update_future(self.clone().run_owned_tick(kind, operation_id));
        let shared = future.shared();
        let driver = shared.clone();
        self.punnu.executor().spawn(box_spawn_future(async move {
            let _ = driver.await;
        }));
        shared
    }

    /// Like `build_and_spawn_tick`, but the full tick first waits for the
    /// given delta tick to finish (its result is ignored).
    fn build_and_spawn_chained_full(
        self: &Arc<Self>,
        delta: SharedUpdate<T>,
        operation_id: u64,
    ) -> SharedUpdate<T> {
        let owner = self.clone();
        let future = box_update_future(async move {
            let _ = delta.await;
            owner.run_owned_tick(TickKind::Full, operation_id).await
        });
        let shared = future.shared();
        let driver = shared.clone();
        self.punnu.executor().spawn(box_spawn_future(async move {
            let _ = driver.await;
        }));
        shared
    }

    /// Runs one tick while owning the subscription `Arc`, converts any
    /// error/panic into the clonable output type, and always clears the
    /// in-flight slot afterwards.
    async fn run_owned_tick(self: Arc<Self>, kind: TickKind, operation_id: u64) -> SharedOutput<T> {
        let type_name = std::any::type_name::<T>();
        let result = AssertUnwindSafe(self.run_tick(kind)).catch_unwind().await;
        let output = match result {
            Ok(Ok(result)) => Ok(result),
            Ok(Err(err)) => Err(into_clone(err)),
            Err(panic_payload) => Err(FetchErrorClone::FetcherPanic {
                type_name,
                message: panic_message(&panic_payload),
            }),
        };
        self.finish_tick(kind, operation_id);
        output
    }

    /// Allocates the next unique operation id for slot bookkeeping.
    fn next_operation_id(&self) -> u64 {
        self.next_operation_id.fetch_add(1, Ordering::Relaxed)
    }
}
876
/// Drop guard that undoes a tick's speculative state changes (recovery
/// snapshot, primed membership) unless the tick explicitly resolves via
/// `note_success` or `restore_after_failed`. The `Drop` impl covers panics
/// and cancellation between those calls.
struct TickRollbackGuard<'a, T: DeltaSyncCacheable> {
    subscription: &'a RefreshSubscription<T>,
    current_tick: u64,
    // `None` once consumed by success or rollback.
    recovery_snapshot: Option<RecoverySnapshot<T::Id>>,
    // Set after membership was primed; restored on failure.
    membership_before_prime: Option<HashSet<T::Id>>,
    // True once the tick resolved either way; Drop then does nothing.
    resolved: bool,
}
884
impl<'a, T: DeltaSyncCacheable> TickRollbackGuard<'a, T> {
    /// Arms the guard with the recovery snapshot taken for this tick.
    fn new(
        subscription: &'a RefreshSubscription<T>,
        current_tick: u64,
        recovery_snapshot: RecoverySnapshot<T::Id>,
    ) -> Self {
        Self {
            subscription,
            current_tick,
            recovery_snapshot: Some(recovery_snapshot),
            membership_before_prime: None,
            resolved: false,
        }
    }

    /// Borrows the (still unresolved) recovery snapshot.
    ///
    /// # Panics
    /// Panics if the guard already resolved — callers only use this while
    /// the tick is in progress.
    fn recovery_snapshot(&self) -> &RecoverySnapshot<T::Id> {
        self.recovery_snapshot
            .as_ref()
            .expect("delta tick recovery snapshot already resolved")
    }

    /// Records the pre-prime membership so failure can restore it.
    fn record_membership_snapshot(&mut self, snapshot: HashSet<T::Id>) {
        self.membership_before_prime = Some(snapshot);
    }

    /// Explicit failure path: restores membership and returns the recovery
    /// snapshot to the set, then marks the guard resolved.
    fn restore_after_failed(&mut self) {
        if let Some(snapshot) = self.membership_before_prime.take() {
            self.subscription.restore_membership(snapshot);
        }
        if let Some(snapshot) = self.recovery_snapshot.take() {
            self.subscription
                .restore_recovery_after_failed(snapshot, self.current_tick);
        }
        self.resolved = true;
    }

    /// Success path: hands the recovery snapshot back as satisfied and
    /// marks the guard resolved (consumes the guard).
    fn note_success(mut self) {
        let Some(snapshot) = self.recovery_snapshot.take() else {
            self.resolved = true;
            return;
        };
        self.subscription
            .recovery
            .lock()
            .expect("delta refresh recovery lock poisoned")
            .note_success(snapshot);
        self.resolved = true;
    }
}
934
impl<T: DeltaSyncCacheable> Drop for TickRollbackGuard<'_, T> {
    /// Rollback on abnormal exit (panic or cancelled future). Unlike the
    /// explicit paths this must not panic, so poisoned locks are logged
    /// instead of unwrapped.
    fn drop(&mut self) {
        if self.resolved {
            return;
        }

        if let Some(snapshot) = self.membership_before_prime.take() {
            match self.subscription.membership.lock() {
                Ok(mut membership) => *membership = snapshot,
                Err(_) => tracing::error!(
                    "delta refresh membership lock poisoned while rolling back failed tick"
                ),
            }
        }

        if let Some(snapshot) = self.recovery_snapshot.take() {
            match self.subscription.recovery.lock() {
                Ok(mut recovery) => {
                    recovery.restore_after_failed(snapshot, self.current_tick);
                }
                Err(_) => tracing::error!(
                    "delta refresh recovery lock poisoned while rolling back failed tick"
                ),
            }
        }
    }
}
962
/// Background loop: sleep `interval`, run a tick, repeat until cancelled.
///
/// Cancellation is checked at three points — before sleeping, after waking,
/// and (via the biased inner `select!`) while a tick is awaited — so a drop
/// of the handle stops the loop promptly; a detached driver keeps any
/// in-flight tick running to completion.
async fn run_periodic_delta_refresh<T>(
    subscription: Arc<RefreshSubscription<T>>,
    interval: Duration,
    mut cancel: watch::Receiver<bool>,
) where
    T: DeltaSyncCacheable,
{
    loop {
        if refresh_cancelled(&cancel) {
            break;
        }

        let sleep = subscription.punnu.executor().sleep(interval);
        tokio::select! {
            // `biased` so a pending cancel wins over an elapsed sleep.
            biased;
            changed = cancel.changed() => {
                let _ = changed;
                break;
            }
            _ = sleep => {
                if refresh_cancelled(&cancel) {
                    break;
                }
                // Second select: abandon waiting on the tick (not the tick
                // itself) as soon as cancellation arrives.
                tokio::select! {
                    biased;
                    changed = cancel.changed() => {
                        let _ = changed;
                        break;
                    }
                    result = run_periodic_tick(subscription.clone()) => {
                        if let Err(err) = result {
                            tracing::warn!(error = %err, "delta refresh failed");
                        }
                    }
                }
            }
        }
    }
}
1002
/// Runs one scheduled tick: a full refresh when the periodic cadence is
/// due, otherwise a delta tick. Only ticks that actually ran as deltas
/// advance the periodic-full progress counter (promoted-to-full ticks
/// reset it inside `note_successful_tick`).
async fn run_periodic_tick<T>(
    subscription: Arc<RefreshSubscription<T>>,
) -> Result<UpdateResult<T>, FetchError>
where
    T: DeltaSyncCacheable,
{
    if subscription.next_periodic_tick_is_full() {
        RefreshSubscription::update_full(subscription).await
    } else {
        let (kind, result) = RefreshSubscription::update_with_kind(subscription.clone()).await?;
        if matches!(kind, TickKind::Delta) {
            subscription.note_successful_scheduled_delta_tick();
        }
        Ok(result)
    }
}
1019
/// Background task feeding pool events into eviction recovery: LRU-evict
/// invalidations queue the id for recovery, a lagged broadcast stream
/// forces the next tick to a full refresh, and a closed stream or a cancel
/// signal ends the task.
async fn run_delta_recovery_event_listener<T>(
    subscription: Arc<RefreshSubscription<T>>,
    mut events: broadcast::Receiver<PunnuEvent<T>>,
    mut cancel: watch::Receiver<bool>,
) where
    T: DeltaSyncCacheable,
{
    loop {
        if refresh_cancelled(&cancel) {
            break;
        }

        tokio::select! {
            // `biased` so cancellation wins over a ready event.
            biased;
            changed = cancel.changed() => {
                let _ = changed;
                break;
            }
            event = events.recv() => {
                match event {
                    Ok(PunnuEvent::Invalidate { id, reason: EventReason::LruEvict }) => {
                        subscription.note_lru_eviction(id);
                    }
                    // Other events don't affect recovery bookkeeping.
                    Ok(_) => {}
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        subscription.note_event_lag(skipped);
                    }
                    Err(broadcast::error::RecvError::Closed) => break,
                }
            }
        }
    }
}
1053
1054fn refresh_cancelled(cancel: &watch::Receiver<bool>) -> bool {
1055 *cancel.borrow() || cancel.has_changed().is_err()
1056}
1057
/// Panics when called outside a Tokio runtime context. Compiles to a no-op
/// unless the `runtime-tokio` feature is enabled on a non-wasm target.
fn assert_active_tokio_runtime(_operation: &str) {
    #[cfg(all(feature = "runtime-tokio", not(target_arch = "wasm32")))]
    {
        if tokio::runtime::Handle::try_current().is_err() {
            panic!("{_operation} requires an active Tokio runtime");
        }
    }
}
1064
1065fn panic_message(payload: &Box<dyn std::any::Any + Send>) -> String {
1066 if let Some(s) = payload.downcast_ref::<String>() {
1067 s.clone()
1068 } else if let Some(s) = payload.downcast_ref::<&'static str>() {
1069 (*s).to_string()
1070 } else {
1071 String::new()
1072 }
1073}
1074
/// Pins an update future into the platform's boxed alias (native:
/// `BoxFuture`, which requires the future to be `Send`).
#[cfg(not(target_arch = "wasm32"))]
fn box_update_future<T>(
    future: impl Future<Output = SharedOutput<T>> + Send + 'static,
) -> UpdateFuture<T>
where
    T: DeltaSyncCacheable,
{
    Box::pin(future)
}
1084
/// Wasm variant: pins into `LocalBoxFuture`, dropping the `Send` bound for
/// the single-threaded target.
#[cfg(target_arch = "wasm32")]
fn box_update_future<T>(future: impl Future<Output = SharedOutput<T>> + 'static) -> UpdateFuture<T>
where
    T: DeltaSyncCacheable,
{
    Box::pin(future)
}
1092
/// Pins a unit future for the executor's `spawn` (native: must be `Send`).
#[cfg(not(target_arch = "wasm32"))]
fn box_spawn_future(future: impl Future<Output = ()> + Send + 'static) -> BoxFut<'static> {
    Box::pin(future)
}
1097
/// Wasm variant of `box_spawn_future`: no `Send` bound.
#[cfg(target_arch = "wasm32")]
fn box_spawn_future(future: impl Future<Output = ()> + 'static) -> BoxFut<'static> {
    Box::pin(future)
}
1102
#[cfg(all(test, feature = "runtime-tokio", not(target_arch = "wasm32")))]
mod tests {
    use super::*;
    use crate::Cacheable;
    use std::collections::HashSet;
    use tokio::sync::Notify;

    // Minimal cacheable item: an id plus a timestamp used as its watermark.
    #[derive(Clone)]
    struct TestItem {
        id: i64,
        updated_at: i64,
    }

    impl Cacheable for TestItem {
        type Id = i64;
        type Fields = ();

        fn id(&self) -> Self::Id {
            self.id
        }

        fn fields() -> Self::Fields {}
    }

    impl DeltaSyncCacheable for TestItem {
        type Watermark = i64;

        fn watermark(&self) -> Self::Watermark {
            self.updated_at
        }
    }

    // Fetcher that signals `started` on entry and blocks until `release`
    // fires, letting the test hold a tick in flight deliberately.
    #[derive(Clone)]
    struct BlockingFetcher {
        started: Arc<Notify>,
        release: Arc<Notify>,
    }

    #[async_trait]
    impl DeltaPunnuFetcher<TestItem> for BlockingFetcher {
        async fn fetch_delta(
            &self,
            _query: DeltaQuery<TestItem>,
        ) -> Result<DeltaResult<TestItem, i64>, FetchError> {
            self.started.notify_one();
            self.release.notified().await;
            Ok(DeltaResult::new(
                vec![TestItem {
                    id: 1,
                    updated_at: 10,
                }],
                HashSet::new(),
            ))
        }
    }

    // Awaits a Notify with a hard timeout so a broken test fails loudly
    // instead of hanging.
    async fn wait_for_notification(notify: &Notify, context: &'static str) {
        tokio::time::timeout(Duration::from_secs(2), notify.notified())
            .await
            .expect(context);
    }

    #[tokio::test]
    async fn periodic_cancel_drops_awaiter_while_spawned_fetch_continues() {
        let punnu = Punnu::<TestItem>::builder().build();
        let started = Arc::new(Notify::new());
        let release = Arc::new(Notify::new());
        let handle = punnu.start_delta_refresh(
            Duration::from_millis(1),
            BlockingFetcher {
                started: started.clone(),
                release: release.clone(),
            },
        );
        // Weak ref lets the test observe Arc strong counts without keeping
        // the subscription alive itself.
        let weak_subscription = Arc::downgrade(&handle.inner);

        wait_for_notification(&started, "periodic delta fetch should start").await;
        // Dropping the handle broadcasts cancel while the fetch is blocked.
        drop(handle);

        // The periodic loop should release its clone promptly, leaving only
        // the detached driver (strong_count <= 1) holding the subscription.
        let cancel_observed = tokio::time::timeout(Duration::from_millis(200), async {
            loop {
                if weak_subscription.strong_count() <= 1 {
                    break;
                }
                tokio::task::yield_now().await;
            }
        })
        .await;
        // Unblock the fetch only after cancellation was (or wasn't) seen.
        release.notify_one();

        assert!(
            cancel_observed.is_ok(),
            "periodic loop should stop awaiting a blocked shared update when the handle is dropped"
        );
        // The detached driver must still finish the tick and drop the last
        // strong reference.
        tokio::time::timeout(Duration::from_secs(2), async {
            loop {
                if weak_subscription.strong_count() == 0 {
                    break;
                }
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("detached fetch driver should finish and release the subscription");
    }
}