multithreaded/
multithreaded.rs1use observable_property::ObservableProperty;
6use std::sync::{Arc, Barrier};
7use std::thread;
8use std::time::{Duration, Instant};
9use std::sync::atomic::{AtomicUsize, Ordering};
10
11fn main() -> Result<(), Box<dyn std::error::Error>> {
12 println!("=== Observable Property Multithreaded Example ===\n");
13
14 let counter = Arc::new(ObservableProperty::new(0i32));
16 let notification_count = Arc::new(AtomicUsize::new(0));
17
18 let count_clone = notification_count.clone();
20 let observer1_id = counter.subscribe(Arc::new(move |old, new| {
21 count_clone.fetch_add(1, Ordering::SeqCst);
22 println!("๐ด Observer 1: {} โ {}", old, new);
23 }))?;
24
25 let count_clone = notification_count.clone();
26 let observer2_id = counter.subscribe(Arc::new(move |old, new| {
27 count_clone.fetch_add(1, Ordering::SeqCst);
28 println!("๐ต Observer 2: {} โ {}", old, new);
29 }))?;
30
31 let count_clone = notification_count.clone();
33 let even_observer_id = counter.subscribe_filtered(
34 Arc::new(move |old, new| {
35 count_clone.fetch_add(1, Ordering::SeqCst);
36 println!("๐ข Even Observer: {} โ {} (new value is even!)", old, new);
37 }),
38 |_old, new| new % 2 == 0
39 )?;
40
41 println!("๐ Starting concurrent updates...\n");
42
43 {
45 let num_threads = 4;
46 let updates_per_thread = 5;
47 let barrier = Arc::new(Barrier::new(num_threads));
48
49 let handles: Vec<_> = (0..num_threads).map(|thread_id| {
50 let counter_clone = counter.clone();
51 let barrier_clone = barrier.clone();
52
53 thread::spawn(move || {
54 barrier_clone.wait();
56
57 for i in 0..updates_per_thread {
58 let new_value = (thread_id * 100 + i) as i32;
59
60 if let Err(e) = counter_clone.set(new_value) {
61 eprintln!("Thread {} failed to set counter: {}", thread_id, e);
62 } else {
63 println!("๐งต Thread {} set counter to {}", thread_id, new_value);
64 }
65
66 thread::sleep(Duration::from_millis(10));
68 }
69 })
70 }).collect();
71
72 for handle in handles {
74 if let Err(e) = handle.join() {
75 eprintln!("Thread panicked: {:?}", e);
76 }
77 }
78 }
79
80 thread::sleep(Duration::from_millis(100)); println!("\n๐ Current counter value: {}", counter.get()?);
83 println!("๐ Total notifications sent: {}\n", notification_count.load(Ordering::SeqCst));
84
85 println!("โก Performance Test: Sync vs Async notifications\n");
87
88 let perf_counter = ObservableProperty::new(0i32);
90 let slow_notifications = Arc::new(AtomicUsize::new(0));
91 let slow_count_clone = slow_notifications.clone();
92
93 perf_counter.subscribe(Arc::new(move |_old, _new| {
94 thread::sleep(Duration::from_millis(20));
96 slow_count_clone.fetch_add(1, Ordering::SeqCst);
97 }))?;
98
99 let start = Instant::now();
101 for i in 1..=3 {
102 perf_counter.set(i)?;
103 }
104 let sync_duration = start.elapsed();
105 println!("๐ Sync updates (3 changes): {:?}", sync_duration);
106
107 let start = Instant::now();
109 for i in 4..=6 {
110 perf_counter.set_async(i)?;
111 }
112 let async_duration = start.elapsed();
113 println!("๐ Async updates (3 changes): {:?}", async_duration);
114
115 println!("โก Speedup: {:.1}x faster",
116 sync_duration.as_secs_f64() / async_duration.as_secs_f64());
117
118 thread::sleep(Duration::from_millis(100));
120 println!("๐ Slow observer notifications: {}\n", slow_notifications.load(Ordering::SeqCst));
121
122 println!("๐ Concurrent reads test...\n");
124 {
125 let read_counter = Arc::new(ObservableProperty::new(1000i32));
126 let read_count = Arc::new(AtomicUsize::new(0));
127 let num_readers = 8;
128 let reads_per_reader = 50;
129
130 let reader_handles: Vec<_> = (0..num_readers).map(|reader_id| {
132 let counter_clone = read_counter.clone();
133 let count_clone = read_count.clone();
134
135 thread::spawn(move || {
136 for _ in 0..reads_per_reader {
137 match counter_clone.get() {
138 Ok(value) => {
139 count_clone.fetch_add(1, Ordering::SeqCst);
140 if value % 100 == 0 {
141 println!("๐ Reader {} saw value: {}", reader_id, value);
142 }
143 }
144 Err(e) => eprintln!("Reader {} error: {}", reader_id, e),
145 }
146 thread::sleep(Duration::from_millis(1));
147 }
148 })
149 }).collect();
150
151 let writer_handle = {
153 let counter_clone = read_counter.clone();
154 thread::spawn(move || {
155 for i in 1..=20 {
156 let new_value = 1000 + i * 50;
157 if let Err(e) = counter_clone.set(new_value) {
158 eprintln!("Writer error: {}", e);
159 } else {
160 println!("โ๏ธ Writer set value to: {}", new_value);
161 }
162 thread::sleep(Duration::from_millis(25));
163 }
164 })
165 };
166
167 for handle in reader_handles {
169 if let Err(e) = handle.join() {
170 eprintln!("Reader thread panicked: {:?}", e);
171 }
172 }
173
174 if let Err(e) = writer_handle.join() {
175 eprintln!("Writer thread panicked: {:?}", e);
176 }
177
178 println!("๐ Total successful reads: {}", read_count.load(Ordering::SeqCst));
179 }
180
181 counter.unsubscribe(observer1_id)?;
183 counter.unsubscribe(observer2_id)?;
184 counter.unsubscribe(even_observer_id)?;
185
186 println!("\nโ
Multithreaded example completed successfully!");
187 Ok(())
188}