1#[derive(Debug, Clone, Serialize, Deserialize, Default)]
2pub struct SessionMeta {
3 pub parent_id: Option<String>,
4 #[serde(default)]
5 pub archived: bool,
6 #[serde(default)]
7 pub shared: bool,
8 pub share_id: Option<String>,
9 pub summary: Option<String>,
10 #[serde(default)]
11 pub snapshots: Vec<Vec<Message>>,
12 pub pre_revert: Option<Vec<Message>>,
13 #[serde(default)]
14 pub todos: Vec<Value>,
15}
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct QuestionToolRef {
19 #[serde(rename = "callID")]
20 pub call_id: String,
21 #[serde(rename = "messageID")]
22 pub message_id: String,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct QuestionRequest {
27 pub id: String,
28 #[serde(rename = "sessionID")]
29 pub session_id: String,
30 #[serde(default)]
31 pub questions: Vec<Value>,
32 #[serde(skip_serializing_if = "Option::is_none")]
33 pub tool: Option<QuestionToolRef>,
34}
35
36pub struct Storage {
37 base: PathBuf,
38 sessions: RwLock<HashMap<String, Session>>,
39 metadata: RwLock<HashMap<String, SessionMeta>>,
40 question_requests: RwLock<HashMap<String, QuestionRequest>>,
41 flush_lock: Mutex<()>,
42}
43
/// Selects which sessions `Storage::list_sessions_scoped` returns.
#[derive(Clone, Debug)]
pub enum SessionListScope {
    /// Every stored session.
    Global,
    /// Only sessions whose normalized workspace root (or, failing that,
    /// normalized directory) equals the normalized `workspace_root`.
    Workspace { workspace_root: String },
}
49
50#[derive(Debug, Clone, Default, Serialize, Deserialize)]
51pub struct SessionRepairStats {
52 pub sessions_repaired: u64,
53 pub messages_recovered: u64,
54 pub parts_recovered: u64,
55 pub conflicts_merged: u64,
56}
57
/// File name (inside the storage base directory) of the legacy import marker.
const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
/// Value written to the `version` field of every new legacy import marker.
const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
/// Upper bound on the number of message snapshots retained per session.
const MAX_SESSION_SNAPSHOTS: usize = 5;
61
62#[derive(Debug, Clone, Default, Serialize, Deserialize)]
63pub struct LegacyTreeCounts {
64 pub session_files: u64,
65 pub message_files: u64,
66 pub part_files: u64,
67}
68
69#[derive(Debug, Clone, Default, Serialize, Deserialize)]
70pub struct LegacyImportedCounts {
71 pub sessions: u64,
72 pub messages: u64,
73 pub parts: u64,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct LegacyImportMarker {
78 pub version: u32,
79 pub created_at_ms: u64,
80 pub last_checked_at_ms: u64,
81 pub legacy_counts: LegacyTreeCounts,
82 pub imported_counts: LegacyImportedCounts,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct LegacyRepairRunReport {
87 pub status: String,
88 pub marker_updated: bool,
89 pub sessions_merged: u64,
90 pub messages_recovered: u64,
91 pub parts_recovered: u64,
92 pub legacy_counts: LegacyTreeCounts,
93 pub imported_counts: LegacyImportedCounts,
94}
95
96fn snapshot_session_messages(
97 session_id: &str,
98 session: &Session,
99 metadata: &mut HashMap<String, SessionMeta>,
100) {
101 let meta = metadata
102 .entry(session_id.to_string())
103 .or_insert_with(SessionMeta::default);
104 meta.snapshots.push(session.messages.clone());
105 trim_session_snapshots(&mut meta.snapshots);
106}
107
108fn trim_session_snapshots(snapshots: &mut Vec<Vec<Message>>) {
109 if snapshots.len() > MAX_SESSION_SNAPSHOTS {
110 let keep_from = snapshots.len() - MAX_SESSION_SNAPSHOTS;
111 snapshots.drain(0..keep_from);
112 }
113}
114
115fn compact_session_snapshots(snapshots: &mut Vec<Vec<Message>>) -> usize {
116 if snapshots.is_empty() {
117 return 0;
118 }
119
120 let original_len = snapshots.len();
121 let mut compacted = Vec::with_capacity(original_len);
122 let mut previous_encoded: Option<Vec<u8>> = None;
123
124 for snapshot in snapshots.drain(..) {
125 let encoded = serde_json::to_vec(&snapshot).unwrap_or_default();
126 if previous_encoded.as_ref() == Some(&encoded) {
127 continue;
128 }
129 previous_encoded = Some(encoded);
130 compacted.push(snapshot);
131 }
132
133 trim_session_snapshots(&mut compacted);
134 let removed = original_len.saturating_sub(compacted.len());
135 *snapshots = compacted;
136 removed
137}
138
139fn session_meta_is_empty(meta: &SessionMeta) -> bool {
140 meta.parent_id.is_none()
141 && !meta.archived
142 && !meta.shared
143 && meta.share_id.is_none()
144 && meta.summary.is_none()
145 && meta.snapshots.is_empty()
146 && meta.pre_revert.is_none()
147 && meta.todos.is_empty()
148}
149
/// Counters produced by `compact_session_metadata`.
#[derive(Default, Debug)]
struct SessionMetaCompactionStats {
    /// Metadata entries dropped (orphaned or empty after compaction).
    metadata_pruned: u64,
    /// Snapshots removed from the entries that were examined.
    snapshots_removed: u64,
}
155
156fn compact_session_metadata(
157 sessions: &HashMap<String, Session>,
158 metadata: &mut HashMap<String, SessionMeta>,
159) -> SessionMetaCompactionStats {
160 let mut stats = SessionMetaCompactionStats::default();
161
162 metadata.retain(|session_id, meta| {
163 if !sessions.contains_key(session_id) {
164 stats.metadata_pruned += 1;
165 return false;
166 }
167
168 let removed = compact_session_snapshots(&mut meta.snapshots) as u64;
169 stats.snapshots_removed += removed;
170
171 if session_meta_is_empty(meta) {
172 stats.metadata_pruned += 1;
173 return false;
174 }
175
176 true
177 });
178
179 stats
180}
181
182impl Storage {
183 pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
184 let base = base.as_ref().to_path_buf();
185 fs::create_dir_all(&base).await?;
186 let sessions_file = base.join("sessions.json");
187 let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
188 let sessions_file_exists = sessions_file.exists();
189 let mut imported_legacy_sessions = false;
190 let mut sessions = if sessions_file_exists {
191 let raw = fs::read_to_string(&sessions_file).await?;
192 serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
193 } else {
194 HashMap::new()
195 };
196
197 let mut marker_to_write = None;
198 if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
199 let base_for_scan = base.clone();
200 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
201 .await
202 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
203 if merge_legacy_sessions(&mut sessions, scan.sessions) {
204 imported_legacy_sessions = true;
205 }
206 marker_to_write = Some(LegacyImportMarker {
207 version: LEGACY_IMPORT_MARKER_VERSION,
208 created_at_ms: now_ms_u64(),
209 last_checked_at_ms: now_ms_u64(),
210 legacy_counts: scan.legacy_counts,
211 imported_counts: scan.imported_counts,
212 });
213 }
214
215 if hydrate_workspace_roots(&mut sessions) {
216 imported_legacy_sessions = true;
217 }
218 if repair_session_titles(&mut sessions) {
219 imported_legacy_sessions = true;
220 }
221 let metadata_file = base.join("session_meta.json");
222 let mut metadata = if metadata_file.exists() {
223 let raw = fs::read_to_string(&metadata_file).await?;
224 serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
225 } else {
226 HashMap::new()
227 };
228 let compaction = compact_session_metadata(&sessions, &mut metadata);
229 let metadata_compacted = compaction.metadata_pruned > 0 || compaction.snapshots_removed > 0;
230 if metadata_compacted {
231 tracing::info!(
232 metadata_pruned = compaction.metadata_pruned,
233 snapshots_removed = compaction.snapshots_removed,
234 "compacted persisted session metadata"
235 );
236 }
237 let questions_file = base.join("questions.json");
238 let question_requests = if questions_file.exists() {
239 let raw = fs::read_to_string(&questions_file).await?;
240 serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
241 } else {
242 HashMap::new()
243 };
244 let storage = Self {
245 base,
246 sessions: RwLock::new(sessions),
247 metadata: RwLock::new(metadata),
248 question_requests: RwLock::new(question_requests),
249 flush_lock: Mutex::new(()),
250 };
251
252 if imported_legacy_sessions || metadata_compacted {
253 storage.flush().await?;
254 }
255 if let Some(marker) = marker_to_write {
256 storage.write_legacy_import_marker(&marker).await?;
257 }
258
259 Ok(storage)
260 }
261
262 pub async fn list_sessions(&self) -> Vec<Session> {
263 self.list_sessions_scoped(SessionListScope::Global).await
264 }
265
266 pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
267 let all = self
268 .sessions
269 .read()
270 .await
271 .values()
272 .cloned()
273 .collect::<Vec<_>>();
274 match scope {
275 SessionListScope::Global => all,
276 SessionListScope::Workspace { workspace_root } => {
277 let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
278 return Vec::new();
279 };
280 all.into_iter()
281 .filter(|session| {
282 let direct = session
283 .workspace_root
284 .as_ref()
285 .and_then(|p| normalize_workspace_path(p))
286 .map(|p| p == normalized_workspace)
287 .unwrap_or(false);
288 if direct {
289 return true;
290 }
291 normalize_workspace_path(&session.directory)
292 .map(|p| p == normalized_workspace)
293 .unwrap_or(false)
294 })
295 .collect()
296 }
297 }
298 }
299
300 pub async fn get_session(&self, id: &str) -> Option<Session> {
301 self.sessions.read().await.get(id).cloned()
302 }
303
304 pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
305 if session.workspace_root.is_none() {
306 session.workspace_root = normalize_workspace_path(&session.directory);
307 }
308 let session_id = session.id.clone();
309 self.sessions
310 .write()
311 .await
312 .insert(session_id.clone(), session);
313 self.metadata
314 .write()
315 .await
316 .entry(session_id)
317 .or_insert_with(SessionMeta::default);
318 self.flush().await
319 }
320
321 pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
322 let mut stats = SessionRepairStats::default();
323 let mut sessions = self.sessions.write().await;
324
325 for session in sessions.values_mut() {
326 let imported = load_legacy_session_messages(&self.base, &session.id);
327 if imported.is_empty() {
328 continue;
329 }
330
331 let (merged, merge_stats, changed) =
332 merge_session_messages(&session.messages, &imported);
333 if changed {
334 session.messages = merged;
335 session.time.updated =
336 most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
337 stats.sessions_repaired += 1;
338 stats.messages_recovered += merge_stats.messages_recovered;
339 stats.parts_recovered += merge_stats.parts_recovered;
340 stats.conflicts_merged += merge_stats.conflicts_merged;
341 }
342 }
343
344 if stats.sessions_repaired > 0 {
345 drop(sessions);
346 self.flush().await?;
347 }
348
349 Ok(stats)
350 }
351
352 pub async fn run_legacy_storage_repair_scan(
353 &self,
354 force: bool,
355 ) -> anyhow::Result<LegacyRepairRunReport> {
356 let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
357 let sessions_exists = self.base.join("sessions.json").exists();
358 let should_scan = if force {
359 true
360 } else {
361 should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
362 };
363 if !should_scan {
364 let marker = read_legacy_import_marker(&marker_path)
365 .await
366 .unwrap_or_else(|| LegacyImportMarker {
367 version: LEGACY_IMPORT_MARKER_VERSION,
368 created_at_ms: now_ms_u64(),
369 last_checked_at_ms: now_ms_u64(),
370 legacy_counts: LegacyTreeCounts::default(),
371 imported_counts: LegacyImportedCounts::default(),
372 });
373 return Ok(LegacyRepairRunReport {
374 status: "skipped".to_string(),
375 marker_updated: false,
376 sessions_merged: 0,
377 messages_recovered: 0,
378 parts_recovered: 0,
379 legacy_counts: marker.legacy_counts,
380 imported_counts: marker.imported_counts,
381 });
382 }
383
384 let base_for_scan = self.base.clone();
385 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
386 .await
387 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
388
389 let merge_stats = {
390 let mut sessions = self.sessions.write().await;
391 merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
392 };
393
394 if merge_stats.changed {
395 self.flush().await?;
396 }
397
398 let marker = LegacyImportMarker {
399 version: LEGACY_IMPORT_MARKER_VERSION,
400 created_at_ms: now_ms_u64(),
401 last_checked_at_ms: now_ms_u64(),
402 legacy_counts: scan.legacy_counts.clone(),
403 imported_counts: scan.imported_counts.clone(),
404 };
405 self.write_legacy_import_marker(&marker).await?;
406
407 Ok(LegacyRepairRunReport {
408 status: if merge_stats.changed {
409 "updated".to_string()
410 } else {
411 "no_changes".to_string()
412 },
413 marker_updated: true,
414 sessions_merged: merge_stats.sessions_merged,
415 messages_recovered: merge_stats.messages_recovered,
416 parts_recovered: merge_stats.parts_recovered,
417 legacy_counts: scan.legacy_counts,
418 imported_counts: scan.imported_counts,
419 })
420 }
421
422 pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
423 let removed = self.sessions.write().await.remove(id).is_some();
424 self.metadata.write().await.remove(id);
425 self.question_requests
426 .write()
427 .await
428 .retain(|_, request| request.session_id != id);
429 if removed {
430 self.flush().await?;
431 }
432 Ok(removed)
433 }
434
435 pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
436 let mut sessions = self.sessions.write().await;
437 let session = sessions
438 .get_mut(session_id)
439 .context("session not found for append_message")?;
440 session.messages.push(msg);
441 session.time.updated = Utc::now();
442 drop(sessions);
443 self.flush().await
444 }
445
446 pub async fn append_message_part(
447 &self,
448 session_id: &str,
449 message_id: &str,
450 part: MessagePart,
451 ) -> anyhow::Result<()> {
452 let mut sessions = self.sessions.write().await;
453 let session = sessions
454 .get_mut(session_id)
455 .context("session not found for append_message_part")?;
456 let message = if let Some(message) = session
457 .messages
458 .iter_mut()
459 .find(|message| message.id == message_id)
460 {
461 message
462 } else {
463 session
464 .messages
465 .iter_mut()
466 .rev()
467 .find(|message| matches!(message.role, MessageRole::User))
468 .context("message not found for append_message_part")?
469 };
470 reduce_message_parts(&mut message.parts, part);
471 session.time.updated = Utc::now();
472 drop(sessions);
473 self.flush().await
474 }
475
476 pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
477 let source = {
478 let sessions = self.sessions.read().await;
479 sessions.get(id).cloned()
480 };
481 let Some(mut child) = source else {
482 return Ok(None);
483 };
484
485 child.id = Uuid::new_v4().to_string();
486 child.title = format!("{} (fork)", child.title);
487 child.time.created = Utc::now();
488 child.time.updated = child.time.created;
489 child.slug = None;
490
491 self.sessions
492 .write()
493 .await
494 .insert(child.id.clone(), child.clone());
495 self.metadata.write().await.insert(
496 child.id.clone(),
497 SessionMeta {
498 parent_id: Some(id.to_string()),
499 snapshots: vec![child.messages.clone()],
500 ..SessionMeta::default()
501 },
502 );
503 self.flush().await?;
504 Ok(Some(child))
505 }
506
507 pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
508 let mut sessions = self.sessions.write().await;
509 let Some(session) = sessions.get_mut(id) else {
510 return Ok(false);
511 };
512 let mut metadata = self.metadata.write().await;
513 let meta = metadata
514 .entry(id.to_string())
515 .or_insert_with(SessionMeta::default);
516 let Some(snapshot) = meta.snapshots.pop() else {
517 return Ok(false);
518 };
519 meta.pre_revert = Some(session.messages.clone());
520 session.messages = snapshot;
521 session.time.updated = Utc::now();
522 drop(metadata);
523 drop(sessions);
524 self.flush().await?;
525 Ok(true)
526 }
527
528 pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
529 let mut sessions = self.sessions.write().await;
530 let Some(session) = sessions.get_mut(id) else {
531 return Ok(false);
532 };
533 let mut metadata = self.metadata.write().await;
534 let Some(meta) = metadata.get_mut(id) else {
535 return Ok(false);
536 };
537 let Some(previous) = meta.pre_revert.take() else {
538 return Ok(false);
539 };
540 meta.snapshots.push(session.messages.clone());
541 trim_session_snapshots(&mut meta.snapshots);
542 session.messages = previous;
543 session.time.updated = Utc::now();
544 drop(metadata);
545 drop(sessions);
546 self.flush().await?;
547 Ok(true)
548 }
549
550 pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
551 let mut metadata = self.metadata.write().await;
552 let meta = metadata
553 .entry(id.to_string())
554 .or_insert_with(SessionMeta::default);
555 meta.shared = shared;
556 if shared {
557 if meta.share_id.is_none() {
558 meta.share_id = Some(Uuid::new_v4().to_string());
559 }
560 } else {
561 meta.share_id = None;
562 }
563 let share_id = meta.share_id.clone();
564 drop(metadata);
565 self.flush().await?;
566 Ok(share_id)
567 }
568
569 pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
570 let mut metadata = self.metadata.write().await;
571 let meta = metadata
572 .entry(id.to_string())
573 .or_insert_with(SessionMeta::default);
574 meta.archived = archived;
575 drop(metadata);
576 self.flush().await?;
577 Ok(true)
578 }
579
580 pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
581 let mut metadata = self.metadata.write().await;
582 let meta = metadata
583 .entry(id.to_string())
584 .or_insert_with(SessionMeta::default);
585 meta.summary = Some(summary);
586 drop(metadata);
587 self.flush().await?;
588 Ok(true)
589 }
590
591 pub async fn children(&self, parent_id: &str) -> Vec<Session> {
592 let child_ids = {
593 let metadata = self.metadata.read().await;
594 metadata
595 .iter()
596 .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
597 .map(|(id, _)| id.clone())
598 .collect::<Vec<_>>()
599 };
600 let sessions = self.sessions.read().await;
601 child_ids
602 .into_iter()
603 .filter_map(|id| sessions.get(&id).cloned())
604 .collect()
605 }
606
607 pub async fn session_status(&self, id: &str) -> Option<Value> {
608 let metadata = self.metadata.read().await;
609 metadata.get(id).map(|meta| {
610 json!({
611 "archived": meta.archived,
612 "shared": meta.shared,
613 "parentID": meta.parent_id,
614 "snapshotCount": meta.snapshots.len()
615 })
616 })
617 }
618
619 pub async fn session_diff(&self, id: &str) -> Option<Value> {
620 let sessions = self.sessions.read().await;
621 let current = sessions.get(id)?;
622 let metadata = self.metadata.read().await;
623 let default = SessionMeta::default();
624 let meta = metadata.get(id).unwrap_or(&default);
625 let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
626 Some(json!({
627 "sessionID": id,
628 "currentMessageCount": current.messages.len(),
629 "lastSnapshotMessageCount": last_snapshot_len,
630 "delta": current.messages.len() as i64 - last_snapshot_len as i64
631 }))
632 }
633
634 pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
635 let mut metadata = self.metadata.write().await;
636 let meta = metadata
637 .entry(id.to_string())
638 .or_insert_with(SessionMeta::default);
639 meta.todos = normalize_todo_items(todos);
640 drop(metadata);
641 self.flush().await
642 }
643
644 pub async fn get_todos(&self, id: &str) -> Vec<Value> {
645 let todos = self
646 .metadata
647 .read()
648 .await
649 .get(id)
650 .map(|meta| meta.todos.clone())
651 .unwrap_or_default();
652 normalize_todo_items(todos)
653 }
654
655 pub async fn add_question_request(
656 &self,
657 session_id: &str,
658 message_id: &str,
659 questions: Vec<Value>,
660 ) -> anyhow::Result<QuestionRequest> {
661 if questions.is_empty() {
662 return Err(anyhow::anyhow!(
663 "cannot add empty question request for session {}",
664 session_id
665 ));
666 }
667 let request = QuestionRequest {
668 id: format!("q-{}", Uuid::new_v4()),
669 session_id: session_id.to_string(),
670 questions,
671 tool: Some(QuestionToolRef {
672 call_id: format!("call-{}", Uuid::new_v4()),
673 message_id: message_id.to_string(),
674 }),
675 };
676 self.question_requests
677 .write()
678 .await
679 .insert(request.id.clone(), request.clone());
680 self.flush().await?;
681 Ok(request)
682 }
683
684 pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
685 self.question_requests
686 .read()
687 .await
688 .values()
689 .cloned()
690 .collect()
691 }
692
693 pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
694 let removed = self
695 .question_requests
696 .write()
697 .await
698 .remove(request_id)
699 .is_some();
700 if removed {
701 self.flush().await?;
702 }
703 Ok(removed)
704 }
705
706 pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
707 self.reply_question(request_id).await
708 }
709
710 pub async fn attach_session_to_workspace(
711 &self,
712 session_id: &str,
713 target_workspace: &str,
714 reason_tag: &str,
715 ) -> anyhow::Result<Option<Session>> {
716 let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
717 return Ok(None);
718 };
719 let mut sessions = self.sessions.write().await;
720 let Some(session) = sessions.get_mut(session_id) else {
721 return Ok(None);
722 };
723 let previous_workspace = session
724 .workspace_root
725 .clone()
726 .or_else(|| normalize_workspace_path(&session.directory));
727
728 if session.origin_workspace_root.is_none() {
729 session.origin_workspace_root = previous_workspace.clone();
730 }
731 session.attached_from_workspace = previous_workspace;
732 session.attached_to_workspace = Some(target_workspace.clone());
733 session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
734 session.attach_reason = Some(reason_tag.trim().to_string());
735 session.workspace_root = Some(target_workspace.clone());
736 session.project_id = workspace_project_id(&target_workspace);
737 session.directory = target_workspace;
738 session.time.updated = Utc::now();
739 let updated = session.clone();
740 drop(sessions);
741 self.flush().await?;
742 Ok(Some(updated))
743 }
744
745 async fn flush(&self) -> anyhow::Result<()> {
746 let _flush_guard = self.flush_lock.lock().await;
747 {
748 let snapshot = self.sessions.read().await.clone();
749 self.flush_file("sessions.json", &snapshot).await?;
750 }
751 {
752 let metadata_snapshot = self.metadata.read().await.clone();
753 self.flush_file("session_meta.json", &metadata_snapshot)
754 .await?;
755 }
756 {
757 let questions_snapshot = self.question_requests.read().await.clone();
758 self.flush_file("questions.json", &questions_snapshot)
759 .await?;
760 }
761 Ok(())
762 }
763
764 async fn flush_file(&self, filename: &str, data: &impl serde::Serialize) -> anyhow::Result<()> {
765 let path = self.base.join(filename);
766 let temp_path = self.base.join(format!("{}.tmp", filename));
767 let payload = serde_json::to_string_pretty(data)?;
768 fs::write(&temp_path, payload).await.with_context(|| {
769 format!("failed to write temp storage file {}", temp_path.display())
770 })?;
771 let std_temp_path: std::path::PathBuf = temp_path.clone().try_into()?;
772 tokio::task::spawn_blocking(move || {
773 let file = std::fs::File::open(&std_temp_path)?;
774 file.sync_all()?;
775 Ok::<(), std::io::Error>(())
776 })
777 .await??;
778 commit_temp_file(&temp_path, &path).await.with_context(|| {
779 format!(
780 "failed to atomically replace storage file {} with {}",
781 path.display(),
782 temp_path.display()
783 )
784 })?;
785 Ok(())
786 }
787
788 async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
789 let payload = serde_json::to_string_pretty(marker)?;
790 fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
791 Ok(())
792 }
793}
794
795async fn commit_temp_file(temp_path: &Path, path: &Path) -> std::io::Result<()> {
796 match tokio::fs::rename(temp_path, path).await {
797 Ok(()) => Ok(()),
798 Err(err) => {
799 #[cfg(windows)]
800 {
801 use std::io::ErrorKind;
804 if matches!(
805 err.kind(),
806 ErrorKind::PermissionDenied | ErrorKind::AlreadyExists
807 ) {
808 match tokio::fs::remove_file(path).await {
809 Ok(()) => {}
810 Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {}
811 Err(remove_err) => return Err(remove_err),
812 }
813 return tokio::fs::rename(temp_path, path).await;
814 }
815 }
816 Err(err)
817 }
818 }
819}
820
821fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
822 items
823 .into_iter()
824 .filter_map(|item| {
825 let obj = item.as_object()?;
826 let content = obj
827 .get("content")
828 .and_then(|v| v.as_str())
829 .or_else(|| obj.get("text").and_then(|v| v.as_str()))
830 .unwrap_or("")
831 .trim()
832 .to_string();
833 if content.is_empty() {
834 return None;
835 }
836 let id = obj
837 .get("id")
838 .and_then(|v| v.as_str())
839 .filter(|s| !s.trim().is_empty())
840 .map(ToString::to_string)
841 .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
842 let status = obj
843 .get("status")
844 .and_then(|v| v.as_str())
845 .filter(|s| !s.trim().is_empty())
846 .map(ToString::to_string)
847 .unwrap_or_else(|| "pending".to_string());
848 Some(json!({
849 "id": id,
850 "content": content,
851 "status": status
852 }))
853 })
854 .collect()
855}
856
857#[derive(Debug)]
858struct LegacyScanResult {
859 sessions: HashMap<String, Session>,
860 legacy_counts: LegacyTreeCounts,
861 imported_counts: LegacyImportedCounts,
862}
863
/// Result of merging legacy sessions into the live session map.
#[derive(Default, Debug)]
struct LegacyMergeStats {
    /// True if any session was inserted or had a field filled in.
    changed: bool,
    /// Sessions that were inserted or received their messages from legacy data.
    sessions_merged: u64,
    /// Messages contributed by those sessions.
    messages_recovered: u64,
    /// Message parts contributed by those sessions.
    parts_recovered: u64,
}
871
872fn now_ms_u64() -> u64 {
873 Utc::now().timestamp_millis().max(0) as u64
874}
875
876async fn should_run_legacy_scan_on_startup(marker_path: &Path, sessions_exist: bool) -> bool {
877 if !sessions_exist {
878 return true;
879 }
880 if read_legacy_import_marker(marker_path).await.is_none() {
883 return false;
884 }
885 false
886}
887
888async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
889 let raw = fs::read_to_string(marker_path).await.ok()?;
890 serde_json::from_str::<LegacyImportMarker>(&raw).ok()
891}
892
893fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
894 let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
895 let imported_counts = LegacyImportedCounts {
896 sessions: sessions.len() as u64,
897 messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
898 parts: sessions
899 .values()
900 .flat_map(|s| s.messages.iter())
901 .map(|m| m.parts.len() as u64)
902 .sum(),
903 };
904 let legacy_counts = LegacyTreeCounts {
905 session_files: count_legacy_json_files(&base.join("session")),
906 message_files: count_legacy_json_files(&base.join("message")),
907 part_files: count_legacy_json_files(&base.join("part")),
908 };
909 Ok(LegacyScanResult {
910 sessions,
911 legacy_counts,
912 imported_counts,
913 })
914}
915
/// Counts the entries with a `json` extension anywhere below `root`.
///
/// Returns 0 when `root` is not a directory. Directories that cannot be
/// read and entries that cannot be inspected are skipped silently. An entry
/// is descended into only when its file type reports a directory; every
/// other entry is counted if its extension is exactly `json`.
fn count_legacy_json_files(root: &Path) -> u64 {
    if !root.is_dir() {
        return 0;
    }

    let mut total = 0u64;
    let mut pending = vec![root.to_path_buf()];
    while let Some(dir) = pending.pop() {
        let Ok(entries) = std::fs::read_dir(&dir) else {
            continue;
        };
        for entry in entries.flatten() {
            let is_dir = matches!(entry.file_type(), Ok(kind) if kind.is_dir());
            let path = entry.path();
            if is_dir {
                pending.push(path);
            } else if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
                total += 1;
            }
        }
    }
    total
}
938
939fn merge_legacy_sessions(
940 current: &mut HashMap<String, Session>,
941 imported: HashMap<String, Session>,
942) -> bool {
943 merge_legacy_sessions_with_stats(current, imported).changed
944}
945
946fn merge_legacy_sessions_with_stats(
947 current: &mut HashMap<String, Session>,
948 imported: HashMap<String, Session>,
949) -> LegacyMergeStats {
950 let mut stats = LegacyMergeStats::default();
951 for (id, legacy) in imported {
952 let legacy_message_count = legacy.messages.len() as u64;
953 let legacy_part_count = legacy
954 .messages
955 .iter()
956 .map(|m| m.parts.len() as u64)
957 .sum::<u64>();
958 match current.get_mut(&id) {
959 None => {
960 current.insert(id, legacy);
961 stats.changed = true;
962 stats.sessions_merged += 1;
963 stats.messages_recovered += legacy_message_count;
964 stats.parts_recovered += legacy_part_count;
965 }
966 Some(existing) => {
967 let should_merge_messages =
968 existing.messages.is_empty() && !legacy.messages.is_empty();
969 let should_fill_title =
970 existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
971 let should_fill_directory = (existing.directory.trim().is_empty()
972 || existing.directory.trim() == "."
973 || existing.directory.trim() == "./"
974 || existing.directory.trim() == ".\\")
975 && !legacy.directory.trim().is_empty();
976 let should_fill_workspace =
977 existing.workspace_root.is_none() && legacy.workspace_root.is_some();
978 if should_merge_messages {
979 existing.messages = legacy.messages.clone();
980 }
981 if should_fill_title {
982 existing.title = legacy.title.clone();
983 }
984 if should_fill_directory {
985 existing.directory = legacy.directory.clone();
986 }
987 if should_fill_workspace {
988 existing.workspace_root = legacy.workspace_root.clone();
989 }
990 if should_merge_messages
991 || should_fill_title
992 || should_fill_directory
993 || should_fill_workspace
994 {
995 stats.changed = true;
996 if should_merge_messages {
997 stats.sessions_merged += 1;
998 stats.messages_recovered += legacy_message_count;
999 stats.parts_recovered += legacy_part_count;
1000 }
1001 }
1002 }
1003 }
1004 }
1005 stats
1006}
1007
1008fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
1009 let mut changed = false;
1010 for session in sessions.values_mut() {
1011 if session.workspace_root.is_none() {
1012 let normalized = normalize_workspace_path(&session.directory);
1013 if normalized.is_some() {
1014 session.workspace_root = normalized;
1015 changed = true;
1016 }
1017 }
1018 }
1019 changed
1020}
1021
1022fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
1023 let mut changed = false;
1024 for session in sessions.values_mut() {
1025 if !title_needs_repair(&session.title) {
1026 continue;
1027 }
1028 let first_user_text = session.messages.iter().find_map(|message| {
1029 if !matches!(message.role, MessageRole::User) {
1030 return None;
1031 }
1032 message.parts.iter().find_map(|part| match part {
1033 MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
1034 _ => None,
1035 })
1036 });
1037 let Some(source) = first_user_text else {
1038 continue;
1039 };
1040 let Some(derived) = derive_session_title_from_prompt(source, 60) else {
1041 continue;
1042 };
1043 if derived == session.title {
1044 continue;
1045 }
1046 session.title = derived;
1047 session.time.updated = Utc::now();
1048 changed = true;
1049 }
1050 changed
1051}
1052
1053#[derive(Debug, Deserialize)]
1054struct LegacySessionTime {
1055 created: i64,
1056 updated: i64,
1057}
1058
1059#[derive(Debug, Deserialize)]
1060struct LegacySession {
1061 id: String,
1062 slug: Option<String>,
1063 version: Option<String>,
1064 #[serde(rename = "projectID")]
1065 project_id: Option<String>,
1066 title: Option<String>,
1067 directory: Option<String>,
1068 time: LegacySessionTime,
1069}
1070
1071fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
1072 let legacy_root = base.join("session");
1073 if !legacy_root.is_dir() {
1074 return Ok(HashMap::new());
1075 }
1076
1077 let mut out = HashMap::new();
1078 let mut stack = vec![legacy_root];
1079 while let Some(dir) = stack.pop() {
1080 for entry in std::fs::read_dir(&dir)? {
1081 let entry = entry?;
1082 let path = entry.path();
1083 if entry.file_type()?.is_dir() {
1084 stack.push(path);
1085 continue;
1086 }
1087 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1088 continue;
1089 }
1090 let raw = match std::fs::read_to_string(&path) {
1091 Ok(v) => v,
1092 Err(_) => continue,
1093 };
1094 let legacy = match serde_json::from_str::<LegacySession>(&raw) {
1095 Ok(v) => v,
1096 Err(_) => continue,
1097 };
1098 let created = Utc
1099 .timestamp_millis_opt(legacy.time.created)
1100 .single()
1101 .unwrap_or_else(Utc::now);
1102 let updated = Utc
1103 .timestamp_millis_opt(legacy.time.updated)
1104 .single()
1105 .unwrap_or(created);
1106
1107 let session_id = legacy.id.clone();
1108 out.insert(
1109 session_id.clone(),
1110 Session {
1111 id: session_id.clone(),
1112 slug: legacy.slug,
1113 version: legacy.version,
1114 project_id: legacy.project_id,
1115 title: legacy
1116 .title
1117 .filter(|s| !s.trim().is_empty())
1118 .unwrap_or_else(|| "New session".to_string()),
1119 directory: legacy
1120 .directory
1121 .clone()
1122 .filter(|s| !s.trim().is_empty())
1123 .unwrap_or_else(|| ".".to_string()),
1124 workspace_root: legacy
1125 .directory
1126 .as_deref()
1127 .and_then(normalize_workspace_path),
1128 origin_workspace_root: None,
1129 attached_from_workspace: None,
1130 attached_to_workspace: None,
1131 attach_timestamp_ms: None,
1132 attach_reason: None,
1133 tenant_context: tandem_types::LocalImplicitTenant.into(),
1134 time: tandem_types::SessionTime { created, updated },
1135 model: None,
1136 provider: None,
1137 environment: None,
1138 messages: load_legacy_session_messages(base, &session_id),
1139 },
1140 );
1141 }
1142 }
1143 Ok(out)
1144}
1145
/// Timestamp stored in the `time` object of a legacy message JSON file.
#[derive(Debug, Deserialize)]
struct LegacyMessageTime {
    /// Creation time; `load_legacy_session_messages` reads it as epoch
    /// milliseconds and also uses the raw value as the sort key.
    created: i64,
}
1150
/// Deserialization target for one legacy message JSON file found under
/// `<base>/message/<session_id>`.
#[derive(Debug, Deserialize)]
struct LegacyMessage {
    /// Message identifier; also used to locate `<base>/part/<id>`.
    id: String,
    /// Role string, mapped to a `MessageRole` by `legacy_role_to_message_role`.
    role: String,
    /// Creation timestamp.
    time: LegacyMessageTime,
}
1157
/// Deserialization target for one legacy part JSON file found under
/// `<base>/part/<message_id>`.
///
/// Every field is optional, so any JSON object deserializes successfully;
/// `load_legacy_message_parts` decides which `MessagePart` variant (if any)
/// the part becomes.
#[derive(Debug, Deserialize)]
struct LegacyPart {
    /// Serialized as `type`. The loader recognises "reasoning", "tool" and
    /// "text"; an absent type is treated like "text" and other values are dropped.
    #[serde(rename = "type")]
    part_type: Option<String>,
    /// Text payload for text / reasoning parts (empty string when absent).
    text: Option<String>,
    /// Tool name; when present the part is always mapped to a tool invocation,
    /// regardless of `part_type`.
    tool: Option<String>,
    /// Tool arguments (the loader substitutes `{}` when absent).
    args: Option<Value>,
    /// Tool result, if any.
    result: Option<Value>,
    /// Tool error message, if any.
    error: Option<String>,
}
1168
1169fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
1170 let msg_dir = base.join("message").join(session_id);
1171 if !msg_dir.is_dir() {
1172 return Vec::new();
1173 }
1174
1175 let mut legacy_messages: Vec<(i64, Message)> = Vec::new();
1176
1177 let Ok(entries) = std::fs::read_dir(&msg_dir) else {
1178 return Vec::new();
1179 };
1180
1181 for entry in entries.flatten() {
1182 let path = entry.path();
1183 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1184 continue;
1185 }
1186 let Ok(raw) = std::fs::read_to_string(&path) else {
1187 continue;
1188 };
1189 let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
1190 continue;
1191 };
1192
1193 let created_at = Utc
1194 .timestamp_millis_opt(legacy.time.created)
1195 .single()
1196 .unwrap_or_else(Utc::now);
1197
1198 legacy_messages.push((
1199 legacy.time.created,
1200 Message {
1201 id: legacy.id.clone(),
1202 role: legacy_role_to_message_role(&legacy.role),
1203 parts: load_legacy_message_parts(base, &legacy.id),
1204 created_at,
1205 },
1206 ));
1207 }
1208
1209 legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
1210 legacy_messages.into_iter().map(|(_, msg)| msg).collect()
1211}
1212
1213fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
1214 let parts_dir = base.join("part").join(message_id);
1215 if !parts_dir.is_dir() {
1216 return Vec::new();
1217 }
1218
1219 let Ok(entries) = std::fs::read_dir(&parts_dir) else {
1220 return Vec::new();
1221 };
1222
1223 let mut out = Vec::new();
1224 for entry in entries.flatten() {
1225 let path = entry.path();
1226 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1227 continue;
1228 }
1229 let Ok(raw) = std::fs::read_to_string(&path) else {
1230 continue;
1231 };
1232 let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
1233 continue;
1234 };
1235
1236 let mapped = if let Some(tool) = part.tool {
1237 Some(MessagePart::ToolInvocation {
1238 tool,
1239 args: part.args.unwrap_or_else(|| json!({})),
1240 result: part.result,
1241 error: part.error,
1242 })
1243 } else {
1244 match part.part_type.as_deref() {
1245 Some("reasoning") => Some(MessagePart::Reasoning {
1246 text: part.text.unwrap_or_default(),
1247 }),
1248 Some("tool") => Some(MessagePart::ToolInvocation {
1249 tool: "tool".to_string(),
1250 args: part.args.unwrap_or_else(|| json!({})),
1251 result: part.result,
1252 error: part.error,
1253 }),
1254 Some("text") | None => Some(MessagePart::Text {
1255 text: part.text.unwrap_or_default(),
1256 }),
1257 _ => None,
1258 }
1259 };
1260
1261 if let Some(part) = mapped {
1262 out.push(part);
1263 }
1264 }
1265 out
1266}
1267
1268fn legacy_role_to_message_role(role: &str) -> MessageRole {
1269 match role.to_lowercase().as_str() {
1270 "user" => MessageRole::User,
1271 "assistant" => MessageRole::Assistant,
1272 "system" => MessageRole::System,
1273 "tool" => MessageRole::Tool,
1274 _ => MessageRole::Assistant,
1275 }
1276}
1277
/// Counters describing what a single `merge_session_messages` call did.
#[derive(Debug, Clone, Default)]
struct MessageMergeStats {
    /// Imported messages whose id was not present in the existing set.
    messages_recovered: u64,
    /// Total number of parts carried by those newly added messages.
    parts_recovered: u64,
    /// Existing messages replaced by a richer imported message with the same id.
    conflicts_merged: u64,
}
1284
1285fn message_richness(msg: &Message) -> usize {
1286 msg.parts
1287 .iter()
1288 .map(|p| match p {
1289 MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1290 if text.trim().is_empty() {
1291 0
1292 } else {
1293 1
1294 }
1295 }
1296 MessagePart::ToolInvocation { result, error, .. } => {
1297 if result.is_some() || error.is_some() {
1298 2
1299 } else {
1300 1
1301 }
1302 }
1303 })
1304 .sum()
1305}
1306
1307fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1308 messages.iter().map(|m| m.created_at).max()
1309}
1310
1311fn merge_session_messages(
1312 existing: &[Message],
1313 imported: &[Message],
1314) -> (Vec<Message>, MessageMergeStats, bool) {
1315 if existing.is_empty() {
1316 let messages_recovered = imported.len() as u64;
1317 let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1318 return (
1319 imported.to_vec(),
1320 MessageMergeStats {
1321 messages_recovered,
1322 parts_recovered,
1323 conflicts_merged: 0,
1324 },
1325 true,
1326 );
1327 }
1328
1329 let mut merged_by_id: HashMap<String, Message> = existing
1330 .iter()
1331 .cloned()
1332 .map(|m| (m.id.clone(), m))
1333 .collect();
1334 let mut stats = MessageMergeStats::default();
1335 let mut changed = false;
1336
1337 for incoming in imported {
1338 match merged_by_id.get(&incoming.id) {
1339 None => {
1340 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1341 stats.messages_recovered += 1;
1342 stats.parts_recovered += incoming.parts.len() as u64;
1343 changed = true;
1344 }
1345 Some(current) => {
1346 let incoming_richer = message_richness(incoming) > message_richness(current)
1347 || incoming.parts.len() > current.parts.len();
1348 if incoming_richer {
1349 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1350 stats.conflicts_merged += 1;
1351 changed = true;
1352 }
1353 }
1354 }
1355 }
1356
1357 let mut out: Vec<Message> = merged_by_id.into_values().collect();
1358 out.sort_by_key(|m| m.created_at);
1359 (out, stats, changed)
1360}