computation_process/
lib.rs

1//! # Computation process
2//!
3//! A Rust library for defining stateful computations (and generators) that support
4//! suspend/resume, interleaving, cancellation, and serialization.
5//!
6//! This crate does not use `unsafe`.
7
8#![forbid(unsafe_code)]
9#![warn(missing_docs)]
10
11//!
12//! ## Overview
13//!
14//! This library provides abstractions for defining "long-running" computations.
15//! The concepts are similar to asynchronous code but offer features that were never
16//! a priority in async programming. **The target audience are projects that implement
17//! CPU-intensive, long-running computations but require granular control over the
18//! computation state.**
19//!
20//! ## Key Features
21//!
22//! - **Cancellation:** Each computation can be forcefully stopped using cooperative
23//!   cancellation (compatible with the [`cancel-this`](https://crates.io/crates/cancel-this) crate).
24//! - **Suspend/resume:** A computation can define safe suspend points. During these points,
25//!   it is safe to serialize, interleave, or otherwise "transfer" the computation.
26//! - **Interleaving:** Suspend points allow safely interleaving multiple computations on a
27//!   single thread with priority-based scheduling.
28//! - **Serialization:** The state of each computation is isolated and can be saved/restored.
29//!
30//! ## Core Concepts
31//!
32//! - [`Completable<T>`]: A result type that can be incomplete (`Suspended`, `Cancelled`, or `Exhausted`).
33//! - [`Computable<T>`]: A trait for objects that can be driven to completion by calling [`Computable::try_compute`].
34//! - [`Algorithm<CTX, STATE, T>`]: Extends [`Computable`] with access to context and state.
35//! - [`Generatable<T>`]: Like [`Computable`], but produces a stream of values.
36//! - [`GenAlgorithm<CTX, STATE, T>`]: Extends [`Generatable`] with context and state.
37//! - [`Computation`] and [`Generator`]: Default implementations using step functions.
38//!
39//! ## Quick Example
40//!
41//! ```rust
42//! use computation_process::{Computation, ComputationStep, Completable, Incomplete, Computable, Stateful};
43//!
44//! struct CountingStep;
45//!
46//! impl ComputationStep<u32, u32, u32> for CountingStep {
47//!     fn step(target: &u32, count: &mut u32) -> Completable<u32> {
48//!         *count += 1;
49//!         if *count >= *target {
50//!             Ok(*count)
51//!         } else {
52//!             Err(Incomplete::Suspended)
53//!         }
54//!     }
55//! }
56//!
57//! let mut computation = Computation::<u32, u32, u32, CountingStep>::from_parts(5, 0);
58//! let result = computation.compute().unwrap();
59//! assert_eq!(result, 5);
60//! ```
61
62// All traits/structs have dedicated modules for encapsulation, and we then re-export
63// these types here for easier public usage.
64
65mod algorithm;
66mod collector;
67mod completable;
68mod computable;
69mod computable_identity;
70mod computation;
71mod generatable;
72mod generator;
73
74#[cfg(all(feature = "serde", test))]
75mod test_serialization;
76
77pub use algorithm::{Algorithm, GenAlgorithm, Stateful};
78pub use collector::Collector;
79pub use completable::{Completable, Incomplete};
80pub use computable::{Computable, ComputableResult};
81pub use computable_identity::ComputableIdentity;
82pub use computation::{Computation, ComputationStep};
83pub use generatable::Generatable;
84pub use generator::{Generator, GeneratorStep};
85
86/// A type alias for `Box<dyn Computable<T>>`.
87pub type DynComputable<T> = Box<dyn Computable<T>>;
88
89/// A type alias for `Box<dyn Generatable<T>>`.
90pub type DynGeneratable<T> = Box<dyn Generatable<T>>;
91
92/// A type alias for `Box<dyn Algorithm<CONTEXT, STATE, OUTPUT>>`.
93pub type DynAlgorithm<CONTEXT, STATE, OUTPUT> = Box<dyn Algorithm<CONTEXT, STATE, OUTPUT>>;
94
95/// A type alias for `Box<dyn GenAlgorithm<CONTEXT, STATE, OUTPUT>>`.
96pub type DynGenAlgorithm<CONTEXT, STATE, ITEM> = Box<dyn GenAlgorithm<CONTEXT, STATE, ITEM>>;
97
98// Dummy implementations of Computable / Generatable for dynamic objects, because these
99// are not implemented automatically.
100
101impl<T> Computable<T> for DynComputable<T> {
102    fn try_compute(&mut self) -> Completable<T> {
103        (**self).try_compute()
104    }
105}
106
107impl<CONTEXT, STATE, OUTPUT> Computable<OUTPUT> for DynAlgorithm<CONTEXT, STATE, OUTPUT> {
108    fn try_compute(&mut self) -> Completable<OUTPUT> {
109        (**self).try_compute()
110    }
111}
112
113impl<T> Generatable<T> for DynGeneratable<T> {
114    fn try_next(&mut self) -> Option<Completable<T>> {
115        (**self).try_next()
116    }
117}
118
119impl<CONTEXT, STATE, OUTPUT> Generatable<OUTPUT> for DynGenAlgorithm<CONTEXT, STATE, OUTPUT> {
120    fn try_next(&mut self) -> Option<Completable<OUTPUT>> {
121        (**self).try_next()
122    }
123}
124
125#[cfg(test)]
126mod integration_tests {
127    use super::*;
128    use crate::{Computation, ComputationStep, Generator, GeneratorStep, Incomplete};
129
130    struct SumComputationStep;
131
132    impl ComputationStep<Vec<i32>, i32, i32> for SumComputationStep {
133        fn step(context: &Vec<i32>, state: &mut i32) -> Completable<i32> {
134            if *state < context.len() as i32 {
135                *state += 1;
136                Err(Incomplete::Suspended)
137            } else {
138                Ok(context.iter().sum())
139            }
140        }
141    }
142
143    #[test]
144    fn test_dyn_computable_integration() {
145        let identity: ComputableIdentity<i32> = 42.into();
146        let mut dyn_computable: DynComputable<i32> = identity.dyn_computable();
147        let result = dyn_computable.try_compute().unwrap();
148        assert_eq!(result, 42);
149    }
150
151    #[test]
152    fn test_dyn_algorithm_integration() {
153        let computation = Computation::<Vec<i32>, i32, i32, SumComputationStep>::from_parts(
154            vec![1, 2, 3, 4, 5],
155            0,
156        );
157        let mut dyn_algorithm: DynAlgorithm<Vec<i32>, i32, i32> = computation.dyn_algorithm();
158        let result = dyn_algorithm.compute().unwrap();
159        assert_eq!(result, 15);
160    }
161
162    #[test]
163    fn test_dyn_algorithm_as_computable() {
164        let computation =
165            Computation::<Vec<i32>, i32, i32, SumComputationStep>::from_parts(vec![10, 20], 0);
166        let mut dyn_algorithm: DynAlgorithm<Vec<i32>, i32, i32> = computation.dyn_algorithm();
167        // Test that DynAlgorithm implements Computable
168        let result = dyn_algorithm.try_compute();
169        assert!(matches!(result, Err(Incomplete::Suspended)));
170
171        let result = dyn_algorithm.compute().unwrap();
172        assert_eq!(result, 30);
173    }
174
175    struct RangeGeneratorStep;
176
177    impl GeneratorStep<i32, i32, i32> for RangeGeneratorStep {
178        fn step(context: &i32, state: &mut i32) -> Completable<Option<i32>> {
179            *state += 1;
180            if *state <= *context {
181                Ok(Some(*state))
182            } else {
183                Ok(None)
184            }
185        }
186    }
187
188    #[test]
189    fn test_dyn_generatable_integration() {
190        let generator = Generator::<i32, i32, i32, RangeGeneratorStep>::from_parts(5, 0);
191        let mut dyn_generatable: DynGeneratable<i32> = generator.dyn_generatable();
192
193        let mut items = Vec::new();
194        while let Some(item) = dyn_generatable.try_next() {
195            items.push(item.unwrap());
196        }
197
198        assert_eq!(items, vec![1, 2, 3, 4, 5]);
199    }
200
201    #[test]
202    fn test_dyn_gen_algorithm_integration() {
203        let generator = Generator::<i32, i32, i32, RangeGeneratorStep>::from_parts(3, 0);
204        let mut dyn_gen_algorithm: DynGenAlgorithm<i32, i32, i32> = generator.dyn_algorithm();
205
206        let mut items = Vec::new();
207        while let Some(item) = dyn_gen_algorithm.try_next() {
208            items.push(item.unwrap());
209        }
210
211        assert_eq!(items, vec![1, 2, 3]);
212    }
213
214    #[test]
215    fn test_dyn_gen_algorithm_as_generatable() {
216        let generator = Generator::<i32, i32, i32, RangeGeneratorStep>::from_parts(3, 0);
217        let mut dyn_gen_algorithm: DynGenAlgorithm<i32, i32, i32> = generator.dyn_algorithm();
218
219        // Test that DynGenAlgorithm implements Generatable
220        let item = dyn_gen_algorithm.try_next().unwrap().unwrap();
221        assert_eq!(item, 1);
222    }
223
224    #[test]
225    fn test_end_to_end_computation_with_suspensions() {
226        let computation =
227            Computation::<Vec<i32>, i32, i32, SumComputationStep>::from_parts(vec![1, 2, 3], 0);
228        let mut dyn_computable: DynComputable<i32> = computation.dyn_computable();
229
230        // The first call should suspend
231        assert!(matches!(
232            dyn_computable.try_compute(),
233            Err(Incomplete::Suspended)
234        ));
235
236        // The second call should suspend
237        assert!(matches!(
238            dyn_computable.try_compute(),
239            Err(Incomplete::Suspended)
240        ));
241
242        // The third call should suspend
243        assert!(matches!(
244            dyn_computable.try_compute(),
245            Err(Incomplete::Suspended)
246        ));
247
248        // Fourth call should complete
249        let result = dyn_computable.try_compute().unwrap();
250        assert_eq!(result, 6);
251    }
252
253    #[test]
254    fn test_end_to_end_generator_collection() {
255        let generator = Generator::<i32, i32, i32, RangeGeneratorStep>::from_parts(4, 0);
256        let mut computation = generator.computation::<Vec<i32>>();
257        let result = computation.compute().unwrap();
258        assert_eq!(result, vec![1, 2, 3, 4]);
259    }
260
261    #[test]
262    fn test_end_to_end_generator_with_dyn_types() {
263        let generator = Generator::<i32, i32, i32, RangeGeneratorStep>::from_parts(3, 0);
264        let dyn_generatable: DynGeneratable<i32> = generator.dyn_generatable();
265        let mut collector: Collector<i32, Vec<i32>> = dyn_generatable.into();
266        let result = collector.compute().unwrap();
267        assert_eq!(result, vec![1, 2, 3]);
268    }
269
270    #[test]
271    fn test_algorithm_run_static_method() {
272        let result =
273            Computation::<Vec<i32>, i32, i32, SumComputationStep>::run(vec![5, 10, 15], 0i32)
274                .unwrap();
275        assert_eq!(result, 30);
276    }
277
278    #[test]
279    fn test_computable_result_integration() {
280        let computation =
281            Computation::<Vec<i32>, i32, i32, SumComputationStep>::from_parts(vec![1, 2], 0);
282        let mut result = ComputableResult::new(computation);
283
284        // The computation suspends multiple times, so we need to call try_compute until it succeeds
285        let computed = loop {
286            match result.try_compute() {
287                Ok(value) => break value,
288                Err(Incomplete::Suspended) => continue,
289                Err(e) => panic!("Unexpected error: {:?}", e),
290            }
291        };
292        assert_eq!(*computed, 3);
293        let computed_ptr = computed as *const i32;
294
295        // Second call returns cached result
296        let cached = result.try_compute().unwrap();
297        assert_eq!(*cached, 3);
298        let cached_ptr = cached as *const i32;
299        assert_eq!(computed_ptr, cached_ptr);
300    }
301
302    #[test]
303    fn test_multiple_dyn_computable() {
304        let identity1: ComputableIdentity<i32> = 10.into();
305        let identity2: ComputableIdentity<i32> = 20.into();
306
307        let mut dyn1: DynComputable<i32> = identity1.dyn_computable();
308        let mut dyn2: DynComputable<i32> = identity2.dyn_computable();
309
310        assert_eq!(dyn1.try_compute().unwrap(), 10);
311        assert_eq!(dyn2.try_compute().unwrap(), 20);
312    }
313
314    // Cancellation integration tests
315
316    struct LongRunningStep;
317
318    impl ComputationStep<u32, u32, u32> for LongRunningStep {
319        fn step(target: &u32, state: &mut u32) -> Completable<u32> {
320            *state += 1;
321            if *state >= *target {
322                Ok(*state)
323            } else {
324                Err(Incomplete::Suspended)
325            }
326        }
327    }
328
329    #[test]
330    fn test_computation_cancellation() {
331        use cancel_this::{CancelAtomic, on_trigger};
332
333        let trigger = CancelAtomic::new();
334        trigger.cancel(); // Pre-cancel
335
336        let mut computation = Computation::<u32, u32, u32, LongRunningStep>::from_parts(1000, 0);
337
338        let result = on_trigger(trigger, || computation.try_compute());
339
340        assert!(matches!(result, Err(Incomplete::Cancelled(_))));
341    }
342
343    #[test]
344    fn test_computation_compute_with_cancellation() {
345        use cancel_this::{CancelAtomic, on_trigger};
346
347        let trigger = CancelAtomic::new();
348        trigger.cancel(); // Pre-cancel
349
350        let mut computation = Computation::<u32, u32, u32, LongRunningStep>::from_parts(1000, 0);
351
352        let result = on_trigger(trigger, || computation.compute());
353
354        assert!(result.is_err());
355    }
356
357    struct CancellableGeneratorStep;
358
359    impl GeneratorStep<u32, u32, u32> for CancellableGeneratorStep {
360        fn step(max: &u32, state: &mut u32) -> Completable<Option<u32>> {
361            *state += 1;
362            if *state <= *max {
363                Ok(Some(*state))
364            } else {
365                Ok(None)
366            }
367        }
368    }
369
370    #[test]
371    fn test_generator_iterator_cancellation() {
372        use cancel_this::{CancelAtomic, on_trigger};
373
374        let trigger = CancelAtomic::new();
375        trigger.cancel(); // Pre-cancel
376
377        let mut generator =
378            Generator::<u32, u32, u32, CancellableGeneratorStep>::from_parts(100, 0);
379
380        // on_trigger expects Result, so we wrap the iterator call
381        let result = on_trigger(trigger, || match generator.next() {
382            Some(Ok(v)) => Ok(Some(v)),
383            Some(Err(e)) => Err(e),
384            None => Ok(None),
385        });
386
387        // Should be canceled
388        assert!(result.is_err());
389    }
390
391    #[test]
392    fn test_collector_with_cancellation() {
393        use cancel_this::{CancelAtomic, on_trigger};
394
395        let trigger = CancelAtomic::new();
396        trigger.cancel(); // Pre-cancel
397
398        let generator = Generator::<u32, u32, u32, CancellableGeneratorStep>::from_parts(100, 0);
399        let mut collector: Collector<u32, Vec<u32>> = generator.dyn_generatable().into();
400
401        let result = on_trigger(trigger, || collector.compute());
402
403        assert!(result.is_err());
404    }
405}