1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
use std::{
any::TypeId,
collections::{HashMap, HashSet},
marker::PhantomData,
};
use tokio_util::sync::CancellationToken;
use crate::{
graph::{Ctx, Er, Graph},
node::{NodeBuilder, Payload},
};
/// Describes a set of work to be executed by a [`crate::graph::Graph`].
pub struct Job<C: Ctx, E: Er> {
/// Execution will solve the graph for reaching (and executing) these nodes.
pub(crate) targets: HashSet<TypeId>,
/// Optionally provided values. Used for skipping parts of a graph.
pub(crate) inputs: HashMap<TypeId, Payload>,
/// Cancellation token for stopping execution of a job. The default token never cancels.
pub(crate) cancellation_token: CancellationToken,
/// Types of nodes. Used for compile time guarantee that the job fits the graph.
node_type: PhantomData<(C, E)>,
}
impl<C: Ctx, E: Er> Job<C, E> {
/// Create a new empty Job.
#[must_use]
pub fn new() -> Self {
Self::default()
}
/// Sets a provided [`CancellationToken`] if you already have one.
///
/// The more common usecase here, though, is to clone the one already present and call cancel on that. See [`Job::cancellation_token`].
#[must_use]
pub fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
self.cancellation_token = cancellation_token;
self
}
/// Clone the [`CancellationToken`]. Useful for, eg. implementing a timeout for the job.
#[must_use]
pub fn cancellation_token(&self) -> CancellationToken {
self.cancellation_token.clone()
}
/// Set a target for the job. Without setting this, the job is essentially a no-op.
/// # Example
/// ```ignore
/// #[derive(Clone)]
/// struct A;
///
/// #[executor]
/// async fn create_a(_: ()) -> Result<A, String> {
/// Ok(A)
/// }
///
/// let mut job = Job::new();
/// job.target::<A>();
/// ```
pub fn target<T: 'static + NodeBuilder<C, E>>(&mut self) {
self.targets.insert(TypeId::of::<T>());
}
/// Set a target for the job. Without setting this, the job is essentially a no-op.
///
/// # Example
/// ```ignore
/// #[derive(Clone)]
/// struct A;
///
/// #[executor]
/// async fn create_a(_: ()) -> Result<A, String> {
/// Ok(A)
/// }
///
/// let job = Job::new().with_target::<A>();
/// ```
#[must_use]
pub fn with_target<T: 'static + NodeBuilder<C, E>>(mut self) -> Self {
self.target::<T>();
self
}
/// Adds data to the job that will be used as if the node producing that data has already run,
/// and none of its dependencies will be run either. This is useful for things like cached
/// values, or continuing a previously failed execution.
///
/// # Example
/// ```ignore
/// #[derive(Clone)]
/// struct A(i32);
///
/// let mut job = Job::new();
/// job.with_input(A(22));
/// ```
pub fn input<T: Send + 'static>(&mut self, value: T) {
let key = TypeId::of::<T>();
let val = Box::new(value) as Payload;
self.inputs.insert(key, val);
}
/// Adds data to the job that will be used as if the node producing that data has already run,
/// and none of its dependencies will be run either. This is useful for things like cached
/// values, or continuing a previously failed execution.
///
/// # Example
/// ```ignore
/// #[derive(Clone)]
/// struct A(i32);
///
/// let job = Job::new().with_input(A(22));
/// ```
#[must_use]
pub fn with_input<T: Send + 'static>(mut self, value: T) -> Self {
self.input(value);
self
}
pub(crate) fn pending(&self, graph: &Graph<C, E>) -> HashSet<usize> {
let mut pending = HashSet::new();
let mut stack: Vec<usize> = vec![];
for id in &self.targets {
if !self.inputs.contains_key(id) {
let i = graph.nodes.binary_search_by_key(id, |n| n.id).unwrap();
pending.insert(i);
stack.extend(&graph.adj[i]);
}
}
while let Some(i) = stack.pop() {
let id = graph.nodes[i].id;
let not_input = !self.inputs.contains_key(&id);
let not_pending = !pending.contains(&i);
if not_input && not_pending {
pending.insert(i);
stack.extend(&graph.adj[i]);
}
}
pending
}
}
impl<C: Ctx, E: Er> Default for Job<C, E> {
fn default() -> Self {
Self {
targets: HashSet::new(),
inputs: HashMap::new(),
cancellation_token: CancellationToken::new(),
node_type: PhantomData,
}
}
}