Skip to main content

codewhale_core/
lib.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use codewhale_agent::ModelRegistry;
7use codewhale_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use codewhale_execpolicy::{
9    AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10    ExecPolicyEngine,
11};
12use codewhale_hooks::{HookDispatcher, HookEvent};
13use codewhale_mcp::{
14    McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use codewhale_protocol::{
17    AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18    ResponseChannel, ReviewDecision, Thread, ThreadForkParams, ThreadGoal, ThreadGoalClearParams,
19    ThreadGoalGetParams, ThreadGoalProgressParams, ThreadGoalSetParams, ThreadGoalStatus,
20    ThreadListParams, ThreadReadParams, ThreadRequest, ThreadResponse, ThreadResumeParams,
21    ThreadSetNameParams, ThreadStatus, ToolPayload, UserInputRequestEvent,
22};
23use codewhale_state::{
24    JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadGoalRecord,
25    ThreadGoalStatus as PersistedThreadGoalStatus, ThreadListFilters, ThreadMetadata,
26    ThreadStatus as PersistedThreadStatus,
27};
28use codewhale_tools::{ToolCall, ToolRegistry};
29use serde_json::{Value, json};
30use uuid::Uuid;
31
32/// How a new thread's conversation history is initialized.
33#[derive(Debug, Clone)]
34pub enum InitialHistory {
35    /// Start with an empty conversation.
36    New,
37    /// Forked from an existing thread with the given history items.
38    Forked(Vec<Value>),
39    /// Resumed from a persisted thread with its full history.
40    Resumed {
41        conversation_id: String,
42        history: Vec<Value>,
43        rollout_path: PathBuf,
44    },
45}
46
47/// Result of spawning or resuming a thread.
48#[derive(Debug, Clone)]
49pub struct NewThread {
50    /// The thread metadata.
51    pub thread: Thread,
52    /// Resolved model identifier.
53    pub model: String,
54    /// Provider that serves the model.
55    pub model_provider: String,
56    /// Working directory for the thread.
57    pub cwd: PathBuf,
58    /// Approval policy override, if any.
59    pub approval_policy: Option<String>,
60    /// Sandbox mode override, if any.
61    pub sandbox: Option<String>,
62}
63
/// Lifecycle state of a background job.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JobStatus {
    /// Enqueued and not yet started.
    Queued,
    /// Work is in progress.
    Running,
    /// Execution has been suspended for now.
    Paused,
    /// Ended successfully.
    Completed,
    /// Ended with an error.
    Failed,
    /// Stopped on request before finishing.
    Cancelled,
}
80
/// Value written under `schema_version` in the encoded job detail document.
const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
/// Default for `JobRetryMetadata::max_attempts`.
const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
/// Default for `JobRetryMetadata::backoff_base_ms`.
const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
/// Upper bound on the number of history entries kept per job.
const MAX_JOB_HISTORY_ENTRIES: usize = 64;
85
/// Retry bookkeeping carried by every job.
#[derive(Clone, Debug)]
pub struct JobRetryMetadata {
    /// Number of retry attempts recorded so far (0 means none yet).
    pub attempt: u32,
    /// Ceiling on `attempt`; once reached, no further retry is scheduled.
    pub max_attempts: u32,
    /// Base delay, in milliseconds, that the exponential backoff scales from.
    pub backoff_base_ms: u64,
    /// Delay, in milliseconds, computed for the upcoming retry.
    pub next_backoff_ms: u64,
    /// Timestamp at which the upcoming retry is due, if one is scheduled.
    pub next_retry_at: Option<i64>,
}
100
101impl Default for JobRetryMetadata {
102    fn default() -> Self {
103        Self {
104            attempt: 0,
105            max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
106            backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
107            next_backoff_ms: 0,
108            next_retry_at: None,
109        }
110    }
111}
112
113/// A single entry in a job's history log.
114#[derive(Debug, Clone)]
115pub struct JobHistoryEntry {
116    /// Timestamp when this entry was recorded.
117    pub at: i64,
118    /// Phase name (e.g., "created", "running", "failed").
119    pub phase: String,
120    /// Job status at this point in time.
121    pub status: JobStatus,
122    /// Progress percentage at this point, if available.
123    pub progress: Option<u8>,
124    /// Human-readable detail message.
125    pub detail: Option<String>,
126    /// Retry state snapshot at this point.
127    pub retry: JobRetryMetadata,
128}
129
130#[derive(Debug, Clone)]
131struct PersistedJobDetail {
132    pub status: JobStatus,
133    pub detail: Option<String>,
134    pub retry: JobRetryMetadata,
135    pub history: Vec<JobHistoryEntry>,
136}
137
138/// A complete job record with all metadata and history.
139#[derive(Debug, Clone)]
140pub struct JobRecord {
141    /// Unique job identifier.
142    pub id: String,
143    /// Human-readable job name.
144    pub name: String,
145    /// Current job status.
146    pub status: JobStatus,
147    /// Current progress percentage (0-100).
148    pub progress: Option<u8>,
149    /// Human-readable detail about the current state.
150    pub detail: Option<String>,
151    /// Retry state for failed jobs.
152    pub retry: JobRetryMetadata,
153    /// Chronological history of state transitions.
154    pub history: Vec<JobHistoryEntry>,
155    /// Timestamp when the job was created.
156    pub created_at: i64,
157    /// Timestamp of the last state change.
158    pub updated_at: i64,
159}
160
161/// Manages background jobs with retry logic and persistence.
162#[derive(Debug, Default)]
163pub struct JobManager {
164    jobs: HashMap<String, JobRecord>,
165}
166
167impl JobManager {
168    fn now_ts() -> i64 {
169        chrono::Utc::now().timestamp()
170    }
171
172    fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
173        if retry.attempt == 0 {
174            return 0;
175        }
176        let exponent = retry.attempt.saturating_sub(1).min(20);
177        let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
178        retry.backoff_base_ms.saturating_mul(multiplier)
179    }
180
181    fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
182        retry.next_backoff_ms = 0;
183        retry.next_retry_at = None;
184    }
185
186    fn push_history(job: &mut JobRecord, phase: &str) {
187        job.history.push(JobHistoryEntry {
188            at: job.updated_at,
189            phase: phase.to_string(),
190            status: job.status,
191            progress: job.progress,
192            detail: job.detail.clone(),
193            retry: job.retry.clone(),
194        });
195        if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
196            let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
197            job.history.drain(0..to_drain);
198        }
199    }
200
201    fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
202        let raw = raw?;
203        let parsed: Value = serde_json::from_str(raw).ok()?;
204        let status = parsed
205            .get("status")
206            .and_then(Value::as_str)
207            .and_then(job_status_from_str)?;
208        let detail = parsed.get("detail").and_then(json_optional_string);
209        let retry = parse_retry_metadata(parsed.get("retry"));
210        let history = parsed
211            .get("history")
212            .and_then(Value::as_array)
213            .map(|items| {
214                items
215                    .iter()
216                    .filter_map(parse_history_entry)
217                    .collect::<Vec<_>>()
218            })
219            .unwrap_or_default();
220        Some(PersistedJobDetail {
221            status,
222            detail,
223            retry,
224            history,
225        })
226    }
227
228    fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
229        let encoded = json!({
230            "schema_version": JOB_DETAIL_SCHEMA_VERSION,
231            "status": job_status_to_str(job.status),
232            "detail": job.detail.clone(),
233            "retry": job_retry_to_value(&job.retry),
234            "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
235        })
236        .to_string();
237        Ok(Some(encoded))
238    }
239
240    /// Enqueues a new job and returns its record.
241    pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
242        let now = Self::now_ts();
243        let id = format!("job-{}", Uuid::new_v4());
244        let mut job = JobRecord {
245            id: id.clone(),
246            name: name.into(),
247            status: JobStatus::Queued,
248            progress: Some(0),
249            detail: None,
250            retry: JobRetryMetadata::default(),
251            history: Vec::new(),
252            created_at: now,
253            updated_at: now,
254        };
255        Self::push_history(&mut job, "created");
256        self.jobs.insert(id, job.clone());
257        job
258    }
259
260    /// Transitions a job to running and clears its retry schedule.
261    pub fn set_running(&mut self, id: &str) {
262        if let Some(job) = self.jobs.get_mut(id) {
263            job.status = JobStatus::Running;
264            Self::clear_retry_schedule(&mut job.retry);
265            job.updated_at = Self::now_ts();
266            Self::push_history(job, "running");
267        }
268    }
269
270    /// Updates a job's progress (clamped to 100) and optional detail message.
271    pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
272        if let Some(job) = self.jobs.get_mut(id) {
273            job.progress = Some(progress.min(100));
274            job.detail = detail;
275            job.updated_at = Self::now_ts();
276            Self::push_history(job, "progress_updated");
277        }
278    }
279
280    /// Marks a job as completed with 100% progress and clears its retry schedule.
281    pub fn complete(&mut self, id: &str) {
282        if let Some(job) = self.jobs.get_mut(id) {
283            job.status = JobStatus::Completed;
284            job.progress = Some(100);
285            Self::clear_retry_schedule(&mut job.retry);
286            job.updated_at = Self::now_ts();
287            Self::push_history(job, "completed");
288        }
289    }
290
291    /// Marks a job as failed and schedules a retry if attempts remain.
292    pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
293        if let Some(job) = self.jobs.get_mut(id) {
294            let now = Self::now_ts();
295            job.status = JobStatus::Failed;
296            job.detail = Some(detail.into());
297            if job.retry.attempt < job.retry.max_attempts {
298                job.retry.attempt += 1;
299                job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
300                let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
301                    .min(i64::MAX as u64) as i64;
302                job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
303            } else {
304                Self::clear_retry_schedule(&mut job.retry);
305            }
306            job.updated_at = now;
307            Self::push_history(job, "failed");
308        }
309    }
310
311    /// Cancels a job and clears any pending retry schedule.
312    pub fn cancel(&mut self, id: &str) {
313        if let Some(job) = self.jobs.get_mut(id) {
314            job.status = JobStatus::Cancelled;
315            Self::clear_retry_schedule(&mut job.retry);
316            job.updated_at = Self::now_ts();
317            Self::push_history(job, "cancelled");
318        }
319    }
320
321    /// Pauses a job, optionally updating its detail message.
322    pub fn pause(&mut self, id: &str, detail: Option<String>) {
323        if let Some(job) = self.jobs.get_mut(id) {
324            job.status = JobStatus::Paused;
325            if detail.is_some() {
326                job.detail = detail;
327            }
328            job.updated_at = Self::now_ts();
329            Self::push_history(job, "paused");
330        }
331    }
332
333    /// Resumes a paused or failed job back to running status.
334    pub fn resume(&mut self, id: &str, detail: Option<String>) {
335        if let Some(job) = self.jobs.get_mut(id) {
336            job.status = JobStatus::Running;
337            if detail.is_some() {
338                job.detail = detail;
339            }
340            Self::clear_retry_schedule(&mut job.retry);
341            job.updated_at = Self::now_ts();
342            Self::push_history(job, "resumed");
343        }
344    }
345
346    /// Returns all jobs sorted by most recently updated first.
347    pub fn list(&self) -> Vec<JobRecord> {
348        let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
349        out.sort_by_key(|job| std::cmp::Reverse(job.updated_at));
350        out
351    }
352
353    /// Returns the history entries for a job, or an empty vec if not found.
354    pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
355        self.jobs
356            .get(id)
357            .map(|job| job.history.clone())
358            .unwrap_or_default()
359    }
360
361    /// Resets queued or running jobs back to queued on application resume.
362    pub fn resume_pending(&mut self) -> Vec<JobRecord> {
363        let mut resumed = Vec::new();
364        for job in self.jobs.values_mut() {
365            if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
366                job.status = JobStatus::Queued;
367                job.updated_at = Self::now_ts();
368                Self::push_history(job, "queued_after_resume");
369                resumed.push(job.clone());
370            }
371        }
372        resumed
373    }
374
375    /// Loads jobs from the state store, deserializing extended detail when available.
376    pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
377        let persisted = store.list_jobs(Some(500))?;
378        for job in persisted {
379            let fallback_status = job_state_status_to_runtime(job.status);
380            let parsed = Self::parse_persisted_detail(job.detail.as_deref());
381            let (status, detail, retry, history) = if let Some(detail_state) = parsed {
382                (
383                    detail_state.status,
384                    detail_state.detail,
385                    detail_state.retry,
386                    detail_state.history,
387                )
388            } else {
389                (
390                    fallback_status,
391                    job.detail,
392                    JobRetryMetadata::default(),
393                    Vec::new(),
394                )
395            };
396            self.jobs.insert(
397                job.id.clone(),
398                JobRecord {
399                    id: job.id,
400                    name: job.name,
401                    status,
402                    progress: job.progress,
403                    detail,
404                    retry,
405                    history,
406                    created_at: job.created_at,
407                    updated_at: job.updated_at,
408                },
409            );
410        }
411        Ok(())
412    }
413
414    /// Persists a single job's current state to the state store.
415    pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
416        let Some(job) = self.jobs.get(id) else {
417            return Ok(());
418        };
419        let encoded_detail = Self::encode_persisted_detail(job)?;
420        store.upsert_job(&JobStateRecord {
421            id: job.id.clone(),
422            name: job.name.clone(),
423            status: runtime_status_to_job_state(job.status),
424            progress: job.progress,
425            detail: encoded_detail,
426            created_at: job.created_at,
427            updated_at: job.updated_at,
428        })
429    }
430
431    /// Persists all in-memory jobs to the state store.
432    pub fn persist_all(&self, store: &StateStore) -> Result<()> {
433        for id in self.jobs.keys() {
434            self.persist_job(store, id)?;
435        }
436        Ok(())
437    }
438}
439
440/// Manages thread lifecycle: spawn, resume, fork, archive, and persistence.
441pub struct ThreadManager {
442    store: StateStore,
443    running_threads: HashMap<String, Thread>,
444    cli_version: String,
445}
446
447impl ThreadManager {
448    /// Creates a new `ThreadManager` backed by the given state store.
449    pub fn new(store: StateStore) -> Self {
450        Self {
451            store,
452            running_threads: HashMap::new(),
453            cli_version: env!("CARGO_PKG_VERSION").to_string(),
454        }
455    }
456
457    /// Returns a reference to the underlying state store.
458    pub fn state_store(&self) -> &StateStore {
459        &self.store
460    }
461
462    /// Spawns a new thread with the given initial history and persists it.
463    pub fn spawn_thread_with_history(
464        &mut self,
465        model_provider: String,
466        cwd: PathBuf,
467        initial_history: InitialHistory,
468        persist_extended_history: bool,
469    ) -> Result<NewThread> {
470        let id = format!("thread-{}", Uuid::new_v4());
471        let now = chrono::Utc::now().timestamp();
472        let preview = preview_from_initial_history(&initial_history);
473        let source = match initial_history {
474            InitialHistory::New => SessionSource::Interactive,
475            InitialHistory::Forked(_) => SessionSource::Fork,
476            InitialHistory::Resumed { .. } => SessionSource::Resume,
477        };
478        let thread = Thread {
479            id: id.clone(),
480            preview,
481            ephemeral: !persist_extended_history,
482            model_provider: model_provider.clone(),
483            created_at: now,
484            updated_at: now,
485            status: ThreadStatus::Running,
486            path: None,
487            cwd: cwd.clone(),
488            cli_version: self.cli_version.clone(),
489            source: match source {
490                SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
491                SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
492                SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
493                SessionSource::Api => codewhale_protocol::SessionSource::Api,
494                SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
495            },
496            name: None,
497        };
498        self.persist_thread(&thread, None)?;
499        match &initial_history {
500            InitialHistory::Forked(items) => {
501                for item in items {
502                    self.store.append_message(
503                        &thread.id,
504                        "history",
505                        &item.to_string(),
506                        Some(item.clone()),
507                    )?;
508                }
509            }
510            InitialHistory::Resumed { history, .. } => {
511                for item in history {
512                    self.store.append_message(
513                        &thread.id,
514                        "history",
515                        &item.to_string(),
516                        Some(item.clone()),
517                    )?;
518                }
519            }
520            InitialHistory::New => {}
521        }
522        self.running_threads
523            .insert(thread.id.clone(), thread.clone());
524        Ok(NewThread {
525            thread,
526            model: "auto".to_string(),
527            model_provider,
528            cwd,
529            approval_policy: None,
530            sandbox: None,
531        })
532    }
533
534    /// Resumes an existing thread, returning `None` if not found.
535    pub fn resume_thread_with_history(
536        &mut self,
537        params: &ThreadResumeParams,
538        fallback_cwd: &Path,
539        model_provider: String,
540    ) -> Result<Option<NewThread>> {
541        if params.history.is_none()
542            && let Some(thread) = self.running_threads.get(&params.thread_id).cloned()
543        {
544            return Ok(Some(NewThread {
545                model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
546                model_provider: params.model_provider.clone().unwrap_or(model_provider),
547                cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
548                approval_policy: params.approval_policy.clone(),
549                sandbox: params.sandbox.clone(),
550                thread,
551            }));
552        }
553
554        let persisted = self.store.get_thread(&params.thread_id)?;
555        let Some(metadata) = persisted else {
556            return Ok(None);
557        };
558        let mut thread = to_protocol_thread(metadata);
559        thread.status = ThreadStatus::Running;
560        thread.updated_at = chrono::Utc::now().timestamp();
561        thread.cwd = params
562            .cwd
563            .clone()
564            .unwrap_or_else(|| fallback_cwd.to_path_buf());
565        self.persist_thread(&thread, None)?;
566        self.running_threads
567            .insert(thread.id.clone(), thread.clone());
568        if let Some(history) = params.history.as_ref() {
569            for item in history {
570                self.store.append_message(
571                    &thread.id,
572                    "history",
573                    &item.to_string(),
574                    Some(item.clone()),
575                )?;
576            }
577        }
578
579        Ok(Some(NewThread {
580            model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
581            model_provider: params.model_provider.clone().unwrap_or(model_provider),
582            cwd: thread.cwd.clone(),
583            approval_policy: params.approval_policy.clone(),
584            sandbox: params.sandbox.clone(),
585            thread,
586        }))
587    }
588
589    /// Forks an existing thread into a new one, inheriting the parent's provider.
590    pub fn fork_thread(
591        &mut self,
592        params: &ThreadForkParams,
593        fallback_cwd: &Path,
594    ) -> Result<Option<NewThread>> {
595        let parent = self.store.get_thread(&params.thread_id)?;
596        let Some(parent) = parent else {
597            return Ok(None);
598        };
599        let parent_thread = to_protocol_thread(parent);
600        let new = self.spawn_thread_with_history(
601            params
602                .model_provider
603                .clone()
604                .unwrap_or_else(|| parent_thread.model_provider.clone()),
605            params
606                .cwd
607                .clone()
608                .unwrap_or_else(|| fallback_cwd.to_path_buf()),
609            InitialHistory::Forked(vec![json!({
610                "type": "fork",
611                "from_thread_id": parent_thread.id
612            })]),
613            params.persist_extended_history,
614        )?;
615        Ok(Some(new))
616    }
617
618    /// Lists threads matching the given filter parameters.
619    pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
620        let list = self.store.list_threads(ThreadListFilters {
621            include_archived: params.include_archived,
622            limit: params.limit,
623        })?;
624        Ok(list.into_iter().map(to_protocol_thread).collect())
625    }
626
627    /// Reads a single thread by id, or `None` if not found.
628    pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
629        Ok(self
630            .store
631            .get_thread(&params.thread_id)?
632            .map(to_protocol_thread))
633    }
634
635    /// Sets the display name for a thread, returning the updated thread or `None`.
636    pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
637        let Some(mut metadata) = self.store.get_thread(&params.thread_id)? else {
638            return Ok(None);
639        };
640        metadata.name = Some(params.name.clone());
641        metadata.updated_at = chrono::Utc::now().timestamp();
642        self.store.upsert_thread(&metadata)?;
643        let updated = to_protocol_thread(metadata);
644        self.running_threads
645            .insert(updated.id.clone(), updated.clone());
646        Ok(Some(updated))
647    }
648
649    /// Sets or replaces the persisted goal for a thread.
650    pub fn set_thread_goal(&mut self, params: &ThreadGoalSetParams) -> Result<Option<ThreadGoal>> {
651        if self.store.get_thread(&params.thread_id)?.is_none() {
652            return Ok(None);
653        }
654        let now = chrono::Utc::now().timestamp();
655        let goal = ThreadGoalRecord {
656            thread_id: params.thread_id.clone(),
657            goal_id: format!("goal-{}", Uuid::new_v4()),
658            objective: params.objective.clone(),
659            status: PersistedThreadGoalStatus::Active,
660            token_budget: params.token_budget,
661            tokens_used: 0,
662            time_used_seconds: 0,
663            continuation_count: 0,
664            created_at: now,
665            updated_at: now,
666        };
667        self.store.upsert_thread_goal(&goal)?;
668        Ok(Some(to_protocol_goal(goal)))
669    }
670
671    /// Reads the persisted goal for a thread.
672    pub fn get_thread_goal(&self, params: &ThreadGoalGetParams) -> Result<Option<ThreadGoal>> {
673        Ok(self
674            .store
675            .get_thread_goal(&params.thread_id)?
676            .map(to_protocol_goal))
677    }
678
679    /// Accrues durable per-goal usage and/or a continuation pass for a thread.
680    pub fn record_thread_goal_progress(
681        &mut self,
682        params: &ThreadGoalProgressParams,
683    ) -> Result<Option<ThreadGoal>> {
684        if self.store.get_thread(&params.thread_id)?.is_none() {
685            return Ok(None);
686        }
687
688        let now = chrono::Utc::now().timestamp();
689        let mut goal = if params.token_delta != 0 || params.time_delta_seconds != 0 {
690            self.store.record_thread_goal_usage(
691                &params.thread_id,
692                params.token_delta,
693                params.time_delta_seconds,
694                now,
695            )?
696        } else {
697            self.store.get_thread_goal(&params.thread_id)?
698        };
699
700        if params.record_continuation {
701            goal = self
702                .store
703                .record_thread_goal_continuation(&params.thread_id, now)?;
704        }
705
706        Ok(goal.map(to_protocol_goal))
707    }
708
709    /// Clears the persisted goal for a thread, returning whether one existed.
710    pub fn clear_thread_goal(&mut self, params: &ThreadGoalClearParams) -> Result<bool> {
711        self.store.delete_thread_goal(&params.thread_id)
712    }
713
714    /// Archives a thread so it no longer appears in default listings.
715    pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
716        self.store.mark_archived(thread_id)?;
717        if let Some(thread) = self.running_threads.get_mut(thread_id) {
718            thread.status = ThreadStatus::Archived;
719        }
720        Ok(())
721    }
722
723    /// Restores an archived thread to active status.
724    pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
725        self.store.mark_unarchived(thread_id)?;
726        Ok(())
727    }
728
729    /// Records a user message in a thread and updates its preview and timestamp.
730    pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
731        let Some(mut metadata) = self.store.get_thread(thread_id)? else {
732            return Ok(());
733        };
734        metadata.updated_at = chrono::Utc::now().timestamp();
735        metadata.preview = truncate_preview(input);
736        metadata.status = PersistedThreadStatus::Running;
737        self.store.upsert_thread(&metadata)?;
738        if let Some(thread) = self.running_threads.get_mut(thread_id) {
739            thread.updated_at = metadata.updated_at;
740            thread.preview = metadata.preview;
741            thread.status = ThreadStatus::Running;
742        }
743        let message_id = self.store.append_message(thread_id, "user", input, None)?;
744        self.store.save_checkpoint(
745            thread_id,
746            "latest",
747            &json!({
748                "reason": "thread_message",
749                "message_id": message_id,
750                "role": "user",
751                "preview": truncate_preview(input),
752                "updated_at": metadata.updated_at
753            }),
754        )?;
755        Ok(())
756    }
757
758    fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
759        self.store.upsert_thread(&ThreadMetadata {
760            id: thread.id.clone(),
761            rollout_path,
762            preview: thread.preview.clone(),
763            ephemeral: thread.ephemeral,
764            model_provider: thread.model_provider.clone(),
765            created_at: thread.created_at,
766            updated_at: thread.updated_at,
767            status: to_persisted_status(&thread.status),
768            path: thread.path.clone(),
769            cwd: thread.cwd.clone(),
770            cli_version: thread.cli_version.clone(),
771            source: to_persisted_source(&thread.source),
772            name: thread.name.clone(),
773            sandbox_policy: None,
774            approval_mode: None,
775            archived: matches!(thread.status, ThreadStatus::Archived),
776            archived_at: None,
777            git_sha: None,
778            git_branch: None,
779            git_origin_url: None,
780            memory_mode: None,
781            current_leaf_id: None,
782        })
783    }
784}
785
786/// Top-level runtime combining config, model registry, threads, tools, MCP, and hooks.
787pub struct Runtime {
788    /// Resolved application configuration.
789    pub config: ConfigToml,
790    /// Registry of available model providers.
791    pub model_registry: ModelRegistry,
792    /// Manages conversation thread lifecycle.
793    pub thread_manager: ThreadManager,
794    /// Registry of callable tools.
795    pub tool_registry: Arc<ToolRegistry>,
796    /// Manager for MCP server connections.
797    pub mcp_manager: Arc<McpManager>,
798    /// Engine for evaluating execution policy decisions.
799    pub exec_policy: ExecPolicyEngine,
800    /// Dispatcher for lifecycle hooks.
801    pub hooks: HookDispatcher,
802    /// Manager for background job lifecycle.
803    pub jobs: JobManager,
804}
805
806impl Runtime {
807    /// Constructs a new `Runtime`, loading existing jobs from the state store.
808    pub fn new(
809        config: ConfigToml,
810        model_registry: ModelRegistry,
811        state: StateStore,
812        tool_registry: Arc<ToolRegistry>,
813        mcp_manager: Arc<McpManager>,
814        exec_policy: ExecPolicyEngine,
815        hooks: HookDispatcher,
816    ) -> Self {
817        let mut jobs = JobManager::default();
818        if let Err(e) = jobs.load_from_store(&state) {
819            tracing::warn!("Failed to load job store, starting with empty job list: {e}");
820        }
821        Self {
822            config,
823            model_registry,
824            thread_manager: ThreadManager::new(state),
825            tool_registry,
826            mcp_manager,
827            exec_policy,
828            hooks,
829            jobs,
830        }
831    }
832
833    fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
834        let history = self
835            .thread_manager
836            .state_store()
837            .list_messages(thread_id, Some(500))?
838            .into_iter()
839            .map(|message| {
840                json!({
841                    "id": message.id,
842                    "role": message.role,
843                    "content": message.content,
844                    "item": message.item,
845                    "created_at": message.created_at
846                })
847            })
848            .collect::<Vec<_>>();
849
850        let checkpoint = self
851            .thread_manager
852            .state_store()
853            .load_checkpoint(thread_id, None)?
854            .map(|record| {
855                json!({
856                    "checkpoint_id": record.checkpoint_id,
857                    "state": record.state,
858                    "created_at": record.created_at
859                })
860            });
861
862        let goal = self
863            .thread_manager
864            .state_store()
865            .get_thread_goal(thread_id)?
866            .map(to_protocol_goal);
867
868        Ok(json!({
869            "history": history,
870            "checkpoint": checkpoint,
871            "goal": goal
872        }))
873    }
874
875    fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
876        self.thread_manager.state_store().save_checkpoint(
877            thread_id,
878            "latest",
879            &json!({
880                "reason": reason,
881                "saved_at": chrono::Utc::now().timestamp(),
882                "state": state
883            }),
884        )
885    }
886
887    /// Dispatches a thread request (create, start, resume, fork, list, read, etc.).
888    pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
889        match req {
890            ThreadRequest::Create { .. } => {
891                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
892                let new = self.thread_manager.spawn_thread_with_history(
893                    "deepseek".to_string(),
894                    cwd,
895                    InitialHistory::New,
896                    false,
897                )?;
898                let mut response = thread_response_from_new("created", new);
899                response.data = self.persisted_thread_data(&response.thread_id)?;
900                Ok(response)
901            }
902            ThreadRequest::Start(params) => {
903                let cwd = params.cwd.clone().unwrap_or_else(|| {
904                    std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
905                });
906                let new = self.thread_manager.spawn_thread_with_history(
907                    params
908                        .model_provider
909                        .clone()
910                        .unwrap_or_else(|| "deepseek".to_string()),
911                    cwd,
912                    InitialHistory::New,
913                    params.persist_extended_history,
914                )?;
915                let mut response = thread_response_from_new("started", new);
916                response.data = self.persisted_thread_data(&response.thread_id)?;
917                Ok(response)
918            }
919            ThreadRequest::Resume(params) => {
920                let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
921                if let Some(new) = self.thread_manager.resume_thread_with_history(
922                    &params,
923                    &fallback_cwd,
924                    "deepseek".to_string(),
925                )? {
926                    let mut response = thread_response_from_new("resumed", new);
927                    response.data = self.persisted_thread_data(&response.thread_id)?;
928                    Ok(response)
929                } else {
930                    Ok(ThreadResponse {
931                        thread_id: params.thread_id,
932                        status: "missing".to_string(),
933                        thread: None,
934                        threads: Vec::new(),
935                        goal: None,
936                        model: None,
937                        model_provider: None,
938                        cwd: None,
939                        approval_policy: params.approval_policy,
940                        sandbox: params.sandbox,
941                        events: Vec::new(),
942                        data: json!({"error":"thread not found"}),
943                    })
944                }
945            }
946            ThreadRequest::Fork(params) => {
947                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
948                if let Some(new) = self.thread_manager.fork_thread(&params, &cwd)? {
949                    let mut response = thread_response_from_new("forked", new);
950                    response.data = self.persisted_thread_data(&response.thread_id)?;
951                    Ok(response)
952                } else {
953                    Ok(ThreadResponse {
954                        thread_id: params.thread_id,
955                        status: "missing".to_string(),
956                        thread: None,
957                        threads: Vec::new(),
958                        goal: None,
959                        model: None,
960                        model_provider: None,
961                        cwd: None,
962                        approval_policy: params.approval_policy,
963                        sandbox: params.sandbox,
964                        events: Vec::new(),
965                        data: json!({"error":"thread not found"}),
966                    })
967                }
968            }
969            ThreadRequest::List(params) => Ok(ThreadResponse {
970                thread_id: "list".to_string(),
971                status: "ok".to_string(),
972                thread: None,
973                threads: self.thread_manager.list_threads(&params)?,
974                goal: None,
975                model: None,
976                model_provider: None,
977                cwd: None,
978                approval_policy: None,
979                sandbox: None,
980                events: Vec::new(),
981                data: json!({}),
982            }),
983            ThreadRequest::Read(params) => {
984                let id = params.thread_id.clone();
985                let data = self.persisted_thread_data(&id)?;
986                Ok(ThreadResponse {
987                    thread_id: id,
988                    status: "ok".to_string(),
989                    thread: self.thread_manager.read_thread(&params)?,
990                    threads: Vec::new(),
991                    goal: self.thread_manager.get_thread_goal(&ThreadGoalGetParams {
992                        thread_id: params.thread_id,
993                    })?,
994                    model: None,
995                    model_provider: None,
996                    cwd: None,
997                    approval_policy: None,
998                    sandbox: None,
999                    events: Vec::new(),
1000                    data,
1001                })
1002            }
1003            ThreadRequest::SetName(params) => Ok(ThreadResponse {
1004                thread_id: params.thread_id.clone(),
1005                status: "ok".to_string(),
1006                thread: self.thread_manager.set_thread_name(&params)?,
1007                threads: Vec::new(),
1008                goal: None,
1009                model: None,
1010                model_provider: None,
1011                cwd: None,
1012                approval_policy: None,
1013                sandbox: None,
1014                events: Vec::new(),
1015                data: json!({}),
1016            }),
1017            ThreadRequest::GoalSet(params) => {
1018                let thread_id = params.thread_id.clone();
1019                if let Some(goal) = self.thread_manager.set_thread_goal(&params)? {
1020                    Ok(ThreadResponse {
1021                        thread_id,
1022                        status: "ok".to_string(),
1023                        thread: None,
1024                        threads: Vec::new(),
1025                        goal: Some(goal.clone()),
1026                        model: None,
1027                        model_provider: None,
1028                        cwd: None,
1029                        approval_policy: None,
1030                        sandbox: None,
1031                        events: vec![EventFrame::ThreadGoalUpdated { goal: goal.clone() }],
1032                        data: json!({ "goal": goal }),
1033                    })
1034                } else {
1035                    Ok(ThreadResponse {
1036                        thread_id,
1037                        status: "missing".to_string(),
1038                        thread: None,
1039                        threads: Vec::new(),
1040                        goal: None,
1041                        model: None,
1042                        model_provider: None,
1043                        cwd: None,
1044                        approval_policy: None,
1045                        sandbox: None,
1046                        events: Vec::new(),
1047                        data: json!({"error":"thread not found"}),
1048                    })
1049                }
1050            }
1051            ThreadRequest::GoalGet(params) => {
1052                let goal = self.thread_manager.get_thread_goal(&params)?;
1053                Ok(ThreadResponse {
1054                    thread_id: params.thread_id,
1055                    status: "ok".to_string(),
1056                    thread: None,
1057                    threads: Vec::new(),
1058                    goal: goal.clone(),
1059                    model: None,
1060                    model_provider: None,
1061                    cwd: None,
1062                    approval_policy: None,
1063                    sandbox: None,
1064                    events: Vec::new(),
1065                    data: json!({ "goal": goal }),
1066                })
1067            }
1068            ThreadRequest::GoalClear(params) => {
1069                let thread_id = params.thread_id.clone();
1070                let cleared = self.thread_manager.clear_thread_goal(&params)?;
1071                Ok(ThreadResponse {
1072                    thread_id: thread_id.clone(),
1073                    status: if cleared { "cleared" } else { "empty" }.to_string(),
1074                    thread: None,
1075                    threads: Vec::new(),
1076                    goal: None,
1077                    model: None,
1078                    model_provider: None,
1079                    cwd: None,
1080                    approval_policy: None,
1081                    sandbox: None,
1082                    events: if cleared {
1083                        vec![EventFrame::ThreadGoalCleared { thread_id }]
1084                    } else {
1085                        Vec::new()
1086                    },
1087                    data: json!({ "cleared": cleared }),
1088                })
1089            }
1090            ThreadRequest::GoalRecordProgress(params) => {
1091                let thread_id = params.thread_id.clone();
1092                if let Some(goal) = self.thread_manager.record_thread_goal_progress(&params)? {
1093                    Ok(ThreadResponse {
1094                        thread_id,
1095                        status: "ok".to_string(),
1096                        thread: None,
1097                        threads: Vec::new(),
1098                        goal: Some(goal.clone()),
1099                        model: None,
1100                        model_provider: None,
1101                        cwd: None,
1102                        approval_policy: None,
1103                        sandbox: None,
1104                        events: vec![EventFrame::ThreadGoalUpdated { goal: goal.clone() }],
1105                        data: json!({ "goal": goal }),
1106                    })
1107                } else {
1108                    Ok(ThreadResponse {
1109                        thread_id,
1110                        status: "missing".to_string(),
1111                        thread: None,
1112                        threads: Vec::new(),
1113                        goal: None,
1114                        model: None,
1115                        model_provider: None,
1116                        cwd: None,
1117                        approval_policy: None,
1118                        sandbox: None,
1119                        events: Vec::new(),
1120                        data: json!({"error":"thread or goal not found"}),
1121                    })
1122                }
1123            }
1124            ThreadRequest::Archive { thread_id } => {
1125                self.thread_manager.archive_thread(&thread_id)?;
1126                Ok(ThreadResponse {
1127                    thread_id,
1128                    status: "archived".to_string(),
1129                    thread: None,
1130                    threads: Vec::new(),
1131                    goal: None,
1132                    model: None,
1133                    model_provider: None,
1134                    cwd: None,
1135                    approval_policy: None,
1136                    sandbox: None,
1137                    events: Vec::new(),
1138                    data: json!({}),
1139                })
1140            }
1141            ThreadRequest::Unarchive { thread_id } => {
1142                self.thread_manager.unarchive_thread(&thread_id)?;
1143                Ok(ThreadResponse {
1144                    thread_id,
1145                    status: "unarchived".to_string(),
1146                    thread: None,
1147                    threads: Vec::new(),
1148                    goal: None,
1149                    model: None,
1150                    model_provider: None,
1151                    cwd: None,
1152                    approval_policy: None,
1153                    sandbox: None,
1154                    events: Vec::new(),
1155                    data: json!({}),
1156                })
1157            }
1158            ThreadRequest::Message { thread_id, input } => {
1159                self.thread_manager.touch_message(&thread_id, &input)?;
1160                let response_id = format!("{thread_id}:{}", input.len());
1161                self.hooks
1162                    .emit(HookEvent::ResponseStart {
1163                        response_id: response_id.clone(),
1164                    })
1165                    .await;
1166                self.hooks
1167                    .emit(HookEvent::ResponseEnd {
1168                        response_id: response_id.clone(),
1169                    })
1170                    .await;
1171
1172                Ok(ThreadResponse {
1173                    thread_id,
1174                    status: "accepted".to_string(),
1175                    thread: None,
1176                    threads: Vec::new(),
1177                    goal: None,
1178                    model: None,
1179                    model_provider: None,
1180                    cwd: None,
1181                    approval_policy: None,
1182                    sandbox: None,
1183                    events: vec![
1184                        EventFrame::ResponseStart {
1185                            response_id: response_id.clone(),
1186                        },
1187                        EventFrame::ResponseDelta {
1188                            response_id: response_id.clone(),
1189                            delta: "queued".to_string(),
1190                            channel: ResponseChannel::Text,
1191                        },
1192                        EventFrame::ResponseEnd { response_id },
1193                    ],
1194                    data: json!({}),
1195                })
1196            }
1197        }
1198    }
1199
1200    /// Resolves the model for a prompt, records the message, and returns the response.
1201    pub async fn handle_prompt(
1202        &mut self,
1203        req: PromptRequest,
1204        cli_overrides: &CliRuntimeOverrides,
1205    ) -> Result<PromptResponse> {
1206        let resolved = self.config.resolve_runtime_options(cli_overrides);
1207        let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
1208        let selection = self
1209            .model_registry
1210            .resolve(Some(&requested_model), Some(resolved.provider));
1211        let resolved_model = selection.resolved.id.clone();
1212        let response_id = format!("resp-{}", Uuid::new_v4());
1213
1214        self.hooks
1215            .emit(HookEvent::ResponseStart {
1216                response_id: response_id.clone(),
1217            })
1218            .await;
1219        self.hooks
1220            .emit(HookEvent::ResponseDelta {
1221                response_id: response_id.clone(),
1222                delta: "model-selected".to_string(),
1223            })
1224            .await;
1225        self.hooks
1226            .emit(HookEvent::ResponseEnd {
1227                response_id: response_id.clone(),
1228            })
1229            .await;
1230
1231        let payload = json!({
1232            "provider": resolved.provider.as_str(),
1233            "model": resolved_model.clone(),
1234            "prompt": req.prompt,
1235            "telemetry": resolved.telemetry,
1236            "base_url": resolved.base_url,
1237            "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
1238            "approval_policy": resolved.approval_policy,
1239            "sandbox_mode": resolved.sandbox_mode
1240        });
1241        if let Some(thread_id) = req.thread_id.as_ref() {
1242            self.thread_manager.touch_message(thread_id, &req.prompt)?;
1243            let assistant_message_id = self.thread_manager.store.append_message(
1244                thread_id,
1245                "assistant",
1246                &payload.to_string(),
1247                Some(payload.clone()),
1248            )?;
1249            self.persist_latest_checkpoint(
1250                thread_id,
1251                "prompt_response",
1252                json!({
1253                    "response_id": response_id.clone(),
1254                    "model": resolved_model.clone(),
1255                    "provider": resolved.provider.as_str(),
1256                    "assistant_message_id": assistant_message_id
1257                }),
1258            )?;
1259        }
1260
1261        Ok(PromptResponse {
1262            output: payload.to_string(),
1263            model: resolved_model,
1264            events: vec![
1265                EventFrame::ResponseStart {
1266                    response_id: response_id.clone(),
1267                },
1268                EventFrame::ResponseDelta {
1269                    response_id: response_id.clone(),
1270                    delta: "model-selected".to_string(),
1271                    channel: ResponseChannel::Text,
1272                },
1273                EventFrame::ResponseEnd { response_id },
1274            ],
1275        })
1276    }
1277
1278    /// Evaluates execution policy and dispatches a tool call.
1279    pub async fn invoke_tool(
1280        &self,
1281        call: ToolCall,
1282        approval_mode: AskForApproval,
1283        cwd: &Path,
1284    ) -> Result<Value> {
1285        let fallback_cwd = cwd.display().to_string();
1286        let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
1287        let policy_tool = match &call.payload {
1288            ToolPayload::LocalShell { .. } => "exec_shell",
1289            _ => call.name.as_str(),
1290        };
1291        let policy_path = permission_path_for_call(&call);
1292        let decision = self.exec_policy.check(ExecPolicyContext {
1293            command: &command,
1294            cwd: &policy_cwd,
1295            tool: Some(policy_tool),
1296            path: policy_path.as_deref(),
1297            ask_for_approval: approval_mode,
1298            sandbox_mode: None,
1299        })?;
1300        let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
1301        let response_id = format!("tool-{}", Uuid::new_v4());
1302        let call_id = call
1303            .raw_tool_call_id
1304            .clone()
1305            .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
1306        self.hooks
1307            .emit(HookEvent::ToolLifecycle {
1308                response_id: response_id.clone(),
1309                tool_name: call.name.clone(),
1310                phase: "precheck".to_string(),
1311                payload: precheck.clone(),
1312            })
1313            .await;
1314
1315        if !decision.allow {
1316            let reason = decision.reason().to_string();
1317            let approval_id = format!("approval-{}", Uuid::new_v4());
1318            let error_frame = EventFrame::Error {
1319                response_id: response_id.clone(),
1320                message: reason.clone(),
1321            };
1322            self.hooks
1323                .emit(HookEvent::ApprovalLifecycle {
1324                    approval_id,
1325                    phase: "denied".to_string(),
1326                    reason: Some(reason.clone()),
1327                })
1328                .await;
1329            self.hooks
1330                .emit(HookEvent::GenericEventFrame {
1331                    frame: Box::new(error_frame.clone()),
1332                })
1333                .await;
1334            return Ok(json!({
1335                "ok": false,
1336                "status": "denied",
1337                "execution_kind": execution_kind,
1338                "response_id": response_id,
1339                "precheck": precheck,
1340                "error": reason,
1341                "events": [event_frame_payload(&error_frame)],
1342            }));
1343        }
1344
1345        if decision.requires_approval {
1346            let approval_id = format!("approval-{}", Uuid::new_v4());
1347            let reason = decision.reason().to_string();
1348            let maybe_approval_frame = approval_request_frame(
1349                &decision.requirement,
1350                decision.matched_rule.as_deref(),
1351                call_id,
1352                approval_id.clone(),
1353                response_id.clone(),
1354                command.clone(),
1355                policy_cwd.clone(),
1356            );
1357            self.hooks
1358                .emit(HookEvent::ApprovalLifecycle {
1359                    approval_id: approval_id.clone(),
1360                    phase: "requested".to_string(),
1361                    reason: Some(reason.clone()),
1362                })
1363                .await;
1364            let mut events = Vec::new();
1365            if let Some(frame) = maybe_approval_frame {
1366                self.hooks
1367                    .emit(HookEvent::GenericEventFrame {
1368                        frame: Box::new(frame.clone()),
1369                    })
1370                    .await;
1371                events.push(event_frame_payload(&frame));
1372            }
1373            return Ok(json!({
1374                "ok": false,
1375                "status": "approval_required",
1376                "execution_kind": execution_kind,
1377                "response_id": response_id,
1378                "approval_id": approval_id,
1379                "precheck": precheck,
1380                "error": reason,
1381                "events": events,
1382            }));
1383        }
1384
1385        // Headless `request_user_input`: mirror the approval fire-and-return
1386        // branch (issue #3102). The TUI intercepts this tool by name before
1387        // dispatch and blocks on a reply channel; the headless runtime instead
1388        // emits a typed `UserInputRequest` frame and returns a
1389        // `user_input_required` status so the client can render the question
1390        // and POST answers back via `AppRequest::SubmitUserInput`. It does NOT
1391        // block — consistent with the headless approval model, which has no
1392        // resume channel either.
1393        if call.name == REQUEST_USER_INPUT_TOOL_NAME {
1394            let request_id = format!("user-input-{}", Uuid::new_v4());
1395            let arguments = match &call.payload {
1396                ToolPayload::Function { arguments } => arguments.as_str(),
1397                // Custom/Mcp/LocalShell can't carry a user_input payload; fall
1398                // through to the generic dispatch error below.
1399                _ => "",
1400            };
1401            let maybe_frame = user_input_request_frame(
1402                call_id.clone(),
1403                response_id.clone(),
1404                request_id.clone(),
1405                arguments,
1406            );
1407            let mut events = Vec::new();
1408            if let Some(frame) = maybe_frame {
1409                self.hooks
1410                    .emit(HookEvent::GenericEventFrame {
1411                        frame: Box::new(frame.clone()),
1412                    })
1413                    .await;
1414                events.push(event_frame_payload(&frame));
1415            }
1416            return Ok(json!({
1417                "ok": false,
1418                "status": "user_input_required",
1419                "execution_kind": execution_kind,
1420                "response_id": response_id,
1421                "request_id": request_id,
1422                "precheck": precheck,
1423                "events": events,
1424            }));
1425        }
1426
1427        let start_frame = EventFrame::ToolCallStart {
1428            response_id: response_id.clone(),
1429            tool_name: call.name.clone(),
1430            arguments: tool_payload_value(&call.payload),
1431        };
1432        self.hooks
1433            .emit(HookEvent::GenericEventFrame {
1434                frame: Box::new(start_frame.clone()),
1435            })
1436            .await;
1437        self.hooks
1438            .emit(HookEvent::ToolLifecycle {
1439                response_id: response_id.clone(),
1440                tool_name: call.name.clone(),
1441                phase: "dispatching".to_string(),
1442                payload: json!({
1443                    "call_id": call_id,
1444                    "execution_kind": execution_kind
1445                }),
1446            })
1447            .await;
1448
1449        match self.tool_registry.dispatch(call.clone(), true).await {
1450            Ok(tool_output) => {
1451                let result_frame = EventFrame::ToolCallResult {
1452                    response_id: response_id.clone(),
1453                    tool_name: call.name.clone(),
1454                    output: tool_output_value(&tool_output),
1455                };
1456                self.hooks
1457                    .emit(HookEvent::GenericEventFrame {
1458                        frame: Box::new(result_frame.clone()),
1459                    })
1460                    .await;
1461                self.hooks
1462                    .emit(HookEvent::ToolLifecycle {
1463                        response_id: response_id.clone(),
1464                        tool_name: call.name,
1465                        phase: "completed".to_string(),
1466                        payload: json!({ "ok": true }),
1467                    })
1468                    .await;
1469                Ok(json!({
1470                    "ok": true,
1471                    "status": "completed",
1472                    "execution_kind": execution_kind,
1473                    "response_id": response_id,
1474                    "precheck": precheck,
1475                    "output": tool_output,
1476                    "events": [
1477                        event_frame_payload(&start_frame),
1478                        event_frame_payload(&result_frame)
1479                    ]
1480                }))
1481            }
1482            Err(err) => {
1483                let message = format!("{err:?}");
1484                let error_frame = EventFrame::Error {
1485                    response_id: response_id.clone(),
1486                    message: message.clone(),
1487                };
1488                self.hooks
1489                    .emit(HookEvent::GenericEventFrame {
1490                        frame: Box::new(error_frame.clone()),
1491                    })
1492                    .await;
1493                self.hooks
1494                    .emit(HookEvent::ToolLifecycle {
1495                        response_id: response_id.clone(),
1496                        tool_name: call.name,
1497                        phase: "failed".to_string(),
1498                        payload: json!({ "error": message.clone() }),
1499                    })
1500                    .await;
1501                Ok(json!({
1502                    "ok": false,
1503                    "status": "failed",
1504                    "execution_kind": execution_kind,
1505                    "response_id": response_id,
1506                    "precheck": precheck,
1507                    "error": message,
1508                    "events": [
1509                        event_frame_payload(&start_frame),
1510                        event_frame_payload(&error_frame)
1511                    ]
1512                }))
1513            }
1514        }
1515    }
1516
1517    /// Starts all configured MCP servers and emits startup events via hooks.
1518    pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1519        let mut updates = Vec::new();
1520        let summary = self.mcp_manager.start_all(|update| {
1521            updates.push(update);
1522        });
1523        for update in updates {
1524            let status = match update.status {
1525                McpManagerStartupStatus::Starting => codewhale_protocol::McpStartupStatus::Starting,
1526                McpManagerStartupStatus::Ready => codewhale_protocol::McpStartupStatus::Ready,
1527                McpManagerStartupStatus::Failed { error } => {
1528                    codewhale_protocol::McpStartupStatus::Failed { error }
1529                }
1530                McpManagerStartupStatus::Cancelled => {
1531                    codewhale_protocol::McpStartupStatus::Cancelled
1532                }
1533            };
1534            self.hooks
1535                .emit(HookEvent::GenericEventFrame {
1536                    frame: Box::new(EventFrame::McpStartupUpdate {
1537                        update: codewhale_protocol::McpStartupUpdateEvent {
1538                            server_name: update.server_name,
1539                            status,
1540                        },
1541                    }),
1542                })
1543                .await;
1544        }
1545        self.hooks
1546            .emit(HookEvent::GenericEventFrame {
1547                frame: Box::new(EventFrame::McpStartupComplete {
1548                    summary: codewhale_protocol::McpStartupCompleteEvent {
1549                        ready: summary.ready.clone(),
1550                        failed: summary
1551                            .failed
1552                            .iter()
1553                            .map(|f| codewhale_protocol::McpStartupFailure {
1554                                server_name: f.server_name.clone(),
1555                                error: f.error.clone(),
1556                            })
1557                            .collect(),
1558                        cancelled: summary.cancelled.clone(),
1559                    },
1560                }),
1561            })
1562            .await;
1563        summary
1564    }
1565
1566    /// Returns the current application status including all jobs and their history.
1567    pub fn app_status(&self) -> AppResponse {
1568        let jobs = self.jobs.list();
1569        let events = jobs
1570            .iter()
1571            .flat_map(|job| {
1572                job.history.iter().map(|entry| EventFrame::ResponseDelta {
1573                    response_id: job.id.clone(),
1574                    delta: json!({
1575                        "kind": "job_transition",
1576                        "job_id": job.id.clone(),
1577                        "phase": entry.phase.clone(),
1578                        "status": job_status_to_str(entry.status),
1579                        "progress": entry.progress,
1580                        "detail": entry.detail.clone(),
1581                        "retry": job_retry_to_value(&entry.retry),
1582                        "at": entry.at
1583                    })
1584                    .to_string(),
1585                    channel: ResponseChannel::Text,
1586                })
1587            })
1588            .collect::<Vec<_>>();
1589        AppResponse {
1590            ok: true,
1591            data: json!({
1592                "jobs": jobs.into_iter().map(|job| {
1593                    json!({
1594                        "id": job.id,
1595                        "name": job.name,
1596                        "status": job_status_to_str(job.status),
1597                        "progress": job.progress,
1598                        "detail": job.detail,
1599                        "retry": job_retry_to_value(&job.retry),
1600                        "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1601                    })
1602                }).collect::<Vec<_>>()
1603            }),
1604            events,
1605        }
1606    }
1607
1608    /// Returns the default model provider from the resolved configuration.
1609    pub fn provider_default(&self) -> ProviderKind {
1610        self.config.provider
1611    }
1612
1613    /// Saves a named checkpoint for a thread.
1614    pub fn save_thread_checkpoint(
1615        &self,
1616        thread_id: &str,
1617        checkpoint_id: &str,
1618        state: &Value,
1619    ) -> Result<()> {
1620        self.thread_manager
1621            .state_store()
1622            .save_checkpoint(thread_id, checkpoint_id, state)
1623    }
1624
1625    /// Loads a checkpoint for a thread. Pass `None` for the latest.
1626    pub fn load_thread_checkpoint(
1627        &self,
1628        thread_id: &str,
1629        checkpoint_id: Option<&str>,
1630    ) -> Result<Option<Value>> {
1631        Ok(self
1632            .thread_manager
1633            .state_store()
1634            .load_checkpoint(thread_id, checkpoint_id)?
1635            .map(|checkpoint| checkpoint.state))
1636    }
1637
1638    /// Enqueues a new background job and persists it immediately.
1639    pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1640        let job = self.jobs.enqueue(name);
1641        self.jobs
1642            .persist_job(self.thread_manager.state_store(), &job.id)?;
1643        Ok(job)
1644    }
1645
1646    /// Transitions a job to running and persists the change.
1647    pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1648        self.jobs.set_running(job_id);
1649        self.jobs
1650            .persist_job(self.thread_manager.state_store(), job_id)
1651    }
1652
1653    /// Updates a job's progress and persists the change.
1654    pub fn update_job_progress(
1655        &mut self,
1656        job_id: &str,
1657        progress: u8,
1658        detail: Option<String>,
1659    ) -> Result<()> {
1660        self.jobs.update_progress(job_id, progress, detail);
1661        self.jobs
1662            .persist_job(self.thread_manager.state_store(), job_id)
1663    }
1664
1665    /// Marks a job as completed and persists the change.
1666    pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1667        self.jobs.complete(job_id);
1668        self.jobs
1669            .persist_job(self.thread_manager.state_store(), job_id)
1670    }
1671
1672    /// Marks a job as failed and persists the change.
1673    pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1674        self.jobs.fail(job_id, detail);
1675        self.jobs
1676            .persist_job(self.thread_manager.state_store(), job_id)
1677    }
1678
1679    /// Cancels a job and persists the change.
1680    pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1681        self.jobs.cancel(job_id);
1682        self.jobs
1683            .persist_job(self.thread_manager.state_store(), job_id)
1684    }
1685
1686    /// Pauses a job and persists the change.
1687    pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1688        self.jobs.pause(job_id, detail);
1689        self.jobs
1690            .persist_job(self.thread_manager.state_store(), job_id)
1691    }
1692
1693    /// Resumes a paused job and persists the change.
1694    pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1695        self.jobs.resume(job_id, detail);
1696        self.jobs
1697            .persist_job(self.thread_manager.state_store(), job_id)
1698    }
1699
    /// Returns the state-transition history for a job.
    ///
    /// Delegates directly to the job manager's `history` lookup for `job_id`;
    /// nothing is read from the state store here.
    pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
        self.jobs.history(job_id)
    }
1704}
1705
1706fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1707    ThreadResponse {
1708        thread_id: new.thread.id.clone(),
1709        status: status.to_string(),
1710        thread: Some(new.thread),
1711        threads: Vec::new(),
1712        goal: None,
1713        model: Some(new.model),
1714        model_provider: Some(new.model_provider),
1715        cwd: Some(new.cwd),
1716        approval_policy: new.approval_policy,
1717        sandbox: new.sandbox,
1718        events: Vec::new(),
1719        data: json!({}),
1720    }
1721}
1722
1723fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1724    match initial_history {
1725        InitialHistory::New => "New conversation".to_string(),
1726        InitialHistory::Forked(items) => truncate_preview(
1727            &items
1728                .first()
1729                .map(Value::to_string)
1730                .unwrap_or_else(|| "Forked conversation".to_string()),
1731        ),
1732        InitialHistory::Resumed { history, .. } => truncate_preview(
1733            &history
1734                .first()
1735                .map(Value::to_string)
1736                .unwrap_or_else(|| "Resumed conversation".to_string()),
1737        ),
1738    }
1739}
1740
1741fn permission_path_for_call(call: &ToolCall) -> Option<String> {
1742    match &call.payload {
1743        ToolPayload::Function { arguments } => serde_json::from_str::<Value>(arguments)
1744            .ok()
1745            .and_then(|value| {
1746                value
1747                    .get("path")
1748                    .and_then(Value::as_str)
1749                    .map(str::to_string)
1750            }),
1751        ToolPayload::Mcp { raw_arguments, .. } => raw_arguments
1752            .get("path")
1753            .and_then(Value::as_str)
1754            .map(str::to_string),
1755        ToolPayload::Custom { .. } | ToolPayload::LocalShell { .. } => None,
1756    }
1757}
1758
/// Limits a preview string to at most 120 characters.
///
/// Characters are counted as Unicode scalar values, not bytes, so multi-byte
/// text is never cut in the middle of a character.
fn truncate_preview(value: &str) -> String {
    const MAX_PREVIEW_CHARS: usize = 120;

    // The byte offset of the 121st character (if any) is where the cut goes.
    match value.char_indices().nth(MAX_PREVIEW_CHARS) {
        Some((cut, _)) => value[..cut].to_string(),
        None => value.to_string(),
    }
}
1762
1763fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1764    Thread {
1765        id: thread.id,
1766        preview: thread.preview,
1767        ephemeral: thread.ephemeral,
1768        model_provider: thread.model_provider,
1769        created_at: thread.created_at,
1770        updated_at: thread.updated_at,
1771        status: match thread.status {
1772            PersistedThreadStatus::Running => ThreadStatus::Running,
1773            PersistedThreadStatus::Idle => ThreadStatus::Idle,
1774            PersistedThreadStatus::Completed => ThreadStatus::Completed,
1775            PersistedThreadStatus::Failed => ThreadStatus::Failed,
1776            PersistedThreadStatus::Paused => ThreadStatus::Paused,
1777            PersistedThreadStatus::Archived => ThreadStatus::Archived,
1778        },
1779        path: thread.path,
1780        cwd: thread.cwd,
1781        cli_version: thread.cli_version,
1782        source: match thread.source {
1783            SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
1784            SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
1785            SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
1786            SessionSource::Api => codewhale_protocol::SessionSource::Api,
1787            SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
1788        },
1789        name: thread.name,
1790    }
1791}
1792
1793fn to_protocol_goal(goal: ThreadGoalRecord) -> ThreadGoal {
1794    ThreadGoal {
1795        thread_id: goal.thread_id,
1796        goal_id: goal.goal_id,
1797        objective: goal.objective,
1798        status: to_protocol_goal_status(goal.status),
1799        token_budget: goal.token_budget,
1800        tokens_used: goal.tokens_used,
1801        time_used_seconds: goal.time_used_seconds,
1802        continuation_count: goal.continuation_count,
1803        created_at: goal.created_at,
1804        updated_at: goal.updated_at,
1805    }
1806}
1807
1808fn to_protocol_goal_status(status: PersistedThreadGoalStatus) -> ThreadGoalStatus {
1809    match status {
1810        PersistedThreadGoalStatus::Active => ThreadGoalStatus::Active,
1811        PersistedThreadGoalStatus::Paused => ThreadGoalStatus::Paused,
1812        PersistedThreadGoalStatus::Blocked => ThreadGoalStatus::Blocked,
1813        PersistedThreadGoalStatus::UsageLimited => ThreadGoalStatus::UsageLimited,
1814        PersistedThreadGoalStatus::BudgetLimited => ThreadGoalStatus::BudgetLimited,
1815        PersistedThreadGoalStatus::Complete => ThreadGoalStatus::Complete,
1816    }
1817}
1818
1819fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1820    match status {
1821        ThreadStatus::Running => PersistedThreadStatus::Running,
1822        ThreadStatus::Idle => PersistedThreadStatus::Idle,
1823        ThreadStatus::Completed => PersistedThreadStatus::Completed,
1824        ThreadStatus::Failed => PersistedThreadStatus::Failed,
1825        ThreadStatus::Paused => PersistedThreadStatus::Paused,
1826        ThreadStatus::Archived => PersistedThreadStatus::Archived,
1827    }
1828}
1829
1830fn to_persisted_source(source: &codewhale_protocol::SessionSource) -> SessionSource {
1831    match source {
1832        codewhale_protocol::SessionSource::Interactive => SessionSource::Interactive,
1833        codewhale_protocol::SessionSource::Resume => SessionSource::Resume,
1834        codewhale_protocol::SessionSource::Fork => SessionSource::Fork,
1835        codewhale_protocol::SessionSource::Api => SessionSource::Api,
1836        codewhale_protocol::SessionSource::Unknown => SessionSource::Unknown,
1837    }
1838}
1839
1840fn approval_request_frame(
1841    requirement: &ExecApprovalRequirement,
1842    matched_rule: Option<&str>,
1843    call_id: String,
1844    approval_id: String,
1845    turn_id: String,
1846    command: String,
1847    cwd: String,
1848) -> Option<EventFrame> {
1849    let ExecApprovalRequirement::NeedsApproval {
1850        reason,
1851        proposed_execpolicy_amendment,
1852        proposed_network_policy_amendments,
1853    } = requirement
1854    else {
1855        return None;
1856    };
1857
1858    let mut available_decisions = vec![
1859        ReviewDecision::Approved,
1860        ReviewDecision::ApprovedForSession,
1861        ReviewDecision::Denied,
1862        ReviewDecision::Abort,
1863    ];
1864    if proposed_execpolicy_amendment
1865        .as_ref()
1866        .is_some_and(|amendment| !amendment.prefixes.is_empty())
1867    {
1868        available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1869    }
1870    available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1871        |amendment| ReviewDecision::NetworkPolicyAmendment {
1872            host: amendment.host,
1873            action: amendment.action,
1874        },
1875    ));
1876
1877    Some(EventFrame::ExecApprovalRequest {
1878        request: ExecApprovalRequestEvent {
1879            call_id,
1880            approval_id,
1881            turn_id,
1882            command,
1883            cwd,
1884            reason: reason.clone(),
1885            matched_rule: matched_rule.map(|rule| rule.to_string().into_boxed_str()),
1886            network_approval_context: None,
1887            proposed_execpolicy_amendment: proposed_execpolicy_amendment
1888                .as_ref()
1889                .map(|amendment| amendment.prefixes.clone())
1890                .unwrap_or_default(),
1891            proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1892            additional_permissions: Vec::new(),
1893            available_decisions,
1894        },
1895    })
1896}
1897
1898/// Build an [`EventFrame::UserInputRequest`] for a headless
1899/// `request_user_input` tool call, mirroring [`approval_request_frame`].
1900///
1901/// `arguments` is the raw JSON arguments string the model supplied to the
1902/// `request_user_input` tool (a `ToolPayload::Function` body). On parse
1903/// failure we return `None` so the caller falls through to the generic tool
1904/// error path rather than silently dropping the request.
1905fn user_input_request_frame(
1906    call_id: String,
1907    turn_id: String,
1908    request_id: String,
1909    arguments: &str,
1910) -> Option<EventFrame> {
1911    let parsed: Value = serde_json::from_str(arguments).ok()?;
1912    // Extract the `questions` array and lift it into the headless event
1913    // shape. We tolerate missing `allow_free_text`/`multi_select` (default
1914    // false) and extra fields, matching the lenient TUI `from_value` path.
1915    let questions = parsed.get("questions").cloned().filter(Value::is_array)?;
1916    let request = UserInputRequestEvent {
1917        call_id,
1918        turn_id,
1919        request_id,
1920        questions: serde_json::from_value(questions).ok()?,
1921    };
1922    Some(EventFrame::UserInputRequest { request })
1923}
1924
1925fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1926    match requirement {
1927        ExecApprovalRequirement::Skip {
1928            bypass_sandbox,
1929            proposed_execpolicy_amendment,
1930        } => json!({
1931            "type": "skip",
1932            "bypass_sandbox": bypass_sandbox,
1933            "reason": requirement.reason(),
1934            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1935                .as_ref()
1936                .map(|amendment| amendment.prefixes.clone())
1937                .unwrap_or_default()
1938        }),
1939        ExecApprovalRequirement::NeedsApproval {
1940            reason,
1941            proposed_execpolicy_amendment,
1942            proposed_network_policy_amendments,
1943        } => json!({
1944            "type": "needs_approval",
1945            "reason": reason,
1946            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1947                .as_ref()
1948                .map(|amendment| amendment.prefixes.clone())
1949                .unwrap_or_default(),
1950            "proposed_network_policy_amendments": proposed_network_policy_amendments
1951        }),
1952        ExecApprovalRequirement::Forbidden { reason } => json!({
1953            "type": "forbidden",
1954            "reason": reason
1955        }),
1956    }
1957}
1958
1959fn policy_precheck_payload(
1960    decision: &ExecPolicyDecision,
1961    command: &str,
1962    cwd: &str,
1963    execution_kind: &str,
1964) -> Value {
1965    json!({
1966        "execution_kind": execution_kind,
1967        "command": command,
1968        "cwd": cwd,
1969        "allow": decision.allow,
1970        "requires_approval": decision.requires_approval,
1971        "matched_rule": decision.matched_rule.clone(),
1972        "phase": decision.requirement.phase(),
1973        "reason": decision.reason(),
1974        "requirement": approval_requirement_payload(&decision.requirement)
1975    })
1976}
1977
1978fn tool_payload_value(payload: &ToolPayload) -> Value {
1979    serde_json::to_value(payload).unwrap_or_else(
1980        |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1981    )
1982}
1983
1984fn tool_output_value(output: &codewhale_protocol::ToolOutput) -> Value {
1985    serde_json::to_value(output).unwrap_or_else(
1986        |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1987    )
1988}
1989
1990fn event_frame_payload(frame: &EventFrame) -> Value {
1991    serde_json::to_value(frame)
1992        .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1993}
1994
/// Tool name that triggers the headless clarification-question flow.
///
/// Mirrors the TUI's `REQUEST_USER_INPUT_NAME`
/// (`crates/tui/src/core/engine/tool_catalog.rs`). The literal is duplicated
/// here instead of being imported so that `core` stays free of `tui`
/// dependencies; the two values must be kept in sync by hand.
const REQUEST_USER_INPUT_TOOL_NAME: &str = "request_user_input";
2001
2002fn json_optional_string(value: &Value) -> Option<String> {
2003    if value.is_null() {
2004        None
2005    } else {
2006        value.as_str().map(ToString::to_string)
2007    }
2008}
2009
2010fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
2011    let Some(value) = value else {
2012        return JobRetryMetadata::default();
2013    };
2014    JobRetryMetadata {
2015        attempt: value
2016            .get("attempt")
2017            .and_then(Value::as_u64)
2018            .unwrap_or(0)
2019            .min(u32::MAX as u64) as u32,
2020        max_attempts: value
2021            .get("max_attempts")
2022            .and_then(Value::as_u64)
2023            .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
2024            .min(u32::MAX as u64) as u32,
2025        backoff_base_ms: value
2026            .get("backoff_base_ms")
2027            .and_then(Value::as_u64)
2028            .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
2029        next_backoff_ms: value
2030            .get("next_backoff_ms")
2031            .and_then(Value::as_u64)
2032            .unwrap_or(0),
2033        next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
2034    }
2035}
2036
2037fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
2038    let status = value
2039        .get("status")
2040        .and_then(Value::as_str)
2041        .and_then(job_status_from_str)?;
2042    Some(JobHistoryEntry {
2043        at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
2044        phase: value
2045            .get("phase")
2046            .and_then(Value::as_str)
2047            .unwrap_or("unknown")
2048            .to_string(),
2049        status,
2050        progress: value
2051            .get("progress")
2052            .and_then(Value::as_u64)
2053            .map(|v| v.min(u8::MAX as u64) as u8),
2054        detail: value.get("detail").and_then(json_optional_string),
2055        retry: parse_retry_metadata(value.get("retry")),
2056    })
2057}
2058
2059fn job_status_to_str(status: JobStatus) -> &'static str {
2060    match status {
2061        JobStatus::Queued => "queued",
2062        JobStatus::Running => "running",
2063        JobStatus::Paused => "paused",
2064        JobStatus::Completed => "completed",
2065        JobStatus::Failed => "failed",
2066        JobStatus::Cancelled => "cancelled",
2067    }
2068}
2069
2070fn job_status_from_str(value: &str) -> Option<JobStatus> {
2071    match value {
2072        "queued" => Some(JobStatus::Queued),
2073        "running" => Some(JobStatus::Running),
2074        "paused" => Some(JobStatus::Paused),
2075        "completed" => Some(JobStatus::Completed),
2076        "failed" => Some(JobStatus::Failed),
2077        "cancelled" => Some(JobStatus::Cancelled),
2078        _ => None,
2079    }
2080}
2081
2082fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
2083    json!({
2084        "attempt": retry.attempt,
2085        "max_attempts": retry.max_attempts,
2086        "backoff_base_ms": retry.backoff_base_ms,
2087        "next_backoff_ms": retry.next_backoff_ms,
2088        "next_retry_at": retry.next_retry_at
2089    })
2090}
2091
2092fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
2093    json!({
2094        "at": entry.at,
2095        "phase": entry.phase.clone(),
2096        "status": job_status_to_str(entry.status),
2097        "progress": entry.progress,
2098        "detail": entry.detail.clone(),
2099        "retry": job_retry_to_value(&entry.retry)
2100    })
2101}
2102
2103fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
2104    match status {
2105        JobStatus::Queued => JobStateStatus::Queued,
2106        JobStatus::Running => JobStateStatus::Running,
2107        JobStatus::Paused => JobStateStatus::Running,
2108        JobStatus::Completed => JobStateStatus::Completed,
2109        JobStatus::Failed => JobStateStatus::Failed,
2110        JobStatus::Cancelled => JobStateStatus::Cancelled,
2111    }
2112}
2113
2114fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
2115    match status {
2116        JobStateStatus::Queued => JobStatus::Queued,
2117        JobStateStatus::Running => JobStatus::Running,
2118        JobStateStatus::Completed => JobStatus::Completed,
2119        JobStateStatus::Failed => JobStatus::Failed,
2120        JobStateStatus::Cancelled => JobStatus::Cancelled,
2121    }
2122}
2123
2124#[cfg(test)]
2125mod tests {
2126    use super::*;
2127    use codewhale_tools::ToolCallSource;
2128
2129    fn temp_core_state(name: &str) -> StateStore {
2130        let dir =
2131            std::env::temp_dir().join(format!("codewhale-core-{name}-{}", Uuid::new_v4().simple()));
2132        std::fs::create_dir_all(&dir).expect("create temp state dir");
2133        StateStore::open(Some(dir.join("state.db"))).expect("open state store")
2134    }
2135
2136    fn test_thread_metadata(id: &str) -> ThreadMetadata {
2137        ThreadMetadata {
2138            id: id.to_string(),
2139            rollout_path: None,
2140            preview: "test thread".to_string(),
2141            ephemeral: false,
2142            model_provider: "deepseek".to_string(),
2143            created_at: 10,
2144            updated_at: 10,
2145            status: PersistedThreadStatus::Running,
2146            path: None,
2147            cwd: PathBuf::from("/tmp/codewhale"),
2148            cli_version: "0.0.0-test".to_string(),
2149            source: SessionSource::Interactive,
2150            name: None,
2151            sandbox_policy: None,
2152            approval_mode: None,
2153            archived: false,
2154            archived_at: None,
2155            git_sha: None,
2156            git_branch: None,
2157            git_origin_url: None,
2158            memory_mode: None,
2159            current_leaf_id: None,
2160        }
2161    }
2162
    // ── Core helpers: permission paths, thread goals, approval and user-input frames ──
2164
2165    #[test]
2166    fn permission_path_for_call_extracts_function_path_argument() {
2167        let call = ToolCall {
2168            name: "read_file".to_string(),
2169            payload: ToolPayload::Function {
2170                arguments: json!({ "path": "README.md" }).to_string(),
2171            },
2172            source: ToolCallSource::Direct,
2173            raw_tool_call_id: None,
2174        };
2175
2176        assert_eq!(
2177            permission_path_for_call(&call).as_deref(),
2178            Some("README.md")
2179        );
2180    }
2181
2182    #[test]
2183    fn permission_path_for_call_extracts_mcp_path_argument() {
2184        let call = ToolCall {
2185            name: "mcp_fs_read".to_string(),
2186            payload: ToolPayload::Mcp {
2187                server: "fs".to_string(),
2188                tool: "read".to_string(),
2189                raw_arguments: json!({ "path": "secrets/token.txt" }),
2190                raw_tool_call_id: None,
2191            },
2192            source: ToolCallSource::Direct,
2193            raw_tool_call_id: None,
2194        };
2195
2196        assert_eq!(
2197            permission_path_for_call(&call).as_deref(),
2198            Some("secrets/token.txt")
2199        );
2200    }
2201
2202    #[test]
2203    fn permission_path_for_call_ignores_shell_payload() {
2204        let call = ToolCall {
2205            name: "exec_shell".to_string(),
2206            payload: ToolPayload::LocalShell {
2207                params: codewhale_protocol::LocalShellParams {
2208                    command: "cargo test".to_string(),
2209                    cwd: None,
2210                    timeout_ms: None,
2211                },
2212            },
2213            source: ToolCallSource::Direct,
2214            raw_tool_call_id: None,
2215        };
2216
2217        assert_eq!(permission_path_for_call(&call), None);
2218    }
2219
2220    #[test]
2221    fn thread_goal_progress_accumulates_durable_accounting() {
2222        let store = temp_core_state("thread-goal-progress");
2223        store
2224            .upsert_thread(&test_thread_metadata("thread-1"))
2225            .expect("upsert thread");
2226        let mut manager = ThreadManager::new(store);
2227        manager
2228            .set_thread_goal(&ThreadGoalSetParams {
2229                thread_id: "thread-1".to_string(),
2230                objective: "Carry the goal across turns".to_string(),
2231                token_budget: Some(2_000),
2232            })
2233            .expect("set goal")
2234            .expect("goal exists");
2235
2236        let updated = manager
2237            .record_thread_goal_progress(&ThreadGoalProgressParams {
2238                thread_id: "thread-1".to_string(),
2239                token_delta: 750,
2240                time_delta_seconds: 12,
2241                record_continuation: true,
2242            })
2243            .expect("record progress")
2244            .expect("goal exists");
2245
2246        assert_eq!(updated.tokens_used, 750);
2247        assert_eq!(updated.time_used_seconds, 12);
2248        assert_eq!(updated.continuation_count, 1);
2249
2250        let persisted = manager
2251            .get_thread_goal(&ThreadGoalGetParams {
2252                thread_id: "thread-1".to_string(),
2253            })
2254            .expect("read goal")
2255            .expect("goal exists");
2256        assert_eq!(persisted.tokens_used, 750);
2257        assert_eq!(persisted.time_used_seconds, 12);
2258        assert_eq!(persisted.continuation_count, 1);
2259    }
2260
2261    #[test]
2262    fn approval_request_frame_includes_matched_rule() {
2263        let requirement = ExecApprovalRequirement::NeedsApproval {
2264            reason: "Typed ask rule 'tool=exec_shell command=cargo test' requires approval."
2265                .to_string(),
2266            proposed_execpolicy_amendment: None,
2267            proposed_network_policy_amendments: Vec::new(),
2268        };
2269
2270        let frame = approval_request_frame(
2271            &requirement,
2272            Some("tool=exec_shell command=cargo test"),
2273            "call-1".to_string(),
2274            "approval-1".to_string(),
2275            "turn-1".to_string(),
2276            "cargo test --workspace".to_string(),
2277            "/repo".to_string(),
2278        )
2279        .expect("approval frame");
2280
2281        let EventFrame::ExecApprovalRequest { request } = frame else {
2282            panic!("expected exec approval request frame");
2283        };
2284        assert_eq!(
2285            request.matched_rule.as_deref(),
2286            Some("tool=exec_shell command=cargo test")
2287        );
2288        assert_eq!(request.reason, requirement.reason());
2289    }
2290
2291    #[test]
2292    fn user_input_request_frame_lifts_questions_from_arguments() {
2293        // issue #3102: the headless frame constructor must parse the model's
2294        // `request_user_input` arguments and lift the questions into the
2295        // UserInputRequestEvent, defaulting the boolean flags when omitted.
2296        let arguments = r#"{"questions":[{"header":"Scope","id":"scope","question":"Which?","options":[{"label":"A","description":"a"},{"label":"B","description":"b"}],"allow_free_text":true}]}"#;
2297        let frame = user_input_request_frame(
2298            "call-1".to_string(),
2299            "turn-1".to_string(),
2300            "ui-1".to_string(),
2301            arguments,
2302        )
2303        .expect("user input frame");
2304
2305        let EventFrame::UserInputRequest { request } = frame else {
2306            panic!("expected user_input_request frame");
2307        };
2308        assert_eq!(request.call_id, "call-1");
2309        assert_eq!(request.turn_id, "turn-1");
2310        assert_eq!(request.request_id, "ui-1");
2311        assert_eq!(request.questions.len(), 1);
2312        assert_eq!(request.questions[0].id, "scope");
2313        assert!(request.questions[0].allow_free_text);
2314        // multi_select omitted in the payload → defaults to false.
2315        assert!(!request.questions[0].multi_select);
2316        assert_eq!(request.questions[0].options.len(), 2);
2317    }
2318
2319    #[test]
2320    fn user_input_request_frame_returns_none_on_invalid_arguments() {
2321        // On parse failure the constructor returns None so invoke_tool falls
2322        // through to the generic tool error path instead of silently dropping.
2323        let frame = user_input_request_frame(
2324            "call-1".to_string(),
2325            "turn-1".to_string(),
2326            "ui-1".to_string(),
2327            "not json",
2328        );
2329        assert!(frame.is_none());
2330
2331        // Valid JSON but missing the questions array is also rejected.
2332        let frame = user_input_request_frame(
2333            "call-1".to_string(),
2334            "turn-1".to_string(),
2335            "ui-1".to_string(),
2336            r#"{"foo":"bar"}"#,
2337        );
2338        assert!(frame.is_none());
2339    }
2340
2341    #[test]
2342    fn enqueue_creates_queued_job_with_zero_progress() {
2343        let mut jm = JobManager::default();
2344        let job = jm.enqueue("build");
2345        assert_eq!(job.name, "build");
2346        assert_eq!(job.status, JobStatus::Queued);
2347        assert_eq!(job.progress, Some(0));
2348        assert!(job.detail.is_none());
2349        assert_eq!(job.history.len(), 1);
2350        assert_eq!(job.history[0].phase, "created");
2351    }
2352
2353    #[test]
2354    fn set_running_transitions_from_queued() {
2355        let mut jm = JobManager::default();
2356        let job = jm.enqueue("deploy");
2357        let id = job.id.clone();
2358        jm.set_running(&id);
2359        let jobs = jm.list();
2360        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2361        assert_eq!(updated.status, JobStatus::Running);
2362        assert_eq!(updated.history.last().unwrap().phase, "running");
2363    }
2364
2365    #[test]
2366    fn update_progress_clamps_to_100() {
2367        let mut jm = JobManager::default();
2368        let job = jm.enqueue("task");
2369        let id = job.id.clone();
2370        jm.update_progress(&id, 150, Some("over".to_string()));
2371        let jobs = jm.list();
2372        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2373        assert_eq!(updated.progress, Some(100));
2374    }
2375
2376    #[test]
2377    fn complete_sets_progress_to_100() {
2378        let mut jm = JobManager::default();
2379        let job = jm.enqueue("task");
2380        let id = job.id.clone();
2381        jm.set_running(&id);
2382        jm.complete(&id);
2383        let jobs = jm.list();
2384        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2385        assert_eq!(updated.status, JobStatus::Completed);
2386        assert_eq!(updated.progress, Some(100));
2387    }
2388
2389    #[test]
2390    fn fail_increments_attempt_and_sets_backoff() {
2391        let mut jm = JobManager::default();
2392        let job = jm.enqueue("fragile");
2393        let id = job.id.clone();
2394        jm.set_running(&id);
2395        jm.fail(&id, "crashed");
2396        let jobs = jm.list();
2397        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2398        assert_eq!(updated.status, JobStatus::Failed);
2399        assert_eq!(updated.retry.attempt, 1);
2400        assert!(updated.retry.next_backoff_ms > 0);
2401        assert!(updated.retry.next_retry_at.is_some());
2402        assert_eq!(updated.detail.as_deref(), Some("crashed"));
2403    }
2404
2405    #[test]
2406    fn fail_clears_retry_after_max_attempts() {
2407        let mut jm = JobManager::default();
2408        let job = jm.enqueue("fragile");
2409        let id = job.id.clone();
2410        for _ in 0..=DEFAULT_JOB_MAX_ATTEMPTS {
2411            jm.set_running(&id);
2412            jm.fail(&id, "boom");
2413        }
2414        let jobs = jm.list();
2415        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2416        assert_eq!(updated.retry.attempt, DEFAULT_JOB_MAX_ATTEMPTS);
2417        assert_eq!(updated.retry.next_backoff_ms, 0);
2418        assert!(updated.retry.next_retry_at.is_none());
2419    }
2420
2421    #[test]
2422    fn cancel_sets_status_and_clears_retry() {
2423        let mut jm = JobManager::default();
2424        let job = jm.enqueue("task");
2425        let id = job.id.clone();
2426        jm.cancel(&id);
2427        let jobs = jm.list();
2428        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2429        assert_eq!(updated.status, JobStatus::Cancelled);
2430        assert_eq!(updated.retry.next_backoff_ms, 0);
2431    }
2432
2433    #[test]
2434    fn pause_and_resume_round_trip() {
2435        let mut jm = JobManager::default();
2436        let job = jm.enqueue("task");
2437        let id = job.id.clone();
2438        jm.set_running(&id);
2439        jm.pause(&id, Some("waiting".to_string()));
2440        let jobs = jm.list();
2441        let paused = jobs.iter().find(|j| j.id == id).unwrap();
2442        assert_eq!(paused.status, JobStatus::Paused);
2443        assert_eq!(paused.detail.as_deref(), Some("waiting"));
2444
2445        jm.resume(&id, None);
2446        let jobs = jm.list();
2447        let resumed = jobs.iter().find(|j| j.id == id).unwrap();
2448        assert_eq!(resumed.status, JobStatus::Running);
2449        assert_eq!(resumed.history.last().unwrap().phase, "resumed");
2450    }
2451
2452    #[test]
2453    fn list_returns_jobs_sorted_by_updated_at_desc() {
2454        let mut jm = JobManager::default();
2455        jm.enqueue("first");
2456        jm.enqueue("second");
2457        jm.enqueue("third");
2458        let jobs = jm.list();
2459        assert_eq!(jobs.len(), 3);
2460        for window in jobs.windows(2) {
2461            assert!(window[0].updated_at >= window[1].updated_at);
2462        }
2463    }
2464
2465    #[test]
2466    fn history_returns_entries_for_existing_job() {
2467        let mut jm = JobManager::default();
2468        let job = jm.enqueue("task");
2469        let id = job.id.clone();
2470        jm.set_running(&id);
2471        jm.complete(&id);
2472        let history = jm.history(&id);
2473        assert_eq!(history.len(), 3); // created, running, completed
2474        assert_eq!(history[0].phase, "created");
2475        assert_eq!(history[1].phase, "running");
2476        assert_eq!(history[2].phase, "completed");
2477    }
2478
2479    #[test]
2480    fn history_returns_empty_for_unknown_job() {
2481        let jm = JobManager::default();
2482        assert!(jm.history("nonexistent").is_empty());
2483    }
2484
2485    #[test]
2486    fn resume_pending_requeues_running_and_queued() {
2487        let mut jm = JobManager::default();
2488        let _j1 = jm.enqueue("queued_task");
2489        let j2 = jm.enqueue("running_task");
2490        let j3 = jm.enqueue("completed_task");
2491        let id2 = j2.id.clone();
2492        let id3 = j3.id.clone();
2493        jm.set_running(&id2);
2494        jm.set_running(&id3);
2495        jm.complete(&id3);
2496
2497        let resumed = jm.resume_pending();
2498        assert_eq!(resumed.len(), 2);
2499        for job in &resumed {
2500            assert_eq!(job.status, JobStatus::Queued);
2501        }
2502    }
2503
2504    // ── JobManager: backoff ────────────────────────────────────────────
2505
2506    #[test]
2507    fn deterministic_backoff_zero_on_first_attempt() {
2508        let retry = JobRetryMetadata {
2509            attempt: 0,
2510            ..Default::default()
2511        };
2512        assert_eq!(JobManager::deterministic_backoff_ms(&retry), 0);
2513    }
2514
2515    #[test]
2516    fn deterministic_backoff_exponential_growth() {
2517        let base = DEFAULT_JOB_BACKOFF_BASE_MS;
2518        for attempt in 1..=5 {
2519            let retry = JobRetryMetadata {
2520                attempt,
2521                backoff_base_ms: base,
2522                ..Default::default()
2523            };
2524            let expected = base * 2u64.pow(attempt.saturating_sub(1).min(20));
2525            assert_eq!(
2526                JobManager::deterministic_backoff_ms(&retry),
2527                expected,
2528                "attempt {attempt}"
2529            );
2530        }
2531    }
2532
2533    #[test]
2534    fn deterministic_backoff_saturates_at_high_exponent() {
2535        let retry = JobRetryMetadata {
2536            attempt: 63,
2537            backoff_base_ms: 1000,
2538            ..Default::default()
2539        };
2540        // Should not panic; result saturates
2541        let _ = JobManager::deterministic_backoff_ms(&retry);
2542    }
2543
2544    // ── JobManager: history truncation ─────────────────────────────────
2545
2546    #[test]
2547    fn push_history_truncates_beyond_max() {
2548        let mut jm = JobManager::default();
2549        let job = jm.enqueue("task");
2550        let id = job.id.clone();
2551        // Generate more history entries than the limit
2552        for i in 0..(MAX_JOB_HISTORY_ENTRIES + 20) {
2553            jm.update_progress(&id, (i % 100) as u8, Some(format!("step {i}")));
2554        }
2555        let history = jm.history(&id);
2556        assert_eq!(history.len(), MAX_JOB_HISTORY_ENTRIES);
2557    }
2558
2559    // ── JobManager: persistence encoding/parsing ───────────────────────
2560
2561    #[test]
2562    fn encode_and_parse_persisted_detail_round_trip() {
2563        let mut jm = JobManager::default();
2564        let job = jm.enqueue("task");
2565        let id = job.id.clone();
2566        jm.set_running(&id);
2567        jm.fail(&id, "oops");
2568        let job = jm.list().into_iter().find(|j| j.id == id).unwrap();
2569
2570        let encoded = JobManager::encode_persisted_detail(&job).unwrap().unwrap();
2571        let parsed = JobManager::parse_persisted_detail(Some(&encoded)).unwrap();
2572
2573        assert_eq!(parsed.status, job.status);
2574        assert_eq!(parsed.detail, job.detail);
2575        assert_eq!(parsed.retry.attempt, job.retry.attempt);
2576        assert_eq!(parsed.history.len(), job.history.len());
2577    }
2578
2579    #[test]
2580    fn parse_persisted_detail_returns_none_for_none_input() {
2581        assert!(JobManager::parse_persisted_detail(None).is_none());
2582    }
2583
2584    #[test]
2585    fn parse_persisted_detail_returns_none_for_invalid_json() {
2586        assert!(JobManager::parse_persisted_detail(Some("not json")).is_none());
2587    }
2588
2589    // ── Helper functions ───────────────────────────────────────────────
2590
2591    #[test]
2592    fn job_status_round_trip_str() {
2593        let statuses = [
2594            JobStatus::Queued,
2595            JobStatus::Running,
2596            JobStatus::Paused,
2597            JobStatus::Completed,
2598            JobStatus::Failed,
2599            JobStatus::Cancelled,
2600        ];
2601        for status in &statuses {
2602            let s = job_status_to_str(*status);
2603            let parsed = job_status_from_str(s);
2604            assert_eq!(parsed, Some(*status), "round-trip failed for {s:?}");
2605        }
2606    }
2607
2608    #[test]
2609    fn job_status_from_str_returns_none_for_unknown() {
2610        assert_eq!(job_status_from_str("unknown"), None);
2611        assert_eq!(job_status_from_str(""), None);
2612    }
2613
2614    #[test]
2615    fn truncate_preview_limits_to_120_chars() {
2616        let long = "a".repeat(200);
2617        let truncated = truncate_preview(&long);
2618        assert_eq!(truncated.len(), 120);
2619    }
2620
2621    #[test]
2622    fn truncate_preview_preserves_short_strings() {
2623        let short = "hello";
2624        assert_eq!(truncate_preview(short), "hello");
2625    }
2626
2627    #[test]
2628    fn runtime_status_to_job_state_maps_correctly() {
2629        assert_eq!(
2630            runtime_status_to_job_state(JobStatus::Queued),
2631            JobStateStatus::Queued
2632        );
2633        assert_eq!(
2634            runtime_status_to_job_state(JobStatus::Running),
2635            JobStateStatus::Running
2636        );
2637        assert_eq!(
2638            runtime_status_to_job_state(JobStatus::Paused),
2639            JobStateStatus::Running
2640        );
2641        assert_eq!(
2642            runtime_status_to_job_state(JobStatus::Completed),
2643            JobStateStatus::Completed
2644        );
2645        assert_eq!(
2646            runtime_status_to_job_state(JobStatus::Failed),
2647            JobStateStatus::Failed
2648        );
2649        assert_eq!(
2650            runtime_status_to_job_state(JobStatus::Cancelled),
2651            JobStateStatus::Cancelled
2652        );
2653    }
2654
2655    #[test]
2656    fn job_state_status_to_runtime_maps_correctly() {
2657        assert_eq!(
2658            job_state_status_to_runtime(JobStateStatus::Queued),
2659            JobStatus::Queued
2660        );
2661        assert_eq!(
2662            job_state_status_to_runtime(JobStateStatus::Running),
2663            JobStatus::Running
2664        );
2665        assert_eq!(
2666            job_state_status_to_runtime(JobStateStatus::Completed),
2667            JobStatus::Completed
2668        );
2669        assert_eq!(
2670            job_state_status_to_runtime(JobStateStatus::Failed),
2671            JobStatus::Failed
2672        );
2673        assert_eq!(
2674            job_state_status_to_runtime(JobStateStatus::Cancelled),
2675            JobStatus::Cancelled
2676        );
2677    }
2678
2679    #[test]
2680    fn preview_from_initial_history_new() {
2681        let preview = preview_from_initial_history(&InitialHistory::New);
2682        assert_eq!(preview, "New conversation");
2683    }
2684
2685    #[test]
2686    fn preview_from_initial_history_forked() {
2687        let preview = preview_from_initial_history(&InitialHistory::Forked(vec![json!("hello")]));
2688        assert!(preview.contains("hello"));
2689    }
2690
2691    #[test]
2692    fn preview_from_initial_history_resumed() {
2693        let preview = preview_from_initial_history(&InitialHistory::Resumed {
2694            conversation_id: "test".to_string(),
2695            history: vec![json!("world")],
2696            rollout_path: PathBuf::from("/tmp/test"),
2697        });
2698        assert!(preview.contains("world"));
2699    }
2700
2701    #[test]
2702    fn json_optional_string_handles_null() {
2703        assert!(json_optional_string(&Value::Null).is_none());
2704    }
2705
2706    #[test]
2707    fn json_optional_string_handles_string() {
2708        assert_eq!(
2709            json_optional_string(&Value::String("hello".to_string())),
2710            Some("hello".to_string())
2711        );
2712    }
2713
2714    #[test]
2715    fn json_optional_string_handles_non_string() {
2716        assert!(json_optional_string(&json!(42)).is_none());
2717    }
2718
2719    #[test]
2720    fn parse_retry_metadata_returns_default_for_none() {
2721        let retry = parse_retry_metadata(None);
2722        assert_eq!(retry.attempt, 0);
2723        assert_eq!(retry.max_attempts, DEFAULT_JOB_MAX_ATTEMPTS);
2724        assert_eq!(retry.backoff_base_ms, DEFAULT_JOB_BACKOFF_BASE_MS);
2725    }
2726
2727    #[test]
2728    fn parse_retry_metadata_parses_fields() {
2729        let value = json!({
2730            "attempt": 2,
2731            "max_attempts": 5,
2732            "backoff_base_ms": 1000,
2733            "next_backoff_ms": 2000,
2734            "next_retry_at": 1234567890i64
2735        });
2736        let retry = parse_retry_metadata(Some(&value));
2737        assert_eq!(retry.attempt, 2);
2738        assert_eq!(retry.max_attempts, 5);
2739        assert_eq!(retry.backoff_base_ms, 1000);
2740        assert_eq!(retry.next_backoff_ms, 2000);
2741        assert_eq!(retry.next_retry_at, Some(1234567890));
2742    }
2743
2744    #[test]
2745    fn parse_history_entry_returns_none_without_status() {
2746        let value = json!({"at": 1, "phase": "test"});
2747        assert!(parse_history_entry(&value).is_none());
2748    }
2749
2750    #[test]
2751    fn parse_history_entry_parses_valid_entry() {
2752        let value = json!({
2753            "at": 100,
2754            "phase": "running",
2755            "status": "running",
2756            "progress": 50,
2757            "detail": "working",
2758            "retry": {"attempt": 0, "max_attempts": 3, "backoff_base_ms": 500}
2759        });
2760        let entry = parse_history_entry(&value).unwrap();
2761        assert_eq!(entry.at, 100);
2762        assert_eq!(entry.phase, "running");
2763        assert_eq!(entry.status, JobStatus::Running);
2764        assert_eq!(entry.progress, Some(50));
2765        assert_eq!(entry.detail.as_deref(), Some("working"));
2766    }
2767}