Skip to main content

dk_protocol/
merge.rs

1use tonic::Status;
2use uuid::Uuid;
3
4use dk_engine::workspace::merge::{merge_workspace, WorkspaceMergeResult};
5
6use crate::server::ProtocolServer;
7use crate::{merge_response, ConflictDetail, MergeConflict, MergeRequest, MergeResponse, MergeSuccess};
8
/// Value placed in `ConflictDetail::conflict_type` to mark a true
/// write-write semantic conflict.
const CONFLICT_TYPE_TRUE: &str = "true_conflict";
11
/// Return a copy of `s` with every NUL character (`'\0'`) removed, for use
/// in protobuf `string` fields.
///
/// A Rust `&str` is always valid UTF-8, so the only cleanup performed here
/// is dropping embedded NUL characters. Every other character — multi-byte
/// sequences and U+FFFD replacement characters included — is kept as-is and
/// in its original order.
fn sanitize_for_proto(s: &str) -> String {
    s.chars().filter(|&ch| ch != '\0').collect()
}
21
22pub async fn handle_merge(
23    server: &ProtocolServer,
24    req: MergeRequest,
25) -> Result<MergeResponse, Status> {
26    let session = server.validate_session(&req.session_id)?;
27    let engine = server.engine();
28
29    let sid = req
30        .session_id
31        .parse::<Uuid>()
32        .map_err(|_| Status::invalid_argument("Invalid session ID"))?;
33
34    // Resolve repo_id for enriched events
35    let repo_id_str = match engine.get_repo(&session.codebase).await {
36        Ok((rid, _)) => rid.to_string(),
37        Err(_) => String::new(),
38    };
39
40    let changeset_id = req.changeset_id.parse::<Uuid>()
41        .map_err(|_| Status::invalid_argument("invalid changeset_id"))?;
42
43    // Get changeset and verify it's approved
44    let changeset = engine.changeset_store().get(changeset_id).await
45        .map_err(|e| Status::not_found(e.to_string()))?;
46
47    if changeset.state != "approved" {
48        return Err(Status::failed_precondition(format!(
49            "changeset is '{}', must be 'approved' to merge",
50            changeset.state
51        )));
52    }
53
54    // Get workspace for this session
55    let ws = engine
56        .workspace_manager()
57        .get_workspace(&sid)
58        .ok_or_else(|| Status::not_found("Workspace not found for session"))?;
59
60    // Get git repo
61    let (_, git_repo) = engine.get_repo(&session.codebase).await
62        .map_err(|e| Status::internal(e.to_string()))?;
63
64    let agent = changeset.agent_id.as_deref().unwrap_or("agent");
65
66    let (effective_name, effective_email) =
67        dk_core::resolve_author(&req.author_name, &req.author_email, agent);
68
69    // Capture affected files from workspace overlay before merge/drop
70    let affected_files: Vec<crate::FileChange> = ws.overlay.list_changes()
71        .iter()
72        .map(|(path, entry)| {
73            let operation = match entry {
74                dk_engine::workspace::overlay::OverlayEntry::Added { .. } => "add",
75                dk_engine::workspace::overlay::OverlayEntry::Modified { .. } => "modify",
76                dk_engine::workspace::overlay::OverlayEntry::Deleted => "delete",
77            };
78            crate::FileChange {
79                path: path.clone(),
80                operation: operation.to_string(),
81            }
82        })
83        .collect();
84
85    // Use the programmatic workspace merge instead of git add -A
86    let merge_result = merge_workspace(
87        &ws,
88        &git_repo,
89        engine.parser(),
90        &req.commit_message,
91        &effective_name,
92        &effective_email,
93    )
94    .map_err(|e| Status::internal(format!("merge failed: {e}")))?;
95
96    // Drop workspace guard before further async work
97    drop(ws);
98
99    match merge_result {
100        WorkspaceMergeResult::FastMerge { commit_hash } => {
101            // Update changeset status to merged
102            engine.changeset_store().set_merged(changeset_id, &commit_hash).await
103                .map_err(|e| Status::internal(e.to_string()))?;
104
105            // Publish event
106            server.event_bus().publish(crate::WatchEvent {
107                event_type: "changeset.merged".to_string(),
108                changeset_id: changeset_id.to_string(),
109                agent_id: changeset.agent_id.clone().unwrap_or_default(),
110                affected_symbols: vec![],
111                details: format!("fast-merged as {}", commit_hash),
112                session_id: req.session_id.clone(),
113                affected_files: affected_files.clone(),
114                symbol_changes: vec![],
115                repo_id: repo_id_str.clone(),
116                event_id: Uuid::new_v4().to_string(),
117            });
118
119            Ok(MergeResponse {
120                result: Some(merge_response::Result::Success(MergeSuccess {
121                    commit_hash: commit_hash.clone(),
122                    merged_version: commit_hash,
123                    auto_rebased: false,
124                    auto_rebased_files: Vec::new(),
125                })),
126            })
127        }
128
129        WorkspaceMergeResult::RebaseMerge {
130            commit_hash,
131            auto_rebased_files,
132        } => {
133            // Update changeset status to merged
134            engine.changeset_store().set_merged(changeset_id, &commit_hash).await
135                .map_err(|e| Status::internal(e.to_string()))?;
136
137            // Publish event
138            server.event_bus().publish(crate::WatchEvent {
139                event_type: "changeset.merged".to_string(),
140                changeset_id: changeset_id.to_string(),
141                agent_id: changeset.agent_id.clone().unwrap_or_default(),
142                affected_symbols: vec![],
143                details: format!(
144                    "rebase-merged as {} (auto-rebased {} files)",
145                    commit_hash,
146                    auto_rebased_files.len()
147                ),
148                session_id: req.session_id.clone(),
149                affected_files,
150                symbol_changes: vec![],
151                repo_id: repo_id_str.clone(),
152                event_id: Uuid::new_v4().to_string(),
153            });
154
155            Ok(MergeResponse {
156                result: Some(merge_response::Result::Success(MergeSuccess {
157                    commit_hash: commit_hash.clone(),
158                    merged_version: commit_hash,
159                    auto_rebased: true,
160                    auto_rebased_files,
161                })),
162            })
163        }
164
165        WorkspaceMergeResult::Conflicts { conflicts } => {
166            let conflict_details: Vec<ConflictDetail> = conflicts
167                .iter()
168                .map(|c| {
169                    let file = sanitize_for_proto(&c.file_path);
170                    let symbol = sanitize_for_proto(&c.symbol_name);
171                    ConflictDetail {
172                        file_path: file,
173                        symbols: vec![symbol.clone()],
174                        your_agent: agent.to_string(),
175                        // TODO: resolve their_agent from the session/changeset store
176                        // once SemanticConflict carries agent attribution.
177                        their_agent: String::new(),
178                        conflict_type: CONFLICT_TYPE_TRUE.to_string(),
179                        description: format!(
180                            "Symbol '{}' — our change: {:?}, their change: {:?}",
181                            symbol, c.our_change, c.their_change
182                        ),
183                    }
184                })
185                .collect();
186
187            let suggested_action = "adapt".to_string();
188            let available_actions = vec!["adapt".to_string(), "keep_mine".to_string(), "keep_theirs".to_string()];
189
190            debug_assert!(
191                available_actions.iter().any(|a| a == &suggested_action),
192                "suggested_action '{}' is not in available_actions {:?}",
193                suggested_action, available_actions
194            );
195
196            Ok(MergeResponse {
197                result: Some(merge_response::Result::Conflict(MergeConflict {
198                    changeset_id: changeset_id.to_string(),
199                    conflicts: conflict_details,
200                    suggested_action,
201                    available_actions,
202                })),
203            })
204        }
205    }
206}
207
208// ── Event type constant ─────────────────────────────────────────────
209
/// Event type string for the event published when a changeset is
/// successfully merged.
///
/// Uses the dot-separated form (`changeset.merged`); the earlier
/// underscore form (`changeset_merged`) is no longer used.
pub const EVENT_MERGED: &str = "changeset.merged";
212
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `MergeResponse` carrying a `MergeSuccess` whose commit hash
    /// and merged version are both `hash`.
    fn success_response(hash: &str, auto_rebased: bool, rebased: Vec<String>) -> MergeResponse {
        let success = MergeSuccess {
            commit_hash: hash.to_string(),
            merged_version: hash.to_string(),
            auto_rebased,
            auto_rebased_files: rebased,
        };
        MergeResponse {
            result: Some(merge_response::Result::Success(success)),
        }
    }

    // ── EVENT_MERGED ────────────────────────────────────────────────

    #[test]
    fn merged_event_type() {
        assert_eq!(EVENT_MERGED, "changeset.merged");
    }

    #[test]
    fn merged_event_type_uses_dot_separator() {
        let has_dot = EVENT_MERGED.chars().any(|ch| ch == '.');
        assert!(has_dot, "event type should use dot separator");

        let has_prefix = EVENT_MERGED.strip_prefix("changeset.").is_some();
        assert!(has_prefix, "event type should start with 'changeset.'");
    }

    #[test]
    fn merged_event_type_is_not_underscore_format() {
        // The event name moved from the underscore form to the dotted form;
        // make sure the old spelling has not crept back in.
        assert_eq!(EVENT_MERGED, "changeset.merged");
        assert_ne!(EVENT_MERGED, "changeset_merged");
    }

    // ── MergeSuccess ────────────────────────────────────────────────

    #[test]
    fn merge_success_construction() {
        let response = success_response("abc123", false, Vec::new());

        if let Some(merge_response::Result::Success(ok)) = response.result {
            assert_eq!(ok.commit_hash, "abc123");
            assert!(!ok.auto_rebased);
            assert!(ok.auto_rebased_files.is_empty());
        } else {
            panic!("expected MergeSuccess");
        }
    }

    #[test]
    fn merge_success_with_rebase() {
        let response = success_response("def456", true, vec!["src/main.rs".to_string()]);

        if let Some(merge_response::Result::Success(ok)) = response.result {
            assert!(ok.auto_rebased);
            assert_eq!(ok.auto_rebased_files, vec!["src/main.rs"]);
        } else {
            panic!("expected MergeSuccess");
        }
    }

    // ── MergeConflict / ConflictDetail ──────────────────────────────

    #[test]
    fn merge_conflict_construction() {
        // The server does not yet fill in `their_agent` (SemanticConflict has
        // no agent attribution), so the fixture leaves it empty as well.
        let only_conflict = ConflictDetail {
            file_path: "src/lib.rs".to_string(),
            symbols: vec!["process_data".to_string()],
            your_agent: "agent-1".to_string(),
            their_agent: String::new(),
            conflict_type: CONFLICT_TYPE_TRUE.to_string(),
            description: "both agents modified process_data".to_string(),
        };
        let actions: Vec<String> = ["adapt", "keep_mine", "keep_theirs"]
            .iter()
            .map(|a| a.to_string())
            .collect();
        let response = MergeResponse {
            result: Some(merge_response::Result::Conflict(MergeConflict {
                changeset_id: "cs-001".to_string(),
                conflicts: vec![only_conflict],
                suggested_action: "adapt".to_string(),
                available_actions: actions,
            })),
        };

        if let Some(merge_response::Result::Conflict(conflict)) = response.result {
            assert_eq!(conflict.changeset_id, "cs-001");
            assert_eq!(conflict.conflicts.len(), 1);

            let first = &conflict.conflicts[0];
            assert_eq!(first.file_path, "src/lib.rs");
            assert_eq!(first.symbols, vec!["process_data"]);
            assert_eq!(first.your_agent, "agent-1");
            assert!(first.their_agent.is_empty());

            assert_eq!(conflict.suggested_action, "adapt");
            assert_eq!(conflict.available_actions.len(), 3);
        } else {
            panic!("expected MergeConflict");
        }
    }

    #[test]
    fn conflict_detail_fields() {
        let symbols = vec!["handle_request".to_string(), "parse_input".to_string()];
        let detail = ConflictDetail {
            file_path: "src/handler.rs".to_string(),
            symbols,
            your_agent: "agent-a".to_string(),
            their_agent: "agent-b".to_string(),
            conflict_type: CONFLICT_TYPE_TRUE.to_string(),
            description: "multiple symbols in conflict".to_string(),
        };

        assert_eq!(detail.symbols.len(), 2);
        assert_eq!(detail.conflict_type, CONFLICT_TYPE_TRUE);
    }

    // ── sanitize_for_proto ──────────────────────────────────────────

    #[test]
    fn sanitize_for_proto_strips_null_bytes() {
        let cases = [("hello\0world", "helloworld"), ("\0\0", ""), ("clean", "clean")];
        for &(input, expected) in cases.iter() {
            assert_eq!(sanitize_for_proto(input), expected);
        }
    }

    #[test]
    fn sanitize_for_proto_preserves_valid_utf8() {
        // Multi-byte characters, and the U+FFFD replacement character that
        // `String::from_utf8_lossy` emits, are all valid UTF-8 and must pass
        // through unchanged.
        let untouched = ["fn résumé()", "日本語", "bad\u{FFFD}char"];
        for &text in untouched.iter() {
            assert_eq!(sanitize_for_proto(text), text);
        }
    }
}