Struct ObservableProperty 

pub struct ObservableProperty<T> { /* private fields */ }

A thread-safe observable property that notifies observers when its value changes

This type wraps a value of type T and allows multiple observers to be notified whenever the value is modified. All operations are thread-safe and can be called from multiple threads concurrently.

§Type Requirements

The generic type T must implement:

  • Clone: Required for returning values and passing them to observers
  • Send: Required for transferring between threads
  • Sync: Required for concurrent access from multiple threads
  • 'static: Required for observer callbacks that may outlive the original scope
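As a rough illustration of these bounds, the sketch below uses a hypothetical helper, accepts_as_property_value, that compiles only for types meeting the same four requirements. The helper and bounds_demo are not part of the crate; they only mirror the bounds listed above.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical helper (not part of the crate): compiles only for types
/// that satisfy the same bounds as `ObservableProperty<T>`.
fn accepts_as_property_value<T: Clone + Send + Sync + 'static>(value: T) -> T {
    // `Clone` is what lets a copy be handed back to the caller.
    value.clone()
}

/// Exercises the helper with a few types that meet all four bounds.
fn bounds_demo() -> (i32, String, usize) {
    let number = accepts_as_property_value(42);
    let text = accepts_as_property_value("initial".to_string());
    // Shared, lock-protected state is also `Clone + Send + Sync + 'static`.
    let shared = accepts_as_property_value(Arc::new(Mutex::new(vec![1, 2, 3])));
    let len = shared.lock().unwrap().len();
    (number, text, len)

    // By contrast, `std::rc::Rc<i32>` is neither `Send` nor `Sync`, so
    // `accepts_as_property_value(std::rc::Rc::new(1))` would not compile.
}
```

Types that are not Send or Sync can usually be wrapped in Arc<Mutex<...>> before being stored in a property.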

§Examples

use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new("initial".to_string());

let observer_id = property.subscribe(Arc::new(|old, new| {
    println!("Changed from '{}' to '{}'", old, new);
})).map_err(|e| {
    eprintln!("Failed to subscribe: {}", e);
    e
})?;

property.set("updated".to_string()).map_err(|e| {
    eprintln!("Failed to set value: {}", e);
    e
})?; // Prints: Changed from 'initial' to 'updated'

property.unsubscribe(observer_id).map_err(|e| {
    eprintln!("Failed to unsubscribe: {}", e);
    e
})?;

Implementations§

impl<T: Clone + Send + Sync + 'static> ObservableProperty<T>

pub fn new(initial_value: T) -> Self

Creates a new observable property with the given initial value

§Arguments
  • initial_value - The starting value for this property
§Examples
use observable_property::ObservableProperty;

let property = ObservableProperty::new(42);
match property.get() {
    Ok(value) => assert_eq!(value, 42),
    Err(e) => eprintln!("Failed to get property value: {}", e),
}
Examples found in repository
examples/basic.rs (line 18)
16    fn new(name: String, age: i32) -> Self {
17        Self {
18            name: ObservableProperty::new(name),
19            age: ObservableProperty::new(age),
20        }
21    }
22
23    fn get_name(&self) -> Result<String, observable_property::PropertyError> {
24        self.name.get()
25    }
26
27    fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
28        self.name.set(new_name)
29    }
30
31    fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
32        self.age.get()
33    }
34
35    fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
36        self.age.set(new_age)
37    }
38
39    fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
40        let current_age = self.age.get()?;
41        self.age.set(current_age + 1)
42    }
43}
44
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46    println!("=== Observable Property Basic Example ===\n");
47
48    // Create a person
49    let person = Person::new("Alice".to_string(), 25);
50
51    // Subscribe to name changes
52    let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53        println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54    }))?;
55
56    // Subscribe to age changes
57    let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58        println!("🎂 Age changed: {} → {}", old_age, new_age);
59    }))?;
60
61    // Subscribe to only significant age changes (milestones)
62    let milestone_observer_id = person.age.subscribe_filtered(
63        Arc::new(|_old_age, new_age| {
64            println!("🎉 Milestone reached! {} is now a special age: {}", 
65                    if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66        }),
67        |old_age, new_age| {
68            // Notify on milestone ages
69            let milestones = [18, 21, 30, 40, 50, 65];
70            milestones.contains(new_age) || 
71            (milestones.contains(old_age) && !milestones.contains(new_age))
72        }
73    )?;
74
75    println!("Initial state:");
76    println!("  Name: {}", person.get_name()?);
77    println!("  Age: {}\n", person.get_age()?);
78
79    // Demonstrate property changes
80    println!("Making changes...\n");
81
82    // Change name
83    person.set_name("Alice Johnson".to_string())?;
84
85    // Age up a few times
86    person.celebrate_birthday()?;
87    person.celebrate_birthday()?;
88    person.celebrate_birthday()?;
89
90    // Change to milestone age
91    person.set_age(21)?;
92
93    // Direct property access
94    println!("\nDirect property access:");
95    let simple_property = ObservableProperty::new(100);
96    
97    simple_property.subscribe(Arc::new(|old, new| {
98        println!("💰 Value changed: {} → {}", old, new);
99    }))?;
100
101    simple_property.set(200)?;
102    simple_property.set(150)?;
103
104    // Cleanup observers
105    person.name.unsubscribe(name_observer_id)?;
106    person.age.unsubscribe(age_observer_id)?;
107    person.age.unsubscribe(milestone_observer_id)?;
108
109    println!("\n✅ Example completed successfully!");
110    Ok(())
111}

pub fn with_max_threads(initial_value: T, max_threads: usize) -> Self

Creates a new observable property with a custom maximum thread count for async notifications

This constructor allows you to customize the maximum number of threads used for asynchronous observer notifications via set_async(). This is useful for tuning performance based on your specific use case and system constraints.

§Arguments
  • initial_value - The starting value for this property
  • max_threads - Maximum number of threads to use for async notifications. If 0 is provided, defaults to 4.
§Thread Pool Behavior

When set_async() is called, observers are divided into batches and each batch runs in its own thread, up to the specified maximum. For example:

  • With 100 observers and max_threads = 4: 4 threads with ~25 observers each
  • With 10 observers and max_threads = 8: no more than 8 threads, each handling one or two observers
  • With 2 observers and max_threads = 4: 2 threads with 1 observer each
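One way to reason about the batching is with ceiling division. The sketch below is an assumption about how such a split could be computed, not the crate's confirmed algorithm; plan_batches and DEFAULT_MAX_THREADS are hypothetical names. It never produces more threads than max_threads.

```rust
/// Default used when `max_threads` is 0, as stated in the text above.
const DEFAULT_MAX_THREADS: usize = 4;

/// Hypothetical helper: returns `(thread_count, largest_batch_size)`
/// for an assumed ceiling-division batching scheme.
fn plan_batches(observer_count: usize, max_threads: usize) -> (usize, usize) {
    let max_threads = if max_threads == 0 { DEFAULT_MAX_THREADS } else { max_threads };
    if observer_count == 0 {
        return (0, 0);
    }
    // Ceiling division: the largest batch any single thread has to handle.
    let per_thread = (observer_count + max_threads - 1) / max_threads;
    // Number of batches (and therefore threads) needed at that batch size.
    let threads = (observer_count + per_thread - 1) / per_thread;
    (threads, per_thread)
}
```

Under this assumed scheme, plan_batches(100, 4) gives 4 threads of 25 observers and plan_batches(2, 4) gives 2 threads of 1 observer. The real partitioning may differ in detail.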
§Use Cases
§High-Throughput Systems
use observable_property::ObservableProperty;

// For systems with many CPU cores and CPU-bound observers
let property = ObservableProperty::with_max_threads(0, 8);
§Resource-Constrained Systems
use observable_property::ObservableProperty;

// For embedded systems or memory-constrained environments
let property = ObservableProperty::with_max_threads(42, 1);
§I/O-Heavy Observers
use observable_property::ObservableProperty;

// For observers that do network/database operations
let property = ObservableProperty::with_max_threads("data".to_string(), 16);
§Performance Considerations
  • Higher values: Better parallelism but more thread overhead and memory usage
  • Lower values: Less overhead but potentially slower async notifications
  • Optimal range: Typically between 1 and 2x the number of CPU cores
  • Zero value: Automatically uses the default value (4)
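A starting value can be derived from the detected core count rather than hard-coded. The sketch below uses only the standard library; suggested_max_threads is a hypothetical helper, and the doubling for I/O-heavy observers simply follows the 1x to 2x guideline above.

```rust
use std::thread;

/// Hypothetical helper: picks a `max_threads` value from the detected
/// core count, staying within the 1x to 2x range suggested above.
fn suggested_max_threads(io_heavy: bool) -> usize {
    // Fall back to 1 when the core count cannot be determined.
    let cores = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    if io_heavy { cores * 2 } else { cores }
}
```

The result could then be passed as the second argument to with_max_threads(). Treat it as a first guess to be refined by measurement.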
§Thread Safety

This setting only affects async notifications (set_async()). Synchronous operations (set()) always execute observers sequentially regardless of this setting.

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

// Create property with custom thread pool size
let property = ObservableProperty::with_max_threads(42, 2);

// Subscribe observers as usual
let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
    println!("Value changed: {} -> {}", old, new);
})).expect("Failed to create subscription");

// Async notifications will use at most 2 threads
property.set_async(100).expect("Failed to set value asynchronously");

pub fn get(&self) -> Result<T, PropertyError>

Gets the current value of the property

This method acquires a read lock, which allows multiple concurrent readers but will block if a writer currently holds the lock.

§Returns

Ok(T) containing a clone of the current value, or Err(PropertyError) if the lock is poisoned.
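The read-lock-then-clone behaviour, including the poisoned-lock error path, can be sketched with std::sync::RwLock alone. SketchError, read_cloned and poisoned_lock are hypothetical names for this illustration; the crate's own error type is PropertyError.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in for the crate's error type.
#[derive(Debug, PartialEq)]
enum SketchError {
    Poisoned,
}

/// Clones the value out from under a read lock, reporting poisoning as an error.
fn read_cloned<T: Clone>(lock: &RwLock<T>) -> Result<T, SketchError> {
    let guard = lock.read().map_err(|_| SketchError::Poisoned)?;
    Ok((*guard).clone())
}

/// Poisons a lock by panicking while a write guard is held.
fn poisoned_lock() -> Arc<RwLock<i32>> {
    let lock = Arc::new(RwLock::new(0));
    let writer = Arc::clone(&lock);
    // The join result is an `Err` because the thread panicked; ignore it.
    let _ = thread::spawn(move || {
        let _guard = writer.write().unwrap();
        panic!("simulated failure while holding the write lock");
    })
    .join();
    lock
}
```

A standard-library lock becomes poisoned when a thread panics while holding its write guard, which is why get() returns a Result rather than a bare value.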

§Examples
use observable_property::ObservableProperty;

let property = ObservableProperty::new("hello".to_string());
match property.get() {
    Ok(value) => assert_eq!(value, "hello"),
    Err(e) => eprintln!("Failed to get property value: {}", e),
}

pub fn set(&self, new_value: T) -> Result<(), PropertyError>

Sets the property to a new value and notifies all observers

This method will:

  1. Acquire a write lock (blocking other readers/writers)
  2. Update the value and capture a snapshot of observers
  3. Release the lock
  4. Notify all observers sequentially with the old and new values

Observer notifications are wrapped in panic recovery to prevent one misbehaving observer from affecting others.
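The four steps and the panic recovery can be sketched as follows. This is a simplified illustration under stated assumptions, not the crate's actual implementation: SketchProperty and SketchObserver are hypothetical, lock poisoning is handled with unwrap() for brevity, and set() returns a count only so the behaviour is easy to check.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::{Arc, RwLock};

/// Observer callback shape assumed for this sketch.
type SketchObserver<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

/// Hypothetical simplified property; not the crate's real internals.
struct SketchProperty<T> {
    inner: RwLock<(T, Vec<SketchObserver<T>>)>,
}

impl<T: Clone> SketchProperty<T> {
    fn new(value: T) -> Self {
        Self { inner: RwLock::new((value, Vec::new())) }
    }

    fn subscribe(&self, observer: SketchObserver<T>) {
        self.inner.write().unwrap().1.push(observer);
    }

    /// Returns how many observers completed without panicking.
    fn set(&self, new_value: T) -> usize {
        let (old_value, observers) = {
            // Step 1: take the write lock.
            let mut guard = self.inner.write().unwrap();
            // Step 2: swap in the new value and snapshot the observer list.
            let old = std::mem::replace(&mut guard.0, new_value.clone());
            (old, guard.1.clone())
        }; // Step 3: the guard is dropped here, releasing the lock.

        // Step 4: notify sequentially; a panic in one observer is caught
        // so the remaining observers still run.
        let mut completed = 0;
        for observer in &observers {
            let call = AssertUnwindSafe(|| observer(&old_value, &new_value));
            if catch_unwind(call).is_ok() {
                completed += 1;
            }
        }
        completed
    }
}
```

Releasing the lock before notifying means an observer can read the property without deadlocking against the writer.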

§Arguments
  • new_value - The new value to set
§Returns

Ok(()) if successful, or Err(PropertyError) if the lock is poisoned.

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(10);

property.subscribe(Arc::new(|old, new| {
    println!("Value changed from {} to {}", old, new);
})).map_err(|e| {
    eprintln!("Failed to subscribe: {}", e);
    e
})?;

property.set(20).map_err(|e| {
    eprintln!("Failed to set property value: {}", e);
    e
})?; // Triggers observer notification

pub fn set_async(&self, new_value: T) -> Result<(), PropertyError>

Sets the property to a new value and notifies observers asynchronously

This method is similar to set(), but it runs observers on background threads so the caller is not blocked. This is useful when observers perform time-consuming operations.

Observers are batched into groups and each batch runs in its own thread to limit resource usage while still providing parallelism.
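A minimal sketch of batched, threaded notification is shown below, assuming a simple chunking scheme. notify_in_batches and SketchObserver are hypothetical names. The join handles are returned only so that a caller of this sketch can choose to wait; dropping them gives the fire-and-forget behaviour described in the next section.

```rust
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Observer callback shape assumed for this sketch.
type SketchObserver<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

/// Hypothetical helper: splits observers into at most `max_threads` batches
/// and runs each batch on its own thread.
fn notify_in_batches<T>(
    observers: Vec<SketchObserver<T>>,
    max_threads: usize,
    old: T,
    new: T,
) -> Vec<JoinHandle<()>>
where
    T: Clone + Send + Sync + 'static,
{
    if observers.is_empty() {
        return Vec::new();
    }
    // Guard against a zero thread limit, then size batches by ceiling division.
    let limit = max_threads.max(1);
    let batch_size = (observers.len() + limit - 1) / limit;
    observers
        .chunks(batch_size)
        .map(|batch| {
            // Each thread owns its batch and its own copies of the values.
            let batch = batch.to_vec();
            let (old, new) = (old.clone(), new.clone());
            thread::spawn(move || {
                for observer in &batch {
                    observer(&old, &new);
                }
            })
        })
        .collect()
}
```

Observers within a batch run one after another on the same thread, so a slow observer delays only the rest of its own batch.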

§Thread Management (Fire-and-Forget Pattern)

Important: This method uses a fire-and-forget pattern. Spawned threads are not joined and run independently in the background. This design is intentional for non-blocking behavior but has important implications:

§Characteristics:
  • Non-blocking: Returns immediately without waiting for observers
  • High performance: No synchronization overhead
  • ⚠️ No completion guarantee: Background threads may still be running when the method returns
  • ⚠️ No error propagation: Observer errors are logged but not returned
  • ⚠️ Testing caveat: May need explicit delays to observe side effects
  • ⚠️ Ordering caveat: Multiple rapid set_async() calls may result in observers receiving notifications out of order due to thread scheduling. Use set() if sequential ordering is critical.
§Use Cases:
  • UI updates: Fire updates without blocking the main thread
  • Logging: Asynchronous logging that doesn’t block operations
  • Metrics: Non-critical telemetry that can be lost
  • Notifications: Fire-and-forget alerts or messages
§When NOT to Use:
  • Critical operations: Use set() when observers must have run before the call returns
  • Transactional updates: Use set() when the update and its notifications must complete as one step
  • Sequential dependencies: Use set() if the next operation depends on observer completion
§Testing Considerations:
use observable_property::ObservableProperty;
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use std::time::Duration;

let property = ObservableProperty::new(0);
let was_called = Arc::new(AtomicBool::new(false));
let flag = was_called.clone();

property.subscribe(Arc::new(move |_, _| {
    flag.store(true, Ordering::SeqCst);
}))?;

property.set_async(42)?;

// ⚠️ Immediate check might fail - thread may not have run yet
// assert!(was_called.load(Ordering::SeqCst)); // May fail!

// ✅ Add a small delay to give the background thread time to finish
std::thread::sleep(Duration::from_millis(10));
assert!(was_called.load(Ordering::SeqCst)); // Very likely true now, though a sleep is not a strict guarantee
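Where a fixed sleep feels fragile, one alternative is to have the background work signal completion over a channel and wait with a timeout. The sketch below uses only the standard library; a plain spawned thread stands in for the background notification, and run_in_background_and_wait is a hypothetical name.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs `work` on a detached thread and waits up to `timeout` for it to
/// report completion. Returns `true` if the signal arrived in time.
fn run_in_background_and_wait<F>(work: F, timeout: Duration) -> bool
where
    F: FnOnce() + Send + 'static,
{
    let (done_tx, done_rx) = mpsc::channel();
    // The join handle is dropped immediately, mirroring fire-and-forget.
    thread::spawn(move || {
        work();
        // Ignore the error if the receiver has already given up waiting.
        let _ = done_tx.send(());
    });
    done_rx.recv_timeout(timeout).is_ok()
}
```

In a test against a property, the observer itself would send on the channel and the test body would call recv_timeout() after set_async(). The test then finishes as soon as the observer runs and fails only if the timeout elapses.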
§Arguments
  • new_value - The new value to set
§Returns

Ok(()) if successful, or Err(PropertyError) if the lock is poisoned. Note that this only indicates the property was updated successfully; observer execution happens asynchronously and errors are not returned.

§Examples
§Basic Usage
use observable_property::ObservableProperty;
use std::sync::Arc;
use std::time::Duration;

let property = ObservableProperty::new(0);

property.subscribe(Arc::new(|old, new| {
    // This observer does slow work but won't block the caller
    std::thread::sleep(Duration::from_millis(100));
    println!("Slow observer: {} -> {}", old, new);
})).map_err(|e| {
    eprintln!("Failed to subscribe: {}", e);
    e
})?;

// This returns immediately even though observer is slow
property.set_async(42).map_err(|e| {
    eprintln!("Failed to set value asynchronously: {}", e);
    e
})?;

// Continue working immediately - observer runs in background
println!("Main thread continues without waiting");
§Multiple Rapid Updates
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(0);

property.subscribe(Arc::new(|old, new| {
    // Expensive operation (e.g., database update, API call)
    println!("Processing: {} -> {}", old, new);
}))?;

// All of these return immediately - observers run in parallel
property.set_async(1)?;
property.set_async(2)?;
property.set_async(3)?;
property.set_async(4)?;
property.set_async(5)?;

// Observer calls run on background threads and may still be in progress here

pub fn subscribe( &self, observer: Observer<T>, ) -> Result<ObserverId, PropertyError>

Subscribes an observer function to be called when the property changes

The observer function will be called with the old and new values whenever the property is modified via set() or set_async().

§Arguments
  • observer - A function wrapped in Arc that takes (&T, &T) parameters
§Returns

Ok(ObserverId) containing a unique identifier for this observer, or Err(PropertyError::InvalidConfiguration) if the maximum observer limit is exceeded.

§Observer Limit

To prevent memory exhaustion, there is a maximum limit of observers per property (currently set to 10,000). If you attempt to add more observers than this limit, the subscription will fail with an InvalidConfiguration error.

This protection helps prevent:

  • Memory leaks from forgotten unsubscriptions
  • Unbounded memory growth in long-running applications
  • Out-of-memory conditions in resource-constrained environments
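The limit check can be pictured with a small bounded registry. This is a sketch under stated assumptions rather than the crate's internals: BoundedRegistry and SketchObserver are hypothetical, the limit is a constructor argument so it can be exercised with small numbers, and a String stands in for the InvalidConfiguration error.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Observer callback shape assumed for this sketch.
type SketchObserver<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

/// Hypothetical bounded registry keyed by observer ID.
struct BoundedRegistry<T> {
    observers: HashMap<usize, SketchObserver<T>>,
    next_id: usize,
    max_observers: usize,
}

impl<T> BoundedRegistry<T> {
    fn new(max_observers: usize) -> Self {
        Self { observers: HashMap::new(), next_id: 0, max_observers }
    }

    /// Registers an observer, or refuses once the limit has been reached.
    fn subscribe(&mut self, observer: SketchObserver<T>) -> Result<usize, String> {
        if self.observers.len() >= self.max_observers {
            return Err(format!("observer limit of {} reached", self.max_observers));
        }
        let id = self.next_id;
        self.next_id += 1;
        self.observers.insert(id, observer);
        Ok(id)
    }

    /// Removes an observer; `true` means it existed and was removed.
    fn unsubscribe(&mut self, id: usize) -> bool {
        self.observers.remove(&id).is_some()
    }
}
```

In this sketch, unsubscribing frees a slot, so a property that has hit the limit can accept new observers again once old ones are removed.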
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(0);

let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
    println!("Property changed from {} to {}", old_value, new_value);
})).map_err(|e| {
    eprintln!("Failed to subscribe observer: {}", e);
    e
})?;

// Later, unsubscribe using the returned ID
property.unsubscribe(observer_id).map_err(|e| {
    eprintln!("Failed to unsubscribe observer: {}", e);
    e
})?;
Examples found in repository
examples/multithreaded.rs (lines 50-52)
44fn main() -> Result<(), Box<dyn std::error::Error>> {
45    println!("=== Observable Property Multithreaded Person Example ===\n");
46
47    let person = Arc::new(Person::new("Bob".to_string(), 30));
48
49    // Subscribe to name changes
50    let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
51        println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
52    }))?;
53
54    // Subscribe to age changes
55    let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
56        println!("🎂 Age changed: {} → {}", old_age, new_age);
57    }))?;
58
59    // Spawn threads to update name and age concurrently
60    let person_clone1 = person.clone();
61    let handle1 = thread::spawn(move || {
62        for i in 0..3 {
63            let new_name = format!("Bob #{}", i + 1);
64            person_clone1.set_name(new_name).unwrap();
65            thread::sleep(Duration::from_millis(50));
66        }
67    });
68
69    let person_clone2 = person.clone();
70    let handle2 = thread::spawn(move || {
71        for _ in 0..3 {
72            person_clone2.celebrate_birthday().unwrap();
73            thread::sleep(Duration::from_millis(30));
74        }
75    });
76
77    handle1.join().unwrap();
78    handle2.join().unwrap();
79
80    println!("\nFinal state:");
81    println!("  Name: {}", person.get_name()?);
82    println!("  Age: {}", person.get_age()?);
83
84    // Cleanup observers
85    person.name.unsubscribe(name_observer_id)?;
86    person.age.unsubscribe(age_observer_id)?;
87
88    println!("\n✅ Multithreaded Person example completed successfully!");
89    Ok(())
90}

pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError>

Removes an observer identified by its ID

§Arguments
  • id - The observer ID returned by subscribe()
§Returns

Ok(bool) where true means the observer was found and removed, false means no observer with that ID existed. Returns Err(PropertyError) if the lock is poisoned.

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(0);
let id = property.subscribe(Arc::new(|_, _| {})).map_err(|e| {
    eprintln!("Failed to subscribe: {}", e);
    e
})?;

let was_removed = property.unsubscribe(id).map_err(|e| {
    eprintln!("Failed to unsubscribe: {}", e);
    e
})?;
assert!(was_removed); // Observer existed and was removed

let was_removed_again = property.unsubscribe(id).map_err(|e| {
    eprintln!("Failed to unsubscribe again: {}", e);
    e
})?;
assert!(!was_removed_again); // Observer no longer exists

pub fn subscribe_filtered<F>( &self, observer: Observer<T>, filter: F, ) -> Result<ObserverId, PropertyError>
where F: Fn(&T, &T) -> bool + Send + Sync + 'static,

Subscribes an observer that only gets called when a filter condition is met

This is useful for observing only specific types of changes, such as when a value increases or crosses a threshold.

§Arguments
  • observer - The observer function to call when the filter passes
  • filter - A predicate function that receives (old_value, new_value) and returns bool
§Returns

Ok(ObserverId) for the filtered observer, or Err(PropertyError) if the lock is poisoned.
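Conceptually, a filtered subscription can be thought of as an ordinary observer wrapped in a predicate check. The sketch below shows that idea in isolation; with_filter and SketchObserver are hypothetical names, and whether the crate builds its filtered observers this way is an assumption.

```rust
use std::sync::Arc;

/// Observer callback shape assumed for this sketch.
type SketchObserver<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

/// Hypothetical helper: wraps `observer` so it only runs when `filter`
/// returns `true` for the (old, new) pair.
fn with_filter<T, F>(observer: SketchObserver<T>, filter: F) -> SketchObserver<T>
where
    T: 'static,
    F: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
    Arc::new(move |old: &T, new: &T| {
        // Skip the notification entirely when the predicate rejects the change.
        if filter(old, new) {
            observer(old, new);
        }
    })
}
```

Because the wrapper has the same type as a plain observer, it can be stored and removed by ID like any other subscription.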

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(0);

// Only notify when value increases
let id = property.subscribe_filtered(
    Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
    |old, new| new > old
).map_err(|e| {
    eprintln!("Failed to subscribe filtered observer: {}", e);
    e
})?;

property.set(10).map_err(|e| {
    eprintln!("Failed to set value: {}", e);
    e
})?; // Triggers observer (0 -> 10)
property.set(5).map_err(|e| {
    eprintln!("Failed to set value: {}", e);
    e
})?;  // Does NOT trigger observer (10 -> 5)
property.set(15).map_err(|e| {
    eprintln!("Failed to set value: {}", e);
    e
})?; // Triggers observer (5 -> 15)
Examples found in repository?
examples/basic.rs (lines 62-73)
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46    println!("=== Observable Property Basic Example ===\n");
47
48    // Create a person
49    let person = Person::new("Alice".to_string(), 25);
50
51    // Subscribe to name changes
52    let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53        println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54    }))?;
55
56    // Subscribe to age changes
57    let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58        println!("🎂 Age changed: {} → {}", old_age, new_age);
59    }))?;
60
61    // Subscribe to only significant age changes (milestones)
62    let milestone_observer_id = person.age.subscribe_filtered(
63        Arc::new(|_old_age, new_age| {
64            println!("🎉 Milestone reached! {} is now a special age: {}", 
65                    if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66        }),
67        |old_age, new_age| {
68            // Notify on milestone ages
69            let milestones = [18, 21, 30, 40, 50, 65];
70            milestones.contains(new_age) || 
71            (milestones.contains(old_age) && !milestones.contains(new_age))
72        }
73    )?;
74
75    println!("Initial state:");
76    println!("  Name: {}", person.get_name()?);
77    println!("  Age: {}\n", person.get_age()?);
78
79    // Demonstrate property changes
80    println!("Making changes...\n");
81
82    // Change name
83    person.set_name("Alice Johnson".to_string())?;
84
85    // Age up a few times
86    person.celebrate_birthday()?;
87    person.celebrate_birthday()?;
88    person.celebrate_birthday()?;
89
90    // Change to milestone age
91    person.set_age(21)?;
92
93    // Direct property access
94    println!("\nDirect property access:");
95    let simple_property = ObservableProperty::new(100);
96    
97    simple_property.subscribe(Arc::new(|old, new| {
98        println!("💰 Value changed: {} → {}", old, new);
99    }))?;
100
101    simple_property.set(200)?;
102    simple_property.set(150)?;
103
104    // Cleanup observers
105    person.name.unsubscribe(name_observer_id)?;
106    person.age.unsubscribe(age_observer_id)?;
107    person.age.unsubscribe(milestone_observer_id)?;
108
109    println!("\n✅ Example completed successfully!");
110    Ok(())
111}

pub fn notify_observers_batch( &self, changes: Vec<(T, T)>, ) -> Result<(), PropertyError>

Notifies all observers with a batch of changes

This method allows you to trigger observer notifications for multiple value changes efficiently. Unlike individual set() calls, this method acquires the observer list once and then notifies all observers with each change in the batch.

§Performance Characteristics
  • Lock optimization: Acquires read lock only to snapshot observers, then releases it
  • Non-blocking: Other operations can proceed during observer notifications
  • Panic isolation: Individual observer panics don’t affect other observers
§Arguments
  • changes - A vector of tuples (old_value, new_value) to notify observers about
§Returns

Ok(()) if successful. Observer errors are logged but don’t cause the method to fail.

§Examples
use observable_property::ObservableProperty;
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};

let property = ObservableProperty::new(0);
let call_count = Arc::new(AtomicUsize::new(0));
let count_clone = call_count.clone();

property.subscribe(Arc::new(move |old, new| {
    count_clone.fetch_add(1, Ordering::SeqCst);
    println!("Change: {} -> {}", old, new);
}))?;

// Notify with multiple changes at once
property.notify_observers_batch(vec![
    (0, 10),
    (10, 20),
    (20, 30),
])?;

assert_eq!(call_count.load(Ordering::SeqCst), 3);
§Note

This method does NOT update the property’s actual value - it only triggers observer notifications. Use set() if you want to update the value and notify observers.
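
The snapshot-then-notify pattern described under Performance Characteristics can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: `Registry`, `Callback`, `notify_batch`, and `batch_demo` are hypothetical names, and the sketch assumes observers are stored as `Arc` closures behind an `RwLock`.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::{Arc, Mutex, RwLock};

// Hypothetical callback type for this sketch: receives (old, new).
type Callback<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

// Hypothetical stand-in for the observer list; not the crate's internals.
struct Registry<T> {
    observers: RwLock<Vec<Callback<T>>>,
}

impl<T> Registry<T> {
    fn new() -> Self {
        Registry { observers: RwLock::new(Vec::new()) }
    }

    fn subscribe(&self, callback: Callback<T>) {
        self.observers.write().unwrap().push(callback);
    }

    // Returns how many observer calls finished without panicking.
    fn notify_batch(&self, changes: &[(T, T)]) -> usize {
        // The read guard is a temporary, so the lock is released as soon
        // as the Arc handles have been cloned into the snapshot.
        let snapshot: Vec<Callback<T>> = self.observers.read().unwrap().clone();

        let mut completed = 0;
        for (old, new) in changes {
            for callback in &snapshot {
                // Contain a panicking observer so the remaining ones still run.
                if catch_unwind(AssertUnwindSafe(|| callback(old, new))).is_ok() {
                    completed += 1;
                }
            }
        }
        completed
    }
}

// Usage: one well-behaved observer and one that always panics.
fn batch_demo() -> (usize, Vec<(i32, i32)>) {
    let registry: Registry<i32> = Registry::new();
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);

    registry.subscribe(Arc::new(move |old: &i32, new: &i32| {
        sink.lock().unwrap().push((*old, *new));
    }));
    registry.subscribe(Arc::new(|_: &i32, _: &i32| {
        panic!("faulty observer");
    }));

    let completed = registry.notify_batch(&[(0, 10), (10, 20), (20, 30)]);
    let recorded = seen.lock().unwrap().clone();
    (completed, recorded)
}
```

In this sketch no stored value is touched at all, which mirrors the note above: a batch notification only replays `(old, new)` pairs to observers.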

pub fn subscribe_with_subscription( &self, observer: Observer<T>, ) -> Result<Subscription<T>, PropertyError>

Subscribes an observer and returns a RAII guard for automatic cleanup

This method is similar to subscribe() but returns a Subscription object that automatically removes the observer when it goes out of scope. This provides a more convenient and safer alternative to manual subscription management.

§Arguments
  • observer - A function wrapped in Arc that takes (&T, &T) parameters
§Returns

Ok(Subscription<T>) containing a RAII guard for the observer, or Err(PropertyError) if the lock is poisoned.

§Examples
§Basic RAII Subscription
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(0);

{
    let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
        println!("Value: {} -> {}", old, new);
    }))?;

    property.set(42)?; // Prints: "Value: 0 -> 42"
    property.set(100)?; // Prints: "Value: 42 -> 100"

    // Automatic cleanup when _subscription goes out of scope
}

property.set(200)?; // No output - subscription was cleaned up
§Comparison with Manual Management
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new("initial".to_string());

// Method 1: Manual subscription management (traditional approach)
let observer_id = property.subscribe(Arc::new(|old, new| {
    println!("Manual: {} -> {}", old, new);
}))?;

// Method 2: RAII subscription management (recommended)
let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
    println!("RAII: {} -> {}", old, new);
}))?;

// Both observers will be called
property.set("changed".to_string())?;
// Prints:
// "Manual: initial -> changed"
// "RAII: initial -> changed"

// Manual cleanup required for first observer
property.unsubscribe(observer_id)?;

// Second observer (_subscription) is automatically cleaned up when
// the variable goes out of scope - no manual intervention needed
§Error Handling with Early Returns
use observable_property::ObservableProperty;
use std::sync::Arc;

fn process_with_monitoring(property: &ObservableProperty<i32>) -> Result<(), observable_property::PropertyError> {
    let _monitoring = property.subscribe_with_subscription(Arc::new(|old, new| {
        println!("Processing: {} -> {}", old, new);
    }))?;

    property.set(1)?;
     
    if property.get()? > 0 {
        return Ok(()); // Subscription automatically cleaned up on early return
    }

    property.set(2)?;
    Ok(()) // Subscription automatically cleaned up on normal return
}

let property = ObservableProperty::new(0);
process_with_monitoring(&property)?; // Monitoring active only during function call
property.set(99)?; // No monitoring output - subscription was cleaned up
§Multi-threaded Subscription Management
use observable_property::ObservableProperty;
use std::sync::Arc;
use std::thread;

let property = Arc::new(ObservableProperty::new(0));
let property_clone = property.clone();

let handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
    let _subscription = property_clone.subscribe_with_subscription(Arc::new(|old, new| {
        println!("Thread observer: {} -> {}", old, new);
    }))?;

    property_clone.set(42)?; // Prints: "Thread observer: 0 -> 42"
     
    // Subscription automatically cleaned up when thread ends
    Ok(())
});

handle.join().unwrap()?;
property.set(100)?; // No output - thread subscription was cleaned up
§Use Cases

This method is particularly useful in scenarios such as:

  • Temporary observers that should be active only during a specific scope
  • Error-prone code where manual cleanup might be forgotten
  • Complex control flow where multiple exit points make manual cleanup difficult
  • Resource-constrained environments where observer leaks are problematic
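
To make the RAII idea concrete, here is a minimal sketch of how a drop-based guard can remove an observer on every exit path. It is an illustration under stated assumptions, not the crate's `Subscription` type: `Observers`, `Guard`, `subscribe_guarded`, `active_observers`, and `guarded_scope` are hypothetical names, and the registry is assumed to be a `HashMap` behind a `Mutex`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Callback = Arc<dyn Fn(&i32, &i32) + Send + Sync>;

// Hypothetical registry shared between the property and its guards.
#[derive(Default)]
struct Observers {
    next_id: usize,
    callbacks: HashMap<usize, Callback>,
}

// Hypothetical guard: removes its observer from the registry when dropped.
struct Guard {
    registry: Arc<Mutex<Observers>>,
    id: usize,
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Skip cleanup on a poisoned lock rather than panic inside drop.
        if let Ok(mut registry) = self.registry.lock() {
            registry.callbacks.remove(&self.id);
        }
    }
}

fn subscribe_guarded(registry: &Arc<Mutex<Observers>>, callback: Callback) -> Guard {
    let mut locked = registry.lock().unwrap();
    let id = locked.next_id;
    locked.next_id += 1;
    locked.callbacks.insert(id, callback);
    Guard { registry: Arc::clone(registry), id }
}

fn active_observers(registry: &Arc<Mutex<Observers>>) -> usize {
    registry.lock().unwrap().callbacks.len()
}

// Usage: the guard unsubscribes on both the early and the normal return.
fn guarded_scope(registry: &Arc<Mutex<Observers>>, bail_early: bool) -> usize {
    let _guard = subscribe_guarded(registry, Arc::new(|_: &i32, _: &i32| {}));
    if bail_early {
        return active_observers(registry); // the guard is still alive here
    }
    active_observers(registry)
}
```

Because cleanup lives in `Drop`, the caller never has to pair each exit point with an explicit unsubscribe call.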

pub fn subscribe_filtered_with_subscription<F>( &self, observer: Observer<T>, filter: F, ) -> Result<Subscription<T>, PropertyError>
where F: Fn(&T, &T) -> bool + Send + Sync + 'static,

Subscribes a filtered observer and returns a RAII guard for automatic cleanup

This method combines the functionality of subscribe_filtered() with the automatic cleanup benefits of subscribe_with_subscription(). The observer will only be called when the filter condition is satisfied, and it will be automatically unsubscribed when the returned Subscription goes out of scope.

§Arguments
  • observer - The observer function to call when the filter passes
  • filter - A predicate function that receives (old_value, new_value) and returns bool
§Returns

Ok(Subscription<T>) containing a RAII guard for the filtered observer, or Err(PropertyError) if the lock is poisoned.

§Examples
§Basic Filtered RAII Subscription
use observable_property::ObservableProperty;
use std::sync::Arc;

let counter = ObservableProperty::new(0);

{
    // Monitor only increases with automatic cleanup
    let _increase_monitor = counter.subscribe_filtered_with_subscription(
        Arc::new(|old, new| {
            println!("Counter increased: {} -> {}", old, new);
        }),
        |old, new| new > old
    )?;

    counter.set(5)?;  // Prints: "Counter increased: 0 -> 5"
    counter.set(3)?;  // No output (decrease)
    counter.set(7)?;  // Prints: "Counter increased: 3 -> 7"

    // Subscription automatically cleaned up when leaving scope
}

counter.set(10)?; // No output - subscription was cleaned up
§Multi-Condition Temperature Monitoring
use observable_property::ObservableProperty;
use std::sync::Arc;

let temperature = ObservableProperty::new(20.0_f64);

{
    // Create filtered subscription that only triggers for significant temperature increases
    let _heat_warning = temperature.subscribe_filtered_with_subscription(
        Arc::new(|old_temp, new_temp| {
            println!("🔥 Heat warning! Temperature rose from {:.1}°C to {:.1}°C",
                     old_temp, new_temp);
        }),
        |old, new| new > old && (new - old) > 5.0  // Only trigger for increases > 5°C
    )?;

    // Create another filtered subscription for cooling alerts
    let _cooling_alert = temperature.subscribe_filtered_with_subscription(
        Arc::new(|old_temp, new_temp| {
            println!("❄️ Cooling alert! Temperature dropped from {:.1}°C to {:.1}°C",
                     old_temp, new_temp);
        }),
        |old, new| new < old && (old - new) > 3.0  // Only trigger for decreases > 3°C
    )?;

    // Test the filters
    temperature.set(22.0)?; // No alerts (increase of only 2°C)
    temperature.set(28.0)?; // Heat warning triggered (increase of 6°C from 22°C)
    temperature.set(23.0)?; // Cooling alert triggered (decrease of 5°C)

    // Both subscriptions are automatically cleaned up when they go out of scope
}

temperature.set(35.0)?; // No alerts - subscriptions were cleaned up
§Conditional Monitoring with Complex Filters
use observable_property::ObservableProperty;
use std::sync::Arc;

let stock_price = ObservableProperty::new(100.0_f64);

{
    // Monitor significant price movements (> 5% change)
    let _volatility_alert = stock_price.subscribe_filtered_with_subscription(
        Arc::new(|old_price, new_price| {
            let change_percent = ((new_price - old_price) / old_price * 100.0).abs();
            println!("📈 Significant price movement: ${:.2} -> ${:.2} ({:.1}%)",
                    old_price, new_price, change_percent);
        }),
        |old, new| {
            let change_percent = ((new - old) / old * 100.0).abs();
            change_percent > 5.0  // Trigger on > 5% change
        }
    )?;

    stock_price.set(103.0)?; // No alert (3% change)
    stock_price.set(108.0)?; // No alert (about 4.85% change from 103, below the 5% threshold)
    stock_price.set(95.0)?;  // Alert triggered (12% decrease)

    // Subscription automatically cleaned up when leaving scope
}

stock_price.set(200.0)?; // No alert - monitoring ended
§Cross-Thread Filtered Monitoring
use observable_property::ObservableProperty;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

let network_latency = Arc::new(ObservableProperty::new(50)); // milliseconds
let latency_clone = network_latency.clone();

let monitor_handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
    // Monitor high latency in background thread with automatic cleanup
    let _high_latency_alert = latency_clone.subscribe_filtered_with_subscription(
        Arc::new(|old_ms, new_ms| {
            println!("⚠️ High latency detected: {}ms -> {}ms", old_ms, new_ms);
        }),
        |_, new| *new > 100  // Alert when latency exceeds 100ms
    )?;

    // Simulate monitoring for a short time
    thread::sleep(Duration::from_millis(10));
     
    // Subscription automatically cleaned up when thread ends
    Ok(())
});

// Simulate network conditions
network_latency.set(80)?;  // No alert (under threshold)
network_latency.set(150)?; // Alert fires only if the background subscription is registered at this moment (timing-dependent)

monitor_handle.join().unwrap()?;
network_latency.set(200)?; // No alert - background monitoring ended
§Use Cases

This method is ideal for:

  • Threshold-based monitoring with automatic cleanup
  • Temporary conditional observers in specific code blocks
  • Event-driven systems where observers should be active only during certain phases
  • Resource management scenarios where filtered observers have limited lifetimes
§Performance Notes

The filter function is evaluated for every property change, so it should be lightweight. Complex filtering logic should be optimized to avoid performance bottlenecks, especially in high-frequency update scenarios.
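
One way to picture why the filter runs on every change is to treat a filtered observer as an ordinary observer wrapped in a predicate check. The sketch below assumes exactly that; `filtered`, `Callback`, and `filter_demo` are hypothetical names, and the crate may implement filtering differently.

```rust
use std::sync::{Arc, Mutex};

type Callback<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

// Hypothetical helper: wraps an observer so it only runs when `filter` passes.
fn filtered<T, F>(observer: Callback<T>, filter: F) -> Callback<T>
where
    T: 'static,
    F: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
    Arc::new(move |old: &T, new: &T| {
        // The predicate is evaluated for every change, so keep it cheap.
        if filter(old, new) {
            observer(old, new);
        }
    })
}

// Usage: replay the changes 0 -> 5 -> 3 -> 7 through an "increases only" filter.
fn filter_demo() -> Vec<(i32, i32)> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&seen);
    let observer: Callback<i32> = Arc::new(move |old: &i32, new: &i32| {
        sink.lock().unwrap().push((*old, *new));
    });
    let only_increases = filtered(observer, |old: &i32, new: &i32| new > old);

    let mut current = 0;
    for next in [5, 3, 7] {
        only_increases(&current, &next);
        current = next;
    }
    let recorded = seen.lock().unwrap().clone();
    recorded
}
```

Here the decrease from 5 to 3 is still evaluated by the predicate; it simply never reaches the wrapped observer.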

pub fn with_config( initial_value: T, max_threads: usize, max_observers: usize, ) -> Self

Creates a new observable property with full configuration control

This constructor provides complete control over the property’s configuration, allowing you to customize both thread pool size and maximum observer count.

§Arguments
  • initial_value - The starting value for this property
  • max_threads - Maximum threads for async notifications (0 = use default)
  • max_observers - Maximum number of allowed observers (0 = use default)
§Examples
use observable_property::ObservableProperty;

// Create a property optimized for high-frequency updates with many observers
let property = ObservableProperty::with_config(0, 8, 50000);
assert_eq!(property.get().unwrap(), 0);
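
The "0 = use default" convention for both limits can be sketched as a small normalization step. Everything here is hypothetical: `resolve_limits`, `Limits`, and the two constants are illustration-only names, and the default values are placeholders rather than the crate's real defaults.

```rust
// Placeholder defaults for illustration only; the crate's real defaults may differ.
const DEFAULT_MAX_THREADS: usize = 4;
const DEFAULT_MAX_OBSERVERS: usize = 10_000;

#[derive(Debug, PartialEq)]
struct Limits {
    max_threads: usize,
    max_observers: usize,
}

// Hypothetical helper: treat 0 as "use the default" for each limit independently.
fn resolve_limits(max_threads: usize, max_observers: usize) -> Limits {
    Limits {
        max_threads: if max_threads == 0 { DEFAULT_MAX_THREADS } else { max_threads },
        max_observers: if max_observers == 0 { DEFAULT_MAX_OBSERVERS } else { max_observers },
    }
}
```

Under this reading, `with_config(value, 0, 50000)` would keep the default thread count while raising only the observer limit.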

pub fn observer_count(&self) -> usize

Returns the current number of active observers

This method is useful for debugging, monitoring, and testing to verify that observers are being properly managed and cleaned up.

§Returns

The number of currently subscribed observers, or 0 if the lock is poisoned.

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(42);
assert_eq!(property.observer_count(), 0);

let id1 = property.subscribe(Arc::new(|_, _| {}))?;
assert_eq!(property.observer_count(), 1);

let id2 = property.subscribe(Arc::new(|_, _| {}))?;
assert_eq!(property.observer_count(), 2);

property.unsubscribe(id1)?;
assert_eq!(property.observer_count(), 1);

pub fn try_get(&self) -> Option<T>

Gets the current value without Result wrapping

This convenience method returns an Option instead of a Result. It returns None if the lock is poisoned, which should not happen with graceful degradation.

§Returns

Some(T) containing the current value, or None if the value cannot be read.

§Examples
use observable_property::ObservableProperty;

let property = ObservableProperty::new(42);
assert_eq!(property.try_get(), Some(42));

pub fn modify<F>(&self, f: F) -> Result<(), PropertyError>
where F: FnOnce(&mut T),

Atomically modifies the property value using a closure

This method allows you to update the property based on its current value in a single atomic operation. The closure receives a mutable reference to the value and can modify it in place.

§Arguments
  • f - A closure that receives &mut T and modifies it
§Returns

Ok(()) if successful, or Err(PropertyError) if the lock is poisoned.

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

let counter = ObservableProperty::new(0);

counter.subscribe(Arc::new(|old, new| {
    println!("Counter: {} -> {}", old, new);
}))?;

// Increment counter atomically
counter.modify(|value| *value += 1)?;
assert_eq!(counter.get()?, 1);

// Double the counter atomically
counter.modify(|value| *value *= 2)?;
assert_eq!(counter.get()?, 2);
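
To show what "a single atomic operation" buys, the following sketch performs the read, the update, and the write under one write lock, then notifies observers after releasing it. It is a simplified model, not the crate's code: `MiniProperty`, `Callback`, and `concurrent_increments` are hypothetical names.

```rust
use std::sync::{Arc, Mutex, RwLock};
use std::thread;

type Callback<T> = Arc<dyn Fn(&T, &T) + Send + Sync>;

// Hypothetical miniature property; the crate's internals may differ.
struct MiniProperty<T> {
    value: RwLock<T>,
    observers: Mutex<Vec<Callback<T>>>,
}

impl<T: Clone> MiniProperty<T> {
    fn new(value: T) -> Self {
        MiniProperty { value: RwLock::new(value), observers: Mutex::new(Vec::new()) }
    }

    fn subscribe(&self, callback: Callback<T>) {
        self.observers.lock().unwrap().push(callback);
    }

    fn get(&self) -> T {
        let value: T = self.value.read().unwrap().clone();
        value
    }

    fn modify<F: FnOnce(&mut T)>(&self, f: F) {
        // Read, update, and write back under a single write lock, so no
        // other writer can slip in between the read and the write.
        let (old, new) = {
            let mut guard = self.value.write().unwrap();
            let old: T = guard.clone();
            f(&mut *guard);
            let new: T = guard.clone();
            (old, new)
        };
        // The value lock is released before observers run.
        let snapshot: Vec<Callback<T>> = self.observers.lock().unwrap().clone();
        for callback in &snapshot {
            callback(&old, &new);
        }
    }
}

// Usage: several threads increment the same property without losing updates.
fn concurrent_increments(threads: usize, per_thread: usize) -> usize {
    let property = Arc::new(MiniProperty::new(0_usize));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let property = Arc::clone(&property);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    property.modify(|value| *value += 1);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let total = property.get();
    total
}
```

A separate `get()` followed by `set()` would leave a window in which another thread could write, which is the situation `modify` is meant to avoid.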

impl<T: Clone + Default + Send + Sync + 'static> ObservableProperty<T>

pub fn get_or_default(&self) -> T

Gets the current value or returns the default if inaccessible

This convenience method is only available when T implements Default. It provides a fallback to T::default() if the value cannot be read.

§Examples
use observable_property::ObservableProperty;

let property = ObservableProperty::new(42);
assert_eq!(property.get_or_default(), 42);

// A property holding 0 also reads back as 0; the T::default() fallback applies only if the value cannot be read
let empty_property: ObservableProperty<i32> = ObservableProperty::new(0);
assert_eq!(empty_property.get_or_default(), 0);
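
The fallback path only matters when the read itself fails. The sketch below reproduces such a failure with a plain `std::sync::RwLock` by poisoning it on purpose, then shows an Option-style read and a default fallback side by side. The helper names `read_value`, `read_or_default`, and `poison` are hypothetical, and the sketch assumes lock poisoning is what makes a value unreadable.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical stand-in for a fallible read: None if the lock is poisoned.
fn read_value<T: Clone>(cell: &RwLock<T>) -> Option<T> {
    cell.read().ok().map(|guard| guard.clone())
}

// Falls back to T::default() when the value cannot be read.
fn read_or_default<T: Clone + Default>(cell: &RwLock<T>) -> T {
    read_value(cell).unwrap_or_default()
}

// Poisons the lock by panicking in another thread while holding the write guard.
fn poison<T: Send + Sync + 'static>(cell: &Arc<RwLock<T>>) {
    let cell = Arc::clone(cell);
    let _ = thread::spawn(move || {
        let _guard = cell.write().unwrap();
        panic!("simulated failure while holding the lock");
    })
    .join();
}
```

Before `poison` is called, both helpers return the stored value; afterwards `read_value` yields `None` and `read_or_default` yields `T::default()`.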

Trait Implementations§

impl<T: Clone> Clone for ObservableProperty<T>

fn clone(&self) -> Self

Creates a new reference to the same observable property

This creates a new ObservableProperty instance that shares the same underlying data with the original. Changes made through either instance will be visible to observers subscribed through both instances.

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

let property1 = ObservableProperty::new(42);
let property2 = property1.clone();

property2.subscribe(Arc::new(|old, new| {
    println!("Observer on property2 saw change: {} -> {}", old, new);
})).map_err(|e| {
    eprintln!("Failed to subscribe: {}", e);
    e
})?;

// This change through property1 will trigger the observer on property2
property1.set(100).map_err(|e| {
    eprintln!("Failed to set value: {}", e);
    e
})?;
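
The sharing behavior described above matches what an `Arc`-backed handle would give. As a hedged sketch (the crate's actual layout is not shown in this page), `SharedValue` below is a hypothetical type whose `clone` copies a pointer to the same state rather than duplicating the value.

```rust
use std::sync::{Arc, RwLock};

// Hypothetical handle type: cloning copies the Arc pointer, not the value.
struct SharedValue<T> {
    inner: Arc<RwLock<T>>,
}

impl<T> Clone for SharedValue<T> {
    fn clone(&self) -> Self {
        SharedValue { inner: Arc::clone(&self.inner) }
    }
}

impl<T: Clone> SharedValue<T> {
    fn new(value: T) -> Self {
        SharedValue { inner: Arc::new(RwLock::new(value)) }
    }

    fn get(&self) -> T {
        let value: T = self.inner.read().unwrap().clone();
        value
    }

    fn set(&self, value: T) {
        *self.inner.write().unwrap() = value;
    }

    // True when both handles point at the same underlying allocation.
    fn shares_state_with(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}
```

With such a design, a write through one handle is immediately visible through every clone, which is why a clone is better thought of as a second handle than as a copy.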
fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Stable since Rust 1.0.0.

impl<T: Clone + Debug + Send + Sync + 'static> Debug for ObservableProperty<T>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Debug implementation that shows the current value if accessible

Auto Trait Implementations§

impl<T> Freeze for ObservableProperty<T>

impl<T> RefUnwindSafe for ObservableProperty<T>

impl<T> Send for ObservableProperty<T>
where T: Send + Sync,

impl<T> Sync for ObservableProperty<T>
where T: Send + Sync,

impl<T> Unpin for ObservableProperty<T>

impl<T> UnwindSafe for ObservableProperty<T>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬 This is a nightly-only experimental API. (clone_to_uninit)

Performs copy-assignment from self to dest.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.