// tandem_core/storage.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use anyhow::Context;
5use chrono::{TimeZone, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use tokio::fs;
9use tokio::sync::RwLock;
10use tokio::task;
11use uuid::Uuid;
12
13use tandem_types::{Message, MessagePart, MessageRole, Session};
14
15use crate::{
16    derive_session_title_from_prompt, normalize_workspace_path, title_needs_repair,
17    workspace_project_id,
18};
19
/// Per-session bookkeeping that `Storage` keeps in a separate map from the
/// sessions themselves and persists to `session_meta.json`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SessionMeta {
    /// ID of the session this one was forked from (set by `fork_session`).
    pub parent_id: Option<String>,
    /// Archive flag, toggled by `set_archived`. Defaults to `false` when absent.
    #[serde(default)]
    pub archived: bool,
    /// Share flag, toggled by `set_shared`. Defaults to `false` when absent.
    #[serde(default)]
    pub shared: bool,
    /// Share identifier; `set_shared` generates one when sharing is enabled
    /// and clears it when sharing is disabled.
    pub share_id: Option<String>,
    /// Free-form summary stored by `set_summary`.
    pub summary: Option<String>,
    /// Earlier copies of the session's message list, oldest first.
    /// `snapshot_session_messages` appends here and `revert_session` pops
    /// the most recent entry.
    #[serde(default)]
    pub snapshots: Vec<Vec<Message>>,
    /// Message list that was replaced by the latest `revert_session`;
    /// consumed by `unrevert_session`.
    pub pre_revert: Option<Vec<Message>>,
    /// Todo items as stored by `set_todos` (after `normalize_todo_items`).
    #[serde(default)]
    pub todos: Vec<Value>,
}
35
/// Links a question request to a tool call and a message.
/// Serialized with the keys `callID` and `messageID`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuestionToolRef {
    /// Call identifier; `add_question_request` generates it as `call-<uuid>`.
    #[serde(rename = "callID")]
    pub call_id: String,
    /// ID of the message passed to `add_question_request`.
    #[serde(rename = "messageID")]
    pub message_id: String,
}
43
/// A pending question request tracked by `Storage` and persisted to
/// `questions.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuestionRequest {
    /// Request identifier; `add_question_request` generates it as `q-<uuid>`.
    pub id: String,
    /// Owning session; serialized as `sessionID`.
    #[serde(rename = "sessionID")]
    pub session_id: String,
    /// Raw question payloads. Defaults to an empty list when absent.
    #[serde(default)]
    pub questions: Vec<Value>,
    /// Optional tool reference; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool: Option<QuestionToolRef>,
}
54
/// In-memory session store backed by JSON files under `base`.
///
/// Each map sits behind its own async `RwLock`; `flush` writes them to
/// `sessions.json`, `session_meta.json` and `questions.json` respectively.
pub struct Storage {
    /// Directory that holds the JSON files and the legacy import marker.
    base: PathBuf,
    /// Sessions keyed by session ID.
    sessions: RwLock<HashMap<String, Session>>,
    /// Per-session metadata keyed by session ID.
    metadata: RwLock<HashMap<String, SessionMeta>>,
    /// Pending question requests keyed by request ID.
    question_requests: RwLock<HashMap<String, QuestionRequest>>,
}
61
/// Selects which sessions `Storage::list_sessions_scoped` returns.
#[derive(Debug, Clone)]
pub enum SessionListScope {
    /// Every stored session.
    Global,
    /// Only sessions whose normalized `workspace_root` or `directory`
    /// equals the normalized `workspace_root` given here.
    Workspace { workspace_root: String },
}
67
/// Totals reported by `Storage::repair_sessions_from_file_store`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionRepairStats {
    /// Number of sessions whose message list changed during the repair.
    pub sessions_repaired: u64,
    /// Sum of the per-session `messages_recovered` merge counters.
    pub messages_recovered: u64,
    /// Sum of the per-session `parts_recovered` merge counters.
    pub parts_recovered: u64,
    /// Sum of the per-session `conflicts_merged` merge counters.
    pub conflicts_merged: u64,
}
75
/// File name, relative to the storage base directory, of the legacy import marker.
const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
/// Value written to `LegacyImportMarker::version` by this code.
const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
78
/// Counts of `.json` files found under the legacy `session`, `message` and
/// `part` directories (see `scan_legacy_sessions`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LegacyTreeCounts {
    /// `.json` files under `<base>/session`.
    pub session_files: u64,
    /// `.json` files under `<base>/message`.
    pub message_files: u64,
    /// `.json` files under `<base>/part`.
    pub part_files: u64,
}
85
/// Counts of what a legacy scan actually loaded (see `scan_legacy_sessions`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LegacyImportedCounts {
    /// Number of sessions loaded from the legacy tree.
    pub sessions: u64,
    /// Total messages across those sessions.
    pub messages: u64,
    /// Total message parts across those messages.
    pub parts: u64,
}
92
/// Contents of the legacy import marker file, written after a legacy scan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LegacyImportMarker {
    /// Marker format version (`LEGACY_IMPORT_MARKER_VERSION` when written here).
    pub version: u32,
    /// Creation time in milliseconds, as produced by `now_ms_u64`.
    pub created_at_ms: u64,
    /// Time of the most recent scan in milliseconds, as produced by `now_ms_u64`.
    pub last_checked_at_ms: u64,
    /// File counts observed in the legacy tree during the scan.
    pub legacy_counts: LegacyTreeCounts,
    /// Counts of the data the scan loaded.
    pub imported_counts: LegacyImportedCounts,
}
101
/// Result of `Storage::run_legacy_storage_repair_scan`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LegacyRepairRunReport {
    /// One of `"skipped"`, `"updated"` or `"no_changes"`.
    pub status: String,
    /// Whether the marker file was rewritten by this run.
    pub marker_updated: bool,
    /// Merge counter reported by the legacy merge step (0 when skipped).
    pub sessions_merged: u64,
    /// Merge counter reported by the legacy merge step (0 when skipped).
    pub messages_recovered: u64,
    /// Merge counter reported by the legacy merge step (0 when skipped).
    pub parts_recovered: u64,
    /// Legacy tree file counts from the scan, or from the marker when skipped.
    pub legacy_counts: LegacyTreeCounts,
    /// Imported counts from the scan, or from the marker when skipped.
    pub imported_counts: LegacyImportedCounts,
}
112
113fn snapshot_session_messages(
114    session_id: &str,
115    session: &Session,
116    metadata: &mut HashMap<String, SessionMeta>,
117) {
118    let meta = metadata
119        .entry(session_id.to_string())
120        .or_insert_with(SessionMeta::default);
121    meta.snapshots.push(session.messages.clone());
122    if meta.snapshots.len() > 25 {
123        let _ = meta.snapshots.remove(0);
124    }
125}
126
127fn merge_message_part(message: &mut Message, part: MessagePart) {
128    match part {
129        MessagePart::ToolInvocation {
130            tool,
131            args,
132            result,
133            error,
134        } => {
135            let args_are_empty =
136                args.is_null() || args.as_object().is_some_and(|value| value.is_empty());
137            if result.is_none() && error.is_none() {
138                if let Some(existing) = message.parts.iter_mut().rev().find(|existing| {
139                    matches!(
140                        existing,
141                        MessagePart::ToolInvocation {
142                            tool: existing_tool,
143                            result: None,
144                            error: None,
145                            ..
146                        } if existing_tool == &tool
147                    )
148                }) {
149                    if let MessagePart::ToolInvocation {
150                        args: existing_args,
151                        ..
152                    } = existing
153                    {
154                        let existing_args_are_empty = existing_args.is_null()
155                            || existing_args
156                                .as_object()
157                                .is_some_and(|value| value.is_empty());
158                        if !args_are_empty || existing_args_are_empty {
159                            *existing_args = args;
160                        }
161                        return;
162                    }
163                }
164            }
165            if result.is_some() || error.is_some() {
166                if let Some(existing) = message.parts.iter_mut().rev().find(|existing| {
167                    matches!(
168                        existing,
169                        MessagePart::ToolInvocation {
170                            tool: existing_tool,
171                            result: None,
172                            error: None,
173                            ..
174                        } if existing_tool == &tool
175                    )
176                }) {
177                    if let MessagePart::ToolInvocation {
178                        args: existing_args,
179                        result: existing_result,
180                        error: existing_error,
181                        ..
182                    } = existing
183                    {
184                        let existing_args_are_empty = existing_args.is_null()
185                            || existing_args
186                                .as_object()
187                                .is_some_and(|value| value.is_empty());
188                        if existing_args_are_empty {
189                            if tool == "write" && args_are_empty {
190                                tracing::info!(
191                                    tool = %tool,
192                                    "merging write result/error into existing tool part with empty args"
193                                );
194                            }
195                            *existing_args = args.clone();
196                        }
197                        *existing_result = result;
198                        *existing_error = error;
199                        return;
200                    }
201                }
202            }
203            message.parts.push(MessagePart::ToolInvocation {
204                tool,
205                args,
206                result,
207                error,
208            });
209        }
210        other => message.parts.push(other),
211    }
212}
213
214impl Storage {
215    pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
216        let base = base.as_ref().to_path_buf();
217        fs::create_dir_all(&base).await?;
218        let sessions_file = base.join("sessions.json");
219        let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
220        let sessions_file_exists = sessions_file.exists();
221        let mut imported_legacy_sessions = false;
222        let mut sessions = if sessions_file_exists {
223            let raw = fs::read_to_string(&sessions_file).await?;
224            serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
225        } else {
226            HashMap::new()
227        };
228
229        let mut marker_to_write = None;
230        if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
231            let base_for_scan = base.clone();
232            let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
233                .await
234                .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
235            if merge_legacy_sessions(&mut sessions, scan.sessions) {
236                imported_legacy_sessions = true;
237            }
238            marker_to_write = Some(LegacyImportMarker {
239                version: LEGACY_IMPORT_MARKER_VERSION,
240                created_at_ms: now_ms_u64(),
241                last_checked_at_ms: now_ms_u64(),
242                legacy_counts: scan.legacy_counts,
243                imported_counts: scan.imported_counts,
244            });
245        }
246
247        if hydrate_workspace_roots(&mut sessions) {
248            imported_legacy_sessions = true;
249        }
250        if repair_session_titles(&mut sessions) {
251            imported_legacy_sessions = true;
252        }
253        let metadata_file = base.join("session_meta.json");
254        let metadata = if metadata_file.exists() {
255            let raw = fs::read_to_string(&metadata_file).await?;
256            serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
257        } else {
258            HashMap::new()
259        };
260        let questions_file = base.join("questions.json");
261        let question_requests = if questions_file.exists() {
262            let raw = fs::read_to_string(&questions_file).await?;
263            serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
264        } else {
265            HashMap::new()
266        };
267        let storage = Self {
268            base,
269            sessions: RwLock::new(sessions),
270            metadata: RwLock::new(metadata),
271            question_requests: RwLock::new(question_requests),
272        };
273
274        if imported_legacy_sessions {
275            storage.flush().await?;
276        }
277        if let Some(marker) = marker_to_write {
278            storage.write_legacy_import_marker(&marker).await?;
279        }
280
281        Ok(storage)
282    }
283
284    pub async fn list_sessions(&self) -> Vec<Session> {
285        self.list_sessions_scoped(SessionListScope::Global).await
286    }
287
288    pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
289        let all = self
290            .sessions
291            .read()
292            .await
293            .values()
294            .cloned()
295            .collect::<Vec<_>>();
296        match scope {
297            SessionListScope::Global => all,
298            SessionListScope::Workspace { workspace_root } => {
299                let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
300                    return Vec::new();
301                };
302                all.into_iter()
303                    .filter(|session| {
304                        let direct = session
305                            .workspace_root
306                            .as_ref()
307                            .and_then(|p| normalize_workspace_path(p))
308                            .map(|p| p == normalized_workspace)
309                            .unwrap_or(false);
310                        if direct {
311                            return true;
312                        }
313                        normalize_workspace_path(&session.directory)
314                            .map(|p| p == normalized_workspace)
315                            .unwrap_or(false)
316                    })
317                    .collect()
318            }
319        }
320    }
321
322    pub async fn get_session(&self, id: &str) -> Option<Session> {
323        self.sessions.read().await.get(id).cloned()
324    }
325
326    pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
327        if session.workspace_root.is_none() {
328            session.workspace_root = normalize_workspace_path(&session.directory);
329        }
330        let session_id = session.id.clone();
331        self.sessions
332            .write()
333            .await
334            .insert(session_id.clone(), session);
335        self.metadata
336            .write()
337            .await
338            .entry(session_id)
339            .or_insert_with(SessionMeta::default);
340        self.flush().await
341    }
342
343    pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
344        let mut stats = SessionRepairStats::default();
345        let mut sessions = self.sessions.write().await;
346
347        for session in sessions.values_mut() {
348            let imported = load_legacy_session_messages(&self.base, &session.id);
349            if imported.is_empty() {
350                continue;
351            }
352
353            let (merged, merge_stats, changed) =
354                merge_session_messages(&session.messages, &imported);
355            if changed {
356                session.messages = merged;
357                session.time.updated =
358                    most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
359                stats.sessions_repaired += 1;
360                stats.messages_recovered += merge_stats.messages_recovered;
361                stats.parts_recovered += merge_stats.parts_recovered;
362                stats.conflicts_merged += merge_stats.conflicts_merged;
363            }
364        }
365
366        if stats.sessions_repaired > 0 {
367            drop(sessions);
368            self.flush().await?;
369        }
370
371        Ok(stats)
372    }
373
374    pub async fn run_legacy_storage_repair_scan(
375        &self,
376        force: bool,
377    ) -> anyhow::Result<LegacyRepairRunReport> {
378        let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
379        let sessions_exists = self.base.join("sessions.json").exists();
380        let should_scan = if force {
381            true
382        } else {
383            should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
384        };
385        if !should_scan {
386            let marker = read_legacy_import_marker(&marker_path)
387                .await
388                .unwrap_or_else(|| LegacyImportMarker {
389                    version: LEGACY_IMPORT_MARKER_VERSION,
390                    created_at_ms: now_ms_u64(),
391                    last_checked_at_ms: now_ms_u64(),
392                    legacy_counts: LegacyTreeCounts::default(),
393                    imported_counts: LegacyImportedCounts::default(),
394                });
395            return Ok(LegacyRepairRunReport {
396                status: "skipped".to_string(),
397                marker_updated: false,
398                sessions_merged: 0,
399                messages_recovered: 0,
400                parts_recovered: 0,
401                legacy_counts: marker.legacy_counts,
402                imported_counts: marker.imported_counts,
403            });
404        }
405
406        let base_for_scan = self.base.clone();
407        let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
408            .await
409            .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
410
411        let merge_stats = {
412            let mut sessions = self.sessions.write().await;
413            merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
414        };
415
416        if merge_stats.changed {
417            self.flush().await?;
418        }
419
420        let marker = LegacyImportMarker {
421            version: LEGACY_IMPORT_MARKER_VERSION,
422            created_at_ms: now_ms_u64(),
423            last_checked_at_ms: now_ms_u64(),
424            legacy_counts: scan.legacy_counts.clone(),
425            imported_counts: scan.imported_counts.clone(),
426        };
427        self.write_legacy_import_marker(&marker).await?;
428
429        Ok(LegacyRepairRunReport {
430            status: if merge_stats.changed {
431                "updated".to_string()
432            } else {
433                "no_changes".to_string()
434            },
435            marker_updated: true,
436            sessions_merged: merge_stats.sessions_merged,
437            messages_recovered: merge_stats.messages_recovered,
438            parts_recovered: merge_stats.parts_recovered,
439            legacy_counts: scan.legacy_counts,
440            imported_counts: scan.imported_counts,
441        })
442    }
443
444    pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
445        let removed = self.sessions.write().await.remove(id).is_some();
446        self.metadata.write().await.remove(id);
447        self.question_requests
448            .write()
449            .await
450            .retain(|_, request| request.session_id != id);
451        if removed {
452            self.flush().await?;
453        }
454        Ok(removed)
455    }
456
457    pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
458        let mut sessions = self.sessions.write().await;
459        let session = sessions
460            .get_mut(session_id)
461            .context("session not found for append_message")?;
462        let mut meta_guard = self.metadata.write().await;
463        snapshot_session_messages(session_id, session, &mut meta_guard);
464        session.messages.push(msg);
465        session.time.updated = Utc::now();
466        drop(sessions);
467        drop(meta_guard);
468        self.flush().await
469    }
470
471    pub async fn append_message_part(
472        &self,
473        session_id: &str,
474        message_id: &str,
475        part: MessagePart,
476    ) -> anyhow::Result<()> {
477        let mut sessions = self.sessions.write().await;
478        let session = sessions
479            .get_mut(session_id)
480            .context("session not found for append_message_part")?;
481        let mut meta_guard = self.metadata.write().await;
482        snapshot_session_messages(session_id, session, &mut meta_guard);
483        let message = if let Some(message) = session
484            .messages
485            .iter_mut()
486            .find(|message| message.id == message_id)
487        {
488            message
489        } else {
490            session
491                .messages
492                .iter_mut()
493                .rev()
494                .find(|message| matches!(message.role, MessageRole::User))
495                .context("message not found for append_message_part")?
496        };
497        merge_message_part(message, part);
498        session.time.updated = Utc::now();
499        drop(sessions);
500        drop(meta_guard);
501        self.flush().await
502    }
503
504    pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
505        let source = {
506            let sessions = self.sessions.read().await;
507            sessions.get(id).cloned()
508        };
509        let Some(mut child) = source else {
510            return Ok(None);
511        };
512
513        child.id = Uuid::new_v4().to_string();
514        child.title = format!("{} (fork)", child.title);
515        child.time.created = Utc::now();
516        child.time.updated = child.time.created;
517        child.slug = None;
518
519        self.sessions
520            .write()
521            .await
522            .insert(child.id.clone(), child.clone());
523        self.metadata.write().await.insert(
524            child.id.clone(),
525            SessionMeta {
526                parent_id: Some(id.to_string()),
527                snapshots: vec![child.messages.clone()],
528                ..SessionMeta::default()
529            },
530        );
531        self.flush().await?;
532        Ok(Some(child))
533    }
534
535    pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
536        let mut sessions = self.sessions.write().await;
537        let Some(session) = sessions.get_mut(id) else {
538            return Ok(false);
539        };
540        let mut metadata = self.metadata.write().await;
541        let meta = metadata
542            .entry(id.to_string())
543            .or_insert_with(SessionMeta::default);
544        let Some(snapshot) = meta.snapshots.pop() else {
545            return Ok(false);
546        };
547        meta.pre_revert = Some(session.messages.clone());
548        session.messages = snapshot;
549        session.time.updated = Utc::now();
550        drop(metadata);
551        drop(sessions);
552        self.flush().await?;
553        Ok(true)
554    }
555
556    pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
557        let mut sessions = self.sessions.write().await;
558        let Some(session) = sessions.get_mut(id) else {
559            return Ok(false);
560        };
561        let mut metadata = self.metadata.write().await;
562        let Some(meta) = metadata.get_mut(id) else {
563            return Ok(false);
564        };
565        let Some(previous) = meta.pre_revert.take() else {
566            return Ok(false);
567        };
568        meta.snapshots.push(session.messages.clone());
569        session.messages = previous;
570        session.time.updated = Utc::now();
571        drop(metadata);
572        drop(sessions);
573        self.flush().await?;
574        Ok(true)
575    }
576
577    pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
578        let mut metadata = self.metadata.write().await;
579        let meta = metadata
580            .entry(id.to_string())
581            .or_insert_with(SessionMeta::default);
582        meta.shared = shared;
583        if shared {
584            if meta.share_id.is_none() {
585                meta.share_id = Some(Uuid::new_v4().to_string());
586            }
587        } else {
588            meta.share_id = None;
589        }
590        let share_id = meta.share_id.clone();
591        drop(metadata);
592        self.flush().await?;
593        Ok(share_id)
594    }
595
596    pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
597        let mut metadata = self.metadata.write().await;
598        let meta = metadata
599            .entry(id.to_string())
600            .or_insert_with(SessionMeta::default);
601        meta.archived = archived;
602        drop(metadata);
603        self.flush().await?;
604        Ok(true)
605    }
606
607    pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
608        let mut metadata = self.metadata.write().await;
609        let meta = metadata
610            .entry(id.to_string())
611            .or_insert_with(SessionMeta::default);
612        meta.summary = Some(summary);
613        drop(metadata);
614        self.flush().await?;
615        Ok(true)
616    }
617
618    pub async fn children(&self, parent_id: &str) -> Vec<Session> {
619        let child_ids = {
620            let metadata = self.metadata.read().await;
621            metadata
622                .iter()
623                .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
624                .map(|(id, _)| id.clone())
625                .collect::<Vec<_>>()
626        };
627        let sessions = self.sessions.read().await;
628        child_ids
629            .into_iter()
630            .filter_map(|id| sessions.get(&id).cloned())
631            .collect()
632    }
633
634    pub async fn session_status(&self, id: &str) -> Option<Value> {
635        let metadata = self.metadata.read().await;
636        metadata.get(id).map(|meta| {
637            json!({
638                "archived": meta.archived,
639                "shared": meta.shared,
640                "parentID": meta.parent_id,
641                "snapshotCount": meta.snapshots.len()
642            })
643        })
644    }
645
646    pub async fn session_diff(&self, id: &str) -> Option<Value> {
647        let sessions = self.sessions.read().await;
648        let current = sessions.get(id)?;
649        let metadata = self.metadata.read().await;
650        let default = SessionMeta::default();
651        let meta = metadata.get(id).unwrap_or(&default);
652        let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
653        Some(json!({
654            "sessionID": id,
655            "currentMessageCount": current.messages.len(),
656            "lastSnapshotMessageCount": last_snapshot_len,
657            "delta": current.messages.len() as i64 - last_snapshot_len as i64
658        }))
659    }
660
661    pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
662        let mut metadata = self.metadata.write().await;
663        let meta = metadata
664            .entry(id.to_string())
665            .or_insert_with(SessionMeta::default);
666        meta.todos = normalize_todo_items(todos);
667        drop(metadata);
668        self.flush().await
669    }
670
671    pub async fn get_todos(&self, id: &str) -> Vec<Value> {
672        let todos = self
673            .metadata
674            .read()
675            .await
676            .get(id)
677            .map(|meta| meta.todos.clone())
678            .unwrap_or_default();
679        normalize_todo_items(todos)
680    }
681
682    pub async fn add_question_request(
683        &self,
684        session_id: &str,
685        message_id: &str,
686        questions: Vec<Value>,
687    ) -> anyhow::Result<QuestionRequest> {
688        if questions.is_empty() {
689            return Err(anyhow::anyhow!(
690                "cannot add empty question request for session {}",
691                session_id
692            ));
693        }
694        let request = QuestionRequest {
695            id: format!("q-{}", Uuid::new_v4()),
696            session_id: session_id.to_string(),
697            questions,
698            tool: Some(QuestionToolRef {
699                call_id: format!("call-{}", Uuid::new_v4()),
700                message_id: message_id.to_string(),
701            }),
702        };
703        self.question_requests
704            .write()
705            .await
706            .insert(request.id.clone(), request.clone());
707        self.flush().await?;
708        Ok(request)
709    }
710
711    pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
712        self.question_requests
713            .read()
714            .await
715            .values()
716            .cloned()
717            .collect()
718    }
719
720    pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
721        let removed = self
722            .question_requests
723            .write()
724            .await
725            .remove(request_id)
726            .is_some();
727        if removed {
728            self.flush().await?;
729        }
730        Ok(removed)
731    }
732
733    pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
734        self.reply_question(request_id).await
735    }
736
737    pub async fn attach_session_to_workspace(
738        &self,
739        session_id: &str,
740        target_workspace: &str,
741        reason_tag: &str,
742    ) -> anyhow::Result<Option<Session>> {
743        let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
744            return Ok(None);
745        };
746        let mut sessions = self.sessions.write().await;
747        let Some(session) = sessions.get_mut(session_id) else {
748            return Ok(None);
749        };
750        let previous_workspace = session
751            .workspace_root
752            .clone()
753            .or_else(|| normalize_workspace_path(&session.directory));
754
755        if session.origin_workspace_root.is_none() {
756            session.origin_workspace_root = previous_workspace.clone();
757        }
758        session.attached_from_workspace = previous_workspace;
759        session.attached_to_workspace = Some(target_workspace.clone());
760        session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
761        session.attach_reason = Some(reason_tag.trim().to_string());
762        session.workspace_root = Some(target_workspace.clone());
763        session.project_id = workspace_project_id(&target_workspace);
764        session.directory = target_workspace;
765        session.time.updated = Utc::now();
766        let updated = session.clone();
767        drop(sessions);
768        self.flush().await?;
769        Ok(Some(updated))
770    }
771
772    async fn flush(&self) -> anyhow::Result<()> {
773        let snapshot = self.sessions.read().await.clone();
774        let payload = serde_json::to_string_pretty(&snapshot)?;
775        fs::write(self.base.join("sessions.json"), payload).await?;
776        let metadata_snapshot = self.metadata.read().await.clone();
777        let metadata_payload = serde_json::to_string_pretty(&metadata_snapshot)?;
778        fs::write(self.base.join("session_meta.json"), metadata_payload).await?;
779        let questions_snapshot = self.question_requests.read().await.clone();
780        let questions_payload = serde_json::to_string_pretty(&questions_snapshot)?;
781        fs::write(self.base.join("questions.json"), questions_payload).await?;
782        Ok(())
783    }
784
785    async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
786        let payload = serde_json::to_string_pretty(marker)?;
787        fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
788        Ok(())
789    }
790}
791
792fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
793    items
794        .into_iter()
795        .filter_map(|item| {
796            let obj = item.as_object()?;
797            let content = obj
798                .get("content")
799                .and_then(|v| v.as_str())
800                .or_else(|| obj.get("text").and_then(|v| v.as_str()))
801                .unwrap_or("")
802                .trim()
803                .to_string();
804            if content.is_empty() {
805                return None;
806            }
807            let id = obj
808                .get("id")
809                .and_then(|v| v.as_str())
810                .filter(|s| !s.trim().is_empty())
811                .map(ToString::to_string)
812                .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
813            let status = obj
814                .get("status")
815                .and_then(|v| v.as_str())
816                .filter(|s| !s.trim().is_empty())
817                .map(ToString::to_string)
818                .unwrap_or_else(|| "pending".to_string());
819            Some(json!({
820                "id": id,
821                "content": content,
822                "status": status
823            }))
824        })
825        .collect()
826}
827
/// Output of `scan_legacy_sessions`.
#[derive(Debug)]
struct LegacyScanResult {
    /// Sessions loaded from the legacy tree, keyed by session ID.
    sessions: HashMap<String, Session>,
    /// `.json` file counts found in the legacy directories.
    legacy_counts: LegacyTreeCounts,
    /// Session, message and part totals of `sessions`.
    imported_counts: LegacyImportedCounts,
}
834
/// Counters describing a legacy merge; `run_legacy_storage_repair_scan`
/// copies them into its report.
#[derive(Debug, Default)]
struct LegacyMergeStats {
    /// Whether the merge modified the session map (triggers a flush).
    changed: bool,
    /// Reported as `sessions_merged` in the repair report.
    sessions_merged: u64,
    /// Reported as `messages_recovered` in the repair report.
    messages_recovered: u64,
    /// Reported as `parts_recovered` in the repair report.
    parts_recovered: u64,
}
842
843fn now_ms_u64() -> u64 {
844    Utc::now().timestamp_millis().max(0) as u64
845}
846
/// Decides whether the legacy storage tree should be scanned automatically.
///
/// The scan runs only when no canonical `sessions.json` exists yet. Once it
/// does, startup must not block on a deep legacy tree walk; users can
/// trigger an explicit repair scan later.
///
/// `marker_path` is accepted for interface compatibility but does not
/// influence the decision, so the marker file is no longer read here.
async fn should_run_legacy_scan_on_startup(marker_path: &Path, sessions_exist: bool) -> bool {
    let _ = marker_path;
    !sessions_exist
}
858
859async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
860    let raw = fs::read_to_string(marker_path).await.ok()?;
861    serde_json::from_str::<LegacyImportMarker>(&raw).ok()
862}
863
864fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
865    let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
866    let imported_counts = LegacyImportedCounts {
867        sessions: sessions.len() as u64,
868        messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
869        parts: sessions
870            .values()
871            .flat_map(|s| s.messages.iter())
872            .map(|m| m.parts.len() as u64)
873            .sum(),
874    };
875    let legacy_counts = LegacyTreeCounts {
876        session_files: count_legacy_json_files(&base.join("session")),
877        message_files: count_legacy_json_files(&base.join("message")),
878        part_files: count_legacy_json_files(&base.join("part")),
879    };
880    Ok(LegacyScanResult {
881        sessions,
882        legacy_counts,
883        imported_counts,
884    })
885}
886
/// Counts files with a `json` extension anywhere beneath `root`.
///
/// Returns `0` when `root` is not a directory. Directories that cannot be
/// listed and entries that cannot be read are skipped silently. The extension
/// comparison is case-sensitive.
fn count_legacy_json_files(root: &Path) -> u64 {
    if !root.is_dir() {
        return 0;
    }

    let mut total = 0u64;
    let mut pending = vec![root.to_path_buf()];
    while let Some(current) = pending.pop() {
        let Ok(listing) = std::fs::read_dir(&current) else {
            continue;
        };
        for entry in listing.flatten() {
            let is_dir = matches!(entry.file_type(), Ok(kind) if kind.is_dir());
            let path = entry.path();
            if is_dir {
                // Descend later; directories themselves are never counted.
                pending.push(path);
            } else if path.extension().map_or(false, |ext| ext == "json") {
                total += 1;
            }
        }
    }
    total
}
909
910fn merge_legacy_sessions(
911    current: &mut HashMap<String, Session>,
912    imported: HashMap<String, Session>,
913) -> bool {
914    merge_legacy_sessions_with_stats(current, imported).changed
915}
916
917fn merge_legacy_sessions_with_stats(
918    current: &mut HashMap<String, Session>,
919    imported: HashMap<String, Session>,
920) -> LegacyMergeStats {
921    let mut stats = LegacyMergeStats::default();
922    for (id, legacy) in imported {
923        let legacy_message_count = legacy.messages.len() as u64;
924        let legacy_part_count = legacy
925            .messages
926            .iter()
927            .map(|m| m.parts.len() as u64)
928            .sum::<u64>();
929        match current.get_mut(&id) {
930            None => {
931                current.insert(id, legacy);
932                stats.changed = true;
933                stats.sessions_merged += 1;
934                stats.messages_recovered += legacy_message_count;
935                stats.parts_recovered += legacy_part_count;
936            }
937            Some(existing) => {
938                let should_merge_messages =
939                    existing.messages.is_empty() && !legacy.messages.is_empty();
940                let should_fill_title =
941                    existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
942                let should_fill_directory = (existing.directory.trim().is_empty()
943                    || existing.directory.trim() == "."
944                    || existing.directory.trim() == "./"
945                    || existing.directory.trim() == ".\\")
946                    && !legacy.directory.trim().is_empty();
947                let should_fill_workspace =
948                    existing.workspace_root.is_none() && legacy.workspace_root.is_some();
949                if should_merge_messages {
950                    existing.messages = legacy.messages.clone();
951                }
952                if should_fill_title {
953                    existing.title = legacy.title.clone();
954                }
955                if should_fill_directory {
956                    existing.directory = legacy.directory.clone();
957                }
958                if should_fill_workspace {
959                    existing.workspace_root = legacy.workspace_root.clone();
960                }
961                if should_merge_messages
962                    || should_fill_title
963                    || should_fill_directory
964                    || should_fill_workspace
965                {
966                    stats.changed = true;
967                    if should_merge_messages {
968                        stats.sessions_merged += 1;
969                        stats.messages_recovered += legacy_message_count;
970                        stats.parts_recovered += legacy_part_count;
971                    }
972                }
973            }
974        }
975    }
976    stats
977}
978
979fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
980    let mut changed = false;
981    for session in sessions.values_mut() {
982        if session.workspace_root.is_none() {
983            let normalized = normalize_workspace_path(&session.directory);
984            if normalized.is_some() {
985                session.workspace_root = normalized;
986                changed = true;
987            }
988        }
989    }
990    changed
991}
992
993fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
994    let mut changed = false;
995    for session in sessions.values_mut() {
996        if !title_needs_repair(&session.title) {
997            continue;
998        }
999        let first_user_text = session.messages.iter().find_map(|message| {
1000            if !matches!(message.role, MessageRole::User) {
1001                return None;
1002            }
1003            message.parts.iter().find_map(|part| match part {
1004                MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
1005                _ => None,
1006            })
1007        });
1008        let Some(source) = first_user_text else {
1009            continue;
1010        };
1011        let Some(derived) = derive_session_title_from_prompt(source, 60) else {
1012            continue;
1013        };
1014        if derived == session.title {
1015            continue;
1016        }
1017        session.title = derived;
1018        session.time.updated = Utc::now();
1019        changed = true;
1020    }
1021    changed
1022}
1023
/// Timestamps of a legacy session file; the loader interprets both values as
/// milliseconds since the Unix epoch.
#[derive(Debug, Deserialize)]
struct LegacySessionTime {
    /// Creation time in epoch milliseconds.
    created: i64,
    /// Last-update time in epoch milliseconds.
    updated: i64,
}
1029
/// Shape of a legacy session JSON file as read by
/// `load_legacy_opencode_sessions`.
#[derive(Debug, Deserialize)]
struct LegacySession {
    /// Session identifier; also used to locate the session's message directory.
    id: String,
    slug: Option<String>,
    version: Option<String>,
    /// Stored under the camel-case key `projectID` in the legacy JSON.
    #[serde(rename = "projectID")]
    project_id: Option<String>,
    /// Optional title; the loader substitutes a default when missing or blank.
    title: Option<String>,
    /// Optional working directory; the loader substitutes `"."` when missing or blank.
    directory: Option<String>,
    time: LegacySessionTime,
}
1041
1042fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
1043    let legacy_root = base.join("session");
1044    if !legacy_root.is_dir() {
1045        return Ok(HashMap::new());
1046    }
1047
1048    let mut out = HashMap::new();
1049    let mut stack = vec![legacy_root];
1050    while let Some(dir) = stack.pop() {
1051        for entry in std::fs::read_dir(&dir)? {
1052            let entry = entry?;
1053            let path = entry.path();
1054            if entry.file_type()?.is_dir() {
1055                stack.push(path);
1056                continue;
1057            }
1058            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1059                continue;
1060            }
1061            let raw = match std::fs::read_to_string(&path) {
1062                Ok(v) => v,
1063                Err(_) => continue,
1064            };
1065            let legacy = match serde_json::from_str::<LegacySession>(&raw) {
1066                Ok(v) => v,
1067                Err(_) => continue,
1068            };
1069            let created = Utc
1070                .timestamp_millis_opt(legacy.time.created)
1071                .single()
1072                .unwrap_or_else(Utc::now);
1073            let updated = Utc
1074                .timestamp_millis_opt(legacy.time.updated)
1075                .single()
1076                .unwrap_or(created);
1077
1078            let session_id = legacy.id.clone();
1079            out.insert(
1080                session_id.clone(),
1081                Session {
1082                    id: session_id.clone(),
1083                    slug: legacy.slug,
1084                    version: legacy.version,
1085                    project_id: legacy.project_id,
1086                    title: legacy
1087                        .title
1088                        .filter(|s| !s.trim().is_empty())
1089                        .unwrap_or_else(|| "New session".to_string()),
1090                    directory: legacy
1091                        .directory
1092                        .clone()
1093                        .filter(|s| !s.trim().is_empty())
1094                        .unwrap_or_else(|| ".".to_string()),
1095                    workspace_root: legacy
1096                        .directory
1097                        .as_deref()
1098                        .and_then(normalize_workspace_path),
1099                    origin_workspace_root: None,
1100                    attached_from_workspace: None,
1101                    attached_to_workspace: None,
1102                    attach_timestamp_ms: None,
1103                    attach_reason: None,
1104                    time: tandem_types::SessionTime { created, updated },
1105                    model: None,
1106                    provider: None,
1107                    environment: None,
1108                    messages: load_legacy_session_messages(base, &session_id),
1109                },
1110            );
1111        }
1112    }
1113    Ok(out)
1114}
1115
/// Timestamp block of a legacy message file.
#[derive(Debug, Deserialize)]
struct LegacyMessageTime {
    /// Creation time; the loader interprets it as epoch milliseconds and sorts
    /// messages by it.
    created: i64,
}
1120
/// Shape of a legacy message JSON file as read by
/// `load_legacy_session_messages`.
#[derive(Debug, Deserialize)]
struct LegacyMessage {
    /// Message identifier; also used to locate the message's part directory.
    id: String,
    /// Role name, mapped to a `MessageRole` by `legacy_role_to_message_role`.
    role: String,
    time: LegacyMessageTime,
}
1127
/// Shape of a legacy message-part JSON file as read by
/// `load_legacy_message_parts`. Every field is optional.
#[derive(Debug, Deserialize)]
struct LegacyPart {
    /// Stored under the key `type` in the legacy JSON; the loader recognizes
    /// `"text"`, `"reasoning"`, and `"tool"`.
    #[serde(rename = "type")]
    part_type: Option<String>,
    /// Text body for text and reasoning parts.
    text: Option<String>,
    /// Tool name; when present the loader treats the part as a tool invocation
    /// regardless of `part_type`.
    tool: Option<String>,
    /// Tool arguments; the loader substitutes an empty JSON object when absent.
    args: Option<Value>,
    /// Tool result, if any.
    result: Option<Value>,
    /// Tool error message, if any.
    error: Option<String>,
}
1138
1139fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
1140    let msg_dir = base.join("message").join(session_id);
1141    if !msg_dir.is_dir() {
1142        return Vec::new();
1143    }
1144
1145    let mut legacy_messages: Vec<(i64, Message)> = Vec::new();
1146
1147    let Ok(entries) = std::fs::read_dir(&msg_dir) else {
1148        return Vec::new();
1149    };
1150
1151    for entry in entries.flatten() {
1152        let path = entry.path();
1153        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1154            continue;
1155        }
1156        let Ok(raw) = std::fs::read_to_string(&path) else {
1157            continue;
1158        };
1159        let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
1160            continue;
1161        };
1162
1163        let created_at = Utc
1164            .timestamp_millis_opt(legacy.time.created)
1165            .single()
1166            .unwrap_or_else(Utc::now);
1167
1168        legacy_messages.push((
1169            legacy.time.created,
1170            Message {
1171                id: legacy.id.clone(),
1172                role: legacy_role_to_message_role(&legacy.role),
1173                parts: load_legacy_message_parts(base, &legacy.id),
1174                created_at,
1175            },
1176        ));
1177    }
1178
1179    legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
1180    legacy_messages.into_iter().map(|(_, msg)| msg).collect()
1181}
1182
1183fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
1184    let parts_dir = base.join("part").join(message_id);
1185    if !parts_dir.is_dir() {
1186        return Vec::new();
1187    }
1188
1189    let Ok(entries) = std::fs::read_dir(&parts_dir) else {
1190        return Vec::new();
1191    };
1192
1193    let mut out = Vec::new();
1194    for entry in entries.flatten() {
1195        let path = entry.path();
1196        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1197            continue;
1198        }
1199        let Ok(raw) = std::fs::read_to_string(&path) else {
1200            continue;
1201        };
1202        let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
1203            continue;
1204        };
1205
1206        let mapped = if let Some(tool) = part.tool {
1207            Some(MessagePart::ToolInvocation {
1208                tool,
1209                args: part.args.unwrap_or_else(|| json!({})),
1210                result: part.result,
1211                error: part.error,
1212            })
1213        } else {
1214            match part.part_type.as_deref() {
1215                Some("reasoning") => Some(MessagePart::Reasoning {
1216                    text: part.text.unwrap_or_default(),
1217                }),
1218                Some("tool") => Some(MessagePart::ToolInvocation {
1219                    tool: "tool".to_string(),
1220                    args: part.args.unwrap_or_else(|| json!({})),
1221                    result: part.result,
1222                    error: part.error,
1223                }),
1224                Some("text") | None => Some(MessagePart::Text {
1225                    text: part.text.unwrap_or_default(),
1226                }),
1227                _ => None,
1228            }
1229        };
1230
1231        if let Some(part) = mapped {
1232            out.push(part);
1233        }
1234    }
1235    out
1236}
1237
1238fn legacy_role_to_message_role(role: &str) -> MessageRole {
1239    match role.to_lowercase().as_str() {
1240        "user" => MessageRole::User,
1241        "assistant" => MessageRole::Assistant,
1242        "system" => MessageRole::System,
1243        "tool" => MessageRole::Tool,
1244        _ => MessageRole::Assistant,
1245    }
1246}
1247
/// Counters describing what `merge_session_messages` brought in.
#[derive(Debug, Clone, Default)]
struct MessageMergeStats {
    /// Imported messages whose id was not present before.
    messages_recovered: u64,
    /// Parts contained in those newly added messages.
    parts_recovered: u64,
    /// Messages present on both sides where the imported copy replaced the
    /// existing one.
    conflicts_merged: u64,
}
1254
1255fn message_richness(msg: &Message) -> usize {
1256    msg.parts
1257        .iter()
1258        .map(|p| match p {
1259            MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1260                if text.trim().is_empty() {
1261                    0
1262                } else {
1263                    1
1264                }
1265            }
1266            MessagePart::ToolInvocation { result, error, .. } => {
1267                if result.is_some() || error.is_some() {
1268                    2
1269                } else {
1270                    1
1271                }
1272            }
1273        })
1274        .sum()
1275}
1276
1277fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1278    messages.iter().map(|m| m.created_at).max()
1279}
1280
1281fn merge_session_messages(
1282    existing: &[Message],
1283    imported: &[Message],
1284) -> (Vec<Message>, MessageMergeStats, bool) {
1285    if existing.is_empty() {
1286        let messages_recovered = imported.len() as u64;
1287        let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1288        return (
1289            imported.to_vec(),
1290            MessageMergeStats {
1291                messages_recovered,
1292                parts_recovered,
1293                conflicts_merged: 0,
1294            },
1295            true,
1296        );
1297    }
1298
1299    let mut merged_by_id: HashMap<String, Message> = existing
1300        .iter()
1301        .cloned()
1302        .map(|m| (m.id.clone(), m))
1303        .collect();
1304    let mut stats = MessageMergeStats::default();
1305    let mut changed = false;
1306
1307    for incoming in imported {
1308        match merged_by_id.get(&incoming.id) {
1309            None => {
1310                merged_by_id.insert(incoming.id.clone(), incoming.clone());
1311                stats.messages_recovered += 1;
1312                stats.parts_recovered += incoming.parts.len() as u64;
1313                changed = true;
1314            }
1315            Some(current) => {
1316                let incoming_richer = message_richness(incoming) > message_richness(current)
1317                    || incoming.parts.len() > current.parts.len();
1318                if incoming_richer {
1319                    merged_by_id.insert(incoming.id.clone(), incoming.clone());
1320                    stats.conflicts_merged += 1;
1321                    changed = true;
1322                }
1323            }
1324        }
1325    }
1326
1327    let mut out: Vec<Message> = merged_by_id.into_values().collect();
1328    out.sort_by_key(|m| m.created_at);
1329    (out, stats, changed)
1330}
1331
1332#[cfg(test)]
1333mod tests {
1334    use super::*;
1335    use std::fs as stdfs;
1336
1337    #[tokio::test]
1338    async fn todos_are_normalized_to_wire_shape() {
1339        let base = std::env::temp_dir().join(format!("tandem-core-test-{}", Uuid::new_v4()));
1340        let storage = Storage::new(&base).await.expect("storage");
1341        let session = Session::new(Some("test".to_string()), Some(".".to_string()));
1342        let id = session.id.clone();
1343        storage.save_session(session).await.expect("save session");
1344
1345        storage
1346            .set_todos(
1347                &id,
1348                vec![
1349                    json!({"content":"first"}),
1350                    json!({"text":"second", "status":"in_progress"}),
1351                    json!({"id":"keep-id","content":"third","status":"completed"}),
1352                ],
1353            )
1354            .await
1355            .expect("set todos");
1356
1357        let todos = storage.get_todos(&id).await;
1358        assert_eq!(todos.len(), 3);
1359        for todo in todos {
1360            assert!(todo.get("id").and_then(|v| v.as_str()).is_some());
1361            assert!(todo.get("content").and_then(|v| v.as_str()).is_some());
1362            assert!(todo.get("status").and_then(|v| v.as_str()).is_some());
1363        }
1364    }
1365
1366    #[tokio::test]
1367    async fn imports_legacy_opencode_session_index_when_sessions_json_missing() {
1368        let base =
1369            std::env::temp_dir().join(format!("tandem-core-legacy-import-{}", Uuid::new_v4()));
1370        let legacy_session_dir = base.join("session").join("global");
1371        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1372        stdfs::write(
1373            legacy_session_dir.join("ses_test.json"),
1374            r#"{
1375  "id": "ses_test",
1376  "slug": "test",
1377  "version": "1.0.0",
1378  "projectID": "proj_1",
1379  "directory": "C:\\work\\demo",
1380  "title": "Legacy Session",
1381  "time": { "created": 1770913145613, "updated": 1770913146613 }
1382}"#,
1383        )
1384        .expect("legacy session write");
1385
1386        let storage = Storage::new(&base).await.expect("storage");
1387        let sessions = storage.list_sessions().await;
1388        assert_eq!(sessions.len(), 1);
1389        assert_eq!(sessions[0].id, "ses_test");
1390        assert_eq!(sessions[0].title, "Legacy Session");
1391        assert!(base.join("sessions.json").exists());
1392    }
1393
1394    #[tokio::test]
1395    async fn imports_legacy_messages_and_parts_for_session() {
1396        let base = std::env::temp_dir().join(format!("tandem-core-legacy-msg-{}", Uuid::new_v4()));
1397        let session_dir = base.join("session").join("global");
1398        let message_dir = base.join("message").join("ses_test");
1399        let part_dir = base.join("part").join("msg_1");
1400        stdfs::create_dir_all(&session_dir).expect("session dir");
1401        stdfs::create_dir_all(&message_dir).expect("message dir");
1402        stdfs::create_dir_all(&part_dir).expect("part dir");
1403
1404        stdfs::write(
1405            session_dir.join("ses_test.json"),
1406            r#"{
1407  "id": "ses_test",
1408  "projectID": "proj_1",
1409  "directory": "C:\\work\\demo",
1410  "title": "Legacy Session",
1411  "time": { "created": 1770913145613, "updated": 1770913146613 }
1412}"#,
1413        )
1414        .expect("write session");
1415
1416        stdfs::write(
1417            message_dir.join("msg_1.json"),
1418            r#"{
1419  "id": "msg_1",
1420  "sessionID": "ses_test",
1421  "role": "assistant",
1422  "time": { "created": 1770913145613 }
1423}"#,
1424        )
1425        .expect("write msg");
1426
1427        stdfs::write(
1428            part_dir.join("prt_1.json"),
1429            r#"{
1430  "id": "prt_1",
1431  "sessionID": "ses_test",
1432  "messageID": "msg_1",
1433  "type": "text",
1434  "text": "hello from legacy"
1435}"#,
1436        )
1437        .expect("write part");
1438
1439        let storage = Storage::new(&base).await.expect("storage");
1440        let sessions = storage.list_sessions().await;
1441        assert_eq!(sessions.len(), 1);
1442        assert_eq!(sessions[0].messages.len(), 1);
1443        assert_eq!(sessions[0].messages[0].parts.len(), 1);
1444    }
1445
1446    #[tokio::test]
1447    async fn skips_legacy_merge_when_sessions_json_exists() {
1448        let base =
1449            std::env::temp_dir().join(format!("tandem-core-legacy-merge-{}", Uuid::new_v4()));
1450        stdfs::create_dir_all(&base).expect("base");
1451        stdfs::write(
1452            base.join("sessions.json"),
1453            r#"{
1454  "ses_current": {
1455    "id": "ses_current",
1456    "slug": null,
1457    "version": "v1",
1458    "project_id": null,
1459    "title": "Current Session",
1460    "directory": ".",
1461    "workspace_root": null,
1462    "origin_workspace_root": null,
1463    "attached_from_workspace": null,
1464    "attached_to_workspace": null,
1465    "attach_timestamp_ms": null,
1466    "attach_reason": null,
1467    "time": {"created":"2026-01-01T00:00:00Z","updated":"2026-01-01T00:00:00Z"},
1468    "model": null,
1469    "provider": null,
1470    "messages": []
1471  }
1472}"#,
1473        )
1474        .expect("sessions.json");
1475
1476        let legacy_session_dir = base.join("session").join("global");
1477        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1478        stdfs::write(
1479            legacy_session_dir.join("ses_legacy.json"),
1480            r#"{
1481  "id": "ses_legacy",
1482  "slug": "legacy",
1483  "version": "1.0.0",
1484  "projectID": "proj_legacy",
1485  "directory": "C:\\work\\legacy",
1486  "title": "Legacy Session",
1487  "time": { "created": 1770913145613, "updated": 1770913146613 }
1488}"#,
1489        )
1490        .expect("legacy session write");
1491
1492        let storage = Storage::new(&base).await.expect("storage");
1493        let sessions = storage.list_sessions().await;
1494        let ids = sessions.iter().map(|s| s.id.clone()).collect::<Vec<_>>();
1495        assert!(ids.contains(&"ses_current".to_string()));
1496        assert!(!ids.contains(&"ses_legacy".to_string()));
1497    }
1498
1499    #[tokio::test]
1500    async fn list_sessions_scoped_filters_by_workspace_root() {
1501        let base = std::env::temp_dir().join(format!("tandem-core-scope-{}", Uuid::new_v4()));
1502        let storage = Storage::new(&base).await.expect("storage");
1503        let ws_a = base.join("ws-a");
1504        let ws_b = base.join("ws-b");
1505        stdfs::create_dir_all(&ws_a).expect("ws_a");
1506        stdfs::create_dir_all(&ws_b).expect("ws_b");
1507        let ws_a_str = ws_a.to_string_lossy().to_string();
1508        let ws_b_str = ws_b.to_string_lossy().to_string();
1509
1510        let mut a = Session::new(Some("a".to_string()), Some(ws_a_str.clone()));
1511        a.workspace_root = Some(ws_a_str.clone());
1512        storage.save_session(a).await.expect("save a");
1513
1514        let mut b = Session::new(Some("b".to_string()), Some(ws_b_str.clone()));
1515        b.workspace_root = Some(ws_b_str);
1516        storage.save_session(b).await.expect("save b");
1517
1518        let scoped = storage
1519            .list_sessions_scoped(SessionListScope::Workspace {
1520                workspace_root: ws_a_str,
1521            })
1522            .await;
1523        assert_eq!(scoped.len(), 1);
1524        assert_eq!(scoped[0].title, "a");
1525    }
1526
1527    #[tokio::test]
1528    async fn attach_session_persists_audit_metadata() {
1529        let base = std::env::temp_dir().join(format!("tandem-core-attach-{}", Uuid::new_v4()));
1530        let storage = Storage::new(&base).await.expect("storage");
1531        let ws_a = base.join("ws-a");
1532        let ws_b = base.join("ws-b");
1533        stdfs::create_dir_all(&ws_a).expect("ws_a");
1534        stdfs::create_dir_all(&ws_b).expect("ws_b");
1535        let ws_a_str = ws_a.to_string_lossy().to_string();
1536        let ws_b_str = ws_b.to_string_lossy().to_string();
1537        let mut session = Session::new(Some("s".to_string()), Some(ws_a_str.clone()));
1538        session.workspace_root = Some(ws_a_str);
1539        let id = session.id.clone();
1540        storage.save_session(session).await.expect("save");
1541
1542        let updated = storage
1543            .attach_session_to_workspace(&id, &ws_b_str, "manual")
1544            .await
1545            .expect("attach")
1546            .expect("session exists");
1547        let normalized_expected = normalize_workspace_path(&ws_b_str).expect("normalized path");
1548        assert_eq!(
1549            updated.workspace_root.as_deref(),
1550            Some(normalized_expected.as_str())
1551        );
1552        assert_eq!(
1553            updated.attached_to_workspace.as_deref(),
1554            Some(normalized_expected.as_str())
1555        );
1556        assert_eq!(updated.attach_reason.as_deref(), Some("manual"));
1557        assert!(updated.attach_timestamp_ms.is_some());
1558    }
1559
1560    #[tokio::test]
1561    async fn append_message_part_persists_tool_invocation_and_result() {
1562        let base = std::env::temp_dir().join(format!("tandem-core-tool-parts-{}", Uuid::new_v4()));
1563        let storage = Storage::new(&base).await.expect("storage");
1564        let session = Session::new(Some("tool parts".to_string()), Some(".".to_string()));
1565        let session_id = session.id.clone();
1566        storage.save_session(session).await.expect("save session");
1567
1568        let user = Message::new(
1569            MessageRole::User,
1570            vec![MessagePart::Text {
1571                text: "build ui".to_string(),
1572            }],
1573        );
1574        let message_id = user.id.clone();
1575        storage
1576            .append_message(&session_id, user)
1577            .await
1578            .expect("append user");
1579
1580        storage
1581            .append_message_part(
1582                &session_id,
1583                &message_id,
1584                MessagePart::ToolInvocation {
1585                    tool: "write".to_string(),
1586                    args: json!({"path":"game.html","content":"<html></html>"}),
1587                    result: None,
1588                    error: None,
1589                },
1590            )
1591            .await
1592            .expect("append invocation");
1593        storage
1594            .append_message_part(
1595                &session_id,
1596                &message_id,
1597                MessagePart::ToolInvocation {
1598                    tool: "write".to_string(),
1599                    args: json!({}),
1600                    result: Some(json!("ok")),
1601                    error: None,
1602                },
1603            )
1604            .await
1605            .expect("append result");
1606
1607        let session = storage.get_session(&session_id).await.expect("session");
1608        let message = session
1609            .messages
1610            .iter()
1611            .find(|message| message.id == message_id)
1612            .expect("message");
1613        assert_eq!(message.parts.len(), 2);
1614        match &message.parts[1] {
1615            MessagePart::ToolInvocation {
1616                tool,
1617                result,
1618                error,
1619                ..
1620            } => {
1621                assert_eq!(tool, "write");
1622                assert_eq!(result.as_ref(), Some(&json!("ok")));
1623                assert_eq!(error.as_deref(), None);
1624            }
1625            other => panic!("expected tool part, got {other:?}"),
1626        }
1627    }
1628
1629    #[tokio::test]
1630    async fn append_message_part_retains_failed_tool_error() {
1631        let base = std::env::temp_dir().join(format!("tandem-core-tool-error-{}", Uuid::new_v4()));
1632        let storage = Storage::new(&base).await.expect("storage");
1633        let session = Session::new(Some("tool errors".to_string()), Some(".".to_string()));
1634        let session_id = session.id.clone();
1635        storage.save_session(session).await.expect("save session");
1636
1637        let user = Message::new(
1638            MessageRole::User,
1639            vec![MessagePart::Text {
1640                text: "write file".to_string(),
1641            }],
1642        );
1643        let message_id = user.id.clone();
1644        storage
1645            .append_message(&session_id, user)
1646            .await
1647            .expect("append user");
1648
1649        storage
1650            .append_message_part(
1651                &session_id,
1652                &message_id,
1653                MessagePart::ToolInvocation {
1654                    tool: "write".to_string(),
1655                    args: json!({"path":"game.html"}),
1656                    result: None,
1657                    error: None,
1658                },
1659            )
1660            .await
1661            .expect("append invocation");
1662        storage
1663            .append_message_part(
1664                &session_id,
1665                &message_id,
1666                MessagePart::ToolInvocation {
1667                    tool: "write".to_string(),
1668                    args: json!({}),
1669                    result: None,
1670                    error: Some("WRITE_CONTENT_MISSING".to_string()),
1671                },
1672            )
1673            .await
1674            .expect("append error");
1675
1676        let session = storage.get_session(&session_id).await.expect("session");
1677        let message = session
1678            .messages
1679            .iter()
1680            .find(|message| message.id == message_id)
1681            .expect("message");
1682        match &message.parts[1] {
1683            MessagePart::ToolInvocation { error, .. } => {
1684                assert_eq!(error.as_deref(), Some("WRITE_CONTENT_MISSING"));
1685            }
1686            other => panic!("expected tool part, got {other:?}"),
1687        }
1688    }
1689
1690    #[tokio::test]
1691    async fn append_message_part_coalesces_repeated_tool_invocation_updates() {
1692        let base = std::env::temp_dir().join(format!("tandem-core-tool-merge-{}", Uuid::new_v4()));
1693        let storage = Storage::new(&base).await.expect("storage");
1694        let session = Session::new(Some("tool merge".to_string()), Some(".".to_string()));
1695        let session_id = session.id.clone();
1696        storage.save_session(session).await.expect("save session");
1697
1698        let user = Message::new(
1699            MessageRole::User,
1700            vec![MessagePart::Text {
1701                text: "build ui".to_string(),
1702            }],
1703        );
1704        let message_id = user.id.clone();
1705        storage
1706            .append_message(&session_id, user)
1707            .await
1708            .expect("append user");
1709
1710        storage
1711            .append_message_part(
1712                &session_id,
1713                &message_id,
1714                MessagePart::ToolInvocation {
1715                    tool: "write".to_string(),
1716                    args: json!({"path":"game.html"}),
1717                    result: None,
1718                    error: None,
1719                },
1720            )
1721            .await
1722            .expect("append first invocation");
1723        storage
1724            .append_message_part(
1725                &session_id,
1726                &message_id,
1727                MessagePart::ToolInvocation {
1728                    tool: "write".to_string(),
1729                    args: json!({"path":"game.html","content":"<html></html>"}),
1730                    result: None,
1731                    error: None,
1732                },
1733            )
1734            .await
1735            .expect("append updated invocation");
1736
1737        let session = storage.get_session(&session_id).await.expect("session");
1738        let message = session
1739            .messages
1740            .iter()
1741            .find(|message| message.id == message_id)
1742            .expect("message");
1743        assert_eq!(message.parts.len(), 2);
1744        match &message.parts[1] {
1745            MessagePart::ToolInvocation { tool, args, .. } => {
1746                assert_eq!(tool, "write");
1747                assert_eq!(args["path"], "game.html");
1748                assert_eq!(args["content"], "<html></html>");
1749            }
1750            other => panic!("expected tool part, got {other:?}"),
1751        }
1752    }
1753
1754    #[tokio::test]
1755    async fn append_message_part_falls_back_to_latest_user_message_when_id_missing() {
1756        let base =
1757            std::env::temp_dir().join(format!("tandem-core-tool-fallback-{}", Uuid::new_v4()));
1758        let storage = Storage::new(&base).await.expect("storage");
1759        let session = Session::new(Some("tool fallback".to_string()), Some(".".to_string()));
1760        let session_id = session.id.clone();
1761        storage.save_session(session).await.expect("save session");
1762
1763        let first = Message::new(
1764            MessageRole::User,
1765            vec![MessagePart::Text {
1766                text: "first prompt".to_string(),
1767            }],
1768        );
1769        let second = Message::new(
1770            MessageRole::User,
1771            vec![MessagePart::Text {
1772                text: "second prompt".to_string(),
1773            }],
1774        );
1775        let second_id = second.id.clone();
1776        storage
1777            .append_message(&session_id, first)
1778            .await
1779            .expect("append first");
1780        storage
1781            .append_message(&session_id, second)
1782            .await
1783            .expect("append second");
1784
1785        storage
1786            .append_message_part(
1787                &session_id,
1788                "missing-message-id",
1789                MessagePart::ToolInvocation {
1790                    tool: "glob".to_string(),
1791                    args: json!({"pattern":"*"}),
1792                    result: Some(json!(["README.md"])),
1793                    error: None,
1794                },
1795            )
1796            .await
1797            .expect("append fallback tool part");
1798
1799        let session = storage.get_session(&session_id).await.expect("session");
1800        let message = session
1801            .messages
1802            .iter()
1803            .find(|message| message.id == second_id)
1804            .expect("latest user message");
1805        match &message.parts[1] {
1806            MessagePart::ToolInvocation { tool, result, .. } => {
1807                assert_eq!(tool, "glob");
1808                assert_eq!(result.as_ref(), Some(&json!(["README.md"])));
1809            }
1810            other => panic!("expected tool part, got {other:?}"),
1811        }
1812    }
1813
1814    #[tokio::test]
1815    async fn startup_repairs_placeholder_titles_from_wrapped_user_messages() {
1816        let base =
1817            std::env::temp_dir().join(format!("tandem-core-title-repair-{}", Uuid::new_v4()));
1818        let storage = Storage::new(&base).await.expect("storage");
1819        let wrapped = "<memory_context>\n<current_session>\n- fact\n</current_session>\n</memory_context>\n\n[Mode instructions]\nUse tools.\n\n[User request]\nExplain this bug";
1820        let mut session = Session::new(Some("<memory_context>".to_string()), Some(".".to_string()));
1821        let id = session.id.clone();
1822        session.messages.push(Message::new(
1823            MessageRole::User,
1824            vec![MessagePart::Text {
1825                text: wrapped.to_string(),
1826            }],
1827        ));
1828        storage.save_session(session).await.expect("save");
1829        drop(storage);
1830
1831        let storage = Storage::new(&base).await.expect("storage");
1832        let repaired = storage.get_session(&id).await.expect("session");
1833        assert_eq!(repaired.title, "Explain this bug");
1834    }
1835}