1use anyhow::Result;
7use std::collections::HashMap;
8use std::path::Path;
9use std::sync::{Arc, RwLock};
10use std::time::Instant;
11
12use super::events::{StreamEvent, StreamEventKind};
13
/// Cap on the number of completed output lines a session keeps. Once the
/// buffer reaches this size, `push_line` drops the oldest tenth before
/// appending the next line.
const MAX_OUTPUT_LINES: usize = 10_000;

/// Cap on the number of recorded events a session keeps. Once the buffer
/// reaches this size, `push_event` drops the oldest tenth before appending
/// the next event.
const MAX_EVENTS: usize = 50_000;
19
/// Lifecycle state of a [`SessionStream`].
///
/// `Starting` and `Running` count as active (see `SessionStream::is_active`);
/// `Completed` and `Failed` do not.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionStatus {
    /// Initial state assigned by `SessionStream::new`.
    Starting,
    /// Entered when a session id is assigned, or when the first text/tool
    /// event arrives while the stream is still `Starting`.
    Running,
    /// Entered on a `Complete` event whose `success` flag is true.
    Completed,
    /// Entered on a `Complete` event whose `success` flag is false, or on an
    /// `Error` event.
    Failed,
}
28
/// In-memory record of one task's streamed session: the raw events plus the
/// text lines rendered from them.
#[derive(Debug)]
pub struct SessionStream {
    /// Session identifier; empty until a `SessionAssigned` event (or
    /// `StreamStore::set_session_id`) supplies one.
    pub session_id: String,
    /// Task identifier given to `new`; `StreamStore` uses it as the map key.
    pub task_id: String,
    /// Tag given to `new` (presumably a phase/group label — confirm with callers).
    pub tag: String,
    /// Events in arrival order, trimmed from the front once `MAX_EVENTS` is reached.
    pub events: Vec<StreamEvent>,
    /// Completed output lines, trimmed from the front once `MAX_OUTPUT_LINES` is reached.
    pub output_lines: Vec<String>,
    /// Current lifecycle state.
    pub status: SessionStatus,
    /// Instant captured in `new`; event timestamps are measured from it.
    pub started_at: Instant,
    /// Process id recorded via `StreamStore::set_pid`, if any
    /// (presumably the spawned agent process — confirm with callers).
    pub pid: Option<u32>,
    /// Text received since the last newline that has not yet been moved into
    /// `output_lines`.
    partial_line: String,
}
51
52impl SessionStream {
53 pub fn new(task_id: &str, tag: &str) -> Self {
54 Self {
55 session_id: String::new(),
56 task_id: task_id.to_string(),
57 tag: tag.to_string(),
58 events: Vec::new(),
59 output_lines: Vec::new(),
60 status: SessionStatus::Starting,
61 started_at: Instant::now(),
62 pid: None,
63 partial_line: String::new(),
64 }
65 }
66
67 pub fn push_event(&mut self, mut event: StreamEvent) {
69 event.timestamp_ms = self.started_at.elapsed().as_millis() as u64;
70
71 match &event.kind {
73 StreamEventKind::TextDelta { text } => {
74 self.append_text(text);
75 }
76 StreamEventKind::ToolStart {
77 tool_name,
78 input_summary,
79 ..
80 } => {
81 self.flush_partial_line();
83 self.push_line(format!(">> {} {}", tool_name, input_summary));
84 }
85 StreamEventKind::ToolResult {
86 tool_name, success, ..
87 } => {
88 self.flush_partial_line();
89 let status = if *success { "ok" } else { "failed" };
90 self.push_line(format!("<< {} {}", tool_name, status));
91 }
92 StreamEventKind::Complete { success } => {
93 self.flush_partial_line();
94 self.status = if *success {
95 SessionStatus::Completed
96 } else {
97 SessionStatus::Failed
98 };
99 }
100 StreamEventKind::Error { message } => {
101 self.flush_partial_line();
102 self.push_line(format!("ERROR: {}", message));
103 self.status = SessionStatus::Failed;
104 }
105 StreamEventKind::SessionAssigned { session_id } => {
106 self.session_id = session_id.clone();
107 self.status = SessionStatus::Running;
108 }
109 }
110
111 if matches!(self.status, SessionStatus::Starting)
113 && matches!(
114 event.kind,
115 StreamEventKind::TextDelta { .. }
116 | StreamEventKind::ToolStart { .. }
117 | StreamEventKind::ToolResult { .. }
118 )
119 {
120 self.status = SessionStatus::Running;
121 }
122
123 if self.events.len() >= MAX_EVENTS {
125 let drain_count = MAX_EVENTS / 10;
127 self.events.drain(0..drain_count);
128 }
129 self.events.push(event);
130 }
131
132 fn append_text(&mut self, text: &str) {
134 for ch in text.chars() {
135 if ch == '\n' {
136 let line = std::mem::take(&mut self.partial_line);
138 self.push_line(line);
139 } else {
140 self.partial_line.push(ch);
141 }
142 }
143 }
144
145 fn flush_partial_line(&mut self) {
147 if !self.partial_line.is_empty() {
148 let line = std::mem::take(&mut self.partial_line);
149 self.push_line(line);
150 }
151 }
152
153 fn push_line(&mut self, line: String) {
155 if self.output_lines.len() >= MAX_OUTPUT_LINES {
156 let drain_count = MAX_OUTPUT_LINES / 10;
158 self.output_lines.drain(0..drain_count);
159 }
160 self.output_lines.push(line);
161 }
162
163 pub fn tail(&self, n: usize) -> &[String] {
165 let start = self.output_lines.len().saturating_sub(n);
166 &self.output_lines[start..]
167 }
168
169 pub fn get_all_output(&self) -> Vec<String> {
171 let mut lines = self.output_lines.clone();
172 if !self.partial_line.is_empty() {
173 lines.push(self.partial_line.clone());
174 }
175 lines
176 }
177
178 pub fn is_active(&self) -> bool {
180 matches!(self.status, SessionStatus::Starting | SessionStatus::Running)
181 }
182
183 pub fn event_count(&self) -> usize {
185 self.events.len()
186 }
187
188 pub fn line_count(&self) -> usize {
190 self.output_lines.len()
191 }
192}
193
/// Shared collection of [`SessionStream`]s keyed by task id.
///
/// The map lives behind `Arc<RwLock<..>>`, so cloning a `StreamStore` yields
/// another handle to the same underlying sessions rather than a copy.
#[derive(Debug, Clone, Default)]
pub struct StreamStore {
    /// Task id -> session stream.
    sessions: Arc<RwLock<HashMap<String, SessionStream>>>,
}
199
200impl StreamStore {
201 pub fn new() -> Self {
202 Self::default()
203 }
204
205 pub fn create_session(&self, task_id: &str, tag: &str) -> String {
207 let mut sessions = self.sessions.write().unwrap();
208 let stream = SessionStream::new(task_id, tag);
209 let key = task_id.to_string();
210 sessions.insert(key.clone(), stream);
211 key
212 }
213
214 pub fn push_event(&self, task_id: &str, event: StreamEvent) {
216 let mut sessions = self.sessions.write().unwrap();
217 if let Some(stream) = sessions.get_mut(task_id) {
218 stream.push_event(event);
219 }
220 }
221
222 pub fn set_session_id(&self, task_id: &str, session_id: &str) {
224 let mut sessions = self.sessions.write().unwrap();
225 if let Some(stream) = sessions.get_mut(task_id) {
226 stream.session_id = session_id.to_string();
227 stream.status = SessionStatus::Running;
228 }
229 }
230
231 pub fn set_pid(&self, task_id: &str, pid: u32) {
233 let mut sessions = self.sessions.write().unwrap();
234 if let Some(stream) = sessions.get_mut(task_id) {
235 stream.pid = Some(pid);
236 }
237 }
238
239 pub fn get_output(&self, task_id: &str, limit: usize) -> Vec<String> {
241 let sessions = self.sessions.read().unwrap();
242 sessions
243 .get(task_id)
244 .map(|s| s.tail(limit).to_vec())
245 .unwrap_or_default()
246 }
247
248 pub fn get_all_output(&self, task_id: &str) -> Vec<String> {
250 let sessions = self.sessions.read().unwrap();
251 sessions
252 .get(task_id)
253 .map(|s| s.get_all_output())
254 .unwrap_or_default()
255 }
256
257 pub fn get_status(&self, task_id: &str) -> Option<SessionStatus> {
259 let sessions = self.sessions.read().unwrap();
260 sessions.get(task_id).map(|s| s.status.clone())
261 }
262
263 pub fn get_session_id(&self, task_id: &str) -> Option<String> {
265 let sessions = self.sessions.read().unwrap();
266 sessions
267 .get(task_id)
268 .filter(|s| !s.session_id.is_empty())
269 .map(|s| s.session_id.clone())
270 }
271
272 pub fn active_tasks(&self) -> Vec<String> {
274 let sessions = self.sessions.read().unwrap();
275 sessions
276 .iter()
277 .filter(|(_, s)| s.is_active())
278 .map(|(k, _)| k.clone())
279 .collect()
280 }
281
282 pub fn all_tasks(&self) -> Vec<String> {
284 let sessions = self.sessions.read().unwrap();
285 sessions.keys().cloned().collect()
286 }
287
288 pub fn has_session(&self, task_id: &str) -> bool {
290 let sessions = self.sessions.read().unwrap();
291 sessions.contains_key(task_id)
292 }
293
294 pub fn remove_session(&self, task_id: &str) -> Option<SessionStream> {
296 let mut sessions = self.sessions.write().unwrap();
297 sessions.remove(task_id)
298 }
299
300 pub fn session_stats(&self, task_id: &str) -> Option<(usize, usize)> {
302 let sessions = self.sessions.read().unwrap();
303 sessions
304 .get(task_id)
305 .map(|s| (s.event_count(), s.line_count()))
306 }
307
308 pub fn save_session_metadata(&self, task_id: &str, project_root: &Path) -> Result<()> {
313 let sessions = self.sessions.read().unwrap();
314 let session = sessions
315 .get(task_id)
316 .ok_or_else(|| anyhow::anyhow!("Session not found: {}", task_id))?;
317
318 let metadata_dir = project_root.join(".scud").join("headless");
319 std::fs::create_dir_all(&metadata_dir)?;
320
321 let metadata = serde_json::json!({
322 "task_id": session.task_id,
323 "session_id": session.session_id,
324 "tag": session.tag,
325 "pid": session.pid,
326 "status": format!("{:?}", session.status),
327 "started_at_ms": session.started_at.elapsed().as_millis() as u64,
328 });
329
330 let metadata_file = metadata_dir.join(format!("{}.json", task_id));
331 std::fs::write(&metadata_file, serde_json::to_string_pretty(&metadata)?)?;
332
333 Ok(())
334 }
335
336 pub fn load_session_metadata(task_id: &str, project_root: &Path) -> Result<Option<String>> {
340 let metadata_file = project_root
341 .join(".scud")
342 .join("headless")
343 .join(format!("{}.json", task_id));
344
345 if !metadata_file.exists() {
346 return Ok(None);
347 }
348
349 let content = std::fs::read_to_string(&metadata_file)?;
350 let data: serde_json::Value = serde_json::from_str(&content)?;
351
352 Ok(data
353 .get("session_id")
354 .and_then(|v| v.as_str())
355 .filter(|s| !s.is_empty())
356 .map(|s| s.to_string()))
357 }
358}
359
/// Unit tests for `SessionStream` (line assembly, status transitions, memory
/// caps) and `StreamStore` (keyed access, thread safety, metadata round trip).
#[cfg(test)]
mod tests {
    use super::*;

    // ---- SessionStream ----

    #[test]
    fn test_session_stream_new() {
        let stream = SessionStream::new("task-1", "phase-a");
        assert_eq!(stream.task_id, "task-1");
        assert_eq!(stream.tag, "phase-a");
        assert_eq!(stream.status, SessionStatus::Starting);
        assert!(stream.session_id.is_empty());
        assert!(stream.events.is_empty());
        assert!(stream.output_lines.is_empty());
    }

    #[test]
    fn test_push_text_delta_single_line() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::text_delta("Hello world"));

        // Without a newline the text stays in the partial-line buffer.
        assert_eq!(stream.output_lines.len(), 0);
        assert_eq!(stream.partial_line, "Hello world");
        assert_eq!(stream.events.len(), 1);
    }

    #[test]
    fn test_push_text_delta_with_newline() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::text_delta("Hello\nWorld\n"));

        assert_eq!(stream.output_lines.len(), 2);
        assert_eq!(stream.output_lines[0], "Hello");
        assert_eq!(stream.output_lines[1], "World");
        assert!(stream.partial_line.is_empty());
    }

    #[test]
    fn test_push_text_delta_incremental() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::text_delta("Hel"));
        stream.push_event(StreamEvent::text_delta("lo "));
        stream.push_event(StreamEvent::text_delta("world\n"));

        assert_eq!(stream.output_lines.len(), 1);
        assert_eq!(stream.output_lines[0], "Hello world");
    }

    #[test]
    fn test_push_tool_start() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::text_delta("Some text"));
        stream.push_event(StreamEvent::tool_start("Read", "tool-1", "src/main.rs"));

        // The pending partial line is flushed before the tool line is added.
        assert_eq!(stream.output_lines.len(), 2);
        assert_eq!(stream.output_lines[0], "Some text");
        assert_eq!(stream.output_lines[1], ">> Read src/main.rs");
    }

    #[test]
    fn test_push_tool_result() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::new(StreamEventKind::ToolResult {
            tool_name: "Read".to_string(),
            tool_id: "tool-1".to_string(),
            success: true,
        }));

        assert_eq!(stream.output_lines.len(), 1);
        assert_eq!(stream.output_lines[0], "<< Read ok");
    }

    #[test]
    fn test_push_tool_result_failed() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::new(StreamEventKind::ToolResult {
            tool_name: "Bash".to_string(),
            tool_id: "tool-2".to_string(),
            success: false,
        }));

        assert_eq!(stream.output_lines[0], "<< Bash failed");
    }

    #[test]
    fn test_session_assigned() {
        let mut stream = SessionStream::new("task-1", "test");
        assert_eq!(stream.status, SessionStatus::Starting);

        stream.push_event(StreamEvent::new(StreamEventKind::SessionAssigned {
            session_id: "sess-abc123".to_string(),
        }));

        assert_eq!(stream.session_id, "sess-abc123");
        assert_eq!(stream.status, SessionStatus::Running);
    }

    #[test]
    fn test_complete_success() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::complete(true));

        assert_eq!(stream.status, SessionStatus::Completed);
    }

    #[test]
    fn test_complete_failure() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::complete(false));

        assert_eq!(stream.status, SessionStatus::Failed);
    }

    #[test]
    fn test_error_event() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::error("Something went wrong"));

        assert_eq!(stream.status, SessionStatus::Failed);
        assert_eq!(stream.output_lines[0], "ERROR: Something went wrong");
    }

    #[test]
    fn test_tail() {
        let mut stream = SessionStream::new("task-1", "test");
        for i in 0..10 {
            stream.push_event(StreamEvent::text_delta(&format!("Line {}\n", i)));
        }

        let last3 = stream.tail(3);
        assert_eq!(last3.len(), 3);
        assert_eq!(last3[0], "Line 7");
        assert_eq!(last3[1], "Line 8");
        assert_eq!(last3[2], "Line 9");
    }

    #[test]
    fn test_tail_less_than_requested() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::text_delta("Only one\n"));

        let last10 = stream.tail(10);
        assert_eq!(last10.len(), 1);
        assert_eq!(last10[0], "Only one");
    }

    #[test]
    fn test_get_all_output_with_partial() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::text_delta("Complete line\n"));
        stream.push_event(StreamEvent::text_delta("Partial"));

        let output = stream.get_all_output();
        assert_eq!(output.len(), 2);
        assert_eq!(output[0], "Complete line");
        assert_eq!(output[1], "Partial");
    }

    #[test]
    fn test_is_active() {
        let mut stream = SessionStream::new("task-1", "test");
        // Starting counts as active.
        assert!(stream.is_active());

        stream.status = SessionStatus::Running;
        assert!(stream.is_active());

        stream.status = SessionStatus::Completed;
        assert!(!stream.is_active());

        stream.status = SessionStatus::Failed;
        assert!(!stream.is_active());
    }

    #[test]
    fn test_event_timestamp() {
        let mut stream = SessionStream::new("task-1", "test");

        // Let some time pass so the elapsed-millisecond stamp is non-zero.
        std::thread::sleep(std::time::Duration::from_millis(10));

        stream.push_event(StreamEvent::text_delta("Hello"));
        assert!(stream.events[0].timestamp_ms > 0);
    }

    // ---- StreamStore ----

    #[test]
    fn test_store_create_session() {
        let store = StreamStore::new();
        let key = store.create_session("task-1", "phase-a");

        assert_eq!(key, "task-1");
        assert!(store.has_session("task-1"));
    }

    #[test]
    fn test_store_push_event() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store.push_event("task-1", StreamEvent::text_delta("Hello\n"));

        let output = store.get_output("task-1", 100);
        assert_eq!(output.len(), 1);
        assert_eq!(output[0], "Hello");
    }

    #[test]
    fn test_store_set_session_id() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store.set_session_id("task-1", "sess-xyz");

        let session_id = store.get_session_id("task-1");
        assert_eq!(session_id, Some("sess-xyz".to_string()));
    }

    #[test]
    fn test_store_set_pid() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store.set_pid("task-1", 12345);

        assert!(store.has_session("task-1"));

        // There is no public getter for the pid, so read it back through the
        // session map to confirm it was actually stored.
        let pid = store
            .sessions
            .read()
            .unwrap()
            .get("task-1")
            .and_then(|s| s.pid);
        assert_eq!(pid, Some(12345));
    }

    #[test]
    fn test_store_get_status() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");

        assert_eq!(store.get_status("task-1"), Some(SessionStatus::Starting));

        store.push_event("task-1", StreamEvent::complete(true));
        assert_eq!(store.get_status("task-1"), Some(SessionStatus::Completed));
    }

    #[test]
    fn test_store_active_tasks() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store.create_session("task-2", "phase-a");
        store.push_event("task-2", StreamEvent::complete(true));

        let active = store.active_tasks();
        assert_eq!(active.len(), 1);
        assert!(active.contains(&"task-1".to_string()));
    }

    #[test]
    fn test_store_all_tasks() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store.create_session("task-2", "phase-b");

        let all = store.all_tasks();
        assert_eq!(all.len(), 2);
    }

    #[test]
    fn test_store_remove_session() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        assert!(store.has_session("task-1"));

        let removed = store.remove_session("task-1");
        assert!(removed.is_some());
        assert!(!store.has_session("task-1"));
    }

    #[test]
    fn test_store_session_stats() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store.push_event("task-1", StreamEvent::text_delta("Line 1\n"));
        store.push_event("task-1", StreamEvent::text_delta("Line 2\n"));

        let stats = store.session_stats("task-1");
        assert!(stats.is_some());
        let (events, lines) = stats.unwrap();
        assert_eq!(events, 2);
        assert_eq!(lines, 2);
    }

    #[test]
    fn test_store_nonexistent_session() {
        let store = StreamStore::new();

        assert_eq!(store.get_output("nonexistent", 100), Vec::<String>::new());
        assert_eq!(store.get_status("nonexistent"), None);
        assert_eq!(store.get_session_id("nonexistent"), None);
    }

    #[test]
    fn test_store_thread_safety() {
        use std::sync::Arc;
        use std::thread;

        let store = Arc::new(StreamStore::new());
        store.create_session("task-1", "phase-a");

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let store = Arc::clone(&store);
                thread::spawn(move || {
                    for j in 0..100 {
                        store.push_event(
                            "task-1",
                            StreamEvent::text_delta(&format!("Thread {} line {}\n", i, j)),
                        );
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        // 10 threads x 100 single-line events each.
        let stats = store.session_stats("task-1").unwrap();
        assert_eq!(stats.0, 1000);
        assert_eq!(stats.1, 1000);
    }

    // ---- Memory caps ----

    #[test]
    fn test_memory_limit_output_lines() {
        let mut stream = SessionStream::new("task-1", "test");

        // Push past the cap; the buffer must never exceed it.
        for i in 0..MAX_OUTPUT_LINES + 1000 {
            stream.push_event(StreamEvent::text_delta(&format!("Line {}\n", i)));
        }

        assert!(stream.output_lines.len() <= MAX_OUTPUT_LINES);
    }

    #[test]
    fn test_memory_limit_events() {
        let mut stream = SessionStream::new("task-1", "test");

        // Push past the cap; the buffer must never exceed it.
        for i in 0..MAX_EVENTS + 1000 {
            stream.push_event(StreamEvent::text_delta(&format!("{}", i)));
        }

        assert!(stream.events.len() <= MAX_EVENTS);
    }

    // ---- Metadata persistence ----

    #[test]
    fn test_save_and_load_session_metadata() {
        let temp_dir = std::env::temp_dir().join(format!("scud_test_{}", std::process::id()));
        std::fs::create_dir_all(&temp_dir).unwrap();

        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store.set_session_id("task-1", "sess-abc123");
        store.set_pid("task-1", 12345);

        store.save_session_metadata("task-1", &temp_dir).unwrap();

        let metadata_file = temp_dir.join(".scud").join("headless").join("task-1.json");
        assert!(metadata_file.exists());

        let loaded = StreamStore::load_session_metadata("task-1", &temp_dir).unwrap();
        assert_eq!(loaded, Some("sess-abc123".to_string()));

        // Best-effort cleanup; a failure here should not fail the test.
        std::fs::remove_dir_all(&temp_dir).ok();
    }

    #[test]
    fn test_load_nonexistent_metadata() {
        let temp_dir = std::env::temp_dir().join(format!("scud_test_ne_{}", std::process::id()));
        let loaded = StreamStore::load_session_metadata("nonexistent", &temp_dir).unwrap();
        assert_eq!(loaded, None);
    }

    #[test]
    fn test_get_session_id_empty_string() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        // No id assigned yet: the empty placeholder must surface as `None`.
        let session_id = store.get_session_id("task-1");
        assert_eq!(session_id, None);
    }
}