1use std::collections::{HashMap, HashSet, VecDeque};
25use std::hash::{BuildHasher, Hash, Hasher};
26use std::io::{BufRead, BufReader, Write};
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29
30use chrono::{DateTime, Utc};
31use rust_decimal::Decimal;
32use serde::{Deserialize, Serialize};
33use tokio::sync::RwLock;
34use uuid::Uuid;
35
36use super::state::{MessageId, Session, SessionConfig, SessionId, SessionMessage, SessionType};
37use super::types::{
38 CompactRecord, EnvironmentContext, Plan, QueueItem, QueueOperation, QueueStatus,
39 SummarySnapshot, TodoItem,
40};
41use super::{Persistence, SessionError, SessionResult};
42use crate::types::{ContentBlock, Role, TokenUsage};
43
44fn enum_to_jsonl<T: serde::Serialize>(value: &T, default: &str) -> String {
50 serde_json::to_string(value)
51 .map(|s| s.trim_matches('"').to_string())
52 .unwrap_or_else(|_| default.to_string())
53}
54
55fn jsonl_to_enum<T: serde::de::DeserializeOwned>(s: &str) -> Option<T> {
57 serde_json::from_str(&format!("\"{}\"", s)).ok()
58}
59
60#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
66pub enum SyncMode {
67 #[default]
69 None,
70 OnWrite,
72}
73
74#[derive(Clone, Debug)]
76pub struct JsonlConfig {
77 pub base_dir: PathBuf,
79 pub retention_days: u32,
81 pub sync_mode: SyncMode,
83}
84
85impl Default for JsonlConfig {
86 fn default() -> Self {
87 Self {
88 base_dir: crate::common::home_dir()
89 .unwrap_or_else(|| PathBuf::from("."))
90 .join(".claude"),
91 retention_days: 30,
92 sync_mode: SyncMode::default(),
93 }
94 }
95}
96
97impl JsonlConfig {
98 pub fn builder() -> JsonlConfigBuilder {
99 JsonlConfigBuilder::default()
100 }
101
102 fn projects_dir(&self) -> PathBuf {
103 self.base_dir.join("projects")
104 }
105
106 fn encode_project_path(&self, path: &Path) -> String {
109 path.as_os_str()
110 .as_encoded_bytes()
111 .iter()
112 .map(|b| format!("{:02x}", b))
113 .collect::<String>()
114 }
115
116 fn project_dir(&self, project_path: &Path) -> PathBuf {
117 self.projects_dir()
118 .join(self.encode_project_path(project_path))
119 }
120}
121
122#[derive(Default)]
124pub struct JsonlConfigBuilder {
125 base_dir: Option<PathBuf>,
126 retention_days: Option<u32>,
127 sync_mode: Option<SyncMode>,
128}
129
130impl JsonlConfigBuilder {
131 pub fn base_dir(mut self, path: impl Into<PathBuf>) -> Self {
132 self.base_dir = Some(path.into());
133 self
134 }
135
136 pub fn retention_days(mut self, days: u32) -> Self {
137 self.retention_days = Some(days);
138 self
139 }
140
141 pub fn sync_mode(mut self, mode: SyncMode) -> Self {
142 self.sync_mode = Some(mode);
143 self
144 }
145
146 pub fn build(self) -> JsonlConfig {
147 let default = JsonlConfig::default();
148 JsonlConfig {
149 base_dir: self.base_dir.unwrap_or(default.base_dir),
150 retention_days: self.retention_days.unwrap_or(default.retention_days),
151 sync_mode: self.sync_mode.unwrap_or(default.sync_mode),
152 }
153 }
154}
155
156#[derive(Clone, Debug, Serialize, Deserialize)]
162#[serde(tag = "type", rename_all = "snake_case")]
163pub enum JsonlEntry {
164 User(UserEntry),
165 Assistant(AssistantEntry),
166 System(SystemEntry),
167 QueueOperation(QueueOperationEntry),
168 Summary(SummaryEntry),
169 SessionMeta(SessionMetaEntry),
170 Todo(TodoEntry),
171 Plan(PlanEntry),
172 Compact(CompactEntry),
173}
174
175#[derive(Clone, Debug, Serialize, Deserialize)]
177pub struct EntryCommon {
178 pub uuid: String,
179 #[serde(rename = "parentUuid", skip_serializing_if = "Option::is_none")]
180 pub parent_uuid: Option<String>,
181 #[serde(rename = "sessionId")]
182 pub session_id: String,
183 pub timestamp: DateTime<Utc>,
184 #[serde(skip_serializing_if = "Option::is_none")]
185 pub cwd: Option<PathBuf>,
186 pub version: String,
187 #[serde(rename = "gitBranch", default)]
188 pub git_branch: String,
189 #[serde(rename = "isSidechain", default)]
190 pub is_sidechain: bool,
191}
192
193impl EntryCommon {
194 fn from_message(session_id: &SessionId, msg: &SessionMessage) -> Self {
195 Self {
196 uuid: msg.id.to_string(),
197 parent_uuid: msg.parent_id.as_ref().map(|id| id.to_string()),
198 session_id: session_id.to_string(),
199 timestamp: msg.timestamp,
200 cwd: msg.environment.as_ref().and_then(|e| e.cwd.clone()),
201 version: env!("CARGO_PKG_VERSION").to_string(),
202 git_branch: msg
203 .environment
204 .as_ref()
205 .and_then(|e| e.git_branch.clone())
206 .unwrap_or_default(),
207 is_sidechain: msg.is_sidechain,
208 }
209 }
210
211 fn to_environment(&self) -> EnvironmentContext {
212 EnvironmentContext {
213 cwd: self.cwd.clone(),
214 git_branch: if self.git_branch.is_empty() {
215 None
216 } else {
217 Some(self.git_branch.clone())
218 },
219 ..Default::default()
220 }
221 }
222}
223
224#[derive(Clone, Debug, Serialize, Deserialize)]
225pub struct UserMessageContent {
226 pub role: String,
227 pub content: serde_json::Value,
228}
229
230#[derive(Clone, Debug, Serialize, Deserialize)]
231pub struct UserEntry {
232 #[serde(flatten)]
233 pub common: EntryCommon,
234 pub message: UserMessageContent,
235 #[serde(rename = "isCompactSummary", default)]
236 pub is_compact_summary: bool,
237}
238
239#[derive(Clone, Debug, Serialize, Deserialize)]
240pub struct AssistantMessageContent {
241 #[serde(skip_serializing_if = "Option::is_none")]
242 pub id: Option<String>,
243 #[serde(skip_serializing_if = "Option::is_none")]
244 pub model: Option<String>,
245 pub role: String,
246 #[serde(skip_serializing_if = "Option::is_none")]
247 pub stop_reason: Option<String>,
248 pub content: serde_json::Value,
249 #[serde(skip_serializing_if = "Option::is_none")]
250 pub usage: Option<UsageInfo>,
251}
252
253#[derive(Clone, Debug, Default, Serialize, Deserialize)]
254pub struct UsageInfo {
255 pub input_tokens: u64,
256 pub output_tokens: u64,
257 #[serde(default)]
258 pub cache_creation_input_tokens: u64,
259 #[serde(default)]
260 pub cache_read_input_tokens: u64,
261}
262
263impl From<&TokenUsage> for UsageInfo {
264 fn from(u: &TokenUsage) -> Self {
265 Self {
266 input_tokens: u.input_tokens,
267 output_tokens: u.output_tokens,
268 cache_creation_input_tokens: u.cache_creation_input_tokens,
269 cache_read_input_tokens: u.cache_read_input_tokens,
270 }
271 }
272}
273
274impl From<&UsageInfo> for TokenUsage {
275 fn from(u: &UsageInfo) -> Self {
276 Self {
277 input_tokens: u.input_tokens,
278 output_tokens: u.output_tokens,
279 cache_creation_input_tokens: u.cache_creation_input_tokens,
280 cache_read_input_tokens: u.cache_read_input_tokens,
281 }
282 }
283}
284
285#[derive(Clone, Debug, Serialize, Deserialize)]
286pub struct AssistantEntry {
287 #[serde(flatten)]
288 pub common: EntryCommon,
289 pub message: AssistantMessageContent,
290 #[serde(rename = "requestId", skip_serializing_if = "Option::is_none")]
291 pub request_id: Option<String>,
292}
293
294#[derive(Clone, Debug, Serialize, Deserialize)]
295pub struct SystemEntry {
296 #[serde(flatten)]
297 pub common: EntryCommon,
298 pub subtype: String,
299 #[serde(skip_serializing_if = "Option::is_none")]
300 pub content: Option<String>,
301}
302
303#[derive(Clone, Debug, Serialize, Deserialize)]
304pub struct QueueOperationEntry {
305 pub operation: String,
306 #[serde(rename = "sessionId")]
307 pub session_id: String,
308 pub timestamp: DateTime<Utc>,
309 pub content: String,
310 pub priority: i32,
311 #[serde(rename = "itemId")]
312 pub item_id: String,
313}
314
315#[derive(Clone, Debug, Serialize, Deserialize)]
316pub struct SummaryEntry {
317 #[serde(rename = "sessionId")]
318 pub session_id: String,
319 pub summary: String,
320 #[serde(rename = "leafUuid", skip_serializing_if = "Option::is_none")]
321 pub leaf_uuid: Option<String>,
322 pub timestamp: DateTime<Utc>,
323}
324
325#[derive(Clone, Debug, Serialize, Deserialize)]
326pub struct SessionMetaEntry {
327 #[serde(rename = "sessionId")]
328 pub session_id: String,
329 #[serde(rename = "parentSessionId", skip_serializing_if = "Option::is_none")]
330 pub parent_session_id: Option<String>,
331 #[serde(rename = "tenantId", skip_serializing_if = "Option::is_none")]
332 pub tenant_id: Option<String>,
333 #[serde(rename = "sessionType")]
334 pub session_type: serde_json::Value,
335 pub mode: String,
336 pub state: String,
337 pub config: serde_json::Value,
338 #[serde(rename = "permissionPolicy")]
339 pub permission_policy: serde_json::Value,
340 #[serde(rename = "totalUsage", default)]
341 pub total_usage: UsageInfo,
342 #[serde(rename = "totalCostUsd", default)]
343 pub total_cost_usd: Decimal,
344 #[serde(rename = "staticContextHash", skip_serializing_if = "Option::is_none")]
345 pub static_context_hash: Option<String>,
346 #[serde(skip_serializing_if = "Option::is_none")]
347 pub error: Option<String>,
348 #[serde(rename = "createdAt")]
349 pub created_at: DateTime<Utc>,
350 #[serde(rename = "updatedAt")]
351 pub updated_at: DateTime<Utc>,
352 #[serde(rename = "expiresAt", skip_serializing_if = "Option::is_none")]
353 pub expires_at: Option<DateTime<Utc>>,
354}
355
356#[derive(Clone, Debug, Serialize, Deserialize)]
357pub struct TodoEntry {
358 pub id: String,
359 #[serde(rename = "sessionId")]
360 pub session_id: String,
361 pub content: String,
362 #[serde(rename = "activeForm")]
363 pub active_form: String,
364 pub status: String,
365 #[serde(rename = "planId", skip_serializing_if = "Option::is_none")]
366 pub plan_id: Option<String>,
367 #[serde(rename = "createdAt")]
368 pub created_at: DateTime<Utc>,
369 #[serde(rename = "startedAt", skip_serializing_if = "Option::is_none")]
370 pub started_at: Option<DateTime<Utc>>,
371 #[serde(rename = "completedAt", skip_serializing_if = "Option::is_none")]
372 pub completed_at: Option<DateTime<Utc>>,
373}
374
375#[derive(Clone, Debug, Serialize, Deserialize)]
376pub struct PlanEntry {
377 pub id: String,
378 #[serde(rename = "sessionId")]
379 pub session_id: String,
380 #[serde(skip_serializing_if = "Option::is_none")]
381 pub name: Option<String>,
382 pub content: String,
383 pub status: String,
384 #[serde(skip_serializing_if = "Option::is_none")]
385 pub error: Option<String>,
386 #[serde(rename = "createdAt")]
387 pub created_at: DateTime<Utc>,
388 #[serde(rename = "approvedAt", skip_serializing_if = "Option::is_none")]
389 pub approved_at: Option<DateTime<Utc>>,
390 #[serde(rename = "startedAt", skip_serializing_if = "Option::is_none")]
391 pub started_at: Option<DateTime<Utc>>,
392 #[serde(rename = "completedAt", skip_serializing_if = "Option::is_none")]
393 pub completed_at: Option<DateTime<Utc>>,
394}
395
396#[derive(Clone, Debug, Serialize, Deserialize)]
397pub struct CompactEntry {
398 pub id: String,
399 #[serde(rename = "sessionId")]
400 pub session_id: String,
401 pub trigger: String,
402 #[serde(rename = "preTokens")]
403 pub pre_tokens: usize,
404 #[serde(rename = "postTokens")]
405 pub post_tokens: usize,
406 #[serde(rename = "savedTokens")]
407 pub saved_tokens: usize,
408 pub summary: String,
409 #[serde(rename = "originalCount")]
410 pub original_count: usize,
411 #[serde(rename = "newCount")]
412 pub new_count: usize,
413 #[serde(rename = "logicalParentId", skip_serializing_if = "Option::is_none")]
414 pub logical_parent_id: Option<String>,
415 #[serde(rename = "createdAt")]
416 pub created_at: DateTime<Utc>,
417}
418
419impl JsonlEntry {
424 fn from_message(session_id: &SessionId, msg: &SessionMessage) -> Self {
425 let common = EntryCommon::from_message(session_id, msg);
426
427 let serialize_content = |content: &[ContentBlock]| -> serde_json::Value {
428 serde_json::to_value(content).unwrap_or_else(|e| {
429 tracing::warn!(
430 session_id = %session_id,
431 error = %e,
432 "Failed to serialize message content, using empty array"
433 );
434 serde_json::Value::Array(Vec::new())
435 })
436 };
437
438 match msg.role {
439 Role::User => JsonlEntry::User(UserEntry {
440 common,
441 message: UserMessageContent {
442 role: "user".to_string(),
443 content: serialize_content(&msg.content),
444 },
445 is_compact_summary: msg.is_compact_summary,
446 }),
447 Role::Assistant => JsonlEntry::Assistant(AssistantEntry {
448 common,
449 message: AssistantMessageContent {
450 id: msg.metadata.request_id.clone(),
451 model: msg.metadata.model.clone(),
452 role: "assistant".to_string(),
453 stop_reason: None,
454 content: serialize_content(&msg.content),
455 usage: msg.usage.as_ref().map(UsageInfo::from),
456 },
457 request_id: msg.metadata.request_id.clone(),
458 }),
459 }
460 }
461
462 fn to_session_message(&self) -> Option<SessionMessage> {
463 match self {
464 JsonlEntry::User(entry) => {
465 let content: Vec<ContentBlock> =
466 serde_json::from_value(entry.message.content.clone()).unwrap_or_else(|e| {
467 tracing::warn!(
468 uuid = %entry.common.uuid,
469 error = %e,
470 "Failed to deserialize user message content"
471 );
472 Vec::new()
473 });
474 let mut msg = SessionMessage::user(content);
475 msg.id = MessageId::from_string(&entry.common.uuid);
476 msg.parent_id = entry
477 .common
478 .parent_uuid
479 .as_ref()
480 .map(MessageId::from_string);
481 msg.timestamp = entry.common.timestamp;
482 msg.is_sidechain = entry.common.is_sidechain;
483 msg.is_compact_summary = entry.is_compact_summary;
484 msg.environment = Some(entry.common.to_environment());
485 Some(msg)
486 }
487 JsonlEntry::Assistant(entry) => {
488 let content: Vec<ContentBlock> =
489 serde_json::from_value(entry.message.content.clone()).unwrap_or_else(|e| {
490 tracing::warn!(
491 uuid = %entry.common.uuid,
492 error = %e,
493 "Failed to deserialize assistant message content"
494 );
495 Vec::new()
496 });
497 let mut msg = SessionMessage::assistant(content);
498 msg.id = MessageId::from_string(&entry.common.uuid);
499 msg.parent_id = entry
500 .common
501 .parent_uuid
502 .as_ref()
503 .map(MessageId::from_string);
504 msg.timestamp = entry.common.timestamp;
505 msg.is_sidechain = entry.common.is_sidechain;
506 msg.usage = entry.message.usage.as_ref().map(TokenUsage::from);
507 msg.metadata.model.clone_from(&entry.message.model);
508 msg.metadata.request_id.clone_from(&entry.request_id);
509 msg.environment = Some(entry.common.to_environment());
510 Some(msg)
511 }
512 _ => None,
513 }
514 }
515
516 #[cfg(test)]
517 fn message_uuid(&self) -> Option<&str> {
518 match self {
519 JsonlEntry::User(e) => Some(&e.common.uuid),
520 JsonlEntry::Assistant(e) => Some(&e.common.uuid),
521 _ => None,
522 }
523 }
524}
525
526#[derive(Clone, Debug)]
531struct SessionMeta {
532 path: PathBuf,
533 project_path: Option<PathBuf>,
534 tenant_id: Option<String>,
535 updated_at: DateTime<Utc>,
536 expires_at: Option<DateTime<Utc>>,
537 persisted_ids: HashSet<String>,
538 todos_hash: u64,
539 plan_hash: u64,
540}
541
542#[derive(Default)]
543struct SessionIndex {
544 sessions: HashMap<SessionId, SessionMeta>,
545 by_project: HashMap<PathBuf, Vec<SessionId>>,
546 by_tenant: HashMap<String, Vec<SessionId>>,
547}
548
549impl SessionIndex {
550 fn insert(&mut self, session_id: SessionId, meta: SessionMeta) {
551 self.remove(&session_id);
553
554 if let Some(ref project) = meta.project_path {
555 self.by_project
556 .entry(project.clone())
557 .or_default()
558 .push(session_id);
559 }
560 if let Some(ref tenant) = meta.tenant_id {
561 self.by_tenant
562 .entry(tenant.clone())
563 .or_default()
564 .push(session_id);
565 }
566 self.sessions.insert(session_id, meta);
567 }
568
569 fn remove(&mut self, session_id: &SessionId) -> Option<SessionMeta> {
570 let meta = self.sessions.remove(session_id)?;
571
572 if let Some(ref project) = meta.project_path
573 && let Some(ids) = self.by_project.get_mut(project)
574 {
575 ids.retain(|id| id != session_id);
576 }
577 if let Some(ref tenant) = meta.tenant_id
578 && let Some(ids) = self.by_tenant.get_mut(tenant)
579 {
580 ids.retain(|id| id != session_id);
581 }
582 Some(meta)
583 }
584}
585
586fn read_entries_sync(path: &Path) -> SessionResult<Vec<JsonlEntry>> {
591 if !path.exists() {
592 return Ok(Vec::new());
593 }
594
595 let file = std::fs::File::open(path).map_err(|e| SessionError::Storage {
596 message: format!("Failed to open {}: {}", path.display(), e),
597 })?;
598
599 let reader = BufReader::with_capacity(64 * 1024, file);
600 let mut entries = Vec::with_capacity(128);
601
602 for (line_num, line) in reader.lines().enumerate() {
603 let line = line.map_err(|e| SessionError::Storage {
604 message: format!("Read error at line {}: {}", line_num + 1, e),
605 })?;
606
607 if line.trim().is_empty() {
608 continue;
609 }
610
611 match serde_json::from_str::<JsonlEntry>(&line) {
612 Ok(entry) => entries.push(entry),
613 Err(e) => {
614 tracing::warn!(
615 path = %path.display(),
616 line = line_num + 1,
617 error = %e,
618 "Skipping malformed JSONL entry"
619 );
620 }
621 }
622 }
623
624 Ok(entries)
625}
626
627fn append_entries_sync(path: &Path, entries: &[JsonlEntry], sync: bool) -> SessionResult<()> {
628 if entries.is_empty() {
629 return Ok(());
630 }
631
632 if let Some(parent) = path.parent() {
633 std::fs::create_dir_all(parent).map_err(|e| SessionError::Storage {
634 message: format!("Failed to create directory {}: {}", parent.display(), e),
635 })?;
636 }
637
638 let file = std::fs::OpenOptions::new()
639 .create(true)
640 .append(true)
641 .open(path)
642 .map_err(|e| SessionError::Storage {
643 message: format!("Failed to open {} for writing: {}", path.display(), e),
644 })?;
645
646 let mut writer = std::io::BufWriter::with_capacity(64 * 1024, file);
647
648 for entry in entries {
649 serde_json::to_writer(&mut writer, entry)?;
650 writeln!(writer).map_err(|e| SessionError::Storage {
651 message: format!("Write failed: {}", e),
652 })?;
653 }
654
655 writer.flush().map_err(|e| SessionError::Storage {
656 message: format!("Flush failed: {}", e),
657 })?;
658
659 if sync {
660 writer
661 .into_inner()
662 .map_err(|e| SessionError::Storage {
663 message: format!("Buffer error: {}", e.error()),
664 })?
665 .sync_all()
666 .map_err(|e| SessionError::Storage {
667 message: format!("Sync failed: {}", e),
668 })?;
669 }
670
671 Ok(())
672}
673
674pub struct JsonlPersistence {
679 config: JsonlConfig,
680 index: Arc<RwLock<SessionIndex>>,
681 summaries: Arc<RwLock<HashMap<SessionId, Vec<SummarySnapshot>>>>,
682 queue: Arc<RwLock<HashMap<SessionId, Vec<QueueItem>>>>,
683}
684
685impl JsonlPersistence {
686 pub async fn new(config: JsonlConfig) -> SessionResult<Self> {
687 tokio::fs::create_dir_all(config.projects_dir())
688 .await
689 .map_err(|e| SessionError::Storage {
690 message: format!("Failed to create projects directory: {}", e),
691 })?;
692
693 let persistence = Self {
694 config,
695 index: Arc::new(RwLock::new(SessionIndex::default())),
696 summaries: Arc::new(RwLock::new(HashMap::new())),
697 queue: Arc::new(RwLock::new(HashMap::new())),
698 };
699
700 persistence.rebuild_index().await?;
701 Ok(persistence)
702 }
703
704 pub async fn default_config() -> SessionResult<Self> {
705 Self::new(JsonlConfig::default()).await
706 }
707
708 async fn rebuild_index(&self) -> SessionResult<()> {
709 let projects_dir = self.config.projects_dir();
710 if !projects_dir.exists() {
711 return Ok(());
712 }
713
714 let mut index = self.index.write().await;
715 let mut summaries = self.summaries.write().await;
716 let mut queue = self.queue.write().await;
717
718 let mut entries =
719 tokio::fs::read_dir(&projects_dir)
720 .await
721 .map_err(|e| SessionError::Storage {
722 message: format!("Failed to read projects dir: {}", e),
723 })?;
724
725 while let Some(project_entry) =
726 entries
727 .next_entry()
728 .await
729 .map_err(|e| SessionError::Storage {
730 message: format!("Failed to read entry: {}", e),
731 })?
732 {
733 let file_type = project_entry.file_type().await.ok();
734 if !file_type.map(|t| t.is_dir()).unwrap_or(false) {
735 continue;
736 }
737
738 let project_path = project_entry.path();
739 let mut files =
740 tokio::fs::read_dir(&project_path)
741 .await
742 .map_err(|e| SessionError::Storage {
743 message: format!("Failed to read project dir: {}", e),
744 })?;
745
746 while let Some(file_entry) =
747 files
748 .next_entry()
749 .await
750 .map_err(|e| SessionError::Storage {
751 message: format!("Failed to read file entry: {}", e),
752 })?
753 {
754 let file_path = file_entry.path();
755 if file_path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
756 continue;
757 }
758
759 let session_id = match file_path
760 .file_stem()
761 .and_then(|s| s.to_str())
762 .and_then(SessionId::parse)
763 {
764 Some(id) => id,
765 None => continue,
766 };
767
768 let path_clone = file_path.clone();
770 let parsed = tokio::task::spawn_blocking(move || read_entries_sync(&path_clone))
771 .await
772 .map_err(|e| SessionError::Storage {
773 message: format!("Task join error: {}", e),
774 })??;
775
776 let (meta, session_summaries, session_queue) =
777 Self::parse_file_metadata(session_id, file_path, &parsed);
778
779 index.insert(session_id, meta);
780 if !session_summaries.is_empty() {
781 summaries.insert(session_id, session_summaries);
782 }
783 if !session_queue.is_empty() {
784 queue.insert(session_id, session_queue);
785 }
786 }
787 }
788
789 Ok(())
790 }
791
792 fn parse_file_metadata(
793 session_id: SessionId,
794 path: PathBuf,
795 entries: &[JsonlEntry],
796 ) -> (SessionMeta, Vec<SummarySnapshot>, Vec<QueueItem>) {
797 let mut project_path: Option<PathBuf> = None;
798 let mut tenant_id: Option<String> = None;
799 let mut updated_at = Utc::now();
800 let mut expires_at: Option<DateTime<Utc>> = None;
801 let mut summaries = Vec::new();
802 let mut queue_items: HashMap<String, QueueItem> = HashMap::new();
803 let mut persisted_ids = HashSet::with_capacity(entries.len());
804
805 for entry in entries {
806 match entry {
807 JsonlEntry::User(e) => {
808 if project_path.is_none() {
809 project_path = e.common.cwd.clone();
810 }
811 updated_at = e.common.timestamp;
812 persisted_ids.insert(e.common.uuid.clone());
813 }
814 JsonlEntry::Assistant(e) => {
815 if project_path.is_none() {
816 project_path = e.common.cwd.clone();
817 }
818 updated_at = e.common.timestamp;
819 persisted_ids.insert(e.common.uuid.clone());
820 }
821 JsonlEntry::SessionMeta(m) => {
822 tenant_id.clone_from(&m.tenant_id);
823 updated_at = m.updated_at;
824 expires_at = m.expires_at;
825 }
826 JsonlEntry::Summary(s) => {
827 summaries.push(SummarySnapshot {
828 id: Uuid::new_v4(),
829 session_id,
830 summary: s.summary.clone(),
831 leaf_message_id: s.leaf_uuid.as_ref().map(MessageId::from_string),
832 created_at: s.timestamp,
833 });
834 }
835 JsonlEntry::QueueOperation(q) => {
836 let item_id = match Uuid::parse_str(&q.item_id) {
837 Ok(id) => id,
838 Err(_) => continue,
839 };
840 match q.operation.as_str() {
841 "enqueue" => {
842 queue_items.insert(
843 q.item_id.clone(),
844 QueueItem {
845 id: item_id,
846 session_id,
847 operation: QueueOperation::Enqueue,
848 content: q.content.clone(),
849 priority: q.priority,
850 status: QueueStatus::Pending,
851 created_at: q.timestamp,
852 processed_at: None,
853 },
854 );
855 }
856 "dequeue" => {
857 if let Some(item) = queue_items.get_mut(&q.item_id) {
858 item.status = QueueStatus::Processing;
859 item.processed_at = Some(q.timestamp);
860 }
861 }
862 "cancel" => {
863 if let Some(item) = queue_items.get_mut(&q.item_id) {
864 item.status = QueueStatus::Cancelled;
865 item.processed_at = Some(q.timestamp);
866 }
867 }
868 _ => {}
869 }
870 }
871 _ => {}
872 }
873 }
874
875 let queue: Vec<QueueItem> = queue_items.into_values().collect();
876
877 (
878 SessionMeta {
879 path,
880 project_path,
881 tenant_id,
882 updated_at,
883 expires_at,
884 persisted_ids,
885 todos_hash: 0,
886 plan_hash: 0,
887 },
888 summaries,
889 queue,
890 )
891 }
892
893 fn session_file_path(&self, session_id: &SessionId, project_path: Option<&Path>) -> PathBuf {
894 let dir = match project_path {
895 Some(p) => self.config.project_dir(p),
896 None => self.config.projects_dir().join("_default"),
897 };
898 dir.join(format!("{}.jsonl", session_id))
899 }
900
901 fn get_project_path(session: &Session) -> Option<PathBuf> {
902 session
903 .messages
904 .first()
905 .and_then(|m| m.environment.as_ref())
906 .and_then(|e| e.cwd.clone())
907 }
908
909 fn compute_todos_hash(todos: &[TodoItem]) -> u64 {
912 let hasher_state = ahash::RandomState::with_seeds(1234, 5678, 9012, 3456);
913 let mut hasher = hasher_state.build_hasher();
914 for todo in todos {
915 todo.id.hash(&mut hasher);
916 enum_to_jsonl(&todo.status, "pending").hash(&mut hasher);
917 todo.content.hash(&mut hasher);
918 }
919 hasher.finish()
920 }
921
922 fn compute_plan_hash(plan: Option<&Plan>) -> u64 {
925 let hasher_state = ahash::RandomState::with_seeds(1234, 5678, 9012, 3456);
926 let mut hasher = hasher_state.build_hasher();
927 if let Some(p) = plan {
928 p.id.hash(&mut hasher);
929 enum_to_jsonl(&p.status, "draft").hash(&mut hasher);
930 p.content.hash(&mut hasher);
931 }
932 hasher.finish()
933 }
934
935 fn session_to_meta_entry(session: &Session) -> JsonlEntry {
936 let warn_serialize = |field: &str, e: serde_json::Error| -> serde_json::Value {
937 tracing::warn!(
938 session_id = %session.id,
939 field,
940 error = %e,
941 "Failed to serialize session field"
942 );
943 serde_json::Value::Object(Default::default())
944 };
945
946 JsonlEntry::SessionMeta(SessionMetaEntry {
947 session_id: session.id.to_string(),
948 parent_session_id: session.parent_id.map(|p| p.to_string()),
949 tenant_id: session.tenant_id.clone(),
950 session_type: serde_json::to_value(&session.session_type)
951 .unwrap_or_else(|e| warn_serialize("session_type", e)),
952 mode: "stateless".to_string(),
953 state: enum_to_jsonl(&session.state, "created"),
954 config: serde_json::to_value(&session.config)
955 .unwrap_or_else(|e| warn_serialize("config", e)),
956 permission_policy: serde_json::to_value(&session.permissions)
957 .unwrap_or_else(|e| warn_serialize("permissions", e)),
958 total_usage: UsageInfo::from(&session.total_usage),
959 total_cost_usd: session.total_cost_usd,
960 static_context_hash: session.static_context_hash.clone(),
961 error: session.error.clone(),
962 created_at: session.created_at,
963 updated_at: session.updated_at,
964 expires_at: session.expires_at,
965 })
966 }
967
968 fn reconstruct_session(session_id: SessionId, entries: Vec<JsonlEntry>) -> Session {
969 let mut session = Session::new(SessionConfig::default());
970 session.id = session_id;
971
972 let mut messages: HashMap<String, SessionMessage> = HashMap::with_capacity(entries.len());
973 let mut todos_map: HashMap<String, TodoItem> = HashMap::new();
974 let mut latest_plan: Option<Plan> = None;
975 let mut compacts: Vec<CompactRecord> = Vec::new();
976
977 for entry in entries {
978 match entry {
979 JsonlEntry::User(_) | JsonlEntry::Assistant(_) => {
980 if let Some(msg) = entry.to_session_message() {
981 messages.insert(msg.id.to_string(), msg);
982 }
983 }
984 JsonlEntry::SessionMeta(m) => {
985 session.tenant_id = m.tenant_id;
986 session.parent_id = m
987 .parent_session_id
988 .as_ref()
989 .and_then(|s| SessionId::parse(s));
990 session.session_type =
991 serde_json::from_value(m.session_type).unwrap_or(SessionType::Main);
992 session.state = jsonl_to_enum(&m.state).unwrap_or_default();
994 session.config = serde_json::from_value(m.config).unwrap_or_else(|e| {
995 tracing::warn!(
996 session_id = %session_id,
997 error = %e,
998 "Failed to deserialize session config, using default"
999 );
1000 Default::default()
1001 });
1002 session.permissions = serde_json::from_value(m.permission_policy)
1003 .unwrap_or_else(|e| {
1004 tracing::warn!(
1005 session_id = %session_id,
1006 error = %e,
1007 "Failed to deserialize session permissions, using default"
1008 );
1009 Default::default()
1010 });
1011 session.total_usage = TokenUsage::from(&m.total_usage);
1012 session.total_cost_usd = m.total_cost_usd;
1013 session.static_context_hash = m.static_context_hash;
1014 session.error = m.error;
1015 session.created_at = m.created_at;
1016 session.updated_at = m.updated_at;
1017 session.expires_at = m.expires_at;
1018 }
1019 JsonlEntry::Summary(s) => {
1020 session.summary = Some(s.summary);
1021 }
1022 JsonlEntry::Todo(t) => {
1023 let todo = TodoItem {
1024 id: Uuid::parse_str(&t.id).unwrap_or_else(|_| Uuid::new_v4()),
1025 session_id,
1026 content: t.content,
1027 active_form: t.active_form,
1028 status: jsonl_to_enum(&t.status).unwrap_or_default(),
1029 plan_id: t.plan_id.and_then(|s| Uuid::parse_str(&s).ok()),
1030 created_at: t.created_at,
1031 started_at: t.started_at,
1032 completed_at: t.completed_at,
1033 };
1034 todos_map.insert(t.id, todo);
1036 }
1037 JsonlEntry::Plan(p) => {
1038 let plan = Plan {
1039 id: Uuid::parse_str(&p.id).unwrap_or_else(|_| Uuid::new_v4()),
1040 session_id,
1041 name: p.name,
1042 content: p.content,
1043 status: jsonl_to_enum(&p.status).unwrap_or_default(),
1044 error: p.error,
1045 created_at: p.created_at,
1046 approved_at: p.approved_at,
1047 started_at: p.started_at,
1048 completed_at: p.completed_at,
1049 };
1050 latest_plan = Some(plan);
1052 }
1053 JsonlEntry::Compact(c) => {
1054 compacts.push(CompactRecord {
1055 id: Uuid::parse_str(&c.id).unwrap_or_else(|_| Uuid::new_v4()),
1056 session_id,
1057 trigger: jsonl_to_enum(&c.trigger).unwrap_or_default(),
1058 pre_tokens: c.pre_tokens,
1059 post_tokens: c.post_tokens,
1060 saved_tokens: c.saved_tokens,
1061 summary: c.summary,
1062 original_count: c.original_count,
1063 new_count: c.new_count,
1064 logical_parent_id: c.logical_parent_id.as_ref().map(MessageId::from_string),
1065 created_at: c.created_at,
1066 });
1067 }
1068 _ => {}
1069 }
1070 }
1071
1072 let ordered = Self::topological_sort(&messages);
1074 session.messages = Vec::with_capacity(ordered.len());
1075 for msg in ordered {
1076 session.add_message(msg);
1077 }
1078
1079 session.todos = todos_map.into_values().collect();
1081 session
1082 .todos
1083 .sort_by(|a, b| a.created_at.cmp(&b.created_at));
1084 session.current_plan = latest_plan;
1085 session.compact_history = VecDeque::from(compacts);
1086
1087 session
1088 }
1089
1090 fn topological_sort(messages: &HashMap<String, SessionMessage>) -> Vec<SessionMessage> {
1091 if messages.is_empty() {
1092 return Vec::new();
1093 }
1094
1095 let mut children: HashMap<Option<String>, Vec<&SessionMessage>> = HashMap::new();
1097 for msg in messages.values() {
1098 children
1099 .entry(msg.parent_id.as_ref().map(|p| p.to_string()))
1100 .or_default()
1101 .push(msg);
1102 }
1103
1104 for group in children.values_mut() {
1106 group.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
1107 }
1108
1109 let mut result = Vec::with_capacity(messages.len());
1111 let mut queue = std::collections::VecDeque::new();
1112
1113 if let Some(roots) = children.remove(&None) {
1115 queue.extend(roots);
1116 }
1117
1118 while let Some(msg) = queue.pop_front() {
1119 let id = msg.id.to_string();
1120 result.push(msg.clone());
1121 if let Some(child_msgs) = children.remove(&Some(id)) {
1122 queue.extend(child_msgs);
1123 }
1124 }
1125
1126 result
1127 }
1128}
1129
1130#[async_trait::async_trait]
1131impl Persistence for JsonlPersistence {
1132 fn name(&self) -> &str {
1133 "jsonl"
1134 }
1135
1136 async fn save(&self, session: &Session) -> SessionResult<()> {
1137 let project_path = Self::get_project_path(session);
1138 let file_path = self.session_file_path(&session.id, project_path.as_deref());
1139
1140 let (persisted_ids, prev_todos_hash, prev_plan_hash) = {
1142 let index = self.index.read().await;
1143 match index.sessions.get(&session.id) {
1144 Some(m) => (m.persisted_ids.clone(), m.todos_hash, m.plan_hash),
1145 None => (HashSet::new(), 0, 0),
1146 }
1147 };
1148
1149 let mut new_entries = Vec::new();
1150 let mut new_ids = HashSet::new();
1151
1152 new_entries.push(Self::session_to_meta_entry(session));
1153
1154 for msg in &session.messages {
1156 let id = msg.id.to_string();
1157 if !persisted_ids.contains(&id) {
1158 new_entries.push(JsonlEntry::from_message(&session.id, msg));
1159 new_ids.insert(id);
1160 }
1161 }
1162
1163 let current_todos_hash = Self::compute_todos_hash(&session.todos);
1165 let current_plan_hash = Self::compute_plan_hash(session.current_plan.as_ref());
1166
1167 if current_todos_hash != prev_todos_hash {
1169 for todo in &session.todos {
1170 new_entries.push(JsonlEntry::Todo(TodoEntry {
1171 id: todo.id.to_string(),
1172 session_id: session.id.to_string(),
1173 content: todo.content.clone(),
1174 active_form: todo.active_form.clone(),
1175 status: enum_to_jsonl(&todo.status, "pending"),
1176 plan_id: todo.plan_id.map(|id| id.to_string()),
1177 created_at: todo.created_at,
1178 started_at: todo.started_at,
1179 completed_at: todo.completed_at,
1180 }));
1181 }
1182 }
1183
1184 if current_plan_hash != prev_plan_hash
1186 && let Some(ref plan) = session.current_plan
1187 {
1188 new_entries.push(JsonlEntry::Plan(PlanEntry {
1189 id: plan.id.to_string(),
1190 session_id: session.id.to_string(),
1191 name: plan.name.clone(),
1192 content: plan.content.clone(),
1193 status: enum_to_jsonl(&plan.status, "draft"),
1194 error: plan.error.clone(),
1195 created_at: plan.created_at,
1196 approved_at: plan.approved_at,
1197 started_at: plan.started_at,
1198 completed_at: plan.completed_at,
1199 }));
1200 }
1201
1202 for compact in &session.compact_history {
1204 let compact_id = format!("compact:{}", compact.id);
1205 if !persisted_ids.contains(&compact_id) {
1206 new_entries.push(JsonlEntry::Compact(CompactEntry {
1207 id: compact.id.to_string(),
1208 session_id: session.id.to_string(),
1209 trigger: enum_to_jsonl(&compact.trigger, "manual"),
1210 pre_tokens: compact.pre_tokens,
1211 post_tokens: compact.post_tokens,
1212 saved_tokens: compact.saved_tokens,
1213 summary: compact.summary.clone(),
1214 original_count: compact.original_count,
1215 new_count: compact.new_count,
1216 logical_parent_id: compact.logical_parent_id.as_ref().map(|id| id.to_string()),
1217 created_at: compact.created_at,
1218 }));
1219 new_ids.insert(compact_id);
1220 }
1221 }
1222
1223 if new_entries.is_empty() {
1224 return Ok(());
1225 }
1226
1227 let path_clone = file_path.clone();
1229 let sync = self.config.sync_mode == SyncMode::OnWrite;
1230 tokio::task::spawn_blocking(move || append_entries_sync(&path_clone, &new_entries, sync))
1231 .await
1232 .map_err(|e| SessionError::Storage {
1233 message: format!("Task join error: {}", e),
1234 })??;
1235
1236 let mut index = self.index.write().await;
1237 let mut persisted = persisted_ids;
1238 persisted.extend(new_ids);
1239
1240 index.insert(
1241 session.id,
1242 SessionMeta {
1243 path: file_path,
1244 project_path,
1245 tenant_id: session.tenant_id.clone(),
1246 updated_at: session.updated_at,
1247 expires_at: session.expires_at,
1248 persisted_ids: persisted,
1249 todos_hash: current_todos_hash,
1250 plan_hash: current_plan_hash,
1251 },
1252 );
1253
1254 Ok(())
1255 }
1256
1257 async fn load(&self, id: &SessionId) -> SessionResult<Option<Session>> {
1258 let path = {
1259 let index = self.index.read().await;
1260 match index.sessions.get(id) {
1261 Some(m) => m.path.clone(),
1262 None => return Ok(None),
1263 }
1264 };
1265
1266 let entries = tokio::task::spawn_blocking(move || read_entries_sync(&path))
1267 .await
1268 .map_err(|e| SessionError::Storage {
1269 message: format!("Task join error: {}", e),
1270 })??;
1271
1272 if entries.is_empty() {
1273 return Ok(None);
1274 }
1275
1276 let session = Self::reconstruct_session(*id, entries);
1277 Ok(Some(session))
1278 }
1279
1280 async fn delete(&self, id: &SessionId) -> SessionResult<bool> {
1281 let meta = {
1282 let mut index = self.index.write().await;
1283 index.remove(id)
1284 };
1285
1286 let Some(meta) = meta else {
1287 return Ok(false);
1288 };
1289
1290 if meta.path.exists() {
1291 tokio::fs::remove_file(&meta.path)
1292 .await
1293 .map_err(|e| SessionError::Storage {
1294 message: format!("Failed to delete {}: {}", meta.path.display(), e),
1295 })?;
1296 }
1297
1298 self.summaries.write().await.remove(id);
1299 self.queue.write().await.remove(id);
1300 Ok(true)
1301 }
1302
1303 async fn list(&self, tenant_id: Option<&str>) -> SessionResult<Vec<SessionId>> {
1304 let index = self.index.read().await;
1305 Ok(match tenant_id {
1306 Some(tid) => index.by_tenant.get(tid).cloned().unwrap_or_default(),
1307 None => index.sessions.keys().copied().collect(),
1308 })
1309 }
1310
1311 async fn add_summary(&self, snapshot: SummarySnapshot) -> SessionResult<()> {
1312 let path = {
1313 let index = self.index.read().await;
1314 index
1315 .sessions
1316 .get(&snapshot.session_id)
1317 .map(|m| m.path.clone())
1318 };
1319
1320 if let Some(path) = path {
1321 let entry = JsonlEntry::Summary(SummaryEntry {
1322 session_id: snapshot.session_id.to_string(),
1323 summary: snapshot.summary.clone(),
1324 leaf_uuid: snapshot.leaf_message_id.as_ref().map(|id| id.to_string()),
1325 timestamp: snapshot.created_at,
1326 });
1327
1328 let sync = self.config.sync_mode == SyncMode::OnWrite;
1329 tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1330 .await
1331 .map_err(|e| SessionError::Storage {
1332 message: format!("Task join error: {}", e),
1333 })??;
1334 }
1335
1336 self.summaries
1337 .write()
1338 .await
1339 .entry(snapshot.session_id)
1340 .or_default()
1341 .push(snapshot);
1342
1343 Ok(())
1344 }
1345
1346 async fn get_summaries(&self, session_id: &SessionId) -> SessionResult<Vec<SummarySnapshot>> {
1347 Ok(self
1348 .summaries
1349 .read()
1350 .await
1351 .get(session_id)
1352 .cloned()
1353 .unwrap_or_default())
1354 }
1355
1356 async fn enqueue(
1357 &self,
1358 session_id: &SessionId,
1359 content: String,
1360 priority: i32,
1361 ) -> SessionResult<QueueItem> {
1362 let item = QueueItem::enqueue(*session_id, content.clone()).priority(priority);
1363
1364 let path = {
1365 let index = self.index.read().await;
1366 index.sessions.get(session_id).map(|m| m.path.clone())
1367 };
1368
1369 if let Some(path) = path {
1370 let entry = JsonlEntry::QueueOperation(QueueOperationEntry {
1371 operation: "enqueue".to_string(),
1372 session_id: session_id.to_string(),
1373 timestamp: Utc::now(),
1374 content,
1375 priority,
1376 item_id: item.id.to_string(),
1377 });
1378
1379 let sync = self.config.sync_mode == SyncMode::OnWrite;
1380 tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1381 .await
1382 .map_err(|e| SessionError::Storage {
1383 message: format!("Task join error: {}", e),
1384 })??;
1385 }
1386
1387 self.queue
1388 .write()
1389 .await
1390 .entry(*session_id)
1391 .or_default()
1392 .push(item.clone());
1393
1394 Ok(item)
1395 }
1396
1397 async fn dequeue(&self, session_id: &SessionId) -> SessionResult<Option<QueueItem>> {
1398 let dequeued = {
1399 let mut queue = self.queue.write().await;
1400 let items = match queue.get_mut(session_id) {
1401 Some(items) => items,
1402 None => return Ok(None),
1403 };
1404
1405 items.sort_by(|a, b| b.priority.cmp(&a.priority));
1406
1407 let mut result = None;
1408 for item in items.iter_mut() {
1409 if item.status == QueueStatus::Pending {
1410 item.start_processing();
1411 result = Some(item.clone());
1412 break;
1413 }
1414 }
1415 result
1416 };
1417
1418 if let Some(ref item) = dequeued {
1419 let path = {
1420 let index = self.index.read().await;
1421 index.sessions.get(session_id).map(|m| m.path.clone())
1422 };
1423
1424 if let Some(path) = path {
1425 let entry = JsonlEntry::QueueOperation(QueueOperationEntry {
1426 operation: "dequeue".to_string(),
1427 session_id: session_id.to_string(),
1428 timestamp: Utc::now(),
1429 content: item.content.clone(),
1430 priority: item.priority,
1431 item_id: item.id.to_string(),
1432 });
1433
1434 let sync = self.config.sync_mode == SyncMode::OnWrite;
1435 tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1436 .await
1437 .map_err(|e| SessionError::Storage {
1438 message: format!("Task join error: {}", e),
1439 })??;
1440 }
1441 }
1442
1443 Ok(dequeued)
1444 }
1445
1446 async fn cancel_queued(&self, item_id: Uuid) -> SessionResult<bool> {
1447 let cancelled = {
1448 let mut queue = self.queue.write().await;
1449 let mut found = None;
1450 for items in queue.values_mut() {
1451 if let Some(item) = items.iter_mut().find(|i| i.id == item_id) {
1452 item.cancel();
1453 found = Some(item.clone());
1454 break;
1455 }
1456 }
1457 found
1458 };
1459
1460 let Some(item) = cancelled else {
1461 return Ok(false);
1462 };
1463
1464 let path = {
1465 let index = self.index.read().await;
1466 index.sessions.get(&item.session_id).map(|m| m.path.clone())
1467 };
1468
1469 if let Some(path) = path {
1470 let entry = JsonlEntry::QueueOperation(QueueOperationEntry {
1471 operation: "cancel".to_string(),
1472 session_id: item.session_id.to_string(),
1473 timestamp: Utc::now(),
1474 content: item.content.clone(),
1475 priority: item.priority,
1476 item_id: item.id.to_string(),
1477 });
1478
1479 let sync = self.config.sync_mode == SyncMode::OnWrite;
1480 tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1481 .await
1482 .map_err(|e| SessionError::Storage {
1483 message: format!("Task join error: {}", e),
1484 })??;
1485 }
1486
1487 Ok(true)
1488 }
1489
1490 async fn pending_queue(&self, session_id: &SessionId) -> SessionResult<Vec<QueueItem>> {
1491 Ok(self
1492 .queue
1493 .read()
1494 .await
1495 .get(session_id)
1496 .map(|items| {
1497 items
1498 .iter()
1499 .filter(|i| i.status == QueueStatus::Pending)
1500 .cloned()
1501 .collect()
1502 })
1503 .unwrap_or_default())
1504 }
1505
1506 async fn cleanup_expired(&self) -> SessionResult<usize> {
1507 let now = Utc::now();
1508 let retention_cutoff = now - chrono::Duration::days(self.config.retention_days as i64);
1509
1510 let (expired_ids, expired_paths) = {
1511 let mut index = self.index.write().await;
1512 let expired_ids: Vec<SessionId> = index
1513 .sessions
1514 .iter()
1515 .filter(|(_, m)| {
1516 if let Some(expires_at) = m.expires_at {
1517 expires_at < now
1518 } else {
1519 m.updated_at < retention_cutoff
1520 }
1521 })
1522 .map(|(id, _)| *id)
1523 .collect();
1524
1525 let mut paths = Vec::with_capacity(expired_ids.len());
1526 for id in &expired_ids {
1527 if let Some(meta) = index.remove(id) {
1528 paths.push(meta.path);
1529 }
1530 }
1531 (expired_ids, paths)
1532 };
1533
1534 let count = expired_paths.len();
1535
1536 if !expired_ids.is_empty() {
1537 let mut summaries = self.summaries.write().await;
1538 let mut queue = self.queue.write().await;
1539 for id in &expired_ids {
1540 summaries.remove(id);
1541 queue.remove(id);
1542 }
1543 }
1544
1545 for path in expired_paths {
1546 let _ = tokio::fs::remove_file(&path).await;
1547 }
1548
1549 Ok(count)
1550 }
1551}
1552
1553#[cfg(test)]
1558mod tests {
1559 use super::*;
1560 use crate::types::ContentBlock;
1561 use tempfile::TempDir;
1562
1563 async fn create_test_persistence() -> (JsonlPersistence, TempDir) {
1564 let temp_dir = TempDir::new().unwrap();
1565 let config = JsonlConfig::builder()
1566 .base_dir(temp_dir.path().to_path_buf())
1567 .build();
1568 let persistence = JsonlPersistence::new(config).await.unwrap();
1569 (persistence, temp_dir)
1570 }
1571
1572 #[tokio::test]
1573 async fn test_save_and_load_session() {
1574 let (persistence, _temp) = create_test_persistence().await;
1575
1576 let mut session = Session::new(SessionConfig::default());
1577 session.add_message(SessionMessage::user(vec![ContentBlock::text("Hello")]));
1578 session.add_message(SessionMessage::assistant(vec![ContentBlock::text(
1579 "Hi there!",
1580 )]));
1581
1582 persistence.save(&session).await.unwrap();
1583
1584 let loaded = persistence.load(&session.id).await.unwrap().unwrap();
1585 assert_eq!(loaded.id, session.id);
1586 assert_eq!(loaded.messages.len(), 2);
1587 }
1588
1589 #[tokio::test]
1590 async fn test_incremental_save() {
1591 let (persistence, _temp) = create_test_persistence().await;
1592
1593 let mut session = Session::new(SessionConfig::default());
1594 session.add_message(SessionMessage::user(vec![ContentBlock::text("First")]));
1595 persistence.save(&session).await.unwrap();
1596
1597 session.add_message(SessionMessage::assistant(vec![ContentBlock::text(
1598 "Second",
1599 )]));
1600 persistence.save(&session).await.unwrap();
1601
1602 let loaded = persistence.load(&session.id).await.unwrap().unwrap();
1603 assert_eq!(loaded.messages.len(), 2);
1604 }
1605
1606 #[tokio::test]
1607 async fn test_delete_session() {
1608 let (persistence, _temp) = create_test_persistence().await;
1609
1610 let session = Session::new(SessionConfig::default());
1611 persistence.save(&session).await.unwrap();
1612
1613 assert!(persistence.delete(&session.id).await.unwrap());
1614 assert!(persistence.load(&session.id).await.unwrap().is_none());
1615 }
1616
1617 #[tokio::test]
1618 async fn test_list_sessions() {
1619 let (persistence, _temp) = create_test_persistence().await;
1620
1621 let s1 = Session::new(SessionConfig::default());
1622 let s2 = Session::new(SessionConfig::default());
1623
1624 persistence.save(&s1).await.unwrap();
1625 persistence.save(&s2).await.unwrap();
1626
1627 let list = persistence.list(None).await.unwrap();
1628 assert_eq!(list.len(), 2);
1629 }
1630
1631 #[tokio::test]
1632 async fn test_tenant_filtering() {
1633 let (persistence, _temp) = create_test_persistence().await;
1634
1635 let mut s1 = Session::new(SessionConfig::default());
1636 s1.tenant_id = Some("tenant-a".to_string());
1637
1638 let mut s2 = Session::new(SessionConfig::default());
1639 s2.tenant_id = Some("tenant-b".to_string());
1640
1641 persistence.save(&s1).await.unwrap();
1642 persistence.save(&s2).await.unwrap();
1643
1644 let list = persistence.list(Some("tenant-a")).await.unwrap();
1645 assert_eq!(list.len(), 1);
1646 assert_eq!(list[0], s1.id);
1647 }
1648
1649 #[tokio::test]
1650 async fn test_summaries() {
1651 let (persistence, _temp) = create_test_persistence().await;
1652
1653 let session = Session::new(SessionConfig::default());
1654 persistence.save(&session).await.unwrap();
1655
1656 persistence
1657 .add_summary(SummarySnapshot::new(session.id, "Summary 1"))
1658 .await
1659 .unwrap();
1660 persistence
1661 .add_summary(SummarySnapshot::new(session.id, "Summary 2"))
1662 .await
1663 .unwrap();
1664
1665 let summaries = persistence.get_summaries(&session.id).await.unwrap();
1666 assert_eq!(summaries.len(), 2);
1667 }
1668
1669 #[tokio::test]
1670 async fn test_queue_operations() {
1671 let (persistence, _temp) = create_test_persistence().await;
1672
1673 let session = Session::new(SessionConfig::default());
1674 persistence.save(&session).await.unwrap();
1675
1676 persistence
1677 .enqueue(&session.id, "Low priority".to_string(), 1)
1678 .await
1679 .unwrap();
1680 persistence
1681 .enqueue(&session.id, "High priority".to_string(), 10)
1682 .await
1683 .unwrap();
1684
1685 let next = persistence.dequeue(&session.id).await.unwrap().unwrap();
1686 assert_eq!(next.content, "High priority");
1687 }
1688
1689 #[tokio::test]
1690 async fn test_dag_reconstruction() {
1691 let (persistence, _temp) = create_test_persistence().await;
1692
1693 let mut session = Session::new(SessionConfig::default());
1694 session.add_message(SessionMessage::user(vec![ContentBlock::text("Q1")]));
1695 session.add_message(SessionMessage::assistant(vec![ContentBlock::text("A1")]));
1696 session.add_message(SessionMessage::user(vec![ContentBlock::text("Q2")]));
1697 session.add_message(SessionMessage::assistant(vec![ContentBlock::text("A2")]));
1698
1699 persistence.save(&session).await.unwrap();
1700
1701 let loaded = persistence.load(&session.id).await.unwrap().unwrap();
1702
1703 assert_eq!(loaded.messages.len(), 4);
1704 assert!(
1705 loaded.messages[0]
1706 .content
1707 .iter()
1708 .any(|c| c.as_text() == Some("Q1"))
1709 );
1710 assert!(
1711 loaded.messages[1]
1712 .content
1713 .iter()
1714 .any(|c| c.as_text() == Some("A1"))
1715 );
1716 assert!(
1717 loaded.messages[2]
1718 .content
1719 .iter()
1720 .any(|c| c.as_text() == Some("Q2"))
1721 );
1722 assert!(
1723 loaded.messages[3]
1724 .content
1725 .iter()
1726 .any(|c| c.as_text() == Some("A2"))
1727 );
1728 }
1729
1730 #[tokio::test]
1731 async fn test_project_path_encoding() {
1732 let config = JsonlConfig::default();
1733
1734 assert_eq!(
1735 config.encode_project_path(Path::new("/home/user/project")),
1736 "2f686f6d652f757365722f70726f6a656374"
1737 );
1738 assert_eq!(
1739 config.encode_project_path(Path::new("/Users/alice/work/app")),
1740 "2f55736572732f616c6963652f776f726b2f617070"
1741 );
1742 }
1743
1744 #[test]
1745 fn test_jsonl_entry_serialization() {
1746 let msg = SessionMessage::user(vec![ContentBlock::text("Hello")]);
1747 let session_id = SessionId::new();
1748 let entry = JsonlEntry::from_message(&session_id, &msg);
1749
1750 let json = serde_json::to_string(&entry).unwrap();
1751 assert!(json.contains("\"type\":\"user\""));
1752
1753 let parsed: JsonlEntry = serde_json::from_str(&json).unwrap();
1754 assert!(matches!(parsed, JsonlEntry::User(_)));
1755 }
1756
1757 #[tokio::test]
1758 async fn test_no_duplicate_writes() {
1759 let (persistence, _temp) = create_test_persistence().await;
1760
1761 let mut session = Session::new(SessionConfig::default());
1762 session.add_message(SessionMessage::user(vec![ContentBlock::text("Hello")]));
1763 persistence.save(&session).await.unwrap();
1764 persistence.save(&session).await.unwrap(); persistence.save(&session).await.unwrap(); let file_path = persistence.session_file_path(&session.id, None);
1769 let entries = read_entries_sync(&file_path).unwrap();
1770 let message_count = entries
1771 .iter()
1772 .filter(|e| e.message_uuid().is_some())
1773 .count();
1774 assert_eq!(message_count, 1, "Should not duplicate message entries");
1775 }
1776
1777 #[test]
1778 fn test_windows_path_encoding() {
1779 let config = JsonlConfig::default();
1780 let encoded = config.encode_project_path(Path::new("C:\\Users\\alice\\project"));
1781 assert!(encoded.chars().all(|c| c.is_ascii_hexdigit()));
1782 assert!(!encoded.is_empty());
1783 }
1784}