1use std::sync::atomic::{AtomicU32, Ordering};
8use std::sync::Arc;
9
10use dashmap::DashMap;
11use dk_core::{AgentId, RepoId, Result};
12use serde::Serialize;
13use sqlx::PgPool;
14use tokio::time::Instant;
15use uuid::Uuid;
16
17use crate::workspace::cache::{NoOpCache, WorkspaceCache};
18use crate::workspace::session_workspace::{
19 SessionId, SessionWorkspace, WorkspaceMode,
20};
21
22#[derive(Debug, Clone, Serialize)]
26pub struct SessionInfo {
27 pub session_id: Uuid,
28 pub agent_id: String,
29 pub agent_name: String,
30 pub intent: String,
31 pub repo_id: Uuid,
32 pub changeset_id: Uuid,
33 pub state: String,
34 pub elapsed_secs: u64,
35}
36
37const TOUCH_DEBOUNCE: std::time::Duration = std::time::Duration::from_secs(30);
41
42pub struct WorkspaceManager {
52 workspaces: DashMap<SessionId, SessionWorkspace>,
53 agent_counters: DashMap<Uuid, AtomicU32>,
54 db: PgPool,
55 cache: Arc<dyn WorkspaceCache>,
56 last_touched: DashMap<SessionId, Instant>,
58}
59
60impl WorkspaceManager {
61 pub fn new(db: PgPool) -> Self {
63 Self::with_cache(db, Arc::new(NoOpCache))
64 }
65
66 pub fn with_cache(db: PgPool, cache: Arc<dyn WorkspaceCache>) -> Self {
71 Self {
72 workspaces: DashMap::new(),
73 agent_counters: DashMap::new(),
74 db,
75 cache,
76 last_touched: DashMap::new(),
77 }
78 }
79
80 pub fn cache(&self) -> &dyn WorkspaceCache {
82 self.cache.as_ref()
83 }
84
85 pub fn next_agent_name(&self, repo_id: &Uuid) -> String {
89 let counter = self
90 .agent_counters
91 .entry(*repo_id)
92 .or_insert_with(|| AtomicU32::new(0));
93 let n = counter.value().fetch_add(1, Ordering::Relaxed) + 1;
94 format!("agent-{n}")
95 }
96
97 #[allow(clippy::too_many_arguments)]
99 pub async fn create_workspace(
100 &self,
101 session_id: SessionId,
102 repo_id: RepoId,
103 agent_id: AgentId,
104 changeset_id: uuid::Uuid,
105 intent: String,
106 base_commit: String,
107 mode: WorkspaceMode,
108 agent_name: String,
109 ) -> Result<SessionId> {
110 let ws = SessionWorkspace::new(
111 session_id,
112 repo_id,
113 agent_id,
114 changeset_id,
115 intent,
116 base_commit,
117 mode,
118 agent_name,
119 self.db.clone(),
120 )
121 .await?;
122
123 let snapshot = crate::workspace::cache::WorkspaceSnapshot {
126 session_id: ws.session_id,
127 repo_id: ws.repo_id,
128 agent_id: ws.agent_id.clone(),
129 agent_name: ws.agent_name.clone(),
130 changeset_id: ws.changeset_id,
131 intent: ws.intent.clone(),
132 base_commit: ws.base_commit.clone(),
133 state: ws.state.as_str().to_string(),
134 mode: ws.mode.as_str().to_string(),
135 };
136 let cache = self.cache.clone();
137 tokio::spawn(async move {
138 if let Err(e) = cache.cache_workspace(&session_id, &snapshot).await {
139 tracing::warn!("L2 cache write failed on create: {e}");
140 }
141 });
142
143 self.workspaces.insert(session_id, ws);
144 Ok(session_id)
145 }
146
147 pub fn get_workspace(
149 &self,
150 session_id: &SessionId,
151 ) -> Option<dashmap::mapref::one::Ref<'_, SessionId, SessionWorkspace>> {
152 let result = self.workspaces.get(session_id);
153 if result.is_some() {
154 self.touch_in_cache(session_id);
155 }
156 result
157 }
158
159 pub fn get_workspace_mut(
161 &self,
162 session_id: &SessionId,
163 ) -> Option<dashmap::mapref::one::RefMut<'_, SessionId, SessionWorkspace>> {
164 let result = self.workspaces.get_mut(session_id);
165 if result.is_some() {
166 self.touch_in_cache(session_id);
167 }
168 result
169 }
170
171 fn evict_from_cache(&self, session_ids: &[SessionId]) {
174 if let Ok(handle) = tokio::runtime::Handle::try_current() {
175 for &sid in session_ids {
176 let cache = self.cache.clone();
177 handle.spawn(async move {
178 if let Err(e) = cache.evict(&sid).await {
179 tracing::warn!("L2 cache evict failed: {e}");
180 }
181 });
182 }
183 }
184 }
185
186 fn touch_in_cache(&self, session_id: &SessionId) {
189 let now = Instant::now();
190 let should_touch = self
191 .last_touched
192 .get(session_id)
193 .is_none_or(|t| now.duration_since(*t) > TOUCH_DEBOUNCE);
194 if !should_touch {
195 return;
196 }
197 self.last_touched.insert(*session_id, now);
198 if let Ok(handle) = tokio::runtime::Handle::try_current() {
199 let sid = *session_id;
200 let cache = self.cache.clone();
201 handle.spawn(async move {
202 if let Err(e) = cache.touch(&sid).await {
203 tracing::warn!("L2 cache touch failed: {e}");
204 }
205 });
206 }
207 }
208
209 pub fn destroy_workspace(&self, session_id: &SessionId) -> Option<SessionWorkspace> {
211 self.last_touched.remove(session_id);
212 self.evict_from_cache(&[*session_id]);
213 self.workspaces.remove(session_id).map(|(_, ws)| ws)
214 }
215
216 pub fn active_count(&self, repo_id: RepoId) -> usize {
218 self.workspaces
219 .iter()
220 .filter(|entry| entry.value().repo_id == repo_id)
221 .count()
222 }
223
224 pub fn active_sessions_for_repo(
227 &self,
228 repo_id: RepoId,
229 exclude_session: Option<SessionId>,
230 ) -> Vec<SessionId> {
231 self.workspaces
232 .iter()
233 .filter(|entry| {
234 entry.value().repo_id == repo_id
235 && exclude_session.is_none_or(|ex| *entry.key() != ex)
236 })
237 .map(|entry| *entry.key())
238 .collect()
239 }
240
241 pub fn gc_expired(&self) -> Vec<SessionId> {
247 let now = Instant::now();
248 let mut expired = Vec::new();
249
250 self.workspaces.iter().for_each(|entry| {
252 if let WorkspaceMode::Persistent {
253 expires_at: Some(deadline),
254 } = &entry.value().mode
255 {
256 if now >= *deadline {
257 expired.push(*entry.key());
258 }
259 }
260 });
261
262 for sid in &expired {
263 self.last_touched.remove(sid);
264 self.workspaces.remove(sid);
265 }
266 self.evict_from_cache(&expired);
267
268 expired
269 }
270
271 pub fn cleanup_disconnected(&self, active_session_ids: &[uuid::Uuid]) {
274 let to_remove: Vec<uuid::Uuid> = self.workspaces.iter()
275 .filter(|entry| !active_session_ids.contains(entry.key()))
276 .map(|entry| *entry.key())
277 .collect();
278 for sid in &to_remove {
279 self.last_touched.remove(sid);
280 self.workspaces.remove(sid);
281 }
282 self.evict_from_cache(&to_remove);
283 }
284
285 pub fn gc_expired_sessions(
291 &self,
292 idle_ttl: std::time::Duration,
293 max_ttl: std::time::Duration,
294 ) -> Vec<SessionId> {
295 let now = Instant::now();
296 let mut expired = Vec::new();
297
298 self.workspaces.retain(|_session_id, ws| {
299 let idle = now.duration_since(ws.last_active);
300 let total = now.duration_since(ws.created_at);
301
302 if idle > idle_ttl || total > max_ttl {
303 expired.push(ws.session_id);
304 false } else {
306 true }
308 });
309 for sid in &expired {
310 self.last_touched.remove(sid);
311 }
312 self.evict_from_cache(&expired);
313
314 expired
315 }
316
317 #[doc(hidden)]
322 pub fn insert_test_workspace(&self, ws: SessionWorkspace) {
323 let sid = ws.session_id;
324 self.workspaces.insert(sid, ws);
325 }
326
327 pub fn total_active(&self) -> usize {
329 self.workspaces.len()
330 }
331
332 pub fn describe_other_modifiers(
338 &self,
339 file_path: &str,
340 repo_id: RepoId,
341 exclude_session: SessionId,
342 ) -> String {
343 let mut parts: Vec<String> = Vec::new();
344
345 for entry in self.workspaces.iter() {
346 let ws = entry.value();
347 if ws.repo_id != repo_id || ws.session_id == exclude_session {
348 continue;
349 }
350
351 if !ws.overlay.list_paths().contains(&file_path.to_string()) {
353 continue;
354 }
355
356 let symbols = ws.graph.changed_symbols_for_file(file_path);
358 let agent = &ws.agent_name;
359
360 if symbols.is_empty() {
361 parts.push(format!("modified by {agent}"));
362 } else {
363 let sym_list: Vec<&str> = symbols.iter().take(3).map(|s| s.as_str()).collect();
365 let sym_str = sym_list.join(", ");
366 if symbols.len() > 3 {
367 parts.push(format!("{sym_str},... modified by {agent}"));
368 } else {
369 parts.push(format!("{sym_str} modified by {agent}"));
370 }
371 }
372 }
373
374 parts.join("; ")
375 }
376
377 pub fn list_sessions(&self, repo_id: RepoId) -> Vec<SessionInfo> {
379 let now = Instant::now();
380 self.workspaces
381 .iter()
382 .filter(|entry| entry.value().repo_id == repo_id)
383 .map(|entry| {
384 let ws = entry.value();
385 SessionInfo {
386 session_id: ws.session_id,
387 agent_id: ws.agent_id.clone(),
388 agent_name: ws.agent_name.clone(),
389 intent: ws.intent.clone(),
390 repo_id: ws.repo_id,
391 changeset_id: ws.changeset_id,
392 state: ws.state.as_str().to_string(),
393 elapsed_secs: now.duration_since(ws.created_at).as_secs(),
394 }
395 })
396 .collect()
397 }
398}
399
400#[cfg(test)]
401mod tests {
402 use super::*;
403
404 #[test]
405 fn session_info_serializes_to_json() {
406 let info = SessionInfo {
407 session_id: Uuid::nil(),
408 agent_id: "test-agent".to_string(),
409 agent_name: "agent-1".to_string(),
410 intent: "fix bug".to_string(),
411 repo_id: Uuid::nil(),
412 changeset_id: Uuid::nil(),
413 state: "active".to_string(),
414 elapsed_secs: 42,
415 };
416
417 let json = serde_json::to_value(&info).expect("SessionInfo should serialize to JSON");
418
419 assert_eq!(json["agent_id"], "test-agent");
420 assert_eq!(json["agent_name"], "agent-1");
421 assert_eq!(json["intent"], "fix bug");
422 assert_eq!(json["state"], "active");
423 assert_eq!(json["elapsed_secs"], 42);
424 assert_eq!(
425 json["session_id"],
426 "00000000-0000-0000-0000-000000000000"
427 );
428 }
429
430 #[test]
431 fn session_info_all_fields_present_in_json() {
432 let info = SessionInfo {
433 session_id: Uuid::new_v4(),
434 agent_id: "claude".to_string(),
435 agent_name: "agent-1".to_string(),
436 intent: "refactor".to_string(),
437 repo_id: Uuid::new_v4(),
438 changeset_id: Uuid::new_v4(),
439 state: "submitted".to_string(),
440 elapsed_secs: 100,
441 };
442
443 let json = serde_json::to_value(&info).expect("serialize");
444 let obj = json.as_object().expect("should be an object");
445
446 let expected_keys = [
447 "session_id",
448 "agent_id",
449 "agent_name",
450 "intent",
451 "repo_id",
452 "changeset_id",
453 "state",
454 "elapsed_secs",
455 ];
456 for key in &expected_keys {
457 assert!(obj.contains_key(*key), "missing key: {}", key);
458 }
459 assert_eq!(obj.len(), expected_keys.len(), "unexpected extra keys in SessionInfo JSON");
460 }
461
462 #[test]
463 fn session_info_clone_preserves_values() {
464 let info = SessionInfo {
465 session_id: Uuid::new_v4(),
466 agent_id: "agent-1".to_string(),
467 agent_name: "feature-bot".to_string(),
468 intent: "deploy".to_string(),
469 repo_id: Uuid::new_v4(),
470 changeset_id: Uuid::new_v4(),
471 state: "active".to_string(),
472 elapsed_secs: 5,
473 };
474
475 let cloned = info.clone();
476 assert_eq!(info.session_id, cloned.session_id);
477 assert_eq!(info.agent_id, cloned.agent_id);
478 assert_eq!(info.agent_name, cloned.agent_name);
479 assert_eq!(info.intent, cloned.intent);
480 assert_eq!(info.repo_id, cloned.repo_id);
481 assert_eq!(info.changeset_id, cloned.changeset_id);
482 assert_eq!(info.state, cloned.state);
483 assert_eq!(info.elapsed_secs, cloned.elapsed_secs);
484 }
485
486 #[tokio::test]
487 async fn next_agent_name_increments_per_repo() {
488 let db = PgPool::connect_lazy("postgres://localhost/nonexistent").unwrap();
489 let mgr = WorkspaceManager::new(db);
490 let repo1 = Uuid::new_v4();
491 let repo2 = Uuid::new_v4();
492
493 assert_eq!(mgr.next_agent_name(&repo1), "agent-1");
494 assert_eq!(mgr.next_agent_name(&repo1), "agent-2");
495 assert_eq!(mgr.next_agent_name(&repo1), "agent-3");
496
497 assert_eq!(mgr.next_agent_name(&repo2), "agent-1");
499 assert_eq!(mgr.next_agent_name(&repo2), "agent-2");
500
501 assert_eq!(mgr.next_agent_name(&repo1), "agent-4");
503 }
504
505 #[test]
510 #[ignore]
511 fn list_sessions_returns_empty_for_unknown_repo() {
512 }
515
516 #[tokio::test]
517 async fn describe_other_modifiers_empty_when_no_other_sessions() {
518 let db = PgPool::connect_lazy("postgres://localhost/nonexistent").unwrap();
519 let mgr = WorkspaceManager::new(db);
520 let repo_id = Uuid::new_v4();
521 let session_id = Uuid::new_v4();
522
523 let result = mgr.describe_other_modifiers("src/lib.rs", repo_id, session_id);
524 assert!(result.is_empty());
525 }
526
527 #[tokio::test]
528 async fn describe_other_modifiers_shows_agent_name() {
529 use crate::workspace::session_workspace::{SessionWorkspace, WorkspaceMode};
530
531 let db = PgPool::connect_lazy("postgres://localhost/nonexistent").unwrap();
532 let mgr = WorkspaceManager::new(db);
533 let repo_id = Uuid::new_v4();
534
535 let session1 = Uuid::new_v4();
536 let session2 = Uuid::new_v4();
537
538 let mut ws2 = SessionWorkspace::new_test(
539 session2,
540 repo_id,
541 "agent-2-id".to_string(),
542 "fix bug".to_string(),
543 "abc123".to_string(),
544 WorkspaceMode::Ephemeral,
545 );
546 ws2.agent_name = "agent-2".to_string();
547 ws2.overlay.write_local("src/lib.rs", b"content".to_vec(), false);
548
549 mgr.insert_test_workspace(ws2);
550
551 let result = mgr.describe_other_modifiers("src/lib.rs", repo_id, session1);
552 assert_eq!(result, "modified by agent-2");
553
554 let result2 = mgr.describe_other_modifiers("src/other.rs", repo_id, session1);
555 assert!(result2.is_empty());
556
557 let result3 = mgr.describe_other_modifiers("src/lib.rs", repo_id, session2);
558 assert!(result3.is_empty());
559 }
560
561 #[tokio::test]
562 async fn describe_other_modifiers_includes_symbols() {
563 use crate::workspace::session_workspace::{SessionWorkspace, WorkspaceMode};
564 use dk_core::{Span, Symbol, SymbolKind, Visibility};
565 use std::path::PathBuf;
566
567 let db = PgPool::connect_lazy("postgres://localhost/nonexistent").unwrap();
568 let mgr = WorkspaceManager::new(db);
569 let repo_id = Uuid::new_v4();
570
571 let session1 = Uuid::new_v4();
572 let session2 = Uuid::new_v4();
573
574 let mut ws2 = SessionWorkspace::new_test(
575 session2,
576 repo_id,
577 "agent-2-id".to_string(),
578 "add feature".to_string(),
579 "abc123".to_string(),
580 WorkspaceMode::Ephemeral,
581 );
582 ws2.agent_name = "agent-2".to_string();
583 ws2.overlay
584 .write_local("src/tasks.rs", b"fn create_task() {}".to_vec(), true);
585 ws2.graph.add_symbol(Symbol {
586 id: Uuid::new_v4(),
587 name: "create_task".to_string(),
588 qualified_name: "create_task".to_string(),
589 kind: SymbolKind::Function,
590 visibility: Visibility::Public,
591 file_path: PathBuf::from("src/tasks.rs"),
592 span: Span {
593 start_byte: 0,
594 end_byte: 20,
595 },
596 signature: None,
597 doc_comment: None,
598 parent: None,
599 last_modified_by: None,
600 last_modified_intent: None,
601 });
602
603 mgr.insert_test_workspace(ws2);
604
605 let result = mgr.describe_other_modifiers("src/tasks.rs", repo_id, session1);
606 assert_eq!(result, "create_task modified by agent-2");
607 }
608}