1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use anyhow::Context;
5use chrono::{TimeZone, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use tokio::fs;
9use tokio::sync::RwLock;
10use tokio::task;
11use uuid::Uuid;
12
13use tandem_types::{Message, MessagePart, MessageRole, Session};
14
15use crate::{derive_session_title_from_prompt, normalize_workspace_path, title_needs_repair};
16
17#[derive(Debug, Clone, Serialize, Deserialize, Default)]
18pub struct SessionMeta {
19 pub parent_id: Option<String>,
20 #[serde(default)]
21 pub archived: bool,
22 #[serde(default)]
23 pub shared: bool,
24 pub share_id: Option<String>,
25 pub summary: Option<String>,
26 #[serde(default)]
27 pub snapshots: Vec<Vec<Message>>,
28 pub pre_revert: Option<Vec<Message>>,
29 #[serde(default)]
30 pub todos: Vec<Value>,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct QuestionToolRef {
35 #[serde(rename = "callID")]
36 pub call_id: String,
37 #[serde(rename = "messageID")]
38 pub message_id: String,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct QuestionRequest {
43 pub id: String,
44 #[serde(rename = "sessionID")]
45 pub session_id: String,
46 #[serde(default)]
47 pub questions: Vec<Value>,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 pub tool: Option<QuestionToolRef>,
50}
51
52pub struct Storage {
53 base: PathBuf,
54 sessions: RwLock<HashMap<String, Session>>,
55 metadata: RwLock<HashMap<String, SessionMeta>>,
56 question_requests: RwLock<HashMap<String, QuestionRequest>>,
57}
58
59#[derive(Debug, Clone)]
60pub enum SessionListScope {
61 Global,
62 Workspace { workspace_root: String },
63}
64
65#[derive(Debug, Clone, Default, Serialize, Deserialize)]
66pub struct SessionRepairStats {
67 pub sessions_repaired: u64,
68 pub messages_recovered: u64,
69 pub parts_recovered: u64,
70 pub conflicts_merged: u64,
71}
72
73const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
74const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
75
76#[derive(Debug, Clone, Default, Serialize, Deserialize)]
77pub struct LegacyTreeCounts {
78 pub session_files: u64,
79 pub message_files: u64,
80 pub part_files: u64,
81}
82
83#[derive(Debug, Clone, Default, Serialize, Deserialize)]
84pub struct LegacyImportedCounts {
85 pub sessions: u64,
86 pub messages: u64,
87 pub parts: u64,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct LegacyImportMarker {
92 pub version: u32,
93 pub created_at_ms: u64,
94 pub last_checked_at_ms: u64,
95 pub legacy_counts: LegacyTreeCounts,
96 pub imported_counts: LegacyImportedCounts,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct LegacyRepairRunReport {
101 pub status: String,
102 pub marker_updated: bool,
103 pub sessions_merged: u64,
104 pub messages_recovered: u64,
105 pub parts_recovered: u64,
106 pub legacy_counts: LegacyTreeCounts,
107 pub imported_counts: LegacyImportedCounts,
108}
109
110impl Storage {
111 pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
112 let base = base.as_ref().to_path_buf();
113 fs::create_dir_all(&base).await?;
114 let sessions_file = base.join("sessions.json");
115 let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
116 let sessions_file_exists = sessions_file.exists();
117 let mut imported_legacy_sessions = false;
118 let mut sessions = if sessions_file_exists {
119 let raw = fs::read_to_string(&sessions_file).await?;
120 serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
121 } else {
122 HashMap::new()
123 };
124
125 let mut marker_to_write = None;
126 if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
127 let base_for_scan = base.clone();
128 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
129 .await
130 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
131 if merge_legacy_sessions(&mut sessions, scan.sessions) {
132 imported_legacy_sessions = true;
133 }
134 marker_to_write = Some(LegacyImportMarker {
135 version: LEGACY_IMPORT_MARKER_VERSION,
136 created_at_ms: now_ms_u64(),
137 last_checked_at_ms: now_ms_u64(),
138 legacy_counts: scan.legacy_counts,
139 imported_counts: scan.imported_counts,
140 });
141 }
142
143 if hydrate_workspace_roots(&mut sessions) {
144 imported_legacy_sessions = true;
145 }
146 if repair_session_titles(&mut sessions) {
147 imported_legacy_sessions = true;
148 }
149 let metadata_file = base.join("session_meta.json");
150 let metadata = if metadata_file.exists() {
151 let raw = fs::read_to_string(&metadata_file).await?;
152 serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
153 } else {
154 HashMap::new()
155 };
156 let questions_file = base.join("questions.json");
157 let question_requests = if questions_file.exists() {
158 let raw = fs::read_to_string(&questions_file).await?;
159 serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
160 } else {
161 HashMap::new()
162 };
163 let storage = Self {
164 base,
165 sessions: RwLock::new(sessions),
166 metadata: RwLock::new(metadata),
167 question_requests: RwLock::new(question_requests),
168 };
169
170 if imported_legacy_sessions {
171 storage.flush().await?;
172 }
173 if let Some(marker) = marker_to_write {
174 storage.write_legacy_import_marker(&marker).await?;
175 }
176
177 Ok(storage)
178 }
179
180 pub async fn list_sessions(&self) -> Vec<Session> {
181 self.list_sessions_scoped(SessionListScope::Global).await
182 }
183
184 pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
185 let all = self
186 .sessions
187 .read()
188 .await
189 .values()
190 .cloned()
191 .collect::<Vec<_>>();
192 match scope {
193 SessionListScope::Global => all,
194 SessionListScope::Workspace { workspace_root } => {
195 let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
196 return Vec::new();
197 };
198 all.into_iter()
199 .filter(|session| {
200 let direct = session
201 .workspace_root
202 .as_ref()
203 .and_then(|p| normalize_workspace_path(p))
204 .map(|p| p == normalized_workspace)
205 .unwrap_or(false);
206 if direct {
207 return true;
208 }
209 normalize_workspace_path(&session.directory)
210 .map(|p| p == normalized_workspace)
211 .unwrap_or(false)
212 })
213 .collect()
214 }
215 }
216 }
217
218 pub async fn get_session(&self, id: &str) -> Option<Session> {
219 self.sessions.read().await.get(id).cloned()
220 }
221
222 pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
223 if session.workspace_root.is_none() {
224 session.workspace_root = normalize_workspace_path(&session.directory);
225 }
226 let session_id = session.id.clone();
227 self.sessions
228 .write()
229 .await
230 .insert(session_id.clone(), session);
231 self.metadata
232 .write()
233 .await
234 .entry(session_id)
235 .or_insert_with(SessionMeta::default);
236 self.flush().await
237 }
238
239 pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
240 let mut stats = SessionRepairStats::default();
241 let mut sessions = self.sessions.write().await;
242
243 for session in sessions.values_mut() {
244 let imported = load_legacy_session_messages(&self.base, &session.id);
245 if imported.is_empty() {
246 continue;
247 }
248
249 let (merged, merge_stats, changed) =
250 merge_session_messages(&session.messages, &imported);
251 if changed {
252 session.messages = merged;
253 session.time.updated =
254 most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
255 stats.sessions_repaired += 1;
256 stats.messages_recovered += merge_stats.messages_recovered;
257 stats.parts_recovered += merge_stats.parts_recovered;
258 stats.conflicts_merged += merge_stats.conflicts_merged;
259 }
260 }
261
262 if stats.sessions_repaired > 0 {
263 drop(sessions);
264 self.flush().await?;
265 }
266
267 Ok(stats)
268 }
269
270 pub async fn run_legacy_storage_repair_scan(
271 &self,
272 force: bool,
273 ) -> anyhow::Result<LegacyRepairRunReport> {
274 let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
275 let sessions_exists = self.base.join("sessions.json").exists();
276 let should_scan = if force {
277 true
278 } else {
279 should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
280 };
281 if !should_scan {
282 let marker = read_legacy_import_marker(&marker_path)
283 .await
284 .unwrap_or_else(|| LegacyImportMarker {
285 version: LEGACY_IMPORT_MARKER_VERSION,
286 created_at_ms: now_ms_u64(),
287 last_checked_at_ms: now_ms_u64(),
288 legacy_counts: LegacyTreeCounts::default(),
289 imported_counts: LegacyImportedCounts::default(),
290 });
291 return Ok(LegacyRepairRunReport {
292 status: "skipped".to_string(),
293 marker_updated: false,
294 sessions_merged: 0,
295 messages_recovered: 0,
296 parts_recovered: 0,
297 legacy_counts: marker.legacy_counts,
298 imported_counts: marker.imported_counts,
299 });
300 }
301
302 let base_for_scan = self.base.clone();
303 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
304 .await
305 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
306
307 let merge_stats = {
308 let mut sessions = self.sessions.write().await;
309 merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
310 };
311
312 if merge_stats.changed {
313 self.flush().await?;
314 }
315
316 let marker = LegacyImportMarker {
317 version: LEGACY_IMPORT_MARKER_VERSION,
318 created_at_ms: now_ms_u64(),
319 last_checked_at_ms: now_ms_u64(),
320 legacy_counts: scan.legacy_counts.clone(),
321 imported_counts: scan.imported_counts.clone(),
322 };
323 self.write_legacy_import_marker(&marker).await?;
324
325 Ok(LegacyRepairRunReport {
326 status: if merge_stats.changed {
327 "updated".to_string()
328 } else {
329 "no_changes".to_string()
330 },
331 marker_updated: true,
332 sessions_merged: merge_stats.sessions_merged,
333 messages_recovered: merge_stats.messages_recovered,
334 parts_recovered: merge_stats.parts_recovered,
335 legacy_counts: scan.legacy_counts,
336 imported_counts: scan.imported_counts,
337 })
338 }
339
340 pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
341 let removed = self.sessions.write().await.remove(id).is_some();
342 self.metadata.write().await.remove(id);
343 self.question_requests
344 .write()
345 .await
346 .retain(|_, request| request.session_id != id);
347 if removed {
348 self.flush().await?;
349 }
350 Ok(removed)
351 }
352
353 pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
354 let mut sessions = self.sessions.write().await;
355 let session = sessions
356 .get_mut(session_id)
357 .context("session not found for append_message")?;
358 let mut meta_guard = self.metadata.write().await;
359 let meta = meta_guard
360 .entry(session_id.to_string())
361 .or_insert_with(SessionMeta::default);
362 meta.snapshots.push(session.messages.clone());
363 if meta.snapshots.len() > 25 {
364 let _ = meta.snapshots.remove(0);
365 }
366 session.messages.push(msg);
367 session.time.updated = Utc::now();
368 drop(sessions);
369 drop(meta_guard);
370 self.flush().await
371 }
372
373 pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
374 let source = {
375 let sessions = self.sessions.read().await;
376 sessions.get(id).cloned()
377 };
378 let Some(mut child) = source else {
379 return Ok(None);
380 };
381
382 child.id = Uuid::new_v4().to_string();
383 child.title = format!("{} (fork)", child.title);
384 child.time.created = Utc::now();
385 child.time.updated = child.time.created;
386 child.slug = None;
387
388 self.sessions
389 .write()
390 .await
391 .insert(child.id.clone(), child.clone());
392 self.metadata.write().await.insert(
393 child.id.clone(),
394 SessionMeta {
395 parent_id: Some(id.to_string()),
396 snapshots: vec![child.messages.clone()],
397 ..SessionMeta::default()
398 },
399 );
400 self.flush().await?;
401 Ok(Some(child))
402 }
403
404 pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
405 let mut sessions = self.sessions.write().await;
406 let Some(session) = sessions.get_mut(id) else {
407 return Ok(false);
408 };
409 let mut metadata = self.metadata.write().await;
410 let meta = metadata
411 .entry(id.to_string())
412 .or_insert_with(SessionMeta::default);
413 let Some(snapshot) = meta.snapshots.pop() else {
414 return Ok(false);
415 };
416 meta.pre_revert = Some(session.messages.clone());
417 session.messages = snapshot;
418 session.time.updated = Utc::now();
419 drop(metadata);
420 drop(sessions);
421 self.flush().await?;
422 Ok(true)
423 }
424
425 pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
426 let mut sessions = self.sessions.write().await;
427 let Some(session) = sessions.get_mut(id) else {
428 return Ok(false);
429 };
430 let mut metadata = self.metadata.write().await;
431 let Some(meta) = metadata.get_mut(id) else {
432 return Ok(false);
433 };
434 let Some(previous) = meta.pre_revert.take() else {
435 return Ok(false);
436 };
437 meta.snapshots.push(session.messages.clone());
438 session.messages = previous;
439 session.time.updated = Utc::now();
440 drop(metadata);
441 drop(sessions);
442 self.flush().await?;
443 Ok(true)
444 }
445
446 pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
447 let mut metadata = self.metadata.write().await;
448 let meta = metadata
449 .entry(id.to_string())
450 .or_insert_with(SessionMeta::default);
451 meta.shared = shared;
452 if shared {
453 if meta.share_id.is_none() {
454 meta.share_id = Some(Uuid::new_v4().to_string());
455 }
456 } else {
457 meta.share_id = None;
458 }
459 let share_id = meta.share_id.clone();
460 drop(metadata);
461 self.flush().await?;
462 Ok(share_id)
463 }
464
465 pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
466 let mut metadata = self.metadata.write().await;
467 let meta = metadata
468 .entry(id.to_string())
469 .or_insert_with(SessionMeta::default);
470 meta.archived = archived;
471 drop(metadata);
472 self.flush().await?;
473 Ok(true)
474 }
475
476 pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
477 let mut metadata = self.metadata.write().await;
478 let meta = metadata
479 .entry(id.to_string())
480 .or_insert_with(SessionMeta::default);
481 meta.summary = Some(summary);
482 drop(metadata);
483 self.flush().await?;
484 Ok(true)
485 }
486
487 pub async fn children(&self, parent_id: &str) -> Vec<Session> {
488 let child_ids = {
489 let metadata = self.metadata.read().await;
490 metadata
491 .iter()
492 .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
493 .map(|(id, _)| id.clone())
494 .collect::<Vec<_>>()
495 };
496 let sessions = self.sessions.read().await;
497 child_ids
498 .into_iter()
499 .filter_map(|id| sessions.get(&id).cloned())
500 .collect()
501 }
502
503 pub async fn session_status(&self, id: &str) -> Option<Value> {
504 let metadata = self.metadata.read().await;
505 metadata.get(id).map(|meta| {
506 json!({
507 "archived": meta.archived,
508 "shared": meta.shared,
509 "parentID": meta.parent_id,
510 "snapshotCount": meta.snapshots.len()
511 })
512 })
513 }
514
515 pub async fn session_diff(&self, id: &str) -> Option<Value> {
516 let sessions = self.sessions.read().await;
517 let current = sessions.get(id)?;
518 let metadata = self.metadata.read().await;
519 let default = SessionMeta::default();
520 let meta = metadata.get(id).unwrap_or(&default);
521 let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
522 Some(json!({
523 "sessionID": id,
524 "currentMessageCount": current.messages.len(),
525 "lastSnapshotMessageCount": last_snapshot_len,
526 "delta": current.messages.len() as i64 - last_snapshot_len as i64
527 }))
528 }
529
530 pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
531 let mut metadata = self.metadata.write().await;
532 let meta = metadata
533 .entry(id.to_string())
534 .or_insert_with(SessionMeta::default);
535 meta.todos = normalize_todo_items(todos);
536 drop(metadata);
537 self.flush().await
538 }
539
540 pub async fn get_todos(&self, id: &str) -> Vec<Value> {
541 let todos = self
542 .metadata
543 .read()
544 .await
545 .get(id)
546 .map(|meta| meta.todos.clone())
547 .unwrap_or_default();
548 normalize_todo_items(todos)
549 }
550
551 pub async fn add_question_request(
552 &self,
553 session_id: &str,
554 message_id: &str,
555 questions: Vec<Value>,
556 ) -> anyhow::Result<QuestionRequest> {
557 if questions.is_empty() {
558 return Err(anyhow::anyhow!(
559 "cannot add empty question request for session {}",
560 session_id
561 ));
562 }
563 let request = QuestionRequest {
564 id: format!("q-{}", Uuid::new_v4()),
565 session_id: session_id.to_string(),
566 questions,
567 tool: Some(QuestionToolRef {
568 call_id: format!("call-{}", Uuid::new_v4()),
569 message_id: message_id.to_string(),
570 }),
571 };
572 self.question_requests
573 .write()
574 .await
575 .insert(request.id.clone(), request.clone());
576 self.flush().await?;
577 Ok(request)
578 }
579
580 pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
581 self.question_requests
582 .read()
583 .await
584 .values()
585 .cloned()
586 .collect()
587 }
588
589 pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
590 let removed = self
591 .question_requests
592 .write()
593 .await
594 .remove(request_id)
595 .is_some();
596 if removed {
597 self.flush().await?;
598 }
599 Ok(removed)
600 }
601
602 pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
603 self.reply_question(request_id).await
604 }
605
606 pub async fn attach_session_to_workspace(
607 &self,
608 session_id: &str,
609 target_workspace: &str,
610 reason_tag: &str,
611 ) -> anyhow::Result<Option<Session>> {
612 let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
613 return Ok(None);
614 };
615 let mut sessions = self.sessions.write().await;
616 let Some(session) = sessions.get_mut(session_id) else {
617 return Ok(None);
618 };
619 let previous_workspace = session
620 .workspace_root
621 .clone()
622 .or_else(|| normalize_workspace_path(&session.directory));
623
624 if session.origin_workspace_root.is_none() {
625 session.origin_workspace_root = previous_workspace.clone();
626 }
627 session.attached_from_workspace = previous_workspace;
628 session.attached_to_workspace = Some(target_workspace.clone());
629 session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
630 session.attach_reason = Some(reason_tag.trim().to_string());
631 session.workspace_root = Some(target_workspace.clone());
632 session.directory = target_workspace;
633 session.time.updated = Utc::now();
634 let updated = session.clone();
635 drop(sessions);
636 self.flush().await?;
637 Ok(Some(updated))
638 }
639
640 async fn flush(&self) -> anyhow::Result<()> {
641 let snapshot = self.sessions.read().await.clone();
642 let payload = serde_json::to_string_pretty(&snapshot)?;
643 fs::write(self.base.join("sessions.json"), payload).await?;
644 let metadata_snapshot = self.metadata.read().await.clone();
645 let metadata_payload = serde_json::to_string_pretty(&metadata_snapshot)?;
646 fs::write(self.base.join("session_meta.json"), metadata_payload).await?;
647 let questions_snapshot = self.question_requests.read().await.clone();
648 let questions_payload = serde_json::to_string_pretty(&questions_snapshot)?;
649 fs::write(self.base.join("questions.json"), questions_payload).await?;
650 Ok(())
651 }
652
653 async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
654 let payload = serde_json::to_string_pretty(marker)?;
655 fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
656 Ok(())
657 }
658}
659
660fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
661 items
662 .into_iter()
663 .filter_map(|item| {
664 let obj = item.as_object()?;
665 let content = obj
666 .get("content")
667 .and_then(|v| v.as_str())
668 .or_else(|| obj.get("text").and_then(|v| v.as_str()))
669 .unwrap_or("")
670 .trim()
671 .to_string();
672 if content.is_empty() {
673 return None;
674 }
675 let id = obj
676 .get("id")
677 .and_then(|v| v.as_str())
678 .filter(|s| !s.trim().is_empty())
679 .map(ToString::to_string)
680 .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
681 let status = obj
682 .get("status")
683 .and_then(|v| v.as_str())
684 .filter(|s| !s.trim().is_empty())
685 .map(ToString::to_string)
686 .unwrap_or_else(|| "pending".to_string());
687 Some(json!({
688 "id": id,
689 "content": content,
690 "status": status
691 }))
692 })
693 .collect()
694}
695
696#[derive(Debug)]
697struct LegacyScanResult {
698 sessions: HashMap<String, Session>,
699 legacy_counts: LegacyTreeCounts,
700 imported_counts: LegacyImportedCounts,
701}
702
703#[derive(Debug, Default)]
704struct LegacyMergeStats {
705 changed: bool,
706 sessions_merged: u64,
707 messages_recovered: u64,
708 parts_recovered: u64,
709}
710
711fn now_ms_u64() -> u64 {
712 Utc::now().timestamp_millis().max(0) as u64
713}
714
715async fn should_run_legacy_scan_on_startup(marker_path: &Path, sessions_exist: bool) -> bool {
716 if !sessions_exist {
717 return true;
718 }
719 if read_legacy_import_marker(marker_path).await.is_none() {
722 return false;
723 }
724 false
725}
726
727async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
728 let raw = fs::read_to_string(marker_path).await.ok()?;
729 serde_json::from_str::<LegacyImportMarker>(&raw).ok()
730}
731
732fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
733 let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
734 let imported_counts = LegacyImportedCounts {
735 sessions: sessions.len() as u64,
736 messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
737 parts: sessions
738 .values()
739 .flat_map(|s| s.messages.iter())
740 .map(|m| m.parts.len() as u64)
741 .sum(),
742 };
743 let legacy_counts = LegacyTreeCounts {
744 session_files: count_legacy_json_files(&base.join("session")),
745 message_files: count_legacy_json_files(&base.join("message")),
746 part_files: count_legacy_json_files(&base.join("part")),
747 };
748 Ok(LegacyScanResult {
749 sessions,
750 legacy_counts,
751 imported_counts,
752 })
753}
754
755fn count_legacy_json_files(root: &Path) -> u64 {
756 if !root.is_dir() {
757 return 0;
758 }
759 let mut count = 0u64;
760 let mut stack = vec![root.to_path_buf()];
761 while let Some(dir) = stack.pop() {
762 if let Ok(entries) = std::fs::read_dir(&dir) {
763 for entry in entries.flatten() {
764 let path = entry.path();
765 if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
766 stack.push(path);
767 continue;
768 }
769 if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
770 count += 1;
771 }
772 }
773 }
774 }
775 count
776}
777
778fn merge_legacy_sessions(
779 current: &mut HashMap<String, Session>,
780 imported: HashMap<String, Session>,
781) -> bool {
782 merge_legacy_sessions_with_stats(current, imported).changed
783}
784
785fn merge_legacy_sessions_with_stats(
786 current: &mut HashMap<String, Session>,
787 imported: HashMap<String, Session>,
788) -> LegacyMergeStats {
789 let mut stats = LegacyMergeStats::default();
790 for (id, legacy) in imported {
791 let legacy_message_count = legacy.messages.len() as u64;
792 let legacy_part_count = legacy
793 .messages
794 .iter()
795 .map(|m| m.parts.len() as u64)
796 .sum::<u64>();
797 match current.get_mut(&id) {
798 None => {
799 current.insert(id, legacy);
800 stats.changed = true;
801 stats.sessions_merged += 1;
802 stats.messages_recovered += legacy_message_count;
803 stats.parts_recovered += legacy_part_count;
804 }
805 Some(existing) => {
806 let should_merge_messages =
807 existing.messages.is_empty() && !legacy.messages.is_empty();
808 let should_fill_title =
809 existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
810 let should_fill_directory = (existing.directory.trim().is_empty()
811 || existing.directory.trim() == "."
812 || existing.directory.trim() == "./"
813 || existing.directory.trim() == ".\\")
814 && !legacy.directory.trim().is_empty();
815 let should_fill_workspace =
816 existing.workspace_root.is_none() && legacy.workspace_root.is_some();
817 if should_merge_messages {
818 existing.messages = legacy.messages.clone();
819 }
820 if should_fill_title {
821 existing.title = legacy.title.clone();
822 }
823 if should_fill_directory {
824 existing.directory = legacy.directory.clone();
825 }
826 if should_fill_workspace {
827 existing.workspace_root = legacy.workspace_root.clone();
828 }
829 if should_merge_messages
830 || should_fill_title
831 || should_fill_directory
832 || should_fill_workspace
833 {
834 stats.changed = true;
835 if should_merge_messages {
836 stats.sessions_merged += 1;
837 stats.messages_recovered += legacy_message_count;
838 stats.parts_recovered += legacy_part_count;
839 }
840 }
841 }
842 }
843 }
844 stats
845}
846
847fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
848 let mut changed = false;
849 for session in sessions.values_mut() {
850 if session.workspace_root.is_none() {
851 let normalized = normalize_workspace_path(&session.directory);
852 if normalized.is_some() {
853 session.workspace_root = normalized;
854 changed = true;
855 }
856 }
857 }
858 changed
859}
860
861fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
862 let mut changed = false;
863 for session in sessions.values_mut() {
864 if !title_needs_repair(&session.title) {
865 continue;
866 }
867 let first_user_text = session.messages.iter().find_map(|message| {
868 if !matches!(message.role, MessageRole::User) {
869 return None;
870 }
871 message.parts.iter().find_map(|part| match part {
872 MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
873 _ => None,
874 })
875 });
876 let Some(source) = first_user_text else {
877 continue;
878 };
879 let Some(derived) = derive_session_title_from_prompt(source, 60) else {
880 continue;
881 };
882 if derived == session.title {
883 continue;
884 }
885 session.title = derived;
886 session.time.updated = Utc::now();
887 changed = true;
888 }
889 changed
890}
891
892#[derive(Debug, Deserialize)]
893struct LegacySessionTime {
894 created: i64,
895 updated: i64,
896}
897
898#[derive(Debug, Deserialize)]
899struct LegacySession {
900 id: String,
901 slug: Option<String>,
902 version: Option<String>,
903 #[serde(rename = "projectID")]
904 project_id: Option<String>,
905 title: Option<String>,
906 directory: Option<String>,
907 time: LegacySessionTime,
908}
909
910fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
911 let legacy_root = base.join("session");
912 if !legacy_root.is_dir() {
913 return Ok(HashMap::new());
914 }
915
916 let mut out = HashMap::new();
917 let mut stack = vec![legacy_root];
918 while let Some(dir) = stack.pop() {
919 for entry in std::fs::read_dir(&dir)? {
920 let entry = entry?;
921 let path = entry.path();
922 if entry.file_type()?.is_dir() {
923 stack.push(path);
924 continue;
925 }
926 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
927 continue;
928 }
929 let raw = match std::fs::read_to_string(&path) {
930 Ok(v) => v,
931 Err(_) => continue,
932 };
933 let legacy = match serde_json::from_str::<LegacySession>(&raw) {
934 Ok(v) => v,
935 Err(_) => continue,
936 };
937 let created = Utc
938 .timestamp_millis_opt(legacy.time.created)
939 .single()
940 .unwrap_or_else(Utc::now);
941 let updated = Utc
942 .timestamp_millis_opt(legacy.time.updated)
943 .single()
944 .unwrap_or(created);
945
946 let session_id = legacy.id.clone();
947 out.insert(
948 session_id.clone(),
949 Session {
950 id: session_id.clone(),
951 slug: legacy.slug,
952 version: legacy.version,
953 project_id: legacy.project_id,
954 title: legacy
955 .title
956 .filter(|s| !s.trim().is_empty())
957 .unwrap_or_else(|| "New session".to_string()),
958 directory: legacy
959 .directory
960 .clone()
961 .filter(|s| !s.trim().is_empty())
962 .unwrap_or_else(|| ".".to_string()),
963 workspace_root: legacy
964 .directory
965 .as_deref()
966 .and_then(normalize_workspace_path),
967 origin_workspace_root: None,
968 attached_from_workspace: None,
969 attached_to_workspace: None,
970 attach_timestamp_ms: None,
971 attach_reason: None,
972 time: tandem_types::SessionTime { created, updated },
973 model: None,
974 provider: None,
975 environment: None,
976 messages: load_legacy_session_messages(base, &session_id),
977 },
978 );
979 }
980 }
981 Ok(out)
982}
983
984#[derive(Debug, Deserialize)]
985struct LegacyMessageTime {
986 created: i64,
987}
988
989#[derive(Debug, Deserialize)]
990struct LegacyMessage {
991 id: String,
992 role: String,
993 time: LegacyMessageTime,
994}
995
996#[derive(Debug, Deserialize)]
997struct LegacyPart {
998 #[serde(rename = "type")]
999 part_type: Option<String>,
1000 text: Option<String>,
1001 tool: Option<String>,
1002 args: Option<Value>,
1003 result: Option<Value>,
1004 error: Option<String>,
1005}
1006
1007fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
1008 let msg_dir = base.join("message").join(session_id);
1009 if !msg_dir.is_dir() {
1010 return Vec::new();
1011 }
1012
1013 let mut legacy_messages: Vec<(i64, Message)> = Vec::new();
1014
1015 let Ok(entries) = std::fs::read_dir(&msg_dir) else {
1016 return Vec::new();
1017 };
1018
1019 for entry in entries.flatten() {
1020 let path = entry.path();
1021 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1022 continue;
1023 }
1024 let Ok(raw) = std::fs::read_to_string(&path) else {
1025 continue;
1026 };
1027 let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
1028 continue;
1029 };
1030
1031 let created_at = Utc
1032 .timestamp_millis_opt(legacy.time.created)
1033 .single()
1034 .unwrap_or_else(Utc::now);
1035
1036 legacy_messages.push((
1037 legacy.time.created,
1038 Message {
1039 id: legacy.id.clone(),
1040 role: legacy_role_to_message_role(&legacy.role),
1041 parts: load_legacy_message_parts(base, &legacy.id),
1042 created_at,
1043 },
1044 ));
1045 }
1046
1047 legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
1048 legacy_messages.into_iter().map(|(_, msg)| msg).collect()
1049}
1050
1051fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
1052 let parts_dir = base.join("part").join(message_id);
1053 if !parts_dir.is_dir() {
1054 return Vec::new();
1055 }
1056
1057 let Ok(entries) = std::fs::read_dir(&parts_dir) else {
1058 return Vec::new();
1059 };
1060
1061 let mut out = Vec::new();
1062 for entry in entries.flatten() {
1063 let path = entry.path();
1064 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1065 continue;
1066 }
1067 let Ok(raw) = std::fs::read_to_string(&path) else {
1068 continue;
1069 };
1070 let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
1071 continue;
1072 };
1073
1074 let mapped = if let Some(tool) = part.tool {
1075 Some(MessagePart::ToolInvocation {
1076 tool,
1077 args: part.args.unwrap_or_else(|| json!({})),
1078 result: part.result,
1079 error: part.error,
1080 })
1081 } else {
1082 match part.part_type.as_deref() {
1083 Some("reasoning") => Some(MessagePart::Reasoning {
1084 text: part.text.unwrap_or_default(),
1085 }),
1086 Some("tool") => Some(MessagePart::ToolInvocation {
1087 tool: "tool".to_string(),
1088 args: part.args.unwrap_or_else(|| json!({})),
1089 result: part.result,
1090 error: part.error,
1091 }),
1092 Some("text") | None => Some(MessagePart::Text {
1093 text: part.text.unwrap_or_default(),
1094 }),
1095 _ => None,
1096 }
1097 };
1098
1099 if let Some(part) = mapped {
1100 out.push(part);
1101 }
1102 }
1103 out
1104}
1105
1106fn legacy_role_to_message_role(role: &str) -> MessageRole {
1107 match role.to_lowercase().as_str() {
1108 "user" => MessageRole::User,
1109 "assistant" => MessageRole::Assistant,
1110 "system" => MessageRole::System,
1111 "tool" => MessageRole::Tool,
1112 _ => MessageRole::Assistant,
1113 }
1114}
1115
1116#[derive(Debug, Clone, Default)]
1117struct MessageMergeStats {
1118 messages_recovered: u64,
1119 parts_recovered: u64,
1120 conflicts_merged: u64,
1121}
1122
1123fn message_richness(msg: &Message) -> usize {
1124 msg.parts
1125 .iter()
1126 .map(|p| match p {
1127 MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1128 if text.trim().is_empty() {
1129 0
1130 } else {
1131 1
1132 }
1133 }
1134 MessagePart::ToolInvocation { result, error, .. } => {
1135 if result.is_some() || error.is_some() {
1136 2
1137 } else {
1138 1
1139 }
1140 }
1141 })
1142 .sum()
1143}
1144
1145fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1146 messages.iter().map(|m| m.created_at).max()
1147}
1148
1149fn merge_session_messages(
1150 existing: &[Message],
1151 imported: &[Message],
1152) -> (Vec<Message>, MessageMergeStats, bool) {
1153 if existing.is_empty() {
1154 let messages_recovered = imported.len() as u64;
1155 let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1156 return (
1157 imported.to_vec(),
1158 MessageMergeStats {
1159 messages_recovered,
1160 parts_recovered,
1161 conflicts_merged: 0,
1162 },
1163 true,
1164 );
1165 }
1166
1167 let mut merged_by_id: HashMap<String, Message> = existing
1168 .iter()
1169 .cloned()
1170 .map(|m| (m.id.clone(), m))
1171 .collect();
1172 let mut stats = MessageMergeStats::default();
1173 let mut changed = false;
1174
1175 for incoming in imported {
1176 match merged_by_id.get(&incoming.id) {
1177 None => {
1178 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1179 stats.messages_recovered += 1;
1180 stats.parts_recovered += incoming.parts.len() as u64;
1181 changed = true;
1182 }
1183 Some(current) => {
1184 let incoming_richer = message_richness(incoming) > message_richness(current)
1185 || incoming.parts.len() > current.parts.len();
1186 if incoming_richer {
1187 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1188 stats.conflicts_merged += 1;
1189 changed = true;
1190 }
1191 }
1192 }
1193 }
1194
1195 let mut out: Vec<Message> = merged_by_id.into_values().collect();
1196 out.sort_by_key(|m| m.created_at);
1197 (out, stats, changed)
1198}
1199
1200#[cfg(test)]
1201mod tests {
1202 use super::*;
1203 use std::fs as stdfs;
1204
1205 #[tokio::test]
1206 async fn todos_are_normalized_to_wire_shape() {
1207 let base = std::env::temp_dir().join(format!("tandem-core-test-{}", Uuid::new_v4()));
1208 let storage = Storage::new(&base).await.expect("storage");
1209 let session = Session::new(Some("test".to_string()), Some(".".to_string()));
1210 let id = session.id.clone();
1211 storage.save_session(session).await.expect("save session");
1212
1213 storage
1214 .set_todos(
1215 &id,
1216 vec![
1217 json!({"content":"first"}),
1218 json!({"text":"second", "status":"in_progress"}),
1219 json!({"id":"keep-id","content":"third","status":"completed"}),
1220 ],
1221 )
1222 .await
1223 .expect("set todos");
1224
1225 let todos = storage.get_todos(&id).await;
1226 assert_eq!(todos.len(), 3);
1227 for todo in todos {
1228 assert!(todo.get("id").and_then(|v| v.as_str()).is_some());
1229 assert!(todo.get("content").and_then(|v| v.as_str()).is_some());
1230 assert!(todo.get("status").and_then(|v| v.as_str()).is_some());
1231 }
1232 }
1233
1234 #[tokio::test]
1235 async fn imports_legacy_opencode_session_index_when_sessions_json_missing() {
1236 let base =
1237 std::env::temp_dir().join(format!("tandem-core-legacy-import-{}", Uuid::new_v4()));
1238 let legacy_session_dir = base.join("session").join("global");
1239 stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1240 stdfs::write(
1241 legacy_session_dir.join("ses_test.json"),
1242 r#"{
1243 "id": "ses_test",
1244 "slug": "test",
1245 "version": "1.0.0",
1246 "projectID": "proj_1",
1247 "directory": "C:\\work\\demo",
1248 "title": "Legacy Session",
1249 "time": { "created": 1770913145613, "updated": 1770913146613 }
1250}"#,
1251 )
1252 .expect("legacy session write");
1253
1254 let storage = Storage::new(&base).await.expect("storage");
1255 let sessions = storage.list_sessions().await;
1256 assert_eq!(sessions.len(), 1);
1257 assert_eq!(sessions[0].id, "ses_test");
1258 assert_eq!(sessions[0].title, "Legacy Session");
1259 assert!(base.join("sessions.json").exists());
1260 }
1261
1262 #[tokio::test]
1263 async fn imports_legacy_messages_and_parts_for_session() {
1264 let base = std::env::temp_dir().join(format!("tandem-core-legacy-msg-{}", Uuid::new_v4()));
1265 let session_dir = base.join("session").join("global");
1266 let message_dir = base.join("message").join("ses_test");
1267 let part_dir = base.join("part").join("msg_1");
1268 stdfs::create_dir_all(&session_dir).expect("session dir");
1269 stdfs::create_dir_all(&message_dir).expect("message dir");
1270 stdfs::create_dir_all(&part_dir).expect("part dir");
1271
1272 stdfs::write(
1273 session_dir.join("ses_test.json"),
1274 r#"{
1275 "id": "ses_test",
1276 "projectID": "proj_1",
1277 "directory": "C:\\work\\demo",
1278 "title": "Legacy Session",
1279 "time": { "created": 1770913145613, "updated": 1770913146613 }
1280}"#,
1281 )
1282 .expect("write session");
1283
1284 stdfs::write(
1285 message_dir.join("msg_1.json"),
1286 r#"{
1287 "id": "msg_1",
1288 "sessionID": "ses_test",
1289 "role": "assistant",
1290 "time": { "created": 1770913145613 }
1291}"#,
1292 )
1293 .expect("write msg");
1294
1295 stdfs::write(
1296 part_dir.join("prt_1.json"),
1297 r#"{
1298 "id": "prt_1",
1299 "sessionID": "ses_test",
1300 "messageID": "msg_1",
1301 "type": "text",
1302 "text": "hello from legacy"
1303}"#,
1304 )
1305 .expect("write part");
1306
1307 let storage = Storage::new(&base).await.expect("storage");
1308 let sessions = storage.list_sessions().await;
1309 assert_eq!(sessions.len(), 1);
1310 assert_eq!(sessions[0].messages.len(), 1);
1311 assert_eq!(sessions[0].messages[0].parts.len(), 1);
1312 }
1313
1314 #[tokio::test]
1315 async fn skips_legacy_merge_when_sessions_json_exists() {
1316 let base =
1317 std::env::temp_dir().join(format!("tandem-core-legacy-merge-{}", Uuid::new_v4()));
1318 stdfs::create_dir_all(&base).expect("base");
1319 stdfs::write(
1320 base.join("sessions.json"),
1321 r#"{
1322 "ses_current": {
1323 "id": "ses_current",
1324 "slug": null,
1325 "version": "v1",
1326 "project_id": null,
1327 "title": "Current Session",
1328 "directory": ".",
1329 "workspace_root": null,
1330 "origin_workspace_root": null,
1331 "attached_from_workspace": null,
1332 "attached_to_workspace": null,
1333 "attach_timestamp_ms": null,
1334 "attach_reason": null,
1335 "time": {"created":"2026-01-01T00:00:00Z","updated":"2026-01-01T00:00:00Z"},
1336 "model": null,
1337 "provider": null,
1338 "messages": []
1339 }
1340}"#,
1341 )
1342 .expect("sessions.json");
1343
1344 let legacy_session_dir = base.join("session").join("global");
1345 stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1346 stdfs::write(
1347 legacy_session_dir.join("ses_legacy.json"),
1348 r#"{
1349 "id": "ses_legacy",
1350 "slug": "legacy",
1351 "version": "1.0.0",
1352 "projectID": "proj_legacy",
1353 "directory": "C:\\work\\legacy",
1354 "title": "Legacy Session",
1355 "time": { "created": 1770913145613, "updated": 1770913146613 }
1356}"#,
1357 )
1358 .expect("legacy session write");
1359
1360 let storage = Storage::new(&base).await.expect("storage");
1361 let sessions = storage.list_sessions().await;
1362 let ids = sessions.iter().map(|s| s.id.clone()).collect::<Vec<_>>();
1363 assert!(ids.contains(&"ses_current".to_string()));
1364 assert!(!ids.contains(&"ses_legacy".to_string()));
1365 }
1366
1367 #[tokio::test]
1368 async fn list_sessions_scoped_filters_by_workspace_root() {
1369 let base = std::env::temp_dir().join(format!("tandem-core-scope-{}", Uuid::new_v4()));
1370 let storage = Storage::new(&base).await.expect("storage");
1371 let ws_a = base.join("ws-a");
1372 let ws_b = base.join("ws-b");
1373 stdfs::create_dir_all(&ws_a).expect("ws_a");
1374 stdfs::create_dir_all(&ws_b).expect("ws_b");
1375 let ws_a_str = ws_a.to_string_lossy().to_string();
1376 let ws_b_str = ws_b.to_string_lossy().to_string();
1377
1378 let mut a = Session::new(Some("a".to_string()), Some(ws_a_str.clone()));
1379 a.workspace_root = Some(ws_a_str.clone());
1380 storage.save_session(a).await.expect("save a");
1381
1382 let mut b = Session::new(Some("b".to_string()), Some(ws_b_str.clone()));
1383 b.workspace_root = Some(ws_b_str);
1384 storage.save_session(b).await.expect("save b");
1385
1386 let scoped = storage
1387 .list_sessions_scoped(SessionListScope::Workspace {
1388 workspace_root: ws_a_str,
1389 })
1390 .await;
1391 assert_eq!(scoped.len(), 1);
1392 assert_eq!(scoped[0].title, "a");
1393 }
1394
1395 #[tokio::test]
1396 async fn attach_session_persists_audit_metadata() {
1397 let base = std::env::temp_dir().join(format!("tandem-core-attach-{}", Uuid::new_v4()));
1398 let storage = Storage::new(&base).await.expect("storage");
1399 let ws_a = base.join("ws-a");
1400 let ws_b = base.join("ws-b");
1401 stdfs::create_dir_all(&ws_a).expect("ws_a");
1402 stdfs::create_dir_all(&ws_b).expect("ws_b");
1403 let ws_a_str = ws_a.to_string_lossy().to_string();
1404 let ws_b_str = ws_b.to_string_lossy().to_string();
1405 let mut session = Session::new(Some("s".to_string()), Some(ws_a_str.clone()));
1406 session.workspace_root = Some(ws_a_str);
1407 let id = session.id.clone();
1408 storage.save_session(session).await.expect("save");
1409
1410 let updated = storage
1411 .attach_session_to_workspace(&id, &ws_b_str, "manual")
1412 .await
1413 .expect("attach")
1414 .expect("session exists");
1415 let normalized_expected = normalize_workspace_path(&ws_b_str).expect("normalized path");
1416 assert_eq!(
1417 updated.workspace_root.as_deref(),
1418 Some(normalized_expected.as_str())
1419 );
1420 assert_eq!(
1421 updated.attached_to_workspace.as_deref(),
1422 Some(normalized_expected.as_str())
1423 );
1424 assert_eq!(updated.attach_reason.as_deref(), Some("manual"));
1425 assert!(updated.attach_timestamp_ms.is_some());
1426 }
1427
1428 #[tokio::test]
1429 async fn startup_repairs_placeholder_titles_from_wrapped_user_messages() {
1430 let base =
1431 std::env::temp_dir().join(format!("tandem-core-title-repair-{}", Uuid::new_v4()));
1432 let storage = Storage::new(&base).await.expect("storage");
1433 let wrapped = "<memory_context>\n<current_session>\n- fact\n</current_session>\n</memory_context>\n\n[Mode instructions]\nUse tools.\n\n[User request]\nExplain this bug";
1434 let mut session = Session::new(Some("<memory_context>".to_string()), Some(".".to_string()));
1435 let id = session.id.clone();
1436 session.messages.push(Message::new(
1437 MessageRole::User,
1438 vec![MessagePart::Text {
1439 text: wrapped.to_string(),
1440 }],
1441 ));
1442 storage.save_session(session).await.expect("save");
1443 drop(storage);
1444
1445 let storage = Storage::new(&base).await.expect("storage");
1446 let repaired = storage.get_session(&id).await.expect("session");
1447 assert_eq!(repaired.title, "Explain this bug");
1448 }
1449}