Skip to main content

dk_engine/conflict/
claim_tracker.rs

1use dashmap::DashMap;
2use std::time::Instant;
3use uuid::Uuid;
4
5use dk_core::SymbolKind;
6
7/// A claim that a particular session has touched a symbol.
8#[derive(Debug, Clone)]
9pub struct SymbolClaim {
10    pub session_id: Uuid,
11    pub agent_name: String,
12    pub qualified_name: String,
13    pub kind: SymbolKind,
14    pub first_touched_at: Instant,
15}
16
17/// Information about a detected conflict: another session already claims
18/// ownership of a symbol that the current session wants to modify.
19#[derive(Debug, Clone)]
20pub struct ConflictInfo {
21    pub qualified_name: String,
22    pub kind: SymbolKind,
23    pub conflicting_session: Uuid,
24    pub conflicting_agent: String,
25    pub first_touched_at: Instant,
26}
27
28/// Information about a symbol lock held by another session.
29/// Returned when `acquire_lock` finds the symbol is already locked.
30#[derive(Debug, Clone)]
31pub struct SymbolLocked {
32    pub qualified_name: String,
33    pub kind: SymbolKind,
34    pub locked_by_session: Uuid,
35    pub locked_by_agent: String,
36    pub locked_since: Instant,
37    pub file_path: String,
38}
39
40/// Outcome of a successful `acquire_lock` call.
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum AcquireOutcome {
43    /// Lock freshly acquired — include in rollback list.
44    Fresh,
45    /// Session already held this lock — exclude from rollback.
46    ReAcquired,
47}
48
49/// Result of releasing locks for a session. Contains the symbols that
50/// were released, so callers can emit `symbol.lock.released` events.
51#[derive(Debug, Clone)]
52pub struct ReleasedLock {
53    pub file_path: String,
54    pub qualified_name: String,
55    pub kind: SymbolKind,
56    pub agent_name: String,
57}
58
59/// Thread-safe, lock-free tracker for symbol-level claims across sessions.
60///
61/// Key insight: two sessions modifying DIFFERENT symbols in the same file is
62/// NOT a conflict. Only same-symbol modifications across sessions are TRUE
63/// conflicts. This is dkod's core differentiator over line-based VCS.
64///
65/// The tracker is keyed by `(repo_id, file_path)` and stores a `Vec<SymbolClaim>`
66/// for each file. DashMap provides fine-grained per-shard locking so reads are
67/// effectively lock-free when not contending on the same shard.
68pub struct SymbolClaimTracker {
69    /// Map from (repo_id, file_path) to the list of claims on that file.
70    claims: DashMap<(Uuid, String), Vec<SymbolClaim>>,
71}
72
73impl SymbolClaimTracker {
74    /// Create a new, empty tracker.
75    pub fn new() -> Self {
76        Self {
77            claims: DashMap::new(),
78        }
79    }
80
81    /// Record a symbol claim. If the same session already claims the same
82    /// `qualified_name` in the same file, the existing claim is updated
83    /// (not duplicated).
84    pub fn record_claim(&self, repo_id: Uuid, file_path: &str, claim: SymbolClaim) {
85        let key = (repo_id, file_path.to_string());
86        let mut entry = self.claims.entry(key).or_default();
87        let claims = entry.value_mut();
88
89        // Deduplicate: same session + same qualified_name → update in place
90        if let Some(existing) = claims.iter_mut().find(|c| {
91            c.session_id == claim.session_id && c.qualified_name == claim.qualified_name
92        }) {
93            existing.kind = claim.kind;
94            existing.agent_name = claim.agent_name;
95            // Keep the original first_touched_at
96        } else {
97            claims.push(claim);
98        }
99    }
100
101    /// Attempt to acquire a symbol lock. If the symbol is already claimed by
102    /// another session, returns `Err(SymbolLocked)` — the write MUST NOT proceed.
103    /// If claimed by the same session, or unclaimed, acquires and returns `Ok(())`.
104    ///
105    /// This is the blocking counterpart to `record_claim`. Use this when writes
106    /// should be rejected if another agent holds the symbol.
107    pub fn acquire_lock(
108        &self,
109        repo_id: Uuid,
110        file_path: &str,
111        claim: SymbolClaim,
112    ) -> Result<AcquireOutcome, SymbolLocked> {
113        let key = (repo_id, file_path.to_string());
114        let mut entry = self.claims.entry(key).or_default();
115        let claims = entry.value_mut();
116
117        // Check if another session already holds this symbol
118        if let Some(existing) = claims.iter().find(|c| {
119            c.qualified_name == claim.qualified_name && c.session_id != claim.session_id
120        }) {
121            return Err(SymbolLocked {
122                qualified_name: claim.qualified_name,
123                kind: existing.kind.clone(),
124                locked_by_session: existing.session_id,
125                locked_by_agent: existing.agent_name.clone(),
126                locked_since: existing.first_touched_at,
127                file_path: file_path.to_string(),
128            });
129        }
130
131        // Same session re-acquisition — update metadata, return ReAcquired
132        if let Some(existing) = claims.iter_mut().find(|c| {
133            c.session_id == claim.session_id && c.qualified_name == claim.qualified_name
134        }) {
135            existing.kind = claim.kind;
136            existing.agent_name = claim.agent_name;
137            return Ok(AcquireOutcome::ReAcquired);
138        }
139
140        // Fresh claim
141        claims.push(claim);
142        Ok(AcquireOutcome::Fresh)
143    }
144
145    /// Release a single symbol lock for a session in a specific file.
146    /// Used to roll back partially-acquired locks when a batch fails.
147    pub fn release_lock(
148        &self,
149        repo_id: Uuid,
150        file_path: &str,
151        session_id: Uuid,
152        qualified_name: &str,
153    ) {
154        let key = (repo_id, file_path.to_string());
155        if let Some(mut entry) = self.claims.get_mut(&key) {
156            entry.value_mut().retain(|c| {
157                !(c.session_id == session_id && c.qualified_name == qualified_name)
158            });
159        }
160        // Clean up empty entries to prevent unbounded growth from repeated rollbacks
161        self.claims.remove_if(&key, |_, v| v.is_empty());
162    }
163
164    /// Release all locks held by a session and return what was released.
165    /// Callers should emit `symbol.lock.released` events for each returned entry.
166    pub fn release_locks(&self, repo_id: Uuid, session_id: Uuid) -> Vec<ReleasedLock> {
167        let mut released = Vec::new();
168        let mut empty_keys = Vec::new();
169
170        for mut entry in self.claims.iter_mut() {
171            let key = entry.key().clone();
172            if key.0 != repo_id {
173                continue;
174            }
175            let file_path = &key.1;
176            let claims = entry.value_mut();
177
178            // Collect released locks before removing
179            for claim in claims.iter().filter(|c| c.session_id == session_id) {
180                released.push(ReleasedLock {
181                    file_path: file_path.clone(),
182                    qualified_name: claim.qualified_name.clone(),
183                    kind: claim.kind.clone(),
184                    agent_name: claim.agent_name.clone(),
185                });
186            }
187
188            claims.retain(|c| c.session_id != session_id);
189            if claims.is_empty() {
190                empty_keys.push(key);
191            }
192        }
193
194        for key in empty_keys {
195            self.claims.remove_if(&key, |_, v| v.is_empty());
196        }
197
198        released
199    }
200
201    /// Check whether any of the given `qualified_names` are already claimed by
202    /// a session other than `session_id`. Returns a `ConflictInfo` for each
203    /// conflicting symbol.
204    pub fn check_conflicts(
205        &self,
206        repo_id: Uuid,
207        file_path: &str,
208        session_id: Uuid,
209        qualified_names: &[String],
210    ) -> Vec<ConflictInfo> {
211        let key = (repo_id, file_path.to_string());
212        let Some(entry) = self.claims.get(&key) else {
213            return Vec::new();
214        };
215
216        let mut conflicts = Vec::new();
217        for name in qualified_names {
218            for claim in entry.value() {
219                if claim.qualified_name == *name && claim.session_id != session_id {
220                    conflicts.push(ConflictInfo {
221                        qualified_name: name.clone(),
222                        kind: claim.kind.clone(),
223                        conflicting_session: claim.session_id,
224                        conflicting_agent: claim.agent_name.clone(),
225                        first_touched_at: claim.first_touched_at,
226                    });
227                    // Only report the first conflicting session per symbol
228                    break;
229                }
230            }
231        }
232        conflicts
233    }
234
235    /// Return all conflicts for a given session across ALL file paths.
236    ///
237    /// This checks every tracked file to find symbols where `session_id` has
238    /// a claim AND another session also claims the same symbol.
239    pub fn get_all_conflicts_for_session(
240        &self,
241        repo_id: Uuid,
242        session_id: Uuid,
243    ) -> Vec<(String, ConflictInfo)> {
244        let mut results = Vec::new();
245        for entry in self.claims.iter() {
246            let (entry_repo_id, file_path) = entry.key();
247            if *entry_repo_id != repo_id {
248                continue;
249            }
250            let claims = entry.value();
251
252            // Find symbols claimed by this session
253            let my_symbols: Vec<&SymbolClaim> = claims
254                .iter()
255                .filter(|c| c.session_id == session_id)
256                .collect();
257
258            for my_claim in &my_symbols {
259                // Check if any OTHER session also claims this symbol
260                for other_claim in claims {
261                    if other_claim.session_id != session_id
262                        && other_claim.qualified_name == my_claim.qualified_name
263                    {
264                        results.push((
265                            file_path.clone(),
266                            ConflictInfo {
267                                qualified_name: my_claim.qualified_name.clone(),
268                                kind: my_claim.kind.clone(),
269                                conflicting_session: other_claim.session_id,
270                                conflicting_agent: other_claim.agent_name.clone(),
271                                first_touched_at: other_claim.first_touched_at,
272                            },
273                        ));
274                        // Only report the first conflicting session per symbol
275                        break;
276                    }
277                }
278            }
279        }
280        results
281    }
282
283    /// Remove all claims belonging to a session across ALL repos (e.g. on
284    /// disconnect or GC). Returns the released locks so callers can emit
285    /// `symbol.lock.released` events to unblock waiting agents.
286    pub fn clear_session(&self, session_id: Uuid) -> Vec<ReleasedLock> {
287        let mut released = Vec::new();
288        let mut empty_keys = Vec::new();
289        for mut entry in self.claims.iter_mut() {
290            let key = entry.key().clone();
291            let file_path = &key.1;
292            let claims = entry.value_mut();
293
294            for claim in claims.iter().filter(|c| c.session_id == session_id) {
295                released.push(ReleasedLock {
296                    file_path: file_path.clone(),
297                    qualified_name: claim.qualified_name.clone(),
298                    kind: claim.kind.clone(),
299                    agent_name: claim.agent_name.clone(),
300                });
301            }
302
303            claims.retain(|c| c.session_id != session_id);
304            if claims.is_empty() {
305                empty_keys.push(key);
306            }
307        }
308        for key in empty_keys {
309            self.claims.remove_if(&key, |_, v| v.is_empty());
310        }
311        released
312    }
313}
314
315impl Default for SymbolClaimTracker {
316    fn default() -> Self {
317        Self::new()
318    }
319}
320
321#[cfg(test)]
322mod tests {
323    use super::*;
324
325    fn make_claim(session_id: Uuid, agent: &str, name: &str, kind: SymbolKind) -> SymbolClaim {
326        SymbolClaim {
327            session_id,
328            agent_name: agent.to_string(),
329            qualified_name: name.to_string(),
330            kind,
331            first_touched_at: Instant::now(),
332        }
333    }
334
335    #[test]
336    fn no_conflict_different_symbols_same_file() {
337        let tracker = SymbolClaimTracker::new();
338        let repo = Uuid::new_v4();
339        let session_a = Uuid::new_v4();
340        let session_b = Uuid::new_v4();
341
342        tracker.record_claim(
343            repo,
344            "src/lib.rs",
345            make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
346        );
347
348        let conflicts = tracker.check_conflicts(
349            repo,
350            "src/lib.rs",
351            session_b,
352            &["fn_b".to_string()],
353        );
354        assert!(conflicts.is_empty(), "different symbols should not conflict");
355    }
356
357    #[test]
358    fn conflict_same_symbol() {
359        let tracker = SymbolClaimTracker::new();
360        let repo = Uuid::new_v4();
361        let session_a = Uuid::new_v4();
362        let session_b = Uuid::new_v4();
363
364        tracker.record_claim(
365            repo,
366            "src/lib.rs",
367            make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
368        );
369
370        let conflicts = tracker.check_conflicts(
371            repo,
372            "src/lib.rs",
373            session_b,
374            &["fn_a".to_string()],
375        );
376        assert_eq!(conflicts.len(), 1);
377        assert_eq!(conflicts[0].qualified_name, "fn_a");
378        assert_eq!(conflicts[0].conflicting_session, session_a);
379        assert_eq!(conflicts[0].conflicting_agent, "agent-1");
380    }
381
382    #[test]
383    fn claims_cleared_on_session_destroy() {
384        let tracker = SymbolClaimTracker::new();
385        let repo = Uuid::new_v4();
386        let session_a = Uuid::new_v4();
387        let session_b = Uuid::new_v4();
388
389        tracker.record_claim(
390            repo,
391            "src/lib.rs",
392            make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
393        );
394
395        tracker.clear_session(session_a);
396
397        let conflicts = tracker.check_conflicts(
398            repo,
399            "src/lib.rs",
400            session_b,
401            &["fn_a".to_string()],
402        );
403        assert!(conflicts.is_empty(), "cleared session should not cause conflicts");
404    }
405
406    #[test]
407    fn same_session_no_self_conflict() {
408        let tracker = SymbolClaimTracker::new();
409        let repo = Uuid::new_v4();
410        let session_a = Uuid::new_v4();
411
412        tracker.record_claim(
413            repo,
414            "src/lib.rs",
415            make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
416        );
417        // Re-write same symbol from same session
418        tracker.record_claim(
419            repo,
420            "src/lib.rs",
421            make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
422        );
423
424        let conflicts = tracker.check_conflicts(
425            repo,
426            "src/lib.rs",
427            session_a,
428            &["fn_a".to_string()],
429        );
430        assert!(conflicts.is_empty(), "same session should not conflict with itself");
431    }
432
433    #[test]
434    fn multiple_conflicts() {
435        let tracker = SymbolClaimTracker::new();
436        let repo = Uuid::new_v4();
437        let session_a = Uuid::new_v4();
438        let session_b = Uuid::new_v4();
439
440        tracker.record_claim(
441            repo,
442            "src/lib.rs",
443            make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
444        );
445        tracker.record_claim(
446            repo,
447            "src/lib.rs",
448            make_claim(session_a, "agent-1", "fn_b", SymbolKind::Function),
449        );
450
451        let conflicts = tracker.check_conflicts(
452            repo,
453            "src/lib.rs",
454            session_b,
455            &["fn_a".to_string(), "fn_b".to_string()],
456        );
457        assert_eq!(conflicts.len(), 2);
458
459        let names: Vec<&str> = conflicts.iter().map(|c| c.qualified_name.as_str()).collect();
460        assert!(names.contains(&"fn_a"));
461        assert!(names.contains(&"fn_b"));
462    }
463
464    // ── acquire_lock tests ──
465
466    #[test]
467    fn acquire_lock_unclaimed_succeeds() {
468        let tracker = SymbolClaimTracker::new();
469        let repo = Uuid::new_v4();
470        let session = Uuid::new_v4();
471
472        let result = tracker.acquire_lock(
473            repo,
474            "src/lib.rs",
475            make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
476        );
477        assert!(result.is_ok());
478    }
479
480    #[test]
481    fn acquire_lock_same_session_succeeds() {
482        let tracker = SymbolClaimTracker::new();
483        let repo = Uuid::new_v4();
484        let session = Uuid::new_v4();
485
486        tracker.acquire_lock(
487            repo,
488            "src/lib.rs",
489            make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
490        ).unwrap();
491
492        // Same session re-acquiring same symbol should succeed
493        let result = tracker.acquire_lock(
494            repo,
495            "src/lib.rs",
496            make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
497        );
498        assert!(result.is_ok());
499    }
500
501    #[test]
502    fn acquire_lock_cross_session_blocked() {
503        let tracker = SymbolClaimTracker::new();
504        let repo = Uuid::new_v4();
505        let session_a = Uuid::new_v4();
506        let session_b = Uuid::new_v4();
507
508        tracker.acquire_lock(
509            repo,
510            "src/lib.rs",
511            make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
512        ).unwrap();
513
514        let result = tracker.acquire_lock(
515            repo,
516            "src/lib.rs",
517            make_claim(session_b, "agent-2", "fn_a", SymbolKind::Function),
518        );
519        assert!(result.is_err());
520        let locked = result.unwrap_err();
521        assert_eq!(locked.qualified_name, "fn_a");
522        assert_eq!(locked.locked_by_session, session_a);
523        assert_eq!(locked.locked_by_agent, "agent-1");
524    }
525
526    #[test]
527    fn acquire_lock_different_symbols_same_file() {
528        let tracker = SymbolClaimTracker::new();
529        let repo = Uuid::new_v4();
530        let session_a = Uuid::new_v4();
531        let session_b = Uuid::new_v4();
532
533        tracker.acquire_lock(
534            repo,
535            "src/lib.rs",
536            make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
537        ).unwrap();
538
539        // Different symbol in same file — should succeed
540        let result = tracker.acquire_lock(
541            repo,
542            "src/lib.rs",
543            make_claim(session_b, "agent-2", "fn_b", SymbolKind::Function),
544        );
545        assert!(result.is_ok());
546    }
547
548    // ── release_lock tests ──
549
550    #[test]
551    fn release_lock_single_symbol() {
552        let tracker = SymbolClaimTracker::new();
553        let repo = Uuid::new_v4();
554        let session_a = Uuid::new_v4();
555        let session_b = Uuid::new_v4();
556
557        tracker.acquire_lock(
558            repo,
559            "src/lib.rs",
560            make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
561        ).unwrap();
562
563        // Release the lock
564        tracker.release_lock(repo, "src/lib.rs", session_a, "fn_a");
565
566        // Now another session should be able to acquire it
567        let result = tracker.acquire_lock(
568            repo,
569            "src/lib.rs",
570            make_claim(session_b, "agent-2", "fn_a", SymbolKind::Function),
571        );
572        assert!(result.is_ok());
573    }
574
575    #[test]
576    fn release_lock_cleans_empty_entries() {
577        let tracker = SymbolClaimTracker::new();
578        let repo = Uuid::new_v4();
579        let session = Uuid::new_v4();
580
581        tracker.acquire_lock(
582            repo,
583            "src/lib.rs",
584            make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
585        ).unwrap();
586
587        tracker.release_lock(repo, "src/lib.rs", session, "fn_a");
588
589        // The key should be removed from the map (no empty vecs lingering)
590        let key = (repo, "src/lib.rs".to_string());
591        assert!(tracker.claims.get(&key).is_none());
592    }
593
594    // ── release_locks tests ──
595
596    #[test]
597    fn release_locks_returns_released_entries() {
598        let tracker = SymbolClaimTracker::new();
599        let repo = Uuid::new_v4();
600        let session = Uuid::new_v4();
601
602        tracker.acquire_lock(
603            repo,
604            "src/lib.rs",
605            make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
606        ).unwrap();
607        tracker.acquire_lock(
608            repo,
609            "src/api.rs",
610            make_claim(session, "agent-1", "handler", SymbolKind::Function),
611        ).unwrap();
612
613        let released = tracker.release_locks(repo, session);
614        assert_eq!(released.len(), 2);
615
616        let names: Vec<&str> = released.iter().map(|r| r.qualified_name.as_str()).collect();
617        assert!(names.contains(&"fn_a"));
618        assert!(names.contains(&"handler"));
619    }
620
621    #[test]
622    fn release_locks_unblocks_other_session() {
623        let tracker = SymbolClaimTracker::new();
624        let repo = Uuid::new_v4();
625        let session_a = Uuid::new_v4();
626        let session_b = Uuid::new_v4();
627
628        tracker.acquire_lock(
629            repo,
630            "src/lib.rs",
631            make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
632        ).unwrap();
633
634        // session_b is blocked
635        assert!(tracker.acquire_lock(
636            repo,
637            "src/lib.rs",
638            make_claim(session_b, "agent-2", "fn_a", SymbolKind::Function),
639        ).is_err());
640
641        // Release session_a
642        tracker.release_locks(repo, session_a);
643
644        // session_b can now acquire
645        assert!(tracker.acquire_lock(
646            repo,
647            "src/lib.rs",
648            make_claim(session_b, "agent-2", "fn_a", SymbolKind::Function),
649        ).is_ok());
650    }
651}