1use std::collections::{HashMap, HashSet};
25use std::hash::{Hash, Hasher};
26use std::io::{BufRead, BufReader, Write};
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29
30use chrono::{DateTime, Utc};
31use serde::{Deserialize, Serialize};
32use tokio::sync::RwLock;
33use uuid::Uuid;
34
35use super::state::{
36 MessageId, Session, SessionConfig, SessionId, SessionMessage, SessionMode, SessionState,
37 SessionType,
38};
39use super::types::{
40 CompactRecord, CompactTrigger, EnvironmentContext, Plan, PlanStatus, QueueItem, QueueStatus,
41 SummarySnapshot, TodoItem, TodoStatus,
42};
43use super::{Persistence, SessionError, SessionResult};
44use crate::types::{ContentBlock, Role, TokenUsage};
45
/// Durability policy applied after entries are appended to a session file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// No explicit sync is requested after a write. This is the default.
    None,
    /// The file is synced to disk after each append.
    OnWrite,
}

impl Default for SyncMode {
    /// Returns [`SyncMode::None`].
    fn default() -> Self {
        Self::None
    }
}
59
60#[derive(Clone, Debug)]
62pub struct JsonlConfig {
63 pub base_dir: PathBuf,
65 pub retention_days: u32,
67 pub sync_mode: SyncMode,
69}
70
71impl Default for JsonlConfig {
72 fn default() -> Self {
73 Self {
74 base_dir: dirs::home_dir()
75 .unwrap_or_else(|| PathBuf::from("."))
76 .join(".claude"),
77 retention_days: 30,
78 sync_mode: SyncMode::default(),
79 }
80 }
81}
82
83impl JsonlConfig {
84 pub fn builder() -> JsonlConfigBuilder {
85 JsonlConfigBuilder::default()
86 }
87
88 fn projects_dir(&self) -> PathBuf {
89 self.base_dir.join("projects")
90 }
91
92 fn encode_project_path(&self, path: &Path) -> String {
95 path.to_string_lossy()
96 .replace(['/', '\\'], "-")
97 .replace(':', "_") }
99
100 fn project_dir(&self, project_path: &Path) -> PathBuf {
101 self.projects_dir()
102 .join(self.encode_project_path(project_path))
103 }
104}
105
106#[derive(Default)]
108pub struct JsonlConfigBuilder {
109 base_dir: Option<PathBuf>,
110 retention_days: Option<u32>,
111 sync_mode: Option<SyncMode>,
112}
113
114impl JsonlConfigBuilder {
115 pub fn base_dir(mut self, path: impl Into<PathBuf>) -> Self {
116 self.base_dir = Some(path.into());
117 self
118 }
119
120 pub fn retention_days(mut self, days: u32) -> Self {
121 self.retention_days = Some(days);
122 self
123 }
124
125 pub fn sync_mode(mut self, mode: SyncMode) -> Self {
126 self.sync_mode = Some(mode);
127 self
128 }
129
130 pub fn build(self) -> JsonlConfig {
131 let default = JsonlConfig::default();
132 JsonlConfig {
133 base_dir: self.base_dir.unwrap_or(default.base_dir),
134 retention_days: self.retention_days.unwrap_or(default.retention_days),
135 sync_mode: self.sync_mode.unwrap_or(default.sync_mode),
136 }
137 }
138}
139
140#[derive(Clone, Debug, Serialize, Deserialize)]
146#[serde(tag = "type", rename_all = "snake_case")]
147pub enum JsonlEntry {
148 User(UserEntry),
149 Assistant(AssistantEntry),
150 System(SystemEntry),
151 QueueOperation(QueueOperationEntry),
152 Summary(SummaryEntry),
153 SessionMeta(SessionMetaEntry),
154 Todo(TodoEntry),
155 Plan(PlanEntry),
156 Compact(CompactEntry),
157}
158
159#[derive(Clone, Debug, Serialize, Deserialize)]
161pub struct EntryCommon {
162 pub uuid: String,
163 #[serde(rename = "parentUuid", skip_serializing_if = "Option::is_none")]
164 pub parent_uuid: Option<String>,
165 #[serde(rename = "sessionId")]
166 pub session_id: String,
167 pub timestamp: DateTime<Utc>,
168 #[serde(skip_serializing_if = "Option::is_none")]
169 pub cwd: Option<PathBuf>,
170 pub version: String,
171 #[serde(rename = "gitBranch", default)]
172 pub git_branch: String,
173 #[serde(rename = "isSidechain", default)]
174 pub is_sidechain: bool,
175}
176
177impl EntryCommon {
178 fn from_message(session_id: &SessionId, msg: &SessionMessage) -> Self {
179 Self {
180 uuid: msg.id.to_string(),
181 parent_uuid: msg.parent_id.as_ref().map(|id| id.to_string()),
182 session_id: session_id.to_string(),
183 timestamp: msg.timestamp,
184 cwd: msg.environment.as_ref().and_then(|e| e.cwd.clone()),
185 version: env!("CARGO_PKG_VERSION").to_string(),
186 git_branch: msg
187 .environment
188 .as_ref()
189 .and_then(|e| e.git_branch.clone())
190 .unwrap_or_default(),
191 is_sidechain: msg.is_sidechain,
192 }
193 }
194
195 fn to_environment(&self) -> EnvironmentContext {
196 EnvironmentContext {
197 cwd: self.cwd.clone(),
198 git_branch: if self.git_branch.is_empty() {
199 None
200 } else {
201 Some(self.git_branch.clone())
202 },
203 ..Default::default()
204 }
205 }
206}
207
208#[derive(Clone, Debug, Serialize, Deserialize)]
209pub struct UserMessageContent {
210 pub role: String,
211 pub content: serde_json::Value,
212}
213
214#[derive(Clone, Debug, Serialize, Deserialize)]
215pub struct UserEntry {
216 #[serde(flatten)]
217 pub common: EntryCommon,
218 pub message: UserMessageContent,
219 #[serde(rename = "isCompactSummary", default)]
220 pub is_compact_summary: bool,
221}
222
223#[derive(Clone, Debug, Serialize, Deserialize)]
224pub struct AssistantMessageContent {
225 #[serde(skip_serializing_if = "Option::is_none")]
226 pub id: Option<String>,
227 #[serde(skip_serializing_if = "Option::is_none")]
228 pub model: Option<String>,
229 pub role: String,
230 #[serde(skip_serializing_if = "Option::is_none")]
231 pub stop_reason: Option<String>,
232 pub content: serde_json::Value,
233 #[serde(skip_serializing_if = "Option::is_none")]
234 pub usage: Option<UsageInfo>,
235}
236
237#[derive(Clone, Debug, Default, Serialize, Deserialize)]
238pub struct UsageInfo {
239 pub input_tokens: u64,
240 pub output_tokens: u64,
241 #[serde(default)]
242 pub cache_creation_input_tokens: u64,
243 #[serde(default)]
244 pub cache_read_input_tokens: u64,
245}
246
247impl From<&TokenUsage> for UsageInfo {
248 fn from(u: &TokenUsage) -> Self {
249 Self {
250 input_tokens: u.input_tokens,
251 output_tokens: u.output_tokens,
252 cache_creation_input_tokens: u.cache_creation_input_tokens,
253 cache_read_input_tokens: u.cache_read_input_tokens,
254 }
255 }
256}
257
258impl From<&UsageInfo> for TokenUsage {
259 fn from(u: &UsageInfo) -> Self {
260 Self {
261 input_tokens: u.input_tokens,
262 output_tokens: u.output_tokens,
263 cache_creation_input_tokens: u.cache_creation_input_tokens,
264 cache_read_input_tokens: u.cache_read_input_tokens,
265 }
266 }
267}
268
269#[derive(Clone, Debug, Serialize, Deserialize)]
270pub struct AssistantEntry {
271 #[serde(flatten)]
272 pub common: EntryCommon,
273 pub message: AssistantMessageContent,
274 #[serde(rename = "requestId", skip_serializing_if = "Option::is_none")]
275 pub request_id: Option<String>,
276}
277
278#[derive(Clone, Debug, Serialize, Deserialize)]
279pub struct SystemEntry {
280 #[serde(flatten)]
281 pub common: EntryCommon,
282 pub subtype: String,
283 #[serde(skip_serializing_if = "Option::is_none")]
284 pub content: Option<String>,
285}
286
287#[derive(Clone, Debug, Serialize, Deserialize)]
288pub struct QueueOperationEntry {
289 pub operation: String,
290 #[serde(rename = "sessionId")]
291 pub session_id: String,
292 pub timestamp: DateTime<Utc>,
293 pub content: String,
294 pub priority: i32,
295 #[serde(rename = "itemId")]
296 pub item_id: String,
297}
298
299#[derive(Clone, Debug, Serialize, Deserialize)]
300pub struct SummaryEntry {
301 #[serde(rename = "sessionId")]
302 pub session_id: String,
303 pub summary: String,
304 #[serde(rename = "leafUuid", skip_serializing_if = "Option::is_none")]
305 pub leaf_uuid: Option<String>,
306 pub timestamp: DateTime<Utc>,
307}
308
309#[derive(Clone, Debug, Serialize, Deserialize)]
310pub struct SessionMetaEntry {
311 #[serde(rename = "sessionId")]
312 pub session_id: String,
313 #[serde(rename = "parentSessionId", skip_serializing_if = "Option::is_none")]
314 pub parent_session_id: Option<String>,
315 #[serde(rename = "tenantId", skip_serializing_if = "Option::is_none")]
316 pub tenant_id: Option<String>,
317 #[serde(rename = "sessionType")]
318 pub session_type: serde_json::Value,
319 pub mode: String,
320 pub state: String,
321 pub config: serde_json::Value,
322 #[serde(rename = "permissionPolicy")]
323 pub permission_policy: serde_json::Value,
324 #[serde(rename = "totalUsage", default)]
325 pub total_usage: UsageInfo,
326 #[serde(rename = "totalCostUsd", default)]
327 pub total_cost_usd: f64,
328 #[serde(rename = "staticContextHash", skip_serializing_if = "Option::is_none")]
329 pub static_context_hash: Option<String>,
330 #[serde(skip_serializing_if = "Option::is_none")]
331 pub error: Option<String>,
332 #[serde(rename = "createdAt")]
333 pub created_at: DateTime<Utc>,
334 #[serde(rename = "updatedAt")]
335 pub updated_at: DateTime<Utc>,
336 #[serde(rename = "expiresAt", skip_serializing_if = "Option::is_none")]
337 pub expires_at: Option<DateTime<Utc>>,
338}
339
340#[derive(Clone, Debug, Serialize, Deserialize)]
341pub struct TodoEntry {
342 pub id: String,
343 #[serde(rename = "sessionId")]
344 pub session_id: String,
345 pub content: String,
346 #[serde(rename = "activeForm")]
347 pub active_form: String,
348 pub status: String,
349 #[serde(rename = "planId", skip_serializing_if = "Option::is_none")]
350 pub plan_id: Option<String>,
351 #[serde(rename = "createdAt")]
352 pub created_at: DateTime<Utc>,
353 #[serde(rename = "startedAt", skip_serializing_if = "Option::is_none")]
354 pub started_at: Option<DateTime<Utc>>,
355 #[serde(rename = "completedAt", skip_serializing_if = "Option::is_none")]
356 pub completed_at: Option<DateTime<Utc>>,
357}
358
359#[derive(Clone, Debug, Serialize, Deserialize)]
360pub struct PlanEntry {
361 pub id: String,
362 #[serde(rename = "sessionId")]
363 pub session_id: String,
364 #[serde(skip_serializing_if = "Option::is_none")]
365 pub name: Option<String>,
366 pub content: String,
367 pub status: String,
368 #[serde(skip_serializing_if = "Option::is_none")]
369 pub error: Option<String>,
370 #[serde(rename = "createdAt")]
371 pub created_at: DateTime<Utc>,
372 #[serde(rename = "approvedAt", skip_serializing_if = "Option::is_none")]
373 pub approved_at: Option<DateTime<Utc>>,
374 #[serde(rename = "startedAt", skip_serializing_if = "Option::is_none")]
375 pub started_at: Option<DateTime<Utc>>,
376 #[serde(rename = "completedAt", skip_serializing_if = "Option::is_none")]
377 pub completed_at: Option<DateTime<Utc>>,
378}
379
380#[derive(Clone, Debug, Serialize, Deserialize)]
381pub struct CompactEntry {
382 pub id: String,
383 #[serde(rename = "sessionId")]
384 pub session_id: String,
385 pub trigger: String,
386 #[serde(rename = "preTokens")]
387 pub pre_tokens: usize,
388 #[serde(rename = "postTokens")]
389 pub post_tokens: usize,
390 #[serde(rename = "savedTokens")]
391 pub saved_tokens: usize,
392 pub summary: String,
393 #[serde(rename = "originalCount")]
394 pub original_count: usize,
395 #[serde(rename = "newCount")]
396 pub new_count: usize,
397 #[serde(rename = "logicalParentId", skip_serializing_if = "Option::is_none")]
398 pub logical_parent_id: Option<String>,
399 #[serde(rename = "createdAt")]
400 pub created_at: DateTime<Utc>,
401}
402
403impl JsonlEntry {
408 fn from_message(session_id: &SessionId, msg: &SessionMessage) -> Self {
409 let common = EntryCommon::from_message(session_id, msg);
410
411 match msg.role {
412 Role::User => JsonlEntry::User(UserEntry {
413 common,
414 message: UserMessageContent {
415 role: "user".to_string(),
416 content: serde_json::to_value(&msg.content).unwrap_or_default(),
417 },
418 is_compact_summary: msg.is_compact_summary,
419 }),
420 Role::Assistant => JsonlEntry::Assistant(AssistantEntry {
421 common,
422 message: AssistantMessageContent {
423 id: msg.metadata.request_id.clone(),
424 model: msg.metadata.model.clone(),
425 role: "assistant".to_string(),
426 stop_reason: None,
427 content: serde_json::to_value(&msg.content).unwrap_or_default(),
428 usage: msg.usage.as_ref().map(UsageInfo::from),
429 },
430 request_id: msg.metadata.request_id.clone(),
431 }),
432 }
433 }
434
435 fn to_session_message(&self) -> Option<SessionMessage> {
436 match self {
437 JsonlEntry::User(entry) => {
438 let content: Vec<ContentBlock> =
439 serde_json::from_value(entry.message.content.clone()).unwrap_or_default();
440 let mut msg = SessionMessage::user(content);
441 msg.id = MessageId::from_string(&entry.common.uuid);
442 msg.parent_id = entry
443 .common
444 .parent_uuid
445 .as_ref()
446 .map(MessageId::from_string);
447 msg.timestamp = entry.common.timestamp;
448 msg.is_sidechain = entry.common.is_sidechain;
449 msg.is_compact_summary = entry.is_compact_summary;
450 msg.environment = Some(entry.common.to_environment());
451 Some(msg)
452 }
453 JsonlEntry::Assistant(entry) => {
454 let content: Vec<ContentBlock> =
455 serde_json::from_value(entry.message.content.clone()).unwrap_or_default();
456 let mut msg = SessionMessage::assistant(content);
457 msg.id = MessageId::from_string(&entry.common.uuid);
458 msg.parent_id = entry
459 .common
460 .parent_uuid
461 .as_ref()
462 .map(MessageId::from_string);
463 msg.timestamp = entry.common.timestamp;
464 msg.is_sidechain = entry.common.is_sidechain;
465 msg.usage = entry.message.usage.as_ref().map(TokenUsage::from);
466 msg.metadata.model.clone_from(&entry.message.model);
467 msg.metadata.request_id.clone_from(&entry.request_id);
468 msg.environment = Some(entry.common.to_environment());
469 Some(msg)
470 }
471 _ => None,
472 }
473 }
474
475 #[cfg(test)]
476 fn message_uuid(&self) -> Option<&str> {
477 match self {
478 JsonlEntry::User(e) => Some(&e.common.uuid),
479 JsonlEntry::Assistant(e) => Some(&e.common.uuid),
480 _ => None,
481 }
482 }
483}
484
485#[derive(Clone, Debug)]
490struct SessionMeta {
491 path: PathBuf,
492 project_path: Option<PathBuf>,
493 tenant_id: Option<String>,
494 parent_id: Option<SessionId>,
495 updated_at: DateTime<Utc>,
496 persisted_ids: HashSet<String>,
498 todos_hash: u64,
500 plan_hash: u64,
502}
503
504#[derive(Default)]
505struct SessionIndex {
506 sessions: HashMap<SessionId, SessionMeta>,
507 by_project: HashMap<PathBuf, Vec<SessionId>>,
508 by_tenant: HashMap<String, Vec<SessionId>>,
509 by_parent: HashMap<SessionId, Vec<SessionId>>,
510}
511
512impl SessionIndex {
513 fn insert(&mut self, session_id: SessionId, meta: SessionMeta) {
514 self.remove(&session_id);
516
517 if let Some(ref project) = meta.project_path {
518 self.by_project
519 .entry(project.clone())
520 .or_default()
521 .push(session_id);
522 }
523 if let Some(ref tenant) = meta.tenant_id {
524 self.by_tenant
525 .entry(tenant.clone())
526 .or_default()
527 .push(session_id);
528 }
529 if let Some(parent) = meta.parent_id {
530 self.by_parent.entry(parent).or_default().push(session_id);
531 }
532 self.sessions.insert(session_id, meta);
533 }
534
535 fn remove(&mut self, session_id: &SessionId) -> Option<SessionMeta> {
536 let meta = self.sessions.remove(session_id)?;
537
538 if let Some(ref project) = meta.project_path
539 && let Some(ids) = self.by_project.get_mut(project)
540 {
541 ids.retain(|id| id != session_id);
542 }
543 if let Some(ref tenant) = meta.tenant_id
544 && let Some(ids) = self.by_tenant.get_mut(tenant)
545 {
546 ids.retain(|id| id != session_id);
547 }
548 if let Some(parent) = meta.parent_id
549 && let Some(ids) = self.by_parent.get_mut(&parent)
550 {
551 ids.retain(|id| id != session_id);
552 }
553 Some(meta)
554 }
555}
556
557fn read_entries_sync(path: &Path) -> SessionResult<Vec<JsonlEntry>> {
562 if !path.exists() {
563 return Ok(Vec::new());
564 }
565
566 let file = std::fs::File::open(path).map_err(|e| SessionError::Storage {
567 message: format!("Failed to open {}: {}", path.display(), e),
568 })?;
569
570 let reader = BufReader::with_capacity(64 * 1024, file);
571 let mut entries = Vec::with_capacity(128);
572
573 for (line_num, line) in reader.lines().enumerate() {
574 let line = line.map_err(|e| SessionError::Storage {
575 message: format!("Read error at line {}: {}", line_num + 1, e),
576 })?;
577
578 if line.trim().is_empty() {
579 continue;
580 }
581
582 match serde_json::from_str::<JsonlEntry>(&line) {
583 Ok(entry) => entries.push(entry),
584 Err(e) => {
585 tracing::warn!(
586 path = %path.display(),
587 line = line_num + 1,
588 error = %e,
589 "Skipping malformed JSONL entry"
590 );
591 }
592 }
593 }
594
595 Ok(entries)
596}
597
598fn append_entries_sync(path: &Path, entries: &[JsonlEntry], sync: bool) -> SessionResult<()> {
599 if entries.is_empty() {
600 return Ok(());
601 }
602
603 if let Some(parent) = path.parent() {
604 std::fs::create_dir_all(parent).map_err(|e| SessionError::Storage {
605 message: format!("Failed to create directory {}: {}", parent.display(), e),
606 })?;
607 }
608
609 let file = std::fs::OpenOptions::new()
610 .create(true)
611 .append(true)
612 .open(path)
613 .map_err(|e| SessionError::Storage {
614 message: format!("Failed to open {} for writing: {}", path.display(), e),
615 })?;
616
617 let mut writer = std::io::BufWriter::with_capacity(64 * 1024, file);
618
619 for entry in entries {
620 serde_json::to_writer(&mut writer, entry)?;
621 writeln!(writer).map_err(|e| SessionError::Storage {
622 message: format!("Write failed: {}", e),
623 })?;
624 }
625
626 writer.flush().map_err(|e| SessionError::Storage {
627 message: format!("Flush failed: {}", e),
628 })?;
629
630 if sync {
631 writer
632 .into_inner()
633 .map_err(|e| SessionError::Storage {
634 message: format!("Buffer error: {}", e.error()),
635 })?
636 .sync_all()
637 .map_err(|e| SessionError::Storage {
638 message: format!("Sync failed: {}", e),
639 })?;
640 }
641
642 Ok(())
643}
644
645pub struct JsonlPersistence {
650 config: JsonlConfig,
651 index: Arc<RwLock<SessionIndex>>,
652 summaries: Arc<RwLock<HashMap<SessionId, Vec<SummarySnapshot>>>>,
653 queue: Arc<RwLock<HashMap<SessionId, Vec<QueueItem>>>>,
654}
655
656impl JsonlPersistence {
657 pub async fn new(config: JsonlConfig) -> SessionResult<Self> {
658 tokio::fs::create_dir_all(config.projects_dir())
659 .await
660 .map_err(|e| SessionError::Storage {
661 message: format!("Failed to create projects directory: {}", e),
662 })?;
663
664 let persistence = Self {
665 config,
666 index: Arc::new(RwLock::new(SessionIndex::default())),
667 summaries: Arc::new(RwLock::new(HashMap::new())),
668 queue: Arc::new(RwLock::new(HashMap::new())),
669 };
670
671 persistence.rebuild_index().await?;
672 Ok(persistence)
673 }
674
675 pub async fn default_config() -> SessionResult<Self> {
676 Self::new(JsonlConfig::default()).await
677 }
678
679 async fn rebuild_index(&self) -> SessionResult<()> {
680 let projects_dir = self.config.projects_dir();
681 if !projects_dir.exists() {
682 return Ok(());
683 }
684
685 let mut index = self.index.write().await;
686 let mut summaries = self.summaries.write().await;
687
688 let mut entries =
689 tokio::fs::read_dir(&projects_dir)
690 .await
691 .map_err(|e| SessionError::Storage {
692 message: format!("Failed to read projects dir: {}", e),
693 })?;
694
695 while let Some(project_entry) =
696 entries
697 .next_entry()
698 .await
699 .map_err(|e| SessionError::Storage {
700 message: format!("Failed to read entry: {}", e),
701 })?
702 {
703 let file_type = project_entry.file_type().await.ok();
704 if !file_type.map(|t| t.is_dir()).unwrap_or(false) {
705 continue;
706 }
707
708 let project_path = project_entry.path();
709 let mut files =
710 tokio::fs::read_dir(&project_path)
711 .await
712 .map_err(|e| SessionError::Storage {
713 message: format!("Failed to read project dir: {}", e),
714 })?;
715
716 while let Some(file_entry) =
717 files
718 .next_entry()
719 .await
720 .map_err(|e| SessionError::Storage {
721 message: format!("Failed to read file entry: {}", e),
722 })?
723 {
724 let file_path = file_entry.path();
725 if file_path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
726 continue;
727 }
728
729 let session_id = match file_path
730 .file_stem()
731 .and_then(|s| s.to_str())
732 .and_then(SessionId::parse)
733 {
734 Some(id) => id,
735 None => continue,
736 };
737
738 let path_clone = file_path.clone();
740 let parsed = tokio::task::spawn_blocking(move || read_entries_sync(&path_clone))
741 .await
742 .map_err(|e| SessionError::Storage {
743 message: format!("Task join error: {}", e),
744 })??;
745
746 let (meta, session_summaries) =
747 Self::parse_file_metadata(session_id, file_path, &parsed);
748
749 index.insert(session_id, meta);
750 if !session_summaries.is_empty() {
751 summaries.insert(session_id, session_summaries);
752 }
753 }
754 }
755
756 Ok(())
757 }
758
759 fn parse_file_metadata(
760 session_id: SessionId,
761 path: PathBuf,
762 entries: &[JsonlEntry],
763 ) -> (SessionMeta, Vec<SummarySnapshot>) {
764 let mut project_path: Option<PathBuf> = None;
765 let mut tenant_id: Option<String> = None;
766 let mut parent_id: Option<SessionId> = None;
767 let mut updated_at = Utc::now();
768 let mut summaries = Vec::new();
769 let mut persisted_ids = HashSet::with_capacity(entries.len());
770
771 for entry in entries {
772 match entry {
773 JsonlEntry::User(e) => {
774 if project_path.is_none() {
775 project_path = e.common.cwd.clone();
776 }
777 updated_at = e.common.timestamp;
778 persisted_ids.insert(e.common.uuid.clone());
779 }
780 JsonlEntry::Assistant(e) => {
781 if project_path.is_none() {
782 project_path = e.common.cwd.clone();
783 }
784 updated_at = e.common.timestamp;
785 persisted_ids.insert(e.common.uuid.clone());
786 }
787 JsonlEntry::SessionMeta(m) => {
788 tenant_id.clone_from(&m.tenant_id);
789 parent_id = m
790 .parent_session_id
791 .as_ref()
792 .and_then(|s| SessionId::parse(s));
793 updated_at = m.updated_at;
794 }
795 JsonlEntry::Summary(s) => {
796 summaries.push(SummarySnapshot {
797 id: Uuid::new_v4(),
798 session_id,
799 summary: s.summary.clone(),
800 leaf_message_id: s.leaf_uuid.as_ref().map(MessageId::from_string),
801 created_at: s.timestamp,
802 });
803 }
804 _ => {}
805 }
806 }
807
808 (
809 SessionMeta {
810 path,
811 project_path,
812 tenant_id,
813 parent_id,
814 updated_at,
815 persisted_ids,
816 todos_hash: 0, plan_hash: 0, },
819 summaries,
820 )
821 }
822
823 fn session_file_path(&self, session_id: &SessionId, project_path: Option<&Path>) -> PathBuf {
824 let dir = match project_path {
825 Some(p) => self.config.project_dir(p),
826 None => self.config.projects_dir().join("_default"),
827 };
828 dir.join(format!("{}.jsonl", session_id))
829 }
830
831 fn get_project_path(session: &Session) -> Option<PathBuf> {
832 session
833 .messages
834 .first()
835 .and_then(|m| m.environment.as_ref())
836 .and_then(|e| e.cwd.clone())
837 }
838
839 fn compute_todos_hash(todos: &[TodoItem]) -> u64 {
841 let mut hasher = std::collections::hash_map::DefaultHasher::new();
842 for todo in todos {
843 todo.id.hash(&mut hasher);
844 format!("{:?}", todo.status).hash(&mut hasher);
845 todo.content.hash(&mut hasher);
846 }
847 hasher.finish()
848 }
849
850 fn compute_plan_hash(plan: Option<&Plan>) -> u64 {
852 let mut hasher = std::collections::hash_map::DefaultHasher::new();
853 if let Some(p) = plan {
854 p.id.hash(&mut hasher);
855 format!("{:?}", p.status).hash(&mut hasher);
856 p.content.hash(&mut hasher);
857 }
858 hasher.finish()
859 }
860
861 fn session_to_meta_entry(session: &Session) -> JsonlEntry {
862 JsonlEntry::SessionMeta(SessionMetaEntry {
863 session_id: session.id.to_string(),
864 parent_session_id: session.parent_id.map(|p| p.to_string()),
865 tenant_id: session.tenant_id.clone(),
866 session_type: serde_json::to_value(&session.session_type).unwrap_or_default(),
867 mode: format!("{:?}", session.mode).to_lowercase(),
868 state: format!("{:?}", session.state).to_lowercase(),
869 config: serde_json::to_value(&session.config).unwrap_or_default(),
870 permission_policy: serde_json::to_value(&session.permission_policy).unwrap_or_default(),
871 total_usage: UsageInfo::from(&session.total_usage),
872 total_cost_usd: session.total_cost_usd,
873 static_context_hash: session.static_context_hash.clone(),
874 error: session.error.clone(),
875 created_at: session.created_at,
876 updated_at: session.updated_at,
877 expires_at: session.expires_at,
878 })
879 }
880
881 fn reconstruct_session(session_id: SessionId, entries: Vec<JsonlEntry>) -> Session {
882 let mut session = Session::new(SessionConfig::default());
883 session.id = session_id;
884
885 let mut messages: HashMap<String, SessionMessage> = HashMap::with_capacity(entries.len());
886 let mut todos_map: HashMap<String, TodoItem> = HashMap::new();
887 let mut latest_plan: Option<Plan> = None;
888 let mut compacts: Vec<CompactRecord> = Vec::new();
889
890 for entry in entries {
891 match entry {
892 JsonlEntry::User(_) | JsonlEntry::Assistant(_) => {
893 if let Some(msg) = entry.to_session_message() {
894 messages.insert(msg.id.to_string(), msg);
895 }
896 }
897 JsonlEntry::SessionMeta(m) => {
898 session.tenant_id = m.tenant_id;
899 session.parent_id = m
900 .parent_session_id
901 .as_ref()
902 .and_then(|s| SessionId::parse(s));
903 session.session_type =
904 serde_json::from_value(m.session_type).unwrap_or(SessionType::Main);
905 session.mode = serde_json::from_str(&format!("\"{}\"", m.mode))
906 .unwrap_or(SessionMode::default());
907 session.state = match m.state.as_str() {
908 "active" => SessionState::Active,
909 "completed" => SessionState::Completed,
910 "failed" => SessionState::Failed,
911 "cancelled" => SessionState::Cancelled,
912 "waitingfortools" => SessionState::WaitingForTools,
913 _ => SessionState::Created,
914 };
915 session.config = serde_json::from_value(m.config).unwrap_or_default();
916 session.permission_policy =
917 serde_json::from_value(m.permission_policy).unwrap_or_default();
918 session.total_usage = TokenUsage::from(&m.total_usage);
919 session.total_cost_usd = m.total_cost_usd;
920 session.static_context_hash = m.static_context_hash;
921 session.error = m.error;
922 session.created_at = m.created_at;
923 session.updated_at = m.updated_at;
924 session.expires_at = m.expires_at;
925 }
926 JsonlEntry::Summary(s) => {
927 session.summary = Some(s.summary);
928 }
929 JsonlEntry::Todo(t) => {
930 let status = match t.status.as_str() {
931 "in_progress" | "inprogress" => TodoStatus::InProgress,
932 "completed" => TodoStatus::Completed,
933 _ => TodoStatus::Pending,
934 };
935 let todo = TodoItem {
936 id: Uuid::parse_str(&t.id).unwrap_or_else(|_| Uuid::new_v4()),
937 session_id,
938 content: t.content,
939 active_form: t.active_form,
940 status,
941 plan_id: t.plan_id.and_then(|s| Uuid::parse_str(&s).ok()),
942 created_at: t.created_at,
943 started_at: t.started_at,
944 completed_at: t.completed_at,
945 };
946 todos_map.insert(t.id, todo);
948 }
949 JsonlEntry::Plan(p) => {
950 let status = match p.status.as_str() {
951 "approved" => PlanStatus::Approved,
952 "executing" | "inprogress" | "in_progress" => PlanStatus::Executing,
953 "completed" => PlanStatus::Completed,
954 "cancelled" => PlanStatus::Cancelled,
955 "failed" => PlanStatus::Failed,
956 _ => PlanStatus::Draft,
957 };
958 let plan = Plan {
959 id: Uuid::parse_str(&p.id).unwrap_or_else(|_| Uuid::new_v4()),
960 session_id,
961 name: p.name,
962 content: p.content,
963 status,
964 error: p.error,
965 created_at: p.created_at,
966 approved_at: p.approved_at,
967 started_at: p.started_at,
968 completed_at: p.completed_at,
969 };
970 latest_plan = Some(plan);
972 }
973 JsonlEntry::Compact(c) => {
974 let trigger = match c.trigger.as_str() {
975 "auto" | "automatic" => CompactTrigger::Auto,
976 "threshold" => CompactTrigger::Threshold,
977 _ => CompactTrigger::Manual,
978 };
979 compacts.push(CompactRecord {
980 id: Uuid::parse_str(&c.id).unwrap_or_else(|_| Uuid::new_v4()),
981 session_id,
982 trigger,
983 pre_tokens: c.pre_tokens,
984 post_tokens: c.post_tokens,
985 saved_tokens: c.saved_tokens,
986 summary: c.summary,
987 original_count: c.original_count,
988 new_count: c.new_count,
989 logical_parent_id: c.logical_parent_id.as_ref().map(MessageId::from_string),
990 created_at: c.created_at,
991 });
992 }
993 _ => {}
994 }
995 }
996
997 let ordered = Self::topological_sort(&messages);
999 session.messages = Vec::with_capacity(ordered.len());
1000 for msg in ordered {
1001 session.add_message(msg);
1002 }
1003
1004 session.todos = todos_map.into_values().collect();
1006 session
1007 .todos
1008 .sort_by(|a, b| a.created_at.cmp(&b.created_at));
1009 session.current_plan = latest_plan;
1010 session.compact_history = compacts;
1011
1012 session
1013 }
1014
1015 fn topological_sort(messages: &HashMap<String, SessionMessage>) -> Vec<SessionMessage> {
1016 if messages.is_empty() {
1017 return Vec::new();
1018 }
1019
1020 let mut children: HashMap<Option<String>, Vec<&SessionMessage>> = HashMap::new();
1022 for msg in messages.values() {
1023 children
1024 .entry(msg.parent_id.as_ref().map(|p| p.to_string()))
1025 .or_default()
1026 .push(msg);
1027 }
1028
1029 for group in children.values_mut() {
1031 group.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
1032 }
1033
1034 let mut result = Vec::with_capacity(messages.len());
1036 let mut queue = std::collections::VecDeque::new();
1037
1038 if let Some(roots) = children.remove(&None) {
1040 queue.extend(roots);
1041 }
1042
1043 while let Some(msg) = queue.pop_front() {
1044 let id = msg.id.to_string();
1045 result.push(msg.clone());
1046 if let Some(child_msgs) = children.remove(&Some(id)) {
1047 queue.extend(child_msgs);
1048 }
1049 }
1050
1051 result
1052 }
1053}
1054
1055#[async_trait::async_trait]
1056impl Persistence for JsonlPersistence {
1057 fn name(&self) -> &str {
1058 "jsonl"
1059 }
1060
1061 async fn save(&self, session: &Session) -> SessionResult<()> {
1062 let project_path = Self::get_project_path(session);
1063 let file_path = self.session_file_path(&session.id, project_path.as_deref());
1064
1065 let (persisted_ids, prev_todos_hash, prev_plan_hash) = {
1067 let index = self.index.read().await;
1068 match index.sessions.get(&session.id) {
1069 Some(m) => (m.persisted_ids.clone(), m.todos_hash, m.plan_hash),
1070 None => (HashSet::new(), 0, 0),
1071 }
1072 };
1073
1074 let mut new_entries = Vec::new();
1075 let mut new_ids = HashSet::new();
1076 let is_first_save = persisted_ids.is_empty();
1077
1078 if is_first_save {
1080 new_entries.push(Self::session_to_meta_entry(session));
1081 }
1082
1083 for msg in &session.messages {
1085 let id = msg.id.to_string();
1086 if !persisted_ids.contains(&id) {
1087 new_entries.push(JsonlEntry::from_message(&session.id, msg));
1088 new_ids.insert(id);
1089 }
1090 }
1091
1092 let current_todos_hash = Self::compute_todos_hash(&session.todos);
1094 let current_plan_hash = Self::compute_plan_hash(session.current_plan.as_ref());
1095
1096 if current_todos_hash != prev_todos_hash {
1098 for todo in &session.todos {
1099 new_entries.push(JsonlEntry::Todo(TodoEntry {
1100 id: todo.id.to_string(),
1101 session_id: session.id.to_string(),
1102 content: todo.content.clone(),
1103 active_form: todo.active_form.clone(),
1104 status: format!("{:?}", todo.status).to_lowercase(),
1105 plan_id: todo.plan_id.map(|id| id.to_string()),
1106 created_at: todo.created_at,
1107 started_at: todo.started_at,
1108 completed_at: todo.completed_at,
1109 }));
1110 }
1111 }
1112
1113 if current_plan_hash != prev_plan_hash
1115 && let Some(ref plan) = session.current_plan
1116 {
1117 new_entries.push(JsonlEntry::Plan(PlanEntry {
1118 id: plan.id.to_string(),
1119 session_id: session.id.to_string(),
1120 name: plan.name.clone(),
1121 content: plan.content.clone(),
1122 status: format!("{:?}", plan.status).to_lowercase(),
1123 error: plan.error.clone(),
1124 created_at: plan.created_at,
1125 approved_at: plan.approved_at,
1126 started_at: plan.started_at,
1127 completed_at: plan.completed_at,
1128 }));
1129 }
1130
1131 for compact in &session.compact_history {
1133 let compact_id = format!("compact:{}", compact.id);
1134 if !persisted_ids.contains(&compact_id) {
1135 new_entries.push(JsonlEntry::Compact(CompactEntry {
1136 id: compact.id.to_string(),
1137 session_id: session.id.to_string(),
1138 trigger: format!("{:?}", compact.trigger).to_lowercase(),
1139 pre_tokens: compact.pre_tokens,
1140 post_tokens: compact.post_tokens,
1141 saved_tokens: compact.saved_tokens,
1142 summary: compact.summary.clone(),
1143 original_count: compact.original_count,
1144 new_count: compact.new_count,
1145 logical_parent_id: compact.logical_parent_id.as_ref().map(|id| id.to_string()),
1146 created_at: compact.created_at,
1147 }));
1148 new_ids.insert(compact_id);
1149 }
1150 }
1151
1152 if new_entries.is_empty() {
1153 return Ok(());
1154 }
1155
1156 let path_clone = file_path.clone();
1158 let sync = self.config.sync_mode == SyncMode::OnWrite;
1159 tokio::task::spawn_blocking(move || append_entries_sync(&path_clone, &new_entries, sync))
1160 .await
1161 .map_err(|e| SessionError::Storage {
1162 message: format!("Task join error: {}", e),
1163 })??;
1164
1165 let mut index = self.index.write().await;
1167 let mut persisted = persisted_ids;
1168 persisted.extend(new_ids);
1169
1170 index.insert(
1171 session.id,
1172 SessionMeta {
1173 path: file_path,
1174 project_path,
1175 tenant_id: session.tenant_id.clone(),
1176 parent_id: session.parent_id,
1177 updated_at: session.updated_at,
1178 persisted_ids: persisted,
1179 todos_hash: current_todos_hash,
1180 plan_hash: current_plan_hash,
1181 },
1182 );
1183
1184 Ok(())
1185 }
1186
1187 async fn load(&self, id: &SessionId) -> SessionResult<Option<Session>> {
1188 let path = {
1189 let index = self.index.read().await;
1190 match index.sessions.get(id) {
1191 Some(m) => m.path.clone(),
1192 None => return Ok(None),
1193 }
1194 };
1195
1196 let entries = tokio::task::spawn_blocking(move || read_entries_sync(&path))
1197 .await
1198 .map_err(|e| SessionError::Storage {
1199 message: format!("Task join error: {}", e),
1200 })??;
1201
1202 if entries.is_empty() {
1203 return Ok(None);
1204 }
1205
1206 let session = Self::reconstruct_session(*id, entries);
1207 Ok(Some(session))
1208 }
1209
1210 async fn delete(&self, id: &SessionId) -> SessionResult<bool> {
1211 let meta = {
1212 let mut index = self.index.write().await;
1213 index.remove(id)
1214 };
1215
1216 let Some(meta) = meta else {
1217 return Ok(false);
1218 };
1219
1220 if meta.path.exists() {
1221 tokio::fs::remove_file(&meta.path)
1222 .await
1223 .map_err(|e| SessionError::Storage {
1224 message: format!("Failed to delete {}: {}", meta.path.display(), e),
1225 })?;
1226 }
1227
1228 self.summaries.write().await.remove(id);
1229 self.queue.write().await.remove(id);
1230 Ok(true)
1231 }
1232
1233 async fn list(&self, tenant_id: Option<&str>) -> SessionResult<Vec<SessionId>> {
1234 let index = self.index.read().await;
1235 Ok(match tenant_id {
1236 Some(tid) => index.by_tenant.get(tid).cloned().unwrap_or_default(),
1237 None => index.sessions.keys().copied().collect(),
1238 })
1239 }
1240
1241 async fn list_children(&self, parent_id: &SessionId) -> SessionResult<Vec<SessionId>> {
1242 let index = self.index.read().await;
1243 Ok(index.by_parent.get(parent_id).cloned().unwrap_or_default())
1244 }
1245
1246 async fn add_summary(&self, snapshot: SummarySnapshot) -> SessionResult<()> {
1247 let path = {
1248 let index = self.index.read().await;
1249 index
1250 .sessions
1251 .get(&snapshot.session_id)
1252 .map(|m| m.path.clone())
1253 };
1254
1255 if let Some(path) = path {
1256 let entry = JsonlEntry::Summary(SummaryEntry {
1257 session_id: snapshot.session_id.to_string(),
1258 summary: snapshot.summary.clone(),
1259 leaf_uuid: snapshot.leaf_message_id.as_ref().map(|id| id.to_string()),
1260 timestamp: snapshot.created_at,
1261 });
1262
1263 let sync = self.config.sync_mode == SyncMode::OnWrite;
1264 tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1265 .await
1266 .map_err(|e| SessionError::Storage {
1267 message: format!("Task join error: {}", e),
1268 })??;
1269 }
1270
1271 self.summaries
1272 .write()
1273 .await
1274 .entry(snapshot.session_id)
1275 .or_default()
1276 .push(snapshot);
1277
1278 Ok(())
1279 }
1280
1281 async fn get_summaries(&self, session_id: &SessionId) -> SessionResult<Vec<SummarySnapshot>> {
1282 Ok(self
1283 .summaries
1284 .read()
1285 .await
1286 .get(session_id)
1287 .cloned()
1288 .unwrap_or_default())
1289 }
1290
1291 async fn enqueue(
1292 &self,
1293 session_id: &SessionId,
1294 content: String,
1295 priority: i32,
1296 ) -> SessionResult<QueueItem> {
1297 let item = QueueItem::enqueue(*session_id, content.clone()).with_priority(priority);
1298
1299 let path = {
1300 let index = self.index.read().await;
1301 index.sessions.get(session_id).map(|m| m.path.clone())
1302 };
1303
1304 if let Some(path) = path {
1305 let entry = JsonlEntry::QueueOperation(QueueOperationEntry {
1306 operation: "enqueue".to_string(),
1307 session_id: session_id.to_string(),
1308 timestamp: Utc::now(),
1309 content,
1310 priority,
1311 item_id: item.id.to_string(),
1312 });
1313
1314 let sync = self.config.sync_mode == SyncMode::OnWrite;
1315 tokio::task::spawn_blocking(move || append_entries_sync(&path, &[entry], sync))
1316 .await
1317 .map_err(|e| SessionError::Storage {
1318 message: format!("Task join error: {}", e),
1319 })??;
1320 }
1321
1322 self.queue
1323 .write()
1324 .await
1325 .entry(*session_id)
1326 .or_default()
1327 .push(item.clone());
1328
1329 Ok(item)
1330 }
1331
1332 async fn dequeue(&self, session_id: &SessionId) -> SessionResult<Option<QueueItem>> {
1333 let mut queue = self.queue.write().await;
1334 let items = match queue.get_mut(session_id) {
1335 Some(items) => items,
1336 None => return Ok(None),
1337 };
1338
1339 items.sort_by(|a, b| b.priority.cmp(&a.priority));
1340
1341 for item in items.iter_mut() {
1342 if item.status == QueueStatus::Pending {
1343 item.start_processing();
1344 return Ok(Some(item.clone()));
1345 }
1346 }
1347
1348 Ok(None)
1349 }
1350
1351 async fn cancel_queued(&self, item_id: Uuid) -> SessionResult<bool> {
1352 let mut queue = self.queue.write().await;
1353 for items in queue.values_mut() {
1354 if let Some(item) = items.iter_mut().find(|i| i.id == item_id) {
1355 item.cancel();
1356 return Ok(true);
1357 }
1358 }
1359 Ok(false)
1360 }
1361
1362 async fn pending_queue(&self, session_id: &SessionId) -> SessionResult<Vec<QueueItem>> {
1363 Ok(self
1364 .queue
1365 .read()
1366 .await
1367 .get(session_id)
1368 .map(|items| {
1369 items
1370 .iter()
1371 .filter(|i| i.status == QueueStatus::Pending)
1372 .cloned()
1373 .collect()
1374 })
1375 .unwrap_or_default())
1376 }
1377
1378 async fn cleanup_expired(&self) -> SessionResult<usize> {
1379 let cutoff = Utc::now() - chrono::Duration::days(self.config.retention_days as i64);
1380
1381 let expired_paths: Vec<PathBuf> = {
1383 let mut index = self.index.write().await;
1384 let expired_ids: Vec<SessionId> = index
1385 .sessions
1386 .iter()
1387 .filter(|(_, m)| m.updated_at < cutoff)
1388 .map(|(id, _)| *id)
1389 .collect();
1390
1391 let mut paths = Vec::with_capacity(expired_ids.len());
1392 for id in &expired_ids {
1393 if let Some(meta) = index.remove(id) {
1394 paths.push(meta.path);
1395 }
1396 }
1397 paths
1398 };
1399
1400 let count = expired_paths.len();
1401
1402 for path in expired_paths {
1404 let _ = tokio::fs::remove_file(&path).await;
1405 }
1406
1407 Ok(count)
1408 }
1409}
1410
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::ContentBlock;
    use tempfile::TempDir;

    /// Builds a persistence rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside it so the directory stays alive for
    /// the duration of the test.
    async fn create_test_persistence() -> (JsonlPersistence, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let base = temp_dir.path().to_path_buf();
        let persistence = JsonlPersistence::new(JsonlConfig::builder().base_dir(base).build())
            .await
            .unwrap();
        (persistence, temp_dir)
    }

    /// A user message holding a single text block.
    fn user_text(text: &'static str) -> SessionMessage {
        SessionMessage::user(vec![ContentBlock::text(text)])
    }

    /// An assistant message holding a single text block.
    fn assistant_text(text: &'static str) -> SessionMessage {
        SessionMessage::assistant(vec![ContentBlock::text(text)])
    }

    #[tokio::test]
    async fn test_save_and_load_session() {
        let (persistence, _temp) = create_test_persistence().await;

        let mut session = Session::new(SessionConfig::default());
        session.add_message(user_text("Hello"));
        session.add_message(assistant_text("Hi there!"));

        persistence.save(&session).await.unwrap();

        let restored = persistence.load(&session.id).await.unwrap().unwrap();
        assert_eq!(restored.id, session.id);
        assert_eq!(restored.messages.len(), 2);
    }

    #[tokio::test]
    async fn test_incremental_save() {
        let (persistence, _temp) = create_test_persistence().await;

        let mut session = Session::new(SessionConfig::default());

        // First save carries one message, the second adds another.
        session.add_message(user_text("First"));
        persistence.save(&session).await.unwrap();

        session.add_message(assistant_text("Second"));
        persistence.save(&session).await.unwrap();

        let restored = persistence.load(&session.id).await.unwrap().unwrap();
        assert_eq!(restored.messages.len(), 2);
    }

    #[tokio::test]
    async fn test_delete_session() {
        let (persistence, _temp) = create_test_persistence().await;

        let session = Session::new(SessionConfig::default());
        persistence.save(&session).await.unwrap();

        let deleted = persistence.delete(&session.id).await.unwrap();
        assert!(deleted);

        let reloaded = persistence.load(&session.id).await.unwrap();
        assert!(reloaded.is_none());
    }

    #[tokio::test]
    async fn test_list_sessions() {
        let (persistence, _temp) = create_test_persistence().await;

        let first = Session::new(SessionConfig::default());
        let second = Session::new(SessionConfig::default());

        for session in [&first, &second] {
            persistence.save(session).await.unwrap();
        }

        let listed = persistence.list(None).await.unwrap();
        assert_eq!(listed.len(), 2);
    }

    #[tokio::test]
    async fn test_tenant_filtering() {
        let (persistence, _temp) = create_test_persistence().await;

        let mut tenant_a = Session::new(SessionConfig::default());
        tenant_a.tenant_id = Some("tenant-a".to_string());

        let mut tenant_b = Session::new(SessionConfig::default());
        tenant_b.tenant_id = Some("tenant-b".to_string());

        persistence.save(&tenant_a).await.unwrap();
        persistence.save(&tenant_b).await.unwrap();

        let listed = persistence.list(Some("tenant-a")).await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0], tenant_a.id);
    }

    #[tokio::test]
    async fn test_summaries() {
        let (persistence, _temp) = create_test_persistence().await;

        let session = Session::new(SessionConfig::default());
        persistence.save(&session).await.unwrap();

        let first = SummarySnapshot::new(session.id, "Summary 1");
        persistence.add_summary(first).await.unwrap();

        let second = SummarySnapshot::new(session.id, "Summary 2");
        persistence.add_summary(second).await.unwrap();

        let stored = persistence.get_summaries(&session.id).await.unwrap();
        assert_eq!(stored.len(), 2);
    }

    #[tokio::test]
    async fn test_queue_operations() {
        let (persistence, _temp) = create_test_persistence().await;

        let session = Session::new(SessionConfig::default());
        persistence.save(&session).await.unwrap();

        // Enqueue the low-priority item first so ordering must come from the
        // priority, not from insertion order.
        persistence
            .enqueue(&session.id, "Low priority".to_string(), 1)
            .await
            .unwrap();
        persistence
            .enqueue(&session.id, "High priority".to_string(), 10)
            .await
            .unwrap();

        let dequeued = persistence.dequeue(&session.id).await.unwrap().unwrap();
        assert_eq!(dequeued.content, "High priority");
    }

    #[tokio::test]
    async fn test_dag_reconstruction() {
        let (persistence, _temp) = create_test_persistence().await;

        let mut session = Session::new(SessionConfig::default());
        session.add_message(user_text("Q1"));
        session.add_message(assistant_text("A1"));
        session.add_message(user_text("Q2"));
        session.add_message(assistant_text("A2"));

        persistence.save(&session).await.unwrap();

        let restored = persistence.load(&session.id).await.unwrap().unwrap();

        // Messages must come back in the order they were added.
        assert_eq!(restored.messages.len(), 4);
        for (message, expected) in restored.messages.iter().zip(["Q1", "A1", "Q2", "A2"]) {
            assert!(
                message
                    .content
                    .iter()
                    .any(|block| block.as_text() == Some(expected))
            );
        }
    }

    #[tokio::test]
    async fn test_project_path_encoding() {
        let config = JsonlConfig::default();

        let unix_home = config.encode_project_path(Path::new("/home/user/project"));
        assert_eq!(unix_home, "-home-user-project");

        let mac_home = config.encode_project_path(Path::new("/Users/alice/work/app"));
        assert_eq!(mac_home, "-Users-alice-work-app");
    }

    #[test]
    fn test_jsonl_entry_serialization() {
        let message = user_text("Hello");
        let session_id = SessionId::new();
        let entry = JsonlEntry::from_message(&session_id, &message);

        let encoded = serde_json::to_string(&entry).unwrap();
        assert!(encoded.contains("\"type\":\"user\""));

        let decoded: JsonlEntry = serde_json::from_str(&encoded).unwrap();
        assert!(matches!(decoded, JsonlEntry::User(_)));
    }

    #[tokio::test]
    async fn test_no_duplicate_writes() {
        let (persistence, _temp) = create_test_persistence().await;

        let mut session = Session::new(SessionConfig::default());
        session.add_message(user_text("Hello"));

        // Saving the unchanged session repeatedly must not append the message
        // again.
        for _ in 0..3 {
            persistence.save(&session).await.unwrap();
        }

        let file_path = persistence.session_file_path(&session.id, None);
        let entries = read_entries_sync(&file_path).unwrap();
        let message_count = entries
            .iter()
            .filter(|entry| entry.message_uuid().is_some())
            .count();
        assert_eq!(message_count, 1, "Should not duplicate message entries");
    }

    #[test]
    fn test_windows_path_encoding() {
        let config = JsonlConfig::default();
        let encoded = config.encode_project_path(Path::new("C:\\Users\\alice\\project"));
        assert_eq!(encoded, "C_-Users-alice-project");
    }
}