computation_process/
lib.rs

1//! # Computation process
2//!
3//! A Rust library for defining stateful computations (and generators) that support
4//! suspend/resume, interleaving, cancellation, and serialization.
5//!
6//! This crate does not use `unsafe`.
7
8#![forbid(unsafe_code)]
9//!
10//! ## Overview
11//!
12//! This library provides abstractions for defining "long-running" computations.
13//! The concepts are similar to asynchronous code but offer features that were never
//! a priority in async programming. **The target audience is projects that implement
//! CPU-intensive, long-running computations but require granular control over the
//! computation state.**
17//!
18//! ## Key Features
19//!
20//! - **Cancellation:** Each computation can be forcefully stopped using cooperative
21//!   cancellation (compatible with the [`cancel-this`](https://crates.io/crates/cancel-this) crate).
22//! - **Suspend/resume:** A computation can define safe suspend points. During these points,
23//!   it is safe to serialize, interleave, or otherwise "transfer" the computation.
24//! - **Interleaving:** Suspend points allow safely interleaving multiple computations on a
25//!   single thread with priority-based scheduling.
26//! - **Serialization:** The state of each computation is isolated and can be saved/restored.
27//!
28//! ## Core Concepts
29//!
30//! - [`Completable<T>`]: A result type that can be incomplete (`Suspended`, `Cancelled`, or `Exhausted`).
31//! - [`Computable<T>`]: A trait for objects that can be driven to completion by calling [`Computable::try_compute`].
32//! - [`Algorithm<CTX, STATE, T>`]: Extends [`Computable`] with access to context and state.
33//! - [`Generatable<T>`]: Like [`Computable`], but produces a stream of values.
34//! - [`GenAlgorithm<CTX, STATE, T>`]: Extends [`Generatable`] with context and state.
35//! - [`Computation`] and [`Generator`]: Default implementations using step functions.
36//!
37//! ## Quick Example
38//!
39//! ```rust
40//! use computation_process::{Computation, ComputationStep, Completable, Incomplete, Computable, Stateful};
41//!
42//! struct CountingStep;
43//!
44//! impl ComputationStep<u32, u32, u32> for CountingStep {
45//!     fn step(target: &u32, count: &mut u32) -> Completable<u32> {
46//!         *count += 1;
47//!         if *count >= *target {
48//!             Ok(*count)
49//!         } else {
50//!             Err(Incomplete::Suspended)
51//!         }
52//!     }
53//! }
54//!
55//! let mut computation = Computation::<u32, u32, u32, CountingStep>::from_parts(5, 0);
56//! let result = computation.compute().unwrap();
57//! assert_eq!(result, 5);
58//! ```
59
60// All traits/structs have dedicated modules for encapsulation, and we then re-export
61// these types here for easier public usage.
62
63mod algorithm;
64mod collector;
65mod completable;
66mod computable;
67mod computable_identity;
68mod computation;
69mod generatable;
70mod generator;
71
72pub use algorithm::{Algorithm, GenAlgorithm, Stateful};
73pub use collector::Collector;
74pub use completable::{Completable, Incomplete};
75pub use computable::{Computable, ComputableResult};
76pub use computable_identity::ComputableIdentity;
77pub use computation::{Computation, ComputationStep};
78pub use generatable::Generatable;
79pub use generator::{Generator, GeneratorStep};
80
81/// A type alias for `Box<dyn Computable<T>>`.
82pub type DynComputable<T> = Box<dyn Computable<T>>;
83
84/// A type alias for `Box<dyn Generatable<T>>`.
85pub type DynGeneratable<T> = Box<dyn Generatable<T>>;
86
87/// A type alias for `Box<dyn Algorithm<CONTEXT, STATE, OUTPUT>>`.
88pub type DynAlgorithm<CONTEXT, STATE, OUTPUT> = Box<dyn Algorithm<CONTEXT, STATE, OUTPUT>>;
89
90/// A type alias for `Box<dyn GenAlgorithm<CONTEXT, STATE, OUTPUT>>`.
91pub type DynGenAlgorithm<CONTEXT, STATE, ITEM> = Box<dyn GenAlgorithm<CONTEXT, STATE, ITEM>>;
92
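// Forwarding implementations of `Computable` / `Generatable` for the boxed trait objects
// above: a `Box<dyn Trait>` does not implement `Trait` automatically, so each impl simply
// delegates to the object stored inside the box.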
95
96impl<T> Computable<T> for DynComputable<T> {
97    fn try_compute(&mut self) -> Completable<T> {
98        (**self).try_compute()
99    }
100}
101
102impl<CONTEXT, STATE, OUTPUT> Computable<OUTPUT> for DynAlgorithm<CONTEXT, STATE, OUTPUT> {
103    fn try_compute(&mut self) -> Completable<OUTPUT> {
104        (**self).try_compute()
105    }
106}
107
108impl<T> Generatable<T> for DynGeneratable<T> {
109    fn try_next(&mut self) -> Option<Completable<T>> {
110        (**self).try_next()
111    }
112}
113
114impl<CONTEXT, STATE, OUTPUT> Generatable<OUTPUT> for DynGenAlgorithm<CONTEXT, STATE, OUTPUT> {
115    fn try_next(&mut self) -> Option<Completable<OUTPUT>> {
116        (**self).try_next()
117    }
118}
119
#[cfg(test)]
mod integration_tests {
    use super::*;
    use crate::{Computation, ComputationStep, Generator, GeneratorStep, Incomplete};
    use cancel_this::{CancelAtomic, on_trigger};

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Step function that suspends once per element of the context vector and
    /// then completes with the sum of all elements.
    struct SumComputationStep;

    impl ComputationStep<Vec<i32>, i32, i32> for SumComputationStep {
        fn step(context: &Vec<i32>, state: &mut i32) -> Completable<i32> {
            // `state` counts how many suspensions have already happened.
            let suspensions_needed = context.len() as i32;
            if *state >= suspensions_needed {
                return Ok(context.iter().sum());
            }
            *state += 1;
            Err(Incomplete::Suspended)
        }
    }

    /// Computation driven by [`SumComputationStep`].
    type SumComputation = Computation<Vec<i32>, i32, i32, SumComputationStep>;

    /// Step function yielding `1..=context`, one value per step, then `None`.
    struct RangeGeneratorStep;

    impl GeneratorStep<i32, i32, i32> for RangeGeneratorStep {
        fn step(context: &i32, state: &mut i32) -> Completable<Option<i32>> {
            *state += 1;
            let next = *state;
            Ok((next <= *context).then_some(next))
        }
    }

    /// Generator driven by [`RangeGeneratorStep`].
    type RangeGenerator = Generator<i32, i32, i32, RangeGeneratorStep>;

    /// Step function that increments the state on every call, suspending until the
    /// state reaches the target and then returning it.
    struct LongRunningStep;

    impl ComputationStep<u32, u32, u32> for LongRunningStep {
        fn step(target: &u32, state: &mut u32) -> Completable<u32> {
            *state += 1;
            if *state < *target {
                return Err(Incomplete::Suspended);
            }
            Ok(*state)
        }
    }

    /// Computation driven by [`LongRunningStep`].
    type LongRunningComputation = Computation<u32, u32, u32, LongRunningStep>;

    /// Step function yielding `1..=max`, one value per step, then `None`.
    struct CancellableGeneratorStep;

    impl GeneratorStep<u32, u32, u32> for CancellableGeneratorStep {
        fn step(max: &u32, state: &mut u32) -> Completable<Option<u32>> {
            *state += 1;
            let next = *state;
            Ok((next <= *max).then_some(next))
        }
    }

    /// Generator driven by [`CancellableGeneratorStep`].
    type CancellableGenerator = Generator<u32, u32, u32, CancellableGeneratorStep>;

    // ---------------------------------------------------------------------
    // Dynamic (boxed) computables and algorithms
    // ---------------------------------------------------------------------

    #[test]
    fn test_dyn_computable_integration() {
        let identity: ComputableIdentity<i32> = 42.into();
        let mut boxed: DynComputable<i32> = identity.dyn_computable();
        assert_eq!(boxed.try_compute().unwrap(), 42);
    }

    #[test]
    fn test_dyn_algorithm_integration() {
        let computation = SumComputation::from_parts(vec![1, 2, 3, 4, 5], 0);
        let mut boxed: DynAlgorithm<Vec<i32>, i32, i32> = computation.dyn_algorithm();
        assert_eq!(boxed.compute().unwrap(), 15);
    }

    #[test]
    fn test_dyn_algorithm_as_computable() {
        let computation = SumComputation::from_parts(vec![10, 20], 0);
        let mut boxed: DynAlgorithm<Vec<i32>, i32, i32> = computation.dyn_algorithm();

        // `DynAlgorithm` must be usable through `Computable`: one attempt only suspends.
        assert!(matches!(boxed.try_compute(), Err(Incomplete::Suspended)));

        // Driving it to completion yields the sum.
        assert_eq!(boxed.compute().unwrap(), 30);
    }

    // ---------------------------------------------------------------------
    // Dynamic (boxed) generators
    // ---------------------------------------------------------------------

    #[test]
    fn test_dyn_generatable_integration() {
        let generator = RangeGenerator::from_parts(5, 0);
        let mut boxed: DynGeneratable<i32> = generator.dyn_generatable();

        // Pull items until the generator reports the end of the stream.
        let items: Vec<i32> = std::iter::from_fn(|| boxed.try_next())
            .map(|item| item.unwrap())
            .collect();

        assert_eq!(items, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_dyn_gen_algorithm_integration() {
        let generator = RangeGenerator::from_parts(3, 0);
        let mut boxed: DynGenAlgorithm<i32, i32, i32> = generator.dyn_algorithm();

        // Pull items until the generator reports the end of the stream.
        let items: Vec<i32> = std::iter::from_fn(|| boxed.try_next())
            .map(|item| item.unwrap())
            .collect();

        assert_eq!(items, vec![1, 2, 3]);
    }

    #[test]
    fn test_dyn_gen_algorithm_as_generatable() {
        let generator = RangeGenerator::from_parts(3, 0);
        let mut boxed: DynGenAlgorithm<i32, i32, i32> = generator.dyn_algorithm();

        // `DynGenAlgorithm` must be usable through `Generatable`.
        let first = boxed.try_next().unwrap().unwrap();
        assert_eq!(first, 1);
    }

    // ---------------------------------------------------------------------
    // End-to-end scenarios
    // ---------------------------------------------------------------------

    #[test]
    fn test_end_to_end_computation_with_suspensions() {
        let computation = SumComputation::from_parts(vec![1, 2, 3], 0);
        let mut boxed: DynComputable<i32> = computation.dyn_computable();

        // One suspension per context element: the first three calls all suspend.
        for _ in 0..3 {
            assert!(matches!(boxed.try_compute(), Err(Incomplete::Suspended)));
        }

        // The fourth call completes with the sum.
        assert_eq!(boxed.try_compute().unwrap(), 6);
    }

    #[test]
    fn test_end_to_end_generator_collection() {
        let generator = RangeGenerator::from_parts(4, 0);
        let mut collecting = generator.computation::<Vec<i32>>();
        assert_eq!(collecting.compute().unwrap(), vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_end_to_end_generator_with_dyn_types() {
        let generator = RangeGenerator::from_parts(3, 0);
        let boxed: DynGeneratable<i32> = generator.dyn_generatable();
        let mut collector: Collector<i32, Vec<i32>> = boxed.into();
        assert_eq!(collector.compute().unwrap(), vec![1, 2, 3]);
    }

    #[test]
    fn test_algorithm_run_static_method() {
        let total = SumComputation::run(vec![5, 10, 15], 0i32).unwrap();
        assert_eq!(total, 30);
    }

    #[test]
    fn test_computable_result_integration() {
        let computation = SumComputation::from_parts(vec![1, 2], 0);
        let mut result = ComputableResult::new(computation);

        // The computation suspends several times, so keep polling until it succeeds.
        let first = loop {
            match result.try_compute() {
                Ok(value) => break value,
                Err(Incomplete::Suspended) => continue,
                Err(other) => panic!("Unexpected error: {:?}", other),
            }
        };
        assert_eq!(*first, 3);
        // Keep only the address so the borrow of `result` ends here.
        let first_ptr: *const i32 = first;

        // A further call must hand back the very same value (same address).
        let second = result.try_compute().unwrap();
        assert_eq!(*second, 3);
        let second_ptr: *const i32 = second;
        assert_eq!(first_ptr, second_ptr);
    }

    #[test]
    fn test_multiple_dyn_computable() {
        let ten: ComputableIdentity<i32> = 10.into();
        let twenty: ComputableIdentity<i32> = 20.into();

        let mut boxed_ten: DynComputable<i32> = ten.dyn_computable();
        let mut boxed_twenty: DynComputable<i32> = twenty.dyn_computable();

        assert_eq!(boxed_ten.try_compute().unwrap(), 10);
        assert_eq!(boxed_twenty.try_compute().unwrap(), 20);
    }

    // ---------------------------------------------------------------------
    // Cancellation
    // ---------------------------------------------------------------------

    #[test]
    fn test_computation_cancellation() {
        let trigger = CancelAtomic::new();
        trigger.cancel(); // Cancelled before the computation is even polled.

        let mut computation = LongRunningComputation::from_parts(1000, 0);
        let outcome = on_trigger(trigger, || computation.try_compute());

        assert!(matches!(outcome, Err(Incomplete::Cancelled(_))));
    }

    #[test]
    fn test_computation_compute_with_cancellation() {
        let trigger = CancelAtomic::new();
        trigger.cancel(); // Cancelled before the computation is even polled.

        let mut computation = LongRunningComputation::from_parts(1000, 0);
        let outcome = on_trigger(trigger, || computation.compute());

        assert!(outcome.is_err());
    }

    #[test]
    fn test_generator_iterator_cancellation() {
        let trigger = CancelAtomic::new();
        trigger.cancel(); // Cancelled before the generator is even polled.

        let mut generator = CancellableGenerator::from_parts(100, 0);

        // `on_trigger` expects a `Result`, so flip the iterator's
        // `Option<Result<_, _>>` into a `Result<Option<_>, _>`.
        let outcome = on_trigger(trigger, || generator.next().transpose());

        // The pre-cancelled trigger must surface as an error.
        assert!(outcome.is_err());
    }

    #[test]
    fn test_collector_with_cancellation() {
        let trigger = CancelAtomic::new();
        trigger.cancel(); // Cancelled before the collector is even polled.

        let generator = CancellableGenerator::from_parts(100, 0);
        let mut collector: Collector<u32, Vec<u32>> = generator.dyn_generatable().into();
        let outcome = on_trigger(trigger, || collector.compute());

        assert!(outcome.is_err());
    }
}