// forge_reasoning/verification/runner.rs
use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::Semaphore;
9use tokio::task::JoinSet;
10
11use crate::hypothesis::HypothesisBoard;
12use crate::hypothesis::types::HypothesisId;
13use crate::hypothesis::evidence::{EvidenceType, EvidenceMetadata};
14use crate::errors::Result;
15use super::check::{
16 VerificationCheck, CheckResult, CheckStatus, VerificationCommand,
17 PassAction, FailAction, CheckId,
18};
19
20pub struct VerificationRunner {
22 board: Arc<HypothesisBoard>,
23 max_concurrent: usize,
24 checks: Arc<tokio::sync::Mutex<std::collections::HashMap<CheckId, VerificationCheck>>>,
25}
26
27impl VerificationRunner {
28 pub fn new(board: Arc<HypothesisBoard>, max_concurrent: usize) -> Self {
30 Self {
31 board,
32 max_concurrent,
33 checks: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
34 }
35 }
36
37 pub async fn register_check(
39 &self,
40 name: String,
41 hypothesis_id: HypothesisId,
42 command: VerificationCommand,
43 timeout: Duration,
44 on_pass: Option<PassAction>,
45 on_fail: Option<FailAction>,
46 ) -> Result<CheckId> {
47 let check = VerificationCheck::new(
48 name,
49 hypothesis_id,
50 timeout,
51 command,
52 on_pass,
53 on_fail,
54 );
55
56 let check_id = check.id;
57 let mut checks = self.checks.lock().await;
58 checks.insert(check_id, check);
59 Ok(check_id)
60 }
61
62 pub async fn get_status(&self, check_id: CheckId) -> Option<CheckStatus> {
64 let checks = self.checks.lock().await;
65 checks.get(&check_id).map(|c| c.status.clone())
66 }
67
68 pub async fn list_checks(&self) -> Vec<(CheckId, CheckStatus)> {
70 let checks = self.checks.lock().await;
71 checks.iter().map(|(id, check)| (*id, check.status.clone())).collect()
72 }
73
74 pub async fn execute_checks(&self, check_ids: Vec<CheckId>) -> Vec<(CheckId, CheckResult)> {
76 let semaphore = Arc::new(Semaphore::new(self.max_concurrent));
77 let mut join_set = JoinSet::new();
78
79 for check_id in check_ids {
80 let semaphore = semaphore.clone();
81 let checks = self.checks.clone();
82 let board = self.board.clone();
83
84 join_set.spawn(async move {
85 let _permit = semaphore.acquire().await.unwrap();
86
87 let check = {
89 let checks_lock = checks.lock().await;
90 checks_lock.get(&check_id).cloned()
91 };
92
93 let result = match check {
94 Some(c) => {
95 let cmd = match &c.command {
97 VerificationCommand::ShellCommand(cmd) => cmd.clone(),
98 VerificationCommand::CustomAssertion { .. } => {
99 return (check_id, CheckResult::Panic {
100 message: "Custom assertions not yet implemented".to_string()
101 });
102 }
103 };
104
105 let start = std::time::Instant::now();
106 let cmd_result = tokio::process::Command::new("sh")
107 .arg("-c")
108 .arg(&cmd)
109 .output()
110 .await;
111
112 let duration = start.elapsed();
113
114 let result = match cmd_result {
115 Ok(output) => {
116 if output.status.success() {
117 CheckResult::Passed {
118 output: String::from_utf8_lossy(&output.stdout).to_string(),
119 duration,
120 }
121 } else {
122 CheckResult::Failed {
123 output: String::from_utf8_lossy(&output.stdout).to_string(),
124 error: String::from_utf8_lossy(&output.stderr).to_string(),
125 }
126 }
127 }
128 Err(e) => CheckResult::Panic {
129 message: format!("Failed to execute: {}", e),
130 }
131 };
132
133 let (strength, passed) = match &result {
135 CheckResult::Passed { .. } => (1.0, true),
136 _ => (-1.0, false),
137 };
138
139 let metadata = EvidenceMetadata::Experiment {
140 name: c.name.clone(),
141 test_command: cmd,
142 output: result.output().unwrap_or("").to_string(),
143 passed,
144 };
145
146 let _ = board.attach_evidence(
147 c.hypothesis_id,
148 EvidenceType::Experiment,
149 strength,
150 metadata,
151 ).await;
152
153 if result.is_success() {
155 if let Some(PassAction::SetStatus(status)) = &c.on_pass {
156 let _ = board.set_status(c.hypothesis_id, status.clone()).await;
157 }
158 } else {
159 if let Some(FailAction::SetStatus(status)) = &c.on_fail {
160 let _ = board.set_status(c.hypothesis_id, status.clone()).await;
161 }
162 }
163
164 result
165 }
166 None => {
167 return (check_id, CheckResult::Panic {
168 message: "Check not found".to_string()
169 });
170 }
171 };
172
173 (check_id, result)
174 });
175 }
176
177 let mut results = Vec::new();
178 while let Some(result) = join_set.join_next().await {
179 if let Ok(r) = result {
180 results.push(r);
181 }
182 }
183
184 results
185 }
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hypothesis::confidence::Confidence;

    /// The constructor stores the concurrency limit it is given.
    #[tokio::test]
    async fn test_runner_creation() {
        let board = Arc::new(HypothesisBoard::in_memory());
        let runner = VerificationRunner::new(board, 10);
        assert_eq!(runner.max_concurrent, 10);
    }

    /// Registering a check succeeds and makes it visible through `list_checks`.
    #[tokio::test]
    async fn test_register_check() {
        let board = Arc::new(HypothesisBoard::in_memory());
        let runner = VerificationRunner::new(board.clone(), 10);

        let prior = Confidence::new(0.5).unwrap();
        let h_id = board.propose("Test hypothesis", prior).await.unwrap();

        let check_id = runner
            .register_check(
                "test check".to_string(),
                h_id,
                VerificationCommand::ShellCommand("echo test".to_string()),
                Duration::from_secs(5),
                None,
                None,
            )
            .await;

        assert!(check_id.is_ok());

        let checks = runner.list_checks().await;
        assert_eq!(checks.len(), 1);
    }

    /// Drives a passing check through the execution path that attaches
    /// evidence to the board and verifies the reported outcome.
    ///
    /// The board's stored evidence is not inspected here; only the result
    /// returned by the runner is asserted.
    #[tokio::test]
    async fn test_evidence_attachment() {
        let board = Arc::new(HypothesisBoard::in_memory());
        let runner = VerificationRunner::new(board.clone(), 10);

        let prior = Confidence::new(0.5).unwrap();
        let h_id = board.propose("Test", prior).await.unwrap();

        let check_id = runner
            .register_check(
                "test".to_string(),
                h_id,
                VerificationCommand::ShellCommand("echo test".to_string()),
                Duration::from_secs(1),
                None,
                None,
            )
            .await
            .unwrap();

        let results = runner.execute_checks(vec![check_id]).await;

        assert_eq!(results.len(), 1);
        assert!(results[0].0 == check_id);
        assert!(matches!(
            &results[0].1,
            CheckResult::Passed { output, .. } if output.trim() == "test"
        ));
    }

    /// With a limit of two, three checks still all complete and are reported.
    #[tokio::test]
    async fn test_parallel_execution_respects_semaphore() {
        let board = Arc::new(HypothesisBoard::in_memory());
        let runner = VerificationRunner::new(board.clone(), 2);

        let prior = Confidence::new(0.5).unwrap();
        let h1 = board.propose("H1", prior).await.unwrap();
        let h2 = board.propose("H2", prior).await.unwrap();
        let h3 = board.propose("H3", prior).await.unwrap();

        let c1 = runner
            .register_check(
                "check1".to_string(),
                h1,
                VerificationCommand::ShellCommand("sleep 0.1".to_string()),
                Duration::from_secs(1),
                None,
                None,
            )
            .await
            .unwrap();

        let c2 = runner
            .register_check(
                "check2".to_string(),
                h2,
                VerificationCommand::ShellCommand("sleep 0.1".to_string()),
                Duration::from_secs(1),
                None,
                None,
            )
            .await
            .unwrap();

        let c3 = runner
            .register_check(
                "check3".to_string(),
                h3,
                VerificationCommand::ShellCommand("sleep 0.1".to_string()),
                Duration::from_secs(1),
                None,
                None,
            )
            .await
            .unwrap();

        let results = runner.execute_checks(vec![c1, c2, c3]).await;

        assert_eq!(results.len(), 3);
        // Results arrive in completion order, so look each id up instead of
        // relying on position.
        for expected in &[c1, c2, c3] {
            assert!(results.iter().any(|(id, _)| id == expected));
        }
    }
}