1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
use std::{
collections::{hash_map::Entry, HashMap},
str::FromStr,
sync::Arc,
};
use futures_util::future::join_all;
use prometheus_http_query::response::Data;
use promql_parser::label::{MatchOp, Matcher};
use rand::RngCore;
use snops_common::state::{AgentId, AgentState, CannonId, EnvId, TimelineId};
use tokio::{select, task::JoinHandle};
use tracing::{debug, error, info, warn};
use super::{error::ExecutionError, EnvError, Environment};
use crate::{
cannon::{
sink::TxSink,
source::{QueryTarget, TxSource},
CannonInstance,
},
env::PortType,
schema::{
outcomes::PromQuery,
timeline::{Action, ActionInstance, EventDuration},
},
state::{GlobalState, PendingAgentReconcile},
};
impl Environment {
/// Starts playback of timeline `timeline_id` of environment `env_id`.
///
/// The environment is looked up in `state` and the timeline is cloned out of
/// it. Playback itself runs in a task created with `tokio::spawn`; the task's
/// `JoinHandle` is stored in the environment's `timeline_handle`, and this
/// function returns `Ok(())` right after the task has been spawned (it does
/// not wait for the timeline to finish).
///
/// For every event the spawned task:
/// 1. optionally schedules a sleep for the event's `duration`,
/// 2. applies each action (online/offline toggles, cannons, node config
///    changes, `Action::Execute`), collecting per-agent state changes,
/// 3. reconciles the collected agent changes via `state.reconcile_agents`,
/// 4. waits for all awaited handles, optionally bounded by the event's
///    `timeout`.
///
/// After the last event, if `state.prometheus` is set, each of the
/// environment's outcomes is queried and the validation message is logged.
///
/// # Errors
///
/// Returned from this function (converted into `EnvError` by `?`):
/// - `ExecutionError::EnvNotFound` when `state.get_env(env_id)` is `None`.
/// - `ExecutionError::TimelineNotFound` when the environment has no timeline
///   with id `timeline_id`.
/// - `ExecutionError::TimelineAlreadyStarted` when a stored timeline handle
///   exists and has not finished yet.
///
/// Failures that happen during playback are returned by the spawned task
/// (i.e. through the stored `JoinHandle`), not by this function.
///
/// # Panics
///
/// The spawned task hits `unimplemented!()` for any `EventDuration` other
/// than `EventDuration::Time`, both for event durations and timeouts. It also
/// `expect`s the fallback cannon id to parse and `unwrap`s `instance.ctx()`
/// for awaited cannons.
pub async fn execute(
state: Arc<GlobalState>,
env_id: EnvId,
timeline_id: TimelineId,
) -> Result<(), EnvError> {
let env = state
.get_env(env_id)
.ok_or_else(|| ExecutionError::EnvNotFound(env_id))?;
// clone the timeline so the spawned task owns its own copy of the events
let timeline = env
.timelines
.get(&timeline_id)
.ok_or_else(|| ExecutionError::TimelineNotFound(env_id, timeline_id))?
.clone();
info!(
"starting timeline {timeline_id} playback for env {env_id} with {} events",
timeline.len()
);
// TODO do we need to move these locks now to a new struct inside the timelines
// hashmap?
// the lock guard borrows from this separate `Arc` clone, which leaves `env`
// itself free to be moved into the spawned task below
let handle_lock_env = Arc::clone(&env);
let mut handle_lock = handle_lock_env.timeline_handle.lock().await;
// abort if timeline is already being executed
// (no stored handle at all counts as "finished", hence `unwrap_or(true)`)
if !handle_lock
.as_ref()
.map(JoinHandle::is_finished)
.unwrap_or(true)
{
Err(ExecutionError::TimelineAlreadyStarted)?;
}
// spawn the playback task and remember its handle; the guard is still held
// at this point, so the handle is stored before the lock is released
*handle_lock = Some(tokio::spawn(async move {
for event in timeline.iter() {
debug!("next event in timeline {event:?}");
// task handles that must be awaited for this timeline event
let mut awaiting_handles: Vec<tokio::task::JoinHandle<Result<(), ExecutionError>>> =
vec![];
// add a duration sleep if a duration was specified
// (the sleep is just another awaited handle, so the event does not complete
// before it elapses unless the timeout below fires first)
if let Some(duration) = &event.duration {
match duration {
&EventDuration::Time(duration) => {
awaiting_handles.push(tokio::spawn(async move {
tokio::time::sleep(duration).await;
Ok(())
}));
}
// TODO
// every other `EventDuration` variant currently panics the timeline task
_ => unimplemented!(),
}
}
// whether or not to reconcile asynchronously (if any of the reconcile actions
// are awaited)
// when set, the reconcile task's handle is added to `awaiting_handles` below.
// NOTE(review): only the Online/Offline arm sets this flag; an awaited
// `Action::Config` does not — confirm that is intended
let mut reconcile_async = false;
// the pending reconciliations
// keyed by agent id so several actions in one event touching the same agent
// are merged into a single pending state
let mut pending_reconciliations: HashMap<AgentId, PendingAgentReconcile> =
HashMap::new();
// `set_node_field!(agent, field = expr, ...)` records a node-state change for
// `agent` in `pending_reconciliations`.
//
// Before each assignment the current value of the field is bound to a local
// with the same name as the field, so `expr` may refer to the old value
// (e.g. `height = (height.0 + 1, h)`).
macro_rules! set_node_field {
($agent:ident , $($key:ident = $val:expr),* ) => {
// most call sites never read the old-value binding
#[allow(unused_variables)]
match pending_reconciliations.entry($agent.id()) {
// the agent already has a pending state from an earlier action in this
// event: update that state in place
Entry::Occupied(mut ent) => {
match ent.get_mut().2 {
// a pending inventory state has no node fields to set
AgentState::Inventory => (),
AgentState::Node(_, ref mut n) => {
$({
let $key = &n.$key;
n.$key = $val;
})*
}
}
}
// first change for this agent: start from a clone of the agent's current
// state and apply the field changes through `map_node`
Entry::Vacant(ent) => {
ent.insert((
$agent.id(),
$agent.client_owned(),
$agent.state().clone().map_node(|mut n| {
$({
let $key = &n.$key;
n.$key = $val;
})*
n
})
));
}
}
};
}
for ActionInstance { action, awaited } in &event.actions.0 {
match action {
// toggle online state
Action::Online(targets) | Action::Offline(targets) => {
if *awaited {
reconcile_async = true;
}
// `true` for `Action::Online`, `false` for `Action::Offline`
let o = matches!(action, Action::Online(_));
for agent in env.matching_agents(targets, &state.pool) {
set_node_field!(agent, online = o);
}
}
// start one cannon instance per entry
Action::Cannon(cannons) => {
for cannon in cannons.iter() {
// random suffix appended to the cannon name to form the instance id
let counter = rand::thread_rng().next_u32();
let cannon_id =
CannonId::from_str(&format!("{}-{counter}", cannon.name))
// there is a small chance that the cannon's name is at the
// length limit, so this will force the cannon to be renamed
// to 'cannon-N'
.unwrap_or_else(|_| {
CannonId::from_str(&format!("cannon-{counter}"))
.expect("cannon id failed to parse")
});
// look up the configured (source, sink) pair by cannon name; an unknown
// name ends the whole timeline task with an error
let Some((mut source, mut sink)) =
env.cannon_configs.get(&cannon.name).map(|c| c.clone())
else {
return Err(ExecutionError::UnknownCannon(cannon.name));
};
// override the query and target if they are specified
// (each override only applies when the source / sink is the `RealTime`
// variant; otherwise the configured value is kept)
if let (Some(q), TxSource::RealTime { query, .. }) =
(&cannon.query, &mut source)
{
*query = QueryTarget::Node(q.clone());
};
if let (Some(t), TxSink::RealTime { target, .. }) =
(&cannon.target, &mut sink)
{
*target = t.clone();
};
let count = cannon.count;
let (mut instance, rx) = CannonInstance::new(
Arc::clone(&state),
cannon_id,
(env.id, env.storage.id, &env.aot_bin),
source,
sink,
count,
)
.map_err(ExecutionError::Cannon)?;
if *awaited {
// awaited: drive the cannon in a task whose handle this event waits on
let ctx = instance.ctx().unwrap();
let env = Arc::clone(&env);
// debug!("instance started await mode");
awaiting_handles.push(tokio::task::spawn(async move {
let res = ctx.spawn(rx).await;
// remove the cannon after the task is complete
env.cannons.remove(&cannon_id);
res.map_err(ExecutionError::Cannon)
}));
} else {
// not awaited: the instance spawns its own task and the event does not
// wait for it
instance.spawn_local(rx).map_err(ExecutionError::Cannon)?;
}
// insert the cannon
// NOTE(review): for awaited cannons this insert runs after the task above
// was spawned; if that task finished first, its `remove` would precede
// this insert and the entry would stay in `env.cannons` — confirm
env.cannons.insert(cannon_id, Arc::new(instance));
}
}
// update node configuration of the matching agents
Action::Config(configs) => {
for (targets, request) in configs.iter() {
for agent in env.matching_agents(targets, &state.pool) {
// any height action will force the height to be incremented
if let Some(h) = request.height {
let h = h.into();
// `height` on the right-hand side is the previous value bound by the
// macro; its first element is bumped by one
set_node_field!(agent, height = (height.0 + 1, h));
}
// update the peers and validators
// (peers are resolved with `PortType::Node`, validators with
// `PortType::Bft`)
if let Some(p) = &request.peers {
let p: Vec<_> = env
.matching_nodes(p, &state.pool, PortType::Node)
.collect();
set_node_field!(agent, peers = p.clone());
}
if let Some(p) = &request.validators {
let v: Vec<_> = env
.matching_nodes(p, &state.pool, PortType::Bft)
.collect();
set_node_field!(agent, validators = v.clone());
}
}
}
}
// run inline; an error ends the timeline task
Action::Execute(action) => action.execute(&env).await?,
};
}
// if there are any pending reconciliations,
if !pending_reconciliations.is_empty() {
// reconcile all nodes
let task_state = Arc::clone(&state);
let reconcile_handle = tokio::spawn(async move {
if let Err(e) = task_state
.reconcile_agents(pending_reconciliations.into_values())
.await
{
// TODO: timeline setting to enable cleanup on error
// in many cases, maintaining the failure state is easier to
// troubleshoot. can shoot alerts here too
/* error!("failed to reconcile agents in timeline: {e}");
if let Err(e) = Environment::cleanup(env_id, &task_state).await {
error!("failed to inventory agents: {e}");
} */
return Err(e.into());
};
Ok(())
});
// await the reconciliation if any of the actions were `.await`
// otherwise the handle is dropped here: the reconcile task keeps running
// detached and its result is not observed by this timeline
if reconcile_async {
awaiting_handles.push(reconcile_handle);
}
}
let handles_fut = join_all(awaiting_handles.into_iter());
// wait for the awaiting futures to complete
let handles_result = match &event.timeout {
// apply a timeout to `handles_fut`
Some(timeout) => match timeout {
EventDuration::Time(timeout_duration) => select! {
// on timeout, skip the result check below and move on to the next event.
// NOTE(review): the awaited tasks do not appear to be aborted here, only
// their handles are dropped — confirm they are meant to keep running
_ = tokio::time::sleep(*timeout_duration) => continue,
res = handles_fut => res,
},
_ => unimplemented!(),
},
// no timeout, regularly await the handles
None => handles_fut.await,
};
// the first failed handle (task error or join error) ends the timeline task
for result in handles_result.into_iter() {
match result {
Ok(Ok(())) => (),
// the success case was matched above, so `e` is the task's `Err` here
Ok(e) => return e,
Err(e) => return Err(ExecutionError::Join(e)),
}
}
}
info!("------------------------------------------");
info!("playback of environment timeline completed");
info!("------------------------------------------");
// perform outcome validation
// (skipped entirely when no prometheus client is configured in `state`)
if let Some(prometheus) = &*state.prometheus {
for (outcome_name, outcome) in env.outcomes.iter() {
// use the outcome's own query if it has one, otherwise fall back to the
// built-in query registered under the outcome's name
let Some(mut query) = outcome
.query
.as_ref()
.or_else(|| PromQuery::builtin(outcome_name))
.cloned()
else {
warn!("unrecognized metric name (no built-in query found)");
continue;
};
// inject env ID matchers into the PromQL query
query.add_matchers(&[Matcher {
op: MatchOp::Equal,
name: String::from("env_id"),
value: env_id.to_string(),
}]);
// TODO: store pass/fails in environment
let query_response = prometheus.query(query.into_inner()).get().await;
match query_response {
Ok(result) => {
// reduce the response to a single value: scalars are used directly,
// vectors contribute their last element, anything else is skipped
let value = match result.data() {
Data::Scalar(sample) => sample.value(),
Data::Vector(vector) => match vector.last() {
Some(item) => item.sample().value(),
None => {
warn!("empty vector response from prometheus");
continue;
}
},
_ => {
warn!("unsupported prometheus query response");
continue;
}
};
let message = outcome.validation.show_validation(value);
info!("OUTCOME {outcome_name}: {message}");
}
// query failures are only logged; they do not fail the timeline task
Err(e) => {
error!("failed to validate outcome {outcome_name}: {e}");
}
}
}
}
Ok(())
}));
Ok(())
}
}