Skip to main content

claude_agent/session/
persistence_jsonl.rs

1//! JSONL-based persistence backend compatible with Claude Code CLI.
2//!
3//! This module provides file-based session persistence using the JSONL (JSON Lines) format,
4//! matching the Claude Code CLI's storage format for full interoperability.
5//!
6//! # Features
7//!
8//! - **Claude Code CLI Compatible**: Uses the same file structure and format as the official CLI
9//! - **DAG Structure**: Messages form a directed acyclic graph via parent_uuid references
10//! - **Incremental Writes**: Only new entries are appended, avoiding full file rewrites
11//! - **Project-Based Organization**: Sessions organized by encoded project paths
12//! - **Async I/O**: Non-blocking file operations via tokio
13//!
14//! # File Structure
15//!
16//! ```text
17//! ~/.claude/
18//! └── projects/
19//!     └── {encoded-project-path}/
20//!         ├── {session-uuid}.jsonl    # Conversation history
21//!         └── ...
22//! ```
23
24use std::collections::{HashMap, HashSet, VecDeque};
25use std::hash::{BuildHasher, Hash, Hasher};
26use std::io::{BufRead, BufReader, Write};
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29
30use chrono::{DateTime, Utc};
31use rust_decimal::Decimal;
32use serde::{Deserialize, Serialize};
33use tokio::sync::RwLock;
34use uuid::Uuid;
35
36use super::state::{MessageId, Session, SessionConfig, SessionId, SessionMessage, SessionType};
37use super::types::{
38    CompactRecord, EnvironmentContext, Plan, QueueItem, QueueOperation, QueueStatus,
39    SummarySnapshot, TodoItem,
40};
41use super::{Persistence, SessionError, SessionResult};
42use crate::types::{ContentBlock, Role, TokenUsage};
43
44// ============================================================================
45// Enum Serialization Helpers (consistent with persistence_postgres.rs)
46// ============================================================================
47
48/// Serialize an enum to its serde string representation for JSONL storage.
49fn enum_to_jsonl<T: serde::Serialize>(value: &T, default: &str) -> String {
50    serde_json::to_string(value)
51        .map(|s| s.trim_matches('"').to_string())
52        .unwrap_or_else(|_| default.to_string())
53}
54
55/// Deserialize an enum from its serde string representation in JSONL storage.
56fn jsonl_to_enum<T: serde::de::DeserializeOwned>(s: &str) -> Option<T> {
57    serde_json::from_str(&format!("\"{}\"", s)).ok()
58}
59
60// ============================================================================
61// Configuration
62// ============================================================================
63
64/// Sync mode for file operations.
65#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
66pub enum SyncMode {
67    /// No explicit sync (OS buffering only).
68    #[default]
69    None,
70    /// Sync after every write (safest, slowest).
71    OnWrite,
72}
73
74/// Configuration for JSONL persistence.
75#[derive(Clone, Debug)]
76pub struct JsonlConfig {
77    /// Base directory for storage (default: ~/.claude).
78    pub base_dir: PathBuf,
79    /// Log retention period in days (default: 30).
80    pub retention_days: u32,
81    /// File sync mode for durability.
82    pub sync_mode: SyncMode,
83}
84
85impl Default for JsonlConfig {
86    fn default() -> Self {
87        Self {
88            base_dir: crate::common::home_dir()
89                .unwrap_or_else(|| PathBuf::from("."))
90                .join(".claude"),
91            retention_days: 30,
92            sync_mode: SyncMode::default(),
93        }
94    }
95}
96
97impl JsonlConfig {
98    pub fn builder() -> JsonlConfigBuilder {
99        JsonlConfigBuilder::default()
100    }
101
102    fn projects_dir(&self) -> PathBuf {
103        self.base_dir.join("projects")
104    }
105
106    /// Encode a project path for use as a directory name using hex encoding.
107    /// Each byte of the path is encoded as two hex characters, producing collision-free names.
108    fn encode_project_path(&self, path: &Path) -> String {
109        path.as_os_str()
110            .as_encoded_bytes()
111            .iter()
112            .map(|b| format!("{:02x}", b))
113            .collect::<String>()
114    }
115
116    fn project_dir(&self, project_path: &Path) -> PathBuf {
117        self.projects_dir()
118            .join(self.encode_project_path(project_path))
119    }
120}
121
122/// Builder for JsonlConfig.
123#[derive(Default)]
124pub struct JsonlConfigBuilder {
125    base_dir: Option<PathBuf>,
126    retention_days: Option<u32>,
127    sync_mode: Option<SyncMode>,
128}
129
130impl JsonlConfigBuilder {
131    pub fn base_dir(mut self, path: impl Into<PathBuf>) -> Self {
132        self.base_dir = Some(path.into());
133        self
134    }
135
136    pub fn retention_days(mut self, days: u32) -> Self {
137        self.retention_days = Some(days);
138        self
139    }
140
141    pub fn sync_mode(mut self, mode: SyncMode) -> Self {
142        self.sync_mode = Some(mode);
143        self
144    }
145
146    pub fn build(self) -> JsonlConfig {
147        let default = JsonlConfig::default();
148        JsonlConfig {
149            base_dir: self.base_dir.unwrap_or(default.base_dir),
150            retention_days: self.retention_days.unwrap_or(default.retention_days),
151            sync_mode: self.sync_mode.unwrap_or(default.sync_mode),
152        }
153    }
154}
155
156// ============================================================================
157// JSONL Entry Types (Claude Code CLI Compatible)
158// ============================================================================
159
160/// JSONL entry types matching Claude Code CLI format.
161#[derive(Clone, Debug, Serialize, Deserialize)]
162#[serde(tag = "type", rename_all = "snake_case")]
163pub enum JsonlEntry {
164    User(UserEntry),
165    Assistant(AssistantEntry),
166    System(SystemEntry),
167    QueueOperation(QueueOperationEntry),
168    Summary(SummaryEntry),
169    SessionMeta(SessionMetaEntry),
170    Todo(TodoEntry),
171    Plan(PlanEntry),
172    Compact(CompactEntry),
173}
174
175/// Common fields shared by message entries.
176#[derive(Clone, Debug, Serialize, Deserialize)]
177pub struct EntryCommon {
178    pub uuid: String,
179    #[serde(rename = "parentUuid", skip_serializing_if = "Option::is_none")]
180    pub parent_uuid: Option<String>,
181    #[serde(rename = "sessionId")]
182    pub session_id: String,
183    pub timestamp: DateTime<Utc>,
184    #[serde(skip_serializing_if = "Option::is_none")]
185    pub cwd: Option<PathBuf>,
186    pub version: String,
187    #[serde(rename = "gitBranch", default)]
188    pub git_branch: String,
189    #[serde(rename = "isSidechain", default)]
190    pub is_sidechain: bool,
191}
192
193impl EntryCommon {
194    fn from_message(session_id: &SessionId, msg: &SessionMessage) -> Self {
195        Self {
196            uuid: msg.id.to_string(),
197            parent_uuid: msg.parent_id.as_ref().map(|id| id.to_string()),
198            session_id: session_id.to_string(),
199            timestamp: msg.timestamp,
200            cwd: msg.environment.as_ref().and_then(|e| e.cwd.clone()),
201            version: env!("CARGO_PKG_VERSION").to_string(),
202            git_branch: msg
203                .environment
204                .as_ref()
205                .and_then(|e| e.git_branch.clone())
206                .unwrap_or_default(),
207            is_sidechain: msg.is_sidechain,
208        }
209    }
210
211    fn to_environment(&self) -> EnvironmentContext {
212        EnvironmentContext {
213            cwd: self.cwd.clone(),
214            git_branch: if self.git_branch.is_empty() {
215                None
216            } else {
217                Some(self.git_branch.clone())
218            },
219            ..Default::default()
220        }
221    }
222}
223
224#[derive(Clone, Debug, Serialize, Deserialize)]
225pub struct UserMessageContent {
226    pub role: String,
227    pub content: serde_json::Value,
228}
229
230#[derive(Clone, Debug, Serialize, Deserialize)]
231pub struct UserEntry {
232    #[serde(flatten)]
233    pub common: EntryCommon,
234    pub message: UserMessageContent,
235    #[serde(rename = "isCompactSummary", default)]
236    pub is_compact_summary: bool,
237}
238
239#[derive(Clone, Debug, Serialize, Deserialize)]
240pub struct AssistantMessageContent {
241    #[serde(skip_serializing_if = "Option::is_none")]
242    pub id: Option<String>,
243    #[serde(skip_serializing_if = "Option::is_none")]
244    pub model: Option<String>,
245    pub role: String,
246    #[serde(skip_serializing_if = "Option::is_none")]
247    pub stop_reason: Option<String>,
248    pub content: serde_json::Value,
249    #[serde(skip_serializing_if = "Option::is_none")]
250    pub usage: Option<UsageInfo>,
251}
252
253#[derive(Clone, Debug, Default, Serialize, Deserialize)]
254pub struct UsageInfo {
255    pub input_tokens: u64,
256    pub output_tokens: u64,
257    #[serde(default)]
258    pub cache_creation_input_tokens: u64,
259    #[serde(default)]
260    pub cache_read_input_tokens: u64,
261}
262
263impl From<&TokenUsage> for UsageInfo {
264    fn from(u: &TokenUsage) -> Self {
265        Self {
266            input_tokens: u.input_tokens,
267            output_tokens: u.output_tokens,
268            cache_creation_input_tokens: u.cache_creation_input_tokens,
269            cache_read_input_tokens: u.cache_read_input_tokens,
270        }
271    }
272}
273
274impl From<&UsageInfo> for TokenUsage {
275    fn from(u: &UsageInfo) -> Self {
276        Self {
277            input_tokens: u.input_tokens,
278            output_tokens: u.output_tokens,
279            cache_creation_input_tokens: u.cache_creation_input_tokens,
280            cache_read_input_tokens: u.cache_read_input_tokens,
281        }
282    }
283}
284
285#[derive(Clone, Debug, Serialize, Deserialize)]
286pub struct AssistantEntry {
287    #[serde(flatten)]
288    pub common: EntryCommon,
289    pub message: AssistantMessageContent,
290    #[serde(rename = "requestId", skip_serializing_if = "Option::is_none")]
291    pub request_id: Option<String>,
292}
293
294#[derive(Clone, Debug, Serialize, Deserialize)]
295pub struct SystemEntry {
296    #[serde(flatten)]
297    pub common: EntryCommon,
298    pub subtype: String,
299    #[serde(skip_serializing_if = "Option::is_none")]
300    pub content: Option<String>,
301}
302
303#[derive(Clone, Debug, Serialize, Deserialize)]
304pub struct QueueOperationEntry {
305    pub operation: String,
306    #[serde(rename = "sessionId")]
307    pub session_id: String,
308    pub timestamp: DateTime<Utc>,
309    pub content: String,
310    pub priority: i32,
311    #[serde(rename = "itemId")]
312    pub item_id: String,
313}
314
315#[derive(Clone, Debug, Serialize, Deserialize)]
316pub struct SummaryEntry {
317    #[serde(rename = "sessionId")]
318    pub session_id: String,
319    pub summary: String,
320    #[serde(rename = "leafUuid", skip_serializing_if = "Option::is_none")]
321    pub leaf_uuid: Option<String>,
322    pub timestamp: DateTime<Utc>,
323}
324
325#[derive(Clone, Debug, Serialize, Deserialize)]
326pub struct SessionMetaEntry {
327    #[serde(rename = "sessionId")]
328    pub session_id: String,
329    #[serde(rename = "parentSessionId", skip_serializing_if = "Option::is_none")]
330    pub parent_session_id: Option<String>,
331    #[serde(rename = "tenantId", skip_serializing_if = "Option::is_none")]
332    pub tenant_id: Option<String>,
333    #[serde(rename = "sessionType")]
334    pub session_type: serde_json::Value,
335    pub mode: String,
336    pub state: String,
337    pub config: serde_json::Value,
338    #[serde(rename = "permissionPolicy")]
339    pub permission_policy: serde_json::Value,
340    #[serde(rename = "totalUsage", default)]
341    pub total_usage: UsageInfo,
342    #[serde(rename = "totalCostUsd", default)]
343    pub total_cost_usd: Decimal,
344    #[serde(rename = "staticContextHash", skip_serializing_if = "Option::is_none")]
345    pub static_context_hash: Option<String>,
346    #[serde(skip_serializing_if = "Option::is_none")]
347    pub error: Option<String>,
348    #[serde(rename = "createdAt")]
349    pub created_at: DateTime<Utc>,
350    #[serde(rename = "updatedAt")]
351    pub updated_at: DateTime<Utc>,
352    #[serde(rename = "expiresAt", skip_serializing_if = "Option::is_none")]
353    pub expires_at: Option<DateTime<Utc>>,
354}
355
356#[derive(Clone, Debug, Serialize, Deserialize)]
357pub struct TodoEntry {
358    pub id: String,
359    #[serde(rename = "sessionId")]
360    pub session_id: String,
361    pub content: String,
362    #[serde(rename = "activeForm")]
363    pub active_form: String,
364    pub status: String,
365    #[serde(rename = "planId", skip_serializing_if = "Option::is_none")]
366    pub plan_id: Option<String>,
367    #[serde(rename = "createdAt")]
368    pub created_at: DateTime<Utc>,
369    #[serde(rename = "startedAt", skip_serializing_if = "Option::is_none")]
370    pub started_at: Option<DateTime<Utc>>,
371    #[serde(rename = "completedAt", skip_serializing_if = "Option::is_none")]
372    pub completed_at: Option<DateTime<Utc>>,
373}
374
375#[derive(Clone, Debug, Serialize, Deserialize)]
376pub struct PlanEntry {
377    pub id: String,
378    #[serde(rename = "sessionId")]
379    pub session_id: String,
380    #[serde(skip_serializing_if = "Option::is_none")]
381    pub name: Option<String>,
382    pub content: String,
383    pub status: String,
384    #[serde(skip_serializing_if = "Option::is_none")]
385    pub error: Option<String>,
386    #[serde(rename = "createdAt")]
387    pub created_at: DateTime<Utc>,
388    #[serde(rename = "approvedAt", skip_serializing_if = "Option::is_none")]
389    pub approved_at: Option<DateTime<Utc>>,
390    #[serde(rename = "startedAt", skip_serializing_if = "Option::is_none")]
391    pub started_at: Option<DateTime<Utc>>,
392    #[serde(rename = "completedAt", skip_serializing_if = "Option::is_none")]
393    pub completed_at: Option<DateTime<Utc>>,
394}
395
396#[derive(Clone, Debug, Serialize, Deserialize)]
397pub struct CompactEntry {
398    pub id: String,
399    #[serde(rename = "sessionId")]
400    pub session_id: String,
401    pub trigger: String,
402    #[serde(rename = "preTokens")]
403    pub pre_tokens: usize,
404    #[serde(rename = "postTokens")]
405    pub post_tokens: usize,
406    #[serde(rename = "savedTokens")]
407    pub saved_tokens: usize,
408    pub summary: String,
409    #[serde(rename = "originalCount")]
410    pub original_count: usize,
411    #[serde(rename = "newCount")]
412    pub new_count: usize,
413    #[serde(rename = "logicalParentId", skip_serializing_if = "Option::is_none")]
414    pub logical_parent_id: Option<String>,
415    #[serde(rename = "createdAt")]
416    pub created_at: DateTime<Utc>,
417}
418
419// ============================================================================
420// Conversion: SessionMessage <-> JsonlEntry
421// ============================================================================
422
423impl JsonlEntry {
424    fn from_message(session_id: &SessionId, msg: &SessionMessage) -> Self {
425        let common = EntryCommon::from_message(session_id, msg);
426
427        let serialize_content = |content: &[ContentBlock]| -> serde_json::Value {
428            serde_json::to_value(content).unwrap_or_else(|e| {
429                tracing::warn!(
430                    session_id = %session_id,
431                    error = %e,
432                    "Failed to serialize message content, using empty array"
433                );
434                serde_json::Value::Array(Vec::new())
435            })
436        };
437
438        match msg.role {
439            Role::User => JsonlEntry::User(UserEntry {
440                common,
441                message: UserMessageContent {
442                    role: "user".to_string(),
443                    content: serialize_content(&msg.content),
444                },
445                is_compact_summary: msg.is_compact_summary,
446            }),
447            Role::Assistant => JsonlEntry::Assistant(AssistantEntry {
448                common,
449                message: AssistantMessageContent {
450                    id: msg.metadata.request_id.clone(),
451                    model: msg.metadata.model.clone(),
452                    role: "assistant".to_string(),
453                    stop_reason: None,
454                    content: serialize_content(&msg.content),
455                    usage: msg.usage.as_ref().map(UsageInfo::from),
456                },
457                request_id: msg.metadata.request_id.clone(),
458            }),
459        }
460    }
461
462    fn to_session_message(&self) -> Option<SessionMessage> {
463        match self {
464            JsonlEntry::User(entry) => {
465                let content: Vec<ContentBlock> =
466                    serde_json::from_value(entry.message.content.clone()).unwrap_or_else(|e| {
467                        tracing::warn!(
468                            uuid = %entry.common.uuid,
469                            error = %e,
470                            "Failed to deserialize user message content"
471                        );
472                        Vec::new()
473                    });
474                let mut msg = SessionMessage::user(content);
475                msg.id = MessageId::from_string(&entry.common.uuid);
476                msg.parent_id = entry
477                    .common
478                    .parent_uuid
479                    .as_ref()
480                    .map(MessageId::from_string);
481                msg.timestamp = entry.common.timestamp;
482                msg.is_sidechain = entry.common.is_sidechain;
483                msg.is_compact_summary = entry.is_compact_summary;
484                msg.environment = Some(entry.common.to_environment());
485                Some(msg)
486            }
487            JsonlEntry::Assistant(entry) => {
488                let content: Vec<ContentBlock> =
489                    serde_json::from_value(entry.message.content.clone()).unwrap_or_else(|e| {
490                        tracing::warn!(
491                            uuid = %entry.common.uuid,
492                            error = %e,
493                            "Failed to deserialize assistant message content"
494                        );
495                        Vec::new()
496                    });
497                let mut msg = SessionMessage::assistant(content);
498                msg.id = MessageId::from_string(&entry.common.uuid);
499                msg.parent_id = entry
500                    .common
501                    .parent_uuid
502                    .as_ref()
503                    .map(MessageId::from_string);
504                msg.timestamp = entry.common.timestamp;
505                msg.is_sidechain = entry.common.is_sidechain;
506                msg.usage = entry.message.usage.as_ref().map(TokenUsage::from);
507                msg.metadata.model.clone_from(&entry.message.model);
508                msg.metadata.request_id.clone_from(&entry.request_id);
509                msg.environment = Some(entry.common.to_environment());
510                Some(msg)
511            }
512            _ => None,
513        }
514    }
515
516    #[cfg(test)]
517    fn message_uuid(&self) -> Option<&str> {
518        match self {
519            JsonlEntry::User(e) => Some(&e.common.uuid),
520            JsonlEntry::Assistant(e) => Some(&e.common.uuid),
521            _ => None,
522        }
523    }
524}
525
526// ============================================================================
527// Session Index
528// ============================================================================
529
530#[derive(Clone, Debug)]
531struct SessionMeta {
532    path: PathBuf,
533    project_path: Option<PathBuf>,
534    tenant_id: Option<String>,
535    updated_at: DateTime<Utc>,
536    expires_at: Option<DateTime<Utc>>,
537    persisted_ids: HashSet<String>,
538    todos_hash: u64,
539    plan_hash: u64,
540}
541
542#[derive(Default)]
543struct SessionIndex {
544    sessions: HashMap<SessionId, SessionMeta>,
545    by_project: HashMap<PathBuf, Vec<SessionId>>,
546    by_tenant: HashMap<String, Vec<SessionId>>,
547}
548
549impl SessionIndex {
550    fn insert(&mut self, session_id: SessionId, meta: SessionMeta) {
551        // Remove old entries if updating
552        self.remove(&session_id);
553
554        if let Some(ref project) = meta.project_path {
555            self.by_project
556                .entry(project.clone())
557                .or_default()
558                .push(session_id);
559        }
560        if let Some(ref tenant) = meta.tenant_id {
561            self.by_tenant
562                .entry(tenant.clone())
563                .or_default()
564                .push(session_id);
565        }
566        self.sessions.insert(session_id, meta);
567    }
568
569    fn remove(&mut self, session_id: &SessionId) -> Option<SessionMeta> {
570        let meta = self.sessions.remove(session_id)?;
571
572        if let Some(ref project) = meta.project_path
573            && let Some(ids) = self.by_project.get_mut(project)
574        {
575            ids.retain(|id| id != session_id);
576        }
577        if let Some(ref tenant) = meta.tenant_id
578            && let Some(ids) = self.by_tenant.get_mut(tenant)
579        {
580            ids.retain(|id| id != session_id);
581        }
582        Some(meta)
583    }
584}
585
586// ============================================================================
587// File Operations (blocking, run via spawn_blocking)
588// ============================================================================
589
590fn read_entries_sync(path: &Path) -> SessionResult<Vec<JsonlEntry>> {
591    if !path.exists() {
592        return Ok(Vec::new());
593    }
594
595    let file = std::fs::File::open(path).map_err(|e| SessionError::Storage {
596        message: format!("Failed to open {}: {}", path.display(), e),
597    })?;
598
599    let reader = BufReader::with_capacity(64 * 1024, file);
600    let mut entries = Vec::with_capacity(128);
601
602    for (line_num, line) in reader.lines().enumerate() {
603        let line = line.map_err(|e| SessionError::Storage {
604            message: format!("Read error at line {}: {}", line_num + 1, e),
605        })?;
606
607        if line.trim().is_empty() {
608            continue;
609        }
610
611        match serde_json::from_str::<JsonlEntry>(&line) {
612            Ok(entry) => entries.push(entry),
613            Err(e) => {
614                tracing::warn!(
615                    path = %path.display(),
616                    line = line_num + 1,
617                    error = %e,
618                    "Skipping malformed JSONL entry"
619                );
620            }
621        }
622    }
623
624    Ok(entries)
625}
626
627fn append_entries_sync(path: &Path, entries: &[JsonlEntry], sync: bool) -> SessionResult<()> {
628    if entries.is_empty() {
629        return Ok(());
630    }
631
632    if let Some(parent) = path.parent() {
633        std::fs::create_dir_all(parent).map_err(|e| SessionError::Storage {
634            message: format!("Failed to create directory {}: {}", parent.display(), e),
635        })?;
636    }
637
638    let file = std::fs::OpenOptions::new()
639        .create(true)
640        .append(true)
641        .open(path)
642        .map_err(|e| SessionError::Storage {
643            message: format!("Failed to open {} for writing: {}", path.display(), e),
644        })?;
645
646    let mut writer = std::io::BufWriter::with_capacity(64 * 1024, file);
647
648    for entry in entries {
649        serde_json::to_writer(&mut writer, entry)?;
650        writeln!(writer).map_err(|e| SessionError::Storage {
651            message: format!("Write failed: {}", e),
652        })?;
653    }
654
655    writer.flush().map_err(|e| SessionError::Storage {
656        message: format!("Flush failed: {}", e),
657    })?;
658
659    if sync {
660        writer
661            .into_inner()
662            .map_err(|e| SessionError::Storage {
663                message: format!("Buffer error: {}", e.error()),
664            })?
665            .sync_all()
666            .map_err(|e| SessionError::Storage {
667                message: format!("Sync failed: {}", e),
668            })?;
669    }
670
671    Ok(())
672}
673
674// ============================================================================
675// JSONL Persistence Implementation
676// ============================================================================
677
678pub struct JsonlPersistence {
679    config: JsonlConfig,
680    index: Arc<RwLock<SessionIndex>>,
681    summaries: Arc<RwLock<HashMap<SessionId, Vec<SummarySnapshot>>>>,
682    queue: Arc<RwLock<HashMap<SessionId, Vec<QueueItem>>>>,
683}
684
685impl JsonlPersistence {
686    pub async fn new(config: JsonlConfig) -> SessionResult<Self> {
687        tokio::fs::create_dir_all(config.projects_dir())
688            .await
689            .map_err(|e| SessionError::Storage {
690                message: format!("Failed to create projects directory: {}", e),
691            })?;
692
693        let persistence = Self {
694            config,
695            index: Arc::new(RwLock::new(SessionIndex::default())),
696            summaries: Arc::new(RwLock::new(HashMap::new())),
697            queue: Arc::new(RwLock::new(HashMap::new())),
698        };
699
700        persistence.rebuild_index().await?;
701        Ok(persistence)
702    }
703
704    pub async fn default_config() -> SessionResult<Self> {
705        Self::new(JsonlConfig::default()).await
706    }
707
708    async fn rebuild_index(&self) -> SessionResult<()> {
709        let projects_dir = self.config.projects_dir();
710        if !projects_dir.exists() {
711            return Ok(());
712        }
713
714        let mut index = self.index.write().await;
715        let mut summaries = self.summaries.write().await;
716        let mut queue = self.queue.write().await;
717
718        let mut entries =
719            tokio::fs::read_dir(&projects_dir)
720                .await
721                .map_err(|e| SessionError::Storage {
722                    message: format!("Failed to read projects dir: {}", e),
723                })?;
724
725        while let Some(project_entry) =
726            entries
727                .next_entry()
728                .await
729                .map_err(|e| SessionError::Storage {
730                    message: format!("Failed to read entry: {}", e),
731                })?
732        {
733            let file_type = project_entry.file_type().await.ok();
734            if !file_type.map(|t| t.is_dir()).unwrap_or(false) {
735                continue;
736            }
737
738            let project_path = project_entry.path();
739            let mut files =
740                tokio::fs::read_dir(&project_path)
741                    .await
742                    .map_err(|e| SessionError::Storage {
743                        message: format!("Failed to read project dir: {}", e),
744                    })?;
745
746            while let Some(file_entry) =
747                files
748                    .next_entry()
749                    .await
750                    .map_err(|e| SessionError::Storage {
751                        message: format!("Failed to read file entry: {}", e),
752                    })?
753            {
754                let file_path = file_entry.path();
755                if file_path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
756                    continue;
757                }
758
759                let session_id = match file_path
760                    .file_stem()
761                    .and_then(|s| s.to_str())
762                    .and_then(SessionId::parse)
763                {
764                    Some(id) => id,
765                    None => continue,
766                };
767
768                // Read file in blocking context
769                let path_clone = file_path.clone();
770                let parsed = tokio::task::spawn_blocking(move || read_entries_sync(&path_clone))
771                    .await
772                    .map_err(|e| SessionError::Storage {
773                        message: format!("Task join error: {}", e),
774                    })??;
775
776                let (meta, session_summaries, session_queue) =
777                    Self::parse_file_metadata(session_id, file_path, &parsed);
778
779                index.insert(session_id, meta);
780                if !session_summaries.is_empty() {
781                    summaries.insert(session_id, session_summaries);
782                }
783                if !session_queue.is_empty() {
784                    queue.insert(session_id, session_queue);
785                }
786            }
787        }
788
789        Ok(())
790    }
791
792    fn parse_file_metadata(
793        session_id: SessionId,
794        path: PathBuf,
795        entries: &[JsonlEntry],
796    ) -> (SessionMeta, Vec<SummarySnapshot>, Vec<QueueItem>) {
797        let mut project_path: Option<PathBuf> = None;
798        let mut tenant_id: Option<String> = None;
799        let mut updated_at = Utc::now();
800        let mut expires_at: Option<DateTime<Utc>> = None;
801        let mut summaries = Vec::new();
802        let mut queue_items: HashMap<String, QueueItem> = HashMap::new();
803        let mut persisted_ids = HashSet::with_capacity(entries.len());
804
805        for entry in entries {
806            match entry {
807                JsonlEntry::User(e) => {
808                    if project_path.is_none() {
809                        project_path = e.common.cwd.clone();
810                    }
811                    updated_at = e.common.timestamp;
812                    persisted_ids.insert(e.common.uuid.clone());
813                }
814                JsonlEntry::Assistant(e) => {
815                    if project_path.is_none() {
816                        project_path = e.common.cwd.clone();
817                    }
818                    updated_at = e.common.timestamp;
819                    persisted_ids.insert(e.common.uuid.clone());
820                }
821                JsonlEntry::SessionMeta(m) => {
822                    tenant_id.clone_from(&m.tenant_id);
823                    updated_at = m.updated_at;
824                    expires_at = m.expires_at;
825                }
826                JsonlEntry::Summary(s) => {
827                    summaries.push(SummarySnapshot {
828                        id: Uuid::new_v4(),
829                        session_id,
830                        summary: s.summary.clone(),
831                        leaf_message_id: s.leaf_uuid.as_ref().map(MessageId::from_string),
832                        created_at: s.timestamp,
833                    });
834                }
835                JsonlEntry::QueueOperation(q) => {
836                    let item_id = match Uuid::parse_str(&q.item_id) {
837                        Ok(id) => id,
838                        Err(_) => continue,
839                    };
840                    match q.operation.as_str() {
841                        "enqueue" => {
842                            queue_items.insert(
843                                q.item_id.clone(),
844                                QueueItem {
845                                    id: item_id,
846                                    session_id,
847                                    operation: QueueOperation::Enqueue,
848                                    content: q.content.clone(),
849                                    priority: q.priority,
850                                    status: QueueStatus::Pending,
851                                    created_at: q.timestamp,
852                                    processed_at: None,
853                                },
854                            );
855                        }
856                        "dequeue" => {
857                            if let Some(item) = queue_items.get_mut(&q.item_id) {
858                                item.status = QueueStatus::Processing;
859                                item.processed_at = Some(q.timestamp);
860                            }
861                        }
862                        "cancel" => {
863                            if let Some(item) = queue_items.get_mut(&q.item_id) {
864                                item.status = QueueStatus::Cancelled;
865                                item.processed_at = Some(q.timestamp);
866                            }
867                        }
868                        _ => {}
869                    }
870                }
871                _ => {}
872            }
873        }
874
875        let queue: Vec<QueueItem> = queue_items.into_values().collect();
876
877        (
878            SessionMeta {
879                path,
880                project_path,
881                tenant_id,
882                updated_at,
883                expires_at,
884                persisted_ids,
885                todos_hash: 0,
886                plan_hash: 0,
887            },
888            summaries,
889            queue,
890        )
891    }
892
893    fn session_file_path(&self, session_id: &SessionId, project_path: Option<&Path>) -> PathBuf {
894        let dir = match project_path {
895            Some(p) => self.config.project_dir(p),
896            None => self.config.projects_dir().join("_default"),
897        };
898        dir.join(format!("{}.jsonl", session_id))
899    }
900
901    fn get_project_path(session: &Session) -> Option<PathBuf> {
902        session
903            .messages
904            .first()
905            .and_then(|m| m.environment.as_ref())
906            .and_then(|e| e.cwd.clone())
907    }
908
909    /// Compute a stable hash of todos for change detection.
910    /// Uses ahash with fixed seeds for deterministic cross-version hashing.
911    fn compute_todos_hash(todos: &[TodoItem]) -> u64 {
912        let hasher_state = ahash::RandomState::with_seeds(1234, 5678, 9012, 3456);
913        let mut hasher = hasher_state.build_hasher();
914        for todo in todos {
915            todo.id.hash(&mut hasher);
916            enum_to_jsonl(&todo.status, "pending").hash(&mut hasher);
917            todo.content.hash(&mut hasher);
918        }
919        hasher.finish()
920    }
921
922    /// Compute a stable hash of plan for change detection.
923    /// Uses ahash with fixed seeds for deterministic cross-version hashing.
924    fn compute_plan_hash(plan: Option<&Plan>) -> u64 {
925        let hasher_state = ahash::RandomState::with_seeds(1234, 5678, 9012, 3456);
926        let mut hasher = hasher_state.build_hasher();
927        if let Some(p) = plan {
928            p.id.hash(&mut hasher);
929            enum_to_jsonl(&p.status, "draft").hash(&mut hasher);
930            p.content.hash(&mut hasher);
931        }
932        hasher.finish()
933    }
934
935    fn session_to_meta_entry(session: &Session) -> JsonlEntry {
936        let warn_serialize = |field: &str, e: serde_json::Error| -> serde_json::Value {
937            tracing::warn!(
938                session_id = %session.id,
939                field,
940                error = %e,
941                "Failed to serialize session field"
942            );
943            serde_json::Value::Object(Default::default())
944        };
945
946        JsonlEntry::SessionMeta(SessionMetaEntry {
947            session_id: session.id.to_string(),
948            parent_session_id: session.parent_id.map(|p| p.to_string()),
949            tenant_id: session.tenant_id.clone(),
950            session_type: serde_json::to_value(&session.session_type)
951                .unwrap_or_else(|e| warn_serialize("session_type", e)),
952            mode: "stateless".to_string(),
953            state: enum_to_jsonl(&session.state, "created"),
954            config: serde_json::to_value(&session.config)
955                .unwrap_or_else(|e| warn_serialize("config", e)),
956            permission_policy: serde_json::to_value(&session.permissions)
957                .unwrap_or_else(|e| warn_serialize("permissions", e)),
958            total_usage: UsageInfo::from(&session.total_usage),
959            total_cost_usd: session.total_cost_usd,
960            static_context_hash: session.static_context_hash.clone(),
961            error: session.error.clone(),
962            created_at: session.created_at,
963            updated_at: session.updated_at,
964            expires_at: session.expires_at,
965        })
966    }
967
968    fn reconstruct_session(session_id: SessionId, entries: Vec<JsonlEntry>) -> Session {
969        let mut session = Session::new(SessionConfig::default());
970        session.id = session_id;
971
972        let mut messages: HashMap<String, SessionMessage> = HashMap::with_capacity(entries.len());
973        let mut todos_map: HashMap<String, TodoItem> = HashMap::new();
974        let mut latest_plan: Option<Plan> = None;
975        let mut compacts: Vec<CompactRecord> = Vec::new();
976
977        for entry in entries {
978            match entry {
979                JsonlEntry::User(_) | JsonlEntry::Assistant(_) => {
980                    if let Some(msg) = entry.to_session_message() {
981                        messages.insert(msg.id.to_string(), msg);
982                    }
983                }
984                JsonlEntry::SessionMeta(m) => {
985                    session.tenant_id = m.tenant_id;
986                    session.parent_id = m
987                        .parent_session_id
988                        .as_ref()
989                        .and_then(|s| SessionId::parse(s));
990                    session.session_type =
991                        serde_json::from_value(m.session_type).unwrap_or(SessionType::Main);
992                    // m.mode is ignored; SessionMode was removed (always stateless)
993                    session.state = jsonl_to_enum(&m.state).unwrap_or_default();
994                    session.config = serde_json::from_value(m.config).unwrap_or_else(|e| {
995                        tracing::warn!(
996                            session_id = %session_id,
997                            error = %e,
998                            "Failed to deserialize session config, using default"
999                        );
1000                        Default::default()
1001                    });
1002                    session.permissions = serde_json::from_value(m.permission_policy)
1003                        .unwrap_or_else(|e| {
1004                            tracing::warn!(
1005                                session_id = %session_id,
1006                                error = %e,
1007                                "Failed to deserialize session permissions, using default"
1008                            );
1009                            Default::default()
1010                        });
1011                    session.total_usage = TokenUsage::from(&m.total_usage);
1012                    session.total_cost_usd = m.total_cost_usd;
1013                    session.static_context_hash = m.static_context_hash;
1014                    session.error = m.error;
1015                    session.created_at = m.created_at;
1016                    session.updated_at = m.updated_at;
1017                    session.expires_at = m.expires_at;
1018                }
1019                JsonlEntry::Summary(s) => {
1020                    session.summary = Some(s.summary);
1021                }
1022                JsonlEntry::Todo(t) => {
1023                    let todo = TodoItem {
1024                        id: Uuid::parse_str(&t.id).unwrap_or_else(|_| Uuid::new_v4()),
1025                        session_id,
1026                        content: t.content,
1027                        active_form: t.active_form,
1028                        status: jsonl_to_enum(&t.status).unwrap_or_default(),
1029                        plan_id: t.plan_id.and_then(|s| Uuid::parse_str(&s).ok()),
1030                        created_at: t.created_at,
1031                        started_at: t.started_at,
1032                        completed_at: t.completed_at,
1033                    };
1034                    // Use map to get latest version of each todo
1035                    todos_map.insert(t.id, todo);
1036                }
1037                JsonlEntry::Plan(p) => {
1038                    let plan = Plan {
1039                        id: Uuid::parse_str(&p.id).unwrap_or_else(|_| Uuid::new_v4()),
1040                        session_id,
1041                        name: p.name,
1042                        content: p.content,
1043                        status: jsonl_to_enum(&p.status).unwrap_or_default(),
1044                        error: p.error,
1045                        created_at: p.created_at,
1046                        approved_at: p.approved_at,
1047                        started_at: p.started_at,
1048                        completed_at: p.completed_at,
1049                    };
1050                    // Keep the latest plan entry
1051                    latest_plan = Some(plan);
1052                }
1053                JsonlEntry::Compact(c) => {
1054                    compacts.push(CompactRecord {
1055                        id: Uuid::parse_str(&c.id).unwrap_or_else(|_| Uuid::new_v4()),
1056                        session_id,
1057                        trigger: jsonl_to_enum(&c.trigger).unwrap_or_default(),
1058                        pre_tokens: c.pre_tokens,
1059                        post_tokens: c.post_tokens,
1060                        saved_tokens: c.saved_tokens,
1061                        summary: c.summary,
1062                        original_count: c.original_count,
1063                        new_count: c.new_count,
1064                        logical_parent_id: c.logical_parent_id.as_ref().map(MessageId::from_string),
1065                        created_at: c.created_at,
1066                    });
1067                }
1068                _ => {}
1069            }
1070        }
1071
1072        // Topological sort preserving order
1073        let ordered = Self::topological_sort(&messages);
1074        session.messages = Vec::with_capacity(ordered.len());
1075        for msg in ordered {
1076            session.add_message(msg);
1077        }
1078
1079        // Restore todos, plan, and compacts
1080        session.todos = todos_map.into_values().collect();
1081        session
1082            .todos
1083            .sort_by(|a, b| a.created_at.cmp(&b.created_at));
1084        session.current_plan = latest_plan;
1085        session.compact_history = VecDeque::from(compacts);
1086
1087        session
1088    }
1089
1090    fn topological_sort(messages: &HashMap<String, SessionMessage>) -> Vec<SessionMessage> {
1091        if messages.is_empty() {
1092            return Vec::new();
1093        }
1094
1095        // Build parent -> children mapping, sorted by timestamp to preserve order
1096        let mut children: HashMap<Option<String>, Vec<&SessionMessage>> = HashMap::new();
1097        for msg in messages.values() {
1098            children
1099                .entry(msg.parent_id.as_ref().map(|p| p.to_string()))
1100                .or_default()
1101                .push(msg);
1102        }
1103
1104        // Sort each group by timestamp
1105        for group in children.values_mut() {
1106            group.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
1107        }
1108
1109        // BFS traversal using VecDeque for FIFO order
1110        let mut result = Vec::with_capacity(messages.len());
1111        let mut queue = std::collections::VecDeque::new();
1112
1113        // Start with root messages (no parent)
1114        if let Some(roots) = children.remove(&None) {
1115            queue.extend(roots);
1116        }
1117
1118        while let Some(msg) = queue.pop_front() {
1119            let id = msg.id.to_string();
1120            result.push(msg.clone());
1121            if let Some(child_msgs) = children.remove(&Some(id)) {
1122                queue.extend(child_msgs);
1123            }
1124        }
1125
1126        result
1127    }
1128}
1129
1130#[async_trait::async_trait]
1131impl Persistence for JsonlPersistence {
1132    fn name(&self) -> &str {
1133        "jsonl"
1134    }
1135
1136    async fn save(&self, session: &Session) -> SessionResult<()> {
1137        let project_path = Self::get_project_path(session);
1138        let file_path = self.session_file_path(&session.id, project_path.as_deref());
1139
1140        // Get persisted state from index (avoid re-reading file)
1141        let (persisted_ids, prev_todos_hash, prev_plan_hash) = {
1142            let index = self.index.read().await;
1143            match index.sessions.get(&session.id) {
1144                Some(m) => (m.persisted_ids.clone(), m.todos_hash, m.plan_hash),
1145                None => (HashSet::new(), 0, 0),
1146            }
1147        };
1148
1149        let mut new_entries = Vec::new();
1150        let mut new_ids = HashSet::new();
1151
1152        new_entries.push(Self::session_to_meta_entry(session));
1153
1154        // Only new messages (incremental)
1155        for msg in &session.messages {
1156            let id = msg.id.to_string();
1157            if !persisted_ids.contains(&id) {
1158                new_entries.push(JsonlEntry::from_message(&session.id, msg));
1159                new_ids.insert(id);
1160            }
1161        }
1162
1163        // Compute current hashes
1164        let current_todos_hash = Self::compute_todos_hash(&session.todos);
1165        let current_plan_hash = Self::compute_plan_hash(session.current_plan.as_ref());
1166
1167        // Persist todos only if changed (including when cleared)
1168        if current_todos_hash != prev_todos_hash {
1169            for todo in &session.todos {
1170                new_entries.push(JsonlEntry::Todo(TodoEntry {
1171                    id: todo.id.to_string(),
1172                    session_id: session.id.to_string(),
1173                    content: todo.content.clone(),
1174                    active_form: todo.active_form.clone(),
1175                    status: enum_to_jsonl(&todo.status, "pending"),
1176                    plan_id: todo.plan_id.map(|id| id.to_string()),
1177                    created_at: todo.created_at,
1178                    started_at: todo.started_at,
1179                    completed_at: todo.completed_at,
1180                }));
1181            }
1182        }
1183
1184        // Persist plan only if changed
1185        if current_plan_hash != prev_plan_hash
1186            && let Some(ref plan) = session.current_plan
1187        {
1188            new_entries.push(JsonlEntry::Plan(PlanEntry {
1189                id: plan.id.to_string(),
1190                session_id: session.id.to_string(),
1191                name: plan.name.clone(),
1192                content: plan.content.clone(),
1193                status: enum_to_jsonl(&plan.status, "draft"),
1194                error: plan.error.clone(),
1195                created_at: plan.created_at,
1196                approved_at: plan.approved_at,
1197                started_at: plan.started_at,
1198                completed_at: plan.completed_at,
1199            }));
1200        }
1201
1202        // Persist compact history (incremental by ID)
1203        for compact in &session.compact_history {
1204            let compact_id = format!("compact:{}", compact.id);
1205            if !persisted_ids.contains(&compact_id) {
1206                new_entries.push(JsonlEntry::Compact(CompactEntry {
1207                    id: compact.id.to_string(),
1208                    session_id: session.id.to_string(),
1209                    trigger: enum_to_jsonl(&compact.trigger, "manual"),
1210                    pre_tokens: compact.pre_tokens,
1211                    post_tokens: compact.post_tokens,
1212                    saved_tokens: compact.saved_tokens,
1213                    summary: compact.summary.clone(),
1214                    original_count: compact.original_count,
1215                    new_count: compact.new_count,
1216                    logical_parent_id: compact.logical_parent_id.as_ref().map(|id| id.to_string()),
1217                    created_at: compact.created_at,
1218                }));
1219                new_ids.insert(compact_id);
1220            }
1221        }
1222
1223        if new_entries.is_empty() {
1224            return Ok(());
1225        }
1226
1227        // Write in blocking context
1228        let path_clone = file_path.clone();
1229        let sync = self.config.sync_mode == SyncMode::OnWrite;
1230        tokio::task::spawn_blocking(move || append_entries_sync(&path_clone, &new_entries, sync))
1231            .await
1232            .map_err(|e| SessionError::Storage {
1233                message: format!("Task join error: {}", e),
1234            })??;
1235
1236        let mut index = self.index.write().await;
1237        let mut persisted = persisted_ids;
1238        persisted.extend(new_ids);
1239
1240        index.insert(
1241            session.id,
1242            SessionMeta {
1243                path: file_path,
1244                project_path,
1245                tenant_id: session.tenant_id.clone(),
1246                updated_at: session.updated_at,
1247                expires_at: session.expires_at,
1248                persisted_ids: persisted,
1249                todos_hash: current_todos_hash,
1250                plan_hash: current_plan_hash,
1251            },
1252        );
1253
1254        Ok(())
1255    }
1256
1257    async fn load(&self, id: &SessionId) -> SessionResult<Option<Session>> {
1258        let path = {
1259            let index = self.index.read().await;
1260            match index.sessions.get(id) {
1261                Some(m) => m.path.clone(),
1262                None => return Ok(None),
1263            }
1264        };
1265
1266        let entries = tokio::task::spawn_blocking(move || read_entries_sync(&path))
1267            .await
1268            .map_err(|e| SessionError::Storage {
1269                message: format!("Task join error: {}", e),
1270            })??;
1271
1272        if entries.is_empty() {
1273            return Ok(None);
1274        }
1275
1276        let session = Self::reconstruct_session(*id, entries);
1277        Ok(Some(session))
1278    }
1279
1280    async fn delete(&self, id: &SessionId) -> SessionResult<bool> {
1281        let meta = {
1282            let mut index = self.index.write().await;
1283            index.remove(id)
1284        };
1285
1286        let Some(meta) = meta else {
1287            return Ok(false);
1288        };
1289
1290        if meta.path.exists() {
1291            tokio::fs::remove_file(&meta.path)
1292                .await
1293                .map_err(|e| SessionError::Storage {
1294                    message: format!("Failed to delete {}: {}", meta.path.display(), e),
1295                })?;
1296        }
1297
1298        self.summaries.write().await.remove(id);
1299        self.queue.write().await.remove(id);
1300        Ok(true)
1301    }
1302
1303    async fn list(&self, tenant_id: Option<&str>) -> SessionResult<Vec<SessionId>> {
1304        let index = self.index.read().await;
1305        Ok(match tenant_id {
1306            Some(tid) => index.by_tenant.get(tid).cloned().unwrap_or_default(),
1307            None => index.sessions.keys().copied().collect(),
1308        })
1309    }
1310
1311    async fn add_summary(&self, snapshot: SummarySnapshot) -> SessionResult<()> {
1312        let path = {
1313            let index = self.index.read().await;
1314            index
1315                .sessions
1316                .get(&snapshot.session_id)
1317                .map(|m| m.path.clone())
1318        };
1319
1320        if let Some(path) = path {
1321            let entry = JsonlEntry::Summary(SummaryEntry {
1322                session_id: snapshot.session_id.to_string(),
1323                summary: snapshot.summary.clone(),
1324                leaf_uuid: snapshot.leaf_message_id.as_ref().map(|id| id.to_string()),
1325                timestamp: snapshot.created_at,
1326            });
1327
1328            let sync = self.config.sync_mode == SyncMode::OnWrite;
1329            tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1330                .await
1331                .map_err(|e| SessionError::Storage {
1332                    message: format!("Task join error: {}", e),
1333                })??;
1334        }
1335
1336        self.summaries
1337            .write()
1338            .await
1339            .entry(snapshot.session_id)
1340            .or_default()
1341            .push(snapshot);
1342
1343        Ok(())
1344    }
1345
1346    async fn get_summaries(&self, session_id: &SessionId) -> SessionResult<Vec<SummarySnapshot>> {
1347        Ok(self
1348            .summaries
1349            .read()
1350            .await
1351            .get(session_id)
1352            .cloned()
1353            .unwrap_or_default())
1354    }
1355
1356    async fn enqueue(
1357        &self,
1358        session_id: &SessionId,
1359        content: String,
1360        priority: i32,
1361    ) -> SessionResult<QueueItem> {
1362        let item = QueueItem::enqueue(*session_id, content.clone()).priority(priority);
1363
1364        let path = {
1365            let index = self.index.read().await;
1366            index.sessions.get(session_id).map(|m| m.path.clone())
1367        };
1368
1369        if let Some(path) = path {
1370            let entry = JsonlEntry::QueueOperation(QueueOperationEntry {
1371                operation: "enqueue".to_string(),
1372                session_id: session_id.to_string(),
1373                timestamp: Utc::now(),
1374                content,
1375                priority,
1376                item_id: item.id.to_string(),
1377            });
1378
1379            let sync = self.config.sync_mode == SyncMode::OnWrite;
1380            tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1381                .await
1382                .map_err(|e| SessionError::Storage {
1383                    message: format!("Task join error: {}", e),
1384                })??;
1385        }
1386
1387        self.queue
1388            .write()
1389            .await
1390            .entry(*session_id)
1391            .or_default()
1392            .push(item.clone());
1393
1394        Ok(item)
1395    }
1396
1397    async fn dequeue(&self, session_id: &SessionId) -> SessionResult<Option<QueueItem>> {
1398        let dequeued = {
1399            let mut queue = self.queue.write().await;
1400            let items = match queue.get_mut(session_id) {
1401                Some(items) => items,
1402                None => return Ok(None),
1403            };
1404
1405            items.sort_by(|a, b| b.priority.cmp(&a.priority));
1406
1407            let mut result = None;
1408            for item in items.iter_mut() {
1409                if item.status == QueueStatus::Pending {
1410                    item.start_processing();
1411                    result = Some(item.clone());
1412                    break;
1413                }
1414            }
1415            result
1416        };
1417
1418        if let Some(ref item) = dequeued {
1419            let path = {
1420                let index = self.index.read().await;
1421                index.sessions.get(session_id).map(|m| m.path.clone())
1422            };
1423
1424            if let Some(path) = path {
1425                let entry = JsonlEntry::QueueOperation(QueueOperationEntry {
1426                    operation: "dequeue".to_string(),
1427                    session_id: session_id.to_string(),
1428                    timestamp: Utc::now(),
1429                    content: item.content.clone(),
1430                    priority: item.priority,
1431                    item_id: item.id.to_string(),
1432                });
1433
1434                let sync = self.config.sync_mode == SyncMode::OnWrite;
1435                tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1436                    .await
1437                    .map_err(|e| SessionError::Storage {
1438                        message: format!("Task join error: {}", e),
1439                    })??;
1440            }
1441        }
1442
1443        Ok(dequeued)
1444    }
1445
1446    async fn cancel_queued(&self, item_id: Uuid) -> SessionResult<bool> {
1447        let cancelled = {
1448            let mut queue = self.queue.write().await;
1449            let mut found = None;
1450            for items in queue.values_mut() {
1451                if let Some(item) = items.iter_mut().find(|i| i.id == item_id) {
1452                    item.cancel();
1453                    found = Some(item.clone());
1454                    break;
1455                }
1456            }
1457            found
1458        };
1459
1460        let Some(item) = cancelled else {
1461            return Ok(false);
1462        };
1463
1464        let path = {
1465            let index = self.index.read().await;
1466            index.sessions.get(&item.session_id).map(|m| m.path.clone())
1467        };
1468
1469        if let Some(path) = path {
1470            let entry = JsonlEntry::QueueOperation(QueueOperationEntry {
1471                operation: "cancel".to_string(),
1472                session_id: item.session_id.to_string(),
1473                timestamp: Utc::now(),
1474                content: item.content.clone(),
1475                priority: item.priority,
1476                item_id: item.id.to_string(),
1477            });
1478
1479            let sync = self.config.sync_mode == SyncMode::OnWrite;
1480            tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1481                .await
1482                .map_err(|e| SessionError::Storage {
1483                    message: format!("Task join error: {}", e),
1484                })??;
1485        }
1486
1487        Ok(true)
1488    }
1489
1490    async fn pending_queue(&self, session_id: &SessionId) -> SessionResult<Vec<QueueItem>> {
1491        Ok(self
1492            .queue
1493            .read()
1494            .await
1495            .get(session_id)
1496            .map(|items| {
1497                items
1498                    .iter()
1499                    .filter(|i| i.status == QueueStatus::Pending)
1500                    .cloned()
1501                    .collect()
1502            })
1503            .unwrap_or_default())
1504    }
1505
1506    async fn cleanup_expired(&self) -> SessionResult<usize> {
1507        let now = Utc::now();
1508        let retention_cutoff = now - chrono::Duration::days(self.config.retention_days as i64);
1509
1510        let (expired_ids, expired_paths) = {
1511            let mut index = self.index.write().await;
1512            let expired_ids: Vec<SessionId> = index
1513                .sessions
1514                .iter()
1515                .filter(|(_, m)| {
1516                    if let Some(expires_at) = m.expires_at {
1517                        expires_at < now
1518                    } else {
1519                        m.updated_at < retention_cutoff
1520                    }
1521                })
1522                .map(|(id, _)| *id)
1523                .collect();
1524
1525            let mut paths = Vec::with_capacity(expired_ids.len());
1526            for id in &expired_ids {
1527                if let Some(meta) = index.remove(id) {
1528                    paths.push(meta.path);
1529                }
1530            }
1531            (expired_ids, paths)
1532        };
1533
1534        let count = expired_paths.len();
1535
1536        if !expired_ids.is_empty() {
1537            let mut summaries = self.summaries.write().await;
1538            let mut queue = self.queue.write().await;
1539            for id in &expired_ids {
1540                summaries.remove(id);
1541                queue.remove(id);
1542            }
1543        }
1544
1545        for path in expired_paths {
1546            let _ = tokio::fs::remove_file(&path).await;
1547        }
1548
1549        Ok(count)
1550    }
1551}
1552
1553// ============================================================================
1554// Tests
1555// ============================================================================
1556
1557#[cfg(test)]
1558mod tests {
1559    use super::*;
1560    use crate::types::ContentBlock;
1561    use tempfile::TempDir;
1562
1563    async fn create_test_persistence() -> (JsonlPersistence, TempDir) {
1564        let temp_dir = TempDir::new().unwrap();
1565        let config = JsonlConfig::builder()
1566            .base_dir(temp_dir.path().to_path_buf())
1567            .build();
1568        let persistence = JsonlPersistence::new(config).await.unwrap();
1569        (persistence, temp_dir)
1570    }
1571
1572    #[tokio::test]
1573    async fn test_save_and_load_session() {
1574        let (persistence, _temp) = create_test_persistence().await;
1575
1576        let mut session = Session::new(SessionConfig::default());
1577        session.add_message(SessionMessage::user(vec![ContentBlock::text("Hello")]));
1578        session.add_message(SessionMessage::assistant(vec![ContentBlock::text(
1579            "Hi there!",
1580        )]));
1581
1582        persistence.save(&session).await.unwrap();
1583
1584        let loaded = persistence.load(&session.id).await.unwrap().unwrap();
1585        assert_eq!(loaded.id, session.id);
1586        assert_eq!(loaded.messages.len(), 2);
1587    }
1588
1589    #[tokio::test]
1590    async fn test_incremental_save() {
1591        let (persistence, _temp) = create_test_persistence().await;
1592
1593        let mut session = Session::new(SessionConfig::default());
1594        session.add_message(SessionMessage::user(vec![ContentBlock::text("First")]));
1595        persistence.save(&session).await.unwrap();
1596
1597        session.add_message(SessionMessage::assistant(vec![ContentBlock::text(
1598            "Second",
1599        )]));
1600        persistence.save(&session).await.unwrap();
1601
1602        let loaded = persistence.load(&session.id).await.unwrap().unwrap();
1603        assert_eq!(loaded.messages.len(), 2);
1604    }
1605
1606    #[tokio::test]
1607    async fn test_delete_session() {
1608        let (persistence, _temp) = create_test_persistence().await;
1609
1610        let session = Session::new(SessionConfig::default());
1611        persistence.save(&session).await.unwrap();
1612
1613        assert!(persistence.delete(&session.id).await.unwrap());
1614        assert!(persistence.load(&session.id).await.unwrap().is_none());
1615    }
1616
1617    #[tokio::test]
1618    async fn test_list_sessions() {
1619        let (persistence, _temp) = create_test_persistence().await;
1620
1621        let s1 = Session::new(SessionConfig::default());
1622        let s2 = Session::new(SessionConfig::default());
1623
1624        persistence.save(&s1).await.unwrap();
1625        persistence.save(&s2).await.unwrap();
1626
1627        let list = persistence.list(None).await.unwrap();
1628        assert_eq!(list.len(), 2);
1629    }
1630
1631    #[tokio::test]
1632    async fn test_tenant_filtering() {
1633        let (persistence, _temp) = create_test_persistence().await;
1634
1635        let mut s1 = Session::new(SessionConfig::default());
1636        s1.tenant_id = Some("tenant-a".to_string());
1637
1638        let mut s2 = Session::new(SessionConfig::default());
1639        s2.tenant_id = Some("tenant-b".to_string());
1640
1641        persistence.save(&s1).await.unwrap();
1642        persistence.save(&s2).await.unwrap();
1643
1644        let list = persistence.list(Some("tenant-a")).await.unwrap();
1645        assert_eq!(list.len(), 1);
1646        assert_eq!(list[0], s1.id);
1647    }
1648
1649    #[tokio::test]
1650    async fn test_summaries() {
1651        let (persistence, _temp) = create_test_persistence().await;
1652
1653        let session = Session::new(SessionConfig::default());
1654        persistence.save(&session).await.unwrap();
1655
1656        persistence
1657            .add_summary(SummarySnapshot::new(session.id, "Summary 1"))
1658            .await
1659            .unwrap();
1660        persistence
1661            .add_summary(SummarySnapshot::new(session.id, "Summary 2"))
1662            .await
1663            .unwrap();
1664
1665        let summaries = persistence.get_summaries(&session.id).await.unwrap();
1666        assert_eq!(summaries.len(), 2);
1667    }
1668
1669    #[tokio::test]
1670    async fn test_queue_operations() {
1671        let (persistence, _temp) = create_test_persistence().await;
1672
1673        let session = Session::new(SessionConfig::default());
1674        persistence.save(&session).await.unwrap();
1675
1676        persistence
1677            .enqueue(&session.id, "Low priority".to_string(), 1)
1678            .await
1679            .unwrap();
1680        persistence
1681            .enqueue(&session.id, "High priority".to_string(), 10)
1682            .await
1683            .unwrap();
1684
1685        let next = persistence.dequeue(&session.id).await.unwrap().unwrap();
1686        assert_eq!(next.content, "High priority");
1687    }
1688
1689    #[tokio::test]
1690    async fn test_dag_reconstruction() {
1691        let (persistence, _temp) = create_test_persistence().await;
1692
1693        let mut session = Session::new(SessionConfig::default());
1694        session.add_message(SessionMessage::user(vec![ContentBlock::text("Q1")]));
1695        session.add_message(SessionMessage::assistant(vec![ContentBlock::text("A1")]));
1696        session.add_message(SessionMessage::user(vec![ContentBlock::text("Q2")]));
1697        session.add_message(SessionMessage::assistant(vec![ContentBlock::text("A2")]));
1698
1699        persistence.save(&session).await.unwrap();
1700
1701        let loaded = persistence.load(&session.id).await.unwrap().unwrap();
1702
1703        assert_eq!(loaded.messages.len(), 4);
1704        assert!(
1705            loaded.messages[0]
1706                .content
1707                .iter()
1708                .any(|c| c.as_text() == Some("Q1"))
1709        );
1710        assert!(
1711            loaded.messages[1]
1712                .content
1713                .iter()
1714                .any(|c| c.as_text() == Some("A1"))
1715        );
1716        assert!(
1717            loaded.messages[2]
1718                .content
1719                .iter()
1720                .any(|c| c.as_text() == Some("Q2"))
1721        );
1722        assert!(
1723            loaded.messages[3]
1724                .content
1725                .iter()
1726                .any(|c| c.as_text() == Some("A2"))
1727        );
1728    }
1729
1730    #[tokio::test]
1731    async fn test_project_path_encoding() {
1732        let config = JsonlConfig::default();
1733
1734        assert_eq!(
1735            config.encode_project_path(Path::new("/home/user/project")),
1736            "2f686f6d652f757365722f70726f6a656374"
1737        );
1738        assert_eq!(
1739            config.encode_project_path(Path::new("/Users/alice/work/app")),
1740            "2f55736572732f616c6963652f776f726b2f617070"
1741        );
1742    }
1743
1744    #[test]
1745    fn test_jsonl_entry_serialization() {
1746        let msg = SessionMessage::user(vec![ContentBlock::text("Hello")]);
1747        let session_id = SessionId::new();
1748        let entry = JsonlEntry::from_message(&session_id, &msg);
1749
1750        let json = serde_json::to_string(&entry).unwrap();
1751        assert!(json.contains("\"type\":\"user\""));
1752
1753        let parsed: JsonlEntry = serde_json::from_str(&json).unwrap();
1754        assert!(matches!(parsed, JsonlEntry::User(_)));
1755    }
1756
1757    #[tokio::test]
1758    async fn test_no_duplicate_writes() {
1759        let (persistence, _temp) = create_test_persistence().await;
1760
1761        let mut session = Session::new(SessionConfig::default());
1762        session.add_message(SessionMessage::user(vec![ContentBlock::text("Hello")]));
1763        persistence.save(&session).await.unwrap();
1764        persistence.save(&session).await.unwrap(); // Save same data twice
1765        persistence.save(&session).await.unwrap(); // And again
1766
1767        // Check file has only one message entry + session meta
1768        let file_path = persistence.session_file_path(&session.id, None);
1769        let entries = read_entries_sync(&file_path).unwrap();
1770        let message_count = entries
1771            .iter()
1772            .filter(|e| e.message_uuid().is_some())
1773            .count();
1774        assert_eq!(message_count, 1, "Should not duplicate message entries");
1775    }
1776
1777    #[test]
1778    fn test_windows_path_encoding() {
1779        let config = JsonlConfig::default();
1780        let encoded = config.encode_project_path(Path::new("C:\\Users\\alice\\project"));
1781        assert!(encoded.chars().all(|c| c.is_ascii_hexdigit()));
1782        assert!(!encoded.is_empty());
1783    }
1784}