observable_property/
lib.rs

//! # Observable Property
//!
//! A thread-safe observable property implementation for Rust that allows you to
//! observe changes to values across multiple threads.
//!
//! ## Features
//!
//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
//! - **Observer pattern**: Subscribe to property changes with callbacks
//! - **Filtered observers**: Only notify when specific conditions are met
//! - **Async notifications**: Non-blocking observer notifications with background threads
//! - **Panic isolation**: Observer panics don't crash the system
//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync + 'static` type
//!
//! ## Quick Start
//!
//! ```rust
//! use observable_property::ObservableProperty;
//! use std::sync::Arc;
//!
//! // Create an observable property
//! let property = ObservableProperty::new(42);
//!
//! // Subscribe to changes
//! let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
//!     println!("Value changed from {} to {}", old_value, new_value);
//! })).unwrap();
//!
//! // Change the value (triggers observer)
//! property.set(100).unwrap();
//!
//! // Unsubscribe when done
//! property.unsubscribe(observer_id).unwrap();
//! ```
//!
//! ## Multi-threading Example
//!
//! ```rust
//! use observable_property::ObservableProperty;
//! use std::sync::Arc;
//! use std::thread;
//!
//! let property = Arc::new(ObservableProperty::new(0));
//! let property_clone = property.clone();
//!
//! // Subscribe from one thread
//! property.subscribe(Arc::new(|old, new| {
//!     println!("Value changed: {} -> {}", old, new);
//! })).unwrap();
//!
//! // Modify from another thread
//! thread::spawn(move || {
//!     property_clone.set(42).unwrap();
//! }).join().unwrap();
//! ```
56
57use std::collections::HashMap;
58use std::fmt;
59use std::panic;
60use std::sync::{Arc, RwLock};
61use std::thread;
62
/// Errors that can occur when working with `ObservableProperty`.
///
/// The type is `Clone` so an error can be stored or forwarded, and
/// `PartialEq`/`Eq` so callers and tests can compare errors directly
/// (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PropertyError {
    /// Failed to acquire a read lock on the property
    ReadLockError {
        /// Context describing what operation was being attempted
        context: String,
    },
    /// Failed to acquire a write lock on the property
    WriteLockError {
        /// Context describing what operation was being attempted
        context: String,
    },
    /// Attempted to unsubscribe an observer that doesn't exist
    ObserverNotFound {
        /// The ID of the observer that wasn't found
        id: usize,
    },
    /// The property's lock has been poisoned due to a panic in another thread
    PoisonedLock,
    /// An observer function encountered an error during execution
    ObserverError {
        /// Description of what went wrong
        reason: String,
    },
}
89
90impl fmt::Display for PropertyError {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        match self {
93            PropertyError::ReadLockError { context } => {
94                write!(f, "Failed to acquire read lock: {}", context)
95            }
96            PropertyError::WriteLockError { context } => {
97                write!(f, "Failed to acquire write lock: {}", context)
98            }
99            PropertyError::ObserverNotFound { id } => {
100                write!(f, "Observer with ID {} not found", id)
101            }
102            PropertyError::PoisonedLock => {
103                write!(
104                    f,
105                    "Property is in a poisoned state due to a panic in another thread"
106                )
107            }
108            PropertyError::ObserverError { reason } => {
109                write!(f, "Observer execution failed: {}", reason)
110            }
111        }
112    }
113}
114
115impl std::error::Error for PropertyError {}
116
/// Shared callback invoked with `(old_value, new_value)` when a property changes.
///
/// The callback is reference-counted (`Arc`) so it can be cloned cheaply, and it
/// must be `Send + Sync` so it can be called from any thread.
pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
119
/// Handle identifying one registered observer; pass it back to unsubscribe.
pub type ObserverId = usize;
122
123/// A thread-safe observable property that notifies observers when its value changes
124///
125/// This type wraps a value of type `T` and allows multiple observers to be notified
126/// whenever the value is modified. All operations are thread-safe and can be called
127/// from multiple threads concurrently.
128///
129/// # Type Requirements
130///
131/// The generic type `T` must implement:
132/// - `Clone`: Required for returning values and passing them to observers
133/// - `Send`: Required for transferring between threads
134/// - `Sync`: Required for concurrent access from multiple threads  
135/// - `'static`: Required for observer callbacks that may outlive the original scope
136///
137/// # Examples
138///
139/// ```rust
140/// use observable_property::ObservableProperty;
141/// use std::sync::Arc;
142///
143/// let property = ObservableProperty::new("initial".to_string());
144///
145/// let observer_id = property.subscribe(Arc::new(|old, new| {
146///     println!("Changed from '{}' to '{}'", old, new);
147/// })).unwrap();
148///
149/// property.set("updated".to_string()).unwrap(); // Prints: Changed from 'initial' to 'updated'
150/// property.unsubscribe(observer_id).unwrap();
151/// ```
152pub struct ObservableProperty<T> {
153    inner: Arc<RwLock<InnerProperty<T>>>,
154}
155
156struct InnerProperty<T> {
157    value: T,
158    observers: HashMap<ObserverId, Observer<T>>,
159    next_id: ObserverId,
160}
161
162impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
163    /// Creates a new observable property with the given initial value
164    ///
165    /// # Arguments
166    ///
167    /// * `initial_value` - The starting value for this property
168    ///
169    /// # Examples
170    ///
171    /// ```rust
172    /// use observable_property::ObservableProperty;
173    ///
174    /// let property = ObservableProperty::new(42);
175    /// assert_eq!(property.get().unwrap(), 42);
176    /// ```
177    pub fn new(initial_value: T) -> Self {
178        Self {
179            inner: Arc::new(RwLock::new(InnerProperty {
180                value: initial_value,
181                observers: HashMap::new(),
182                next_id: 0,
183            })),
184        }
185    }
186
187    /// Gets the current value of the property
188    ///
189    /// This method acquires a read lock, which allows multiple concurrent readers
190    /// but will block if a writer currently holds the lock.
191    ///
192    /// # Returns
193    ///
194    /// `Ok(T)` containing a clone of the current value, or `Err(PropertyError)`
195    /// if the lock is poisoned.
196    ///
197    /// # Examples
198    ///
199    /// ```rust
200    /// use observable_property::ObservableProperty;
201    ///
202    /// let property = ObservableProperty::new("hello".to_string());
203    /// assert_eq!(property.get().unwrap(), "hello");
204    /// ```
205    pub fn get(&self) -> Result<T, PropertyError> {
206        self.inner
207            .read()
208            .map(|prop| prop.value.clone())
209            .map_err(|_| PropertyError::PoisonedLock)
210    }
211
212    /// Sets the property to a new value and notifies all observers
213    ///
214    /// This method will:
215    /// 1. Acquire a write lock (blocking other readers/writers)
216    /// 2. Update the value and capture a snapshot of observers
217    /// 3. Release the lock
218    /// 4. Notify all observers sequentially with the old and new values
219    ///
220    /// Observer notifications are wrapped in panic recovery to prevent one
221    /// misbehaving observer from affecting others.
222    ///
223    /// # Arguments
224    ///
225    /// * `new_value` - The new value to set
226    ///
227    /// # Returns
228    ///
229    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
230    ///
231    /// # Examples
232    ///
233    /// ```rust
234    /// use observable_property::ObservableProperty;
235    /// use std::sync::Arc;
236    ///
237    /// let property = ObservableProperty::new(10);
238    /// 
239    /// property.subscribe(Arc::new(|old, new| {
240    ///     println!("Value changed from {} to {}", old, new);
241    /// })).unwrap();
242    ///
243    /// property.set(20).unwrap(); // Triggers observer notification
244    /// ```
245    pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
246        let (old_value, observers_snapshot) = {
247            let mut prop = self
248                .inner
249                .write()
250                .map_err(|_| PropertyError::WriteLockError {
251                    context: "setting property value".to_string(),
252                })?;
253
254            let old_value = prop.value.clone();
255            prop.value = new_value.clone();
256            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
257            (old_value, observers_snapshot)
258        };
259
260        for observer in observers_snapshot {
261            if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
262                observer(&old_value, &new_value);
263            })) {
264                eprintln!("Observer panic: {:?}", e);
265            }
266        }
267
268        Ok(())
269    }
270
271    /// Sets the property to a new value and notifies observers asynchronously
272    ///
273    /// This method is similar to `set()` but spawns observers in background threads
274    /// for non-blocking operation. This is useful when observers might perform
275    /// time-consuming operations.
276    ///
277    /// Observers are batched into groups and each batch runs in its own thread
278    /// to limit resource usage while still providing parallelism.
279    ///
280    /// # Arguments
281    ///
282    /// * `new_value` - The new value to set
283    ///
284    /// # Returns
285    ///
286    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
287    /// Note that this only indicates the property was updated successfully;
288    /// observer execution happens asynchronously.
289    ///
290    /// # Examples
291    ///
292    /// ```rust
293    /// use observable_property::ObservableProperty;
294    /// use std::sync::Arc;
295    /// use std::time::Duration;
296    ///
297    /// let property = ObservableProperty::new(0);
298    /// 
299    /// property.subscribe(Arc::new(|old, new| {
300    ///     // This observer does slow work but won't block the caller
301    ///     std::thread::sleep(Duration::from_millis(100));
302    ///     println!("Slow observer: {} -> {}", old, new);
303    /// })).unwrap();
304    ///
305    /// // This returns immediately even though observer is slow
306    /// property.set_async(42).unwrap();
307    /// ```
308    pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
309        let (old_value, observers_snapshot) = {
310            let mut prop = self
311                .inner
312                .write()
313                .map_err(|_| PropertyError::WriteLockError {
314                    context: "setting property value".to_string(),
315                })?;
316
317            let old_value = prop.value.clone();
318            prop.value = new_value.clone();
319            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
320            (old_value, observers_snapshot)
321        };
322
323        if observers_snapshot.is_empty() {
324            return Ok(());
325        }
326
327        const MAX_THREADS: usize = 4;
328        let observers_per_thread = (observers_snapshot.len() + MAX_THREADS - 1) / MAX_THREADS;
329
330        for batch in observers_snapshot.chunks(observers_per_thread) {
331            let batch_observers = batch.to_vec();
332            let old_val = old_value.clone();
333            let new_val = new_value.clone();
334
335            thread::spawn(move || {
336                for observer in batch_observers {
337                    if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
338                        observer(&old_val, &new_val);
339                    })) {
340                        eprintln!("Observer panic in batch thread: {:?}", e);
341                    }
342                }
343            });
344        }
345
346        Ok(())
347    }
348
349    /// Subscribes an observer function to be called when the property changes
350    ///
351    /// The observer function will be called with the old and new values whenever
352    /// the property is modified via `set()` or `set_async()`.
353    ///
354    /// # Arguments
355    ///
356    /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
357    ///
358    /// # Returns
359    ///
360    /// `Ok(ObserverId)` containing a unique identifier for this observer,
361    /// or `Err(PropertyError)` if the lock is poisoned.
362    ///
363    /// # Examples
364    ///
365    /// ```rust
366    /// use observable_property::ObservableProperty;
367    /// use std::sync::Arc;
368    ///
369    /// let property = ObservableProperty::new(0);
370    ///
371    /// let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
372    ///     println!("Property changed from {} to {}", old_value, new_value);
373    /// })).unwrap();
374    ///
375    /// // Later, unsubscribe using the returned ID
376    /// property.unsubscribe(observer_id).unwrap();
377    /// ```
378    pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
379        let mut prop = self
380            .inner
381            .write()
382            .map_err(|_| PropertyError::WriteLockError {
383                context: "subscribing observer".to_string(),
384            })?;
385
386        let id = prop.next_id;
387        prop.next_id += 1;
388        prop.observers.insert(id, observer);
389        Ok(id)
390    }
391
392    /// Removes an observer by its ID
393    ///
394    /// # Arguments
395    ///
396    /// * `id` - The observer ID returned by `subscribe()`
397    ///
398    /// # Returns
399    ///
400    /// `Ok(bool)` where `true` means the observer was found and removed,
401    /// `false` means no observer with that ID existed.
402    /// Returns `Err(PropertyError)` if the lock is poisoned.
403    ///
404    /// # Examples
405    ///
406    /// ```rust
407    /// use observable_property::ObservableProperty;
408    /// use std::sync::Arc;
409    ///
410    /// let property = ObservableProperty::new(0);
411    /// let id = property.subscribe(Arc::new(|_, _| {})).unwrap();
412    ///
413    /// let was_removed = property.unsubscribe(id).unwrap();
414    /// assert!(was_removed); // Observer existed and was removed
415    ///
416    /// let was_removed_again = property.unsubscribe(id).unwrap();
417    /// assert!(!was_removed_again); // Observer no longer exists
418    /// ```
419    pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
420        let mut prop = self
421            .inner
422            .write()
423            .map_err(|_| PropertyError::WriteLockError {
424                context: "unsubscribing observer".to_string(),
425            })?;
426
427        let was_present = prop.observers.remove(&id).is_some();
428        Ok(was_present)
429    }
430
431    /// Subscribes an observer that only gets called when a filter condition is met
432    ///
433    /// This is useful for observing only specific types of changes, such as
434    /// when a value increases or crosses a threshold.
435    ///
436    /// # Arguments
437    ///
438    /// * `observer` - The observer function to call when the filter passes
439    /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
440    ///
441    /// # Returns
442    ///
443    /// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError)` if the lock is poisoned.
444    ///
445    /// # Examples
446    ///
447    /// ```rust
448    /// use observable_property::ObservableProperty;
449    /// use std::sync::Arc;
450    ///
451    /// let property = ObservableProperty::new(0);
452    ///
453    /// // Only notify when value increases
454    /// let id = property.subscribe_filtered(
455    ///     Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
456    ///     |old, new| new > old
457    /// ).unwrap();
458    ///
459    /// property.set(10).unwrap(); // Triggers observer (0 -> 10)
460    /// property.set(5).unwrap();  // Does NOT trigger observer (10 -> 5)
461    /// property.set(15).unwrap(); // Triggers observer (5 -> 15)
462    /// ```
463    pub fn subscribe_filtered<F>(
464        &self,
465        observer: Observer<T>,
466        filter: F,
467    ) -> Result<ObserverId, PropertyError>
468    where
469        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
470    {
471        let filter = Arc::new(filter);
472        let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
473            if filter(old_val, new_val) {
474                observer(old_val, new_val);
475            }
476        });
477
478        self.subscribe(filtered_observer)
479    }
480}
481
482impl<T: Clone> Clone for ObservableProperty<T> {
483    /// Creates a new reference to the same observable property
484    ///
485    /// This creates a new `ObservableProperty` instance that shares the same
486    /// underlying data with the original. Changes made through either instance
487    /// will be visible to observers subscribed through both instances.
488    ///
489    /// # Examples
490    ///
491    /// ```rust
492    /// use observable_property::ObservableProperty;
493    /// use std::sync::Arc;
494    ///
495    /// let property1 = ObservableProperty::new(42);
496    /// let property2 = property1.clone();
497    ///
498    /// property2.subscribe(Arc::new(|old, new| {
499    ///     println!("Observer on property2 saw change: {} -> {}", old, new);
500    /// })).unwrap();
501    ///
502    /// // This change through property1 will trigger the observer on property2
503    /// property1.set(100).unwrap();
504    /// ```
505    fn clone(&self) -> Self {
506        Self {
507            inner: Arc::clone(&self.inner),
508        }
509    }
510}
511
512impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
513    /// Debug implementation that shows the current value if accessible
514    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
515        match self.get() {
516            Ok(value) => f.debug_struct("ObservableProperty")
517                .field("value", &value)
518                .field("observers_count", &"[hidden]")
519                .finish(),
520            Err(_) => f.debug_struct("ObservableProperty")
521                .field("value", &"[inaccessible]")
522                .field("observers_count", &"[hidden]")
523                .finish(),
524        }
525    }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};

    /// A new property returns its initial value, and `set` replaces it.
    #[test]
    fn test_property_creation_and_basic_operations() {
        let prop = ObservableProperty::new(42);
        assert_eq!(prop.get().expect("Failed to get initial value"), 42);

        prop.set(100).expect("Failed to set value");
        assert_eq!(prop.get().expect("Failed to get updated value"), 100);
    }

    /// A subscribed observer sees `(old, new)` once per change and is silent
    /// after being unsubscribed.
    #[test]
    fn test_observer_subscription_and_notification() {
        let prop = ObservableProperty::new("initial".to_string());
        let notification_count = Arc::new(AtomicUsize::new(0));
        let last_old_value = Arc::new(RwLock::new(String::new()));
        let last_new_value = Arc::new(RwLock::new(String::new()));

        let count_clone = Arc::clone(&notification_count);
        let old_clone = Arc::clone(&last_old_value);
        let new_clone = Arc::clone(&last_new_value);

        let observer_id = prop
            .subscribe(Arc::new(move |old: &String, new: &String| {
                count_clone.fetch_add(1, Ordering::SeqCst);
                if let Ok(mut old_val) = old_clone.write() {
                    *old_val = old.clone();
                }
                if let Ok(mut new_val) = new_clone.write() {
                    *new_val = new.clone();
                }
            }))
            .expect("Failed to subscribe observer");

        // Change value and verify notification
        prop.set("changed".to_string())
            .expect("Failed to set property value");

        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
        assert_eq!(
            *last_old_value.read().expect("Failed to read old value"),
            "initial"
        );
        assert_eq!(
            *last_new_value.read().expect("Failed to read new value"),
            "changed"
        );

        // Test unsubscription
        assert!(prop
            .unsubscribe(observer_id)
            .expect("Failed to unsubscribe observer"));

        // Change value again - should not notify
        prop.set("not_notified".to_string())
            .expect("Failed to set property value after unsubscribe");
        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
    }

    /// A filtered observer fires only for changes accepted by its predicate.
    #[test]
    fn test_filtered_observer() {
        let prop = ObservableProperty::new(0i32);
        let notification_count = Arc::new(AtomicUsize::new(0));
        let count_clone = Arc::clone(&notification_count);

        // Observer only triggered when value increases
        let observer_id = prop
            .subscribe_filtered(
                Arc::new(move |_: &i32, _: &i32| {
                    count_clone.fetch_add(1, Ordering::SeqCst);
                }),
                |old, new| new > old,
            )
            .expect("Failed to subscribe filtered observer");

        // Should trigger (0 -> 5)
        prop.set(5).expect("Failed to set property value to 5");
        assert_eq!(notification_count.load(Ordering::SeqCst), 1);

        // Should NOT trigger (5 -> 3)
        prop.set(3).expect("Failed to set property value to 3");
        assert_eq!(notification_count.load(Ordering::SeqCst), 1);

        // Should trigger (3 -> 10)
        prop.set(10).expect("Failed to set property value to 10");
        assert_eq!(notification_count.load(Ordering::SeqCst), 2);

        prop.unsubscribe(observer_id)
            .expect("Failed to unsubscribe filtered observer");
    }

    /// Many threads can read the same property concurrently.
    #[test]
    fn test_thread_safety_concurrent_reads() {
        let prop = Arc::new(ObservableProperty::new(42i32));
        let num_threads = 10;
        let reads_per_thread = 100;

        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                let prop_clone = Arc::clone(&prop);
                thread::spawn(move || {
                    for _ in 0..reads_per_thread {
                        let value = prop_clone.get().expect("Failed to read property value");
                        assert_eq!(value, 42);
                        thread::sleep(Duration::from_millis(1));
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("Thread failed to complete");
        }
    }

    /// `set_async` returns without waiting for a slow observer, while `set`
    /// blocks until the observer has finished.
    #[test]
    fn test_async_set_performance() {
        const OBSERVER_DELAY: Duration = Duration::from_millis(50);

        let prop = ObservableProperty::new(0i32);
        let slow_observer_count = Arc::new(AtomicUsize::new(0));
        let count_clone = Arc::clone(&slow_observer_count);

        // Add observer that simulates slow work
        let _id = prop
            .subscribe(Arc::new(move |_: &i32, _: &i32| {
                thread::sleep(OBSERVER_DELAY);
                count_clone.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to subscribe slow observer");

        // Synchronous set runs the observer inline, so it takes at least the delay.
        let start = Instant::now();
        prop.set(1)
            .expect("Failed to set property value synchronously");
        let sync_duration = start.elapsed();
        assert!(sync_duration >= OBSERVER_DELAY);

        // Asynchronous set only spawns the notification thread.
        let start = Instant::now();
        prop.set_async(2)
            .expect("Failed to set property value asynchronously");
        let async_duration = start.elapsed();

        // Compare against the measured sync time rather than a fixed
        // wall-clock bound, which fails spuriously on loaded machines.
        assert!(async_duration < sync_duration);

        // Poll with a generous deadline instead of a fixed sleep so the test
        // neither races the background thread nor waits longer than needed.
        let deadline = Instant::now() + Duration::from_secs(5);
        while slow_observer_count.load(Ordering::SeqCst) < 2 && Instant::now() < deadline {
            thread::sleep(Duration::from_millis(5));
        }

        // The observer must have run once per set call.
        assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
    }
}
715}