observable_property/lib.rs
1//! # Observable Property
2//!
3//! A thread-safe observable property implementation for Rust that allows you to
4//! observe changes to values across multiple threads.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **Filtered observers**: Only notify when specific conditions are met
11//! - **Async notifications**: Non-blocking observer notifications with background threads
12//! - **Panic isolation**: Observer panics don't crash the system
//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync + 'static` type
14//!
15//! ## Quick Start
16//!
17//! ```rust
18//! use observable_property::ObservableProperty;
19//! use std::sync::Arc;
20//!
21//! // Create an observable property
22//! let property = ObservableProperty::new(42);
23//!
24//! // Subscribe to changes
25//! let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
26//! println!("Value changed from {} to {}", old_value, new_value);
27//! })).map_err(|e| {
28//! eprintln!("Failed to subscribe: {}", e);
29//! e
30//! })?;
31//!
32//! // Change the value (triggers observer)
33//! property.set(100).map_err(|e| {
34//! eprintln!("Failed to set value: {}", e);
35//! e
36//! })?;
37//!
38//! // Unsubscribe when done
39//! property.unsubscribe(observer_id).map_err(|e| {
40//! eprintln!("Failed to unsubscribe: {}", e);
41//! e
42//! })?;
43//! # Ok::<(), observable_property::PropertyError>(())
44//! ```
45//!
46//! ## Multi-threading Example
47//!
48//! ```rust
49//! use observable_property::ObservableProperty;
50//! use std::sync::Arc;
51//! use std::thread;
52//!
53//! let property = Arc::new(ObservableProperty::new(0));
54//! let property_clone = property.clone();
55//!
56//! // Subscribe from one thread
57//! property.subscribe(Arc::new(|old, new| {
58//! println!("Value changed: {} -> {}", old, new);
59//! })).map_err(|e| {
60//! eprintln!("Failed to subscribe: {}", e);
61//! e
62//! })?;
63//!
64//! // Modify from another thread
65//! thread::spawn(move || {
66//! if let Err(e) = property_clone.set(42) {
67//! eprintln!("Failed to set value: {}", e);
68//! }
69//! }).join().expect("Thread panicked");
70//! # Ok::<(), observable_property::PropertyError>(())
71//! ```
72
73use std::collections::HashMap;
74use std::fmt;
75use std::panic;
76use std::sync::{Arc, RwLock};
77use std::thread;
78
/// Failures that operations on an `ObservableProperty` can report.
///
/// Every variant has a human-readable rendering through [`fmt::Display`], and the
/// type implements [`std::error::Error`] so it composes with `?` and boxed errors.
#[derive(Debug, Clone)]
pub enum PropertyError {
    /// A read lock on the property could not be acquired.
    ReadLockError {
        /// The operation that was in progress when locking failed.
        context: String,
    },
    /// A write lock on the property could not be acquired.
    WriteLockError {
        /// The operation that was in progress when locking failed.
        context: String,
    },
    /// No observer is registered under the requested ID.
    ObserverNotFound {
        /// The observer ID that could not be located.
        id: usize,
    },
    /// The property's lock was poisoned by a panic in another thread.
    PoisonedLock,
    /// An observer failed while it was being executed.
    ObserverError {
        /// Explanation of what went wrong.
        reason: String,
    },
}

impl fmt::Display for PropertyError {
    /// Writes the user-facing message for this error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::PoisonedLock => f.write_str(
                "Property is in a poisoned state due to a panic in another thread",
            ),
            Self::ObserverNotFound { id } => write!(f, "Observer with ID {} not found", id),
            Self::ObserverError { reason } => {
                write!(f, "Observer execution failed: {}", reason)
            }
            Self::ReadLockError { context } => {
                write!(f, "Failed to acquire read lock: {}", context)
            }
            Self::WriteLockError { context } => {
                write!(f, "Failed to acquire write lock: {}", context)
            }
        }
    }
}

impl std::error::Error for PropertyError {}
132
/// Callback type for observers of a property holding a `T`.
///
/// Observers are invoked with `(&old_value, &new_value)` when the property is
/// changed. The callback is held in an `Arc` and must be `Send + Sync`.
pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
135
/// Identifier handed out by `subscribe()` and accepted by `unsubscribe()`.
///
/// IDs are unique per property: each subscription takes the next value of an
/// internal counter.
pub type ObserverId = usize;
138
/// A value wrapper that notifies registered observers whenever the value is set.
///
/// The value and the observer table live together behind an `Arc<RwLock<_>>`, so
/// every access goes through the lock and cloned handles share the same state.
///
/// # Type Requirements
///
/// The methods of this type are available when `T` is:
/// - `Clone`: values are cloned when returned and when handed to observers
/// - `Send`: values are moved into background threads by `set_async()`
/// - `Sync`: the shared state is accessed from multiple threads
/// - `'static`: observer callbacks and background threads may outlive the caller's scope
///
/// # Examples
///
/// ```rust
/// use observable_property::ObservableProperty;
/// use std::sync::Arc;
///
/// let property = ObservableProperty::new("initial".to_string());
///
/// let observer_id = property.subscribe(Arc::new(|old, new| {
///     println!("Changed from '{}' to '{}'", old, new);
/// })).map_err(|e| {
///     eprintln!("Failed to subscribe: {}", e);
///     e
/// })?;
///
/// property.set("updated".to_string()).map_err(|e| {
///     eprintln!("Failed to set value: {}", e);
///     e
/// })?; // Prints: Changed from 'initial' to 'updated'
///
/// property.unsubscribe(observer_id).map_err(|e| {
///     eprintln!("Failed to unsubscribe: {}", e);
///     e
/// })?;
/// # Ok::<(), observable_property::PropertyError>(())
/// ```
pub struct ObservableProperty<T> {
    /// Shared, lock-protected state (current value plus observers).
    inner: Arc<RwLock<InnerProperty<T>>>,
}
182
/// Lock-protected state shared by every handle to one property.
struct InnerProperty<T> {
    /// Current value of the property.
    value: T,
    /// Registered observers, keyed by the ID returned from `subscribe()`.
    observers: HashMap<ObserverId, Observer<T>>,
    /// ID that will be assigned to the next subscribed observer.
    next_id: ObserverId,
}
188
189impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
190 /// Creates a new observable property with the given initial value
191 ///
192 /// # Arguments
193 ///
194 /// * `initial_value` - The starting value for this property
195 ///
196 /// # Examples
197 ///
198 /// ```rust
199 /// use observable_property::ObservableProperty;
200 ///
201 /// let property = ObservableProperty::new(42);
202 /// match property.get() {
203 /// Ok(value) => assert_eq!(value, 42),
204 /// Err(e) => eprintln!("Failed to get property value: {}", e),
205 /// }
206 /// ```
207 pub fn new(initial_value: T) -> Self {
208 Self {
209 inner: Arc::new(RwLock::new(InnerProperty {
210 value: initial_value,
211 observers: HashMap::new(),
212 next_id: 0,
213 })),
214 }
215 }
216
217 /// Gets the current value of the property
218 ///
219 /// This method acquires a read lock, which allows multiple concurrent readers
220 /// but will block if a writer currently holds the lock.
221 ///
222 /// # Returns
223 ///
224 /// `Ok(T)` containing a clone of the current value, or `Err(PropertyError)`
225 /// if the lock is poisoned.
226 ///
227 /// # Examples
228 ///
229 /// ```rust
230 /// use observable_property::ObservableProperty;
231 ///
232 /// let property = ObservableProperty::new("hello".to_string());
233 /// match property.get() {
234 /// Ok(value) => assert_eq!(value, "hello"),
235 /// Err(e) => eprintln!("Failed to get property value: {}", e),
236 /// }
237 /// ```
238 pub fn get(&self) -> Result<T, PropertyError> {
239 self.inner
240 .read()
241 .map(|prop| prop.value.clone())
242 .map_err(|_| PropertyError::PoisonedLock)
243 }
244
245 /// Sets the property to a new value and notifies all observers
246 ///
247 /// This method will:
248 /// 1. Acquire a write lock (blocking other readers/writers)
249 /// 2. Update the value and capture a snapshot of observers
250 /// 3. Release the lock
251 /// 4. Notify all observers sequentially with the old and new values
252 ///
253 /// Observer notifications are wrapped in panic recovery to prevent one
254 /// misbehaving observer from affecting others.
255 ///
256 /// # Arguments
257 ///
258 /// * `new_value` - The new value to set
259 ///
260 /// # Returns
261 ///
262 /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
263 ///
264 /// # Examples
265 ///
266 /// ```rust
267 /// use observable_property::ObservableProperty;
268 /// use std::sync::Arc;
269 ///
270 /// let property = ObservableProperty::new(10);
271 ///
272 /// property.subscribe(Arc::new(|old, new| {
273 /// println!("Value changed from {} to {}", old, new);
274 /// })).map_err(|e| {
275 /// eprintln!("Failed to subscribe: {}", e);
276 /// e
277 /// })?;
278 ///
279 /// property.set(20).map_err(|e| {
280 /// eprintln!("Failed to set property value: {}", e);
281 /// e
282 /// })?; // Triggers observer notification
283 /// # Ok::<(), observable_property::PropertyError>(())
284 /// ```
285 pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
286 let (old_value, observers_snapshot) = {
287 let mut prop = self
288 .inner
289 .write()
290 .map_err(|_| PropertyError::WriteLockError {
291 context: "setting property value".to_string(),
292 })?;
293
294 let old_value = prop.value.clone();
295 prop.value = new_value.clone();
296 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
297 (old_value, observers_snapshot)
298 };
299
300 for observer in observers_snapshot {
301 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
302 observer(&old_value, &new_value);
303 })) {
304 eprintln!("Observer panic: {:?}", e);
305 }
306 }
307
308 Ok(())
309 }
310
311 /// Sets the property to a new value and notifies observers asynchronously
312 ///
313 /// This method is similar to `set()` but spawns observers in background threads
314 /// for non-blocking operation. This is useful when observers might perform
315 /// time-consuming operations.
316 ///
317 /// Observers are batched into groups and each batch runs in its own thread
318 /// to limit resource usage while still providing parallelism.
319 ///
320 /// # Arguments
321 ///
322 /// * `new_value` - The new value to set
323 ///
324 /// # Returns
325 ///
326 /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
327 /// Note that this only indicates the property was updated successfully;
328 /// observer execution happens asynchronously.
329 ///
330 /// # Examples
331 ///
332 /// ```rust
333 /// use observable_property::ObservableProperty;
334 /// use std::sync::Arc;
335 /// use std::time::Duration;
336 ///
337 /// let property = ObservableProperty::new(0);
338 ///
339 /// property.subscribe(Arc::new(|old, new| {
340 /// // This observer does slow work but won't block the caller
341 /// std::thread::sleep(Duration::from_millis(100));
342 /// println!("Slow observer: {} -> {}", old, new);
343 /// })).map_err(|e| {
344 /// eprintln!("Failed to subscribe: {}", e);
345 /// e
346 /// })?;
347 ///
348 /// // This returns immediately even though observer is slow
349 /// property.set_async(42).map_err(|e| {
350 /// eprintln!("Failed to set value asynchronously: {}", e);
351 /// e
352 /// })?;
353 /// # Ok::<(), observable_property::PropertyError>(())
354 /// ```
355 pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
356 let (old_value, observers_snapshot) = {
357 let mut prop = self
358 .inner
359 .write()
360 .map_err(|_| PropertyError::WriteLockError {
361 context: "setting property value".to_string(),
362 })?;
363
364 let old_value = prop.value.clone();
365 prop.value = new_value.clone();
366 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
367 (old_value, observers_snapshot)
368 };
369
370 if observers_snapshot.is_empty() {
371 return Ok(());
372 }
373
374 const MAX_THREADS: usize = 4;
375 let observers_per_thread = observers_snapshot.len().div_ceil(MAX_THREADS);
376
377 for batch in observers_snapshot.chunks(observers_per_thread) {
378 let batch_observers = batch.to_vec();
379 let old_val = old_value.clone();
380 let new_val = new_value.clone();
381
382 thread::spawn(move || {
383 for observer in batch_observers {
384 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
385 observer(&old_val, &new_val);
386 })) {
387 eprintln!("Observer panic in batch thread: {:?}", e);
388 }
389 }
390 });
391 }
392
393 Ok(())
394 }
395
396 /// Subscribes an observer function to be called when the property changes
397 ///
398 /// The observer function will be called with the old and new values whenever
399 /// the property is modified via `set()` or `set_async()`.
400 ///
401 /// # Arguments
402 ///
403 /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
404 ///
405 /// # Returns
406 ///
407 /// `Ok(ObserverId)` containing a unique identifier for this observer,
408 /// or `Err(PropertyError)` if the lock is poisoned.
409 ///
410 /// # Examples
411 ///
412 /// ```rust
413 /// use observable_property::ObservableProperty;
414 /// use std::sync::Arc;
415 ///
416 /// let property = ObservableProperty::new(0);
417 ///
418 /// let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
419 /// println!("Property changed from {} to {}", old_value, new_value);
420 /// })).map_err(|e| {
421 /// eprintln!("Failed to subscribe observer: {}", e);
422 /// e
423 /// })?;
424 ///
425 /// // Later, unsubscribe using the returned ID
426 /// property.unsubscribe(observer_id).map_err(|e| {
427 /// eprintln!("Failed to unsubscribe observer: {}", e);
428 /// e
429 /// })?;
430 /// # Ok::<(), observable_property::PropertyError>(())
431 /// ```
432 pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
433 let mut prop = self
434 .inner
435 .write()
436 .map_err(|_| PropertyError::WriteLockError {
437 context: "subscribing observer".to_string(),
438 })?;
439
440 let id = prop.next_id;
441 prop.next_id += 1;
442 prop.observers.insert(id, observer);
443 Ok(id)
444 }
445
446 /// Removes an observer by its ID
447 ///
448 /// # Arguments
449 ///
450 /// * `id` - The observer ID returned by `subscribe()`
451 ///
452 /// # Returns
453 ///
454 /// `Ok(bool)` where `true` means the observer was found and removed,
455 /// `false` means no observer with that ID existed.
456 /// Returns `Err(PropertyError)` if the lock is poisoned.
457 ///
458 /// # Examples
459 ///
460 /// ```rust
461 /// use observable_property::ObservableProperty;
462 /// use std::sync::Arc;
463 ///
464 /// let property = ObservableProperty::new(0);
465 /// let id = property.subscribe(Arc::new(|_, _| {})).unwrap();
466 ///
467 /// let was_removed = property.unsubscribe(id).unwrap();
468 /// assert!(was_removed); // Observer existed and was removed
469 ///
470 /// let was_removed_again = property.unsubscribe(id).unwrap();
471 /// assert!(!was_removed_again); // Observer no longer exists
472 /// ```
473 pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
474 let mut prop = self
475 .inner
476 .write()
477 .map_err(|_| PropertyError::WriteLockError {
478 context: "unsubscribing observer".to_string(),
479 })?;
480
481 let was_present = prop.observers.remove(&id).is_some();
482 Ok(was_present)
483 }
484
485 /// Subscribes an observer that only gets called when a filter condition is met
486 ///
487 /// This is useful for observing only specific types of changes, such as
488 /// when a value increases or crosses a threshold.
489 ///
490 /// # Arguments
491 ///
492 /// * `observer` - The observer function to call when the filter passes
493 /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
494 ///
495 /// # Returns
496 ///
497 /// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError)` if the lock is poisoned.
498 ///
499 /// # Examples
500 ///
501 /// ```rust
502 /// use observable_property::ObservableProperty;
503 /// use std::sync::Arc;
504 ///
505 /// let property = ObservableProperty::new(0);
506 ///
507 /// // Only notify when value increases
508 /// let id = property.subscribe_filtered(
509 /// Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
510 /// |old, new| new > old
511 /// ).unwrap();
512 ///
513 /// property.set(10).unwrap(); // Triggers observer (0 -> 10)
514 /// property.set(5).unwrap(); // Does NOT trigger observer (10 -> 5)
515 /// property.set(15).unwrap(); // Triggers observer (5 -> 15)
516 /// ```
517 pub fn subscribe_filtered<F>(
518 &self,
519 observer: Observer<T>,
520 filter: F,
521 ) -> Result<ObserverId, PropertyError>
522 where
523 F: Fn(&T, &T) -> bool + Send + Sync + 'static,
524 {
525 let filter = Arc::new(filter);
526 let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
527 if filter(old_val, new_val) {
528 observer(old_val, new_val);
529 }
530 });
531
532 self.subscribe(filtered_observer)
533 }
534}
535
536impl<T: Clone> Clone for ObservableProperty<T> {
537 /// Creates a new reference to the same observable property
538 ///
539 /// This creates a new `ObservableProperty` instance that shares the same
540 /// underlying data with the original. Changes made through either instance
541 /// will be visible to observers subscribed through both instances.
542 ///
543 /// # Examples
544 ///
545 /// ```rust
546 /// use observable_property::ObservableProperty;
547 /// use std::sync::Arc;
548 ///
549 /// let property1 = ObservableProperty::new(42);
550 /// let property2 = property1.clone();
551 ///
552 /// property2.subscribe(Arc::new(|old, new| {
553 /// println!("Observer on property2 saw change: {} -> {}", old, new);
554 /// })).unwrap();
555 ///
556 /// // This change through property1 will trigger the observer on property2
557 /// property1.set(100).unwrap();
558 /// ```
559 fn clone(&self) -> Self {
560 Self {
561 inner: Arc::clone(&self.inner),
562 }
563 }
564}
565
566impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
567 /// Debug implementation that shows the current value if accessible
568 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
569 match self.get() {
570 Ok(value) => f.debug_struct("ObservableProperty")
571 .field("value", &value)
572 .field("observers_count", &"[hidden]")
573 .finish(),
574 Err(_) => f.debug_struct("ObservableProperty")
575 .field("value", &"[inaccessible]")
576 .field("observers_count", &"[hidden]")
577 .finish(),
578 }
579 }
580}
581
582#[cfg(test)]
583mod tests {
584 use super::*;
585 use std::sync::atomic::{AtomicUsize, Ordering};
586 use std::time::Duration;
587
588 #[test]
589 fn test_property_creation_and_basic_operations() {
590 let prop = ObservableProperty::new(42);
591
592 // Test initial value
593 match prop.get() {
594 Ok(value) => assert_eq!(value, 42),
595 Err(e) => panic!("Failed to get initial value: {}", e),
596 }
597
598 // Test setting value
599 if let Err(e) = prop.set(100) {
600 panic!("Failed to set value: {}", e);
601 }
602
603 match prop.get() {
604 Ok(value) => assert_eq!(value, 100),
605 Err(e) => panic!("Failed to get updated value: {}", e),
606 }
607 }
608
609 #[test]
610 fn test_observer_subscription_and_notification() {
611 let prop = ObservableProperty::new("initial".to_string());
612 let notification_count = Arc::new(AtomicUsize::new(0));
613 let last_old_value = Arc::new(RwLock::new(String::new()));
614 let last_new_value = Arc::new(RwLock::new(String::new()));
615
616 let count_clone = notification_count.clone();
617 let old_clone = last_old_value.clone();
618 let new_clone = last_new_value.clone();
619
620 let observer_id = match prop.subscribe(Arc::new(move |old, new| {
621 count_clone.fetch_add(1, Ordering::SeqCst);
622 if let Ok(mut old_val) = old_clone.write() {
623 *old_val = old.clone();
624 }
625 if let Ok(mut new_val) = new_clone.write() {
626 *new_val = new.clone();
627 }
628 })) {
629 Ok(id) => id,
630 Err(e) => panic!("Failed to subscribe observer: {}", e),
631 };
632
633 // Change value and verify notification
634 if let Err(e) = prop.set("changed".to_string()) {
635 panic!("Failed to set property value: {}", e);
636 }
637
638 assert_eq!(notification_count.load(Ordering::SeqCst), 1);
639
640 match last_old_value.read() {
641 Ok(old_val) => assert_eq!(*old_val, "initial"),
642 Err(e) => panic!("Failed to read old value: {:?}", e),
643 }
644
645 match last_new_value.read() {
646 Ok(new_val) => assert_eq!(*new_val, "changed"),
647 Err(e) => panic!("Failed to read new value: {:?}", e),
648 }
649
650 // Test unsubscription
651 match prop.unsubscribe(observer_id) {
652 Ok(was_present) => assert!(was_present),
653 Err(e) => panic!("Failed to unsubscribe observer: {}", e),
654 }
655
656 // Change value again - should not notify
657 if let Err(e) = prop.set("not_notified".to_string()) {
658 panic!("Failed to set property value after unsubscribe: {}", e);
659 }
660 assert_eq!(notification_count.load(Ordering::SeqCst), 1);
661 }
662
663 #[test]
664 fn test_filtered_observer() {
665 let prop = ObservableProperty::new(0i32);
666 let notification_count = Arc::new(AtomicUsize::new(0));
667 let count_clone = notification_count.clone();
668
669 // Observer only triggered when value increases
670 let observer_id = match prop.subscribe_filtered(
671 Arc::new(move |_, _| {
672 count_clone.fetch_add(1, Ordering::SeqCst);
673 }),
674 |old, new| new > old
675 ) {
676 Ok(id) => id,
677 Err(e) => panic!("Failed to subscribe filtered observer: {}", e),
678 };
679
680 // Should trigger (0 -> 5)
681 if let Err(e) = prop.set(5) {
682 panic!("Failed to set property value to 5: {}", e);
683 }
684 assert_eq!(notification_count.load(Ordering::SeqCst), 1);
685
686 // Should NOT trigger (5 -> 3)
687 if let Err(e) = prop.set(3) {
688 panic!("Failed to set property value to 3: {}", e);
689 }
690 assert_eq!(notification_count.load(Ordering::SeqCst), 1);
691
692 // Should trigger (3 -> 10)
693 if let Err(e) = prop.set(10) {
694 panic!("Failed to set property value to 10: {}", e);
695 }
696 assert_eq!(notification_count.load(Ordering::SeqCst), 2);
697
698 match prop.unsubscribe(observer_id) {
699 Ok(_) => {},
700 Err(e) => panic!("Failed to unsubscribe filtered observer: {}", e),
701 }
702 }
703
704 #[test]
705 fn test_thread_safety_concurrent_reads() {
706 let prop = Arc::new(ObservableProperty::new(42i32));
707 let num_threads = 10;
708 let reads_per_thread = 100;
709
710 let handles: Vec<_> = (0..num_threads).map(|_| {
711 let prop_clone = prop.clone();
712 thread::spawn(move || {
713 for _ in 0..reads_per_thread {
714 match prop_clone.get() {
715 Ok(value) => assert_eq!(value, 42),
716 Err(e) => panic!("Failed to read property value: {}", e),
717 }
718 thread::sleep(Duration::from_millis(1));
719 }
720 })
721 }).collect();
722
723 for handle in handles {
724 if let Err(e) = handle.join() {
725 panic!("Thread failed to complete: {:?}", e);
726 }
727 }
728 }
729
730 #[test]
731 fn test_async_set_performance() {
732 let prop = ObservableProperty::new(0i32);
733 let slow_observer_count = Arc::new(AtomicUsize::new(0));
734 let count_clone = slow_observer_count.clone();
735
736 // Add observer that simulates slow work
737 let _id = match prop.subscribe(Arc::new(move |_, _| {
738 thread::sleep(Duration::from_millis(50));
739 count_clone.fetch_add(1, Ordering::SeqCst);
740 })) {
741 Ok(id) => id,
742 Err(e) => panic!("Failed to subscribe slow observer: {}", e),
743 };
744
745 // Test synchronous set (should be slow)
746 let start = std::time::Instant::now();
747 if let Err(e) = prop.set(1) {
748 panic!("Failed to set property value synchronously: {}", e);
749 }
750 let sync_duration = start.elapsed();
751
752 // Test asynchronous set (should be fast)
753 let start = std::time::Instant::now();
754 if let Err(e) = prop.set_async(2) {
755 panic!("Failed to set property value asynchronously: {}", e);
756 }
757 let async_duration = start.elapsed();
758
759 // Async should be much faster than sync
760 assert!(async_duration < sync_duration);
761 assert!(async_duration.as_millis() < 10); // Should be very fast
762
763 // Wait for async observer to complete
764 thread::sleep(Duration::from_millis(100));
765
766 // Both observers should have been called
767 assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
768 }
769
770 #[test]
771 fn test_lock_poisoning() {
772 // Create a property that we'll poison
773 let prop = Arc::new(ObservableProperty::new(0));
774 let prop_clone = prop.clone();
775
776 // Create a thread that will deliberately poison the lock
777 let poison_thread = thread::spawn(move || {
778 // Get write lock and then panic, which will poison the lock
779 let _guard = prop_clone.inner.write().unwrap();
780 panic!("Deliberate panic to poison the lock");
781 });
782
783 // Wait for the thread to complete (it will panic)
784 let _ = poison_thread.join();
785
786 // Now the lock should be poisoned, verify all operations return appropriate errors
787 match prop.get() {
788 Ok(_) => panic!("get() should fail on a poisoned lock"),
789 Err(e) => match e {
790 PropertyError::PoisonedLock => {} // Expected error
791 _ => panic!("Expected PoisonedLock error, got: {:?}", e),
792 }
793 }
794
795 match prop.set(42) {
796 Ok(_) => panic!("set() should fail on a poisoned lock"),
797 Err(e) => match e {
798 PropertyError::WriteLockError { .. } | PropertyError::PoisonedLock => {} // Either is acceptable
799 _ => panic!("Expected lock-related error, got: {:?}", e),
800 }
801 }
802
803 match prop.subscribe(Arc::new(|_, _| {})) {
804 Ok(_) => panic!("subscribe() should fail on a poisoned lock"),
805 Err(e) => match e {
806 PropertyError::WriteLockError { .. } | PropertyError::PoisonedLock => {} // Either is acceptable
807 _ => panic!("Expected lock-related error, got: {:?}", e),
808 }
809 }
810 }
811
812 #[test]
813 fn test_observer_panic_isolation() {
814 let prop = ObservableProperty::new(0);
815 let call_counts = Arc::new(AtomicUsize::new(0));
816
817 // First observer will panic
818 let panic_observer_id = prop.subscribe(Arc::new(|_, _| {
819 panic!("This observer deliberately panics");
820 })).unwrap();
821
822 // Second observer should still be called despite first one panicking
823 let counts = call_counts.clone();
824 let normal_observer_id = prop.subscribe(Arc::new(move |_, _| {
825 counts.fetch_add(1, Ordering::SeqCst);
826 })).unwrap();
827
828 // Trigger the observers - this shouldn't panic despite the first observer panicking
829 prop.set(42).unwrap();
830
831 // Verify the second observer was still called
832 assert_eq!(call_counts.load(Ordering::SeqCst), 1);
833
834 // Clean up
835 prop.unsubscribe(panic_observer_id).unwrap();
836 prop.unsubscribe(normal_observer_id).unwrap();
837 }
838
839 #[test]
840 fn test_observer_id_edge_cases() {
841 let prop = ObservableProperty::new(0);
842 let mut observer_ids = Vec::new();
843
844 // Helper function to generate many observer IDs
845 // Using a separate function instead of a closure to avoid borrow issues
846 fn subscribe_observers(prop: &ObservableProperty<i32>, count: usize) -> Vec<ObserverId> {
847 let mut ids = Vec::with_capacity(count);
848 for _ in 0..count {
849 let id = prop.subscribe(Arc::new(|_, _| {})).unwrap();
850 ids.push(id);
851 }
852 ids
853 }
854
855 // 1. Test very large number of observers (testing ID generation)
856 let new_ids = subscribe_observers(&prop, 1000);
857 observer_ids.extend(new_ids);
858
859 // 2. Verify all IDs are unique
860 let unique_count = observer_ids.iter().collect::<std::collections::HashSet<_>>().len();
861 assert_eq!(unique_count, observer_ids.len(), "Observer IDs should be unique");
862
863 // 3. Test unsubscribing all observers
864 for id in &observer_ids {
865 let result = prop.unsubscribe(*id).unwrap();
866 assert!(result, "Unsubscribe should return true for valid ID");
867 }
868
869 // 4. Create one more observer to ensure ID generation works after mass unsubscribe
870 let new_id = prop.subscribe(Arc::new(|_, _| {})).unwrap();
871
872 // Observe internal state to confirm next_id didn't reset (would require exposing internal state)
873 // Instead, we just verify the new ID doesn't duplicate any previous ID
874 assert!(!observer_ids.contains(&new_id), "New ID should not duplicate any previous ID");
875 }
876
877 #[test]
878 fn test_complex_custom_types() {
879 #[derive(Clone, Debug, PartialEq)]
880 struct ComplexType {
881 name: String,
882 values: Vec<i32>,
883 metadata: HashMap<String, String>,
884 }
885
886 let initial = ComplexType {
887 name: "Initial".to_string(),
888 values: vec![1, 2, 3],
889 metadata: {
890 let mut map = HashMap::new();
891 map.insert("created".to_string(), "now".to_string());
892 map
893 },
894 };
895
896 let property = ObservableProperty::new(initial.clone());
897 let received_values = Arc::new(RwLock::new(Vec::new()));
898
899 let values_clone = received_values.clone();
900 let observer_id = property.subscribe(Arc::new(move |_, new| {
901 if let Ok(mut values) = values_clone.write() {
902 values.push(new.clone());
903 }
904 })).unwrap();
905
906 let updated = ComplexType {
907 name: "Updated".to_string(),
908 values: vec![4, 5, 6],
909 metadata: {
910 let mut map = HashMap::new();
911 map.insert("created".to_string(), "now".to_string());
912 map.insert("updated".to_string(), "later".to_string());
913 map
914 },
915 };
916
917 // Set the new value
918 property.set(updated.clone()).unwrap();
919
920 // Verify the observer received the correct value
921 let received = received_values.read().unwrap();
922 assert_eq!(received.len(), 1);
923 assert_eq!(&received[0], &updated);
924
925 // Verify get() returns the updated value
926 let current = property.get().unwrap();
927 assert_eq!(current, updated);
928
929 property.unsubscribe(observer_id).unwrap();
930 }
931
932 #[test]
933 fn test_unsubscribe_nonexistent_observer() {
934 let property = ObservableProperty::new(0);
935
936 // Generate a valid observer ID
937 let valid_id = property.subscribe(Arc::new(|_, _| {})).unwrap();
938
939 // Create an ID that doesn't exist (valid_id + 1000 should not exist)
940 let nonexistent_id = valid_id + 1000;
941
942 // Test unsubscribing a nonexistent observer
943 match property.unsubscribe(nonexistent_id) {
944 Ok(was_present) => {
945 assert!(!was_present, "Unsubscribe should return false for nonexistent ID");
946 },
947 Err(e) => panic!("Unsubscribe returned error: {:?}", e),
948 }
949
950 // Also verify that unsubscribing twice returns false the second time
951 property.unsubscribe(valid_id).unwrap(); // First unsubscribe should return true
952
953 let result = property.unsubscribe(valid_id).unwrap();
954 assert!(!result, "Second unsubscribe should return false");
955 }
956}