1use tokio::sync::mpsc;
2use tonic::Status;
3use uuid::Uuid;
4
5use dk_runner::executor::process::ProcessExecutor;
6use dk_runner::Runner;
7
8use crate::server::ProtocolServer;
9use crate::{VerifyRequest, VerifyStepResult};
10use crate::proto::dkod::v1::Finding as ProtoFinding;
11use crate::proto::dkod::v1::Suggestion as ProtoSuggestion;
12
13pub async fn handle_verify(
14 server: &ProtocolServer,
15 req: VerifyRequest,
16 tx: mpsc::Sender<Result<VerifyStepResult, Status>>,
17) {
18 let session = match server.validate_session(&req.session_id) {
20 Ok(s) => s,
21 Err(e) => {
22 let _ = tx.send(Err(e)).await;
23 return;
24 }
25 };
26
27 let changeset_id = match req.changeset_id.parse::<Uuid>() {
28 Ok(id) => id,
29 Err(_) => {
30 let _ = tx
31 .send(Err(Status::invalid_argument("invalid changeset_id")))
32 .await;
33 return;
34 }
35 };
36
37 let engine = server.engine();
39 {
40 if let Err(e) = engine.changeset_store().get(changeset_id).await {
41 let _ = tx.send(Err(Status::not_found(e.to_string()))).await;
42 return;
43 }
44 let _ = engine
45 .changeset_store()
46 .update_status(changeset_id, "verifying")
47 .await;
48 }
49
50 let repo_id_str = match engine.get_repo(&session.codebase).await {
52 Ok((rid, _)) => rid.to_string(),
53 Err(_) => String::new(),
54 };
55
56 server.event_bus().publish(crate::WatchEvent {
58 event_type: "changeset.verify_started".to_string(),
59 changeset_id: changeset_id.to_string(),
60 agent_id: session.agent_id.clone(),
61 affected_symbols: vec![],
62 details: String::new(),
63 session_id: req.session_id.clone(),
64 affected_files: vec![],
65 symbol_changes: vec![],
66 repo_id: repo_id_str.clone(),
67 event_id: Uuid::new_v4().to_string(),
68 });
69
70 let runner = Runner::new(
72 server.engine.clone(),
73 Box::new(ProcessExecutor::new()),
74 );
75
76 let (runner_tx, mut runner_rx) = tokio::sync::mpsc::channel(32);
78
79 let codebase = session.codebase.clone();
80 let grpc_tx = tx.clone();
81
82 let runner_handle = tokio::spawn(async move {
84 runner.verify(changeset_id, &codebase, runner_tx).await
85 });
86
87 let mut step_counter = 0i32;
89 while let Some(result) = runner_rx.recv().await {
90 step_counter += 1;
91
92 let step_status_str = result.status.as_str().to_string();
93 let step_name_str = result.step_name.clone();
94
95 let findings: Vec<ProtoFinding> = result.findings.iter().map(|f| {
96 ProtoFinding {
97 severity: f.severity.as_str().to_string(),
98 check_name: f.check_name.clone(),
99 message: f.message.clone(),
100 file_path: f.file_path.clone(),
101 line: f.line,
102 symbol: f.symbol.clone(),
103 }
104 }).collect();
105
106 let suggestions: Vec<ProtoSuggestion> = result.suggestions.iter().map(|s| {
107 ProtoSuggestion {
108 finding_index: s.finding_index as u32,
109 description: s.description.clone(),
110 file_path: s.file_path.clone(),
111 replacement: s.replacement.clone(),
112 }
113 }).collect();
114
115 let _ = grpc_tx
116 .send(Ok(VerifyStepResult {
117 step_order: step_counter,
118 step_name: result.step_name,
119 status: result.status.as_str().to_string(),
120 output: result.output,
121 required: result.required,
122 findings,
123 suggestions,
124 }))
125 .await;
126
127 server.event_bus().publish(crate::WatchEvent {
129 event_type: "changeset.verify_step".to_string(),
130 changeset_id: changeset_id.to_string(),
131 agent_id: session.agent_id.clone(),
132 affected_symbols: vec![],
133 details: format!("{}:{}", step_name_str, step_status_str),
134 session_id: req.session_id.clone(),
135 affected_files: vec![],
136 symbol_changes: vec![],
137 repo_id: repo_id_str.clone(),
138 event_id: Uuid::new_v4().to_string(),
139 });
140 }
141
142 let final_status = match runner_handle.await {
144 Ok(Ok(passed)) => if passed { "approved" } else { "rejected" },
145 Ok(Err(e)) => {
146 tracing::error!("runner error: {e}");
147 "rejected"
148 }
149 Err(e) => {
150 tracing::error!("runner task panicked: {e}");
151 "rejected"
152 }
153 };
154
155 let _ = engine
156 .changeset_store()
157 .update_status(changeset_id, final_status)
158 .await;
159
160 server.event_bus().publish(crate::WatchEvent {
162 event_type: "changeset.verified".to_string(),
163 changeset_id: changeset_id.to_string(),
164 agent_id: session.agent_id.clone(),
165 affected_symbols: vec![],
166 details: final_status.to_string(),
167 session_id: req.session_id.clone(),
168 affected_files: vec![],
169 symbol_changes: vec![],
170 repo_id: repo_id_str.clone(),
171 event_id: Uuid::new_v4().to_string(),
172 });
173}
174
175pub const EVENT_VERIFY_STARTED: &str = "changeset.verify_started";
181pub const EVENT_VERIFY_STEP: &str = "changeset.verify_step";
183pub const EVENT_VERIFIED: &str = "changeset.verified";
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Every verification event type exported by this module.
    fn all_events() -> [&'static str; 3] {
        [EVENT_VERIFY_STARTED, EVENT_VERIFY_STEP, EVENT_VERIFIED]
    }

    // The three tests below pin the exact wire value of each event type.

    #[test]
    fn verify_started_event_type() {
        assert_eq!(EVENT_VERIFY_STARTED, "changeset.verify_started");
    }

    #[test]
    fn verify_step_event_type() {
        assert_eq!(EVENT_VERIFY_STEP, "changeset.verify_step");
    }

    #[test]
    fn verified_event_type() {
        assert_eq!(EVENT_VERIFIED, "changeset.verified");
    }

    /// Each event type is namespaced under `changeset.` with a dot separator.
    #[test]
    fn verify_event_types_use_dot_separator() {
        all_events().iter().for_each(|event| {
            let has_dot = event.contains('.');
            assert!(has_dot, "event type '{}' should use dot separator", event);

            let has_prefix = event.starts_with("changeset.");
            assert!(
                has_prefix,
                "event type '{}' should start with 'changeset.'",
                event
            );
        });
    }

    /// No two event types share the same string value.
    #[test]
    fn verify_event_types_are_distinct() {
        let events = all_events();
        // Compare each entry against every entry that follows it.
        for (idx, first) in events.iter().enumerate() {
            for second in &events[idx + 1..] {
                assert_ne!(
                    first, second,
                    "event types should be distinct: '{}' vs '{}'",
                    first, second
                );
            }
        }
    }
}