1use std::collections::{HashMap, HashSet};
25use std::hash::{Hash, Hasher};
26use std::io::{BufRead, BufReader, Write};
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29
30use chrono::{DateTime, Utc};
31use serde::{Deserialize, Serialize};
32use tokio::sync::RwLock;
33use uuid::Uuid;
34
35use super::state::{
36 MessageId, Session, SessionConfig, SessionId, SessionMessage, SessionMode, SessionState,
37 SessionType,
38};
39use super::types::{
40 CompactRecord, CompactTrigger, EnvironmentContext, Plan, PlanStatus, QueueItem, QueueStatus,
41 SummarySnapshot, TodoItem, TodoStatus,
42};
43use super::{Persistence, SessionError, SessionResult};
44use crate::types::{ContentBlock, Role, TokenUsage};
45
46#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
52pub enum SyncMode {
53 #[default]
55 None,
56 OnWrite,
58}
59
60#[derive(Clone, Debug)]
62pub struct JsonlConfig {
63 pub base_dir: PathBuf,
65 pub retention_days: u32,
67 pub sync_mode: SyncMode,
69}
70
71impl Default for JsonlConfig {
72 fn default() -> Self {
73 Self {
74 base_dir: dirs::home_dir()
75 .unwrap_or_else(|| PathBuf::from("."))
76 .join(".claude"),
77 retention_days: 30,
78 sync_mode: SyncMode::default(),
79 }
80 }
81}
82
83impl JsonlConfig {
84 pub fn builder() -> JsonlConfigBuilder {
85 JsonlConfigBuilder::default()
86 }
87
88 fn projects_dir(&self) -> PathBuf {
89 self.base_dir.join("projects")
90 }
91
92 fn encode_project_path(&self, path: &Path) -> String {
95 path.to_string_lossy()
96 .replace(['/', '\\'], "-")
97 .replace(':', "_") }
99
100 fn project_dir(&self, project_path: &Path) -> PathBuf {
101 self.projects_dir()
102 .join(self.encode_project_path(project_path))
103 }
104}
105
106#[derive(Default)]
108pub struct JsonlConfigBuilder {
109 base_dir: Option<PathBuf>,
110 retention_days: Option<u32>,
111 sync_mode: Option<SyncMode>,
112}
113
114impl JsonlConfigBuilder {
115 pub fn base_dir(mut self, path: impl Into<PathBuf>) -> Self {
116 self.base_dir = Some(path.into());
117 self
118 }
119
120 pub fn retention_days(mut self, days: u32) -> Self {
121 self.retention_days = Some(days);
122 self
123 }
124
125 pub fn sync_mode(mut self, mode: SyncMode) -> Self {
126 self.sync_mode = Some(mode);
127 self
128 }
129
130 pub fn build(self) -> JsonlConfig {
131 let default = JsonlConfig::default();
132 JsonlConfig {
133 base_dir: self.base_dir.unwrap_or(default.base_dir),
134 retention_days: self.retention_days.unwrap_or(default.retention_days),
135 sync_mode: self.sync_mode.unwrap_or(default.sync_mode),
136 }
137 }
138}
139
140#[derive(Clone, Debug, Serialize, Deserialize)]
146#[serde(tag = "type", rename_all = "snake_case")]
147pub enum JsonlEntry {
148 User(UserEntry),
149 Assistant(AssistantEntry),
150 System(SystemEntry),
151 QueueOperation(QueueOperationEntry),
152 Summary(SummaryEntry),
153 SessionMeta(SessionMetaEntry),
154 Todo(TodoEntry),
155 Plan(PlanEntry),
156 Compact(CompactEntry),
157}
158
159#[derive(Clone, Debug, Serialize, Deserialize)]
161pub struct EntryCommon {
162 pub uuid: String,
163 #[serde(rename = "parentUuid", skip_serializing_if = "Option::is_none")]
164 pub parent_uuid: Option<String>,
165 #[serde(rename = "sessionId")]
166 pub session_id: String,
167 pub timestamp: DateTime<Utc>,
168 #[serde(skip_serializing_if = "Option::is_none")]
169 pub cwd: Option<PathBuf>,
170 pub version: String,
171 #[serde(rename = "gitBranch", default)]
172 pub git_branch: String,
173 #[serde(rename = "isSidechain", default)]
174 pub is_sidechain: bool,
175}
176
177impl EntryCommon {
178 fn from_message(session_id: &SessionId, msg: &SessionMessage) -> Self {
179 Self {
180 uuid: msg.id.to_string(),
181 parent_uuid: msg.parent_id.as_ref().map(|id| id.to_string()),
182 session_id: session_id.to_string(),
183 timestamp: msg.timestamp,
184 cwd: msg.environment.as_ref().and_then(|e| e.cwd.clone()),
185 version: env!("CARGO_PKG_VERSION").to_string(),
186 git_branch: msg
187 .environment
188 .as_ref()
189 .and_then(|e| e.git_branch.clone())
190 .unwrap_or_default(),
191 is_sidechain: msg.is_sidechain,
192 }
193 }
194
195 fn to_environment(&self) -> EnvironmentContext {
196 EnvironmentContext {
197 cwd: self.cwd.clone(),
198 git_branch: if self.git_branch.is_empty() {
199 None
200 } else {
201 Some(self.git_branch.clone())
202 },
203 ..Default::default()
204 }
205 }
206}
207
208#[derive(Clone, Debug, Serialize, Deserialize)]
209pub struct UserMessageContent {
210 pub role: String,
211 pub content: serde_json::Value,
212}
213
214#[derive(Clone, Debug, Serialize, Deserialize)]
215pub struct UserEntry {
216 #[serde(flatten)]
217 pub common: EntryCommon,
218 pub message: UserMessageContent,
219 #[serde(rename = "isCompactSummary", default)]
220 pub is_compact_summary: bool,
221}
222
223#[derive(Clone, Debug, Serialize, Deserialize)]
224pub struct AssistantMessageContent {
225 #[serde(skip_serializing_if = "Option::is_none")]
226 pub id: Option<String>,
227 #[serde(skip_serializing_if = "Option::is_none")]
228 pub model: Option<String>,
229 pub role: String,
230 #[serde(skip_serializing_if = "Option::is_none")]
231 pub stop_reason: Option<String>,
232 pub content: serde_json::Value,
233 #[serde(skip_serializing_if = "Option::is_none")]
234 pub usage: Option<UsageInfo>,
235}
236
237#[derive(Clone, Debug, Default, Serialize, Deserialize)]
238pub struct UsageInfo {
239 pub input_tokens: u64,
240 pub output_tokens: u64,
241 #[serde(default)]
242 pub cache_creation_input_tokens: u64,
243 #[serde(default)]
244 pub cache_read_input_tokens: u64,
245}
246
247impl From<&TokenUsage> for UsageInfo {
248 fn from(u: &TokenUsage) -> Self {
249 Self {
250 input_tokens: u.input_tokens,
251 output_tokens: u.output_tokens,
252 cache_creation_input_tokens: u.cache_creation_input_tokens,
253 cache_read_input_tokens: u.cache_read_input_tokens,
254 }
255 }
256}
257
258impl From<&UsageInfo> for TokenUsage {
259 fn from(u: &UsageInfo) -> Self {
260 Self {
261 input_tokens: u.input_tokens,
262 output_tokens: u.output_tokens,
263 cache_creation_input_tokens: u.cache_creation_input_tokens,
264 cache_read_input_tokens: u.cache_read_input_tokens,
265 }
266 }
267}
268
269#[derive(Clone, Debug, Serialize, Deserialize)]
270pub struct AssistantEntry {
271 #[serde(flatten)]
272 pub common: EntryCommon,
273 pub message: AssistantMessageContent,
274 #[serde(rename = "requestId", skip_serializing_if = "Option::is_none")]
275 pub request_id: Option<String>,
276}
277
278#[derive(Clone, Debug, Serialize, Deserialize)]
279pub struct SystemEntry {
280 #[serde(flatten)]
281 pub common: EntryCommon,
282 pub subtype: String,
283 #[serde(skip_serializing_if = "Option::is_none")]
284 pub content: Option<String>,
285}
286
287#[derive(Clone, Debug, Serialize, Deserialize)]
288pub struct QueueOperationEntry {
289 pub operation: String,
290 #[serde(rename = "sessionId")]
291 pub session_id: String,
292 pub timestamp: DateTime<Utc>,
293 pub content: String,
294 pub priority: i32,
295 #[serde(rename = "itemId")]
296 pub item_id: String,
297}
298
299#[derive(Clone, Debug, Serialize, Deserialize)]
300pub struct SummaryEntry {
301 #[serde(rename = "sessionId")]
302 pub session_id: String,
303 pub summary: String,
304 #[serde(rename = "leafUuid", skip_serializing_if = "Option::is_none")]
305 pub leaf_uuid: Option<String>,
306 pub timestamp: DateTime<Utc>,
307}
308
309#[derive(Clone, Debug, Serialize, Deserialize)]
310pub struct SessionMetaEntry {
311 #[serde(rename = "sessionId")]
312 pub session_id: String,
313 #[serde(rename = "parentSessionId", skip_serializing_if = "Option::is_none")]
314 pub parent_session_id: Option<String>,
315 #[serde(rename = "tenantId", skip_serializing_if = "Option::is_none")]
316 pub tenant_id: Option<String>,
317 #[serde(rename = "sessionType")]
318 pub session_type: serde_json::Value,
319 pub mode: String,
320 pub state: String,
321 pub config: serde_json::Value,
322 #[serde(rename = "permissionPolicy")]
323 pub permission_policy: serde_json::Value,
324 #[serde(rename = "totalUsage", default)]
325 pub total_usage: UsageInfo,
326 #[serde(rename = "totalCostUsd", default)]
327 pub total_cost_usd: f64,
328 #[serde(rename = "staticContextHash", skip_serializing_if = "Option::is_none")]
329 pub static_context_hash: Option<String>,
330 #[serde(skip_serializing_if = "Option::is_none")]
331 pub error: Option<String>,
332 #[serde(rename = "createdAt")]
333 pub created_at: DateTime<Utc>,
334 #[serde(rename = "updatedAt")]
335 pub updated_at: DateTime<Utc>,
336 #[serde(rename = "expiresAt", skip_serializing_if = "Option::is_none")]
337 pub expires_at: Option<DateTime<Utc>>,
338}
339
340#[derive(Clone, Debug, Serialize, Deserialize)]
341pub struct TodoEntry {
342 pub id: String,
343 #[serde(rename = "sessionId")]
344 pub session_id: String,
345 pub content: String,
346 #[serde(rename = "activeForm")]
347 pub active_form: String,
348 pub status: String,
349 #[serde(rename = "planId", skip_serializing_if = "Option::is_none")]
350 pub plan_id: Option<String>,
351 #[serde(rename = "createdAt")]
352 pub created_at: DateTime<Utc>,
353 #[serde(rename = "startedAt", skip_serializing_if = "Option::is_none")]
354 pub started_at: Option<DateTime<Utc>>,
355 #[serde(rename = "completedAt", skip_serializing_if = "Option::is_none")]
356 pub completed_at: Option<DateTime<Utc>>,
357}
358
359#[derive(Clone, Debug, Serialize, Deserialize)]
360pub struct PlanEntry {
361 pub id: String,
362 #[serde(rename = "sessionId")]
363 pub session_id: String,
364 #[serde(skip_serializing_if = "Option::is_none")]
365 pub name: Option<String>,
366 pub content: String,
367 pub status: String,
368 #[serde(skip_serializing_if = "Option::is_none")]
369 pub error: Option<String>,
370 #[serde(rename = "createdAt")]
371 pub created_at: DateTime<Utc>,
372 #[serde(rename = "approvedAt", skip_serializing_if = "Option::is_none")]
373 pub approved_at: Option<DateTime<Utc>>,
374 #[serde(rename = "startedAt", skip_serializing_if = "Option::is_none")]
375 pub started_at: Option<DateTime<Utc>>,
376 #[serde(rename = "completedAt", skip_serializing_if = "Option::is_none")]
377 pub completed_at: Option<DateTime<Utc>>,
378}
379
380#[derive(Clone, Debug, Serialize, Deserialize)]
381pub struct CompactEntry {
382 pub id: String,
383 #[serde(rename = "sessionId")]
384 pub session_id: String,
385 pub trigger: String,
386 #[serde(rename = "preTokens")]
387 pub pre_tokens: usize,
388 #[serde(rename = "postTokens")]
389 pub post_tokens: usize,
390 #[serde(rename = "savedTokens")]
391 pub saved_tokens: usize,
392 pub summary: String,
393 #[serde(rename = "originalCount")]
394 pub original_count: usize,
395 #[serde(rename = "newCount")]
396 pub new_count: usize,
397 #[serde(rename = "logicalParentId", skip_serializing_if = "Option::is_none")]
398 pub logical_parent_id: Option<String>,
399 #[serde(rename = "createdAt")]
400 pub created_at: DateTime<Utc>,
401}
402
403impl JsonlEntry {
408 fn from_message(session_id: &SessionId, msg: &SessionMessage) -> Self {
409 let common = EntryCommon::from_message(session_id, msg);
410
411 match msg.role {
412 Role::User => JsonlEntry::User(UserEntry {
413 common,
414 message: UserMessageContent {
415 role: "user".to_string(),
416 content: serde_json::to_value(&msg.content).unwrap_or_default(),
417 },
418 is_compact_summary: msg.is_compact_summary,
419 }),
420 Role::Assistant => JsonlEntry::Assistant(AssistantEntry {
421 common,
422 message: AssistantMessageContent {
423 id: msg.metadata.request_id.clone(),
424 model: msg.metadata.model.clone(),
425 role: "assistant".to_string(),
426 stop_reason: None,
427 content: serde_json::to_value(&msg.content).unwrap_or_default(),
428 usage: msg.usage.as_ref().map(UsageInfo::from),
429 },
430 request_id: msg.metadata.request_id.clone(),
431 }),
432 }
433 }
434
435 fn to_session_message(&self) -> Option<SessionMessage> {
436 match self {
437 JsonlEntry::User(entry) => {
438 let content: Vec<ContentBlock> =
439 serde_json::from_value(entry.message.content.clone()).unwrap_or_default();
440 let mut msg = SessionMessage::user(content);
441 msg.id = MessageId::from_string(&entry.common.uuid);
442 msg.parent_id = entry
443 .common
444 .parent_uuid
445 .as_ref()
446 .map(MessageId::from_string);
447 msg.timestamp = entry.common.timestamp;
448 msg.is_sidechain = entry.common.is_sidechain;
449 msg.is_compact_summary = entry.is_compact_summary;
450 msg.environment = Some(entry.common.to_environment());
451 Some(msg)
452 }
453 JsonlEntry::Assistant(entry) => {
454 let content: Vec<ContentBlock> =
455 serde_json::from_value(entry.message.content.clone()).unwrap_or_default();
456 let mut msg = SessionMessage::assistant(content);
457 msg.id = MessageId::from_string(&entry.common.uuid);
458 msg.parent_id = entry
459 .common
460 .parent_uuid
461 .as_ref()
462 .map(MessageId::from_string);
463 msg.timestamp = entry.common.timestamp;
464 msg.is_sidechain = entry.common.is_sidechain;
465 msg.usage = entry.message.usage.as_ref().map(TokenUsage::from);
466 msg.metadata.model.clone_from(&entry.message.model);
467 msg.metadata.request_id.clone_from(&entry.request_id);
468 msg.environment = Some(entry.common.to_environment());
469 Some(msg)
470 }
471 _ => None,
472 }
473 }
474
475 #[cfg(test)]
476 fn message_uuid(&self) -> Option<&str> {
477 match self {
478 JsonlEntry::User(e) => Some(&e.common.uuid),
479 JsonlEntry::Assistant(e) => Some(&e.common.uuid),
480 _ => None,
481 }
482 }
483}
484
485#[derive(Clone, Debug)]
490struct SessionMeta {
491 path: PathBuf,
492 project_path: Option<PathBuf>,
493 tenant_id: Option<String>,
494 parent_id: Option<SessionId>,
495 updated_at: DateTime<Utc>,
496 persisted_ids: HashSet<String>,
498 todos_hash: u64,
500 plan_hash: u64,
502}
503
504#[derive(Default)]
505struct SessionIndex {
506 sessions: HashMap<SessionId, SessionMeta>,
507 by_project: HashMap<PathBuf, Vec<SessionId>>,
508 by_tenant: HashMap<String, Vec<SessionId>>,
509 by_parent: HashMap<SessionId, Vec<SessionId>>,
510}
511
512impl SessionIndex {
513 fn insert(&mut self, session_id: SessionId, meta: SessionMeta) {
514 self.remove(&session_id);
516
517 if let Some(ref project) = meta.project_path {
518 self.by_project
519 .entry(project.clone())
520 .or_default()
521 .push(session_id);
522 }
523 if let Some(ref tenant) = meta.tenant_id {
524 self.by_tenant
525 .entry(tenant.clone())
526 .or_default()
527 .push(session_id);
528 }
529 if let Some(parent) = meta.parent_id {
530 self.by_parent.entry(parent).or_default().push(session_id);
531 }
532 self.sessions.insert(session_id, meta);
533 }
534
535 fn remove(&mut self, session_id: &SessionId) -> Option<SessionMeta> {
536 let meta = self.sessions.remove(session_id)?;
537
538 if let Some(ref project) = meta.project_path
539 && let Some(ids) = self.by_project.get_mut(project)
540 {
541 ids.retain(|id| id != session_id);
542 }
543 if let Some(ref tenant) = meta.tenant_id
544 && let Some(ids) = self.by_tenant.get_mut(tenant)
545 {
546 ids.retain(|id| id != session_id);
547 }
548 if let Some(parent) = meta.parent_id
549 && let Some(ids) = self.by_parent.get_mut(&parent)
550 {
551 ids.retain(|id| id != session_id);
552 }
553 Some(meta)
554 }
555}
556
557fn read_entries_sync(path: &Path) -> SessionResult<Vec<JsonlEntry>> {
562 if !path.exists() {
563 return Ok(Vec::new());
564 }
565
566 let file = std::fs::File::open(path).map_err(|e| SessionError::Storage {
567 message: format!("Failed to open {}: {}", path.display(), e),
568 })?;
569
570 let reader = BufReader::with_capacity(64 * 1024, file);
571 let mut entries = Vec::with_capacity(128);
572
573 for (line_num, line) in reader.lines().enumerate() {
574 let line = line.map_err(|e| SessionError::Storage {
575 message: format!("Read error at line {}: {}", line_num + 1, e),
576 })?;
577
578 if line.trim().is_empty() {
579 continue;
580 }
581
582 match serde_json::from_str::<JsonlEntry>(&line) {
583 Ok(entry) => entries.push(entry),
584 Err(e) => {
585 tracing::warn!(
586 path = %path.display(),
587 line = line_num + 1,
588 error = %e,
589 "Skipping malformed JSONL entry"
590 );
591 }
592 }
593 }
594
595 Ok(entries)
596}
597
598fn append_entries_sync(path: &Path, entries: &[JsonlEntry], sync: bool) -> SessionResult<()> {
599 if entries.is_empty() {
600 return Ok(());
601 }
602
603 if let Some(parent) = path.parent() {
604 std::fs::create_dir_all(parent).map_err(|e| SessionError::Storage {
605 message: format!("Failed to create directory {}: {}", parent.display(), e),
606 })?;
607 }
608
609 let file = std::fs::OpenOptions::new()
610 .create(true)
611 .append(true)
612 .open(path)
613 .map_err(|e| SessionError::Storage {
614 message: format!("Failed to open {} for writing: {}", path.display(), e),
615 })?;
616
617 let mut writer = std::io::BufWriter::with_capacity(64 * 1024, file);
618
619 for entry in entries {
620 serde_json::to_writer(&mut writer, entry)?;
621 writeln!(writer).map_err(|e| SessionError::Storage {
622 message: format!("Write failed: {}", e),
623 })?;
624 }
625
626 writer.flush().map_err(|e| SessionError::Storage {
627 message: format!("Flush failed: {}", e),
628 })?;
629
630 if sync {
631 writer
632 .into_inner()
633 .map_err(|e| SessionError::Storage {
634 message: format!("Buffer error: {}", e.error()),
635 })?
636 .sync_all()
637 .map_err(|e| SessionError::Storage {
638 message: format!("Sync failed: {}", e),
639 })?;
640 }
641
642 Ok(())
643}
644
645pub struct JsonlPersistence {
650 config: JsonlConfig,
651 index: Arc<RwLock<SessionIndex>>,
652 summaries: Arc<RwLock<HashMap<SessionId, Vec<SummarySnapshot>>>>,
653 queue: Arc<RwLock<HashMap<SessionId, Vec<QueueItem>>>>,
654}
655
656impl JsonlPersistence {
657 pub async fn new(config: JsonlConfig) -> SessionResult<Self> {
658 tokio::fs::create_dir_all(config.projects_dir())
659 .await
660 .map_err(|e| SessionError::Storage {
661 message: format!("Failed to create projects directory: {}", e),
662 })?;
663
664 let persistence = Self {
665 config,
666 index: Arc::new(RwLock::new(SessionIndex::default())),
667 summaries: Arc::new(RwLock::new(HashMap::new())),
668 queue: Arc::new(RwLock::new(HashMap::new())),
669 };
670
671 persistence.rebuild_index().await?;
672 Ok(persistence)
673 }
674
675 pub async fn default_config() -> SessionResult<Self> {
676 Self::new(JsonlConfig::default()).await
677 }
678
679 async fn rebuild_index(&self) -> SessionResult<()> {
680 let projects_dir = self.config.projects_dir();
681 if !projects_dir.exists() {
682 return Ok(());
683 }
684
685 let mut index = self.index.write().await;
686 let mut summaries = self.summaries.write().await;
687
688 let mut entries =
689 tokio::fs::read_dir(&projects_dir)
690 .await
691 .map_err(|e| SessionError::Storage {
692 message: format!("Failed to read projects dir: {}", e),
693 })?;
694
695 while let Some(project_entry) =
696 entries
697 .next_entry()
698 .await
699 .map_err(|e| SessionError::Storage {
700 message: format!("Failed to read entry: {}", e),
701 })?
702 {
703 let file_type = project_entry.file_type().await.ok();
704 if !file_type.map(|t| t.is_dir()).unwrap_or(false) {
705 continue;
706 }
707
708 let project_path = project_entry.path();
709 let mut files =
710 tokio::fs::read_dir(&project_path)
711 .await
712 .map_err(|e| SessionError::Storage {
713 message: format!("Failed to read project dir: {}", e),
714 })?;
715
716 while let Some(file_entry) =
717 files
718 .next_entry()
719 .await
720 .map_err(|e| SessionError::Storage {
721 message: format!("Failed to read file entry: {}", e),
722 })?
723 {
724 let file_path = file_entry.path();
725 if file_path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
726 continue;
727 }
728
729 let session_id = match file_path
730 .file_stem()
731 .and_then(|s| s.to_str())
732 .and_then(SessionId::parse)
733 {
734 Some(id) => id,
735 None => continue,
736 };
737
738 let path_clone = file_path.clone();
740 let parsed = tokio::task::spawn_blocking(move || read_entries_sync(&path_clone))
741 .await
742 .map_err(|e| SessionError::Storage {
743 message: format!("Task join error: {}", e),
744 })??;
745
746 let (meta, session_summaries) =
747 Self::parse_file_metadata(session_id, file_path, &parsed);
748
749 index.insert(session_id, meta);
750 if !session_summaries.is_empty() {
751 summaries.insert(session_id, session_summaries);
752 }
753 }
754 }
755
756 Ok(())
757 }
758
759 fn parse_file_metadata(
760 session_id: SessionId,
761 path: PathBuf,
762 entries: &[JsonlEntry],
763 ) -> (SessionMeta, Vec<SummarySnapshot>) {
764 let mut project_path: Option<PathBuf> = None;
765 let mut tenant_id: Option<String> = None;
766 let mut parent_id: Option<SessionId> = None;
767 let mut updated_at = Utc::now();
768 let mut summaries = Vec::new();
769 let mut persisted_ids = HashSet::with_capacity(entries.len());
770
771 for entry in entries {
772 match entry {
773 JsonlEntry::User(e) => {
774 if project_path.is_none() {
775 project_path = e.common.cwd.clone();
776 }
777 updated_at = e.common.timestamp;
778 persisted_ids.insert(e.common.uuid.clone());
779 }
780 JsonlEntry::Assistant(e) => {
781 if project_path.is_none() {
782 project_path = e.common.cwd.clone();
783 }
784 updated_at = e.common.timestamp;
785 persisted_ids.insert(e.common.uuid.clone());
786 }
787 JsonlEntry::SessionMeta(m) => {
788 tenant_id.clone_from(&m.tenant_id);
789 parent_id = m
790 .parent_session_id
791 .as_ref()
792 .and_then(|s| SessionId::parse(s));
793 updated_at = m.updated_at;
794 }
795 JsonlEntry::Summary(s) => {
796 summaries.push(SummarySnapshot {
797 id: Uuid::new_v4(),
798 session_id,
799 summary: s.summary.clone(),
800 leaf_message_id: s.leaf_uuid.as_ref().map(MessageId::from_string),
801 created_at: s.timestamp,
802 });
803 }
804 _ => {}
805 }
806 }
807
808 (
809 SessionMeta {
810 path,
811 project_path,
812 tenant_id,
813 parent_id,
814 updated_at,
815 persisted_ids,
816 todos_hash: 0, plan_hash: 0, },
819 summaries,
820 )
821 }
822
823 fn session_file_path(&self, session_id: &SessionId, project_path: Option<&Path>) -> PathBuf {
824 let dir = match project_path {
825 Some(p) => self.config.project_dir(p),
826 None => self.config.projects_dir().join("_default"),
827 };
828 dir.join(format!("{}.jsonl", session_id))
829 }
830
831 fn get_project_path(session: &Session) -> Option<PathBuf> {
832 session
833 .messages
834 .first()
835 .and_then(|m| m.environment.as_ref())
836 .and_then(|e| e.cwd.clone())
837 }
838
839 fn compute_todos_hash(todos: &[TodoItem]) -> u64 {
841 let mut hasher = std::collections::hash_map::DefaultHasher::new();
842 for todo in todos {
843 todo.id.hash(&mut hasher);
844 format!("{:?}", todo.status).hash(&mut hasher);
845 todo.content.hash(&mut hasher);
846 }
847 hasher.finish()
848 }
849
850 fn compute_plan_hash(plan: Option<&Plan>) -> u64 {
852 let mut hasher = std::collections::hash_map::DefaultHasher::new();
853 if let Some(p) = plan {
854 p.id.hash(&mut hasher);
855 format!("{:?}", p.status).hash(&mut hasher);
856 p.content.hash(&mut hasher);
857 }
858 hasher.finish()
859 }
860
861 fn session_to_meta_entry(session: &Session) -> JsonlEntry {
862 JsonlEntry::SessionMeta(SessionMetaEntry {
863 session_id: session.id.to_string(),
864 parent_session_id: session.parent_id.map(|p| p.to_string()),
865 tenant_id: session.tenant_id.clone(),
866 session_type: serde_json::to_value(&session.session_type).unwrap_or_default(),
867 mode: format!("{:?}", session.mode).to_lowercase(),
868 state: format!("{:?}", session.state).to_lowercase(),
869 config: serde_json::to_value(&session.config).unwrap_or_default(),
870 permission_policy: serde_json::to_value(&session.permission_policy).unwrap_or_default(),
871 total_usage: UsageInfo::from(&session.total_usage),
872 total_cost_usd: session.total_cost_usd,
873 static_context_hash: session.static_context_hash.clone(),
874 error: session.error.clone(),
875 created_at: session.created_at,
876 updated_at: session.updated_at,
877 expires_at: session.expires_at,
878 })
879 }
880
881 fn reconstruct_session(session_id: SessionId, entries: Vec<JsonlEntry>) -> Session {
882 let mut session = Session::new(SessionConfig::default());
883 session.id = session_id;
884
885 let mut messages: HashMap<String, SessionMessage> = HashMap::with_capacity(entries.len());
886 let mut todos_map: HashMap<String, TodoItem> = HashMap::new();
887 let mut latest_plan: Option<Plan> = None;
888 let mut compacts: Vec<CompactRecord> = Vec::new();
889
890 for entry in entries {
891 match entry {
892 JsonlEntry::User(_) | JsonlEntry::Assistant(_) => {
893 if let Some(msg) = entry.to_session_message() {
894 messages.insert(msg.id.to_string(), msg);
895 }
896 }
897 JsonlEntry::SessionMeta(m) => {
898 session.tenant_id = m.tenant_id;
899 session.parent_id = m
900 .parent_session_id
901 .as_ref()
902 .and_then(|s| SessionId::parse(s));
903 session.session_type =
904 serde_json::from_value(m.session_type).unwrap_or(SessionType::Main);
905 session.mode = serde_json::from_str(&format!("\"{}\"", m.mode))
906 .unwrap_or(SessionMode::default());
907 session.state = SessionState::from_str_lenient(&m.state);
908 session.config = serde_json::from_value(m.config).unwrap_or_default();
909 session.permission_policy =
910 serde_json::from_value(m.permission_policy).unwrap_or_default();
911 session.total_usage = TokenUsage::from(&m.total_usage);
912 session.total_cost_usd = m.total_cost_usd;
913 session.static_context_hash = m.static_context_hash;
914 session.error = m.error;
915 session.created_at = m.created_at;
916 session.updated_at = m.updated_at;
917 session.expires_at = m.expires_at;
918 }
919 JsonlEntry::Summary(s) => {
920 session.summary = Some(s.summary);
921 }
922 JsonlEntry::Todo(t) => {
923 let todo = TodoItem {
924 id: Uuid::parse_str(&t.id).unwrap_or_else(|_| Uuid::new_v4()),
925 session_id,
926 content: t.content,
927 active_form: t.active_form,
928 status: TodoStatus::from_str_lenient(&t.status),
929 plan_id: t.plan_id.and_then(|s| Uuid::parse_str(&s).ok()),
930 created_at: t.created_at,
931 started_at: t.started_at,
932 completed_at: t.completed_at,
933 };
934 todos_map.insert(t.id, todo);
936 }
937 JsonlEntry::Plan(p) => {
938 let plan = Plan {
939 id: Uuid::parse_str(&p.id).unwrap_or_else(|_| Uuid::new_v4()),
940 session_id,
941 name: p.name,
942 content: p.content,
943 status: PlanStatus::from_str_lenient(&p.status),
944 error: p.error,
945 created_at: p.created_at,
946 approved_at: p.approved_at,
947 started_at: p.started_at,
948 completed_at: p.completed_at,
949 };
950 latest_plan = Some(plan);
952 }
953 JsonlEntry::Compact(c) => {
954 compacts.push(CompactRecord {
955 id: Uuid::parse_str(&c.id).unwrap_or_else(|_| Uuid::new_v4()),
956 session_id,
957 trigger: CompactTrigger::from_str_lenient(&c.trigger),
958 pre_tokens: c.pre_tokens,
959 post_tokens: c.post_tokens,
960 saved_tokens: c.saved_tokens,
961 summary: c.summary,
962 original_count: c.original_count,
963 new_count: c.new_count,
964 logical_parent_id: c.logical_parent_id.as_ref().map(MessageId::from_string),
965 created_at: c.created_at,
966 });
967 }
968 _ => {}
969 }
970 }
971
972 let ordered = Self::topological_sort(&messages);
974 session.messages = Vec::with_capacity(ordered.len());
975 for msg in ordered {
976 session.add_message(msg);
977 }
978
979 session.todos = todos_map.into_values().collect();
981 session
982 .todos
983 .sort_by(|a, b| a.created_at.cmp(&b.created_at));
984 session.current_plan = latest_plan;
985 session.compact_history = compacts;
986
987 session
988 }
989
990 fn topological_sort(messages: &HashMap<String, SessionMessage>) -> Vec<SessionMessage> {
991 if messages.is_empty() {
992 return Vec::new();
993 }
994
995 let mut children: HashMap<Option<String>, Vec<&SessionMessage>> = HashMap::new();
997 for msg in messages.values() {
998 children
999 .entry(msg.parent_id.as_ref().map(|p| p.to_string()))
1000 .or_default()
1001 .push(msg);
1002 }
1003
1004 for group in children.values_mut() {
1006 group.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
1007 }
1008
1009 let mut result = Vec::with_capacity(messages.len());
1011 let mut queue = std::collections::VecDeque::new();
1012
1013 if let Some(roots) = children.remove(&None) {
1015 queue.extend(roots);
1016 }
1017
1018 while let Some(msg) = queue.pop_front() {
1019 let id = msg.id.to_string();
1020 result.push(msg.clone());
1021 if let Some(child_msgs) = children.remove(&Some(id)) {
1022 queue.extend(child_msgs);
1023 }
1024 }
1025
1026 result
1027 }
1028}
1029
1030#[async_trait::async_trait]
1031impl Persistence for JsonlPersistence {
1032 fn name(&self) -> &str {
1033 "jsonl"
1034 }
1035
1036 async fn save(&self, session: &Session) -> SessionResult<()> {
1037 let project_path = Self::get_project_path(session);
1038 let file_path = self.session_file_path(&session.id, project_path.as_deref());
1039
1040 let (persisted_ids, prev_todos_hash, prev_plan_hash) = {
1042 let index = self.index.read().await;
1043 match index.sessions.get(&session.id) {
1044 Some(m) => (m.persisted_ids.clone(), m.todos_hash, m.plan_hash),
1045 None => (HashSet::new(), 0, 0),
1046 }
1047 };
1048
1049 let mut new_entries = Vec::new();
1050 let mut new_ids = HashSet::new();
1051 let is_first_save = persisted_ids.is_empty();
1052
1053 if is_first_save {
1055 new_entries.push(Self::session_to_meta_entry(session));
1056 }
1057
1058 for msg in &session.messages {
1060 let id = msg.id.to_string();
1061 if !persisted_ids.contains(&id) {
1062 new_entries.push(JsonlEntry::from_message(&session.id, msg));
1063 new_ids.insert(id);
1064 }
1065 }
1066
1067 let current_todos_hash = Self::compute_todos_hash(&session.todos);
1069 let current_plan_hash = Self::compute_plan_hash(session.current_plan.as_ref());
1070
1071 if current_todos_hash != prev_todos_hash {
1073 for todo in &session.todos {
1074 new_entries.push(JsonlEntry::Todo(TodoEntry {
1075 id: todo.id.to_string(),
1076 session_id: session.id.to_string(),
1077 content: todo.content.clone(),
1078 active_form: todo.active_form.clone(),
1079 status: format!("{:?}", todo.status).to_lowercase(),
1080 plan_id: todo.plan_id.map(|id| id.to_string()),
1081 created_at: todo.created_at,
1082 started_at: todo.started_at,
1083 completed_at: todo.completed_at,
1084 }));
1085 }
1086 }
1087
1088 if current_plan_hash != prev_plan_hash
1090 && let Some(ref plan) = session.current_plan
1091 {
1092 new_entries.push(JsonlEntry::Plan(PlanEntry {
1093 id: plan.id.to_string(),
1094 session_id: session.id.to_string(),
1095 name: plan.name.clone(),
1096 content: plan.content.clone(),
1097 status: format!("{:?}", plan.status).to_lowercase(),
1098 error: plan.error.clone(),
1099 created_at: plan.created_at,
1100 approved_at: plan.approved_at,
1101 started_at: plan.started_at,
1102 completed_at: plan.completed_at,
1103 }));
1104 }
1105
1106 for compact in &session.compact_history {
1108 let compact_id = format!("compact:{}", compact.id);
1109 if !persisted_ids.contains(&compact_id) {
1110 new_entries.push(JsonlEntry::Compact(CompactEntry {
1111 id: compact.id.to_string(),
1112 session_id: session.id.to_string(),
1113 trigger: format!("{:?}", compact.trigger).to_lowercase(),
1114 pre_tokens: compact.pre_tokens,
1115 post_tokens: compact.post_tokens,
1116 saved_tokens: compact.saved_tokens,
1117 summary: compact.summary.clone(),
1118 original_count: compact.original_count,
1119 new_count: compact.new_count,
1120 logical_parent_id: compact.logical_parent_id.as_ref().map(|id| id.to_string()),
1121 created_at: compact.created_at,
1122 }));
1123 new_ids.insert(compact_id);
1124 }
1125 }
1126
1127 if new_entries.is_empty() {
1128 return Ok(());
1129 }
1130
1131 let path_clone = file_path.clone();
1133 let sync = self.config.sync_mode == SyncMode::OnWrite;
1134 tokio::task::spawn_blocking(move || append_entries_sync(&path_clone, &new_entries, sync))
1135 .await
1136 .map_err(|e| SessionError::Storage {
1137 message: format!("Task join error: {}", e),
1138 })??;
1139
1140 let mut index = self.index.write().await;
1142 let mut persisted = persisted_ids;
1143 persisted.extend(new_ids);
1144
1145 index.insert(
1146 session.id,
1147 SessionMeta {
1148 path: file_path,
1149 project_path,
1150 tenant_id: session.tenant_id.clone(),
1151 parent_id: session.parent_id,
1152 updated_at: session.updated_at,
1153 persisted_ids: persisted,
1154 todos_hash: current_todos_hash,
1155 plan_hash: current_plan_hash,
1156 },
1157 );
1158
1159 Ok(())
1160 }
1161
1162 async fn load(&self, id: &SessionId) -> SessionResult<Option<Session>> {
1163 let path = {
1164 let index = self.index.read().await;
1165 match index.sessions.get(id) {
1166 Some(m) => m.path.clone(),
1167 None => return Ok(None),
1168 }
1169 };
1170
1171 let entries = tokio::task::spawn_blocking(move || read_entries_sync(&path))
1172 .await
1173 .map_err(|e| SessionError::Storage {
1174 message: format!("Task join error: {}", e),
1175 })??;
1176
1177 if entries.is_empty() {
1178 return Ok(None);
1179 }
1180
1181 let session = Self::reconstruct_session(*id, entries);
1182 Ok(Some(session))
1183 }
1184
1185 async fn delete(&self, id: &SessionId) -> SessionResult<bool> {
1186 let meta = {
1187 let mut index = self.index.write().await;
1188 index.remove(id)
1189 };
1190
1191 let Some(meta) = meta else {
1192 return Ok(false);
1193 };
1194
1195 if meta.path.exists() {
1196 tokio::fs::remove_file(&meta.path)
1197 .await
1198 .map_err(|e| SessionError::Storage {
1199 message: format!("Failed to delete {}: {}", meta.path.display(), e),
1200 })?;
1201 }
1202
1203 self.summaries.write().await.remove(id);
1204 self.queue.write().await.remove(id);
1205 Ok(true)
1206 }
1207
1208 async fn list(&self, tenant_id: Option<&str>) -> SessionResult<Vec<SessionId>> {
1209 let index = self.index.read().await;
1210 Ok(match tenant_id {
1211 Some(tid) => index.by_tenant.get(tid).cloned().unwrap_or_default(),
1212 None => index.sessions.keys().copied().collect(),
1213 })
1214 }
1215
1216 async fn list_children(&self, parent_id: &SessionId) -> SessionResult<Vec<SessionId>> {
1217 let index = self.index.read().await;
1218 Ok(index.by_parent.get(parent_id).cloned().unwrap_or_default())
1219 }
1220
1221 async fn add_summary(&self, snapshot: SummarySnapshot) -> SessionResult<()> {
1222 let path = {
1223 let index = self.index.read().await;
1224 index
1225 .sessions
1226 .get(&snapshot.session_id)
1227 .map(|m| m.path.clone())
1228 };
1229
1230 if let Some(path) = path {
1231 let entry = JsonlEntry::Summary(SummaryEntry {
1232 session_id: snapshot.session_id.to_string(),
1233 summary: snapshot.summary.clone(),
1234 leaf_uuid: snapshot.leaf_message_id.as_ref().map(|id| id.to_string()),
1235 timestamp: snapshot.created_at,
1236 });
1237
1238 let sync = self.config.sync_mode == SyncMode::OnWrite;
1239 tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1240 .await
1241 .map_err(|e| SessionError::Storage {
1242 message: format!("Task join error: {}", e),
1243 })??;
1244 }
1245
1246 self.summaries
1247 .write()
1248 .await
1249 .entry(snapshot.session_id)
1250 .or_default()
1251 .push(snapshot);
1252
1253 Ok(())
1254 }
1255
1256 async fn get_summaries(&self, session_id: &SessionId) -> SessionResult<Vec<SummarySnapshot>> {
1257 Ok(self
1258 .summaries
1259 .read()
1260 .await
1261 .get(session_id)
1262 .cloned()
1263 .unwrap_or_default())
1264 }
1265
1266 async fn enqueue(
1267 &self,
1268 session_id: &SessionId,
1269 content: String,
1270 priority: i32,
1271 ) -> SessionResult<QueueItem> {
1272 let item = QueueItem::enqueue(*session_id, content.clone()).with_priority(priority);
1273
1274 let path = {
1275 let index = self.index.read().await;
1276 index.sessions.get(session_id).map(|m| m.path.clone())
1277 };
1278
1279 if let Some(path) = path {
1280 let entry = JsonlEntry::QueueOperation(QueueOperationEntry {
1281 operation: "enqueue".to_string(),
1282 session_id: session_id.to_string(),
1283 timestamp: Utc::now(),
1284 content,
1285 priority,
1286 item_id: item.id.to_string(),
1287 });
1288
1289 let sync = self.config.sync_mode == SyncMode::OnWrite;
1290 tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1291 .await
1292 .map_err(|e| SessionError::Storage {
1293 message: format!("Task join error: {}", e),
1294 })??;
1295 }
1296
1297 self.queue
1298 .write()
1299 .await
1300 .entry(*session_id)
1301 .or_default()
1302 .push(item.clone());
1303
1304 Ok(item)
1305 }
1306
1307 async fn dequeue(&self, session_id: &SessionId) -> SessionResult<Option<QueueItem>> {
1308 let mut queue = self.queue.write().await;
1309 let items = match queue.get_mut(session_id) {
1310 Some(items) => items,
1311 None => return Ok(None),
1312 };
1313
1314 items.sort_by(|a, b| b.priority.cmp(&a.priority));
1315
1316 for item in items.iter_mut() {
1317 if item.status == QueueStatus::Pending {
1318 item.start_processing();
1319 return Ok(Some(item.clone()));
1320 }
1321 }
1322
1323 Ok(None)
1324 }
1325
1326 async fn cancel_queued(&self, item_id: Uuid) -> SessionResult<bool> {
1327 let mut queue = self.queue.write().await;
1328 for items in queue.values_mut() {
1329 if let Some(item) = items.iter_mut().find(|i| i.id == item_id) {
1330 item.cancel();
1331 return Ok(true);
1332 }
1333 }
1334 Ok(false)
1335 }
1336
1337 async fn pending_queue(&self, session_id: &SessionId) -> SessionResult<Vec<QueueItem>> {
1338 Ok(self
1339 .queue
1340 .read()
1341 .await
1342 .get(session_id)
1343 .map(|items| {
1344 items
1345 .iter()
1346 .filter(|i| i.status == QueueStatus::Pending)
1347 .cloned()
1348 .collect()
1349 })
1350 .unwrap_or_default())
1351 }
1352
1353 async fn cleanup_expired(&self) -> SessionResult<usize> {
1354 let cutoff = Utc::now() - chrono::Duration::days(self.config.retention_days as i64);
1355
1356 let expired_paths: Vec<PathBuf> = {
1358 let mut index = self.index.write().await;
1359 let expired_ids: Vec<SessionId> = index
1360 .sessions
1361 .iter()
1362 .filter(|(_, m)| m.updated_at < cutoff)
1363 .map(|(id, _)| *id)
1364 .collect();
1365
1366 let mut paths = Vec::with_capacity(expired_ids.len());
1367 for id in &expired_ids {
1368 if let Some(meta) = index.remove(id) {
1369 paths.push(meta.path);
1370 }
1371 }
1372 paths
1373 };
1374
1375 let count = expired_paths.len();
1376
1377 for path in expired_paths {
1379 let _ = tokio::fs::remove_file(&path).await;
1380 }
1381
1382 Ok(count)
1383 }
1384}
1385
1386#[cfg(test)]
1391mod tests {
1392 use super::*;
1393 use crate::types::ContentBlock;
1394 use tempfile::TempDir;
1395
1396 async fn create_test_persistence() -> (JsonlPersistence, TempDir) {
1397 let temp_dir = TempDir::new().unwrap();
1398 let config = JsonlConfig::builder()
1399 .base_dir(temp_dir.path().to_path_buf())
1400 .build();
1401 let persistence = JsonlPersistence::new(config).await.unwrap();
1402 (persistence, temp_dir)
1403 }
1404
1405 #[tokio::test]
1406 async fn test_save_and_load_session() {
1407 let (persistence, _temp) = create_test_persistence().await;
1408
1409 let mut session = Session::new(SessionConfig::default());
1410 session.add_message(SessionMessage::user(vec![ContentBlock::text("Hello")]));
1411 session.add_message(SessionMessage::assistant(vec![ContentBlock::text(
1412 "Hi there!",
1413 )]));
1414
1415 persistence.save(&session).await.unwrap();
1416
1417 let loaded = persistence.load(&session.id).await.unwrap().unwrap();
1418 assert_eq!(loaded.id, session.id);
1419 assert_eq!(loaded.messages.len(), 2);
1420 }
1421
1422 #[tokio::test]
1423 async fn test_incremental_save() {
1424 let (persistence, _temp) = create_test_persistence().await;
1425
1426 let mut session = Session::new(SessionConfig::default());
1427 session.add_message(SessionMessage::user(vec![ContentBlock::text("First")]));
1428 persistence.save(&session).await.unwrap();
1429
1430 session.add_message(SessionMessage::assistant(vec![ContentBlock::text(
1431 "Second",
1432 )]));
1433 persistence.save(&session).await.unwrap();
1434
1435 let loaded = persistence.load(&session.id).await.unwrap().unwrap();
1436 assert_eq!(loaded.messages.len(), 2);
1437 }
1438
1439 #[tokio::test]
1440 async fn test_delete_session() {
1441 let (persistence, _temp) = create_test_persistence().await;
1442
1443 let session = Session::new(SessionConfig::default());
1444 persistence.save(&session).await.unwrap();
1445
1446 assert!(persistence.delete(&session.id).await.unwrap());
1447 assert!(persistence.load(&session.id).await.unwrap().is_none());
1448 }
1449
1450 #[tokio::test]
1451 async fn test_list_sessions() {
1452 let (persistence, _temp) = create_test_persistence().await;
1453
1454 let s1 = Session::new(SessionConfig::default());
1455 let s2 = Session::new(SessionConfig::default());
1456
1457 persistence.save(&s1).await.unwrap();
1458 persistence.save(&s2).await.unwrap();
1459
1460 let list = persistence.list(None).await.unwrap();
1461 assert_eq!(list.len(), 2);
1462 }
1463
1464 #[tokio::test]
1465 async fn test_tenant_filtering() {
1466 let (persistence, _temp) = create_test_persistence().await;
1467
1468 let mut s1 = Session::new(SessionConfig::default());
1469 s1.tenant_id = Some("tenant-a".to_string());
1470
1471 let mut s2 = Session::new(SessionConfig::default());
1472 s2.tenant_id = Some("tenant-b".to_string());
1473
1474 persistence.save(&s1).await.unwrap();
1475 persistence.save(&s2).await.unwrap();
1476
1477 let list = persistence.list(Some("tenant-a")).await.unwrap();
1478 assert_eq!(list.len(), 1);
1479 assert_eq!(list[0], s1.id);
1480 }
1481
1482 #[tokio::test]
1483 async fn test_summaries() {
1484 let (persistence, _temp) = create_test_persistence().await;
1485
1486 let session = Session::new(SessionConfig::default());
1487 persistence.save(&session).await.unwrap();
1488
1489 persistence
1490 .add_summary(SummarySnapshot::new(session.id, "Summary 1"))
1491 .await
1492 .unwrap();
1493 persistence
1494 .add_summary(SummarySnapshot::new(session.id, "Summary 2"))
1495 .await
1496 .unwrap();
1497
1498 let summaries = persistence.get_summaries(&session.id).await.unwrap();
1499 assert_eq!(summaries.len(), 2);
1500 }
1501
1502 #[tokio::test]
1503 async fn test_queue_operations() {
1504 let (persistence, _temp) = create_test_persistence().await;
1505
1506 let session = Session::new(SessionConfig::default());
1507 persistence.save(&session).await.unwrap();
1508
1509 persistence
1510 .enqueue(&session.id, "Low priority".to_string(), 1)
1511 .await
1512 .unwrap();
1513 persistence
1514 .enqueue(&session.id, "High priority".to_string(), 10)
1515 .await
1516 .unwrap();
1517
1518 let next = persistence.dequeue(&session.id).await.unwrap().unwrap();
1519 assert_eq!(next.content, "High priority");
1520 }
1521
1522 #[tokio::test]
1523 async fn test_dag_reconstruction() {
1524 let (persistence, _temp) = create_test_persistence().await;
1525
1526 let mut session = Session::new(SessionConfig::default());
1527 session.add_message(SessionMessage::user(vec![ContentBlock::text("Q1")]));
1528 session.add_message(SessionMessage::assistant(vec![ContentBlock::text("A1")]));
1529 session.add_message(SessionMessage::user(vec![ContentBlock::text("Q2")]));
1530 session.add_message(SessionMessage::assistant(vec![ContentBlock::text("A2")]));
1531
1532 persistence.save(&session).await.unwrap();
1533
1534 let loaded = persistence.load(&session.id).await.unwrap().unwrap();
1535
1536 assert_eq!(loaded.messages.len(), 4);
1537 assert!(
1538 loaded.messages[0]
1539 .content
1540 .iter()
1541 .any(|c| c.as_text() == Some("Q1"))
1542 );
1543 assert!(
1544 loaded.messages[1]
1545 .content
1546 .iter()
1547 .any(|c| c.as_text() == Some("A1"))
1548 );
1549 assert!(
1550 loaded.messages[2]
1551 .content
1552 .iter()
1553 .any(|c| c.as_text() == Some("Q2"))
1554 );
1555 assert!(
1556 loaded.messages[3]
1557 .content
1558 .iter()
1559 .any(|c| c.as_text() == Some("A2"))
1560 );
1561 }
1562
1563 #[tokio::test]
1564 async fn test_project_path_encoding() {
1565 let config = JsonlConfig::default();
1566
1567 assert_eq!(
1568 config.encode_project_path(Path::new("/home/user/project")),
1569 "-home-user-project"
1570 );
1571 assert_eq!(
1572 config.encode_project_path(Path::new("/Users/alice/work/app")),
1573 "-Users-alice-work-app"
1574 );
1575 }
1576
1577 #[test]
1578 fn test_jsonl_entry_serialization() {
1579 let msg = SessionMessage::user(vec![ContentBlock::text("Hello")]);
1580 let session_id = SessionId::new();
1581 let entry = JsonlEntry::from_message(&session_id, &msg);
1582
1583 let json = serde_json::to_string(&entry).unwrap();
1584 assert!(json.contains("\"type\":\"user\""));
1585
1586 let parsed: JsonlEntry = serde_json::from_str(&json).unwrap();
1587 assert!(matches!(parsed, JsonlEntry::User(_)));
1588 }
1589
1590 #[tokio::test]
1591 async fn test_no_duplicate_writes() {
1592 let (persistence, _temp) = create_test_persistence().await;
1593
1594 let mut session = Session::new(SessionConfig::default());
1595 session.add_message(SessionMessage::user(vec![ContentBlock::text("Hello")]));
1596 persistence.save(&session).await.unwrap();
1597 persistence.save(&session).await.unwrap(); persistence.save(&session).await.unwrap(); let file_path = persistence.session_file_path(&session.id, None);
1602 let entries = read_entries_sync(&file_path).unwrap();
1603 let message_count = entries
1604 .iter()
1605 .filter(|e| e.message_uuid().is_some())
1606 .count();
1607 assert_eq!(message_count, 1, "Should not duplicate message entries");
1608 }
1609
1610 #[test]
1611 fn test_windows_path_encoding() {
1612 let config = JsonlConfig::default();
1613 assert_eq!(
1615 config.encode_project_path(Path::new("C:\\Users\\alice\\project")),
1616 "C_-Users-alice-project"
1617 );
1618 }
1619}