Skip to main content

claude_agent/session/
persistence_jsonl.rs

1//! JSONL-based persistence backend compatible with Claude Code CLI.
2//!
3//! This module provides file-based session persistence using the JSONL (JSON Lines) format,
4//! matching the Claude Code CLI's storage format for full interoperability.
5//!
6//! # Features
7//!
8//! - **Claude Code CLI Compatible**: Uses the same file structure and format as the official CLI
9//! - **DAG Structure**: Messages form a directed acyclic graph via parent_uuid references
10//! - **Incremental Writes**: Only new entries are appended, avoiding full file rewrites
11//! - **Project-Based Organization**: Sessions organized by encoded project paths
12//! - **Async I/O**: Non-blocking file operations via tokio
13//!
14//! # File Structure
15//!
16//! ```text
17//! ~/.claude/
18//! └── projects/
19//!     └── {encoded-project-path}/
20//!         ├── {session-uuid}.jsonl    # Conversation history
21//!         └── ...
22//! ```
23
24use std::collections::{HashMap, HashSet};
25use std::hash::{Hash, Hasher};
26use std::io::{BufRead, BufReader, Write};
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29
30use chrono::{DateTime, Utc};
31use serde::{Deserialize, Serialize};
32use tokio::sync::RwLock;
33use uuid::Uuid;
34
35use super::state::{
36    MessageId, Session, SessionConfig, SessionId, SessionMessage, SessionMode, SessionState,
37    SessionType,
38};
39use super::types::{
40    CompactRecord, CompactTrigger, EnvironmentContext, Plan, PlanStatus, QueueItem, QueueStatus,
41    SummarySnapshot, TodoItem, TodoStatus,
42};
43use super::{Persistence, SessionError, SessionResult};
44use crate::types::{ContentBlock, Role, TokenUsage};
45
46// ============================================================================
47// Configuration
48// ============================================================================
49
50/// Sync mode for file operations.
51#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
52pub enum SyncMode {
53    /// No explicit sync (OS buffering only).
54    #[default]
55    None,
56    /// Sync after every write (safest, slowest).
57    OnWrite,
58}
59
60/// Configuration for JSONL persistence.
61#[derive(Clone, Debug)]
62pub struct JsonlConfig {
63    /// Base directory for storage (default: ~/.claude).
64    pub base_dir: PathBuf,
65    /// Log retention period in days (default: 30).
66    pub retention_days: u32,
67    /// File sync mode for durability.
68    pub sync_mode: SyncMode,
69}
70
71impl Default for JsonlConfig {
72    fn default() -> Self {
73        Self {
74            base_dir: dirs::home_dir()
75                .unwrap_or_else(|| PathBuf::from("."))
76                .join(".claude"),
77            retention_days: 30,
78            sync_mode: SyncMode::default(),
79        }
80    }
81}
82
83impl JsonlConfig {
84    pub fn builder() -> JsonlConfigBuilder {
85        JsonlConfigBuilder::default()
86    }
87
88    fn projects_dir(&self) -> PathBuf {
89        self.base_dir.join("projects")
90    }
91
92    /// Encode a project path for use as a directory name.
93    /// Cross-platform: handles both Unix and Windows path separators.
94    fn encode_project_path(&self, path: &Path) -> String {
95        path.to_string_lossy()
96            .replace(['/', '\\'], "-")
97            .replace(':', "_") // Windows drive letters
98    }
99
100    fn project_dir(&self, project_path: &Path) -> PathBuf {
101        self.projects_dir()
102            .join(self.encode_project_path(project_path))
103    }
104}
105
106/// Builder for JsonlConfig.
107#[derive(Default)]
108pub struct JsonlConfigBuilder {
109    base_dir: Option<PathBuf>,
110    retention_days: Option<u32>,
111    sync_mode: Option<SyncMode>,
112}
113
114impl JsonlConfigBuilder {
115    pub fn base_dir(mut self, path: impl Into<PathBuf>) -> Self {
116        self.base_dir = Some(path.into());
117        self
118    }
119
120    pub fn retention_days(mut self, days: u32) -> Self {
121        self.retention_days = Some(days);
122        self
123    }
124
125    pub fn sync_mode(mut self, mode: SyncMode) -> Self {
126        self.sync_mode = Some(mode);
127        self
128    }
129
130    pub fn build(self) -> JsonlConfig {
131        let default = JsonlConfig::default();
132        JsonlConfig {
133            base_dir: self.base_dir.unwrap_or(default.base_dir),
134            retention_days: self.retention_days.unwrap_or(default.retention_days),
135            sync_mode: self.sync_mode.unwrap_or(default.sync_mode),
136        }
137    }
138}
139
140// ============================================================================
141// JSONL Entry Types (Claude Code CLI Compatible)
142// ============================================================================
143
144/// JSONL entry types matching Claude Code CLI format.
145#[derive(Clone, Debug, Serialize, Deserialize)]
146#[serde(tag = "type", rename_all = "snake_case")]
147pub enum JsonlEntry {
148    User(UserEntry),
149    Assistant(AssistantEntry),
150    System(SystemEntry),
151    QueueOperation(QueueOperationEntry),
152    Summary(SummaryEntry),
153    SessionMeta(SessionMetaEntry),
154    Todo(TodoEntry),
155    Plan(PlanEntry),
156    Compact(CompactEntry),
157}
158
159/// Common fields shared by message entries.
160#[derive(Clone, Debug, Serialize, Deserialize)]
161pub struct EntryCommon {
162    pub uuid: String,
163    #[serde(rename = "parentUuid", skip_serializing_if = "Option::is_none")]
164    pub parent_uuid: Option<String>,
165    #[serde(rename = "sessionId")]
166    pub session_id: String,
167    pub timestamp: DateTime<Utc>,
168    #[serde(skip_serializing_if = "Option::is_none")]
169    pub cwd: Option<PathBuf>,
170    pub version: String,
171    #[serde(rename = "gitBranch", default)]
172    pub git_branch: String,
173    #[serde(rename = "isSidechain", default)]
174    pub is_sidechain: bool,
175}
176
177impl EntryCommon {
178    fn from_message(session_id: &SessionId, msg: &SessionMessage) -> Self {
179        Self {
180            uuid: msg.id.to_string(),
181            parent_uuid: msg.parent_id.as_ref().map(|id| id.to_string()),
182            session_id: session_id.to_string(),
183            timestamp: msg.timestamp,
184            cwd: msg.environment.as_ref().and_then(|e| e.cwd.clone()),
185            version: env!("CARGO_PKG_VERSION").to_string(),
186            git_branch: msg
187                .environment
188                .as_ref()
189                .and_then(|e| e.git_branch.clone())
190                .unwrap_or_default(),
191            is_sidechain: msg.is_sidechain,
192        }
193    }
194
195    fn to_environment(&self) -> EnvironmentContext {
196        EnvironmentContext {
197            cwd: self.cwd.clone(),
198            git_branch: if self.git_branch.is_empty() {
199                None
200            } else {
201                Some(self.git_branch.clone())
202            },
203            ..Default::default()
204        }
205    }
206}
207
208#[derive(Clone, Debug, Serialize, Deserialize)]
209pub struct UserMessageContent {
210    pub role: String,
211    pub content: serde_json::Value,
212}
213
214#[derive(Clone, Debug, Serialize, Deserialize)]
215pub struct UserEntry {
216    #[serde(flatten)]
217    pub common: EntryCommon,
218    pub message: UserMessageContent,
219    #[serde(rename = "isCompactSummary", default)]
220    pub is_compact_summary: bool,
221}
222
223#[derive(Clone, Debug, Serialize, Deserialize)]
224pub struct AssistantMessageContent {
225    #[serde(skip_serializing_if = "Option::is_none")]
226    pub id: Option<String>,
227    #[serde(skip_serializing_if = "Option::is_none")]
228    pub model: Option<String>,
229    pub role: String,
230    #[serde(skip_serializing_if = "Option::is_none")]
231    pub stop_reason: Option<String>,
232    pub content: serde_json::Value,
233    #[serde(skip_serializing_if = "Option::is_none")]
234    pub usage: Option<UsageInfo>,
235}
236
237#[derive(Clone, Debug, Default, Serialize, Deserialize)]
238pub struct UsageInfo {
239    pub input_tokens: u64,
240    pub output_tokens: u64,
241    #[serde(default)]
242    pub cache_creation_input_tokens: u64,
243    #[serde(default)]
244    pub cache_read_input_tokens: u64,
245}
246
247impl From<&TokenUsage> for UsageInfo {
248    fn from(u: &TokenUsage) -> Self {
249        Self {
250            input_tokens: u.input_tokens,
251            output_tokens: u.output_tokens,
252            cache_creation_input_tokens: u.cache_creation_input_tokens,
253            cache_read_input_tokens: u.cache_read_input_tokens,
254        }
255    }
256}
257
258impl From<&UsageInfo> for TokenUsage {
259    fn from(u: &UsageInfo) -> Self {
260        Self {
261            input_tokens: u.input_tokens,
262            output_tokens: u.output_tokens,
263            cache_creation_input_tokens: u.cache_creation_input_tokens,
264            cache_read_input_tokens: u.cache_read_input_tokens,
265        }
266    }
267}
268
269#[derive(Clone, Debug, Serialize, Deserialize)]
270pub struct AssistantEntry {
271    #[serde(flatten)]
272    pub common: EntryCommon,
273    pub message: AssistantMessageContent,
274    #[serde(rename = "requestId", skip_serializing_if = "Option::is_none")]
275    pub request_id: Option<String>,
276}
277
278#[derive(Clone, Debug, Serialize, Deserialize)]
279pub struct SystemEntry {
280    #[serde(flatten)]
281    pub common: EntryCommon,
282    pub subtype: String,
283    #[serde(skip_serializing_if = "Option::is_none")]
284    pub content: Option<String>,
285}
286
287#[derive(Clone, Debug, Serialize, Deserialize)]
288pub struct QueueOperationEntry {
289    pub operation: String,
290    #[serde(rename = "sessionId")]
291    pub session_id: String,
292    pub timestamp: DateTime<Utc>,
293    pub content: String,
294    pub priority: i32,
295    #[serde(rename = "itemId")]
296    pub item_id: String,
297}
298
299#[derive(Clone, Debug, Serialize, Deserialize)]
300pub struct SummaryEntry {
301    #[serde(rename = "sessionId")]
302    pub session_id: String,
303    pub summary: String,
304    #[serde(rename = "leafUuid", skip_serializing_if = "Option::is_none")]
305    pub leaf_uuid: Option<String>,
306    pub timestamp: DateTime<Utc>,
307}
308
309#[derive(Clone, Debug, Serialize, Deserialize)]
310pub struct SessionMetaEntry {
311    #[serde(rename = "sessionId")]
312    pub session_id: String,
313    #[serde(rename = "parentSessionId", skip_serializing_if = "Option::is_none")]
314    pub parent_session_id: Option<String>,
315    #[serde(rename = "tenantId", skip_serializing_if = "Option::is_none")]
316    pub tenant_id: Option<String>,
317    #[serde(rename = "sessionType")]
318    pub session_type: serde_json::Value,
319    pub mode: String,
320    pub state: String,
321    pub config: serde_json::Value,
322    #[serde(rename = "permissionPolicy")]
323    pub permission_policy: serde_json::Value,
324    #[serde(rename = "totalUsage", default)]
325    pub total_usage: UsageInfo,
326    #[serde(rename = "totalCostUsd", default)]
327    pub total_cost_usd: f64,
328    #[serde(rename = "staticContextHash", skip_serializing_if = "Option::is_none")]
329    pub static_context_hash: Option<String>,
330    #[serde(skip_serializing_if = "Option::is_none")]
331    pub error: Option<String>,
332    #[serde(rename = "createdAt")]
333    pub created_at: DateTime<Utc>,
334    #[serde(rename = "updatedAt")]
335    pub updated_at: DateTime<Utc>,
336    #[serde(rename = "expiresAt", skip_serializing_if = "Option::is_none")]
337    pub expires_at: Option<DateTime<Utc>>,
338}
339
340#[derive(Clone, Debug, Serialize, Deserialize)]
341pub struct TodoEntry {
342    pub id: String,
343    #[serde(rename = "sessionId")]
344    pub session_id: String,
345    pub content: String,
346    #[serde(rename = "activeForm")]
347    pub active_form: String,
348    pub status: String,
349    #[serde(rename = "planId", skip_serializing_if = "Option::is_none")]
350    pub plan_id: Option<String>,
351    #[serde(rename = "createdAt")]
352    pub created_at: DateTime<Utc>,
353    #[serde(rename = "startedAt", skip_serializing_if = "Option::is_none")]
354    pub started_at: Option<DateTime<Utc>>,
355    #[serde(rename = "completedAt", skip_serializing_if = "Option::is_none")]
356    pub completed_at: Option<DateTime<Utc>>,
357}
358
359#[derive(Clone, Debug, Serialize, Deserialize)]
360pub struct PlanEntry {
361    pub id: String,
362    #[serde(rename = "sessionId")]
363    pub session_id: String,
364    #[serde(skip_serializing_if = "Option::is_none")]
365    pub name: Option<String>,
366    pub content: String,
367    pub status: String,
368    #[serde(skip_serializing_if = "Option::is_none")]
369    pub error: Option<String>,
370    #[serde(rename = "createdAt")]
371    pub created_at: DateTime<Utc>,
372    #[serde(rename = "approvedAt", skip_serializing_if = "Option::is_none")]
373    pub approved_at: Option<DateTime<Utc>>,
374    #[serde(rename = "startedAt", skip_serializing_if = "Option::is_none")]
375    pub started_at: Option<DateTime<Utc>>,
376    #[serde(rename = "completedAt", skip_serializing_if = "Option::is_none")]
377    pub completed_at: Option<DateTime<Utc>>,
378}
379
380#[derive(Clone, Debug, Serialize, Deserialize)]
381pub struct CompactEntry {
382    pub id: String,
383    #[serde(rename = "sessionId")]
384    pub session_id: String,
385    pub trigger: String,
386    #[serde(rename = "preTokens")]
387    pub pre_tokens: usize,
388    #[serde(rename = "postTokens")]
389    pub post_tokens: usize,
390    #[serde(rename = "savedTokens")]
391    pub saved_tokens: usize,
392    pub summary: String,
393    #[serde(rename = "originalCount")]
394    pub original_count: usize,
395    #[serde(rename = "newCount")]
396    pub new_count: usize,
397    #[serde(rename = "logicalParentId", skip_serializing_if = "Option::is_none")]
398    pub logical_parent_id: Option<String>,
399    #[serde(rename = "createdAt")]
400    pub created_at: DateTime<Utc>,
401}
402
403// ============================================================================
404// Conversion: SessionMessage <-> JsonlEntry
405// ============================================================================
406
407impl JsonlEntry {
408    fn from_message(session_id: &SessionId, msg: &SessionMessage) -> Self {
409        let common = EntryCommon::from_message(session_id, msg);
410
411        match msg.role {
412            Role::User => JsonlEntry::User(UserEntry {
413                common,
414                message: UserMessageContent {
415                    role: "user".to_string(),
416                    content: serde_json::to_value(&msg.content).unwrap_or_default(),
417                },
418                is_compact_summary: msg.is_compact_summary,
419            }),
420            Role::Assistant => JsonlEntry::Assistant(AssistantEntry {
421                common,
422                message: AssistantMessageContent {
423                    id: msg.metadata.request_id.clone(),
424                    model: msg.metadata.model.clone(),
425                    role: "assistant".to_string(),
426                    stop_reason: None,
427                    content: serde_json::to_value(&msg.content).unwrap_or_default(),
428                    usage: msg.usage.as_ref().map(UsageInfo::from),
429                },
430                request_id: msg.metadata.request_id.clone(),
431            }),
432        }
433    }
434
435    fn to_session_message(&self) -> Option<SessionMessage> {
436        match self {
437            JsonlEntry::User(entry) => {
438                let content: Vec<ContentBlock> =
439                    serde_json::from_value(entry.message.content.clone()).unwrap_or_default();
440                let mut msg = SessionMessage::user(content);
441                msg.id = MessageId::from_string(&entry.common.uuid);
442                msg.parent_id = entry
443                    .common
444                    .parent_uuid
445                    .as_ref()
446                    .map(MessageId::from_string);
447                msg.timestamp = entry.common.timestamp;
448                msg.is_sidechain = entry.common.is_sidechain;
449                msg.is_compact_summary = entry.is_compact_summary;
450                msg.environment = Some(entry.common.to_environment());
451                Some(msg)
452            }
453            JsonlEntry::Assistant(entry) => {
454                let content: Vec<ContentBlock> =
455                    serde_json::from_value(entry.message.content.clone()).unwrap_or_default();
456                let mut msg = SessionMessage::assistant(content);
457                msg.id = MessageId::from_string(&entry.common.uuid);
458                msg.parent_id = entry
459                    .common
460                    .parent_uuid
461                    .as_ref()
462                    .map(MessageId::from_string);
463                msg.timestamp = entry.common.timestamp;
464                msg.is_sidechain = entry.common.is_sidechain;
465                msg.usage = entry.message.usage.as_ref().map(TokenUsage::from);
466                msg.metadata.model.clone_from(&entry.message.model);
467                msg.metadata.request_id.clone_from(&entry.request_id);
468                msg.environment = Some(entry.common.to_environment());
469                Some(msg)
470            }
471            _ => None,
472        }
473    }
474
475    #[cfg(test)]
476    fn message_uuid(&self) -> Option<&str> {
477        match self {
478            JsonlEntry::User(e) => Some(&e.common.uuid),
479            JsonlEntry::Assistant(e) => Some(&e.common.uuid),
480            _ => None,
481        }
482    }
483}
484
485// ============================================================================
486// Session Index
487// ============================================================================
488
489#[derive(Clone, Debug)]
490struct SessionMeta {
491    path: PathBuf,
492    project_path: Option<PathBuf>,
493    tenant_id: Option<String>,
494    parent_id: Option<SessionId>,
495    updated_at: DateTime<Utc>,
496    /// IDs of persisted messages and compacts
497    persisted_ids: HashSet<String>,
498    /// Hash of last persisted todos state (to detect changes)
499    todos_hash: u64,
500    /// Hash of last persisted plan state (to detect changes)
501    plan_hash: u64,
502}
503
504#[derive(Default)]
505struct SessionIndex {
506    sessions: HashMap<SessionId, SessionMeta>,
507    by_project: HashMap<PathBuf, Vec<SessionId>>,
508    by_tenant: HashMap<String, Vec<SessionId>>,
509    by_parent: HashMap<SessionId, Vec<SessionId>>,
510}
511
512impl SessionIndex {
513    fn insert(&mut self, session_id: SessionId, meta: SessionMeta) {
514        // Remove old entries if updating
515        self.remove(&session_id);
516
517        if let Some(ref project) = meta.project_path {
518            self.by_project
519                .entry(project.clone())
520                .or_default()
521                .push(session_id);
522        }
523        if let Some(ref tenant) = meta.tenant_id {
524            self.by_tenant
525                .entry(tenant.clone())
526                .or_default()
527                .push(session_id);
528        }
529        if let Some(parent) = meta.parent_id {
530            self.by_parent.entry(parent).or_default().push(session_id);
531        }
532        self.sessions.insert(session_id, meta);
533    }
534
535    fn remove(&mut self, session_id: &SessionId) -> Option<SessionMeta> {
536        let meta = self.sessions.remove(session_id)?;
537
538        if let Some(ref project) = meta.project_path
539            && let Some(ids) = self.by_project.get_mut(project)
540        {
541            ids.retain(|id| id != session_id);
542        }
543        if let Some(ref tenant) = meta.tenant_id
544            && let Some(ids) = self.by_tenant.get_mut(tenant)
545        {
546            ids.retain(|id| id != session_id);
547        }
548        if let Some(parent) = meta.parent_id
549            && let Some(ids) = self.by_parent.get_mut(&parent)
550        {
551            ids.retain(|id| id != session_id);
552        }
553        Some(meta)
554    }
555}
556
557// ============================================================================
558// File Operations (blocking, run via spawn_blocking)
559// ============================================================================
560
561fn read_entries_sync(path: &Path) -> SessionResult<Vec<JsonlEntry>> {
562    if !path.exists() {
563        return Ok(Vec::new());
564    }
565
566    let file = std::fs::File::open(path).map_err(|e| SessionError::Storage {
567        message: format!("Failed to open {}: {}", path.display(), e),
568    })?;
569
570    let reader = BufReader::with_capacity(64 * 1024, file);
571    let mut entries = Vec::with_capacity(128);
572
573    for (line_num, line) in reader.lines().enumerate() {
574        let line = line.map_err(|e| SessionError::Storage {
575            message: format!("Read error at line {}: {}", line_num + 1, e),
576        })?;
577
578        if line.trim().is_empty() {
579            continue;
580        }
581
582        match serde_json::from_str::<JsonlEntry>(&line) {
583            Ok(entry) => entries.push(entry),
584            Err(e) => {
585                tracing::warn!(
586                    path = %path.display(),
587                    line = line_num + 1,
588                    error = %e,
589                    "Skipping malformed JSONL entry"
590                );
591            }
592        }
593    }
594
595    Ok(entries)
596}
597
598fn append_entries_sync(path: &Path, entries: &[JsonlEntry], sync: bool) -> SessionResult<()> {
599    if entries.is_empty() {
600        return Ok(());
601    }
602
603    if let Some(parent) = path.parent() {
604        std::fs::create_dir_all(parent).map_err(|e| SessionError::Storage {
605            message: format!("Failed to create directory {}: {}", parent.display(), e),
606        })?;
607    }
608
609    let file = std::fs::OpenOptions::new()
610        .create(true)
611        .append(true)
612        .open(path)
613        .map_err(|e| SessionError::Storage {
614            message: format!("Failed to open {} for writing: {}", path.display(), e),
615        })?;
616
617    let mut writer = std::io::BufWriter::with_capacity(64 * 1024, file);
618
619    for entry in entries {
620        serde_json::to_writer(&mut writer, entry)?;
621        writeln!(writer).map_err(|e| SessionError::Storage {
622            message: format!("Write failed: {}", e),
623        })?;
624    }
625
626    writer.flush().map_err(|e| SessionError::Storage {
627        message: format!("Flush failed: {}", e),
628    })?;
629
630    if sync {
631        writer
632            .into_inner()
633            .map_err(|e| SessionError::Storage {
634                message: format!("Buffer error: {}", e.error()),
635            })?
636            .sync_all()
637            .map_err(|e| SessionError::Storage {
638                message: format!("Sync failed: {}", e),
639            })?;
640    }
641
642    Ok(())
643}
644
645// ============================================================================
646// JSONL Persistence Implementation
647// ============================================================================
648
649pub struct JsonlPersistence {
650    config: JsonlConfig,
651    index: Arc<RwLock<SessionIndex>>,
652    summaries: Arc<RwLock<HashMap<SessionId, Vec<SummarySnapshot>>>>,
653    queue: Arc<RwLock<HashMap<SessionId, Vec<QueueItem>>>>,
654}
655
656impl JsonlPersistence {
657    pub async fn new(config: JsonlConfig) -> SessionResult<Self> {
658        tokio::fs::create_dir_all(config.projects_dir())
659            .await
660            .map_err(|e| SessionError::Storage {
661                message: format!("Failed to create projects directory: {}", e),
662            })?;
663
664        let persistence = Self {
665            config,
666            index: Arc::new(RwLock::new(SessionIndex::default())),
667            summaries: Arc::new(RwLock::new(HashMap::new())),
668            queue: Arc::new(RwLock::new(HashMap::new())),
669        };
670
671        persistence.rebuild_index().await?;
672        Ok(persistence)
673    }
674
675    pub async fn default_config() -> SessionResult<Self> {
676        Self::new(JsonlConfig::default()).await
677    }
678
679    async fn rebuild_index(&self) -> SessionResult<()> {
680        let projects_dir = self.config.projects_dir();
681        if !projects_dir.exists() {
682            return Ok(());
683        }
684
685        let mut index = self.index.write().await;
686        let mut summaries = self.summaries.write().await;
687
688        let mut entries =
689            tokio::fs::read_dir(&projects_dir)
690                .await
691                .map_err(|e| SessionError::Storage {
692                    message: format!("Failed to read projects dir: {}", e),
693                })?;
694
695        while let Some(project_entry) =
696            entries
697                .next_entry()
698                .await
699                .map_err(|e| SessionError::Storage {
700                    message: format!("Failed to read entry: {}", e),
701                })?
702        {
703            let file_type = project_entry.file_type().await.ok();
704            if !file_type.map(|t| t.is_dir()).unwrap_or(false) {
705                continue;
706            }
707
708            let project_path = project_entry.path();
709            let mut files =
710                tokio::fs::read_dir(&project_path)
711                    .await
712                    .map_err(|e| SessionError::Storage {
713                        message: format!("Failed to read project dir: {}", e),
714                    })?;
715
716            while let Some(file_entry) =
717                files
718                    .next_entry()
719                    .await
720                    .map_err(|e| SessionError::Storage {
721                        message: format!("Failed to read file entry: {}", e),
722                    })?
723            {
724                let file_path = file_entry.path();
725                if file_path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
726                    continue;
727                }
728
729                let session_id = match file_path
730                    .file_stem()
731                    .and_then(|s| s.to_str())
732                    .and_then(SessionId::parse)
733                {
734                    Some(id) => id,
735                    None => continue,
736                };
737
738                // Read file in blocking context
739                let path_clone = file_path.clone();
740                let parsed = tokio::task::spawn_blocking(move || read_entries_sync(&path_clone))
741                    .await
742                    .map_err(|e| SessionError::Storage {
743                        message: format!("Task join error: {}", e),
744                    })??;
745
746                let (meta, session_summaries) =
747                    Self::parse_file_metadata(session_id, file_path, &parsed);
748
749                index.insert(session_id, meta);
750                if !session_summaries.is_empty() {
751                    summaries.insert(session_id, session_summaries);
752                }
753            }
754        }
755
756        Ok(())
757    }
758
759    fn parse_file_metadata(
760        session_id: SessionId,
761        path: PathBuf,
762        entries: &[JsonlEntry],
763    ) -> (SessionMeta, Vec<SummarySnapshot>) {
764        let mut project_path: Option<PathBuf> = None;
765        let mut tenant_id: Option<String> = None;
766        let mut parent_id: Option<SessionId> = None;
767        let mut updated_at = Utc::now();
768        let mut summaries = Vec::new();
769        let mut persisted_ids = HashSet::with_capacity(entries.len());
770
771        for entry in entries {
772            match entry {
773                JsonlEntry::User(e) => {
774                    if project_path.is_none() {
775                        project_path = e.common.cwd.clone();
776                    }
777                    updated_at = e.common.timestamp;
778                    persisted_ids.insert(e.common.uuid.clone());
779                }
780                JsonlEntry::Assistant(e) => {
781                    if project_path.is_none() {
782                        project_path = e.common.cwd.clone();
783                    }
784                    updated_at = e.common.timestamp;
785                    persisted_ids.insert(e.common.uuid.clone());
786                }
787                JsonlEntry::SessionMeta(m) => {
788                    tenant_id.clone_from(&m.tenant_id);
789                    parent_id = m
790                        .parent_session_id
791                        .as_ref()
792                        .and_then(|s| SessionId::parse(s));
793                    updated_at = m.updated_at;
794                }
795                JsonlEntry::Summary(s) => {
796                    summaries.push(SummarySnapshot {
797                        id: Uuid::new_v4(),
798                        session_id,
799                        summary: s.summary.clone(),
800                        leaf_message_id: s.leaf_uuid.as_ref().map(MessageId::from_string),
801                        created_at: s.timestamp,
802                    });
803                }
804                _ => {}
805            }
806        }
807
808        (
809            SessionMeta {
810                path,
811                project_path,
812                tenant_id,
813                parent_id,
814                updated_at,
815                persisted_ids,
816                todos_hash: 0, // Will be computed on first save
817                plan_hash: 0,  // Will be computed on first save
818            },
819            summaries,
820        )
821    }
822
823    fn session_file_path(&self, session_id: &SessionId, project_path: Option<&Path>) -> PathBuf {
824        let dir = match project_path {
825            Some(p) => self.config.project_dir(p),
826            None => self.config.projects_dir().join("_default"),
827        };
828        dir.join(format!("{}.jsonl", session_id))
829    }
830
831    fn get_project_path(session: &Session) -> Option<PathBuf> {
832        session
833            .messages
834            .first()
835            .and_then(|m| m.environment.as_ref())
836            .and_then(|e| e.cwd.clone())
837    }
838
839    /// Compute a simple hash of todos for change detection.
840    fn compute_todos_hash(todos: &[TodoItem]) -> u64 {
841        let mut hasher = std::collections::hash_map::DefaultHasher::new();
842        for todo in todos {
843            todo.id.hash(&mut hasher);
844            format!("{:?}", todo.status).hash(&mut hasher);
845            todo.content.hash(&mut hasher);
846        }
847        hasher.finish()
848    }
849
850    /// Compute a simple hash of plan for change detection.
851    fn compute_plan_hash(plan: Option<&Plan>) -> u64 {
852        let mut hasher = std::collections::hash_map::DefaultHasher::new();
853        if let Some(p) = plan {
854            p.id.hash(&mut hasher);
855            format!("{:?}", p.status).hash(&mut hasher);
856            p.content.hash(&mut hasher);
857        }
858        hasher.finish()
859    }
860
861    fn session_to_meta_entry(session: &Session) -> JsonlEntry {
862        JsonlEntry::SessionMeta(SessionMetaEntry {
863            session_id: session.id.to_string(),
864            parent_session_id: session.parent_id.map(|p| p.to_string()),
865            tenant_id: session.tenant_id.clone(),
866            session_type: serde_json::to_value(&session.session_type).unwrap_or_default(),
867            mode: format!("{:?}", session.mode).to_lowercase(),
868            state: format!("{:?}", session.state).to_lowercase(),
869            config: serde_json::to_value(&session.config).unwrap_or_default(),
870            permission_policy: serde_json::to_value(&session.permission_policy).unwrap_or_default(),
871            total_usage: UsageInfo::from(&session.total_usage),
872            total_cost_usd: session.total_cost_usd,
873            static_context_hash: session.static_context_hash.clone(),
874            error: session.error.clone(),
875            created_at: session.created_at,
876            updated_at: session.updated_at,
877            expires_at: session.expires_at,
878        })
879    }
880
881    fn reconstruct_session(session_id: SessionId, entries: Vec<JsonlEntry>) -> Session {
882        let mut session = Session::new(SessionConfig::default());
883        session.id = session_id;
884
885        let mut messages: HashMap<String, SessionMessage> = HashMap::with_capacity(entries.len());
886        let mut todos_map: HashMap<String, TodoItem> = HashMap::new();
887        let mut latest_plan: Option<Plan> = None;
888        let mut compacts: Vec<CompactRecord> = Vec::new();
889
890        for entry in entries {
891            match entry {
892                JsonlEntry::User(_) | JsonlEntry::Assistant(_) => {
893                    if let Some(msg) = entry.to_session_message() {
894                        messages.insert(msg.id.to_string(), msg);
895                    }
896                }
897                JsonlEntry::SessionMeta(m) => {
898                    session.tenant_id = m.tenant_id;
899                    session.parent_id = m
900                        .parent_session_id
901                        .as_ref()
902                        .and_then(|s| SessionId::parse(s));
903                    session.session_type =
904                        serde_json::from_value(m.session_type).unwrap_or(SessionType::Main);
905                    session.mode = serde_json::from_str(&format!("\"{}\"", m.mode))
906                        .unwrap_or(SessionMode::default());
907                    session.state = SessionState::from_str_lenient(&m.state);
908                    session.config = serde_json::from_value(m.config).unwrap_or_default();
909                    session.permission_policy =
910                        serde_json::from_value(m.permission_policy).unwrap_or_default();
911                    session.total_usage = TokenUsage::from(&m.total_usage);
912                    session.total_cost_usd = m.total_cost_usd;
913                    session.static_context_hash = m.static_context_hash;
914                    session.error = m.error;
915                    session.created_at = m.created_at;
916                    session.updated_at = m.updated_at;
917                    session.expires_at = m.expires_at;
918                }
919                JsonlEntry::Summary(s) => {
920                    session.summary = Some(s.summary);
921                }
922                JsonlEntry::Todo(t) => {
923                    let todo = TodoItem {
924                        id: Uuid::parse_str(&t.id).unwrap_or_else(|_| Uuid::new_v4()),
925                        session_id,
926                        content: t.content,
927                        active_form: t.active_form,
928                        status: TodoStatus::from_str_lenient(&t.status),
929                        plan_id: t.plan_id.and_then(|s| Uuid::parse_str(&s).ok()),
930                        created_at: t.created_at,
931                        started_at: t.started_at,
932                        completed_at: t.completed_at,
933                    };
934                    // Use map to get latest version of each todo
935                    todos_map.insert(t.id, todo);
936                }
937                JsonlEntry::Plan(p) => {
938                    let plan = Plan {
939                        id: Uuid::parse_str(&p.id).unwrap_or_else(|_| Uuid::new_v4()),
940                        session_id,
941                        name: p.name,
942                        content: p.content,
943                        status: PlanStatus::from_str_lenient(&p.status),
944                        error: p.error,
945                        created_at: p.created_at,
946                        approved_at: p.approved_at,
947                        started_at: p.started_at,
948                        completed_at: p.completed_at,
949                    };
950                    // Keep the latest plan entry
951                    latest_plan = Some(plan);
952                }
953                JsonlEntry::Compact(c) => {
954                    compacts.push(CompactRecord {
955                        id: Uuid::parse_str(&c.id).unwrap_or_else(|_| Uuid::new_v4()),
956                        session_id,
957                        trigger: CompactTrigger::from_str_lenient(&c.trigger),
958                        pre_tokens: c.pre_tokens,
959                        post_tokens: c.post_tokens,
960                        saved_tokens: c.saved_tokens,
961                        summary: c.summary,
962                        original_count: c.original_count,
963                        new_count: c.new_count,
964                        logical_parent_id: c.logical_parent_id.as_ref().map(MessageId::from_string),
965                        created_at: c.created_at,
966                    });
967                }
968                _ => {}
969            }
970        }
971
972        // Topological sort preserving order
973        let ordered = Self::topological_sort(&messages);
974        session.messages = Vec::with_capacity(ordered.len());
975        for msg in ordered {
976            session.add_message(msg);
977        }
978
979        // Restore todos, plan, and compacts
980        session.todos = todos_map.into_values().collect();
981        session
982            .todos
983            .sort_by(|a, b| a.created_at.cmp(&b.created_at));
984        session.current_plan = latest_plan;
985        session.compact_history = compacts;
986
987        session
988    }
989
990    fn topological_sort(messages: &HashMap<String, SessionMessage>) -> Vec<SessionMessage> {
991        if messages.is_empty() {
992            return Vec::new();
993        }
994
995        // Build parent -> children mapping, sorted by timestamp to preserve order
996        let mut children: HashMap<Option<String>, Vec<&SessionMessage>> = HashMap::new();
997        for msg in messages.values() {
998            children
999                .entry(msg.parent_id.as_ref().map(|p| p.to_string()))
1000                .or_default()
1001                .push(msg);
1002        }
1003
1004        // Sort each group by timestamp
1005        for group in children.values_mut() {
1006            group.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
1007        }
1008
1009        // BFS traversal using VecDeque for FIFO order
1010        let mut result = Vec::with_capacity(messages.len());
1011        let mut queue = std::collections::VecDeque::new();
1012
1013        // Start with root messages (no parent)
1014        if let Some(roots) = children.remove(&None) {
1015            queue.extend(roots);
1016        }
1017
1018        while let Some(msg) = queue.pop_front() {
1019            let id = msg.id.to_string();
1020            result.push(msg.clone());
1021            if let Some(child_msgs) = children.remove(&Some(id)) {
1022                queue.extend(child_msgs);
1023            }
1024        }
1025
1026        result
1027    }
1028}
1029
1030#[async_trait::async_trait]
1031impl Persistence for JsonlPersistence {
1032    fn name(&self) -> &str {
1033        "jsonl"
1034    }
1035
1036    async fn save(&self, session: &Session) -> SessionResult<()> {
1037        let project_path = Self::get_project_path(session);
1038        let file_path = self.session_file_path(&session.id, project_path.as_deref());
1039
1040        // Get persisted state from index (avoid re-reading file)
1041        let (persisted_ids, prev_todos_hash, prev_plan_hash) = {
1042            let index = self.index.read().await;
1043            match index.sessions.get(&session.id) {
1044                Some(m) => (m.persisted_ids.clone(), m.todos_hash, m.plan_hash),
1045                None => (HashSet::new(), 0, 0),
1046            }
1047        };
1048
1049        let mut new_entries = Vec::new();
1050        let mut new_ids = HashSet::new();
1051        let is_first_save = persisted_ids.is_empty();
1052
1053        // Session meta - always write on first save
1054        if is_first_save {
1055            new_entries.push(Self::session_to_meta_entry(session));
1056        }
1057
1058        // Only new messages (incremental)
1059        for msg in &session.messages {
1060            let id = msg.id.to_string();
1061            if !persisted_ids.contains(&id) {
1062                new_entries.push(JsonlEntry::from_message(&session.id, msg));
1063                new_ids.insert(id);
1064            }
1065        }
1066
1067        // Compute current hashes
1068        let current_todos_hash = Self::compute_todos_hash(&session.todos);
1069        let current_plan_hash = Self::compute_plan_hash(session.current_plan.as_ref());
1070
1071        // Persist todos only if changed (including when cleared)
1072        if current_todos_hash != prev_todos_hash {
1073            for todo in &session.todos {
1074                new_entries.push(JsonlEntry::Todo(TodoEntry {
1075                    id: todo.id.to_string(),
1076                    session_id: session.id.to_string(),
1077                    content: todo.content.clone(),
1078                    active_form: todo.active_form.clone(),
1079                    status: format!("{:?}", todo.status).to_lowercase(),
1080                    plan_id: todo.plan_id.map(|id| id.to_string()),
1081                    created_at: todo.created_at,
1082                    started_at: todo.started_at,
1083                    completed_at: todo.completed_at,
1084                }));
1085            }
1086        }
1087
1088        // Persist plan only if changed
1089        if current_plan_hash != prev_plan_hash
1090            && let Some(ref plan) = session.current_plan
1091        {
1092            new_entries.push(JsonlEntry::Plan(PlanEntry {
1093                id: plan.id.to_string(),
1094                session_id: session.id.to_string(),
1095                name: plan.name.clone(),
1096                content: plan.content.clone(),
1097                status: format!("{:?}", plan.status).to_lowercase(),
1098                error: plan.error.clone(),
1099                created_at: plan.created_at,
1100                approved_at: plan.approved_at,
1101                started_at: plan.started_at,
1102                completed_at: plan.completed_at,
1103            }));
1104        }
1105
1106        // Persist compact history (incremental by ID)
1107        for compact in &session.compact_history {
1108            let compact_id = format!("compact:{}", compact.id);
1109            if !persisted_ids.contains(&compact_id) {
1110                new_entries.push(JsonlEntry::Compact(CompactEntry {
1111                    id: compact.id.to_string(),
1112                    session_id: session.id.to_string(),
1113                    trigger: format!("{:?}", compact.trigger).to_lowercase(),
1114                    pre_tokens: compact.pre_tokens,
1115                    post_tokens: compact.post_tokens,
1116                    saved_tokens: compact.saved_tokens,
1117                    summary: compact.summary.clone(),
1118                    original_count: compact.original_count,
1119                    new_count: compact.new_count,
1120                    logical_parent_id: compact.logical_parent_id.as_ref().map(|id| id.to_string()),
1121                    created_at: compact.created_at,
1122                }));
1123                new_ids.insert(compact_id);
1124            }
1125        }
1126
1127        if new_entries.is_empty() {
1128            return Ok(());
1129        }
1130
1131        // Write in blocking context
1132        let path_clone = file_path.clone();
1133        let sync = self.config.sync_mode == SyncMode::OnWrite;
1134        tokio::task::spawn_blocking(move || append_entries_sync(&path_clone, &new_entries, sync))
1135            .await
1136            .map_err(|e| SessionError::Storage {
1137                message: format!("Task join error: {}", e),
1138            })??;
1139
1140        // Update index with new hashes
1141        let mut index = self.index.write().await;
1142        let mut persisted = persisted_ids;
1143        persisted.extend(new_ids);
1144
1145        index.insert(
1146            session.id,
1147            SessionMeta {
1148                path: file_path,
1149                project_path,
1150                tenant_id: session.tenant_id.clone(),
1151                parent_id: session.parent_id,
1152                updated_at: session.updated_at,
1153                persisted_ids: persisted,
1154                todos_hash: current_todos_hash,
1155                plan_hash: current_plan_hash,
1156            },
1157        );
1158
1159        Ok(())
1160    }
1161
1162    async fn load(&self, id: &SessionId) -> SessionResult<Option<Session>> {
1163        let path = {
1164            let index = self.index.read().await;
1165            match index.sessions.get(id) {
1166                Some(m) => m.path.clone(),
1167                None => return Ok(None),
1168            }
1169        };
1170
1171        let entries = tokio::task::spawn_blocking(move || read_entries_sync(&path))
1172            .await
1173            .map_err(|e| SessionError::Storage {
1174                message: format!("Task join error: {}", e),
1175            })??;
1176
1177        if entries.is_empty() {
1178            return Ok(None);
1179        }
1180
1181        let session = Self::reconstruct_session(*id, entries);
1182        Ok(Some(session))
1183    }
1184
1185    async fn delete(&self, id: &SessionId) -> SessionResult<bool> {
1186        let meta = {
1187            let mut index = self.index.write().await;
1188            index.remove(id)
1189        };
1190
1191        let Some(meta) = meta else {
1192            return Ok(false);
1193        };
1194
1195        if meta.path.exists() {
1196            tokio::fs::remove_file(&meta.path)
1197                .await
1198                .map_err(|e| SessionError::Storage {
1199                    message: format!("Failed to delete {}: {}", meta.path.display(), e),
1200                })?;
1201        }
1202
1203        self.summaries.write().await.remove(id);
1204        self.queue.write().await.remove(id);
1205        Ok(true)
1206    }
1207
1208    async fn list(&self, tenant_id: Option<&str>) -> SessionResult<Vec<SessionId>> {
1209        let index = self.index.read().await;
1210        Ok(match tenant_id {
1211            Some(tid) => index.by_tenant.get(tid).cloned().unwrap_or_default(),
1212            None => index.sessions.keys().copied().collect(),
1213        })
1214    }
1215
1216    async fn list_children(&self, parent_id: &SessionId) -> SessionResult<Vec<SessionId>> {
1217        let index = self.index.read().await;
1218        Ok(index.by_parent.get(parent_id).cloned().unwrap_or_default())
1219    }
1220
1221    async fn add_summary(&self, snapshot: SummarySnapshot) -> SessionResult<()> {
1222        let path = {
1223            let index = self.index.read().await;
1224            index
1225                .sessions
1226                .get(&snapshot.session_id)
1227                .map(|m| m.path.clone())
1228        };
1229
1230        if let Some(path) = path {
1231            let entry = JsonlEntry::Summary(SummaryEntry {
1232                session_id: snapshot.session_id.to_string(),
1233                summary: snapshot.summary.clone(),
1234                leaf_uuid: snapshot.leaf_message_id.as_ref().map(|id| id.to_string()),
1235                timestamp: snapshot.created_at,
1236            });
1237
1238            let sync = self.config.sync_mode == SyncMode::OnWrite;
1239            tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1240                .await
1241                .map_err(|e| SessionError::Storage {
1242                    message: format!("Task join error: {}", e),
1243                })??;
1244        }
1245
1246        self.summaries
1247            .write()
1248            .await
1249            .entry(snapshot.session_id)
1250            .or_default()
1251            .push(snapshot);
1252
1253        Ok(())
1254    }
1255
1256    async fn get_summaries(&self, session_id: &SessionId) -> SessionResult<Vec<SummarySnapshot>> {
1257        Ok(self
1258            .summaries
1259            .read()
1260            .await
1261            .get(session_id)
1262            .cloned()
1263            .unwrap_or_default())
1264    }
1265
1266    async fn enqueue(
1267        &self,
1268        session_id: &SessionId,
1269        content: String,
1270        priority: i32,
1271    ) -> SessionResult<QueueItem> {
1272        let item = QueueItem::enqueue(*session_id, content.clone()).with_priority(priority);
1273
1274        let path = {
1275            let index = self.index.read().await;
1276            index.sessions.get(session_id).map(|m| m.path.clone())
1277        };
1278
1279        if let Some(path) = path {
1280            let entry = JsonlEntry::QueueOperation(QueueOperationEntry {
1281                operation: "enqueue".to_string(),
1282                session_id: session_id.to_string(),
1283                timestamp: Utc::now(),
1284                content,
1285                priority,
1286                item_id: item.id.to_string(),
1287            });
1288
1289            let sync = self.config.sync_mode == SyncMode::OnWrite;
1290            tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1291                .await
1292                .map_err(|e| SessionError::Storage {
1293                    message: format!("Task join error: {}", e),
1294                })??;
1295        }
1296
1297        self.queue
1298            .write()
1299            .await
1300            .entry(*session_id)
1301            .or_default()
1302            .push(item.clone());
1303
1304        Ok(item)
1305    }
1306
1307    async fn dequeue(&self, session_id: &SessionId) -> SessionResult<Option<QueueItem>> {
1308        let mut queue = self.queue.write().await;
1309        let items = match queue.get_mut(session_id) {
1310            Some(items) => items,
1311            None => return Ok(None),
1312        };
1313
1314        items.sort_by(|a, b| b.priority.cmp(&a.priority));
1315
1316        for item in items.iter_mut() {
1317            if item.status == QueueStatus::Pending {
1318                item.start_processing();
1319                return Ok(Some(item.clone()));
1320            }
1321        }
1322
1323        Ok(None)
1324    }
1325
1326    async fn cancel_queued(&self, item_id: Uuid) -> SessionResult<bool> {
1327        let mut queue = self.queue.write().await;
1328        for items in queue.values_mut() {
1329            if let Some(item) = items.iter_mut().find(|i| i.id == item_id) {
1330                item.cancel();
1331                return Ok(true);
1332            }
1333        }
1334        Ok(false)
1335    }
1336
1337    async fn pending_queue(&self, session_id: &SessionId) -> SessionResult<Vec<QueueItem>> {
1338        Ok(self
1339            .queue
1340            .read()
1341            .await
1342            .get(session_id)
1343            .map(|items| {
1344                items
1345                    .iter()
1346                    .filter(|i| i.status == QueueStatus::Pending)
1347                    .cloned()
1348                    .collect()
1349            })
1350            .unwrap_or_default())
1351    }
1352
1353    async fn cleanup_expired(&self) -> SessionResult<usize> {
1354        let cutoff = Utc::now() - chrono::Duration::days(self.config.retention_days as i64);
1355
1356        // Collect expired sessions and remove from index in single lock
1357        let expired_paths: Vec<PathBuf> = {
1358            let mut index = self.index.write().await;
1359            let expired_ids: Vec<SessionId> = index
1360                .sessions
1361                .iter()
1362                .filter(|(_, m)| m.updated_at < cutoff)
1363                .map(|(id, _)| *id)
1364                .collect();
1365
1366            let mut paths = Vec::with_capacity(expired_ids.len());
1367            for id in &expired_ids {
1368                if let Some(meta) = index.remove(id) {
1369                    paths.push(meta.path);
1370                }
1371            }
1372            paths
1373        };
1374
1375        let count = expired_paths.len();
1376
1377        // Delete files without holding the lock
1378        for path in expired_paths {
1379            let _ = tokio::fs::remove_file(&path).await;
1380        }
1381
1382        Ok(count)
1383    }
1384}
1385
1386// ============================================================================
1387// Tests
1388// ============================================================================
1389
1390#[cfg(test)]
1391mod tests {
1392    use super::*;
1393    use crate::types::ContentBlock;
1394    use tempfile::TempDir;
1395
1396    async fn create_test_persistence() -> (JsonlPersistence, TempDir) {
1397        let temp_dir = TempDir::new().unwrap();
1398        let config = JsonlConfig::builder()
1399            .base_dir(temp_dir.path().to_path_buf())
1400            .build();
1401        let persistence = JsonlPersistence::new(config).await.unwrap();
1402        (persistence, temp_dir)
1403    }
1404
1405    #[tokio::test]
1406    async fn test_save_and_load_session() {
1407        let (persistence, _temp) = create_test_persistence().await;
1408
1409        let mut session = Session::new(SessionConfig::default());
1410        session.add_message(SessionMessage::user(vec![ContentBlock::text("Hello")]));
1411        session.add_message(SessionMessage::assistant(vec![ContentBlock::text(
1412            "Hi there!",
1413        )]));
1414
1415        persistence.save(&session).await.unwrap();
1416
1417        let loaded = persistence.load(&session.id).await.unwrap().unwrap();
1418        assert_eq!(loaded.id, session.id);
1419        assert_eq!(loaded.messages.len(), 2);
1420    }
1421
1422    #[tokio::test]
1423    async fn test_incremental_save() {
1424        let (persistence, _temp) = create_test_persistence().await;
1425
1426        let mut session = Session::new(SessionConfig::default());
1427        session.add_message(SessionMessage::user(vec![ContentBlock::text("First")]));
1428        persistence.save(&session).await.unwrap();
1429
1430        session.add_message(SessionMessage::assistant(vec![ContentBlock::text(
1431            "Second",
1432        )]));
1433        persistence.save(&session).await.unwrap();
1434
1435        let loaded = persistence.load(&session.id).await.unwrap().unwrap();
1436        assert_eq!(loaded.messages.len(), 2);
1437    }
1438
1439    #[tokio::test]
1440    async fn test_delete_session() {
1441        let (persistence, _temp) = create_test_persistence().await;
1442
1443        let session = Session::new(SessionConfig::default());
1444        persistence.save(&session).await.unwrap();
1445
1446        assert!(persistence.delete(&session.id).await.unwrap());
1447        assert!(persistence.load(&session.id).await.unwrap().is_none());
1448    }
1449
1450    #[tokio::test]
1451    async fn test_list_sessions() {
1452        let (persistence, _temp) = create_test_persistence().await;
1453
1454        let s1 = Session::new(SessionConfig::default());
1455        let s2 = Session::new(SessionConfig::default());
1456
1457        persistence.save(&s1).await.unwrap();
1458        persistence.save(&s2).await.unwrap();
1459
1460        let list = persistence.list(None).await.unwrap();
1461        assert_eq!(list.len(), 2);
1462    }
1463
1464    #[tokio::test]
1465    async fn test_tenant_filtering() {
1466        let (persistence, _temp) = create_test_persistence().await;
1467
1468        let mut s1 = Session::new(SessionConfig::default());
1469        s1.tenant_id = Some("tenant-a".to_string());
1470
1471        let mut s2 = Session::new(SessionConfig::default());
1472        s2.tenant_id = Some("tenant-b".to_string());
1473
1474        persistence.save(&s1).await.unwrap();
1475        persistence.save(&s2).await.unwrap();
1476
1477        let list = persistence.list(Some("tenant-a")).await.unwrap();
1478        assert_eq!(list.len(), 1);
1479        assert_eq!(list[0], s1.id);
1480    }
1481
1482    #[tokio::test]
1483    async fn test_summaries() {
1484        let (persistence, _temp) = create_test_persistence().await;
1485
1486        let session = Session::new(SessionConfig::default());
1487        persistence.save(&session).await.unwrap();
1488
1489        persistence
1490            .add_summary(SummarySnapshot::new(session.id, "Summary 1"))
1491            .await
1492            .unwrap();
1493        persistence
1494            .add_summary(SummarySnapshot::new(session.id, "Summary 2"))
1495            .await
1496            .unwrap();
1497
1498        let summaries = persistence.get_summaries(&session.id).await.unwrap();
1499        assert_eq!(summaries.len(), 2);
1500    }
1501
1502    #[tokio::test]
1503    async fn test_queue_operations() {
1504        let (persistence, _temp) = create_test_persistence().await;
1505
1506        let session = Session::new(SessionConfig::default());
1507        persistence.save(&session).await.unwrap();
1508
1509        persistence
1510            .enqueue(&session.id, "Low priority".to_string(), 1)
1511            .await
1512            .unwrap();
1513        persistence
1514            .enqueue(&session.id, "High priority".to_string(), 10)
1515            .await
1516            .unwrap();
1517
1518        let next = persistence.dequeue(&session.id).await.unwrap().unwrap();
1519        assert_eq!(next.content, "High priority");
1520    }
1521
1522    #[tokio::test]
1523    async fn test_dag_reconstruction() {
1524        let (persistence, _temp) = create_test_persistence().await;
1525
1526        let mut session = Session::new(SessionConfig::default());
1527        session.add_message(SessionMessage::user(vec![ContentBlock::text("Q1")]));
1528        session.add_message(SessionMessage::assistant(vec![ContentBlock::text("A1")]));
1529        session.add_message(SessionMessage::user(vec![ContentBlock::text("Q2")]));
1530        session.add_message(SessionMessage::assistant(vec![ContentBlock::text("A2")]));
1531
1532        persistence.save(&session).await.unwrap();
1533
1534        let loaded = persistence.load(&session.id).await.unwrap().unwrap();
1535
1536        assert_eq!(loaded.messages.len(), 4);
1537        assert!(
1538            loaded.messages[0]
1539                .content
1540                .iter()
1541                .any(|c| c.as_text() == Some("Q1"))
1542        );
1543        assert!(
1544            loaded.messages[1]
1545                .content
1546                .iter()
1547                .any(|c| c.as_text() == Some("A1"))
1548        );
1549        assert!(
1550            loaded.messages[2]
1551                .content
1552                .iter()
1553                .any(|c| c.as_text() == Some("Q2"))
1554        );
1555        assert!(
1556            loaded.messages[3]
1557                .content
1558                .iter()
1559                .any(|c| c.as_text() == Some("A2"))
1560        );
1561    }
1562
1563    #[tokio::test]
1564    async fn test_project_path_encoding() {
1565        let config = JsonlConfig::default();
1566
1567        assert_eq!(
1568            config.encode_project_path(Path::new("/home/user/project")),
1569            "-home-user-project"
1570        );
1571        assert_eq!(
1572            config.encode_project_path(Path::new("/Users/alice/work/app")),
1573            "-Users-alice-work-app"
1574        );
1575    }
1576
1577    #[test]
1578    fn test_jsonl_entry_serialization() {
1579        let msg = SessionMessage::user(vec![ContentBlock::text("Hello")]);
1580        let session_id = SessionId::new();
1581        let entry = JsonlEntry::from_message(&session_id, &msg);
1582
1583        let json = serde_json::to_string(&entry).unwrap();
1584        assert!(json.contains("\"type\":\"user\""));
1585
1586        let parsed: JsonlEntry = serde_json::from_str(&json).unwrap();
1587        assert!(matches!(parsed, JsonlEntry::User(_)));
1588    }
1589
1590    #[tokio::test]
1591    async fn test_no_duplicate_writes() {
1592        let (persistence, _temp) = create_test_persistence().await;
1593
1594        let mut session = Session::new(SessionConfig::default());
1595        session.add_message(SessionMessage::user(vec![ContentBlock::text("Hello")]));
1596        persistence.save(&session).await.unwrap();
1597        persistence.save(&session).await.unwrap(); // Save same data twice
1598        persistence.save(&session).await.unwrap(); // And again
1599
1600        // Check file has only one message entry + session meta
1601        let file_path = persistence.session_file_path(&session.id, None);
1602        let entries = read_entries_sync(&file_path).unwrap();
1603        let message_count = entries
1604            .iter()
1605            .filter(|e| e.message_uuid().is_some())
1606            .count();
1607        assert_eq!(message_count, 1, "Should not duplicate message entries");
1608    }
1609
1610    #[test]
1611    fn test_windows_path_encoding() {
1612        let config = JsonlConfig::default();
1613        // Windows-style path
1614        assert_eq!(
1615            config.encode_project_path(Path::new("C:\\Users\\alice\\project")),
1616            "C_-Users-alice-project"
1617        );
1618    }
1619}