Skip to main content

tandem_core/storage_parts/
part01.rs

/// Per-session auxiliary state persisted alongside the session itself
/// (archive/share flags, revert snapshots, todos).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SessionMeta {
    /// Parent session id when this session was forked from another.
    pub parent_id: Option<String>,
    #[serde(default)]
    pub archived: bool,
    #[serde(default)]
    pub shared: bool,
    /// Public share token; populated only while `shared` is true.
    pub share_id: Option<String>,
    pub summary: Option<String>,
    /// Message-list snapshots used for revert; bounded by `MAX_SESSION_SNAPSHOTS`.
    #[serde(default)]
    pub snapshots: Vec<Vec<Message>>,
    /// Messages stashed immediately before the last revert, enabling unrevert.
    pub pre_revert: Option<Vec<Message>>,
    #[serde(default)]
    pub todos: Vec<Value>,
}
16
/// Links a question request back to the tool call that raised it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuestionToolRef {
    /// Id of the originating tool call.
    #[serde(rename = "callID")]
    pub call_id: String,
    /// Id of the message containing that tool call.
    #[serde(rename = "messageID")]
    pub message_id: String,
}
24
/// A pending "ask the user" request, persisted until replied or rejected.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuestionRequest {
    pub id: String,
    /// Session this request belongs to.
    #[serde(rename = "sessionID")]
    pub session_id: String,
    /// Raw question payloads; the shape is producer-defined JSON.
    #[serde(default)]
    pub questions: Vec<Value>,
    /// Originating tool call, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool: Option<QuestionToolRef>,
}
35
/// JSON-file-backed store for sessions, per-session metadata, and pending
/// question requests. All three maps are held fully in memory and
/// rewritten to disk on every mutation (see `flush`).
pub struct Storage {
    /// Root directory holding sessions.json / session_meta.json / questions.json.
    base: PathBuf,
    sessions: RwLock<HashMap<String, Session>>,
    metadata: RwLock<HashMap<String, SessionMeta>>,
    question_requests: RwLock<HashMap<String, QuestionRequest>>,
    /// Serializes concurrent flush() calls so temp-file writes don't race.
    flush_lock: Mutex<()>,
}
43
/// Scope filter for session listings.
#[derive(Debug, Clone)]
pub enum SessionListScope {
    /// Every stored session.
    Global,
    /// Only sessions whose workspace root (or directory) matches.
    Workspace { workspace_root: String },
}
49
/// Counters reported by `repair_sessions_from_file_store`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionRepairStats {
    pub sessions_repaired: u64,
    pub messages_recovered: u64,
    pub parts_recovered: u64,
    /// Messages present on both sides whose contents had to be merged.
    pub conflicts_merged: u64,
}
57
/// File name of the marker recording that a legacy import scan has run.
const LEGACY_IMPORT_MARKER_FILE: &str = "legacy_import_marker.json";
/// Bump when the marker's on-disk schema changes.
const LEGACY_IMPORT_MARKER_VERSION: u32 = 1;
/// Upper bound on retained per-session revert snapshots.
const MAX_SESSION_SNAPSHOTS: usize = 5;
61
/// Raw file counts observed in the legacy on-disk tree
/// (session/, message/, part/ directories).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LegacyTreeCounts {
    pub session_files: u64,
    pub message_files: u64,
    pub part_files: u64,
}
68
/// Totals actually imported from the legacy tree during a scan.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LegacyImportedCounts {
    pub sessions: u64,
    pub messages: u64,
    pub parts: u64,
}
75
/// Persisted record that a legacy import scan has already run, plus what
/// it observed; its presence is consulted when deciding whether to scan again.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LegacyImportMarker {
    /// Schema version (see `LEGACY_IMPORT_MARKER_VERSION`).
    pub version: u32,
    pub created_at_ms: u64,
    pub last_checked_at_ms: u64,
    pub legacy_counts: LegacyTreeCounts,
    pub imported_counts: LegacyImportedCounts,
}
84
/// Outcome of an explicit legacy repair scan. `status` is one of
/// "skipped", "updated", or "no_changes" (see `run_legacy_storage_repair_scan`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LegacyRepairRunReport {
    pub status: String,
    pub marker_updated: bool,
    pub sessions_merged: u64,
    pub messages_recovered: u64,
    pub parts_recovered: u64,
    pub legacy_counts: LegacyTreeCounts,
    pub imported_counts: LegacyImportedCounts,
}
95
96fn snapshot_session_messages(
97    session_id: &str,
98    session: &Session,
99    metadata: &mut HashMap<String, SessionMeta>,
100) {
101    let meta = metadata
102        .entry(session_id.to_string())
103        .or_insert_with(SessionMeta::default);
104    meta.snapshots.push(session.messages.clone());
105    trim_session_snapshots(&mut meta.snapshots);
106}
107
108fn trim_session_snapshots(snapshots: &mut Vec<Vec<Message>>) {
109    if snapshots.len() > MAX_SESSION_SNAPSHOTS {
110        let keep_from = snapshots.len() - MAX_SESSION_SNAPSHOTS;
111        snapshots.drain(0..keep_from);
112    }
113}
114
115fn compact_session_snapshots(snapshots: &mut Vec<Vec<Message>>) -> usize {
116    if snapshots.is_empty() {
117        return 0;
118    }
119
120    let original_len = snapshots.len();
121    let mut compacted = Vec::with_capacity(original_len);
122    let mut previous_encoded: Option<Vec<u8>> = None;
123
124    for snapshot in snapshots.drain(..) {
125        let encoded = serde_json::to_vec(&snapshot).unwrap_or_default();
126        if previous_encoded.as_ref() == Some(&encoded) {
127            continue;
128        }
129        previous_encoded = Some(encoded);
130        compacted.push(snapshot);
131    }
132
133    trim_session_snapshots(&mut compacted);
134    let removed = original_len.saturating_sub(compacted.len());
135    *snapshots = compacted;
136    removed
137}
138
139fn session_meta_is_empty(meta: &SessionMeta) -> bool {
140    meta.parent_id.is_none()
141        && !meta.archived
142        && !meta.shared
143        && meta.share_id.is_none()
144        && meta.summary.is_none()
145        && meta.snapshots.is_empty()
146        && meta.pre_revert.is_none()
147        && meta.todos.is_empty()
148}
149
/// Counters produced by `compact_session_metadata`.
#[derive(Debug, Default)]
struct SessionMetaCompactionStats {
    // Metadata entries dropped (orphaned or all-default).
    metadata_pruned: u64,
    // Snapshots removed by dedup/trim across all sessions.
    snapshots_removed: u64,
}
155
156fn compact_session_metadata(
157    sessions: &HashMap<String, Session>,
158    metadata: &mut HashMap<String, SessionMeta>,
159) -> SessionMetaCompactionStats {
160    let mut stats = SessionMetaCompactionStats::default();
161
162    metadata.retain(|session_id, meta| {
163        if !sessions.contains_key(session_id) {
164            stats.metadata_pruned += 1;
165            return false;
166        }
167
168        let removed = compact_session_snapshots(&mut meta.snapshots) as u64;
169        stats.snapshots_removed += removed;
170
171        if session_meta_is_empty(meta) {
172            stats.metadata_pruned += 1;
173            return false;
174        }
175
176        true
177    });
178
179    stats
180}
181
182impl Storage {
    /// Opens (or initializes) the storage rooted at `base`.
    ///
    /// Load order:
    /// 1. ensure the directory exists and read `sessions.json`;
    /// 2. optionally run a blocking legacy-tree scan (per
    ///    `should_run_legacy_scan_on_startup`) and merge the results;
    /// 3. run in-memory migrations (workspace roots, titles);
    /// 4. read and compact `session_meta.json`, then read `questions.json`;
    /// 5. flush if anything changed, writing the legacy-import marker last.
    ///
    /// Unparseable JSON in any store falls back to an empty map rather
    /// than failing startup.
    pub async fn new(base: impl AsRef<Path>) -> anyhow::Result<Self> {
        let base = base.as_ref().to_path_buf();
        fs::create_dir_all(&base).await?;
        let sessions_file = base.join("sessions.json");
        let marker_path = base.join(LEGACY_IMPORT_MARKER_FILE);
        let sessions_file_exists = sessions_file.exists();
        // Tracks whether in-memory state diverged from disk and must be re-flushed.
        let mut imported_legacy_sessions = false;
        let mut sessions = if sessions_file_exists {
            let raw = fs::read_to_string(&sessions_file).await?;
            serde_json::from_str::<HashMap<String, Session>>(&raw).unwrap_or_default()
        } else {
            HashMap::new()
        };

        let mut marker_to_write = None;
        if should_run_legacy_scan_on_startup(&marker_path, sessions_file_exists).await {
            // The legacy scan walks the filesystem synchronously — keep it
            // off the async executor.
            let base_for_scan = base.clone();
            let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
                .await
                .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;
            if merge_legacy_sessions(&mut sessions, scan.sessions) {
                imported_legacy_sessions = true;
            }
            marker_to_write = Some(LegacyImportMarker {
                version: LEGACY_IMPORT_MARKER_VERSION,
                created_at_ms: now_ms_u64(),
                last_checked_at_ms: now_ms_u64(),
                legacy_counts: scan.legacy_counts,
                imported_counts: scan.imported_counts,
            });
        }

        // In-memory migrations; each reports whether it changed anything.
        if hydrate_workspace_roots(&mut sessions) {
            imported_legacy_sessions = true;
        }
        if repair_session_titles(&mut sessions) {
            imported_legacy_sessions = true;
        }
        let metadata_file = base.join("session_meta.json");
        let mut metadata = if metadata_file.exists() {
            let raw = fs::read_to_string(&metadata_file).await?;
            serde_json::from_str::<HashMap<String, SessionMeta>>(&raw).unwrap_or_default()
        } else {
            HashMap::new()
        };
        // Drop orphaned/empty metadata and deduplicate snapshot history.
        let compaction = compact_session_metadata(&sessions, &mut metadata);
        let metadata_compacted = compaction.metadata_pruned > 0 || compaction.snapshots_removed > 0;
        if metadata_compacted {
            tracing::info!(
                metadata_pruned = compaction.metadata_pruned,
                snapshots_removed = compaction.snapshots_removed,
                "compacted persisted session metadata"
            );
        }
        let questions_file = base.join("questions.json");
        let question_requests = if questions_file.exists() {
            let raw = fs::read_to_string(&questions_file).await?;
            serde_json::from_str::<HashMap<String, QuestionRequest>>(&raw).unwrap_or_default()
        } else {
            HashMap::new()
        };
        let storage = Self {
            base,
            sessions: RwLock::new(sessions),
            metadata: RwLock::new(metadata),
            question_requests: RwLock::new(question_requests),
            flush_lock: Mutex::new(()),
        };

        // Persist repaired state first; write the marker only afterwards so
        // a crash in between re-runs the scan on the next startup.
        if imported_legacy_sessions || metadata_compacted {
            storage.flush().await?;
        }
        if let Some(marker) = marker_to_write {
            storage.write_legacy_import_marker(&marker).await?;
        }

        Ok(storage)
    }
261
    /// Convenience wrapper: lists every session regardless of workspace.
    pub async fn list_sessions(&self) -> Vec<Session> {
        self.list_sessions_scoped(SessionListScope::Global).await
    }
265
266    pub async fn list_sessions_scoped(&self, scope: SessionListScope) -> Vec<Session> {
267        let all = self
268            .sessions
269            .read()
270            .await
271            .values()
272            .cloned()
273            .collect::<Vec<_>>();
274        match scope {
275            SessionListScope::Global => all,
276            SessionListScope::Workspace { workspace_root } => {
277                let Some(normalized_workspace) = normalize_workspace_path(&workspace_root) else {
278                    return Vec::new();
279                };
280                all.into_iter()
281                    .filter(|session| {
282                        let direct = session
283                            .workspace_root
284                            .as_ref()
285                            .and_then(|p| normalize_workspace_path(p))
286                            .map(|p| p == normalized_workspace)
287                            .unwrap_or(false);
288                        if direct {
289                            return true;
290                        }
291                        normalize_workspace_path(&session.directory)
292                            .map(|p| p == normalized_workspace)
293                            .unwrap_or(false)
294                    })
295                    .collect()
296            }
297        }
298    }
299
300    pub async fn get_session(&self, id: &str) -> Option<Session> {
301        self.sessions.read().await.get(id).cloned()
302    }
303
304    pub async fn save_session(&self, mut session: Session) -> anyhow::Result<()> {
305        if session.workspace_root.is_none() {
306            session.workspace_root = normalize_workspace_path(&session.directory);
307        }
308        let session_id = session.id.clone();
309        self.sessions
310            .write()
311            .await
312            .insert(session_id.clone(), session);
313        self.metadata
314            .write()
315            .await
316            .entry(session_id)
317            .or_insert_with(SessionMeta::default);
318        self.flush().await
319    }
320
    /// Merges any messages still present in the legacy per-file store into
    /// the in-memory sessions, then flushes if anything changed.
    ///
    /// Holds the sessions write lock for the whole merge so writers cannot
    /// interleave; the lock is released before `flush()` (which takes its
    /// own read lock).
    pub async fn repair_sessions_from_file_store(&self) -> anyhow::Result<SessionRepairStats> {
        let mut stats = SessionRepairStats::default();
        let mut sessions = self.sessions.write().await;

        for session in sessions.values_mut() {
            // NOTE(review): this helper appears synchronous and runs on the
            // async executor while the write lock is held — presumably fine
            // because repair is a rare, explicit operation; confirm.
            let imported = load_legacy_session_messages(&self.base, &session.id);
            if imported.is_empty() {
                continue;
            }

            let (merged, merge_stats, changed) =
                merge_session_messages(&session.messages, &imported);
            if changed {
                session.messages = merged;
                // Bump `updated` to the newest message time, keeping the old
                // value when no message carries a timestamp.
                session.time.updated =
                    most_recent_message_time(&session.messages).unwrap_or(session.time.updated);
                stats.sessions_repaired += 1;
                stats.messages_recovered += merge_stats.messages_recovered;
                stats.parts_recovered += merge_stats.parts_recovered;
                stats.conflicts_merged += merge_stats.conflicts_merged;
            }
        }

        if stats.sessions_repaired > 0 {
            // Release the write lock before flushing to avoid self-deadlock.
            drop(sessions);
            self.flush().await?;
        }

        Ok(stats)
    }
351
    /// Explicitly runs the legacy storage repair scan.
    ///
    /// Unless `force` is set, the startup heuristic decides whether a scan
    /// is warranted; a skipped run reports the current marker's counts
    /// (zeroed defaults when no marker exists) without modifying anything.
    /// A real run scans on a blocking thread, merges results under the
    /// sessions write lock, flushes only when something changed, and always
    /// rewrites the marker with fresh counts and timestamps.
    pub async fn run_legacy_storage_repair_scan(
        &self,
        force: bool,
    ) -> anyhow::Result<LegacyRepairRunReport> {
        let marker_path = self.base.join(LEGACY_IMPORT_MARKER_FILE);
        let sessions_exists = self.base.join("sessions.json").exists();
        let should_scan = if force {
            true
        } else {
            should_run_legacy_scan_on_startup(&marker_path, sessions_exists).await
        };
        if !should_scan {
            // Report existing marker state (or defaults) without scanning.
            let marker = read_legacy_import_marker(&marker_path)
                .await
                .unwrap_or_else(|| LegacyImportMarker {
                    version: LEGACY_IMPORT_MARKER_VERSION,
                    created_at_ms: now_ms_u64(),
                    last_checked_at_ms: now_ms_u64(),
                    legacy_counts: LegacyTreeCounts::default(),
                    imported_counts: LegacyImportedCounts::default(),
                });
            return Ok(LegacyRepairRunReport {
                status: "skipped".to_string(),
                marker_updated: false,
                sessions_merged: 0,
                messages_recovered: 0,
                parts_recovered: 0,
                legacy_counts: marker.legacy_counts,
                imported_counts: marker.imported_counts,
            });
        }

        // The scan does synchronous filesystem walks; keep it off the executor.
        let base_for_scan = self.base.clone();
        let scan = task::spawn_blocking(move || scan_legacy_sessions(&base_for_scan))
            .await
            .map_err(|err| anyhow::anyhow!("legacy scan task join error: {}", err))??;

        let merge_stats = {
            let mut sessions = self.sessions.write().await;
            merge_legacy_sessions_with_stats(&mut sessions, scan.sessions)
        };

        if merge_stats.changed {
            self.flush().await?;
        }

        let marker = LegacyImportMarker {
            version: LEGACY_IMPORT_MARKER_VERSION,
            created_at_ms: now_ms_u64(),
            last_checked_at_ms: now_ms_u64(),
            legacy_counts: scan.legacy_counts.clone(),
            imported_counts: scan.imported_counts.clone(),
        };
        self.write_legacy_import_marker(&marker).await?;

        Ok(LegacyRepairRunReport {
            status: if merge_stats.changed {
                "updated".to_string()
            } else {
                "no_changes".to_string()
            },
            marker_updated: true,
            sessions_merged: merge_stats.sessions_merged,
            messages_recovered: merge_stats.messages_recovered,
            parts_recovered: merge_stats.parts_recovered,
            legacy_counts: scan.legacy_counts,
            imported_counts: scan.imported_counts,
        })
    }
421
422    pub async fn delete_session(&self, id: &str) -> anyhow::Result<bool> {
423        let removed = self.sessions.write().await.remove(id).is_some();
424        self.metadata.write().await.remove(id);
425        self.question_requests
426            .write()
427            .await
428            .retain(|_, request| request.session_id != id);
429        if removed {
430            self.flush().await?;
431        }
432        Ok(removed)
433    }
434
435    pub async fn append_message(&self, session_id: &str, msg: Message) -> anyhow::Result<()> {
436        let mut sessions = self.sessions.write().await;
437        let session = sessions
438            .get_mut(session_id)
439            .context("session not found for append_message")?;
440        session.messages.push(msg);
441        session.time.updated = Utc::now();
442        drop(sessions);
443        self.flush().await
444    }
445
    /// Folds `part` into a message of `session_id` via `reduce_message_parts`.
    ///
    /// Target selection: the message with id `message_id` if present,
    /// otherwise the most recent user message as a fallback.
    /// NOTE(review): the fallback assumes a part with an unknown message id
    /// belongs to the latest user turn — confirm against producers.
    pub async fn append_message_part(
        &self,
        session_id: &str,
        message_id: &str,
        part: MessagePart,
    ) -> anyhow::Result<()> {
        let mut sessions = self.sessions.write().await;
        let session = sessions
            .get_mut(session_id)
            .context("session not found for append_message_part")?;
        let message = if let Some(message) = session
            .messages
            .iter_mut()
            .find(|message| message.id == message_id)
        {
            message
        } else {
            // Fallback: newest user message (search from the back).
            session
                .messages
                .iter_mut()
                .rev()
                .find(|message| matches!(message.role, MessageRole::User))
                .context("message not found for append_message_part")?
        };
        reduce_message_parts(&mut message.parts, part);
        session.time.updated = Utc::now();
        drop(sessions);
        self.flush().await
    }
475
476    pub async fn fork_session(&self, id: &str) -> anyhow::Result<Option<Session>> {
477        let source = {
478            let sessions = self.sessions.read().await;
479            sessions.get(id).cloned()
480        };
481        let Some(mut child) = source else {
482            return Ok(None);
483        };
484
485        child.id = Uuid::new_v4().to_string();
486        child.title = format!("{} (fork)", child.title);
487        child.time.created = Utc::now();
488        child.time.updated = child.time.created;
489        child.slug = None;
490
491        self.sessions
492            .write()
493            .await
494            .insert(child.id.clone(), child.clone());
495        self.metadata.write().await.insert(
496            child.id.clone(),
497            SessionMeta {
498                parent_id: Some(id.to_string()),
499                snapshots: vec![child.messages.clone()],
500                ..SessionMeta::default()
501            },
502        );
503        self.flush().await?;
504        Ok(Some(child))
505    }
506
507    pub async fn revert_session(&self, id: &str) -> anyhow::Result<bool> {
508        let mut sessions = self.sessions.write().await;
509        let Some(session) = sessions.get_mut(id) else {
510            return Ok(false);
511        };
512        let mut metadata = self.metadata.write().await;
513        let meta = metadata
514            .entry(id.to_string())
515            .or_insert_with(SessionMeta::default);
516        let Some(snapshot) = meta.snapshots.pop() else {
517            return Ok(false);
518        };
519        meta.pre_revert = Some(session.messages.clone());
520        session.messages = snapshot;
521        session.time.updated = Utc::now();
522        drop(metadata);
523        drop(sessions);
524        self.flush().await?;
525        Ok(true)
526    }
527
528    pub async fn unrevert_session(&self, id: &str) -> anyhow::Result<bool> {
529        let mut sessions = self.sessions.write().await;
530        let Some(session) = sessions.get_mut(id) else {
531            return Ok(false);
532        };
533        let mut metadata = self.metadata.write().await;
534        let Some(meta) = metadata.get_mut(id) else {
535            return Ok(false);
536        };
537        let Some(previous) = meta.pre_revert.take() else {
538            return Ok(false);
539        };
540        meta.snapshots.push(session.messages.clone());
541        trim_session_snapshots(&mut meta.snapshots);
542        session.messages = previous;
543        session.time.updated = Utc::now();
544        drop(metadata);
545        drop(sessions);
546        self.flush().await?;
547        Ok(true)
548    }
549
550    pub async fn set_shared(&self, id: &str, shared: bool) -> anyhow::Result<Option<String>> {
551        let mut metadata = self.metadata.write().await;
552        let meta = metadata
553            .entry(id.to_string())
554            .or_insert_with(SessionMeta::default);
555        meta.shared = shared;
556        if shared {
557            if meta.share_id.is_none() {
558                meta.share_id = Some(Uuid::new_v4().to_string());
559            }
560        } else {
561            meta.share_id = None;
562        }
563        let share_id = meta.share_id.clone();
564        drop(metadata);
565        self.flush().await?;
566        Ok(share_id)
567    }
568
569    pub async fn set_archived(&self, id: &str, archived: bool) -> anyhow::Result<bool> {
570        let mut metadata = self.metadata.write().await;
571        let meta = metadata
572            .entry(id.to_string())
573            .or_insert_with(SessionMeta::default);
574        meta.archived = archived;
575        drop(metadata);
576        self.flush().await?;
577        Ok(true)
578    }
579
580    pub async fn set_summary(&self, id: &str, summary: String) -> anyhow::Result<bool> {
581        let mut metadata = self.metadata.write().await;
582        let meta = metadata
583            .entry(id.to_string())
584            .or_insert_with(SessionMeta::default);
585        meta.summary = Some(summary);
586        drop(metadata);
587        self.flush().await?;
588        Ok(true)
589    }
590
591    pub async fn children(&self, parent_id: &str) -> Vec<Session> {
592        let child_ids = {
593            let metadata = self.metadata.read().await;
594            metadata
595                .iter()
596                .filter(|(_, meta)| meta.parent_id.as_deref() == Some(parent_id))
597                .map(|(id, _)| id.clone())
598                .collect::<Vec<_>>()
599        };
600        let sessions = self.sessions.read().await;
601        child_ids
602            .into_iter()
603            .filter_map(|id| sessions.get(&id).cloned())
604            .collect()
605    }
606
607    pub async fn session_status(&self, id: &str) -> Option<Value> {
608        let metadata = self.metadata.read().await;
609        metadata.get(id).map(|meta| {
610            json!({
611                "archived": meta.archived,
612                "shared": meta.shared,
613                "parentID": meta.parent_id,
614                "snapshotCount": meta.snapshots.len()
615            })
616        })
617    }
618
619    pub async fn session_diff(&self, id: &str) -> Option<Value> {
620        let sessions = self.sessions.read().await;
621        let current = sessions.get(id)?;
622        let metadata = self.metadata.read().await;
623        let default = SessionMeta::default();
624        let meta = metadata.get(id).unwrap_or(&default);
625        let last_snapshot_len = meta.snapshots.last().map(|s| s.len()).unwrap_or(0);
626        Some(json!({
627            "sessionID": id,
628            "currentMessageCount": current.messages.len(),
629            "lastSnapshotMessageCount": last_snapshot_len,
630            "delta": current.messages.len() as i64 - last_snapshot_len as i64
631        }))
632    }
633
634    pub async fn set_todos(&self, id: &str, todos: Vec<Value>) -> anyhow::Result<()> {
635        let mut metadata = self.metadata.write().await;
636        let meta = metadata
637            .entry(id.to_string())
638            .or_insert_with(SessionMeta::default);
639        meta.todos = normalize_todo_items(todos);
640        drop(metadata);
641        self.flush().await
642    }
643
644    pub async fn get_todos(&self, id: &str) -> Vec<Value> {
645        let todos = self
646            .metadata
647            .read()
648            .await
649            .get(id)
650            .map(|meta| meta.todos.clone())
651            .unwrap_or_default();
652        normalize_todo_items(todos)
653    }
654
655    pub async fn add_question_request(
656        &self,
657        session_id: &str,
658        message_id: &str,
659        questions: Vec<Value>,
660    ) -> anyhow::Result<QuestionRequest> {
661        if questions.is_empty() {
662            return Err(anyhow::anyhow!(
663                "cannot add empty question request for session {}",
664                session_id
665            ));
666        }
667        let request = QuestionRequest {
668            id: format!("q-{}", Uuid::new_v4()),
669            session_id: session_id.to_string(),
670            questions,
671            tool: Some(QuestionToolRef {
672                call_id: format!("call-{}", Uuid::new_v4()),
673                message_id: message_id.to_string(),
674            }),
675        };
676        self.question_requests
677            .write()
678            .await
679            .insert(request.id.clone(), request.clone());
680        self.flush().await?;
681        Ok(request)
682    }
683
684    pub async fn list_question_requests(&self) -> Vec<QuestionRequest> {
685        self.question_requests
686            .read()
687            .await
688            .values()
689            .cloned()
690            .collect()
691    }
692
693    pub async fn reply_question(&self, request_id: &str) -> anyhow::Result<bool> {
694        let removed = self
695            .question_requests
696            .write()
697            .await
698            .remove(request_id)
699            .is_some();
700        if removed {
701            self.flush().await?;
702        }
703        Ok(removed)
704    }
705
    /// Rejecting a question is identical to replying: the pending request
    /// is simply discarded.
    pub async fn reject_question(&self, request_id: &str) -> anyhow::Result<bool> {
        self.reply_question(request_id).await
    }
709
    /// Re-homes a session into `target_workspace`, recording provenance
    /// (origin workspace, previous workspace, timestamp, reason) before
    /// overwriting the workspace fields, then persists.
    ///
    /// Returns `None` when the workspace path cannot be normalized or the
    /// session does not exist; neither case touches storage.
    pub async fn attach_session_to_workspace(
        &self,
        session_id: &str,
        target_workspace: &str,
        reason_tag: &str,
    ) -> anyhow::Result<Option<Session>> {
        let Some(target_workspace) = normalize_workspace_path(target_workspace) else {
            return Ok(None);
        };
        let mut sessions = self.sessions.write().await;
        let Some(session) = sessions.get_mut(session_id) else {
            return Ok(None);
        };
        // Workspace the session belonged to before this attach.
        let previous_workspace = session
            .workspace_root
            .clone()
            .or_else(|| normalize_workspace_path(&session.directory));

        // Only the first attach records the origin; later attaches keep it.
        if session.origin_workspace_root.is_none() {
            session.origin_workspace_root = previous_workspace.clone();
        }
        session.attached_from_workspace = previous_workspace;
        session.attached_to_workspace = Some(target_workspace.clone());
        // Clamp to zero for pre-epoch clocks before the unsigned cast.
        session.attach_timestamp_ms = Some(Utc::now().timestamp_millis().max(0) as u64);
        session.attach_reason = Some(reason_tag.trim().to_string());
        session.workspace_root = Some(target_workspace.clone());
        session.project_id = workspace_project_id(&target_workspace);
        session.directory = target_workspace;
        session.time.updated = Utc::now();
        let updated = session.clone();
        drop(sessions);
        self.flush().await?;
        Ok(Some(updated))
    }
744
745    async fn flush(&self) -> anyhow::Result<()> {
746        let _flush_guard = self.flush_lock.lock().await;
747        {
748            let snapshot = self.sessions.read().await.clone();
749            self.flush_file("sessions.json", &snapshot).await?;
750        }
751        {
752            let metadata_snapshot = self.metadata.read().await.clone();
753            self.flush_file("session_meta.json", &metadata_snapshot)
754                .await?;
755        }
756        {
757            let questions_snapshot = self.question_requests.read().await.clone();
758            self.flush_file("questions.json", &questions_snapshot)
759                .await?;
760        }
761        Ok(())
762    }
763
764    async fn flush_file(&self, filename: &str, data: &impl serde::Serialize) -> anyhow::Result<()> {
765        let path = self.base.join(filename);
766        let temp_path = self.base.join(format!("{}.tmp", filename));
767        let payload = serde_json::to_string_pretty(data)?;
768        fs::write(&temp_path, payload).await.with_context(|| {
769            format!("failed to write temp storage file {}", temp_path.display())
770        })?;
771        let std_temp_path: std::path::PathBuf = temp_path.clone().try_into()?;
772        tokio::task::spawn_blocking(move || {
773            let file = std::fs::File::open(&std_temp_path)?;
774            file.sync_all()?;
775            Ok::<(), std::io::Error>(())
776        })
777        .await??;
778        commit_temp_file(&temp_path, &path).await.with_context(|| {
779            format!(
780                "failed to atomically replace storage file {} with {}",
781                path.display(),
782                temp_path.display()
783            )
784        })?;
785        Ok(())
786    }
787
788    async fn write_legacy_import_marker(&self, marker: &LegacyImportMarker) -> anyhow::Result<()> {
789        let payload = serde_json::to_string_pretty(marker)?;
790        fs::write(self.base.join(LEGACY_IMPORT_MARKER_FILE), payload).await?;
791        Ok(())
792    }
793}
794
/// Moves `temp_path` over `path`, replacing any existing file.
///
/// A single `rename` is the normal (atomic) path. On Windows, renaming
/// over an existing file can fail with `PermissionDenied` (or
/// `AlreadyExists`), so that case falls back to delete-then-rename —
/// which is NOT atomic: a crash between the two steps can leave `path`
/// missing while the `.tmp` file still holds the data.
async fn commit_temp_file(temp_path: &Path, path: &Path) -> std::io::Result<()> {
    match tokio::fs::rename(temp_path, path).await {
        Ok(()) => Ok(()),
        Err(err) => {
            #[cfg(windows)]
            {
                // Windows `rename` can return PermissionDenied when replacing an existing file.
                // Fall back to delete-then-rename for this case.
                use std::io::ErrorKind;
                if matches!(
                    err.kind(),
                    ErrorKind::PermissionDenied | ErrorKind::AlreadyExists
                ) {
                    // A missing target is fine — someone else removed it.
                    match tokio::fs::remove_file(path).await {
                        Ok(()) => {}
                        Err(remove_err) if remove_err.kind() == ErrorKind::NotFound => {}
                        Err(remove_err) => return Err(remove_err),
                    }
                    return tokio::fs::rename(temp_path, path).await;
                }
            }
            Err(err)
        }
    }
}
820
821fn normalize_todo_items(items: Vec<Value>) -> Vec<Value> {
822    items
823        .into_iter()
824        .filter_map(|item| {
825            let obj = item.as_object()?;
826            let content = obj
827                .get("content")
828                .and_then(|v| v.as_str())
829                .or_else(|| obj.get("text").and_then(|v| v.as_str()))
830                .unwrap_or("")
831                .trim()
832                .to_string();
833            if content.is_empty() {
834                return None;
835            }
836            let id = obj
837                .get("id")
838                .and_then(|v| v.as_str())
839                .filter(|s| !s.trim().is_empty())
840                .map(ToString::to_string)
841                .unwrap_or_else(|| format!("todo-{}", Uuid::new_v4()));
842            let status = obj
843                .get("status")
844                .and_then(|v| v.as_str())
845                .filter(|s| !s.trim().is_empty())
846                .map(ToString::to_string)
847                .unwrap_or_else(|| "pending".to_string());
848            Some(json!({
849                "id": id,
850                "content": content,
851                "status": status
852            }))
853        })
854        .collect()
855}
856
/// Everything a legacy-tree scan produces: the imported sessions plus the
/// counts used for the import marker and reports.
#[derive(Debug)]
struct LegacyScanResult {
    sessions: HashMap<String, Session>,
    legacy_counts: LegacyTreeCounts,
    imported_counts: LegacyImportedCounts,
}
863
/// Result of merging scanned legacy sessions into the live map.
#[derive(Debug, Default)]
struct LegacyMergeStats {
    // True when the live map was modified at all.
    changed: bool,
    sessions_merged: u64,
    messages_recovered: u64,
    parts_recovered: u64,
}
871
872fn now_ms_u64() -> u64 {
873    Utc::now().timestamp_millis().max(0) as u64
874}
875
/// Decides whether the legacy-tree import scan should run during startup.
///
/// Returns `true` only when no canonical sessions exist yet. Once sessions are
/// present, startup is never blocked on deep legacy tree scans; users can
/// trigger an explicit repair scan later.
///
/// NOTE(review): the previous implementation also read the legacy-import
/// marker file here and then returned `false` on *both* branches, so the read
/// was dead async I/O on every startup with existing sessions; it has been
/// removed. The marker path is kept in the signature for call-site
/// compatibility.
async fn should_run_legacy_scan_on_startup(_marker_path: &Path, sessions_exist: bool) -> bool {
    !sessions_exist
}
887
888async fn read_legacy_import_marker(marker_path: &Path) -> Option<LegacyImportMarker> {
889    let raw = fs::read_to_string(marker_path).await.ok()?;
890    serde_json::from_str::<LegacyImportMarker>(&raw).ok()
891}
892
893fn scan_legacy_sessions(base: &Path) -> anyhow::Result<LegacyScanResult> {
894    let sessions = load_legacy_opencode_sessions(base).unwrap_or_default();
895    let imported_counts = LegacyImportedCounts {
896        sessions: sessions.len() as u64,
897        messages: sessions.values().map(|s| s.messages.len() as u64).sum(),
898        parts: sessions
899            .values()
900            .flat_map(|s| s.messages.iter())
901            .map(|m| m.parts.len() as u64)
902            .sum(),
903    };
904    let legacy_counts = LegacyTreeCounts {
905        session_files: count_legacy_json_files(&base.join("session")),
906        message_files: count_legacy_json_files(&base.join("message")),
907        part_files: count_legacy_json_files(&base.join("part")),
908    };
909    Ok(LegacyScanResult {
910        sessions,
911        legacy_counts,
912        imported_counts,
913    })
914}
915
/// Counts every `*.json` file beneath `root`, recursing through subdirectories
/// with an explicit work stack. Returns 0 when `root` is absent or not a
/// directory; unreadable directories/entries are skipped silently (this is a
/// best-effort diagnostic count).
fn count_legacy_json_files(root: &Path) -> u64 {
    if !root.is_dir() {
        return 0;
    }
    let mut total = 0u64;
    let mut pending = vec![root.to_path_buf()];
    while let Some(current) = pending.pop() {
        let Ok(entries) = std::fs::read_dir(&current) else {
            continue;
        };
        for entry in entries.flatten() {
            let child = entry.path();
            let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
            if is_dir {
                pending.push(child);
            } else if child.extension().map_or(false, |ext| ext == "json") {
                total += 1;
            }
        }
    }
    total
}
938
939fn merge_legacy_sessions(
940    current: &mut HashMap<String, Session>,
941    imported: HashMap<String, Session>,
942) -> bool {
943    merge_legacy_sessions_with_stats(current, imported).changed
944}
945
946fn merge_legacy_sessions_with_stats(
947    current: &mut HashMap<String, Session>,
948    imported: HashMap<String, Session>,
949) -> LegacyMergeStats {
950    let mut stats = LegacyMergeStats::default();
951    for (id, legacy) in imported {
952        let legacy_message_count = legacy.messages.len() as u64;
953        let legacy_part_count = legacy
954            .messages
955            .iter()
956            .map(|m| m.parts.len() as u64)
957            .sum::<u64>();
958        match current.get_mut(&id) {
959            None => {
960                current.insert(id, legacy);
961                stats.changed = true;
962                stats.sessions_merged += 1;
963                stats.messages_recovered += legacy_message_count;
964                stats.parts_recovered += legacy_part_count;
965            }
966            Some(existing) => {
967                let should_merge_messages =
968                    existing.messages.is_empty() && !legacy.messages.is_empty();
969                let should_fill_title =
970                    existing.title.trim().is_empty() && !legacy.title.trim().is_empty();
971                let should_fill_directory = (existing.directory.trim().is_empty()
972                    || existing.directory.trim() == "."
973                    || existing.directory.trim() == "./"
974                    || existing.directory.trim() == ".\\")
975                    && !legacy.directory.trim().is_empty();
976                let should_fill_workspace =
977                    existing.workspace_root.is_none() && legacy.workspace_root.is_some();
978                if should_merge_messages {
979                    existing.messages = legacy.messages.clone();
980                }
981                if should_fill_title {
982                    existing.title = legacy.title.clone();
983                }
984                if should_fill_directory {
985                    existing.directory = legacy.directory.clone();
986                }
987                if should_fill_workspace {
988                    existing.workspace_root = legacy.workspace_root.clone();
989                }
990                if should_merge_messages
991                    || should_fill_title
992                    || should_fill_directory
993                    || should_fill_workspace
994                {
995                    stats.changed = true;
996                    if should_merge_messages {
997                        stats.sessions_merged += 1;
998                        stats.messages_recovered += legacy_message_count;
999                        stats.parts_recovered += legacy_part_count;
1000                    }
1001                }
1002            }
1003        }
1004    }
1005    stats
1006}
1007
1008fn hydrate_workspace_roots(sessions: &mut HashMap<String, Session>) -> bool {
1009    let mut changed = false;
1010    for session in sessions.values_mut() {
1011        if session.workspace_root.is_none() {
1012            let normalized = normalize_workspace_path(&session.directory);
1013            if normalized.is_some() {
1014                session.workspace_root = normalized;
1015                changed = true;
1016            }
1017        }
1018    }
1019    changed
1020}
1021
1022fn repair_session_titles(sessions: &mut HashMap<String, Session>) -> bool {
1023    let mut changed = false;
1024    for session in sessions.values_mut() {
1025        if !title_needs_repair(&session.title) {
1026            continue;
1027        }
1028        let first_user_text = session.messages.iter().find_map(|message| {
1029            if !matches!(message.role, MessageRole::User) {
1030                return None;
1031            }
1032            message.parts.iter().find_map(|part| match part {
1033                MessagePart::Text { text } if !text.trim().is_empty() => Some(text.as_str()),
1034                _ => None,
1035            })
1036        });
1037        let Some(source) = first_user_text else {
1038            continue;
1039        };
1040        let Some(derived) = derive_session_title_from_prompt(source, 60) else {
1041            continue;
1042        };
1043        if derived == session.title {
1044            continue;
1045        }
1046        session.title = derived;
1047        session.time.updated = Utc::now();
1048        changed = true;
1049    }
1050    changed
1051}
1052
/// Timestamps of a legacy session, in milliseconds since the Unix epoch.
#[derive(Debug, Deserialize)]
struct LegacySessionTime {
    /// Creation time (epoch milliseconds).
    created: i64,
    /// Last-update time (epoch milliseconds).
    updated: i64,
}
1058
/// On-disk shape of a legacy OpenCode `session/**/*.json` file; only the
/// fields the importer needs are deserialized.
#[derive(Debug, Deserialize)]
struct LegacySession {
    id: String,
    slug: Option<String>,
    version: Option<String>,
    #[serde(rename = "projectID")]
    project_id: Option<String>,
    /// May be missing or blank; the importer substitutes "New session".
    title: Option<String>,
    /// May be missing or blank; the importer substitutes ".".
    directory: Option<String>,
    time: LegacySessionTime,
}
1070
1071fn load_legacy_opencode_sessions(base: &Path) -> anyhow::Result<HashMap<String, Session>> {
1072    let legacy_root = base.join("session");
1073    if !legacy_root.is_dir() {
1074        return Ok(HashMap::new());
1075    }
1076
1077    let mut out = HashMap::new();
1078    let mut stack = vec![legacy_root];
1079    while let Some(dir) = stack.pop() {
1080        for entry in std::fs::read_dir(&dir)? {
1081            let entry = entry?;
1082            let path = entry.path();
1083            if entry.file_type()?.is_dir() {
1084                stack.push(path);
1085                continue;
1086            }
1087            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1088                continue;
1089            }
1090            let raw = match std::fs::read_to_string(&path) {
1091                Ok(v) => v,
1092                Err(_) => continue,
1093            };
1094            let legacy = match serde_json::from_str::<LegacySession>(&raw) {
1095                Ok(v) => v,
1096                Err(_) => continue,
1097            };
1098            let created = Utc
1099                .timestamp_millis_opt(legacy.time.created)
1100                .single()
1101                .unwrap_or_else(Utc::now);
1102            let updated = Utc
1103                .timestamp_millis_opt(legacy.time.updated)
1104                .single()
1105                .unwrap_or(created);
1106
1107            let session_id = legacy.id.clone();
1108            out.insert(
1109                session_id.clone(),
1110                Session {
1111                    id: session_id.clone(),
1112                    slug: legacy.slug,
1113                    version: legacy.version,
1114                    project_id: legacy.project_id,
1115                    title: legacy
1116                        .title
1117                        .filter(|s| !s.trim().is_empty())
1118                        .unwrap_or_else(|| "New session".to_string()),
1119                    directory: legacy
1120                        .directory
1121                        .clone()
1122                        .filter(|s| !s.trim().is_empty())
1123                        .unwrap_or_else(|| ".".to_string()),
1124                    workspace_root: legacy
1125                        .directory
1126                        .as_deref()
1127                        .and_then(normalize_workspace_path),
1128                    origin_workspace_root: None,
1129                    attached_from_workspace: None,
1130                    attached_to_workspace: None,
1131                    attach_timestamp_ms: None,
1132                    attach_reason: None,
1133                    tenant_context: tandem_types::LocalImplicitTenant.into(),
1134                    time: tandem_types::SessionTime { created, updated },
1135                    model: None,
1136                    provider: None,
1137                    environment: None,
1138                    messages: load_legacy_session_messages(base, &session_id),
1139                },
1140            );
1141        }
1142    }
1143    Ok(out)
1144}
1145
/// Timestamp of a legacy message, in milliseconds since the Unix epoch.
#[derive(Debug, Deserialize)]
struct LegacyMessageTime {
    /// Creation time (epoch milliseconds); also used as the sort key.
    created: i64,
}
1150
/// On-disk shape of a legacy message file (`message/<sessionID>/*.json`);
/// only the fields the importer needs are deserialized.
#[derive(Debug, Deserialize)]
struct LegacyMessage {
    id: String,
    /// Free-form role string, mapped via `legacy_role_to_message_role`.
    role: String,
    time: LegacyMessageTime,
}
1157
/// On-disk shape of a legacy part file (`part/<messageID>/*.json`). The fields
/// are a union over the legacy part kinds; which ones are populated depends on
/// `part_type` and `tool`.
#[derive(Debug, Deserialize)]
struct LegacyPart {
    /// Legacy discriminator ("text", "reasoning", "tool", ...).
    #[serde(rename = "type")]
    part_type: Option<String>,
    /// Text payload for text/reasoning parts.
    text: Option<String>,
    /// Tool name; when present the part always maps to a tool invocation.
    tool: Option<String>,
    args: Option<Value>,
    result: Option<Value>,
    error: Option<String>,
}
1168
1169fn load_legacy_session_messages(base: &Path, session_id: &str) -> Vec<Message> {
1170    let msg_dir = base.join("message").join(session_id);
1171    if !msg_dir.is_dir() {
1172        return Vec::new();
1173    }
1174
1175    let mut legacy_messages: Vec<(i64, Message)> = Vec::new();
1176
1177    let Ok(entries) = std::fs::read_dir(&msg_dir) else {
1178        return Vec::new();
1179    };
1180
1181    for entry in entries.flatten() {
1182        let path = entry.path();
1183        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1184            continue;
1185        }
1186        let Ok(raw) = std::fs::read_to_string(&path) else {
1187            continue;
1188        };
1189        let Ok(legacy) = serde_json::from_str::<LegacyMessage>(&raw) else {
1190            continue;
1191        };
1192
1193        let created_at = Utc
1194            .timestamp_millis_opt(legacy.time.created)
1195            .single()
1196            .unwrap_or_else(Utc::now);
1197
1198        legacy_messages.push((
1199            legacy.time.created,
1200            Message {
1201                id: legacy.id.clone(),
1202                role: legacy_role_to_message_role(&legacy.role),
1203                parts: load_legacy_message_parts(base, &legacy.id),
1204                created_at,
1205            },
1206        ));
1207    }
1208
1209    legacy_messages.sort_by_key(|(created_ms, _)| *created_ms);
1210    legacy_messages.into_iter().map(|(_, msg)| msg).collect()
1211}
1212
1213fn load_legacy_message_parts(base: &Path, message_id: &str) -> Vec<MessagePart> {
1214    let parts_dir = base.join("part").join(message_id);
1215    if !parts_dir.is_dir() {
1216        return Vec::new();
1217    }
1218
1219    let Ok(entries) = std::fs::read_dir(&parts_dir) else {
1220        return Vec::new();
1221    };
1222
1223    let mut out = Vec::new();
1224    for entry in entries.flatten() {
1225        let path = entry.path();
1226        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
1227            continue;
1228        }
1229        let Ok(raw) = std::fs::read_to_string(&path) else {
1230            continue;
1231        };
1232        let Ok(part) = serde_json::from_str::<LegacyPart>(&raw) else {
1233            continue;
1234        };
1235
1236        let mapped = if let Some(tool) = part.tool {
1237            Some(MessagePart::ToolInvocation {
1238                tool,
1239                args: part.args.unwrap_or_else(|| json!({})),
1240                result: part.result,
1241                error: part.error,
1242            })
1243        } else {
1244            match part.part_type.as_deref() {
1245                Some("reasoning") => Some(MessagePart::Reasoning {
1246                    text: part.text.unwrap_or_default(),
1247                }),
1248                Some("tool") => Some(MessagePart::ToolInvocation {
1249                    tool: "tool".to_string(),
1250                    args: part.args.unwrap_or_else(|| json!({})),
1251                    result: part.result,
1252                    error: part.error,
1253                }),
1254                Some("text") | None => Some(MessagePart::Text {
1255                    text: part.text.unwrap_or_default(),
1256                }),
1257                _ => None,
1258            }
1259        };
1260
1261        if let Some(part) = mapped {
1262            out.push(part);
1263        }
1264    }
1265    out
1266}
1267
1268fn legacy_role_to_message_role(role: &str) -> MessageRole {
1269    match role.to_lowercase().as_str() {
1270        "user" => MessageRole::User,
1271        "assistant" => MessageRole::Assistant,
1272        "system" => MessageRole::System,
1273        "tool" => MessageRole::Tool,
1274        _ => MessageRole::Assistant,
1275    }
1276}
1277
/// Counters describing what `merge_session_messages` changed.
#[derive(Debug, Clone, Default)]
struct MessageMergeStats {
    /// Imported messages whose id did not exist in the target list.
    messages_recovered: u64,
    /// Parts belonging to the recovered messages.
    parts_recovered: u64,
    /// Duplicate-id messages replaced by a richer imported copy.
    conflicts_merged: u64,
}
1284
1285fn message_richness(msg: &Message) -> usize {
1286    msg.parts
1287        .iter()
1288        .map(|p| match p {
1289            MessagePart::Text { text } | MessagePart::Reasoning { text } => {
1290                if text.trim().is_empty() {
1291                    0
1292                } else {
1293                    1
1294                }
1295            }
1296            MessagePart::ToolInvocation { result, error, .. } => {
1297                if result.is_some() || error.is_some() {
1298                    2
1299                } else {
1300                    1
1301                }
1302            }
1303        })
1304        .sum()
1305}
1306
1307fn most_recent_message_time(messages: &[Message]) -> Option<chrono::DateTime<Utc>> {
1308    messages.iter().map(|m| m.created_at).max()
1309}
1310
1311fn merge_session_messages(
1312    existing: &[Message],
1313    imported: &[Message],
1314) -> (Vec<Message>, MessageMergeStats, bool) {
1315    if existing.is_empty() {
1316        let messages_recovered = imported.len() as u64;
1317        let parts_recovered = imported.iter().map(|m| m.parts.len() as u64).sum();
1318        return (
1319            imported.to_vec(),
1320            MessageMergeStats {
1321                messages_recovered,
1322                parts_recovered,
1323                conflicts_merged: 0,
1324            },
1325            true,
1326        );
1327    }
1328
1329    let mut merged_by_id: HashMap<String, Message> = existing
1330        .iter()
1331        .cloned()
1332        .map(|m| (m.id.clone(), m))
1333        .collect();
1334    let mut stats = MessageMergeStats::default();
1335    let mut changed = false;
1336
1337    for incoming in imported {
1338        match merged_by_id.get(&incoming.id) {
1339            None => {
1340                merged_by_id.insert(incoming.id.clone(), incoming.clone());
1341                stats.messages_recovered += 1;
1342                stats.parts_recovered += incoming.parts.len() as u64;
1343                changed = true;
1344            }
1345            Some(current) => {
1346                let incoming_richer = message_richness(incoming) > message_richness(current)
1347                    || incoming.parts.len() > current.parts.len();
1348                if incoming_richer {
1349                    merged_by_id.insert(incoming.id.clone(), incoming.clone());
1350                    stats.conflicts_merged += 1;
1351                    changed = true;
1352                }
1353            }
1354        }
1355    }
1356
1357    let mut out: Vec<Message> = merged_by_id.into_values().collect();
1358    out.sort_by_key(|m| m.created_at);
1359    (out, stats, changed)
1360}