pub struct ObservableProperty<T> { /* private fields */ }
A thread-safe observable property that notifies observers when its value changes.
This type wraps a value of type `T` and allows multiple observers to be notified
whenever the value is modified. All operations are thread-safe and can be called
from multiple threads concurrently.
§Type Requirements
The generic type `T` must implement:
- `Clone`: Required for returning values and passing them to observers
- `Send`: Required for transferring between threads
- `Sync`: Required for concurrent access from multiple threads
- `'static`: Required for observer callbacks that may outlive the original scope
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new("initial".to_string());
let observer_id = property.subscribe(Arc::new(|old, new| {
println!("Changed from '{}' to '{}'", old, new);
})).unwrap();
property.set("updated".to_string()).unwrap(); // Prints: Changed from 'initial' to 'updated'
property.unsubscribe(observer_id).unwrap();
§Implementations
impl<T: Clone + Send + Sync + 'static> ObservableProperty<T>
pub fn new(initial_value: T) -> Self
Creates a new observable property with the given initial value
§Arguments
- `initial_value` - The starting value for this property
§Examples
use observable_property::ObservableProperty;
let property = ObservableProperty::new(42);
assert_eq!(property.get().unwrap(), 42);
Examples found in repository?
16 fn new(name: String, age: i32) -> Self {
17 Self {
18 name: ObservableProperty::new(name),
19 age: ObservableProperty::new(age),
20 }
21 }
22
23 fn get_name(&self) -> Result<String, observable_property::PropertyError> {
24 self.name.get()
25 }
26
27 fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
28 self.name.set(new_name)
29 }
30
31 fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
32 self.age.get()
33 }
34
35 fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
36 self.age.set(new_age)
37 }
38
39 fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
40 let current_age = self.age.get()?;
41 self.age.set(current_age + 1)
42 }
43}
44
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46 println!("=== Observable Property Basic Example ===\n");
47
48 // Create a person
49 let person = Person::new("Alice".to_string(), 25);
50
51 // Subscribe to name changes
52 let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53 println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54 }))?;
55
56 // Subscribe to age changes
57 let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58 println!("🎂 Age changed: {} → {}", old_age, new_age);
59 }))?;
60
61 // Subscribe to only significant age changes (milestones)
62 let milestone_observer_id = person.age.subscribe_filtered(
63 Arc::new(|_old_age, new_age| {
64 println!("🎉 Milestone reached! {} is now a special age: {}",
65 if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66 }),
67 |old_age, new_age| {
68 // Notify on milestone ages
69 let milestones = [18, 21, 30, 40, 50, 65];
70 milestones.contains(new_age) ||
71 (milestones.contains(old_age) && !milestones.contains(new_age))
72 }
73 )?;
74
75 println!("Initial state:");
76 println!(" Name: {}", person.get_name()?);
77 println!(" Age: {}\n", person.get_age()?);
78
79 // Demonstrate property changes
80 println!("Making changes...\n");
81
82 // Change name
83 person.set_name("Alice Johnson".to_string())?;
84
85 // Age up a few times
86 person.celebrate_birthday()?;
87 person.celebrate_birthday()?;
88 person.celebrate_birthday()?;
89
90 // Change to milestone age
91 person.set_age(21)?;
92
93 // Direct property access
94 println!("\nDirect property access:");
95 let simple_property = ObservableProperty::new(100);
96
97 simple_property.subscribe(Arc::new(|old, new| {
98 println!("💰 Value changed: {} → {}", old, new);
99 }))?;
100
101 simple_property.set(200)?;
102 simple_property.set(150)?;
103
104 // Cleanup observers
105 person.name.unsubscribe(name_observer_id)?;
106 person.age.unsubscribe(age_observer_id)?;
107 person.age.unsubscribe(milestone_observer_id)?;
108
109 println!("\n✅ Example completed successfully!");
110 Ok(())
111}
More examples
11fn main() -> Result<(), Box<dyn std::error::Error>> {
12 println!("=== Observable Property Multithreaded Example ===\n");
13
14 // Shared counter with multiple observers
15 let counter = Arc::new(ObservableProperty::new(0i32));
16 let notification_count = Arc::new(AtomicUsize::new(0));
17
18 // Add observers that track notifications
19 let count_clone = notification_count.clone();
20 let observer1_id = counter.subscribe(Arc::new(move |old, new| {
21 count_clone.fetch_add(1, Ordering::SeqCst);
22 println!("🔴 Observer 1: {} → {}", old, new);
23 }))?;
24
25 let count_clone = notification_count.clone();
26 let observer2_id = counter.subscribe(Arc::new(move |old, new| {
27 count_clone.fetch_add(1, Ordering::SeqCst);
28 println!("🔵 Observer 2: {} → {}", old, new);
29 }))?;
30
31 // Filtered observer for even numbers only
32 let count_clone = notification_count.clone();
33 let even_observer_id = counter.subscribe_filtered(
34 Arc::new(move |old, new| {
35 count_clone.fetch_add(1, Ordering::SeqCst);
36 println!("🟢 Even Observer: {} → {} (new value is even!)", old, new);
37 }),
38 |_old, new| new % 2 == 0
39 )?;
40
41 println!("📊 Starting concurrent updates...\n");
42
43 // Test 1: Multiple threads updating the counter
44 {
45 let num_threads = 4;
46 let updates_per_thread = 5;
47 let barrier = Arc::new(Barrier::new(num_threads));
48
49 let handles: Vec<_> = (0..num_threads).map(|thread_id| {
50 let counter_clone = counter.clone();
51 let barrier_clone = barrier.clone();
52
53 thread::spawn(move || {
54 // Wait for all threads to be ready
55 barrier_clone.wait();
56
57 for i in 0..updates_per_thread {
58 let new_value = (thread_id * 100 + i) as i32;
59
60 if let Err(e) = counter_clone.set(new_value) {
61 eprintln!("Thread {} failed to set counter: {}", thread_id, e);
62 } else {
63 println!("🧵 Thread {} set counter to {}", thread_id, new_value);
64 }
65
66 // Small delay to see interleaving
67 thread::sleep(Duration::from_millis(10));
68 }
69 })
70 }).collect();
71
72 // Wait for all threads to complete
73 for handle in handles {
74 if let Err(e) = handle.join() {
75 eprintln!("Thread panicked: {:?}", e);
76 }
77 }
78 }
79
80 thread::sleep(Duration::from_millis(100)); // Let notifications settle
81
82 println!("\n📈 Current counter value: {}", counter.get()?);
83 println!("🔔 Total notifications sent: {}\n", notification_count.load(Ordering::SeqCst));
84
85 // Test 2: Performance comparison between sync and async notifications
86 println!("⚡ Performance Test: Sync vs Async notifications\n");
87
88 // Create a property with a slow observer
89 let perf_counter = ObservableProperty::new(0i32);
90 let slow_notifications = Arc::new(AtomicUsize::new(0));
91 let slow_count_clone = slow_notifications.clone();
92
93 perf_counter.subscribe(Arc::new(move |_old, _new| {
94 // Simulate slow observer work
95 thread::sleep(Duration::from_millis(20));
96 slow_count_clone.fetch_add(1, Ordering::SeqCst);
97 }))?;
98
99 // Test synchronous updates (will block)
100 let start = Instant::now();
101 for i in 1..=3 {
102 perf_counter.set(i)?;
103 }
104 let sync_duration = start.elapsed();
105 println!("🐌 Sync updates (3 changes): {:?}", sync_duration);
106
107 // Test asynchronous updates (won't block)
108 let start = Instant::now();
109 for i in 4..=6 {
110 perf_counter.set_async(i)?;
111 }
112 let async_duration = start.elapsed();
113 println!("🚀 Async updates (3 changes): {:?}", async_duration);
114
115 println!("⚡ Speedup: {:.1}x faster",
116 sync_duration.as_secs_f64() / async_duration.as_secs_f64());
117
118 // Wait for async observers to complete
119 thread::sleep(Duration::from_millis(100));
120 println!("🔔 Slow observer notifications: {}\n", slow_notifications.load(Ordering::SeqCst));
121
122 // Test 3: Concurrent reads while updating
123 println!("📚 Concurrent reads test...\n");
124 {
125 let read_counter = Arc::new(ObservableProperty::new(1000i32));
126 let read_count = Arc::new(AtomicUsize::new(0));
127 let num_readers = 8;
128 let reads_per_reader = 50;
129
130 // Start reader threads
131 let reader_handles: Vec<_> = (0..num_readers).map(|reader_id| {
132 let counter_clone = read_counter.clone();
133 let count_clone = read_count.clone();
134
135 thread::spawn(move || {
136 for _ in 0..reads_per_reader {
137 match counter_clone.get() {
138 Ok(value) => {
139 count_clone.fetch_add(1, Ordering::SeqCst);
140 if value % 100 == 0 {
141 println!("📖 Reader {} saw value: {}", reader_id, value);
142 }
143 }
144 Err(e) => eprintln!("Reader {} error: {}", reader_id, e),
145 }
146 thread::sleep(Duration::from_millis(1));
147 }
148 })
149 }).collect();
150
151 // Start writer thread
152 let writer_handle = {
153 let counter_clone = read_counter.clone();
154 thread::spawn(move || {
155 for i in 1..=20 {
156 let new_value = 1000 + i * 50;
157 if let Err(e) = counter_clone.set(new_value) {
158 eprintln!("Writer error: {}", e);
159 } else {
160 println!("✍️ Writer set value to: {}", new_value);
161 }
162 thread::sleep(Duration::from_millis(25));
163 }
164 })
165 };
166
167 // Wait for all threads
168 for handle in reader_handles {
169 if let Err(e) = handle.join() {
170 eprintln!("Reader thread panicked: {:?}", e);
171 }
172 }
173
174 if let Err(e) = writer_handle.join() {
175 eprintln!("Writer thread panicked: {:?}", e);
176 }
177
178 println!("📊 Total successful reads: {}", read_count.load(Ordering::SeqCst));
179 }
180
181 // Cleanup
182 counter.unsubscribe(observer1_id)?;
183 counter.unsubscribe(observer2_id)?;
184 counter.unsubscribe(even_observer_id)?;
185
186 println!("\n✅ Multithreaded example completed successfully!");
187 Ok(())
188}
pub fn get(&self) -> Result<T, PropertyError>
Gets the current value of the property.
This method acquires a read lock, which allows multiple concurrent readers but will block if a writer currently holds the lock.
§Returns
`Ok(T)` containing a clone of the current value, or `Err(PropertyError)`
if the lock is poisoned.
§Examples
use observable_property::ObservableProperty;
let property = ObservableProperty::new("hello".to_string());
assert_eq!(property.get().unwrap(), "hello");
Examples found in repository?
23 fn get_name(&self) -> Result<String, observable_property::PropertyError> {
24 self.name.get()
25 }
26
27 fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
28 self.name.set(new_name)
29 }
30
31 fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
32 self.age.get()
33 }
34
35 fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
36 self.age.set(new_age)
37 }
38
39 fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
40 let current_age = self.age.get()?;
41 self.age.set(current_age + 1)
42 }
More examples
11fn main() -> Result<(), Box<dyn std::error::Error>> {
12 println!("=== Observable Property Multithreaded Example ===\n");
13
14 // Shared counter with multiple observers
15 let counter = Arc::new(ObservableProperty::new(0i32));
16 let notification_count = Arc::new(AtomicUsize::new(0));
17
18 // Add observers that track notifications
19 let count_clone = notification_count.clone();
20 let observer1_id = counter.subscribe(Arc::new(move |old, new| {
21 count_clone.fetch_add(1, Ordering::SeqCst);
22 println!("🔴 Observer 1: {} → {}", old, new);
23 }))?;
24
25 let count_clone = notification_count.clone();
26 let observer2_id = counter.subscribe(Arc::new(move |old, new| {
27 count_clone.fetch_add(1, Ordering::SeqCst);
28 println!("🔵 Observer 2: {} → {}", old, new);
29 }))?;
30
31 // Filtered observer for even numbers only
32 let count_clone = notification_count.clone();
33 let even_observer_id = counter.subscribe_filtered(
34 Arc::new(move |old, new| {
35 count_clone.fetch_add(1, Ordering::SeqCst);
36 println!("🟢 Even Observer: {} → {} (new value is even!)", old, new);
37 }),
38 |_old, new| new % 2 == 0
39 )?;
40
41 println!("📊 Starting concurrent updates...\n");
42
43 // Test 1: Multiple threads updating the counter
44 {
45 let num_threads = 4;
46 let updates_per_thread = 5;
47 let barrier = Arc::new(Barrier::new(num_threads));
48
49 let handles: Vec<_> = (0..num_threads).map(|thread_id| {
50 let counter_clone = counter.clone();
51 let barrier_clone = barrier.clone();
52
53 thread::spawn(move || {
54 // Wait for all threads to be ready
55 barrier_clone.wait();
56
57 for i in 0..updates_per_thread {
58 let new_value = (thread_id * 100 + i) as i32;
59
60 if let Err(e) = counter_clone.set(new_value) {
61 eprintln!("Thread {} failed to set counter: {}", thread_id, e);
62 } else {
63 println!("🧵 Thread {} set counter to {}", thread_id, new_value);
64 }
65
66 // Small delay to see interleaving
67 thread::sleep(Duration::from_millis(10));
68 }
69 })
70 }).collect();
71
72 // Wait for all threads to complete
73 for handle in handles {
74 if let Err(e) = handle.join() {
75 eprintln!("Thread panicked: {:?}", e);
76 }
77 }
78 }
79
80 thread::sleep(Duration::from_millis(100)); // Let notifications settle
81
82 println!("\n📈 Current counter value: {}", counter.get()?);
83 println!("🔔 Total notifications sent: {}\n", notification_count.load(Ordering::SeqCst));
84
85 // Test 2: Performance comparison between sync and async notifications
86 println!("⚡ Performance Test: Sync vs Async notifications\n");
87
88 // Create a property with a slow observer
89 let perf_counter = ObservableProperty::new(0i32);
90 let slow_notifications = Arc::new(AtomicUsize::new(0));
91 let slow_count_clone = slow_notifications.clone();
92
93 perf_counter.subscribe(Arc::new(move |_old, _new| {
94 // Simulate slow observer work
95 thread::sleep(Duration::from_millis(20));
96 slow_count_clone.fetch_add(1, Ordering::SeqCst);
97 }))?;
98
99 // Test synchronous updates (will block)
100 let start = Instant::now();
101 for i in 1..=3 {
102 perf_counter.set(i)?;
103 }
104 let sync_duration = start.elapsed();
105 println!("🐌 Sync updates (3 changes): {:?}", sync_duration);
106
107 // Test asynchronous updates (won't block)
108 let start = Instant::now();
109 for i in 4..=6 {
110 perf_counter.set_async(i)?;
111 }
112 let async_duration = start.elapsed();
113 println!("🚀 Async updates (3 changes): {:?}", async_duration);
114
115 println!("⚡ Speedup: {:.1}x faster",
116 sync_duration.as_secs_f64() / async_duration.as_secs_f64());
117
118 // Wait for async observers to complete
119 thread::sleep(Duration::from_millis(100));
120 println!("🔔 Slow observer notifications: {}\n", slow_notifications.load(Ordering::SeqCst));
121
122 // Test 3: Concurrent reads while updating
123 println!("📚 Concurrent reads test...\n");
124 {
125 let read_counter = Arc::new(ObservableProperty::new(1000i32));
126 let read_count = Arc::new(AtomicUsize::new(0));
127 let num_readers = 8;
128 let reads_per_reader = 50;
129
130 // Start reader threads
131 let reader_handles: Vec<_> = (0..num_readers).map(|reader_id| {
132 let counter_clone = read_counter.clone();
133 let count_clone = read_count.clone();
134
135 thread::spawn(move || {
136 for _ in 0..reads_per_reader {
137 match counter_clone.get() {
138 Ok(value) => {
139 count_clone.fetch_add(1, Ordering::SeqCst);
140 if value % 100 == 0 {
141 println!("📖 Reader {} saw value: {}", reader_id, value);
142 }
143 }
144 Err(e) => eprintln!("Reader {} error: {}", reader_id, e),
145 }
146 thread::sleep(Duration::from_millis(1));
147 }
148 })
149 }).collect();
150
151 // Start writer thread
152 let writer_handle = {
153 let counter_clone = read_counter.clone();
154 thread::spawn(move || {
155 for i in 1..=20 {
156 let new_value = 1000 + i * 50;
157 if let Err(e) = counter_clone.set(new_value) {
158 eprintln!("Writer error: {}", e);
159 } else {
160 println!("✍️ Writer set value to: {}", new_value);
161 }
162 thread::sleep(Duration::from_millis(25));
163 }
164 })
165 };
166
167 // Wait for all threads
168 for handle in reader_handles {
169 if let Err(e) = handle.join() {
170 eprintln!("Reader thread panicked: {:?}", e);
171 }
172 }
173
174 if let Err(e) = writer_handle.join() {
175 eprintln!("Writer thread panicked: {:?}", e);
176 }
177
178 println!("📊 Total successful reads: {}", read_count.load(Ordering::SeqCst));
179 }
180
181 // Cleanup
182 counter.unsubscribe(observer1_id)?;
183 counter.unsubscribe(observer2_id)?;
184 counter.unsubscribe(even_observer_id)?;
185
186 println!("\n✅ Multithreaded example completed successfully!");
187 Ok(())
188}
pub fn set(&self, new_value: T) -> Result<(), PropertyError>
Sets the property to a new value and notifies all observers.
This method will:
1. Acquire a write lock (blocking other readers/writers)
2. Update the value and capture a snapshot of observers
3. Release the lock
4. Notify all observers sequentially with the old and new values
Observer notifications are wrapped in panic recovery to prevent one misbehaving observer from affecting others.
§Arguments
- `new_value` - The new value to set
§Returns
Ok(()) if successful, or Err(PropertyError) if the lock is poisoned.
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new(10);
property.subscribe(Arc::new(|old, new| {
println!("Value changed from {} to {}", old, new);
})).unwrap();
property.set(20).unwrap(); // Triggers observer notification
Examples found in repository?
27 fn set_name(&self, new_name: String) -> Result<(), observable_property::PropertyError> {
28 self.name.set(new_name)
29 }
30
31 fn get_age(&self) -> Result<i32, observable_property::PropertyError> {
32 self.age.get()
33 }
34
35 fn set_age(&self, new_age: i32) -> Result<(), observable_property::PropertyError> {
36 self.age.set(new_age)
37 }
38
39 fn celebrate_birthday(&self) -> Result<(), observable_property::PropertyError> {
40 let current_age = self.age.get()?;
41 self.age.set(current_age + 1)
42 }
43}
44
45fn main() -> Result<(), Box<dyn std::error::Error>> {
46 println!("=== Observable Property Basic Example ===\n");
47
48 // Create a person
49 let person = Person::new("Alice".to_string(), 25);
50
51 // Subscribe to name changes
52 let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53 println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54 }))?;
55
56 // Subscribe to age changes
57 let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58 println!("🎂 Age changed: {} → {}", old_age, new_age);
59 }))?;
60
61 // Subscribe to only significant age changes (milestones)
62 let milestone_observer_id = person.age.subscribe_filtered(
63 Arc::new(|_old_age, new_age| {
64 println!("🎉 Milestone reached! {} is now a special age: {}",
65 if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66 }),
67 |old_age, new_age| {
68 // Notify on milestone ages
69 let milestones = [18, 21, 30, 40, 50, 65];
70 milestones.contains(new_age) ||
71 (milestones.contains(old_age) && !milestones.contains(new_age))
72 }
73 )?;
74
75 println!("Initial state:");
76 println!(" Name: {}", person.get_name()?);
77 println!(" Age: {}\n", person.get_age()?);
78
79 // Demonstrate property changes
80 println!("Making changes...\n");
81
82 // Change name
83 person.set_name("Alice Johnson".to_string())?;
84
85 // Age up a few times
86 person.celebrate_birthday()?;
87 person.celebrate_birthday()?;
88 person.celebrate_birthday()?;
89
90 // Change to milestone age
91 person.set_age(21)?;
92
93 // Direct property access
94 println!("\nDirect property access:");
95 let simple_property = ObservableProperty::new(100);
96
97 simple_property.subscribe(Arc::new(|old, new| {
98 println!("💰 Value changed: {} → {}", old, new);
99 }))?;
100
101 simple_property.set(200)?;
102 simple_property.set(150)?;
103
104 // Cleanup observers
105 person.name.unsubscribe(name_observer_id)?;
106 person.age.unsubscribe(age_observer_id)?;
107 person.age.unsubscribe(milestone_observer_id)?;
108
109 println!("\n✅ Example completed successfully!");
110 Ok(())
111}
More examples
11fn main() -> Result<(), Box<dyn std::error::Error>> {
12 println!("=== Observable Property Multithreaded Example ===\n");
13
14 // Shared counter with multiple observers
15 let counter = Arc::new(ObservableProperty::new(0i32));
16 let notification_count = Arc::new(AtomicUsize::new(0));
17
18 // Add observers that track notifications
19 let count_clone = notification_count.clone();
20 let observer1_id = counter.subscribe(Arc::new(move |old, new| {
21 count_clone.fetch_add(1, Ordering::SeqCst);
22 println!("🔴 Observer 1: {} → {}", old, new);
23 }))?;
24
25 let count_clone = notification_count.clone();
26 let observer2_id = counter.subscribe(Arc::new(move |old, new| {
27 count_clone.fetch_add(1, Ordering::SeqCst);
28 println!("🔵 Observer 2: {} → {}", old, new);
29 }))?;
30
31 // Filtered observer for even numbers only
32 let count_clone = notification_count.clone();
33 let even_observer_id = counter.subscribe_filtered(
34 Arc::new(move |old, new| {
35 count_clone.fetch_add(1, Ordering::SeqCst);
36 println!("🟢 Even Observer: {} → {} (new value is even!)", old, new);
37 }),
38 |_old, new| new % 2 == 0
39 )?;
40
41 println!("📊 Starting concurrent updates...\n");
42
43 // Test 1: Multiple threads updating the counter
44 {
45 let num_threads = 4;
46 let updates_per_thread = 5;
47 let barrier = Arc::new(Barrier::new(num_threads));
48
49 let handles: Vec<_> = (0..num_threads).map(|thread_id| {
50 let counter_clone = counter.clone();
51 let barrier_clone = barrier.clone();
52
53 thread::spawn(move || {
54 // Wait for all threads to be ready
55 barrier_clone.wait();
56
57 for i in 0..updates_per_thread {
58 let new_value = (thread_id * 100 + i) as i32;
59
60 if let Err(e) = counter_clone.set(new_value) {
61 eprintln!("Thread {} failed to set counter: {}", thread_id, e);
62 } else {
63 println!("🧵 Thread {} set counter to {}", thread_id, new_value);
64 }
65
66 // Small delay to see interleaving
67 thread::sleep(Duration::from_millis(10));
68 }
69 })
70 }).collect();
71
72 // Wait for all threads to complete
73 for handle in handles {
74 if let Err(e) = handle.join() {
75 eprintln!("Thread panicked: {:?}", e);
76 }
77 }
78 }
79
80 thread::sleep(Duration::from_millis(100)); // Let notifications settle
81
82 println!("\n📈 Current counter value: {}", counter.get()?);
83 println!("🔔 Total notifications sent: {}\n", notification_count.load(Ordering::SeqCst));
84
85 // Test 2: Performance comparison between sync and async notifications
86 println!("⚡ Performance Test: Sync vs Async notifications\n");
87
88 // Create a property with a slow observer
89 let perf_counter = ObservableProperty::new(0i32);
90 let slow_notifications = Arc::new(AtomicUsize::new(0));
91 let slow_count_clone = slow_notifications.clone();
92
93 perf_counter.subscribe(Arc::new(move |_old, _new| {
94 // Simulate slow observer work
95 thread::sleep(Duration::from_millis(20));
96 slow_count_clone.fetch_add(1, Ordering::SeqCst);
97 }))?;
98
99 // Test synchronous updates (will block)
100 let start = Instant::now();
101 for i in 1..=3 {
102 perf_counter.set(i)?;
103 }
104 let sync_duration = start.elapsed();
105 println!("🐌 Sync updates (3 changes): {:?}", sync_duration);
106
107 // Test asynchronous updates (won't block)
108 let start = Instant::now();
109 for i in 4..=6 {
110 perf_counter.set_async(i)?;
111 }
112 let async_duration = start.elapsed();
113 println!("🚀 Async updates (3 changes): {:?}", async_duration);
114
115 println!("⚡ Speedup: {:.1}x faster",
116 sync_duration.as_secs_f64() / async_duration.as_secs_f64());
117
118 // Wait for async observers to complete
119 thread::sleep(Duration::from_millis(100));
120 println!("🔔 Slow observer notifications: {}\n", slow_notifications.load(Ordering::SeqCst));
121
122 // Test 3: Concurrent reads while updating
123 println!("📚 Concurrent reads test...\n");
124 {
125 let read_counter = Arc::new(ObservableProperty::new(1000i32));
126 let read_count = Arc::new(AtomicUsize::new(0));
127 let num_readers = 8;
128 let reads_per_reader = 50;
129
130 // Start reader threads
131 let reader_handles: Vec<_> = (0..num_readers).map(|reader_id| {
132 let counter_clone = read_counter.clone();
133 let count_clone = read_count.clone();
134
135 thread::spawn(move || {
136 for _ in 0..reads_per_reader {
137 match counter_clone.get() {
138 Ok(value) => {
139 count_clone.fetch_add(1, Ordering::SeqCst);
140 if value % 100 == 0 {
141 println!("📖 Reader {} saw value: {}", reader_id, value);
142 }
143 }
144 Err(e) => eprintln!("Reader {} error: {}", reader_id, e),
145 }
146 thread::sleep(Duration::from_millis(1));
147 }
148 })
149 }).collect();
150
151 // Start writer thread
152 let writer_handle = {
153 let counter_clone = read_counter.clone();
154 thread::spawn(move || {
155 for i in 1..=20 {
156 let new_value = 1000 + i * 50;
157 if let Err(e) = counter_clone.set(new_value) {
158 eprintln!("Writer error: {}", e);
159 } else {
160 println!("✍️ Writer set value to: {}", new_value);
161 }
162 thread::sleep(Duration::from_millis(25));
163 }
164 })
165 };
166
167 // Wait for all threads
168 for handle in reader_handles {
169 if let Err(e) = handle.join() {
170 eprintln!("Reader thread panicked: {:?}", e);
171 }
172 }
173
174 if let Err(e) = writer_handle.join() {
175 eprintln!("Writer thread panicked: {:?}", e);
176 }
177
178 println!("📊 Total successful reads: {}", read_count.load(Ordering::SeqCst));
179 }
180
181 // Cleanup
182 counter.unsubscribe(observer1_id)?;
183 counter.unsubscribe(observer2_id)?;
184 counter.unsubscribe(even_observer_id)?;
185
186 println!("\n✅ Multithreaded example completed successfully!");
187 Ok(())
188}
pub fn set_async(&self, new_value: T) -> Result<(), PropertyError>
Sets the property to a new value and notifies observers asynchronously
This method is similar to set() but spawns observers in background threads
for non-blocking operation. This is useful when observers might perform
time-consuming operations.
Observers are batched into groups and each batch runs in its own thread to limit resource usage while still providing parallelism.
§Arguments
- `new_value` - The new value to set
§Returns
Ok(()) if successful, or Err(PropertyError) if the lock is poisoned.
Note that this only indicates the property was updated successfully;
observer execution happens asynchronously.
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
use std::time::Duration;
let property = ObservableProperty::new(0);
property.subscribe(Arc::new(|old, new| {
// This observer does slow work but won't block the caller
std::thread::sleep(Duration::from_millis(100));
println!("Slow observer: {} -> {}", old, new);
})).unwrap();
// This returns immediately even though observer is slow
property.set_async(42).unwrap();
Examples found in repository?
11fn main() -> Result<(), Box<dyn std::error::Error>> {
12 println!("=== Observable Property Multithreaded Example ===\n");
13
14 // Shared counter with multiple observers
15 let counter = Arc::new(ObservableProperty::new(0i32));
16 let notification_count = Arc::new(AtomicUsize::new(0));
17
18 // Add observers that track notifications
19 let count_clone = notification_count.clone();
20 let observer1_id = counter.subscribe(Arc::new(move |old, new| {
21 count_clone.fetch_add(1, Ordering::SeqCst);
22 println!("🔴 Observer 1: {} → {}", old, new);
23 }))?;
24
25 let count_clone = notification_count.clone();
26 let observer2_id = counter.subscribe(Arc::new(move |old, new| {
27 count_clone.fetch_add(1, Ordering::SeqCst);
28 println!("🔵 Observer 2: {} → {}", old, new);
29 }))?;
30
31 // Filtered observer for even numbers only
32 let count_clone = notification_count.clone();
33 let even_observer_id = counter.subscribe_filtered(
34 Arc::new(move |old, new| {
35 count_clone.fetch_add(1, Ordering::SeqCst);
36 println!("🟢 Even Observer: {} → {} (new value is even!)", old, new);
37 }),
38 |_old, new| new % 2 == 0
39 )?;
40
41 println!("📊 Starting concurrent updates...\n");
42
43 // Test 1: Multiple threads updating the counter
44 {
45 let num_threads = 4;
46 let updates_per_thread = 5;
47 let barrier = Arc::new(Barrier::new(num_threads));
48
49 let handles: Vec<_> = (0..num_threads).map(|thread_id| {
50 let counter_clone = counter.clone();
51 let barrier_clone = barrier.clone();
52
53 thread::spawn(move || {
54 // Wait for all threads to be ready
55 barrier_clone.wait();
56
57 for i in 0..updates_per_thread {
58 let new_value = (thread_id * 100 + i) as i32;
59
60 if let Err(e) = counter_clone.set(new_value) {
61 eprintln!("Thread {} failed to set counter: {}", thread_id, e);
62 } else {
63 println!("🧵 Thread {} set counter to {}", thread_id, new_value);
64 }
65
66 // Small delay to see interleaving
67 thread::sleep(Duration::from_millis(10));
68 }
69 })
70 }).collect();
71
72 // Wait for all threads to complete
73 for handle in handles {
74 if let Err(e) = handle.join() {
75 eprintln!("Thread panicked: {:?}", e);
76 }
77 }
78 }
79
80 thread::sleep(Duration::from_millis(100)); // Let notifications settle
81
82 println!("\n📈 Current counter value: {}", counter.get()?);
83 println!("🔔 Total notifications sent: {}\n", notification_count.load(Ordering::SeqCst));
84
85 // Test 2: Performance comparison between sync and async notifications
86 println!("⚡ Performance Test: Sync vs Async notifications\n");
87
88 // Create a property with a slow observer
89 let perf_counter = ObservableProperty::new(0i32);
90 let slow_notifications = Arc::new(AtomicUsize::new(0));
91 let slow_count_clone = slow_notifications.clone();
92
93 perf_counter.subscribe(Arc::new(move |_old, _new| {
94 // Simulate slow observer work
95 thread::sleep(Duration::from_millis(20));
96 slow_count_clone.fetch_add(1, Ordering::SeqCst);
97 }))?;
98
99 // Test synchronous updates (will block)
100 let start = Instant::now();
101 for i in 1..=3 {
102 perf_counter.set(i)?;
103 }
104 let sync_duration = start.elapsed();
105 println!("🐌 Sync updates (3 changes): {:?}", sync_duration);
106
107 // Test asynchronous updates (won't block)
108 let start = Instant::now();
109 for i in 4..=6 {
110 perf_counter.set_async(i)?;
111 }
112 let async_duration = start.elapsed();
113 println!("🚀 Async updates (3 changes): {:?}", async_duration);
114
115 println!("⚡ Speedup: {:.1}x faster",
116 sync_duration.as_secs_f64() / async_duration.as_secs_f64());
117
118 // Wait for async observers to complete
119 thread::sleep(Duration::from_millis(100));
120 println!("🔔 Slow observer notifications: {}\n", slow_notifications.load(Ordering::SeqCst));
121
122 // Test 3: Concurrent reads while updating
123 println!("📚 Concurrent reads test...\n");
124 {
125 let read_counter = Arc::new(ObservableProperty::new(1000i32));
126 let read_count = Arc::new(AtomicUsize::new(0));
127 let num_readers = 8;
128 let reads_per_reader = 50;
129
130 // Start reader threads
131 let reader_handles: Vec<_> = (0..num_readers).map(|reader_id| {
132 let counter_clone = read_counter.clone();
133 let count_clone = read_count.clone();
134
135 thread::spawn(move || {
136 for _ in 0..reads_per_reader {
137 match counter_clone.get() {
138 Ok(value) => {
139 count_clone.fetch_add(1, Ordering::SeqCst);
140 if value % 100 == 0 {
141 println!("📖 Reader {} saw value: {}", reader_id, value);
142 }
143 }
144 Err(e) => eprintln!("Reader {} error: {}", reader_id, e),
145 }
146 thread::sleep(Duration::from_millis(1));
147 }
148 })
149 }).collect();
150
151 // Start writer thread
152 let writer_handle = {
153 let counter_clone = read_counter.clone();
154 thread::spawn(move || {
155 for i in 1..=20 {
156 let new_value = 1000 + i * 50;
157 if let Err(e) = counter_clone.set(new_value) {
158 eprintln!("Writer error: {}", e);
159 } else {
160 println!("✍️ Writer set value to: {}", new_value);
161 }
162 thread::sleep(Duration::from_millis(25));
163 }
164 })
165 };
166
167 // Wait for all threads
168 for handle in reader_handles {
169 if let Err(e) = handle.join() {
170 eprintln!("Reader thread panicked: {:?}", e);
171 }
172 }
173
174 if let Err(e) = writer_handle.join() {
175 eprintln!("Writer thread panicked: {:?}", e);
176 }
177
178 println!("📊 Total successful reads: {}", read_count.load(Ordering::SeqCst));
179 }
180
181 // Cleanup
182 counter.unsubscribe(observer1_id)?;
183 counter.unsubscribe(observer2_id)?;
184 counter.unsubscribe(even_observer_id)?;
185
186 println!("\n✅ Multithreaded example completed successfully!");
187 Ok(())
188}
pub fn subscribe(
    &self,
    observer: Observer<T>,
) -> Result<ObserverId, PropertyError>
Subscribes an observer function to be called when the property changes
The observer function will be called with the old and new values whenever
the property is modified via set() or set_async().
§Arguments
- `observer` - A function wrapped in `Arc` that takes `(&T, &T)` parameters
§Returns
Ok(ObserverId) containing a unique identifier for this observer,
or Err(PropertyError) if the lock is poisoned.
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new(0);
let observer_id = property.subscribe(Arc::new(|old_value, new_value| {
println!("Property changed from {} to {}", old_value, new_value);
})).unwrap();
// Later, unsubscribe using the returned ID
property.unsubscribe(observer_id).unwrap();
Examples found in repository?
45 fn main() -> Result<(), Box<dyn std::error::Error>> {
46 println!("=== Observable Property Basic Example ===\n");
47
48 // Create a person
49 let person = Person::new("Alice".to_string(), 25);
50
51 // Subscribe to name changes
52 let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53 println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54 }))?;
55
56 // Subscribe to age changes
57 let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58 println!("🎂 Age changed: {} → {}", old_age, new_age);
59 }))?;
60
61 // Subscribe to only significant age changes (milestones)
62 let milestone_observer_id = person.age.subscribe_filtered(
63 Arc::new(|_old_age, new_age| {
64 println!("🎉 Milestone reached! {} is now a special age: {}",
65 if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66 }),
67 |old_age, new_age| {
68 // Notify on milestone ages
69 let milestones = [18, 21, 30, 40, 50, 65];
70 milestones.contains(new_age) ||
71 (milestones.contains(old_age) && !milestones.contains(new_age))
72 }
73 )?;
74
75 println!("Initial state:");
76 println!(" Name: {}", person.get_name()?);
77 println!(" Age: {}\n", person.get_age()?);
78
79 // Demonstrate property changes
80 println!("Making changes...\n");
81
82 // Change name
83 person.set_name("Alice Johnson".to_string())?;
84
85 // Age up a few times
86 person.celebrate_birthday()?;
87 person.celebrate_birthday()?;
88 person.celebrate_birthday()?;
89
90 // Change to milestone age
91 person.set_age(21)?;
92
93 // Direct property access
94 println!("\nDirect property access:");
95 let simple_property = ObservableProperty::new(100);
96
97 simple_property.subscribe(Arc::new(|old, new| {
98 println!("💰 Value changed: {} → {}", old, new);
99 }))?;
100
101 simple_property.set(200)?;
102 simple_property.set(150)?;
103
104 // Cleanup observers
105 person.name.unsubscribe(name_observer_id)?;
106 person.age.unsubscribe(age_observer_id)?;
107 person.age.unsubscribe(milestone_observer_id)?;
108
109 println!("\n✅ Example completed successfully!");
110 Ok(())
111 }
More examples
11 fn main() -> Result<(), Box<dyn std::error::Error>> {
12 println!("=== Observable Property Multithreaded Example ===\n");
13
14 // Shared counter with multiple observers
15 let counter = Arc::new(ObservableProperty::new(0i32));
16 let notification_count = Arc::new(AtomicUsize::new(0));
17
18 // Add observers that track notifications
19 let count_clone = notification_count.clone();
20 let observer1_id = counter.subscribe(Arc::new(move |old, new| {
21 count_clone.fetch_add(1, Ordering::SeqCst);
22 println!("🔴 Observer 1: {} → {}", old, new);
23 }))?;
24
25 let count_clone = notification_count.clone();
26 let observer2_id = counter.subscribe(Arc::new(move |old, new| {
27 count_clone.fetch_add(1, Ordering::SeqCst);
28 println!("🔵 Observer 2: {} → {}", old, new);
29 }))?;
30
31 // Filtered observer for even numbers only
32 let count_clone = notification_count.clone();
33 let even_observer_id = counter.subscribe_filtered(
34 Arc::new(move |old, new| {
35 count_clone.fetch_add(1, Ordering::SeqCst);
36 println!("🟢 Even Observer: {} → {} (new value is even!)", old, new);
37 }),
38 |_old, new| new % 2 == 0
39 )?;
40
41 println!("📊 Starting concurrent updates...\n");
42
43 // Test 1: Multiple threads updating the counter
44 {
45 let num_threads = 4;
46 let updates_per_thread = 5;
47 let barrier = Arc::new(Barrier::new(num_threads));
48
49 let handles: Vec<_> = (0..num_threads).map(|thread_id| {
50 let counter_clone = counter.clone();
51 let barrier_clone = barrier.clone();
52
53 thread::spawn(move || {
54 // Wait for all threads to be ready
55 barrier_clone.wait();
56
57 for i in 0..updates_per_thread {
58 let new_value = (thread_id * 100 + i) as i32;
59
60 if let Err(e) = counter_clone.set(new_value) {
61 eprintln!("Thread {} failed to set counter: {}", thread_id, e);
62 } else {
63 println!("🧵 Thread {} set counter to {}", thread_id, new_value);
64 }
65
66 // Small delay to see interleaving
67 thread::sleep(Duration::from_millis(10));
68 }
69 })
70 }).collect();
71
72 // Wait for all threads to complete
73 for handle in handles {
74 if let Err(e) = handle.join() {
75 eprintln!("Thread panicked: {:?}", e);
76 }
77 }
78 }
79
80 thread::sleep(Duration::from_millis(100)); // Let notifications settle
81
82 println!("\n📈 Current counter value: {}", counter.get()?);
83 println!("🔔 Total notifications sent: {}\n", notification_count.load(Ordering::SeqCst));
84
85 // Test 2: Performance comparison between sync and async notifications
86 println!("⚡ Performance Test: Sync vs Async notifications\n");
87
88 // Create a property with a slow observer
89 let perf_counter = ObservableProperty::new(0i32);
90 let slow_notifications = Arc::new(AtomicUsize::new(0));
91 let slow_count_clone = slow_notifications.clone();
92
93 perf_counter.subscribe(Arc::new(move |_old, _new| {
94 // Simulate slow observer work
95 thread::sleep(Duration::from_millis(20));
96 slow_count_clone.fetch_add(1, Ordering::SeqCst);
97 }))?;
98
99 // Test synchronous updates (will block)
100 let start = Instant::now();
101 for i in 1..=3 {
102 perf_counter.set(i)?;
103 }
104 let sync_duration = start.elapsed();
105 println!("🐌 Sync updates (3 changes): {:?}", sync_duration);
106
107 // Test asynchronous updates (won't block)
108 let start = Instant::now();
109 for i in 4..=6 {
110 perf_counter.set_async(i)?;
111 }
112 let async_duration = start.elapsed();
113 println!("🚀 Async updates (3 changes): {:?}", async_duration);
114
115 println!("⚡ Speedup: {:.1}x faster",
116 sync_duration.as_secs_f64() / async_duration.as_secs_f64());
117
118 // Wait for async observers to complete
119 thread::sleep(Duration::from_millis(100));
120 println!("🔔 Slow observer notifications: {}\n", slow_notifications.load(Ordering::SeqCst));
121
122 // Test 3: Concurrent reads while updating
123 println!("📚 Concurrent reads test...\n");
124 {
125 let read_counter = Arc::new(ObservableProperty::new(1000i32));
126 let read_count = Arc::new(AtomicUsize::new(0));
127 let num_readers = 8;
128 let reads_per_reader = 50;
129
130 // Start reader threads
131 let reader_handles: Vec<_> = (0..num_readers).map(|reader_id| {
132 let counter_clone = read_counter.clone();
133 let count_clone = read_count.clone();
134
135 thread::spawn(move || {
136 for _ in 0..reads_per_reader {
137 match counter_clone.get() {
138 Ok(value) => {
139 count_clone.fetch_add(1, Ordering::SeqCst);
140 if value % 100 == 0 {
141 println!("📖 Reader {} saw value: {}", reader_id, value);
142 }
143 }
144 Err(e) => eprintln!("Reader {} error: {}", reader_id, e),
145 }
146 thread::sleep(Duration::from_millis(1));
147 }
148 })
149 }).collect();
150
151 // Start writer thread
152 let writer_handle = {
153 let counter_clone = read_counter.clone();
154 thread::spawn(move || {
155 for i in 1..=20 {
156 let new_value = 1000 + i * 50;
157 if let Err(e) = counter_clone.set(new_value) {
158 eprintln!("Writer error: {}", e);
159 } else {
160 println!("✍️ Writer set value to: {}", new_value);
161 }
162 thread::sleep(Duration::from_millis(25));
163 }
164 })
165 };
166
167 // Wait for all threads
168 for handle in reader_handles {
169 if let Err(e) = handle.join() {
170 eprintln!("Reader thread panicked: {:?}", e);
171 }
172 }
173
174 if let Err(e) = writer_handle.join() {
175 eprintln!("Writer thread panicked: {:?}", e);
176 }
177
178 println!("📊 Total successful reads: {}", read_count.load(Ordering::SeqCst));
179 }
180
181 // Cleanup
182 counter.unsubscribe(observer1_id)?;
183 counter.unsubscribe(observer2_id)?;
184 counter.unsubscribe(even_observer_id)?;
185
186 println!("\n✅ Multithreaded example completed successfully!");
187 Ok(())
188 }
Source
pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError>
pub fn unsubscribe(&self, id: ObserverId) -> Result<bool, PropertyError>
Removes an observer by its ID
§Arguments
id - The observer ID returned by subscribe()
§Returns
Ok(bool) where true means the observer was found and removed,
false means no observer with that ID existed.
Returns Err(PropertyError) if the lock is poisoned.
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new(0);
let id = property.subscribe(Arc::new(|_, _| {})).unwrap();
let was_removed = property.unsubscribe(id).unwrap();
assert!(was_removed); // Observer existed and was removed
let was_removed_again = property.unsubscribe(id).unwrap();
assert!(!was_removed_again); // Observer no longer exists
Examples found in repository?
45 fn main() -> Result<(), Box<dyn std::error::Error>> {
46 println!("=== Observable Property Basic Example ===\n");
47
48 // Create a person
49 let person = Person::new("Alice".to_string(), 25);
50
51 // Subscribe to name changes
52 let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53 println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54 }))?;
55
56 // Subscribe to age changes
57 let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58 println!("🎂 Age changed: {} → {}", old_age, new_age);
59 }))?;
60
61 // Subscribe to only significant age changes (milestones)
62 let milestone_observer_id = person.age.subscribe_filtered(
63 Arc::new(|_old_age, new_age| {
64 println!("🎉 Milestone reached! {} is now a special age: {}",
65 if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66 }),
67 |old_age, new_age| {
68 // Notify on milestone ages
69 let milestones = [18, 21, 30, 40, 50, 65];
70 milestones.contains(new_age) ||
71 (milestones.contains(old_age) && !milestones.contains(new_age))
72 }
73 )?;
74
75 println!("Initial state:");
76 println!(" Name: {}", person.get_name()?);
77 println!(" Age: {}\n", person.get_age()?);
78
79 // Demonstrate property changes
80 println!("Making changes...\n");
81
82 // Change name
83 person.set_name("Alice Johnson".to_string())?;
84
85 // Age up a few times
86 person.celebrate_birthday()?;
87 person.celebrate_birthday()?;
88 person.celebrate_birthday()?;
89
90 // Change to milestone age
91 person.set_age(21)?;
92
93 // Direct property access
94 println!("\nDirect property access:");
95 let simple_property = ObservableProperty::new(100);
96
97 simple_property.subscribe(Arc::new(|old, new| {
98 println!("💰 Value changed: {} → {}", old, new);
99 }))?;
100
101 simple_property.set(200)?;
102 simple_property.set(150)?;
103
104 // Cleanup observers
105 person.name.unsubscribe(name_observer_id)?;
106 person.age.unsubscribe(age_observer_id)?;
107 person.age.unsubscribe(milestone_observer_id)?;
108
109 println!("\n✅ Example completed successfully!");
110 Ok(())
111 }
More examples
11 fn main() -> Result<(), Box<dyn std::error::Error>> {
12 println!("=== Observable Property Multithreaded Example ===\n");
13
14 // Shared counter with multiple observers
15 let counter = Arc::new(ObservableProperty::new(0i32));
16 let notification_count = Arc::new(AtomicUsize::new(0));
17
18 // Add observers that track notifications
19 let count_clone = notification_count.clone();
20 let observer1_id = counter.subscribe(Arc::new(move |old, new| {
21 count_clone.fetch_add(1, Ordering::SeqCst);
22 println!("🔴 Observer 1: {} → {}", old, new);
23 }))?;
24
25 let count_clone = notification_count.clone();
26 let observer2_id = counter.subscribe(Arc::new(move |old, new| {
27 count_clone.fetch_add(1, Ordering::SeqCst);
28 println!("🔵 Observer 2: {} → {}", old, new);
29 }))?;
30
31 // Filtered observer for even numbers only
32 let count_clone = notification_count.clone();
33 let even_observer_id = counter.subscribe_filtered(
34 Arc::new(move |old, new| {
35 count_clone.fetch_add(1, Ordering::SeqCst);
36 println!("🟢 Even Observer: {} → {} (new value is even!)", old, new);
37 }),
38 |_old, new| new % 2 == 0
39 )?;
40
41 println!("📊 Starting concurrent updates...\n");
42
43 // Test 1: Multiple threads updating the counter
44 {
45 let num_threads = 4;
46 let updates_per_thread = 5;
47 let barrier = Arc::new(Barrier::new(num_threads));
48
49 let handles: Vec<_> = (0..num_threads).map(|thread_id| {
50 let counter_clone = counter.clone();
51 let barrier_clone = barrier.clone();
52
53 thread::spawn(move || {
54 // Wait for all threads to be ready
55 barrier_clone.wait();
56
57 for i in 0..updates_per_thread {
58 let new_value = (thread_id * 100 + i) as i32;
59
60 if let Err(e) = counter_clone.set(new_value) {
61 eprintln!("Thread {} failed to set counter: {}", thread_id, e);
62 } else {
63 println!("🧵 Thread {} set counter to {}", thread_id, new_value);
64 }
65
66 // Small delay to see interleaving
67 thread::sleep(Duration::from_millis(10));
68 }
69 })
70 }).collect();
71
72 // Wait for all threads to complete
73 for handle in handles {
74 if let Err(e) = handle.join() {
75 eprintln!("Thread panicked: {:?}", e);
76 }
77 }
78 }
79
80 thread::sleep(Duration::from_millis(100)); // Let notifications settle
81
82 println!("\n📈 Current counter value: {}", counter.get()?);
83 println!("🔔 Total notifications sent: {}\n", notification_count.load(Ordering::SeqCst));
84
85 // Test 2: Performance comparison between sync and async notifications
86 println!("⚡ Performance Test: Sync vs Async notifications\n");
87
88 // Create a property with a slow observer
89 let perf_counter = ObservableProperty::new(0i32);
90 let slow_notifications = Arc::new(AtomicUsize::new(0));
91 let slow_count_clone = slow_notifications.clone();
92
93 perf_counter.subscribe(Arc::new(move |_old, _new| {
94 // Simulate slow observer work
95 thread::sleep(Duration::from_millis(20));
96 slow_count_clone.fetch_add(1, Ordering::SeqCst);
97 }))?;
98
99 // Test synchronous updates (will block)
100 let start = Instant::now();
101 for i in 1..=3 {
102 perf_counter.set(i)?;
103 }
104 let sync_duration = start.elapsed();
105 println!("🐌 Sync updates (3 changes): {:?}", sync_duration);
106
107 // Test asynchronous updates (won't block)
108 let start = Instant::now();
109 for i in 4..=6 {
110 perf_counter.set_async(i)?;
111 }
112 let async_duration = start.elapsed();
113 println!("🚀 Async updates (3 changes): {:?}", async_duration);
114
115 println!("⚡ Speedup: {:.1}x faster",
116 sync_duration.as_secs_f64() / async_duration.as_secs_f64());
117
118 // Wait for async observers to complete
119 thread::sleep(Duration::from_millis(100));
120 println!("🔔 Slow observer notifications: {}\n", slow_notifications.load(Ordering::SeqCst));
121
122 // Test 3: Concurrent reads while updating
123 println!("📚 Concurrent reads test...\n");
124 {
125 let read_counter = Arc::new(ObservableProperty::new(1000i32));
126 let read_count = Arc::new(AtomicUsize::new(0));
127 let num_readers = 8;
128 let reads_per_reader = 50;
129
130 // Start reader threads
131 let reader_handles: Vec<_> = (0..num_readers).map(|reader_id| {
132 let counter_clone = read_counter.clone();
133 let count_clone = read_count.clone();
134
135 thread::spawn(move || {
136 for _ in 0..reads_per_reader {
137 match counter_clone.get() {
138 Ok(value) => {
139 count_clone.fetch_add(1, Ordering::SeqCst);
140 if value % 100 == 0 {
141 println!("📖 Reader {} saw value: {}", reader_id, value);
142 }
143 }
144 Err(e) => eprintln!("Reader {} error: {}", reader_id, e),
145 }
146 thread::sleep(Duration::from_millis(1));
147 }
148 })
149 }).collect();
150
151 // Start writer thread
152 let writer_handle = {
153 let counter_clone = read_counter.clone();
154 thread::spawn(move || {
155 for i in 1..=20 {
156 let new_value = 1000 + i * 50;
157 if let Err(e) = counter_clone.set(new_value) {
158 eprintln!("Writer error: {}", e);
159 } else {
160 println!("✍️ Writer set value to: {}", new_value);
161 }
162 thread::sleep(Duration::from_millis(25));
163 }
164 })
165 };
166
167 // Wait for all threads
168 for handle in reader_handles {
169 if let Err(e) = handle.join() {
170 eprintln!("Reader thread panicked: {:?}", e);
171 }
172 }
173
174 if let Err(e) = writer_handle.join() {
175 eprintln!("Writer thread panicked: {:?}", e);
176 }
177
178 println!("📊 Total successful reads: {}", read_count.load(Ordering::SeqCst));
179 }
180
181 // Cleanup
182 counter.unsubscribe(observer1_id)?;
183 counter.unsubscribe(observer2_id)?;
184 counter.unsubscribe(even_observer_id)?;
185
186 println!("\n✅ Multithreaded example completed successfully!");
187 Ok(())
188 }
Source
pub fn subscribe_filtered<F>(
&self,
observer: Observer<T>,
filter: F,
) -> Result<ObserverId, PropertyError>
pub fn subscribe_filtered<F>( &self, observer: Observer<T>, filter: F, ) -> Result<ObserverId, PropertyError>
Subscribes an observer that only gets called when a filter condition is met
This is useful for observing only specific types of changes, such as when a value increases or crosses a threshold.
§Arguments
observer - The observer function to call when the filter passes
filter - A predicate function that receives (old_value, new_value) and returns bool
§Returns
Ok(ObserverId) for the filtered observer, or Err(PropertyError) if the lock is poisoned.
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let property = ObservableProperty::new(0);
// Only notify when value increases
let id = property.subscribe_filtered(
Arc::new(|old, new| println!("Value increased: {} -> {}", old, new)),
|old, new| new > old
).unwrap();
property.set(10).unwrap(); // Triggers observer (0 -> 10)
property.set(5).unwrap(); // Does NOT trigger observer (10 -> 5)
property.set(15).unwrap(); // Triggers observer (5 -> 15)
Examples found in repository?
45 fn main() -> Result<(), Box<dyn std::error::Error>> {
46 println!("=== Observable Property Basic Example ===\n");
47
48 // Create a person
49 let person = Person::new("Alice".to_string(), 25);
50
51 // Subscribe to name changes
52 let name_observer_id = person.name.subscribe(Arc::new(|old_name, new_name| {
53 println!("📝 Name changed: '{}' → '{}'", old_name, new_name);
54 }))?;
55
56 // Subscribe to age changes
57 let age_observer_id = person.age.subscribe(Arc::new(|old_age, new_age| {
58 println!("🎂 Age changed: {} → {}", old_age, new_age);
59 }))?;
60
61 // Subscribe to only significant age changes (milestones)
62 let milestone_observer_id = person.age.subscribe_filtered(
63 Arc::new(|_old_age, new_age| {
64 println!("🎉 Milestone reached! {} is now a special age: {}",
65 if *new_age >= 18 { "Adult" } else { "Child" }, new_age);
66 }),
67 |old_age, new_age| {
68 // Notify on milestone ages
69 let milestones = [18, 21, 30, 40, 50, 65];
70 milestones.contains(new_age) ||
71 (milestones.contains(old_age) && !milestones.contains(new_age))
72 }
73 )?;
74
75 println!("Initial state:");
76 println!(" Name: {}", person.get_name()?);
77 println!(" Age: {}\n", person.get_age()?);
78
79 // Demonstrate property changes
80 println!("Making changes...\n");
81
82 // Change name
83 person.set_name("Alice Johnson".to_string())?;
84
85 // Age up a few times
86 person.celebrate_birthday()?;
87 person.celebrate_birthday()?;
88 person.celebrate_birthday()?;
89
90 // Change to milestone age
91 person.set_age(21)?;
92
93 // Direct property access
94 println!("\nDirect property access:");
95 let simple_property = ObservableProperty::new(100);
96
97 simple_property.subscribe(Arc::new(|old, new| {
98 println!("💰 Value changed: {} → {}", old, new);
99 }))?;
100
101 simple_property.set(200)?;
102 simple_property.set(150)?;
103
104 // Cleanup observers
105 person.name.unsubscribe(name_observer_id)?;
106 person.age.unsubscribe(age_observer_id)?;
107 person.age.unsubscribe(milestone_observer_id)?;
108
109 println!("\n✅ Example completed successfully!");
110 Ok(())
111 }
More examples
11 fn main() -> Result<(), Box<dyn std::error::Error>> {
12 println!("=== Observable Property Multithreaded Example ===\n");
13
14 // Shared counter with multiple observers
15 let counter = Arc::new(ObservableProperty::new(0i32));
16 let notification_count = Arc::new(AtomicUsize::new(0));
17
18 // Add observers that track notifications
19 let count_clone = notification_count.clone();
20 let observer1_id = counter.subscribe(Arc::new(move |old, new| {
21 count_clone.fetch_add(1, Ordering::SeqCst);
22 println!("🔴 Observer 1: {} → {}", old, new);
23 }))?;
24
25 let count_clone = notification_count.clone();
26 let observer2_id = counter.subscribe(Arc::new(move |old, new| {
27 count_clone.fetch_add(1, Ordering::SeqCst);
28 println!("🔵 Observer 2: {} → {}", old, new);
29 }))?;
30
31 // Filtered observer for even numbers only
32 let count_clone = notification_count.clone();
33 let even_observer_id = counter.subscribe_filtered(
34 Arc::new(move |old, new| {
35 count_clone.fetch_add(1, Ordering::SeqCst);
36 println!("🟢 Even Observer: {} → {} (new value is even!)", old, new);
37 }),
38 |_old, new| new % 2 == 0
39 )?;
40
41 println!("📊 Starting concurrent updates...\n");
42
43 // Test 1: Multiple threads updating the counter
44 {
45 let num_threads = 4;
46 let updates_per_thread = 5;
47 let barrier = Arc::new(Barrier::new(num_threads));
48
49 let handles: Vec<_> = (0..num_threads).map(|thread_id| {
50 let counter_clone = counter.clone();
51 let barrier_clone = barrier.clone();
52
53 thread::spawn(move || {
54 // Wait for all threads to be ready
55 barrier_clone.wait();
56
57 for i in 0..updates_per_thread {
58 let new_value = (thread_id * 100 + i) as i32;
59
60 if let Err(e) = counter_clone.set(new_value) {
61 eprintln!("Thread {} failed to set counter: {}", thread_id, e);
62 } else {
63 println!("🧵 Thread {} set counter to {}", thread_id, new_value);
64 }
65
66 // Small delay to see interleaving
67 thread::sleep(Duration::from_millis(10));
68 }
69 })
70 }).collect();
71
72 // Wait for all threads to complete
73 for handle in handles {
74 if let Err(e) = handle.join() {
75 eprintln!("Thread panicked: {:?}", e);
76 }
77 }
78 }
79
80 thread::sleep(Duration::from_millis(100)); // Let notifications settle
81
82 println!("\n📈 Current counter value: {}", counter.get()?);
83 println!("🔔 Total notifications sent: {}\n", notification_count.load(Ordering::SeqCst));
84
85 // Test 2: Performance comparison between sync and async notifications
86 println!("⚡ Performance Test: Sync vs Async notifications\n");
87
88 // Create a property with a slow observer
89 let perf_counter = ObservableProperty::new(0i32);
90 let slow_notifications = Arc::new(AtomicUsize::new(0));
91 let slow_count_clone = slow_notifications.clone();
92
93 perf_counter.subscribe(Arc::new(move |_old, _new| {
94 // Simulate slow observer work
95 thread::sleep(Duration::from_millis(20));
96 slow_count_clone.fetch_add(1, Ordering::SeqCst);
97 }))?;
98
99 // Test synchronous updates (will block)
100 let start = Instant::now();
101 for i in 1..=3 {
102 perf_counter.set(i)?;
103 }
104 let sync_duration = start.elapsed();
105 println!("🐌 Sync updates (3 changes): {:?}", sync_duration);
106
107 // Test asynchronous updates (won't block)
108 let start = Instant::now();
109 for i in 4..=6 {
110 perf_counter.set_async(i)?;
111 }
112 let async_duration = start.elapsed();
113 println!("🚀 Async updates (3 changes): {:?}", async_duration);
114
115 println!("⚡ Speedup: {:.1}x faster",
116 sync_duration.as_secs_f64() / async_duration.as_secs_f64());
117
118 // Wait for async observers to complete
119 thread::sleep(Duration::from_millis(100));
120 println!("🔔 Slow observer notifications: {}\n", slow_notifications.load(Ordering::SeqCst));
121
122 // Test 3: Concurrent reads while updating
123 println!("📚 Concurrent reads test...\n");
124 {
125 let read_counter = Arc::new(ObservableProperty::new(1000i32));
126 let read_count = Arc::new(AtomicUsize::new(0));
127 let num_readers = 8;
128 let reads_per_reader = 50;
129
130 // Start reader threads
131 let reader_handles: Vec<_> = (0..num_readers).map(|reader_id| {
132 let counter_clone = read_counter.clone();
133 let count_clone = read_count.clone();
134
135 thread::spawn(move || {
136 for _ in 0..reads_per_reader {
137 match counter_clone.get() {
138 Ok(value) => {
139 count_clone.fetch_add(1, Ordering::SeqCst);
140 if value % 100 == 0 {
141 println!("📖 Reader {} saw value: {}", reader_id, value);
142 }
143 }
144 Err(e) => eprintln!("Reader {} error: {}", reader_id, e),
145 }
146 thread::sleep(Duration::from_millis(1));
147 }
148 })
149 }).collect();
150
151 // Start writer thread
152 let writer_handle = {
153 let counter_clone = read_counter.clone();
154 thread::spawn(move || {
155 for i in 1..=20 {
156 let new_value = 1000 + i * 50;
157 if let Err(e) = counter_clone.set(new_value) {
158 eprintln!("Writer error: {}", e);
159 } else {
160 println!("✍️ Writer set value to: {}", new_value);
161 }
162 thread::sleep(Duration::from_millis(25));
163 }
164 })
165 };
166
167 // Wait for all threads
168 for handle in reader_handles {
169 if let Err(e) = handle.join() {
170 eprintln!("Reader thread panicked: {:?}", e);
171 }
172 }
173
174 if let Err(e) = writer_handle.join() {
175 eprintln!("Writer thread panicked: {:?}", e);
176 }
177
178 println!("📊 Total successful reads: {}", read_count.load(Ordering::SeqCst));
179 }
180
181 // Cleanup
182 counter.unsubscribe(observer1_id)?;
183 counter.unsubscribe(observer2_id)?;
184 counter.unsubscribe(even_observer_id)?;
185
186 println!("\n✅ Multithreaded example completed successfully!");
187 Ok(())
188 }
Trait Implementations§
Source§impl<T: Clone> Clone for ObservableProperty<T>
impl<T: Clone> Clone for ObservableProperty<T>
Source§fn clone(&self) -> Self
fn clone(&self) -> Self
Creates a new reference to the same observable property
This creates a new ObservableProperty instance that shares the same
underlying data with the original. Changes made through either instance
will be visible to observers subscribed through both instances.
§Examples
use observable_property::ObservableProperty;
use std::sync::Arc;
let property1 = ObservableProperty::new(42);
let property2 = property1.clone();
property2.subscribe(Arc::new(|old, new| {
println!("Observer on property2 saw change: {} -> {}", old, new);
})).unwrap();
// This change through property1 will trigger the observer on property2
property1.set(100).unwrap();
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more