Skip to main content

oxide_update_engine/
context.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5use crate::{UpdateEngine, errors::ExecutionError};
6use derive_where::derive_where;
7use futures::FutureExt;
8use oxide_update_engine_types::{
9    buffer::EventBuffer,
10    errors::NestedEngineError,
11    events::{Event, EventReport, ExecutionUuid, StepEventKind, StepProgress},
12    spec::{EngineSpec, GenericSpec, SerializableError},
13};
14use std::{collections::HashMap, fmt, marker::PhantomData, sync::Mutex};
15use tokio::{
16    sync::{mpsc, oneshot},
17    time::Instant,
18};
19
20/// Context for a step's execution function.
21///
22/// This is passed into the function registered for a step, and can be
23/// used to send progress updates as the function continues execution.
24///
25/// # Notes
26///
27/// `StepContext` deliberately does not implement `Clone`, to make it
28/// more likely that it is dropped at the same time the future
29/// completes.
30#[derive(Debug)]
31pub struct StepContext<S: EngineSpec> {
32    log: slog::Logger,
33    payload_sender: mpsc::Sender<StepContextPayload<S>>,
34    token: StepHandleToken<S>,
35    // This is keyed by root execution ID in case there are multiple
36    // nested events taking place. Each `NestedEventBuffer` tracks one
37    // such execution ID.
38    nested_buffers: Mutex<HashMap<ExecutionUuid, NestedEventBuffer>>,
39}
40
41impl<S: EngineSpec> StepContext<S> {
42    pub(crate) fn new(
43        log: &slog::Logger,
44        payload_sender: mpsc::Sender<StepContextPayload<S>>,
45    ) -> Self {
46        Self {
47            log: log.clone(),
48            payload_sender,
49            token: StepHandleToken::new(),
50            nested_buffers: Default::default(),
51        }
52    }
53
54    /// Sends a progress update to the update engine.
55    #[inline]
56    pub async fn send_progress(&self, progress: StepProgress<S>) {
57        let now = Instant::now();
58        let (done, done_rx) = oneshot::channel();
59        self.payload_sender
60            .send(StepContextPayload::Progress { now, progress, done })
61            .await
62            .expect("our code always keeps payload_receiver open");
63        _ = done_rx.await;
64    }
65
66    /// Sends a report from a nested engine, typically one running on
67    /// a remote machine.
68    ///
69    /// Returns an error if a [`StepEventKind::ExecutionFailed`] event
70    /// was seen.
71    #[inline]
72    pub async fn send_nested_report<S2: EngineSpec>(
73        &self,
74        report: EventReport<S2>,
75    ) -> Result<(), NestedEngineError<GenericSpec>> {
76        let now = Instant::now();
77
78        let mut res = Ok(());
79        let delta_report = if let Some(id) = report.root_execution_id {
80            let mut nested_buffers = self.nested_buffers.lock().unwrap();
81            Some(nested_buffers.entry(id).or_default().add_event_report(report))
82        } else {
83            // If there's no root execution ID set, report is expected
84            // to be empty. However, report is untrusted data so we
85            // can't assert on it. Instead, log this.
86            if !report.step_events.is_empty() {
87                slog::warn!(
88                    self.log,
89                    "received non-empty report with empty \
90                     root execution ID";
91                    "report" => ?report,
92                );
93            }
94            None
95        };
96
97        if let Some(delta_report) = delta_report {
98            for event in delta_report.step_events {
99                match &event.kind {
100                    StepEventKind::ExecutionFailed {
101                        failed_step,
102                        message,
103                        causes,
104                        ..
105                    } => {
106                        res =
107                            Err(NestedEngineError::StepFailed {
108                                component: failed_step
109                                    .info
110                                    .component
111                                    .clone(),
112                                id: failed_step.info.id.clone(),
113                                description: failed_step
114                                    .info
115                                    .description
116                                    .clone(),
117                                error: SerializableError::from_message_and_causes(
118                                    message.clone(),
119                                    causes.clone(),
120                                ),
121                            });
122                    }
123                    StepEventKind::ExecutionAborted {
124                        aborted_step,
125                        message,
126                        ..
127                    } => {
128                        res = Err(NestedEngineError::Aborted {
129                            component: aborted_step
130                                .info
131                                .component
132                                .clone(),
133                            id: aborted_step.info.id.clone(),
134                            description: aborted_step
135                                .info
136                                .description
137                                .clone(),
138                            message: message.clone(),
139                        });
140                    }
141                    StepEventKind::NoStepsDefined
142                    | StepEventKind::ExecutionStarted { .. }
143                    | StepEventKind::AttemptRetry { .. }
144                    | StepEventKind::ProgressReset { .. }
145                    | StepEventKind::StepCompleted { .. }
146                    | StepEventKind::ExecutionCompleted { .. }
147                    // Note: we do not care about nested failures or
148                    // aborts. That's because the parent step might
149                    // have restarted nested engines. Only top-level
150                    // failures or aborts matter.
151                    | StepEventKind::Nested { .. }
152                    | StepEventKind::Unknown => {}
153                }
154
155                self.payload_sender
156                    .send(StepContextPayload::Nested {
157                        now,
158                        event: Event::Step(event),
159                    })
160                    .await
161                    .expect(
162                        "our code always keeps \
163                         payload_receiver open",
164                    );
165            }
166
167            for event in delta_report.progress_events {
168                self.payload_sender
169                    .send(StepContextPayload::Nested {
170                        now,
171                        event: Event::Progress(event),
172                    })
173                    .await
174                    .expect(
175                        "our code always keeps \
176                         payload_receiver open",
177                    );
178            }
179
180            // Ensure that all reports have been received by the
181            // engine before returning.
182            let (done, done_rx) = oneshot::channel();
183            self.payload_sender
184                .send(StepContextPayload::Sync { done })
185                .await
186                .expect("our code always keeps payload_receiver open");
187            _ = done_rx.await;
188        }
189
190        res
191    }
192
193    /// Creates a nested execution engine.
194    ///
195    /// An individual step can generate other steps: these steps are
196    /// treated as *nested*, and carry their own progress.
197    pub async fn with_nested_engine<'a, 'this, F, S2>(
198        &'this self,
199        engine_fn: F,
200    ) -> Result<CompletionContext<S2>, NestedEngineError<S2>>
201    where
202        'this: 'a,
203        F: FnOnce(&mut UpdateEngine<'a, S2>) -> Result<(), S2::Error> + Send,
204        S2: EngineSpec + 'a,
205    {
206        // Previously, this code was of the form:
207        //
208        //     let (sender, mut receiver) = mpsc::channel(128);
209        //     let mut engine = UpdateEngine::new(&self.log, sender);
210        //
211        // And there was a loop below that selected over `engine` and
212        // `receiver`.
213        //
214        // That approach was abandoned because it had ordering issues,
215        // because it wasn't guaranteed that events were received in
216        // the order they were processed. For example, consider what
217        // happens if:
218        //
219        // 1. User code sent an event E1 through a child (nested)
220        //    StepContext.
221        // 2. Then in quick succession, the same code sent an event E2
222        //    through self.
223        //
224        // What users would expect to happen is that E1 is received
225        // before E2. However, what actually happened was that:
226        //
227        // 1. `engine` was driven until the next suspend point. This
228        //    caused E2 to be sent.
229        // 2. Then, `receiver` was polled. This caused E1 to be
230        //    received.
231        //
232        // So the order of events was reversed.
233        //
234        // To fix this, we now use a single channel, and send events
235        // through it both from the nested engine and from self.
236        //
237        // An alternative would be to use a oneshot channel as a
238        // synchronization tool. However, sharing a channel is easier.
239        let mut engine = UpdateEngine::<S2>::new_nested(
240            &self.log,
241            self.payload_sender.clone(),
242        );
243
244        // Create the engine's steps.
245        (engine_fn)(&mut engine)
246            .map_err(|error| NestedEngineError::Creation { error })?;
247
248        // Now run the engine.
249        let engine = engine.execute();
250        match engine.await {
251            Ok(cx) => Ok(cx),
252            Err(ExecutionError::EventSendError(_)) => {
253                unreachable!("our code always keeps payload_receiver open")
254            }
255            Err(ExecutionError::StepFailed {
256                component,
257                id,
258                description,
259                error,
260            }) => Err(NestedEngineError::StepFailed {
261                component,
262                id,
263                description,
264                error,
265            }),
266            Err(ExecutionError::Aborted {
267                component,
268                id,
269                description,
270                message,
271            }) => Err(NestedEngineError::Aborted {
272                component,
273                id,
274                description,
275                message,
276            }),
277        }
278    }
279
280    /// Retrieves a token used to fetch the value out of a
281    /// [`StepHandle`].
282    pub fn token(&self) -> &StepHandleToken<S> {
283        &self.token
284    }
285}
286
287/// Tracker for [`StepContext::send_nested_report`].
288///
289/// Nested event reports might contain events already seen in prior
290/// runs: `NestedEventBuffer` deduplicates those events such that only
291/// deltas are sent over the channel.
292#[derive(Debug, Default)]
293struct NestedEventBuffer {
294    buffer: EventBuffer<GenericSpec>,
295    last_seen: Option<usize>,
296}
297
298impl NestedEventBuffer {
299    /// Adds an event report to the buffer, and generates a
300    /// corresponding event report that can be used to send data
301    /// upstream.
302    fn add_event_report<S: EngineSpec>(
303        &mut self,
304        report: EventReport<S>,
305    ) -> EventReport<GenericSpec> {
306        self.buffer.add_event_report(report.into_generic());
307        self.buffer.generate_report_since(&mut self.last_seen)
308    }
309}
310
/// A type with no values, used as the payload of oneshot channels.
///
/// Nothing can ever be sent on such a channel; the only thing that is
/// observed is the sender being dropped.
#[derive(Debug)]
pub(crate) enum Never {}
315
316#[derive_where(Debug)]
317pub(crate) enum StepContextPayload<S: EngineSpec> {
318    Progress {
319        now: Instant,
320        progress: StepProgress<S>,
321        done: oneshot::Sender<Never>,
322    },
323    /// A single nested event with synchronization.
324    NestedSingle {
325        now: Instant,
326        event: Event<GenericSpec>,
327        done: oneshot::Sender<Never>,
328    },
329    /// One out of a series of nested events sent in succession.
330    Nested {
331        now: Instant,
332        event: Event<GenericSpec>,
333    },
334    Sync {
335        done: oneshot::Sender<Never>,
336    },
337}
338
339/// Context for a step's metadata-generation function.
340///
341/// This is passed into the function registered to generate a step's
342/// metadata.
343///
344/// # Notes
345///
346/// `MetadataContext` deliberately does not implement `Clone`, to make
347/// it more likely that it is dropped at the same time the future
348/// completes.
349#[derive_where(Debug)]
350pub struct MetadataContext<S: EngineSpec> {
351    token: StepHandleToken<S>,
352}
353
354impl<S: EngineSpec> MetadataContext<S> {
355    pub(crate) fn new() -> Self {
356        Self { token: StepHandleToken::new() }
357    }
358
359    /// Retrieves a token used to fetch the value out of a
360    /// [`StepHandle`].
361    pub fn token(&self) -> &StepHandleToken<S> {
362        &self.token
363    }
364}
365
366/// Context returned by a successful
367/// [`UpdateEngine::execute`](crate::UpdateEngine::execute).
368///
369/// This can be used to retrieve the value of a `StepHandle`. In the
370/// future, it may also be extended to provide more information.
371#[derive_where(Debug)]
372pub struct CompletionContext<S: EngineSpec> {
373    token: StepHandleToken<S>,
374}
375
376impl<S: EngineSpec> CompletionContext<S> {
377    pub(crate) fn new() -> Self {
378        Self { token: StepHandleToken::new() }
379    }
380
381    /// Retrieves a token used to fetch the value out of a
382    /// [`StepHandle`].
383    pub fn token(&self) -> &StepHandleToken<S> {
384        &self.token
385    }
386}
387
388/// A token to retrieve the value within a [`StepHandle`].
389///
390/// For more information, see the documentation for [`StepHandle`].
391#[derive_where(Debug)]
392pub struct StepHandleToken<S>(PhantomData<S>);
393
394impl<S> StepHandleToken<S> {
395    fn new() -> Self {
396        Self(PhantomData)
397    }
398}
399
400/// A way to obtain a step's result.
401///
402/// This handle can be used to transfer data between steps.
403///
404/// `StepHandle`s must be awaited:
405///
406/// 1. Either within a future step,
407/// 2. Or after the engine completes executing.
408///
409/// To ensure this, the only way to retrieve a `StepHandle` is with a
410/// `StepHandleToken`, returned by:
411///
412/// * [`StepContext::token`]
413/// * [`MetadataContext::token`]
414/// * [`CompletionContext::token`]
415///
416/// It is important that `StepHandle`s never be awaited outside the
417/// context of a step that comes afterwards -- doing so will cause an
418/// immediate deadlock.
419///
420/// Dropping a `StepHandle` has no effect on whether the step itself
421/// runs.
422#[derive_where(Debug; T: fmt::Debug)]
423pub struct StepHandle<T, S> {
424    receiver: oneshot::Receiver<T>,
425    _marker: PhantomData<S>,
426}
427
428impl<T, S> StepHandle<T, S> {
429    /// Creates a `StepHandle` that immediately provides a value.
430    ///
431    /// The value is always available and can be used within any steps.
432    pub fn ready(value: T) -> Self {
433        let (sender, receiver) = oneshot::channel();
434        // Can't use expect here because T doesn't implement Debug.
435        if sender.send(value).is_err() {
436            unreachable!("we're holding the receiver open")
437        }
438        Self::new(receiver)
439    }
440
441    pub(crate) fn new(receiver: oneshot::Receiver<T>) -> Self {
442        Self { receiver, _marker: PhantomData }
443    }
444
445    /// Resolves to the output from a step, `T`.
446    pub async fn into_value(self, _token: &StepHandleToken<S>) -> T {
447        self.receiver.await.expect("update-engine always sends a value")
448    }
449
450    /// Returns a shared handle.
451    ///
452    /// A `SharedStepHandle` can be used to retrieve the output of a
453    /// future across multiple dependent steps.
454    pub fn into_shared(self) -> SharedStepHandle<T, S>
455    where
456        T: Clone,
457    {
458        SharedStepHandle {
459            receiver: self.receiver.shared(),
460            _marker: self._marker,
461        }
462    }
463}
464
465/// A shared version of [`StepHandle`].
466///
467/// A `SharedStepHandle` is cloneable, and can be used to retrieve the
468/// output of a step across multiple dependent steps.
469#[derive_where(Debug; T: fmt::Debug)]
470#[derive_where(Clone; T: Clone)]
471pub struct SharedStepHandle<T, S> {
472    receiver: futures::future::Shared<oneshot::Receiver<T>>,
473    _marker: PhantomData<S>,
474}
475
476impl<T: Clone, S> SharedStepHandle<T, S> {
477    /// Resolves to the output from a step, `T`.
478    pub async fn into_value(self, _token: &StepHandleToken<S>) -> T {
479        self.receiver.await.expect("update-engine always sends a value")
480    }
481}