1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use anyhow::Context;
5use chrono::{TimeZone, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use tokio::fs;
9use tokio::sync::{Mutex, RwLock};
10use tokio::task;
11use uuid::Uuid;
12
13use tandem_types::{Message, MessagePart, MessageRole, Session};
14
15use crate::message_part_reducer::reduce_message_parts;
16use crate::{
17 derive_session_title_from_prompt, normalize_workspace_path, title_needs_repair,
18 workspace_project_id,
19};
20
21#[derive(Debug, Clone, Serialize, Deserialize, Default)]
22pub struct SessionMeta {
23 pub parent_id: Option<String>,
24 #[serde(default)]
25 pub archived: bool,
26 #[serde(default)]
27 pub shared: bool,
28 pub share_id: Option<String>,
29 pub summary: Option<String>,
30 #[serde(default)]
31 pub snapshots: Vec<Vec<Message>>,
32 pub pre_revert: Option<Vec<Message>>,
33 #[serde(default)]
34 pub todos: Vec<Value>,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct QuestionToolRef {
39 #[serde(rename = "callID")]
40 pub call_id: String,
41 #[serde(rename = "messageID")]
42 pub message_id: String,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct QuestionRequest {
47 pub id: String,
48 #[serde(rename = "sessionID")]
49 pub session_id: String,
50 #[serde(default)]
51 pub questions: Vec<Value>,
52 #[serde(skip_serializing_if = "Option::is_none")]
53 pub tool: Option<QuestionToolRef>,
54}
55
56pub struct Storage {
57 base: PathBuf,
58 sessions: RwLock<HashMap<String, Session>>,
59 metadata: RwLock<HashMap<String, SessionMeta>>,
60 question_requests: RwLock<HashMap<String, QuestionRequest>>,
61 flush_lock: Mutex<()>,
62}
63
/// Selects which sessions a listing call returns.
#[derive(Debug, Clone)]
pub enum SessionListScope {
    /// Every stored session.
    Global,
    /// Only sessions whose normalized workspace root or directory equals the
    /// normalized `workspace_root`.
    Workspace { workspace_root: String },
}
69
70#[derive(Debug, Clone, Default, Serialize, Deserialize)]
71pub struct SessionRepairStats {
72 pub sessions_repaired: u64,
73 pub messages_recovered: u64,
74 pub parts_recovered: u64,
75 pub conflicts_merged: u64,
76}
77
/// File name, relative to the storage base directory, of the legacy import marker.
const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
/// Value written into the `version` field of every marker this module creates.
const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
/// Upper bound on the number of message snapshots retained per session.
const MAX_SESSION_SNAPSHOTS: usize = 5;
81
82#[derive(Debug, Clone, Default, Serialize, Deserialize)]
83pub struct LegacyTreeCounts {
84 pub session_files: u64,
85 pub message_files: u64,
86 pub part_files: u64,
87}
88
89#[derive(Debug, Clone, Default, Serialize, Deserialize)]
90pub struct LegacyImportedCounts {
91 pub sessions: u64,
92 pub messages: u64,
93 pub parts: u64,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct LegacyImportMarker {
98 pub version: u32,
99 pub created_at_ms: u64,
100 pub last_checked_at_ms: u64,
101 pub legacy_counts: LegacyTreeCounts,
102 pub imported_counts: LegacyImportedCounts,
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct LegacyRepairRunReport {
107 pub status: String,
108 pub marker_updated: bool,
109 pub sessions_merged: u64,
110 pub messages_recovered: u64,
111 pub parts_recovered: u64,
112 pub legacy_counts: LegacyTreeCounts,
113 pub imported_counts: LegacyImportedCounts,
114}
115
116fn snapshot_session_messages(
117 session_id: &str,
118 session: &Session,
119 metadata: &mut HashMap<String, SessionMeta>,
120) {
121 let meta = metadata
122 .entry(session_id.to_string())
123 .or_insert_with(SessionMeta::default);
124 meta.snapshots.push(session.messages.clone());
125 trim_session_snapshots(&mut meta.snapshots);
126}
127
128fn trim_session_snapshots(snapshots: &mut Vec<Vec<Message>>) {
129 if snapshots.len() > MAX_SESSION_SNAPSHOTS {
130 let keep_from = snapshots.len() - MAX_SESSION_SNAPSHOTS;
131 snapshots.drain(0..keep_from);
132 }
133}
134
135fn compact_session_snapshots(snapshots: &mut Vec<Vec<Message>>) -> usize {
136 if snapshots.is_empty() {
137 return 0;
138 }
139
140 let original_len = snapshots.len();
141 let mut compacted = Vec::with_capacity(original_len);
142 let mut previous_encoded: Option<Vec<u8>> = None;
143
144 for snapshot in snapshots.drain(..) {
145 let encoded = serde_json::to_vec(&snapshot).unwrap_or_default();
146 if previous_encoded.as_ref() == Some(&encoded) {
147 continue;
148 }
149 previous_encoded = Some(encoded);
150 compacted.push(snapshot);
151 }
152
153 trim_session_snapshots(&mut compacted);
154 let removed = original_len.saturating_sub(compacted.len());
155 *snapshots = compacted;
156 removed
157}
158
159fn session_meta_is_empty(meta: &SessionMeta) -> bool {
160 meta.parent_id.is_none()
161 && !meta.archived
162 && !meta.shared
163 && meta.share_id.is_none()
164 && meta.summary.is_none()
165 && meta.snapshots.is_empty()
166 && meta.pre_revert.is_none()
167 && meta.todos.is_empty()
168}
169
/// Counters produced by one metadata compaction pass.
#[derive(Debug, Default)]
struct SessionMetaCompactionStats {
    /// Metadata entries dropped, either orphaned or empty after compaction.
    metadata_pruned: u64,
    /// Snapshots removed across all surviving and pruned entries.
    snapshots_removed: u64,
}
175
176fn compact_session_metadata(
177 sessions: &HashMap<String, Session>,
178 metadata: &mut HashMap<String, SessionMeta>,
179) -> SessionMetaCompactionStats {
180 let mut stats = SessionMetaCompactionStats::default();
181
182 metadata.retain(|session_id, meta| {
183 if !sessions.contains_key(session_id) {
184 stats.metadata_pruned += 1;
185 return false;
186 }
187
188 let removed = compact_session_snapshots(&mut meta.snapshots) as u64;
189 stats.snapshots_removed += removed;
190
191 if session_meta_is_empty(meta) {
192 stats.metadata_pruned += 1;
193 return false;
194 }
195
196 true
197 });
198
199 stats
200}
201
202impl Storage {
203 pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
204 let base = base.as_ref().to_path_buf();
205 fs::create_dir_all(&base).await?;
206 let sessions_file = base.join("sessions.json");
207 let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
208 let sessions_file_exists = sessions_file.exists();
209 let mut imported_legacy_sessions = false;
210 let mut sessions = if sessions_file_exists {
211 let raw = fs::read_to_string(&sessions_file).await?;
212 serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
213 } else {
214 HashMap::new()
215 };
216
217 let mut marker_to_write = None;
218 if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
219 let base_for_scan = base.clone();
220 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
221 .await
222 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
223 if merge_legacy_sessions(&mut sessions, scan.sessions) {
224 imported_legacy_sessions = true;
225 }
226 marker_to_write = Some(LegacyImportMarker {
227 version: LEGACY_IMPORT_MARKER_VERSION,
228 created_at_ms: now_ms_u64(),
229 last_checked_at_ms: now_ms_u64(),
230 legacy_counts: scan.legacy_counts,
231 imported_counts: scan.imported_counts,
232 });
233 }
234
235 if hydrate_workspace_roots(&mut sessions) {
236 imported_legacy_sessions = true;
237 }
238 if repair_session_titles(&mut sessions) {
239 imported_legacy_sessions = true;
240 }
241 let metadata_file = base.join("session_meta.json");
242 let mut metadata = if metadata_file.exists() {
243 let raw = fs::read_to_string(&metadata_file).await?;
244 serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
245 } else {
246 HashMap::new()
247 };
248 let compaction = compact_session_metadata(&sessions, &mut metadata);
249 let metadata_compacted = compaction.metadata_pruned > 0 || compaction.snapshots_removed > 0;
250 if metadata_compacted {
251 tracing::info!(
252 metadata_pruned = compaction.metadata_pruned,
253 snapshots_removed = compaction.snapshots_removed,
254 "compacted persisted session metadata"
255 );
256 }
257 let questions_file = base.join("questions.json");
258 let question_requests = if questions_file.exists() {
259 let raw = fs::read_to_string(&questions_file).await?;
260 serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
261 } else {
262 HashMap::new()
263 };
264 let storage = Self {
265 base,
266 sessions: RwLock::new(sessions),
267 metadata: RwLock::new(metadata),
268 question_requests: RwLock::new(question_requests),
269 flush_lock: Mutex::new(()),
270 };
271
272 if imported_legacy_sessions || metadata_compacted {
273 storage.flush().await?;
274 }
275 if let Some(marker) = marker_to_write {
276 storage.write_legacy_import_marker(&marker).await?;
277 }
278
279 Ok(storage)
280 }
281
282 pub async fn list_sessions(&self) -> Vec<Session> {
283 self.list_sessions_scoped(SessionListScope::Global).await
284 }
285
286 pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
287 let all = self
288 .sessions
289 .read()
290 .await
291 .values()
292 .cloned()
293 .collect::<Vec<_>>();
294 match scope {
295 SessionListScope::Global => all,
296 SessionListScope::Workspace { workspace_root } => {
297 let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
298 return Vec::new();
299 };
300 all.into_iter()
301 .filter(|session| {
302 let direct = session
303 .workspace_root
304 .as_ref()
305 .and_then(|p| normalize_workspace_path(p))
306 .map(|p| p == normalized_workspace)
307 .unwrap_or(false);
308 if direct {
309 return true;
310 }
311 normalize_workspace_path(&session.directory)
312 .map(|p| p == normalized_workspace)
313 .unwrap_or(false)
314 })
315 .collect()
316 }
317 }
318 }
319
320 pub async fn get_session(&self, id: &str) -> Option<Session> {
321 self.sessions.read().await.get(id).cloned()
322 }
323
324 pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
325 if session.workspace_root.is_none() {
326 session.workspace_root = normalize_workspace_path(&session.directory);
327 }
328 let session_id = session.id.clone();
329 self.sessions
330 .write()
331 .await
332 .insert(session_id.clone(), session);
333 self.metadata
334 .write()
335 .await
336 .entry(session_id)
337 .or_insert_with(SessionMeta::default);
338 self.flush().await
339 }
340
341 pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
342 let mut stats = SessionRepairStats::default();
343 let mut sessions = self.sessions.write().await;
344
345 for session in sessions.values_mut() {
346 let imported = load_legacy_session_messages(&self.base, &session.id);
347 if imported.is_empty() {
348 continue;
349 }
350
351 let (merged, merge_stats, changed) =
352 merge_session_messages(&session.messages, &imported);
353 if changed {
354 session.messages = merged;
355 session.time.updated =
356 most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
357 stats.sessions_repaired += 1;
358 stats.messages_recovered += merge_stats.messages_recovered;
359 stats.parts_recovered += merge_stats.parts_recovered;
360 stats.conflicts_merged += merge_stats.conflicts_merged;
361 }
362 }
363
364 if stats.sessions_repaired > 0 {
365 drop(sessions);
366 self.flush().await?;
367 }
368
369 Ok(stats)
370 }
371
372 pub async fn run_legacy_storage_repair_scan(
373 &self,
374 force: bool,
375 ) -> anyhow::Result<LegacyRepairRunReport> {
376 let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
377 let sessions_exists = self.base.join("sessions.json").exists();
378 let should_scan = if force {
379 true
380 } else {
381 should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
382 };
383 if !should_scan {
384 let marker = read_legacy_import_marker(&marker_path)
385 .await
386 .unwrap_or_else(|| LegacyImportMarker {
387 version: LEGACY_IMPORT_MARKER_VERSION,
388 created_at_ms: now_ms_u64(),
389 last_checked_at_ms: now_ms_u64(),
390 legacy_counts: LegacyTreeCounts::default(),
391 imported_counts: LegacyImportedCounts::default(),
392 });
393 return Ok(LegacyRepairRunReport {
394 status: "skipped".to_string(),
395 marker_updated: false,
396 sessions_merged: 0,
397 messages_recovered: 0,
398 parts_recovered: 0,
399 legacy_counts: marker.legacy_counts,
400 imported_counts: marker.imported_counts,
401 });
402 }
403
404 let base_for_scan = self.base.clone();
405 let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
406 .await
407 .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
408
409 let merge_stats = {
410 let mut sessions = self.sessions.write().await;
411 merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
412 };
413
414 if merge_stats.changed {
415 self.flush().await?;
416 }
417
418 let marker = LegacyImportMarker {
419 version: LEGACY_IMPORT_MARKER_VERSION,
420 created_at_ms: now_ms_u64(),
421 last_checked_at_ms: now_ms_u64(),
422 legacy_counts: scan.legacy_counts.clone(),
423 imported_counts: scan.imported_counts.clone(),
424 };
425 self.write_legacy_import_marker(&marker).await?;
426
427 Ok(LegacyRepairRunReport {
428 status: if merge_stats.changed {
429 "updated".to_string()
430 } else {
431 "no_changes".to_string()
432 },
433 marker_updated: true,
434 sessions_merged: merge_stats.sessions_merged,
435 messages_recovered: merge_stats.messages_recovered,
436 parts_recovered: merge_stats.parts_recovered,
437 legacy_counts: scan.legacy_counts,
438 imported_counts: scan.imported_counts,
439 })
440 }
441
442 pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
443 let removed = self.sessions.write().await.remove(id).is_some();
444 self.metadata.write().await.remove(id);
445 self.question_requests
446 .write()
447 .await
448 .retain(|_, request| request.session_id != id);
449 if removed {
450 self.flush().await?;
451 }
452 Ok(removed)
453 }
454
455 pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
456 let mut sessions = self.sessions.write().await;
457 let session = sessions
458 .get_mut(session_id)
459 .context("session not found for append_message")?;
460 session.messages.push(msg);
461 session.time.updated = Utc::now();
462 drop(sessions);
463 self.flush().await
464 }
465
466 pub async fn append_message_part(
467 &self,
468 session_id: &str,
469 message_id: &str,
470 part: MessagePart,
471 ) -> anyhow::Result<()> {
472 let mut sessions = self.sessions.write().await;
473 let session = sessions
474 .get_mut(session_id)
475 .context("session not found for append_message_part")?;
476 let message = if let Some(message) = session
477 .messages
478 .iter_mut()
479 .find(|message| message.id == message_id)
480 {
481 message
482 } else {
483 session
484 .messages
485 .iter_mut()
486 .rev()
487 .find(|message| matches!(message.role, MessageRole::User))
488 .context("message not found for append_message_part")?
489 };
490 reduce_message_parts(&mut message.parts, part);
491 session.time.updated = Utc::now();
492 drop(sessions);
493 self.flush().await
494 }
495
496 pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
497 let source = {
498 let sessions = self.sessions.read().await;
499 sessions.get(id).cloned()
500 };
501 let Some(mut child) = source else {
502 return Ok(None);
503 };
504
505 child.id = Uuid::new_v4().to_string();
506 child.title = format!("{} (fork)", child.title);
507 child.time.created = Utc::now();
508 child.time.updated = child.time.created;
509 child.slug = None;
510
511 self.sessions
512 .write()
513 .await
514 .insert(child.id.clone(), child.clone());
515 self.metadata.write().await.insert(
516 child.id.clone(),
517 SessionMeta {
518 parent_id: Some(id.to_string()),
519 snapshots: vec![child.messages.clone()],
520 ..SessionMeta::default()
521 },
522 );
523 self.flush().await?;
524 Ok(Some(child))
525 }
526
527 pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
528 let mut sessions = self.sessions.write().await;
529 let Some(session) = sessions.get_mut(id) else {
530 return Ok(false);
531 };
532 let mut metadata = self.metadata.write().await;
533 let meta = metadata
534 .entry(id.to_string())
535 .or_insert_with(SessionMeta::default);
536 let Some(snapshot) = meta.snapshots.pop() else {
537 return Ok(false);
538 };
539 meta.pre_revert = Some(session.messages.clone());
540 session.messages = snapshot;
541 session.time.updated = Utc::now();
542 drop(metadata);
543 drop(sessions);
544 self.flush().await?;
545 Ok(true)
546 }
547
548 pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
549 let mut sessions = self.sessions.write().await;
550 let Some(session) = sessions.get_mut(id) else {
551 return Ok(false);
552 };
553 let mut metadata = self.metadata.write().await;
554 let Some(meta) = metadata.get_mut(id) else {
555 return Ok(false);
556 };
557 let Some(previous) = meta.pre_revert.take() else {
558 return Ok(false);
559 };
560 meta.snapshots.push(session.messages.clone());
561 trim_session_snapshots(&mut meta.snapshots);
562 session.messages = previous;
563 session.time.updated = Utc::now();
564 drop(metadata);
565 drop(sessions);
566 self.flush().await?;
567 Ok(true)
568 }
569
570 pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
571 let mut metadata = self.metadata.write().await;
572 let meta = metadata
573 .entry(id.to_string())
574 .or_insert_with(SessionMeta::default);
575 meta.shared = shared;
576 if shared {
577 if meta.share_id.is_none() {
578 meta.share_id = Some(Uuid::new_v4().to_string());
579 }
580 } else {
581 meta.share_id = None;
582 }
583 let share_id = meta.share_id.clone();
584 drop(metadata);
585 self.flush().await?;
586 Ok(share_id)
587 }
588
589 pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
590 let mut metadata = self.metadata.write().await;
591 let meta = metadata
592 .entry(id.to_string())
593 .or_insert_with(SessionMeta::default);
594 meta.archived = archived;
595 drop(metadata);
596 self.flush().await?;
597 Ok(true)
598 }
599
600 pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
601 let mut metadata = self.metadata.write().await;
602 let meta = metadata
603 .entry(id.to_string())
604 .or_insert_with(SessionMeta::default);
605 meta.summary = Some(summary);
606 drop(metadata);
607 self.flush().await?;
608 Ok(true)
609 }
610
611 pub async fn children(&self, parent_id: &str) -> Vec<Session> {
612 let child_ids = {
613 let metadata = self.metadata.read().await;
614 metadata
615 .iter()
616 .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
617 .map(|(id, _)| id.clone())
618 .collect::<Vec<_>>()
619 };
620 let sessions = self.sessions.read().await;
621 child_ids
622 .into_iter()
623 .filter_map(|id| sessions.get(&id).cloned())
624 .collect()
625 }
626
627 pub async fn session_status(&self, id: &str) -> Option<Value> {
628 let metadata = self.metadata.read().await;
629 metadata.get(id).map(|meta| {
630 json!({
631 "archived": meta.archived,
632 "shared": meta.shared,
633 "parentID": meta.parent_id,
634 "snapshotCount": meta.snapshots.len()
635 })
636 })
637 }
638
639 pub async fn session_diff(&self, id: &str) -> Option<Value> {
640 let sessions = self.sessions.read().await;
641 let current = sessions.get(id)?;
642 let metadata = self.metadata.read().await;
643 let default = SessionMeta::default();
644 let meta = metadata.get(id).unwrap_or(&default);
645 let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
646 Some(json!({
647 "sessionID": id,
648 "currentMessageCount": current.messages.len(),
649 "lastSnapshotMessageCount": last_snapshot_len,
650 "delta": current.messages.len() as i64 - last_snapshot_len as i64
651 }))
652 }
653
654 pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
655 let mut metadata = self.metadata.write().await;
656 let meta = metadata
657 .entry(id.to_string())
658 .or_insert_with(SessionMeta::default);
659 meta.todos = normalize_todo_items(todos);
660 drop(metadata);
661 self.flush().await
662 }
663
664 pub async fn get_todos(&self, id: &str) -> Vec<Value> {
665 let todos = self
666 .metadata
667 .read()
668 .await
669 .get(id)
670 .map(|meta| meta.todos.clone())
671 .unwrap_or_default();
672 normalize_todo_items(todos)
673 }
674
675 pub async fn add_question_request(
676 &self,
677 session_id: &str,
678 message_id: &str,
679 questions: Vec<Value>,
680 ) -> anyhow::Result<QuestionRequest> {
681 if questions.is_empty() {
682 return Err(anyhow::anyhow!(
683 "cannot add empty question request for session {}",
684 session_id
685 ));
686 }
687 let request = QuestionRequest {
688 id: format!("q-{}", Uuid::new_v4()),
689 session_id: session_id.to_string(),
690 questions,
691 tool: Some(QuestionToolRef {
692 call_id: format!("call-{}", Uuid::new_v4()),
693 message_id: message_id.to_string(),
694 }),
695 };
696 self.question_requests
697 .write()
698 .await
699 .insert(request.id.clone(), request.clone());
700 self.flush().await?;
701 Ok(request)
702 }
703
704 pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
705 self.question_requests
706 .read()
707 .await
708 .values()
709 .cloned()
710 .collect()
711 }
712
713 pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
714 let removed = self
715 .question_requests
716 .write()
717 .await
718 .remove(request_id)
719 .is_some();
720 if removed {
721 self.flush().await?;
722 }
723 Ok(removed)
724 }
725
726 pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
727 self.reply_question(request_id).await
728 }
729
730 pub async fn attach_session_to_workspace(
731 &self,
732 session_id: &str,
733 target_workspace: &str,
734 reason_tag: &str,
735 ) -> anyhow::Result<Option<Session>> {
736 let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
737 return Ok(None);
738 };
739 let mut sessions = self.sessions.write().await;
740 let Some(session) = sessions.get_mut(session_id) else {
741 return Ok(None);
742 };
743 let previous_workspace = session
744 .workspace_root
745 .clone()
746 .or_else(|| normalize_workspace_path(&session.directory));
747
748 if session.origin_workspace_root.is_none() {
749 session.origin_workspace_root = previous_workspace.clone();
750 }
751 session.attached_from_workspace = previous_workspace;
752 session.attached_to_workspace = Some(target_workspace.clone());
753 session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
754 session.attach_reason = Some(reason_tag.trim().to_string());
755 session.workspace_root = Some(target_workspace.clone());
756 session.project_id = workspace_project_id(&target_workspace);
757 session.directory = target_workspace;
758 session.time.updated = Utc::now();
759 let updated = session.clone();
760 drop(sessions);
761 self.flush().await?;
762 Ok(Some(updated))
763 }
764
765 async fn flush(&self) -> anyhow::Result<()> {
766 let _flush_guard = self.flush_lock.lock().await;
767 {
768 let snapshot = self.sessions.read().await.clone();
769 self.flush_file("sessions.json", &snapshot).await?;
770 }
771 {
772 let metadata_snapshot = self.metadata.read().await.clone();
773 self.flush_file("session_meta.json", &metadata_snapshot)
774 .await?;
775 }
776 {
777 let questions_snapshot = self.question_requests.read().await.clone();
778 self.flush_file("questions.json", &questions_snapshot)
779 .await?;
780 }
781 Ok(())
782 }
783
784 async fn flush_file(&self, filename: &str, data: &impl serde::Serialize) -> anyhow::Result<()> {
785 let path = self.base.join(filename);
786 let temp_path = self.base.join(format!("{}.tmp", filename));
787 let payload = serde_json::to_string_pretty(data)?;
788 fs::write(&temp_path, payload).await.with_context(|| {
789 format!("failed to write temp storage file {}", temp_path.display())
790 })?;
791 let std_temp_path: std::path::PathBuf = temp_path.clone().try_into()?;
792 tokio::task::spawn_blocking(move || {
793 let file = std::fs::File::open(&std_temp_path)?;
794 file.sync_all()?;
795 Ok::<(), std::io::Error>(())
796 })
797 .await??;
798 commit_temp_file(&temp_path, &path).await.with_context(|| {
799 format!(
800 "failed to atomically replace storage file {} with {}",
801 path.display(),
802 temp_path.display()
803 )
804 })?;
805 Ok(())
806 }
807
808 async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
809 let payload = serde_json::to_string_pretty(marker)?;
810 fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
811 Ok(())
812 }
813}
814
815async fn commit_temp_file(temp_path: &Path, path: &Path) -> std::io::Result<()> {
816 match tokio::fs::rename(temp_path, path).await {
817 Ok(()) => Ok(()),
818 Err(err) => {
819 #[cfg(windows)]
820 {
821 use std::io::ErrorKind;
824 if matches!(
825 err.kind(),
826 ErrorKind::PermissionDenied | ErrorKind::AlreadyExists
827 ) {
828 match tokio::fs::remove_file(path).await {
829 Ok(()) => {}
830 Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {}
831 Err(remove_err) => return Err(remove_err),
832 }
833 return tokio::fs::rename(temp_path, path).await;
834 }
835 }
836 Err(err)
837 }
838 }
839}
840
841fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
842 items
843 .into_iter()
844 .filter_map(|item| {
845 let obj = item.as_object()?;
846 let content = obj
847 .get("content")
848 .and_then(|v| v.as_str())
849 .or_else(|| obj.get("text").and_then(|v| v.as_str()))
850 .unwrap_or("")
851 .trim()
852 .to_string();
853 if content.is_empty() {
854 return None;
855 }
856 let id = obj
857 .get("id")
858 .and_then(|v| v.as_str())
859 .filter(|s| !s.trim().is_empty())
860 .map(ToString::to_string)
861 .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
862 let status = obj
863 .get("status")
864 .and_then(|v| v.as_str())
865 .filter(|s| !s.trim().is_empty())
866 .map(ToString::to_string)
867 .unwrap_or_else(|| "pending".to_string());
868 Some(json!({
869 "id": id,
870 "content": content,
871 "status": status
872 }))
873 })
874 .collect()
875}
876
877#[derive(Debug)]
878struct LegacyScanResult {
879 sessions: HashMap<String, Session>,
880 legacy_counts: LegacyTreeCounts,
881 imported_counts: LegacyImportedCounts,
882}
883
/// Result of merging legacy sessions into the current session map.
#[derive(Debug, Default)]
struct LegacyMergeStats {
    /// Whether the merge modified the current map in any way.
    changed: bool,
    /// Sessions that were inserted or that received legacy messages.
    sessions_merged: u64,
    /// Messages carried over from legacy sessions.
    messages_recovered: u64,
    /// Message parts carried over from legacy sessions.
    parts_recovered: u64,
}
891
892fn now_ms_u64() -> u64 {
893 Utc::now().timestamp_millis().max(0) as u64
894}
895
/// Decides whether the legacy scan should run without being forced.
///
/// The scan runs exactly when no `sessions.json` exists yet. For an existing
/// store the answer is `false` whether or not an import marker is present, so
/// the marker file is not read here. The previous implementation read and
/// parsed it, then returned `false` on both outcomes.
///
/// `marker_path` is kept so existing callers compile unchanged.
async fn should_run_legacy_scan_on_startup(marker_path: &Path, sessions_exist: bool) -> bool {
    // Intentionally unused: the marker has no influence on the decision.
    let _ = marker_path;
    !sessions_exist
}
907
908async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
909 let raw = fs::read_to_string(marker_path).await.ok()?;
910 serde_json::from_str::<LegacyImportMarker>(&raw).ok()
911}
912
913fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
914 let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
915 let imported_counts = LegacyImportedCounts {
916 sessions: sessions.len() as u64,
917 messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
918 parts: sessions
919 .values()
920 .flat_map(|s| s.messages.iter())
921 .map(|m| m.parts.len() as u64)
922 .sum(),
923 };
924 let legacy_counts = LegacyTreeCounts {
925 session_files: count_legacy_json_files(&base.join("session")),
926 message_files: count_legacy_json_files(&base.join("message")),
927 part_files: count_legacy_json_files(&base.join("part")),
928 };
929 Ok(LegacyScanResult {
930 sessions,
931 legacy_counts,
932 imported_counts,
933 })
934}
935
/// Counts files with the extension `json` anywhere below `root`.
///
/// Returns `0` when `root` is not a directory. Directories that cannot be
/// listed and entries that cannot be read are skipped silently. The extension
/// comparison is case-sensitive.
fn count_legacy_json_files(root: &Path) -> u64 {
    if !root.is_dir() {
        return 0;
    }

    let mut pending = vec![root.to_path_buf()];
    let mut total: u64 = 0;
    while let Some(dir) = pending.pop() {
        let Ok(entries) = std::fs::read_dir(&dir) else {
            continue;
        };
        for entry in entries.flatten() {
            let path = entry.path();
            let is_dir = matches!(entry.file_type(), Ok(kind) if kind.is_dir());
            if is_dir {
                pending.push(path);
            } else if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
                total += 1;
            }
        }
    }
    total
}
958
959fn merge_legacy_sessions(
960 current: &mut HashMap<String, Session>,
961 imported: HashMap<String, Session>,
962) -> bool {
963 merge_legacy_sessions_with_stats(current, imported).changed
964}
965
966fn merge_legacy_sessions_with_stats(
967 current: &mut HashMap<String, Session>,
968 imported: HashMap<String, Session>,
969) -> LegacyMergeStats {
970 let mut stats = LegacyMergeStats::default();
971 for (id, legacy) in imported {
972 let legacy_message_count = legacy.messages.len() as u64;
973 let legacy_part_count = legacy
974 .messages
975 .iter()
976 .map(|m| m.parts.len() as u64)
977 .sum::<u64>();
978 match current.get_mut(&id) {
979 None => {
980 current.insert(id, legacy);
981 stats.changed = true;
982 stats.sessions_merged += 1;
983 stats.messages_recovered += legacy_message_count;
984 stats.parts_recovered += legacy_part_count;
985 }
986 Some(existing) => {
987 let should_merge_messages =
988 existing.messages.is_empty() && !legacy.messages.is_empty();
989 let should_fill_title =
990 existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
991 let should_fill_directory = (existing.directory.trim().is_empty()
992 || existing.directory.trim() == "."
993 || existing.directory.trim() == "./"
994 || existing.directory.trim() == ".\\")
995 && !legacy.directory.trim().is_empty();
996 let should_fill_workspace =
997 existing.workspace_root.is_none() && legacy.workspace_root.is_some();
998 if should_merge_messages {
999 existing.messages = legacy.messages.clone();
1000 }
1001 if should_fill_title {
1002 existing.title = legacy.title.clone();
1003 }
1004 if should_fill_directory {
1005 existing.directory = legacy.directory.clone();
1006 }
1007 if should_fill_workspace {
1008 existing.workspace_root = legacy.workspace_root.clone();
1009 }
1010 if should_merge_messages
1011 || should_fill_title
1012 || should_fill_directory
1013 || should_fill_workspace
1014 {
1015 stats.changed = true;
1016 if should_merge_messages {
1017 stats.sessions_merged += 1;
1018 stats.messages_recovered += legacy_message_count;
1019 stats.parts_recovered += legacy_part_count;
1020 }
1021 }
1022 }
1023 }
1024 }
1025 stats
1026}
1027
1028fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
1029 let mut changed = false;
1030 for session in sessions.values_mut() {
1031 if session.workspace_root.is_none() {
1032 let normalized = normalize_workspace_path(&session.directory);
1033 if normalized.is_some() {
1034 session.workspace_root = normalized;
1035 changed = true;
1036 }
1037 }
1038 }
1039 changed
1040}
1041
1042fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
1043 let mut changed = false;
1044 for session in sessions.values_mut() {
1045 if !title_needs_repair(&session.title) {
1046 continue;
1047 }
1048 let first_user_text = session.messages.iter().find_map(|message| {
1049 if !matches!(message.role, MessageRole::User) {
1050 return None;
1051 }
1052 message.parts.iter().find_map(|part| match part {
1053 MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
1054 _ => None,
1055 })
1056 });
1057 let Some(source) = first_user_text else {
1058 continue;
1059 };
1060 let Some(derived) = derive_session_title_from_prompt(source, 60) else {
1061 continue;
1062 };
1063 if derived == session.title {
1064 continue;
1065 }
1066 session.title = derived;
1067 session.time.updated = Utc::now();
1068 changed = true;
1069 }
1070 changed
1071}
1072
1073#[derive(Debug, Deserialize)]
1074struct LegacySessionTime {
1075 created: i64,
1076 updated: i64,
1077}
1078
1079#[derive(Debug, Deserialize)]
1080struct LegacySession {
1081 id: String,
1082 slug: Option<String>,
1083 version: Option<String>,
1084 #[serde(rename = "projectID")]
1085 project_id: Option<String>,
1086 title: Option<String>,
1087 directory: Option<String>,
1088 time: LegacySessionTime,
1089}
1090
1091fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
1092 let legacy_root = base.join("session");
1093 if !legacy_root.is_dir() {
1094 return Ok(HashMap::new());
1095 }
1096
1097 let mut out = HashMap::new();
1098 let mut stack = vec![legacy_root];
1099 while let Some(dir) = stack.pop() {
1100 for entry in std::fs::read_dir(&dir)? {
1101 let entry = entry?;
1102 let path = entry.path();
1103 if entry.file_type()?.is_dir() {
1104 stack.push(path);
1105 continue;
1106 }
1107 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1108 continue;
1109 }
1110 let raw = match std::fs::read_to_string(&path) {
1111 Ok(v) => v,
1112 Err(_) => continue,
1113 };
1114 let legacy = match serde_json::from_str::<LegacySession>(&raw) {
1115 Ok(v) => v,
1116 Err(_) => continue,
1117 };
1118 let created = Utc
1119 .timestamp_millis_opt(legacy.time.created)
1120 .single()
1121 .unwrap_or_else(Utc::now);
1122 let updated = Utc
1123 .timestamp_millis_opt(legacy.time.updated)
1124 .single()
1125 .unwrap_or(created);
1126
1127 let session_id = legacy.id.clone();
1128 out.insert(
1129 session_id.clone(),
1130 Session {
1131 id: session_id.clone(),
1132 slug: legacy.slug,
1133 version: legacy.version,
1134 project_id: legacy.project_id,
1135 title: legacy
1136 .title
1137 .filter(|s| !s.trim().is_empty())
1138 .unwrap_or_else(|| "New session".to_string()),
1139 directory: legacy
1140 .directory
1141 .clone()
1142 .filter(|s| !s.trim().is_empty())
1143 .unwrap_or_else(|| ".".to_string()),
1144 workspace_root: legacy
1145 .directory
1146 .as_deref()
1147 .and_then(normalize_workspace_path),
1148 origin_workspace_root: None,
1149 attached_from_workspace: None,
1150 attached_to_workspace: None,
1151 attach_timestamp_ms: None,
1152 attach_reason: None,
1153 tenant_context: tandem_types::LocalImplicitTenant.into(),
1154 time: tandem_types::SessionTime { created, updated },
1155 model: None,
1156 provider: None,
1157 environment: None,
1158 messages: load_legacy_session_messages(base, &session_id),
1159 },
1160 );
1161 }
1162 }
1163 Ok(out)
1164}
1165
/// Timestamp block of a legacy message file.
///
/// `created` is read as milliseconds since the Unix epoch and is also the key
/// imported messages are ordered by (see `load_legacy_session_messages`).
#[derive(Debug, Deserialize)]
struct LegacyMessageTime {
    created: i64,
}
1170
/// On-disk shape of a legacy message record read from
/// `<base>/message/<session_id>/*.json`.
///
/// `role` is a free-form string that is mapped onto `MessageRole` by
/// `legacy_role_to_message_role`. The message's parts live in separate files
/// and are loaded by `load_legacy_message_parts` using `id`.
#[derive(Debug, Deserialize)]
struct LegacyMessage {
    id: String,
    role: String,
    time: LegacyMessageTime,
}
1177
/// On-disk shape of a legacy message part read from
/// `<base>/part/<message_id>/*.json`.
///
/// Every field is optional. When `tool` is present the part is imported as a
/// tool invocation regardless of `type`; otherwise `type` selects between
/// reasoning, tool, and text parts (see `load_legacy_message_parts`).
#[derive(Debug, Deserialize)]
struct LegacyPart {
    // Serialized as `type` in the legacy files.
    #[serde(rename = "type")]
    part_type: Option<String>,
    text: Option<String>,
    tool: Option<String>,
    args: Option<Value>,
    result: Option<Value>,
    error: Option<String>,
}
1188
1189fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
1190 let msg_dir = base.join("message").join(session_id);
1191 if !msg_dir.is_dir() {
1192 return Vec::new();
1193 }
1194
1195 let mut legacy_messages: Vec<(i64, Message)> = Vec::new();
1196
1197 let Ok(entries) = std::fs::read_dir(&msg_dir) else {
1198 return Vec::new();
1199 };
1200
1201 for entry in entries.flatten() {
1202 let path = entry.path();
1203 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1204 continue;
1205 }
1206 let Ok(raw) = std::fs::read_to_string(&path) else {
1207 continue;
1208 };
1209 let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
1210 continue;
1211 };
1212
1213 let created_at = Utc
1214 .timestamp_millis_opt(legacy.time.created)
1215 .single()
1216 .unwrap_or_else(Utc::now);
1217
1218 legacy_messages.push((
1219 legacy.time.created,
1220 Message {
1221 id: legacy.id.clone(),
1222 role: legacy_role_to_message_role(&legacy.role),
1223 parts: load_legacy_message_parts(base, &legacy.id),
1224 created_at,
1225 },
1226 ));
1227 }
1228
1229 legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
1230 legacy_messages.into_iter().map(|(_, msg)| msg).collect()
1231}
1232
1233fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
1234 let parts_dir = base.join("part").join(message_id);
1235 if !parts_dir.is_dir() {
1236 return Vec::new();
1237 }
1238
1239 let Ok(entries) = std::fs::read_dir(&parts_dir) else {
1240 return Vec::new();
1241 };
1242
1243 let mut out = Vec::new();
1244 for entry in entries.flatten() {
1245 let path = entry.path();
1246 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1247 continue;
1248 }
1249 let Ok(raw) = std::fs::read_to_string(&path) else {
1250 continue;
1251 };
1252 let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
1253 continue;
1254 };
1255
1256 let mapped = if let Some(tool) = part.tool {
1257 Some(MessagePart::ToolInvocation {
1258 tool,
1259 args: part.args.unwrap_or_else(|| json!({})),
1260 result: part.result,
1261 error: part.error,
1262 })
1263 } else {
1264 match part.part_type.as_deref() {
1265 Some("reasoning") => Some(MessagePart::Reasoning {
1266 text: part.text.unwrap_or_default(),
1267 }),
1268 Some("tool") => Some(MessagePart::ToolInvocation {
1269 tool: "tool".to_string(),
1270 args: part.args.unwrap_or_else(|| json!({})),
1271 result: part.result,
1272 error: part.error,
1273 }),
1274 Some("text") | None => Some(MessagePart::Text {
1275 text: part.text.unwrap_or_default(),
1276 }),
1277 _ => None,
1278 }
1279 };
1280
1281 if let Some(part) = mapped {
1282 out.push(part);
1283 }
1284 }
1285 out
1286}
1287
1288fn legacy_role_to_message_role(role: &str) -> MessageRole {
1289 match role.to_lowercase().as_str() {
1290 "user" => MessageRole::User,
1291 "assistant" => MessageRole::Assistant,
1292 "system" => MessageRole::System,
1293 "tool" => MessageRole::Tool,
1294 _ => MessageRole::Assistant,
1295 }
1296}
1297
/// Counters describing what `merge_session_messages` did.
#[derive(Debug, Clone, Default)]
struct MessageMergeStats {
    // Imported messages whose id was not present in the existing history.
    messages_recovered: u64,
    // Total number of parts carried by those newly added messages.
    parts_recovered: u64,
    // Existing messages that were replaced by a richer imported version.
    conflicts_merged: u64,
}
1304
1305fn message_richness(msg: &Message) -> usize {
1306 msg.parts
1307 .iter()
1308 .map(|p| match p {
1309 MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1310 if text.trim().is_empty() {
1311 0
1312 } else {
1313 1
1314 }
1315 }
1316 MessagePart::ToolInvocation { result, error, .. } => {
1317 if result.is_some() || error.is_some() {
1318 2
1319 } else {
1320 1
1321 }
1322 }
1323 })
1324 .sum()
1325}
1326
1327fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1328 messages.iter().map(|m| m.created_at).max()
1329}
1330
1331fn merge_session_messages(
1332 existing: &[Message],
1333 imported: &[Message],
1334) -> (Vec<Message>, MessageMergeStats, bool) {
1335 if existing.is_empty() {
1336 let messages_recovered = imported.len() as u64;
1337 let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1338 return (
1339 imported.to_vec(),
1340 MessageMergeStats {
1341 messages_recovered,
1342 parts_recovered,
1343 conflicts_merged: 0,
1344 },
1345 true,
1346 );
1347 }
1348
1349 let mut merged_by_id: HashMap<String, Message> = existing
1350 .iter()
1351 .cloned()
1352 .map(|m| (m.id.clone(), m))
1353 .collect();
1354 let mut stats = MessageMergeStats::default();
1355 let mut changed = false;
1356
1357 for incoming in imported {
1358 match merged_by_id.get(&incoming.id) {
1359 None => {
1360 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1361 stats.messages_recovered += 1;
1362 stats.parts_recovered += incoming.parts.len() as u64;
1363 changed = true;
1364 }
1365 Some(current) => {
1366 let incoming_richer = message_richness(incoming) > message_richness(current)
1367 || incoming.parts.len() > current.parts.len();
1368 if incoming_richer {
1369 merged_by_id.insert(incoming.id.clone(), incoming.clone());
1370 stats.conflicts_merged += 1;
1371 changed = true;
1372 }
1373 }
1374 }
1375 }
1376
1377 let mut out: Vec<Message> = merged_by_id.into_values().collect();
1378 out.sort_by_key(|m| m.created_at);
1379 (out, stats, changed)
1380}
1381
1382#[cfg(test)]
1383mod tests {
1384 use super::*;
1385 use std::fs as stdfs;
1386 use std::sync::Arc;
1387
1388 #[tokio::test]
1389 async fn todos_are_normalized_to_wire_shape() {
1390 let base = std::env::temp_dir().join(format!("tandem-core-test-{}", Uuid::new_v4()));
1391 let storage = Storage::new(&base).await.expect("storage");
1392 let session = Session::new(Some("test".to_string()), Some(".".to_string()));
1393 let id = session.id.clone();
1394 storage.save_session(session).await.expect("save session");
1395
1396 storage
1397 .set_todos(
1398 &id,
1399 vec![
1400 json!({"content":"first"}),
1401 json!({"text":"second", "status":"in_progress"}),
1402 json!({"id":"keep-id","content":"third","status":"completed"}),
1403 ],
1404 )
1405 .await
1406 .expect("set todos");
1407
1408 let todos = storage.get_todos(&id).await;
1409 assert_eq!(todos.len(), 3);
1410 for todo in todos {
1411 assert!(todo.get("id").and_then(|v| v.as_str()).is_some());
1412 assert!(todo.get("content").and_then(|v| v.as_str()).is_some());
1413 assert!(todo.get("status").and_then(|v| v.as_str()).is_some());
1414 }
1415 }
1416
1417 #[tokio::test]
1418 async fn imports_legacy_opencode_session_index_when_sessions_json_missing() {
1419 let base =
1420 std::env::temp_dir().join(format!("tandem-core-legacy-import-{}", Uuid::new_v4()));
1421 let legacy_session_dir = base.join("session").join("global");
1422 stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1423 stdfs::write(
1424 legacy_session_dir.join("ses_test.json"),
1425 r#"{
1426 "id": "ses_test",
1427 "slug": "test",
1428 "version": "1.0.0",
1429 "projectID": "proj_1",
1430 "directory": "C:\\work\\demo",
1431 "title": "Legacy Session",
1432 "time": { "created": 1770913145613, "updated": 1770913146613 }
1433}"#,
1434 )
1435 .expect("legacy session write");
1436
1437 let storage = Storage::new(&base).await.expect("storage");
1438 let sessions = storage.list_sessions().await;
1439 assert_eq!(sessions.len(), 1);
1440 assert_eq!(sessions[0].id, "ses_test");
1441 assert_eq!(sessions[0].title, "Legacy Session");
1442 assert!(base.join("sessions.json").exists());
1443 }
1444
1445 #[tokio::test]
1446 async fn imports_legacy_messages_and_parts_for_session() {
1447 let base = std::env::temp_dir().join(format!("tandem-core-legacy-msg-{}", Uuid::new_v4()));
1448 let session_dir = base.join("session").join("global");
1449 let message_dir = base.join("message").join("ses_test");
1450 let part_dir = base.join("part").join("msg_1");
1451 stdfs::create_dir_all(&session_dir).expect("session dir");
1452 stdfs::create_dir_all(&message_dir).expect("message dir");
1453 stdfs::create_dir_all(&part_dir).expect("part dir");
1454
1455 stdfs::write(
1456 session_dir.join("ses_test.json"),
1457 r#"{
1458 "id": "ses_test",
1459 "projectID": "proj_1",
1460 "directory": "C:\\work\\demo",
1461 "title": "Legacy Session",
1462 "time": { "created": 1770913145613, "updated": 1770913146613 }
1463}"#,
1464 )
1465 .expect("write session");
1466
1467 stdfs::write(
1468 message_dir.join("msg_1.json"),
1469 r#"{
1470 "id": "msg_1",
1471 "sessionID": "ses_test",
1472 "role": "assistant",
1473 "time": { "created": 1770913145613 }
1474}"#,
1475 )
1476 .expect("write msg");
1477
1478 stdfs::write(
1479 part_dir.join("prt_1.json"),
1480 r#"{
1481 "id": "prt_1",
1482 "sessionID": "ses_test",
1483 "messageID": "msg_1",
1484 "type": "text",
1485 "text": "hello from legacy"
1486}"#,
1487 )
1488 .expect("write part");
1489
1490 let storage = Storage::new(&base).await.expect("storage");
1491 let sessions = storage.list_sessions().await;
1492 assert_eq!(sessions.len(), 1);
1493 assert_eq!(sessions[0].messages.len(), 1);
1494 assert_eq!(sessions[0].messages[0].parts.len(), 1);
1495 }
1496
1497 #[tokio::test]
1498 async fn skips_legacy_merge_when_sessions_json_exists() {
1499 let base =
1500 std::env::temp_dir().join(format!("tandem-core-legacy-merge-{}", Uuid::new_v4()));
1501 stdfs::create_dir_all(&base).expect("base");
1502 stdfs::write(
1503 base.join("sessions.json"),
1504 r#"{
1505 "ses_current": {
1506 "id": "ses_current",
1507 "slug": null,
1508 "version": "v1",
1509 "project_id": null,
1510 "title": "Current Session",
1511 "directory": ".",
1512 "workspace_root": null,
1513 "origin_workspace_root": null,
1514 "attached_from_workspace": null,
1515 "attached_to_workspace": null,
1516 "attach_timestamp_ms": null,
1517 "attach_reason": null,
1518 "time": {"created":"2026-01-01T00:00:00Z","updated":"2026-01-01T00:00:00Z"},
1519 "model": null,
1520 "provider": null,
1521 "messages": []
1522 }
1523}"#,
1524 )
1525 .expect("sessions.json");
1526
1527 let legacy_session_dir = base.join("session").join("global");
1528 stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1529 stdfs::write(
1530 legacy_session_dir.join("ses_legacy.json"),
1531 r#"{
1532 "id": "ses_legacy",
1533 "slug": "legacy",
1534 "version": "1.0.0",
1535 "projectID": "proj_legacy",
1536 "directory": "C:\\work\\legacy",
1537 "title": "Legacy Session",
1538 "time": { "created": 1770913145613, "updated": 1770913146613 }
1539}"#,
1540 )
1541 .expect("legacy session write");
1542
1543 let storage = Storage::new(&base).await.expect("storage");
1544 let sessions = storage.list_sessions().await;
1545 let ids = sessions.iter().map(|s| s.id.clone()).collect::<Vec<_>>();
1546 assert!(ids.contains(&"ses_current".to_string()));
1547 assert!(!ids.contains(&"ses_legacy".to_string()));
1548 }
1549
1550 #[tokio::test]
1551 async fn list_sessions_scoped_filters_by_workspace_root() {
1552 let base = std::env::temp_dir().join(format!("tandem-core-scope-{}", Uuid::new_v4()));
1553 let storage = Storage::new(&base).await.expect("storage");
1554 let ws_a = base.join("ws-a");
1555 let ws_b = base.join("ws-b");
1556 stdfs::create_dir_all(&ws_a).expect("ws_a");
1557 stdfs::create_dir_all(&ws_b).expect("ws_b");
1558 let ws_a_str = ws_a.to_string_lossy().to_string();
1559 let ws_b_str = ws_b.to_string_lossy().to_string();
1560
1561 let mut a = Session::new(Some("a".to_string()), Some(ws_a_str.clone()));
1562 a.workspace_root = Some(ws_a_str.clone());
1563 storage.save_session(a).await.expect("save a");
1564
1565 let mut b = Session::new(Some("b".to_string()), Some(ws_b_str.clone()));
1566 b.workspace_root = Some(ws_b_str);
1567 storage.save_session(b).await.expect("save b");
1568
1569 let scoped = storage
1570 .list_sessions_scoped(SessionListScope::Workspace {
1571 workspace_root: ws_a_str,
1572 })
1573 .await;
1574 assert_eq!(scoped.len(), 1);
1575 assert_eq!(scoped[0].title, "a");
1576 }
1577
1578 #[tokio::test]
1579 async fn attach_session_persists_audit_metadata() {
1580 let base = std::env::temp_dir().join(format!("tandem-core-attach-{}", Uuid::new_v4()));
1581 let storage = Storage::new(&base).await.expect("storage");
1582 let ws_a = base.join("ws-a");
1583 let ws_b = base.join("ws-b");
1584 stdfs::create_dir_all(&ws_a).expect("ws_a");
1585 stdfs::create_dir_all(&ws_b).expect("ws_b");
1586 let ws_a_str = ws_a.to_string_lossy().to_string();
1587 let ws_b_str = ws_b.to_string_lossy().to_string();
1588 let mut session = Session::new(Some("s".to_string()), Some(ws_a_str.clone()));
1589 session.workspace_root = Some(ws_a_str);
1590 let id = session.id.clone();
1591 storage.save_session(session).await.expect("save");
1592
1593 let updated = storage
1594 .attach_session_to_workspace(&id, &ws_b_str, "manual")
1595 .await
1596 .expect("attach")
1597 .expect("session exists");
1598 let normalized_expected = normalize_workspace_path(&ws_b_str).expect("normalized path");
1599 assert_eq!(
1600 updated.workspace_root.as_deref(),
1601 Some(normalized_expected.as_str())
1602 );
1603 assert_eq!(
1604 updated.attached_to_workspace.as_deref(),
1605 Some(normalized_expected.as_str())
1606 );
1607 assert_eq!(updated.attach_reason.as_deref(), Some("manual"));
1608 assert!(updated.attach_timestamp_ms.is_some());
1609 }
1610
1611 #[tokio::test]
1612 async fn append_message_part_persists_tool_invocation_and_result() {
1613 let base = std::env::temp_dir().join(format!("tandem-core-tool-parts-{}", Uuid::new_v4()));
1614 let storage = Storage::new(&base).await.expect("storage");
1615 let session = Session::new(Some("tool parts".to_string()), Some(".".to_string()));
1616 let session_id = session.id.clone();
1617 storage.save_session(session).await.expect("save session");
1618
1619 let user = Message::new(
1620 MessageRole::User,
1621 vec![MessagePart::Text {
1622 text: "build ui".to_string(),
1623 }],
1624 );
1625 let message_id = user.id.clone();
1626 storage
1627 .append_message(&session_id, user)
1628 .await
1629 .expect("append user");
1630
1631 storage
1632 .append_message_part(
1633 &session_id,
1634 &message_id,
1635 MessagePart::ToolInvocation {
1636 tool: "write".to_string(),
1637 args: json!({"path":"game.html","content":"<html></html>"}),
1638 result: None,
1639 error: None,
1640 },
1641 )
1642 .await
1643 .expect("append invocation");
1644 storage
1645 .append_message_part(
1646 &session_id,
1647 &message_id,
1648 MessagePart::ToolInvocation {
1649 tool: "write".to_string(),
1650 args: json!({}),
1651 result: Some(json!("ok")),
1652 error: None,
1653 },
1654 )
1655 .await
1656 .expect("append result");
1657
1658 let session = storage.get_session(&session_id).await.expect("session");
1659 let message = session
1660 .messages
1661 .iter()
1662 .find(|message| message.id == message_id)
1663 .expect("message");
1664 assert_eq!(message.parts.len(), 2);
1665 match &message.parts[1] {
1666 MessagePart::ToolInvocation {
1667 tool,
1668 result,
1669 error,
1670 ..
1671 } => {
1672 assert_eq!(tool, "write");
1673 assert_eq!(result.as_ref(), Some(&json!("ok")));
1674 assert_eq!(error.as_deref(), None);
1675 }
1676 other => panic!("expected tool part, got {other:?}"),
1677 }
1678 }
1679
1680 #[tokio::test]
1681 async fn append_message_part_retains_failed_tool_error() {
1682 let base = std::env::temp_dir().join(format!("tandem-core-tool-error-{}", Uuid::new_v4()));
1683 let storage = Storage::new(&base).await.expect("storage");
1684 let session = Session::new(Some("tool errors".to_string()), Some(".".to_string()));
1685 let session_id = session.id.clone();
1686 storage.save_session(session).await.expect("save session");
1687
1688 let user = Message::new(
1689 MessageRole::User,
1690 vec![MessagePart::Text {
1691 text: "write file".to_string(),
1692 }],
1693 );
1694 let message_id = user.id.clone();
1695 storage
1696 .append_message(&session_id, user)
1697 .await
1698 .expect("append user");
1699
1700 storage
1701 .append_message_part(
1702 &session_id,
1703 &message_id,
1704 MessagePart::ToolInvocation {
1705 tool: "write".to_string(),
1706 args: json!({"path":"game.html"}),
1707 result: None,
1708 error: None,
1709 },
1710 )
1711 .await
1712 .expect("append invocation");
1713 storage
1714 .append_message_part(
1715 &session_id,
1716 &message_id,
1717 MessagePart::ToolInvocation {
1718 tool: "write".to_string(),
1719 args: json!({}),
1720 result: None,
1721 error: Some("WRITE_CONTENT_MISSING".to_string()),
1722 },
1723 )
1724 .await
1725 .expect("append error");
1726
1727 let session = storage.get_session(&session_id).await.expect("session");
1728 let message = session
1729 .messages
1730 .iter()
1731 .find(|message| message.id == message_id)
1732 .expect("message");
1733 match &message.parts[1] {
1734 MessagePart::ToolInvocation { error, .. } => {
1735 assert_eq!(error.as_deref(), Some("WRITE_CONTENT_MISSING"));
1736 }
1737 other => panic!("expected tool part, got {other:?}"),
1738 }
1739 }
1740
1741 #[tokio::test]
1742 async fn append_message_part_coalesces_repeated_tool_invocation_updates() {
1743 let base = std::env::temp_dir().join(format!("tandem-core-tool-merge-{}", Uuid::new_v4()));
1744 let storage = Storage::new(&base).await.expect("storage");
1745 let session = Session::new(Some("tool merge".to_string()), Some(".".to_string()));
1746 let session_id = session.id.clone();
1747 storage.save_session(session).await.expect("save session");
1748
1749 let user = Message::new(
1750 MessageRole::User,
1751 vec![MessagePart::Text {
1752 text: "build ui".to_string(),
1753 }],
1754 );
1755 let message_id = user.id.clone();
1756 storage
1757 .append_message(&session_id, user)
1758 .await
1759 .expect("append user");
1760
1761 storage
1762 .append_message_part(
1763 &session_id,
1764 &message_id,
1765 MessagePart::ToolInvocation {
1766 tool: "write".to_string(),
1767 args: json!({"path":"game.html"}),
1768 result: None,
1769 error: None,
1770 },
1771 )
1772 .await
1773 .expect("append first invocation");
1774 storage
1775 .append_message_part(
1776 &session_id,
1777 &message_id,
1778 MessagePart::ToolInvocation {
1779 tool: "write".to_string(),
1780 args: json!({"path":"game.html","content":"<html></html>"}),
1781 result: None,
1782 error: None,
1783 },
1784 )
1785 .await
1786 .expect("append updated invocation");
1787
1788 let session = storage.get_session(&session_id).await.expect("session");
1789 let message = session
1790 .messages
1791 .iter()
1792 .find(|message| message.id == message_id)
1793 .expect("message");
1794 assert_eq!(message.parts.len(), 2);
1795 match &message.parts[1] {
1796 MessagePart::ToolInvocation { tool, args, .. } => {
1797 assert_eq!(tool, "write");
1798 assert_eq!(args["path"], "game.html");
1799 assert_eq!(args["content"], "<html></html>");
1800 }
1801 other => panic!("expected tool part, got {other:?}"),
1802 }
1803 }
1804
1805 #[tokio::test]
1806 async fn append_message_part_upgrades_raw_string_args_to_structured_invocation_args() {
1807 let base =
1808 std::env::temp_dir().join(format!("tandem-core-tool-raw-upgrade-{}", Uuid::new_v4()));
1809 let storage = Storage::new(&base).await.expect("storage");
1810 let session = Session::new(Some("tool raw upgrade".to_string()), Some(".".to_string()));
1811 let session_id = session.id.clone();
1812 storage.save_session(session).await.expect("save session");
1813
1814 let user = Message::new(
1815 MessageRole::User,
1816 vec![MessagePart::Text {
1817 text: "build ui".to_string(),
1818 }],
1819 );
1820 let message_id = user.id.clone();
1821 storage
1822 .append_message(&session_id, user)
1823 .await
1824 .expect("append user");
1825
1826 storage
1827 .append_message_part(
1828 &session_id,
1829 &message_id,
1830 MessagePart::ToolInvocation {
1831 tool: "write".to_string(),
1832 args: json!("{\"path\":\"game.html\",\"content\":\"<html>draft</html>\"}"),
1833 result: None,
1834 error: None,
1835 },
1836 )
1837 .await
1838 .expect("append raw invocation");
1839 storage
1840 .append_message_part(
1841 &session_id,
1842 &message_id,
1843 MessagePart::ToolInvocation {
1844 tool: "write".to_string(),
1845 args: json!({"path":"game.html","content":"<html>draft</html>"}),
1846 result: None,
1847 error: None,
1848 },
1849 )
1850 .await
1851 .expect("append structured invocation");
1852
1853 let session = storage.get_session(&session_id).await.expect("session");
1854 let message = session
1855 .messages
1856 .iter()
1857 .find(|message| message.id == message_id)
1858 .expect("message");
1859 assert_eq!(message.parts.len(), 2);
1860 match &message.parts[1] {
1861 MessagePart::ToolInvocation { tool, args, .. } => {
1862 assert_eq!(tool, "write");
1863 assert_eq!(args["path"], "game.html");
1864 assert_eq!(args["content"], "<html>draft</html>");
1865 }
1866 other => panic!("expected tool part, got {other:?}"),
1867 }
1868 }
1869
1870 #[tokio::test]
1871 async fn append_message_part_upgrades_raw_string_args_when_result_arrives_with_structure() {
1872 let base = std::env::temp_dir().join(format!(
1873 "tandem-core-tool-raw-result-upgrade-{}",
1874 Uuid::new_v4()
1875 ));
1876 let storage = Storage::new(&base).await.expect("storage");
1877 let session = Session::new(
1878 Some("tool raw result upgrade".to_string()),
1879 Some(".".to_string()),
1880 );
1881 let session_id = session.id.clone();
1882 storage.save_session(session).await.expect("save session");
1883
1884 let user = Message::new(
1885 MessageRole::User,
1886 vec![MessagePart::Text {
1887 text: "build ui".to_string(),
1888 }],
1889 );
1890 let message_id = user.id.clone();
1891 storage
1892 .append_message(&session_id, user)
1893 .await
1894 .expect("append user");
1895
1896 storage
1897 .append_message_part(
1898 &session_id,
1899 &message_id,
1900 MessagePart::ToolInvocation {
1901 tool: "write".to_string(),
1902 args: json!("{\"path\":\"game.html\",\"content\":\"<html>draft</html>\"}"),
1903 result: None,
1904 error: None,
1905 },
1906 )
1907 .await
1908 .expect("append raw invocation");
1909 storage
1910 .append_message_part(
1911 &session_id,
1912 &message_id,
1913 MessagePart::ToolInvocation {
1914 tool: "write".to_string(),
1915 args: json!({"path":"game.html","content":"<html>draft</html>"}),
1916 result: Some(json!("ok")),
1917 error: None,
1918 },
1919 )
1920 .await
1921 .expect("append structured result");
1922
1923 let session = storage.get_session(&session_id).await.expect("session");
1924 let message = session
1925 .messages
1926 .iter()
1927 .find(|message| message.id == message_id)
1928 .expect("message");
1929 assert_eq!(message.parts.len(), 2);
1930 match &message.parts[1] {
1931 MessagePart::ToolInvocation {
1932 tool,
1933 args,
1934 result,
1935 error,
1936 } => {
1937 assert_eq!(tool, "write");
1938 assert_eq!(args["path"], "game.html");
1939 assert_eq!(args["content"], "<html>draft</html>");
1940 assert_eq!(result.as_ref(), Some(&json!("ok")));
1941 assert_eq!(error.as_deref(), None);
1942 }
1943 other => panic!("expected tool part, got {other:?}"),
1944 }
1945 }
1946
1947 #[tokio::test]
1948 async fn append_message_part_upgrades_partial_structured_args_when_result_adds_fields() {
1949 let base = std::env::temp_dir().join(format!(
1950 "tandem-core-tool-structured-result-upgrade-{}",
1951 Uuid::new_v4()
1952 ));
1953 let storage = Storage::new(&base).await.expect("storage");
1954 let session = Session::new(
1955 Some("tool structured result upgrade".to_string()),
1956 Some(".".to_string()),
1957 );
1958 let session_id = session.id.clone();
1959 storage.save_session(session).await.expect("save session");
1960
1961 let user = Message::new(
1962 MessageRole::User,
1963 vec![MessagePart::Text {
1964 text: "build ui".to_string(),
1965 }],
1966 );
1967 let message_id = user.id.clone();
1968 storage
1969 .append_message(&session_id, user)
1970 .await
1971 .expect("append user");
1972
1973 storage
1974 .append_message_part(
1975 &session_id,
1976 &message_id,
1977 MessagePart::ToolInvocation {
1978 tool: "write".to_string(),
1979 args: json!({"path":"game.html"}),
1980 result: None,
1981 error: None,
1982 },
1983 )
1984 .await
1985 .expect("append partial invocation");
1986 storage
1987 .append_message_part(
1988 &session_id,
1989 &message_id,
1990 MessagePart::ToolInvocation {
1991 tool: "write".to_string(),
1992 args: json!({"path":"game.html","content":"<html>draft</html>"}),
1993 result: Some(json!("ok")),
1994 error: None,
1995 },
1996 )
1997 .await
1998 .expect("append richer result");
1999
2000 let session = storage.get_session(&session_id).await.expect("session");
2001 let message = session
2002 .messages
2003 .iter()
2004 .find(|message| message.id == message_id)
2005 .expect("message");
2006 assert_eq!(message.parts.len(), 2);
2007 match &message.parts[1] {
2008 MessagePart::ToolInvocation {
2009 tool,
2010 args,
2011 result,
2012 error,
2013 } => {
2014 assert_eq!(tool, "write");
2015 assert_eq!(args["path"], "game.html");
2016 assert_eq!(args["content"], "<html>draft</html>");
2017 assert_eq!(result.as_ref(), Some(&json!("ok")));
2018 assert_eq!(error.as_deref(), None);
2019 }
2020 other => panic!("expected tool part, got {other:?}"),
2021 }
2022 }
2023
2024 #[tokio::test]
2025 async fn append_message_part_replaces_malformed_object_args_with_structured_result_args() {
2026 let base = std::env::temp_dir().join(format!(
2027 "tandem-core-tool-malformed-args-replace-{}",
2028 Uuid::new_v4()
2029 ));
2030 let storage = Storage::new(&base).await.expect("storage");
2031 let session = Session::new(
2032 Some("tool malformed args replacement".to_string()),
2033 Some(".".to_string()),
2034 );
2035 let session_id = session.id.clone();
2036 storage.save_session(session).await.expect("save session");
2037
2038 let user = Message::new(
2039 MessageRole::User,
2040 vec![MessagePart::Text {
2041 text: "build ui".to_string(),
2042 }],
2043 );
2044 let message_id = user.id.clone();
2045 storage
2046 .append_message(&session_id, user)
2047 .await
2048 .expect("append user");
2049
2050 storage
2051 .append_message_part(
2052 &session_id,
2053 &message_id,
2054 MessagePart::ToolInvocation {
2055 tool: "write".to_string(),
2056 args: json!({"{\"allow_empty": null}),
2057 result: None,
2058 error: None,
2059 },
2060 )
2061 .await
2062 .expect("append malformed invocation");
2063 storage
2064 .append_message_part(
2065 &session_id,
2066 &message_id,
2067 MessagePart::ToolInvocation {
2068 tool: "write".to_string(),
2069 args: json!({"path":"game.html","content":"<html>draft</html>"}),
2070 result: Some(json!("ok")),
2071 error: None,
2072 },
2073 )
2074 .await
2075 .expect("append structured result");
2076
2077 let session = storage.get_session(&session_id).await.expect("session");
2078 let message = session
2079 .messages
2080 .iter()
2081 .find(|message| message.id == message_id)
2082 .expect("message");
2083 assert_eq!(message.parts.len(), 2);
2084 match &message.parts[1] {
2085 MessagePart::ToolInvocation {
2086 tool,
2087 args,
2088 result,
2089 error,
2090 } => {
2091 assert_eq!(tool, "write");
2092 assert_eq!(args["path"], "game.html");
2093 assert_eq!(args["content"], "<html>draft</html>");
2094 assert_eq!(result.as_ref(), Some(&json!("ok")));
2095 assert_eq!(error.as_deref(), None);
2096 }
2097 other => panic!("expected tool part, got {other:?}"),
2098 }
2099 }
2100
2101 #[tokio::test]
2102 async fn append_message_part_replaces_partial_write_args_when_result_adds_path_and_content() {
2103 let base = std::env::temp_dir().join(format!(
2104 "tandem-core-tool-partial-write-args-replace-{}",
2105 Uuid::new_v4()
2106 ));
2107 let storage = Storage::new(&base).await.expect("storage");
2108 let session = Session::new(
2109 Some("tool partial write args replacement".to_string()),
2110 Some(".".to_string()),
2111 );
2112 let session_id = session.id.clone();
2113 storage.save_session(session).await.expect("save session");
2114
2115 let user = Message::new(
2116 MessageRole::User,
2117 vec![MessagePart::Text {
2118 text: "build ui".to_string(),
2119 }],
2120 );
2121 let message_id = user.id.clone();
2122 storage
2123 .append_message(&session_id, user)
2124 .await
2125 .expect("append user");
2126
2127 storage
2128 .append_message_part(
2129 &session_id,
2130 &message_id,
2131 MessagePart::ToolInvocation {
2132 tool: "write".to_string(),
2133 args: json!({"content": ""}),
2134 result: None,
2135 error: None,
2136 },
2137 )
2138 .await
2139 .expect("append partial invocation");
2140 storage
2141 .append_message_part(
2142 &session_id,
2143 &message_id,
2144 MessagePart::ToolInvocation {
2145 tool: "write".to_string(),
2146 args: json!({"path":"notes/report.md","content":"# Report\n"}),
2147 result: Some(json!("ok")),
2148 error: None,
2149 },
2150 )
2151 .await
2152 .expect("append structured result");
2153
2154 let session = storage.get_session(&session_id).await.expect("session");
2155 let message = session
2156 .messages
2157 .iter()
2158 .find(|message| message.id == message_id)
2159 .expect("message");
2160 assert_eq!(message.parts.len(), 2);
2161 match &message.parts[1] {
2162 MessagePart::ToolInvocation {
2163 tool,
2164 args,
2165 result,
2166 error,
2167 } => {
2168 assert_eq!(tool, "write");
2169 assert_eq!(args["path"], "notes/report.md");
2170 assert_eq!(args["content"], "# Report\n");
2171 assert_eq!(result.as_ref(), Some(&json!("ok")));
2172 assert_eq!(error.as_deref(), None);
2173 }
2174 other => panic!("expected tool part, got {other:?}"),
2175 }
2176 }
2177
2178 #[tokio::test]
2179 async fn append_message_part_prefers_executed_write_args_with_context_over_pending_raw_args() {
2180 let base = std::env::temp_dir().join(format!(
2181 "tandem-core-tool-executed-args-preferred-{}",
2182 Uuid::new_v4()
2183 ));
2184 let storage = Storage::new(&base).await.expect("storage");
2185 let session = Session::new(
2186 Some("tool executed args preferred".to_string()),
2187 Some(".".to_string()),
2188 );
2189 let session_id = session.id.clone();
2190 storage.save_session(session).await.expect("save session");
2191
2192 let user = Message::new(
2193 MessageRole::User,
2194 vec![MessagePart::Text {
2195 text: "build report".to_string(),
2196 }],
2197 );
2198 let message_id = user.id.clone();
2199 storage
2200 .append_message(&session_id, user)
2201 .await
2202 .expect("append user");
2203
2204 storage
2205 .append_message_part(
2206 &session_id,
2207 &message_id,
2208 MessagePart::ToolInvocation {
2209 tool: "write".to_string(),
2210 args: json!({"path":".","content":"draft"}),
2211 result: None,
2212 error: None,
2213 },
2214 )
2215 .await
2216 .expect("append raw pending invocation");
2217 storage
2218 .append_message_part(
2219 &session_id,
2220 &message_id,
2221 MessagePart::ToolInvocation {
2222 tool: "write".to_string(),
2223 args: json!({
2224 "path":".tandem/runs/run-123/artifacts/research-sources.json",
2225 "content":"draft",
2226 "__workspace_root":"/home/user/marketing-tandem",
2227 "__effective_cwd":"/home/user/marketing-tandem"
2228 }),
2229 result: Some(json!("ok")),
2230 error: None,
2231 },
2232 )
2233 .await
2234 .expect("append executed result");
2235
2236 let session = storage.get_session(&session_id).await.expect("session");
2237 let message = session
2238 .messages
2239 .iter()
2240 .find(|message| message.id == message_id)
2241 .expect("message");
2242 assert_eq!(message.parts.len(), 2);
2243 match &message.parts[1] {
2244 MessagePart::ToolInvocation {
2245 tool,
2246 args,
2247 result,
2248 error,
2249 } => {
2250 assert_eq!(tool, "write");
2251 assert_eq!(
2252 args["path"],
2253 ".tandem/runs/run-123/artifacts/research-sources.json"
2254 );
2255 assert_eq!(args["content"], "draft");
2256 assert_eq!(args["__workspace_root"], "/home/user/marketing-tandem");
2257 assert_eq!(result.as_ref(), Some(&json!("ok")));
2258 assert_eq!(error.as_deref(), None);
2259 }
2260 other => panic!("expected tool part, got {other:?}"),
2261 }
2262 }
2263
2264 #[tokio::test]
2265 async fn append_message_part_falls_back_to_latest_user_message_when_id_missing() {
2266 let base =
2267 std::env::temp_dir().join(format!("tandem-core-tool-fallback-{}", Uuid::new_v4()));
2268 let storage = Storage::new(&base).await.expect("storage");
2269 let session = Session::new(Some("tool fallback".to_string()), Some(".".to_string()));
2270 let session_id = session.id.clone();
2271 storage.save_session(session).await.expect("save session");
2272
2273 let first = Message::new(
2274 MessageRole::User,
2275 vec![MessagePart::Text {
2276 text: "first prompt".to_string(),
2277 }],
2278 );
2279 let second = Message::new(
2280 MessageRole::User,
2281 vec![MessagePart::Text {
2282 text: "second prompt".to_string(),
2283 }],
2284 );
2285 let second_id = second.id.clone();
2286 storage
2287 .append_message(&session_id, first)
2288 .await
2289 .expect("append first");
2290 storage
2291 .append_message(&session_id, second)
2292 .await
2293 .expect("append second");
2294
2295 storage
2296 .append_message_part(
2297 &session_id,
2298 "missing-message-id",
2299 MessagePart::ToolInvocation {
2300 tool: "glob".to_string(),
2301 args: json!({"pattern":"*"}),
2302 result: Some(json!(["README.md"])),
2303 error: None,
2304 },
2305 )
2306 .await
2307 .expect("append fallback tool part");
2308
2309 let session = storage.get_session(&session_id).await.expect("session");
2310 let message = session
2311 .messages
2312 .iter()
2313 .find(|message| message.id == second_id)
2314 .expect("latest user message");
2315 match &message.parts[1] {
2316 MessagePart::ToolInvocation { tool, result, .. } => {
2317 assert_eq!(tool, "glob");
2318 assert_eq!(result.as_ref(), Some(&json!(["README.md"])));
2319 }
2320 other => panic!("expected tool part, got {other:?}"),
2321 }
2322 }
2323
2324 #[tokio::test]
2325 async fn commit_temp_file_replaces_existing_destination() {
2326 let base =
2327 std::env::temp_dir().join(format!("tandem-core-commit-temp-file-{}", Uuid::new_v4()));
2328 stdfs::create_dir_all(&base).expect("base dir");
2329 let destination = base.join("sessions.json");
2330 let temp = base.join("sessions.json.tmp");
2331 stdfs::write(&destination, "{\"version\":\"old\"}").expect("write destination");
2332 stdfs::write(&temp, "{\"version\":\"new\"}").expect("write temp");
2333
2334 commit_temp_file(&temp, &destination)
2335 .await
2336 .expect("replace destination");
2337
2338 let raw = stdfs::read_to_string(&destination).expect("read destination");
2339 assert_eq!(raw, "{\"version\":\"new\"}");
2340 assert!(!temp.exists());
2341 }
2342
2343 #[tokio::test]
2344 async fn startup_compacts_session_snapshot_metadata() {
2345 let base = std::env::temp_dir().join(format!(
2346 "tandem-core-snapshot-compaction-{}",
2347 Uuid::new_v4()
2348 ));
2349 stdfs::create_dir_all(&base).expect("base dir");
2350
2351 let mut session = Session::new(
2352 Some("snapshot compaction".to_string()),
2353 Some(".".to_string()),
2354 );
2355 session.messages.push(Message::new(
2356 MessageRole::User,
2357 vec![MessagePart::Text {
2358 text: "current".to_string(),
2359 }],
2360 ));
2361 let session_id = session.id.clone();
2362
2363 let mut sessions = HashMap::new();
2364 sessions.insert(session_id.clone(), session);
2365 stdfs::write(
2366 base.join("sessions.json"),
2367 serde_json::to_string_pretty(&sessions).expect("serialize sessions"),
2368 )
2369 .expect("write sessions");
2370
2371 let mut snapshots = Vec::new();
2372 for label in ["a", "a", "b", "c", "d", "e", "f"] {
2373 snapshots.push(vec![Message::new(
2374 MessageRole::User,
2375 vec![MessagePart::Text {
2376 text: label.to_string(),
2377 }],
2378 )]);
2379 }
2380 let mut metadata = HashMap::new();
2381 metadata.insert(
2382 session_id.clone(),
2383 SessionMeta {
2384 snapshots,
2385 ..SessionMeta::default()
2386 },
2387 );
2388 metadata.insert("orphan".to_string(), SessionMeta::default());
2389 stdfs::write(
2390 base.join("session_meta.json"),
2391 serde_json::to_string_pretty(&metadata).expect("serialize metadata"),
2392 )
2393 .expect("write metadata");
2394 stdfs::write(base.join("questions.json"), "{}").expect("write questions");
2395
2396 let _storage = Storage::new(&base).await.expect("storage");
2397
2398 let raw = stdfs::read_to_string(base.join("session_meta.json")).expect("read metadata");
2399 let stored: HashMap<String, SessionMeta> =
2400 serde_json::from_str(&raw).expect("parse metadata");
2401 assert_eq!(stored.len(), 1);
2402 let compacted = stored.get(&session_id).expect("session metadata");
2403 assert_eq!(compacted.snapshots.len(), MAX_SESSION_SNAPSHOTS);
2404
2405 let labels = compacted
2406 .snapshots
2407 .iter()
2408 .map(|snapshot| {
2409 snapshot[0]
2410 .parts
2411 .iter()
2412 .find_map(|part| match part {
2413 MessagePart::Text { text } => Some(text.clone()),
2414 _ => None,
2415 })
2416 .expect("snapshot text")
2417 })
2418 .collect::<Vec<_>>();
2419 assert_eq!(labels, vec!["b", "c", "d", "e", "f"]);
2420 }
2421
2422 #[tokio::test]
2423 async fn startup_repairs_placeholder_titles_from_wrapped_user_messages() {
2424 let base =
2425 std::env::temp_dir().join(format!("tandem-core-title-repair-{}", Uuid::new_v4()));
2426 let storage = Storage::new(&base).await.expect("storage");
2427 let wrapped = "<memory_context>\n<current_session>\n- fact\n</current_session>\n</memory_context>\n\n[Mode instructions]\nUse tools.\n\n[User request]\nExplain this bug";
2428 let mut session = Session::new(Some("<memory_context>".to_string()), Some(".".to_string()));
2429 let id = session.id.clone();
2430 session.messages.push(Message::new(
2431 MessageRole::User,
2432 vec![MessagePart::Text {
2433 text: wrapped.to_string(),
2434 }],
2435 ));
2436 storage.save_session(session).await.expect("save");
2437 drop(storage);
2438
2439 let storage = Storage::new(&base).await.expect("storage");
2440 let repaired = storage.get_session(&id).await.expect("session");
2441 assert_eq!(repaired.title, "Explain this bug");
2442 }
2443
2444 #[tokio::test]
2445 async fn concurrent_storage_flushes_do_not_fail() {
2446 let base = std::env::temp_dir().join(format!("tandem-core-flush-race-{}", Uuid::new_v4()));
2447 let storage = Arc::new(Storage::new(&base).await.expect("storage"));
2448 let session = Session::new(Some("flush race".to_string()), Some(".".to_string()));
2449 let session_id = session.id.clone();
2450 storage.save_session(session).await.expect("save session");
2451
2452 let mut tasks = Vec::new();
2453 for task_index in 0..12 {
2454 let storage = Arc::clone(&storage);
2455 let session_id = session_id.clone();
2456 tasks.push(tokio::spawn(async move {
2457 for part_index in 0..8 {
2458 let message = Message::new(
2459 MessageRole::User,
2460 vec![MessagePart::Text {
2461 text: format!("task {task_index} part {part_index}"),
2462 }],
2463 );
2464 storage
2465 .append_message(&session_id, message)
2466 .await
2467 .expect("append message");
2468 }
2469 }));
2470 }
2471
2472 for task in tasks {
2473 task.await.expect("join task");
2474 }
2475
2476 let session = storage.get_session(&session_id).await.expect("session");
2477 assert_eq!(session.messages.len(), 12 * 8);
2478 assert!(base.join("sessions.json").exists());
2479 }
2480}