// multithreaded/multithreaded.rs

//! Multithreaded example demonstrating thread-safe observable properties
//!
//! Run with: cargo run --example multithreaded

use observable_property::ObservableProperty;
use std::error::Error;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::{Duration, Instant};

11fn main() -> Result<(), Box<dyn std::error::Error>> {
12    println!("=== Observable Property Multithreaded Example ===\n");
13
14    // Shared counter with multiple observers
15    let counter = Arc::new(ObservableProperty::new(0i32));
16    let notification_count = Arc::new(AtomicUsize::new(0));
17
18    // Add observers that track notifications
19    let count_clone = notification_count.clone();
20    let observer1_id = counter.subscribe(Arc::new(move |old, new| {
21        count_clone.fetch_add(1, Ordering::SeqCst);
22        println!("๐Ÿ”ด Observer 1: {} โ†’ {}", old, new);
23    }))?;
24
25    let count_clone = notification_count.clone();
26    let observer2_id = counter.subscribe(Arc::new(move |old, new| {
27        count_clone.fetch_add(1, Ordering::SeqCst);
28        println!("๐Ÿ”ต Observer 2: {} โ†’ {}", old, new);
29    }))?;
30
31    // Filtered observer for even numbers only
32    let count_clone = notification_count.clone();
33    let even_observer_id = counter.subscribe_filtered(
34        Arc::new(move |old, new| {
35            count_clone.fetch_add(1, Ordering::SeqCst);
36            println!("๐ŸŸข Even Observer: {} โ†’ {} (new value is even!)", old, new);
37        }),
38        |_old, new| new % 2 == 0
39    )?;
40
41    println!("๐Ÿ“Š Starting concurrent updates...\n");
42
43    // Test 1: Multiple threads updating the counter
44    {
45        let num_threads = 4;
46        let updates_per_thread = 5;
47        let barrier = Arc::new(Barrier::new(num_threads));
48
49        let handles: Vec<_> = (0..num_threads).map(|thread_id| {
50            let counter_clone = counter.clone();
51            let barrier_clone = barrier.clone();
52
53            thread::spawn(move || {
54                // Wait for all threads to be ready
55                barrier_clone.wait();
56
57                for i in 0..updates_per_thread {
58                    let new_value = (thread_id * 100 + i) as i32;
59                    
60                    if let Err(e) = counter_clone.set(new_value) {
61                        eprintln!("Thread {} failed to set counter: {}", thread_id, e);
62                    } else {
63                        println!("๐Ÿงต Thread {} set counter to {}", thread_id, new_value);
64                    }
65
66                    // Small delay to see interleaving
67                    thread::sleep(Duration::from_millis(10));
68                }
69            })
70        }).collect();
71
72        // Wait for all threads to complete
73        for handle in handles {
74            if let Err(e) = handle.join() {
75                eprintln!("Thread panicked: {:?}", e);
76            }
77        }
78    }
79
80    thread::sleep(Duration::from_millis(100)); // Let notifications settle
81
82    println!("\n๐Ÿ“ˆ Current counter value: {}", counter.get()?);
83    println!("๐Ÿ”” Total notifications sent: {}\n", notification_count.load(Ordering::SeqCst));
84
85    // Test 2: Performance comparison between sync and async notifications
86    println!("โšก Performance Test: Sync vs Async notifications\n");
87
88    // Create a property with a slow observer
89    let perf_counter = ObservableProperty::new(0i32);
90    let slow_notifications = Arc::new(AtomicUsize::new(0));
91    let slow_count_clone = slow_notifications.clone();
92
93    perf_counter.subscribe(Arc::new(move |_old, _new| {
94        // Simulate slow observer work
95        thread::sleep(Duration::from_millis(20));
96        slow_count_clone.fetch_add(1, Ordering::SeqCst);
97    }))?;
98
99    // Test synchronous updates (will block)
100    let start = Instant::now();
101    for i in 1..=3 {
102        perf_counter.set(i)?;
103    }
104    let sync_duration = start.elapsed();
105    println!("๐ŸŒ Sync updates (3 changes): {:?}", sync_duration);
106
107    // Test asynchronous updates (won't block)
108    let start = Instant::now();
109    for i in 4..=6 {
110        perf_counter.set_async(i)?;
111    }
112    let async_duration = start.elapsed();
113    println!("๐Ÿš€ Async updates (3 changes): {:?}", async_duration);
114
115    println!("โšก Speedup: {:.1}x faster", 
116             sync_duration.as_secs_f64() / async_duration.as_secs_f64());
117
118    // Wait for async observers to complete
119    thread::sleep(Duration::from_millis(100));
120    println!("๐Ÿ”” Slow observer notifications: {}\n", slow_notifications.load(Ordering::SeqCst));
121
122    // Test 3: Concurrent reads while updating
123    println!("๐Ÿ“š Concurrent reads test...\n");
124    {
125        let read_counter = Arc::new(ObservableProperty::new(1000i32));
126        let read_count = Arc::new(AtomicUsize::new(0));
127        let num_readers = 8;
128        let reads_per_reader = 50;
129
130        // Start reader threads
131        let reader_handles: Vec<_> = (0..num_readers).map(|reader_id| {
132            let counter_clone = read_counter.clone();
133            let count_clone = read_count.clone();
134
135            thread::spawn(move || {
136                for _ in 0..reads_per_reader {
137                    match counter_clone.get() {
138                        Ok(value) => {
139                            count_clone.fetch_add(1, Ordering::SeqCst);
140                            if value % 100 == 0 {
141                                println!("๐Ÿ“– Reader {} saw value: {}", reader_id, value);
142                            }
143                        }
144                        Err(e) => eprintln!("Reader {} error: {}", reader_id, e),
145                    }
146                    thread::sleep(Duration::from_millis(1));
147                }
148            })
149        }).collect();
150
151        // Start writer thread
152        let writer_handle = {
153            let counter_clone = read_counter.clone();
154            thread::spawn(move || {
155                for i in 1..=20 {
156                    let new_value = 1000 + i * 50;
157                    if let Err(e) = counter_clone.set(new_value) {
158                        eprintln!("Writer error: {}", e);
159                    } else {
160                        println!("โœ๏ธ  Writer set value to: {}", new_value);
161                    }
162                    thread::sleep(Duration::from_millis(25));
163                }
164            })
165        };
166
167        // Wait for all threads
168        for handle in reader_handles {
169            if let Err(e) = handle.join() {
170                eprintln!("Reader thread panicked: {:?}", e);
171            }
172        }
173
174        if let Err(e) = writer_handle.join() {
175            eprintln!("Writer thread panicked: {:?}", e);
176        }
177
178        println!("๐Ÿ“Š Total successful reads: {}", read_count.load(Ordering::SeqCst));
179    }
180
181    // Cleanup
182    counter.unsubscribe(observer1_id)?;
183    counter.unsubscribe(observer2_id)?;
184    counter.unsubscribe(even_observer_id)?;
185
186    println!("\nโœ… Multithreaded example completed successfully!");
187    Ok(())
188}