chain_reaction/
lib.rs

1//! # chain_reaction
2//! 
3//! This library provides a flexible and composable way to build data processing pipelines.
4//! It includes the `Reactor` and `TimedReactor` structs for building and executing these pipelines,
5//! along with various traits and helper functions.
6
7use std::fmt::Debug;
8use std::marker::PhantomData;
9use std::time::{Duration, Instant};
10use std::collections::HashMap;
11use std::rc::Rc;
12use std::cell::RefCell;
13use std::mem;
14
15/// A type alias for the Result type used throughout the library.
16/// `O` is the output type, and `E` is the error type (defaulting to `Failure`).
17pub type Out<O, E = Failure> = Result<O, E>;
18
19/// The `Act` trait defines the core behavior for all actions in the Reactor pipeline.
20pub trait Act<I, O, E = Failure> {
21    /// Performs the action on the input and returns the result.
22    fn act(&self, input: I) -> Out<O, E>;
23}
24
25/// The `ChainableAct` trait extends `Act` with the ability to chain actions together.
26pub trait ChainableAct<I, O, E = Failure>: Act<I, O, E> 
27where
28    Self: Sized,
29{
30    /// Chains this action with another, creating a new `Chain` action.
31    fn then<O2, T>(self, transform: T) -> Chain<Self, T, I, O, O2, E>
32    where
33        T: Act<O, O2, E>,
34    {
35        Chain {
36            first: self,
37            second: transform,
38            _marker: PhantomData,
39        }
40    }
41}
42
43/// The `Chain` struct represents a sequence of two actions.
44pub struct Chain<A, B, I, O1, O2, E> 
45where 
46    A: Act<I, O1, E>,
47    B: Act<O1, O2, E>,
48{
49    first: A,
50    second: B,
51    _marker: PhantomData<(I, O1, O2, E)>,
52}
53
54impl<A, B, I, O1, O2, E> Act<I, O2, E> for Chain<A, B, I, O1, O2, E>
55where
56    A: Act<I, O1, E>,
57    B: Act<O1, O2, E>,
58{
59    fn act(&self, input: I) -> Out<O2, E> {
60        self.first.act(input).and_then(|o1| self.second.act(o1))
61    }
62}
63
64/// Implement `ChainableAct` for all types that implement `Act`.
65impl<I, O, E, F> ChainableAct<I, O, E> for F
66where
67    F: Act<I, O, E>,
68{}
69
70/// Implement `Act` for all functions that take an input and return an `Out`.
71impl<I, O, E, F> Act<I, O, E> for F
72where
73    F: Fn(I) -> Out<O, E>,
74{
75    fn act(&self, input: I) -> Out<O, E> {
76        self(input)
77    }
78}
79
80/// An enum representing either a left or right value.
81#[derive(Debug)]
82pub enum Either<L, R> {
83    Left(L),
84    Right(R),
85}
86
87/// The `Reactor` struct represents a data processing pipeline.
88pub struct Reactor<I, E = Failure> {
89    input: Out<I, E>,
90}
91
92/// The `TimedReactor` struct extends `Reactor` with timing capabilities.
93pub struct TimedReactor<I, E = Failure> {
94    reactor: Rc<RefCell<Reactor<I, E>>>,
95    timings: Rc<RefCell<HashMap<String, Duration>>>,
96    current_operation: Rc<RefCell<Option<String>>>,
97    start_time: Rc<RefCell<Option<Instant>>>,
98}
99
100impl<I, E> Reactor<I, E>
101where
102    E: Debug,
103{
104    /// Creates a new `Reactor` with the given input.
105    pub fn input(input: I) -> Self {
106        Self { input: Ok(input) }
107    }
108
109    /// Applies a transformation to the current input.
110    pub fn then<O, T>(&mut self, transform: T) -> Reactor<O, E>
111    where
112        T: ChainableAct<I, O, E>,
113    {
114        let input = mem::replace(&mut self.input, Err(unsafe { std::mem::zeroed() }));
115        Reactor {
116            input: input.and_then(|i| transform.act(i)),
117        }
118    }
119
120    /// Applies one of two transformations based on a condition.
121    pub fn if_else<O1, O2, C, T1, T2>(
122        &mut self,
123        condition: C,
124        true_transform: T1,
125        false_transform: T2,
126    ) -> Reactor<Either<O1, O2>, E>
127    where
128        C: Fn(&I) -> bool,
129        T1: ChainableAct<I, O1, E>,
130        T2: ChainableAct<I, O2, E>,
131    {
132        let input = mem::replace(&mut self.input, Err(unsafe { std::mem::zeroed() }));
133        Reactor {
134            input: input.and_then(|i| {
135                if condition(&i) {
136                    true_transform.act(i).map(Either::Left)
137                } else {
138                    false_transform.act(i).map(Either::Right)
139                }
140            }),
141        }
142    }
143
144    /// Applies a transformation to each item in an iterable input.
145    pub fn for_each<O, T>(&mut self, transform: T) -> Reactor<Vec<O>, E>
146    where
147        I: IntoIterator,
148        T: ChainableAct<I::Item, O, E> + Clone,
149    {
150        let input = mem::replace(&mut self.input, Err(unsafe { std::mem::zeroed() }));
151        Reactor {
152            input: input.and_then(|i| {
153                i.into_iter()
154                    .map(|item| transform.act(item))
155                    .collect::<Result<Vec<_>, _>>()
156            }),
157        }
158    }
159
160    /// Applies a function to the current input.
161    pub fn map<O, F>(&mut self, f: F) -> Reactor<O, E>
162    where
163        F: FnOnce(I) -> O,
164    {
165        let input = mem::replace(&mut self.input, Err(unsafe { std::mem::zeroed() }));
166        Reactor {
167            input: input.map(f),
168        }
169    }
170
171    /// Applies a fallible function to the current input.
172    pub fn and_then<O, F>(&mut self, f: F) -> Reactor<O, E>
173    where
174        F: FnOnce(I) -> Result<O, E>,
175    {
176        let input = mem::replace(&mut self.input, Err(unsafe { std::mem::zeroed() }));
177        Reactor {
178            input: input.and_then(f),
179        }
180    }
181
182    /// Merges the first two items of an iterable input using the provided function.
183    pub fn merge<O, F>(&mut self, f: F) -> Reactor<O, E>
184    where
185        I: IntoIterator,
186        I::Item: Clone,
187        F: Fn(I::Item, I::Item) -> O,
188    {
189        let input = mem::replace(&mut self.input, Err(unsafe { std::mem::zeroed() }));
190        Reactor {
191            input: input.and_then(|i| {
192                let mut iter = i.into_iter();
193                match (iter.next(), iter.next()) {
194                    (Some(a), Some(b)) => Ok(f(a, b)),
195                    _ => panic!("Merge operation requires at least two items")
196                }
197            }),
198        }
199    }
200
201    /// Runs the reactor and returns the final result.
202    pub fn run(&mut self) -> Out<I, E> {
203        mem::replace(&mut self.input, Err(unsafe { std::mem::zeroed() }))
204    }
205}
206
207impl<I, E> TimedReactor<I, E>
208where
209    E: Debug,
210{
211    /// Creates a new `TimedReactor` with the given input.
212    pub fn input(input: I) -> Self {
213        Self {
214            reactor: Rc::new(RefCell::new(Reactor::input(input))),
215            timings: Rc::new(RefCell::new(HashMap::new())),
216            current_operation: Rc::new(RefCell::new(None)),
217            start_time: Rc::new(RefCell::new(None)),
218        }
219    }
220
221    /// Starts timing for the current operation.
222    fn start_timing(&self, operation: &str) {
223        *self.current_operation.borrow_mut() = Some(operation.to_string());
224        *self.start_time.borrow_mut() = Some(Instant::now());
225    }
226
227    /// Ends timing for the current operation and records the duration.
228    fn end_timing(&self) {
229        if let (Some(operation), Some(start_time)) = (
230            self.current_operation.borrow_mut().take(),
231            self.start_time.borrow_mut().take(),
232        ) {
233            let duration = start_time.elapsed();
234            self.timings.borrow_mut().insert(operation, duration);
235        }
236    }
237
238    /// Applies a transformation to the current input and records the timing.
239    pub fn then<O, T>(self, transform: T) -> TimedReactor<O, E>
240    where
241        T: Act<I, O, E>,
242    {
243        self.start_timing("then");
244        let new_reactor = Rc::new(RefCell::new(self.reactor.borrow_mut().then(transform)));
245        self.end_timing();
246        TimedReactor {
247            reactor: new_reactor,
248            timings: Rc::clone(&self.timings),
249            current_operation: Rc::clone(&self.current_operation),
250            start_time: Rc::clone(&self.start_time),
251        }
252    }
253
254    /// Applies one of two transformations based on a condition and records the timing.
255    pub fn if_else<O1, O2, C, T1, T2>(
256        self,
257        condition: C,
258        true_transform: T1,
259        false_transform: T2,
260    ) -> TimedReactor<Either<O1, O2>, E>
261    where
262        C: Fn(&I) -> bool,
263        T1: Act<I, O1, E>,
264        T2: Act<I, O2, E>,
265    {
266        self.start_timing("if_else");
267        let new_reactor = Rc::new(RefCell::new(self.reactor.borrow_mut().if_else(condition, true_transform, false_transform)));
268        self.end_timing();
269        TimedReactor {
270            reactor: new_reactor,
271            timings: Rc::clone(&self.timings),
272            current_operation: Rc::clone(&self.current_operation),
273            start_time: Rc::clone(&self.start_time),
274        }
275    }
276
277    /// Applies a transformation to each item in an iterable input and records the timing.
278    pub fn for_each<O, T>(self, transform: T) -> TimedReactor<Vec<O>, E>
279    where
280        I: IntoIterator,
281        T: Act<I::Item, O, E> + Clone,
282    {
283        self.start_timing("for_each");
284        let new_reactor = Rc::new(RefCell::new(self.reactor.borrow_mut().for_each(transform)));
285        self.end_timing();
286        TimedReactor {
287            reactor: new_reactor,
288            timings: Rc::clone(&self.timings),
289            current_operation: Rc::clone(&self.current_operation),
290            start_time: Rc::clone(&self.start_time),
291        }
292    }
293
294    /// Applies a function to the current input and records the timing.
295    pub fn map<O, F>(self, f: F) -> TimedReactor<O, E>
296    where
297        F: FnOnce(I) -> O,
298    {
299        self.start_timing("map");
300        let new_reactor = Rc::new(RefCell::new(self.reactor.borrow_mut().map(f)));
301        self.end_timing();
302        TimedReactor {
303            reactor: new_reactor,
304            timings: Rc::clone(&self.timings),
305            current_operation: Rc::clone(&self.current_operation),
306            start_time: Rc::clone(&self.start_time),
307        }
308    }
309
310    /// Applies a fallible function to the current input and records the timing.
311    pub fn and_then<O, F>(self, f: F) -> TimedReactor<O, E>
312    where
313        F: FnOnce(I) -> Result<O, E>,
314    {
315        self.start_timing("and_then");
316        let new_reactor = Rc::new(RefCell::new(self.reactor.borrow_mut().and_then(f)));
317        self.end_timing();
318        TimedReactor {
319            reactor: new_reactor,
320            timings: Rc::clone(&self.timings),
321            current_operation: Rc::clone(&self.current_operation),
322            start_time: Rc::clone(&self.start_time),
323        }
324    }
325
326    /// Merges the first two items of an iterable input using the provided function and records the timing.
327    pub fn merge<O, F>(self, f: F) -> TimedReactor<O, E>
328    where
329        I: IntoIterator,
330        I::Item: Clone,
331        F: Fn(I::Item, I::Item) -> O,
332    {
333        self.start_timing("merge");
334        let new_reactor = Rc::new(RefCell::new(self.reactor.borrow_mut().merge(f)));
335        self.end_timing();
336        TimedReactor {
337            reactor: new_reactor,
338            timings: Rc::clone(&self.timings),
339            current_operation: Rc::clone(&self.current_operation),
340            start_time: Rc::clone(&self.start_time),
341        }
342    }
343
344    /// Runs the reactor and returns the final result along with the recorded timings.
345    pub fn run(self) -> (Out<I, E>, HashMap<String, Duration>) {
346        (self.reactor.borrow_mut().run(), self.timings.borrow().clone())
347    }
348}
349
350/// A generic enum representing various types of failures that can occur in the Reactor pipeline.
351#[derive(Debug)]
352pub enum Failure {
353    /// Represents an invalid input value.
354    InvalidInput(String),
355    /// Represents an arithmetic error (e.g., division by zero).
356    ArithmeticError(String),
357    /// Represents a custom error with a message.
358    Custom(String),
359}
360
361/// Adds one to the input number.
362pub fn add_one(x: i32) -> Out<i32> {
363    Ok(x + 1)
364}
365
366/// Squares the input number, returning an error for negative inputs.
367pub fn square(x: i32) -> Out<i32> {
368    if x < 0 {
369        Err(Failure::InvalidInput("Negative input for square function".to_string()))
370    } else {
371        Ok(x * x)
372    }
373}
374
375/// Converts the input number to a string.
376pub fn to_string(x: i32) -> Out<String> {
377    Ok(x.to_string())
378}
379
380/// Doubles the input number.
381pub fn double(x: i32) -> Out<i32> {
382    Ok(x * 2)
383}
384
385/// Divides the first number by the second, returning an error for division by zero.
386pub fn divide(x: i32, y: i32) -> Out<i32> {
387    if y == 0 {
388        Err(Failure::ArithmeticError("Division by zero".to_string()))
389    } else {
390        Ok(x / y)
391    }
392}