Skip to main content

forge_reasoning/verification/
runner.rs

1//! Async verification runner with parallel execution
2//!
3//! This module provides VerificationRunner for executing verification checks
4//! in parallel with configurable concurrency limits and automatic evidence attachment.
5
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::Semaphore;
9use tokio::task::JoinSet;
10
11use crate::hypothesis::HypothesisBoard;
12use crate::hypothesis::types::HypothesisId;
13use crate::hypothesis::evidence::{EvidenceType, EvidenceMetadata};
14use crate::errors::Result;
15use super::check::{
16    VerificationCheck, CheckResult, CheckStatus, VerificationCommand,
17    PassAction, FailAction, CheckId,
18};
19
20/// Verification runner for async check execution
21pub struct VerificationRunner {
22    board: Arc<HypothesisBoard>,
23    max_concurrent: usize,
24    checks: Arc<tokio::sync::Mutex<std::collections::HashMap<CheckId, VerificationCheck>>>,
25}
26
27impl VerificationRunner {
28    /// Create a new verification runner
29    pub fn new(board: Arc<HypothesisBoard>, max_concurrent: usize) -> Self {
30        Self {
31            board,
32            max_concurrent,
33            checks: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
34        }
35    }
36
37    /// Register a verification check
38    pub async fn register_check(
39        &self,
40        name: String,
41        hypothesis_id: HypothesisId,
42        command: VerificationCommand,
43        timeout: Duration,
44        on_pass: Option<PassAction>,
45        on_fail: Option<FailAction>,
46    ) -> Result<CheckId> {
47        let check = VerificationCheck::new(
48            name,
49            hypothesis_id,
50            timeout,
51            command,
52            on_pass,
53            on_fail,
54        );
55
56        let check_id = check.id;
57        let mut checks = self.checks.lock().await;
58        checks.insert(check_id, check);
59        Ok(check_id)
60    }
61
62    /// Get status of a check
63    pub async fn get_status(&self, check_id: CheckId) -> Option<CheckStatus> {
64        let checks = self.checks.lock().await;
65        checks.get(&check_id).map(|c| c.status.clone())
66    }
67
68    /// List all checks
69    pub async fn list_checks(&self) -> Vec<(CheckId, CheckStatus)> {
70        let checks = self.checks.lock().await;
71        checks.iter().map(|(id, check)| (*id, check.status.clone())).collect()
72    }
73
74    /// Execute multiple checks in parallel
75    pub async fn execute_checks(&self, check_ids: Vec<CheckId>) -> Vec<(CheckId, CheckResult)> {
76        let semaphore = Arc::new(Semaphore::new(self.max_concurrent));
77        let mut join_set = JoinSet::new();
78
79        for check_id in check_ids {
80            let semaphore = semaphore.clone();
81            let checks = self.checks.clone();
82            let board = self.board.clone();
83
84            join_set.spawn(async move {
85                let _permit = semaphore.acquire().await.unwrap();
86
87                // Get check
88                let check = {
89                    let checks_lock = checks.lock().await;
90                    checks_lock.get(&check_id).cloned()
91                };
92
93                let result = match check {
94                    Some(c) => {
95                        // Execute shell command
96                        let cmd = match &c.command {
97                            VerificationCommand::ShellCommand(cmd) => cmd.clone(),
98                            VerificationCommand::CustomAssertion { .. } => {
99                                return (check_id, CheckResult::Panic {
100                                    message: "Custom assertions not yet implemented".to_string()
101                                });
102                            }
103                        };
104
105                        let start = std::time::Instant::now();
106                        let cmd_result = tokio::process::Command::new("sh")
107                            .arg("-c")
108                            .arg(&cmd)
109                            .output()
110                            .await;
111
112                        let duration = start.elapsed();
113
114                        let result = match cmd_result {
115                            Ok(output) => {
116                                if output.status.success() {
117                                    CheckResult::Passed {
118                                        output: String::from_utf8_lossy(&output.stdout).to_string(),
119                                        duration,
120                                    }
121                                } else {
122                                    CheckResult::Failed {
123                                        output: String::from_utf8_lossy(&output.stdout).to_string(),
124                                        error: String::from_utf8_lossy(&output.stderr).to_string(),
125                                    }
126                                }
127                            }
128                            Err(e) => CheckResult::Panic {
129                                message: format!("Failed to execute: {}", e),
130                            }
131                        };
132
133                        // Record evidence
134                        let (strength, passed) = match &result {
135                            CheckResult::Passed { .. } => (1.0, true),
136                            _ => (-1.0, false),
137                        };
138
139                        let metadata = EvidenceMetadata::Experiment {
140                            name: c.name.clone(),
141                            test_command: cmd,
142                            output: result.output().unwrap_or("").to_string(),
143                            passed,
144                        };
145
146                        let _ = board.attach_evidence(
147                            c.hypothesis_id,
148                            EvidenceType::Experiment,
149                            strength,
150                            metadata,
151                        ).await;
152
153                        // Execute actions
154                        if result.is_success() {
155                            if let Some(PassAction::SetStatus(status)) = &c.on_pass {
156                                let _ = board.set_status(c.hypothesis_id, status.clone()).await;
157                            }
158                        } else {
159                            if let Some(FailAction::SetStatus(status)) = &c.on_fail {
160                                let _ = board.set_status(c.hypothesis_id, status.clone()).await;
161                            }
162                        }
163
164                        result
165                    }
166                    None => {
167                        return (check_id, CheckResult::Panic {
168                            message: "Check not found".to_string()
169                        });
170                    }
171                };
172
173                (check_id, result)
174            });
175        }
176
177        let mut results = Vec::new();
178        while let Some(result) = join_set.join_next().await {
179            if let Ok(r) = result {
180                results.push(r);
181            }
182        }
183
184        results
185    }
186}
187
188#[cfg(test)]
189mod tests {
190    use super::*;
191    use crate::hypothesis::confidence::Confidence;
192    use crate::hypothesis::types::HypothesisStatus;
193
194    #[tokio::test]
195    async fn test_runner_creation() {
196        let board = Arc::new(HypothesisBoard::in_memory());
197        let runner = VerificationRunner::new(board, 10);
198        assert_eq!(runner.max_concurrent, 10);
199    }
200
201    #[tokio::test]
202    async fn test_register_check() {
203        let board = Arc::new(HypothesisBoard::in_memory());
204        let runner = VerificationRunner::new(board.clone(), 10);
205
206        let prior = Confidence::new(0.5).unwrap();
207        let h_id = board.propose("Test hypothesis", prior).await.unwrap();
208
209        let check_id = runner.register_check(
210            "test check".to_string(),
211            h_id,
212            VerificationCommand::ShellCommand("echo test".to_string()),
213            Duration::from_secs(5),
214            None,
215            None,
216        ).await;
217
218        assert!(check_id.is_ok());
219
220        // List checks should include our check
221        let checks = runner.list_checks().await;
222        assert_eq!(checks.len(), 1);
223    }
224
225    #[tokio::test]
226    async fn test_evidence_attachment() {
227        let board = Arc::new(HypothesisBoard::in_memory());
228        let runner = VerificationRunner::new(board.clone(), 10);
229
230        let prior = Confidence::new(0.5).unwrap();
231        let h_id = board.propose("Test", prior).await.unwrap();
232
233        let check = VerificationCheck::new(
234            "test".to_string(),
235            h_id,
236            Duration::from_secs(1),
237            VerificationCommand::ShellCommand("echo test".to_string()),
238            None,
239            None,
240        );
241
242        let result = CheckResult::Passed {
243            output: "test output".to_string(),
244            duration: Duration::from_millis(100),
245        };
246
247        // Evidence is automatically attached during execute_checks
248        // This is tested implicitly by that function
249    }
250
251    #[tokio::test]
252    async fn test_parallel_execution_respects_semaphore() {
253        let board = Arc::new(HypothesisBoard::in_memory());
254        let runner = VerificationRunner::new(board.clone(), 2); // max 2 concurrent
255
256        let prior = Confidence::new(0.5).unwrap();
257        let h1 = board.propose("H1", prior).await.unwrap();
258        let h2 = board.propose("H2", prior).await.unwrap();
259        let h3 = board.propose("H3", prior).await.unwrap();
260
261        // Register 3 checks
262        let c1 = runner.register_check(
263            "check1".to_string(),
264            h1,
265            VerificationCommand::ShellCommand("sleep 0.1".to_string()),
266            Duration::from_secs(1),
267            None,
268            None,
269        ).await.unwrap();
270
271        let c2 = runner.register_check(
272            "check2".to_string(),
273            h2,
274            VerificationCommand::ShellCommand("sleep 0.1".to_string()),
275            Duration::from_secs(1),
276            None,
277            None,
278        ).await.unwrap();
279
280        let c3 = runner.register_check(
281            "check3".to_string(),
282            h3,
283            VerificationCommand::ShellCommand("sleep 0.1".to_string()),
284            Duration::from_secs(1),
285            None,
286            None,
287        ).await.unwrap();
288
289        // Execute all 3
290        let results = runner.execute_checks(vec![c1, c2, c3]).await;
291        assert_eq!(results.len(), 3);
292    }
293}