// peace_rt/cmd_blocks/apply_exec_cmd_block.rs

1use std::{fmt::Debug, marker::PhantomData};
2
3use fn_graph::{StreamOpts, StreamOutcome};
4use futures::join;
5use peace_cfg::{ApplyCheck, FnCtx};
6use peace_cmd_ctx::{CmdCtxSpsfFields, CmdCtxTypes};
7use peace_cmd_model::CmdBlockOutcome;
8use peace_cmd_rt::{async_trait, CmdBlock};
9use peace_item_model::ItemId;
10use peace_params::ParamsSpecs;
11use peace_resource_rt::{
12    internal::StatesMut,
13    resources::ts::SetUp,
14    states::{
15        ts::{Clean, Cleaned, CleanedDry, Ensured, EnsuredDry, Goal},
16        States, StatesCurrent, StatesPrevious,
17    },
18    ResourceFetchError, Resources,
19};
20use peace_rt_model::{
21    outcomes::{ItemApplyBoxed, ItemApplyPartialBoxed},
22    ItemBoxed, ItemRt,
23};
24use tokio::sync::mpsc::{self, Receiver};
25
26use peace_rt_model_core::IndexMap;
27use tokio::sync::mpsc::Sender;
28
29use crate::BUFFERED_FUTURES_MAX;
30
31cfg_if::cfg_if! {
32    if #[cfg(feature = "output_progress")] {
33        use std::error::Error;
34
35        use peace_progress_model::{
36            CmdBlockItemInteractionType,
37            CmdProgressUpdate,
38            ProgressComplete,
39            ProgressMsgUpdate,
40            ProgressUpdate,
41            ProgressUpdateAndId,
42            ProgressSender,
43        };
44    }
45}
46
/// Applies each item in the flow towards its target state.
///
/// The `StatesTs` type parameter selects, through `StatesTsApplyExt`, whether
/// the target is the goal state (`Ensured` / `EnsuredDry`) or the clean state
/// (`Cleaned` / `CleanedDry`), and whether the apply is a dry run.
///
/// Both type parameters are only used at the type level, hence the
/// `PhantomData` field.
pub struct ApplyExecCmdBlock<CmdCtxTypesT, StatesTs>(PhantomData<(CmdCtxTypesT, StatesTs)>);
50
51impl<CmdCtxTypesT, StatesTs> Debug for ApplyExecCmdBlock<CmdCtxTypesT, StatesTs> {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        f.debug_tuple("ApplyExecCmdBlock").field(&self.0).finish()
54    }
55}
56
57impl<CmdCtxTypesT, StatesTs> ApplyExecCmdBlock<CmdCtxTypesT, StatesTs> {
58    /// Returns an `ApplyExecCmdBlock`.
59    ///
60    /// This is a generic constructor where `StatesTs` determines whether the
61    /// goal state or clean state is the target state.
62    pub fn new() -> Self {
63        Self(PhantomData)
64    }
65}
66
67impl<CmdCtxTypesT, StatesTs> Default for ApplyExecCmdBlock<CmdCtxTypesT, StatesTs> {
68    fn default() -> Self {
69        Self(PhantomData)
70    }
71}
72
73impl<CmdCtxTypesT> ApplyExecCmdBlock<CmdCtxTypesT, Ensured>
74where
75    CmdCtxTypesT: CmdCtxTypes,
76{
77    /// Returns an `ApplyExecCmdBlock` with the goal state as the target state.
78    pub fn ensure() -> Self {
79        Self(PhantomData)
80    }
81}
82
83impl<CmdCtxTypesT> ApplyExecCmdBlock<CmdCtxTypesT, EnsuredDry>
84where
85    CmdCtxTypesT: CmdCtxTypes,
86{
87    /// Returns an `ApplyExecCmdBlock` with the goal state as the target state.
88    pub fn ensure_dry() -> Self {
89        Self(PhantomData)
90    }
91}
92
93impl<CmdCtxTypesT> ApplyExecCmdBlock<CmdCtxTypesT, Cleaned>
94where
95    CmdCtxTypesT: CmdCtxTypes,
96{
97    /// Returns an `ApplyExecCmdBlock` with the clean state as the target state.
98    pub fn clean() -> Self {
99        Self(PhantomData)
100    }
101}
102
103impl<CmdCtxTypesT> ApplyExecCmdBlock<CmdCtxTypesT, CleanedDry>
104where
105    CmdCtxTypesT: CmdCtxTypes,
106{
107    /// Returns an `ApplyExecCmdBlock` with the clean state as the target state.
108    pub fn clean_dry() -> Self {
109        Self(PhantomData)
110    }
111}
112
113impl<CmdCtxTypesT, StatesTs> ApplyExecCmdBlock<CmdCtxTypesT, StatesTs>
114where
115    CmdCtxTypesT: CmdCtxTypes,
116    StatesTs: StatesTsApplyExt + Debug + Send,
117{
    /// Prepares and executes the apply logic for a single item, then sends
    /// the outcome through `outcomes_tx`.
    ///
    /// The item apply is prepared through `ensure_prepare` or `clean_prepare`
    /// depending on `apply_for_internal`, and `StatesTs::dry_run()` selects
    /// between `apply_exec_dry` and `apply_exec`.
    ///
    /// Returns `Ok(())` when the apply succeeded or was not required, and
    /// `Err(())` when either the preparation or the execution failed. The
    /// error itself is not returned; it is sent as part of the
    /// `ItemApplyOutcome`.
    ///
    /// # Panics
    ///
    /// Panics if the outcome cannot be sent through `outcomes_tx`.
    ///
    /// # Implementation Note
    ///
    /// Tried passing through the function to execute instead of a `dry_run`
    /// parameter, but couldn't convince the compiler that the lifetimes match
    /// up:
    ///
    /// ```rust,ignore
    /// async fn item_apply_exec<F, Fut>(
    ///     resources: &Resources<SetUp>,
    ///     outcomes_tx: &Sender<ItemApplyOutcome<<CmdCtxTypesT as CmdCtxTypes>::AppError>>,
    ///     item: FnRef<'_, ItemBoxed<<CmdCtxTypesT as CmdCtxTypes>::AppError>>,
    ///     f: F,
    /// ) -> bool
    /// where
    ///     F: (Fn(&dyn ItemRt<<CmdCtxTypesT as CmdCtxTypes>::AppError>, fn_ctx: OpCtx<'_>, &Resources<SetUp>, &mut ItemApplyBoxed) -> Fut) + Copy,
    ///     Fut: Future<Output = Result<(), <CmdCtxTypesT as CmdCtxTypes>::AppError>>,
    /// ```
    async fn item_apply_exec(
        item_apply_exec_ctx: ItemApplyExecCtx<'_, <CmdCtxTypesT as CmdCtxTypes>::AppError>,
        item: &ItemBoxed<<CmdCtxTypesT as CmdCtxTypes>::AppError>,
    ) -> Result<(), ()> {
        let ItemApplyExecCtx {
            params_specs,
            resources,
            apply_for_internal,
            #[cfg(feature = "output_progress")]
            progress_tx,
            outcomes_tx,
        } = item_apply_exec_ctx;

        let item_id = item.id();

        // Indicate this item is running, so that an `Interrupt` message from
        // `CmdExecution` does not cause it to be rendered as `Interrupted`.
        #[cfg(feature = "output_progress")]
        let _progress_send_unused = progress_tx.try_send(
            ProgressUpdateAndId {
                item_id: item_id.clone(),
                progress_update: ProgressUpdate::Queued,
                msg_update: ProgressMsgUpdate::NoChange,
            }
            .into(),
        );

        // A dry run uses the dry variant of the item's apply function.
        let apply_fn = if StatesTs::dry_run() {
            ItemRt::apply_exec_dry
        } else {
            ItemRt::apply_exec
        };

        // NOTE(review): `fn_ctx` is passed by value to both `ensure_prepare`
        // and `apply_fn` below, so `FnCtx` is presumably `Copy` -- confirm.
        let fn_ctx = FnCtx::new(
            item_id,
            #[cfg(feature = "output_progress")]
            ProgressSender::new(item_id, progress_tx),
        );
        let item_apply = match apply_for_internal {
            ApplyForInternal::Ensure => {
                ItemRt::ensure_prepare(&**item, params_specs, resources, fn_ctx).await
            }
            ApplyForInternal::Clean { states_current } => {
                ItemRt::clean_prepare(&**item, states_current, params_specs, resources).await
            }
        };

        match item_apply {
            Ok(mut item_apply) => {
                // The `ExecRequired` variant only carries a progress limit when
                // the `output_progress` feature is enabled, hence the two
                // feature-gated arms.
                match item_apply.apply_check() {
                    #[cfg(not(feature = "output_progress"))]
                    ApplyCheck::ExecRequired => {}
                    #[cfg(feature = "output_progress")]
                    ApplyCheck::ExecRequired { progress_limit } => {
                        // Update `OutputWrite`s with progress limit.
                        let _progress_send_unused = progress_tx.try_send(
                            ProgressUpdateAndId {
                                item_id: item_id.clone(),
                                progress_update: ProgressUpdate::Limit(progress_limit),
                                msg_update: ProgressMsgUpdate::Set(String::from("in progress")),
                            }
                            .into(),
                        );
                    }
                    ApplyCheck::ExecNotRequired => {
                        #[cfg(feature = "output_progress")]
                        let _progress_send_unused = progress_tx.try_send(
                            ProgressUpdateAndId {
                                item_id: item_id.clone(),
                                progress_update: ProgressUpdate::Complete(
                                    ProgressComplete::Success,
                                ),
                                msg_update: ProgressMsgUpdate::Set(String::from("nothing to do!")),
                            }
                            .into(),
                        );

                        // TODO: write test for this case
                        // In case of an interrupt or power failure, we may not have written states
                        // to disk.
                        outcomes_tx
                            .send(ItemApplyOutcome::Success {
                                item_id: item.id().clone(),
                                item_apply,
                            })
                            .await
                            .expect("unreachable: `outcomes_rx` is in a sibling task.");

                        // short-circuit
                        return Ok(());
                    }
                }
                // Execution is required: run the selected apply function.
                match apply_fn(&**item, params_specs, resources, fn_ctx, &mut item_apply).await {
                    Ok(()) => {
                        // apply succeeded

                        #[cfg(feature = "output_progress")]
                        let _progress_send_unused = progress_tx.try_send(
                            ProgressUpdateAndId {
                                item_id: item_id.clone(),
                                progress_update: ProgressUpdate::Complete(
                                    ProgressComplete::Success,
                                ),
                                msg_update: ProgressMsgUpdate::Set(String::from("done!")),
                            }
                            .into(),
                        );

                        outcomes_tx
                            .send(ItemApplyOutcome::Success {
                                item_id: item.id().clone(),
                                item_apply,
                            })
                            .await
                            .expect("unreachable: `outcomes_rx` is in a sibling task.");

                        Ok(())
                    }
                    Err(error) => {
                        // apply failed

                        // The progress message shows the error's source when
                        // there is one, and the error itself otherwise.
                        #[cfg(feature = "output_progress")]
                        let _progress_send_unused = progress_tx.try_send(
                            ProgressUpdateAndId {
                                item_id: item_id.clone(),
                                progress_update: ProgressUpdate::Complete(ProgressComplete::Fail),
                                msg_update: ProgressMsgUpdate::Set(
                                    error
                                        .source()
                                        .map(|source| format!("{source}"))
                                        .unwrap_or_else(|| format!("{error}")),
                                ),
                            }
                            .into(),
                        );

                        outcomes_tx
                            .send(ItemApplyOutcome::Fail {
                                item_id: item.id().clone(),
                                item_apply,
                                error,
                            })
                            .await
                            .expect("unreachable: `outcomes_rx` is in a sibling task.");

                        // we should stop processing.
                        Err(())
                    }
                }
            }
            Err((error, item_apply_partial)) => {
                // Preparation failed: report the failure together with the
                // partially prepared item apply.
                #[cfg(feature = "output_progress")]
                let _progress_send_unused = progress_tx.try_send(
                    ProgressUpdateAndId {
                        item_id: item.id().clone(),
                        progress_update: ProgressUpdate::Complete(ProgressComplete::Fail),
                        msg_update: ProgressMsgUpdate::Set(
                            error
                                .source()
                                .map(|source| format!("{source}"))
                                .unwrap_or_else(|| format!("{error}")),
                        ),
                    }
                    .into(),
                );

                outcomes_tx
                    .send(ItemApplyOutcome::PrepareFail {
                        item_id: item.id().clone(),
                        item_apply_partial,
                        error,
                    })
                    .await
                    .expect("unreachable: `outcomes_rx` is in a sibling task.");

                Err(())
            }
        }
    }
315
316    async fn outcome_collate_task(
317        mut outcomes_rx: Receiver<ItemApplyOutcome<<CmdCtxTypesT as CmdCtxTypes>::AppError>>,
318        mut states_applied_mut: StatesMut<StatesTs>,
319        mut states_target_mut: StatesMut<StatesTs::TsTarget>,
320    ) -> Result<
321        (
322            States<StatesTs>,
323            States<StatesTs::TsTarget>,
324            IndexMap<ItemId, <CmdCtxTypesT as CmdCtxTypes>::AppError>,
325        ),
326        <CmdCtxTypesT as CmdCtxTypes>::AppError,
327    > {
328        let mut errors = IndexMap::new();
329        while let Some(item_outcome) = outcomes_rx.recv().await {
330            Self::outcome_collate(
331                &mut states_applied_mut,
332                &mut states_target_mut,
333                &mut errors,
334                item_outcome,
335            )?;
336        }
337
338        let states_applied = States::<StatesTs>::from(states_applied_mut);
339        let states_target = States::<StatesTs::TsTarget>::from(states_target_mut);
340
341        Ok((states_applied, states_target, errors))
342    }
343
344    fn outcome_collate(
345        states_applied_mut: &mut StatesMut<StatesTs>,
346        states_target_mut: &mut StatesMut<StatesTs::TsTarget>,
347        errors: &mut IndexMap<ItemId, <CmdCtxTypesT as CmdCtxTypes>::AppError>,
348        outcome_partial: ItemApplyOutcome<<CmdCtxTypesT as CmdCtxTypes>::AppError>,
349    ) -> Result<(), <CmdCtxTypesT as CmdCtxTypes>::AppError> {
350        let apply_for = StatesTs::apply_for();
351
352        match outcome_partial {
353            ItemApplyOutcome::PrepareFail {
354                item_id,
355                item_apply_partial,
356                error,
357            } => {
358                errors.insert(item_id.clone(), error);
359
360                // Save `state_target` (which is `state_goal`) if we are not cleaning
361                // up.
362                match apply_for {
363                    ApplyFor::Ensure => {
364                        if let Some(state_target) = item_apply_partial.state_target() {
365                            states_target_mut.insert_raw(item_id, state_target);
366                        }
367                    }
368                    ApplyFor::Clean => {}
369                }
370            }
371            ItemApplyOutcome::Success {
372                item_id,
373                item_apply,
374            } => {
375                if let Some(state_applied) = item_apply.state_applied() {
376                    states_applied_mut.insert_raw(item_id.clone(), state_applied);
377                } else {
378                    // Item was already in the goal state.
379                    // No change to current state.
380                }
381
382                // Save `state_target` (which is state_target) if we are not cleaning
383                // up.
384                match apply_for {
385                    ApplyFor::Ensure => {
386                        let state_target = item_apply.state_target();
387                        states_target_mut.insert_raw(item_id, state_target);
388                    }
389                    ApplyFor::Clean => {}
390                }
391            }
392            ItemApplyOutcome::Fail {
393                item_id,
394                item_apply,
395                error,
396            } => {
397                errors.insert(item_id.clone(), error);
398                if let Some(state_applied) = item_apply.state_applied() {
399                    states_applied_mut.insert_raw(item_id.clone(), state_applied);
400                }
401
402                // Save `state_target` (which is `state_goal`) if we are not cleaning
403                // up.
404                match apply_for {
405                    ApplyFor::Ensure => {
406                        let state_target = item_apply.state_target();
407                        states_target_mut.insert_raw(item_id, state_target);
408                    }
409                    ApplyFor::Clean => {}
410                }
411            }
412        }
413
414        Ok(())
415    }
416}
417
418#[async_trait(?Send)]
419impl<CmdCtxTypesT, StatesTs> CmdBlock for ApplyExecCmdBlock<CmdCtxTypesT, StatesTs>
420where
421    CmdCtxTypesT: CmdCtxTypes,
422    StatesTs: StatesTsApplyExt + Debug + Send + Sync + 'static,
423{
424    type CmdCtxTypes = CmdCtxTypesT;
425    type InputT = (StatesCurrent, States<StatesTs::TsTarget>);
426    type Outcome = (StatesPrevious, States<StatesTs>, States<StatesTs::TsTarget>);
427
428    #[cfg(feature = "output_progress")]
429    fn cmd_block_item_interaction_type(&self) -> CmdBlockItemInteractionType {
430        CmdBlockItemInteractionType::Write
431    }
432
433    fn input_fetch(
434        &self,
435        resources: &mut Resources<SetUp>,
436    ) -> Result<Self::InputT, ResourceFetchError> {
437        let states_current = resources.try_remove::<StatesCurrent>()?;
438
439        let states_target = resources.try_remove::<States<StatesTs::TsTarget>>()?;
440
441        Ok((states_current, states_target))
442    }
443
444    fn input_type_names(&self) -> Vec<String> {
445        vec![
446            tynm::type_name::<StatesCurrent>(),
447            tynm::type_name::<States<StatesTs::TsTarget>>(),
448        ]
449    }
450
451    fn outcome_insert(&self, resources: &mut Resources<SetUp>, outcome: Self::Outcome) {
452        let (states_previous, states_applied, states_target) = outcome;
453        resources.insert(states_previous);
454        resources.insert(states_applied);
455        resources.insert(states_target);
456    }
457
458    fn outcome_type_names(&self) -> Vec<String> {
459        vec![
460            tynm::type_name::<StatesPrevious>(),
461            tynm::type_name::<States<StatesTs>>(),
462            tynm::type_name::<States<StatesTs::TsTarget>>(),
463        ]
464    }
465
466    async fn exec(
467        &self,
468        input: Self::InputT,
469        cmd_ctx_spsf_fields: &mut CmdCtxSpsfFields<'_, Self::CmdCtxTypes>,
470        #[cfg(feature = "output_progress")] progress_tx: &Sender<CmdProgressUpdate>,
471    ) -> Result<
472        CmdBlockOutcome<Self::Outcome, <Self::CmdCtxTypes as CmdCtxTypes>::AppError>,
473        <Self::CmdCtxTypes as CmdCtxTypes>::AppError,
474    > {
475        let (states_current, states_target) = input;
476        let (states_previous, states_applied_mut, states_target_mut) = {
477            let states_previous = StatesPrevious::from(states_current.clone());
478            // `Ensured`, `EnsuredDry`, `Cleaned`, `CleanedDry` states start as the current
479            // state, and are altered.
480            let states_applied_mut =
481                StatesMut::<StatesTs>::from(states_current.clone().into_inner());
482            let states_target_mut =
483                StatesMut::<StatesTs::TsTarget>::from(states_target.clone().into_inner());
484
485            (states_previous, states_applied_mut, states_target_mut)
486        };
487
488        let CmdCtxSpsfFields {
489            interruptibility_state,
490            flow,
491            params_specs,
492            resources,
493            ..
494        } = cmd_ctx_spsf_fields;
495
496        let item_graph = flow.graph();
497        let resources_ref = &*resources;
498        let apply_for = StatesTs::apply_for();
499        let apply_for_internal = match apply_for {
500            ApplyFor::Ensure => ApplyForInternal::Ensure,
501            ApplyFor::Clean => ApplyForInternal::Clean { states_current },
502        };
503
504        let (outcomes_tx, outcomes_rx) = mpsc::channel::<
505            ItemApplyOutcome<<CmdCtxTypesT as CmdCtxTypes>::AppError>,
506        >(item_graph.node_count());
507
508        let stream_opts = {
509            let stream_opts = StreamOpts::new()
510                .interruptibility_state(interruptibility_state.reborrow())
511                .interrupted_next_item_include(false);
512            match apply_for {
513                ApplyFor::Ensure => stream_opts,
514                ApplyFor::Clean => stream_opts.rev(),
515            }
516        };
517
518        let (stream_outcome_result, outcome_collate) = {
519            let item_apply_exec_task = async move {
520                let stream_outcome = item_graph
521                    .try_for_each_concurrent_with(BUFFERED_FUTURES_MAX, stream_opts, |item| {
522                        let item_apply_exec_ctx = ItemApplyExecCtx {
523                            params_specs,
524                            resources: resources_ref,
525                            apply_for_internal: &apply_for_internal,
526                            #[cfg(feature = "output_progress")]
527                            progress_tx,
528                            outcomes_tx: &outcomes_tx,
529                        };
530                        Self::item_apply_exec(item_apply_exec_ctx, item)
531                    })
532                    .await;
533
534                drop(outcomes_tx);
535
536                stream_outcome
537            };
538            let outcome_collate_task =
539                Self::outcome_collate_task(outcomes_rx, states_applied_mut, states_target_mut);
540
541            join!(item_apply_exec_task, outcome_collate_task)
542        };
543        let (states_applied, states_target, errors) = outcome_collate?;
544
545        let stream_outcome = {
546            let (Ok(stream_outcome) | Err((stream_outcome, ()))) = stream_outcome_result.map_err(
547                |(stream_outcome, _vec_unit): (StreamOutcome<()>, Vec<()>)| (stream_outcome, ()),
548            );
549
550            stream_outcome.map(|()| (states_previous, states_applied, states_target))
551        };
552
553        Ok(CmdBlockOutcome::ItemWise {
554            stream_outcome,
555            errors,
556        })
557    }
558}
559
/// Whether the `ApplyCmd` is for `Ensure` or `Clean`.
///
/// Each `StatesTs` maps to one of these variants through
/// `StatesTsApplyExt::apply_for`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ApplyFor {
    /// The apply target state is `state_goal`.
    Ensure,
    /// The apply target state is `state_clean`.
    Clean,
}
568
/// Internal counterpart of `ApplyFor`, which additionally holds the data
/// needed to prepare an item apply.
#[derive(Debug)]
enum ApplyForInternal {
    /// The apply is for `Ensure`.
    Ensure,
    /// The apply is for `Clean`.
    Clean {
        /// Current states, passed to each item's `clean_prepare`.
        states_current: StatesCurrent,
    },
}
575
/// Borrowed values needed to apply a single item.
struct ItemApplyExecCtx<'f, E> {
    /// Map of item ID to its params' specs.
    params_specs: &'f ParamsSpecs,
    /// Map of all types at runtime.
    resources: &'f Resources<SetUp>,
    /// Whether the `ApplyCmd` is for `Ensure` or `Clean`.
    apply_for_internal: &'f ApplyForInternal,
    /// Channel sender for progress updates.
    #[cfg(feature = "output_progress")]
    progress_tx: &'f Sender<CmdProgressUpdate>,
    /// Channel sender for `CmdBlock` item outcomes.
    outcomes_tx: &'f Sender<ItemApplyOutcome<E>>,
}
588
/// Outcome of applying a single item, sent from the task that applies the
/// item to the task that collates all outcomes.
#[derive(Debug)]
pub enum ItemApplyOutcome<E> {
    /// Error occurred when discovering current state, goal states, state
    /// diff, or `ApplyCheck`.
    PrepareFail {
        /// ID of the item whose preparation failed.
        item_id: ItemId,
        /// Information gathered before the failure occurred.
        item_apply_partial: ItemApplyPartialBoxed,
        /// The error returned by the preparation.
        error: E,
    },
    /// Apply execution succeeded, or was not required.
    Success {
        /// ID of the item that was applied.
        item_id: ItemId,
        /// Information about the item apply.
        item_apply: ItemApplyBoxed,
    },
    /// Apply execution failed.
    Fail {
        /// ID of the item whose apply execution failed.
        item_id: ItemId,
        /// Information about the item apply.
        item_apply: ItemApplyBoxed,
        /// The error returned by the apply execution.
        error: E,
    },
}
610
/// Infers the target state, ensure or clean, and dry run, from a `StatesTs`.
pub trait StatesTsApplyExt {
    /// Type state of the target `States` that this `StatesTs` is applied
    /// towards.
    type TsTarget: Debug + Send + Sync + Unpin + 'static;

    /// Returns the `ApplyFor` this `StatesTs` is meant for.
    fn apply_for() -> ApplyFor;
    /// Returns whether this `StatesTs` is for a dry run.
    fn dry_run() -> bool;
}
620
/// `Ensured` states result from a real (non-dry) ensure towards `Goal`.
impl StatesTsApplyExt for Ensured {
    type TsTarget = Goal;

    fn apply_for() -> ApplyFor {
        ApplyFor::Ensure
    }

    fn dry_run() -> bool {
        false
    }
}
632
/// `EnsuredDry` states result from a dry-run ensure towards `Goal`.
impl StatesTsApplyExt for EnsuredDry {
    type TsTarget = Goal;

    fn apply_for() -> ApplyFor {
        ApplyFor::Ensure
    }

    fn dry_run() -> bool {
        true
    }
}
644
/// `Cleaned` states result from a real (non-dry) clean towards `Clean`.
impl StatesTsApplyExt for Cleaned {
    type TsTarget = Clean;

    fn apply_for() -> ApplyFor {
        ApplyFor::Clean
    }

    fn dry_run() -> bool {
        false
    }
}
656
/// `CleanedDry` states result from a dry-run clean towards `Clean`.
impl StatesTsApplyExt for CleanedDry {
    type TsTarget = Clean;

    fn apply_for() -> ApplyFor {
        ApplyFor::Clean
    }

    fn dry_run() -> bool {
        true
    }
}