Skip to main content

dk_engine/workspace/
session_manager.rs

1//! WorkspaceManager — manages all active session workspaces.
2//!
3//! Provides creation, lookup, destruction, and garbage collection of
4//! workspaces. Uses `DashMap` for lock-free concurrent access from
5//! multiple agent sessions.
6
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::sync::Arc;
9
10use dashmap::DashMap;
11use dk_core::{AgentId, RepoId, Result};
12use serde::Serialize;
13use sqlx::PgPool;
14use tokio::time::Instant;
15use uuid::Uuid;
16
17use crate::workspace::cache::{NoOpCache, WorkspaceCache};
18use crate::workspace::session_workspace::{
19    SessionId, SessionWorkspace, WorkspaceMode,
20};
21
22// ── SessionInfo ─────────────────────────────────────────────────────
23
24/// Lightweight snapshot of a session workspace, suitable for JSON serialization.
25#[derive(Debug, Clone, Serialize)]
26pub struct SessionInfo {
27    pub session_id: Uuid,
28    pub agent_id: String,
29    pub agent_name: String,
30    pub intent: String,
31    pub repo_id: Uuid,
32    pub changeset_id: Uuid,
33    pub state: String,
34    pub elapsed_secs: u64,
35}
36
37// ── WorkspaceManager ─────────────────────────────────────────────────
38
39/// Minimum interval between L2 cache touch calls per session.
40const TOUCH_DEBOUNCE: std::time::Duration = std::time::Duration::from_secs(30);
41
42/// Central registry of all active session workspaces.
43///
44/// Thread-safe via `DashMap`; every public method is either `&self` or
45/// returns a scoped reference guard.
46///
47/// The optional `cache` field holds an [`Arc`]-wrapped [`WorkspaceCache`]
48/// implementation. In single-pod deployments the default [`NoOpCache`] is
49/// used. Multi-pod deployments can supply a `ValkeyCache` (or any other
50/// implementation) via [`WorkspaceManager::with_cache`].
51pub struct WorkspaceManager {
52    workspaces: DashMap<SessionId, SessionWorkspace>,
53    agent_counters: DashMap<Uuid, AtomicU32>,
54    db: PgPool,
55    cache: Arc<dyn WorkspaceCache>,
56    /// Tracks when each session was last touched in L2 cache to debounce.
57    last_touched: DashMap<SessionId, Instant>,
58}
59
60impl WorkspaceManager {
61    /// Create a new, empty workspace manager backed by [`NoOpCache`].
62    pub fn new(db: PgPool) -> Self {
63        Self::with_cache(db, Arc::new(NoOpCache))
64    }
65
66    /// Create a workspace manager with an explicit cache implementation.
67    ///
68    /// Use this constructor when a `ValkeyCache` or other L2 cache is
69    /// available. Pass `Arc::new(NoOpCache)` to opt-out of caching.
70    pub fn with_cache(db: PgPool, cache: Arc<dyn WorkspaceCache>) -> Self {
71        Self {
72            workspaces: DashMap::new(),
73            agent_counters: DashMap::new(),
74            db,
75            cache,
76            last_touched: DashMap::new(),
77        }
78    }
79
80    /// Return a reference to the underlying cache implementation.
81    pub fn cache(&self) -> &dyn WorkspaceCache {
82        self.cache.as_ref()
83    }
84
85    /// Auto-assign the next agent name for a repository.
86    ///
87    /// Returns "agent-1", "agent-2", etc. incrementing per repo.
88    pub fn next_agent_name(&self, repo_id: &Uuid) -> String {
89        let counter = self
90            .agent_counters
91            .entry(*repo_id)
92            .or_insert_with(|| AtomicU32::new(0));
93        let n = counter.value().fetch_add(1, Ordering::Relaxed) + 1;
94        format!("agent-{n}")
95    }
96
97    /// Create a new workspace for a session and register it.
98    #[allow(clippy::too_many_arguments)]
99    pub async fn create_workspace(
100        &self,
101        session_id: SessionId,
102        repo_id: RepoId,
103        agent_id: AgentId,
104        changeset_id: uuid::Uuid,
105        intent: String,
106        base_commit: String,
107        mode: WorkspaceMode,
108        agent_name: String,
109    ) -> Result<SessionId> {
110        let ws = SessionWorkspace::new(
111            session_id,
112            repo_id,
113            agent_id,
114            changeset_id,
115            intent,
116            base_commit,
117            mode,
118            agent_name,
119            self.db.clone(),
120        )
121        .await?;
122
123        // Write-through to L2 cache (fire-and-forget — Valkey failure
124        // does not block workspace creation).
125        let snapshot = crate::workspace::cache::WorkspaceSnapshot {
126            session_id: ws.session_id,
127            repo_id: ws.repo_id,
128            agent_id: ws.agent_id.clone(),
129            agent_name: ws.agent_name.clone(),
130            changeset_id: ws.changeset_id,
131            intent: ws.intent.clone(),
132            base_commit: ws.base_commit.clone(),
133            state: ws.state.as_str().to_string(),
134            mode: ws.mode.as_str().to_string(),
135        };
136        let cache = self.cache.clone();
137        tokio::spawn(async move {
138            if let Err(e) = cache.cache_workspace(&session_id, &snapshot).await {
139                tracing::warn!("L2 cache write failed on create: {e}");
140            }
141        });
142
143        self.workspaces.insert(session_id, ws);
144        Ok(session_id)
145    }
146
147    /// Get an immutable reference to a workspace.
148    pub fn get_workspace(
149        &self,
150        session_id: &SessionId,
151    ) -> Option<dashmap::mapref::one::Ref<'_, SessionId, SessionWorkspace>> {
152        let result = self.workspaces.get(session_id);
153        if result.is_some() {
154            self.touch_in_cache(session_id);
155        }
156        result
157    }
158
159    /// Get a mutable reference to a workspace.
160    pub fn get_workspace_mut(
161        &self,
162        session_id: &SessionId,
163    ) -> Option<dashmap::mapref::one::RefMut<'_, SessionId, SessionWorkspace>> {
164        let result = self.workspaces.get_mut(session_id);
165        if result.is_some() {
166            self.touch_in_cache(session_id);
167        }
168        result
169    }
170
171    /// Fire-and-forget L2 cache eviction for one or more session IDs.
172    /// Safe to call from sync contexts — silently skips if no Tokio runtime.
173    fn evict_from_cache(&self, session_ids: &[SessionId]) {
174        if let Ok(handle) = tokio::runtime::Handle::try_current() {
175            for &sid in session_ids {
176                let cache = self.cache.clone();
177                handle.spawn(async move {
178                    if let Err(e) = cache.evict(&sid).await {
179                        tracing::warn!("L2 cache evict failed: {e}");
180                    }
181                });
182            }
183        }
184    }
185
186    /// Fire-and-forget L2 cache TTL refresh.
187    /// Prevents cache entries from expiring during long-lived sessions.
188    fn touch_in_cache(&self, session_id: &SessionId) {
189        let now = Instant::now();
190        let should_touch = self
191            .last_touched
192            .get(session_id)
193            .is_none_or(|t| now.duration_since(*t) > TOUCH_DEBOUNCE);
194        if !should_touch {
195            return;
196        }
197        self.last_touched.insert(*session_id, now);
198        if let Ok(handle) = tokio::runtime::Handle::try_current() {
199            let sid = *session_id;
200            let cache = self.cache.clone();
201            handle.spawn(async move {
202                if let Err(e) = cache.touch(&sid).await {
203                    tracing::warn!("L2 cache touch failed: {e}");
204                }
205            });
206        }
207    }
208
209    /// Remove and drop a workspace.
210    pub fn destroy_workspace(&self, session_id: &SessionId) -> Option<SessionWorkspace> {
211        self.last_touched.remove(session_id);
212        self.evict_from_cache(&[*session_id]);
213        self.workspaces.remove(session_id).map(|(_, ws)| ws)
214    }
215
216    /// Count active workspaces for a specific repository.
217    pub fn active_count(&self, repo_id: RepoId) -> usize {
218        self.workspaces
219            .iter()
220            .filter(|entry| entry.value().repo_id == repo_id)
221            .count()
222    }
223
224    /// Return session IDs of all active workspaces for a repo,
225    /// optionally excluding one session.
226    pub fn active_sessions_for_repo(
227        &self,
228        repo_id: RepoId,
229        exclude_session: Option<SessionId>,
230    ) -> Vec<SessionId> {
231        self.workspaces
232            .iter()
233            .filter(|entry| {
234                entry.value().repo_id == repo_id
235                    && exclude_session.is_none_or(|ex| *entry.key() != ex)
236            })
237            .map(|entry| *entry.key())
238            .collect()
239    }
240
241    /// Garbage-collect expired persistent workspaces.
242    ///
243    /// Ephemeral workspaces are not GC'd here — they are destroyed when
244    /// the session disconnects. This only handles persistent workspaces
245    /// whose `expires_at` deadline has passed.
246    pub fn gc_expired(&self) -> Vec<SessionId> {
247        let now = Instant::now();
248        let mut expired = Vec::new();
249
250        // Collect IDs first to avoid holding DashMap guards during removal.
251        self.workspaces.iter().for_each(|entry| {
252            if let WorkspaceMode::Persistent {
253                expires_at: Some(deadline),
254            } = &entry.value().mode
255            {
256                if now >= *deadline {
257                    expired.push(*entry.key());
258                }
259            }
260        });
261
262        for sid in &expired {
263            self.last_touched.remove(sid);
264            self.workspaces.remove(sid);
265        }
266        self.evict_from_cache(&expired);
267
268        expired
269    }
270
271    /// Destroy workspaces for sessions that no longer exist.
272    /// Call this when a session disconnects or during periodic cleanup.
273    pub fn cleanup_disconnected(&self, active_session_ids: &[uuid::Uuid]) {
274        let to_remove: Vec<uuid::Uuid> = self.workspaces.iter()
275            .filter(|entry| !active_session_ids.contains(entry.key()))
276            .map(|entry| *entry.key())
277            .collect();
278        for sid in &to_remove {
279            self.last_touched.remove(sid);
280            self.workspaces.remove(sid);
281        }
282        self.evict_from_cache(&to_remove);
283    }
284
285    /// Remove workspaces that are idle beyond `idle_ttl` or alive beyond `max_ttl`.
286    ///
287    /// Returns the list of expired session IDs. This complements [`gc_expired`]
288    /// (which handles persistent workspace deadlines) by enforcing activity-based
289    /// and hard-maximum lifetime limits on **all** workspaces.
290    pub fn gc_expired_sessions(
291        &self,
292        idle_ttl: std::time::Duration,
293        max_ttl: std::time::Duration,
294    ) -> Vec<SessionId> {
295        let now = Instant::now();
296        let mut expired = Vec::new();
297
298        self.workspaces.retain(|_session_id, ws| {
299            let idle = now.duration_since(ws.last_active);
300            let total = now.duration_since(ws.created_at);
301
302            if idle > idle_ttl || total > max_ttl {
303                expired.push(ws.session_id);
304                false // remove
305            } else {
306                true // keep
307            }
308        });
309        for sid in &expired {
310            self.last_touched.remove(sid);
311        }
312        self.evict_from_cache(&expired);
313
314        expired
315    }
316
317    /// Insert a pre-built workspace (test-only).
318    ///
319    /// Allows unit tests to insert workspaces with manipulated timestamps
320    /// without requiring a live database connection.
321    #[doc(hidden)]
322    pub fn insert_test_workspace(&self, ws: SessionWorkspace) {
323        let sid = ws.session_id;
324        self.workspaces.insert(sid, ws);
325    }
326
327    /// Total number of active workspaces across all repos.
328    pub fn total_active(&self) -> usize {
329        self.workspaces.len()
330    }
331
332    /// Describe which other sessions have modified a given file.
333    ///
334    /// Returns a formatted string like `"fn create_task modified by agent-2"`
335    /// or `"modified by agent-2, agent-3"`. Returns an empty string if no
336    /// other session has touched the file.
337    pub fn describe_other_modifiers(
338        &self,
339        file_path: &str,
340        repo_id: RepoId,
341        exclude_session: SessionId,
342    ) -> String {
343        let mut parts: Vec<String> = Vec::new();
344
345        for entry in self.workspaces.iter() {
346            let ws = entry.value();
347            if ws.repo_id != repo_id || ws.session_id == exclude_session {
348                continue;
349            }
350
351            // Check if this other session has the file in its overlay
352            if !ws.overlay.list_paths().contains(&file_path.to_string()) {
353                continue;
354            }
355
356            // Get changed symbols for this file from the session graph
357            let symbols = ws.graph.changed_symbols_for_file(file_path);
358            let agent = &ws.agent_name;
359
360            if symbols.is_empty() {
361                parts.push(format!("modified by {agent}"));
362            } else {
363                // Take up to 3 symbol names to keep it concise
364                let sym_list: Vec<&str> = symbols.iter().take(3).map(|s| s.as_str()).collect();
365                let sym_str = sym_list.join(", ");
366                if symbols.len() > 3 {
367                    parts.push(format!("{sym_str},... modified by {agent}"));
368                } else {
369                    parts.push(format!("{sym_str} modified by {agent}"));
370                }
371            }
372        }
373
374        parts.join("; ")
375    }
376
377    /// List all active sessions for a given repository.
378    pub fn list_sessions(&self, repo_id: RepoId) -> Vec<SessionInfo> {
379        let now = Instant::now();
380        self.workspaces
381            .iter()
382            .filter(|entry| entry.value().repo_id == repo_id)
383            .map(|entry| {
384                let ws = entry.value();
385                SessionInfo {
386                    session_id: ws.session_id,
387                    agent_id: ws.agent_id.clone(),
388                    agent_name: ws.agent_name.clone(),
389                    intent: ws.intent.clone(),
390                    repo_id: ws.repo_id,
391                    changeset_id: ws.changeset_id,
392                    state: ws.state.as_str().to_string(),
393                    elapsed_secs: now.duration_since(ws.created_at).as_secs(),
394                }
395            })
396            .collect()
397    }
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workspace::session_workspace::{SessionWorkspace, WorkspaceMode};

    /// Build a manager around a lazily-connecting pool.
    ///
    /// `connect_lazy` never opens a connection until a query runs, so no
    /// database is needed. It must be called inside a Tokio runtime, which
    /// is why every test using it is a `#[tokio::test]`.
    fn lazy_manager() -> WorkspaceManager {
        let db = PgPool::connect_lazy("postgres://localhost/nonexistent").unwrap();
        WorkspaceManager::new(db)
    }

    /// Build an ephemeral test workspace with the given identity.
    fn ephemeral_workspace(session_id: Uuid, repo_id: Uuid, agent_name: &str) -> SessionWorkspace {
        let mut ws = SessionWorkspace::new_test(
            session_id,
            repo_id,
            format!("{agent_name}-id"),
            "test intent".to_string(),
            "abc123".to_string(),
            WorkspaceMode::Ephemeral,
        );
        ws.agent_name = agent_name.to_string();
        ws
    }

    #[test]
    fn session_info_serializes_to_json() {
        let info = SessionInfo {
            session_id: Uuid::nil(),
            agent_id: "test-agent".to_string(),
            agent_name: "agent-1".to_string(),
            intent: "fix bug".to_string(),
            repo_id: Uuid::nil(),
            changeset_id: Uuid::nil(),
            state: "active".to_string(),
            elapsed_secs: 42,
        };

        let json = serde_json::to_value(&info).expect("SessionInfo should serialize to JSON");

        assert_eq!(json["agent_id"], "test-agent");
        assert_eq!(json["agent_name"], "agent-1");
        assert_eq!(json["intent"], "fix bug");
        assert_eq!(json["state"], "active");
        assert_eq!(json["elapsed_secs"], 42);
        assert_eq!(
            json["session_id"],
            "00000000-0000-0000-0000-000000000000"
        );
    }

    #[test]
    fn session_info_all_fields_present_in_json() {
        let info = SessionInfo {
            session_id: Uuid::new_v4(),
            agent_id: "claude".to_string(),
            agent_name: "agent-1".to_string(),
            intent: "refactor".to_string(),
            repo_id: Uuid::new_v4(),
            changeset_id: Uuid::new_v4(),
            state: "submitted".to_string(),
            elapsed_secs: 100,
        };

        let json = serde_json::to_value(&info).expect("serialize");
        let obj = json.as_object().expect("should be an object");

        let expected_keys = [
            "session_id",
            "agent_id",
            "agent_name",
            "intent",
            "repo_id",
            "changeset_id",
            "state",
            "elapsed_secs",
        ];
        for key in &expected_keys {
            assert!(obj.contains_key(*key), "missing key: {key}");
        }
        assert_eq!(
            obj.len(),
            expected_keys.len(),
            "unexpected extra keys in SessionInfo JSON"
        );
    }

    #[test]
    fn session_info_clone_preserves_values() {
        let info = SessionInfo {
            session_id: Uuid::new_v4(),
            agent_id: "agent-1".to_string(),
            agent_name: "feature-bot".to_string(),
            intent: "deploy".to_string(),
            repo_id: Uuid::new_v4(),
            changeset_id: Uuid::new_v4(),
            state: "active".to_string(),
            elapsed_secs: 5,
        };

        let cloned = info.clone();
        assert_eq!(info.session_id, cloned.session_id);
        assert_eq!(info.agent_id, cloned.agent_id);
        assert_eq!(info.agent_name, cloned.agent_name);
        assert_eq!(info.intent, cloned.intent);
        assert_eq!(info.repo_id, cloned.repo_id);
        assert_eq!(info.changeset_id, cloned.changeset_id);
        assert_eq!(info.state, cloned.state);
        assert_eq!(info.elapsed_secs, cloned.elapsed_secs);
    }

    #[tokio::test]
    async fn next_agent_name_increments_per_repo() {
        let mgr = lazy_manager();
        let repo1 = Uuid::new_v4();
        let repo2 = Uuid::new_v4();

        assert_eq!(mgr.next_agent_name(&repo1), "agent-1");
        assert_eq!(mgr.next_agent_name(&repo1), "agent-2");
        assert_eq!(mgr.next_agent_name(&repo1), "agent-3");

        // Different repo starts at 1
        assert_eq!(mgr.next_agent_name(&repo2), "agent-1");
        assert_eq!(mgr.next_agent_name(&repo2), "agent-2");

        // Original repo continues
        assert_eq!(mgr.next_agent_name(&repo1), "agent-4");
    }

    /// `list_sessions` only reports workspaces of the requested repository.
    ///
    /// Uses a lazily-connecting pool, so no live database is required.
    #[tokio::test]
    async fn list_sessions_returns_empty_for_unknown_repo() {
        let mgr = lazy_manager();
        assert!(mgr.list_sessions(Uuid::new_v4()).is_empty());

        // A workspace registered under one repo must not leak into another.
        let repo_id = Uuid::new_v4();
        let session_id = Uuid::new_v4();
        mgr.insert_test_workspace(ephemeral_workspace(session_id, repo_id, "agent-1"));

        assert!(mgr.list_sessions(Uuid::new_v4()).is_empty());

        let listed = mgr.list_sessions(repo_id);
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].session_id, session_id);
        assert_eq!(listed[0].repo_id, repo_id);
        assert_eq!(listed[0].agent_name, "agent-1");
    }

    #[tokio::test]
    async fn destroy_workspace_removes_registered_session() {
        let mgr = lazy_manager();
        let repo_id = Uuid::new_v4();
        let session_id = Uuid::new_v4();
        mgr.insert_test_workspace(ephemeral_workspace(session_id, repo_id, "agent-1"));
        assert_eq!(mgr.total_active(), 1);
        assert_eq!(mgr.active_count(repo_id), 1);

        let removed = mgr.destroy_workspace(&session_id);
        assert!(removed.is_some());
        assert_eq!(mgr.total_active(), 0);
        assert!(mgr.get_workspace(&session_id).is_none());

        // A second removal finds nothing.
        assert!(mgr.destroy_workspace(&session_id).is_none());
    }

    #[tokio::test]
    async fn cleanup_disconnected_keeps_only_listed_sessions() {
        let mgr = lazy_manager();
        let repo_id = Uuid::new_v4();
        let kept = Uuid::new_v4();
        let dropped = Uuid::new_v4();
        mgr.insert_test_workspace(ephemeral_workspace(kept, repo_id, "agent-1"));
        mgr.insert_test_workspace(ephemeral_workspace(dropped, repo_id, "agent-2"));

        mgr.cleanup_disconnected(&[kept]);

        assert_eq!(mgr.active_sessions_for_repo(repo_id, None), vec![kept]);
        assert!(mgr.active_sessions_for_repo(repo_id, Some(kept)).is_empty());

        // An empty active list removes everything that is left.
        mgr.cleanup_disconnected(&[]);
        assert_eq!(mgr.total_active(), 0);
    }

    #[tokio::test]
    async fn describe_other_modifiers_empty_when_no_other_sessions() {
        let mgr = lazy_manager();
        let repo_id = Uuid::new_v4();
        let session_id = Uuid::new_v4();

        let result = mgr.describe_other_modifiers("src/lib.rs", repo_id, session_id);
        assert!(result.is_empty());
    }

    #[tokio::test]
    async fn describe_other_modifiers_shows_agent_name() {
        let mgr = lazy_manager();
        let repo_id = Uuid::new_v4();

        let session1 = Uuid::new_v4();
        let session2 = Uuid::new_v4();

        let mut ws2 = ephemeral_workspace(session2, repo_id, "agent-2");
        ws2.overlay.write_local("src/lib.rs", b"content".to_vec(), false);

        mgr.insert_test_workspace(ws2);

        let result = mgr.describe_other_modifiers("src/lib.rs", repo_id, session1);
        assert_eq!(result, "modified by agent-2");

        // A file the other session never touched yields nothing.
        let result2 = mgr.describe_other_modifiers("src/other.rs", repo_id, session1);
        assert!(result2.is_empty());

        // The modifying session itself is excluded from its own report.
        let result3 = mgr.describe_other_modifiers("src/lib.rs", repo_id, session2);
        assert!(result3.is_empty());
    }

    #[tokio::test]
    async fn describe_other_modifiers_includes_symbols() {
        use dk_core::{Span, Symbol, SymbolKind, Visibility};
        use std::path::PathBuf;

        let mgr = lazy_manager();
        let repo_id = Uuid::new_v4();

        let session1 = Uuid::new_v4();
        let session2 = Uuid::new_v4();

        let mut ws2 = ephemeral_workspace(session2, repo_id, "agent-2");
        ws2.overlay
            .write_local("src/tasks.rs", b"fn create_task() {}".to_vec(), true);
        ws2.graph.add_symbol(Symbol {
            id: Uuid::new_v4(),
            name: "create_task".to_string(),
            qualified_name: "create_task".to_string(),
            kind: SymbolKind::Function,
            visibility: Visibility::Public,
            file_path: PathBuf::from("src/tasks.rs"),
            span: Span {
                start_byte: 0,
                end_byte: 20,
            },
            signature: None,
            doc_comment: None,
            parent: None,
            last_modified_by: None,
            last_modified_intent: None,
        });

        mgr.insert_test_workspace(ws2);

        let result = mgr.describe_other_modifiers("src/tasks.rs", repo_id, session1);
        assert_eq!(result, "create_task modified by agent-2");
    }
}