observable_property/
lib.rs

1//! # Observable Property
2//!
3//! A thread-safe observable property implementation for Rust that allows you to
4//! observe changes to values across multiple threads.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **Filtered observers**: Only notify when specific conditions are met
11//! - **Async notifications**: Non-blocking observer notifications with background threads
12//! - **Panic isolation**: Observer panics don't crash the system
13//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync` type
14//!
15//! ## Quick Start
16//!
17//! ```rust
18//! use observable_property::ObservableProperty;
19//! use std::sync::Arc;
20//!
21//! // Create an observable property
22//! let property = ObservableProperty::new(42);
23//!
24//! // Subscribe to changes
25//! let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
26//!     println!("Value changed from {} to {}", old_value, new_value);
27//! })).map_err(|e| {
28//!     eprintln!("Failed to subscribe: {}", e);
29//!     e
30//! })?;
31//!
32//! // Change the value (triggers observer)
33//! property.set(100).map_err(|e| {
34//!     eprintln!("Failed to set value: {}", e);
35//!     e
36//! })?;
37//!
38//! // Unsubscribe when done
39//! property.unsubscribe(observer_id).map_err(|e| {
40//!     eprintln!("Failed to unsubscribe: {}", e);
41//!     e
42//! })?;
43//! # Ok::<(), observable_property::PropertyError>(())
44//! ```
45//!
46//! ## Multi-threading Example
47//!
48//! ```rust
49//! use observable_property::ObservableProperty;
50//! use std::sync::Arc;
51//! use std::thread;
52//!
53//! let property = Arc::new(ObservableProperty::new(0));
54//! let property_clone = property.clone();
55//!
56//! // Subscribe from one thread
57//! property.subscribe(Arc::new(|old, new| {
58//!     println!("Value changed: {} -> {}", old, new);
59//! })).map_err(|e| {
60//!     eprintln!("Failed to subscribe: {}", e);
61//!     e
62//! })?;
63//!
64//! // Modify from another thread
65//! thread::spawn(move || {
66//!     if let Err(e) = property_clone.set(42) {
67//!         eprintln!("Failed to set value: {}", e);
68//!     }
69//! }).join().expect("Thread panicked");
70//! # Ok::<(), observable_property::PropertyError>(())
71//! ```
72
73use std::collections::HashMap;
74use std::fmt;
75use std::panic;
76use std::sync::{Arc, RwLock};
77use std::thread;
78
79/// Errors that can occur when working with ObservableProperty
80#[derive(Debug, Clone)]
81pub enum PropertyError {
82    /// Failed to acquire a read lock on the property
83    ReadLockError { 
84        /// Context describing what operation was being attempted
85        context: String 
86    },
87    /// Failed to acquire a write lock on the property  
88    WriteLockError { 
89        /// Context describing what operation was being attempted
90        context: String 
91    },
92    /// Attempted to unsubscribe an observer that doesn't exist
93    ObserverNotFound { 
94        /// The ID of the observer that wasn't found
95        id: usize 
96    },
97    /// The property's lock has been poisoned due to a panic in another thread
98    PoisonedLock,
99    /// An observer function encountered an error during execution
100    ObserverError { 
101        /// Description of what went wrong
102        reason: String 
103    },
104}
105
106impl fmt::Display for PropertyError {
107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108        match self {
109            PropertyError::ReadLockError { context } => {
110                write!(f, "Failed to acquire read lock: {}", context)
111            }
112            PropertyError::WriteLockError { context } => {
113                write!(f, "Failed to acquire write lock: {}", context)
114            }
115            PropertyError::ObserverNotFound { id } => {
116                write!(f, "Observer with ID {} not found", id)
117            }
118            PropertyError::PoisonedLock => {
119                write!(
120                    f,
121                    "Property is in a poisoned state due to a panic in another thread"
122                )
123            }
124            PropertyError::ObserverError { reason } => {
125                write!(f, "Observer execution failed: {}", reason)
126            }
127        }
128    }
129}
130
131impl std::error::Error for PropertyError {}
132
133/// Function type for observers that get called when property values change
134pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
135
136/// Unique identifier for registered observers
137pub type ObserverId = usize;
138
139/// A thread-safe observable property that notifies observers when its value changes
140///
141/// This type wraps a value of type `T` and allows multiple observers to be notified
142/// whenever the value is modified. All operations are thread-safe and can be called
143/// from multiple threads concurrently.
144///
145/// # Type Requirements
146///
147/// The generic type `T` must implement:
148/// - `Clone`: Required for returning values and passing them to observers
149/// - `Send`: Required for transferring between threads
150/// - `Sync`: Required for concurrent access from multiple threads  
151/// - `'static`: Required for observer callbacks that may outlive the original scope
152///
153/// # Examples
154///
155/// ```rust
156/// use observable_property::ObservableProperty;
157/// use std::sync::Arc;
158///
159/// let property = ObservableProperty::new("initial".to_string());
160///
161/// let observer_id = property.subscribe(Arc::new(|old, new| {
162///     println!("Changed from '{}' to '{}'", old, new);
163/// })).map_err(|e| {
164///     eprintln!("Failed to subscribe: {}", e);
165///     e
166/// })?;
167///
168/// property.set("updated".to_string()).map_err(|e| {
169///     eprintln!("Failed to set value: {}", e);
170///     e
171/// })?; // Prints: Changed from 'initial' to 'updated'
172///
173/// property.unsubscribe(observer_id).map_err(|e| {
174///     eprintln!("Failed to unsubscribe: {}", e);
175///     e
176/// })?;
177/// # Ok::<(), observable_property::PropertyError>(())
178/// ```
179pub struct ObservableProperty<T> {
180    inner: Arc<RwLock<InnerProperty<T>>>,
181}
182
183struct InnerProperty<T> {
184    value: T,
185    observers: HashMap<ObserverId, Observer<T>>,
186    next_id: ObserverId,
187}
188
189impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
190    /// Creates a new observable property with the given initial value
191    ///
192    /// # Arguments
193    ///
194    /// * `initial_value` - The starting value for this property
195    ///
196    /// # Examples
197    ///
198    /// ```rust
199    /// use observable_property::ObservableProperty;
200    ///
201    /// let property = ObservableProperty::new(42);
202    /// match property.get() {
203    ///     Ok(value) => assert_eq!(value, 42),
204    ///     Err(e) => eprintln!("Failed to get property value: {}", e),
205    /// }
206    /// ```
207    pub fn new(initial_value: T) -> Self {
208        Self {
209            inner: Arc::new(RwLock::new(InnerProperty {
210                value: initial_value,
211                observers: HashMap::new(),
212                next_id: 0,
213            })),
214        }
215    }
216
217    /// Gets the current value of the property
218    ///
219    /// This method acquires a read lock, which allows multiple concurrent readers
220    /// but will block if a writer currently holds the lock.
221    ///
222    /// # Returns
223    ///
224    /// `Ok(T)` containing a clone of the current value, or `Err(PropertyError)`
225    /// if the lock is poisoned.
226    ///
227    /// # Examples
228    ///
229    /// ```rust
230    /// use observable_property::ObservableProperty;
231    ///
232    /// let property = ObservableProperty::new("hello".to_string());
233    /// match property.get() {
234    ///     Ok(value) => assert_eq!(value, "hello"),
235    ///     Err(e) => eprintln!("Failed to get property value: {}", e),
236    /// }
237    /// ```
238    pub fn get(&self) -> Result<T, PropertyError> {
239        self.inner
240            .read()
241            .map(|prop| prop.value.clone())
242            .map_err(|_| PropertyError::PoisonedLock)
243    }
244
245    /// Sets the property to a new value and notifies all observers
246    ///
247    /// This method will:
248    /// 1. Acquire a write lock (blocking other readers/writers)
249    /// 2. Update the value and capture a snapshot of observers
250    /// 3. Release the lock
251    /// 4. Notify all observers sequentially with the old and new values
252    ///
253    /// Observer notifications are wrapped in panic recovery to prevent one
254    /// misbehaving observer from affecting others.
255    ///
256    /// # Arguments
257    ///
258    /// * `new_value` - The new value to set
259    ///
260    /// # Returns
261    ///
262    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
263    ///
264    /// # Examples
265    ///
266    /// ```rust
267    /// use observable_property::ObservableProperty;
268    /// use std::sync::Arc;
269    ///
270    /// let property = ObservableProperty::new(10);
271    /// 
272    /// property.subscribe(Arc::new(|old, new| {
273    ///     println!("Value changed from {} to {}", old, new);
274    /// })).map_err(|e| {
275    ///     eprintln!("Failed to subscribe: {}", e);
276    ///     e
277    /// })?;
278    ///
279    /// property.set(20).map_err(|e| {
280    ///     eprintln!("Failed to set property value: {}", e);
281    ///     e
282    /// })?; // Triggers observer notification
283    /// # Ok::<(), observable_property::PropertyError>(())
284    /// ```
285    pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
286        let (old_value, observers_snapshot) = {
287            let mut prop = self
288                .inner
289                .write()
290                .map_err(|_| PropertyError::WriteLockError {
291                    context: "setting property value".to_string(),
292                })?;
293
294            let old_value = prop.value.clone();
295            prop.value = new_value.clone();
296            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
297            (old_value, observers_snapshot)
298        };
299
300        for observer in observers_snapshot {
301            if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
302                observer(&old_value, &new_value);
303            })) {
304                eprintln!("Observer panic: {:?}", e);
305            }
306        }
307
308        Ok(())
309    }
310
311    /// Sets the property to a new value and notifies observers asynchronously
312    ///
313    /// This method is similar to `set()` but spawns observers in background threads
314    /// for non-blocking operation. This is useful when observers might perform
315    /// time-consuming operations.
316    ///
317    /// Observers are batched into groups and each batch runs in its own thread
318    /// to limit resource usage while still providing parallelism.
319    ///
320    /// # Arguments
321    ///
322    /// * `new_value` - The new value to set
323    ///
324    /// # Returns
325    ///
326    /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
327    /// Note that this only indicates the property was updated successfully;
328    /// observer execution happens asynchronously.
329    ///
330    /// # Examples
331    ///
332    /// ```rust
333    /// use observable_property::ObservableProperty;
334    /// use std::sync::Arc;
335    /// use std::time::Duration;
336    ///
337    /// let property = ObservableProperty::new(0);
338    /// 
339    /// property.subscribe(Arc::new(|old, new| {
340    ///     // This observer does slow work but won't block the caller
341    ///     std::thread::sleep(Duration::from_millis(100));
342    ///     println!("Slow observer: {} -> {}", old, new);
343    /// })).map_err(|e| {
344    ///     eprintln!("Failed to subscribe: {}", e);
345    ///     e
346    /// })?;
347    ///
348    /// // This returns immediately even though observer is slow
349    /// property.set_async(42).map_err(|e| {
350    ///     eprintln!("Failed to set value asynchronously: {}", e);
351    ///     e
352    /// })?;
353    /// # Ok::<(), observable_property::PropertyError>(())
354    /// ```
355    pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
356        let (old_value, observers_snapshot) = {
357            let mut prop = self
358                .inner
359                .write()
360                .map_err(|_| PropertyError::WriteLockError {
361                    context: "setting property value".to_string(),
362                })?;
363
364            let old_value = prop.value.clone();
365            prop.value = new_value.clone();
366            let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
367            (old_value, observers_snapshot)
368        };
369
370        if observers_snapshot.is_empty() {
371            return Ok(());
372        }
373
374        const MAX_THREADS: usize = 4;
375        let observers_per_thread = observers_snapshot.len().div_ceil(MAX_THREADS);
376
377        for batch in observers_snapshot.chunks(observers_per_thread) {
378            let batch_observers = batch.to_vec();
379            let old_val = old_value.clone();
380            let new_val = new_value.clone();
381
382            thread::spawn(move || {
383                for observer in batch_observers {
384                    if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
385                        observer(&old_val, &new_val);
386                    })) {
387                        eprintln!("Observer panic in batch thread: {:?}", e);
388                    }
389                }
390            });
391        }
392
393        Ok(())
394    }
395
396    /// Subscribes an observer function to be called when the property changes
397    ///
398    /// The observer function will be called with the old and new values whenever
399    /// the property is modified via `set()` or `set_async()`.
400    ///
401    /// # Arguments
402    ///
403    /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
404    ///
405    /// # Returns
406    ///
407    /// `Ok(ObserverId)` containing a unique identifier for this observer,
408    /// or `Err(PropertyError)` if the lock is poisoned.
409    ///
410    /// # Examples
411    ///
412    /// ```rust
413    /// use observable_property::ObservableProperty;
414    /// use std::sync::Arc;
415    ///
416    /// let property = ObservableProperty::new(0);
417    ///
418    /// let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
419    ///     println!("Property changed from {} to {}", old_value, new_value);
420    /// })).map_err(|e| {
421    ///     eprintln!("Failed to subscribe observer: {}", e);
422    ///     e
423    /// })?;
424    ///
425    /// // Later, unsubscribe using the returned ID
426    /// property.unsubscribe(observer_id).map_err(|e| {
427    ///     eprintln!("Failed to unsubscribe observer: {}", e);
428    ///     e
429    /// })?;
430    /// # Ok::<(), observable_property::PropertyError>(())
431    /// ```
432    pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
433        let mut prop = self
434            .inner
435            .write()
436            .map_err(|_| PropertyError::WriteLockError {
437                context: "subscribing observer".to_string(),
438            })?;
439
440        let id = prop.next_id;
441        prop.next_id += 1;
442        prop.observers.insert(id, observer);
443        Ok(id)
444    }
445
446    /// Removes an observer by its ID
447    ///
448    /// # Arguments
449    ///
450    /// * `id` - The observer ID returned by `subscribe()`
451    ///
452    /// # Returns
453    ///
454    /// `Ok(bool)` where `true` means the observer was found and removed,
455    /// `false` means no observer with that ID existed.
456    /// Returns `Err(PropertyError)` if the lock is poisoned.
457    ///
458    /// # Examples
459    ///
460    /// ```rust
461    /// use observable_property::ObservableProperty;
462    /// use std::sync::Arc;
463    ///
464    /// let property = ObservableProperty::new(0);
465    /// let id = property.subscribe(Arc::new(|_, _| {})).unwrap();
466    ///
467    /// let was_removed = property.unsubscribe(id).unwrap();
468    /// assert!(was_removed); // Observer existed and was removed
469    ///
470    /// let was_removed_again = property.unsubscribe(id).unwrap();
471    /// assert!(!was_removed_again); // Observer no longer exists
472    /// ```
473    pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
474        let mut prop = self
475            .inner
476            .write()
477            .map_err(|_| PropertyError::WriteLockError {
478                context: "unsubscribing observer".to_string(),
479            })?;
480
481        let was_present = prop.observers.remove(&id).is_some();
482        Ok(was_present)
483    }
484
485    /// Subscribes an observer that only gets called when a filter condition is met
486    ///
487    /// This is useful for observing only specific types of changes, such as
488    /// when a value increases or crosses a threshold.
489    ///
490    /// # Arguments
491    ///
492    /// * `observer` - The observer function to call when the filter passes
493    /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
494    ///
495    /// # Returns
496    ///
497    /// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError)` if the lock is poisoned.
498    ///
499    /// # Examples
500    ///
501    /// ```rust
502    /// use observable_property::ObservableProperty;
503    /// use std::sync::Arc;
504    ///
505    /// let property = ObservableProperty::new(0);
506    ///
507    /// // Only notify when value increases
508    /// let id = property.subscribe_filtered(
509    ///     Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
510    ///     |old, new| new > old
511    /// ).unwrap();
512    ///
513    /// property.set(10).unwrap(); // Triggers observer (0 -> 10)
514    /// property.set(5).unwrap();  // Does NOT trigger observer (10 -> 5)
515    /// property.set(15).unwrap(); // Triggers observer (5 -> 15)
516    /// ```
517    pub fn subscribe_filtered<F>(
518        &self,
519        observer: Observer<T>,
520        filter: F,
521    ) -> Result<ObserverId, PropertyError>
522    where
523        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
524    {
525        let filter = Arc::new(filter);
526        let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
527            if filter(old_val, new_val) {
528                observer(old_val, new_val);
529            }
530        });
531
532        self.subscribe(filtered_observer)
533    }
534}
535
536impl<T: Clone> Clone for ObservableProperty<T> {
537    /// Creates a new reference to the same observable property
538    ///
539    /// This creates a new `ObservableProperty` instance that shares the same
540    /// underlying data with the original. Changes made through either instance
541    /// will be visible to observers subscribed through both instances.
542    ///
543    /// # Examples
544    ///
545    /// ```rust
546    /// use observable_property::ObservableProperty;
547    /// use std::sync::Arc;
548    ///
549    /// let property1 = ObservableProperty::new(42);
550    /// let property2 = property1.clone();
551    ///
552    /// property2.subscribe(Arc::new(|old, new| {
553    ///     println!("Observer on property2 saw change: {} -> {}", old, new);
554    /// })).unwrap();
555    ///
556    /// // This change through property1 will trigger the observer on property2
557    /// property1.set(100).unwrap();
558    /// ```
559    fn clone(&self) -> Self {
560        Self {
561            inner: Arc::clone(&self.inner),
562        }
563    }
564}
565
566impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
567    /// Debug implementation that shows the current value if accessible
568    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
569        match self.get() {
570            Ok(value) => f.debug_struct("ObservableProperty")
571                .field("value", &value)
572                .field("observers_count", &"[hidden]")
573                .finish(),
574            Err(_) => f.debug_struct("ObservableProperty")
575                .field("value", &"[inaccessible]")
576                .field("observers_count", &"[hidden]")
577                .finish(),
578        }
579    }
580}
581
582#[cfg(test)]
583mod tests {
584    use super::*;
585    use std::sync::atomic::{AtomicUsize, Ordering};
586    use std::time::Duration;
587
588    #[test]
589    fn test_property_creation_and_basic_operations() {
590        let prop = ObservableProperty::new(42);
591        
592        // Test initial value
593        match prop.get() {
594            Ok(value) => assert_eq!(value, 42),
595            Err(e) => panic!("Failed to get initial value: {}", e),
596        }
597        
598        // Test setting value
599        if let Err(e) = prop.set(100) {
600            panic!("Failed to set value: {}", e);
601        }
602        
603        match prop.get() {
604            Ok(value) => assert_eq!(value, 100),
605            Err(e) => panic!("Failed to get updated value: {}", e),
606        }
607    }
608
609    #[test]
610    fn test_observer_subscription_and_notification() {
611        let prop = ObservableProperty::new("initial".to_string());
612        let notification_count = Arc::new(AtomicUsize::new(0));
613        let last_old_value = Arc::new(RwLock::new(String::new()));
614        let last_new_value = Arc::new(RwLock::new(String::new()));
615        
616        let count_clone = notification_count.clone();
617        let old_clone = last_old_value.clone();
618        let new_clone = last_new_value.clone();
619        
620        let observer_id = match prop.subscribe(Arc::new(move |old, new| {
621            count_clone.fetch_add(1, Ordering::SeqCst);
622            if let Ok(mut old_val) = old_clone.write() {
623                *old_val = old.clone();
624            }
625            if let Ok(mut new_val) = new_clone.write() {
626                *new_val = new.clone();
627            }
628        })) {
629            Ok(id) => id,
630            Err(e) => panic!("Failed to subscribe observer: {}", e),
631        };
632        
633        // Change value and verify notification
634        if let Err(e) = prop.set("changed".to_string()) {
635            panic!("Failed to set property value: {}", e);
636        }
637        
638        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
639        
640        match last_old_value.read() {
641            Ok(old_val) => assert_eq!(*old_val, "initial"),
642            Err(e) => panic!("Failed to read old value: {:?}", e),
643        }
644        
645        match last_new_value.read() {
646            Ok(new_val) => assert_eq!(*new_val, "changed"),
647            Err(e) => panic!("Failed to read new value: {:?}", e),
648        }
649        
650        // Test unsubscription
651        match prop.unsubscribe(observer_id) {
652            Ok(was_present) => assert!(was_present),
653            Err(e) => panic!("Failed to unsubscribe observer: {}", e),
654        }
655        
656        // Change value again - should not notify
657        if let Err(e) = prop.set("not_notified".to_string()) {
658            panic!("Failed to set property value after unsubscribe: {}", e);
659        }
660        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
661    }
662
663    #[test]
664    fn test_filtered_observer() {
665        let prop = ObservableProperty::new(0i32);
666        let notification_count = Arc::new(AtomicUsize::new(0));
667        let count_clone = notification_count.clone();
668        
669        // Observer only triggered when value increases
670        let observer_id = match prop.subscribe_filtered(
671            Arc::new(move |_, _| {
672                count_clone.fetch_add(1, Ordering::SeqCst);
673            }),
674            |old, new| new > old
675        ) {
676            Ok(id) => id,
677            Err(e) => panic!("Failed to subscribe filtered observer: {}", e),
678        };
679        
680        // Should trigger (0 -> 5)
681        if let Err(e) = prop.set(5) {
682            panic!("Failed to set property value to 5: {}", e);
683        }
684        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
685        
686        // Should NOT trigger (5 -> 3)
687        if let Err(e) = prop.set(3) {
688            panic!("Failed to set property value to 3: {}", e);
689        }
690        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
691        
692        // Should trigger (3 -> 10)
693        if let Err(e) = prop.set(10) {
694            panic!("Failed to set property value to 10: {}", e);
695        }
696        assert_eq!(notification_count.load(Ordering::SeqCst), 2);
697        
698        match prop.unsubscribe(observer_id) {
699            Ok(_) => {},
700            Err(e) => panic!("Failed to unsubscribe filtered observer: {}", e),
701        }
702    }
703
704    #[test]
705    fn test_thread_safety_concurrent_reads() {
706        let prop = Arc::new(ObservableProperty::new(42i32));
707        let num_threads = 10;
708        let reads_per_thread = 100;
709        
710        let handles: Vec<_> = (0..num_threads).map(|_| {
711            let prop_clone = prop.clone();
712            thread::spawn(move || {
713                for _ in 0..reads_per_thread {
714                    match prop_clone.get() {
715                        Ok(value) => assert_eq!(value, 42),
716                        Err(e) => panic!("Failed to read property value: {}", e),
717                    }
718                    thread::sleep(Duration::from_millis(1));
719                }
720            })
721        }).collect();
722        
723        for handle in handles {
724            if let Err(e) = handle.join() {
725                panic!("Thread failed to complete: {:?}", e);
726            }
727        }
728    }
729
730    #[test]
731    fn test_async_set_performance() {
732        let prop = ObservableProperty::new(0i32);
733        let slow_observer_count = Arc::new(AtomicUsize::new(0));
734        let count_clone = slow_observer_count.clone();
735        
736        // Add observer that simulates slow work
737        let _id = match prop.subscribe(Arc::new(move |_, _| {
738            thread::sleep(Duration::from_millis(50));
739            count_clone.fetch_add(1, Ordering::SeqCst);
740        })) {
741            Ok(id) => id,
742            Err(e) => panic!("Failed to subscribe slow observer: {}", e),
743        };
744        
745        // Test synchronous set (should be slow)
746        let start = std::time::Instant::now();
747        if let Err(e) = prop.set(1) {
748            panic!("Failed to set property value synchronously: {}", e);
749        }
750        let sync_duration = start.elapsed();
751        
752        // Test asynchronous set (should be fast)
753        let start = std::time::Instant::now();
754        if let Err(e) = prop.set_async(2) {
755            panic!("Failed to set property value asynchronously: {}", e);
756        }
757        let async_duration = start.elapsed();
758        
759        // Async should be much faster than sync
760        assert!(async_duration < sync_duration);
761        assert!(async_duration.as_millis() < 10); // Should be very fast
762        
763        // Wait for async observer to complete
764        thread::sleep(Duration::from_millis(100));
765        
766        // Both observers should have been called
767        assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
768    }
769
770    #[test]
771    fn test_lock_poisoning() {
772        // Create a property that we'll poison
773        let prop = Arc::new(ObservableProperty::new(0));
774        let prop_clone = prop.clone();
775
776        // Create a thread that will deliberately poison the lock
777        let poison_thread = thread::spawn(move || {
778            // Get write lock and then panic, which will poison the lock
779            let _guard = prop_clone.inner.write().unwrap();
780            panic!("Deliberate panic to poison the lock");
781        });
782
783        // Wait for the thread to complete (it will panic)
784        let _ = poison_thread.join();
785
786        // Now the lock should be poisoned, verify all operations return appropriate errors
787        match prop.get() {
788            Ok(_) => panic!("get() should fail on a poisoned lock"),
789            Err(e) => match e {
790                PropertyError::PoisonedLock => {} // Expected error
791                _ => panic!("Expected PoisonedLock error, got: {:?}", e),
792            }
793        }
794
795        match prop.set(42) {
796            Ok(_) => panic!("set() should fail on a poisoned lock"),
797            Err(e) => match e {
798                PropertyError::WriteLockError { .. } | PropertyError::PoisonedLock => {} // Either is acceptable
799                _ => panic!("Expected lock-related error, got: {:?}", e),
800            }
801        }
802
803        match prop.subscribe(Arc::new(|_, _| {})) {
804            Ok(_) => panic!("subscribe() should fail on a poisoned lock"),
805            Err(e) => match e {
806                PropertyError::WriteLockError { .. } | PropertyError::PoisonedLock => {} // Either is acceptable
807                _ => panic!("Expected lock-related error, got: {:?}", e),
808            }
809        }
810    }
811
812    #[test]
813    fn test_observer_panic_isolation() {
814        let prop = ObservableProperty::new(0);
815        let call_counts = Arc::new(AtomicUsize::new(0));
816
817        // First observer will panic
818        let panic_observer_id = prop.subscribe(Arc::new(|_, _| {
819            panic!("This observer deliberately panics");
820        })).unwrap();
821
822        // Second observer should still be called despite first one panicking
823        let counts = call_counts.clone();
824        let normal_observer_id = prop.subscribe(Arc::new(move |_, _| {
825            counts.fetch_add(1, Ordering::SeqCst);
826        })).unwrap();
827
828        // Trigger the observers - this shouldn't panic despite the first observer panicking
829        prop.set(42).unwrap();
830
831        // Verify the second observer was still called
832        assert_eq!(call_counts.load(Ordering::SeqCst), 1);
833
834        // Clean up
835        prop.unsubscribe(panic_observer_id).unwrap();
836        prop.unsubscribe(normal_observer_id).unwrap();
837    }
838
839    #[test]
840    fn test_observer_id_edge_cases() {
841        let prop = ObservableProperty::new(0);
842        let mut observer_ids = Vec::new();
843
844        // Helper function to generate many observer IDs
845        // Using a separate function instead of a closure to avoid borrow issues
846        fn subscribe_observers(prop: &ObservableProperty<i32>, count: usize) -> Vec<ObserverId> {
847            let mut ids = Vec::with_capacity(count);
848            for _ in 0..count {
849                let id = prop.subscribe(Arc::new(|_, _| {})).unwrap();
850                ids.push(id);
851            }
852            ids
853        }
854
855        // 1. Test very large number of observers (testing ID generation)
856        let new_ids = subscribe_observers(&prop, 1000);
857        observer_ids.extend(new_ids);
858
859        // 2. Verify all IDs are unique
860        let unique_count = observer_ids.iter().collect::<std::collections::HashSet<_>>().len();
861        assert_eq!(unique_count, observer_ids.len(), "Observer IDs should be unique");
862
863        // 3. Test unsubscribing all observers
864        for id in &observer_ids {
865            let result = prop.unsubscribe(*id).unwrap();
866            assert!(result, "Unsubscribe should return true for valid ID");
867        }
868
869        // 4. Create one more observer to ensure ID generation works after mass unsubscribe
870        let new_id = prop.subscribe(Arc::new(|_, _| {})).unwrap();
871
872        // Observe internal state to confirm next_id didn't reset (would require exposing internal state)
873        // Instead, we just verify the new ID doesn't duplicate any previous ID
874        assert!(!observer_ids.contains(&new_id), "New ID should not duplicate any previous ID");
875    }
876
877    #[test]
878    fn test_complex_custom_types() {
879        #[derive(Clone, Debug, PartialEq)]
880        struct ComplexType {
881            name: String,
882            values: Vec<i32>,
883            metadata: HashMap<String, String>,
884        }
885
886        let initial = ComplexType {
887            name: "Initial".to_string(),
888            values: vec![1, 2, 3],
889            metadata: {
890                let mut map = HashMap::new();
891                map.insert("created".to_string(), "now".to_string());
892                map
893            },
894        };
895
896        let property = ObservableProperty::new(initial.clone());
897        let received_values = Arc::new(RwLock::new(Vec::new()));
898
899        let values_clone = received_values.clone();
900        let observer_id = property.subscribe(Arc::new(move |_, new| {
901            if let Ok(mut values) = values_clone.write() {
902                values.push(new.clone());
903            }
904        })).unwrap();
905
906        let updated = ComplexType {
907            name: "Updated".to_string(),
908            values: vec![4, 5, 6],
909            metadata: {
910                let mut map = HashMap::new();
911                map.insert("created".to_string(), "now".to_string());
912                map.insert("updated".to_string(), "later".to_string());
913                map
914            },
915        };
916
917        // Set the new value
918        property.set(updated.clone()).unwrap();
919
920        // Verify the observer received the correct value
921        let received = received_values.read().unwrap();
922        assert_eq!(received.len(), 1);
923        assert_eq!(&received[0], &updated);
924
925        // Verify get() returns the updated value
926        let current = property.get().unwrap();
927        assert_eq!(current, updated);
928
929        property.unsubscribe(observer_id).unwrap();
930    }
931
932    #[test]
933    fn test_unsubscribe_nonexistent_observer() {
934        let property = ObservableProperty::new(0);
935
936        // Generate a valid observer ID
937        let valid_id = property.subscribe(Arc::new(|_, _| {})).unwrap();
938
939        // Create an ID that doesn't exist (valid_id + 1000 should not exist)
940        let nonexistent_id = valid_id + 1000;
941
942        // Test unsubscribing a nonexistent observer
943        match property.unsubscribe(nonexistent_id) {
944            Ok(was_present) => {
945                assert!(!was_present, "Unsubscribe should return false for nonexistent ID");
946            },
947            Err(e) => panic!("Unsubscribe returned error: {:?}", e),
948        }
949
950        // Also verify that unsubscribing twice returns false the second time
951        property.unsubscribe(valid_id).unwrap(); // First unsubscribe should return true
952
953        let result = property.unsubscribe(valid_id).unwrap();
954        assert!(!result, "Second unsubscribe should return false");
955    }
956}