1use async_trait::async_trait;
2use chrono::{DateTime, Utc};
3use dashmap::DashMap;
4use serde::{Deserialize, Serialize};
5use uuid::Uuid;
6
7use dk_core::SymbolKind;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct SymbolClaim {
12 pub session_id: Uuid,
13 pub agent_name: String,
14 pub qualified_name: String,
15 pub kind: SymbolKind,
16 pub first_touched_at: DateTime<Utc>,
17}
18
19#[derive(Debug, Clone)]
22pub struct ConflictInfo {
23 pub qualified_name: String,
24 pub kind: SymbolKind,
25 pub conflicting_session: Uuid,
26 pub conflicting_agent: String,
27 pub first_touched_at: DateTime<Utc>,
28}
29
30#[derive(Debug, Clone)]
33pub struct SymbolLocked {
34 pub qualified_name: String,
35 pub kind: SymbolKind,
36 pub locked_by_session: Uuid,
37 pub locked_by_agent: String,
38 pub locked_since: DateTime<Utc>,
39 pub file_path: String,
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
44pub enum AcquireOutcome {
45 Fresh,
47 ReAcquired,
49}
50
51#[derive(Debug, Clone)]
54pub struct ReleasedLock {
55 pub file_path: String,
56 pub qualified_name: String,
57 pub kind: SymbolKind,
58 pub agent_name: String,
59}
60
61#[async_trait]
71pub trait ClaimTracker: Send + Sync {
72 async fn record_claim(&self, repo_id: Uuid, file_path: &str, claim: SymbolClaim);
74
75 async fn acquire_lock(
78 &self,
79 repo_id: Uuid,
80 file_path: &str,
81 claim: SymbolClaim,
82 ) -> Result<AcquireOutcome, SymbolLocked>;
83
84 async fn release_lock(
86 &self,
87 repo_id: Uuid,
88 file_path: &str,
89 session_id: Uuid,
90 qualified_name: &str,
91 );
92
93 async fn release_locks(&self, repo_id: Uuid, session_id: Uuid) -> Vec<ReleasedLock>;
95
96 async fn check_conflicts(
98 &self,
99 repo_id: Uuid,
100 file_path: &str,
101 session_id: Uuid,
102 qualified_names: &[String],
103 ) -> Vec<ConflictInfo>;
104
105 async fn get_all_conflicts_for_session(
113 &self,
114 repo_id: Uuid,
115 session_id: Uuid,
116 ) -> Vec<(String, ConflictInfo)>;
117
118 async fn clear_session(&self, session_id: Uuid) -> Vec<ReleasedLock>;
120}
121
122pub struct LocalClaimTracker {
136 claims: DashMap<(Uuid, String), Vec<SymbolClaim>>,
138}
139
140impl LocalClaimTracker {
141 pub fn new() -> Self {
142 Self {
143 claims: DashMap::new(),
144 }
145 }
146}
147
148impl Default for LocalClaimTracker {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
154#[async_trait]
155impl ClaimTracker for LocalClaimTracker {
156 async fn record_claim(&self, repo_id: Uuid, file_path: &str, claim: SymbolClaim) {
157 let key = (repo_id, file_path.to_string());
158 let mut entry = self.claims.entry(key).or_default();
159 let claims = entry.value_mut();
160
161 if let Some(existing) = claims.iter_mut().find(|c| {
162 c.session_id == claim.session_id && c.qualified_name == claim.qualified_name
163 }) {
164 existing.kind = claim.kind;
165 existing.agent_name = claim.agent_name;
166 } else {
167 claims.push(claim);
168 }
169 }
170
171 async fn acquire_lock(
172 &self,
173 repo_id: Uuid,
174 file_path: &str,
175 claim: SymbolClaim,
176 ) -> Result<AcquireOutcome, SymbolLocked> {
177 let key = (repo_id, file_path.to_string());
178 let mut entry = self.claims.entry(key).or_default();
179 let claims = entry.value_mut();
180
181 if let Some(existing) = claims.iter().find(|c| {
182 c.qualified_name == claim.qualified_name && c.session_id != claim.session_id
183 }) {
184 return Err(SymbolLocked {
185 qualified_name: claim.qualified_name,
186 kind: existing.kind.clone(),
187 locked_by_session: existing.session_id,
188 locked_by_agent: existing.agent_name.clone(),
189 locked_since: existing.first_touched_at,
190 file_path: file_path.to_string(),
191 });
192 }
193
194 if let Some(existing) = claims.iter_mut().find(|c| {
195 c.session_id == claim.session_id && c.qualified_name == claim.qualified_name
196 }) {
197 existing.kind = claim.kind;
198 existing.agent_name = claim.agent_name;
199 return Ok(AcquireOutcome::ReAcquired);
200 }
201
202 claims.push(claim);
203 Ok(AcquireOutcome::Fresh)
204 }
205
206 async fn release_lock(
207 &self,
208 repo_id: Uuid,
209 file_path: &str,
210 session_id: Uuid,
211 qualified_name: &str,
212 ) {
213 let key = (repo_id, file_path.to_string());
214 if let Some(mut entry) = self.claims.get_mut(&key) {
215 entry.value_mut().retain(|c| {
216 !(c.session_id == session_id && c.qualified_name == qualified_name)
217 });
218 }
219 self.claims.remove_if(&key, |_, v| v.is_empty());
220 }
221
222 async fn release_locks(&self, repo_id: Uuid, session_id: Uuid) -> Vec<ReleasedLock> {
223 let mut released = Vec::new();
224 let mut empty_keys = Vec::new();
225
226 for mut entry in self.claims.iter_mut() {
227 let key = entry.key().clone();
228 if key.0 != repo_id {
229 continue;
230 }
231 let file_path = &key.1;
232 let claims = entry.value_mut();
233
234 for claim in claims.iter().filter(|c| c.session_id == session_id) {
235 released.push(ReleasedLock {
236 file_path: file_path.clone(),
237 qualified_name: claim.qualified_name.clone(),
238 kind: claim.kind.clone(),
239 agent_name: claim.agent_name.clone(),
240 });
241 }
242
243 claims.retain(|c| c.session_id != session_id);
244 if claims.is_empty() {
245 empty_keys.push(key);
246 }
247 }
248
249 for key in empty_keys {
250 self.claims.remove_if(&key, |_, v| v.is_empty());
251 }
252
253 released
254 }
255
256 async fn check_conflicts(
257 &self,
258 repo_id: Uuid,
259 file_path: &str,
260 session_id: Uuid,
261 qualified_names: &[String],
262 ) -> Vec<ConflictInfo> {
263 let key = (repo_id, file_path.to_string());
264 let Some(entry) = self.claims.get(&key) else {
265 return Vec::new();
266 };
267
268 let mut conflicts = Vec::new();
269 for name in qualified_names {
270 for claim in entry.value() {
271 if claim.qualified_name == *name && claim.session_id != session_id {
272 conflicts.push(ConflictInfo {
273 qualified_name: name.clone(),
274 kind: claim.kind.clone(),
275 conflicting_session: claim.session_id,
276 conflicting_agent: claim.agent_name.clone(),
277 first_touched_at: claim.first_touched_at,
278 });
279 break;
280 }
281 }
282 }
283 conflicts
284 }
285
286 async fn get_all_conflicts_for_session(
287 &self,
288 repo_id: Uuid,
289 session_id: Uuid,
290 ) -> Vec<(String, ConflictInfo)> {
291 let mut results = Vec::new();
292 for entry in self.claims.iter() {
293 let (entry_repo_id, file_path) = entry.key();
294 if *entry_repo_id != repo_id {
295 continue;
296 }
297 let claims = entry.value();
298
299 let my_symbols: Vec<&SymbolClaim> = claims
300 .iter()
301 .filter(|c| c.session_id == session_id)
302 .collect();
303
304 for my_claim in &my_symbols {
305 for other_claim in claims {
306 if other_claim.session_id != session_id
307 && other_claim.qualified_name == my_claim.qualified_name
308 {
309 results.push((
310 file_path.clone(),
311 ConflictInfo {
312 qualified_name: my_claim.qualified_name.clone(),
313 kind: my_claim.kind.clone(),
314 conflicting_session: other_claim.session_id,
315 conflicting_agent: other_claim.agent_name.clone(),
316 first_touched_at: other_claim.first_touched_at,
317 },
318 ));
319 break;
320 }
321 }
322 }
323 }
324 results
325 }
326
327 async fn clear_session(&self, session_id: Uuid) -> Vec<ReleasedLock> {
328 let mut released = Vec::new();
329 let mut empty_keys = Vec::new();
330 for mut entry in self.claims.iter_mut() {
331 let key = entry.key().clone();
332 let file_path = &key.1;
333 let claims = entry.value_mut();
334
335 for claim in claims.iter().filter(|c| c.session_id == session_id) {
336 released.push(ReleasedLock {
337 file_path: file_path.clone(),
338 qualified_name: claim.qualified_name.clone(),
339 kind: claim.kind.clone(),
340 agent_name: claim.agent_name.clone(),
341 });
342 }
343
344 claims.retain(|c| c.session_id != session_id);
345 if claims.is_empty() {
346 empty_keys.push(key);
347 }
348 }
349 for key in empty_keys {
350 self.claims.remove_if(&key, |_, v| v.is_empty());
351 }
352 released
353 }
354}
355
356#[cfg(test)]
357mod tests {
358 use super::*;
359
360 fn make_claim(session_id: Uuid, agent: &str, name: &str, kind: SymbolKind) -> SymbolClaim {
361 SymbolClaim {
362 session_id,
363 agent_name: agent.to_string(),
364 qualified_name: name.to_string(),
365 kind,
366 first_touched_at: Utc::now(),
367 }
368 }
369
370 #[tokio::test]
371 async fn no_conflict_different_symbols_same_file() {
372 let tracker = LocalClaimTracker::new();
373 let repo = Uuid::new_v4();
374 let session_a = Uuid::new_v4();
375 let session_b = Uuid::new_v4();
376
377 tracker
378 .record_claim(
379 repo,
380 "src/lib.rs",
381 make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
382 )
383 .await;
384
385 let conflicts = tracker
386 .check_conflicts(repo, "src/lib.rs", session_b, &["fn_b".to_string()])
387 .await;
388 assert!(conflicts.is_empty(), "different symbols should not conflict");
389 }
390
391 #[tokio::test]
392 async fn conflict_same_symbol() {
393 let tracker = LocalClaimTracker::new();
394 let repo = Uuid::new_v4();
395 let session_a = Uuid::new_v4();
396 let session_b = Uuid::new_v4();
397
398 tracker
399 .record_claim(
400 repo,
401 "src/lib.rs",
402 make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
403 )
404 .await;
405
406 let conflicts = tracker
407 .check_conflicts(repo, "src/lib.rs", session_b, &["fn_a".to_string()])
408 .await;
409 assert_eq!(conflicts.len(), 1);
410 assert_eq!(conflicts[0].qualified_name, "fn_a");
411 assert_eq!(conflicts[0].conflicting_session, session_a);
412 assert_eq!(conflicts[0].conflicting_agent, "agent-1");
413 }
414
415 #[tokio::test]
416 async fn claims_cleared_on_session_destroy() {
417 let tracker = LocalClaimTracker::new();
418 let repo = Uuid::new_v4();
419 let session_a = Uuid::new_v4();
420 let session_b = Uuid::new_v4();
421
422 tracker
423 .record_claim(
424 repo,
425 "src/lib.rs",
426 make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
427 )
428 .await;
429
430 tracker.clear_session(session_a).await;
431
432 let conflicts = tracker
433 .check_conflicts(repo, "src/lib.rs", session_b, &["fn_a".to_string()])
434 .await;
435 assert!(
436 conflicts.is_empty(),
437 "cleared session should not cause conflicts"
438 );
439 }
440
441 #[tokio::test]
442 async fn same_session_no_self_conflict() {
443 let tracker = LocalClaimTracker::new();
444 let repo = Uuid::new_v4();
445 let session_a = Uuid::new_v4();
446
447 tracker
448 .record_claim(
449 repo,
450 "src/lib.rs",
451 make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
452 )
453 .await;
454 tracker
455 .record_claim(
456 repo,
457 "src/lib.rs",
458 make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
459 )
460 .await;
461
462 let conflicts = tracker
463 .check_conflicts(repo, "src/lib.rs", session_a, &["fn_a".to_string()])
464 .await;
465 assert!(
466 conflicts.is_empty(),
467 "same session should not conflict with itself"
468 );
469 }
470
471 #[tokio::test]
472 async fn multiple_conflicts() {
473 let tracker = LocalClaimTracker::new();
474 let repo = Uuid::new_v4();
475 let session_a = Uuid::new_v4();
476 let session_b = Uuid::new_v4();
477
478 tracker
479 .record_claim(
480 repo,
481 "src/lib.rs",
482 make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
483 )
484 .await;
485 tracker
486 .record_claim(
487 repo,
488 "src/lib.rs",
489 make_claim(session_a, "agent-1", "fn_b", SymbolKind::Function),
490 )
491 .await;
492
493 let conflicts = tracker
494 .check_conflicts(
495 repo,
496 "src/lib.rs",
497 session_b,
498 &["fn_a".to_string(), "fn_b".to_string()],
499 )
500 .await;
501 assert_eq!(conflicts.len(), 2);
502
503 let names: Vec<&str> = conflicts.iter().map(|c| c.qualified_name.as_str()).collect();
504 assert!(names.contains(&"fn_a"));
505 assert!(names.contains(&"fn_b"));
506 }
507
508 #[tokio::test]
509 async fn acquire_lock_unclaimed_succeeds() {
510 let tracker = LocalClaimTracker::new();
511 let repo = Uuid::new_v4();
512 let session = Uuid::new_v4();
513
514 let result = tracker
515 .acquire_lock(
516 repo,
517 "src/lib.rs",
518 make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
519 )
520 .await;
521 assert!(result.is_ok());
522 }
523
524 #[tokio::test]
525 async fn acquire_lock_same_session_succeeds() {
526 let tracker = LocalClaimTracker::new();
527 let repo = Uuid::new_v4();
528 let session = Uuid::new_v4();
529
530 tracker
531 .acquire_lock(
532 repo,
533 "src/lib.rs",
534 make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
535 )
536 .await
537 .unwrap();
538
539 let result = tracker
540 .acquire_lock(
541 repo,
542 "src/lib.rs",
543 make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
544 )
545 .await;
546 assert!(result.is_ok());
547 }
548
549 #[tokio::test]
550 async fn acquire_lock_cross_session_blocked() {
551 let tracker = LocalClaimTracker::new();
552 let repo = Uuid::new_v4();
553 let session_a = Uuid::new_v4();
554 let session_b = Uuid::new_v4();
555
556 tracker
557 .acquire_lock(
558 repo,
559 "src/lib.rs",
560 make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
561 )
562 .await
563 .unwrap();
564
565 let result = tracker
566 .acquire_lock(
567 repo,
568 "src/lib.rs",
569 make_claim(session_b, "agent-2", "fn_a", SymbolKind::Function),
570 )
571 .await;
572 assert!(result.is_err());
573 let locked = result.unwrap_err();
574 assert_eq!(locked.qualified_name, "fn_a");
575 assert_eq!(locked.locked_by_session, session_a);
576 assert_eq!(locked.locked_by_agent, "agent-1");
577 }
578
579 #[tokio::test]
580 async fn acquire_lock_different_symbols_same_file() {
581 let tracker = LocalClaimTracker::new();
582 let repo = Uuid::new_v4();
583 let session_a = Uuid::new_v4();
584 let session_b = Uuid::new_v4();
585
586 tracker
587 .acquire_lock(
588 repo,
589 "src/lib.rs",
590 make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
591 )
592 .await
593 .unwrap();
594
595 let result = tracker
596 .acquire_lock(
597 repo,
598 "src/lib.rs",
599 make_claim(session_b, "agent-2", "fn_b", SymbolKind::Function),
600 )
601 .await;
602 assert!(result.is_ok());
603 }
604
605 #[tokio::test]
606 async fn release_lock_single_symbol() {
607 let tracker = LocalClaimTracker::new();
608 let repo = Uuid::new_v4();
609 let session_a = Uuid::new_v4();
610 let session_b = Uuid::new_v4();
611
612 tracker
613 .acquire_lock(
614 repo,
615 "src/lib.rs",
616 make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
617 )
618 .await
619 .unwrap();
620
621 tracker
622 .release_lock(repo, "src/lib.rs", session_a, "fn_a")
623 .await;
624
625 let result = tracker
626 .acquire_lock(
627 repo,
628 "src/lib.rs",
629 make_claim(session_b, "agent-2", "fn_a", SymbolKind::Function),
630 )
631 .await;
632 assert!(result.is_ok());
633 }
634
635 #[tokio::test]
636 async fn release_lock_cleans_empty_entries() {
637 let tracker = LocalClaimTracker::new();
638 let repo = Uuid::new_v4();
639 let session = Uuid::new_v4();
640
641 tracker
642 .acquire_lock(
643 repo,
644 "src/lib.rs",
645 make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
646 )
647 .await
648 .unwrap();
649
650 tracker
651 .release_lock(repo, "src/lib.rs", session, "fn_a")
652 .await;
653
654 let key = (repo, "src/lib.rs".to_string());
655 assert!(tracker.claims.get(&key).is_none());
656 }
657
658 #[tokio::test]
659 async fn release_locks_returns_released_entries() {
660 let tracker = LocalClaimTracker::new();
661 let repo = Uuid::new_v4();
662 let session = Uuid::new_v4();
663
664 tracker
665 .acquire_lock(
666 repo,
667 "src/lib.rs",
668 make_claim(session, "agent-1", "fn_a", SymbolKind::Function),
669 )
670 .await
671 .unwrap();
672 tracker
673 .acquire_lock(
674 repo,
675 "src/api.rs",
676 make_claim(session, "agent-1", "handler", SymbolKind::Function),
677 )
678 .await
679 .unwrap();
680
681 let released = tracker.release_locks(repo, session).await;
682 assert_eq!(released.len(), 2);
683
684 let names: Vec<&str> = released.iter().map(|r| r.qualified_name.as_str()).collect();
685 assert!(names.contains(&"fn_a"));
686 assert!(names.contains(&"handler"));
687 }
688
689 #[tokio::test]
690 async fn release_locks_unblocks_other_session() {
691 let tracker = LocalClaimTracker::new();
692 let repo = Uuid::new_v4();
693 let session_a = Uuid::new_v4();
694 let session_b = Uuid::new_v4();
695
696 tracker
697 .acquire_lock(
698 repo,
699 "src/lib.rs",
700 make_claim(session_a, "agent-1", "fn_a", SymbolKind::Function),
701 )
702 .await
703 .unwrap();
704
705 assert!(tracker
706 .acquire_lock(
707 repo,
708 "src/lib.rs",
709 make_claim(session_b, "agent-2", "fn_a", SymbolKind::Function),
710 )
711 .await
712 .is_err());
713
714 tracker.release_locks(repo, session_a).await;
715
716 assert!(tracker
717 .acquire_lock(
718 repo,
719 "src/lib.rs",
720 make_claim(session_b, "agent-2", "fn_a", SymbolKind::Function),
721 )
722 .await
723 .is_ok());
724 }
725}