Skip to main content

ai_agent/
session.rs

1// Source: /data/home/swei/claudecode/openclaudecode/src/commands/session/session.tsx
2use crate::constants::env::system;
3use crate::types::Message;
4use serde::{Deserialize, Serialize};
5use std::io::{Read, Write};
6use std::path::PathBuf;
7use tokio::fs;
8
9/// Session metadata.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct SessionMetadata {
12    pub id: String,
13    pub cwd: String,
14    pub model: String,
15    #[serde(rename = "createdAt")]
16    pub created_at: String,
17    #[serde(rename = "updatedAt")]
18    pub updated_at: String,
19    #[serde(rename = "messageCount")]
20    pub message_count: u32,
21    pub summary: Option<String>,
22    pub tag: Option<String>,
23}
24
25/// Session data on disk.
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct SessionData {
28    pub metadata: SessionMetadata,
29    pub messages: Vec<Message>,
30}
31
32/// Get the sessions directory path.
33pub fn get_sessions_dir() -> PathBuf {
34    let home = std::env::var(system::HOME)
35        .or_else(|_| std::env::var(system::USERPROFILE))
36        .unwrap_or_else(|_| "/tmp".to_string());
37    PathBuf::from(home).join(".open-agent-sdk").join("sessions")
38}
39
40/// Get the path for a specific session.
41pub fn get_session_path(session_id: &str) -> PathBuf {
42    get_sessions_dir().join(session_id)
43}
44
45/// Save session to disk.
46pub async fn save_session(
47    session_id: &str,
48    messages: Vec<Message>,
49    metadata: Option<SessionMetadata>,
50) -> Result<(), crate::error::AgentError> {
51    let dir = get_session_path(session_id);
52    fs::create_dir_all(&dir)
53        .await
54        .map_err(crate::error::AgentError::Io)?;
55
56    let cwd = metadata
57        .as_ref()
58        .and_then(|m| Some(m.cwd.clone()))
59        .unwrap_or_else(|| {
60            std::env::current_dir()
61                .unwrap_or_default()
62                .to_string_lossy()
63                .to_string()
64        });
65
66    let model = metadata
67        .as_ref()
68        .and_then(|m| Some(m.model.clone()))
69        .unwrap_or_else(|| "claude-sonnet-4-6".to_string());
70
71    let created_at = metadata
72        .as_ref()
73        .and_then(|m| Some(m.created_at.clone()))
74        .unwrap_or_else(|| chrono::Utc::now().to_rfc3339());
75
76    let summary = metadata.as_ref().and_then(|m| m.summary.clone());
77    let tag = metadata.as_ref().and_then(|m| m.tag.clone());
78
79    let data = SessionData {
80        metadata: SessionMetadata {
81            id: session_id.to_string(),
82            cwd,
83            model,
84            created_at: created_at.clone(),
85            updated_at: chrono::Utc::now().to_rfc3339(),
86            message_count: messages.len() as u32,
87            summary,
88            tag,
89        },
90        messages,
91    };
92
93    let path = dir.join("transcript.json");
94    let json = serde_json::to_string_pretty(&data).map_err(crate::error::AgentError::Json)?;
95    fs::write(&path, json)
96        .await
97        .map_err(crate::error::AgentError::Io)?;
98
99    Ok(())
100}
101
102/// Load session from disk.
103pub async fn load_session(
104    session_id: &str,
105) -> Result<Option<SessionData>, crate::error::AgentError> {
106    let path = get_session_path(session_id).join("transcript.json");
107
108    match fs::read_to_string(&path).await {
109        Ok(content) => {
110            let data: SessionData =
111                serde_json::from_str(&content).map_err(crate::error::AgentError::Json)?;
112            Ok(Some(data))
113        }
114        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
115        Err(e) => Err(crate::error::AgentError::Io(e)),
116    }
117}
118
119/// List all sessions.
120pub async fn list_sessions() -> Result<Vec<SessionMetadata>, crate::error::AgentError> {
121    let dir = get_sessions_dir();
122
123    let mut entries = match fs::read_dir(&dir).await {
124        Ok(entries) => entries,
125        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(vec![]),
126        Err(e) => return Err(crate::error::AgentError::Io(e)),
127    };
128
129    let mut sessions = Vec::new();
130
131    while let Some(entry) = entries
132        .next_entry()
133        .await
134        .map_err(crate::error::AgentError::Io)?
135    {
136        let entry_id = entry.file_name().to_string_lossy().to_string();
137        if let Ok(Some(data)) = load_session(&entry_id).await {
138            if let Some(metadata) = Some(data.metadata) {
139                sessions.push(metadata);
140            }
141        }
142    }
143
144    // Sort by updatedAt descending
145    sessions.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
146
147    Ok(sessions)
148}
149
150/// Fork a session (create a copy with a new ID).
151pub async fn fork_session(
152    source_session_id: &str,
153    new_session_id: Option<String>,
154) -> Result<Option<String>, crate::error::AgentError> {
155    let data = match load_session(source_session_id).await? {
156        Some(d) => d,
157        None => return Ok(None),
158    };
159
160    let fork_id = new_session_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
161
162    save_session(
163        &fork_id,
164        data.messages,
165        Some(SessionMetadata {
166            id: fork_id.clone(),
167            cwd: data.metadata.cwd,
168            model: data.metadata.model,
169            created_at: chrono::Utc::now().to_rfc3339(),
170            updated_at: chrono::Utc::now().to_rfc3339(),
171            message_count: data.metadata.message_count,
172            summary: Some(format!("Forked from session {}", source_session_id)),
173            tag: None,
174        }),
175    )
176    .await?;
177
178    Ok(Some(fork_id))
179}
180
181/// Get session messages.
182pub async fn get_session_messages(
183    session_id: &str,
184) -> Result<Vec<Message>, crate::error::AgentError> {
185    match load_session(session_id).await? {
186        Some(data) => Ok(data.messages),
187        None => Ok(vec![]),
188    }
189}
190
191/// Append a message to a session transcript.
192pub async fn append_to_session(
193    session_id: &str,
194    message: Message,
195) -> Result<(), crate::error::AgentError> {
196    let mut data = match load_session(session_id).await? {
197        Some(d) => d,
198        None => return Ok(()),
199    };
200
201    data.messages.push(message);
202    data.metadata.updated_at = chrono::Utc::now().to_rfc3339();
203    data.metadata.message_count = data.messages.len() as u32;
204
205    save_session(session_id, data.messages, Some(data.metadata)).await
206}
207
208/// Delete a session.
209pub async fn delete_session(session_id: &str) -> Result<bool, crate::error::AgentError> {
210    let path = get_session_path(session_id);
211
212    match fs::remove_dir_all(&path).await {
213        Ok(_) => Ok(true),
214        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
215        Err(e) => Err(crate::error::AgentError::Io(e)),
216    }
217}
218
219/// Get info about a specific session.
220pub async fn get_session_info(
221    session_id: &str,
222) -> Result<Option<SessionMetadata>, crate::error::AgentError> {
223    match load_session(session_id).await? {
224        Some(data) => Ok(Some(data.metadata)),
225        None => Ok(None),
226    }
227}
228
229/// Rename a session.
230pub async fn rename_session(session_id: &str, title: &str) -> Result<(), crate::error::AgentError> {
231    let mut data = match load_session(session_id).await? {
232        Some(d) => d,
233        None => return Ok(()),
234    };
235
236    data.metadata.summary = Some(title.to_string());
237    data.metadata.updated_at = chrono::Utc::now().to_rfc3339();
238
239    save_session(session_id, data.messages, Some(data.metadata)).await
240}
241
242/// Tag a session.
243pub async fn tag_session(
244    session_id: &str,
245    tag: Option<&str>,
246) -> Result<(), crate::error::AgentError> {
247    let mut data = match load_session(session_id).await? {
248        Some(d) => d,
249        None => return Ok(()),
250    };
251
252    data.metadata.tag = tag.map(|s| s.to_string());
253    data.metadata.updated_at = chrono::Utc::now().to_rfc3339();
254
255    save_session(session_id, data.messages, Some(data.metadata)).await
256}
257
258/// Configuration for session resume.
259#[derive(Debug, Clone, Default)]
260pub struct ResumeConfig {
261    /// Maximum number of tail messages to load (default: all messages)
262    pub max_tail_messages: Option<usize>,
263    /// Session ID to resume from. Messages after this point will be loaded.
264    /// When None, loads the full session.
265    pub tail_uuid: Option<String>,
266}
267
268/// Result of resuming a session.
269#[derive(Debug, Clone)]
270pub struct ResumeResult {
271    /// Messages to inject into the QueryEngine (deduplicated, tail segment)
272    pub messages: Vec<Message>,
273    /// Session metadata (model, cwd, etc.)
274    pub metadata: Option<SessionMetadata>,
275    /// Number of messages dropped (deduplicated or outside tail window)
276    pub dropped_count: usize,
277}
278
279/// Resume a session by loading its messages from disk.
280///
281/// This implements the core resume logic:
282/// 1. Load session from disk
283/// 2. Apply tail segment (load only messages after tail_uuid)
284/// 3. Deduplicate messages by UUID/content
285/// 4. Return messages ready to set on QueryEngine
286///
287/// Matches TypeScript's resume flow: load transcript → preserved segment → dedup → continue.
288pub async fn resume_session(
289    session_id: &str,
290    config: &ResumeConfig,
291) -> Result<ResumeResult, crate::error::AgentError> {
292    let data = match load_session(session_id).await? {
293        Some(d) => d,
294        None => {
295            return Ok(ResumeResult {
296                messages: vec![],
297                metadata: None,
298                dropped_count: 0,
299            })
300        }
301    };
302
303    let mut messages = data.messages;
304    let mut dropped = 0;
305
306    // Apply tail segment: skip messages before tail_uuid
307    if let Some(ref tail_uuid) = config.tail_uuid {
308        // Find the index of the message matching tail_uuid, take everything after
309        if let Some(idx) = messages.iter().position(|m| is_message_uuid(m, tail_uuid)) {
310            let after_tail = messages.drain(idx + 1..).collect::<Vec<_>>();
311            dropped += messages.len();
312            messages = after_tail;
313        }
314        // tail_uuid not found — keep all messages (fallback)
315    }
316
317    // Apply tail limit: keep only the last N messages
318    if let Some(max_tail) = config.max_tail_messages {
319        if messages.len() > max_tail {
320            let dropped_tail = messages.len() - max_tail;
321            messages.drain(..dropped_tail);
322            dropped += dropped_tail;
323        }
324    }
325
326    // Deduplicate messages by content
327    let before_dedup = messages.len();
328    messages = deduplicate_messages(messages);
329    dropped += before_dedup - messages.len();
330
331    Ok(ResumeResult {
332        messages,
333        metadata: Some(data.metadata),
334        dropped_count: dropped,
335    })
336}
337
338/// Check if a message matches a UUID (for tail segment loading).
339/// Since our simplified Message type doesn't have a UUID field,
340/// we match by tool_call_id or content hash.
341fn is_message_uuid(msg: &Message, uuid: &str) -> bool {
342    // Match by tool_call_id if present
343    if let Some(ref tool_call_id) = msg.tool_call_id {
344        if tool_call_id == uuid {
345            return true;
346        }
347    }
348    // Fallback: match by content hash (for messages without tool_call_id)
349    let content_hash = format!("{:x}", md5_hash(&msg.content));
350    content_hash == uuid
351}
352
353/// Simple hash for content matching.
354fn md5_hash(content: &str) -> u64 {
355    let mut hash: u64 = 5381;
356    for b in content.bytes() {
357        hash = hash.wrapping_mul(33).wrapping_add(b as u64);
358    }
359    hash
360}
361
362/// Deduplicate messages by content.
363/// Keeps the first occurrence of each unique message.
364fn deduplicate_messages(messages: Vec<Message>) -> Vec<Message> {
365    let mut seen = std::collections::HashSet::new();
366    let mut result = Vec::with_capacity(messages.len());
367    for msg in messages {
368        let key = (msg.role.clone(), msg.content.clone());
369        if seen.insert(key) {
370            result.push(msg);
371        }
372    }
373    result
374}
375
376/// Create a preserved segment from the last N messages.
377///
378/// Preserved segments are kept during compaction to maintain context.
379/// This mirrors the TypeScript `preservedSegment` used in `getAppStateForCompact`.
380pub fn create_preserved_segment(
381    messages: &[Message],
382    max_tokens: u32,
383    tail_count: usize,
384) -> Vec<Message> {
385    let tail = &messages[messages.len().saturating_sub(tail_count)..];
386    let mut tokens = 0;
387    let mut result = Vec::new();
388
389    for msg in tail.iter().rev() {
390        let msg_tokens = crate::compact::rough_token_count_estimation_for_content(&msg.content);
391        if tokens + msg_tokens > max_tokens as usize {
392            break;
393        }
394        tokens += msg_tokens;
395        result.push(msg.clone());
396    }
397
398    // Reverse to maintain chronological order
399    result.reverse();
400    result
401}
402
403// --------------------------------------------------------------------------
404// NDJSON Streaming Session Writes
405// Source: ~/claudecode/openclaudecode/src/utils/sessionStorage.ts
406//
407// Replaces monolithic transcript.json with incremental .jsonl writes.
408// Each session entry (message, metadata) is serialized as one NDJSON line
409// and appended to {session_id}.jsonl. A global write queue with 100ms drain
410// timer batches writes for efficiency.
411// --------------------------------------------------------------------------
412
413use crate::cli_ndjson_safe_stringify::serialize_to_ndjson;
414use std::collections::HashMap;
415use tokio::io::AsyncWriteExt;
416use std::sync::LazyLock;
417use tokio::time;
418
419/// One line in the NDJSON transcript file.
420#[derive(Debug, Clone, Serialize, Deserialize)]
421pub struct SessionEntry {
422    #[serde(skip_serializing_if = "Option::is_none")]
423    #[serde(rename = "t")]
424    pub timestamp: Option<String>,
425    #[serde(skip_serializing_if = "Option::is_none")]
426    #[serde(rename = "type")]
427    pub entry_type: Option<String>,
428    #[serde(skip_serializing_if = "Option::is_none")]
429    #[serde(rename = "d")]
430    pub data: Option<serde_json::Value>,
431}
432
433impl SessionEntry {
434    pub fn message(message: &Message) -> Self {
435        Self {
436            timestamp: Some(chrono::Utc::now().to_rfc3339()),
437            entry_type: Some("message".to_string()),
438            data: Some(serde_json::to_value(message).unwrap_or(serde_json::Value::Null)),
439        }
440    }
441
442    pub fn metadata(metadata: &SessionMetadata) -> Self {
443        Self {
444            timestamp: Some(chrono::Utc::now().to_rfc3339()),
445            entry_type: Some("metadata".to_string()),
446            data: Some(
447                serde_json::to_value(metadata).unwrap_or(serde_json::Value::Null),
448            ),
449        }
450    }
451
452    /// Create a sidechain (subagent) message entry
453    pub fn sidechain_message(message: &Message, agent_id: &str, parent_uuid: Option<&str>) -> Self {
454        let mut data_obj = serde_json::to_value(message).unwrap_or(serde_json::Value::Null);
455        if let Some(obj) = data_obj.as_object_mut() {
456            obj.insert("agentId".to_string(), serde_json::json!(agent_id));
457            obj.insert("isSidechain".to_string(), serde_json::json!(true));
458            if let Some(uuid) = parent_uuid {
459                obj.insert("parentUuid".to_string(), serde_json::json!(uuid));
460            }
461        }
462        Self {
463            timestamp: Some(chrono::Utc::now().to_rfc3339()),
464            entry_type: Some("message".to_string()),
465            data: Some(data_obj),
466        }
467    }
468}
469
470/// Get the sidechain .jsonl path for a subagent's transcript within a session.
471/// Sidechain transcripts are stored as {session_id}/sidechains/{agent_id}.jsonl
472pub fn get_sidechain_jsonl_path(session_id: &str, agent_id: &str) -> PathBuf {
473    get_session_path(session_id)
474        .join("sidechains")
475        .join(format!("{}.jsonl", agent_id))
476}
477
478/// Record a sidechain (subagent) transcript message.
479///
480/// Mirrors TS `recordSidechainTranscript(messages, agentId?, startingParentUuid?)`.
481/// Writes messages to the session's sidechain subdirectory.
482pub async fn record_sidechain_transcript(
483    session_id: &str,
484    messages: &[Message],
485    agent_id: &str,
486    starting_parent_uuid: Option<String>,
487) -> Result<(), crate::error::AgentError> {
488    let mut current_parent_uuid = starting_parent_uuid;
489
490    for message in messages {
491        let entry =
492            SessionEntry::sidechain_message(message, agent_id, current_parent_uuid.as_deref());
493
494        let path = get_sidechain_jsonl_path(session_id, agent_id);
495        let line =
496            crate::cli_ndjson_safe_stringify::serialize_to_ndjson(&entry)
497                .map_err(crate::error::AgentError::Json)?;
498
499        // Hold write lock during file I/O to prevent races with drain().
500        tokio::task::spawn_blocking(move || -> std::result::Result<(), crate::error::AgentError> {
501            std::fs::create_dir_all(path.parent().unwrap())
502                .map_err(crate::error::AgentError::Io)?;
503            let _guard = SESSION_WRITE_LOCK.lock().unwrap();
504            let mut file = std::fs::OpenOptions::new()
505                .create(true)
506                .append(true)
507                .open(&path)
508                .map_err(crate::error::AgentError::Io)?;
509            file.write_all(format!("{line}\n").as_bytes())
510                .map_err(crate::error::AgentError::Io)?;
511            Ok(())
512        })
513        .await
514        .map_err(|_| crate::error::AgentError::Io(std::io::Error::new(
515            std::io::ErrorKind::Other,
516            "task joined",
517        )))??;
518
519        // Chain parent UUID for next message
520        current_parent_uuid = Some(uuid::Uuid::new_v4().to_string());
521    }
522
523    Ok(())
524}
525
526/// Insert a chain of messages into a session transcript.
527///
528/// Mirrors TS `insertMessageChain(messages, isSidechain, agentId, startingParentUuid)`.
529/// When `is_sidechain` is true, delegates to `record_sidechain_transcript`.
530/// When false, appends to the main session transcript.
531pub async fn insert_message_chain(
532    session_id: &str,
533    messages: &[Message],
534    is_sidechain: bool,
535    agent_id: Option<String>,
536    starting_parent_uuid: Option<String>,
537) -> Result<(), crate::error::AgentError> {
538    if is_sidechain {
539        let aid = agent_id.unwrap_or_else(|| "default".to_string());
540        record_sidechain_transcript(session_id, messages, &aid, starting_parent_uuid).await
541    } else {
542        for message in messages {
543            append_session_message(session_id, message).await?;
544        }
545        Ok(())
546    }
547}
548
549/// Get the .jsonl path for a session's NDJSON transcript.
550pub fn get_jsonl_path(session_id: &str) -> PathBuf {
551    get_session_path(session_id).join(format!("{session_id}.jsonl"))
552}
553
554/// Append one NDJSON session entry to the transcript file.
555///
556/// Creates the session directory if needed, opens the file in append mode,
557/// and writes one NDJSON-safe line. This is O(1) per message.
558pub async fn append_session_entry(
559    session_id: &str,
560    entry: &SessionEntry,
561) -> Result<(), crate::error::AgentError> {
562    let path = get_jsonl_path(session_id);
563    fs::create_dir_all(path.parent().unwrap())
564        .await
565        .map_err(crate::error::AgentError::Io)?;
566
567    let line = serialize_to_ndjson(entry).map_err(crate::error::AgentError::Json)?;
568    // Hold write lock during file I/O to prevent races with drain().
569    // Use spawn_blocking so we can hold a std::sync::MutexGuard.
570    tokio::task::spawn_blocking(move || -> std::result::Result<(), crate::error::AgentError> {
571        let _guard = SESSION_WRITE_LOCK.lock().unwrap();
572        std::fs::create_dir_all(path.parent().unwrap())
573            .map_err(crate::error::AgentError::Io)?;
574        let mut file = std::fs::OpenOptions::new()
575            .create(true)
576            .append(true)
577            .open(&path)
578            .map_err(crate::error::AgentError::Io)?;
579        file.write_all(format!("{line}\n").as_bytes())
580            .map_err(crate::error::AgentError::Io)?;
581        Ok(())
582    })
583    .await
584    .map_err(|_| crate::error::AgentError::Io(std::io::Error::new(
585        std::io::ErrorKind::Other,
586        "task joined",
587    )))??;
588    Ok(())
589}
590
591/// Append a single message to the session as an NDJSON entry.
592///
593/// Convenience wrapper around `append_session_entry`.
594pub async fn append_session_message(
595    session_id: &str,
596    message: &Message,
597) -> Result<(), crate::error::AgentError> {
598    let entry = SessionEntry::message(message);
599    append_session_entry(session_id, &entry).await
600}
601
602/// Load a session from its NDJSON transcript file.
603///
604/// Reads all lines, parses each as a SessionEntry, and reconstructs
605/// the SessionData from message entries.
606pub async fn load_session_jsonl(
607    session_id: &str,
608) -> Result<Option<SessionData>, crate::error::AgentError> {
609    let path = get_jsonl_path(session_id);
610    match fs::read_to_string(&path).await {
611        Ok(content) => {
612            let mut messages = Vec::new();
613            let mut metadata: Option<SessionMetadata> = None;
614
615            for line in content.lines() {
616                let line = line.trim().to_string();
617                if line.is_empty() {
618                    continue;
619                }
620                let entry: SessionEntry =
621                    serde_json::from_str(&line).map_err(crate::error::AgentError::Json)?;
622                if entry.entry_type.as_deref() == Some("message") {
623                    if let Some(data) = &entry.data {
624                        let msg: Message =
625                            serde_json::from_value(data.clone()).map_err(crate::error::AgentError::Json)?;
626                        messages.push(msg);
627                    }
628                } else if entry.entry_type.as_deref() == Some("metadata") {
629                    if let Some(data) = &entry.data {
630                        metadata =
631                            Some(serde_json::from_value(data.clone()).map_err(crate::error::AgentError::Json)?);
632                    }
633                }
634            }
635
636            if messages.is_empty() && metadata.is_none() {
637                return Ok(None);
638            }
639
640            let final_metadata = metadata.unwrap_or_else(|| SessionMetadata {
641                id: session_id.to_string(),
642                cwd: std::env::current_dir()
643                    .unwrap_or_default()
644                    .to_string_lossy()
645                    .to_string(),
646                model: "claude-sonnet-4-6".to_string(),
647                created_at: chrono::Utc::now().to_rfc3339(),
648                updated_at: chrono::Utc::now().to_rfc3339(),
649                message_count: messages.len() as u32,
650                summary: None,
651                tag: None,
652            });
653
654            Ok(Some(SessionData {
655                metadata: final_metadata,
656                messages,
657            }))
658        }
659        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
660        Err(e) => Err(crate::error::AgentError::Io(e)),
661    }
662}
663
664/// Per-session pending NDJSON lines (global static).
665static SESSION_PENDING: LazyLock<std::sync::Mutex<HashMap<String, Vec<String>>>> =
666    LazyLock::new(|| std::sync::Mutex::new(HashMap::new()));
667
668/// Whether background drain task is running.
669static SESSION_DRAINING: LazyLock<std::sync::Mutex<bool>> =
670    LazyLock::new(|| std::sync::Mutex::new(false));
671
672/// Whether a test reset has been requested (for test isolation).
673static SESSION_RESET_REQUESTED: LazyLock<std::sync::Mutex<bool>> =
674    LazyLock::new(|| std::sync::Mutex::new(false));
675
676/// When true, the drain loop should not start. Used to prevent
677/// background disk I/O from racing with test I/O.
678static SESSION_DRAIN_PAUSED: LazyLock<std::sync::Mutex<bool>> =
679    LazyLock::new(|| std::sync::Mutex::new(false));
680
681/// Guard that, when held, prevents any drain() from doing file I/O.
682/// Tests should hold this during their file I/O section.
683/// drain() acquires this before each file write batch.
684/// This is an std::sync::Mutex (parking_lot-style) that blocks the async
685/// drain until the synchronous test code releases it.
686static SESSION_WRITE_LOCK: LazyLock<std::sync::Mutex<()>> =
687    LazyLock::new(|| std::sync::Mutex::new(()));
688
689
690/// Drain interval in milliseconds (matches TS FLUSH_INTERVAL_MS = 100).
691const SESSION_FLUSH_INTERVAL_MS: u64 = 100;
692
693pub struct SessionWriter;
694
695impl SessionWriter {
696    /// Enqueue an NDJSON line for a session. Starts a background drain
697    /// task if one isn't already running.
698    pub fn enqueue(session_id: &str, line: String) {
699        {
700            let mut pending = SESSION_PENDING.lock().unwrap();
701            pending
702                .entry(session_id.to_string())
703                .or_default()
704                .push(line);
705        }
706
707        // Start background drain if not already running and not paused
708        {
709            let paused = *SESSION_DRAIN_PAUSED.lock().unwrap();
710            if paused {
711                return;
712            }
713            let mut draining = SESSION_DRAINING.lock().unwrap();
714            if *draining {
715                return;
716            }
717            *draining = true;
718        }
719        tokio::spawn(Self::drain_loop());
720    }
721
722    /// Background drain loop: flushes all pending writes, then sleeps.
723    /// Exits when all queues are empty.
724    /// Sleeps in 10ms intervals to respond promptly to reset requests.
725    async fn drain_loop() {
726        let mut ticks = 0u32;
727        loop {
728            time::sleep(time::Duration::from_millis(10)).await;
729            ticks += 1;
730            // If paused (e.g. during testing), exit immediately without writing.
731            if *SESSION_DRAIN_PAUSED.lock().unwrap() {
732                *SESSION_DRAINING.lock().unwrap() = false;
733                return;
734            }
735            // If a test reset has been requested, flush and exit promptly.
736            if *SESSION_RESET_REQUESTED.lock().unwrap() {
737                Self::drain().await;
738                *SESSION_DRAINING.lock().unwrap() = false;
739                return;
740            }
741            // Flush every SESSION_FLUSH_INTERVAL_MS (100ms = 10 ticks of 10ms)
742            if ticks % ((SESSION_FLUSH_INTERVAL_MS / 10) as u32) == 0 {
743                if Self::drain().await {
744                    *SESSION_DRAINING.lock().unwrap() = false;
745                    break;
746                }
747            }
748        }
749    }
750
751    /// Drain all pending writes to disk. Returns true if all queues are now empty.
752    pub async fn drain() -> bool {
753        // Fast-path: if a test reset is in progress, skip to avoid
754        // racing with test file I/O. Don't clear pending — another test
755        // may be actively using it.
756        if *SESSION_RESET_REQUESTED.lock().unwrap() {
757            return false;
758        }
759
760        let to_drain = {
761            let mut pending = SESSION_PENDING.lock().unwrap();
762            let mut batch = HashMap::new();
763            for (session_id, lines) in pending.iter_mut() {
764                if !lines.is_empty() {
765                    batch.insert(session_id.clone(), lines.clone());
766                    lines.clear();
767                }
768            }
769            batch
770        };
771
772        if to_drain.is_empty() {
773            return SESSION_PENDING.lock().unwrap().is_empty();
774        }
775
776        // Re-check reset flag after acquiring data.
777        if *SESSION_RESET_REQUESTED.lock().unwrap() {
778            return false;
779        }
780
781        // Do file I/O in a blocking task while holding the write lock.
782        // This prevents races with synchronous test code that also
783        // holds SESSION_WRITE_LOCK during its file operations.
784        tokio::task::spawn_blocking(move || {
785            let _guard = SESSION_WRITE_LOCK.lock().unwrap();
786            for (session_id, lines) in to_drain {
787                let path = get_jsonl_path(&session_id);
788                let content: String = lines.join("\n");
789                let _ = std::fs::create_dir_all(path.parent().unwrap());
790                if let Ok(mut file) = std::fs::OpenOptions::new()
791                    .create(true)
792                    .append(true)
793                    .open(&path)
794                {
795                    let _ = file.write_all(format!("{content}\n").as_bytes());
796                }
797            }
798        })
799        .await
800        .ok();
801
802        SESSION_PENDING.lock().unwrap().is_empty()
803    }
804
805    /// Flush a specific session's pending writes immediately.
806    pub async fn flush(_session_id: &str) {
807        Self::drain().await;
808    }
809}
810
811/// Reset session globals for test isolation.
812pub fn reset_session_globals_for_testing() {
813    // Pause the drain loop to prevent new drain loops from starting.
814    *SESSION_DRAIN_PAUSED.lock().unwrap() = true;
815    // Signal any existing drain loop to exit.
816    *SESSION_RESET_REQUESTED.lock().unwrap() = true;
817    // Wait for drain loop to notice the flag and exit.
818    // The drain loop sleeps 10ms per iteration, so we poll every 20ms.
819    let start = std::time::Instant::now();
820    while start.elapsed() < std::time::Duration::from_millis(500) {
821        if !*SESSION_DRAINING.lock().unwrap() {
822            break;
823        }
824        std::thread::sleep(std::time::Duration::from_millis(20));
825    }
826    // Don't clear SESSION_PENDING — other parallel tests may be using it.
827    // Just stop the drain loop and keep it paused.
828    *SESSION_DRAINING.lock().unwrap() = false;
829    *SESSION_RESET_REQUESTED.lock().unwrap() = false;
830}
831
832/// Enqueue a message for NDJSON streaming write.
833///
834/// This is the primary way to persist session messages incrementally.
835/// The global write queue will drain to disk every 100ms.
836pub fn enqueue_session_message(session_id: &str, message: &Message) {
837    let line = serialize_to_ndjson(&SessionEntry::message(message))
838        .unwrap_or_default();
839    SessionWriter::enqueue(session_id, line);
840}
841
842/// Enqueue metadata for NDJSON streaming write.
843pub fn enqueue_session_metadata(session_id: &str, metadata: &SessionMetadata) {
844    let line = serialize_to_ndjson(&SessionEntry::metadata(metadata))
845        .unwrap_or_default();
846    SessionWriter::enqueue(session_id, line);
847}
848
849/// Drain all pending session writes. Call on shutdown.
850pub async fn drain_all_sessions() {
851    loop {
852        if SessionWriter::drain().await {
853            break;
854        }
855    }
856}
857
858#[cfg(test)]
859mod resume_tests {
860    use super::*;
861
862    #[test]
863    fn test_deduplicate_messages() {
864        let messages = vec![
865            Message {
866                role: crate::types::MessageRole::User,
867                content: "hello".to_string(),
868                ..Default::default()
869            },
870            Message {
871                role: crate::types::MessageRole::User,
872                content: "hello".to_string(),
873                ..Default::default()
874            },
875            Message {
876                role: crate::types::MessageRole::Assistant,
877                content: "hi back".to_string(),
878                ..Default::default()
879            },
880        ];
881        let deduped = deduplicate_messages(messages);
882        assert_eq!(deduped.len(), 2);
883    }
884
885    #[test]
886    fn test_deduplicate_preserves_order() {
887        let messages = vec![
888            Message {
889                role: crate::types::MessageRole::User,
890                content: "first".to_string(),
891                ..Default::default()
892            },
893            Message {
894                role: crate::types::MessageRole::Assistant,
895                content: "second".to_string(),
896                ..Default::default()
897            },
898            Message {
899                role: crate::types::MessageRole::User,
900                content: "first".to_string(),
901                ..Default::default()
902            },
903        ];
904        let deduped = deduplicate_messages(messages);
905        assert_eq!(deduped.len(), 2);
906        assert_eq!(deduped[0].content, "first");
907        assert_eq!(deduped[1].content, "second");
908    }
909
910    #[tokio::test]
911    async fn test_resume_session_not_found() {
912        let config = ResumeConfig::default();
913        let result = resume_session("nonexistent-id", &config).await;
914        assert!(result.is_ok());
915        let r = result.unwrap();
916        assert!(r.messages.is_empty());
917        assert!(r.metadata.is_none());
918    }
919
920    #[test]
921    fn test_create_preserved_segment() {
922        let messages: Vec<Message> = (0..10)
923            .map(|i| Message {
924                role: crate::types::MessageRole::User,
925                content: format!("msg {}", i),
926                ..Default::default()
927            })
928            .collect();
929        let segment = create_preserved_segment(&messages, 100, 5);
930        assert!(!segment.is_empty());
931        assert!(segment.len() <= 5);
932        // Messages should be in chronological order
933        for i in 1..segment.len() {
934            assert!(segment[i].content > segment[i - 1].content);
935        }
936    }
937
938    #[test]
939    fn test_create_preserved_segment_respects_token_budget() {
940        let messages: Vec<Message> = (0..100)
941            .map(|i| Message {
942                role: crate::types::MessageRole::User,
943                content: "x".repeat(10_000),
944                ..Default::default()
945            })
946            .collect();
947        let segment = create_preserved_segment(&messages, 5_000, 10);
948        assert!(segment.len() <= 2);
949    }
950
951    #[test]
952    fn test_is_message_uuid_matches_tool_call_id() {
953        let msg = Message {
954            tool_call_id: Some("abc-123".to_string()),
955            ..Default::default()
956        };
957        assert!(is_message_uuid(&msg, "abc-123"));
958        assert!(!is_message_uuid(&msg, "other-id"));
959    }
960
961    #[test]
962    fn test_md5_hash_deterministic() {
963        let h1 = md5_hash("hello world");
964        let h2 = md5_hash("hello world");
965        assert_eq!(h1, h2);
966        assert_ne!(h1, md5_hash("different"));
967    }
968}
969mod tests {
970    use super::*;
971    use crate::types::MessageRole;
972
973    fn create_test_message(content: &str) -> Message {
974        Message {
975            role: MessageRole::User,
976            content: content.to_string(),
977            ..Default::default()
978        }
979    }
980
981    #[tokio::test]
982    async fn test_get_sessions_dir() {
983        let dir = get_sessions_dir();
984        assert!(dir.to_string_lossy().contains(".open-agent-sdk"));
985    }
986
987    #[tokio::test]
988    async fn test_save_and_load_session() {
989        let _session_id = format!("test-session-{}", uuid::Uuid::new_v4());
990        let session_id = _session_id.as_str();
991        let messages = vec![create_test_message("Hello")];
992
993        // Save
994        save_session(session_id, messages.clone(), None)
995            .await
996            .unwrap();
997
998        // Load
999        let loaded = load_session(session_id).await.unwrap();
1000        assert!(loaded.is_some());
1001        assert_eq!(loaded.unwrap().messages.len(), 1);
1002
1003        // Cleanup
1004        delete_session(session_id).await.unwrap();
1005    }
1006
1007    #[tokio::test]
1008    async fn test_load_nonexistent_session() {
1009        let loaded = load_session("nonexistent-session").await.unwrap();
1010        assert!(loaded.is_none());
1011    }
1012
1013    #[tokio::test]
1014    async fn test_fork_session() {
1015        let _source_id = format!("fork-source-{}", uuid::Uuid::new_v4());
1016        let source_id = _source_id.as_str();
1017        let messages = vec![
1018            create_test_message("First"),
1019            Message {
1020                role: MessageRole::Assistant,
1021                content: "Response".to_string(),
1022                ..Default::default()
1023            },
1024        ];
1025
1026        // Save original
1027        save_session(source_id, messages, None).await.unwrap();
1028
1029        // Fork
1030        let fork_id = fork_session(source_id, None).await.unwrap();
1031        assert!(fork_id.is_some());
1032
1033        // Verify fork has messages
1034        let fork_messages = get_session_messages(fork_id.as_ref().unwrap())
1035            .await
1036            .unwrap();
1037        assert_eq!(fork_messages.len(), 2);
1038
1039        // Cleanup
1040        delete_session(source_id).await.unwrap();
1041        delete_session(fork_id.as_ref().unwrap()).await.unwrap();
1042    }
1043
1044    #[tokio::test]
1045    async fn test_append_to_session() {
1046        let _session_id = format!("append-test-{}", uuid::Uuid::new_v4());
1047        let session_id = _session_id.as_str();
1048
1049        // Create with initial message
1050        save_session(session_id, vec![create_test_message("Initial")], None)
1051            .await
1052            .unwrap();
1053
1054        // Append
1055        append_to_session(
1056            session_id,
1057            Message {
1058                role: MessageRole::Assistant,
1059                content: "Response".to_string(),
1060                ..Default::default()
1061            },
1062        )
1063        .await
1064        .unwrap();
1065
1066        // Verify
1067        let loaded = load_session(session_id).await.unwrap().unwrap();
1068        assert_eq!(loaded.messages.len(), 2);
1069
1070        // Cleanup
1071        delete_session(session_id).await.unwrap();
1072    }
1073
1074    #[tokio::test]
1075    async fn test_rename_session() {
1076        let _session_id = format!("rename-test-{}", uuid::Uuid::new_v4());
1077        let session_id = _session_id.as_str();
1078        save_session(session_id, vec![create_test_message("Test")], None)
1079            .await
1080            .unwrap();
1081
1082        rename_session(session_id, "My Session").await.unwrap();
1083
1084        let info = get_session_info(session_id).await.unwrap().unwrap();
1085        assert_eq!(info.summary, Some("My Session".to_string()));
1086
1087        // Cleanup
1088        delete_session(session_id).await.unwrap();
1089    }
1090
1091    #[tokio::test]
1092    async fn test_tag_session() {
1093        let _session_id = format!("tag-test-{}", uuid::Uuid::new_v4());
1094        let session_id = _session_id.as_str();
1095        save_session(session_id, vec![create_test_message("Test")], None)
1096            .await
1097            .unwrap();
1098
1099        tag_session(session_id, Some("important")).await.unwrap();
1100
1101        let info = get_session_info(session_id).await.unwrap().unwrap();
1102        assert_eq!(info.tag, Some("important".to_string()));
1103
1104        // Cleanup
1105        delete_session(session_id).await.unwrap();
1106    }
1107
1108    #[tokio::test]
1109    async fn test_delete_session() {
1110        let _session_id = format!("delete-test-{}", uuid::Uuid::new_v4());
1111        let session_id = _session_id.as_str();
1112        save_session(session_id, vec![create_test_message("Test")], None)
1113            .await
1114            .unwrap();
1115
1116        let result = delete_session(session_id).await.unwrap();
1117        assert!(result);
1118
1119        // Should not exist now
1120        let loaded = load_session(session_id).await.unwrap();
1121        assert!(loaded.is_none());
1122    }
1123}
1124
1125#[cfg(test)]
1126mod ndjson_tests {
1127    use super::*;
1128
1129    #[test]
1130    fn test_session_entry_message() {
1131        let msg = Message {
1132            role: crate::types::MessageRole::User,
1133            content: "hello world".to_string(),
1134            ..Default::default()
1135        };
1136        let entry = SessionEntry::message(&msg);
1137        assert_eq!(entry.entry_type, Some("message".to_string()));
1138        assert!(entry.timestamp.is_some());
1139        assert!(entry.data.is_some());
1140    }
1141
1142    #[test]
1143    fn test_session_entry_metadata() {
1144        let meta = SessionMetadata {
1145            id: "test-session".to_string(),
1146            cwd: "/tmp".to_string(),
1147            model: "claude-sonnet-4-6".to_string(),
1148            created_at: chrono::Utc::now().to_rfc3339(),
1149            updated_at: chrono::Utc::now().to_rfc3339(),
1150            message_count: 5,
1151            summary: None,
1152            tag: None,
1153        };
1154        let entry = SessionEntry::metadata(&meta);
1155        assert_eq!(entry.entry_type, Some("metadata".to_string()));
1156    }
1157
1158    #[test]
1159    fn test_session_entry_serializes() {
1160        let msg = Message {
1161            role: crate::types::MessageRole::User,
1162            content: "test message".to_string(),
1163            ..Default::default()
1164        };
1165        let entry = SessionEntry::message(&msg);
1166        let json = serde_json::to_string(&entry).unwrap();
1167        assert!(json.contains("\"type\":\"message\""));
1168        assert!(json.contains("\"t\""));
1169    }
1170
1171    #[test]
1172    fn test_session_entry_serializes_with_unicode() {
1173        let msg = Message {
1174            role: crate::types::MessageRole::User,
1175            content: "test\u{2028}line\u{2029}sep".to_string(),
1176            ..Default::default()
1177        };
1178        let entry = SessionEntry::message(&msg);
1179        let json = serialize_to_ndjson(&entry).unwrap();
1180        // Should escape U+2028/U+2029
1181        assert!(json.contains("\\u2028"));
1182        assert!(json.contains("\\u2029"));
1183        // Must be valid JSON
1184        assert!(serde_json::from_str::<serde_json::Value>(&json).is_ok());
1185    }
1186
1187    #[test]
1188    fn test_get_jsonl_path() {
1189        let path = get_jsonl_path("test-session-123");
1190        assert!(path.to_string_lossy().contains("test-session-123"));
1191        assert!(path.extension().map(|e| e == "jsonl").unwrap_or(false));
1192    }
1193
1194    #[tokio::test]
1195    async fn test_append_session_entry() {
1196        crate::tests::common::clear_all_test_state();
1197        let session_id = format!("ndjson-append-test-{}", uuid::Uuid::new_v4());
1198        let msg = Message {
1199            role: crate::types::MessageRole::User,
1200            content: "first message".to_string(),
1201            ..Default::default()
1202        };
1203        let entry = SessionEntry::message(&msg);
1204
1205        append_session_entry(&session_id, &entry).await.unwrap();
1206
1207        // Verify file was created
1208        let path = get_jsonl_path(&session_id);
1209        assert!(path.exists());
1210
1211        // Verify content is valid NDJSON
1212        let content = fs::read_to_string(&path).await.unwrap();
1213        let lines: Vec<&str> = content.lines().collect();
1214        assert_eq!(lines.len(), 1);
1215        let parsed: SessionEntry = serde_json::from_str(lines[0]).unwrap();
1216        assert_eq!(parsed.entry_type, Some("message".to_string()));
1217
1218        // Append second message
1219        let msg2 = Message {
1220            role: crate::types::MessageRole::Assistant,
1221            content: "response".to_string(),
1222            ..Default::default()
1223        };
1224        let entry2 = SessionEntry::message(&msg2);
1225        append_session_entry(&session_id, &entry2).await.unwrap();
1226
1227        let content = fs::read_to_string(&path).await.unwrap();
1228        let lines: Vec<&str> = content.lines().collect();
1229        assert_eq!(lines.len(), 2);
1230        let parsed2: SessionEntry = serde_json::from_str(lines[1]).unwrap();
1231        assert_eq!(parsed2.entry_type, Some("message".to_string()));
1232
1233        // Cleanup
1234        let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1235    }
1236
1237    #[tokio::test]
1238    async fn test_load_session_jsonl() {
1239        crate::tests::common::clear_all_test_state();
1240        let session_id = format!("ndjson-load-test-{}", uuid::Uuid::new_v4());
1241
1242        // Create session dir and append entries
1243        let dir = get_session_path(&session_id);
1244        fs::create_dir_all(&dir).await.unwrap();
1245
1246        let msg1 = Message {
1247            role: crate::types::MessageRole::User,
1248            content: "hello".to_string(),
1249            ..Default::default()
1250        };
1251        let msg2 = Message {
1252            role: crate::types::MessageRole::Assistant,
1253            content: "hi there".to_string(),
1254            ..Default::default()
1255        };
1256        append_session_entry(&session_id, &SessionEntry::message(&msg1)).await.unwrap();
1257        append_session_entry(&session_id, &SessionEntry::message(&msg2)).await.unwrap();
1258
1259        // Load back
1260        let data = load_session_jsonl(&session_id).await.unwrap();
1261        assert!(data.is_some());
1262        let data = data.unwrap();
1263        assert_eq!(data.messages.len(), 2);
1264        assert_eq!(data.messages[0].content, "hello");
1265        assert_eq!(data.messages[1].content, "hi there");
1266
1267        // Cleanup
1268        let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1269    }
1270
1271    #[tokio::test]
1272    async fn test_append_session_message() {
1273        crate::tests::common::clear_all_test_state();
1274        let session_id = format!("ndjson-append-msg-{}", uuid::Uuid::new_v4());
1275
1276        let msg = Message {
1277            role: crate::types::MessageRole::User,
1278            content: "quick test".to_string(),
1279            ..Default::default()
1280        };
1281        append_session_message(&session_id, &msg).await.unwrap();
1282
1283        let path = get_jsonl_path(&session_id);
1284        assert!(path.exists());
1285
1286        // Cleanup
1287        let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1288    }
1289
1290    #[tokio::test]
1291    async fn test_load_empty_jsonl() {
1292        crate::tests::common::clear_all_test_state();
1293        let session_id = format!("ndjson-empty-test-{}", uuid::Uuid::new_v4());
1294        let result = load_session_jsonl(&session_id).await.unwrap();
1295        assert!(result.is_none());
1296    }
1297
1298    #[tokio::test]
1299    async fn test_enqueue_and_drain() {
1300        crate::tests::common::clear_all_test_state();
1301        let session_id = format!("ndjson-enqueue-test-{}", uuid::Uuid::new_v4());
1302
1303        SessionWriter::enqueue(&session_id, "{\"test\":1}".to_string());
1304        SessionWriter::enqueue(&session_id, "{\"test\":2}".to_string());
1305
1306        // Drain immediately
1307        SessionWriter::drain().await;
1308
1309        // Verify file was created
1310        let path = get_jsonl_path(&session_id);
1311        assert!(path.exists());
1312        let content = fs::read_to_string(&path).await.unwrap();
1313        assert!(content.contains("{\"test\":1}"));
1314        assert!(content.contains("{\"test\":2}"));
1315
1316        // Cleanup: remove only our session from pending + disk
1317        {
1318            let mut pending = SESSION_PENDING.lock().unwrap();
1319            pending.remove(&session_id);
1320        }
1321        let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1322    }
1323
1324    #[tokio::test]
1325    async fn test_enqueue_session_message() {
1326        crate::tests::common::clear_all_test_state();
1327        let session_id = format!("ndjson-enqueue-msg-{}", uuid::Uuid::new_v4());
1328
1329        let msg = Message {
1330            role: crate::types::MessageRole::User,
1331            content: "streaming test".to_string(),
1332            ..Default::default()
1333        };
1334        enqueue_session_message(&session_id, &msg);
1335
1336        // Force drain
1337        SessionWriter::drain().await;
1338
1339        let path = get_jsonl_path(&session_id);
1340        assert!(path.exists());
1341        let content = fs::read_to_string(&path).await.unwrap();
1342        assert!(content.contains("streaming test"));
1343
1344        // Cleanup: remove only our session from pending + disk
1345        {
1346            let mut pending = SESSION_PENDING.lock().unwrap();
1347            pending.remove(&session_id);
1348        }
1349        let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1350    }
1351
1352    #[tokio::test]
1353    async fn test_multiple_sessions_drain() {
1354        crate::tests::common::clear_all_test_state();
1355        let session_id1 = format!("ndjson-multi-1-{}", uuid::Uuid::new_v4());
1356        let session_id2 = format!("ndjson-multi-2-{}", uuid::Uuid::new_v4());
1357
1358        SessionWriter::enqueue(&session_id1, "{\"s\":1}".to_string());
1359        SessionWriter::enqueue(&session_id2, "{\"s\":2}".to_string());
1360        SessionWriter::enqueue(&session_id1, "{\"s\":3}".to_string());
1361
1362        SessionWriter::drain().await;
1363
1364        let content1 = fs::read_to_string(get_jsonl_path(&session_id1)).await.unwrap();
1365        let content2 = fs::read_to_string(get_jsonl_path(&session_id2)).await.unwrap();
1366
1367        let lines1: Vec<&str> = content1.lines().collect();
1368        let lines2: Vec<&str> = content2.lines().collect();
1369        assert_eq!(lines1.len(), 2);
1370        assert_eq!(lines2.len(), 1);
1371
1372        // Cleanup: remove only our sessions from pending + disk
1373        {
1374            let mut pending = SESSION_PENDING.lock().unwrap();
1375            pending.remove(&session_id1);
1376            pending.remove(&session_id2);
1377        }
1378        let _ = fs::remove_dir_all(get_session_path(&session_id1)).await;
1379        let _ = fs::remove_dir_all(get_session_path(&session_id2)).await;
1380    }
1381
1382    #[tokio::test]
1383    async fn test_sidechain_jsonl_path() {
1384        let path = get_sidechain_jsonl_path("test-session", "agent-123");
1385        assert!(path.to_string_lossy().contains("test-session"));
1386        assert!(path.to_string_lossy().contains("sidechains"));
1387        assert!(path.to_string_lossy().contains("agent-123.jsonl"));
1388    }
1389
1390    #[tokio::test]
1391    async fn test_record_sidechain_transcript() {
1392        crate::tests::common::clear_all_test_state();
1393        let session_id = format!("sidechain-test-{}", uuid::Uuid::new_v4());
1394        let agent_id = "test-agent-001";
1395
1396        let msgs = vec![
1397            Message {
1398                role: crate::types::MessageRole::Assistant,
1399                content: "subagent start".to_string(),
1400                ..Default::default()
1401            },
1402            Message {
1403                role: crate::types::MessageRole::User,
1404                content: "tool result".to_string(),
1405                ..Default::default()
1406            },
1407        ];
1408
1409        record_sidechain_transcript(&session_id, &msgs, agent_id, None)
1410            .await
1411            .unwrap();
1412
1413        // Verify sidechain file exists
1414        let path = get_sidechain_jsonl_path(&session_id, agent_id);
1415        assert!(path.exists());
1416
1417        // Verify content
1418        let content = fs::read_to_string(&path).await.unwrap();
1419        let lines: Vec<&str> = content.lines().collect();
1420        assert_eq!(lines.len(), 2); // 2 messages
1421
1422        for line in &lines {
1423            let entry: SessionEntry = serde_json::from_str(line).unwrap();
1424            assert_eq!(entry.entry_type.as_deref(), Some("message"));
1425            // Verify isSidechain and agentId are in the data
1426            let data = entry.data.unwrap();
1427            assert!(data.get("isSidechain").unwrap().as_bool().unwrap());
1428            assert_eq!(
1429                data.get("agentId").unwrap().as_str().unwrap(),
1430                agent_id
1431            );
1432        }
1433
1434        // Cleanup
1435        let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1436    }
1437
1438    #[tokio::test]
1439    async fn test_sidechain_parent_uuid_chaining() {
1440        crate::tests::common::clear_all_test_state();
1441        let session_id = format!("sidechain-uuid-{}", uuid::Uuid::new_v4());
1442        let agent_id = "uuid-agent";
1443
1444        let starting_uuid = "start-uuid-123".to_string();
1445        let msgs = vec![
1446            Message {
1447                role: crate::types::MessageRole::Assistant,
1448                content: "msg1".to_string(),
1449                ..Default::default()
1450            },
1451            Message {
1452                role: crate::types::MessageRole::Assistant,
1453                content: "msg2".to_string(),
1454                ..Default::default()
1455            },
1456        ];
1457
1458        record_sidechain_transcript(&session_id, &msgs, agent_id, Some(starting_uuid))
1459            .await
1460            .unwrap();
1461
1462        let content =
1463            fs::read_to_string(get_sidechain_jsonl_path(&session_id, agent_id))
1464                .await
1465                .unwrap();
1466        let lines: Vec<&str> = content.lines().collect();
1467
1468        // First message should have the starting parent UUID
1469        let first: SessionEntry = serde_json::from_str(lines[0]).unwrap();
1470        assert_eq!(
1471            first.data.unwrap().get("parentUuid").unwrap().as_str().unwrap(),
1472            "start-uuid-123"
1473        );
1474
1475        // Second message should have a different (auto-generated) parent UUID
1476        let second: SessionEntry = serde_json::from_str(lines[1]).unwrap();
1477        let second_data = second.data.unwrap();
1478        let second_parent = second_data.get("parentUuid");
1479        assert!(second_parent.is_some());
1480        // It should NOT be the starting UUID (it's chained from the first)
1481        assert_ne!(
1482            second_parent.unwrap().as_str().unwrap(),
1483            "start-uuid-123"
1484        );
1485
1486        // Cleanup
1487        let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1488    }
1489
1490    #[tokio::test]
1491    async fn test_insert_message_chain_sidechain() {
1492        crate::tests::common::clear_all_test_state();
1493        let session_id = format!("insert-chain-{}", uuid::Uuid::new_v4());
1494        let msgs = vec![Message {
1495            role: crate::types::MessageRole::Assistant,
1496            content: "chain msg".to_string(),
1497            ..Default::default()
1498        }];
1499
1500        insert_message_chain(
1501            &session_id,
1502            &msgs,
1503            true,
1504            Some("chain-agent".to_string()),
1505            None,
1506        )
1507        .await
1508        .unwrap();
1509
1510        let path = get_sidechain_jsonl_path(&session_id, "chain-agent");
1511        assert!(path.exists());
1512
1513        // Cleanup
1514        let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1515    }
1516
1517    #[tokio::test]
1518    async fn test_insert_message_chain_main() {
1519        crate::tests::common::clear_all_test_state();
1520        let session_id = format!("insert-main-{}", uuid::Uuid::new_v4());
1521        let msgs = vec![Message {
1522            role: crate::types::MessageRole::User,
1523            content: "main msg".to_string(),
1524            ..Default::default()
1525        }];
1526
1527        insert_message_chain(&session_id, &msgs, false, None, None)
1528            .await
1529            .unwrap();
1530
1531        // Should go to main session file, not sidechain
1532        let path = get_jsonl_path(&session_id);
1533        assert!(path.exists());
1534
1535        // Cleanup
1536        let _ = fs::remove_dir_all(get_session_path(&session_id)).await;
1537    }
1538
1539    #[tokio::test]
1540    async fn test_sidechain_message_entry() {
1541        let msg = Message {
1542            role: crate::types::MessageRole::Assistant,
1543            content: "test".to_string(),
1544            ..Default::default()
1545        };
1546        let entry = SessionEntry::sidechain_message(&msg, "agent-1", Some("parent-uuid"));
1547
1548        assert_eq!(entry.entry_type.as_deref(), Some("message"));
1549        let data = entry.data.unwrap();
1550        assert!(data.get("isSidechain").unwrap().is_boolean());
1551        assert_eq!(data.get("agentId").unwrap().as_str().unwrap(), "agent-1");
1552        assert_eq!(
1553            data.get("parentUuid").unwrap().as_str().unwrap(),
1554            "parent-uuid"
1555        );
1556    }
1557}