1use super::{AgentThread, MessageRole, ThreadEvent, ThreadId, ThreadInfo, ThreadStatus};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::path::Path;
7use std::sync::Arc;
8use std::time::{Duration, SystemTime};
9use tokio::sync::{broadcast, RwLock};
10use tracing::debug;
11
12#[derive(Debug, Clone)]
14pub struct ThreadManagerConfig {
15 pub ephemeral_retention: Duration,
17
18 pub compaction_threshold: usize,
20
21 pub messages_after_compaction: usize,
23}
24
25impl Default for ThreadManagerConfig {
26 fn default() -> Self {
27 Self {
28 ephemeral_retention: Duration::from_secs(300), compaction_threshold: 20,
30 messages_after_compaction: 5,
31 }
32 }
33}
34
35pub struct ThreadManager {
37 threads: HashMap<ThreadId, AgentThread>,
39
40 foreground_id: Option<ThreadId>,
42
43 events_tx: broadcast::Sender<ThreadEvent>,
45
46 config: ThreadManagerConfig,
48}
49
50impl ThreadManager {
51 pub fn new() -> Self {
53 Self::with_config(ThreadManagerConfig::default())
54 }
55
56 pub fn with_config(config: ThreadManagerConfig) -> Self {
58 let (events_tx, _) = broadcast::channel(256);
59 Self {
60 threads: HashMap::new(),
61 foreground_id: None,
62 events_tx,
63 config,
64 }
65 }
66
67 pub fn subscribe(&self) -> broadcast::Receiver<ThreadEvent> {
69 self.events_tx.subscribe()
70 }
71
72 pub fn create_chat(&mut self, label: impl Into<String>) -> ThreadId {
76 let mut thread = AgentThread::new_chat(label);
77 thread.is_foreground = true;
78 let id = thread.id;
79
80 if let Some(old_fg) = self.foreground_id {
82 if let Some(t) = self.threads.get_mut(&old_fg) {
83 t.is_foreground = false;
84 }
85 }
86
87 self.foreground_id = Some(id);
88 let info = thread.to_info();
89 self.threads.insert(id, thread);
90
91 self.emit(ThreadEvent::Created {
92 thread: info,
93 parent_id: None,
94 });
95
96 id
97 }
98
99 pub fn create_subagent(
101 &mut self,
102 label: impl Into<String>,
103 agent_id: impl Into<String>,
104 task: impl Into<String>,
105 parent_id: Option<ThreadId>,
106 ) -> ThreadId {
107 let thread = AgentThread::new_subagent(label, agent_id, task, parent_id);
108 let id = thread.id;
109 let info = thread.to_info();
110
111 self.threads.insert(id, thread);
112
113 self.emit(ThreadEvent::Created {
114 thread: info,
115 parent_id,
116 });
117
118 id
119 }
120
121 pub fn create_background(
123 &mut self,
124 label: impl Into<String>,
125 purpose: impl Into<String>,
126 parent_id: Option<ThreadId>,
127 ) -> ThreadId {
128 let thread = AgentThread::new_background(label, purpose, parent_id);
129 let id = thread.id;
130 let info = thread.to_info();
131
132 self.threads.insert(id, thread);
133
134 self.emit(ThreadEvent::Created {
135 thread: info,
136 parent_id,
137 });
138
139 id
140 }
141
142 pub fn create_task(
144 &mut self,
145 label: impl Into<String>,
146 action: impl Into<String>,
147 parent_id: Option<ThreadId>,
148 ) -> ThreadId {
149 let thread = AgentThread::new_task(label, action, parent_id);
150 let id = thread.id;
151 let info = thread.to_info();
152
153 self.threads.insert(id, thread);
154
155 self.emit(ThreadEvent::Created {
156 thread: info,
157 parent_id,
158 });
159
160 id
161 }
162
163 pub fn get(&self, id: ThreadId) -> Option<&AgentThread> {
167 self.threads.get(&id)
168 }
169
170 pub fn get_mut(&mut self, id: ThreadId) -> Option<&mut AgentThread> {
172 self.threads.get_mut(&id)
173 }
174
175 pub fn foreground(&self) -> Option<&AgentThread> {
177 self.foreground_id.and_then(|id| self.threads.get(&id))
178 }
179
180 pub fn foreground_mut(&mut self) -> Option<&mut AgentThread> {
182 self.foreground_id.and_then(|id| self.threads.get_mut(&id))
183 }
184
185 pub fn foreground_id(&self) -> Option<ThreadId> {
187 self.foreground_id
188 }
189
190 pub fn list(&self) -> Vec<&AgentThread> {
192 self.threads.values().collect()
193 }
194
195 pub fn list_info(&self) -> Vec<ThreadInfo> {
197 self.threads.values().map(|t| t.to_info()).collect()
198 }
199
200 pub fn set_description(&mut self, id: ThreadId, description: impl Into<String>) {
204 let desc = description.into();
205 if let Some(thread) = self.threads.get_mut(&id) {
206 thread.set_description(&desc);
207 self.emit(ThreadEvent::DescriptionChanged {
208 thread_id: id,
209 description: desc,
210 });
211 }
212 }
213
214 pub fn set_foreground_description(&mut self, description: impl Into<String>) {
216 if let Some(id) = self.foreground_id {
217 self.set_description(id, description);
218 }
219 }
220
221 pub fn set_status(&mut self, id: ThreadId, status: ThreadStatus) {
223 if let Some(thread) = self.threads.get_mut(&id) {
224 let old_status = thread.status.clone();
225 thread.set_status(status.clone());
226 self.emit(ThreadEvent::StatusChanged {
227 thread_id: id,
228 old_status,
229 new_status: status,
230 });
231 }
232 }
233
234 pub fn switch_foreground(&mut self, id: ThreadId) -> bool {
236 if !self.threads.contains_key(&id) {
237 return false;
238 }
239
240 let old_fg = self.foreground_id;
241
242 if let Some(old_id) = old_fg {
244 if let Some(t) = self.threads.get_mut(&old_id) {
245 t.is_foreground = false;
246 }
247 }
248
249 if let Some(t) = self.threads.get_mut(&id) {
251 t.is_foreground = true;
252 }
253
254 self.foreground_id = Some(id);
255
256 self.emit(ThreadEvent::Foregrounded {
257 thread_id: id,
258 previous_foreground: old_fg,
259 });
260
261 true
262 }
263
264 pub fn clear_foreground(&mut self) {
266 if let Some(old_id) = self.foreground_id {
267 if let Some(t) = self.threads.get_mut(&old_id) {
268 t.is_foreground = false;
269 }
270 }
271 self.foreground_id = None;
272 }
273
274 pub fn rename(&mut self, id: ThreadId, new_label: impl Into<String>) -> bool {
276 if let Some(thread) = self.threads.get_mut(&id) {
277 thread.label = new_label.into();
278 thread.last_activity = SystemTime::now();
279 true
280 } else {
281 false
282 }
283 }
284
285 pub fn find_best_match(&self, content: &str) -> Option<ThreadId> {
288 let content_lower = content.to_lowercase();
289
290 for thread in self.threads.values() {
292 if thread.is_foreground {
294 continue;
295 }
296
297 if content_lower.contains(&thread.label.to_lowercase()) {
299 return Some(thread.id);
300 }
301
302 if let Some(desc) = &thread.description {
304 let desc_words: Vec<&str> = desc.split_whitespace()
305 .filter(|w| w.len() > 3)
306 .collect();
307 let matches = desc_words.iter()
308 .filter(|w| content_lower.contains(&w.to_lowercase()))
309 .count();
310 if matches >= 2 {
311 return Some(thread.id);
312 }
313 }
314 }
315
316 None
317 }
318
319 pub fn complete(&mut self, id: ThreadId, summary: Option<String>, result: Option<String>) {
321 if let Some(thread) = self.threads.get_mut(&id) {
322 thread.complete(summary.clone(), result.clone());
323 self.emit(ThreadEvent::Completed {
324 thread_id: id,
325 summary,
326 result,
327 });
328 }
329 }
330
331 pub fn fail(&mut self, id: ThreadId, error: impl Into<String>) {
333 let err = error.into();
334 if let Some(thread) = self.threads.get_mut(&id) {
335 thread.fail(&err);
336 self.emit(ThreadEvent::Failed {
337 thread_id: id,
338 error: err,
339 });
340 }
341 }
342
343 pub fn add_message(&mut self, id: ThreadId, role: MessageRole, content: impl Into<String>) {
345 let message_count = if let Some(thread) = self.threads.get_mut(&id) {
346 thread.add_message(role, content);
347 thread.messages.len()
348 } else {
349 return;
350 };
351
352 self.emit(ThreadEvent::MessageAdded {
353 thread_id: id,
354 message_count,
355 });
356 }
357
358 pub fn add_foreground_message(&mut self, role: MessageRole, content: impl Into<String>) {
360 if let Some(id) = self.foreground_id {
361 self.add_message(id, role, content);
362 }
363 }
364
365 pub fn remove(&mut self, id: ThreadId) -> Option<AgentThread> {
369 let thread = self.threads.remove(&id);
370 if thread.is_some() {
371 if self.foreground_id == Some(id) {
372 self.foreground_id = None;
373 }
374 self.emit(ThreadEvent::Removed { thread_id: id });
375 }
376 thread
377 }
378
379 pub fn cleanup_ephemeral(&mut self) {
381 let now = SystemTime::now();
382 let retention = self.config.ephemeral_retention;
383
384 let to_remove: Vec<ThreadId> = self.threads
385 .iter()
386 .filter(|(_, t)| {
387 t.kind.is_ephemeral()
388 && t.status.is_terminal()
389 && now.duration_since(t.last_activity)
390 .map(|d| d > retention)
391 .unwrap_or(false)
392 })
393 .map(|(id, _)| *id)
394 .collect();
395
396 for id in to_remove {
397 self.remove(id);
398 }
399 }
400
401 pub fn build_global_context(&self) -> String {
405 let mut context = String::new();
406
407 for thread in self.threads.values() {
408 if !thread.share_context || thread.is_foreground {
409 continue;
410 }
411
412 if let Some(summary) = &thread.compact_summary {
414 context.push_str(&format!(
415 "## {} ({})\n{}\n\n",
416 thread.label,
417 thread.kind.display_name(),
418 summary
419 ));
420 } else if !thread.messages.is_empty() {
421 let recent: Vec<_> = thread.messages.iter().rev().take(2).collect();
422 context.push_str(&format!(
423 "## {} ({}) - {} messages\n",
424 thread.label,
425 thread.kind.display_name(),
426 thread.messages.len()
427 ));
428 for msg in recent.into_iter().rev() {
429 context.push_str(&format!("{:?}: {}\n", msg.role, &msg.content[..msg.content.len().min(100)]));
430 }
431 context.push('\n');
432 }
433 }
434
435 context
436 }
437
438 pub fn save_to_file(&self, path: &Path) -> std::io::Result<()> {
442 let state = PersistentState {
443 threads: self.threads.values().cloned().collect(),
444 foreground_id: self.foreground_id,
445 };
446 let json = serde_json::to_string_pretty(&state)
447 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
448 std::fs::write(path, json)
449 }
450
451 pub fn load_from_file(path: &Path) -> std::io::Result<Self> {
453 let json = std::fs::read_to_string(path)?;
454 let state: PersistentState = serde_json::from_str(&json)
455 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
456
457 let (events_tx, _) = broadcast::channel(256);
458 let mut mgr = Self {
459 threads: HashMap::new(),
460 foreground_id: state.foreground_id,
461 events_tx,
462 config: ThreadManagerConfig::default(),
463 };
464
465 for thread in state.threads {
466 mgr.threads.insert(thread.id, thread);
467 }
468
469 Ok(mgr)
470 }
471
472 pub fn load_or_default(path: &Path) -> Self {
474 match Self::load_from_file(path) {
475 Ok(mgr) => {
476 debug!("Loaded {} threads from {:?}", mgr.threads.len(), path);
477 mgr
478 }
479 Err(e) => {
480 debug!("Creating new thread manager (load failed: {})", e);
481 let mut mgr = Self::new();
482 mgr.create_chat("Main");
483 mgr
484 }
485 }
486 }
487
488 pub fn create_thread(&mut self, label: impl Into<String>) -> ThreadId {
493 self.create_chat(label)
494 }
495
496 pub fn switch_to(&mut self, id: ThreadId) -> Option<ThreadId> {
498 let old_fg = self.foreground_id;
499 if self.switch_foreground(id) {
500 old_fg
501 } else {
502 None
503 }
504 }
505
506 pub fn get_by_id(&self, id: ThreadId) -> Option<&AgentThread> {
508 self.get(id)
509 }
510
511 pub fn get_by_id_mut(&mut self, id: ThreadId) -> Option<&mut AgentThread> {
513 self.get_mut(id)
514 }
515
516 fn emit(&self, event: ThreadEvent) {
519 let _ = self.events_tx.send(event);
521 }
522}
523
524impl Default for ThreadManager {
525 fn default() -> Self {
526 Self::new()
527 }
528}
529
530#[derive(Debug, Serialize, Deserialize)]
532struct PersistentState {
533 threads: Vec<AgentThread>,
534 foreground_id: Option<ThreadId>,
535}
536
537pub type SharedThreadManager = Arc<RwLock<ThreadManager>>;
539
540#[cfg(test)]
541mod tests {
542 use super::*;
543 use crate::threads::ThreadKind;
544
545 #[test]
546 fn test_create_chat_thread() {
547 let mut mgr = ThreadManager::new();
548 let id = mgr.create_chat("Test Chat");
549
550 assert!(mgr.get(id).is_some());
551 assert_eq!(mgr.foreground_id(), Some(id));
552 assert!(mgr.get(id).unwrap().is_foreground);
553 }
554
555 #[test]
556 fn test_create_subagent() {
557 let mut mgr = ThreadManager::new();
558 let parent = mgr.create_chat("Main");
559 let child = mgr.create_subagent("Worker", "gpt-4", "Do the thing", Some(parent));
560
561 let thread = mgr.get(child).unwrap();
562 assert!(matches!(thread.kind, ThreadKind::SubAgent { .. }));
563 assert_eq!(thread.parent_id, Some(parent));
564 }
565
566 #[test]
567 fn test_switch_foreground() {
568 let mut mgr = ThreadManager::new();
569 let id1 = mgr.create_chat("Chat 1");
570 let id2 = mgr.create_chat("Chat 2");
571
572 assert_eq!(mgr.foreground_id(), Some(id2));
573 assert!(!mgr.get(id1).unwrap().is_foreground);
574
575 mgr.switch_foreground(id1);
576 assert_eq!(mgr.foreground_id(), Some(id1));
577 assert!(mgr.get(id1).unwrap().is_foreground);
578 assert!(!mgr.get(id2).unwrap().is_foreground);
579 }
580
581 #[test]
582 fn test_set_description() {
583 let mut mgr = ThreadManager::new();
584 let id = mgr.create_chat("Test");
585
586 mgr.set_description(id, "Working on taxes");
587 assert_eq!(mgr.get(id).unwrap().description.as_deref(), Some("Working on taxes"));
588 }
589
590 #[test]
591 fn test_complete_and_fail() {
592 let mut mgr = ThreadManager::new();
593
594 let task1 = mgr.create_task("Task 1", "Do thing", None);
595 mgr.complete(task1, Some("Done!".into()), Some("result data".into()));
596 assert!(mgr.get(task1).unwrap().status.is_terminal());
597 assert_eq!(mgr.get(task1).unwrap().result.as_deref(), Some("result data"));
598
599 let task2 = mgr.create_task("Task 2", "Do other thing", None);
600 mgr.fail(task2, "Something went wrong");
601 assert!(matches!(mgr.get(task2).unwrap().status, ThreadStatus::Failed { .. }));
602 }
603
604 #[test]
605 fn test_list_info() {
606 let mut mgr = ThreadManager::new();
607 mgr.create_chat("Chat");
608 mgr.create_subagent("Worker", "gpt-4", "task", None);
609
610 let info = mgr.list_info();
611 assert_eq!(info.len(), 2);
612 }
613}