pub struct ObservableProperty<T> { /* private fields */ }
Expand description
A thread-safe observable property that notifies observers when its value changes.
This type wraps a value of type T and allows multiple observers to be notified
whenever the value is modified. All operations are thread-safe and can be called
from multiple threads concurrently.
§Type Requirements
The generic type T must implement:
- `Clone`: Required for returning values and passing them to observers
- `Send`: Required for transferring between threads
- `Sync`: Required for concurrent access from multiple threads
- `'static`: Required for observer callbacks that may outlive the original scope
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new("initial".to_string());
let observer_id = property.subscribe(Arc::new(|old, new| {
println!("Changed from '{}' to '{}'", old, new);
})).map_err(|e| {
eprintln!("Failed to subscribe: {}", e);
e
})?;
property.set("updated".to_string()).map_err(|e| {
eprintln!("Failed to set value: {}", e);
e
})?; // Prints: Changed from 'initial' to 'updated'
property.unsubscribe(observer_id).map_err(|e| {
eprintln!("Failed to unsubscribe: {}", e);
e
})?;
Implementations§
Source§impl<T: Clone + Send + Sync + 'static> ObservableProperty<T>
impl<T: Clone + Send + Sync + 'static> ObservableProperty<T>
Sourcepub fn new(initial_value: T) -> Self
pub fn new(initial_value: T) -> Self
Creates a new observable property with the given initial value
§Arguments
- `initial_value` - The starting value for this property
§Examples
use observable_property::ObservableProperty;
let property = ObservableProperty::new(42);
match property.get() {
Ok(value) => assert_eq!(value, 42),
Err(e) => eprintln!("Failed to get property value: {}", e),
}Examples found in repository?
16 fn new(name: String, age: i32) -> Self {
17 Self {
18 name: ObservableProperty::new(name),
19 age: ObservableProperty::new(age),
20 }
21 }
22
23 fn get_name(&self) -> Result<String, observable_property::PropertyError> {
24 self.name.get()
25 }
26
27 fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
28 self.name.set(new_name)
29 }
30
31 fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
32 self.age.get()
33 }
34
35 fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
36 self.age.set(new_age)
37 }
38
39 fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
40 let current_age = self.age.get()?;
41 self.age.set(current_age + 1)
42 }
43}
44
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46 println!("=== Observable Property Basic Example ===\n");
47
48 // Create a person
49 let person = Person::new("Alice".to_string(), 25);
50
51 // Subscribe to name changes
52 let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53 println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54 }))?;
55
56 // Subscribe to age changes
57 let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58 println!("🎂 Age changed: {} → {}", old_age, new_age);
59 }))?;
60
61 // Subscribe to only significant age changes (milestones)
62 let milestone_observer_id = person.age.subscribe_filtered(
63 Arc::new(|_old_age, new_age| {
64 println!("🎉 Milestone reached! {} is now a special age: {}",
65 if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66 }),
67 |old_age, new_age| {
68 // Notify on milestone ages
69 let milestones = [18, 21, 30, 40, 50, 65];
70 milestones.contains(new_age) ||
71 (milestones.contains(old_age) && !milestones.contains(new_age))
72 }
73 )?;
74
75 println!("Initial state:");
76 println!(" Name: {}", person.get_name()?);
77 println!(" Age: {}\n", person.get_age()?);
78
79 // Demonstrate property changes
80 println!("Making changes...\n");
81
82 // Change name
83 person.set_name("Alice Johnson".to_string())?;
84
85 // Age up a few times
86 person.celebrate_birthday()?;
87 person.celebrate_birthday()?;
88 person.celebrate_birthday()?;
89
90 // Change to milestone age
91 person.set_age(21)?;
92
93 // Direct property access
94 println!("\nDirect property access:");
95 let simple_property = ObservableProperty::new(100);
96
97 simple_property.subscribe(Arc::new(|old, new| {
98 println!("💰 Value changed: {} → {}", old, new);
99 }))?;
100
101 simple_property.set(200)?;
102 simple_property.set(150)?;
103
104 // Cleanup observers
105 person.name.unsubscribe(name_observer_id)?;
106 person.age.unsubscribe(age_observer_id)?;
107 person.age.unsubscribe(milestone_observer_id)?;
108
109 println!("\n✅ Example completed successfully!");
110 Ok(())
111}More examples
Sourcepub fn with_max_threads(initial_value: T, max_threads: usize) -> Self
pub fn with_max_threads(initial_value: T, max_threads: usize) -> Self
Creates a new observable property with a custom maximum thread count for async notifications
This constructor allows you to customize the maximum number of threads used for
asynchronous observer notifications via set_async(). This is useful for tuning
performance based on your specific use case and system constraints.
§Arguments
- `initial_value` - The starting value for this property
- `max_threads` - Maximum number of threads to use for async notifications. If 0 is provided, defaults to 4.
§Thread Pool Behavior
When set_async() is called, observers are divided into batches and each batch
runs in its own thread, up to the specified maximum. For example:
- With 100 observers and `max_threads = 4`: 4 threads with ~25 observers each
- With 10 observers and `max_threads = 8`: at most 8 threads, each handling 1–2 observers
- With 2 observers and `max_threads = 4`: 2 threads with 1 observer each
§Use Cases
§High-Throughput Systems
use observable_property::ObservableProperty;
// For systems with many CPU cores and CPU-bound observers
let property = ObservableProperty::with_max_threads(0, 8);
§Resource-Constrained Systems
use observable_property::ObservableProperty;
// For embedded systems or memory-constrained environments
let property = ObservableProperty::with_max_threads(42, 1);
§I/O-Heavy Observers
use observable_property::ObservableProperty;
// For observers that do network/database operations
let property = ObservableProperty::with_max_threads("data".to_string(), 16);
§Performance Considerations
- Higher values: Better parallelism but more thread overhead and memory usage
- Lower values: Less overhead but potentially slower async notifications
- Optimal range: Typically between 1 and 2x the number of CPU cores
- Zero value: Automatically uses the default value (4)
§Thread Safety
This setting only affects async notifications (set_async()). Synchronous
operations (set()) always execute observers sequentially regardless of this setting.
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
// Create property with custom thread pool size
let property = ObservableProperty::with_max_threads(42, 2);
// Subscribe observers as usual
let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
println!("Value changed: {} -> {}", old, new);
})).expect("Failed to create subscription");
// Async notifications will use at most 2 threads
property.set_async(100).expect("Failed to set value asynchronously");
Source pub fn get(&self) -> Result<T, PropertyError>
pub fn get(&self) -> Result<T, PropertyError>
Gets the current value of the property
This method acquires a read lock, which allows multiple concurrent readers but will block if a writer currently holds the lock.
§Returns
Ok(T) containing a clone of the current value, or Err(PropertyError)
if the lock is poisoned.
§Examples
use observable_property::ObservableProperty;
let property = ObservableProperty::new("hello".to_string());
match property.get() {
Ok(value) => assert_eq!(value, "hello"),
Err(e) => eprintln!("Failed to get property value: {}", e),
}Examples found in repository?
23 fn get_name(&self) -> Result<String, observable_property::PropertyError> {
24 self.name.get()
25 }
26
27 fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
28 self.name.set(new_name)
29 }
30
31 fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
32 self.age.get()
33 }
34
35 fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
36 self.age.set(new_age)
37 }
38
39 fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
40 let current_age = self.age.get()?;
41 self.age.set(current_age + 1)
42 }More examples
21 fn get_name(&self) -> Result<String, observable_property::PropertyError> {
22 self.name.get()
23 }
24
25 fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
26 self.name.set(new_name)
27 }
28
29 fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
30 self.age.get()
31 }
32
33 fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
34 self.age.set(new_age)
35 }
36
37 fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
38 let current_age = self.age.get()?;
39 self.age.set(current_age + 1)
40 }Sourcepub fn set(&self, new_value: T) -> Result<(), PropertyError>
pub fn set(&self, new_value: T) -> Result<(), PropertyError>
Sets the property to a new value and notifies all observers
This method will:
- Acquire a write lock (blocking other readers/writers)
- Update the value and capture a snapshot of observers
- Release the lock
- Notify all observers sequentially with the old and new values
Observer notifications are wrapped in panic recovery to prevent one misbehaving observer from affecting others.
§Arguments
- `new_value` - The new value to set
§Returns
Ok(()) if successful, or Err(PropertyError) if the lock is poisoned.
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new(10);
property.subscribe(Arc::new(|old, new| {
println!("Value changed from {} to {}", old, new);
})).map_err(|e| {
eprintln!("Failed to subscribe: {}", e);
e
})?;
property.set(20).map_err(|e| {
eprintln!("Failed to set property value: {}", e);
e
})?; // Triggers observer notificationExamples found in repository?
27 fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
28 self.name.set(new_name)
29 }
30
31 fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
32 self.age.get()
33 }
34
35 fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
36 self.age.set(new_age)
37 }
38
39 fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
40 let current_age = self.age.get()?;
41 self.age.set(current_age + 1)
42 }
43}
44
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46 println!("=== Observable Property Basic Example ===\n");
47
48 // Create a person
49 let person = Person::new("Alice".to_string(), 25);
50
51 // Subscribe to name changes
52 let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53 println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54 }))?;
55
56 // Subscribe to age changes
57 let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58 println!("🎂 Age changed: {} → {}", old_age, new_age);
59 }))?;
60
61 // Subscribe to only significant age changes (milestones)
62 let milestone_observer_id = person.age.subscribe_filtered(
63 Arc::new(|_old_age, new_age| {
64 println!("🎉 Milestone reached! {} is now a special age: {}",
65 if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66 }),
67 |old_age, new_age| {
68 // Notify on milestone ages
69 let milestones = [18, 21, 30, 40, 50, 65];
70 milestones.contains(new_age) ||
71 (milestones.contains(old_age) && !milestones.contains(new_age))
72 }
73 )?;
74
75 println!("Initial state:");
76 println!(" Name: {}", person.get_name()?);
77 println!(" Age: {}\n", person.get_age()?);
78
79 // Demonstrate property changes
80 println!("Making changes...\n");
81
82 // Change name
83 person.set_name("Alice Johnson".to_string())?;
84
85 // Age up a few times
86 person.celebrate_birthday()?;
87 person.celebrate_birthday()?;
88 person.celebrate_birthday()?;
89
90 // Change to milestone age
91 person.set_age(21)?;
92
93 // Direct property access
94 println!("\nDirect property access:");
95 let simple_property = ObservableProperty::new(100);
96
97 simple_property.subscribe(Arc::new(|old, new| {
98 println!("💰 Value changed: {} → {}", old, new);
99 }))?;
100
101 simple_property.set(200)?;
102 simple_property.set(150)?;
103
104 // Cleanup observers
105 person.name.unsubscribe(name_observer_id)?;
106 person.age.unsubscribe(age_observer_id)?;
107 person.age.unsubscribe(milestone_observer_id)?;
108
109 println!("\n✅ Example completed successfully!");
110 Ok(())
111}More examples
25 fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
26 self.name.set(new_name)
27 }
28
29 fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
30 self.age.get()
31 }
32
33 fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
34 self.age.set(new_age)
35 }
36
37 fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
38 let current_age = self.age.get()?;
39 self.age.set(current_age + 1)
40 }Sourcepub fn set_async(&self, new_value: T) -> Result<(), PropertyError>
pub fn set_async(&self, new_value: T) -> Result<(), PropertyError>
Sets the property to a new value and notifies observers asynchronously
This method is similar to set() but spawns observers in background threads
for non-blocking operation. This is useful when observers might perform
time-consuming operations.
Observers are batched into groups and each batch runs in its own thread to limit resource usage while still providing parallelism.
§Thread Management (Fire-and-Forget Pattern)
Important: This method uses a fire-and-forget pattern. Spawned threads are not joined and run independently in the background. This design is intentional for non-blocking behavior but has important implications:
§Characteristics:
- ✅ Non-blocking: Returns immediately without waiting for observers
- ✅ High performance: No synchronization overhead
- ⚠️ No completion guarantee: Thread may still be running when method returns
- ⚠️ No error propagation: Observer errors are logged but not returned
- ⚠️ Testing caveat: May need explicit delays to observe side effects
- ⚠️ Ordering caveat: Multiple rapid `set_async()` calls may result in observers receiving notifications out of order due to thread scheduling. Use `set()` if sequential ordering is critical.
§Use Cases:
- UI updates: Fire updates without blocking the main thread
- Logging: Asynchronous logging that doesn’t block operations
- Metrics: Non-critical telemetry that can be lost
- Notifications: Fire-and-forget alerts or messages
§When NOT to Use:
- Critical operations: Use `set()` if you need guarantees
- Transactional updates: Use `set()` for atomic operations
- Sequential dependencies: If the next operation depends on observer completion
§Testing Considerations:
use observable_property::ObservableProperty;
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use std::time::Duration;
let property = ObservableProperty::new(0);
let was_called = Arc::new(AtomicBool::new(false));
let flag = was_called.clone();
property.subscribe(Arc::new(move |_, _| {
flag.store(true, Ordering::SeqCst);
}))?;
property.set_async(42)?;
// ⚠️ Immediate check might fail - thread may not have run yet
// assert!(was_called.load(Ordering::SeqCst)); // May fail!
// ✅ Add a small delay to allow background thread to complete
std::thread::sleep(Duration::from_millis(10));
assert!(was_called.load(Ordering::SeqCst)); // Now reliable
§Arguments
- `new_value` - The new value to set
§Returns
Ok(()) if successful, or Err(PropertyError) if the lock is poisoned.
Note that this only indicates the property was updated successfully;
observer execution happens asynchronously and errors are not returned.
§Examples
§Basic Usage
use observable_property::ObservableProperty;
use std::sync::Arc;
use std::time::Duration;
let property = ObservableProperty::new(0);
property.subscribe(Arc::new(|old, new| {
// This observer does slow work but won't block the caller
std::thread::sleep(Duration::from_millis(100));
println!("Slow observer: {} -> {}", old, new);
})).map_err(|e| {
eprintln!("Failed to subscribe: {}", e);
e
})?;
// This returns immediately even though observer is slow
property.set_async(42).map_err(|e| {
eprintln!("Failed to set value asynchronously: {}", e);
e
})?;
// Continue working immediately - observer runs in background
println!("Main thread continues without waiting");
§Multiple Rapid Updates
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new(0);
property.subscribe(Arc::new(|old, new| {
// Expensive operation (e.g., database update, API call)
println!("Processing: {} -> {}", old, new);
}))?;
// All of these return immediately - observers run in parallel
property.set_async(1)?;
property.set_async(2)?;
property.set_async(3)?;
property.set_async(4)?;
property.set_async(5)?;
// All observer calls are now running in background threads
Source pub fn subscribe(
&self,
observer: Observer<T>,
) -> Result<ObserverId, PropertyError>
pub fn subscribe( &self, observer: Observer<T>, ) -> Result<ObserverId, PropertyError>
Subscribes an observer function to be called when the property changes
The observer function will be called with the old and new values whenever
the property is modified via set() or set_async().
§Arguments
- `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
§Returns
Ok(ObserverId) containing a unique identifier for this observer,
or Err(PropertyError::InvalidConfiguration) if the maximum observer limit is exceeded.
§Observer Limit
To prevent memory exhaustion, there is a maximum limit of observers per property
(currently set to 10,000). If you attempt to add more observers than this limit,
the subscription will fail with an InvalidConfiguration error.
This protection helps prevent:
- Memory leaks from forgotten unsubscriptions
- Unbounded memory growth in long-running applications
- Out-of-memory conditions in resource-constrained environments
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new(0);
let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
println!("Property changed from {} to {}", old_value, new_value);
})).map_err(|e| {
eprintln!("Failed to subscribe observer: {}", e);
e
})?;
// Later, unsubscribe using the returned ID
property.unsubscribe(observer_id).map_err(|e| {
eprintln!("Failed to unsubscribe observer: {}", e);
e
})?;Examples found in repository?
44fn main() -> Result<(), Box<dyn std::error::Error>> {
45 println!("=== Observable Property Multithreaded Person Example ===\n");
46
47 let person = Arc::new(Person::new("Bob".to_string(), 30));
48
49 // Subscribe to name changes
50 let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
51 println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
52 }))?;
53
54 // Subscribe to age changes
55 let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
56 println!("🎂 Age changed: {} → {}", old_age, new_age);
57 }))?;
58
59 // Spawn threads to update name and age concurrently
60 let person_clone1 = person.clone();
61 let handle1 = thread::spawn(move || {
62 for i in 0..3 {
63 let new_name = format!("Bob #{}", i + 1);
64 person_clone1.set_name(new_name).unwrap();
65 thread::sleep(Duration::from_millis(50));
66 }
67 });
68
69 let person_clone2 = person.clone();
70 let handle2 = thread::spawn(move || {
71 for _ in 0..3 {
72 person_clone2.celebrate_birthday().unwrap();
73 thread::sleep(Duration::from_millis(30));
74 }
75 });
76
77 handle1.join().unwrap();
78 handle2.join().unwrap();
79
80 println!("\nFinal state:");
81 println!(" Name: {}", person.get_name()?);
82 println!(" Age: {}", person.get_age()?);
83
84 // Cleanup observers
85 person.name.unsubscribe(name_observer_id)?;
86 person.age.unsubscribe(age_observer_id)?;
87
88 println!("\n✅ Multithreaded Person example completed successfully!");
89 Ok(())
90}More examples
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46 println!("=== Observable Property Basic Example ===\n");
47
48 // Create a person
49 let person = Person::new("Alice".to_string(), 25);
50
51 // Subscribe to name changes
52 let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53 println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54 }))?;
55
56 // Subscribe to age changes
57 let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58 println!("🎂 Age changed: {} → {}", old_age, new_age);
59 }))?;
60
61 // Subscribe to only significant age changes (milestones)
62 let milestone_observer_id = person.age.subscribe_filtered(
63 Arc::new(|_old_age, new_age| {
64 println!("🎉 Milestone reached! {} is now a special age: {}",
65 if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66 }),
67 |old_age, new_age| {
68 // Notify on milestone ages
69 let milestones = [18, 21, 30, 40, 50, 65];
70 milestones.contains(new_age) ||
71 (milestones.contains(old_age) && !milestones.contains(new_age))
72 }
73 )?;
74
75 println!("Initial state:");
76 println!(" Name: {}", person.get_name()?);
77 println!(" Age: {}\n", person.get_age()?);
78
79 // Demonstrate property changes
80 println!("Making changes...\n");
81
82 // Change name
83 person.set_name("Alice Johnson".to_string())?;
84
85 // Age up a few times
86 person.celebrate_birthday()?;
87 person.celebrate_birthday()?;
88 person.celebrate_birthday()?;
89
90 // Change to milestone age
91 person.set_age(21)?;
92
93 // Direct property access
94 println!("\nDirect property access:");
95 let simple_property = ObservableProperty::new(100);
96
97 simple_property.subscribe(Arc::new(|old, new| {
98 println!("💰 Value changed: {} → {}", old, new);
99 }))?;
100
101 simple_property.set(200)?;
102 simple_property.set(150)?;
103
104 // Cleanup observers
105 person.name.unsubscribe(name_observer_id)?;
106 person.age.unsubscribe(age_observer_id)?;
107 person.age.unsubscribe(milestone_observer_id)?;
108
109 println!("\n✅ Example completed successfully!");
110 Ok(())
111}Sourcepub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError>
pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError>
Removes an observer identified by its ID
§Arguments
- `id` - The observer ID returned by `subscribe()`
§Returns
Ok(bool) where true means the observer was found and removed,
false means no observer with that ID existed.
Returns Err(PropertyError) if the lock is poisoned.
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new(0);
let id = property.subscribe(Arc::new(|_, _| {})).map_err(|e| {
eprintln!("Failed to subscribe: {}", e);
e
})?;
let was_removed = property.unsubscribe(id).map_err(|e| {
eprintln!("Failed to unsubscribe: {}", e);
e
})?;
assert!(was_removed); // Observer existed and was removed
let was_removed_again = property.unsubscribe(id).map_err(|e| {
eprintln!("Failed to unsubscribe again: {}", e);
e
})?;
assert!(!was_removed_again); // Observer no longer existsExamples found in repository?
44fn main() -> Result<(), Box<dyn std::error::Error>> {
45 println!("=== Observable Property Multithreaded Person Example ===\n");
46
47 let person = Arc::new(Person::new("Bob".to_string(), 30));
48
49 // Subscribe to name changes
50 let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
51 println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
52 }))?;
53
54 // Subscribe to age changes
55 let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
56 println!("🎂 Age changed: {} → {}", old_age, new_age);
57 }))?;
58
59 // Spawn threads to update name and age concurrently
60 let person_clone1 = person.clone();
61 let handle1 = thread::spawn(move || {
62 for i in 0..3 {
63 let new_name = format!("Bob #{}", i + 1);
64 person_clone1.set_name(new_name).unwrap();
65 thread::sleep(Duration::from_millis(50));
66 }
67 });
68
69 let person_clone2 = person.clone();
70 let handle2 = thread::spawn(move || {
71 for _ in 0..3 {
72 person_clone2.celebrate_birthday().unwrap();
73 thread::sleep(Duration::from_millis(30));
74 }
75 });
76
77 handle1.join().unwrap();
78 handle2.join().unwrap();
79
80 println!("\nFinal state:");
81 println!(" Name: {}", person.get_name()?);
82 println!(" Age: {}", person.get_age()?);
83
84 // Cleanup observers
85 person.name.unsubscribe(name_observer_id)?;
86 person.age.unsubscribe(age_observer_id)?;
87
88 println!("\n✅ Multithreaded Person example completed successfully!");
89 Ok(())
90}More examples
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46 println!("=== Observable Property Basic Example ===\n");
47
48 // Create a person
49 let person = Person::new("Alice".to_string(), 25);
50
51 // Subscribe to name changes
52 let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53 println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54 }))?;
55
56 // Subscribe to age changes
57 let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58 println!("🎂 Age changed: {} → {}", old_age, new_age);
59 }))?;
60
61 // Subscribe to only significant age changes (milestones)
62 let milestone_observer_id = person.age.subscribe_filtered(
63 Arc::new(|_old_age, new_age| {
64 println!("🎉 Milestone reached! {} is now a special age: {}",
65 if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66 }),
67 |old_age, new_age| {
68 // Notify on milestone ages
69 let milestones = [18, 21, 30, 40, 50, 65];
70 milestones.contains(new_age) ||
71 (milestones.contains(old_age) && !milestones.contains(new_age))
72 }
73 )?;
74
75 println!("Initial state:");
76 println!(" Name: {}", person.get_name()?);
77 println!(" Age: {}\n", person.get_age()?);
78
79 // Demonstrate property changes
80 println!("Making changes...\n");
81
82 // Change name
83 person.set_name("Alice Johnson".to_string())?;
84
85 // Age up a few times
86 person.celebrate_birthday()?;
87 person.celebrate_birthday()?;
88 person.celebrate_birthday()?;
89
90 // Change to milestone age
91 person.set_age(21)?;
92
93 // Direct property access
94 println!("\nDirect property access:");
95 let simple_property = ObservableProperty::new(100);
96
97 simple_property.subscribe(Arc::new(|old, new| {
98 println!("💰 Value changed: {} → {}", old, new);
99 }))?;
100
101 simple_property.set(200)?;
102 simple_property.set(150)?;
103
104 // Cleanup observers
105 person.name.unsubscribe(name_observer_id)?;
106 person.age.unsubscribe(age_observer_id)?;
107 person.age.unsubscribe(milestone_observer_id)?;
108
109 println!("\n✅ Example completed successfully!");
110 Ok(())
111}Sourcepub fn subscribe_filtered<F>(
&self,
observer: Observer<T>,
filter: F,
) -> Result<ObserverId, PropertyError>
pub fn subscribe_filtered<F>( &self, observer: Observer<T>, filter: F, ) -> Result<ObserverId, PropertyError>
Subscribes an observer that only gets called when a filter condition is met
This is useful for observing only specific types of changes, such as when a value increases or crosses a threshold.
§Arguments
- `observer` - The observer function to call when the filter passes
- `filter` - A predicate function that receives `(old_value, new_value)` and returns `bool`
§Returns
Ok(ObserverId) for the filtered observer, or Err(PropertyError) if the lock is poisoned.
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new(0);
// Only notify when value increases
let id = property.subscribe_filtered(
Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
|old, new| new > old
).map_err(|e| {
eprintln!("Failed to subscribe filtered observer: {}", e);
e
})?;
property.set(10).map_err(|e| {
eprintln!("Failed to set value: {}", e);
e
})?; // Triggers observer (0 -> 10)
property.set(5).map_err(|e| {
eprintln!("Failed to set value: {}", e);
e
})?; // Does NOT trigger observer (10 -> 5)
property.set(15).map_err(|e| {
eprintln!("Failed to set value: {}", e);
e
})?; // Triggers observer (5 -> 15)Examples found in repository?
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46 println!("=== Observable Property Basic Example ===\n");
47
48 // Create a person
49 let person = Person::new("Alice".to_string(), 25);
50
51 // Subscribe to name changes
52 let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53 println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54 }))?;
55
56 // Subscribe to age changes
57 let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58 println!("🎂 Age changed: {} → {}", old_age, new_age);
59 }))?;
60
61 // Subscribe to only significant age changes (milestones)
62 let milestone_observer_id = person.age.subscribe_filtered(
63 Arc::new(|_old_age, new_age| {
64 println!("🎉 Milestone reached! {} is now a special age: {}",
65 if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66 }),
67 |old_age, new_age| {
68 // Notify on milestone ages
69 let milestones = [18, 21, 30, 40, 50, 65];
70 milestones.contains(new_age) ||
71 (milestones.contains(old_age) && !milestones.contains(new_age))
72 }
73 )?;
74
75 println!("Initial state:");
76 println!(" Name: {}", person.get_name()?);
77 println!(" Age: {}\n", person.get_age()?);
78
79 // Demonstrate property changes
80 println!("Making changes...\n");
81
82 // Change name
83 person.set_name("Alice Johnson".to_string())?;
84
85 // Age up a few times
86 person.celebrate_birthday()?;
87 person.celebrate_birthday()?;
88 person.celebrate_birthday()?;
89
90 // Change to milestone age
91 person.set_age(21)?;
92
93 // Direct property access
94 println!("\nDirect property access:");
95 let simple_property = ObservableProperty::new(100);
96
97 simple_property.subscribe(Arc::new(|old, new| {
98 println!("💰 Value changed: {} → {}", old, new);
99 }))?;
100
101 simple_property.set(200)?;
102 simple_property.set(150)?;
103
104 // Cleanup observers
105 person.name.unsubscribe(name_observer_id)?;
106 person.age.unsubscribe(age_observer_id)?;
107 person.age.unsubscribe(milestone_observer_id)?;
108
109 println!("\n✅ Example completed successfully!");
110 Ok(())
111}Sourcepub fn notify_observers_batch(
&self,
changes: Vec<(T, T)>,
) -> Result<(), PropertyError>
pub fn notify_observers_batch( &self, changes: Vec<(T, T)>, ) -> Result<(), PropertyError>
Notifies all observers with a batch of changes
This method allows you to trigger observer notifications for multiple
value changes efficiently. Unlike individual set() calls, this method
acquires the observer list once and then notifies all observers with each
change in the batch.
§Performance Characteristics
- Lock optimization: Acquires read lock only to snapshot observers, then releases it
- Non-blocking: Other operations can proceed during observer notifications
- Panic isolation: Individual observer panics don’t affect other observers
§Arguments
- `changes` - A vector of tuples `(old_value, new_value)` to notify observers about
§Returns
Ok(()) if successful. Observer errors are logged but don’t cause the method to fail.
§Examples
use observable_property::ObservableProperty;
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
let property = ObservableProperty::new(0);
let call_count = Arc::new(AtomicUsize::new(0));
let count_clone = call_count.clone();
property.subscribe(Arc::new(move |old, new| {
count_clone.fetch_add(1, Ordering::SeqCst);
println!("Change: {} -> {}", old, new);
}))?;
// Notify with multiple changes at once
property.notify_observers_batch(vec![
(0, 10),
(10, 20),
(20, 30),
])?;
assert_eq!(call_count.load(Ordering::SeqCst), 3);
§Note
This method does NOT update the property’s actual value - it only triggers
observer notifications. Use set() if you want to update the value and
notify observers.
Sourcepub fn subscribe_with_subscription(
&self,
observer: Observer<T>,
) -> Result<Subscription<T>, PropertyError>
pub fn subscribe_with_subscription( &self, observer: Observer<T>, ) -> Result<Subscription<T>, PropertyError>
Subscribes an observer and returns a RAII guard for automatic cleanup
This method is similar to subscribe() but returns a Subscription object
that automatically removes the observer when it goes out of scope. This
provides a more convenient and safer alternative to manual subscription
management.
§Arguments
observer - A function wrapped in Arc that takes (&T, &T) parameters
§Returns
Ok(Subscription<T>) containing a RAII guard for the observer,
or Err(PropertyError) if the lock is poisoned.
§Examples
§Basic RAII Subscription
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new(0);
{
let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
println!("Value: {} -> {}", old, new);
}))?;
property.set(42)?; // Prints: "Value: 0 -> 42"
property.set(100)?; // Prints: "Value: 42 -> 100"
// Automatic cleanup when _subscription goes out of scope
}
property.set(200)?; // No output - subscription was cleaned up
§Comparison with Manual Management
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new("initial".to_string());
// Method 1: Manual subscription management (traditional approach)
let observer_id = property.subscribe(Arc::new(|old, new| {
println!("Manual: {} -> {}", old, new);
}))?;
// Method 2: RAII subscription management (recommended)
let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
println!("RAII: {} -> {}", old, new);
}))?;
// Both observers will be called
property.set("changed".to_string())?;
// Prints:
// "Manual: initial -> changed"
// "RAII: initial -> changed"
// Manual cleanup required for first observer
property.unsubscribe(observer_id)?;
// Second observer (_subscription) is automatically cleaned up when
// the variable goes out of scope - no manual intervention needed
§Error Handling with Early Returns
use observable_property::ObservableProperty;
use std::sync::Arc;
fn process_with_monitoring(property: &ObservableProperty<i32>) -> Result<(), observable_property::PropertyError> {
let _monitoring = property.subscribe_with_subscription(Arc::new(|old, new| {
println!("Processing: {} -> {}", old, new);
}))?;
property.set(1)?;
if property.get()? > 0 {
return Ok(()); // Subscription automatically cleaned up on early return
}
property.set(2)?;
Ok(()) // Subscription automatically cleaned up on normal return
}
let property = ObservableProperty::new(0);
process_with_monitoring(&property)?; // Monitoring active only during function call
property.set(99)?; // No monitoring output - subscription was cleaned up
§Multi-threaded Subscription Management
use observable_property::ObservableProperty;
use std::sync::Arc;
use std::thread;
let property = Arc::new(ObservableProperty::new(0));
let property_clone = property.clone();
let handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
let _subscription = property_clone.subscribe_with_subscription(Arc::new(|old, new| {
println!("Thread observer: {} -> {}", old, new);
}))?;
property_clone.set(42)?; // Prints: "Thread observer: 0 -> 42"
// Subscription automatically cleaned up when thread ends
Ok(())
});
handle.join().unwrap()?;
property.set(100)?; // No output - thread subscription was cleaned up
§Use Cases
This method is particularly useful in scenarios such as:
- Temporary observers that should be active only during a specific scope
- Error-prone code where manual cleanup might be forgotten
- Complex control flow where multiple exit points make manual cleanup difficult
- Resource-constrained environments where observer leaks are problematic
Sourcepub fn subscribe_filtered_with_subscription<F>(
&self,
observer: Observer<T>,
filter: F,
) -> Result<Subscription<T>, PropertyError>
pub fn subscribe_filtered_with_subscription<F>( &self, observer: Observer<T>, filter: F, ) -> Result<Subscription<T>, PropertyError>
Subscribes a filtered observer and returns a RAII guard for automatic cleanup
This method combines the functionality of subscribe_filtered() with the automatic
cleanup benefits of subscribe_with_subscription(). The observer will only be
called when the filter condition is satisfied, and it will be automatically
unsubscribed when the returned Subscription goes out of scope.
§Arguments
observer - The observer function to call when the filter passes
filter - A predicate function that receives (old_value, new_value) and returns bool
§Returns
Ok(Subscription<T>) containing a RAII guard for the filtered observer,
or Err(PropertyError) if the lock is poisoned.
§Examples
§Basic Filtered RAII Subscription
use observable_property::ObservableProperty;
use std::sync::Arc;
let counter = ObservableProperty::new(0);
{
// Monitor only increases with automatic cleanup
let _increase_monitor = counter.subscribe_filtered_with_subscription(
Arc::new(|old, new| {
println!("Counter increased: {} -> {}", old, new);
}),
|old, new| new > old
)?;
counter.set(5)?; // Prints: "Counter increased: 0 -> 5"
counter.set(3)?; // No output (decrease)
counter.set(7)?; // Prints: "Counter increased: 3 -> 7"
// Subscription automatically cleaned up when leaving scope
}
counter.set(10)?; // No output - subscription was cleaned up
§Multi-Condition Temperature Monitoring
use observable_property::ObservableProperty;
use std::sync::Arc;
let temperature = ObservableProperty::new(20.0_f64);
{
// Create filtered subscription that only triggers for significant temperature increases
let _heat_warning = temperature.subscribe_filtered_with_subscription(
Arc::new(|old_temp, new_temp| {
println!("🔥 Heat warning! Temperature rose from {:.1}°C to {:.1}°C",
old_temp, new_temp);
}),
|old, new| new > old && (new - old) > 5.0 // Only trigger for increases > 5°C
)?;
// Create another filtered subscription for cooling alerts
let _cooling_alert = temperature.subscribe_filtered_with_subscription(
Arc::new(|old_temp, new_temp| {
println!("❄️ Cooling alert! Temperature dropped from {:.1}°C to {:.1}°C",
old_temp, new_temp);
}),
|old, new| new < old && (old - new) > 3.0 // Only trigger for decreases > 3°C
)?;
// Test the filters
temperature.set(22.0)?; // No alerts (increase of only 2°C)
temperature.set(28.0)?; // Heat warning triggered (increase of 6°C from 22°C)
temperature.set(23.0)?; // Cooling alert triggered (decrease of 5°C)
// Both subscriptions are automatically cleaned up when they go out of scope
}
temperature.set(35.0)?; // No alerts - subscriptions were cleaned up
§Conditional Monitoring with Complex Filters
use observable_property::ObservableProperty;
use std::sync::Arc;
let stock_price = ObservableProperty::new(100.0_f64);
{
// Monitor significant price movements (> 5% change)
let _volatility_alert = stock_price.subscribe_filtered_with_subscription(
Arc::new(|old_price, new_price| {
let change_percent = ((new_price - old_price) / old_price * 100.0).abs();
println!("📈 Significant price movement: ${:.2} -> ${:.2} ({:.1}%)",
old_price, new_price, change_percent);
}),
|old, new| {
let change_percent = ((new - old) / old * 100.0).abs();
change_percent > 5.0 // Trigger on > 5% change
}
)?;
stock_price.set(103.0)?; // No alert (3% change)
stock_price.set(108.0)?; // No alert (≈4.85% change from 103, below the 5% threshold)
stock_price.set(95.0)?; // Alert triggered (12% decrease)
// Subscription automatically cleaned up when leaving scope
}
stock_price.set(200.0)?; // No alert - monitoring ended
§Cross-Thread Filtered Monitoring
use observable_property::ObservableProperty;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
let network_latency = Arc::new(ObservableProperty::new(50)); // milliseconds
let latency_clone = network_latency.clone();
let monitor_handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
// Monitor high latency in background thread with automatic cleanup
let _high_latency_alert = latency_clone.subscribe_filtered_with_subscription(
Arc::new(|old_ms, new_ms| {
println!("⚠️ High latency detected: {}ms -> {}ms", old_ms, new_ms);
}),
|_, new| *new > 100 // Alert when latency exceeds 100ms
)?;
// Simulate monitoring for a short time
thread::sleep(Duration::from_millis(10));
// Subscription automatically cleaned up when thread ends
Ok(())
});
// Simulate network conditions
network_latency.set(80)?; // No alert (under threshold)
network_latency.set(150)?; // Alert fires only if the background thread's subscription is registered and still alive at this moment (timing-dependent)
monitor_handle.join().unwrap()?;
network_latency.set(200)?; // No alert - background monitoring ended
§Use Cases
This method is ideal for:
- Threshold-based monitoring with automatic cleanup
- Temporary conditional observers in specific code blocks
- Event-driven systems where observers should be active only during certain phases
- Resource management scenarios where filtered observers have limited lifetimes
§Performance Notes
The filter function is evaluated for every property change, so it should be lightweight. Complex filtering logic should be optimized to avoid performance bottlenecks, especially in high-frequency update scenarios.
Sourcepub fn with_config(
initial_value: T,
max_threads: usize,
max_observers: usize,
) -> Self
pub fn with_config( initial_value: T, max_threads: usize, max_observers: usize, ) -> Self
Creates a new observable property with full configuration control
This constructor provides complete control over the property’s configuration, allowing you to customize both thread pool size and maximum observer count.
§Arguments
initial_value - The starting value for this property
max_threads - Maximum threads for async notifications (0 = use default)
max_observers - Maximum number of allowed observers (0 = use default)
§Examples
use observable_property::ObservableProperty;
// Create a property optimized for high-frequency updates with many observers
let property = ObservableProperty::with_config(0, 8, 50000);
assert_eq!(property.get().unwrap(), 0);
Source
pub fn observer_count(&self) -> usize
pub fn observer_count(&self) -> usize
Returns the current number of active observers
This method is useful for debugging, monitoring, and testing to verify that observers are being properly managed and cleaned up.
§Returns
The number of currently subscribed observers, or 0 if the lock is poisoned.
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new(42);
assert_eq!(property.observer_count(), 0);
let id1 = property.subscribe(Arc::new(|_, _| {}))?;
assert_eq!(property.observer_count(), 1);
let id2 = property.subscribe(Arc::new(|_, _| {}))?;
assert_eq!(property.observer_count(), 2);
property.unsubscribe(id1)?;
assert_eq!(property.observer_count(), 1);
Source
pub fn try_get(&self) -> Option<T>
pub fn try_get(&self) -> Option<T>
Gets the current value without Result wrapping
This is a convenience method that returns None if the lock is poisoned
(which shouldn’t happen with graceful degradation) instead of a Result.
§Returns
Some(T) containing the current value, or None if somehow inaccessible.
§Examples
use observable_property::ObservableProperty;
let property = ObservableProperty::new(42);
assert_eq!(property.try_get(), Some(42));
Source
pub fn modify<F>(&self, f: F) -> Result<(), PropertyError>
pub fn modify<F>(&self, f: F) -> Result<(), PropertyError>
Atomically modifies the property value using a closure
This method allows you to update the property based on its current value in a single atomic operation. The closure receives a mutable reference to the value and can modify it in place.
§Arguments
f - A closure that receives &mut T and modifies it
§Returns
Ok(()) if successful, or Err(PropertyError) if the lock is poisoned.
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let counter = ObservableProperty::new(0);
counter.subscribe(Arc::new(|old, new| {
println!("Counter: {} -> {}", old, new);
}))?;
// Increment counter atomically
counter.modify(|value| *value += 1)?;
assert_eq!(counter.get()?, 1);
// Double the counter atomically
counter.modify(|value| *value *= 2)?;
assert_eq!(counter.get()?, 2);
Source
§impl<T: Clone + Default + Send + Sync + 'static> ObservableProperty<T>
impl<T: Clone + Default + Send + Sync + 'static> ObservableProperty<T>
Sourcepub fn get_or_default(&self) -> T
pub fn get_or_default(&self) -> T
Gets the current value or returns the default if inaccessible
This convenience method is only available when T implements Default.
It provides a fallback to T::default() if the value cannot be read.
§Examples
use observable_property::ObservableProperty;
let property = ObservableProperty::new(42);
assert_eq!(property.get_or_default(), 42);
// If the value could not be read, T::default() would be returned instead (0 for i32)
let empty_property: ObservableProperty<i32> = ObservableProperty::new(0);
assert_eq!(empty_property.get_or_default(), 0);
Trait Implementations§
Source§impl<T: Clone> Clone for ObservableProperty<T>
impl<T: Clone> Clone for ObservableProperty<T>
Source§fn clone(&self) -> Self
fn clone(&self) -> Self
Creates a new reference to the same observable property
This creates a new ObservableProperty instance that shares the same
underlying data with the original. Changes made through either instance
will be visible to observers subscribed through either instance.
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let property1 = ObservableProperty::new(42);
let property2 = property1.clone();
property2.subscribe(Arc::new(|old, new| {
println!("Observer on property2 saw change: {} -> {}", old, new);
})).map_err(|e| {
eprintln!("Failed to subscribe: {}", e);
e
})?;
// This change through property1 will trigger the observer on property2
property1.set(100).map_err(|e| {
eprintln!("Failed to set value: {}", e);
e
})?;
1.0.0 · §fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more
Auto Trait Implementations§
impl<T> Freeze for ObservableProperty<T>
impl<T> RefUnwindSafe for ObservableProperty<T>
impl<T> Send for ObservableProperty<T>
impl<T> Sync for ObservableProperty<T>
impl<T> Unpin for ObservableProperty<T>
impl<T> UnwindSafe for ObservableProperty<T>
Blanket Implementations§
§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> CloneToUninit for T where
T: Clone,
impl<T> CloneToUninit for T where
T: Clone,
§unsafe fn clone_to_uninit(&self, dest: *mut u8)
unsafe fn clone_to_uninit(&self, dest: *mut u8)
clone_to_uninit)