Skip to main content

dk_protocol/
submit.rs

1use std::path::PathBuf;
2
3use tonic::{Response, Status};
4use tracing::{info, warn};
5
6use dk_engine::workspace::overlay::OverlayEntry;
7
8use crate::server::ProtocolServer;
9use crate::validation::validate_file_path;
10use crate::{ChangeType, SubmitError, SubmitRequest, SubmitResponse, SubmitStatus};
11
12/// Handle a SUBMIT RPC.
13///
14/// 1. Validates the session.
15/// 2. Resolves the repo to obtain the working directory path.
16/// 3. Applies file-level writes through the session workspace overlay.
17/// 4. Re-opens the repo and re-indexes changed files through the engine.
18/// 5. Returns ACCEPTED with a new changeset ID, or REJECTED with errors.
19pub async fn handle_submit(
20    server: &ProtocolServer,
21    req: SubmitRequest,
22) -> Result<Response<SubmitResponse>, Status> {
23    // 1. Validate session
24    let session = server.validate_session(&req.session_id)?;
25
26    // Validate all file paths before any processing
27    for change in &req.changes {
28        validate_file_path(&change.file_path)?;
29    }
30
31    let sid = req
32        .session_id
33        .parse::<uuid::Uuid>()
34        .map_err(|_| Status::invalid_argument("Invalid session ID"))?;
35    server.session_mgr().touch_session(&sid);
36
37    // 2. Resolve repo — extract work_dir, repo_id, and file-existence checks
38    //    in a single get_repo call. The `GitRepository` (gix::Repository is
39    //    !Sync) is dropped before any subsequent .await.
40    let engine = server.engine();
41
42    // Parse changeset_id from request
43    let changeset_id = req.changeset_id.parse::<uuid::Uuid>()
44        .map_err(|_| Status::invalid_argument("invalid changeset_id"))?;
45
46    // Get workspace for this session
47    let ws = engine
48        .workspace_manager()
49        .get_workspace(&sid)
50        .ok_or_else(|| Status::not_found("Workspace not found for session"))?;
51
52    let base_commit = ws.base_commit.clone();
53
54    // Single get_repo call: extract work_dir and pre-compute is_new for each file
55    let (repo_id, work_dir, file_checks) = {
56        let (repo_id, git_repo) = engine
57            .get_repo(&session.codebase)
58            .await
59            .map_err(|e| Status::internal(format!("Repo error: {e}")))?;
60
61        let work_dir = git_repo.path().to_path_buf();
62
63        let checks: Vec<(&crate::Change, bool)> = req
64            .changes
65            .iter()
66            .map(|change| {
67                let exists_in_base = git_repo
68                    .read_tree_entry(&base_commit, &change.file_path)
69                    .is_ok();
70                (change, exists_in_base)
71            })
72            .collect();
73
74        (repo_id, work_dir, checks)
75        // git_repo is dropped here
76    };
77
78    // Snapshot the existing symbols per file (file_path -> (qualified_name -> id))
79    // for files that will be changed.  After re-indexing we compare by ID so that
80    // modifications (same name, new UUID) are still detected.
81    let mut pre_submit_symbols: std::collections::HashMap<String, std::collections::HashMap<String, uuid::Uuid>> = {
82        let mut file_syms = std::collections::HashMap::new();
83        for change in &req.changes {
84            let entry: &mut std::collections::HashMap<String, uuid::Uuid> = file_syms.entry(change.file_path.clone()).or_default();
85            if let Ok(symbols) = engine.symbol_store().find_by_file(repo_id, &change.file_path).await {
86                for sym in symbols {
87                    entry.insert(sym.qualified_name, sym.id);
88                }
89            }
90        }
91        file_syms
92    };
93
94    // Snapshot overlay early so we can reuse it for both pre_submit_symbols
95    // (MCP path) and later for changeset_files, avoiding a redundant call.
96    let early_overlay_snapshot = if req.changes.is_empty() {
97        let snap = ws.overlay.list_changes();
98        // Populate pre_submit_symbols from overlay paths so symbol diffs are accurate.
99        for (path, _) in &snap {
100            let entry = pre_submit_symbols.entry(path.clone()).or_default();
101            if let Ok(symbols) = engine.symbol_store().find_by_file(repo_id, path).await {
102                for sym in symbols {
103                    entry.insert(sym.qualified_name, sym.id);
104                }
105            }
106        }
107        Some(snap)
108    } else {
109        None
110    };
111
112    let mut errors = Vec::new();
113    let mut changed_files = Vec::new();
114
115    // 3. Apply each change through the session workspace overlay.
116    for (change, exists_in_base) in &file_checks {
117        match change.r#type() {
118            ChangeType::ModifyFunction | ChangeType::ModifyType => {
119                // Target file must already exist in base or overlay
120                let in_overlay = ws.overlay.contains(&change.file_path);
121                if !exists_in_base && !in_overlay {
122                    errors.push(SubmitError {
123                        message: format!("File not found: {}", change.file_path),
124                        symbol_id: change.old_symbol_id.clone(),
125                        file_path: Some(change.file_path.clone()),
126                    });
127                    continue;
128                }
129                let is_new = !exists_in_base;
130                if let Err(e) = ws
131                    .overlay
132                    .write(&change.file_path, change.new_source.as_bytes().to_vec(), is_new)
133                    .await
134                {
135                    errors.push(SubmitError {
136                        message: format!("Write failed: {e}"),
137                        symbol_id: None,
138                        file_path: Some(change.file_path.clone()),
139                    });
140                    continue;
141                }
142                changed_files.push(PathBuf::from(&change.file_path));
143            }
144
145            ChangeType::AddFunction | ChangeType::AddType | ChangeType::AddDependency => {
146                let is_new = !exists_in_base;
147                if let Err(e) = ws
148                    .overlay
149                    .write(&change.file_path, change.new_source.as_bytes().to_vec(), is_new)
150                    .await
151                {
152                    errors.push(SubmitError {
153                        message: format!("Write failed: {e}"),
154                        symbol_id: None,
155                        file_path: Some(change.file_path.clone()),
156                    });
157                    continue;
158                }
159                changed_files.push(PathBuf::from(&change.file_path));
160            }
161
162            ChangeType::DeleteFunction => {
163                // For deletes we track the file as changed so the engine
164                // can re-index it (the function body will have been removed
165                // from the source by the agent).
166                changed_files.push(PathBuf::from(&change.file_path));
167            }
168        }
169    }
170
171    // Reuse early snapshot if available (MCP path), otherwise take it now.
172    let overlay_snapshot = early_overlay_snapshot.unwrap_or_else(|| ws.overlay.list_changes());
173
174    // Reject empty changesets — there must be at least one file modification.
175    if overlay_snapshot.is_empty() && changed_files.is_empty() && errors.is_empty() {
176        warn!(
177            session_id = %req.session_id,
178            "SUBMIT: rejected — no file changes in overlay"
179        );
180        return Ok(Response::new(SubmitResponse {
181            status: SubmitStatus::Rejected.into(),
182            changeset_id: String::new(),
183            new_version: None,
184            errors: vec![SubmitError {
185                message: "No changes to submit".to_string(),
186                symbol_id: None,
187                file_path: None,
188            }],
189            conflict_block: None,
190        }));
191    }
192
193    // Drop the workspace guard before further async work
194    drop(ws);
195
196    // Record file changes in changeset — always use the overlay as the
197    // single source of truth.  This unifies the "standard" path (changes
198    // sent inline via req.changes) and the "MCP" path (files written
199    // earlier via dk_file_write).  The overlay is session-scoped, so we
200    // only capture files belonging to this session.
201    for (path, entry) in &overlay_snapshot {
202        let (op, content) = match entry {
203            OverlayEntry::Added { content, .. } => {
204                ("add", Some(String::from_utf8_lossy(content).into_owned()))
205            }
206            OverlayEntry::Modified { content, .. } => {
207                ("modify", Some(String::from_utf8_lossy(content).into_owned()))
208            }
209            OverlayEntry::Deleted => ("delete", None),
210        };
211        engine.changeset_store()
212            .upsert_file(changeset_id, path, op, content.as_deref())
213            .await
214            .map_err(|e| Status::internal(format!("changeset file record failed: {e}")))?;
215        // Ensure MCP-path files end up in changed_files for re-indexing
216        if !changed_files.iter().any(|p| p.to_string_lossy() == *path) {
217            changed_files.push(PathBuf::from(path));
218        }
219    }
220
221    // If any change failed, reject the whole submission.
222    if !errors.is_empty() {
223        warn!(
224            session_id = %req.session_id,
225            error_count = errors.len(),
226            "SUBMIT: rejected with errors"
227        );
228        return Ok(Response::new(SubmitResponse {
229            status: SubmitStatus::Rejected.into(),
230            changeset_id: String::new(),
231            new_version: None,
232            errors,
233            conflict_block: None,
234        }));
235    }
236
237    // 4. Re-index changed files through the semantic graph.
238    //    Use `update_files_by_root` which takes a `&Path` instead of
239    //    `&GitRepository` (the latter is !Sync and cannot cross .await).
240    if let Err(e) = engine
241        .update_files_by_root(repo_id, &work_dir, &changed_files)
242        .await
243    {
244        return Ok(Response::new(SubmitResponse {
245            status: SubmitStatus::Rejected.into(),
246            changeset_id: String::new(),
247            new_version: None,
248            errors: vec![SubmitError {
249                message: format!("Re-indexing failed: {e}"),
250                symbol_id: None,
251                file_path: None,
252            }],
253            conflict_block: None,
254        }));
255    }
256
257    // Record only NEW or CHANGED symbols in the changeset.
258    // A symbol is "affected" if:
259    //   (a) its qualified_name did not exist before (new symbol), OR
260    //   (b) its qualified_name existed but its UUID changed after re-index
261    //       (the symbol was modified -- see symbols.rs ON CONFLICT ... SET id).
262    for file_path in &changed_files {
263        let rel_str = file_path.to_string_lossy().to_string();
264        let file_pre_syms = pre_submit_symbols.get(&rel_str);
265        if let Ok(new_symbols) = engine.symbol_store().find_by_file(repo_id, &rel_str).await {
266            for sym in &new_symbols {
267                // Record if the symbol is new OR its ID changed (modified).
268                let unchanged = file_pre_syms
269                    .and_then(|m| m.get(&sym.qualified_name))
270                    .is_some_and(|old_id| *old_id == sym.id);
271                if !unchanged {
272                    let _ = engine.changeset_store()
273                        .record_affected_symbol(changeset_id, sym.id, &sym.qualified_name)
274                        .await;
275                }
276            }
277        }
278    }
279
280    // Update changeset status to "submitted"
281    engine.changeset_store().update_status(changeset_id, "submitted").await
282        .map_err(|e| Status::internal(format!("changeset status update failed: {e}")))?;
283
284    // Build affected_files list from overlay (unified source of truth)
285    let affected_files: Vec<crate::FileChange> = overlay_snapshot.iter().map(|(path, entry)| {
286        let operation = match entry {
287            OverlayEntry::Added { .. } => "add",
288            OverlayEntry::Modified { .. } => "modify",
289            OverlayEntry::Deleted => "delete",
290        };
291        crate::FileChange {
292            path: path.clone(),
293            operation: operation.to_string(),
294        }
295    }).collect();
296
297    // Build symbol_changes from pre/post symbol comparison
298    let mut symbol_changes: Vec<crate::SymbolChangeDetail> = Vec::new();
299    for file_path in &changed_files {
300        let rel_str = file_path.to_string_lossy().to_string();
301        let file_pre_syms = pre_submit_symbols.get(&rel_str);
302        if let Ok(new_symbols) = engine.symbol_store().find_by_file(repo_id, &rel_str).await {
303            // Detect added/modified symbols
304            for sym in &new_symbols {
305                let change_type = match file_pre_syms.and_then(|m| m.get(&sym.qualified_name)) {
306                    Some(old_id) if *old_id == sym.id => continue, // unchanged
307                    Some(_) => "modified",
308                    None => "added",
309                };
310                symbol_changes.push(crate::SymbolChangeDetail {
311                    symbol_name: sym.qualified_name.clone(),
312                    file_path: rel_str.clone(),
313                    change_type: change_type.to_string(),
314                    kind: sym.kind.to_string(),
315                });
316            }
317            // Detect deleted symbols: only check symbols that belonged to THIS file
318            if let Some(old_syms) = file_pre_syms {
319                for name in old_syms.keys() {
320                    let still_exists = new_symbols.iter().any(|s| s.qualified_name == *name);
321                    if !still_exists {
322                        symbol_changes.push(crate::SymbolChangeDetail {
323                            symbol_name: name.clone(),
324                            file_path: rel_str.clone(),
325                            change_type: "deleted".to_string(),
326                            kind: String::new(),
327                        });
328                    }
329                }
330            }
331        }
332    }
333
334    // Publish event
335    server.event_bus().publish(crate::WatchEvent {
336        event_type: "changeset.submitted".to_string(),
337        changeset_id: changeset_id.to_string(),
338        agent_id: session.agent_id.clone(),
339        affected_symbols: vec![],
340        details: req.intent.clone(),
341        session_id: req.session_id.clone(),
342        affected_files,
343        symbol_changes,
344        repo_id: repo_id.to_string(),
345        event_id: uuid::Uuid::new_v4().to_string(),
346    });
347
348    // Read HEAD version without holding the GitRepository across awaits.
349    let new_version = {
350        let (_repo_id, git_repo) = engine
351            .get_repo(&session.codebase)
352            .await
353            .map_err(|e| Status::internal(format!("Repo error (head read): {e}")))?;
354        git_repo
355            .head_hash()
356            .ok()
357            .flatten()
358            .unwrap_or_else(|| "pending".to_string())
359    };
360
361    // 5. Return ACCEPTED with the changeset ID from the request.
362    info!(
363        session_id = %req.session_id,
364        changeset_id = %changeset_id,
365        files_changed = changed_files.len(),
366        "SUBMIT: accepted"
367    );
368
369    Ok(Response::new(SubmitResponse {
370        status: SubmitStatus::Accepted.into(),
371        changeset_id: changeset_id.to_string(),
372        new_version: Some(new_version),
373        errors: vec![],
374        conflict_block: None,
375    }))
376}