frp-signal 0.1.0

Lifecycle and telemetry signal contracts for frp persistence flows.
Documentation
use std::sync::{
    Arc, RwLock, Weak,
    atomic::{AtomicBool, Ordering},
};

use crate::computed::SignalSubscribable;

// ---------------------------------------------------------------------------
// Internal state
// ---------------------------------------------------------------------------

struct SignalInner<T> {
    value: T,
    /// Weak refs so that dropped `Computed` handles are automatically pruned.
    subscribers: Vec<Weak<AtomicBool>>,
}

impl<T> SignalInner<T> {
    fn new(initial: T) -> Self {
        Self { value: initial, subscribers: Vec::new() }
    }

    /// Notify all live subscribers that they are dirty. Prunes dead `Weak` refs.
    fn notify_subscribers(&mut self) {
        self.subscribers.retain(|weak| {
            if let Some(flag) = weak.upgrade() {
                flag.store(true, Ordering::Release);
                true
            } else {
                false // dead reference — remove it
            }
        });
    }
}

// ---------------------------------------------------------------------------
// Signal<T>
// ---------------------------------------------------------------------------

/// A writable reactive cell. Setting a new value marks all registered
/// [`Computed`](crate::computed::Computed) dependents as dirty so they
/// re-evaluate lazily on their next `get()`.
///
/// `Signal<T>` is cheaply cloneable — all clones share the same underlying
/// value and subscriber list.
pub struct Signal<T: Clone + Send + Sync + 'static> {
    inner: Arc<RwLock<SignalInner<T>>>,
}

impl<T: Clone + Send + Sync + 'static> Signal<T> {
    /// Create a new `Signal` with the given initial value.
    pub fn new(initial: T) -> Self {
        Self { inner: Arc::new(RwLock::new(SignalInner::new(initial))) }
    }

    /// Read the current value. Acquires a read lock.
    pub fn get(&self) -> T {
        self.inner.read().expect("Signal RwLock poisoned").value.clone()
    }

    /// Write a new value and mark all live subscribers as dirty.
    /// Acquires a write lock.
    pub fn set(&self, value: T) {
        let mut guard = self.inner.write().expect("Signal RwLock poisoned");
        guard.value = value;
        guard.notify_subscribers();
    }

    /// Register a dirty flag. When this `Signal` is written, the flag will be
    /// set to `true` (ordering: `Release`). The caller owns the `Arc`; this
    /// `Signal` only holds a `Weak` reference, so dropping the `Arc` silently
    /// unregisters the subscriber.
    pub fn subscribe(&self) -> Arc<AtomicBool> {
        let flag = Arc::new(AtomicBool::new(false));
        self.inner
            .write()
            .expect("Signal RwLock poisoned")
            .subscribers
            .push(Arc::downgrade(&flag));
        flag
    }
    /// Register a pre-existing dirty flag as a subscriber. When this `Signal`
    /// is written, `flag` will be set to `true`. Used internally by
    /// [`Computed::track`] and [`Computed::track_any`].
    pub(crate) fn subscribe_with_flag(&self, flag: Arc<AtomicBool>) {
        self.inner
            .write()
            .expect("Signal RwLock poisoned")
            .subscribers
            .push(Arc::downgrade(&flag));
    }
}

impl<T: Clone + Send + Sync + 'static> Clone for Signal<T> {
    fn clone(&self) -> Self {
        Self { inner: Arc::clone(&self.inner) }
    }
}

impl<T: Clone + Send + Sync + 'static> SignalSubscribable for Signal<T> {
    fn subscribe_with_flag(&self, flag: Arc<AtomicBool>) {
        Signal::subscribe_with_flag(self, flag);
    }
}

impl<T: Clone + Send + Sync + std::fmt::Debug + 'static> std::fmt::Debug for Signal<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.inner.try_read() {
            Ok(guard) => write!(f, "Signal({:?})", guard.value),
            Err(_) => write!(f, "Signal(<locked>)"),
        }
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn get_returns_initial_value() {
        let s = Signal::new(42i32);
        assert_eq!(s.get(), 42);
    }

    #[test]
    fn set_updates_value() {
        let s = Signal::new(0i32);
        s.set(99);
        assert_eq!(s.get(), 99);
    }

    #[test]
    fn subscribe_flag_starts_clean() {
        let s = Signal::new(0i32);
        let flag = s.subscribe();
        assert!(!flag.load(Ordering::Acquire));
    }

    #[test]
    fn set_marks_subscriber_dirty() {
        let s = Signal::new(0i32);
        let flag = s.subscribe();
        s.set(1);
        assert!(flag.load(Ordering::Acquire));
    }

    #[test]
    fn dropped_subscriber_does_not_prevent_set() {
        let s = Signal::new(0i32);
        {
            let _flag = s.subscribe(); // dropped at end of this block
        }
        // Should not panic — weak ref is pruned on next set
        s.set(1);
        assert_eq!(s.get(), 1);
    }

    #[test]
    fn clone_shares_value() {
        let s1 = Signal::new("hello");
        let s2 = s1.clone();
        s1.set("world");
        assert_eq!(s2.get(), "world");
    }
}