oxide_update_engine/context.rs
1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5use crate::{UpdateEngine, errors::ExecutionError};
6use derive_where::derive_where;
7use futures::FutureExt;
8use oxide_update_engine_types::{
9 buffer::EventBuffer,
10 errors::NestedEngineError,
11 events::{Event, EventReport, ExecutionUuid, StepEventKind, StepProgress},
12 spec::{EngineSpec, GenericSpec, SerializableError},
13};
14use std::{collections::HashMap, fmt, marker::PhantomData, sync::Mutex};
15use tokio::{
16 sync::{mpsc, oneshot},
17 time::Instant,
18};
19
20/// Context for a step's execution function.
21///
22/// This is passed into the function registered for a step, and can be
23/// used to send progress updates as the function continues execution.
24///
25/// # Notes
26///
27/// `StepContext` deliberately does not implement `Clone`, to make it
28/// more likely that it is dropped at the same time the future
29/// completes.
30#[derive(Debug)]
31pub struct StepContext<S: EngineSpec> {
32 log: slog::Logger,
33 payload_sender: mpsc::Sender<StepContextPayload<S>>,
34 token: StepHandleToken<S>,
35 // This is keyed by root execution ID in case there are multiple
36 // nested events taking place. Each `NestedEventBuffer` tracks one
37 // such execution ID.
38 nested_buffers: Mutex<HashMap<ExecutionUuid, NestedEventBuffer>>,
39}
40
41impl<S: EngineSpec> StepContext<S> {
42 pub(crate) fn new(
43 log: &slog::Logger,
44 payload_sender: mpsc::Sender<StepContextPayload<S>>,
45 ) -> Self {
46 Self {
47 log: log.clone(),
48 payload_sender,
49 token: StepHandleToken::new(),
50 nested_buffers: Default::default(),
51 }
52 }
53
54 /// Sends a progress update to the update engine.
55 #[inline]
56 pub async fn send_progress(&self, progress: StepProgress<S>) {
57 let now = Instant::now();
58 let (done, done_rx) = oneshot::channel();
59 self.payload_sender
60 .send(StepContextPayload::Progress { now, progress, done })
61 .await
62 .expect("our code always keeps payload_receiver open");
63 _ = done_rx.await;
64 }
65
66 /// Sends a report from a nested engine, typically one running on
67 /// a remote machine.
68 ///
69 /// Returns an error if a [`StepEventKind::ExecutionFailed`] event
70 /// was seen.
71 #[inline]
72 pub async fn send_nested_report<S2: EngineSpec>(
73 &self,
74 report: EventReport<S2>,
75 ) -> Result<(), NestedEngineError<GenericSpec>> {
76 let now = Instant::now();
77
78 let mut res = Ok(());
79 let delta_report = if let Some(id) = report.root_execution_id {
80 let mut nested_buffers = self.nested_buffers.lock().unwrap();
81 Some(nested_buffers.entry(id).or_default().add_event_report(report))
82 } else {
83 // If there's no root execution ID set, report is expected
84 // to be empty. However, report is untrusted data so we
85 // can't assert on it. Instead, log this.
86 if !report.step_events.is_empty() {
87 slog::warn!(
88 self.log,
89 "received non-empty report with empty \
90 root execution ID";
91 "report" => ?report,
92 );
93 }
94 None
95 };
96
97 if let Some(delta_report) = delta_report {
98 for event in delta_report.step_events {
99 match &event.kind {
100 StepEventKind::ExecutionFailed {
101 failed_step,
102 message,
103 causes,
104 ..
105 } => {
106 res =
107 Err(NestedEngineError::StepFailed {
108 component: failed_step
109 .info
110 .component
111 .clone(),
112 id: failed_step.info.id.clone(),
113 description: failed_step
114 .info
115 .description
116 .clone(),
117 error: SerializableError::from_message_and_causes(
118 message.clone(),
119 causes.clone(),
120 ),
121 });
122 }
123 StepEventKind::ExecutionAborted {
124 aborted_step,
125 message,
126 ..
127 } => {
128 res = Err(NestedEngineError::Aborted {
129 component: aborted_step
130 .info
131 .component
132 .clone(),
133 id: aborted_step.info.id.clone(),
134 description: aborted_step
135 .info
136 .description
137 .clone(),
138 message: message.clone(),
139 });
140 }
141 StepEventKind::NoStepsDefined
142 | StepEventKind::ExecutionStarted { .. }
143 | StepEventKind::AttemptRetry { .. }
144 | StepEventKind::ProgressReset { .. }
145 | StepEventKind::StepCompleted { .. }
146 | StepEventKind::ExecutionCompleted { .. }
147 // Note: we do not care about nested failures or
148 // aborts. That's because the parent step might
149 // have restarted nested engines. Only top-level
150 // failures or aborts matter.
151 | StepEventKind::Nested { .. }
152 | StepEventKind::Unknown => {}
153 }
154
155 self.payload_sender
156 .send(StepContextPayload::Nested {
157 now,
158 event: Event::Step(event),
159 })
160 .await
161 .expect(
162 "our code always keeps \
163 payload_receiver open",
164 );
165 }
166
167 for event in delta_report.progress_events {
168 self.payload_sender
169 .send(StepContextPayload::Nested {
170 now,
171 event: Event::Progress(event),
172 })
173 .await
174 .expect(
175 "our code always keeps \
176 payload_receiver open",
177 );
178 }
179
180 // Ensure that all reports have been received by the
181 // engine before returning.
182 let (done, done_rx) = oneshot::channel();
183 self.payload_sender
184 .send(StepContextPayload::Sync { done })
185 .await
186 .expect("our code always keeps payload_receiver open");
187 _ = done_rx.await;
188 }
189
190 res
191 }
192
193 /// Creates a nested execution engine.
194 ///
195 /// An individual step can generate other steps: these steps are
196 /// treated as *nested*, and carry their own progress.
197 pub async fn with_nested_engine<'a, 'this, F, S2>(
198 &'this self,
199 engine_fn: F,
200 ) -> Result<CompletionContext<S2>, NestedEngineError<S2>>
201 where
202 'this: 'a,
203 F: FnOnce(&mut UpdateEngine<'a, S2>) -> Result<(), S2::Error> + Send,
204 S2: EngineSpec + 'a,
205 {
206 // Previously, this code was of the form:
207 //
208 // let (sender, mut receiver) = mpsc::channel(128);
209 // let mut engine = UpdateEngine::new(&self.log, sender);
210 //
211 // And there was a loop below that selected over `engine` and
212 // `receiver`.
213 //
214 // That approach was abandoned because it had ordering issues,
215 // because it wasn't guaranteed that events were received in
216 // the order they were processed. For example, consider what
217 // happens if:
218 //
219 // 1. User code sent an event E1 through a child (nested)
220 // StepContext.
221 // 2. Then in quick succession, the same code sent an event E2
222 // through self.
223 //
224 // What users would expect to happen is that E1 is received
225 // before E2. However, what actually happened was that:
226 //
227 // 1. `engine` was driven until the next suspend point. This
228 // caused E2 to be sent.
229 // 2. Then, `receiver` was polled. This caused E1 to be
230 // received.
231 //
232 // So the order of events was reversed.
233 //
234 // To fix this, we now use a single channel, and send events
235 // through it both from the nested engine and from self.
236 //
237 // An alternative would be to use a oneshot channel as a
238 // synchronization tool. However, sharing a channel is easier.
239 let mut engine = UpdateEngine::<S2>::new_nested(
240 &self.log,
241 self.payload_sender.clone(),
242 );
243
244 // Create the engine's steps.
245 (engine_fn)(&mut engine)
246 .map_err(|error| NestedEngineError::Creation { error })?;
247
248 // Now run the engine.
249 let engine = engine.execute();
250 match engine.await {
251 Ok(cx) => Ok(cx),
252 Err(ExecutionError::EventSendError(_)) => {
253 unreachable!("our code always keeps payload_receiver open")
254 }
255 Err(ExecutionError::StepFailed {
256 component,
257 id,
258 description,
259 error,
260 }) => Err(NestedEngineError::StepFailed {
261 component,
262 id,
263 description,
264 error,
265 }),
266 Err(ExecutionError::Aborted {
267 component,
268 id,
269 description,
270 message,
271 }) => Err(NestedEngineError::Aborted {
272 component,
273 id,
274 description,
275 message,
276 }),
277 }
278 }
279
280 /// Retrieves a token used to fetch the value out of a
281 /// [`StepHandle`].
282 pub fn token(&self) -> &StepHandleToken<S> {
283 &self.token
284 }
285}
286
287/// Tracker for [`StepContext::send_nested_report`].
288///
289/// Nested event reports might contain events already seen in prior
290/// runs: `NestedEventBuffer` deduplicates those events such that only
291/// deltas are sent over the channel.
292#[derive(Debug, Default)]
293struct NestedEventBuffer {
294 buffer: EventBuffer<GenericSpec>,
295 last_seen: Option<usize>,
296}
297
298impl NestedEventBuffer {
299 /// Adds an event report to the buffer, and generates a
300 /// corresponding event report that can be used to send data
301 /// upstream.
302 fn add_event_report<S: EngineSpec>(
303 &mut self,
304 report: EventReport<S>,
305 ) -> EventReport<GenericSpec> {
306 self.buffer.add_event_report(report.into_generic());
307 self.buffer.generate_report_since(&mut self.last_seen)
308 }
309}
310
/// An uninhabited type for oneshot channels, since we only care about
/// them being dropped.
///
/// Because no value of `Never` can ever be constructed, a
/// `oneshot::Receiver<Never>` can only complete with an error. That
/// happens once the matching sender is dropped, which makes the channel
/// usable as a pure "done" signal.
#[derive(Debug)]
pub(crate) enum Never {}
315
/// A message sent over the payload channel by a [`StepContext`], or by a
/// nested engine that shares the same channel.
///
/// Variants carrying a `done` sender let the sender wait, via the
/// paired receiver, until that sender has been dropped.
#[derive_where(Debug)]
pub(crate) enum StepContextPayload<S: EngineSpec> {
    /// A progress update for the current step, with synchronization.
    Progress {
        // Time at which the update was created.
        now: Instant,
        progress: StepProgress<S>,
        done: oneshot::Sender<Never>,
    },
    /// A single nested event with synchronization.
    NestedSingle {
        now: Instant,
        event: Event<GenericSpec>,
        done: oneshot::Sender<Never>,
    },
    /// One out of a series of nested events sent in succession.
    Nested {
        now: Instant,
        event: Event<GenericSpec>,
    },
    /// A synchronization marker. `StepContext::send_nested_report` sends
    /// one after a series of `Nested` events and waits on `done`.
    Sync {
        done: oneshot::Sender<Never>,
    },
}
338
339/// Context for a step's metadata-generation function.
340///
341/// This is passed into the function registered to generate a step's
342/// metadata.
343///
344/// # Notes
345///
346/// `MetadataContext` deliberately does not implement `Clone`, to make
347/// it more likely that it is dropped at the same time the future
348/// completes.
349#[derive_where(Debug)]
350pub struct MetadataContext<S: EngineSpec> {
351 token: StepHandleToken<S>,
352}
353
354impl<S: EngineSpec> MetadataContext<S> {
355 pub(crate) fn new() -> Self {
356 Self { token: StepHandleToken::new() }
357 }
358
359 /// Retrieves a token used to fetch the value out of a
360 /// [`StepHandle`].
361 pub fn token(&self) -> &StepHandleToken<S> {
362 &self.token
363 }
364}
365
366/// Context returned by a successful
367/// [`UpdateEngine::execute`](crate::UpdateEngine::execute).
368///
369/// This can be used to retrieve the value of a `StepHandle`. In the
370/// future, it may also be extended to provide more information.
371#[derive_where(Debug)]
372pub struct CompletionContext<S: EngineSpec> {
373 token: StepHandleToken<S>,
374}
375
376impl<S: EngineSpec> CompletionContext<S> {
377 pub(crate) fn new() -> Self {
378 Self { token: StepHandleToken::new() }
379 }
380
381 /// Retrieves a token used to fetch the value out of a
382 /// [`StepHandle`].
383 pub fn token(&self) -> &StepHandleToken<S> {
384 &self.token
385 }
386}
387
388/// A token to retrieve the value within a [`StepHandle`].
389///
390/// For more information, see the documentation for [`StepHandle`].
391#[derive_where(Debug)]
392pub struct StepHandleToken<S>(PhantomData<S>);
393
394impl<S> StepHandleToken<S> {
395 fn new() -> Self {
396 Self(PhantomData)
397 }
398}
399
/// A way to obtain a step's result.
///
/// This handle can be used to transfer data between steps.
///
/// `StepHandle`s must be awaited:
///
/// 1. Either within a future step,
/// 2. Or after the engine completes executing.
///
/// To ensure this, the only way to retrieve the value inside a
/// `StepHandle` is with a `StepHandleToken`, returned by:
///
/// * [`StepContext::token`]
/// * [`MetadataContext::token`]
/// * [`CompletionContext::token`]
///
/// It is important that `StepHandle`s never be awaited anywhere other
/// than the two places listed above (a step that comes afterwards, or
/// after the engine completes) -- doing so will cause an immediate
/// deadlock.
///
/// Dropping a `StepHandle` has no effect on whether the step itself
/// runs.
#[derive_where(Debug; T: fmt::Debug)]
pub struct StepHandle<T, S> {
    // Resolves with the step's output once it has been sent.
    receiver: oneshot::Receiver<T>,
    // Ties the handle to spec `S`, so only a `StepHandleToken<S>` can
    // unlock it.
    _marker: PhantomData<S>,
}
427
428impl<T, S> StepHandle<T, S> {
429 /// Creates a `StepHandle` that immediately provides a value.
430 ///
431 /// The value is always available and can be used within any steps.
432 pub fn ready(value: T) -> Self {
433 let (sender, receiver) = oneshot::channel();
434 // Can't use expect here because T doesn't implement Debug.
435 if sender.send(value).is_err() {
436 unreachable!("we're holding the receiver open")
437 }
438 Self::new(receiver)
439 }
440
441 pub(crate) fn new(receiver: oneshot::Receiver<T>) -> Self {
442 Self { receiver, _marker: PhantomData }
443 }
444
445 /// Resolves to the output from a step, `T`.
446 pub async fn into_value(self, _token: &StepHandleToken<S>) -> T {
447 self.receiver.await.expect("update-engine always sends a value")
448 }
449
450 /// Returns a shared handle.
451 ///
452 /// A `SharedStepHandle` can be used to retrieve the output of a
453 /// future across multiple dependent steps.
454 pub fn into_shared(self) -> SharedStepHandle<T, S>
455 where
456 T: Clone,
457 {
458 SharedStepHandle {
459 receiver: self.receiver.shared(),
460 _marker: self._marker,
461 }
462 }
463}
464
/// A shared version of [`StepHandle`].
///
/// A `SharedStepHandle` is cloneable, and can be used to retrieve the
/// output of a step across multiple dependent steps. Each retrieval
/// yields its own copy of the output, which is why `T: Clone` is
/// required to read it.
#[derive_where(Debug; T: fmt::Debug)]
#[derive_where(Clone; T: Clone)]
pub struct SharedStepHandle<T, S> {
    // `Shared` lets every clone await the same underlying receiver.
    receiver: futures::future::Shared<oneshot::Receiver<T>>,
    _marker: PhantomData<S>,
}
475
476impl<T: Clone, S> SharedStepHandle<T, S> {
477 /// Resolves to the output from a step, `T`.
478 pub async fn into_value(self, _token: &StepHandleToken<S>) -> T {
479 self.receiver.await.expect("update-engine always sends a value")
480 }
481}