1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use anyhow::Context;
5use chrono::{TimeZone, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use tokio::fs;
9use tokio::sync::RwLock;
10use tokio::task;
11use uuid::Uuid;
12
13use tandem_types::{Message, MessagePart, MessageRole, Session};
14
15use crate::{derive_session_title_from_prompt, normalize_workspace_path, title_needs_repair};
16
17#[derive(Debug, Clone, Serialize, Deserialize, Default)]
18pub struct SessionMeta {
19 pub parent_id: Option<String>,
20 #[serde(default)]
21 pub archived: bool,
22 #[serde(default)]
23 pub shared: bool,
24 pub share_id: Option<String>,
25 pub summary: Option<String>,
26 #[serde(default)]
27 pub snapshots: Vec<Vec<Message>>,
28 pub pre_revert: Option<Vec<Message>>,
29 #[serde(default)]
30 pub todos: Vec<Value>,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct QuestionToolRef {
35 #[serde(rename = "callID")]
36 pub call_id: String,
37 #[serde(rename = "messageID")]
38 pub message_id: String,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct QuestionRequest {
43 pub id: String,
44 #[serde(rename = "sessionID")]
45 pub session_id: String,
46 #[serde(default)]
47 pub questions: Vec<Value>,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 pub tool: Option<QuestionToolRef>,
50}
51
52pub struct Storage {
53 base: PathBuf,
54 sessions: RwLock<HashMap<String, Session>>,
55 metadata: RwLock<HashMap<String, SessionMeta>>,
56 question_requests: RwLock<HashMap<String, QuestionRequest>>,
57}
58
59#[derive(Debug, Clone)]
60pub enum SessionListScope {
61 Global,
62 Workspace { workspace_root: String },
63}
64
65#[derive(Debug, Clone, Default, Serialize, Deserialize)]
66pub struct SessionRepairStats {
67 pub sessions_repaired: u64,
68 pub messages_recovered: u64,
69 pub parts_recovered: u64,
70 pub conflicts_merged: u64,
71}
72
73const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
74const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
75
76#[derive(Debug, Clone, Default, Serialize, Deserialize)]
77pub struct LegacyTreeCounts {
78 pub session_files: u64,
79 pub message_files: u64,
80 pub part_files: u64,
81}
82
83#[derive(Debug, Clone, Default, Serialize, Deserialize)]
84pub struct LegacyImportedCounts {
85 pub sessions: u64,
86 pub messages: u64,
87 pub parts: u64,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct LegacyImportMarker {
92 pub version: u32,
93 pub created_at_ms: u64,
94 pub last_checked_at_ms: u64,
95 pub legacy_counts: LegacyTreeCounts,
96 pub imported_counts: LegacyImportedCounts,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct LegacyRepairRunReport {
101 pub status: String,
102 pub marker_updated: bool,
103 pub sessions_merged: u64,
104 pub messages_recovered: u64,
105 pub parts_recovered: u64,
106 pub legacy_counts: LegacyTreeCounts,
107 pub imported_counts: LegacyImportedCounts,
108}
109
110fn snapshot_session_messages(
111 session_id: &str,
112 session: &Session,
113 metadata: &mut HashMap<String, SessionMeta>,
114) {
115 let meta = metadata
116 .entry(session_id.to_string())
117 .or_insert_with(SessionMeta::default);
118 meta.snapshots.push(session.messages.clone());
119 if meta.snapshots.len() > 25 {
120 let _ = meta.snapshots.remove(0);
121 }
122}
123
124fn merge_message_part(message: &mut Message, part: MessagePart) {
125 match part {
126 MessagePart::ToolInvocation {
127 tool,
128 args,
129 result,
130 error,
131 } => {
132 let args_are_empty =
133 args.is_null() || args.as_object().is_some_and(|value| value.is_empty());
134 if result.is_none() && error.is_none() {
135 if let Some(existing) = message.parts.iter_mut().rev().find(|existing| {
136 matches!(
137 existing,
138 MessagePart::ToolInvocation {
139 tool: existing_tool,
140 result: None,
141 error: None,
142 ..
143 } if existing_tool == &tool
144 )
145 }) {
146 if let MessagePart::ToolInvocation {
147 args: existing_args,
148 ..
149 } = existing
150 {
151 let existing_args_are_empty = existing_args.is_null()
152 || existing_args
153 .as_object()
154 .is_some_and(|value| value.is_empty());
155 if !args_are_empty || existing_args_are_empty {
156 *existing_args = args;
157 }
158 return;
159 }
160 }
161 }
162 if result.is_some() || error.is_some() {
163 if let Some(existing) = message.parts.iter_mut().rev().find(|existing| {
164 matches!(
165 existing,
166 MessagePart::ToolInvocation {
167 tool: existing_tool,
168 result: None,
169 error: None,
170 ..
171 } if existing_tool == &tool
172 )
173 }) {
174 if let MessagePart::ToolInvocation {
175 args: existing_args,
176 result: existing_result,
177 error: existing_error,
178 ..
179 } = existing
180 {
181 let existing_args_are_empty = existing_args.is_null()
182 || existing_args
183 .as_object()
184 .is_some_and(|value| value.is_empty());
185 if existing_args_are_empty {
186 if tool == "write" && args_are_empty {
187 tracing::info!(
188 tool = %tool,
189 "merging write result/error into existing tool part with empty args"
190 );
191 }
192 *existing_args = args.clone();
193 }
194 *existing_result = result;
195 *existing_error = error;
196 return;
197 }
198 }
199 }
200 message.parts.push(MessagePart::ToolInvocation {
201 tool,
202 args,
203 result,
204 error,
205 });
206 }
207 other => message.parts.push(other),
208 }
209}
210
211impl Storage {
212 pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
213 let base = base.as_ref().to_path_buf();
214 fs::create_dir_all(&base).await?;
215 let sessions_file = base.join("sessions.json");
216 let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
217 let sessions_file_exists = sessions_file.exists();
218 let mut imported_legacy_sessions = false;
219 let mut sessions = if sessions_file_exists {
220 let raw = fs::read_to_string(&sessions_file).await?;
221 serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
222 } else {
223 HashMap::new()
224 };
225
226 let mut marker_to_write = None;
227 if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
228 let base_for_scan = base.clone();
229 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
230 .await
231 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
232 if merge_legacy_sessions(&mut sessions, scan.sessions) {
233 imported_legacy_sessions = true;
234 }
235 marker_to_write = Some(LegacyImportMarker {
236 version: LEGACY_IMPORT_MARKER_VERSION,
237 created_at_ms: now_ms_u64(),
238 last_checked_at_ms: now_ms_u64(),
239 legacy_counts: scan.legacy_counts,
240 imported_counts: scan.imported_counts,
241 });
242 }
243
244 if hydrate_workspace_roots(&mut sessions) {
245 imported_legacy_sessions = true;
246 }
247 if repair_session_titles(&mut sessions) {
248 imported_legacy_sessions = true;
249 }
250 let metadata_file = base.join("session_meta.json");
251 let metadata = if metadata_file.exists() {
252 let raw = fs::read_to_string(&metadata_file).await?;
253 serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
254 } else {
255 HashMap::new()
256 };
257 let questions_file = base.join("questions.json");
258 let question_requests = if questions_file.exists() {
259 let raw = fs::read_to_string(&questions_file).await?;
260 serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
261 } else {
262 HashMap::new()
263 };
264 let storage = Self {
265 base,
266 sessions: RwLock::new(sessions),
267 metadata: RwLock::new(metadata),
268 question_requests: RwLock::new(question_requests),
269 };
270
271 if imported_legacy_sessions {
272 storage.flush().await?;
273 }
274 if let Some(marker) = marker_to_write {
275 storage.write_legacy_import_marker(&marker).await?;
276 }
277
278 Ok(storage)
279 }
280
281 pub async fn list_sessions(&self) -> Vec<Session> {
282 self.list_sessions_scoped(SessionListScope::Global).await
283 }
284
285 pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
286 let all = self
287 .sessions
288 .read()
289 .await
290 .values()
291 .cloned()
292 .collect::<Vec<_>>();
293 match scope {
294 SessionListScope::Global => all,
295 SessionListScope::Workspace { workspace_root } => {
296 let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
297 return Vec::new();
298 };
299 all.into_iter()
300 .filter(|session| {
301 let direct = session
302 .workspace_root
303 .as_ref()
304 .and_then(|p| normalize_workspace_path(p))
305 .map(|p| p == normalized_workspace)
306 .unwrap_or(false);
307 if direct {
308 return true;
309 }
310 normalize_workspace_path(&session.directory)
311 .map(|p| p == normalized_workspace)
312 .unwrap_or(false)
313 })
314 .collect()
315 }
316 }
317 }
318
319 pub async fn get_session(&self, id: &str) -> Option<Session> {
320 self.sessions.read().await.get(id).cloned()
321 }
322
323 pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
324 if session.workspace_root.is_none() {
325 session.workspace_root = normalize_workspace_path(&session.directory);
326 }
327 let session_id = session.id.clone();
328 self.sessions
329 .write()
330 .await
331 .insert(session_id.clone(), session);
332 self.metadata
333 .write()
334 .await
335 .entry(session_id)
336 .or_insert_with(SessionMeta::default);
337 self.flush().await
338 }
339
340 pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
341 let mut stats = SessionRepairStats::default();
342 let mut sessions = self.sessions.write().await;
343
344 for session in sessions.values_mut() {
345 let imported = load_legacy_session_messages(&self.base, &session.id);
346 if imported.is_empty() {
347 continue;
348 }
349
350 let (merged, merge_stats, changed) =
351 merge_session_messages(&session.messages, &imported);
352 if changed {
353 session.messages = merged;
354 session.time.updated =
355 most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
356 stats.sessions_repaired += 1;
357 stats.messages_recovered += merge_stats.messages_recovered;
358 stats.parts_recovered += merge_stats.parts_recovered;
359 stats.conflicts_merged += merge_stats.conflicts_merged;
360 }
361 }
362
363 if stats.sessions_repaired > 0 {
364 drop(sessions);
365 self.flush().await?;
366 }
367
368 Ok(stats)
369 }
370
371 pub async fn run_legacy_storage_repair_scan(
372 &self,
373 force: bool,
374 ) -> anyhow::Result<LegacyRepairRunReport> {
375 let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
376 let sessions_exists = self.base.join("sessions.json").exists();
377 let should_scan = if force {
378 true
379 } else {
380 should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
381 };
382 if !should_scan {
383 let marker = read_legacy_import_marker(&marker_path)
384 .await
385 .unwrap_or_else(|| LegacyImportMarker {
386 version: LEGACY_IMPORT_MARKER_VERSION,
387 created_at_ms: now_ms_u64(),
388 last_checked_at_ms: now_ms_u64(),
389 legacy_counts: LegacyTreeCounts::default(),
390 imported_counts: LegacyImportedCounts::default(),
391 });
392 return Ok(LegacyRepairRunReport {
393 status: "skipped".to_string(),
394 marker_updated: false,
395 sessions_merged: 0,
396 messages_recovered: 0,
397 parts_recovered: 0,
398 legacy_counts: marker.legacy_counts,
399 imported_counts: marker.imported_counts,
400 });
401 }
402
403 let base_for_scan = self.base.clone();
404 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
405 .await
406 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
407
408 let merge_stats = {
409 let mut sessions = self.sessions.write().await;
410 merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
411 };
412
413 if merge_stats.changed {
414 self.flush().await?;
415 }
416
417 let marker = LegacyImportMarker {
418 version: LEGACY_IMPORT_MARKER_VERSION,
419 created_at_ms: now_ms_u64(),
420 last_checked_at_ms: now_ms_u64(),
421 legacy_counts: scan.legacy_counts.clone(),
422 imported_counts: scan.imported_counts.clone(),
423 };
424 self.write_legacy_import_marker(&marker).await?;
425
426 Ok(LegacyRepairRunReport {
427 status: if merge_stats.changed {
428 "updated".to_string()
429 } else {
430 "no_changes".to_string()
431 },
432 marker_updated: true,
433 sessions_merged: merge_stats.sessions_merged,
434 messages_recovered: merge_stats.messages_recovered,
435 parts_recovered: merge_stats.parts_recovered,
436 legacy_counts: scan.legacy_counts,
437 imported_counts: scan.imported_counts,
438 })
439 }
440
441 pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
442 let removed = self.sessions.write().await.remove(id).is_some();
443 self.metadata.write().await.remove(id);
444 self.question_requests
445 .write()
446 .await
447 .retain(|_, request| request.session_id != id);
448 if removed {
449 self.flush().await?;
450 }
451 Ok(removed)
452 }
453
454 pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
455 let mut sessions = self.sessions.write().await;
456 let session = sessions
457 .get_mut(session_id)
458 .context("session not found for append_message")?;
459 let mut meta_guard = self.metadata.write().await;
460 snapshot_session_messages(session_id, session, &mut meta_guard);
461 session.messages.push(msg);
462 session.time.updated = Utc::now();
463 drop(sessions);
464 drop(meta_guard);
465 self.flush().await
466 }
467
468 pub async fn append_message_part(
469 &self,
470 session_id: &str,
471 message_id: &str,
472 part: MessagePart,
473 ) -> anyhow::Result<()> {
474 let mut sessions = self.sessions.write().await;
475 let session = sessions
476 .get_mut(session_id)
477 .context("session not found for append_message_part")?;
478 let mut meta_guard = self.metadata.write().await;
479 snapshot_session_messages(session_id, session, &mut meta_guard);
480 let message = if let Some(message) = session
481 .messages
482 .iter_mut()
483 .find(|message| message.id == message_id)
484 {
485 message
486 } else {
487 session
488 .messages
489 .iter_mut()
490 .rev()
491 .find(|message| matches!(message.role, MessageRole::User))
492 .context("message not found for append_message_part")?
493 };
494 merge_message_part(message, part);
495 session.time.updated = Utc::now();
496 drop(sessions);
497 drop(meta_guard);
498 self.flush().await
499 }
500
501 pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
502 let source = {
503 let sessions = self.sessions.read().await;
504 sessions.get(id).cloned()
505 };
506 let Some(mut child) = source else {
507 return Ok(None);
508 };
509
510 child.id = Uuid::new_v4().to_string();
511 child.title = format!("{} (fork)", child.title);
512 child.time.created = Utc::now();
513 child.time.updated = child.time.created;
514 child.slug = None;
515
516 self.sessions
517 .write()
518 .await
519 .insert(child.id.clone(), child.clone());
520 self.metadata.write().await.insert(
521 child.id.clone(),
522 SessionMeta {
523 parent_id: Some(id.to_string()),
524 snapshots: vec![child.messages.clone()],
525 ..SessionMeta::default()
526 },
527 );
528 self.flush().await?;
529 Ok(Some(child))
530 }
531
532 pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
533 let mut sessions = self.sessions.write().await;
534 let Some(session) = sessions.get_mut(id) else {
535 return Ok(false);
536 };
537 let mut metadata = self.metadata.write().await;
538 let meta = metadata
539 .entry(id.to_string())
540 .or_insert_with(SessionMeta::default);
541 let Some(snapshot) = meta.snapshots.pop() else {
542 return Ok(false);
543 };
544 meta.pre_revert = Some(session.messages.clone());
545 session.messages = snapshot;
546 session.time.updated = Utc::now();
547 drop(metadata);
548 drop(sessions);
549 self.flush().await?;
550 Ok(true)
551 }
552
553 pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
554 let mut sessions = self.sessions.write().await;
555 let Some(session) = sessions.get_mut(id) else {
556 return Ok(false);
557 };
558 let mut metadata = self.metadata.write().await;
559 let Some(meta) = metadata.get_mut(id) else {
560 return Ok(false);
561 };
562 let Some(previous) = meta.pre_revert.take() else {
563 return Ok(false);
564 };
565 meta.snapshots.push(session.messages.clone());
566 session.messages = previous;
567 session.time.updated = Utc::now();
568 drop(metadata);
569 drop(sessions);
570 self.flush().await?;
571 Ok(true)
572 }
573
574 pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
575 let mut metadata = self.metadata.write().await;
576 let meta = metadata
577 .entry(id.to_string())
578 .or_insert_with(SessionMeta::default);
579 meta.shared = shared;
580 if shared {
581 if meta.share_id.is_none() {
582 meta.share_id = Some(Uuid::new_v4().to_string());
583 }
584 } else {
585 meta.share_id = None;
586 }
587 let share_id = meta.share_id.clone();
588 drop(metadata);
589 self.flush().await?;
590 Ok(share_id)
591 }
592
593 pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
594 let mut metadata = self.metadata.write().await;
595 let meta = metadata
596 .entry(id.to_string())
597 .or_insert_with(SessionMeta::default);
598 meta.archived = archived;
599 drop(metadata);
600 self.flush().await?;
601 Ok(true)
602 }
603
604 pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
605 let mut metadata = self.metadata.write().await;
606 let meta = metadata
607 .entry(id.to_string())
608 .or_insert_with(SessionMeta::default);
609 meta.summary = Some(summary);
610 drop(metadata);
611 self.flush().await?;
612 Ok(true)
613 }
614
615 pub async fn children(&self, parent_id: &str) -> Vec<Session> {
616 let child_ids = {
617 let metadata = self.metadata.read().await;
618 metadata
619 .iter()
620 .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
621 .map(|(id, _)| id.clone())
622 .collect::<Vec<_>>()
623 };
624 let sessions = self.sessions.read().await;
625 child_ids
626 .into_iter()
627 .filter_map(|id| sessions.get(&id).cloned())
628 .collect()
629 }
630
631 pub async fn session_status(&self, id: &str) -> Option<Value> {
632 let metadata = self.metadata.read().await;
633 metadata.get(id).map(|meta| {
634 json!({
635 "archived": meta.archived,
636 "shared": meta.shared,
637 "parentID": meta.parent_id,
638 "snapshotCount": meta.snapshots.len()
639 })
640 })
641 }
642
643 pub async fn session_diff(&self, id: &str) -> Option<Value> {
644 let sessions = self.sessions.read().await;
645 let current = sessions.get(id)?;
646 let metadata = self.metadata.read().await;
647 let default = SessionMeta::default();
648 let meta = metadata.get(id).unwrap_or(&default);
649 let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
650 Some(json!({
651 "sessionID": id,
652 "currentMessageCount": current.messages.len(),
653 "lastSnapshotMessageCount": last_snapshot_len,
654 "delta": current.messages.len() as i64 - last_snapshot_len as i64
655 }))
656 }
657
658 pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
659 let mut metadata = self.metadata.write().await;
660 let meta = metadata
661 .entry(id.to_string())
662 .or_insert_with(SessionMeta::default);
663 meta.todos = normalize_todo_items(todos);
664 drop(metadata);
665 self.flush().await
666 }
667
668 pub async fn get_todos(&self, id: &str) -> Vec<Value> {
669 let todos = self
670 .metadata
671 .read()
672 .await
673 .get(id)
674 .map(|meta| meta.todos.clone())
675 .unwrap_or_default();
676 normalize_todo_items(todos)
677 }
678
679 pub async fn add_question_request(
680 &self,
681 session_id: &str,
682 message_id: &str,
683 questions: Vec<Value>,
684 ) -> anyhow::Result<QuestionRequest> {
685 if questions.is_empty() {
686 return Err(anyhow::anyhow!(
687 "cannot add empty question request for session {}",
688 session_id
689 ));
690 }
691 let request = QuestionRequest {
692 id: format!("q-{}", Uuid::new_v4()),
693 session_id: session_id.to_string(),
694 questions,
695 tool: Some(QuestionToolRef {
696 call_id: format!("call-{}", Uuid::new_v4()),
697 message_id: message_id.to_string(),
698 }),
699 };
700 self.question_requests
701 .write()
702 .await
703 .insert(request.id.clone(), request.clone());
704 self.flush().await?;
705 Ok(request)
706 }
707
708 pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
709 self.question_requests
710 .read()
711 .await
712 .values()
713 .cloned()
714 .collect()
715 }
716
717 pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
718 let removed = self
719 .question_requests
720 .write()
721 .await
722 .remove(request_id)
723 .is_some();
724 if removed {
725 self.flush().await?;
726 }
727 Ok(removed)
728 }
729
730 pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
731 self.reply_question(request_id).await
732 }
733
734 pub async fn attach_session_to_workspace(
735 &self,
736 session_id: &str,
737 target_workspace: &str,
738 reason_tag: &str,
739 ) -> anyhow::Result<Option<Session>> {
740 let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
741 return Ok(None);
742 };
743 let mut sessions = self.sessions.write().await;
744 let Some(session) = sessions.get_mut(session_id) else {
745 return Ok(None);
746 };
747 let previous_workspace = session
748 .workspace_root
749 .clone()
750 .or_else(|| normalize_workspace_path(&session.directory));
751
752 if session.origin_workspace_root.is_none() {
753 session.origin_workspace_root = previous_workspace.clone();
754 }
755 session.attached_from_workspace = previous_workspace;
756 session.attached_to_workspace = Some(target_workspace.clone());
757 session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
758 session.attach_reason = Some(reason_tag.trim().to_string());
759 session.workspace_root = Some(target_workspace.clone());
760 session.directory = target_workspace;
761 session.time.updated = Utc::now();
762 let updated = session.clone();
763 drop(sessions);
764 self.flush().await?;
765 Ok(Some(updated))
766 }
767
768 async fn flush(&self) -> anyhow::Result<()> {
769 let snapshot = self.sessions.read().await.clone();
770 let payload = serde_json::to_string_pretty(&snapshot)?;
771 fs::write(self.base.join("sessions.json"), payload).await?;
772 let metadata_snapshot = self.metadata.read().await.clone();
773 let metadata_payload = serde_json::to_string_pretty(&metadata_snapshot)?;
774 fs::write(self.base.join("session_meta.json"), metadata_payload).await?;
775 let questions_snapshot = self.question_requests.read().await.clone();
776 let questions_payload = serde_json::to_string_pretty(&questions_snapshot)?;
777 fs::write(self.base.join("questions.json"), questions_payload).await?;
778 Ok(())
779 }
780
781 async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
782 let payload = serde_json::to_string_pretty(marker)?;
783 fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
784 Ok(())
785 }
786}
787
788fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
789 items
790 .into_iter()
791 .filter_map(|item| {
792 let obj = item.as_object()?;
793 let content = obj
794 .get("content")
795 .and_then(|v| v.as_str())
796 .or_else(|| obj.get("text").and_then(|v| v.as_str()))
797 .unwrap_or("")
798 .trim()
799 .to_string();
800 if content.is_empty() {
801 return None;
802 }
803 let id = obj
804 .get("id")
805 .and_then(|v| v.as_str())
806 .filter(|s| !s.trim().is_empty())
807 .map(ToString::to_string)
808 .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
809 let status = obj
810 .get("status")
811 .and_then(|v| v.as_str())
812 .filter(|s| !s.trim().is_empty())
813 .map(ToString::to_string)
814 .unwrap_or_else(|| "pending".to_string());
815 Some(json!({
816 "id": id,
817 "content": content,
818 "status": status
819 }))
820 })
821 .collect()
822}
823
824#[derive(Debug)]
825struct LegacyScanResult {
826 sessions: HashMap<String, Session>,
827 legacy_counts: LegacyTreeCounts,
828 imported_counts: LegacyImportedCounts,
829}
830
831#[derive(Debug, Default)]
832struct LegacyMergeStats {
833 changed: bool,
834 sessions_merged: u64,
835 messages_recovered: u64,
836 parts_recovered: u64,
837}
838
839fn now_ms_u64() -> u64 {
840 Utc::now().timestamp_millis().max(0) as u64
841}
842
843async fn should_run_legacy_scan_on_startup(marker_path: &Path, sessions_exist: bool) -> bool {
844 if !sessions_exist {
845 return true;
846 }
847 if read_legacy_import_marker(marker_path).await.is_none() {
850 return false;
851 }
852 false
853}
854
855async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
856 let raw = fs::read_to_string(marker_path).await.ok()?;
857 serde_json::from_str::<LegacyImportMarker>(&raw).ok()
858}
859
860fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
861 let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
862 let imported_counts = LegacyImportedCounts {
863 sessions: sessions.len() as u64,
864 messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
865 parts: sessions
866 .values()
867 .flat_map(|s| s.messages.iter())
868 .map(|m| m.parts.len() as u64)
869 .sum(),
870 };
871 let legacy_counts = LegacyTreeCounts {
872 session_files: count_legacy_json_files(&base.join("session")),
873 message_files: count_legacy_json_files(&base.join("message")),
874 part_files: count_legacy_json_files(&base.join("part")),
875 };
876 Ok(LegacyScanResult {
877 sessions,
878 legacy_counts,
879 imported_counts,
880 })
881}
882
883fn count_legacy_json_files(root: &Path) -> u64 {
884 if !root.is_dir() {
885 return 0;
886 }
887 let mut count = 0u64;
888 let mut stack = vec![root.to_path_buf()];
889 while let Some(dir) = stack.pop() {
890 if let Ok(entries) = std::fs::read_dir(&dir) {
891 for entry in entries.flatten() {
892 let path = entry.path();
893 if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
894 stack.push(path);
895 continue;
896 }
897 if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
898 count += 1;
899 }
900 }
901 }
902 }
903 count
904}
905
906fn merge_legacy_sessions(
907 current: &mut HashMap<String, Session>,
908 imported: HashMap<String, Session>,
909) -> bool {
910 merge_legacy_sessions_with_stats(current, imported).changed
911}
912
913fn merge_legacy_sessions_with_stats(
914 current: &mut HashMap<String, Session>,
915 imported: HashMap<String, Session>,
916) -> LegacyMergeStats {
917 let mut stats = LegacyMergeStats::default();
918 for (id, legacy) in imported {
919 let legacy_message_count = legacy.messages.len() as u64;
920 let legacy_part_count = legacy
921 .messages
922 .iter()
923 .map(|m| m.parts.len() as u64)
924 .sum::<u64>();
925 match current.get_mut(&id) {
926 None => {
927 current.insert(id, legacy);
928 stats.changed = true;
929 stats.sessions_merged += 1;
930 stats.messages_recovered += legacy_message_count;
931 stats.parts_recovered += legacy_part_count;
932 }
933 Some(existing) => {
934 let should_merge_messages =
935 existing.messages.is_empty() && !legacy.messages.is_empty();
936 let should_fill_title =
937 existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
938 let should_fill_directory = (existing.directory.trim().is_empty()
939 || existing.directory.trim() == "."
940 || existing.directory.trim() == "./"
941 || existing.directory.trim() == ".\\")
942 && !legacy.directory.trim().is_empty();
943 let should_fill_workspace =
944 existing.workspace_root.is_none() && legacy.workspace_root.is_some();
945 if should_merge_messages {
946 existing.messages = legacy.messages.clone();
947 }
948 if should_fill_title {
949 existing.title = legacy.title.clone();
950 }
951 if should_fill_directory {
952 existing.directory = legacy.directory.clone();
953 }
954 if should_fill_workspace {
955 existing.workspace_root = legacy.workspace_root.clone();
956 }
957 if should_merge_messages
958 || should_fill_title
959 || should_fill_directory
960 || should_fill_workspace
961 {
962 stats.changed = true;
963 if should_merge_messages {
964 stats.sessions_merged += 1;
965 stats.messages_recovered += legacy_message_count;
966 stats.parts_recovered += legacy_part_count;
967 }
968 }
969 }
970 }
971 }
972 stats
973}
974
975fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
976 let mut changed = false;
977 for session in sessions.values_mut() {
978 if session.workspace_root.is_none() {
979 let normalized = normalize_workspace_path(&session.directory);
980 if normalized.is_some() {
981 session.workspace_root = normalized;
982 changed = true;
983 }
984 }
985 }
986 changed
987}
988
989fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
990 let mut changed = false;
991 for session in sessions.values_mut() {
992 if !title_needs_repair(&session.title) {
993 continue;
994 }
995 let first_user_text = session.messages.iter().find_map(|message| {
996 if !matches!(message.role, MessageRole::User) {
997 return None;
998 }
999 message.parts.iter().find_map(|part| match part {
1000 MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
1001 _ => None,
1002 })
1003 });
1004 let Some(source) = first_user_text else {
1005 continue;
1006 };
1007 let Some(derived) = derive_session_title_from_prompt(source, 60) else {
1008 continue;
1009 };
1010 if derived == session.title {
1011 continue;
1012 }
1013 session.title = derived;
1014 session.time.updated = Utc::now();
1015 changed = true;
1016 }
1017 changed
1018}
1019
1020#[derive(Debug, Deserialize)]
1021struct LegacySessionTime {
1022 created: i64,
1023 updated: i64,
1024}
1025
1026#[derive(Debug, Deserialize)]
1027struct LegacySession {
1028 id: String,
1029 slug: Option<String>,
1030 version: Option<String>,
1031 #[serde(rename = "projectID")]
1032 project_id: Option<String>,
1033 title: Option<String>,
1034 directory: Option<String>,
1035 time: LegacySessionTime,
1036}
1037
1038fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
1039 let legacy_root = base.join("session");
1040 if !legacy_root.is_dir() {
1041 return Ok(HashMap::new());
1042 }
1043
1044 let mut out = HashMap::new();
1045 let mut stack = vec![legacy_root];
1046 while let Some(dir) = stack.pop() {
1047 for entry in std::fs::read_dir(&dir)? {
1048 let entry = entry?;
1049 let path = entry.path();
1050 if entry.file_type()?.is_dir() {
1051 stack.push(path);
1052 continue;
1053 }
1054 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1055 continue;
1056 }
1057 let raw = match std::fs::read_to_string(&path) {
1058 Ok(v) => v,
1059 Err(_) => continue,
1060 };
1061 let legacy = match serde_json::from_str::<LegacySession>(&raw) {
1062 Ok(v) => v,
1063 Err(_) => continue,
1064 };
1065 let created = Utc
1066 .timestamp_millis_opt(legacy.time.created)
1067 .single()
1068 .unwrap_or_else(Utc::now);
1069 let updated = Utc
1070 .timestamp_millis_opt(legacy.time.updated)
1071 .single()
1072 .unwrap_or(created);
1073
1074 let session_id = legacy.id.clone();
1075 out.insert(
1076 session_id.clone(),
1077 Session {
1078 id: session_id.clone(),
1079 slug: legacy.slug,
1080 version: legacy.version,
1081 project_id: legacy.project_id,
1082 title: legacy
1083 .title
1084 .filter(|s| !s.trim().is_empty())
1085 .unwrap_or_else(|| "New session".to_string()),
1086 directory: legacy
1087 .directory
1088 .clone()
1089 .filter(|s| !s.trim().is_empty())
1090 .unwrap_or_else(|| ".".to_string()),
1091 workspace_root: legacy
1092 .directory
1093 .as_deref()
1094 .and_then(normalize_workspace_path),
1095 origin_workspace_root: None,
1096 attached_from_workspace: None,
1097 attached_to_workspace: None,
1098 attach_timestamp_ms: None,
1099 attach_reason: None,
1100 time: tandem_types::SessionTime { created, updated },
1101 model: None,
1102 provider: None,
1103 environment: None,
1104 messages: load_legacy_session_messages(base, &session_id),
1105 },
1106 );
1107 }
1108 }
1109 Ok(out)
1110}
1111
1112#[derive(Debug, Deserialize)]
1113struct LegacyMessageTime {
1114 created: i64,
1115}
1116
1117#[derive(Debug, Deserialize)]
1118struct LegacyMessage {
1119 id: String,
1120 role: String,
1121 time: LegacyMessageTime,
1122}
1123
1124#[derive(Debug, Deserialize)]
1125struct LegacyPart {
1126 #[serde(rename = "type")]
1127 part_type: Option<String>,
1128 text: Option<String>,
1129 tool: Option<String>,
1130 args: Option<Value>,
1131 result: Option<Value>,
1132 error: Option<String>,
1133}
1134
1135fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
1136 let msg_dir = base.join("message").join(session_id);
1137 if !msg_dir.is_dir() {
1138 return Vec::new();
1139 }
1140
1141 let mut legacy_messages: Vec<(i64, Message)> = Vec::new();
1142
1143 let Ok(entries) = std::fs::read_dir(&msg_dir) else {
1144 return Vec::new();
1145 };
1146
1147 for entry in entries.flatten() {
1148 let path = entry.path();
1149 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1150 continue;
1151 }
1152 let Ok(raw) = std::fs::read_to_string(&path) else {
1153 continue;
1154 };
1155 let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
1156 continue;
1157 };
1158
1159 let created_at = Utc
1160 .timestamp_millis_opt(legacy.time.created)
1161 .single()
1162 .unwrap_or_else(Utc::now);
1163
1164 legacy_messages.push((
1165 legacy.time.created,
1166 Message {
1167 id: legacy.id.clone(),
1168 role: legacy_role_to_message_role(&legacy.role),
1169 parts: load_legacy_message_parts(base, &legacy.id),
1170 created_at,
1171 },
1172 ));
1173 }
1174
1175 legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
1176 legacy_messages.into_iter().map(|(_, msg)| msg).collect()
1177}
1178
1179fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
1180 let parts_dir = base.join("part").join(message_id);
1181 if !parts_dir.is_dir() {
1182 return Vec::new();
1183 }
1184
1185 let Ok(entries) = std::fs::read_dir(&parts_dir) else {
1186 return Vec::new();
1187 };
1188
1189 let mut out = Vec::new();
1190 for entry in entries.flatten() {
1191 let path = entry.path();
1192 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1193 continue;
1194 }
1195 let Ok(raw) = std::fs::read_to_string(&path) else {
1196 continue;
1197 };
1198 let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
1199 continue;
1200 };
1201
1202 let mapped = if let Some(tool) = part.tool {
1203 Some(MessagePart::ToolInvocation {
1204 tool,
1205 args: part.args.unwrap_or_else(|| json!({})),
1206 result: part.result,
1207 error: part.error,
1208 })
1209 } else {
1210 match part.part_type.as_deref() {
1211 Some("reasoning") => Some(MessagePart::Reasoning {
1212 text: part.text.unwrap_or_default(),
1213 }),
1214 Some("tool") => Some(MessagePart::ToolInvocation {
1215 tool: "tool".to_string(),
1216 args: part.args.unwrap_or_else(|| json!({})),
1217 result: part.result,
1218 error: part.error,
1219 }),
1220 Some("text") | None => Some(MessagePart::Text {
1221 text: part.text.unwrap_or_default(),
1222 }),
1223 _ => None,
1224 }
1225 };
1226
1227 if let Some(part) = mapped {
1228 out.push(part);
1229 }
1230 }
1231 out
1232}
1233
1234fn legacy_role_to_message_role(role: &str) -> MessageRole {
1235 match role.to_lowercase().as_str() {
1236 "user" => MessageRole::User,
1237 "assistant" => MessageRole::Assistant,
1238 "system" => MessageRole::System,
1239 "tool" => MessageRole::Tool,
1240 _ => MessageRole::Assistant,
1241 }
1242}
1243
1244#[derive(Debug, Clone, Default)]
1245struct MessageMergeStats {
1246 messages_recovered: u64,
1247 parts_recovered: u64,
1248 conflicts_merged: u64,
1249}
1250
1251fn message_richness(msg: &Message) -> usize {
1252 msg.parts
1253 .iter()
1254 .map(|p| match p {
1255 MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1256 if text.trim().is_empty() {
1257 0
1258 } else {
1259 1
1260 }
1261 }
1262 MessagePart::ToolInvocation { result, error, .. } => {
1263 if result.is_some() || error.is_some() {
1264 2
1265 } else {
1266 1
1267 }
1268 }
1269 })
1270 .sum()
1271}
1272
1273fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1274 messages.iter().map(|m| m.created_at).max()
1275}
1276
1277fn merge_session_messages(
1278 existing: &[Message],
1279 imported: &[Message],
1280) -> (Vec<Message>, MessageMergeStats, bool) {
1281 if existing.is_empty() {
1282 let messages_recovered = imported.len() as u64;
1283 let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1284 return (
1285 imported.to_vec(),
1286 MessageMergeStats {
1287 messages_recovered,
1288 parts_recovered,
1289 conflicts_merged: 0,
1290 },
1291 true,
1292 );
1293 }
1294
1295 let mut merged_by_id: HashMap<String, Message> = existing
1296 .iter()
1297 .cloned()
1298 .map(|m| (m.id.clone(), m))
1299 .collect();
1300 let mut stats = MessageMergeStats::default();
1301 let mut changed = false;
1302
1303 for incoming in imported {
1304 match merged_by_id.get(&incoming.id) {
1305 None => {
1306 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1307 stats.messages_recovered += 1;
1308 stats.parts_recovered += incoming.parts.len() as u64;
1309 changed = true;
1310 }
1311 Some(current) => {
1312 let incoming_richer = message_richness(incoming) > message_richness(current)
1313 || incoming.parts.len() > current.parts.len();
1314 if incoming_richer {
1315 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1316 stats.conflicts_merged += 1;
1317 changed = true;
1318 }
1319 }
1320 }
1321 }
1322
1323 let mut out: Vec<Message> = merged_by_id.into_values().collect();
1324 out.sort_by_key(|m| m.created_at);
1325 (out, stats, changed)
1326}
1327
1328#[cfg(test)]
1329mod tests {
1330 use super::*;
1331 use std::fs as stdfs;
1332
1333 #[tokio::test]
1334 async fn todos_are_normalized_to_wire_shape() {
1335 let base = std::env::temp_dir().join(format!("tandem-core-test-{}", Uuid::new_v4()));
1336 let storage = Storage::new(&base).await.expect("storage");
1337 let session = Session::new(Some("test".to_string()), Some(".".to_string()));
1338 let id = session.id.clone();
1339 storage.save_session(session).await.expect("save session");
1340
1341 storage
1342 .set_todos(
1343 &id,
1344 vec![
1345 json!({"content":"first"}),
1346 json!({"text":"second", "status":"in_progress"}),
1347 json!({"id":"keep-id","content":"third","status":"completed"}),
1348 ],
1349 )
1350 .await
1351 .expect("set todos");
1352
1353 let todos = storage.get_todos(&id).await;
1354 assert_eq!(todos.len(), 3);
1355 for todo in todos {
1356 assert!(todo.get("id").and_then(|v| v.as_str()).is_some());
1357 assert!(todo.get("content").and_then(|v| v.as_str()).is_some());
1358 assert!(todo.get("status").and_then(|v| v.as_str()).is_some());
1359 }
1360 }
1361
1362 #[tokio::test]
1363 async fn imports_legacy_opencode_session_index_when_sessions_json_missing() {
1364 let base =
1365 std::env::temp_dir().join(format!("tandem-core-legacy-import-{}", Uuid::new_v4()));
1366 let legacy_session_dir = base.join("session").join("global");
1367 stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1368 stdfs::write(
1369 legacy_session_dir.join("ses_test.json"),
1370 r#"{
1371 "id": "ses_test",
1372 "slug": "test",
1373 "version": "1.0.0",
1374 "projectID": "proj_1",
1375 "directory": "C:\\work\\demo",
1376 "title": "Legacy Session",
1377 "time": { "created": 1770913145613, "updated": 1770913146613 }
1378}"#,
1379 )
1380 .expect("legacy session write");
1381
1382 let storage = Storage::new(&base).await.expect("storage");
1383 let sessions = storage.list_sessions().await;
1384 assert_eq!(sessions.len(), 1);
1385 assert_eq!(sessions[0].id, "ses_test");
1386 assert_eq!(sessions[0].title, "Legacy Session");
1387 assert!(base.join("sessions.json").exists());
1388 }
1389
1390 #[tokio::test]
1391 async fn imports_legacy_messages_and_parts_for_session() {
1392 let base = std::env::temp_dir().join(format!("tandem-core-legacy-msg-{}", Uuid::new_v4()));
1393 let session_dir = base.join("session").join("global");
1394 let message_dir = base.join("message").join("ses_test");
1395 let part_dir = base.join("part").join("msg_1");
1396 stdfs::create_dir_all(&session_dir).expect("session dir");
1397 stdfs::create_dir_all(&message_dir).expect("message dir");
1398 stdfs::create_dir_all(&part_dir).expect("part dir");
1399
1400 stdfs::write(
1401 session_dir.join("ses_test.json"),
1402 r#"{
1403 "id": "ses_test",
1404 "projectID": "proj_1",
1405 "directory": "C:\\work\\demo",
1406 "title": "Legacy Session",
1407 "time": { "created": 1770913145613, "updated": 1770913146613 }
1408}"#,
1409 )
1410 .expect("write session");
1411
1412 stdfs::write(
1413 message_dir.join("msg_1.json"),
1414 r#"{
1415 "id": "msg_1",
1416 "sessionID": "ses_test",
1417 "role": "assistant",
1418 "time": { "created": 1770913145613 }
1419}"#,
1420 )
1421 .expect("write msg");
1422
1423 stdfs::write(
1424 part_dir.join("prt_1.json"),
1425 r#"{
1426 "id": "prt_1",
1427 "sessionID": "ses_test",
1428 "messageID": "msg_1",
1429 "type": "text",
1430 "text": "hello from legacy"
1431}"#,
1432 )
1433 .expect("write part");
1434
1435 let storage = Storage::new(&base).await.expect("storage");
1436 let sessions = storage.list_sessions().await;
1437 assert_eq!(sessions.len(), 1);
1438 assert_eq!(sessions[0].messages.len(), 1);
1439 assert_eq!(sessions[0].messages[0].parts.len(), 1);
1440 }
1441
1442 #[tokio::test]
1443 async fn skips_legacy_merge_when_sessions_json_exists() {
1444 let base =
1445 std::env::temp_dir().join(format!("tandem-core-legacy-merge-{}", Uuid::new_v4()));
1446 stdfs::create_dir_all(&base).expect("base");
1447 stdfs::write(
1448 base.join("sessions.json"),
1449 r#"{
1450 "ses_current": {
1451 "id": "ses_current",
1452 "slug": null,
1453 "version": "v1",
1454 "project_id": null,
1455 "title": "Current Session",
1456 "directory": ".",
1457 "workspace_root": null,
1458 "origin_workspace_root": null,
1459 "attached_from_workspace": null,
1460 "attached_to_workspace": null,
1461 "attach_timestamp_ms": null,
1462 "attach_reason": null,
1463 "time": {"created":"2026-01-01T00:00:00Z","updated":"2026-01-01T00:00:00Z"},
1464 "model": null,
1465 "provider": null,
1466 "messages": []
1467 }
1468}"#,
1469 )
1470 .expect("sessions.json");
1471
1472 let legacy_session_dir = base.join("session").join("global");
1473 stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1474 stdfs::write(
1475 legacy_session_dir.join("ses_legacy.json"),
1476 r#"{
1477 "id": "ses_legacy",
1478 "slug": "legacy",
1479 "version": "1.0.0",
1480 "projectID": "proj_legacy",
1481 "directory": "C:\\work\\legacy",
1482 "title": "Legacy Session",
1483 "time": { "created": 1770913145613, "updated": 1770913146613 }
1484}"#,
1485 )
1486 .expect("legacy session write");
1487
1488 let storage = Storage::new(&base).await.expect("storage");
1489 let sessions = storage.list_sessions().await;
1490 let ids = sessions.iter().map(|s| s.id.clone()).collect::<Vec<_>>();
1491 assert!(ids.contains(&"ses_current".to_string()));
1492 assert!(!ids.contains(&"ses_legacy".to_string()));
1493 }
1494
1495 #[tokio::test]
1496 async fn list_sessions_scoped_filters_by_workspace_root() {
1497 let base = std::env::temp_dir().join(format!("tandem-core-scope-{}", Uuid::new_v4()));
1498 let storage = Storage::new(&base).await.expect("storage");
1499 let ws_a = base.join("ws-a");
1500 let ws_b = base.join("ws-b");
1501 stdfs::create_dir_all(&ws_a).expect("ws_a");
1502 stdfs::create_dir_all(&ws_b).expect("ws_b");
1503 let ws_a_str = ws_a.to_string_lossy().to_string();
1504 let ws_b_str = ws_b.to_string_lossy().to_string();
1505
1506 let mut a = Session::new(Some("a".to_string()), Some(ws_a_str.clone()));
1507 a.workspace_root = Some(ws_a_str.clone());
1508 storage.save_session(a).await.expect("save a");
1509
1510 let mut b = Session::new(Some("b".to_string()), Some(ws_b_str.clone()));
1511 b.workspace_root = Some(ws_b_str);
1512 storage.save_session(b).await.expect("save b");
1513
1514 let scoped = storage
1515 .list_sessions_scoped(SessionListScope::Workspace {
1516 workspace_root: ws_a_str,
1517 })
1518 .await;
1519 assert_eq!(scoped.len(), 1);
1520 assert_eq!(scoped[0].title, "a");
1521 }
1522
1523 #[tokio::test]
1524 async fn attach_session_persists_audit_metadata() {
1525 let base = std::env::temp_dir().join(format!("tandem-core-attach-{}", Uuid::new_v4()));
1526 let storage = Storage::new(&base).await.expect("storage");
1527 let ws_a = base.join("ws-a");
1528 let ws_b = base.join("ws-b");
1529 stdfs::create_dir_all(&ws_a).expect("ws_a");
1530 stdfs::create_dir_all(&ws_b).expect("ws_b");
1531 let ws_a_str = ws_a.to_string_lossy().to_string();
1532 let ws_b_str = ws_b.to_string_lossy().to_string();
1533 let mut session = Session::new(Some("s".to_string()), Some(ws_a_str.clone()));
1534 session.workspace_root = Some(ws_a_str);
1535 let id = session.id.clone();
1536 storage.save_session(session).await.expect("save");
1537
1538 let updated = storage
1539 .attach_session_to_workspace(&id, &ws_b_str, "manual")
1540 .await
1541 .expect("attach")
1542 .expect("session exists");
1543 let normalized_expected = normalize_workspace_path(&ws_b_str).expect("normalized path");
1544 assert_eq!(
1545 updated.workspace_root.as_deref(),
1546 Some(normalized_expected.as_str())
1547 );
1548 assert_eq!(
1549 updated.attached_to_workspace.as_deref(),
1550 Some(normalized_expected.as_str())
1551 );
1552 assert_eq!(updated.attach_reason.as_deref(), Some("manual"));
1553 assert!(updated.attach_timestamp_ms.is_some());
1554 }
1555
1556 #[tokio::test]
1557 async fn append_message_part_persists_tool_invocation_and_result() {
1558 let base = std::env::temp_dir().join(format!("tandem-core-tool-parts-{}", Uuid::new_v4()));
1559 let storage = Storage::new(&base).await.expect("storage");
1560 let session = Session::new(Some("tool parts".to_string()), Some(".".to_string()));
1561 let session_id = session.id.clone();
1562 storage.save_session(session).await.expect("save session");
1563
1564 let user = Message::new(
1565 MessageRole::User,
1566 vec![MessagePart::Text {
1567 text: "build ui".to_string(),
1568 }],
1569 );
1570 let message_id = user.id.clone();
1571 storage
1572 .append_message(&session_id, user)
1573 .await
1574 .expect("append user");
1575
1576 storage
1577 .append_message_part(
1578 &session_id,
1579 &message_id,
1580 MessagePart::ToolInvocation {
1581 tool: "write".to_string(),
1582 args: json!({"path":"game.html","content":"<html></html>"}),
1583 result: None,
1584 error: None,
1585 },
1586 )
1587 .await
1588 .expect("append invocation");
1589 storage
1590 .append_message_part(
1591 &session_id,
1592 &message_id,
1593 MessagePart::ToolInvocation {
1594 tool: "write".to_string(),
1595 args: json!({}),
1596 result: Some(json!("ok")),
1597 error: None,
1598 },
1599 )
1600 .await
1601 .expect("append result");
1602
1603 let session = storage.get_session(&session_id).await.expect("session");
1604 let message = session
1605 .messages
1606 .iter()
1607 .find(|message| message.id == message_id)
1608 .expect("message");
1609 assert_eq!(message.parts.len(), 2);
1610 match &message.parts[1] {
1611 MessagePart::ToolInvocation {
1612 tool,
1613 result,
1614 error,
1615 ..
1616 } => {
1617 assert_eq!(tool, "write");
1618 assert_eq!(result.as_ref(), Some(&json!("ok")));
1619 assert_eq!(error.as_deref(), None);
1620 }
1621 other => panic!("expected tool part, got {other:?}"),
1622 }
1623 }
1624
1625 #[tokio::test]
1626 async fn append_message_part_retains_failed_tool_error() {
1627 let base = std::env::temp_dir().join(format!("tandem-core-tool-error-{}", Uuid::new_v4()));
1628 let storage = Storage::new(&base).await.expect("storage");
1629 let session = Session::new(Some("tool errors".to_string()), Some(".".to_string()));
1630 let session_id = session.id.clone();
1631 storage.save_session(session).await.expect("save session");
1632
1633 let user = Message::new(
1634 MessageRole::User,
1635 vec![MessagePart::Text {
1636 text: "write file".to_string(),
1637 }],
1638 );
1639 let message_id = user.id.clone();
1640 storage
1641 .append_message(&session_id, user)
1642 .await
1643 .expect("append user");
1644
1645 storage
1646 .append_message_part(
1647 &session_id,
1648 &message_id,
1649 MessagePart::ToolInvocation {
1650 tool: "write".to_string(),
1651 args: json!({"path":"game.html"}),
1652 result: None,
1653 error: None,
1654 },
1655 )
1656 .await
1657 .expect("append invocation");
1658 storage
1659 .append_message_part(
1660 &session_id,
1661 &message_id,
1662 MessagePart::ToolInvocation {
1663 tool: "write".to_string(),
1664 args: json!({}),
1665 result: None,
1666 error: Some("WRITE_CONTENT_MISSING".to_string()),
1667 },
1668 )
1669 .await
1670 .expect("append error");
1671
1672 let session = storage.get_session(&session_id).await.expect("session");
1673 let message = session
1674 .messages
1675 .iter()
1676 .find(|message| message.id == message_id)
1677 .expect("message");
1678 match &message.parts[1] {
1679 MessagePart::ToolInvocation { error, .. } => {
1680 assert_eq!(error.as_deref(), Some("WRITE_CONTENT_MISSING"));
1681 }
1682 other => panic!("expected tool part, got {other:?}"),
1683 }
1684 }
1685
1686 #[tokio::test]
1687 async fn append_message_part_coalesces_repeated_tool_invocation_updates() {
1688 let base = std::env::temp_dir().join(format!("tandem-core-tool-merge-{}", Uuid::new_v4()));
1689 let storage = Storage::new(&base).await.expect("storage");
1690 let session = Session::new(Some("tool merge".to_string()), Some(".".to_string()));
1691 let session_id = session.id.clone();
1692 storage.save_session(session).await.expect("save session");
1693
1694 let user = Message::new(
1695 MessageRole::User,
1696 vec![MessagePart::Text {
1697 text: "build ui".to_string(),
1698 }],
1699 );
1700 let message_id = user.id.clone();
1701 storage
1702 .append_message(&session_id, user)
1703 .await
1704 .expect("append user");
1705
1706 storage
1707 .append_message_part(
1708 &session_id,
1709 &message_id,
1710 MessagePart::ToolInvocation {
1711 tool: "write".to_string(),
1712 args: json!({"path":"game.html"}),
1713 result: None,
1714 error: None,
1715 },
1716 )
1717 .await
1718 .expect("append first invocation");
1719 storage
1720 .append_message_part(
1721 &session_id,
1722 &message_id,
1723 MessagePart::ToolInvocation {
1724 tool: "write".to_string(),
1725 args: json!({"path":"game.html","content":"<html></html>"}),
1726 result: None,
1727 error: None,
1728 },
1729 )
1730 .await
1731 .expect("append updated invocation");
1732
1733 let session = storage.get_session(&session_id).await.expect("session");
1734 let message = session
1735 .messages
1736 .iter()
1737 .find(|message| message.id == message_id)
1738 .expect("message");
1739 assert_eq!(message.parts.len(), 2);
1740 match &message.parts[1] {
1741 MessagePart::ToolInvocation { tool, args, .. } => {
1742 assert_eq!(tool, "write");
1743 assert_eq!(args["path"], "game.html");
1744 assert_eq!(args["content"], "<html></html>");
1745 }
1746 other => panic!("expected tool part, got {other:?}"),
1747 }
1748 }
1749
1750 #[tokio::test]
1751 async fn append_message_part_falls_back_to_latest_user_message_when_id_missing() {
1752 let base =
1753 std::env::temp_dir().join(format!("tandem-core-tool-fallback-{}", Uuid::new_v4()));
1754 let storage = Storage::new(&base).await.expect("storage");
1755 let session = Session::new(Some("tool fallback".to_string()), Some(".".to_string()));
1756 let session_id = session.id.clone();
1757 storage.save_session(session).await.expect("save session");
1758
1759 let first = Message::new(
1760 MessageRole::User,
1761 vec![MessagePart::Text {
1762 text: "first prompt".to_string(),
1763 }],
1764 );
1765 let second = Message::new(
1766 MessageRole::User,
1767 vec![MessagePart::Text {
1768 text: "second prompt".to_string(),
1769 }],
1770 );
1771 let second_id = second.id.clone();
1772 storage
1773 .append_message(&session_id, first)
1774 .await
1775 .expect("append first");
1776 storage
1777 .append_message(&session_id, second)
1778 .await
1779 .expect("append second");
1780
1781 storage
1782 .append_message_part(
1783 &session_id,
1784 "missing-message-id",
1785 MessagePart::ToolInvocation {
1786 tool: "glob".to_string(),
1787 args: json!({"pattern":"*"}),
1788 result: Some(json!(["README.md"])),
1789 error: None,
1790 },
1791 )
1792 .await
1793 .expect("append fallback tool part");
1794
1795 let session = storage.get_session(&session_id).await.expect("session");
1796 let message = session
1797 .messages
1798 .iter()
1799 .find(|message| message.id == second_id)
1800 .expect("latest user message");
1801 match &message.parts[1] {
1802 MessagePart::ToolInvocation { tool, result, .. } => {
1803 assert_eq!(tool, "glob");
1804 assert_eq!(result.as_ref(), Some(&json!(["README.md"])));
1805 }
1806 other => panic!("expected tool part, got {other:?}"),
1807 }
1808 }
1809
1810 #[tokio::test]
1811 async fn startup_repairs_placeholder_titles_from_wrapped_user_messages() {
1812 let base =
1813 std::env::temp_dir().join(format!("tandem-core-title-repair-{}", Uuid::new_v4()));
1814 let storage = Storage::new(&base).await.expect("storage");
1815 let wrapped = "<memory_context>\n<current_session>\n- fact\n</current_session>\n</memory_context>\n\n[Mode instructions]\nUse tools.\n\n[User request]\nExplain this bug";
1816 let mut session = Session::new(Some("<memory_context>".to_string()), Some(".".to_string()));
1817 let id = session.id.clone();
1818 session.messages.push(Message::new(
1819 MessageRole::User,
1820 vec![MessagePart::Text {
1821 text: wrapped.to_string(),
1822 }],
1823 ));
1824 storage.save_session(session).await.expect("save");
1825 drop(storage);
1826
1827 let storage = Storage::new(&base).await.expect("storage");
1828 let repaired = storage.get_session(&id).await.expect("session");
1829 assert_eq!(repaired.title, "Explain this bug");
1830 }
1831}