observable_property/lib.rs
1//! # Observable Property
2//!
3//! A thread-safe observable property implementation for Rust that allows you to
4//! observe changes to values across multiple threads.
5//!
6//! ## Features
7//!
8//! - **Thread-safe**: Uses `Arc<RwLock<>>` for safe concurrent access
9//! - **Observer pattern**: Subscribe to property changes with callbacks
10//! - **Filtered observers**: Only notify when specific conditions are met
11//! - **Async notifications**: Non-blocking observer notifications with background threads
12//! - **Panic isolation**: Observer panics don't crash the system
//! - **Type-safe**: Generic implementation works with any `Clone + Send + Sync + 'static` type
14//!
15//! ## Quick Start
16//!
17//! ```rust
18//! use observable_property::ObservableProperty;
19//! use std::sync::Arc;
20//!
21//! // Create an observable property
22//! let property = ObservableProperty::new(42);
23//!
24//! // Subscribe to changes
25//! let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
26//! println!("Value changed from {} to {}", old_value, new_value);
27//! })).unwrap();
28//!
29//! // Change the value (triggers observer)
30//! property.set(100).unwrap();
31//!
32//! // Unsubscribe when done
33//! property.unsubscribe(observer_id).unwrap();
34//! ```
35//!
36//! ## Multi-threading Example
37//!
38//! ```rust
39//! use observable_property::ObservableProperty;
40//! use std::sync::Arc;
41//! use std::thread;
42//!
43//! let property = Arc::new(ObservableProperty::new(0));
44//! let property_clone = property.clone();
45//!
46//! // Subscribe from one thread
47//! property.subscribe(Arc::new(|old, new| {
48//! println!("Value changed: {} -> {}", old, new);
49//! })).unwrap();
50//!
51//! // Modify from another thread
52//! thread::spawn(move || {
53//! property_clone.set(42).unwrap();
54//! }).join().unwrap();
55//! ```
56
57use std::collections::HashMap;
58use std::fmt;
59use std::panic;
60use std::sync::{Arc, RwLock};
61use std::thread;
62
/// Errors that can occur when working with `ObservableProperty`.
///
/// The type is comparable (`PartialEq`/`Eq`) so callers and tests can assert
/// on a specific failure with `assert_eq!` instead of pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PropertyError {
    /// Failed to acquire a read lock on the property.
    ///
    /// NOTE(review): not constructed by the code visible in this file;
    /// `get()` reports `PoisonedLock` instead.
    ReadLockError {
        /// Context describing what operation was being attempted
        context: String,
    },
    /// Failed to acquire a write lock on the property.
    ///
    /// Returned by `set()`, `set_async()`, `subscribe()` and `unsubscribe()`
    /// when the underlying lock is poisoned.
    WriteLockError {
        /// Context describing what operation was being attempted
        context: String,
    },
    /// Attempted to unsubscribe an observer that doesn't exist.
    ///
    /// NOTE(review): not constructed by the code visible in this file;
    /// `unsubscribe()` reports a missing observer as `Ok(false)`.
    ObserverNotFound {
        /// The ID of the observer that wasn't found
        id: usize,
    },
    /// The property's lock has been poisoned due to a panic in another thread.
    ///
    /// Returned by `get()`.
    PoisonedLock,
    /// An observer function encountered an error during execution.
    ///
    /// NOTE(review): not constructed by the code visible in this file.
    ObserverError {
        /// Description of what went wrong
        reason: String,
    },
}

impl fmt::Display for PropertyError {
    /// Writes a human-readable, single-line description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::ReadLockError { context } => {
                write!(f, "Failed to acquire read lock: {}", context)
            }
            Self::WriteLockError { context } => {
                write!(f, "Failed to acquire write lock: {}", context)
            }
            Self::ObserverNotFound { id } => {
                write!(f, "Observer with ID {} not found", id)
            }
            // No interpolation needed, so skip the formatting machinery.
            Self::PoisonedLock => f.write_str(
                "Property is in a poisoned state due to a panic in another thread",
            ),
            Self::ObserverError { reason } => {
                write!(f, "Observer execution failed: {}", reason)
            }
        }
    }
}

impl std::error::Error for PropertyError {}
116
/// Function type for observers that get called when property values change.
///
/// The callback receives `(old_value, new_value)` by reference, in that order.
/// It is reference-counted (`Arc`) so a snapshot of the registered observers
/// can be taken cheaply and invoked after the property's lock is released, and
/// it must be `Send + Sync` because `set_async()` runs observers on spawned
/// threads.
pub type Observer<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;
119
/// Unique identifier for registered observers.
///
/// Returned by `subscribe()` / `subscribe_filtered()` and accepted by
/// `unsubscribe()`. IDs are handed out from a per-property counter that starts
/// at `0` and increases by one for every subscription.
pub type ObserverId = usize;
122
/// A thread-safe observable property that notifies observers when its value changes
///
/// This type wraps a value of type `T` and allows multiple observers to be notified
/// whenever the value is modified. All operations are thread-safe and can be called
/// from multiple threads concurrently.
///
/// The struct is a cheap handle: cloning it produces another handle to the same
/// underlying value and observer set.
///
/// # Type Requirements
///
/// To use the property's methods, the generic type `T` must implement:
/// - `Clone`: Required for returning values and passing them to observers
/// - `Send`: Required for transferring between threads
/// - `Sync`: Required for concurrent access from multiple threads
/// - `'static`: Required for observer callbacks that may outlive the original scope
///
/// # Examples
///
/// ```rust
/// use observable_property::ObservableProperty;
/// use std::sync::Arc;
///
/// let property = ObservableProperty::new("initial".to_string());
///
/// let observer_id = property.subscribe(Arc::new(|old, new| {
///     println!("Changed from '{}' to '{}'", old, new);
/// })).unwrap();
///
/// property.set("updated".to_string()).unwrap(); // Prints: Changed from 'initial' to 'updated'
/// property.unsubscribe(observer_id).unwrap();
/// ```
pub struct ObservableProperty<T> {
    // Shared, lock-protected state (current value, observers, ID counter).
    inner: Arc<RwLock<InnerProperty<T>>>,
}
155
/// Lock-protected state shared by every handle to one `ObservableProperty`.
struct InnerProperty<T> {
    /// The current value of the property.
    value: T,
    /// Registered observers, keyed by the ID returned from `subscribe()`.
    observers: HashMap<ObserverId, Observer<T>>,
    /// The ID that will be assigned to the next subscribed observer.
    next_id: ObserverId,
}
161
162impl<T: Clone + Send + Sync + 'static> ObservableProperty<T> {
163 /// Creates a new observable property with the given initial value
164 ///
165 /// # Arguments
166 ///
167 /// * `initial_value` - The starting value for this property
168 ///
169 /// # Examples
170 ///
171 /// ```rust
172 /// use observable_property::ObservableProperty;
173 ///
174 /// let property = ObservableProperty::new(42);
175 /// assert_eq!(property.get().unwrap(), 42);
176 /// ```
177 pub fn new(initial_value: T) -> Self {
178 Self {
179 inner: Arc::new(RwLock::new(InnerProperty {
180 value: initial_value,
181 observers: HashMap::new(),
182 next_id: 0,
183 })),
184 }
185 }
186
187 /// Gets the current value of the property
188 ///
189 /// This method acquires a read lock, which allows multiple concurrent readers
190 /// but will block if a writer currently holds the lock.
191 ///
192 /// # Returns
193 ///
194 /// `Ok(T)` containing a clone of the current value, or `Err(PropertyError)`
195 /// if the lock is poisoned.
196 ///
197 /// # Examples
198 ///
199 /// ```rust
200 /// use observable_property::ObservableProperty;
201 ///
202 /// let property = ObservableProperty::new("hello".to_string());
203 /// assert_eq!(property.get().unwrap(), "hello");
204 /// ```
205 pub fn get(&self) -> Result<T, PropertyError> {
206 self.inner
207 .read()
208 .map(|prop| prop.value.clone())
209 .map_err(|_| PropertyError::PoisonedLock)
210 }
211
212 /// Sets the property to a new value and notifies all observers
213 ///
214 /// This method will:
215 /// 1. Acquire a write lock (blocking other readers/writers)
216 /// 2. Update the value and capture a snapshot of observers
217 /// 3. Release the lock
218 /// 4. Notify all observers sequentially with the old and new values
219 ///
220 /// Observer notifications are wrapped in panic recovery to prevent one
221 /// misbehaving observer from affecting others.
222 ///
223 /// # Arguments
224 ///
225 /// * `new_value` - The new value to set
226 ///
227 /// # Returns
228 ///
229 /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
230 ///
231 /// # Examples
232 ///
233 /// ```rust
234 /// use observable_property::ObservableProperty;
235 /// use std::sync::Arc;
236 ///
237 /// let property = ObservableProperty::new(10);
238 ///
239 /// property.subscribe(Arc::new(|old, new| {
240 /// println!("Value changed from {} to {}", old, new);
241 /// })).unwrap();
242 ///
243 /// property.set(20).unwrap(); // Triggers observer notification
244 /// ```
245 pub fn set(&self, new_value: T) -> Result<(), PropertyError> {
246 let (old_value, observers_snapshot) = {
247 let mut prop = self
248 .inner
249 .write()
250 .map_err(|_| PropertyError::WriteLockError {
251 context: "setting property value".to_string(),
252 })?;
253
254 let old_value = prop.value.clone();
255 prop.value = new_value.clone();
256 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
257 (old_value, observers_snapshot)
258 };
259
260 for observer in observers_snapshot {
261 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
262 observer(&old_value, &new_value);
263 })) {
264 eprintln!("Observer panic: {:?}", e);
265 }
266 }
267
268 Ok(())
269 }
270
271 /// Sets the property to a new value and notifies observers asynchronously
272 ///
273 /// This method is similar to `set()` but spawns observers in background threads
274 /// for non-blocking operation. This is useful when observers might perform
275 /// time-consuming operations.
276 ///
277 /// Observers are batched into groups and each batch runs in its own thread
278 /// to limit resource usage while still providing parallelism.
279 ///
280 /// # Arguments
281 ///
282 /// * `new_value` - The new value to set
283 ///
284 /// # Returns
285 ///
286 /// `Ok(())` if successful, or `Err(PropertyError)` if the lock is poisoned.
287 /// Note that this only indicates the property was updated successfully;
288 /// observer execution happens asynchronously.
289 ///
290 /// # Examples
291 ///
292 /// ```rust
293 /// use observable_property::ObservableProperty;
294 /// use std::sync::Arc;
295 /// use std::time::Duration;
296 ///
297 /// let property = ObservableProperty::new(0);
298 ///
299 /// property.subscribe(Arc::new(|old, new| {
300 /// // This observer does slow work but won't block the caller
301 /// std::thread::sleep(Duration::from_millis(100));
302 /// println!("Slow observer: {} -> {}", old, new);
303 /// })).unwrap();
304 ///
305 /// // This returns immediately even though observer is slow
306 /// property.set_async(42).unwrap();
307 /// ```
308 pub fn set_async(&self, new_value: T) -> Result<(), PropertyError> {
309 let (old_value, observers_snapshot) = {
310 let mut prop = self
311 .inner
312 .write()
313 .map_err(|_| PropertyError::WriteLockError {
314 context: "setting property value".to_string(),
315 })?;
316
317 let old_value = prop.value.clone();
318 prop.value = new_value.clone();
319 let observers_snapshot: Vec<Observer<T>> = prop.observers.values().cloned().collect();
320 (old_value, observers_snapshot)
321 };
322
323 if observers_snapshot.is_empty() {
324 return Ok(());
325 }
326
327 const MAX_THREADS: usize = 4;
328 let observers_per_thread = (observers_snapshot.len() + MAX_THREADS - 1) / MAX_THREADS;
329
330 for batch in observers_snapshot.chunks(observers_per_thread) {
331 let batch_observers = batch.to_vec();
332 let old_val = old_value.clone();
333 let new_val = new_value.clone();
334
335 thread::spawn(move || {
336 for observer in batch_observers {
337 if let Err(e) = panic::catch_unwind(panic::AssertUnwindSafe(|| {
338 observer(&old_val, &new_val);
339 })) {
340 eprintln!("Observer panic in batch thread: {:?}", e);
341 }
342 }
343 });
344 }
345
346 Ok(())
347 }
348
349 /// Subscribes an observer function to be called when the property changes
350 ///
351 /// The observer function will be called with the old and new values whenever
352 /// the property is modified via `set()` or `set_async()`.
353 ///
354 /// # Arguments
355 ///
356 /// * `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
357 ///
358 /// # Returns
359 ///
360 /// `Ok(ObserverId)` containing a unique identifier for this observer,
361 /// or `Err(PropertyError)` if the lock is poisoned.
362 ///
363 /// # Examples
364 ///
365 /// ```rust
366 /// use observable_property::ObservableProperty;
367 /// use std::sync::Arc;
368 ///
369 /// let property = ObservableProperty::new(0);
370 ///
371 /// let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
372 /// println!("Property changed from {} to {}", old_value, new_value);
373 /// })).unwrap();
374 ///
375 /// // Later, unsubscribe using the returned ID
376 /// property.unsubscribe(observer_id).unwrap();
377 /// ```
378 pub fn subscribe(&self, observer: Observer<T>) -> Result<ObserverId, PropertyError> {
379 let mut prop = self
380 .inner
381 .write()
382 .map_err(|_| PropertyError::WriteLockError {
383 context: "subscribing observer".to_string(),
384 })?;
385
386 let id = prop.next_id;
387 prop.next_id += 1;
388 prop.observers.insert(id, observer);
389 Ok(id)
390 }
391
392 /// Removes an observer by its ID
393 ///
394 /// # Arguments
395 ///
396 /// * `id` - The observer ID returned by `subscribe()`
397 ///
398 /// # Returns
399 ///
400 /// `Ok(bool)` where `true` means the observer was found and removed,
401 /// `false` means no observer with that ID existed.
402 /// Returns `Err(PropertyError)` if the lock is poisoned.
403 ///
404 /// # Examples
405 ///
406 /// ```rust
407 /// use observable_property::ObservableProperty;
408 /// use std::sync::Arc;
409 ///
410 /// let property = ObservableProperty::new(0);
411 /// let id = property.subscribe(Arc::new(|_, _| {})).unwrap();
412 ///
413 /// let was_removed = property.unsubscribe(id).unwrap();
414 /// assert!(was_removed); // Observer existed and was removed
415 ///
416 /// let was_removed_again = property.unsubscribe(id).unwrap();
417 /// assert!(!was_removed_again); // Observer no longer exists
418 /// ```
419 pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError> {
420 let mut prop = self
421 .inner
422 .write()
423 .map_err(|_| PropertyError::WriteLockError {
424 context: "unsubscribing observer".to_string(),
425 })?;
426
427 let was_present = prop.observers.remove(&id).is_some();
428 Ok(was_present)
429 }
430
431 /// Subscribes an observer that only gets called when a filter condition is met
432 ///
433 /// This is useful for observing only specific types of changes, such as
434 /// when a value increases or crosses a threshold.
435 ///
436 /// # Arguments
437 ///
438 /// * `observer` - The observer function to call when the filter passes
439 /// * `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
440 ///
441 /// # Returns
442 ///
443 /// `Ok(ObserverId)` for the filtered observer, or `Err(PropertyError)` if the lock is poisoned.
444 ///
445 /// # Examples
446 ///
447 /// ```rust
448 /// use observable_property::ObservableProperty;
449 /// use std::sync::Arc;
450 ///
451 /// let property = ObservableProperty::new(0);
452 ///
453 /// // Only notify when value increases
454 /// let id = property.subscribe_filtered(
455 /// Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
456 /// |old, new| new > old
457 /// ).unwrap();
458 ///
459 /// property.set(10).unwrap(); // Triggers observer (0 -> 10)
460 /// property.set(5).unwrap(); // Does NOT trigger observer (10 -> 5)
461 /// property.set(15).unwrap(); // Triggers observer (5 -> 15)
462 /// ```
463 pub fn subscribe_filtered<F>(
464 &self,
465 observer: Observer<T>,
466 filter: F,
467 ) -> Result<ObserverId, PropertyError>
468 where
469 F: Fn(&T, &T) -> bool + Send + Sync + 'static,
470 {
471 let filter = Arc::new(filter);
472 let filtered_observer = Arc::new(move |old_val: &T, new_val: &T| {
473 if filter(old_val, new_val) {
474 observer(old_val, new_val);
475 }
476 });
477
478 self.subscribe(filtered_observer)
479 }
480}
481
482impl<T: Clone> Clone for ObservableProperty<T> {
483 /// Creates a new reference to the same observable property
484 ///
485 /// This creates a new `ObservableProperty` instance that shares the same
486 /// underlying data with the original. Changes made through either instance
487 /// will be visible to observers subscribed through both instances.
488 ///
489 /// # Examples
490 ///
491 /// ```rust
492 /// use observable_property::ObservableProperty;
493 /// use std::sync::Arc;
494 ///
495 /// let property1 = ObservableProperty::new(42);
496 /// let property2 = property1.clone();
497 ///
498 /// property2.subscribe(Arc::new(|old, new| {
499 /// println!("Observer on property2 saw change: {} -> {}", old, new);
500 /// })).unwrap();
501 ///
502 /// // This change through property1 will trigger the observer on property2
503 /// property1.set(100).unwrap();
504 /// ```
505 fn clone(&self) -> Self {
506 Self {
507 inner: Arc::clone(&self.inner),
508 }
509 }
510}
511
512impl<T: Clone + std::fmt::Debug + Send + Sync + 'static> std::fmt::Debug for ObservableProperty<T> {
513 /// Debug implementation that shows the current value if accessible
514 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
515 match self.get() {
516 Ok(value) => f.debug_struct("ObservableProperty")
517 .field("value", &value)
518 .field("observers_count", &"[hidden]")
519 .finish(),
520 Err(_) => f.debug_struct("ObservableProperty")
521 .field("value", &"[inaccessible]")
522 .field("observers_count", &"[hidden]")
523 .finish(),
524 }
525 }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};

    /// A new property returns its initial value, and `set` replaces it.
    #[test]
    fn test_property_creation_and_basic_operations() {
        let prop = ObservableProperty::new(42);

        // Test initial value
        assert_eq!(prop.get().expect("Failed to get initial value"), 42);

        // Test setting value
        prop.set(100).expect("Failed to set value");
        assert_eq!(prop.get().expect("Failed to get updated value"), 100);
    }

    /// A subscribed observer sees `(old, new)` on `set` and stops being
    /// notified once unsubscribed.
    #[test]
    fn test_observer_subscription_and_notification() {
        let prop = ObservableProperty::new("initial".to_string());
        let notification_count = Arc::new(AtomicUsize::new(0));
        let last_old_value = Arc::new(RwLock::new(String::new()));
        let last_new_value = Arc::new(RwLock::new(String::new()));

        let count_clone = Arc::clone(&notification_count);
        let old_clone = Arc::clone(&last_old_value);
        let new_clone = Arc::clone(&last_new_value);

        let observer_id = prop
            .subscribe(Arc::new(move |old, new| {
                count_clone.fetch_add(1, Ordering::SeqCst);
                if let Ok(mut old_val) = old_clone.write() {
                    *old_val = old.clone();
                }
                if let Ok(mut new_val) = new_clone.write() {
                    *new_val = new.clone();
                }
            }))
            .expect("Failed to subscribe observer");

        // Change value and verify notification
        prop.set("changed".to_string())
            .expect("Failed to set property value");

        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
        assert_eq!(
            *last_old_value.read().expect("Failed to read old value"),
            "initial"
        );
        assert_eq!(
            *last_new_value.read().expect("Failed to read new value"),
            "changed"
        );

        // Test unsubscription
        let was_present = prop
            .unsubscribe(observer_id)
            .expect("Failed to unsubscribe observer");
        assert!(was_present);

        // Change value again - should not notify
        prop.set("not_notified".to_string())
            .expect("Failed to set property value after unsubscribe");
        assert_eq!(notification_count.load(Ordering::SeqCst), 1);
    }

    /// A filtered observer only fires when its predicate accepts the change.
    #[test]
    fn test_filtered_observer() {
        let prop = ObservableProperty::new(0i32);
        let notification_count = Arc::new(AtomicUsize::new(0));
        let count_clone = Arc::clone(&notification_count);

        // Observer only triggered when value increases
        let observer_id = prop
            .subscribe_filtered(
                Arc::new(move |_, _| {
                    count_clone.fetch_add(1, Ordering::SeqCst);
                }),
                |old, new| new > old,
            )
            .expect("Failed to subscribe filtered observer");

        // Should trigger (0 -> 5)
        prop.set(5).expect("Failed to set property value to 5");
        assert_eq!(notification_count.load(Ordering::SeqCst), 1);

        // Should NOT trigger (5 -> 3)
        prop.set(3).expect("Failed to set property value to 3");
        assert_eq!(notification_count.load(Ordering::SeqCst), 1);

        // Should trigger (3 -> 10)
        prop.set(10).expect("Failed to set property value to 10");
        assert_eq!(notification_count.load(Ordering::SeqCst), 2);

        prop.unsubscribe(observer_id)
            .expect("Failed to unsubscribe filtered observer");
    }

    /// Many threads can read the same property concurrently.
    #[test]
    fn test_thread_safety_concurrent_reads() {
        let prop = Arc::new(ObservableProperty::new(42i32));
        let num_threads = 10;
        let reads_per_thread = 100;

        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                let prop_clone = Arc::clone(&prop);
                thread::spawn(move || {
                    for _ in 0..reads_per_thread {
                        let value = prop_clone
                            .get()
                            .expect("Failed to read property value");
                        assert_eq!(value, 42);
                        thread::sleep(Duration::from_millis(1));
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().expect("Thread failed to complete");
        }
    }

    /// `set` blocks on a slow observer while `set_async` does not, and the
    /// observer still runs for both updates.
    #[test]
    fn test_async_set_performance() {
        const OBSERVER_DELAY: Duration = Duration::from_millis(50);

        let prop = ObservableProperty::new(0i32);
        let slow_observer_count = Arc::new(AtomicUsize::new(0));
        let count_clone = Arc::clone(&slow_observer_count);

        // Add observer that simulates slow work
        let _id = prop
            .subscribe(Arc::new(move |_, _| {
                thread::sleep(OBSERVER_DELAY);
                count_clone.fetch_add(1, Ordering::SeqCst);
            }))
            .expect("Failed to subscribe slow observer");

        // Synchronous set waits for the observer, so it takes at least the
        // observer's delay.
        let start = Instant::now();
        prop.set(1)
            .expect("Failed to set property value synchronously");
        let sync_duration = start.elapsed();
        assert!(sync_duration >= OBSERVER_DELAY);

        // Asynchronous set returns without waiting for the observer.
        let start = Instant::now();
        prop.set_async(2)
            .expect("Failed to set property value asynchronously");
        let async_duration = start.elapsed();

        // Compare against the observer delay rather than a fixed wall-clock
        // budget, which would be flaky on a loaded machine.
        assert!(async_duration < sync_duration);

        // Wait (bounded) for the background observer instead of sleeping a
        // fixed amount and hoping it has finished.
        let deadline = Instant::now() + Duration::from_secs(5);
        while slow_observer_count.load(Ordering::SeqCst) < 2 && Instant::now() < deadline {
            thread::sleep(Duration::from_millis(5));
        }

        // The single observer should have been called once per update.
        assert_eq!(slow_observer_count.load(Ordering::SeqCst), 2);
    }
}
715}