1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use anyhow::Context;
5use chrono::{TimeZone, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use tokio::fs;
9use tokio::sync::RwLock;
10use tokio::task;
11use uuid::Uuid;
12
13use tandem_types::{Message, MessagePart, MessageRole, Session};
14
15use crate::{
16 derive_session_title_from_prompt, normalize_workspace_path, title_needs_repair,
17 workspace_project_id,
18};
19
/// Per-session bookkeeping kept separately from the `Session` record itself.
///
/// `Storage` holds one entry per session id and persists the whole map to
/// `session_meta.json`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SessionMeta {
    /// Id of the session this one was forked from (set by `Storage::fork_session`).
    pub parent_id: Option<String>,
    /// Archive flag, toggled by `Storage::set_archived`.
    #[serde(default)]
    pub archived: bool,
    /// Sharing flag, toggled by `Storage::set_shared`.
    #[serde(default)]
    pub shared: bool,
    /// Share identifier; `Storage::set_shared` generates one when sharing is
    /// enabled and clears it when sharing is disabled.
    pub share_id: Option<String>,
    /// Free-form summary text stored by `Storage::set_summary`.
    pub summary: Option<String>,
    /// Earlier copies of the session's message list, oldest first.
    /// `snapshot_session_messages` keeps at most 25 of them.
    #[serde(default)]
    pub snapshots: Vec<Vec<Message>>,
    /// Message list that was current before the last `Storage::revert_session`;
    /// taken back out by `Storage::unrevert_session`.
    pub pre_revert: Option<Vec<Message>>,
    /// Todo items as JSON objects; `Storage::set_todos` normalizes them before storing.
    #[serde(default)]
    pub todos: Vec<Value>,
}
35
/// Reference linking a question request to a tool call and the message it belongs to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuestionToolRef {
    /// Tool call identifier; serialized as `callID`.
    #[serde(rename = "callID")]
    pub call_id: String,
    /// Identifier of the related message; serialized as `messageID`.
    #[serde(rename = "messageID")]
    pub message_id: String,
}
43
/// A pending set of questions attached to a session.
///
/// `Storage` keeps these keyed by `id` and persists them to `questions.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuestionRequest {
    /// Request identifier (`Storage::add_question_request` generates `q-<uuid>`).
    pub id: String,
    /// Owning session id; serialized as `sessionID`.
    #[serde(rename = "sessionID")]
    pub session_id: String,
    /// Question payloads as raw JSON values; defaults to empty when absent on load.
    #[serde(default)]
    pub questions: Vec<Value>,
    /// Optional tool-call reference; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool: Option<QuestionToolRef>,
}
54
/// In-memory session store backed by JSON files in a base directory.
pub struct Storage {
    /// Directory holding `sessions.json`, `session_meta.json`, `questions.json`
    /// and the legacy import marker.
    base: PathBuf,
    /// Sessions keyed by session id.
    sessions: RwLock<HashMap<String, Session>>,
    /// Per-session metadata keyed by session id.
    metadata: RwLock<HashMap<String, SessionMeta>>,
    /// Pending question requests keyed by request id.
    question_requests: RwLock<HashMap<String, QuestionRequest>>,
}
61
/// Selects which sessions `Storage::list_sessions_scoped` returns.
#[derive(Debug, Clone)]
pub enum SessionListScope {
    /// Every stored session.
    Global,
    /// Only sessions whose normalized workspace root or directory equals the
    /// normalized `workspace_root`.
    Workspace { workspace_root: String },
}
67
/// Totals reported by `Storage::repair_sessions_from_file_store`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionRepairStats {
    /// Number of sessions whose message list changed during the repair.
    pub sessions_repaired: u64,
    /// Sum of the per-session `messages_recovered` merge counters.
    pub messages_recovered: u64,
    /// Sum of the per-session `parts_recovered` merge counters.
    pub parts_recovered: u64,
    /// Sum of the per-session `conflicts_merged` merge counters.
    pub conflicts_merged: u64,
}
75
/// File name (inside the storage base directory) of the legacy import marker.
const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
/// Value written to `LegacyImportMarker::version` for newly created markers.
const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
78
/// Number of `.json` files found in each legacy directory tree.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LegacyTreeCounts {
    /// `.json` files under `<base>/session`.
    pub session_files: u64,
    /// `.json` files under `<base>/message`.
    pub message_files: u64,
    /// `.json` files under `<base>/part`.
    pub part_files: u64,
}
85
/// Number of items successfully parsed out of the legacy trees by a scan.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LegacyImportedCounts {
    /// Sessions loaded from legacy files.
    pub sessions: u64,
    /// Messages across all loaded legacy sessions.
    pub messages: u64,
    /// Message parts across all loaded legacy messages.
    pub parts: u64,
}
92
/// Record of a completed legacy scan, persisted as `legacy_import_marker.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LegacyImportMarker {
    /// Marker format version (`LEGACY_IMPORT_MARKER_VERSION` when written here).
    pub version: u32,
    /// Millisecond timestamp taken when the marker value was built.
    pub created_at_ms: u64,
    /// Millisecond timestamp taken when the marker value was built; the code in
    /// this file always sets it together with `created_at_ms`.
    pub last_checked_at_ms: u64,
    /// File counts observed in the legacy trees during the scan.
    pub legacy_counts: LegacyTreeCounts,
    /// Item counts parsed from the legacy trees during the scan.
    pub imported_counts: LegacyImportedCounts,
}
101
/// Outcome of `Storage::run_legacy_storage_repair_scan`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LegacyRepairRunReport {
    /// `"skipped"` when no scan ran, otherwise `"updated"` or `"no_changes"`.
    pub status: String,
    /// Whether the import marker file was rewritten by this run.
    pub marker_updated: bool,
    /// Sessions that were inserted or had their messages filled from legacy data.
    pub sessions_merged: u64,
    /// Messages brought in from legacy data.
    pub messages_recovered: u64,
    /// Message parts brought in from legacy data.
    pub parts_recovered: u64,
    /// Legacy file counts (from the scan, or from the stored marker when skipped).
    pub legacy_counts: LegacyTreeCounts,
    /// Imported item counts (from the scan, or from the stored marker when skipped).
    pub imported_counts: LegacyImportedCounts,
}
112
113fn snapshot_session_messages(
114 session_id: &str,
115 session: &Session,
116 metadata: &mut HashMap<String, SessionMeta>,
117) {
118 let meta = metadata
119 .entry(session_id.to_string())
120 .or_insert_with(SessionMeta::default);
121 meta.snapshots.push(session.messages.clone());
122 if meta.snapshots.len() > 25 {
123 let _ = meta.snapshots.remove(0);
124 }
125}
126
127fn merge_message_part(message: &mut Message, part: MessagePart) {
128 match part {
129 MessagePart::ToolInvocation {
130 tool,
131 args,
132 result,
133 error,
134 } => {
135 let args_are_empty =
136 args.is_null() || args.as_object().is_some_and(|value| value.is_empty());
137 if result.is_none() && error.is_none() {
138 if let Some(existing) = message.parts.iter_mut().rev().find(|existing| {
139 matches!(
140 existing,
141 MessagePart::ToolInvocation {
142 tool: existing_tool,
143 result: None,
144 error: None,
145 ..
146 } if existing_tool == &tool
147 )
148 }) {
149 if let MessagePart::ToolInvocation {
150 args: existing_args,
151 ..
152 } = existing
153 {
154 let existing_args_are_empty = existing_args.is_null()
155 || existing_args
156 .as_object()
157 .is_some_and(|value| value.is_empty());
158 if !args_are_empty || existing_args_are_empty {
159 *existing_args = args;
160 }
161 return;
162 }
163 }
164 }
165 if result.is_some() || error.is_some() {
166 if let Some(existing) = message.parts.iter_mut().rev().find(|existing| {
167 matches!(
168 existing,
169 MessagePart::ToolInvocation {
170 tool: existing_tool,
171 result: None,
172 error: None,
173 ..
174 } if existing_tool == &tool
175 )
176 }) {
177 if let MessagePart::ToolInvocation {
178 args: existing_args,
179 result: existing_result,
180 error: existing_error,
181 ..
182 } = existing
183 {
184 let existing_args_are_empty = existing_args.is_null()
185 || existing_args
186 .as_object()
187 .is_some_and(|value| value.is_empty());
188 if existing_args_are_empty {
189 if tool == "write" && args_are_empty {
190 tracing::info!(
191 tool = %tool,
192 "merging write result/error into existing tool part with empty args"
193 );
194 }
195 *existing_args = args.clone();
196 }
197 *existing_result = result;
198 *existing_error = error;
199 return;
200 }
201 }
202 }
203 message.parts.push(MessagePart::ToolInvocation {
204 tool,
205 args,
206 result,
207 error,
208 });
209 }
210 other => message.parts.push(other),
211 }
212}
213
214impl Storage {
215 pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
216 let base = base.as_ref().to_path_buf();
217 fs::create_dir_all(&base).await?;
218 let sessions_file = base.join("sessions.json");
219 let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
220 let sessions_file_exists = sessions_file.exists();
221 let mut imported_legacy_sessions = false;
222 let mut sessions = if sessions_file_exists {
223 let raw = fs::read_to_string(&sessions_file).await?;
224 serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
225 } else {
226 HashMap::new()
227 };
228
229 let mut marker_to_write = None;
230 if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
231 let base_for_scan = base.clone();
232 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
233 .await
234 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
235 if merge_legacy_sessions(&mut sessions, scan.sessions) {
236 imported_legacy_sessions = true;
237 }
238 marker_to_write = Some(LegacyImportMarker {
239 version: LEGACY_IMPORT_MARKER_VERSION,
240 created_at_ms: now_ms_u64(),
241 last_checked_at_ms: now_ms_u64(),
242 legacy_counts: scan.legacy_counts,
243 imported_counts: scan.imported_counts,
244 });
245 }
246
247 if hydrate_workspace_roots(&mut sessions) {
248 imported_legacy_sessions = true;
249 }
250 if repair_session_titles(&mut sessions) {
251 imported_legacy_sessions = true;
252 }
253 let metadata_file = base.join("session_meta.json");
254 let metadata = if metadata_file.exists() {
255 let raw = fs::read_to_string(&metadata_file).await?;
256 serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
257 } else {
258 HashMap::new()
259 };
260 let questions_file = base.join("questions.json");
261 let question_requests = if questions_file.exists() {
262 let raw = fs::read_to_string(&questions_file).await?;
263 serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
264 } else {
265 HashMap::new()
266 };
267 let storage = Self {
268 base,
269 sessions: RwLock::new(sessions),
270 metadata: RwLock::new(metadata),
271 question_requests: RwLock::new(question_requests),
272 };
273
274 if imported_legacy_sessions {
275 storage.flush().await?;
276 }
277 if let Some(marker) = marker_to_write {
278 storage.write_legacy_import_marker(&marker).await?;
279 }
280
281 Ok(storage)
282 }
283
284 pub async fn list_sessions(&self) -> Vec<Session> {
285 self.list_sessions_scoped(SessionListScope::Global).await
286 }
287
288 pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
289 let all = self
290 .sessions
291 .read()
292 .await
293 .values()
294 .cloned()
295 .collect::<Vec<_>>();
296 match scope {
297 SessionListScope::Global => all,
298 SessionListScope::Workspace { workspace_root } => {
299 let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
300 return Vec::new();
301 };
302 all.into_iter()
303 .filter(|session| {
304 let direct = session
305 .workspace_root
306 .as_ref()
307 .and_then(|p| normalize_workspace_path(p))
308 .map(|p| p == normalized_workspace)
309 .unwrap_or(false);
310 if direct {
311 return true;
312 }
313 normalize_workspace_path(&session.directory)
314 .map(|p| p == normalized_workspace)
315 .unwrap_or(false)
316 })
317 .collect()
318 }
319 }
320 }
321
322 pub async fn get_session(&self, id: &str) -> Option<Session> {
323 self.sessions.read().await.get(id).cloned()
324 }
325
326 pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
327 if session.workspace_root.is_none() {
328 session.workspace_root = normalize_workspace_path(&session.directory);
329 }
330 let session_id = session.id.clone();
331 self.sessions
332 .write()
333 .await
334 .insert(session_id.clone(), session);
335 self.metadata
336 .write()
337 .await
338 .entry(session_id)
339 .or_insert_with(SessionMeta::default);
340 self.flush().await
341 }
342
343 pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
344 let mut stats = SessionRepairStats::default();
345 let mut sessions = self.sessions.write().await;
346
347 for session in sessions.values_mut() {
348 let imported = load_legacy_session_messages(&self.base, &session.id);
349 if imported.is_empty() {
350 continue;
351 }
352
353 let (merged, merge_stats, changed) =
354 merge_session_messages(&session.messages, &imported);
355 if changed {
356 session.messages = merged;
357 session.time.updated =
358 most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
359 stats.sessions_repaired += 1;
360 stats.messages_recovered += merge_stats.messages_recovered;
361 stats.parts_recovered += merge_stats.parts_recovered;
362 stats.conflicts_merged += merge_stats.conflicts_merged;
363 }
364 }
365
366 if stats.sessions_repaired > 0 {
367 drop(sessions);
368 self.flush().await?;
369 }
370
371 Ok(stats)
372 }
373
374 pub async fn run_legacy_storage_repair_scan(
375 &self,
376 force: bool,
377 ) -> anyhow::Result<LegacyRepairRunReport> {
378 let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
379 let sessions_exists = self.base.join("sessions.json").exists();
380 let should_scan = if force {
381 true
382 } else {
383 should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
384 };
385 if !should_scan {
386 let marker = read_legacy_import_marker(&marker_path)
387 .await
388 .unwrap_or_else(|| LegacyImportMarker {
389 version: LEGACY_IMPORT_MARKER_VERSION,
390 created_at_ms: now_ms_u64(),
391 last_checked_at_ms: now_ms_u64(),
392 legacy_counts: LegacyTreeCounts::default(),
393 imported_counts: LegacyImportedCounts::default(),
394 });
395 return Ok(LegacyRepairRunReport {
396 status: "skipped".to_string(),
397 marker_updated: false,
398 sessions_merged: 0,
399 messages_recovered: 0,
400 parts_recovered: 0,
401 legacy_counts: marker.legacy_counts,
402 imported_counts: marker.imported_counts,
403 });
404 }
405
406 let base_for_scan = self.base.clone();
407 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
408 .await
409 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
410
411 let merge_stats = {
412 let mut sessions = self.sessions.write().await;
413 merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
414 };
415
416 if merge_stats.changed {
417 self.flush().await?;
418 }
419
420 let marker = LegacyImportMarker {
421 version: LEGACY_IMPORT_MARKER_VERSION,
422 created_at_ms: now_ms_u64(),
423 last_checked_at_ms: now_ms_u64(),
424 legacy_counts: scan.legacy_counts.clone(),
425 imported_counts: scan.imported_counts.clone(),
426 };
427 self.write_legacy_import_marker(&marker).await?;
428
429 Ok(LegacyRepairRunReport {
430 status: if merge_stats.changed {
431 "updated".to_string()
432 } else {
433 "no_changes".to_string()
434 },
435 marker_updated: true,
436 sessions_merged: merge_stats.sessions_merged,
437 messages_recovered: merge_stats.messages_recovered,
438 parts_recovered: merge_stats.parts_recovered,
439 legacy_counts: scan.legacy_counts,
440 imported_counts: scan.imported_counts,
441 })
442 }
443
444 pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
445 let removed = self.sessions.write().await.remove(id).is_some();
446 self.metadata.write().await.remove(id);
447 self.question_requests
448 .write()
449 .await
450 .retain(|_, request| request.session_id != id);
451 if removed {
452 self.flush().await?;
453 }
454 Ok(removed)
455 }
456
457 pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
458 let mut sessions = self.sessions.write().await;
459 let session = sessions
460 .get_mut(session_id)
461 .context("session not found for append_message")?;
462 let mut meta_guard = self.metadata.write().await;
463 snapshot_session_messages(session_id, session, &mut meta_guard);
464 session.messages.push(msg);
465 session.time.updated = Utc::now();
466 drop(sessions);
467 drop(meta_guard);
468 self.flush().await
469 }
470
471 pub async fn append_message_part(
472 &self,
473 session_id: &str,
474 message_id: &str,
475 part: MessagePart,
476 ) -> anyhow::Result<()> {
477 let mut sessions = self.sessions.write().await;
478 let session = sessions
479 .get_mut(session_id)
480 .context("session not found for append_message_part")?;
481 let mut meta_guard = self.metadata.write().await;
482 snapshot_session_messages(session_id, session, &mut meta_guard);
483 let message = if let Some(message) = session
484 .messages
485 .iter_mut()
486 .find(|message| message.id == message_id)
487 {
488 message
489 } else {
490 session
491 .messages
492 .iter_mut()
493 .rev()
494 .find(|message| matches!(message.role, MessageRole::User))
495 .context("message not found for append_message_part")?
496 };
497 merge_message_part(message, part);
498 session.time.updated = Utc::now();
499 drop(sessions);
500 drop(meta_guard);
501 self.flush().await
502 }
503
504 pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
505 let source = {
506 let sessions = self.sessions.read().await;
507 sessions.get(id).cloned()
508 };
509 let Some(mut child) = source else {
510 return Ok(None);
511 };
512
513 child.id = Uuid::new_v4().to_string();
514 child.title = format!("{} (fork)", child.title);
515 child.time.created = Utc::now();
516 child.time.updated = child.time.created;
517 child.slug = None;
518
519 self.sessions
520 .write()
521 .await
522 .insert(child.id.clone(), child.clone());
523 self.metadata.write().await.insert(
524 child.id.clone(),
525 SessionMeta {
526 parent_id: Some(id.to_string()),
527 snapshots: vec![child.messages.clone()],
528 ..SessionMeta::default()
529 },
530 );
531 self.flush().await?;
532 Ok(Some(child))
533 }
534
535 pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
536 let mut sessions = self.sessions.write().await;
537 let Some(session) = sessions.get_mut(id) else {
538 return Ok(false);
539 };
540 let mut metadata = self.metadata.write().await;
541 let meta = metadata
542 .entry(id.to_string())
543 .or_insert_with(SessionMeta::default);
544 let Some(snapshot) = meta.snapshots.pop() else {
545 return Ok(false);
546 };
547 meta.pre_revert = Some(session.messages.clone());
548 session.messages = snapshot;
549 session.time.updated = Utc::now();
550 drop(metadata);
551 drop(sessions);
552 self.flush().await?;
553 Ok(true)
554 }
555
556 pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
557 let mut sessions = self.sessions.write().await;
558 let Some(session) = sessions.get_mut(id) else {
559 return Ok(false);
560 };
561 let mut metadata = self.metadata.write().await;
562 let Some(meta) = metadata.get_mut(id) else {
563 return Ok(false);
564 };
565 let Some(previous) = meta.pre_revert.take() else {
566 return Ok(false);
567 };
568 meta.snapshots.push(session.messages.clone());
569 session.messages = previous;
570 session.time.updated = Utc::now();
571 drop(metadata);
572 drop(sessions);
573 self.flush().await?;
574 Ok(true)
575 }
576
577 pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
578 let mut metadata = self.metadata.write().await;
579 let meta = metadata
580 .entry(id.to_string())
581 .or_insert_with(SessionMeta::default);
582 meta.shared = shared;
583 if shared {
584 if meta.share_id.is_none() {
585 meta.share_id = Some(Uuid::new_v4().to_string());
586 }
587 } else {
588 meta.share_id = None;
589 }
590 let share_id = meta.share_id.clone();
591 drop(metadata);
592 self.flush().await?;
593 Ok(share_id)
594 }
595
596 pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
597 let mut metadata = self.metadata.write().await;
598 let meta = metadata
599 .entry(id.to_string())
600 .or_insert_with(SessionMeta::default);
601 meta.archived = archived;
602 drop(metadata);
603 self.flush().await?;
604 Ok(true)
605 }
606
607 pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
608 let mut metadata = self.metadata.write().await;
609 let meta = metadata
610 .entry(id.to_string())
611 .or_insert_with(SessionMeta::default);
612 meta.summary = Some(summary);
613 drop(metadata);
614 self.flush().await?;
615 Ok(true)
616 }
617
618 pub async fn children(&self, parent_id: &str) -> Vec<Session> {
619 let child_ids = {
620 let metadata = self.metadata.read().await;
621 metadata
622 .iter()
623 .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
624 .map(|(id, _)| id.clone())
625 .collect::<Vec<_>>()
626 };
627 let sessions = self.sessions.read().await;
628 child_ids
629 .into_iter()
630 .filter_map(|id| sessions.get(&id).cloned())
631 .collect()
632 }
633
634 pub async fn session_status(&self, id: &str) -> Option<Value> {
635 let metadata = self.metadata.read().await;
636 metadata.get(id).map(|meta| {
637 json!({
638 "archived": meta.archived,
639 "shared": meta.shared,
640 "parentID": meta.parent_id,
641 "snapshotCount": meta.snapshots.len()
642 })
643 })
644 }
645
646 pub async fn session_diff(&self, id: &str) -> Option<Value> {
647 let sessions = self.sessions.read().await;
648 let current = sessions.get(id)?;
649 let metadata = self.metadata.read().await;
650 let default = SessionMeta::default();
651 let meta = metadata.get(id).unwrap_or(&default);
652 let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
653 Some(json!({
654 "sessionID": id,
655 "currentMessageCount": current.messages.len(),
656 "lastSnapshotMessageCount": last_snapshot_len,
657 "delta": current.messages.len() as i64 - last_snapshot_len as i64
658 }))
659 }
660
661 pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
662 let mut metadata = self.metadata.write().await;
663 let meta = metadata
664 .entry(id.to_string())
665 .or_insert_with(SessionMeta::default);
666 meta.todos = normalize_todo_items(todos);
667 drop(metadata);
668 self.flush().await
669 }
670
671 pub async fn get_todos(&self, id: &str) -> Vec<Value> {
672 let todos = self
673 .metadata
674 .read()
675 .await
676 .get(id)
677 .map(|meta| meta.todos.clone())
678 .unwrap_or_default();
679 normalize_todo_items(todos)
680 }
681
682 pub async fn add_question_request(
683 &self,
684 session_id: &str,
685 message_id: &str,
686 questions: Vec<Value>,
687 ) -> anyhow::Result<QuestionRequest> {
688 if questions.is_empty() {
689 return Err(anyhow::anyhow!(
690 "cannot add empty question request for session {}",
691 session_id
692 ));
693 }
694 let request = QuestionRequest {
695 id: format!("q-{}", Uuid::new_v4()),
696 session_id: session_id.to_string(),
697 questions,
698 tool: Some(QuestionToolRef {
699 call_id: format!("call-{}", Uuid::new_v4()),
700 message_id: message_id.to_string(),
701 }),
702 };
703 self.question_requests
704 .write()
705 .await
706 .insert(request.id.clone(), request.clone());
707 self.flush().await?;
708 Ok(request)
709 }
710
711 pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
712 self.question_requests
713 .read()
714 .await
715 .values()
716 .cloned()
717 .collect()
718 }
719
720 pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
721 let removed = self
722 .question_requests
723 .write()
724 .await
725 .remove(request_id)
726 .is_some();
727 if removed {
728 self.flush().await?;
729 }
730 Ok(removed)
731 }
732
733 pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
734 self.reply_question(request_id).await
735 }
736
737 pub async fn attach_session_to_workspace(
738 &self,
739 session_id: &str,
740 target_workspace: &str,
741 reason_tag: &str,
742 ) -> anyhow::Result<Option<Session>> {
743 let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
744 return Ok(None);
745 };
746 let mut sessions = self.sessions.write().await;
747 let Some(session) = sessions.get_mut(session_id) else {
748 return Ok(None);
749 };
750 let previous_workspace = session
751 .workspace_root
752 .clone()
753 .or_else(|| normalize_workspace_path(&session.directory));
754
755 if session.origin_workspace_root.is_none() {
756 session.origin_workspace_root = previous_workspace.clone();
757 }
758 session.attached_from_workspace = previous_workspace;
759 session.attached_to_workspace = Some(target_workspace.clone());
760 session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
761 session.attach_reason = Some(reason_tag.trim().to_string());
762 session.workspace_root = Some(target_workspace.clone());
763 session.project_id = workspace_project_id(&target_workspace);
764 session.directory = target_workspace;
765 session.time.updated = Utc::now();
766 let updated = session.clone();
767 drop(sessions);
768 self.flush().await?;
769 Ok(Some(updated))
770 }
771
772 async fn flush(&self) -> anyhow::Result<()> {
773 let snapshot = self.sessions.read().await.clone();
774 let payload = serde_json::to_string_pretty(&snapshot)?;
775 fs::write(self.base.join("sessions.json"), payload).await?;
776 let metadata_snapshot = self.metadata.read().await.clone();
777 let metadata_payload = serde_json::to_string_pretty(&metadata_snapshot)?;
778 fs::write(self.base.join("session_meta.json"), metadata_payload).await?;
779 let questions_snapshot = self.question_requests.read().await.clone();
780 let questions_payload = serde_json::to_string_pretty(&questions_snapshot)?;
781 fs::write(self.base.join("questions.json"), questions_payload).await?;
782 Ok(())
783 }
784
785 async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
786 let payload = serde_json::to_string_pretty(marker)?;
787 fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
788 Ok(())
789 }
790}
791
792fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
793 items
794 .into_iter()
795 .filter_map(|item| {
796 let obj = item.as_object()?;
797 let content = obj
798 .get("content")
799 .and_then(|v| v.as_str())
800 .or_else(|| obj.get("text").and_then(|v| v.as_str()))
801 .unwrap_or("")
802 .trim()
803 .to_string();
804 if content.is_empty() {
805 return None;
806 }
807 let id = obj
808 .get("id")
809 .and_then(|v| v.as_str())
810 .filter(|s| !s.trim().is_empty())
811 .map(ToString::to_string)
812 .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
813 let status = obj
814 .get("status")
815 .and_then(|v| v.as_str())
816 .filter(|s| !s.trim().is_empty())
817 .map(ToString::to_string)
818 .unwrap_or_else(|| "pending".to_string());
819 Some(json!({
820 "id": id,
821 "content": content,
822 "status": status
823 }))
824 })
825 .collect()
826}
827
/// Everything a legacy scan produces: the parsed sessions plus both count sets.
#[derive(Debug)]
struct LegacyScanResult {
    /// Sessions parsed from the legacy tree, keyed by session id.
    sessions: HashMap<String, Session>,
    /// `.json` file counts found in the legacy directories.
    legacy_counts: LegacyTreeCounts,
    /// Counts of sessions, messages and parts contained in `sessions`.
    imported_counts: LegacyImportedCounts,
}
834
/// Result of merging scanned legacy sessions into the current session map.
#[derive(Debug, Default)]
struct LegacyMergeStats {
    /// True when any session was inserted or any field of an existing one was filled.
    changed: bool,
    /// Sessions that were inserted or had their (empty) message list replaced.
    sessions_merged: u64,
    /// Messages contained in those merged sessions.
    messages_recovered: u64,
    /// Message parts contained in those merged sessions.
    parts_recovered: u64,
}
842
843fn now_ms_u64() -> u64 {
844 Utc::now().timestamp_millis().max(0) as u64
845}
846
847async fn should_run_legacy_scan_on_startup(marker_path: &Path, sessions_exist: bool) -> bool {
848 if !sessions_exist {
849 return true;
850 }
851 if read_legacy_import_marker(marker_path).await.is_none() {
854 return false;
855 }
856 false
857}
858
859async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
860 let raw = fs::read_to_string(marker_path).await.ok()?;
861 serde_json::from_str::<LegacyImportMarker>(&raw).ok()
862}
863
864fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
865 let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
866 let imported_counts = LegacyImportedCounts {
867 sessions: sessions.len() as u64,
868 messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
869 parts: sessions
870 .values()
871 .flat_map(|s| s.messages.iter())
872 .map(|m| m.parts.len() as u64)
873 .sum(),
874 };
875 let legacy_counts = LegacyTreeCounts {
876 session_files: count_legacy_json_files(&base.join("session")),
877 message_files: count_legacy_json_files(&base.join("message")),
878 part_files: count_legacy_json_files(&base.join("part")),
879 };
880 Ok(LegacyScanResult {
881 sessions,
882 legacy_counts,
883 imported_counts,
884 })
885}
886
/// Counts files with the extension `json` anywhere below `root`.
///
/// Returns 0 when `root` is not a directory. Directories that cannot be listed
/// and entries that cannot be read are skipped. The extension comparison is
/// case-sensitive.
fn count_legacy_json_files(root: &Path) -> u64 {
    if !root.is_dir() {
        return 0;
    }

    let mut pending = vec![root.to_path_buf()];
    let mut total = 0u64;
    while let Some(current) = pending.pop() {
        let Ok(listing) = std::fs::read_dir(&current) else {
            continue;
        };
        for item in listing.flatten() {
            let is_directory = matches!(item.file_type(), Ok(kind) if kind.is_dir());
            let item_path = item.path();
            if is_directory {
                pending.push(item_path);
            } else if item_path.extension().and_then(|ext| ext.to_str()) == Some("json") {
                total += 1;
            }
        }
    }
    total
}
909
910fn merge_legacy_sessions(
911 current: &mut HashMap<String, Session>,
912 imported: HashMap<String, Session>,
913) -> bool {
914 merge_legacy_sessions_with_stats(current, imported).changed
915}
916
917fn merge_legacy_sessions_with_stats(
918 current: &mut HashMap<String, Session>,
919 imported: HashMap<String, Session>,
920) -> LegacyMergeStats {
921 let mut stats = LegacyMergeStats::default();
922 for (id, legacy) in imported {
923 let legacy_message_count = legacy.messages.len() as u64;
924 let legacy_part_count = legacy
925 .messages
926 .iter()
927 .map(|m| m.parts.len() as u64)
928 .sum::<u64>();
929 match current.get_mut(&id) {
930 None => {
931 current.insert(id, legacy);
932 stats.changed = true;
933 stats.sessions_merged += 1;
934 stats.messages_recovered += legacy_message_count;
935 stats.parts_recovered += legacy_part_count;
936 }
937 Some(existing) => {
938 let should_merge_messages =
939 existing.messages.is_empty() && !legacy.messages.is_empty();
940 let should_fill_title =
941 existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
942 let should_fill_directory = (existing.directory.trim().is_empty()
943 || existing.directory.trim() == "."
944 || existing.directory.trim() == "./"
945 || existing.directory.trim() == ".\\")
946 && !legacy.directory.trim().is_empty();
947 let should_fill_workspace =
948 existing.workspace_root.is_none() && legacy.workspace_root.is_some();
949 if should_merge_messages {
950 existing.messages = legacy.messages.clone();
951 }
952 if should_fill_title {
953 existing.title = legacy.title.clone();
954 }
955 if should_fill_directory {
956 existing.directory = legacy.directory.clone();
957 }
958 if should_fill_workspace {
959 existing.workspace_root = legacy.workspace_root.clone();
960 }
961 if should_merge_messages
962 || should_fill_title
963 || should_fill_directory
964 || should_fill_workspace
965 {
966 stats.changed = true;
967 if should_merge_messages {
968 stats.sessions_merged += 1;
969 stats.messages_recovered += legacy_message_count;
970 stats.parts_recovered += legacy_part_count;
971 }
972 }
973 }
974 }
975 }
976 stats
977}
978
979fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
980 let mut changed = false;
981 for session in sessions.values_mut() {
982 if session.workspace_root.is_none() {
983 let normalized = normalize_workspace_path(&session.directory);
984 if normalized.is_some() {
985 session.workspace_root = normalized;
986 changed = true;
987 }
988 }
989 }
990 changed
991}
992
993fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
994 let mut changed = false;
995 for session in sessions.values_mut() {
996 if !title_needs_repair(&session.title) {
997 continue;
998 }
999 let first_user_text = session.messages.iter().find_map(|message| {
1000 if !matches!(message.role, MessageRole::User) {
1001 return None;
1002 }
1003 message.parts.iter().find_map(|part| match part {
1004 MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
1005 _ => None,
1006 })
1007 });
1008 let Some(source) = first_user_text else {
1009 continue;
1010 };
1011 let Some(derived) = derive_session_title_from_prompt(source, 60) else {
1012 continue;
1013 };
1014 if derived == session.title {
1015 continue;
1016 }
1017 session.title = derived;
1018 session.time.updated = Utc::now();
1019 changed = true;
1020 }
1021 changed
1022}
1023
/// Timestamps of a legacy session file; the loader interprets both as
/// milliseconds since the Unix epoch.
#[derive(Debug, Deserialize)]
struct LegacySessionTime {
    /// Creation time in milliseconds.
    created: i64,
    /// Last-update time in milliseconds.
    updated: i64,
}
1029
/// Shape of a legacy session JSON file found under `<base>/session`.
#[derive(Debug, Deserialize)]
struct LegacySession {
    /// Session id; also used to locate the session's legacy message directory.
    id: String,
    /// Optional slug, copied to the imported session unchanged.
    slug: Option<String>,
    /// Optional version string, copied to the imported session unchanged.
    version: Option<String>,
    /// Optional project id; read from the JSON key `projectID`.
    #[serde(rename = "projectID")]
    project_id: Option<String>,
    /// Optional title; a missing or blank title becomes `"New session"` on import.
    title: Option<String>,
    /// Optional directory; a missing or blank directory becomes `"."` on import.
    directory: Option<String>,
    /// Creation and update timestamps.
    time: LegacySessionTime,
}
1041
1042fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
1043 let legacy_root = base.join("session");
1044 if !legacy_root.is_dir() {
1045 return Ok(HashMap::new());
1046 }
1047
1048 let mut out = HashMap::new();
1049 let mut stack = vec![legacy_root];
1050 while let Some(dir) = stack.pop() {
1051 for entry in std::fs::read_dir(&dir)? {
1052 let entry = entry?;
1053 let path = entry.path();
1054 if entry.file_type()?.is_dir() {
1055 stack.push(path);
1056 continue;
1057 }
1058 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1059 continue;
1060 }
1061 let raw = match std::fs::read_to_string(&path) {
1062 Ok(v) => v,
1063 Err(_) => continue,
1064 };
1065 let legacy = match serde_json::from_str::<LegacySession>(&raw) {
1066 Ok(v) => v,
1067 Err(_) => continue,
1068 };
1069 let created = Utc
1070 .timestamp_millis_opt(legacy.time.created)
1071 .single()
1072 .unwrap_or_else(Utc::now);
1073 let updated = Utc
1074 .timestamp_millis_opt(legacy.time.updated)
1075 .single()
1076 .unwrap_or(created);
1077
1078 let session_id = legacy.id.clone();
1079 out.insert(
1080 session_id.clone(),
1081 Session {
1082 id: session_id.clone(),
1083 slug: legacy.slug,
1084 version: legacy.version,
1085 project_id: legacy.project_id,
1086 title: legacy
1087 .title
1088 .filter(|s| !s.trim().is_empty())
1089 .unwrap_or_else(|| "New session".to_string()),
1090 directory: legacy
1091 .directory
1092 .clone()
1093 .filter(|s| !s.trim().is_empty())
1094 .unwrap_or_else(|| ".".to_string()),
1095 workspace_root: legacy
1096 .directory
1097 .as_deref()
1098 .and_then(normalize_workspace_path),
1099 origin_workspace_root: None,
1100 attached_from_workspace: None,
1101 attached_to_workspace: None,
1102 attach_timestamp_ms: None,
1103 attach_reason: None,
1104 time: tandem_types::SessionTime { created, updated },
1105 model: None,
1106 provider: None,
1107 environment: None,
1108 messages: load_legacy_session_messages(base, &session_id),
1109 },
1110 );
1111 }
1112 }
1113 Ok(out)
1114}
1115
/// Timestamp block of a legacy message record (the `time` object in the
/// message JSON).
#[derive(Debug, Deserialize)]
struct LegacyMessageTime {
    /// Creation time; interpreted as milliseconds since the Unix epoch when
    /// the message is imported.
    created: i64,
}
1120
/// Subset of a legacy message JSON file that the importer reads.
#[derive(Debug, Deserialize)]
struct LegacyMessage {
    /// Message id; also used as the directory name when looking up the
    /// message's parts under `<base>/part/<id>`.
    id: String,
    /// Role name as stored in the legacy file; mapped to a `MessageRole` by
    /// `legacy_role_to_message_role`.
    role: String,
    /// Creation timestamp, used for the imported message's `created_at` and
    /// for ordering messages within a session.
    time: LegacyMessageTime,
}
1127
/// Subset of a legacy message-part JSON file that the importer reads.
///
/// Every field is optional; `load_legacy_message_parts` decides which
/// `MessagePart` variant to build from whichever fields are present.
#[derive(Debug, Deserialize)]
struct LegacyPart {
    /// Part kind, read from the JSON key `type`. The importer recognizes
    /// `"text"`, `"reasoning"` and `"tool"`; a missing value is treated as text.
    #[serde(rename = "type")]
    part_type: Option<String>,
    /// Text payload for text and reasoning parts.
    text: Option<String>,
    /// Tool name. When present the part is imported as a tool invocation,
    /// regardless of `part_type`.
    tool: Option<String>,
    /// Tool arguments; the importer substitutes an empty JSON object when absent.
    args: Option<Value>,
    /// Tool result, if one was recorded.
    result: Option<Value>,
    /// Tool error message, if one was recorded.
    error: Option<String>,
}
1138
1139fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
1140 let msg_dir = base.join("message").join(session_id);
1141 if !msg_dir.is_dir() {
1142 return Vec::new();
1143 }
1144
1145 let mut legacy_messages: Vec<(i64, Message)> = Vec::new();
1146
1147 let Ok(entries) = std::fs::read_dir(&msg_dir) else {
1148 return Vec::new();
1149 };
1150
1151 for entry in entries.flatten() {
1152 let path = entry.path();
1153 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1154 continue;
1155 }
1156 let Ok(raw) = std::fs::read_to_string(&path) else {
1157 continue;
1158 };
1159 let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
1160 continue;
1161 };
1162
1163 let created_at = Utc
1164 .timestamp_millis_opt(legacy.time.created)
1165 .single()
1166 .unwrap_or_else(Utc::now);
1167
1168 legacy_messages.push((
1169 legacy.time.created,
1170 Message {
1171 id: legacy.id.clone(),
1172 role: legacy_role_to_message_role(&legacy.role),
1173 parts: load_legacy_message_parts(base, &legacy.id),
1174 created_at,
1175 },
1176 ));
1177 }
1178
1179 legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
1180 legacy_messages.into_iter().map(|(_, msg)| msg).collect()
1181}
1182
1183fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
1184 let parts_dir = base.join("part").join(message_id);
1185 if !parts_dir.is_dir() {
1186 return Vec::new();
1187 }
1188
1189 let Ok(entries) = std::fs::read_dir(&parts_dir) else {
1190 return Vec::new();
1191 };
1192
1193 let mut out = Vec::new();
1194 for entry in entries.flatten() {
1195 let path = entry.path();
1196 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1197 continue;
1198 }
1199 let Ok(raw) = std::fs::read_to_string(&path) else {
1200 continue;
1201 };
1202 let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
1203 continue;
1204 };
1205
1206 let mapped = if let Some(tool) = part.tool {
1207 Some(MessagePart::ToolInvocation {
1208 tool,
1209 args: part.args.unwrap_or_else(|| json!({})),
1210 result: part.result,
1211 error: part.error,
1212 })
1213 } else {
1214 match part.part_type.as_deref() {
1215 Some("reasoning") => Some(MessagePart::Reasoning {
1216 text: part.text.unwrap_or_default(),
1217 }),
1218 Some("tool") => Some(MessagePart::ToolInvocation {
1219 tool: "tool".to_string(),
1220 args: part.args.unwrap_or_else(|| json!({})),
1221 result: part.result,
1222 error: part.error,
1223 }),
1224 Some("text") | None => Some(MessagePart::Text {
1225 text: part.text.unwrap_or_default(),
1226 }),
1227 _ => None,
1228 }
1229 };
1230
1231 if let Some(part) = mapped {
1232 out.push(part);
1233 }
1234 }
1235 out
1236}
1237
1238fn legacy_role_to_message_role(role: &str) -> MessageRole {
1239 match role.to_lowercase().as_str() {
1240 "user" => MessageRole::User,
1241 "assistant" => MessageRole::Assistant,
1242 "system" => MessageRole::System,
1243 "tool" => MessageRole::Tool,
1244 _ => MessageRole::Assistant,
1245 }
1246}
1247
/// Counters describing what `merge_session_messages` did during one merge.
#[derive(Debug, Clone, Default)]
struct MessageMergeStats {
    /// Imported messages whose id was not present in the existing list.
    messages_recovered: u64,
    /// Total number of parts carried by those newly added messages.
    parts_recovered: u64,
    /// Existing messages that were replaced by a richer imported message with
    /// the same id.
    conflicts_merged: u64,
}
1254
1255fn message_richness(msg: &Message) -> usize {
1256 msg.parts
1257 .iter()
1258 .map(|p| match p {
1259 MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1260 if text.trim().is_empty() {
1261 0
1262 } else {
1263 1
1264 }
1265 }
1266 MessagePart::ToolInvocation { result, error, .. } => {
1267 if result.is_some() || error.is_some() {
1268 2
1269 } else {
1270 1
1271 }
1272 }
1273 })
1274 .sum()
1275}
1276
1277fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1278 messages.iter().map(|m| m.created_at).max()
1279}
1280
1281fn merge_session_messages(
1282 existing: &[Message],
1283 imported: &[Message],
1284) -> (Vec<Message>, MessageMergeStats, bool) {
1285 if existing.is_empty() {
1286 let messages_recovered = imported.len() as u64;
1287 let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1288 return (
1289 imported.to_vec(),
1290 MessageMergeStats {
1291 messages_recovered,
1292 parts_recovered,
1293 conflicts_merged: 0,
1294 },
1295 true,
1296 );
1297 }
1298
1299 let mut merged_by_id: HashMap<String, Message> = existing
1300 .iter()
1301 .cloned()
1302 .map(|m| (m.id.clone(), m))
1303 .collect();
1304 let mut stats = MessageMergeStats::default();
1305 let mut changed = false;
1306
1307 for incoming in imported {
1308 match merged_by_id.get(&incoming.id) {
1309 None => {
1310 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1311 stats.messages_recovered += 1;
1312 stats.parts_recovered += incoming.parts.len() as u64;
1313 changed = true;
1314 }
1315 Some(current) => {
1316 let incoming_richer = message_richness(incoming) > message_richness(current)
1317 || incoming.parts.len() > current.parts.len();
1318 if incoming_richer {
1319 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1320 stats.conflicts_merged += 1;
1321 changed = true;
1322 }
1323 }
1324 }
1325 }
1326
1327 let mut out: Vec<Message> = merged_by_id.into_values().collect();
1328 out.sort_by_key(|m| m.created_at);
1329 (out, stats, changed)
1330}
1331
// Integration-style tests for `Storage`. Each test works in its own uniquely
// named directory under the system temp dir (a fresh UUID per test).
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs as stdfs;

    // Todos stored in mixed shapes must all come back with string `id`,
    // `content` and `status` fields.
    #[tokio::test]
    async fn todos_are_normalized_to_wire_shape() {
        let base = std::env::temp_dir().join(format!("tandem-core-test-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let session = Session::new(Some("test".to_string()), Some(".".to_string()));
        let id = session.id.clone();
        storage.save_session(session).await.expect("save session");

        storage
            .set_todos(
                &id,
                vec![
                    json!({"content":"first"}),
                    json!({"text":"second", "status":"in_progress"}),
                    json!({"id":"keep-id","content":"third","status":"completed"}),
                ],
            )
            .await
            .expect("set todos");

        let todos = storage.get_todos(&id).await;
        assert_eq!(todos.len(), 3);
        for todo in todos {
            assert!(todo.get("id").and_then(|v| v.as_str()).is_some());
            assert!(todo.get("content").and_then(|v| v.as_str()).is_some());
            assert!(todo.get("status").and_then(|v| v.as_str()).is_some());
        }
    }

    // With no `sessions.json` present, a legacy session file under
    // `session/global` is imported on startup and `sessions.json` is written.
    #[tokio::test]
    async fn imports_legacy_opencode_session_index_when_sessions_json_missing() {
        let base =
            std::env::temp_dir().join(format!("tandem-core-legacy-import-{}", Uuid::new_v4()));
        let legacy_session_dir = base.join("session").join("global");
        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
        stdfs::write(
            legacy_session_dir.join("ses_test.json"),
            r#"{
  "id": "ses_test",
  "slug": "test",
  "version": "1.0.0",
  "projectID": "proj_1",
  "directory": "C:\\work\\demo",
  "title": "Legacy Session",
  "time": { "created": 1770913145613, "updated": 1770913146613 }
}"#,
        )
        .expect("legacy session write");

        let storage = Storage::new(&base).await.expect("storage");
        let sessions = storage.list_sessions().await;
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].id, "ses_test");
        assert_eq!(sessions[0].title, "Legacy Session");
        assert!(base.join("sessions.json").exists());
    }

    // A legacy session's message (under `message/<session id>`) and that
    // message's part (under `part/<message id>`) are imported with it.
    #[tokio::test]
    async fn imports_legacy_messages_and_parts_for_session() {
        let base = std::env::temp_dir().join(format!("tandem-core-legacy-msg-{}", Uuid::new_v4()));
        let session_dir = base.join("session").join("global");
        let message_dir = base.join("message").join("ses_test");
        let part_dir = base.join("part").join("msg_1");
        stdfs::create_dir_all(&session_dir).expect("session dir");
        stdfs::create_dir_all(&message_dir).expect("message dir");
        stdfs::create_dir_all(&part_dir).expect("part dir");

        stdfs::write(
            session_dir.join("ses_test.json"),
            r#"{
  "id": "ses_test",
  "projectID": "proj_1",
  "directory": "C:\\work\\demo",
  "title": "Legacy Session",
  "time": { "created": 1770913145613, "updated": 1770913146613 }
}"#,
        )
        .expect("write session");

        stdfs::write(
            message_dir.join("msg_1.json"),
            r#"{
  "id": "msg_1",
  "sessionID": "ses_test",
  "role": "assistant",
  "time": { "created": 1770913145613 }
}"#,
        )
        .expect("write msg");

        stdfs::write(
            part_dir.join("prt_1.json"),
            r#"{
  "id": "prt_1",
  "sessionID": "ses_test",
  "messageID": "msg_1",
  "type": "text",
  "text": "hello from legacy"
}"#,
        )
        .expect("write part");

        let storage = Storage::new(&base).await.expect("storage");
        let sessions = storage.list_sessions().await;
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].messages.len(), 1);
        assert_eq!(sessions[0].messages[0].parts.len(), 1);
    }

    // When `sessions.json` already exists, legacy session files on disk must
    // not show up in the session list.
    #[tokio::test]
    async fn skips_legacy_merge_when_sessions_json_exists() {
        let base =
            std::env::temp_dir().join(format!("tandem-core-legacy-merge-{}", Uuid::new_v4()));
        stdfs::create_dir_all(&base).expect("base");
        stdfs::write(
            base.join("sessions.json"),
            r#"{
  "ses_current": {
    "id": "ses_current",
    "slug": null,
    "version": "v1",
    "project_id": null,
    "title": "Current Session",
    "directory": ".",
    "workspace_root": null,
    "origin_workspace_root": null,
    "attached_from_workspace": null,
    "attached_to_workspace": null,
    "attach_timestamp_ms": null,
    "attach_reason": null,
    "time": {"created":"2026-01-01T00:00:00Z","updated":"2026-01-01T00:00:00Z"},
    "model": null,
    "provider": null,
    "messages": []
  }
}"#,
        )
        .expect("sessions.json");

        let legacy_session_dir = base.join("session").join("global");
        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
        stdfs::write(
            legacy_session_dir.join("ses_legacy.json"),
            r#"{
  "id": "ses_legacy",
  "slug": "legacy",
  "version": "1.0.0",
  "projectID": "proj_legacy",
  "directory": "C:\\work\\legacy",
  "title": "Legacy Session",
  "time": { "created": 1770913145613, "updated": 1770913146613 }
}"#,
        )
        .expect("legacy session write");

        let storage = Storage::new(&base).await.expect("storage");
        let sessions = storage.list_sessions().await;
        let ids = sessions.iter().map(|s| s.id.clone()).collect::<Vec<_>>();
        assert!(ids.contains(&"ses_current".to_string()));
        assert!(!ids.contains(&"ses_legacy".to_string()));
    }

    // A workspace-scoped listing returns only the session whose workspace
    // root matches the requested one.
    #[tokio::test]
    async fn list_sessions_scoped_filters_by_workspace_root() {
        let base = std::env::temp_dir().join(format!("tandem-core-scope-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let ws_a = base.join("ws-a");
        let ws_b = base.join("ws-b");
        stdfs::create_dir_all(&ws_a).expect("ws_a");
        stdfs::create_dir_all(&ws_b).expect("ws_b");
        let ws_a_str = ws_a.to_string_lossy().to_string();
        let ws_b_str = ws_b.to_string_lossy().to_string();

        let mut a = Session::new(Some("a".to_string()), Some(ws_a_str.clone()));
        a.workspace_root = Some(ws_a_str.clone());
        storage.save_session(a).await.expect("save a");

        let mut b = Session::new(Some("b".to_string()), Some(ws_b_str.clone()));
        b.workspace_root = Some(ws_b_str);
        storage.save_session(b).await.expect("save b");

        let scoped = storage
            .list_sessions_scoped(SessionListScope::Workspace {
                workspace_root: ws_a_str,
            })
            .await;
        assert_eq!(scoped.len(), 1);
        assert_eq!(scoped[0].title, "a");
    }

    // Attaching a session to another workspace updates its workspace root and
    // records the target workspace, the reason and a timestamp.
    #[tokio::test]
    async fn attach_session_persists_audit_metadata() {
        let base = std::env::temp_dir().join(format!("tandem-core-attach-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let ws_a = base.join("ws-a");
        let ws_b = base.join("ws-b");
        stdfs::create_dir_all(&ws_a).expect("ws_a");
        stdfs::create_dir_all(&ws_b).expect("ws_b");
        let ws_a_str = ws_a.to_string_lossy().to_string();
        let ws_b_str = ws_b.to_string_lossy().to_string();
        let mut session = Session::new(Some("s".to_string()), Some(ws_a_str.clone()));
        session.workspace_root = Some(ws_a_str);
        let id = session.id.clone();
        storage.save_session(session).await.expect("save");

        let updated = storage
            .attach_session_to_workspace(&id, &ws_b_str, "manual")
            .await
            .expect("attach")
            .expect("session exists");
        // The stored paths are expected in normalized form.
        let normalized_expected = normalize_workspace_path(&ws_b_str).expect("normalized path");
        assert_eq!(
            updated.workspace_root.as_deref(),
            Some(normalized_expected.as_str())
        );
        assert_eq!(
            updated.attached_to_workspace.as_deref(),
            Some(normalized_expected.as_str())
        );
        assert_eq!(updated.attach_reason.as_deref(), Some("manual"));
        assert!(updated.attach_timestamp_ms.is_some());
    }

    // Appending a tool invocation and then its result for the same tool leaves
    // the message with two parts (text + one tool part) carrying the result.
    #[tokio::test]
    async fn append_message_part_persists_tool_invocation_and_result() {
        let base = std::env::temp_dir().join(format!("tandem-core-tool-parts-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let session = Session::new(Some("tool parts".to_string()), Some(".".to_string()));
        let session_id = session.id.clone();
        storage.save_session(session).await.expect("save session");

        let user = Message::new(
            MessageRole::User,
            vec![MessagePart::Text {
                text: "build ui".to_string(),
            }],
        );
        let message_id = user.id.clone();
        storage
            .append_message(&session_id, user)
            .await
            .expect("append user");

        storage
            .append_message_part(
                &session_id,
                &message_id,
                MessagePart::ToolInvocation {
                    tool: "write".to_string(),
                    args: json!({"path":"game.html","content":"<html></html>"}),
                    result: None,
                    error: None,
                },
            )
            .await
            .expect("append invocation");
        storage
            .append_message_part(
                &session_id,
                &message_id,
                MessagePart::ToolInvocation {
                    tool: "write".to_string(),
                    args: json!({}),
                    result: Some(json!("ok")),
                    error: None,
                },
            )
            .await
            .expect("append result");

        let session = storage.get_session(&session_id).await.expect("session");
        let message = session
            .messages
            .iter()
            .find(|message| message.id == message_id)
            .expect("message");
        assert_eq!(message.parts.len(), 2);
        match &message.parts[1] {
            MessagePart::ToolInvocation {
                tool,
                result,
                error,
                ..
            } => {
                assert_eq!(tool, "write");
                assert_eq!(result.as_ref(), Some(&json!("ok")));
                assert_eq!(error.as_deref(), None);
            }
            other => panic!("expected tool part, got {other:?}"),
        }
    }

    // A tool error appended after the invocation is kept on the tool part.
    #[tokio::test]
    async fn append_message_part_retains_failed_tool_error() {
        let base = std::env::temp_dir().join(format!("tandem-core-tool-error-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let session = Session::new(Some("tool errors".to_string()), Some(".".to_string()));
        let session_id = session.id.clone();
        storage.save_session(session).await.expect("save session");

        let user = Message::new(
            MessageRole::User,
            vec![MessagePart::Text {
                text: "write file".to_string(),
            }],
        );
        let message_id = user.id.clone();
        storage
            .append_message(&session_id, user)
            .await
            .expect("append user");

        storage
            .append_message_part(
                &session_id,
                &message_id,
                MessagePart::ToolInvocation {
                    tool: "write".to_string(),
                    args: json!({"path":"game.html"}),
                    result: None,
                    error: None,
                },
            )
            .await
            .expect("append invocation");
        storage
            .append_message_part(
                &session_id,
                &message_id,
                MessagePart::ToolInvocation {
                    tool: "write".to_string(),
                    args: json!({}),
                    result: None,
                    error: Some("WRITE_CONTENT_MISSING".to_string()),
                },
            )
            .await
            .expect("append error");

        let session = storage.get_session(&session_id).await.expect("session");
        let message = session
            .messages
            .iter()
            .find(|message| message.id == message_id)
            .expect("message");
        match &message.parts[1] {
            MessagePart::ToolInvocation { error, .. } => {
                assert_eq!(error.as_deref(), Some("WRITE_CONTENT_MISSING"));
            }
            other => panic!("expected tool part, got {other:?}"),
        }
    }

    // Two successive invocation updates for the same tool (no result yet) end
    // up as a single tool part holding the later, fuller arguments.
    #[tokio::test]
    async fn append_message_part_coalesces_repeated_tool_invocation_updates() {
        let base = std::env::temp_dir().join(format!("tandem-core-tool-merge-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let session = Session::new(Some("tool merge".to_string()), Some(".".to_string()));
        let session_id = session.id.clone();
        storage.save_session(session).await.expect("save session");

        let user = Message::new(
            MessageRole::User,
            vec![MessagePart::Text {
                text: "build ui".to_string(),
            }],
        );
        let message_id = user.id.clone();
        storage
            .append_message(&session_id, user)
            .await
            .expect("append user");

        storage
            .append_message_part(
                &session_id,
                &message_id,
                MessagePart::ToolInvocation {
                    tool: "write".to_string(),
                    args: json!({"path":"game.html"}),
                    result: None,
                    error: None,
                },
            )
            .await
            .expect("append first invocation");
        storage
            .append_message_part(
                &session_id,
                &message_id,
                MessagePart::ToolInvocation {
                    tool: "write".to_string(),
                    args: json!({"path":"game.html","content":"<html></html>"}),
                    result: None,
                    error: None,
                },
            )
            .await
            .expect("append updated invocation");

        let session = storage.get_session(&session_id).await.expect("session");
        let message = session
            .messages
            .iter()
            .find(|message| message.id == message_id)
            .expect("message");
        assert_eq!(message.parts.len(), 2);
        match &message.parts[1] {
            MessagePart::ToolInvocation { tool, args, .. } => {
                assert_eq!(tool, "write");
                assert_eq!(args["path"], "game.html");
                assert_eq!(args["content"], "<html></html>");
            }
            other => panic!("expected tool part, got {other:?}"),
        }
    }

    // A part addressed to an unknown message id is attached to the most
    // recently appended user message instead.
    #[tokio::test]
    async fn append_message_part_falls_back_to_latest_user_message_when_id_missing() {
        let base =
            std::env::temp_dir().join(format!("tandem-core-tool-fallback-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let session = Session::new(Some("tool fallback".to_string()), Some(".".to_string()));
        let session_id = session.id.clone();
        storage.save_session(session).await.expect("save session");

        let first = Message::new(
            MessageRole::User,
            vec![MessagePart::Text {
                text: "first prompt".to_string(),
            }],
        );
        let second = Message::new(
            MessageRole::User,
            vec![MessagePart::Text {
                text: "second prompt".to_string(),
            }],
        );
        let second_id = second.id.clone();
        storage
            .append_message(&session_id, first)
            .await
            .expect("append first");
        storage
            .append_message(&session_id, second)
            .await
            .expect("append second");

        storage
            .append_message_part(
                &session_id,
                "missing-message-id",
                MessagePart::ToolInvocation {
                    tool: "glob".to_string(),
                    args: json!({"pattern":"*"}),
                    result: Some(json!(["README.md"])),
                    error: None,
                },
            )
            .await
            .expect("append fallback tool part");

        let session = storage.get_session(&session_id).await.expect("session");
        let message = session
            .messages
            .iter()
            .find(|message| message.id == second_id)
            .expect("latest user message");
        match &message.parts[1] {
            MessagePart::ToolInvocation { tool, result, .. } => {
                assert_eq!(tool, "glob");
                assert_eq!(result.as_ref(), Some(&json!(["README.md"])));
            }
            other => panic!("expected tool part, got {other:?}"),
        }
    }

    // A session saved with a placeholder title ("<memory_context>") gets its
    // title rebuilt from the "[User request]" section of the first user
    // message when storage is reopened.
    #[tokio::test]
    async fn startup_repairs_placeholder_titles_from_wrapped_user_messages() {
        let base =
            std::env::temp_dir().join(format!("tandem-core-title-repair-{}", Uuid::new_v4()));
        let storage = Storage::new(&base).await.expect("storage");
        let wrapped = "<memory_context>\n<current_session>\n- fact\n</current_session>\n</memory_context>\n\n[Mode instructions]\nUse tools.\n\n[User request]\nExplain this bug";
        let mut session = Session::new(Some("<memory_context>".to_string()), Some(".".to_string()));
        let id = session.id.clone();
        session.messages.push(Message::new(
            MessageRole::User,
            vec![MessagePart::Text {
                text: wrapped.to_string(),
            }],
        ));
        storage.save_session(session).await.expect("save");
        // Drop the first instance so the second `Storage::new` reloads from disk.
        drop(storage);

        let storage = Storage::new(&base).await.expect("storage");
        let repaired = storage.get_session(&id).await.expect("session");
        assert_eq!(repaired.title, "Explain this bug");
    }
}