Skip to main content

tandem_core/
storage.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use anyhow::Context;
5use chrono::{TimeZone, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use tokio::fs;
9use tokio::sync::{Mutex, RwLock};
10use tokio::task;
11use uuid::Uuid;
12
13use tandem_types::{Message, MessagePart, MessageRole, Session};
14
15use crate::message_part_reducer::reduce_message_parts;
16use crate::{
17    derive_session_title_from_prompt, normalize_workspace_path, title_needs_repair,
18    workspace_project_id,
19};
20
21#[derive(Debug, Clone, Serialize, Deserialize, Default)]
22pub struct SessionMeta {
23    pub parent_id: Option<String>,
24    #[serde(default)]
25    pub archived: bool,
26    #[serde(default)]
27    pub shared: bool,
28    pub share_id: Option<String>,
29    pub summary: Option<String>,
30    #[serde(default)]
31    pub snapshots: Vec<Vec<Message>>,
32    pub pre_revert: Option<Vec<Message>>,
33    #[serde(default)]
34    pub todos: Vec<Value>,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct QuestionToolRef {
39    #[serde(rename = "callID")]
40    pub call_id: String,
41    #[serde(rename = "messageID")]
42    pub message_id: String,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct QuestionRequest {
47    pub id: String,
48    #[serde(rename = "sessionID")]
49    pub session_id: String,
50    #[serde(default)]
51    pub questions: Vec<Value>,
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub tool: Option<QuestionToolRef>,
54}
55
56pub struct Storage {
57    base: PathBuf,
58    sessions: RwLock<HashMap<String, Session>>,
59    metadata: RwLock<HashMap<String, SessionMeta>>,
60    question_requests: RwLock<HashMap<String, QuestionRequest>>,
61    flush_lock: Mutex<()>,
62}
63
64#[derive(Debug, Clone)]
65pub enum SessionListScope {
66    Global,
67    Workspace { workspace_root: String },
68}
69
70#[derive(Debug, Clone, Default, Serialize, Deserialize)]
71pub struct SessionRepairStats {
72    pub sessions_repaired: u64,
73    pub messages_recovered: u64,
74    pub parts_recovered: u64,
75    pub conflicts_merged: u64,
76}
77
78const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
79const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
80const MAX_SESSION_SNAPSHOTS: usize = 5;
81
82#[derive(Debug, Clone, Default, Serialize, Deserialize)]
83pub struct LegacyTreeCounts {
84    pub session_files: u64,
85    pub message_files: u64,
86    pub part_files: u64,
87}
88
89#[derive(Debug, Clone, Default, Serialize, Deserialize)]
90pub struct LegacyImportedCounts {
91    pub sessions: u64,
92    pub messages: u64,
93    pub parts: u64,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct LegacyImportMarker {
98    pub version: u32,
99    pub created_at_ms: u64,
100    pub last_checked_at_ms: u64,
101    pub legacy_counts: LegacyTreeCounts,
102    pub imported_counts: LegacyImportedCounts,
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct LegacyRepairRunReport {
107    pub status: String,
108    pub marker_updated: bool,
109    pub sessions_merged: u64,
110    pub messages_recovered: u64,
111    pub parts_recovered: u64,
112    pub legacy_counts: LegacyTreeCounts,
113    pub imported_counts: LegacyImportedCounts,
114}
115
116fn snapshot_session_messages(
117    session_id: &str,
118    session: &Session,
119    metadata: &mut HashMap<String, SessionMeta>,
120) {
121    let meta = metadata
122        .entry(session_id.to_string())
123        .or_insert_with(SessionMeta::default);
124    meta.snapshots.push(session.messages.clone());
125    trim_session_snapshots(&mut meta.snapshots);
126}
127
128fn trim_session_snapshots(snapshots: &mut Vec<Vec<Message>>) {
129    if snapshots.len() > MAX_SESSION_SNAPSHOTS {
130        let keep_from = snapshots.len() - MAX_SESSION_SNAPSHOTS;
131        snapshots.drain(0..keep_from);
132    }
133}
134
135fn compact_session_snapshots(snapshots: &mut Vec<Vec<Message>>) -> usize {
136    if snapshots.is_empty() {
137        return 0;
138    }
139
140    let original_len = snapshots.len();
141    let mut compacted = Vec::with_capacity(original_len);
142    let mut previous_encoded: Option<Vec<u8>> = None;
143
144    for snapshot in snapshots.drain(..) {
145        let encoded = serde_json::to_vec(&snapshot).unwrap_or_default();
146        if previous_encoded.as_ref() == Some(&encoded) {
147            continue;
148        }
149        previous_encoded = Some(encoded);
150        compacted.push(snapshot);
151    }
152
153    trim_session_snapshots(&mut compacted);
154    let removed = original_len.saturating_sub(compacted.len());
155    *snapshots = compacted;
156    removed
157}
158
159fn session_meta_is_empty(meta: &SessionMeta) -> bool {
160    meta.parent_id.is_none()
161        && !meta.archived
162        && !meta.shared
163        && meta.share_id.is_none()
164        && meta.summary.is_none()
165        && meta.snapshots.is_empty()
166        && meta.pre_revert.is_none()
167        && meta.todos.is_empty()
168}
169
170#[derive(Debug, Default)]
171struct SessionMetaCompactionStats {
172    metadata_pruned: u64,
173    snapshots_removed: u64,
174}
175
176fn compact_session_metadata(
177    sessions: &HashMap<String, Session>,
178    metadata: &mut HashMap<String, SessionMeta>,
179) -> SessionMetaCompactionStats {
180    let mut stats = SessionMetaCompactionStats::default();
181
182    metadata.retain(|session_id, meta| {
183        if !sessions.contains_key(session_id) {
184            stats.metadata_pruned += 1;
185            return false;
186        }
187
188        let removed = compact_session_snapshots(&mut meta.snapshots) as u64;
189        stats.snapshots_removed += removed;
190
191        if session_meta_is_empty(meta) {
192            stats.metadata_pruned += 1;
193            return false;
194        }
195
196        true
197    });
198
199    stats
200}
201
202impl Storage {
203    pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
204        let base = base.as_ref().to_path_buf();
205        fs::create_dir_all(&base).await?;
206        let sessions_file = base.join("sessions.json");
207        let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
208        let sessions_file_exists = sessions_file.exists();
209        let mut imported_legacy_sessions = false;
210        let mut sessions = if sessions_file_exists {
211            let raw = fs::read_to_string(&sessions_file).await?;
212            serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
213        } else {
214            HashMap::new()
215        };
216
217        let mut marker_to_write = None;
218        if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
219            let base_for_scan = base.clone();
220            let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
221                .await
222                .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
223            if merge_legacy_sessions(&mut sessions, scan.sessions) {
224                imported_legacy_sessions = true;
225            }
226            marker_to_write = Some(LegacyImportMarker {
227                version: LEGACY_IMPORT_MARKER_VERSION,
228                created_at_ms: now_ms_u64(),
229                last_checked_at_ms: now_ms_u64(),
230                legacy_counts: scan.legacy_counts,
231                imported_counts: scan.imported_counts,
232            });
233        }
234
235        if hydrate_workspace_roots(&mut sessions) {
236            imported_legacy_sessions = true;
237        }
238        if repair_session_titles(&mut sessions) {
239            imported_legacy_sessions = true;
240        }
241        let metadata_file = base.join("session_meta.json");
242        let mut metadata = if metadata_file.exists() {
243            let raw = fs::read_to_string(&metadata_file).await?;
244            serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
245        } else {
246            HashMap::new()
247        };
248        let compaction = compact_session_metadata(&sessions, &mut metadata);
249        let metadata_compacted = compaction.metadata_pruned > 0 || compaction.snapshots_removed > 0;
250        if metadata_compacted {
251            tracing::info!(
252                metadata_pruned = compaction.metadata_pruned,
253                snapshots_removed = compaction.snapshots_removed,
254                "compacted persisted session metadata"
255            );
256        }
257        let questions_file = base.join("questions.json");
258        let question_requests = if questions_file.exists() {
259            let raw = fs::read_to_string(&questions_file).await?;
260            serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
261        } else {
262            HashMap::new()
263        };
264        let storage = Self {
265            base,
266            sessions: RwLock::new(sessions),
267            metadata: RwLock::new(metadata),
268            question_requests: RwLock::new(question_requests),
269            flush_lock: Mutex::new(()),
270        };
271
272        if imported_legacy_sessions || metadata_compacted {
273            storage.flush().await?;
274        }
275        if let Some(marker) = marker_to_write {
276            storage.write_legacy_import_marker(&marker).await?;
277        }
278
279        Ok(storage)
280    }
281
282    pub async fn list_sessions(&self) -> Vec<Session> {
283        self.list_sessions_scoped(SessionListScope::Global).await
284    }
285
286    pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
287        let all = self
288            .sessions
289            .read()
290            .await
291            .values()
292            .cloned()
293            .collect::<Vec<_>>();
294        match scope {
295            SessionListScope::Global => all,
296            SessionListScope::Workspace { workspace_root } => {
297                let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
298                    return Vec::new();
299                };
300                all.into_iter()
301                    .filter(|session| {
302                        let direct = session
303                            .workspace_root
304                            .as_ref()
305                            .and_then(|p| normalize_workspace_path(p))
306                            .map(|p| p == normalized_workspace)
307                            .unwrap_or(false);
308                        if direct {
309                            return true;
310                        }
311                        normalize_workspace_path(&session.directory)
312                            .map(|p| p == normalized_workspace)
313                            .unwrap_or(false)
314                    })
315                    .collect()
316            }
317        }
318    }
319
320    pub async fn get_session(&self, id: &str) -> Option<Session> {
321        self.sessions.read().await.get(id).cloned()
322    }
323
324    pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
325        if session.workspace_root.is_none() {
326            session.workspace_root = normalize_workspace_path(&session.directory);
327        }
328        let session_id = session.id.clone();
329        self.sessions
330            .write()
331            .await
332            .insert(session_id.clone(), session);
333        self.metadata
334            .write()
335            .await
336            .entry(session_id)
337            .or_insert_with(SessionMeta::default);
338        self.flush().await
339    }
340
341    pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
342        let mut stats = SessionRepairStats::default();
343        let mut sessions = self.sessions.write().await;
344
345        for session in sessions.values_mut() {
346            let imported = load_legacy_session_messages(&self.base, &session.id);
347            if imported.is_empty() {
348                continue;
349            }
350
351            let (merged, merge_stats, changed) =
352                merge_session_messages(&session.messages, &imported);
353            if changed {
354                session.messages = merged;
355                session.time.updated =
356                    most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
357                stats.sessions_repaired += 1;
358                stats.messages_recovered += merge_stats.messages_recovered;
359                stats.parts_recovered += merge_stats.parts_recovered;
360                stats.conflicts_merged += merge_stats.conflicts_merged;
361            }
362        }
363
364        if stats.sessions_repaired > 0 {
365            drop(sessions);
366            self.flush().await?;
367        }
368
369        Ok(stats)
370    }
371
372    pub async fn run_legacy_storage_repair_scan(
373        &self,
374        force: bool,
375    ) -> anyhow::Result<LegacyRepairRunReport> {
376        let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
377        let sessions_exists = self.base.join("sessions.json").exists();
378        let should_scan = if force {
379            true
380        } else {
381            should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
382        };
383        if !should_scan {
384            let marker = read_legacy_import_marker(&marker_path)
385                .await
386                .unwrap_or_else(|| LegacyImportMarker {
387                    version: LEGACY_IMPORT_MARKER_VERSION,
388                    created_at_ms: now_ms_u64(),
389                    last_checked_at_ms: now_ms_u64(),
390                    legacy_counts: LegacyTreeCounts::default(),
391                    imported_counts: LegacyImportedCounts::default(),
392                });
393            return Ok(LegacyRepairRunReport {
394                status: "skipped".to_string(),
395                marker_updated: false,
396                sessions_merged: 0,
397                messages_recovered: 0,
398                parts_recovered: 0,
399                legacy_counts: marker.legacy_counts,
400                imported_counts: marker.imported_counts,
401            });
402        }
403
404        let base_for_scan = self.base.clone();
405        let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
406            .await
407            .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
408
409        let merge_stats = {
410            let mut sessions = self.sessions.write().await;
411            merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
412        };
413
414        if merge_stats.changed {
415            self.flush().await?;
416        }
417
418        let marker = LegacyImportMarker {
419            version: LEGACY_IMPORT_MARKER_VERSION,
420            created_at_ms: now_ms_u64(),
421            last_checked_at_ms: now_ms_u64(),
422            legacy_counts: scan.legacy_counts.clone(),
423            imported_counts: scan.imported_counts.clone(),
424        };
425        self.write_legacy_import_marker(&marker).await?;
426
427        Ok(LegacyRepairRunReport {
428            status: if merge_stats.changed {
429                "updated".to_string()
430            } else {
431                "no_changes".to_string()
432            },
433            marker_updated: true,
434            sessions_merged: merge_stats.sessions_merged,
435            messages_recovered: merge_stats.messages_recovered,
436            parts_recovered: merge_stats.parts_recovered,
437            legacy_counts: scan.legacy_counts,
438            imported_counts: scan.imported_counts,
439        })
440    }
441
442    pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
443        let removed = self.sessions.write().await.remove(id).is_some();
444        self.metadata.write().await.remove(id);
445        self.question_requests
446            .write()
447            .await
448            .retain(|_, request| request.session_id != id);
449        if removed {
450            self.flush().await?;
451        }
452        Ok(removed)
453    }
454
455    pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
456        let mut sessions = self.sessions.write().await;
457        let session = sessions
458            .get_mut(session_id)
459            .context("session not found for append_message")?;
460        session.messages.push(msg);
461        session.time.updated = Utc::now();
462        drop(sessions);
463        self.flush().await
464    }
465
466    pub async fn append_message_part(
467        &self,
468        session_id: &str,
469        message_id: &str,
470        part: MessagePart,
471    ) -> anyhow::Result<()> {
472        let mut sessions = self.sessions.write().await;
473        let session = sessions
474            .get_mut(session_id)
475            .context("session not found for append_message_part")?;
476        let message = if let Some(message) = session
477            .messages
478            .iter_mut()
479            .find(|message| message.id == message_id)
480        {
481            message
482        } else {
483            session
484                .messages
485                .iter_mut()
486                .rev()
487                .find(|message| matches!(message.role, MessageRole::User))
488                .context("message not found for append_message_part")?
489        };
490        reduce_message_parts(&mut message.parts, part);
491        session.time.updated = Utc::now();
492        drop(sessions);
493        self.flush().await
494    }
495
496    pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
497        let source = {
498            let sessions = self.sessions.read().await;
499            sessions.get(id).cloned()
500        };
501        let Some(mut child) = source else {
502            return Ok(None);
503        };
504
505        child.id = Uuid::new_v4().to_string();
506        child.title = format!("{} (fork)", child.title);
507        child.time.created = Utc::now();
508        child.time.updated = child.time.created;
509        child.slug = None;
510
511        self.sessions
512            .write()
513            .await
514            .insert(child.id.clone(), child.clone());
515        self.metadata.write().await.insert(
516            child.id.clone(),
517            SessionMeta {
518                parent_id: Some(id.to_string()),
519                snapshots: vec![child.messages.clone()],
520                ..SessionMeta::default()
521            },
522        );
523        self.flush().await?;
524        Ok(Some(child))
525    }
526
527    pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
528        let mut sessions = self.sessions.write().await;
529        let Some(session) = sessions.get_mut(id) else {
530            return Ok(false);
531        };
532        let mut metadata = self.metadata.write().await;
533        let meta = metadata
534            .entry(id.to_string())
535            .or_insert_with(SessionMeta::default);
536        let Some(snapshot) = meta.snapshots.pop() else {
537            return Ok(false);
538        };
539        meta.pre_revert = Some(session.messages.clone());
540        session.messages = snapshot;
541        session.time.updated = Utc::now();
542        drop(metadata);
543        drop(sessions);
544        self.flush().await?;
545        Ok(true)
546    }
547
548    pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
549        let mut sessions = self.sessions.write().await;
550        let Some(session) = sessions.get_mut(id) else {
551            return Ok(false);
552        };
553        let mut metadata = self.metadata.write().await;
554        let Some(meta) = metadata.get_mut(id) else {
555            return Ok(false);
556        };
557        let Some(previous) = meta.pre_revert.take() else {
558            return Ok(false);
559        };
560        meta.snapshots.push(session.messages.clone());
561        trim_session_snapshots(&mut meta.snapshots);
562        session.messages = previous;
563        session.time.updated = Utc::now();
564        drop(metadata);
565        drop(sessions);
566        self.flush().await?;
567        Ok(true)
568    }
569
570    pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
571        let mut metadata = self.metadata.write().await;
572        let meta = metadata
573            .entry(id.to_string())
574            .or_insert_with(SessionMeta::default);
575        meta.shared = shared;
576        if shared {
577            if meta.share_id.is_none() {
578                meta.share_id = Some(Uuid::new_v4().to_string());
579            }
580        } else {
581            meta.share_id = None;
582        }
583        let share_id = meta.share_id.clone();
584        drop(metadata);
585        self.flush().await?;
586        Ok(share_id)
587    }
588
589    pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
590        let mut metadata = self.metadata.write().await;
591        let meta = metadata
592            .entry(id.to_string())
593            .or_insert_with(SessionMeta::default);
594        meta.archived = archived;
595        drop(metadata);
596        self.flush().await?;
597        Ok(true)
598    }
599
600    pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
601        let mut metadata = self.metadata.write().await;
602        let meta = metadata
603            .entry(id.to_string())
604            .or_insert_with(SessionMeta::default);
605        meta.summary = Some(summary);
606        drop(metadata);
607        self.flush().await?;
608        Ok(true)
609    }
610
611    pub async fn children(&self, parent_id: &str) -> Vec<Session> {
612        let child_ids = {
613            let metadata = self.metadata.read().await;
614            metadata
615                .iter()
616                .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
617                .map(|(id, _)| id.clone())
618                .collect::<Vec<_>>()
619        };
620        let sessions = self.sessions.read().await;
621        child_ids
622            .into_iter()
623            .filter_map(|id| sessions.get(&id).cloned())
624            .collect()
625    }
626
627    pub async fn session_status(&self, id: &str) -> Option<Value> {
628        let metadata = self.metadata.read().await;
629        metadata.get(id).map(|meta| {
630            json!({
631                "archived": meta.archived,
632                "shared": meta.shared,
633                "parentID": meta.parent_id,
634                "snapshotCount": meta.snapshots.len()
635            })
636        })
637    }
638
639    pub async fn session_diff(&self, id: &str) -> Option<Value> {
640        let sessions = self.sessions.read().await;
641        let current = sessions.get(id)?;
642        let metadata = self.metadata.read().await;
643        let default = SessionMeta::default();
644        let meta = metadata.get(id).unwrap_or(&default);
645        let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
646        Some(json!({
647            "sessionID": id,
648            "currentMessageCount": current.messages.len(),
649            "lastSnapshotMessageCount": last_snapshot_len,
650            "delta": current.messages.len() as i64 - last_snapshot_len as i64
651        }))
652    }
653
654    pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
655        let mut metadata = self.metadata.write().await;
656        let meta = metadata
657            .entry(id.to_string())
658            .or_insert_with(SessionMeta::default);
659        meta.todos = normalize_todo_items(todos);
660        drop(metadata);
661        self.flush().await
662    }
663
664    pub async fn get_todos(&self, id: &str) -> Vec<Value> {
665        let todos = self
666            .metadata
667            .read()
668            .await
669            .get(id)
670            .map(|meta| meta.todos.clone())
671            .unwrap_or_default();
672        normalize_todo_items(todos)
673    }
674
675    pub async fn add_question_request(
676        &self,
677        session_id: &str,
678        message_id: &str,
679        questions: Vec<Value>,
680    ) -> anyhow::Result<QuestionRequest> {
681        if questions.is_empty() {
682            return Err(anyhow::anyhow!(
683                "cannot add empty question request for session {}",
684                session_id
685            ));
686        }
687        let request = QuestionRequest {
688            id: format!("q-{}", Uuid::new_v4()),
689            session_id: session_id.to_string(),
690            questions,
691            tool: Some(QuestionToolRef {
692                call_id: format!("call-{}", Uuid::new_v4()),
693                message_id: message_id.to_string(),
694            }),
695        };
696        self.question_requests
697            .write()
698            .await
699            .insert(request.id.clone(), request.clone());
700        self.flush().await?;
701        Ok(request)
702    }
703
704    pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
705        self.question_requests
706            .read()
707            .await
708            .values()
709            .cloned()
710            .collect()
711    }
712
713    pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
714        let removed = self
715            .question_requests
716            .write()
717            .await
718            .remove(request_id)
719            .is_some();
720        if removed {
721            self.flush().await?;
722        }
723        Ok(removed)
724    }
725
726    pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
727        self.reply_question(request_id).await
728    }
729
730    pub async fn attach_session_to_workspace(
731        &self,
732        session_id: &str,
733        target_workspace: &str,
734        reason_tag: &str,
735    ) -> anyhow::Result<Option<Session>> {
736        let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
737            return Ok(None);
738        };
739        let mut sessions = self.sessions.write().await;
740        let Some(session) = sessions.get_mut(session_id) else {
741            return Ok(None);
742        };
743        let previous_workspace = session
744            .workspace_root
745            .clone()
746            .or_else(|| normalize_workspace_path(&session.directory));
747
748        if session.origin_workspace_root.is_none() {
749            session.origin_workspace_root = previous_workspace.clone();
750        }
751        session.attached_from_workspace = previous_workspace;
752        session.attached_to_workspace = Some(target_workspace.clone());
753        session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
754        session.attach_reason = Some(reason_tag.trim().to_string());
755        session.workspace_root = Some(target_workspace.clone());
756        session.project_id = workspace_project_id(&target_workspace);
757        session.directory = target_workspace;
758        session.time.updated = Utc::now();
759        let updated = session.clone();
760        drop(sessions);
761        self.flush().await?;
762        Ok(Some(updated))
763    }
764
765    async fn flush(&self) -> anyhow::Result<()> {
766        let _flush_guard = self.flush_lock.lock().await;
767        {
768            let snapshot = self.sessions.read().await.clone();
769            self.flush_file("sessions.json", &snapshot).await?;
770        }
771        {
772            let metadata_snapshot = self.metadata.read().await.clone();
773            self.flush_file("session_meta.json", &metadata_snapshot)
774                .await?;
775        }
776        {
777            let questions_snapshot = self.question_requests.read().await.clone();
778            self.flush_file("questions.json", &questions_snapshot)
779                .await?;
780        }
781        Ok(())
782    }
783
784    async fn flush_file(&self, filename: &str, data: &impl serde::Serialize) -> anyhow::Result<()> {
785        let path = self.base.join(filename);
786        let temp_path = self.base.join(format!("{}.tmp", filename));
787        let payload = serde_json::to_string_pretty(data)?;
788        fs::write(&temp_path, payload).await.with_context(|| {
789            format!("failed to write temp storage file {}", temp_path.display())
790        })?;
791        let std_temp_path: std::path::PathBuf = temp_path.clone().try_into()?;
792        tokio::task::spawn_blocking(move || {
793            let file = std::fs::File::open(&std_temp_path)?;
794            file.sync_all()?;
795            Ok::<(), std::io::Error>(())
796        })
797        .await??;
798        commit_temp_file(&temp_path, &path).await.with_context(|| {
799            format!(
800                "failed to atomically replace storage file {} with {}",
801                path.display(),
802                temp_path.display()
803            )
804        })?;
805        Ok(())
806    }
807
808    async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
809        let payload = serde_json::to_string_pretty(marker)?;
810        fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
811        Ok(())
812    }
813}
814
815async fn commit_temp_file(temp_path: &Path, path: &Path) -> std::io::Result<()> {
816    match tokio::fs::rename(temp_path, path).await {
817        Ok(()) => Ok(()),
818        Err(err) => {
819            #[cfg(windows)]
820            {
821                // Windows `rename` can return PermissionDenied when replacing an existing file.
822                // Fall back to delete-then-rename for this case.
823                use std::io::ErrorKind;
824                if matches!(
825                    err.kind(),
826                    ErrorKind::PermissionDenied | ErrorKind::AlreadyExists
827                ) {
828                    match tokio::fs::remove_file(path).await {
829                        Ok(()) => {}
830                        Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {}
831                        Err(remove_err) => return Err(remove_err),
832                    }
833                    return tokio::fs::rename(temp_path, path).await;
834                }
835            }
836            Err(err)
837        }
838    }
839}
840
841fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
842    items
843        .into_iter()
844        .filter_map(|item| {
845            let obj = item.as_object()?;
846            let content = obj
847                .get("content")
848                .and_then(|v| v.as_str())
849                .or_else(|| obj.get("text").and_then(|v| v.as_str()))
850                .unwrap_or("")
851                .trim()
852                .to_string();
853            if content.is_empty() {
854                return None;
855            }
856            let id = obj
857                .get("id")
858                .and_then(|v| v.as_str())
859                .filter(|s| !s.trim().is_empty())
860                .map(ToString::to_string)
861                .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
862            let status = obj
863                .get("status")
864                .and_then(|v| v.as_str())
865                .filter(|s| !s.trim().is_empty())
866                .map(ToString::to_string)
867                .unwrap_or_else(|| "pending".to_string());
868            Some(json!({
869                "id": id,
870                "content": content,
871                "status": status
872            }))
873        })
874        .collect()
875}
876
877#[derive(Debug)]
878struct LegacyScanResult {
879    sessions: HashMap<String, Session>,
880    legacy_counts: LegacyTreeCounts,
881    imported_counts: LegacyImportedCounts,
882}
883
884#[derive(Debug, Default)]
885struct LegacyMergeStats {
886    changed: bool,
887    sessions_merged: u64,
888    messages_recovered: u64,
889    parts_recovered: u64,
890}
891
892fn now_ms_u64() -> u64 {
893    Utc::now().timestamp_millis().max(0) as u64
894}
895
896async fn should_run_legacy_scan_on_startup(marker_path: &Path, sessions_exist: bool) -> bool {
897    if !sessions_exist {
898        return true;
899    }
900    // Fast-path startup: if canonical sessions already exist, do not block startup
901    // on deep legacy tree scans. Users can trigger an explicit repair scan later.
902    if read_legacy_import_marker(marker_path).await.is_none() {
903        return false;
904    }
905    false
906}
907
908async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
909    let raw = fs::read_to_string(marker_path).await.ok()?;
910    serde_json::from_str::<LegacyImportMarker>(&raw).ok()
911}
912
913fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
914    let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
915    let imported_counts = LegacyImportedCounts {
916        sessions: sessions.len() as u64,
917        messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
918        parts: sessions
919            .values()
920            .flat_map(|s| s.messages.iter())
921            .map(|m| m.parts.len() as u64)
922            .sum(),
923    };
924    let legacy_counts = LegacyTreeCounts {
925        session_files: count_legacy_json_files(&base.join("session")),
926        message_files: count_legacy_json_files(&base.join("message")),
927        part_files: count_legacy_json_files(&base.join("part")),
928    };
929    Ok(LegacyScanResult {
930        sessions,
931        legacy_counts,
932        imported_counts,
933    })
934}
935
936fn count_legacy_json_files(root: &Path) -> u64 {
937    if !root.is_dir() {
938        return 0;
939    }
940    let mut count = 0u64;
941    let mut stack = vec![root.to_path_buf()];
942    while let Some(dir) = stack.pop() {
943        if let Ok(entries) = std::fs::read_dir(&dir) {
944            for entry in entries.flatten() {
945                let path = entry.path();
946                if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
947                    stack.push(path);
948                    continue;
949                }
950                if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
951                    count += 1;
952                }
953            }
954        }
955    }
956    count
957}
958
959fn merge_legacy_sessions(
960    current: &mut HashMap<String, Session>,
961    imported: HashMap<String, Session>,
962) -> bool {
963    merge_legacy_sessions_with_stats(current, imported).changed
964}
965
966fn merge_legacy_sessions_with_stats(
967    current: &mut HashMap<String, Session>,
968    imported: HashMap<String, Session>,
969) -> LegacyMergeStats {
970    let mut stats = LegacyMergeStats::default();
971    for (id, legacy) in imported {
972        let legacy_message_count = legacy.messages.len() as u64;
973        let legacy_part_count = legacy
974            .messages
975            .iter()
976            .map(|m| m.parts.len() as u64)
977            .sum::<u64>();
978        match current.get_mut(&id) {
979            None => {
980                current.insert(id, legacy);
981                stats.changed = true;
982                stats.sessions_merged += 1;
983                stats.messages_recovered += legacy_message_count;
984                stats.parts_recovered += legacy_part_count;
985            }
986            Some(existing) => {
987                let should_merge_messages =
988                    existing.messages.is_empty() && !legacy.messages.is_empty();
989                let should_fill_title =
990                    existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
991                let should_fill_directory = (existing.directory.trim().is_empty()
992                    || existing.directory.trim() == "."
993                    || existing.directory.trim() == "./"
994                    || existing.directory.trim() == ".\\")
995                    && !legacy.directory.trim().is_empty();
996                let should_fill_workspace =
997                    existing.workspace_root.is_none() && legacy.workspace_root.is_some();
998                if should_merge_messages {
999                    existing.messages = legacy.messages.clone();
1000                }
1001                if should_fill_title {
1002                    existing.title = legacy.title.clone();
1003                }
1004                if should_fill_directory {
1005                    existing.directory = legacy.directory.clone();
1006                }
1007                if should_fill_workspace {
1008                    existing.workspace_root = legacy.workspace_root.clone();
1009                }
1010                if should_merge_messages
1011                    || should_fill_title
1012                    || should_fill_directory
1013                    || should_fill_workspace
1014                {
1015                    stats.changed = true;
1016                    if should_merge_messages {
1017                        stats.sessions_merged += 1;
1018                        stats.messages_recovered += legacy_message_count;
1019                        stats.parts_recovered += legacy_part_count;
1020                    }
1021                }
1022            }
1023        }
1024    }
1025    stats
1026}
1027
1028fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
1029    let mut changed = false;
1030    for session in sessions.values_mut() {
1031        if session.workspace_root.is_none() {
1032            let normalized = normalize_workspace_path(&session.directory);
1033            if normalized.is_some() {
1034                session.workspace_root = normalized;
1035                changed = true;
1036            }
1037        }
1038    }
1039    changed
1040}
1041
1042fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
1043    let mut changed = false;
1044    for session in sessions.values_mut() {
1045        if !title_needs_repair(&session.title) {
1046            continue;
1047        }
1048        let first_user_text = session.messages.iter().find_map(|message| {
1049            if !matches!(message.role, MessageRole::User) {
1050                return None;
1051            }
1052            message.parts.iter().find_map(|part| match part {
1053                MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
1054                _ => None,
1055            })
1056        });
1057        let Some(source) = first_user_text else {
1058            continue;
1059        };
1060        let Some(derived) = derive_session_title_from_prompt(source, 60) else {
1061            continue;
1062        };
1063        if derived == session.title {
1064            continue;
1065        }
1066        session.title = derived;
1067        session.time.updated = Utc::now();
1068        changed = true;
1069    }
1070    changed
1071}
1072
1073#[derive(Debug, Deserialize)]
1074struct LegacySessionTime {
1075    created: i64,
1076    updated: i64,
1077}
1078
1079#[derive(Debug, Deserialize)]
1080struct LegacySession {
1081    id: String,
1082    slug: Option<String>,
1083    version: Option<String>,
1084    #[serde(rename = "projectID")]
1085    project_id: Option<String>,
1086    title: Option<String>,
1087    directory: Option<String>,
1088    time: LegacySessionTime,
1089}
1090
1091fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
1092    let legacy_root = base.join("session");
1093    if !legacy_root.is_dir() {
1094        return Ok(HashMap::new());
1095    }
1096
1097    let mut out = HashMap::new();
1098    let mut stack = vec![legacy_root];
1099    while let Some(dir) = stack.pop() {
1100        for entry in std::fs::read_dir(&dir)? {
1101            let entry = entry?;
1102            let path = entry.path();
1103            if entry.file_type()?.is_dir() {
1104                stack.push(path);
1105                continue;
1106            }
1107            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1108                continue;
1109            }
1110            let raw = match std::fs::read_to_string(&path) {
1111                Ok(v) => v,
1112                Err(_) => continue,
1113            };
1114            let legacy = match serde_json::from_str::<LegacySession>(&raw) {
1115                Ok(v) => v,
1116                Err(_) => continue,
1117            };
1118            let created = Utc
1119                .timestamp_millis_opt(legacy.time.created)
1120                .single()
1121                .unwrap_or_else(Utc::now);
1122            let updated = Utc
1123                .timestamp_millis_opt(legacy.time.updated)
1124                .single()
1125                .unwrap_or(created);
1126
1127            let session_id = legacy.id.clone();
1128            out.insert(
1129                session_id.clone(),
1130                Session {
1131                    id: session_id.clone(),
1132                    slug: legacy.slug,
1133                    version: legacy.version,
1134                    project_id: legacy.project_id,
1135                    title: legacy
1136                        .title
1137                        .filter(|s| !s.trim().is_empty())
1138                        .unwrap_or_else(|| "New session".to_string()),
1139                    directory: legacy
1140                        .directory
1141                        .clone()
1142                        .filter(|s| !s.trim().is_empty())
1143                        .unwrap_or_else(|| ".".to_string()),
1144                    workspace_root: legacy
1145                        .directory
1146                        .as_deref()
1147                        .and_then(normalize_workspace_path),
1148                    origin_workspace_root: None,
1149                    attached_from_workspace: None,
1150                    attached_to_workspace: None,
1151                    attach_timestamp_ms: None,
1152                    attach_reason: None,
1153                    time: tandem_types::SessionTime { created, updated },
1154                    model: None,
1155                    provider: None,
1156                    environment: None,
1157                    messages: load_legacy_session_messages(base, &session_id),
1158                },
1159            );
1160        }
1161    }
1162    Ok(out)
1163}
1164
1165#[derive(Debug, Deserialize)]
1166struct LegacyMessageTime {
1167    created: i64,
1168}
1169
1170#[derive(Debug, Deserialize)]
1171struct LegacyMessage {
1172    id: String,
1173    role: String,
1174    time: LegacyMessageTime,
1175}
1176
1177#[derive(Debug, Deserialize)]
1178struct LegacyPart {
1179    #[serde(rename = "type")]
1180    part_type: Option<String>,
1181    text: Option<String>,
1182    tool: Option<String>,
1183    args: Option<Value>,
1184    result: Option<Value>,
1185    error: Option<String>,
1186}
1187
1188fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
1189    let msg_dir = base.join("message").join(session_id);
1190    if !msg_dir.is_dir() {
1191        return Vec::new();
1192    }
1193
1194    let mut legacy_messages: Vec<(i64, Message)> = Vec::new();
1195
1196    let Ok(entries) = std::fs::read_dir(&msg_dir) else {
1197        return Vec::new();
1198    };
1199
1200    for entry in entries.flatten() {
1201        let path = entry.path();
1202        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1203            continue;
1204        }
1205        let Ok(raw) = std::fs::read_to_string(&path) else {
1206            continue;
1207        };
1208        let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
1209            continue;
1210        };
1211
1212        let created_at = Utc
1213            .timestamp_millis_opt(legacy.time.created)
1214            .single()
1215            .unwrap_or_else(Utc::now);
1216
1217        legacy_messages.push((
1218            legacy.time.created,
1219            Message {
1220                id: legacy.id.clone(),
1221                role: legacy_role_to_message_role(&legacy.role),
1222                parts: load_legacy_message_parts(base, &legacy.id),
1223                created_at,
1224            },
1225        ));
1226    }
1227
1228    legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
1229    legacy_messages.into_iter().map(|(_, msg)| msg).collect()
1230}
1231
1232fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
1233    let parts_dir = base.join("part").join(message_id);
1234    if !parts_dir.is_dir() {
1235        return Vec::new();
1236    }
1237
1238    let Ok(entries) = std::fs::read_dir(&parts_dir) else {
1239        return Vec::new();
1240    };
1241
1242    let mut out = Vec::new();
1243    for entry in entries.flatten() {
1244        let path = entry.path();
1245        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1246            continue;
1247        }
1248        let Ok(raw) = std::fs::read_to_string(&path) else {
1249            continue;
1250        };
1251        let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
1252            continue;
1253        };
1254
1255        let mapped = if let Some(tool) = part.tool {
1256            Some(MessagePart::ToolInvocation {
1257                tool,
1258                args: part.args.unwrap_or_else(|| json!({})),
1259                result: part.result,
1260                error: part.error,
1261            })
1262        } else {
1263            match part.part_type.as_deref() {
1264                Some("reasoning") => Some(MessagePart::Reasoning {
1265                    text: part.text.unwrap_or_default(),
1266                }),
1267                Some("tool") => Some(MessagePart::ToolInvocation {
1268                    tool: "tool".to_string(),
1269                    args: part.args.unwrap_or_else(|| json!({})),
1270                    result: part.result,
1271                    error: part.error,
1272                }),
1273                Some("text") | None => Some(MessagePart::Text {
1274                    text: part.text.unwrap_or_default(),
1275                }),
1276                _ => None,
1277            }
1278        };
1279
1280        if let Some(part) = mapped {
1281            out.push(part);
1282        }
1283    }
1284    out
1285}
1286
1287fn legacy_role_to_message_role(role: &str) -> MessageRole {
1288    match role.to_lowercase().as_str() {
1289        "user" => MessageRole::User,
1290        "assistant" => MessageRole::Assistant,
1291        "system" => MessageRole::System,
1292        "tool" => MessageRole::Tool,
1293        _ => MessageRole::Assistant,
1294    }
1295}
1296
1297#[derive(Debug, Clone, Default)]
1298struct MessageMergeStats {
1299    messages_recovered: u64,
1300    parts_recovered: u64,
1301    conflicts_merged: u64,
1302}
1303
1304fn message_richness(msg: &Message) -> usize {
1305    msg.parts
1306        .iter()
1307        .map(|p| match p {
1308            MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1309                if text.trim().is_empty() {
1310                    0
1311                } else {
1312                    1
1313                }
1314            }
1315            MessagePart::ToolInvocation { result, error, .. } => {
1316                if result.is_some() || error.is_some() {
1317                    2
1318                } else {
1319                    1
1320                }
1321            }
1322        })
1323        .sum()
1324}
1325
1326fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1327    messages.iter().map(|m| m.created_at).max()
1328}
1329
1330fn merge_session_messages(
1331    existing: &[Message],
1332    imported: &[Message],
1333) -> (Vec<Message>, MessageMergeStats, bool) {
1334    if existing.is_empty() {
1335        let messages_recovered = imported.len() as u64;
1336        let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1337        return (
1338            imported.to_vec(),
1339            MessageMergeStats {
1340                messages_recovered,
1341                parts_recovered,
1342                conflicts_merged: 0,
1343            },
1344            true,
1345        );
1346    }
1347
1348    let mut merged_by_id: HashMap<String, Message> = existing
1349        .iter()
1350        .cloned()
1351        .map(|m| (m.id.clone(), m))
1352        .collect();
1353    let mut stats = MessageMergeStats::default();
1354    let mut changed = false;
1355
1356    for incoming in imported {
1357        match merged_by_id.get(&incoming.id) {
1358            None => {
1359                merged_by_id.insert(incoming.id.clone(), incoming.clone());
1360                stats.messages_recovered += 1;
1361                stats.parts_recovered += incoming.parts.len() as u64;
1362                changed = true;
1363            }
1364            Some(current) => {
1365                let incoming_richer = message_richness(incoming) > message_richness(current)
1366                    || incoming.parts.len() > current.parts.len();
1367                if incoming_richer {
1368                    merged_by_id.insert(incoming.id.clone(), incoming.clone());
1369                    stats.conflicts_merged += 1;
1370                    changed = true;
1371                }
1372            }
1373        }
1374    }
1375
1376    let mut out: Vec<Message> = merged_by_id.into_values().collect();
1377    out.sort_by_key(|m| m.created_at);
1378    (out, stats, changed)
1379}
1380
1381#[cfg(test)]
1382mod tests {
1383    use super::*;
1384    use std::fs as stdfs;
1385    use std::sync::Arc;
1386
1387    #[tokio::test]
1388    async fn todos_are_normalized_to_wire_shape() {
1389        let base = std::env::temp_dir().join(format!("tandem-core-test-{}", Uuid::new_v4()));
1390        let storage = Storage::new(&base).await.expect("storage");
1391        let session = Session::new(Some("test".to_string()), Some(".".to_string()));
1392        let id = session.id.clone();
1393        storage.save_session(session).await.expect("save session");
1394
1395        storage
1396            .set_todos(
1397                &id,
1398                vec![
1399                    json!({"content":"first"}),
1400                    json!({"text":"second", "status":"in_progress"}),
1401                    json!({"id":"keep-id","content":"third","status":"completed"}),
1402                ],
1403            )
1404            .await
1405            .expect("set todos");
1406
1407        let todos = storage.get_todos(&id).await;
1408        assert_eq!(todos.len(), 3);
1409        for todo in todos {
1410            assert!(todo.get("id").and_then(|v| v.as_str()).is_some());
1411            assert!(todo.get("content").and_then(|v| v.as_str()).is_some());
1412            assert!(todo.get("status").and_then(|v| v.as_str()).is_some());
1413        }
1414    }
1415
1416    #[tokio::test]
1417    async fn imports_legacy_opencode_session_index_when_sessions_json_missing() {
1418        let base =
1419            std::env::temp_dir().join(format!("tandem-core-legacy-import-{}", Uuid::new_v4()));
1420        let legacy_session_dir = base.join("session").join("global");
1421        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1422        stdfs::write(
1423            legacy_session_dir.join("ses_test.json"),
1424            r#"{
1425  "id": "ses_test",
1426  "slug": "test",
1427  "version": "1.0.0",
1428  "projectID": "proj_1",
1429  "directory": "C:\\work\\demo",
1430  "title": "Legacy Session",
1431  "time": { "created": 1770913145613, "updated": 1770913146613 }
1432}"#,
1433        )
1434        .expect("legacy session write");
1435
1436        let storage = Storage::new(&base).await.expect("storage");
1437        let sessions = storage.list_sessions().await;
1438        assert_eq!(sessions.len(), 1);
1439        assert_eq!(sessions[0].id, "ses_test");
1440        assert_eq!(sessions[0].title, "Legacy Session");
1441        assert!(base.join("sessions.json").exists());
1442    }
1443
1444    #[tokio::test]
1445    async fn imports_legacy_messages_and_parts_for_session() {
1446        let base = std::env::temp_dir().join(format!("tandem-core-legacy-msg-{}", Uuid::new_v4()));
1447        let session_dir = base.join("session").join("global");
1448        let message_dir = base.join("message").join("ses_test");
1449        let part_dir = base.join("part").join("msg_1");
1450        stdfs::create_dir_all(&session_dir).expect("session dir");
1451        stdfs::create_dir_all(&message_dir).expect("message dir");
1452        stdfs::create_dir_all(&part_dir).expect("part dir");
1453
1454        stdfs::write(
1455            session_dir.join("ses_test.json"),
1456            r#"{
1457  "id": "ses_test",
1458  "projectID": "proj_1",
1459  "directory": "C:\\work\\demo",
1460  "title": "Legacy Session",
1461  "time": { "created": 1770913145613, "updated": 1770913146613 }
1462}"#,
1463        )
1464        .expect("write session");
1465
1466        stdfs::write(
1467            message_dir.join("msg_1.json"),
1468            r#"{
1469  "id": "msg_1",
1470  "sessionID": "ses_test",
1471  "role": "assistant",
1472  "time": { "created": 1770913145613 }
1473}"#,
1474        )
1475        .expect("write msg");
1476
1477        stdfs::write(
1478            part_dir.join("prt_1.json"),
1479            r#"{
1480  "id": "prt_1",
1481  "sessionID": "ses_test",
1482  "messageID": "msg_1",
1483  "type": "text",
1484  "text": "hello from legacy"
1485}"#,
1486        )
1487        .expect("write part");
1488
1489        let storage = Storage::new(&base).await.expect("storage");
1490        let sessions = storage.list_sessions().await;
1491        assert_eq!(sessions.len(), 1);
1492        assert_eq!(sessions[0].messages.len(), 1);
1493        assert_eq!(sessions[0].messages[0].parts.len(), 1);
1494    }
1495
1496    #[tokio::test]
1497    async fn skips_legacy_merge_when_sessions_json_exists() {
1498        let base =
1499            std::env::temp_dir().join(format!("tandem-core-legacy-merge-{}", Uuid::new_v4()));
1500        stdfs::create_dir_all(&base).expect("base");
1501        stdfs::write(
1502            base.join("sessions.json"),
1503            r#"{
1504  "ses_current": {
1505    "id": "ses_current",
1506    "slug": null,
1507    "version": "v1",
1508    "project_id": null,
1509    "title": "Current Session",
1510    "directory": ".",
1511    "workspace_root": null,
1512    "origin_workspace_root": null,
1513    "attached_from_workspace": null,
1514    "attached_to_workspace": null,
1515    "attach_timestamp_ms": null,
1516    "attach_reason": null,
1517    "time": {"created":"2026-01-01T00:00:00Z","updated":"2026-01-01T00:00:00Z"},
1518    "model": null,
1519    "provider": null,
1520    "messages": []
1521  }
1522}"#,
1523        )
1524        .expect("sessions.json");
1525
1526        let legacy_session_dir = base.join("session").join("global");
1527        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1528        stdfs::write(
1529            legacy_session_dir.join("ses_legacy.json"),
1530            r#"{
1531  "id": "ses_legacy",
1532  "slug": "legacy",
1533  "version": "1.0.0",
1534  "projectID": "proj_legacy",
1535  "directory": "C:\\work\\legacy",
1536  "title": "Legacy Session",
1537  "time": { "created": 1770913145613, "updated": 1770913146613 }
1538}"#,
1539        )
1540        .expect("legacy session write");
1541
1542        let storage = Storage::new(&base).await.expect("storage");
1543        let sessions = storage.list_sessions().await;
1544        let ids = sessions.iter().map(|s| s.id.clone()).collect::<Vec<_>>();
1545        assert!(ids.contains(&"ses_current".to_string()));
1546        assert!(!ids.contains(&"ses_legacy".to_string()));
1547    }
1548
1549    #[tokio::test]
1550    async fn list_sessions_scoped_filters_by_workspace_root() {
1551        let base = std::env::temp_dir().join(format!("tandem-core-scope-{}", Uuid::new_v4()));
1552        let storage = Storage::new(&base).await.expect("storage");
1553        let ws_a = base.join("ws-a");
1554        let ws_b = base.join("ws-b");
1555        stdfs::create_dir_all(&ws_a).expect("ws_a");
1556        stdfs::create_dir_all(&ws_b).expect("ws_b");
1557        let ws_a_str = ws_a.to_string_lossy().to_string();
1558        let ws_b_str = ws_b.to_string_lossy().to_string();
1559
1560        let mut a = Session::new(Some("a".to_string()), Some(ws_a_str.clone()));
1561        a.workspace_root = Some(ws_a_str.clone());
1562        storage.save_session(a).await.expect("save a");
1563
1564        let mut b = Session::new(Some("b".to_string()), Some(ws_b_str.clone()));
1565        b.workspace_root = Some(ws_b_str);
1566        storage.save_session(b).await.expect("save b");
1567
1568        let scoped = storage
1569            .list_sessions_scoped(SessionListScope::Workspace {
1570                workspace_root: ws_a_str,
1571            })
1572            .await;
1573        assert_eq!(scoped.len(), 1);
1574        assert_eq!(scoped[0].title, "a");
1575    }
1576
1577    #[tokio::test]
1578    async fn attach_session_persists_audit_metadata() {
1579        let base = std::env::temp_dir().join(format!("tandem-core-attach-{}", Uuid::new_v4()));
1580        let storage = Storage::new(&base).await.expect("storage");
1581        let ws_a = base.join("ws-a");
1582        let ws_b = base.join("ws-b");
1583        stdfs::create_dir_all(&ws_a).expect("ws_a");
1584        stdfs::create_dir_all(&ws_b).expect("ws_b");
1585        let ws_a_str = ws_a.to_string_lossy().to_string();
1586        let ws_b_str = ws_b.to_string_lossy().to_string();
1587        let mut session = Session::new(Some("s".to_string()), Some(ws_a_str.clone()));
1588        session.workspace_root = Some(ws_a_str);
1589        let id = session.id.clone();
1590        storage.save_session(session).await.expect("save");
1591
1592        let updated = storage
1593            .attach_session_to_workspace(&id, &ws_b_str, "manual")
1594            .await
1595            .expect("attach")
1596            .expect("session exists");
1597        let normalized_expected = normalize_workspace_path(&ws_b_str).expect("normalized path");
1598        assert_eq!(
1599            updated.workspace_root.as_deref(),
1600            Some(normalized_expected.as_str())
1601        );
1602        assert_eq!(
1603            updated.attached_to_workspace.as_deref(),
1604            Some(normalized_expected.as_str())
1605        );
1606        assert_eq!(updated.attach_reason.as_deref(), Some("manual"));
1607        assert!(updated.attach_timestamp_ms.is_some());
1608    }
1609
1610    #[tokio::test]
1611    async fn append_message_part_persists_tool_invocation_and_result() {
1612        let base = std::env::temp_dir().join(format!("tandem-core-tool-parts-{}", Uuid::new_v4()));
1613        let storage = Storage::new(&base).await.expect("storage");
1614        let session = Session::new(Some("tool parts".to_string()), Some(".".to_string()));
1615        let session_id = session.id.clone();
1616        storage.save_session(session).await.expect("save session");
1617
1618        let user = Message::new(
1619            MessageRole::User,
1620            vec![MessagePart::Text {
1621                text: "build ui".to_string(),
1622            }],
1623        );
1624        let message_id = user.id.clone();
1625        storage
1626            .append_message(&session_id, user)
1627            .await
1628            .expect("append user");
1629
1630        storage
1631            .append_message_part(
1632                &session_id,
1633                &message_id,
1634                MessagePart::ToolInvocation {
1635                    tool: "write".to_string(),
1636                    args: json!({"path":"game.html","content":"<html></html>"}),
1637                    result: None,
1638                    error: None,
1639                },
1640            )
1641            .await
1642            .expect("append invocation");
1643        storage
1644            .append_message_part(
1645                &session_id,
1646                &message_id,
1647                MessagePart::ToolInvocation {
1648                    tool: "write".to_string(),
1649                    args: json!({}),
1650                    result: Some(json!("ok")),
1651                    error: None,
1652                },
1653            )
1654            .await
1655            .expect("append result");
1656
1657        let session = storage.get_session(&session_id).await.expect("session");
1658        let message = session
1659            .messages
1660            .iter()
1661            .find(|message| message.id == message_id)
1662            .expect("message");
1663        assert_eq!(message.parts.len(), 2);
1664        match &message.parts[1] {
1665            MessagePart::ToolInvocation {
1666                tool,
1667                result,
1668                error,
1669                ..
1670            } => {
1671                assert_eq!(tool, "write");
1672                assert_eq!(result.as_ref(), Some(&json!("ok")));
1673                assert_eq!(error.as_deref(), None);
1674            }
1675            other => panic!("expected tool part, got {other:?}"),
1676        }
1677    }
1678
1679    #[tokio::test]
1680    async fn append_message_part_retains_failed_tool_error() {
1681        let base = std::env::temp_dir().join(format!("tandem-core-tool-error-{}", Uuid::new_v4()));
1682        let storage = Storage::new(&base).await.expect("storage");
1683        let session = Session::new(Some("tool errors".to_string()), Some(".".to_string()));
1684        let session_id = session.id.clone();
1685        storage.save_session(session).await.expect("save session");
1686
1687        let user = Message::new(
1688            MessageRole::User,
1689            vec![MessagePart::Text {
1690                text: "write file".to_string(),
1691            }],
1692        );
1693        let message_id = user.id.clone();
1694        storage
1695            .append_message(&session_id, user)
1696            .await
1697            .expect("append user");
1698
1699        storage
1700            .append_message_part(
1701                &session_id,
1702                &message_id,
1703                MessagePart::ToolInvocation {
1704                    tool: "write".to_string(),
1705                    args: json!({"path":"game.html"}),
1706                    result: None,
1707                    error: None,
1708                },
1709            )
1710            .await
1711            .expect("append invocation");
1712        storage
1713            .append_message_part(
1714                &session_id,
1715                &message_id,
1716                MessagePart::ToolInvocation {
1717                    tool: "write".to_string(),
1718                    args: json!({}),
1719                    result: None,
1720                    error: Some("WRITE_CONTENT_MISSING".to_string()),
1721                },
1722            )
1723            .await
1724            .expect("append error");
1725
1726        let session = storage.get_session(&session_id).await.expect("session");
1727        let message = session
1728            .messages
1729            .iter()
1730            .find(|message| message.id == message_id)
1731            .expect("message");
1732        match &message.parts[1] {
1733            MessagePart::ToolInvocation { error, .. } => {
1734                assert_eq!(error.as_deref(), Some("WRITE_CONTENT_MISSING"));
1735            }
1736            other => panic!("expected tool part, got {other:?}"),
1737        }
1738    }
1739
1740    #[tokio::test]
1741    async fn append_message_part_coalesces_repeated_tool_invocation_updates() {
1742        let base = std::env::temp_dir().join(format!("tandem-core-tool-merge-{}", Uuid::new_v4()));
1743        let storage = Storage::new(&base).await.expect("storage");
1744        let session = Session::new(Some("tool merge".to_string()), Some(".".to_string()));
1745        let session_id = session.id.clone();
1746        storage.save_session(session).await.expect("save session");
1747
1748        let user = Message::new(
1749            MessageRole::User,
1750            vec![MessagePart::Text {
1751                text: "build ui".to_string(),
1752            }],
1753        );
1754        let message_id = user.id.clone();
1755        storage
1756            .append_message(&session_id, user)
1757            .await
1758            .expect("append user");
1759
1760        storage
1761            .append_message_part(
1762                &session_id,
1763                &message_id,
1764                MessagePart::ToolInvocation {
1765                    tool: "write".to_string(),
1766                    args: json!({"path":"game.html"}),
1767                    result: None,
1768                    error: None,
1769                },
1770            )
1771            .await
1772            .expect("append first invocation");
1773        storage
1774            .append_message_part(
1775                &session_id,
1776                &message_id,
1777                MessagePart::ToolInvocation {
1778                    tool: "write".to_string(),
1779                    args: json!({"path":"game.html","content":"<html></html>"}),
1780                    result: None,
1781                    error: None,
1782                },
1783            )
1784            .await
1785            .expect("append updated invocation");
1786
1787        let session = storage.get_session(&session_id).await.expect("session");
1788        let message = session
1789            .messages
1790            .iter()
1791            .find(|message| message.id == message_id)
1792            .expect("message");
1793        assert_eq!(message.parts.len(), 2);
1794        match &message.parts[1] {
1795            MessagePart::ToolInvocation { tool, args, .. } => {
1796                assert_eq!(tool, "write");
1797                assert_eq!(args["path"], "game.html");
1798                assert_eq!(args["content"], "<html></html>");
1799            }
1800            other => panic!("expected tool part, got {other:?}"),
1801        }
1802    }
1803
1804    #[tokio::test]
1805    async fn append_message_part_upgrades_raw_string_args_to_structured_invocation_args() {
1806        let base =
1807            std::env::temp_dir().join(format!("tandem-core-tool-raw-upgrade-{}", Uuid::new_v4()));
1808        let storage = Storage::new(&base).await.expect("storage");
1809        let session = Session::new(Some("tool raw upgrade".to_string()), Some(".".to_string()));
1810        let session_id = session.id.clone();
1811        storage.save_session(session).await.expect("save session");
1812
1813        let user = Message::new(
1814            MessageRole::User,
1815            vec![MessagePart::Text {
1816                text: "build ui".to_string(),
1817            }],
1818        );
1819        let message_id = user.id.clone();
1820        storage
1821            .append_message(&session_id, user)
1822            .await
1823            .expect("append user");
1824
1825        storage
1826            .append_message_part(
1827                &session_id,
1828                &message_id,
1829                MessagePart::ToolInvocation {
1830                    tool: "write".to_string(),
1831                    args: json!("{\"path\":\"game.html\",\"content\":\"<html>draft</html>\"}"),
1832                    result: None,
1833                    error: None,
1834                },
1835            )
1836            .await
1837            .expect("append raw invocation");
1838        storage
1839            .append_message_part(
1840                &session_id,
1841                &message_id,
1842                MessagePart::ToolInvocation {
1843                    tool: "write".to_string(),
1844                    args: json!({"path":"game.html","content":"<html>draft</html>"}),
1845                    result: None,
1846                    error: None,
1847                },
1848            )
1849            .await
1850            .expect("append structured invocation");
1851
1852        let session = storage.get_session(&session_id).await.expect("session");
1853        let message = session
1854            .messages
1855            .iter()
1856            .find(|message| message.id == message_id)
1857            .expect("message");
1858        assert_eq!(message.parts.len(), 2);
1859        match &message.parts[1] {
1860            MessagePart::ToolInvocation { tool, args, .. } => {
1861                assert_eq!(tool, "write");
1862                assert_eq!(args["path"], "game.html");
1863                assert_eq!(args["content"], "<html>draft</html>");
1864            }
1865            other => panic!("expected tool part, got {other:?}"),
1866        }
1867    }
1868
1869    #[tokio::test]
1870    async fn append_message_part_upgrades_raw_string_args_when_result_arrives_with_structure() {
1871        let base = std::env::temp_dir().join(format!(
1872            "tandem-core-tool-raw-result-upgrade-{}",
1873            Uuid::new_v4()
1874        ));
1875        let storage = Storage::new(&base).await.expect("storage");
1876        let session = Session::new(
1877            Some("tool raw result upgrade".to_string()),
1878            Some(".".to_string()),
1879        );
1880        let session_id = session.id.clone();
1881        storage.save_session(session).await.expect("save session");
1882
1883        let user = Message::new(
1884            MessageRole::User,
1885            vec![MessagePart::Text {
1886                text: "build ui".to_string(),
1887            }],
1888        );
1889        let message_id = user.id.clone();
1890        storage
1891            .append_message(&session_id, user)
1892            .await
1893            .expect("append user");
1894
1895        storage
1896            .append_message_part(
1897                &session_id,
1898                &message_id,
1899                MessagePart::ToolInvocation {
1900                    tool: "write".to_string(),
1901                    args: json!("{\"path\":\"game.html\",\"content\":\"<html>draft</html>\"}"),
1902                    result: None,
1903                    error: None,
1904                },
1905            )
1906            .await
1907            .expect("append raw invocation");
1908        storage
1909            .append_message_part(
1910                &session_id,
1911                &message_id,
1912                MessagePart::ToolInvocation {
1913                    tool: "write".to_string(),
1914                    args: json!({"path":"game.html","content":"<html>draft</html>"}),
1915                    result: Some(json!("ok")),
1916                    error: None,
1917                },
1918            )
1919            .await
1920            .expect("append structured result");
1921
1922        let session = storage.get_session(&session_id).await.expect("session");
1923        let message = session
1924            .messages
1925            .iter()
1926            .find(|message| message.id == message_id)
1927            .expect("message");
1928        assert_eq!(message.parts.len(), 2);
1929        match &message.parts[1] {
1930            MessagePart::ToolInvocation {
1931                tool,
1932                args,
1933                result,
1934                error,
1935            } => {
1936                assert_eq!(tool, "write");
1937                assert_eq!(args["path"], "game.html");
1938                assert_eq!(args["content"], "<html>draft</html>");
1939                assert_eq!(result.as_ref(), Some(&json!("ok")));
1940                assert_eq!(error.as_deref(), None);
1941            }
1942            other => panic!("expected tool part, got {other:?}"),
1943        }
1944    }
1945
1946    #[tokio::test]
1947    async fn append_message_part_upgrades_partial_structured_args_when_result_adds_fields() {
1948        let base = std::env::temp_dir().join(format!(
1949            "tandem-core-tool-structured-result-upgrade-{}",
1950            Uuid::new_v4()
1951        ));
1952        let storage = Storage::new(&base).await.expect("storage");
1953        let session = Session::new(
1954            Some("tool structured result upgrade".to_string()),
1955            Some(".".to_string()),
1956        );
1957        let session_id = session.id.clone();
1958        storage.save_session(session).await.expect("save session");
1959
1960        let user = Message::new(
1961            MessageRole::User,
1962            vec![MessagePart::Text {
1963                text: "build ui".to_string(),
1964            }],
1965        );
1966        let message_id = user.id.clone();
1967        storage
1968            .append_message(&session_id, user)
1969            .await
1970            .expect("append user");
1971
1972        storage
1973            .append_message_part(
1974                &session_id,
1975                &message_id,
1976                MessagePart::ToolInvocation {
1977                    tool: "write".to_string(),
1978                    args: json!({"path":"game.html"}),
1979                    result: None,
1980                    error: None,
1981                },
1982            )
1983            .await
1984            .expect("append partial invocation");
1985        storage
1986            .append_message_part(
1987                &session_id,
1988                &message_id,
1989                MessagePart::ToolInvocation {
1990                    tool: "write".to_string(),
1991                    args: json!({"path":"game.html","content":"<html>draft</html>"}),
1992                    result: Some(json!("ok")),
1993                    error: None,
1994                },
1995            )
1996            .await
1997            .expect("append richer result");
1998
1999        let session = storage.get_session(&session_id).await.expect("session");
2000        let message = session
2001            .messages
2002            .iter()
2003            .find(|message| message.id == message_id)
2004            .expect("message");
2005        assert_eq!(message.parts.len(), 2);
2006        match &message.parts[1] {
2007            MessagePart::ToolInvocation {
2008                tool,
2009                args,
2010                result,
2011                error,
2012            } => {
2013                assert_eq!(tool, "write");
2014                assert_eq!(args["path"], "game.html");
2015                assert_eq!(args["content"], "<html>draft</html>");
2016                assert_eq!(result.as_ref(), Some(&json!("ok")));
2017                assert_eq!(error.as_deref(), None);
2018            }
2019            other => panic!("expected tool part, got {other:?}"),
2020        }
2021    }
2022
2023    #[tokio::test]
2024    async fn append_message_part_replaces_malformed_object_args_with_structured_result_args() {
2025        let base = std::env::temp_dir().join(format!(
2026            "tandem-core-tool-malformed-args-replace-{}",
2027            Uuid::new_v4()
2028        ));
2029        let storage = Storage::new(&base).await.expect("storage");
2030        let session = Session::new(
2031            Some("tool malformed args replacement".to_string()),
2032            Some(".".to_string()),
2033        );
2034        let session_id = session.id.clone();
2035        storage.save_session(session).await.expect("save session");
2036
2037        let user = Message::new(
2038            MessageRole::User,
2039            vec![MessagePart::Text {
2040                text: "build ui".to_string(),
2041            }],
2042        );
2043        let message_id = user.id.clone();
2044        storage
2045            .append_message(&session_id, user)
2046            .await
2047            .expect("append user");
2048
2049        storage
2050            .append_message_part(
2051                &session_id,
2052                &message_id,
2053                MessagePart::ToolInvocation {
2054                    tool: "write".to_string(),
2055                    args: json!({"{\"allow_empty": null}),
2056                    result: None,
2057                    error: None,
2058                },
2059            )
2060            .await
2061            .expect("append malformed invocation");
2062        storage
2063            .append_message_part(
2064                &session_id,
2065                &message_id,
2066                MessagePart::ToolInvocation {
2067                    tool: "write".to_string(),
2068                    args: json!({"path":"game.html","content":"<html>draft</html>"}),
2069                    result: Some(json!("ok")),
2070                    error: None,
2071                },
2072            )
2073            .await
2074            .expect("append structured result");
2075
2076        let session = storage.get_session(&session_id).await.expect("session");
2077        let message = session
2078            .messages
2079            .iter()
2080            .find(|message| message.id == message_id)
2081            .expect("message");
2082        assert_eq!(message.parts.len(), 2);
2083        match &message.parts[1] {
2084            MessagePart::ToolInvocation {
2085                tool,
2086                args,
2087                result,
2088                error,
2089            } => {
2090                assert_eq!(tool, "write");
2091                assert_eq!(args["path"], "game.html");
2092                assert_eq!(args["content"], "<html>draft</html>");
2093                assert_eq!(result.as_ref(), Some(&json!("ok")));
2094                assert_eq!(error.as_deref(), None);
2095            }
2096            other => panic!("expected tool part, got {other:?}"),
2097        }
2098    }
2099
2100    #[tokio::test]
2101    async fn append_message_part_replaces_partial_write_args_when_result_adds_path_and_content() {
2102        let base = std::env::temp_dir().join(format!(
2103            "tandem-core-tool-partial-write-args-replace-{}",
2104            Uuid::new_v4()
2105        ));
2106        let storage = Storage::new(&base).await.expect("storage");
2107        let session = Session::new(
2108            Some("tool partial write args replacement".to_string()),
2109            Some(".".to_string()),
2110        );
2111        let session_id = session.id.clone();
2112        storage.save_session(session).await.expect("save session");
2113
2114        let user = Message::new(
2115            MessageRole::User,
2116            vec![MessagePart::Text {
2117                text: "build ui".to_string(),
2118            }],
2119        );
2120        let message_id = user.id.clone();
2121        storage
2122            .append_message(&session_id, user)
2123            .await
2124            .expect("append user");
2125
2126        storage
2127            .append_message_part(
2128                &session_id,
2129                &message_id,
2130                MessagePart::ToolInvocation {
2131                    tool: "write".to_string(),
2132                    args: json!({"content": ""}),
2133                    result: None,
2134                    error: None,
2135                },
2136            )
2137            .await
2138            .expect("append partial invocation");
2139        storage
2140            .append_message_part(
2141                &session_id,
2142                &message_id,
2143                MessagePart::ToolInvocation {
2144                    tool: "write".to_string(),
2145                    args: json!({"path":"notes/report.md","content":"# Report\n"}),
2146                    result: Some(json!("ok")),
2147                    error: None,
2148                },
2149            )
2150            .await
2151            .expect("append structured result");
2152
2153        let session = storage.get_session(&session_id).await.expect("session");
2154        let message = session
2155            .messages
2156            .iter()
2157            .find(|message| message.id == message_id)
2158            .expect("message");
2159        assert_eq!(message.parts.len(), 2);
2160        match &message.parts[1] {
2161            MessagePart::ToolInvocation {
2162                tool,
2163                args,
2164                result,
2165                error,
2166            } => {
2167                assert_eq!(tool, "write");
2168                assert_eq!(args["path"], "notes/report.md");
2169                assert_eq!(args["content"], "# Report\n");
2170                assert_eq!(result.as_ref(), Some(&json!("ok")));
2171                assert_eq!(error.as_deref(), None);
2172            }
2173            other => panic!("expected tool part, got {other:?}"),
2174        }
2175    }
2176
2177    #[tokio::test]
2178    async fn append_message_part_prefers_executed_write_args_with_context_over_pending_raw_args() {
2179        let base = std::env::temp_dir().join(format!(
2180            "tandem-core-tool-executed-args-preferred-{}",
2181            Uuid::new_v4()
2182        ));
2183        let storage = Storage::new(&base).await.expect("storage");
2184        let session = Session::new(
2185            Some("tool executed args preferred".to_string()),
2186            Some(".".to_string()),
2187        );
2188        let session_id = session.id.clone();
2189        storage.save_session(session).await.expect("save session");
2190
2191        let user = Message::new(
2192            MessageRole::User,
2193            vec![MessagePart::Text {
2194                text: "build report".to_string(),
2195            }],
2196        );
2197        let message_id = user.id.clone();
2198        storage
2199            .append_message(&session_id, user)
2200            .await
2201            .expect("append user");
2202
2203        storage
2204            .append_message_part(
2205                &session_id,
2206                &message_id,
2207                MessagePart::ToolInvocation {
2208                    tool: "write".to_string(),
2209                    args: json!({"path":".","content":"draft"}),
2210                    result: None,
2211                    error: None,
2212                },
2213            )
2214            .await
2215            .expect("append raw pending invocation");
2216        storage
2217            .append_message_part(
2218                &session_id,
2219                &message_id,
2220                MessagePart::ToolInvocation {
2221                    tool: "write".to_string(),
2222                    args: json!({
2223                        "path":".tandem/runs/run-123/artifacts/research-sources.json",
2224                        "content":"draft",
2225                        "__workspace_root":"/home/evan/marketing-tandem",
2226                        "__effective_cwd":"/home/evan/marketing-tandem"
2227                    }),
2228                    result: Some(json!("ok")),
2229                    error: None,
2230                },
2231            )
2232            .await
2233            .expect("append executed result");
2234
2235        let session = storage.get_session(&session_id).await.expect("session");
2236        let message = session
2237            .messages
2238            .iter()
2239            .find(|message| message.id == message_id)
2240            .expect("message");
2241        assert_eq!(message.parts.len(), 2);
2242        match &message.parts[1] {
2243            MessagePart::ToolInvocation {
2244                tool,
2245                args,
2246                result,
2247                error,
2248            } => {
2249                assert_eq!(tool, "write");
2250                assert_eq!(
2251                    args["path"],
2252                    ".tandem/runs/run-123/artifacts/research-sources.json"
2253                );
2254                assert_eq!(args["content"], "draft");
2255                assert_eq!(args["__workspace_root"], "/home/evan/marketing-tandem");
2256                assert_eq!(result.as_ref(), Some(&json!("ok")));
2257                assert_eq!(error.as_deref(), None);
2258            }
2259            other => panic!("expected tool part, got {other:?}"),
2260        }
2261    }
2262
2263    #[tokio::test]
2264    async fn append_message_part_falls_back_to_latest_user_message_when_id_missing() {
2265        let base =
2266            std::env::temp_dir().join(format!("tandem-core-tool-fallback-{}", Uuid::new_v4()));
2267        let storage = Storage::new(&base).await.expect("storage");
2268        let session = Session::new(Some("tool fallback".to_string()), Some(".".to_string()));
2269        let session_id = session.id.clone();
2270        storage.save_session(session).await.expect("save session");
2271
2272        let first = Message::new(
2273            MessageRole::User,
2274            vec![MessagePart::Text {
2275                text: "first prompt".to_string(),
2276            }],
2277        );
2278        let second = Message::new(
2279            MessageRole::User,
2280            vec![MessagePart::Text {
2281                text: "second prompt".to_string(),
2282            }],
2283        );
2284        let second_id = second.id.clone();
2285        storage
2286            .append_message(&session_id, first)
2287            .await
2288            .expect("append first");
2289        storage
2290            .append_message(&session_id, second)
2291            .await
2292            .expect("append second");
2293
2294        storage
2295            .append_message_part(
2296                &session_id,
2297                "missing-message-id",
2298                MessagePart::ToolInvocation {
2299                    tool: "glob".to_string(),
2300                    args: json!({"pattern":"*"}),
2301                    result: Some(json!(["README.md"])),
2302                    error: None,
2303                },
2304            )
2305            .await
2306            .expect("append fallback tool part");
2307
2308        let session = storage.get_session(&session_id).await.expect("session");
2309        let message = session
2310            .messages
2311            .iter()
2312            .find(|message| message.id == second_id)
2313            .expect("latest user message");
2314        match &message.parts[1] {
2315            MessagePart::ToolInvocation { tool, result, .. } => {
2316                assert_eq!(tool, "glob");
2317                assert_eq!(result.as_ref(), Some(&json!(["README.md"])));
2318            }
2319            other => panic!("expected tool part, got {other:?}"),
2320        }
2321    }
2322
2323    #[tokio::test]
2324    async fn commit_temp_file_replaces_existing_destination() {
2325        let base =
2326            std::env::temp_dir().join(format!("tandem-core-commit-temp-file-{}", Uuid::new_v4()));
2327        stdfs::create_dir_all(&base).expect("base dir");
2328        let destination = base.join("sessions.json");
2329        let temp = base.join("sessions.json.tmp");
2330        stdfs::write(&destination, "{\"version\":\"old\"}").expect("write destination");
2331        stdfs::write(&temp, "{\"version\":\"new\"}").expect("write temp");
2332
2333        commit_temp_file(&temp, &destination)
2334            .await
2335            .expect("replace destination");
2336
2337        let raw = stdfs::read_to_string(&destination).expect("read destination");
2338        assert_eq!(raw, "{\"version\":\"new\"}");
2339        assert!(!temp.exists());
2340    }
2341
2342    #[tokio::test]
2343    async fn startup_compacts_session_snapshot_metadata() {
2344        let base = std::env::temp_dir().join(format!(
2345            "tandem-core-snapshot-compaction-{}",
2346            Uuid::new_v4()
2347        ));
2348        stdfs::create_dir_all(&base).expect("base dir");
2349
2350        let mut session = Session::new(
2351            Some("snapshot compaction".to_string()),
2352            Some(".".to_string()),
2353        );
2354        session.messages.push(Message::new(
2355            MessageRole::User,
2356            vec![MessagePart::Text {
2357                text: "current".to_string(),
2358            }],
2359        ));
2360        let session_id = session.id.clone();
2361
2362        let mut sessions = HashMap::new();
2363        sessions.insert(session_id.clone(), session);
2364        stdfs::write(
2365            base.join("sessions.json"),
2366            serde_json::to_string_pretty(&sessions).expect("serialize sessions"),
2367        )
2368        .expect("write sessions");
2369
2370        let mut snapshots = Vec::new();
2371        for label in ["a", "a", "b", "c", "d", "e", "f"] {
2372            snapshots.push(vec![Message::new(
2373                MessageRole::User,
2374                vec![MessagePart::Text {
2375                    text: label.to_string(),
2376                }],
2377            )]);
2378        }
2379        let mut metadata = HashMap::new();
2380        metadata.insert(
2381            session_id.clone(),
2382            SessionMeta {
2383                snapshots,
2384                ..SessionMeta::default()
2385            },
2386        );
2387        metadata.insert("orphan".to_string(), SessionMeta::default());
2388        stdfs::write(
2389            base.join("session_meta.json"),
2390            serde_json::to_string_pretty(&metadata).expect("serialize metadata"),
2391        )
2392        .expect("write metadata");
2393        stdfs::write(base.join("questions.json"), "{}").expect("write questions");
2394
2395        let _storage = Storage::new(&base).await.expect("storage");
2396
2397        let raw = stdfs::read_to_string(base.join("session_meta.json")).expect("read metadata");
2398        let stored: HashMap<String, SessionMeta> =
2399            serde_json::from_str(&raw).expect("parse metadata");
2400        assert_eq!(stored.len(), 1);
2401        let compacted = stored.get(&session_id).expect("session metadata");
2402        assert_eq!(compacted.snapshots.len(), MAX_SESSION_SNAPSHOTS);
2403
2404        let labels = compacted
2405            .snapshots
2406            .iter()
2407            .map(|snapshot| {
2408                snapshot[0]
2409                    .parts
2410                    .iter()
2411                    .find_map(|part| match part {
2412                        MessagePart::Text { text } => Some(text.clone()),
2413                        _ => None,
2414                    })
2415                    .expect("snapshot text")
2416            })
2417            .collect::<Vec<_>>();
2418        assert_eq!(labels, vec!["b", "c", "d", "e", "f"]);
2419    }
2420
2421    #[tokio::test]
2422    async fn startup_repairs_placeholder_titles_from_wrapped_user_messages() {
2423        let base =
2424            std::env::temp_dir().join(format!("tandem-core-title-repair-{}", Uuid::new_v4()));
2425        let storage = Storage::new(&base).await.expect("storage");
2426        let wrapped = "<memory_context>\n<current_session>\n- fact\n</current_session>\n</memory_context>\n\n[Mode instructions]\nUse tools.\n\n[User request]\nExplain this bug";
2427        let mut session = Session::new(Some("<memory_context>".to_string()), Some(".".to_string()));
2428        let id = session.id.clone();
2429        session.messages.push(Message::new(
2430            MessageRole::User,
2431            vec![MessagePart::Text {
2432                text: wrapped.to_string(),
2433            }],
2434        ));
2435        storage.save_session(session).await.expect("save");
2436        drop(storage);
2437
2438        let storage = Storage::new(&base).await.expect("storage");
2439        let repaired = storage.get_session(&id).await.expect("session");
2440        assert_eq!(repaired.title, "Explain this bug");
2441    }
2442
2443    #[tokio::test]
2444    async fn concurrent_storage_flushes_do_not_fail() {
2445        let base = std::env::temp_dir().join(format!("tandem-core-flush-race-{}", Uuid::new_v4()));
2446        let storage = Arc::new(Storage::new(&base).await.expect("storage"));
2447        let session = Session::new(Some("flush race".to_string()), Some(".".to_string()));
2448        let session_id = session.id.clone();
2449        storage.save_session(session).await.expect("save session");
2450
2451        let mut tasks = Vec::new();
2452        for task_index in 0..12 {
2453            let storage = Arc::clone(&storage);
2454            let session_id = session_id.clone();
2455            tasks.push(tokio::spawn(async move {
2456                for part_index in 0..8 {
2457                    let message = Message::new(
2458                        MessageRole::User,
2459                        vec![MessagePart::Text {
2460                            text: format!("task {task_index} part {part_index}"),
2461                        }],
2462                    );
2463                    storage
2464                        .append_message(&session_id, message)
2465                        .await
2466                        .expect("append message");
2467                }
2468            }));
2469        }
2470
2471        for task in tasks {
2472            task.await.expect("join task");
2473        }
2474
2475        let session = storage.get_session(&session_id).await.expect("session");
2476        assert_eq!(session.messages.len(), 12 * 8);
2477        assert!(base.join("sessions.json").exists());
2478    }
2479}