1use tonic::Status;
2use uuid::Uuid;
3
4use dk_engine::workspace::merge::{merge_workspace, WorkspaceMergeResult};
5
6use crate::server::ProtocolServer;
7use crate::{merge_response, ConflictDetail, MergeConflict, MergeRequest, MergeResponse, MergeSuccess};
8
9const CONFLICT_TYPE_TRUE: &str = "true_conflict";
11
12use dk_core::sanitize_for_proto;
13
14pub async fn handle_merge(
15 server: &ProtocolServer,
16 req: MergeRequest,
17) -> Result<MergeResponse, Status> {
18 let session = server.validate_session(&req.session_id)?;
19 let engine = server.engine();
20
21 let sid = req
22 .session_id
23 .parse::<Uuid>()
24 .map_err(|_| Status::invalid_argument("Invalid session ID"))?;
25
26 let repo_id_str = match engine.get_repo(&session.codebase).await {
28 Ok((rid, _)) => rid.to_string(),
29 Err(_) => String::new(),
30 };
31
32 let changeset_id = req.changeset_id.parse::<Uuid>()
33 .map_err(|_| Status::invalid_argument("invalid changeset_id"))?;
34
35 let changeset = engine.changeset_store().get(changeset_id).await
37 .map_err(|e| Status::not_found(e.to_string()))?;
38
39 if changeset.state != "approved" {
40 return Err(Status::failed_precondition(format!(
41 "changeset is '{}', must be 'approved' to merge",
42 changeset.state
43 )));
44 }
45
46 let ws = engine
48 .workspace_manager()
49 .get_workspace(&sid)
50 .ok_or_else(|| Status::not_found("Workspace not found for session"))?;
51
52 let (repo_id, git_repo) = engine.get_repo(&session.codebase).await
55 .map_err(|e| Status::internal(e.to_string()))?;
56
57 let agent = changeset.agent_id.as_deref().unwrap_or("agent");
58
59 let (effective_name, effective_email) =
60 dk_core::resolve_author(&req.author_name, &req.author_email, agent);
61
62 let affected_files: Vec<crate::FileChange> = ws.overlay.list_changes()
64 .iter()
65 .map(|(path, entry)| {
66 let operation = match entry {
67 dk_engine::workspace::overlay::OverlayEntry::Added { .. } => "add",
68 dk_engine::workspace::overlay::OverlayEntry::Modified { .. } => "modify",
69 dk_engine::workspace::overlay::OverlayEntry::Deleted => "delete",
70 };
71 crate::FileChange {
72 path: path.clone(),
73 operation: operation.to_string(),
74 }
75 })
76 .collect();
77
78 let merge_result = merge_workspace(
80 &ws,
81 &git_repo,
82 engine.parser(),
83 &req.commit_message,
84 &effective_name,
85 &effective_email,
86 )
87 .map_err(|e| Status::internal(format!("merge failed: {e}")))?;
88
89 drop(ws);
91
92 match merge_result {
93 WorkspaceMergeResult::FastMerge { commit_hash } => {
94 release_locks_and_emit(server, repo_id, sid, &req.session_id).await;
97
98 engine.changeset_store().set_merged(changeset_id, &commit_hash).await
100 .map_err(|e| Status::internal(e.to_string()))?;
101
102 server.event_bus().publish(crate::WatchEvent {
104 event_type: "changeset.merged".to_string(),
105 changeset_id: changeset_id.to_string(),
106 agent_id: changeset.agent_id.clone().unwrap_or_default(),
107 affected_symbols: vec![],
108 details: format!("fast-merged as {}", commit_hash),
109 session_id: req.session_id.clone(),
110 affected_files: affected_files.clone(),
111 symbol_changes: vec![],
112 repo_id: repo_id_str.clone(),
113 event_id: Uuid::new_v4().to_string(),
114 });
115
116 Ok(MergeResponse {
117 result: Some(merge_response::Result::Success(MergeSuccess {
118 commit_hash: commit_hash.clone(),
119 merged_version: commit_hash,
120 auto_rebased: false,
121 auto_rebased_files: Vec::new(),
122 })),
123 })
124 }
125
126 WorkspaceMergeResult::RebaseMerge {
127 commit_hash,
128 auto_rebased_files,
129 } => {
130 release_locks_and_emit(server, repo_id, sid, &req.session_id).await;
132
133 engine.changeset_store().set_merged(changeset_id, &commit_hash).await
135 .map_err(|e| Status::internal(e.to_string()))?;
136
137 server.event_bus().publish(crate::WatchEvent {
139 event_type: "changeset.merged".to_string(),
140 changeset_id: changeset_id.to_string(),
141 agent_id: changeset.agent_id.clone().unwrap_or_default(),
142 affected_symbols: vec![],
143 details: format!(
144 "rebase-merged as {} (auto-rebased {} files)",
145 commit_hash,
146 auto_rebased_files.len()
147 ),
148 session_id: req.session_id.clone(),
149 affected_files,
150 symbol_changes: vec![],
151 repo_id: repo_id_str.clone(),
152 event_id: Uuid::new_v4().to_string(),
153 });
154
155 Ok(MergeResponse {
156 result: Some(merge_response::Result::Success(MergeSuccess {
157 commit_hash: commit_hash.clone(),
158 merged_version: commit_hash,
159 auto_rebased: true,
160 auto_rebased_files,
161 })),
162 })
163 }
164
165 WorkspaceMergeResult::Conflicts { conflicts } => {
166 let conflict_details: Vec<ConflictDetail> = conflicts
170 .iter()
171 .map(|c| {
172 let file = sanitize_for_proto(&c.file_path);
173 let symbol = sanitize_for_proto(&c.symbol_name);
174 ConflictDetail {
175 file_path: file,
176 symbols: vec![symbol.clone()],
177 your_agent: agent.to_string(),
178 their_agent: String::new(),
181 conflict_type: CONFLICT_TYPE_TRUE.to_string(),
182 description: format!(
183 "Symbol '{}' — our change: {:?}, their change: {:?}",
184 symbol, c.our_change, c.their_change
185 ),
186 }
187 })
188 .collect();
189
190 let suggested_action = "adapt".to_string();
191 let available_actions = vec!["adapt".to_string(), "keep_mine".to_string(), "keep_theirs".to_string()];
192
193 debug_assert!(
194 available_actions.iter().any(|a| a == &suggested_action),
195 "suggested_action '{}' is not in available_actions {:?}",
196 suggested_action, available_actions
197 );
198
199 Ok(MergeResponse {
200 result: Some(merge_response::Result::Conflict(MergeConflict {
201 changeset_id: changeset_id.to_string(),
202 conflicts: conflict_details,
203 suggested_action,
204 available_actions,
205 })),
206 })
207 }
208 }
209}
210
211async fn release_locks_and_emit(
214 server: &ProtocolServer,
215 repo_id: Uuid,
216 session_id: Uuid,
217 session_id_str: &str,
218) {
219 let released = server.claim_tracker().release_locks(repo_id, session_id).await;
220
221 if released.is_empty() {
222 return;
223 }
224
225 let mut by_file: std::collections::HashMap<String, Vec<String>> = std::collections::HashMap::new();
227 for lock in &released {
228 by_file
229 .entry(lock.file_path.clone())
230 .or_default()
231 .push(lock.qualified_name.clone());
232 }
233
234 for (file_path, symbols) in by_file {
235 server.event_bus().publish(crate::WatchEvent {
236 event_type: EVENT_LOCK_RELEASED.to_string(),
237 changeset_id: String::new(),
238 agent_id: released.first().map(|r| r.agent_name.clone()).unwrap_or_default(),
239 affected_symbols: symbols,
240 details: format!("Symbol locks released on {}", file_path),
241 session_id: session_id_str.to_string(),
242 affected_files: vec![crate::FileChange {
243 path: file_path,
244 operation: "unlock".to_string(),
245 }],
246 symbol_changes: vec![],
247 repo_id: repo_id.to_string(),
248 event_id: Uuid::new_v4().to_string(),
249 });
250 }
251}
252
253pub const EVENT_MERGED: &str = "changeset.merged";
257
258pub const EVENT_LOCK_RELEASED: &str = "symbol.lock.released";
261
262#[cfg(test)]
263mod tests {
264 use super::*;
265
266 #[test]
267 fn merged_event_type() {
268 assert_eq!(EVENT_MERGED, "changeset.merged");
269 }
270
271 #[test]
272 fn merged_event_type_uses_dot_separator() {
273 assert!(
274 EVENT_MERGED.contains('.'),
275 "event type should use dot separator"
276 );
277 assert!(
278 EVENT_MERGED.starts_with("changeset."),
279 "event type should start with 'changeset.'"
280 );
281 }
282
283 #[test]
284 fn merged_event_type_is_not_underscore_format() {
285 assert_ne!(EVENT_MERGED, "changeset_merged");
287 assert_eq!(EVENT_MERGED, "changeset.merged");
288 }
289
290 #[test]
291 fn merge_success_construction() {
292 let resp = MergeResponse {
293 result: Some(merge_response::Result::Success(MergeSuccess {
294 commit_hash: "abc123".to_string(),
295 merged_version: "abc123".to_string(),
296 auto_rebased: false,
297 auto_rebased_files: Vec::new(),
298 })),
299 };
300 match resp.result {
301 Some(merge_response::Result::Success(s)) => {
302 assert_eq!(s.commit_hash, "abc123");
303 assert!(!s.auto_rebased);
304 assert!(s.auto_rebased_files.is_empty());
305 }
306 _ => panic!("expected MergeSuccess"),
307 }
308 }
309
310 #[test]
311 fn merge_success_with_rebase() {
312 let resp = MergeResponse {
313 result: Some(merge_response::Result::Success(MergeSuccess {
314 commit_hash: "def456".to_string(),
315 merged_version: "def456".to_string(),
316 auto_rebased: true,
317 auto_rebased_files: vec!["src/main.rs".to_string()],
318 })),
319 };
320 match resp.result {
321 Some(merge_response::Result::Success(s)) => {
322 assert!(s.auto_rebased);
323 assert_eq!(s.auto_rebased_files, vec!["src/main.rs"]);
324 }
325 _ => panic!("expected MergeSuccess"),
326 }
327 }
328
329 #[test]
330 fn merge_conflict_construction() {
331 let detail = ConflictDetail {
335 file_path: "src/lib.rs".to_string(),
336 symbols: vec!["process_data".to_string()],
337 your_agent: "agent-1".to_string(),
338 their_agent: String::new(),
339 conflict_type: CONFLICT_TYPE_TRUE.to_string(),
340 description: "both agents modified process_data".to_string(),
341 };
342 let resp = MergeResponse {
343 result: Some(merge_response::Result::Conflict(MergeConflict {
344 changeset_id: "cs-001".to_string(),
345 conflicts: vec![detail],
346 suggested_action: "adapt".to_string(),
347 available_actions: vec![
348 "adapt".to_string(),
349 "keep_mine".to_string(),
350 "keep_theirs".to_string(),
351 ],
352 })),
353 };
354 match resp.result {
355 Some(merge_response::Result::Conflict(c)) => {
356 assert_eq!(c.changeset_id, "cs-001");
357 assert_eq!(c.conflicts.len(), 1);
358 assert_eq!(c.conflicts[0].file_path, "src/lib.rs");
359 assert_eq!(c.conflicts[0].symbols, vec!["process_data"]);
360 assert_eq!(c.conflicts[0].your_agent, "agent-1");
361 assert!(c.conflicts[0].their_agent.is_empty());
362 assert_eq!(c.suggested_action, "adapt");
363 assert_eq!(c.available_actions.len(), 3);
364 }
365 _ => panic!("expected MergeConflict"),
366 }
367 }
368
369 #[test]
370 fn conflict_detail_fields() {
371 let detail = ConflictDetail {
372 file_path: "src/handler.rs".to_string(),
373 symbols: vec!["handle_request".to_string(), "parse_input".to_string()],
374 your_agent: "agent-a".to_string(),
375 their_agent: "agent-b".to_string(),
376 conflict_type: CONFLICT_TYPE_TRUE.to_string(),
377 description: "multiple symbols in conflict".to_string(),
378 };
379 assert_eq!(detail.symbols.len(), 2);
380 assert_eq!(detail.conflict_type, CONFLICT_TYPE_TRUE);
381 }
382
383 #[test]
384 fn sanitize_for_proto_strips_null_bytes() {
385 assert_eq!(sanitize_for_proto("hello\0world"), "helloworld");
386 assert_eq!(sanitize_for_proto("\0\0"), "");
387 assert_eq!(sanitize_for_proto("clean"), "clean");
388 }
389
390 #[test]
391 fn sanitize_for_proto_preserves_valid_utf8() {
392 assert_eq!(sanitize_for_proto("fn résumé()"), "fn résumé()");
394 assert_eq!(sanitize_for_proto("日本語"), "日本語");
395 assert_eq!(sanitize_for_proto("bad\u{FFFD}char"), "bad\u{FFFD}char");
397 }
398}