Skip to main content

txtx_core/
lib.rs

1#[macro_use]
2extern crate lazy_static;
3
4#[macro_use]
5pub extern crate txtx_addon_kit as kit;
6
7pub extern crate mustache;
8
9mod constants;
10pub mod errors;
11pub mod eval;
12pub mod manifest;
13// pub mod snapshot;
14pub mod runbook;
15pub mod std;
16pub mod templates;
17pub mod types;
18pub mod validation;
19
20#[cfg(test)]
21mod tests;
22pub mod utils;
23
24use ::std::collections::BTreeMap;
25use ::std::future::Future;
26use ::std::pin::Pin;
27use ::std::thread::sleep;
28use ::std::time::Duration;
29
30use crate::runbook::flow_context::FlowContext;
31use constants::ACTION_ITEM_ENV;
32use constants::ACTION_ITEM_GENESIS;
33use constants::ACTION_ITEM_VALIDATE_BLOCK;
34use eval::run_constructs_evaluation;
35use eval::run_signers_evaluation;
36use kit::constants::ACTION_ITEM_CHECK_BALANCE;
37use runbook::get_source_context_for_diagnostic;
38use tokio::sync::broadcast::error::TryRecvError;
39use txtx_addon_kit::channel::Sender;
40use txtx_addon_kit::constants::ACTION_ITEM_CHECK_ADDRESS;
41use txtx_addon_kit::hcl::Span;
42use txtx_addon_kit::types::block_id::BlockId;
43use txtx_addon_kit::types::commands::CommandExecutionResult;
44use txtx_addon_kit::types::diagnostics::Diagnostic;
45use txtx_addon_kit::types::frontend::ActionItemRequest;
46use txtx_addon_kit::types::frontend::ActionItemRequestType;
47use txtx_addon_kit::types::frontend::ActionItemRequestUpdate;
48use txtx_addon_kit::types::frontend::ActionItemResponse;
49use txtx_addon_kit::types::frontend::ActionItemResponseType;
50use txtx_addon_kit::types::frontend::ActionItemStatus;
51use txtx_addon_kit::types::frontend::Actions;
52use txtx_addon_kit::types::frontend::Block;
53use txtx_addon_kit::types::frontend::BlockEvent;
54use txtx_addon_kit::types::frontend::ErrorPanelData;
55use txtx_addon_kit::types::frontend::InputOption;
56use txtx_addon_kit::types::frontend::NormalizedActionItemRequestUpdate;
57use txtx_addon_kit::types::frontend::Panel;
58use txtx_addon_kit::types::frontend::PickInputOptionRequest;
59use txtx_addon_kit::types::frontend::ReviewedInputResponse;
60use txtx_addon_kit::types::frontend::ValidateBlockData;
61use txtx_addon_kit::types::types::RunbookSupervisionContext;
62use txtx_addon_kit::types::ConstructDid;
63use txtx_addon_kit::uuid::Uuid;
64use types::Runbook;
65
66lazy_static! {
67    // create this action so we can reference its `id` property, which is built from the immutable data
68     pub static ref SET_ENV_ACTION: ActionItemRequest =ActionItemRequestType::PickInputOption(PickInputOptionRequest {
69            options: vec![],
70            selected: InputOption::default(),
71        }).to_request("", ACTION_ITEM_ENV)
72        .with_meta_description("Select the environment to target")
73        .with_status(ActionItemStatus::Success(None))
74      ;
75}
76
77pub async fn start_unsupervised_runbook_runloop(
78    runbook: &mut Runbook,
79    progress_tx: &txtx_addon_kit::channel::Sender<BlockEvent>,
80) -> Result<(), Vec<Diagnostic>> {
81    runbook.supervision_context = RunbookSupervisionContext {
82        review_input_default_values: false,
83        review_input_values: false,
84        is_supervised: false,
85    };
86
87    for flow_context in runbook.flow_contexts.iter_mut() {
88        if !flow_context.is_enabled() {
89            continue;
90        }
91
92        let mut action_item_requests = BTreeMap::new();
93        let action_item_responses = BTreeMap::new();
94
95        let pass_results = run_signers_evaluation(
96            &flow_context.workspace_context,
97            &mut flow_context.execution_context,
98            &runbook.runtime_context,
99            &runbook.supervision_context,
100            &mut action_item_requests,
101            &action_item_responses,
102            &progress_tx,
103        )
104        .await;
105
106        if pass_results.actions.has_pending_actions() {
107            return Err(vec![diagnosed_error!(
108                "unsupervised executions should not be generating actions"
109            )]);
110        }
111
112        if pass_results.has_diagnostics() {
113            return Err(pass_results.with_spans_filled(&runbook.sources));
114        }
115
116        let mut uuid = Uuid::new_v4();
117        let mut background_tasks_futures = vec![];
118        let mut background_tasks_contructs_dids = vec![];
119        let mut runbook_completed = false;
120
121        loop {
122            let mut pass_results = run_constructs_evaluation(
123                &uuid,
124                &flow_context.workspace_context,
125                &mut flow_context.execution_context,
126                &mut runbook.runtime_context,
127                &runbook.supervision_context,
128                &mut action_item_requests,
129                &action_item_responses,
130                &progress_tx,
131            )
132            .await;
133
134            if pass_results.has_diagnostics() {
135                return Err(pass_results.with_spans_filled(&runbook.sources));
136            }
137
138            if !pass_results.pending_background_tasks_constructs_uuids.is_empty() {
139                background_tasks_futures.append(&mut pass_results.pending_background_tasks_futures);
140                background_tasks_contructs_dids
141                    .append(&mut pass_results.pending_background_tasks_constructs_uuids);
142            }
143
144            if !pass_results.actions.has_pending_actions()
145                && background_tasks_contructs_dids.is_empty()
146                && pass_results.nodes_to_re_execute.is_empty()
147            {
148                runbook_completed = true;
149            }
150
151            if background_tasks_futures.is_empty() {
152                // sleep(time::Duration::from_secs(3));
153            } else {
154                process_background_tasks(
155                    None,
156                    background_tasks_contructs_dids,
157                    background_tasks_futures,
158                    flow_context,
159                )
160                .await
161                .map_err(|mut diag| {
162                    diag.span = get_source_context_for_diagnostic(&diag, &runbook.sources);
163                    vec![diag]
164                })?;
165                background_tasks_futures = vec![];
166                background_tasks_contructs_dids = vec![];
167            }
168
169            uuid = Uuid::new_v4();
170            if runbook_completed {
171                break;
172            }
173        }
174    }
175
176    Ok(())
177}
178
179pub async fn start_supervised_runbook_runloop(
180    runbook: &mut Runbook,
181    block_tx: Sender<BlockEvent>,
182    mut action_item_responses_rx: tokio::sync::broadcast::Receiver<ActionItemResponse>,
183) -> Result<(), Vec<Diagnostic>> {
184    // let mut runbook_state = BTreeMap::new();
185
186    let mut intialized_flow_index: i16 = -1;
187    runbook.supervision_context = RunbookSupervisionContext {
188        review_input_default_values: true,
189        review_input_values: true,
190        is_supervised: true,
191    };
192
193    // Compute number of steps
194    // A step is
195
196    // store of action_item_ids and the associated action_item_request, grouped by the flow index
197    let mut flow_action_item_requests: BTreeMap<usize, BTreeMap<BlockId, ActionItemRequest>> =
198        BTreeMap::new();
199    // store of construct_dids and its associated action_item_response_types, grouped by the flow index
200    let mut flow_action_item_responses = BTreeMap::new();
201
202    let mut background_tasks_futures = vec![];
203    let mut background_tasks_contructs_dids = vec![];
204    let mut background_tasks_handle_uuid = Uuid::new_v4();
205    let mut validated_blocks = 0;
206    let total_flows_count = runbook.flow_contexts.len();
207    let mut current_flow_index: usize = 0;
208    loop {
209        let event_opt = match action_item_responses_rx.try_recv() {
210            Ok(action) => Some(action),
211            Err(TryRecvError::Empty) | Err(TryRecvError::Lagged(_)) => None,
212            Err(TryRecvError::Closed) => return Ok(()),
213        };
214
215        if intialized_flow_index != current_flow_index as i16 {
216            intialized_flow_index = current_flow_index as i16;
217
218            flow_action_item_responses.insert(current_flow_index, BTreeMap::new());
219            flow_action_item_requests.insert(current_flow_index, BTreeMap::new());
220
221            let action_item_responses =
222                flow_action_item_responses.get_mut(&current_flow_index).unwrap();
223            let mut action_item_requests =
224                flow_action_item_requests.get_mut(&current_flow_index).unwrap();
225
226            let genesis_events = build_genesis_panel(
227                runbook,
228                &mut action_item_requests,
229                &action_item_responses,
230                &block_tx.clone(),
231                validated_blocks,
232                current_flow_index,
233                total_flows_count,
234            )
235            .await?;
236            for event in genesis_events {
237                let _ = block_tx.send(event).unwrap();
238            }
239        }
240        let action_item_responses =
241            flow_action_item_responses.get_mut(&current_flow_index).unwrap();
242        let mut action_item_requests =
243            flow_action_item_requests.get_mut(&current_flow_index).unwrap();
244
245        // Cooldown
246        let Some(action_item_response) = event_opt else {
247            sleep(Duration::from_millis(50));
248            continue;
249        };
250        let ActionItemResponse { action_item_id, payload } = action_item_response.clone();
251
252        if action_item_id == SET_ENV_ACTION.id {
253            if let Err(diags) = reset_runbook_execution(
254                runbook,
255                &payload,
256                &mut action_item_requests,
257                &action_item_responses,
258                &block_tx.clone(),
259                current_flow_index,
260                total_flows_count,
261            )
262            .await
263            {
264                let _ = block_tx.send(BlockEvent::Error(Block {
265                    uuid: Uuid::new_v4(),
266                    visible: true,
267                    panel: Panel::ErrorPanel(ErrorPanelData::from_diagnostics(&diags)),
268                }));
269                return Err(diags);
270            };
271            continue;
272        }
273
274        if let Some(action_item) = action_item_requests.get(&action_item_id) {
275            let action_item = action_item.clone();
276            if let Some(construct_did) = action_item.construct_did {
277                if let Some(responses) = action_item_responses.get_mut(&construct_did) {
278                    responses.push(action_item_response);
279                } else {
280                    action_item_responses.insert(construct_did, vec![action_item_response]);
281                }
282            }
283        }
284
285        match &payload {
286            ActionItemResponseType::ValidateModal => {}
287            ActionItemResponseType::ValidateBlock => {
288                // Keep track of whether we've initialized this bg uuid to avoid sending more updates
289                // for this action item than necessary
290                let mut bg_uuid_initialized = false;
291
292                // When a block is validated, the pass could have some set of nested constructs. Each of these constructs
293                // needs to have their background tasks awaited before continuing to the next.
294                // So in this loop we:
295                // 1. Await background tasks, if we have any
296                // 2. Evaluate the graph to get new actions
297                //   a. If there are no new actions or new pending background tasks, mark the runbook as completed
298                //   b. If the runbook isn't completed yet, and there were background tasks at the start of the loop, and we have new background tasks,
299                //      we need to loop again to flush out the background tasks
300                //   c. If there are new actions and there are no background tasks to await, add the actions to the action item requests and send them to the block processor
301                //      to be processed by the frontend
302                loop {
303                    let start_of_loop_had_bg_tasks = !background_tasks_futures.is_empty();
304                    // Handle background tasks
305                    if start_of_loop_had_bg_tasks {
306                        let flow_context =
307                            runbook.flow_contexts.get_mut(current_flow_index).unwrap();
308                        let supervised_bg_context = if bg_uuid_initialized {
309                            None
310                        } else {
311                            Some(SupervisedBackgroundTaskContext::new(&block_tx, &action_item_id))
312                        };
313                        process_background_tasks(
314                            supervised_bg_context,
315                            background_tasks_contructs_dids,
316                            background_tasks_futures,
317                            flow_context,
318                        )
319                        .await
320                        .map_err(|mut diag| {
321                            diag.span = get_source_context_for_diagnostic(&diag, &runbook.sources);
322                            vec![diag]
323                        })?;
324                        bg_uuid_initialized = true;
325                        background_tasks_futures = vec![];
326                        background_tasks_contructs_dids = vec![];
327                    }
328
329                    // Retrieve the previous requests sent and update their statuses.
330                    let mut flow_execution_completed = false;
331                    let mut map: BTreeMap<ConstructDid, _> = BTreeMap::new();
332                    let flow_context = runbook.flow_contexts.get_mut(current_flow_index).unwrap();
333                    let mut pass_results = run_constructs_evaluation(
334                        &background_tasks_handle_uuid,
335                        &flow_context.workspace_context,
336                        &mut flow_context.execution_context,
337                        &mut runbook.runtime_context,
338                        &runbook.supervision_context,
339                        &mut map,
340                        &action_item_responses,
341                        &block_tx.clone(),
342                    )
343                    .await;
344
345                    // if there were errors, return them to complete execution
346                    if let Some(error_event) = pass_results.compile_diagnostics_to_block() {
347                        let _ = block_tx.send(BlockEvent::Error(error_event));
348                        return Err(pass_results.with_spans_filled(&runbook.sources));
349                    }
350
351                    let pass_has_pending_bg_tasks =
352                        !pass_results.pending_background_tasks_constructs_uuids.is_empty();
353                    let pass_has_pending_actions = pass_results.actions.has_pending_actions();
354                    let pass_has_nodes_to_re_execute = !pass_results.nodes_to_re_execute.is_empty();
355
356                    let additional_info = flow_context
357                        .execution_context
358                        .commands_execution_results
359                        .values()
360                        .filter_map(|result| result.runbook_complete_additional_info())
361                        .collect::<Vec<_>>();
362
363                    if !pass_has_pending_actions
364                        && !pass_has_pending_bg_tasks
365                        && !pass_has_nodes_to_re_execute
366                    {
367                        let flow_context =
368                            runbook.flow_contexts.get_mut(current_flow_index).unwrap();
369                        let grouped_actions_items =
370                            flow_context.execution_context.collect_outputs_constructs_results(
371                                &runbook.runtime_context.authorization_context,
372                            );
373                        let mut actions = Actions::new_panel("output review", "");
374                        for (key, action_items) in grouped_actions_items.into_iter() {
375                            actions.push_group(key.as_str(), action_items);
376                        }
377                        pass_results.actions.append(&mut actions);
378
379                        flow_execution_completed = true;
380                    } else if !pass_results.actions.store.is_empty() {
381                        validated_blocks = validated_blocks + 1;
382                        pass_results.actions.push_sub_group(
383                            None,
384                            vec![ActionItemRequestType::ValidateBlock(ValidateBlockData::new(
385                                validated_blocks,
386                            ))
387                            .to_request("Validate", ACTION_ITEM_VALIDATE_BLOCK)],
388                        );
389                    }
390
391                    if pass_has_pending_bg_tasks {
392                        background_tasks_futures
393                            .append(&mut pass_results.pending_background_tasks_futures);
394                        background_tasks_contructs_dids
395                            .append(&mut pass_results.pending_background_tasks_constructs_uuids);
396                    }
397
398                    if !pass_has_pending_bg_tasks && !start_of_loop_had_bg_tasks {
399                        let update = ActionItemRequestUpdate::from_id(&action_item_id)
400                            .set_status(ActionItemStatus::Success(None));
401                        pass_results.actions.push_action_item_update(update);
402                        for new_request in
403                            pass_results.actions.get_new_action_item_requests().into_iter()
404                        {
405                            action_item_requests
406                                .insert(new_request.id.clone(), new_request.clone());
407                        }
408                        let block_events = pass_results
409                            .actions
410                            .compile_actions_to_block_events(&action_item_requests);
411
412                        for event in block_events.into_iter() {
413                            let _ = block_tx.send(event);
414                        }
415                    }
416                    if flow_execution_completed && !start_of_loop_had_bg_tasks {
417                        if current_flow_index == total_flows_count - 1 {
418                            let _ = block_tx.send(BlockEvent::RunbookCompleted(additional_info));
419                            return Ok(());
420                        } else {
421                            current_flow_index += 1;
422                        }
423                    }
424                    if !pass_has_pending_bg_tasks
425                        && !start_of_loop_had_bg_tasks
426                        && !pass_has_nodes_to_re_execute
427                    {
428                        background_tasks_handle_uuid = Uuid::new_v4();
429                        break;
430                    }
431                }
432            }
433            ActionItemResponseType::PickInputOption(_) => {}
434            ActionItemResponseType::ProvideInput(_) => {}
435            ActionItemResponseType::ReviewInput(ReviewedInputResponse {
436                value_checked,
437                force_execution,
438                ..
439            }) => {
440                let new_status = match value_checked {
441                    true => ActionItemStatus::Success(None),
442                    false => ActionItemStatus::Todo,
443                };
444                if let Some(update) = ActionItemRequestUpdate::from_id(&action_item_id)
445                    .set_status(new_status)
446                    .normalize(&action_item_requests)
447                {
448                    let _ = block_tx.send(BlockEvent::UpdateActionItems(vec![update]));
449                }
450                // Some signers do not actually need the user to provide the address/pubkey,
451                // but they need to confirm it in the supervisor. when it is confirmed, we need to
452                // reprocess the signers
453                if let Some(request) = action_item_requests.get(&action_item_id) {
454                    if request.internal_key == ACTION_ITEM_CHECK_ADDRESS
455                        || request.internal_key == ACTION_ITEM_CHECK_BALANCE
456                    {
457                        process_signers_action_item_response(
458                            runbook,
459                            &block_tx,
460                            &action_item_id,
461                            &mut action_item_requests,
462                            &action_item_responses,
463                            current_flow_index,
464                        )
465                        .await;
466                    }
467                }
468
469                if *force_execution {
470                    let running_context =
471                        runbook.flow_contexts.get_mut(current_flow_index).unwrap();
472                    let mut pass_results = run_constructs_evaluation(
473                        &background_tasks_handle_uuid,
474                        &running_context.workspace_context,
475                        &mut running_context.execution_context,
476                        &mut runbook.runtime_context,
477                        &runbook.supervision_context,
478                        &mut BTreeMap::new(),
479                        &action_item_responses,
480                        &block_tx.clone(),
481                    )
482                    .await;
483                    let mut updated_actions = vec![];
484                    for action in pass_results
485                        .actions
486                        .compile_actions_to_item_updates(&action_item_requests)
487                        .into_iter()
488                    {
489                        updated_actions.push(action.normalize(&action_item_requests).unwrap())
490                    }
491                    let _ = block_tx.send(BlockEvent::UpdateActionItems(updated_actions));
492
493                    if !pass_results.pending_background_tasks_constructs_uuids.is_empty() {
494                        background_tasks_futures
495                            .append(&mut pass_results.pending_background_tasks_futures);
496                        background_tasks_contructs_dids
497                            .append(&mut pass_results.pending_background_tasks_constructs_uuids);
498                    }
499
500                    if pass_results.has_diagnostics() {
501                        pass_results.fill_diagnostic_span(&runbook.sources);
502                    }
503                    if let Some(error_event) = pass_results.compile_diagnostics_to_block() {
504                        let _ = block_tx.send(BlockEvent::Error(error_event));
505                        return Err(pass_results.with_spans_filled(&runbook.sources));
506                    }
507                }
508            }
509            ActionItemResponseType::ProvidePublicKey(_response) => {
510                process_signers_action_item_response(
511                    runbook,
512                    &block_tx,
513                    &action_item_id,
514                    &mut action_item_requests,
515                    &action_item_responses,
516                    current_flow_index,
517                )
518                .await;
519            }
520            ActionItemResponseType::VerifyThirdPartySignature(_)
521            | ActionItemResponseType::ProvideSignedTransaction(_)
522            | ActionItemResponseType::SendTransaction(_)
523            | ActionItemResponseType::ProvideSignedMessage(_) => {
524                // Retrieve the previous requests sent and update their statuses.
525                let Some((signing_action_construct_did, scoped_requests)) =
526                    retrieve_related_action_items_requests(
527                        &action_item_id,
528                        &mut action_item_requests,
529                    )
530                else {
531                    continue;
532                };
533                let mut map: BTreeMap<ConstructDid, _> = BTreeMap::new();
534                map.insert(signing_action_construct_did, scoped_requests);
535
536                let running_context = runbook.flow_contexts.get_mut(current_flow_index).unwrap();
537                let mut pass_results = run_constructs_evaluation(
538                    &background_tasks_handle_uuid,
539                    &running_context.workspace_context,
540                    &mut running_context.execution_context,
541                    &mut runbook.runtime_context,
542                    &runbook.supervision_context,
543                    &mut map,
544                    &action_item_responses,
545                    &block_tx.clone(),
546                )
547                .await;
548
549                let mut updated_actions = vec![];
550                for action in pass_results
551                    .actions
552                    .compile_actions_to_item_updates(&action_item_requests)
553                    .into_iter()
554                {
555                    updated_actions.push(action.normalize(&action_item_requests).unwrap())
556                }
557
558                let _ = block_tx.send(BlockEvent::UpdateActionItems(updated_actions));
559
560                if !pass_results.pending_background_tasks_constructs_uuids.is_empty() {
561                    background_tasks_futures
562                        .append(&mut pass_results.pending_background_tasks_futures);
563                    background_tasks_contructs_dids
564                        .append(&mut pass_results.pending_background_tasks_constructs_uuids);
565                }
566                if pass_results.has_diagnostics() {
567                    pass_results.fill_diagnostic_span(&runbook.sources);
568                }
569                if let Some(error_event) = pass_results.compile_diagnostics_to_block() {
570                    let _ = block_tx.send(BlockEvent::Error(error_event));
571                    return Err(pass_results.with_spans_filled(&runbook.sources));
572                }
573            }
574        };
575    }
576}
577
578pub fn register_action_items_from_actions(
579    actions: &Actions,
580    action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
581) {
582    for action in actions.get_new_action_item_requests().into_iter() {
583        action_item_requests.insert(action.id.clone(), action.clone());
584    }
585}
586
587pub fn retrieve_related_action_items_requests<'a>(
588    action_item_id: &BlockId,
589    action_item_requests: &'a mut BTreeMap<BlockId, ActionItemRequest>,
590) -> Option<(ConstructDid, Vec<&'a mut ActionItemRequest>)> {
591    let Some(signer_construct_did) =
592        action_item_requests.get(&action_item_id).and_then(|a| a.construct_did.clone())
593    else {
594        eprintln!("unable to retrieve {}", action_item_id);
595        // todo: log error
596        return None;
597    };
598    // // Retrieve the previous requests sent
599    // // and update their statuses.
600    let mut scoped_requests = vec![];
601    for (_, request) in action_item_requests.iter_mut() {
602        let Some(ref construct_did) = request.construct_did else {
603            continue;
604        };
605        if construct_did.eq(&signer_construct_did) {
606            scoped_requests.push(request);
607        }
608    }
609    Some((signer_construct_did, scoped_requests))
610}
611
612pub async fn reset_runbook_execution(
613    runbook: &mut Runbook,
614    payload: &ActionItemResponseType,
615    action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
616    action_item_responses: &BTreeMap<ConstructDid, Vec<ActionItemResponse>>,
617    progress_tx: &Sender<BlockEvent>,
618    current_flow_index: usize,
619    total_flows_count: usize,
620) -> Result<(), Vec<Diagnostic>> {
621    let ActionItemResponseType::PickInputOption(environment_key) = payload else {
622        unreachable!(
623            "Action item event wih environment uuid sent with invalid payload {:?}",
624            payload
625        );
626    };
627
628    let reset = runbook.update_inputs_selector(Some(environment_key.to_string()), true).await?;
629
630    if !reset {
631        unimplemented!()
632    }
633
634    let _ = progress_tx.send(BlockEvent::Clear);
635    let genesis_events = build_genesis_panel(
636        runbook,
637        action_item_requests,
638        action_item_responses,
639        &progress_tx,
640        0,
641        current_flow_index,
642        total_flows_count,
643    )
644    .await?;
645    for event in genesis_events {
646        let _ = progress_tx.send(event).unwrap();
647    }
648    Ok(())
649}
650
651pub async fn build_genesis_panel(
652    runbook: &mut Runbook,
653    action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
654    action_item_responses: &BTreeMap<ConstructDid, Vec<ActionItemResponse>>,
655    progress_tx: &Sender<BlockEvent>,
656    validated_blocks: usize,
657    current_flow_index: usize,
658    total_flows_count: usize,
659) -> Result<Vec<BlockEvent>, Vec<Diagnostic>> {
660    let mut actions = Actions::none();
661
662    let environments = runbook.get_inputs_selectors();
663    let selector = runbook.get_active_inputs_selector();
664
665    let Some(flow_context) = runbook.flow_contexts.get_mut(current_flow_index) else {
666        return Err(vec![diagnosed_error!(
667            "internal error: attempted to access a flow that does not exist"
668        )]);
669    };
670
671    if total_flows_count > 1 {
672        actions.push_begin_flow_panel(
673            current_flow_index,
674            total_flows_count,
675            &flow_context.name,
676            &flow_context.description,
677        );
678    } else {
679    }
680    actions.push_panel("runbook checklist", "");
681
682    if environments.len() > 0 {
683        let input_options: Vec<InputOption> = environments
684            .iter()
685            .map(|k| InputOption { value: k.to_string(), displayed_value: k.to_string() })
686            .collect();
687        let selected_option: InputOption = selector
688            .clone()
689            .and_then(|e| Some(InputOption { value: e.clone(), displayed_value: e.clone() }))
690            .unwrap_or({
691                let k = environments.iter().next().unwrap();
692                InputOption { value: k.clone(), displayed_value: k.clone() }
693            });
694
695        let action_request = ActionItemRequestType::PickInputOption(PickInputOptionRequest {
696            options: input_options,
697            selected: selected_option,
698        })
699        .to_request("", ACTION_ITEM_ENV)
700        .with_meta_description("Select the environment to target")
701        .with_status(ActionItemStatus::Success(None));
702
703        actions.push_sub_group(None, vec![action_request]);
704    }
705
706    let mut pass_result: eval::EvaluationPassResult = run_signers_evaluation(
707        &flow_context.workspace_context,
708        &mut flow_context.execution_context,
709        &runbook.runtime_context,
710        &runbook.supervision_context,
711        &mut BTreeMap::new(),
712        &action_item_responses,
713        &progress_tx,
714    )
715    .await;
716
717    if pass_result.has_diagnostics() {
718        return Err(pass_result.with_spans_filled(&runbook.sources));
719    }
720
721    actions.append(&mut pass_result.actions);
722
723    let validate_action =
724        ActionItemRequestType::ValidateBlock(ValidateBlockData::new(validated_blocks))
725            .to_request("start runbook", ACTION_ITEM_GENESIS);
726
727    actions.push_sub_group(None, vec![validate_action]);
728
729    register_action_items_from_actions(&actions, action_item_requests);
730
731    let panels = actions.compile_actions_to_block_events(&action_item_requests);
732    for panel in panels.iter() {
733        match panel {
734            BlockEvent::Modal(_) => {}
735            BlockEvent::Action(_) => {}
736            _ => {
737                println!("-----");
738            }
739        }
740    }
741    // assert_eq!(panels.len(), 1);
742
743    Ok(panels)
744}
745
746#[derive(Debug, Clone)]
747pub struct SupervisedBackgroundTaskContext {
748    block_tx: Sender<BlockEvent>,
749    action_item_id: BlockId,
750}
751impl SupervisedBackgroundTaskContext {
752    pub fn new(block_tx: &Sender<BlockEvent>, action_item_id: &BlockId) -> Self {
753        SupervisedBackgroundTaskContext {
754            block_tx: block_tx.clone(),
755            action_item_id: action_item_id.clone(),
756        }
757    }
758}
759
760pub async fn process_background_tasks(
761    supervised_context: Option<SupervisedBackgroundTaskContext>,
762    background_tasks_contructs_dids: Vec<(ConstructDid, ConstructDid)>,
763    background_tasks_futures: Vec<
764        Pin<Box<dyn Future<Output = Result<CommandExecutionResult, Diagnostic>> + Send>>,
765    >,
766    flow_context: &mut FlowContext,
767) -> Result<(), Diagnostic> {
768    if let Some(SupervisedBackgroundTaskContext { block_tx, action_item_id, .. }) =
769        supervised_context.as_ref()
770    {
771        let _ =
772            block_tx.send(BlockEvent::UpdateActionItems(vec![NormalizedActionItemRequestUpdate {
773                id: action_item_id.clone(),
774                action_status: Some(ActionItemStatus::Success(None)),
775                action_type: None,
776            }]));
777    }
778
779    let results: Vec<Result<CommandExecutionResult, Diagnostic>> =
780        txtx_addon_kit::futures::future::join_all(background_tasks_futures).await;
781    for ((nested_construct_did, construct_did), result) in
782        background_tasks_contructs_dids.into_iter().zip(results)
783    {
784        match result {
785            Ok(result) => {
786                flow_context
787                    .execution_context
788                    .append_commands_execution_result(&nested_construct_did, &result);
789            }
790            Err(mut diag) => {
791                let construct_id =
792                    flow_context.workspace_context.expect_construct_id(&construct_did);
793                diag = diag.location(&construct_id.construct_location);
794                if let Some(command_instance) =
795                    flow_context.execution_context.commands_instances.get_mut(&construct_did)
796                {
797                    diag = diag.set_span_range(command_instance.block.span());
798                };
799                if let Some(SupervisedBackgroundTaskContext { block_tx, .. }) =
800                    supervised_context.as_ref()
801                {
802                    let _ = block_tx.send(BlockEvent::Error(Block {
803                        uuid: Uuid::new_v4(),
804                        visible: true,
805                        panel: Panel::ErrorPanel(ErrorPanelData::from_diagnostics(&vec![
806                            diag.clone()
807                        ])),
808                    }));
809                }
810                return Err(diag);
811            }
812        }
813    }
814
815    Ok(())
816}
817
818pub async fn process_signers_action_item_response(
819    runbook: &mut Runbook,
820    block_tx: &Sender<BlockEvent>,
821    action_item_id: &BlockId,
822    action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
823    action_item_responses: &BTreeMap<ConstructDid, Vec<ActionItemResponse>>,
824    current_flow_index: usize,
825) {
826    // Retrieve the previous requests sent and update their statuses.
827    let Some((signer_construct_did, scoped_requests)) =
828        retrieve_related_action_items_requests(&action_item_id, action_item_requests)
829    else {
830        return;
831    };
832
833    let mut map = BTreeMap::new();
834    map.insert(signer_construct_did, scoped_requests);
835
836    let flow_context = runbook.flow_contexts.get_mut(current_flow_index).unwrap();
837    let mut pass_result = run_signers_evaluation(
838        &flow_context.workspace_context,
839        &mut flow_context.execution_context,
840        &mut runbook.runtime_context,
841        &runbook.supervision_context,
842        &mut map,
843        &action_item_responses,
844        &block_tx.clone(),
845    )
846    .await;
847
848    if pass_result.has_diagnostics() {
849        pass_result.fill_diagnostic_span(&runbook.sources);
850    }
851
852    if let Some(error_event) = pass_result.compile_diagnostics_to_block() {
853        let _ = block_tx.send(BlockEvent::Error(error_event));
854    } else {
855        let updated_actions = pass_result
856            .actions
857            .compile_actions_to_item_updates(&action_item_requests)
858            .into_iter()
859            .map(|u| u.normalize(&action_item_requests).unwrap())
860            .collect();
861        let _ = block_tx.send(BlockEvent::UpdateActionItems(updated_actions));
862    }
863}