// scud/commands/spawn/headless/store.rs

//! In-memory storage for streaming agent output
//!
//! Provides thread-safe storage for multiple headless agent sessions,
//! with methods for event storage, output rendering, and session management.

use anyhow::Result;
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::time::{Instant, SystemTime, UNIX_EPOCH};

use super::events::{StreamEvent, StreamEventKind};

/// Maximum number of rendered output lines retained per session (memory limit).
///
/// When a session already holds this many lines, the oldest 10% are dropped
/// before the next line is appended.
const MAX_OUTPUT_LINES: usize = 10_000;

/// Maximum number of raw stream events retained per session (memory limit).
///
/// When a session already holds this many events, the oldest 10% are dropped
/// before the next event is stored.
const MAX_EVENTS: usize = 50_000;

/// Status of a headless session.
///
/// `Starting` and `Running` are the active states; `Completed` and `Failed`
/// are reached when a completion or error event is processed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionStatus {
    /// Initial state of a freshly created session, before a session ID or any
    /// text/tool event has been received.
    Starting,
    /// A session ID has been assigned or the first text/tool event has arrived.
    Running,
    /// A completion event reported success.
    Completed,
    /// A completion event reported failure, or an error event was received.
    Failed,
}

29/// Stream data for a single agent session
30#[derive(Debug)]
31pub struct SessionStream {
32    /// Unique session ID (from harness)
33    pub session_id: String,
34    /// Associated task ID
35    pub task_id: String,
36    /// Tag/phase
37    pub tag: String,
38    /// All events received (bounded by MAX_EVENTS)
39    pub events: Vec<StreamEvent>,
40    /// Rendered output lines for display (bounded by MAX_OUTPUT_LINES)
41    pub output_lines: Vec<String>,
42    /// Current status
43    pub status: SessionStatus,
44    /// When the session started
45    pub started_at: Instant,
46    /// Process ID (for interruption)
47    pub pid: Option<u32>,
48    /// Partial line buffer for incomplete text deltas
49    partial_line: String,
50}
51
52impl SessionStream {
53    pub fn new(task_id: &str, tag: &str) -> Self {
54        Self {
55            session_id: String::new(),
56            task_id: task_id.to_string(),
57            tag: tag.to_string(),
58            events: Vec::new(),
59            output_lines: Vec::new(),
60            status: SessionStatus::Starting,
61            started_at: Instant::now(),
62            pid: None,
63            partial_line: String::new(),
64        }
65    }
66
67    /// Add an event and update output lines
68    pub fn push_event(&mut self, mut event: StreamEvent) {
69        event.timestamp_ms = self.started_at.elapsed().as_millis() as u64;
70
71        // Update output lines based on event
72        match &event.kind {
73            StreamEventKind::TextDelta { text } => {
74                self.append_text(text);
75            }
76            StreamEventKind::ToolStart {
77                tool_name,
78                input_summary,
79                ..
80            } => {
81                // Flush any partial line first
82                self.flush_partial_line();
83                self.push_line(format!(">> {} {}", tool_name, input_summary));
84            }
85            StreamEventKind::ToolResult {
86                tool_name, success, ..
87            } => {
88                self.flush_partial_line();
89                let status = if *success { "ok" } else { "failed" };
90                self.push_line(format!("<< {} {}", tool_name, status));
91            }
92            StreamEventKind::Complete { success } => {
93                self.flush_partial_line();
94                self.status = if *success {
95                    SessionStatus::Completed
96                } else {
97                    SessionStatus::Failed
98                };
99            }
100            StreamEventKind::Error { message } => {
101                self.flush_partial_line();
102                self.push_line(format!("ERROR: {}", message));
103                self.status = SessionStatus::Failed;
104            }
105            StreamEventKind::SessionAssigned { session_id } => {
106                self.session_id = session_id.clone();
107                self.status = SessionStatus::Running;
108            }
109        }
110
111        // Transition from Starting to Running on first meaningful event
112        if matches!(self.status, SessionStatus::Starting)
113            && matches!(
114                event.kind,
115                StreamEventKind::TextDelta { .. }
116                    | StreamEventKind::ToolStart { .. }
117                    | StreamEventKind::ToolResult { .. }
118            )
119        {
120            self.status = SessionStatus::Running;
121        }
122
123        // Store event with memory limit
124        if self.events.len() >= MAX_EVENTS {
125            // Remove oldest 10% when limit reached
126            let drain_count = MAX_EVENTS / 10;
127            self.events.drain(0..drain_count);
128        }
129        self.events.push(event);
130    }
131
132    /// Append text, handling newlines properly
133    fn append_text(&mut self, text: &str) {
134        for ch in text.chars() {
135            if ch == '\n' {
136                // Complete the current line and start a new one
137                let line = std::mem::take(&mut self.partial_line);
138                self.push_line(line);
139            } else {
140                self.partial_line.push(ch);
141            }
142        }
143    }
144
145    /// Flush any remaining partial line as a complete line
146    fn flush_partial_line(&mut self) {
147        if !self.partial_line.is_empty() {
148            let line = std::mem::take(&mut self.partial_line);
149            self.push_line(line);
150        }
151    }
152
153    /// Push a line to output with memory limit
154    fn push_line(&mut self, line: String) {
155        if self.output_lines.len() >= MAX_OUTPUT_LINES {
156            // Remove oldest 10% when limit reached
157            let drain_count = MAX_OUTPUT_LINES / 10;
158            self.output_lines.drain(0..drain_count);
159        }
160        self.output_lines.push(line);
161    }
162
163    /// Get the last N output lines
164    pub fn tail(&self, n: usize) -> &[String] {
165        let start = self.output_lines.len().saturating_sub(n);
166        &self.output_lines[start..]
167    }
168
169    /// Get all output lines including any partial line in progress
170    pub fn get_all_output(&self) -> Vec<String> {
171        let mut lines = self.output_lines.clone();
172        if !self.partial_line.is_empty() {
173            lines.push(self.partial_line.clone());
174        }
175        lines
176    }
177
178    /// Check if session is still active
179    pub fn is_active(&self) -> bool {
180        matches!(self.status, SessionStatus::Starting | SessionStatus::Running)
181    }
182
183    /// Get the event count
184    pub fn event_count(&self) -> usize {
185        self.events.len()
186    }
187
188    /// Get the output line count
189    pub fn line_count(&self) -> usize {
190        self.output_lines.len()
191    }
192}
193
194/// Thread-safe store for multiple agent sessions
195#[derive(Debug, Clone, Default)]
196pub struct StreamStore {
197    sessions: Arc<RwLock<HashMap<String, SessionStream>>>,
198}
199
200impl StreamStore {
201    pub fn new() -> Self {
202        Self::default()
203    }
204
205    /// Create a new session for a task
206    pub fn create_session(&self, task_id: &str, tag: &str) -> String {
207        let mut sessions = self.sessions.write().unwrap();
208        let stream = SessionStream::new(task_id, tag);
209        let key = task_id.to_string();
210        sessions.insert(key.clone(), stream);
211        key
212    }
213
214    /// Push an event to a session
215    pub fn push_event(&self, task_id: &str, event: StreamEvent) {
216        let mut sessions = self.sessions.write().unwrap();
217        if let Some(stream) = sessions.get_mut(task_id) {
218            stream.push_event(event);
219        }
220    }
221
222    /// Set the harness session ID for a task
223    pub fn set_session_id(&self, task_id: &str, session_id: &str) {
224        let mut sessions = self.sessions.write().unwrap();
225        if let Some(stream) = sessions.get_mut(task_id) {
226            stream.session_id = session_id.to_string();
227            stream.status = SessionStatus::Running;
228        }
229    }
230
231    /// Set the process ID for a task
232    pub fn set_pid(&self, task_id: &str, pid: u32) {
233        let mut sessions = self.sessions.write().unwrap();
234        if let Some(stream) = sessions.get_mut(task_id) {
235            stream.pid = Some(pid);
236        }
237    }
238
239    /// Get the process ID for a task
240    pub fn get_pid(&self, task_id: &str) -> Option<u32> {
241        let sessions = self.sessions.read().unwrap();
242        sessions.get(task_id).and_then(|s| s.pid)
243    }
244
245    /// Get output lines for a task
246    pub fn get_output(&self, task_id: &str, limit: usize) -> Vec<String> {
247        let sessions = self.sessions.read().unwrap();
248        sessions
249            .get(task_id)
250            .map(|s| s.tail(limit).to_vec())
251            .unwrap_or_default()
252    }
253
254    /// Get all output lines for a task, including any partial line
255    pub fn get_all_output(&self, task_id: &str) -> Vec<String> {
256        let sessions = self.sessions.read().unwrap();
257        sessions
258            .get(task_id)
259            .map(|s| s.get_all_output())
260            .unwrap_or_default()
261    }
262
263    /// Get session status
264    pub fn get_status(&self, task_id: &str) -> Option<SessionStatus> {
265        let sessions = self.sessions.read().unwrap();
266        sessions.get(task_id).map(|s| s.status.clone())
267    }
268
269    /// Get harness session ID for continuation
270    pub fn get_session_id(&self, task_id: &str) -> Option<String> {
271        let sessions = self.sessions.read().unwrap();
272        sessions
273            .get(task_id)
274            .filter(|s| !s.session_id.is_empty())
275            .map(|s| s.session_id.clone())
276    }
277
278    /// List all active task IDs
279    pub fn active_tasks(&self) -> Vec<String> {
280        let sessions = self.sessions.read().unwrap();
281        sessions
282            .iter()
283            .filter(|(_, s)| s.is_active())
284            .map(|(k, _)| k.clone())
285            .collect()
286    }
287
288    /// Get all task IDs
289    pub fn all_tasks(&self) -> Vec<String> {
290        let sessions = self.sessions.read().unwrap();
291        sessions.keys().cloned().collect()
292    }
293
294    /// Check if a session exists
295    pub fn has_session(&self, task_id: &str) -> bool {
296        let sessions = self.sessions.read().unwrap();
297        sessions.contains_key(task_id)
298    }
299
300    /// Remove a session
301    pub fn remove_session(&self, task_id: &str) -> Option<SessionStream> {
302        let mut sessions = self.sessions.write().unwrap();
303        sessions.remove(task_id)
304    }
305
306    /// Get session statistics
307    pub fn session_stats(&self, task_id: &str) -> Option<(usize, usize)> {
308        let sessions = self.sessions.read().unwrap();
309        sessions
310            .get(task_id)
311            .map(|s| (s.event_count(), s.line_count()))
312    }
313
314    /// Get elapsed seconds since session started
315    pub fn get_elapsed_secs(&self, task_id: &str) -> Option<u64> {
316        let sessions = self.sessions.read().unwrap();
317        sessions
318            .get(task_id)
319            .map(|s| s.started_at.elapsed().as_secs())
320    }
321
322    /// Get last tool activity (most recent tool_start or tool_result line)
323    pub fn get_last_tool_line(&self, task_id: &str) -> Option<String> {
324        let sessions = self.sessions.read().unwrap();
325        sessions.get(task_id).and_then(|s| {
326            s.output_lines
327                .iter()
328                .rev()
329                .find(|l| l.starts_with(">>") || l.starts_with("<<"))
330                .cloned()
331        })
332    }
333
334    /// Save session metadata for later continuation
335    ///
336    /// Persists the session ID, task ID, tag, and PID to a JSON file
337    /// in the `.scud/headless/` directory for use by the `attach` command.
338    pub fn save_session_metadata(&self, task_id: &str, project_root: &Path) -> Result<()> {
339        let sessions = self.sessions.read().unwrap();
340        let session = sessions
341            .get(task_id)
342            .ok_or_else(|| anyhow::anyhow!("Session not found: {}", task_id))?;
343
344        let metadata_dir = project_root.join(".scud").join("headless");
345        std::fs::create_dir_all(&metadata_dir)?;
346
347        let metadata = serde_json::json!({
348            "task_id": session.task_id,
349            "session_id": session.session_id,
350            "tag": session.tag,
351            "pid": session.pid,
352            "status": format!("{:?}", session.status),
353            "started_at_ms": session.started_at.elapsed().as_millis() as u64,
354        });
355
356        let metadata_file = metadata_dir.join(format!("{}.json", task_id));
357        std::fs::write(&metadata_file, serde_json::to_string_pretty(&metadata)?)?;
358
359        Ok(())
360    }
361
362    /// Load session metadata from disk
363    ///
364    /// Returns the stored session_id if available for continuation.
365    pub fn load_session_metadata(task_id: &str, project_root: &Path) -> Result<Option<String>> {
366        let metadata_file = project_root
367            .join(".scud")
368            .join("headless")
369            .join(format!("{}.json", task_id));
370
371        if !metadata_file.exists() {
372            return Ok(None);
373        }
374
375        let content = std::fs::read_to_string(&metadata_file)?;
376        let data: serde_json::Value = serde_json::from_str(&content)?;
377
378        Ok(data
379            .get("session_id")
380            .and_then(|v| v.as_str())
381            .filter(|s| !s.is_empty())
382            .map(|s| s.to_string()))
383    }
384}
385
#[cfg(test)]
mod tests {
    use super::*;

    // SessionStream tests

    #[test]
    fn test_session_stream_new() {
        let stream = SessionStream::new("task-1", "phase-a");
        assert_eq!(stream.task_id, "task-1");
        assert_eq!(stream.tag, "phase-a");
        assert_eq!(stream.status, SessionStatus::Starting);
        assert!(stream.session_id.is_empty());
        assert!(stream.events.is_empty());
        assert!(stream.output_lines.is_empty());
    }

    #[test]
    fn test_push_text_delta_single_line() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::text_delta("Hello world"));

        // Text without newline stays in partial buffer
        assert_eq!(stream.output_lines.len(), 0);
        assert_eq!(stream.partial_line, "Hello world");
        assert_eq!(stream.events.len(), 1);
    }

    #[test]
    fn test_push_text_delta_with_newline() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::text_delta("Hello\nWorld\n"));

        assert_eq!(stream.output_lines.len(), 2);
        assert_eq!(stream.output_lines[0], "Hello");
        assert_eq!(stream.output_lines[1], "World");
        assert!(stream.partial_line.is_empty());
    }

    #[test]
    fn test_push_text_delta_incremental() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::text_delta("Hel"));
        stream.push_event(StreamEvent::text_delta("lo "));
        stream.push_event(StreamEvent::text_delta("world\n"));

        assert_eq!(stream.output_lines.len(), 1);
        assert_eq!(stream.output_lines[0], "Hello world");
    }

    #[test]
    fn test_push_tool_start() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::text_delta("Some text"));
        stream.push_event(StreamEvent::tool_start("Read", "tool-1", "src/main.rs"));

        // Tool start should flush partial line
        assert_eq!(stream.output_lines.len(), 2);
        assert_eq!(stream.output_lines[0], "Some text");
        assert_eq!(stream.output_lines[1], ">> Read src/main.rs");
    }

    #[test]
    fn test_push_tool_result() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::new(StreamEventKind::ToolResult {
            tool_name: "Read".to_string(),
            tool_id: "tool-1".to_string(),
            success: true,
        }));

        assert_eq!(stream.output_lines.len(), 1);
        assert_eq!(stream.output_lines[0], "<< Read ok");
    }

    #[test]
    fn test_push_tool_result_failed() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::new(StreamEventKind::ToolResult {
            tool_name: "Bash".to_string(),
            tool_id: "tool-2".to_string(),
            success: false,
        }));

        assert_eq!(stream.output_lines[0], "<< Bash failed");
    }

    #[test]
    fn test_session_assigned() {
        let mut stream = SessionStream::new("task-1", "test");
        assert_eq!(stream.status, SessionStatus::Starting);

        stream.push_event(StreamEvent::new(StreamEventKind::SessionAssigned {
            session_id: "sess-abc123".to_string(),
        }));

        assert_eq!(stream.session_id, "sess-abc123");
        assert_eq!(stream.status, SessionStatus::Running);
    }

    #[test]
    fn test_complete_success() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::complete(true));

        assert_eq!(stream.status, SessionStatus::Completed);
    }

    #[test]
    fn test_complete_failure() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::complete(false));

        assert_eq!(stream.status, SessionStatus::Failed);
    }

    #[test]
    fn test_error_event() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::error("Something went wrong"));

        assert_eq!(stream.status, SessionStatus::Failed);
        assert_eq!(stream.output_lines[0], "ERROR: Something went wrong");
    }

    #[test]
    fn test_tail() {
        let mut stream = SessionStream::new("task-1", "test");
        for i in 0..10 {
            stream.push_event(StreamEvent::text_delta(&format!("Line {}\n", i)));
        }

        let last3 = stream.tail(3);
        assert_eq!(last3.len(), 3);
        assert_eq!(last3[0], "Line 7");
        assert_eq!(last3[1], "Line 8");
        assert_eq!(last3[2], "Line 9");
    }

    #[test]
    fn test_tail_less_than_requested() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::text_delta("Only one\n"));

        let last10 = stream.tail(10);
        assert_eq!(last10.len(), 1);
        assert_eq!(last10[0], "Only one");
    }

    #[test]
    fn test_get_all_output_with_partial() {
        let mut stream = SessionStream::new("task-1", "test");
        stream.push_event(StreamEvent::text_delta("Complete line\n"));
        stream.push_event(StreamEvent::text_delta("Partial"));

        let output = stream.get_all_output();
        assert_eq!(output.len(), 2);
        assert_eq!(output[0], "Complete line");
        assert_eq!(output[1], "Partial");
    }

    #[test]
    fn test_is_active() {
        let mut stream = SessionStream::new("task-1", "test");
        assert!(stream.is_active()); // Starting

        stream.status = SessionStatus::Running;
        assert!(stream.is_active());

        stream.status = SessionStatus::Completed;
        assert!(!stream.is_active());

        stream.status = SessionStatus::Failed;
        assert!(!stream.is_active());
    }

    #[test]
    fn test_event_timestamp() {
        let mut stream = SessionStream::new("task-1", "test");

        // Small sleep to ensure non-zero timestamp
        std::thread::sleep(std::time::Duration::from_millis(10));

        stream.push_event(StreamEvent::text_delta("Hello"));
        assert!(stream.events[0].timestamp_ms > 0);
    }

    // StreamStore tests

    #[test]
    fn test_store_create_session() {
        let store = StreamStore::new();
        let key = store.create_session("task-1", "phase-a");

        assert_eq!(key, "task-1");
        assert!(store.has_session("task-1"));
    }

    #[test]
    fn test_store_push_event() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store.push_event("task-1", StreamEvent::text_delta("Hello\n"));

        let output = store.get_output("task-1", 100);
        assert_eq!(output.len(), 1);
        assert_eq!(output[0], "Hello");
    }

    #[test]
    fn test_store_set_session_id() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store.set_session_id("task-1", "sess-xyz");

        let session_id = store.get_session_id("task-1");
        assert_eq!(session_id, Some("sess-xyz".to_string()));
    }

    #[test]
    fn test_store_set_pid() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");

        // No PID until one is set
        assert_eq!(store.get_pid("task-1"), None);

        store.set_pid("task-1", 12345);
        assert_eq!(store.get_pid("task-1"), Some(12345));

        // Unknown sessions never report a PID
        assert_eq!(store.get_pid("nonexistent"), None);
    }

    #[test]
    fn test_store_get_status() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");

        assert_eq!(store.get_status("task-1"), Some(SessionStatus::Starting));

        store.push_event("task-1", StreamEvent::complete(true));
        assert_eq!(store.get_status("task-1"), Some(SessionStatus::Completed));
    }

    #[test]
    fn test_store_active_tasks() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store.create_session("task-2", "phase-a");
        store.push_event("task-2", StreamEvent::complete(true));

        let active = store.active_tasks();
        assert_eq!(active.len(), 1);
        assert!(active.contains(&"task-1".to_string()));
    }

    #[test]
    fn test_store_all_tasks() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store.create_session("task-2", "phase-b");

        let all = store.all_tasks();
        assert_eq!(all.len(), 2);
    }

    #[test]
    fn test_store_remove_session() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        assert!(store.has_session("task-1"));

        let removed = store.remove_session("task-1");
        assert!(removed.is_some());
        assert!(!store.has_session("task-1"));
    }

    #[test]
    fn test_store_session_stats() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store.push_event("task-1", StreamEvent::text_delta("Line 1\n"));
        store.push_event("task-1", StreamEvent::text_delta("Line 2\n"));

        let stats = store.session_stats("task-1");
        assert!(stats.is_some());
        let (events, lines) = stats.unwrap();
        assert_eq!(events, 2);
        assert_eq!(lines, 2);
    }

    #[test]
    fn test_store_get_last_tool_line() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        assert_eq!(store.get_last_tool_line("task-1"), None);

        store.push_event("task-1", StreamEvent::tool_start("Read", "tool-1", "src/main.rs"));
        store.push_event("task-1", StreamEvent::text_delta("plain text\n"));

        // The most recent tool line wins even when text lines follow it
        assert_eq!(
            store.get_last_tool_line("task-1"),
            Some(">> Read src/main.rs".to_string())
        );
    }

    #[test]
    fn test_store_nonexistent_session() {
        let store = StreamStore::new();

        assert_eq!(store.get_output("nonexistent", 100), Vec::<String>::new());
        assert_eq!(store.get_status("nonexistent"), None);
        assert_eq!(store.get_session_id("nonexistent"), None);
    }

    #[test]
    fn test_store_thread_safety() {
        use std::sync::Arc;
        use std::thread;

        let store = Arc::new(StreamStore::new());
        store.create_session("task-1", "phase-a");

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let store = Arc::clone(&store);
                thread::spawn(move || {
                    for j in 0..100 {
                        store.push_event(
                            "task-1",
                            StreamEvent::text_delta(&format!("Thread {} line {}\n", i, j)),
                        );
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        let stats = store.session_stats("task-1").unwrap();
        assert_eq!(stats.0, 1000); // 10 threads * 100 events
        assert_eq!(stats.1, 1000); // 10 threads * 100 lines
    }

    #[test]
    fn test_memory_limit_output_lines() {
        let mut stream = SessionStream::new("task-1", "test");

        // Push more than MAX_OUTPUT_LINES
        for i in 0..MAX_OUTPUT_LINES + 1000 {
            stream.push_event(StreamEvent::text_delta(&format!("Line {}\n", i)));
        }

        // Should have trimmed to within limits
        assert!(stream.output_lines.len() <= MAX_OUTPUT_LINES);
    }

    #[test]
    fn test_memory_limit_events() {
        let mut stream = SessionStream::new("task-1", "test");

        // Push more than MAX_EVENTS
        for i in 0..MAX_EVENTS + 1000 {
            stream.push_event(StreamEvent::text_delta(&format!("{}", i)));
        }

        // Should have trimmed to within limits
        assert!(stream.events.len() <= MAX_EVENTS);
    }

    #[test]
    fn test_save_and_load_session_metadata() {
        let temp_dir = std::env::temp_dir().join(format!("scud_test_{}", std::process::id()));
        std::fs::create_dir_all(&temp_dir).unwrap();

        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        store.set_session_id("task-1", "sess-abc123");
        store.set_pid("task-1", 12345);

        // Save metadata
        store.save_session_metadata("task-1", &temp_dir).unwrap();

        // Verify file exists
        let metadata_file = temp_dir.join(".scud").join("headless").join("task-1.json");
        assert!(metadata_file.exists());

        // Verify the persisted fields, not just the file's existence
        let raw = std::fs::read_to_string(&metadata_file).unwrap();
        let json: serde_json::Value = serde_json::from_str(&raw).unwrap();
        assert_eq!(json["task_id"].as_str(), Some("task-1"));
        assert_eq!(json["tag"].as_str(), Some("phase-a"));
        assert_eq!(json["pid"].as_u64(), Some(12345));
        assert_eq!(json["status"].as_str(), Some("Running"));

        // Load metadata
        let loaded = StreamStore::load_session_metadata("task-1", &temp_dir).unwrap();
        assert_eq!(loaded, Some("sess-abc123".to_string()));

        // Cleanup
        std::fs::remove_dir_all(&temp_dir).ok();
    }

    #[test]
    fn test_save_metadata_unknown_session() {
        let temp_dir = std::env::temp_dir().join(format!("scud_test_unk_{}", std::process::id()));

        let store = StreamStore::new();
        assert!(store.save_session_metadata("missing", &temp_dir).is_err());

        // Cleanup in case anything was created
        std::fs::remove_dir_all(&temp_dir).ok();
    }

    #[test]
    fn test_load_nonexistent_metadata() {
        let temp_dir = std::env::temp_dir().join(format!("scud_test_ne_{}", std::process::id()));
        let loaded = StreamStore::load_session_metadata("nonexistent", &temp_dir).unwrap();
        assert_eq!(loaded, None);
    }

    #[test]
    fn test_get_session_id_empty_string() {
        let store = StreamStore::new();
        store.create_session("task-1", "phase-a");
        // Session ID is empty by default

        // Should return None for empty session ID
        let session_id = store.get_session_id("task-1");
        assert_eq!(session_id, None);
    }
}