Skip to main content

Crate observable_property

Crate observable_property 

Source
Expand description

§Observable Property

A thread-safe observable property implementation for Rust that allows you to observe changes to values across multiple threads.

§Features

  • Thread-safe: Uses Arc<RwLock<_>> for safe concurrent access
  • Observer pattern: Subscribe to property changes with callbacks
  • RAII subscriptions: Automatic cleanup with subscription guards (no manual unsubscribe needed)
  • Filtered observers: Only notify when specific conditions are met
  • Async notifications: Non-blocking observer notifications with background threads
  • Panic isolation: Observer panics don’t crash the system
  • Graceful lock recovery: Continues operation even after lock poisoning from panics
  • Memory protection: Observer limit (10,000) prevents memory exhaustion
  • Type-safe: Generic implementation works with any Clone + Send + Sync type

§Quick Start

use observable_property::ObservableProperty;
use std::sync::Arc;

// Create an observable property
let property = ObservableProperty::new(42);

// Subscribe to changes
let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
    println!("Value changed from {} to {}", old_value, new_value);
})).map_err(|e| {
    eprintln!("Failed to subscribe: {}", e);
    e
})?;

// Change the value (triggers observer)
property.set(100).map_err(|e| {
    eprintln!("Failed to set value: {}", e);
    e
})?;

// Unsubscribe when done
property.unsubscribe(observer_id).map_err(|e| {
    eprintln!("Failed to unsubscribe: {}", e);
    e
})?;

§Multi-threading Example

use observable_property::ObservableProperty;
use std::sync::Arc;
use std::thread;

let property = Arc::new(ObservableProperty::new(0));
let property_clone = property.clone();

// Subscribe from one thread
property.subscribe(Arc::new(|old, new| {
    println!("Value changed: {} -> {}", old, new);
})).map_err(|e| {
    eprintln!("Failed to subscribe: {}", e);
    e
})?;

// Modify from another thread
thread::spawn(move || {
    if let Err(e) = property_clone.set(42) {
        eprintln!("Failed to set value: {}", e);
    }
}).join().expect("Thread panicked");

§RAII Subscriptions

For automatic cleanup without manual unsubscribe calls, use RAII subscriptions:

use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(0);

{
    // Create RAII subscription - automatically cleaned up when dropped
    let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
        println!("Value changed: {} -> {}", old, new);
    }))?;

    property.set(42)?; // Prints: "Value changed: 0 -> 42"

    // Subscription automatically unsubscribes when leaving this scope
}

// No observer active anymore
property.set(100)?; // No output

§Filtered RAII Subscriptions

Combine filtering with automatic cleanup for conditional monitoring:

use observable_property::ObservableProperty;
use std::sync::Arc;

let temperature = ObservableProperty::new(20.0);

{
    // Monitor only significant temperature increases with automatic cleanup
    let _heat_warning = temperature.subscribe_filtered_with_subscription(
        Arc::new(|old, new| {
            println!("🔥 Heat warning! {:.1}°C -> {:.1}°C", old, new);
        }),
        |old, new| new > old && (new - old) > 5.0
    )?;

    temperature.set(22.0)?; // No warning (only 2°C increase)
    temperature.set(28.0)?; // Prints warning (6°C increase from 22°C)

    // Subscription automatically cleaned up here
}

temperature.set(35.0)?; // No warning (subscription was cleaned up)

§Subscription Management Comparison

use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(0);
let observer = Arc::new(|old: &i32, new: &i32| {
    println!("Value: {} -> {}", old, new);
});

// Method 1: Manual subscription management
let observer_id = property.subscribe(observer.clone())?;
property.set(42)?;
property.unsubscribe(observer_id)?; // Manual cleanup required

// Method 2: RAII subscription management (recommended)
{
    let _subscription = property.subscribe_with_subscription(observer)?;
    property.set(100)?;
    // Automatic cleanup when _subscription goes out of scope
}

§Advanced RAII Patterns

Comprehensive example showing various RAII subscription patterns:

use observable_property::ObservableProperty;
use std::sync::Arc;

// System monitoring example
let cpu_usage = ObservableProperty::new(25.0f64); // percentage
let memory_usage = ObservableProperty::new(1024); // MB
let active_connections = ObservableProperty::new(0u32);

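// Conditional monitoring based on system state.
// Note: cpu_usage starts at 25.0, so this condition is false here and the
// block below is skipped; its comments describe what would happen under high load.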
let high_load_monitoring = cpu_usage.get()? > 50.0;

if high_load_monitoring {
    // Critical system monitoring - active only during high load
    let _cpu_critical = cpu_usage.subscribe_filtered_with_subscription(
        Arc::new(|old, new| {
            println!("🚨 Critical CPU usage: {:.1}% -> {:.1}%", old, new);
        }),
        |_, new| *new > 80.0
    )?;

    let _memory_warning = memory_usage.subscribe_filtered_with_subscription(
        Arc::new(|old, new| {
            println!("⚠️ High memory usage: {}MB -> {}MB", old, new);
        }),
        |_, new| *new > 8192 // > 8GB
    )?;

    // Simulate system load changes
    cpu_usage.set(85.0)?;     // Would trigger critical alert
    memory_usage.set(9216)?;  // Would trigger memory warning
     
    // All monitoring automatically stops when exiting this block
}

// Connection monitoring with scoped lifetime
{
    let _connection_monitor = active_connections.subscribe_with_subscription(
        Arc::new(|old, new| {
            if new > old {
                println!("📈 New connections: {} -> {}", old, new);
            } else if new < old {
                println!("📉 Connections closed: {} -> {}", old, new);
            }
        })
    )?;

    active_connections.set(5)?;  // Prints: "📈 New connections: 0 -> 5"
    active_connections.set(3)?;  // Prints: "📉 Connections closed: 5 -> 3"
    active_connections.set(8)?;  // Prints: "📈 New connections: 3 -> 8"

    // Connection monitoring automatically stops here
}

// No monitoring active anymore
cpu_usage.set(95.0)?;         // No output
memory_usage.set(10240)?;     // No output  
active_connections.set(15)?;  // No output

println!("All monitoring automatically cleaned up!");

Structs§

ObservableProperty
A thread-safe observable property that notifies observers when its value changes
PropertyEvent
Represents a single change event in the property’s history
PropertyMetrics
Performance metrics for an observable property
Subscription
A RAII guard for an observer subscription that automatically unsubscribes when dropped

Enums§

PropertyError
Errors that can occur when working with ObservableProperty

Traits§

PropertyPersistence
Trait for implementing property value persistence

Functions§

computed
Creates a computed property that automatically recomputes when any dependency changes

Type Aliases§

Observer
Function type for observers that get called when property values change
ObserverId
Unique identifier for registered observers