Skip to main content

dk_protocol/
merge.rs

1use tonic::Status;
2use uuid::Uuid;
3
4use dk_engine::workspace::merge::{merge_workspace, WorkspaceMergeResult};
5
6use crate::server::ProtocolServer;
7use crate::{merge_response, ConflictDetail, MergeConflict, MergeRequest, MergeResponse, MergeSuccess};
8
9/// Conflict type for true write-write semantic conflicts.
10const CONFLICT_TYPE_TRUE: &str = "true_conflict";
11
12use dk_core::sanitize_for_proto;
13
14pub async fn handle_merge(
15    server: &ProtocolServer,
16    req: MergeRequest,
17) -> Result<MergeResponse, Status> {
18    let session = server.validate_session(&req.session_id)?;
19    let engine = server.engine();
20
21    let sid = req
22        .session_id
23        .parse::<Uuid>()
24        .map_err(|_| Status::invalid_argument("Invalid session ID"))?;
25
26    // Resolve repo_id_str for enriched events (non-fatal — empty string on failure)
27    let repo_id_str = match engine.get_repo(&session.codebase).await {
28        Ok((rid, _)) => rid.to_string(),
29        Err(_) => String::new(),
30    };
31
32    let changeset_id = req.changeset_id.parse::<Uuid>()
33        .map_err(|_| Status::invalid_argument("invalid changeset_id"))?;
34
35    // Get changeset and verify it's approved
36    let changeset = engine.changeset_store().get(changeset_id).await
37        .map_err(|e| Status::not_found(e.to_string()))?;
38
39    if changeset.state != "approved" {
40        return Err(Status::failed_precondition(format!(
41            "changeset is '{}', must be 'approved' to merge",
42            changeset.state
43        )));
44    }
45
46    // Get workspace for this session
47    let ws = engine
48        .workspace_manager()
49        .get_workspace(&sid)
50        .ok_or_else(|| Status::not_found("Workspace not found for session"))?;
51
52    // Get git repo — also use this repo_id for lock release (the first get_repo
53    // call is non-fatal and may return empty string, but this one propagates errors)
54    let (repo_id, git_repo) = engine.get_repo(&session.codebase).await
55        .map_err(|e| Status::internal(e.to_string()))?;
56
57    let agent = changeset.agent_id.as_deref().unwrap_or("agent");
58
59    let (effective_name, effective_email) =
60        dk_core::resolve_author(&req.author_name, &req.author_email, agent);
61
62    // Capture affected files from workspace overlay before merge/drop
63    let affected_files: Vec<crate::FileChange> = ws.overlay.list_changes()
64        .iter()
65        .map(|(path, entry)| {
66            let operation = match entry {
67                dk_engine::workspace::overlay::OverlayEntry::Added { .. } => "add",
68                dk_engine::workspace::overlay::OverlayEntry::Modified { .. } => "modify",
69                dk_engine::workspace::overlay::OverlayEntry::Deleted => "delete",
70            };
71            crate::FileChange {
72                path: path.clone(),
73                operation: operation.to_string(),
74            }
75        })
76        .collect();
77
78    // Use the programmatic workspace merge instead of git add -A
79    let merge_result = merge_workspace(
80        &ws,
81        &git_repo,
82        engine.parser(),
83        &req.commit_message,
84        &effective_name,
85        &effective_email,
86    )
87    .map_err(|e| Status::internal(format!("merge failed: {e}")))?;
88
89    // Drop workspace guard before further async work
90    drop(ws);
91
92    match merge_result {
93        WorkspaceMergeResult::FastMerge { commit_hash } => {
94            // Release locks first — git commit is already in the tree,
95            // so locks must be freed regardless of changeset-store state.
96            release_locks_and_emit(server, repo_id, sid, &req.session_id).await;
97
98            // Update changeset status to merged
99            engine.changeset_store().set_merged(changeset_id, &commit_hash).await
100                .map_err(|e| Status::internal(e.to_string()))?;
101
102            // Publish merge event
103            server.event_bus().publish(crate::WatchEvent {
104                event_type: "changeset.merged".to_string(),
105                changeset_id: changeset_id.to_string(),
106                agent_id: changeset.agent_id.clone().unwrap_or_default(),
107                affected_symbols: vec![],
108                details: format!("fast-merged as {}", commit_hash),
109                session_id: req.session_id.clone(),
110                affected_files: affected_files.clone(),
111                symbol_changes: vec![],
112                repo_id: repo_id_str.clone(),
113                event_id: Uuid::new_v4().to_string(),
114            });
115
116            Ok(MergeResponse {
117                result: Some(merge_response::Result::Success(MergeSuccess {
118                    commit_hash: commit_hash.clone(),
119                    merged_version: commit_hash,
120                    auto_rebased: false,
121                    auto_rebased_files: Vec::new(),
122                })),
123            })
124        }
125
126        WorkspaceMergeResult::RebaseMerge {
127            commit_hash,
128            auto_rebased_files,
129        } => {
130            // Release locks first — git commit is already in the tree.
131            release_locks_and_emit(server, repo_id, sid, &req.session_id).await;
132
133            // Update changeset status to merged
134            engine.changeset_store().set_merged(changeset_id, &commit_hash).await
135                .map_err(|e| Status::internal(e.to_string()))?;
136
137            // Publish merge event
138            server.event_bus().publish(crate::WatchEvent {
139                event_type: "changeset.merged".to_string(),
140                changeset_id: changeset_id.to_string(),
141                agent_id: changeset.agent_id.clone().unwrap_or_default(),
142                affected_symbols: vec![],
143                details: format!(
144                    "rebase-merged as {} (auto-rebased {} files)",
145                    commit_hash,
146                    auto_rebased_files.len()
147                ),
148                session_id: req.session_id.clone(),
149                affected_files,
150                symbol_changes: vec![],
151                repo_id: repo_id_str.clone(),
152                event_id: Uuid::new_v4().to_string(),
153            });
154
155            Ok(MergeResponse {
156                result: Some(merge_response::Result::Success(MergeSuccess {
157                    commit_hash: commit_hash.clone(),
158                    merged_version: commit_hash,
159                    auto_rebased: true,
160                    auto_rebased_files,
161                })),
162            })
163        }
164
165        WorkspaceMergeResult::Conflicts { conflicts } => {
166            // Intentionally NOT releasing locks here. The agent retains its locks
167            // while resolving conflicts (dk_resolve → retry dk_merge). Locks are
168            // released when the session is closed (dk_close) or times out (30 min GC).
169            let conflict_details: Vec<ConflictDetail> = conflicts
170                .iter()
171                .map(|c| {
172                    let file = sanitize_for_proto(&c.file_path);
173                    let symbol = sanitize_for_proto(&c.symbol_name);
174                    ConflictDetail {
175                        file_path: file,
176                        symbols: vec![symbol.clone()],
177                        your_agent: agent.to_string(),
178                        // TODO: resolve their_agent from the session/changeset store
179                        // once SemanticConflict carries agent attribution.
180                        their_agent: String::new(),
181                        conflict_type: CONFLICT_TYPE_TRUE.to_string(),
182                        description: format!(
183                            "Symbol '{}' — our change: {:?}, their change: {:?}",
184                            symbol, c.our_change, c.their_change
185                        ),
186                    }
187                })
188                .collect();
189
190            let suggested_action = "adapt".to_string();
191            let available_actions = vec!["adapt".to_string(), "keep_mine".to_string(), "keep_theirs".to_string()];
192
193            debug_assert!(
194                available_actions.iter().any(|a| a == &suggested_action),
195                "suggested_action '{}' is not in available_actions {:?}",
196                suggested_action, available_actions
197            );
198
199            Ok(MergeResponse {
200                result: Some(merge_response::Result::Conflict(MergeConflict {
201                    changeset_id: changeset_id.to_string(),
202                    conflicts: conflict_details,
203                    suggested_action,
204                    available_actions,
205                })),
206            })
207        }
208    }
209}
210
211/// Release all symbol locks for a session and emit `symbol.lock.released` events
212/// so blocked agents can wake up and retry their writes.
213async fn release_locks_and_emit(
214    server: &ProtocolServer,
215    repo_id: Uuid,
216    session_id: Uuid,
217    session_id_str: &str,
218) {
219    let released = server.claim_tracker().release_locks(repo_id, session_id).await;
220
221    if released.is_empty() {
222        return;
223    }
224
225    // Group released locks by file_path for efficient event emission
226    let mut by_file: std::collections::HashMap<String, Vec<String>> = std::collections::HashMap::new();
227    for lock in &released {
228        by_file
229            .entry(lock.file_path.clone())
230            .or_default()
231            .push(lock.qualified_name.clone());
232    }
233
234    for (file_path, symbols) in by_file {
235        server.event_bus().publish(crate::WatchEvent {
236            event_type: EVENT_LOCK_RELEASED.to_string(),
237            changeset_id: String::new(),
238            agent_id: released.first().map(|r| r.agent_name.clone()).unwrap_or_default(),
239            affected_symbols: symbols,
240            details: format!("Symbol locks released on {}", file_path),
241            session_id: session_id_str.to_string(),
242            affected_files: vec![crate::FileChange {
243                path: file_path,
244                operation: "unlock".to_string(),
245            }],
246            symbol_changes: vec![],
247            repo_id: repo_id.to_string(),
248            event_id: Uuid::new_v4().to_string(),
249        });
250    }
251}
252
253// ── Event type constants ────────────────────────────────────────────
254
255/// Event published when a changeset is successfully merged.
256pub const EVENT_MERGED: &str = "changeset.merged";
257
258/// Event published when symbol locks are released (after merge, close, or timeout).
259/// Blocked agents watch for this to retry their `dk_file_write`.
260pub const EVENT_LOCK_RELEASED: &str = "symbol.lock.released";
261
262#[cfg(test)]
263mod tests {
264    use super::*;
265
266    #[test]
267    fn merged_event_type() {
268        assert_eq!(EVENT_MERGED, "changeset.merged");
269    }
270
271    #[test]
272    fn merged_event_type_uses_dot_separator() {
273        assert!(
274            EVENT_MERGED.contains('.'),
275            "event type should use dot separator"
276        );
277        assert!(
278            EVENT_MERGED.starts_with("changeset."),
279            "event type should start with 'changeset.'"
280        );
281    }
282
283    #[test]
284    fn merged_event_type_is_not_underscore_format() {
285        // Verify the event was renamed from "changeset_merged" to "changeset.merged"
286        assert_ne!(EVENT_MERGED, "changeset_merged");
287        assert_eq!(EVENT_MERGED, "changeset.merged");
288    }
289
290    #[test]
291    fn merge_success_construction() {
292        let resp = MergeResponse {
293            result: Some(merge_response::Result::Success(MergeSuccess {
294                commit_hash: "abc123".to_string(),
295                merged_version: "abc123".to_string(),
296                auto_rebased: false,
297                auto_rebased_files: Vec::new(),
298            })),
299        };
300        match resp.result {
301            Some(merge_response::Result::Success(s)) => {
302                assert_eq!(s.commit_hash, "abc123");
303                assert!(!s.auto_rebased);
304                assert!(s.auto_rebased_files.is_empty());
305            }
306            _ => panic!("expected MergeSuccess"),
307        }
308    }
309
310    #[test]
311    fn merge_success_with_rebase() {
312        let resp = MergeResponse {
313            result: Some(merge_response::Result::Success(MergeSuccess {
314                commit_hash: "def456".to_string(),
315                merged_version: "def456".to_string(),
316                auto_rebased: true,
317                auto_rebased_files: vec!["src/main.rs".to_string()],
318            })),
319        };
320        match resp.result {
321            Some(merge_response::Result::Success(s)) => {
322                assert!(s.auto_rebased);
323                assert_eq!(s.auto_rebased_files, vec!["src/main.rs"]);
324            }
325            _ => panic!("expected MergeSuccess"),
326        }
327    }
328
329    #[test]
330    fn merge_conflict_construction() {
331        // their_agent is currently not populated by the server (SemanticConflict
332        // does not carry agent attribution yet), so the test mirrors real
333        // behavior by using an empty string.
334        let detail = ConflictDetail {
335            file_path: "src/lib.rs".to_string(),
336            symbols: vec!["process_data".to_string()],
337            your_agent: "agent-1".to_string(),
338            their_agent: String::new(),
339            conflict_type: CONFLICT_TYPE_TRUE.to_string(),
340            description: "both agents modified process_data".to_string(),
341        };
342        let resp = MergeResponse {
343            result: Some(merge_response::Result::Conflict(MergeConflict {
344                changeset_id: "cs-001".to_string(),
345                conflicts: vec![detail],
346                suggested_action: "adapt".to_string(),
347                available_actions: vec![
348                    "adapt".to_string(),
349                    "keep_mine".to_string(),
350                    "keep_theirs".to_string(),
351                ],
352            })),
353        };
354        match resp.result {
355            Some(merge_response::Result::Conflict(c)) => {
356                assert_eq!(c.changeset_id, "cs-001");
357                assert_eq!(c.conflicts.len(), 1);
358                assert_eq!(c.conflicts[0].file_path, "src/lib.rs");
359                assert_eq!(c.conflicts[0].symbols, vec!["process_data"]);
360                assert_eq!(c.conflicts[0].your_agent, "agent-1");
361                assert!(c.conflicts[0].their_agent.is_empty());
362                assert_eq!(c.suggested_action, "adapt");
363                assert_eq!(c.available_actions.len(), 3);
364            }
365            _ => panic!("expected MergeConflict"),
366        }
367    }
368
369    #[test]
370    fn conflict_detail_fields() {
371        let detail = ConflictDetail {
372            file_path: "src/handler.rs".to_string(),
373            symbols: vec!["handle_request".to_string(), "parse_input".to_string()],
374            your_agent: "agent-a".to_string(),
375            their_agent: "agent-b".to_string(),
376            conflict_type: CONFLICT_TYPE_TRUE.to_string(),
377            description: "multiple symbols in conflict".to_string(),
378        };
379        assert_eq!(detail.symbols.len(), 2);
380        assert_eq!(detail.conflict_type, CONFLICT_TYPE_TRUE);
381    }
382
383    #[test]
384    fn sanitize_for_proto_strips_null_bytes() {
385        assert_eq!(sanitize_for_proto("hello\0world"), "helloworld");
386        assert_eq!(sanitize_for_proto("\0\0"), "");
387        assert_eq!(sanitize_for_proto("clean"), "clean");
388    }
389
390    #[test]
391    fn sanitize_for_proto_preserves_valid_utf8() {
392        // Multi-byte UTF-8 characters must survive sanitization
393        assert_eq!(sanitize_for_proto("fn résumé()"), "fn résumé()");
394        assert_eq!(sanitize_for_proto("日本語"), "日本語");
395        // Replacement character from String::from_utf8_lossy is valid UTF-8
396        assert_eq!(sanitize_for_proto("bad\u{FFFD}char"), "bad\u{FFFD}char");
397    }
398}