1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use anyhow::Context;
5use chrono::{TimeZone, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use tokio::fs;
9use tokio::sync::RwLock;
10use tokio::task;
11use uuid::Uuid;
12
13use tandem_types::{Message, MessagePart, MessageRole, Session};
14
15use crate::{
16 derive_session_title_from_prompt, normalize_workspace_path, title_needs_repair,
17 workspace_project_id,
18};
19
/// Auxiliary per-session state, stored in a map separate from the `Session`
/// records and written by `Storage::flush` to `session_meta.json`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SessionMeta {
    /// ID of the session this one was forked from (set by `Storage::fork_session`).
    pub parent_id: Option<String>,
    /// Archive flag; treated as `false` when missing from the persisted JSON.
    #[serde(default)]
    pub archived: bool,
    /// Share flag; treated as `false` when missing from the persisted JSON.
    #[serde(default)]
    pub shared: bool,
    /// Identifier generated when sharing is enabled and cleared when it is disabled.
    pub share_id: Option<String>,
    /// Optional summary text stored via `Storage::set_summary`.
    pub summary: Option<String>,
    /// Saved copies of the session's message list, oldest first; capped at
    /// `MAX_SESSION_SNAPSHOTS` by `trim_session_snapshots`.
    #[serde(default)]
    pub snapshots: Vec<Vec<Message>>,
    /// Message list that was active before the last revert, kept so that
    /// `Storage::unrevert_session` can restore it.
    pub pre_revert: Option<Vec<Message>>,
    /// Todo items, stored in the shape produced by `normalize_todo_items`.
    #[serde(default)]
    pub todos: Vec<Value>,
}
35
/// Reference linking a question request to a call ID and a message ID.
/// Serialized with the camel-case keys `callID` / `messageID`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuestionToolRef {
    /// Call identifier (serialized as `callID`).
    #[serde(rename = "callID")]
    pub call_id: String,
    /// Identifier of the associated message (serialized as `messageID`).
    #[serde(rename = "messageID")]
    pub message_id: String,
}
43
/// A pending question request, persisted by `Storage::flush` in `questions.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuestionRequest {
    /// Unique request identifier.
    pub id: String,
    /// Owning session (serialized as `sessionID`).
    #[serde(rename = "sessionID")]
    pub session_id: String,
    /// Opaque question payloads; defaults to an empty list when absent.
    #[serde(default)]
    pub questions: Vec<Value>,
    /// Optional tool reference; omitted from the JSON output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool: Option<QuestionToolRef>,
}
54
/// JSON-file backed store for sessions, session metadata and question requests.
///
/// All state is held in memory behind `tokio::sync::RwLock`s and written to
/// files under `base` by `flush`.
pub struct Storage {
    /// Directory that holds the persisted JSON files.
    base: PathBuf,
    /// Sessions keyed by session ID (`sessions.json`).
    sessions: RwLock<HashMap<String, Session>>,
    /// Per-session metadata keyed by session ID (`session_meta.json`).
    metadata: RwLock<HashMap<String, SessionMeta>>,
    /// Pending question requests keyed by request ID (`questions.json`).
    question_requests: RwLock<HashMap<String, QuestionRequest>>,
}
61
/// Selects which sessions `Storage::list_sessions_scoped` returns.
#[derive(Debug, Clone)]
pub enum SessionListScope {
    /// Every stored session.
    Global,
    /// Only sessions whose normalized workspace root or directory equals the
    /// normalized `workspace_root`.
    Workspace { workspace_root: String },
}
67
/// Counters returned by `Storage::repair_sessions_from_file_store`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionRepairStats {
    /// Number of sessions whose message list was changed by the repair.
    pub sessions_repaired: u64,
    /// Sum of the per-session `messages_recovered` merge counters.
    pub messages_recovered: u64,
    /// Sum of the per-session `parts_recovered` merge counters.
    pub parts_recovered: u64,
    /// Sum of the per-session `conflicts_merged` merge counters.
    pub conflicts_merged: u64,
}
75
/// File name (inside the storage base directory) of the legacy-import marker.
const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
/// Value written to `LegacyImportMarker::version`.
const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
/// Maximum number of message snapshots retained per session.
const MAX_SESSION_SNAPSHOTS: usize = 5;
79
/// Number of `.json` files found under the legacy `session`, `message` and
/// `part` directories (see `scan_legacy_sessions`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LegacyTreeCounts {
    /// `.json` files under `<base>/session`.
    pub session_files: u64,
    /// `.json` files under `<base>/message`.
    pub message_files: u64,
    /// `.json` files under `<base>/part`.
    pub part_files: u64,
}
86
/// Totals of what a legacy scan managed to load into memory.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LegacyImportedCounts {
    /// Number of sessions loaded from the legacy tree.
    pub sessions: u64,
    /// Total messages across those sessions.
    pub messages: u64,
    /// Total message parts across those messages.
    pub parts: u64,
}
93
/// Record written to `LEGACY_IMPORT_MARKER_FILE` after a legacy scan has run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LegacyImportMarker {
    /// Marker format version (`LEGACY_IMPORT_MARKER_VERSION` when written here).
    pub version: u32,
    /// Creation time in milliseconds (taken from `now_ms_u64`).
    pub created_at_ms: u64,
    /// Time of the most recent check in milliseconds (taken from `now_ms_u64`).
    pub last_checked_at_ms: u64,
    /// File counts observed in the legacy directory tree.
    pub legacy_counts: LegacyTreeCounts,
    /// Counts of what the scan loaded.
    pub imported_counts: LegacyImportedCounts,
}
102
/// Outcome of `Storage::run_legacy_storage_repair_scan`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LegacyRepairRunReport {
    /// One of `"skipped"`, `"updated"` or `"no_changes"`.
    pub status: String,
    /// Whether the import marker file was rewritten by this run.
    pub marker_updated: bool,
    /// Sessions that were inserted or had their messages filled from legacy data.
    pub sessions_merged: u64,
    /// Messages brought in from legacy data.
    pub messages_recovered: u64,
    /// Message parts brought in from legacy data.
    pub parts_recovered: u64,
    /// Legacy tree file counts (from the scan, or from the existing marker when skipped).
    pub legacy_counts: LegacyTreeCounts,
    /// Imported totals (from the scan, or from the existing marker when skipped).
    pub imported_counts: LegacyImportedCounts,
}
113
114fn snapshot_session_messages(
115 session_id: &str,
116 session: &Session,
117 metadata: &mut HashMap<String, SessionMeta>,
118) {
119 let meta = metadata
120 .entry(session_id.to_string())
121 .or_insert_with(SessionMeta::default);
122 meta.snapshots.push(session.messages.clone());
123 trim_session_snapshots(&mut meta.snapshots);
124}
125
126fn trim_session_snapshots(snapshots: &mut Vec<Vec<Message>>) {
127 if snapshots.len() > MAX_SESSION_SNAPSHOTS {
128 let keep_from = snapshots.len() - MAX_SESSION_SNAPSHOTS;
129 snapshots.drain(0..keep_from);
130 }
131}
132
133fn compact_session_snapshots(snapshots: &mut Vec<Vec<Message>>) -> usize {
134 if snapshots.is_empty() {
135 return 0;
136 }
137
138 let original_len = snapshots.len();
139 let mut compacted = Vec::with_capacity(original_len);
140 let mut previous_encoded: Option<Vec<u8>> = None;
141
142 for snapshot in snapshots.drain(..) {
143 let encoded = serde_json::to_vec(&snapshot).unwrap_or_default();
144 if previous_encoded.as_ref() == Some(&encoded) {
145 continue;
146 }
147 previous_encoded = Some(encoded);
148 compacted.push(snapshot);
149 }
150
151 trim_session_snapshots(&mut compacted);
152 let removed = original_len.saturating_sub(compacted.len());
153 *snapshots = compacted;
154 removed
155}
156
157fn session_meta_is_empty(meta: &SessionMeta) -> bool {
158 meta.parent_id.is_none()
159 && !meta.archived
160 && !meta.shared
161 && meta.share_id.is_none()
162 && meta.summary.is_none()
163 && meta.snapshots.is_empty()
164 && meta.pre_revert.is_none()
165 && meta.todos.is_empty()
166}
167
/// Counters produced by `compact_session_metadata`.
#[derive(Debug, Default)]
struct SessionMetaCompactionStats {
    /// Metadata entries removed (orphaned or empty after compaction).
    metadata_pruned: u64,
    /// Snapshots removed from the entries whose session still exists.
    snapshots_removed: u64,
}
173
174fn compact_session_metadata(
175 sessions: &HashMap<String, Session>,
176 metadata: &mut HashMap<String, SessionMeta>,
177) -> SessionMetaCompactionStats {
178 let mut stats = SessionMetaCompactionStats::default();
179
180 metadata.retain(|session_id, meta| {
181 if !sessions.contains_key(session_id) {
182 stats.metadata_pruned += 1;
183 return false;
184 }
185
186 let removed = compact_session_snapshots(&mut meta.snapshots) as u64;
187 stats.snapshots_removed += removed;
188
189 if session_meta_is_empty(meta) {
190 stats.metadata_pruned += 1;
191 return false;
192 }
193
194 true
195 });
196
197 stats
198}
199
200fn merge_message_part(message: &mut Message, part: MessagePart) {
201 match part {
202 MessagePart::ToolInvocation {
203 tool,
204 args,
205 result,
206 error,
207 } => {
208 let args_are_empty = tool_args_are_empty(&args);
209 if result.is_none() && error.is_none() {
210 if let Some(existing) = message.parts.iter_mut().rev().find(|existing| {
211 matches!(
212 existing,
213 MessagePart::ToolInvocation {
214 tool: existing_tool,
215 result: None,
216 error: None,
217 ..
218 } if existing_tool == &tool
219 )
220 }) {
221 if let MessagePart::ToolInvocation {
222 args: existing_args,
223 ..
224 } = existing
225 {
226 if should_replace_tool_args(existing_args, &args) || existing_args == &args
227 {
228 *existing_args = args;
229 }
230 return;
231 }
232 }
233 }
234 if result.is_some() || error.is_some() {
235 if let Some(existing) = message.parts.iter_mut().rev().find(|existing| {
236 matches!(
237 existing,
238 MessagePart::ToolInvocation {
239 tool: existing_tool,
240 result: None,
241 error: None,
242 ..
243 } if existing_tool == &tool
244 )
245 }) {
246 if let MessagePart::ToolInvocation {
247 args: existing_args,
248 result: existing_result,
249 error: existing_error,
250 ..
251 } = existing
252 {
253 if should_replace_tool_args(existing_args, &args) {
254 if tool == "write" && args_are_empty {
255 tracing::info!(
256 tool = %tool,
257 "merging write result/error into existing tool part with empty args"
258 );
259 }
260 *existing_args = args.clone();
261 }
262 *existing_result = result;
263 *existing_error = error;
264 return;
265 }
266 }
267 }
268 message.parts.push(MessagePart::ToolInvocation {
269 tool,
270 args,
271 result,
272 error,
273 });
274 }
275 other => message.parts.push(other),
276 }
277}
278
279fn tool_args_are_empty(args: &Value) -> bool {
280 match args {
281 Value::Null => true,
282 Value::Object(values) => values.is_empty(),
283 Value::Array(values) => values.is_empty(),
284 Value::String(value) => value.trim().is_empty(),
285 _ => false,
286 }
287}
288
289fn tool_args_have_more_structure(existing: &Value, incoming: &Value) -> bool {
290 match (existing, incoming) {
291 (Value::String(current), Value::Object(values)) => {
292 !current.trim().is_empty() && !values.is_empty()
293 }
294 (Value::Object(current), Value::Object(next)) => {
295 next.len() > current.len()
296 && current
297 .iter()
298 .all(|(key, value)| next.get(key) == Some(value))
299 }
300 _ => false,
301 }
302}
303
304fn should_replace_tool_args(existing: &Value, incoming: &Value) -> bool {
305 if tool_args_are_empty(incoming) {
306 return tool_args_are_empty(existing);
307 }
308 if tool_args_are_empty(existing) {
309 return true;
310 }
311 tool_args_have_more_structure(existing, incoming)
312}
313
314impl Storage {
315 pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
316 let base = base.as_ref().to_path_buf();
317 fs::create_dir_all(&base).await?;
318 let sessions_file = base.join("sessions.json");
319 let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
320 let sessions_file_exists = sessions_file.exists();
321 let mut imported_legacy_sessions = false;
322 let mut sessions = if sessions_file_exists {
323 let raw = fs::read_to_string(&sessions_file).await?;
324 serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
325 } else {
326 HashMap::new()
327 };
328
329 let mut marker_to_write = None;
330 if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
331 let base_for_scan = base.clone();
332 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
333 .await
334 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
335 if merge_legacy_sessions(&mut sessions, scan.sessions) {
336 imported_legacy_sessions = true;
337 }
338 marker_to_write = Some(LegacyImportMarker {
339 version: LEGACY_IMPORT_MARKER_VERSION,
340 created_at_ms: now_ms_u64(),
341 last_checked_at_ms: now_ms_u64(),
342 legacy_counts: scan.legacy_counts,
343 imported_counts: scan.imported_counts,
344 });
345 }
346
347 if hydrate_workspace_roots(&mut sessions) {
348 imported_legacy_sessions = true;
349 }
350 if repair_session_titles(&mut sessions) {
351 imported_legacy_sessions = true;
352 }
353 let metadata_file = base.join("session_meta.json");
354 let mut metadata = if metadata_file.exists() {
355 let raw = fs::read_to_string(&metadata_file).await?;
356 serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
357 } else {
358 HashMap::new()
359 };
360 let compaction = compact_session_metadata(&sessions, &mut metadata);
361 let metadata_compacted = compaction.metadata_pruned > 0 || compaction.snapshots_removed > 0;
362 if metadata_compacted {
363 tracing::info!(
364 metadata_pruned = compaction.metadata_pruned,
365 snapshots_removed = compaction.snapshots_removed,
366 "compacted persisted session metadata"
367 );
368 }
369 let questions_file = base.join("questions.json");
370 let question_requests = if questions_file.exists() {
371 let raw = fs::read_to_string(&questions_file).await?;
372 serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
373 } else {
374 HashMap::new()
375 };
376 let storage = Self {
377 base,
378 sessions: RwLock::new(sessions),
379 metadata: RwLock::new(metadata),
380 question_requests: RwLock::new(question_requests),
381 };
382
383 if imported_legacy_sessions || metadata_compacted {
384 storage.flush().await?;
385 }
386 if let Some(marker) = marker_to_write {
387 storage.write_legacy_import_marker(&marker).await?;
388 }
389
390 Ok(storage)
391 }
392
393 pub async fn list_sessions(&self) -> Vec<Session> {
394 self.list_sessions_scoped(SessionListScope::Global).await
395 }
396
397 pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
398 let all = self
399 .sessions
400 .read()
401 .await
402 .values()
403 .cloned()
404 .collect::<Vec<_>>();
405 match scope {
406 SessionListScope::Global => all,
407 SessionListScope::Workspace { workspace_root } => {
408 let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
409 return Vec::new();
410 };
411 all.into_iter()
412 .filter(|session| {
413 let direct = session
414 .workspace_root
415 .as_ref()
416 .and_then(|p| normalize_workspace_path(p))
417 .map(|p| p == normalized_workspace)
418 .unwrap_or(false);
419 if direct {
420 return true;
421 }
422 normalize_workspace_path(&session.directory)
423 .map(|p| p == normalized_workspace)
424 .unwrap_or(false)
425 })
426 .collect()
427 }
428 }
429 }
430
431 pub async fn get_session(&self, id: &str) -> Option<Session> {
432 self.sessions.read().await.get(id).cloned()
433 }
434
435 pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
436 if session.workspace_root.is_none() {
437 session.workspace_root = normalize_workspace_path(&session.directory);
438 }
439 let session_id = session.id.clone();
440 self.sessions
441 .write()
442 .await
443 .insert(session_id.clone(), session);
444 self.metadata
445 .write()
446 .await
447 .entry(session_id)
448 .or_insert_with(SessionMeta::default);
449 self.flush().await
450 }
451
452 pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
453 let mut stats = SessionRepairStats::default();
454 let mut sessions = self.sessions.write().await;
455
456 for session in sessions.values_mut() {
457 let imported = load_legacy_session_messages(&self.base, &session.id);
458 if imported.is_empty() {
459 continue;
460 }
461
462 let (merged, merge_stats, changed) =
463 merge_session_messages(&session.messages, &imported);
464 if changed {
465 session.messages = merged;
466 session.time.updated =
467 most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
468 stats.sessions_repaired += 1;
469 stats.messages_recovered += merge_stats.messages_recovered;
470 stats.parts_recovered += merge_stats.parts_recovered;
471 stats.conflicts_merged += merge_stats.conflicts_merged;
472 }
473 }
474
475 if stats.sessions_repaired > 0 {
476 drop(sessions);
477 self.flush().await?;
478 }
479
480 Ok(stats)
481 }
482
483 pub async fn run_legacy_storage_repair_scan(
484 &self,
485 force: bool,
486 ) -> anyhow::Result<LegacyRepairRunReport> {
487 let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
488 let sessions_exists = self.base.join("sessions.json").exists();
489 let should_scan = if force {
490 true
491 } else {
492 should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
493 };
494 if !should_scan {
495 let marker = read_legacy_import_marker(&marker_path)
496 .await
497 .unwrap_or_else(|| LegacyImportMarker {
498 version: LEGACY_IMPORT_MARKER_VERSION,
499 created_at_ms: now_ms_u64(),
500 last_checked_at_ms: now_ms_u64(),
501 legacy_counts: LegacyTreeCounts::default(),
502 imported_counts: LegacyImportedCounts::default(),
503 });
504 return Ok(LegacyRepairRunReport {
505 status: "skipped".to_string(),
506 marker_updated: false,
507 sessions_merged: 0,
508 messages_recovered: 0,
509 parts_recovered: 0,
510 legacy_counts: marker.legacy_counts,
511 imported_counts: marker.imported_counts,
512 });
513 }
514
515 let base_for_scan = self.base.clone();
516 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
517 .await
518 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
519
520 let merge_stats = {
521 let mut sessions = self.sessions.write().await;
522 merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
523 };
524
525 if merge_stats.changed {
526 self.flush().await?;
527 }
528
529 let marker = LegacyImportMarker {
530 version: LEGACY_IMPORT_MARKER_VERSION,
531 created_at_ms: now_ms_u64(),
532 last_checked_at_ms: now_ms_u64(),
533 legacy_counts: scan.legacy_counts.clone(),
534 imported_counts: scan.imported_counts.clone(),
535 };
536 self.write_legacy_import_marker(&marker).await?;
537
538 Ok(LegacyRepairRunReport {
539 status: if merge_stats.changed {
540 "updated".to_string()
541 } else {
542 "no_changes".to_string()
543 },
544 marker_updated: true,
545 sessions_merged: merge_stats.sessions_merged,
546 messages_recovered: merge_stats.messages_recovered,
547 parts_recovered: merge_stats.parts_recovered,
548 legacy_counts: scan.legacy_counts,
549 imported_counts: scan.imported_counts,
550 })
551 }
552
553 pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
554 let removed = self.sessions.write().await.remove(id).is_some();
555 self.metadata.write().await.remove(id);
556 self.question_requests
557 .write()
558 .await
559 .retain(|_, request| request.session_id != id);
560 if removed {
561 self.flush().await?;
562 }
563 Ok(removed)
564 }
565
566 pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
567 let mut sessions = self.sessions.write().await;
568 let session = sessions
569 .get_mut(session_id)
570 .context("session not found for append_message")?;
571 session.messages.push(msg);
572 session.time.updated = Utc::now();
573 drop(sessions);
574 self.flush().await
575 }
576
577 pub async fn append_message_part(
578 &self,
579 session_id: &str,
580 message_id: &str,
581 part: MessagePart,
582 ) -> anyhow::Result<()> {
583 let mut sessions = self.sessions.write().await;
584 let session = sessions
585 .get_mut(session_id)
586 .context("session not found for append_message_part")?;
587 let message = if let Some(message) = session
588 .messages
589 .iter_mut()
590 .find(|message| message.id == message_id)
591 {
592 message
593 } else {
594 session
595 .messages
596 .iter_mut()
597 .rev()
598 .find(|message| matches!(message.role, MessageRole::User))
599 .context("message not found for append_message_part")?
600 };
601 merge_message_part(message, part);
602 session.time.updated = Utc::now();
603 drop(sessions);
604 self.flush().await
605 }
606
607 pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
608 let source = {
609 let sessions = self.sessions.read().await;
610 sessions.get(id).cloned()
611 };
612 let Some(mut child) = source else {
613 return Ok(None);
614 };
615
616 child.id = Uuid::new_v4().to_string();
617 child.title = format!("{} (fork)", child.title);
618 child.time.created = Utc::now();
619 child.time.updated = child.time.created;
620 child.slug = None;
621
622 self.sessions
623 .write()
624 .await
625 .insert(child.id.clone(), child.clone());
626 self.metadata.write().await.insert(
627 child.id.clone(),
628 SessionMeta {
629 parent_id: Some(id.to_string()),
630 snapshots: vec![child.messages.clone()],
631 ..SessionMeta::default()
632 },
633 );
634 self.flush().await?;
635 Ok(Some(child))
636 }
637
638 pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
639 let mut sessions = self.sessions.write().await;
640 let Some(session) = sessions.get_mut(id) else {
641 return Ok(false);
642 };
643 let mut metadata = self.metadata.write().await;
644 let meta = metadata
645 .entry(id.to_string())
646 .or_insert_with(SessionMeta::default);
647 let Some(snapshot) = meta.snapshots.pop() else {
648 return Ok(false);
649 };
650 meta.pre_revert = Some(session.messages.clone());
651 session.messages = snapshot;
652 session.time.updated = Utc::now();
653 drop(metadata);
654 drop(sessions);
655 self.flush().await?;
656 Ok(true)
657 }
658
659 pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
660 let mut sessions = self.sessions.write().await;
661 let Some(session) = sessions.get_mut(id) else {
662 return Ok(false);
663 };
664 let mut metadata = self.metadata.write().await;
665 let Some(meta) = metadata.get_mut(id) else {
666 return Ok(false);
667 };
668 let Some(previous) = meta.pre_revert.take() else {
669 return Ok(false);
670 };
671 meta.snapshots.push(session.messages.clone());
672 trim_session_snapshots(&mut meta.snapshots);
673 session.messages = previous;
674 session.time.updated = Utc::now();
675 drop(metadata);
676 drop(sessions);
677 self.flush().await?;
678 Ok(true)
679 }
680
681 pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
682 let mut metadata = self.metadata.write().await;
683 let meta = metadata
684 .entry(id.to_string())
685 .or_insert_with(SessionMeta::default);
686 meta.shared = shared;
687 if shared {
688 if meta.share_id.is_none() {
689 meta.share_id = Some(Uuid::new_v4().to_string());
690 }
691 } else {
692 meta.share_id = None;
693 }
694 let share_id = meta.share_id.clone();
695 drop(metadata);
696 self.flush().await?;
697 Ok(share_id)
698 }
699
700 pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
701 let mut metadata = self.metadata.write().await;
702 let meta = metadata
703 .entry(id.to_string())
704 .or_insert_with(SessionMeta::default);
705 meta.archived = archived;
706 drop(metadata);
707 self.flush().await?;
708 Ok(true)
709 }
710
711 pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
712 let mut metadata = self.metadata.write().await;
713 let meta = metadata
714 .entry(id.to_string())
715 .or_insert_with(SessionMeta::default);
716 meta.summary = Some(summary);
717 drop(metadata);
718 self.flush().await?;
719 Ok(true)
720 }
721
722 pub async fn children(&self, parent_id: &str) -> Vec<Session> {
723 let child_ids = {
724 let metadata = self.metadata.read().await;
725 metadata
726 .iter()
727 .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
728 .map(|(id, _)| id.clone())
729 .collect::<Vec<_>>()
730 };
731 let sessions = self.sessions.read().await;
732 child_ids
733 .into_iter()
734 .filter_map(|id| sessions.get(&id).cloned())
735 .collect()
736 }
737
738 pub async fn session_status(&self, id: &str) -> Option<Value> {
739 let metadata = self.metadata.read().await;
740 metadata.get(id).map(|meta| {
741 json!({
742 "archived": meta.archived,
743 "shared": meta.shared,
744 "parentID": meta.parent_id,
745 "snapshotCount": meta.snapshots.len()
746 })
747 })
748 }
749
750 pub async fn session_diff(&self, id: &str) -> Option<Value> {
751 let sessions = self.sessions.read().await;
752 let current = sessions.get(id)?;
753 let metadata = self.metadata.read().await;
754 let default = SessionMeta::default();
755 let meta = metadata.get(id).unwrap_or(&default);
756 let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
757 Some(json!({
758 "sessionID": id,
759 "currentMessageCount": current.messages.len(),
760 "lastSnapshotMessageCount": last_snapshot_len,
761 "delta": current.messages.len() as i64 - last_snapshot_len as i64
762 }))
763 }
764
765 pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
766 let mut metadata = self.metadata.write().await;
767 let meta = metadata
768 .entry(id.to_string())
769 .or_insert_with(SessionMeta::default);
770 meta.todos = normalize_todo_items(todos);
771 drop(metadata);
772 self.flush().await
773 }
774
775 pub async fn get_todos(&self, id: &str) -> Vec<Value> {
776 let todos = self
777 .metadata
778 .read()
779 .await
780 .get(id)
781 .map(|meta| meta.todos.clone())
782 .unwrap_or_default();
783 normalize_todo_items(todos)
784 }
785
786 pub async fn add_question_request(
787 &self,
788 session_id: &str,
789 message_id: &str,
790 questions: Vec<Value>,
791 ) -> anyhow::Result<QuestionRequest> {
792 if questions.is_empty() {
793 return Err(anyhow::anyhow!(
794 "cannot add empty question request for session {}",
795 session_id
796 ));
797 }
798 let request = QuestionRequest {
799 id: format!("q-{}", Uuid::new_v4()),
800 session_id: session_id.to_string(),
801 questions,
802 tool: Some(QuestionToolRef {
803 call_id: format!("call-{}", Uuid::new_v4()),
804 message_id: message_id.to_string(),
805 }),
806 };
807 self.question_requests
808 .write()
809 .await
810 .insert(request.id.clone(), request.clone());
811 self.flush().await?;
812 Ok(request)
813 }
814
815 pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
816 self.question_requests
817 .read()
818 .await
819 .values()
820 .cloned()
821 .collect()
822 }
823
824 pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
825 let removed = self
826 .question_requests
827 .write()
828 .await
829 .remove(request_id)
830 .is_some();
831 if removed {
832 self.flush().await?;
833 }
834 Ok(removed)
835 }
836
837 pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
838 self.reply_question(request_id).await
839 }
840
841 pub async fn attach_session_to_workspace(
842 &self,
843 session_id: &str,
844 target_workspace: &str,
845 reason_tag: &str,
846 ) -> anyhow::Result<Option<Session>> {
847 let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
848 return Ok(None);
849 };
850 let mut sessions = self.sessions.write().await;
851 let Some(session) = sessions.get_mut(session_id) else {
852 return Ok(None);
853 };
854 let previous_workspace = session
855 .workspace_root
856 .clone()
857 .or_else(|| normalize_workspace_path(&session.directory));
858
859 if session.origin_workspace_root.is_none() {
860 session.origin_workspace_root = previous_workspace.clone();
861 }
862 session.attached_from_workspace = previous_workspace;
863 session.attached_to_workspace = Some(target_workspace.clone());
864 session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
865 session.attach_reason = Some(reason_tag.trim().to_string());
866 session.workspace_root = Some(target_workspace.clone());
867 session.project_id = workspace_project_id(&target_workspace);
868 session.directory = target_workspace;
869 session.time.updated = Utc::now();
870 let updated = session.clone();
871 drop(sessions);
872 self.flush().await?;
873 Ok(Some(updated))
874 }
875
876 async fn flush(&self) -> anyhow::Result<()> {
877 {
878 let snapshot = self.sessions.read().await.clone();
879 self.flush_file("sessions.json", &snapshot).await?;
880 }
881 {
882 let metadata_snapshot = self.metadata.read().await.clone();
883 self.flush_file("session_meta.json", &metadata_snapshot)
884 .await?;
885 }
886 {
887 let questions_snapshot = self.question_requests.read().await.clone();
888 self.flush_file("questions.json", &questions_snapshot)
889 .await?;
890 }
891 Ok(())
892 }
893
894 async fn flush_file(&self, filename: &str, data: &impl serde::Serialize) -> anyhow::Result<()> {
895 let path = self.base.join(filename);
896 let temp_path = self.base.join(format!("{}.tmp", filename));
897 let payload = serde_json::to_string_pretty(data)?;
898 fs::write(&temp_path, payload).await.with_context(|| {
899 format!("failed to write temp storage file {}", temp_path.display())
900 })?;
901 let std_temp_path: std::path::PathBuf = temp_path.clone().try_into()?;
902 tokio::task::spawn_blocking(move || {
903 let file = std::fs::File::open(&std_temp_path)?;
904 file.sync_all()?;
905 Ok::<(), std::io::Error>(())
906 })
907 .await??;
908 commit_temp_file(&temp_path, &path).await.with_context(|| {
909 format!(
910 "failed to atomically replace storage file {} with {}",
911 path.display(),
912 temp_path.display()
913 )
914 })?;
915 Ok(())
916 }
917
918 async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
919 let payload = serde_json::to_string_pretty(marker)?;
920 fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
921 Ok(())
922 }
923}
924
925async fn commit_temp_file(temp_path: &Path, path: &Path) -> std::io::Result<()> {
926 match tokio::fs::rename(temp_path, path).await {
927 Ok(()) => Ok(()),
928 Err(err) => {
929 #[cfg(windows)]
930 {
931 use std::io::ErrorKind;
934 if matches!(
935 err.kind(),
936 ErrorKind::PermissionDenied | ErrorKind::AlreadyExists
937 ) {
938 match tokio::fs::remove_file(path).await {
939 Ok(()) => {}
940 Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {}
941 Err(remove_err) => return Err(remove_err),
942 }
943 return tokio::fs::rename(temp_path, path).await;
944 }
945 }
946 Err(err)
947 }
948 }
949}
950
951fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
952 items
953 .into_iter()
954 .filter_map(|item| {
955 let obj = item.as_object()?;
956 let content = obj
957 .get("content")
958 .and_then(|v| v.as_str())
959 .or_else(|| obj.get("text").and_then(|v| v.as_str()))
960 .unwrap_or("")
961 .trim()
962 .to_string();
963 if content.is_empty() {
964 return None;
965 }
966 let id = obj
967 .get("id")
968 .and_then(|v| v.as_str())
969 .filter(|s| !s.trim().is_empty())
970 .map(ToString::to_string)
971 .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
972 let status = obj
973 .get("status")
974 .and_then(|v| v.as_str())
975 .filter(|s| !s.trim().is_empty())
976 .map(ToString::to_string)
977 .unwrap_or_else(|| "pending".to_string());
978 Some(json!({
979 "id": id,
980 "content": content,
981 "status": status
982 }))
983 })
984 .collect()
985}
986
/// Result of `scan_legacy_sessions`.
#[derive(Debug)]
struct LegacyScanResult {
    /// Sessions loaded from the legacy layout, keyed by session ID.
    sessions: HashMap<String, Session>,
    /// `.json` file counts of the legacy directories.
    legacy_counts: LegacyTreeCounts,
    /// Totals derived from the loaded `sessions`.
    imported_counts: LegacyImportedCounts,
}
993
/// Counters produced by `merge_legacy_sessions_with_stats`.
#[derive(Debug, Default)]
struct LegacyMergeStats {
    /// Whether the merge modified the target map in any way.
    changed: bool,
    /// Sessions inserted, or existing sessions whose messages were filled in.
    sessions_merged: u64,
    /// Messages that came in with those sessions.
    messages_recovered: u64,
    /// Message parts that came in with those sessions.
    parts_recovered: u64,
}
1001
1002fn now_ms_u64() -> u64 {
1003 Utc::now().timestamp_millis().max(0) as u64
1004}
1005
/// Decides whether the legacy-storage scan should run automatically.
///
/// The scan runs only when no `sessions.json` exists yet. Once sessions
/// exist the answer is always `false`, whether or not an import marker is
/// present; a scan can still be forced through
/// `Storage::run_legacy_storage_repair_scan`.
///
/// `marker_path` is accepted for interface compatibility. The earlier
/// implementation read and parsed the marker file but returned `false` for
/// both outcomes, so that I/O had no effect on the result and was removed.
async fn should_run_legacy_scan_on_startup(marker_path: &Path, sessions_exist: bool) -> bool {
    // Intentionally unused: the marker does not influence the decision.
    let _ = marker_path;
    !sessions_exist
}
1017
1018async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
1019 let raw = fs::read_to_string(marker_path).await.ok()?;
1020 serde_json::from_str::<LegacyImportMarker>(&raw).ok()
1021}
1022
1023fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
1024 let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
1025 let imported_counts = LegacyImportedCounts {
1026 sessions: sessions.len() as u64,
1027 messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
1028 parts: sessions
1029 .values()
1030 .flat_map(|s| s.messages.iter())
1031 .map(|m| m.parts.len() as u64)
1032 .sum(),
1033 };
1034 let legacy_counts = LegacyTreeCounts {
1035 session_files: count_legacy_json_files(&base.join("session")),
1036 message_files: count_legacy_json_files(&base.join("message")),
1037 part_files: count_legacy_json_files(&base.join("part")),
1038 };
1039 Ok(LegacyScanResult {
1040 sessions,
1041 legacy_counts,
1042 imported_counts,
1043 })
1044}
1045
/// Counts files with the extension `json` anywhere below `root`.
///
/// Returns 0 when `root` is not a directory. Directories that cannot be read
/// and entries that cannot be inspected are skipped silently.
fn count_legacy_json_files(root: &Path) -> u64 {
    if !root.is_dir() {
        return 0;
    }

    let mut json_files = 0u64;
    let mut pending = vec![root.to_path_buf()];
    while let Some(dir) = pending.pop() {
        let Ok(entries) = std::fs::read_dir(&dir) else {
            continue;
        };
        for entry in entries.filter_map(Result::ok) {
            let is_dir = matches!(entry.file_type(), Ok(kind) if kind.is_dir());
            let path = entry.path();
            if is_dir {
                pending.push(path);
            } else if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
                json_files += 1;
            }
        }
    }
    json_files
}
1068
1069fn merge_legacy_sessions(
1070 current: &mut HashMap<String, Session>,
1071 imported: HashMap<String, Session>,
1072) -> bool {
1073 merge_legacy_sessions_with_stats(current, imported).changed
1074}
1075
1076fn merge_legacy_sessions_with_stats(
1077 current: &mut HashMap<String, Session>,
1078 imported: HashMap<String, Session>,
1079) -> LegacyMergeStats {
1080 let mut stats = LegacyMergeStats::default();
1081 for (id, legacy) in imported {
1082 let legacy_message_count = legacy.messages.len() as u64;
1083 let legacy_part_count = legacy
1084 .messages
1085 .iter()
1086 .map(|m| m.parts.len() as u64)
1087 .sum::<u64>();
1088 match current.get_mut(&id) {
1089 None => {
1090 current.insert(id, legacy);
1091 stats.changed = true;
1092 stats.sessions_merged += 1;
1093 stats.messages_recovered += legacy_message_count;
1094 stats.parts_recovered += legacy_part_count;
1095 }
1096 Some(existing) => {
1097 let should_merge_messages =
1098 existing.messages.is_empty() && !legacy.messages.is_empty();
1099 let should_fill_title =
1100 existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
1101 let should_fill_directory = (existing.directory.trim().is_empty()
1102 || existing.directory.trim() == "."
1103 || existing.directory.trim() == "./"
1104 || existing.directory.trim() == ".\\")
1105 && !legacy.directory.trim().is_empty();
1106 let should_fill_workspace =
1107 existing.workspace_root.is_none() && legacy.workspace_root.is_some();
1108 if should_merge_messages {
1109 existing.messages = legacy.messages.clone();
1110 }
1111 if should_fill_title {
1112 existing.title = legacy.title.clone();
1113 }
1114 if should_fill_directory {
1115 existing.directory = legacy.directory.clone();
1116 }
1117 if should_fill_workspace {
1118 existing.workspace_root = legacy.workspace_root.clone();
1119 }
1120 if should_merge_messages
1121 || should_fill_title
1122 || should_fill_directory
1123 || should_fill_workspace
1124 {
1125 stats.changed = true;
1126 if should_merge_messages {
1127 stats.sessions_merged += 1;
1128 stats.messages_recovered += legacy_message_count;
1129 stats.parts_recovered += legacy_part_count;
1130 }
1131 }
1132 }
1133 }
1134 }
1135 stats
1136}
1137
1138fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
1139 let mut changed = false;
1140 for session in sessions.values_mut() {
1141 if session.workspace_root.is_none() {
1142 let normalized = normalize_workspace_path(&session.directory);
1143 if normalized.is_some() {
1144 session.workspace_root = normalized;
1145 changed = true;
1146 }
1147 }
1148 }
1149 changed
1150}
1151
1152fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
1153 let mut changed = false;
1154 for session in sessions.values_mut() {
1155 if !title_needs_repair(&session.title) {
1156 continue;
1157 }
1158 let first_user_text = session.messages.iter().find_map(|message| {
1159 if !matches!(message.role, MessageRole::User) {
1160 return None;
1161 }
1162 message.parts.iter().find_map(|part| match part {
1163 MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
1164 _ => None,
1165 })
1166 });
1167 let Some(source) = first_user_text else {
1168 continue;
1169 };
1170 let Some(derived) = derive_session_title_from_prompt(source, 60) else {
1171 continue;
1172 };
1173 if derived == session.title {
1174 continue;
1175 }
1176 session.title = derived;
1177 session.time.updated = Utc::now();
1178 changed = true;
1179 }
1180 changed
1181}
1182
/// Timestamps stored in a legacy session index file.
///
/// `load_legacy_opencode_sessions` interprets both values as milliseconds since the Unix
/// epoch.
#[derive(Debug, Deserialize)]
struct LegacySessionTime {
    /// Creation time in epoch milliseconds.
    created: i64,
    /// Last-update time in epoch milliseconds.
    updated: i64,
}
1188
/// Shape of one legacy session index file (`session/**/*.json`) as read by
/// `load_legacy_opencode_sessions`.
#[derive(Debug, Deserialize)]
struct LegacySession {
    /// Session identifier; also names the `message/<id>` directory holding its messages.
    id: String,
    /// Optional slug, copied onto the imported session unchanged.
    slug: Option<String>,
    /// Optional version string, copied onto the imported session unchanged.
    version: Option<String>,
    /// Optional project identifier; spelled `projectID` in the legacy JSON.
    #[serde(rename = "projectID")]
    project_id: Option<String>,
    /// Optional title; a missing or blank value is replaced with "New session" on import.
    title: Option<String>,
    /// Optional working directory; a missing or blank value is replaced with "." on import.
    directory: Option<String>,
    /// Creation and update timestamps.
    time: LegacySessionTime,
}
1200
1201fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
1202 let legacy_root = base.join("session");
1203 if !legacy_root.is_dir() {
1204 return Ok(HashMap::new());
1205 }
1206
1207 let mut out = HashMap::new();
1208 let mut stack = vec![legacy_root];
1209 while let Some(dir) = stack.pop() {
1210 for entry in std::fs::read_dir(&dir)? {
1211 let entry = entry?;
1212 let path = entry.path();
1213 if entry.file_type()?.is_dir() {
1214 stack.push(path);
1215 continue;
1216 }
1217 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1218 continue;
1219 }
1220 let raw = match std::fs::read_to_string(&path) {
1221 Ok(v) => v,
1222 Err(_) => continue,
1223 };
1224 let legacy = match serde_json::from_str::<LegacySession>(&raw) {
1225 Ok(v) => v,
1226 Err(_) => continue,
1227 };
1228 let created = Utc
1229 .timestamp_millis_opt(legacy.time.created)
1230 .single()
1231 .unwrap_or_else(Utc::now);
1232 let updated = Utc
1233 .timestamp_millis_opt(legacy.time.updated)
1234 .single()
1235 .unwrap_or(created);
1236
1237 let session_id = legacy.id.clone();
1238 out.insert(
1239 session_id.clone(),
1240 Session {
1241 id: session_id.clone(),
1242 slug: legacy.slug,
1243 version: legacy.version,
1244 project_id: legacy.project_id,
1245 title: legacy
1246 .title
1247 .filter(|s| !s.trim().is_empty())
1248 .unwrap_or_else(|| "New session".to_string()),
1249 directory: legacy
1250 .directory
1251 .clone()
1252 .filter(|s| !s.trim().is_empty())
1253 .unwrap_or_else(|| ".".to_string()),
1254 workspace_root: legacy
1255 .directory
1256 .as_deref()
1257 .and_then(normalize_workspace_path),
1258 origin_workspace_root: None,
1259 attached_from_workspace: None,
1260 attached_to_workspace: None,
1261 attach_timestamp_ms: None,
1262 attach_reason: None,
1263 time: tandem_types::SessionTime { created, updated },
1264 model: None,
1265 provider: None,
1266 environment: None,
1267 messages: load_legacy_session_messages(base, &session_id),
1268 },
1269 );
1270 }
1271 }
1272 Ok(out)
1273}
1274
/// Timestamp block of a legacy message file.
#[derive(Debug, Deserialize)]
struct LegacyMessageTime {
    /// Creation time; `load_legacy_session_messages` treats it as epoch milliseconds and
    /// uses it as the sort key.
    created: i64,
}
1279
/// Shape of one legacy message file (`message/<session id>/*.json`).
#[derive(Debug, Deserialize)]
struct LegacyMessage {
    /// Message identifier; also names the `part/<id>` directory holding its parts.
    id: String,
    /// Role name as stored on disk; mapped through `legacy_role_to_message_role`.
    role: String,
    /// Creation timestamp.
    time: LegacyMessageTime,
}
1286
/// Shape of one legacy message part file (`part/<message id>/*.json`).
///
/// Every field is optional; `load_legacy_message_parts` decides which [`MessagePart`]
/// variant to build from whichever fields are present.
#[derive(Debug, Deserialize)]
struct LegacyPart {
    /// Part kind (`type` in the JSON). "reasoning", "tool" and "text" are recognized; a
    /// missing value is treated as text.
    #[serde(rename = "type")]
    part_type: Option<String>,
    /// Text payload for text and reasoning parts.
    text: Option<String>,
    /// Tool name; when present the part is always imported as a tool invocation.
    tool: Option<String>,
    /// Tool arguments; an empty JSON object is substituted when absent.
    args: Option<Value>,
    /// Tool result, if any.
    result: Option<Value>,
    /// Tool error message, if any.
    error: Option<String>,
}
1297
1298fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
1299 let msg_dir = base.join("message").join(session_id);
1300 if !msg_dir.is_dir() {
1301 return Vec::new();
1302 }
1303
1304 let mut legacy_messages: Vec<(i64, Message)> = Vec::new();
1305
1306 let Ok(entries) = std::fs::read_dir(&msg_dir) else {
1307 return Vec::new();
1308 };
1309
1310 for entry in entries.flatten() {
1311 let path = entry.path();
1312 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1313 continue;
1314 }
1315 let Ok(raw) = std::fs::read_to_string(&path) else {
1316 continue;
1317 };
1318 let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
1319 continue;
1320 };
1321
1322 let created_at = Utc
1323 .timestamp_millis_opt(legacy.time.created)
1324 .single()
1325 .unwrap_or_else(Utc::now);
1326
1327 legacy_messages.push((
1328 legacy.time.created,
1329 Message {
1330 id: legacy.id.clone(),
1331 role: legacy_role_to_message_role(&legacy.role),
1332 parts: load_legacy_message_parts(base, &legacy.id),
1333 created_at,
1334 },
1335 ));
1336 }
1337
1338 legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
1339 legacy_messages.into_iter().map(|(_, msg)| msg).collect()
1340}
1341
1342fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
1343 let parts_dir = base.join("part").join(message_id);
1344 if !parts_dir.is_dir() {
1345 return Vec::new();
1346 }
1347
1348 let Ok(entries) = std::fs::read_dir(&parts_dir) else {
1349 return Vec::new();
1350 };
1351
1352 let mut out = Vec::new();
1353 for entry in entries.flatten() {
1354 let path = entry.path();
1355 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1356 continue;
1357 }
1358 let Ok(raw) = std::fs::read_to_string(&path) else {
1359 continue;
1360 };
1361 let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
1362 continue;
1363 };
1364
1365 let mapped = if let Some(tool) = part.tool {
1366 Some(MessagePart::ToolInvocation {
1367 tool,
1368 args: part.args.unwrap_or_else(|| json!({})),
1369 result: part.result,
1370 error: part.error,
1371 })
1372 } else {
1373 match part.part_type.as_deref() {
1374 Some("reasoning") => Some(MessagePart::Reasoning {
1375 text: part.text.unwrap_or_default(),
1376 }),
1377 Some("tool") => Some(MessagePart::ToolInvocation {
1378 tool: "tool".to_string(),
1379 args: part.args.unwrap_or_else(|| json!({})),
1380 result: part.result,
1381 error: part.error,
1382 }),
1383 Some("text") | None => Some(MessagePart::Text {
1384 text: part.text.unwrap_or_default(),
1385 }),
1386 _ => None,
1387 }
1388 };
1389
1390 if let Some(part) = mapped {
1391 out.push(part);
1392 }
1393 }
1394 out
1395}
1396
1397fn legacy_role_to_message_role(role: &str) -> MessageRole {
1398 match role.to_lowercase().as_str() {
1399 "user" => MessageRole::User,
1400 "assistant" => MessageRole::Assistant,
1401 "system" => MessageRole::System,
1402 "tool" => MessageRole::Tool,
1403 _ => MessageRole::Assistant,
1404 }
1405}
1406
/// Counters reported by `merge_session_messages`.
#[derive(Debug, Clone, Default)]
struct MessageMergeStats {
    /// Number of imported messages that were added to the merged list.
    messages_recovered: u64,
    /// Number of parts carried by those added messages.
    parts_recovered: u64,
    /// Number of existing messages replaced by a richer imported copy with the same id.
    conflicts_merged: u64,
}
1413
1414fn message_richness(msg: &Message) -> usize {
1415 msg.parts
1416 .iter()
1417 .map(|p| match p {
1418 MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1419 if text.trim().is_empty() {
1420 0
1421 } else {
1422 1
1423 }
1424 }
1425 MessagePart::ToolInvocation { result, error, .. } => {
1426 if result.is_some() || error.is_some() {
1427 2
1428 } else {
1429 1
1430 }
1431 }
1432 })
1433 .sum()
1434}
1435
1436fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1437 messages.iter().map(|m| m.created_at).max()
1438}
1439
1440fn merge_session_messages(
1441 existing: &[Message],
1442 imported: &[Message],
1443) -> (Vec<Message>, MessageMergeStats, bool) {
1444 if existing.is_empty() {
1445 let messages_recovered = imported.len() as u64;
1446 let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1447 return (
1448 imported.to_vec(),
1449 MessageMergeStats {
1450 messages_recovered,
1451 parts_recovered,
1452 conflicts_merged: 0,
1453 },
1454 true,
1455 );
1456 }
1457
1458 let mut merged_by_id: HashMap<String, Message> = existing
1459 .iter()
1460 .cloned()
1461 .map(|m| (m.id.clone(), m))
1462 .collect();
1463 let mut stats = MessageMergeStats::default();
1464 let mut changed = false;
1465
1466 for incoming in imported {
1467 match merged_by_id.get(&incoming.id) {
1468 None => {
1469 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1470 stats.messages_recovered += 1;
1471 stats.parts_recovered += incoming.parts.len() as u64;
1472 changed = true;
1473 }
1474 Some(current) => {
1475 let incoming_richer = message_richness(incoming) > message_richness(current)
1476 || incoming.parts.len() > current.parts.len();
1477 if incoming_richer {
1478 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1479 stats.conflicts_merged += 1;
1480 changed = true;
1481 }
1482 }
1483 }
1484 }
1485
1486 let mut out: Vec<Message> = merged_by_id.into_values().collect();
1487 out.sort_by_key(|m| m.created_at);
1488 (out, stats, changed)
1489}
1490
1491#[cfg(test)]
1492mod tests {
1493 use super::*;
1494 use std::fs as stdfs;
1495
1496 #[tokio::test]
1497 async fn todos_are_normalized_to_wire_shape() {
1498 let base = std::env::temp_dir().join(format!("tandem-core-test-{}", Uuid::new_v4()));
1499 let storage = Storage::new(&base).await.expect("storage");
1500 let session = Session::new(Some("test".to_string()), Some(".".to_string()));
1501 let id = session.id.clone();
1502 storage.save_session(session).await.expect("save session");
1503
1504 storage
1505 .set_todos(
1506 &id,
1507 vec![
1508 json!({"content":"first"}),
1509 json!({"text":"second", "status":"in_progress"}),
1510 json!({"id":"keep-id","content":"third","status":"completed"}),
1511 ],
1512 )
1513 .await
1514 .expect("set todos");
1515
1516 let todos = storage.get_todos(&id).await;
1517 assert_eq!(todos.len(), 3);
1518 for todo in todos {
1519 assert!(todo.get("id").and_then(|v| v.as_str()).is_some());
1520 assert!(todo.get("content").and_then(|v| v.as_str()).is_some());
1521 assert!(todo.get("status").and_then(|v| v.as_str()).is_some());
1522 }
1523 }
1524
1525 #[tokio::test]
1526 async fn imports_legacy_opencode_session_index_when_sessions_json_missing() {
1527 let base =
1528 std::env::temp_dir().join(format!("tandem-core-legacy-import-{}", Uuid::new_v4()));
1529 let legacy_session_dir = base.join("session").join("global");
1530 stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1531 stdfs::write(
1532 legacy_session_dir.join("ses_test.json"),
1533 r#"{
1534 "id": "ses_test",
1535 "slug": "test",
1536 "version": "1.0.0",
1537 "projectID": "proj_1",
1538 "directory": "C:\\work\\demo",
1539 "title": "Legacy Session",
1540 "time": { "created": 1770913145613, "updated": 1770913146613 }
1541}"#,
1542 )
1543 .expect("legacy session write");
1544
1545 let storage = Storage::new(&base).await.expect("storage");
1546 let sessions = storage.list_sessions().await;
1547 assert_eq!(sessions.len(), 1);
1548 assert_eq!(sessions[0].id, "ses_test");
1549 assert_eq!(sessions[0].title, "Legacy Session");
1550 assert!(base.join("sessions.json").exists());
1551 }
1552
1553 #[tokio::test]
1554 async fn imports_legacy_messages_and_parts_for_session() {
1555 let base = std::env::temp_dir().join(format!("tandem-core-legacy-msg-{}", Uuid::new_v4()));
1556 let session_dir = base.join("session").join("global");
1557 let message_dir = base.join("message").join("ses_test");
1558 let part_dir = base.join("part").join("msg_1");
1559 stdfs::create_dir_all(&session_dir).expect("session dir");
1560 stdfs::create_dir_all(&message_dir).expect("message dir");
1561 stdfs::create_dir_all(&part_dir).expect("part dir");
1562
1563 stdfs::write(
1564 session_dir.join("ses_test.json"),
1565 r#"{
1566 "id": "ses_test",
1567 "projectID": "proj_1",
1568 "directory": "C:\\work\\demo",
1569 "title": "Legacy Session",
1570 "time": { "created": 1770913145613, "updated": 1770913146613 }
1571}"#,
1572 )
1573 .expect("write session");
1574
1575 stdfs::write(
1576 message_dir.join("msg_1.json"),
1577 r#"{
1578 "id": "msg_1",
1579 "sessionID": "ses_test",
1580 "role": "assistant",
1581 "time": { "created": 1770913145613 }
1582}"#,
1583 )
1584 .expect("write msg");
1585
1586 stdfs::write(
1587 part_dir.join("prt_1.json"),
1588 r#"{
1589 "id": "prt_1",
1590 "sessionID": "ses_test",
1591 "messageID": "msg_1",
1592 "type": "text",
1593 "text": "hello from legacy"
1594}"#,
1595 )
1596 .expect("write part");
1597
1598 let storage = Storage::new(&base).await.expect("storage");
1599 let sessions = storage.list_sessions().await;
1600 assert_eq!(sessions.len(), 1);
1601 assert_eq!(sessions[0].messages.len(), 1);
1602 assert_eq!(sessions[0].messages[0].parts.len(), 1);
1603 }
1604
1605 #[tokio::test]
1606 async fn skips_legacy_merge_when_sessions_json_exists() {
1607 let base =
1608 std::env::temp_dir().join(format!("tandem-core-legacy-merge-{}", Uuid::new_v4()));
1609 stdfs::create_dir_all(&base).expect("base");
1610 stdfs::write(
1611 base.join("sessions.json"),
1612 r#"{
1613 "ses_current": {
1614 "id": "ses_current",
1615 "slug": null,
1616 "version": "v1",
1617 "project_id": null,
1618 "title": "Current Session",
1619 "directory": ".",
1620 "workspace_root": null,
1621 "origin_workspace_root": null,
1622 "attached_from_workspace": null,
1623 "attached_to_workspace": null,
1624 "attach_timestamp_ms": null,
1625 "attach_reason": null,
1626 "time": {"created":"2026-01-01T00:00:00Z","updated":"2026-01-01T00:00:00Z"},
1627 "model": null,
1628 "provider": null,
1629 "messages": []
1630 }
1631}"#,
1632 )
1633 .expect("sessions.json");
1634
1635 let legacy_session_dir = base.join("session").join("global");
1636 stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1637 stdfs::write(
1638 legacy_session_dir.join("ses_legacy.json"),
1639 r#"{
1640 "id": "ses_legacy",
1641 "slug": "legacy",
1642 "version": "1.0.0",
1643 "projectID": "proj_legacy",
1644 "directory": "C:\\work\\legacy",
1645 "title": "Legacy Session",
1646 "time": { "created": 1770913145613, "updated": 1770913146613 }
1647}"#,
1648 )
1649 .expect("legacy session write");
1650
1651 let storage = Storage::new(&base).await.expect("storage");
1652 let sessions = storage.list_sessions().await;
1653 let ids = sessions.iter().map(|s| s.id.clone()).collect::<Vec<_>>();
1654 assert!(ids.contains(&"ses_current".to_string()));
1655 assert!(!ids.contains(&"ses_legacy".to_string()));
1656 }
1657
1658 #[tokio::test]
1659 async fn list_sessions_scoped_filters_by_workspace_root() {
1660 let base = std::env::temp_dir().join(format!("tandem-core-scope-{}", Uuid::new_v4()));
1661 let storage = Storage::new(&base).await.expect("storage");
1662 let ws_a = base.join("ws-a");
1663 let ws_b = base.join("ws-b");
1664 stdfs::create_dir_all(&ws_a).expect("ws_a");
1665 stdfs::create_dir_all(&ws_b).expect("ws_b");
1666 let ws_a_str = ws_a.to_string_lossy().to_string();
1667 let ws_b_str = ws_b.to_string_lossy().to_string();
1668
1669 let mut a = Session::new(Some("a".to_string()), Some(ws_a_str.clone()));
1670 a.workspace_root = Some(ws_a_str.clone());
1671 storage.save_session(a).await.expect("save a");
1672
1673 let mut b = Session::new(Some("b".to_string()), Some(ws_b_str.clone()));
1674 b.workspace_root = Some(ws_b_str);
1675 storage.save_session(b).await.expect("save b");
1676
1677 let scoped = storage
1678 .list_sessions_scoped(SessionListScope::Workspace {
1679 workspace_root: ws_a_str,
1680 })
1681 .await;
1682 assert_eq!(scoped.len(), 1);
1683 assert_eq!(scoped[0].title, "a");
1684 }
1685
1686 #[tokio::test]
1687 async fn attach_session_persists_audit_metadata() {
1688 let base = std::env::temp_dir().join(format!("tandem-core-attach-{}", Uuid::new_v4()));
1689 let storage = Storage::new(&base).await.expect("storage");
1690 let ws_a = base.join("ws-a");
1691 let ws_b = base.join("ws-b");
1692 stdfs::create_dir_all(&ws_a).expect("ws_a");
1693 stdfs::create_dir_all(&ws_b).expect("ws_b");
1694 let ws_a_str = ws_a.to_string_lossy().to_string();
1695 let ws_b_str = ws_b.to_string_lossy().to_string();
1696 let mut session = Session::new(Some("s".to_string()), Some(ws_a_str.clone()));
1697 session.workspace_root = Some(ws_a_str);
1698 let id = session.id.clone();
1699 storage.save_session(session).await.expect("save");
1700
1701 let updated = storage
1702 .attach_session_to_workspace(&id, &ws_b_str, "manual")
1703 .await
1704 .expect("attach")
1705 .expect("session exists");
1706 let normalized_expected = normalize_workspace_path(&ws_b_str).expect("normalized path");
1707 assert_eq!(
1708 updated.workspace_root.as_deref(),
1709 Some(normalized_expected.as_str())
1710 );
1711 assert_eq!(
1712 updated.attached_to_workspace.as_deref(),
1713 Some(normalized_expected.as_str())
1714 );
1715 assert_eq!(updated.attach_reason.as_deref(), Some("manual"));
1716 assert!(updated.attach_timestamp_ms.is_some());
1717 }
1718
1719 #[tokio::test]
1720 async fn append_message_part_persists_tool_invocation_and_result() {
1721 let base = std::env::temp_dir().join(format!("tandem-core-tool-parts-{}", Uuid::new_v4()));
1722 let storage = Storage::new(&base).await.expect("storage");
1723 let session = Session::new(Some("tool parts".to_string()), Some(".".to_string()));
1724 let session_id = session.id.clone();
1725 storage.save_session(session).await.expect("save session");
1726
1727 let user = Message::new(
1728 MessageRole::User,
1729 vec![MessagePart::Text {
1730 text: "build ui".to_string(),
1731 }],
1732 );
1733 let message_id = user.id.clone();
1734 storage
1735 .append_message(&session_id, user)
1736 .await
1737 .expect("append user");
1738
1739 storage
1740 .append_message_part(
1741 &session_id,
1742 &message_id,
1743 MessagePart::ToolInvocation {
1744 tool: "write".to_string(),
1745 args: json!({"path":"game.html","content":"<html></html>"}),
1746 result: None,
1747 error: None,
1748 },
1749 )
1750 .await
1751 .expect("append invocation");
1752 storage
1753 .append_message_part(
1754 &session_id,
1755 &message_id,
1756 MessagePart::ToolInvocation {
1757 tool: "write".to_string(),
1758 args: json!({}),
1759 result: Some(json!("ok")),
1760 error: None,
1761 },
1762 )
1763 .await
1764 .expect("append result");
1765
1766 let session = storage.get_session(&session_id).await.expect("session");
1767 let message = session
1768 .messages
1769 .iter()
1770 .find(|message| message.id == message_id)
1771 .expect("message");
1772 assert_eq!(message.parts.len(), 2);
1773 match &message.parts[1] {
1774 MessagePart::ToolInvocation {
1775 tool,
1776 result,
1777 error,
1778 ..
1779 } => {
1780 assert_eq!(tool, "write");
1781 assert_eq!(result.as_ref(), Some(&json!("ok")));
1782 assert_eq!(error.as_deref(), None);
1783 }
1784 other => panic!("expected tool part, got {other:?}"),
1785 }
1786 }
1787
1788 #[tokio::test]
1789 async fn append_message_part_retains_failed_tool_error() {
1790 let base = std::env::temp_dir().join(format!("tandem-core-tool-error-{}", Uuid::new_v4()));
1791 let storage = Storage::new(&base).await.expect("storage");
1792 let session = Session::new(Some("tool errors".to_string()), Some(".".to_string()));
1793 let session_id = session.id.clone();
1794 storage.save_session(session).await.expect("save session");
1795
1796 let user = Message::new(
1797 MessageRole::User,
1798 vec![MessagePart::Text {
1799 text: "write file".to_string(),
1800 }],
1801 );
1802 let message_id = user.id.clone();
1803 storage
1804 .append_message(&session_id, user)
1805 .await
1806 .expect("append user");
1807
1808 storage
1809 .append_message_part(
1810 &session_id,
1811 &message_id,
1812 MessagePart::ToolInvocation {
1813 tool: "write".to_string(),
1814 args: json!({"path":"game.html"}),
1815 result: None,
1816 error: None,
1817 },
1818 )
1819 .await
1820 .expect("append invocation");
1821 storage
1822 .append_message_part(
1823 &session_id,
1824 &message_id,
1825 MessagePart::ToolInvocation {
1826 tool: "write".to_string(),
1827 args: json!({}),
1828 result: None,
1829 error: Some("WRITE_CONTENT_MISSING".to_string()),
1830 },
1831 )
1832 .await
1833 .expect("append error");
1834
1835 let session = storage.get_session(&session_id).await.expect("session");
1836 let message = session
1837 .messages
1838 .iter()
1839 .find(|message| message.id == message_id)
1840 .expect("message");
1841 match &message.parts[1] {
1842 MessagePart::ToolInvocation { error, .. } => {
1843 assert_eq!(error.as_deref(), Some("WRITE_CONTENT_MISSING"));
1844 }
1845 other => panic!("expected tool part, got {other:?}"),
1846 }
1847 }
1848
1849 #[tokio::test]
1850 async fn append_message_part_coalesces_repeated_tool_invocation_updates() {
1851 let base = std::env::temp_dir().join(format!("tandem-core-tool-merge-{}", Uuid::new_v4()));
1852 let storage = Storage::new(&base).await.expect("storage");
1853 let session = Session::new(Some("tool merge".to_string()), Some(".".to_string()));
1854 let session_id = session.id.clone();
1855 storage.save_session(session).await.expect("save session");
1856
1857 let user = Message::new(
1858 MessageRole::User,
1859 vec![MessagePart::Text {
1860 text: "build ui".to_string(),
1861 }],
1862 );
1863 let message_id = user.id.clone();
1864 storage
1865 .append_message(&session_id, user)
1866 .await
1867 .expect("append user");
1868
1869 storage
1870 .append_message_part(
1871 &session_id,
1872 &message_id,
1873 MessagePart::ToolInvocation {
1874 tool: "write".to_string(),
1875 args: json!({"path":"game.html"}),
1876 result: None,
1877 error: None,
1878 },
1879 )
1880 .await
1881 .expect("append first invocation");
1882 storage
1883 .append_message_part(
1884 &session_id,
1885 &message_id,
1886 MessagePart::ToolInvocation {
1887 tool: "write".to_string(),
1888 args: json!({"path":"game.html","content":"<html></html>"}),
1889 result: None,
1890 error: None,
1891 },
1892 )
1893 .await
1894 .expect("append updated invocation");
1895
1896 let session = storage.get_session(&session_id).await.expect("session");
1897 let message = session
1898 .messages
1899 .iter()
1900 .find(|message| message.id == message_id)
1901 .expect("message");
1902 assert_eq!(message.parts.len(), 2);
1903 match &message.parts[1] {
1904 MessagePart::ToolInvocation { tool, args, .. } => {
1905 assert_eq!(tool, "write");
1906 assert_eq!(args["path"], "game.html");
1907 assert_eq!(args["content"], "<html></html>");
1908 }
1909 other => panic!("expected tool part, got {other:?}"),
1910 }
1911 }
1912
1913 #[tokio::test]
1914 async fn append_message_part_upgrades_raw_string_args_to_structured_invocation_args() {
1915 let base =
1916 std::env::temp_dir().join(format!("tandem-core-tool-raw-upgrade-{}", Uuid::new_v4()));
1917 let storage = Storage::new(&base).await.expect("storage");
1918 let session = Session::new(Some("tool raw upgrade".to_string()), Some(".".to_string()));
1919 let session_id = session.id.clone();
1920 storage.save_session(session).await.expect("save session");
1921
1922 let user = Message::new(
1923 MessageRole::User,
1924 vec![MessagePart::Text {
1925 text: "build ui".to_string(),
1926 }],
1927 );
1928 let message_id = user.id.clone();
1929 storage
1930 .append_message(&session_id, user)
1931 .await
1932 .expect("append user");
1933
1934 storage
1935 .append_message_part(
1936 &session_id,
1937 &message_id,
1938 MessagePart::ToolInvocation {
1939 tool: "write".to_string(),
1940 args: json!("{\"path\":\"game.html\",\"content\":\"<html>draft</html>\"}"),
1941 result: None,
1942 error: None,
1943 },
1944 )
1945 .await
1946 .expect("append raw invocation");
1947 storage
1948 .append_message_part(
1949 &session_id,
1950 &message_id,
1951 MessagePart::ToolInvocation {
1952 tool: "write".to_string(),
1953 args: json!({"path":"game.html","content":"<html>draft</html>"}),
1954 result: None,
1955 error: None,
1956 },
1957 )
1958 .await
1959 .expect("append structured invocation");
1960
1961 let session = storage.get_session(&session_id).await.expect("session");
1962 let message = session
1963 .messages
1964 .iter()
1965 .find(|message| message.id == message_id)
1966 .expect("message");
1967 assert_eq!(message.parts.len(), 2);
1968 match &message.parts[1] {
1969 MessagePart::ToolInvocation { tool, args, .. } => {
1970 assert_eq!(tool, "write");
1971 assert_eq!(args["path"], "game.html");
1972 assert_eq!(args["content"], "<html>draft</html>");
1973 }
1974 other => panic!("expected tool part, got {other:?}"),
1975 }
1976 }
1977
1978 #[tokio::test]
1979 async fn append_message_part_upgrades_raw_string_args_when_result_arrives_with_structure() {
1980 let base = std::env::temp_dir().join(format!(
1981 "tandem-core-tool-raw-result-upgrade-{}",
1982 Uuid::new_v4()
1983 ));
1984 let storage = Storage::new(&base).await.expect("storage");
1985 let session = Session::new(
1986 Some("tool raw result upgrade".to_string()),
1987 Some(".".to_string()),
1988 );
1989 let session_id = session.id.clone();
1990 storage.save_session(session).await.expect("save session");
1991
1992 let user = Message::new(
1993 MessageRole::User,
1994 vec![MessagePart::Text {
1995 text: "build ui".to_string(),
1996 }],
1997 );
1998 let message_id = user.id.clone();
1999 storage
2000 .append_message(&session_id, user)
2001 .await
2002 .expect("append user");
2003
2004 storage
2005 .append_message_part(
2006 &session_id,
2007 &message_id,
2008 MessagePart::ToolInvocation {
2009 tool: "write".to_string(),
2010 args: json!("{\"path\":\"game.html\",\"content\":\"<html>draft</html>\"}"),
2011 result: None,
2012 error: None,
2013 },
2014 )
2015 .await
2016 .expect("append raw invocation");
2017 storage
2018 .append_message_part(
2019 &session_id,
2020 &message_id,
2021 MessagePart::ToolInvocation {
2022 tool: "write".to_string(),
2023 args: json!({"path":"game.html","content":"<html>draft</html>"}),
2024 result: Some(json!("ok")),
2025 error: None,
2026 },
2027 )
2028 .await
2029 .expect("append structured result");
2030
2031 let session = storage.get_session(&session_id).await.expect("session");
2032 let message = session
2033 .messages
2034 .iter()
2035 .find(|message| message.id == message_id)
2036 .expect("message");
2037 assert_eq!(message.parts.len(), 2);
2038 match &message.parts[1] {
2039 MessagePart::ToolInvocation {
2040 tool,
2041 args,
2042 result,
2043 error,
2044 } => {
2045 assert_eq!(tool, "write");
2046 assert_eq!(args["path"], "game.html");
2047 assert_eq!(args["content"], "<html>draft</html>");
2048 assert_eq!(result.as_ref(), Some(&json!("ok")));
2049 assert_eq!(error.as_deref(), None);
2050 }
2051 other => panic!("expected tool part, got {other:?}"),
2052 }
2053 }
2054
2055 #[tokio::test]
2056 async fn append_message_part_upgrades_partial_structured_args_when_result_adds_fields() {
2057 let base = std::env::temp_dir().join(format!(
2058 "tandem-core-tool-structured-result-upgrade-{}",
2059 Uuid::new_v4()
2060 ));
2061 let storage = Storage::new(&base).await.expect("storage");
2062 let session = Session::new(
2063 Some("tool structured result upgrade".to_string()),
2064 Some(".".to_string()),
2065 );
2066 let session_id = session.id.clone();
2067 storage.save_session(session).await.expect("save session");
2068
2069 let user = Message::new(
2070 MessageRole::User,
2071 vec![MessagePart::Text {
2072 text: "build ui".to_string(),
2073 }],
2074 );
2075 let message_id = user.id.clone();
2076 storage
2077 .append_message(&session_id, user)
2078 .await
2079 .expect("append user");
2080
2081 storage
2082 .append_message_part(
2083 &session_id,
2084 &message_id,
2085 MessagePart::ToolInvocation {
2086 tool: "write".to_string(),
2087 args: json!({"path":"game.html"}),
2088 result: None,
2089 error: None,
2090 },
2091 )
2092 .await
2093 .expect("append partial invocation");
2094 storage
2095 .append_message_part(
2096 &session_id,
2097 &message_id,
2098 MessagePart::ToolInvocation {
2099 tool: "write".to_string(),
2100 args: json!({"path":"game.html","content":"<html>draft</html>"}),
2101 result: Some(json!("ok")),
2102 error: None,
2103 },
2104 )
2105 .await
2106 .expect("append richer result");
2107
2108 let session = storage.get_session(&session_id).await.expect("session");
2109 let message = session
2110 .messages
2111 .iter()
2112 .find(|message| message.id == message_id)
2113 .expect("message");
2114 assert_eq!(message.parts.len(), 2);
2115 match &message.parts[1] {
2116 MessagePart::ToolInvocation {
2117 tool,
2118 args,
2119 result,
2120 error,
2121 } => {
2122 assert_eq!(tool, "write");
2123 assert_eq!(args["path"], "game.html");
2124 assert_eq!(args["content"], "<html>draft</html>");
2125 assert_eq!(result.as_ref(), Some(&json!("ok")));
2126 assert_eq!(error.as_deref(), None);
2127 }
2128 other => panic!("expected tool part, got {other:?}"),
2129 }
2130 }
2131
2132 #[tokio::test]
2133 async fn append_message_part_falls_back_to_latest_user_message_when_id_missing() {
2134 let base =
2135 std::env::temp_dir().join(format!("tandem-core-tool-fallback-{}", Uuid::new_v4()));
2136 let storage = Storage::new(&base).await.expect("storage");
2137 let session = Session::new(Some("tool fallback".to_string()), Some(".".to_string()));
2138 let session_id = session.id.clone();
2139 storage.save_session(session).await.expect("save session");
2140
2141 let first = Message::new(
2142 MessageRole::User,
2143 vec![MessagePart::Text {
2144 text: "first prompt".to_string(),
2145 }],
2146 );
2147 let second = Message::new(
2148 MessageRole::User,
2149 vec![MessagePart::Text {
2150 text: "second prompt".to_string(),
2151 }],
2152 );
2153 let second_id = second.id.clone();
2154 storage
2155 .append_message(&session_id, first)
2156 .await
2157 .expect("append first");
2158 storage
2159 .append_message(&session_id, second)
2160 .await
2161 .expect("append second");
2162
2163 storage
2164 .append_message_part(
2165 &session_id,
2166 "missing-message-id",
2167 MessagePart::ToolInvocation {
2168 tool: "glob".to_string(),
2169 args: json!({"pattern":"*"}),
2170 result: Some(json!(["README.md"])),
2171 error: None,
2172 },
2173 )
2174 .await
2175 .expect("append fallback tool part");
2176
2177 let session = storage.get_session(&session_id).await.expect("session");
2178 let message = session
2179 .messages
2180 .iter()
2181 .find(|message| message.id == second_id)
2182 .expect("latest user message");
2183 match &message.parts[1] {
2184 MessagePart::ToolInvocation { tool, result, .. } => {
2185 assert_eq!(tool, "glob");
2186 assert_eq!(result.as_ref(), Some(&json!(["README.md"])));
2187 }
2188 other => panic!("expected tool part, got {other:?}"),
2189 }
2190 }
2191
2192 #[tokio::test]
2193 async fn commit_temp_file_replaces_existing_destination() {
2194 let base =
2195 std::env::temp_dir().join(format!("tandem-core-commit-temp-file-{}", Uuid::new_v4()));
2196 stdfs::create_dir_all(&base).expect("base dir");
2197 let destination = base.join("sessions.json");
2198 let temp = base.join("sessions.json.tmp");
2199 stdfs::write(&destination, "{\"version\":\"old\"}").expect("write destination");
2200 stdfs::write(&temp, "{\"version\":\"new\"}").expect("write temp");
2201
2202 commit_temp_file(&temp, &destination)
2203 .await
2204 .expect("replace destination");
2205
2206 let raw = stdfs::read_to_string(&destination).expect("read destination");
2207 assert_eq!(raw, "{\"version\":\"new\"}");
2208 assert!(!temp.exists());
2209 }
2210
2211 #[tokio::test]
2212 async fn startup_compacts_session_snapshot_metadata() {
2213 let base = std::env::temp_dir().join(format!(
2214 "tandem-core-snapshot-compaction-{}",
2215 Uuid::new_v4()
2216 ));
2217 stdfs::create_dir_all(&base).expect("base dir");
2218
2219 let mut session = Session::new(
2220 Some("snapshot compaction".to_string()),
2221 Some(".".to_string()),
2222 );
2223 session.messages.push(Message::new(
2224 MessageRole::User,
2225 vec![MessagePart::Text {
2226 text: "current".to_string(),
2227 }],
2228 ));
2229 let session_id = session.id.clone();
2230
2231 let mut sessions = HashMap::new();
2232 sessions.insert(session_id.clone(), session);
2233 stdfs::write(
2234 base.join("sessions.json"),
2235 serde_json::to_string_pretty(&sessions).expect("serialize sessions"),
2236 )
2237 .expect("write sessions");
2238
2239 let mut snapshots = Vec::new();
2240 for label in ["a", "a", "b", "c", "d", "e", "f"] {
2241 snapshots.push(vec![Message::new(
2242 MessageRole::User,
2243 vec![MessagePart::Text {
2244 text: label.to_string(),
2245 }],
2246 )]);
2247 }
2248 let mut metadata = HashMap::new();
2249 metadata.insert(
2250 session_id.clone(),
2251 SessionMeta {
2252 snapshots,
2253 ..SessionMeta::default()
2254 },
2255 );
2256 metadata.insert("orphan".to_string(), SessionMeta::default());
2257 stdfs::write(
2258 base.join("session_meta.json"),
2259 serde_json::to_string_pretty(&metadata).expect("serialize metadata"),
2260 )
2261 .expect("write metadata");
2262 stdfs::write(base.join("questions.json"), "{}").expect("write questions");
2263
2264 let _storage = Storage::new(&base).await.expect("storage");
2265
2266 let raw = stdfs::read_to_string(base.join("session_meta.json")).expect("read metadata");
2267 let stored: HashMap<String, SessionMeta> =
2268 serde_json::from_str(&raw).expect("parse metadata");
2269 assert_eq!(stored.len(), 1);
2270 let compacted = stored.get(&session_id).expect("session metadata");
2271 assert_eq!(compacted.snapshots.len(), MAX_SESSION_SNAPSHOTS);
2272
2273 let labels = compacted
2274 .snapshots
2275 .iter()
2276 .map(|snapshot| {
2277 snapshot[0]
2278 .parts
2279 .iter()
2280 .find_map(|part| match part {
2281 MessagePart::Text { text } => Some(text.clone()),
2282 _ => None,
2283 })
2284 .expect("snapshot text")
2285 })
2286 .collect::<Vec<_>>();
2287 assert_eq!(labels, vec!["b", "c", "d", "e", "f"]);
2288 }
2289
2290 #[tokio::test]
2291 async fn startup_repairs_placeholder_titles_from_wrapped_user_messages() {
2292 let base =
2293 std::env::temp_dir().join(format!("tandem-core-title-repair-{}", Uuid::new_v4()));
2294 let storage = Storage::new(&base).await.expect("storage");
2295 let wrapped = "<memory_context>\n<current_session>\n- fact\n</current_session>\n</memory_context>\n\n[Mode instructions]\nUse tools.\n\n[User request]\nExplain this bug";
2296 let mut session = Session::new(Some("<memory_context>".to_string()), Some(".".to_string()));
2297 let id = session.id.clone();
2298 session.messages.push(Message::new(
2299 MessageRole::User,
2300 vec![MessagePart::Text {
2301 text: wrapped.to_string(),
2302 }],
2303 ));
2304 storage.save_session(session).await.expect("save");
2305 drop(storage);
2306
2307 let storage = Storage::new(&base).await.expect("storage");
2308 let repaired = storage.get_session(&id).await.expect("session");
2309 assert_eq!(repaired.title, "Explain this bug");
2310 }
2311}