Expand description
§Observable Property
A thread-safe observable property implementation for Rust that allows you to observe changes to values across multiple threads.
§Features
- Thread-safe: Uses `Arc<RwLock<>>` for safe concurrent access
- Observer pattern: Subscribe to property changes with callbacks
- RAII subscriptions: Automatic cleanup with subscription guards (no manual unsubscribe needed)
- Filtered observers: Only notify when specific conditions are met
- Async notifications: Non-blocking observer notifications with background threads
- Panic isolation: Observer panics don’t crash the system
- Graceful lock recovery: Continues operation even after lock poisoning from panics
- Memory protection: Observer limit (10,000) prevents memory exhaustion
- Type-safe: Generic implementation works with any `Clone + Send + Sync` type
§Quick Start
use observable_property::ObservableProperty;
use std::sync::Arc;
// Create an observable property
let property = ObservableProperty::new(42);
// Subscribe to changes
let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
println!("Value changed from {} to {}", old_value, new_value);
})).map_err(|e| {
eprintln!("Failed to subscribe: {}", e);
e
})?;
// Change the value (triggers observer)
property.set(100).map_err(|e| {
eprintln!("Failed to set value: {}", e);
e
})?;
// Unsubscribe when done
property.unsubscribe(observer_id).map_err(|e| {
eprintln!("Failed to unsubscribe: {}", e);
e
})?;
§Multi-threading Example
use observable_property::ObservableProperty;
use std::sync::Arc;
use std::thread;
let property = Arc::new(ObservableProperty::new(0));
let property_clone = property.clone();
// Subscribe from one thread
property.subscribe(Arc::new(|old, new| {
println!("Value changed: {} -> {}", old, new);
})).map_err(|e| {
eprintln!("Failed to subscribe: {}", e);
e
})?;
// Modify from another thread
thread::spawn(move || {
if let Err(e) = property_clone.set(42) {
eprintln!("Failed to set value: {}", e);
}
}).join().expect("Thread panicked");
§RAII Subscriptions (Recommended)
For automatic cleanup without manual unsubscribe calls, use RAII subscriptions:
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new(0);
{
// Create RAII subscription - automatically cleaned up when dropped
let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
println!("Value changed: {} -> {}", old, new);
}))?;
property.set(42)?; // Prints: "Value changed: 0 -> 42"
// Subscription automatically unsubscribes when leaving this scope
}
// No observer active anymore
property.set(100)?; // No output
§Filtered RAII Subscriptions
Combine filtering with automatic cleanup for conditional monitoring:
use observable_property::ObservableProperty;
use std::sync::Arc;
let temperature = ObservableProperty::new(20.0);
{
// Monitor only significant temperature increases with automatic cleanup
let _heat_warning = temperature.subscribe_filtered_with_subscription(
Arc::new(|old, new| {
println!("🔥 Heat warning! {:.1}°C -> {:.1}°C", old, new);
}),
|old, new| new > old && (new - old) > 5.0
)?;
temperature.set(22.0)?; // No warning (only 2°C increase)
temperature.set(28.0)?; // Prints warning (6°C increase from 22°C)
// Subscription automatically cleaned up here
}
temperature.set(35.0)?; // No warning (subscription was cleaned up)
§Subscription Management Comparison
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new(0);
let observer = Arc::new(|old: &i32, new: &i32| {
println!("Value: {} -> {}", old, new);
});
// Method 1: Manual subscription management
let observer_id = property.subscribe(observer.clone())?;
property.set(42)?;
property.unsubscribe(observer_id)?; // Manual cleanup required
// Method 2: RAII subscription management (recommended)
{
let _subscription = property.subscribe_with_subscription(observer)?;
property.set(100)?;
// Automatic cleanup when _subscription goes out of scope
}
§Advanced RAII Patterns
Comprehensive example showing various RAII subscription patterns:
use observable_property::ObservableProperty;
use std::sync::Arc;
// System monitoring example
let cpu_usage = ObservableProperty::new(25.0f64); // percentage
let memory_usage = ObservableProperty::new(1024); // MB
let active_connections = ObservableProperty::new(0u32);
// Conditional monitoring based on system state
let high_load_monitoring = cpu_usage.get()? > 50.0;
if high_load_monitoring {
// Critical system monitoring - active only during high load
let _cpu_critical = cpu_usage.subscribe_filtered_with_subscription(
Arc::new(|old, new| {
println!("🚨 Critical CPU usage: {:.1}% -> {:.1}%", old, new);
}),
|_, new| *new > 80.0
)?;
let _memory_warning = memory_usage.subscribe_filtered_with_subscription(
Arc::new(|old, new| {
println!("⚠️ High memory usage: {}MB -> {}MB", old, new);
}),
|_, new| *new > 8192 // > 8GB
)?;
// Simulate system load changes
cpu_usage.set(85.0)?; // Would trigger critical alert
memory_usage.set(9216)?; // Would trigger memory warning
// All monitoring automatically stops when exiting this block
}
// Connection monitoring with scoped lifetime
{
let _connection_monitor = active_connections.subscribe_with_subscription(
Arc::new(|old, new| {
if new > old {
println!("📈 New connections: {} -> {}", old, new);
} else if new < old {
println!("📉 Connections closed: {} -> {}", old, new);
}
})
)?;
active_connections.set(5)?; // Prints: "📈 New connections: 0 -> 5"
active_connections.set(3)?; // Prints: "📉 Connections closed: 5 -> 3"
active_connections.set(8)?; // Prints: "📈 New connections: 3 -> 8"
// Connection monitoring automatically stops here
}
// No monitoring active anymore
cpu_usage.set(95.0)?; // No output
memory_usage.set(10240)?; // No output
active_connections.set(15)?; // No output
println!("All monitoring automatically cleaned up!");
Structs§
- ObservableProperty - A thread-safe observable property that notifies observers when its value changes
- PropertyEvent - Represents a single change event in the property’s history
- PropertyMetrics - Performance metrics for an observable property
- Subscription - A RAII guard for an observer subscription that automatically unsubscribes when dropped
Enums§
- PropertyError - Errors that can occur when working with ObservableProperty
Traits§
- PropertyPersistence - Trait for implementing property value persistence
Functions§
- computed - Creates a computed property that automatically recomputes when any dependency changes
Type Aliases§
- Observer - Function type for observers that get called when property values change
- ObserverId - Unique identifier for registered observers