Skip to main content

dk_protocol/
verify.rs

1use tokio::sync::mpsc;
2use tonic::Status;
3use uuid::Uuid;
4
5use dk_runner::executor::process::ProcessExecutor;
6use dk_runner::Runner;
7
8use crate::server::ProtocolServer;
9use crate::{VerifyRequest, VerifyStepResult};
10use crate::proto::dkod::v1::Finding as ProtoFinding;
11use crate::proto::dkod::v1::Suggestion as ProtoSuggestion;
12
13pub async fn handle_verify(
14    server: &ProtocolServer,
15    req: VerifyRequest,
16    tx: mpsc::Sender<Result<VerifyStepResult, Status>>,
17) {
18    // Validate session
19    let session = match server.validate_session(&req.session_id) {
20        Ok(s) => s,
21        Err(e) => {
22            let _ = tx.send(Err(e)).await;
23            return;
24        }
25    };
26
27    let changeset_id = match req.changeset_id.parse::<Uuid>() {
28        Ok(id) => id,
29        Err(_) => {
30            let _ = tx
31                .send(Err(Status::invalid_argument("invalid changeset_id")))
32                .await;
33            return;
34        }
35    };
36
37    // Verify changeset exists and update status to verifying
38    let engine = server.engine();
39    {
40        if let Err(e) = engine.changeset_store().get(changeset_id).await {
41            let _ = tx.send(Err(Status::not_found(e.to_string()))).await;
42            return;
43        }
44        let _ = engine
45            .changeset_store()
46            .update_status(changeset_id, "verifying")
47            .await;
48    }
49
50    // Resolve repo_id for enriched events
51    let repo_id_str = match engine.get_repo(&session.codebase).await {
52        Ok((rid, _)) => rid.to_string(),
53        Err(_) => String::new(),
54    };
55
56    // Publish verify_started event
57    server.event_bus().publish(crate::WatchEvent {
58        event_type: "changeset.verify_started".to_string(),
59        changeset_id: changeset_id.to_string(),
60        agent_id: session.agent_id.clone(),
61        affected_symbols: vec![],
62        details: String::new(),
63        session_id: req.session_id.clone(),
64        affected_files: vec![],
65        symbol_changes: vec![],
66        repo_id: repo_id_str.clone(),
67        event_id: Uuid::new_v4().to_string(),
68    });
69
70    // Create runner with process executor
71    let runner = Runner::new(
72        server.engine.clone(),
73        Box::new(ProcessExecutor::new()),
74    );
75
76    // Bridge dk-runner StepResults to gRPC VerifyStepResults
77    let (runner_tx, mut runner_rx) = tokio::sync::mpsc::channel(32);
78
79    let codebase = session.codebase.clone();
80    let grpc_tx = tx.clone();
81
82    // Spawn runner in background
83    let runner_handle = tokio::spawn(async move {
84        runner.verify(changeset_id, &codebase, runner_tx).await
85    });
86
87    // Forward results from runner to gRPC stream
88    let mut step_counter = 0i32;
89    while let Some(result) = runner_rx.recv().await {
90        step_counter += 1;
91
92        let step_status_str = result.status.as_str().to_string();
93        let step_name_str = result.step_name.clone();
94
95        let findings: Vec<ProtoFinding> = result.findings.iter().map(|f| {
96            ProtoFinding {
97                severity: f.severity.as_str().to_string(),
98                check_name: f.check_name.clone(),
99                message: f.message.clone(),
100                file_path: f.file_path.clone(),
101                line: f.line,
102                symbol: f.symbol.clone(),
103            }
104        }).collect();
105
106        let suggestions: Vec<ProtoSuggestion> = result.suggestions.iter().map(|s| {
107            ProtoSuggestion {
108                finding_index: s.finding_index as u32,
109                description: s.description.clone(),
110                file_path: s.file_path.clone(),
111                replacement: s.replacement.clone(),
112            }
113        }).collect();
114
115        let _ = grpc_tx
116            .send(Ok(VerifyStepResult {
117                step_order: step_counter,
118                step_name: result.step_name,
119                status: result.status.as_str().to_string(),
120                output: result.output,
121                required: result.required,
122                findings,
123                suggestions,
124            }))
125            .await;
126
127        // Publish verify_step event for each step
128        server.event_bus().publish(crate::WatchEvent {
129            event_type: "changeset.verify_step".to_string(),
130            changeset_id: changeset_id.to_string(),
131            agent_id: session.agent_id.clone(),
132            affected_symbols: vec![],
133            details: format!("{}:{}", step_name_str, step_status_str),
134            session_id: req.session_id.clone(),
135            affected_files: vec![],
136            symbol_changes: vec![],
137            repo_id: repo_id_str.clone(),
138            event_id: Uuid::new_v4().to_string(),
139        });
140    }
141
142    // Get final result and update changeset status
143    let final_status = match runner_handle.await {
144        Ok(Ok(passed)) => if passed { "approved" } else { "rejected" },
145        Ok(Err(e)) => {
146            tracing::error!("runner error: {e}");
147            "rejected"
148        }
149        Err(e) => {
150            tracing::error!("runner task panicked: {e}");
151            "rejected"
152        }
153    };
154
155    let _ = engine
156        .changeset_store()
157        .update_status(changeset_id, final_status)
158        .await;
159
160    // Publish verified event with final status
161    server.event_bus().publish(crate::WatchEvent {
162        event_type: "changeset.verified".to_string(),
163        changeset_id: changeset_id.to_string(),
164        agent_id: session.agent_id.clone(),
165        affected_symbols: vec![],
166        details: final_status.to_string(),
167        session_id: req.session_id.clone(),
168        affected_files: vec![],
169        symbol_changes: vec![],
170        repo_id: repo_id_str.clone(),
171        event_id: Uuid::new_v4().to_string(),
172    });
173}
174
175// ── Event type constants ────────────────────────────────────────────
176// Extracted from the handler above so they can be tested and referenced
177// by other modules without string duplication.
178
179/// Event published when verification begins.
180pub const EVENT_VERIFY_STARTED: &str = "changeset.verify_started";
181/// Event published after each verification step completes.
182pub const EVENT_VERIFY_STEP: &str = "changeset.verify_step";
183/// Event published when the entire verification pipeline finishes.
184pub const EVENT_VERIFIED: &str = "changeset.verified";
185
186#[cfg(test)]
187mod tests {
188    use super::*;
189
190    #[test]
191    fn verify_started_event_type() {
192        assert_eq!(EVENT_VERIFY_STARTED, "changeset.verify_started");
193    }
194
195    #[test]
196    fn verify_step_event_type() {
197        assert_eq!(EVENT_VERIFY_STEP, "changeset.verify_step");
198    }
199
200    #[test]
201    fn verified_event_type() {
202        assert_eq!(EVENT_VERIFIED, "changeset.verified");
203    }
204
205    #[test]
206    fn verify_event_types_use_dot_separator() {
207        for event in [EVENT_VERIFY_STARTED, EVENT_VERIFY_STEP, EVENT_VERIFIED] {
208            assert!(
209                event.contains('.'),
210                "event type '{}' should use dot separator",
211                event
212            );
213            assert!(
214                event.starts_with("changeset."),
215                "event type '{}' should start with 'changeset.'",
216                event
217            );
218        }
219    }
220
221    #[test]
222    fn verify_event_types_are_distinct() {
223        let events = [EVENT_VERIFY_STARTED, EVENT_VERIFY_STEP, EVENT_VERIFIED];
224        for i in 0..events.len() {
225            for j in (i + 1)..events.len() {
226                assert_ne!(
227                    events[i], events[j],
228                    "event types should be distinct: '{}' vs '{}'",
229                    events[i], events[j]
230                );
231            }
232        }
233    }
234}