Skip to main content

dk_protocol/
submit.rs

1use std::path::PathBuf;
2
3use tonic::{Response, Status};
4use tracing::{info, warn};
5
6use dk_engine::workspace::overlay::OverlayEntry;
7
8use crate::server::ProtocolServer;
9use crate::validation::validate_file_path;
10use crate::{ChangeType, SubmitError, SubmitRequest, SubmitResponse, SubmitStatus};
11
12/// Handle a SUBMIT RPC.
13///
14/// 1. Validates the session.
15/// 2. Resolves the repo to obtain the working directory path.
16/// 3. Applies file-level writes through the session workspace overlay.
17/// 4. Re-opens the repo and re-indexes changed files through the engine.
18/// 5. Returns ACCEPTED with a new changeset ID, or REJECTED with errors.
19pub async fn handle_submit(
20    server: &ProtocolServer,
21    req: SubmitRequest,
22) -> Result<Response<SubmitResponse>, Status> {
23    // 1. Validate session
24    let session = server.validate_session(&req.session_id)?;
25
26    // Validate all file paths before any processing
27    for change in &req.changes {
28        validate_file_path(&change.file_path)?;
29    }
30
31    let sid = req
32        .session_id
33        .parse::<uuid::Uuid>()
34        .map_err(|_| Status::invalid_argument("Invalid session ID"))?;
35    server.session_mgr().touch_session(&sid);
36
37    // 2. Resolve repo — extract work_dir, repo_id, and file-existence checks
38    //    in a single get_repo call. The `GitRepository` (gix::Repository is
39    //    !Sync) is dropped before any subsequent .await.
40    let engine = server.engine();
41
42    // Parse changeset_id from request
43    let changeset_id = req.changeset_id.parse::<uuid::Uuid>()
44        .map_err(|_| Status::invalid_argument("invalid changeset_id"))?;
45
46    // Get workspace for this session
47    let ws = engine
48        .workspace_manager()
49        .get_workspace(&sid)
50        .ok_or_else(|| Status::not_found("Workspace not found for session"))?;
51
52    let base_commit = ws.base_commit.clone();
53
54    // Single get_repo call: extract work_dir and pre-compute is_new for each file
55    let (repo_id, work_dir, file_checks) = {
56        let (repo_id, git_repo) = engine
57            .get_repo(&session.codebase)
58            .await
59            .map_err(|e| Status::internal(format!("Repo error: {e}")))?;
60
61        let work_dir = git_repo.path().to_path_buf();
62
63        let checks: Vec<(&crate::Change, bool)> = req
64            .changes
65            .iter()
66            .map(|change| {
67                let exists_in_base = git_repo
68                    .read_tree_entry(&base_commit, &change.file_path)
69                    .is_ok();
70                (change, exists_in_base)
71            })
72            .collect();
73
74        (repo_id, work_dir, checks)
75        // git_repo is dropped here
76    };
77
78    // Snapshot the existing symbols per file (file_path -> (qualified_name -> id))
79    // for files that will be changed.  After re-indexing we compare by ID so that
80    // modifications (same name, new UUID) are still detected.
81    let mut pre_submit_symbols: std::collections::HashMap<String, std::collections::HashMap<String, uuid::Uuid>> = {
82        let mut file_syms = std::collections::HashMap::new();
83        for change in &req.changes {
84            let entry: &mut std::collections::HashMap<String, uuid::Uuid> = file_syms.entry(change.file_path.clone()).or_default();
85            if let Ok(symbols) = engine.symbol_store().find_by_file(repo_id, &change.file_path).await {
86                for sym in symbols {
87                    entry.insert(sym.qualified_name, sym.id);
88                }
89            }
90        }
91        file_syms
92    };
93
94    // Snapshot overlay early so we can reuse it for both pre_submit_symbols
95    // (MCP path) and later for changeset_files, avoiding a redundant call.
96    let early_overlay_snapshot = if req.changes.is_empty() {
97        let snap = ws.overlay.list_changes();
98        // Populate pre_submit_symbols from overlay paths so symbol diffs are accurate.
99        for (path, _) in &snap {
100            let entry = pre_submit_symbols.entry(path.clone()).or_default();
101            if let Ok(symbols) = engine.symbol_store().find_by_file(repo_id, path).await {
102                for sym in symbols {
103                    entry.insert(sym.qualified_name, sym.id);
104                }
105            }
106        }
107        Some(snap)
108    } else {
109        None
110    };
111
112    let mut errors = Vec::new();
113    let mut changed_files = Vec::new();
114
115    // 3. Apply each change through the session workspace overlay.
116    for (change, exists_in_base) in &file_checks {
117        match change.r#type() {
118            ChangeType::ModifyFunction | ChangeType::ModifyType => {
119                // Target file must already exist in base or overlay
120                let in_overlay = ws.overlay.contains(&change.file_path);
121                if !exists_in_base && !in_overlay {
122                    errors.push(SubmitError {
123                        message: format!("File not found: {}", change.file_path),
124                        symbol_id: change.old_symbol_id.clone(),
125                        file_path: Some(change.file_path.clone()),
126                    });
127                    continue;
128                }
129                let is_new = !exists_in_base;
130                if let Err(e) = ws
131                    .overlay
132                    .write(&change.file_path, change.new_source.as_bytes().to_vec(), is_new)
133                    .await
134                {
135                    errors.push(SubmitError {
136                        message: format!("Write failed: {e}"),
137                        symbol_id: None,
138                        file_path: Some(change.file_path.clone()),
139                    });
140                    continue;
141                }
142                changed_files.push(PathBuf::from(&change.file_path));
143            }
144
145            ChangeType::AddFunction | ChangeType::AddType | ChangeType::AddDependency => {
146                let is_new = !exists_in_base;
147                if let Err(e) = ws
148                    .overlay
149                    .write(&change.file_path, change.new_source.as_bytes().to_vec(), is_new)
150                    .await
151                {
152                    errors.push(SubmitError {
153                        message: format!("Write failed: {e}"),
154                        symbol_id: None,
155                        file_path: Some(change.file_path.clone()),
156                    });
157                    continue;
158                }
159                changed_files.push(PathBuf::from(&change.file_path));
160            }
161
162            ChangeType::DeleteFunction => {
163                // For deletes we track the file as changed so the engine
164                // can re-index it (the function body will have been removed
165                // from the source by the agent).
166                changed_files.push(PathBuf::from(&change.file_path));
167            }
168        }
169    }
170
171    // Reuse early snapshot if available (MCP path), otherwise take it now.
172    let overlay_snapshot = early_overlay_snapshot.unwrap_or_else(|| ws.overlay.list_changes());
173
174    // Reject empty changesets — there must be at least one file modification.
175    if overlay_snapshot.is_empty() && changed_files.is_empty() && errors.is_empty() {
176        warn!(
177            session_id = %req.session_id,
178            "SUBMIT: rejected — no file changes in overlay"
179        );
180        return Ok(Response::new(SubmitResponse {
181            status: SubmitStatus::Rejected.into(),
182            changeset_id: String::new(),
183            new_version: None,
184            errors: vec![SubmitError {
185                message: "No changes to submit".to_string(),
186                symbol_id: None,
187                file_path: None,
188            }],
189            conflict_block: None,
190            review_summary: None,
191        }));
192    }
193
194    // Drop the workspace guard before further async work
195    drop(ws);
196
197    // Record file changes in changeset — always use the overlay as the
198    // single source of truth.  This unifies the "standard" path (changes
199    // sent inline via req.changes) and the "MCP" path (files written
200    // earlier via dk_file_write).  The overlay is session-scoped, so we
201    // only capture files belonging to this session.
202    for (path, entry) in &overlay_snapshot {
203        let (op, content) = match entry {
204            OverlayEntry::Added { content, .. } => {
205                ("add", Some(String::from_utf8_lossy(content).into_owned()))
206            }
207            OverlayEntry::Modified { content, .. } => {
208                ("modify", Some(String::from_utf8_lossy(content).into_owned()))
209            }
210            OverlayEntry::Deleted => ("delete", None),
211        };
212        engine.changeset_store()
213            .upsert_file(changeset_id, path, op, content.as_deref())
214            .await
215            .map_err(|e| Status::internal(format!("changeset file record failed: {e}")))?;
216        // Ensure MCP-path files end up in changed_files for re-indexing
217        if !changed_files.iter().any(|p| p.to_string_lossy() == *path) {
218            changed_files.push(PathBuf::from(path));
219        }
220    }
221
222    // If any change failed, reject the whole submission.
223    if !errors.is_empty() {
224        warn!(
225            session_id = %req.session_id,
226            error_count = errors.len(),
227            "SUBMIT: rejected with errors"
228        );
229        return Ok(Response::new(SubmitResponse {
230            status: SubmitStatus::Rejected.into(),
231            changeset_id: String::new(),
232            new_version: None,
233            errors,
234            conflict_block: None,
235            review_summary: None,
236        }));
237    }
238
239    // 4. Re-index changed files through the semantic graph.
240    //    Use `update_files_by_root` which takes a `&Path` instead of
241    //    `&GitRepository` (the latter is !Sync and cannot cross .await).
242    if let Err(e) = engine
243        .update_files_by_root(repo_id, &work_dir, &changed_files)
244        .await
245    {
246        return Ok(Response::new(SubmitResponse {
247            status: SubmitStatus::Rejected.into(),
248            changeset_id: String::new(),
249            new_version: None,
250            errors: vec![SubmitError {
251                message: format!("Re-indexing failed: {e}"),
252                symbol_id: None,
253                file_path: None,
254            }],
255            conflict_block: None,
256            review_summary: None,
257        }));
258    }
259
260    // Record only NEW or CHANGED symbols in the changeset.
261    // A symbol is "affected" if:
262    //   (a) its qualified_name did not exist before (new symbol), OR
263    //   (b) its qualified_name existed but its UUID changed after re-index
264    //       (the symbol was modified -- see symbols.rs ON CONFLICT ... SET id).
265    for file_path in &changed_files {
266        let rel_str = file_path.to_string_lossy().to_string();
267        let file_pre_syms = pre_submit_symbols.get(&rel_str);
268        if let Ok(new_symbols) = engine.symbol_store().find_by_file(repo_id, &rel_str).await {
269            for sym in &new_symbols {
270                // Record if the symbol is new OR its ID changed (modified).
271                let unchanged = file_pre_syms
272                    .and_then(|m| m.get(&sym.qualified_name))
273                    .is_some_and(|old_id| *old_id == sym.id);
274                if !unchanged {
275                    let _ = engine.changeset_store()
276                        .record_affected_symbol(changeset_id, sym.id, &sym.qualified_name)
277                        .await;
278                }
279            }
280        }
281    }
282
283    // Update changeset status to "submitted"
284    engine.changeset_store().update_status(changeset_id, "submitted").await
285        .map_err(|e| Status::internal(format!("changeset status update failed: {e}")))?;
286
287    // Build affected_files list from overlay (unified source of truth)
288    let affected_files: Vec<crate::FileChange> = overlay_snapshot.iter().map(|(path, entry)| {
289        let operation = match entry {
290            OverlayEntry::Added { .. } => "add",
291            OverlayEntry::Modified { .. } => "modify",
292            OverlayEntry::Deleted => "delete",
293        };
294        crate::FileChange {
295            path: path.clone(),
296            operation: operation.to_string(),
297        }
298    }).collect();
299
300    // Build symbol_changes from pre/post symbol comparison
301    let mut symbol_changes: Vec<crate::SymbolChangeDetail> = Vec::new();
302    for file_path in &changed_files {
303        let rel_str = file_path.to_string_lossy().to_string();
304        let file_pre_syms = pre_submit_symbols.get(&rel_str);
305        if let Ok(new_symbols) = engine.symbol_store().find_by_file(repo_id, &rel_str).await {
306            // Detect added/modified symbols
307            for sym in &new_symbols {
308                let change_type = match file_pre_syms.and_then(|m| m.get(&sym.qualified_name)) {
309                    Some(old_id) if *old_id == sym.id => continue, // unchanged
310                    Some(_) => "modified",
311                    None => "added",
312                };
313                symbol_changes.push(crate::SymbolChangeDetail {
314                    symbol_name: sym.qualified_name.clone(),
315                    file_path: rel_str.clone(),
316                    change_type: change_type.to_string(),
317                    kind: sym.kind.to_string(),
318                });
319            }
320            // Detect deleted symbols: only check symbols that belonged to THIS file
321            if let Some(old_syms) = file_pre_syms {
322                for name in old_syms.keys() {
323                    let still_exists = new_symbols.iter().any(|s| s.qualified_name == *name);
324                    if !still_exists {
325                        symbol_changes.push(crate::SymbolChangeDetail {
326                            symbol_name: name.clone(),
327                            file_path: rel_str.clone(),
328                            change_type: "deleted".to_string(),
329                            kind: String::new(),
330                        });
331                    }
332                }
333            }
334        }
335    }
336
337    // Publish event
338    server.event_bus().publish(crate::WatchEvent {
339        event_type: "changeset.submitted".to_string(),
340        changeset_id: changeset_id.to_string(),
341        agent_id: session.agent_id.clone(),
342        affected_symbols: vec![],
343        details: req.intent.clone(),
344        session_id: req.session_id.clone(),
345        affected_files,
346        symbol_changes,
347        repo_id: repo_id.to_string(),
348        event_id: uuid::Uuid::new_v4().to_string(),
349    });
350
351    // Read HEAD version without holding the GitRepository across awaits.
352    let new_version = {
353        let (_repo_id, git_repo) = engine
354            .get_repo(&session.codebase)
355            .await
356            .map_err(|e| Status::internal(format!("Repo error (head read): {e}")))?;
357        git_repo
358            .head_hash()
359            .ok()
360            .flatten()
361            .unwrap_or_else(|| "pending".to_string())
362    };
363
364    // 5. Return ACCEPTED with the changeset ID from the request.
365    info!(
366        session_id = %req.session_id,
367        changeset_id = %changeset_id,
368        files_changed = changed_files.len(),
369        "SUBMIT: accepted"
370    );
371
372    Ok(Response::new(SubmitResponse {
373        status: SubmitStatus::Accepted.into(),
374        changeset_id: changeset_id.to_string(),
375        new_version: Some(new_version),
376        errors: vec![],
377        conflict_block: None,
378        review_summary: None,
379    }))
380}