observable_property/lib.rs
1//! # Observable Property
2//!
3//! A thread-safe observable property implementation for Rust that allows you to
4//! observe changes to values across multiple threads.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **Filtered observers**: Only notify when specific conditions are met
11//! - **Async notifications**: Non-blocking observer notifications with background threads
12//! - **Panic isolation**: Observer panics don't crash the system
13//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync` type
14//!
15//! ## Quick Start
16//!
17//! ```rust
18//! use observable_property::ObservableProperty;
19//! use std::sync::Arc;
20//!
21//! // Create an observable property
22//! let property = ObservableProperty::new(42);
23//!
24//! // Subscribe to changes
25//! let observer_id = match property.subscribe(Arc::new(|old_value, new_value| {
26//! println!("Value changed from {} to {}", old_value, new_value);
27//! })) {
28//! Ok(id) => id,
29//! Err(e) => {
30//! eprintln!("Failed to subscribe observer: {}", e);
31//! return; // or handle error appropriately
32//! }
33//! };
34//!
35//! // Change the value (triggers observer)
36//! if let Err(e) = property.set(100) {
37//! eprintln!("Failed to set property value: {}", e);
38//! return; // or handle error appropriately
39//! }
40//!
41//! // Unsubscribe when done
42//! match property.unsubscribe(observer_id) {
43//! Ok(_) => println!("Observer unsubscribed successfully"),
44//! Err(e) => eprintln!("Failed to unsubscribe observer: {}", e),
45//! }
46//! ```
47//!
48//! ## Multi-threading Example
49//!
50//! ```rust
51//! use observable_property::ObservableProperty;
52//! use std::sync::Arc;
53//! use std::thread;
54//!
55//! let property = Arc::new(ObservableProperty::new(0));
56//! let property_clone = property.clone();
57//!
58//! // Subscribe from one thread
59//! match property.subscribe(Arc::new(|old, new| {
60//! println!("Value changed: {} -> {}", old, new);
61//! })) {
62//! Ok(_) => println!("Observer subscribed successfully"),
63//! Err(e) => {
64//! eprintln!("Failed to subscribe observer: {}", e);
65//! return; // or handle error appropriately
66//! }
67//! };
68//!
69//! // Modify from another thread
70//! match thread::spawn(move || {
71//! if let Err(e) = property_clone.set(42) {
72//! eprintln!("Failed to set property value: {}", e);
73//! }
74//! }).join() {
75//! Ok(_) => println!("Thread completed successfully"),
76//! Err(e) => eprintln!("Thread panicked: {:?}", e),
77//! };
78//! ```
79
80use std::collections::HashMap;
81use std::fmt;
82use std::panic;
83use std::sync::{Arc, RwLock};
84use std::thread;
85
86/// Errors that can occur when working with ObservableProperty
87#[derive(Debug, Clone)]
88pub enum PropertyError {
89 /// Failed to acquire a read lock on the property
90 ReadLockError {
91 /// Context describing what operation was being attempted
92 context: String
93 },
94 /// Failed to acquire a write lock on the property
95 WriteLockError {
96 /// Context describing what operation was being attempted
97 context: String
98 },
99 /// Attempted to unsubscribe an observer that doesn't exist
100 ObserverNotFound {
101 /// The ID of the observer that wasn't found
102 id: usize
103 },
104 /// The property's lock has been poisoned due to a panic in another thread
105 PoisonedLock,
106 /// An observer function encountered an error during execution
107 ObserverError {
108 /// Description of what went wrong
109 reason: String
110 },
111}
112
113impl fmt::Display for PropertyError {
114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115 match self {
116 PropertyError::ReadLockError { context } => {
117 write!(f, "Failed to acquire read lock: {}", context)
118 }
119 PropertyError::WriteLockError { context } => {
120 write!(f, "Failed to acquire write lock: {}", context)
121 }
122 PropertyError::ObserverNotFound { id } => {
123 write!(f, "Observer with ID {} not found", id)
124 }
125 PropertyError::PoisonedLock => {
126 write!(
127 f,
128 "Property is in a poisoned state due to a panic in another thread"
129 )
130 }
131 PropertyError::ObserverError { reason } => {
132 write!(f, "Observer execution failed: {}", reason)
133 }
134 }
135 }
136}
137
138impl std::error::Error for PropertyError {}
139
140/// Function type for observers that get called when property values change
141pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
142
143/// Unique identifier for registered observers
144pub type ObserverId = usize;
145
146/// A thread-safe observable property that notifies observers when its value changes
147///
148/// This type wraps a value of type `T` and allows multiple observers to be notified
149/// whenever the value is modified. All operations are thread-safe and can be called
150/// from multiple threads concurrently.
151///
152/// # Type Requirements
153///
154/// The generic type `T` must implement:
155/// - `Clone`: Required for returning values and passing them to observers
156/// - `Send`: Required for transferring between threads
157/// - `Sync`: Required for concurrent access from multiple threads
158/// - `'static`: Required for observer callbacks that may outlive the original scope
159///
160/// # Examples
161///
162/// ```rust
163/// use observable_property::ObservableProperty;
164/// use std::sync::Arc;
165///
166/// let property = ObservableProperty::new("initial".to_string());
167///
168/// let observer_id = match property.subscribe(Arc::new(|old, new| {
169/// println!("Changed from '{}' to '{}'", old, new);
170/// })) {
171/// Ok(id) => id,
172/// Err(e) => {
173/// eprintln!("Failed to subscribe observer: {}", e);
174/// return; // or handle error appropriately
175/// }
176/// };
177///
178/// if let Err(e) = property.set("updated".to_string()) {
179/// eprintln!("Failed to set property value: {}", e);
180/// } // Prints: Changed from 'initial' to 'updated'
181///
182/// if let Err(e) = property.unsubscribe(observer_id) {
183/// eprintln!("Failed to unsubscribe observer: {}", e);
184/// }
185/// ```
186pub struct ObservableProperty<T> {
187 inner: Arc<RwLock<InnerProperty<T>>>,
188}
189
190struct InnerProperty<T> {
191 value: T,
192 observers: HashMap<ObserverId, Observer<T>>,
193 next_id: ObserverId,
194}
195
196impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
197 /// Creates a new observable property with the given initial value
198 ///
199 /// # Arguments
200 ///
201 /// * `initial_value` - The starting value for this property
202 ///
203 /// # Examples
204 ///
205 /// ```rust
206 /// use observable_property::ObservableProperty;
207 ///
208 /// let property = ObservableProperty::new(42);
209 /// match property.get() {
210 /// Ok(value) => assert_eq!(value, 42),
211 /// Err(e) => eprintln!("Failed to get property value: {}", e),
212 /// }
213 /// ```
214 pub fn new(initial_value: T) -> Self {
215 Self {
216 inner: Arc::new(RwLock::new(InnerProperty {
217 value: initial_value,
218 observers: HashMap::new(),
219 next_id: 0,
220 })),
221 }
222 }
223
224 /// Gets the current value of the property
225 ///
226 /// This method acquires a read lock, which allows multiple concurrent readers
227 /// but will block if a writer currently holds the lock.
228 ///
229 /// # Returns
230 ///
231 /// `Ok(T)` containing a clone of the current value, or `Err(PropertyError)`
232 /// if the lock is poisoned.
233 ///
234 /// # Examples
235 ///
236 /// ```rust
237 /// use observable_property::ObservableProperty;
238 ///
239 /// let property = ObservableProperty::new("hello".to_string());
240 /// match property.get() {
241 /// Ok(value) => assert_eq!(value, "hello"),
242 /// Err(e) => eprintln!("Failed to get property value: {}", e),
243 /// }
244 /// ```
245 pub fn get(&self) -> Result<T, PropertyError> {
246 self.inner
247 .read()
248 .map(|prop| prop.value.clone())
249 .map_err(|_| PropertyError::PoisonedLock)
250 }
251
252 /// Sets the property to a new value and notifies all observers
253 ///
254 /// This method will:
255 /// 1. Acquire a write lock (blocking other readers/writers)
256 /// 2. Update the value and capture a snapshot of observers
257 /// 3. Release the lock
258 /// 4. Notify all observers sequentially with the old and new values
259 ///
260 /// Observer notifications are wrapped in panic recovery to prevent one
261 /// misbehaving observer from affecting others.
262 ///
263 /// # Arguments
264 ///
265 /// * `new_value` - The new value to set
266 ///
267 /// # Returns
268 ///
269 /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
270 ///
271 /// # Examples
272 ///
273 /// ```rust
274 /// use observable_property::ObservableProperty;
275 /// use std::sync::Arc;
276 ///
277 /// let property = ObservableProperty::new(10);
278 ///
279 /// match property.subscribe(Arc::new(|old, new| {
280 /// println!("Value changed from {} to {}", old, new);
281 /// })) {
282 /// Ok(_) => println!("Observer subscribed successfully"),
283 /// Err(e) => {
284 /// eprintln!("Failed to subscribe observer: {}", e);
285 /// return; // or handle error appropriately
286 /// }
287 /// };
288 ///
289 /// // Triggers observer notification
290 /// if let Err(e) = property.set(20) {
291 /// eprintln!("Failed to set property value: {}", e);
292 /// }
293 /// ```
294 pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
295 let (old_value, observers_snapshot) = {
296 let mut prop = self
297 .inner
298 .write()
299 .map_err(|_| PropertyError::WriteLockError {
300 context: "setting property value".to_string(),
301 })?;
302
303 let old_value = prop.value.clone();
304 prop.value = new_value.clone();
305 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
306 (old_value, observers_snapshot)
307 };
308
309 for observer in observers_snapshot {
310 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
311 observer(&old_value, &new_value);
312 })) {
313 eprintln!("Observer panic: {:?}", e);
314 }
315 }
316
317 Ok(())
318 }
319
320 /// Sets the property to a new value and notifies observers asynchronously
321 ///
322 /// This method is similar to `set()` but spawns observers in background threads
323 /// for non-blocking operation. This is useful when observers might perform
324 /// time-consuming operations.
325 ///
326 /// Observers are batched into groups and each batch runs in its own thread
327 /// to limit resource usage while still providing parallelism.
328 ///
329 /// # Arguments
330 ///
331 /// * `new_value` - The new value to set
332 ///
333 /// # Returns
334 ///
335 /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
336 /// Note that this only indicates the property was updated successfully;
337 /// observer execution happens asynchronously.
338 ///
339 /// # Examples
340 ///
341 /// ```rust
342 /// use observable_property::ObservableProperty;
343 /// use std::sync::Arc;
344 /// use std::time::Duration;
345 ///
346 /// let property = ObservableProperty::new(0);
347 ///
348 /// match property.subscribe(Arc::new(|old, new| {
349 /// // This observer does slow work but won't block the caller
350 /// std::thread::sleep(Duration::from_millis(100));
351 /// println!("Slow observer: {} -> {}", old, new);
352 /// })) {
353 /// Ok(_) => println!("Observer subscribed successfully"),
354 /// Err(e) => {
355 /// eprintln!("Failed to subscribe observer: {}", e);
356 /// return; // or handle error appropriately
357 /// }
358 /// };
359 ///
360 /// // This returns immediately even though observer is slow
361 /// if let Err(e) = property.set_async(42) {
362 /// eprintln!("Failed to set property value asynchronously: {}", e);
363 /// }
364 /// ```
365 pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
366 let (old_value, observers_snapshot) = {
367 let mut prop = self
368 .inner
369 .write()
370 .map_err(|_| PropertyError::WriteLockError {
371 context: "setting property value".to_string(),
372 })?;
373
374 let old_value = prop.value.clone();
375 prop.value = new_value.clone();
376 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
377 (old_value, observers_snapshot)
378 };
379
380 if observers_snapshot.is_empty() {
381 return Ok(());
382 }
383
384 const MAX_THREADS: usize = 4;
385 let observers_per_thread = observers_snapshot.len().div_ceil(MAX_THREADS);
386
387 for batch in observers_snapshot.chunks(observers_per_thread) {
388 let batch_observers = batch.to_vec();
389 let old_val = old_value.clone();
390 let new_val = new_value.clone();
391
392 thread::spawn(move || {
393 for observer in batch_observers {
394 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
395 observer(&old_val, &new_val);
396 })) {
397 eprintln!("Observer panic in batch thread: {:?}", e);
398 }
399 }
400 });
401 }
402
403 Ok(())
404 }
405
406 /// Subscribes an observer function to be called when the property changes
407 ///
408 /// The observer function will be called with the old and new values whenever
409 /// the property is modified via `set()` or `set_async()`.
410 ///
411 /// # Arguments
412 ///
413 /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
414 ///
415 /// # Returns
416 ///
417 /// `Ok(ObserverId)` containing a unique identifier for this observer,
418 /// or `Err(PropertyError)` if the lock is poisoned.
419 ///
420 /// # Examples
421 ///
422 /// ```rust
423 /// use observable_property::ObservableProperty;
424 /// use std::sync::Arc;
425 ///
426 /// let property = ObservableProperty::new(0);
427 ///
428 /// let observer_id = match property.subscribe(Arc::new(|old_value, new_value| {
429 /// println!("Property changed from {} to {}", old_value, new_value);
430 /// })) {
431 /// Ok(id) => id,
432 /// Err(e) => {
433 /// eprintln!("Failed to subscribe observer: {}", e);
434 /// return; // or handle error appropriately
435 /// }
436 /// };
437 ///
438 /// // Later, unsubscribe using the returned ID
439 /// if let Err(e) = property.unsubscribe(observer_id) {
440 /// eprintln!("Failed to unsubscribe observer: {}", e);
441 /// }
442 /// ```
443 pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
444 let mut prop = self
445 .inner
446 .write()
447 .map_err(|_| PropertyError::WriteLockError {
448 context: "subscribing observer".to_string(),
449 })?;
450
451 let id = prop.next_id;
452 prop.next_id += 1;
453 prop.observers.insert(id, observer);
454 Ok(id)
455 }
456
457 /// Removes an observer by its ID
458 ///
459 /// # Arguments
460 ///
461 /// * `id` - The observer ID returned by `subscribe()`
462 ///
463 /// # Returns
464 ///
465 /// `Ok(bool)` where `true` means the observer was found and removed,
466 /// `false` means no observer with that ID existed.
467 /// Returns `Err(PropertyError)` if the lock is poisoned.
468 ///
469 /// # Examples
470 ///
471 /// ```rust
472 /// use observable_property::ObservableProperty;
473 /// use std::sync::Arc;
474 ///
475 /// let property = ObservableProperty::new(0);
476 ///
477 /// // First, subscribe an observer
478 /// let id = match property.subscribe(Arc::new(|_, _| {})) {
479 /// Ok(id) => id,
480 /// Err(e) => {
481 /// eprintln!("Failed to subscribe observer: {}", e);
482 /// return; // or handle error appropriately
483 /// }
484 /// };
485 ///
486 /// // Then unsubscribe it
487 /// match property.unsubscribe(id) {
488 /// Ok(was_removed) => {
489 /// assert!(was_removed); // Observer existed and was removed
490 /// },
491 /// Err(e) => eprintln!("Failed to unsubscribe observer: {}", e),
492 /// }
493 ///
494 /// // Try unsubscribing again - should return false
495 /// match property.unsubscribe(id) {
496 /// Ok(was_removed_again) => {
497 /// assert!(!was_removed_again); // Observer no longer exists
498 /// },
499 /// Err(e) => eprintln!("Failed to unsubscribe observer: {}", e),
500 /// }
501 /// ```
502 pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
503 let mut prop = self
504 .inner
505 .write()
506 .map_err(|_| PropertyError::WriteLockError {
507 context: "unsubscribing observer".to_string(),
508 })?;
509
510 let was_present = prop.observers.remove(&id).is_some();
511 Ok(was_present)
512 }
513
514 /// Subscribes an observer that only gets called when a filter condition is met
515 ///
516 /// This is useful for observing only specific types of changes, such as
517 /// when a value increases or crosses a threshold.
518 ///
519 /// # Arguments
520 ///
521 /// * `observer` - The observer function to call when the filter passes
522 /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
523 ///
524 /// # Returns
525 ///
526 /// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError)` if the lock is poisoned.
527 ///
528 /// # Examples
529 ///
530 /// ```rust
531 /// use observable_property::ObservableProperty;
532 /// use std::sync::Arc;
533 ///
534 /// let property = ObservableProperty::new(0);
535 ///
536 /// // Only notify when value increases
537 /// let id = match property.subscribe_filtered(
538 /// Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
539 /// |old, new| new > old
540 /// ) {
541 /// Ok(id) => id,
542 /// Err(e) => {
543 /// eprintln!("Failed to subscribe filtered observer: {}", e);
544 /// return; // or handle error appropriately
545 /// }
546 /// };
547 ///
548 /// // Triggers observer (0 -> 10)
549 /// if let Err(e) = property.set(10) {
550 /// eprintln!("Failed to set property value: {}", e);
551 /// }
552 ///
553 /// // Does NOT trigger observer (10 -> 5)
554 /// if let Err(e) = property.set(5) {
555 /// eprintln!("Failed to set property value: {}", e);
556 /// }
557 ///
558 /// // Triggers observer (5 -> 15)
559 /// if let Err(e) = property.set(15) {
560 /// eprintln!("Failed to set property value: {}", e);
561 /// }
562 /// ```
563 pub fn subscribe_filtered<F>(
564 &self,
565 observer: Observer<T>,
566 filter: F,
567 ) -> Result<ObserverId, PropertyError>
568 where
569 F: Fn(&T, &T) -> bool + Send + Sync + 'static,
570 {
571 let filter = Arc::new(filter);
572 let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
573 if filter(old_val, new_val) {
574 observer(old_val, new_val);
575 }
576 });
577
578 self.subscribe(filtered_observer)
579 }
580}
581
582impl<T: Clone> Clone for ObservableProperty<T> {
583 /// Creates a new reference to the same observable property
584 ///
585 /// This creates a new `ObservableProperty` instance that shares the same
586 /// underlying data with the original. Changes made through either instance
587 /// will be visible to observers subscribed through both instances.
588 ///
589 /// # Examples
590 ///
591 /// ```rust
592 /// use observable_property::ObservableProperty;
593 /// use std::sync::Arc;
594 ///
595 /// let property1 = ObservableProperty::new(42);
596 /// let property2 = property1.clone();
597 ///
598 /// match property2.subscribe(Arc::new(|old, new| {
599 /// println!("Observer on property2 saw change: {} -> {}", old, new);
600 /// })) {
601 /// Ok(_) => println!("Observer subscribed successfully"),
602 /// Err(e) => {
603 /// eprintln!("Failed to subscribe observer: {}", e);
604 /// return; // or handle error appropriately
605 /// }
606 /// };
607 ///
608 /// // This change through property1 will trigger the observer on property2
609 /// if let Err(e) = property1.set(100) {
610 /// eprintln!("Failed to set property value: {}", e);
611 /// }
612 /// ```
613 fn clone(&self) -> Self {
614 Self {
615 inner: Arc::clone(&self.inner),
616 }
617 }
618}
619
620impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
621 /// Debug implementation that shows the current value if accessible
622 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
623 match self.get() {
624 Ok(value) => f.debug_struct("ObservableProperty")
625 .field("value", &value)
626 .field("observers_count", &"[hidden]")
627 .finish(),
628 Err(_) => f.debug_struct("ObservableProperty")
629 .field("value", &"[inaccessible]")
630 .field("observers_count", &"[hidden]")
631 .finish(),
632 }
633 }
634}
635
636#[cfg(test)]
637mod tests {
638 use super::*;
639 use std::sync::atomic::{AtomicUsize, Ordering};
640 use std::time::Duration;
641
642 #[test]
643 fn test_property_creation_and_basic_operations() {
644 let prop = ObservableProperty::new(42);
645
646 // Test initial value
647 match prop.get() {
648 Ok(value) => assert_eq!(value, 42),
649 Err(e) => panic!("Failed to get initial value: {}", e),
650 }
651
652 // Test setting value
653 if let Err(e) = prop.set(100) {
654 panic!("Failed to set value: {}", e);
655 }
656
657 match prop.get() {
658 Ok(value) => assert_eq!(value, 100),
659 Err(e) => panic!("Failed to get updated value: {}", e),
660 }
661 }
662
663 #[test]
664 fn test_observer_subscription_and_notification() {
665 let prop = ObservableProperty::new("initial".to_string());
666 let notification_count = Arc::new(AtomicUsize::new(0));
667 let last_old_value = Arc::new(RwLock::new(String::new()));
668 let last_new_value = Arc::new(RwLock::new(String::new()));
669
670 let count_clone = notification_count.clone();
671 let old_clone = last_old_value.clone();
672 let new_clone = last_new_value.clone();
673
674 let observer_id = match prop.subscribe(Arc::new(move |old, new| {
675 count_clone.fetch_add(1, Ordering::SeqCst);
676 if let Ok(mut old_val) = old_clone.write() {
677 *old_val = old.clone();
678 }
679 if let Ok(mut new_val) = new_clone.write() {
680 *new_val = new.clone();
681 }
682 })) {
683 Ok(id) => id,
684 Err(e) => panic!("Failed to subscribe observer: {}", e),
685 };
686
687 // Change value and verify notification
688 if let Err(e) = prop.set("changed".to_string()) {
689 panic!("Failed to set property value: {}", e);
690 }
691
692 assert_eq!(notification_count.load(Ordering::SeqCst), 1);
693
694 match last_old_value.read() {
695 Ok(old_val) => assert_eq!(*old_val, "initial"),
696 Err(e) => panic!("Failed to read old value: {:?}", e),
697 }
698
699 match last_new_value.read() {
700 Ok(new_val) => assert_eq!(*new_val, "changed"),
701 Err(e) => panic!("Failed to read new value: {:?}", e),
702 }
703
704 // Test unsubscription
705 match prop.unsubscribe(observer_id) {
706 Ok(was_present) => assert!(was_present),
707 Err(e) => panic!("Failed to unsubscribe observer: {}", e),
708 }
709
710 // Change value again - should not notify
711 if let Err(e) = prop.set("not_notified".to_string()) {
712 panic!("Failed to set property value after unsubscribe: {}", e);
713 }
714 assert_eq!(notification_count.load(Ordering::SeqCst), 1);
715 }
716
717 #[test]
718 fn test_filtered_observer() {
719 let prop = ObservableProperty::new(0i32);
720 let notification_count = Arc::new(AtomicUsize::new(0));
721 let count_clone = notification_count.clone();
722
723 // Observer only triggered when value increases
724 let observer_id = match prop.subscribe_filtered(
725 Arc::new(move |_, _| {
726 count_clone.fetch_add(1, Ordering::SeqCst);
727 }),
728 |old, new| new > old
729 ) {
730 Ok(id) => id,
731 Err(e) => panic!("Failed to subscribe filtered observer: {}", e),
732 };
733
734 // Should trigger (0 -> 5)
735 if let Err(e) = prop.set(5) {
736 panic!("Failed to set property value to 5: {}", e);
737 }
738 assert_eq!(notification_count.load(Ordering::SeqCst), 1);
739
740 // Should NOT trigger (5 -> 3)
741 if let Err(e) = prop.set(3) {
742 panic!("Failed to set property value to 3: {}", e);
743 }
744 assert_eq!(notification_count.load(Ordering::SeqCst), 1);
745
746 // Should trigger (3 -> 10)
747 if let Err(e) = prop.set(10) {
748 panic!("Failed to set property value to 10: {}", e);
749 }
750 assert_eq!(notification_count.load(Ordering::SeqCst), 2);
751
752 match prop.unsubscribe(observer_id) {
753 Ok(_) => {},
754 Err(e) => panic!("Failed to unsubscribe filtered observer: {}", e),
755 }
756 }
757
758 #[test]
759 fn test_thread_safety_concurrent_reads() {
760 let prop = Arc::new(ObservableProperty::new(42i32));
761 let num_threads = 10;
762 let reads_per_thread = 100;
763
764 let handles: Vec<_> = (0..num_threads).map(|_| {
765 let prop_clone = prop.clone();
766 thread::spawn(move || {
767 for _ in 0..reads_per_thread {
768 match prop_clone.get() {
769 Ok(value) => assert_eq!(value, 42),
770 Err(e) => panic!("Failed to read property value: {}", e),
771 }
772 thread::sleep(Duration::from_millis(1));
773 }
774 })
775 }).collect();
776
777 for handle in handles {
778 if let Err(e) = handle.join() {
779 panic!("Thread failed to complete: {:?}", e);
780 }
781 }
782 }
783
784 #[test]
785 fn test_async_set_performance() {
786 let prop = ObservableProperty::new(0i32);
787 let slow_observer_count = Arc::new(AtomicUsize::new(0));
788 let count_clone = slow_observer_count.clone();
789
790 // Add observer that simulates slow work
791 let _id = match prop.subscribe(Arc::new(move |_, _| {
792 thread::sleep(Duration::from_millis(50));
793 count_clone.fetch_add(1, Ordering::SeqCst);
794 })) {
795 Ok(id) => id,
796 Err(e) => panic!("Failed to subscribe slow observer: {}", e),
797 };
798
799 // Test synchronous set (should be slow)
800 let start = std::time::Instant::now();
801 if let Err(e) = prop.set(1) {
802 panic!("Failed to set property value synchronously: {}", e);
803 }
804 let sync_duration = start.elapsed();
805
806 // Test asynchronous set (should be fast)
807 let start = std::time::Instant::now();
808 if let Err(e) = prop.set_async(2) {
809 panic!("Failed to set property value asynchronously: {}", e);
810 }
811 let async_duration = start.elapsed();
812
813 // Async should be much faster than sync
814 assert!(async_duration < sync_duration);
815 assert!(async_duration.as_millis() < 10); // Should be very fast
816
817 // Wait for async observer to complete
818 thread::sleep(Duration::from_millis(100));
819
820 // Both observers should have been called
821 assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
822 }
823}