1use std::{fmt::Debug, marker::PhantomData};
2
3use fn_graph::{StreamOpts, StreamOutcome};
4use futures::join;
5use peace_cfg::{ApplyCheck, FnCtx};
6use peace_cmd_ctx::{CmdCtxSpsfFields, CmdCtxTypes};
7use peace_cmd_model::CmdBlockOutcome;
8use peace_cmd_rt::{async_trait, CmdBlock};
9use peace_item_model::ItemId;
10use peace_params::ParamsSpecs;
11use peace_resource_rt::{
12 internal::StatesMut,
13 resources::ts::SetUp,
14 states::{
15 ts::{Clean, Cleaned, CleanedDry, Ensured, EnsuredDry, Goal},
16 States, StatesCurrent, StatesPrevious,
17 },
18 ResourceFetchError, Resources,
19};
20use peace_rt_model::{
21 outcomes::{ItemApplyBoxed, ItemApplyPartialBoxed},
22 ItemBoxed, ItemRt,
23};
24use tokio::sync::mpsc::{self, Receiver};
25
26use peace_rt_model_core::IndexMap;
27use tokio::sync::mpsc::Sender;
28
29use crate::BUFFERED_FUTURES_MAX;
30
31cfg_if::cfg_if! {
32 if #[cfg(feature = "output_progress")] {
33 use std::error::Error;
34
35 use peace_progress_model::{
36 CmdBlockItemInteractionType,
37 CmdProgressUpdate,
38 ProgressComplete,
39 ProgressMsgUpdate,
40 ProgressUpdate,
41 ProgressUpdateAndId,
42 ProgressSender,
43 };
44 }
45}
46
/// Command block that applies items, in either the ensure or the clean direction.
///
/// The type carries no data; both type parameters exist only at the type level:
///
/// * `CmdCtxTypesT`: the command context types, which supply the `AppError` type.
/// * `StatesTs`: the states type-state marker (`Ensured`, `EnsuredDry`, `Cleaned` or
///   `CleanedDry`), which selects the apply direction and whether it is a dry run.
pub struct ApplyExecCmdBlock<CmdCtxTypesT, StatesTs>(
    /// Zero-sized marker tying the two type parameters to the struct.
    PhantomData<(CmdCtxTypesT, StatesTs)>,
);
50
51impl<CmdCtxTypesT, StatesTs> Debug for ApplyExecCmdBlock<CmdCtxTypesT, StatesTs> {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 f.debug_tuple("ApplyExecCmdBlock").field(&self.0).finish()
54 }
55}
56
57impl<CmdCtxTypesT, StatesTs> ApplyExecCmdBlock<CmdCtxTypesT, StatesTs> {
58 pub fn new() -> Self {
63 Self(PhantomData)
64 }
65}
66
67impl<CmdCtxTypesT, StatesTs> Default for ApplyExecCmdBlock<CmdCtxTypesT, StatesTs> {
68 fn default() -> Self {
69 Self(PhantomData)
70 }
71}
72
73impl<CmdCtxTypesT> ApplyExecCmdBlock<CmdCtxTypesT, Ensured>
74where
75 CmdCtxTypesT: CmdCtxTypes,
76{
77 pub fn ensure() -> Self {
79 Self(PhantomData)
80 }
81}
82
83impl<CmdCtxTypesT> ApplyExecCmdBlock<CmdCtxTypesT, EnsuredDry>
84where
85 CmdCtxTypesT: CmdCtxTypes,
86{
87 pub fn ensure_dry() -> Self {
89 Self(PhantomData)
90 }
91}
92
93impl<CmdCtxTypesT> ApplyExecCmdBlock<CmdCtxTypesT, Cleaned>
94where
95 CmdCtxTypesT: CmdCtxTypes,
96{
97 pub fn clean() -> Self {
99 Self(PhantomData)
100 }
101}
102
103impl<CmdCtxTypesT> ApplyExecCmdBlock<CmdCtxTypesT, CleanedDry>
104where
105 CmdCtxTypesT: CmdCtxTypes,
106{
107 pub fn clean_dry() -> Self {
109 Self(PhantomData)
110 }
111}
112
113impl<CmdCtxTypesT, StatesTs> ApplyExecCmdBlock<CmdCtxTypesT, StatesTs>
114where
115 CmdCtxTypesT: CmdCtxTypes,
116 StatesTs: StatesTsApplyExt + Debug + Send,
117{
118 async fn item_apply_exec(
137 item_apply_exec_ctx: ItemApplyExecCtx<'_, <CmdCtxTypesT as CmdCtxTypes>::AppError>,
138 item: &ItemBoxed<<CmdCtxTypesT as CmdCtxTypes>::AppError>,
139 ) -> Result<(), ()> {
140 let ItemApplyExecCtx {
141 params_specs,
142 resources,
143 apply_for_internal,
144 #[cfg(feature = "output_progress")]
145 progress_tx,
146 outcomes_tx,
147 } = item_apply_exec_ctx;
148
149 let item_id = item.id();
150
151 #[cfg(feature = "output_progress")]
154 let _progress_send_unused = progress_tx.try_send(
155 ProgressUpdateAndId {
156 item_id: item_id.clone(),
157 progress_update: ProgressUpdate::Queued,
158 msg_update: ProgressMsgUpdate::NoChange,
159 }
160 .into(),
161 );
162
163 let apply_fn = if StatesTs::dry_run() {
164 ItemRt::apply_exec_dry
165 } else {
166 ItemRt::apply_exec
167 };
168
169 let fn_ctx = FnCtx::new(
170 item_id,
171 #[cfg(feature = "output_progress")]
172 ProgressSender::new(item_id, progress_tx),
173 );
174 let item_apply = match apply_for_internal {
175 ApplyForInternal::Ensure => {
176 ItemRt::ensure_prepare(&**item, params_specs, resources, fn_ctx).await
177 }
178 ApplyForInternal::Clean { states_current } => {
179 ItemRt::clean_prepare(&**item, states_current, params_specs, resources).await
180 }
181 };
182
183 match item_apply {
184 Ok(mut item_apply) => {
185 match item_apply.apply_check() {
186 #[cfg(not(feature = "output_progress"))]
187 ApplyCheck::ExecRequired => {}
188 #[cfg(feature = "output_progress")]
189 ApplyCheck::ExecRequired { progress_limit } => {
190 let _progress_send_unused = progress_tx.try_send(
192 ProgressUpdateAndId {
193 item_id: item_id.clone(),
194 progress_update: ProgressUpdate::Limit(progress_limit),
195 msg_update: ProgressMsgUpdate::Set(String::from("in progress")),
196 }
197 .into(),
198 );
199 }
200 ApplyCheck::ExecNotRequired => {
201 #[cfg(feature = "output_progress")]
202 let _progress_send_unused = progress_tx.try_send(
203 ProgressUpdateAndId {
204 item_id: item_id.clone(),
205 progress_update: ProgressUpdate::Complete(
206 ProgressComplete::Success,
207 ),
208 msg_update: ProgressMsgUpdate::Set(String::from("nothing to do!")),
209 }
210 .into(),
211 );
212
213 outcomes_tx
217 .send(ItemApplyOutcome::Success {
218 item_id: item.id().clone(),
219 item_apply,
220 })
221 .await
222 .expect("unreachable: `outcomes_rx` is in a sibling task.");
223
224 return Ok(());
226 }
227 }
228 match apply_fn(&**item, params_specs, resources, fn_ctx, &mut item_apply).await {
229 Ok(()) => {
230 #[cfg(feature = "output_progress")]
233 let _progress_send_unused = progress_tx.try_send(
234 ProgressUpdateAndId {
235 item_id: item_id.clone(),
236 progress_update: ProgressUpdate::Complete(
237 ProgressComplete::Success,
238 ),
239 msg_update: ProgressMsgUpdate::Set(String::from("done!")),
240 }
241 .into(),
242 );
243
244 outcomes_tx
245 .send(ItemApplyOutcome::Success {
246 item_id: item.id().clone(),
247 item_apply,
248 })
249 .await
250 .expect("unreachable: `outcomes_rx` is in a sibling task.");
251
252 Ok(())
253 }
254 Err(error) => {
255 #[cfg(feature = "output_progress")]
258 let _progress_send_unused = progress_tx.try_send(
259 ProgressUpdateAndId {
260 item_id: item_id.clone(),
261 progress_update: ProgressUpdate::Complete(ProgressComplete::Fail),
262 msg_update: ProgressMsgUpdate::Set(
263 error
264 .source()
265 .map(|source| format!("{source}"))
266 .unwrap_or_else(|| format!("{error}")),
267 ),
268 }
269 .into(),
270 );
271
272 outcomes_tx
273 .send(ItemApplyOutcome::Fail {
274 item_id: item.id().clone(),
275 item_apply,
276 error,
277 })
278 .await
279 .expect("unreachable: `outcomes_rx` is in a sibling task.");
280
281 Err(())
283 }
284 }
285 }
286 Err((error, item_apply_partial)) => {
287 #[cfg(feature = "output_progress")]
288 let _progress_send_unused = progress_tx.try_send(
289 ProgressUpdateAndId {
290 item_id: item.id().clone(),
291 progress_update: ProgressUpdate::Complete(ProgressComplete::Fail),
292 msg_update: ProgressMsgUpdate::Set(
293 error
294 .source()
295 .map(|source| format!("{source}"))
296 .unwrap_or_else(|| format!("{error}")),
297 ),
298 }
299 .into(),
300 );
301
302 outcomes_tx
303 .send(ItemApplyOutcome::PrepareFail {
304 item_id: item.id().clone(),
305 item_apply_partial,
306 error,
307 })
308 .await
309 .expect("unreachable: `outcomes_rx` is in a sibling task.");
310
311 Err(())
312 }
313 }
314 }
315
316 async fn outcome_collate_task(
317 mut outcomes_rx: Receiver<ItemApplyOutcome<<CmdCtxTypesT as CmdCtxTypes>::AppError>>,
318 mut states_applied_mut: StatesMut<StatesTs>,
319 mut states_target_mut: StatesMut<StatesTs::TsTarget>,
320 ) -> Result<
321 (
322 States<StatesTs>,
323 States<StatesTs::TsTarget>,
324 IndexMap<ItemId, <CmdCtxTypesT as CmdCtxTypes>::AppError>,
325 ),
326 <CmdCtxTypesT as CmdCtxTypes>::AppError,
327 > {
328 let mut errors = IndexMap::new();
329 while let Some(item_outcome) = outcomes_rx.recv().await {
330 Self::outcome_collate(
331 &mut states_applied_mut,
332 &mut states_target_mut,
333 &mut errors,
334 item_outcome,
335 )?;
336 }
337
338 let states_applied = States::<StatesTs>::from(states_applied_mut);
339 let states_target = States::<StatesTs::TsTarget>::from(states_target_mut);
340
341 Ok((states_applied, states_target, errors))
342 }
343
344 fn outcome_collate(
345 states_applied_mut: &mut StatesMut<StatesTs>,
346 states_target_mut: &mut StatesMut<StatesTs::TsTarget>,
347 errors: &mut IndexMap<ItemId, <CmdCtxTypesT as CmdCtxTypes>::AppError>,
348 outcome_partial: ItemApplyOutcome<<CmdCtxTypesT as CmdCtxTypes>::AppError>,
349 ) -> Result<(), <CmdCtxTypesT as CmdCtxTypes>::AppError> {
350 let apply_for = StatesTs::apply_for();
351
352 match outcome_partial {
353 ItemApplyOutcome::PrepareFail {
354 item_id,
355 item_apply_partial,
356 error,
357 } => {
358 errors.insert(item_id.clone(), error);
359
360 match apply_for {
363 ApplyFor::Ensure => {
364 if let Some(state_target) = item_apply_partial.state_target() {
365 states_target_mut.insert_raw(item_id, state_target);
366 }
367 }
368 ApplyFor::Clean => {}
369 }
370 }
371 ItemApplyOutcome::Success {
372 item_id,
373 item_apply,
374 } => {
375 if let Some(state_applied) = item_apply.state_applied() {
376 states_applied_mut.insert_raw(item_id.clone(), state_applied);
377 } else {
378 }
381
382 match apply_for {
385 ApplyFor::Ensure => {
386 let state_target = item_apply.state_target();
387 states_target_mut.insert_raw(item_id, state_target);
388 }
389 ApplyFor::Clean => {}
390 }
391 }
392 ItemApplyOutcome::Fail {
393 item_id,
394 item_apply,
395 error,
396 } => {
397 errors.insert(item_id.clone(), error);
398 if let Some(state_applied) = item_apply.state_applied() {
399 states_applied_mut.insert_raw(item_id.clone(), state_applied);
400 }
401
402 match apply_for {
405 ApplyFor::Ensure => {
406 let state_target = item_apply.state_target();
407 states_target_mut.insert_raw(item_id, state_target);
408 }
409 ApplyFor::Clean => {}
410 }
411 }
412 }
413
414 Ok(())
415 }
416}
417
418#[async_trait(?Send)]
419impl<CmdCtxTypesT, StatesTs> CmdBlock for ApplyExecCmdBlock<CmdCtxTypesT, StatesTs>
420where
421 CmdCtxTypesT: CmdCtxTypes,
422 StatesTs: StatesTsApplyExt + Debug + Send + Sync + 'static,
423{
424 type CmdCtxTypes = CmdCtxTypesT;
425 type InputT = (StatesCurrent, States<StatesTs::TsTarget>);
426 type Outcome = (StatesPrevious, States<StatesTs>, States<StatesTs::TsTarget>);
427
428 #[cfg(feature = "output_progress")]
429 fn cmd_block_item_interaction_type(&self) -> CmdBlockItemInteractionType {
430 CmdBlockItemInteractionType::Write
431 }
432
433 fn input_fetch(
434 &self,
435 resources: &mut Resources<SetUp>,
436 ) -> Result<Self::InputT, ResourceFetchError> {
437 let states_current = resources.try_remove::<StatesCurrent>()?;
438
439 let states_target = resources.try_remove::<States<StatesTs::TsTarget>>()?;
440
441 Ok((states_current, states_target))
442 }
443
444 fn input_type_names(&self) -> Vec<String> {
445 vec![
446 tynm::type_name::<StatesCurrent>(),
447 tynm::type_name::<States<StatesTs::TsTarget>>(),
448 ]
449 }
450
451 fn outcome_insert(&self, resources: &mut Resources<SetUp>, outcome: Self::Outcome) {
452 let (states_previous, states_applied, states_target) = outcome;
453 resources.insert(states_previous);
454 resources.insert(states_applied);
455 resources.insert(states_target);
456 }
457
458 fn outcome_type_names(&self) -> Vec<String> {
459 vec![
460 tynm::type_name::<StatesPrevious>(),
461 tynm::type_name::<States<StatesTs>>(),
462 tynm::type_name::<States<StatesTs::TsTarget>>(),
463 ]
464 }
465
466 async fn exec(
467 &self,
468 input: Self::InputT,
469 cmd_ctx_spsf_fields: &mut CmdCtxSpsfFields<'_, Self::CmdCtxTypes>,
470 #[cfg(feature = "output_progress")] progress_tx: &Sender<CmdProgressUpdate>,
471 ) -> Result<
472 CmdBlockOutcome<Self::Outcome, <Self::CmdCtxTypes as CmdCtxTypes>::AppError>,
473 <Self::CmdCtxTypes as CmdCtxTypes>::AppError,
474 > {
475 let (states_current, states_target) = input;
476 let (states_previous, states_applied_mut, states_target_mut) = {
477 let states_previous = StatesPrevious::from(states_current.clone());
478 let states_applied_mut =
481 StatesMut::<StatesTs>::from(states_current.clone().into_inner());
482 let states_target_mut =
483 StatesMut::<StatesTs::TsTarget>::from(states_target.clone().into_inner());
484
485 (states_previous, states_applied_mut, states_target_mut)
486 };
487
488 let CmdCtxSpsfFields {
489 interruptibility_state,
490 flow,
491 params_specs,
492 resources,
493 ..
494 } = cmd_ctx_spsf_fields;
495
496 let item_graph = flow.graph();
497 let resources_ref = &*resources;
498 let apply_for = StatesTs::apply_for();
499 let apply_for_internal = match apply_for {
500 ApplyFor::Ensure => ApplyForInternal::Ensure,
501 ApplyFor::Clean => ApplyForInternal::Clean { states_current },
502 };
503
504 let (outcomes_tx, outcomes_rx) = mpsc::channel::<
505 ItemApplyOutcome<<CmdCtxTypesT as CmdCtxTypes>::AppError>,
506 >(item_graph.node_count());
507
508 let stream_opts = {
509 let stream_opts = StreamOpts::new()
510 .interruptibility_state(interruptibility_state.reborrow())
511 .interrupted_next_item_include(false);
512 match apply_for {
513 ApplyFor::Ensure => stream_opts,
514 ApplyFor::Clean => stream_opts.rev(),
515 }
516 };
517
518 let (stream_outcome_result, outcome_collate) = {
519 let item_apply_exec_task = async move {
520 let stream_outcome = item_graph
521 .try_for_each_concurrent_with(BUFFERED_FUTURES_MAX, stream_opts, |item| {
522 let item_apply_exec_ctx = ItemApplyExecCtx {
523 params_specs,
524 resources: resources_ref,
525 apply_for_internal: &apply_for_internal,
526 #[cfg(feature = "output_progress")]
527 progress_tx,
528 outcomes_tx: &outcomes_tx,
529 };
530 Self::item_apply_exec(item_apply_exec_ctx, item)
531 })
532 .await;
533
534 drop(outcomes_tx);
535
536 stream_outcome
537 };
538 let outcome_collate_task =
539 Self::outcome_collate_task(outcomes_rx, states_applied_mut, states_target_mut);
540
541 join!(item_apply_exec_task, outcome_collate_task)
542 };
543 let (states_applied, states_target, errors) = outcome_collate?;
544
545 let stream_outcome = {
546 let (Ok(stream_outcome) | Err((stream_outcome, ()))) = stream_outcome_result.map_err(
547 |(stream_outcome, _vec_unit): (StreamOutcome<()>, Vec<()>)| (stream_outcome, ()),
548 );
549
550 stream_outcome.map(|()| (states_previous, states_applied, states_target))
551 };
552
553 Ok(CmdBlockOutcome::ItemWise {
554 stream_outcome,
555 errors,
556 })
557 }
558}
559
/// Direction in which items are applied.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyFor {
    /// Apply in the ensure direction; used by `Ensured` and `EnsuredDry`.
    Ensure,
    /// Apply in the clean direction; used by `Cleaned` and `CleanedDry`.
    Clean,
}
568
569#[derive(Debug)]
571enum ApplyForInternal {
572 Ensure,
573 Clean { states_current: StatesCurrent },
574}
575
576struct ItemApplyExecCtx<'f, E> {
577 params_specs: &'f ParamsSpecs,
579 resources: &'f Resources<SetUp>,
581 apply_for_internal: &'f ApplyForInternal,
583 #[cfg(feature = "output_progress")]
585 progress_tx: &'f Sender<CmdProgressUpdate>,
586 outcomes_tx: &'f Sender<ItemApplyOutcome<E>>,
587}
588
589#[derive(Debug)]
590pub enum ItemApplyOutcome<E> {
591 PrepareFail {
594 item_id: ItemId,
595 item_apply_partial: ItemApplyPartialBoxed,
596 error: E,
597 },
598 Success {
600 item_id: ItemId,
601 item_apply: ItemApplyBoxed,
602 },
603 Fail {
605 item_id: ItemId,
606 item_apply: ItemApplyBoxed,
607 error: E,
608 },
609}
610
611pub trait StatesTsApplyExt {
613 type TsTarget: Debug + Send + Sync + Unpin + 'static;
614
615 fn apply_for() -> ApplyFor;
617 fn dry_run() -> bool;
619}
620
621impl StatesTsApplyExt for Ensured {
622 type TsTarget = Goal;
623
624 fn apply_for() -> ApplyFor {
625 ApplyFor::Ensure
626 }
627
628 fn dry_run() -> bool {
629 false
630 }
631}
632
633impl StatesTsApplyExt for EnsuredDry {
634 type TsTarget = Goal;
635
636 fn apply_for() -> ApplyFor {
637 ApplyFor::Ensure
638 }
639
640 fn dry_run() -> bool {
641 true
642 }
643}
644
645impl StatesTsApplyExt for Cleaned {
646 type TsTarget = Clean;
647
648 fn apply_for() -> ApplyFor {
649 ApplyFor::Clean
650 }
651
652 fn dry_run() -> bool {
653 false
654 }
655}
656
657impl StatesTsApplyExt for CleanedDry {
658 type TsTarget = Clean;
659
660 fn apply_for() -> ApplyFor {
661 ApplyFor::Clean
662 }
663
664 fn dry_run() -> bool {
665 true
666 }
667}