// hyphae 0.6.3
//
// Reactive cells and runtime primitives for rship
// Documentation
use std::{
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
    thread,
    time::Duration,
};

use super::{CellValue, Watchable};
use crate::{
    cell::{Cell, CellImmutable, CellMutable},
    signal::Signal,
};

pub trait DebounceExt<T>: Watchable<T> {
    #[track_caller]
    fn debounce(&self, duration: Duration) -> Cell<T, CellImmutable>
    where
        T: CellValue,
        Self: Clone + Send + Sync + 'static,
    {
        let cell = Cell::<T, CellMutable>::new(self.get());
        let cell = if let Some(name) = self.name() {
            cell.with_name(format!("{}::debounce", name))
        } else {
            cell
        };

        let generation = Arc::new(AtomicU64::new(0));
        let weak = cell.downgrade();
        let guard = self.subscribe(move |signal| {
            match signal {
                Signal::Value(value) => {
                    let my_gen = generation.fetch_add(1, Ordering::SeqCst) + 1;
                    let value = value.clone(); // Arc clone
                    let weak = weak.clone();
                    let generation = generation.clone();

                    thread::spawn(move || {
                        thread::sleep(duration);
                        if generation.load(Ordering::SeqCst) == my_gen
                            && let Some(c) = weak.upgrade()
                        {
                            c.notify(Signal::value_arc(value));
                        }
                    });
                }
                Signal::Complete => {
                    if let Some(c) = weak.upgrade() {
                        c.notify(Signal::Complete);
                    }
                }
                Signal::Error(e) => {
                    if let Some(c) = weak.upgrade() {
                        c.notify(Signal::Error(e.clone()));
                    }
                }
            }
        });
        cell.own(guard);

        cell.lock()
    }
}

/// Blanket implementation: every type that implements [`Watchable<T>`] gets
/// [`DebounceExt<T>`] with its default method.
impl<T, W> DebounceExt<T> for W where W: Watchable<T> {}

#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;

    use super::*;
    use crate::Mutable;

    /// A burst of writes must produce nothing until the source has been quiet
    /// for the debounce window, and then only the last written value.
    #[test]
    fn test_debounce_waits_for_pause() {
        let window = Duration::from_millis(50);

        let source = Cell::new(0u64);
        let debounced = source.debounce(window);

        // Records the most recent value observed on the debounced cell.
        let last_seen = Arc::new(AtomicU64::new(0));
        let sink = Arc::clone(&last_seen);
        let _guard = debounced.subscribe(move |signal| {
            if let Signal::Value(v) = signal {
                sink.store(**v, Ordering::SeqCst);
            }
        });

        // Three back-to-back writes, all well inside one window.
        for next in 1..=3u64 {
            source.set(next);
        }

        // Well before the window elapses, nothing new may have come through.
        thread::sleep(Duration::from_millis(10));
        assert_eq!(last_seen.load(Ordering::SeqCst), 0);

        // After the window has passed, only the final write is visible.
        thread::sleep(Duration::from_millis(100));
        assert_eq!(last_seen.load(Ordering::SeqCst), 3);
    }
}