1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use anyhow::Context;
5use chrono::{TimeZone, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use tokio::fs;
9use tokio::sync::{Mutex, RwLock};
10use tokio::task;
11use uuid::Uuid;
12
13use tandem_types::{Message, MessagePart, MessageRole, Session};
14
15use crate::message_part_reducer::reduce_message_parts;
16use crate::{
17 derive_session_title_from_prompt, normalize_workspace_path, title_needs_repair,
18 workspace_project_id,
19};
20
21#[derive(Debug, Clone, Serialize, Deserialize, Default)]
22pub struct SessionMeta {
23 pub parent_id: Option<String>,
24 #[serde(default)]
25 pub archived: bool,
26 #[serde(default)]
27 pub shared: bool,
28 pub share_id: Option<String>,
29 pub summary: Option<String>,
30 #[serde(default)]
31 pub snapshots: Vec<Vec<Message>>,
32 pub pre_revert: Option<Vec<Message>>,
33 #[serde(default)]
34 pub todos: Vec<Value>,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct QuestionToolRef {
39 #[serde(rename = "callID")]
40 pub call_id: String,
41 #[serde(rename = "messageID")]
42 pub message_id: String,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct QuestionRequest {
47 pub id: String,
48 #[serde(rename = "sessionID")]
49 pub session_id: String,
50 #[serde(default)]
51 pub questions: Vec<Value>,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub tool: Option<QuestionToolRef>,
54}
55
56pub struct Storage {
57 base: PathBuf,
58 sessions: RwLock<HashMap<String, Session>>,
59 metadata: RwLock<HashMap<String, SessionMeta>>,
60 question_requests: RwLock<HashMap<String, QuestionRequest>>,
61 flush_lock: Mutex<()>,
62}
63
64#[derive(Debug, Clone)]
65pub enum SessionListScope {
66 Global,
67 Workspace { workspace_root: String },
68}
69
70#[derive(Debug, Clone, Default, Serialize, Deserialize)]
71pub struct SessionRepairStats {
72 pub sessions_repaired: u64,
73 pub messages_recovered: u64,
74 pub parts_recovered: u64,
75 pub conflicts_merged: u64,
76}
77
78const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
79const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
80const MAX_SESSION_SNAPSHOTS: usize = 5;
81
82#[derive(Debug, Clone, Default, Serialize, Deserialize)]
83pub struct LegacyTreeCounts {
84 pub session_files: u64,
85 pub message_files: u64,
86 pub part_files: u64,
87}
88
89#[derive(Debug, Clone, Default, Serialize, Deserialize)]
90pub struct LegacyImportedCounts {
91 pub sessions: u64,
92 pub messages: u64,
93 pub parts: u64,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct LegacyImportMarker {
98 pub version: u32,
99 pub created_at_ms: u64,
100 pub last_checked_at_ms: u64,
101 pub legacy_counts: LegacyTreeCounts,
102 pub imported_counts: LegacyImportedCounts,
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct LegacyRepairRunReport {
107 pub status: String,
108 pub marker_updated: bool,
109 pub sessions_merged: u64,
110 pub messages_recovered: u64,
111 pub parts_recovered: u64,
112 pub legacy_counts: LegacyTreeCounts,
113 pub imported_counts: LegacyImportedCounts,
114}
115
116fn snapshot_session_messages(
117 session_id: &str,
118 session: &Session,
119 metadata: &mut HashMap<String, SessionMeta>,
120) {
121 let meta = metadata
122 .entry(session_id.to_string())
123 .or_insert_with(SessionMeta::default);
124 meta.snapshots.push(session.messages.clone());
125 trim_session_snapshots(&mut meta.snapshots);
126}
127
128fn trim_session_snapshots(snapshots: &mut Vec<Vec<Message>>) {
129 if snapshots.len() > MAX_SESSION_SNAPSHOTS {
130 let keep_from = snapshots.len() - MAX_SESSION_SNAPSHOTS;
131 snapshots.drain(0..keep_from);
132 }
133}
134
135fn compact_session_snapshots(snapshots: &mut Vec<Vec<Message>>) -> usize {
136 if snapshots.is_empty() {
137 return 0;
138 }
139
140 let original_len = snapshots.len();
141 let mut compacted = Vec::with_capacity(original_len);
142 let mut previous_encoded: Option<Vec<u8>> = None;
143
144 for snapshot in snapshots.drain(..) {
145 let encoded = serde_json::to_vec(&snapshot).unwrap_or_default();
146 if previous_encoded.as_ref() == Some(&encoded) {
147 continue;
148 }
149 previous_encoded = Some(encoded);
150 compacted.push(snapshot);
151 }
152
153 trim_session_snapshots(&mut compacted);
154 let removed = original_len.saturating_sub(compacted.len());
155 *snapshots = compacted;
156 removed
157}
158
159fn session_meta_is_empty(meta: &SessionMeta) -> bool {
160 meta.parent_id.is_none()
161 && !meta.archived
162 && !meta.shared
163 && meta.share_id.is_none()
164 && meta.summary.is_none()
165 && meta.snapshots.is_empty()
166 && meta.pre_revert.is_none()
167 && meta.todos.is_empty()
168}
169
170#[derive(Debug, Default)]
171struct SessionMetaCompactionStats {
172 metadata_pruned: u64,
173 snapshots_removed: u64,
174}
175
176fn compact_session_metadata(
177 sessions: &HashMap<String, Session>,
178 metadata: &mut HashMap<String, SessionMeta>,
179) -> SessionMetaCompactionStats {
180 let mut stats = SessionMetaCompactionStats::default();
181
182 metadata.retain(|session_id, meta| {
183 if !sessions.contains_key(session_id) {
184 stats.metadata_pruned += 1;
185 return false;
186 }
187
188 let removed = compact_session_snapshots(&mut meta.snapshots) as u64;
189 stats.snapshots_removed += removed;
190
191 if session_meta_is_empty(meta) {
192 stats.metadata_pruned += 1;
193 return false;
194 }
195
196 true
197 });
198
199 stats
200}
201
202impl Storage {
203 pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
204 let base = base.as_ref().to_path_buf();
205 fs::create_dir_all(&base).await?;
206 let sessions_file = base.join("sessions.json");
207 let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
208 let sessions_file_exists = sessions_file.exists();
209 let mut imported_legacy_sessions = false;
210 let mut sessions = if sessions_file_exists {
211 let raw = fs::read_to_string(&sessions_file).await?;
212 serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
213 } else {
214 HashMap::new()
215 };
216
217 let mut marker_to_write = None;
218 if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
219 let base_for_scan = base.clone();
220 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
221 .await
222 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
223 if merge_legacy_sessions(&mut sessions, scan.sessions) {
224 imported_legacy_sessions = true;
225 }
226 marker_to_write = Some(LegacyImportMarker {
227 version: LEGACY_IMPORT_MARKER_VERSION,
228 created_at_ms: now_ms_u64(),
229 last_checked_at_ms: now_ms_u64(),
230 legacy_counts: scan.legacy_counts,
231 imported_counts: scan.imported_counts,
232 });
233 }
234
235 if hydrate_workspace_roots(&mut sessions) {
236 imported_legacy_sessions = true;
237 }
238 if repair_session_titles(&mut sessions) {
239 imported_legacy_sessions = true;
240 }
241 let metadata_file = base.join("session_meta.json");
242 let mut metadata = if metadata_file.exists() {
243 let raw = fs::read_to_string(&metadata_file).await?;
244 serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
245 } else {
246 HashMap::new()
247 };
248 let compaction = compact_session_metadata(&sessions, &mut metadata);
249 let metadata_compacted = compaction.metadata_pruned > 0 || compaction.snapshots_removed > 0;
250 if metadata_compacted {
251 tracing::info!(
252 metadata_pruned = compaction.metadata_pruned,
253 snapshots_removed = compaction.snapshots_removed,
254 "compacted persisted session metadata"
255 );
256 }
257 let questions_file = base.join("questions.json");
258 let question_requests = if questions_file.exists() {
259 let raw = fs::read_to_string(&questions_file).await?;
260 serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
261 } else {
262 HashMap::new()
263 };
264 let storage = Self {
265 base,
266 sessions: RwLock::new(sessions),
267 metadata: RwLock::new(metadata),
268 question_requests: RwLock::new(question_requests),
269 flush_lock: Mutex::new(()),
270 };
271
272 if imported_legacy_sessions || metadata_compacted {
273 storage.flush().await?;
274 }
275 if let Some(marker) = marker_to_write {
276 storage.write_legacy_import_marker(&marker).await?;
277 }
278
279 Ok(storage)
280 }
281
282 pub async fn list_sessions(&self) -> Vec<Session> {
283 self.list_sessions_scoped(SessionListScope::Global).await
284 }
285
286 pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
287 let all = self
288 .sessions
289 .read()
290 .await
291 .values()
292 .cloned()
293 .collect::<Vec<_>>();
294 match scope {
295 SessionListScope::Global => all,
296 SessionListScope::Workspace { workspace_root } => {
297 let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
298 return Vec::new();
299 };
300 all.into_iter()
301 .filter(|session| {
302 let direct = session
303 .workspace_root
304 .as_ref()
305 .and_then(|p| normalize_workspace_path(p))
306 .map(|p| p == normalized_workspace)
307 .unwrap_or(false);
308 if direct {
309 return true;
310 }
311 normalize_workspace_path(&session.directory)
312 .map(|p| p == normalized_workspace)
313 .unwrap_or(false)
314 })
315 .collect()
316 }
317 }
318 }
319
320 pub async fn get_session(&self, id: &str) -> Option<Session> {
321 self.sessions.read().await.get(id).cloned()
322 }
323
324 pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
325 if session.workspace_root.is_none() {
326 session.workspace_root = normalize_workspace_path(&session.directory);
327 }
328 let session_id = session.id.clone();
329 self.sessions
330 .write()
331 .await
332 .insert(session_id.clone(), session);
333 self.metadata
334 .write()
335 .await
336 .entry(session_id)
337 .or_insert_with(SessionMeta::default);
338 self.flush().await
339 }
340
341 pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
342 let mut stats = SessionRepairStats::default();
343 let mut sessions = self.sessions.write().await;
344
345 for session in sessions.values_mut() {
346 let imported = load_legacy_session_messages(&self.base, &session.id);
347 if imported.is_empty() {
348 continue;
349 }
350
351 let (merged, merge_stats, changed) =
352 merge_session_messages(&session.messages, &imported);
353 if changed {
354 session.messages = merged;
355 session.time.updated =
356 most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
357 stats.sessions_repaired += 1;
358 stats.messages_recovered += merge_stats.messages_recovered;
359 stats.parts_recovered += merge_stats.parts_recovered;
360 stats.conflicts_merged += merge_stats.conflicts_merged;
361 }
362 }
363
364 if stats.sessions_repaired > 0 {
365 drop(sessions);
366 self.flush().await?;
367 }
368
369 Ok(stats)
370 }
371
372 pub async fn run_legacy_storage_repair_scan(
373 &self,
374 force: bool,
375 ) -> anyhow::Result<LegacyRepairRunReport> {
376 let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
377 let sessions_exists = self.base.join("sessions.json").exists();
378 let should_scan = if force {
379 true
380 } else {
381 should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
382 };
383 if !should_scan {
384 let marker = read_legacy_import_marker(&marker_path)
385 .await
386 .unwrap_or_else(|| LegacyImportMarker {
387 version: LEGACY_IMPORT_MARKER_VERSION,
388 created_at_ms: now_ms_u64(),
389 last_checked_at_ms: now_ms_u64(),
390 legacy_counts: LegacyTreeCounts::default(),
391 imported_counts: LegacyImportedCounts::default(),
392 });
393 return Ok(LegacyRepairRunReport {
394 status: "skipped".to_string(),
395 marker_updated: false,
396 sessions_merged: 0,
397 messages_recovered: 0,
398 parts_recovered: 0,
399 legacy_counts: marker.legacy_counts,
400 imported_counts: marker.imported_counts,
401 });
402 }
403
404 let base_for_scan = self.base.clone();
405 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
406 .await
407 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
408
409 let merge_stats = {
410 let mut sessions = self.sessions.write().await;
411 merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
412 };
413
414 if merge_stats.changed {
415 self.flush().await?;
416 }
417
418 let marker = LegacyImportMarker {
419 version: LEGACY_IMPORT_MARKER_VERSION,
420 created_at_ms: now_ms_u64(),
421 last_checked_at_ms: now_ms_u64(),
422 legacy_counts: scan.legacy_counts.clone(),
423 imported_counts: scan.imported_counts.clone(),
424 };
425 self.write_legacy_import_marker(&marker).await?;
426
427 Ok(LegacyRepairRunReport {
428 status: if merge_stats.changed {
429 "updated".to_string()
430 } else {
431 "no_changes".to_string()
432 },
433 marker_updated: true,
434 sessions_merged: merge_stats.sessions_merged,
435 messages_recovered: merge_stats.messages_recovered,
436 parts_recovered: merge_stats.parts_recovered,
437 legacy_counts: scan.legacy_counts,
438 imported_counts: scan.imported_counts,
439 })
440 }
441
442 pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
443 let removed = self.sessions.write().await.remove(id).is_some();
444 self.metadata.write().await.remove(id);
445 self.question_requests
446 .write()
447 .await
448 .retain(|_, request| request.session_id != id);
449 if removed {
450 self.flush().await?;
451 }
452 Ok(removed)
453 }
454
455 pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
456 let mut sessions = self.sessions.write().await;
457 let session = sessions
458 .get_mut(session_id)
459 .context("session not found for append_message")?;
460 session.messages.push(msg);
461 session.time.updated = Utc::now();
462 drop(sessions);
463 self.flush().await
464 }
465
466 pub async fn append_message_part(
467 &self,
468 session_id: &str,
469 message_id: &str,
470 part: MessagePart,
471 ) -> anyhow::Result<()> {
472 let mut sessions = self.sessions.write().await;
473 let session = sessions
474 .get_mut(session_id)
475 .context("session not found for append_message_part")?;
476 let message = if let Some(message) = session
477 .messages
478 .iter_mut()
479 .find(|message| message.id == message_id)
480 {
481 message
482 } else {
483 session
484 .messages
485 .iter_mut()
486 .rev()
487 .find(|message| matches!(message.role, MessageRole::User))
488 .context("message not found for append_message_part")?
489 };
490 reduce_message_parts(&mut message.parts, part);
491 session.time.updated = Utc::now();
492 drop(sessions);
493 self.flush().await
494 }
495
496 pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
497 let source = {
498 let sessions = self.sessions.read().await;
499 sessions.get(id).cloned()
500 };
501 let Some(mut child) = source else {
502 return Ok(None);
503 };
504
505 child.id = Uuid::new_v4().to_string();
506 child.title = format!("{} (fork)", child.title);
507 child.time.created = Utc::now();
508 child.time.updated = child.time.created;
509 child.slug = None;
510
511 self.sessions
512 .write()
513 .await
514 .insert(child.id.clone(), child.clone());
515 self.metadata.write().await.insert(
516 child.id.clone(),
517 SessionMeta {
518 parent_id: Some(id.to_string()),
519 snapshots: vec![child.messages.clone()],
520 ..SessionMeta::default()
521 },
522 );
523 self.flush().await?;
524 Ok(Some(child))
525 }
526
527 pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
528 let mut sessions = self.sessions.write().await;
529 let Some(session) = sessions.get_mut(id) else {
530 return Ok(false);
531 };
532 let mut metadata = self.metadata.write().await;
533 let meta = metadata
534 .entry(id.to_string())
535 .or_insert_with(SessionMeta::default);
536 let Some(snapshot) = meta.snapshots.pop() else {
537 return Ok(false);
538 };
539 meta.pre_revert = Some(session.messages.clone());
540 session.messages = snapshot;
541 session.time.updated = Utc::now();
542 drop(metadata);
543 drop(sessions);
544 self.flush().await?;
545 Ok(true)
546 }
547
548 pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
549 let mut sessions = self.sessions.write().await;
550 let Some(session) = sessions.get_mut(id) else {
551 return Ok(false);
552 };
553 let mut metadata = self.metadata.write().await;
554 let Some(meta) = metadata.get_mut(id) else {
555 return Ok(false);
556 };
557 let Some(previous) = meta.pre_revert.take() else {
558 return Ok(false);
559 };
560 meta.snapshots.push(session.messages.clone());
561 trim_session_snapshots(&mut meta.snapshots);
562 session.messages = previous;
563 session.time.updated = Utc::now();
564 drop(metadata);
565 drop(sessions);
566 self.flush().await?;
567 Ok(true)
568 }
569
570 pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
571 let mut metadata = self.metadata.write().await;
572 let meta = metadata
573 .entry(id.to_string())
574 .or_insert_with(SessionMeta::default);
575 meta.shared = shared;
576 if shared {
577 if meta.share_id.is_none() {
578 meta.share_id = Some(Uuid::new_v4().to_string());
579 }
580 } else {
581 meta.share_id = None;
582 }
583 let share_id = meta.share_id.clone();
584 drop(metadata);
585 self.flush().await?;
586 Ok(share_id)
587 }
588
589 pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
590 let mut metadata = self.metadata.write().await;
591 let meta = metadata
592 .entry(id.to_string())
593 .or_insert_with(SessionMeta::default);
594 meta.archived = archived;
595 drop(metadata);
596 self.flush().await?;
597 Ok(true)
598 }
599
600 pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
601 let mut metadata = self.metadata.write().await;
602 let meta = metadata
603 .entry(id.to_string())
604 .or_insert_with(SessionMeta::default);
605 meta.summary = Some(summary);
606 drop(metadata);
607 self.flush().await?;
608 Ok(true)
609 }
610
611 pub async fn children(&self, parent_id: &str) -> Vec<Session> {
612 let child_ids = {
613 let metadata = self.metadata.read().await;
614 metadata
615 .iter()
616 .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
617 .map(|(id, _)| id.clone())
618 .collect::<Vec<_>>()
619 };
620 let sessions = self.sessions.read().await;
621 child_ids
622 .into_iter()
623 .filter_map(|id| sessions.get(&id).cloned())
624 .collect()
625 }
626
627 pub async fn session_status(&self, id: &str) -> Option<Value> {
628 let metadata = self.metadata.read().await;
629 metadata.get(id).map(|meta| {
630 json!({
631 "archived": meta.archived,
632 "shared": meta.shared,
633 "parentID": meta.parent_id,
634 "snapshotCount": meta.snapshots.len()
635 })
636 })
637 }
638
639 pub async fn session_diff(&self, id: &str) -> Option<Value> {
640 let sessions = self.sessions.read().await;
641 let current = sessions.get(id)?;
642 let metadata = self.metadata.read().await;
643 let default = SessionMeta::default();
644 let meta = metadata.get(id).unwrap_or(&default);
645 let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
646 Some(json!({
647 "sessionID": id,
648 "currentMessageCount": current.messages.len(),
649 "lastSnapshotMessageCount": last_snapshot_len,
650 "delta": current.messages.len() as i64 - last_snapshot_len as i64
651 }))
652 }
653
654 pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
655 let mut metadata = self.metadata.write().await;
656 let meta = metadata
657 .entry(id.to_string())
658 .or_insert_with(SessionMeta::default);
659 meta.todos = normalize_todo_items(todos);
660 drop(metadata);
661 self.flush().await
662 }
663
664 pub async fn get_todos(&self, id: &str) -> Vec<Value> {
665 let todos = self
666 .metadata
667 .read()
668 .await
669 .get(id)
670 .map(|meta| meta.todos.clone())
671 .unwrap_or_default();
672 normalize_todo_items(todos)
673 }
674
675 pub async fn add_question_request(
676 &self,
677 session_id: &str,
678 message_id: &str,
679 questions: Vec<Value>,
680 ) -> anyhow::Result<QuestionRequest> {
681 if questions.is_empty() {
682 return Err(anyhow::anyhow!(
683 "cannot add empty question request for session {}",
684 session_id
685 ));
686 }
687 let request = QuestionRequest {
688 id: format!("q-{}", Uuid::new_v4()),
689 session_id: session_id.to_string(),
690 questions,
691 tool: Some(QuestionToolRef {
692 call_id: format!("call-{}", Uuid::new_v4()),
693 message_id: message_id.to_string(),
694 }),
695 };
696 self.question_requests
697 .write()
698 .await
699 .insert(request.id.clone(), request.clone());
700 self.flush().await?;
701 Ok(request)
702 }
703
704 pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
705 self.question_requests
706 .read()
707 .await
708 .values()
709 .cloned()
710 .collect()
711 }
712
713 pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
714 let removed = self
715 .question_requests
716 .write()
717 .await
718 .remove(request_id)
719 .is_some();
720 if removed {
721 self.flush().await?;
722 }
723 Ok(removed)
724 }
725
726 pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
727 self.reply_question(request_id).await
728 }
729
730 pub async fn attach_session_to_workspace(
731 &self,
732 session_id: &str,
733 target_workspace: &str,
734 reason_tag: &str,
735 ) -> anyhow::Result<Option<Session>> {
736 let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
737 return Ok(None);
738 };
739 let mut sessions = self.sessions.write().await;
740 let Some(session) = sessions.get_mut(session_id) else {
741 return Ok(None);
742 };
743 let previous_workspace = session
744 .workspace_root
745 .clone()
746 .or_else(|| normalize_workspace_path(&session.directory));
747
748 if session.origin_workspace_root.is_none() {
749 session.origin_workspace_root = previous_workspace.clone();
750 }
751 session.attached_from_workspace = previous_workspace;
752 session.attached_to_workspace = Some(target_workspace.clone());
753 session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
754 session.attach_reason = Some(reason_tag.trim().to_string());
755 session.workspace_root = Some(target_workspace.clone());
756 session.project_id = workspace_project_id(&target_workspace);
757 session.directory = target_workspace;
758 session.time.updated = Utc::now();
759 let updated = session.clone();
760 drop(sessions);
761 self.flush().await?;
762 Ok(Some(updated))
763 }
764
765 async fn flush(&self) -> anyhow::Result<()> {
766 let _flush_guard = self.flush_lock.lock().await;
767 {
768 let snapshot = self.sessions.read().await.clone();
769 self.flush_file("sessions.json", &snapshot).await?;
770 }
771 {
772 let metadata_snapshot = self.metadata.read().await.clone();
773 self.flush_file("session_meta.json", &metadata_snapshot)
774 .await?;
775 }
776 {
777 let questions_snapshot = self.question_requests.read().await.clone();
778 self.flush_file("questions.json", &questions_snapshot)
779 .await?;
780 }
781 Ok(())
782 }
783
784 async fn flush_file(&self, filename: &str, data: &impl serde::Serialize) -> anyhow::Result<()> {
785 let path = self.base.join(filename);
786 let temp_path = self.base.join(format!("{}.tmp", filename));
787 let payload = serde_json::to_string_pretty(data)?;
788 fs::write(&temp_path, payload).await.with_context(|| {
789 format!("failed to write temp storage file {}", temp_path.display())
790 })?;
791 let std_temp_path: std::path::PathBuf = temp_path.clone().try_into()?;
792 tokio::task::spawn_blocking(move || {
793 let file = std::fs::File::open(&std_temp_path)?;
794 file.sync_all()?;
795 Ok::<(), std::io::Error>(())
796 })
797 .await??;
798 commit_temp_file(&temp_path, &path).await.with_context(|| {
799 format!(
800 "failed to atomically replace storage file {} with {}",
801 path.display(),
802 temp_path.display()
803 )
804 })?;
805 Ok(())
806 }
807
808 async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
809 let payload = serde_json::to_string_pretty(marker)?;
810 fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
811 Ok(())
812 }
813}
814
815async fn commit_temp_file(temp_path: &Path, path: &Path) -> std::io::Result<()> {
816 match tokio::fs::rename(temp_path, path).await {
817 Ok(()) => Ok(()),
818 Err(err) => {
819 #[cfg(windows)]
820 {
821 use std::io::ErrorKind;
824 if matches!(
825 err.kind(),
826 ErrorKind::PermissionDenied | ErrorKind::AlreadyExists
827 ) {
828 match tokio::fs::remove_file(path).await {
829 Ok(()) => {}
830 Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {}
831 Err(remove_err) => return Err(remove_err),
832 }
833 return tokio::fs::rename(temp_path, path).await;
834 }
835 }
836 Err(err)
837 }
838 }
839}
840
841fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
842 items
843 .into_iter()
844 .filter_map(|item| {
845 let obj = item.as_object()?;
846 let content = obj
847 .get("content")
848 .and_then(|v| v.as_str())
849 .or_else(|| obj.get("text").and_then(|v| v.as_str()))
850 .unwrap_or("")
851 .trim()
852 .to_string();
853 if content.is_empty() {
854 return None;
855 }
856 let id = obj
857 .get("id")
858 .and_then(|v| v.as_str())
859 .filter(|s| !s.trim().is_empty())
860 .map(ToString::to_string)
861 .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
862 let status = obj
863 .get("status")
864 .and_then(|v| v.as_str())
865 .filter(|s| !s.trim().is_empty())
866 .map(ToString::to_string)
867 .unwrap_or_else(|| "pending".to_string());
868 Some(json!({
869 "id": id,
870 "content": content,
871 "status": status
872 }))
873 })
874 .collect()
875}
876
877#[derive(Debug)]
878struct LegacyScanResult {
879 sessions: HashMap<String, Session>,
880 legacy_counts: LegacyTreeCounts,
881 imported_counts: LegacyImportedCounts,
882}
883
884#[derive(Debug, Default)]
885struct LegacyMergeStats {
886 changed: bool,
887 sessions_merged: u64,
888 messages_recovered: u64,
889 parts_recovered: u64,
890}
891
892fn now_ms_u64() -> u64 {
893 Utc::now().timestamp_millis().max(0) as u64
894}
895
896async fn should_run_legacy_scan_on_startup(marker_path: &Path, sessions_exist: bool) -> bool {
897 if !sessions_exist {
898 return true;
899 }
900 if read_legacy_import_marker(marker_path).await.is_none() {
903 return false;
904 }
905 false
906}
907
908async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
909 let raw = fs::read_to_string(marker_path).await.ok()?;
910 serde_json::from_str::<LegacyImportMarker>(&raw).ok()
911}
912
913fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
914 let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
915 let imported_counts = LegacyImportedCounts {
916 sessions: sessions.len() as u64,
917 messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
918 parts: sessions
919 .values()
920 .flat_map(|s| s.messages.iter())
921 .map(|m| m.parts.len() as u64)
922 .sum(),
923 };
924 let legacy_counts = LegacyTreeCounts {
925 session_files: count_legacy_json_files(&base.join("session")),
926 message_files: count_legacy_json_files(&base.join("message")),
927 part_files: count_legacy_json_files(&base.join("part")),
928 };
929 Ok(LegacyScanResult {
930 sessions,
931 legacy_counts,
932 imported_counts,
933 })
934}
935
936fn count_legacy_json_files(root: &Path) -> u64 {
937 if !root.is_dir() {
938 return 0;
939 }
940 let mut count = 0u64;
941 let mut stack = vec![root.to_path_buf()];
942 while let Some(dir) = stack.pop() {
943 if let Ok(entries) = std::fs::read_dir(&dir) {
944 for entry in entries.flatten() {
945 let path = entry.path();
946 if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
947 stack.push(path);
948 continue;
949 }
950 if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
951 count += 1;
952 }
953 }
954 }
955 }
956 count
957}
958
959fn merge_legacy_sessions(
960 current: &mut HashMap<String, Session>,
961 imported: HashMap<String, Session>,
962) -> bool {
963 merge_legacy_sessions_with_stats(current, imported).changed
964}
965
966fn merge_legacy_sessions_with_stats(
967 current: &mut HashMap<String, Session>,
968 imported: HashMap<String, Session>,
969) -> LegacyMergeStats {
970 let mut stats = LegacyMergeStats::default();
971 for (id, legacy) in imported {
972 let legacy_message_count = legacy.messages.len() as u64;
973 let legacy_part_count = legacy
974 .messages
975 .iter()
976 .map(|m| m.parts.len() as u64)
977 .sum::<u64>();
978 match current.get_mut(&id) {
979 None => {
980 current.insert(id, legacy);
981 stats.changed = true;
982 stats.sessions_merged += 1;
983 stats.messages_recovered += legacy_message_count;
984 stats.parts_recovered += legacy_part_count;
985 }
986 Some(existing) => {
987 let should_merge_messages =
988 existing.messages.is_empty() && !legacy.messages.is_empty();
989 let should_fill_title =
990 existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
991 let should_fill_directory = (existing.directory.trim().is_empty()
992 || existing.directory.trim() == "."
993 || existing.directory.trim() == "./"
994 || existing.directory.trim() == ".\\")
995 && !legacy.directory.trim().is_empty();
996 let should_fill_workspace =
997 existing.workspace_root.is_none() && legacy.workspace_root.is_some();
998 if should_merge_messages {
999 existing.messages = legacy.messages.clone();
1000 }
1001 if should_fill_title {
1002 existing.title = legacy.title.clone();
1003 }
1004 if should_fill_directory {
1005 existing.directory = legacy.directory.clone();
1006 }
1007 if should_fill_workspace {
1008 existing.workspace_root = legacy.workspace_root.clone();
1009 }
1010 if should_merge_messages
1011 || should_fill_title
1012 || should_fill_directory
1013 || should_fill_workspace
1014 {
1015 stats.changed = true;
1016 if should_merge_messages {
1017 stats.sessions_merged += 1;
1018 stats.messages_recovered += legacy_message_count;
1019 stats.parts_recovered += legacy_part_count;
1020 }
1021 }
1022 }
1023 }
1024 }
1025 stats
1026}
1027
1028fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
1029 let mut changed = false;
1030 for session in sessions.values_mut() {
1031 if session.workspace_root.is_none() {
1032 let normalized = normalize_workspace_path(&session.directory);
1033 if normalized.is_some() {
1034 session.workspace_root = normalized;
1035 changed = true;
1036 }
1037 }
1038 }
1039 changed
1040}
1041
1042fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
1043 let mut changed = false;
1044 for session in sessions.values_mut() {
1045 if !title_needs_repair(&session.title) {
1046 continue;
1047 }
1048 let first_user_text = session.messages.iter().find_map(|message| {
1049 if !matches!(message.role, MessageRole::User) {
1050 return None;
1051 }
1052 message.parts.iter().find_map(|part| match part {
1053 MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
1054 _ => None,
1055 })
1056 });
1057 let Some(source) = first_user_text else {
1058 continue;
1059 };
1060 let Some(derived) = derive_session_title_from_prompt(source, 60) else {
1061 continue;
1062 };
1063 if derived == session.title {
1064 continue;
1065 }
1066 session.title = derived;
1067 session.time.updated = Utc::now();
1068 changed = true;
1069 }
1070 changed
1071}
1072
1073#[derive(Debug, Deserialize)]
1074struct LegacySessionTime {
1075 created: i64,
1076 updated: i64,
1077}
1078
1079#[derive(Debug, Deserialize)]
1080struct LegacySession {
1081 id: String,
1082 slug: Option<String>,
1083 version: Option<String>,
1084 #[serde(rename = "projectID")]
1085 project_id: Option<String>,
1086 title: Option<String>,
1087 directory: Option<String>,
1088 time: LegacySessionTime,
1089}
1090
1091fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
1092 let legacy_root = base.join("session");
1093 if !legacy_root.is_dir() {
1094 return Ok(HashMap::new());
1095 }
1096
1097 let mut out = HashMap::new();
1098 let mut stack = vec![legacy_root];
1099 while let Some(dir) = stack.pop() {
1100 for entry in std::fs::read_dir(&dir)? {
1101 let entry = entry?;
1102 let path = entry.path();
1103 if entry.file_type()?.is_dir() {
1104 stack.push(path);
1105 continue;
1106 }
1107 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1108 continue;
1109 }
1110 let raw = match std::fs::read_to_string(&path) {
1111 Ok(v) => v,
1112 Err(_) => continue,
1113 };
1114 let legacy = match serde_json::from_str::<LegacySession>(&raw) {
1115 Ok(v) => v,
1116 Err(_) => continue,
1117 };
1118 let created = Utc
1119 .timestamp_millis_opt(legacy.time.created)
1120 .single()
1121 .unwrap_or_else(Utc::now);
1122 let updated = Utc
1123 .timestamp_millis_opt(legacy.time.updated)
1124 .single()
1125 .unwrap_or(created);
1126
1127 let session_id = legacy.id.clone();
1128 out.insert(
1129 session_id.clone(),
1130 Session {
1131 id: session_id.clone(),
1132 slug: legacy.slug,
1133 version: legacy.version,
1134 project_id: legacy.project_id,
1135 title: legacy
1136 .title
1137 .filter(|s| !s.trim().is_empty())
1138 .unwrap_or_else(|| "New session".to_string()),
1139 directory: legacy
1140 .directory
1141 .clone()
1142 .filter(|s| !s.trim().is_empty())
1143 .unwrap_or_else(|| ".".to_string()),
1144 workspace_root: legacy
1145 .directory
1146 .as_deref()
1147 .and_then(normalize_workspace_path),
1148 origin_workspace_root: None,
1149 attached_from_workspace: None,
1150 attached_to_workspace: None,
1151 attach_timestamp_ms: None,
1152 attach_reason: None,
1153 time: tandem_types::SessionTime { created, updated },
1154 model: None,
1155 provider: None,
1156 environment: None,
1157 messages: load_legacy_session_messages(base, &session_id),
1158 },
1159 );
1160 }
1161 }
1162 Ok(out)
1163}
1164
1165#[derive(Debug, Deserialize)]
1166struct LegacyMessageTime {
1167 created: i64,
1168}
1169
1170#[derive(Debug, Deserialize)]
1171struct LegacyMessage {
1172 id: String,
1173 role: String,
1174 time: LegacyMessageTime,
1175}
1176
1177#[derive(Debug, Deserialize)]
1178struct LegacyPart {
1179 #[serde(rename = "type")]
1180 part_type: Option<String>,
1181 text: Option<String>,
1182 tool: Option<String>,
1183 args: Option<Value>,
1184 result: Option<Value>,
1185 error: Option<String>,
1186}
1187
1188fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
1189 let msg_dir = base.join("message").join(session_id);
1190 if !msg_dir.is_dir() {
1191 return Vec::new();
1192 }
1193
1194 let mut legacy_messages: Vec<(i64, Message)> = Vec::new();
1195
1196 let Ok(entries) = std::fs::read_dir(&msg_dir) else {
1197 return Vec::new();
1198 };
1199
1200 for entry in entries.flatten() {
1201 let path = entry.path();
1202 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1203 continue;
1204 }
1205 let Ok(raw) = std::fs::read_to_string(&path) else {
1206 continue;
1207 };
1208 let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
1209 continue;
1210 };
1211
1212 let created_at = Utc
1213 .timestamp_millis_opt(legacy.time.created)
1214 .single()
1215 .unwrap_or_else(Utc::now);
1216
1217 legacy_messages.push((
1218 legacy.time.created,
1219 Message {
1220 id: legacy.id.clone(),
1221 role: legacy_role_to_message_role(&legacy.role),
1222 parts: load_legacy_message_parts(base, &legacy.id),
1223 created_at,
1224 },
1225 ));
1226 }
1227
1228 legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
1229 legacy_messages.into_iter().map(|(_, msg)| msg).collect()
1230}
1231
1232fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
1233 let parts_dir = base.join("part").join(message_id);
1234 if !parts_dir.is_dir() {
1235 return Vec::new();
1236 }
1237
1238 let Ok(entries) = std::fs::read_dir(&parts_dir) else {
1239 return Vec::new();
1240 };
1241
1242 let mut out = Vec::new();
1243 for entry in entries.flatten() {
1244 let path = entry.path();
1245 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1246 continue;
1247 }
1248 let Ok(raw) = std::fs::read_to_string(&path) else {
1249 continue;
1250 };
1251 let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
1252 continue;
1253 };
1254
1255 let mapped = if let Some(tool) = part.tool {
1256 Some(MessagePart::ToolInvocation {
1257 tool,
1258 args: part.args.unwrap_or_else(|| json!({})),
1259 result: part.result,
1260 error: part.error,
1261 })
1262 } else {
1263 match part.part_type.as_deref() {
1264 Some("reasoning") => Some(MessagePart::Reasoning {
1265 text: part.text.unwrap_or_default(),
1266 }),
1267 Some("tool") => Some(MessagePart::ToolInvocation {
1268 tool: "tool".to_string(),
1269 args: part.args.unwrap_or_else(|| json!({})),
1270 result: part.result,
1271 error: part.error,
1272 }),
1273 Some("text") | None => Some(MessagePart::Text {
1274 text: part.text.unwrap_or_default(),
1275 }),
1276 _ => None,
1277 }
1278 };
1279
1280 if let Some(part) = mapped {
1281 out.push(part);
1282 }
1283 }
1284 out
1285}
1286
1287fn legacy_role_to_message_role(role: &str) -> MessageRole {
1288 match role.to_lowercase().as_str() {
1289 "user" => MessageRole::User,
1290 "assistant" => MessageRole::Assistant,
1291 "system" => MessageRole::System,
1292 "tool" => MessageRole::Tool,
1293 _ => MessageRole::Assistant,
1294 }
1295}
1296
1297#[derive(Debug, Clone, Default)]
1298struct MessageMergeStats {
1299 messages_recovered: u64,
1300 parts_recovered: u64,
1301 conflicts_merged: u64,
1302}
1303
1304fn message_richness(msg: &Message) -> usize {
1305 msg.parts
1306 .iter()
1307 .map(|p| match p {
1308 MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1309 if text.trim().is_empty() {
1310 0
1311 } else {
1312 1
1313 }
1314 }
1315 MessagePart::ToolInvocation { result, error, .. } => {
1316 if result.is_some() || error.is_some() {
1317 2
1318 } else {
1319 1
1320 }
1321 }
1322 })
1323 .sum()
1324}
1325
1326fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1327 messages.iter().map(|m| m.created_at).max()
1328}
1329
1330fn merge_session_messages(
1331 existing: &[Message],
1332 imported: &[Message],
1333) -> (Vec<Message>, MessageMergeStats, bool) {
1334 if existing.is_empty() {
1335 let messages_recovered = imported.len() as u64;
1336 let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1337 return (
1338 imported.to_vec(),
1339 MessageMergeStats {
1340 messages_recovered,
1341 parts_recovered,
1342 conflicts_merged: 0,
1343 },
1344 true,
1345 );
1346 }
1347
1348 let mut merged_by_id: HashMap<String, Message> = existing
1349 .iter()
1350 .cloned()
1351 .map(|m| (m.id.clone(), m))
1352 .collect();
1353 let mut stats = MessageMergeStats::default();
1354 let mut changed = false;
1355
1356 for incoming in imported {
1357 match merged_by_id.get(&incoming.id) {
1358 None => {
1359 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1360 stats.messages_recovered += 1;
1361 stats.parts_recovered += incoming.parts.len() as u64;
1362 changed = true;
1363 }
1364 Some(current) => {
1365 let incoming_richer = message_richness(incoming) > message_richness(current)
1366 || incoming.parts.len() > current.parts.len();
1367 if incoming_richer {
1368 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1369 stats.conflicts_merged += 1;
1370 changed = true;
1371 }
1372 }
1373 }
1374 }
1375
1376 let mut out: Vec<Message> = merged_by_id.into_values().collect();
1377 out.sort_by_key(|m| m.created_at);
1378 (out, stats, changed)
1379}
1380
1381#[cfg(test)]
1382mod tests {
1383 use super::*;
1384 use std::fs as stdfs;
1385 use std::sync::Arc;
1386
1387 #[tokio::test]
1388 async fn todos_are_normalized_to_wire_shape() {
1389 let base = std::env::temp_dir().join(format!("tandem-core-test-{}", Uuid::new_v4()));
1390 let storage = Storage::new(&base).await.expect("storage");
1391 let session = Session::new(Some("test".to_string()), Some(".".to_string()));
1392 let id = session.id.clone();
1393 storage.save_session(session).await.expect("save session");
1394
1395 storage
1396 .set_todos(
1397 &id,
1398 vec![
1399 json!({"content":"first"}),
1400 json!({"text":"second", "status":"in_progress"}),
1401 json!({"id":"keep-id","content":"third","status":"completed"}),
1402 ],
1403 )
1404 .await
1405 .expect("set todos");
1406
1407 let todos = storage.get_todos(&id).await;
1408 assert_eq!(todos.len(), 3);
1409 for todo in todos {
1410 assert!(todo.get("id").and_then(|v| v.as_str()).is_some());
1411 assert!(todo.get("content").and_then(|v| v.as_str()).is_some());
1412 assert!(todo.get("status").and_then(|v| v.as_str()).is_some());
1413 }
1414 }
1415
1416 #[tokio::test]
1417 async fn imports_legacy_opencode_session_index_when_sessions_json_missing() {
1418 let base =
1419 std::env::temp_dir().join(format!("tandem-core-legacy-import-{}", Uuid::new_v4()));
1420 let legacy_session_dir = base.join("session").join("global");
1421 stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1422 stdfs::write(
1423 legacy_session_dir.join("ses_test.json"),
1424 r#"{
1425 "id": "ses_test",
1426 "slug": "test",
1427 "version": "1.0.0",
1428 "projectID": "proj_1",
1429 "directory": "C:\\work\\demo",
1430 "title": "Legacy Session",
1431 "time": { "created": 1770913145613, "updated": 1770913146613 }
1432}"#,
1433 )
1434 .expect("legacy session write");
1435
1436 let storage = Storage::new(&base).await.expect("storage");
1437 let sessions = storage.list_sessions().await;
1438 assert_eq!(sessions.len(), 1);
1439 assert_eq!(sessions[0].id, "ses_test");
1440 assert_eq!(sessions[0].title, "Legacy Session");
1441 assert!(base.join("sessions.json").exists());
1442 }
1443
1444 #[tokio::test]
1445 async fn imports_legacy_messages_and_parts_for_session() {
1446 let base = std::env::temp_dir().join(format!("tandem-core-legacy-msg-{}", Uuid::new_v4()));
1447 let session_dir = base.join("session").join("global");
1448 let message_dir = base.join("message").join("ses_test");
1449 let part_dir = base.join("part").join("msg_1");
1450 stdfs::create_dir_all(&session_dir).expect("session dir");
1451 stdfs::create_dir_all(&message_dir).expect("message dir");
1452 stdfs::create_dir_all(&part_dir).expect("part dir");
1453
1454 stdfs::write(
1455 session_dir.join("ses_test.json"),
1456 r#"{
1457 "id": "ses_test",
1458 "projectID": "proj_1",
1459 "directory": "C:\\work\\demo",
1460 "title": "Legacy Session",
1461 "time": { "created": 1770913145613, "updated": 1770913146613 }
1462}"#,
1463 )
1464 .expect("write session");
1465
1466 stdfs::write(
1467 message_dir.join("msg_1.json"),
1468 r#"{
1469 "id": "msg_1",
1470 "sessionID": "ses_test",
1471 "role": "assistant",
1472 "time": { "created": 1770913145613 }
1473}"#,
1474 )
1475 .expect("write msg");
1476
1477 stdfs::write(
1478 part_dir.join("prt_1.json"),
1479 r#"{
1480 "id": "prt_1",
1481 "sessionID": "ses_test",
1482 "messageID": "msg_1",
1483 "type": "text",
1484 "text": "hello from legacy"
1485}"#,
1486 )
1487 .expect("write part");
1488
1489 let storage = Storage::new(&base).await.expect("storage");
1490 let sessions = storage.list_sessions().await;
1491 assert_eq!(sessions.len(), 1);
1492 assert_eq!(sessions[0].messages.len(), 1);
1493 assert_eq!(sessions[0].messages[0].parts.len(), 1);
1494 }
1495
1496 #[tokio::test]
1497 async fn skips_legacy_merge_when_sessions_json_exists() {
1498 let base =
1499 std::env::temp_dir().join(format!("tandem-core-legacy-merge-{}", Uuid::new_v4()));
1500 stdfs::create_dir_all(&base).expect("base");
1501 stdfs::write(
1502 base.join("sessions.json"),
1503 r#"{
1504 "ses_current": {
1505 "id": "ses_current",
1506 "slug": null,
1507 "version": "v1",
1508 "project_id": null,
1509 "title": "Current Session",
1510 "directory": ".",
1511 "workspace_root": null,
1512 "origin_workspace_root": null,
1513 "attached_from_workspace": null,
1514 "attached_to_workspace": null,
1515 "attach_timestamp_ms": null,
1516 "attach_reason": null,
1517 "time": {"created":"2026-01-01T00:00:00Z","updated":"2026-01-01T00:00:00Z"},
1518 "model": null,
1519 "provider": null,
1520 "messages": []
1521 }
1522}"#,
1523 )
1524 .expect("sessions.json");
1525
1526 let legacy_session_dir = base.join("session").join("global");
1527 stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1528 stdfs::write(
1529 legacy_session_dir.join("ses_legacy.json"),
1530 r#"{
1531 "id": "ses_legacy",
1532 "slug": "legacy",
1533 "version": "1.0.0",
1534 "projectID": "proj_legacy",
1535 "directory": "C:\\work\\legacy",
1536 "title": "Legacy Session",
1537 "time": { "created": 1770913145613, "updated": 1770913146613 }
1538}"#,
1539 )
1540 .expect("legacy session write");
1541
1542 let storage = Storage::new(&base).await.expect("storage");
1543 let sessions = storage.list_sessions().await;
1544 let ids = sessions.iter().map(|s| s.id.clone()).collect::<Vec<_>>();
1545 assert!(ids.contains(&"ses_current".to_string()));
1546 assert!(!ids.contains(&"ses_legacy".to_string()));
1547 }
1548
1549 #[tokio::test]
1550 async fn list_sessions_scoped_filters_by_workspace_root() {
1551 let base = std::env::temp_dir().join(format!("tandem-core-scope-{}", Uuid::new_v4()));
1552 let storage = Storage::new(&base).await.expect("storage");
1553 let ws_a = base.join("ws-a");
1554 let ws_b = base.join("ws-b");
1555 stdfs::create_dir_all(&ws_a).expect("ws_a");
1556 stdfs::create_dir_all(&ws_b).expect("ws_b");
1557 let ws_a_str = ws_a.to_string_lossy().to_string();
1558 let ws_b_str = ws_b.to_string_lossy().to_string();
1559
1560 let mut a = Session::new(Some("a".to_string()), Some(ws_a_str.clone()));
1561 a.workspace_root = Some(ws_a_str.clone());
1562 storage.save_session(a).await.expect("save a");
1563
1564 let mut b = Session::new(Some("b".to_string()), Some(ws_b_str.clone()));
1565 b.workspace_root = Some(ws_b_str);
1566 storage.save_session(b).await.expect("save b");
1567
1568 let scoped = storage
1569 .list_sessions_scoped(SessionListScope::Workspace {
1570 workspace_root: ws_a_str,
1571 })
1572 .await;
1573 assert_eq!(scoped.len(), 1);
1574 assert_eq!(scoped[0].title, "a");
1575 }
1576
1577 #[tokio::test]
1578 async fn attach_session_persists_audit_metadata() {
1579 let base = std::env::temp_dir().join(format!("tandem-core-attach-{}", Uuid::new_v4()));
1580 let storage = Storage::new(&base).await.expect("storage");
1581 let ws_a = base.join("ws-a");
1582 let ws_b = base.join("ws-b");
1583 stdfs::create_dir_all(&ws_a).expect("ws_a");
1584 stdfs::create_dir_all(&ws_b).expect("ws_b");
1585 let ws_a_str = ws_a.to_string_lossy().to_string();
1586 let ws_b_str = ws_b.to_string_lossy().to_string();
1587 let mut session = Session::new(Some("s".to_string()), Some(ws_a_str.clone()));
1588 session.workspace_root = Some(ws_a_str);
1589 let id = session.id.clone();
1590 storage.save_session(session).await.expect("save");
1591
1592 let updated = storage
1593 .attach_session_to_workspace(&id, &ws_b_str, "manual")
1594 .await
1595 .expect("attach")
1596 .expect("session exists");
1597 let normalized_expected = normalize_workspace_path(&ws_b_str).expect("normalized path");
1598 assert_eq!(
1599 updated.workspace_root.as_deref(),
1600 Some(normalized_expected.as_str())
1601 );
1602 assert_eq!(
1603 updated.attached_to_workspace.as_deref(),
1604 Some(normalized_expected.as_str())
1605 );
1606 assert_eq!(updated.attach_reason.as_deref(), Some("manual"));
1607 assert!(updated.attach_timestamp_ms.is_some());
1608 }
1609
1610 #[tokio::test]
1611 async fn append_message_part_persists_tool_invocation_and_result() {
1612 let base = std::env::temp_dir().join(format!("tandem-core-tool-parts-{}", Uuid::new_v4()));
1613 let storage = Storage::new(&base).await.expect("storage");
1614 let session = Session::new(Some("tool parts".to_string()), Some(".".to_string()));
1615 let session_id = session.id.clone();
1616 storage.save_session(session).await.expect("save session");
1617
1618 let user = Message::new(
1619 MessageRole::User,
1620 vec![MessagePart::Text {
1621 text: "build ui".to_string(),
1622 }],
1623 );
1624 let message_id = user.id.clone();
1625 storage
1626 .append_message(&session_id, user)
1627 .await
1628 .expect("append user");
1629
1630 storage
1631 .append_message_part(
1632 &session_id,
1633 &message_id,
1634 MessagePart::ToolInvocation {
1635 tool: "write".to_string(),
1636 args: json!({"path":"game.html","content":"<html></html>"}),
1637 result: None,
1638 error: None,
1639 },
1640 )
1641 .await
1642 .expect("append invocation");
1643 storage
1644 .append_message_part(
1645 &session_id,
1646 &message_id,
1647 MessagePart::ToolInvocation {
1648 tool: "write".to_string(),
1649 args: json!({}),
1650 result: Some(json!("ok")),
1651 error: None,
1652 },
1653 )
1654 .await
1655 .expect("append result");
1656
1657 let session = storage.get_session(&session_id).await.expect("session");
1658 let message = session
1659 .messages
1660 .iter()
1661 .find(|message| message.id == message_id)
1662 .expect("message");
1663 assert_eq!(message.parts.len(), 2);
1664 match &message.parts[1] {
1665 MessagePart::ToolInvocation {
1666 tool,
1667 result,
1668 error,
1669 ..
1670 } => {
1671 assert_eq!(tool, "write");
1672 assert_eq!(result.as_ref(), Some(&json!("ok")));
1673 assert_eq!(error.as_deref(), None);
1674 }
1675 other => panic!("expected tool part, got {other:?}"),
1676 }
1677 }
1678
1679 #[tokio::test]
1680 async fn append_message_part_retains_failed_tool_error() {
1681 let base = std::env::temp_dir().join(format!("tandem-core-tool-error-{}", Uuid::new_v4()));
1682 let storage = Storage::new(&base).await.expect("storage");
1683 let session = Session::new(Some("tool errors".to_string()), Some(".".to_string()));
1684 let session_id = session.id.clone();
1685 storage.save_session(session).await.expect("save session");
1686
1687 let user = Message::new(
1688 MessageRole::User,
1689 vec![MessagePart::Text {
1690 text: "write file".to_string(),
1691 }],
1692 );
1693 let message_id = user.id.clone();
1694 storage
1695 .append_message(&session_id, user)
1696 .await
1697 .expect("append user");
1698
1699 storage
1700 .append_message_part(
1701 &session_id,
1702 &message_id,
1703 MessagePart::ToolInvocation {
1704 tool: "write".to_string(),
1705 args: json!({"path":"game.html"}),
1706 result: None,
1707 error: None,
1708 },
1709 )
1710 .await
1711 .expect("append invocation");
1712 storage
1713 .append_message_part(
1714 &session_id,
1715 &message_id,
1716 MessagePart::ToolInvocation {
1717 tool: "write".to_string(),
1718 args: json!({}),
1719 result: None,
1720 error: Some("WRITE_CONTENT_MISSING".to_string()),
1721 },
1722 )
1723 .await
1724 .expect("append error");
1725
1726 let session = storage.get_session(&session_id).await.expect("session");
1727 let message = session
1728 .messages
1729 .iter()
1730 .find(|message| message.id == message_id)
1731 .expect("message");
1732 match &message.parts[1] {
1733 MessagePart::ToolInvocation { error, .. } => {
1734 assert_eq!(error.as_deref(), Some("WRITE_CONTENT_MISSING"));
1735 }
1736 other => panic!("expected tool part, got {other:?}"),
1737 }
1738 }
1739
1740 #[tokio::test]
1741 async fn append_message_part_coalesces_repeated_tool_invocation_updates() {
1742 let base = std::env::temp_dir().join(format!("tandem-core-tool-merge-{}", Uuid::new_v4()));
1743 let storage = Storage::new(&base).await.expect("storage");
1744 let session = Session::new(Some("tool merge".to_string()), Some(".".to_string()));
1745 let session_id = session.id.clone();
1746 storage.save_session(session).await.expect("save session");
1747
1748 let user = Message::new(
1749 MessageRole::User,
1750 vec![MessagePart::Text {
1751 text: "build ui".to_string(),
1752 }],
1753 );
1754 let message_id = user.id.clone();
1755 storage
1756 .append_message(&session_id, user)
1757 .await
1758 .expect("append user");
1759
1760 storage
1761 .append_message_part(
1762 &session_id,
1763 &message_id,
1764 MessagePart::ToolInvocation {
1765 tool: "write".to_string(),
1766 args: json!({"path":"game.html"}),
1767 result: None,
1768 error: None,
1769 },
1770 )
1771 .await
1772 .expect("append first invocation");
1773 storage
1774 .append_message_part(
1775 &session_id,
1776 &message_id,
1777 MessagePart::ToolInvocation {
1778 tool: "write".to_string(),
1779 args: json!({"path":"game.html","content":"<html></html>"}),
1780 result: None,
1781 error: None,
1782 },
1783 )
1784 .await
1785 .expect("append updated invocation");
1786
1787 let session = storage.get_session(&session_id).await.expect("session");
1788 let message = session
1789 .messages
1790 .iter()
1791 .find(|message| message.id == message_id)
1792 .expect("message");
1793 assert_eq!(message.parts.len(), 2);
1794 match &message.parts[1] {
1795 MessagePart::ToolInvocation { tool, args, .. } => {
1796 assert_eq!(tool, "write");
1797 assert_eq!(args["path"], "game.html");
1798 assert_eq!(args["content"], "<html></html>");
1799 }
1800 other => panic!("expected tool part, got {other:?}"),
1801 }
1802 }
1803
1804 #[tokio::test]
1805 async fn append_message_part_upgrades_raw_string_args_to_structured_invocation_args() {
1806 let base =
1807 std::env::temp_dir().join(format!("tandem-core-tool-raw-upgrade-{}", Uuid::new_v4()));
1808 let storage = Storage::new(&base).await.expect("storage");
1809 let session = Session::new(Some("tool raw upgrade".to_string()), Some(".".to_string()));
1810 let session_id = session.id.clone();
1811 storage.save_session(session).await.expect("save session");
1812
1813 let user = Message::new(
1814 MessageRole::User,
1815 vec![MessagePart::Text {
1816 text: "build ui".to_string(),
1817 }],
1818 );
1819 let message_id = user.id.clone();
1820 storage
1821 .append_message(&session_id, user)
1822 .await
1823 .expect("append user");
1824
1825 storage
1826 .append_message_part(
1827 &session_id,
1828 &message_id,
1829 MessagePart::ToolInvocation {
1830 tool: "write".to_string(),
1831 args: json!("{\"path\":\"game.html\",\"content\":\"<html>draft</html>\"}"),
1832 result: None,
1833 error: None,
1834 },
1835 )
1836 .await
1837 .expect("append raw invocation");
1838 storage
1839 .append_message_part(
1840 &session_id,
1841 &message_id,
1842 MessagePart::ToolInvocation {
1843 tool: "write".to_string(),
1844 args: json!({"path":"game.html","content":"<html>draft</html>"}),
1845 result: None,
1846 error: None,
1847 },
1848 )
1849 .await
1850 .expect("append structured invocation");
1851
1852 let session = storage.get_session(&session_id).await.expect("session");
1853 let message = session
1854 .messages
1855 .iter()
1856 .find(|message| message.id == message_id)
1857 .expect("message");
1858 assert_eq!(message.parts.len(), 2);
1859 match &message.parts[1] {
1860 MessagePart::ToolInvocation { tool, args, .. } => {
1861 assert_eq!(tool, "write");
1862 assert_eq!(args["path"], "game.html");
1863 assert_eq!(args["content"], "<html>draft</html>");
1864 }
1865 other => panic!("expected tool part, got {other:?}"),
1866 }
1867 }
1868
1869 #[tokio::test]
1870 async fn append_message_part_upgrades_raw_string_args_when_result_arrives_with_structure() {
1871 let base = std::env::temp_dir().join(format!(
1872 "tandem-core-tool-raw-result-upgrade-{}",
1873 Uuid::new_v4()
1874 ));
1875 let storage = Storage::new(&base).await.expect("storage");
1876 let session = Session::new(
1877 Some("tool raw result upgrade".to_string()),
1878 Some(".".to_string()),
1879 );
1880 let session_id = session.id.clone();
1881 storage.save_session(session).await.expect("save session");
1882
1883 let user = Message::new(
1884 MessageRole::User,
1885 vec![MessagePart::Text {
1886 text: "build ui".to_string(),
1887 }],
1888 );
1889 let message_id = user.id.clone();
1890 storage
1891 .append_message(&session_id, user)
1892 .await
1893 .expect("append user");
1894
1895 storage
1896 .append_message_part(
1897 &session_id,
1898 &message_id,
1899 MessagePart::ToolInvocation {
1900 tool: "write".to_string(),
1901 args: json!("{\"path\":\"game.html\",\"content\":\"<html>draft</html>\"}"),
1902 result: None,
1903 error: None,
1904 },
1905 )
1906 .await
1907 .expect("append raw invocation");
1908 storage
1909 .append_message_part(
1910 &session_id,
1911 &message_id,
1912 MessagePart::ToolInvocation {
1913 tool: "write".to_string(),
1914 args: json!({"path":"game.html","content":"<html>draft</html>"}),
1915 result: Some(json!("ok")),
1916 error: None,
1917 },
1918 )
1919 .await
1920 .expect("append structured result");
1921
1922 let session = storage.get_session(&session_id).await.expect("session");
1923 let message = session
1924 .messages
1925 .iter()
1926 .find(|message| message.id == message_id)
1927 .expect("message");
1928 assert_eq!(message.parts.len(), 2);
1929 match &message.parts[1] {
1930 MessagePart::ToolInvocation {
1931 tool,
1932 args,
1933 result,
1934 error,
1935 } => {
1936 assert_eq!(tool, "write");
1937 assert_eq!(args["path"], "game.html");
1938 assert_eq!(args["content"], "<html>draft</html>");
1939 assert_eq!(result.as_ref(), Some(&json!("ok")));
1940 assert_eq!(error.as_deref(), None);
1941 }
1942 other => panic!("expected tool part, got {other:?}"),
1943 }
1944 }
1945
1946 #[tokio::test]
1947 async fn append_message_part_upgrades_partial_structured_args_when_result_adds_fields() {
1948 let base = std::env::temp_dir().join(format!(
1949 "tandem-core-tool-structured-result-upgrade-{}",
1950 Uuid::new_v4()
1951 ));
1952 let storage = Storage::new(&base).await.expect("storage");
1953 let session = Session::new(
1954 Some("tool structured result upgrade".to_string()),
1955 Some(".".to_string()),
1956 );
1957 let session_id = session.id.clone();
1958 storage.save_session(session).await.expect("save session");
1959
1960 let user = Message::new(
1961 MessageRole::User,
1962 vec![MessagePart::Text {
1963 text: "build ui".to_string(),
1964 }],
1965 );
1966 let message_id = user.id.clone();
1967 storage
1968 .append_message(&session_id, user)
1969 .await
1970 .expect("append user");
1971
1972 storage
1973 .append_message_part(
1974 &session_id,
1975 &message_id,
1976 MessagePart::ToolInvocation {
1977 tool: "write".to_string(),
1978 args: json!({"path":"game.html"}),
1979 result: None,
1980 error: None,
1981 },
1982 )
1983 .await
1984 .expect("append partial invocation");
1985 storage
1986 .append_message_part(
1987 &session_id,
1988 &message_id,
1989 MessagePart::ToolInvocation {
1990 tool: "write".to_string(),
1991 args: json!({"path":"game.html","content":"<html>draft</html>"}),
1992 result: Some(json!("ok")),
1993 error: None,
1994 },
1995 )
1996 .await
1997 .expect("append richer result");
1998
1999 let session = storage.get_session(&session_id).await.expect("session");
2000 let message = session
2001 .messages
2002 .iter()
2003 .find(|message| message.id == message_id)
2004 .expect("message");
2005 assert_eq!(message.parts.len(), 2);
2006 match &message.parts[1] {
2007 MessagePart::ToolInvocation {
2008 tool,
2009 args,
2010 result,
2011 error,
2012 } => {
2013 assert_eq!(tool, "write");
2014 assert_eq!(args["path"], "game.html");
2015 assert_eq!(args["content"], "<html>draft</html>");
2016 assert_eq!(result.as_ref(), Some(&json!("ok")));
2017 assert_eq!(error.as_deref(), None);
2018 }
2019 other => panic!("expected tool part, got {other:?}"),
2020 }
2021 }
2022
2023 #[tokio::test]
2024 async fn append_message_part_replaces_malformed_object_args_with_structured_result_args() {
2025 let base = std::env::temp_dir().join(format!(
2026 "tandem-core-tool-malformed-args-replace-{}",
2027 Uuid::new_v4()
2028 ));
2029 let storage = Storage::new(&base).await.expect("storage");
2030 let session = Session::new(
2031 Some("tool malformed args replacement".to_string()),
2032 Some(".".to_string()),
2033 );
2034 let session_id = session.id.clone();
2035 storage.save_session(session).await.expect("save session");
2036
2037 let user = Message::new(
2038 MessageRole::User,
2039 vec![MessagePart::Text {
2040 text: "build ui".to_string(),
2041 }],
2042 );
2043 let message_id = user.id.clone();
2044 storage
2045 .append_message(&session_id, user)
2046 .await
2047 .expect("append user");
2048
2049 storage
2050 .append_message_part(
2051 &session_id,
2052 &message_id,
2053 MessagePart::ToolInvocation {
2054 tool: "write".to_string(),
2055 args: json!({"{\"allow_empty": null}),
2056 result: None,
2057 error: None,
2058 },
2059 )
2060 .await
2061 .expect("append malformed invocation");
2062 storage
2063 .append_message_part(
2064 &session_id,
2065 &message_id,
2066 MessagePart::ToolInvocation {
2067 tool: "write".to_string(),
2068 args: json!({"path":"game.html","content":"<html>draft</html>"}),
2069 result: Some(json!("ok")),
2070 error: None,
2071 },
2072 )
2073 .await
2074 .expect("append structured result");
2075
2076 let session = storage.get_session(&session_id).await.expect("session");
2077 let message = session
2078 .messages
2079 .iter()
2080 .find(|message| message.id == message_id)
2081 .expect("message");
2082 assert_eq!(message.parts.len(), 2);
2083 match &message.parts[1] {
2084 MessagePart::ToolInvocation {
2085 tool,
2086 args,
2087 result,
2088 error,
2089 } => {
2090 assert_eq!(tool, "write");
2091 assert_eq!(args["path"], "game.html");
2092 assert_eq!(args["content"], "<html>draft</html>");
2093 assert_eq!(result.as_ref(), Some(&json!("ok")));
2094 assert_eq!(error.as_deref(), None);
2095 }
2096 other => panic!("expected tool part, got {other:?}"),
2097 }
2098 }
2099
2100 #[tokio::test]
2101 async fn append_message_part_replaces_partial_write_args_when_result_adds_path_and_content() {
2102 let base = std::env::temp_dir().join(format!(
2103 "tandem-core-tool-partial-write-args-replace-{}",
2104 Uuid::new_v4()
2105 ));
2106 let storage = Storage::new(&base).await.expect("storage");
2107 let session = Session::new(
2108 Some("tool partial write args replacement".to_string()),
2109 Some(".".to_string()),
2110 );
2111 let session_id = session.id.clone();
2112 storage.save_session(session).await.expect("save session");
2113
2114 let user = Message::new(
2115 MessageRole::User,
2116 vec![MessagePart::Text {
2117 text: "build ui".to_string(),
2118 }],
2119 );
2120 let message_id = user.id.clone();
2121 storage
2122 .append_message(&session_id, user)
2123 .await
2124 .expect("append user");
2125
2126 storage
2127 .append_message_part(
2128 &session_id,
2129 &message_id,
2130 MessagePart::ToolInvocation {
2131 tool: "write".to_string(),
2132 args: json!({"content": ""}),
2133 result: None,
2134 error: None,
2135 },
2136 )
2137 .await
2138 .expect("append partial invocation");
2139 storage
2140 .append_message_part(
2141 &session_id,
2142 &message_id,
2143 MessagePart::ToolInvocation {
2144 tool: "write".to_string(),
2145 args: json!({"path":"notes/report.md","content":"# Report\n"}),
2146 result: Some(json!("ok")),
2147 error: None,
2148 },
2149 )
2150 .await
2151 .expect("append structured result");
2152
2153 let session = storage.get_session(&session_id).await.expect("session");
2154 let message = session
2155 .messages
2156 .iter()
2157 .find(|message| message.id == message_id)
2158 .expect("message");
2159 assert_eq!(message.parts.len(), 2);
2160 match &message.parts[1] {
2161 MessagePart::ToolInvocation {
2162 tool,
2163 args,
2164 result,
2165 error,
2166 } => {
2167 assert_eq!(tool, "write");
2168 assert_eq!(args["path"], "notes/report.md");
2169 assert_eq!(args["content"], "# Report\n");
2170 assert_eq!(result.as_ref(), Some(&json!("ok")));
2171 assert_eq!(error.as_deref(), None);
2172 }
2173 other => panic!("expected tool part, got {other:?}"),
2174 }
2175 }
2176
2177 #[tokio::test]
2178 async fn append_message_part_prefers_executed_write_args_with_context_over_pending_raw_args() {
2179 let base = std::env::temp_dir().join(format!(
2180 "tandem-core-tool-executed-args-preferred-{}",
2181 Uuid::new_v4()
2182 ));
2183 let storage = Storage::new(&base).await.expect("storage");
2184 let session = Session::new(
2185 Some("tool executed args preferred".to_string()),
2186 Some(".".to_string()),
2187 );
2188 let session_id = session.id.clone();
2189 storage.save_session(session).await.expect("save session");
2190
2191 let user = Message::new(
2192 MessageRole::User,
2193 vec![MessagePart::Text {
2194 text: "build report".to_string(),
2195 }],
2196 );
2197 let message_id = user.id.clone();
2198 storage
2199 .append_message(&session_id, user)
2200 .await
2201 .expect("append user");
2202
2203 storage
2204 .append_message_part(
2205 &session_id,
2206 &message_id,
2207 MessagePart::ToolInvocation {
2208 tool: "write".to_string(),
2209 args: json!({"path":".","content":"draft"}),
2210 result: None,
2211 error: None,
2212 },
2213 )
2214 .await
2215 .expect("append raw pending invocation");
2216 storage
2217 .append_message_part(
2218 &session_id,
2219 &message_id,
2220 MessagePart::ToolInvocation {
2221 tool: "write".to_string(),
2222 args: json!({
2223 "path":".tandem/runs/run-123/artifacts/research-sources.json",
2224 "content":"draft",
2225 "__workspace_root":"/home/evan/marketing-tandem",
2226 "__effective_cwd":"/home/evan/marketing-tandem"
2227 }),
2228 result: Some(json!("ok")),
2229 error: None,
2230 },
2231 )
2232 .await
2233 .expect("append executed result");
2234
2235 let session = storage.get_session(&session_id).await.expect("session");
2236 let message = session
2237 .messages
2238 .iter()
2239 .find(|message| message.id == message_id)
2240 .expect("message");
2241 assert_eq!(message.parts.len(), 2);
2242 match &message.parts[1] {
2243 MessagePart::ToolInvocation {
2244 tool,
2245 args,
2246 result,
2247 error,
2248 } => {
2249 assert_eq!(tool, "write");
2250 assert_eq!(
2251 args["path"],
2252 ".tandem/runs/run-123/artifacts/research-sources.json"
2253 );
2254 assert_eq!(args["content"], "draft");
2255 assert_eq!(args["__workspace_root"], "/home/evan/marketing-tandem");
2256 assert_eq!(result.as_ref(), Some(&json!("ok")));
2257 assert_eq!(error.as_deref(), None);
2258 }
2259 other => panic!("expected tool part, got {other:?}"),
2260 }
2261 }
2262
2263 #[tokio::test]
2264 async fn append_message_part_falls_back_to_latest_user_message_when_id_missing() {
2265 let base =
2266 std::env::temp_dir().join(format!("tandem-core-tool-fallback-{}", Uuid::new_v4()));
2267 let storage = Storage::new(&base).await.expect("storage");
2268 let session = Session::new(Some("tool fallback".to_string()), Some(".".to_string()));
2269 let session_id = session.id.clone();
2270 storage.save_session(session).await.expect("save session");
2271
2272 let first = Message::new(
2273 MessageRole::User,
2274 vec![MessagePart::Text {
2275 text: "first prompt".to_string(),
2276 }],
2277 );
2278 let second = Message::new(
2279 MessageRole::User,
2280 vec![MessagePart::Text {
2281 text: "second prompt".to_string(),
2282 }],
2283 );
2284 let second_id = second.id.clone();
2285 storage
2286 .append_message(&session_id, first)
2287 .await
2288 .expect("append first");
2289 storage
2290 .append_message(&session_id, second)
2291 .await
2292 .expect("append second");
2293
2294 storage
2295 .append_message_part(
2296 &session_id,
2297 "missing-message-id",
2298 MessagePart::ToolInvocation {
2299 tool: "glob".to_string(),
2300 args: json!({"pattern":"*"}),
2301 result: Some(json!(["README.md"])),
2302 error: None,
2303 },
2304 )
2305 .await
2306 .expect("append fallback tool part");
2307
2308 let session = storage.get_session(&session_id).await.expect("session");
2309 let message = session
2310 .messages
2311 .iter()
2312 .find(|message| message.id == second_id)
2313 .expect("latest user message");
2314 match &message.parts[1] {
2315 MessagePart::ToolInvocation { tool, result, .. } => {
2316 assert_eq!(tool, "glob");
2317 assert_eq!(result.as_ref(), Some(&json!(["README.md"])));
2318 }
2319 other => panic!("expected tool part, got {other:?}"),
2320 }
2321 }
2322
2323 #[tokio::test]
2324 async fn commit_temp_file_replaces_existing_destination() {
2325 let base =
2326 std::env::temp_dir().join(format!("tandem-core-commit-temp-file-{}", Uuid::new_v4()));
2327 stdfs::create_dir_all(&base).expect("base dir");
2328 let destination = base.join("sessions.json");
2329 let temp = base.join("sessions.json.tmp");
2330 stdfs::write(&destination, "{\"version\":\"old\"}").expect("write destination");
2331 stdfs::write(&temp, "{\"version\":\"new\"}").expect("write temp");
2332
2333 commit_temp_file(&temp, &destination)
2334 .await
2335 .expect("replace destination");
2336
2337 let raw = stdfs::read_to_string(&destination).expect("read destination");
2338 assert_eq!(raw, "{\"version\":\"new\"}");
2339 assert!(!temp.exists());
2340 }
2341
2342 #[tokio::test]
2343 async fn startup_compacts_session_snapshot_metadata() {
2344 let base = std::env::temp_dir().join(format!(
2345 "tandem-core-snapshot-compaction-{}",
2346 Uuid::new_v4()
2347 ));
2348 stdfs::create_dir_all(&base).expect("base dir");
2349
2350 let mut session = Session::new(
2351 Some("snapshot compaction".to_string()),
2352 Some(".".to_string()),
2353 );
2354 session.messages.push(Message::new(
2355 MessageRole::User,
2356 vec![MessagePart::Text {
2357 text: "current".to_string(),
2358 }],
2359 ));
2360 let session_id = session.id.clone();
2361
2362 let mut sessions = HashMap::new();
2363 sessions.insert(session_id.clone(), session);
2364 stdfs::write(
2365 base.join("sessions.json"),
2366 serde_json::to_string_pretty(&sessions).expect("serialize sessions"),
2367 )
2368 .expect("write sessions");
2369
2370 let mut snapshots = Vec::new();
2371 for label in ["a", "a", "b", "c", "d", "e", "f"] {
2372 snapshots.push(vec![Message::new(
2373 MessageRole::User,
2374 vec![MessagePart::Text {
2375 text: label.to_string(),
2376 }],
2377 )]);
2378 }
2379 let mut metadata = HashMap::new();
2380 metadata.insert(
2381 session_id.clone(),
2382 SessionMeta {
2383 snapshots,
2384 ..SessionMeta::default()
2385 },
2386 );
2387 metadata.insert("orphan".to_string(), SessionMeta::default());
2388 stdfs::write(
2389 base.join("session_meta.json"),
2390 serde_json::to_string_pretty(&metadata).expect("serialize metadata"),
2391 )
2392 .expect("write metadata");
2393 stdfs::write(base.join("questions.json"), "{}").expect("write questions");
2394
2395 let _storage = Storage::new(&base).await.expect("storage");
2396
2397 let raw = stdfs::read_to_string(base.join("session_meta.json")).expect("read metadata");
2398 let stored: HashMap<String, SessionMeta> =
2399 serde_json::from_str(&raw).expect("parse metadata");
2400 assert_eq!(stored.len(), 1);
2401 let compacted = stored.get(&session_id).expect("session metadata");
2402 assert_eq!(compacted.snapshots.len(), MAX_SESSION_SNAPSHOTS);
2403
2404 let labels = compacted
2405 .snapshots
2406 .iter()
2407 .map(|snapshot| {
2408 snapshot[0]
2409 .parts
2410 .iter()
2411 .find_map(|part| match part {
2412 MessagePart::Text { text } => Some(text.clone()),
2413 _ => None,
2414 })
2415 .expect("snapshot text")
2416 })
2417 .collect::<Vec<_>>();
2418 assert_eq!(labels, vec!["b", "c", "d", "e", "f"]);
2419 }
2420
2421 #[tokio::test]
2422 async fn startup_repairs_placeholder_titles_from_wrapped_user_messages() {
2423 let base =
2424 std::env::temp_dir().join(format!("tandem-core-title-repair-{}", Uuid::new_v4()));
2425 let storage = Storage::new(&base).await.expect("storage");
2426 let wrapped = "<memory_context>\n<current_session>\n- fact\n</current_session>\n</memory_context>\n\n[Mode instructions]\nUse tools.\n\n[User request]\nExplain this bug";
2427 let mut session = Session::new(Some("<memory_context>".to_string()), Some(".".to_string()));
2428 let id = session.id.clone();
2429 session.messages.push(Message::new(
2430 MessageRole::User,
2431 vec![MessagePart::Text {
2432 text: wrapped.to_string(),
2433 }],
2434 ));
2435 storage.save_session(session).await.expect("save");
2436 drop(storage);
2437
2438 let storage = Storage::new(&base).await.expect("storage");
2439 let repaired = storage.get_session(&id).await.expect("session");
2440 assert_eq!(repaired.title, "Explain this bug");
2441 }
2442
2443 #[tokio::test]
2444 async fn concurrent_storage_flushes_do_not_fail() {
2445 let base = std::env::temp_dir().join(format!("tandem-core-flush-race-{}", Uuid::new_v4()));
2446 let storage = Arc::new(Storage::new(&base).await.expect("storage"));
2447 let session = Session::new(Some("flush race".to_string()), Some(".".to_string()));
2448 let session_id = session.id.clone();
2449 storage.save_session(session).await.expect("save session");
2450
2451 let mut tasks = Vec::new();
2452 for task_index in 0..12 {
2453 let storage = Arc::clone(&storage);
2454 let session_id = session_id.clone();
2455 tasks.push(tokio::spawn(async move {
2456 for part_index in 0..8 {
2457 let message = Message::new(
2458 MessageRole::User,
2459 vec![MessagePart::Text {
2460 text: format!("task {task_index} part {part_index}"),
2461 }],
2462 );
2463 storage
2464 .append_message(&session_id, message)
2465 .await
2466 .expect("append message");
2467 }
2468 }));
2469 }
2470
2471 for task in tasks {
2472 task.await.expect("join task");
2473 }
2474
2475 let session = storage.get_session(&session_id).await.expect("session");
2476 assert_eq!(session.messages.len(), 12 * 8);
2477 assert!(base.join("sessions.json").exists());
2478 }
2479}