Skip to main content

tandem_core/
storage.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use anyhow::Context;
5use chrono::{TimeZone, Utc};
6use serde::{Deserialize, Serialize};
7use serde_json::{json, Value};
8use tokio::fs;
9use tokio::sync::RwLock;
10use tokio::task;
11use uuid::Uuid;
12
13use tandem_types::{Message, MessagePart, MessageRole, Session};
14
15use crate::{derive_session_title_from_prompt, normalize_workspace_path, title_needs_repair};
16
17#[derive(Debug, Clone, Serialize, Deserialize, Default)]
18pub struct SessionMeta {
19    pub parent_id: Option<String>,
20    #[serde(default)]
21    pub archived: bool,
22    #[serde(default)]
23    pub shared: bool,
24    pub share_id: Option<String>,
25    pub summary: Option<String>,
26    #[serde(default)]
27    pub snapshots: Vec<Vec<Message>>,
28    pub pre_revert: Option<Vec<Message>>,
29    #[serde(default)]
30    pub todos: Vec<Value>,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct QuestionToolRef {
35    #[serde(rename = "callID")]
36    pub call_id: String,
37    #[serde(rename = "messageID")]
38    pub message_id: String,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct QuestionRequest {
43    pub id: String,
44    #[serde(rename = "sessionID")]
45    pub session_id: String,
46    #[serde(default)]
47    pub questions: Vec<Value>,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub tool: Option<QuestionToolRef>,
50}
51
52pub struct Storage {
53    base: PathBuf,
54    sessions: RwLock<HashMap<String, Session>>,
55    metadata: RwLock<HashMap<String, SessionMeta>>,
56    question_requests: RwLock<HashMap<String, QuestionRequest>>,
57}
58
/// Selects which sessions a listing call returns.
#[derive(Debug, Clone)]
pub enum SessionListScope {
    /// Every stored session.
    Global,
    /// Only sessions whose workspace root (or directory) normalizes to `workspace_root`.
    Workspace { workspace_root: String },
}
64
65#[derive(Debug, Clone, Default, Serialize, Deserialize)]
66pub struct SessionRepairStats {
67    pub sessions_repaired: u64,
68    pub messages_recovered: u64,
69    pub parts_recovered: u64,
70    pub conflicts_merged: u64,
71}
72
/// File name, relative to the storage base directory, of the legacy import marker.
const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
/// Version number written into newly created legacy import markers.
const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
75
76#[derive(Debug, Clone, Default, Serialize, Deserialize)]
77pub struct LegacyTreeCounts {
78    pub session_files: u64,
79    pub message_files: u64,
80    pub part_files: u64,
81}
82
83#[derive(Debug, Clone, Default, Serialize, Deserialize)]
84pub struct LegacyImportedCounts {
85    pub sessions: u64,
86    pub messages: u64,
87    pub parts: u64,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct LegacyImportMarker {
92    pub version: u32,
93    pub created_at_ms: u64,
94    pub last_checked_at_ms: u64,
95    pub legacy_counts: LegacyTreeCounts,
96    pub imported_counts: LegacyImportedCounts,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct LegacyRepairRunReport {
101    pub status: String,
102    pub marker_updated: bool,
103    pub sessions_merged: u64,
104    pub messages_recovered: u64,
105    pub parts_recovered: u64,
106    pub legacy_counts: LegacyTreeCounts,
107    pub imported_counts: LegacyImportedCounts,
108}
109
110fn snapshot_session_messages(
111    session_id: &str,
112    session: &Session,
113    metadata: &mut HashMap<String, SessionMeta>,
114) {
115    let meta = metadata
116        .entry(session_id.to_string())
117        .or_insert_with(SessionMeta::default);
118    meta.snapshots.push(session.messages.clone());
119    if meta.snapshots.len() > 25 {
120        let _ = meta.snapshots.remove(0);
121    }
122}
123
124fn merge_message_part(message: &mut Message, part: MessagePart) {
125    match part {
126        MessagePart::ToolInvocation {
127            tool,
128            args,
129            result,
130            error,
131        } => {
132            let args_are_empty =
133                args.is_null() || args.as_object().is_some_and(|value| value.is_empty());
134            if result.is_none() && error.is_none() {
135                if let Some(existing) = message.parts.iter_mut().rev().find(|existing| {
136                    matches!(
137                        existing,
138                        MessagePart::ToolInvocation {
139                            tool: existing_tool,
140                            result: None,
141                            error: None,
142                            ..
143                        } if existing_tool == &tool
144                    )
145                }) {
146                    if let MessagePart::ToolInvocation {
147                        args: existing_args,
148                        ..
149                    } = existing
150                    {
151                        let existing_args_are_empty = existing_args.is_null()
152                            || existing_args
153                                .as_object()
154                                .is_some_and(|value| value.is_empty());
155                        if !args_are_empty || existing_args_are_empty {
156                            *existing_args = args;
157                        }
158                        return;
159                    }
160                }
161            }
162            if result.is_some() || error.is_some() {
163                if let Some(existing) = message.parts.iter_mut().rev().find(|existing| {
164                    matches!(
165                        existing,
166                        MessagePart::ToolInvocation {
167                            tool: existing_tool,
168                            result: None,
169                            error: None,
170                            ..
171                        } if existing_tool == &tool
172                    )
173                }) {
174                    if let MessagePart::ToolInvocation {
175                        args: existing_args,
176                        result: existing_result,
177                        error: existing_error,
178                        ..
179                    } = existing
180                    {
181                        let existing_args_are_empty = existing_args.is_null()
182                            || existing_args
183                                .as_object()
184                                .is_some_and(|value| value.is_empty());
185                        if existing_args_are_empty {
186                            if tool == "write" && args_are_empty {
187                                tracing::info!(
188                                    tool = %tool,
189                                    "merging write result/error into existing tool part with empty args"
190                                );
191                            }
192                            *existing_args = args.clone();
193                        }
194                        *existing_result = result;
195                        *existing_error = error;
196                        return;
197                    }
198                }
199            }
200            message.parts.push(MessagePart::ToolInvocation {
201                tool,
202                args,
203                result,
204                error,
205            });
206        }
207        other => message.parts.push(other),
208    }
209}
210
211impl Storage {
212    pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
213        let base = base.as_ref().to_path_buf();
214        fs::create_dir_all(&base).await?;
215        let sessions_file = base.join("sessions.json");
216        let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
217        let sessions_file_exists = sessions_file.exists();
218        let mut imported_legacy_sessions = false;
219        let mut sessions = if sessions_file_exists {
220            let raw = fs::read_to_string(&sessions_file).await?;
221            serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
222        } else {
223            HashMap::new()
224        };
225
226        let mut marker_to_write = None;
227        if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
228            let base_for_scan = base.clone();
229            let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
230                .await
231                .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
232            if merge_legacy_sessions(&mut sessions, scan.sessions) {
233                imported_legacy_sessions = true;
234            }
235            marker_to_write = Some(LegacyImportMarker {
236                version: LEGACY_IMPORT_MARKER_VERSION,
237                created_at_ms: now_ms_u64(),
238                last_checked_at_ms: now_ms_u64(),
239                legacy_counts: scan.legacy_counts,
240                imported_counts: scan.imported_counts,
241            });
242        }
243
244        if hydrate_workspace_roots(&mut sessions) {
245            imported_legacy_sessions = true;
246        }
247        if repair_session_titles(&mut sessions) {
248            imported_legacy_sessions = true;
249        }
250        let metadata_file = base.join("session_meta.json");
251        let metadata = if metadata_file.exists() {
252            let raw = fs::read_to_string(&metadata_file).await?;
253            serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
254        } else {
255            HashMap::new()
256        };
257        let questions_file = base.join("questions.json");
258        let question_requests = if questions_file.exists() {
259            let raw = fs::read_to_string(&questions_file).await?;
260            serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
261        } else {
262            HashMap::new()
263        };
264        let storage = Self {
265            base,
266            sessions: RwLock::new(sessions),
267            metadata: RwLock::new(metadata),
268            question_requests: RwLock::new(question_requests),
269        };
270
271        if imported_legacy_sessions {
272            storage.flush().await?;
273        }
274        if let Some(marker) = marker_to_write {
275            storage.write_legacy_import_marker(&marker).await?;
276        }
277
278        Ok(storage)
279    }
280
281    pub async fn list_sessions(&self) -> Vec<Session> {
282        self.list_sessions_scoped(SessionListScope::Global).await
283    }
284
285    pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
286        let all = self
287            .sessions
288            .read()
289            .await
290            .values()
291            .cloned()
292            .collect::<Vec<_>>();
293        match scope {
294            SessionListScope::Global => all,
295            SessionListScope::Workspace { workspace_root } => {
296                let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
297                    return Vec::new();
298                };
299                all.into_iter()
300                    .filter(|session| {
301                        let direct = session
302                            .workspace_root
303                            .as_ref()
304                            .and_then(|p| normalize_workspace_path(p))
305                            .map(|p| p == normalized_workspace)
306                            .unwrap_or(false);
307                        if direct {
308                            return true;
309                        }
310                        normalize_workspace_path(&session.directory)
311                            .map(|p| p == normalized_workspace)
312                            .unwrap_or(false)
313                    })
314                    .collect()
315            }
316        }
317    }
318
319    pub async fn get_session(&self, id: &str) -> Option<Session> {
320        self.sessions.read().await.get(id).cloned()
321    }
322
323    pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
324        if session.workspace_root.is_none() {
325            session.workspace_root = normalize_workspace_path(&session.directory);
326        }
327        let session_id = session.id.clone();
328        self.sessions
329            .write()
330            .await
331            .insert(session_id.clone(), session);
332        self.metadata
333            .write()
334            .await
335            .entry(session_id)
336            .or_insert_with(SessionMeta::default);
337        self.flush().await
338    }
339
340    pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
341        let mut stats = SessionRepairStats::default();
342        let mut sessions = self.sessions.write().await;
343
344        for session in sessions.values_mut() {
345            let imported = load_legacy_session_messages(&self.base, &session.id);
346            if imported.is_empty() {
347                continue;
348            }
349
350            let (merged, merge_stats, changed) =
351                merge_session_messages(&session.messages, &imported);
352            if changed {
353                session.messages = merged;
354                session.time.updated =
355                    most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
356                stats.sessions_repaired += 1;
357                stats.messages_recovered += merge_stats.messages_recovered;
358                stats.parts_recovered += merge_stats.parts_recovered;
359                stats.conflicts_merged += merge_stats.conflicts_merged;
360            }
361        }
362
363        if stats.sessions_repaired > 0 {
364            drop(sessions);
365            self.flush().await?;
366        }
367
368        Ok(stats)
369    }
370
371    pub async fn run_legacy_storage_repair_scan(
372        &self,
373        force: bool,
374    ) -> anyhow::Result<LegacyRepairRunReport> {
375        let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
376        let sessions_exists = self.base.join("sessions.json").exists();
377        let should_scan = if force {
378            true
379        } else {
380            should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
381        };
382        if !should_scan {
383            let marker = read_legacy_import_marker(&marker_path)
384                .await
385                .unwrap_or_else(|| LegacyImportMarker {
386                    version: LEGACY_IMPORT_MARKER_VERSION,
387                    created_at_ms: now_ms_u64(),
388                    last_checked_at_ms: now_ms_u64(),
389                    legacy_counts: LegacyTreeCounts::default(),
390                    imported_counts: LegacyImportedCounts::default(),
391                });
392            return Ok(LegacyRepairRunReport {
393                status: "skipped".to_string(),
394                marker_updated: false,
395                sessions_merged: 0,
396                messages_recovered: 0,
397                parts_recovered: 0,
398                legacy_counts: marker.legacy_counts,
399                imported_counts: marker.imported_counts,
400            });
401        }
402
403        let base_for_scan = self.base.clone();
404        let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
405            .await
406            .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
407
408        let merge_stats = {
409            let mut sessions = self.sessions.write().await;
410            merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
411        };
412
413        if merge_stats.changed {
414            self.flush().await?;
415        }
416
417        let marker = LegacyImportMarker {
418            version: LEGACY_IMPORT_MARKER_VERSION,
419            created_at_ms: now_ms_u64(),
420            last_checked_at_ms: now_ms_u64(),
421            legacy_counts: scan.legacy_counts.clone(),
422            imported_counts: scan.imported_counts.clone(),
423        };
424        self.write_legacy_import_marker(&marker).await?;
425
426        Ok(LegacyRepairRunReport {
427            status: if merge_stats.changed {
428                "updated".to_string()
429            } else {
430                "no_changes".to_string()
431            },
432            marker_updated: true,
433            sessions_merged: merge_stats.sessions_merged,
434            messages_recovered: merge_stats.messages_recovered,
435            parts_recovered: merge_stats.parts_recovered,
436            legacy_counts: scan.legacy_counts,
437            imported_counts: scan.imported_counts,
438        })
439    }
440
441    pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
442        let removed = self.sessions.write().await.remove(id).is_some();
443        self.metadata.write().await.remove(id);
444        self.question_requests
445            .write()
446            .await
447            .retain(|_, request| request.session_id != id);
448        if removed {
449            self.flush().await?;
450        }
451        Ok(removed)
452    }
453
454    pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
455        let mut sessions = self.sessions.write().await;
456        let session = sessions
457            .get_mut(session_id)
458            .context("session not found for append_message")?;
459        let mut meta_guard = self.metadata.write().await;
460        snapshot_session_messages(session_id, session, &mut meta_guard);
461        session.messages.push(msg);
462        session.time.updated = Utc::now();
463        drop(sessions);
464        drop(meta_guard);
465        self.flush().await
466    }
467
468    pub async fn append_message_part(
469        &self,
470        session_id: &str,
471        message_id: &str,
472        part: MessagePart,
473    ) -> anyhow::Result<()> {
474        let mut sessions = self.sessions.write().await;
475        let session = sessions
476            .get_mut(session_id)
477            .context("session not found for append_message_part")?;
478        let mut meta_guard = self.metadata.write().await;
479        snapshot_session_messages(session_id, session, &mut meta_guard);
480        let message = if let Some(message) = session
481            .messages
482            .iter_mut()
483            .find(|message| message.id == message_id)
484        {
485            message
486        } else {
487            session
488                .messages
489                .iter_mut()
490                .rev()
491                .find(|message| matches!(message.role, MessageRole::User))
492                .context("message not found for append_message_part")?
493        };
494        merge_message_part(message, part);
495        session.time.updated = Utc::now();
496        drop(sessions);
497        drop(meta_guard);
498        self.flush().await
499    }
500
501    pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
502        let source = {
503            let sessions = self.sessions.read().await;
504            sessions.get(id).cloned()
505        };
506        let Some(mut child) = source else {
507            return Ok(None);
508        };
509
510        child.id = Uuid::new_v4().to_string();
511        child.title = format!("{} (fork)", child.title);
512        child.time.created = Utc::now();
513        child.time.updated = child.time.created;
514        child.slug = None;
515
516        self.sessions
517            .write()
518            .await
519            .insert(child.id.clone(), child.clone());
520        self.metadata.write().await.insert(
521            child.id.clone(),
522            SessionMeta {
523                parent_id: Some(id.to_string()),
524                snapshots: vec![child.messages.clone()],
525                ..SessionMeta::default()
526            },
527        );
528        self.flush().await?;
529        Ok(Some(child))
530    }
531
532    pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
533        let mut sessions = self.sessions.write().await;
534        let Some(session) = sessions.get_mut(id) else {
535            return Ok(false);
536        };
537        let mut metadata = self.metadata.write().await;
538        let meta = metadata
539            .entry(id.to_string())
540            .or_insert_with(SessionMeta::default);
541        let Some(snapshot) = meta.snapshots.pop() else {
542            return Ok(false);
543        };
544        meta.pre_revert = Some(session.messages.clone());
545        session.messages = snapshot;
546        session.time.updated = Utc::now();
547        drop(metadata);
548        drop(sessions);
549        self.flush().await?;
550        Ok(true)
551    }
552
553    pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
554        let mut sessions = self.sessions.write().await;
555        let Some(session) = sessions.get_mut(id) else {
556            return Ok(false);
557        };
558        let mut metadata = self.metadata.write().await;
559        let Some(meta) = metadata.get_mut(id) else {
560            return Ok(false);
561        };
562        let Some(previous) = meta.pre_revert.take() else {
563            return Ok(false);
564        };
565        meta.snapshots.push(session.messages.clone());
566        session.messages = previous;
567        session.time.updated = Utc::now();
568        drop(metadata);
569        drop(sessions);
570        self.flush().await?;
571        Ok(true)
572    }
573
574    pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
575        let mut metadata = self.metadata.write().await;
576        let meta = metadata
577            .entry(id.to_string())
578            .or_insert_with(SessionMeta::default);
579        meta.shared = shared;
580        if shared {
581            if meta.share_id.is_none() {
582                meta.share_id = Some(Uuid::new_v4().to_string());
583            }
584        } else {
585            meta.share_id = None;
586        }
587        let share_id = meta.share_id.clone();
588        drop(metadata);
589        self.flush().await?;
590        Ok(share_id)
591    }
592
593    pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
594        let mut metadata = self.metadata.write().await;
595        let meta = metadata
596            .entry(id.to_string())
597            .or_insert_with(SessionMeta::default);
598        meta.archived = archived;
599        drop(metadata);
600        self.flush().await?;
601        Ok(true)
602    }
603
604    pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
605        let mut metadata = self.metadata.write().await;
606        let meta = metadata
607            .entry(id.to_string())
608            .or_insert_with(SessionMeta::default);
609        meta.summary = Some(summary);
610        drop(metadata);
611        self.flush().await?;
612        Ok(true)
613    }
614
615    pub async fn children(&self, parent_id: &str) -> Vec<Session> {
616        let child_ids = {
617            let metadata = self.metadata.read().await;
618            metadata
619                .iter()
620                .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
621                .map(|(id, _)| id.clone())
622                .collect::<Vec<_>>()
623        };
624        let sessions = self.sessions.read().await;
625        child_ids
626            .into_iter()
627            .filter_map(|id| sessions.get(&id).cloned())
628            .collect()
629    }
630
631    pub async fn session_status(&self, id: &str) -> Option<Value> {
632        let metadata = self.metadata.read().await;
633        metadata.get(id).map(|meta| {
634            json!({
635                "archived": meta.archived,
636                "shared": meta.shared,
637                "parentID": meta.parent_id,
638                "snapshotCount": meta.snapshots.len()
639            })
640        })
641    }
642
643    pub async fn session_diff(&self, id: &str) -> Option<Value> {
644        let sessions = self.sessions.read().await;
645        let current = sessions.get(id)?;
646        let metadata = self.metadata.read().await;
647        let default = SessionMeta::default();
648        let meta = metadata.get(id).unwrap_or(&default);
649        let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
650        Some(json!({
651            "sessionID": id,
652            "currentMessageCount": current.messages.len(),
653            "lastSnapshotMessageCount": last_snapshot_len,
654            "delta": current.messages.len() as i64 - last_snapshot_len as i64
655        }))
656    }
657
658    pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
659        let mut metadata = self.metadata.write().await;
660        let meta = metadata
661            .entry(id.to_string())
662            .or_insert_with(SessionMeta::default);
663        meta.todos = normalize_todo_items(todos);
664        drop(metadata);
665        self.flush().await
666    }
667
668    pub async fn get_todos(&self, id: &str) -> Vec<Value> {
669        let todos = self
670            .metadata
671            .read()
672            .await
673            .get(id)
674            .map(|meta| meta.todos.clone())
675            .unwrap_or_default();
676        normalize_todo_items(todos)
677    }
678
679    pub async fn add_question_request(
680        &self,
681        session_id: &str,
682        message_id: &str,
683        questions: Vec<Value>,
684    ) -> anyhow::Result<QuestionRequest> {
685        if questions.is_empty() {
686            return Err(anyhow::anyhow!(
687                "cannot add empty question request for session {}",
688                session_id
689            ));
690        }
691        let request = QuestionRequest {
692            id: format!("q-{}", Uuid::new_v4()),
693            session_id: session_id.to_string(),
694            questions,
695            tool: Some(QuestionToolRef {
696                call_id: format!("call-{}", Uuid::new_v4()),
697                message_id: message_id.to_string(),
698            }),
699        };
700        self.question_requests
701            .write()
702            .await
703            .insert(request.id.clone(), request.clone());
704        self.flush().await?;
705        Ok(request)
706    }
707
708    pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
709        self.question_requests
710            .read()
711            .await
712            .values()
713            .cloned()
714            .collect()
715    }
716
717    pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
718        let removed = self
719            .question_requests
720            .write()
721            .await
722            .remove(request_id)
723            .is_some();
724        if removed {
725            self.flush().await?;
726        }
727        Ok(removed)
728    }
729
730    pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
731        self.reply_question(request_id).await
732    }
733
734    pub async fn attach_session_to_workspace(
735        &self,
736        session_id: &str,
737        target_workspace: &str,
738        reason_tag: &str,
739    ) -> anyhow::Result<Option<Session>> {
740        let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
741            return Ok(None);
742        };
743        let mut sessions = self.sessions.write().await;
744        let Some(session) = sessions.get_mut(session_id) else {
745            return Ok(None);
746        };
747        let previous_workspace = session
748            .workspace_root
749            .clone()
750            .or_else(|| normalize_workspace_path(&session.directory));
751
752        if session.origin_workspace_root.is_none() {
753            session.origin_workspace_root = previous_workspace.clone();
754        }
755        session.attached_from_workspace = previous_workspace;
756        session.attached_to_workspace = Some(target_workspace.clone());
757        session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
758        session.attach_reason = Some(reason_tag.trim().to_string());
759        session.workspace_root = Some(target_workspace.clone());
760        session.directory = target_workspace;
761        session.time.updated = Utc::now();
762        let updated = session.clone();
763        drop(sessions);
764        self.flush().await?;
765        Ok(Some(updated))
766    }
767
768    async fn flush(&self) -> anyhow::Result<()> {
769        let snapshot = self.sessions.read().await.clone();
770        let payload = serde_json::to_string_pretty(&snapshot)?;
771        fs::write(self.base.join("sessions.json"), payload).await?;
772        let metadata_snapshot = self.metadata.read().await.clone();
773        let metadata_payload = serde_json::to_string_pretty(&metadata_snapshot)?;
774        fs::write(self.base.join("session_meta.json"), metadata_payload).await?;
775        let questions_snapshot = self.question_requests.read().await.clone();
776        let questions_payload = serde_json::to_string_pretty(&questions_snapshot)?;
777        fs::write(self.base.join("questions.json"), questions_payload).await?;
778        Ok(())
779    }
780
781    async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
782        let payload = serde_json::to_string_pretty(marker)?;
783        fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
784        Ok(())
785    }
786}
787
788fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
789    items
790        .into_iter()
791        .filter_map(|item| {
792            let obj = item.as_object()?;
793            let content = obj
794                .get("content")
795                .and_then(|v| v.as_str())
796                .or_else(|| obj.get("text").and_then(|v| v.as_str()))
797                .unwrap_or("")
798                .trim()
799                .to_string();
800            if content.is_empty() {
801                return None;
802            }
803            let id = obj
804                .get("id")
805                .and_then(|v| v.as_str())
806                .filter(|s| !s.trim().is_empty())
807                .map(ToString::to_string)
808                .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
809            let status = obj
810                .get("status")
811                .and_then(|v| v.as_str())
812                .filter(|s| !s.trim().is_empty())
813                .map(ToString::to_string)
814                .unwrap_or_else(|| "pending".to_string());
815            Some(json!({
816                "id": id,
817                "content": content,
818                "status": status
819            }))
820        })
821        .collect()
822}
823
824#[derive(Debug)]
825struct LegacyScanResult {
826    sessions: HashMap<String, Session>,
827    legacy_counts: LegacyTreeCounts,
828    imported_counts: LegacyImportedCounts,
829}
830
/// Result of merging scanned legacy sessions into the in-memory session map.
#[derive(Debug, Default)]
struct LegacyMergeStats {
    /// Whether the merge modified the session map.
    changed: bool,
    /// Number of sessions merged.
    sessions_merged: u64,
    /// Number of messages recovered.
    messages_recovered: u64,
    /// Number of message parts recovered.
    parts_recovered: u64,
}
838
839fn now_ms_u64() -> u64 {
840    Utc::now().timestamp_millis().max(0) as u64
841}
842
/// Decides whether the legacy session scan should run.
///
/// The scan runs only when no canonical sessions file exists yet. Once
/// sessions exist the scan is skipped so startup is not blocked on a deep
/// legacy tree walk; an explicit repair scan can be triggered later.
///
/// `marker_path` is accepted for call-site compatibility but is not
/// consulted: the marker's presence never changed the outcome, so reading and
/// parsing it here was wasted I/O.
async fn should_run_legacy_scan_on_startup(marker_path: &Path, sessions_exist: bool) -> bool {
    let _ = marker_path;
    !sessions_exist
}
854
855async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
856    let raw = fs::read_to_string(marker_path).await.ok()?;
857    serde_json::from_str::<LegacyImportMarker>(&raw).ok()
858}
859
860fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
861    let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
862    let imported_counts = LegacyImportedCounts {
863        sessions: sessions.len() as u64,
864        messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
865        parts: sessions
866            .values()
867            .flat_map(|s| s.messages.iter())
868            .map(|m| m.parts.len() as u64)
869            .sum(),
870    };
871    let legacy_counts = LegacyTreeCounts {
872        session_files: count_legacy_json_files(&base.join("session")),
873        message_files: count_legacy_json_files(&base.join("message")),
874        part_files: count_legacy_json_files(&base.join("part")),
875    };
876    Ok(LegacyScanResult {
877        sessions,
878        legacy_counts,
879        imported_counts,
880    })
881}
882
/// Counts the files with a `json` extension anywhere below `root`.
///
/// Returns `0` when `root` is not a directory. Directories that cannot be
/// listed and entries that cannot be inspected are skipped silently.
fn count_legacy_json_files(root: &Path) -> u64 {
    if !root.is_dir() {
        return 0;
    }

    let mut total = 0u64;
    let mut pending = vec![root.to_path_buf()];
    while let Some(current) = pending.pop() {
        let Ok(listing) = std::fs::read_dir(&current) else {
            continue;
        };
        for item in listing.flatten() {
            let item_path = item.path();
            let is_dir = matches!(item.file_type(), Ok(kind) if kind.is_dir());
            if is_dir {
                // Descend later; traversal order does not affect the count.
                pending.push(item_path);
            } else if item_path.extension().and_then(|ext| ext.to_str()) == Some("json") {
                total += 1;
            }
        }
    }
    total
}
905
906fn merge_legacy_sessions(
907    current: &mut HashMap<String, Session>,
908    imported: HashMap<String, Session>,
909) -> bool {
910    merge_legacy_sessions_with_stats(current, imported).changed
911}
912
913fn merge_legacy_sessions_with_stats(
914    current: &mut HashMap<String, Session>,
915    imported: HashMap<String, Session>,
916) -> LegacyMergeStats {
917    let mut stats = LegacyMergeStats::default();
918    for (id, legacy) in imported {
919        let legacy_message_count = legacy.messages.len() as u64;
920        let legacy_part_count = legacy
921            .messages
922            .iter()
923            .map(|m| m.parts.len() as u64)
924            .sum::<u64>();
925        match current.get_mut(&id) {
926            None => {
927                current.insert(id, legacy);
928                stats.changed = true;
929                stats.sessions_merged += 1;
930                stats.messages_recovered += legacy_message_count;
931                stats.parts_recovered += legacy_part_count;
932            }
933            Some(existing) => {
934                let should_merge_messages =
935                    existing.messages.is_empty() && !legacy.messages.is_empty();
936                let should_fill_title =
937                    existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
938                let should_fill_directory = (existing.directory.trim().is_empty()
939                    || existing.directory.trim() == "."
940                    || existing.directory.trim() == "./"
941                    || existing.directory.trim() == ".\\")
942                    && !legacy.directory.trim().is_empty();
943                let should_fill_workspace =
944                    existing.workspace_root.is_none() && legacy.workspace_root.is_some();
945                if should_merge_messages {
946                    existing.messages = legacy.messages.clone();
947                }
948                if should_fill_title {
949                    existing.title = legacy.title.clone();
950                }
951                if should_fill_directory {
952                    existing.directory = legacy.directory.clone();
953                }
954                if should_fill_workspace {
955                    existing.workspace_root = legacy.workspace_root.clone();
956                }
957                if should_merge_messages
958                    || should_fill_title
959                    || should_fill_directory
960                    || should_fill_workspace
961                {
962                    stats.changed = true;
963                    if should_merge_messages {
964                        stats.sessions_merged += 1;
965                        stats.messages_recovered += legacy_message_count;
966                        stats.parts_recovered += legacy_part_count;
967                    }
968                }
969            }
970        }
971    }
972    stats
973}
974
975fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
976    let mut changed = false;
977    for session in sessions.values_mut() {
978        if session.workspace_root.is_none() {
979            let normalized = normalize_workspace_path(&session.directory);
980            if normalized.is_some() {
981                session.workspace_root = normalized;
982                changed = true;
983            }
984        }
985    }
986    changed
987}
988
989fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
990    let mut changed = false;
991    for session in sessions.values_mut() {
992        if !title_needs_repair(&session.title) {
993            continue;
994        }
995        let first_user_text = session.messages.iter().find_map(|message| {
996            if !matches!(message.role, MessageRole::User) {
997                return None;
998            }
999            message.parts.iter().find_map(|part| match part {
1000                MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
1001                _ => None,
1002            })
1003        });
1004        let Some(source) = first_user_text else {
1005            continue;
1006        };
1007        let Some(derived) = derive_session_title_from_prompt(source, 60) else {
1008            continue;
1009        };
1010        if derived == session.title {
1011            continue;
1012        }
1013        session.title = derived;
1014        session.time.updated = Utc::now();
1015        changed = true;
1016    }
1017    changed
1018}
1019
1020#[derive(Debug, Deserialize)]
1021struct LegacySessionTime {
1022    created: i64,
1023    updated: i64,
1024}
1025
1026#[derive(Debug, Deserialize)]
1027struct LegacySession {
1028    id: String,
1029    slug: Option<String>,
1030    version: Option<String>,
1031    #[serde(rename = "projectID")]
1032    project_id: Option<String>,
1033    title: Option<String>,
1034    directory: Option<String>,
1035    time: LegacySessionTime,
1036}
1037
1038fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
1039    let legacy_root = base.join("session");
1040    if !legacy_root.is_dir() {
1041        return Ok(HashMap::new());
1042    }
1043
1044    let mut out = HashMap::new();
1045    let mut stack = vec![legacy_root];
1046    while let Some(dir) = stack.pop() {
1047        for entry in std::fs::read_dir(&dir)? {
1048            let entry = entry?;
1049            let path = entry.path();
1050            if entry.file_type()?.is_dir() {
1051                stack.push(path);
1052                continue;
1053            }
1054            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1055                continue;
1056            }
1057            let raw = match std::fs::read_to_string(&path) {
1058                Ok(v) => v,
1059                Err(_) => continue,
1060            };
1061            let legacy = match serde_json::from_str::<LegacySession>(&raw) {
1062                Ok(v) => v,
1063                Err(_) => continue,
1064            };
1065            let created = Utc
1066                .timestamp_millis_opt(legacy.time.created)
1067                .single()
1068                .unwrap_or_else(Utc::now);
1069            let updated = Utc
1070                .timestamp_millis_opt(legacy.time.updated)
1071                .single()
1072                .unwrap_or(created);
1073
1074            let session_id = legacy.id.clone();
1075            out.insert(
1076                session_id.clone(),
1077                Session {
1078                    id: session_id.clone(),
1079                    slug: legacy.slug,
1080                    version: legacy.version,
1081                    project_id: legacy.project_id,
1082                    title: legacy
1083                        .title
1084                        .filter(|s| !s.trim().is_empty())
1085                        .unwrap_or_else(|| "New session".to_string()),
1086                    directory: legacy
1087                        .directory
1088                        .clone()
1089                        .filter(|s| !s.trim().is_empty())
1090                        .unwrap_or_else(|| ".".to_string()),
1091                    workspace_root: legacy
1092                        .directory
1093                        .as_deref()
1094                        .and_then(normalize_workspace_path),
1095                    origin_workspace_root: None,
1096                    attached_from_workspace: None,
1097                    attached_to_workspace: None,
1098                    attach_timestamp_ms: None,
1099                    attach_reason: None,
1100                    time: tandem_types::SessionTime { created, updated },
1101                    model: None,
1102                    provider: None,
1103                    environment: None,
1104                    messages: load_legacy_session_messages(base, &session_id),
1105                },
1106            );
1107        }
1108    }
1109    Ok(out)
1110}
1111
1112#[derive(Debug, Deserialize)]
1113struct LegacyMessageTime {
1114    created: i64,
1115}
1116
1117#[derive(Debug, Deserialize)]
1118struct LegacyMessage {
1119    id: String,
1120    role: String,
1121    time: LegacyMessageTime,
1122}
1123
1124#[derive(Debug, Deserialize)]
1125struct LegacyPart {
1126    #[serde(rename = "type")]
1127    part_type: Option<String>,
1128    text: Option<String>,
1129    tool: Option<String>,
1130    args: Option<Value>,
1131    result: Option<Value>,
1132    error: Option<String>,
1133}
1134
1135fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
1136    let msg_dir = base.join("message").join(session_id);
1137    if !msg_dir.is_dir() {
1138        return Vec::new();
1139    }
1140
1141    let mut legacy_messages: Vec<(i64, Message)> = Vec::new();
1142
1143    let Ok(entries) = std::fs::read_dir(&msg_dir) else {
1144        return Vec::new();
1145    };
1146
1147    for entry in entries.flatten() {
1148        let path = entry.path();
1149        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1150            continue;
1151        }
1152        let Ok(raw) = std::fs::read_to_string(&path) else {
1153            continue;
1154        };
1155        let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
1156            continue;
1157        };
1158
1159        let created_at = Utc
1160            .timestamp_millis_opt(legacy.time.created)
1161            .single()
1162            .unwrap_or_else(Utc::now);
1163
1164        legacy_messages.push((
1165            legacy.time.created,
1166            Message {
1167                id: legacy.id.clone(),
1168                role: legacy_role_to_message_role(&legacy.role),
1169                parts: load_legacy_message_parts(base, &legacy.id),
1170                created_at,
1171            },
1172        ));
1173    }
1174
1175    legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
1176    legacy_messages.into_iter().map(|(_, msg)| msg).collect()
1177}
1178
1179fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
1180    let parts_dir = base.join("part").join(message_id);
1181    if !parts_dir.is_dir() {
1182        return Vec::new();
1183    }
1184
1185    let Ok(entries) = std::fs::read_dir(&parts_dir) else {
1186        return Vec::new();
1187    };
1188
1189    let mut out = Vec::new();
1190    for entry in entries.flatten() {
1191        let path = entry.path();
1192        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1193            continue;
1194        }
1195        let Ok(raw) = std::fs::read_to_string(&path) else {
1196            continue;
1197        };
1198        let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
1199            continue;
1200        };
1201
1202        let mapped = if let Some(tool) = part.tool {
1203            Some(MessagePart::ToolInvocation {
1204                tool,
1205                args: part.args.unwrap_or_else(|| json!({})),
1206                result: part.result,
1207                error: part.error,
1208            })
1209        } else {
1210            match part.part_type.as_deref() {
1211                Some("reasoning") => Some(MessagePart::Reasoning {
1212                    text: part.text.unwrap_or_default(),
1213                }),
1214                Some("tool") => Some(MessagePart::ToolInvocation {
1215                    tool: "tool".to_string(),
1216                    args: part.args.unwrap_or_else(|| json!({})),
1217                    result: part.result,
1218                    error: part.error,
1219                }),
1220                Some("text") | None => Some(MessagePart::Text {
1221                    text: part.text.unwrap_or_default(),
1222                }),
1223                _ => None,
1224            }
1225        };
1226
1227        if let Some(part) = mapped {
1228            out.push(part);
1229        }
1230    }
1231    out
1232}
1233
1234fn legacy_role_to_message_role(role: &str) -> MessageRole {
1235    match role.to_lowercase().as_str() {
1236        "user" => MessageRole::User,
1237        "assistant" => MessageRole::Assistant,
1238        "system" => MessageRole::System,
1239        "tool" => MessageRole::Tool,
1240        _ => MessageRole::Assistant,
1241    }
1242}
1243
/// Counters describing the outcome of merging two message lists.
#[derive(Debug, Clone, Default)]
struct MessageMergeStats {
    /// Imported messages whose id was not present before.
    messages_recovered: u64,
    /// Parts carried by those newly added messages.
    parts_recovered: u64,
    /// Existing messages replaced by a richer imported message with the same id.
    conflicts_merged: u64,
}
1250
1251fn message_richness(msg: &Message) -> usize {
1252    msg.parts
1253        .iter()
1254        .map(|p| match p {
1255            MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1256                if text.trim().is_empty() {
1257                    0
1258                } else {
1259                    1
1260                }
1261            }
1262            MessagePart::ToolInvocation { result, error, .. } => {
1263                if result.is_some() || error.is_some() {
1264                    2
1265                } else {
1266                    1
1267                }
1268            }
1269        })
1270        .sum()
1271}
1272
1273fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1274    messages.iter().map(|m| m.created_at).max()
1275}
1276
1277fn merge_session_messages(
1278    existing: &[Message],
1279    imported: &[Message],
1280) -> (Vec<Message>, MessageMergeStats, bool) {
1281    if existing.is_empty() {
1282        let messages_recovered = imported.len() as u64;
1283        let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1284        return (
1285            imported.to_vec(),
1286            MessageMergeStats {
1287                messages_recovered,
1288                parts_recovered,
1289                conflicts_merged: 0,
1290            },
1291            true,
1292        );
1293    }
1294
1295    let mut merged_by_id: HashMap<String, Message> = existing
1296        .iter()
1297        .cloned()
1298        .map(|m| (m.id.clone(), m))
1299        .collect();
1300    let mut stats = MessageMergeStats::default();
1301    let mut changed = false;
1302
1303    for incoming in imported {
1304        match merged_by_id.get(&incoming.id) {
1305            None => {
1306                merged_by_id.insert(incoming.id.clone(), incoming.clone());
1307                stats.messages_recovered += 1;
1308                stats.parts_recovered += incoming.parts.len() as u64;
1309                changed = true;
1310            }
1311            Some(current) => {
1312                let incoming_richer = message_richness(incoming) > message_richness(current)
1313                    || incoming.parts.len() > current.parts.len();
1314                if incoming_richer {
1315                    merged_by_id.insert(incoming.id.clone(), incoming.clone());
1316                    stats.conflicts_merged += 1;
1317                    changed = true;
1318                }
1319            }
1320        }
1321    }
1322
1323    let mut out: Vec<Message> = merged_by_id.into_values().collect();
1324    out.sort_by_key(|m| m.created_at);
1325    (out, stats, changed)
1326}
1327
1328#[cfg(test)]
1329mod tests {
1330    use super::*;
1331    use std::fs as stdfs;
1332
1333    #[tokio::test]
1334    async fn todos_are_normalized_to_wire_shape() {
1335        let base = std::env::temp_dir().join(format!("tandem-core-test-{}", Uuid::new_v4()));
1336        let storage = Storage::new(&base).await.expect("storage");
1337        let session = Session::new(Some("test".to_string()), Some(".".to_string()));
1338        let id = session.id.clone();
1339        storage.save_session(session).await.expect("save session");
1340
1341        storage
1342            .set_todos(
1343                &id,
1344                vec![
1345                    json!({"content":"first"}),
1346                    json!({"text":"second", "status":"in_progress"}),
1347                    json!({"id":"keep-id","content":"third","status":"completed"}),
1348                ],
1349            )
1350            .await
1351            .expect("set todos");
1352
1353        let todos = storage.get_todos(&id).await;
1354        assert_eq!(todos.len(), 3);
1355        for todo in todos {
1356            assert!(todo.get("id").and_then(|v| v.as_str()).is_some());
1357            assert!(todo.get("content").and_then(|v| v.as_str()).is_some());
1358            assert!(todo.get("status").and_then(|v| v.as_str()).is_some());
1359        }
1360    }
1361
1362    #[tokio::test]
1363    async fn imports_legacy_opencode_session_index_when_sessions_json_missing() {
1364        let base =
1365            std::env::temp_dir().join(format!("tandem-core-legacy-import-{}", Uuid::new_v4()));
1366        let legacy_session_dir = base.join("session").join("global");
1367        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1368        stdfs::write(
1369            legacy_session_dir.join("ses_test.json"),
1370            r#"{
1371  "id": "ses_test",
1372  "slug": "test",
1373  "version": "1.0.0",
1374  "projectID": "proj_1",
1375  "directory": "C:\\work\\demo",
1376  "title": "Legacy Session",
1377  "time": { "created": 1770913145613, "updated": 1770913146613 }
1378}"#,
1379        )
1380        .expect("legacy session write");
1381
1382        let storage = Storage::new(&base).await.expect("storage");
1383        let sessions = storage.list_sessions().await;
1384        assert_eq!(sessions.len(), 1);
1385        assert_eq!(sessions[0].id, "ses_test");
1386        assert_eq!(sessions[0].title, "Legacy Session");
1387        assert!(base.join("sessions.json").exists());
1388    }
1389
1390    #[tokio::test]
1391    async fn imports_legacy_messages_and_parts_for_session() {
1392        let base = std::env::temp_dir().join(format!("tandem-core-legacy-msg-{}", Uuid::new_v4()));
1393        let session_dir = base.join("session").join("global");
1394        let message_dir = base.join("message").join("ses_test");
1395        let part_dir = base.join("part").join("msg_1");
1396        stdfs::create_dir_all(&session_dir).expect("session dir");
1397        stdfs::create_dir_all(&message_dir).expect("message dir");
1398        stdfs::create_dir_all(&part_dir).expect("part dir");
1399
1400        stdfs::write(
1401            session_dir.join("ses_test.json"),
1402            r#"{
1403  "id": "ses_test",
1404  "projectID": "proj_1",
1405  "directory": "C:\\work\\demo",
1406  "title": "Legacy Session",
1407  "time": { "created": 1770913145613, "updated": 1770913146613 }
1408}"#,
1409        )
1410        .expect("write session");
1411
1412        stdfs::write(
1413            message_dir.join("msg_1.json"),
1414            r#"{
1415  "id": "msg_1",
1416  "sessionID": "ses_test",
1417  "role": "assistant",
1418  "time": { "created": 1770913145613 }
1419}"#,
1420        )
1421        .expect("write msg");
1422
1423        stdfs::write(
1424            part_dir.join("prt_1.json"),
1425            r#"{
1426  "id": "prt_1",
1427  "sessionID": "ses_test",
1428  "messageID": "msg_1",
1429  "type": "text",
1430  "text": "hello from legacy"
1431}"#,
1432        )
1433        .expect("write part");
1434
1435        let storage = Storage::new(&base).await.expect("storage");
1436        let sessions = storage.list_sessions().await;
1437        assert_eq!(sessions.len(), 1);
1438        assert_eq!(sessions[0].messages.len(), 1);
1439        assert_eq!(sessions[0].messages[0].parts.len(), 1);
1440    }
1441
1442    #[tokio::test]
1443    async fn skips_legacy_merge_when_sessions_json_exists() {
1444        let base =
1445            std::env::temp_dir().join(format!("tandem-core-legacy-merge-{}", Uuid::new_v4()));
1446        stdfs::create_dir_all(&base).expect("base");
1447        stdfs::write(
1448            base.join("sessions.json"),
1449            r#"{
1450  "ses_current": {
1451    "id": "ses_current",
1452    "slug": null,
1453    "version": "v1",
1454    "project_id": null,
1455    "title": "Current Session",
1456    "directory": ".",
1457    "workspace_root": null,
1458    "origin_workspace_root": null,
1459    "attached_from_workspace": null,
1460    "attached_to_workspace": null,
1461    "attach_timestamp_ms": null,
1462    "attach_reason": null,
1463    "time": {"created":"2026-01-01T00:00:00Z","updated":"2026-01-01T00:00:00Z"},
1464    "model": null,
1465    "provider": null,
1466    "messages": []
1467  }
1468}"#,
1469        )
1470        .expect("sessions.json");
1471
1472        let legacy_session_dir = base.join("session").join("global");
1473        stdfs::create_dir_all(&legacy_session_dir).expect("legacy session dir");
1474        stdfs::write(
1475            legacy_session_dir.join("ses_legacy.json"),
1476            r#"{
1477  "id": "ses_legacy",
1478  "slug": "legacy",
1479  "version": "1.0.0",
1480  "projectID": "proj_legacy",
1481  "directory": "C:\\work\\legacy",
1482  "title": "Legacy Session",
1483  "time": { "created": 1770913145613, "updated": 1770913146613 }
1484}"#,
1485        )
1486        .expect("legacy session write");
1487
1488        let storage = Storage::new(&base).await.expect("storage");
1489        let sessions = storage.list_sessions().await;
1490        let ids = sessions.iter().map(|s| s.id.clone()).collect::<Vec<_>>();
1491        assert!(ids.contains(&"ses_current".to_string()));
1492        assert!(!ids.contains(&"ses_legacy".to_string()));
1493    }
1494
1495    #[tokio::test]
1496    async fn list_sessions_scoped_filters_by_workspace_root() {
1497        let base = std::env::temp_dir().join(format!("tandem-core-scope-{}", Uuid::new_v4()));
1498        let storage = Storage::new(&base).await.expect("storage");
1499        let ws_a = base.join("ws-a");
1500        let ws_b = base.join("ws-b");
1501        stdfs::create_dir_all(&ws_a).expect("ws_a");
1502        stdfs::create_dir_all(&ws_b).expect("ws_b");
1503        let ws_a_str = ws_a.to_string_lossy().to_string();
1504        let ws_b_str = ws_b.to_string_lossy().to_string();
1505
1506        let mut a = Session::new(Some("a".to_string()), Some(ws_a_str.clone()));
1507        a.workspace_root = Some(ws_a_str.clone());
1508        storage.save_session(a).await.expect("save a");
1509
1510        let mut b = Session::new(Some("b".to_string()), Some(ws_b_str.clone()));
1511        b.workspace_root = Some(ws_b_str);
1512        storage.save_session(b).await.expect("save b");
1513
1514        let scoped = storage
1515            .list_sessions_scoped(SessionListScope::Workspace {
1516                workspace_root: ws_a_str,
1517            })
1518            .await;
1519        assert_eq!(scoped.len(), 1);
1520        assert_eq!(scoped[0].title, "a");
1521    }
1522
1523    #[tokio::test]
1524    async fn attach_session_persists_audit_metadata() {
1525        let base = std::env::temp_dir().join(format!("tandem-core-attach-{}", Uuid::new_v4()));
1526        let storage = Storage::new(&base).await.expect("storage");
1527        let ws_a = base.join("ws-a");
1528        let ws_b = base.join("ws-b");
1529        stdfs::create_dir_all(&ws_a).expect("ws_a");
1530        stdfs::create_dir_all(&ws_b).expect("ws_b");
1531        let ws_a_str = ws_a.to_string_lossy().to_string();
1532        let ws_b_str = ws_b.to_string_lossy().to_string();
1533        let mut session = Session::new(Some("s".to_string()), Some(ws_a_str.clone()));
1534        session.workspace_root = Some(ws_a_str);
1535        let id = session.id.clone();
1536        storage.save_session(session).await.expect("save");
1537
1538        let updated = storage
1539            .attach_session_to_workspace(&id, &ws_b_str, "manual")
1540            .await
1541            .expect("attach")
1542            .expect("session exists");
1543        let normalized_expected = normalize_workspace_path(&ws_b_str).expect("normalized path");
1544        assert_eq!(
1545            updated.workspace_root.as_deref(),
1546            Some(normalized_expected.as_str())
1547        );
1548        assert_eq!(
1549            updated.attached_to_workspace.as_deref(),
1550            Some(normalized_expected.as_str())
1551        );
1552        assert_eq!(updated.attach_reason.as_deref(), Some("manual"));
1553        assert!(updated.attach_timestamp_ms.is_some());
1554    }
1555
1556    #[tokio::test]
1557    async fn append_message_part_persists_tool_invocation_and_result() {
1558        let base = std::env::temp_dir().join(format!("tandem-core-tool-parts-{}", Uuid::new_v4()));
1559        let storage = Storage::new(&base).await.expect("storage");
1560        let session = Session::new(Some("tool parts".to_string()), Some(".".to_string()));
1561        let session_id = session.id.clone();
1562        storage.save_session(session).await.expect("save session");
1563
1564        let user = Message::new(
1565            MessageRole::User,
1566            vec![MessagePart::Text {
1567                text: "build ui".to_string(),
1568            }],
1569        );
1570        let message_id = user.id.clone();
1571        storage
1572            .append_message(&session_id, user)
1573            .await
1574            .expect("append user");
1575
1576        storage
1577            .append_message_part(
1578                &session_id,
1579                &message_id,
1580                MessagePart::ToolInvocation {
1581                    tool: "write".to_string(),
1582                    args: json!({"path":"game.html","content":"<html></html>"}),
1583                    result: None,
1584                    error: None,
1585                },
1586            )
1587            .await
1588            .expect("append invocation");
1589        storage
1590            .append_message_part(
1591                &session_id,
1592                &message_id,
1593                MessagePart::ToolInvocation {
1594                    tool: "write".to_string(),
1595                    args: json!({}),
1596                    result: Some(json!("ok")),
1597                    error: None,
1598                },
1599            )
1600            .await
1601            .expect("append result");
1602
1603        let session = storage.get_session(&session_id).await.expect("session");
1604        let message = session
1605            .messages
1606            .iter()
1607            .find(|message| message.id == message_id)
1608            .expect("message");
1609        assert_eq!(message.parts.len(), 2);
1610        match &message.parts[1] {
1611            MessagePart::ToolInvocation {
1612                tool,
1613                result,
1614                error,
1615                ..
1616            } => {
1617                assert_eq!(tool, "write");
1618                assert_eq!(result.as_ref(), Some(&json!("ok")));
1619                assert_eq!(error.as_deref(), None);
1620            }
1621            other => panic!("expected tool part, got {other:?}"),
1622        }
1623    }
1624
1625    #[tokio::test]
1626    async fn append_message_part_retains_failed_tool_error() {
1627        let base = std::env::temp_dir().join(format!("tandem-core-tool-error-{}", Uuid::new_v4()));
1628        let storage = Storage::new(&base).await.expect("storage");
1629        let session = Session::new(Some("tool errors".to_string()), Some(".".to_string()));
1630        let session_id = session.id.clone();
1631        storage.save_session(session).await.expect("save session");
1632
1633        let user = Message::new(
1634            MessageRole::User,
1635            vec![MessagePart::Text {
1636                text: "write file".to_string(),
1637            }],
1638        );
1639        let message_id = user.id.clone();
1640        storage
1641            .append_message(&session_id, user)
1642            .await
1643            .expect("append user");
1644
1645        storage
1646            .append_message_part(
1647                &session_id,
1648                &message_id,
1649                MessagePart::ToolInvocation {
1650                    tool: "write".to_string(),
1651                    args: json!({"path":"game.html"}),
1652                    result: None,
1653                    error: None,
1654                },
1655            )
1656            .await
1657            .expect("append invocation");
1658        storage
1659            .append_message_part(
1660                &session_id,
1661                &message_id,
1662                MessagePart::ToolInvocation {
1663                    tool: "write".to_string(),
1664                    args: json!({}),
1665                    result: None,
1666                    error: Some("WRITE_CONTENT_MISSING".to_string()),
1667                },
1668            )
1669            .await
1670            .expect("append error");
1671
1672        let session = storage.get_session(&session_id).await.expect("session");
1673        let message = session
1674            .messages
1675            .iter()
1676            .find(|message| message.id == message_id)
1677            .expect("message");
1678        match &message.parts[1] {
1679            MessagePart::ToolInvocation { error, .. } => {
1680                assert_eq!(error.as_deref(), Some("WRITE_CONTENT_MISSING"));
1681            }
1682            other => panic!("expected tool part, got {other:?}"),
1683        }
1684    }
1685
1686    #[tokio::test]
1687    async fn append_message_part_coalesces_repeated_tool_invocation_updates() {
1688        let base = std::env::temp_dir().join(format!("tandem-core-tool-merge-{}", Uuid::new_v4()));
1689        let storage = Storage::new(&base).await.expect("storage");
1690        let session = Session::new(Some("tool merge".to_string()), Some(".".to_string()));
1691        let session_id = session.id.clone();
1692        storage.save_session(session).await.expect("save session");
1693
1694        let user = Message::new(
1695            MessageRole::User,
1696            vec![MessagePart::Text {
1697                text: "build ui".to_string(),
1698            }],
1699        );
1700        let message_id = user.id.clone();
1701        storage
1702            .append_message(&session_id, user)
1703            .await
1704            .expect("append user");
1705
1706        storage
1707            .append_message_part(
1708                &session_id,
1709                &message_id,
1710                MessagePart::ToolInvocation {
1711                    tool: "write".to_string(),
1712                    args: json!({"path":"game.html"}),
1713                    result: None,
1714                    error: None,
1715                },
1716            )
1717            .await
1718            .expect("append first invocation");
1719        storage
1720            .append_message_part(
1721                &session_id,
1722                &message_id,
1723                MessagePart::ToolInvocation {
1724                    tool: "write".to_string(),
1725                    args: json!({"path":"game.html","content":"<html></html>"}),
1726                    result: None,
1727                    error: None,
1728                },
1729            )
1730            .await
1731            .expect("append updated invocation");
1732
1733        let session = storage.get_session(&session_id).await.expect("session");
1734        let message = session
1735            .messages
1736            .iter()
1737            .find(|message| message.id == message_id)
1738            .expect("message");
1739        assert_eq!(message.parts.len(), 2);
1740        match &message.parts[1] {
1741            MessagePart::ToolInvocation { tool, args, .. } => {
1742                assert_eq!(tool, "write");
1743                assert_eq!(args["path"], "game.html");
1744                assert_eq!(args["content"], "<html></html>");
1745            }
1746            other => panic!("expected tool part, got {other:?}"),
1747        }
1748    }
1749
1750    #[tokio::test]
1751    async fn append_message_part_falls_back_to_latest_user_message_when_id_missing() {
1752        let base =
1753            std::env::temp_dir().join(format!("tandem-core-tool-fallback-{}", Uuid::new_v4()));
1754        let storage = Storage::new(&base).await.expect("storage");
1755        let session = Session::new(Some("tool fallback".to_string()), Some(".".to_string()));
1756        let session_id = session.id.clone();
1757        storage.save_session(session).await.expect("save session");
1758
1759        let first = Message::new(
1760            MessageRole::User,
1761            vec![MessagePart::Text {
1762                text: "first prompt".to_string(),
1763            }],
1764        );
1765        let second = Message::new(
1766            MessageRole::User,
1767            vec![MessagePart::Text {
1768                text: "second prompt".to_string(),
1769            }],
1770        );
1771        let second_id = second.id.clone();
1772        storage
1773            .append_message(&session_id, first)
1774            .await
1775            .expect("append first");
1776        storage
1777            .append_message(&session_id, second)
1778            .await
1779            .expect("append second");
1780
1781        storage
1782            .append_message_part(
1783                &session_id,
1784                "missing-message-id",
1785                MessagePart::ToolInvocation {
1786                    tool: "glob".to_string(),
1787                    args: json!({"pattern":"*"}),
1788                    result: Some(json!(["README.md"])),
1789                    error: None,
1790                },
1791            )
1792            .await
1793            .expect("append fallback tool part");
1794
1795        let session = storage.get_session(&session_id).await.expect("session");
1796        let message = session
1797            .messages
1798            .iter()
1799            .find(|message| message.id == second_id)
1800            .expect("latest user message");
1801        match &message.parts[1] {
1802            MessagePart::ToolInvocation { tool, result, .. } => {
1803                assert_eq!(tool, "glob");
1804                assert_eq!(result.as_ref(), Some(&json!(["README.md"])));
1805            }
1806            other => panic!("expected tool part, got {other:?}"),
1807        }
1808    }
1809
1810    #[tokio::test]
1811    async fn startup_repairs_placeholder_titles_from_wrapped_user_messages() {
1812        let base =
1813            std::env::temp_dir().join(format!("tandem-core-title-repair-{}", Uuid::new_v4()));
1814        let storage = Storage::new(&base).await.expect("storage");
1815        let wrapped = "<memory_context>\n<current_session>\n- fact\n</current_session>\n</memory_context>\n\n[Mode instructions]\nUse tools.\n\n[User request]\nExplain this bug";
1816        let mut session = Session::new(Some("<memory_context>".to_string()), Some(".".to_string()));
1817        let id = session.id.clone();
1818        session.messages.push(Message::new(
1819            MessageRole::User,
1820            vec![MessagePart::Text {
1821                text: wrapped.to_string(),
1822            }],
1823        ));
1824        storage.save_session(session).await.expect("save");
1825        drop(storage);
1826
1827        let storage = Storage::new(&base).await.expect("storage");
1828        let repaired = storage.get_session(&id).await.expect("session");
1829        assert_eq!(repaired.title, "Explain this bug");
1830    }
1831}