sassi/punnu/delta_refresh.rs

//! Delta-sync refresh subscriptions.
//!
//! A delta subscription is scoped to one fetcher/filter pair. Multiple
//! subscriptions may write into the same [`Punnu`]
//! identity map, but each subscription owns its own watermark and
//! single-flight slot so a narrow query cannot advance a broader query's
//! progress.
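//!
//! A rough usage sketch (not compiled as a doctest; the `Order` type and the
//! two fetchers are hypothetical, while `Punnu::builder`, `start_delta_refresh`,
//! and `DeltaRefreshHandle::update` are the APIs defined or exercised in this
//! module):
//!
//! ```ignore
//! // Two subscriptions share one identity map but keep separate cursors.
//! let punnu = Punnu::<Order>::builder().build();
//! let recent = punnu.start_delta_refresh(Duration::from_secs(30), RecentOrdersFetcher);
//! let archived = punnu.start_delta_refresh(Duration::from_secs(300), ArchivedOrdersFetcher);
//!
//! // Each handle advances only its own watermark.
//! recent.update().await?;
//! archived.update().await?;
//! ```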

use crate::error::FetchError;
use crate::executor::BoxFut;
use crate::punnu::delta::DeltaResult;
use crate::punnu::events::{EventReason, PunnuEvent};
use crate::punnu::pool::Punnu;
use crate::punnu::recovery::{RecoverySet, RecoverySnapshot};
use crate::punnu::single_flight::{FetchErrorClone, into_clone};
use crate::watermark::DeltaSyncCacheable;
#[cfg(not(target_arch = "wasm32"))]
use async_trait::async_trait;
use futures::FutureExt;
#[cfg(not(target_arch = "wasm32"))]
use futures::future::BoxFuture;
#[cfg(target_arch = "wasm32")]
use futures::future::LocalBoxFuture;
use futures::future::Shared;
use std::collections::HashSet;
use std::fmt;
use std::future::Future;
use std::num::NonZeroUsize;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::watch;

/// User-supplied delta fetcher for one refresh subscription.
///
/// Native fetchers must be `Send + Sync` because Sassi's default native
/// executor uses `tokio::spawn`. The wasm target accepts non-`Send`
/// futures so browser-side fetchers can await JS-backed primitives.
///
/// # Fetcher contract
///
/// Treat [`DeltaQuery::since`] as an inclusive lower bound (`>=`), not a
/// strict `>` cursor. Boundary rows may have changed without their
/// watermark changing; Sassi deduplicates by `T::Id` when those rows are
/// returned again.
///
/// Tombstones are true deletes from the shared `Punnu<T>` identity map.
/// Do not use a tombstone to mean "this row left my query/filter/page";
/// return the updated row and let read-time predicates stop matching it.
/// For delete-only batches, use
/// [`DeltaResult::with_high_watermark`] so the subscription can advance
/// its source cursor even though no item carries the delete watermark.
///
/// The subscription cursor tracks source progress, not L1 retention. A
/// row can be processed and then omitted from L1 because sampled-LRU
/// evicted it, or because [`crate::punnu::OnConflict::Reject`] kept an
/// existing resident value. Use `LastWriteWins` or `Update` when a delta
/// stream is authoritative for cached values.
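///
/// # Example
///
/// A minimal sketch of the contract above (not compiled as a doctest; the
/// `Order` type, the `OrderFetcher` struct, the `i64` watermark, and the `db`
/// handle with its query methods are hypothetical):
///
/// ```ignore
/// #[async_trait]
/// impl DeltaPunnuFetcher<Order> for OrderFetcher {
///     async fn fetch_delta(
///         &self,
///         query: DeltaQuery<Order>,
///     ) -> Result<DeltaResult<Order, i64>, FetchError> {
///         // Inclusive lower bound: `>=`, never `>`; Sassi dedups repeats by id.
///         let mut items = match query.since {
///             Some(since) => self.db.orders_updated_at_or_after(since).await?,
///             None => self.db.all_orders().await?,
///         };
///         // Re-fetch rows the cache evicted but this subscription still tracks.
///         items.extend(self.db.orders_by_ids(&query.recover_ids).await?);
///         // Tombstones are true deletes only; rows that merely left the filter
///         // come back as updated rows. For delete-only batches, also attach a
///         // cursor via `DeltaResult::with_high_watermark`.
///         let tombstones = self.db.deleted_order_ids().await?;
///         Ok(DeltaResult::new(items, tombstones))
///     }
/// }
/// ```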
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
pub trait DeltaPunnuFetcher<T: DeltaSyncCacheable>: Send + Sync + 'static {
    /// Fetch one delta or full-refresh result for this subscription.
    async fn fetch_delta(
        &self,
        query: DeltaQuery<T>,
    ) -> Result<DeltaResult<T, T::Watermark>, FetchError>;
}

/// User-supplied delta fetcher for one refresh subscription.
///
/// The wasm target accepts non-`Send` futures so browser-side fetchers
/// can await JS-backed primitives.
///
/// # Fetcher contract
///
/// Treat [`DeltaQuery::since`] as an inclusive lower bound (`>=`), not a
/// strict `>` cursor. Boundary rows may have changed without their
/// watermark changing; Sassi deduplicates by `T::Id` when those rows are
/// returned again.
///
/// Tombstones are true deletes from the shared `Punnu<T>` identity map.
/// Do not use a tombstone to mean "this row left my query/filter/page";
/// return the updated row and let read-time predicates stop matching it.
/// For delete-only batches, use
/// [`DeltaResult::with_high_watermark`] so the subscription can advance
/// its source cursor even though no item carries the delete watermark.
///
/// The subscription cursor tracks source progress, not L1 retention. A
/// row can be processed and then omitted from L1 because sampled-LRU
/// evicted it, or because [`crate::punnu::OnConflict::Reject`] kept an
/// existing resident value. Use `LastWriteWins` or `Update` when a delta
/// stream is authoritative for cached values.
#[cfg(target_arch = "wasm32")]
#[async_trait::async_trait(?Send)]
pub trait DeltaPunnuFetcher<T: DeltaSyncCacheable>: 'static {
    /// Fetch one delta or full-refresh result for this subscription.
    async fn fetch_delta(
        &self,
        query: DeltaQuery<T>,
    ) -> Result<DeltaResult<T, T::Watermark>, FetchError>;
}

/// Query passed to [`DeltaPunnuFetcher::fetch_delta`].
#[non_exhaustive]
pub struct DeltaQuery<T: DeltaSyncCacheable> {
    /// The current subscription watermark. `None` means a full query.
    ///
    /// When this is `Some`, fetchers must query with an inclusive
    /// `>=` boundary and let Sassi deduplicate rows by identity.
    pub since: Option<T::Watermark>,
    /// IDs that must be recovered regardless of watermark.
    ///
    /// Eviction recovery uses this to ask the fetcher for IDs that
    /// this subscription previously observed but L1 later evicted.
    pub recover_ids: HashSet<T::Id>,
}

/// Result of one delta subscription update.
pub struct UpdateResult<T: DeltaSyncCacheable> {
    /// Count of fetched items that survived into the published snapshot.
    pub applied: usize,
    /// The subscription's high-water mark after the update.
    pub watermark: Option<T::Watermark>,
}

impl<T: DeltaSyncCacheable> Clone for UpdateResult<T> {
    fn clone(&self) -> Self {
        Self {
            applied: self.applied,
            watermark: self.watermark.clone(),
        }
    }
}

impl<T> fmt::Debug for UpdateResult<T>
where
    T: DeltaSyncCacheable,
    T::Watermark: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("UpdateResult")
            .field("applied", &self.applied)
            .field("watermark", &self.watermark)
            .finish()
    }
}

impl<T: DeltaSyncCacheable> PartialEq for UpdateResult<T> {
    fn eq(&self, other: &Self) -> bool {
        self.applied == other.applied && self.watermark == other.watermark
    }
}

impl<T: DeltaSyncCacheable> Eq for UpdateResult<T> {}

/// Public handle to a delta-sync subscription.
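///
/// A configuration sketch (not compiled as a doctest; `punnu` and `MyFetcher`
/// are assumed to exist, everything else is this module's API):
///
/// ```ignore
/// let handle = punnu
///     .start_delta_refresh(Duration::from_secs(30), MyFetcher)
///     // Re-fetch rows this subscription observed but L1 later evicted.
///     .with_eviction_recovery(true)
///     // Every 10th successful scheduled tick runs with `since = None`.
///     .with_periodic_full_refresh(NonZeroUsize::new(10));
///
/// // Coalesced with other callers and queued behind any in-flight delta;
/// // dropping the future does not cancel it.
/// handle.update_full().await?;
/// ```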
pub struct DeltaRefreshHandle<T: DeltaSyncCacheable> {
    pub(crate) inner: Arc<RefreshSubscription<T>>,
}

impl<T: DeltaSyncCacheable> DeltaRefreshHandle<T> {
    /// Trigger one delta update and wait for the shared result.
    ///
    /// Once a delta fetch is registered, dropping this caller's future
    /// does not cancel the fetch or its cache-state application.
    ///
    /// # Panics
    ///
    /// On native `runtime-tokio` builds, panics if called outside an
    /// active Tokio runtime.
    pub async fn update(&self) -> Result<UpdateResult<T>, FetchError> {
        assert_active_tokio_runtime("DeltaRefreshHandle::update");
        RefreshSubscription::update(self.inner.clone()).await
    }

    /// Trigger one full refresh and wait for the shared result.
    ///
    /// If a delta is already in flight, the full refresh is queued behind
    /// it and coalesced for every caller. Once the request is registered,
    /// dropping the caller's future does not cancel the queued full
    /// refresh; this prevents `update_full()` starvation under sustained
    /// delta traffic.
    ///
    /// # Panics
    ///
    /// On native `runtime-tokio` builds, panics if called outside an
    /// active Tokio runtime.
    pub async fn update_full(&self) -> Result<UpdateResult<T>, FetchError> {
        assert_active_tokio_runtime("DeltaRefreshHandle::update_full");
        RefreshSubscription::update_full(self.inner.clone()).await
    }

    /// Stop future periodic ticks. In-flight fetches continue to
    /// completion and still apply their cache-state changes.
    pub fn cancel(&self) {
        let _ = self.inner.cancel.send(true);
    }

    /// Configure eviction-triggered recovery for this subscription.
    ///
    /// When enabled, LRU evictions for IDs this subscription has
    /// observed are passed to the fetcher as
    /// [`DeltaQuery::recover_ids`] on a later delta update.
    pub fn with_eviction_recovery(self, enabled: bool) -> Self {
        self.inner
            .eviction_recovery_enabled
            .store(enabled, Ordering::Release);
        if !enabled {
            self.inner
                .recovery
                .lock()
                .expect("delta refresh recovery lock poisoned")
                .clear();
            self.inner.satisfied_force_full_generation.store(
                self.inner.force_full_generation.load(Ordering::Acquire),
                Ordering::Release,
            );
        }
        self
    }

    /// Configure periodic full refreshes.
    ///
    /// `Some(n)` makes every nth successful scheduled refresh tick use
    /// `since = None`; `None` disables the policy.
    pub fn with_periodic_full_refresh(self, every_n_ticks: Option<NonZeroUsize>) -> Self {
        self.inner.periodic_full_every.store(
            every_n_ticks.map(NonZeroUsize::get).unwrap_or(0),
            Ordering::Release,
        );
        self.inner
            .periodic_full_progress
            .store(0, Ordering::Release);
        self
    }

    /// Count IDs queued for eviction recovery on this subscription.
    ///
    /// Returns zero when eviction recovery is disabled.
    pub fn pending_eviction_recovery_count(&self) -> usize {
        if !self.inner.eviction_recovery_enabled.load(Ordering::Acquire) {
            return 0;
        }
        self.inner
            .recovery
            .lock()
            .expect("delta refresh recovery lock poisoned")
            .len()
    }

    /// Current periodic full-refresh progress.
    ///
    /// Returns `None` when the policy is disabled; otherwise returns
    /// `(elapsed, configured_every)`.
    pub fn periodic_full_refresh_progress(&self) -> Option<(usize, NonZeroUsize)> {
        let every = NonZeroUsize::new(self.inner.periodic_full_every.load(Ordering::Acquire))?;
        let progress = self
            .inner
            .periodic_full_progress
            .load(Ordering::Acquire)
            .min(every.get().saturating_sub(1));
        Some((progress, every))
    }

    /// Return the current subscription watermark.
    pub fn watermark(&self) -> Option<T::Watermark> {
        self.inner
            .last_watermark
            .lock()
            .expect("delta refresh watermark lock poisoned")
            .clone()
    }
}

impl<T: DeltaSyncCacheable> Drop for DeltaRefreshHandle<T> {
    fn drop(&mut self) {
        let _ = self.inner.cancel.send(true);
    }
}

pub(crate) struct RefreshSubscription<T: DeltaSyncCacheable> {
    punnu: Punnu<T>,
    fetcher: Arc<dyn DeltaPunnuFetcher<T>>,
    last_watermark: Mutex<Option<T::Watermark>>,
    membership: Mutex<HashSet<T::Id>>,
    recovery: Mutex<RecoverySet<T::Id>>,
    eviction_recovery_enabled: AtomicBool,
    force_full_generation: AtomicU64,
    satisfied_force_full_generation: AtomicU64,
    lru_warning_issued: AtomicBool,
    periodic_full_every: AtomicUsize,
    periodic_full_progress: AtomicUsize,
    recovery_tick: AtomicU64,
    slot: Mutex<InFlightSlot<T>>,
    next_operation_id: AtomicU64,
    cancel: watch::Sender<bool>,
}

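/// Single-flight state for one subscription: at most one tick runs at a time,
/// and a full refresh requested while a delta is in flight is queued behind it
/// as `pending_full` rather than run concurrently.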
enum InFlightSlot<T: DeltaSyncCacheable> {
    Empty,
    Delta {
        operation_id: u64,
        shared: SharedUpdate<T>,
        pending_full: Option<PendingFull<T>>,
    },
    Full {
        operation_id: u64,
        shared: SharedUpdate<T>,
    },
}

struct PendingFull<T: DeltaSyncCacheable> {
    operation_id: u64,
    shared: SharedUpdate<T>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TickKind {
    Delta,
    Full,
}

struct MembershipUpdate<Id> {
    full_refresh: bool,
    item_ids: HashSet<Id>,
    tombstones: HashSet<Id>,
    recovered_ids: HashSet<Id>,
}

enum UpdateFullAction<T: DeltaSyncCacheable> {
    AwaitFull(SharedUpdate<T>),
    AwaitDeltaThenFull {
        delta: SharedUpdate<T>,
        full: SharedUpdate<T>,
    },
}

type SharedOutput<T> = Result<UpdateResult<T>, FetchErrorClone>;

#[cfg(not(target_arch = "wasm32"))]
type UpdateFuture<T> = BoxFuture<'static, SharedOutput<T>>;
#[cfg(target_arch = "wasm32")]
type UpdateFuture<T> = LocalBoxFuture<'static, SharedOutput<T>>;

type SharedUpdate<T> = Shared<UpdateFuture<T>>;

impl<Id> MembershipUpdate<Id>
where
    Id: Eq + std::hash::Hash + Clone,
{
    fn from_delta<T>(
        kind: TickKind,
        delta: &DeltaResult<T, T::Watermark>,
        recovery_snapshot: &RecoverySnapshot<T::Id>,
    ) -> MembershipUpdate<T::Id>
    where
        T: DeltaSyncCacheable<Id = Id>,
    {
        let tombstones = delta.tombstones.clone();
        let item_ids = delta
            .items
            .iter()
            .map(|item| item.id())
            .filter(|id| !tombstones.contains(id))
            .collect();
        MembershipUpdate {
            full_refresh: matches!(kind, TickKind::Full),
            item_ids,
            tombstones,
            recovered_ids: recovery_snapshot.ids(),
        }
    }
}

impl<T: DeltaSyncCacheable> RefreshSubscription<T> {
    pub(crate) fn spawn<F>(punnu: Punnu<T>, interval: Duration, fetcher: F) -> DeltaRefreshHandle<T>
    where
        F: DeltaPunnuFetcher<T>,
    {
        assert!(
            !interval.is_zero(),
            "delta refresh interval must be non-zero"
        );

        let (cancel_tx, cancel_rx) = watch::channel(false);
        let subscription = Arc::new(Self {
            recovery: Mutex::new(RecoverySet::new(punnu.config().lru_size)),
            punnu,
            fetcher: Arc::new(fetcher),
            last_watermark: Mutex::new(None),
            membership: Mutex::new(HashSet::new()),
            eviction_recovery_enabled: AtomicBool::new(false),
            force_full_generation: AtomicU64::new(0),
            satisfied_force_full_generation: AtomicU64::new(0),
            lru_warning_issued: AtomicBool::new(false),
            periodic_full_every: AtomicUsize::new(0),
            periodic_full_progress: AtomicUsize::new(0),
            recovery_tick: AtomicU64::new(1),
            slot: Mutex::new(InFlightSlot::Empty),
            next_operation_id: AtomicU64::new(1),
            cancel: cancel_tx,
        });

        let loop_subscription = subscription.clone();
        let executor = subscription.punnu.executor();
        let events_subscription = subscription.clone();
        let events = subscription.punnu.events();
        let event_cancel_rx = cancel_rx.clone();
        executor.spawn(box_spawn_future(async move {
            run_delta_recovery_event_listener(events_subscription, events, event_cancel_rx).await;
        }));
        executor.spawn(box_spawn_future(async move {
            run_periodic_delta_refresh(loop_subscription, interval, cancel_rx).await;
        }));

        DeltaRefreshHandle {
            inner: subscription,
        }
    }

    async fn update(subscription: Arc<Self>) -> Result<UpdateResult<T>, FetchError> {
        let (_, result) = Self::update_with_kind(subscription).await?;
        Ok(result)
    }

    async fn update_with_kind(
        subscription: Arc<Self>,
    ) -> Result<(TickKind, UpdateResult<T>), FetchError> {
        if subscription.delta_should_promote_to_full() {
            return Self::update_full(subscription)
                .await
                .map(|result| (TickKind::Full, result));
        }
        let (kind, shared) = subscription.shared_for_delta();
        shared
            .await
            .map(|result| (kind, result))
            .map_err(FetchError::from)
    }

    async fn update_full(subscription: Arc<Self>) -> Result<UpdateResult<T>, FetchError> {
        let action = {
            let mut slot = subscription
                .slot
                .lock()
                .expect("delta refresh in-flight slot lock poisoned");
            match &mut *slot {
                InFlightSlot::Empty => {
                    let operation_id = subscription.next_operation_id();
                    let shared = subscription.build_and_spawn_tick(TickKind::Full, operation_id);
                    *slot = InFlightSlot::Full {
                        operation_id,
                        shared: shared.clone(),
                    };
                    UpdateFullAction::AwaitFull(shared)
                }
                InFlightSlot::Full { shared, .. } => {
                    let shared = shared.clone();
                    UpdateFullAction::AwaitFull(shared)
                }
                InFlightSlot::Delta {
                    shared,
                    pending_full,
                    ..
                } => {
                    let delta = shared.clone();
                    let full = if let Some(pending) = pending_full {
                        pending.shared.clone()
                    } else {
                        let operation_id = subscription.next_operation_id();
                        let full =
                            subscription.build_and_spawn_chained_full(delta.clone(), operation_id);
                        *pending_full = Some(PendingFull {
                            operation_id,
                            shared: full.clone(),
                        });
                        full
                    };
                    UpdateFullAction::AwaitDeltaThenFull { delta, full }
                }
            }
        };

        match action {
            UpdateFullAction::AwaitFull(shared) => shared.await.map_err(FetchError::from),
            UpdateFullAction::AwaitDeltaThenFull { delta, full } => {
                let _ = delta.await;
                full.await.map_err(FetchError::from)
            }
        }
    }

    fn shared_for_delta(self: &Arc<Self>) -> (TickKind, SharedUpdate<T>) {
        let mut slot = self
            .slot
            .lock()
            .expect("delta refresh in-flight slot lock poisoned");
        match &mut *slot {
            InFlightSlot::Empty => {
                let operation_id = self.next_operation_id();
                let shared = self.build_and_spawn_tick(TickKind::Delta, operation_id);
                *slot = InFlightSlot::Delta {
                    operation_id,
                    shared: shared.clone(),
                    pending_full: None,
                };
                (TickKind::Delta, shared)
            }
            InFlightSlot::Delta { shared, .. } => (TickKind::Delta, shared.clone()),
            InFlightSlot::Full { shared, .. } => (TickKind::Full, shared.clone()),
        }
    }

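    /// One delta or full tick: snapshot the recovery queue, arm a rollback
    /// guard, resolve `since` from the stored watermark (deltas) or `None`
    /// (full refreshes), run the fetcher under `catch_unwind`, apply the
    /// result to the pool, queue any observed-but-missing IDs for recovery,
    /// then advance the watermark monotonically. Failures and panics restore
    /// the recovery and membership snapshots via the guard.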
    async fn run_tick(&self, kind: TickKind) -> Result<UpdateResult<T>, FetchError> {
        let current_tick = self.recovery_tick.fetch_add(1, Ordering::Relaxed);
        let observed_force_generation = if matches!(kind, TickKind::Full) {
            self.force_full_generation.load(Ordering::Acquire)
        } else {
            0
        };
        let (recovery_snapshot, recover_ids) = self.prepare_recovery_query(kind, current_tick);
        let mut rollback = TickRollbackGuard::new(self, current_tick, recovery_snapshot);
        let since = match kind {
            TickKind::Delta => self
                .last_watermark
                .lock()
                .expect("delta refresh watermark lock poisoned")
                .clone(),
            TickKind::Full => None,
        };

        let fetch = AssertUnwindSafe(self.fetcher.fetch_delta(DeltaQuery { since, recover_ids }))
            .catch_unwind()
            .await;

        let delta = match fetch {
            Ok(Ok(delta)) => delta,
            Ok(Err(err)) => {
                rollback.restore_after_failed();
                return Err(err);
            }
            Err(panic_payload) => {
                rollback.restore_after_failed();
                return Err(FetchError::FetcherPanic {
                    type_name: std::any::type_name::<T>(),
                    message: panic_message(&panic_payload),
                });
            }
        };

        let membership_update =
            MembershipUpdate::<T::Id>::from_delta(kind, &delta, rollback.recovery_snapshot());
        let membership_before_prime = self.membership_snapshot();
        self.prime_membership_for_observed_items(&membership_update);
        rollback.record_membership_snapshot(membership_before_prime);
        let (applied, next_watermark) =
            match self.apply_delta_and_observed_watermark(delta, rollback.recovery_snapshot()) {
                Ok(applied) => applied,
                Err(err) => {
                    rollback.restore_after_failed();
                    return Err(err);
                }
            };
        self.queue_missing_observed_items(&membership_update);
        let watermark = self.advance_watermark(next_watermark);
        self.note_successful_tick(kind, observed_force_generation);
        self.apply_membership_update(membership_update);
        rollback.note_success();

        Ok(UpdateResult { applied, watermark })
    }

    fn prepare_recovery_query(
        &self,
        kind: TickKind,
        current_tick: u64,
    ) -> (RecoverySnapshot<T::Id>, HashSet<T::Id>) {
        if !self.eviction_recovery_enabled.load(Ordering::Acquire) {
            if matches!(kind, TickKind::Full) {
                self.satisfied_force_full_generation.store(
                    self.force_full_generation.load(Ordering::Acquire),
                    Ordering::Release,
                );
            }
            return (RecoverySnapshot::empty(), HashSet::new());
        }

        let mut recovery = self
            .recovery
            .lock()
            .expect("delta refresh recovery lock poisoned");
        match kind {
            TickKind::Delta => {
                let snapshot = recovery.snapshot_eligible(current_tick);
                let recover_ids = snapshot.ids();
                (snapshot, recover_ids)
            }
            TickKind::Full => (recovery.snapshot_all(), HashSet::new()),
        }
    }

    fn restore_recovery_after_failed(&self, snapshot: RecoverySnapshot<T::Id>, current_tick: u64) {
        self.recovery
            .lock()
            .expect("delta refresh recovery lock poisoned")
            .restore_after_failed(snapshot, current_tick);
    }

    fn delta_should_promote_to_full(&self) -> bool {
        if !self.eviction_recovery_enabled.load(Ordering::Acquire) {
            return false;
        }
        if self.force_full_generation.load(Ordering::Acquire)
            > self.satisfied_force_full_generation.load(Ordering::Acquire)
        {
            return true;
        }
        self.recovery
            .lock()
            .expect("delta refresh recovery lock poisoned")
            .is_overflowing()
    }

    fn next_periodic_tick_is_full(&self) -> bool {
        let every = self.periodic_full_every.load(Ordering::Acquire);
        every != 0
            && self
                .periodic_full_progress
                .load(Ordering::Acquire)
                .saturating_add(1)
                >= every
    }

    fn note_successful_tick(&self, kind: TickKind, observed_force_generation: u64) {
        if matches!(kind, TickKind::Full) {
            self.satisfied_force_full_generation
                .fetch_max(observed_force_generation, Ordering::AcqRel);
            if self.periodic_full_every.load(Ordering::Acquire) != 0 {
                self.periodic_full_progress.store(0, Ordering::Release);
            }
        }
    }

    fn note_successful_scheduled_delta_tick(&self) {
        let every = self.periodic_full_every.load(Ordering::Acquire);
        if every == 0 {
            return;
        }
        let next = self
            .periodic_full_progress
            .load(Ordering::Acquire)
            .saturating_add(1)
            .min(every.saturating_sub(1));
        self.periodic_full_progress.store(next, Ordering::Release);
    }

    fn prime_membership_for_observed_items(&self, update: &MembershipUpdate<T::Id>) {
        if update.item_ids.is_empty() && update.tombstones.is_empty() {
            return;
        }
        let mut membership = self
            .membership
            .lock()
            .expect("delta refresh membership lock poisoned");
        for id in &update.tombstones {
            membership.remove(id);
        }
        membership.extend(update.item_ids.iter().cloned());
    }

    fn membership_snapshot(&self) -> HashSet<T::Id> {
        self.membership
            .lock()
            .expect("delta refresh membership lock poisoned")
            .clone()
    }

    fn restore_membership(&self, snapshot: HashSet<T::Id>) {
        *self
            .membership
            .lock()
            .expect("delta refresh membership lock poisoned") = snapshot;
    }

    fn queue_missing_observed_items(&self, update: &MembershipUpdate<T::Id>) {
        let missing = update
            .item_ids
            .iter()
            .filter(|id| !self.punnu.contains_unexpired(id))
            .cloned()
            .collect::<Vec<_>>();
        if missing.is_empty() {
            return;
        }
        let mut recovery = self
            .recovery
            .lock()
            .expect("delta refresh recovery lock poisoned");
        for id in missing {
            recovery.record_eviction(id);
        }
    }

    fn apply_membership_update(&self, update: MembershipUpdate<T::Id>) {
        let mut membership = self
            .membership
            .lock()
            .expect("delta refresh membership lock poisoned");
        if update.full_refresh {
            *membership = update.item_ids;
        } else {
            for id in update.recovered_ids {
                if !update.item_ids.contains(&id) && !update.tombstones.contains(&id) {
                    membership.remove(&id);
                }
            }
            for id in update.tombstones {
                membership.remove(&id);
            }
            membership.extend(update.item_ids);
        }
    }

    fn note_lru_eviction(&self, id: T::Id) {
        if !self.lru_warning_issued.swap(true, Ordering::AcqRel) {
            tracing::warn!(
                "Punnu LRU eviction observed while a delta refresh subscription is active; \
                 consider raising lru_size, enabling eviction recovery, or configuring periodic full refresh"
            );
        }

        let owned_by_subscription = self
            .membership
            .lock()
            .expect("delta refresh membership lock poisoned")
            .contains(&id);
        if owned_by_subscription {
            self.recovery
                .lock()
                .expect("delta refresh recovery lock poisoned")
                .record_eviction(id);
        }
    }

    fn note_event_lag(&self, skipped: u64) {
        tracing::warn!(
            skipped,
            "delta refresh subscription event stream lagged; forcing next recovery tick to full refresh"
        );
        self.force_full_generation.fetch_add(1, Ordering::AcqRel);
    }

    fn apply_delta_and_observed_watermark(
        &self,
        delta: DeltaResult<T, T::Watermark>,
        recovery_snapshot: &RecoverySnapshot<T::Id>,
    ) -> Result<(usize, Option<T::Watermark>), FetchError> {
        let next_watermark = delta.observed_watermark();
        let recovered_ids = recovery_snapshot.ids();
        let stats = self
            .punnu
            .apply_delta_recovering(delta.without_high_watermark(), &recovered_ids);
        if stats.backend_reserved_skips > 0 {
            return Err(FetchError::Serialization(
                "delta refresh deferred because a strict backend insert reserved one of its ids"
                    .to_owned(),
            ));
        }
        Ok((stats.applied_items, next_watermark))
    }

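    /// Watermarks only move forward: an equal or older observed watermark
    /// never regresses the stored cursor.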
    fn advance_watermark(&self, next_watermark: Option<T::Watermark>) -> Option<T::Watermark> {
        let mut stored = self
            .last_watermark
            .lock()
            .expect("delta refresh watermark lock poisoned");
        if let Some(next_watermark) = next_watermark {
            match &*stored {
                Some(current) if current >= &next_watermark => {}
                _ => *stored = Some(next_watermark),
            }
        }
        stored.clone()
    }

    fn finish_tick(self: Arc<Self>, kind: TickKind, operation_id: u64) {
        let mut slot = self
            .slot
            .lock()
            .expect("delta refresh in-flight slot lock poisoned");

        match (&mut *slot, kind) {
            (
                InFlightSlot::Delta {
                    operation_id: current_id,
                    pending_full,
                    ..
                },
                TickKind::Delta,
            ) if *current_id == operation_id => {
                if let Some(pending) = pending_full.take() {
                    *slot = InFlightSlot::Full {
                        operation_id: pending.operation_id,
                        shared: pending.shared,
                    };
                } else {
                    *slot = InFlightSlot::Empty;
                }
            }
            (
                InFlightSlot::Full {
                    operation_id: current_id,
                    ..
                },
                TickKind::Full,
            ) if *current_id == operation_id => {
                *slot = InFlightSlot::Empty;
            }
            _ => {}
        }
    }

    fn build_and_spawn_tick(
        self: &Arc<Self>,
        kind: TickKind,
        operation_id: u64,
    ) -> SharedUpdate<T> {
        let future = box_update_future(self.clone().run_owned_tick(kind, operation_id));
        let shared = future.shared();
        let driver = shared.clone();
        self.punnu.executor().spawn(box_spawn_future(async move {
            let _ = driver.await;
        }));
        shared
    }

    fn build_and_spawn_chained_full(
        self: &Arc<Self>,
        delta: SharedUpdate<T>,
        operation_id: u64,
    ) -> SharedUpdate<T> {
        let owner = self.clone();
        let future = box_update_future(async move {
            let _ = delta.await;
            owner.run_owned_tick(TickKind::Full, operation_id).await
        });
        let shared = future.shared();
        let driver = shared.clone();
        self.punnu.executor().spawn(box_spawn_future(async move {
            let _ = driver.await;
        }));
        shared
    }

    async fn run_owned_tick(self: Arc<Self>, kind: TickKind, operation_id: u64) -> SharedOutput<T> {
        let type_name = std::any::type_name::<T>();
        let result = AssertUnwindSafe(self.run_tick(kind)).catch_unwind().await;
        let output = match result {
            Ok(Ok(result)) => Ok(result),
            Ok(Err(err)) => Err(into_clone(err)),
            Err(panic_payload) => Err(FetchErrorClone::FetcherPanic {
                type_name,
                message: panic_message(&panic_payload),
            }),
        };
        self.finish_tick(kind, operation_id);
        output
    }

    fn next_operation_id(&self) -> u64 {
        self.next_operation_id.fetch_add(1, Ordering::Relaxed)
    }
}

struct TickRollbackGuard<'a, T: DeltaSyncCacheable> {
    subscription: &'a RefreshSubscription<T>,
    current_tick: u64,
    recovery_snapshot: Option<RecoverySnapshot<T::Id>>,
    membership_before_prime: Option<HashSet<T::Id>>,
    resolved: bool,
}

impl<'a, T: DeltaSyncCacheable> TickRollbackGuard<'a, T> {
    fn new(
        subscription: &'a RefreshSubscription<T>,
        current_tick: u64,
        recovery_snapshot: RecoverySnapshot<T::Id>,
    ) -> Self {
        Self {
            subscription,
            current_tick,
            recovery_snapshot: Some(recovery_snapshot),
            membership_before_prime: None,
            resolved: false,
        }
    }

    fn recovery_snapshot(&self) -> &RecoverySnapshot<T::Id> {
        self.recovery_snapshot
            .as_ref()
            .expect("delta tick recovery snapshot already resolved")
    }

    fn record_membership_snapshot(&mut self, snapshot: HashSet<T::Id>) {
        self.membership_before_prime = Some(snapshot);
    }

    fn restore_after_failed(&mut self) {
        if let Some(snapshot) = self.membership_before_prime.take() {
            self.subscription.restore_membership(snapshot);
        }
        if let Some(snapshot) = self.recovery_snapshot.take() {
            self.subscription
                .restore_recovery_after_failed(snapshot, self.current_tick);
        }
        self.resolved = true;
    }

    fn note_success(mut self) {
        let Some(snapshot) = self.recovery_snapshot.take() else {
            self.resolved = true;
            return;
        };
        self.subscription
            .recovery
            .lock()
            .expect("delta refresh recovery lock poisoned")
            .note_success(snapshot);
        self.resolved = true;
    }
}

impl<T: DeltaSyncCacheable> Drop for TickRollbackGuard<'_, T> {
    fn drop(&mut self) {
        if self.resolved {
            return;
        }

        if let Some(snapshot) = self.membership_before_prime.take() {
            match self.subscription.membership.lock() {
                Ok(mut membership) => *membership = snapshot,
                Err(_) => tracing::error!(
                    "delta refresh membership lock poisoned while rolling back failed tick"
                ),
            }
        }

        if let Some(snapshot) = self.recovery_snapshot.take() {
            match self.subscription.recovery.lock() {
                Ok(mut recovery) => {
                    recovery.restore_after_failed(snapshot, self.current_tick);
                }
                Err(_) => tracing::error!(
                    "delta refresh recovery lock poisoned while rolling back failed tick"
                ),
            }
        }
    }
}

async fn run_periodic_delta_refresh<T>(
    subscription: Arc<RefreshSubscription<T>>,
    interval: Duration,
    mut cancel: watch::Receiver<bool>,
) where
    T: DeltaSyncCacheable,
{
    loop {
        if refresh_cancelled(&cancel) {
            break;
        }

        let sleep = subscription.punnu.executor().sleep(interval);
        tokio::select! {
            biased;
            changed = cancel.changed() => {
                let _ = changed;
                break;
            }
            _ = sleep => {
                if refresh_cancelled(&cancel) {
                    break;
                }
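                // Race the tick against cancellation: dropping the handle stops
                // this loop promptly, while the spawned driver still runs any
                // already-registered fetch to completion.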
                tokio::select! {
                    biased;
                    changed = cancel.changed() => {
                        let _ = changed;
                        break;
                    }
                    result = run_periodic_tick(subscription.clone()) => {
                        if let Err(err) = result {
                            tracing::warn!(error = %err, "delta refresh failed");
                        }
                    }
                }
            }
        }
    }
}

async fn run_periodic_tick<T>(
    subscription: Arc<RefreshSubscription<T>>,
) -> Result<UpdateResult<T>, FetchError>
where
    T: DeltaSyncCacheable,
{
    if subscription.next_periodic_tick_is_full() {
        RefreshSubscription::update_full(subscription).await
    } else {
        let (kind, result) = RefreshSubscription::update_with_kind(subscription.clone()).await?;
        if matches!(kind, TickKind::Delta) {
            subscription.note_successful_scheduled_delta_tick();
        }
        Ok(result)
    }
}

async fn run_delta_recovery_event_listener<T>(
    subscription: Arc<RefreshSubscription<T>>,
    mut events: broadcast::Receiver<PunnuEvent<T>>,
    mut cancel: watch::Receiver<bool>,
) where
    T: DeltaSyncCacheable,
{
    loop {
        if refresh_cancelled(&cancel) {
            break;
        }

        tokio::select! {
            biased;
            changed = cancel.changed() => {
                let _ = changed;
                break;
            }
            event = events.recv() => {
                match event {
                    Ok(PunnuEvent::Invalidate { id, reason: EventReason::LruEvict }) => {
                        subscription.note_lru_eviction(id);
                    }
                    Ok(_) => {}
                    Err(broadcast::error::RecvError::Lagged(skipped)) => {
                        subscription.note_event_lag(skipped);
                    }
                    Err(broadcast::error::RecvError::Closed) => break,
                }
            }
        }
    }
}

fn refresh_cancelled(cancel: &watch::Receiver<bool>) -> bool {
    *cancel.borrow() || cancel.has_changed().is_err()
}

fn assert_active_tokio_runtime(_operation: &str) {
    #[cfg(all(feature = "runtime-tokio", not(target_arch = "wasm32")))]
    if tokio::runtime::Handle::try_current().is_err() {
        panic!("{_operation} requires an active Tokio runtime");
    }
}

fn panic_message(payload: &Box<dyn std::any::Any + Send>) -> String {
    if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else if let Some(s) = payload.downcast_ref::<&'static str>() {
        (*s).to_string()
    } else {
        String::new()
    }
}

#[cfg(not(target_arch = "wasm32"))]
fn box_update_future<T>(
    future: impl Future<Output = SharedOutput<T>> + Send + 'static,
) -> UpdateFuture<T>
where
    T: DeltaSyncCacheable,
{
    Box::pin(future)
}

#[cfg(target_arch = "wasm32")]
fn box_update_future<T>(future: impl Future<Output = SharedOutput<T>> + 'static) -> UpdateFuture<T>
where
    T: DeltaSyncCacheable,
{
    Box::pin(future)
}

#[cfg(not(target_arch = "wasm32"))]
fn box_spawn_future(future: impl Future<Output = ()> + Send + 'static) -> BoxFut<'static> {
    Box::pin(future)
}

#[cfg(target_arch = "wasm32")]
fn box_spawn_future(future: impl Future<Output = ()> + 'static) -> BoxFut<'static> {
    Box::pin(future)
}

#[cfg(all(test, feature = "runtime-tokio", not(target_arch = "wasm32")))]
mod tests {
    use super::*;
    use crate::Cacheable;
    use std::collections::HashSet;
    use tokio::sync::Notify;

    #[derive(Clone)]
    struct TestItem {
        id: i64,
        updated_at: i64,
    }

    impl Cacheable for TestItem {
        type Id = i64;
        type Fields = ();

        fn id(&self) -> Self::Id {
            self.id
        }

        fn fields() -> Self::Fields {}
    }

    impl DeltaSyncCacheable for TestItem {
        type Watermark = i64;

        fn watermark(&self) -> Self::Watermark {
            self.updated_at
        }
    }

    #[derive(Clone)]
    struct BlockingFetcher {
        started: Arc<Notify>,
        release: Arc<Notify>,
    }

    #[async_trait]
    impl DeltaPunnuFetcher<TestItem> for BlockingFetcher {
        async fn fetch_delta(
            &self,
            _query: DeltaQuery<TestItem>,
        ) -> Result<DeltaResult<TestItem, i64>, FetchError> {
            self.started.notify_one();
            self.release.notified().await;
            Ok(DeltaResult::new(
                vec![TestItem {
                    id: 1,
                    updated_at: 10,
                }],
                HashSet::new(),
            ))
        }
    }

    async fn wait_for_notification(notify: &Notify, context: &'static str) {
        tokio::time::timeout(Duration::from_secs(2), notify.notified())
            .await
            .expect(context);
    }

    #[tokio::test]
    async fn periodic_cancel_drops_awaiter_while_spawned_fetch_continues() {
        let punnu = Punnu::<TestItem>::builder().build();
        let started = Arc::new(Notify::new());
        let release = Arc::new(Notify::new());
        let handle = punnu.start_delta_refresh(
            Duration::from_millis(1),
            BlockingFetcher {
                started: started.clone(),
                release: release.clone(),
            },
        );
        let weak_subscription = Arc::downgrade(&handle.inner);

        wait_for_notification(&started, "periodic delta fetch should start").await;
        drop(handle);

        let cancel_observed = tokio::time::timeout(Duration::from_millis(200), async {
            loop {
                if weak_subscription.strong_count() <= 1 {
                    break;
                }
                tokio::task::yield_now().await;
            }
        })
        .await;
        release.notify_one();

        assert!(
            cancel_observed.is_ok(),
            "periodic loop should stop awaiting a blocked shared update when the handle is dropped"
        );
        tokio::time::timeout(Duration::from_secs(2), async {
            loop {
                if weak_subscription.strong_count() == 0 {
                    break;
                }
                tokio::task::yield_now().await;
            }
        })
        .await
        .expect("detached fetch driver should finish and release the subscription");
    }
}