
Thread-Share
"I got tired of playing around with data passing between threads and decided to write this library"
A powerful Rust library for safe data exchange between threads with automatic thread management.
What Problem Does This Solve?
Working with shared data between threads in Rust is often frustrating:
- Manual Arc<Mutex<T>> or Arc<RwLock<T>> combinations
- Complex ownership patterns
- Boilerplate code for every thread-safe structure
- Manual thread spawning and joining
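For comparison, here is a minimal sketch of the manual approach the list above describes, using only the standard library. The function name count_manually is made up for this illustration and is not part of Thread-Share.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Increment a shared counter from `workers` threads, `steps` times each,
/// using only standard-library primitives.
fn count_manually(workers: usize, steps: usize) -> usize {
    // Shared state has to be wrapped by hand.
    let counter = Arc::new(Mutex::new(0usize));
    let mut handles = Vec::with_capacity(workers);

    for _ in 0..workers {
        // Every thread needs its own clone of the Arc.
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            for _ in 0..steps {
                *counter.lock().unwrap() += 1;
            }
        }));
    }

    // Threads must be joined explicitly.
    for handle in handles {
        handle.join().expect("worker panicked");
    }

    let total = *counter.lock().unwrap();
    total
}

fn main() {
    println!("total = {}", count_manually(4, 1_000));
}
```

The wrapping, cloning, and joining steps are the boilerplate in question.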
Thread-Share provides a simple API that handles all the complexity for you.
Features
- Thread-Safe: Built-in synchronization with RwLock and AtomicPtr
- High Performance: Efficient parking_lot synchronization primitives
- Automatic Thread Management: Spawn and manage multiple threads with one macro
- Zero-Copy: Support for working without cloning data between threads
- Change Detection: Built-in waiting mechanisms for data changes
- Macro Support: Convenient macros for quick setup
- Serialization Support: JSON serialization for all types with the serialize feature
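To make the change-detection idea concrete, the sketch below shows one common way to build a "wait until the data changes" mechanism with the standard library: a Mutex-protected value with a version counter, plus a Condvar. This is an illustration under that assumption, not Thread-Share's actual implementation, and the Watched type is hypothetical.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical helper type: a value paired with a version counter.
struct Watched<T> {
    state: Mutex<(T, u64)>, // (value, version)
    changed: Condvar,
}

impl<T: Clone> Watched<T> {
    fn new(value: T) -> Self {
        Watched { state: Mutex::new((value, 0)), changed: Condvar::new() }
    }

    /// Store a new value, bump the version, and wake all waiters.
    fn set(&self, value: T) {
        let mut guard = self.state.lock().unwrap();
        guard.0 = value;
        guard.1 += 1;
        self.changed.notify_all();
    }

    /// Block until the version differs from `seen` or `timeout` elapses.
    /// Returns the new value and version, or `None` on timeout.
    fn wait_for_change(&self, seen: u64, timeout: Duration) -> Option<(T, u64)> {
        let guard = self.state.lock().unwrap();
        let (guard, result) = self
            .changed
            .wait_timeout_while(guard, timeout, |s| s.1 == seen)
            .unwrap();
        if result.timed_out() {
            None
        } else {
            Some((guard.0.clone(), guard.1))
        }
    }
}

/// A writer thread sets the value while the caller waits for the change.
fn change_demo() -> Option<(i32, u64)> {
    let shared = Arc::new(Watched::new(0));
    let writer = Arc::clone(&shared);
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        writer.set(42);
    });
    let result = shared.wait_for_change(0, Duration::from_secs(5));
    handle.join().unwrap();
    result
}

fn main() {
    println!("{:?}", change_demo());
}
```

Tracking a version number rather than comparing values means a waiter still notices a change that happened before it started waiting.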
Installation
    cargo add thread-share
To also enable the optional serialize feature (JSON serialization):
    cargo add thread-share --features serialize
Quick Start
Basic Usage with Thread Management
    use thread_share::{enhanced_share, spawn_workers};
    use std::time::Duration;
    // Assumes the manager's error type converts into Box<dyn Error>
    // (true for String and for any type implementing std::error::Error).
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Shared counter visible to every worker
        let counter = enhanced_share!(0);
        // Spawn two named workers in a single macro call
        let manager = spawn_workers!(counter, {
            incrementer: |counter| {
                for i in 1..=10 {
                    counter.set(i);
                    std::thread::sleep(Duration::from_millis(100));
                }
            },
            monitor: |counter| {
                for _ in 0..10 {
                    println!("Value: {}", counter.get());
                    std::thread::sleep(Duration::from_millis(200));
                }
            }
        });
        // The main thread can read the shared value while the workers run
        while counter.get() < 10 {
            println!("Counter: {}", counter.get());
            std::thread::sleep(Duration::from_millis(150));
        }
        // Register a manually spawned thread with the manager
        let counter_clone = counter.clone();
        let additional_worker = std::thread::spawn(move || {
            for _ in 0..3 {
                counter_clone.update(|x| *x *= 2);
                std::thread::sleep(Duration::from_millis(300));
            }
        });
        manager.add_worker("multiplier", additional_worker)?;
        // Inspect the registered workers
        println!("Active workers: {}", manager.active_workers());
        println!("Worker names: {:?}", manager.get_worker_names());
        // Pause and resume a worker by name
        manager.pause_worker("incrementer")?;
        println!("Incrementer paused for 1 second");
        std::thread::sleep(Duration::from_secs(1));
        manager.resume_worker("incrementer")?;
        println!("Incrementer resumed");
        // Remove one worker, then all remaining workers
        manager.remove_worker("monitor")?;
        println!("Monitor worker stopped and removed");
        manager.remove_all_workers()?;
        println!("All workers stopped");
        println!("Final active workers: {}", manager.active_workers());
        // Wait for any threads that are still running
        manager.join_all().expect("Failed to join threads");
        Ok(())
    }
Core Types
1. ThreadShare - Basic Thread-Safe Data
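    use thread_share::share;
    let data = share!(vec![1, 2, 3]);
    data.set(vec![4, 5, 6]);          // replace the value
    let value = data.get();           // read the current value
    data.update(|v| v.push(7));       // modify the value in place
    data.wait_for_change_forever();   // block until another thread changes the data
    data.wait_for_change(timeout);    // same, but give up after `timeout` (presumably a std::time::Duration)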
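As a rough mental model for the get/set/update calls above, the sketch below wraps an Arc<RwLock<T>> from the standard library behind the same three method names. It is a hypothetical stand-in (the Shared type is invented here) and makes no claim about how ThreadShare is implemented internally.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in, not the crate's `ThreadShare` type.
struct Shared<T> {
    inner: Arc<RwLock<T>>,
}

// Cloning the handle only clones the Arc, so all clones see the same data.
impl<T> Clone for Shared<T> {
    fn clone(&self) -> Self {
        Shared { inner: Arc::clone(&self.inner) }
    }
}

impl<T: Clone> Shared<T> {
    fn new(value: T) -> Self {
        Shared { inner: Arc::new(RwLock::new(value)) }
    }

    /// Return a copy of the current value (read lock).
    fn get(&self) -> T {
        self.inner.read().unwrap().clone()
    }

    /// Replace the value (write lock).
    fn set(&self, value: T) {
        *self.inner.write().unwrap() = value;
    }

    /// Mutate the value in place while holding the write lock.
    fn update<F: FnOnce(&mut T)>(&self, f: F) {
        let mut guard = self.inner.write().unwrap();
        f(&mut *guard);
    }
}

/// Mirror the calls shown above: set, update from another thread, then get.
fn shared_demo() -> Vec<i32> {
    let data = Shared::new(vec![1, 2, 3]);
    data.set(vec![4, 5, 6]);
    let worker = data.clone();
    thread::spawn(move || worker.update(|v| v.push(7)))
        .join()
        .unwrap();
    data.get()
}

fn main() {
    println!("{:?}", shared_demo()); // prints [4, 5, 6, 7]
}
```

Because update runs the closure under the write lock, the whole read-modify-write happens as one step from the point of view of other threads.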
2. EnhancedThreadShare - Automatic Thread Management
    use thread_share::{enhanced_share, spawn_workers};
    let data = enhanced_share!(0);
    // Each `name: |data| { ... }` entry becomes a named worker thread
    let manager = spawn_workers!(data, {
        worker1: |data| { /* worker logic */ },
        worker2: |data| { /* worker logic */ }
    });
    manager.join_all().expect("Failed to join threads");
3. WorkerManager - Fine-Grained Control
    // Fragment: `data` is an existing enhanced_share! value, and `?` needs an
    // enclosing function that returns Result
    let manager = spawn_workers!(data, {
        worker: |data| { /* worker logic */ }
    });
    manager.pause_worker("worker")?;
    manager.resume_worker("worker")?;
    manager.remove_worker("worker")?;
    println!("Active workers: {}", manager.active_workers());
    println!("Worker names: {:?}", manager.get_worker_names());
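Standard Rust threads cannot be suspended from the outside, so pausing is usually cooperative: the worker checks a shared flag and idles while it is set. The sketch below illustrates that general pattern with std atomics. Whether WorkerManager works this way is an assumption, and the Control struct and pause_resume_demo function are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical control flags shared by a controller and one worker.
struct Control {
    paused: AtomicBool,
    stop: AtomicBool,
}

/// Returns (ticks observed while paused, ticks observed after resuming).
fn pause_resume_demo() -> (usize, usize) {
    // The worker starts paused so the first reading is deterministic.
    let control = Arc::new(Control {
        paused: AtomicBool::new(true),
        stop: AtomicBool::new(false),
    });
    let ticks = Arc::new(AtomicUsize::new(0));

    let (worker_control, worker_ticks) = (Arc::clone(&control), Arc::clone(&ticks));
    let worker = thread::spawn(move || {
        while !worker_control.stop.load(Ordering::Acquire) {
            // The worker checks the flag itself; nothing suspends it from outside.
            if !worker_control.paused.load(Ordering::Acquire) {
                worker_ticks.fetch_add(1, Ordering::Relaxed);
            }
            thread::sleep(Duration::from_millis(1));
        }
    });

    thread::sleep(Duration::from_millis(20));
    let while_paused = ticks.load(Ordering::Relaxed);

    // "Resume": clear the flag and wait for the worker to make progress.
    control.paused.store(false, Ordering::Release);
    while ticks.load(Ordering::Relaxed) == 0 {
        thread::sleep(Duration::from_millis(1));
    }

    // "Remove": ask the worker to exit, then join it.
    control.stop.store(true, Ordering::Release);
    worker.join().expect("worker panicked");
    (while_paused, ticks.load(Ordering::Relaxed))
}

fn main() {
    let (paused, resumed) = pause_resume_demo();
    println!("ticks while paused: {paused}, after resume: {resumed}");
}
```

With a cooperative design, a pause only takes effect the next time the worker reaches its flag check, so long-running steps delay it.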
Creating WorkerManager Directly
    use thread_share::worker_manager::WorkerManager;
    use std::thread;
    use std::time::Duration;
    // Assumes the manager's error type converts into Box<dyn Error>
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Start with an empty manager
        let manager = WorkerManager::new();
        // Spawn threads the usual way
        let handle1 = thread::spawn(|| {
            for i in 1..=5 {
                println!("Worker 1: {}", i);
                thread::sleep(Duration::from_millis(100));
            }
        });
        let handle2 = thread::spawn(|| {
            for i in 1..=3 {
                println!("Worker 2: {}", i);
                thread::sleep(Duration::from_millis(150));
            }
        });
        // Register the join handles under names
        manager.add_worker("worker1", handle1)?;
        manager.add_worker("worker2", handle2)?;
        println!("Active workers: {}", manager.active_workers());
        println!("Worker names: {:?}", manager.get_worker_names());
        manager.join_all()?;
        Ok(())
    }
Creating WorkerManager with Existing Threads
    use thread_share::{enhanced_share, worker_manager::WorkerManager};
    use std::thread;
    use std::time::Duration;
    // Assumes the manager's error type converts into Box<dyn Error>
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let data = enhanced_share!(0);
        // Reuse the thread registry that the shared value already tracks
        let existing_threads = data.get_threads();
        let manager = WorkerManager::new_with_threads(existing_threads);
        let data_clone = data.clone();
        let new_worker = thread::spawn(move || {
            for _ in 0..3 {
                data_clone.update(|x| *x += 10);
                thread::sleep(Duration::from_millis(200));
            }
        });
        manager.add_worker("additional_worker", new_worker)?;
        println!("Active workers: {}", manager.active_workers());
        manager.join_all()?;
        Ok(())
    }
Worker State Management
    use thread_share::{enhanced_share, spawn_workers};
    use std::thread;
    use std::time::Duration;
    // Assumes the manager's error type converts into Box<dyn Error>
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let data = enhanced_share!(0);
        let manager = spawn_workers!(data, {
            counter: |data| {
                for i in 1..=10 {
                    data.set(i);
                    thread::sleep(Duration::from_millis(500));
                }
            },
            monitor: |data| {
                for _ in 0..10 {
                    println!("Value: {}", data.get());
                    thread::sleep(Duration::from_millis(1000));
                }
            }
        });
        // Let the workers run for a while, then pause and resume the counter
        thread::sleep(Duration::from_secs(2));
        manager.pause_worker("counter")?;
        println!("Counter worker paused");
        thread::sleep(Duration::from_secs(1));
        manager.resume_worker("counter")?;
        println!("Counter worker resumed");
        manager.remove_worker("monitor")?;
        println!("Monitor worker removed");
        manager.join_all()?;
        Ok(())
    }
Examples
Producer-Consumer Pattern
    use thread_share::{enhanced_share, spawn_workers};
    use std::time::Duration;
    fn main() {
        let queue = enhanced_share!(Vec::<String>::new());
        let manager = spawn_workers!(queue, {
            producer: |queue| {
                for i in 0..5 {
                    queue.update(|q| q.push(format!("Message {}", i)));
                    std::thread::sleep(Duration::from_millis(100));
                }
            },
            consumer: |queue| {
                let mut processed = 0;
                while processed < 5 {
                    let messages = queue.get();
                    if !messages.is_empty() {
                        queue.update(|q| {
                            // Vec::pop takes the most recently pushed message
                            if let Some(msg) = q.pop() {
                                println!("Processed: {}", msg);
                                processed += 1;
                            }
                        });
                    } else {
                        // Nothing queued yet: back off briefly
                        std::thread::sleep(Duration::from_millis(50));
                    }
                }
            }
        });
        manager.join_all().expect("Failed to join threads");
    }
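One detail worth knowing about the example above: Vec::pop removes the last element, so if several messages are queued at once they are processed newest-first. Where first-in, first-out order matters, a VecDeque is the usual choice. The sketch below shows the idea with only the standard library; drain_in_order is a hypothetical name and the code does not use Thread-Share.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Produce `count` messages on one thread and consume them in FIFO order.
fn drain_in_order(count: usize) -> Vec<String> {
    let queue = Arc::new(Mutex::new(VecDeque::new()));

    let producer_queue = Arc::clone(&queue);
    let producer = thread::spawn(move || {
        for i in 0..count {
            // push_back appends at the tail of the queue
            producer_queue
                .lock()
                .unwrap()
                .push_back(format!("Message {}", i));
        }
    });

    let mut processed = Vec::with_capacity(count);
    while processed.len() < count {
        // The lock is released at the end of this statement,
        // before the message is handled.
        let next = queue.lock().unwrap().pop_front();
        match next {
            Some(msg) => processed.push(msg),
            None => thread::yield_now(),
        }
    }

    producer.join().expect("producer panicked");
    processed
}

fn main() {
    for msg in drain_in_order(5) {
        println!("Processed: {}", msg);
    }
}
```

With a single producer and a single consumer, pop_front always yields messages in the order they were pushed.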
HTTP Server with Visit Counter
    use thread_share::{enhanced_share, spawn_workers};
    use std::net::TcpListener;
    use std::io::{Read, Write};
    use std::time::Duration;
    fn main() {
        let visits = enhanced_share!(0);
        // Not joined: the server loop runs until the process exits
        let _manager = spawn_workers!(visits, {
            server: |visits| {
                let listener = TcpListener::bind("127.0.0.1:8080").unwrap();
                for stream in listener.incoming() {
                    let mut stream = stream.unwrap();
                    let mut buffer = [0; 1024];
                    stream.read(&mut buffer).unwrap();
                    // Count only requests for the root path
                    if buffer.starts_with(b"GET / ") {
                        visits.update(|v| *v += 1);
                    }
                    let response = "HTTP/1.1 200 OK\r\n\r\nHello World!";
                    // write_all keeps writing until the whole response is sent
                    stream.write_all(response.as_bytes()).unwrap();
                }
            }
        });
        // Report the visit count once per second for ten seconds
        for _ in 0..10 {
            std::thread::sleep(Duration::from_secs(1));
            println!("Total visits: {}", visits.get());
        }
    }
Dynamic Worker Management
    use thread_share::{enhanced_share, spawn_workers};
    use std::time::Duration;
    // Assumes the manager's error type converts into Box<dyn Error>
    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let data = enhanced_share!(0);
        let manager = spawn_workers!(data, {
            counter: |data| {
                for i in 1..=5 {
                    data.set(i);
                    std::thread::sleep(Duration::from_millis(500));
                }
            }
        });
        // Add a worker after the initial set has started
        let data_clone = data.clone();
        let new_worker = std::thread::spawn(move || {
            for _ in 0..3 {
                data_clone.update(|x| *x *= 2);
                std::thread::sleep(Duration::from_millis(300));
            }
        });
        manager.add_worker("multiplier", new_worker)?;
        manager.pause_worker("counter")?;
        std::thread::sleep(Duration::from_secs(1));
        manager.resume_worker("counter")?;
        manager.join_all()?;
        Ok(())
    }
Custom Types with Change Detection
    use thread_share::{enhanced_share, spawn_workers};
    use std::time::Duration;
    #[derive(Clone, Debug)]
    struct User {
        name: String,
        age: u32,
        is_online: bool,
    }
    fn main() {
        let user = enhanced_share!(User {
            name: "Alice".to_string(),
            age: 25,
            is_online: true,
        });
        let manager = spawn_workers!(user, {
            age_updater: |user| {
                for _ in 0..5 {
                    user.update(|u| u.age += 1);
                    std::thread::sleep(Duration::from_millis(200));
                }
            },
            status_toggler: |user| {
                for _ in 0..5 {
                    user.update(|u| u.is_online = !u.is_online);
                    std::thread::sleep(Duration::from_millis(300));
                }
            },
            monitor: |user| {
                for _ in 0..10 {
                    let current = user.get();
                    println!("User: {}, Age: {}, Online: {}",
                        current.name, current.age, current.is_online);
                    std::thread::sleep(Duration::from_millis(500));
                }
            }
        });
        manager.join_all().expect("Failed to join threads");
        // All workers have finished, so this is the final state
        let final_user = user.get();
        println!("Final state: {:?}", final_user);
    }
When to Use Each Type
| Type | Use Case | Description |
| --- | --- | --- |
| ThreadShare | Simple data sharing | Basic thread-safe data with manual thread management |
| EnhancedThreadShare | Multi-threaded apps | Automatic thread spawning and joining |
| WorkerManager | Complex workflows | Fine-grained control over individual workers |
Important Notes
ArcThreadShare - Use with Caution
    use thread_share::ArcThreadShare;
    let data = ArcThreadShare::new(0);
    data.update(|x| *x += 1);   // closure-based update
    data.increment();           // dedicated increment helper
    data.add(5);                // dedicated add helper
When NOT to use ArcThreadShare:
- High-frequency updates (>1000 ops/second)
- Critical data integrity requirements
- Predictable performance needs
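The README does not spell out why these cases are risky. One plausible reason, assumed here rather than confirmed, is the general hazard of lock-free code: a read-modify-write done in separate steps can lose updates under contention, while a single atomic operation cannot. The sketch below demonstrates that hazard with std atomics only. It does not use ArcThreadShare, and all function names are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Run `workers` threads that each apply `op` to a shared counter `steps` times.
fn run(workers: usize, steps: usize, op: fn(&AtomicUsize)) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..steps {
                    op(&counter);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    counter.load(Ordering::SeqCst)
}

/// Read and write as two separate steps: another thread can slip in between,
/// so increments may be lost.
fn racy_increment(c: &AtomicUsize) {
    let current = c.load(Ordering::SeqCst);
    c.store(current + 1, Ordering::SeqCst);
}

/// One atomic read-modify-write: no increment can be lost.
fn atomic_increment(c: &AtomicUsize) {
    c.fetch_add(1, Ordering::SeqCst);
}

fn main() {
    let expected = 4 * 10_000;
    println!("atomic: {} of {}", run(4, 10_000, atomic_increment), expected);
    println!("racy:   {} of {} (may be lower)", run(4, 10_000, racy_increment), expected);
}
```

The atomic variant always reaches the expected total, while the racy variant can fall short by a different amount on every run.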
Running Examples
cargo run --example basic_usage
cargo run --example atomic_usage
cargo run --example worker_management
cargo run --example http_integration_helpers
cargo run --example socket_client_usage
cargo test
Requirements
- Rust: 1.85.0 or higher
- Dependencies: parking_lot (required), serde (optional)
License
MIT License - see LICENSE file for details.
Contributing
Contributions welcome! Please submit a Pull Request.
More Examples
Check the examples/ directory for complete working examples:
- HTTP servers with Tokio and async-std
- Socket client with Node.js server
- Advanced worker management patterns
- Performance benchmarks