1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use anyhow::Context;
5use chrono::{TimeZone, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use tokio::fs;
9use tokio::sync::RwLock;
10use tokio::task;
11use uuid::Uuid;
12
13use tandem_types::{Message, MessagePart, MessageRole, Session};
14
15use crate::{derive_session_title_from_prompt, normalize_workspace_path, title_needs_repair};
16
/// Per-session bookkeeping persisted to `session_meta.json`, kept separate
/// from the session body itself.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SessionMeta {
    /// Id of the session this one was forked from, if any.
    pub parent_id: Option<String>,
    #[serde(default)]
    pub archived: bool,
    #[serde(default)]
    pub shared: bool,
    /// Public share token; assigned when sharing is enabled and cleared when
    /// it is disabled.
    pub share_id: Option<String>,
    pub summary: Option<String>,
    /// Stack of message-list snapshots taken before each append (capped at 25
    /// in `append_message`); `revert_session` pops from here.
    #[serde(default)]
    pub snapshots: Vec<Vec<Message>>,
    /// Message list saved just before the most recent revert, so
    /// `unrevert_session` can restore it.
    pub pre_revert: Option<Vec<Message>>,
    /// Normalized todo items (`{id, content, status}` objects — see
    /// `normalize_todo_items`).
    #[serde(default)]
    pub todos: Vec<Value>,
}
32
/// Link from a question request back to the tool call and message that
/// raised it. Serialized with camelCase keys (`callID` / `messageID`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuestionToolRef {
    #[serde(rename = "callID")]
    pub call_id: String,
    #[serde(rename = "messageID")]
    pub message_id: String,
}
40
/// A pending question raised during a session, persisted to `questions.json`
/// until it is replied to or rejected.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuestionRequest {
    /// Unique id, formatted `q-<uuid>` by `add_question_request`.
    pub id: String,
    #[serde(rename = "sessionID")]
    pub session_id: String,
    /// Opaque question payloads — schema is defined by the caller; this
    /// module stores them verbatim.
    #[serde(default)]
    pub questions: Vec<Value>,
    /// Originating tool call, when known; omitted from JSON if absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool: Option<QuestionToolRef>,
}
51
/// JSON-file-backed store for sessions, per-session metadata, and pending
/// question requests. All three maps live in memory behind `RwLock`s and are
/// rewritten wholesale to disk by `flush`.
pub struct Storage {
    /// Directory holding `sessions.json`, `session_meta.json`,
    /// `questions.json`, the legacy import marker, and the legacy trees
    /// (`session/`, `message/`, `part/`).
    base: PathBuf,
    sessions: RwLock<HashMap<String, Session>>,
    metadata: RwLock<HashMap<String, SessionMeta>>,
    question_requests: RwLock<HashMap<String, QuestionRequest>>,
}
58
/// Filter used by `Storage::list_sessions_scoped`.
#[derive(Debug, Clone)]
pub enum SessionListScope {
    /// Every session, regardless of workspace.
    Global,
    /// Only sessions whose workspace root (or directory) normalizes to the
    /// same path as `workspace_root`.
    Workspace { workspace_root: String },
}
64
/// Counters reported by `repair_sessions_from_file_store`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionRepairStats {
    /// Sessions whose message list changed during repair.
    pub sessions_repaired: u64,
    pub messages_recovered: u64,
    pub parts_recovered: u64,
    /// Messages where existing and imported copies were merged.
    pub conflicts_merged: u64,
}
72
/// File (under `Storage::base`) recording that the one-time legacy import
/// scan has run, plus what it found.
const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
/// Bump when the marker's JSON schema changes.
const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
75
/// Raw `*.json` file counts found in the legacy on-disk trees
/// (`session/`, `message/`, `part/`), before any parsing.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LegacyTreeCounts {
    pub session_files: u64,
    pub message_files: u64,
    pub part_files: u64,
}
82
/// How much legacy data was actually parsed into sessions (as opposed to
/// the raw file counts in `LegacyTreeCounts`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LegacyImportedCounts {
    pub sessions: u64,
    pub messages: u64,
    pub parts: u64,
}
89
/// On-disk record that the legacy import ran; its presence suppresses
/// future startup scans (see `should_run_legacy_scan_on_startup`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LegacyImportMarker {
    pub version: u32,
    /// Milliseconds since the Unix epoch (clamped to 0 for pre-epoch clocks).
    pub created_at_ms: u64,
    pub last_checked_at_ms: u64,
    pub legacy_counts: LegacyTreeCounts,
    pub imported_counts: LegacyImportedCounts,
}
98
/// Outcome of `run_legacy_storage_repair_scan`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LegacyRepairRunReport {
    /// One of "skipped", "updated", or "no_changes".
    pub status: String,
    /// False only on the skipped path (the existing marker was left alone).
    pub marker_updated: bool,
    pub sessions_merged: u64,
    pub messages_recovered: u64,
    pub parts_recovered: u64,
    pub legacy_counts: LegacyTreeCounts,
    pub imported_counts: LegacyImportedCounts,
}
109
110impl Storage {
    /// Opens (or initializes) the store rooted at `base`.
    ///
    /// Loads `sessions.json`, `session_meta.json`, and `questions.json` when
    /// present, optionally runs a one-time legacy import scan, backfills
    /// missing workspace roots and broken titles, and flushes back to disk
    /// if anything changed in memory.
    pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
        let base = base.as_ref().to_path_buf();
        fs::create_dir_all(&base).await?;
        let sessions_file = base.join("sessions.json");
        let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
        let sessions_file_exists = sessions_file.exists();
        // Set whenever the in-memory state diverges from disk, so we know to
        // flush before returning.
        let mut imported_legacy_sessions = false;
        let mut sessions = if sessions_file_exists {
            let raw = fs::read_to_string(&sessions_file).await?;
            // NOTE(review): a corrupt sessions.json silently becomes an empty
            // map here (and is later overwritten on flush) — confirm that
            // data loss on parse failure is intended.
            serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
        } else {
            HashMap::new()
        };

        let mut marker_to_write = None;
        if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
            let base_for_scan = base.clone();
            // The legacy scan walks the filesystem with blocking std::fs
            // calls, so keep it off the async worker threads.
            let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
                .await
                .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
            if merge_legacy_sessions(&mut sessions, scan.sessions) {
                imported_legacy_sessions = true;
            }
            marker_to_write = Some(LegacyImportMarker {
                version: LEGACY_IMPORT_MARKER_VERSION,
                created_at_ms: now_ms_u64(),
                last_checked_at_ms: now_ms_u64(),
                legacy_counts: scan.legacy_counts,
                imported_counts: scan.imported_counts,
            });
        }

        if hydrate_workspace_roots(&mut sessions) {
            imported_legacy_sessions = true;
        }
        if repair_session_titles(&mut sessions) {
            imported_legacy_sessions = true;
        }
        let metadata_file = base.join("session_meta.json");
        let metadata = if metadata_file.exists() {
            let raw = fs::read_to_string(&metadata_file).await?;
            serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
        } else {
            HashMap::new()
        };
        let questions_file = base.join("questions.json");
        let question_requests = if questions_file.exists() {
            let raw = fs::read_to_string(&questions_file).await?;
            serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
        } else {
            HashMap::new()
        };
        let storage = Self {
            base,
            sessions: RwLock::new(sessions),
            metadata: RwLock::new(metadata),
            question_requests: RwLock::new(question_requests),
        };

        // Persist anything the migration/repair passes changed, then record
        // the marker so the scan does not run again on the next startup.
        if imported_legacy_sessions {
            storage.flush().await?;
        }
        if let Some(marker) = marker_to_write {
            storage.write_legacy_import_marker(&marker).await?;
        }

        Ok(storage)
    }
179
    /// Convenience wrapper: all sessions, unscoped.
    pub async fn list_sessions(&self) -> Vec<Session> {
        self.list_sessions_scoped(SessionListScope::Global).await
    }
183
184 pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
185 let all = self
186 .sessions
187 .read()
188 .await
189 .values()
190 .cloned()
191 .collect::<Vec<_>>();
192 match scope {
193 SessionListScope::Global => all,
194 SessionListScope::Workspace { workspace_root } => {
195 let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
196 return Vec::new();
197 };
198 all.into_iter()
199 .filter(|session| {
200 let direct = session
201 .workspace_root
202 .as_ref()
203 .and_then(|p| normalize_workspace_path(p))
204 .map(|p| p == normalized_workspace)
205 .unwrap_or(false);
206 if direct {
207 return true;
208 }
209 normalize_workspace_path(&session.directory)
210 .map(|p| p == normalized_workspace)
211 .unwrap_or(false)
212 })
213 .collect()
214 }
215 }
216 }
217
    /// Returns a clone of the session with `id`, if present.
    pub async fn get_session(&self, id: &str) -> Option<Session> {
        self.sessions.read().await.get(id).cloned()
    }
221
222 pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
223 if session.workspace_root.is_none() {
224 session.workspace_root = normalize_workspace_path(&session.directory);
225 }
226 let session_id = session.id.clone();
227 self.sessions
228 .write()
229 .await
230 .insert(session_id.clone(), session);
231 self.metadata
232 .write()
233 .await
234 .entry(session_id)
235 .or_insert_with(SessionMeta::default);
236 self.flush().await
237 }
238
    /// Re-hydrates every in-memory session from the legacy per-message file
    /// store (`message/<session_id>/*.json`), merging in any messages or
    /// richer message copies that are missing in memory.
    ///
    /// Holds the sessions write lock for the whole pass; flushes once at the
    /// end, and only if at least one session changed.
    pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
        let mut stats = SessionRepairStats::default();
        let mut sessions = self.sessions.write().await;

        for session in sessions.values_mut() {
            // NOTE: blocking std::fs reads under the lock — acceptable here
            // since this is an explicit repair operation, not a hot path.
            let imported = load_legacy_session_messages(&self.base, &session.id);
            if imported.is_empty() {
                continue;
            }

            let (merged, merge_stats, changed) =
                merge_session_messages(&session.messages, &imported);
            if changed {
                session.messages = merged;
                // Bump `updated` to the newest message timestamp, keeping the
                // old value if the merged list is somehow empty.
                session.time.updated =
                    most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
                stats.sessions_repaired += 1;
                stats.messages_recovered += merge_stats.messages_recovered;
                stats.parts_recovered += merge_stats.parts_recovered;
                stats.conflicts_merged += merge_stats.conflicts_merged;
            }
        }

        if stats.sessions_repaired > 0 {
            // Release the write lock before flush takes its read locks.
            drop(sessions);
            self.flush().await?;
        }

        Ok(stats)
    }
269
    /// Runs (or skips) a full legacy-storage scan and merge on demand.
    ///
    /// With `force = false` the startup heuristic decides whether to scan;
    /// when skipped, the existing marker's counts are echoed back with
    /// status "skipped" and the marker is left untouched. When the scan does
    /// run, merged changes are flushed and a fresh marker is written; status
    /// is "updated" or "no_changes" accordingly.
    pub async fn run_legacy_storage_repair_scan(
        &self,
        force: bool,
    ) -> anyhow::Result<LegacyRepairRunReport> {
        let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
        let sessions_exists = self.base.join("sessions.json").exists();
        let should_scan = if force {
            true
        } else {
            should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
        };
        if !should_scan {
            // Report the last known counts; fall back to zeros when the
            // marker is missing or unreadable.
            let marker = read_legacy_import_marker(&marker_path)
                .await
                .unwrap_or_else(|| LegacyImportMarker {
                    version: LEGACY_IMPORT_MARKER_VERSION,
                    created_at_ms: now_ms_u64(),
                    last_checked_at_ms: now_ms_u64(),
                    legacy_counts: LegacyTreeCounts::default(),
                    imported_counts: LegacyImportedCounts::default(),
                });
            return Ok(LegacyRepairRunReport {
                status: "skipped".to_string(),
                marker_updated: false,
                sessions_merged: 0,
                messages_recovered: 0,
                parts_recovered: 0,
                legacy_counts: marker.legacy_counts,
                imported_counts: marker.imported_counts,
            });
        }

        // The scan does blocking filesystem walks; run it off the runtime.
        let base_for_scan = self.base.clone();
        let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
            .await
            .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;

        let merge_stats = {
            let mut sessions = self.sessions.write().await;
            merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
        };

        if merge_stats.changed {
            self.flush().await?;
        }

        let marker = LegacyImportMarker {
            version: LEGACY_IMPORT_MARKER_VERSION,
            created_at_ms: now_ms_u64(),
            last_checked_at_ms: now_ms_u64(),
            legacy_counts: scan.legacy_counts.clone(),
            imported_counts: scan.imported_counts.clone(),
        };
        self.write_legacy_import_marker(&marker).await?;

        Ok(LegacyRepairRunReport {
            status: if merge_stats.changed {
                "updated".to_string()
            } else {
                "no_changes".to_string()
            },
            marker_updated: true,
            sessions_merged: merge_stats.sessions_merged,
            messages_recovered: merge_stats.messages_recovered,
            parts_recovered: merge_stats.parts_recovered,
            legacy_counts: scan.legacy_counts,
            imported_counts: scan.imported_counts,
        })
    }
339
340 pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
341 let removed = self.sessions.write().await.remove(id).is_some();
342 self.metadata.write().await.remove(id);
343 self.question_requests
344 .write()
345 .await
346 .retain(|_, request| request.session_id != id);
347 if removed {
348 self.flush().await?;
349 }
350 Ok(removed)
351 }
352
    /// Appends a message to a session, snapshotting the prior message list
    /// first so the append can be reverted.
    ///
    /// Snapshots are capped at 25 per session (oldest dropped). Returns an
    /// error when the session does not exist. Locks are taken in the order
    /// sessions → metadata and both guards are dropped before `flush`
    /// re-acquires read locks.
    pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
        let mut sessions = self.sessions.write().await;
        let session = sessions
            .get_mut(session_id)
            .context("session not found for append_message")?;
        let mut meta_guard = self.metadata.write().await;
        let meta = meta_guard
            .entry(session_id.to_string())
            .or_insert_with(SessionMeta::default);
        // Snapshot the pre-append state for revert support.
        meta.snapshots.push(session.messages.clone());
        if meta.snapshots.len() > 25 {
            // Drop the oldest snapshot to bound memory/disk usage.
            let _ = meta.snapshots.remove(0);
        }
        session.messages.push(msg);
        session.time.updated = Utc::now();
        drop(sessions);
        drop(meta_guard);
        self.flush().await
    }
372
373 pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
374 let source = {
375 let sessions = self.sessions.read().await;
376 sessions.get(id).cloned()
377 };
378 let Some(mut child) = source else {
379 return Ok(None);
380 };
381
382 child.id = Uuid::new_v4().to_string();
383 child.title = format!("{} (fork)", child.title);
384 child.time.created = Utc::now();
385 child.time.updated = child.time.created;
386 child.slug = None;
387
388 self.sessions
389 .write()
390 .await
391 .insert(child.id.clone(), child.clone());
392 self.metadata.write().await.insert(
393 child.id.clone(),
394 SessionMeta {
395 parent_id: Some(id.to_string()),
396 snapshots: vec![child.messages.clone()],
397 ..SessionMeta::default()
398 },
399 );
400 self.flush().await?;
401 Ok(Some(child))
402 }
403
    /// Restores the most recent snapshot of a session's messages.
    ///
    /// The current message list is stashed in `meta.pre_revert` so
    /// `unrevert_session` can undo the revert. Returns `Ok(false)` (without
    /// flushing) when the session is unknown or no snapshots remain.
    pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
        let mut sessions = self.sessions.write().await;
        let Some(session) = sessions.get_mut(id) else {
            return Ok(false);
        };
        let mut metadata = self.metadata.write().await;
        let meta = metadata
            .entry(id.to_string())
            .or_insert_with(SessionMeta::default);
        let Some(snapshot) = meta.snapshots.pop() else {
            return Ok(false);
        };
        meta.pre_revert = Some(session.messages.clone());
        session.messages = snapshot;
        session.time.updated = Utc::now();
        // Release both write guards before flush takes its read locks.
        drop(metadata);
        drop(sessions);
        self.flush().await?;
        Ok(true)
    }
424
    /// Undoes the most recent `revert_session`: restores the stashed
    /// `pre_revert` message list and pushes the current messages back onto
    /// the snapshot stack. Returns `Ok(false)` when the session, its
    /// metadata, or a stashed pre-revert list is missing.
    pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
        let mut sessions = self.sessions.write().await;
        let Some(session) = sessions.get_mut(id) else {
            return Ok(false);
        };
        let mut metadata = self.metadata.write().await;
        let Some(meta) = metadata.get_mut(id) else {
            return Ok(false);
        };
        // `take` consumes the stash, so unrevert is single-shot.
        let Some(previous) = meta.pre_revert.take() else {
            return Ok(false);
        };
        meta.snapshots.push(session.messages.clone());
        session.messages = previous;
        session.time.updated = Utc::now();
        // Release both write guards before flush takes its read locks.
        drop(metadata);
        drop(sessions);
        self.flush().await?;
        Ok(true)
    }
445
446 pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
447 let mut metadata = self.metadata.write().await;
448 let meta = metadata
449 .entry(id.to_string())
450 .or_insert_with(SessionMeta::default);
451 meta.shared = shared;
452 if shared {
453 if meta.share_id.is_none() {
454 meta.share_id = Some(Uuid::new_v4().to_string());
455 }
456 } else {
457 meta.share_id = None;
458 }
459 let share_id = meta.share_id.clone();
460 drop(metadata);
461 self.flush().await?;
462 Ok(share_id)
463 }
464
465 pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
466 let mut metadata = self.metadata.write().await;
467 let meta = metadata
468 .entry(id.to_string())
469 .or_insert_with(SessionMeta::default);
470 meta.archived = archived;
471 drop(metadata);
472 self.flush().await?;
473 Ok(true)
474 }
475
476 pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
477 let mut metadata = self.metadata.write().await;
478 let meta = metadata
479 .entry(id.to_string())
480 .or_insert_with(SessionMeta::default);
481 meta.summary = Some(summary);
482 drop(metadata);
483 self.flush().await?;
484 Ok(true)
485 }
486
487 pub async fn children(&self, parent_id: &str) -> Vec<Session> {
488 let child_ids = {
489 let metadata = self.metadata.read().await;
490 metadata
491 .iter()
492 .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
493 .map(|(id, _)| id.clone())
494 .collect::<Vec<_>>()
495 };
496 let sessions = self.sessions.read().await;
497 child_ids
498 .into_iter()
499 .filter_map(|id| sessions.get(&id).cloned())
500 .collect()
501 }
502
503 pub async fn session_status(&self, id: &str) -> Option<Value> {
504 let metadata = self.metadata.read().await;
505 metadata.get(id).map(|meta| {
506 json!({
507 "archived": meta.archived,
508 "shared": meta.shared,
509 "parentID": meta.parent_id,
510 "snapshotCount": meta.snapshots.len()
511 })
512 })
513 }
514
515 pub async fn session_diff(&self, id: &str) -> Option<Value> {
516 let sessions = self.sessions.read().await;
517 let current = sessions.get(id)?;
518 let metadata = self.metadata.read().await;
519 let default = SessionMeta::default();
520 let meta = metadata.get(id).unwrap_or(&default);
521 let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
522 Some(json!({
523 "sessionID": id,
524 "currentMessageCount": current.messages.len(),
525 "lastSnapshotMessageCount": last_snapshot_len,
526 "delta": current.messages.len() as i64 - last_snapshot_len as i64
527 }))
528 }
529
530 pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
531 let mut metadata = self.metadata.write().await;
532 let meta = metadata
533 .entry(id.to_string())
534 .or_insert_with(SessionMeta::default);
535 meta.todos = normalize_todo_items(todos);
536 drop(metadata);
537 self.flush().await
538 }
539
540 pub async fn get_todos(&self, id: &str) -> Vec<Value> {
541 let todos = self
542 .metadata
543 .read()
544 .await
545 .get(id)
546 .map(|meta| meta.todos.clone())
547 .unwrap_or_default();
548 normalize_todo_items(todos)
549 }
550
551 pub async fn add_question_request(
552 &self,
553 session_id: &str,
554 message_id: &str,
555 questions: Vec<Value>,
556 ) -> anyhow::Result<QuestionRequest> {
557 let request = QuestionRequest {
558 id: format!("q-{}", Uuid::new_v4()),
559 session_id: session_id.to_string(),
560 questions,
561 tool: Some(QuestionToolRef {
562 call_id: format!("call-{}", Uuid::new_v4()),
563 message_id: message_id.to_string(),
564 }),
565 };
566 self.question_requests
567 .write()
568 .await
569 .insert(request.id.clone(), request.clone());
570 self.flush().await?;
571 Ok(request)
572 }
573
574 pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
575 self.question_requests
576 .read()
577 .await
578 .values()
579 .cloned()
580 .collect()
581 }
582
583 pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
584 let removed = self
585 .question_requests
586 .write()
587 .await
588 .remove(request_id)
589 .is_some();
590 if removed {
591 self.flush().await?;
592 }
593 Ok(removed)
594 }
595
    /// Rejects a pending question. Currently identical to `reply_question`
    /// — both simply drop the request; no answer/rejection payload is stored.
    pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
        self.reply_question(request_id).await
    }
599
    /// Re-homes a session into `target_workspace`, recording provenance.
    ///
    /// The very first workspace the session belonged to is captured once in
    /// `origin_workspace_root`; each attach also records the previous
    /// workspace, the target, a timestamp, and the trimmed `reason_tag`.
    /// Returns `Ok(None)` when the target path cannot be normalized or the
    /// session does not exist; returns the updated session otherwise.
    pub async fn attach_session_to_workspace(
        &self,
        session_id: &str,
        target_workspace: &str,
        reason_tag: &str,
    ) -> anyhow::Result<Option<Session>> {
        let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
            return Ok(None);
        };
        let mut sessions = self.sessions.write().await;
        let Some(session) = sessions.get_mut(session_id) else {
            return Ok(None);
        };
        // Workspace the session is moving away from: explicit root first,
        // else derived from its directory.
        let previous_workspace = session
            .workspace_root
            .clone()
            .or_else(|| normalize_workspace_path(&session.directory));

        // Only set the origin once, on the first attach.
        if session.origin_workspace_root.is_none() {
            session.origin_workspace_root = previous_workspace.clone();
        }
        session.attached_from_workspace = previous_workspace;
        session.attached_to_workspace = Some(target_workspace.clone());
        session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
        session.attach_reason = Some(reason_tag.trim().to_string());
        session.workspace_root = Some(target_workspace.clone());
        session.directory = target_workspace;
        session.time.updated = Utc::now();
        let updated = session.clone();
        // Release the write guard before flush takes its read locks.
        drop(sessions);
        self.flush().await?;
        Ok(Some(updated))
    }
633
634 async fn flush(&self) -> anyhow::Result<()> {
635 let snapshot = self.sessions.read().await.clone();
636 let payload = serde_json::to_string_pretty(&snapshot)?;
637 fs::write(self.base.join("sessions.json"), payload).await?;
638 let metadata_snapshot = self.metadata.read().await.clone();
639 let metadata_payload = serde_json::to_string_pretty(&metadata_snapshot)?;
640 fs::write(self.base.join("session_meta.json"), metadata_payload).await?;
641 let questions_snapshot = self.question_requests.read().await.clone();
642 let questions_payload = serde_json::to_string_pretty(&questions_snapshot)?;
643 fs::write(self.base.join("questions.json"), questions_payload).await?;
644 Ok(())
645 }
646
647 async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
648 let payload = serde_json::to_string_pretty(marker)?;
649 fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
650 Ok(())
651 }
652}
653
654fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
655 items
656 .into_iter()
657 .filter_map(|item| {
658 let obj = item.as_object()?;
659 let content = obj
660 .get("content")
661 .and_then(|v| v.as_str())
662 .or_else(|| obj.get("text").and_then(|v| v.as_str()))
663 .unwrap_or("")
664 .trim()
665 .to_string();
666 if content.is_empty() {
667 return None;
668 }
669 let id = obj
670 .get("id")
671 .and_then(|v| v.as_str())
672 .filter(|s| !s.trim().is_empty())
673 .map(ToString::to_string)
674 .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
675 let status = obj
676 .get("status")
677 .and_then(|v| v.as_str())
678 .filter(|s| !s.trim().is_empty())
679 .map(ToString::to_string)
680 .unwrap_or_else(|| "pending".to_string());
681 Some(json!({
682 "id": id,
683 "content": content,
684 "status": status
685 }))
686 })
687 .collect()
688}
689
/// Everything a legacy filesystem scan produces: the parsed sessions plus
/// raw-file and imported-entity counts for the marker/report.
#[derive(Debug)]
struct LegacyScanResult {
    sessions: HashMap<String, Session>,
    legacy_counts: LegacyTreeCounts,
    imported_counts: LegacyImportedCounts,
}
696
/// Internal tally for `merge_legacy_sessions_with_stats`. `changed` is true
/// for any mutation (including title/directory/workspace backfills); the
/// numeric counters only track message-level recovery.
#[derive(Debug, Default)]
struct LegacyMergeStats {
    changed: bool,
    sessions_merged: u64,
    messages_recovered: u64,
    parts_recovered: u64,
}
704
/// Current wall-clock time as non-negative milliseconds since the Unix
/// epoch (pre-epoch clocks clamp to 0).
fn now_ms_u64() -> u64 {
    Utc::now().timestamp_millis().max(0) as u64
}
708
709async fn should_run_legacy_scan_on_startup(marker_path: &Path, sessions_exist: bool) -> bool {
710 if !sessions_exist {
711 return true;
712 }
713 if read_legacy_import_marker(marker_path).await.is_none() {
716 return false;
717 }
718 false
719}
720
721async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
722 let raw = fs::read_to_string(marker_path).await.ok()?;
723 serde_json::from_str::<LegacyImportMarker>(&raw).ok()
724}
725
726fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
727 let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
728 let imported_counts = LegacyImportedCounts {
729 sessions: sessions.len() as u64,
730 messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
731 parts: sessions
732 .values()
733 .flat_map(|s| s.messages.iter())
734 .map(|m| m.parts.len() as u64)
735 .sum(),
736 };
737 let legacy_counts = LegacyTreeCounts {
738 session_files: count_legacy_json_files(&base.join("session")),
739 message_files: count_legacy_json_files(&base.join("message")),
740 part_files: count_legacy_json_files(&base.join("part")),
741 };
742 Ok(LegacyScanResult {
743 sessions,
744 legacy_counts,
745 imported_counts,
746 })
747}
748
/// Recursively counts `*.json` files under `root`. Returns 0 when `root`
/// is not a directory; unreadable subdirectories are silently skipped.
fn count_legacy_json_files(root: &Path) -> u64 {
    if !root.is_dir() {
        return 0;
    }
    let mut total = 0u64;
    let mut pending = vec![root.to_path_buf()];
    // Iterative depth-first walk — no recursion, no allocation per level.
    while let Some(dir) = pending.pop() {
        let Ok(entries) = std::fs::read_dir(&dir) else {
            continue;
        };
        for entry in entries.flatten() {
            let path = entry.path();
            let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
            if is_dir {
                pending.push(path);
            } else if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
                total += 1;
            }
        }
    }
    total
}
771
/// Boolean wrapper around `merge_legacy_sessions_with_stats`: merges the
/// imported legacy sessions into `current` and reports whether anything
/// changed.
fn merge_legacy_sessions(
    current: &mut HashMap<String, Session>,
    imported: HashMap<String, Session>,
) -> bool {
    merge_legacy_sessions_with_stats(current, imported).changed
}
778
/// Merges imported legacy sessions into the current map, counting recovery.
///
/// Rules: a session missing from `current` is inserted wholesale. For a
/// session that already exists, legacy data only *fills gaps* — messages
/// only when the current list is empty, title only when blank, directory
/// only when blank or a "." / "./" / ".\\" placeholder, workspace root only
/// when unset. The numeric counters are bumped only for message merges;
/// `changed` covers every kind of mutation.
fn merge_legacy_sessions_with_stats(
    current: &mut HashMap<String, Session>,
    imported: HashMap<String, Session>,
) -> LegacyMergeStats {
    let mut stats = LegacyMergeStats::default();
    for (id, legacy) in imported {
        // Count up-front; `legacy` may be moved into the map below.
        let legacy_message_count = legacy.messages.len() as u64;
        let legacy_part_count = legacy
            .messages
            .iter()
            .map(|m| m.parts.len() as u64)
            .sum::<u64>();
        match current.get_mut(&id) {
            None => {
                current.insert(id, legacy);
                stats.changed = true;
                stats.sessions_merged += 1;
                stats.messages_recovered += legacy_message_count;
                stats.parts_recovered += legacy_part_count;
            }
            Some(existing) => {
                let should_merge_messages =
                    existing.messages.is_empty() && !legacy.messages.is_empty();
                let should_fill_title =
                    existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
                let should_fill_directory = (existing.directory.trim().is_empty()
                    || existing.directory.trim() == "."
                    || existing.directory.trim() == "./"
                    || existing.directory.trim() == ".\\")
                    && !legacy.directory.trim().is_empty();
                let should_fill_workspace =
                    existing.workspace_root.is_none() && legacy.workspace_root.is_some();
                if should_merge_messages {
                    existing.messages = legacy.messages.clone();
                }
                if should_fill_title {
                    existing.title = legacy.title.clone();
                }
                if should_fill_directory {
                    existing.directory = legacy.directory.clone();
                }
                if should_fill_workspace {
                    existing.workspace_root = legacy.workspace_root.clone();
                }
                if should_merge_messages
                    || should_fill_title
                    || should_fill_directory
                    || should_fill_workspace
                {
                    stats.changed = true;
                    // Only message recovery counts toward the numeric stats.
                    if should_merge_messages {
                        stats.sessions_merged += 1;
                        stats.messages_recovered += legacy_message_count;
                        stats.parts_recovered += legacy_part_count;
                    }
                }
            }
        }
    }
    stats
}
840
841fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
842 let mut changed = false;
843 for session in sessions.values_mut() {
844 if session.workspace_root.is_none() {
845 let normalized = normalize_workspace_path(&session.directory);
846 if normalized.is_some() {
847 session.workspace_root = normalized;
848 changed = true;
849 }
850 }
851 }
852 changed
853}
854
/// Re-derives titles for sessions whose title `title_needs_repair` flags.
///
/// The replacement is derived (max 60 chars) from the first non-empty text
/// part of the first user message; sessions without such a message, or
/// where derivation fails or yields the same title, are left alone.
/// Returns whether any title changed.
fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
    let mut changed = false;
    for session in sessions.values_mut() {
        if !title_needs_repair(&session.title) {
            continue;
        }
        // First non-empty Text part of the first user message.
        let first_user_text = session.messages.iter().find_map(|message| {
            if !matches!(message.role, MessageRole::User) {
                return None;
            }
            message.parts.iter().find_map(|part| match part {
                MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
                _ => None,
            })
        });
        let Some(source) = first_user_text else {
            continue;
        };
        let Some(derived) = derive_session_title_from_prompt(source, 60) else {
            continue;
        };
        if derived == session.title {
            continue;
        }
        session.title = derived;
        session.time.updated = Utc::now();
        changed = true;
    }
    changed
}
885
/// Timestamps on a legacy session file, in milliseconds since the Unix
/// epoch (converted via `timestamp_millis_opt` during import).
#[derive(Debug, Deserialize)]
struct LegacySessionTime {
    created: i64,
    updated: i64,
}
891
/// Deserialization shape for a legacy `session/**/*.json` file. Optional
/// fields get defaults during import (see `load_legacy_opencode_sessions`).
#[derive(Debug, Deserialize)]
struct LegacySession {
    id: String,
    slug: Option<String>,
    version: Option<String>,
    #[serde(rename = "projectID")]
    project_id: Option<String>,
    title: Option<String>,
    directory: Option<String>,
    time: LegacySessionTime,
}
903
/// Walks `base/session/**/*.json` (blocking I/O) and converts each parsable
/// legacy session into a `Session`, hydrating its messages from the legacy
/// message tree. Unreadable or unparseable files are skipped silently.
///
/// Defaults applied: blank/missing title → "New session"; blank/missing
/// directory → "."; invalid timestamps → now (updated falls back to
/// created).
fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
    let legacy_root = base.join("session");
    if !legacy_root.is_dir() {
        return Ok(HashMap::new());
    }

    let mut out = HashMap::new();
    // Iterative depth-first walk of the legacy session tree.
    let mut stack = vec![legacy_root];
    while let Some(dir) = stack.pop() {
        for entry in std::fs::read_dir(&dir)? {
            let entry = entry?;
            let path = entry.path();
            if entry.file_type()?.is_dir() {
                stack.push(path);
                continue;
            }
            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
                continue;
            }
            let raw = match std::fs::read_to_string(&path) {
                Ok(v) => v,
                Err(_) => continue,
            };
            let legacy = match serde_json::from_str::<LegacySession>(&raw) {
                Ok(v) => v,
                Err(_) => continue,
            };
            // Millisecond timestamps; out-of-range values fall back to now.
            let created = Utc
                .timestamp_millis_opt(legacy.time.created)
                .single()
                .unwrap_or_else(Utc::now);
            let updated = Utc
                .timestamp_millis_opt(legacy.time.updated)
                .single()
                .unwrap_or(created);

            let session_id = legacy.id.clone();
            out.insert(
                session_id.clone(),
                Session {
                    id: session_id.clone(),
                    slug: legacy.slug,
                    version: legacy.version,
                    project_id: legacy.project_id,
                    title: legacy
                        .title
                        .filter(|s| !s.trim().is_empty())
                        .unwrap_or_else(|| "New session".to_string()),
                    directory: legacy
                        .directory
                        .clone()
                        .filter(|s| !s.trim().is_empty())
                        .unwrap_or_else(|| ".".to_string()),
                    workspace_root: legacy
                        .directory
                        .as_deref()
                        .and_then(normalize_workspace_path),
                    origin_workspace_root: None,
                    attached_from_workspace: None,
                    attached_to_workspace: None,
                    attach_timestamp_ms: None,
                    attach_reason: None,
                    time: tandem_types::SessionTime { created, updated },
                    model: None,
                    provider: None,
                    // Hydrate messages from base/message/<session_id>/.
                    messages: load_legacy_session_messages(base, &session_id),
                },
            );
        }
    }
    Ok(out)
}
976
/// Creation timestamp on a legacy message file, milliseconds since epoch.
#[derive(Debug, Deserialize)]
struct LegacyMessageTime {
    created: i64,
}
981
/// Deserialization shape for a legacy `message/<session>/*.json` file.
/// `role` is a free-form string mapped via `legacy_role_to_message_role`.
#[derive(Debug, Deserialize)]
struct LegacyMessage {
    id: String,
    role: String,
    time: LegacyMessageTime,
}
988
/// Deserialization shape for a legacy `part/<message>/*.json` file. Every
/// field is optional; `load_legacy_message_parts` decides the mapping
/// (`tool` present → tool invocation, else dispatch on `part_type`).
#[derive(Debug, Deserialize)]
struct LegacyPart {
    #[serde(rename = "type")]
    part_type: Option<String>,
    text: Option<String>,
    tool: Option<String>,
    args: Option<Value>,
    result: Option<Value>,
    error: Option<String>,
}
999
/// Loads every parsable legacy message under `base/message/<session_id>/`,
/// hydrates each message's parts from the legacy part tree, and returns
/// them sorted by their creation timestamp (milliseconds). Missing
/// directories and unreadable/unparseable files yield fewer messages, never
/// an error.
fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
    let msg_dir = base.join("message").join(session_id);
    if !msg_dir.is_dir() {
        return Vec::new();
    }

    // Keep the raw millisecond timestamp alongside each message for a
    // stable numeric sort before stripping it off.
    let mut legacy_messages: Vec<(i64, Message)> = Vec::new();

    let Ok(entries) = std::fs::read_dir(&msg_dir) else {
        return Vec::new();
    };

    for entry in entries.flatten() {
        let path = entry.path();
        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
            continue;
        }
        let Ok(raw) = std::fs::read_to_string(&path) else {
            continue;
        };
        let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
            continue;
        };

        // Out-of-range timestamps fall back to the current time.
        let created_at = Utc
            .timestamp_millis_opt(legacy.time.created)
            .single()
            .unwrap_or_else(Utc::now);

        legacy_messages.push((
            legacy.time.created,
            Message {
                id: legacy.id.clone(),
                role: legacy_role_to_message_role(&legacy.role),
                parts: load_legacy_message_parts(base, &legacy.id),
                created_at,
            },
        ));
    }

    legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
    legacy_messages.into_iter().map(|(_, msg)| msg).collect()
}
1043
/// Loads and maps every parsable legacy part under `base/part/<message_id>/`.
///
/// Mapping: a part with a `tool` name becomes `ToolInvocation` (missing
/// args default to `{}`); otherwise `type` dispatches — "reasoning" →
/// `Reasoning`, "tool" → a generic `ToolInvocation` named "tool", "text" or
/// no type → `Text`, anything else is dropped. Unreadable/unparseable files
/// are skipped.
///
/// NOTE(review): ordering here follows `read_dir` iteration, which is not
/// guaranteed to be sorted — confirm part order within a message does not
/// matter to callers.
fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
    let parts_dir = base.join("part").join(message_id);
    if !parts_dir.is_dir() {
        return Vec::new();
    }

    let Ok(entries) = std::fs::read_dir(&parts_dir) else {
        return Vec::new();
    };

    let mut out = Vec::new();
    for entry in entries.flatten() {
        let path = entry.path();
        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
            continue;
        }
        let Ok(raw) = std::fs::read_to_string(&path) else {
            continue;
        };
        let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
            continue;
        };

        let mapped = if let Some(tool) = part.tool {
            // An explicit tool name wins over the declared part type.
            Some(MessagePart::ToolInvocation {
                tool,
                args: part.args.unwrap_or_else(|| json!({})),
                result: part.result,
                error: part.error,
            })
        } else {
            match part.part_type.as_deref() {
                Some("reasoning") => Some(MessagePart::Reasoning {
                    text: part.text.unwrap_or_default(),
                }),
                Some("tool") => Some(MessagePart::ToolInvocation {
                    tool: "tool".to_string(),
                    args: part.args.unwrap_or_else(|| json!({})),
                    result: part.result,
                    error: part.error,
                }),
                Some("text") | None => Some(MessagePart::Text {
                    text: part.text.unwrap_or_default(),
                }),
                // Unknown part types are dropped.
                _ => None,
            }
        };

        if let Some(part) = mapped {
            out.push(part);
        }
    }
    out
}
1098
1099fn legacy_role_to_message_role(role: &str) -> MessageRole {
1100 match role.to_lowercase().as_str() {
1101 "user" => MessageRole::User,
1102 "assistant" => MessageRole::Assistant,
1103 "system" => MessageRole::System,
1104 "tool" => MessageRole::Tool,
1105 _ => MessageRole::Assistant,
1106 }
1107}
1108
/// Counters produced while merging imported legacy messages into an
/// existing session. Field names mirror `SessionRepairStats`, which
/// carries the same counters at session-repair granularity.
#[derive(Debug, Clone, Default)]
struct MessageMergeStats {
    // Messages present in the import but absent from the existing session.
    messages_recovered: u64,
    // Total number of parts carried by those recovered messages.
    parts_recovered: u64,
    // Messages present on both sides where the imported copy won.
    conflicts_merged: u64,
}
1115
1116fn message_richness(msg: &Message) -> usize {
1117 msg.parts
1118 .iter()
1119 .map(|p| match p {
1120 MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1121 if text.trim().is_empty() {
1122 0
1123 } else {
1124 1
1125 }
1126 }
1127 MessagePart::ToolInvocation { result, error, .. } => {
1128 if result.is_some() || error.is_some() {
1129 2
1130 } else {
1131 1
1132 }
1133 }
1134 })
1135 .sum()
1136}
1137
1138fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1139 messages.iter().map(|m| m.created_at).max()
1140}
1141
1142fn merge_session_messages(
1143 existing: &[Message],
1144 imported: &[Message],
1145) -> (Vec<Message>, MessageMergeStats, bool) {
1146 if existing.is_empty() {
1147 let messages_recovered = imported.len() as u64;
1148 let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1149 return (
1150 imported.to_vec(),
1151 MessageMergeStats {
1152 messages_recovered,
1153 parts_recovered,
1154 conflicts_merged: 0,
1155 },
1156 true,
1157 );
1158 }
1159
1160 let mut merged_by_id: HashMap<String, Message> = existing
1161 .iter()
1162 .cloned()
1163 .map(|m| (m.id.clone(), m))
1164 .collect();
1165 let mut stats = MessageMergeStats::default();
1166 let mut changed = false;
1167
1168 for incoming in imported {
1169 match merged_by_id.get(&incoming.id) {
1170 None => {
1171 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1172 stats.messages_recovered += 1;
1173 stats.parts_recovered += incoming.parts.len() as u64;
1174 changed = true;
1175 }
1176 Some(current) => {
1177 let incoming_richer = message_richness(incoming) > message_richness(current)
1178 || incoming.parts.len() > current.parts.len();
1179 if incoming_richer {
1180 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1181 stats.conflicts_merged += 1;
1182 changed = true;
1183 }
1184 }
1185 }
1186 }
1187
1188 let mut out: Vec<Message> = merged_by_id.into_values().collect();
1189 out.sort_by_key(|m| m.created_at);
1190 (out, stats, changed)
1191}
1192
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs as stdfs;

    // Each test builds its own Storage under a unique temp directory so
    // tests can run concurrently without interfering. NOTE(review): the
    // temp directories are not cleaned up afterwards — presumably
    // acceptable for CI; confirm if local runs accumulate garbage.

    // `set_todos` should coerce loosely-shaped todo values into the wire
    // shape: every entry ends up with string `id`, `content`, and `status`
    // fields, regardless of which of them the caller supplied.
    #[tokio::test]
    async fn todos_are_normalized_to_wire_shape() {
        let base = std::env::temp_dir().join(format!("tandem-core-test-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let session = Session::new(Some("test".to_string()), Some(".".to_string()));
        let id = session.id.clone();
        storage.save_session(session).await.expect("save session");

        storage
            .set_todos(
                &id,
                vec![
                    // Missing id/status, content under the canonical key.
                    json!({"content":"first"}),
                    // Content under the legacy "text" key.
                    json!({"text":"second", "status":"in_progress"}),
                    // Fully-formed entry; id should be preserved.
                    json!({"id":"keep-id","content":"third","status":"completed"}),
                ],
            )
            .await
            .expect("set todos");

        let todos = storage.get_todos(&id).await;
        assert_eq!(todos.len(), 3);
        for todo in todos {
            assert!(todo.get("id").and_then(|v| v.as_str()).is_some());
            assert!(todo.get("content").and_then(|v| v.as_str()).is_some());
            assert!(todo.get("status").and_then(|v| v.as_str()).is_some());
        }
    }

    // With no sessions.json present, startup should import legacy OpenCode
    // session files from `session/global/` and then write sessions.json.
    #[tokio::test]
    async fn imports_legacy_opencode_session_index_when_sessions_json_missing() {
        let base =
            std::env::temp_dir().join(format!("tandem-core-legacy-import-{}", Uuid::new_v4()));
        let legacy_session_dir = base.join("session").join("global");
        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
        stdfs::write(
            legacy_session_dir.join("ses_test.json"),
            r#"{
  "id": "ses_test",
  "slug": "test",
  "version": "1.0.0",
  "projectID": "proj_1",
  "directory": "C:\\work\\demo",
  "title": "Legacy Session",
  "time": { "created": 1770913145613, "updated": 1770913146613 }
}"#,
        )
        .expect("legacy session write");

        let storage = Storage::new(&base).await.expect("storage");
        let sessions = storage.list_sessions().await;
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "ses_test");
        assert_eq!(sessions[0].title, "Legacy Session");
        // The import should have materialized the native index file.
        assert!(base.join("sessions.json").exists());
    }

    // A legacy import should bring over not just the session record but
    // also its messages (`message/<session>/`) and parts (`part/<msg>/`).
    #[tokio::test]
    async fn imports_legacy_messages_and_parts_for_session() {
        let base = std::env::temp_dir().join(format!("tandem-core-legacy-msg-{}", Uuid::new_v4()));
        let session_dir = base.join("session").join("global");
        let message_dir = base.join("message").join("ses_test");
        let part_dir = base.join("part").join("msg_1");
        stdfs::create_dir_all(&session_dir).expect("session dir");
        stdfs::create_dir_all(&message_dir).expect("message dir");
        stdfs::create_dir_all(&part_dir).expect("part dir");

        stdfs::write(
            session_dir.join("ses_test.json"),
            r#"{
  "id": "ses_test",
  "projectID": "proj_1",
  "directory": "C:\\work\\demo",
  "title": "Legacy Session",
  "time": { "created": 1770913145613, "updated": 1770913146613 }
}"#,
        )
        .expect("write session");

        stdfs::write(
            message_dir.join("msg_1.json"),
            r#"{
  "id": "msg_1",
  "sessionID": "ses_test",
  "role": "assistant",
  "time": { "created": 1770913145613 }
}"#,
        )
        .expect("write msg");

        stdfs::write(
            part_dir.join("prt_1.json"),
            r#"{
  "id": "prt_1",
  "sessionID": "ses_test",
  "messageID": "msg_1",
  "type": "text",
  "text": "hello from legacy"
}"#,
        )
        .expect("write part");

        let storage = Storage::new(&base).await.expect("storage");
        let sessions = storage.list_sessions().await;
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].messages.len(), 1);
        assert_eq!(sessions[0].messages[0].parts.len(), 1);
    }

    // When a native sessions.json already exists, startup must NOT pull in
    // legacy sessions lying next to it — the native index wins.
    #[tokio::test]
    async fn skips_legacy_merge_when_sessions_json_exists() {
        let base =
            std::env::temp_dir().join(format!("tandem-core-legacy-merge-{}", Uuid::new_v4()));
        stdfs::create_dir_all(&base).expect("base");
        stdfs::write(
            base.join("sessions.json"),
            r#"{
  "ses_current": {
    "id": "ses_current",
    "slug": null,
    "version": "v1",
    "project_id": null,
    "title": "Current Session",
    "directory": ".",
    "workspace_root": null,
    "origin_workspace_root": null,
    "attached_from_workspace": null,
    "attached_to_workspace": null,
    "attach_timestamp_ms": null,
    "attach_reason": null,
    "time": {"created":"2026-01-01T00:00:00Z","updated":"2026-01-01T00:00:00Z"},
    "model": null,
    "provider": null,
    "messages": []
  }
}"#,
        )
        .expect("sessions.json");

        // A legacy session sits alongside; it must be ignored.
        let legacy_session_dir = base.join("session").join("global");
        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
        stdfs::write(
            legacy_session_dir.join("ses_legacy.json"),
            r#"{
  "id": "ses_legacy",
  "slug": "legacy",
  "version": "1.0.0",
  "projectID": "proj_legacy",
  "directory": "C:\\work\\legacy",
  "title": "Legacy Session",
  "time": { "created": 1770913145613, "updated": 1770913146613 }
}"#,
        )
        .expect("legacy session write");

        let storage = Storage::new(&base).await.expect("storage");
        let sessions = storage.list_sessions().await;
        let ids = sessions.iter().map(|s| s.id.clone()).collect::<Vec<_>>();
        assert!(ids.contains(&"ses_current".to_string()));
        assert!(!ids.contains(&"ses_legacy".to_string()));
    }

    // Workspace-scoped listing should return only sessions whose
    // workspace_root matches the requested workspace.
    #[tokio::test]
    async fn list_sessions_scoped_filters_by_workspace_root() {
        let base = std::env::temp_dir().join(format!("tandem-core-scope-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let ws_a = base.join("ws-a");
        let ws_b = base.join("ws-b");
        // Directories must exist so workspace paths can be normalized.
        stdfs::create_dir_all(&ws_a).expect("ws_a");
        stdfs::create_dir_all(&ws_b).expect("ws_b");
        let ws_a_str = ws_a.to_string_lossy().to_string();
        let ws_b_str = ws_b.to_string_lossy().to_string();

        let mut a = Session::new(Some("a".to_string()), Some(ws_a_str.clone()));
        a.workspace_root = Some(ws_a_str.clone());
        storage.save_session(a).await.expect("save a");

        let mut b = Session::new(Some("b".to_string()), Some(ws_b_str.clone()));
        b.workspace_root = Some(ws_b_str);
        storage.save_session(b).await.expect("save b");

        let scoped = storage
            .list_sessions_scoped(SessionListScope::Workspace {
                workspace_root: ws_a_str,
            })
            .await;
        assert_eq!(scoped.len(), 1);
        assert_eq!(scoped[0].title, "a");
    }

    // Attaching a session to a different workspace should update its
    // workspace_root (normalized) and record the audit trail fields:
    // attached_to_workspace, attach_reason, and attach_timestamp_ms.
    #[tokio::test]
    async fn attach_session_persists_audit_metadata() {
        let base = std::env::temp_dir().join(format!("tandem-core-attach-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let ws_a = base.join("ws-a");
        let ws_b = base.join("ws-b");
        stdfs::create_dir_all(&ws_a).expect("ws_a");
        stdfs::create_dir_all(&ws_b).expect("ws_b");
        let ws_a_str = ws_a.to_string_lossy().to_string();
        let ws_b_str = ws_b.to_string_lossy().to_string();
        let mut session = Session::new(Some("s".to_string()), Some(ws_a_str.clone()));
        session.workspace_root = Some(ws_a_str);
        let id = session.id.clone();
        storage.save_session(session).await.expect("save");

        let updated = storage
            .attach_session_to_workspace(&id, &ws_b_str, "manual")
            .await
            .expect("attach")
            .expect("session exists");
        // Compare against the normalized form, not the raw path string.
        let normalized_expected = normalize_workspace_path(&ws_b_str).expect("normalized path");
        assert_eq!(
            updated.workspace_root.as_deref(),
            Some(normalized_expected.as_str())
        );
        assert_eq!(
            updated.attached_to_workspace.as_deref(),
            Some(normalized_expected.as_str())
        );
        assert_eq!(updated.attach_reason.as_deref(), Some("manual"));
        assert!(updated.attach_timestamp_ms.is_some());
    }

    // A session whose title is a placeholder (e.g. a leaked wrapper tag
    // like "<memory_context>") should get its title re-derived from the
    // actual user request embedded in the first user message on the next
    // startup of Storage.
    #[tokio::test]
    async fn startup_repairs_placeholder_titles_from_wrapped_user_messages() {
        let base =
            std::env::temp_dir().join(format!("tandem-core-title-repair-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let wrapped = "<memory_context>\n<current_session>\n- fact\n</current_session>\n</memory_context>\n\n[Mode instructions]\nUse tools.\n\n[User request]\nExplain this bug";
        let mut session = Session::new(Some("<memory_context>".to_string()), Some(".".to_string()));
        let id = session.id.clone();
        session.messages.push(Message::new(
            MessageRole::User,
            vec![MessagePart::Text {
                text: wrapped.to_string(),
            }],
        ));
        storage.save_session(session).await.expect("save");
        // Drop and re-open: the repair runs during Storage::new.
        drop(storage);

        let storage = Storage::new(&base).await.expect("storage");
        let repaired = storage.get_session(&id).await.expect("session");
        assert_eq!(repaired.title, "Explain this bug");
    }
}