1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.
//! An actor which wraps an operation that should be done in a looped fashion.
//! This actor continually executes the operation in an infinite basis until
//! the operation either signals it's complete or throws an error.
//!
//! To keep dependent services agnostic to the underlying actor logic, you can simply
//! use [spawn_loop] to start an operation which should be done in a loop.
//!
//! ```
//! use ractor::ActorProcessingErr;
//! use ractor::concurrency::Duration;
//! use ractor::concurrency::sleep;
//! use ractor_actors::streams::looping::{Operation, IterationResult, spawn_loop};
//!
//! struct SampleOperation;
//!
//! #[async_trait::async_trait]
//! impl Operation for SampleOperation {
//! type State = ();
//!
//! async fn work(&self, state: &mut Self::State) -> Result<IterationResult, ActorProcessingErr> {
//! println!("I'm a loop!");
//! sleep(Duration::from_millis(25)).await;
//! Ok(IterationResult::Continue)
//! }
//! }
//! let _ = async {
//! let _actor = spawn_loop(SampleOperation, (), None).await.expect("Failed to start sample");
//! };
//! ```
use PhantomData;
use cast;
use Actor;
use ActorCell;
use ActorProcessingErr;
use ActorRef;
use SpawnErr;
/// Represents the result of a looping operation. It signals if the
/// loop operation is should continue or end.
/// An operation is an implementation which is repeatedly called, blocking the task, until
/// either (a) shutdown ([IterationResult::End]) or (b) error. This could be processing a stream
/// request or some continuous polling operation (something like a configuration element,
/// which emits signals when the config changes)
/// A blocking actor which performs a given async operation continually
/// in a loop and does not handle external messages. It is strongly typed
/// to the inner implementation of the [Operation] that it's looping on.
/// Spawn an operation which will continue executing until either
///
/// 1. One of the operation calls panics or returns an error
/// 2. The operation signals it's completed with [IterationResult::End]
///
/// The loop operation additionally can thread a state which can be used in loop operations.
///
/// * `op`: The [Operation] implementation defining the work at each loop iteration
/// * `istate`: The initial state for the loop operation
/// * `supervisor`: (Optional) The [ActorCell] supervisor actor which will supervise the underlying loop operation actor
///
/// Returns [Ok(ActorCell)] if the underlying actor was successfully spawned, [SpawnErr] if a startup failure occurs.
pub async