1#[macro_use]
2extern crate lazy_static;
3
4#[macro_use]
5pub extern crate txtx_addon_kit as kit;
6
7pub extern crate mustache;
8
9mod constants;
10pub mod errors;
11pub mod eval;
12pub mod manifest;
13pub mod runbook;
15pub mod std;
16pub mod templates;
17pub mod types;
18
19#[cfg(test)]
20mod tests;
21pub mod utils;
22
23use ::std::collections::BTreeMap;
24use ::std::future::Future;
25use ::std::pin::Pin;
26use ::std::thread::sleep;
27use ::std::time::Duration;
28
29use crate::runbook::flow_context::FlowContext;
30use constants::ACTION_ITEM_ENV;
31use constants::ACTION_ITEM_GENESIS;
32use constants::ACTION_ITEM_VALIDATE_BLOCK;
33use eval::run_constructs_evaluation;
34use eval::run_signers_evaluation;
35use kit::constants::ACTION_ITEM_CHECK_BALANCE;
36use runbook::get_source_context_for_diagnostic;
37use tokio::sync::broadcast::error::TryRecvError;
38use txtx_addon_kit::channel::Sender;
39use txtx_addon_kit::constants::ACTION_ITEM_CHECK_ADDRESS;
40use txtx_addon_kit::hcl::Span;
41use txtx_addon_kit::types::block_id::BlockId;
42use txtx_addon_kit::types::commands::CommandExecutionResult;
43use txtx_addon_kit::types::diagnostics::Diagnostic;
44use txtx_addon_kit::types::frontend::ActionItemRequest;
45use txtx_addon_kit::types::frontend::ActionItemRequestType;
46use txtx_addon_kit::types::frontend::ActionItemRequestUpdate;
47use txtx_addon_kit::types::frontend::ActionItemResponse;
48use txtx_addon_kit::types::frontend::ActionItemResponseType;
49use txtx_addon_kit::types::frontend::ActionItemStatus;
50use txtx_addon_kit::types::frontend::Actions;
51use txtx_addon_kit::types::frontend::Block;
52use txtx_addon_kit::types::frontend::BlockEvent;
53use txtx_addon_kit::types::frontend::ErrorPanelData;
54use txtx_addon_kit::types::frontend::InputOption;
55use txtx_addon_kit::types::frontend::NormalizedActionItemRequestUpdate;
56use txtx_addon_kit::types::frontend::Panel;
57use txtx_addon_kit::types::frontend::PickInputOptionRequest;
58use txtx_addon_kit::types::frontend::ProgressBarVisibilityUpdate;
59use txtx_addon_kit::types::frontend::ReviewedInputResponse;
60use txtx_addon_kit::types::frontend::ValidateBlockData;
61use txtx_addon_kit::types::types::RunbookSupervisionContext;
62use txtx_addon_kit::types::ConstructDid;
63use txtx_addon_kit::uuid::Uuid;
64use types::Runbook;
65
66lazy_static! {
67 pub static ref SET_ENV_ACTION: ActionItemRequest = ActionItemRequest ::new(
69 &None,
70 "Select the environment to target",
71 None,
72 ActionItemStatus::Success(None),
73 ActionItemRequestType::PickInputOption(PickInputOptionRequest {
74 options: vec![],
75 selected: InputOption::default(),
76 }),
77 ACTION_ITEM_ENV,
78 );
79}
80
81pub async fn start_unsupervised_runbook_runloop(
82 runbook: &mut Runbook,
83 progress_tx: &txtx_addon_kit::channel::Sender<BlockEvent>,
84) -> Result<(), Vec<Diagnostic>> {
85 runbook.supervision_context = RunbookSupervisionContext {
86 review_input_default_values: false,
87 review_input_values: false,
88 is_supervised: false,
89 };
90
91 for flow_context in runbook.flow_contexts.iter_mut() {
92 if !flow_context.is_enabled() {
93 continue;
94 }
95
96 let mut action_item_requests = BTreeMap::new();
97 let action_item_responses = BTreeMap::new();
98
99 let pass_results = run_signers_evaluation(
100 &flow_context.workspace_context,
101 &mut flow_context.execution_context,
102 &runbook.runtime_context,
103 &runbook.supervision_context,
104 &mut action_item_requests,
105 &action_item_responses,
106 &progress_tx,
107 )
108 .await;
109
110 if pass_results.actions.has_pending_actions() {
111 return Err(vec![diagnosed_error!(
112 "unsupervised executions should not be generating actions"
113 )]);
114 }
115
116 if pass_results.has_diagnostics() {
117 return Err(pass_results.with_spans_filled(&runbook.sources));
118 }
119
120 let mut uuid = Uuid::new_v4();
121 let mut background_tasks_futures = vec![];
122 let mut background_tasks_contructs_dids = vec![];
123 let mut runbook_completed = false;
124
125 loop {
126 let mut pass_results = run_constructs_evaluation(
127 &uuid,
128 &flow_context.workspace_context,
129 &mut flow_context.execution_context,
130 &mut runbook.runtime_context,
131 &runbook.supervision_context,
132 &mut action_item_requests,
133 &action_item_responses,
134 &progress_tx,
135 )
136 .await;
137
138 if pass_results.has_diagnostics() {
139 return Err(pass_results.with_spans_filled(&runbook.sources));
140 }
141
142 if !pass_results.pending_background_tasks_constructs_uuids.is_empty() {
143 background_tasks_futures.append(&mut pass_results.pending_background_tasks_futures);
144 background_tasks_contructs_dids
145 .append(&mut pass_results.pending_background_tasks_constructs_uuids);
146 }
147
148 if !pass_results.actions.has_pending_actions()
149 && background_tasks_contructs_dids.is_empty()
150 && pass_results.nodes_to_re_execute.is_empty()
151 {
152 runbook_completed = true;
153 }
154
155 if background_tasks_futures.is_empty() {
156 } else {
158 process_background_tasks(
159 None,
160 background_tasks_contructs_dids,
161 background_tasks_futures,
162 flow_context,
163 )
164 .await
165 .map_err(|mut diag| {
166 diag.span = get_source_context_for_diagnostic(&diag, &runbook.sources);
167 vec![diag]
168 })?;
169 background_tasks_futures = vec![];
170 background_tasks_contructs_dids = vec![];
171 }
172
173 uuid = Uuid::new_v4();
174 if runbook_completed {
175 break;
176 }
177 }
178 }
179
180 Ok(())
181}
182
183pub async fn start_supervised_runbook_runloop(
184 runbook: &mut Runbook,
185 block_tx: Sender<BlockEvent>,
186 mut action_item_responses_rx: tokio::sync::broadcast::Receiver<ActionItemResponse>,
187) -> Result<(), Vec<Diagnostic>> {
188 let mut intialized_flow_index: i16 = -1;
191 runbook.supervision_context = RunbookSupervisionContext {
192 review_input_default_values: true,
193 review_input_values: true,
194 is_supervised: true,
195 };
196
197 let mut flow_action_item_requests: BTreeMap<usize, BTreeMap<BlockId, ActionItemRequest>> =
202 BTreeMap::new();
203 let mut flow_action_item_responses = BTreeMap::new();
205
206 let mut background_tasks_futures = vec![];
207 let mut background_tasks_contructs_dids = vec![];
208 let mut background_tasks_handle_uuid = Uuid::new_v4();
209 let mut validated_blocks = 0;
210 let total_flows_count = runbook.flow_contexts.len();
211 let mut current_flow_index: usize = 0;
212 loop {
213 let event_opt = match action_item_responses_rx.try_recv() {
214 Ok(action) => Some(action),
215 Err(TryRecvError::Empty) | Err(TryRecvError::Lagged(_)) => None,
216 Err(TryRecvError::Closed) => return Ok(()),
217 };
218
219 if intialized_flow_index != current_flow_index as i16 {
220 intialized_flow_index = current_flow_index as i16;
221
222 flow_action_item_responses.insert(current_flow_index, BTreeMap::new());
223 flow_action_item_requests.insert(current_flow_index, BTreeMap::new());
224
225 let action_item_responses =
226 flow_action_item_responses.get_mut(¤t_flow_index).unwrap();
227 let mut action_item_requests =
228 flow_action_item_requests.get_mut(¤t_flow_index).unwrap();
229
230 let genesis_events = build_genesis_panel(
231 runbook,
232 &mut action_item_requests,
233 &action_item_responses,
234 &block_tx.clone(),
235 validated_blocks,
236 current_flow_index,
237 total_flows_count,
238 )
239 .await?;
240 for event in genesis_events {
241 let _ = block_tx.send(event).unwrap();
242 }
243 }
244 let action_item_responses =
245 flow_action_item_responses.get_mut(¤t_flow_index).unwrap();
246 let mut action_item_requests =
247 flow_action_item_requests.get_mut(¤t_flow_index).unwrap();
248
249 let Some(action_item_response) = event_opt else {
251 sleep(Duration::from_millis(50));
252 continue;
253 };
254 let ActionItemResponse { action_item_id, payload } = action_item_response.clone();
255
256 if action_item_id == SET_ENV_ACTION.id {
257 if let Err(diags) = reset_runbook_execution(
258 runbook,
259 &payload,
260 &mut action_item_requests,
261 &action_item_responses,
262 &block_tx.clone(),
263 current_flow_index,
264 total_flows_count,
265 )
266 .await
267 {
268 let _ = block_tx.send(BlockEvent::Error(Block {
269 uuid: Uuid::new_v4(),
270 visible: true,
271 panel: Panel::ErrorPanel(ErrorPanelData::from_diagnostics(&diags)),
272 }));
273 return Err(diags);
274 };
275 continue;
276 }
277
278 if let Some(action_item) = action_item_requests.get(&action_item_id) {
279 let action_item = action_item.clone();
280 if let Some(construct_did) = action_item.construct_did {
281 if let Some(responses) = action_item_responses.get_mut(&construct_did) {
282 responses.push(action_item_response);
283 } else {
284 action_item_responses.insert(construct_did, vec![action_item_response]);
285 }
286 }
287 }
288
289 match &payload {
290 ActionItemResponseType::ValidateModal => {}
291 ActionItemResponseType::ValidateBlock => {
292 let mut bg_uuid_initialized = false;
295
296 loop {
307 let start_of_loop_had_bg_tasks = !background_tasks_futures.is_empty();
308 if start_of_loop_had_bg_tasks {
310 let flow_context =
311 runbook.flow_contexts.get_mut(current_flow_index).unwrap();
312 let supervised_bg_context = if bg_uuid_initialized {
313 None
314 } else {
315 Some(SupervisedBackgroundTaskContext::new(
316 &block_tx,
317 &background_tasks_handle_uuid,
318 &action_item_id,
319 ))
320 };
321 process_background_tasks(
322 supervised_bg_context,
323 background_tasks_contructs_dids,
324 background_tasks_futures,
325 flow_context,
326 )
327 .await
328 .map_err(|mut diag| {
329 diag.span = get_source_context_for_diagnostic(&diag, &runbook.sources);
330 vec![diag]
331 })?;
332 bg_uuid_initialized = true;
333 background_tasks_futures = vec![];
334 background_tasks_contructs_dids = vec![];
335 }
336
337 let mut flow_execution_completed = false;
339 let mut map: BTreeMap<ConstructDid, _> = BTreeMap::new();
340 let flow_context = runbook.flow_contexts.get_mut(current_flow_index).unwrap();
341 let mut pass_results = run_constructs_evaluation(
342 &background_tasks_handle_uuid,
343 &flow_context.workspace_context,
344 &mut flow_context.execution_context,
345 &mut runbook.runtime_context,
346 &runbook.supervision_context,
347 &mut map,
348 &action_item_responses,
349 &block_tx.clone(),
350 )
351 .await;
352
353 if let Some(error_event) = pass_results.compile_diagnostics_to_block() {
355 let _ = block_tx.send(BlockEvent::Error(error_event));
356 set_progress_bar_visibility(
357 &block_tx,
358 &background_tasks_handle_uuid,
359 false,
360 );
361 return Err(pass_results.with_spans_filled(&runbook.sources));
362 }
363
364 let pass_has_pending_bg_tasks =
365 !pass_results.pending_background_tasks_constructs_uuids.is_empty();
366 let pass_has_pending_actions = pass_results.actions.has_pending_actions();
367 let pass_has_nodes_to_re_execute = !pass_results.nodes_to_re_execute.is_empty();
368
369 if !pass_has_pending_actions
370 && !pass_has_pending_bg_tasks
371 && !pass_has_nodes_to_re_execute
372 {
373 let flow_context =
374 runbook.flow_contexts.get_mut(current_flow_index).unwrap();
375 let grouped_actions_items =
376 flow_context.execution_context.collect_outputs_constructs_results();
377 let mut actions = Actions::new_panel("output review", "");
378 for (key, action_items) in grouped_actions_items.into_iter() {
379 actions.push_group(key.as_str(), action_items);
380 }
381 pass_results.actions.append(&mut actions);
382
383 flow_execution_completed = true;
384 } else if !pass_results.actions.store.is_empty() {
385 validated_blocks = validated_blocks + 1;
386 pass_results.actions.push_sub_group(
387 None,
388 vec![ActionItemRequest::new(
389 &None,
390 "Validate",
391 None,
392 ActionItemStatus::Todo,
393 ActionItemRequestType::ValidateBlock(ValidateBlockData::new(
394 validated_blocks,
395 )),
396 ACTION_ITEM_VALIDATE_BLOCK,
397 )],
398 );
399 }
400
401 if pass_has_pending_bg_tasks {
402 background_tasks_futures
403 .append(&mut pass_results.pending_background_tasks_futures);
404 background_tasks_contructs_dids
405 .append(&mut pass_results.pending_background_tasks_constructs_uuids);
406 }
407
408 if !pass_has_pending_bg_tasks && !start_of_loop_had_bg_tasks {
409 let update = ActionItemRequestUpdate::from_id(&action_item_id)
410 .set_status(ActionItemStatus::Success(None));
411 pass_results.actions.push_action_item_update(update);
412 for new_request in
413 pass_results.actions.get_new_action_item_requests().into_iter()
414 {
415 action_item_requests
416 .insert(new_request.id.clone(), new_request.clone());
417 }
418 let block_events = pass_results
419 .actions
420 .compile_actions_to_block_events(&action_item_requests);
421
422 for event in block_events.into_iter() {
423 let _ = block_tx.send(event);
424 }
425 }
426 if flow_execution_completed && !start_of_loop_had_bg_tasks {
427 set_progress_bar_visibility(
428 &block_tx,
429 &background_tasks_handle_uuid,
430 false,
431 );
432 if current_flow_index == total_flows_count - 1 {
433 let _ = block_tx.send(BlockEvent::RunbookCompleted);
434 return Ok(());
435 } else {
436 current_flow_index += 1;
437 }
438 }
439 if !pass_has_pending_bg_tasks
440 && !start_of_loop_had_bg_tasks
441 && !pass_has_nodes_to_re_execute
442 {
443 set_progress_bar_visibility(
444 &block_tx,
445 &background_tasks_handle_uuid,
446 false,
447 );
448 background_tasks_handle_uuid = Uuid::new_v4();
449 break;
450 }
451 }
452 }
453 ActionItemResponseType::PickInputOption(_) => {}
454 ActionItemResponseType::ProvideInput(_) => {}
455 ActionItemResponseType::ReviewInput(ReviewedInputResponse {
456 value_checked,
457 force_execution,
458 ..
459 }) => {
460 let new_status = match value_checked {
461 true => ActionItemStatus::Success(None),
462 false => ActionItemStatus::Todo,
463 };
464 let _ = block_tx.send(BlockEvent::UpdateActionItems(vec![
465 ActionItemRequestUpdate::from_id(&action_item_id)
466 .set_status(new_status)
467 .normalize(&action_item_requests)
468 .unwrap(),
469 ]));
470 if let Some(request) = action_item_requests.get(&action_item_id) {
474 if request.internal_key == ACTION_ITEM_CHECK_ADDRESS
475 || request.internal_key == ACTION_ITEM_CHECK_BALANCE
476 {
477 process_signers_action_item_response(
478 runbook,
479 &block_tx,
480 &action_item_id,
481 &mut action_item_requests,
482 &action_item_responses,
483 current_flow_index,
484 )
485 .await;
486 }
487 }
488
489 if *force_execution {
490 let running_context =
491 runbook.flow_contexts.get_mut(current_flow_index).unwrap();
492 let mut pass_results = run_constructs_evaluation(
493 &background_tasks_handle_uuid,
494 &running_context.workspace_context,
495 &mut running_context.execution_context,
496 &mut runbook.runtime_context,
497 &runbook.supervision_context,
498 &mut BTreeMap::new(),
499 &action_item_responses,
500 &block_tx.clone(),
501 )
502 .await;
503 let mut updated_actions = vec![];
504 for action in pass_results
505 .actions
506 .compile_actions_to_item_updates(&action_item_requests)
507 .into_iter()
508 {
509 updated_actions.push(action.normalize(&action_item_requests).unwrap())
510 }
511 let _ = block_tx.send(BlockEvent::UpdateActionItems(updated_actions));
512
513 if !pass_results.pending_background_tasks_constructs_uuids.is_empty() {
514 background_tasks_futures
515 .append(&mut pass_results.pending_background_tasks_futures);
516 background_tasks_contructs_dids
517 .append(&mut pass_results.pending_background_tasks_constructs_uuids);
518 }
519
520 if pass_results.has_diagnostics() {
521 pass_results.fill_diagnostic_span(&runbook.sources);
522 }
523 if let Some(error_event) = pass_results.compile_diagnostics_to_block() {
524 let _ = block_tx.send(BlockEvent::Error(error_event));
525 }
526 }
527 }
528 ActionItemResponseType::ProvidePublicKey(_response) => {
529 process_signers_action_item_response(
530 runbook,
531 &block_tx,
532 &action_item_id,
533 &mut action_item_requests,
534 &action_item_responses,
535 current_flow_index,
536 )
537 .await;
538 }
539 ActionItemResponseType::ProvideSignedTransaction(_)
540 | ActionItemResponseType::SendTransaction(_)
541 | ActionItemResponseType::ProvideSignedMessage(_) => {
542 let Some((signing_action_construct_did, scoped_requests)) =
544 retrieve_related_action_items_requests(
545 &action_item_id,
546 &mut action_item_requests,
547 )
548 else {
549 continue;
550 };
551 let mut map: BTreeMap<ConstructDid, _> = BTreeMap::new();
552 map.insert(signing_action_construct_did, scoped_requests);
553
554 let running_context = runbook.flow_contexts.get_mut(current_flow_index).unwrap();
555 let mut pass_results = run_constructs_evaluation(
556 &background_tasks_handle_uuid,
557 &running_context.workspace_context,
558 &mut running_context.execution_context,
559 &mut runbook.runtime_context,
560 &runbook.supervision_context,
561 &mut map,
562 &action_item_responses,
563 &block_tx.clone(),
564 )
565 .await;
566
567 let mut updated_actions = vec![];
568 for action in pass_results
569 .actions
570 .compile_actions_to_item_updates(&action_item_requests)
571 .into_iter()
572 {
573 updated_actions.push(action.normalize(&action_item_requests).unwrap())
574 }
575
576 let _ = block_tx.send(BlockEvent::UpdateActionItems(updated_actions));
577
578 if !pass_results.pending_background_tasks_constructs_uuids.is_empty() {
579 background_tasks_futures
580 .append(&mut pass_results.pending_background_tasks_futures);
581 background_tasks_contructs_dids
582 .append(&mut pass_results.pending_background_tasks_constructs_uuids);
583 }
584 if pass_results.has_diagnostics() {
585 pass_results.fill_diagnostic_span(&runbook.sources);
586 }
587 if let Some(error_event) = pass_results.compile_diagnostics_to_block() {
588 let _ = block_tx.send(BlockEvent::Error(error_event));
589 }
590 }
591 };
592 }
593}
594
595pub fn register_action_items_from_actions(
596 actions: &Actions,
597 action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
598) {
599 for action in actions.get_new_action_item_requests().into_iter() {
600 action_item_requests.insert(action.id.clone(), action.clone());
601 }
602}
603
604pub fn retrieve_related_action_items_requests<'a>(
605 action_item_id: &BlockId,
606 action_item_requests: &'a mut BTreeMap<BlockId, ActionItemRequest>,
607) -> Option<(ConstructDid, Vec<&'a mut ActionItemRequest>)> {
608 let Some(signer_construct_did) =
609 action_item_requests.get(&action_item_id).and_then(|a| a.construct_did.clone())
610 else {
611 eprintln!("unable to retrieve {}", action_item_id);
612 return None;
614 };
615 let mut scoped_requests = vec![];
618 for (_, request) in action_item_requests.iter_mut() {
619 let Some(ref construct_did) = request.construct_did else {
620 continue;
621 };
622 if construct_did.eq(&signer_construct_did) {
623 scoped_requests.push(request);
624 }
625 }
626 Some((signer_construct_did, scoped_requests))
627}
628
629pub async fn reset_runbook_execution(
630 runbook: &mut Runbook,
631 payload: &ActionItemResponseType,
632 action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
633 action_item_responses: &BTreeMap<ConstructDid, Vec<ActionItemResponse>>,
634 progress_tx: &Sender<BlockEvent>,
635 current_flow_index: usize,
636 total_flows_count: usize,
637) -> Result<(), Vec<Diagnostic>> {
638 let ActionItemResponseType::PickInputOption(environment_key) = payload else {
639 unreachable!(
640 "Action item event wih environment uuid sent with invalid payload {:?}",
641 payload
642 );
643 };
644
645 let reset = runbook.update_inputs_selector(Some(environment_key.to_string()), true).await?;
646
647 if !reset {
648 unimplemented!()
649 }
650
651 let _ = progress_tx.send(BlockEvent::Clear);
652 let genesis_events = build_genesis_panel(
653 runbook,
654 action_item_requests,
655 action_item_responses,
656 &progress_tx,
657 0,
658 current_flow_index,
659 total_flows_count,
660 )
661 .await?;
662 for event in genesis_events {
663 let _ = progress_tx.send(event).unwrap();
664 }
665 Ok(())
666}
667
668pub async fn build_genesis_panel(
669 runbook: &mut Runbook,
670 action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
671 action_item_responses: &BTreeMap<ConstructDid, Vec<ActionItemResponse>>,
672 progress_tx: &Sender<BlockEvent>,
673 validated_blocks: usize,
674 current_flow_index: usize,
675 total_flows_count: usize,
676) -> Result<Vec<BlockEvent>, Vec<Diagnostic>> {
677 let mut actions = Actions::none();
678
679 let environments = runbook.get_inputs_selectors();
680 let selector = runbook.get_active_inputs_selector();
681
682 let Some(flow_context) = runbook.flow_contexts.get_mut(current_flow_index) else {
683 return Err(vec![diagnosed_error!(
684 "internal error: attempted to access a flow that does not exist"
685 )]);
686 };
687
688 if total_flows_count > 1 {
689 actions.push_begin_flow_panel(
690 current_flow_index,
691 &flow_context.name,
692 &flow_context.description,
693 );
694 } else {
695 }
696 actions.push_panel("runbook checklist", "");
697
698 if environments.len() > 0 {
699 let input_options: Vec<InputOption> = environments
700 .iter()
701 .map(|k| InputOption { value: k.to_string(), displayed_value: k.to_string() })
702 .collect();
703 let selected_option: InputOption = selector
704 .clone()
705 .and_then(|e| Some(InputOption { value: e.clone(), displayed_value: e.clone() }))
706 .unwrap_or({
707 let k = environments.iter().next().unwrap();
708 InputOption { value: k.clone(), displayed_value: k.clone() }
709 });
710 let action_request = ActionItemRequest::new(
711 &None,
712 "Select the environment to target",
713 None,
714 ActionItemStatus::Success(None),
715 ActionItemRequestType::PickInputOption(PickInputOptionRequest {
716 options: input_options,
717 selected: selected_option,
718 }),
719 ACTION_ITEM_ENV,
720 );
721 actions.push_sub_group(None, vec![action_request]);
722 }
723
724 let mut pass_result: eval::EvaluationPassResult = run_signers_evaluation(
725 &flow_context.workspace_context,
726 &mut flow_context.execution_context,
727 &runbook.runtime_context,
728 &runbook.supervision_context,
729 &mut BTreeMap::new(),
730 &action_item_responses,
731 &progress_tx,
732 )
733 .await;
734
735 if pass_result.has_diagnostics() {
736 return Err(pass_result.with_spans_filled(&runbook.sources));
737 }
738
739 actions.append(&mut pass_result.actions);
740
741 let validate_action = ActionItemRequest::new(
742 &None,
743 "start runbook".into(),
744 None,
745 ActionItemStatus::Todo,
746 ActionItemRequestType::ValidateBlock(ValidateBlockData::new(validated_blocks)),
747 ACTION_ITEM_GENESIS,
748 );
749 actions.push_sub_group(None, vec![validate_action]);
750
751 register_action_items_from_actions(&actions, action_item_requests);
752
753 let panels = actions.compile_actions_to_block_events(&action_item_requests);
754 for panel in panels.iter() {
755 match panel {
756 BlockEvent::Modal(_) => {}
757 BlockEvent::Action(_) => {}
758 _ => {
759 println!("-----");
760 }
761 }
762 }
763 Ok(panels)
766}
767
768#[derive(Debug, Clone)]
769pub struct SupervisedBackgroundTaskContext {
770 block_tx: Sender<BlockEvent>,
771 background_tasks_handle_uuid: Uuid,
772 action_item_id: BlockId,
773}
774impl SupervisedBackgroundTaskContext {
775 pub fn new(
776 block_tx: &Sender<BlockEvent>,
777 background_tasks_handle_uuid: &Uuid,
778 action_item_id: &BlockId,
779 ) -> Self {
780 SupervisedBackgroundTaskContext {
781 block_tx: block_tx.clone(),
782 background_tasks_handle_uuid: background_tasks_handle_uuid.clone(),
783 action_item_id: action_item_id.clone(),
784 }
785 }
786}
787
788pub async fn process_background_tasks(
789 supervised_context: Option<SupervisedBackgroundTaskContext>,
790 background_tasks_contructs_dids: Vec<(ConstructDid, ConstructDid)>,
791 background_tasks_futures: Vec<
792 Pin<Box<dyn Future<Output = Result<CommandExecutionResult, Diagnostic>> + Send>>,
793 >,
794 flow_context: &mut FlowContext,
795) -> Result<(), Diagnostic> {
796 if let Some(SupervisedBackgroundTaskContext {
797 block_tx,
798 background_tasks_handle_uuid,
799 action_item_id,
800 }) = supervised_context.as_ref()
801 {
802 let _ =
803 block_tx.send(BlockEvent::UpdateActionItems(vec![NormalizedActionItemRequestUpdate {
804 id: action_item_id.clone(),
805 action_status: Some(ActionItemStatus::Success(None)),
806 action_type: None,
807 }]));
808 let _ = block_tx.send(BlockEvent::ProgressBar(Block::new(
809 &background_tasks_handle_uuid,
810 Panel::ProgressBar(vec![]),
811 )));
812 }
813
814 let results: Vec<Result<CommandExecutionResult, Diagnostic>> =
815 txtx_addon_kit::futures::future::join_all(background_tasks_futures).await;
816 for ((nested_construct_did, construct_did), result) in
817 background_tasks_contructs_dids.into_iter().zip(results)
818 {
819 match result {
820 Ok(result) => {
821 flow_context
822 .execution_context
823 .append_commands_execution_result(&nested_construct_did, &result);
824 }
825 Err(mut diag) => {
826 let construct_id =
827 flow_context.workspace_context.expect_construct_id(&construct_did);
828 diag = diag.location(&construct_id.construct_location);
829 if let Some(command_instance) =
830 flow_context.execution_context.commands_instances.get_mut(&construct_did)
831 {
832 diag = diag.set_span_range(command_instance.block.span());
833 };
834 if let Some(SupervisedBackgroundTaskContext {
835 block_tx,
836 background_tasks_handle_uuid,
837 ..
838 }) = supervised_context.as_ref()
839 {
840 let _ = block_tx.send(BlockEvent::Error(Block {
841 uuid: Uuid::new_v4(),
842 visible: true,
843 panel: Panel::ErrorPanel(ErrorPanelData::from_diagnostics(&vec![
844 diag.clone()
845 ])),
846 }));
847 set_progress_bar_visibility(block_tx, background_tasks_handle_uuid, false);
848 }
849 return Err(diag);
850 }
851 }
852 }
853
854 Ok(())
855}
856
857pub fn set_progress_bar_visibility(
858 block_tx: &Sender<BlockEvent>,
859 background_tasks_handle_uuid: &Uuid,
860 visibility: bool,
861) {
862 let _ = block_tx.send(BlockEvent::UpdateProgressBarVisibility(
863 ProgressBarVisibilityUpdate::new(&background_tasks_handle_uuid, visibility),
864 ));
865}
866
867pub async fn process_signers_action_item_response(
868 runbook: &mut Runbook,
869 block_tx: &Sender<BlockEvent>,
870 action_item_id: &BlockId,
871 action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
872 action_item_responses: &BTreeMap<ConstructDid, Vec<ActionItemResponse>>,
873 current_flow_index: usize,
874) {
875 let Some((signer_construct_did, scoped_requests)) =
877 retrieve_related_action_items_requests(&action_item_id, action_item_requests)
878 else {
879 return;
880 };
881
882 let mut map = BTreeMap::new();
883 map.insert(signer_construct_did, scoped_requests);
884
885 let flow_context = runbook.flow_contexts.get_mut(current_flow_index).unwrap();
886 let mut pass_result = run_signers_evaluation(
887 &flow_context.workspace_context,
888 &mut flow_context.execution_context,
889 &mut runbook.runtime_context,
890 &runbook.supervision_context,
891 &mut map,
892 &action_item_responses,
893 &block_tx.clone(),
894 )
895 .await;
896
897 if pass_result.has_diagnostics() {
898 pass_result.fill_diagnostic_span(&runbook.sources);
899 }
900
901 if let Some(error_event) = pass_result.compile_diagnostics_to_block() {
902 let _ = block_tx.send(BlockEvent::Error(error_event));
903 } else {
904 let updated_actions = pass_result
905 .actions
906 .compile_actions_to_item_updates(&action_item_requests)
907 .into_iter()
908 .map(|u| u.normalize(&action_item_requests).unwrap())
909 .collect();
910 let _ = block_tx.send(BlockEvent::UpdateActionItems(updated_actions));
911 }
912}