1#[macro_use]
2extern crate lazy_static;
3
4#[macro_use]
5pub extern crate txtx_addon_kit as kit;
6
7pub extern crate mustache;
8
9mod constants;
10pub mod errors;
11pub mod eval;
12pub mod manifest;
13pub mod runbook;
15pub mod std;
16pub mod templates;
17pub mod types;
18pub mod validation;
19
20#[cfg(test)]
21mod tests;
22pub mod utils;
23
24use ::std::collections::BTreeMap;
25use ::std::future::Future;
26use ::std::pin::Pin;
27use ::std::thread::sleep;
28use ::std::time::Duration;
29
30use crate::runbook::flow_context::FlowContext;
31use constants::ACTION_ITEM_ENV;
32use constants::ACTION_ITEM_GENESIS;
33use constants::ACTION_ITEM_VALIDATE_BLOCK;
34use eval::run_constructs_evaluation;
35use eval::run_signers_evaluation;
36use kit::constants::ACTION_ITEM_CHECK_BALANCE;
37use runbook::get_source_context_for_diagnostic;
38use tokio::sync::broadcast::error::TryRecvError;
39use txtx_addon_kit::channel::Sender;
40use txtx_addon_kit::constants::ACTION_ITEM_CHECK_ADDRESS;
41use txtx_addon_kit::hcl::Span;
42use txtx_addon_kit::types::block_id::BlockId;
43use txtx_addon_kit::types::commands::CommandExecutionResult;
44use txtx_addon_kit::types::diagnostics::Diagnostic;
45use txtx_addon_kit::types::frontend::ActionItemRequest;
46use txtx_addon_kit::types::frontend::ActionItemRequestType;
47use txtx_addon_kit::types::frontend::ActionItemRequestUpdate;
48use txtx_addon_kit::types::frontend::ActionItemResponse;
49use txtx_addon_kit::types::frontend::ActionItemResponseType;
50use txtx_addon_kit::types::frontend::ActionItemStatus;
51use txtx_addon_kit::types::frontend::Actions;
52use txtx_addon_kit::types::frontend::Block;
53use txtx_addon_kit::types::frontend::BlockEvent;
54use txtx_addon_kit::types::frontend::ErrorPanelData;
55use txtx_addon_kit::types::frontend::InputOption;
56use txtx_addon_kit::types::frontend::NormalizedActionItemRequestUpdate;
57use txtx_addon_kit::types::frontend::Panel;
58use txtx_addon_kit::types::frontend::PickInputOptionRequest;
59use txtx_addon_kit::types::frontend::ReviewedInputResponse;
60use txtx_addon_kit::types::frontend::ValidateBlockData;
61use txtx_addon_kit::types::types::RunbookSupervisionContext;
62use txtx_addon_kit::types::ConstructDid;
63use txtx_addon_kit::uuid::Uuid;
64use types::Runbook;
65
66lazy_static! {
67 pub static ref SET_ENV_ACTION: ActionItemRequest =ActionItemRequestType::PickInputOption(PickInputOptionRequest {
69 options: vec![],
70 selected: InputOption::default(),
71 }).to_request("", ACTION_ITEM_ENV)
72 .with_meta_description("Select the environment to target")
73 .with_status(ActionItemStatus::Success(None))
74 ;
75}
76
77pub async fn start_unsupervised_runbook_runloop(
78 runbook: &mut Runbook,
79 progress_tx: &txtx_addon_kit::channel::Sender<BlockEvent>,
80) -> Result<(), Vec<Diagnostic>> {
81 runbook.supervision_context = RunbookSupervisionContext {
82 review_input_default_values: false,
83 review_input_values: false,
84 is_supervised: false,
85 };
86
87 for flow_context in runbook.flow_contexts.iter_mut() {
88 if !flow_context.is_enabled() {
89 continue;
90 }
91
92 let mut action_item_requests = BTreeMap::new();
93 let action_item_responses = BTreeMap::new();
94
95 let pass_results = run_signers_evaluation(
96 &flow_context.workspace_context,
97 &mut flow_context.execution_context,
98 &runbook.runtime_context,
99 &runbook.supervision_context,
100 &mut action_item_requests,
101 &action_item_responses,
102 &progress_tx,
103 )
104 .await;
105
106 if pass_results.actions.has_pending_actions() {
107 return Err(vec![diagnosed_error!(
108 "unsupervised executions should not be generating actions"
109 )]);
110 }
111
112 if pass_results.has_diagnostics() {
113 return Err(pass_results.with_spans_filled(&runbook.sources));
114 }
115
116 let mut uuid = Uuid::new_v4();
117 let mut background_tasks_futures = vec![];
118 let mut background_tasks_contructs_dids = vec![];
119 let mut runbook_completed = false;
120
121 loop {
122 let mut pass_results = run_constructs_evaluation(
123 &uuid,
124 &flow_context.workspace_context,
125 &mut flow_context.execution_context,
126 &mut runbook.runtime_context,
127 &runbook.supervision_context,
128 &mut action_item_requests,
129 &action_item_responses,
130 &progress_tx,
131 )
132 .await;
133
134 if pass_results.has_diagnostics() {
135 return Err(pass_results.with_spans_filled(&runbook.sources));
136 }
137
138 if !pass_results.pending_background_tasks_constructs_uuids.is_empty() {
139 background_tasks_futures.append(&mut pass_results.pending_background_tasks_futures);
140 background_tasks_contructs_dids
141 .append(&mut pass_results.pending_background_tasks_constructs_uuids);
142 }
143
144 if !pass_results.actions.has_pending_actions()
145 && background_tasks_contructs_dids.is_empty()
146 && pass_results.nodes_to_re_execute.is_empty()
147 {
148 runbook_completed = true;
149 }
150
151 if background_tasks_futures.is_empty() {
152 } else {
154 process_background_tasks(
155 None,
156 background_tasks_contructs_dids,
157 background_tasks_futures,
158 flow_context,
159 )
160 .await
161 .map_err(|mut diag| {
162 diag.span = get_source_context_for_diagnostic(&diag, &runbook.sources);
163 vec![diag]
164 })?;
165 background_tasks_futures = vec![];
166 background_tasks_contructs_dids = vec![];
167 }
168
169 uuid = Uuid::new_v4();
170 if runbook_completed {
171 break;
172 }
173 }
174 }
175
176 Ok(())
177}
178
179pub async fn start_supervised_runbook_runloop(
180 runbook: &mut Runbook,
181 block_tx: Sender<BlockEvent>,
182 mut action_item_responses_rx: tokio::sync::broadcast::Receiver<ActionItemResponse>,
183) -> Result<(), Vec<Diagnostic>> {
184 let mut intialized_flow_index: i16 = -1;
187 runbook.supervision_context = RunbookSupervisionContext {
188 review_input_default_values: true,
189 review_input_values: true,
190 is_supervised: true,
191 };
192
193 let mut flow_action_item_requests: BTreeMap<usize, BTreeMap<BlockId, ActionItemRequest>> =
198 BTreeMap::new();
199 let mut flow_action_item_responses = BTreeMap::new();
201
202 let mut background_tasks_futures = vec![];
203 let mut background_tasks_contructs_dids = vec![];
204 let mut background_tasks_handle_uuid = Uuid::new_v4();
205 let mut validated_blocks = 0;
206 let total_flows_count = runbook.flow_contexts.len();
207 let mut current_flow_index: usize = 0;
208 loop {
209 let event_opt = match action_item_responses_rx.try_recv() {
210 Ok(action) => Some(action),
211 Err(TryRecvError::Empty) | Err(TryRecvError::Lagged(_)) => None,
212 Err(TryRecvError::Closed) => return Ok(()),
213 };
214
215 if intialized_flow_index != current_flow_index as i16 {
216 intialized_flow_index = current_flow_index as i16;
217
218 flow_action_item_responses.insert(current_flow_index, BTreeMap::new());
219 flow_action_item_requests.insert(current_flow_index, BTreeMap::new());
220
221 let action_item_responses =
222 flow_action_item_responses.get_mut(¤t_flow_index).unwrap();
223 let mut action_item_requests =
224 flow_action_item_requests.get_mut(¤t_flow_index).unwrap();
225
226 let genesis_events = build_genesis_panel(
227 runbook,
228 &mut action_item_requests,
229 &action_item_responses,
230 &block_tx.clone(),
231 validated_blocks,
232 current_flow_index,
233 total_flows_count,
234 )
235 .await?;
236 for event in genesis_events {
237 let _ = block_tx.send(event).unwrap();
238 }
239 }
240 let action_item_responses =
241 flow_action_item_responses.get_mut(¤t_flow_index).unwrap();
242 let mut action_item_requests =
243 flow_action_item_requests.get_mut(¤t_flow_index).unwrap();
244
245 let Some(action_item_response) = event_opt else {
247 sleep(Duration::from_millis(50));
248 continue;
249 };
250 let ActionItemResponse { action_item_id, payload } = action_item_response.clone();
251
252 if action_item_id == SET_ENV_ACTION.id {
253 if let Err(diags) = reset_runbook_execution(
254 runbook,
255 &payload,
256 &mut action_item_requests,
257 &action_item_responses,
258 &block_tx.clone(),
259 current_flow_index,
260 total_flows_count,
261 )
262 .await
263 {
264 let _ = block_tx.send(BlockEvent::Error(Block {
265 uuid: Uuid::new_v4(),
266 visible: true,
267 panel: Panel::ErrorPanel(ErrorPanelData::from_diagnostics(&diags)),
268 }));
269 return Err(diags);
270 };
271 continue;
272 }
273
274 if let Some(action_item) = action_item_requests.get(&action_item_id) {
275 let action_item = action_item.clone();
276 if let Some(construct_did) = action_item.construct_did {
277 if let Some(responses) = action_item_responses.get_mut(&construct_did) {
278 responses.push(action_item_response);
279 } else {
280 action_item_responses.insert(construct_did, vec![action_item_response]);
281 }
282 }
283 }
284
285 match &payload {
286 ActionItemResponseType::ValidateModal => {}
287 ActionItemResponseType::ValidateBlock => {
288 let mut bg_uuid_initialized = false;
291
292 loop {
303 let start_of_loop_had_bg_tasks = !background_tasks_futures.is_empty();
304 if start_of_loop_had_bg_tasks {
306 let flow_context =
307 runbook.flow_contexts.get_mut(current_flow_index).unwrap();
308 let supervised_bg_context = if bg_uuid_initialized {
309 None
310 } else {
311 Some(SupervisedBackgroundTaskContext::new(&block_tx, &action_item_id))
312 };
313 process_background_tasks(
314 supervised_bg_context,
315 background_tasks_contructs_dids,
316 background_tasks_futures,
317 flow_context,
318 )
319 .await
320 .map_err(|mut diag| {
321 diag.span = get_source_context_for_diagnostic(&diag, &runbook.sources);
322 vec![diag]
323 })?;
324 bg_uuid_initialized = true;
325 background_tasks_futures = vec![];
326 background_tasks_contructs_dids = vec![];
327 }
328
329 let mut flow_execution_completed = false;
331 let mut map: BTreeMap<ConstructDid, _> = BTreeMap::new();
332 let flow_context = runbook.flow_contexts.get_mut(current_flow_index).unwrap();
333 let mut pass_results = run_constructs_evaluation(
334 &background_tasks_handle_uuid,
335 &flow_context.workspace_context,
336 &mut flow_context.execution_context,
337 &mut runbook.runtime_context,
338 &runbook.supervision_context,
339 &mut map,
340 &action_item_responses,
341 &block_tx.clone(),
342 )
343 .await;
344
345 if let Some(error_event) = pass_results.compile_diagnostics_to_block() {
347 let _ = block_tx.send(BlockEvent::Error(error_event));
348 return Err(pass_results.with_spans_filled(&runbook.sources));
349 }
350
351 let pass_has_pending_bg_tasks =
352 !pass_results.pending_background_tasks_constructs_uuids.is_empty();
353 let pass_has_pending_actions = pass_results.actions.has_pending_actions();
354 let pass_has_nodes_to_re_execute = !pass_results.nodes_to_re_execute.is_empty();
355
356 let additional_info = flow_context
357 .execution_context
358 .commands_execution_results
359 .values()
360 .filter_map(|result| result.runbook_complete_additional_info())
361 .collect::<Vec<_>>();
362
363 if !pass_has_pending_actions
364 && !pass_has_pending_bg_tasks
365 && !pass_has_nodes_to_re_execute
366 {
367 let flow_context =
368 runbook.flow_contexts.get_mut(current_flow_index).unwrap();
369 let grouped_actions_items =
370 flow_context.execution_context.collect_outputs_constructs_results(
371 &runbook.runtime_context.authorization_context,
372 );
373 let mut actions = Actions::new_panel("output review", "");
374 for (key, action_items) in grouped_actions_items.into_iter() {
375 actions.push_group(key.as_str(), action_items);
376 }
377 pass_results.actions.append(&mut actions);
378
379 flow_execution_completed = true;
380 } else if !pass_results.actions.store.is_empty() {
381 validated_blocks = validated_blocks + 1;
382 pass_results.actions.push_sub_group(
383 None,
384 vec![ActionItemRequestType::ValidateBlock(ValidateBlockData::new(
385 validated_blocks,
386 ))
387 .to_request("Validate", ACTION_ITEM_VALIDATE_BLOCK)],
388 );
389 }
390
391 if pass_has_pending_bg_tasks {
392 background_tasks_futures
393 .append(&mut pass_results.pending_background_tasks_futures);
394 background_tasks_contructs_dids
395 .append(&mut pass_results.pending_background_tasks_constructs_uuids);
396 }
397
398 if !pass_has_pending_bg_tasks && !start_of_loop_had_bg_tasks {
399 let update = ActionItemRequestUpdate::from_id(&action_item_id)
400 .set_status(ActionItemStatus::Success(None));
401 pass_results.actions.push_action_item_update(update);
402 for new_request in
403 pass_results.actions.get_new_action_item_requests().into_iter()
404 {
405 action_item_requests
406 .insert(new_request.id.clone(), new_request.clone());
407 }
408 let block_events = pass_results
409 .actions
410 .compile_actions_to_block_events(&action_item_requests);
411
412 for event in block_events.into_iter() {
413 let _ = block_tx.send(event);
414 }
415 }
416 if flow_execution_completed && !start_of_loop_had_bg_tasks {
417 if current_flow_index == total_flows_count - 1 {
418 let _ = block_tx.send(BlockEvent::RunbookCompleted(additional_info));
419 return Ok(());
420 } else {
421 current_flow_index += 1;
422 }
423 }
424 if !pass_has_pending_bg_tasks
425 && !start_of_loop_had_bg_tasks
426 && !pass_has_nodes_to_re_execute
427 {
428 background_tasks_handle_uuid = Uuid::new_v4();
429 break;
430 }
431 }
432 }
433 ActionItemResponseType::PickInputOption(_) => {}
434 ActionItemResponseType::ProvideInput(_) => {}
435 ActionItemResponseType::ReviewInput(ReviewedInputResponse {
436 value_checked,
437 force_execution,
438 ..
439 }) => {
440 let new_status = match value_checked {
441 true => ActionItemStatus::Success(None),
442 false => ActionItemStatus::Todo,
443 };
444 if let Some(update) = ActionItemRequestUpdate::from_id(&action_item_id)
445 .set_status(new_status)
446 .normalize(&action_item_requests)
447 {
448 let _ = block_tx.send(BlockEvent::UpdateActionItems(vec![update]));
449 }
450 if let Some(request) = action_item_requests.get(&action_item_id) {
454 if request.internal_key == ACTION_ITEM_CHECK_ADDRESS
455 || request.internal_key == ACTION_ITEM_CHECK_BALANCE
456 {
457 process_signers_action_item_response(
458 runbook,
459 &block_tx,
460 &action_item_id,
461 &mut action_item_requests,
462 &action_item_responses,
463 current_flow_index,
464 )
465 .await;
466 }
467 }
468
469 if *force_execution {
470 let running_context =
471 runbook.flow_contexts.get_mut(current_flow_index).unwrap();
472 let mut pass_results = run_constructs_evaluation(
473 &background_tasks_handle_uuid,
474 &running_context.workspace_context,
475 &mut running_context.execution_context,
476 &mut runbook.runtime_context,
477 &runbook.supervision_context,
478 &mut BTreeMap::new(),
479 &action_item_responses,
480 &block_tx.clone(),
481 )
482 .await;
483 let mut updated_actions = vec![];
484 for action in pass_results
485 .actions
486 .compile_actions_to_item_updates(&action_item_requests)
487 .into_iter()
488 {
489 updated_actions.push(action.normalize(&action_item_requests).unwrap())
490 }
491 let _ = block_tx.send(BlockEvent::UpdateActionItems(updated_actions));
492
493 if !pass_results.pending_background_tasks_constructs_uuids.is_empty() {
494 background_tasks_futures
495 .append(&mut pass_results.pending_background_tasks_futures);
496 background_tasks_contructs_dids
497 .append(&mut pass_results.pending_background_tasks_constructs_uuids);
498 }
499
500 if pass_results.has_diagnostics() {
501 pass_results.fill_diagnostic_span(&runbook.sources);
502 }
503 if let Some(error_event) = pass_results.compile_diagnostics_to_block() {
504 let _ = block_tx.send(BlockEvent::Error(error_event));
505 return Err(pass_results.with_spans_filled(&runbook.sources));
506 }
507 }
508 }
509 ActionItemResponseType::ProvidePublicKey(_response) => {
510 process_signers_action_item_response(
511 runbook,
512 &block_tx,
513 &action_item_id,
514 &mut action_item_requests,
515 &action_item_responses,
516 current_flow_index,
517 )
518 .await;
519 }
520 ActionItemResponseType::VerifyThirdPartySignature(_)
521 | ActionItemResponseType::ProvideSignedTransaction(_)
522 | ActionItemResponseType::SendTransaction(_)
523 | ActionItemResponseType::ProvideSignedMessage(_) => {
524 let Some((signing_action_construct_did, scoped_requests)) =
526 retrieve_related_action_items_requests(
527 &action_item_id,
528 &mut action_item_requests,
529 )
530 else {
531 continue;
532 };
533 let mut map: BTreeMap<ConstructDid, _> = BTreeMap::new();
534 map.insert(signing_action_construct_did, scoped_requests);
535
536 let running_context = runbook.flow_contexts.get_mut(current_flow_index).unwrap();
537 let mut pass_results = run_constructs_evaluation(
538 &background_tasks_handle_uuid,
539 &running_context.workspace_context,
540 &mut running_context.execution_context,
541 &mut runbook.runtime_context,
542 &runbook.supervision_context,
543 &mut map,
544 &action_item_responses,
545 &block_tx.clone(),
546 )
547 .await;
548
549 let mut updated_actions = vec![];
550 for action in pass_results
551 .actions
552 .compile_actions_to_item_updates(&action_item_requests)
553 .into_iter()
554 {
555 updated_actions.push(action.normalize(&action_item_requests).unwrap())
556 }
557
558 let _ = block_tx.send(BlockEvent::UpdateActionItems(updated_actions));
559
560 if !pass_results.pending_background_tasks_constructs_uuids.is_empty() {
561 background_tasks_futures
562 .append(&mut pass_results.pending_background_tasks_futures);
563 background_tasks_contructs_dids
564 .append(&mut pass_results.pending_background_tasks_constructs_uuids);
565 }
566 if pass_results.has_diagnostics() {
567 pass_results.fill_diagnostic_span(&runbook.sources);
568 }
569 if let Some(error_event) = pass_results.compile_diagnostics_to_block() {
570 let _ = block_tx.send(BlockEvent::Error(error_event));
571 return Err(pass_results.with_spans_filled(&runbook.sources));
572 }
573 }
574 };
575 }
576}
577
578pub fn register_action_items_from_actions(
579 actions: &Actions,
580 action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
581) {
582 for action in actions.get_new_action_item_requests().into_iter() {
583 action_item_requests.insert(action.id.clone(), action.clone());
584 }
585}
586
587pub fn retrieve_related_action_items_requests<'a>(
588 action_item_id: &BlockId,
589 action_item_requests: &'a mut BTreeMap<BlockId, ActionItemRequest>,
590) -> Option<(ConstructDid, Vec<&'a mut ActionItemRequest>)> {
591 let Some(signer_construct_did) =
592 action_item_requests.get(&action_item_id).and_then(|a| a.construct_did.clone())
593 else {
594 eprintln!("unable to retrieve {}", action_item_id);
595 return None;
597 };
598 let mut scoped_requests = vec![];
601 for (_, request) in action_item_requests.iter_mut() {
602 let Some(ref construct_did) = request.construct_did else {
603 continue;
604 };
605 if construct_did.eq(&signer_construct_did) {
606 scoped_requests.push(request);
607 }
608 }
609 Some((signer_construct_did, scoped_requests))
610}
611
612pub async fn reset_runbook_execution(
613 runbook: &mut Runbook,
614 payload: &ActionItemResponseType,
615 action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
616 action_item_responses: &BTreeMap<ConstructDid, Vec<ActionItemResponse>>,
617 progress_tx: &Sender<BlockEvent>,
618 current_flow_index: usize,
619 total_flows_count: usize,
620) -> Result<(), Vec<Diagnostic>> {
621 let ActionItemResponseType::PickInputOption(environment_key) = payload else {
622 unreachable!(
623 "Action item event wih environment uuid sent with invalid payload {:?}",
624 payload
625 );
626 };
627
628 let reset = runbook.update_inputs_selector(Some(environment_key.to_string()), true).await?;
629
630 if !reset {
631 unimplemented!()
632 }
633
634 let _ = progress_tx.send(BlockEvent::Clear);
635 let genesis_events = build_genesis_panel(
636 runbook,
637 action_item_requests,
638 action_item_responses,
639 &progress_tx,
640 0,
641 current_flow_index,
642 total_flows_count,
643 )
644 .await?;
645 for event in genesis_events {
646 let _ = progress_tx.send(event).unwrap();
647 }
648 Ok(())
649}
650
651pub async fn build_genesis_panel(
652 runbook: &mut Runbook,
653 action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
654 action_item_responses: &BTreeMap<ConstructDid, Vec<ActionItemResponse>>,
655 progress_tx: &Sender<BlockEvent>,
656 validated_blocks: usize,
657 current_flow_index: usize,
658 total_flows_count: usize,
659) -> Result<Vec<BlockEvent>, Vec<Diagnostic>> {
660 let mut actions = Actions::none();
661
662 let environments = runbook.get_inputs_selectors();
663 let selector = runbook.get_active_inputs_selector();
664
665 let Some(flow_context) = runbook.flow_contexts.get_mut(current_flow_index) else {
666 return Err(vec![diagnosed_error!(
667 "internal error: attempted to access a flow that does not exist"
668 )]);
669 };
670
671 if total_flows_count > 1 {
672 actions.push_begin_flow_panel(
673 current_flow_index,
674 total_flows_count,
675 &flow_context.name,
676 &flow_context.description,
677 );
678 } else {
679 }
680 actions.push_panel("runbook checklist", "");
681
682 if environments.len() > 0 {
683 let input_options: Vec<InputOption> = environments
684 .iter()
685 .map(|k| InputOption { value: k.to_string(), displayed_value: k.to_string() })
686 .collect();
687 let selected_option: InputOption = selector
688 .clone()
689 .and_then(|e| Some(InputOption { value: e.clone(), displayed_value: e.clone() }))
690 .unwrap_or({
691 let k = environments.iter().next().unwrap();
692 InputOption { value: k.clone(), displayed_value: k.clone() }
693 });
694
695 let action_request = ActionItemRequestType::PickInputOption(PickInputOptionRequest {
696 options: input_options,
697 selected: selected_option,
698 })
699 .to_request("", ACTION_ITEM_ENV)
700 .with_meta_description("Select the environment to target")
701 .with_status(ActionItemStatus::Success(None));
702
703 actions.push_sub_group(None, vec![action_request]);
704 }
705
706 let mut pass_result: eval::EvaluationPassResult = run_signers_evaluation(
707 &flow_context.workspace_context,
708 &mut flow_context.execution_context,
709 &runbook.runtime_context,
710 &runbook.supervision_context,
711 &mut BTreeMap::new(),
712 &action_item_responses,
713 &progress_tx,
714 )
715 .await;
716
717 if pass_result.has_diagnostics() {
718 return Err(pass_result.with_spans_filled(&runbook.sources));
719 }
720
721 actions.append(&mut pass_result.actions);
722
723 let validate_action =
724 ActionItemRequestType::ValidateBlock(ValidateBlockData::new(validated_blocks))
725 .to_request("start runbook", ACTION_ITEM_GENESIS);
726
727 actions.push_sub_group(None, vec![validate_action]);
728
729 register_action_items_from_actions(&actions, action_item_requests);
730
731 let panels = actions.compile_actions_to_block_events(&action_item_requests);
732 for panel in panels.iter() {
733 match panel {
734 BlockEvent::Modal(_) => {}
735 BlockEvent::Action(_) => {}
736 _ => {
737 println!("-----");
738 }
739 }
740 }
741 Ok(panels)
744}
745
746#[derive(Debug, Clone)]
747pub struct SupervisedBackgroundTaskContext {
748 block_tx: Sender<BlockEvent>,
749 action_item_id: BlockId,
750}
751impl SupervisedBackgroundTaskContext {
752 pub fn new(block_tx: &Sender<BlockEvent>, action_item_id: &BlockId) -> Self {
753 SupervisedBackgroundTaskContext {
754 block_tx: block_tx.clone(),
755 action_item_id: action_item_id.clone(),
756 }
757 }
758}
759
760pub async fn process_background_tasks(
761 supervised_context: Option<SupervisedBackgroundTaskContext>,
762 background_tasks_contructs_dids: Vec<(ConstructDid, ConstructDid)>,
763 background_tasks_futures: Vec<
764 Pin<Box<dyn Future<Output = Result<CommandExecutionResult, Diagnostic>> + Send>>,
765 >,
766 flow_context: &mut FlowContext,
767) -> Result<(), Diagnostic> {
768 if let Some(SupervisedBackgroundTaskContext { block_tx, action_item_id, .. }) =
769 supervised_context.as_ref()
770 {
771 let _ =
772 block_tx.send(BlockEvent::UpdateActionItems(vec![NormalizedActionItemRequestUpdate {
773 id: action_item_id.clone(),
774 action_status: Some(ActionItemStatus::Success(None)),
775 action_type: None,
776 }]));
777 }
778
779 let results: Vec<Result<CommandExecutionResult, Diagnostic>> =
780 txtx_addon_kit::futures::future::join_all(background_tasks_futures).await;
781 for ((nested_construct_did, construct_did), result) in
782 background_tasks_contructs_dids.into_iter().zip(results)
783 {
784 match result {
785 Ok(result) => {
786 flow_context
787 .execution_context
788 .append_commands_execution_result(&nested_construct_did, &result);
789 }
790 Err(mut diag) => {
791 let construct_id =
792 flow_context.workspace_context.expect_construct_id(&construct_did);
793 diag = diag.location(&construct_id.construct_location);
794 if let Some(command_instance) =
795 flow_context.execution_context.commands_instances.get_mut(&construct_did)
796 {
797 diag = diag.set_span_range(command_instance.block.span());
798 };
799 if let Some(SupervisedBackgroundTaskContext { block_tx, .. }) =
800 supervised_context.as_ref()
801 {
802 let _ = block_tx.send(BlockEvent::Error(Block {
803 uuid: Uuid::new_v4(),
804 visible: true,
805 panel: Panel::ErrorPanel(ErrorPanelData::from_diagnostics(&vec![
806 diag.clone()
807 ])),
808 }));
809 }
810 return Err(diag);
811 }
812 }
813 }
814
815 Ok(())
816}
817
818pub async fn process_signers_action_item_response(
819 runbook: &mut Runbook,
820 block_tx: &Sender<BlockEvent>,
821 action_item_id: &BlockId,
822 action_item_requests: &mut BTreeMap<BlockId, ActionItemRequest>,
823 action_item_responses: &BTreeMap<ConstructDid, Vec<ActionItemResponse>>,
824 current_flow_index: usize,
825) {
826 let Some((signer_construct_did, scoped_requests)) =
828 retrieve_related_action_items_requests(&action_item_id, action_item_requests)
829 else {
830 return;
831 };
832
833 let mut map = BTreeMap::new();
834 map.insert(signer_construct_did, scoped_requests);
835
836 let flow_context = runbook.flow_contexts.get_mut(current_flow_index).unwrap();
837 let mut pass_result = run_signers_evaluation(
838 &flow_context.workspace_context,
839 &mut flow_context.execution_context,
840 &mut runbook.runtime_context,
841 &runbook.supervision_context,
842 &mut map,
843 &action_item_responses,
844 &block_tx.clone(),
845 )
846 .await;
847
848 if pass_result.has_diagnostics() {
849 pass_result.fill_diagnostic_span(&runbook.sources);
850 }
851
852 if let Some(error_event) = pass_result.compile_diagnostics_to_block() {
853 let _ = block_tx.send(BlockEvent::Error(error_event));
854 } else {
855 let updated_actions = pass_result
856 .actions
857 .compile_actions_to_item_updates(&action_item_requests)
858 .into_iter()
859 .map(|u| u.normalize(&action_item_requests).unwrap())
860 .collect();
861 let _ = block_tx.send(BlockEvent::UpdateActionItems(updated_actions));
862 }
863}