observable_property/lib.rs
1//! # Observable Property
2//!
3//! A thread-safe observable property implementation for Rust that allows you to
4//! observe changes to values across multiple threads.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **RAII subscriptions**: Automatic cleanup with subscription guards (no manual unsubscribe needed)
11//! - **Filtered observers**: Only notify when specific conditions are met
12//! - **Async notifications**: Non-blocking observer notifications with background threads
13//! - **Panic isolation**: Observer panics don't crash the system
14//! - **Graceful lock recovery**: Continues operation even after lock poisoning from panics
15//! - **Memory protection**: Observer limit (10,000) prevents memory exhaustion
16//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync` type
17//!
18//! ## Quick Start
19//!
20//! ```rust
21//! use observable_property::ObservableProperty;
22//! use std::sync::Arc;
23//!
24//! // Create an observable property
25//! let property = ObservableProperty::new(42);
26//!
27//! // Subscribe to changes
28//! let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
29//! println!("Value changed from {} to {}", old_value, new_value);
30//! })).map_err(|e| {
31//! eprintln!("Failed to subscribe: {}", e);
32//! e
33//! })?;
34//!
35//! // Change the value (triggers observer)
36//! property.set(100).map_err(|e| {
37//! eprintln!("Failed to set value: {}", e);
38//! e
39//! })?;
40//!
41//! // Unsubscribe when done
42//! property.unsubscribe(observer_id).map_err(|e| {
43//! eprintln!("Failed to unsubscribe: {}", e);
44//! e
45//! })?;
46//! # Ok::<(), observable_property::PropertyError>(())
47//! ```
48//!
49//! ## Multi-threading Example
50//!
51//! ```rust
52//! use observable_property::ObservableProperty;
53//! use std::sync::Arc;
54//! use std::thread;
55//!
56//! let property = Arc::new(ObservableProperty::new(0));
57//! let property_clone = property.clone();
58//!
59//! // Subscribe from one thread
60//! property.subscribe(Arc::new(|old, new| {
61//! println!("Value changed: {} -> {}", old, new);
62//! })).map_err(|e| {
63//! eprintln!("Failed to subscribe: {}", e);
64//! e
65//! })?;
66//!
67//! // Modify from another thread
68//! thread::spawn(move || {
69//! if let Err(e) = property_clone.set(42) {
70//! eprintln!("Failed to set value: {}", e);
71//! }
72//! }).join().expect("Thread panicked");
73//! # Ok::<(), observable_property::PropertyError>(())
74//! ```
75//!
76//! ## RAII Subscriptions (Recommended)
77//!
78//! For automatic cleanup without manual unsubscribe calls, use RAII subscriptions:
79//!
80//! ```rust
81//! use observable_property::ObservableProperty;
82//! use std::sync::Arc;
83//!
84//! # fn main() -> Result<(), observable_property::PropertyError> {
85//! let property = ObservableProperty::new(0);
86//!
87//! {
88//! // Create RAII subscription - automatically cleaned up when dropped
89//! let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
90//! println!("Value changed: {} -> {}", old, new);
91//! }))?;
92//!
93//! property.set(42)?; // Prints: "Value changed: 0 -> 42"
94//!
95//! // Subscription automatically unsubscribes when leaving this scope
96//! }
97//!
98//! // No observer active anymore
99//! property.set(100)?; // No output
100//! # Ok(())
101//! # }
102//! ```
103//!
104//! ## Filtered RAII Subscriptions
105//!
106//! Combine filtering with automatic cleanup for conditional monitoring:
107//!
108//! ```rust
109//! use observable_property::ObservableProperty;
110//! use std::sync::Arc;
111//!
112//! # fn main() -> Result<(), observable_property::PropertyError> {
113//! let temperature = ObservableProperty::new(20.0);
114//!
115//! {
116//! // Monitor only significant temperature increases with automatic cleanup
117//! let _heat_warning = temperature.subscribe_filtered_with_subscription(
118//! Arc::new(|old, new| {
119//! println!("🔥 Heat warning! {:.1}°C -> {:.1}°C", old, new);
120//! }),
121//! |old, new| new > old && (new - old) > 5.0
122//! )?;
123//!
124//! temperature.set(22.0)?; // No warning (only 2°C increase)
125//! temperature.set(28.0)?; // Prints warning (6°C increase from 22°C)
126//!
127//! // Subscription automatically cleaned up here
128//! }
129//!
130//! temperature.set(35.0)?; // No warning (subscription was cleaned up)
131//! # Ok(())
132//! # }
133//! ```
134//!
135//! ## Subscription Management Comparison
136//!
137//! ```rust
138//! use observable_property::ObservableProperty;
139//! use std::sync::Arc;
140//!
141//! # fn main() -> Result<(), observable_property::PropertyError> {
142//! let property = ObservableProperty::new(0);
143//! let observer = Arc::new(|old: &i32, new: &i32| {
144//! println!("Value: {} -> {}", old, new);
145//! });
146//!
147//! // Method 1: Manual subscription management
148//! let observer_id = property.subscribe(observer.clone())?;
149//! property.set(42)?;
150//! property.unsubscribe(observer_id)?; // Manual cleanup required
151//!
152//! // Method 2: RAII subscription management (recommended)
153//! {
154//! let _subscription = property.subscribe_with_subscription(observer)?;
155//! property.set(100)?;
156//! // Automatic cleanup when _subscription goes out of scope
157//! }
158//! # Ok(())
159//! # }
160//! ```
161//!
162//! ## Advanced RAII Patterns
163//!
164//! Comprehensive example showing various RAII subscription patterns:
165//!
166//! ```rust
167//! use observable_property::ObservableProperty;
168//! use std::sync::Arc;
169//!
170//! # fn main() -> Result<(), observable_property::PropertyError> {
171//! // System monitoring example
172//! let cpu_usage = ObservableProperty::new(25.0f64); // percentage
173//! let memory_usage = ObservableProperty::new(1024); // MB
174//! let active_connections = ObservableProperty::new(0u32);
175//!
176//! // Conditional monitoring based on system state
177//! let high_load_monitoring = cpu_usage.get()? > 50.0;
178//!
179//! if high_load_monitoring {
180//! // Critical system monitoring - active only during high load
181//! let _cpu_critical = cpu_usage.subscribe_filtered_with_subscription(
182//! Arc::new(|old, new| {
183//! println!("🚨 Critical CPU usage: {:.1}% -> {:.1}%", old, new);
184//! }),
185//! |_, new| *new > 80.0
186//! )?;
187//!
188//! let _memory_warning = memory_usage.subscribe_filtered_with_subscription(
189//! Arc::new(|old, new| {
190//! println!("⚠️ High memory usage: {}MB -> {}MB", old, new);
191//! }),
192//! |_, new| *new > 8192 // > 8GB
193//! )?;
194//!
195//! // Simulate system load changes
196//! cpu_usage.set(85.0)?; // Would trigger critical alert
197//! memory_usage.set(9216)?; // Would trigger memory warning
198//!
199//! // All monitoring automatically stops when exiting this block
200//! }
201//!
202//! // Connection monitoring with scoped lifetime
203//! {
204//! let _connection_monitor = active_connections.subscribe_with_subscription(
205//! Arc::new(|old, new| {
206//! if new > old {
207//! println!("📈 New connections: {} -> {}", old, new);
208//! } else if new < old {
209//! println!("📉 Connections closed: {} -> {}", old, new);
210//! }
211//! })
212//! )?;
213//!
214//! active_connections.set(5)?; // Prints: "📈 New connections: 0 -> 5"
215//! active_connections.set(3)?; // Prints: "📉 Connections closed: 5 -> 3"
216//! active_connections.set(8)?; // Prints: "📈 New connections: 3 -> 8"
217//!
218//! // Connection monitoring automatically stops here
219//! }
220//!
221//! // No monitoring active anymore
222//! cpu_usage.set(95.0)?; // No output
223//! memory_usage.set(10240)?; // No output
224//! active_connections.set(15)?; // No output
225//!
226//! println!("All monitoring automatically cleaned up!");
227//! # Ok(())
228//! # }
229//! ```
230
231use std::collections::HashMap;
232use std::fmt;
233use std::panic;
234use std::sync::{Arc, RwLock};
235use std::thread;
236
237/// Maximum number of background threads used for asynchronous observer notifications
238///
239/// This constant controls the degree of parallelism when using `set_async()` to notify
240/// observers. The observer list is divided into batches, with each batch running in
241/// its own background thread, up to this maximum number of threads.
242///
243/// # Rationale
244///
245/// - **Resource Control**: Prevents unbounded thread creation that could exhaust system resources
246/// - **Performance Balance**: Provides parallelism benefits without excessive context switching overhead
247/// - **Scalability**: Ensures consistent behavior regardless of the number of observers
248/// - **System Responsiveness**: Limits thread contention on multi-core systems
249///
250/// # Implementation Details
251///
252/// When `set_async()` is called:
253/// 1. All observers are collected into a snapshot
254/// 2. Observers are divided into `MAX_THREADS` batches (or fewer if there are fewer observers)
255/// 3. Each batch executes in its own `thread::spawn()` call
256/// 4. Observers within each batch are executed sequentially
257///
258/// For example, with 100 observers and `MAX_THREADS = 4`:
259/// - Batch 1: Observers 1-25 (Thread 1)
260/// - Batch 2: Observers 26-50 (Thread 2)
261/// - Batch 3: Observers 51-75 (Thread 3)
262/// - Batch 4: Observers 76-100 (Thread 4)
263///
264/// # Tuning Considerations
265///
266/// This value can be adjusted based on your application's needs:
267/// - **CPU-bound observers**: Higher values may improve throughput on multi-core systems
268/// - **I/O-bound observers**: Higher values can improve concurrency for network/disk operations
269/// - **Memory-constrained systems**: Lower values reduce thread overhead
270/// - **Real-time systems**: Lower values reduce scheduling unpredictability
271///
272/// # Thread Safety
273///
274/// This constant is used only during the batching calculation and does not affect
275/// the thread safety of the overall system.
const MAX_THREADS: usize = 4; // used by `new()`, and as the fallback when `with_max_threads` is given 0
277
278/// Maximum number of observers allowed per property instance
279///
280/// This limit prevents memory exhaustion from unbounded observer registration.
281/// Once this limit is reached, attempts to add more observers will fail with
282/// an `InvalidConfiguration` error.
283///
284/// # Rationale
285///
286/// - **Memory Protection**: Prevents unbounded memory growth from observer accumulation
287/// - **Resource Management**: Ensures predictable memory usage in long-running applications
288/// - **Early Detection**: Catches potential memory leaks from forgotten unsubscriptions
289/// - **System Stability**: Prevents out-of-memory conditions in constrained environments
290///
291/// # Tuning Considerations
292///
293/// This value can be adjusted based on your application's needs:
294/// - **High-frequency properties**: May need fewer observers to avoid notification overhead
295/// - **Low-frequency properties**: Can safely support more observers
296/// - **Memory-constrained systems**: Lower values prevent memory pressure
297/// - **Development/testing**: Higher values may be useful for comprehensive test coverage
298///
299/// # Default Value
300///
301/// The default of 10,000 observers provides generous headroom for most applications while
302/// still preventing pathological cases. In practice, most properties have fewer than 100
303/// observers.
304///
305/// # Example Scenarios
306///
307/// - **User sessions**: 1,000 concurrent users, each with 5 properties = ~5,000 observers
308/// - **IoT devices**: 5,000 devices, each with 1-2 observers = ~10,000 observers
309/// - **Monitoring system**: 100 metrics, each with 20 dashboard widgets = ~2,000 observers
// NOTE(review): the enforcement site is not in the portion of the file reviewed here;
// presumably the subscribe methods check this limit — confirm.
const MAX_OBSERVERS: usize = 10_000;
311
312/// Errors that can occur when working with ObservableProperty
313///
314/// # Note on Lock Poisoning
315///
316/// This implementation uses **graceful degradation** for poisoned locks. When a lock
317/// is poisoned (typically due to a panic in an observer or another thread), the
318/// library automatically recovers the inner value using [`PoisonError::into_inner()`](std::sync::PoisonError::into_inner).
319///
320/// This means:
321/// - **All operations continue to work** even after a lock is poisoned
322/// - No `ReadLockError`, `WriteLockError`, or `PoisonedLock` errors will occur in practice
323/// - The system remains operational and observers continue to function
324///
325/// The error variants are kept for backward compatibility and potential future use cases,
326/// but with the current implementation, poisoned locks are transparent to users.
327///
328/// # Production Benefit
329///
330/// This approach ensures maximum availability and resilience in production systems where
331/// a misbehaving observer shouldn't bring down the entire property system.
// `PartialEq`/`Eq` are derived so callers and tests can compare errors directly
// (e.g. `assert_eq!(err, PropertyError::ObserverNotFound { id })`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PropertyError {
    /// Failed to acquire a read lock on the property
    ///
    /// **Note**: With graceful degradation, this error is unlikely to occur in practice
    /// as poisoned locks are automatically recovered.
    ReadLockError {
        /// Context describing what operation was being attempted
        context: String,
    },
    /// Failed to acquire a write lock on the property
    ///
    /// **Note**: With graceful degradation, this error is unlikely to occur in practice
    /// as poisoned locks are automatically recovered.
    WriteLockError {
        /// Context describing what operation was being attempted
        context: String,
    },
    /// Attempted to unsubscribe an observer that doesn't exist
    ObserverNotFound {
        /// The ID of the observer that wasn't found
        id: usize,
    },
    /// The property's lock has been poisoned due to a panic in another thread
    ///
    /// **Note**: With graceful degradation, this error will not occur in practice
    /// as the implementation automatically recovers from poisoned locks.
    PoisonedLock,
    /// An observer function encountered an error during execution
    ObserverError {
        /// Description of what went wrong
        reason: String,
    },
    /// The thread pool for async notifications is exhausted
    ThreadPoolExhausted,
    /// Invalid configuration was provided
    InvalidConfiguration {
        /// Description of the invalid configuration
        reason: String,
    },
}
373
374impl fmt::Display for PropertyError {
375 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376 match self {
377 PropertyError::ReadLockError { context } => {
378 write!(f, "Failed to acquire read lock: {}", context)
379 }
380 PropertyError::WriteLockError { context } => {
381 write!(f, "Failed to acquire write lock: {}", context)
382 }
383 PropertyError::ObserverNotFound { id } => {
384 write!(f, "Observer with ID {} not found", id)
385 }
386 PropertyError::PoisonedLock => {
387 write!(
388 f,
389 "Property is in a poisoned state due to a panic in another thread"
390 )
391 }
392 PropertyError::ObserverError { reason } => {
393 write!(f, "Observer execution failed: {}", reason)
394 }
395 PropertyError::ThreadPoolExhausted => {
396 write!(f, "Thread pool is exhausted and cannot spawn more observers")
397 }
398 PropertyError::InvalidConfiguration { reason } => {
399 write!(f, "Invalid configuration: {}", reason)
400 }
401 }
402 }
403}
404
// Marker impl: the trait's default methods are used as-is. `PropertyError`
// supplies the required `Debug` (derived) and `Display` impls and wraps no
// underlying source error.
impl std::error::Error for PropertyError {}
406
/// Function type for observers that get called when property values change
///
/// The callback receives the previous value as its first argument and the new
/// value as its second. It is stored behind an `Arc` so the property can clone
/// a snapshot of its observers and invoke them after releasing its lock; the
/// `Send + Sync` bounds allow the callback to be shared across threads.
pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
409
/// Unique identifier for registered observers
///
/// Used as the key of the property's observer map and stored inside
/// [`Subscription`] so the observer can be removed when the guard is dropped.
pub type ObserverId = usize;
412
413/// A RAII guard for an observer subscription that automatically unsubscribes when dropped
414///
415/// This struct provides automatic cleanup for observer subscriptions using RAII (Resource
416/// Acquisition Is Initialization) pattern. When a `Subscription` goes out of scope, its
417/// `Drop` implementation automatically removes the associated observer from the property.
418///
419/// This eliminates the need for manual `unsubscribe()` calls and helps prevent resource
420/// leaks in scenarios where observers might otherwise be forgotten.
421///
422/// # Type Requirements
423///
424/// The generic type `T` must implement the same traits as `ObservableProperty<T>`:
425/// - `Clone`: Required for observer notifications
426/// - `Send`: Required for transferring between threads
427/// - `Sync`: Required for concurrent access from multiple threads
428/// - `'static`: Required for observer callbacks that may outlive the original scope
429///
430/// # Examples
431///
432/// ## Basic RAII Subscription
433///
434/// ```rust
435/// use observable_property::ObservableProperty;
436/// use std::sync::Arc;
437///
438/// # fn main() -> Result<(), observable_property::PropertyError> {
439/// let property = ObservableProperty::new(0);
440///
441/// {
442/// // Create subscription - observer is automatically registered
443/// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
444/// println!("Value changed: {} -> {}", old, new);
445/// }))?;
446///
447/// property.set(42)?; // Observer is called: "Value changed: 0 -> 42"
448///
449/// // When _subscription goes out of scope here, observer is automatically removed
450/// }
451///
452/// property.set(100)?; // No observer output - subscription was automatically cleaned up
453/// # Ok(())
454/// # }
455/// ```
456///
457/// ## Cross-Thread Subscription Management
458///
459/// ```rust
460/// use observable_property::ObservableProperty;
461/// use std::sync::Arc;
462/// use std::thread;
463///
464/// # fn main() -> Result<(), observable_property::PropertyError> {
465/// let property = Arc::new(ObservableProperty::new(0));
466/// let property_clone = property.clone();
467///
468/// // Create subscription in main thread
469/// let subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
470/// println!("Observed: {} -> {}", old, new);
471/// }))?;
472///
473/// // Move subscription to another thread for cleanup
474/// let handle = thread::spawn(move || {
475/// // Subscription is still active here
476/// let _ = property_clone.set(42); // Will trigger observer
477///
478/// // When subscription is dropped here (end of thread), observer is cleaned up
479/// drop(subscription);
480/// });
481///
482/// handle.join().unwrap();
483///
484/// // Observer is no longer active
485/// property.set(100)?; // No output
486/// # Ok(())
487/// # }
488/// ```
489///
490/// ## Conditional Scoped Subscriptions
491///
492/// ```rust
493/// use observable_property::ObservableProperty;
494/// use std::sync::Arc;
495///
496/// # fn main() -> Result<(), observable_property::PropertyError> {
497/// let counter = ObservableProperty::new(0);
498/// let debug_mode = true;
499///
500/// if debug_mode {
501/// let _debug_subscription = counter.subscribe_with_subscription(Arc::new(|old, new| {
502/// println!("Debug: counter {} -> {}", old, new);
503/// }))?;
504///
505/// counter.set(1)?; // Prints debug info
506/// counter.set(2)?; // Prints debug info
507///
508/// // Debug subscription automatically cleaned up when exiting if block
509/// }
510///
511/// counter.set(3)?; // No debug output (subscription was cleaned up)
512/// # Ok(())
513/// # }
514/// ```
515///
516/// # Thread Safety
517///
518/// Like `ObservableProperty` itself, `Subscription` is thread-safe. It can be safely
519/// sent between threads and the automatic cleanup will work correctly even if the
520/// subscription is dropped from a different thread than where it was created.
pub struct Subscription<T: Clone + Send + Sync + 'static> {
    /// Shared, lock-protected state of the property; `Drop` takes the write
    /// lock on it to unregister the observer.
    inner: Arc<RwLock<InnerProperty<T>>>,
    /// Key of the observer that `Drop` removes from the observer map.
    id: ObserverId,
}
525
526impl<T: Clone + Send + Sync + 'static> std::fmt::Debug for Subscription<T> {
527 /// Debug implementation that shows the subscription ID without exposing internals
528 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
529 f.debug_struct("Subscription")
530 .field("id", &self.id)
531 .field("inner", &"[ObservableProperty]")
532 .finish()
533 }
534}
535
536impl<T: Clone + Send + Sync + 'static> Drop for Subscription<T> {
537 /// Automatically removes the associated observer when the subscription is dropped
538 ///
539 /// This implementation provides automatic cleanup by removing the observer
540 /// from the property's observer list when the `Subscription` goes out of scope.
541 ///
542 /// # Error Handling
543 ///
544 /// If the property's lock is poisoned or inaccessible during cleanup, the error
545 /// is silently ignored using the `let _ = ...` pattern. This is intentional
546 /// because:
547 /// 1. Drop implementations should not panic
548 /// 2. If the property is poisoned, it's likely unusable anyway
549 /// 3. There's no meaningful way to handle cleanup errors in a destructor
550 ///
551 /// # Thread Safety
552 ///
553 /// This method is safe to call from any thread, even if the subscription
554 /// was created on a different thread.
555 fn drop(&mut self) {
556 // Graceful degradation: always attempt to clean up, even from poisoned locks
557 match self.inner.write() {
558 Ok(mut guard) => {
559 guard.observers.remove(&self.id);
560 }
561 Err(poisoned) => {
562 // Recover from poisoned lock to ensure cleanup happens
563 let mut guard = poisoned.into_inner();
564 guard.observers.remove(&self.id);
565 }
566 }
567 }
568}
569
570/// A thread-safe observable property that notifies observers when its value changes
571///
572/// This type wraps a value of type `T` and allows multiple observers to be notified
573/// whenever the value is modified. All operations are thread-safe and can be called
574/// from multiple threads concurrently.
575///
576/// # Type Requirements
577///
578/// The generic type `T` must implement:
579/// - `Clone`: Required for returning values and passing them to observers
580/// - `Send`: Required for transferring between threads
581/// - `Sync`: Required for concurrent access from multiple threads
582/// - `'static`: Required for observer callbacks that may outlive the original scope
583///
584/// # Examples
585///
586/// ```rust
587/// use observable_property::ObservableProperty;
588/// use std::sync::Arc;
589///
590/// let property = ObservableProperty::new("initial".to_string());
591///
592/// let observer_id = property.subscribe(Arc::new(|old, new| {
593/// println!("Changed from '{}' to '{}'", old, new);
594/// })).map_err(|e| {
595/// eprintln!("Failed to subscribe: {}", e);
596/// e
597/// })?;
598///
599/// property.set("updated".to_string()).map_err(|e| {
600/// eprintln!("Failed to set value: {}", e);
601/// e
602/// })?; // Prints: Changed from 'initial' to 'updated'
603///
604/// property.unsubscribe(observer_id).map_err(|e| {
605/// eprintln!("Failed to unsubscribe: {}", e);
606/// e
607/// })?;
608/// # Ok::<(), observable_property::PropertyError>(())
609/// ```
pub struct ObservableProperty<T> {
    /// Current value and observer registry behind a shared `RwLock`; the `Arc`
    /// has the same type that `Subscription` stores for its cleanup.
    inner: Arc<RwLock<InnerProperty<T>>>,
    /// Upper bound on threads for asynchronous notification (per the constructor
    /// docs). Both constructors guarantee a non-zero value: `new` uses
    /// `MAX_THREADS` and `with_max_threads` maps 0 to `MAX_THREADS`.
    max_threads: usize,
}
614
// State guarded by the property's `RwLock`.
struct InnerProperty<T> {
    /// The current value; cloned out by `get` and replaced by `set`.
    value: T,
    /// Registered observers keyed by ID; `set` snapshots the values and
    /// `Subscription::drop` removes entries by key.
    observers: HashMap<ObserverId, Observer<T>>,
    /// Initialised to 0 by both constructors. Presumably the next `ObserverId`
    /// to hand out — the allocation code is not visible here; confirm.
    next_id: ObserverId,
}
620
621impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
622 /// Creates a new observable property with the given initial value
623 ///
624 /// # Arguments
625 ///
626 /// * `initial_value` - The starting value for this property
627 ///
628 /// # Examples
629 ///
630 /// ```rust
631 /// use observable_property::ObservableProperty;
632 ///
633 /// let property = ObservableProperty::new(42);
634 /// match property.get() {
635 /// Ok(value) => assert_eq!(value, 42),
636 /// Err(e) => eprintln!("Failed to get property value: {}", e),
637 /// }
638 /// ```
639 pub fn new(initial_value: T) -> Self {
640 Self {
641 inner: Arc::new(RwLock::new(InnerProperty {
642 value: initial_value,
643 observers: HashMap::new(),
644 next_id: 0,
645 })),
646 max_threads: MAX_THREADS,
647 }
648 }
649
650 /// Creates a new observable property with a custom maximum thread count for async notifications
651 ///
652 /// This constructor allows you to customize the maximum number of threads used for
653 /// asynchronous observer notifications via `set_async()`. This is useful for tuning
654 /// performance based on your specific use case and system constraints.
655 ///
656 /// # Arguments
657 ///
658 /// * `initial_value` - The starting value for this property
659 /// * `max_threads` - Maximum number of threads to use for async notifications.
660 /// If 0 is provided, defaults to 4.
661 ///
662 /// # Thread Pool Behavior
663 ///
664 /// When `set_async()` is called, observers are divided into batches and each batch
665 /// runs in its own thread, up to the specified maximum. For example:
666 /// - With 100 observers and `max_threads = 4`: 4 threads with ~25 observers each
/// - With 10 observers and `max_threads = 8`: at most 8 threads with 1-2 observers each
668 /// - With 2 observers and `max_threads = 4`: 2 threads with 1 observer each
669 ///
670 /// # Use Cases
671 ///
672 /// ## High-Throughput Systems
673 /// ```rust
674 /// use observable_property::ObservableProperty;
675 ///
676 /// // For systems with many CPU cores and CPU-bound observers
677 /// let property = ObservableProperty::with_max_threads(0, 8);
678 /// ```
679 ///
680 /// ## Resource-Constrained Systems
681 /// ```rust
682 /// use observable_property::ObservableProperty;
683 ///
684 /// // For embedded systems or memory-constrained environments
685 /// let property = ObservableProperty::with_max_threads(42, 1);
686 /// ```
687 ///
688 /// ## I/O-Heavy Observers
689 /// ```rust
690 /// use observable_property::ObservableProperty;
691 ///
692 /// // For observers that do network/database operations
693 /// let property = ObservableProperty::with_max_threads("data".to_string(), 16);
694 /// ```
695 ///
696 /// # Performance Considerations
697 ///
698 /// - **Higher values**: Better parallelism but more thread overhead and memory usage
699 /// - **Lower values**: Less overhead but potentially slower async notifications
700 /// - **Optimal range**: Typically between 1 and 2x the number of CPU cores
701 /// - **Zero value**: Automatically uses the default value (4)
702 ///
703 /// # Thread Safety
704 ///
705 /// This setting only affects async notifications (`set_async()`). Synchronous
706 /// operations (`set()`) always execute observers sequentially regardless of this setting.
707 ///
708 /// # Examples
709 ///
710 /// ```rust
711 /// use observable_property::ObservableProperty;
712 /// use std::sync::Arc;
713 ///
714 /// // Create property with custom thread pool size
715 /// let property = ObservableProperty::with_max_threads(42, 2);
716 ///
717 /// // Subscribe observers as usual
718 /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
719 /// println!("Value changed: {} -> {}", old, new);
720 /// })).expect("Failed to create subscription");
721 ///
722 /// // Async notifications will use at most 2 threads
723 /// property.set_async(100).expect("Failed to set value asynchronously");
724 /// ```
725 pub fn with_max_threads(initial_value: T, max_threads: usize) -> Self {
726 let max_threads = if max_threads == 0 {
727 MAX_THREADS
728 } else {
729 max_threads
730 };
731 Self {
732 inner: Arc::new(RwLock::new(InnerProperty {
733 value: initial_value,
734 observers: HashMap::new(),
735 next_id: 0,
736 })),
737 max_threads,
738 }
739 }
740
741 /// Gets the current value of the property
742 ///
743 /// This method acquires a read lock, which allows multiple concurrent readers
744 /// but will block if a writer currently holds the lock.
745 ///
746 /// # Returns
747 ///
/// `Ok(T)` containing a clone of the current value. A poisoned lock is recovered
/// transparently, so the current implementation never returns `Err`.
750 ///
751 /// # Examples
752 ///
753 /// ```rust
754 /// use observable_property::ObservableProperty;
755 ///
756 /// let property = ObservableProperty::new("hello".to_string());
757 /// match property.get() {
758 /// Ok(value) => assert_eq!(value, "hello"),
759 /// Err(e) => eprintln!("Failed to get property value: {}", e),
760 /// }
761 /// ```
762 pub fn get(&self) -> Result<T, PropertyError> {
763 match self.inner.read() {
764 Ok(prop) => Ok(prop.value.clone()),
765 Err(poisoned) => {
766 // Graceful degradation: recover value from poisoned lock
767 // This allows continued operation even after a panic in another thread
768 Ok(poisoned.into_inner().value.clone())
769 }
770 }
771 }
772
773 /// Sets the property to a new value and notifies all observers
774 ///
775 /// This method will:
776 /// 1. Acquire a write lock (blocking other readers/writers)
777 /// 2. Update the value and capture a snapshot of observers
778 /// 3. Release the lock
779 /// 4. Notify all observers sequentially with the old and new values
780 ///
781 /// Observer notifications are wrapped in panic recovery to prevent one
782 /// misbehaving observer from affecting others.
783 ///
784 /// # Arguments
785 ///
786 /// * `new_value` - The new value to set
787 ///
788 /// # Returns
789 ///
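/// `Ok(())` once the value has been stored and all observers have been invoked.
/// A poisoned lock is recovered rather than reported, so the current
/// implementation never returns `Err`.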
791 ///
792 /// # Examples
793 ///
794 /// ```rust
795 /// use observable_property::ObservableProperty;
796 /// use std::sync::Arc;
797 ///
798 /// let property = ObservableProperty::new(10);
799 ///
800 /// property.subscribe(Arc::new(|old, new| {
801 /// println!("Value changed from {} to {}", old, new);
802 /// })).map_err(|e| {
803 /// eprintln!("Failed to subscribe: {}", e);
804 /// e
805 /// })?;
806 ///
807 /// property.set(20).map_err(|e| {
808 /// eprintln!("Failed to set property value: {}", e);
809 /// e
810 /// })?; // Triggers observer notification
811 /// # Ok::<(), observable_property::PropertyError>(())
812 /// ```
813 pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
814 let (old_value, observers_snapshot) = {
815 let mut prop = match self.inner.write() {
816 Ok(guard) => guard,
817 Err(poisoned) => {
818 // Graceful degradation: recover from poisoned write lock
819 // Clear the poison flag by taking ownership of the inner value
820 poisoned.into_inner()
821 }
822 };
823
824 let old_value = prop.value.clone();
825 prop.value = new_value.clone();
826 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
827 (old_value, observers_snapshot)
828 };
829
830 for observer in observers_snapshot {
831 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
832 observer(&old_value, &new_value);
833 })) {
834 eprintln!("Observer panic: {:?}", e);
835 }
836 }
837
838 Ok(())
839 }
840
841 /// Sets the property to a new value and notifies observers asynchronously
842 ///
843 /// This method is similar to `set()` but spawns observers in background threads
844 /// for non-blocking operation. This is useful when observers might perform
845 /// time-consuming operations.
846 ///
847 /// Observers are batched into groups and each batch runs in its own thread
848 /// to limit resource usage while still providing parallelism.
849 ///
850 /// # Thread Management (Fire-and-Forget Pattern)
851 ///
852 /// **Important**: This method uses a fire-and-forget pattern. Spawned threads are
853 /// **not joined** and run independently in the background. This design is intentional
854 /// for non-blocking behavior but has important implications:
855 ///
856 /// ## Characteristics:
857 /// - ✅ **Non-blocking**: Returns immediately without waiting for observers
858 /// - ✅ **High performance**: No synchronization overhead
859 /// - ⚠️ **No completion guarantee**: Thread may still be running when method returns
860 /// - ⚠️ **No error propagation**: Observer errors are logged but not returned
861 /// - ⚠️ **Testing caveat**: May need explicit delays to observe side effects
862 ///
863 /// ## Use Cases:
864 /// - **UI updates**: Fire updates without blocking the main thread
865 /// - **Logging**: Asynchronous logging that doesn't block operations
866 /// - **Metrics**: Non-critical telemetry that can be lost
867 /// - **Notifications**: Fire-and-forget alerts or messages
868 ///
869 /// ## When NOT to Use:
870 /// - **Critical operations**: Use `set()` if you need guarantees
871 /// - **Transactional updates**: Use `set()` for atomic operations
872 /// - **Sequential dependencies**: If next operation depends on observer completion
873 ///
874 /// ## Testing Considerations:
875 /// ```rust
876 /// use observable_property::ObservableProperty;
877 /// use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
878 /// use std::time::Duration;
879 ///
880 /// # fn main() -> Result<(), observable_property::PropertyError> {
881 /// let property = ObservableProperty::new(0);
882 /// let was_called = Arc::new(AtomicBool::new(false));
883 /// let flag = was_called.clone();
884 ///
885 /// property.subscribe(Arc::new(move |_, _| {
886 /// flag.store(true, Ordering::SeqCst);
887 /// }))?;
888 ///
889 /// property.set_async(42)?;
890 ///
891 /// // ⚠️ Immediate check might fail - thread may not have run yet
892 /// // assert!(was_called.load(Ordering::SeqCst)); // May fail!
893 ///
894 /// // ✅ Add a small delay to allow background thread to complete
895 /// std::thread::sleep(Duration::from_millis(10));
896 /// assert!(was_called.load(Ordering::SeqCst)); // Now reliable
897 /// # Ok(())
898 /// # }
899 /// ```
900 ///
901 /// # Arguments
902 ///
903 /// * `new_value` - The new value to set
904 ///
905 /// # Returns
906 ///
/// `Ok(())` once the property value has been updated. A poisoned lock is recovered from rather than reported as an error.
908 /// Note that this only indicates the property was updated successfully;
909 /// observer execution happens asynchronously and errors are not returned.
910 ///
911 /// # Examples
912 ///
913 /// ## Basic Usage
914 ///
915 /// ```rust
916 /// use observable_property::ObservableProperty;
917 /// use std::sync::Arc;
918 /// use std::time::Duration;
919 ///
920 /// let property = ObservableProperty::new(0);
921 ///
922 /// property.subscribe(Arc::new(|old, new| {
923 /// // This observer does slow work but won't block the caller
924 /// std::thread::sleep(Duration::from_millis(100));
925 /// println!("Slow observer: {} -> {}", old, new);
926 /// })).map_err(|e| {
927 /// eprintln!("Failed to subscribe: {}", e);
928 /// e
929 /// })?;
930 ///
931 /// // This returns immediately even though observer is slow
932 /// property.set_async(42).map_err(|e| {
933 /// eprintln!("Failed to set value asynchronously: {}", e);
934 /// e
935 /// })?;
936 ///
937 /// // Continue working immediately - observer runs in background
938 /// println!("Main thread continues without waiting");
939 /// # Ok::<(), observable_property::PropertyError>(())
940 /// ```
941 ///
942 /// ## Multiple Rapid Updates
943 ///
944 /// ```rust
945 /// use observable_property::ObservableProperty;
946 /// use std::sync::Arc;
947 ///
948 /// # fn main() -> Result<(), observable_property::PropertyError> {
949 /// let property = ObservableProperty::new(0);
950 ///
951 /// property.subscribe(Arc::new(|old, new| {
952 /// // Expensive operation (e.g., database update, API call)
953 /// println!("Processing: {} -> {}", old, new);
954 /// }))?;
955 ///
956 /// // All of these return immediately - observers run in parallel
957 /// property.set_async(1)?;
958 /// property.set_async(2)?;
959 /// property.set_async(3)?;
960 /// property.set_async(4)?;
961 /// property.set_async(5)?;
962 ///
963 /// // All observer calls are now running in background threads
964 /// # Ok(())
965 /// # }
966 /// ```
967 pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
968 let (old_value, observers_snapshot) = {
969 let mut prop = match self.inner.write() {
970 Ok(guard) => guard,
971 Err(poisoned) => {
972 // Graceful degradation: recover from poisoned write lock
973 poisoned.into_inner()
974 }
975 };
976
977 let old_value = prop.value.clone();
978 prop.value = new_value.clone();
979 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
980 (old_value, observers_snapshot)
981 };
982
983 if observers_snapshot.is_empty() {
984 return Ok(());
985 }
986
987 let observers_per_thread = observers_snapshot.len().div_ceil(self.max_threads);
988
989 // Fire-and-forget pattern: Spawn threads without joining
990 // This is intentional for non-blocking behavior. Observers run independently
991 // and the caller continues immediately without waiting for completion.
992 // Trade-offs:
993 // ✅ Non-blocking, high performance
994 // ⚠️ No completion guarantee, no error propagation to caller
995 for batch in observers_snapshot.chunks(observers_per_thread) {
996 let batch_observers = batch.to_vec();
997 let old_val = old_value.clone();
998 let new_val = new_value.clone();
999
1000 thread::spawn(move || {
1001 for observer in batch_observers {
1002 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1003 observer(&old_val, &new_val);
1004 })) {
1005 eprintln!("Observer panic in batch thread: {:?}", e);
1006 }
1007 }
1008 });
1009 // Thread handle intentionally dropped - fire-and-forget pattern
1010 }
1011
1012 Ok(())
1013 }
1014
1015 /// Subscribes an observer function to be called when the property changes
1016 ///
1017 /// The observer function will be called with the old and new values whenever
1018 /// the property is modified via `set()` or `set_async()`.
1019 ///
1020 /// # Arguments
1021 ///
1022 /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
1023 ///
1024 /// # Returns
1025 ///
1026 /// `Ok(ObserverId)` containing a unique identifier for this observer,
1027 /// or `Err(PropertyError::InvalidConfiguration)` if the maximum observer limit is exceeded.
1028 ///
1029 /// # Observer Limit
1030 ///
1031 /// To prevent memory exhaustion, there is a maximum limit of observers per property
1032 /// (currently set to 10,000). If you attempt to add more observers than this limit,
1033 /// the subscription will fail with an `InvalidConfiguration` error.
1034 ///
1035 /// This protection helps prevent:
1036 /// - Memory leaks from forgotten unsubscriptions
1037 /// - Unbounded memory growth in long-running applications
1038 /// - Out-of-memory conditions in resource-constrained environments
1039 ///
1040 /// # Examples
1041 ///
1042 /// ```rust
1043 /// use observable_property::ObservableProperty;
1044 /// use std::sync::Arc;
1045 ///
1046 /// let property = ObservableProperty::new(0);
1047 ///
1048 /// let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
1049 /// println!("Property changed from {} to {}", old_value, new_value);
1050 /// })).map_err(|e| {
1051 /// eprintln!("Failed to subscribe observer: {}", e);
1052 /// e
1053 /// })?;
1054 ///
1055 /// // Later, unsubscribe using the returned ID
1056 /// property.unsubscribe(observer_id).map_err(|e| {
1057 /// eprintln!("Failed to unsubscribe observer: {}", e);
1058 /// e
1059 /// })?;
1060 /// # Ok::<(), observable_property::PropertyError>(())
1061 /// ```
1062 pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
1063 let mut prop = match self.inner.write() {
1064 Ok(guard) => guard,
1065 Err(poisoned) => {
1066 // Graceful degradation: recover from poisoned write lock
1067 poisoned.into_inner()
1068 }
1069 };
1070
1071 // Check observer limit to prevent memory exhaustion
1072 if prop.observers.len() >= MAX_OBSERVERS {
1073 return Err(PropertyError::InvalidConfiguration {
1074 reason: format!(
1075 "Maximum observer limit ({}) exceeded. Current observers: {}. \
1076 Consider unsubscribing unused observers to free resources.",
1077 MAX_OBSERVERS,
1078 prop.observers.len()
1079 ),
1080 });
1081 }
1082
1083 let id = prop.next_id;
1084 prop.next_id += 1;
1085 prop.observers.insert(id, observer);
1086 Ok(id)
1087 }
1088
1089 /// Removes an observer identified by its ID
1090 ///
1091 /// # Arguments
1092 ///
1093 /// * `id` - The observer ID returned by `subscribe()`
1094 ///
1095 /// # Returns
1096 ///
1097 /// `Ok(bool)` where `true` means the observer was found and removed,
1098 /// `false` means no observer with that ID existed.
/// A poisoned lock is recovered from, so this method does not currently return an error.
1100 ///
1101 /// # Examples
1102 ///
1103 /// ```rust
1104 /// use observable_property::ObservableProperty;
1105 /// use std::sync::Arc;
1106 ///
1107 /// let property = ObservableProperty::new(0);
1108 /// let id = property.subscribe(Arc::new(|_, _| {})).map_err(|e| {
1109 /// eprintln!("Failed to subscribe: {}", e);
1110 /// e
1111 /// })?;
1112 ///
1113 /// let was_removed = property.unsubscribe(id).map_err(|e| {
1114 /// eprintln!("Failed to unsubscribe: {}", e);
1115 /// e
1116 /// })?;
1117 /// assert!(was_removed); // Observer existed and was removed
1118 ///
1119 /// let was_removed_again = property.unsubscribe(id).map_err(|e| {
1120 /// eprintln!("Failed to unsubscribe again: {}", e);
1121 /// e
1122 /// })?;
1123 /// assert!(!was_removed_again); // Observer no longer exists
1124 /// # Ok::<(), observable_property::PropertyError>(())
1125 /// ```
1126 pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
1127 let mut prop = match self.inner.write() {
1128 Ok(guard) => guard,
1129 Err(poisoned) => {
1130 // Graceful degradation: recover from poisoned write lock
1131 poisoned.into_inner()
1132 }
1133 };
1134
1135 let was_present = prop.observers.remove(&id).is_some();
1136 Ok(was_present)
1137 }
1138
1139 /// Subscribes an observer that only gets called when a filter condition is met
1140 ///
1141 /// This is useful for observing only specific types of changes, such as
1142 /// when a value increases or crosses a threshold.
1143 ///
1144 /// # Arguments
1145 ///
1146 /// * `observer` - The observer function to call when the filter passes
1147 /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1148 ///
1149 /// # Returns
1150 ///
/// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError::InvalidConfiguration)` if the maximum observer limit is exceeded.
1152 ///
1153 /// # Examples
1154 ///
1155 /// ```rust
1156 /// use observable_property::ObservableProperty;
1157 /// use std::sync::Arc;
1158 ///
1159 /// let property = ObservableProperty::new(0);
1160 ///
1161 /// // Only notify when value increases
1162 /// let id = property.subscribe_filtered(
1163 /// Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
1164 /// |old, new| new > old
1165 /// ).map_err(|e| {
1166 /// eprintln!("Failed to subscribe filtered observer: {}", e);
1167 /// e
1168 /// })?;
1169 ///
1170 /// property.set(10).map_err(|e| {
1171 /// eprintln!("Failed to set value: {}", e);
1172 /// e
1173 /// })?; // Triggers observer (0 -> 10)
1174 /// property.set(5).map_err(|e| {
1175 /// eprintln!("Failed to set value: {}", e);
1176 /// e
1177 /// })?; // Does NOT trigger observer (10 -> 5)
1178 /// property.set(15).map_err(|e| {
1179 /// eprintln!("Failed to set value: {}", e);
1180 /// e
1181 /// })?; // Triggers observer (5 -> 15)
1182 /// # Ok::<(), observable_property::PropertyError>(())
1183 /// ```
1184 pub fn subscribe_filtered<F>(
1185 &self,
1186 observer: Observer<T>,
1187 filter: F,
1188 ) -> Result<ObserverId, PropertyError>
1189 where
1190 F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1191 {
1192 let filter = Arc::new(filter);
1193 let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
1194 if filter(old_val, new_val) {
1195 observer(old_val, new_val);
1196 }
1197 });
1198
1199 self.subscribe(filtered_observer)
1200 }
1201
1202 /// Notifies all observers with a batch of changes
1203 ///
1204 /// This method allows you to trigger observer notifications for multiple
1205 /// value changes efficiently. Unlike individual `set()` calls, this method
1206 /// acquires the observer list once and then notifies all observers with each
1207 /// change in the batch.
1208 ///
1209 /// # Performance Characteristics
1210 ///
1211 /// - **Lock optimization**: Acquires read lock only to snapshot observers, then releases it
1212 /// - **Non-blocking**: Other operations can proceed during observer notifications
1213 /// - **Panic isolation**: Individual observer panics don't affect other observers
1214 ///
1215 /// # Arguments
1216 ///
1217 /// * `changes` - A vector of tuples `(old_value, new_value)` to notify observers about
1218 ///
1219 /// # Returns
1220 ///
1221 /// `Ok(())` if successful. Observer errors are logged but don't cause the method to fail.
1222 ///
1223 /// # Examples
1224 ///
1225 /// ```rust
1226 /// use observable_property::ObservableProperty;
1227 /// use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
1228 ///
1229 /// # fn main() -> Result<(), observable_property::PropertyError> {
1230 /// let property = ObservableProperty::new(0);
1231 /// let call_count = Arc::new(AtomicUsize::new(0));
1232 /// let count_clone = call_count.clone();
1233 ///
1234 /// property.subscribe(Arc::new(move |old, new| {
1235 /// count_clone.fetch_add(1, Ordering::SeqCst);
1236 /// println!("Change: {} -> {}", old, new);
1237 /// }))?;
1238 ///
1239 /// // Notify with multiple changes at once
1240 /// property.notify_observers_batch(vec![
1241 /// (0, 10),
1242 /// (10, 20),
1243 /// (20, 30),
1244 /// ])?;
1245 ///
1246 /// assert_eq!(call_count.load(Ordering::SeqCst), 3);
1247 /// # Ok(())
1248 /// # }
1249 /// ```
1250 ///
1251 /// # Note
1252 ///
1253 /// This method does NOT update the property's actual value - it only triggers
1254 /// observer notifications. Use `set()` if you want to update the value and
1255 /// notify observers.
1256 pub fn notify_observers_batch(&self, changes: Vec<(T, T)>) -> Result<(), PropertyError> {
1257 // Acquire lock, clone observers, then release lock immediately
1258 // This prevents blocking other operations during potentially long notification process
1259 let observers_snapshot = {
1260 let prop = match self.inner.read() {
1261 Ok(guard) => guard,
1262 Err(poisoned) => {
1263 // Graceful degradation: recover from poisoned read lock
1264 poisoned.into_inner()
1265 }
1266 };
1267 prop.observers.values().cloned().collect::<Vec<_>>()
1268 }; // Lock released here
1269
1270 // Notify observers without holding the lock
1271 for (old_val, new_val) in changes {
1272 for observer in &observers_snapshot {
1273 // Wrap in panic recovery like other notification methods
1274 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1275 observer(&old_val, &new_val);
1276 })) {
1277 eprintln!("Observer panic in batch notification: {:?}", e);
1278 }
1279 }
1280 }
1281 Ok(())
1282 }
1283
1284 /// Subscribes an observer and returns a RAII guard for automatic cleanup
1285 ///
1286 /// This method is similar to `subscribe()` but returns a `Subscription` object
1287 /// that automatically removes the observer when it goes out of scope. This
1288 /// provides a more convenient and safer alternative to manual subscription
1289 /// management.
1290 ///
1291 /// # Arguments
1292 ///
1293 /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
1294 ///
1295 /// # Returns
1296 ///
1297 /// `Ok(Subscription<T>)` containing a RAII guard for the observer,
/// or `Err(PropertyError::InvalidConfiguration)` if the maximum observer limit is exceeded.
1299 ///
1300 /// # Examples
1301 ///
1302 /// ## Basic RAII Subscription
1303 ///
1304 /// ```rust
1305 /// use observable_property::ObservableProperty;
1306 /// use std::sync::Arc;
1307 ///
1308 /// # fn main() -> Result<(), observable_property::PropertyError> {
1309 /// let property = ObservableProperty::new(0);
1310 ///
1311 /// {
1312 /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1313 /// println!("Value: {} -> {}", old, new);
1314 /// }))?;
1315 ///
1316 /// property.set(42)?; // Prints: "Value: 0 -> 42"
1317 /// property.set(100)?; // Prints: "Value: 42 -> 100"
1318 ///
1319 /// // Automatic cleanup when _subscription goes out of scope
1320 /// }
1321 ///
1322 /// property.set(200)?; // No output - subscription was cleaned up
1323 /// # Ok(())
1324 /// # }
1325 /// ```
1326 ///
1327 /// ## Comparison with Manual Management
1328 ///
1329 /// ```rust
1330 /// use observable_property::ObservableProperty;
1331 /// use std::sync::Arc;
1332 ///
1333 /// # fn main() -> Result<(), observable_property::PropertyError> {
1334 /// let property = ObservableProperty::new("initial".to_string());
1335 ///
1336 /// // Method 1: Manual subscription management (traditional approach)
1337 /// let observer_id = property.subscribe(Arc::new(|old, new| {
1338 /// println!("Manual: {} -> {}", old, new);
1339 /// }))?;
1340 ///
1341 /// // Method 2: RAII subscription management (recommended)
1342 /// let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
1343 /// println!("RAII: {} -> {}", old, new);
1344 /// }))?;
1345 ///
1346 /// // Both observers will be called
1347 /// property.set("changed".to_string())?;
1348 /// // Prints:
1349 /// // "Manual: initial -> changed"
1350 /// // "RAII: initial -> changed"
1351 ///
1352 /// // Manual cleanup required for first observer
1353 /// property.unsubscribe(observer_id)?;
1354 ///
1355 /// // Second observer (_subscription) is automatically cleaned up when
1356 /// // the variable goes out of scope - no manual intervention needed
1357 /// # Ok(())
1358 /// # }
1359 /// ```
1360 ///
1361 /// ## Error Handling with Early Returns
1362 ///
1363 /// ```rust
1364 /// use observable_property::ObservableProperty;
1365 /// use std::sync::Arc;
1366 ///
1367 /// fn process_with_monitoring(property: &ObservableProperty<i32>) -> Result<(), observable_property::PropertyError> {
1368 /// let _monitoring = property.subscribe_with_subscription(Arc::new(|old, new| {
1369 /// println!("Processing: {} -> {}", old, new);
1370 /// }))?;
1371 ///
1372 /// property.set(1)?;
1373 ///
1374 /// if property.get()? > 0 {
1375 /// return Ok(()); // Subscription automatically cleaned up on early return
1376 /// }
1377 ///
1378 /// property.set(2)?;
1379 /// Ok(()) // Subscription automatically cleaned up on normal return
1380 /// }
1381 ///
1382 /// # fn main() -> Result<(), observable_property::PropertyError> {
1383 /// let property = ObservableProperty::new(0);
1384 /// process_with_monitoring(&property)?; // Monitoring active only during function call
1385 /// property.set(99)?; // No monitoring output - subscription was cleaned up
1386 /// # Ok(())
1387 /// # }
1388 /// ```
1389 ///
1390 /// ## Multi-threaded Subscription Management
1391 ///
1392 /// ```rust
1393 /// use observable_property::ObservableProperty;
1394 /// use std::sync::Arc;
1395 /// use std::thread;
1396 ///
1397 /// # fn main() -> Result<(), observable_property::PropertyError> {
1398 /// let property = Arc::new(ObservableProperty::new(0));
1399 /// let property_clone = property.clone();
1400 ///
1401 /// let handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1402 /// let _subscription = property_clone.subscribe_with_subscription(Arc::new(|old, new| {
1403 /// println!("Thread observer: {} -> {}", old, new);
1404 /// }))?;
1405 ///
1406 /// property_clone.set(42)?; // Prints: "Thread observer: 0 -> 42"
1407 ///
1408 /// // Subscription automatically cleaned up when thread ends
1409 /// Ok(())
1410 /// });
1411 ///
1412 /// handle.join().unwrap()?;
1413 /// property.set(100)?; // No output - thread subscription was cleaned up
1414 /// # Ok(())
1415 /// # }
1416 /// ```
1417 ///
1418 /// # Use Cases
1419 ///
1420 /// This method is particularly useful in scenarios such as:
1421 /// - Temporary observers that should be active only during a specific scope
1422 /// - Error-prone code where manual cleanup might be forgotten
1423 /// - Complex control flow where multiple exit points make manual cleanup difficult
1424 /// - Resource-constrained environments where observer leaks are problematic
1425 pub fn subscribe_with_subscription(
1426 &self,
1427 observer: Observer<T>,
1428 ) -> Result<Subscription<T>, PropertyError> {
1429 let id = self.subscribe(observer)?;
1430 Ok(Subscription {
1431 inner: Arc::clone(&self.inner),
1432 id,
1433 })
1434 }
1435
1436 /// Subscribes a filtered observer and returns a RAII guard for automatic cleanup
1437 ///
1438 /// This method combines the functionality of `subscribe_filtered()` with the automatic
1439 /// cleanup benefits of `subscribe_with_subscription()`. The observer will only be
1440 /// called when the filter condition is satisfied, and it will be automatically
1441 /// unsubscribed when the returned `Subscription` goes out of scope.
1442 ///
1443 /// # Arguments
1444 ///
1445 /// * `observer` - The observer function to call when the filter passes
1446 /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
1447 ///
1448 /// # Returns
1449 ///
1450 /// `Ok(Subscription<T>)` containing a RAII guard for the filtered observer,
/// or `Err(PropertyError::InvalidConfiguration)` if the maximum observer limit is exceeded.
1452 ///
1453 /// # Examples
1454 ///
1455 /// ## Basic Filtered RAII Subscription
1456 ///
1457 /// ```rust
1458 /// use observable_property::ObservableProperty;
1459 /// use std::sync::Arc;
1460 ///
1461 /// # fn main() -> Result<(), observable_property::PropertyError> {
1462 /// let counter = ObservableProperty::new(0);
1463 ///
1464 /// {
1465 /// // Monitor only increases with automatic cleanup
1466 /// let _increase_monitor = counter.subscribe_filtered_with_subscription(
1467 /// Arc::new(|old, new| {
1468 /// println!("Counter increased: {} -> {}", old, new);
1469 /// }),
1470 /// |old, new| new > old
1471 /// )?;
1472 ///
1473 /// counter.set(5)?; // Prints: "Counter increased: 0 -> 5"
1474 /// counter.set(3)?; // No output (decrease)
1475 /// counter.set(7)?; // Prints: "Counter increased: 3 -> 7"
1476 ///
1477 /// // Subscription automatically cleaned up when leaving scope
1478 /// }
1479 ///
1480 /// counter.set(10)?; // No output - subscription was cleaned up
1481 /// # Ok(())
1482 /// # }
1483 /// ```
1484 ///
1485 /// ## Multi-Condition Temperature Monitoring
1486 ///
1487 /// ```rust
1488 /// use observable_property::ObservableProperty;
1489 /// use std::sync::Arc;
1490 ///
1491 /// # fn main() -> Result<(), observable_property::PropertyError> {
1492 /// let temperature = ObservableProperty::new(20.0_f64);
1493 ///
1494 /// {
1495 /// // Create filtered subscription that only triggers for significant temperature increases
1496 /// let _heat_warning = temperature.subscribe_filtered_with_subscription(
1497 /// Arc::new(|old_temp, new_temp| {
1498 /// println!("🔥 Heat warning! Temperature rose from {:.1}°C to {:.1}°C",
1499 /// old_temp, new_temp);
1500 /// }),
1501 /// |old, new| new > old && (new - old) > 5.0 // Only trigger for increases > 5°C
1502 /// )?;
1503 ///
1504 /// // Create another filtered subscription for cooling alerts
1505 /// let _cooling_alert = temperature.subscribe_filtered_with_subscription(
1506 /// Arc::new(|old_temp, new_temp| {
1507 /// println!("❄️ Cooling alert! Temperature dropped from {:.1}°C to {:.1}°C",
1508 /// old_temp, new_temp);
1509 /// }),
1510 /// |old, new| new < old && (old - new) > 3.0 // Only trigger for decreases > 3°C
1511 /// )?;
1512 ///
1513 /// // Test the filters
1514 /// temperature.set(22.0)?; // No alerts (increase of only 2°C)
1515 /// temperature.set(28.0)?; // Heat warning triggered (increase of 6°C from 22°C)
1516 /// temperature.set(23.0)?; // Cooling alert triggered (decrease of 5°C)
1517 ///
1518 /// // Both subscriptions are automatically cleaned up when they go out of scope
1519 /// }
1520 ///
1521 /// temperature.set(35.0)?; // No alerts - subscriptions were cleaned up
1522 /// # Ok(())
1523 /// # }
1524 /// ```
1525 ///
1526 /// ## Conditional Monitoring with Complex Filters
1527 ///
1528 /// ```rust
1529 /// use observable_property::ObservableProperty;
1530 /// use std::sync::Arc;
1531 ///
1532 /// # fn main() -> Result<(), observable_property::PropertyError> {
1533 /// let stock_price = ObservableProperty::new(100.0_f64);
1534 ///
1535 /// {
1536 /// // Monitor significant price movements (> 5% change)
1537 /// let _volatility_alert = stock_price.subscribe_filtered_with_subscription(
1538 /// Arc::new(|old_price, new_price| {
1539 /// let change_percent = ((new_price - old_price) / old_price * 100.0).abs();
1540 /// println!("📈 Significant price movement: ${:.2} -> ${:.2} ({:.1}%)",
1541 /// old_price, new_price, change_percent);
1542 /// }),
1543 /// |old, new| {
1544 /// let change_percent = ((new - old) / old * 100.0).abs();
1545 /// change_percent > 5.0 // Trigger on > 5% change
1546 /// }
1547 /// )?;
1548 ///
1549 /// stock_price.set(103.0)?; // No alert (3% change)
/// stock_price.set(108.0)?; // No alert (about 4.85% change from 103, below the 5% threshold)
1551 /// stock_price.set(95.0)?; // Alert triggered (12% decrease)
1552 ///
1553 /// // Subscription automatically cleaned up when leaving scope
1554 /// }
1555 ///
1556 /// stock_price.set(200.0)?; // No alert - monitoring ended
1557 /// # Ok(())
1558 /// # }
1559 /// ```
1560 ///
1561 /// ## Cross-Thread Filtered Monitoring
1562 ///
1563 /// ```rust
1564 /// use observable_property::ObservableProperty;
1565 /// use std::sync::Arc;
1566 /// use std::thread;
1567 /// use std::time::Duration;
1568 ///
1569 /// # fn main() -> Result<(), observable_property::PropertyError> {
1570 /// let network_latency = Arc::new(ObservableProperty::new(50)); // milliseconds
1571 /// let latency_clone = network_latency.clone();
1572 ///
1573 /// let monitor_handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
1574 /// // Monitor high latency in background thread with automatic cleanup
1575 /// let _high_latency_alert = latency_clone.subscribe_filtered_with_subscription(
1576 /// Arc::new(|old_ms, new_ms| {
1577 /// println!("⚠️ High latency detected: {}ms -> {}ms", old_ms, new_ms);
1578 /// }),
1579 /// |_, new| *new > 100 // Alert when latency exceeds 100ms
1580 /// )?;
1581 ///
1582 /// // Simulate monitoring for a short time
1583 /// thread::sleep(Duration::from_millis(10));
1584 ///
1585 /// // Subscription automatically cleaned up when thread ends
1586 /// Ok(())
1587 /// });
1588 ///
1589 /// // Simulate network conditions
1590 /// network_latency.set(80)?; // No alert (under threshold)
1591 /// network_latency.set(150)?; // Alert triggered in background thread
1592 ///
1593 /// monitor_handle.join().unwrap()?;
1594 /// network_latency.set(200)?; // No alert - background monitoring ended
1595 /// # Ok(())
1596 /// # }
1597 /// ```
1598 ///
1599 /// # Use Cases
1600 ///
1601 /// This method is ideal for:
1602 /// - Threshold-based monitoring with automatic cleanup
1603 /// - Temporary conditional observers in specific code blocks
1604 /// - Event-driven systems where observers should be active only during certain phases
1605 /// - Resource management scenarios where filtered observers have limited lifetimes
1606 ///
1607 /// # Performance Notes
1608 ///
1609 /// The filter function is evaluated for every property change, so it should be
1610 /// lightweight. Complex filtering logic should be optimized to avoid performance
1611 /// bottlenecks, especially in high-frequency update scenarios.
1612 pub fn subscribe_filtered_with_subscription<F>(
1613 &self,
1614 observer: Observer<T>,
1615 filter: F,
1616 ) -> Result<Subscription<T>, PropertyError>
1617 where
1618 F: Fn(&T, &T) -> bool + Send + Sync + 'static,
1619 {
1620 let id = self.subscribe_filtered(observer, filter)?;
1621 Ok(Subscription {
1622 inner: Arc::clone(&self.inner),
1623 id,
1624 })
1625 }
1626}
1627
1628impl<T: Clone> Clone for ObservableProperty<T> {
1629 /// Creates a new reference to the same observable property
1630 ///
1631 /// This creates a new `ObservableProperty` instance that shares the same
1632 /// underlying data with the original. Changes made through either instance
1633 /// will be visible to observers subscribed through both instances.
1634 ///
1635 /// # Examples
1636 ///
1637 /// ```rust
1638 /// use observable_property::ObservableProperty;
1639 /// use std::sync::Arc;
1640 ///
1641 /// let property1 = ObservableProperty::new(42);
1642 /// let property2 = property1.clone();
1643 ///
1644 /// property2.subscribe(Arc::new(|old, new| {
1645 /// println!("Observer on property2 saw change: {} -> {}", old, new);
1646 /// })).map_err(|e| {
1647 /// eprintln!("Failed to subscribe: {}", e);
1648 /// e
1649 /// })?;
1650 ///
1651 /// // This change through property1 will trigger the observer on property2
1652 /// property1.set(100).map_err(|e| {
1653 /// eprintln!("Failed to set value: {}", e);
1654 /// e
1655 /// })?;
1656 /// # Ok::<(), observable_property::PropertyError>(())
1657 /// ```
1658 fn clone(&self) -> Self {
1659 Self {
1660 inner: Arc::clone(&self.inner),
1661 max_threads: self.max_threads,
1662 }
1663 }
1664}
1665
1666impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
1667 /// Debug implementation that shows the current value if accessible
1668 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1669 match self.get() {
1670 Ok(value) => f
1671 .debug_struct("ObservableProperty")
1672 .field("value", &value)
1673 .field("observers_count", &"[hidden]")
1674 .field("max_threads", &self.max_threads)
1675 .finish(),
1676 Err(_) => f
1677 .debug_struct("ObservableProperty")
1678 .field("value", &"[inaccessible]")
1679 .field("observers_count", &"[hidden]")
1680 .finish(),
1681 }
1682 }
1683}
1684
1685#[cfg(test)]
1686mod tests {
1687 use super::*;
1688 use std::sync::atomic::{AtomicUsize, Ordering};
1689 use std::time::Duration;
1690
1691 #[test]
1692 fn test_property_creation_and_basic_operations() {
1693 let prop = ObservableProperty::new(42);
1694
1695 // Test initial value
1696 match prop.get() {
1697 Ok(value) => assert_eq!(value, 42),
1698 Err(e) => panic!("Failed to get initial value: {}", e),
1699 }
1700
1701 // Test setting value
1702 if let Err(e) = prop.set(100) {
1703 panic!("Failed to set value: {}", e);
1704 }
1705
1706 match prop.get() {
1707 Ok(value) => assert_eq!(value, 100),
1708 Err(e) => panic!("Failed to get updated value: {}", e),
1709 }
1710 }
1711
/// A plain observer must be invoked once per `set`, receive the previous and the
/// new value, and stop being invoked once it has been unsubscribed.
#[test]
fn test_observer_subscription_and_notification() {
    let prop = ObservableProperty::new("initial".to_string());
    let hits = Arc::new(AtomicUsize::new(0));
    let seen_old = Arc::new(RwLock::new(String::new()));
    let seen_new = Arc::new(RwLock::new(String::new()));

    // Handles moved into the observer closure.
    let hits_in_observer = Arc::clone(&hits);
    let old_slot = Arc::clone(&seen_old);
    let new_slot = Arc::clone(&seen_new);

    let observer_id = prop
        .subscribe(Arc::new(move |old, new| {
            hits_in_observer.fetch_add(1, Ordering::SeqCst);
            if let Ok(mut slot) = old_slot.write() {
                *slot = old.clone();
            }
            if let Ok(mut slot) = new_slot.write() {
                *slot = new.clone();
            }
        }))
        .unwrap_or_else(|e| panic!("Failed to subscribe observer: {}", e));

    // First change: the observer must fire exactly once.
    prop.set("changed".to_string())
        .unwrap_or_else(|e| panic!("Failed to set property value: {}", e));
    assert_eq!(hits.load(Ordering::SeqCst), 1);

    // Read guards are scoped so they are released before the next `set`.
    {
        let recorded_old = seen_old
            .read()
            .unwrap_or_else(|e| panic!("Failed to read old value: {:?}", e));
        assert_eq!(*recorded_old, "initial");
    }
    {
        let recorded_new = seen_new
            .read()
            .unwrap_or_else(|e| panic!("Failed to read new value: {:?}", e));
        assert_eq!(*recorded_new, "changed");
    }

    // Removing a registered observer reports that it was present.
    let was_present = prop
        .unsubscribe(observer_id)
        .unwrap_or_else(|e| panic!("Failed to unsubscribe observer: {}", e));
    assert!(was_present);

    // Second change: the counter must stay where it was.
    prop.set("not_notified".to_string())
        .unwrap_or_else(|e| panic!("Failed to set property value after unsubscribe: {}", e));
    assert_eq!(hits.load(Ordering::SeqCst), 1);
}
1765
/// A filtered observer must only be invoked for changes accepted by its filter
/// (here: strictly increasing values).
#[test]
fn test_filtered_observer() {
    let prop = ObservableProperty::new(0i32);
    let hits = Arc::new(AtomicUsize::new(0));
    let hits_in_observer = Arc::clone(&hits);

    let observer_id = prop
        .subscribe_filtered(
            Arc::new(move |_, _| {
                hits_in_observer.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .unwrap_or_else(|e| panic!("Failed to subscribe filtered observer: {}", e));

    // (value to set, notifications expected afterwards):
    //   0 -> 5  fires, 5 -> 3 does not, 3 -> 10 fires again.
    for (value, expected_hits) in [(5, 1), (3, 1), (10, 2)] {
        prop.set(value)
            .unwrap_or_else(|e| panic!("Failed to set property value to {}: {}", value, e));
        assert_eq!(hits.load(Ordering::SeqCst), expected_hits);
    }

    if let Err(e) = prop.unsubscribe(observer_id) {
        panic!("Failed to unsubscribe filtered observer: {}", e);
    }
}
1806
/// Several threads read the same property repeatedly; every read must succeed
/// and return the initial value, since nothing writes to it.
#[test]
fn test_thread_safety_concurrent_reads() {
    let prop = Arc::new(ObservableProperty::new(42i32));
    let num_threads = 10;
    let reads_per_thread = 100;

    let mut readers = Vec::new();
    for _ in 0..num_threads {
        let shared = Arc::clone(&prop);
        readers.push(thread::spawn(move || {
            for _ in 0..reads_per_thread {
                let value = shared
                    .get()
                    .unwrap_or_else(|e| panic!("Failed to read property value: {}", e));
                assert_eq!(value, 42);
                thread::sleep(Duration::from_millis(1));
            }
        }));
    }

    // A panic inside a reader thread surfaces here as a join error.
    for reader in readers {
        reader
            .join()
            .unwrap_or_else(|e| panic!("Thread failed to complete: {:?}", e));
    }
}
1834
/// `set` must block until observers have run, whereas `set_async` must return
/// without waiting for them.
///
/// The test avoids scheduler-dependent constants: the asynchronous call is
/// bounded by the observer's own delay, and completion of the background
/// notification is awaited by polling against a deadline instead of a fixed
/// sleep.
#[test]
fn test_async_set_performance() {
    // Artificial delay that makes the observer measurably slow.
    let observer_delay = Duration::from_millis(50);

    let prop = ObservableProperty::new(0i32);
    let slow_observer_count = Arc::new(AtomicUsize::new(0));
    let count_clone = slow_observer_count.clone();

    // Add observer that simulates slow work
    let _id = match prop.subscribe(Arc::new(move |_, _| {
        thread::sleep(observer_delay);
        count_clone.fetch_add(1, Ordering::SeqCst);
    })) {
        Ok(id) => id,
        Err(e) => panic!("Failed to subscribe slow observer: {}", e),
    };

    // Synchronous set: pays for the observer's delay before returning.
    let start = std::time::Instant::now();
    if let Err(e) = prop.set(1) {
        panic!("Failed to set property value synchronously: {}", e);
    }
    let sync_duration = start.elapsed();
    assert_eq!(slow_observer_count.load(Ordering::SeqCst), 1);

    // Asynchronous set: must return before the observer can have finished.
    let start = std::time::Instant::now();
    if let Err(e) = prop.set_async(2) {
        panic!("Failed to set property value asynchronously: {}", e);
    }
    let async_duration = start.elapsed();

    assert!(async_duration < sync_duration);
    // Returning faster than the observer's own delay proves the call did not
    // wait for it, without depending on an absolute scheduler-sensitive bound.
    assert!(async_duration < observer_delay);

    // Wait (bounded) for the background notification to finish.
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while slow_observer_count.load(Ordering::SeqCst) < 2 && std::time::Instant::now() < deadline {
        thread::sleep(Duration::from_millis(5));
    }

    // The single observer must have run twice: once per set call.
    assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
}
1874
/// After another thread panics while holding the property's write lock,
/// `get`, `set` and `subscribe` are all expected to keep working.
#[test]
fn test_lock_poisoning() {
    let prop = Arc::new(ObservableProperty::new(0));

    // Poison the inner lock: take the write guard on a helper thread, then panic.
    let poisoner = {
        let shared = Arc::clone(&prop);
        thread::spawn(move || {
            let _guard = shared
                .inner
                .write()
                .expect("Failed to acquire write lock for poisoning test");
            panic!("Deliberate panic to poison the lock");
        })
    };
    // The join result is an error by design (the thread panicked); ignore it.
    let _ = poisoner.join();

    // Reading still yields the value that was stored before the panic.
    let value = prop.get().unwrap_or_else(|e| {
        panic!("get() should succeed with graceful degradation, got error: {:?}", e)
    });
    assert_eq!(value, 0);

    // Writing still succeeds ...
    if let Err(e) = prop.set(42) {
        panic!("set() should succeed with graceful degradation, got error: {:?}", e);
    }
    // ... and the written value is observable afterwards.
    assert_eq!(prop.get().expect("Failed to get value after set"), 42);

    // Registering a new observer still succeeds as well.
    if let Err(e) = prop.subscribe(Arc::new(|_, _| {})) {
        panic!("subscribe() should succeed with graceful degradation, got error: {:?}", e);
    }
}
1915
/// A panicking observer must neither make `set` fail nor prevent an observer
/// registered after it from being invoked.
#[test]
fn test_observer_panic_isolation() {
    let prop = ObservableProperty::new(0);
    let healthy_calls = Arc::new(AtomicUsize::new(0));

    // Registered first: always panics.
    let faulty_id = prop
        .subscribe(Arc::new(|_, _| {
            panic!("This observer deliberately panics");
        }))
        .expect("Failed to subscribe panic observer");

    // Registered second: just counts its invocations.
    let healthy_id = {
        let tally = Arc::clone(&healthy_calls);
        prop.subscribe(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe normal observer")
    };

    // `set` itself must succeed even though one observer panics.
    prop.set(42).expect("Failed to set property value");

    // The counting observer was still reached.
    assert_eq!(healthy_calls.load(Ordering::SeqCst), 1);

    prop.unsubscribe(faulty_id)
        .expect("Failed to unsubscribe panic observer");
    prop.unsubscribe(healthy_id)
        .expect("Failed to unsubscribe normal observer");
}
1946
/// `unsubscribe` must report `false` for an ID that was never handed out and
/// for an ID that has already been removed, and `true` for the removal that
/// actually takes the observer out.
#[test]
fn test_unsubscribe_nonexistent_observer() {
    let property = ObservableProperty::new(0);

    // Generate a valid observer ID
    let valid_id = property
        .subscribe(Arc::new(|_, _| {}))
        .expect("Failed to subscribe test observer");

    // Only one observer exists, so an ID far beyond it cannot be registered.
    let nonexistent_id = valid_id + 1000;

    // Unknown ID: not an error, but nothing was removed.
    match property.unsubscribe(nonexistent_id) {
        Ok(was_present) => {
            assert!(
                !was_present,
                "Unsubscribe should return false for nonexistent ID"
            );
        }
        Err(e) => panic!("Unsubscribe returned error: {:?}", e),
    }

    // First removal of the real ID: the observer was present.
    let first = property
        .unsubscribe(valid_id)
        .expect("Failed first unsubscribe");
    assert!(first, "First unsubscribe should return true");

    // Second removal of the same ID: nothing left to remove.
    let second = property
        .unsubscribe(valid_id)
        .expect("Failed second unsubscribe");
    assert!(!second, "Second unsubscribe should return false");
}
1974
/// Consecutive subscriptions must receive consecutive, strictly increasing IDs.
///
/// Despite its name, this test does not drive the ID counter anywhere near its
/// maximum; it only checks the increment-by-one behaviour for three observers.
#[test]
fn test_observer_id_wraparound() {
    let prop = ObservableProperty::new(0);

    let first = prop
        .subscribe(Arc::new(|_, _| {}))
        .expect("Failed to subscribe observer 1");
    let second = prop
        .subscribe(Arc::new(|_, _| {}))
        .expect("Failed to subscribe observer 2");
    let third = prop
        .subscribe(Arc::new(|_, _| {}))
        .expect("Failed to subscribe observer 3");

    // Strict ordering first, then the exact step size.
    assert!(second > first, "Observer IDs should increment");
    assert!(third > second, "Observer IDs should continue incrementing");
    assert_eq!(second, first + 1, "Observer IDs should increment by 1");
    assert_eq!(third, second + 1, "Observer IDs should increment by 1");

    // Leave the property without observers.
    prop.unsubscribe(first)
        .expect("Failed to unsubscribe observer 1");
    prop.unsubscribe(second)
        .expect("Failed to unsubscribe observer 2");
    prop.unsubscribe(third)
        .expect("Failed to unsubscribe observer 3");
}
1994
/// Several threads subscribe and unsubscribe observers on one shared property
/// at the same time; every operation must succeed and the property must still
/// accept `set` afterwards.
#[test]
fn test_concurrent_subscribe_unsubscribe() {
    let prop = Arc::new(ObservableProperty::new(0));
    let num_threads = 8;
    let operations_per_thread = 100;

    let mut workers = Vec::new();
    for thread_id in 0..num_threads {
        let shared = Arc::clone(&prop);
        workers.push(thread::spawn(move || {
            // IDs this thread has registered and not yet removed.
            let mut live_ids = Vec::new();

            for i in 0..operations_per_thread {
                let observer_id = shared
                    .subscribe(Arc::new(move |old, new| {
                        // Do some work to simulate real observer
                        let _ = thread_id + i + old + new;
                    }))
                    .expect("Subscribe should succeed");
                live_ids.push(observer_id);

                // Every tenth iteration, remove one of this thread's observers.
                if i % 10 == 0 && !live_ids.is_empty() {
                    let idx = i % live_ids.len();
                    let victim = live_ids.remove(idx);
                    shared
                        .unsubscribe(victim)
                        .expect("Unsubscribe should succeed");
                }
            }

            // Remove whatever this thread still has registered.
            for id in live_ids {
                shared
                    .unsubscribe(id)
                    .expect("Final cleanup should succeed");
            }
        }));
    }

    for worker in workers {
        worker.join().expect("Thread should complete successfully");
    }

    // The property must remain usable after the concurrent churn.
    prop.set(42)
        .expect("Property should still work after concurrent operations");
}
2047
/// With two panicking observers registered ahead of two counting observers,
/// a single `set` must still succeed and reach both counting observers.
#[test]
fn test_multiple_observer_panics_isolation() {
    let prop = ObservableProperty::new(0);
    let healthy_calls = Arc::new(AtomicUsize::new(0));

    // Two observers that always panic, registered first.
    let _faulty_a = prop
        .subscribe(Arc::new(|_, _| {
            panic!("First panic observer");
        }))
        .expect("Failed to subscribe first panic observer");
    let _faulty_b = prop
        .subscribe(Arc::new(|_, _| {
            panic!("Second panic observer");
        }))
        .expect("Failed to subscribe second panic observer");

    // Two observers that only bump the shared counter.
    let _healthy_a = {
        let tally = Arc::clone(&healthy_calls);
        prop.subscribe(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe first success observer")
    };
    let _healthy_b = {
        let tally = Arc::clone(&healthy_calls);
        prop.subscribe(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe second success observer")
    };

    prop.set(42)
        .expect("Failed to set property value for panic isolation test");

    // One increment per counting observer.
    assert_eq!(healthy_calls.load(Ordering::SeqCst), 2);
}
2087
/// With `set_async`, a panicking observer must not prevent another observer
/// from being invoked.
///
/// Completion of the background notification is awaited by polling against a
/// deadline rather than a single fixed sleep, so the test does not depend on
/// how quickly the background work gets scheduled.
#[test]
fn test_async_observer_panic_isolation() {
    let prop = ObservableProperty::new(0);
    let successful_calls = Arc::new(AtomicUsize::new(0));

    // Create observer that will panic
    let _panic_id = prop
        .subscribe(Arc::new(|_, _| {
            panic!("Async panic observer");
        }))
        .expect("Failed to subscribe async panic observer");

    // Create observer that should succeed
    let count = successful_calls.clone();
    let _success_id = prop
        .subscribe(Arc::new(move |_, _| {
            count.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to subscribe async success observer");

    // Notifications for this change are delivered off the calling thread.
    prop.set_async(42)
        .expect("Failed to set property value asynchronously");

    // Wait (bounded) until the counting observer has run.
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while successful_calls.load(Ordering::SeqCst) < 1 && std::time::Instant::now() < deadline {
        thread::sleep(Duration::from_millis(5));
    }

    // The successful observer should have been called despite the panic
    assert_eq!(successful_calls.load(Ordering::SeqCst), 1);
}
2117
/// Registers a large number of observers and checks that one `set` invokes
/// every one of them with the correct `(old, new)` pair.
///
/// Value checks are recorded in an atomic counter and asserted on the test
/// thread. Asserting inside the observer would be ineffective: a failing
/// assertion there is just an observer panic, and the other tests in this
/// module show that observer panics do not make `set` fail.
#[test]
fn test_very_large_observer_count() {
    let observer_count: usize = 1000;
    let prop = ObservableProperty::new(0usize);
    let total_calls = Arc::new(AtomicUsize::new(0));
    // Number of invocations that saw an unexpected (old, new) pair.
    let mismatches = Arc::new(AtomicUsize::new(0));

    // Subscribe many observers
    let mut observer_ids = Vec::with_capacity(observer_count);
    for _ in 0..observer_count {
        let count = total_calls.clone();
        let bad = mismatches.clone();
        let id = prop
            .subscribe(Arc::new(move |old, new| {
                count.fetch_add(1, Ordering::SeqCst);
                // Every observer sees the same transition: 0 -> observer_count.
                if *old != 0 || *new != observer_count {
                    bad.fetch_add(1, Ordering::SeqCst);
                }
            }))
            .expect("Failed to subscribe large observer count test observer");
        observer_ids.push(id);
    }

    // Trigger all observers
    prop.set(observer_count)
        .expect("Failed to set property value for large observer count test");

    // All observers should have been called
    assert_eq!(total_calls.load(Ordering::SeqCst), observer_count);
    assert_eq!(
        mismatches.load(Ordering::SeqCst),
        0,
        "Every observer should receive old = 0 and new = observer_count"
    );

    // Clean up
    for id in observer_ids {
        prop.unsubscribe(id)
            .expect("Failed to unsubscribe observer in large count test");
    }
}
2150
/// An observer that appends each `(old, new)` pair to a shared vector must end
/// up with one entry per `set`, in call order.
#[test]
fn test_observer_with_mutable_state() {
    let prop = ObservableProperty::new(0);
    let transitions = Arc::new(RwLock::new(Vec::new()));

    let observer_id = {
        let log = Arc::clone(&transitions);
        prop.subscribe(Arc::new(move |old, new| {
            if let Ok(mut entries) = log.write() {
                entries.push((*old, *new));
            }
        }))
        .expect("Failed to subscribe mutable state observer")
    };

    // Step the value 0 -> 1 -> 2 -> 3.
    for next in 1..=3 {
        prop.set(next)
            .unwrap_or_else(|e| panic!("Failed to set property to {}: {:?}", next, e));
    }

    // One recorded transition per step, each starting where the last ended.
    let expected = [(0, 1), (1, 2), (2, 3)];
    let recorded = transitions.read().expect("Failed to read call history");
    assert_eq!(recorded.len(), 3);
    for (actual, wanted) in recorded.iter().zip(expected.iter()) {
        assert_eq!(actual, wanted);
    }

    prop.unsubscribe(observer_id)
        .expect("Failed to unsubscribe mutable state observer");
}
2179
/// An observer registered through `subscribe_with_subscription` must be
/// notified while the returned guard is alive and no longer once the guard has
/// gone out of scope.
#[test]
fn test_subscription_automatic_cleanup() {
    let prop = ObservableProperty::new(0);
    let hits = Arc::new(AtomicUsize::new(0));

    {
        let tally = Arc::clone(&hits);
        let _guard = prop
            .subscribe_with_subscription(Arc::new(move |_, _| {
                tally.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create subscription for automatic cleanup test");

        // Guard alive: the change is observed.
        prop.set(1)
            .expect("Failed to set property value in subscription test");
        assert_eq!(hits.load(Ordering::SeqCst), 1);

        // `_guard` is dropped at the end of this block.
    }

    // Guard gone: the change must not be observed.
    prop.set(2)
        .expect("Failed to set property value after subscription dropped");
    assert_eq!(hits.load(Ordering::SeqCst), 1);
}
2205
/// Calling `drop` on a subscription guard must stop its observer from being
/// notified by later changes.
#[test]
fn test_subscription_explicit_drop() {
    let prop = ObservableProperty::new(0);
    let hits = Arc::new(AtomicUsize::new(0));

    let guard = {
        let tally = Arc::clone(&hits);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for explicit drop test")
    };

    // Before the drop: one change, one notification.
    prop.set(1)
        .expect("Failed to set property value before explicit drop");
    assert_eq!(hits.load(Ordering::SeqCst), 1);

    drop(guard);

    // After the drop: the counter must not move.
    prop.set(2)
        .expect("Failed to set property value after explicit drop");
    assert_eq!(hits.load(Ordering::SeqCst), 1);
}
2229
/// Three independent subscription guards: dropping one must silence only its
/// own observer, and dropping the rest must silence all of them.
#[test]
fn test_multiple_subscriptions_with_cleanup() {
    let prop = ObservableProperty::new(0);
    let first_calls = Arc::new(AtomicUsize::new(0));
    let second_calls = Arc::new(AtomicUsize::new(0));
    let third_calls = Arc::new(AtomicUsize::new(0));

    let tally1 = Arc::clone(&first_calls);
    let first_guard = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            tally1.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create first subscription");

    let tally2 = Arc::clone(&second_calls);
    let second_guard = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            tally2.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create second subscription");

    let tally3 = Arc::clone(&third_calls);
    let third_guard = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            tally3.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create third subscription");

    // Current (first, second, third) invocation counts.
    let snapshot = || {
        (
            first_calls.load(Ordering::SeqCst),
            second_calls.load(Ordering::SeqCst),
            third_calls.load(Ordering::SeqCst),
        )
    };

    // All three guards alive: everyone is notified.
    prop.set(1)
        .expect("Failed to set property value with all subscriptions");
    assert_eq!(snapshot(), (1, 1, 1));

    // Second guard dropped: only the first and third advance.
    drop(second_guard);
    prop.set(2)
        .expect("Failed to set property value with partial subscriptions");
    assert_eq!(snapshot(), (2, 1, 2));

    // Remaining guards dropped: nobody advances.
    drop(first_guard);
    drop(third_guard);
    prop.set(3)
        .expect("Failed to set property value with no subscriptions");
    assert_eq!(snapshot(), (2, 1, 2));
}
2284
/// Dropping a subscription guard after the property's lock has been poisoned
/// must not panic. The test passes simply by reaching its end.
#[test]
fn test_subscription_drop_with_poisoned_lock() {
    let prop = Arc::new(ObservableProperty::new(0));

    // Register an observer and keep its guard.
    let hits = Arc::new(AtomicUsize::new(0));
    let tally = Arc::clone(&hits);
    let guard = prop
        .subscribe_with_subscription(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for poisoned lock test");

    // Poison the lock: a helper thread panics while holding the write guard.
    let shared = Arc::clone(&prop);
    let poisoner = thread::spawn(move || {
        let _guard = shared
            .inner
            .write()
            .expect("Failed to acquire write lock for poisoning test");
        panic!("Deliberate panic to poison the lock");
    });
    // The helper thread panicked on purpose; its join error is irrelevant.
    let _ = poisoner.join();

    // Must complete without panicking even though the lock is poisoned.
    drop(guard);
}
2312
/// An ID-based subscription and a guard-based subscription can coexist on one
/// property; unsubscribing the former by ID must leave the latter active.
#[test]
fn test_subscription_vs_manual_unsubscribe() {
    let prop = ObservableProperty::new(0);
    let guard_hits = Arc::new(AtomicUsize::new(0));
    let id_hits = Arc::new(AtomicUsize::new(0));

    // ID-based subscription, removed explicitly further down.
    let manual_id = {
        let tally = Arc::clone(&id_hits);
        prop.subscribe(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create manual subscription")
    };

    // Guard-based subscription, kept alive until the end of the test.
    let _guard = {
        let tally = Arc::clone(&guard_hits);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create automatic subscription")
    };

    // Both observers see the first change.
    prop.set(1)
        .expect("Failed to set property value with both subscriptions");
    assert_eq!(id_hits.load(Ordering::SeqCst), 1);
    assert_eq!(guard_hits.load(Ordering::SeqCst), 1);

    prop.unsubscribe(manual_id)
        .expect("Failed to manually unsubscribe");

    // Only the guard-based observer sees the second change.
    prop.set(2)
        .expect("Failed to set property value after manual unsubscribe");
    assert_eq!(id_hits.load(Ordering::SeqCst), 1);
    assert_eq!(guard_hits.load(Ordering::SeqCst), 2);

    // `_guard` is dropped when the function returns.
}
2350
/// `subscribe_with_subscription` is expected to return `Ok` even after the
/// property's lock has been poisoned by a panicking thread.
#[test]
fn test_subscribe_with_subscription_error_handling() {
    let prop = Arc::new(ObservableProperty::new(0));

    // Poison the lock from a helper thread that panics while holding it.
    let poisoner = {
        let shared = Arc::clone(&prop);
        thread::spawn(move || {
            let _guard = shared
                .inner
                .write()
                .expect("Failed to acquire write lock for poisoning test");
            panic!("Deliberate panic to poison the lock");
        })
    };
    let _ = poisoner.join();

    let outcome = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
    assert!(
        outcome.is_ok(),
        "subscribe_with_subscription should succeed with graceful degradation"
    );
}
2367
/// An observer registered on one handle must be notified by changes made
/// through a clone of that handle as well as through the handle itself.
#[test]
fn test_subscription_with_property_cloning() {
    let original = ObservableProperty::new(0);
    let cloned = original.clone();
    let hits = Arc::new(AtomicUsize::new(0));

    // The observer is attached via the original handle only.
    let tally = Arc::clone(&hits);
    let _guard = original
        .subscribe_with_subscription(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for cloned property test");

    // A write through the clone reaches the observer ...
    cloned
        .set(42)
        .expect("Failed to set property value through prop2");
    assert_eq!(hits.load(Ordering::SeqCst), 1);

    // ... and so does a write through the original.
    original
        .set(100)
        .expect("Failed to set property value through prop1");
    assert_eq!(hits.load(Ordering::SeqCst), 2);
}
2390
/// A subscription guard created inside an `if` body must keep its observer
/// active only until that body ends.
#[test]
fn test_subscription_in_conditional_blocks() {
    let prop = ObservableProperty::new(0);
    let hits = Arc::new(AtomicUsize::new(0));

    let should_subscribe = true;

    if should_subscribe {
        let tally = Arc::clone(&hits);
        let _guard = prop
            .subscribe_with_subscription(Arc::new(move |_, _| {
                tally.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create subscription in conditional block");

        // Inside the branch the observer is notified.
        prop.set(1)
            .expect("Failed to set property value in conditional block");
        assert_eq!(hits.load(Ordering::SeqCst), 1);

        // `_guard` is dropped when the branch ends.
    }

    // Outside the branch the observer must stay silent.
    prop.set(2)
        .expect("Failed to set property value after conditional block");
    assert_eq!(hits.load(Ordering::SeqCst), 1);
}
2417
/// A subscription guard held in a function's local scope must be released on
/// every exit path, whether the function returns early or runs to the end.
///
/// The invocation counter is owned by the test and shared with the helper, so
/// the test can check after the helper has returned that further changes no
/// longer reach the observer.
#[test]
fn test_subscription_with_early_return() {
    /// Registers a counting observer on `prop`, performs one or two `set`
    /// calls depending on `should_return_early`, and returns while the
    /// subscription guard is still a live local.
    fn test_function(
        prop: &ObservableProperty<i32>,
        should_return_early: bool,
        call_count: &Arc<AtomicUsize>,
    ) -> Result<(), PropertyError> {
        let count = Arc::clone(call_count);

        let _subscription = prop.subscribe_with_subscription(Arc::new(move |_, _| {
            count.fetch_add(1, Ordering::SeqCst);
        }))?;

        prop.set(1)?;
        assert_eq!(call_count.load(Ordering::SeqCst), 1);

        if should_return_early {
            return Ok(()); // Subscription should be cleaned up here
        }

        prop.set(2)?;
        assert_eq!(call_count.load(Ordering::SeqCst), 2);

        Ok(())
        // Subscription cleaned up when function exits normally
    }

    let prop = ObservableProperty::new(0);

    // Early-return path: exactly one notification happens inside the helper.
    let early_calls = Arc::new(AtomicUsize::new(0));
    test_function(&prop, true, &early_calls).expect("Failed to test early return");

    // Verify observer is no longer active after early return
    prop.set(10)
        .expect("Failed to set property value after early return");
    assert_eq!(
        early_calls.load(Ordering::SeqCst),
        1,
        "Observer should be inactive after early return"
    );

    // Normal-exit path: two notifications happen inside the helper.
    let normal_calls = Arc::new(AtomicUsize::new(0));
    test_function(&prop, false, &normal_calls).expect("Failed to test normal exit");
    // The observer from the early-return run must not have seen these changes.
    assert_eq!(
        early_calls.load(Ordering::SeqCst),
        1,
        "Observer from the early-return run should stay inactive"
    );

    // Verify observer is no longer active after normal exit
    prop.set(20)
        .expect("Failed to set property value after normal exit");
    assert_eq!(
        normal_calls.load(Ordering::SeqCst),
        2,
        "Observer should be inactive after normal exit"
    );
}
2459
/// Moving a subscription guard to another binding must keep its observer
/// active; only dropping the guard at its new location deactivates it.
#[test]
fn test_subscription_move_semantics() {
    let prop = ObservableProperty::new(0);
    let hits = Arc::new(AtomicUsize::new(0));

    let guard = {
        let tally = Arc::clone(&hits);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            tally.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for move semantics test")
    };

    // Active under the original binding.
    prop.set(1)
        .expect("Failed to set property value before move");
    assert_eq!(hits.load(Ordering::SeqCst), 1);

    // Transfer ownership of the guard; nothing is dropped by the move itself.
    let relocated = guard;

    // Still active under the new binding.
    prop.set(2).expect("Failed to set property value after move");
    assert_eq!(hits.load(Ordering::SeqCst), 2);

    drop(relocated);

    // Inactive once the relocated guard is gone.
    prop.set(3)
        .expect("Failed to set property value after moved subscription drop");
    assert_eq!(hits.load(Ordering::SeqCst), 2);
}
2490
/// A filtered, guard-based subscription must fire only for changes accepted by
/// its filter while the guard is alive, and not at all after the guard has gone
/// out of scope.
#[test]
fn test_filtered_subscription_automatic_cleanup() {
    let prop = ObservableProperty::new(0);
    let hits = Arc::new(AtomicUsize::new(0));

    {
        let tally = Arc::clone(&hits);
        let _guard = prop
            .subscribe_filtered_with_subscription(
                Arc::new(move |_, _| {
                    tally.fetch_add(1, Ordering::SeqCst);
                }),
                // Accept increases only.
                |old, new| new > old,
            )
            .expect("Failed to create filtered subscription");

        // (value to set, notifications expected afterwards):
        //   0 -> 5 fires, 5 -> 3 is filtered out, 3 -> 10 fires.
        for (value, expected_hits) in [(5, 1), (3, 1), (10, 2)] {
            prop.set(value).unwrap_or_else(|e| {
                panic!(
                    "Failed to set property value to {} in filtered test: {:?}",
                    value, e
                )
            });
            assert_eq!(hits.load(Ordering::SeqCst), expected_hits);
        }

        // `_guard` is dropped at the end of this block.
    }

    // 10 -> 20 is an increase, but the observer is no longer registered.
    prop.set(20)
        .expect("Failed to set property value after filtered subscription cleanup");
    assert_eq!(hits.load(Ordering::SeqCst), 2);
}
2526
/// Three filtered subscriptions with different predicates (increase, decrease,
/// absolute change greater than 5) must each fire only for the changes their
/// own predicate accepts.
#[test]
fn test_multiple_filtered_subscriptions() {
    let prop = ObservableProperty::new(10);
    let increase_hits = Arc::new(AtomicUsize::new(0));
    let decrease_hits = Arc::new(AtomicUsize::new(0));
    let significant_hits = Arc::new(AtomicUsize::new(0));

    let inc_tally = Arc::clone(&increase_hits);
    let _increase_guard = prop
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                inc_tally.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .expect("Failed to create increase subscription");

    let dec_tally = Arc::clone(&decrease_hits);
    let _decrease_guard = prop
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                dec_tally.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new < old,
        )
        .expect("Failed to create decrease subscription");

    let sig_tally = Arc::clone(&significant_hits);
    let _significant_guard = prop
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                sig_tally.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| ((*new as i32) - (*old as i32)).abs() > 5,
        )
        .expect("Failed to create significant change subscription");

    // (value to set, expected increase / decrease / significant counts afterwards)
    let steps = [
        (15, 1, 0, 0), // +5:  increase only (5 is not > 5)
        (25, 2, 0, 1), // +10: increase and significant
        (5, 2, 1, 2),  // -20: decrease and significant
        (3, 2, 2, 2),  // -2:  decrease only
    ];

    for (value, increases, decreases, significant) in steps {
        prop.set(value).unwrap_or_else(|e| {
            panic!(
                "Failed to set property to {} in multiple filtered test: {:?}",
                value, e
            )
        });
        assert_eq!(increase_hits.load(Ordering::SeqCst), increases);
        assert_eq!(decrease_hits.load(Ordering::SeqCst), decreases);
        assert_eq!(significant_hits.load(Ordering::SeqCst), significant);
    }

    // The three guards are dropped when the function returns.
}
2591
/// Exercises a two-part filter on an `f64` property: the observer fires only
/// when the value moves into a different integer bucket (by `floor`) AND the
/// absolute change exceeds 0.5.
#[test]
fn test_filtered_subscription_complex_filter() {
    let prop = ObservableProperty::new(0.0f64);
    let hits = Arc::new(AtomicUsize::new(0));
    let accepted = Arc::new(RwLock::new(Vec::new()));

    let tally = Arc::clone(&hits);
    let sink = Arc::clone(&accepted);
    let _guard = prop
        .subscribe_filtered_with_subscription(
            Arc::new(move |old, new| {
                tally.fetch_add(1, Ordering::SeqCst);
                if let Ok(mut pairs) = sink.write() {
                    pairs.push((*old, *new));
                }
            }),
            |old, new| {
                let crossed_boundary = (old.floor() as i32) != (new.floor() as i32);
                let large_enough = (new - old).abs() > 0.5_f64;
                crossed_boundary && large_enough
            },
        )
        .expect("Failed to create complex filtered subscription");

    // (value to set, notifications expected afterwards)
    let steps = [
        (0.3, 0), // 0.0 -> 0.3: same bucket
        (0.7, 0), // 0.3 -> 0.7: same bucket
        (1.3, 1), // 0.7 -> 1.3: new bucket, change 0.6
        (1.9, 1), // 1.3 -> 1.9: same bucket
        (2.1, 1), // 1.9 -> 2.1: new bucket, but change is only 0.2
        (3.5, 2), // 2.1 -> 3.5: new bucket, change 1.4
    ];
    for (value, expected_hits) in steps {
        prop.set(value).unwrap_or_else(|e| {
            panic!(
                "Failed to set property to {} in complex filter test: {:?}",
                value, e
            )
        });
        assert_eq!(hits.load(Ordering::SeqCst), expected_hits);
    }

    // Exactly the two accepted transitions were delivered, in order.
    let pairs = accepted
        .read()
        .expect("Failed to read values in complex filter test");
    assert_eq!(pairs.len(), 2);
    assert_eq!(pairs[0], (0.7, 1.3));
    assert_eq!(pairs[1], (2.1, 3.5));
}
2642
#[test]
fn test_filtered_subscription_error_handling() {
    // Checks that a filtered RAII subscription can still be created after the
    // property's inner lock has been poisoned by a panicking writer.
    let shared = Arc::new(ObservableProperty::new(0));

    // Poison the lock: a helper thread takes the write guard and panics while holding it.
    let for_poisoner = Arc::clone(&shared);
    let _ = thread::spawn(move || {
        let _guard = for_poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for filtered subscription poison test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    // With graceful degradation the subscription request must still be accepted.
    let outcome = shared.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
    assert!(
        outcome.is_ok(),
        "subscribe_filtered_with_subscription should succeed with graceful degradation"
    );
}
2659
#[test]
fn test_filtered_subscription_vs_manual_filtered() {
    // Compares a manually managed filtered observer (id + unsubscribe) with a
    // guard-based filtered subscription using the same "value increased" filter.
    let prop = ObservableProperty::new(0);
    let auto_count = Arc::new(AtomicUsize::new(0));
    let manual_count = Arc::new(AtomicUsize::new(0));

    // Manually managed filtered observer
    let manual_id = {
        let hits = Arc::clone(&manual_count);
        prop.subscribe_filtered(
            Arc::new(move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .expect("Failed to create manual filtered subscription")
    };

    // Guard-based filtered observer
    let _auto_subscription = {
        let hits = Arc::clone(&auto_count);
        prop.subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }),
            |old, new| new > old,
        )
        .expect("Failed to create automatic filtered subscription")
    };

    // 0 -> 5 is an increase: both observers fire
    prop.set(5).expect("Failed to set property to 5 in filtered vs manual test");
    assert_eq!(manual_count.load(Ordering::SeqCst), 1);
    assert_eq!(auto_count.load(Ordering::SeqCst), 1);

    // 5 -> 3 is a decrease: neither observer fires
    prop.set(3).expect("Failed to set property to 3 in filtered vs manual test");
    assert_eq!(manual_count.load(Ordering::SeqCst), 1);
    assert_eq!(auto_count.load(Ordering::SeqCst), 1);

    // 3 -> 10 is an increase again: both fire
    prop.set(10).expect("Failed to set property to 10 in filtered vs manual test");
    assert_eq!(manual_count.load(Ordering::SeqCst), 2);
    assert_eq!(auto_count.load(Ordering::SeqCst), 2);

    // Remove the manual observer explicitly
    prop.unsubscribe(manual_id).expect("Failed to unsubscribe manual filtered observer");

    // From here on only the guard-based observer is registered
    prop.set(15).expect("Failed to set property to 15 after manual cleanup");
    assert_eq!(manual_count.load(Ordering::SeqCst), 2); // unchanged
    assert_eq!(auto_count.load(Ordering::SeqCst), 3);

    // `_auto_subscription` is released when it goes out of scope here
}
2713
#[test]
fn test_filtered_subscription_with_panicking_filter() {
    // Exercises a filter closure that panics for one specific value (42) and
    // checks that the property keeps accepting updates afterwards.
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));

    let count = call_count.clone();
    let _subscription = prop
        .subscribe_filtered_with_subscription(
            Arc::new(move |_, _| {
                count.fetch_add(1, Ordering::SeqCst);
            }),
            |_, new| {
                if *new == 42 {
                    panic!("Filter panic on 42");
                }
                true // Accept all other values
            },
        )
        .expect("Failed to create panicking filter subscription");

    // Normal value should work
    prop.set(1).expect("Failed to set property to 1 in panicking filter test");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);

    // Value that causes the filter to panic should be handled gracefully.
    // Whether the observer itself is invoked for this update depends on how
    // the filter panic is handled, so no notification count is asserted here.
    prop.set(42).expect("Failed to set property to 42 in panicking filter test");

    // The property must keep accepting updates after the filter panicked
    prop.set(2).expect("Failed to set property to 2 after filter panic");

    // The notification count after the panic is implementation-dependent, but
    // the stored value is not: both `set` calls returned Ok, so the property
    // must now hold the last value written.
    assert_eq!(
        prop.get().expect("Failed to get property value after filter panic"),
        2
    );
}
2748
#[test]
fn test_subscription_thread_safety() {
    // Stress test: several threads concurrently create guard-based
    // subscriptions, set the property, and drop some subscriptions early.
    let prop = Arc::new(ObservableProperty::new(0));
    let total_calls = Arc::new(AtomicUsize::new(0));
    let operations_per_thread = 50;
    let num_threads = 8;

    let mut workers = Vec::new();
    for thread_id in 0..num_threads {
        let shared_prop = Arc::clone(&prop);
        let shared_calls = Arc::clone(&total_calls);

        workers.push(thread::spawn(move || {
            let mut held = Vec::new();

            for i in 0..operations_per_thread {
                let calls = Arc::clone(&shared_calls);
                let observer = Arc::new(move |old, new| {
                    calls.fetch_add(1, Ordering::SeqCst);
                    // Simulate some work
                    let _ = thread_id + i + old + new;
                });
                held.push(
                    shared_prop
                        .subscribe_with_subscription(observer)
                        .expect("Should be able to create subscription"),
                );

                // Trigger observers
                shared_prop
                    .set(thread_id * 1000 + i)
                    .expect("Should be able to set value");

                // Every fifth iteration, release the oldest subscription still held
                if i % 5 == 0 && !held.is_empty() {
                    drop(held.remove(0));
                }
            }

            // Remaining subscriptions are released when `held` goes out of scope
        }));
    }

    // Wait for all threads to complete
    for worker in workers {
        worker.join().expect("Thread should complete successfully");
    }

    // Property should still be functional after all the concurrent operations
    prop.set(9999).expect("Property should still work");

    // The exact call count is timing-dependent; reaching this point without a
    // panic is the actual success criterion.
    println!("Total observer calls: {}", total_calls.load(Ordering::SeqCst));
}
2807
#[test]
fn test_subscription_cross_thread_drop() {
    // A subscription created on the main thread is moved to another thread
    // and dropped there; the observer must stop firing afterwards.
    let prop = Arc::new(ObservableProperty::new(0));
    let call_count = Arc::new(AtomicUsize::new(0));

    // Register the observer on the main thread
    let subscription = {
        let hits = Arc::clone(&call_count);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            hits.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for cross-thread drop test")
    };

    // The observer fires for a change made on the main thread
    prop.set(1).expect("Failed to set property value in cross-thread drop test");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);

    // Hand the subscription over to a second thread and release it there
    let remote_prop = Arc::clone(&prop);
    let remote_count = Arc::clone(&call_count);
    thread::spawn(move || {
        // Still active while the guard is alive in this thread
        remote_prop.set(2).expect("Failed to set property value in other thread");
        assert_eq!(remote_count.load(Ordering::SeqCst), 2);

        // Release the guard from this thread
        drop(subscription);

        // No further notifications once the guard is gone
        remote_prop.set(3).expect("Failed to set property value after drop in other thread");
        assert_eq!(remote_count.load(Ordering::SeqCst), 2); // unchanged
    })
    .join()
    .expect("Failed to join cross-thread drop test thread");

    // The observer stays inactive when observed from the main thread as well
    prop.set(4).expect("Failed to set property value after thread join");
    assert_eq!(call_count.load(Ordering::SeqCst), 2);
}
2848
#[test]
fn test_concurrent_subscription_creation_and_property_changes() {
    // Runs subscriber threads (create/drop short-lived subscriptions) alongside
    // setter threads (keep changing the value) and checks nothing breaks.
    let prop = Arc::new(ObservableProperty::new(0));
    let total_notifications = Arc::new(AtomicUsize::new(0));
    let operations_per_thread = 25;
    let num_setter_threads = 2;
    let num_subscriber_threads = 4;

    // Subscriber handles are pushed first, then setter handles, so the join
    // order below is subscribers followed by setters.
    let mut workers = Vec::new();

    // Threads that create and destroy subscriptions
    for _ in 0..num_subscriber_threads {
        let shared_prop = Arc::clone(&prop);
        let shared_total = Arc::clone(&total_notifications);

        workers.push(thread::spawn(move || {
            for _ in 0..operations_per_thread {
                let notifications = Arc::clone(&shared_total);
                let _subscription = shared_prop
                    .subscribe_with_subscription(Arc::new(move |_, _| {
                        notifications.fetch_add(1, Ordering::SeqCst);
                    }))
                    .expect("Should create subscription");

                // Hold the subscription briefly; it is released at the end of
                // this loop iteration.
                thread::sleep(Duration::from_millis(1));
            }
        }));
    }

    // Threads that continuously change the property value
    for thread_id in 0..num_setter_threads {
        let shared_prop = Arc::clone(&prop);

        workers.push(thread::spawn(move || {
            for i in 0..operations_per_thread * 2 {
                shared_prop
                    .set(thread_id * 10000 + i)
                    .expect("Should set value");
                thread::sleep(Duration::from_millis(1));
            }
        }));
    }

    // Wait for all threads to complete
    for worker in workers {
        worker.join().expect("Thread should complete");
    }

    // System should be stable after concurrent operations
    prop.set(99999).expect("Property should still work");

    println!(
        "Total notifications during concurrent test: {}",
        total_notifications.load(Ordering::SeqCst)
    );
}
2913
#[test]
fn test_filtered_subscription_thread_safety() {
    // Each worker thread registers an "increase" and a "decrease" filtered
    // subscription, drives some updates, and lets both guards drop. After all
    // workers finish, no observer may remain registered.
    let prop = Arc::new(ObservableProperty::new(0));
    let increase_notifications = Arc::new(AtomicUsize::new(0));
    let decrease_notifications = Arc::new(AtomicUsize::new(0));
    let num_threads = 6;

    let mut workers = Vec::new();
    for thread_id in 0..num_threads {
        let shared_prop = Arc::clone(&prop);
        let inc_count = Arc::clone(&increase_notifications);
        let dec_count = Arc::clone(&decrease_notifications);

        workers.push(thread::spawn(move || {
            // Fires only when the value goes up
            let _inc_subscription = shared_prop
                .subscribe_filtered_with_subscription(
                    Arc::new(move |_, _| {
                        inc_count.fetch_add(1, Ordering::SeqCst);
                    }),
                    |old, new| new > old,
                )
                .expect("Should create filtered subscription");

            // Fires only when the value goes down
            let _dec_subscription = shared_prop
                .subscribe_filtered_with_subscription(
                    Arc::new(move |_, _| {
                        dec_count.fetch_add(1, Ordering::SeqCst);
                    }),
                    |old, new| new < old,
                )
                .expect("Should create filtered subscription");

            // `i % 10` wraps around, producing both increases and decreases
            let base_value = thread_id * 100;
            for i in 0..20 {
                shared_prop
                    .set(base_value + (i % 10))
                    .expect("Should set value");
                thread::sleep(Duration::from_millis(1));
            }

            // Both guards are released when this closure returns
        }));
    }

    // Wait for all threads
    for worker in workers {
        worker.join().expect("Thread should complete");
    }

    // Snapshot the counters, then verify further updates notify nobody
    let initial_inc = increase_notifications.load(Ordering::SeqCst);
    let initial_dec = decrease_notifications.load(Ordering::SeqCst);

    prop.set(1000).expect("Property should still work");
    prop.set(2000).expect("Property should still work");

    // All subscriptions were cleaned up, so the counters must not move
    assert_eq!(increase_notifications.load(Ordering::SeqCst), initial_inc);
    assert_eq!(decrease_notifications.load(Ordering::SeqCst), initial_dec);

    println!(
        "Increase notifications: {}, Decrease notifications: {}",
        initial_inc, initial_dec
    );
}
2984
#[test]
fn test_subscription_with_async_property_changes() {
    // Compares `set` (observers run before it returns) with `set_async`
    // (observers run in the background) using one slow and one fast observer,
    // then checks that every update reached both observers.
    let prop = Arc::new(ObservableProperty::new(0));
    let sync_notifications = Arc::new(AtomicUsize::new(0));
    let async_notifications = Arc::new(AtomicUsize::new(0));

    // Slow observer: sleeps 5 ms per notification
    let sync_count = sync_notifications.clone();
    let _sync_subscription = prop
        .subscribe_with_subscription(Arc::new(move |old, new| {
            sync_count.fetch_add(1, Ordering::SeqCst);
            // Simulate slow observer work
            thread::sleep(Duration::from_millis(5));
            println!("Sync observer: {} -> {}", old, new);
        }))
        .expect("Failed to create sync subscription");

    // Fast observer
    let async_count = async_notifications.clone();
    let _async_subscription = prop
        .subscribe_with_subscription(Arc::new(move |old, new| {
            async_count.fetch_add(1, Ordering::SeqCst);
            println!("Async observer: {} -> {}", old, new);
        }))
        .expect("Failed to create async subscription");

    // Time two synchronous updates
    let start = std::time::Instant::now();
    prop.set(1).expect("Failed to set property value 1 in async test");
    prop.set(2).expect("Failed to set property value 2 in async test");
    let sync_duration = start.elapsed();

    // Time two asynchronous updates
    let start = std::time::Instant::now();
    prop.set_async(3).expect("Failed to set property value 3 async");
    prop.set_async(4).expect("Failed to set property value 4 async");
    let async_duration = start.elapsed();

    // Async should be much faster
    assert!(
        async_duration < sync_duration,
        "set_async should return faster than set (async: {:?}, sync: {:?})",
        async_duration,
        sync_duration
    );

    // Wait for the background notifications with a bounded poll rather than a
    // fixed sleep, so a loaded machine does not cause a spurious failure.
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while (sync_notifications.load(Ordering::SeqCst) < 4
        || async_notifications.load(Ordering::SeqCst) < 4)
        && std::time::Instant::now() < deadline
    {
        thread::sleep(Duration::from_millis(5));
    }

    // All observers should have been called: two sync + two async updates each
    assert_eq!(sync_notifications.load(Ordering::SeqCst), 4);
    assert_eq!(async_notifications.load(Ordering::SeqCst), 4);

    // Subscriptions auto-cleanup when going out of scope
}
3035
#[test]
fn test_subscription_creation_with_poisoned_lock() {
    // After the inner lock is poisoned, creating plain and filtered
    // subscriptions must still succeed, and dropping a pre-existing
    // subscription must not panic.
    let prop = Arc::new(ObservableProperty::new(0));

    // A subscription that exists before the lock gets poisoned
    let call_count = Arc::new(AtomicUsize::new(0));
    let existing_subscription = {
        let hits = Arc::clone(&call_count);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            hits.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription before poisoning")
    };

    // Poison the lock by panicking while holding the write guard
    let for_poisoner = Arc::clone(&prop);
    let _ = thread::spawn(move || {
        let _guard = for_poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for subscription poison test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    // Plain subscription creation should succeed despite the poisoned lock
    let result = prop.subscribe_with_subscription(Arc::new(|_, _| {}));
    assert!(
        result.is_ok(),
        "subscribe_with_subscription should succeed with graceful degradation"
    );

    // Filtered subscription creation should succeed as well
    let filtered_result =
        prop.subscribe_filtered_with_subscription(Arc::new(|_, _| {}), |_, _| true);
    assert!(
        filtered_result.is_ok(),
        "subscribe_filtered_with_subscription should succeed with graceful degradation"
    );

    // Releasing the pre-poison subscription must not panic
    drop(existing_subscription);
}
3069
#[test]
fn test_subscription_cleanup_behavior_with_poisoned_lock() {
    // Verifies that dropping a subscription guard does not panic when the
    // property's inner lock has been poisoned.
    let prop = Arc::new(ObservableProperty::new(0));
    let call_count = Arc::new(AtomicUsize::new(0));

    // Register a counting observer
    let subscription = {
        let hits = Arc::clone(&call_count);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            hits.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for cleanup behavior test")
    };

    // Sanity check: the observer fires before any poisoning
    prop.set(1).expect("Failed to set property value in cleanup behavior test");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);

    // Poison the lock from another thread
    let for_poisoner = Arc::clone(&prop);
    let _ = thread::spawn(move || {
        let _guard = for_poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for cleanup behavior poison test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    // Dropping the guard now must NOT panic; reaching the end of the test
    // without panicking is the success criterion.
    drop(subscription);
}
3102
#[test]
fn test_multiple_subscription_cleanup_with_poisoned_lock() {
    // Creates several subscriptions, confirms each one is notified, poisons
    // the inner lock, and then drops every subscription; no drop may panic.
    let prop = Arc::new(ObservableProperty::new(0));
    let mut subscriptions = Vec::new();

    // One counter shared by all observers so the notifications can actually
    // be verified below.
    let total_calls = Arc::new(AtomicUsize::new(0));

    // Create multiple subscriptions
    for i in 0..5 {
        let count = total_calls.clone();
        let subscription = prop
            .subscribe_with_subscription(Arc::new(move |old, new| {
                count.fetch_add(1, Ordering::SeqCst);
                println!("Observer {}: {} -> {}", i, old, new);
            }))
            .expect("Failed to create subscription in multiple cleanup test");
        subscriptions.push(subscription);
    }

    // Verify they all work: a single synchronous update must reach all five
    prop.set(42).expect("Failed to set property value in multiple cleanup test");
    assert_eq!(
        total_calls.load(Ordering::SeqCst),
        5,
        "every observer should be notified exactly once before poisoning"
    );

    // Poison the lock
    let prop_clone = prop.clone();
    let poison_thread = thread::spawn(move || {
        let _guard = prop_clone
            .inner
            .write()
            .expect("Failed to acquire write lock for multiple cleanup poison test");
        panic!("Deliberate panic to poison the lock");
    });
    let _ = poison_thread.join();

    // Drop all subscriptions - none should panic
    for subscription in subscriptions {
        drop(subscription);
    }

    // Test succeeds if no panics occurred
}
3139
#[test]
fn test_subscription_behavior_before_and_after_poison() {
    // Creates one subscription before and one after poisoning the inner lock;
    // the later creation must succeed and dropping the earlier one must not panic.
    let prop = Arc::new(ObservableProperty::new(0));
    let before_poison_count = Arc::new(AtomicUsize::new(0));
    let after_poison_count = Arc::new(AtomicUsize::new(0));

    // Subscription registered while the lock is still healthy
    let before_subscription = {
        let hits = Arc::clone(&before_poison_count);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            hits.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription before poison test")
    };

    // It receives notifications as usual
    prop.set(1).expect("Failed to set property value before poison test");
    assert_eq!(before_poison_count.load(Ordering::SeqCst), 1);

    // Poison the lock by panicking while holding the write guard
    let for_poisoner = Arc::clone(&prop);
    let _ = thread::spawn(move || {
        let _guard = for_poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for before/after poison test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    // Creating a subscription after poisoning should still succeed
    let hits_after = Arc::clone(&after_poison_count);
    let after_result = prop.subscribe_with_subscription(Arc::new(move |_, _| {
        hits_after.fetch_add(1, Ordering::SeqCst);
    }));
    assert!(
        after_result.is_ok(),
        "subscribe_with_subscription should succeed with graceful degradation"
    );

    let _after_subscription = after_result.unwrap();

    // Releasing the pre-poison subscription must not panic
    drop(before_subscription);
}
3178
#[test]
fn test_concurrent_subscription_drops_with_poison() {
    // Poisons the inner lock and then drops many subscriptions from separate
    // threads at slightly staggered times; every drop thread must finish
    // without panicking.
    let prop = Arc::new(ObservableProperty::new(0));
    let num_subscriptions = 10;

    // Create multiple subscriptions
    let mut subscriptions = Vec::new();
    for i in 0..num_subscriptions {
        let hits = Arc::new(AtomicUsize::new(0));
        subscriptions.push(
            prop.subscribe_with_subscription(Arc::new(move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
                println!("Observer {}", i);
            }))
            .expect("Failed to create subscription in concurrent drops test"),
        );
    }

    // Poison the lock
    let for_poisoner = Arc::clone(&prop);
    let _ = thread::spawn(move || {
        let _guard = for_poisoner
            .inner
            .write()
            .expect("Failed to acquire write lock for concurrent drops poison test");
        panic!("Deliberate panic to poison the lock");
    })
    .join();

    // Release each subscription on its own thread
    let mut droppers = Vec::new();
    for (i, subscription) in subscriptions.into_iter().enumerate() {
        droppers.push(thread::spawn(move || {
            // Stagger the drops by 0-4 ms depending on the index
            thread::sleep(Duration::from_millis(i as u64 % 5));
            drop(subscription);
            println!("Dropped subscription {}", i);
        }));
    }

    // Wait for all drops to complete; a panic in any drop thread fails here
    for dropper in droppers {
        dropper
            .join()
            .expect("Drop thread should complete without panic");
    }
}
3229
3230
#[test]
fn test_with_max_threads_creation() {
    // `with_max_threads` must store the initial value regardless of the value
    // type or the thread limit passed in.
    let int_prop = ObservableProperty::with_max_threads(42, 1);
    assert_eq!(int_prop.get().expect("Failed to get prop1 value"), 42);

    let string_prop = ObservableProperty::with_max_threads("test".to_string(), 8);
    assert_eq!(string_prop.get().expect("Failed to get prop2 value"), "test");

    let float_prop = ObservableProperty::with_max_threads(0.5_f64, 16);
    assert_eq!(float_prop.get().expect("Failed to get prop3 value"), 0.5);
}
3243
#[test]
fn test_with_max_threads_zero_defaults_to_max_threads() {
    // A thread limit of zero is meant to fall back to the default limit.
    // The limit itself is not inspected here; the test only checks that a
    // property built with 0 behaves like one built with `new`.
    let zero_limit = ObservableProperty::with_max_threads(100, 0);
    let default_limit = ObservableProperty::new(100); // Uses default MAX_THREADS

    let from_zero_limit = zero_limit.get().expect("Failed to get prop1 value");
    let from_default = default_limit.get().expect("Failed to get prop2 value");

    assert_eq!(from_zero_limit, 100);
    assert_eq!(from_default, 100);
}
3255
#[test]
fn test_with_max_threads_basic_functionality() {
    // Checks that a property built with `with_max_threads` notifies its
    // observer with the right (old, new) pair for both `set` and `set_async`.
    let prop = ObservableProperty::with_max_threads(0, 2);
    let call_count = Arc::new(AtomicUsize::new(0));

    // Assertions placed inside the observer are unreliable: the crate
    // documents that observer panics are isolated, and a fixed
    // `old == 0 && new == 42` check cannot hold for the second update.
    // Record every transition instead and verify it on the test thread.
    let transitions = Arc::new(RwLock::new(Vec::new()));

    // Subscribe an observer
    let count = call_count.clone();
    let log = transitions.clone();
    let _subscription = prop
        .subscribe_with_subscription(Arc::new(move |old, new| {
            // Record first so the log is complete once the counter is bumped
            if let Ok(mut entries) = log.write() {
                entries.push((*old, *new));
            }
            count.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for max_threads test");

    // Test synchronous set
    prop.set(42).expect("Failed to set value synchronously");
    assert_eq!(call_count.load(Ordering::SeqCst), 1);
    assert_eq!(
        *transitions.read().expect("Failed to read recorded transitions"),
        vec![(0, 42)]
    );

    // Test asynchronous set
    prop.set_async(43).expect("Failed to set value asynchronously");

    // Wait (bounded) for the background notification instead of relying on a
    // fixed sleep.
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while call_count.load(Ordering::SeqCst) < 2 && std::time::Instant::now() < deadline {
        thread::sleep(Duration::from_millis(5));
    }
    assert_eq!(call_count.load(Ordering::SeqCst), 2);
    assert_eq!(
        *transitions.read().expect("Failed to read recorded transitions"),
        vec![(0, 42), (42, 43)]
    );
}
3282
#[test]
fn test_with_max_threads_async_performance() {
    // With a thread limit of 1 and four slow observers, `set_async` must still
    // return quickly, and all four observers must eventually run.
    let prop = ObservableProperty::with_max_threads(0, 1); // Single thread
    let slow_call_count = Arc::new(AtomicUsize::new(0));

    // Add multiple slow observers (25 ms each)
    let mut subscriptions = Vec::new();
    for _ in 0..4 {
        let count = slow_call_count.clone();
        let subscription = prop
            .subscribe_with_subscription(Arc::new(move |_, _| {
                thread::sleep(Duration::from_millis(25)); // Simulate slow work
                count.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create slow observer subscription");
        subscriptions.push(subscription);
    }

    // Measure time for async notification
    let start = std::time::Instant::now();
    prop.set_async(42).expect("Failed to set value asynchronously");
    let async_duration = start.elapsed();

    // Should return quickly even with slow observers
    assert!(async_duration.as_millis() < 50, "Async set should return quickly");

    // Wait for all observers to complete. A bounded poll replaces the fixed
    // sleep: it finishes as soon as the work is done and tolerates a loaded
    // machine where the observers take longer than expected.
    let deadline = std::time::Instant::now() + Duration::from_secs(2);
    while slow_call_count.load(Ordering::SeqCst) < 4 && std::time::Instant::now() < deadline {
        thread::sleep(Duration::from_millis(5));
    }
    assert_eq!(slow_call_count.load(Ordering::SeqCst), 4);
}
3314
#[test]
fn test_with_max_threads_vs_regular_constructor() {
    // A property built with `new` and one built with `with_max_threads(_, 4)`
    // should notify observers identically for sync and async updates.
    let prop_regular = ObservableProperty::new(42);
    let prop_custom = ObservableProperty::with_max_threads(42, 4); // Same as default

    let count_regular = Arc::new(AtomicUsize::new(0));
    let count_custom = Arc::new(AtomicUsize::new(0));

    // One counting observer per property
    let _sub1 = {
        let hits = Arc::clone(&count_regular);
        prop_regular
            .subscribe_with_subscription(Arc::new(move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create regular subscription")
    };
    let _sub2 = {
        let hits = Arc::clone(&count_custom);
        prop_custom
            .subscribe_with_subscription(Arc::new(move |_, _| {
                hits.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to create custom subscription")
    };

    // Synchronous update on each property
    prop_regular.set(100).expect("Failed to set regular property");
    prop_custom.set(100).expect("Failed to set custom property");

    assert_eq!(count_regular.load(Ordering::SeqCst), 1);
    assert_eq!(count_custom.load(Ordering::SeqCst), 1);

    // Asynchronous update on each property
    prop_regular.set_async(200).expect("Failed to set regular property async");
    prop_custom.set_async(200).expect("Failed to set custom property async");

    // Give the background notifications time to run
    thread::sleep(Duration::from_millis(50));
    assert_eq!(count_regular.load(Ordering::SeqCst), 2);
    assert_eq!(count_custom.load(Ordering::SeqCst), 2);
}
3353
#[test]
fn test_with_max_threads_large_values() {
    // A thread limit far larger than the number of observers must not break
    // asynchronous notification.
    let prop = ObservableProperty::with_max_threads(0, 1000);
    let call_count = Arc::new(AtomicUsize::new(0));

    // Register a single counting observer
    let _subscription = {
        let hits = Arc::clone(&call_count);
        prop.subscribe_with_subscription(Arc::new(move |_, _| {
            hits.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for large max_threads test")
    };

    // Should work normally even with excessive thread limit
    prop.set_async(42).expect("Failed to set value with large max_threads");

    // Give the background notification time to run
    thread::sleep(Duration::from_millis(50));
    assert_eq!(call_count.load(Ordering::SeqCst), 1);
}
3374
#[test]
fn test_with_max_threads_clone_behavior() {
    // A clone of a property is expected to observe the same updates as the
    // original: an update through either handle must notify observers
    // registered through both.
    let prop1 = ObservableProperty::with_max_threads(42, 2);
    let prop2 = prop1.clone();

    let call_count1 = Arc::new(AtomicUsize::new(0));
    let call_count2 = Arc::new(AtomicUsize::new(0));

    // Subscribe to both properties. The failure messages name the handle the
    // subscription was actually made on (prop1 = original, prop2 = clone).
    let count1 = call_count1.clone();
    let _sub1 = prop1
        .subscribe_with_subscription(Arc::new(move |_, _| {
            count1.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for original property test");

    let count2 = call_count2.clone();
    let _sub2 = prop2
        .subscribe_with_subscription(Arc::new(move |_, _| {
            count2.fetch_add(1, Ordering::SeqCst);
        }))
        .expect("Failed to create subscription for cloned property test");

    // Changes through either property should trigger both observers
    prop1.set_async(100).expect("Failed to set value through prop1");
    thread::sleep(Duration::from_millis(50));

    assert_eq!(call_count1.load(Ordering::SeqCst), 1);
    assert_eq!(call_count2.load(Ordering::SeqCst), 1);

    prop2.set_async(200).expect("Failed to set value through prop2");
    thread::sleep(Duration::from_millis(50));

    assert_eq!(call_count1.load(Ordering::SeqCst), 2);
    assert_eq!(call_count2.load(Ordering::SeqCst), 2);
}
3411
#[test]
fn test_with_max_threads_thread_safety() {
    // Several threads each subscribe to a shared property (thread limit 3)
    // and trigger an asynchronous update; at least one notification must land.
    let prop = Arc::new(ObservableProperty::with_max_threads(0, 3));
    let call_count = Arc::new(AtomicUsize::new(0));

    // Add observers from multiple threads
    let mut workers = Vec::new();
    for thread_id in 0..5 {
        let shared_prop = Arc::clone(&prop);
        let hits = Arc::clone(&call_count);

        workers.push(thread::spawn(move || {
            let _subscription = shared_prop
                .subscribe_with_subscription(Arc::new(move |_, _| {
                    hits.fetch_add(1, Ordering::SeqCst);
                }))
                .expect("Failed to create thread-safe subscription");

            // Trigger async notifications from this thread
            shared_prop
                .set_async(thread_id * 10)
                .expect("Failed to set value from thread");

            thread::sleep(Duration::from_millis(10));
        }));
    }

    // Wait for all threads to complete
    for worker in workers {
        worker.join().expect("Thread should complete successfully");
    }

    // Wait for all async operations to complete
    thread::sleep(Duration::from_millis(100));

    // How many observers each update reaches depends on timing, so only a
    // lower bound is checked.
    assert!(call_count.load(Ordering::SeqCst) > 0);
}
3453
#[test]
fn test_with_max_threads_error_handling() {
    // With an observer attached, `set` and `set_async` on a custom-limit
    // property must both return Ok and leave the last value readable.
    let prop = ObservableProperty::with_max_threads(42, 2);

    // A no-op observer is enough here
    let noop_observer = Arc::new(|_, _| {
        // Normal observer
    });
    let _subscription = prop
        .subscribe_with_subscription(noop_observer)
        .expect("Failed to create subscription for error handling test");

    // Both update paths should report success
    assert!(prop.set(100).is_ok());
    assert!(prop.set_async(200).is_ok());

    let current = prop.get().expect("Failed to get value after error test");
    assert_eq!(current, 200);
}
3470
#[test]
fn test_observer_limit_enforcement() {
    // Registers 100 observers and one more; all must be accepted. The real
    // limit (10,000 according to the crate docs) is deliberately not reached
    // to keep the test fast.
    let prop = ObservableProperty::new(0);

    let observer_ids: Vec<_> = (0..100)
        .map(|i| {
            let result = prop.subscribe(Arc::new(move |_, _| {
                let _ = i; // Use the capture to make each observer unique
            }));
            assert!(result.is_ok(), "Should be able to add observer {}", i);
            result.unwrap()
        })
        .collect();

    // Every registration produced an id
    assert_eq!(observer_ids.len(), 100);

    // Still well under the limit, so one more must be accepted
    let result = prop.subscribe(Arc::new(|_, _| {}));
    assert!(result.is_ok());
}
3494
#[test]
fn test_observer_limit_error_message() {
    // Reaching the real observer limit (10,000 according to the crate docs)
    // would make this test too slow, so it only confirms that a small number
    // of registrations is accepted.
    let prop = ObservableProperty::new(0);

    let mut registered = 0;
    while registered < 10 {
        assert!(prop.subscribe(Arc::new(|_, _| {})).is_ok());
        registered += 1;
    }

    // The rejection path for registrations beyond the limit is not exercised here.
}
3511
#[test]
fn test_observer_limit_with_unsubscribe() {
    // Registers 50 observers, removes 25 of them, and then checks that 30
    // further registrations are accepted.
    let prop = ObservableProperty::new(0);

    // Add observers
    let ids: Vec<_> = (0..50)
        .map(|_| prop.subscribe(Arc::new(|_, _| {})).expect("Failed to subscribe"))
        .collect();

    // Remove the first half; each removal must report that an observer was found
    for id in ids.iter().take(25) {
        assert!(prop.unsubscribe(*id).expect("Failed to unsubscribe"));
    }

    // New registrations must be accepted after unsubscribing
    let mut added = 0;
    while added < 30 {
        assert!(prop.subscribe(Arc::new(|_, _| {})).is_ok());
        added += 1;
    }
}
3532
#[test]
fn test_observer_limit_with_raii_subscriptions() {
    // Creates 50 guard-based subscriptions, drops half of the guards, and
    // then checks that 30 more guard-based subscriptions can be created.
    let prop = ObservableProperty::new(0);

    // Hold the guards in a vector so they stay alive until explicitly dropped.
    let mut subscriptions: Vec<_> = (0..50)
        .map(|_| {
            prop.subscribe_with_subscription(Arc::new(|_, _| {}))
                .expect("Failed to create subscription")
        })
        .collect();

    // Drop the last 25 guards; the first 25 remain alive in `subscriptions`.
    drop(subscriptions.split_off(25));

    // Each new guard is dropped again right away, at the end of the closure.
    (0..30).for_each(|_| {
        let _sub = prop
            .subscribe_with_subscription(Arc::new(|_, _| {}))
            .expect("Failed to create subscription after RAII cleanup");
    });
}
3555
#[test]
fn test_filtered_subscription_respects_observer_limit() {
    // Alternates between plain and filtered subscriptions (50 in total) and
    // then adds one more filtered observer. Every registration is expected to
    // succeed because the total stays far below the documented 10,000 limit.
    let prop = ObservableProperty::new(0);

    for i in 0..50 {
        match i % 2 {
            // Even iterations: plain observer.
            0 => assert!(prop.subscribe(Arc::new(|_, _| {})).is_ok()),
            // Odd iterations: filtered observer whose filter always passes.
            _ => assert!(prop.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true).is_ok()),
        }
    }

    // One additional filtered observer on top of the mixed set.
    assert!(prop.subscribe_filtered(Arc::new(|_, _| {}), |_, _| true).is_ok());
}
3573
#[test]
fn test_observer_limit_concurrent_subscriptions() {
    // Ten threads each attempt ten subscriptions on a shared property; the
    // test expects all 100 attempts to succeed.
    let prop = Arc::new(ObservableProperty::new(0));
    let success_count = Arc::new(AtomicUsize::new(0));

    let mut workers = Vec::with_capacity(10);
    for _ in 0..10 {
        let shared_prop = Arc::clone(&prop);
        let tally = Arc::clone(&success_count);

        workers.push(thread::spawn(move || {
            for _ in 0..10 {
                let accepted = shared_prop.subscribe(Arc::new(|_, _| {})).is_ok();
                if accepted {
                    tally.fetch_add(1, Ordering::SeqCst);
                }
                // Brief pause between attempts so the threads interleave.
                thread::sleep(Duration::from_micros(10));
            }
        }));
    }

    // Wait for every worker before reading the tally.
    workers
        .into_iter()
        .for_each(|worker| worker.join().expect("Thread should complete"));

    // 10 threads x 10 subscriptions, all well under the limit.
    assert_eq!(success_count.load(Ordering::SeqCst), 100);
}
3603
#[test]
fn test_notify_observers_batch_releases_lock_early() {
    use std::sync::atomic::AtomicBool;
    use std::time::Instant;

    // Checks that `notify_observers_batch` does not keep the property locked
    // while observers are running.
    //
    // The observer blocks until the main thread sets `release`, and the main
    // thread sets `release` only after `subscribe` has returned. If the lock
    // were held during observer execution, `subscribe` could not return until
    // the observer stopped waiting, so the observer would hit its deadline and
    // record that in `gave_up`, which fails the test. Simply asserting that
    // `subscribe` returned `Ok` would not detect this, because a blocked call
    // also returns `Ok` once the batch has finished.
    //
    // Every wait is bounded so that a regression fails the test instead of
    // hanging it.
    let wait_limit = Duration::from_secs(2);

    let prop = Arc::new(ObservableProperty::new(0));
    let call_count = Arc::new(AtomicUsize::new(0));
    let started = Arc::new(AtomicBool::new(false));
    let release = Arc::new(AtomicBool::new(false));
    let gave_up = Arc::new(AtomicBool::new(false));

    // Subscribe with an observer that blocks until released (or timed out).
    let started_clone = started.clone();
    let release_clone = release.clone();
    let gave_up_clone = gave_up.clone();
    let count_clone = call_count.clone();
    prop.subscribe(Arc::new(move |_, _| {
        started_clone.store(true, Ordering::SeqCst);
        count_clone.fetch_add(1, Ordering::SeqCst);

        let deadline = Instant::now() + wait_limit;
        while !release_clone.load(Ordering::SeqCst) {
            // Once one invocation has timed out, later invocations bail out
            // immediately so a failing run does not wait repeatedly.
            if gave_up_clone.load(Ordering::SeqCst) || Instant::now() >= deadline {
                gave_up_clone.store(true, Ordering::SeqCst);
                return;
            }
            thread::sleep(Duration::from_millis(1));
        }
    })).expect("Failed to subscribe");

    // Start batch notification in background
    let prop_clone = prop.clone();
    let batch_handle = thread::spawn(move || {
        prop_clone.notify_observers_batch(vec![(0, 1), (1, 2)]).expect("Failed to notify batch");
    });

    // Wait (bounded) until the observer is executing.
    let start_deadline = Instant::now() + wait_limit;
    while !started.load(Ordering::SeqCst) {
        assert!(
            Instant::now() < start_deadline,
            "Observer was never invoked by the batch notification"
        );
        thread::sleep(Duration::from_millis(1));
    }

    // Subscribe while the observer is still blocked inside its callback.
    let subscribe_result = prop.subscribe(Arc::new(|_, _| {
        // New observer
    }));

    // Only now let the blocked observer continue.
    release.store(true, Ordering::SeqCst);

    assert!(subscribe_result.is_ok(), "Should be able to subscribe while batch notification is in progress");

    // Wait for batch to complete
    batch_handle.join().expect("Batch thread should complete");

    // If the observer timed out, `subscribe` did not return while it was running.
    assert!(
        !gave_up.load(Ordering::SeqCst),
        "subscribe() did not return while the observer was running; the lock appears to be held during observer execution"
    );

    // Verify the blocking observer was called once per change in the batch.
    assert_eq!(call_count.load(Ordering::SeqCst), 2);
}
3647
#[test]
fn test_notify_observers_batch_panic_isolation() {
    // One observer panics on every call; a second observer counts its calls.
    // The batch notification is expected to return `Ok` and the counting
    // observer is expected to see all three changes.
    let prop = ObservableProperty::new(0);
    let good_observer_count = Arc::new(AtomicUsize::new(0));

    // Registered first: always panics.
    prop.subscribe(Arc::new(|_, _| {
        panic!("Deliberate panic in batch observer");
    })).expect("Failed to subscribe panicking observer");

    // Registered second: counts invocations.
    let survivor_hits = Arc::clone(&good_observer_count);
    prop.subscribe(Arc::new(move |_, _| {
        survivor_hits.fetch_add(1, Ordering::SeqCst);
    })).expect("Failed to subscribe good observer");

    // The panic in the first observer must not surface as an error here.
    let changes = vec![(0, 1), (1, 2), (2, 3)];
    let result = prop.notify_observers_batch(changes);
    assert!(result.is_ok());

    // One call per change for the well-behaved observer.
    assert_eq!(good_observer_count.load(Ordering::SeqCst), 3);
}
3671
#[test]
fn test_notify_observers_batch_multiple_changes() {
    // Sends four (old, new) pairs in one batch and checks that the observer
    // recorded exactly those pairs, in the order they were submitted.
    let prop = ObservableProperty::new(0);
    let received_changes = Arc::new(RwLock::new(Vec::new()));

    // The observer appends every pair it sees to the shared log.
    let sink = Arc::clone(&received_changes);
    prop.subscribe(Arc::new(move |old, new| {
        if let Ok(mut log) = sink.write() {
            log.push((*old, *new));
        }
    })).expect("Failed to subscribe");

    // Send multiple changes
    let expected = [(0, 10), (10, 20), (20, 30), (30, 40)];
    prop.notify_observers_batch(expected.to_vec()).expect("Failed to notify batch");

    let changes = received_changes.read().expect("Failed to read changes");
    assert_eq!(changes.len(), 4);
    for (idx, pair) in expected.iter().enumerate() {
        assert_eq!(changes[idx], *pair);
    }
}
3699
#[test]
fn test_notify_observers_batch_empty() {
    // An empty batch is expected to return `Ok` and to leave the subscribed
    // observer uncalled.
    let prop = ObservableProperty::new(0);
    let call_count = Arc::new(AtomicUsize::new(0));

    let hits = Arc::clone(&call_count);
    prop.subscribe(Arc::new(move |_, _| {
        hits.fetch_add(1, Ordering::SeqCst);
    })).expect("Failed to subscribe");

    // No changes to deliver.
    prop.notify_observers_batch(Vec::new()).expect("Failed with empty batch");

    assert_eq!(call_count.load(Ordering::SeqCst), 0);
}
3715}