thread_share/
enhanced.rs

1//! # Enhanced Module - EnhancedThreadShare<T>
2//!
3//! This module provides `EnhancedThreadShare<T>`, a powerful extension of `ThreadShare<T>`
4//! that adds automatic thread management capabilities.
5//!
6//! ## 🚀 Overview
7//!
8//! `EnhancedThreadShare<T>` eliminates the need for manual thread management by providing:
9//!
10//! - **Automatic Thread Spawning**: Spawn threads with a single method call
//! - **Built-in Thread Tracking**: Keep a named handle for every spawned thread and query how many are still waiting to be joined
12//! - **Automatic Thread Joining**: Wait for all threads to complete with `join_all()`
13//! - **Thread Naming**: Give meaningful names to threads for debugging
14//! - **All ThreadShare Features**: Inherits all capabilities from `ThreadShare<T>`
15//!
16//! ## Key Benefits
17//!
18//! ### 🎯 Simplified Thread Management
19//! ```rust
20//! // Old way: Manual thread management
21//! use thread_share::share;
22//! use std::thread;
23//!
24//! let data = share!(vec![1, 2, 3]);
25//! let clone1 = data.clone();
26//! let clone2 = data.clone();
27//!
28//! let handle1 = thread::spawn(move || { /* logic */ });
29//! let handle2 = thread::spawn(move || { /* logic */ });
30//!
31//! handle1.join().expect("Failed to join");
32//! handle2.join().expect("Failed to join");
33//!
34//! // New way: Enhanced thread management
35//! use thread_share::enhanced_share;
36//!
37//! let enhanced = enhanced_share!(vec![1, 2, 3]);
38//!
39//! enhanced.spawn("worker1", |data| { /* logic */ });
40//! enhanced.spawn("worker2", |data| { /* logic */ });
41//!
42//! enhanced.join_all().expect("Failed to join");
43//! ```
44//!
45//! ### 📊 Real-time Monitoring
46//! ```rust
47//! use thread_share::enhanced_share;
48//!
49//! let enhanced = enhanced_share!(vec![1, 2, 3]);
50//!
51//! enhanced.spawn("processor", |data| { /* logic */ });
52//! enhanced.spawn("validator", |data| { /* logic */ });
53//!
54//! println!("Active threads: {}", enhanced.active_threads());
55//!
56//! // Wait for completion
57//! enhanced.join_all().expect("Failed to join");
58//!
59//! assert!(enhanced.is_complete());
60//! ```
61//!
62//! ## Architecture
63//!
64//! `EnhancedThreadShare<T>` wraps a `ThreadShare<T>` and adds:
65//!
66//! - **`inner: ThreadShare<T>`** - The underlying shared data
67//! - **`threads: Arc<Mutex<HashMap<String, JoinHandle<()>>>>`** - Thread tracking
68//!
69//! ## Thread Lifecycle
70//!
71//! 1. **Creation**: `EnhancedThreadShare::new(data)` or `enhanced_share!(data)`
72//! 2. **Spawning**: `enhanced.spawn(name, function)` creates named threads
73//! 3. **Execution**: Threads run with access to shared data
//! 4. **Monitoring**: `active_threads()` reports how many spawned threads have not been joined yet
75//! 5. **Completion**: Wait for all threads with `join_all()`
76//!
77//! ## Example Usage
78//!
79//! ### Basic Thread Management
80//! ```rust
81//! use thread_share::{enhanced_share, spawn_workers};
82//!
83//! let data = enhanced_share!(vec![1, 2, 3]);
84//!
85//! // Spawn individual threads
86//! data.spawn("sorter", |data| {
87//!     data.update(|v| v.sort());
88//! });
89//!
90//! data.spawn("validator", |data| {
91//!     assert!(data.get().is_sorted());
92//! });
93//!
94//! // Wait for completion
95//! data.join_all().expect("Failed to join");
96//! ```
97//!
98//! ### Using Macros
99//! ```rust
100//! use thread_share::share;
101//!
102//! let data = share!(String::from("Hello"));
103//! let clone = data.clone();
104//!
105//! // Spawn a simple thread
106//! std::thread::spawn(move || {
107//!     clone.update(|s| s.push_str(" World"));
108//! });
109//!
110//! // Wait a bit and check result
111//! std::thread::sleep(std::time::Duration::from_millis(100));
112//! println!("Updated: {}", data.get());
113//! ```
114//!
115//! ### Real-world Example
116//! ```rust
117//! use thread_share::share;
118//! use std::time::Duration;
119//!
120//! #[derive(Clone)]
121//! struct Server {
122//!     port: u16,
123//!     is_running: bool,
124//!     connections: u32,
125//! }
126//!
127//! let server = share!(Server {
128//!     port: 8080,
129//!     is_running: false,
130//!     connections: 0,
131//! });
132//!
133//! let server_clone = server.clone();
134//!
135//! // Spawn a simple server thread
136//! std::thread::spawn(move || {
137//!     server_clone.update(|s| {
138//!         s.is_running = true;
139//!         s.connections = 5;
140//!     });
141//! });
142//!
143//! // Wait a bit and check result
144//! std::thread::sleep(Duration::from_millis(100));
145//! let final_state = server.get();
146//! println!("Server running: {}, connections: {}", final_state.is_running, final_state.connections);
147//! ```
148//!
149//! ## Performance Characteristics
150//!
151//! - **Thread Spawning**: Minimal overhead over standard `thread::spawn`
152//! - **Thread Tracking**: Constant-time operations for thread management
153//! - **Memory Usage**: Small overhead for thread tracking structures
154//! - **Scalability**: Efficient for up to hundreds of threads
155//!
156//! ## Best Practices
157//!
158//! 1. **Use descriptive thread names** for easier debugging
159//! 2. **Keep thread functions focused** on single responsibilities
160//! 3. **Always call `join_all()`** to ensure proper cleanup
161//! 4. **Monitor thread count** with `active_threads()` for debugging
162//! 5. **Handle errors gracefully** from `join_all()` and `spawn()`
163//!
164//! ## Error Handling
165//!
166//! ```rust
167//! use thread_share::share;
168//!
169//! let data = share!(0);
170//! let clone = data.clone();
171//!
172//! // Spawn thread with error handling
173//! let handle = std::thread::spawn(move || {
174//!     clone.update(|x| *x = *x + 1);
175//! });
176//!
177//! // Handle join errors
178//! if let Err(e) = handle.join() {
179//!     eprintln!("Thread execution failed: {:?}", e);
180//! }
181//! ```
182//!
183//! ## Thread Safety
184//!
185//! `EnhancedThreadShare<T>` automatically implements `Send` and `Sync` traits
186//! when `T` implements them, making it safe to use across thread boundaries.
187//!
188//! ## Integration with Macros
189//!
190//! This module works seamlessly with the library's macros:
191//!
192//! - **`enhanced_share!`** - Creates `EnhancedThreadShare<T>` instances
193//! - **`spawn_workers!`** - Spawns multiple threads with single macro call
194//!
195//! ## Comparison with Manual Thread Management
196//!
197//! | Aspect | Manual Management | EnhancedThreadShare |
198//! |--------|-------------------|-------------------|
199//! | **Thread Creation** | `thread::spawn()` calls | `enhanced.spawn()` |
200//! | **Thread Tracking** | Manual `JoinHandle` storage | Automatic tracking |
201//! | **Thread Joining** | Manual `join()` calls | `join_all()` |
202//! | **Error Handling** | Per-thread error handling | Centralized error handling |
203//! | **Debugging** | No thread identification | Named threads |
204//! | **Code Complexity** | High | Low |
205
206use std::collections::HashMap;
207use std::sync::{Arc, Mutex};
208use std::thread::{self};
209use crate::core::ThreadShare;
210
211#[cfg(feature = "serialize")]
212use serde::{Serialize, Deserialize};
213
214/// Enhanced ThreadShare with built-in thread management
215///
216/// `EnhancedThreadShare<T>` extends `ThreadShare<T>` with automatic thread management
217/// capabilities, eliminating the need for manual thread spawning and joining.
218///
219/// ## Key Features
220///
221/// - **Automatic Thread Spawning**: Spawn threads with a single method call
222/// - **Built-in Thread Tracking**: Monitor active thread count and status
223/// - **Automatic Thread Joining**: Wait for all threads to complete with `join_all()`
224/// - **Thread Naming**: Give meaningful names to threads for debugging
225/// - **All ThreadShare Features**: Inherits all capabilities from `ThreadShare<T>`
226///
227/// ## Example
228///
229/// ```rust
230/// use thread_share::{enhanced_share, spawn_workers};
231///
232/// let data = enhanced_share!(vec![1, 2, 3]);
233///
234/// // Spawn individual threads
235/// data.spawn("sorter", |data| {
236///     data.update(|v| v.sort());
237/// });
238///
239/// data.spawn("validator", |data| {
240///     assert!(data.get().is_sorted());
241/// });
242///
243/// // Wait for completion
244/// data.join_all().expect("Failed to join");
245/// ```
246///
247/// ## Thread Lifecycle
248///
249/// 1. **Creation**: `EnhancedThreadShare::new(data)` or `enhanced_share!(data)`
250/// 2. **Spawning**: `enhanced.spawn(name, function)` creates named threads
251/// 3. **Execution**: Threads run with access to shared data
252/// 4. **Monitoring**: Track active threads with `active_threads()`
253/// 5. **Completion**: Wait for all threads with `join_all()`
254///
255/// ## Performance
256///
257/// - **Thread Spawning**: Minimal overhead over standard `thread::spawn`
258/// - **Thread Tracking**: Constant-time operations for thread management
259/// - **Memory Usage**: Small overhead for thread tracking structures
260/// - **Scalability**: Efficient for up to hundreds of threads
261pub struct EnhancedThreadShare<T> {
262    inner: ThreadShare<T>,
263    threads: Arc<Mutex<HashMap<String, thread::JoinHandle<()>>>>,
264}
265
266impl<T> EnhancedThreadShare<T> {
267    /// Creates a new EnhancedThreadShare
268    ///
269    /// This method creates a new `EnhancedThreadShare<T>` instance with the provided data.
270    /// The data is wrapped in a `ThreadShare<T>` for safe sharing between threads.
271    ///
272    /// ## Arguments
273    ///
274    /// * `data` - The initial data to share between threads
275    ///
276    /// ## Returns
277    ///
278    /// A new `EnhancedThreadShare<T>` instance.
279    ///
280    /// ## Example
281    ///
282    /// ```rust
283    /// use thread_share::EnhancedThreadShare;
284    ///
285    /// let enhanced = EnhancedThreadShare::new(0);
286    /// let enhanced = EnhancedThreadShare::new(String::from("Hello"));
287    /// let enhanced = EnhancedThreadShare::new(vec![1, 2, 3]);
288    /// ```
289    pub fn new(data: T) -> Self {
290        Self {
291            inner: ThreadShare::new(data),
292            threads: Arc::new(Mutex::new(HashMap::new())),
293        }
294    }
295
296    /// Spawns a thread with access to this shared data
297    ///
298    /// This method creates a new thread with the given name and function.
299    /// The thread receives a clone of the shared data and can safely modify it.
300    ///
301    /// ## Arguments
302    ///
303    /// * `name` - A descriptive name for the thread (useful for debugging)
304    /// * `f` - A function that receives `ThreadShare<T>` and performs the thread's work
305    ///
306    /// ## Requirements
307    ///
308    /// The function `F` must:
309    /// - Implement `FnOnce(ThreadShare<T>)` - called once with shared data
310    /// - Implement `Send` - safe to send across thread boundaries
311    /// - Have `'static` lifetime - no borrowed references
312    ///
313    /// The type `T` must implement `Send + Sync + 'static`.
314    ///
315    /// ## Returns
316    ///
317    /// `Ok(())` on success, `Err(String)` if thread spawning fails.
318    ///
319    /// ## Example
320    ///
321    /// ```rust
322    /// use thread_share::EnhancedThreadShare;
323    ///
324    /// let enhanced = EnhancedThreadShare::new(0);
325    ///
326    /// // Spawn a worker thread
327    /// enhanced.spawn("worker", |data| {
328    ///     for _ in 0..100 {
329    ///         data.update(|x| *x += 1);
330    ///         std::thread::sleep(std::time::Duration::from_millis(10));
331    ///     }
332    /// }).expect("Failed to spawn worker");
333    ///
334    /// // Spawn a monitor thread
335    /// enhanced.spawn("monitor", |data| {
336    ///     for _ in 0..10 {
337    ///         std::thread::sleep(std::time::Duration::from_millis(100));
338    ///         println!("Current value: {}", data.get());
339    ///     }
340    /// }).expect("Failed to spawn monitor");
341    /// ```
342    pub fn spawn<F>(&self, name: &str, f: F) -> Result<(), String>
343    where
344        F: FnOnce(ThreadShare<T>) + Send + 'static,
345        T: Send + Sync + 'static,
346    {
347        let thread_name = name.to_string();
348        let thread_data = self.inner.clone();
349
350        let handle = thread::spawn(move || {
351            f(thread_data);
352        });
353
354        self.threads.lock().unwrap().insert(thread_name, handle);
355        Ok(())
356    }
357
358    /// Spawns multiple threads with different names and functions
359    ///
360    /// This method spawns multiple threads from a vector of configurations.
361    /// Each configuration contains a thread name and a function.
362    ///
363    /// ## Arguments
364    ///
365    /// * `thread_configs` - Vector of `(name, function)` tuples
366    ///
367    /// ## Requirements
368    ///
369    /// The function `F` must implement `Clone` in addition to the standard requirements.
370    ///
371    /// ## Returns
372    ///
373    /// `Ok(())` on success, `Err(String)` if any thread spawning fails.
374    /// ```
375    pub fn spawn_multiple<F>(&self, thread_configs: Vec<(&str, F)>) -> Result<(), String>
376    where
377        F: FnOnce(ThreadShare<T>) + Send + Clone + 'static,
378        T: Send + Sync + 'static,
379    {
380        for (name, func) in thread_configs {
381            self.spawn(name, func)?;
382        }
383        Ok(())
384    }
385
386    /// Spawns multiple threads with boxed closures
387    ///
388    /// This method spawns multiple threads using boxed closures, which allows
389    /// for different function types in the same vector.
390    ///
391    /// ## Arguments
392    ///
393    /// * `thread_configs` - Vector of `(name, boxed_function)` tuples
394    ///
395    /// ## Returns
396    ///
397    /// `Ok(())` on success, `Err(String)` if any thread spawning fails.
398    ///
399    /// ## Example
400    ///
401    /// ```rust
402    /// use thread_share::EnhancedThreadShare;
403    ///
404    /// let enhanced = EnhancedThreadShare::new(0);
405    ///
406    /// let configs = vec![
407    ///     ("worker1", Box::new(|data: thread_share::ThreadShare<i32>| { data.update(|x| *x = *x + 1); }) as Box<dyn FnOnce(thread_share::ThreadShare<i32>) + Send>),
408    ///     ("worker2", Box::new(|data: thread_share::ThreadShare<i32>| { data.update(|x| *x = *x + 2); }) as Box<dyn FnOnce(thread_share::ThreadShare<i32>) + Send>),
409    /// ];
410    ///
411    /// enhanced.spawn_multiple_boxed(configs).expect("Failed to spawn threads");
412    /// ```
413    pub fn spawn_multiple_boxed(
414        &self,
415        thread_configs: Vec<(&str, Box<dyn FnOnce(ThreadShare<T>) + Send>)>,
416    ) -> Result<(), String>
417    where
418        T: Send + Sync + 'static,
419    {
420        for (name, func) in thread_configs {
421            let thread_data = self.inner.clone();
422            let handle = thread::spawn(move || {
423                func(thread_data);
424            });
425            self.threads
426                .lock()
427                .unwrap()
428                .insert(name.to_string(), handle);
429        }
430        Ok(())
431    }
432
433    /// Waits for all spawned threads to complete
434    ///
435    /// This method blocks until all spawned threads have finished execution.
436    /// It joins each thread and returns an error if any thread panics.
437    ///
438    /// ## Returns
439    ///
440    /// `Ok(())` when all threads complete successfully, `Err(String)` if any thread fails.
441    ///
442    /// ## Example
443    ///
444    /// ```rust
445    /// use thread_share::EnhancedThreadShare;
446    ///
447    /// let enhanced = EnhancedThreadShare::new(0);
448    ///
449    /// enhanced.spawn("worker", |data| {
450    ///     data.update(|x| *x = *x + 100);
451    /// }).expect("Failed to spawn worker");
452    ///
453    /// // Wait for all threads to complete
454    /// enhanced.join_all().expect("Thread execution failed");
455    ///
456    /// // Now safe to access the final result
457    /// assert_eq!(enhanced.get(), 100);
458    /// ```
459    pub fn join_all(&self) -> Result<(), String> {
460        let mut threads = self.threads.lock().unwrap();
461        let thread_handles: Vec<_> = threads.drain().collect();
462        drop(threads);
463
464        for (name, handle) in thread_handles {
465            let result = handle.join();
466            if let Err(e) = result {
467                return Err(format!("Thread '{}' failed: {:?}", name, e));
468            }
469        }
470        Ok(())
471    }
472
473    /// Gets the number of active threads
474    ///
475    /// This method returns the current number of threads that are still running.
476    ///
477    /// ## Returns
478    ///
479    /// The number of active threads.
480    ///
481    /// ## Example
482    ///
483    /// ```rust
484    /// use thread_share::EnhancedThreadShare;
485    /// use std::time::Duration;
486    ///
487    /// let enhanced = EnhancedThreadShare::new(0);
488    ///
489    /// enhanced.spawn("worker", |data| {
490    ///     std::thread::sleep(Duration::from_millis(100));
491    /// }).expect("Failed to spawn worker");
492    ///
493    /// println!("Active threads: {}", enhanced.active_threads()); // Prints: 1
494    ///
495    /// // Wait for completion
496    /// enhanced.join_all().expect("Failed to join");
497    ///
498    /// println!("Active threads: {}", enhanced.active_threads()); // Prints: 0
499    /// ```
500    pub fn active_threads(&self) -> usize {
501        self.threads.lock().unwrap().len()
502    }
503
504    /// Checks if all threads have completed
505    ///
506    /// This method returns `true` if there are no active threads, `false` otherwise.
507    ///
508    /// ## Returns
509    ///
510    /// `true` if all threads have completed, `false` if any threads are still running.
511    ///
512    /// ## Example
513    ///
514    /// ```rust
515    /// use thread_share::EnhancedThreadShare;
516    ///
517    /// let enhanced = EnhancedThreadShare::new(0);
518    ///
519    /// enhanced.spawn("worker", |data| {
520    ///     data.update(|x| *x = *x + 1);
521    /// }).expect("Failed to spawn worker");
522    ///
523    /// assert!(!enhanced.is_complete()); // Thread is still running
524    ///
525    /// enhanced.join_all().expect("Failed to join");
526    ///
527    /// assert!(enhanced.is_complete()); // All threads completed
528    /// ```
529    pub fn is_complete(&self) -> bool {
530        self.threads.lock().unwrap().is_empty()
531    }
532
533    /// Gets a reference to the threads HashMap for external management
534    pub fn get_threads(&self) -> Arc<Mutex<HashMap<String, thread::JoinHandle<()>>>> {
535        self.threads.clone()
536    }
537
538    /// Delegates all ThreadShare methods
539    ///
540    /// Gets a copy of the shared data.
541    ///
542    /// ## Requirements
543    ///
544    /// The type `T` must implement `Clone` trait.
545    ///
546    /// ## Returns
547    ///
548    /// A copy of the current data.
549    ///
550    /// ## Example
551    ///
552    /// ```rust
553    /// use thread_share::EnhancedThreadShare;
554    ///
555    /// let enhanced = EnhancedThreadShare::new(42);
556    /// let value = enhanced.get();
557    /// assert_eq!(value, 42);
558    /// ```
559    pub fn get(&self) -> T
560    where
561        T: Clone,
562    {
563        self.inner.get()
564    }
565
566    /// Sets new data and notifies waiting threads
567    ///
568    /// This method replaces the current data and notifies all threads
569    /// waiting for changes.
570    ///
571    /// ## Arguments
572    ///
573    /// * `new_data` - The new data to set
574    ///
575    /// ## Example
576    ///
577    /// ```rust
578    /// use thread_share::EnhancedThreadShare;
579    ///
580    /// let enhanced = EnhancedThreadShare::new(0);
581    /// enhanced.set(100);
582    /// assert_eq!(enhanced.get(), 100);
583    /// ```
584    pub fn set(&self, new_data: T) {
585        self.inner.set(new_data);
586    }
587
588    pub fn update<F>(&self, f: F)
589    where
590        F: FnOnce(&mut T),
591    {
592        self.inner.update(f);
593    }
594
595    pub fn read<F, R>(&self, f: F) -> R
596    where
597        F: FnOnce(&T) -> R,
598    {
599        self.inner.read(f)
600    }
601
602    pub fn write<F, R>(&self, f: F) -> R
603    where
604        F: FnOnce(&mut T) -> R,
605    {
606        self.inner.write(f)
607    }
608
609    pub fn wait_for_change(&self, timeout: std::time::Duration) -> bool {
610        self.inner.wait_for_change(timeout)
611    }
612
613    pub fn wait_for_change_forever(&self) {
614        self.inner.wait_for_change_forever();
615    }
616
617    pub fn clone(&self) -> Self {
618        Self {
619            inner: self.inner.clone(),
620            threads: Arc::new(Mutex::new(HashMap::new())),
621        }
622    }
623}
624
625impl<T> Clone for EnhancedThreadShare<T> {
626    fn clone(&self) -> Self {
627        self.clone()
628    }
629}
630
#[cfg(feature = "serialize")]
impl<T: Serialize + Clone + for<'de> Deserialize<'de>> EnhancedThreadShare<T> {
    /// Renders the current shared value as a JSON string.
    ///
    /// A copy of the value is taken first and then serialized with
    /// `serde_json`.
    ///
    /// ## Returns
    ///
    /// `Ok(json)` with the JSON text, or the `serde_json::Error` raised while
    /// serializing.
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::EnhancedThreadShare;
    ///
    /// let enhanced = EnhancedThreadShare::new(42);
    /// let json_string = enhanced.to_json().expect("Failed to serialize");
    /// assert_eq!(json_string, "42");
    /// ```
    pub fn to_json(&self) -> Result<String, serde_json::Error> {
        let snapshot = self.get();
        serde_json::to_string(&snapshot)
    }

    /// Replaces the shared value with one parsed from JSON.
    ///
    /// The text is parsed as a `T`; only if parsing succeeds is the shared
    /// value overwritten, so a malformed input leaves the data untouched.
    ///
    /// ## Arguments
    ///
    /// * `json_string` - The JSON text to parse
    ///
    /// ## Returns
    ///
    /// `Ok(())` on success, `Err(serde_json::Error)` if deserialization fails.
    ///
    /// ## Example
    ///
    /// ```rust
    /// use thread_share::EnhancedThreadShare;
    ///
    /// let enhanced = EnhancedThreadShare::new(42);
    /// let json_string = enhanced.to_json().unwrap();
    /// enhanced.from_json(&json_string).unwrap();
    /// assert_eq!(enhanced.get(), 42);
    /// ```
    pub fn from_json(&self, json_string: &str) -> Result<(), serde_json::Error> {
        let parsed = serde_json::from_str::<T>(json_string)?;
        self.inner.set(parsed);
        Ok(())
    }
}
683
684
685
686