observable_property/
lib.rs

1//! # Observable Property
2//!
3//! A thread-safe observable property implementation for Rust that allows you to
4//! observe changes to values across multiple threads.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **Filtered observers**: Only notify when specific conditions are met
11//! - **Async notifications**: Non-blocking observer notifications with background threads
12//! - **Panic isolation**: Observer panics don't crash the system
13//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync` type
14//!
15//! ## Quick Start
16//!
17//! ```rust
18//! use observable_property::ObservableProperty;
19//! use std::sync::Arc;
20//!
21//! // Create an observable property
22//! let property = ObservableProperty::new(42);
23//!
24//! // Subscribe to changes
25//! let observer_id = match property.subscribe(Arc::new(|old_value, new_value| {
26//!     println!("Value changed from {} to {}", old_value, new_value);
27//! })) {
28//!     Ok(id) => id,
29//!     Err(e) => {
30//!         eprintln!("Failed to subscribe observer: {}", e);
31//!         return; // or handle error appropriately
32//!     }
33//! };
34//!
35//! // Change the value (triggers observer)
36//! if let Err(e) = property.set(100) {
37//!     eprintln!("Failed to set property value: {}", e);
38//!     return; // or handle error appropriately
39//! }
40//!
41//! // Unsubscribe when done
42//! match property.unsubscribe(observer_id) {
43//!     Ok(_) => println!("Observer unsubscribed successfully"),
44//!     Err(e) => eprintln!("Failed to unsubscribe observer: {}", e),
45//! }
46//! ```
47//!
48//! ## Multi-threading Example
49//!
50//! ```rust
51//! use observable_property::ObservableProperty;
52//! use std::sync::Arc;
53//! use std::thread;
54//!
55//! let property = Arc::new(ObservableProperty::new(0));
56//! let property_clone = property.clone();
57//!
58//! // Subscribe from one thread
59//! match property.subscribe(Arc::new(|old, new| {
60//!     println!("Value changed: {} -> {}", old, new);
61//! })) {
62//!     Ok(_) => println!("Observer subscribed successfully"),
63//!     Err(e) => {
64//!         eprintln!("Failed to subscribe observer: {}", e);
65//!         return; // or handle error appropriately
66//!     }
67//! };
68//!
69//! // Modify from another thread
70//! match thread::spawn(move || {
71//!     if let Err(e) = property_clone.set(42) {
72//!         eprintln!("Failed to set property value: {}", e);
73//!     }
74//! }).join() {
75//!     Ok(_) => println!("Thread completed successfully"),
76//!     Err(e) => eprintln!("Thread panicked: {:?}", e),
77//! };
78//! ```
79
80use std::collections::HashMap;
81use std::fmt;
82use std::panic;
83use std::sync::{Arc, RwLock};
84use std::thread;
85
86/// Errors that can occur when working with ObservableProperty
87#[derive(Debug, Clone)]
88pub enum PropertyError {
89    /// Failed to acquire a read lock on the property
90    ReadLockError { 
91        /// Context describing what operation was being attempted
92        context: String 
93    },
94    /// Failed to acquire a write lock on the property  
95    WriteLockError { 
96        /// Context describing what operation was being attempted
97        context: String 
98    },
99    /// Attempted to unsubscribe an observer that doesn't exist
100    ObserverNotFound { 
101        /// The ID of the observer that wasn't found
102        id: usize 
103    },
104    /// The property's lock has been poisoned due to a panic in another thread
105    PoisonedLock,
106    /// An observer function encountered an error during execution
107    ObserverError { 
108        /// Description of what went wrong
109        reason: String 
110    },
111}
112
113impl fmt::Display for PropertyError {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        match self {
116            PropertyError::ReadLockError { context } => {
117                write!(f, "Failed to acquire read lock: {}", context)
118            }
119            PropertyError::WriteLockError { context } => {
120                write!(f, "Failed to acquire write lock: {}", context)
121            }
122            PropertyError::ObserverNotFound { id } => {
123                write!(f, "Observer with ID {} not found", id)
124            }
125            PropertyError::PoisonedLock => {
126                write!(
127                    f,
128                    "Property is in a poisoned state due to a panic in another thread"
129                )
130            }
131            PropertyError::ObserverError { reason } => {
132                write!(f, "Observer execution failed: {}", reason)
133            }
134        }
135    }
136}
137
138impl std::error::Error for PropertyError {}
139
140/// Function type for observers that get called when property values change
141pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
142
143/// Unique identifier for registered observers
144pub type ObserverId = usize;
145
146/// A thread-safe observable property that notifies observers when its value changes
147///
148/// This type wraps a value of type `T` and allows multiple observers to be notified
149/// whenever the value is modified. All operations are thread-safe and can be called
150/// from multiple threads concurrently.
151///
152/// # Type Requirements
153///
154/// The generic type `T` must implement:
155/// - `Clone`: Required for returning values and passing them to observers
156/// - `Send`: Required for transferring between threads
157/// - `Sync`: Required for concurrent access from multiple threads  
158/// - `'static`: Required for observer callbacks that may outlive the original scope
159///
160/// # Examples
161///
162/// ```rust
163/// use observable_property::ObservableProperty;
164/// use std::sync::Arc;
165///
166/// let property = ObservableProperty::new("initial".to_string());
167///
168/// let observer_id = match property.subscribe(Arc::new(|old, new| {
169///     println!("Changed from '{}' to '{}'", old, new);
170/// })) {
171///     Ok(id) => id,
172///     Err(e) => {
173///         eprintln!("Failed to subscribe observer: {}", e);
174///         return; // or handle error appropriately
175///     }
176/// };
177///
178/// if let Err(e) = property.set("updated".to_string()) {
179///     eprintln!("Failed to set property value: {}", e);
180/// } // Prints: Changed from 'initial' to 'updated'
181/// 
182/// if let Err(e) = property.unsubscribe(observer_id) {
183///     eprintln!("Failed to unsubscribe observer: {}", e);
184/// }
185/// ```
186pub struct ObservableProperty<T> {
187    inner: Arc<RwLock<InnerProperty<T>>>,
188}
189
190struct InnerProperty<T> {
191    value: T,
192    observers: HashMap<ObserverId, Observer<T>>,
193    next_id: ObserverId,
194}
195
196impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
197    /// Creates a new observable property with the given initial value
198    ///
199    /// # Arguments
200    ///
201    /// * `initial_value` - The starting value for this property
202    ///
203    /// # Examples
204    ///
205    /// ```rust
206    /// use observable_property::ObservableProperty;
207    ///
208    /// let property = ObservableProperty::new(42);
209    /// match property.get() {
210    ///     Ok(value) => assert_eq!(value, 42),
211    ///     Err(e) => eprintln!("Failed to get property value: {}", e),
212    /// }
213    /// ```
214    pub fn new(initial_value: T) -> Self {
215        Self {
216            inner: Arc::new(RwLock::new(InnerProperty {
217                value: initial_value,
218                observers: HashMap::new(),
219                next_id: 0,
220            })),
221        }
222    }
223
224    /// Gets the current value of the property
225    ///
226    /// This method acquires a read lock, which allows multiple concurrent readers
227    /// but will block if a writer currently holds the lock.
228    ///
229    /// # Returns
230    ///
231    /// `Ok(T)` containing a clone of the current value, or `Err(PropertyError)`
232    /// if the lock is poisoned.
233    ///
234    /// # Examples
235    ///
236    /// ```rust
237    /// use observable_property::ObservableProperty;
238    ///
239    /// let property = ObservableProperty::new("hello".to_string());
240    /// match property.get() {
241    ///     Ok(value) => assert_eq!(value, "hello"),
242    ///     Err(e) => eprintln!("Failed to get property value: {}", e),
243    /// }
244    /// ```
245    pub fn get(&self) -> Result<T, PropertyError> {
246        self.inner
247            .read()
248            .map(|prop| prop.value.clone())
249            .map_err(|_| PropertyError::PoisonedLock)
250    }
251
252    /// Sets the property to a new value and notifies all observers
253    ///
254    /// This method will:
255    /// 1. Acquire a write lock (blocking other readers/writers)
256    /// 2. Update the value and capture a snapshot of observers
257    /// 3. Release the lock
258    /// 4. Notify all observers sequentially with the old and new values
259    ///
260    /// Observer notifications are wrapped in panic recovery to prevent one
261    /// misbehaving observer from affecting others.
262    ///
263    /// # Arguments
264    ///
265    /// * `new_value` - The new value to set
266    ///
267    /// # Returns
268    ///
269    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
270    ///
271    /// # Examples
272    ///
273    /// ```rust
274    /// use observable_property::ObservableProperty;
275    /// use std::sync::Arc;
276    ///
277    /// let property = ObservableProperty::new(10);
278    /// 
279    /// match property.subscribe(Arc::new(|old, new| {
280    ///     println!("Value changed from {} to {}", old, new);
281    /// })) {
282    ///     Ok(_) => println!("Observer subscribed successfully"),
283    ///     Err(e) => {
284    ///         eprintln!("Failed to subscribe observer: {}", e);
285    ///         return; // or handle error appropriately
286    ///     }
287    /// };
288    ///
289    /// // Triggers observer notification
290    /// if let Err(e) = property.set(20) {
291    ///     eprintln!("Failed to set property value: {}", e);
292    /// }
293    /// ```
294    pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
295        let (old_value, observers_snapshot) = {
296            let mut prop = self
297                .inner
298                .write()
299                .map_err(|_| PropertyError::WriteLockError {
300                    context: "setting property value".to_string(),
301                })?;
302
303            let old_value = prop.value.clone();
304            prop.value = new_value.clone();
305            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
306            (old_value, observers_snapshot)
307        };
308
309        for observer in observers_snapshot {
310            if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
311                observer(&old_value, &new_value);
312            })) {
313                eprintln!("Observer panic: {:?}", e);
314            }
315        }
316
317        Ok(())
318    }
319
320    /// Sets the property to a new value and notifies observers asynchronously
321    ///
322    /// This method is similar to `set()` but spawns observers in background threads
323    /// for non-blocking operation. This is useful when observers might perform
324    /// time-consuming operations.
325    ///
326    /// Observers are batched into groups and each batch runs in its own thread
327    /// to limit resource usage while still providing parallelism.
328    ///
329    /// # Arguments
330    ///
331    /// * `new_value` - The new value to set
332    ///
333    /// # Returns
334    ///
335    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
336    /// Note that this only indicates the property was updated successfully;
337    /// observer execution happens asynchronously.
338    ///
339    /// # Examples
340    ///
341    /// ```rust
342    /// use observable_property::ObservableProperty;
343    /// use std::sync::Arc;
344    /// use std::time::Duration;
345    ///
346    /// let property = ObservableProperty::new(0);
347    /// 
348    /// match property.subscribe(Arc::new(|old, new| {
349    ///     // This observer does slow work but won't block the caller
350    ///     std::thread::sleep(Duration::from_millis(100));
351    ///     println!("Slow observer: {} -> {}", old, new);
352    /// })) {
353    ///     Ok(_) => println!("Observer subscribed successfully"),
354    ///     Err(e) => {
355    ///         eprintln!("Failed to subscribe observer: {}", e);
356    ///         return; // or handle error appropriately
357    ///     }
358    /// };
359    ///
360    /// // This returns immediately even though observer is slow
361    /// if let Err(e) = property.set_async(42) {
362    ///     eprintln!("Failed to set property value asynchronously: {}", e);
363    /// }
364    /// ```
365    pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
366        let (old_value, observers_snapshot) = {
367            let mut prop = self
368                .inner
369                .write()
370                .map_err(|_| PropertyError::WriteLockError {
371                    context: "setting property value".to_string(),
372                })?;
373
374            let old_value = prop.value.clone();
375            prop.value = new_value.clone();
376            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
377            (old_value, observers_snapshot)
378        };
379
380        if observers_snapshot.is_empty() {
381            return Ok(());
382        }
383
384        const MAX_THREADS: usize = 4;
385        let observers_per_thread = observers_snapshot.len().div_ceil(MAX_THREADS);
386
387        for batch in observers_snapshot.chunks(observers_per_thread) {
388            let batch_observers = batch.to_vec();
389            let old_val = old_value.clone();
390            let new_val = new_value.clone();
391
392            thread::spawn(move || {
393                for observer in batch_observers {
394                    if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
395                        observer(&old_val, &new_val);
396                    })) {
397                        eprintln!("Observer panic in batch thread: {:?}", e);
398                    }
399                }
400            });
401        }
402
403        Ok(())
404    }
405
406    /// Subscribes an observer function to be called when the property changes
407    ///
408    /// The observer function will be called with the old and new values whenever
409    /// the property is modified via `set()` or `set_async()`.
410    ///
411    /// # Arguments
412    ///
413    /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
414    ///
415    /// # Returns
416    ///
417    /// `Ok(ObserverId)` containing a unique identifier for this observer,
418    /// or `Err(PropertyError)` if the lock is poisoned.
419    ///
420    /// # Examples
421    ///
422    /// ```rust
423    /// use observable_property::ObservableProperty;
424    /// use std::sync::Arc;
425    ///
426    /// let property = ObservableProperty::new(0);
427    ///
428    /// let observer_id = match property.subscribe(Arc::new(|old_value, new_value| {
429    ///     println!("Property changed from {} to {}", old_value, new_value);
430    /// })) {
431    ///     Ok(id) => id,
432    ///     Err(e) => {
433    ///         eprintln!("Failed to subscribe observer: {}", e);
434    ///         return; // or handle error appropriately
435    ///     }
436    /// };
437    ///
438    /// // Later, unsubscribe using the returned ID
439    /// if let Err(e) = property.unsubscribe(observer_id) {
440    ///     eprintln!("Failed to unsubscribe observer: {}", e);
441    /// }
442    /// ```
443    pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
444        let mut prop = self
445            .inner
446            .write()
447            .map_err(|_| PropertyError::WriteLockError {
448                context: "subscribing observer".to_string(),
449            })?;
450
451        let id = prop.next_id;
452        prop.next_id += 1;
453        prop.observers.insert(id, observer);
454        Ok(id)
455    }
456
457    /// Removes an observer by its ID
458    ///
459    /// # Arguments
460    ///
461    /// * `id` - The observer ID returned by `subscribe()`
462    ///
463    /// # Returns
464    ///
465    /// `Ok(bool)` where `true` means the observer was found and removed,
466    /// `false` means no observer with that ID existed.
467    /// Returns `Err(PropertyError)` if the lock is poisoned.
468    ///
469    /// # Examples
470    ///
471    /// ```rust
472    /// use observable_property::ObservableProperty;
473    /// use std::sync::Arc;
474    ///
475    /// let property = ObservableProperty::new(0);
476    /// 
477    /// // First, subscribe an observer
478    /// let id = match property.subscribe(Arc::new(|_, _| {})) {
479    ///     Ok(id) => id,
480    ///     Err(e) => {
481    ///         eprintln!("Failed to subscribe observer: {}", e);
482    ///         return; // or handle error appropriately
483    ///     }
484    /// };
485    ///
486    /// // Then unsubscribe it
487    /// match property.unsubscribe(id) {
488    ///     Ok(was_removed) => {
489    ///         assert!(was_removed); // Observer existed and was removed
490    ///     },
491    ///     Err(e) => eprintln!("Failed to unsubscribe observer: {}", e),
492    /// }
493    ///
494    /// // Try unsubscribing again - should return false
495    /// match property.unsubscribe(id) {
496    ///     Ok(was_removed_again) => {
497    ///         assert!(!was_removed_again); // Observer no longer exists
498    ///     },
499    ///     Err(e) => eprintln!("Failed to unsubscribe observer: {}", e),
500    /// }
501    /// ```
502    pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
503        let mut prop = self
504            .inner
505            .write()
506            .map_err(|_| PropertyError::WriteLockError {
507                context: "unsubscribing observer".to_string(),
508            })?;
509
510        let was_present = prop.observers.remove(&id).is_some();
511        Ok(was_present)
512    }
513
514    /// Subscribes an observer that only gets called when a filter condition is met
515    ///
516    /// This is useful for observing only specific types of changes, such as
517    /// when a value increases or crosses a threshold.
518    ///
519    /// # Arguments
520    ///
521    /// * `observer` - The observer function to call when the filter passes
522    /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
523    ///
524    /// # Returns
525    ///
526    /// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError)` if the lock is poisoned.
527    ///
528    /// # Examples
529    ///
530    /// ```rust
531    /// use observable_property::ObservableProperty;
532    /// use std::sync::Arc;
533    ///
534    /// let property = ObservableProperty::new(0);
535    ///
536    /// // Only notify when value increases
537    /// let id = match property.subscribe_filtered(
538    ///     Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
539    ///     |old, new| new > old
540    /// ) {
541    ///     Ok(id) => id,
542    ///     Err(e) => {
543    ///         eprintln!("Failed to subscribe filtered observer: {}", e);
544    ///         return; // or handle error appropriately
545    ///     }
546    /// };
547    ///
548    /// // Triggers observer (0 -> 10)
549    /// if let Err(e) = property.set(10) {
550    ///     eprintln!("Failed to set property value: {}", e);
551    /// }
552    /// 
553    /// // Does NOT trigger observer (10 -> 5)
554    /// if let Err(e) = property.set(5) {
555    ///     eprintln!("Failed to set property value: {}", e);
556    /// }
557    /// 
558    /// // Triggers observer (5 -> 15)
559    /// if let Err(e) = property.set(15) {
560    ///     eprintln!("Failed to set property value: {}", e);
561    /// }
562    /// ```
563    pub fn subscribe_filtered<F>(
564        &self,
565        observer: Observer<T>,
566        filter: F,
567    ) -> Result<ObserverId, PropertyError>
568    where
569        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
570    {
571        let filter = Arc::new(filter);
572        let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
573            if filter(old_val, new_val) {
574                observer(old_val, new_val);
575            }
576        });
577
578        self.subscribe(filtered_observer)
579    }
580}
581
582impl<T: Clone> Clone for ObservableProperty<T> {
583    /// Creates a new reference to the same observable property
584    ///
585    /// This creates a new `ObservableProperty` instance that shares the same
586    /// underlying data with the original. Changes made through either instance
587    /// will be visible to observers subscribed through both instances.
588    ///
589    /// # Examples
590    ///
591    /// ```rust
592    /// use observable_property::ObservableProperty;
593    /// use std::sync::Arc;
594    ///
595    /// let property1 = ObservableProperty::new(42);
596    /// let property2 = property1.clone();
597    ///
598    /// match property2.subscribe(Arc::new(|old, new| {
599    ///     println!("Observer on property2 saw change: {} -> {}", old, new);
600    /// })) {
601    ///     Ok(_) => println!("Observer subscribed successfully"),
602    ///     Err(e) => {
603    ///         eprintln!("Failed to subscribe observer: {}", e);
604    ///         return; // or handle error appropriately
605    ///     }
606    /// };
607    ///
608    /// // This change through property1 will trigger the observer on property2
609    /// if let Err(e) = property1.set(100) {
610    ///     eprintln!("Failed to set property value: {}", e);
611    /// }
612    /// ```
613    fn clone(&self) -> Self {
614        Self {
615            inner: Arc::clone(&self.inner),
616        }
617    }
618}
619
620impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
621    /// Debug implementation that shows the current value if accessible
622    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
623        match self.get() {
624            Ok(value) => f.debug_struct("ObservableProperty")
625                .field("value", &value)
626                .field("observers_count", &"[hidden]")
627                .finish(),
628            Err(_) => f.debug_struct("ObservableProperty")
629                .field("value", &"[inaccessible]")
630                .field("observers_count", &"[hidden]")
631                .finish(),
632        }
633    }
634}
635
636#[cfg(test)]
637mod tests {
638    use super::*;
639    use std::sync::atomic::{AtomicUsize, Ordering};
640    use std::time::Duration;
641
642    #[test]
643    fn test_property_creation_and_basic_operations() {
644        let prop = ObservableProperty::new(42);
645        
646        // Test initial value
647        match prop.get() {
648            Ok(value) => assert_eq!(value, 42),
649            Err(e) => panic!("Failed to get initial value: {}", e),
650        }
651        
652        // Test setting value
653        if let Err(e) = prop.set(100) {
654            panic!("Failed to set value: {}", e);
655        }
656        
657        match prop.get() {
658            Ok(value) => assert_eq!(value, 100),
659            Err(e) => panic!("Failed to get updated value: {}", e),
660        }
661    }
662
663    #[test]
664    fn test_observer_subscription_and_notification() {
665        let prop = ObservableProperty::new("initial".to_string());
666        let notification_count = Arc::new(AtomicUsize::new(0));
667        let last_old_value = Arc::new(RwLock::new(String::new()));
668        let last_new_value = Arc::new(RwLock::new(String::new()));
669        
670        let count_clone = notification_count.clone();
671        let old_clone = last_old_value.clone();
672        let new_clone = last_new_value.clone();
673        
674        let observer_id = match prop.subscribe(Arc::new(move |old, new| {
675            count_clone.fetch_add(1, Ordering::SeqCst);
676            if let Ok(mut old_val) = old_clone.write() {
677                *old_val = old.clone();
678            }
679            if let Ok(mut new_val) = new_clone.write() {
680                *new_val = new.clone();
681            }
682        })) {
683            Ok(id) => id,
684            Err(e) => panic!("Failed to subscribe observer: {}", e),
685        };
686        
687        // Change value and verify notification
688        if let Err(e) = prop.set("changed".to_string()) {
689            panic!("Failed to set property value: {}", e);
690        }
691        
692        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
693        
694        match last_old_value.read() {
695            Ok(old_val) => assert_eq!(*old_val, "initial"),
696            Err(e) => panic!("Failed to read old value: {:?}", e),
697        }
698        
699        match last_new_value.read() {
700            Ok(new_val) => assert_eq!(*new_val, "changed"),
701            Err(e) => panic!("Failed to read new value: {:?}", e),
702        }
703        
704        // Test unsubscription
705        match prop.unsubscribe(observer_id) {
706            Ok(was_present) => assert!(was_present),
707            Err(e) => panic!("Failed to unsubscribe observer: {}", e),
708        }
709        
710        // Change value again - should not notify
711        if let Err(e) = prop.set("not_notified".to_string()) {
712            panic!("Failed to set property value after unsubscribe: {}", e);
713        }
714        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
715    }
716
717    #[test]
718    fn test_filtered_observer() {
719        let prop = ObservableProperty::new(0i32);
720        let notification_count = Arc::new(AtomicUsize::new(0));
721        let count_clone = notification_count.clone();
722        
723        // Observer only triggered when value increases
724        let observer_id = match prop.subscribe_filtered(
725            Arc::new(move |_, _| {
726                count_clone.fetch_add(1, Ordering::SeqCst);
727            }),
728            |old, new| new > old
729        ) {
730            Ok(id) => id,
731            Err(e) => panic!("Failed to subscribe filtered observer: {}", e),
732        };
733        
734        // Should trigger (0 -> 5)
735        if let Err(e) = prop.set(5) {
736            panic!("Failed to set property value to 5: {}", e);
737        }
738        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
739        
740        // Should NOT trigger (5 -> 3)
741        if let Err(e) = prop.set(3) {
742            panic!("Failed to set property value to 3: {}", e);
743        }
744        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
745        
746        // Should trigger (3 -> 10)
747        if let Err(e) = prop.set(10) {
748            panic!("Failed to set property value to 10: {}", e);
749        }
750        assert_eq!(notification_count.load(Ordering::SeqCst), 2);
751        
752        match prop.unsubscribe(observer_id) {
753            Ok(_) => {},
754            Err(e) => panic!("Failed to unsubscribe filtered observer: {}", e),
755        }
756    }
757
758    #[test]
759    fn test_thread_safety_concurrent_reads() {
760        let prop = Arc::new(ObservableProperty::new(42i32));
761        let num_threads = 10;
762        let reads_per_thread = 100;
763        
764        let handles: Vec<_> = (0..num_threads).map(|_| {
765            let prop_clone = prop.clone();
766            thread::spawn(move || {
767                for _ in 0..reads_per_thread {
768                    match prop_clone.get() {
769                        Ok(value) => assert_eq!(value, 42),
770                        Err(e) => panic!("Failed to read property value: {}", e),
771                    }
772                    thread::sleep(Duration::from_millis(1));
773                }
774            })
775        }).collect();
776        
777        for handle in handles {
778            if let Err(e) = handle.join() {
779                panic!("Thread failed to complete: {:?}", e);
780            }
781        }
782    }
783
784    #[test]
785    fn test_async_set_performance() {
786        let prop = ObservableProperty::new(0i32);
787        let slow_observer_count = Arc::new(AtomicUsize::new(0));
788        let count_clone = slow_observer_count.clone();
789        
790        // Add observer that simulates slow work
791        let _id = match prop.subscribe(Arc::new(move |_, _| {
792            thread::sleep(Duration::from_millis(50));
793            count_clone.fetch_add(1, Ordering::SeqCst);
794        })) {
795            Ok(id) => id,
796            Err(e) => panic!("Failed to subscribe slow observer: {}", e),
797        };
798        
799        // Test synchronous set (should be slow)
800        let start = std::time::Instant::now();
801        if let Err(e) = prop.set(1) {
802            panic!("Failed to set property value synchronously: {}", e);
803        }
804        let sync_duration = start.elapsed();
805        
806        // Test asynchronous set (should be fast)
807        let start = std::time::Instant::now();
808        if let Err(e) = prop.set_async(2) {
809            panic!("Failed to set property value asynchronously: {}", e);
810        }
811        let async_duration = start.elapsed();
812        
813        // Async should be much faster than sync
814        assert!(async_duration < sync_duration);
815        assert!(async_duration.as_millis() < 10); // Should be very fast
816        
817        // Wait for async observer to complete
818        thread::sleep(Duration::from_millis(100));
819        
820        // Both observers should have been called
821        assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
822    }
823}