Skip to main content

codewhale_core/
lib.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use codewhale_agent::ModelRegistry;
7use codewhale_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use codewhale_execpolicy::{
9    AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10    ExecPolicyEngine,
11};
12use codewhale_hooks::{HookDispatcher, HookEvent};
13use codewhale_mcp::{
14    McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use codewhale_protocol::{
17    AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18    ResponseChannel, ReviewDecision, Thread, ThreadForkParams, ThreadGoal, ThreadGoalClearParams,
19    ThreadGoalGetParams, ThreadGoalProgressParams, ThreadGoalSetParams, ThreadGoalStatus,
20    ThreadListParams, ThreadReadParams, ThreadRequest, ThreadResponse, ThreadResumeParams,
21    ThreadSetNameParams, ThreadStatus, ToolPayload,
22};
23use codewhale_state::{
24    JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadGoalRecord,
25    ThreadGoalStatus as PersistedThreadGoalStatus, ThreadListFilters, ThreadMetadata,
26    ThreadStatus as PersistedThreadStatus,
27};
28use codewhale_tools::{ToolCall, ToolRegistry};
29use serde_json::{Value, json};
30use uuid::Uuid;
31
32/// How a new thread's conversation history is initialized.
33#[derive(Debug, Clone)]
34pub enum InitialHistory {
35    /// Start with an empty conversation.
36    New,
37    /// Forked from an existing thread with the given history items.
38    Forked(Vec<Value>),
39    /// Resumed from a persisted thread with its full history.
40    Resumed {
41        conversation_id: String,
42        history: Vec<Value>,
43        rollout_path: PathBuf,
44    },
45}
46
47/// Result of spawning or resuming a thread.
48#[derive(Debug, Clone)]
49pub struct NewThread {
50    /// The thread metadata.
51    pub thread: Thread,
52    /// Resolved model identifier.
53    pub model: String,
54    /// Provider that serves the model.
55    pub model_provider: String,
56    /// Working directory for the thread.
57    pub cwd: PathBuf,
58    /// Approval policy override, if any.
59    pub approval_policy: Option<String>,
60    /// Sandbox mode override, if any.
61    pub sandbox: Option<String>,
62}
63
64/// Status of a background job.
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum JobStatus {
67    /// Waiting to be picked up.
68    Queued,
69    /// Currently executing.
70    Running,
71    /// Temporarily paused.
72    Paused,
73    /// Finished successfully.
74    Completed,
75    /// Finished with an error.
76    Failed,
77    /// Cancelled by the user.
78    Cancelled,
79}
80
81const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
82const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
83const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
84const MAX_JOB_HISTORY_ENTRIES: usize = 64;
85
86/// Retry state for a job that failed and may be retried.
87#[derive(Debug, Clone)]
88pub struct JobRetryMetadata {
89    /// Current attempt number (0 = not yet retried).
90    pub attempt: u32,
91    /// Maximum number of retry attempts before giving up.
92    pub max_attempts: u32,
93    /// Base delay in milliseconds for exponential backoff.
94    pub backoff_base_ms: u64,
95    /// Computed delay in milliseconds until the next retry.
96    pub next_backoff_ms: u64,
97    /// Timestamp when the next retry should be attempted.
98    pub next_retry_at: Option<i64>,
99}
100
101impl Default for JobRetryMetadata {
102    fn default() -> Self {
103        Self {
104            attempt: 0,
105            max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
106            backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
107            next_backoff_ms: 0,
108            next_retry_at: None,
109        }
110    }
111}
112
113/// A single entry in a job's history log.
114#[derive(Debug, Clone)]
115pub struct JobHistoryEntry {
116    /// Timestamp when this entry was recorded.
117    pub at: i64,
118    /// Phase name (e.g., "created", "running", "failed").
119    pub phase: String,
120    /// Job status at this point in time.
121    pub status: JobStatus,
122    /// Progress percentage at this point, if available.
123    pub progress: Option<u8>,
124    /// Human-readable detail message.
125    pub detail: Option<String>,
126    /// Retry state snapshot at this point.
127    pub retry: JobRetryMetadata,
128}
129
130#[derive(Debug, Clone)]
131struct PersistedJobDetail {
132    pub status: JobStatus,
133    pub detail: Option<String>,
134    pub retry: JobRetryMetadata,
135    pub history: Vec<JobHistoryEntry>,
136}
137
138/// A complete job record with all metadata and history.
139#[derive(Debug, Clone)]
140pub struct JobRecord {
141    /// Unique job identifier.
142    pub id: String,
143    /// Human-readable job name.
144    pub name: String,
145    /// Current job status.
146    pub status: JobStatus,
147    /// Current progress percentage (0-100).
148    pub progress: Option<u8>,
149    /// Human-readable detail about the current state.
150    pub detail: Option<String>,
151    /// Retry state for failed jobs.
152    pub retry: JobRetryMetadata,
153    /// Chronological history of state transitions.
154    pub history: Vec<JobHistoryEntry>,
155    /// Timestamp when the job was created.
156    pub created_at: i64,
157    /// Timestamp of the last state change.
158    pub updated_at: i64,
159}
160
161/// Manages background jobs with retry logic and persistence.
162#[derive(Debug, Default)]
163pub struct JobManager {
164    jobs: HashMap<String, JobRecord>,
165}
166
167impl JobManager {
168    fn now_ts() -> i64 {
169        chrono::Utc::now().timestamp()
170    }
171
172    fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
173        if retry.attempt == 0 {
174            return 0;
175        }
176        let exponent = retry.attempt.saturating_sub(1).min(20);
177        let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
178        retry.backoff_base_ms.saturating_mul(multiplier)
179    }
180
181    fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
182        retry.next_backoff_ms = 0;
183        retry.next_retry_at = None;
184    }
185
186    fn push_history(job: &mut JobRecord, phase: &str) {
187        job.history.push(JobHistoryEntry {
188            at: job.updated_at,
189            phase: phase.to_string(),
190            status: job.status,
191            progress: job.progress,
192            detail: job.detail.clone(),
193            retry: job.retry.clone(),
194        });
195        if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
196            let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
197            job.history.drain(0..to_drain);
198        }
199    }
200
201    fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
202        let raw = raw?;
203        let parsed: Value = serde_json::from_str(raw).ok()?;
204        let status = parsed
205            .get("status")
206            .and_then(Value::as_str)
207            .and_then(job_status_from_str)?;
208        let detail = parsed.get("detail").and_then(json_optional_string);
209        let retry = parse_retry_metadata(parsed.get("retry"));
210        let history = parsed
211            .get("history")
212            .and_then(Value::as_array)
213            .map(|items| {
214                items
215                    .iter()
216                    .filter_map(parse_history_entry)
217                    .collect::<Vec<_>>()
218            })
219            .unwrap_or_default();
220        Some(PersistedJobDetail {
221            status,
222            detail,
223            retry,
224            history,
225        })
226    }
227
228    fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
229        let encoded = json!({
230            "schema_version": JOB_DETAIL_SCHEMA_VERSION,
231            "status": job_status_to_str(job.status),
232            "detail": job.detail.clone(),
233            "retry": job_retry_to_value(&job.retry),
234            "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
235        })
236        .to_string();
237        Ok(Some(encoded))
238    }
239
240    /// Enqueues a new job and returns its record.
241    pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
242        let now = Self::now_ts();
243        let id = format!("job-{}", Uuid::new_v4());
244        let mut job = JobRecord {
245            id: id.clone(),
246            name: name.into(),
247            status: JobStatus::Queued,
248            progress: Some(0),
249            detail: None,
250            retry: JobRetryMetadata::default(),
251            history: Vec::new(),
252            created_at: now,
253            updated_at: now,
254        };
255        Self::push_history(&mut job, "created");
256        self.jobs.insert(id, job.clone());
257        job
258    }
259
260    /// Transitions a job to running and clears its retry schedule.
261    pub fn set_running(&mut self, id: &str) {
262        if let Some(job) = self.jobs.get_mut(id) {
263            job.status = JobStatus::Running;
264            Self::clear_retry_schedule(&mut job.retry);
265            job.updated_at = Self::now_ts();
266            Self::push_history(job, "running");
267        }
268    }
269
270    /// Updates a job's progress (clamped to 100) and optional detail message.
271    pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
272        if let Some(job) = self.jobs.get_mut(id) {
273            job.progress = Some(progress.min(100));
274            job.detail = detail;
275            job.updated_at = Self::now_ts();
276            Self::push_history(job, "progress_updated");
277        }
278    }
279
280    /// Marks a job as completed with 100% progress and clears its retry schedule.
281    pub fn complete(&mut self, id: &str) {
282        if let Some(job) = self.jobs.get_mut(id) {
283            job.status = JobStatus::Completed;
284            job.progress = Some(100);
285            Self::clear_retry_schedule(&mut job.retry);
286            job.updated_at = Self::now_ts();
287            Self::push_history(job, "completed");
288        }
289    }
290
291    /// Marks a job as failed and schedules a retry if attempts remain.
292    pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
293        if let Some(job) = self.jobs.get_mut(id) {
294            let now = Self::now_ts();
295            job.status = JobStatus::Failed;
296            job.detail = Some(detail.into());
297            if job.retry.attempt < job.retry.max_attempts {
298                job.retry.attempt += 1;
299                job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
300                let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
301                    .min(i64::MAX as u64) as i64;
302                job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
303            } else {
304                Self::clear_retry_schedule(&mut job.retry);
305            }
306            job.updated_at = now;
307            Self::push_history(job, "failed");
308        }
309    }
310
311    /// Cancels a job and clears any pending retry schedule.
312    pub fn cancel(&mut self, id: &str) {
313        if let Some(job) = self.jobs.get_mut(id) {
314            job.status = JobStatus::Cancelled;
315            Self::clear_retry_schedule(&mut job.retry);
316            job.updated_at = Self::now_ts();
317            Self::push_history(job, "cancelled");
318        }
319    }
320
321    /// Pauses a job, optionally updating its detail message.
322    pub fn pause(&mut self, id: &str, detail: Option<String>) {
323        if let Some(job) = self.jobs.get_mut(id) {
324            job.status = JobStatus::Paused;
325            if detail.is_some() {
326                job.detail = detail;
327            }
328            job.updated_at = Self::now_ts();
329            Self::push_history(job, "paused");
330        }
331    }
332
333    /// Resumes a paused or failed job back to running status.
334    pub fn resume(&mut self, id: &str, detail: Option<String>) {
335        if let Some(job) = self.jobs.get_mut(id) {
336            job.status = JobStatus::Running;
337            if detail.is_some() {
338                job.detail = detail;
339            }
340            Self::clear_retry_schedule(&mut job.retry);
341            job.updated_at = Self::now_ts();
342            Self::push_history(job, "resumed");
343        }
344    }
345
346    /// Returns all jobs sorted by most recently updated first.
347    pub fn list(&self) -> Vec<JobRecord> {
348        let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
349        out.sort_by_key(|job| std::cmp::Reverse(job.updated_at));
350        out
351    }
352
353    /// Returns the history entries for a job, or an empty vec if not found.
354    pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
355        self.jobs
356            .get(id)
357            .map(|job| job.history.clone())
358            .unwrap_or_default()
359    }
360
361    /// Resets queued or running jobs back to queued on application resume.
362    pub fn resume_pending(&mut self) -> Vec<JobRecord> {
363        let mut resumed = Vec::new();
364        for job in self.jobs.values_mut() {
365            if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
366                job.status = JobStatus::Queued;
367                job.updated_at = Self::now_ts();
368                Self::push_history(job, "queued_after_resume");
369                resumed.push(job.clone());
370            }
371        }
372        resumed
373    }
374
375    /// Loads jobs from the state store, deserializing extended detail when available.
376    pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
377        let persisted = store.list_jobs(Some(500))?;
378        for job in persisted {
379            let fallback_status = job_state_status_to_runtime(job.status);
380            let parsed = Self::parse_persisted_detail(job.detail.as_deref());
381            let (status, detail, retry, history) = if let Some(detail_state) = parsed {
382                (
383                    detail_state.status,
384                    detail_state.detail,
385                    detail_state.retry,
386                    detail_state.history,
387                )
388            } else {
389                (
390                    fallback_status,
391                    job.detail,
392                    JobRetryMetadata::default(),
393                    Vec::new(),
394                )
395            };
396            self.jobs.insert(
397                job.id.clone(),
398                JobRecord {
399                    id: job.id,
400                    name: job.name,
401                    status,
402                    progress: job.progress,
403                    detail,
404                    retry,
405                    history,
406                    created_at: job.created_at,
407                    updated_at: job.updated_at,
408                },
409            );
410        }
411        Ok(())
412    }
413
414    /// Persists a single job's current state to the state store.
415    pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
416        let Some(job) = self.jobs.get(id) else {
417            return Ok(());
418        };
419        let encoded_detail = Self::encode_persisted_detail(job)?;
420        store.upsert_job(&JobStateRecord {
421            id: job.id.clone(),
422            name: job.name.clone(),
423            status: runtime_status_to_job_state(job.status),
424            progress: job.progress,
425            detail: encoded_detail,
426            created_at: job.created_at,
427            updated_at: job.updated_at,
428        })
429    }
430
431    /// Persists all in-memory jobs to the state store.
432    pub fn persist_all(&self, store: &StateStore) -> Result<()> {
433        for id in self.jobs.keys() {
434            self.persist_job(store, id)?;
435        }
436        Ok(())
437    }
438}
439
440/// Manages thread lifecycle: spawn, resume, fork, archive, and persistence.
441pub struct ThreadManager {
442    store: StateStore,
443    running_threads: HashMap<String, Thread>,
444    cli_version: String,
445}
446
447impl ThreadManager {
448    /// Creates a new `ThreadManager` backed by the given state store.
449    pub fn new(store: StateStore) -> Self {
450        Self {
451            store,
452            running_threads: HashMap::new(),
453            cli_version: env!("CARGO_PKG_VERSION").to_string(),
454        }
455    }
456
457    /// Returns a reference to the underlying state store.
458    pub fn state_store(&self) -> &StateStore {
459        &self.store
460    }
461
462    /// Spawns a new thread with the given initial history and persists it.
463    pub fn spawn_thread_with_history(
464        &mut self,
465        model_provider: String,
466        cwd: PathBuf,
467        initial_history: InitialHistory,
468        persist_extended_history: bool,
469    ) -> Result<NewThread> {
470        let id = format!("thread-{}", Uuid::new_v4());
471        let now = chrono::Utc::now().timestamp();
472        let preview = preview_from_initial_history(&initial_history);
473        let source = match initial_history {
474            InitialHistory::New => SessionSource::Interactive,
475            InitialHistory::Forked(_) => SessionSource::Fork,
476            InitialHistory::Resumed { .. } => SessionSource::Resume,
477        };
478        let thread = Thread {
479            id: id.clone(),
480            preview,
481            ephemeral: !persist_extended_history,
482            model_provider: model_provider.clone(),
483            created_at: now,
484            updated_at: now,
485            status: ThreadStatus::Running,
486            path: None,
487            cwd: cwd.clone(),
488            cli_version: self.cli_version.clone(),
489            source: match source {
490                SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
491                SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
492                SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
493                SessionSource::Api => codewhale_protocol::SessionSource::Api,
494                SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
495            },
496            name: None,
497        };
498        self.persist_thread(&thread, None)?;
499        match &initial_history {
500            InitialHistory::Forked(items) => {
501                for item in items {
502                    self.store.append_message(
503                        &thread.id,
504                        "history",
505                        &item.to_string(),
506                        Some(item.clone()),
507                    )?;
508                }
509            }
510            InitialHistory::Resumed { history, .. } => {
511                for item in history {
512                    self.store.append_message(
513                        &thread.id,
514                        "history",
515                        &item.to_string(),
516                        Some(item.clone()),
517                    )?;
518                }
519            }
520            InitialHistory::New => {}
521        }
522        self.running_threads
523            .insert(thread.id.clone(), thread.clone());
524        Ok(NewThread {
525            thread,
526            model: "auto".to_string(),
527            model_provider,
528            cwd,
529            approval_policy: None,
530            sandbox: None,
531        })
532    }
533
534    /// Resumes an existing thread, returning `None` if not found.
535    pub fn resume_thread_with_history(
536        &mut self,
537        params: &ThreadResumeParams,
538        fallback_cwd: &Path,
539        model_provider: String,
540    ) -> Result<Option<NewThread>> {
541        if params.history.is_none()
542            && let Some(thread) = self.running_threads.get(&params.thread_id).cloned()
543        {
544            return Ok(Some(NewThread {
545                model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
546                model_provider: params.model_provider.clone().unwrap_or(model_provider),
547                cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
548                approval_policy: params.approval_policy.clone(),
549                sandbox: params.sandbox.clone(),
550                thread,
551            }));
552        }
553
554        let persisted = self.store.get_thread(&params.thread_id)?;
555        let Some(metadata) = persisted else {
556            return Ok(None);
557        };
558        let mut thread = to_protocol_thread(metadata);
559        thread.status = ThreadStatus::Running;
560        thread.updated_at = chrono::Utc::now().timestamp();
561        thread.cwd = params
562            .cwd
563            .clone()
564            .unwrap_or_else(|| fallback_cwd.to_path_buf());
565        self.persist_thread(&thread, None)?;
566        self.running_threads
567            .insert(thread.id.clone(), thread.clone());
568        if let Some(history) = params.history.as_ref() {
569            for item in history {
570                self.store.append_message(
571                    &thread.id,
572                    "history",
573                    &item.to_string(),
574                    Some(item.clone()),
575                )?;
576            }
577        }
578
579        Ok(Some(NewThread {
580            model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
581            model_provider: params.model_provider.clone().unwrap_or(model_provider),
582            cwd: thread.cwd.clone(),
583            approval_policy: params.approval_policy.clone(),
584            sandbox: params.sandbox.clone(),
585            thread,
586        }))
587    }
588
589    /// Forks an existing thread into a new one, inheriting the parent's provider.
590    pub fn fork_thread(
591        &mut self,
592        params: &ThreadForkParams,
593        fallback_cwd: &Path,
594    ) -> Result<Option<NewThread>> {
595        let parent = self.store.get_thread(&params.thread_id)?;
596        let Some(parent) = parent else {
597            return Ok(None);
598        };
599        let parent_thread = to_protocol_thread(parent);
600        let new = self.spawn_thread_with_history(
601            params
602                .model_provider
603                .clone()
604                .unwrap_or_else(|| parent_thread.model_provider.clone()),
605            params
606                .cwd
607                .clone()
608                .unwrap_or_else(|| fallback_cwd.to_path_buf()),
609            InitialHistory::Forked(vec![json!({
610                "type": "fork",
611                "from_thread_id": parent_thread.id
612            })]),
613            params.persist_extended_history,
614        )?;
615        Ok(Some(new))
616    }
617
618    /// Lists threads matching the given filter parameters.
619    pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
620        let list = self.store.list_threads(ThreadListFilters {
621            include_archived: params.include_archived,
622            limit: params.limit,
623        })?;
624        Ok(list.into_iter().map(to_protocol_thread).collect())
625    }
626
627    /// Reads a single thread by id, or `None` if not found.
628    pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
629        Ok(self
630            .store
631            .get_thread(&params.thread_id)?
632            .map(to_protocol_thread))
633    }
634
635    /// Sets the display name for a thread, returning the updated thread or `None`.
636    pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
637        let Some(mut metadata) = self.store.get_thread(&params.thread_id)? else {
638            return Ok(None);
639        };
640        metadata.name = Some(params.name.clone());
641        metadata.updated_at = chrono::Utc::now().timestamp();
642        self.store.upsert_thread(&metadata)?;
643        let updated = to_protocol_thread(metadata);
644        self.running_threads
645            .insert(updated.id.clone(), updated.clone());
646        Ok(Some(updated))
647    }
648
649    /// Sets or replaces the persisted goal for a thread.
650    pub fn set_thread_goal(&mut self, params: &ThreadGoalSetParams) -> Result<Option<ThreadGoal>> {
651        if self.store.get_thread(&params.thread_id)?.is_none() {
652            return Ok(None);
653        }
654        let now = chrono::Utc::now().timestamp();
655        let goal = ThreadGoalRecord {
656            thread_id: params.thread_id.clone(),
657            goal_id: format!("goal-{}", Uuid::new_v4()),
658            objective: params.objective.clone(),
659            status: PersistedThreadGoalStatus::Active,
660            token_budget: params.token_budget,
661            tokens_used: 0,
662            time_used_seconds: 0,
663            continuation_count: 0,
664            created_at: now,
665            updated_at: now,
666        };
667        self.store.upsert_thread_goal(&goal)?;
668        Ok(Some(to_protocol_goal(goal)))
669    }
670
671    /// Reads the persisted goal for a thread.
672    pub fn get_thread_goal(&self, params: &ThreadGoalGetParams) -> Result<Option<ThreadGoal>> {
673        Ok(self
674            .store
675            .get_thread_goal(&params.thread_id)?
676            .map(to_protocol_goal))
677    }
678
679    /// Accrues durable per-goal usage and/or a continuation pass for a thread.
680    pub fn record_thread_goal_progress(
681        &mut self,
682        params: &ThreadGoalProgressParams,
683    ) -> Result<Option<ThreadGoal>> {
684        if self.store.get_thread(&params.thread_id)?.is_none() {
685            return Ok(None);
686        }
687
688        let now = chrono::Utc::now().timestamp();
689        let mut goal = if params.token_delta != 0 || params.time_delta_seconds != 0 {
690            self.store.record_thread_goal_usage(
691                &params.thread_id,
692                params.token_delta,
693                params.time_delta_seconds,
694                now,
695            )?
696        } else {
697            self.store.get_thread_goal(&params.thread_id)?
698        };
699
700        if params.record_continuation {
701            goal = self
702                .store
703                .record_thread_goal_continuation(&params.thread_id, now)?;
704        }
705
706        Ok(goal.map(to_protocol_goal))
707    }
708
709    /// Clears the persisted goal for a thread, returning whether one existed.
710    pub fn clear_thread_goal(&mut self, params: &ThreadGoalClearParams) -> Result<bool> {
711        self.store.delete_thread_goal(&params.thread_id)
712    }
713
714    /// Archives a thread so it no longer appears in default listings.
715    pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
716        self.store.mark_archived(thread_id)?;
717        if let Some(thread) = self.running_threads.get_mut(thread_id) {
718            thread.status = ThreadStatus::Archived;
719        }
720        Ok(())
721    }
722
723    /// Restores an archived thread to active status.
724    pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
725        self.store.mark_unarchived(thread_id)?;
726        Ok(())
727    }
728
729    /// Records a user message in a thread and updates its preview and timestamp.
730    pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
731        let Some(mut metadata) = self.store.get_thread(thread_id)? else {
732            return Ok(());
733        };
734        metadata.updated_at = chrono::Utc::now().timestamp();
735        metadata.preview = truncate_preview(input);
736        metadata.status = PersistedThreadStatus::Running;
737        self.store.upsert_thread(&metadata)?;
738        if let Some(thread) = self.running_threads.get_mut(thread_id) {
739            thread.updated_at = metadata.updated_at;
740            thread.preview = metadata.preview;
741            thread.status = ThreadStatus::Running;
742        }
743        let message_id = self.store.append_message(thread_id, "user", input, None)?;
744        self.store.save_checkpoint(
745            thread_id,
746            "latest",
747            &json!({
748                "reason": "thread_message",
749                "message_id": message_id,
750                "role": "user",
751                "preview": truncate_preview(input),
752                "updated_at": metadata.updated_at
753            }),
754        )?;
755        Ok(())
756    }
757
758    fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
759        self.store.upsert_thread(&ThreadMetadata {
760            id: thread.id.clone(),
761            rollout_path,
762            preview: thread.preview.clone(),
763            ephemeral: thread.ephemeral,
764            model_provider: thread.model_provider.clone(),
765            created_at: thread.created_at,
766            updated_at: thread.updated_at,
767            status: to_persisted_status(&thread.status),
768            path: thread.path.clone(),
769            cwd: thread.cwd.clone(),
770            cli_version: thread.cli_version.clone(),
771            source: to_persisted_source(&thread.source),
772            name: thread.name.clone(),
773            sandbox_policy: None,
774            approval_mode: None,
775            archived: matches!(thread.status, ThreadStatus::Archived),
776            archived_at: None,
777            git_sha: None,
778            git_branch: None,
779            git_origin_url: None,
780            memory_mode: None,
781            current_leaf_id: None,
782        })
783    }
784}
785
786/// Top-level runtime combining config, model registry, threads, tools, MCP, and hooks.
787pub struct Runtime {
788    /// Resolved application configuration.
789    pub config: ConfigToml,
790    /// Registry of available model providers.
791    pub model_registry: ModelRegistry,
792    /// Manages conversation thread lifecycle.
793    pub thread_manager: ThreadManager,
794    /// Registry of callable tools.
795    pub tool_registry: Arc<ToolRegistry>,
796    /// Manager for MCP server connections.
797    pub mcp_manager: Arc<McpManager>,
798    /// Engine for evaluating execution policy decisions.
799    pub exec_policy: ExecPolicyEngine,
800    /// Dispatcher for lifecycle hooks.
801    pub hooks: HookDispatcher,
802    /// Manager for background job lifecycle.
803    pub jobs: JobManager,
804}
805
806impl Runtime {
807    /// Constructs a new `Runtime`, loading existing jobs from the state store.
808    pub fn new(
809        config: ConfigToml,
810        model_registry: ModelRegistry,
811        state: StateStore,
812        tool_registry: Arc<ToolRegistry>,
813        mcp_manager: Arc<McpManager>,
814        exec_policy: ExecPolicyEngine,
815        hooks: HookDispatcher,
816    ) -> Self {
817        let mut jobs = JobManager::default();
818        if let Err(e) = jobs.load_from_store(&state) {
819            tracing::warn!("Failed to load job store, starting with empty job list: {e}");
820        }
821        Self {
822            config,
823            model_registry,
824            thread_manager: ThreadManager::new(state),
825            tool_registry,
826            mcp_manager,
827            exec_policy,
828            hooks,
829            jobs,
830        }
831    }
832
833    fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
834        let history = self
835            .thread_manager
836            .state_store()
837            .list_messages(thread_id, Some(500))?
838            .into_iter()
839            .map(|message| {
840                json!({
841                    "id": message.id,
842                    "role": message.role,
843                    "content": message.content,
844                    "item": message.item,
845                    "created_at": message.created_at
846                })
847            })
848            .collect::<Vec<_>>();
849
850        let checkpoint = self
851            .thread_manager
852            .state_store()
853            .load_checkpoint(thread_id, None)?
854            .map(|record| {
855                json!({
856                    "checkpoint_id": record.checkpoint_id,
857                    "state": record.state,
858                    "created_at": record.created_at
859                })
860            });
861
862        let goal = self
863            .thread_manager
864            .state_store()
865            .get_thread_goal(thread_id)?
866            .map(to_protocol_goal);
867
868        Ok(json!({
869            "history": history,
870            "checkpoint": checkpoint,
871            "goal": goal
872        }))
873    }
874
875    fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
876        self.thread_manager.state_store().save_checkpoint(
877            thread_id,
878            "latest",
879            &json!({
880                "reason": reason,
881                "saved_at": chrono::Utc::now().timestamp(),
882                "state": state
883            }),
884        )
885    }
886
887    /// Dispatches a thread request (create, start, resume, fork, list, read, etc.).
888    pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
889        match req {
890            ThreadRequest::Create { .. } => {
891                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
892                let new = self.thread_manager.spawn_thread_with_history(
893                    "deepseek".to_string(),
894                    cwd,
895                    InitialHistory::New,
896                    false,
897                )?;
898                let mut response = thread_response_from_new("created", new);
899                response.data = self.persisted_thread_data(&response.thread_id)?;
900                Ok(response)
901            }
902            ThreadRequest::Start(params) => {
903                let cwd = params.cwd.clone().unwrap_or_else(|| {
904                    std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
905                });
906                let new = self.thread_manager.spawn_thread_with_history(
907                    params
908                        .model_provider
909                        .clone()
910                        .unwrap_or_else(|| "deepseek".to_string()),
911                    cwd,
912                    InitialHistory::New,
913                    params.persist_extended_history,
914                )?;
915                let mut response = thread_response_from_new("started", new);
916                response.data = self.persisted_thread_data(&response.thread_id)?;
917                Ok(response)
918            }
919            ThreadRequest::Resume(params) => {
920                let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
921                if let Some(new) = self.thread_manager.resume_thread_with_history(
922                    &params,
923                    &fallback_cwd,
924                    "deepseek".to_string(),
925                )? {
926                    let mut response = thread_response_from_new("resumed", new);
927                    response.data = self.persisted_thread_data(&response.thread_id)?;
928                    Ok(response)
929                } else {
930                    Ok(ThreadResponse {
931                        thread_id: params.thread_id,
932                        status: "missing".to_string(),
933                        thread: None,
934                        threads: Vec::new(),
935                        goal: None,
936                        model: None,
937                        model_provider: None,
938                        cwd: None,
939                        approval_policy: params.approval_policy,
940                        sandbox: params.sandbox,
941                        events: Vec::new(),
942                        data: json!({"error":"thread not found"}),
943                    })
944                }
945            }
946            ThreadRequest::Fork(params) => {
947                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
948                if let Some(new) = self.thread_manager.fork_thread(&params, &cwd)? {
949                    let mut response = thread_response_from_new("forked", new);
950                    response.data = self.persisted_thread_data(&response.thread_id)?;
951                    Ok(response)
952                } else {
953                    Ok(ThreadResponse {
954                        thread_id: params.thread_id,
955                        status: "missing".to_string(),
956                        thread: None,
957                        threads: Vec::new(),
958                        goal: None,
959                        model: None,
960                        model_provider: None,
961                        cwd: None,
962                        approval_policy: params.approval_policy,
963                        sandbox: params.sandbox,
964                        events: Vec::new(),
965                        data: json!({"error":"thread not found"}),
966                    })
967                }
968            }
969            ThreadRequest::List(params) => Ok(ThreadResponse {
970                thread_id: "list".to_string(),
971                status: "ok".to_string(),
972                thread: None,
973                threads: self.thread_manager.list_threads(&params)?,
974                goal: None,
975                model: None,
976                model_provider: None,
977                cwd: None,
978                approval_policy: None,
979                sandbox: None,
980                events: Vec::new(),
981                data: json!({}),
982            }),
983            ThreadRequest::Read(params) => {
984                let id = params.thread_id.clone();
985                let data = self.persisted_thread_data(&id)?;
986                Ok(ThreadResponse {
987                    thread_id: id,
988                    status: "ok".to_string(),
989                    thread: self.thread_manager.read_thread(&params)?,
990                    threads: Vec::new(),
991                    goal: self.thread_manager.get_thread_goal(&ThreadGoalGetParams {
992                        thread_id: params.thread_id,
993                    })?,
994                    model: None,
995                    model_provider: None,
996                    cwd: None,
997                    approval_policy: None,
998                    sandbox: None,
999                    events: Vec::new(),
1000                    data,
1001                })
1002            }
1003            ThreadRequest::SetName(params) => Ok(ThreadResponse {
1004                thread_id: params.thread_id.clone(),
1005                status: "ok".to_string(),
1006                thread: self.thread_manager.set_thread_name(&params)?,
1007                threads: Vec::new(),
1008                goal: None,
1009                model: None,
1010                model_provider: None,
1011                cwd: None,
1012                approval_policy: None,
1013                sandbox: None,
1014                events: Vec::new(),
1015                data: json!({}),
1016            }),
1017            ThreadRequest::GoalSet(params) => {
1018                let thread_id = params.thread_id.clone();
1019                if let Some(goal) = self.thread_manager.set_thread_goal(&params)? {
1020                    Ok(ThreadResponse {
1021                        thread_id,
1022                        status: "ok".to_string(),
1023                        thread: None,
1024                        threads: Vec::new(),
1025                        goal: Some(goal.clone()),
1026                        model: None,
1027                        model_provider: None,
1028                        cwd: None,
1029                        approval_policy: None,
1030                        sandbox: None,
1031                        events: vec![EventFrame::ThreadGoalUpdated { goal: goal.clone() }],
1032                        data: json!({ "goal": goal }),
1033                    })
1034                } else {
1035                    Ok(ThreadResponse {
1036                        thread_id,
1037                        status: "missing".to_string(),
1038                        thread: None,
1039                        threads: Vec::new(),
1040                        goal: None,
1041                        model: None,
1042                        model_provider: None,
1043                        cwd: None,
1044                        approval_policy: None,
1045                        sandbox: None,
1046                        events: Vec::new(),
1047                        data: json!({"error":"thread not found"}),
1048                    })
1049                }
1050            }
1051            ThreadRequest::GoalGet(params) => {
1052                let goal = self.thread_manager.get_thread_goal(&params)?;
1053                Ok(ThreadResponse {
1054                    thread_id: params.thread_id,
1055                    status: "ok".to_string(),
1056                    thread: None,
1057                    threads: Vec::new(),
1058                    goal: goal.clone(),
1059                    model: None,
1060                    model_provider: None,
1061                    cwd: None,
1062                    approval_policy: None,
1063                    sandbox: None,
1064                    events: Vec::new(),
1065                    data: json!({ "goal": goal }),
1066                })
1067            }
1068            ThreadRequest::GoalClear(params) => {
1069                let thread_id = params.thread_id.clone();
1070                let cleared = self.thread_manager.clear_thread_goal(&params)?;
1071                Ok(ThreadResponse {
1072                    thread_id: thread_id.clone(),
1073                    status: if cleared { "cleared" } else { "empty" }.to_string(),
1074                    thread: None,
1075                    threads: Vec::new(),
1076                    goal: None,
1077                    model: None,
1078                    model_provider: None,
1079                    cwd: None,
1080                    approval_policy: None,
1081                    sandbox: None,
1082                    events: if cleared {
1083                        vec![EventFrame::ThreadGoalCleared { thread_id }]
1084                    } else {
1085                        Vec::new()
1086                    },
1087                    data: json!({ "cleared": cleared }),
1088                })
1089            }
1090            ThreadRequest::GoalRecordProgress(params) => {
1091                let thread_id = params.thread_id.clone();
1092                if let Some(goal) = self.thread_manager.record_thread_goal_progress(&params)? {
1093                    Ok(ThreadResponse {
1094                        thread_id,
1095                        status: "ok".to_string(),
1096                        thread: None,
1097                        threads: Vec::new(),
1098                        goal: Some(goal.clone()),
1099                        model: None,
1100                        model_provider: None,
1101                        cwd: None,
1102                        approval_policy: None,
1103                        sandbox: None,
1104                        events: vec![EventFrame::ThreadGoalUpdated { goal: goal.clone() }],
1105                        data: json!({ "goal": goal }),
1106                    })
1107                } else {
1108                    Ok(ThreadResponse {
1109                        thread_id,
1110                        status: "missing".to_string(),
1111                        thread: None,
1112                        threads: Vec::new(),
1113                        goal: None,
1114                        model: None,
1115                        model_provider: None,
1116                        cwd: None,
1117                        approval_policy: None,
1118                        sandbox: None,
1119                        events: Vec::new(),
1120                        data: json!({"error":"thread or goal not found"}),
1121                    })
1122                }
1123            }
1124            ThreadRequest::Archive { thread_id } => {
1125                self.thread_manager.archive_thread(&thread_id)?;
1126                Ok(ThreadResponse {
1127                    thread_id,
1128                    status: "archived".to_string(),
1129                    thread: None,
1130                    threads: Vec::new(),
1131                    goal: None,
1132                    model: None,
1133                    model_provider: None,
1134                    cwd: None,
1135                    approval_policy: None,
1136                    sandbox: None,
1137                    events: Vec::new(),
1138                    data: json!({}),
1139                })
1140            }
1141            ThreadRequest::Unarchive { thread_id } => {
1142                self.thread_manager.unarchive_thread(&thread_id)?;
1143                Ok(ThreadResponse {
1144                    thread_id,
1145                    status: "unarchived".to_string(),
1146                    thread: None,
1147                    threads: Vec::new(),
1148                    goal: None,
1149                    model: None,
1150                    model_provider: None,
1151                    cwd: None,
1152                    approval_policy: None,
1153                    sandbox: None,
1154                    events: Vec::new(),
1155                    data: json!({}),
1156                })
1157            }
1158            ThreadRequest::Message { thread_id, input } => {
1159                self.thread_manager.touch_message(&thread_id, &input)?;
1160                let response_id = format!("{thread_id}:{}", input.len());
1161                self.hooks
1162                    .emit(HookEvent::ResponseStart {
1163                        response_id: response_id.clone(),
1164                    })
1165                    .await;
1166                self.hooks
1167                    .emit(HookEvent::ResponseEnd {
1168                        response_id: response_id.clone(),
1169                    })
1170                    .await;
1171
1172                Ok(ThreadResponse {
1173                    thread_id,
1174                    status: "accepted".to_string(),
1175                    thread: None,
1176                    threads: Vec::new(),
1177                    goal: None,
1178                    model: None,
1179                    model_provider: None,
1180                    cwd: None,
1181                    approval_policy: None,
1182                    sandbox: None,
1183                    events: vec![
1184                        EventFrame::ResponseStart {
1185                            response_id: response_id.clone(),
1186                        },
1187                        EventFrame::ResponseDelta {
1188                            response_id: response_id.clone(),
1189                            delta: "queued".to_string(),
1190                            channel: ResponseChannel::Text,
1191                        },
1192                        EventFrame::ResponseEnd { response_id },
1193                    ],
1194                    data: json!({}),
1195                })
1196            }
1197        }
1198    }
1199
1200    /// Resolves the model for a prompt, records the message, and returns the response.
1201    pub async fn handle_prompt(
1202        &mut self,
1203        req: PromptRequest,
1204        cli_overrides: &CliRuntimeOverrides,
1205    ) -> Result<PromptResponse> {
1206        let resolved = self.config.resolve_runtime_options(cli_overrides);
1207        let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
1208        let selection = self
1209            .model_registry
1210            .resolve(Some(&requested_model), Some(resolved.provider));
1211        let resolved_model = selection.resolved.id.clone();
1212        let response_id = format!("resp-{}", Uuid::new_v4());
1213
1214        self.hooks
1215            .emit(HookEvent::ResponseStart {
1216                response_id: response_id.clone(),
1217            })
1218            .await;
1219        self.hooks
1220            .emit(HookEvent::ResponseDelta {
1221                response_id: response_id.clone(),
1222                delta: "model-selected".to_string(),
1223            })
1224            .await;
1225        self.hooks
1226            .emit(HookEvent::ResponseEnd {
1227                response_id: response_id.clone(),
1228            })
1229            .await;
1230
1231        let payload = json!({
1232            "provider": resolved.provider.as_str(),
1233            "model": resolved_model.clone(),
1234            "prompt": req.prompt,
1235            "telemetry": resolved.telemetry,
1236            "base_url": resolved.base_url,
1237            "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
1238            "approval_policy": resolved.approval_policy,
1239            "sandbox_mode": resolved.sandbox_mode
1240        });
1241        if let Some(thread_id) = req.thread_id.as_ref() {
1242            self.thread_manager.touch_message(thread_id, &req.prompt)?;
1243            let assistant_message_id = self.thread_manager.store.append_message(
1244                thread_id,
1245                "assistant",
1246                &payload.to_string(),
1247                Some(payload.clone()),
1248            )?;
1249            self.persist_latest_checkpoint(
1250                thread_id,
1251                "prompt_response",
1252                json!({
1253                    "response_id": response_id.clone(),
1254                    "model": resolved_model.clone(),
1255                    "provider": resolved.provider.as_str(),
1256                    "assistant_message_id": assistant_message_id
1257                }),
1258            )?;
1259        }
1260
1261        Ok(PromptResponse {
1262            output: payload.to_string(),
1263            model: resolved_model,
1264            events: vec![
1265                EventFrame::ResponseStart {
1266                    response_id: response_id.clone(),
1267                },
1268                EventFrame::ResponseDelta {
1269                    response_id: response_id.clone(),
1270                    delta: "model-selected".to_string(),
1271                    channel: ResponseChannel::Text,
1272                },
1273                EventFrame::ResponseEnd { response_id },
1274            ],
1275        })
1276    }
1277
1278    /// Evaluates execution policy and dispatches a tool call.
1279    pub async fn invoke_tool(
1280        &self,
1281        call: ToolCall,
1282        approval_mode: AskForApproval,
1283        cwd: &Path,
1284    ) -> Result<Value> {
1285        let fallback_cwd = cwd.display().to_string();
1286        let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
1287        let policy_tool = match &call.payload {
1288            ToolPayload::LocalShell { .. } => "exec_shell",
1289            _ => call.name.as_str(),
1290        };
1291        let policy_path = permission_path_for_call(&call);
1292        let decision = self.exec_policy.check(ExecPolicyContext {
1293            command: &command,
1294            cwd: &policy_cwd,
1295            tool: Some(policy_tool),
1296            path: policy_path.as_deref(),
1297            ask_for_approval: approval_mode,
1298            sandbox_mode: None,
1299        })?;
1300        let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
1301        let response_id = format!("tool-{}", Uuid::new_v4());
1302        let call_id = call
1303            .raw_tool_call_id
1304            .clone()
1305            .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
1306        self.hooks
1307            .emit(HookEvent::ToolLifecycle {
1308                response_id: response_id.clone(),
1309                tool_name: call.name.clone(),
1310                phase: "precheck".to_string(),
1311                payload: precheck.clone(),
1312            })
1313            .await;
1314
1315        if !decision.allow {
1316            let reason = decision.reason().to_string();
1317            let approval_id = format!("approval-{}", Uuid::new_v4());
1318            let error_frame = EventFrame::Error {
1319                response_id: response_id.clone(),
1320                message: reason.clone(),
1321            };
1322            self.hooks
1323                .emit(HookEvent::ApprovalLifecycle {
1324                    approval_id,
1325                    phase: "denied".to_string(),
1326                    reason: Some(reason.clone()),
1327                })
1328                .await;
1329            self.hooks
1330                .emit(HookEvent::GenericEventFrame {
1331                    frame: Box::new(error_frame.clone()),
1332                })
1333                .await;
1334            return Ok(json!({
1335                "ok": false,
1336                "status": "denied",
1337                "execution_kind": execution_kind,
1338                "response_id": response_id,
1339                "precheck": precheck,
1340                "error": reason,
1341                "events": [event_frame_payload(&error_frame)],
1342            }));
1343        }
1344
1345        if decision.requires_approval {
1346            let approval_id = format!("approval-{}", Uuid::new_v4());
1347            let reason = decision.reason().to_string();
1348            let maybe_approval_frame = approval_request_frame(
1349                &decision.requirement,
1350                decision.matched_rule.as_deref(),
1351                call_id,
1352                approval_id.clone(),
1353                response_id.clone(),
1354                command.clone(),
1355                policy_cwd.clone(),
1356            );
1357            self.hooks
1358                .emit(HookEvent::ApprovalLifecycle {
1359                    approval_id: approval_id.clone(),
1360                    phase: "requested".to_string(),
1361                    reason: Some(reason.clone()),
1362                })
1363                .await;
1364            let mut events = Vec::new();
1365            if let Some(frame) = maybe_approval_frame {
1366                self.hooks
1367                    .emit(HookEvent::GenericEventFrame {
1368                        frame: Box::new(frame.clone()),
1369                    })
1370                    .await;
1371                events.push(event_frame_payload(&frame));
1372            }
1373            return Ok(json!({
1374                "ok": false,
1375                "status": "approval_required",
1376                "execution_kind": execution_kind,
1377                "response_id": response_id,
1378                "approval_id": approval_id,
1379                "precheck": precheck,
1380                "error": reason,
1381                "events": events,
1382            }));
1383        }
1384
1385        let start_frame = EventFrame::ToolCallStart {
1386            response_id: response_id.clone(),
1387            tool_name: call.name.clone(),
1388            arguments: tool_payload_value(&call.payload),
1389        };
1390        self.hooks
1391            .emit(HookEvent::GenericEventFrame {
1392                frame: Box::new(start_frame.clone()),
1393            })
1394            .await;
1395        self.hooks
1396            .emit(HookEvent::ToolLifecycle {
1397                response_id: response_id.clone(),
1398                tool_name: call.name.clone(),
1399                phase: "dispatching".to_string(),
1400                payload: json!({
1401                    "call_id": call_id,
1402                    "execution_kind": execution_kind
1403                }),
1404            })
1405            .await;
1406
1407        match self.tool_registry.dispatch(call.clone(), true).await {
1408            Ok(tool_output) => {
1409                let result_frame = EventFrame::ToolCallResult {
1410                    response_id: response_id.clone(),
1411                    tool_name: call.name.clone(),
1412                    output: tool_output_value(&tool_output),
1413                };
1414                self.hooks
1415                    .emit(HookEvent::GenericEventFrame {
1416                        frame: Box::new(result_frame.clone()),
1417                    })
1418                    .await;
1419                self.hooks
1420                    .emit(HookEvent::ToolLifecycle {
1421                        response_id: response_id.clone(),
1422                        tool_name: call.name,
1423                        phase: "completed".to_string(),
1424                        payload: json!({ "ok": true }),
1425                    })
1426                    .await;
1427                Ok(json!({
1428                    "ok": true,
1429                    "status": "completed",
1430                    "execution_kind": execution_kind,
1431                    "response_id": response_id,
1432                    "precheck": precheck,
1433                    "output": tool_output,
1434                    "events": [
1435                        event_frame_payload(&start_frame),
1436                        event_frame_payload(&result_frame)
1437                    ]
1438                }))
1439            }
1440            Err(err) => {
1441                let message = format!("{err:?}");
1442                let error_frame = EventFrame::Error {
1443                    response_id: response_id.clone(),
1444                    message: message.clone(),
1445                };
1446                self.hooks
1447                    .emit(HookEvent::GenericEventFrame {
1448                        frame: Box::new(error_frame.clone()),
1449                    })
1450                    .await;
1451                self.hooks
1452                    .emit(HookEvent::ToolLifecycle {
1453                        response_id: response_id.clone(),
1454                        tool_name: call.name,
1455                        phase: "failed".to_string(),
1456                        payload: json!({ "error": message.clone() }),
1457                    })
1458                    .await;
1459                Ok(json!({
1460                    "ok": false,
1461                    "status": "failed",
1462                    "execution_kind": execution_kind,
1463                    "response_id": response_id,
1464                    "precheck": precheck,
1465                    "error": message,
1466                    "events": [
1467                        event_frame_payload(&start_frame),
1468                        event_frame_payload(&error_frame)
1469                    ]
1470                }))
1471            }
1472        }
1473    }
1474
1475    /// Starts all configured MCP servers and emits startup events via hooks.
1476    pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1477        let mut updates = Vec::new();
1478        let summary = self.mcp_manager.start_all(|update| {
1479            updates.push(update);
1480        });
1481        for update in updates {
1482            let status = match update.status {
1483                McpManagerStartupStatus::Starting => codewhale_protocol::McpStartupStatus::Starting,
1484                McpManagerStartupStatus::Ready => codewhale_protocol::McpStartupStatus::Ready,
1485                McpManagerStartupStatus::Failed { error } => {
1486                    codewhale_protocol::McpStartupStatus::Failed { error }
1487                }
1488                McpManagerStartupStatus::Cancelled => {
1489                    codewhale_protocol::McpStartupStatus::Cancelled
1490                }
1491            };
1492            self.hooks
1493                .emit(HookEvent::GenericEventFrame {
1494                    frame: Box::new(EventFrame::McpStartupUpdate {
1495                        update: codewhale_protocol::McpStartupUpdateEvent {
1496                            server_name: update.server_name,
1497                            status,
1498                        },
1499                    }),
1500                })
1501                .await;
1502        }
1503        self.hooks
1504            .emit(HookEvent::GenericEventFrame {
1505                frame: Box::new(EventFrame::McpStartupComplete {
1506                    summary: codewhale_protocol::McpStartupCompleteEvent {
1507                        ready: summary.ready.clone(),
1508                        failed: summary
1509                            .failed
1510                            .iter()
1511                            .map(|f| codewhale_protocol::McpStartupFailure {
1512                                server_name: f.server_name.clone(),
1513                                error: f.error.clone(),
1514                            })
1515                            .collect(),
1516                        cancelled: summary.cancelled.clone(),
1517                    },
1518                }),
1519            })
1520            .await;
1521        summary
1522    }
1523
1524    /// Returns the current application status including all jobs and their history.
1525    pub fn app_status(&self) -> AppResponse {
1526        let jobs = self.jobs.list();
1527        let events = jobs
1528            .iter()
1529            .flat_map(|job| {
1530                job.history.iter().map(|entry| EventFrame::ResponseDelta {
1531                    response_id: job.id.clone(),
1532                    delta: json!({
1533                        "kind": "job_transition",
1534                        "job_id": job.id.clone(),
1535                        "phase": entry.phase.clone(),
1536                        "status": job_status_to_str(entry.status),
1537                        "progress": entry.progress,
1538                        "detail": entry.detail.clone(),
1539                        "retry": job_retry_to_value(&entry.retry),
1540                        "at": entry.at
1541                    })
1542                    .to_string(),
1543                    channel: ResponseChannel::Text,
1544                })
1545            })
1546            .collect::<Vec<_>>();
1547        AppResponse {
1548            ok: true,
1549            data: json!({
1550                "jobs": jobs.into_iter().map(|job| {
1551                    json!({
1552                        "id": job.id,
1553                        "name": job.name,
1554                        "status": job_status_to_str(job.status),
1555                        "progress": job.progress,
1556                        "detail": job.detail,
1557                        "retry": job_retry_to_value(&job.retry),
1558                        "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1559                    })
1560                }).collect::<Vec<_>>()
1561            }),
1562            events,
1563        }
1564    }
1565
1566    /// Returns the default model provider from the resolved configuration.
1567    pub fn provider_default(&self) -> ProviderKind {
1568        self.config.provider
1569    }
1570
1571    /// Saves a named checkpoint for a thread.
1572    pub fn save_thread_checkpoint(
1573        &self,
1574        thread_id: &str,
1575        checkpoint_id: &str,
1576        state: &Value,
1577    ) -> Result<()> {
1578        self.thread_manager
1579            .state_store()
1580            .save_checkpoint(thread_id, checkpoint_id, state)
1581    }
1582
1583    /// Loads a checkpoint for a thread. Pass `None` for the latest.
1584    pub fn load_thread_checkpoint(
1585        &self,
1586        thread_id: &str,
1587        checkpoint_id: Option<&str>,
1588    ) -> Result<Option<Value>> {
1589        Ok(self
1590            .thread_manager
1591            .state_store()
1592            .load_checkpoint(thread_id, checkpoint_id)?
1593            .map(|checkpoint| checkpoint.state))
1594    }
1595
1596    /// Enqueues a new background job and persists it immediately.
1597    pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1598        let job = self.jobs.enqueue(name);
1599        self.jobs
1600            .persist_job(self.thread_manager.state_store(), &job.id)?;
1601        Ok(job)
1602    }
1603
1604    /// Transitions a job to running and persists the change.
1605    pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1606        self.jobs.set_running(job_id);
1607        self.jobs
1608            .persist_job(self.thread_manager.state_store(), job_id)
1609    }
1610
1611    /// Updates a job's progress and persists the change.
1612    pub fn update_job_progress(
1613        &mut self,
1614        job_id: &str,
1615        progress: u8,
1616        detail: Option<String>,
1617    ) -> Result<()> {
1618        self.jobs.update_progress(job_id, progress, detail);
1619        self.jobs
1620            .persist_job(self.thread_manager.state_store(), job_id)
1621    }
1622
1623    /// Marks a job as completed and persists the change.
1624    pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1625        self.jobs.complete(job_id);
1626        self.jobs
1627            .persist_job(self.thread_manager.state_store(), job_id)
1628    }
1629
1630    /// Marks a job as failed and persists the change.
1631    pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1632        self.jobs.fail(job_id, detail);
1633        self.jobs
1634            .persist_job(self.thread_manager.state_store(), job_id)
1635    }
1636
1637    /// Cancels a job and persists the change.
1638    pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1639        self.jobs.cancel(job_id);
1640        self.jobs
1641            .persist_job(self.thread_manager.state_store(), job_id)
1642    }
1643
1644    /// Pauses a job and persists the change.
1645    pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1646        self.jobs.pause(job_id, detail);
1647        self.jobs
1648            .persist_job(self.thread_manager.state_store(), job_id)
1649    }
1650
1651    /// Resumes a paused job and persists the change.
1652    pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1653        self.jobs.resume(job_id, detail);
1654        self.jobs
1655            .persist_job(self.thread_manager.state_store(), job_id)
1656    }
1657
1658    /// Returns the state-transition history for a job.
1659    pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
1660        self.jobs.history(job_id)
1661    }
1662}
1663
1664fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1665    ThreadResponse {
1666        thread_id: new.thread.id.clone(),
1667        status: status.to_string(),
1668        thread: Some(new.thread),
1669        threads: Vec::new(),
1670        goal: None,
1671        model: Some(new.model),
1672        model_provider: Some(new.model_provider),
1673        cwd: Some(new.cwd),
1674        approval_policy: new.approval_policy,
1675        sandbox: new.sandbox,
1676        events: Vec::new(),
1677        data: json!({}),
1678    }
1679}
1680
1681fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1682    match initial_history {
1683        InitialHistory::New => "New conversation".to_string(),
1684        InitialHistory::Forked(items) => truncate_preview(
1685            &items
1686                .first()
1687                .map(Value::to_string)
1688                .unwrap_or_else(|| "Forked conversation".to_string()),
1689        ),
1690        InitialHistory::Resumed { history, .. } => truncate_preview(
1691            &history
1692                .first()
1693                .map(Value::to_string)
1694                .unwrap_or_else(|| "Resumed conversation".to_string()),
1695        ),
1696    }
1697}
1698
1699fn permission_path_for_call(call: &ToolCall) -> Option<String> {
1700    match &call.payload {
1701        ToolPayload::Function { arguments } => serde_json::from_str::<Value>(arguments)
1702            .ok()
1703            .and_then(|value| {
1704                value
1705                    .get("path")
1706                    .and_then(Value::as_str)
1707                    .map(str::to_string)
1708            }),
1709        ToolPayload::Mcp { raw_arguments, .. } => raw_arguments
1710            .get("path")
1711            .and_then(Value::as_str)
1712            .map(str::to_string),
1713        ToolPayload::Custom { .. } | ToolPayload::LocalShell { .. } => None,
1714    }
1715}
1716
1717fn truncate_preview(value: &str) -> String {
1718    value.chars().take(120).collect()
1719}
1720
1721fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1722    Thread {
1723        id: thread.id,
1724        preview: thread.preview,
1725        ephemeral: thread.ephemeral,
1726        model_provider: thread.model_provider,
1727        created_at: thread.created_at,
1728        updated_at: thread.updated_at,
1729        status: match thread.status {
1730            PersistedThreadStatus::Running => ThreadStatus::Running,
1731            PersistedThreadStatus::Idle => ThreadStatus::Idle,
1732            PersistedThreadStatus::Completed => ThreadStatus::Completed,
1733            PersistedThreadStatus::Failed => ThreadStatus::Failed,
1734            PersistedThreadStatus::Paused => ThreadStatus::Paused,
1735            PersistedThreadStatus::Archived => ThreadStatus::Archived,
1736        },
1737        path: thread.path,
1738        cwd: thread.cwd,
1739        cli_version: thread.cli_version,
1740        source: match thread.source {
1741            SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
1742            SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
1743            SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
1744            SessionSource::Api => codewhale_protocol::SessionSource::Api,
1745            SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
1746        },
1747        name: thread.name,
1748    }
1749}
1750
1751fn to_protocol_goal(goal: ThreadGoalRecord) -> ThreadGoal {
1752    ThreadGoal {
1753        thread_id: goal.thread_id,
1754        goal_id: goal.goal_id,
1755        objective: goal.objective,
1756        status: to_protocol_goal_status(goal.status),
1757        token_budget: goal.token_budget,
1758        tokens_used: goal.tokens_used,
1759        time_used_seconds: goal.time_used_seconds,
1760        continuation_count: goal.continuation_count,
1761        created_at: goal.created_at,
1762        updated_at: goal.updated_at,
1763    }
1764}
1765
1766fn to_protocol_goal_status(status: PersistedThreadGoalStatus) -> ThreadGoalStatus {
1767    match status {
1768        PersistedThreadGoalStatus::Active => ThreadGoalStatus::Active,
1769        PersistedThreadGoalStatus::Paused => ThreadGoalStatus::Paused,
1770        PersistedThreadGoalStatus::Blocked => ThreadGoalStatus::Blocked,
1771        PersistedThreadGoalStatus::UsageLimited => ThreadGoalStatus::UsageLimited,
1772        PersistedThreadGoalStatus::BudgetLimited => ThreadGoalStatus::BudgetLimited,
1773        PersistedThreadGoalStatus::Complete => ThreadGoalStatus::Complete,
1774    }
1775}
1776
1777fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1778    match status {
1779        ThreadStatus::Running => PersistedThreadStatus::Running,
1780        ThreadStatus::Idle => PersistedThreadStatus::Idle,
1781        ThreadStatus::Completed => PersistedThreadStatus::Completed,
1782        ThreadStatus::Failed => PersistedThreadStatus::Failed,
1783        ThreadStatus::Paused => PersistedThreadStatus::Paused,
1784        ThreadStatus::Archived => PersistedThreadStatus::Archived,
1785    }
1786}
1787
1788fn to_persisted_source(source: &codewhale_protocol::SessionSource) -> SessionSource {
1789    match source {
1790        codewhale_protocol::SessionSource::Interactive => SessionSource::Interactive,
1791        codewhale_protocol::SessionSource::Resume => SessionSource::Resume,
1792        codewhale_protocol::SessionSource::Fork => SessionSource::Fork,
1793        codewhale_protocol::SessionSource::Api => SessionSource::Api,
1794        codewhale_protocol::SessionSource::Unknown => SessionSource::Unknown,
1795    }
1796}
1797
1798fn approval_request_frame(
1799    requirement: &ExecApprovalRequirement,
1800    matched_rule: Option<&str>,
1801    call_id: String,
1802    approval_id: String,
1803    turn_id: String,
1804    command: String,
1805    cwd: String,
1806) -> Option<EventFrame> {
1807    let ExecApprovalRequirement::NeedsApproval {
1808        reason,
1809        proposed_execpolicy_amendment,
1810        proposed_network_policy_amendments,
1811    } = requirement
1812    else {
1813        return None;
1814    };
1815
1816    let mut available_decisions = vec![
1817        ReviewDecision::Approved,
1818        ReviewDecision::ApprovedForSession,
1819        ReviewDecision::Denied,
1820        ReviewDecision::Abort,
1821    ];
1822    if proposed_execpolicy_amendment
1823        .as_ref()
1824        .is_some_and(|amendment| !amendment.prefixes.is_empty())
1825    {
1826        available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1827    }
1828    available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1829        |amendment| ReviewDecision::NetworkPolicyAmendment {
1830            host: amendment.host,
1831            action: amendment.action,
1832        },
1833    ));
1834
1835    Some(EventFrame::ExecApprovalRequest {
1836        request: ExecApprovalRequestEvent {
1837            call_id,
1838            approval_id,
1839            turn_id,
1840            command,
1841            cwd,
1842            reason: reason.clone(),
1843            matched_rule: matched_rule.map(|rule| rule.to_string().into_boxed_str()),
1844            network_approval_context: None,
1845            proposed_execpolicy_amendment: proposed_execpolicy_amendment
1846                .as_ref()
1847                .map(|amendment| amendment.prefixes.clone())
1848                .unwrap_or_default(),
1849            proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1850            additional_permissions: Vec::new(),
1851            available_decisions,
1852        },
1853    })
1854}
1855
1856fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1857    match requirement {
1858        ExecApprovalRequirement::Skip {
1859            bypass_sandbox,
1860            proposed_execpolicy_amendment,
1861        } => json!({
1862            "type": "skip",
1863            "bypass_sandbox": bypass_sandbox,
1864            "reason": requirement.reason(),
1865            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1866                .as_ref()
1867                .map(|amendment| amendment.prefixes.clone())
1868                .unwrap_or_default()
1869        }),
1870        ExecApprovalRequirement::NeedsApproval {
1871            reason,
1872            proposed_execpolicy_amendment,
1873            proposed_network_policy_amendments,
1874        } => json!({
1875            "type": "needs_approval",
1876            "reason": reason,
1877            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1878                .as_ref()
1879                .map(|amendment| amendment.prefixes.clone())
1880                .unwrap_or_default(),
1881            "proposed_network_policy_amendments": proposed_network_policy_amendments
1882        }),
1883        ExecApprovalRequirement::Forbidden { reason } => json!({
1884            "type": "forbidden",
1885            "reason": reason
1886        }),
1887    }
1888}
1889
1890fn policy_precheck_payload(
1891    decision: &ExecPolicyDecision,
1892    command: &str,
1893    cwd: &str,
1894    execution_kind: &str,
1895) -> Value {
1896    json!({
1897        "execution_kind": execution_kind,
1898        "command": command,
1899        "cwd": cwd,
1900        "allow": decision.allow,
1901        "requires_approval": decision.requires_approval,
1902        "matched_rule": decision.matched_rule.clone(),
1903        "phase": decision.requirement.phase(),
1904        "reason": decision.reason(),
1905        "requirement": approval_requirement_payload(&decision.requirement)
1906    })
1907}
1908
1909fn tool_payload_value(payload: &ToolPayload) -> Value {
1910    serde_json::to_value(payload).unwrap_or_else(
1911        |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1912    )
1913}
1914
1915fn tool_output_value(output: &codewhale_protocol::ToolOutput) -> Value {
1916    serde_json::to_value(output).unwrap_or_else(
1917        |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1918    )
1919}
1920
1921fn event_frame_payload(frame: &EventFrame) -> Value {
1922    serde_json::to_value(frame)
1923        .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1924}
1925
1926fn json_optional_string(value: &Value) -> Option<String> {
1927    if value.is_null() {
1928        None
1929    } else {
1930        value.as_str().map(ToString::to_string)
1931    }
1932}
1933
1934fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
1935    let Some(value) = value else {
1936        return JobRetryMetadata::default();
1937    };
1938    JobRetryMetadata {
1939        attempt: value
1940            .get("attempt")
1941            .and_then(Value::as_u64)
1942            .unwrap_or(0)
1943            .min(u32::MAX as u64) as u32,
1944        max_attempts: value
1945            .get("max_attempts")
1946            .and_then(Value::as_u64)
1947            .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
1948            .min(u32::MAX as u64) as u32,
1949        backoff_base_ms: value
1950            .get("backoff_base_ms")
1951            .and_then(Value::as_u64)
1952            .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
1953        next_backoff_ms: value
1954            .get("next_backoff_ms")
1955            .and_then(Value::as_u64)
1956            .unwrap_or(0),
1957        next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
1958    }
1959}
1960
1961fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
1962    let status = value
1963        .get("status")
1964        .and_then(Value::as_str)
1965        .and_then(job_status_from_str)?;
1966    Some(JobHistoryEntry {
1967        at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
1968        phase: value
1969            .get("phase")
1970            .and_then(Value::as_str)
1971            .unwrap_or("unknown")
1972            .to_string(),
1973        status,
1974        progress: value
1975            .get("progress")
1976            .and_then(Value::as_u64)
1977            .map(|v| v.min(u8::MAX as u64) as u8),
1978        detail: value.get("detail").and_then(json_optional_string),
1979        retry: parse_retry_metadata(value.get("retry")),
1980    })
1981}
1982
1983fn job_status_to_str(status: JobStatus) -> &'static str {
1984    match status {
1985        JobStatus::Queued => "queued",
1986        JobStatus::Running => "running",
1987        JobStatus::Paused => "paused",
1988        JobStatus::Completed => "completed",
1989        JobStatus::Failed => "failed",
1990        JobStatus::Cancelled => "cancelled",
1991    }
1992}
1993
1994fn job_status_from_str(value: &str) -> Option<JobStatus> {
1995    match value {
1996        "queued" => Some(JobStatus::Queued),
1997        "running" => Some(JobStatus::Running),
1998        "paused" => Some(JobStatus::Paused),
1999        "completed" => Some(JobStatus::Completed),
2000        "failed" => Some(JobStatus::Failed),
2001        "cancelled" => Some(JobStatus::Cancelled),
2002        _ => None,
2003    }
2004}
2005
2006fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
2007    json!({
2008        "attempt": retry.attempt,
2009        "max_attempts": retry.max_attempts,
2010        "backoff_base_ms": retry.backoff_base_ms,
2011        "next_backoff_ms": retry.next_backoff_ms,
2012        "next_retry_at": retry.next_retry_at
2013    })
2014}
2015
2016fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
2017    json!({
2018        "at": entry.at,
2019        "phase": entry.phase.clone(),
2020        "status": job_status_to_str(entry.status),
2021        "progress": entry.progress,
2022        "detail": entry.detail.clone(),
2023        "retry": job_retry_to_value(&entry.retry)
2024    })
2025}
2026
2027fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
2028    match status {
2029        JobStatus::Queued => JobStateStatus::Queued,
2030        JobStatus::Running => JobStateStatus::Running,
2031        JobStatus::Paused => JobStateStatus::Running,
2032        JobStatus::Completed => JobStateStatus::Completed,
2033        JobStatus::Failed => JobStateStatus::Failed,
2034        JobStatus::Cancelled => JobStateStatus::Cancelled,
2035    }
2036}
2037
2038fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
2039    match status {
2040        JobStateStatus::Queued => JobStatus::Queued,
2041        JobStateStatus::Running => JobStatus::Running,
2042        JobStateStatus::Completed => JobStatus::Completed,
2043        JobStateStatus::Failed => JobStatus::Failed,
2044        JobStateStatus::Cancelled => JobStatus::Cancelled,
2045    }
2046}
2047
2048#[cfg(test)]
2049mod tests {
2050    use super::*;
2051    use codewhale_tools::ToolCallSource;
2052
2053    fn temp_core_state(name: &str) -> StateStore {
2054        let dir =
2055            std::env::temp_dir().join(format!("codewhale-core-{name}-{}", Uuid::new_v4().simple()));
2056        std::fs::create_dir_all(&dir).expect("create temp state dir");
2057        StateStore::open(Some(dir.join("state.db"))).expect("open state store")
2058    }
2059
2060    fn test_thread_metadata(id: &str) -> ThreadMetadata {
2061        ThreadMetadata {
2062            id: id.to_string(),
2063            rollout_path: None,
2064            preview: "test thread".to_string(),
2065            ephemeral: false,
2066            model_provider: "deepseek".to_string(),
2067            created_at: 10,
2068            updated_at: 10,
2069            status: PersistedThreadStatus::Running,
2070            path: None,
2071            cwd: PathBuf::from("/tmp/codewhale"),
2072            cli_version: "0.0.0-test".to_string(),
2073            source: SessionSource::Interactive,
2074            name: None,
2075            sandbox_policy: None,
2076            approval_mode: None,
2077            archived: false,
2078            archived_at: None,
2079            git_sha: None,
2080            git_branch: None,
2081            git_origin_url: None,
2082            memory_mode: None,
2083            current_leaf_id: None,
2084        }
2085    }
2086
2087    // ── JobManager: lifecycle ──────────────────────────────────────────
2088
2089    #[test]
2090    fn permission_path_for_call_extracts_function_path_argument() {
2091        let call = ToolCall {
2092            name: "read_file".to_string(),
2093            payload: ToolPayload::Function {
2094                arguments: json!({ "path": "README.md" }).to_string(),
2095            },
2096            source: ToolCallSource::Direct,
2097            raw_tool_call_id: None,
2098        };
2099
2100        assert_eq!(
2101            permission_path_for_call(&call).as_deref(),
2102            Some("README.md")
2103        );
2104    }
2105
2106    #[test]
2107    fn permission_path_for_call_extracts_mcp_path_argument() {
2108        let call = ToolCall {
2109            name: "mcp_fs_read".to_string(),
2110            payload: ToolPayload::Mcp {
2111                server: "fs".to_string(),
2112                tool: "read".to_string(),
2113                raw_arguments: json!({ "path": "secrets/token.txt" }),
2114                raw_tool_call_id: None,
2115            },
2116            source: ToolCallSource::Direct,
2117            raw_tool_call_id: None,
2118        };
2119
2120        assert_eq!(
2121            permission_path_for_call(&call).as_deref(),
2122            Some("secrets/token.txt")
2123        );
2124    }
2125
2126    #[test]
2127    fn permission_path_for_call_ignores_shell_payload() {
2128        let call = ToolCall {
2129            name: "exec_shell".to_string(),
2130            payload: ToolPayload::LocalShell {
2131                params: codewhale_protocol::LocalShellParams {
2132                    command: "cargo test".to_string(),
2133                    cwd: None,
2134                    timeout_ms: None,
2135                },
2136            },
2137            source: ToolCallSource::Direct,
2138            raw_tool_call_id: None,
2139        };
2140
2141        assert_eq!(permission_path_for_call(&call), None);
2142    }
2143
2144    #[test]
2145    fn thread_goal_progress_accumulates_durable_accounting() {
2146        let store = temp_core_state("thread-goal-progress");
2147        store
2148            .upsert_thread(&test_thread_metadata("thread-1"))
2149            .expect("upsert thread");
2150        let mut manager = ThreadManager::new(store);
2151        manager
2152            .set_thread_goal(&ThreadGoalSetParams {
2153                thread_id: "thread-1".to_string(),
2154                objective: "Carry the goal across turns".to_string(),
2155                token_budget: Some(2_000),
2156            })
2157            .expect("set goal")
2158            .expect("goal exists");
2159
2160        let updated = manager
2161            .record_thread_goal_progress(&ThreadGoalProgressParams {
2162                thread_id: "thread-1".to_string(),
2163                token_delta: 750,
2164                time_delta_seconds: 12,
2165                record_continuation: true,
2166            })
2167            .expect("record progress")
2168            .expect("goal exists");
2169
2170        assert_eq!(updated.tokens_used, 750);
2171        assert_eq!(updated.time_used_seconds, 12);
2172        assert_eq!(updated.continuation_count, 1);
2173
2174        let persisted = manager
2175            .get_thread_goal(&ThreadGoalGetParams {
2176                thread_id: "thread-1".to_string(),
2177            })
2178            .expect("read goal")
2179            .expect("goal exists");
2180        assert_eq!(persisted.tokens_used, 750);
2181        assert_eq!(persisted.time_used_seconds, 12);
2182        assert_eq!(persisted.continuation_count, 1);
2183    }
2184
2185    #[test]
2186    fn approval_request_frame_includes_matched_rule() {
2187        let requirement = ExecApprovalRequirement::NeedsApproval {
2188            reason: "Typed ask rule 'tool=exec_shell command=cargo test' requires approval."
2189                .to_string(),
2190            proposed_execpolicy_amendment: None,
2191            proposed_network_policy_amendments: Vec::new(),
2192        };
2193
2194        let frame = approval_request_frame(
2195            &requirement,
2196            Some("tool=exec_shell command=cargo test"),
2197            "call-1".to_string(),
2198            "approval-1".to_string(),
2199            "turn-1".to_string(),
2200            "cargo test --workspace".to_string(),
2201            "/repo".to_string(),
2202        )
2203        .expect("approval frame");
2204
2205        let EventFrame::ExecApprovalRequest { request } = frame else {
2206            panic!("expected exec approval request frame");
2207        };
2208        assert_eq!(
2209            request.matched_rule.as_deref(),
2210            Some("tool=exec_shell command=cargo test")
2211        );
2212        assert_eq!(request.reason, requirement.reason());
2213    }
2214
2215    #[test]
2216    fn enqueue_creates_queued_job_with_zero_progress() {
2217        let mut jm = JobManager::default();
2218        let job = jm.enqueue("build");
2219        assert_eq!(job.name, "build");
2220        assert_eq!(job.status, JobStatus::Queued);
2221        assert_eq!(job.progress, Some(0));
2222        assert!(job.detail.is_none());
2223        assert_eq!(job.history.len(), 1);
2224        assert_eq!(job.history[0].phase, "created");
2225    }
2226
2227    #[test]
2228    fn set_running_transitions_from_queued() {
2229        let mut jm = JobManager::default();
2230        let job = jm.enqueue("deploy");
2231        let id = job.id.clone();
2232        jm.set_running(&id);
2233        let jobs = jm.list();
2234        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2235        assert_eq!(updated.status, JobStatus::Running);
2236        assert_eq!(updated.history.last().unwrap().phase, "running");
2237    }
2238
2239    #[test]
2240    fn update_progress_clamps_to_100() {
2241        let mut jm = JobManager::default();
2242        let job = jm.enqueue("task");
2243        let id = job.id.clone();
2244        jm.update_progress(&id, 150, Some("over".to_string()));
2245        let jobs = jm.list();
2246        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2247        assert_eq!(updated.progress, Some(100));
2248    }
2249
2250    #[test]
2251    fn complete_sets_progress_to_100() {
2252        let mut jm = JobManager::default();
2253        let job = jm.enqueue("task");
2254        let id = job.id.clone();
2255        jm.set_running(&id);
2256        jm.complete(&id);
2257        let jobs = jm.list();
2258        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2259        assert_eq!(updated.status, JobStatus::Completed);
2260        assert_eq!(updated.progress, Some(100));
2261    }
2262
2263    #[test]
2264    fn fail_increments_attempt_and_sets_backoff() {
2265        let mut jm = JobManager::default();
2266        let job = jm.enqueue("fragile");
2267        let id = job.id.clone();
2268        jm.set_running(&id);
2269        jm.fail(&id, "crashed");
2270        let jobs = jm.list();
2271        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2272        assert_eq!(updated.status, JobStatus::Failed);
2273        assert_eq!(updated.retry.attempt, 1);
2274        assert!(updated.retry.next_backoff_ms > 0);
2275        assert!(updated.retry.next_retry_at.is_some());
2276        assert_eq!(updated.detail.as_deref(), Some("crashed"));
2277    }
2278
2279    #[test]
2280    fn fail_clears_retry_after_max_attempts() {
2281        let mut jm = JobManager::default();
2282        let job = jm.enqueue("fragile");
2283        let id = job.id.clone();
2284        for _ in 0..=DEFAULT_JOB_MAX_ATTEMPTS {
2285            jm.set_running(&id);
2286            jm.fail(&id, "boom");
2287        }
2288        let jobs = jm.list();
2289        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2290        assert_eq!(updated.retry.attempt, DEFAULT_JOB_MAX_ATTEMPTS);
2291        assert_eq!(updated.retry.next_backoff_ms, 0);
2292        assert!(updated.retry.next_retry_at.is_none());
2293    }
2294
2295    #[test]
2296    fn cancel_sets_status_and_clears_retry() {
2297        let mut jm = JobManager::default();
2298        let job = jm.enqueue("task");
2299        let id = job.id.clone();
2300        jm.cancel(&id);
2301        let jobs = jm.list();
2302        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2303        assert_eq!(updated.status, JobStatus::Cancelled);
2304        assert_eq!(updated.retry.next_backoff_ms, 0);
2305    }
2306
2307    #[test]
2308    fn pause_and_resume_round_trip() {
2309        let mut jm = JobManager::default();
2310        let job = jm.enqueue("task");
2311        let id = job.id.clone();
2312        jm.set_running(&id);
2313        jm.pause(&id, Some("waiting".to_string()));
2314        let jobs = jm.list();
2315        let paused = jobs.iter().find(|j| j.id == id).unwrap();
2316        assert_eq!(paused.status, JobStatus::Paused);
2317        assert_eq!(paused.detail.as_deref(), Some("waiting"));
2318
2319        jm.resume(&id, None);
2320        let jobs = jm.list();
2321        let resumed = jobs.iter().find(|j| j.id == id).unwrap();
2322        assert_eq!(resumed.status, JobStatus::Running);
2323        assert_eq!(resumed.history.last().unwrap().phase, "resumed");
2324    }
2325
2326    #[test]
2327    fn list_returns_jobs_sorted_by_updated_at_desc() {
2328        let mut jm = JobManager::default();
2329        jm.enqueue("first");
2330        jm.enqueue("second");
2331        jm.enqueue("third");
2332        let jobs = jm.list();
2333        assert_eq!(jobs.len(), 3);
2334        for window in jobs.windows(2) {
2335            assert!(window[0].updated_at >= window[1].updated_at);
2336        }
2337    }
2338
2339    #[test]
2340    fn history_returns_entries_for_existing_job() {
2341        let mut jm = JobManager::default();
2342        let job = jm.enqueue("task");
2343        let id = job.id.clone();
2344        jm.set_running(&id);
2345        jm.complete(&id);
2346        let history = jm.history(&id);
2347        assert_eq!(history.len(), 3); // created, running, completed
2348        assert_eq!(history[0].phase, "created");
2349        assert_eq!(history[1].phase, "running");
2350        assert_eq!(history[2].phase, "completed");
2351    }
2352
2353    #[test]
2354    fn history_returns_empty_for_unknown_job() {
2355        let jm = JobManager::default();
2356        assert!(jm.history("nonexistent").is_empty());
2357    }
2358
2359    #[test]
2360    fn resume_pending_requeues_running_and_queued() {
2361        let mut jm = JobManager::default();
2362        let _j1 = jm.enqueue("queued_task");
2363        let j2 = jm.enqueue("running_task");
2364        let j3 = jm.enqueue("completed_task");
2365        let id2 = j2.id.clone();
2366        let id3 = j3.id.clone();
2367        jm.set_running(&id2);
2368        jm.set_running(&id3);
2369        jm.complete(&id3);
2370
2371        let resumed = jm.resume_pending();
2372        assert_eq!(resumed.len(), 2);
2373        for job in &resumed {
2374            assert_eq!(job.status, JobStatus::Queued);
2375        }
2376    }
2377
2378    // ── JobManager: backoff ────────────────────────────────────────────
2379
2380    #[test]
2381    fn deterministic_backoff_zero_on_first_attempt() {
2382        let retry = JobRetryMetadata {
2383            attempt: 0,
2384            ..Default::default()
2385        };
2386        assert_eq!(JobManager::deterministic_backoff_ms(&retry), 0);
2387    }
2388
2389    #[test]
2390    fn deterministic_backoff_exponential_growth() {
2391        let base = DEFAULT_JOB_BACKOFF_BASE_MS;
2392        for attempt in 1..=5 {
2393            let retry = JobRetryMetadata {
2394                attempt,
2395                backoff_base_ms: base,
2396                ..Default::default()
2397            };
2398            let expected = base * 2u64.pow(attempt.saturating_sub(1).min(20));
2399            assert_eq!(
2400                JobManager::deterministic_backoff_ms(&retry),
2401                expected,
2402                "attempt {attempt}"
2403            );
2404        }
2405    }
2406
2407    #[test]
2408    fn deterministic_backoff_saturates_at_high_exponent() {
2409        let retry = JobRetryMetadata {
2410            attempt: 63,
2411            backoff_base_ms: 1000,
2412            ..Default::default()
2413        };
2414        // Should not panic; result saturates
2415        let _ = JobManager::deterministic_backoff_ms(&retry);
2416    }
2417
2418    // ── JobManager: history truncation ─────────────────────────────────
2419
2420    #[test]
2421    fn push_history_truncates_beyond_max() {
2422        let mut jm = JobManager::default();
2423        let job = jm.enqueue("task");
2424        let id = job.id.clone();
2425        // Generate more history entries than the limit
2426        for i in 0..(MAX_JOB_HISTORY_ENTRIES + 20) {
2427            jm.update_progress(&id, (i % 100) as u8, Some(format!("step {i}")));
2428        }
2429        let history = jm.history(&id);
2430        assert_eq!(history.len(), MAX_JOB_HISTORY_ENTRIES);
2431    }
2432
2433    // ── JobManager: persistence encoding/parsing ───────────────────────
2434
2435    #[test]
2436    fn encode_and_parse_persisted_detail_round_trip() {
2437        let mut jm = JobManager::default();
2438        let job = jm.enqueue("task");
2439        let id = job.id.clone();
2440        jm.set_running(&id);
2441        jm.fail(&id, "oops");
2442        let job = jm.list().into_iter().find(|j| j.id == id).unwrap();
2443
2444        let encoded = JobManager::encode_persisted_detail(&job).unwrap().unwrap();
2445        let parsed = JobManager::parse_persisted_detail(Some(&encoded)).unwrap();
2446
2447        assert_eq!(parsed.status, job.status);
2448        assert_eq!(parsed.detail, job.detail);
2449        assert_eq!(parsed.retry.attempt, job.retry.attempt);
2450        assert_eq!(parsed.history.len(), job.history.len());
2451    }
2452
2453    #[test]
2454    fn parse_persisted_detail_returns_none_for_none_input() {
2455        assert!(JobManager::parse_persisted_detail(None).is_none());
2456    }
2457
2458    #[test]
2459    fn parse_persisted_detail_returns_none_for_invalid_json() {
2460        assert!(JobManager::parse_persisted_detail(Some("not json")).is_none());
2461    }
2462
2463    // ── Helper functions ───────────────────────────────────────────────
2464
2465    #[test]
2466    fn job_status_round_trip_str() {
2467        let statuses = [
2468            JobStatus::Queued,
2469            JobStatus::Running,
2470            JobStatus::Paused,
2471            JobStatus::Completed,
2472            JobStatus::Failed,
2473            JobStatus::Cancelled,
2474        ];
2475        for status in &statuses {
2476            let s = job_status_to_str(*status);
2477            let parsed = job_status_from_str(s);
2478            assert_eq!(parsed, Some(*status), "round-trip failed for {s:?}");
2479        }
2480    }
2481
2482    #[test]
2483    fn job_status_from_str_returns_none_for_unknown() {
2484        assert_eq!(job_status_from_str("unknown"), None);
2485        assert_eq!(job_status_from_str(""), None);
2486    }
2487
2488    #[test]
2489    fn truncate_preview_limits_to_120_chars() {
2490        let long = "a".repeat(200);
2491        let truncated = truncate_preview(&long);
2492        assert_eq!(truncated.len(), 120);
2493    }
2494
2495    #[test]
2496    fn truncate_preview_preserves_short_strings() {
2497        let short = "hello";
2498        assert_eq!(truncate_preview(short), "hello");
2499    }
2500
2501    #[test]
2502    fn runtime_status_to_job_state_maps_correctly() {
2503        assert_eq!(
2504            runtime_status_to_job_state(JobStatus::Queued),
2505            JobStateStatus::Queued
2506        );
2507        assert_eq!(
2508            runtime_status_to_job_state(JobStatus::Running),
2509            JobStateStatus::Running
2510        );
2511        assert_eq!(
2512            runtime_status_to_job_state(JobStatus::Paused),
2513            JobStateStatus::Running
2514        );
2515        assert_eq!(
2516            runtime_status_to_job_state(JobStatus::Completed),
2517            JobStateStatus::Completed
2518        );
2519        assert_eq!(
2520            runtime_status_to_job_state(JobStatus::Failed),
2521            JobStateStatus::Failed
2522        );
2523        assert_eq!(
2524            runtime_status_to_job_state(JobStatus::Cancelled),
2525            JobStateStatus::Cancelled
2526        );
2527    }
2528
2529    #[test]
2530    fn job_state_status_to_runtime_maps_correctly() {
2531        assert_eq!(
2532            job_state_status_to_runtime(JobStateStatus::Queued),
2533            JobStatus::Queued
2534        );
2535        assert_eq!(
2536            job_state_status_to_runtime(JobStateStatus::Running),
2537            JobStatus::Running
2538        );
2539        assert_eq!(
2540            job_state_status_to_runtime(JobStateStatus::Completed),
2541            JobStatus::Completed
2542        );
2543        assert_eq!(
2544            job_state_status_to_runtime(JobStateStatus::Failed),
2545            JobStatus::Failed
2546        );
2547        assert_eq!(
2548            job_state_status_to_runtime(JobStateStatus::Cancelled),
2549            JobStatus::Cancelled
2550        );
2551    }
2552
2553    #[test]
2554    fn preview_from_initial_history_new() {
2555        let preview = preview_from_initial_history(&InitialHistory::New);
2556        assert_eq!(preview, "New conversation");
2557    }
2558
2559    #[test]
2560    fn preview_from_initial_history_forked() {
2561        let preview = preview_from_initial_history(&InitialHistory::Forked(vec![json!("hello")]));
2562        assert!(preview.contains("hello"));
2563    }
2564
2565    #[test]
2566    fn preview_from_initial_history_resumed() {
2567        let preview = preview_from_initial_history(&InitialHistory::Resumed {
2568            conversation_id: "test".to_string(),
2569            history: vec![json!("world")],
2570            rollout_path: PathBuf::from("/tmp/test"),
2571        });
2572        assert!(preview.contains("world"));
2573    }
2574
2575    #[test]
2576    fn json_optional_string_handles_null() {
2577        assert!(json_optional_string(&Value::Null).is_none());
2578    }
2579
2580    #[test]
2581    fn json_optional_string_handles_string() {
2582        assert_eq!(
2583            json_optional_string(&Value::String("hello".to_string())),
2584            Some("hello".to_string())
2585        );
2586    }
2587
2588    #[test]
2589    fn json_optional_string_handles_non_string() {
2590        assert!(json_optional_string(&json!(42)).is_none());
2591    }
2592
2593    #[test]
2594    fn parse_retry_metadata_returns_default_for_none() {
2595        let retry = parse_retry_metadata(None);
2596        assert_eq!(retry.attempt, 0);
2597        assert_eq!(retry.max_attempts, DEFAULT_JOB_MAX_ATTEMPTS);
2598        assert_eq!(retry.backoff_base_ms, DEFAULT_JOB_BACKOFF_BASE_MS);
2599    }
2600
2601    #[test]
2602    fn parse_retry_metadata_parses_fields() {
2603        let value = json!({
2604            "attempt": 2,
2605            "max_attempts": 5,
2606            "backoff_base_ms": 1000,
2607            "next_backoff_ms": 2000,
2608            "next_retry_at": 1234567890i64
2609        });
2610        let retry = parse_retry_metadata(Some(&value));
2611        assert_eq!(retry.attempt, 2);
2612        assert_eq!(retry.max_attempts, 5);
2613        assert_eq!(retry.backoff_base_ms, 1000);
2614        assert_eq!(retry.next_backoff_ms, 2000);
2615        assert_eq!(retry.next_retry_at, Some(1234567890));
2616    }
2617
2618    #[test]
2619    fn parse_history_entry_returns_none_without_status() {
2620        let value = json!({"at": 1, "phase": "test"});
2621        assert!(parse_history_entry(&value).is_none());
2622    }
2623
2624    #[test]
2625    fn parse_history_entry_parses_valid_entry() {
2626        let value = json!({
2627            "at": 100,
2628            "phase": "running",
2629            "status": "running",
2630            "progress": 50,
2631            "detail": "working",
2632            "retry": {"attempt": 0, "max_attempts": 3, "backoff_base_ms": 500}
2633        });
2634        let entry = parse_history_entry(&value).unwrap();
2635        assert_eq!(entry.at, 100);
2636        assert_eq!(entry.phase, "running");
2637        assert_eq!(entry.status, JobStatus::Running);
2638        assert_eq!(entry.progress, Some(50));
2639        assert_eq!(entry.detail.as_deref(), Some("working"));
2640    }
2641}