1use std::path::PathBuf;
2
3use tonic::{Response, Status};
4use tracing::{info, warn};
5
6use dk_engine::workspace::overlay::OverlayEntry;
7
8use crate::server::ProtocolServer;
9use crate::validation::validate_file_path;
10use crate::{ChangeType, SubmitError, SubmitRequest, SubmitResponse, SubmitStatus};
11
12pub async fn handle_submit(
20 server: &ProtocolServer,
21 req: SubmitRequest,
22) -> Result<Response<SubmitResponse>, Status> {
23 let session = server.validate_session(&req.session_id)?;
25
26 for change in &req.changes {
28 validate_file_path(&change.file_path)?;
29 }
30
31 let sid = req
32 .session_id
33 .parse::<uuid::Uuid>()
34 .map_err(|_| Status::invalid_argument("Invalid session ID"))?;
35 server.session_mgr().touch_session(&sid);
36
37 let engine = server.engine();
41
42 let changeset_id = req.changeset_id.parse::<uuid::Uuid>()
44 .map_err(|_| Status::invalid_argument("invalid changeset_id"))?;
45
46 let ws = engine
48 .workspace_manager()
49 .get_workspace(&sid)
50 .ok_or_else(|| Status::not_found("Workspace not found for session"))?;
51
52 let base_commit = ws.base_commit.clone();
53
54 let (repo_id, work_dir, file_checks) = {
56 let (repo_id, git_repo) = engine
57 .get_repo(&session.codebase)
58 .await
59 .map_err(|e| Status::internal(format!("Repo error: {e}")))?;
60
61 let work_dir = git_repo.path().to_path_buf();
62
63 let checks: Vec<(&crate::Change, bool)> = req
64 .changes
65 .iter()
66 .map(|change| {
67 let exists_in_base = git_repo
68 .read_tree_entry(&base_commit, &change.file_path)
69 .is_ok();
70 (change, exists_in_base)
71 })
72 .collect();
73
74 (repo_id, work_dir, checks)
75 };
77
78 let mut pre_submit_symbols: std::collections::HashMap<String, std::collections::HashMap<String, uuid::Uuid>> = {
82 let mut file_syms = std::collections::HashMap::new();
83 for change in &req.changes {
84 let entry: &mut std::collections::HashMap<String, uuid::Uuid> = file_syms.entry(change.file_path.clone()).or_default();
85 if let Ok(symbols) = engine.symbol_store().find_by_file(repo_id, &change.file_path).await {
86 for sym in symbols {
87 entry.insert(sym.qualified_name, sym.id);
88 }
89 }
90 }
91 file_syms
92 };
93
94 let early_overlay_snapshot = if req.changes.is_empty() {
97 let snap = ws.overlay.list_changes();
98 for (path, _) in &snap {
100 let entry = pre_submit_symbols.entry(path.clone()).or_default();
101 if let Ok(symbols) = engine.symbol_store().find_by_file(repo_id, path).await {
102 for sym in symbols {
103 entry.insert(sym.qualified_name, sym.id);
104 }
105 }
106 }
107 Some(snap)
108 } else {
109 None
110 };
111
112 let mut errors = Vec::new();
113 let mut changed_files = Vec::new();
114
115 for (change, exists_in_base) in &file_checks {
117 match change.r#type() {
118 ChangeType::ModifyFunction | ChangeType::ModifyType => {
119 let in_overlay = ws.overlay.contains(&change.file_path);
121 if !exists_in_base && !in_overlay {
122 errors.push(SubmitError {
123 message: format!("File not found: {}", change.file_path),
124 symbol_id: change.old_symbol_id.clone(),
125 file_path: Some(change.file_path.clone()),
126 });
127 continue;
128 }
129 let is_new = !exists_in_base;
130 if let Err(e) = ws
131 .overlay
132 .write(&change.file_path, change.new_source.as_bytes().to_vec(), is_new)
133 .await
134 {
135 errors.push(SubmitError {
136 message: format!("Write failed: {e}"),
137 symbol_id: None,
138 file_path: Some(change.file_path.clone()),
139 });
140 continue;
141 }
142 changed_files.push(PathBuf::from(&change.file_path));
143 }
144
145 ChangeType::AddFunction | ChangeType::AddType | ChangeType::AddDependency => {
146 let is_new = !exists_in_base;
147 if let Err(e) = ws
148 .overlay
149 .write(&change.file_path, change.new_source.as_bytes().to_vec(), is_new)
150 .await
151 {
152 errors.push(SubmitError {
153 message: format!("Write failed: {e}"),
154 symbol_id: None,
155 file_path: Some(change.file_path.clone()),
156 });
157 continue;
158 }
159 changed_files.push(PathBuf::from(&change.file_path));
160 }
161
162 ChangeType::DeleteFunction => {
163 changed_files.push(PathBuf::from(&change.file_path));
167 }
168 }
169 }
170
171 let overlay_snapshot = early_overlay_snapshot.unwrap_or_else(|| ws.overlay.list_changes());
173
174 if overlay_snapshot.is_empty() && changed_files.is_empty() && errors.is_empty() {
176 warn!(
177 session_id = %req.session_id,
178 "SUBMIT: rejected — no file changes in overlay"
179 );
180 return Ok(Response::new(SubmitResponse {
181 status: SubmitStatus::Rejected.into(),
182 changeset_id: String::new(),
183 new_version: None,
184 errors: vec![SubmitError {
185 message: "No changes to submit".to_string(),
186 symbol_id: None,
187 file_path: None,
188 }],
189 conflict_block: None,
190 }));
191 }
192
193 drop(ws);
195
196 for (path, entry) in &overlay_snapshot {
202 let (op, content) = match entry {
203 OverlayEntry::Added { content, .. } => {
204 ("add", Some(String::from_utf8_lossy(content).into_owned()))
205 }
206 OverlayEntry::Modified { content, .. } => {
207 ("modify", Some(String::from_utf8_lossy(content).into_owned()))
208 }
209 OverlayEntry::Deleted => ("delete", None),
210 };
211 engine.changeset_store()
212 .upsert_file(changeset_id, path, op, content.as_deref())
213 .await
214 .map_err(|e| Status::internal(format!("changeset file record failed: {e}")))?;
215 if !changed_files.iter().any(|p| p.to_string_lossy() == *path) {
217 changed_files.push(PathBuf::from(path));
218 }
219 }
220
221 if !errors.is_empty() {
223 warn!(
224 session_id = %req.session_id,
225 error_count = errors.len(),
226 "SUBMIT: rejected with errors"
227 );
228 return Ok(Response::new(SubmitResponse {
229 status: SubmitStatus::Rejected.into(),
230 changeset_id: String::new(),
231 new_version: None,
232 errors,
233 conflict_block: None,
234 }));
235 }
236
237 if let Err(e) = engine
241 .update_files_by_root(repo_id, &work_dir, &changed_files)
242 .await
243 {
244 return Ok(Response::new(SubmitResponse {
245 status: SubmitStatus::Rejected.into(),
246 changeset_id: String::new(),
247 new_version: None,
248 errors: vec![SubmitError {
249 message: format!("Re-indexing failed: {e}"),
250 symbol_id: None,
251 file_path: None,
252 }],
253 conflict_block: None,
254 }));
255 }
256
257 for file_path in &changed_files {
263 let rel_str = file_path.to_string_lossy().to_string();
264 let file_pre_syms = pre_submit_symbols.get(&rel_str);
265 if let Ok(new_symbols) = engine.symbol_store().find_by_file(repo_id, &rel_str).await {
266 for sym in &new_symbols {
267 let unchanged = file_pre_syms
269 .and_then(|m| m.get(&sym.qualified_name))
270 .is_some_and(|old_id| *old_id == sym.id);
271 if !unchanged {
272 let _ = engine.changeset_store()
273 .record_affected_symbol(changeset_id, sym.id, &sym.qualified_name)
274 .await;
275 }
276 }
277 }
278 }
279
280 engine.changeset_store().update_status(changeset_id, "submitted").await
282 .map_err(|e| Status::internal(format!("changeset status update failed: {e}")))?;
283
284 let affected_files: Vec<crate::FileChange> = overlay_snapshot.iter().map(|(path, entry)| {
286 let operation = match entry {
287 OverlayEntry::Added { .. } => "add",
288 OverlayEntry::Modified { .. } => "modify",
289 OverlayEntry::Deleted => "delete",
290 };
291 crate::FileChange {
292 path: path.clone(),
293 operation: operation.to_string(),
294 }
295 }).collect();
296
297 let mut symbol_changes: Vec<crate::SymbolChangeDetail> = Vec::new();
299 for file_path in &changed_files {
300 let rel_str = file_path.to_string_lossy().to_string();
301 let file_pre_syms = pre_submit_symbols.get(&rel_str);
302 if let Ok(new_symbols) = engine.symbol_store().find_by_file(repo_id, &rel_str).await {
303 for sym in &new_symbols {
305 let change_type = match file_pre_syms.and_then(|m| m.get(&sym.qualified_name)) {
306 Some(old_id) if *old_id == sym.id => continue, Some(_) => "modified",
308 None => "added",
309 };
310 symbol_changes.push(crate::SymbolChangeDetail {
311 symbol_name: sym.qualified_name.clone(),
312 file_path: rel_str.clone(),
313 change_type: change_type.to_string(),
314 kind: sym.kind.to_string(),
315 });
316 }
317 if let Some(old_syms) = file_pre_syms {
319 for name in old_syms.keys() {
320 let still_exists = new_symbols.iter().any(|s| s.qualified_name == *name);
321 if !still_exists {
322 symbol_changes.push(crate::SymbolChangeDetail {
323 symbol_name: name.clone(),
324 file_path: rel_str.clone(),
325 change_type: "deleted".to_string(),
326 kind: String::new(),
327 });
328 }
329 }
330 }
331 }
332 }
333
334 server.event_bus().publish(crate::WatchEvent {
336 event_type: "changeset.submitted".to_string(),
337 changeset_id: changeset_id.to_string(),
338 agent_id: session.agent_id.clone(),
339 affected_symbols: vec![],
340 details: req.intent.clone(),
341 session_id: req.session_id.clone(),
342 affected_files,
343 symbol_changes,
344 repo_id: repo_id.to_string(),
345 event_id: uuid::Uuid::new_v4().to_string(),
346 });
347
348 let new_version = {
350 let (_repo_id, git_repo) = engine
351 .get_repo(&session.codebase)
352 .await
353 .map_err(|e| Status::internal(format!("Repo error (head read): {e}")))?;
354 git_repo
355 .head_hash()
356 .ok()
357 .flatten()
358 .unwrap_or_else(|| "pending".to_string())
359 };
360
361 info!(
363 session_id = %req.session_id,
364 changeset_id = %changeset_id,
365 files_changed = changed_files.len(),
366 "SUBMIT: accepted"
367 );
368
369 Ok(Response::new(SubmitResponse {
370 status: SubmitStatus::Accepted.into(),
371 changeset_id: changeset_id.to_string(),
372 new_version: Some(new_version),
373 errors: vec![],
374 conflict_block: None,
375 }))
376}