txtx_core/
lib.rs

1#[macro_use]
2extern crate lazy_static;
3
4#[macro_use]
5pub extern crate txtx_addon_kit as kit;
6
7pub extern crate mustache;
8
9mod constants;
10pub mod errors;
11pub mod eval;
12pub mod manifest;
13// pub mod snapshot;
14pub mod runbook;
15pub mod std;
16pub mod templates;
17pub mod types;
18
19#[cfg(test)]
20mod tests;
21pub mod utils;
22
23use ::std::collections::BTreeMap;
24use ::std::future::Future;
25use ::std::pin::Pin;
26use ::std::thread::sleep;
27use ::std::time::Duration;
28
29use crate::runbook::flow_context::FlowContext;
30use constants::ACTION_ITEM_ENV;
31use constants::ACTION_ITEM_GENESIS;
32use constants::ACTION_ITEM_VALIDATE_BLOCK;
33use eval::run_constructs_evaluation;
34use eval::run_signers_evaluation;
35use kit::constants::ACTION_ITEM_CHECK_BALANCE;
36use runbook::get_source_context_for_diagnostic;
37use tokio::sync::broadcast::error::TryRecvError;
38use txtx_addon_kit::channel::Sender;
39use txtx_addon_kit::constants::ACTION_ITEM_CHECK_ADDRESS;
40use txtx_addon_kit::hcl::Span;
41use txtx_addon_kit::types::block_id::BlockId;
42use txtx_addon_kit::types::commands::CommandExecutionResult;
43use txtx_addon_kit::types::diagnostics::Diagnostic;
44use txtx_addon_kit::types::frontend::ActionItemRequest;
45use txtx_addon_kit::types::frontend::ActionItemRequestType;
46use txtx_addon_kit::types::frontend::ActionItemRequestUpdate;
47use txtx_addon_kit::types::frontend::ActionItemResponse;
48use txtx_addon_kit::types::frontend::ActionItemResponseType;
49use txtx_addon_kit::types::frontend::ActionItemStatus;
50use txtx_addon_kit::types::frontend::Actions;
51use txtx_addon_kit::types::frontend::Block;
52use txtx_addon_kit::types::frontend::BlockEvent;
53use txtx_addon_kit::types::frontend::ErrorPanelData;
54use txtx_addon_kit::types::frontend::InputOption;
55use txtx_addon_kit::types::frontend::NormalizedActionItemRequestUpdate;
56use txtx_addon_kit::types::frontend::Panel;
57use txtx_addon_kit::types::frontend::PickInputOptionRequest;
58use txtx_addon_kit::types::frontend::ProgressBarVisibilityUpdate;
59use txtx_addon_kit::types::frontend::ReviewedInputResponse;
60use txtx_addon_kit::types::frontend::ValidateBlockData;
61use txtx_addon_kit::types::types::RunbookSupervisionContext;
62use txtx_addon_kit::types::ConstructDid;
63use txtx_addon_kit::uuid::Uuid;
64use types::Runbook;
65
66lazy_static! {
67    // create this action so we can reference its `id` property, which is built from the immutable data
68     pub static ref SET_ENV_ACTION: ActionItemRequest = ActionItemRequest ::new(
69        &None,
70        "Select the environment to target",
71        None,
72        ActionItemStatus::Success(None),
73        ActionItemRequestType::PickInputOption(PickInputOptionRequest {
74            options: vec![],
75            selected: InputOption::default(),
76        }),
77        ACTION_ITEM_ENV,
78      );
79}
80
81pub async fn start_unsupervised_runbook_runloop(
82    runbook: &mut Runbook,
83    progress_tx: &txtx_addon_kit::channel::Sender<BlockEvent>,
84) -> Result<(), Vec<Diagnostic>> {
85    runbook.supervision_context = RunbookSupervisionContext {
86        review_input_default_values: false,
87        review_input_values: false,
88        is_supervised: false,
89    };
90
91    for flow_context in runbook.flow_contexts.iter_mut() {
92        if !flow_context.is_enabled() {
93            continue;
94        }
95
96        let mut action_item_requests = BTreeMap::new();
97        let action_item_responses = BTreeMap::new();
98
99        let pass_results = run_signers_evaluation(
100            &flow_context.workspace_context,
101            &mut flow_context.execution_context,
102            &runbook.runtime_context,
103            &runbook.supervision_context,
104            &mut action_item_requests,
105            &action_item_responses,
106            &progress_tx,
107        )
108        .await;
109
110        if pass_results.actions.has_pending_actions() {
111            return Err(vec![diagnosed_error!(
112                "unsupervised executions should not be generating actions"
113            )]);
114        }
115
116        if pass_results.has_diagnostics() {
117            return Err(pass_results.with_spans_filled(&runbook.sources));
118        }
119
120        let mut uuid = Uuid::new_v4();
121        let mut background_tasks_futures = vec![];
122        let mut background_tasks_contructs_dids = vec![];
123        let mut runbook_completed = false;
124
125        loop {
126            let mut pass_results = run_constructs_evaluation(
127                &uuid,
128                &flow_context.workspace_context,
129                &mut flow_context.execution_context,
130                &mut runbook.runtime_context,
131                &runbook.supervision_context,
132                &mut action_item_requests,
133                &action_item_responses,
134                &progress_tx,
135            )
136            .await;
137
138            if pass_results.has_diagnostics() {
139                return Err(pass_results.with_spans_filled(&runbook.sources));
140            }
141
142            if !pass_results.pending_background_tasks_constructs_uuids.is_empty() {
143                background_tasks_futures.append(&mut pass_results.pending_background_tasks_futures);
144                background_tasks_contructs_dids
145                    .append(&mut pass_results.pending_background_tasks_constructs_uuids);
146            }
147
148            if !pass_results.actions.has_pending_actions()
149                && background_tasks_contructs_dids.is_empty()
150                && pass_results.nodes_to_re_execute.is_empty()
151            {
152                runbook_completed = true;
153            }
154
155            if background_tasks_futures.is_empty() {
156                // sleep(time::Duration::from_secs(3));
157            } else {
158                process_background_tasks(
159                    None,
160                    background_tasks_contructs_dids,
161                    background_tasks_futures,
162                    flow_context,
163                )
164                .await
165                .map_err(|mut diag| {
166                    diag.span = get_source_context_for_diagnostic(&diag, &runbook.sources);
167                    vec![diag]
168                })?;
169                background_tasks_futures = vec![];
170                background_tasks_contructs_dids = vec![];
171            }
172
173            uuid = Uuid::new_v4();
174            if runbook_completed {
175                break;
176            }
177        }
178    }
179
180    Ok(())
181}
182
/// Drives a runbook in supervised mode.
///
/// Each flow is initialized with a genesis panel. The loop then polls `action_item_responses_rx`
/// and reacts to every `ActionItemResponse` according to its payload, re-evaluating the signers
/// or constructs of the current flow and sending the resulting `BlockEvent`s on `block_tx`.
///
/// Returns `Ok(())` when the response channel is closed, or after `BlockEvent::RunbookCompleted`
/// has been sent for the last flow.
///
/// # Errors
/// Returns the diagnostics produced when building a genesis panel, resetting the execution,
/// awaiting background tasks, or running an evaluation pass triggered by a `ValidateBlock`.
///
/// # Panics
/// Several results are `unwrap`ped: sending the genesis events on `block_tx`, normalizing action
/// item updates, and looking up the stores / flow context of the current flow index.
183pub async fn start_supervised_runbook_runloop(
184    runbook: &mut Runbook,
185    block_tx: Sender<BlockEvent>,
186    mut action_item_responses_rx: tokio::sync::broadcast::Receiver<ActionItemResponse>,
187) -> Result<(), Vec<Diagnostic>> {
188    // let mut runbook_state = BTreeMap::new();
189
    // `-1` means that no flow has been initialized yet; it is compared against
    // `current_flow_index` at the top of the loop.
190    let mut intialized_flow_index: i16 = -1;
191    runbook.supervision_context = RunbookSupervisionContext {
192        review_input_default_values: true,
193        review_input_values: true,
194        is_supervised: true,
195    };
196
197    // Compute number of steps
198    // A step is
199
200    // store of action_item_ids and the associated action_item_request, grouped by the flow index
201    let mut flow_action_item_requests: BTreeMap<usize, BTreeMap<BlockId, ActionItemRequest>> =
202        BTreeMap::new();
203    // store of construct_dids and its associated action_item_response_types, grouped by the flow index
204    let mut flow_action_item_responses = BTreeMap::new();
205
    // Background tasks collected by evaluation passes; they are awaited when a block is validated.
206    let mut background_tasks_futures = vec![];
207    let mut background_tasks_contructs_dids = vec![];
208    let mut background_tasks_handle_uuid = Uuid::new_v4();
209    let mut validated_blocks = 0;
210    let total_flows_count = runbook.flow_contexts.len();
211    let mut current_flow_index: usize = 0;
212    loop {
        // Non-blocking poll of the response channel: an empty or lagged receiver yields no event,
        // a closed channel ends the runloop successfully.
213        let event_opt = match action_item_responses_rx.try_recv() {
214            Ok(action) => Some(action),
215            Err(TryRecvError::Empty) | Err(TryRecvError::Lagged(_)) => None,
216            Err(TryRecvError::Closed) => return Ok(()),
217        };
218
        // First iteration for this flow index: create its request/response stores, then build
        // and send its genesis panel.
219        if intialized_flow_index != current_flow_index as i16 {
220            intialized_flow_index = current_flow_index as i16;
221
222            flow_action_item_responses.insert(current_flow_index, BTreeMap::new());
223            flow_action_item_requests.insert(current_flow_index, BTreeMap::new());
224
225            let action_item_responses =
226                flow_action_item_responses.get_mut(&current_flow_index).unwrap();
227            let mut action_item_requests =
228                flow_action_item_requests.get_mut(&current_flow_index).unwrap();
229
230            let genesis_events = build_genesis_panel(
231                runbook,
232                &mut action_item_requests,
233                &action_item_responses,
234                &block_tx.clone(),
235                validated_blocks,
236                current_flow_index,
237                total_flows_count,
238            )
239            .await?;
240            for event in genesis_events {
241                let _ = block_tx.send(event).unwrap();
242            }
243        }
244        let action_item_responses =
245            flow_action_item_responses.get_mut(&current_flow_index).unwrap();
246        let mut action_item_requests =
247            flow_action_item_requests.get_mut(&current_flow_index).unwrap();
248
249        // Cooldown
        // No response to handle: wait 50ms before polling again.
        // NOTE(review): `sleep` is `std::thread::sleep`, which blocks the current thread inside
        // this async fn -- confirm this runloop owns its thread.
250        let Some(action_item_response) = event_opt else {
251            sleep(Duration::from_millis(50));
252            continue;
253        };
254        let ActionItemResponse { action_item_id, payload } = action_item_response.clone();
255
        // The environment picker was answered: reset the execution with the received payload.
        // On failure, send an error block and stop the runloop with the diagnostics.
256        if action_item_id == SET_ENV_ACTION.id {
257            if let Err(diags) = reset_runbook_execution(
258                runbook,
259                &payload,
260                &mut action_item_requests,
261                &action_item_responses,
262                &block_tx.clone(),
263                current_flow_index,
264                total_flows_count,
265            )
266            .await
267            {
268                let _ = block_tx.send(BlockEvent::Error(Block {
269                    uuid: Uuid::new_v4(),
270                    visible: true,
271                    panel: Panel::ErrorPanel(ErrorPanelData::from_diagnostics(&diags)),
272                }));
273                return Err(diags);
274            };
275            continue;
276        }
277
        // Store the response under the construct attached to the request, when the request is
        // known and has a construct did.
278        if let Some(action_item) = action_item_requests.get(&action_item_id) {
279            let action_item = action_item.clone();
280            if let Some(construct_did) = action_item.construct_did {
281                if let Some(responses) = action_item_responses.get_mut(&construct_did) {
282                    responses.push(action_item_response);
283                } else {
284                    action_item_responses.insert(construct_did, vec![action_item_response]);
285                }
286            }
287        }
288
289        match &payload {
290            ActionItemResponseType::ValidateModal => {}
291            ActionItemResponseType::ValidateBlock => {
292                // Keep track of whether we've initialized this bg uuid to avoid sending more updates
293                // for this action item than necessary
294                let mut bg_uuid_initialized = false;
295
296                // When a block is validated, the pass could have some set of nested constructs. Each of these constructs
297                // needs to have their background tasks awaited before continuing to the next.
298                // So in this loop we:
299                // 1. Await background tasks, if we have any
300                // 2. Evaluate the graph to get new actions
301                //   a. If there are no new actions or new pending background tasks, mark the runbook as completed
302                //   b. If the runbook isn't completed yet, and there were background tasks at the start of the loop, and we have new background tasks,
303                //      we need to loop again to flush out the background tasks
304                //   c. If there are new actions and there are no background tasks to await, add the actions to the action item requests and send them to the block processor
305                //      to be processed by the frontend
306                loop {
307                    let start_of_loop_had_bg_tasks = !background_tasks_futures.is_empty();
308                    // Handle background tasks
309                    if start_of_loop_had_bg_tasks {
310                        let flow_context =
311                            runbook.flow_contexts.get_mut(current_flow_index).unwrap();
                        // The supervised context is only provided the first time background
                        // tasks are awaited for this validated block.
312                        let supervised_bg_context = if bg_uuid_initialized {
313                            None
314                        } else {
315                            Some(SupervisedBackgroundTaskContext::new(
316                                &block_tx,
317                                &background_tasks_handle_uuid,
318                                &action_item_id,
319                            ))
320                        };
321                        process_background_tasks(
322                            supervised_bg_context,
323                            background_tasks_contructs_dids,
324                            background_tasks_futures,
325                            flow_context,
326                        )
327                        .await
328                        .map_err(|mut diag| {
329                            diag.span = get_source_context_for_diagnostic(&diag, &runbook.sources);
330                            vec![diag]
331                        })?;
332                        bg_uuid_initialized = true;
                        // Both vectors were moved into the call above; start over with empty ones.
333                        background_tasks_futures = vec![];
334                        background_tasks_contructs_dids = vec![];
335                    }
336
337                    // Retrieve the previous requests sent and update their statuses.
338                    let mut flow_execution_completed = false;
339                    let mut map: BTreeMap<ConstructDid, _> = BTreeMap::new();
340                    let flow_context = runbook.flow_contexts.get_mut(current_flow_index).unwrap();
341                    let mut pass_results = run_constructs_evaluation(
342                        &background_tasks_handle_uuid,
343                        &flow_context.workspace_context,
344                        &mut flow_context.execution_context,
345                        &mut runbook.runtime_context,
346                        &runbook.supervision_context,
347                        &mut map,
348                        &action_item_responses,
349                        &block_tx.clone(),
350                    )
351                    .await;
352
353                    // if there were errors, return them to complete execution
354                    if let Some(error_event) = pass_results.compile_diagnostics_to_block() {
355                        let _ = block_tx.send(BlockEvent::Error(error_event));
356                        set_progress_bar_visibility(
357                            &block_tx,
358                            &background_tasks_handle_uuid,
359                            false,
360                        );
361                        return Err(pass_results.with_spans_filled(&runbook.sources));
362                    }
363
364                    let pass_has_pending_bg_tasks =
365                        !pass_results.pending_background_tasks_constructs_uuids.is_empty();
366                    let pass_has_pending_actions = pass_results.actions.has_pending_actions();
367                    let pass_has_nodes_to_re_execute = !pass_results.nodes_to_re_execute.is_empty();
368
                    // Nothing is left pending for this flow: append an "output review" panel built
                    // from the outputs collected on the execution context, and flag the flow as
                    // completed.
369                    if !pass_has_pending_actions
370                        && !pass_has_pending_bg_tasks
371                        && !pass_has_nodes_to_re_execute
372                    {
373                        let flow_context =
374                            runbook.flow_contexts.get_mut(current_flow_index).unwrap();
375                        let grouped_actions_items =
376                            flow_context.execution_context.collect_outputs_constructs_results();
377                        let mut actions = Actions::new_panel("output review", "");
378                        for (key, action_items) in grouped_actions_items.into_iter() {
379                            actions.push_group(key.as_str(), action_items);
380                        }
381                        pass_results.actions.append(&mut actions);
382
383                        flow_execution_completed = true;
                    // Otherwise, when the actions store of the pass is not empty, append a
                    // "Validate" item carrying the incremented count of validated blocks.
384                    } else if !pass_results.actions.store.is_empty() {
385                        validated_blocks = validated_blocks + 1;
386                        pass_results.actions.push_sub_group(
387                            None,
388                            vec![ActionItemRequest::new(
389                                &None,
390                                "Validate",
391                                None,
392                                ActionItemStatus::Todo,
393                                ActionItemRequestType::ValidateBlock(ValidateBlockData::new(
394                                    validated_blocks,
395                                )),
396                                ACTION_ITEM_VALIDATE_BLOCK,
397                            )],
398                        );
399                    }
400
                    // Keep the background tasks of this pass; the next iteration awaits them.
401                    if pass_has_pending_bg_tasks {
402                        background_tasks_futures
403                            .append(&mut pass_results.pending_background_tasks_futures);
404                        background_tasks_contructs_dids
405                            .append(&mut pass_results.pending_background_tasks_constructs_uuids);
406                    }
407
                    // No background task was involved in this iteration: mark the validated item
                    // as successful, register the new requests and send the compiled block events.
408                    if !pass_has_pending_bg_tasks && !start_of_loop_had_bg_tasks {
409                        let update = ActionItemRequestUpdate::from_id(&action_item_id)
410                            .set_status(ActionItemStatus::Success(None));
411                        pass_results.actions.push_action_item_update(update);
412                        for new_request in
413                            pass_results.actions.get_new_action_item_requests().into_iter()
414                        {
415                            action_item_requests
416                                .insert(new_request.id.clone(), new_request.clone());
417                        }
418                        let block_events = pass_results
419                            .actions
420                            .compile_actions_to_block_events(&action_item_requests);
421
422                        for event in block_events.into_iter() {
423                            let _ = block_tx.send(event);
424                        }
425                    }
                    // Flow completed: finish the runbook if this was the last flow, otherwise
                    // move to the next flow index (initialized at the top of the outer loop).
426                    if flow_execution_completed && !start_of_loop_had_bg_tasks {
427                        set_progress_bar_visibility(
428                            &block_tx,
429                            &background_tasks_handle_uuid,
430                            false,
431                        );
432                        if current_flow_index == total_flows_count - 1 {
433                            let _ = block_tx.send(BlockEvent::RunbookCompleted);
434                            return Ok(());
435                        } else {
436                            current_flow_index += 1;
437                        }
438                    }
                    // Nothing left to flush: set the progress bar visibility to `false`, renew
                    // the background tasks handle uuid and go back to polling responses.
439                    if !pass_has_pending_bg_tasks
440                        && !start_of_loop_had_bg_tasks
441                        && !pass_has_nodes_to_re_execute
442                    {
443                        set_progress_bar_visibility(
444                            &block_tx,
445                            &background_tasks_handle_uuid,
446                            false,
447                        );
448                        background_tasks_handle_uuid = Uuid::new_v4();
449                        break;
450                    }
451                }
452            }
453            ActionItemResponseType::PickInputOption(_) => {}
454            ActionItemResponseType::ProvideInput(_) => {}
455            ActionItemResponseType::ReviewInput(ReviewedInputResponse {
456                value_checked,
457                force_execution,
458                ..
459            }) => {
                // Reflect the review in the item status: checked => Success, unchecked => Todo.
460                let new_status = match value_checked {
461                    true => ActionItemStatus::Success(None),
462                    false => ActionItemStatus::Todo,
463                };
464                let _ = block_tx.send(BlockEvent::UpdateActionItems(vec![
465                    ActionItemRequestUpdate::from_id(&action_item_id)
466                        .set_status(new_status)
467                        .normalize(&action_item_requests)
468                        .unwrap(),
469                ]));
470                // Some signers do not actually need the user to provide the address/pubkey,
471                // but they need to confirm it in the supervisor. when it is confirmed, we need to
472                // reprocess the signers
473                if let Some(request) = action_item_requests.get(&action_item_id) {
474                    if request.internal_key == ACTION_ITEM_CHECK_ADDRESS
475                        || request.internal_key == ACTION_ITEM_CHECK_BALANCE
476                    {
477                        process_signers_action_item_response(
478                            runbook,
479                            &block_tx,
480                            &action_item_id,
481                            &mut action_item_requests,
482                            &action_item_responses,
483                            current_flow_index,
484                        )
485                        .await;
486                    }
487                }
488
                // Forced execution: run an evaluation pass right away, send the resulting item
                // updates and keep any background task for the next validated block. Diagnostics
                // are reported as an error block; the runloop keeps going.
489                if *force_execution {
490                    let running_context =
491                        runbook.flow_contexts.get_mut(current_flow_index).unwrap();
492                    let mut pass_results = run_constructs_evaluation(
493                        &background_tasks_handle_uuid,
494                        &running_context.workspace_context,
495                        &mut running_context.execution_context,
496                        &mut runbook.runtime_context,
497                        &runbook.supervision_context,
498                        &mut BTreeMap::new(),
499                        &action_item_responses,
500                        &block_tx.clone(),
501                    )
502                    .await;
503                    let mut updated_actions = vec![];
504                    for action in pass_results
505                        .actions
506                        .compile_actions_to_item_updates(&action_item_requests)
507                        .into_iter()
508                    {
509                        updated_actions.push(action.normalize(&action_item_requests).unwrap())
510                    }
511                    let _ = block_tx.send(BlockEvent::UpdateActionItems(updated_actions));
512
513                    if !pass_results.pending_background_tasks_constructs_uuids.is_empty() {
514                        background_tasks_futures
515                            .append(&mut pass_results.pending_background_tasks_futures);
516                        background_tasks_contructs_dids
517                            .append(&mut pass_results.pending_background_tasks_constructs_uuids);
518                    }
519
520                    if pass_results.has_diagnostics() {
521                        pass_results.fill_diagnostic_span(&runbook.sources);
522                    }
523                    if let Some(error_event) = pass_results.compile_diagnostics_to_block() {
524                        let _ = block_tx.send(BlockEvent::Error(error_event));
525                    }
526                }
527            }
528            ActionItemResponseType::ProvidePublicKey(_response) => {
529                process_signers_action_item_response(
530                    runbook,
531                    &block_tx,
532                    &action_item_id,
533                    &mut action_item_requests,
534                    &action_item_responses,
535                    current_flow_index,
536                )
537                .await;
538            }
539            ActionItemResponseType::ProvideSignedTransaction(_)
540            | ActionItemResponseType::SendTransaction(_)
541            | ActionItemResponseType::ProvideSignedMessage(_) => {
542                // Retrieve the previous requests sent and update their statuses.
                // If the request or its construct did cannot be found, the response is skipped.
543                let Some((signing_action_construct_did, scoped_requests)) =
544                    retrieve_related_action_items_requests(
545                        &action_item_id,
546                        &mut action_item_requests,
547                    )
548                else {
549                    continue;
550                };
551                let mut map: BTreeMap<ConstructDid, _> = BTreeMap::new();
552                map.insert(signing_action_construct_did, scoped_requests);
553
554                let running_context = runbook.flow_contexts.get_mut(current_flow_index).unwrap();
555                let mut pass_results = run_constructs_evaluation(
556                    &background_tasks_handle_uuid,
557                    &running_context.workspace_context,
558                    &mut running_context.execution_context,
559                    &mut runbook.runtime_context,
560                    &runbook.supervision_context,
561                    &mut map,
562                    &action_item_responses,
563                    &block_tx.clone(),
564                )
565                .await;
566
567                let mut updated_actions = vec![];
568                for action in pass_results
569                    .actions
570                    .compile_actions_to_item_updates(&action_item_requests)
571                    .into_iter()
572                {
573                    updated_actions.push(action.normalize(&action_item_requests).unwrap())
574                }
575
576                let _ = block_tx.send(BlockEvent::UpdateActionItems(updated_actions));
577
                // Background tasks are only collected here; they are awaited when the next
                // `ValidateBlock` response is handled.
578                if !pass_results.pending_background_tasks_constructs_uuids.is_empty() {
579                    background_tasks_futures
580                        .append(&mut pass_results.pending_background_tasks_futures);
581                    background_tasks_contructs_dids
582                        .append(&mut pass_results.pending_background_tasks_constructs_uuids);
583                }
584                if pass_results.has_diagnostics() {
585                    pass_results.fill_diagnostic_span(&runbook.sources);
586                }
587                if let Some(error_event) = pass_results.compile_diagnostics_to_block() {
588                    let _ = block_tx.send(BlockEvent::Error(error_event));
589                }
590            }
591        };
592    }
593}
594
595pub fn register_action_items_from_actions(
596    actions: &Actions,
597    action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
598) {
599    for action in actions.get_new_action_item_requests().into_iter() {
600        action_item_requests.insert(action.id.clone(), action.clone());
601    }
602}
603
604pub fn retrieve_related_action_items_requests<'a>(
605    action_item_id: &BlockId,
606    action_item_requests: &'a mut BTreeMap<BlockId, ActionItemRequest>,
607) -> Option<(ConstructDid, Vec<&'a mut ActionItemRequest>)> {
608    let Some(signer_construct_did) =
609        action_item_requests.get(&action_item_id).and_then(|a| a.construct_did.clone())
610    else {
611        eprintln!("unable to retrieve {}", action_item_id);
612        // todo: log error
613        return None;
614    };
615    // // Retrieve the previous requests sent
616    // // and update their statuses.
617    let mut scoped_requests = vec![];
618    for (_, request) in action_item_requests.iter_mut() {
619        let Some(ref construct_did) = request.construct_did else {
620            continue;
621        };
622        if construct_did.eq(&signer_construct_did) {
623            scoped_requests.push(request);
624        }
625    }
626    Some((signer_construct_did, scoped_requests))
627}
628
629pub async fn reset_runbook_execution(
630    runbook: &mut Runbook,
631    payload: &ActionItemResponseType,
632    action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
633    action_item_responses: &BTreeMap<ConstructDid, Vec<ActionItemResponse>>,
634    progress_tx: &Sender<BlockEvent>,
635    current_flow_index: usize,
636    total_flows_count: usize,
637) -> Result<(), Vec<Diagnostic>> {
638    let ActionItemResponseType::PickInputOption(environment_key) = payload else {
639        unreachable!(
640            "Action item event wih environment uuid sent with invalid payload {:?}",
641            payload
642        );
643    };
644
645    let reset = runbook.update_inputs_selector(Some(environment_key.to_string()), true).await?;
646
647    if !reset {
648        unimplemented!()
649    }
650
651    let _ = progress_tx.send(BlockEvent::Clear);
652    let genesis_events = build_genesis_panel(
653        runbook,
654        action_item_requests,
655        action_item_responses,
656        &progress_tx,
657        0,
658        current_flow_index,
659        total_flows_count,
660    )
661    .await?;
662    for event in genesis_events {
663        let _ = progress_tx.send(event).unwrap();
664    }
665    Ok(())
666}
667
668pub async fn build_genesis_panel(
669    runbook: &mut Runbook,
670    action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
671    action_item_responses: &BTreeMap<ConstructDid, Vec<ActionItemResponse>>,
672    progress_tx: &Sender<BlockEvent>,
673    validated_blocks: usize,
674    current_flow_index: usize,
675    total_flows_count: usize,
676) -> Result<Vec<BlockEvent>, Vec<Diagnostic>> {
677    let mut actions = Actions::none();
678
679    let environments = runbook.get_inputs_selectors();
680    let selector = runbook.get_active_inputs_selector();
681
682    let Some(flow_context) = runbook.flow_contexts.get_mut(current_flow_index) else {
683        return Err(vec![diagnosed_error!(
684            "internal error: attempted to access a flow that does not exist"
685        )]);
686    };
687
688    if total_flows_count > 1 {
689        actions.push_begin_flow_panel(
690            current_flow_index,
691            &flow_context.name,
692            &flow_context.description,
693        );
694    } else {
695    }
696    actions.push_panel("runbook checklist", "");
697
698    if environments.len() > 0 {
699        let input_options: Vec<InputOption> = environments
700            .iter()
701            .map(|k| InputOption { value: k.to_string(), displayed_value: k.to_string() })
702            .collect();
703        let selected_option: InputOption = selector
704            .clone()
705            .and_then(|e| Some(InputOption { value: e.clone(), displayed_value: e.clone() }))
706            .unwrap_or({
707                let k = environments.iter().next().unwrap();
708                InputOption { value: k.clone(), displayed_value: k.clone() }
709            });
710        let action_request = ActionItemRequest::new(
711            &None,
712            "Select the environment to target",
713            None,
714            ActionItemStatus::Success(None),
715            ActionItemRequestType::PickInputOption(PickInputOptionRequest {
716                options: input_options,
717                selected: selected_option,
718            }),
719            ACTION_ITEM_ENV,
720        );
721        actions.push_sub_group(None, vec![action_request]);
722    }
723
724    let mut pass_result: eval::EvaluationPassResult = run_signers_evaluation(
725        &flow_context.workspace_context,
726        &mut flow_context.execution_context,
727        &runbook.runtime_context,
728        &runbook.supervision_context,
729        &mut BTreeMap::new(),
730        &action_item_responses,
731        &progress_tx,
732    )
733    .await;
734
735    if pass_result.has_diagnostics() {
736        return Err(pass_result.with_spans_filled(&runbook.sources));
737    }
738
739    actions.append(&mut pass_result.actions);
740
741    let validate_action = ActionItemRequest::new(
742        &None,
743        "start runbook".into(),
744        None,
745        ActionItemStatus::Todo,
746        ActionItemRequestType::ValidateBlock(ValidateBlockData::new(validated_blocks)),
747        ACTION_ITEM_GENESIS,
748    );
749    actions.push_sub_group(None, vec![validate_action]);
750
751    register_action_items_from_actions(&actions, action_item_requests);
752
753    let panels = actions.compile_actions_to_block_events(&action_item_requests);
754    for panel in panels.iter() {
755        match panel {
756            BlockEvent::Modal(_) => {}
757            BlockEvent::Action(_) => {}
758            _ => {
759                println!("-----");
760            }
761        }
762    }
763    // assert_eq!(panels.len(), 1);
764
765    Ok(panels)
766}
767
/// Data [`process_background_tasks`] uses to report progress and failures of background
/// tasks as block events.
#[derive(Debug, Clone)]
pub struct SupervisedBackgroundTaskContext {
    /// Sender on which the block events (action item updates, progress bar, errors) are
    /// emitted.
    block_tx: Sender<BlockEvent>,
    /// Uuid given to the progress bar block, and reused to update its visibility.
    background_tasks_handle_uuid: Uuid,
    /// Id of the action item whose status is set to `Success` before the background tasks
    /// are awaited.
    action_item_id: BlockId,
}
774impl SupervisedBackgroundTaskContext {
775    pub fn new(
776        block_tx: &Sender<BlockEvent>,
777        background_tasks_handle_uuid: &Uuid,
778        action_item_id: &BlockId,
779    ) -> Self {
780        SupervisedBackgroundTaskContext {
781            block_tx: block_tx.clone(),
782            background_tasks_handle_uuid: background_tasks_handle_uuid.clone(),
783            action_item_id: action_item_id.clone(),
784        }
785    }
786}
787
788pub async fn process_background_tasks(
789    supervised_context: Option<SupervisedBackgroundTaskContext>,
790    background_tasks_contructs_dids: Vec<(ConstructDid, ConstructDid)>,
791    background_tasks_futures: Vec<
792        Pin<Box<dyn Future<Output = Result<CommandExecutionResult, Diagnostic>> + Send>>,
793    >,
794    flow_context: &mut FlowContext,
795) -> Result<(), Diagnostic> {
796    if let Some(SupervisedBackgroundTaskContext {
797        block_tx,
798        background_tasks_handle_uuid,
799        action_item_id,
800    }) = supervised_context.as_ref()
801    {
802        let _ =
803            block_tx.send(BlockEvent::UpdateActionItems(vec![NormalizedActionItemRequestUpdate {
804                id: action_item_id.clone(),
805                action_status: Some(ActionItemStatus::Success(None)),
806                action_type: None,
807            }]));
808        let _ = block_tx.send(BlockEvent::ProgressBar(Block::new(
809            &background_tasks_handle_uuid,
810            Panel::ProgressBar(vec![]),
811        )));
812    }
813
814    let results: Vec<Result<CommandExecutionResult, Diagnostic>> =
815        txtx_addon_kit::futures::future::join_all(background_tasks_futures).await;
816    for ((nested_construct_did, construct_did), result) in
817        background_tasks_contructs_dids.into_iter().zip(results)
818    {
819        match result {
820            Ok(result) => {
821                flow_context
822                    .execution_context
823                    .append_commands_execution_result(&nested_construct_did, &result);
824            }
825            Err(mut diag) => {
826                let construct_id =
827                    flow_context.workspace_context.expect_construct_id(&construct_did);
828                diag = diag.location(&construct_id.construct_location);
829                if let Some(command_instance) =
830                    flow_context.execution_context.commands_instances.get_mut(&construct_did)
831                {
832                    diag = diag.set_span_range(command_instance.block.span());
833                };
834                if let Some(SupervisedBackgroundTaskContext {
835                    block_tx,
836                    background_tasks_handle_uuid,
837                    ..
838                }) = supervised_context.as_ref()
839                {
840                    let _ = block_tx.send(BlockEvent::Error(Block {
841                        uuid: Uuid::new_v4(),
842                        visible: true,
843                        panel: Panel::ErrorPanel(ErrorPanelData::from_diagnostics(&vec![
844                            diag.clone()
845                        ])),
846                    }));
847                    set_progress_bar_visibility(block_tx, background_tasks_handle_uuid, false);
848                }
849                return Err(diag);
850            }
851        }
852    }
853
854    Ok(())
855}
856
857pub fn set_progress_bar_visibility(
858    block_tx: &Sender<BlockEvent>,
859    background_tasks_handle_uuid: &Uuid,
860    visibility: bool,
861) {
862    let _ = block_tx.send(BlockEvent::UpdateProgressBarVisibility(
863        ProgressBarVisibilityUpdate::new(&background_tasks_handle_uuid, visibility),
864    ));
865}
866
867pub async fn process_signers_action_item_response(
868    runbook: &mut Runbook,
869    block_tx: &Sender<BlockEvent>,
870    action_item_id: &BlockId,
871    action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
872    action_item_responses: &BTreeMap<ConstructDid, Vec<ActionItemResponse>>,
873    current_flow_index: usize,
874) {
875    // Retrieve the previous requests sent and update their statuses.
876    let Some((signer_construct_did, scoped_requests)) =
877        retrieve_related_action_items_requests(&action_item_id, action_item_requests)
878    else {
879        return;
880    };
881
882    let mut map = BTreeMap::new();
883    map.insert(signer_construct_did, scoped_requests);
884
885    let flow_context = runbook.flow_contexts.get_mut(current_flow_index).unwrap();
886    let mut pass_result = run_signers_evaluation(
887        &flow_context.workspace_context,
888        &mut flow_context.execution_context,
889        &mut runbook.runtime_context,
890        &runbook.supervision_context,
891        &mut map,
892        &action_item_responses,
893        &block_tx.clone(),
894    )
895    .await;
896
897    if pass_result.has_diagnostics() {
898        pass_result.fill_diagnostic_span(&runbook.sources);
899    }
900
901    if let Some(error_event) = pass_result.compile_diagnostics_to_block() {
902        let _ = block_tx.send(BlockEvent::Error(error_event));
903    } else {
904        let updated_actions = pass_result
905            .actions
906            .compile_actions_to_item_updates(&action_item_requests)
907            .into_iter()
908            .map(|u| u.normalize(&action_item_requests).unwrap())
909            .collect();
910        let _ = block_tx.send(BlockEvent::UpdateActionItems(updated_actions));
911    }
912}