Skip to main content

scud/commands/spawn/headless/
store.rs

1//! In-memory storage for streaming agent output
2//!
3//! Provides thread-safe storage for multiple headless agent sessions,
4//! with methods for event storage, output rendering, and session management.
5
6use anyhow::Result;
7use std::collections::HashMap;
8use std::path::Path;
9use std::sync::{Arc, RwLock};
10use std::time::Instant;
11
12use super::events::{StreamEvent, StreamEventKind};
13
14/// Maximum number of output lines to retain per session (memory limit)
15const MAX_OUTPUT_LINES: usize = 10_000;
16
17/// Maximum number of events to retain per session (memory limit)
18const MAX_EVENTS: usize = 50_000;
19
20/// Status of a headless session
21#[derive(Debug, Clone, PartialEq, Eq)]
22pub enum SessionStatus {
23    Starting,
24    Running,
25    Completed,
26    Failed,
27}
28
29/// Stream data for a single agent session
30#[derive(Debug)]
31pub struct SessionStream {
32    /// Unique session ID (from harness)
33    pub session_id: String,
34    /// Associated task ID
35    pub task_id: String,
36    /// Tag/phase
37    pub tag: String,
38    /// All events received (bounded by MAX_EVENTS)
39    pub events: Vec<StreamEvent>,
40    /// Rendered output lines for display (bounded by MAX_OUTPUT_LINES)
41    pub output_lines: Vec<String>,
42    /// Current status
43    pub status: SessionStatus,
44    /// When the session started
45    pub started_at: Instant,
46    /// Process ID (for interruption)
47    pub pid: Option<u32>,
48    /// Partial line buffer for incomplete text deltas
49    partial_line: String,
50}
51
52impl SessionStream {
53    pub fn new(task_id: &str, tag: &str) -> Self {
54        Self {
55            session_id: String::new(),
56            task_id: task_id.to_string(),
57            tag: tag.to_string(),
58            events: Vec::new(),
59            output_lines: Vec::new(),
60            status: SessionStatus::Starting,
61            started_at: Instant::now(),
62            pid: None,
63            partial_line: String::new(),
64        }
65    }
66
67    /// Add an event and update output lines
68    pub fn push_event(&mut self, mut event: StreamEvent) {
69        event.timestamp_ms = self.started_at.elapsed().as_millis() as u64;
70
71        // Update output lines based on event
72        match &event.kind {
73            StreamEventKind::TextDelta { text } => {
74                self.append_text(text);
75            }
76            StreamEventKind::ToolStart {
77                tool_name,
78                input_summary,
79                ..
80            } => {
81                // Flush any partial line first
82                self.flush_partial_line();
83                self.push_line(format!(">> {} {}", tool_name, input_summary));
84            }
85            StreamEventKind::ToolResult {
86                tool_name, success, ..
87            } => {
88                self.flush_partial_line();
89                let status = if *success { "ok" } else { "failed" };
90                self.push_line(format!("<< {} {}", tool_name, status));
91            }
92            StreamEventKind::Complete { success } => {
93                self.flush_partial_line();
94                self.status = if *success {
95                    SessionStatus::Completed
96                } else {
97                    SessionStatus::Failed
98                };
99            }
100            StreamEventKind::Error { message } => {
101                self.flush_partial_line();
102                self.push_line(format!("ERROR: {}", message));
103                self.status = SessionStatus::Failed;
104            }
105            StreamEventKind::SessionAssigned { session_id } => {
106                self.session_id = session_id.clone();
107                self.status = SessionStatus::Running;
108            }
109        }
110
111        // Transition from Starting to Running on first meaningful event
112        if matches!(self.status, SessionStatus::Starting)
113            && matches!(
114                event.kind,
115                StreamEventKind::TextDelta { .. }
116                    | StreamEventKind::ToolStart { .. }
117                    | StreamEventKind::ToolResult { .. }
118            )
119        {
120            self.status = SessionStatus::Running;
121        }
122
123        // Store event with memory limit
124        if self.events.len() >= MAX_EVENTS {
125            // Remove oldest 10% when limit reached
126            let drain_count = MAX_EVENTS / 10;
127            self.events.drain(0..drain_count);
128        }
129        self.events.push(event);
130    }
131
132    /// Append text, handling newlines properly
133    fn append_text(&mut self, text: &str) {
134        for ch in text.chars() {
135            if ch == '\n' {
136                // Complete the current line and start a new one
137                let line = std::mem::take(&mut self.partial_line);
138                self.push_line(line);
139            } else {
140                self.partial_line.push(ch);
141            }
142        }
143    }
144
145    /// Flush any remaining partial line as a complete line
146    fn flush_partial_line(&mut self) {
147        if !self.partial_line.is_empty() {
148            let line = std::mem::take(&mut self.partial_line);
149            self.push_line(line);
150        }
151    }
152
153    /// Push a line to output with memory limit
154    fn push_line(&mut self, line: String) {
155        if self.output_lines.len() >= MAX_OUTPUT_LINES {
156            // Remove oldest 10% when limit reached
157            let drain_count = MAX_OUTPUT_LINES / 10;
158            self.output_lines.drain(0..drain_count);
159        }
160        self.output_lines.push(line);
161    }
162
163    /// Get the last N output lines
164    pub fn tail(&self, n: usize) -> &[String] {
165        let start = self.output_lines.len().saturating_sub(n);
166        &self.output_lines[start..]
167    }
168
169    /// Get all output lines including any partial line in progress
170    pub fn get_all_output(&self) -> Vec<String> {
171        let mut lines = self.output_lines.clone();
172        if !self.partial_line.is_empty() {
173            lines.push(self.partial_line.clone());
174        }
175        lines
176    }
177
178    /// Check if session is still active
179    pub fn is_active(&self) -> bool {
180        matches!(
181            self.status,
182            SessionStatus::Starting | SessionStatus::Running
183        )
184    }
185
186    /// Get the event count
187    pub fn event_count(&self) -> usize {
188        self.events.len()
189    }
190
191    /// Get the output line count
192    pub fn line_count(&self) -> usize {
193        self.output_lines.len()
194    }
195}
196
197/// Thread-safe store for multiple agent sessions
198#[derive(Debug, Clone, Default)]
199pub struct StreamStore {
200    sessions: Arc<RwLock<HashMap<String, SessionStream>>>,
201}
202
203impl StreamStore {
204    pub fn new() -> Self {
205        Self::default()
206    }
207
208    /// Create a new session for a task
209    pub fn create_session(&self, task_id: &str, tag: &str) -> String {
210        let mut sessions = self.sessions.write().unwrap();
211        let stream = SessionStream::new(task_id, tag);
212        let key = task_id.to_string();
213        sessions.insert(key.clone(), stream);
214        key
215    }
216
217    /// Push an event to a session
218    pub fn push_event(&self, task_id: &str, event: StreamEvent) {
219        let mut sessions = self.sessions.write().unwrap();
220        if let Some(stream) = sessions.get_mut(task_id) {
221            stream.push_event(event);
222        }
223    }
224
225    /// Set the harness session ID for a task
226    pub fn set_session_id(&self, task_id: &str, session_id: &str) {
227        let mut sessions = self.sessions.write().unwrap();
228        if let Some(stream) = sessions.get_mut(task_id) {
229            stream.session_id = session_id.to_string();
230            stream.status = SessionStatus::Running;
231        }
232    }
233
234    /// Set the process ID for a task
235    pub fn set_pid(&self, task_id: &str, pid: u32) {
236        let mut sessions = self.sessions.write().unwrap();
237        if let Some(stream) = sessions.get_mut(task_id) {
238            stream.pid = Some(pid);
239        }
240    }
241
242    /// Get the process ID for a task
243    pub fn get_pid(&self, task_id: &str) -> Option<u32> {
244        let sessions = self.sessions.read().unwrap();
245        sessions.get(task_id).and_then(|s| s.pid)
246    }
247
248    /// Get output lines for a task
249    pub fn get_output(&self, task_id: &str, limit: usize) -> Vec<String> {
250        let sessions = self.sessions.read().unwrap();
251        sessions
252            .get(task_id)
253            .map(|s| s.tail(limit).to_vec())
254            .unwrap_or_default()
255    }
256
257    /// Get all output lines for a task, including any partial line
258    pub fn get_all_output(&self, task_id: &str) -> Vec<String> {
259        let sessions = self.sessions.read().unwrap();
260        sessions
261            .get(task_id)
262            .map(|s| s.get_all_output())
263            .unwrap_or_default()
264    }
265
266    /// Get session status
267    pub fn get_status(&self, task_id: &str) -> Option<SessionStatus> {
268        let sessions = self.sessions.read().unwrap();
269        sessions.get(task_id).map(|s| s.status.clone())
270    }
271
272    /// Get harness session ID for continuation
273    pub fn get_session_id(&self, task_id: &str) -> Option<String> {
274        let sessions = self.sessions.read().unwrap();
275        sessions
276            .get(task_id)
277            .filter(|s| !s.session_id.is_empty())
278            .map(|s| s.session_id.clone())
279    }
280
281    /// List all active task IDs
282    pub fn active_tasks(&self) -> Vec<String> {
283        let sessions = self.sessions.read().unwrap();
284        sessions
285            .iter()
286            .filter(|(_, s)| s.is_active())
287            .map(|(k, _)| k.clone())
288            .collect()
289    }
290
291    /// Get all task IDs
292    pub fn all_tasks(&self) -> Vec<String> {
293        let sessions = self.sessions.read().unwrap();
294        sessions.keys().cloned().collect()
295    }
296
297    /// Check if a session exists
298    pub fn has_session(&self, task_id: &str) -> bool {
299        let sessions = self.sessions.read().unwrap();
300        sessions.contains_key(task_id)
301    }
302
303    /// Remove a session
304    pub fn remove_session(&self, task_id: &str) -> Option<SessionStream> {
305        let mut sessions = self.sessions.write().unwrap();
306        sessions.remove(task_id)
307    }
308
309    /// Get session statistics
310    pub fn session_stats(&self, task_id: &str) -> Option<(usize, usize)> {
311        let sessions = self.sessions.read().unwrap();
312        sessions
313            .get(task_id)
314            .map(|s| (s.event_count(), s.line_count()))
315    }
316
317    /// Get elapsed seconds since session started
318    pub fn get_elapsed_secs(&self, task_id: &str) -> Option<u64> {
319        let sessions = self.sessions.read().unwrap();
320        sessions
321            .get(task_id)
322            .map(|s| s.started_at.elapsed().as_secs())
323    }
324
325    /// Get last tool activity (most recent tool_start or tool_result line)
326    pub fn get_last_tool_line(&self, task_id: &str) -> Option<String> {
327        let sessions = self.sessions.read().unwrap();
328        sessions.get(task_id).and_then(|s| {
329            s.output_lines
330                .iter()
331                .rev()
332                .find(|l| l.starts_with(">>") || l.starts_with("<<"))
333                .cloned()
334        })
335    }
336
337    /// Save session metadata for later continuation
338    ///
339    /// Persists the session ID, task ID, tag, and PID to a JSON file
340    /// in the `.scud/headless/` directory for use by the `attach` command.
341    pub fn save_session_metadata(&self, task_id: &str, project_root: &Path) -> Result<()> {
342        let sessions = self.sessions.read().unwrap();
343        let session = sessions
344            .get(task_id)
345            .ok_or_else(|| anyhow::anyhow!("Session not found: {}", task_id))?;
346
347        let metadata_dir = project_root.join(".scud").join("headless");
348        std::fs::create_dir_all(&metadata_dir)?;
349
350        let metadata = serde_json::json!({
351            "task_id": session.task_id,
352            "session_id": session.session_id,
353            "tag": session.tag,
354            "pid": session.pid,
355            "status": format!("{:?}", session.status),
356            "started_at_ms": session.started_at.elapsed().as_millis() as u64,
357        });
358
359        let metadata_file = metadata_dir.join(format!("{}.json", task_id));
360        std::fs::write(&metadata_file, serde_json::to_string_pretty(&metadata)?)?;
361
362        Ok(())
363    }
364
365    /// Load session metadata from disk
366    ///
367    /// Returns the stored session_id if available for continuation.
368    pub fn load_session_metadata(task_id: &str, project_root: &Path) -> Result<Option<String>> {
369        let metadata_file = project_root
370            .join(".scud")
371            .join("headless")
372            .join(format!("{}.json", task_id));
373
374        if !metadata_file.exists() {
375            return Ok(None);
376        }
377
378        let content = std::fs::read_to_string(&metadata_file)?;
379        let data: serde_json::Value = serde_json::from_str(&content)?;
380
381        Ok(data
382            .get("session_id")
383            .and_then(|v| v.as_str())
384            .filter(|s| !s.is_empty())
385            .map(|s| s.to_string()))
386    }
387}
388
389#[cfg(test)]
390mod tests {
391    use super::*;
392
393    #[test]
394    fn test_session_stream_new() {
395        let stream = SessionStream::new("task-1", "phase-a");
396        assert_eq!(stream.task_id, "task-1");
397        assert_eq!(stream.tag, "phase-a");
398        assert_eq!(stream.status, SessionStatus::Starting);
399        assert!(stream.session_id.is_empty());
400        assert!(stream.events.is_empty());
401        assert!(stream.output_lines.is_empty());
402    }
403
404    #[test]
405    fn test_push_text_delta_single_line() {
406        let mut stream = SessionStream::new("task-1", "test");
407        stream.push_event(StreamEvent::text_delta("Hello world"));
408
409        // Text without newline stays in partial buffer
410        assert_eq!(stream.output_lines.len(), 0);
411        assert_eq!(stream.partial_line, "Hello world");
412        assert_eq!(stream.events.len(), 1);
413    }
414
415    #[test]
416    fn test_push_text_delta_with_newline() {
417        let mut stream = SessionStream::new("task-1", "test");
418        stream.push_event(StreamEvent::text_delta("Hello\nWorld\n"));
419
420        assert_eq!(stream.output_lines.len(), 2);
421        assert_eq!(stream.output_lines[0], "Hello");
422        assert_eq!(stream.output_lines[1], "World");
423        assert!(stream.partial_line.is_empty());
424    }
425
426    #[test]
427    fn test_push_text_delta_incremental() {
428        let mut stream = SessionStream::new("task-1", "test");
429        stream.push_event(StreamEvent::text_delta("Hel"));
430        stream.push_event(StreamEvent::text_delta("lo "));
431        stream.push_event(StreamEvent::text_delta("world\n"));
432
433        assert_eq!(stream.output_lines.len(), 1);
434        assert_eq!(stream.output_lines[0], "Hello world");
435    }
436
437    #[test]
438    fn test_push_tool_start() {
439        let mut stream = SessionStream::new("task-1", "test");
440        stream.push_event(StreamEvent::text_delta("Some text"));
441        stream.push_event(StreamEvent::tool_start("Read", "tool-1", "src/main.rs"));
442
443        // Tool start should flush partial line
444        assert_eq!(stream.output_lines.len(), 2);
445        assert_eq!(stream.output_lines[0], "Some text");
446        assert_eq!(stream.output_lines[1], ">> Read src/main.rs");
447    }
448
449    #[test]
450    fn test_push_tool_result() {
451        let mut stream = SessionStream::new("task-1", "test");
452        stream.push_event(StreamEvent::new(StreamEventKind::ToolResult {
453            tool_name: "Read".to_string(),
454            tool_id: "tool-1".to_string(),
455            success: true,
456        }));
457
458        assert_eq!(stream.output_lines.len(), 1);
459        assert_eq!(stream.output_lines[0], "<< Read ok");
460    }
461
462    #[test]
463    fn test_push_tool_result_failed() {
464        let mut stream = SessionStream::new("task-1", "test");
465        stream.push_event(StreamEvent::new(StreamEventKind::ToolResult {
466            tool_name: "Bash".to_string(),
467            tool_id: "tool-2".to_string(),
468            success: false,
469        }));
470
471        assert_eq!(stream.output_lines[0], "<< Bash failed");
472    }
473
474    #[test]
475    fn test_session_assigned() {
476        let mut stream = SessionStream::new("task-1", "test");
477        assert_eq!(stream.status, SessionStatus::Starting);
478
479        stream.push_event(StreamEvent::new(StreamEventKind::SessionAssigned {
480            session_id: "sess-abc123".to_string(),
481        }));
482
483        assert_eq!(stream.session_id, "sess-abc123");
484        assert_eq!(stream.status, SessionStatus::Running);
485    }
486
487    #[test]
488    fn test_complete_success() {
489        let mut stream = SessionStream::new("task-1", "test");
490        stream.push_event(StreamEvent::complete(true));
491
492        assert_eq!(stream.status, SessionStatus::Completed);
493    }
494
495    #[test]
496    fn test_complete_failure() {
497        let mut stream = SessionStream::new("task-1", "test");
498        stream.push_event(StreamEvent::complete(false));
499
500        assert_eq!(stream.status, SessionStatus::Failed);
501    }
502
503    #[test]
504    fn test_error_event() {
505        let mut stream = SessionStream::new("task-1", "test");
506        stream.push_event(StreamEvent::error("Something went wrong"));
507
508        assert_eq!(stream.status, SessionStatus::Failed);
509        assert_eq!(stream.output_lines[0], "ERROR: Something went wrong");
510    }
511
512    #[test]
513    fn test_tail() {
514        let mut stream = SessionStream::new("task-1", "test");
515        for i in 0..10 {
516            stream.push_event(StreamEvent::text_delta(&format!("Line {}\n", i)));
517        }
518
519        let last3 = stream.tail(3);
520        assert_eq!(last3.len(), 3);
521        assert_eq!(last3[0], "Line 7");
522        assert_eq!(last3[1], "Line 8");
523        assert_eq!(last3[2], "Line 9");
524    }
525
526    #[test]
527    fn test_tail_less_than_requested() {
528        let mut stream = SessionStream::new("task-1", "test");
529        stream.push_event(StreamEvent::text_delta("Only one\n"));
530
531        let last10 = stream.tail(10);
532        assert_eq!(last10.len(), 1);
533        assert_eq!(last10[0], "Only one");
534    }
535
536    #[test]
537    fn test_get_all_output_with_partial() {
538        let mut stream = SessionStream::new("task-1", "test");
539        stream.push_event(StreamEvent::text_delta("Complete line\n"));
540        stream.push_event(StreamEvent::text_delta("Partial"));
541
542        let output = stream.get_all_output();
543        assert_eq!(output.len(), 2);
544        assert_eq!(output[0], "Complete line");
545        assert_eq!(output[1], "Partial");
546    }
547
548    #[test]
549    fn test_is_active() {
550        let mut stream = SessionStream::new("task-1", "test");
551        assert!(stream.is_active()); // Starting
552
553        stream.status = SessionStatus::Running;
554        assert!(stream.is_active());
555
556        stream.status = SessionStatus::Completed;
557        assert!(!stream.is_active());
558
559        stream.status = SessionStatus::Failed;
560        assert!(!stream.is_active());
561    }
562
563    #[test]
564    fn test_event_timestamp() {
565        let mut stream = SessionStream::new("task-1", "test");
566
567        // Small sleep to ensure non-zero timestamp
568        std::thread::sleep(std::time::Duration::from_millis(10));
569
570        stream.push_event(StreamEvent::text_delta("Hello"));
571        assert!(stream.events[0].timestamp_ms > 0);
572    }
573
574    // StreamStore tests
575
576    #[test]
577    fn test_store_create_session() {
578        let store = StreamStore::new();
579        let key = store.create_session("task-1", "phase-a");
580
581        assert_eq!(key, "task-1");
582        assert!(store.has_session("task-1"));
583    }
584
585    #[test]
586    fn test_store_push_event() {
587        let store = StreamStore::new();
588        store.create_session("task-1", "phase-a");
589        store.push_event("task-1", StreamEvent::text_delta("Hello\n"));
590
591        let output = store.get_output("task-1", 100);
592        assert_eq!(output.len(), 1);
593        assert_eq!(output[0], "Hello");
594    }
595
596    #[test]
597    fn test_store_set_session_id() {
598        let store = StreamStore::new();
599        store.create_session("task-1", "phase-a");
600        store.set_session_id("task-1", "sess-xyz");
601
602        let session_id = store.get_session_id("task-1");
603        assert_eq!(session_id, Some("sess-xyz".to_string()));
604    }
605
606    #[test]
607    fn test_store_set_pid() {
608        let store = StreamStore::new();
609        store.create_session("task-1", "phase-a");
610        store.set_pid("task-1", 12345);
611
612        // Verify by checking stats or through save_session_metadata
613        assert!(store.has_session("task-1"));
614    }
615
616    #[test]
617    fn test_store_get_status() {
618        let store = StreamStore::new();
619        store.create_session("task-1", "phase-a");
620
621        assert_eq!(store.get_status("task-1"), Some(SessionStatus::Starting));
622
623        store.push_event("task-1", StreamEvent::complete(true));
624        assert_eq!(store.get_status("task-1"), Some(SessionStatus::Completed));
625    }
626
627    #[test]
628    fn test_store_active_tasks() {
629        let store = StreamStore::new();
630        store.create_session("task-1", "phase-a");
631        store.create_session("task-2", "phase-a");
632        store.push_event("task-2", StreamEvent::complete(true));
633
634        let active = store.active_tasks();
635        assert_eq!(active.len(), 1);
636        assert!(active.contains(&"task-1".to_string()));
637    }
638
639    #[test]
640    fn test_store_all_tasks() {
641        let store = StreamStore::new();
642        store.create_session("task-1", "phase-a");
643        store.create_session("task-2", "phase-b");
644
645        let all = store.all_tasks();
646        assert_eq!(all.len(), 2);
647    }
648
649    #[test]
650    fn test_store_remove_session() {
651        let store = StreamStore::new();
652        store.create_session("task-1", "phase-a");
653        assert!(store.has_session("task-1"));
654
655        let removed = store.remove_session("task-1");
656        assert!(removed.is_some());
657        assert!(!store.has_session("task-1"));
658    }
659
660    #[test]
661    fn test_store_session_stats() {
662        let store = StreamStore::new();
663        store.create_session("task-1", "phase-a");
664        store.push_event("task-1", StreamEvent::text_delta("Line 1\n"));
665        store.push_event("task-1", StreamEvent::text_delta("Line 2\n"));
666
667        let stats = store.session_stats("task-1");
668        assert!(stats.is_some());
669        let (events, lines) = stats.unwrap();
670        assert_eq!(events, 2);
671        assert_eq!(lines, 2);
672    }
673
674    #[test]
675    fn test_store_nonexistent_session() {
676        let store = StreamStore::new();
677
678        assert_eq!(store.get_output("nonexistent", 100), Vec::<String>::new());
679        assert_eq!(store.get_status("nonexistent"), None);
680        assert_eq!(store.get_session_id("nonexistent"), None);
681    }
682
683    #[test]
684    fn test_store_thread_safety() {
685        use std::sync::Arc;
686        use std::thread;
687
688        let store = Arc::new(StreamStore::new());
689        store.create_session("task-1", "phase-a");
690
691        let handles: Vec<_> = (0..10)
692            .map(|i| {
693                let store = Arc::clone(&store);
694                thread::spawn(move || {
695                    for j in 0..100 {
696                        store.push_event(
697                            "task-1",
698                            StreamEvent::text_delta(&format!("Thread {} line {}\n", i, j)),
699                        );
700                    }
701                })
702            })
703            .collect();
704
705        for handle in handles {
706            handle.join().unwrap();
707        }
708
709        let stats = store.session_stats("task-1").unwrap();
710        assert_eq!(stats.0, 1000); // 10 threads * 100 events
711        assert_eq!(stats.1, 1000); // 10 threads * 100 lines
712    }
713
714    #[test]
715    fn test_memory_limit_output_lines() {
716        let mut stream = SessionStream::new("task-1", "test");
717
718        // Push more than MAX_OUTPUT_LINES
719        for i in 0..MAX_OUTPUT_LINES + 1000 {
720            stream.push_event(StreamEvent::text_delta(&format!("Line {}\n", i)));
721        }
722
723        // Should have trimmed to within limits
724        assert!(stream.output_lines.len() <= MAX_OUTPUT_LINES);
725    }
726
727    #[test]
728    fn test_memory_limit_events() {
729        let mut stream = SessionStream::new("task-1", "test");
730
731        // Push more than MAX_EVENTS
732        for i in 0..MAX_EVENTS + 1000 {
733            stream.push_event(StreamEvent::text_delta(&format!("{}", i)));
734        }
735
736        // Should have trimmed to within limits
737        assert!(stream.events.len() <= MAX_EVENTS);
738    }
739
740    #[test]
741    fn test_save_and_load_session_metadata() {
742        let temp_dir = std::env::temp_dir().join(format!("scud_test_{}", std::process::id()));
743        std::fs::create_dir_all(&temp_dir).unwrap();
744
745        let store = StreamStore::new();
746        store.create_session("task-1", "phase-a");
747        store.set_session_id("task-1", "sess-abc123");
748        store.set_pid("task-1", 12345);
749
750        // Save metadata
751        store.save_session_metadata("task-1", &temp_dir).unwrap();
752
753        // Verify file exists
754        let metadata_file = temp_dir.join(".scud").join("headless").join("task-1.json");
755        assert!(metadata_file.exists());
756
757        // Load metadata
758        let loaded = StreamStore::load_session_metadata("task-1", &temp_dir).unwrap();
759        assert_eq!(loaded, Some("sess-abc123".to_string()));
760
761        // Cleanup
762        std::fs::remove_dir_all(&temp_dir).ok();
763    }
764
765    #[test]
766    fn test_load_nonexistent_metadata() {
767        let temp_dir = std::env::temp_dir().join(format!("scud_test_ne_{}", std::process::id()));
768        let loaded = StreamStore::load_session_metadata("nonexistent", &temp_dir).unwrap();
769        assert_eq!(loaded, None);
770    }
771
772    #[test]
773    fn test_get_session_id_empty_string() {
774        let store = StreamStore::new();
775        store.create_session("task-1", "phase-a");
776        // Session ID is empty by default
777
778        // Should return None for empty session ID
779        let session_id = store.get_session_id("task-1");
780        assert_eq!(session_id, None);
781    }
782}