Skip to main content

vtcode_core/core/
threads.rs

1use crate::exec::events::ThreadEvent;
2use crate::llm::provider::Message;
3use crate::utils::session_archive::{
4    SessionArchive, SessionArchiveMetadata, SessionForkMode, SessionListing,
5    find_session_by_identifier, list_recent_sessions, reserve_session_archive_identifier,
6    session_listing_matches_workspace,
7};
8use anyhow::{Result, anyhow};
9use parking_lot::Mutex;
10use serde::{Deserialize, Serialize};
11use std::collections::VecDeque;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use uuid::Uuid;
15use vtcode_macros::StringNewtype;
16
17const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 512;
18
19/// Unique identifier for a thread in the runtime.
20#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, StringNewtype)]
21pub struct ThreadId(String);
22
23/// Unique identifier for a submission within a thread turn.
24#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, StringNewtype)]
25pub struct SubmissionId(String);
26
27impl SubmissionId {
28    /// Generate a new submission identifier with a `sub-` prefix.
29    pub fn generate() -> Self {
30        Self(format!("sub-{}", Uuid::new_v4()))
31    }
32}
33
34impl Default for SubmissionId {
35    fn default() -> Self {
36        Self::generate()
37    }
38}
39
40#[derive(Debug, Clone)]
41pub struct ThreadEventRecord {
42    pub sequence: u64,
43    pub thread_id: ThreadId,
44    pub submission_id: Option<SubmissionId>,
45    pub turn_id: Option<String>,
46    pub event: ThreadEvent,
47}
48
49#[derive(Debug, Clone)]
50pub struct ThreadSnapshot {
51    pub thread_id: ThreadId,
52    pub metadata: Option<SessionArchiveMetadata>,
53    pub archive_listing: Option<SessionListing>,
54    pub messages: Vec<Message>,
55    pub loaded_skills: Vec<String>,
56    pub turn_in_flight: bool,
57}
58
59#[derive(Debug, Clone)]
60pub struct ThreadBootstrap {
61    pub metadata: Option<SessionArchiveMetadata>,
62    pub archive_listing: Option<SessionListing>,
63    pub messages: Vec<Message>,
64    pub loaded_skills: Vec<String>,
65}
66
67impl ThreadBootstrap {
68    pub fn new(metadata: Option<SessionArchiveMetadata>) -> Self {
69        Self {
70            metadata,
71            archive_listing: None,
72            messages: Vec::new(),
73            loaded_skills: Vec::new(),
74        }
75    }
76
77    pub fn from_listing(listing: SessionListing) -> Self {
78        Self {
79            metadata: Some(listing.snapshot.metadata.clone()),
80            messages: messages_from_session_listing(&listing),
81            loaded_skills: loaded_skills_from_session_listing(&listing),
82            archive_listing: Some(listing),
83        }
84    }
85
86    pub fn from_snapshot(snapshot: ThreadSnapshot) -> Self {
87        Self {
88            metadata: snapshot.metadata,
89            archive_listing: snapshot.archive_listing,
90            messages: snapshot.messages,
91            loaded_skills: snapshot.loaded_skills,
92        }
93    }
94
95    pub fn with_messages(mut self, messages: Vec<Message>) -> Self {
96        self.messages = messages;
97        self
98    }
99
100    pub fn with_loaded_skills(mut self, loaded_skills: Vec<String>) -> Self {
101        self.loaded_skills = loaded_skills;
102        self
103    }
104}
105
106#[derive(Debug, Clone, PartialEq, Eq)]
107pub enum SessionQueryScope {
108    CurrentWorkspace(PathBuf),
109    All,
110}
111
112#[derive(Debug, Clone, PartialEq, Eq)]
113pub enum ArchivedSessionIntent {
114    ResumeInPlace,
115    ForkNewArchive {
116        custom_suffix: Option<String>,
117        summarize: bool,
118    },
119}
120
121#[derive(Debug, Clone)]
122pub struct PreparedArchivedSession {
123    pub source: SessionListing,
124    pub workspace: PathBuf,
125    pub bootstrap: ThreadBootstrap,
126    pub thread_id: String,
127    pub archive: SessionArchive,
128}
129
130#[derive(Default)]
131struct ThreadEventStore {
132    capacity: usize,
133    next_sequence: u64,
134    events: VecDeque<ThreadEventRecord>,
135}
136
137impl ThreadEventStore {
138    fn with_capacity(capacity: usize) -> Self {
139        Self {
140            capacity: capacity.max(1),
141            ..Self::default()
142        }
143    }
144
145    fn push(
146        &mut self,
147        thread_id: &ThreadId,
148        submission_id: Option<SubmissionId>,
149        turn_id: Option<String>,
150        event: ThreadEvent,
151    ) {
152        let record = ThreadEventRecord {
153            sequence: self.next_sequence,
154            thread_id: thread_id.clone(),
155            submission_id,
156            turn_id,
157            event,
158        };
159        self.next_sequence = self.next_sequence.saturating_add(1);
160
161        if self.events.len() >= self.capacity {
162            self.events.pop_front();
163        }
164        self.events.push_back(record);
165    }
166
167    fn snapshot(&self) -> Vec<ThreadEventRecord> {
168        self.events.iter().cloned().collect()
169    }
170}
171
172struct ThreadSessionState {
173    thread_id: ThreadId,
174    metadata: Option<SessionArchiveMetadata>,
175    archive_listing: Option<SessionListing>,
176    messages: Vec<Message>,
177    loaded_skills: Vec<String>,
178    turn_in_flight: bool,
179}
180
181impl ThreadSessionState {
182    fn snapshot(&self) -> ThreadSnapshot {
183        ThreadSnapshot {
184            thread_id: self.thread_id.clone(),
185            metadata: self.metadata.clone(),
186            archive_listing: self.archive_listing.clone(),
187            messages: self.messages.clone(),
188            loaded_skills: self.loaded_skills.clone(),
189            turn_in_flight: self.turn_in_flight,
190        }
191    }
192}
193
194#[derive(Clone)]
195pub struct ThreadRuntimeHandle {
196    inner: Arc<ThreadRuntimeInner>,
197}
198
199struct ThreadRuntimeInner {
200    session: Mutex<ThreadSessionState>,
201    event_store: Mutex<ThreadEventStore>,
202}
203
204impl ThreadRuntimeHandle {
205    fn new(thread_id: ThreadId, bootstrap: ThreadBootstrap, event_capacity: usize) -> Self {
206        let session = ThreadSessionState {
207            thread_id,
208            metadata: bootstrap.metadata,
209            archive_listing: bootstrap.archive_listing,
210            messages: bootstrap.messages,
211            loaded_skills: bootstrap.loaded_skills,
212            turn_in_flight: false,
213        };
214
215        Self {
216            inner: Arc::new(ThreadRuntimeInner {
217                session: Mutex::new(session),
218                event_store: Mutex::new(ThreadEventStore::with_capacity(event_capacity)),
219            }),
220        }
221    }
222
223    pub fn thread_id(&self) -> ThreadId {
224        self.inner.session.lock().thread_id.clone()
225    }
226
227    pub fn snapshot(&self) -> ThreadSnapshot {
228        self.inner.session.lock().snapshot()
229    }
230
231    pub fn metadata(&self) -> Option<SessionArchiveMetadata> {
232        self.inner.session.lock().metadata.clone()
233    }
234
235    pub fn replace_metadata(&self, metadata: Option<SessionArchiveMetadata>) {
236        self.inner.session.lock().metadata = metadata;
237    }
238
239    pub fn archive_listing(&self) -> Option<SessionListing> {
240        self.inner.session.lock().archive_listing.clone()
241    }
242
243    pub fn messages(&self) -> Vec<Message> {
244        self.inner.session.lock().messages.clone()
245    }
246
247    pub fn replace_messages(&self, messages: Vec<Message>) {
248        self.inner.session.lock().messages = messages;
249    }
250
251    pub fn append_message(&self, message: Message) {
252        self.inner.session.lock().messages.push(message);
253    }
254
255    pub fn begin_turn(&self) -> Result<SubmissionId> {
256        let mut session = self.inner.session.lock();
257        if session.turn_in_flight {
258            return Err(anyhow!(
259                "thread '{}' already has an in-flight turn",
260                session.thread_id
261            ));
262        }
263
264        session.turn_in_flight = true;
265        Ok(SubmissionId::generate())
266    }
267
268    pub fn finish_turn(&self) {
269        self.inner.session.lock().turn_in_flight = false;
270    }
271
272    pub fn record_event(
273        &self,
274        submission_id: Option<SubmissionId>,
275        turn_id: Option<String>,
276        event: ThreadEvent,
277    ) {
278        let thread_id = self.thread_id();
279        self.inner
280            .event_store
281            .lock()
282            .push(&thread_id, submission_id, turn_id, event);
283    }
284
285    pub fn replay_recent(&self) -> Vec<ThreadEventRecord> {
286        self.inner.event_store.lock().snapshot()
287    }
288
289    pub fn recent_events(&self) -> Vec<ThreadEvent> {
290        self.replay_recent()
291            .into_iter()
292            .map(|record| record.event)
293            .collect()
294    }
295}
296
297#[derive(Clone)]
298pub struct ThreadManager {
299    event_buffer_capacity: usize,
300}
301
302impl Default for ThreadManager {
303    fn default() -> Self {
304        Self::new()
305    }
306}
307
308impl ThreadManager {
309    pub fn new() -> Self {
310        Self {
311            event_buffer_capacity: DEFAULT_EVENT_BUFFER_CAPACITY,
312        }
313    }
314
315    pub fn with_event_buffer_capacity(event_buffer_capacity: usize) -> Self {
316        Self {
317            event_buffer_capacity: event_buffer_capacity.max(1),
318        }
319    }
320
321    pub fn start_thread_with_identifier(
322        &self,
323        identifier: impl Into<String>,
324        bootstrap: ThreadBootstrap,
325    ) -> ThreadRuntimeHandle {
326        ThreadRuntimeHandle::new(
327            ThreadId::new(identifier.into()),
328            bootstrap,
329            self.event_buffer_capacity,
330        )
331    }
332
333    pub async fn start_thread(
334        &self,
335        workspace_label: &str,
336        custom_suffix: Option<String>,
337        bootstrap: ThreadBootstrap,
338    ) -> Result<ThreadRuntimeHandle> {
339        let identifier = reserve_session_archive_identifier(workspace_label, custom_suffix).await?;
340        Ok(self.start_thread_with_identifier(identifier, bootstrap))
341    }
342
343    pub async fn resume_thread(&self, identifier: &str) -> Result<Option<ThreadRuntimeHandle>> {
344        let listing = find_session_by_identifier(identifier).await?;
345        Ok(listing.map(|listing| {
346            self.start_thread_with_identifier(
347                listing.identifier(),
348                ThreadBootstrap::from_listing(listing),
349            )
350        }))
351    }
352}
353
354pub async fn list_recent_sessions_in_scope(
355    limit: usize,
356    scope: &SessionQueryScope,
357) -> Result<Vec<SessionListing>> {
358    let mut listings = list_recent_sessions(limit.saturating_mul(4).max(limit)).await?;
359    if let SessionQueryScope::CurrentWorkspace(workspace) = scope {
360        listings.retain(|listing| session_listing_matches_workspace(listing, workspace));
361    }
362    listings.truncate(limit);
363    Ok(listings)
364}
365
366pub async fn prepare_archived_session(
367    source: SessionListing,
368    workspace: PathBuf,
369    metadata: SessionArchiveMetadata,
370    intent: ArchivedSessionIntent,
371    reserved_identifier: Option<String>,
372) -> Result<PreparedArchivedSession> {
373    let mut metadata =
374        preserve_prompt_cache_lineage_if_compatible(metadata, &source.snapshot.metadata);
375    metadata.continuation_metadata = source.snapshot.metadata.continuation_metadata.clone();
376    let mut bootstrap = ThreadBootstrap::from_listing(source.clone());
377    bootstrap.metadata = Some(metadata.clone());
378
379    let thread_id = match &intent {
380        ArchivedSessionIntent::ResumeInPlace => source.identifier(),
381        ArchivedSessionIntent::ForkNewArchive { custom_suffix, .. } => {
382            if let Some(identifier) = reserved_identifier {
383                identifier
384            } else {
385                reserve_session_archive_identifier(&metadata.workspace_label, custom_suffix.clone())
386                    .await?
387            }
388        }
389    };
390
391    if let ArchivedSessionIntent::ForkNewArchive { summarize, .. } = &intent {
392        metadata.parent_session_id = Some(source.identifier());
393        metadata.fork_mode = Some(if *summarize {
394            SessionForkMode::Summarized
395        } else {
396            SessionForkMode::FullCopy
397        });
398        bootstrap.metadata = Some(metadata.clone());
399    }
400
401    let archive = match intent {
402        ArchivedSessionIntent::ResumeInPlace => {
403            SessionArchive::resume_from_listing(&source, metadata)
404        }
405        ArchivedSessionIntent::ForkNewArchive { .. } => {
406            SessionArchive::new_with_identifier(metadata, thread_id.clone()).await?
407        }
408    };
409
410    Ok(PreparedArchivedSession {
411        source,
412        workspace,
413        bootstrap,
414        thread_id,
415        archive,
416    })
417}
418
419fn preserve_prompt_cache_lineage_if_compatible(
420    mut metadata: SessionArchiveMetadata,
421    source: &SessionArchiveMetadata,
422) -> SessionArchiveMetadata {
423    let is_compatible = metadata.workspace_path == source.workspace_path
424        && metadata.provider == source.provider
425        && metadata.model == source.model;
426    if is_compatible && let Some(lineage_id) = source.prompt_cache_lineage_id.as_ref() {
427        metadata.prompt_cache_lineage_id = Some(lineage_id.clone());
428    }
429    metadata
430}
431
432pub fn messages_from_session_listing(listing: &SessionListing) -> Vec<Message> {
433    if let Some(progress) = &listing.snapshot.progress
434        && !progress.recent_messages.is_empty()
435    {
436        progress.recent_messages.iter().map(Message::from).collect()
437    } else if !listing.snapshot.messages.is_empty() {
438        listing
439            .snapshot
440            .messages
441            .iter()
442            .map(Message::from)
443            .collect()
444    } else {
445        Vec::new()
446    }
447}
448
449pub fn loaded_skills_from_session_listing(listing: &SessionListing) -> Vec<String> {
450    listing
451        .snapshot
452        .progress
453        .as_ref()
454        .map(|progress| progress.loaded_skills.clone())
455        .filter(|skills| !skills.is_empty())
456        .unwrap_or_else(|| listing.snapshot.metadata.loaded_skills.clone())
457}
458
459pub fn build_thread_archive_metadata(
460    workspace: &Path,
461    model: &str,
462    provider: &str,
463    theme: &str,
464    reasoning_effort: &str,
465) -> SessionArchiveMetadata {
466    let workspace_label = workspace
467        .file_name()
468        .and_then(|value| value.to_str())
469        .unwrap_or("workspace");
470
471    SessionArchiveMetadata::new(
472        workspace_label,
473        workspace.to_string_lossy().to_string(),
474        model,
475        provider,
476        theme,
477        reasoning_effort,
478    )
479    .ensure_prompt_cache_lineage_id()
480}
481
482#[cfg(test)]
483mod tests {
484    use super::*;
485    use crate::exec::events::{ThreadEvent, ThreadStartedEvent};
486    use crate::llm::provider::MessageRole;
487    use crate::utils::session_archive::{
488        SessionArchiveMetadata, SessionMessage, SessionProgress, SessionSnapshot,
489        clear_sessions_dir_override_for_tests, override_sessions_dir_for_tests,
490    };
491    use chrono::Utc;
492    use std::sync::{LazyLock, Mutex};
493    use tempfile::TempDir;
494
495    static SESSION_DIR_TEST_GUARD: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
496
497    #[test]
498    fn event_store_evicts_old_records() {
499        let manager = ThreadManager::with_event_buffer_capacity(2);
500        let handle = manager.start_thread_with_identifier("thread-1", ThreadBootstrap::new(None));
501
502        handle.record_event(
503            None,
504            None,
505            ThreadEvent::ThreadStarted(ThreadStartedEvent {
506                thread_id: "thread-1".to_string(),
507            }),
508        );
509        handle.record_event(
510            None,
511            Some("turn-1".to_string()),
512            ThreadEvent::ThreadStarted(ThreadStartedEvent {
513                thread_id: "thread-1-turn-1".to_string(),
514            }),
515        );
516        handle.record_event(
517            None,
518            Some("turn-2".to_string()),
519            ThreadEvent::ThreadStarted(ThreadStartedEvent {
520                thread_id: "thread-1-turn-2".to_string(),
521            }),
522        );
523
524        let records = handle.replay_recent();
525        assert_eq!(records.len(), 2);
526        assert_eq!(records[0].sequence, 1);
527        assert_eq!(records[1].sequence, 2);
528    }
529
530    #[test]
531    fn start_thread_with_identifier_preserves_message_history() {
532        let manager = ThreadManager::new();
533        let bootstrap = ThreadBootstrap::new(None)
534            .with_messages(vec![Message::user("hello".to_string())])
535            .with_loaded_skills(vec!["repo-skill".to_string()]);
536        let handle = manager.start_thread_with_identifier("thread-123", bootstrap);
537
538        assert_eq!(handle.thread_id().as_str(), "thread-123");
539        let snapshot = handle.snapshot();
540        assert_eq!(snapshot.messages.len(), 1);
541        assert_eq!(snapshot.loaded_skills, vec!["repo-skill".to_string()]);
542    }
543
544    #[test]
545    fn submit_enforces_single_in_flight_turn() {
546        let manager = ThreadManager::new();
547        let handle = manager.start_thread_with_identifier("thread-123", ThreadBootstrap::new(None));
548
549        let _first = handle.begin_turn().expect("first turn");
550        let err = handle.begin_turn().expect_err("second turn should fail");
551        assert!(err.to_string().contains("in-flight turn"));
552        handle.finish_turn();
553        handle.begin_turn().expect("turn after finish");
554    }
555
556    #[test]
557    fn list_recent_sessions_in_scope_filters_by_workspace() {
558        let _guard = SESSION_DIR_TEST_GUARD
559            .lock()
560            .expect("session dir test guard");
561        let tmp = TempDir::new().expect("temp dir");
562        override_sessions_dir_for_tests(tmp.path());
563
564        let listing = SessionListing {
565            path: tmp.path().join("session-alpha.json"),
566            snapshot: SessionSnapshot {
567                metadata: SessionArchiveMetadata::new(
568                    "ws",
569                    tmp.path().join("workspace").display().to_string(),
570                    "model",
571                    "provider",
572                    "theme",
573                    "medium",
574                ),
575                started_at: Utc::now(),
576                ended_at: Utc::now(),
577                total_messages: 1,
578                distinct_tools: Vec::new(),
579                transcript: Vec::new(),
580                messages: vec![SessionMessage::new(MessageRole::User, "hello")],
581                progress: None,
582                error_logs: Vec::new(),
583            },
584        };
585        std::fs::write(
586            &listing.path,
587            serde_json::to_string(&listing.snapshot).expect("serialize snapshot"),
588        )
589        .expect("write listing");
590
591        let runtime = tokio::runtime::Runtime::new().expect("runtime");
592        let filtered = runtime
593            .block_on(list_recent_sessions_in_scope(
594                5,
595                &SessionQueryScope::CurrentWorkspace(tmp.path().join("workspace")),
596            ))
597            .expect("filter by workspace");
598        let all = runtime
599            .block_on(list_recent_sessions_in_scope(5, &SessionQueryScope::All))
600            .expect("list all");
601
602        clear_sessions_dir_override_for_tests();
603
604        assert_eq!(filtered.len(), 1);
605        assert_eq!(all.len(), 1);
606    }
607
608    #[test]
609    fn prepare_archived_session_resume_reuses_source_identifier_and_archive() {
610        let _guard = SESSION_DIR_TEST_GUARD
611            .lock()
612            .expect("session dir test guard");
613        let tmp = TempDir::new().expect("temp dir");
614        override_sessions_dir_for_tests(tmp.path());
615
616        let listing = SessionListing {
617            path: tmp.path().join("session-source.json"),
618            snapshot: SessionSnapshot {
619                metadata: SessionArchiveMetadata::new(
620                    "ws",
621                    tmp.path().join("workspace").display().to_string(),
622                    "old-model",
623                    "old-provider",
624                    "old-theme",
625                    "medium",
626                ),
627                started_at: Utc::now(),
628                ended_at: Utc::now(),
629                total_messages: 2,
630                distinct_tools: vec!["tool_a".to_string()],
631                transcript: Vec::new(),
632                messages: vec![SessionMessage::new(MessageRole::User, "hello")],
633                progress: Some(Box::new(SessionProgress {
634                    turn_number: 1,
635                    recent_messages: vec![SessionMessage::new(MessageRole::Assistant, "recent")],
636                    tool_summaries: Vec::new(),
637                    token_usage: None,
638                    max_context_tokens: None,
639                    loaded_skills: vec!["skill_a".to_string()],
640                })),
641                error_logs: Vec::new(),
642            },
643        };
644
645        let runtime = tokio::runtime::Runtime::new().expect("runtime");
646        let prepared = runtime
647            .block_on(prepare_archived_session(
648                listing.clone(),
649                tmp.path().join("workspace"),
650                SessionArchiveMetadata::new(
651                    "ws",
652                    tmp.path().join("workspace").display().to_string(),
653                    "new-model",
654                    "new-provider",
655                    "new-theme",
656                    "high",
657                ),
658                ArchivedSessionIntent::ResumeInPlace,
659                Some("should-not-be-used".to_string()),
660            ))
661            .expect("prepare resume");
662
663        clear_sessions_dir_override_for_tests();
664
665        assert_eq!(prepared.thread_id, listing.identifier());
666        assert_eq!(prepared.archive.path(), listing.path.as_path());
667        assert_eq!(prepared.bootstrap.messages[0].content.as_text(), "recent");
668        assert_eq!(
669            prepared.bootstrap.loaded_skills,
670            vec!["skill_a".to_string()]
671        );
672        assert_eq!(
673            prepared
674                .bootstrap
675                .metadata
676                .as_ref()
677                .expect("metadata")
678                .model,
679            "new-model"
680        );
681    }
682
683    #[test]
684    fn prepare_archived_session_fork_uses_new_identifier_and_preserves_history() {
685        let _guard = SESSION_DIR_TEST_GUARD
686            .lock()
687            .expect("session dir test guard");
688        let tmp = TempDir::new().expect("temp dir");
689        override_sessions_dir_for_tests(tmp.path());
690
691        let listing = SessionListing {
692            path: tmp.path().join("session-source.json"),
693            snapshot: SessionSnapshot {
694                metadata: SessionArchiveMetadata::new(
695                    "ws",
696                    tmp.path().join("workspace").display().to_string(),
697                    "model",
698                    "provider",
699                    "theme",
700                    "medium",
701                ),
702                started_at: Utc::now(),
703                ended_at: Utc::now(),
704                total_messages: 1,
705                distinct_tools: Vec::new(),
706                transcript: Vec::new(),
707                messages: vec![SessionMessage::new(MessageRole::User, "hello")],
708                progress: None,
709                error_logs: Vec::new(),
710            },
711        };
712
713        let runtime = tokio::runtime::Runtime::new().expect("runtime");
714        let prepared = runtime
715            .block_on(prepare_archived_session(
716                listing.clone(),
717                tmp.path().join("workspace"),
718                SessionArchiveMetadata::new(
719                    "ws",
720                    tmp.path().join("workspace").display().to_string(),
721                    "model",
722                    "provider",
723                    "theme",
724                    "medium",
725                ),
726                ArchivedSessionIntent::ForkNewArchive {
727                    custom_suffix: Some("branch".to_string()),
728                    summarize: false,
729                },
730                Some("session-forked".to_string()),
731            ))
732            .expect("prepare fork");
733
734        clear_sessions_dir_override_for_tests();
735
736        assert_eq!(prepared.thread_id, "session-forked");
737        assert_ne!(prepared.archive.path(), listing.path.as_path());
738        assert!(
739            prepared
740                .archive
741                .path()
742                .ends_with(Path::new("session-forked.json"))
743        );
744        assert_eq!(prepared.bootstrap.messages[0].content.as_text(), "hello");
745        assert_eq!(
746            prepared
747                .bootstrap
748                .metadata
749                .as_ref()
750                .and_then(|metadata| metadata.parent_session_id.as_deref()),
751            Some("session-source")
752        );
753        assert_eq!(
754            prepared
755                .bootstrap
756                .metadata
757                .as_ref()
758                .and_then(|metadata| metadata.fork_mode),
759            Some(SessionForkMode::FullCopy)
760        );
761    }
762
763    #[test]
764    fn prepare_archived_session_preserves_prompt_cache_lineage_when_compatible() {
765        let _guard = SESSION_DIR_TEST_GUARD
766            .lock()
767            .expect("session dir test guard");
768        let tmp = TempDir::new().expect("temp dir");
769        override_sessions_dir_for_tests(tmp.path());
770
771        let listing = SessionListing {
772            path: tmp.path().join("session-source.json"),
773            snapshot: SessionSnapshot {
774                metadata: SessionArchiveMetadata::new(
775                    "ws",
776                    tmp.path().join("workspace").display().to_string(),
777                    "model",
778                    "provider",
779                    "theme",
780                    "medium",
781                )
782                .with_prompt_cache_lineage_id("lineage-source"),
783                started_at: Utc::now(),
784                ended_at: Utc::now(),
785                total_messages: 1,
786                distinct_tools: Vec::new(),
787                transcript: Vec::new(),
788                messages: vec![SessionMessage::new(MessageRole::User, "hello")],
789                progress: None,
790                error_logs: Vec::new(),
791            },
792        };
793
794        let runtime = tokio::runtime::Runtime::new().expect("runtime");
795        let prepared = runtime
796            .block_on(prepare_archived_session(
797                listing,
798                tmp.path().join("workspace"),
799                SessionArchiveMetadata::new(
800                    "ws",
801                    tmp.path().join("workspace").display().to_string(),
802                    "model",
803                    "provider",
804                    "theme",
805                    "medium",
806                )
807                .with_prompt_cache_lineage_id("lineage-new"),
808                ArchivedSessionIntent::ResumeInPlace,
809                None,
810            ))
811            .expect("prepare resume");
812
813        clear_sessions_dir_override_for_tests();
814
815        assert_eq!(
816            prepared
817                .bootstrap
818                .metadata
819                .as_ref()
820                .and_then(|metadata| metadata.prompt_cache_lineage_id.as_deref()),
821            Some("lineage-source")
822        );
823    }
824
825    #[test]
826    fn prepare_archived_session_resets_prompt_cache_lineage_on_model_change() {
827        let _guard = SESSION_DIR_TEST_GUARD
828            .lock()
829            .expect("session dir test guard");
830        let tmp = TempDir::new().expect("temp dir");
831        override_sessions_dir_for_tests(tmp.path());
832
833        let listing = SessionListing {
834            path: tmp.path().join("session-source.json"),
835            snapshot: SessionSnapshot {
836                metadata: SessionArchiveMetadata::new(
837                    "ws",
838                    tmp.path().join("workspace").display().to_string(),
839                    "model-a",
840                    "provider",
841                    "theme",
842                    "medium",
843                )
844                .with_prompt_cache_lineage_id("lineage-source"),
845                started_at: Utc::now(),
846                ended_at: Utc::now(),
847                total_messages: 1,
848                distinct_tools: Vec::new(),
849                transcript: Vec::new(),
850                messages: vec![SessionMessage::new(MessageRole::User, "hello")],
851                progress: None,
852                error_logs: Vec::new(),
853            },
854        };
855
856        let runtime = tokio::runtime::Runtime::new().expect("runtime");
857        let prepared = runtime
858            .block_on(prepare_archived_session(
859                listing,
860                tmp.path().join("workspace"),
861                SessionArchiveMetadata::new(
862                    "ws",
863                    tmp.path().join("workspace").display().to_string(),
864                    "model-b",
865                    "provider",
866                    "theme",
867                    "medium",
868                )
869                .with_prompt_cache_lineage_id("lineage-new"),
870                ArchivedSessionIntent::ResumeInPlace,
871                None,
872            ))
873            .expect("prepare resume");
874
875        clear_sessions_dir_override_for_tests();
876
877        assert_eq!(
878            prepared
879                .bootstrap
880                .metadata
881                .as_ref()
882                .and_then(|metadata| metadata.prompt_cache_lineage_id.as_deref()),
883            Some("lineage-new")
884        );
885    }
886
887    #[test]
888    fn messages_from_session_listing_preserves_assistant_phases_from_progress() {
889        let listing = SessionListing {
890            path: PathBuf::from("session.json"),
891            snapshot: SessionSnapshot {
892                metadata: SessionArchiveMetadata::new(
893                    "ws", "/tmp/ws", "gpt-5.4", "openai", "theme", "medium",
894                ),
895                started_at: Utc::now(),
896                ended_at: Utc::now(),
897                total_messages: 2,
898                distinct_tools: Vec::new(),
899                transcript: Vec::new(),
900                messages: Vec::new(),
901                progress: Some(Box::new(SessionProgress {
902                    turn_number: 2,
903                    recent_messages: vec![
904                        SessionMessage::from(
905                            &Message::assistant("Working".to_string())
906                                .with_phase(Some(crate::llm::provider::AssistantPhase::Commentary)),
907                        ),
908                        SessionMessage::from(
909                            &Message::assistant("Done".to_string()).with_phase(Some(
910                                crate::llm::provider::AssistantPhase::FinalAnswer,
911                            )),
912                        ),
913                    ],
914                    tool_summaries: Vec::new(),
915                    token_usage: None,
916                    max_context_tokens: None,
917                    loaded_skills: Vec::new(),
918                })),
919                error_logs: Vec::new(),
920            },
921        };
922
923        let messages = messages_from_session_listing(&listing);
924        assert_eq!(
925            messages
926                .iter()
927                .map(|message| message.phase)
928                .collect::<Vec<_>>(),
929            vec![
930                Some(crate::llm::provider::AssistantPhase::Commentary),
931                Some(crate::llm::provider::AssistantPhase::FinalAnswer),
932            ]
933        );
934    }
935}