aivcs_core/role_orchestration/
executor.rs1use std::future::Future;
9use std::sync::Arc;
10
11use tokio::sync::Mutex;
12use tracing::{instrument, warn};
13
14use oxidized_state::storage_traits::{ContentDigest, RunId, RunLedger, RunMetadata, RunSummary};
15
16use crate::role_orchestration::{
17 error::{RoleError, RoleResult},
18 roles::{AgentRole, HandoffToken, RoleOutput},
19};
20
21#[derive(Debug, Clone)]
23pub struct RoleRunResult {
24 pub role: AgentRole,
25 pub run_id: RunId,
27 pub output: RoleOutput,
28 pub success: bool,
29}
30
/// Tuning knobs for running several roles concurrently.
#[derive(Clone, Debug)]
pub struct ParallelRoleConfig {
    /// Upper bound on how many roles may execute at the same time; should be at least 1.
    pub max_concurrent: usize,
    /// When `true`, the first failing role prevents roles that have not started yet from
    /// running. Roles that are already executing are not cancelled.
    pub fail_fast: bool,
}

impl Default for ParallelRoleConfig {
    /// Up to four concurrent roles, with fail-fast disabled.
    fn default() -> Self {
        let max_concurrent = 4;
        let fail_fast = false;
        Self {
            max_concurrent,
            fail_fast,
        }
    }
}
48
49#[instrument(skip(ledger, role_executor), fields(parent_run_id = %parent_run_id))]
60pub async fn execute_roles_parallel<F, Fut>(
61 ledger: Arc<dyn RunLedger>,
62 parent_run_id: &str,
63 roles: Vec<AgentRole>,
64 spec_digest: &ContentDigest,
65 config: ParallelRoleConfig,
66 role_executor: F,
67) -> RoleResult<Vec<RoleRunResult>>
68where
69 F: Fn(AgentRole, RunId) -> Fut + Send + Sync + 'static,
70 Fut: Future<Output = RoleResult<RoleOutput>> + Send,
71{
72 let executor = Arc::new(role_executor);
73 let results: Arc<Mutex<Vec<RoleRunResult>>> = Arc::new(Mutex::new(Vec::new()));
74 let (fail_tx, _fail_rx_guard) = tokio::sync::watch::channel(false);
75 let fail_flag = Arc::new(fail_tx);
76
77 let sem = Arc::new(tokio::sync::Semaphore::new(config.max_concurrent));
79
80 let mut tasks = Vec::new();
81
82 for role in roles {
83 let ledger = Arc::clone(&ledger);
84 let spec_digest = spec_digest.clone();
85 let executor = Arc::clone(&executor);
86 let results = Arc::clone(&results);
87 let fail_flag = Arc::clone(&fail_flag);
88 let fail_rx = fail_flag.subscribe();
89 let config = config.clone();
90 let parent_id = parent_run_id.to_string();
91 let sem = Arc::clone(&sem);
92
93 let task = tokio::spawn(async move {
94 let _permit = match sem.acquire_owned().await {
95 Ok(permit) => permit,
96 Err(e) => {
97 warn!(role = %role, error = %e, "failed to acquire semaphore permit");
98 return;
99 }
100 };
101
102 if *fail_rx.borrow() {
104 return;
105 }
106
107 let metadata = RunMetadata {
108 git_sha: None,
109 agent_name: format!("role:{role}"),
110 tags: serde_json::json!({
111 "parent_run_id": parent_id,
112 "role": role.to_string(),
113 }),
114 };
115
116 let run_id = match ledger.create_run(&spec_digest, metadata).await {
117 Ok(id) => id,
118 Err(e) => {
119 warn!(role = %role, error = %e, "failed to create run for role");
120 if config.fail_fast {
121 let _ = fail_flag.send(true);
122 }
123 results.lock().await.push(RoleRunResult {
124 role,
125 run_id: RunId::new(),
126 output: RoleOutput::Fix {
127 patch_digest: String::new(),
128 resolved_issues: vec![format!("failed to create run: {e}")],
129 },
130 success: false,
131 });
132 return;
133 }
134 };
135
136 match executor(role.clone(), run_id.clone()).await {
137 Ok(output) => {
138 let _ = ledger
139 .complete_run(
140 &run_id,
141 RunSummary {
142 total_events: 1,
143 final_state_digest: None,
144 duration_ms: 0,
145 success: true,
146 },
147 )
148 .await;
149
150 results.lock().await.push(RoleRunResult {
151 role,
152 run_id,
153 output,
154 success: true,
155 });
156 }
157 Err(e) => {
158 warn!(role = %role, error = %e, "role execution failed");
159 let _ = ledger
160 .fail_run(
161 &run_id,
162 RunSummary {
163 total_events: 0,
164 final_state_digest: None,
165 duration_ms: 0,
166 success: false,
167 },
168 )
169 .await;
170
171 if config.fail_fast {
172 let _ = fail_flag.send(true);
173 }
174
175 results.lock().await.push(RoleRunResult {
176 role,
177 run_id,
178 output: RoleOutput::Fix {
179 patch_digest: String::new(),
180 resolved_issues: vec![e.to_string()],
181 },
182 success: false,
183 });
184 }
185 }
186 });
187
188 tasks.push(task);
189 }
190
191 for task in tasks {
192 let _ = task.await;
193 }
194
195 let guard = results.lock().await;
196 let results_vec: Vec<RoleRunResult> = guard.clone();
197 drop(guard);
198
199 if !results_vec.is_empty() && results_vec.iter().all(|r| !r.success) {
200 return Err(RoleError::ParallelExecutionFailed {
201 detail: "all parallel role runs failed".to_string(),
202 });
203 }
204
205 Ok(results_vec)
206}
207
208pub fn token_from_result(result: RoleRunResult) -> RoleResult<HandoffToken> {
212 HandoffToken::new(result.output).map_err(RoleError::Domain)
213}