Skip to main content

dk_engine/conflict/
claim_tracker.rs

1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use dashmap::DashMap;
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
7use dk_core::SymbolKind;
8
9/// A claim that a particular session has touched a symbol.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct SymbolClaim {
12    pub session_id: Uuid,
13    pub agent_name: String,
14    pub qualified_name: String,
15    pub kind: SymbolKind,
16    pub first_touched_at: DateTime<Utc>,
17}
18
19/// Information about a detected conflict: another session already claims
20/// ownership of a symbol that the current session wants to modify.
21#[derive(Debug, Clone)]
22pub struct ConflictInfo {
23    pub qualified_name: String,
24    pub kind: SymbolKind,
25    pub conflicting_session: Uuid,
26    pub conflicting_agent: String,
27    pub first_touched_at: DateTime<Utc>,
28}
29
30/// Information about a symbol lock held by another session.
31/// Returned when `acquire_lock` finds the symbol is already locked.
32#[derive(Debug, Clone)]
33pub struct SymbolLocked {
34    pub qualified_name: String,
35    pub kind: SymbolKind,
36    pub locked_by_session: Uuid,
37    pub locked_by_agent: String,
38    pub locked_since: DateTime<Utc>,
39    pub file_path: String,
40}
41
42/// Outcome of a successful `acquire_lock` call.
43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
44pub enum AcquireOutcome {
45    /// Lock freshly acquired — include in rollback list.
46    Fresh,
47    /// Session already held this lock — exclude from rollback.
48    ReAcquired,
49}
50
51/// Result of releasing locks for a session. Contains the symbols that
52/// were released, so callers can emit `symbol.lock.released` events.
53#[derive(Debug, Clone)]
54pub struct ReleasedLock {
55    pub file_path: String,
56    pub qualified_name: String,
57    pub kind: SymbolKind,
58    pub agent_name: String,
59}
60
61// ---------------------------------------------------------------------------
62// ClaimTracker trait
63// ---------------------------------------------------------------------------
64
65/// Async trait for symbol-level claim tracking across sessions.
66///
67/// Implementations:
68/// - [`LocalClaimTracker`] — in-memory DashMap (single-pod / tests)
69/// - `ValkeyClaimTracker` — Valkey/Redis-backed (multi-pod, behind `valkey` feature)
70#[async_trait]
71pub trait ClaimTracker: Send + Sync {
72    /// Record a symbol claim (non-blocking — does not reject on conflict).
73    async fn record_claim(&self, repo_id: Uuid, file_path: &str, claim: SymbolClaim);
74
75    /// Attempt to acquire a symbol lock. Returns `Err(SymbolLocked)` if
76    /// another session already holds the symbol.
77    async fn acquire_lock(
78        &self,
79        repo_id: Uuid,
80        file_path: &str,
81        claim: SymbolClaim,
82    ) -> Result<AcquireOutcome, SymbolLocked>;
83
84    /// Release a single symbol lock for a session in a specific file.
85    async fn release_lock(
86        &self,
87        repo_id: Uuid,
88        file_path: &str,
89        session_id: Uuid,
90        qualified_name: &str,
91    );
92
93    /// Release all locks held by a session for a specific repo.
94    async fn release_locks(&self, repo_id: Uuid, session_id: Uuid) -> Vec<ReleasedLock>;
95
96    /// Check whether any of the given symbols conflict with another session.
97    async fn check_conflicts(
98        &self,
99        repo_id: Uuid,
100        file_path: &str,
101        session_id: Uuid,
102        qualified_names: &[String],
103    ) -> Vec<ConflictInfo>;
104
105    /// Return all conflicts for a session across all file paths.
106    ///
107    /// This method is only meaningful for the non-blocking `record_claim` path
108    /// where multiple sessions can record overlapping claims without rejection.
109    /// Backends that use exclusive locking (e.g. `ValkeyClaimTracker`) may
110    /// return an empty list since cross-session conflicts are prevented at
111    /// write time by `acquire_lock`.
112    async fn get_all_conflicts_for_session(
113        &self,
114        repo_id: Uuid,
115        session_id: Uuid,
116    ) -> Vec<(String, ConflictInfo)>;
117
118    /// Remove all claims belonging to a session across ALL repos.
119    async fn clear_session(&self, session_id: Uuid) -> Vec<ReleasedLock>;
120}
121
122// ---------------------------------------------------------------------------
123// LocalClaimTracker — in-memory DashMap implementation
124// ---------------------------------------------------------------------------
125
126/// Thread-safe, lock-free tracker for symbol-level claims across sessions.
127///
128/// Key insight: two sessions modifying DIFFERENT symbols in the same file is
129/// NOT a conflict. Only same-symbol modifications across sessions are TRUE
130/// conflicts. This is dkod's core differentiator over line-based VCS.
131///
132/// The tracker is keyed by `(repo_id, file_path)` and stores a `Vec<SymbolClaim>`
133/// for each file. DashMap provides fine-grained per-shard locking so reads are
134/// effectively lock-free when not contending on the same shard.
135pub struct LocalClaimTracker {
136    /// Map from (repo_id, file_path) to the list of claims on that file.
137    claims: DashMap<(Uuid, String), Vec<SymbolClaim>>,
138}
139
140impl LocalClaimTracker {
141    pub fn new() -> Self {
142        Self {
143            claims: DashMap::new(),
144        }
145    }
146}
147
148impl Default for LocalClaimTracker {
149    fn default() -> Self {
150        Self::new()
151    }
152}
153
154#[async_trait]
155impl ClaimTracker for LocalClaimTracker {
156    async fn record_claim(&self, repo_id: Uuid, file_path: &str, claim: SymbolClaim) {
157        let key = (repo_id, file_path.to_string());
158        let mut entry = self.claims.entry(key).or_default();
159        let claims = entry.value_mut();
160
161        if let Some(existing) = claims.iter_mut().find(|c| {
162            c.session_id == claim.session_id && c.qualified_name == claim.qualified_name
163        }) {
164            existing.kind = claim.kind;
165            existing.agent_name = claim.agent_name;
166        } else {
167            claims.push(claim);
168        }
169    }
170
171    async fn acquire_lock(
172        &self,
173        repo_id: Uuid,
174        file_path: &str,
175        claim: SymbolClaim,
176    ) -> Result<AcquireOutcome, SymbolLocked> {
177        let key = (repo_id, file_path.to_string());
178        let mut entry = self.claims.entry(key).or_default();
179        let claims = entry.value_mut();
180
181        if let Some(existing) = claims.iter().find(|c| {
182            c.qualified_name == claim.qualified_name && c.session_id != claim.session_id
183        }) {
184            return Err(SymbolLocked {
185                qualified_name: claim.qualified_name,
186                kind: existing.kind.clone(),
187                locked_by_session: existing.session_id,
188                locked_by_agent: existing.agent_name.clone(),
189                locked_since: existing.first_touched_at,
190                file_path: file_path.to_string(),
191            });
192        }
193
194        if let Some(existing) = claims.iter_mut().find(|c| {
195            c.session_id == claim.session_id && c.qualified_name == claim.qualified_name
196        }) {
197            existing.kind = claim.kind;
198            existing.agent_name = claim.agent_name;
199            return Ok(AcquireOutcome::ReAcquired);
200        }
201
202        claims.push(claim);
203        Ok(AcquireOutcome::Fresh)
204    }
205
206    async fn release_lock(
207        &self,
208        repo_id: Uuid,
209        file_path: &str,
210        session_id: Uuid,
211        qualified_name: &str,
212    ) {
213        let key = (repo_id, file_path.to_string());
214        if let Some(mut entry) = self.claims.get_mut(&key) {
215            entry.value_mut().retain(|c| {
216                !(c.session_id == session_id && c.qualified_name == qualified_name)
217            });
218        }
219        self.claims.remove_if(&key, |_, v| v.is_empty());
220    }
221
222    async fn release_locks(&self, repo_id: Uuid, session_id: Uuid) -> Vec<ReleasedLock> {
223        let mut released = Vec::new();
224        let mut empty_keys = Vec::new();
225
226        for mut entry in self.claims.iter_mut() {
227            let key = entry.key().clone();
228            if key.0 != repo_id {
229                continue;
230            }
231            let file_path = &key.1;
232            let claims = entry.value_mut();
233
234            for claim in claims.iter().filter(|c| c.session_id == session_id) {
235                released.push(ReleasedLock {
236                    file_path: file_path.clone(),
237                    qualified_name: claim.qualified_name.clone(),
238                    kind: claim.kind.clone(),
239                    agent_name: claim.agent_name.clone(),
240                });
241            }
242
243            claims.retain(|c| c.session_id != session_id);
244            if claims.is_empty() {
245                empty_keys.push(key);
246            }
247        }
248
249        for key in empty_keys {
250            self.claims.remove_if(&key, |_, v| v.is_empty());
251        }
252
253        released
254    }
255
256    async fn check_conflicts(
257        &self,
258        repo_id: Uuid,
259        file_path: &str,
260        session_id: Uuid,
261        qualified_names: &[String],
262    ) -> Vec<ConflictInfo> {
263        let key = (repo_id, file_path.to_string());
264        let Some(entry) = self.claims.get(&key) else {
265            return Vec::new();
266        };
267
268        let mut conflicts = Vec::new();
269        for name in qualified_names {
270            for claim in entry.value() {
271                if claim.qualified_name == *name && claim.session_id != session_id {
272                    conflicts.push(ConflictInfo {
273                        qualified_name: name.clone(),
274                        kind: claim.kind.clone(),
275                        conflicting_session: claim.session_id,
276                        conflicting_agent: claim.agent_name.clone(),
277                        first_touched_at: claim.first_touched_at,
278                    });
279                    break;
280                }
281            }
282        }
283        conflicts
284    }
285
286    async fn get_all_conflicts_for_session(
287        &self,
288        repo_id: Uuid,
289        session_id: Uuid,
290    ) -> Vec<(String, ConflictInfo)> {
291        let mut results = Vec::new();
292        for entry in self.claims.iter() {
293            let (entry_repo_id, file_path) = entry.key();
294            if *entry_repo_id != repo_id {
295                continue;
296            }
297            let claims = entry.value();
298
299            let my_symbols: Vec<&SymbolClaim> = claims
300                .iter()
301                .filter(|c| c.session_id == session_id)
302                .collect();
303
304            for my_claim in &my_symbols {
305                for other_claim in claims {
306                    if other_claim.session_id != session_id
307                        && other_claim.qualified_name == my_claim.qualified_name
308                    {
309                        results.push((
310                            file_path.clone(),
311                            ConflictInfo {
312                                qualified_name: my_claim.qualified_name.clone(),
313                                kind: my_claim.kind.clone(),
314                                conflicting_session: other_claim.session_id,
315                                conflicting_agent: other_claim.agent_name.clone(),
316                                first_touched_at: other_claim.first_touched_at,
317                            },
318                        ));
319                        break;
320                    }
321                }
322            }
323        }
324        results
325    }
326
327    async fn clear_session(&self, session_id: Uuid) -> Vec<ReleasedLock> {
328        let mut released = Vec::new();
329        let mut empty_keys = Vec::new();
330        for mut entry in self.claims.iter_mut() {
331            let key = entry.key().clone();
332            let file_path = &key.1;
333            let claims = entry.value_mut();
334
335            for claim in claims.iter().filter(|c| c.session_id == session_id) {
336                released.push(ReleasedLock {
337                    file_path: file_path.clone(),
338                    qualified_name: claim.qualified_name.clone(),
339                    kind: claim.kind.clone(),
340                    agent_name: claim.agent_name.clone(),
341                });
342            }
343
344            claims.retain(|c| c.session_id != session_id);
345            if claims.is_empty() {
346                empty_keys.push(key);
347            }
348        }
349        for key in empty_keys {
350            self.claims.remove_if(&key, |_, v| v.is_empty());
351        }
352        released
353    }
354}
355
356#[cfg(test)]
357mod tests {
358    use super::*;
359
360    fn make_claim(session_id: Uuid, agent: &str, name: &str, kind: SymbolKind) -> SymbolClaim {
361        SymbolClaim {
362            session_id,
363            agent_name: agent.to_string(),
364            qualified_name: name.to_string(),
365            kind,
366            first_touched_at: Utc::now(),
367        }
368    }
369
370    #[tokio::test]
371    async fn no_conflict_different_symbols_same_file() {
372        let tracker = LocalClaimTracker::new();
373        let repo = Uuid::new_v4();
374        let session_a = Uuid::new_v4();
375        let session_b = Uuid::new_v4();
376
377        tracker
378            .record_claim(
379                repo,
380                "src/lib.rs",
381                make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
382            )
383            .await;
384
385        let conflicts = tracker
386            .check_conflicts(repo, "src/lib.rs", session_b, &["fn_b".to_string()])
387            .await;
388        assert!(conflicts.is_empty(), "different symbols should not conflict");
389    }
390
391    #[tokio::test]
392    async fn conflict_same_symbol() {
393        let tracker = LocalClaimTracker::new();
394        let repo = Uuid::new_v4();
395        let session_a = Uuid::new_v4();
396        let session_b = Uuid::new_v4();
397
398        tracker
399            .record_claim(
400                repo,
401                "src/lib.rs",
402                make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
403            )
404            .await;
405
406        let conflicts = tracker
407            .check_conflicts(repo, "src/lib.rs", session_b, &["fn_a".to_string()])
408            .await;
409        assert_eq!(conflicts.len(), 1);
410        assert_eq!(conflicts[0].qualified_name, "fn_a");
411        assert_eq!(conflicts[0].conflicting_session, session_a);
412        assert_eq!(conflicts[0].conflicting_agent, "agent-1");
413    }
414
415    #[tokio::test]
416    async fn claims_cleared_on_session_destroy() {
417        let tracker = LocalClaimTracker::new();
418        let repo = Uuid::new_v4();
419        let session_a = Uuid::new_v4();
420        let session_b = Uuid::new_v4();
421
422        tracker
423            .record_claim(
424                repo,
425                "src/lib.rs",
426                make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
427            )
428            .await;
429
430        tracker.clear_session(session_a).await;
431
432        let conflicts = tracker
433            .check_conflicts(repo, "src/lib.rs", session_b, &["fn_a".to_string()])
434            .await;
435        assert!(
436            conflicts.is_empty(),
437            "cleared session should not cause conflicts"
438        );
439    }
440
441    #[tokio::test]
442    async fn same_session_no_self_conflict() {
443        let tracker = LocalClaimTracker::new();
444        let repo = Uuid::new_v4();
445        let session_a = Uuid::new_v4();
446
447        tracker
448            .record_claim(
449                repo,
450                "src/lib.rs",
451                make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
452            )
453            .await;
454        tracker
455            .record_claim(
456                repo,
457                "src/lib.rs",
458                make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
459            )
460            .await;
461
462        let conflicts = tracker
463            .check_conflicts(repo, "src/lib.rs", session_a, &["fn_a".to_string()])
464            .await;
465        assert!(
466            conflicts.is_empty(),
467            "same session should not conflict with itself"
468        );
469    }
470
471    #[tokio::test]
472    async fn multiple_conflicts() {
473        let tracker = LocalClaimTracker::new();
474        let repo = Uuid::new_v4();
475        let session_a = Uuid::new_v4();
476        let session_b = Uuid::new_v4();
477
478        tracker
479            .record_claim(
480                repo,
481                "src/lib.rs",
482                make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
483            )
484            .await;
485        tracker
486            .record_claim(
487                repo,
488                "src/lib.rs",
489                make_claim(session_a, "agent-1", "fn_b", SymbolKind::Function),
490            )
491            .await;
492
493        let conflicts = tracker
494            .check_conflicts(
495                repo,
496                "src/lib.rs",
497                session_b,
498                &["fn_a".to_string(), "fn_b".to_string()],
499            )
500            .await;
501        assert_eq!(conflicts.len(), 2);
502
503        let names: Vec<&str> = conflicts.iter().map(|c| c.qualified_name.as_str()).collect();
504        assert!(names.contains(&"fn_a"));
505        assert!(names.contains(&"fn_b"));
506    }
507
508    #[tokio::test]
509    async fn acquire_lock_unclaimed_succeeds() {
510        let tracker = LocalClaimTracker::new();
511        let repo = Uuid::new_v4();
512        let session = Uuid::new_v4();
513
514        let result = tracker
515            .acquire_lock(
516                repo,
517                "src/lib.rs",
518                make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
519            )
520            .await;
521        assert!(result.is_ok());
522    }
523
524    #[tokio::test]
525    async fn acquire_lock_same_session_succeeds() {
526        let tracker = LocalClaimTracker::new();
527        let repo = Uuid::new_v4();
528        let session = Uuid::new_v4();
529
530        tracker
531            .acquire_lock(
532                repo,
533                "src/lib.rs",
534                make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
535            )
536            .await
537            .unwrap();
538
539        let result = tracker
540            .acquire_lock(
541                repo,
542                "src/lib.rs",
543                make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
544            )
545            .await;
546        assert!(result.is_ok());
547    }
548
549    #[tokio::test]
550    async fn acquire_lock_cross_session_blocked() {
551        let tracker = LocalClaimTracker::new();
552        let repo = Uuid::new_v4();
553        let session_a = Uuid::new_v4();
554        let session_b = Uuid::new_v4();
555
556        tracker
557            .acquire_lock(
558                repo,
559                "src/lib.rs",
560                make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
561            )
562            .await
563            .unwrap();
564
565        let result = tracker
566            .acquire_lock(
567                repo,
568                "src/lib.rs",
569                make_claim(session_b, "agent-2", "fn_a", SymbolKind::Function),
570            )
571            .await;
572        assert!(result.is_err());
573        let locked = result.unwrap_err();
574        assert_eq!(locked.qualified_name, "fn_a");
575        assert_eq!(locked.locked_by_session, session_a);
576        assert_eq!(locked.locked_by_agent, "agent-1");
577    }
578
579    #[tokio::test]
580    async fn acquire_lock_different_symbols_same_file() {
581        let tracker = LocalClaimTracker::new();
582        let repo = Uuid::new_v4();
583        let session_a = Uuid::new_v4();
584        let session_b = Uuid::new_v4();
585
586        tracker
587            .acquire_lock(
588                repo,
589                "src/lib.rs",
590                make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
591            )
592            .await
593            .unwrap();
594
595        let result = tracker
596            .acquire_lock(
597                repo,
598                "src/lib.rs",
599                make_claim(session_b, "agent-2", "fn_b", SymbolKind::Function),
600            )
601            .await;
602        assert!(result.is_ok());
603    }
604
605    #[tokio::test]
606    async fn release_lock_single_symbol() {
607        let tracker = LocalClaimTracker::new();
608        let repo = Uuid::new_v4();
609        let session_a = Uuid::new_v4();
610        let session_b = Uuid::new_v4();
611
612        tracker
613            .acquire_lock(
614                repo,
615                "src/lib.rs",
616                make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
617            )
618            .await
619            .unwrap();
620
621        tracker
622            .release_lock(repo, "src/lib.rs", session_a, "fn_a")
623            .await;
624
625        let result = tracker
626            .acquire_lock(
627                repo,
628                "src/lib.rs",
629                make_claim(session_b, "agent-2", "fn_a", SymbolKind::Function),
630            )
631            .await;
632        assert!(result.is_ok());
633    }
634
635    #[tokio::test]
636    async fn release_lock_cleans_empty_entries() {
637        let tracker = LocalClaimTracker::new();
638        let repo = Uuid::new_v4();
639        let session = Uuid::new_v4();
640
641        tracker
642            .acquire_lock(
643                repo,
644                "src/lib.rs",
645                make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
646            )
647            .await
648            .unwrap();
649
650        tracker
651            .release_lock(repo, "src/lib.rs", session, "fn_a")
652            .await;
653
654        let key = (repo, "src/lib.rs".to_string());
655        assert!(tracker.claims.get(&key).is_none());
656    }
657
658    #[tokio::test]
659    async fn release_locks_returns_released_entries() {
660        let tracker = LocalClaimTracker::new();
661        let repo = Uuid::new_v4();
662        let session = Uuid::new_v4();
663
664        tracker
665            .acquire_lock(
666                repo,
667                "src/lib.rs",
668                make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
669            )
670            .await
671            .unwrap();
672        tracker
673            .acquire_lock(
674                repo,
675                "src/api.rs",
676                make_claim(session, "agent-1", "handler", SymbolKind::Function),
677            )
678            .await
679            .unwrap();
680
681        let released = tracker.release_locks(repo, session).await;
682        assert_eq!(released.len(), 2);
683
684        let names: Vec<&str> = released.iter().map(|r| r.qualified_name.as_str()).collect();
685        assert!(names.contains(&"fn_a"));
686        assert!(names.contains(&"handler"));
687    }
688
689    #[tokio::test]
690    async fn release_locks_unblocks_other_session() {
691        let tracker = LocalClaimTracker::new();
692        let repo = Uuid::new_v4();
693        let session_a = Uuid::new_v4();
694        let session_b = Uuid::new_v4();
695
696        tracker
697            .acquire_lock(
698                repo,
699                "src/lib.rs",
700                make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
701            )
702            .await
703            .unwrap();
704
705        assert!(tracker
706            .acquire_lock(
707                repo,
708                "src/lib.rs",
709                make_claim(session_b, "agent-2", "fn_a", SymbolKind::Function),
710            )
711            .await
712            .is_err());
713
714        tracker.release_locks(repo, session_a).await;
715
716        assert!(tracker
717            .acquire_lock(
718                repo,
719                "src/lib.rs",
720                make_claim(session_b, "agent-2", "fn_a", SymbolKind::Function),
721            )
722            .await
723            .is_ok());
724    }
725}