Skip to main content

aivcs_core/role_orchestration/
executor.rs

1//! State-safe parallel role execution.
2//!
3//! Each role is assigned an isolated [`RunId`] in the shared [`RunLedger`], so
4//! writes from role A can never overwrite role B's events. The caller injects
5//! a `role_executor` async closure for testability; in production this closure
6//! dispatches to the real agent runtime.
7
8use std::future::Future;
9use std::sync::Arc;
10
11use tokio::sync::Mutex;
12use tracing::{instrument, warn};
13
14use oxidized_state::storage_traits::{ContentDigest, RunId, RunLedger, RunMetadata, RunSummary};
15
16use crate::role_orchestration::{
17    error::{RoleError, RoleResult},
18    roles::{AgentRole, HandoffToken, RoleOutput},
19};
20
21/// The outcome of a single role run.
22#[derive(Debug, Clone)]
23pub struct RoleRunResult {
24    pub role: AgentRole,
25    /// The isolated `RunId` created for this role's ledger entries.
26    pub run_id: RunId,
27    pub output: RoleOutput,
28    pub success: bool,
29}
30
/// Tuning knobs for one batch of parallel role runs.
#[derive(Clone, Debug)]
pub struct ParallelRoleConfig {
    /// Upper bound on how many role tasks may run at the same time.
    pub max_concurrent: usize,
    /// When `true`, the first failing role raises a shared flag and every
    /// role task that has not started yet is skipped. Tasks that are already
    /// running are not interrupted.
    pub fail_fast: bool,
}

impl Default for ParallelRoleConfig {
    /// Four concurrent roles, and keep going after individual failures.
    fn default() -> Self {
        ParallelRoleConfig {
            fail_fast: false,
            max_concurrent: 4,
        }
    }
}
48
49/// Execute `roles` in parallel, each in an isolated ledger namespace.
50///
51/// `role_executor` is an async closure `(AgentRole, RunId) -> RoleResult<RoleOutput>`.
52/// Inject a deterministic stub in tests; wire to the real agent runtime in production.
53///
54/// Each role gets its own [`RunId`] scoped with `parent_run_id` as a tag, so
55/// cross-role ledger contamination is structurally impossible.
56///
57/// Returns `Err(RoleError::ParallelExecutionFailed)` only when *every* role fails.
58/// Individual role failures are recorded in `RoleRunResult::success = false`.
59#[instrument(skip(ledger, role_executor), fields(parent_run_id = %parent_run_id))]
60pub async fn execute_roles_parallel<F, Fut>(
61    ledger: Arc<dyn RunLedger>,
62    parent_run_id: &str,
63    roles: Vec<AgentRole>,
64    spec_digest: &ContentDigest,
65    config: ParallelRoleConfig,
66    role_executor: F,
67) -> RoleResult<Vec<RoleRunResult>>
68where
69    F: Fn(AgentRole, RunId) -> Fut + Send + Sync + 'static,
70    Fut: Future<Output = RoleResult<RoleOutput>> + Send,
71{
72    let executor = Arc::new(role_executor);
73    let results: Arc<Mutex<Vec<RoleRunResult>>> = Arc::new(Mutex::new(Vec::new()));
74    let (fail_tx, _fail_rx_guard) = tokio::sync::watch::channel(false);
75    let fail_flag = Arc::new(fail_tx);
76
77    // Semaphore enforces max_concurrent
78    let sem = Arc::new(tokio::sync::Semaphore::new(config.max_concurrent));
79
80    let mut tasks = Vec::new();
81
82    for role in roles {
83        let ledger = Arc::clone(&ledger);
84        let spec_digest = spec_digest.clone();
85        let executor = Arc::clone(&executor);
86        let results = Arc::clone(&results);
87        let fail_flag = Arc::clone(&fail_flag);
88        let fail_rx = fail_flag.subscribe();
89        let config = config.clone();
90        let parent_id = parent_run_id.to_string();
91        let sem = Arc::clone(&sem);
92
93        let task = tokio::spawn(async move {
94            let _permit = match sem.acquire_owned().await {
95                Ok(permit) => permit,
96                Err(e) => {
97                    warn!(role = %role, error = %e, "failed to acquire semaphore permit");
98                    return;
99                }
100            };
101
102            // Abort early if fail_fast was triggered by a sibling.
103            if *fail_rx.borrow() {
104                return;
105            }
106
107            let metadata = RunMetadata {
108                git_sha: None,
109                agent_name: format!("role:{role}"),
110                tags: serde_json::json!({
111                    "parent_run_id": parent_id,
112                    "role": role.to_string(),
113                }),
114            };
115
116            let run_id = match ledger.create_run(&spec_digest, metadata).await {
117                Ok(id) => id,
118                Err(e) => {
119                    warn!(role = %role, error = %e, "failed to create run for role");
120                    if config.fail_fast {
121                        let _ = fail_flag.send(true);
122                    }
123                    results.lock().await.push(RoleRunResult {
124                        role,
125                        run_id: RunId::new(),
126                        output: RoleOutput::Fix {
127                            patch_digest: String::new(),
128                            resolved_issues: vec![format!("failed to create run: {e}")],
129                        },
130                        success: false,
131                    });
132                    return;
133                }
134            };
135
136            match executor(role.clone(), run_id.clone()).await {
137                Ok(output) => {
138                    let _ = ledger
139                        .complete_run(
140                            &run_id,
141                            RunSummary {
142                                total_events: 1,
143                                final_state_digest: None,
144                                duration_ms: 0,
145                                success: true,
146                            },
147                        )
148                        .await;
149
150                    results.lock().await.push(RoleRunResult {
151                        role,
152                        run_id,
153                        output,
154                        success: true,
155                    });
156                }
157                Err(e) => {
158                    warn!(role = %role, error = %e, "role execution failed");
159                    let _ = ledger
160                        .fail_run(
161                            &run_id,
162                            RunSummary {
163                                total_events: 0,
164                                final_state_digest: None,
165                                duration_ms: 0,
166                                success: false,
167                            },
168                        )
169                        .await;
170
171                    if config.fail_fast {
172                        let _ = fail_flag.send(true);
173                    }
174
175                    results.lock().await.push(RoleRunResult {
176                        role,
177                        run_id,
178                        output: RoleOutput::Fix {
179                            patch_digest: String::new(),
180                            resolved_issues: vec![e.to_string()],
181                        },
182                        success: false,
183                    });
184                }
185            }
186        });
187
188        tasks.push(task);
189    }
190
191    for task in tasks {
192        let _ = task.await;
193    }
194
195    let guard = results.lock().await;
196    let results_vec: Vec<RoleRunResult> = guard.clone();
197    drop(guard);
198
199    if !results_vec.is_empty() && results_vec.iter().all(|r| !r.success) {
200        return Err(RoleError::ParallelExecutionFailed {
201            detail: "all parallel role runs failed".to_string(),
202        });
203    }
204
205    Ok(results_vec)
206}
207
208/// Convert a [`RoleRunResult`] into a [`HandoffToken`].
209///
210/// Returns [`RoleError::Domain`] if the output cannot be serialised.
211pub fn token_from_result(result: RoleRunResult) -> RoleResult<HandoffToken> {
212    HandoffToken::new(result.output).map_err(RoleError::Domain)
213}