lambda_channel/lib.rs
1pub mod channel;
2pub mod err;
3pub mod thread;
4
5/// Creates two normal [`channels`] and connects them to a new [`ThreadPool`] to create a
6/// multi-producer multi-consumer multi-threaded lambda-channel.
7///
8/// [`channels`]: channel
9/// [`ThreadPool`]: thread::ThreadPool
10///
11/// # Examples
12///
13/// ```
14/// use crossbeam_channel::RecvError;
15/// use lambda_channel::new_lambda_channel;
16///
17/// fn fib(_: &Option<()>, n: i32) -> u64 {
18/// if n <= 1 {
19/// n as u64
20/// } else {
21/// fib(&None, n - 1) + fib(&None, n - 2)
22/// }
23/// }
24///
25/// let (s, r, _p) = new_lambda_channel(None, None, None, fib);
26/// s.send(20).unwrap();
27/// assert_eq!(r.recv(), Ok(6765));
28///
29/// s.send(10).unwrap();
30/// drop(s);
31/// assert_eq!(r.recv(), Ok(55));
32/// assert_eq!(r.recv(), Err(RecvError));
33/// ```
34pub fn new_lambda_channel<T: Send + 'static, U: Send + 'static, V: Clone + Send + 'static>(
35 input_capacity: Option<usize>,
36 output_capacity: Option<usize>,
37 shared_resource: V,
38 function: fn(&V, T) -> U,
39) -> (channel::Sender<T>, channel::Receiver<U>, thread::ThreadPool) {
40 let (out_tx, rx) = channel::new_channel(output_capacity);
41 let (tx, in_rx) = channel::new_channel_with_dependency(input_capacity, &out_tx, &rx);
42
43 let pool = thread::ThreadPool::new_lambda_pool(in_rx, out_tx, shared_resource, function);
44
45 (tx, rx, pool)
46}
47
48/// Connects two existing [`channels`] to a new [`ThreadPool`] to
49/// create a multi-producer multi-consumer multi-threaded lambda-channel.
50/// While it is not required for the input channel of the lambda-channel to have the output
51/// channel as a dependency, it may lead to undesired termination behaviors.
52///
53/// [`channels`]: channel
54/// [`ThreadPool`]: thread::ThreadPool
55///
56/// # Examples
57///
58/// ```
59/// use lambda_channel::new_lambda_channel_with_input_and_output;
60/// use lambda_channel::channel::{new_channel, new_channel_with_dependency};
61///
62/// fn convert_units(c: &f64, u: f64) -> f64 {
63/// c * u
64/// }
65///
66/// fn approx_eq(v: f64, c: f64) -> bool {
67/// if (v > (c + 0.000001)) || (v < (c - 0.000001)) {
68/// return false;
69/// }
70/// true
71/// }
72///
73/// let (s_km, r_km) = new_channel(None);
74/// let (s_mile, r_mile_to_km) = new_channel_with_dependency(None, &s_km, &r_km);
75/// let (s_nautical_mile, r_nautical_mile_to_km) = new_channel_with_dependency(None, &s_km, &r_km);
76///
77/// let _p_mile_to_km = new_lambda_channel_with_input_and_output(r_mile_to_km, s_km.clone(), 1.60934, convert_units);
78/// let _p_nautical_mile_to_km = new_lambda_channel_with_input_and_output(r_nautical_mile_to_km, s_km, 1.852, convert_units);
79///
80/// s_mile.send(12.0).unwrap();
81/// s_nautical_mile.send(2.0).unwrap();
82///
83/// let msg1 = r_km.recv().unwrap();
84/// let msg2 = r_km.recv().unwrap();
85/// assert!(approx_eq(msg1 + msg2, 23.01608));
86/// ```
87pub fn new_lambda_channel_with_input_and_output<
88 T: Send + 'static,
89 U: Send + 'static,
90 V: Clone + Send + 'static,
91>(
92 input_receiver: channel::Receiver<T>,
93 output_sender: channel::Sender<U>,
94 shared_resource: V,
95 function: fn(&V, T) -> U,
96) -> thread::ThreadPool {
97 thread::ThreadPool::new_lambda_pool(input_receiver, output_sender, shared_resource, function)
98}
99
100/// Creates a normal [`channel`] and connects it to a new [`ThreadPool`] to create a
101/// multi-producer multi-threaded lambda-sink.
102///
103/// [`ThreadPool`]: thread::ThreadPool
104///
105/// # Examples
106///
107/// ```
108/// use lambda_channel::new_lambda_sink;
109///
110/// fn do_something(_: &Option<()>, n: i32) {
111/// println!("Do something without output: {}", n);
112/// }
113///
114/// let (s, _p) = new_lambda_sink(Some(0), None, do_something);
115/// s.send(1).unwrap();
116/// ```
117pub fn new_lambda_sink<T: Send + 'static, V: Clone + Send + 'static>(
118 input_capacity: Option<usize>,
119 shared_resource: V,
120 function: fn(&V, T),
121) -> (channel::Sender<T>, thread::ThreadPool) {
122 let (tx, in_rx) = channel::new_channel(input_capacity);
123
124 let pool = thread::ThreadPool::new_sink_pool(in_rx, shared_resource, function);
125
126 (tx, pool)
127}
128
129/// Connects an existing [`channel`] to a new [`ThreadPool`] to create a
130/// multi-producer multi-threaded lambda-sink.
131///
132/// [`ThreadPool`]: thread::ThreadPool
133///
134/// # Examples
135///
136/// ```
137/// use std::collections::HashMap;
138/// use rand::{seq::SliceRandom, thread_rng};
139///
140/// use lambda_channel::new_lambda_sink_with_input_from;
141/// use lambda_channel::channel::new_channel;
142///
143/// fn generate_cipher_map() -> HashMap<char, char> {
144/// let mut rng = thread_rng();
145/// let mut alphanumeric: Vec<char> = Vec::new();
146/// alphanumeric.extend('0'..='9');
147/// alphanumeric.extend('a'..='z');
148/// alphanumeric.extend('A'..='Z');
149///
150/// let mut shuffled_alphanumeric = alphanumeric.clone();
151/// shuffled_alphanumeric.shuffle(&mut rng);
152///
153/// let mut cipher_map = HashMap::new();
154/// for (i, &char_val) in alphanumeric.iter().enumerate() {
155/// cipher_map.insert(char_val, shuffled_alphanumeric[i]);
156/// }
157/// cipher_map
158/// }
159///
160/// fn encode_text(cipher: &HashMap<char, char>, text: String) {
161/// let mut encoded_text = String::new();
162/// println!("Encoding: {}", text);
163/// for c in text.chars() {
164/// if let Some(v) = cipher.get(&c) {
165/// encoded_text.push(*v)
166/// } else {
167/// encoded_text.push(c)
168/// }
169/// }
170/// println!("Cipher Text: {}", encoded_text);
171/// }
172///
173/// let cipher_map = generate_cipher_map();
174/// let (s, r) = new_channel(Some(0));
175/// let _p = new_lambda_sink_with_input_from(r, cipher_map, encode_text);
176/// s.send("This is a test!".to_string()).unwrap();
177/// ```
178pub fn new_lambda_sink_with_input_from<T: Send + 'static, V: Clone + Send + 'static>(
179 input_receiver: channel::Receiver<T>,
180 shared_resource: V,
181 function: fn(&V, T),
182) -> thread::ThreadPool {
183 thread::ThreadPool::new_sink_pool(input_receiver, shared_resource, function)
184}