observable_property/lib.rs
1//! # Observable Property
2//!
3//! A thread-safe observable property implementation for Rust that allows you to
4//! observe changes to values across multiple threads.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **RAII subscriptions**: Automatic cleanup with subscription guards (no manual unsubscribe needed)
11//! - **Filtered observers**: Only notify when specific conditions are met
12//! - **Async notifications**: Non-blocking observer notifications with background threads
13//! - **Panic isolation**: Observer panics don't crash the system
14//! - **Graceful lock recovery**: Continues operation even after lock poisoning from panics
15//! - **Memory protection**: Observer limit (10,000) prevents memory exhaustion
16//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync` type
17//!
18//! ## Quick Start
19//!
20//! ```rust
21//! use observable_property::ObservableProperty;
22//! use std::sync::Arc;
23//!
24//! // Create an observable property
25//! let property = ObservableProperty::new(42);
26//!
27//! // Subscribe to changes
28//! let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
29//! println!("Value changed from {} to {}", old_value, new_value);
30//! })).map_err(|e| {
31//! eprintln!("Failed to subscribe: {}", e);
32//! e
33//! })?;
34//!
35//! // Change the value (triggers observer)
36//! property.set(100).map_err(|e| {
37//! eprintln!("Failed to set value: {}", e);
38//! e
39//! })?;
40//!
41//! // Unsubscribe when done
42//! property.unsubscribe(observer_id).map_err(|e| {
43//! eprintln!("Failed to unsubscribe: {}", e);
44//! e
45//! })?;
46//! # Ok::<(), observable_property::PropertyError>(())
47//! ```
48//!
49//! ## Multi-threading Example
50//!
51//! ```rust
52//! use observable_property::ObservableProperty;
53//! use std::sync::Arc;
54//! use std::thread;
55//!
56//! let property = Arc::new(ObservableProperty::new(0));
57//! let property_clone = property.clone();
58//!
59//! // Subscribe from one thread
60//! property.subscribe(Arc::new(|old, new| {
61//! println!("Value changed: {} -> {}", old, new);
62//! })).map_err(|e| {
63//! eprintln!("Failed to subscribe: {}", e);
64//! e
65//! })?;
66//!
67//! // Modify from another thread
68//! thread::spawn(move || {
69//! if let Err(e) = property_clone.set(42) {
70//! eprintln!("Failed to set value: {}", e);
71//! }
72//! }).join().expect("Thread panicked");
73//! # Ok::<(), observable_property::PropertyError>(())
74//! ```
75//!
76//! ## RAII Subscriptions (Recommended)
77//!
78//! For automatic cleanup without manual unsubscribe calls, use RAII subscriptions:
79//!
80//! ```rust
81//! use observable_property::ObservableProperty;
82//! use std::sync::Arc;
83//!
84//! # fn main() -> Result<(), observable_property::PropertyError> {
85//! let property = ObservableProperty::new(0);
86//!
87//! {
88//! // Create RAII subscription - automatically cleaned up when dropped
89//! let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
90//! println!("Value changed: {} -> {}", old, new);
91//! }))?;
92//!
93//! property.set(42)?; // Prints: "Value changed: 0 -> 42"
94//!
95//! // Subscription automatically unsubscribes when leaving this scope
96//! }
97//!
98//! // No observer active anymore
99//! property.set(100)?; // No output
100//! # Ok(())
101//! # }
102//! ```
103//!
104//! ## Filtered RAII Subscriptions
105//!
106//! Combine filtering with automatic cleanup for conditional monitoring:
107//!
108//! ```rust
109//! use observable_property::ObservableProperty;
110//! use std::sync::Arc;
111//!
112//! # fn main() -> Result<(), observable_property::PropertyError> {
113//! let temperature = ObservableProperty::new(20.0);
114//!
115//! {
116//! // Monitor only significant temperature increases with automatic cleanup
117//! let _heat_warning = temperature.subscribe_filtered_with_subscription(
118//! Arc::new(|old, new| {
119//! println!("🔥 Heat warning! {:.1}°C -> {:.1}°C", old, new);
120//! }),
121//! |old, new| new > old && (new - old) > 5.0
122//! )?;
123//!
124//! temperature.set(22.0)?; // No warning (only 2°C increase)
125//! temperature.set(28.0)?; // Prints warning (6°C increase from 22°C)
126//!
127//! // Subscription automatically cleaned up here
128//! }
129//!
130//! temperature.set(35.0)?; // No warning (subscription was cleaned up)
131//! # Ok(())
132//! # }
133//! ```
134//!
135//! ## Subscription Management Comparison
136//!
137//! ```rust
138//! use observable_property::ObservableProperty;
139//! use std::sync::Arc;
140//!
141//! # fn main() -> Result<(), observable_property::PropertyError> {
142//! let property = ObservableProperty::new(0);
143//! let observer = Arc::new(|old: &i32, new: &i32| {
144//! println!("Value: {} -> {}", old, new);
145//! });
146//!
147//! // Method 1: Manual subscription management
148//! let observer_id = property.subscribe(observer.clone())?;
149//! property.set(42)?;
150//! property.unsubscribe(observer_id)?; // Manual cleanup required
151//!
152//! // Method 2: RAII subscription management (recommended)
153//! {
154//! let _subscription = property.subscribe_with_subscription(observer)?;
155//! property.set(100)?;
156//! // Automatic cleanup when _subscription goes out of scope
157//! }
158//! # Ok(())
159//! # }
160//! ```
161//!
162//! ## Advanced RAII Patterns
163//!
164//! Comprehensive example showing various RAII subscription patterns:
165//!
166//! ```rust
167//! use observable_property::ObservableProperty;
168//! use std::sync::Arc;
169//!
170//! # fn main() -> Result<(), observable_property::PropertyError> {
171//! // System monitoring example
172//! let cpu_usage = ObservableProperty::new(25.0f64); // percentage
173//! let memory_usage = ObservableProperty::new(1024); // MB
174//! let active_connections = ObservableProperty::new(0u32);
175//!
176//! // Conditional monitoring based on system state
177//! let high_load_monitoring = cpu_usage.get()? > 50.0;
178//!
179//! if high_load_monitoring {
180//! // Critical system monitoring - active only during high load
181//! let _cpu_critical = cpu_usage.subscribe_filtered_with_subscription(
182//! Arc::new(|old, new| {
183//! println!("🚨 Critical CPU usage: {:.1}% -> {:.1}%", old, new);
184//! }),
185//! |_, new| *new > 80.0
186//! )?;
187//!
188//! let _memory_warning = memory_usage.subscribe_filtered_with_subscription(
189//! Arc::new(|old, new| {
190//! println!("⚠️ High memory usage: {}MB -> {}MB", old, new);
191//! }),
192//! |_, new| *new > 8192 // > 8GB
193//! )?;
194//!
195//! // Simulate system load changes
196//! cpu_usage.set(85.0)?; // Would trigger critical alert
197//! memory_usage.set(9216)?; // Would trigger memory warning
198//!
199//! // All monitoring automatically stops when exiting this block
200//! }
201//!
202//! // Connection monitoring with scoped lifetime
203//! {
204//! let _connection_monitor = active_connections.subscribe_with_subscription(
205//! Arc::new(|old, new| {
206//! if new > old {
207//! println!("📈 New connections: {} -> {}", old, new);
208//! } else if new < old {
209//! println!("📉 Connections closed: {} -> {}", old, new);
210//! }
211//! })
212//! )?;
213//!
214//! active_connections.set(5)?; // Prints: "📈 New connections: 0 -> 5"
215//! active_connections.set(3)?; // Prints: "📉 Connections closed: 5 -> 3"
216//! active_connections.set(8)?; // Prints: "📈 New connections: 3 -> 8"
217//!
218//! // Connection monitoring automatically stops here
219//! }
220//!
221//! // No monitoring active anymore
222//! cpu_usage.set(95.0)?; // No output
223//! memory_usage.set(10240)?; // No output
224//! active_connections.set(15)?; // No output
225//!
226//! println!("All monitoring automatically cleaned up!");
227//! # Ok(())
228//! # }
229//! ```
230
231use std::collections::HashMap;
232use std::fmt;
233use std::mem;
234use std::panic;
235use std::sync::{Arc, RwLock};
236use std::thread;
237
/// Default cap on the number of background threads spawned for one round of
/// asynchronous observer notifications
///
/// `ObservableProperty::new` stores this value as-is, and
/// `ObservableProperty::with_max_threads` falls back to it when asked for `0` threads.
/// During `set_async()` the observer list is split into batches, one background
/// thread per batch, and the number of batches never exceeds the configured limit.
///
/// # Why a Cap Exists
///
/// - **Resource control**: thread creation stays bounded no matter how many observers exist
/// - **Performance balance**: parallelism without excessive context-switching overhead
/// - **Scalability**: behaviour stays consistent as the observer count grows
/// - **System responsiveness**: less thread contention on multi-core systems
///
/// # How Batching Works
///
/// When `set_async()` is called:
/// 1. A snapshot of all observers is collected
/// 2. The snapshot is divided into `MAX_THREADS` batches (fewer if there are fewer observers)
/// 3. Every batch is handed to its own `thread::spawn()` call
/// 4. Observers inside a batch run one after another
///
/// Example with 100 observers and `MAX_THREADS = 4`:
/// - Thread 1 (batch 1): observers 1-25
/// - Thread 2 (batch 2): observers 26-50
/// - Thread 3 (batch 3): observers 51-75
/// - Thread 4 (batch 4): observers 76-100
///
/// # Choosing a Different Limit
///
/// A per-property limit can be supplied through `ObservableProperty::with_max_threads`.
/// Rules of thumb:
/// - **CPU-bound observers**: a higher limit may improve throughput on multi-core systems
/// - **I/O-bound observers**: a higher limit can improve concurrency for network/disk work
/// - **Memory-constrained systems**: a lower limit reduces thread overhead
/// - **Real-time systems**: a lower limit reduces scheduling unpredictability
///
/// # Thread Safety
///
/// The constant only feeds the batching calculation; it has no influence on the
/// thread safety of the property itself.
const MAX_THREADS: usize = 4;
278
/// Upper bound on how many observers a single property instance may hold
///
/// Both `ObservableProperty::new` and `ObservableProperty::with_max_threads` store this
/// value as the instance's observer limit. The cap keeps observer registration from
/// growing without bound: once it is reached, further registrations are rejected with
/// an `InvalidConfiguration` error.
///
/// # Why a Cap Exists
///
/// - **Memory protection**: observer accumulation cannot grow memory without limit
/// - **Resource management**: memory usage stays predictable in long-running applications
/// - **Early detection**: forgotten unsubscriptions surface as an error instead of a slow leak
/// - **System stability**: constrained environments are protected from out-of-memory conditions
///
/// # Choosing a Different Limit
///
/// Considerations when adjusting the value:
/// - **High-frequency properties**: fewer observers keep notification overhead down
/// - **Low-frequency properties**: more observers can be supported safely
/// - **Memory-constrained systems**: a lower limit prevents memory pressure
/// - **Development/testing**: a higher limit can help with broad test coverage
///
/// # Default Value
///
/// 10,000 observers leaves generous headroom for most applications while still
/// stopping pathological cases. In practice, most properties have fewer than 100
/// observers.
///
/// # Example Scenarios
///
/// - **User sessions**: 1,000 concurrent users, each with 5 properties = ~5,000 observers
/// - **IoT devices**: 5,000 devices, each with 1-2 observers = ~10,000 observers
/// - **Monitoring system**: 100 metrics, each with 20 dashboard widgets = ~2,000 observers
const MAX_OBSERVERS: usize = 10_000;
312
313/// Errors that can occur when working with ObservableProperty
314///
315/// # Note on Lock Poisoning
316///
317/// This implementation uses **graceful degradation** for poisoned locks. When a lock
318/// is poisoned (typically due to a panic in an observer or another thread), the
319/// library automatically recovers the inner value using [`PoisonError::into_inner()`](std::sync::PoisonError::into_inner).
320///
321/// This means:
322/// - **All operations continue to work** even after a lock is poisoned
323/// - No `ReadLockError`, `WriteLockError`, or `PoisonedLock` errors will occur in practice
324/// - The system remains operational and observers continue to function
325///
326/// The error variants are kept for backward compatibility and potential future use cases,
327/// but with the current implementation, poisoned locks are transparent to users.
328///
329/// # Production Benefit
330///
331/// This approach ensures maximum availability and resilience in production systems where
332/// a misbehaving observer shouldn't bring down the entire property system.
#[derive(Debug, Clone)]
pub enum PropertyError {
    /// A read lock on the property could not be acquired
    ///
    /// **Note**: poisoned locks are recovered automatically, so this variant is
    /// unlikely to be produced in practice.
    ReadLockError {
        /// The operation that was in progress when the lock failed
        context: String,
    },
    /// A write lock on the property could not be acquired
    ///
    /// **Note**: poisoned locks are recovered automatically, so this variant is
    /// unlikely to be produced in practice.
    WriteLockError {
        /// The operation that was in progress when the lock failed
        context: String,
    },
    /// An unsubscribe request named an observer that is not registered
    ObserverNotFound {
        /// The observer ID that could not be found
        id: usize,
    },
    /// The property's lock was poisoned by a panic in another thread
    ///
    /// **Note**: the implementation recovers from poisoned locks, so this variant
    /// is not produced in practice.
    PoisonedLock,
    /// An observer function failed while it was executing
    ObserverError {
        /// What went wrong inside the observer
        reason: String,
    },
    /// No capacity is left in the thread pool used for async notifications
    ThreadPoolExhausted,
    /// The supplied configuration is not valid
    InvalidConfiguration {
        /// Why the configuration was rejected
        reason: String,
    },
}

impl fmt::Display for PropertyError {
    /// Writes the human-readable message for each error variant
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Variants with a payload interpolate it into the message.
            Self::ReadLockError { context } => {
                write!(f, "Failed to acquire read lock: {}", context)
            }
            Self::WriteLockError { context } => {
                write!(f, "Failed to acquire write lock: {}", context)
            }
            Self::ObserverNotFound { id } => write!(f, "Observer with ID {} not found", id),
            Self::ObserverError { reason } => write!(f, "Observer execution failed: {}", reason),
            Self::InvalidConfiguration { reason } => {
                write!(f, "Invalid configuration: {}", reason)
            }
            // Payload-free variants have fixed text, so no formatting pass is needed.
            Self::PoisonedLock => f.write_str(
                "Property is in a poisoned state due to a panic in another thread",
            ),
            Self::ThreadPoolExhausted => {
                f.write_str("Thread pool is exhausted and cannot spawn more observers")
            }
        }
    }
}

// Marker impl: `Debug` and `Display` above satisfy the trait's requirements.
impl std::error::Error for PropertyError {}
407
/// Shared, thread-safe callback invoked on a value change with `(&old_value, &new_value)`
pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

/// Key under which a registered observer is stored; used to refer to it later
pub type ObserverId = usize;
413
414/// A RAII guard for an observer subscription that automatically unsubscribes when dropped
415///
416/// This struct provides automatic cleanup for observer subscriptions using RAII (Resource
417/// Acquisition Is Initialization) pattern. When a `Subscription` goes out of scope, its
418/// `Drop` implementation automatically removes the associated observer from the property.
419///
420/// This eliminates the need for manual `unsubscribe()` calls and helps prevent resource
421/// leaks in scenarios where observers might otherwise be forgotten.
422///
423/// # Type Requirements
424///
425/// The generic type `T` must implement the same traits as `ObservableProperty<T>`:
426/// - `Clone`: Required for observer notifications
427/// - `Send`: Required for transferring between threads
428/// - `Sync`: Required for concurrent access from multiple threads
429/// - `'static`: Required for observer callbacks that may outlive the original scope
430///
431/// # Examples
432///
433/// ## Basic RAII Subscription
434///
435/// ```rust
436/// use observable_property::ObservableProperty;
437/// use std::sync::Arc;
438///
439/// # fn main() -> Result<(), observable_property::PropertyError> {
440/// let property = ObservableProperty::new(0);
441///
442/// {
443/// // Create subscription - observer is automatically registered
444/// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
445/// println!("Value changed: {} -> {}", old, new);
446/// }))?;
447///
448/// property.set(42)?; // Observer is called: "Value changed: 0 -> 42"
449///
450/// // When _subscription goes out of scope here, observer is automatically removed
451/// }
452///
453/// property.set(100)?; // No observer output - subscription was automatically cleaned up
454/// # Ok(())
455/// # }
456/// ```
457///
458/// ## Cross-Thread Subscription Management
459///
460/// ```rust
461/// use observable_property::ObservableProperty;
462/// use std::sync::Arc;
463/// use std::thread;
464///
465/// # fn main() -> Result<(), observable_property::PropertyError> {
466/// let property = Arc::new(ObservableProperty::new(0));
467/// let property_clone = property.clone();
468///
469/// // Create subscription in main thread
470/// let subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
471/// println!("Observed: {} -> {}", old, new);
472/// }))?;
473///
474/// // Move subscription to another thread for cleanup
475/// let handle = thread::spawn(move || {
476/// // Subscription is still active here
477/// let _ = property_clone.set(42); // Will trigger observer
478///
479/// // When subscription is dropped here (end of thread), observer is cleaned up
480/// drop(subscription);
481/// });
482///
483/// handle.join().unwrap();
484///
485/// // Observer is no longer active
486/// property.set(100)?; // No output
487/// # Ok(())
488/// # }
489/// ```
490///
491/// ## Conditional Scoped Subscriptions
492///
493/// ```rust
494/// use observable_property::ObservableProperty;
495/// use std::sync::Arc;
496///
497/// # fn main() -> Result<(), observable_property::PropertyError> {
498/// let counter = ObservableProperty::new(0);
499/// let debug_mode = true;
500///
501/// if debug_mode {
502/// let _debug_subscription = counter.subscribe_with_subscription(Arc::new(|old, new| {
503/// println!("Debug: counter {} -> {}", old, new);
504/// }))?;
505///
506/// counter.set(1)?; // Prints debug info
507/// counter.set(2)?; // Prints debug info
508///
509/// // Debug subscription automatically cleaned up when exiting if block
510/// }
511///
512/// counter.set(3)?; // No debug output (subscription was cleaned up)
513/// # Ok(())
514/// # }
515/// ```
516///
517/// # Thread Safety
518///
519/// Like `ObservableProperty` itself, `Subscription` is thread-safe. It can be safely
520/// sent between threads and the automatic cleanup will work correctly even if the
521/// subscription is dropped from a different thread than where it was created.
522pub struct Subscription<T: Clone + Send + Sync + 'static> {
523 inner: Arc<RwLock<InnerProperty<T>>>,
524 id: ObserverId,
525}
526
527impl<T: Clone + Send + Sync + 'static> std::fmt::Debug for Subscription<T> {
528 /// Debug implementation that shows the subscription ID without exposing internals
529 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
530 f.debug_struct("Subscription")
531 .field("id", &self.id)
532 .field("inner", &"[ObservableProperty]")
533 .finish()
534 }
535}
536
537impl<T: Clone + Send + Sync + 'static> Drop for Subscription<T> {
538 /// Automatically removes the associated observer when the subscription is dropped
539 ///
540 /// This implementation provides automatic cleanup by removing the observer
541 /// from the property's observer list when the `Subscription` goes out of scope.
542 ///
543 /// # Error Handling
544 ///
545 /// If the property's lock is poisoned or inaccessible during cleanup, the error
546 /// is silently ignored using the `let _ = ...` pattern. This is intentional
547 /// because:
548 /// 1. Drop implementations should not panic
549 /// 2. If the property is poisoned, it's likely unusable anyway
550 /// 3. There's no meaningful way to handle cleanup errors in a destructor
551 ///
552 /// # Thread Safety
553 ///
554 /// This method is safe to call from any thread, even if the subscription
555 /// was created on a different thread.
556 fn drop(&mut self) {
557 // Graceful degradation: always attempt to clean up, even from poisoned locks
558 match self.inner.write() {
559 Ok(mut guard) => {
560 guard.observers.remove(&self.id);
561 }
562 Err(poisoned) => {
563 // Recover from poisoned lock to ensure cleanup happens
564 let mut guard = poisoned.into_inner();
565 guard.observers.remove(&self.id);
566 }
567 }
568 }
569}
570
571/// A thread-safe observable property that notifies observers when its value changes
572///
573/// This type wraps a value of type `T` and allows multiple observers to be notified
574/// whenever the value is modified. All operations are thread-safe and can be called
575/// from multiple threads concurrently.
576///
577/// # Type Requirements
578///
579/// The generic type `T` must implement:
580/// - `Clone`: Required for returning values and passing them to observers
581/// - `Send`: Required for transferring between threads
582/// - `Sync`: Required for concurrent access from multiple threads
583/// - `'static`: Required for observer callbacks that may outlive the original scope
584///
585/// # Examples
586///
587/// ```rust
588/// use observable_property::ObservableProperty;
589/// use std::sync::Arc;
590///
591/// let property = ObservableProperty::new("initial".to_string());
592///
593/// let observer_id = property.subscribe(Arc::new(|old, new| {
594/// println!("Changed from '{}' to '{}'", old, new);
595/// })).map_err(|e| {
596/// eprintln!("Failed to subscribe: {}", e);
597/// e
598/// })?;
599///
600/// property.set("updated".to_string()).map_err(|e| {
601/// eprintln!("Failed to set value: {}", e);
602/// e
603/// })?; // Prints: Changed from 'initial' to 'updated'
604///
605/// property.unsubscribe(observer_id).map_err(|e| {
606/// eprintln!("Failed to unsubscribe: {}", e);
607/// e
608/// })?;
609/// # Ok::<(), observable_property::PropertyError>(())
610/// ```
611pub struct ObservableProperty<T> {
612 inner: Arc<RwLock<InnerProperty<T>>>,
613 max_threads: usize,
614 max_observers: usize,
615}
616
617struct InnerProperty<T> {
618 value: T,
619 observers: HashMap<ObserverId, Observer<T>>,
620 next_id: ObserverId,
621}
622
623impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
624 /// Creates a new observable property with the given initial value
625 ///
626 /// # Arguments
627 ///
628 /// * `initial_value` - The starting value for this property
629 ///
630 /// # Examples
631 ///
632 /// ```rust
633 /// use observable_property::ObservableProperty;
634 ///
635 /// let property = ObservableProperty::new(42);
636 /// match property.get() {
637 /// Ok(value) => assert_eq!(value, 42),
638 /// Err(e) => eprintln!("Failed to get property value: {}", e),
639 /// }
640 /// ```
641 pub fn new(initial_value: T) -> Self {
642 Self {
643 inner: Arc::new(RwLock::new(InnerProperty {
644 value: initial_value,
645 observers: HashMap::new(),
646 next_id: 0,
647 })),
648 max_threads: MAX_THREADS,
649 max_observers: MAX_OBSERVERS,
650 }
651 }
652
653 /// Creates a new observable property with a custom maximum thread count for async notifications
654 ///
655 /// This constructor allows you to customize the maximum number of threads used for
656 /// asynchronous observer notifications via `set_async()`. This is useful for tuning
657 /// performance based on your specific use case and system constraints.
658 ///
659 /// # Arguments
660 ///
661 /// * `initial_value` - The starting value for this property
662 /// * `max_threads` - Maximum number of threads to use for async notifications.
663 /// If 0 is provided, defaults to 4.
664 ///
665 /// # Thread Pool Behavior
666 ///
667 /// When `set_async()` is called, observers are divided into batches and each batch
668 /// runs in its own thread, up to the specified maximum. For example:
669 /// - With 100 observers and `max_threads = 4`: 4 threads with ~25 observers each
    /// - With 10 observers and `max_threads = 8`: at most 8 threads, so at least one thread handles more than one observer
671 /// - With 2 observers and `max_threads = 4`: 2 threads with 1 observer each
672 ///
673 /// # Use Cases
674 ///
675 /// ## High-Throughput Systems
676 /// ```rust
677 /// use observable_property::ObservableProperty;
678 ///
679 /// // For systems with many CPU cores and CPU-bound observers
680 /// let property = ObservableProperty::with_max_threads(0, 8);
681 /// ```
682 ///
683 /// ## Resource-Constrained Systems
684 /// ```rust
685 /// use observable_property::ObservableProperty;
686 ///
687 /// // For embedded systems or memory-constrained environments
688 /// let property = ObservableProperty::with_max_threads(42, 1);
689 /// ```
690 ///
691 /// ## I/O-Heavy Observers
692 /// ```rust
693 /// use observable_property::ObservableProperty;
694 ///
695 /// // For observers that do network/database operations
696 /// let property = ObservableProperty::with_max_threads("data".to_string(), 16);
697 /// ```
698 ///
699 /// # Performance Considerations
700 ///
701 /// - **Higher values**: Better parallelism but more thread overhead and memory usage
702 /// - **Lower values**: Less overhead but potentially slower async notifications
703 /// - **Optimal range**: Typically between 1 and 2x the number of CPU cores
704 /// - **Zero value**: Automatically uses the default value (4)
705 ///
706 /// # Thread Safety
707 ///
708 /// This setting only affects async notifications (`set_async()`). Synchronous
709 /// operations (`set()`) always execute observers sequentially regardless of this setting.
710 ///
711 /// # Examples
712 ///
713 /// ```rust
714 /// use observable_property::ObservableProperty;
715 /// use std::sync::Arc;
716 ///
717 /// // Create property with custom thread pool size
718 /// let property = ObservableProperty::with_max_threads(42, 2);
719 ///
720 /// // Subscribe observers as usual
721 /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
722 /// println!("Value changed: {} -> {}", old, new);
723 /// })).expect("Failed to create subscription");
724 ///
725 /// // Async notifications will use at most 2 threads
726 /// property.set_async(100).expect("Failed to set value asynchronously");
727 /// ```
728 pub fn with_max_threads(initial_value: T, max_threads: usize) -> Self {
729 let max_threads = if max_threads == 0 {
730 MAX_THREADS
731 } else {
732 max_threads
733 };
734 Self {
735 inner: Arc::new(RwLock::new(InnerProperty {
736 value: initial_value,
737 observers: HashMap::new(),
738 next_id: 0,
739 })),
740 max_threads,
741 max_observers: MAX_OBSERVERS,
742 }
743 }
744
745 /// Gets the current value of the property
746 ///
747 /// This method acquires a read lock, which allows multiple concurrent readers
748 /// but will block if a writer currently holds the lock.
749 ///
750 /// # Returns
751 ///
    /// `Ok(T)` containing a clone of the current value. A poisoned lock is recovered
    /// transparently, so this method currently never returns `Err(PropertyError)`.
754 ///
755 /// # Examples
756 ///
757 /// ```rust
758 /// use observable_property::ObservableProperty;
759 ///
760 /// let property = ObservableProperty::new("hello".to_string());
761 /// match property.get() {
762 /// Ok(value) => assert_eq!(value, "hello"),
763 /// Err(e) => eprintln!("Failed to get property value: {}", e),
764 /// }
765 /// ```
766 pub fn get(&self) -> Result<T, PropertyError> {
767 match self.inner.read() {
768 Ok(prop) => Ok(prop.value.clone()),
769 Err(poisoned) => {
770 // Graceful degradation: recover value from poisoned lock
771 // This allows continued operation even after a panic in another thread
772 Ok(poisoned.into_inner().value.clone())
773 }
774 }
775 }
776
777 /// Sets the property to a new value and notifies all observers
778 ///
779 /// This method will:
780 /// 1. Acquire a write lock (blocking other readers/writers)
781 /// 2. Update the value and capture a snapshot of observers
782 /// 3. Release the lock
783 /// 4. Notify all observers sequentially with the old and new values
784 ///
785 /// Observer notifications are wrapped in panic recovery to prevent one
786 /// misbehaving observer from affecting others.
787 ///
788 /// # Arguments
789 ///
790 /// * `new_value` - The new value to set
791 ///
792 /// # Returns
793 ///
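    /// `Ok(())` once the value has been stored and every observer has been invoked.
    /// A poisoned lock is recovered rather than reported, so this method currently
    /// never returns `Err(PropertyError)`.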
795 ///
796 /// # Examples
797 ///
798 /// ```rust
799 /// use observable_property::ObservableProperty;
800 /// use std::sync::Arc;
801 ///
802 /// let property = ObservableProperty::new(10);
803 ///
804 /// property.subscribe(Arc::new(|old, new| {
805 /// println!("Value changed from {} to {}", old, new);
806 /// })).map_err(|e| {
807 /// eprintln!("Failed to subscribe: {}", e);
808 /// e
809 /// })?;
810 ///
811 /// property.set(20).map_err(|e| {
812 /// eprintln!("Failed to set property value: {}", e);
813 /// e
814 /// })?; // Triggers observer notification
815 /// # Ok::<(), observable_property::PropertyError>(())
816 /// ```
817 pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
818 let (old_value, observers_snapshot) = {
819 let mut prop = match self.inner.write() {
820 Ok(guard) => guard,
821 Err(poisoned) => {
822 // Graceful degradation: recover from poisoned write lock
823 // Clear the poison flag by taking ownership of the inner value
824 poisoned.into_inner()
825 }
826 };
827
828 // Performance optimization: use mem::replace to avoid one clone operation
829 let old_value = mem::replace(&mut prop.value, new_value.clone());
830 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
831 (old_value, observers_snapshot)
832 };
833
834 for observer in observers_snapshot {
835 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
836 observer(&old_value, &new_value);
837 })) {
838 eprintln!("Observer panic: {:?}", e);
839 }
840 }
841
842 Ok(())
843 }
844
845 /// Sets the property to a new value and notifies observers asynchronously
846 ///
847 /// This method is similar to `set()` but spawns observers in background threads
848 /// for non-blocking operation. This is useful when observers might perform
849 /// time-consuming operations.
850 ///
851 /// Observers are batched into groups and each batch runs in its own thread
852 /// to limit resource usage while still providing parallelism.
853 ///
854 /// # Thread Management (Fire-and-Forget Pattern)
855 ///
856 /// **Important**: This method uses a fire-and-forget pattern. Spawned threads are
857 /// **not joined** and run independently in the background. This design is intentional
858 /// for non-blocking behavior but has important implications:
859 ///
860 /// ## Characteristics:
861 /// - ✅ **Non-blocking**: Returns immediately without waiting for observers
862 /// - ✅ **High performance**: No synchronization overhead
863 /// - ⚠️ **No completion guarantee**: Thread may still be running when method returns
864 /// - ⚠️ **No error propagation**: Observer errors are logged but not returned
865 /// - ⚠️ **Testing caveat**: May need explicit delays to observe side effects
866 /// - ⚠️ **Ordering caveat**: Multiple rapid `set_async()` calls may result in observers
867 /// receiving notifications out of order due to thread scheduling. Use `set()` if
868 /// sequential ordering is critical.
869 ///
870 /// ## Use Cases:
871 /// - **UI updates**: Fire updates without blocking the main thread
872 /// - **Logging**: Asynchronous logging that doesn't block operations
873 /// - **Metrics**: Non-critical telemetry that can be lost
874 /// - **Notifications**: Fire-and-forget alerts or messages
875 ///
876 /// ## When NOT to Use:
877 /// - **Critical operations**: Use `set()` if you need guarantees
878 /// - **Transactional updates**: Use `set()` for atomic operations
879 /// - **Sequential dependencies**: If next operation depends on observer completion
880 ///
881 /// ## Testing Considerations:
882 /// ```rust
883 /// use observable_property::ObservableProperty;
884 /// use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
885 /// use std::time::Duration;
886 ///
887 /// # fn main() -> Result<(), observable_property::PropertyError> {
888 /// let property = ObservableProperty::new(0);
889 /// let was_called = Arc::new(AtomicBool::new(false));
890 /// let flag = was_called.clone();
891 ///
892 /// property.subscribe(Arc::new(move |_, _| {
893 /// flag.store(true, Ordering::SeqCst);
894 /// }))?;
895 ///
896 /// property.set_async(42)?;
897 ///
898 /// // ⚠️ Immediate check might fail - thread may not have run yet
899 /// // assert!(was_called.load(Ordering::SeqCst)); // May fail!
900 ///
901 /// // ✅ Add a small delay to allow background thread to complete
902 /// std::thread::sleep(Duration::from_millis(10));
903 /// assert!(was_called.load(Ordering::SeqCst)); // Now reliable
904 /// # Ok(())
905 /// # }
906 /// ```
907 ///
908 /// # Arguments
909 ///
910 /// * `new_value` - The new value to set
911 ///
912 /// # Returns
913 ///
/// `Ok(())` once the new value has been stored. A poisoned lock is recovered rather than reported, so no `Err` is currently produced.
915 /// Note that this only indicates the property was updated successfully;
916 /// observer execution happens asynchronously and errors are not returned.
917 ///
918 /// # Examples
919 ///
920 /// ## Basic Usage
921 ///
922 /// ```rust
923 /// use observable_property::ObservableProperty;
924 /// use std::sync::Arc;
925 /// use std::time::Duration;
926 ///
927 /// let property = ObservableProperty::new(0);
928 ///
929 /// property.subscribe(Arc::new(|old, new| {
930 /// // This observer does slow work but won't block the caller
931 /// std::thread::sleep(Duration::from_millis(100));
932 /// println!("Slow observer: {} -> {}", old, new);
933 /// })).map_err(|e| {
934 /// eprintln!("Failed to subscribe: {}", e);
935 /// e
936 /// })?;
937 ///
938 /// // This returns immediately even though observer is slow
939 /// property.set_async(42).map_err(|e| {
940 /// eprintln!("Failed to set value asynchronously: {}", e);
941 /// e
942 /// })?;
943 ///
944 /// // Continue working immediately - observer runs in background
945 /// println!("Main thread continues without waiting");
946 /// # Ok::<(), observable_property::PropertyError>(())
947 /// ```
948 ///
949 /// ## Multiple Rapid Updates
950 ///
951 /// ```rust
952 /// use observable_property::ObservableProperty;
953 /// use std::sync::Arc;
954 ///
955 /// # fn main() -> Result<(), observable_property::PropertyError> {
956 /// let property = ObservableProperty::new(0);
957 ///
958 /// property.subscribe(Arc::new(|old, new| {
959 /// // Expensive operation (e.g., database update, API call)
960 /// println!("Processing: {} -> {}", old, new);
961 /// }))?;
962 ///
963 /// // All of these return immediately - observers run in parallel
964 /// property.set_async(1)?;
965 /// property.set_async(2)?;
966 /// property.set_async(3)?;
967 /// property.set_async(4)?;
968 /// property.set_async(5)?;
969 ///
970 /// // All observer calls are now running in background threads
971 /// # Ok(())
972 /// # }
973 /// ```
974 pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
975 let (old_value, observers_snapshot) = {
976 let mut prop = match self.inner.write() {
977 Ok(guard) => guard,
978 Err(poisoned) => {
979 // Graceful degradation: recover from poisoned write lock
980 poisoned.into_inner()
981 }
982 };
983
984 // Performance optimization: use mem::replace to avoid one clone operation
985 let old_value = mem::replace(&mut prop.value, new_value.clone());
986 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
987 (old_value, observers_snapshot)
988 };
989
990 if observers_snapshot.is_empty() {
991 return Ok(());
992 }
993
994 let observers_per_thread = observers_snapshot.len().div_ceil(self.max_threads);
995
996 // Fire-and-forget pattern: Spawn threads without joining
997 // This is intentional for non-blocking behavior. Observers run independently
998 // and the caller continues immediately without waiting for completion.
999 // Trade-offs:
1000 // ✅ Non-blocking, high performance
1001 // ⚠️ No completion guarantee, no error propagation to caller
1002 for batch in observers_snapshot.chunks(observers_per_thread) {
1003 let batch_observers = batch.to_vec();
1004 let old_val = old_value.clone();
1005 let new_val = new_value.clone();
1006
1007 thread::spawn(move || {
1008 for observer in batch_observers {
1009 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1010 observer(&old_val, &new_val);
1011 })) {
1012 eprintln!("Observer panic in batch thread: {:?}", e);
1013 }
1014 }
1015 });
1016 // Thread handle intentionally dropped - fire-and-forget pattern
1017 }
1018
1019 Ok(())
1020 }
1021
1022 /// Subscribes an observer function to be called when the property changes
1023 ///
1024 /// The observer function will be called with the old and new values whenever
1025 /// the property is modified via `set()` or `set_async()`.
1026 ///
1027 /// # Arguments
1028 ///
1029 /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
1030 ///
1031 /// # Returns
1032 ///
1033 /// `Ok(ObserverId)` containing a unique identifier for this observer,
1034 /// or `Err(PropertyError::InvalidConfiguration)` if the maximum observer limit is exceeded.
1035 ///
1036 /// # Observer Limit
1037 ///
1038 /// To prevent memory exhaustion, there is a maximum limit of observers per property
1039 /// (currently set to 10,000). If you attempt to add more observers than this limit,
1040 /// the subscription will fail with an `InvalidConfiguration` error.
1041 ///
1042 /// This protection helps prevent:
1043 /// - Memory leaks from forgotten unsubscriptions
1044 /// - Unbounded memory growth in long-running applications
1045 /// - Out-of-memory conditions in resource-constrained environments
1046 ///
1047 /// # Examples
1048 ///
1049 /// ```rust
1050 /// use observable_property::ObservableProperty;
1051 /// use std::sync::Arc;
1052 ///
1053 /// let property = ObservableProperty::new(0);
1054 ///
1055 /// let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
1056 /// println!("Property changed from {} to {}", old_value, new_value);
1057 /// })).map_err(|e| {
1058 /// eprintln!("Failed to subscribe observer: {}", e);
1059 /// e
1060 /// })?;
1061 ///
1062 /// // Later, unsubscribe using the returned ID
1063 /// property.unsubscribe(observer_id).map_err(|e| {
1064 /// eprintln!("Failed to unsubscribe observer: {}", e);
1065 /// e
1066 /// })?;
1067 /// # Ok::<(), observable_property::PropertyError>(())
1068 /// ```
1069 pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
1070 let mut prop = match self.inner.write() {
1071 Ok(guard) => guard,
1072 Err(poisoned) => {
1073 // Graceful degradation: recover from poisoned write lock
1074 poisoned.into_inner()
1075 }
1076 };
1077
1078 // Check observer limit to prevent memory exhaustion
1079 if prop.observers.len() >= self.max_observers {
1080 return Err(PropertyError::InvalidConfiguration {
1081 reason: format!(
1082 "Maximum observer limit ({}) exceeded. Current observers: {}. \
1083 Consider unsubscribing unused observers to free resources.",
1084 self.max_observers,
1085 prop.observers.len()
1086 ),
1087 });
1088 }
1089
1090 let id = prop.next_id;
1091 // Use wrapping_add to prevent overflow panics in production
1092 // After ~usize::MAX subscriptions, IDs will wrap around
1093 // This is acceptable as old observers are typically unsubscribed
1094 prop.next_id = prop.next_id.wrapping_add(1);
1095 prop.observers.insert(id, observer);
1096 Ok(id)
1097 }
1098
1099 /// Removes an observer identified by its ID
1100 ///
1101 /// # Arguments
1102 ///
1103 /// * `id` - The observer ID returned by `subscribe()`
1104 ///
1105 /// # Returns
1106 ///
1107 /// `Ok(bool)` where `true` means the observer was found and removed,
1108 /// `false` means no observer with that ID existed.
/// A poisoned lock is recovered rather than reported, so no `Err` is currently produced.
1110 ///
1111 /// # Examples
1112 ///
1113 /// ```rust
1114 /// use observable_property::ObservableProperty;
1115 /// use std::sync::Arc;
1116 ///
1117 /// let property = ObservableProperty::new(0);
1118 /// let id = property.subscribe(Arc::new(|_, _| {})).map_err(|e| {
1119 /// eprintln!("Failed to subscribe: {}", e);
1120 /// e
1121 /// })?;
1122 ///
1123 /// let was_removed = property.unsubscribe(id).map_err(|e| {
1124 /// eprintln!("Failed to unsubscribe: {}", e);
1125 /// e
1126 /// })?;
1127 /// assert!(was_removed); // Observer existed and was removed
1128 ///
1129 /// let was_removed_again = property.unsubscribe(id).map_err(|e| {
1130 /// eprintln!("Failed to unsubscribe again: {}", e);
1131 /// e
1132 /// })?;
1133 /// assert!(!was_removed_again); // Observer no longer exists
1134 /// # Ok::<(), observable_property::PropertyError>(())
1135 /// ```
1136 pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
1137 let mut prop = match self.inner.write() {
1138 Ok(guard) => guard,
1139 Err(poisoned) => {
1140 // Graceful degradation: recover from poisoned write lock
1141 poisoned.into_inner()
1142 }
1143 };
1144
1145 let was_present = prop.observers.remove(&id).is_some();
1146 Ok(was_present)
1147 }
1148
1149 /// Subscribes an observer that only gets called when a filter condition is met
1150 ///
1151 /// This is useful for observing only specific types of changes, such as
1152 /// when a value increases or crosses a threshold.
1153 ///
1154 /// # Arguments
1155 ///
1156 /// * `observer` - The observer function to call when the filter passes
1157 /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1158 ///
1159 /// # Returns
1160 ///
/// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError::InvalidConfiguration)` if the maximum observer limit is exceeded.
1162 ///
1163 /// # Examples
1164 ///
1165 /// ```rust
1166 /// use observable_property::ObservableProperty;
1167 /// use std::sync::Arc;
1168 ///
1169 /// let property = ObservableProperty::new(0);
1170 ///
1171 /// // Only notify when value increases
1172 /// let id = property.subscribe_filtered(
1173 /// Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
1174 /// |old, new| new > old
1175 /// ).map_err(|e| {
1176 /// eprintln!("Failed to subscribe filtered observer: {}", e);
1177 /// e
1178 /// })?;
1179 ///
1180 /// property.set(10).map_err(|e| {
1181 /// eprintln!("Failed to set value: {}", e);
1182 /// e
1183 /// })?; // Triggers observer (0 -> 10)
1184 /// property.set(5).map_err(|e| {
1185 /// eprintln!("Failed to set value: {}", e);
1186 /// e
1187 /// })?; // Does NOT trigger observer (10 -> 5)
1188 /// property.set(15).map_err(|e| {
1189 /// eprintln!("Failed to set value: {}", e);
1190 /// e
1191 /// })?; // Triggers observer (5 -> 15)
1192 /// # Ok::<(), observable_property::PropertyError>(())
1193 /// ```
1194 pub fn subscribe_filtered<F>(
1195 &self,
1196 observer: Observer<T>,
1197 filter: F,
1198 ) -> Result<ObserverId, PropertyError>
1199 where
1200 F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1201 {
1202 let filter = Arc::new(filter);
1203 let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
1204 if filter(old_val, new_val) {
1205 observer(old_val, new_val);
1206 }
1207 });
1208
1209 self.subscribe(filtered_observer)
1210 }
1211
1212 /// Notifies all observers with a batch of changes
1213 ///
1214 /// This method allows you to trigger observer notifications for multiple
1215 /// value changes efficiently. Unlike individual `set()` calls, this method
1216 /// acquires the observer list once and then notifies all observers with each
1217 /// change in the batch.
1218 ///
1219 /// # Performance Characteristics
1220 ///
1221 /// - **Lock optimization**: Acquires read lock only to snapshot observers, then releases it
1222 /// - **Non-blocking**: Other operations can proceed during observer notifications
1223 /// - **Panic isolation**: Individual observer panics don't affect other observers
1224 ///
1225 /// # Arguments
1226 ///
1227 /// * `changes` - A vector of tuples `(old_value, new_value)` to notify observers about
1228 ///
1229 /// # Returns
1230 ///
1231 /// `Ok(())` if successful. Observer errors are logged but don't cause the method to fail.
1232 ///
1233 /// # Examples
1234 ///
1235 /// ```rust
1236 /// use observable_property::ObservableProperty;
1237 /// use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
1238 ///
1239 /// # fn main() -> Result<(), observable_property::PropertyError> {
1240 /// let property = ObservableProperty::new(0);
1241 /// let call_count = Arc::new(AtomicUsize::new(0));
1242 /// let count_clone = call_count.clone();
1243 ///
1244 /// property.subscribe(Arc::new(move |old, new| {
1245 /// count_clone.fetch_add(1, Ordering::SeqCst);
1246 /// println!("Change: {} -> {}", old, new);
1247 /// }))?;
1248 ///
1249 /// // Notify with multiple changes at once
1250 /// property.notify_observers_batch(vec![
1251 /// (0, 10),
1252 /// (10, 20),
1253 /// (20, 30),
1254 /// ])?;
1255 ///
1256 /// assert_eq!(call_count.load(Ordering::SeqCst), 3);
1257 /// # Ok(())
1258 /// # }
1259 /// ```
1260 ///
1261 /// # Note
1262 ///
1263 /// This method does NOT update the property's actual value - it only triggers
1264 /// observer notifications. Use `set()` if you want to update the value and
1265 /// notify observers.
1266 pub fn notify_observers_batch(&self, changes: Vec<(T, T)>) -> Result<(), PropertyError> {
1267 // Acquire lock, clone observers, then release lock immediately
1268 // This prevents blocking other operations during potentially long notification process
1269 let observers_snapshot = {
1270 let prop = match self.inner.read() {
1271 Ok(guard) => guard,
1272 Err(poisoned) => {
1273 // Graceful degradation: recover from poisoned read lock
1274 poisoned.into_inner()
1275 }
1276 };
1277 prop.observers.values().cloned().collect::<Vec<_>>()
1278 }; // Lock released here
1279
1280 // Notify observers without holding the lock
1281 for (old_val, new_val) in changes {
1282 for observer in &observers_snapshot {
1283 // Wrap in panic recovery like other notification methods
1284 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1285 observer(&old_val, &new_val);
1286 })) {
1287 eprintln!("Observer panic in batch notification: {:?}", e);
1288 }
1289 }
1290 }
1291 Ok(())
1292 }
1293
1294 /// Subscribes an observer and returns a RAII guard for automatic cleanup
1295 ///
1296 /// This method is similar to `subscribe()` but returns a `Subscription` object
1297 /// that automatically removes the observer when it goes out of scope. This
1298 /// provides a more convenient and safer alternative to manual subscription
1299 /// management.
1300 ///
1301 /// # Arguments
1302 ///
1303 /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
1304 ///
1305 /// # Returns
1306 ///
1307 /// `Ok(Subscription<T>)` containing a RAII guard for the observer,
/// or `Err(PropertyError::InvalidConfiguration)` if the maximum observer limit is exceeded.
1309 ///
1310 /// # Examples
1311 ///
1312 /// ## Basic RAII Subscription
1313 ///
1314 /// ```rust
1315 /// use observable_property::ObservableProperty;
1316 /// use std::sync::Arc;
1317 ///
1318 /// # fn main() -> Result<(), observable_property::PropertyError> {
1319 /// let property = ObservableProperty::new(0);
1320 ///
1321 /// {
1322 /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1323 /// println!("Value: {} -> {}", old, new);
1324 /// }))?;
1325 ///
1326 /// property.set(42)?; // Prints: "Value: 0 -> 42"
1327 /// property.set(100)?; // Prints: "Value: 42 -> 100"
1328 ///
1329 /// // Automatic cleanup when _subscription goes out of scope
1330 /// }
1331 ///
1332 /// property.set(200)?; // No output - subscription was cleaned up
1333 /// # Ok(())
1334 /// # }
1335 /// ```
1336 ///
1337 /// ## Comparison with Manual Management
1338 ///
1339 /// ```rust
1340 /// use observable_property::ObservableProperty;
1341 /// use std::sync::Arc;
1342 ///
1343 /// # fn main() -> Result<(), observable_property::PropertyError> {
1344 /// let property = ObservableProperty::new("initial".to_string());
1345 ///
1346 /// // Method 1: Manual subscription management (traditional approach)
1347 /// let observer_id = property.subscribe(Arc::new(|old, new| {
1348 /// println!("Manual: {} -> {}", old, new);
1349 /// }))?;
1350 ///
1351 /// // Method 2: RAII subscription management (recommended)
1352 /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1353 /// println!("RAII: {} -> {}", old, new);
1354 /// }))?;
1355 ///
1356 /// // Both observers will be called
1357 /// property.set("changed".to_string())?;
1358 /// // Prints:
1359 /// // "Manual: initial -> changed"
1360 /// // "RAII: initial -> changed"
1361 ///
1362 /// // Manual cleanup required for first observer
1363 /// property.unsubscribe(observer_id)?;
1364 ///
1365 /// // Second observer (_subscription) is automatically cleaned up when
1366 /// // the variable goes out of scope - no manual intervention needed
1367 /// # Ok(())
1368 /// # }
1369 /// ```
1370 ///
1371 /// ## Error Handling with Early Returns
1372 ///
1373 /// ```rust
1374 /// use observable_property::ObservableProperty;
1375 /// use std::sync::Arc;
1376 ///
1377 /// fn process_with_monitoring(property: &ObservableProperty<i32>) -> Result<(), observable_property::PropertyError> {
1378 /// let _monitoring = property.subscribe_with_subscription(Arc::new(|old, new| {
1379 /// println!("Processing: {} -> {}", old, new);
1380 /// }))?;
1381 ///
1382 /// property.set(1)?;
1383 ///
1384 /// if property.get()? > 0 {
1385 /// return Ok(()); // Subscription automatically cleaned up on early return
1386 /// }
1387 ///
1388 /// property.set(2)?;
1389 /// Ok(()) // Subscription automatically cleaned up on normal return
1390 /// }
1391 ///
1392 /// # fn main() -> Result<(), observable_property::PropertyError> {
1393 /// let property = ObservableProperty::new(0);
1394 /// process_with_monitoring(&property)?; // Monitoring active only during function call
1395 /// property.set(99)?; // No monitoring output - subscription was cleaned up
1396 /// # Ok(())
1397 /// # }
1398 /// ```
1399 ///
1400 /// ## Multi-threaded Subscription Management
1401 ///
1402 /// ```rust
1403 /// use observable_property::ObservableProperty;
1404 /// use std::sync::Arc;
1405 /// use std::thread;
1406 ///
1407 /// # fn main() -> Result<(), observable_property::PropertyError> {
1408 /// let property = Arc::new(ObservableProperty::new(0));
1409 /// let property_clone = property.clone();
1410 ///
1411 /// let handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1412 /// let _subscription = property_clone.subscribe_with_subscription(Arc::new(|old, new| {
1413 /// println!("Thread observer: {} -> {}", old, new);
1414 /// }))?;
1415 ///
1416 /// property_clone.set(42)?; // Prints: "Thread observer: 0 -> 42"
1417 ///
1418 /// // Subscription automatically cleaned up when thread ends
1419 /// Ok(())
1420 /// });
1421 ///
1422 /// handle.join().unwrap()?;
1423 /// property.set(100)?; // No output - thread subscription was cleaned up
1424 /// # Ok(())
1425 /// # }
1426 /// ```
1427 ///
1428 /// # Use Cases
1429 ///
1430 /// This method is particularly useful in scenarios such as:
1431 /// - Temporary observers that should be active only during a specific scope
1432 /// - Error-prone code where manual cleanup might be forgotten
1433 /// - Complex control flow where multiple exit points make manual cleanup difficult
1434 /// - Resource-constrained environments where observer leaks are problematic
1435 pub fn subscribe_with_subscription(
1436 &self,
1437 observer: Observer<T>,
1438 ) -> Result<Subscription<T>, PropertyError> {
1439 let id = self.subscribe(observer)?;
1440 Ok(Subscription {
1441 inner: Arc::clone(&self.inner),
1442 id,
1443 })
1444 }
1445
1446 /// Subscribes a filtered observer and returns a RAII guard for automatic cleanup
1447 ///
1448 /// This method combines the functionality of `subscribe_filtered()` with the automatic
1449 /// cleanup benefits of `subscribe_with_subscription()`. The observer will only be
1450 /// called when the filter condition is satisfied, and it will be automatically
1451 /// unsubscribed when the returned `Subscription` goes out of scope.
1452 ///
1453 /// # Arguments
1454 ///
1455 /// * `observer` - The observer function to call when the filter passes
1456 /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1457 ///
1458 /// # Returns
1459 ///
1460 /// `Ok(Subscription<T>)` containing a RAII guard for the filtered observer,
/// or `Err(PropertyError::InvalidConfiguration)` if the maximum observer limit is exceeded.
1462 ///
1463 /// # Examples
1464 ///
1465 /// ## Basic Filtered RAII Subscription
1466 ///
1467 /// ```rust
1468 /// use observable_property::ObservableProperty;
1469 /// use std::sync::Arc;
1470 ///
1471 /// # fn main() -> Result<(), observable_property::PropertyError> {
1472 /// let counter = ObservableProperty::new(0);
1473 ///
1474 /// {
1475 /// // Monitor only increases with automatic cleanup
1476 /// let _increase_monitor = counter.subscribe_filtered_with_subscription(
1477 /// Arc::new(|old, new| {
1478 /// println!("Counter increased: {} -> {}", old, new);
1479 /// }),
1480 /// |old, new| new > old
1481 /// )?;
1482 ///
1483 /// counter.set(5)?; // Prints: "Counter increased: 0 -> 5"
1484 /// counter.set(3)?; // No output (decrease)
1485 /// counter.set(7)?; // Prints: "Counter increased: 3 -> 7"
1486 ///
1487 /// // Subscription automatically cleaned up when leaving scope
1488 /// }
1489 ///
1490 /// counter.set(10)?; // No output - subscription was cleaned up
1491 /// # Ok(())
1492 /// # }
1493 /// ```
1494 ///
1495 /// ## Multi-Condition Temperature Monitoring
1496 ///
1497 /// ```rust
1498 /// use observable_property::ObservableProperty;
1499 /// use std::sync::Arc;
1500 ///
1501 /// # fn main() -> Result<(), observable_property::PropertyError> {
1502 /// let temperature = ObservableProperty::new(20.0_f64);
1503 ///
1504 /// {
1505 /// // Create filtered subscription that only triggers for significant temperature increases
1506 /// let _heat_warning = temperature.subscribe_filtered_with_subscription(
1507 /// Arc::new(|old_temp, new_temp| {
1508 /// println!("🔥 Heat warning! Temperature rose from {:.1}°C to {:.1}°C",
1509 /// old_temp, new_temp);
1510 /// }),
1511 /// |old, new| new > old && (new - old) > 5.0 // Only trigger for increases > 5°C
1512 /// )?;
1513 ///
1514 /// // Create another filtered subscription for cooling alerts
1515 /// let _cooling_alert = temperature.subscribe_filtered_with_subscription(
1516 /// Arc::new(|old_temp, new_temp| {
1517 /// println!("❄️ Cooling alert! Temperature dropped from {:.1}°C to {:.1}°C",
1518 /// old_temp, new_temp);
1519 /// }),
1520 /// |old, new| new < old && (old - new) > 3.0 // Only trigger for decreases > 3°C
1521 /// )?;
1522 ///
1523 /// // Test the filters
1524 /// temperature.set(22.0)?; // No alerts (increase of only 2°C)
1525 /// temperature.set(28.0)?; // Heat warning triggered (increase of 6°C from 22°C)
1526 /// temperature.set(23.0)?; // Cooling alert triggered (decrease of 5°C)
1527 ///
1528 /// // Both subscriptions are automatically cleaned up when they go out of scope
1529 /// }
1530 ///
1531 /// temperature.set(35.0)?; // No alerts - subscriptions were cleaned up
1532 /// # Ok(())
1533 /// # }
1534 /// ```
1535 ///
1536 /// ## Conditional Monitoring with Complex Filters
1537 ///
1538 /// ```rust
1539 /// use observable_property::ObservableProperty;
1540 /// use std::sync::Arc;
1541 ///
1542 /// # fn main() -> Result<(), observable_property::PropertyError> {
1543 /// let stock_price = ObservableProperty::new(100.0_f64);
1544 ///
1545 /// {
1546 /// // Monitor significant price movements (> 5% change)
1547 /// let _volatility_alert = stock_price.subscribe_filtered_with_subscription(
1548 /// Arc::new(|old_price, new_price| {
1549 /// let change_percent = ((new_price - old_price) / old_price * 100.0).abs();
1550 /// println!("📈 Significant price movement: ${:.2} -> ${:.2} ({:.1}%)",
1551 /// old_price, new_price, change_percent);
1552 /// }),
1553 /// |old, new| {
1554 /// let change_percent = ((new - old) / old * 100.0).abs();
1555 /// change_percent > 5.0 // Trigger on > 5% change
1556 /// }
1557 /// )?;
1558 ///
1559 /// stock_price.set(103.0)?; // No alert (3% change)
/// stock_price.set(108.0)?; // No alert (about 4.85% change from 103, below the 5% threshold)
1561 /// stock_price.set(95.0)?; // Alert triggered (12% decrease)
1562 ///
1563 /// // Subscription automatically cleaned up when leaving scope
1564 /// }
1565 ///
1566 /// stock_price.set(200.0)?; // No alert - monitoring ended
1567 /// # Ok(())
1568 /// # }
1569 /// ```
1570 ///
1571 /// ## Cross-Thread Filtered Monitoring
1572 ///
1573 /// ```rust
1574 /// use observable_property::ObservableProperty;
1575 /// use std::sync::Arc;
1576 /// use std::thread;
1577 /// use std::time::Duration;
1578 ///
1579 /// # fn main() -> Result<(), observable_property::PropertyError> {
1580 /// let network_latency = Arc::new(ObservableProperty::new(50)); // milliseconds
1581 /// let latency_clone = network_latency.clone();
1582 ///
1583 /// let monitor_handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1584 /// // Monitor high latency in background thread with automatic cleanup
1585 /// let _high_latency_alert = latency_clone.subscribe_filtered_with_subscription(
1586 /// Arc::new(|old_ms, new_ms| {
1587 /// println!("⚠️ High latency detected: {}ms -> {}ms", old_ms, new_ms);
1588 /// }),
1589 /// |_, new| *new > 100 // Alert when latency exceeds 100ms
1590 /// )?;
1591 ///
1592 /// // Simulate monitoring for a short time
1593 /// thread::sleep(Duration::from_millis(10));
1594 ///
1595 /// // Subscription automatically cleaned up when thread ends
1596 /// Ok(())
1597 /// });
1598 ///
1599 /// // Simulate network conditions
1600 /// network_latency.set(80)?; // No alert (under threshold)
1601 /// network_latency.set(150)?; // Alert triggered in background thread
1602 ///
1603 /// monitor_handle.join().unwrap()?;
1604 /// network_latency.set(200)?; // No alert - background monitoring ended
1605 /// # Ok(())
1606 /// # }
1607 /// ```
1608 ///
1609 /// # Use Cases
1610 ///
1611 /// This method is ideal for:
1612 /// - Threshold-based monitoring with automatic cleanup
1613 /// - Temporary conditional observers in specific code blocks
1614 /// - Event-driven systems where observers should be active only during certain phases
1615 /// - Resource management scenarios where filtered observers have limited lifetimes
1616 ///
1617 /// # Performance Notes
1618 ///
1619 /// The filter function is evaluated for every property change, so it should be
1620 /// lightweight. Complex filtering logic should be optimized to avoid performance
1621 /// bottlenecks, especially in high-frequency update scenarios.
1622 pub fn subscribe_filtered_with_subscription<F>(
1623 &self,
1624 observer: Observer<T>,
1625 filter: F,
1626 ) -> Result<Subscription<T>, PropertyError>
1627 where
1628 F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1629 {
1630 let id = self.subscribe_filtered(observer, filter)?;
1631 Ok(Subscription {
1632 inner: Arc::clone(&self.inner),
1633 id,
1634 })
1635 }
1636
1637 /// Creates a new observable property with full configuration control
1638 ///
1639 /// This constructor provides complete control over the property's configuration,
1640 /// allowing you to customize both thread pool size and maximum observer count.
1641 ///
1642 /// # Arguments
1643 ///
1644 /// * `initial_value` - The starting value for this property
1645 /// * `max_threads` - Maximum threads for async notifications (0 = use default)
1646 /// * `max_observers` - Maximum number of allowed observers (0 = use default)
1647 ///
1648 /// # Examples
1649 ///
1650 /// ```rust
1651 /// use observable_property::ObservableProperty;
1652 ///
1653 /// // Create a property optimized for high-frequency updates with many observers
1654 /// let property = ObservableProperty::with_config(0, 8, 50000);
1655 /// assert_eq!(property.get().unwrap(), 0);
1656 /// ```
1657 pub fn with_config(initial_value: T, max_threads: usize, max_observers: usize) -> Self {
1658 Self {
1659 inner: Arc::new(RwLock::new(InnerProperty {
1660 value: initial_value,
1661 observers: HashMap::new(),
1662 next_id: 0,
1663 })),
1664 max_threads: if max_threads == 0 { MAX_THREADS } else { max_threads },
1665 max_observers: if max_observers == 0 { MAX_OBSERVERS } else { max_observers },
1666 }
1667 }
1668
1669 /// Returns the current number of active observers
1670 ///
1671 /// This method is useful for debugging, monitoring, and testing to verify
1672 /// that observers are being properly managed and cleaned up.
1673 ///
1674 /// # Returns
1675 ///
/// The number of currently subscribed observers. A poisoned lock is recovered, so the count is still reported in that case.
1677 ///
1678 /// # Examples
1679 ///
1680 /// ```rust
1681 /// use observable_property::ObservableProperty;
1682 /// use std::sync::Arc;
1683 ///
1684 /// # fn main() -> Result<(), observable_property::PropertyError> {
1685 /// let property = ObservableProperty::new(42);
1686 /// assert_eq!(property.observer_count(), 0);
1687 ///
1688 /// let id1 = property.subscribe(Arc::new(|_, _| {}))?;
1689 /// assert_eq!(property.observer_count(), 1);
1690 ///
1691 /// let id2 = property.subscribe(Arc::new(|_, _| {}))?;
1692 /// assert_eq!(property.observer_count(), 2);
1693 ///
1694 /// property.unsubscribe(id1)?;
1695 /// assert_eq!(property.observer_count(), 1);
1696 /// # Ok(())
1697 /// # }
1698 /// ```
1699 pub fn observer_count(&self) -> usize {
1700 match self.inner.read() {
1701 Ok(prop) => prop.observers.len(),
1702 Err(poisoned) => {
1703 // Graceful degradation: recover from poisoned lock
1704 poisoned.into_inner().observers.len()
1705 }
1706 }
1707 }
1708
1709 /// Gets the current value without Result wrapping
1710 ///
1711 /// This is a convenience method that returns `None` if the lock is poisoned
1712 /// (which shouldn't happen with graceful degradation) instead of a Result.
1713 ///
1714 /// # Returns
1715 ///
1716 /// `Some(T)` containing the current value, or `None` if somehow inaccessible.
1717 ///
1718 /// # Examples
1719 ///
1720 /// ```rust
1721 /// use observable_property::ObservableProperty;
1722 ///
1723 /// let property = ObservableProperty::new(42);
1724 /// assert_eq!(property.try_get(), Some(42));
1725 /// ```
1726 pub fn try_get(&self) -> Option<T> {
1727 self.get().ok()
1728 }
1729
1730 /// Atomically modifies the property value using a closure
1731 ///
1732 /// This method allows you to update the property based on its current value
1733 /// in a single atomic operation. The closure receives a mutable reference to
1734 /// the value and can modify it in place.
1735 ///
1736 /// # Arguments
1737 ///
1738 /// * `f` - A closure that receives `&mut T` and modifies it
1739 ///
1740 /// # Returns
1741 ///
/// `Ok(())` after the closure has run and observers have been notified. A poisoned lock is recovered rather than reported, so no `Err` is currently produced.
1743 ///
1744 /// # Examples
1745 ///
1746 /// ```rust
1747 /// use observable_property::ObservableProperty;
1748 /// use std::sync::Arc;
1749 ///
1750 /// # fn main() -> Result<(), observable_property::PropertyError> {
1751 /// let counter = ObservableProperty::new(0);
1752 ///
1753 /// counter.subscribe(Arc::new(|old, new| {
1754 /// println!("Counter: {} -> {}", old, new);
1755 /// }))?;
1756 ///
1757 /// // Increment counter atomically
1758 /// counter.modify(|value| *value += 1)?;
1759 /// assert_eq!(counter.get()?, 1);
1760 ///
1761 /// // Double the counter atomically
1762 /// counter.modify(|value| *value *= 2)?;
1763 /// assert_eq!(counter.get()?, 2);
1764 /// # Ok(())
1765 /// # }
1766 /// ```
1767 pub fn modify<F>(&self, f: F) -> Result<(), PropertyError>
1768 where
1769 F: FnOnce(&mut T),
1770 {
1771 let (old_value, new_value, observers_snapshot) = {
1772 let mut prop = match self.inner.write() {
1773 Ok(guard) => guard,
1774 Err(poisoned) => {
1775 // Graceful degradation: recover from poisoned write lock
1776 poisoned.into_inner()
1777 }
1778 };
1779
1780 let old_value = prop.value.clone();
1781 f(&mut prop.value);
1782 let new_value = prop.value.clone();
1783 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
1784 (old_value, new_value, observers_snapshot)
1785 };
1786
1787 // Notify observers with old and new values
1788 for observer in observers_snapshot {
1789 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1790 observer(&old_value, &new_value);
1791 })) {
1792 eprintln!("Observer panic in modify: {:?}", e);
1793 }
1794 }
1795
1796 Ok(())
1797 }
1798}
1799
1800impl<T: Clone + Default + Send + Sync + 'static> ObservableProperty<T> {
1801 /// Gets the current value or returns the default if inaccessible
1802 ///
1803 /// This convenience method is only available when `T` implements `Default`.
1804 /// It provides a fallback to `T::default()` if the value cannot be read.
1805 ///
1806 /// # Examples
1807 ///
1808 /// ```rust
1809 /// use observable_property::ObservableProperty;
1810 ///
1811 /// let property = ObservableProperty::new(42);
1812 /// assert_eq!(property.get_or_default(), 42);
1813 ///
1814 /// // Even if somehow inaccessible, returns default
1815 /// let empty_property: ObservableProperty<i32> = ObservableProperty::new(0);
1816 /// assert_eq!(empty_property.get_or_default(), 0);
1817 /// ```
1818 pub fn get_or_default(&self) -> T {
1819 self.get().unwrap_or_default()
1820 }
1821}
1822
1823impl<T: Clone> Clone for ObservableProperty<T> {
1824 /// Creates a new reference to the same observable property
1825 ///
1826 /// This creates a new `ObservableProperty` instance that shares the same
1827 /// underlying data with the original. Changes made through either instance
1828 /// will be visible to observers subscribed through both instances.
1829 ///
1830 /// # Examples
1831 ///
1832 /// ```rust
1833 /// use observable_property::ObservableProperty;
1834 /// use std::sync::Arc;
1835 ///
1836 /// let property1 = ObservableProperty::new(42);
1837 /// let property2 = property1.clone();
1838 ///
1839 /// property2.subscribe(Arc::new(|old, new| {
1840 /// println!("Observer on property2 saw change: {} -> {}", old, new);
1841 /// })).map_err(|e| {
1842 /// eprintln!("Failed to subscribe: {}", e);
1843 /// e
1844 /// })?;
1845 ///
1846 /// // This change through property1 will trigger the observer on property2
1847 /// property1.set(100).map_err(|e| {
1848 /// eprintln!("Failed to set value: {}", e);
1849 /// e
1850 /// })?;
1851 /// # Ok::<(), observable_property::PropertyError>(())
1852 /// ```
1853 fn clone(&self) -> Self {
1854 Self {
1855 inner: Arc::clone(&self.inner),
1856 max_threads: self.max_threads,
1857 max_observers: self.max_observers,
1858 }
1859 }
1860}
1861
1862impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
1863 /// Debug implementation that shows the current value if accessible
1864 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1865 match self.get() {
1866 Ok(value) => f
1867 .debug_struct("ObservableProperty")
1868 .field("value", &value)
1869 .field("observers_count", &"[hidden]")
1870 .field("max_threads", &self.max_threads)
1871 .field("max_observers", &self.max_observers)
1872 .finish(),
1873 Err(_) => f
1874 .debug_struct("ObservableProperty")
1875 .field("value", &"[inaccessible]")
1876 .field("observers_count", &"[hidden]")
1877 .field("max_threads", &self.max_threads)
1878 .field("max_observers", &self.max_observers)
1879 .finish(),
1880 }
1881 }
1882}
1883
1884#[cfg(test)]
1885mod tests {
1886 use super::*;
1887 use std::sync::atomic::{AtomicUsize, Ordering};
1888 use std::time::Duration;
1889
#[test]
fn test_property_creation_and_basic_operations() {
    let prop = ObservableProperty::new(42);

    // A freshly created property reports the value it was constructed with.
    let initial = prop
        .get()
        .unwrap_or_else(|e| panic!("Failed to get initial value: {}", e));
    assert_eq!(initial, 42);

    // Writing a new value must succeed...
    prop.set(100)
        .unwrap_or_else(|e| panic!("Failed to set value: {}", e));

    // ...and be visible to the next read.
    let updated = prop
        .get()
        .unwrap_or_else(|e| panic!("Failed to get updated value: {}", e));
    assert_eq!(updated, 100);
}
1910
#[test]
fn test_observer_subscription_and_notification() {
    let prop = ObservableProperty::new("initial".to_string());
    let notification_count = Arc::new(AtomicUsize::new(0));
    let last_old_value = Arc::new(RwLock::new(String::new()));
    let last_new_value = Arc::new(RwLock::new(String::new()));

    // The observer counts its invocations and remembers the last pair of
    // values it was handed.
    let observer_id = {
        let hits = Arc::clone(&notification_count);
        let seen_old = Arc::clone(&last_old_value);
        let seen_new = Arc::clone(&last_new_value);
        prop.subscribe(Arc::new(move |old, new| {
            hits.fetch_add(1, Ordering::SeqCst);
            if let Ok(mut slot) = seen_old.write() {
                *slot = old.clone();
            }
            if let Ok(mut slot) = seen_new.write() {
                *slot = new.clone();
            }
        }))
        .unwrap_or_else(|e| panic!("Failed to subscribe observer: {}", e))
    };

    // One change -> exactly one notification carrying both values.
    prop.set("changed".to_string())
        .unwrap_or_else(|e| panic!("Failed to set property value: {}", e));

    assert_eq!(notification_count.load(Ordering::SeqCst), 1);

    // The read guards are temporaries, released at the end of each statement.
    assert_eq!(
        *last_old_value
            .read()
            .unwrap_or_else(|e| panic!("Failed to read old value: {:?}", e)),
        "initial"
    );
    assert_eq!(
        *last_new_value
            .read()
            .unwrap_or_else(|e| panic!("Failed to read new value: {:?}", e)),
        "changed"
    );

    // Removing a registered observer reports that it was present.
    let was_present = prop
        .unsubscribe(observer_id)
        .unwrap_or_else(|e| panic!("Failed to unsubscribe observer: {}", e));
    assert!(was_present);

    // After unsubscription further changes must go unnoticed.
    prop.set("not_notified".to_string())
        .unwrap_or_else(|e| panic!("Failed to set property value after unsubscribe: {}", e));
    assert_eq!(notification_count.load(Ordering::SeqCst), 1);
}
1964
#[test]
fn test_filtered_observer() {
    let prop = ObservableProperty::new(0i32);
    let notification_count = Arc::new(AtomicUsize::new(0));
    let hits = Arc::clone(&notification_count);

    // The filter lets a notification through only when the value grows.
    let observer_id = prop
        .subscribe_filtered(
            Arc::new(move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .unwrap_or_else(|e| panic!("Failed to subscribe filtered observer: {}", e));

    // 0 -> 5 is an increase: notified.
    prop.set(5)
        .unwrap_or_else(|e| panic!("Failed to set property value to 5: {}", e));
    assert_eq!(notification_count.load(Ordering::SeqCst), 1);

    // 5 -> 3 is a decrease: filtered out.
    prop.set(3)
        .unwrap_or_else(|e| panic!("Failed to set property value to 3: {}", e));
    assert_eq!(notification_count.load(Ordering::SeqCst), 1);

    // 3 -> 10 is an increase again: notified.
    prop.set(10)
        .unwrap_or_else(|e| panic!("Failed to set property value to 10: {}", e));
    assert_eq!(notification_count.load(Ordering::SeqCst), 2);

    if let Err(e) = prop.unsubscribe(observer_id) {
        panic!("Failed to unsubscribe filtered observer: {}", e);
    }
}
2005
#[test]
fn test_thread_safety_concurrent_reads() {
    let prop = Arc::new(ObservableProperty::new(42i32));
    let num_threads = 10;
    let reads_per_thread = 100;

    // Start every reader first so the reads genuinely overlap.
    let mut workers = Vec::new();
    for _ in 0..num_threads {
        let reader = Arc::clone(&prop);
        workers.push(thread::spawn(move || {
            for _ in 0..reads_per_thread {
                let value = reader
                    .get()
                    .unwrap_or_else(|e| panic!("Failed to read property value: {}", e));
                assert_eq!(value, 42);
                thread::sleep(Duration::from_millis(1));
            }
        }));
    }

    // A panic inside any reader surfaces here as a join error.
    for worker in workers {
        worker
            .join()
            .unwrap_or_else(|e| panic!("Thread failed to complete: {:?}", e));
    }
}
2033
#[test]
fn test_async_set_performance() {
    let prop = ObservableProperty::new(0i32);
    let slow_observer_count = Arc::new(AtomicUsize::new(0));
    let finished = Arc::clone(&slow_observer_count);

    // An observer that takes 50 ms per notification.
    let _id = prop
        .subscribe(Arc::new(move |_, _| {
            thread::sleep(Duration::from_millis(50));
            finished.fetch_add(1, Ordering::SeqCst);
        }))
        .unwrap_or_else(|e| panic!("Failed to subscribe slow observer: {}", e));

    // Synchronous set waits for the slow observer.
    let sync_started = std::time::Instant::now();
    prop.set(1)
        .unwrap_or_else(|e| panic!("Failed to set property value synchronously: {}", e));
    let sync_duration = sync_started.elapsed();

    // Asynchronous set is expected to return without waiting for it.
    let async_started = std::time::Instant::now();
    prop.set_async(2)
        .unwrap_or_else(|e| panic!("Failed to set property value asynchronously: {}", e));
    let async_duration = async_started.elapsed();

    // Async should be much faster than sync
    assert!(async_duration < sync_duration);
    assert!(async_duration.as_millis() < 10); // Should be very fast

    // Give the background notification time to finish.
    thread::sleep(Duration::from_millis(100));

    // One call from the sync set plus one from the async set.
    assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
}
2073
#[test]
fn test_lock_poisoning() {
    let prop = Arc::new(ObservableProperty::new(0));
    let poisoner = Arc::clone(&prop);

    // Panicking while the write guard is alive poisons the inner lock.
    let poison_thread = thread::spawn(move || {
        let _guard = poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for poisoning test");
        panic!("Deliberate panic to poison the lock");
    });

    // The thread panics by design; its join result is irrelevant.
    let _ = poison_thread.join();

    // Each operation below is expected to succeed on the poisoned lock.

    // Reading still yields the original value.
    let recovered = prop.get().unwrap_or_else(|e| {
        panic!("get() should succeed with graceful degradation, got error: {:?}", e)
    });
    assert_eq!(recovered, 0);

    // Writing still works...
    if let Err(e) = prop.set(42) {
        panic!("set() should succeed with graceful degradation, got error: {:?}", e);
    }

    // ...and the write is observable.
    assert_eq!(prop.get().expect("Failed to get value after set"), 42);

    // Subscribing still works as well.
    if let Err(e) = prop.subscribe(Arc::new(|_, _| {})) {
        panic!("subscribe() should succeed with graceful degradation, got error: {:?}", e);
    }
}
2114
#[test]
fn test_observer_panic_isolation() {
    let prop = ObservableProperty::new(0);
    let call_counts = Arc::new(AtomicUsize::new(0));
    let calls = || call_counts.load(Ordering::SeqCst);

    // An observer that always panics...
    let panic_observer_id = prop
        .subscribe(Arc::new(|_, _| {
            panic!("This observer deliberately panics");
        }))
        .expect("Failed to subscribe panic observer");

    // ...registered alongside a well-behaved one that just counts calls.
    let tally = Arc::clone(&call_counts);
    let normal_observer_id = prop
        .subscribe(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe normal observer");

    // The set itself must not propagate the observer's panic.
    prop.set(42).expect("Failed to set property value");

    // The well-behaved observer was still notified.
    assert_eq!(calls(), 1);

    // Clean up
    prop.unsubscribe(panic_observer_id)
        .expect("Failed to unsubscribe panic observer");
    prop.unsubscribe(normal_observer_id)
        .expect("Failed to unsubscribe normal observer");
}
2145
#[test]
fn test_unsubscribe_nonexistent_observer() {
    let property = ObservableProperty::new(0);

    // Register one real observer so there is a known-valid ID to work from.
    let valid_id = property
        .subscribe(Arc::new(|_, _| {}))
        .expect("Failed to subscribe test observer");

    // An ID far beyond the only one handed out so far cannot be registered.
    let nonexistent_id = valid_id + 1000;

    // Unsubscribing it is not an error; it just reports "was not present".
    let was_present = property
        .unsubscribe(nonexistent_id)
        .unwrap_or_else(|e| panic!("Unsubscribe returned error: {:?}", e));
    assert!(
        !was_present,
        "Unsubscribe should return false for nonexistent ID"
    );

    // The first removal of the real observer succeeds...
    property
        .unsubscribe(valid_id)
        .expect("Failed first unsubscribe");

    // ...and a second removal of the same ID reports false.
    let removed_again = property
        .unsubscribe(valid_id)
        .expect("Failed second unsubscribe");
    assert!(!removed_again, "Second unsubscribe should return false");
}
2173
#[test]
fn test_observer_id_wraparound() {
    let prop = ObservableProperty::new(0);

    // Three back-to-back subscriptions should receive consecutive IDs.
    let first = prop
        .subscribe(Arc::new(|_, _| {}))
        .expect("Failed to subscribe observer 1");
    let second = prop
        .subscribe(Arc::new(|_, _| {}))
        .expect("Failed to subscribe observer 2");
    let third = prop
        .subscribe(Arc::new(|_, _| {}))
        .expect("Failed to subscribe observer 3");

    // Strictly increasing...
    assert!(second > first, "Observer IDs should increment");
    assert!(third > second, "Observer IDs should continue incrementing");
    // ...in steps of exactly one.
    assert_eq!(second, first + 1, "Observer IDs should increment by 1");
    assert_eq!(third, second + 1, "Observer IDs should increment by 1");

    // Clean up
    prop.unsubscribe(first)
        .expect("Failed to unsubscribe observer 1");
    prop.unsubscribe(second)
        .expect("Failed to unsubscribe observer 2");
    prop.unsubscribe(third)
        .expect("Failed to unsubscribe observer 3");
}
2193
#[test]
fn test_concurrent_subscribe_unsubscribe() {
    let prop = Arc::new(ObservableProperty::new(0));
    let num_threads = 8;
    let operations_per_thread = 100;

    let mut workers = Vec::new();
    for thread_id in 0..num_threads {
        let shared = Arc::clone(&prop);
        workers.push(thread::spawn(move || {
            // IDs this thread has registered and not yet removed.
            let mut live_ids = Vec::new();

            for i in 0..operations_per_thread {
                let observer_id = shared
                    .subscribe(Arc::new(move |old, new| {
                        // Do some work to simulate real observer
                        let _ = thread_id + i + old + new;
                    }))
                    .expect("Subscribe should succeed");

                live_ids.push(observer_id);

                // Every tenth iteration, remove one of this thread's observers.
                if i % 10 == 0 && !live_ids.is_empty() {
                    let position = i % live_ids.len();
                    let victim = live_ids.remove(position);
                    shared
                        .unsubscribe(victim)
                        .expect("Unsubscribe should succeed");
                }
            }

            // Remove whatever this thread still has registered.
            for id in live_ids {
                shared
                    .unsubscribe(id)
                    .expect("Final cleanup should succeed");
            }
        }));
    }

    // Wait for all threads to complete
    for worker in workers {
        worker.join().expect("Thread should complete successfully");
    }

    // The property must still accept writes afterwards.
    prop.set(42)
        .expect("Property should still work after concurrent operations");
}
2246
#[test]
fn test_multiple_observer_panics_isolation() {
    let prop = ObservableProperty::new(0);
    let successful_calls = Arc::new(AtomicUsize::new(0));

    // Two observers that always panic.
    let _panic_id1 = prop
        .subscribe(Arc::new(|_, _| {
            panic!("First panic observer");
        }))
        .expect("Failed to subscribe first panic observer");

    let _panic_id2 = prop
        .subscribe(Arc::new(|_, _| {
            panic!("Second panic observer");
        }))
        .expect("Failed to subscribe second panic observer");

    // Two well-behaved observers sharing one counter.
    let first_tally = Arc::clone(&successful_calls);
    let _success_id1 = prop
        .subscribe(Arc::new(move |_, _| {
            first_tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe first success observer");

    let second_tally = Arc::clone(&successful_calls);
    let _success_id2 = prop
        .subscribe(Arc::new(move |_, _| {
            second_tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe second success observer");

    // Trigger all observers
    prop.set(42)
        .expect("Failed to set property value for panic isolation test");

    // Neither panic may prevent the two counting observers from running.
    assert_eq!(successful_calls.load(Ordering::SeqCst), 2);
}
2286
#[test]
fn test_async_observer_panic_isolation() {
    let prop = ObservableProperty::new(0);
    let successful_calls = Arc::new(AtomicUsize::new(0));

    // An observer that always panics.
    let _panic_id = prop
        .subscribe(Arc::new(|_, _| {
            panic!("Async panic observer");
        }))
        .expect("Failed to subscribe async panic observer");

    // A well-behaved observer that counts its calls.
    let tally = Arc::clone(&successful_calls);
    let _success_id = prop
        .subscribe(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe async success observer");

    // Notify through the asynchronous path.
    prop.set_async(42)
        .expect("Failed to set property value asynchronously");

    // Give the background notification time to finish.
    thread::sleep(Duration::from_millis(100));

    // The counting observer ran even though the other one panicked.
    assert_eq!(successful_calls.load(Ordering::SeqCst), 1);
}
2316
#[test]
fn test_very_large_observer_count() {
    let prop = ObservableProperty::new(0);
    let total_calls = Arc::new(AtomicUsize::new(0));
    // Number of notifications that carried an unexpected (old, new) pair.
    let wrong_values = Arc::new(AtomicUsize::new(0));
    let observer_count = 1000;

    // Subscribe many observers
    let mut observer_ids = Vec::with_capacity(observer_count);
    for _ in 0..observer_count {
        let calls = Arc::clone(&total_calls);
        let mismatches = Arc::clone(&wrong_values);
        let id = prop
            .subscribe(Arc::new(move |old, new| {
                calls.fetch_add(1, Ordering::SeqCst);
                // Every observer sees the same transition: 0 -> observer_count.
                // Record mismatches instead of asserting here, because a panic
                // raised inside an observer does not reach this test.
                if *old != 0 || *new != observer_count {
                    mismatches.fetch_add(1, Ordering::SeqCst);
                }
            }))
            .expect("Failed to subscribe large observer count test observer");
        observer_ids.push(id);
    }

    // Trigger all observers
    prop.set(observer_count)
        .expect("Failed to set property value for large observer count test");

    // All observers should have been called
    assert_eq!(total_calls.load(Ordering::SeqCst), observer_count);

    // ...and each of them with the expected old and new values.
    assert_eq!(
        wrong_values.load(Ordering::SeqCst),
        0,
        "Every observer should receive old == 0 and new == observer_count"
    );

    // Clean up
    for id in observer_ids {
        prop.unsubscribe(id)
            .expect("Failed to unsubscribe observer in large count test");
    }
}
2349
#[test]
fn test_observer_with_mutable_state() {
    let prop = ObservableProperty::new(0);
    let call_history = Arc::new(RwLock::new(Vec::new()));

    // The observer appends every (old, new) pair it receives to a shared log.
    let log = Arc::clone(&call_history);
    let observer_id = prop
        .subscribe(Arc::new(move |old, new| {
            if let Ok(mut entries) = log.write() {
                entries.push((*old, *new));
            }
        }))
        .expect("Failed to subscribe mutable state observer");

    // Make several changes
    prop.set(1).expect("Failed to set property to 1");
    prop.set(2).expect("Failed to set property to 2");
    prop.set(3).expect("Failed to set property to 3");

    // The log must contain exactly the three transitions, in order.
    let recorded = call_history.read().expect("Failed to read call history");
    assert_eq!(*recorded, vec![(0, 1), (1, 2), (2, 3)]);

    prop.unsubscribe(observer_id)
        .expect("Failed to unsubscribe mutable state observer");
}
2378
#[test]
fn test_subscription_automatic_cleanup() {
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));
    let calls = || call_count.load(Ordering::SeqCst);

    // The guard lives only for the duration of this inner scope.
    {
        let tally = Arc::clone(&call_count);
        let _subscription = prop
            .subscribe_with_subscription(Arc::new(move |_, _| {
                tally.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create subscription for automatic cleanup test");

        // While the guard is alive the observer is notified.
        prop.set(1)
            .expect("Failed to set property value in subscription test");
        assert_eq!(calls(), 1);

        // `_subscription` is dropped at the closing brace below.
    }

    // With the guard gone, a further change must not reach the observer.
    prop.set(2)
        .expect("Failed to set property value after subscription dropped");
    assert_eq!(calls(), 1); // No additional calls
}
2404
#[test]
fn test_subscription_explicit_drop() {
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));
    let calls = || call_count.load(Ordering::SeqCst);

    let tally = Arc::clone(&call_count);
    let subscription = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for explicit drop test");

    // While the guard exists the observer is notified.
    prop.set(1)
        .expect("Failed to set property value before explicit drop");
    assert_eq!(calls(), 1);

    // Dropping the guard by hand ends the subscription immediately.
    drop(subscription);

    // No further notifications arrive.
    prop.set(2)
        .expect("Failed to set property value after explicit drop");
    assert_eq!(calls(), 1);
}
2428
#[test]
fn test_multiple_subscriptions_with_cleanup() {
    let prop = ObservableProperty::new(0);
    let call_count1 = Arc::new(AtomicUsize::new(0));
    let call_count2 = Arc::new(AtomicUsize::new(0));
    let call_count3 = Arc::new(AtomicUsize::new(0));

    // Current value of all three counters, in subscription order.
    let snapshot = || {
        [
            call_count1.load(Ordering::SeqCst),
            call_count2.load(Ordering::SeqCst),
            call_count3.load(Ordering::SeqCst),
        ]
    };

    let tally1 = Arc::clone(&call_count1);
    let tally2 = Arc::clone(&call_count2);
    let tally3 = Arc::clone(&call_count3);

    let subscription1 = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            tally1.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create first subscription");

    let subscription2 = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            tally2.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create second subscription");

    let subscription3 = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            tally3.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create third subscription");

    // With all three guards alive every observer is notified.
    prop.set(1)
        .expect("Failed to set property value with all subscriptions");
    assert_eq!(snapshot(), [1, 1, 1]);

    // Drop second subscription
    drop(subscription2);

    // Only the first and third counters advance now.
    prop.set(2)
        .expect("Failed to set property value with partial subscriptions");
    assert_eq!(snapshot(), [2, 1, 2]);

    // Drop remaining subscriptions
    drop(subscription1);
    drop(subscription3);

    // Nothing is subscribed any more, so no counter moves.
    prop.set(3)
        .expect("Failed to set property value with no subscriptions");
    assert_eq!(snapshot(), [2, 1, 2]);
}
2483
#[test]
fn test_subscription_drop_with_poisoned_lock() {
    let prop = Arc::new(ObservableProperty::new(0));
    let poisoner = Arc::clone(&prop);

    // Register an observer through a subscription guard.
    let call_count = Arc::new(AtomicUsize::new(0));
    let tally = Arc::clone(&call_count);
    let subscription = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for poisoned lock test");

    // Panicking while the write guard is alive poisons the inner lock.
    let poison_thread = thread::spawn(move || {
        let _guard = poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for poisoning test");
        panic!("Deliberate panic to poison the lock");
    });
    let _ = poison_thread.join(); // Ignore the panic result

    // Dropping the guard must not panic even though the lock is poisoned.
    drop(subscription);

    // Reaching this point without a panic is the success criterion.
}
2511
#[test]
fn test_subscription_vs_manual_unsubscribe() {
    let prop = ObservableProperty::new(0);
    let auto_count = Arc::new(AtomicUsize::new(0));
    let manual_count = Arc::new(AtomicUsize::new(0));

    // (manual, automatic) call counts at the moment of the call.
    let counts = || {
        (
            manual_count.load(Ordering::SeqCst),
            auto_count.load(Ordering::SeqCst),
        )
    };

    // Observer managed by ID and removed explicitly later on.
    let manual_tally = Arc::clone(&manual_count);
    let manual_id = prop
        .subscribe(Arc::new(move |_, _| {
            manual_tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create manual subscription");

    // Observer managed by a guard that lives until the end of the test.
    let auto_tally = Arc::clone(&auto_count);
    let _auto_subscription = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            auto_tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create automatic subscription");

    // Both should be active
    prop.set(1)
        .expect("Failed to set property value with both subscriptions");
    assert_eq!(counts(), (1, 1));

    // Manual unsubscribe
    prop.unsubscribe(manual_id)
        .expect("Failed to manually unsubscribe");

    // Only the guard-managed observer is notified from here on.
    prop.set(2)
        .expect("Failed to set property value after manual unsubscribe");
    assert_eq!(counts(), (1, 2));

    // `_auto_subscription` is dropped at the end of this function.
}
2549
#[test]
fn test_subscribe_with_subscription_error_handling() {
    let prop = Arc::new(ObservableProperty::new(0));
    let poisoner = Arc::clone(&prop);

    // Panicking while the write guard is alive poisons the inner lock.
    let poison_thread = thread::spawn(move || {
        let _guard = poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for poisoning test");
        panic!("Deliberate panic to poison the lock");
    });
    let _ = poison_thread.join();

    // Subscribing on the poisoned lock is expected to succeed regardless.
    let outcome = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
    assert!(
        outcome.is_ok(),
        "subscribe_with_subscription should succeed with graceful degradation"
    );
}
2566
#[test]
fn test_subscription_with_property_cloning() {
    let prop1 = ObservableProperty::new(0);
    let prop2 = prop1.clone();
    let call_count = Arc::new(AtomicUsize::new(0));
    let calls = || call_count.load(Ordering::SeqCst);

    // The observer is registered through the first handle only.
    let tally = Arc::clone(&call_count);
    let _subscription = prop1
        .subscribe_with_subscription(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for cloned property test");

    // A write through the cloned handle reaches that observer...
    prop2
        .set(42)
        .expect("Failed to set property value through prop2");
    assert_eq!(calls(), 1);

    // ...and so does a write through the original handle.
    prop1
        .set(100)
        .expect("Failed to set property value through prop1");
    assert_eq!(calls(), 2);
}
2589
#[test]
fn test_subscription_in_conditional_blocks() {
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));
    let calls = || call_count.load(Ordering::SeqCst);

    let should_subscribe = true;

    if should_subscribe {
        let tally = Arc::clone(&call_count);
        let _subscription = prop
            .subscribe_with_subscription(Arc::new(move |_, _| {
                tally.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create subscription in conditional block");

        // Inside the branch the guard is alive and the observer is notified.
        prop.set(1)
            .expect("Failed to set property value in conditional block");
        assert_eq!(calls(), 1);

        // `_subscription` is dropped at the end of this branch.
    }

    // Outside the branch the observer no longer receives changes.
    prop.set(2)
        .expect("Failed to set property value after conditional block");
    assert_eq!(calls(), 1);
}
2616
#[test]
fn test_subscription_with_early_return() {
    // Subscribes a counting observer via a guard, then either returns right
    // after the first change or continues with a second one. In both cases
    // the guard is a local and is dropped when the function returns.
    fn test_function(
        prop: &ObservableProperty<i32>,
        should_return_early: bool,
    ) -> Result<(), PropertyError> {
        let call_count = Arc::new(AtomicUsize::new(0));
        let count = call_count.clone();

        let _subscription = prop.subscribe_with_subscription(Arc::new(move |_, _| {
            count.fetch_add(1, Ordering::SeqCst);
        }))?;

        prop.set(1)?;
        assert_eq!(call_count.load(Ordering::SeqCst), 1);

        if should_return_early {
            return Ok(()); // Subscription should be cleaned up here
        }

        prop.set(2)?;
        assert_eq!(call_count.load(Ordering::SeqCst), 2);

        Ok(())
        // Subscription cleaned up when function exits normally
    }

    let prop = ObservableProperty::new(0);

    // Test early return
    test_function(&prop, true).expect("Failed to test early return");

    // Verify observer is no longer active after early return: the guard was
    // the only subscription, so no observer may remain registered.
    assert_eq!(
        prop.observer_count(),
        0,
        "Observer should be removed after early return"
    );
    prop.set(10)
        .expect("Failed to set property value after early return");

    // Test normal exit
    test_function(&prop, false).expect("Failed to test normal exit");

    // Verify observer is no longer active after normal exit
    assert_eq!(
        prop.observer_count(),
        0,
        "Observer should be removed after normal exit"
    );
    prop.set(20)
        .expect("Failed to set property value after normal exit");
}
2658
#[test]
fn test_subscription_move_semantics() {
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));
    let calls = || call_count.load(Ordering::SeqCst);

    let tally = Arc::clone(&call_count);
    let subscription = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for move semantics test");

    // The observer is notified while the guard sits in its first binding.
    prop.set(1)
        .expect("Failed to set property value before move");
    assert_eq!(calls(), 1);

    // Moving the guard to another binding must not end the subscription.
    let moved_subscription = subscription;

    prop.set(2)
        .expect("Failed to set property value after move");
    assert_eq!(calls(), 2);

    // Dropping the guard from its new binding ends the subscription.
    drop(moved_subscription);

    prop.set(3)
        .expect("Failed to set property value after moved subscription drop");
    assert_eq!(calls(), 2);
}
2689
/// A filtered subscription only fires for changes accepted by its filter and
/// stops firing once its guard has been dropped.
#[test]
fn test_filtered_subscription_automatic_cleanup() {
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));

    let counter = Arc::clone(&call_count);
    let increase_only = prop
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                counter.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old, // Only trigger on increases
        )
        .expect("Failed to create filtered subscription");

    // (new value, expected cumulative calls, failure message)
    let steps = [
        // 0 -> 5 is an increase: fires
        (5, 1, "Failed to set property value to 5 in filtered test"),
        // 5 -> 3 is a decrease: filtered out
        (3, 1, "Failed to set property value to 3 in filtered test"),
        // 3 -> 10 is an increase: fires
        (10, 2, "Failed to set property value to 10 in filtered test"),
    ];
    for &(value, expected_calls, failure) in steps.iter() {
        prop.set(value).expect(failure);
        assert_eq!(call_count.load(Ordering::SeqCst), expected_calls);
    }

    // Releasing the guard unsubscribes the observer
    drop(increase_only);

    // An increase after cleanup must go unnoticed
    prop.set(20).expect("Failed to set property value after filtered subscription cleanup");
    assert_eq!(call_count.load(Ordering::SeqCst), 2);
}
2725
/// Three independently filtered observers (increase, decrease, change larger
/// than 5) on one property each fire only for the changes their filter accepts.
#[test]
fn test_multiple_filtered_subscriptions() {
    let prop = ObservableProperty::new(10);
    let increase_count = Arc::new(AtomicUsize::new(0));
    let decrease_count = Arc::new(AtomicUsize::new(0));
    let significant_change_count = Arc::new(AtomicUsize::new(0));

    // Snapshot of (increase, decrease, significant) call totals
    let totals = || {
        (
            increase_count.load(Ordering::SeqCst),
            decrease_count.load(Ordering::SeqCst),
            significant_change_count.load(Ordering::SeqCst),
        )
    };

    let on_increase = Arc::clone(&increase_count);
    let _increase_sub = prop
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                on_increase.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .expect("Failed to create increase subscription");

    let on_decrease = Arc::clone(&decrease_count);
    let _decrease_sub = prop
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                on_decrease.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new < old,
        )
        .expect("Failed to create decrease subscription");

    let on_significant = Arc::clone(&significant_change_count);
    let _significant_sub = prop
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                on_significant.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| ((*new as i32) - (*old as i32)).abs() > 5,
        )
        .expect("Failed to create significant change subscription");

    // 10 -> 15 (+5): increase only; a delta of exactly 5 is not significant
    prop.set(15).expect("Failed to set property to 15 in multiple filtered test");
    assert_eq!(totals(), (1, 0, 0));

    // 15 -> 25 (+10): increase and significant
    prop.set(25).expect("Failed to set property to 25 in multiple filtered test");
    assert_eq!(totals(), (2, 0, 1));

    // 25 -> 5 (-20): decrease and significant
    prop.set(5).expect("Failed to set property to 5 in multiple filtered test");
    assert_eq!(totals(), (2, 1, 2));

    // 5 -> 3 (-2): decrease only
    prop.set(3).expect("Failed to set property to 3 in multiple filtered test");
    assert_eq!(totals(), (2, 2, 2));

    // The three guards are released when they go out of scope
}
2790
/// Exercises a two-condition filter on an `f64` property: the observer fires
/// only when the value moves into a different integer bucket (by `floor`) and
/// the absolute change exceeds 0.5.
#[test]
fn test_filtered_subscription_complex_filter() {
    let prop = ObservableProperty::new(0.0f64);
    let call_count = Arc::new(AtomicUsize::new(0));
    let values_received = Arc::new(RwLock::new(Vec::new()));

    let counter = Arc::clone(&call_count);
    let sink = Arc::clone(&values_received);
    let _subscription = prop
        .subscribe_filtered_with_subscription(
            Arc::new(move |old, new| {
                counter.fetch_add(1, Ordering::SeqCst);
                // Record the transition; a failed lock is silently skipped
                if let Ok(mut recorded) = sink.write() {
                    recorded.push((*old, *new));
                }
            }),
            |old, new| {
                let previous_bucket = old.floor() as i32;
                let current_bucket = new.floor() as i32;
                let crossed_boundary = previous_bucket != current_bucket;
                crossed_boundary && (new - old).abs() > 0.5_f64
            },
        )
        .expect("Failed to create complex filtered subscription");

    let calls = || call_count.load(Ordering::SeqCst);

    // 0.0 -> 0.3 -> 0.7 stays inside bucket 0: no notification
    prop.set(0.3).expect("Failed to set property to 0.3 in complex filter test");
    prop.set(0.7).expect("Failed to set property to 0.7 in complex filter test");
    assert_eq!(calls(), 0);

    // 0.7 -> 1.3 crosses into bucket 1 with a delta of 0.6: notification
    prop.set(1.3).expect("Failed to set property to 1.3 in complex filter test");
    assert_eq!(calls(), 1);

    // 1.3 -> 1.9 stays in bucket 1; 1.9 -> 2.1 crosses but only by 0.2: no notification
    prop.set(1.9).expect("Failed to set property to 1.9 in complex filter test");
    prop.set(2.1).expect("Failed to set property to 2.1 in complex filter test");
    assert_eq!(calls(), 1);

    // 2.1 -> 3.5 crosses with a delta of 1.4: notification
    prop.set(3.5).expect("Failed to set property to 3.5 in complex filter test");
    assert_eq!(calls(), 2);

    // Exactly the two accepted transitions were recorded, in order
    let recorded = values_received.read().expect("Failed to read values in complex filter test");
    assert_eq!(recorded.len(), 2);
    assert_eq!(recorded[0], (0.7, 1.3));
    assert_eq!(recorded[1], (2.1, 3.5));
}
2841
/// After the property's inner lock has been poisoned by a panicking thread,
/// creating a filtered subscription is still expected to succeed.
#[test]
fn test_filtered_subscription_error_handling() {
    let prop = Arc::new(ObservableProperty::new(0));

    // Poison the lock: panic on another thread while holding the write guard
    let victim = Arc::clone(&prop);
    let _ = thread::spawn(move || {
        let _guard = victim.inner.write().expect("Failed to acquire write lock for filtered subscription poison test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    // Subscribing must still work despite the poisoned lock
    let outcome = prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
    assert!(outcome.is_ok(), "subscribe_filtered_with_subscription should succeed with graceful degradation");
}
2858
/// Compares an id-based filtered subscription (`subscribe_filtered` +
/// `unsubscribe`) with a guard-based one using the same "increase only"
/// filter: both must fire identically until the manual one is unsubscribed.
#[test]
fn test_filtered_subscription_vs_manual_filtered() {
    let prop = ObservableProperty::new(0);
    let auto_count = Arc::new(AtomicUsize::new(0));
    let manual_count = Arc::new(AtomicUsize::new(0));

    // Snapshot of (manual, auto) call totals
    let totals = || {
        (
            manual_count.load(Ordering::SeqCst),
            auto_count.load(Ordering::SeqCst),
        )
    };

    // Id-based subscription that has to be removed explicitly
    let manual_counter = Arc::clone(&manual_count);
    let manual_id = prop
        .subscribe_filtered(
            Arc::new(move |_, _| {
                manual_counter.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .expect("Failed to create manual filtered subscription");

    // Guard-based subscription that is removed on drop
    let auto_counter = Arc::clone(&auto_count);
    let _auto_subscription = prop
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                auto_counter.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .expect("Failed to create automatic filtered subscription");

    // 0 -> 5: increase, both fire
    prop.set(5).expect("Failed to set property to 5 in filtered vs manual test");
    assert_eq!(totals(), (1, 1));

    // 5 -> 3: decrease, neither fires
    prop.set(3).expect("Failed to set property to 3 in filtered vs manual test");
    assert_eq!(totals(), (1, 1));

    // 3 -> 10: increase, both fire again
    prop.set(10).expect("Failed to set property to 10 in filtered vs manual test");
    assert_eq!(totals(), (2, 2));

    // Remove the id-based observer by hand
    prop.unsubscribe(manual_id).expect("Failed to unsubscribe manual filtered observer");

    // 10 -> 15: only the guard-based observer is left to fire
    prop.set(15).expect("Failed to set property to 15 after manual cleanup");
    assert_eq!(totals(), (2, 3));

    // The guard-based subscription is released at end of scope
}
2912
/// Smoke test for a filter that panics on one specific value (42): setting
/// that value, and a normal value afterwards, must both return `Ok`.
///
/// Only the first notification count is asserted; what happens to the count
/// around the panicking call is deliberately left unchecked.
#[test]
fn test_filtered_subscription_with_panicking_filter() {
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));

    let counter = Arc::clone(&call_count);
    let _subscription = prop
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                counter.fetch_add(1, Ordering::SeqCst);
            }),
            |_, new| {
                // Every value is accepted except 42, which blows up the filter
                if *new == 42 {
                    panic!("Filter panic on 42");
                }
                true
            },
        )
        .expect("Failed to create panicking filter subscription");

    // An ordinary value passes the filter and reaches the observer
    prop.set(1).expect("Failed to set property to 1 in panicking filter test");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);

    // The value that makes the filter panic must still be settable;
    // whether the observer runs for it is implementation-defined
    prop.set(42).expect("Failed to set property to 42 in panicking filter test");

    // The property keeps accepting values after the filter panic.
    // The resulting call count is intentionally not asserted.
    prop.set(2).expect("Failed to set property to 2 after filter panic");
}
2947
/// Stress test: several threads concurrently create guard-based
/// subscriptions, set the shared property and drop some guards along the way.
/// Passes if no thread panics and the property still accepts a value at the end.
#[test]
fn test_subscription_thread_safety() {
    let prop = Arc::new(ObservableProperty::new(0));
    let num_threads = 8;
    let operations_per_thread = 50;
    let total_calls = Arc::new(AtomicUsize::new(0));

    let mut workers = Vec::new();
    for thread_id in 0..num_threads {
        let shared_prop = Arc::clone(&prop);
        let shared_calls = Arc::clone(&total_calls);

        workers.push(thread::spawn(move || {
            let mut held_guards = Vec::new();

            for i in 0..operations_per_thread {
                let calls = Arc::clone(&shared_calls);
                let guard = shared_prop
                    .subscribe_with_subscription(Arc::new(move |old, new| {
                        calls.fetch_add(1, Ordering::SeqCst);
                        // Simulate some work
                        let _ = thread_id + i + old + new;
                    }))
                    .expect("Should be able to create subscription");
                held_guards.push(guard);

                // Notify every observer that is currently registered
                shared_prop
                    .set(thread_id * 1000 + i)
                    .expect("Should be able to set value");

                // Every fifth iteration, release the oldest guard still held
                if i % 5 == 0 && !held_guards.is_empty() {
                    held_guards.remove(0);
                }
            }

            // Guards still held are released when the vector is dropped
        }));
    }

    // Wait for every worker to finish
    for worker in workers {
        worker.join().expect("Thread should complete successfully");
    }

    // The property must remain usable after the concurrent churn
    prop.set(9999).expect("Property should still work");

    // The exact number of calls depends on thread interleaving, so it is
    // reported rather than asserted
    println!(
        "Total observer calls: {}",
        total_calls.load(Ordering::SeqCst)
    );
}
3006
/// A subscription guard created on one thread can be moved to and dropped on
/// another thread; the observer is active until that drop and inactive after.
#[test]
fn test_subscription_cross_thread_drop() {
    let prop = Arc::new(ObservableProperty::new(0));
    let call_count = Arc::new(AtomicUsize::new(0));

    // Register the observer on the main thread
    let counter = Arc::clone(&call_count);
    let guard = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for cross-thread drop test");

    // It fires for a change made on the main thread
    prop.set(1).expect("Failed to set property value in cross-thread drop test");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);

    // Hand the guard to a second thread, which drops it there
    let remote_prop = Arc::clone(&prop);
    let remote_count = Arc::clone(&call_count);
    let worker = thread::spawn(move || {
        // Still active while the guard is alive on this thread
        remote_prop.set(2).expect("Failed to set property value in other thread");
        assert_eq!(remote_count.load(Ordering::SeqCst), 2);

        drop(guard);

        // Inactive once the guard is gone
        remote_prop.set(3).expect("Failed to set property value after drop in other thread");
        assert_eq!(remote_count.load(Ordering::SeqCst), 2); // No change
    });

    worker.join().expect("Failed to join cross-thread drop test thread");

    // Back on the main thread the observer stays inactive
    prop.set(4).expect("Failed to set property value after thread join");
    assert_eq!(call_count.load(Ordering::SeqCst), 2);
}
3047
/// Runs subscriber threads (which repeatedly create and drop short-lived
/// subscriptions) alongside setter threads (which keep changing the value).
/// Passes if every thread finishes and the property still accepts a value.
#[test]
fn test_concurrent_subscription_creation_and_property_changes() {
    let prop = Arc::new(ObservableProperty::new(0));
    let total_notifications = Arc::new(AtomicUsize::new(0));
    let num_subscriber_threads = 4;
    let num_setter_threads = 2;
    let operations_per_thread = 25;

    let mut workers = Vec::new();

    // Subscriber threads: subscribe, hold briefly, drop, repeat
    for _ in 0..num_subscriber_threads {
        let shared_prop = Arc::clone(&prop);
        let shared_total = Arc::clone(&total_notifications);

        workers.push(thread::spawn(move || {
            for _ in 0..operations_per_thread {
                let notifications = Arc::clone(&shared_total);
                let _short_lived = shared_prop
                    .subscribe_with_subscription(Arc::new(move |_, _| {
                        notifications.fetch_add(1, Ordering::SeqCst);
                    }))
                    .expect("Should create subscription");

                // Keep the subscription registered for a moment
                thread::sleep(Duration::from_millis(1));

                // `_short_lived` is dropped at the end of each iteration
            }
        }));
    }

    // Setter threads: keep changing the property value
    for thread_id in 0..num_setter_threads {
        let shared_prop = Arc::clone(&prop);

        workers.push(thread::spawn(move || {
            for i in 0..operations_per_thread * 2 {
                shared_prop
                    .set(thread_id * 10000 + i)
                    .expect("Should set value");
                thread::sleep(Duration::from_millis(1));
            }
        }));
    }

    // Join subscribers first, then setters (spawn order)
    for worker in workers {
        worker.join().expect("Thread should complete");
    }

    // The property must remain usable after the concurrent activity
    prop.set(99999).expect("Property should still work");

    println!(
        "Total notifications during concurrent test: {}",
        total_notifications.load(Ordering::SeqCst)
    );
}
3112
/// Each of several threads registers an increase-only and a decrease-only
/// filtered subscription, then changes the shared property repeatedly. After
/// all threads have finished (dropping their guards), further changes must
/// not produce any notification.
#[test]
fn test_filtered_subscription_thread_safety() {
    let prop = Arc::new(ObservableProperty::new(0));
    let increase_notifications = Arc::new(AtomicUsize::new(0));
    let decrease_notifications = Arc::new(AtomicUsize::new(0));
    let num_threads = 6;

    let mut workers = Vec::new();
    for thread_id in 0..num_threads {
        let shared_prop = Arc::clone(&prop);
        let rises = Arc::clone(&increase_notifications);
        let falls = Arc::clone(&decrease_notifications);

        workers.push(thread::spawn(move || {
            // Observer that only hears about increases
            let _rise_guard = shared_prop
                .subscribe_filtered_with_subscription(
                    Arc::new(move |_, _| {
                        rises.fetch_add(1, Ordering::SeqCst);
                    }),
                    |old, new| new > old,
                )
                .expect("Should create filtered subscription");

            // Observer that only hears about decreases
            let _fall_guard = shared_prop
                .subscribe_filtered_with_subscription(
                    Arc::new(move |_, _| {
                        falls.fetch_add(1, Ordering::SeqCst);
                    }),
                    |old, new| new < old,
                )
                .expect("Should create filtered subscription");

            // Values cycle through base..base+9 twice, giving ups and downs
            let base_value = thread_id * 100;
            for i in 0..20 {
                let next_value = base_value + (i % 10);
                shared_prop.set(next_value).expect("Should set value");
                thread::sleep(Duration::from_millis(1));
            }

            // Both guards are released when the closure returns
        }));
    }

    for worker in workers {
        worker.join().expect("Thread should complete");
    }

    // Totals observed while the subscriptions were alive
    let initial_inc = increase_notifications.load(Ordering::SeqCst);
    let initial_dec = decrease_notifications.load(Ordering::SeqCst);

    prop.set(1000).expect("Property should still work");
    prop.set(2000).expect("Property should still work");

    // With every guard dropped, the totals must not move
    assert_eq!(increase_notifications.load(Ordering::SeqCst), initial_inc);
    assert_eq!(decrease_notifications.load(Ordering::SeqCst), initial_dec);

    println!(
        "Increase notifications: {}, Decrease notifications: {}",
        initial_inc, initial_dec
    );
}
3183
/// Compares `set` with `set_async` when one observer is slow (5 ms per call):
/// two `set_async` calls must return faster than two `set` calls, and every
/// observer must eventually be notified for all four changes.
///
/// Completion of the asynchronous notifications is awaited by polling the
/// counters against a deadline instead of sleeping for a fixed interval, so
/// the test does not depend on background threads finishing within 50 ms.
#[test]
fn test_subscription_with_async_property_changes() {
    let prop = Arc::new(ObservableProperty::new(0));
    let sync_notifications = Arc::new(AtomicUsize::new(0));
    let async_notifications = Arc::new(AtomicUsize::new(0));

    // Slow observer: counts the call, then simulates 5 ms of work
    let sync_count = sync_notifications.clone();
    let _sync_subscription = prop
        .subscribe_with_subscription(Arc::new(move |old, new| {
            sync_count.fetch_add(1, Ordering::SeqCst);
            // Simulate slow observer work
            thread::sleep(Duration::from_millis(5));
            println!("Sync observer: {} -> {}", old, new);
        }))
        .expect("Failed to create sync subscription");

    // Fast observer: only counts the call
    let async_count = async_notifications.clone();
    let _async_subscription = prop
        .subscribe_with_subscription(Arc::new(move |old, new| {
            async_count.fetch_add(1, Ordering::SeqCst);
            println!("Async observer: {} -> {}", old, new);
        }))
        .expect("Failed to create async subscription");

    // Time two synchronous changes
    let start = std::time::Instant::now();
    prop.set(1).expect("Failed to set property value 1 in async test");
    prop.set(2).expect("Failed to set property value 2 in async test");
    let sync_duration = start.elapsed();

    // Time two asynchronous changes
    let start = std::time::Instant::now();
    prop.set_async(3).expect("Failed to set property value 3 async");
    prop.set_async(4).expect("Failed to set property value 4 async");
    let async_duration = start.elapsed();

    // Async should be much faster
    assert!(async_duration < sync_duration);

    // Wait (bounded) until both observers have seen all four changes
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while (sync_notifications.load(Ordering::SeqCst) < 4
        || async_notifications.load(Ordering::SeqCst) < 4)
        && std::time::Instant::now() < deadline
    {
        thread::sleep(Duration::from_millis(1));
    }

    // All observers should have been called
    assert_eq!(sync_notifications.load(Ordering::SeqCst), 4);
    assert_eq!(async_notifications.load(Ordering::SeqCst), 4);

    // Subscriptions auto-cleanup when going out of scope
}
3234
/// With the inner lock poisoned, creating plain and filtered subscriptions is
/// still expected to succeed, and dropping a guard created before the
/// poisoning must not panic.
#[test]
fn test_subscription_creation_with_poisoned_lock() {
    let prop = Arc::new(ObservableProperty::new(0));

    // A subscription that exists before the lock gets poisoned
    let call_count = Arc::new(AtomicUsize::new(0));
    let counter = Arc::clone(&call_count);
    let pre_poison_guard = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription before poisoning");

    // Poison the lock: panic on another thread while holding the write guard
    let victim = Arc::clone(&prop);
    let _ = thread::spawn(move || {
        let _guard = victim.inner.write().expect("Failed to acquire write lock for subscription poison test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    // A plain subscription can still be created
    let plain = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
    assert!(plain.is_ok(), "subscribe_with_subscription should succeed with graceful degradation");

    // So can a filtered one
    let filtered = prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
    assert!(filtered.is_ok(), "subscribe_filtered_with_subscription should succeed with graceful degradation");

    // Releasing the pre-poison guard must not panic
    drop(pre_poison_guard);
}
3268
/// Dropping a subscription guard after the property's inner lock has been
/// poisoned must not panic. Reaching the end of the test is the pass condition.
#[test]
fn test_subscription_cleanup_behavior_with_poisoned_lock() {
    let prop = Arc::new(ObservableProperty::new(0));
    let call_count = Arc::new(AtomicUsize::new(0));

    let counter = Arc::clone(&call_count);
    let guard_under_test = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for cleanup behavior test");

    // Sanity check: the observer works before the lock is poisoned
    prop.set(1).expect("Failed to set property value in cleanup behavior test");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);

    // Poison the lock: panic on another thread while holding the write guard
    let victim = Arc::clone(&prop);
    let poisoner = thread::spawn(move || {
        let _guard = victim.inner.write().expect("Failed to acquire write lock for cleanup behavior poison test");
        panic!("Deliberate panic to poison the lock");
    });
    let _ = poisoner.join();

    // The guard's Drop has to cope with the poisoned lock without panicking
    drop(guard_under_test);
}
3301
/// Creates five subscriptions, poisons the property's inner lock, then drops
/// every guard. Passes if none of the drops panics.
#[test]
fn test_multiple_subscription_cleanup_with_poisoned_lock() {
    let prop = Arc::new(ObservableProperty::new(0));

    // Five counting observers, each with its own private counter
    let guards: Vec<_> = (0..5)
        .map(|i| {
            let count = Arc::new(AtomicUsize::new(0));
            prop.subscribe_with_subscription(Arc::new(move |old, new| {
                count.fetch_add(1, Ordering::SeqCst);
                println!("Observer {}: {} -> {}", i, old, new);
            }))
            .expect("Failed to create subscription in multiple cleanup test")
        })
        .collect();

    // Exercise all of them once before poisoning
    prop.set(42).expect("Failed to set property value in multiple cleanup test");

    // Poison the lock: panic on another thread while holding the write guard
    let victim = Arc::clone(&prop);
    let poisoner = thread::spawn(move || {
        let _guard = victim.inner.write().expect("Failed to acquire write lock for multiple cleanup poison test");
        panic!("Deliberate panic to poison the lock");
    });
    let _ = poisoner.join();

    // Release the guards one by one, in creation order; none may panic
    guards.into_iter().for_each(drop);
}
3338
/// A subscription created before the inner lock is poisoned can be dropped
/// safely afterwards, and a new subscription can still be created on the
/// poisoned property.
#[test]
fn test_subscription_behavior_before_and_after_poison() {
    let prop = Arc::new(ObservableProperty::new(0));
    let before_poison_count = Arc::new(AtomicUsize::new(0));
    let after_poison_count = Arc::new(AtomicUsize::new(0));

    // Observer registered while the lock is healthy
    let early_counter = Arc::clone(&before_poison_count);
    let early_guard = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            early_counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription before poison test");

    // It receives a notification as usual
    prop.set(1).expect("Failed to set property value before poison test");
    assert_eq!(before_poison_count.load(Ordering::SeqCst), 1);

    // Poison the lock: panic on another thread while holding the write guard
    let victim = Arc::clone(&prop);
    let poisoner = thread::spawn(move || {
        let _guard = victim.inner.write().expect("Failed to acquire write lock for before/after poison test");
        panic!("Deliberate panic to poison the lock");
    });
    let _ = poisoner.join();

    // Subscribing on the poisoned property is expected to succeed
    let late_counter = Arc::clone(&after_poison_count);
    let late_result = prop.subscribe_with_subscription(Arc::new(move |_, _| {
        late_counter.fetch_add(1, Ordering::SeqCst);
    }));
    assert!(late_result.is_ok(), "subscribe_with_subscription should succeed with graceful degradation");

    // Keep the new guard alive until the end of the test
    let _late_guard = late_result.unwrap();

    // Releasing the pre-poison guard must not panic
    drop(early_guard);
}
3377
/// Creates ten subscriptions, poisons the property's inner lock, then drops
/// each guard on its own thread with a small staggered delay. Passes if every
/// dropping thread finishes without panicking.
#[test]
fn test_concurrent_subscription_drops_with_poison() {
    let prop = Arc::new(ObservableProperty::new(0));
    let num_subscriptions = 10;

    // One counting observer per index, each with its own private counter
    let guards: Vec<_> = (0..num_subscriptions)
        .map(|i| {
            let count = Arc::new(AtomicUsize::new(0));
            prop.subscribe_with_subscription(Arc::new(move |_, _| {
                count.fetch_add(1, Ordering::SeqCst);
                println!("Observer {}", i);
            }))
            .expect("Failed to create subscription in concurrent drops test")
        })
        .collect();

    // Poison the lock: panic on another thread while holding the write guard
    let victim = Arc::clone(&prop);
    let poisoner = thread::spawn(move || {
        let _guard = victim.inner.write().expect("Failed to acquire write lock for concurrent drops poison test");
        panic!("Deliberate panic to poison the lock");
    });
    let _ = poisoner.join();

    // Hand each guard to its own thread and drop it there
    let mut droppers = Vec::new();
    for (i, guard) in guards.into_iter().enumerate() {
        droppers.push(thread::spawn(move || {
            // Stagger the drops by 0-4 ms depending on the index
            thread::sleep(Duration::from_millis((i as u64) % 5));
            drop(guard);
            println!("Dropped subscription {}", i);
        }));
    }

    // A panic inside any Drop would surface here as a failed join
    for dropper in droppers {
        dropper
            .join()
            .expect("Drop thread should complete without panic");
    }
}
3428
3429
/// `with_max_threads` can be called with different value types and thread
/// limits, and `get` returns the initial value in each case.
#[test]
fn test_with_max_threads_creation() {
    let int_prop = ObservableProperty::with_max_threads(42, 1);
    assert_eq!(int_prop.get().expect("Failed to get prop1 value"), 42);

    let string_prop = ObservableProperty::with_max_threads("test".to_string(), 8);
    assert_eq!(string_prop.get().expect("Failed to get prop2 value"), "test");

    let float_prop = ObservableProperty::with_max_threads(0.5_f64, 16);
    assert_eq!(float_prop.get().expect("Failed to get prop3 value"), 0.5);
}
3442
/// Builds one property with a thread limit of zero and one through `new`,
/// and checks that both report their initial value.
///
/// The effective thread limit itself is not inspected here; only the stored
/// value is compared.
#[test]
fn test_with_max_threads_zero_defaults_to_max_threads() {
    // A zero limit is intended to fall back to the default MAX_THREADS
    let zero_limit = ObservableProperty::with_max_threads(100, 0);
    // `new` uses the default limit
    let default_limit = ObservableProperty::new(100);

    let zero_limit_value = zero_limit.get().expect("Failed to get prop1 value");
    let default_limit_value = default_limit.get().expect("Failed to get prop2 value");

    assert_eq!(zero_limit_value, 100);
    assert_eq!(default_limit_value, 100);
}
3454
/// A property built with `with_max_threads` delivers both synchronous (`set`)
/// and asynchronous (`set_async`) notifications with the correct old/new
/// values.
///
/// The observer only records what it sees; all assertions run on the test
/// thread. Assertions placed inside the callback would be ineffective: the
/// crate documents observer panics as isolated, and a single fixed
/// expectation cannot hold for both the `0 -> 42` and `42 -> 43` transitions.
/// Completion of the async notification is awaited by polling against a
/// deadline rather than by a fixed sleep.
#[test]
fn test_with_max_threads_basic_functionality() {
    let prop = ObservableProperty::with_max_threads(0_i32, 2);
    let call_count = Arc::new(AtomicUsize::new(0));
    let transitions = Arc::new(RwLock::new(Vec::<(i32, i32)>::new()));

    // Subscribe an observer that logs each (old, new) pair
    let count = call_count.clone();
    let log = transitions.clone();
    let _subscription = prop
        .subscribe_with_subscription(Arc::new(move |old, new| {
            // Record first, count second: once the counter reaches N the
            // log is guaranteed to contain N entries
            if let Ok(mut entries) = log.write() {
                entries.push((*old, *new));
            }
            count.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for max_threads test");

    // Test synchronous set
    prop.set(42).expect("Failed to set value synchronously");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);

    // Test asynchronous set
    prop.set_async(43).expect("Failed to set value asynchronously");

    // Wait (bounded) for the async observer to complete
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while call_count.load(Ordering::SeqCst) < 2 && std::time::Instant::now() < deadline {
        thread::sleep(Duration::from_millis(1));
    }
    assert_eq!(call_count.load(Ordering::SeqCst), 2);

    // Both notifications must carry the right old/new values, in order
    let entries = transitions
        .read()
        .expect("Failed to read recorded transitions in max_threads test");
    assert_eq!(*entries, vec![(0, 42), (42, 43)]);
}
3481
/// With a thread limit of one and four slow observers (25 ms each),
/// `set_async` must still return quickly, and all four observers must
/// eventually run.
///
/// Completion is awaited by polling the counter against a deadline instead of
/// a fixed 200 ms sleep, so a slow machine delays the test rather than
/// failing it, and a fast machine finishes early.
#[test]
fn test_with_max_threads_async_performance() {
    // Test that with_max_threads affects async performance
    let prop = ObservableProperty::with_max_threads(0, 1); // Single thread
    let slow_call_count = Arc::new(AtomicUsize::new(0));

    // Add multiple slow observers
    let mut subscriptions = Vec::new();
    for _ in 0..4 {
        let count = slow_call_count.clone();
        let subscription = prop
            .subscribe_with_subscription(Arc::new(move |_, _| {
                thread::sleep(Duration::from_millis(25)); // Simulate slow work
                count.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create slow observer subscription");
        subscriptions.push(subscription);
    }

    // Measure time for async notification
    let start = std::time::Instant::now();
    prop.set_async(42).expect("Failed to set value asynchronously");
    let async_duration = start.elapsed();

    // Should return quickly even with slow observers
    assert!(async_duration.as_millis() < 50, "Async set should return quickly");

    // Wait (bounded) for all observers to complete
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while slow_call_count.load(Ordering::SeqCst) < 4 && std::time::Instant::now() < deadline {
        thread::sleep(Duration::from_millis(5));
    }
    assert_eq!(slow_call_count.load(Ordering::SeqCst), 4);
}
3513
/// A property created with `new` and one created with
/// `with_max_threads(_, 4)` must notify their observers identically for both
/// `set` and `set_async`.
///
/// The asynchronous notifications are awaited by polling both counters
/// against a deadline rather than by a fixed 50 ms sleep, which removes the
/// dependence on background-thread scheduling.
#[test]
fn test_with_max_threads_vs_regular_constructor() {
    let prop_regular = ObservableProperty::new(42);
    let prop_custom = ObservableProperty::with_max_threads(42, 4); // Same as default

    let count_regular = Arc::new(AtomicUsize::new(0));
    let count_custom = Arc::new(AtomicUsize::new(0));

    // Both should behave identically
    let count1 = count_regular.clone();
    let _sub1 = prop_regular
        .subscribe_with_subscription(Arc::new(move |_, _| {
            count1.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create regular subscription");

    let count2 = count_custom.clone();
    let _sub2 = prop_custom
        .subscribe_with_subscription(Arc::new(move |_, _| {
            count2.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create custom subscription");

    // Test sync behavior
    prop_regular.set(100).expect("Failed to set regular property");
    prop_custom.set(100).expect("Failed to set custom property");

    assert_eq!(count_regular.load(Ordering::SeqCst), 1);
    assert_eq!(count_custom.load(Ordering::SeqCst), 1);

    // Test async behavior
    prop_regular.set_async(200).expect("Failed to set regular property async");
    prop_custom.set_async(200).expect("Failed to set custom property async");

    // Wait (bounded) until both async notifications have been delivered
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while (count_regular.load(Ordering::SeqCst) < 2 || count_custom.load(Ordering::SeqCst) < 2)
        && std::time::Instant::now() < deadline
    {
        thread::sleep(Duration::from_millis(1));
    }
    assert_eq!(count_regular.load(Ordering::SeqCst), 2);
    assert_eq!(count_custom.load(Ordering::SeqCst), 2);
}
3552
#[test]
fn test_with_max_threads_large_values() {
    // A thread cap far larger than the number of observers must not stop
    // asynchronous notification from working.
    let prop = ObservableProperty::with_max_threads(0, 1000);
    let call_count = Arc::new(AtomicUsize::new(0));

    // Register a single counting observer.
    let hits = Arc::clone(&call_count);
    let _subscription = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            hits.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for large max_threads test");

    prop.set_async(42).expect("Failed to set value with large max_threads");

    // Bounded poll instead of a fixed sleep: returns as soon as the observer
    // has run, and a lost notification fails the assertion below instead of
    // hanging the test.
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while call_count.load(Ordering::SeqCst) < 1 && std::time::Instant::now() < deadline {
        thread::sleep(Duration::from_millis(1));
    }
    assert_eq!(call_count.load(Ordering::SeqCst), 1);
}
3573
#[test]
fn test_with_max_threads_clone_behavior() {
    /// Polls `counter` until it reaches `target`, giving up after two seconds
    /// so that a missed notification fails the following assertion instead
    /// of hanging the test.
    fn wait_for_count(counter: &AtomicUsize, target: usize) {
        let deadline = std::time::Instant::now() + Duration::from_secs(2);
        while counter.load(Ordering::SeqCst) < target && std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(1));
        }
    }

    // `prop1` is the original handle and `prop2` its clone. The test expects
    // a change made through either handle to reach observers registered
    // through both.
    let prop1 = ObservableProperty::with_max_threads(42, 2);
    let prop2 = prop1.clone();

    let call_count1 = Arc::new(AtomicUsize::new(0));
    let call_count2 = Arc::new(AtomicUsize::new(0));

    // One counting observer registered through each handle.
    let hits1 = Arc::clone(&call_count1);
    let _sub1 = prop1
        .subscribe_with_subscription(Arc::new(move |_, _| {
            hits1.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for original property test");

    let hits2 = Arc::clone(&call_count2);
    let _sub2 = prop2
        .subscribe_with_subscription(Arc::new(move |_, _| {
            hits2.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for cloned property test");

    // A change through the original handle should notify both observers.
    prop1.set_async(100).expect("Failed to set value through prop1");
    wait_for_count(&call_count1, 1);
    wait_for_count(&call_count2, 1);

    assert_eq!(call_count1.load(Ordering::SeqCst), 1);
    assert_eq!(call_count2.load(Ordering::SeqCst), 1);

    // A change through the clone should notify both observers as well.
    prop2.set_async(200).expect("Failed to set value through prop2");
    wait_for_count(&call_count1, 2);
    wait_for_count(&call_count2, 2);

    assert_eq!(call_count1.load(Ordering::SeqCst), 2);
    assert_eq!(call_count2.load(Ordering::SeqCst), 2);
}
3610
#[test]
fn test_with_max_threads_thread_safety() {
    // Five threads concurrently subscribe to, and asynchronously update, one
    // shared property. The test only requires that nothing panics and that
    // at least one notification is delivered.
    let prop = Arc::new(ObservableProperty::with_max_threads(0, 3));
    let call_count = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..5)
        .map(|thread_id| {
            let prop_clone = Arc::clone(&prop);
            // One clone per thread is enough; it is moved into the observer.
            let count = Arc::clone(&call_count);

            thread::spawn(move || {
                // The guard lives until this thread's closure returns.
                let _subscription = prop_clone
                    .subscribe_with_subscription(Arc::new(move |_, _| {
                        count.fetch_add(1, Ordering::SeqCst);
                    }))
                    .expect("Failed to create thread-safe subscription");

                // Trigger async notifications from this thread
                prop_clone
                    .set_async(thread_id * 10)
                    .expect("Failed to set value from thread");

                // Keep the subscription registered briefly after the update.
                thread::sleep(Duration::from_millis(10));
            })
        })
        .collect();

    // A panic inside any worker surfaces here.
    for handle in handles {
        handle.join().expect("Thread should complete successfully");
    }

    // The exact number of calls depends on how subscriptions and updates
    // interleave, so only "at least one" is asserted. Poll with a deadline
    // rather than sleeping a fixed 100ms.
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while call_count.load(Ordering::SeqCst) == 0 && std::time::Instant::now() < deadline {
        thread::sleep(Duration::from_millis(5));
    }
    assert!(call_count.load(Ordering::SeqCst) > 0);
}
3652
#[test]
fn test_with_max_threads_error_handling() {
    // With a custom thread cap, `set`, `set_async` and `get` should all
    // report success when a well-behaved observer is registered.
    let prop = ObservableProperty::with_max_threads(42, 2);

    // A no-op observer; the guard keeps it registered for the whole test.
    let _subscription = prop
        .subscribe_with_subscription(Arc::new(|_, _| {}))
        .expect("Failed to create subscription for error handling test");

    let sync_outcome = prop.set(100);
    assert!(sync_outcome.is_ok());

    let async_outcome = prop.set_async(200);
    assert!(async_outcome.is_ok());

    // The value written by `set_async` must be readable right after the call.
    let current = prop.get().expect("Failed to get value after error test");
    assert_eq!(current, 200);
}
3669
#[test]
fn test_observer_limit_enforcement() {
    let prop = ObservableProperty::new(0);

    // Registering the full observer limit would make the test slow, so this
    // adds 100 observers and checks that every subscription is accepted.
    let observer_ids: Vec<_> = (0..100)
        .map(|i| {
            let outcome = prop.subscribe(Arc::new(move |_, _| {
                let _ = i; // Capture `i` so each observer is a distinct closure
            }));
            assert!(outcome.is_ok(), "Should be able to add observer {}", i);
            outcome.unwrap()
        })
        .collect();

    // Every subscription produced an id.
    assert_eq!(observer_ids.len(), 100);

    // There is still room for one more observer.
    let one_more = prop.subscribe(Arc::new(|_, _| {}));
    assert!(one_more.is_ok());
}
3693
#[test]
fn test_observer_limit_error_message() {
    let prop = ObservableProperty::new(0);

    // Reaching the real observer limit is too slow for a unit test, so this
    // only exercises the success path of `subscribe`: ten subscriptions in a
    // row must all be accepted. The rejection path is not covered here.
    let all_accepted = (0..10).all(|_| prop.subscribe(Arc::new(|_, _| {})).is_ok());
    assert!(all_accepted);
}
3710
#[test]
fn test_observer_limit_with_unsubscribe() {
    let prop = ObservableProperty::new(0);

    // Start with 50 registered observers.
    let ids: Vec<_> = (0..50)
        .map(|_| prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe"))
        .collect();

    // Unsubscribe the first 25; each call must report that an observer was
    // actually removed.
    for &id in &ids[..25] {
        let removed = prop.unsubscribe(id).expect("Failed to unsubscribe");
        assert!(removed);
    }

    // Subscribing must keep working after the removals.
    for _ in 0..30 {
        let outcome = prop.subscribe(Arc::new(|_, _| {}));
        assert!(outcome.is_ok());
    }
}
3731
#[test]
fn test_observer_limit_with_raii_subscriptions() {
    let prop = ObservableProperty::new(0);

    // Hold 50 subscription guards.
    let mut subscriptions: Vec<_> = (0..50)
        .map(|_| {
            prop.subscribe_with_subscription(Arc::new(|_, _| {}))
                .expect("Failed to create subscription")
        })
        .collect();

    // Drop the last 25 guards; the first 25 stay alive in `subscriptions`.
    drop(subscriptions.split_off(25));

    // New guards can still be created afterwards. Each one is dropped again
    // right away, before the next is created.
    (0..30).for_each(|_| {
        drop(
            prop.subscribe_with_subscription(Arc::new(|_, _| {}))
                .expect("Failed to create subscription after RAII cleanup"),
        );
    });
}
3754
#[test]
fn test_filtered_subscription_respects_observer_limit() {
    let prop = ObservableProperty::new(0);

    // Alternate between plain observers (even iterations) and filtered
    // observers whose filter always passes (odd iterations).
    for index in 0..50 {
        let accepted = match index % 2 {
            0 => prop.subscribe(Arc::new(|_, _| {})).is_ok(),
            _ => prop
                .subscribe_filtered(Arc::new(|_, _| {}), |_, _| true)
                .is_ok(),
        };
        assert!(accepted);
    }

    // With 50 observers of mixed kinds registered, one more filtered
    // observer must still be accepted.
    let one_more = prop.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true);
    assert!(one_more.is_ok());
}
3772
#[test]
fn test_observer_limit_concurrent_subscriptions() {
    let prop = Arc::new(ObservableProperty::new(0));
    let success_count = Arc::new(AtomicUsize::new(0));

    // Ten workers, each attempting ten subscriptions on the shared property.
    let mut workers = Vec::with_capacity(10);
    for _ in 0..10 {
        let shared_prop = Arc::clone(&prop);
        let successes = Arc::clone(&success_count);

        workers.push(thread::spawn(move || {
            for _ in 0..10 {
                let subscribed = shared_prop.subscribe(Arc::new(|_, _| {})).is_ok();
                if subscribed {
                    successes.fetch_add(1, Ordering::SeqCst);
                }
                // Short pause so the workers interleave.
                thread::sleep(Duration::from_micros(10));
            }
        }));
    }

    // All workers are already running; now wait for each to finish.
    workers
        .into_iter()
        .for_each(|worker| worker.join().expect("Thread should complete"));

    // 10 workers x 10 attempts: every subscription should have succeeded.
    assert_eq!(success_count.load(Ordering::SeqCst), 100);
}
3802
#[test]
fn test_notify_observers_batch_releases_lock_early() {
    use std::sync::atomic::AtomicBool;

    // Upper bound for every wait in this test, so that a regression makes
    // the test fail instead of hanging the whole suite.
    const WAIT_LIMIT: Duration = Duration::from_secs(2);

    let prop = Arc::new(ObservableProperty::new(0));
    let call_count = Arc::new(AtomicUsize::new(0));
    let finished_count = Arc::new(AtomicUsize::new(0));
    let started = Arc::new(AtomicBool::new(false));
    let release = Arc::new(AtomicBool::new(false));

    // Gated observer: it announces that it is running, then parks until the
    // main thread sets `release` (or the wait limit expires). While it is
    // parked, the batch notification is provably still inside an observer.
    {
        let started = Arc::clone(&started);
        let release = Arc::clone(&release);
        let calls = Arc::clone(&call_count);
        let finished = Arc::clone(&finished_count);
        prop.subscribe(Arc::new(move |_, _| {
            calls.fetch_add(1, Ordering::SeqCst);
            started.store(true, Ordering::SeqCst);

            let deadline = std::time::Instant::now() + WAIT_LIMIT;
            while !release.load(Ordering::SeqCst) && std::time::Instant::now() < deadline {
                thread::sleep(Duration::from_millis(1));
            }
            finished.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe");
    }

    // Start batch notification in background
    let prop_clone = Arc::clone(&prop);
    let batch_handle = thread::spawn(move || {
        prop_clone
            .notify_observers_batch(vec![(0, 1), (1, 2)])
            .expect("Failed to notify batch");
    });

    // Wait (bounded) until the observer is running. If the batch thread
    // fails before calling it, this assertion fires rather than spinning.
    let deadline = std::time::Instant::now() + WAIT_LIMIT;
    while !started.load(Ordering::SeqCst) {
        assert!(
            std::time::Instant::now() < deadline,
            "Observer was never invoked by the batch notification"
        );
        thread::sleep(Duration::from_millis(1));
    }

    // Subscribe while the observer is still parked. If the batch held the
    // observer lock during notification, this call could only return after
    // the observer gave up waiting, and `finished_count` would be non-zero.
    let subscribe_result = prop.subscribe(Arc::new(|_, _| {
        // New observer
    }));
    let finished_before_release = finished_count.load(Ordering::SeqCst);

    // Let the observer go before asserting, so a failure below does not
    // leave the background thread parked.
    release.store(true, Ordering::SeqCst);

    assert!(
        subscribe_result.is_ok(),
        "Should be able to subscribe while batch notification is in progress"
    );
    assert_eq!(
        finished_before_release, 0,
        "subscribe() returned only after the running observer finished"
    );

    // Wait for batch to complete
    batch_handle.join().expect("Batch thread should complete");

    // The gated observer runs once per change in the batch.
    assert_eq!(call_count.load(Ordering::SeqCst), 2);
}
3846
#[test]
fn test_notify_observers_batch_panic_isolation() {
    let prop = ObservableProperty::new(0);
    let good_observer_count = Arc::new(AtomicUsize::new(0));

    // Registered first: an observer that panics on every notification.
    prop.subscribe(Arc::new(|_, _| {
        panic!("Deliberate panic in batch observer");
    }))
    .expect("Failed to subscribe panicking observer");

    // Registered second: a well-behaved observer that counts its calls.
    let tally = Arc::clone(&good_observer_count);
    prop.subscribe(Arc::new(move |_, _| {
        tally.fetch_add(1, Ordering::SeqCst);
    }))
    .expect("Failed to subscribe good observer");

    // The batch call itself must report success despite the panics.
    let changes = vec![(0, 1), (1, 2), (2, 3)];
    let outcome = prop.notify_observers_batch(changes);
    assert!(outcome.is_ok());

    // The counting observer must have seen each of the three changes.
    assert_eq!(good_observer_count.load(Ordering::SeqCst), 3);
}
3870
#[test]
fn test_notify_observers_batch_multiple_changes() {
    let prop = ObservableProperty::new(0);
    let received_changes = Arc::new(RwLock::new(Vec::new()));

    // Record every (old, new) pair the observer is handed, in arrival order.
    {
        let sink = Arc::clone(&received_changes);
        prop.subscribe(Arc::new(move |old, new| {
            if let Ok(mut log) = sink.write() {
                log.push((*old, *new));
            }
        }))
        .expect("Failed to subscribe");
    }

    // Deliver four changes in a single batch.
    let batch = vec![(0, 10), (10, 20), (20, 30), (30, 40)];
    prop.notify_observers_batch(batch)
        .expect("Failed to notify batch");

    // The observer must have received exactly these pairs, in batch order.
    let changes = received_changes.read().expect("Failed to read changes");
    assert_eq!(*changes, vec![(0, 10), (10, 20), (20, 30), (30, 40)]);
}
3898
#[test]
fn test_notify_observers_batch_empty() {
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));

    // A counting observer, so any unexpected notification is visible.
    {
        let counter = Arc::clone(&call_count);
        prop.subscribe(Arc::new(move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe");
    }

    // A batch with no changes must succeed and must not call the observer.
    let no_changes = Vec::new();
    prop.notify_observers_batch(no_changes)
        .expect("Failed with empty batch");

    assert_eq!(call_count.load(Ordering::SeqCst), 0);
}
3914}