1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use anyhow::Context;
5use chrono::{TimeZone, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use tokio::fs;
9use tokio::sync::RwLock;
10use tokio::task;
11use uuid::Uuid;
12
13use tandem_types::{Message, MessagePart, MessageRole, Session};
14
15use crate::{derive_session_title_from_prompt, normalize_workspace_path, title_needs_repair};
16
/// Per-session bookkeeping stored alongside (but separate from) the
/// `Session` record itself; persisted as `session_meta.json`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SessionMeta {
    /// Id of the session this one was forked from (set by `fork_session`).
    pub parent_id: Option<String>,
    /// Toggled by `set_archived`.
    #[serde(default)]
    pub archived: bool,
    /// Toggled by `set_shared`; when true a `share_id` is allocated.
    #[serde(default)]
    pub shared: bool,
    /// Public share token; `Some` only while `shared` is true.
    pub share_id: Option<String>,
    /// Free-form summary text set via `set_summary`.
    pub summary: Option<String>,
    /// Message-list snapshots taken before each append (capped at 25);
    /// consumed LIFO by `revert_session`.
    #[serde(default)]
    pub snapshots: Vec<Vec<Message>>,
    /// Message list saved by the most recent revert, so `unrevert_session`
    /// can restore it.
    pub pre_revert: Option<Vec<Message>>,
    /// Normalized todo items (see `normalize_todo_items`).
    #[serde(default)]
    pub todos: Vec<Value>,
}
32
/// Links a question request back to the tool call and message that raised it.
/// Serialized with camelCase `callID` / `messageID` keys for wire
/// compatibility.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuestionToolRef {
    #[serde(rename = "callID")]
    pub call_id: String,
    #[serde(rename = "messageID")]
    pub message_id: String,
}
40
/// A pending "ask the user" request, persisted in `questions.json` until it
/// is replied to or rejected (both remove it — see `reply_question`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuestionRequest {
    /// Request id, generated as `q-<uuid>` by `add_question_request`.
    pub id: String,
    #[serde(rename = "sessionID")]
    pub session_id: String,
    /// Opaque question payloads; shape is defined by the caller.
    #[serde(default)]
    pub questions: Vec<Value>,
    /// Originating tool call, omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool: Option<QuestionToolRef>,
}
51
/// JSON-file-backed store for sessions, per-session metadata and pending
/// question requests. Each map sits behind its own `RwLock`; every mutation
/// rewrites the corresponding file under `base` via `flush`.
pub struct Storage {
    /// Root data directory; `sessions.json`, `session_meta.json`,
    /// `questions.json` and the legacy-import marker live directly under it.
    base: PathBuf,
    sessions: RwLock<HashMap<String, Session>>,
    metadata: RwLock<HashMap<String, SessionMeta>>,
    question_requests: RwLock<HashMap<String, QuestionRequest>>,
}
58
/// Filter applied when listing sessions.
#[derive(Debug, Clone)]
pub enum SessionListScope {
    /// Every stored session.
    Global,
    /// Only sessions whose `workspace_root` (or, failing that, `directory`)
    /// normalizes to the same path as `workspace_root`.
    Workspace { workspace_root: String },
}
64
/// Counters reported by `repair_sessions_from_file_store`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionRepairStats {
    /// Sessions whose message list changed during repair.
    pub sessions_repaired: u64,
    /// Messages recovered from the legacy file store.
    pub messages_recovered: u64,
    /// Message parts recovered from the legacy file store.
    pub parts_recovered: u64,
    /// Messages where legacy and current copies were merged.
    pub conflicts_merged: u64,
}
72
/// File (under `base`) recording that the legacy tree has already been
/// scanned; its absence triggers a startup scan.
const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
/// Schema version written into the marker file.
const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
75
/// Raw `*.json` file counts found in the legacy `session/`, `message/` and
/// `part/` directory trees (see `count_legacy_json_files`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LegacyTreeCounts {
    pub session_files: u64,
    pub message_files: u64,
    pub part_files: u64,
}
82
/// How many sessions/messages/parts were actually reconstructed from the
/// legacy tree during a scan (as opposed to raw files seen).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LegacyImportedCounts {
    pub sessions: u64,
    pub messages: u64,
    pub parts: u64,
}
89
/// On-disk record (see `LEGACY_IMPORT_MARKER_FILE`) of the last legacy scan,
/// including the counters observed at that time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LegacyImportMarker {
    pub version: u32,
    /// Unix milliseconds when the marker was written.
    pub created_at_ms: u64,
    /// Unix milliseconds of the most recent scan check.
    pub last_checked_at_ms: u64,
    pub legacy_counts: LegacyTreeCounts,
    pub imported_counts: LegacyImportedCounts,
}
98
/// Result of `run_legacy_storage_repair_scan`: whether the scan ran
/// (`status` is "skipped" / "updated" / "no_changes"), what it merged, and
/// the counters recorded in the marker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LegacyRepairRunReport {
    pub status: String,
    pub marker_updated: bool,
    pub sessions_merged: u64,
    pub messages_recovered: u64,
    pub parts_recovered: u64,
    pub legacy_counts: LegacyTreeCounts,
    pub imported_counts: LegacyImportedCounts,
}
109
impl Storage {
    /// Opens (or initializes) the storage rooted at `base`.
    ///
    /// Loads `sessions.json`, `session_meta.json` and `questions.json`
    /// (each degrades to an empty map on a JSON parse failure rather than
    /// failing startup), optionally runs a one-time legacy-tree scan,
    /// backfills missing `workspace_root`s, repairs broken titles, and
    /// flushes any in-memory repairs back to disk before returning.
    pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
        let base = base.as_ref().to_path_buf();
        fs::create_dir_all(&base).await?;
        let sessions_file = base.join("sessions.json");
        let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
        let sessions_file_exists = sessions_file.exists();
        // Set whenever in-memory state diverges from disk and needs a flush.
        let mut imported_legacy_sessions = false;
        let mut sessions = if sessions_file_exists {
            let raw = fs::read_to_string(&sessions_file).await?;
            // Corrupt JSON is treated as "no sessions" rather than an error.
            serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
        } else {
            HashMap::new()
        };

        let mut marker_to_write = None;
        if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
            // The scan walks the filesystem with blocking std::fs calls, so
            // it runs on tokio's blocking thread pool.
            let base_for_scan = base.clone();
            let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
                .await
                .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
            if merge_legacy_sessions(&mut sessions, scan.sessions) {
                imported_legacy_sessions = true;
            }
            // The marker is written only after a scan actually ran.
            marker_to_write = Some(LegacyImportMarker {
                version: LEGACY_IMPORT_MARKER_VERSION,
                created_at_ms: now_ms_u64(),
                last_checked_at_ms: now_ms_u64(),
                legacy_counts: scan.legacy_counts,
                imported_counts: scan.imported_counts,
            });
        }

        if hydrate_workspace_roots(&mut sessions) {
            imported_legacy_sessions = true;
        }
        if repair_session_titles(&mut sessions) {
            imported_legacy_sessions = true;
        }
        let metadata_file = base.join("session_meta.json");
        let metadata = if metadata_file.exists() {
            let raw = fs::read_to_string(&metadata_file).await?;
            serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
        } else {
            HashMap::new()
        };
        let questions_file = base.join("questions.json");
        let question_requests = if questions_file.exists() {
            let raw = fs::read_to_string(&questions_file).await?;
            serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
        } else {
            HashMap::new()
        };
        let storage = Self {
            base,
            sessions: RwLock::new(sessions),
            metadata: RwLock::new(metadata),
            question_requests: RwLock::new(question_requests),
        };

        if imported_legacy_sessions {
            storage.flush().await?;
        }
        if let Some(marker) = marker_to_write {
            storage.write_legacy_import_marker(&marker).await?;
        }

        Ok(storage)
    }

    /// All sessions, unfiltered (convenience for the `Global` scope).
    pub async fn list_sessions(&self) -> Vec<Session> {
        self.list_sessions_scoped(SessionListScope::Global).await
    }

    /// Lists sessions, optionally filtered to a workspace.
    ///
    /// A session matches a workspace when either its `workspace_root` or its
    /// `directory` normalizes to the same path. An unnormalizable
    /// `workspace_root` argument yields an empty list.
    pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
        // Clone out of the lock so filtering happens without holding it.
        let all = self
            .sessions
            .read()
            .await
            .values()
            .cloned()
            .collect::<Vec<_>>();
        match scope {
            SessionListScope::Global => all,
            SessionListScope::Workspace { workspace_root } => {
                let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
                    return Vec::new();
                };
                all.into_iter()
                    .filter(|session| {
                        // Prefer the explicit workspace_root field...
                        let direct = session
                            .workspace_root
                            .as_ref()
                            .and_then(|p| normalize_workspace_path(p))
                            .map(|p| p == normalized_workspace)
                            .unwrap_or(false);
                        if direct {
                            return true;
                        }
                        // ...fall back to the session's directory.
                        normalize_workspace_path(&session.directory)
                            .map(|p| p == normalized_workspace)
                            .unwrap_or(false)
                    })
                    .collect()
            }
        }
    }

    /// Returns a clone of the session with `id`, if present.
    pub async fn get_session(&self, id: &str) -> Option<Session> {
        self.sessions.read().await.get(id).cloned()
    }

    /// Inserts or replaces a session and persists immediately.
    ///
    /// Backfills `workspace_root` from `directory` when unset, and ensures a
    /// `SessionMeta` entry exists for the id.
    pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
        if session.workspace_root.is_none() {
            session.workspace_root = normalize_workspace_path(&session.directory);
        }
        let session_id = session.id.clone();
        self.sessions
            .write()
            .await
            .insert(session_id.clone(), session);
        self.metadata
            .write()
            .await
            .entry(session_id)
            .or_insert_with(SessionMeta::default);
        self.flush().await
    }

    /// Re-merges every in-memory session with messages recovered from the
    /// legacy per-file store, flushing once if anything changed.
    ///
    /// NOTE(review): `load_legacy_session_messages` does blocking std::fs
    /// I/O while the sessions write lock is held inside an async fn —
    /// acceptable for a maintenance path, but worth confirming.
    pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
        let mut stats = SessionRepairStats::default();
        let mut sessions = self.sessions.write().await;

        for session in sessions.values_mut() {
            let imported = load_legacy_session_messages(&self.base, &session.id);
            if imported.is_empty() {
                continue;
            }

            let (merged, merge_stats, changed) =
                merge_session_messages(&session.messages, &imported);
            if changed {
                session.messages = merged;
                // Keep `updated` consistent with the newest merged message.
                session.time.updated =
                    most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
                stats.sessions_repaired += 1;
                stats.messages_recovered += merge_stats.messages_recovered;
                stats.parts_recovered += merge_stats.parts_recovered;
                stats.conflicts_merged += merge_stats.conflicts_merged;
            }
        }

        if stats.sessions_repaired > 0 {
            // Release the write lock before flush re-acquires a read lock.
            drop(sessions);
            self.flush().await?;
        }

        Ok(stats)
    }

    /// Runs (or skips) a legacy-storage repair scan on demand.
    ///
    /// Without `force`, the same gating as at startup applies; a skipped run
    /// reports status "skipped" with counters from the existing marker (or
    /// zeros). A real scan merges recovered sessions, flushes when anything
    /// changed, and rewrites the marker.
    ///
    /// NOTE(review): the marker's `created_at_ms` is reset to "now" on every
    /// scan rather than preserved from the previous marker — confirm intended.
    pub async fn run_legacy_storage_repair_scan(
        &self,
        force: bool,
    ) -> anyhow::Result<LegacyRepairRunReport> {
        let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
        let sessions_exists = self.base.join("sessions.json").exists();
        let should_scan = if force {
            true
        } else {
            should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
        };
        if !should_scan {
            let marker = read_legacy_import_marker(&marker_path)
                .await
                .unwrap_or_else(|| LegacyImportMarker {
                    version: LEGACY_IMPORT_MARKER_VERSION,
                    created_at_ms: now_ms_u64(),
                    last_checked_at_ms: now_ms_u64(),
                    legacy_counts: LegacyTreeCounts::default(),
                    imported_counts: LegacyImportedCounts::default(),
                });
            return Ok(LegacyRepairRunReport {
                status: "skipped".to_string(),
                marker_updated: false,
                sessions_merged: 0,
                messages_recovered: 0,
                parts_recovered: 0,
                legacy_counts: marker.legacy_counts,
                imported_counts: marker.imported_counts,
            });
        }

        // Blocking filesystem walk on the blocking pool, as in `new`.
        let base_for_scan = self.base.clone();
        let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
            .await
            .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;

        let merge_stats = {
            let mut sessions = self.sessions.write().await;
            merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
        };

        if merge_stats.changed {
            self.flush().await?;
        }

        let marker = LegacyImportMarker {
            version: LEGACY_IMPORT_MARKER_VERSION,
            created_at_ms: now_ms_u64(),
            last_checked_at_ms: now_ms_u64(),
            legacy_counts: scan.legacy_counts.clone(),
            imported_counts: scan.imported_counts.clone(),
        };
        self.write_legacy_import_marker(&marker).await?;

        Ok(LegacyRepairRunReport {
            status: if merge_stats.changed {
                "updated".to_string()
            } else {
                "no_changes".to_string()
            },
            marker_updated: true,
            sessions_merged: merge_stats.sessions_merged,
            messages_recovered: merge_stats.messages_recovered,
            parts_recovered: merge_stats.parts_recovered,
            legacy_counts: scan.legacy_counts,
            imported_counts: scan.imported_counts,
        })
    }

    /// Removes a session plus its metadata and any question requests that
    /// reference it. Returns whether a session was actually removed; only
    /// then is the change persisted.
    pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
        let removed = self.sessions.write().await.remove(id).is_some();
        self.metadata.write().await.remove(id);
        self.question_requests
            .write()
            .await
            .retain(|_, request| request.session_id != id);
        if removed {
            self.flush().await?;
        }
        Ok(removed)
    }

    /// Appends a message to a session, snapshotting the previous message
    /// list first so the append can be reverted. Errors if the session does
    /// not exist.
    pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
        // Lock order: sessions before metadata (matches revert/unrevert).
        let mut sessions = self.sessions.write().await;
        let session = sessions
            .get_mut(session_id)
            .context("session not found for append_message")?;
        let mut meta_guard = self.metadata.write().await;
        let meta = meta_guard
            .entry(session_id.to_string())
            .or_insert_with(SessionMeta::default);
        meta.snapshots.push(session.messages.clone());
        // Cap undo history at 25 snapshots, dropping the oldest.
        if meta.snapshots.len() > 25 {
            let _ = meta.snapshots.remove(0);
        }
        session.messages.push(msg);
        session.time.updated = Utc::now();
        drop(sessions);
        drop(meta_guard);
        self.flush().await
    }

    /// Clones a session into a new one (fresh id/timestamps, "(fork)" title
    /// suffix, cleared slug), recording the parent id in the child's
    /// metadata. Returns `None` when the source session does not exist.
    pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
        let source = {
            let sessions = self.sessions.read().await;
            sessions.get(id).cloned()
        };
        let Some(mut child) = source else {
            return Ok(None);
        };

        child.id = Uuid::new_v4().to_string();
        child.title = format!("{} (fork)", child.title);
        child.time.created = Utc::now();
        child.time.updated = child.time.created;
        child.slug = None;

        self.sessions
            .write()
            .await
            .insert(child.id.clone(), child.clone());
        self.metadata.write().await.insert(
            child.id.clone(),
            SessionMeta {
                parent_id: Some(id.to_string()),
                snapshots: vec![child.messages.clone()],
                ..SessionMeta::default()
            },
        );
        self.flush().await?;
        Ok(Some(child))
    }

    /// Restores the most recent snapshot of a session's messages, stashing
    /// the current list in `pre_revert` so it can be undone. Returns false
    /// if the session or a snapshot is missing.
    pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
        let mut sessions = self.sessions.write().await;
        let Some(session) = sessions.get_mut(id) else {
            return Ok(false);
        };
        let mut metadata = self.metadata.write().await;
        let meta = metadata
            .entry(id.to_string())
            .or_insert_with(SessionMeta::default);
        let Some(snapshot) = meta.snapshots.pop() else {
            return Ok(false);
        };
        meta.pre_revert = Some(session.messages.clone());
        session.messages = snapshot;
        session.time.updated = Utc::now();
        drop(metadata);
        drop(sessions);
        self.flush().await?;
        Ok(true)
    }

    /// Undoes the last `revert_session`: swaps `pre_revert` back in and
    /// pushes the reverted list onto the snapshot stack. Returns false if
    /// there is nothing to unrevert.
    pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
        let mut sessions = self.sessions.write().await;
        let Some(session) = sessions.get_mut(id) else {
            return Ok(false);
        };
        let mut metadata = self.metadata.write().await;
        let Some(meta) = metadata.get_mut(id) else {
            return Ok(false);
        };
        let Some(previous) = meta.pre_revert.take() else {
            return Ok(false);
        };
        meta.snapshots.push(session.messages.clone());
        session.messages = previous;
        session.time.updated = Utc::now();
        drop(metadata);
        drop(sessions);
        self.flush().await?;
        Ok(true)
    }

    /// Enables/disables sharing for a session. Sharing allocates a share id
    /// (idempotently); unsharing clears it. Returns the current share id.
    pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
        let mut metadata = self.metadata.write().await;
        let meta = metadata
            .entry(id.to_string())
            .or_insert_with(SessionMeta::default);
        meta.shared = shared;
        if shared {
            if meta.share_id.is_none() {
                meta.share_id = Some(Uuid::new_v4().to_string());
            }
        } else {
            meta.share_id = None;
        }
        let share_id = meta.share_id.clone();
        drop(metadata);
        self.flush().await?;
        Ok(share_id)
    }

    /// Sets the archived flag on a session's metadata (creating the entry if
    /// needed) and persists. Always returns true.
    pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
        let mut metadata = self.metadata.write().await;
        let meta = metadata
            .entry(id.to_string())
            .or_insert_with(SessionMeta::default);
        meta.archived = archived;
        drop(metadata);
        self.flush().await?;
        Ok(true)
    }

    /// Stores a summary string on a session's metadata and persists. Always
    /// returns true.
    pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
        let mut metadata = self.metadata.write().await;
        let meta = metadata
            .entry(id.to_string())
            .or_insert_with(SessionMeta::default);
        meta.summary = Some(summary);
        drop(metadata);
        self.flush().await?;
        Ok(true)
    }

    /// Sessions whose metadata names `parent_id` as their fork parent.
    /// Children whose session record has been deleted are silently skipped.
    pub async fn children(&self, parent_id: &str) -> Vec<Session> {
        let child_ids = {
            let metadata = self.metadata.read().await;
            metadata
                .iter()
                .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
                .map(|(id, _)| id.clone())
                .collect::<Vec<_>>()
        };
        let sessions = self.sessions.read().await;
        child_ids
            .into_iter()
            .filter_map(|id| sessions.get(&id).cloned())
            .collect()
    }

    /// Small JSON status blob for a session's metadata, or `None` if no
    /// metadata entry exists.
    pub async fn session_status(&self, id: &str) -> Option<Value> {
        let metadata = self.metadata.read().await;
        metadata.get(id).map(|meta| {
            json!({
                "archived": meta.archived,
                "shared": meta.shared,
                "parentID": meta.parent_id,
                "snapshotCount": meta.snapshots.len()
            })
        })
    }

    /// Message-count delta between a session's current state and its latest
    /// snapshot, or `None` if the session does not exist. Missing metadata
    /// is treated as "no snapshots".
    pub async fn session_diff(&self, id: &str) -> Option<Value> {
        let sessions = self.sessions.read().await;
        let current = sessions.get(id)?;
        let metadata = self.metadata.read().await;
        let default = SessionMeta::default();
        let meta = metadata.get(id).unwrap_or(&default);
        let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
        Some(json!({
            "sessionID": id,
            "currentMessageCount": current.messages.len(),
            "lastSnapshotMessageCount": last_snapshot_len,
            "delta": current.messages.len() as i64 - last_snapshot_len as i64
        }))
    }

    /// Replaces a session's todo list with the normalized form of `todos`
    /// and persists.
    pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
        let mut metadata = self.metadata.write().await;
        let meta = metadata
            .entry(id.to_string())
            .or_insert_with(SessionMeta::default);
        meta.todos = normalize_todo_items(todos);
        drop(metadata);
        self.flush().await
    }

    /// Returns a session's todos, re-normalized on the way out so data
    /// written by older versions is also cleaned up.
    pub async fn get_todos(&self, id: &str) -> Vec<Value> {
        let todos = self
            .metadata
            .read()
            .await
            .get(id)
            .map(|meta| meta.todos.clone())
            .unwrap_or_default();
        normalize_todo_items(todos)
    }

    /// Creates and persists a new pending question request tied to a tool
    /// call on `message_id`, returning the stored request.
    pub async fn add_question_request(
        &self,
        session_id: &str,
        message_id: &str,
        questions: Vec<Value>,
    ) -> anyhow::Result<QuestionRequest> {
        let request = QuestionRequest {
            id: format!("q-{}", Uuid::new_v4()),
            session_id: session_id.to_string(),
            questions,
            tool: Some(QuestionToolRef {
                call_id: format!("call-{}", Uuid::new_v4()),
                message_id: message_id.to_string(),
            }),
        };
        self.question_requests
            .write()
            .await
            .insert(request.id.clone(), request.clone());
        self.flush().await?;
        Ok(request)
    }

    /// All pending question requests (unordered).
    pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
        self.question_requests
            .read()
            .await
            .values()
            .cloned()
            .collect()
    }

    /// Resolves a question request by removing it; persists only when one
    /// was actually removed. Returns whether it existed.
    pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
        let removed = self
            .question_requests
            .write()
            .await
            .remove(request_id)
            .is_some();
        if removed {
            self.flush().await?;
        }
        Ok(removed)
    }

    /// Rejecting a question is storage-wise identical to replying: the
    /// pending request is removed.
    pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
        self.reply_question(request_id).await
    }

    /// Re-homes a session to another workspace, recording provenance
    /// (origin/from/to/timestamp/reason) on the session itself. Returns the
    /// updated session, or `None` if the target path does not normalize or
    /// the session is missing.
    pub async fn attach_session_to_workspace(
        &self,
        session_id: &str,
        target_workspace: &str,
        reason_tag: &str,
    ) -> anyhow::Result<Option<Session>> {
        let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
            return Ok(None);
        };
        let mut sessions = self.sessions.write().await;
        let Some(session) = sessions.get_mut(session_id) else {
            return Ok(None);
        };
        let previous_workspace = session
            .workspace_root
            .clone()
            .or_else(|| normalize_workspace_path(&session.directory));

        // Only the first attach records the origin workspace.
        if session.origin_workspace_root.is_none() {
            session.origin_workspace_root = previous_workspace.clone();
        }
        session.attached_from_workspace = previous_workspace;
        session.attached_to_workspace = Some(target_workspace.clone());
        session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
        session.attach_reason = Some(reason_tag.trim().to_string());
        session.workspace_root = Some(target_workspace.clone());
        session.directory = target_workspace;
        session.time.updated = Utc::now();
        let updated = session.clone();
        drop(sessions);
        self.flush().await?;
        Ok(Some(updated))
    }

    /// Serializes all three maps to their JSON files under `base`.
    ///
    /// NOTE(review): writes are not atomic (no temp-file + rename), so a
    /// crash mid-write can leave a truncated snapshot on disk.
    async fn flush(&self) -> anyhow::Result<()> {
        let snapshot = self.sessions.read().await.clone();
        let payload = serde_json::to_string_pretty(&snapshot)?;
        fs::write(self.base.join("sessions.json"), payload).await?;
        let metadata_snapshot = self.metadata.read().await.clone();
        let metadata_payload = serde_json::to_string_pretty(&metadata_snapshot)?;
        fs::write(self.base.join("session_meta.json"), metadata_payload).await?;
        let questions_snapshot = self.question_requests.read().await.clone();
        let questions_payload = serde_json::to_string_pretty(&questions_snapshot)?;
        fs::write(self.base.join("questions.json"), questions_payload).await?;
        Ok(())
    }

    /// Persists the legacy-import marker file under `base`.
    async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
        let payload = serde_json::to_string_pretty(marker)?;
        fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
        Ok(())
    }
}
653
654fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
655 items
656 .into_iter()
657 .filter_map(|item| {
658 let obj = item.as_object()?;
659 let content = obj
660 .get("content")
661 .and_then(|v| v.as_str())
662 .or_else(|| obj.get("text").and_then(|v| v.as_str()))
663 .unwrap_or("")
664 .trim()
665 .to_string();
666 if content.is_empty() {
667 return None;
668 }
669 let id = obj
670 .get("id")
671 .and_then(|v| v.as_str())
672 .filter(|s| !s.trim().is_empty())
673 .map(ToString::to_string)
674 .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
675 let status = obj
676 .get("status")
677 .and_then(|v| v.as_str())
678 .filter(|s| !s.trim().is_empty())
679 .map(ToString::to_string)
680 .unwrap_or_else(|| "pending".to_string());
681 Some(json!({
682 "id": id,
683 "content": content,
684 "status": status
685 }))
686 })
687 .collect()
688}
689
/// Output of `scan_legacy_sessions`: the reconstructed sessions plus the
/// counters later recorded in the import marker.
#[derive(Debug)]
struct LegacyScanResult {
    sessions: HashMap<String, Session>,
    legacy_counts: LegacyTreeCounts,
    imported_counts: LegacyImportedCounts,
}
696
/// Accumulated effects of merging legacy sessions into the live map
/// (see `merge_legacy_sessions_with_stats`).
#[derive(Debug, Default)]
struct LegacyMergeStats {
    /// True when the live map was modified in any way.
    changed: bool,
    sessions_merged: u64,
    messages_recovered: u64,
    parts_recovered: u64,
}
704
705fn now_ms_u64() -> u64 {
706 Utc::now().timestamp_millis().max(0) as u64
707}
708
709async fn should_run_legacy_scan_on_startup(marker_path: &Path, sessions_exist: bool) -> bool {
710 if !sessions_exist {
711 return true;
712 }
713 if read_legacy_import_marker(marker_path).await.is_none() {
716 return false;
717 }
718 false
719}
720
721async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
722 let raw = fs::read_to_string(marker_path).await.ok()?;
723 serde_json::from_str::<LegacyImportMarker>(&raw).ok()
724}
725
726fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
727 let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
728 let imported_counts = LegacyImportedCounts {
729 sessions: sessions.len() as u64,
730 messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
731 parts: sessions
732 .values()
733 .flat_map(|s| s.messages.iter())
734 .map(|m| m.parts.len() as u64)
735 .sum(),
736 };
737 let legacy_counts = LegacyTreeCounts {
738 session_files: count_legacy_json_files(&base.join("session")),
739 message_files: count_legacy_json_files(&base.join("message")),
740 part_files: count_legacy_json_files(&base.join("part")),
741 };
742 Ok(LegacyScanResult {
743 sessions,
744 legacy_counts,
745 imported_counts,
746 })
747}
748
/// Recursively counts `*.json` files under `root` with an explicit
/// directory stack (no recursion). Unreadable directories and entries are
/// silently skipped; a non-directory `root` yields 0.
fn count_legacy_json_files(root: &Path) -> u64 {
    if !root.is_dir() {
        return 0;
    }
    let mut total = 0u64;
    let mut pending = vec![root.to_path_buf()];
    while let Some(dir) = pending.pop() {
        let Ok(entries) = std::fs::read_dir(&dir) else {
            continue;
        };
        for entry in entries.flatten() {
            let path = entry.path();
            let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
            if is_dir {
                pending.push(path);
            } else if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
                total += 1;
            }
        }
    }
    total
}
771
772fn merge_legacy_sessions(
773 current: &mut HashMap<String, Session>,
774 imported: HashMap<String, Session>,
775) -> bool {
776 merge_legacy_sessions_with_stats(current, imported).changed
777}
778
/// Merges sessions recovered from the legacy tree into the live map.
///
/// A legacy session with an unknown id is inserted wholesale. For an id that
/// already exists, legacy data only *fills gaps*: messages when the current
/// list is empty, title/directory/workspace_root when the current value is
/// blank (or a "."-style placeholder for directory). Recovery counters are
/// bumped only when messages were actually taken from the legacy copy.
fn merge_legacy_sessions_with_stats(
    current: &mut HashMap<String, Session>,
    imported: HashMap<String, Session>,
) -> LegacyMergeStats {
    let mut stats = LegacyMergeStats::default();
    for (id, legacy) in imported {
        // Counted up-front because `legacy` may be moved on insert below.
        let legacy_message_count = legacy.messages.len() as u64;
        let legacy_part_count = legacy
            .messages
            .iter()
            .map(|m| m.parts.len() as u64)
            .sum::<u64>();
        match current.get_mut(&id) {
            None => {
                // Brand-new session: take the legacy copy as-is.
                current.insert(id, legacy);
                stats.changed = true;
                stats.sessions_merged += 1;
                stats.messages_recovered += legacy_message_count;
                stats.parts_recovered += legacy_part_count;
            }
            Some(existing) => {
                // Gap-filling only — never overwrite non-empty current data.
                let should_merge_messages =
                    existing.messages.is_empty() && !legacy.messages.is_empty();
                let should_fill_title =
                    existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
                // "." / "./" / ".\" are placeholder directories, not real ones.
                let should_fill_directory = (existing.directory.trim().is_empty()
                    || existing.directory.trim() == "."
                    || existing.directory.trim() == "./"
                    || existing.directory.trim() == ".\\")
                    && !legacy.directory.trim().is_empty();
                let should_fill_workspace =
                    existing.workspace_root.is_none() && legacy.workspace_root.is_some();
                if should_merge_messages {
                    existing.messages = legacy.messages.clone();
                }
                if should_fill_title {
                    existing.title = legacy.title.clone();
                }
                if should_fill_directory {
                    existing.directory = legacy.directory.clone();
                }
                if should_fill_workspace {
                    existing.workspace_root = legacy.workspace_root.clone();
                }
                if should_merge_messages
                    || should_fill_title
                    || should_fill_directory
                    || should_fill_workspace
                {
                    stats.changed = true;
                    // Only a message merge counts as "recovering" a session.
                    if should_merge_messages {
                        stats.sessions_merged += 1;
                        stats.messages_recovered += legacy_message_count;
                        stats.parts_recovered += legacy_part_count;
                    }
                }
            }
        }
    }
    stats
}
840
841fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
842 let mut changed = false;
843 for session in sessions.values_mut() {
844 if session.workspace_root.is_none() {
845 let normalized = normalize_workspace_path(&session.directory);
846 if normalized.is_some() {
847 session.workspace_root = normalized;
848 changed = true;
849 }
850 }
851 }
852 changed
853}
854
855fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
856 let mut changed = false;
857 for session in sessions.values_mut() {
858 if !title_needs_repair(&session.title) {
859 continue;
860 }
861 let first_user_text = session.messages.iter().find_map(|message| {
862 if !matches!(message.role, MessageRole::User) {
863 return None;
864 }
865 message.parts.iter().find_map(|part| match part {
866 MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
867 _ => None,
868 })
869 });
870 let Some(source) = first_user_text else {
871 continue;
872 };
873 let Some(derived) = derive_session_title_from_prompt(source, 60) else {
874 continue;
875 };
876 if derived == session.title {
877 continue;
878 }
879 session.title = derived;
880 session.time.updated = Utc::now();
881 changed = true;
882 }
883 changed
884}
885
/// Timestamps in the legacy session JSON, stored as Unix milliseconds.
#[derive(Debug, Deserialize)]
struct LegacySessionTime {
    created: i64,
    updated: i64,
}
891
/// Shape of a session file in the legacy `session/` tree; most fields are
/// optional because older writers omitted them.
#[derive(Debug, Deserialize)]
struct LegacySession {
    id: String,
    slug: Option<String>,
    version: Option<String>,
    #[serde(rename = "projectID")]
    project_id: Option<String>,
    title: Option<String>,
    directory: Option<String>,
    time: LegacySessionTime,
}
903
/// Walks `<base>/session/**/*.json` and reconstructs full `Session` records
/// (messages included, via `load_legacy_session_messages`).
///
/// Individual files that fail to read or parse are skipped; directory-level
/// I/O errors, however, propagate via `?` and abort the whole load (unlike
/// the counting in `count_legacy_json_files`). Blank titles default to
/// "New session" and blank directories to ".".
fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
    let legacy_root = base.join("session");
    if !legacy_root.is_dir() {
        return Ok(HashMap::new());
    }

    let mut out = HashMap::new();
    // Iterative DFS over the legacy session tree.
    let mut stack = vec![legacy_root];
    while let Some(dir) = stack.pop() {
        for entry in std::fs::read_dir(&dir)? {
            let entry = entry?;
            let path = entry.path();
            if entry.file_type()?.is_dir() {
                stack.push(path);
                continue;
            }
            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
                continue;
            }
            // Per-file failures are tolerated: skip and keep scanning.
            let raw = match std::fs::read_to_string(&path) {
                Ok(v) => v,
                Err(_) => continue,
            };
            let legacy = match serde_json::from_str::<LegacySession>(&raw) {
                Ok(v) => v,
                Err(_) => continue,
            };
            // Out-of-range millisecond stamps fall back to "now" (created)
            // or to the created time (updated).
            let created = Utc
                .timestamp_millis_opt(legacy.time.created)
                .single()
                .unwrap_or_else(Utc::now);
            let updated = Utc
                .timestamp_millis_opt(legacy.time.updated)
                .single()
                .unwrap_or(created);

            let session_id = legacy.id.clone();
            out.insert(
                session_id.clone(),
                Session {
                    id: session_id.clone(),
                    slug: legacy.slug,
                    version: legacy.version,
                    project_id: legacy.project_id,
                    title: legacy
                        .title
                        .filter(|s| !s.trim().is_empty())
                        .unwrap_or_else(|| "New session".to_string()),
                    directory: legacy
                        .directory
                        .clone()
                        .filter(|s| !s.trim().is_empty())
                        .unwrap_or_else(|| ".".to_string()),
                    workspace_root: legacy
                        .directory
                        .as_deref()
                        .and_then(normalize_workspace_path),
                    // Attach/provenance fields did not exist in the legacy
                    // format; they start unset.
                    origin_workspace_root: None,
                    attached_from_workspace: None,
                    attached_to_workspace: None,
                    attach_timestamp_ms: None,
                    attach_reason: None,
                    time: tandem_types::SessionTime { created, updated },
                    model: None,
                    provider: None,
                    environment: None,
                    messages: load_legacy_session_messages(base, &session_id),
                },
            );
        }
    }
    Ok(out)
}
977
/// Creation timestamp of a legacy message, in Unix milliseconds.
#[derive(Debug, Deserialize)]
struct LegacyMessageTime {
    created: i64,
}
982
/// Shape of a message file in the legacy `message/<session_id>/` tree.
#[derive(Debug, Deserialize)]
struct LegacyMessage {
    id: String,
    /// Free-form role string; mapped via `legacy_role_to_message_role`.
    role: String,
    time: LegacyMessageTime,
}
989
/// Shape of a part file in the legacy `part/<message_id>/` tree. All fields
/// are optional; `load_legacy_message_parts` decides the variant from
/// `tool` / `part_type`.
#[derive(Debug, Deserialize)]
struct LegacyPart {
    /// Legacy discriminator ("text", "reasoning", "tool", ...).
    #[serde(rename = "type")]
    part_type: Option<String>,
    text: Option<String>,
    tool: Option<String>,
    args: Option<Value>,
    result: Option<Value>,
    error: Option<String>,
}
1000
1001fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
1002 let msg_dir = base.join("message").join(session_id);
1003 if !msg_dir.is_dir() {
1004 return Vec::new();
1005 }
1006
1007 let mut legacy_messages: Vec<(i64, Message)> = Vec::new();
1008
1009 let Ok(entries) = std::fs::read_dir(&msg_dir) else {
1010 return Vec::new();
1011 };
1012
1013 for entry in entries.flatten() {
1014 let path = entry.path();
1015 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1016 continue;
1017 }
1018 let Ok(raw) = std::fs::read_to_string(&path) else {
1019 continue;
1020 };
1021 let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
1022 continue;
1023 };
1024
1025 let created_at = Utc
1026 .timestamp_millis_opt(legacy.time.created)
1027 .single()
1028 .unwrap_or_else(Utc::now);
1029
1030 legacy_messages.push((
1031 legacy.time.created,
1032 Message {
1033 id: legacy.id.clone(),
1034 role: legacy_role_to_message_role(&legacy.role),
1035 parts: load_legacy_message_parts(base, &legacy.id),
1036 created_at,
1037 },
1038 ));
1039 }
1040
1041 legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
1042 legacy_messages.into_iter().map(|(_, msg)| msg).collect()
1043}
1044
1045fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
1046 let parts_dir = base.join("part").join(message_id);
1047 if !parts_dir.is_dir() {
1048 return Vec::new();
1049 }
1050
1051 let Ok(entries) = std::fs::read_dir(&parts_dir) else {
1052 return Vec::new();
1053 };
1054
1055 let mut out = Vec::new();
1056 for entry in entries.flatten() {
1057 let path = entry.path();
1058 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1059 continue;
1060 }
1061 let Ok(raw) = std::fs::read_to_string(&path) else {
1062 continue;
1063 };
1064 let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
1065 continue;
1066 };
1067
1068 let mapped = if let Some(tool) = part.tool {
1069 Some(MessagePart::ToolInvocation {
1070 tool,
1071 args: part.args.unwrap_or_else(|| json!({})),
1072 result: part.result,
1073 error: part.error,
1074 })
1075 } else {
1076 match part.part_type.as_deref() {
1077 Some("reasoning") => Some(MessagePart::Reasoning {
1078 text: part.text.unwrap_or_default(),
1079 }),
1080 Some("tool") => Some(MessagePart::ToolInvocation {
1081 tool: "tool".to_string(),
1082 args: part.args.unwrap_or_else(|| json!({})),
1083 result: part.result,
1084 error: part.error,
1085 }),
1086 Some("text") | None => Some(MessagePart::Text {
1087 text: part.text.unwrap_or_default(),
1088 }),
1089 _ => None,
1090 }
1091 };
1092
1093 if let Some(part) = mapped {
1094 out.push(part);
1095 }
1096 }
1097 out
1098}
1099
1100fn legacy_role_to_message_role(role: &str) -> MessageRole {
1101 match role.to_lowercase().as_str() {
1102 "user" => MessageRole::User,
1103 "assistant" => MessageRole::Assistant,
1104 "system" => MessageRole::System,
1105 "tool" => MessageRole::Tool,
1106 _ => MessageRole::Assistant,
1107 }
1108}
1109
/// Counters describing what `merge_session_messages` changed; presumably
/// rolled up into `SessionRepairStats` by the caller — confirm at call site.
#[derive(Debug, Clone, Default)]
struct MessageMergeStats {
    // Messages present only in the imported set that were added wholesale.
    messages_recovered: u64,
    // Total number of parts carried by those recovered messages.
    parts_recovered: u64,
    // Messages present in both sets where the imported copy replaced the
    // existing one because it carried more content.
    conflicts_merged: u64,
}
1116
1117fn message_richness(msg: &Message) -> usize {
1118 msg.parts
1119 .iter()
1120 .map(|p| match p {
1121 MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1122 if text.trim().is_empty() {
1123 0
1124 } else {
1125 1
1126 }
1127 }
1128 MessagePart::ToolInvocation { result, error, .. } => {
1129 if result.is_some() || error.is_some() {
1130 2
1131 } else {
1132 1
1133 }
1134 }
1135 })
1136 .sum()
1137}
1138
1139fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1140 messages.iter().map(|m| m.created_at).max()
1141}
1142
1143fn merge_session_messages(
1144 existing: &[Message],
1145 imported: &[Message],
1146) -> (Vec<Message>, MessageMergeStats, bool) {
1147 if existing.is_empty() {
1148 let messages_recovered = imported.len() as u64;
1149 let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1150 return (
1151 imported.to_vec(),
1152 MessageMergeStats {
1153 messages_recovered,
1154 parts_recovered,
1155 conflicts_merged: 0,
1156 },
1157 true,
1158 );
1159 }
1160
1161 let mut merged_by_id: HashMap<String, Message> = existing
1162 .iter()
1163 .cloned()
1164 .map(|m| (m.id.clone(), m))
1165 .collect();
1166 let mut stats = MessageMergeStats::default();
1167 let mut changed = false;
1168
1169 for incoming in imported {
1170 match merged_by_id.get(&incoming.id) {
1171 None => {
1172 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1173 stats.messages_recovered += 1;
1174 stats.parts_recovered += incoming.parts.len() as u64;
1175 changed = true;
1176 }
1177 Some(current) => {
1178 let incoming_richer = message_richness(incoming) > message_richness(current)
1179 || incoming.parts.len() > current.parts.len();
1180 if incoming_richer {
1181 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1182 stats.conflicts_merged += 1;
1183 changed = true;
1184 }
1185 }
1186 }
1187 }
1188
1189 let mut out: Vec<Message> = merged_by_id.into_values().collect();
1190 out.sort_by_key(|m| m.created_at);
1191 (out, stats, changed)
1192}
1193
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs as stdfs;

    // Each test builds an isolated `Storage` root under the OS temp dir with a
    // UUID-suffixed name so parallel runs do not collide.
    // NOTE(review): the temp directories are never cleaned up after the test —
    // acceptable for scratch space, but worth confirming this is intentional.

    // Todos set via `set_todos` must come back with the wire-shape fields
    // (`id`, `content`, `status`) filled in, regardless of which subset the
    // caller supplied (`content` only, `text` alias, or fully specified).
    #[tokio::test]
    async fn todos_are_normalized_to_wire_shape() {
        let base = std::env::temp_dir().join(format!("tandem-core-test-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let session = Session::new(Some("test".to_string()), Some(".".to_string()));
        let id = session.id.clone();
        storage.save_session(session).await.expect("save session");

        storage
            .set_todos(
                &id,
                vec![
                    json!({"content":"first"}),
                    json!({"text":"second", "status":"in_progress"}),
                    json!({"id":"keep-id","content":"third","status":"completed"}),
                ],
            )
            .await
            .expect("set todos");

        let todos = storage.get_todos(&id).await;
        assert_eq!(todos.len(), 3);
        for todo in todos {
            assert!(todo.get("id").and_then(|v| v.as_str()).is_some());
            assert!(todo.get("content").and_then(|v| v.as_str()).is_some());
            assert!(todo.get("status").and_then(|v| v.as_str()).is_some());
        }
    }

    // With no `sessions.json` present, startup should discover a legacy
    // opencode-style `session/global/*.json` file, import it, and persist the
    // result as a new `sessions.json`.
    #[tokio::test]
    async fn imports_legacy_opencode_session_index_when_sessions_json_missing() {
        let base =
            std::env::temp_dir().join(format!("tandem-core-legacy-import-{}", Uuid::new_v4()));
        let legacy_session_dir = base.join("session").join("global");
        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
        stdfs::write(
            legacy_session_dir.join("ses_test.json"),
            r#"{
  "id": "ses_test",
  "slug": "test",
  "version": "1.0.0",
  "projectID": "proj_1",
  "directory": "C:\\work\\demo",
  "title": "Legacy Session",
  "time": { "created": 1770913145613, "updated": 1770913146613 }
}"#,
        )
        .expect("legacy session write");

        let storage = Storage::new(&base).await.expect("storage");
        let sessions = storage.list_sessions().await;
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "ses_test");
        assert_eq!(sessions[0].title, "Legacy Session");
        assert!(base.join("sessions.json").exists());
    }

    // A legacy session with a message file (`message/<session>/msg_1.json`)
    // and a part file (`part/<message>/prt_1.json`) should import with both
    // the message and its text part attached.
    #[tokio::test]
    async fn imports_legacy_messages_and_parts_for_session() {
        let base = std::env::temp_dir().join(format!("tandem-core-legacy-msg-{}", Uuid::new_v4()));
        let session_dir = base.join("session").join("global");
        let message_dir = base.join("message").join("ses_test");
        let part_dir = base.join("part").join("msg_1");
        stdfs::create_dir_all(&session_dir).expect("session dir");
        stdfs::create_dir_all(&message_dir).expect("message dir");
        stdfs::create_dir_all(&part_dir).expect("part dir");

        stdfs::write(
            session_dir.join("ses_test.json"),
            r#"{
  "id": "ses_test",
  "projectID": "proj_1",
  "directory": "C:\\work\\demo",
  "title": "Legacy Session",
  "time": { "created": 1770913145613, "updated": 1770913146613 }
}"#,
        )
        .expect("write session");

        stdfs::write(
            message_dir.join("msg_1.json"),
            r#"{
  "id": "msg_1",
  "sessionID": "ses_test",
  "role": "assistant",
  "time": { "created": 1770913145613 }
}"#,
        )
        .expect("write msg");

        stdfs::write(
            part_dir.join("prt_1.json"),
            r#"{
  "id": "prt_1",
  "sessionID": "ses_test",
  "messageID": "msg_1",
  "type": "text",
  "text": "hello from legacy"
}"#,
        )
        .expect("write part");

        let storage = Storage::new(&base).await.expect("storage");
        let sessions = storage.list_sessions().await;
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].messages.len(), 1);
        assert_eq!(sessions[0].messages[0].parts.len(), 1);
    }

    // Once a `sessions.json` exists, startup must not re-import legacy
    // `session/global` files — only the current store's sessions survive.
    #[tokio::test]
    async fn skips_legacy_merge_when_sessions_json_exists() {
        let base =
            std::env::temp_dir().join(format!("tandem-core-legacy-merge-{}", Uuid::new_v4()));
        stdfs::create_dir_all(&base).expect("base");
        stdfs::write(
            base.join("sessions.json"),
            r#"{
  "ses_current": {
    "id": "ses_current",
    "slug": null,
    "version": "v1",
    "project_id": null,
    "title": "Current Session",
    "directory": ".",
    "workspace_root": null,
    "origin_workspace_root": null,
    "attached_from_workspace": null,
    "attached_to_workspace": null,
    "attach_timestamp_ms": null,
    "attach_reason": null,
    "time": {"created":"2026-01-01T00:00:00Z","updated":"2026-01-01T00:00:00Z"},
    "model": null,
    "provider": null,
    "messages": []
  }
}"#,
        )
        .expect("sessions.json");

        let legacy_session_dir = base.join("session").join("global");
        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
        stdfs::write(
            legacy_session_dir.join("ses_legacy.json"),
            r#"{
  "id": "ses_legacy",
  "slug": "legacy",
  "version": "1.0.0",
  "projectID": "proj_legacy",
  "directory": "C:\\work\\legacy",
  "title": "Legacy Session",
  "time": { "created": 1770913145613, "updated": 1770913146613 }
}"#,
        )
        .expect("legacy session write");

        let storage = Storage::new(&base).await.expect("storage");
        let sessions = storage.list_sessions().await;
        let ids = sessions.iter().map(|s| s.id.clone()).collect::<Vec<_>>();
        assert!(ids.contains(&"ses_current".to_string()));
        assert!(!ids.contains(&"ses_legacy".to_string()));
    }

    // Workspace-scoped listing returns only sessions whose `workspace_root`
    // matches the requested root.
    #[tokio::test]
    async fn list_sessions_scoped_filters_by_workspace_root() {
        let base = std::env::temp_dir().join(format!("tandem-core-scope-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let ws_a = base.join("ws-a");
        let ws_b = base.join("ws-b");
        stdfs::create_dir_all(&ws_a).expect("ws_a");
        stdfs::create_dir_all(&ws_b).expect("ws_b");
        let ws_a_str = ws_a.to_string_lossy().to_string();
        let ws_b_str = ws_b.to_string_lossy().to_string();

        let mut a = Session::new(Some("a".to_string()), Some(ws_a_str.clone()));
        a.workspace_root = Some(ws_a_str.clone());
        storage.save_session(a).await.expect("save a");

        let mut b = Session::new(Some("b".to_string()), Some(ws_b_str.clone()));
        b.workspace_root = Some(ws_b_str);
        storage.save_session(b).await.expect("save b");

        let scoped = storage
            .list_sessions_scoped(SessionListScope::Workspace {
                workspace_root: ws_a_str,
            })
            .await;
        assert_eq!(scoped.len(), 1);
        assert_eq!(scoped[0].title, "a");
    }

    // Attaching a session to another workspace updates `workspace_root` to
    // the normalized target path and records the audit trail (target, reason,
    // timestamp).
    #[tokio::test]
    async fn attach_session_persists_audit_metadata() {
        let base = std::env::temp_dir().join(format!("tandem-core-attach-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let ws_a = base.join("ws-a");
        let ws_b = base.join("ws-b");
        stdfs::create_dir_all(&ws_a).expect("ws_a");
        stdfs::create_dir_all(&ws_b).expect("ws_b");
        let ws_a_str = ws_a.to_string_lossy().to_string();
        let ws_b_str = ws_b.to_string_lossy().to_string();
        let mut session = Session::new(Some("s".to_string()), Some(ws_a_str.clone()));
        session.workspace_root = Some(ws_a_str);
        let id = session.id.clone();
        storage.save_session(session).await.expect("save");

        let updated = storage
            .attach_session_to_workspace(&id, &ws_b_str, "manual")
            .await
            .expect("attach")
            .expect("session exists");
        let normalized_expected = normalize_workspace_path(&ws_b_str).expect("normalized path");
        assert_eq!(
            updated.workspace_root.as_deref(),
            Some(normalized_expected.as_str())
        );
        assert_eq!(
            updated.attached_to_workspace.as_deref(),
            Some(normalized_expected.as_str())
        );
        assert_eq!(updated.attach_reason.as_deref(), Some("manual"));
        assert!(updated.attach_timestamp_ms.is_some());
    }

    // A session whose stored title is a prompt-wrapper artifact (e.g.
    // "<memory_context>") should get its title re-derived from the wrapped
    // user request on the next startup. The storage is dropped and reopened
    // to exercise the startup repair path.
    #[tokio::test]
    async fn startup_repairs_placeholder_titles_from_wrapped_user_messages() {
        let base =
            std::env::temp_dir().join(format!("tandem-core-title-repair-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let wrapped = "<memory_context>\n<current_session>\n- fact\n</current_session>\n</memory_context>\n\n[Mode instructions]\nUse tools.\n\n[User request]\nExplain this bug";
        let mut session = Session::new(Some("<memory_context>".to_string()), Some(".".to_string()));
        let id = session.id.clone();
        session.messages.push(Message::new(
            MessageRole::User,
            vec![MessagePart::Text {
                text: wrapped.to_string(),
            }],
        ));
        storage.save_session(session).await.expect("save");
        drop(storage);

        let storage = Storage::new(&base).await.expect("storage");
        let repaired = storage.get_session(&id).await.expect("session");
        assert_eq!(repaired.title, "Explain this bug");
    }
}