1use tonic::Status;
2use uuid::Uuid;
3
4use dk_engine::workspace::merge::{merge_workspace, WorkspaceMergeResult};
5
6use crate::server::ProtocolServer;
7use crate::{merge_response, ConflictDetail, MergeConflict, MergeRequest, MergeResponse, MergeSuccess};
8
9const CONFLICT_TYPE_TRUE: &str = "true_conflict";
11
12fn sanitize_for_proto(s: &str) -> String {
19 s.replace('\0', "")
20}
21
22pub async fn handle_merge(
23 server: &ProtocolServer,
24 req: MergeRequest,
25) -> Result<MergeResponse, Status> {
26 let session = server.validate_session(&req.session_id)?;
27 let engine = server.engine();
28
29 let sid = req
30 .session_id
31 .parse::<Uuid>()
32 .map_err(|_| Status::invalid_argument("Invalid session ID"))?;
33
34 let repo_id_str = match engine.get_repo(&session.codebase).await {
36 Ok((rid, _)) => rid.to_string(),
37 Err(_) => String::new(),
38 };
39
40 let changeset_id = req.changeset_id.parse::<Uuid>()
41 .map_err(|_| Status::invalid_argument("invalid changeset_id"))?;
42
43 let changeset = engine.changeset_store().get(changeset_id).await
45 .map_err(|e| Status::not_found(e.to_string()))?;
46
47 if changeset.state != "approved" {
48 return Err(Status::failed_precondition(format!(
49 "changeset is '{}', must be 'approved' to merge",
50 changeset.state
51 )));
52 }
53
54 let ws = engine
56 .workspace_manager()
57 .get_workspace(&sid)
58 .ok_or_else(|| Status::not_found("Workspace not found for session"))?;
59
60 let (_, git_repo) = engine.get_repo(&session.codebase).await
62 .map_err(|e| Status::internal(e.to_string()))?;
63
64 let agent = changeset.agent_id.as_deref().unwrap_or("agent");
65
66 let (effective_name, effective_email) =
67 dk_core::resolve_author(&req.author_name, &req.author_email, agent);
68
69 let affected_files: Vec<crate::FileChange> = ws.overlay.list_changes()
71 .iter()
72 .map(|(path, entry)| {
73 let operation = match entry {
74 dk_engine::workspace::overlay::OverlayEntry::Added { .. } => "add",
75 dk_engine::workspace::overlay::OverlayEntry::Modified { .. } => "modify",
76 dk_engine::workspace::overlay::OverlayEntry::Deleted => "delete",
77 };
78 crate::FileChange {
79 path: path.clone(),
80 operation: operation.to_string(),
81 }
82 })
83 .collect();
84
85 let merge_result = merge_workspace(
87 &ws,
88 &git_repo,
89 engine.parser(),
90 &req.commit_message,
91 &effective_name,
92 &effective_email,
93 )
94 .map_err(|e| Status::internal(format!("merge failed: {e}")))?;
95
96 drop(ws);
98
99 match merge_result {
100 WorkspaceMergeResult::FastMerge { commit_hash } => {
101 engine.changeset_store().set_merged(changeset_id, &commit_hash).await
103 .map_err(|e| Status::internal(e.to_string()))?;
104
105 server.event_bus().publish(crate::WatchEvent {
107 event_type: "changeset.merged".to_string(),
108 changeset_id: changeset_id.to_string(),
109 agent_id: changeset.agent_id.clone().unwrap_or_default(),
110 affected_symbols: vec![],
111 details: format!("fast-merged as {}", commit_hash),
112 session_id: req.session_id.clone(),
113 affected_files: affected_files.clone(),
114 symbol_changes: vec![],
115 repo_id: repo_id_str.clone(),
116 event_id: Uuid::new_v4().to_string(),
117 });
118
119 Ok(MergeResponse {
120 result: Some(merge_response::Result::Success(MergeSuccess {
121 commit_hash: commit_hash.clone(),
122 merged_version: commit_hash,
123 auto_rebased: false,
124 auto_rebased_files: Vec::new(),
125 })),
126 })
127 }
128
129 WorkspaceMergeResult::RebaseMerge {
130 commit_hash,
131 auto_rebased_files,
132 } => {
133 engine.changeset_store().set_merged(changeset_id, &commit_hash).await
135 .map_err(|e| Status::internal(e.to_string()))?;
136
137 server.event_bus().publish(crate::WatchEvent {
139 event_type: "changeset.merged".to_string(),
140 changeset_id: changeset_id.to_string(),
141 agent_id: changeset.agent_id.clone().unwrap_or_default(),
142 affected_symbols: vec![],
143 details: format!(
144 "rebase-merged as {} (auto-rebased {} files)",
145 commit_hash,
146 auto_rebased_files.len()
147 ),
148 session_id: req.session_id.clone(),
149 affected_files,
150 symbol_changes: vec![],
151 repo_id: repo_id_str.clone(),
152 event_id: Uuid::new_v4().to_string(),
153 });
154
155 Ok(MergeResponse {
156 result: Some(merge_response::Result::Success(MergeSuccess {
157 commit_hash: commit_hash.clone(),
158 merged_version: commit_hash,
159 auto_rebased: true,
160 auto_rebased_files,
161 })),
162 })
163 }
164
165 WorkspaceMergeResult::Conflicts { conflicts } => {
166 let conflict_details: Vec<ConflictDetail> = conflicts
167 .iter()
168 .map(|c| {
169 let file = sanitize_for_proto(&c.file_path);
170 let symbol = sanitize_for_proto(&c.symbol_name);
171 ConflictDetail {
172 file_path: file,
173 symbols: vec![symbol.clone()],
174 your_agent: agent.to_string(),
175 their_agent: String::new(),
178 conflict_type: CONFLICT_TYPE_TRUE.to_string(),
179 description: format!(
180 "Symbol '{}' — our change: {:?}, their change: {:?}",
181 symbol, c.our_change, c.their_change
182 ),
183 }
184 })
185 .collect();
186
187 let suggested_action = "adapt".to_string();
188 let available_actions = vec!["adapt".to_string(), "keep_mine".to_string(), "keep_theirs".to_string()];
189
190 debug_assert!(
191 available_actions.iter().any(|a| a == &suggested_action),
192 "suggested_action '{}' is not in available_actions {:?}",
193 suggested_action, available_actions
194 );
195
196 Ok(MergeResponse {
197 result: Some(merge_response::Result::Conflict(MergeConflict {
198 changeset_id: changeset_id.to_string(),
199 conflicts: conflict_details,
200 suggested_action,
201 available_actions,
202 })),
203 })
204 }
205 }
206}
207
208pub const EVENT_MERGED: &str = "changeset.merged";
212
213#[cfg(test)]
214mod tests {
215 use super::*;
216
217 #[test]
218 fn merged_event_type() {
219 assert_eq!(EVENT_MERGED, "changeset.merged");
220 }
221
222 #[test]
223 fn merged_event_type_uses_dot_separator() {
224 assert!(
225 EVENT_MERGED.contains('.'),
226 "event type should use dot separator"
227 );
228 assert!(
229 EVENT_MERGED.starts_with("changeset."),
230 "event type should start with 'changeset.'"
231 );
232 }
233
234 #[test]
235 fn merged_event_type_is_not_underscore_format() {
236 assert_ne!(EVENT_MERGED, "changeset_merged");
238 assert_eq!(EVENT_MERGED, "changeset.merged");
239 }
240
241 #[test]
242 fn merge_success_construction() {
243 let resp = MergeResponse {
244 result: Some(merge_response::Result::Success(MergeSuccess {
245 commit_hash: "abc123".to_string(),
246 merged_version: "abc123".to_string(),
247 auto_rebased: false,
248 auto_rebased_files: Vec::new(),
249 })),
250 };
251 match resp.result {
252 Some(merge_response::Result::Success(s)) => {
253 assert_eq!(s.commit_hash, "abc123");
254 assert!(!s.auto_rebased);
255 assert!(s.auto_rebased_files.is_empty());
256 }
257 _ => panic!("expected MergeSuccess"),
258 }
259 }
260
261 #[test]
262 fn merge_success_with_rebase() {
263 let resp = MergeResponse {
264 result: Some(merge_response::Result::Success(MergeSuccess {
265 commit_hash: "def456".to_string(),
266 merged_version: "def456".to_string(),
267 auto_rebased: true,
268 auto_rebased_files: vec!["src/main.rs".to_string()],
269 })),
270 };
271 match resp.result {
272 Some(merge_response::Result::Success(s)) => {
273 assert!(s.auto_rebased);
274 assert_eq!(s.auto_rebased_files, vec!["src/main.rs"]);
275 }
276 _ => panic!("expected MergeSuccess"),
277 }
278 }
279
280 #[test]
281 fn merge_conflict_construction() {
282 let detail = ConflictDetail {
286 file_path: "src/lib.rs".to_string(),
287 symbols: vec!["process_data".to_string()],
288 your_agent: "agent-1".to_string(),
289 their_agent: String::new(),
290 conflict_type: CONFLICT_TYPE_TRUE.to_string(),
291 description: "both agents modified process_data".to_string(),
292 };
293 let resp = MergeResponse {
294 result: Some(merge_response::Result::Conflict(MergeConflict {
295 changeset_id: "cs-001".to_string(),
296 conflicts: vec![detail],
297 suggested_action: "adapt".to_string(),
298 available_actions: vec![
299 "adapt".to_string(),
300 "keep_mine".to_string(),
301 "keep_theirs".to_string(),
302 ],
303 })),
304 };
305 match resp.result {
306 Some(merge_response::Result::Conflict(c)) => {
307 assert_eq!(c.changeset_id, "cs-001");
308 assert_eq!(c.conflicts.len(), 1);
309 assert_eq!(c.conflicts[0].file_path, "src/lib.rs");
310 assert_eq!(c.conflicts[0].symbols, vec!["process_data"]);
311 assert_eq!(c.conflicts[0].your_agent, "agent-1");
312 assert!(c.conflicts[0].their_agent.is_empty());
313 assert_eq!(c.suggested_action, "adapt");
314 assert_eq!(c.available_actions.len(), 3);
315 }
316 _ => panic!("expected MergeConflict"),
317 }
318 }
319
320 #[test]
321 fn conflict_detail_fields() {
322 let detail = ConflictDetail {
323 file_path: "src/handler.rs".to_string(),
324 symbols: vec!["handle_request".to_string(), "parse_input".to_string()],
325 your_agent: "agent-a".to_string(),
326 their_agent: "agent-b".to_string(),
327 conflict_type: CONFLICT_TYPE_TRUE.to_string(),
328 description: "multiple symbols in conflict".to_string(),
329 };
330 assert_eq!(detail.symbols.len(), 2);
331 assert_eq!(detail.conflict_type, CONFLICT_TYPE_TRUE);
332 }
333
334 #[test]
335 fn sanitize_for_proto_strips_null_bytes() {
336 assert_eq!(sanitize_for_proto("hello\0world"), "helloworld");
337 assert_eq!(sanitize_for_proto("\0\0"), "");
338 assert_eq!(sanitize_for_proto("clean"), "clean");
339 }
340
341 #[test]
342 fn sanitize_for_proto_preserves_valid_utf8() {
343 assert_eq!(sanitize_for_proto("fn résumé()"), "fn résumé()");
345 assert_eq!(sanitize_for_proto("日本語"), "日本語");
346 assert_eq!(sanitize_for_proto("bad\u{FFFD}char"), "bad\u{FFFD}char");
348 }
349}