Skip to main content

tandem_core/
storage.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use anyhow::Context;
5use chrono::{TimeZone, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use tokio::fs;
9use tokio::sync::{Mutex, RwLock};
10use tokio::task;
11use uuid::Uuid;
12
13use tandem_types::{Message, MessagePart, MessageRole, Session};
14
15use crate::message_part_reducer::reduce_message_parts;
16use crate::{
17    derive_session_title_from_prompt, normalize_workspace_path, title_needs_repair,
18    workspace_project_id,
19};
20
21#[derive(Debug, Clone, Serialize, Deserialize, Default)]
22pub struct SessionMeta {
23    pub parent_id: Option<String>,
24    #[serde(default)]
25    pub archived: bool,
26    #[serde(default)]
27    pub shared: bool,
28    pub share_id: Option<String>,
29    pub summary: Option<String>,
30    #[serde(default)]
31    pub snapshots: Vec<Vec<Message>>,
32    pub pre_revert: Option<Vec<Message>>,
33    #[serde(default)]
34    pub todos: Vec<Value>,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct QuestionToolRef {
39    #[serde(rename = "callID")]
40    pub call_id: String,
41    #[serde(rename = "messageID")]
42    pub message_id: String,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct QuestionRequest {
47    pub id: String,
48    #[serde(rename = "sessionID")]
49    pub session_id: String,
50    #[serde(default)]
51    pub questions: Vec<Value>,
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub tool: Option<QuestionToolRef>,
54}
55
56pub struct Storage {
57    base: PathBuf,
58    sessions: RwLock<HashMap<String, Session>>,
59    metadata: RwLock<HashMap<String, SessionMeta>>,
60    question_requests: RwLock<HashMap<String, QuestionRequest>>,
61    flush_lock: Mutex<()>,
62}
63
64#[derive(Debug, Clone)]
65pub enum SessionListScope {
66    Global,
67    Workspace { workspace_root: String },
68}
69
70#[derive(Debug, Clone, Default, Serialize, Deserialize)]
71pub struct SessionRepairStats {
72    pub sessions_repaired: u64,
73    pub messages_recovered: u64,
74    pub parts_recovered: u64,
75    pub conflicts_merged: u64,
76}
77
78const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
79const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
80const MAX_SESSION_SNAPSHOTS: usize = 5;
81
82#[derive(Debug, Clone, Default, Serialize, Deserialize)]
83pub struct LegacyTreeCounts {
84    pub session_files: u64,
85    pub message_files: u64,
86    pub part_files: u64,
87}
88
89#[derive(Debug, Clone, Default, Serialize, Deserialize)]
90pub struct LegacyImportedCounts {
91    pub sessions: u64,
92    pub messages: u64,
93    pub parts: u64,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct LegacyImportMarker {
98    pub version: u32,
99    pub created_at_ms: u64,
100    pub last_checked_at_ms: u64,
101    pub legacy_counts: LegacyTreeCounts,
102    pub imported_counts: LegacyImportedCounts,
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct LegacyRepairRunReport {
107    pub status: String,
108    pub marker_updated: bool,
109    pub sessions_merged: u64,
110    pub messages_recovered: u64,
111    pub parts_recovered: u64,
112    pub legacy_counts: LegacyTreeCounts,
113    pub imported_counts: LegacyImportedCounts,
114}
115
116fn snapshot_session_messages(
117    session_id: &str,
118    session: &Session,
119    metadata: &mut HashMap<String, SessionMeta>,
120) {
121    let meta = metadata
122        .entry(session_id.to_string())
123        .or_insert_with(SessionMeta::default);
124    meta.snapshots.push(session.messages.clone());
125    trim_session_snapshots(&mut meta.snapshots);
126}
127
128fn trim_session_snapshots(snapshots: &mut Vec<Vec<Message>>) {
129    if snapshots.len() > MAX_SESSION_SNAPSHOTS {
130        let keep_from = snapshots.len() - MAX_SESSION_SNAPSHOTS;
131        snapshots.drain(0..keep_from);
132    }
133}
134
135fn compact_session_snapshots(snapshots: &mut Vec<Vec<Message>>) -> usize {
136    if snapshots.is_empty() {
137        return 0;
138    }
139
140    let original_len = snapshots.len();
141    let mut compacted = Vec::with_capacity(original_len);
142    let mut previous_encoded: Option<Vec<u8>> = None;
143
144    for snapshot in snapshots.drain(..) {
145        let encoded = serde_json::to_vec(&snapshot).unwrap_or_default();
146        if previous_encoded.as_ref() == Some(&encoded) {
147            continue;
148        }
149        previous_encoded = Some(encoded);
150        compacted.push(snapshot);
151    }
152
153    trim_session_snapshots(&mut compacted);
154    let removed = original_len.saturating_sub(compacted.len());
155    *snapshots = compacted;
156    removed
157}
158
159fn session_meta_is_empty(meta: &SessionMeta) -> bool {
160    meta.parent_id.is_none()
161        && !meta.archived
162        && !meta.shared
163        && meta.share_id.is_none()
164        && meta.summary.is_none()
165        && meta.snapshots.is_empty()
166        && meta.pre_revert.is_none()
167        && meta.todos.is_empty()
168}
169
170#[derive(Debug, Default)]
171struct SessionMetaCompactionStats {
172    metadata_pruned: u64,
173    snapshots_removed: u64,
174}
175
176fn compact_session_metadata(
177    sessions: &HashMap<String, Session>,
178    metadata: &mut HashMap<String, SessionMeta>,
179) -> SessionMetaCompactionStats {
180    let mut stats = SessionMetaCompactionStats::default();
181
182    metadata.retain(|session_id, meta| {
183        if !sessions.contains_key(session_id) {
184            stats.metadata_pruned += 1;
185            return false;
186        }
187
188        let removed = compact_session_snapshots(&mut meta.snapshots) as u64;
189        stats.snapshots_removed += removed;
190
191        if session_meta_is_empty(meta) {
192            stats.metadata_pruned += 1;
193            return false;
194        }
195
196        true
197    });
198
199    stats
200}
201
202impl Storage {
203    pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
204        let base = base.as_ref().to_path_buf();
205        fs::create_dir_all(&base).await?;
206        let sessions_file = base.join("sessions.json");
207        let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
208        let sessions_file_exists = sessions_file.exists();
209        let mut imported_legacy_sessions = false;
210        let mut sessions = if sessions_file_exists {
211            let raw = fs::read_to_string(&sessions_file).await?;
212            serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
213        } else {
214            HashMap::new()
215        };
216
217        let mut marker_to_write = None;
218        if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
219            let base_for_scan = base.clone();
220            let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
221                .await
222                .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
223            if merge_legacy_sessions(&mut sessions, scan.sessions) {
224                imported_legacy_sessions = true;
225            }
226            marker_to_write = Some(LegacyImportMarker {
227                version: LEGACY_IMPORT_MARKER_VERSION,
228                created_at_ms: now_ms_u64(),
229                last_checked_at_ms: now_ms_u64(),
230                legacy_counts: scan.legacy_counts,
231                imported_counts: scan.imported_counts,
232            });
233        }
234
235        if hydrate_workspace_roots(&mut sessions) {
236            imported_legacy_sessions = true;
237        }
238        if repair_session_titles(&mut sessions) {
239            imported_legacy_sessions = true;
240        }
241        let metadata_file = base.join("session_meta.json");
242        let mut metadata = if metadata_file.exists() {
243            let raw = fs::read_to_string(&metadata_file).await?;
244            serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
245        } else {
246            HashMap::new()
247        };
248        let compaction = compact_session_metadata(&sessions, &mut metadata);
249        let metadata_compacted = compaction.metadata_pruned > 0 || compaction.snapshots_removed > 0;
250        if metadata_compacted {
251            tracing::info!(
252                metadata_pruned = compaction.metadata_pruned,
253                snapshots_removed = compaction.snapshots_removed,
254                "compacted persisted session metadata"
255            );
256        }
257        let questions_file = base.join("questions.json");
258        let question_requests = if questions_file.exists() {
259            let raw = fs::read_to_string(&questions_file).await?;
260            serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
261        } else {
262            HashMap::new()
263        };
264        let storage = Self {
265            base,
266            sessions: RwLock::new(sessions),
267            metadata: RwLock::new(metadata),
268            question_requests: RwLock::new(question_requests),
269            flush_lock: Mutex::new(()),
270        };
271
272        if imported_legacy_sessions || metadata_compacted {
273            storage.flush().await?;
274        }
275        if let Some(marker) = marker_to_write {
276            storage.write_legacy_import_marker(&marker).await?;
277        }
278
279        Ok(storage)
280    }
281
282    pub async fn list_sessions(&self) -> Vec<Session> {
283        self.list_sessions_scoped(SessionListScope::Global).await
284    }
285
286    pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
287        let all = self
288            .sessions
289            .read()
290            .await
291            .values()
292            .cloned()
293            .collect::<Vec<_>>();
294        match scope {
295            SessionListScope::Global => all,
296            SessionListScope::Workspace { workspace_root } => {
297                let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
298                    return Vec::new();
299                };
300                all.into_iter()
301                    .filter(|session| {
302                        let direct = session
303                            .workspace_root
304                            .as_ref()
305                            .and_then(|p| normalize_workspace_path(p))
306                            .map(|p| p == normalized_workspace)
307                            .unwrap_or(false);
308                        if direct {
309                            return true;
310                        }
311                        normalize_workspace_path(&session.directory)
312                            .map(|p| p == normalized_workspace)
313                            .unwrap_or(false)
314                    })
315                    .collect()
316            }
317        }
318    }
319
320    pub async fn get_session(&self, id: &str) -> Option<Session> {
321        self.sessions.read().await.get(id).cloned()
322    }
323
324    pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
325        if session.workspace_root.is_none() {
326            session.workspace_root = normalize_workspace_path(&session.directory);
327        }
328        let session_id = session.id.clone();
329        self.sessions
330            .write()
331            .await
332            .insert(session_id.clone(), session);
333        self.metadata
334            .write()
335            .await
336            .entry(session_id)
337            .or_insert_with(SessionMeta::default);
338        self.flush().await
339    }
340
341    pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
342        let mut stats = SessionRepairStats::default();
343        let mut sessions = self.sessions.write().await;
344
345        for session in sessions.values_mut() {
346            let imported = load_legacy_session_messages(&self.base, &session.id);
347            if imported.is_empty() {
348                continue;
349            }
350
351            let (merged, merge_stats, changed) =
352                merge_session_messages(&session.messages, &imported);
353            if changed {
354                session.messages = merged;
355                session.time.updated =
356                    most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
357                stats.sessions_repaired += 1;
358                stats.messages_recovered += merge_stats.messages_recovered;
359                stats.parts_recovered += merge_stats.parts_recovered;
360                stats.conflicts_merged += merge_stats.conflicts_merged;
361            }
362        }
363
364        if stats.sessions_repaired > 0 {
365            drop(sessions);
366            self.flush().await?;
367        }
368
369        Ok(stats)
370    }
371
372    pub async fn run_legacy_storage_repair_scan(
373        &self,
374        force: bool,
375    ) -> anyhow::Result<LegacyRepairRunReport> {
376        let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
377        let sessions_exists = self.base.join("sessions.json").exists();
378        let should_scan = if force {
379            true
380        } else {
381            should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
382        };
383        if !should_scan {
384            let marker = read_legacy_import_marker(&marker_path)
385                .await
386                .unwrap_or_else(|| LegacyImportMarker {
387                    version: LEGACY_IMPORT_MARKER_VERSION,
388                    created_at_ms: now_ms_u64(),
389                    last_checked_at_ms: now_ms_u64(),
390                    legacy_counts: LegacyTreeCounts::default(),
391                    imported_counts: LegacyImportedCounts::default(),
392                });
393            return Ok(LegacyRepairRunReport {
394                status: "skipped".to_string(),
395                marker_updated: false,
396                sessions_merged: 0,
397                messages_recovered: 0,
398                parts_recovered: 0,
399                legacy_counts: marker.legacy_counts,
400                imported_counts: marker.imported_counts,
401            });
402        }
403
404        let base_for_scan = self.base.clone();
405        let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
406            .await
407            .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
408
409        let merge_stats = {
410            let mut sessions = self.sessions.write().await;
411            merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
412        };
413
414        if merge_stats.changed {
415            self.flush().await?;
416        }
417
418        let marker = LegacyImportMarker {
419            version: LEGACY_IMPORT_MARKER_VERSION,
420            created_at_ms: now_ms_u64(),
421            last_checked_at_ms: now_ms_u64(),
422            legacy_counts: scan.legacy_counts.clone(),
423            imported_counts: scan.imported_counts.clone(),
424        };
425        self.write_legacy_import_marker(&marker).await?;
426
427        Ok(LegacyRepairRunReport {
428            status: if merge_stats.changed {
429                "updated".to_string()
430            } else {
431                "no_changes".to_string()
432            },
433            marker_updated: true,
434            sessions_merged: merge_stats.sessions_merged,
435            messages_recovered: merge_stats.messages_recovered,
436            parts_recovered: merge_stats.parts_recovered,
437            legacy_counts: scan.legacy_counts,
438            imported_counts: scan.imported_counts,
439        })
440    }
441
442    pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
443        let removed = self.sessions.write().await.remove(id).is_some();
444        self.metadata.write().await.remove(id);
445        self.question_requests
446            .write()
447            .await
448            .retain(|_, request| request.session_id != id);
449        if removed {
450            self.flush().await?;
451        }
452        Ok(removed)
453    }
454
455    pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
456        let mut sessions = self.sessions.write().await;
457        let session = sessions
458            .get_mut(session_id)
459            .context("session not found for append_message")?;
460        session.messages.push(msg);
461        session.time.updated = Utc::now();
462        drop(sessions);
463        self.flush().await
464    }
465
466    pub async fn append_message_part(
467        &self,
468        session_id: &str,
469        message_id: &str,
470        part: MessagePart,
471    ) -> anyhow::Result<()> {
472        let mut sessions = self.sessions.write().await;
473        let session = sessions
474            .get_mut(session_id)
475            .context("session not found for append_message_part")?;
476        let message = if let Some(message) = session
477            .messages
478            .iter_mut()
479            .find(|message| message.id == message_id)
480        {
481            message
482        } else {
483            session
484                .messages
485                .iter_mut()
486                .rev()
487                .find(|message| matches!(message.role, MessageRole::User))
488                .context("message not found for append_message_part")?
489        };
490        reduce_message_parts(&mut message.parts, part);
491        session.time.updated = Utc::now();
492        drop(sessions);
493        self.flush().await
494    }
495
496    pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
497        let source = {
498            let sessions = self.sessions.read().await;
499            sessions.get(id).cloned()
500        };
501        let Some(mut child) = source else {
502            return Ok(None);
503        };
504
505        child.id = Uuid::new_v4().to_string();
506        child.title = format!("{} (fork)", child.title);
507        child.time.created = Utc::now();
508        child.time.updated = child.time.created;
509        child.slug = None;
510
511        self.sessions
512            .write()
513            .await
514            .insert(child.id.clone(), child.clone());
515        self.metadata.write().await.insert(
516            child.id.clone(),
517            SessionMeta {
518                parent_id: Some(id.to_string()),
519                snapshots: vec![child.messages.clone()],
520                ..SessionMeta::default()
521            },
522        );
523        self.flush().await?;
524        Ok(Some(child))
525    }
526
527    pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
528        let mut sessions = self.sessions.write().await;
529        let Some(session) = sessions.get_mut(id) else {
530            return Ok(false);
531        };
532        let mut metadata = self.metadata.write().await;
533        let meta = metadata
534            .entry(id.to_string())
535            .or_insert_with(SessionMeta::default);
536        let Some(snapshot) = meta.snapshots.pop() else {
537            return Ok(false);
538        };
539        meta.pre_revert = Some(session.messages.clone());
540        session.messages = snapshot;
541        session.time.updated = Utc::now();
542        drop(metadata);
543        drop(sessions);
544        self.flush().await?;
545        Ok(true)
546    }
547
548    pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
549        let mut sessions = self.sessions.write().await;
550        let Some(session) = sessions.get_mut(id) else {
551            return Ok(false);
552        };
553        let mut metadata = self.metadata.write().await;
554        let Some(meta) = metadata.get_mut(id) else {
555            return Ok(false);
556        };
557        let Some(previous) = meta.pre_revert.take() else {
558            return Ok(false);
559        };
560        meta.snapshots.push(session.messages.clone());
561        trim_session_snapshots(&mut meta.snapshots);
562        session.messages = previous;
563        session.time.updated = Utc::now();
564        drop(metadata);
565        drop(sessions);
566        self.flush().await?;
567        Ok(true)
568    }
569
570    pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
571        let mut metadata = self.metadata.write().await;
572        let meta = metadata
573            .entry(id.to_string())
574            .or_insert_with(SessionMeta::default);
575        meta.shared = shared;
576        if shared {
577            if meta.share_id.is_none() {
578                meta.share_id = Some(Uuid::new_v4().to_string());
579            }
580        } else {
581            meta.share_id = None;
582        }
583        let share_id = meta.share_id.clone();
584        drop(metadata);
585        self.flush().await?;
586        Ok(share_id)
587    }
588
589    pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
590        let mut metadata = self.metadata.write().await;
591        let meta = metadata
592            .entry(id.to_string())
593            .or_insert_with(SessionMeta::default);
594        meta.archived = archived;
595        drop(metadata);
596        self.flush().await?;
597        Ok(true)
598    }
599
600    pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
601        let mut metadata = self.metadata.write().await;
602        let meta = metadata
603            .entry(id.to_string())
604            .or_insert_with(SessionMeta::default);
605        meta.summary = Some(summary);
606        drop(metadata);
607        self.flush().await?;
608        Ok(true)
609    }
610
611    pub async fn children(&self, parent_id: &str) -> Vec<Session> {
612        let child_ids = {
613            let metadata = self.metadata.read().await;
614            metadata
615                .iter()
616                .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
617                .map(|(id, _)| id.clone())
618                .collect::<Vec<_>>()
619        };
620        let sessions = self.sessions.read().await;
621        child_ids
622            .into_iter()
623            .filter_map(|id| sessions.get(&id).cloned())
624            .collect()
625    }
626
627    pub async fn session_status(&self, id: &str) -> Option<Value> {
628        let metadata = self.metadata.read().await;
629        metadata.get(id).map(|meta| {
630            json!({
631                "archived": meta.archived,
632                "shared": meta.shared,
633                "parentID": meta.parent_id,
634                "snapshotCount": meta.snapshots.len()
635            })
636        })
637    }
638
639    pub async fn session_diff(&self, id: &str) -> Option<Value> {
640        let sessions = self.sessions.read().await;
641        let current = sessions.get(id)?;
642        let metadata = self.metadata.read().await;
643        let default = SessionMeta::default();
644        let meta = metadata.get(id).unwrap_or(&default);
645        let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
646        Some(json!({
647            "sessionID": id,
648            "currentMessageCount": current.messages.len(),
649            "lastSnapshotMessageCount": last_snapshot_len,
650            "delta": current.messages.len() as i64 - last_snapshot_len as i64
651        }))
652    }
653
654    pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
655        let mut metadata = self.metadata.write().await;
656        let meta = metadata
657            .entry(id.to_string())
658            .or_insert_with(SessionMeta::default);
659        meta.todos = normalize_todo_items(todos);
660        drop(metadata);
661        self.flush().await
662    }
663
664    pub async fn get_todos(&self, id: &str) -> Vec<Value> {
665        let todos = self
666            .metadata
667            .read()
668            .await
669            .get(id)
670            .map(|meta| meta.todos.clone())
671            .unwrap_or_default();
672        normalize_todo_items(todos)
673    }
674
675    pub async fn add_question_request(
676        &self,
677        session_id: &str,
678        message_id: &str,
679        questions: Vec<Value>,
680    ) -> anyhow::Result<QuestionRequest> {
681        if questions.is_empty() {
682            return Err(anyhow::anyhow!(
683                "cannot add empty question request for session {}",
684                session_id
685            ));
686        }
687        let request = QuestionRequest {
688            id: format!("q-{}", Uuid::new_v4()),
689            session_id: session_id.to_string(),
690            questions,
691            tool: Some(QuestionToolRef {
692                call_id: format!("call-{}", Uuid::new_v4()),
693                message_id: message_id.to_string(),
694            }),
695        };
696        self.question_requests
697            .write()
698            .await
699            .insert(request.id.clone(), request.clone());
700        self.flush().await?;
701        Ok(request)
702    }
703
704    pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
705        self.question_requests
706            .read()
707            .await
708            .values()
709            .cloned()
710            .collect()
711    }
712
713    pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
714        let removed = self
715            .question_requests
716            .write()
717            .await
718            .remove(request_id)
719            .is_some();
720        if removed {
721            self.flush().await?;
722        }
723        Ok(removed)
724    }
725
726    pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
727        self.reply_question(request_id).await
728    }
729
730    pub async fn attach_session_to_workspace(
731        &self,
732        session_id: &str,
733        target_workspace: &str,
734        reason_tag: &str,
735    ) -> anyhow::Result<Option<Session>> {
736        let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
737            return Ok(None);
738        };
739        let mut sessions = self.sessions.write().await;
740        let Some(session) = sessions.get_mut(session_id) else {
741            return Ok(None);
742        };
743        let previous_workspace = session
744            .workspace_root
745            .clone()
746            .or_else(|| normalize_workspace_path(&session.directory));
747
748        if session.origin_workspace_root.is_none() {
749            session.origin_workspace_root = previous_workspace.clone();
750        }
751        session.attached_from_workspace = previous_workspace;
752        session.attached_to_workspace = Some(target_workspace.clone());
753        session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
754        session.attach_reason = Some(reason_tag.trim().to_string());
755        session.workspace_root = Some(target_workspace.clone());
756        session.project_id = workspace_project_id(&target_workspace);
757        session.directory = target_workspace;
758        session.time.updated = Utc::now();
759        let updated = session.clone();
760        drop(sessions);
761        self.flush().await?;
762        Ok(Some(updated))
763    }
764
765    async fn flush(&self) -> anyhow::Result<()> {
766        let _flush_guard = self.flush_lock.lock().await;
767        {
768            let snapshot = self.sessions.read().await.clone();
769            self.flush_file("sessions.json", &snapshot).await?;
770        }
771        {
772            let metadata_snapshot = self.metadata.read().await.clone();
773            self.flush_file("session_meta.json", &metadata_snapshot)
774                .await?;
775        }
776        {
777            let questions_snapshot = self.question_requests.read().await.clone();
778            self.flush_file("questions.json", &questions_snapshot)
779                .await?;
780        }
781        Ok(())
782    }
783
784    async fn flush_file(&self, filename: &str, data: &impl serde::Serialize) -> anyhow::Result<()> {
785        let path = self.base.join(filename);
786        let temp_path = self.base.join(format!("{}.tmp", filename));
787        let payload = serde_json::to_string_pretty(data)?;
788        fs::write(&temp_path, payload).await.with_context(|| {
789            format!("failed to write temp storage file {}", temp_path.display())
790        })?;
791        let std_temp_path: std::path::PathBuf = temp_path.clone().try_into()?;
792        tokio::task::spawn_blocking(move || {
793            let file = std::fs::File::open(&std_temp_path)?;
794            file.sync_all()?;
795            Ok::<(), std::io::Error>(())
796        })
797        .await??;
798        commit_temp_file(&temp_path, &path).await.with_context(|| {
799            format!(
800                "failed to atomically replace storage file {} with {}",
801                path.display(),
802                temp_path.display()
803            )
804        })?;
805        Ok(())
806    }
807
808    async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
809        let payload = serde_json::to_string_pretty(marker)?;
810        fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
811        Ok(())
812    }
813}
814
815async fn commit_temp_file(temp_path: &Path, path: &Path) -> std::io::Result<()> {
816    match tokio::fs::rename(temp_path, path).await {
817        Ok(()) => Ok(()),
818        Err(err) => {
819            #[cfg(windows)]
820            {
821                // Windows `rename` can return PermissionDenied when replacing an existing file.
822                // Fall back to delete-then-rename for this case.
823                use std::io::ErrorKind;
824                if matches!(
825                    err.kind(),
826                    ErrorKind::PermissionDenied | ErrorKind::AlreadyExists
827                ) {
828                    match tokio::fs::remove_file(path).await {
829                        Ok(()) => {}
830                        Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {}
831                        Err(remove_err) => return Err(remove_err),
832                    }
833                    return tokio::fs::rename(temp_path, path).await;
834                }
835            }
836            Err(err)
837        }
838    }
839}
840
841fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
842    items
843        .into_iter()
844        .filter_map(|item| {
845            let obj = item.as_object()?;
846            let content = obj
847                .get("content")
848                .and_then(|v| v.as_str())
849                .or_else(|| obj.get("text").and_then(|v| v.as_str()))
850                .unwrap_or("")
851                .trim()
852                .to_string();
853            if content.is_empty() {
854                return None;
855            }
856            let id = obj
857                .get("id")
858                .and_then(|v| v.as_str())
859                .filter(|s| !s.trim().is_empty())
860                .map(ToString::to_string)
861                .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
862            let status = obj
863                .get("status")
864                .and_then(|v| v.as_str())
865                .filter(|s| !s.trim().is_empty())
866                .map(ToString::to_string)
867                .unwrap_or_else(|| "pending".to_string());
868            Some(json!({
869                "id": id,
870                "content": content,
871                "status": status
872            }))
873        })
874        .collect()
875}
876
877#[derive(Debug)]
878struct LegacyScanResult {
879    sessions: HashMap<String, Session>,
880    legacy_counts: LegacyTreeCounts,
881    imported_counts: LegacyImportedCounts,
882}
883
884#[derive(Debug, Default)]
885struct LegacyMergeStats {
886    changed: bool,
887    sessions_merged: u64,
888    messages_recovered: u64,
889    parts_recovered: u64,
890}
891
892fn now_ms_u64() -> u64 {
893    Utc::now().timestamp_millis().max(0) as u64
894}
895
896async fn should_run_legacy_scan_on_startup(marker_path: &Path, sessions_exist: bool) -> bool {
897    if !sessions_exist {
898        return true;
899    }
900    // Fast-path startup: if canonical sessions already exist, do not block startup
901    // on deep legacy tree scans. Users can trigger an explicit repair scan later.
902    if read_legacy_import_marker(marker_path).await.is_none() {
903        return false;
904    }
905    false
906}
907
908async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
909    let raw = fs::read_to_string(marker_path).await.ok()?;
910    serde_json::from_str::<LegacyImportMarker>(&raw).ok()
911}
912
913fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
914    let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
915    let imported_counts = LegacyImportedCounts {
916        sessions: sessions.len() as u64,
917        messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
918        parts: sessions
919            .values()
920            .flat_map(|s| s.messages.iter())
921            .map(|m| m.parts.len() as u64)
922            .sum(),
923    };
924    let legacy_counts = LegacyTreeCounts {
925        session_files: count_legacy_json_files(&base.join("session")),
926        message_files: count_legacy_json_files(&base.join("message")),
927        part_files: count_legacy_json_files(&base.join("part")),
928    };
929    Ok(LegacyScanResult {
930        sessions,
931        legacy_counts,
932        imported_counts,
933    })
934}
935
936fn count_legacy_json_files(root: &Path) -> u64 {
937    if !root.is_dir() {
938        return 0;
939    }
940    let mut count = 0u64;
941    let mut stack = vec![root.to_path_buf()];
942    while let Some(dir) = stack.pop() {
943        if let Ok(entries) = std::fs::read_dir(&dir) {
944            for entry in entries.flatten() {
945                let path = entry.path();
946                if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
947                    stack.push(path);
948                    continue;
949                }
950                if path.extension().and_then(|ext| ext.to_str()) == Some("json") {
951                    count += 1;
952                }
953            }
954        }
955    }
956    count
957}
958
959fn merge_legacy_sessions(
960    current: &mut HashMap<String, Session>,
961    imported: HashMap<String, Session>,
962) -> bool {
963    merge_legacy_sessions_with_stats(current, imported).changed
964}
965
966fn merge_legacy_sessions_with_stats(
967    current: &mut HashMap<String, Session>,
968    imported: HashMap<String, Session>,
969) -> LegacyMergeStats {
970    let mut stats = LegacyMergeStats::default();
971    for (id, legacy) in imported {
972        let legacy_message_count = legacy.messages.len() as u64;
973        let legacy_part_count = legacy
974            .messages
975            .iter()
976            .map(|m| m.parts.len() as u64)
977            .sum::<u64>();
978        match current.get_mut(&id) {
979            None => {
980                current.insert(id, legacy);
981                stats.changed = true;
982                stats.sessions_merged += 1;
983                stats.messages_recovered += legacy_message_count;
984                stats.parts_recovered += legacy_part_count;
985            }
986            Some(existing) => {
987                let should_merge_messages =
988                    existing.messages.is_empty() && !legacy.messages.is_empty();
989                let should_fill_title =
990                    existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
991                let should_fill_directory = (existing.directory.trim().is_empty()
992                    || existing.directory.trim() == "."
993                    || existing.directory.trim() == "./"
994                    || existing.directory.trim() == ".\\")
995                    && !legacy.directory.trim().is_empty();
996                let should_fill_workspace =
997                    existing.workspace_root.is_none() && legacy.workspace_root.is_some();
998                if should_merge_messages {
999                    existing.messages = legacy.messages.clone();
1000                }
1001                if should_fill_title {
1002                    existing.title = legacy.title.clone();
1003                }
1004                if should_fill_directory {
1005                    existing.directory = legacy.directory.clone();
1006                }
1007                if should_fill_workspace {
1008                    existing.workspace_root = legacy.workspace_root.clone();
1009                }
1010                if should_merge_messages
1011                    || should_fill_title
1012                    || should_fill_directory
1013                    || should_fill_workspace
1014                {
1015                    stats.changed = true;
1016                    if should_merge_messages {
1017                        stats.sessions_merged += 1;
1018                        stats.messages_recovered += legacy_message_count;
1019                        stats.parts_recovered += legacy_part_count;
1020                    }
1021                }
1022            }
1023        }
1024    }
1025    stats
1026}
1027
1028fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
1029    let mut changed = false;
1030    for session in sessions.values_mut() {
1031        if session.workspace_root.is_none() {
1032            let normalized = normalize_workspace_path(&session.directory);
1033            if normalized.is_some() {
1034                session.workspace_root = normalized;
1035                changed = true;
1036            }
1037        }
1038    }
1039    changed
1040}
1041
1042fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
1043    let mut changed = false;
1044    for session in sessions.values_mut() {
1045        if !title_needs_repair(&session.title) {
1046            continue;
1047        }
1048        let first_user_text = session.messages.iter().find_map(|message| {
1049            if !matches!(message.role, MessageRole::User) {
1050                return None;
1051            }
1052            message.parts.iter().find_map(|part| match part {
1053                MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
1054                _ => None,
1055            })
1056        });
1057        let Some(source) = first_user_text else {
1058            continue;
1059        };
1060        let Some(derived) = derive_session_title_from_prompt(source, 60) else {
1061            continue;
1062        };
1063        if derived == session.title {
1064            continue;
1065        }
1066        session.title = derived;
1067        session.time.updated = Utc::now();
1068        changed = true;
1069    }
1070    changed
1071}
1072
1073#[derive(Debug, Deserialize)]
1074struct LegacySessionTime {
1075    created: i64,
1076    updated: i64,
1077}
1078
1079#[derive(Debug, Deserialize)]
1080struct LegacySession {
1081    id: String,
1082    slug: Option<String>,
1083    version: Option<String>,
1084    #[serde(rename = "projectID")]
1085    project_id: Option<String>,
1086    title: Option<String>,
1087    directory: Option<String>,
1088    time: LegacySessionTime,
1089}
1090
1091fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
1092    let legacy_root = base.join("session");
1093    if !legacy_root.is_dir() {
1094        return Ok(HashMap::new());
1095    }
1096
1097    let mut out = HashMap::new();
1098    let mut stack = vec![legacy_root];
1099    while let Some(dir) = stack.pop() {
1100        for entry in std::fs::read_dir(&dir)? {
1101            let entry = entry?;
1102            let path = entry.path();
1103            if entry.file_type()?.is_dir() {
1104                stack.push(path);
1105                continue;
1106            }
1107            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1108                continue;
1109            }
1110            let raw = match std::fs::read_to_string(&path) {
1111                Ok(v) => v,
1112                Err(_) => continue,
1113            };
1114            let legacy = match serde_json::from_str::<LegacySession>(&raw) {
1115                Ok(v) => v,
1116                Err(_) => continue,
1117            };
1118            let created = Utc
1119                .timestamp_millis_opt(legacy.time.created)
1120                .single()
1121                .unwrap_or_else(Utc::now);
1122            let updated = Utc
1123                .timestamp_millis_opt(legacy.time.updated)
1124                .single()
1125                .unwrap_or(created);
1126
1127            let session_id = legacy.id.clone();
1128            out.insert(
1129                session_id.clone(),
1130                Session {
1131                    id: session_id.clone(),
1132                    slug: legacy.slug,
1133                    version: legacy.version,
1134                    project_id: legacy.project_id,
1135                    title: legacy
1136                        .title
1137                        .filter(|s| !s.trim().is_empty())
1138                        .unwrap_or_else(|| "New session".to_string()),
1139                    directory: legacy
1140                        .directory
1141                        .clone()
1142                        .filter(|s| !s.trim().is_empty())
1143                        .unwrap_or_else(|| ".".to_string()),
1144                    workspace_root: legacy
1145                        .directory
1146                        .as_deref()
1147                        .and_then(normalize_workspace_path),
1148                    origin_workspace_root: None,
1149                    attached_from_workspace: None,
1150                    attached_to_workspace: None,
1151                    attach_timestamp_ms: None,
1152                    attach_reason: None,
1153                    tenant_context: tandem_types::LocalImplicitTenant.into(),
1154                    time: tandem_types::SessionTime { created, updated },
1155                    model: None,
1156                    provider: None,
1157                    environment: None,
1158                    messages: load_legacy_session_messages(base, &session_id),
1159                },
1160            );
1161        }
1162    }
1163    Ok(out)
1164}
1165
1166#[derive(Debug, Deserialize)]
1167struct LegacyMessageTime {
1168    created: i64,
1169}
1170
1171#[derive(Debug, Deserialize)]
1172struct LegacyMessage {
1173    id: String,
1174    role: String,
1175    time: LegacyMessageTime,
1176}
1177
1178#[derive(Debug, Deserialize)]
1179struct LegacyPart {
1180    #[serde(rename = "type")]
1181    part_type: Option<String>,
1182    text: Option<String>,
1183    tool: Option<String>,
1184    args: Option<Value>,
1185    result: Option<Value>,
1186    error: Option<String>,
1187}
1188
1189fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
1190    let msg_dir = base.join("message").join(session_id);
1191    if !msg_dir.is_dir() {
1192        return Vec::new();
1193    }
1194
1195    let mut legacy_messages: Vec<(i64, Message)> = Vec::new();
1196
1197    let Ok(entries) = std::fs::read_dir(&msg_dir) else {
1198        return Vec::new();
1199    };
1200
1201    for entry in entries.flatten() {
1202        let path = entry.path();
1203        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1204            continue;
1205        }
1206        let Ok(raw) = std::fs::read_to_string(&path) else {
1207            continue;
1208        };
1209        let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
1210            continue;
1211        };
1212
1213        let created_at = Utc
1214            .timestamp_millis_opt(legacy.time.created)
1215            .single()
1216            .unwrap_or_else(Utc::now);
1217
1218        legacy_messages.push((
1219            legacy.time.created,
1220            Message {
1221                id: legacy.id.clone(),
1222                role: legacy_role_to_message_role(&legacy.role),
1223                parts: load_legacy_message_parts(base, &legacy.id),
1224                created_at,
1225            },
1226        ));
1227    }
1228
1229    legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
1230    legacy_messages.into_iter().map(|(_, msg)| msg).collect()
1231}
1232
1233fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
1234    let parts_dir = base.join("part").join(message_id);
1235    if !parts_dir.is_dir() {
1236        return Vec::new();
1237    }
1238
1239    let Ok(entries) = std::fs::read_dir(&parts_dir) else {
1240        return Vec::new();
1241    };
1242
1243    let mut out = Vec::new();
1244    for entry in entries.flatten() {
1245        let path = entry.path();
1246        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1247            continue;
1248        }
1249        let Ok(raw) = std::fs::read_to_string(&path) else {
1250            continue;
1251        };
1252        let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
1253            continue;
1254        };
1255
1256        let mapped = if let Some(tool) = part.tool {
1257            Some(MessagePart::ToolInvocation {
1258                tool,
1259                args: part.args.unwrap_or_else(|| json!({})),
1260                result: part.result,
1261                error: part.error,
1262            })
1263        } else {
1264            match part.part_type.as_deref() {
1265                Some("reasoning") => Some(MessagePart::Reasoning {
1266                    text: part.text.unwrap_or_default(),
1267                }),
1268                Some("tool") => Some(MessagePart::ToolInvocation {
1269                    tool: "tool".to_string(),
1270                    args: part.args.unwrap_or_else(|| json!({})),
1271                    result: part.result,
1272                    error: part.error,
1273                }),
1274                Some("text") | None => Some(MessagePart::Text {
1275                    text: part.text.unwrap_or_default(),
1276                }),
1277                _ => None,
1278            }
1279        };
1280
1281        if let Some(part) = mapped {
1282            out.push(part);
1283        }
1284    }
1285    out
1286}
1287
1288fn legacy_role_to_message_role(role: &str) -> MessageRole {
1289    match role.to_lowercase().as_str() {
1290        "user" => MessageRole::User,
1291        "assistant" => MessageRole::Assistant,
1292        "system" => MessageRole::System,
1293        "tool" => MessageRole::Tool,
1294        _ => MessageRole::Assistant,
1295    }
1296}
1297
1298#[derive(Debug, Clone, Default)]
1299struct MessageMergeStats {
1300    messages_recovered: u64,
1301    parts_recovered: u64,
1302    conflicts_merged: u64,
1303}
1304
1305fn message_richness(msg: &Message) -> usize {
1306    msg.parts
1307        .iter()
1308        .map(|p| match p {
1309            MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1310                if text.trim().is_empty() {
1311                    0
1312                } else {
1313                    1
1314                }
1315            }
1316            MessagePart::ToolInvocation { result, error, .. } => {
1317                if result.is_some() || error.is_some() {
1318                    2
1319                } else {
1320                    1
1321                }
1322            }
1323        })
1324        .sum()
1325}
1326
1327fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1328    messages.iter().map(|m| m.created_at).max()
1329}
1330
1331fn merge_session_messages(
1332    existing: &[Message],
1333    imported: &[Message],
1334) -> (Vec<Message>, MessageMergeStats, bool) {
1335    if existing.is_empty() {
1336        let messages_recovered = imported.len() as u64;
1337        let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1338        return (
1339            imported.to_vec(),
1340            MessageMergeStats {
1341                messages_recovered,
1342                parts_recovered,
1343                conflicts_merged: 0,
1344            },
1345            true,
1346        );
1347    }
1348
1349    let mut merged_by_id: HashMap<String, Message> = existing
1350        .iter()
1351        .cloned()
1352        .map(|m| (m.id.clone(), m))
1353        .collect();
1354    let mut stats = MessageMergeStats::default();
1355    let mut changed = false;
1356
1357    for incoming in imported {
1358        match merged_by_id.get(&incoming.id) {
1359            None => {
1360                merged_by_id.insert(incoming.id.clone(), incoming.clone());
1361                stats.messages_recovered += 1;
1362                stats.parts_recovered += incoming.parts.len() as u64;
1363                changed = true;
1364            }
1365            Some(current) => {
1366                let incoming_richer = message_richness(incoming) > message_richness(current)
1367                    || incoming.parts.len() > current.parts.len();
1368                if incoming_richer {
1369                    merged_by_id.insert(incoming.id.clone(), incoming.clone());
1370                    stats.conflicts_merged += 1;
1371                    changed = true;
1372                }
1373            }
1374        }
1375    }
1376
1377    let mut out: Vec<Message> = merged_by_id.into_values().collect();
1378    out.sort_by_key(|m| m.created_at);
1379    (out, stats, changed)
1380}
1381
1382#[cfg(test)]
1383mod tests {
1384    use super::*;
1385    use std::fs as stdfs;
1386    use std::sync::Arc;
1387
1388    #[tokio::test]
1389    async fn todos_are_normalized_to_wire_shape() {
1390        let base = std::env::temp_dir().join(format!("tandem-core-test-{}", Uuid::new_v4()));
1391        let storage = Storage::new(&base).await.expect("storage");
1392        let session = Session::new(Some("test".to_string()), Some(".".to_string()));
1393        let id = session.id.clone();
1394        storage.save_session(session).await.expect("save session");
1395
1396        storage
1397            .set_todos(
1398                &id,
1399                vec![
1400                    json!({"content":"first"}),
1401                    json!({"text":"second", "status":"in_progress"}),
1402                    json!({"id":"keep-id","content":"third","status":"completed"}),
1403                ],
1404            )
1405            .await
1406            .expect("set todos");
1407
1408        let todos = storage.get_todos(&id).await;
1409        assert_eq!(todos.len(), 3);
1410        for todo in todos {
1411            assert!(todo.get("id").and_then(|v| v.as_str()).is_some());
1412            assert!(todo.get("content").and_then(|v| v.as_str()).is_some());
1413            assert!(todo.get("status").and_then(|v| v.as_str()).is_some());
1414        }
1415    }
1416
1417    #[tokio::test]
1418    async fn imports_legacy_opencode_session_index_when_sessions_json_missing() {
1419        let base =
1420            std::env::temp_dir().join(format!("tandem-core-legacy-import-{}", Uuid::new_v4()));
1421        let legacy_session_dir = base.join("session").join("global");
1422        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1423        stdfs::write(
1424            legacy_session_dir.join("ses_test.json"),
1425            r#"{
1426  "id": "ses_test",
1427  "slug": "test",
1428  "version": "1.0.0",
1429  "projectID": "proj_1",
1430  "directory": "C:\\work\\demo",
1431  "title": "Legacy Session",
1432  "time": { "created": 1770913145613, "updated": 1770913146613 }
1433}"#,
1434        )
1435        .expect("legacy session write");
1436
1437        let storage = Storage::new(&base).await.expect("storage");
1438        let sessions = storage.list_sessions().await;
1439        assert_eq!(sessions.len(), 1);
1440        assert_eq!(sessions[0].id, "ses_test");
1441        assert_eq!(sessions[0].title, "Legacy Session");
1442        assert!(base.join("sessions.json").exists());
1443    }
1444
1445    #[tokio::test]
1446    async fn imports_legacy_messages_and_parts_for_session() {
1447        let base = std::env::temp_dir().join(format!("tandem-core-legacy-msg-{}", Uuid::new_v4()));
1448        let session_dir = base.join("session").join("global");
1449        let message_dir = base.join("message").join("ses_test");
1450        let part_dir = base.join("part").join("msg_1");
1451        stdfs::create_dir_all(&session_dir).expect("session dir");
1452        stdfs::create_dir_all(&message_dir).expect("message dir");
1453        stdfs::create_dir_all(&part_dir).expect("part dir");
1454
1455        stdfs::write(
1456            session_dir.join("ses_test.json"),
1457            r#"{
1458  "id": "ses_test",
1459  "projectID": "proj_1",
1460  "directory": "C:\\work\\demo",
1461  "title": "Legacy Session",
1462  "time": { "created": 1770913145613, "updated": 1770913146613 }
1463}"#,
1464        )
1465        .expect("write session");
1466
1467        stdfs::write(
1468            message_dir.join("msg_1.json"),
1469            r#"{
1470  "id": "msg_1",
1471  "sessionID": "ses_test",
1472  "role": "assistant",
1473  "time": { "created": 1770913145613 }
1474}"#,
1475        )
1476        .expect("write msg");
1477
1478        stdfs::write(
1479            part_dir.join("prt_1.json"),
1480            r#"{
1481  "id": "prt_1",
1482  "sessionID": "ses_test",
1483  "messageID": "msg_1",
1484  "type": "text",
1485  "text": "hello from legacy"
1486}"#,
1487        )
1488        .expect("write part");
1489
1490        let storage = Storage::new(&base).await.expect("storage");
1491        let sessions = storage.list_sessions().await;
1492        assert_eq!(sessions.len(), 1);
1493        assert_eq!(sessions[0].messages.len(), 1);
1494        assert_eq!(sessions[0].messages[0].parts.len(), 1);
1495    }
1496
1497    #[tokio::test]
1498    async fn skips_legacy_merge_when_sessions_json_exists() {
1499        let base =
1500            std::env::temp_dir().join(format!("tandem-core-legacy-merge-{}", Uuid::new_v4()));
1501        stdfs::create_dir_all(&base).expect("base");
1502        stdfs::write(
1503            base.join("sessions.json"),
1504            r#"{
1505  "ses_current": {
1506    "id": "ses_current",
1507    "slug": null,
1508    "version": "v1",
1509    "project_id": null,
1510    "title": "Current Session",
1511    "directory": ".",
1512    "workspace_root": null,
1513    "origin_workspace_root": null,
1514    "attached_from_workspace": null,
1515    "attached_to_workspace": null,
1516    "attach_timestamp_ms": null,
1517    "attach_reason": null,
1518    "time": {"created":"2026-01-01T00:00:00Z","updated":"2026-01-01T00:00:00Z"},
1519    "model": null,
1520    "provider": null,
1521    "messages": []
1522  }
1523}"#,
1524        )
1525        .expect("sessions.json");
1526
1527        let legacy_session_dir = base.join("session").join("global");
1528        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1529        stdfs::write(
1530            legacy_session_dir.join("ses_legacy.json"),
1531            r#"{
1532  "id": "ses_legacy",
1533  "slug": "legacy",
1534  "version": "1.0.0",
1535  "projectID": "proj_legacy",
1536  "directory": "C:\\work\\legacy",
1537  "title": "Legacy Session",
1538  "time": { "created": 1770913145613, "updated": 1770913146613 }
1539}"#,
1540        )
1541        .expect("legacy session write");
1542
1543        let storage = Storage::new(&base).await.expect("storage");
1544        let sessions = storage.list_sessions().await;
1545        let ids = sessions.iter().map(|s| s.id.clone()).collect::<Vec<_>>();
1546        assert!(ids.contains(&"ses_current".to_string()));
1547        assert!(!ids.contains(&"ses_legacy".to_string()));
1548    }
1549
1550    #[tokio::test]
1551    async fn list_sessions_scoped_filters_by_workspace_root() {
1552        let base = std::env::temp_dir().join(format!("tandem-core-scope-{}", Uuid::new_v4()));
1553        let storage = Storage::new(&base).await.expect("storage");
1554        let ws_a = base.join("ws-a");
1555        let ws_b = base.join("ws-b");
1556        stdfs::create_dir_all(&ws_a).expect("ws_a");
1557        stdfs::create_dir_all(&ws_b).expect("ws_b");
1558        let ws_a_str = ws_a.to_string_lossy().to_string();
1559        let ws_b_str = ws_b.to_string_lossy().to_string();
1560
1561        let mut a = Session::new(Some("a".to_string()), Some(ws_a_str.clone()));
1562        a.workspace_root = Some(ws_a_str.clone());
1563        storage.save_session(a).await.expect("save a");
1564
1565        let mut b = Session::new(Some("b".to_string()), Some(ws_b_str.clone()));
1566        b.workspace_root = Some(ws_b_str);
1567        storage.save_session(b).await.expect("save b");
1568
1569        let scoped = storage
1570            .list_sessions_scoped(SessionListScope::Workspace {
1571                workspace_root: ws_a_str,
1572            })
1573            .await;
1574        assert_eq!(scoped.len(), 1);
1575        assert_eq!(scoped[0].title, "a");
1576    }
1577
1578    #[tokio::test]
1579    async fn attach_session_persists_audit_metadata() {
1580        let base = std::env::temp_dir().join(format!("tandem-core-attach-{}", Uuid::new_v4()));
1581        let storage = Storage::new(&base).await.expect("storage");
1582        let ws_a = base.join("ws-a");
1583        let ws_b = base.join("ws-b");
1584        stdfs::create_dir_all(&ws_a).expect("ws_a");
1585        stdfs::create_dir_all(&ws_b).expect("ws_b");
1586        let ws_a_str = ws_a.to_string_lossy().to_string();
1587        let ws_b_str = ws_b.to_string_lossy().to_string();
1588        let mut session = Session::new(Some("s".to_string()), Some(ws_a_str.clone()));
1589        session.workspace_root = Some(ws_a_str);
1590        let id = session.id.clone();
1591        storage.save_session(session).await.expect("save");
1592
1593        let updated = storage
1594            .attach_session_to_workspace(&id, &ws_b_str, "manual")
1595            .await
1596            .expect("attach")
1597            .expect("session exists");
1598        let normalized_expected = normalize_workspace_path(&ws_b_str).expect("normalized path");
1599        assert_eq!(
1600            updated.workspace_root.as_deref(),
1601            Some(normalized_expected.as_str())
1602        );
1603        assert_eq!(
1604            updated.attached_to_workspace.as_deref(),
1605            Some(normalized_expected.as_str())
1606        );
1607        assert_eq!(updated.attach_reason.as_deref(), Some("manual"));
1608        assert!(updated.attach_timestamp_ms.is_some());
1609    }
1610
1611    #[tokio::test]
1612    async fn append_message_part_persists_tool_invocation_and_result() {
1613        let base = std::env::temp_dir().join(format!("tandem-core-tool-parts-{}", Uuid::new_v4()));
1614        let storage = Storage::new(&base).await.expect("storage");
1615        let session = Session::new(Some("tool parts".to_string()), Some(".".to_string()));
1616        let session_id = session.id.clone();
1617        storage.save_session(session).await.expect("save session");
1618
1619        let user = Message::new(
1620            MessageRole::User,
1621            vec![MessagePart::Text {
1622                text: "build ui".to_string(),
1623            }],
1624        );
1625        let message_id = user.id.clone();
1626        storage
1627            .append_message(&session_id, user)
1628            .await
1629            .expect("append user");
1630
1631        storage
1632            .append_message_part(
1633                &session_id,
1634                &message_id,
1635                MessagePart::ToolInvocation {
1636                    tool: "write".to_string(),
1637                    args: json!({"path":"game.html","content":"<html></html>"}),
1638                    result: None,
1639                    error: None,
1640                },
1641            )
1642            .await
1643            .expect("append invocation");
1644        storage
1645            .append_message_part(
1646                &session_id,
1647                &message_id,
1648                MessagePart::ToolInvocation {
1649                    tool: "write".to_string(),
1650                    args: json!({}),
1651                    result: Some(json!("ok")),
1652                    error: None,
1653                },
1654            )
1655            .await
1656            .expect("append result");
1657
1658        let session = storage.get_session(&session_id).await.expect("session");
1659        let message = session
1660            .messages
1661            .iter()
1662            .find(|message| message.id == message_id)
1663            .expect("message");
1664        assert_eq!(message.parts.len(), 2);
1665        match &message.parts[1] {
1666            MessagePart::ToolInvocation {
1667                tool,
1668                result,
1669                error,
1670                ..
1671            } => {
1672                assert_eq!(tool, "write");
1673                assert_eq!(result.as_ref(), Some(&json!("ok")));
1674                assert_eq!(error.as_deref(), None);
1675            }
1676            other => panic!("expected tool part, got {other:?}"),
1677        }
1678    }
1679
1680    #[tokio::test]
1681    async fn append_message_part_retains_failed_tool_error() {
1682        let base = std::env::temp_dir().join(format!("tandem-core-tool-error-{}", Uuid::new_v4()));
1683        let storage = Storage::new(&base).await.expect("storage");
1684        let session = Session::new(Some("tool errors".to_string()), Some(".".to_string()));
1685        let session_id = session.id.clone();
1686        storage.save_session(session).await.expect("save session");
1687
1688        let user = Message::new(
1689            MessageRole::User,
1690            vec![MessagePart::Text {
1691                text: "write file".to_string(),
1692            }],
1693        );
1694        let message_id = user.id.clone();
1695        storage
1696            .append_message(&session_id, user)
1697            .await
1698            .expect("append user");
1699
1700        storage
1701            .append_message_part(
1702                &session_id,
1703                &message_id,
1704                MessagePart::ToolInvocation {
1705                    tool: "write".to_string(),
1706                    args: json!({"path":"game.html"}),
1707                    result: None,
1708                    error: None,
1709                },
1710            )
1711            .await
1712            .expect("append invocation");
1713        storage
1714            .append_message_part(
1715                &session_id,
1716                &message_id,
1717                MessagePart::ToolInvocation {
1718                    tool: "write".to_string(),
1719                    args: json!({}),
1720                    result: None,
1721                    error: Some("WRITE_CONTENT_MISSING".to_string()),
1722                },
1723            )
1724            .await
1725            .expect("append error");
1726
1727        let session = storage.get_session(&session_id).await.expect("session");
1728        let message = session
1729            .messages
1730            .iter()
1731            .find(|message| message.id == message_id)
1732            .expect("message");
1733        match &message.parts[1] {
1734            MessagePart::ToolInvocation { error, .. } => {
1735                assert_eq!(error.as_deref(), Some("WRITE_CONTENT_MISSING"));
1736            }
1737            other => panic!("expected tool part, got {other:?}"),
1738        }
1739    }
1740
1741    #[tokio::test]
1742    async fn append_message_part_coalesces_repeated_tool_invocation_updates() {
1743        let base = std::env::temp_dir().join(format!("tandem-core-tool-merge-{}", Uuid::new_v4()));
1744        let storage = Storage::new(&base).await.expect("storage");
1745        let session = Session::new(Some("tool merge".to_string()), Some(".".to_string()));
1746        let session_id = session.id.clone();
1747        storage.save_session(session).await.expect("save session");
1748
1749        let user = Message::new(
1750            MessageRole::User,
1751            vec![MessagePart::Text {
1752                text: "build ui".to_string(),
1753            }],
1754        );
1755        let message_id = user.id.clone();
1756        storage
1757            .append_message(&session_id, user)
1758            .await
1759            .expect("append user");
1760
1761        storage
1762            .append_message_part(
1763                &session_id,
1764                &message_id,
1765                MessagePart::ToolInvocation {
1766                    tool: "write".to_string(),
1767                    args: json!({"path":"game.html"}),
1768                    result: None,
1769                    error: None,
1770                },
1771            )
1772            .await
1773            .expect("append first invocation");
1774        storage
1775            .append_message_part(
1776                &session_id,
1777                &message_id,
1778                MessagePart::ToolInvocation {
1779                    tool: "write".to_string(),
1780                    args: json!({"path":"game.html","content":"<html></html>"}),
1781                    result: None,
1782                    error: None,
1783                },
1784            )
1785            .await
1786            .expect("append updated invocation");
1787
1788        let session = storage.get_session(&session_id).await.expect("session");
1789        let message = session
1790            .messages
1791            .iter()
1792            .find(|message| message.id == message_id)
1793            .expect("message");
1794        assert_eq!(message.parts.len(), 2);
1795        match &message.parts[1] {
1796            MessagePart::ToolInvocation { tool, args, .. } => {
1797                assert_eq!(tool, "write");
1798                assert_eq!(args["path"], "game.html");
1799                assert_eq!(args["content"], "<html></html>");
1800            }
1801            other => panic!("expected tool part, got {other:?}"),
1802        }
1803    }
1804
1805    #[tokio::test]
1806    async fn append_message_part_upgrades_raw_string_args_to_structured_invocation_args() {
1807        let base =
1808            std::env::temp_dir().join(format!("tandem-core-tool-raw-upgrade-{}", Uuid::new_v4()));
1809        let storage = Storage::new(&base).await.expect("storage");
1810        let session = Session::new(Some("tool raw upgrade".to_string()), Some(".".to_string()));
1811        let session_id = session.id.clone();
1812        storage.save_session(session).await.expect("save session");
1813
1814        let user = Message::new(
1815            MessageRole::User,
1816            vec![MessagePart::Text {
1817                text: "build ui".to_string(),
1818            }],
1819        );
1820        let message_id = user.id.clone();
1821        storage
1822            .append_message(&session_id, user)
1823            .await
1824            .expect("append user");
1825
1826        storage
1827            .append_message_part(
1828                &session_id,
1829                &message_id,
1830                MessagePart::ToolInvocation {
1831                    tool: "write".to_string(),
1832                    args: json!("{\"path\":\"game.html\",\"content\":\"<html>draft</html>\"}"),
1833                    result: None,
1834                    error: None,
1835                },
1836            )
1837            .await
1838            .expect("append raw invocation");
1839        storage
1840            .append_message_part(
1841                &session_id,
1842                &message_id,
1843                MessagePart::ToolInvocation {
1844                    tool: "write".to_string(),
1845                    args: json!({"path":"game.html","content":"<html>draft</html>"}),
1846                    result: None,
1847                    error: None,
1848                },
1849            )
1850            .await
1851            .expect("append structured invocation");
1852
1853        let session = storage.get_session(&session_id).await.expect("session");
1854        let message = session
1855            .messages
1856            .iter()
1857            .find(|message| message.id == message_id)
1858            .expect("message");
1859        assert_eq!(message.parts.len(), 2);
1860        match &message.parts[1] {
1861            MessagePart::ToolInvocation { tool, args, .. } => {
1862                assert_eq!(tool, "write");
1863                assert_eq!(args["path"], "game.html");
1864                assert_eq!(args["content"], "<html>draft</html>");
1865            }
1866            other => panic!("expected tool part, got {other:?}"),
1867        }
1868    }
1869
1870    #[tokio::test]
1871    async fn append_message_part_upgrades_raw_string_args_when_result_arrives_with_structure() {
1872        let base = std::env::temp_dir().join(format!(
1873            "tandem-core-tool-raw-result-upgrade-{}",
1874            Uuid::new_v4()
1875        ));
1876        let storage = Storage::new(&base).await.expect("storage");
1877        let session = Session::new(
1878            Some("tool raw result upgrade".to_string()),
1879            Some(".".to_string()),
1880        );
1881        let session_id = session.id.clone();
1882        storage.save_session(session).await.expect("save session");
1883
1884        let user = Message::new(
1885            MessageRole::User,
1886            vec![MessagePart::Text {
1887                text: "build ui".to_string(),
1888            }],
1889        );
1890        let message_id = user.id.clone();
1891        storage
1892            .append_message(&session_id, user)
1893            .await
1894            .expect("append user");
1895
1896        storage
1897            .append_message_part(
1898                &session_id,
1899                &message_id,
1900                MessagePart::ToolInvocation {
1901                    tool: "write".to_string(),
1902                    args: json!("{\"path\":\"game.html\",\"content\":\"<html>draft</html>\"}"),
1903                    result: None,
1904                    error: None,
1905                },
1906            )
1907            .await
1908            .expect("append raw invocation");
1909        storage
1910            .append_message_part(
1911                &session_id,
1912                &message_id,
1913                MessagePart::ToolInvocation {
1914                    tool: "write".to_string(),
1915                    args: json!({"path":"game.html","content":"<html>draft</html>"}),
1916                    result: Some(json!("ok")),
1917                    error: None,
1918                },
1919            )
1920            .await
1921            .expect("append structured result");
1922
1923        let session = storage.get_session(&session_id).await.expect("session");
1924        let message = session
1925            .messages
1926            .iter()
1927            .find(|message| message.id == message_id)
1928            .expect("message");
1929        assert_eq!(message.parts.len(), 2);
1930        match &message.parts[1] {
1931            MessagePart::ToolInvocation {
1932                tool,
1933                args,
1934                result,
1935                error,
1936            } => {
1937                assert_eq!(tool, "write");
1938                assert_eq!(args["path"], "game.html");
1939                assert_eq!(args["content"], "<html>draft</html>");
1940                assert_eq!(result.as_ref(), Some(&json!("ok")));
1941                assert_eq!(error.as_deref(), None);
1942            }
1943            other => panic!("expected tool part, got {other:?}"),
1944        }
1945    }
1946
1947    #[tokio::test]
1948    async fn append_message_part_upgrades_partial_structured_args_when_result_adds_fields() {
1949        let base = std::env::temp_dir().join(format!(
1950            "tandem-core-tool-structured-result-upgrade-{}",
1951            Uuid::new_v4()
1952        ));
1953        let storage = Storage::new(&base).await.expect("storage");
1954        let session = Session::new(
1955            Some("tool structured result upgrade".to_string()),
1956            Some(".".to_string()),
1957        );
1958        let session_id = session.id.clone();
1959        storage.save_session(session).await.expect("save session");
1960
1961        let user = Message::new(
1962            MessageRole::User,
1963            vec![MessagePart::Text {
1964                text: "build ui".to_string(),
1965            }],
1966        );
1967        let message_id = user.id.clone();
1968        storage
1969            .append_message(&session_id, user)
1970            .await
1971            .expect("append user");
1972
1973        storage
1974            .append_message_part(
1975                &session_id,
1976                &message_id,
1977                MessagePart::ToolInvocation {
1978                    tool: "write".to_string(),
1979                    args: json!({"path":"game.html"}),
1980                    result: None,
1981                    error: None,
1982                },
1983            )
1984            .await
1985            .expect("append partial invocation");
1986        storage
1987            .append_message_part(
1988                &session_id,
1989                &message_id,
1990                MessagePart::ToolInvocation {
1991                    tool: "write".to_string(),
1992                    args: json!({"path":"game.html","content":"<html>draft</html>"}),
1993                    result: Some(json!("ok")),
1994                    error: None,
1995                },
1996            )
1997            .await
1998            .expect("append richer result");
1999
2000        let session = storage.get_session(&session_id).await.expect("session");
2001        let message = session
2002            .messages
2003            .iter()
2004            .find(|message| message.id == message_id)
2005            .expect("message");
2006        assert_eq!(message.parts.len(), 2);
2007        match &message.parts[1] {
2008            MessagePart::ToolInvocation {
2009                tool,
2010                args,
2011                result,
2012                error,
2013            } => {
2014                assert_eq!(tool, "write");
2015                assert_eq!(args["path"], "game.html");
2016                assert_eq!(args["content"], "<html>draft</html>");
2017                assert_eq!(result.as_ref(), Some(&json!("ok")));
2018                assert_eq!(error.as_deref(), None);
2019            }
2020            other => panic!("expected tool part, got {other:?}"),
2021        }
2022    }
2023
2024    #[tokio::test]
2025    async fn append_message_part_replaces_malformed_object_args_with_structured_result_args() {
2026        let base = std::env::temp_dir().join(format!(
2027            "tandem-core-tool-malformed-args-replace-{}",
2028            Uuid::new_v4()
2029        ));
2030        let storage = Storage::new(&base).await.expect("storage");
2031        let session = Session::new(
2032            Some("tool malformed args replacement".to_string()),
2033            Some(".".to_string()),
2034        );
2035        let session_id = session.id.clone();
2036        storage.save_session(session).await.expect("save session");
2037
2038        let user = Message::new(
2039            MessageRole::User,
2040            vec![MessagePart::Text {
2041                text: "build ui".to_string(),
2042            }],
2043        );
2044        let message_id = user.id.clone();
2045        storage
2046            .append_message(&session_id, user)
2047            .await
2048            .expect("append user");
2049
2050        storage
2051            .append_message_part(
2052                &session_id,
2053                &message_id,
2054                MessagePart::ToolInvocation {
2055                    tool: "write".to_string(),
2056                    args: json!({"{\"allow_empty": null}),
2057                    result: None,
2058                    error: None,
2059                },
2060            )
2061            .await
2062            .expect("append malformed invocation");
2063        storage
2064            .append_message_part(
2065                &session_id,
2066                &message_id,
2067                MessagePart::ToolInvocation {
2068                    tool: "write".to_string(),
2069                    args: json!({"path":"game.html","content":"<html>draft</html>"}),
2070                    result: Some(json!("ok")),
2071                    error: None,
2072                },
2073            )
2074            .await
2075            .expect("append structured result");
2076
2077        let session = storage.get_session(&session_id).await.expect("session");
2078        let message = session
2079            .messages
2080            .iter()
2081            .find(|message| message.id == message_id)
2082            .expect("message");
2083        assert_eq!(message.parts.len(), 2);
2084        match &message.parts[1] {
2085            MessagePart::ToolInvocation {
2086                tool,
2087                args,
2088                result,
2089                error,
2090            } => {
2091                assert_eq!(tool, "write");
2092                assert_eq!(args["path"], "game.html");
2093                assert_eq!(args["content"], "<html>draft</html>");
2094                assert_eq!(result.as_ref(), Some(&json!("ok")));
2095                assert_eq!(error.as_deref(), None);
2096            }
2097            other => panic!("expected tool part, got {other:?}"),
2098        }
2099    }
2100
2101    #[tokio::test]
2102    async fn append_message_part_replaces_partial_write_args_when_result_adds_path_and_content() {
2103        let base = std::env::temp_dir().join(format!(
2104            "tandem-core-tool-partial-write-args-replace-{}",
2105            Uuid::new_v4()
2106        ));
2107        let storage = Storage::new(&base).await.expect("storage");
2108        let session = Session::new(
2109            Some("tool partial write args replacement".to_string()),
2110            Some(".".to_string()),
2111        );
2112        let session_id = session.id.clone();
2113        storage.save_session(session).await.expect("save session");
2114
2115        let user = Message::new(
2116            MessageRole::User,
2117            vec![MessagePart::Text {
2118                text: "build ui".to_string(),
2119            }],
2120        );
2121        let message_id = user.id.clone();
2122        storage
2123            .append_message(&session_id, user)
2124            .await
2125            .expect("append user");
2126
2127        storage
2128            .append_message_part(
2129                &session_id,
2130                &message_id,
2131                MessagePart::ToolInvocation {
2132                    tool: "write".to_string(),
2133                    args: json!({"content": ""}),
2134                    result: None,
2135                    error: None,
2136                },
2137            )
2138            .await
2139            .expect("append partial invocation");
2140        storage
2141            .append_message_part(
2142                &session_id,
2143                &message_id,
2144                MessagePart::ToolInvocation {
2145                    tool: "write".to_string(),
2146                    args: json!({"path":"notes/report.md","content":"# Report\n"}),
2147                    result: Some(json!("ok")),
2148                    error: None,
2149                },
2150            )
2151            .await
2152            .expect("append structured result");
2153
2154        let session = storage.get_session(&session_id).await.expect("session");
2155        let message = session
2156            .messages
2157            .iter()
2158            .find(|message| message.id == message_id)
2159            .expect("message");
2160        assert_eq!(message.parts.len(), 2);
2161        match &message.parts[1] {
2162            MessagePart::ToolInvocation {
2163                tool,
2164                args,
2165                result,
2166                error,
2167            } => {
2168                assert_eq!(tool, "write");
2169                assert_eq!(args["path"], "notes/report.md");
2170                assert_eq!(args["content"], "# Report\n");
2171                assert_eq!(result.as_ref(), Some(&json!("ok")));
2172                assert_eq!(error.as_deref(), None);
2173            }
2174            other => panic!("expected tool part, got {other:?}"),
2175        }
2176    }
2177
2178    #[tokio::test]
2179    async fn append_message_part_prefers_executed_write_args_with_context_over_pending_raw_args() {
2180        let base = std::env::temp_dir().join(format!(
2181            "tandem-core-tool-executed-args-preferred-{}",
2182            Uuid::new_v4()
2183        ));
2184        let storage = Storage::new(&base).await.expect("storage");
2185        let session = Session::new(
2186            Some("tool executed args preferred".to_string()),
2187            Some(".".to_string()),
2188        );
2189        let session_id = session.id.clone();
2190        storage.save_session(session).await.expect("save session");
2191
2192        let user = Message::new(
2193            MessageRole::User,
2194            vec![MessagePart::Text {
2195                text: "build report".to_string(),
2196            }],
2197        );
2198        let message_id = user.id.clone();
2199        storage
2200            .append_message(&session_id, user)
2201            .await
2202            .expect("append user");
2203
2204        storage
2205            .append_message_part(
2206                &session_id,
2207                &message_id,
2208                MessagePart::ToolInvocation {
2209                    tool: "write".to_string(),
2210                    args: json!({"path":".","content":"draft"}),
2211                    result: None,
2212                    error: None,
2213                },
2214            )
2215            .await
2216            .expect("append raw pending invocation");
2217        storage
2218            .append_message_part(
2219                &session_id,
2220                &message_id,
2221                MessagePart::ToolInvocation {
2222                    tool: "write".to_string(),
2223                    args: json!({
2224                        "path":".tandem/runs/run-123/artifacts/research-sources.json",
2225                        "content":"draft",
2226                        "__workspace_root":"/home/user/marketing-tandem",
2227                        "__effective_cwd":"/home/user/marketing-tandem"
2228                    }),
2229                    result: Some(json!("ok")),
2230                    error: None,
2231                },
2232            )
2233            .await
2234            .expect("append executed result");
2235
2236        let session = storage.get_session(&session_id).await.expect("session");
2237        let message = session
2238            .messages
2239            .iter()
2240            .find(|message| message.id == message_id)
2241            .expect("message");
2242        assert_eq!(message.parts.len(), 2);
2243        match &message.parts[1] {
2244            MessagePart::ToolInvocation {
2245                tool,
2246                args,
2247                result,
2248                error,
2249            } => {
2250                assert_eq!(tool, "write");
2251                assert_eq!(
2252                    args["path"],
2253                    ".tandem/runs/run-123/artifacts/research-sources.json"
2254                );
2255                assert_eq!(args["content"], "draft");
2256                assert_eq!(args["__workspace_root"], "/home/user/marketing-tandem");
2257                assert_eq!(result.as_ref(), Some(&json!("ok")));
2258                assert_eq!(error.as_deref(), None);
2259            }
2260            other => panic!("expected tool part, got {other:?}"),
2261        }
2262    }
2263
2264    #[tokio::test]
2265    async fn append_message_part_falls_back_to_latest_user_message_when_id_missing() {
2266        let base =
2267            std::env::temp_dir().join(format!("tandem-core-tool-fallback-{}", Uuid::new_v4()));
2268        let storage = Storage::new(&base).await.expect("storage");
2269        let session = Session::new(Some("tool fallback".to_string()), Some(".".to_string()));
2270        let session_id = session.id.clone();
2271        storage.save_session(session).await.expect("save session");
2272
2273        let first = Message::new(
2274            MessageRole::User,
2275            vec![MessagePart::Text {
2276                text: "first prompt".to_string(),
2277            }],
2278        );
2279        let second = Message::new(
2280            MessageRole::User,
2281            vec![MessagePart::Text {
2282                text: "second prompt".to_string(),
2283            }],
2284        );
2285        let second_id = second.id.clone();
2286        storage
2287            .append_message(&session_id, first)
2288            .await
2289            .expect("append first");
2290        storage
2291            .append_message(&session_id, second)
2292            .await
2293            .expect("append second");
2294
2295        storage
2296            .append_message_part(
2297                &session_id,
2298                "missing-message-id",
2299                MessagePart::ToolInvocation {
2300                    tool: "glob".to_string(),
2301                    args: json!({"pattern":"*"}),
2302                    result: Some(json!(["README.md"])),
2303                    error: None,
2304                },
2305            )
2306            .await
2307            .expect("append fallback tool part");
2308
2309        let session = storage.get_session(&session_id).await.expect("session");
2310        let message = session
2311            .messages
2312            .iter()
2313            .find(|message| message.id == second_id)
2314            .expect("latest user message");
2315        match &message.parts[1] {
2316            MessagePart::ToolInvocation { tool, result, .. } => {
2317                assert_eq!(tool, "glob");
2318                assert_eq!(result.as_ref(), Some(&json!(["README.md"])));
2319            }
2320            other => panic!("expected tool part, got {other:?}"),
2321        }
2322    }
2323
2324    #[tokio::test]
2325    async fn commit_temp_file_replaces_existing_destination() {
2326        let base =
2327            std::env::temp_dir().join(format!("tandem-core-commit-temp-file-{}", Uuid::new_v4()));
2328        stdfs::create_dir_all(&base).expect("base dir");
2329        let destination = base.join("sessions.json");
2330        let temp = base.join("sessions.json.tmp");
2331        stdfs::write(&destination, "{\"version\":\"old\"}").expect("write destination");
2332        stdfs::write(&temp, "{\"version\":\"new\"}").expect("write temp");
2333
2334        commit_temp_file(&temp, &destination)
2335            .await
2336            .expect("replace destination");
2337
2338        let raw = stdfs::read_to_string(&destination).expect("read destination");
2339        assert_eq!(raw, "{\"version\":\"new\"}");
2340        assert!(!temp.exists());
2341    }
2342
2343    #[tokio::test]
2344    async fn startup_compacts_session_snapshot_metadata() {
2345        let base = std::env::temp_dir().join(format!(
2346            "tandem-core-snapshot-compaction-{}",
2347            Uuid::new_v4()
2348        ));
2349        stdfs::create_dir_all(&base).expect("base dir");
2350
2351        let mut session = Session::new(
2352            Some("snapshot compaction".to_string()),
2353            Some(".".to_string()),
2354        );
2355        session.messages.push(Message::new(
2356            MessageRole::User,
2357            vec![MessagePart::Text {
2358                text: "current".to_string(),
2359            }],
2360        ));
2361        let session_id = session.id.clone();
2362
2363        let mut sessions = HashMap::new();
2364        sessions.insert(session_id.clone(), session);
2365        stdfs::write(
2366            base.join("sessions.json"),
2367            serde_json::to_string_pretty(&sessions).expect("serialize sessions"),
2368        )
2369        .expect("write sessions");
2370
2371        let mut snapshots = Vec::new();
2372        for label in ["a", "a", "b", "c", "d", "e", "f"] {
2373            snapshots.push(vec![Message::new(
2374                MessageRole::User,
2375                vec![MessagePart::Text {
2376                    text: label.to_string(),
2377                }],
2378            )]);
2379        }
2380        let mut metadata = HashMap::new();
2381        metadata.insert(
2382            session_id.clone(),
2383            SessionMeta {
2384                snapshots,
2385                ..SessionMeta::default()
2386            },
2387        );
2388        metadata.insert("orphan".to_string(), SessionMeta::default());
2389        stdfs::write(
2390            base.join("session_meta.json"),
2391            serde_json::to_string_pretty(&metadata).expect("serialize metadata"),
2392        )
2393        .expect("write metadata");
2394        stdfs::write(base.join("questions.json"), "{}").expect("write questions");
2395
2396        let _storage = Storage::new(&base).await.expect("storage");
2397
2398        let raw = stdfs::read_to_string(base.join("session_meta.json")).expect("read metadata");
2399        let stored: HashMap<String, SessionMeta> =
2400            serde_json::from_str(&raw).expect("parse metadata");
2401        assert_eq!(stored.len(), 1);
2402        let compacted = stored.get(&session_id).expect("session metadata");
2403        assert_eq!(compacted.snapshots.len(), MAX_SESSION_SNAPSHOTS);
2404
2405        let labels = compacted
2406            .snapshots
2407            .iter()
2408            .map(|snapshot| {
2409                snapshot[0]
2410                    .parts
2411                    .iter()
2412                    .find_map(|part| match part {
2413                        MessagePart::Text { text } => Some(text.clone()),
2414                        _ => None,
2415                    })
2416                    .expect("snapshot text")
2417            })
2418            .collect::<Vec<_>>();
2419        assert_eq!(labels, vec!["b", "c", "d", "e", "f"]);
2420    }
2421
2422    #[tokio::test]
2423    async fn startup_repairs_placeholder_titles_from_wrapped_user_messages() {
2424        let base =
2425            std::env::temp_dir().join(format!("tandem-core-title-repair-{}", Uuid::new_v4()));
2426        let storage = Storage::new(&base).await.expect("storage");
2427        let wrapped = "<memory_context>\n<current_session>\n- fact\n</current_session>\n</memory_context>\n\n[Mode instructions]\nUse tools.\n\n[User request]\nExplain this bug";
2428        let mut session = Session::new(Some("<memory_context>".to_string()), Some(".".to_string()));
2429        let id = session.id.clone();
2430        session.messages.push(Message::new(
2431            MessageRole::User,
2432            vec![MessagePart::Text {
2433                text: wrapped.to_string(),
2434            }],
2435        ));
2436        storage.save_session(session).await.expect("save");
2437        drop(storage);
2438
2439        let storage = Storage::new(&base).await.expect("storage");
2440        let repaired = storage.get_session(&id).await.expect("session");
2441        assert_eq!(repaired.title, "Explain this bug");
2442    }
2443
2444    #[tokio::test]
2445    async fn concurrent_storage_flushes_do_not_fail() {
2446        let base = std::env::temp_dir().join(format!("tandem-core-flush-race-{}", Uuid::new_v4()));
2447        let storage = Arc::new(Storage::new(&base).await.expect("storage"));
2448        let session = Session::new(Some("flush race".to_string()), Some(".".to_string()));
2449        let session_id = session.id.clone();
2450        storage.save_session(session).await.expect("save session");
2451
2452        let mut tasks = Vec::new();
2453        for task_index in 0..12 {
2454            let storage = Arc::clone(&storage);
2455            let session_id = session_id.clone();
2456            tasks.push(tokio::spawn(async move {
2457                for part_index in 0..8 {
2458                    let message = Message::new(
2459                        MessageRole::User,
2460                        vec![MessagePart::Text {
2461                            text: format!("task {task_index} part {part_index}"),
2462                        }],
2463                    );
2464                    storage
2465                        .append_message(&session_id, message)
2466                        .await
2467                        .expect("append message");
2468                }
2469            }));
2470        }
2471
2472        for task in tasks {
2473            task.await.expect("join task");
2474        }
2475
2476        let session = storage.get_session(&session_id).await.expect("session");
2477        assert_eq!(session.messages.len(), 12 * 8);
2478        assert!(base.join("sessions.json").exists());
2479    }
2480}