1use crate::exec::events::ThreadEvent;
2use crate::llm::provider::Message;
3use crate::utils::session_archive::{
4 SessionArchive, SessionArchiveMetadata, SessionForkMode, SessionListing,
5 find_session_by_identifier, list_recent_sessions, reserve_session_archive_identifier,
6 session_listing_matches_workspace,
7};
8use anyhow::{Result, anyhow};
9use parking_lot::Mutex;
10use serde::{Deserialize, Serialize};
11use std::collections::VecDeque;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use uuid::Uuid;
15use vtcode_macros::StringNewtype;
16
17const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 512;
18
19#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, StringNewtype)]
21pub struct ThreadId(String);
22
23#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, StringNewtype)]
25pub struct SubmissionId(String);
26
27impl SubmissionId {
28 pub fn generate() -> Self {
30 Self(format!("sub-{}", Uuid::new_v4()))
31 }
32}
33
34impl Default for SubmissionId {
35 fn default() -> Self {
36 Self::generate()
37 }
38}
39
40#[derive(Debug, Clone)]
41pub struct ThreadEventRecord {
42 pub sequence: u64,
43 pub thread_id: ThreadId,
44 pub submission_id: Option<SubmissionId>,
45 pub turn_id: Option<String>,
46 pub event: ThreadEvent,
47}
48
49#[derive(Debug, Clone)]
50pub struct ThreadSnapshot {
51 pub thread_id: ThreadId,
52 pub metadata: Option<SessionArchiveMetadata>,
53 pub archive_listing: Option<SessionListing>,
54 pub messages: Vec<Message>,
55 pub loaded_skills: Vec<String>,
56 pub turn_in_flight: bool,
57}
58
59#[derive(Debug, Clone)]
60pub struct ThreadBootstrap {
61 pub metadata: Option<SessionArchiveMetadata>,
62 pub archive_listing: Option<SessionListing>,
63 pub messages: Vec<Message>,
64 pub loaded_skills: Vec<String>,
65}
66
67impl ThreadBootstrap {
68 pub fn new(metadata: Option<SessionArchiveMetadata>) -> Self {
69 Self {
70 metadata,
71 archive_listing: None,
72 messages: Vec::new(),
73 loaded_skills: Vec::new(),
74 }
75 }
76
77 pub fn from_listing(listing: SessionListing) -> Self {
78 Self {
79 metadata: Some(listing.snapshot.metadata.clone()),
80 messages: messages_from_session_listing(&listing),
81 loaded_skills: loaded_skills_from_session_listing(&listing),
82 archive_listing: Some(listing),
83 }
84 }
85
86 pub fn from_snapshot(snapshot: ThreadSnapshot) -> Self {
87 Self {
88 metadata: snapshot.metadata,
89 archive_listing: snapshot.archive_listing,
90 messages: snapshot.messages,
91 loaded_skills: snapshot.loaded_skills,
92 }
93 }
94
95 pub fn with_messages(mut self, messages: Vec<Message>) -> Self {
96 self.messages = messages;
97 self
98 }
99
100 pub fn with_loaded_skills(mut self, loaded_skills: Vec<String>) -> Self {
101 self.loaded_skills = loaded_skills;
102 self
103 }
104}
105
106#[derive(Debug, Clone, PartialEq, Eq)]
107pub enum SessionQueryScope {
108 CurrentWorkspace(PathBuf),
109 All,
110}
111
112#[derive(Debug, Clone, PartialEq, Eq)]
113pub enum ArchivedSessionIntent {
114 ResumeInPlace,
115 ForkNewArchive {
116 custom_suffix: Option<String>,
117 summarize: bool,
118 },
119}
120
121#[derive(Debug, Clone)]
122pub struct PreparedArchivedSession {
123 pub source: SessionListing,
124 pub workspace: PathBuf,
125 pub bootstrap: ThreadBootstrap,
126 pub thread_id: String,
127 pub archive: SessionArchive,
128}
129
130#[derive(Default)]
131struct ThreadEventStore {
132 capacity: usize,
133 next_sequence: u64,
134 events: VecDeque<ThreadEventRecord>,
135}
136
137impl ThreadEventStore {
138 fn with_capacity(capacity: usize) -> Self {
139 Self {
140 capacity: capacity.max(1),
141 ..Self::default()
142 }
143 }
144
145 fn push(
146 &mut self,
147 thread_id: &ThreadId,
148 submission_id: Option<SubmissionId>,
149 turn_id: Option<String>,
150 event: ThreadEvent,
151 ) {
152 let record = ThreadEventRecord {
153 sequence: self.next_sequence,
154 thread_id: thread_id.clone(),
155 submission_id,
156 turn_id,
157 event,
158 };
159 self.next_sequence = self.next_sequence.saturating_add(1);
160
161 if self.events.len() >= self.capacity {
162 self.events.pop_front();
163 }
164 self.events.push_back(record);
165 }
166
167 fn snapshot(&self) -> Vec<ThreadEventRecord> {
168 self.events.iter().cloned().collect()
169 }
170}
171
172struct ThreadSessionState {
173 thread_id: ThreadId,
174 metadata: Option<SessionArchiveMetadata>,
175 archive_listing: Option<SessionListing>,
176 messages: Vec<Message>,
177 loaded_skills: Vec<String>,
178 turn_in_flight: bool,
179}
180
181impl ThreadSessionState {
182 fn snapshot(&self) -> ThreadSnapshot {
183 ThreadSnapshot {
184 thread_id: self.thread_id.clone(),
185 metadata: self.metadata.clone(),
186 archive_listing: self.archive_listing.clone(),
187 messages: self.messages.clone(),
188 loaded_skills: self.loaded_skills.clone(),
189 turn_in_flight: self.turn_in_flight,
190 }
191 }
192}
193
194#[derive(Clone)]
195pub struct ThreadRuntimeHandle {
196 inner: Arc<ThreadRuntimeInner>,
197}
198
199struct ThreadRuntimeInner {
200 session: Mutex<ThreadSessionState>,
201 event_store: Mutex<ThreadEventStore>,
202}
203
204impl ThreadRuntimeHandle {
205 fn new(thread_id: ThreadId, bootstrap: ThreadBootstrap, event_capacity: usize) -> Self {
206 let session = ThreadSessionState {
207 thread_id,
208 metadata: bootstrap.metadata,
209 archive_listing: bootstrap.archive_listing,
210 messages: bootstrap.messages,
211 loaded_skills: bootstrap.loaded_skills,
212 turn_in_flight: false,
213 };
214
215 Self {
216 inner: Arc::new(ThreadRuntimeInner {
217 session: Mutex::new(session),
218 event_store: Mutex::new(ThreadEventStore::with_capacity(event_capacity)),
219 }),
220 }
221 }
222
223 pub fn thread_id(&self) -> ThreadId {
224 self.inner.session.lock().thread_id.clone()
225 }
226
227 pub fn snapshot(&self) -> ThreadSnapshot {
228 self.inner.session.lock().snapshot()
229 }
230
231 pub fn metadata(&self) -> Option<SessionArchiveMetadata> {
232 self.inner.session.lock().metadata.clone()
233 }
234
235 pub fn replace_metadata(&self, metadata: Option<SessionArchiveMetadata>) {
236 self.inner.session.lock().metadata = metadata;
237 }
238
239 pub fn archive_listing(&self) -> Option<SessionListing> {
240 self.inner.session.lock().archive_listing.clone()
241 }
242
243 pub fn messages(&self) -> Vec<Message> {
244 self.inner.session.lock().messages.clone()
245 }
246
247 pub fn replace_messages(&self, messages: Vec<Message>) {
248 self.inner.session.lock().messages = messages;
249 }
250
251 pub fn append_message(&self, message: Message) {
252 self.inner.session.lock().messages.push(message);
253 }
254
255 pub fn begin_turn(&self) -> Result<SubmissionId> {
256 let mut session = self.inner.session.lock();
257 if session.turn_in_flight {
258 return Err(anyhow!(
259 "thread '{}' already has an in-flight turn",
260 session.thread_id
261 ));
262 }
263
264 session.turn_in_flight = true;
265 Ok(SubmissionId::generate())
266 }
267
268 pub fn finish_turn(&self) {
269 self.inner.session.lock().turn_in_flight = false;
270 }
271
272 pub fn record_event(
273 &self,
274 submission_id: Option<SubmissionId>,
275 turn_id: Option<String>,
276 event: ThreadEvent,
277 ) {
278 let thread_id = self.thread_id();
279 self.inner
280 .event_store
281 .lock()
282 .push(&thread_id, submission_id, turn_id, event);
283 }
284
285 pub fn replay_recent(&self) -> Vec<ThreadEventRecord> {
286 self.inner.event_store.lock().snapshot()
287 }
288
289 pub fn recent_events(&self) -> Vec<ThreadEvent> {
290 self.replay_recent()
291 .into_iter()
292 .map(|record| record.event)
293 .collect()
294 }
295}
296
297#[derive(Clone)]
298pub struct ThreadManager {
299 event_buffer_capacity: usize,
300}
301
302impl Default for ThreadManager {
303 fn default() -> Self {
304 Self::new()
305 }
306}
307
308impl ThreadManager {
309 pub fn new() -> Self {
310 Self {
311 event_buffer_capacity: DEFAULT_EVENT_BUFFER_CAPACITY,
312 }
313 }
314
315 pub fn with_event_buffer_capacity(event_buffer_capacity: usize) -> Self {
316 Self {
317 event_buffer_capacity: event_buffer_capacity.max(1),
318 }
319 }
320
321 pub fn start_thread_with_identifier(
322 &self,
323 identifier: impl Into<String>,
324 bootstrap: ThreadBootstrap,
325 ) -> ThreadRuntimeHandle {
326 ThreadRuntimeHandle::new(
327 ThreadId::new(identifier.into()),
328 bootstrap,
329 self.event_buffer_capacity,
330 )
331 }
332
333 pub async fn start_thread(
334 &self,
335 workspace_label: &str,
336 custom_suffix: Option<String>,
337 bootstrap: ThreadBootstrap,
338 ) -> Result<ThreadRuntimeHandle> {
339 let identifier = reserve_session_archive_identifier(workspace_label, custom_suffix).await?;
340 Ok(self.start_thread_with_identifier(identifier, bootstrap))
341 }
342
343 pub async fn resume_thread(&self, identifier: &str) -> Result<Option<ThreadRuntimeHandle>> {
344 let listing = find_session_by_identifier(identifier).await?;
345 Ok(listing.map(|listing| {
346 self.start_thread_with_identifier(
347 listing.identifier(),
348 ThreadBootstrap::from_listing(listing),
349 )
350 }))
351 }
352}
353
354pub async fn list_recent_sessions_in_scope(
355 limit: usize,
356 scope: &SessionQueryScope,
357) -> Result<Vec<SessionListing>> {
358 let mut listings = list_recent_sessions(limit.saturating_mul(4).max(limit)).await?;
359 if let SessionQueryScope::CurrentWorkspace(workspace) = scope {
360 listings.retain(|listing| session_listing_matches_workspace(listing, workspace));
361 }
362 listings.truncate(limit);
363 Ok(listings)
364}
365
366pub async fn prepare_archived_session(
367 source: SessionListing,
368 workspace: PathBuf,
369 metadata: SessionArchiveMetadata,
370 intent: ArchivedSessionIntent,
371 reserved_identifier: Option<String>,
372) -> Result<PreparedArchivedSession> {
373 let mut metadata =
374 preserve_prompt_cache_lineage_if_compatible(metadata, &source.snapshot.metadata);
375 metadata.continuation_metadata = source.snapshot.metadata.continuation_metadata.clone();
376 let mut bootstrap = ThreadBootstrap::from_listing(source.clone());
377 bootstrap.metadata = Some(metadata.clone());
378
379 let thread_id = match &intent {
380 ArchivedSessionIntent::ResumeInPlace => source.identifier(),
381 ArchivedSessionIntent::ForkNewArchive { custom_suffix, .. } => {
382 if let Some(identifier) = reserved_identifier {
383 identifier
384 } else {
385 reserve_session_archive_identifier(&metadata.workspace_label, custom_suffix.clone())
386 .await?
387 }
388 }
389 };
390
391 if let ArchivedSessionIntent::ForkNewArchive { summarize, .. } = &intent {
392 metadata.parent_session_id = Some(source.identifier());
393 metadata.fork_mode = Some(if *summarize {
394 SessionForkMode::Summarized
395 } else {
396 SessionForkMode::FullCopy
397 });
398 bootstrap.metadata = Some(metadata.clone());
399 }
400
401 let archive = match intent {
402 ArchivedSessionIntent::ResumeInPlace => {
403 SessionArchive::resume_from_listing(&source, metadata)
404 }
405 ArchivedSessionIntent::ForkNewArchive { .. } => {
406 SessionArchive::new_with_identifier(metadata, thread_id.clone()).await?
407 }
408 };
409
410 Ok(PreparedArchivedSession {
411 source,
412 workspace,
413 bootstrap,
414 thread_id,
415 archive,
416 })
417}
418
419fn preserve_prompt_cache_lineage_if_compatible(
420 mut metadata: SessionArchiveMetadata,
421 source: &SessionArchiveMetadata,
422) -> SessionArchiveMetadata {
423 let is_compatible = metadata.workspace_path == source.workspace_path
424 && metadata.provider == source.provider
425 && metadata.model == source.model;
426 if is_compatible && let Some(lineage_id) = source.prompt_cache_lineage_id.as_ref() {
427 metadata.prompt_cache_lineage_id = Some(lineage_id.clone());
428 }
429 metadata
430}
431
432pub fn messages_from_session_listing(listing: &SessionListing) -> Vec<Message> {
433 if let Some(progress) = &listing.snapshot.progress
434 && !progress.recent_messages.is_empty()
435 {
436 progress.recent_messages.iter().map(Message::from).collect()
437 } else if !listing.snapshot.messages.is_empty() {
438 listing
439 .snapshot
440 .messages
441 .iter()
442 .map(Message::from)
443 .collect()
444 } else {
445 Vec::new()
446 }
447}
448
449pub fn loaded_skills_from_session_listing(listing: &SessionListing) -> Vec<String> {
450 listing
451 .snapshot
452 .progress
453 .as_ref()
454 .map(|progress| progress.loaded_skills.clone())
455 .filter(|skills| !skills.is_empty())
456 .unwrap_or_else(|| listing.snapshot.metadata.loaded_skills.clone())
457}
458
459pub fn build_thread_archive_metadata(
460 workspace: &Path,
461 model: &str,
462 provider: &str,
463 theme: &str,
464 reasoning_effort: &str,
465) -> SessionArchiveMetadata {
466 let workspace_label = workspace
467 .file_name()
468 .and_then(|value| value.to_str())
469 .unwrap_or("workspace");
470
471 SessionArchiveMetadata::new(
472 workspace_label,
473 workspace.to_string_lossy().to_string(),
474 model,
475 provider,
476 theme,
477 reasoning_effort,
478 )
479 .ensure_prompt_cache_lineage_id()
480}
481
482#[cfg(test)]
483mod tests {
484 use super::*;
485 use crate::exec::events::{ThreadEvent, ThreadStartedEvent};
486 use crate::llm::provider::MessageRole;
487 use crate::utils::session_archive::{
488 SessionArchiveMetadata, SessionMessage, SessionProgress, SessionSnapshot,
489 clear_sessions_dir_override_for_tests, override_sessions_dir_for_tests,
490 };
491 use chrono::Utc;
492 use std::sync::{LazyLock, Mutex};
493 use tempfile::TempDir;
494
495 static SESSION_DIR_TEST_GUARD: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
496
497 #[test]
498 fn event_store_evicts_old_records() {
499 let manager = ThreadManager::with_event_buffer_capacity(2);
500 let handle = manager.start_thread_with_identifier("thread-1", ThreadBootstrap::new(None));
501
502 handle.record_event(
503 None,
504 None,
505 ThreadEvent::ThreadStarted(ThreadStartedEvent {
506 thread_id: "thread-1".to_string(),
507 }),
508 );
509 handle.record_event(
510 None,
511 Some("turn-1".to_string()),
512 ThreadEvent::ThreadStarted(ThreadStartedEvent {
513 thread_id: "thread-1-turn-1".to_string(),
514 }),
515 );
516 handle.record_event(
517 None,
518 Some("turn-2".to_string()),
519 ThreadEvent::ThreadStarted(ThreadStartedEvent {
520 thread_id: "thread-1-turn-2".to_string(),
521 }),
522 );
523
524 let records = handle.replay_recent();
525 assert_eq!(records.len(), 2);
526 assert_eq!(records[0].sequence, 1);
527 assert_eq!(records[1].sequence, 2);
528 }
529
530 #[test]
531 fn start_thread_with_identifier_preserves_message_history() {
532 let manager = ThreadManager::new();
533 let bootstrap = ThreadBootstrap::new(None)
534 .with_messages(vec![Message::user("hello".to_string())])
535 .with_loaded_skills(vec!["repo-skill".to_string()]);
536 let handle = manager.start_thread_with_identifier("thread-123", bootstrap);
537
538 assert_eq!(handle.thread_id().as_str(), "thread-123");
539 let snapshot = handle.snapshot();
540 assert_eq!(snapshot.messages.len(), 1);
541 assert_eq!(snapshot.loaded_skills, vec!["repo-skill".to_string()]);
542 }
543
544 #[test]
545 fn submit_enforces_single_in_flight_turn() {
546 let manager = ThreadManager::new();
547 let handle = manager.start_thread_with_identifier("thread-123", ThreadBootstrap::new(None));
548
549 let _first = handle.begin_turn().expect("first turn");
550 let err = handle.begin_turn().expect_err("second turn should fail");
551 assert!(err.to_string().contains("in-flight turn"));
552 handle.finish_turn();
553 handle.begin_turn().expect("turn after finish");
554 }
555
556 #[test]
557 fn list_recent_sessions_in_scope_filters_by_workspace() {
558 let _guard = SESSION_DIR_TEST_GUARD
559 .lock()
560 .expect("session dir test guard");
561 let tmp = TempDir::new().expect("temp dir");
562 override_sessions_dir_for_tests(tmp.path());
563
564 let listing = SessionListing {
565 path: tmp.path().join("session-alpha.json"),
566 snapshot: SessionSnapshot {
567 metadata: SessionArchiveMetadata::new(
568 "ws",
569 tmp.path().join("workspace").display().to_string(),
570 "model",
571 "provider",
572 "theme",
573 "medium",
574 ),
575 started_at: Utc::now(),
576 ended_at: Utc::now(),
577 total_messages: 1,
578 distinct_tools: Vec::new(),
579 transcript: Vec::new(),
580 messages: vec![SessionMessage::new(MessageRole::User, "hello")],
581 progress: None,
582 error_logs: Vec::new(),
583 },
584 };
585 std::fs::write(
586 &listing.path,
587 serde_json::to_string(&listing.snapshot).expect("serialize snapshot"),
588 )
589 .expect("write listing");
590
591 let runtime = tokio::runtime::Runtime::new().expect("runtime");
592 let filtered = runtime
593 .block_on(list_recent_sessions_in_scope(
594 5,
595 &SessionQueryScope::CurrentWorkspace(tmp.path().join("workspace")),
596 ))
597 .expect("filter by workspace");
598 let all = runtime
599 .block_on(list_recent_sessions_in_scope(5, &SessionQueryScope::All))
600 .expect("list all");
601
602 clear_sessions_dir_override_for_tests();
603
604 assert_eq!(filtered.len(), 1);
605 assert_eq!(all.len(), 1);
606 }
607
608 #[test]
609 fn prepare_archived_session_resume_reuses_source_identifier_and_archive() {
610 let _guard = SESSION_DIR_TEST_GUARD
611 .lock()
612 .expect("session dir test guard");
613 let tmp = TempDir::new().expect("temp dir");
614 override_sessions_dir_for_tests(tmp.path());
615
616 let listing = SessionListing {
617 path: tmp.path().join("session-source.json"),
618 snapshot: SessionSnapshot {
619 metadata: SessionArchiveMetadata::new(
620 "ws",
621 tmp.path().join("workspace").display().to_string(),
622 "old-model",
623 "old-provider",
624 "old-theme",
625 "medium",
626 ),
627 started_at: Utc::now(),
628 ended_at: Utc::now(),
629 total_messages: 2,
630 distinct_tools: vec!["tool_a".to_string()],
631 transcript: Vec::new(),
632 messages: vec![SessionMessage::new(MessageRole::User, "hello")],
633 progress: Some(Box::new(SessionProgress {
634 turn_number: 1,
635 recent_messages: vec![SessionMessage::new(MessageRole::Assistant, "recent")],
636 tool_summaries: Vec::new(),
637 token_usage: None,
638 max_context_tokens: None,
639 loaded_skills: vec!["skill_a".to_string()],
640 })),
641 error_logs: Vec::new(),
642 },
643 };
644
645 let runtime = tokio::runtime::Runtime::new().expect("runtime");
646 let prepared = runtime
647 .block_on(prepare_archived_session(
648 listing.clone(),
649 tmp.path().join("workspace"),
650 SessionArchiveMetadata::new(
651 "ws",
652 tmp.path().join("workspace").display().to_string(),
653 "new-model",
654 "new-provider",
655 "new-theme",
656 "high",
657 ),
658 ArchivedSessionIntent::ResumeInPlace,
659 Some("should-not-be-used".to_string()),
660 ))
661 .expect("prepare resume");
662
663 clear_sessions_dir_override_for_tests();
664
665 assert_eq!(prepared.thread_id, listing.identifier());
666 assert_eq!(prepared.archive.path(), listing.path.as_path());
667 assert_eq!(prepared.bootstrap.messages[0].content.as_text(), "recent");
668 assert_eq!(
669 prepared.bootstrap.loaded_skills,
670 vec!["skill_a".to_string()]
671 );
672 assert_eq!(
673 prepared
674 .bootstrap
675 .metadata
676 .as_ref()
677 .expect("metadata")
678 .model,
679 "new-model"
680 );
681 }
682
683 #[test]
684 fn prepare_archived_session_fork_uses_new_identifier_and_preserves_history() {
685 let _guard = SESSION_DIR_TEST_GUARD
686 .lock()
687 .expect("session dir test guard");
688 let tmp = TempDir::new().expect("temp dir");
689 override_sessions_dir_for_tests(tmp.path());
690
691 let listing = SessionListing {
692 path: tmp.path().join("session-source.json"),
693 snapshot: SessionSnapshot {
694 metadata: SessionArchiveMetadata::new(
695 "ws",
696 tmp.path().join("workspace").display().to_string(),
697 "model",
698 "provider",
699 "theme",
700 "medium",
701 ),
702 started_at: Utc::now(),
703 ended_at: Utc::now(),
704 total_messages: 1,
705 distinct_tools: Vec::new(),
706 transcript: Vec::new(),
707 messages: vec![SessionMessage::new(MessageRole::User, "hello")],
708 progress: None,
709 error_logs: Vec::new(),
710 },
711 };
712
713 let runtime = tokio::runtime::Runtime::new().expect("runtime");
714 let prepared = runtime
715 .block_on(prepare_archived_session(
716 listing.clone(),
717 tmp.path().join("workspace"),
718 SessionArchiveMetadata::new(
719 "ws",
720 tmp.path().join("workspace").display().to_string(),
721 "model",
722 "provider",
723 "theme",
724 "medium",
725 ),
726 ArchivedSessionIntent::ForkNewArchive {
727 custom_suffix: Some("branch".to_string()),
728 summarize: false,
729 },
730 Some("session-forked".to_string()),
731 ))
732 .expect("prepare fork");
733
734 clear_sessions_dir_override_for_tests();
735
736 assert_eq!(prepared.thread_id, "session-forked");
737 assert_ne!(prepared.archive.path(), listing.path.as_path());
738 assert!(
739 prepared
740 .archive
741 .path()
742 .ends_with(Path::new("session-forked.json"))
743 );
744 assert_eq!(prepared.bootstrap.messages[0].content.as_text(), "hello");
745 assert_eq!(
746 prepared
747 .bootstrap
748 .metadata
749 .as_ref()
750 .and_then(|metadata| metadata.parent_session_id.as_deref()),
751 Some("session-source")
752 );
753 assert_eq!(
754 prepared
755 .bootstrap
756 .metadata
757 .as_ref()
758 .and_then(|metadata| metadata.fork_mode),
759 Some(SessionForkMode::FullCopy)
760 );
761 }
762
763 #[test]
764 fn prepare_archived_session_preserves_prompt_cache_lineage_when_compatible() {
765 let _guard = SESSION_DIR_TEST_GUARD
766 .lock()
767 .expect("session dir test guard");
768 let tmp = TempDir::new().expect("temp dir");
769 override_sessions_dir_for_tests(tmp.path());
770
771 let listing = SessionListing {
772 path: tmp.path().join("session-source.json"),
773 snapshot: SessionSnapshot {
774 metadata: SessionArchiveMetadata::new(
775 "ws",
776 tmp.path().join("workspace").display().to_string(),
777 "model",
778 "provider",
779 "theme",
780 "medium",
781 )
782 .with_prompt_cache_lineage_id("lineage-source"),
783 started_at: Utc::now(),
784 ended_at: Utc::now(),
785 total_messages: 1,
786 distinct_tools: Vec::new(),
787 transcript: Vec::new(),
788 messages: vec![SessionMessage::new(MessageRole::User, "hello")],
789 progress: None,
790 error_logs: Vec::new(),
791 },
792 };
793
794 let runtime = tokio::runtime::Runtime::new().expect("runtime");
795 let prepared = runtime
796 .block_on(prepare_archived_session(
797 listing,
798 tmp.path().join("workspace"),
799 SessionArchiveMetadata::new(
800 "ws",
801 tmp.path().join("workspace").display().to_string(),
802 "model",
803 "provider",
804 "theme",
805 "medium",
806 )
807 .with_prompt_cache_lineage_id("lineage-new"),
808 ArchivedSessionIntent::ResumeInPlace,
809 None,
810 ))
811 .expect("prepare resume");
812
813 clear_sessions_dir_override_for_tests();
814
815 assert_eq!(
816 prepared
817 .bootstrap
818 .metadata
819 .as_ref()
820 .and_then(|metadata| metadata.prompt_cache_lineage_id.as_deref()),
821 Some("lineage-source")
822 );
823 }
824
825 #[test]
826 fn prepare_archived_session_resets_prompt_cache_lineage_on_model_change() {
827 let _guard = SESSION_DIR_TEST_GUARD
828 .lock()
829 .expect("session dir test guard");
830 let tmp = TempDir::new().expect("temp dir");
831 override_sessions_dir_for_tests(tmp.path());
832
833 let listing = SessionListing {
834 path: tmp.path().join("session-source.json"),
835 snapshot: SessionSnapshot {
836 metadata: SessionArchiveMetadata::new(
837 "ws",
838 tmp.path().join("workspace").display().to_string(),
839 "model-a",
840 "provider",
841 "theme",
842 "medium",
843 )
844 .with_prompt_cache_lineage_id("lineage-source"),
845 started_at: Utc::now(),
846 ended_at: Utc::now(),
847 total_messages: 1,
848 distinct_tools: Vec::new(),
849 transcript: Vec::new(),
850 messages: vec![SessionMessage::new(MessageRole::User, "hello")],
851 progress: None,
852 error_logs: Vec::new(),
853 },
854 };
855
856 let runtime = tokio::runtime::Runtime::new().expect("runtime");
857 let prepared = runtime
858 .block_on(prepare_archived_session(
859 listing,
860 tmp.path().join("workspace"),
861 SessionArchiveMetadata::new(
862 "ws",
863 tmp.path().join("workspace").display().to_string(),
864 "model-b",
865 "provider",
866 "theme",
867 "medium",
868 )
869 .with_prompt_cache_lineage_id("lineage-new"),
870 ArchivedSessionIntent::ResumeInPlace,
871 None,
872 ))
873 .expect("prepare resume");
874
875 clear_sessions_dir_override_for_tests();
876
877 assert_eq!(
878 prepared
879 .bootstrap
880 .metadata
881 .as_ref()
882 .and_then(|metadata| metadata.prompt_cache_lineage_id.as_deref()),
883 Some("lineage-new")
884 );
885 }
886
887 #[test]
888 fn messages_from_session_listing_preserves_assistant_phases_from_progress() {
889 let listing = SessionListing {
890 path: PathBuf::from("session.json"),
891 snapshot: SessionSnapshot {
892 metadata: SessionArchiveMetadata::new(
893 "ws", "/tmp/ws", "gpt-5.4", "openai", "theme", "medium",
894 ),
895 started_at: Utc::now(),
896 ended_at: Utc::now(),
897 total_messages: 2,
898 distinct_tools: Vec::new(),
899 transcript: Vec::new(),
900 messages: Vec::new(),
901 progress: Some(Box::new(SessionProgress {
902 turn_number: 2,
903 recent_messages: vec![
904 SessionMessage::from(
905 &Message::assistant("Working".to_string())
906 .with_phase(Some(crate::llm::provider::AssistantPhase::Commentary)),
907 ),
908 SessionMessage::from(
909 &Message::assistant("Done".to_string()).with_phase(Some(
910 crate::llm::provider::AssistantPhase::FinalAnswer,
911 )),
912 ),
913 ],
914 tool_summaries: Vec::new(),
915 token_usage: None,
916 max_context_tokens: None,
917 loaded_skills: Vec::new(),
918 })),
919 error_logs: Vec::new(),
920 },
921 };
922
923 let messages = messages_from_session_listing(&listing);
924 assert_eq!(
925 messages
926 .iter()
927 .map(|message| message.phase)
928 .collect::<Vec<_>>(),
929 vec![
930 Some(crate::llm::provider::AssistantPhase::Commentary),
931 Some(crate::llm::provider::AssistantPhase::FinalAnswer),
932 ]
933 );
934 }
935}