1use anyhow::Result;
7use std::collections::HashMap;
8use std::path::Path;
9use std::sync::{Arc, RwLock};
10use std::time::Instant;
11
12use super::events::{StreamEvent, StreamEventKind};
13
/// Maximum number of completed output lines kept per session.
///
/// Once the buffer holds this many lines, the oldest tenth is discarded
/// before the next line is appended.
const MAX_OUTPUT_LINES: usize = 10_000;

/// Maximum number of events kept per session.
///
/// Once the buffer holds this many events, the oldest tenth is discarded
/// before the next event is appended.
const MAX_EVENTS: usize = 50_000;
19
/// Lifecycle state of a session stream.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SessionStatus {
    /// Initial state of a freshly created stream.
    Starting,
    /// The session has been assigned an id or has begun producing output.
    Running,
    /// The session reported successful completion.
    Completed,
    /// The session reported an error or an unsuccessful completion.
    Failed,
}
28
29#[derive(Debug)]
31pub struct SessionStream {
32 pub session_id: String,
34 pub task_id: String,
36 pub tag: String,
38 pub events: Vec<StreamEvent>,
40 pub output_lines: Vec<String>,
42 pub status: SessionStatus,
44 pub started_at: Instant,
46 pub pid: Option<u32>,
48 partial_line: String,
50}
51
52impl SessionStream {
53 pub fn new(task_id: &str, tag: &str) -> Self {
54 Self {
55 session_id: String::new(),
56 task_id: task_id.to_string(),
57 tag: tag.to_string(),
58 events: Vec::new(),
59 output_lines: Vec::new(),
60 status: SessionStatus::Starting,
61 started_at: Instant::now(),
62 pid: None,
63 partial_line: String::new(),
64 }
65 }
66
67 pub fn push_event(&mut self, mut event: StreamEvent) {
69 event.timestamp_ms = self.started_at.elapsed().as_millis() as u64;
70
71 match &event.kind {
73 StreamEventKind::TextDelta { text } => {
74 self.append_text(text);
75 }
76 StreamEventKind::ToolStart {
77 tool_name,
78 input_summary,
79 ..
80 } => {
81 self.flush_partial_line();
83 self.push_line(format!(">> {} {}", tool_name, input_summary));
84 }
85 StreamEventKind::ToolResult {
86 tool_name, success, ..
87 } => {
88 self.flush_partial_line();
89 let status = if *success { "ok" } else { "failed" };
90 self.push_line(format!("<< {} {}", tool_name, status));
91 }
92 StreamEventKind::Complete { success } => {
93 self.flush_partial_line();
94 self.status = if *success {
95 SessionStatus::Completed
96 } else {
97 SessionStatus::Failed
98 };
99 }
100 StreamEventKind::Error { message } => {
101 self.flush_partial_line();
102 self.push_line(format!("ERROR: {}", message));
103 self.status = SessionStatus::Failed;
104 }
105 StreamEventKind::SessionAssigned { session_id } => {
106 self.session_id = session_id.clone();
107 self.status = SessionStatus::Running;
108 }
109 }
110
111 if matches!(self.status, SessionStatus::Starting)
113 && matches!(
114 event.kind,
115 StreamEventKind::TextDelta { .. }
116 | StreamEventKind::ToolStart { .. }
117 | StreamEventKind::ToolResult { .. }
118 )
119 {
120 self.status = SessionStatus::Running;
121 }
122
123 if self.events.len() >= MAX_EVENTS {
125 let drain_count = MAX_EVENTS / 10;
127 self.events.drain(0..drain_count);
128 }
129 self.events.push(event);
130 }
131
132 fn append_text(&mut self, text: &str) {
134 for ch in text.chars() {
135 if ch == '\n' {
136 let line = std::mem::take(&mut self.partial_line);
138 self.push_line(line);
139 } else {
140 self.partial_line.push(ch);
141 }
142 }
143 }
144
145 fn flush_partial_line(&mut self) {
147 if !self.partial_line.is_empty() {
148 let line = std::mem::take(&mut self.partial_line);
149 self.push_line(line);
150 }
151 }
152
153 fn push_line(&mut self, line: String) {
155 if self.output_lines.len() >= MAX_OUTPUT_LINES {
156 let drain_count = MAX_OUTPUT_LINES / 10;
158 self.output_lines.drain(0..drain_count);
159 }
160 self.output_lines.push(line);
161 }
162
163 pub fn tail(&self, n: usize) -> &[String] {
165 let start = self.output_lines.len().saturating_sub(n);
166 &self.output_lines[start..]
167 }
168
169 pub fn get_all_output(&self) -> Vec<String> {
171 let mut lines = self.output_lines.clone();
172 if !self.partial_line.is_empty() {
173 lines.push(self.partial_line.clone());
174 }
175 lines
176 }
177
178 pub fn is_active(&self) -> bool {
180 matches!(self.status, SessionStatus::Starting | SessionStatus::Running)
181 }
182
183 pub fn event_count(&self) -> usize {
185 self.events.len()
186 }
187
188 pub fn line_count(&self) -> usize {
190 self.output_lines.len()
191 }
192}
193
194#[derive(Debug, Clone, Default)]
196pub struct StreamStore {
197 sessions: Arc<RwLock<HashMap<String, SessionStream>>>,
198}
199
200impl StreamStore {
201 pub fn new() -> Self {
202 Self::default()
203 }
204
205 pub fn create_session(&self, task_id: &str, tag: &str) -> String {
207 let mut sessions = self.sessions.write().unwrap();
208 let stream = SessionStream::new(task_id, tag);
209 let key = task_id.to_string();
210 sessions.insert(key.clone(), stream);
211 key
212 }
213
214 pub fn push_event(&self, task_id: &str, event: StreamEvent) {
216 let mut sessions = self.sessions.write().unwrap();
217 if let Some(stream) = sessions.get_mut(task_id) {
218 stream.push_event(event);
219 }
220 }
221
222 pub fn set_session_id(&self, task_id: &str, session_id: &str) {
224 let mut sessions = self.sessions.write().unwrap();
225 if let Some(stream) = sessions.get_mut(task_id) {
226 stream.session_id = session_id.to_string();
227 stream.status = SessionStatus::Running;
228 }
229 }
230
231 pub fn set_pid(&self, task_id: &str, pid: u32) {
233 let mut sessions = self.sessions.write().unwrap();
234 if let Some(stream) = sessions.get_mut(task_id) {
235 stream.pid = Some(pid);
236 }
237 }
238
239 pub fn get_pid(&self, task_id: &str) -> Option<u32> {
241 let sessions = self.sessions.read().unwrap();
242 sessions.get(task_id).and_then(|s| s.pid)
243 }
244
245 pub fn get_output(&self, task_id: &str, limit: usize) -> Vec<String> {
247 let sessions = self.sessions.read().unwrap();
248 sessions
249 .get(task_id)
250 .map(|s| s.tail(limit).to_vec())
251 .unwrap_or_default()
252 }
253
254 pub fn get_all_output(&self, task_id: &str) -> Vec<String> {
256 let sessions = self.sessions.read().unwrap();
257 sessions
258 .get(task_id)
259 .map(|s| s.get_all_output())
260 .unwrap_or_default()
261 }
262
263 pub fn get_status(&self, task_id: &str) -> Option<SessionStatus> {
265 let sessions = self.sessions.read().unwrap();
266 sessions.get(task_id).map(|s| s.status.clone())
267 }
268
269 pub fn get_session_id(&self, task_id: &str) -> Option<String> {
271 let sessions = self.sessions.read().unwrap();
272 sessions
273 .get(task_id)
274 .filter(|s| !s.session_id.is_empty())
275 .map(|s| s.session_id.clone())
276 }
277
278 pub fn active_tasks(&self) -> Vec<String> {
280 let sessions = self.sessions.read().unwrap();
281 sessions
282 .iter()
283 .filter(|(_, s)| s.is_active())
284 .map(|(k, _)| k.clone())
285 .collect()
286 }
287
288 pub fn all_tasks(&self) -> Vec<String> {
290 let sessions = self.sessions.read().unwrap();
291 sessions.keys().cloned().collect()
292 }
293
294 pub fn has_session(&self, task_id: &str) -> bool {
296 let sessions = self.sessions.read().unwrap();
297 sessions.contains_key(task_id)
298 }
299
300 pub fn remove_session(&self, task_id: &str) -> Option<SessionStream> {
302 let mut sessions = self.sessions.write().unwrap();
303 sessions.remove(task_id)
304 }
305
306 pub fn session_stats(&self, task_id: &str) -> Option<(usize, usize)> {
308 let sessions = self.sessions.read().unwrap();
309 sessions
310 .get(task_id)
311 .map(|s| (s.event_count(), s.line_count()))
312 }
313
314 pub fn get_elapsed_secs(&self, task_id: &str) -> Option<u64> {
316 let sessions = self.sessions.read().unwrap();
317 sessions
318 .get(task_id)
319 .map(|s| s.started_at.elapsed().as_secs())
320 }
321
322 pub fn get_last_tool_line(&self, task_id: &str) -> Option<String> {
324 let sessions = self.sessions.read().unwrap();
325 sessions.get(task_id).and_then(|s| {
326 s.output_lines
327 .iter()
328 .rev()
329 .find(|l| l.starts_with(">>") || l.starts_with("<<"))
330 .cloned()
331 })
332 }
333
334 pub fn save_session_metadata(&self, task_id: &str, project_root: &Path) -> Result<()> {
339 let sessions = self.sessions.read().unwrap();
340 let session = sessions
341 .get(task_id)
342 .ok_or_else(|| anyhow::anyhow!("Session not found: {}", task_id))?;
343
344 let metadata_dir = project_root.join(".scud").join("headless");
345 std::fs::create_dir_all(&metadata_dir)?;
346
347 let metadata = serde_json::json!({
348 "task_id": session.task_id,
349 "session_id": session.session_id,
350 "tag": session.tag,
351 "pid": session.pid,
352 "status": format!("{:?}", session.status),
353 "started_at_ms": session.started_at.elapsed().as_millis() as u64,
354 });
355
356 let metadata_file = metadata_dir.join(format!("{}.json", task_id));
357 std::fs::write(&metadata_file, serde_json::to_string_pretty(&metadata)?)?;
358
359 Ok(())
360 }
361
362 pub fn load_session_metadata(task_id: &str, project_root: &Path) -> Result<Option<String>> {
366 let metadata_file = project_root
367 .join(".scud")
368 .join("headless")
369 .join(format!("{}.json", task_id));
370
371 if !metadata_file.exists() {
372 return Ok(None);
373 }
374
375 let content = std::fs::read_to_string(&metadata_file)?;
376 let data: serde_json::Value = serde_json::from_str(&content)?;
377
378 Ok(data
379 .get("session_id")
380 .and_then(|v| v.as_str())
381 .filter(|s| !s.is_empty())
382 .map(|s| s.to_string()))
383 }
384}
385
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a stream with fixed identifiers for tests that ignore them.
    fn new_stream() -> SessionStream {
        SessionStream::new("task-1", "test")
    }

    /// Builds a store that already holds a `task-1` session tagged `phase-a`.
    fn store_with_task() -> StreamStore {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store
    }

    // ---- SessionStream ----------------------------------------------------

    #[test]
    fn test_session_stream_new() {
        let stream = SessionStream::new("task-1", "phase-a");

        assert_eq!(stream.task_id, "task-1");
        assert_eq!(stream.tag, "phase-a");
        assert_eq!(stream.status, SessionStatus::Starting);
        assert!(stream.session_id.is_empty());
        assert!(stream.events.is_empty());
        assert!(stream.output_lines.is_empty());
    }

    #[test]
    fn test_push_text_delta_single_line() {
        let mut stream = new_stream();
        stream.push_event(StreamEvent::text_delta("Hello world"));

        // Without a newline the text stays buffered as a partial line.
        assert_eq!(stream.output_lines.len(), 0);
        assert_eq!(stream.partial_line, "Hello world");
        assert_eq!(stream.events.len(), 1);
    }

    #[test]
    fn test_push_text_delta_with_newline() {
        let mut stream = new_stream();
        stream.push_event(StreamEvent::text_delta("Hello\nWorld\n"));

        assert_eq!(stream.output_lines.len(), 2);
        assert_eq!(stream.output_lines[0], "Hello");
        assert_eq!(stream.output_lines[1], "World");
        assert!(stream.partial_line.is_empty());
    }

    #[test]
    fn test_push_text_delta_incremental() {
        let mut stream = new_stream();
        for chunk in ["Hel", "lo ", "world\n"] {
            stream.push_event(StreamEvent::text_delta(chunk));
        }

        assert_eq!(stream.output_lines.len(), 1);
        assert_eq!(stream.output_lines[0], "Hello world");
    }

    #[test]
    fn test_push_tool_start() {
        let mut stream = new_stream();
        stream.push_event(StreamEvent::text_delta("Some text"));
        stream.push_event(StreamEvent::tool_start("Read", "tool-1", "src/main.rs"));

        // The pending partial line is flushed before the tool line.
        assert_eq!(stream.output_lines.len(), 2);
        assert_eq!(stream.output_lines[0], "Some text");
        assert_eq!(stream.output_lines[1], ">> Read src/main.rs");
    }

    #[test]
    fn test_push_tool_result() {
        let mut stream = new_stream();
        stream.push_event(StreamEvent::new(StreamEventKind::ToolResult {
            tool_name: "Read".to_string(),
            tool_id: "tool-1".to_string(),
            success: true,
        }));

        assert_eq!(stream.output_lines.len(), 1);
        assert_eq!(stream.output_lines[0], "<< Read ok");
    }

    #[test]
    fn test_push_tool_result_failed() {
        let mut stream = new_stream();
        stream.push_event(StreamEvent::new(StreamEventKind::ToolResult {
            tool_name: "Bash".to_string(),
            tool_id: "tool-2".to_string(),
            success: false,
        }));

        assert_eq!(stream.output_lines[0], "<< Bash failed");
    }

    #[test]
    fn test_session_assigned() {
        let mut stream = new_stream();
        assert_eq!(stream.status, SessionStatus::Starting);

        stream.push_event(StreamEvent::new(StreamEventKind::SessionAssigned {
            session_id: "sess-abc123".to_string(),
        }));

        assert_eq!(stream.session_id, "sess-abc123");
        assert_eq!(stream.status, SessionStatus::Running);
    }

    #[test]
    fn test_complete_success() {
        let mut stream = new_stream();
        stream.push_event(StreamEvent::complete(true));

        assert_eq!(stream.status, SessionStatus::Completed);
    }

    #[test]
    fn test_complete_failure() {
        let mut stream = new_stream();
        stream.push_event(StreamEvent::complete(false));

        assert_eq!(stream.status, SessionStatus::Failed);
    }

    #[test]
    fn test_error_event() {
        let mut stream = new_stream();
        stream.push_event(StreamEvent::error("Something went wrong"));

        assert_eq!(stream.status, SessionStatus::Failed);
        assert_eq!(stream.output_lines[0], "ERROR: Something went wrong");
    }

    #[test]
    fn test_tail() {
        let mut stream = new_stream();
        for i in 0..10 {
            stream.push_event(StreamEvent::text_delta(&format!("Line {}\n", i)));
        }

        let last3 = stream.tail(3);
        assert_eq!(last3.len(), 3);
        assert_eq!(last3[0], "Line 7");
        assert_eq!(last3[1], "Line 8");
        assert_eq!(last3[2], "Line 9");
    }

    #[test]
    fn test_tail_less_than_requested() {
        let mut stream = new_stream();
        stream.push_event(StreamEvent::text_delta("Only one\n"));

        let last10 = stream.tail(10);
        assert_eq!(last10.len(), 1);
        assert_eq!(last10[0], "Only one");
    }

    #[test]
    fn test_get_all_output_with_partial() {
        let mut stream = new_stream();
        stream.push_event(StreamEvent::text_delta("Complete line\n"));
        stream.push_event(StreamEvent::text_delta("Partial"));

        let output = stream.get_all_output();
        assert_eq!(output.len(), 2);
        assert_eq!(output[0], "Complete line");
        assert_eq!(output[1], "Partial");
    }

    #[test]
    fn test_is_active() {
        let mut stream = new_stream();
        assert!(stream.is_active());

        stream.status = SessionStatus::Running;
        assert!(stream.is_active());

        stream.status = SessionStatus::Completed;
        assert!(!stream.is_active());

        stream.status = SessionStatus::Failed;
        assert!(!stream.is_active());
    }

    #[test]
    fn test_event_timestamp() {
        let mut stream = new_stream();

        // Let some time pass so the relative timestamp is non-zero.
        std::thread::sleep(std::time::Duration::from_millis(10));

        stream.push_event(StreamEvent::text_delta("Hello"));
        assert!(stream.events[0].timestamp_ms > 0);
    }

    // ---- StreamStore ------------------------------------------------------

    #[test]
    fn test_store_create_session() {
        let store = StreamStore::new();
        let key = store.create_session("task-1", "phase-a");

        assert_eq!(key, "task-1");
        assert!(store.has_session("task-1"));
    }

    #[test]
    fn test_store_push_event() {
        let store = store_with_task();
        store.push_event("task-1", StreamEvent::text_delta("Hello\n"));

        let output = store.get_output("task-1", 100);
        assert_eq!(output.len(), 1);
        assert_eq!(output[0], "Hello");
    }

    #[test]
    fn test_store_set_session_id() {
        let store = store_with_task();
        store.set_session_id("task-1", "sess-xyz");

        let session_id = store.get_session_id("task-1");
        assert_eq!(session_id, Some("sess-xyz".to_string()));
    }

    #[test]
    fn test_store_set_pid() {
        let store = store_with_task();
        assert_eq!(store.get_pid("task-1"), None);

        store.set_pid("task-1", 12345);

        assert!(store.has_session("task-1"));
        // Verify the pid was actually stored, not merely that the session exists.
        assert_eq!(store.get_pid("task-1"), Some(12345));
        assert_eq!(store.get_pid("missing"), None);
    }

    #[test]
    fn test_store_get_status() {
        let store = store_with_task();
        assert_eq!(store.get_status("task-1"), Some(SessionStatus::Starting));

        store.push_event("task-1", StreamEvent::complete(true));
        assert_eq!(store.get_status("task-1"), Some(SessionStatus::Completed));
    }

    #[test]
    fn test_store_active_tasks() {
        let store = store_with_task();
        store.create_session("task-2", "phase-a");
        store.push_event("task-2", StreamEvent::complete(true));

        let active = store.active_tasks();
        assert_eq!(active.len(), 1);
        assert!(active.contains(&"task-1".to_string()));
    }

    #[test]
    fn test_store_all_tasks() {
        let store = store_with_task();
        store.create_session("task-2", "phase-b");

        let all = store.all_tasks();
        assert_eq!(all.len(), 2);
    }

    #[test]
    fn test_store_remove_session() {
        let store = store_with_task();
        assert!(store.has_session("task-1"));

        let removed = store.remove_session("task-1");
        assert!(removed.is_some());
        assert!(!store.has_session("task-1"));
    }

    #[test]
    fn test_store_session_stats() {
        let store = store_with_task();
        store.push_event("task-1", StreamEvent::text_delta("Line 1\n"));
        store.push_event("task-1", StreamEvent::text_delta("Line 2\n"));

        let stats = store.session_stats("task-1");
        assert!(stats.is_some());
        let (events, lines) = stats.unwrap();
        assert_eq!(events, 2);
        assert_eq!(lines, 2);
    }

    #[test]
    fn test_store_nonexistent_session() {
        let store = StreamStore::new();

        assert_eq!(store.get_output("nonexistent", 100), Vec::<String>::new());
        assert_eq!(store.get_status("nonexistent"), None);
        assert_eq!(store.get_session_id("nonexistent"), None);
    }

    #[test]
    fn test_store_thread_safety() {
        use std::sync::Arc;
        use std::thread;

        let store = Arc::new(StreamStore::new());
        store.create_session("task-1", "phase-a");

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let store = Arc::clone(&store);
                thread::spawn(move || {
                    for j in 0..100 {
                        store.push_event(
                            "task-1",
                            StreamEvent::text_delta(&format!("Thread {} line {}\n", i, j)),
                        );
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // 10 threads x 100 events, each event carrying exactly one line.
        let stats = store.session_stats("task-1").unwrap();
        assert_eq!(stats.0, 1000);
        assert_eq!(stats.1, 1000);
    }

    // ---- Memory limits ----------------------------------------------------

    #[test]
    fn test_memory_limit_output_lines() {
        let mut stream = new_stream();

        for i in 0..MAX_OUTPUT_LINES + 1000 {
            stream.push_event(StreamEvent::text_delta(&format!("Line {}\n", i)));
        }

        assert!(stream.output_lines.len() <= MAX_OUTPUT_LINES);
    }

    #[test]
    fn test_memory_limit_events() {
        let mut stream = new_stream();

        for i in 0..MAX_EVENTS + 1000 {
            stream.push_event(StreamEvent::text_delta(&format!("{}", i)));
        }

        assert!(stream.events.len() <= MAX_EVENTS);
    }

    // ---- Metadata persistence ---------------------------------------------

    #[test]
    fn test_save_and_load_session_metadata() {
        let temp_dir = std::env::temp_dir().join(format!("scud_test_{}", std::process::id()));
        std::fs::create_dir_all(&temp_dir).unwrap();

        let store = store_with_task();
        store.set_session_id("task-1", "sess-abc123");
        store.set_pid("task-1", 12345);

        store.save_session_metadata("task-1", &temp_dir).unwrap();

        let metadata_file = temp_dir.join(".scud").join("headless").join("task-1.json");
        assert!(metadata_file.exists());

        let loaded = StreamStore::load_session_metadata("task-1", &temp_dir).unwrap();
        assert_eq!(loaded, Some("sess-abc123".to_string()));

        // Best-effort cleanup; a failure here must not fail the test.
        std::fs::remove_dir_all(&temp_dir).ok();
    }

    #[test]
    fn test_load_nonexistent_metadata() {
        let temp_dir = std::env::temp_dir().join(format!("scud_test_ne_{}", std::process::id()));
        let loaded = StreamStore::load_session_metadata("nonexistent", &temp_dir).unwrap();
        assert_eq!(loaded, None);
    }

    #[test]
    fn test_get_session_id_empty_string() {
        let store = store_with_task();

        // No id has been assigned yet, so the empty placeholder maps to `None`.
        let session_id = store.get_session_id("task-1");
        assert_eq!(session_id, None);
    }
}