ObservableProperty

Struct ObservableProperty 

Source
pub struct ObservableProperty<T> { /* private fields */ }
Expand description

A thread-safe observable property that notifies observers when its value changes

This type wraps a value of type T and allows multiple observers to be notified whenever the value is modified. All operations are thread-safe and can be called from multiple threads concurrently.

§Type Requirements

The generic type T must implement:

  • Clone: Required for returning values and passing them to observers
  • Send: Required for transferring between threads
  • Sync: Required for concurrent access from multiple threads
  • 'static: Required for observer callbacks that may outlive the original scope

§Examples

use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new("initial".to_string());

let observer_id = property.subscribe(Arc::new(|old, new| {
    println!("Changed from '{}' to '{}'", old, new);
})).map_err(|e| {
    eprintln!("Failed to subscribe: {}", e);
    e
})?;

property.set("updated".to_string()).map_err(|e| {
    eprintln!("Failed to set value: {}", e);
    e
})?; // Prints: Changed from 'initial' to 'updated'

property.unsubscribe(observer_id).map_err(|e| {
    eprintln!("Failed to unsubscribe: {}", e);
    e
})?;

Implementations§

Source§

impl<T: Clone + Send + Sync + 'static> ObservableProperty<T>

Source

pub fn new(initial_value: T) -> Self

Creates a new observable property with the given initial value

§Arguments
  • initial_value - The starting value for this property
§Examples
use observable_property::ObservableProperty;

let property = ObservableProperty::new(42);
match property.get() {
    Ok(value) => assert_eq!(value, 42),
    Err(e) => eprintln!("Failed to get property value: {}", e),
}
Examples found in repository?
examples/basic.rs (line 18)
16    fn new(name: String, age: i32) -> Self {
17        Self {
18            name: ObservableProperty::new(name),
19            age: ObservableProperty::new(age),
20        }
21    }
22
23    fn get_name(&self) -> Result<String, observable_property::PropertyError> {
24        self.name.get()
25    }
26
27    fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
28        self.name.set(new_name)
29    }
30
31    fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
32        self.age.get()
33    }
34
35    fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
36        self.age.set(new_age)
37    }
38
39    fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
40        let current_age = self.age.get()?;
41        self.age.set(current_age + 1)
42    }
43}
44
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46    println!("=== Observable Property Basic Example ===\n");
47
48    // Create a person
49    let person = Person::new("Alice".to_string(), 25);
50
51    // Subscribe to name changes
52    let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53        println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54    }))?;
55
56    // Subscribe to age changes
57    let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58        println!("🎂 Age changed: {} → {}", old_age, new_age);
59    }))?;
60
61    // Subscribe to only significant age changes (milestones)
62    let milestone_observer_id = person.age.subscribe_filtered(
63        Arc::new(|_old_age, new_age| {
64            println!("🎉 Milestone reached! {} is now a special age: {}", 
65                    if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66        }),
67        |old_age, new_age| {
68            // Notify on milestone ages
69            let milestones = [18, 21, 30, 40, 50, 65];
70            milestones.contains(new_age) || 
71            (milestones.contains(old_age) && !milestones.contains(new_age))
72        }
73    )?;
74
75    println!("Initial state:");
76    println!("  Name: {}", person.get_name()?);
77    println!("  Age: {}\n", person.get_age()?);
78
79    // Demonstrate property changes
80    println!("Making changes...\n");
81
82    // Change name
83    person.set_name("Alice Johnson".to_string())?;
84
85    // Age up a few times
86    person.celebrate_birthday()?;
87    person.celebrate_birthday()?;
88    person.celebrate_birthday()?;
89
90    // Change to milestone age
91    person.set_age(21)?;
92
93    // Direct property access
94    println!("\nDirect property access:");
95    let simple_property = ObservableProperty::new(100);
96    
97    simple_property.subscribe(Arc::new(|old, new| {
98        println!("💰 Value changed: {} → {}", old, new);
99    }))?;
100
101    simple_property.set(200)?;
102    simple_property.set(150)?;
103
104    // Cleanup observers
105    person.name.unsubscribe(name_observer_id)?;
106    person.age.unsubscribe(age_observer_id)?;
107    person.age.unsubscribe(milestone_observer_id)?;
108
109    println!("\n✅ Example completed successfully!");
110    Ok(())
111}
More examples
Hide additional examples
examples/multithreaded.rs (line 16)
14    fn new(name: String, age: i32) -> Self {
15        Self {
16            name: ObservableProperty::new(name),
17            age: ObservableProperty::new(age),
18        }
19    }
Source

pub fn with_max_threads(initial_value: T, max_threads: usize) -> Self

Creates a new observable property with a custom maximum thread count for async notifications

This constructor allows you to customize the maximum number of threads used for asynchronous observer notifications via set_async(). This is useful for tuning performance based on your specific use case and system constraints.

§Arguments
  • initial_value - The starting value for this property
  • max_threads - Maximum number of threads to use for async notifications. If 0 is provided, defaults to 4.
§Thread Pool Behavior

When set_async() is called, observers are divided into batches and each batch runs in its own thread, up to the specified maximum. For example:

  • With 100 observers and max_threads = 4: 4 threads with ~25 observers each
  • With 10 observers and max_threads = 8: 10 threads with 1 observer each
  • With 2 observers and max_threads = 4: 2 threads with 1 observer each
§Use Cases
§High-Throughput Systems
use observable_property::ObservableProperty;

// For systems with many CPU cores and CPU-bound observers
let property = ObservableProperty::with_max_threads(0, 8);
§Resource-Constrained Systems
use observable_property::ObservableProperty;

// For embedded systems or memory-constrained environments
let property = ObservableProperty::with_max_threads(42, 1);
§I/O-Heavy Observers
use observable_property::ObservableProperty;

// For observers that do network/database operations
let property = ObservableProperty::with_max_threads("data".to_string(), 16);
§Performance Considerations
  • Higher values: Better parallelism but more thread overhead and memory usage
  • Lower values: Less overhead but potentially slower async notifications
  • Optimal range: Typically between 1 and 2x the number of CPU cores
  • Zero value: Automatically uses the default value (4)
§Thread Safety

This setting only affects async notifications (set_async()). Synchronous operations (set()) always execute observers sequentially regardless of this setting.

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

// Create property with custom thread pool size
let property = ObservableProperty::with_max_threads(42, 2);

// Subscribe observers as usual
let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
    println!("Value changed: {} -> {}", old, new);
})).expect("Failed to create subscription");

// Async notifications will use at most 2 threads
property.set_async(100).expect("Failed to set value asynchronously");
Source

pub fn get(&self) -> Result<T, PropertyError>

Gets the current value of the property

This method acquires a read lock, which allows multiple concurrent readers but will block if a writer currently holds the lock.

§Returns

Ok(T) containing a clone of the current value, or Err(PropertyError) if the lock is poisoned.

§Examples
use observable_property::ObservableProperty;

let property = ObservableProperty::new("hello".to_string());
match property.get() {
    Ok(value) => assert_eq!(value, "hello"),
    Err(e) => eprintln!("Failed to get property value: {}", e),
}
Examples found in repository?
examples/basic.rs (line 24)
23    fn get_name(&self) -> Result<String, observable_property::PropertyError> {
24        self.name.get()
25    }
26
27    fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
28        self.name.set(new_name)
29    }
30
31    fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
32        self.age.get()
33    }
34
35    fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
36        self.age.set(new_age)
37    }
38
39    fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
40        let current_age = self.age.get()?;
41        self.age.set(current_age + 1)
42    }
More examples
Hide additional examples
examples/multithreaded.rs (line 22)
21    fn get_name(&self) -> Result<String, observable_property::PropertyError> {
22        self.name.get()
23    }
24
25    fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
26        self.name.set(new_name)
27    }
28
29    fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
30        self.age.get()
31    }
32
33    fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
34        self.age.set(new_age)
35    }
36
37    fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
38        let current_age = self.age.get()?;
39        self.age.set(current_age + 1)
40    }
Source

pub fn set(&self, new_value: T) -> Result<(), PropertyError>

Sets the property to a new value and notifies all observers

This method will:

  1. Acquire a write lock (blocking other readers/writers)
  2. Update the value and capture a snapshot of observers
  3. Release the lock
  4. Notify all observers sequentially with the old and new values

Observer notifications are wrapped in panic recovery to prevent one misbehaving observer from affecting others.

§Arguments
  • new_value - The new value to set
§Returns

Ok(()) if successful, or Err(PropertyError) if the lock is poisoned.

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(10);

property.subscribe(Arc::new(|old, new| {
    println!("Value changed from {} to {}", old, new);
})).map_err(|e| {
    eprintln!("Failed to subscribe: {}", e);
    e
})?;

property.set(20).map_err(|e| {
    eprintln!("Failed to set property value: {}", e);
    e
})?; // Triggers observer notification
Examples found in repository?
examples/basic.rs (line 28)
27    fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
28        self.name.set(new_name)
29    }
30
31    fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
32        self.age.get()
33    }
34
35    fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
36        self.age.set(new_age)
37    }
38
39    fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
40        let current_age = self.age.get()?;
41        self.age.set(current_age + 1)
42    }
43}
44
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46    println!("=== Observable Property Basic Example ===\n");
47
48    // Create a person
49    let person = Person::new("Alice".to_string(), 25);
50
51    // Subscribe to name changes
52    let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53        println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54    }))?;
55
56    // Subscribe to age changes
57    let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58        println!("🎂 Age changed: {} → {}", old_age, new_age);
59    }))?;
60
61    // Subscribe to only significant age changes (milestones)
62    let milestone_observer_id = person.age.subscribe_filtered(
63        Arc::new(|_old_age, new_age| {
64            println!("🎉 Milestone reached! {} is now a special age: {}", 
65                    if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66        }),
67        |old_age, new_age| {
68            // Notify on milestone ages
69            let milestones = [18, 21, 30, 40, 50, 65];
70            milestones.contains(new_age) || 
71            (milestones.contains(old_age) && !milestones.contains(new_age))
72        }
73    )?;
74
75    println!("Initial state:");
76    println!("  Name: {}", person.get_name()?);
77    println!("  Age: {}\n", person.get_age()?);
78
79    // Demonstrate property changes
80    println!("Making changes...\n");
81
82    // Change name
83    person.set_name("Alice Johnson".to_string())?;
84
85    // Age up a few times
86    person.celebrate_birthday()?;
87    person.celebrate_birthday()?;
88    person.celebrate_birthday()?;
89
90    // Change to milestone age
91    person.set_age(21)?;
92
93    // Direct property access
94    println!("\nDirect property access:");
95    let simple_property = ObservableProperty::new(100);
96    
97    simple_property.subscribe(Arc::new(|old, new| {
98        println!("💰 Value changed: {} → {}", old, new);
99    }))?;
100
101    simple_property.set(200)?;
102    simple_property.set(150)?;
103
104    // Cleanup observers
105    person.name.unsubscribe(name_observer_id)?;
106    person.age.unsubscribe(age_observer_id)?;
107    person.age.unsubscribe(milestone_observer_id)?;
108
109    println!("\n✅ Example completed successfully!");
110    Ok(())
111}
More examples
Hide additional examples
examples/multithreaded.rs (line 26)
25    fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
26        self.name.set(new_name)
27    }
28
29    fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
30        self.age.get()
31    }
32
33    fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
34        self.age.set(new_age)
35    }
36
37    fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
38        let current_age = self.age.get()?;
39        self.age.set(current_age + 1)
40    }
Source

pub fn set_async(&self, new_value: T) -> Result<(), PropertyError>

Sets the property to a new value and notifies observers asynchronously

This method is similar to set() but spawns observers in background threads for non-blocking operation. This is useful when observers might perform time-consuming operations.

Observers are batched into groups and each batch runs in its own thread to limit resource usage while still providing parallelism.

§Arguments
  • new_value - The new value to set
§Returns

Ok(()) if successful, or Err(PropertyError) if the lock is poisoned. Note that this only indicates the property was updated successfully; observer execution happens asynchronously.

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
use std::time::Duration;

let property = ObservableProperty::new(0);

property.subscribe(Arc::new(|old, new| {
    // This observer does slow work but won't block the caller
    std::thread::sleep(Duration::from_millis(100));
    println!("Slow observer: {} -> {}", old, new);
})).map_err(|e| {
    eprintln!("Failed to subscribe: {}", e);
    e
})?;

// This returns immediately even though observer is slow
property.set_async(42).map_err(|e| {
    eprintln!("Failed to set value asynchronously: {}", e);
    e
})?;
Source

pub fn subscribe( &self, observer: Observer<T>, ) -> Result<ObserverId, PropertyError>

Subscribes an observer function to be called when the property changes

The observer function will be called with the old and new values whenever the property is modified via set() or set_async().

§Arguments
  • observer - A function wrapped in Arc that takes (&T, &T) parameters
§Returns

Ok(ObserverId) containing a unique identifier for this observer, or Err(PropertyError) if the lock is poisoned.

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(0);

let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
    println!("Property changed from {} to {}", old_value, new_value);
})).map_err(|e| {
    eprintln!("Failed to subscribe observer: {}", e);
    e
})?;

// Later, unsubscribe using the returned ID
property.unsubscribe(observer_id).map_err(|e| {
    eprintln!("Failed to unsubscribe observer: {}", e);
    e
})?;
Examples found in repository?
examples/multithreaded.rs (lines 50-52)
44fn main() -> Result<(), Box<dyn std::error::Error>> {
45    println!("=== Observable Property Multithreaded Person Example ===\n");
46
47    let person = Arc::new(Person::new("Bob".to_string(), 30));
48
49    // Subscribe to name changes
50    let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
51        println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
52    }))?;
53
54    // Subscribe to age changes
55    let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
56        println!("🎂 Age changed: {} → {}", old_age, new_age);
57    }))?;
58
59    // Spawn threads to update name and age concurrently
60    let person_clone1 = person.clone();
61    let handle1 = thread::spawn(move || {
62        for i in 0..3 {
63            let new_name = format!("Bob #{}", i + 1);
64            person_clone1.set_name(new_name).unwrap();
65            thread::sleep(Duration::from_millis(50));
66        }
67    });
68
69    let person_clone2 = person.clone();
70    let handle2 = thread::spawn(move || {
71        for _ in 0..3 {
72            person_clone2.celebrate_birthday().unwrap();
73            thread::sleep(Duration::from_millis(30));
74        }
75    });
76
77    handle1.join().unwrap();
78    handle2.join().unwrap();
79
80    println!("\nFinal state:");
81    println!("  Name: {}", person.get_name()?);
82    println!("  Age: {}", person.get_age()?);
83
84    // Cleanup observers
85    person.name.unsubscribe(name_observer_id)?;
86    person.age.unsubscribe(age_observer_id)?;
87
88    println!("\n✅ Multithreaded Person example completed successfully!");
89    Ok(())
90}
More examples
Hide additional examples
examples/basic.rs (lines 52-54)
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46    println!("=== Observable Property Basic Example ===\n");
47
48    // Create a person
49    let person = Person::new("Alice".to_string(), 25);
50
51    // Subscribe to name changes
52    let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53        println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54    }))?;
55
56    // Subscribe to age changes
57    let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58        println!("🎂 Age changed: {} → {}", old_age, new_age);
59    }))?;
60
61    // Subscribe to only significant age changes (milestones)
62    let milestone_observer_id = person.age.subscribe_filtered(
63        Arc::new(|_old_age, new_age| {
64            println!("🎉 Milestone reached! {} is now a special age: {}", 
65                    if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66        }),
67        |old_age, new_age| {
68            // Notify on milestone ages
69            let milestones = [18, 21, 30, 40, 50, 65];
70            milestones.contains(new_age) || 
71            (milestones.contains(old_age) && !milestones.contains(new_age))
72        }
73    )?;
74
75    println!("Initial state:");
76    println!("  Name: {}", person.get_name()?);
77    println!("  Age: {}\n", person.get_age()?);
78
79    // Demonstrate property changes
80    println!("Making changes...\n");
81
82    // Change name
83    person.set_name("Alice Johnson".to_string())?;
84
85    // Age up a few times
86    person.celebrate_birthday()?;
87    person.celebrate_birthday()?;
88    person.celebrate_birthday()?;
89
90    // Change to milestone age
91    person.set_age(21)?;
92
93    // Direct property access
94    println!("\nDirect property access:");
95    let simple_property = ObservableProperty::new(100);
96    
97    simple_property.subscribe(Arc::new(|old, new| {
98        println!("💰 Value changed: {} → {}", old, new);
99    }))?;
100
101    simple_property.set(200)?;
102    simple_property.set(150)?;
103
104    // Cleanup observers
105    person.name.unsubscribe(name_observer_id)?;
106    person.age.unsubscribe(age_observer_id)?;
107    person.age.unsubscribe(milestone_observer_id)?;
108
109    println!("\n✅ Example completed successfully!");
110    Ok(())
111}
Source

pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError>

Removes an observer identified by its ID

§Arguments
  • id - The observer ID returned by subscribe()
§Returns

Ok(bool) where true means the observer was found and removed, false means no observer with that ID existed. Returns Err(PropertyError) if the lock is poisoned.

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(0);
let id = property.subscribe(Arc::new(|_, _| {})).map_err(|e| {
    eprintln!("Failed to subscribe: {}", e);
    e
})?;

let was_removed = property.unsubscribe(id).map_err(|e| {
    eprintln!("Failed to unsubscribe: {}", e);
    e
})?;
assert!(was_removed); // Observer existed and was removed

let was_removed_again = property.unsubscribe(id).map_err(|e| {
    eprintln!("Failed to unsubscribe again: {}", e);
    e
})?;
assert!(!was_removed_again); // Observer no longer exists
Examples found in repository?
examples/multithreaded.rs (line 85)
44fn main() -> Result<(), Box<dyn std::error::Error>> {
45    println!("=== Observable Property Multithreaded Person Example ===\n");
46
47    let person = Arc::new(Person::new("Bob".to_string(), 30));
48
49    // Subscribe to name changes
50    let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
51        println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
52    }))?;
53
54    // Subscribe to age changes
55    let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
56        println!("🎂 Age changed: {} → {}", old_age, new_age);
57    }))?;
58
59    // Spawn threads to update name and age concurrently
60    let person_clone1 = person.clone();
61    let handle1 = thread::spawn(move || {
62        for i in 0..3 {
63            let new_name = format!("Bob #{}", i + 1);
64            person_clone1.set_name(new_name).unwrap();
65            thread::sleep(Duration::from_millis(50));
66        }
67    });
68
69    let person_clone2 = person.clone();
70    let handle2 = thread::spawn(move || {
71        for _ in 0..3 {
72            person_clone2.celebrate_birthday().unwrap();
73            thread::sleep(Duration::from_millis(30));
74        }
75    });
76
77    handle1.join().unwrap();
78    handle2.join().unwrap();
79
80    println!("\nFinal state:");
81    println!("  Name: {}", person.get_name()?);
82    println!("  Age: {}", person.get_age()?);
83
84    // Cleanup observers
85    person.name.unsubscribe(name_observer_id)?;
86    person.age.unsubscribe(age_observer_id)?;
87
88    println!("\n✅ Multithreaded Person example completed successfully!");
89    Ok(())
90}
More examples
Hide additional examples
examples/basic.rs (line 105)
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46    println!("=== Observable Property Basic Example ===\n");
47
48    // Create a person
49    let person = Person::new("Alice".to_string(), 25);
50
51    // Subscribe to name changes
52    let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53        println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54    }))?;
55
56    // Subscribe to age changes
57    let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58        println!("🎂 Age changed: {} → {}", old_age, new_age);
59    }))?;
60
61    // Subscribe to only significant age changes (milestones)
62    let milestone_observer_id = person.age.subscribe_filtered(
63        Arc::new(|_old_age, new_age| {
64            println!("🎉 Milestone reached! {} is now a special age: {}", 
65                    if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66        }),
67        |old_age, new_age| {
68            // Notify on milestone ages
69            let milestones = [18, 21, 30, 40, 50, 65];
70            milestones.contains(new_age) || 
71            (milestones.contains(old_age) && !milestones.contains(new_age))
72        }
73    )?;
74
75    println!("Initial state:");
76    println!("  Name: {}", person.get_name()?);
77    println!("  Age: {}\n", person.get_age()?);
78
79    // Demonstrate property changes
80    println!("Making changes...\n");
81
82    // Change name
83    person.set_name("Alice Johnson".to_string())?;
84
85    // Age up a few times
86    person.celebrate_birthday()?;
87    person.celebrate_birthday()?;
88    person.celebrate_birthday()?;
89
90    // Change to milestone age
91    person.set_age(21)?;
92
93    // Direct property access
94    println!("\nDirect property access:");
95    let simple_property = ObservableProperty::new(100);
96    
97    simple_property.subscribe(Arc::new(|old, new| {
98        println!("💰 Value changed: {} → {}", old, new);
99    }))?;
100
101    simple_property.set(200)?;
102    simple_property.set(150)?;
103
104    // Cleanup observers
105    person.name.unsubscribe(name_observer_id)?;
106    person.age.unsubscribe(age_observer_id)?;
107    person.age.unsubscribe(milestone_observer_id)?;
108
109    println!("\n✅ Example completed successfully!");
110    Ok(())
111}
Source

pub fn subscribe_filtered<F>( &self, observer: Observer<T>, filter: F, ) -> Result<ObserverId, PropertyError>
where F: Fn(&T, &T) -> bool + Send + Sync + 'static,

Subscribes an observer that only gets called when a filter condition is met

This is useful for observing only specific types of changes, such as when a value increases or crosses a threshold.

§Arguments
  • observer - The observer function to call when the filter passes
  • filter - A predicate function that receives (old_value, new_value) and returns bool
§Returns

Ok(ObserverId) for the filtered observer, or Err(PropertyError) if the lock is poisoned.

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(0);

// Only notify when value increases
let id = property.subscribe_filtered(
    Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
    |old, new| new > old
).map_err(|e| {
    eprintln!("Failed to subscribe filtered observer: {}", e);
    e
})?;

property.set(10).map_err(|e| {
    eprintln!("Failed to set value: {}", e);
    e
})?; // Triggers observer (0 -> 10)
property.set(5).map_err(|e| {
    eprintln!("Failed to set value: {}", e);
    e
})?;  // Does NOT trigger observer (10 -> 5)
property.set(15).map_err(|e| {
    eprintln!("Failed to set value: {}", e);
    e
})?; // Triggers observer (5 -> 15)
Examples found in repository?
examples/basic.rs (lines 62-73)
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46    println!("=== Observable Property Basic Example ===\n");
47
48    // Create a person
49    let person = Person::new("Alice".to_string(), 25);
50
51    // Subscribe to name changes
52    let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53        println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54    }))?;
55
56    // Subscribe to age changes
57    let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58        println!("🎂 Age changed: {} → {}", old_age, new_age);
59    }))?;
60
61    // Subscribe to only significant age changes (milestones)
62    let milestone_observer_id = person.age.subscribe_filtered(
63        Arc::new(|_old_age, new_age| {
64            println!("🎉 Milestone reached! {} is now a special age: {}", 
65                    if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66        }),
67        |old_age, new_age| {
68            // Notify on milestone ages
69            let milestones = [18, 21, 30, 40, 50, 65];
70            milestones.contains(new_age) || 
71            (milestones.contains(old_age) && !milestones.contains(new_age))
72        }
73    )?;
74
75    println!("Initial state:");
76    println!("  Name: {}", person.get_name()?);
77    println!("  Age: {}\n", person.get_age()?);
78
79    // Demonstrate property changes
80    println!("Making changes...\n");
81
82    // Change name
83    person.set_name("Alice Johnson".to_string())?;
84
85    // Age up a few times
86    person.celebrate_birthday()?;
87    person.celebrate_birthday()?;
88    person.celebrate_birthday()?;
89
90    // Change to milestone age
91    person.set_age(21)?;
92
93    // Direct property access
94    println!("\nDirect property access:");
95    let simple_property = ObservableProperty::new(100);
96    
97    simple_property.subscribe(Arc::new(|old, new| {
98        println!("💰 Value changed: {} → {}", old, new);
99    }))?;
100
101    simple_property.set(200)?;
102    simple_property.set(150)?;
103
104    // Cleanup observers
105    person.name.unsubscribe(name_observer_id)?;
106    person.age.unsubscribe(age_observer_id)?;
107    person.age.unsubscribe(milestone_observer_id)?;
108
109    println!("\n✅ Example completed successfully!");
110    Ok(())
111}
Source

pub fn notify_observers_batch( &self, changes: Vec<(T, T)>, ) -> Result<(), PropertyError>

Source

pub fn subscribe_with_subscription( &self, observer: Observer<T>, ) -> Result<Subscription<T>, PropertyError>

Subscribes an observer and returns a RAII guard for automatic cleanup

This method is similar to subscribe() but returns a Subscription object that automatically removes the observer when it goes out of scope. This provides a more convenient and safer alternative to manual subscription management.

§Arguments
  • observer - A function wrapped in Arc that takes (&T, &T) parameters
§Returns

Ok(Subscription<T>) containing a RAII guard for the observer, or Err(PropertyError) if the lock is poisoned.

§Examples
§Basic RAII Subscription
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new(0);

{
    let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
        println!("Value: {} -> {}", old, new);
    }))?;

    property.set(42)?; // Prints: "Value: 0 -> 42"
    property.set(100)?; // Prints: "Value: 42 -> 100"

    // Automatic cleanup when _subscription goes out of scope
}

property.set(200)?; // No output - subscription was cleaned up
§Comparison with Manual Management
use observable_property::ObservableProperty;
use std::sync::Arc;

let property = ObservableProperty::new("initial".to_string());

// Method 1: Manual subscription management (traditional approach)
let observer_id = property.subscribe(Arc::new(|old, new| {
    println!("Manual: {} -> {}", old, new);
}))?;

// Method 2: RAII subscription management (recommended)
let _subscription = property.subscribe_with_subscription(Arc::new(|old, new| {
    println!("RAII: {} -> {}", old, new);
}))?;

// Both observers will be called
property.set("changed".to_string())?;
// Prints:
// "Manual: initial -> changed"
// "RAII: initial -> changed"

// Manual cleanup required for first observer
property.unsubscribe(observer_id)?;

// Second observer (_subscription) is automatically cleaned up when
// the variable goes out of scope - no manual intervention needed
§Error Handling with Early Returns
use observable_property::ObservableProperty;
use std::sync::Arc;

fn process_with_monitoring(property: &ObservableProperty<i32>) -> Result<(), observable_property::PropertyError> {
    let _monitoring = property.subscribe_with_subscription(Arc::new(|old, new| {
        println!("Processing: {} -> {}", old, new);
    }))?;

    property.set(1)?;
     
    if property.get()? > 0 {
        return Ok(()); // Subscription automatically cleaned up on early return
    }

    property.set(2)?;
    Ok(()) // Subscription automatically cleaned up on normal return
}

let property = ObservableProperty::new(0);
process_with_monitoring(&property)?; // Monitoring active only during function call
property.set(99)?; // No monitoring output - subscription was cleaned up
§Multi-threaded Subscription Management
use observable_property::ObservableProperty;
use std::sync::Arc;
use std::thread;

let property = Arc::new(ObservableProperty::new(0));
let property_clone = property.clone();

let handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
    let _subscription = property_clone.subscribe_with_subscription(Arc::new(|old, new| {
        println!("Thread observer: {} -> {}", old, new);
    }))?;

    property_clone.set(42)?; // Prints: "Thread observer: 0 -> 42"
     
    // Subscription automatically cleaned up when thread ends
    Ok(())
});

handle.join().unwrap()?;
property.set(100)?; // No output - thread subscription was cleaned up
§Use Cases

This method is particularly useful in scenarios such as:

  • Temporary observers that should be active only during a specific scope
  • Error-prone code where manual cleanup might be forgotten
  • Complex control flow where multiple exit points make manual cleanup difficult
  • Resource-constrained environments where observer leaks are problematic
Source

pub fn subscribe_filtered_with_subscription<F>( &self, observer: Observer<T>, filter: F, ) -> Result<Subscription<T>, PropertyError>
where F: Fn(&T, &T) -> bool + Send + Sync + 'static,

Subscribes a filtered observer and returns a RAII guard for automatic cleanup

This method combines the functionality of subscribe_filtered() with the automatic cleanup benefits of subscribe_with_subscription(). The observer will only be called when the filter condition is satisfied, and it will be automatically unsubscribed when the returned Subscription goes out of scope.

§Arguments
  • observer - The observer function to call when the filter passes
  • filter - A predicate function that receives (old_value, new_value) and returns bool
§Returns

Ok(Subscription<T>) containing a RAII guard for the filtered observer, or Err(PropertyError) if the lock is poisoned.

§Examples
§Basic Filtered RAII Subscription
use observable_property::ObservableProperty;
use std::sync::Arc;

let counter = ObservableProperty::new(0);

{
    // Monitor only increases with automatic cleanup
    let _increase_monitor = counter.subscribe_filtered_with_subscription(
        Arc::new(|old, new| {
            println!("Counter increased: {} -> {}", old, new);
        }),
        |old, new| new > old
    )?;

    counter.set(5)?;  // Prints: "Counter increased: 0 -> 5"
    counter.set(3)?;  // No output (decrease)
    counter.set(7)?;  // Prints: "Counter increased: 3 -> 7"

    // Subscription automatically cleaned up when leaving scope
}

counter.set(10)?; // No output - subscription was cleaned up
§Multi-Condition Temperature Monitoring
use observable_property::ObservableProperty;
use std::sync::Arc;

let temperature = ObservableProperty::new(20.0_f64);

{
    // Create filtered subscription that only triggers for significant temperature increases
    let _heat_warning = temperature.subscribe_filtered_with_subscription(
        Arc::new(|old_temp, new_temp| {
            println!("🔥 Heat warning! Temperature rose from {:.1}°C to {:.1}°C",
                     old_temp, new_temp);
        }),
        |old, new| new > old && (new - old) > 5.0  // Only trigger for increases > 5°C
    )?;

    // Create another filtered subscription for cooling alerts
    let _cooling_alert = temperature.subscribe_filtered_with_subscription(
        Arc::new(|old_temp, new_temp| {
            println!("❄️ Cooling alert! Temperature dropped from {:.1}°C to {:.1}°C",
                     old_temp, new_temp);
        }),
        |old, new| new < old && (old - new) > 3.0  // Only trigger for decreases > 3°C
    )?;

    // Test the filters
    temperature.set(22.0)?; // No alerts (increase of only 2°C)
    temperature.set(28.0)?; // Heat warning triggered (increase of 6°C from 22°C)
    temperature.set(23.0)?; // Cooling alert triggered (decrease of 5°C)

    // Both subscriptions are automatically cleaned up when they go out of scope
}

temperature.set(35.0)?; // No alerts - subscriptions were cleaned up
§Conditional Monitoring with Complex Filters
use observable_property::ObservableProperty;
use std::sync::Arc;

let stock_price = ObservableProperty::new(100.0_f64);

{
    // Monitor significant price movements (> 5% change)
    let _volatility_alert = stock_price.subscribe_filtered_with_subscription(
        Arc::new(|old_price, new_price| {
            let change_percent = ((new_price - old_price) / old_price * 100.0).abs();
            println!("📈 Significant price movement: ${:.2} -> ${:.2} ({:.1}%)",
                    old_price, new_price, change_percent);
        }),
        |old, new| {
            let change_percent = ((new - old) / old * 100.0).abs();
            change_percent > 5.0  // Trigger on > 5% change
        }
    )?;

    stock_price.set(103.0)?; // No alert (3% change)
    stock_price.set(108.0)?; // Alert triggered (4.85% from 103, but let's say it rounds up)
    stock_price.set(95.0)?;  // Alert triggered (12% decrease)

    // Subscription automatically cleaned up when leaving scope
}

stock_price.set(200.0)?; // No alert - monitoring ended
§Cross-Thread Filtered Monitoring
use observable_property::ObservableProperty;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

let network_latency = Arc::new(ObservableProperty::new(50)); // milliseconds
let latency_clone = network_latency.clone();

let monitor_handle = thread::spawn(move || -> Result<(), observable_property::PropertyError> {
    // Monitor high latency in background thread with automatic cleanup
    let _high_latency_alert = latency_clone.subscribe_filtered_with_subscription(
        Arc::new(|old_ms, new_ms| {
            println!("⚠️ High latency detected: {}ms -> {}ms", old_ms, new_ms);
        }),
        |_, new| *new > 100  // Alert when latency exceeds 100ms
    )?;

    // Simulate monitoring for a short time
    thread::sleep(Duration::from_millis(10));
     
    // Subscription automatically cleaned up when thread ends
    Ok(())
});

// Simulate network conditions
network_latency.set(80)?;  // No alert (under threshold)
network_latency.set(150)?; // Alert triggered in background thread

monitor_handle.join().unwrap()?;
network_latency.set(200)?; // No alert - background monitoring ended
§Use Cases

This method is ideal for:

  • Threshold-based monitoring with automatic cleanup
  • Temporary conditional observers in specific code blocks
  • Event-driven systems where observers should be active only during certain phases
  • Resource management scenarios where filtered observers have limited lifetimes
§Performance Notes

The filter function is evaluated for every property change, so it should be lightweight. Complex filtering logic should be optimized to avoid performance bottlenecks, especially in high-frequency update scenarios.

Trait Implementations§

Source§

impl<T: Clone> Clone for ObservableProperty<T>

Source§

fn clone(&self) -> Self

Creates a new reference to the same observable property

This creates a new ObservableProperty instance that shares the same underlying data with the original. Changes made through either instance will be visible to observers subscribed through both instances.

§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;

let property1 = ObservableProperty::new(42);
let property2 = property1.clone();

property2.subscribe(Arc::new(|old, new| {
    println!("Observer on property2 saw change: {} -> {}", old, new);
})).map_err(|e| {
    eprintln!("Failed to subscribe: {}", e);
    e
})?;

// This change through property1 will trigger the observer on property2
property1.set(100).map_err(|e| {
    eprintln!("Failed to set value: {}", e);
    e
})?;
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<T: Clone + Debug + Send + Sync + 'static> Debug for ObservableProperty<T>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Debug implementation that shows the current value if accessible

Auto Trait Implementations§

§

impl<T> Freeze for ObservableProperty<T>

§

impl<T> RefUnwindSafe for ObservableProperty<T>

§

impl<T> Send for ObservableProperty<T>
where T: Send + Sync,

§

impl<T> Sync for ObservableProperty<T>
where T: Send + Sync,

§

impl<T> Unpin for ObservableProperty<T>

§

impl<T> UnwindSafe for ObservableProperty<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.