1use anyhow::Result;
7use std::collections::HashMap;
8use std::path::Path;
9use std::sync::{Arc, RwLock};
10use std::time::Instant;
11
12use super::events::{StreamEvent, StreamEventKind};
13
14const MAX_OUTPUT_LINES: usize = 10_000;
16
17const MAX_EVENTS: usize = 50_000;
19
20#[derive(Debug, Clone, PartialEq, Eq)]
22pub enum SessionStatus {
23 Starting,
24 Running,
25 Completed,
26 Failed,
27}
28
29#[derive(Debug)]
31pub struct SessionStream {
32 pub session_id: String,
34 pub task_id: String,
36 pub tag: String,
38 pub events: Vec<StreamEvent>,
40 pub output_lines: Vec<String>,
42 pub status: SessionStatus,
44 pub started_at: Instant,
46 pub pid: Option<u32>,
48 partial_line: String,
50}
51
52impl SessionStream {
53 pub fn new(task_id: &str, tag: &str) -> Self {
54 Self {
55 session_id: String::new(),
56 task_id: task_id.to_string(),
57 tag: tag.to_string(),
58 events: Vec::new(),
59 output_lines: Vec::new(),
60 status: SessionStatus::Starting,
61 started_at: Instant::now(),
62 pid: None,
63 partial_line: String::new(),
64 }
65 }
66
67 pub fn push_event(&mut self, mut event: StreamEvent) {
69 event.timestamp_ms = self.started_at.elapsed().as_millis() as u64;
70
71 match &event.kind {
73 StreamEventKind::TextDelta { text } => {
74 self.append_text(text);
75 }
76 StreamEventKind::ToolStart {
77 tool_name,
78 input_summary,
79 ..
80 } => {
81 self.flush_partial_line();
83 self.push_line(format!(">> {} {}", tool_name, input_summary));
84 }
85 StreamEventKind::ToolResult {
86 tool_name, success, ..
87 } => {
88 self.flush_partial_line();
89 let status = if *success { "ok" } else { "failed" };
90 self.push_line(format!("<< {} {}", tool_name, status));
91 }
92 StreamEventKind::Complete { success } => {
93 self.flush_partial_line();
94 self.status = if *success {
95 SessionStatus::Completed
96 } else {
97 SessionStatus::Failed
98 };
99 }
100 StreamEventKind::Error { message } => {
101 self.flush_partial_line();
102 self.push_line(format!("ERROR: {}", message));
103 self.status = SessionStatus::Failed;
104 }
105 StreamEventKind::SessionAssigned { session_id } => {
106 self.session_id = session_id.clone();
107 self.status = SessionStatus::Running;
108 }
109 }
110
111 if matches!(self.status, SessionStatus::Starting)
113 && matches!(
114 event.kind,
115 StreamEventKind::TextDelta { .. }
116 | StreamEventKind::ToolStart { .. }
117 | StreamEventKind::ToolResult { .. }
118 )
119 {
120 self.status = SessionStatus::Running;
121 }
122
123 if self.events.len() >= MAX_EVENTS {
125 let drain_count = MAX_EVENTS / 10;
127 self.events.drain(0..drain_count);
128 }
129 self.events.push(event);
130 }
131
132 fn append_text(&mut self, text: &str) {
134 for ch in text.chars() {
135 if ch == '\n' {
136 let line = std::mem::take(&mut self.partial_line);
138 self.push_line(line);
139 } else {
140 self.partial_line.push(ch);
141 }
142 }
143 }
144
145 fn flush_partial_line(&mut self) {
147 if !self.partial_line.is_empty() {
148 let line = std::mem::take(&mut self.partial_line);
149 self.push_line(line);
150 }
151 }
152
153 fn push_line(&mut self, line: String) {
155 if self.output_lines.len() >= MAX_OUTPUT_LINES {
156 let drain_count = MAX_OUTPUT_LINES / 10;
158 self.output_lines.drain(0..drain_count);
159 }
160 self.output_lines.push(line);
161 }
162
163 pub fn tail(&self, n: usize) -> &[String] {
165 let start = self.output_lines.len().saturating_sub(n);
166 &self.output_lines[start..]
167 }
168
169 pub fn get_all_output(&self) -> Vec<String> {
171 let mut lines = self.output_lines.clone();
172 if !self.partial_line.is_empty() {
173 lines.push(self.partial_line.clone());
174 }
175 lines
176 }
177
178 pub fn is_active(&self) -> bool {
180 matches!(
181 self.status,
182 SessionStatus::Starting | SessionStatus::Running
183 )
184 }
185
186 pub fn event_count(&self) -> usize {
188 self.events.len()
189 }
190
191 pub fn line_count(&self) -> usize {
193 self.output_lines.len()
194 }
195}
196
197#[derive(Debug, Clone, Default)]
199pub struct StreamStore {
200 sessions: Arc<RwLock<HashMap<String, SessionStream>>>,
201}
202
203impl StreamStore {
204 pub fn new() -> Self {
205 Self::default()
206 }
207
208 pub fn create_session(&self, task_id: &str, tag: &str) -> String {
210 let mut sessions = self.sessions.write().unwrap();
211 let stream = SessionStream::new(task_id, tag);
212 let key = task_id.to_string();
213 sessions.insert(key.clone(), stream);
214 key
215 }
216
217 pub fn push_event(&self, task_id: &str, event: StreamEvent) {
219 let mut sessions = self.sessions.write().unwrap();
220 if let Some(stream) = sessions.get_mut(task_id) {
221 stream.push_event(event);
222 }
223 }
224
225 pub fn set_session_id(&self, task_id: &str, session_id: &str) {
227 let mut sessions = self.sessions.write().unwrap();
228 if let Some(stream) = sessions.get_mut(task_id) {
229 stream.session_id = session_id.to_string();
230 stream.status = SessionStatus::Running;
231 }
232 }
233
234 pub fn set_pid(&self, task_id: &str, pid: u32) {
236 let mut sessions = self.sessions.write().unwrap();
237 if let Some(stream) = sessions.get_mut(task_id) {
238 stream.pid = Some(pid);
239 }
240 }
241
242 pub fn get_pid(&self, task_id: &str) -> Option<u32> {
244 let sessions = self.sessions.read().unwrap();
245 sessions.get(task_id).and_then(|s| s.pid)
246 }
247
248 pub fn get_output(&self, task_id: &str, limit: usize) -> Vec<String> {
250 let sessions = self.sessions.read().unwrap();
251 sessions
252 .get(task_id)
253 .map(|s| s.tail(limit).to_vec())
254 .unwrap_or_default()
255 }
256
257 pub fn get_all_output(&self, task_id: &str) -> Vec<String> {
259 let sessions = self.sessions.read().unwrap();
260 sessions
261 .get(task_id)
262 .map(|s| s.get_all_output())
263 .unwrap_or_default()
264 }
265
266 pub fn get_status(&self, task_id: &str) -> Option<SessionStatus> {
268 let sessions = self.sessions.read().unwrap();
269 sessions.get(task_id).map(|s| s.status.clone())
270 }
271
272 pub fn get_session_id(&self, task_id: &str) -> Option<String> {
274 let sessions = self.sessions.read().unwrap();
275 sessions
276 .get(task_id)
277 .filter(|s| !s.session_id.is_empty())
278 .map(|s| s.session_id.clone())
279 }
280
281 pub fn active_tasks(&self) -> Vec<String> {
283 let sessions = self.sessions.read().unwrap();
284 sessions
285 .iter()
286 .filter(|(_, s)| s.is_active())
287 .map(|(k, _)| k.clone())
288 .collect()
289 }
290
291 pub fn all_tasks(&self) -> Vec<String> {
293 let sessions = self.sessions.read().unwrap();
294 sessions.keys().cloned().collect()
295 }
296
297 pub fn has_session(&self, task_id: &str) -> bool {
299 let sessions = self.sessions.read().unwrap();
300 sessions.contains_key(task_id)
301 }
302
303 pub fn remove_session(&self, task_id: &str) -> Option<SessionStream> {
305 let mut sessions = self.sessions.write().unwrap();
306 sessions.remove(task_id)
307 }
308
309 pub fn session_stats(&self, task_id: &str) -> Option<(usize, usize)> {
311 let sessions = self.sessions.read().unwrap();
312 sessions
313 .get(task_id)
314 .map(|s| (s.event_count(), s.line_count()))
315 }
316
317 pub fn get_elapsed_secs(&self, task_id: &str) -> Option<u64> {
319 let sessions = self.sessions.read().unwrap();
320 sessions
321 .get(task_id)
322 .map(|s| s.started_at.elapsed().as_secs())
323 }
324
325 pub fn get_last_tool_line(&self, task_id: &str) -> Option<String> {
327 let sessions = self.sessions.read().unwrap();
328 sessions.get(task_id).and_then(|s| {
329 s.output_lines
330 .iter()
331 .rev()
332 .find(|l| l.starts_with(">>") || l.starts_with("<<"))
333 .cloned()
334 })
335 }
336
337 pub fn save_session_metadata(&self, task_id: &str, project_root: &Path) -> Result<()> {
342 let sessions = self.sessions.read().unwrap();
343 let session = sessions
344 .get(task_id)
345 .ok_or_else(|| anyhow::anyhow!("Session not found: {}", task_id))?;
346
347 let metadata_dir = project_root.join(".scud").join("headless");
348 std::fs::create_dir_all(&metadata_dir)?;
349
350 let metadata = serde_json::json!({
351 "task_id": session.task_id,
352 "session_id": session.session_id,
353 "tag": session.tag,
354 "pid": session.pid,
355 "status": format!("{:?}", session.status),
356 "started_at_ms": session.started_at.elapsed().as_millis() as u64,
357 });
358
359 let metadata_file = metadata_dir.join(format!("{}.json", task_id));
360 std::fs::write(&metadata_file, serde_json::to_string_pretty(&metadata)?)?;
361
362 Ok(())
363 }
364
365 pub fn load_session_metadata(task_id: &str, project_root: &Path) -> Result<Option<String>> {
369 let metadata_file = project_root
370 .join(".scud")
371 .join("headless")
372 .join(format!("{}.json", task_id));
373
374 if !metadata_file.exists() {
375 return Ok(None);
376 }
377
378 let content = std::fs::read_to_string(&metadata_file)?;
379 let data: serde_json::Value = serde_json::from_str(&content)?;
380
381 Ok(data
382 .get("session_id")
383 .and_then(|v| v.as_str())
384 .filter(|s| !s.is_empty())
385 .map(|s| s.to_string()))
386 }
387}
388
389#[cfg(test)]
390mod tests {
391 use super::*;
392
393 #[test]
394 fn test_session_stream_new() {
395 let stream = SessionStream::new("task-1", "phase-a");
396 assert_eq!(stream.task_id, "task-1");
397 assert_eq!(stream.tag, "phase-a");
398 assert_eq!(stream.status, SessionStatus::Starting);
399 assert!(stream.session_id.is_empty());
400 assert!(stream.events.is_empty());
401 assert!(stream.output_lines.is_empty());
402 }
403
404 #[test]
405 fn test_push_text_delta_single_line() {
406 let mut stream = SessionStream::new("task-1", "test");
407 stream.push_event(StreamEvent::text_delta("Hello world"));
408
409 assert_eq!(stream.output_lines.len(), 0);
411 assert_eq!(stream.partial_line, "Hello world");
412 assert_eq!(stream.events.len(), 1);
413 }
414
415 #[test]
416 fn test_push_text_delta_with_newline() {
417 let mut stream = SessionStream::new("task-1", "test");
418 stream.push_event(StreamEvent::text_delta("Hello\nWorld\n"));
419
420 assert_eq!(stream.output_lines.len(), 2);
421 assert_eq!(stream.output_lines[0], "Hello");
422 assert_eq!(stream.output_lines[1], "World");
423 assert!(stream.partial_line.is_empty());
424 }
425
426 #[test]
427 fn test_push_text_delta_incremental() {
428 let mut stream = SessionStream::new("task-1", "test");
429 stream.push_event(StreamEvent::text_delta("Hel"));
430 stream.push_event(StreamEvent::text_delta("lo "));
431 stream.push_event(StreamEvent::text_delta("world\n"));
432
433 assert_eq!(stream.output_lines.len(), 1);
434 assert_eq!(stream.output_lines[0], "Hello world");
435 }
436
437 #[test]
438 fn test_push_tool_start() {
439 let mut stream = SessionStream::new("task-1", "test");
440 stream.push_event(StreamEvent::text_delta("Some text"));
441 stream.push_event(StreamEvent::tool_start("Read", "tool-1", "src/main.rs"));
442
443 assert_eq!(stream.output_lines.len(), 2);
445 assert_eq!(stream.output_lines[0], "Some text");
446 assert_eq!(stream.output_lines[1], ">> Read src/main.rs");
447 }
448
449 #[test]
450 fn test_push_tool_result() {
451 let mut stream = SessionStream::new("task-1", "test");
452 stream.push_event(StreamEvent::new(StreamEventKind::ToolResult {
453 tool_name: "Read".to_string(),
454 tool_id: "tool-1".to_string(),
455 success: true,
456 }));
457
458 assert_eq!(stream.output_lines.len(), 1);
459 assert_eq!(stream.output_lines[0], "<< Read ok");
460 }
461
462 #[test]
463 fn test_push_tool_result_failed() {
464 let mut stream = SessionStream::new("task-1", "test");
465 stream.push_event(StreamEvent::new(StreamEventKind::ToolResult {
466 tool_name: "Bash".to_string(),
467 tool_id: "tool-2".to_string(),
468 success: false,
469 }));
470
471 assert_eq!(stream.output_lines[0], "<< Bash failed");
472 }
473
474 #[test]
475 fn test_session_assigned() {
476 let mut stream = SessionStream::new("task-1", "test");
477 assert_eq!(stream.status, SessionStatus::Starting);
478
479 stream.push_event(StreamEvent::new(StreamEventKind::SessionAssigned {
480 session_id: "sess-abc123".to_string(),
481 }));
482
483 assert_eq!(stream.session_id, "sess-abc123");
484 assert_eq!(stream.status, SessionStatus::Running);
485 }
486
487 #[test]
488 fn test_complete_success() {
489 let mut stream = SessionStream::new("task-1", "test");
490 stream.push_event(StreamEvent::complete(true));
491
492 assert_eq!(stream.status, SessionStatus::Completed);
493 }
494
495 #[test]
496 fn test_complete_failure() {
497 let mut stream = SessionStream::new("task-1", "test");
498 stream.push_event(StreamEvent::complete(false));
499
500 assert_eq!(stream.status, SessionStatus::Failed);
501 }
502
503 #[test]
504 fn test_error_event() {
505 let mut stream = SessionStream::new("task-1", "test");
506 stream.push_event(StreamEvent::error("Something went wrong"));
507
508 assert_eq!(stream.status, SessionStatus::Failed);
509 assert_eq!(stream.output_lines[0], "ERROR: Something went wrong");
510 }
511
512 #[test]
513 fn test_tail() {
514 let mut stream = SessionStream::new("task-1", "test");
515 for i in 0..10 {
516 stream.push_event(StreamEvent::text_delta(&format!("Line {}\n", i)));
517 }
518
519 let last3 = stream.tail(3);
520 assert_eq!(last3.len(), 3);
521 assert_eq!(last3[0], "Line 7");
522 assert_eq!(last3[1], "Line 8");
523 assert_eq!(last3[2], "Line 9");
524 }
525
526 #[test]
527 fn test_tail_less_than_requested() {
528 let mut stream = SessionStream::new("task-1", "test");
529 stream.push_event(StreamEvent::text_delta("Only one\n"));
530
531 let last10 = stream.tail(10);
532 assert_eq!(last10.len(), 1);
533 assert_eq!(last10[0], "Only one");
534 }
535
536 #[test]
537 fn test_get_all_output_with_partial() {
538 let mut stream = SessionStream::new("task-1", "test");
539 stream.push_event(StreamEvent::text_delta("Complete line\n"));
540 stream.push_event(StreamEvent::text_delta("Partial"));
541
542 let output = stream.get_all_output();
543 assert_eq!(output.len(), 2);
544 assert_eq!(output[0], "Complete line");
545 assert_eq!(output[1], "Partial");
546 }
547
548 #[test]
549 fn test_is_active() {
550 let mut stream = SessionStream::new("task-1", "test");
551 assert!(stream.is_active()); stream.status = SessionStatus::Running;
554 assert!(stream.is_active());
555
556 stream.status = SessionStatus::Completed;
557 assert!(!stream.is_active());
558
559 stream.status = SessionStatus::Failed;
560 assert!(!stream.is_active());
561 }
562
563 #[test]
564 fn test_event_timestamp() {
565 let mut stream = SessionStream::new("task-1", "test");
566
567 std::thread::sleep(std::time::Duration::from_millis(10));
569
570 stream.push_event(StreamEvent::text_delta("Hello"));
571 assert!(stream.events[0].timestamp_ms > 0);
572 }
573
574 #[test]
577 fn test_store_create_session() {
578 let store = StreamStore::new();
579 let key = store.create_session("task-1", "phase-a");
580
581 assert_eq!(key, "task-1");
582 assert!(store.has_session("task-1"));
583 }
584
585 #[test]
586 fn test_store_push_event() {
587 let store = StreamStore::new();
588 store.create_session("task-1", "phase-a");
589 store.push_event("task-1", StreamEvent::text_delta("Hello\n"));
590
591 let output = store.get_output("task-1", 100);
592 assert_eq!(output.len(), 1);
593 assert_eq!(output[0], "Hello");
594 }
595
596 #[test]
597 fn test_store_set_session_id() {
598 let store = StreamStore::new();
599 store.create_session("task-1", "phase-a");
600 store.set_session_id("task-1", "sess-xyz");
601
602 let session_id = store.get_session_id("task-1");
603 assert_eq!(session_id, Some("sess-xyz".to_string()));
604 }
605
606 #[test]
607 fn test_store_set_pid() {
608 let store = StreamStore::new();
609 store.create_session("task-1", "phase-a");
610 store.set_pid("task-1", 12345);
611
612 assert!(store.has_session("task-1"));
614 }
615
616 #[test]
617 fn test_store_get_status() {
618 let store = StreamStore::new();
619 store.create_session("task-1", "phase-a");
620
621 assert_eq!(store.get_status("task-1"), Some(SessionStatus::Starting));
622
623 store.push_event("task-1", StreamEvent::complete(true));
624 assert_eq!(store.get_status("task-1"), Some(SessionStatus::Completed));
625 }
626
627 #[test]
628 fn test_store_active_tasks() {
629 let store = StreamStore::new();
630 store.create_session("task-1", "phase-a");
631 store.create_session("task-2", "phase-a");
632 store.push_event("task-2", StreamEvent::complete(true));
633
634 let active = store.active_tasks();
635 assert_eq!(active.len(), 1);
636 assert!(active.contains(&"task-1".to_string()));
637 }
638
639 #[test]
640 fn test_store_all_tasks() {
641 let store = StreamStore::new();
642 store.create_session("task-1", "phase-a");
643 store.create_session("task-2", "phase-b");
644
645 let all = store.all_tasks();
646 assert_eq!(all.len(), 2);
647 }
648
649 #[test]
650 fn test_store_remove_session() {
651 let store = StreamStore::new();
652 store.create_session("task-1", "phase-a");
653 assert!(store.has_session("task-1"));
654
655 let removed = store.remove_session("task-1");
656 assert!(removed.is_some());
657 assert!(!store.has_session("task-1"));
658 }
659
660 #[test]
661 fn test_store_session_stats() {
662 let store = StreamStore::new();
663 store.create_session("task-1", "phase-a");
664 store.push_event("task-1", StreamEvent::text_delta("Line 1\n"));
665 store.push_event("task-1", StreamEvent::text_delta("Line 2\n"));
666
667 let stats = store.session_stats("task-1");
668 assert!(stats.is_some());
669 let (events, lines) = stats.unwrap();
670 assert_eq!(events, 2);
671 assert_eq!(lines, 2);
672 }
673
674 #[test]
675 fn test_store_nonexistent_session() {
676 let store = StreamStore::new();
677
678 assert_eq!(store.get_output("nonexistent", 100), Vec::<String>::new());
679 assert_eq!(store.get_status("nonexistent"), None);
680 assert_eq!(store.get_session_id("nonexistent"), None);
681 }
682
683 #[test]
684 fn test_store_thread_safety() {
685 use std::sync::Arc;
686 use std::thread;
687
688 let store = Arc::new(StreamStore::new());
689 store.create_session("task-1", "phase-a");
690
691 let handles: Vec<_> = (0..10)
692 .map(|i| {
693 let store = Arc::clone(&store);
694 thread::spawn(move || {
695 for j in 0..100 {
696 store.push_event(
697 "task-1",
698 StreamEvent::text_delta(&format!("Thread {} line {}\n", i, j)),
699 );
700 }
701 })
702 })
703 .collect();
704
705 for handle in handles {
706 handle.join().unwrap();
707 }
708
709 let stats = store.session_stats("task-1").unwrap();
710 assert_eq!(stats.0, 1000); assert_eq!(stats.1, 1000); }
713
714 #[test]
715 fn test_memory_limit_output_lines() {
716 let mut stream = SessionStream::new("task-1", "test");
717
718 for i in 0..MAX_OUTPUT_LINES + 1000 {
720 stream.push_event(StreamEvent::text_delta(&format!("Line {}\n", i)));
721 }
722
723 assert!(stream.output_lines.len() <= MAX_OUTPUT_LINES);
725 }
726
727 #[test]
728 fn test_memory_limit_events() {
729 let mut stream = SessionStream::new("task-1", "test");
730
731 for i in 0..MAX_EVENTS + 1000 {
733 stream.push_event(StreamEvent::text_delta(&format!("{}", i)));
734 }
735
736 assert!(stream.events.len() <= MAX_EVENTS);
738 }
739
740 #[test]
741 fn test_save_and_load_session_metadata() {
742 let temp_dir = std::env::temp_dir().join(format!("scud_test_{}", std::process::id()));
743 std::fs::create_dir_all(&temp_dir).unwrap();
744
745 let store = StreamStore::new();
746 store.create_session("task-1", "phase-a");
747 store.set_session_id("task-1", "sess-abc123");
748 store.set_pid("task-1", 12345);
749
750 store.save_session_metadata("task-1", &temp_dir).unwrap();
752
753 let metadata_file = temp_dir.join(".scud").join("headless").join("task-1.json");
755 assert!(metadata_file.exists());
756
757 let loaded = StreamStore::load_session_metadata("task-1", &temp_dir).unwrap();
759 assert_eq!(loaded, Some("sess-abc123".to_string()));
760
761 std::fs::remove_dir_all(&temp_dir).ok();
763 }
764
765 #[test]
766 fn test_load_nonexistent_metadata() {
767 let temp_dir = std::env::temp_dir().join(format!("scud_test_ne_{}", std::process::id()));
768 let loaded = StreamStore::load_session_metadata("nonexistent", &temp_dir).unwrap();
769 assert_eq!(loaded, None);
770 }
771
772 #[test]
773 fn test_get_session_id_empty_string() {
774 let store = StreamStore::new();
775 store.create_session("task-1", "phase-a");
776 let session_id = store.get_session_id("task-1");
780 assert_eq!(session_id, None);
781 }
782}