Skip to main content

rustyclaw_core/threads/
manager.rs

1//! Thread manager — manages all agent threads.
2
3use super::{AgentThread, MessageRole, ThreadEvent, ThreadId, ThreadInfo, ThreadStatus};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::path::Path;
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9use tokio::sync::{broadcast, RwLock};
10use tracing::debug;
11
/// Tunable settings for a [`ThreadManager`].
///
/// Of these, only `ephemeral_retention` is consulted by the manager code in
/// this module (in `cleanup_ephemeral`); the compaction settings are stored
/// for use by other code.
#[derive(Debug, Clone)]
pub struct ThreadManagerConfig {
    /// Minimum age (time since last activity) a finished ephemeral thread
    /// must reach before cleanup removes it.
    pub ephemeral_retention: Duration,

    /// Message count at which compaction is triggered.
    pub compaction_threshold: usize,

    /// Number of most recent messages retained once compaction has run.
    pub messages_after_compaction: usize,
}
24
25impl Default for ThreadManagerConfig {
26    fn default() -> Self {
27        Self {
28            ephemeral_retention: Duration::from_secs(300), // 5 minutes
29            compaction_threshold: 20,
30            messages_after_compaction: 5,
31        }
32    }
33}
34
35/// Manages all agent threads with event emission.
36pub struct ThreadManager {
37    /// All threads by ID
38    threads: HashMap<ThreadId, AgentThread>,
39    
40    /// Currently foregrounded thread
41    foreground_id: Option<ThreadId>,
42    
43    /// Event broadcast channel
44    events_tx: broadcast::Sender<ThreadEvent>,
45    
46    /// Configuration
47    config: ThreadManagerConfig,
48}
49
50impl ThreadManager {
51    /// Create a new thread manager.
52    pub fn new() -> Self {
53        Self::with_config(ThreadManagerConfig::default())
54    }
55    
56    /// Create with custom config.
57    pub fn with_config(config: ThreadManagerConfig) -> Self {
58        let (events_tx, _) = broadcast::channel(256);
59        Self {
60            threads: HashMap::new(),
61            foreground_id: None,
62            events_tx,
63            config,
64        }
65    }
66    
67    /// Subscribe to thread events.
68    pub fn subscribe(&self) -> broadcast::Receiver<ThreadEvent> {
69        self.events_tx.subscribe()
70    }
71    
72    // ── Thread Creation ─────────────────────────────────────────────────────
73    
74    /// Create a new chat thread and make it foreground.
75    pub fn create_chat(&mut self, label: impl Into<String>) -> ThreadId {
76        let mut thread = AgentThread::new_chat(label);
77        thread.is_foreground = true;
78        let id = thread.id;
79        
80        // Background the old foreground
81        if let Some(old_fg) = self.foreground_id {
82            if let Some(t) = self.threads.get_mut(&old_fg) {
83                t.is_foreground = false;
84            }
85        }
86        
87        self.foreground_id = Some(id);
88        let info = thread.to_info();
89        self.threads.insert(id, thread);
90        
91        self.emit(ThreadEvent::Created {
92            thread: info,
93            parent_id: None,
94        });
95        
96        id
97    }
98    
99    /// Create a sub-agent thread.
100    pub fn create_subagent(
101        &mut self,
102        label: impl Into<String>,
103        agent_id: impl Into<String>,
104        task: impl Into<String>,
105        parent_id: Option<ThreadId>,
106    ) -> ThreadId {
107        let thread = AgentThread::new_subagent(label, agent_id, task, parent_id);
108        let id = thread.id;
109        let info = thread.to_info();
110        
111        self.threads.insert(id, thread);
112        
113        self.emit(ThreadEvent::Created {
114            thread: info,
115            parent_id,
116        });
117        
118        id
119    }
120    
121    /// Create a background thread.
122    pub fn create_background(
123        &mut self,
124        label: impl Into<String>,
125        purpose: impl Into<String>,
126        parent_id: Option<ThreadId>,
127    ) -> ThreadId {
128        let thread = AgentThread::new_background(label, purpose, parent_id);
129        let id = thread.id;
130        let info = thread.to_info();
131        
132        self.threads.insert(id, thread);
133        
134        self.emit(ThreadEvent::Created {
135            thread: info,
136            parent_id,
137        });
138        
139        id
140    }
141    
142    /// Create a task thread.
143    pub fn create_task(
144        &mut self,
145        label: impl Into<String>,
146        action: impl Into<String>,
147        parent_id: Option<ThreadId>,
148    ) -> ThreadId {
149        let thread = AgentThread::new_task(label, action, parent_id);
150        let id = thread.id;
151        let info = thread.to_info();
152        
153        self.threads.insert(id, thread);
154        
155        self.emit(ThreadEvent::Created {
156            thread: info,
157            parent_id,
158        });
159        
160        id
161    }
162    
163    // ── Thread Access ───────────────────────────────────────────────────────
164    
165    /// Get a thread by ID.
166    pub fn get(&self, id: ThreadId) -> Option<&AgentThread> {
167        self.threads.get(&id)
168    }
169    
170    /// Get a mutable thread by ID.
171    pub fn get_mut(&mut self, id: ThreadId) -> Option<&mut AgentThread> {
172        self.threads.get_mut(&id)
173    }
174    
175    /// Get the foreground thread.
176    pub fn foreground(&self) -> Option<&AgentThread> {
177        self.foreground_id.and_then(|id| self.threads.get(&id))
178    }
179    
180    /// Get the foreground thread mutably.
181    pub fn foreground_mut(&mut self) -> Option<&mut AgentThread> {
182        self.foreground_id.and_then(|id| self.threads.get_mut(&id))
183    }
184    
185    /// Get foreground thread ID.
186    pub fn foreground_id(&self) -> Option<ThreadId> {
187        self.foreground_id
188    }
189    
190    /// List all threads.
191    pub fn list(&self) -> Vec<&AgentThread> {
192        self.threads.values().collect()
193    }
194    
195    /// List thread info for sidebar display.
196    pub fn list_info(&self) -> Vec<ThreadInfo> {
197        self.threads.values().map(|t| t.to_info()).collect()
198    }
199    
200    // ── Thread Updates ──────────────────────────────────────────────────────
201    
202    /// Set description for a thread.
203    pub fn set_description(&mut self, id: ThreadId, description: impl Into<String>) {
204        let desc = description.into();
205        if let Some(thread) = self.threads.get_mut(&id) {
206            thread.set_description(&desc);
207            self.emit(ThreadEvent::DescriptionChanged {
208                thread_id: id,
209                description: desc,
210            });
211        }
212    }
213    
214    /// Set description for the foreground thread.
215    pub fn set_foreground_description(&mut self, description: impl Into<String>) {
216        if let Some(id) = self.foreground_id {
217            self.set_description(id, description);
218        }
219    }
220    
221    /// Update thread status.
222    pub fn set_status(&mut self, id: ThreadId, status: ThreadStatus) {
223        if let Some(thread) = self.threads.get_mut(&id) {
224            let old_status = thread.status.clone();
225            thread.set_status(status.clone());
226            self.emit(ThreadEvent::StatusChanged {
227                thread_id: id,
228                old_status,
229                new_status: status,
230            });
231        }
232    }
233    
234    /// Switch foreground to a different thread.
235    pub fn switch_foreground(&mut self, id: ThreadId) -> bool {
236        if !self.threads.contains_key(&id) {
237            return false;
238        }
239        
240        let old_fg = self.foreground_id;
241        
242        // Background the old foreground
243        if let Some(old_id) = old_fg {
244            if let Some(t) = self.threads.get_mut(&old_id) {
245                t.is_foreground = false;
246            }
247        }
248        
249        // Foreground the new one
250        if let Some(t) = self.threads.get_mut(&id) {
251            t.is_foreground = true;
252        }
253        
254        self.foreground_id = Some(id);
255        
256        self.emit(ThreadEvent::Foregrounded {
257            thread_id: id,
258            previous_foreground: old_fg,
259        });
260        
261        true
262    }
263    
264    /// Clear the foreground — no thread is active (background all).
265    pub fn clear_foreground(&mut self) {
266        if let Some(old_id) = self.foreground_id {
267            if let Some(t) = self.threads.get_mut(&old_id) {
268                t.is_foreground = false;
269            }
270        }
271        self.foreground_id = None;
272    }
273    
274    /// Rename a thread.
275    pub fn rename(&mut self, id: ThreadId, new_label: impl Into<String>) -> bool {
276        if let Some(thread) = self.threads.get_mut(&id) {
277            thread.label = new_label.into();
278            thread.last_activity = SystemTime::now();
279            true
280        } else {
281            false
282        }
283    }
284    
285    /// Find the best matching thread for a given message content.
286    /// Returns the thread ID if a good match is found, None otherwise.
287    pub fn find_best_match(&self, content: &str) -> Option<ThreadId> {
288        let content_lower = content.to_lowercase();
289        
290        // Look for threads where label or description matches content keywords
291        for thread in self.threads.values() {
292            // Skip the current foreground
293            if thread.is_foreground {
294                continue;
295            }
296            
297            // Check if thread label is mentioned in content
298            if content_lower.contains(&thread.label.to_lowercase()) {
299                return Some(thread.id);
300            }
301            
302            // Check if description keywords match
303            if let Some(desc) = &thread.description {
304                let desc_words: Vec<&str> = desc.split_whitespace()
305                    .filter(|w| w.len() > 3)
306                    .collect();
307                let matches = desc_words.iter()
308                    .filter(|w| content_lower.contains(&w.to_lowercase()))
309                    .count();
310                if matches >= 2 {
311                    return Some(thread.id);
312                }
313            }
314        }
315        
316        None
317    }
318    
319    /// Mark a thread as completed.
320    pub fn complete(&mut self, id: ThreadId, summary: Option<String>, result: Option<String>) {
321        if let Some(thread) = self.threads.get_mut(&id) {
322            thread.complete(summary.clone(), result.clone());
323            self.emit(ThreadEvent::Completed {
324                thread_id: id,
325                summary,
326                result,
327            });
328        }
329    }
330    
331    /// Mark a thread as failed.
332    pub fn fail(&mut self, id: ThreadId, error: impl Into<String>) {
333        let err = error.into();
334        if let Some(thread) = self.threads.get_mut(&id) {
335            thread.fail(&err);
336            self.emit(ThreadEvent::Failed {
337                thread_id: id,
338                error: err,
339            });
340        }
341    }
342    
343    /// Add a message to a thread.
344    pub fn add_message(&mut self, id: ThreadId, role: MessageRole, content: impl Into<String>) {
345        let message_count = if let Some(thread) = self.threads.get_mut(&id) {
346            thread.add_message(role, content);
347            thread.messages.len()
348        } else {
349            return;
350        };
351        
352        self.emit(ThreadEvent::MessageAdded {
353            thread_id: id,
354            message_count,
355        });
356    }
357    
358    /// Add a message to the foreground thread.
359    pub fn add_foreground_message(&mut self, role: MessageRole, content: impl Into<String>) {
360        if let Some(id) = self.foreground_id {
361            self.add_message(id, role, content);
362        }
363    }
364    
365    // ── Thread Removal ──────────────────────────────────────────────────────
366    
367    /// Remove a thread.
368    pub fn remove(&mut self, id: ThreadId) -> Option<AgentThread> {
369        let thread = self.threads.remove(&id);
370        if thread.is_some() {
371            if self.foreground_id == Some(id) {
372                self.foreground_id = None;
373            }
374            self.emit(ThreadEvent::Removed { thread_id: id });
375        }
376        thread
377    }
378    
379    /// Clean up old ephemeral threads.
380    pub fn cleanup_ephemeral(&mut self) {
381        let now = SystemTime::now();
382        let retention = self.config.ephemeral_retention;
383        
384        let to_remove: Vec<ThreadId> = self.threads
385            .iter()
386            .filter(|(_, t)| {
387                t.kind.is_ephemeral()
388                    && t.status.is_terminal()
389                    && now.duration_since(t.last_activity)
390                        .map(|d| d > retention)
391                        .unwrap_or(false)
392            })
393            .map(|(id, _)| *id)
394            .collect();
395        
396        for id in to_remove {
397            self.remove(id);
398        }
399    }
400    
401    // ── Context Building ────────────────────────────────────────────────────
402    
403    /// Build global context from all threads that share context.
404    pub fn build_global_context(&self) -> String {
405        let mut context = String::new();
406        
407        for thread in self.threads.values() {
408            if !thread.share_context || thread.is_foreground {
409                continue;
410            }
411            
412            // Include summary or recent info for backgrounded threads
413            if let Some(summary) = &thread.compact_summary {
414                context.push_str(&format!(
415                    "## {} ({})\n{}\n\n",
416                    thread.label,
417                    thread.kind.display_name(),
418                    summary
419                ));
420            } else if !thread.messages.is_empty() {
421                let recent: Vec<_> = thread.messages.iter().rev().take(2).collect();
422                context.push_str(&format!(
423                    "## {} ({}) - {} messages\n",
424                    thread.label,
425                    thread.kind.display_name(),
426                    thread.messages.len()
427                ));
428                for msg in recent.into_iter().rev() {
429                    context.push_str(&format!("{:?}: {}\n", msg.role, &msg.content[..msg.content.len().min(100)]));
430                }
431                context.push('\n');
432            }
433        }
434        
435        context
436    }
437    
438    // ── Persistence ─────────────────────────────────────────────────────────
439    
440    /// Save threads to a file.
441    pub fn save_to_file(&self, path: &Path) -> std::io::Result<()> {
442        let state = PersistentState {
443            threads: self.threads.values().cloned().collect(),
444            foreground_id: self.foreground_id,
445        };
446        let json = serde_json::to_string_pretty(&state)
447            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
448        std::fs::write(path, json)
449    }
450    
451    /// Load threads from a file.
452    pub fn load_from_file(path: &Path) -> std::io::Result<Self> {
453        let json = std::fs::read_to_string(path)?;
454        let state: PersistentState = serde_json::from_str(&json)
455            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
456        
457        let (events_tx, _) = broadcast::channel(256);
458        let mut mgr = Self {
459            threads: HashMap::new(),
460            foreground_id: state.foreground_id,
461            events_tx,
462            config: ThreadManagerConfig::default(),
463        };
464        
465        for thread in state.threads {
466            mgr.threads.insert(thread.id, thread);
467        }
468        
469        Ok(mgr)
470    }
471    
472    /// Load from file or create with default chat thread.
473    pub fn load_or_default(path: &Path) -> Self {
474        match Self::load_from_file(path) {
475            Ok(mgr) => {
476                debug!("Loaded {} threads from {:?}", mgr.threads.len(), path);
477                mgr
478            }
479            Err(e) => {
480                debug!("Creating new thread manager (load failed: {})", e);
481                let mut mgr = Self::new();
482                mgr.create_chat("Main");
483                mgr
484            }
485        }
486    }
487    
488    // ── Backwards Compatibility ─────────────────────────────────────────────
489    // These methods match the old tasks::ThreadManager API for easier migration.
490    
491    /// Alias for create_chat (old API compatibility).
492    pub fn create_thread(&mut self, label: impl Into<String>) -> ThreadId {
493        self.create_chat(label)
494    }
495    
496    /// Alias for switch_foreground that returns old foreground ID (old API compatibility).
497    pub fn switch_to(&mut self, id: ThreadId) -> Option<ThreadId> {
498        let old_fg = self.foreground_id;
499        if self.switch_foreground(id) {
500            old_fg
501        } else {
502            None
503        }
504    }
505    
506    /// Get a thread by ID (compatibility - already exists as get()).
507    pub fn get_by_id(&self, id: ThreadId) -> Option<&AgentThread> {
508        self.get(id)
509    }
510    
511    /// Get a mutable thread by ID (compatibility - already exists as get_mut()).
512    pub fn get_by_id_mut(&mut self, id: ThreadId) -> Option<&mut AgentThread> {
513        self.get_mut(id)
514    }
515    
516    // ── Internal ────────────────────────────────────────────────────────────
517    
518    fn emit(&self, event: ThreadEvent) {
519        // Ignore send errors (no subscribers)
520        let _ = self.events_tx.send(event);
521    }
522}
523
524impl Default for ThreadManager {
525    fn default() -> Self {
526        Self::new()
527    }
528}
529
530/// State for persistence.
531#[derive(Debug, Serialize, Deserialize)]
532struct PersistentState {
533    threads: Vec<AgentThread>,
534    foreground_id: Option<ThreadId>,
535}
536
537/// Shared thread manager type.
538pub type SharedThreadManager = Arc<RwLock<ThreadManager>>;
539
#[cfg(test)]
mod tests {
    use super::*;
    use crate::threads::ThreadKind;

    /// A new chat thread is stored and becomes the foreground.
    #[test]
    fn test_create_chat_thread() {
        let mut manager = ThreadManager::new();
        let chat_id = manager.create_chat("Test Chat");

        let chat = manager.get(chat_id);
        assert!(chat.is_some());
        assert_eq!(manager.foreground_id(), Some(chat_id));
        assert!(manager.get(chat_id).unwrap().is_foreground);
    }

    /// A sub-agent records its kind and its parent.
    #[test]
    fn test_create_subagent() {
        let mut manager = ThreadManager::new();
        let parent_id = manager.create_chat("Main");
        let child_id = manager.create_subagent("Worker", "gpt-4", "Do the thing", Some(parent_id));

        let child = manager.get(child_id).unwrap();
        assert!(matches!(child.kind, ThreadKind::SubAgent { .. }));
        assert_eq!(child.parent_id, Some(parent_id));
    }

    /// Switching foreground updates both the recorded ID and the flags.
    #[test]
    fn test_switch_foreground() {
        let mut manager = ThreadManager::new();
        let first = manager.create_chat("Chat 1");
        let second = manager.create_chat("Chat 2");

        // The most recently created chat holds the foreground.
        assert_eq!(manager.foreground_id(), Some(second));
        assert!(!manager.get(first).unwrap().is_foreground);

        manager.switch_foreground(first);
        assert_eq!(manager.foreground_id(), Some(first));
        assert!(manager.get(first).unwrap().is_foreground);
        assert!(!manager.get(second).unwrap().is_foreground);
    }

    /// A description set through the manager is stored on the thread.
    #[test]
    fn test_set_description() {
        let mut manager = ThreadManager::new();
        let chat_id = manager.create_chat("Test");

        manager.set_description(chat_id, "Working on taxes");

        let stored = manager.get(chat_id).unwrap().description.as_deref();
        assert_eq!(stored, Some("Working on taxes"));
    }

    /// Completing and failing both move a thread to the expected status.
    #[test]
    fn test_complete_and_fail() {
        let mut manager = ThreadManager::new();

        let finished = manager.create_task("Task 1", "Do thing", None);
        manager.complete(finished, Some("Done!".into()), Some("result data".into()));
        assert!(manager.get(finished).unwrap().status.is_terminal());
        assert_eq!(
            manager.get(finished).unwrap().result.as_deref(),
            Some("result data")
        );

        let broken = manager.create_task("Task 2", "Do other thing", None);
        manager.fail(broken, "Something went wrong");
        assert!(matches!(
            manager.get(broken).unwrap().status,
            ThreadStatus::Failed { .. }
        ));
    }

    /// `list_info` yields one entry per registered thread.
    #[test]
    fn test_list_info() {
        let mut manager = ThreadManager::new();
        manager.create_chat("Chat");
        manager.create_subagent("Worker", "gpt-4", "task", None);

        let infos = manager.list_info();
        assert_eq!(infos.len(), 2);
    }
}