claude_agent/session/
persistence_jsonl.rs

1//! JSONL-based persistence backend compatible with Claude Code CLI.
2//!
3//! This module provides file-based session persistence using the JSONL (JSON Lines) format,
4//! matching the Claude Code CLI's storage format for full interoperability.
5//!
6//! # Features
7//!
8//! - **Claude Code CLI Compatible**: Uses the same file structure and format as the official CLI
9//! - **DAG Structure**: Messages form a directed acyclic graph via parent_uuid references
10//! - **Incremental Writes**: Only new entries are appended, avoiding full file rewrites
11//! - **Project-Based Organization**: Sessions organized by encoded project paths
12//! - **Async I/O**: Non-blocking file operations via tokio
13//!
14//! # File Structure
15//!
16//! ```text
17//! ~/.claude/
18//! └── projects/
19//!     └── {encoded-project-path}/
20//!         ├── {session-uuid}.jsonl    # Conversation history
21//!         └── ...
22//! ```
23
24use std::collections::{HashMap, HashSet};
25use std::hash::{Hash, Hasher};
26use std::io::{BufRead, BufReader, Write};
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29
30use chrono::{DateTime, Utc};
31use serde::{Deserialize, Serialize};
32use tokio::sync::RwLock;
33use uuid::Uuid;
34
35use super::state::{
36    MessageId, Session, SessionConfig, SessionId, SessionMessage, SessionMode, SessionState,
37    SessionType,
38};
39use super::types::{
40    CompactRecord, CompactTrigger, EnvironmentContext, Plan, PlanStatus, QueueItem, QueueStatus,
41    SummarySnapshot, TodoItem, TodoStatus,
42};
43use super::{Persistence, SessionError, SessionResult};
44use crate::types::{ContentBlock, Role, TokenUsage};
45
46// ============================================================================
47// Configuration
48// ============================================================================
49
/// Sync mode for file operations.
///
/// The default mode is [`SyncMode::None`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SyncMode {
    /// No explicit sync (OS buffering only).
    #[default]
    None,
    /// Sync after every write (safest, slowest).
    OnWrite,
}
59
/// Configuration for JSONL persistence.
///
/// Construct it with [`JsonlConfig::default`] or, to override individual
/// settings, through [`JsonlConfig::builder`].
#[derive(Clone, Debug)]
pub struct JsonlConfig {
    /// Base directory for storage (default: ~/.claude).
    ///
    /// Session files live under the `projects` subdirectory of this path.
    pub base_dir: PathBuf,
    /// Log retention period in days (default: 30).
    pub retention_days: u32,
    /// File sync mode for durability.
    pub sync_mode: SyncMode,
}
70
71impl Default for JsonlConfig {
72    fn default() -> Self {
73        Self {
74            base_dir: dirs::home_dir()
75                .unwrap_or_else(|| PathBuf::from("."))
76                .join(".claude"),
77            retention_days: 30,
78            sync_mode: SyncMode::default(),
79        }
80    }
81}
82
83impl JsonlConfig {
84    pub fn builder() -> JsonlConfigBuilder {
85        JsonlConfigBuilder::default()
86    }
87
88    fn projects_dir(&self) -> PathBuf {
89        self.base_dir.join("projects")
90    }
91
92    /// Encode a project path for use as a directory name.
93    /// Cross-platform: handles both Unix and Windows path separators.
94    fn encode_project_path(&self, path: &Path) -> String {
95        path.to_string_lossy()
96            .replace(['/', '\\'], "-")
97            .replace(':', "_") // Windows drive letters
98    }
99
100    fn project_dir(&self, project_path: &Path) -> PathBuf {
101        self.projects_dir()
102            .join(self.encode_project_path(project_path))
103    }
104}
105
/// Builder for JsonlConfig.
///
/// Every field starts out unset (`None`); unset fields are filled from
/// `JsonlConfig::default()` when `build` is called.
#[derive(Default)]
pub struct JsonlConfigBuilder {
    // Override for `JsonlConfig::base_dir`.
    base_dir: Option<PathBuf>,
    // Override for `JsonlConfig::retention_days`.
    retention_days: Option<u32>,
    // Override for `JsonlConfig::sync_mode`.
    sync_mode: Option<SyncMode>,
}
113
114impl JsonlConfigBuilder {
115    pub fn base_dir(mut self, path: impl Into<PathBuf>) -> Self {
116        self.base_dir = Some(path.into());
117        self
118    }
119
120    pub fn retention_days(mut self, days: u32) -> Self {
121        self.retention_days = Some(days);
122        self
123    }
124
125    pub fn sync_mode(mut self, mode: SyncMode) -> Self {
126        self.sync_mode = Some(mode);
127        self
128    }
129
130    pub fn build(self) -> JsonlConfig {
131        let default = JsonlConfig::default();
132        JsonlConfig {
133            base_dir: self.base_dir.unwrap_or(default.base_dir),
134            retention_days: self.retention_days.unwrap_or(default.retention_days),
135            sync_mode: self.sync_mode.unwrap_or(default.sync_mode),
136        }
137    }
138}
139
140// ============================================================================
141// JSONL Entry Types (Claude Code CLI Compatible)
142// ============================================================================
143
/// JSONL entry types matching Claude Code CLI format.
///
/// Each entry is one line of a session file. The enum is internally tagged:
/// the serialized object carries a `type` field holding the snake_case variant
/// name, alongside the fields of the variant's payload struct.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum JsonlEntry {
    /// A user message.
    User(UserEntry),
    /// An assistant message.
    Assistant(AssistantEntry),
    /// A system entry.
    System(SystemEntry),
    /// A queue operation record.
    QueueOperation(QueueOperationEntry),
    /// A summary record.
    Summary(SummaryEntry),
    /// Session-level metadata.
    SessionMeta(SessionMetaEntry),
    /// A todo item record.
    Todo(TodoEntry),
    /// A plan record.
    Plan(PlanEntry),
    /// A compaction record.
    Compact(CompactEntry),
}
158
/// Common fields shared by message entries.
///
/// Embedded (flattened) into the user, assistant and system entry structs.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EntryCommon {
    /// Identifier of this entry.
    pub uuid: String,
    /// Identifier of the parent entry; omitted from the output when `None`.
    #[serde(rename = "parentUuid", skip_serializing_if = "Option::is_none")]
    pub parent_uuid: Option<String>,
    /// Identifier of the session this entry belongs to.
    #[serde(rename = "sessionId")]
    pub session_id: String,
    /// Timestamp of the entry (UTC).
    pub timestamp: DateTime<Utc>,
    /// Working directory recorded with the entry; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<PathBuf>,
    /// Version string of the writer.
    pub version: String,
    /// Git branch name; an empty string stands for "no branch" and is the
    /// value used when the field is missing on input.
    #[serde(rename = "gitBranch", default)]
    pub git_branch: String,
    /// Sidechain flag; defaults to `false` when missing on input.
    #[serde(rename = "isSidechain", default)]
    pub is_sidechain: bool,
}
176
177impl EntryCommon {
178    fn from_message(session_id: &SessionId, msg: &SessionMessage) -> Self {
179        Self {
180            uuid: msg.id.to_string(),
181            parent_uuid: msg.parent_id.as_ref().map(|id| id.to_string()),
182            session_id: session_id.to_string(),
183            timestamp: msg.timestamp,
184            cwd: msg.environment.as_ref().and_then(|e| e.cwd.clone()),
185            version: env!("CARGO_PKG_VERSION").to_string(),
186            git_branch: msg
187                .environment
188                .as_ref()
189                .and_then(|e| e.git_branch.clone())
190                .unwrap_or_default(),
191            is_sidechain: msg.is_sidechain,
192        }
193    }
194
195    fn to_environment(&self) -> EnvironmentContext {
196        EnvironmentContext {
197            cwd: self.cwd.clone(),
198            git_branch: if self.git_branch.is_empty() {
199                None
200            } else {
201                Some(self.git_branch.clone())
202            },
203            ..Default::default()
204        }
205    }
206}
207
/// The `message` object of a user entry.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserMessageContent {
    /// Message role as a string.
    pub role: String,
    /// Message content kept as raw JSON.
    pub content: serde_json::Value,
}
213
/// Payload of [`JsonlEntry::User`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UserEntry {
    /// Shared entry fields, flattened into this object.
    #[serde(flatten)]
    pub common: EntryCommon,
    /// The user message itself.
    pub message: UserMessageContent,
    /// Compact-summary flag; defaults to `false` when missing on input.
    #[serde(rename = "isCompactSummary", default)]
    pub is_compact_summary: bool,
}
222
/// The `message` object of an assistant entry.
///
/// Optional fields are omitted from the output when they are `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AssistantMessageContent {
    /// Message identifier, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    /// Model name, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model: Option<String>,
    /// Message role as a string.
    pub role: String,
    /// Stop reason, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stop_reason: Option<String>,
    /// Message content kept as raw JSON.
    pub content: serde_json::Value,
    /// Token usage for this message, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub usage: Option<UsageInfo>,
}
236
/// Token usage counters as stored in JSONL entries.
///
/// Converts to and from [`TokenUsage`] field by field. The two cache counters
/// default to `0` when they are missing on input.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct UsageInfo {
    pub input_tokens: u64,
    pub output_tokens: u64,
    #[serde(default)]
    pub cache_creation_input_tokens: u64,
    #[serde(default)]
    pub cache_read_input_tokens: u64,
}
246
247impl From<&TokenUsage> for UsageInfo {
248    fn from(u: &TokenUsage) -> Self {
249        Self {
250            input_tokens: u.input_tokens,
251            output_tokens: u.output_tokens,
252            cache_creation_input_tokens: u.cache_creation_input_tokens,
253            cache_read_input_tokens: u.cache_read_input_tokens,
254        }
255    }
256}
257
258impl From<&UsageInfo> for TokenUsage {
259    fn from(u: &UsageInfo) -> Self {
260        Self {
261            input_tokens: u.input_tokens,
262            output_tokens: u.output_tokens,
263            cache_creation_input_tokens: u.cache_creation_input_tokens,
264            cache_read_input_tokens: u.cache_read_input_tokens,
265        }
266    }
267}
268
/// Payload of [`JsonlEntry::Assistant`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AssistantEntry {
    /// Shared entry fields, flattened into this object.
    #[serde(flatten)]
    pub common: EntryCommon,
    /// The assistant message itself.
    pub message: AssistantMessageContent,
    /// Request identifier; omitted from the output when `None`.
    #[serde(rename = "requestId", skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
}
277
/// Payload of [`JsonlEntry::System`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SystemEntry {
    /// Shared entry fields, flattened into this object.
    #[serde(flatten)]
    pub common: EntryCommon,
    /// Subtype discriminator of the system entry.
    pub subtype: String,
    /// Optional text content; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content: Option<String>,
}
286
/// Payload of [`JsonlEntry::QueueOperation`].
///
/// Unlike message entries this struct does not embed [`EntryCommon`]; it
/// carries its own session id and timestamp.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct QueueOperationEntry {
    /// Name of the queue operation.
    pub operation: String,
    #[serde(rename = "sessionId")]
    pub session_id: String,
    pub timestamp: DateTime<Utc>,
    pub content: String,
    pub priority: i32,
    /// Identifier of the queue item (serialized as `itemId`).
    #[serde(rename = "itemId")]
    pub item_id: String,
}
298
/// Payload of [`JsonlEntry::Summary`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SummaryEntry {
    #[serde(rename = "sessionId")]
    pub session_id: String,
    /// The summary text.
    pub summary: String,
    /// Identifier of the leaf message associated with the summary
    /// (serialized as `leafUuid`); omitted from the output when `None`.
    #[serde(rename = "leafUuid", skip_serializing_if = "Option::is_none")]
    pub leaf_uuid: Option<String>,
    pub timestamp: DateTime<Utc>,
}
308
/// Payload of [`JsonlEntry::SessionMeta`]: session-level state that is not
/// part of any individual message.
///
/// Field names are serialized in camelCase. Optional fields are omitted from
/// the output when `None`; `totalUsage` and `totalCostUsd` fall back to their
/// default values when missing on input.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SessionMetaEntry {
    #[serde(rename = "sessionId")]
    pub session_id: String,
    #[serde(rename = "parentSessionId", skip_serializing_if = "Option::is_none")]
    pub parent_session_id: Option<String>,
    #[serde(rename = "tenantId", skip_serializing_if = "Option::is_none")]
    pub tenant_id: Option<String>,
    /// Session type kept as raw JSON.
    #[serde(rename = "sessionType")]
    pub session_type: serde_json::Value,
    /// Session mode as a string.
    pub mode: String,
    /// Session state as a string.
    pub state: String,
    /// Session configuration kept as raw JSON.
    pub config: serde_json::Value,
    /// Permission policy kept as raw JSON.
    #[serde(rename = "permissionPolicy")]
    pub permission_policy: serde_json::Value,
    #[serde(rename = "totalUsage", default)]
    pub total_usage: UsageInfo,
    #[serde(rename = "totalCostUsd", default)]
    pub total_cost_usd: f64,
    #[serde(rename = "staticContextHash", skip_serializing_if = "Option::is_none")]
    pub static_context_hash: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    #[serde(rename = "createdAt")]
    pub created_at: DateTime<Utc>,
    #[serde(rename = "updatedAt")]
    pub updated_at: DateTime<Utc>,
    #[serde(rename = "expiresAt", skip_serializing_if = "Option::is_none")]
    pub expires_at: Option<DateTime<Utc>>,
}
339
/// Payload of [`JsonlEntry::Todo`].
///
/// Field names are serialized in camelCase; optional fields are omitted from
/// the output when `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TodoEntry {
    pub id: String,
    #[serde(rename = "sessionId")]
    pub session_id: String,
    pub content: String,
    #[serde(rename = "activeForm")]
    pub active_form: String,
    /// Todo status as a string.
    pub status: String,
    #[serde(rename = "planId", skip_serializing_if = "Option::is_none")]
    pub plan_id: Option<String>,
    #[serde(rename = "createdAt")]
    pub created_at: DateTime<Utc>,
    #[serde(rename = "startedAt", skip_serializing_if = "Option::is_none")]
    pub started_at: Option<DateTime<Utc>>,
    #[serde(rename = "completedAt", skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<DateTime<Utc>>,
}
358
/// Payload of [`JsonlEntry::Plan`].
///
/// Field names are serialized in camelCase; optional fields are omitted from
/// the output when `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PlanEntry {
    pub id: String,
    #[serde(rename = "sessionId")]
    pub session_id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    pub content: String,
    /// Plan status as a string.
    pub status: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    #[serde(rename = "createdAt")]
    pub created_at: DateTime<Utc>,
    #[serde(rename = "approvedAt", skip_serializing_if = "Option::is_none")]
    pub approved_at: Option<DateTime<Utc>>,
    #[serde(rename = "startedAt", skip_serializing_if = "Option::is_none")]
    pub started_at: Option<DateTime<Utc>>,
    #[serde(rename = "completedAt", skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<DateTime<Utc>>,
}
379
/// Payload of [`JsonlEntry::Compact`].
///
/// Field names are serialized in camelCase; `logicalParentId` is omitted from
/// the output when `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompactEntry {
    pub id: String,
    #[serde(rename = "sessionId")]
    pub session_id: String,
    /// Compaction trigger as a string.
    pub trigger: String,
    #[serde(rename = "preTokens")]
    pub pre_tokens: usize,
    #[serde(rename = "postTokens")]
    pub post_tokens: usize,
    #[serde(rename = "savedTokens")]
    pub saved_tokens: usize,
    pub summary: String,
    #[serde(rename = "originalCount")]
    pub original_count: usize,
    #[serde(rename = "newCount")]
    pub new_count: usize,
    #[serde(rename = "logicalParentId", skip_serializing_if = "Option::is_none")]
    pub logical_parent_id: Option<String>,
    #[serde(rename = "createdAt")]
    pub created_at: DateTime<Utc>,
}
402
403// ============================================================================
404// Conversion: SessionMessage <-> JsonlEntry
405// ============================================================================
406
407impl JsonlEntry {
408    fn from_message(session_id: &SessionId, msg: &SessionMessage) -> Self {
409        let common = EntryCommon::from_message(session_id, msg);
410
411        match msg.role {
412            Role::User => JsonlEntry::User(UserEntry {
413                common,
414                message: UserMessageContent {
415                    role: "user".to_string(),
416                    content: serde_json::to_value(&msg.content).unwrap_or_default(),
417                },
418                is_compact_summary: msg.is_compact_summary,
419            }),
420            Role::Assistant => JsonlEntry::Assistant(AssistantEntry {
421                common,
422                message: AssistantMessageContent {
423                    id: msg.metadata.request_id.clone(),
424                    model: msg.metadata.model.clone(),
425                    role: "assistant".to_string(),
426                    stop_reason: None,
427                    content: serde_json::to_value(&msg.content).unwrap_or_default(),
428                    usage: msg.usage.as_ref().map(UsageInfo::from),
429                },
430                request_id: msg.metadata.request_id.clone(),
431            }),
432        }
433    }
434
435    fn to_session_message(&self) -> Option<SessionMessage> {
436        match self {
437            JsonlEntry::User(entry) => {
438                let content: Vec<ContentBlock> =
439                    serde_json::from_value(entry.message.content.clone()).unwrap_or_default();
440                let mut msg = SessionMessage::user(content);
441                msg.id = MessageId::from_string(&entry.common.uuid);
442                msg.parent_id = entry
443                    .common
444                    .parent_uuid
445                    .as_ref()
446                    .map(MessageId::from_string);
447                msg.timestamp = entry.common.timestamp;
448                msg.is_sidechain = entry.common.is_sidechain;
449                msg.is_compact_summary = entry.is_compact_summary;
450                msg.environment = Some(entry.common.to_environment());
451                Some(msg)
452            }
453            JsonlEntry::Assistant(entry) => {
454                let content: Vec<ContentBlock> =
455                    serde_json::from_value(entry.message.content.clone()).unwrap_or_default();
456                let mut msg = SessionMessage::assistant(content);
457                msg.id = MessageId::from_string(&entry.common.uuid);
458                msg.parent_id = entry
459                    .common
460                    .parent_uuid
461                    .as_ref()
462                    .map(MessageId::from_string);
463                msg.timestamp = entry.common.timestamp;
464                msg.is_sidechain = entry.common.is_sidechain;
465                msg.usage = entry.message.usage.as_ref().map(TokenUsage::from);
466                msg.metadata.model.clone_from(&entry.message.model);
467                msg.metadata.request_id.clone_from(&entry.request_id);
468                msg.environment = Some(entry.common.to_environment());
469                Some(msg)
470            }
471            _ => None,
472        }
473    }
474
475    #[cfg(test)]
476    fn message_uuid(&self) -> Option<&str> {
477        match self {
478            JsonlEntry::User(e) => Some(&e.common.uuid),
479            JsonlEntry::Assistant(e) => Some(&e.common.uuid),
480            _ => None,
481        }
482    }
483}
484
485// ============================================================================
486// Session Index
487// ============================================================================
488
/// Per-session bookkeeping kept in the in-memory index.
#[derive(Clone, Debug)]
struct SessionMeta {
    /// Location of the session's JSONL file.
    path: PathBuf,
    /// Project path the session is indexed under, if known.
    project_path: Option<PathBuf>,
    /// Tenant the session is indexed under, if any.
    tenant_id: Option<String>,
    /// Parent session the session is indexed under, if any.
    parent_id: Option<SessionId>,
    /// Last-update timestamp recorded for the session.
    updated_at: DateTime<Utc>,
    /// IDs of persisted messages and compacts
    persisted_ids: HashSet<String>,
    /// Hash of last persisted todos state (to detect changes)
    todos_hash: u64,
    /// Hash of last persisted plan state (to detect changes)
    plan_hash: u64,
}
503
/// In-memory index over all known sessions.
///
/// `sessions` is the primary map; the `by_*` maps are secondary lookups that
/// list session ids per project path, tenant id and parent session.
#[derive(Default)]
struct SessionIndex {
    sessions: HashMap<SessionId, SessionMeta>,
    by_project: HashMap<PathBuf, Vec<SessionId>>,
    by_tenant: HashMap<String, Vec<SessionId>>,
    by_parent: HashMap<SessionId, Vec<SessionId>>,
}
511
512impl SessionIndex {
513    fn insert(&mut self, session_id: SessionId, meta: SessionMeta) {
514        // Remove old entries if updating
515        self.remove(&session_id);
516
517        if let Some(ref project) = meta.project_path {
518            self.by_project
519                .entry(project.clone())
520                .or_default()
521                .push(session_id);
522        }
523        if let Some(ref tenant) = meta.tenant_id {
524            self.by_tenant
525                .entry(tenant.clone())
526                .or_default()
527                .push(session_id);
528        }
529        if let Some(parent) = meta.parent_id {
530            self.by_parent.entry(parent).or_default().push(session_id);
531        }
532        self.sessions.insert(session_id, meta);
533    }
534
535    fn remove(&mut self, session_id: &SessionId) -> Option<SessionMeta> {
536        let meta = self.sessions.remove(session_id)?;
537
538        if let Some(ref project) = meta.project_path
539            && let Some(ids) = self.by_project.get_mut(project)
540        {
541            ids.retain(|id| id != session_id);
542        }
543        if let Some(ref tenant) = meta.tenant_id
544            && let Some(ids) = self.by_tenant.get_mut(tenant)
545        {
546            ids.retain(|id| id != session_id);
547        }
548        if let Some(parent) = meta.parent_id
549            && let Some(ids) = self.by_parent.get_mut(&parent)
550        {
551            ids.retain(|id| id != session_id);
552        }
553        Some(meta)
554    }
555}
556
557// ============================================================================
558// File Operations (blocking, run via spawn_blocking)
559// ============================================================================
560
561fn read_entries_sync(path: &Path) -> SessionResult<Vec<JsonlEntry>> {
562    if !path.exists() {
563        return Ok(Vec::new());
564    }
565
566    let file = std::fs::File::open(path).map_err(|e| SessionError::Storage {
567        message: format!("Failed to open {}: {}", path.display(), e),
568    })?;
569
570    let reader = BufReader::with_capacity(64 * 1024, file);
571    let mut entries = Vec::with_capacity(128);
572
573    for (line_num, line) in reader.lines().enumerate() {
574        let line = line.map_err(|e| SessionError::Storage {
575            message: format!("Read error at line {}: {}", line_num + 1, e),
576        })?;
577
578        if line.trim().is_empty() {
579            continue;
580        }
581
582        match serde_json::from_str::<JsonlEntry>(&line) {
583            Ok(entry) => entries.push(entry),
584            Err(e) => {
585                tracing::warn!(
586                    path = %path.display(),
587                    line = line_num + 1,
588                    error = %e,
589                    "Skipping malformed JSONL entry"
590                );
591            }
592        }
593    }
594
595    Ok(entries)
596}
597
598fn append_entries_sync(path: &Path, entries: &[JsonlEntry], sync: bool) -> SessionResult<()> {
599    if entries.is_empty() {
600        return Ok(());
601    }
602
603    if let Some(parent) = path.parent() {
604        std::fs::create_dir_all(parent).map_err(|e| SessionError::Storage {
605            message: format!("Failed to create directory {}: {}", parent.display(), e),
606        })?;
607    }
608
609    let file = std::fs::OpenOptions::new()
610        .create(true)
611        .append(true)
612        .open(path)
613        .map_err(|e| SessionError::Storage {
614            message: format!("Failed to open {} for writing: {}", path.display(), e),
615        })?;
616
617    let mut writer = std::io::BufWriter::with_capacity(64 * 1024, file);
618
619    for entry in entries {
620        serde_json::to_writer(&mut writer, entry)?;
621        writeln!(writer).map_err(|e| SessionError::Storage {
622            message: format!("Write failed: {}", e),
623        })?;
624    }
625
626    writer.flush().map_err(|e| SessionError::Storage {
627        message: format!("Flush failed: {}", e),
628    })?;
629
630    if sync {
631        writer
632            .into_inner()
633            .map_err(|e| SessionError::Storage {
634                message: format!("Buffer error: {}", e.error()),
635            })?
636            .sync_all()
637            .map_err(|e| SessionError::Storage {
638                message: format!("Sync failed: {}", e),
639            })?;
640    }
641
642    Ok(())
643}
644
645// ============================================================================
646// JSONL Persistence Implementation
647// ============================================================================
648
/// JSONL-file-backed session persistence.
///
/// Holds the storage configuration together with shared, lock-protected
/// in-memory state: the session index, per-session summaries and per-session
/// queue items.
pub struct JsonlPersistence {
    config: JsonlConfig,
    index: Arc<RwLock<SessionIndex>>,
    summaries: Arc<RwLock<HashMap<SessionId, Vec<SummarySnapshot>>>>,
    queue: Arc<RwLock<HashMap<SessionId, Vec<QueueItem>>>>,
}
655
656impl JsonlPersistence {
657    pub async fn new(config: JsonlConfig) -> SessionResult<Self> {
658        tokio::fs::create_dir_all(config.projects_dir())
659            .await
660            .map_err(|e| SessionError::Storage {
661                message: format!("Failed to create projects directory: {}", e),
662            })?;
663
664        let persistence = Self {
665            config,
666            index: Arc::new(RwLock::new(SessionIndex::default())),
667            summaries: Arc::new(RwLock::new(HashMap::new())),
668            queue: Arc::new(RwLock::new(HashMap::new())),
669        };
670
671        persistence.rebuild_index().await?;
672        Ok(persistence)
673    }
674
675    pub async fn default_config() -> SessionResult<Self> {
676        Self::new(JsonlConfig::default()).await
677    }
678
679    async fn rebuild_index(&self) -> SessionResult<()> {
680        let projects_dir = self.config.projects_dir();
681        if !projects_dir.exists() {
682            return Ok(());
683        }
684
685        let mut index = self.index.write().await;
686        let mut summaries = self.summaries.write().await;
687
688        let mut entries =
689            tokio::fs::read_dir(&projects_dir)
690                .await
691                .map_err(|e| SessionError::Storage {
692                    message: format!("Failed to read projects dir: {}", e),
693                })?;
694
695        while let Some(project_entry) =
696            entries
697                .next_entry()
698                .await
699                .map_err(|e| SessionError::Storage {
700                    message: format!("Failed to read entry: {}", e),
701                })?
702        {
703            let file_type = project_entry.file_type().await.ok();
704            if !file_type.map(|t| t.is_dir()).unwrap_or(false) {
705                continue;
706            }
707
708            let project_path = project_entry.path();
709            let mut files =
710                tokio::fs::read_dir(&project_path)
711                    .await
712                    .map_err(|e| SessionError::Storage {
713                        message: format!("Failed to read project dir: {}", e),
714                    })?;
715
716            while let Some(file_entry) =
717                files
718                    .next_entry()
719                    .await
720                    .map_err(|e| SessionError::Storage {
721                        message: format!("Failed to read file entry: {}", e),
722                    })?
723            {
724                let file_path = file_entry.path();
725                if file_path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
726                    continue;
727                }
728
729                let session_id = match file_path
730                    .file_stem()
731                    .and_then(|s| s.to_str())
732                    .and_then(SessionId::parse)
733                {
734                    Some(id) => id,
735                    None => continue,
736                };
737
738                // Read file in blocking context
739                let path_clone = file_path.clone();
740                let parsed = tokio::task::spawn_blocking(move || read_entries_sync(&path_clone))
741                    .await
742                    .map_err(|e| SessionError::Storage {
743                        message: format!("Task join error: {}", e),
744                    })??;
745
746                let (meta, session_summaries) =
747                    Self::parse_file_metadata(session_id, file_path, &parsed);
748
749                index.insert(session_id, meta);
750                if !session_summaries.is_empty() {
751                    summaries.insert(session_id, session_summaries);
752                }
753            }
754        }
755
756        Ok(())
757    }
758
759    fn parse_file_metadata(
760        session_id: SessionId,
761        path: PathBuf,
762        entries: &[JsonlEntry],
763    ) -> (SessionMeta, Vec<SummarySnapshot>) {
764        let mut project_path: Option<PathBuf> = None;
765        let mut tenant_id: Option<String> = None;
766        let mut parent_id: Option<SessionId> = None;
767        let mut updated_at = Utc::now();
768        let mut summaries = Vec::new();
769        let mut persisted_ids = HashSet::with_capacity(entries.len());
770
771        for entry in entries {
772            match entry {
773                JsonlEntry::User(e) => {
774                    if project_path.is_none() {
775                        project_path = e.common.cwd.clone();
776                    }
777                    updated_at = e.common.timestamp;
778                    persisted_ids.insert(e.common.uuid.clone());
779                }
780                JsonlEntry::Assistant(e) => {
781                    if project_path.is_none() {
782                        project_path = e.common.cwd.clone();
783                    }
784                    updated_at = e.common.timestamp;
785                    persisted_ids.insert(e.common.uuid.clone());
786                }
787                JsonlEntry::SessionMeta(m) => {
788                    tenant_id.clone_from(&m.tenant_id);
789                    parent_id = m
790                        .parent_session_id
791                        .as_ref()
792                        .and_then(|s| SessionId::parse(s));
793                    updated_at = m.updated_at;
794                }
795                JsonlEntry::Summary(s) => {
796                    summaries.push(SummarySnapshot {
797                        id: Uuid::new_v4(),
798                        session_id,
799                        summary: s.summary.clone(),
800                        leaf_message_id: s.leaf_uuid.as_ref().map(MessageId::from_string),
801                        created_at: s.timestamp,
802                    });
803                }
804                _ => {}
805            }
806        }
807
808        (
809            SessionMeta {
810                path,
811                project_path,
812                tenant_id,
813                parent_id,
814                updated_at,
815                persisted_ids,
816                todos_hash: 0, // Will be computed on first save
817                plan_hash: 0,  // Will be computed on first save
818            },
819            summaries,
820        )
821    }
822
823    fn session_file_path(&self, session_id: &SessionId, project_path: Option<&Path>) -> PathBuf {
824        let dir = match project_path {
825            Some(p) => self.config.project_dir(p),
826            None => self.config.projects_dir().join("_default"),
827        };
828        dir.join(format!("{}.jsonl", session_id))
829    }
830
831    fn get_project_path(session: &Session) -> Option<PathBuf> {
832        session
833            .messages
834            .first()
835            .and_then(|m| m.environment.as_ref())
836            .and_then(|e| e.cwd.clone())
837    }
838
839    /// Compute a simple hash of todos for change detection.
840    fn compute_todos_hash(todos: &[TodoItem]) -> u64 {
841        let mut hasher = std::collections::hash_map::DefaultHasher::new();
842        for todo in todos {
843            todo.id.hash(&mut hasher);
844            format!("{:?}", todo.status).hash(&mut hasher);
845            todo.content.hash(&mut hasher);
846        }
847        hasher.finish()
848    }
849
850    /// Compute a simple hash of plan for change detection.
851    fn compute_plan_hash(plan: Option<&Plan>) -> u64 {
852        let mut hasher = std::collections::hash_map::DefaultHasher::new();
853        if let Some(p) = plan {
854            p.id.hash(&mut hasher);
855            format!("{:?}", p.status).hash(&mut hasher);
856            p.content.hash(&mut hasher);
857        }
858        hasher.finish()
859    }
860
861    fn session_to_meta_entry(session: &Session) -> JsonlEntry {
862        JsonlEntry::SessionMeta(SessionMetaEntry {
863            session_id: session.id.to_string(),
864            parent_session_id: session.parent_id.map(|p| p.to_string()),
865            tenant_id: session.tenant_id.clone(),
866            session_type: serde_json::to_value(&session.session_type).unwrap_or_default(),
867            mode: format!("{:?}", session.mode).to_lowercase(),
868            state: format!("{:?}", session.state).to_lowercase(),
869            config: serde_json::to_value(&session.config).unwrap_or_default(),
870            permission_policy: serde_json::to_value(&session.permission_policy).unwrap_or_default(),
871            total_usage: UsageInfo::from(&session.total_usage),
872            total_cost_usd: session.total_cost_usd,
873            static_context_hash: session.static_context_hash.clone(),
874            error: session.error.clone(),
875            created_at: session.created_at,
876            updated_at: session.updated_at,
877            expires_at: session.expires_at,
878        })
879    }
880
881    fn reconstruct_session(session_id: SessionId, entries: Vec<JsonlEntry>) -> Session {
882        let mut session = Session::new(SessionConfig::default());
883        session.id = session_id;
884
885        let mut messages: HashMap<String, SessionMessage> = HashMap::with_capacity(entries.len());
886        let mut todos_map: HashMap<String, TodoItem> = HashMap::new();
887        let mut latest_plan: Option<Plan> = None;
888        let mut compacts: Vec<CompactRecord> = Vec::new();
889
890        for entry in entries {
891            match entry {
892                JsonlEntry::User(_) | JsonlEntry::Assistant(_) => {
893                    if let Some(msg) = entry.to_session_message() {
894                        messages.insert(msg.id.to_string(), msg);
895                    }
896                }
897                JsonlEntry::SessionMeta(m) => {
898                    session.tenant_id = m.tenant_id;
899                    session.parent_id = m
900                        .parent_session_id
901                        .as_ref()
902                        .and_then(|s| SessionId::parse(s));
903                    session.session_type =
904                        serde_json::from_value(m.session_type).unwrap_or(SessionType::Main);
905                    session.mode = serde_json::from_str(&format!("\"{}\"", m.mode))
906                        .unwrap_or(SessionMode::default());
907                    session.state = match m.state.as_str() {
908                        "active" => SessionState::Active,
909                        "completed" => SessionState::Completed,
910                        "failed" => SessionState::Failed,
911                        "cancelled" => SessionState::Cancelled,
912                        "waitingfortools" => SessionState::WaitingForTools,
913                        _ => SessionState::Created,
914                    };
915                    session.config = serde_json::from_value(m.config).unwrap_or_default();
916                    session.permission_policy =
917                        serde_json::from_value(m.permission_policy).unwrap_or_default();
918                    session.total_usage = TokenUsage::from(&m.total_usage);
919                    session.total_cost_usd = m.total_cost_usd;
920                    session.static_context_hash = m.static_context_hash;
921                    session.error = m.error;
922                    session.created_at = m.created_at;
923                    session.updated_at = m.updated_at;
924                    session.expires_at = m.expires_at;
925                }
926                JsonlEntry::Summary(s) => {
927                    session.summary = Some(s.summary);
928                }
929                JsonlEntry::Todo(t) => {
930                    let status = match t.status.as_str() {
931                        "in_progress" | "inprogress" => TodoStatus::InProgress,
932                        "completed" => TodoStatus::Completed,
933                        _ => TodoStatus::Pending,
934                    };
935                    let todo = TodoItem {
936                        id: Uuid::parse_str(&t.id).unwrap_or_else(|_| Uuid::new_v4()),
937                        session_id,
938                        content: t.content,
939                        active_form: t.active_form,
940                        status,
941                        plan_id: t.plan_id.and_then(|s| Uuid::parse_str(&s).ok()),
942                        created_at: t.created_at,
943                        started_at: t.started_at,
944                        completed_at: t.completed_at,
945                    };
946                    // Use map to get latest version of each todo
947                    todos_map.insert(t.id, todo);
948                }
949                JsonlEntry::Plan(p) => {
950                    let status = match p.status.as_str() {
951                        "approved" => PlanStatus::Approved,
952                        "executing" | "inprogress" | "in_progress" => PlanStatus::Executing,
953                        "completed" => PlanStatus::Completed,
954                        "cancelled" => PlanStatus::Cancelled,
955                        "failed" => PlanStatus::Failed,
956                        _ => PlanStatus::Draft,
957                    };
958                    let plan = Plan {
959                        id: Uuid::parse_str(&p.id).unwrap_or_else(|_| Uuid::new_v4()),
960                        session_id,
961                        name: p.name,
962                        content: p.content,
963                        status,
964                        error: p.error,
965                        created_at: p.created_at,
966                        approved_at: p.approved_at,
967                        started_at: p.started_at,
968                        completed_at: p.completed_at,
969                    };
970                    // Keep the latest plan entry
971                    latest_plan = Some(plan);
972                }
973                JsonlEntry::Compact(c) => {
974                    let trigger = match c.trigger.as_str() {
975                        "auto" | "automatic" => CompactTrigger::Auto,
976                        "threshold" => CompactTrigger::Threshold,
977                        _ => CompactTrigger::Manual,
978                    };
979                    compacts.push(CompactRecord {
980                        id: Uuid::parse_str(&c.id).unwrap_or_else(|_| Uuid::new_v4()),
981                        session_id,
982                        trigger,
983                        pre_tokens: c.pre_tokens,
984                        post_tokens: c.post_tokens,
985                        saved_tokens: c.saved_tokens,
986                        summary: c.summary,
987                        original_count: c.original_count,
988                        new_count: c.new_count,
989                        logical_parent_id: c.logical_parent_id.as_ref().map(MessageId::from_string),
990                        created_at: c.created_at,
991                    });
992                }
993                _ => {}
994            }
995        }
996
997        // Topological sort preserving order
998        let ordered = Self::topological_sort(&messages);
999        session.messages = Vec::with_capacity(ordered.len());
1000        for msg in ordered {
1001            session.add_message(msg);
1002        }
1003
1004        // Restore todos, plan, and compacts
1005        session.todos = todos_map.into_values().collect();
1006        session
1007            .todos
1008            .sort_by(|a, b| a.created_at.cmp(&b.created_at));
1009        session.current_plan = latest_plan;
1010        session.compact_history = compacts;
1011
1012        session
1013    }
1014
1015    fn topological_sort(messages: &HashMap<String, SessionMessage>) -> Vec<SessionMessage> {
1016        if messages.is_empty() {
1017            return Vec::new();
1018        }
1019
1020        // Build parent -> children mapping, sorted by timestamp to preserve order
1021        let mut children: HashMap<Option<String>, Vec<&SessionMessage>> = HashMap::new();
1022        for msg in messages.values() {
1023            children
1024                .entry(msg.parent_id.as_ref().map(|p| p.to_string()))
1025                .or_default()
1026                .push(msg);
1027        }
1028
1029        // Sort each group by timestamp
1030        for group in children.values_mut() {
1031            group.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
1032        }
1033
1034        // BFS traversal using VecDeque for FIFO order
1035        let mut result = Vec::with_capacity(messages.len());
1036        let mut queue = std::collections::VecDeque::new();
1037
1038        // Start with root messages (no parent)
1039        if let Some(roots) = children.remove(&None) {
1040            queue.extend(roots);
1041        }
1042
1043        while let Some(msg) = queue.pop_front() {
1044            let id = msg.id.to_string();
1045            result.push(msg.clone());
1046            if let Some(child_msgs) = children.remove(&Some(id)) {
1047                queue.extend(child_msgs);
1048            }
1049        }
1050
1051        result
1052    }
1053}
1054
1055#[async_trait::async_trait]
1056impl Persistence for JsonlPersistence {
1057    fn name(&self) -> &str {
1058        "jsonl"
1059    }
1060
1061    async fn save(&self, session: &Session) -> SessionResult<()> {
1062        let project_path = Self::get_project_path(session);
1063        let file_path = self.session_file_path(&session.id, project_path.as_deref());
1064
1065        // Get persisted state from index (avoid re-reading file)
1066        let (persisted_ids, prev_todos_hash, prev_plan_hash) = {
1067            let index = self.index.read().await;
1068            match index.sessions.get(&session.id) {
1069                Some(m) => (m.persisted_ids.clone(), m.todos_hash, m.plan_hash),
1070                None => (HashSet::new(), 0, 0),
1071            }
1072        };
1073
1074        let mut new_entries = Vec::new();
1075        let mut new_ids = HashSet::new();
1076        let is_first_save = persisted_ids.is_empty();
1077
1078        // Session meta - always write on first save
1079        if is_first_save {
1080            new_entries.push(Self::session_to_meta_entry(session));
1081        }
1082
1083        // Only new messages (incremental)
1084        for msg in &session.messages {
1085            let id = msg.id.to_string();
1086            if !persisted_ids.contains(&id) {
1087                new_entries.push(JsonlEntry::from_message(&session.id, msg));
1088                new_ids.insert(id);
1089            }
1090        }
1091
1092        // Compute current hashes
1093        let current_todos_hash = Self::compute_todos_hash(&session.todos);
1094        let current_plan_hash = Self::compute_plan_hash(session.current_plan.as_ref());
1095
1096        // Persist todos only if changed (including when cleared)
1097        if current_todos_hash != prev_todos_hash {
1098            for todo in &session.todos {
1099                new_entries.push(JsonlEntry::Todo(TodoEntry {
1100                    id: todo.id.to_string(),
1101                    session_id: session.id.to_string(),
1102                    content: todo.content.clone(),
1103                    active_form: todo.active_form.clone(),
1104                    status: format!("{:?}", todo.status).to_lowercase(),
1105                    plan_id: todo.plan_id.map(|id| id.to_string()),
1106                    created_at: todo.created_at,
1107                    started_at: todo.started_at,
1108                    completed_at: todo.completed_at,
1109                }));
1110            }
1111        }
1112
1113        // Persist plan only if changed
1114        if current_plan_hash != prev_plan_hash
1115            && let Some(ref plan) = session.current_plan
1116        {
1117            new_entries.push(JsonlEntry::Plan(PlanEntry {
1118                id: plan.id.to_string(),
1119                session_id: session.id.to_string(),
1120                name: plan.name.clone(),
1121                content: plan.content.clone(),
1122                status: format!("{:?}", plan.status).to_lowercase(),
1123                error: plan.error.clone(),
1124                created_at: plan.created_at,
1125                approved_at: plan.approved_at,
1126                started_at: plan.started_at,
1127                completed_at: plan.completed_at,
1128            }));
1129        }
1130
1131        // Persist compact history (incremental by ID)
1132        for compact in &session.compact_history {
1133            let compact_id = format!("compact:{}", compact.id);
1134            if !persisted_ids.contains(&compact_id) {
1135                new_entries.push(JsonlEntry::Compact(CompactEntry {
1136                    id: compact.id.to_string(),
1137                    session_id: session.id.to_string(),
1138                    trigger: format!("{:?}", compact.trigger).to_lowercase(),
1139                    pre_tokens: compact.pre_tokens,
1140                    post_tokens: compact.post_tokens,
1141                    saved_tokens: compact.saved_tokens,
1142                    summary: compact.summary.clone(),
1143                    original_count: compact.original_count,
1144                    new_count: compact.new_count,
1145                    logical_parent_id: compact.logical_parent_id.as_ref().map(|id| id.to_string()),
1146                    created_at: compact.created_at,
1147                }));
1148                new_ids.insert(compact_id);
1149            }
1150        }
1151
1152        if new_entries.is_empty() {
1153            return Ok(());
1154        }
1155
1156        // Write in blocking context
1157        let path_clone = file_path.clone();
1158        let sync = self.config.sync_mode == SyncMode::OnWrite;
1159        tokio::task::spawn_blocking(move || append_entries_sync(&path_clone, &new_entries, sync))
1160            .await
1161            .map_err(|e| SessionError::Storage {
1162                message: format!("Task join error: {}", e),
1163            })??;
1164
1165        // Update index with new hashes
1166        let mut index = self.index.write().await;
1167        let mut persisted = persisted_ids;
1168        persisted.extend(new_ids);
1169
1170        index.insert(
1171            session.id,
1172            SessionMeta {
1173                path: file_path,
1174                project_path,
1175                tenant_id: session.tenant_id.clone(),
1176                parent_id: session.parent_id,
1177                updated_at: session.updated_at,
1178                persisted_ids: persisted,
1179                todos_hash: current_todos_hash,
1180                plan_hash: current_plan_hash,
1181            },
1182        );
1183
1184        Ok(())
1185    }
1186
1187    async fn load(&self, id: &SessionId) -> SessionResult<Option<Session>> {
1188        let path = {
1189            let index = self.index.read().await;
1190            match index.sessions.get(id) {
1191                Some(m) => m.path.clone(),
1192                None => return Ok(None),
1193            }
1194        };
1195
1196        let entries = tokio::task::spawn_blocking(move || read_entries_sync(&path))
1197            .await
1198            .map_err(|e| SessionError::Storage {
1199                message: format!("Task join error: {}", e),
1200            })??;
1201
1202        if entries.is_empty() {
1203            return Ok(None);
1204        }
1205
1206        let session = Self::reconstruct_session(*id, entries);
1207        Ok(Some(session))
1208    }
1209
1210    async fn delete(&self, id: &SessionId) -> SessionResult<bool> {
1211        let meta = {
1212            let mut index = self.index.write().await;
1213            index.remove(id)
1214        };
1215
1216        let Some(meta) = meta else {
1217            return Ok(false);
1218        };
1219
1220        if meta.path.exists() {
1221            tokio::fs::remove_file(&meta.path)
1222                .await
1223                .map_err(|e| SessionError::Storage {
1224                    message: format!("Failed to delete {}: {}", meta.path.display(), e),
1225                })?;
1226        }
1227
1228        self.summaries.write().await.remove(id);
1229        self.queue.write().await.remove(id);
1230        Ok(true)
1231    }
1232
1233    async fn list(&self, tenant_id: Option<&str>) -> SessionResult<Vec<SessionId>> {
1234        let index = self.index.read().await;
1235        Ok(match tenant_id {
1236            Some(tid) => index.by_tenant.get(tid).cloned().unwrap_or_default(),
1237            None => index.sessions.keys().copied().collect(),
1238        })
1239    }
1240
1241    async fn list_children(&self, parent_id: &SessionId) -> SessionResult<Vec<SessionId>> {
1242        let index = self.index.read().await;
1243        Ok(index.by_parent.get(parent_id).cloned().unwrap_or_default())
1244    }
1245
1246    async fn add_summary(&self, snapshot: SummarySnapshot) -> SessionResult<()> {
1247        let path = {
1248            let index = self.index.read().await;
1249            index
1250                .sessions
1251                .get(&snapshot.session_id)
1252                .map(|m| m.path.clone())
1253        };
1254
1255        if let Some(path) = path {
1256            let entry = JsonlEntry::Summary(SummaryEntry {
1257                session_id: snapshot.session_id.to_string(),
1258                summary: snapshot.summary.clone(),
1259                leaf_uuid: snapshot.leaf_message_id.as_ref().map(|id| id.to_string()),
1260                timestamp: snapshot.created_at,
1261            });
1262
1263            let sync = self.config.sync_mode == SyncMode::OnWrite;
1264            tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1265                .await
1266                .map_err(|e| SessionError::Storage {
1267                    message: format!("Task join error: {}", e),
1268                })??;
1269        }
1270
1271        self.summaries
1272            .write()
1273            .await
1274            .entry(snapshot.session_id)
1275            .or_default()
1276            .push(snapshot);
1277
1278        Ok(())
1279    }
1280
1281    async fn get_summaries(&self, session_id: &SessionId) -> SessionResult<Vec<SummarySnapshot>> {
1282        Ok(self
1283            .summaries
1284            .read()
1285            .await
1286            .get(session_id)
1287            .cloned()
1288            .unwrap_or_default())
1289    }
1290
1291    async fn enqueue(
1292        &self,
1293        session_id: &SessionId,
1294        content: String,
1295        priority: i32,
1296    ) -> SessionResult<QueueItem> {
1297        let item = QueueItem::enqueue(*session_id, content.clone()).with_priority(priority);
1298
1299        let path = {
1300            let index = self.index.read().await;
1301            index.sessions.get(session_id).map(|m| m.path.clone())
1302        };
1303
1304        if let Some(path) = path {
1305            let entry = JsonlEntry::QueueOperation(QueueOperationEntry {
1306                operation: "enqueue".to_string(),
1307                session_id: session_id.to_string(),
1308                timestamp: Utc::now(),
1309                content,
1310                priority,
1311                item_id: item.id.to_string(),
1312            });
1313
1314            let sync = self.config.sync_mode == SyncMode::OnWrite;
1315            tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1316                .await
1317                .map_err(|e| SessionError::Storage {
1318                    message: format!("Task join error: {}", e),
1319                })??;
1320        }
1321
1322        self.queue
1323            .write()
1324            .await
1325            .entry(*session_id)
1326            .or_default()
1327            .push(item.clone());
1328
1329        Ok(item)
1330    }
1331
1332    async fn dequeue(&self, session_id: &SessionId) -> SessionResult<Option<QueueItem>> {
1333        let mut queue = self.queue.write().await;
1334        let items = match queue.get_mut(session_id) {
1335            Some(items) => items,
1336            None => return Ok(None),
1337        };
1338
1339        items.sort_by(|a, b| b.priority.cmp(&a.priority));
1340
1341        for item in items.iter_mut() {
1342            if item.status == QueueStatus::Pending {
1343                item.start_processing();
1344                return Ok(Some(item.clone()));
1345            }
1346        }
1347
1348        Ok(None)
1349    }
1350
1351    async fn cancel_queued(&self, item_id: Uuid) -> SessionResult<bool> {
1352        let mut queue = self.queue.write().await;
1353        for items in queue.values_mut() {
1354            if let Some(item) = items.iter_mut().find(|i| i.id == item_id) {
1355                item.cancel();
1356                return Ok(true);
1357            }
1358        }
1359        Ok(false)
1360    }
1361
1362    async fn pending_queue(&self, session_id: &SessionId) -> SessionResult<Vec<QueueItem>> {
1363        Ok(self
1364            .queue
1365            .read()
1366            .await
1367            .get(session_id)
1368            .map(|items| {
1369                items
1370                    .iter()
1371                    .filter(|i| i.status == QueueStatus::Pending)
1372                    .cloned()
1373                    .collect()
1374            })
1375            .unwrap_or_default())
1376    }
1377
1378    async fn cleanup_expired(&self) -> SessionResult<usize> {
1379        let cutoff = Utc::now() - chrono::Duration::days(self.config.retention_days as i64);
1380
1381        // Collect expired sessions and remove from index in single lock
1382        let expired_paths: Vec<PathBuf> = {
1383            let mut index = self.index.write().await;
1384            let expired_ids: Vec<SessionId> = index
1385                .sessions
1386                .iter()
1387                .filter(|(_, m)| m.updated_at < cutoff)
1388                .map(|(id, _)| *id)
1389                .collect();
1390
1391            let mut paths = Vec::with_capacity(expired_ids.len());
1392            for id in &expired_ids {
1393                if let Some(meta) = index.remove(id) {
1394                    paths.push(meta.path);
1395                }
1396            }
1397            paths
1398        };
1399
1400        let count = expired_paths.len();
1401
1402        // Delete files without holding the lock
1403        for path in expired_paths {
1404            let _ = tokio::fs::remove_file(&path).await;
1405        }
1406
1407        Ok(count)
1408    }
1409}
1410
1411// ============================================================================
1412// Tests
1413// ============================================================================
1414
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::ContentBlock;
    use tempfile::TempDir;

    /// Build a persistence backend rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned so the caller keeps it alive for the whole test.
    async fn create_test_persistence() -> (JsonlPersistence, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = JsonlConfig::builder()
            .base_dir(temp_dir.path().to_path_buf())
            .build();
        let persistence = JsonlPersistence::new(config).await.unwrap();
        (persistence, temp_dir)
    }

    /// A saved session can be loaded back with the same id and message count.
    #[tokio::test]
    async fn test_save_and_load_session() {
        let (persistence, _temp) = create_test_persistence().await;

        let mut session = Session::new(SessionConfig::default());
        session.add_message(SessionMessage::user(vec![ContentBlock::text("Hello")]));
        session.add_message(SessionMessage::assistant(vec![ContentBlock::text(
            "Hi there!",
        )]));

        persistence.save(&session).await.unwrap();

        let loaded = persistence.load(&session.id).await.unwrap().unwrap();
        assert_eq!(loaded.id, session.id);
        assert_eq!(loaded.messages.len(), 2);
    }

    /// Messages added between two saves are both present after loading.
    #[tokio::test]
    async fn test_incremental_save() {
        let (persistence, _temp) = create_test_persistence().await;

        let mut session = Session::new(SessionConfig::default());
        session.add_message(SessionMessage::user(vec![ContentBlock::text("First")]));
        persistence.save(&session).await.unwrap();

        session.add_message(SessionMessage::assistant(vec![ContentBlock::text(
            "Second",
        )]));
        persistence.save(&session).await.unwrap();

        let loaded = persistence.load(&session.id).await.unwrap().unwrap();
        assert_eq!(loaded.messages.len(), 2);
    }

    /// A deleted session can no longer be loaded.
    #[tokio::test]
    async fn test_delete_session() {
        let (persistence, _temp) = create_test_persistence().await;

        let session = Session::new(SessionConfig::default());
        persistence.save(&session).await.unwrap();

        assert!(persistence.delete(&session.id).await.unwrap());
        assert!(persistence.load(&session.id).await.unwrap().is_none());
    }

    /// Listing without a tenant filter returns every saved session.
    #[tokio::test]
    async fn test_list_sessions() {
        let (persistence, _temp) = create_test_persistence().await;

        let s1 = Session::new(SessionConfig::default());
        let s2 = Session::new(SessionConfig::default());

        persistence.save(&s1).await.unwrap();
        persistence.save(&s2).await.unwrap();

        let list = persistence.list(None).await.unwrap();
        assert_eq!(list.len(), 2);
    }

    /// Listing with a tenant filter returns only that tenant's sessions.
    #[tokio::test]
    async fn test_tenant_filtering() {
        let (persistence, _temp) = create_test_persistence().await;

        let mut s1 = Session::new(SessionConfig::default());
        s1.tenant_id = Some("tenant-a".to_string());

        let mut s2 = Session::new(SessionConfig::default());
        s2.tenant_id = Some("tenant-b".to_string());

        persistence.save(&s1).await.unwrap();
        persistence.save(&s2).await.unwrap();

        let list = persistence.list(Some("tenant-a")).await.unwrap();
        assert_eq!(list.len(), 1);
        assert_eq!(list[0], s1.id);
    }

    /// Summaries added to a session are all returned by `get_summaries`.
    #[tokio::test]
    async fn test_summaries() {
        let (persistence, _temp) = create_test_persistence().await;

        let session = Session::new(SessionConfig::default());
        persistence.save(&session).await.unwrap();

        for text in ["Summary 1", "Summary 2"] {
            persistence
                .add_summary(SummarySnapshot::new(session.id, text))
                .await
                .unwrap();
        }

        let summaries = persistence.get_summaries(&session.id).await.unwrap();
        assert_eq!(summaries.len(), 2);
    }

    /// `dequeue` hands out the highest-priority item first.
    #[tokio::test]
    async fn test_queue_operations() {
        let (persistence, _temp) = create_test_persistence().await;

        let session = Session::new(SessionConfig::default());
        persistence.save(&session).await.unwrap();

        persistence
            .enqueue(&session.id, "Low priority".to_string(), 1)
            .await
            .unwrap();
        persistence
            .enqueue(&session.id, "High priority".to_string(), 10)
            .await
            .unwrap();

        let next = persistence.dequeue(&session.id).await.unwrap().unwrap();
        assert_eq!(next.content, "High priority");
    }

    /// Messages come back in conversation order after a save/load round trip.
    #[tokio::test]
    async fn test_dag_reconstruction() {
        let (persistence, _temp) = create_test_persistence().await;

        let mut session = Session::new(SessionConfig::default());
        session.add_message(SessionMessage::user(vec![ContentBlock::text("Q1")]));
        session.add_message(SessionMessage::assistant(vec![ContentBlock::text("A1")]));
        session.add_message(SessionMessage::user(vec![ContentBlock::text("Q2")]));
        session.add_message(SessionMessage::assistant(vec![ContentBlock::text("A2")]));

        persistence.save(&session).await.unwrap();

        let loaded = persistence.load(&session.id).await.unwrap().unwrap();

        let expected = ["Q1", "A1", "Q2", "A2"];
        assert_eq!(loaded.messages.len(), expected.len());
        for (position, (message, text)) in loaded.messages.iter().zip(expected).enumerate() {
            assert!(
                message.content.iter().any(|c| c.as_text() == Some(text)),
                "message {position} should contain {text:?}"
            );
        }
    }

    /// Unix-style project paths are encoded with `-` in place of `/`.
    // Nothing here is awaited, so a plain synchronous test is sufficient.
    #[test]
    fn test_project_path_encoding() {
        let config = JsonlConfig::default();

        assert_eq!(
            config.encode_project_path(Path::new("/home/user/project")),
            "-home-user-project"
        );
        assert_eq!(
            config.encode_project_path(Path::new("/Users/alice/work/app")),
            "-Users-alice-work-app"
        );
    }

    /// A message entry serializes with `"type":"user"` and parses back as `User`.
    #[test]
    fn test_jsonl_entry_serialization() {
        let msg = SessionMessage::user(vec![ContentBlock::text("Hello")]);
        let session_id = SessionId::new();
        let entry = JsonlEntry::from_message(&session_id, &msg);

        let json = serde_json::to_string(&entry).unwrap();
        assert!(json.contains("\"type\":\"user\""));

        let parsed: JsonlEntry = serde_json::from_str(&json).unwrap();
        assert!(matches!(parsed, JsonlEntry::User(_)));
    }

    /// Saving an unchanged session repeatedly does not duplicate its messages on disk.
    #[tokio::test]
    async fn test_no_duplicate_writes() {
        let (persistence, _temp) = create_test_persistence().await;

        let mut session = Session::new(SessionConfig::default());
        session.add_message(SessionMessage::user(vec![ContentBlock::text("Hello")]));
        persistence.save(&session).await.unwrap();
        persistence.save(&session).await.unwrap(); // Save same data twice
        persistence.save(&session).await.unwrap(); // And again

        // Check file has only one message entry + session meta
        let file_path = persistence.session_file_path(&session.id, None);
        let entries = read_entries_sync(&file_path).unwrap();
        let message_count = entries
            .iter()
            .filter(|e| e.message_uuid().is_some())
            .count();
        assert_eq!(message_count, 1, "Should not duplicate message entries");
    }

    /// Windows-style project paths encode the drive colon and the backslashes.
    #[test]
    fn test_windows_path_encoding() {
        let config = JsonlConfig::default();
        // Windows-style path
        assert_eq!(
            config.encode_project_path(Path::new("C:\\Users\\alice\\project")),
            "C_-Users-alice-project"
        );
    }
}