Skip to main content

codewhale_core/
lib.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use codewhale_agent::ModelRegistry;
7use codewhale_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use codewhale_execpolicy::{
9    AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10    ExecPolicyEngine,
11};
12use codewhale_hooks::{HookDispatcher, HookEvent};
13use codewhale_mcp::{
14    McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use codewhale_protocol::{
17    AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18    ResponseChannel, ReviewDecision, Thread, ThreadForkParams, ThreadGoal, ThreadGoalClearParams,
19    ThreadGoalGetParams, ThreadGoalSetParams, ThreadGoalStatus, ThreadListParams, ThreadReadParams,
20    ThreadRequest, ThreadResponse, ThreadResumeParams, ThreadSetNameParams, ThreadStatus,
21    ToolPayload,
22};
23use codewhale_state::{
24    JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadGoalRecord,
25    ThreadGoalStatus as PersistedThreadGoalStatus, ThreadListFilters, ThreadMetadata,
26    ThreadStatus as PersistedThreadStatus,
27};
28use codewhale_tools::{ToolCall, ToolRegistry};
29use serde_json::{Value, json};
30use uuid::Uuid;
31
32/// How a new thread's conversation history is initialized.
33#[derive(Debug, Clone)]
34pub enum InitialHistory {
35    /// Start with an empty conversation.
36    New,
37    /// Forked from an existing thread with the given history items.
38    Forked(Vec<Value>),
39    /// Resumed from a persisted thread with its full history.
40    Resumed {
41        conversation_id: String,
42        history: Vec<Value>,
43        rollout_path: PathBuf,
44    },
45}
46
47/// Result of spawning or resuming a thread.
48#[derive(Debug, Clone)]
49pub struct NewThread {
50    /// The thread metadata.
51    pub thread: Thread,
52    /// Resolved model identifier.
53    pub model: String,
54    /// Provider that serves the model.
55    pub model_provider: String,
56    /// Working directory for the thread.
57    pub cwd: PathBuf,
58    /// Approval policy override, if any.
59    pub approval_policy: Option<String>,
60    /// Sandbox mode override, if any.
61    pub sandbox: Option<String>,
62}
63
/// Lifecycle state of a background job.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JobStatus {
    /// Enqueued and not yet started.
    Queued,
    /// In progress.
    Running,
    /// Suspended; can be resumed later.
    Paused,
    /// Ended successfully.
    Completed,
    /// Ended with an error.
    Failed,
    /// Stopped on request before finishing.
    Cancelled,
}
80
/// Value written under the `schema_version` key of an encoded job detail.
const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
/// Default for `JobRetryMetadata::max_attempts`.
const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
/// Default for `JobRetryMetadata::backoff_base_ms`, in milliseconds.
const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
/// Upper bound on the number of history entries retained per job.
const MAX_JOB_HISTORY_ENTRIES: usize = 64;
85
/// Retry bookkeeping attached to a job.
#[derive(Clone, Debug)]
pub struct JobRetryMetadata {
    /// Number of failures recorded so far (0 means the job has never failed).
    pub attempt: u32,
    /// Failure count after which no further retry is scheduled.
    pub max_attempts: u32,
    /// Base delay, in milliseconds, that the exponential backoff scales from.
    pub backoff_base_ms: u64,
    /// Delay, in milliseconds, before the next retry (0 when none is scheduled).
    pub next_backoff_ms: u64,
    /// Unix timestamp of the next scheduled retry, or `None` when none is scheduled.
    pub next_retry_at: Option<i64>,
}
100
101impl Default for JobRetryMetadata {
102    fn default() -> Self {
103        Self {
104            attempt: 0,
105            max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
106            backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
107            next_backoff_ms: 0,
108            next_retry_at: None,
109        }
110    }
111}
112
113/// A single entry in a job's history log.
114#[derive(Debug, Clone)]
115pub struct JobHistoryEntry {
116    /// Timestamp when this entry was recorded.
117    pub at: i64,
118    /// Phase name (e.g., "created", "running", "failed").
119    pub phase: String,
120    /// Job status at this point in time.
121    pub status: JobStatus,
122    /// Progress percentage at this point, if available.
123    pub progress: Option<u8>,
124    /// Human-readable detail message.
125    pub detail: Option<String>,
126    /// Retry state snapshot at this point.
127    pub retry: JobRetryMetadata,
128}
129
130#[derive(Debug, Clone)]
131struct PersistedJobDetail {
132    pub status: JobStatus,
133    pub detail: Option<String>,
134    pub retry: JobRetryMetadata,
135    pub history: Vec<JobHistoryEntry>,
136}
137
138/// A complete job record with all metadata and history.
139#[derive(Debug, Clone)]
140pub struct JobRecord {
141    /// Unique job identifier.
142    pub id: String,
143    /// Human-readable job name.
144    pub name: String,
145    /// Current job status.
146    pub status: JobStatus,
147    /// Current progress percentage (0-100).
148    pub progress: Option<u8>,
149    /// Human-readable detail about the current state.
150    pub detail: Option<String>,
151    /// Retry state for failed jobs.
152    pub retry: JobRetryMetadata,
153    /// Chronological history of state transitions.
154    pub history: Vec<JobHistoryEntry>,
155    /// Timestamp when the job was created.
156    pub created_at: i64,
157    /// Timestamp of the last state change.
158    pub updated_at: i64,
159}
160
161/// Manages background jobs with retry logic and persistence.
162#[derive(Debug, Default)]
163pub struct JobManager {
164    jobs: HashMap<String, JobRecord>,
165}
166
167impl JobManager {
168    fn now_ts() -> i64 {
169        chrono::Utc::now().timestamp()
170    }
171
172    fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
173        if retry.attempt == 0 {
174            return 0;
175        }
176        let exponent = retry.attempt.saturating_sub(1).min(20);
177        let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
178        retry.backoff_base_ms.saturating_mul(multiplier)
179    }
180
181    fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
182        retry.next_backoff_ms = 0;
183        retry.next_retry_at = None;
184    }
185
186    fn push_history(job: &mut JobRecord, phase: &str) {
187        job.history.push(JobHistoryEntry {
188            at: job.updated_at,
189            phase: phase.to_string(),
190            status: job.status,
191            progress: job.progress,
192            detail: job.detail.clone(),
193            retry: job.retry.clone(),
194        });
195        if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
196            let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
197            job.history.drain(0..to_drain);
198        }
199    }
200
201    fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
202        let raw = raw?;
203        let parsed: Value = serde_json::from_str(raw).ok()?;
204        let status = parsed
205            .get("status")
206            .and_then(Value::as_str)
207            .and_then(job_status_from_str)?;
208        let detail = parsed.get("detail").and_then(json_optional_string);
209        let retry = parse_retry_metadata(parsed.get("retry"));
210        let history = parsed
211            .get("history")
212            .and_then(Value::as_array)
213            .map(|items| {
214                items
215                    .iter()
216                    .filter_map(parse_history_entry)
217                    .collect::<Vec<_>>()
218            })
219            .unwrap_or_default();
220        Some(PersistedJobDetail {
221            status,
222            detail,
223            retry,
224            history,
225        })
226    }
227
228    fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
229        let encoded = json!({
230            "schema_version": JOB_DETAIL_SCHEMA_VERSION,
231            "status": job_status_to_str(job.status),
232            "detail": job.detail.clone(),
233            "retry": job_retry_to_value(&job.retry),
234            "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
235        })
236        .to_string();
237        Ok(Some(encoded))
238    }
239
240    /// Enqueues a new job and returns its record.
241    pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
242        let now = Self::now_ts();
243        let id = format!("job-{}", Uuid::new_v4());
244        let mut job = JobRecord {
245            id: id.clone(),
246            name: name.into(),
247            status: JobStatus::Queued,
248            progress: Some(0),
249            detail: None,
250            retry: JobRetryMetadata::default(),
251            history: Vec::new(),
252            created_at: now,
253            updated_at: now,
254        };
255        Self::push_history(&mut job, "created");
256        self.jobs.insert(id, job.clone());
257        job
258    }
259
260    /// Transitions a job to running and clears its retry schedule.
261    pub fn set_running(&mut self, id: &str) {
262        if let Some(job) = self.jobs.get_mut(id) {
263            job.status = JobStatus::Running;
264            Self::clear_retry_schedule(&mut job.retry);
265            job.updated_at = Self::now_ts();
266            Self::push_history(job, "running");
267        }
268    }
269
270    /// Updates a job's progress (clamped to 100) and optional detail message.
271    pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
272        if let Some(job) = self.jobs.get_mut(id) {
273            job.progress = Some(progress.min(100));
274            job.detail = detail;
275            job.updated_at = Self::now_ts();
276            Self::push_history(job, "progress_updated");
277        }
278    }
279
280    /// Marks a job as completed with 100% progress and clears its retry schedule.
281    pub fn complete(&mut self, id: &str) {
282        if let Some(job) = self.jobs.get_mut(id) {
283            job.status = JobStatus::Completed;
284            job.progress = Some(100);
285            Self::clear_retry_schedule(&mut job.retry);
286            job.updated_at = Self::now_ts();
287            Self::push_history(job, "completed");
288        }
289    }
290
291    /// Marks a job as failed and schedules a retry if attempts remain.
292    pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
293        if let Some(job) = self.jobs.get_mut(id) {
294            let now = Self::now_ts();
295            job.status = JobStatus::Failed;
296            job.detail = Some(detail.into());
297            if job.retry.attempt < job.retry.max_attempts {
298                job.retry.attempt += 1;
299                job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
300                let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
301                    .min(i64::MAX as u64) as i64;
302                job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
303            } else {
304                Self::clear_retry_schedule(&mut job.retry);
305            }
306            job.updated_at = now;
307            Self::push_history(job, "failed");
308        }
309    }
310
311    /// Cancels a job and clears any pending retry schedule.
312    pub fn cancel(&mut self, id: &str) {
313        if let Some(job) = self.jobs.get_mut(id) {
314            job.status = JobStatus::Cancelled;
315            Self::clear_retry_schedule(&mut job.retry);
316            job.updated_at = Self::now_ts();
317            Self::push_history(job, "cancelled");
318        }
319    }
320
321    /// Pauses a job, optionally updating its detail message.
322    pub fn pause(&mut self, id: &str, detail: Option<String>) {
323        if let Some(job) = self.jobs.get_mut(id) {
324            job.status = JobStatus::Paused;
325            if detail.is_some() {
326                job.detail = detail;
327            }
328            job.updated_at = Self::now_ts();
329            Self::push_history(job, "paused");
330        }
331    }
332
333    /// Resumes a paused or failed job back to running status.
334    pub fn resume(&mut self, id: &str, detail: Option<String>) {
335        if let Some(job) = self.jobs.get_mut(id) {
336            job.status = JobStatus::Running;
337            if detail.is_some() {
338                job.detail = detail;
339            }
340            Self::clear_retry_schedule(&mut job.retry);
341            job.updated_at = Self::now_ts();
342            Self::push_history(job, "resumed");
343        }
344    }
345
346    /// Returns all jobs sorted by most recently updated first.
347    pub fn list(&self) -> Vec<JobRecord> {
348        let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
349        out.sort_by_key(|job| std::cmp::Reverse(job.updated_at));
350        out
351    }
352
353    /// Returns the history entries for a job, or an empty vec if not found.
354    pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
355        self.jobs
356            .get(id)
357            .map(|job| job.history.clone())
358            .unwrap_or_default()
359    }
360
361    /// Resets queued or running jobs back to queued on application resume.
362    pub fn resume_pending(&mut self) -> Vec<JobRecord> {
363        let mut resumed = Vec::new();
364        for job in self.jobs.values_mut() {
365            if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
366                job.status = JobStatus::Queued;
367                job.updated_at = Self::now_ts();
368                Self::push_history(job, "queued_after_resume");
369                resumed.push(job.clone());
370            }
371        }
372        resumed
373    }
374
375    /// Loads jobs from the state store, deserializing extended detail when available.
376    pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
377        let persisted = store.list_jobs(Some(500))?;
378        for job in persisted {
379            let fallback_status = job_state_status_to_runtime(job.status);
380            let parsed = Self::parse_persisted_detail(job.detail.as_deref());
381            let (status, detail, retry, history) = if let Some(detail_state) = parsed {
382                (
383                    detail_state.status,
384                    detail_state.detail,
385                    detail_state.retry,
386                    detail_state.history,
387                )
388            } else {
389                (
390                    fallback_status,
391                    job.detail,
392                    JobRetryMetadata::default(),
393                    Vec::new(),
394                )
395            };
396            self.jobs.insert(
397                job.id.clone(),
398                JobRecord {
399                    id: job.id,
400                    name: job.name,
401                    status,
402                    progress: job.progress,
403                    detail,
404                    retry,
405                    history,
406                    created_at: job.created_at,
407                    updated_at: job.updated_at,
408                },
409            );
410        }
411        Ok(())
412    }
413
414    /// Persists a single job's current state to the state store.
415    pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
416        let Some(job) = self.jobs.get(id) else {
417            return Ok(());
418        };
419        let encoded_detail = Self::encode_persisted_detail(job)?;
420        store.upsert_job(&JobStateRecord {
421            id: job.id.clone(),
422            name: job.name.clone(),
423            status: runtime_status_to_job_state(job.status),
424            progress: job.progress,
425            detail: encoded_detail,
426            created_at: job.created_at,
427            updated_at: job.updated_at,
428        })
429    }
430
431    /// Persists all in-memory jobs to the state store.
432    pub fn persist_all(&self, store: &StateStore) -> Result<()> {
433        for id in self.jobs.keys() {
434            self.persist_job(store, id)?;
435        }
436        Ok(())
437    }
438}
439
440/// Manages thread lifecycle: spawn, resume, fork, archive, and persistence.
441pub struct ThreadManager {
442    store: StateStore,
443    running_threads: HashMap<String, Thread>,
444    cli_version: String,
445}
446
447impl ThreadManager {
448    /// Creates a new `ThreadManager` backed by the given state store.
449    pub fn new(store: StateStore) -> Self {
450        Self {
451            store,
452            running_threads: HashMap::new(),
453            cli_version: env!("CARGO_PKG_VERSION").to_string(),
454        }
455    }
456
457    /// Returns a reference to the underlying state store.
458    pub fn state_store(&self) -> &StateStore {
459        &self.store
460    }
461
462    /// Spawns a new thread with the given initial history and persists it.
463    pub fn spawn_thread_with_history(
464        &mut self,
465        model_provider: String,
466        cwd: PathBuf,
467        initial_history: InitialHistory,
468        persist_extended_history: bool,
469    ) -> Result<NewThread> {
470        let id = format!("thread-{}", Uuid::new_v4());
471        let now = chrono::Utc::now().timestamp();
472        let preview = preview_from_initial_history(&initial_history);
473        let source = match initial_history {
474            InitialHistory::New => SessionSource::Interactive,
475            InitialHistory::Forked(_) => SessionSource::Fork,
476            InitialHistory::Resumed { .. } => SessionSource::Resume,
477        };
478        let thread = Thread {
479            id: id.clone(),
480            preview,
481            ephemeral: !persist_extended_history,
482            model_provider: model_provider.clone(),
483            created_at: now,
484            updated_at: now,
485            status: ThreadStatus::Running,
486            path: None,
487            cwd: cwd.clone(),
488            cli_version: self.cli_version.clone(),
489            source: match source {
490                SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
491                SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
492                SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
493                SessionSource::Api => codewhale_protocol::SessionSource::Api,
494                SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
495            },
496            name: None,
497        };
498        self.persist_thread(&thread, None)?;
499        match &initial_history {
500            InitialHistory::Forked(items) => {
501                for item in items {
502                    self.store.append_message(
503                        &thread.id,
504                        "history",
505                        &item.to_string(),
506                        Some(item.clone()),
507                    )?;
508                }
509            }
510            InitialHistory::Resumed { history, .. } => {
511                for item in history {
512                    self.store.append_message(
513                        &thread.id,
514                        "history",
515                        &item.to_string(),
516                        Some(item.clone()),
517                    )?;
518                }
519            }
520            InitialHistory::New => {}
521        }
522        self.running_threads
523            .insert(thread.id.clone(), thread.clone());
524        Ok(NewThread {
525            thread,
526            model: "auto".to_string(),
527            model_provider,
528            cwd,
529            approval_policy: None,
530            sandbox: None,
531        })
532    }
533
534    /// Resumes an existing thread, returning `None` if not found.
535    pub fn resume_thread_with_history(
536        &mut self,
537        params: &ThreadResumeParams,
538        fallback_cwd: &Path,
539        model_provider: String,
540    ) -> Result<Option<NewThread>> {
541        if params.history.is_none()
542            && let Some(thread) = self.running_threads.get(&params.thread_id).cloned()
543        {
544            return Ok(Some(NewThread {
545                model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
546                model_provider: params.model_provider.clone().unwrap_or(model_provider),
547                cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
548                approval_policy: params.approval_policy.clone(),
549                sandbox: params.sandbox.clone(),
550                thread,
551            }));
552        }
553
554        let persisted = self.store.get_thread(&params.thread_id)?;
555        let Some(metadata) = persisted else {
556            return Ok(None);
557        };
558        let mut thread = to_protocol_thread(metadata);
559        thread.status = ThreadStatus::Running;
560        thread.updated_at = chrono::Utc::now().timestamp();
561        thread.cwd = params
562            .cwd
563            .clone()
564            .unwrap_or_else(|| fallback_cwd.to_path_buf());
565        self.persist_thread(&thread, None)?;
566        self.running_threads
567            .insert(thread.id.clone(), thread.clone());
568        if let Some(history) = params.history.as_ref() {
569            for item in history {
570                self.store.append_message(
571                    &thread.id,
572                    "history",
573                    &item.to_string(),
574                    Some(item.clone()),
575                )?;
576            }
577        }
578
579        Ok(Some(NewThread {
580            model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
581            model_provider: params.model_provider.clone().unwrap_or(model_provider),
582            cwd: thread.cwd.clone(),
583            approval_policy: params.approval_policy.clone(),
584            sandbox: params.sandbox.clone(),
585            thread,
586        }))
587    }
588
589    /// Forks an existing thread into a new one, inheriting the parent's provider.
590    pub fn fork_thread(
591        &mut self,
592        params: &ThreadForkParams,
593        fallback_cwd: &Path,
594    ) -> Result<Option<NewThread>> {
595        let parent = self.store.get_thread(&params.thread_id)?;
596        let Some(parent) = parent else {
597            return Ok(None);
598        };
599        let parent_thread = to_protocol_thread(parent);
600        let new = self.spawn_thread_with_history(
601            params
602                .model_provider
603                .clone()
604                .unwrap_or_else(|| parent_thread.model_provider.clone()),
605            params
606                .cwd
607                .clone()
608                .unwrap_or_else(|| fallback_cwd.to_path_buf()),
609            InitialHistory::Forked(vec![json!({
610                "type": "fork",
611                "from_thread_id": parent_thread.id
612            })]),
613            params.persist_extended_history,
614        )?;
615        Ok(Some(new))
616    }
617
618    /// Lists threads matching the given filter parameters.
619    pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
620        let list = self.store.list_threads(ThreadListFilters {
621            include_archived: params.include_archived,
622            limit: params.limit,
623        })?;
624        Ok(list.into_iter().map(to_protocol_thread).collect())
625    }
626
627    /// Reads a single thread by id, or `None` if not found.
628    pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
629        Ok(self
630            .store
631            .get_thread(&params.thread_id)?
632            .map(to_protocol_thread))
633    }
634
635    /// Sets the display name for a thread, returning the updated thread or `None`.
636    pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
637        let Some(mut metadata) = self.store.get_thread(&params.thread_id)? else {
638            return Ok(None);
639        };
640        metadata.name = Some(params.name.clone());
641        metadata.updated_at = chrono::Utc::now().timestamp();
642        self.store.upsert_thread(&metadata)?;
643        let updated = to_protocol_thread(metadata);
644        self.running_threads
645            .insert(updated.id.clone(), updated.clone());
646        Ok(Some(updated))
647    }
648
649    /// Sets or replaces the persisted goal for a thread.
650    pub fn set_thread_goal(&mut self, params: &ThreadGoalSetParams) -> Result<Option<ThreadGoal>> {
651        if self.store.get_thread(&params.thread_id)?.is_none() {
652            return Ok(None);
653        }
654        let now = chrono::Utc::now().timestamp();
655        let goal = ThreadGoalRecord {
656            thread_id: params.thread_id.clone(),
657            goal_id: format!("goal-{}", Uuid::new_v4()),
658            objective: params.objective.clone(),
659            status: PersistedThreadGoalStatus::Active,
660            token_budget: params.token_budget,
661            tokens_used: 0,
662            time_used_seconds: 0,
663            created_at: now,
664            updated_at: now,
665        };
666        self.store.upsert_thread_goal(&goal)?;
667        Ok(Some(to_protocol_goal(goal)))
668    }
669
670    /// Reads the persisted goal for a thread.
671    pub fn get_thread_goal(&self, params: &ThreadGoalGetParams) -> Result<Option<ThreadGoal>> {
672        Ok(self
673            .store
674            .get_thread_goal(&params.thread_id)?
675            .map(to_protocol_goal))
676    }
677
678    /// Clears the persisted goal for a thread, returning whether one existed.
679    pub fn clear_thread_goal(&mut self, params: &ThreadGoalClearParams) -> Result<bool> {
680        self.store.delete_thread_goal(&params.thread_id)
681    }
682
683    /// Archives a thread so it no longer appears in default listings.
684    pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
685        self.store.mark_archived(thread_id)?;
686        if let Some(thread) = self.running_threads.get_mut(thread_id) {
687            thread.status = ThreadStatus::Archived;
688        }
689        Ok(())
690    }
691
692    /// Restores an archived thread to active status.
693    pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
694        self.store.mark_unarchived(thread_id)?;
695        Ok(())
696    }
697
698    /// Records a user message in a thread and updates its preview and timestamp.
699    pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
700        let Some(mut metadata) = self.store.get_thread(thread_id)? else {
701            return Ok(());
702        };
703        metadata.updated_at = chrono::Utc::now().timestamp();
704        metadata.preview = truncate_preview(input);
705        metadata.status = PersistedThreadStatus::Running;
706        self.store.upsert_thread(&metadata)?;
707        if let Some(thread) = self.running_threads.get_mut(thread_id) {
708            thread.updated_at = metadata.updated_at;
709            thread.preview = metadata.preview;
710            thread.status = ThreadStatus::Running;
711        }
712        let message_id = self.store.append_message(thread_id, "user", input, None)?;
713        self.store.save_checkpoint(
714            thread_id,
715            "latest",
716            &json!({
717                "reason": "thread_message",
718                "message_id": message_id,
719                "role": "user",
720                "preview": truncate_preview(input),
721                "updated_at": metadata.updated_at
722            }),
723        )?;
724        Ok(())
725    }
726
727    fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
728        self.store.upsert_thread(&ThreadMetadata {
729            id: thread.id.clone(),
730            rollout_path,
731            preview: thread.preview.clone(),
732            ephemeral: thread.ephemeral,
733            model_provider: thread.model_provider.clone(),
734            created_at: thread.created_at,
735            updated_at: thread.updated_at,
736            status: to_persisted_status(&thread.status),
737            path: thread.path.clone(),
738            cwd: thread.cwd.clone(),
739            cli_version: thread.cli_version.clone(),
740            source: to_persisted_source(&thread.source),
741            name: thread.name.clone(),
742            sandbox_policy: None,
743            approval_mode: None,
744            archived: matches!(thread.status, ThreadStatus::Archived),
745            archived_at: None,
746            git_sha: None,
747            git_branch: None,
748            git_origin_url: None,
749            memory_mode: None,
750            current_leaf_id: None,
751        })
752    }
753}
754
755/// Top-level runtime combining config, model registry, threads, tools, MCP, and hooks.
756pub struct Runtime {
757    /// Resolved application configuration.
758    pub config: ConfigToml,
759    /// Registry of available model providers.
760    pub model_registry: ModelRegistry,
761    /// Manages conversation thread lifecycle.
762    pub thread_manager: ThreadManager,
763    /// Registry of callable tools.
764    pub tool_registry: Arc<ToolRegistry>,
765    /// Manager for MCP server connections.
766    pub mcp_manager: Arc<McpManager>,
767    /// Engine for evaluating execution policy decisions.
768    pub exec_policy: ExecPolicyEngine,
769    /// Dispatcher for lifecycle hooks.
770    pub hooks: HookDispatcher,
771    /// Manager for background job lifecycle.
772    pub jobs: JobManager,
773}
774
775impl Runtime {
776    /// Constructs a new `Runtime`, loading existing jobs from the state store.
777    pub fn new(
778        config: ConfigToml,
779        model_registry: ModelRegistry,
780        state: StateStore,
781        tool_registry: Arc<ToolRegistry>,
782        mcp_manager: Arc<McpManager>,
783        exec_policy: ExecPolicyEngine,
784        hooks: HookDispatcher,
785    ) -> Self {
786        let mut jobs = JobManager::default();
787        if let Err(e) = jobs.load_from_store(&state) {
788            tracing::warn!("Failed to load job store, starting with empty job list: {e}");
789        }
790        Self {
791            config,
792            model_registry,
793            thread_manager: ThreadManager::new(state),
794            tool_registry,
795            mcp_manager,
796            exec_policy,
797            hooks,
798            jobs,
799        }
800    }
801
802    fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
803        let history = self
804            .thread_manager
805            .state_store()
806            .list_messages(thread_id, Some(500))?
807            .into_iter()
808            .map(|message| {
809                json!({
810                    "id": message.id,
811                    "role": message.role,
812                    "content": message.content,
813                    "item": message.item,
814                    "created_at": message.created_at
815                })
816            })
817            .collect::<Vec<_>>();
818
819        let checkpoint = self
820            .thread_manager
821            .state_store()
822            .load_checkpoint(thread_id, None)?
823            .map(|record| {
824                json!({
825                    "checkpoint_id": record.checkpoint_id,
826                    "state": record.state,
827                    "created_at": record.created_at
828                })
829            });
830
831        let goal = self
832            .thread_manager
833            .state_store()
834            .get_thread_goal(thread_id)?
835            .map(to_protocol_goal);
836
837        Ok(json!({
838            "history": history,
839            "checkpoint": checkpoint,
840            "goal": goal
841        }))
842    }
843
844    fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
845        self.thread_manager.state_store().save_checkpoint(
846            thread_id,
847            "latest",
848            &json!({
849                "reason": reason,
850                "saved_at": chrono::Utc::now().timestamp(),
851                "state": state
852            }),
853        )
854    }
855
856    /// Dispatches a thread request (create, start, resume, fork, list, read, etc.).
857    pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
858        match req {
859            ThreadRequest::Create { .. } => {
860                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
861                let new = self.thread_manager.spawn_thread_with_history(
862                    "deepseek".to_string(),
863                    cwd,
864                    InitialHistory::New,
865                    false,
866                )?;
867                let mut response = thread_response_from_new("created", new);
868                response.data = self.persisted_thread_data(&response.thread_id)?;
869                Ok(response)
870            }
871            ThreadRequest::Start(params) => {
872                let cwd = params.cwd.clone().unwrap_or_else(|| {
873                    std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
874                });
875                let new = self.thread_manager.spawn_thread_with_history(
876                    params
877                        .model_provider
878                        .clone()
879                        .unwrap_or_else(|| "deepseek".to_string()),
880                    cwd,
881                    InitialHistory::New,
882                    params.persist_extended_history,
883                )?;
884                let mut response = thread_response_from_new("started", new);
885                response.data = self.persisted_thread_data(&response.thread_id)?;
886                Ok(response)
887            }
888            ThreadRequest::Resume(params) => {
889                let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
890                if let Some(new) = self.thread_manager.resume_thread_with_history(
891                    &params,
892                    &fallback_cwd,
893                    "deepseek".to_string(),
894                )? {
895                    let mut response = thread_response_from_new("resumed", new);
896                    response.data = self.persisted_thread_data(&response.thread_id)?;
897                    Ok(response)
898                } else {
899                    Ok(ThreadResponse {
900                        thread_id: params.thread_id,
901                        status: "missing".to_string(),
902                        thread: None,
903                        threads: Vec::new(),
904                        goal: None,
905                        model: None,
906                        model_provider: None,
907                        cwd: None,
908                        approval_policy: params.approval_policy,
909                        sandbox: params.sandbox,
910                        events: Vec::new(),
911                        data: json!({"error":"thread not found"}),
912                    })
913                }
914            }
915            ThreadRequest::Fork(params) => {
916                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
917                if let Some(new) = self.thread_manager.fork_thread(&params, &cwd)? {
918                    let mut response = thread_response_from_new("forked", new);
919                    response.data = self.persisted_thread_data(&response.thread_id)?;
920                    Ok(response)
921                } else {
922                    Ok(ThreadResponse {
923                        thread_id: params.thread_id,
924                        status: "missing".to_string(),
925                        thread: None,
926                        threads: Vec::new(),
927                        goal: None,
928                        model: None,
929                        model_provider: None,
930                        cwd: None,
931                        approval_policy: params.approval_policy,
932                        sandbox: params.sandbox,
933                        events: Vec::new(),
934                        data: json!({"error":"thread not found"}),
935                    })
936                }
937            }
938            ThreadRequest::List(params) => Ok(ThreadResponse {
939                thread_id: "list".to_string(),
940                status: "ok".to_string(),
941                thread: None,
942                threads: self.thread_manager.list_threads(&params)?,
943                goal: None,
944                model: None,
945                model_provider: None,
946                cwd: None,
947                approval_policy: None,
948                sandbox: None,
949                events: Vec::new(),
950                data: json!({}),
951            }),
952            ThreadRequest::Read(params) => {
953                let id = params.thread_id.clone();
954                let data = self.persisted_thread_data(&id)?;
955                Ok(ThreadResponse {
956                    thread_id: id,
957                    status: "ok".to_string(),
958                    thread: self.thread_manager.read_thread(&params)?,
959                    threads: Vec::new(),
960                    goal: self.thread_manager.get_thread_goal(&ThreadGoalGetParams {
961                        thread_id: params.thread_id,
962                    })?,
963                    model: None,
964                    model_provider: None,
965                    cwd: None,
966                    approval_policy: None,
967                    sandbox: None,
968                    events: Vec::new(),
969                    data,
970                })
971            }
972            ThreadRequest::SetName(params) => Ok(ThreadResponse {
973                thread_id: params.thread_id.clone(),
974                status: "ok".to_string(),
975                thread: self.thread_manager.set_thread_name(&params)?,
976                threads: Vec::new(),
977                goal: None,
978                model: None,
979                model_provider: None,
980                cwd: None,
981                approval_policy: None,
982                sandbox: None,
983                events: Vec::new(),
984                data: json!({}),
985            }),
986            ThreadRequest::GoalSet(params) => {
987                let thread_id = params.thread_id.clone();
988                if let Some(goal) = self.thread_manager.set_thread_goal(&params)? {
989                    Ok(ThreadResponse {
990                        thread_id,
991                        status: "ok".to_string(),
992                        thread: None,
993                        threads: Vec::new(),
994                        goal: Some(goal.clone()),
995                        model: None,
996                        model_provider: None,
997                        cwd: None,
998                        approval_policy: None,
999                        sandbox: None,
1000                        events: vec![EventFrame::ThreadGoalUpdated { goal: goal.clone() }],
1001                        data: json!({ "goal": goal }),
1002                    })
1003                } else {
1004                    Ok(ThreadResponse {
1005                        thread_id,
1006                        status: "missing".to_string(),
1007                        thread: None,
1008                        threads: Vec::new(),
1009                        goal: None,
1010                        model: None,
1011                        model_provider: None,
1012                        cwd: None,
1013                        approval_policy: None,
1014                        sandbox: None,
1015                        events: Vec::new(),
1016                        data: json!({"error":"thread not found"}),
1017                    })
1018                }
1019            }
1020            ThreadRequest::GoalGet(params) => {
1021                let goal = self.thread_manager.get_thread_goal(&params)?;
1022                Ok(ThreadResponse {
1023                    thread_id: params.thread_id,
1024                    status: "ok".to_string(),
1025                    thread: None,
1026                    threads: Vec::new(),
1027                    goal: goal.clone(),
1028                    model: None,
1029                    model_provider: None,
1030                    cwd: None,
1031                    approval_policy: None,
1032                    sandbox: None,
1033                    events: Vec::new(),
1034                    data: json!({ "goal": goal }),
1035                })
1036            }
1037            ThreadRequest::GoalClear(params) => {
1038                let thread_id = params.thread_id.clone();
1039                let cleared = self.thread_manager.clear_thread_goal(&params)?;
1040                Ok(ThreadResponse {
1041                    thread_id: thread_id.clone(),
1042                    status: if cleared { "cleared" } else { "empty" }.to_string(),
1043                    thread: None,
1044                    threads: Vec::new(),
1045                    goal: None,
1046                    model: None,
1047                    model_provider: None,
1048                    cwd: None,
1049                    approval_policy: None,
1050                    sandbox: None,
1051                    events: if cleared {
1052                        vec![EventFrame::ThreadGoalCleared { thread_id }]
1053                    } else {
1054                        Vec::new()
1055                    },
1056                    data: json!({ "cleared": cleared }),
1057                })
1058            }
1059            ThreadRequest::Archive { thread_id } => {
1060                self.thread_manager.archive_thread(&thread_id)?;
1061                Ok(ThreadResponse {
1062                    thread_id,
1063                    status: "archived".to_string(),
1064                    thread: None,
1065                    threads: Vec::new(),
1066                    goal: None,
1067                    model: None,
1068                    model_provider: None,
1069                    cwd: None,
1070                    approval_policy: None,
1071                    sandbox: None,
1072                    events: Vec::new(),
1073                    data: json!({}),
1074                })
1075            }
1076            ThreadRequest::Unarchive { thread_id } => {
1077                self.thread_manager.unarchive_thread(&thread_id)?;
1078                Ok(ThreadResponse {
1079                    thread_id,
1080                    status: "unarchived".to_string(),
1081                    thread: None,
1082                    threads: Vec::new(),
1083                    goal: None,
1084                    model: None,
1085                    model_provider: None,
1086                    cwd: None,
1087                    approval_policy: None,
1088                    sandbox: None,
1089                    events: Vec::new(),
1090                    data: json!({}),
1091                })
1092            }
1093            ThreadRequest::Message { thread_id, input } => {
1094                self.thread_manager.touch_message(&thread_id, &input)?;
1095                let response_id = format!("{thread_id}:{}", input.len());
1096                self.hooks
1097                    .emit(HookEvent::ResponseStart {
1098                        response_id: response_id.clone(),
1099                    })
1100                    .await;
1101                self.hooks
1102                    .emit(HookEvent::ResponseEnd {
1103                        response_id: response_id.clone(),
1104                    })
1105                    .await;
1106
1107                Ok(ThreadResponse {
1108                    thread_id,
1109                    status: "accepted".to_string(),
1110                    thread: None,
1111                    threads: Vec::new(),
1112                    goal: None,
1113                    model: None,
1114                    model_provider: None,
1115                    cwd: None,
1116                    approval_policy: None,
1117                    sandbox: None,
1118                    events: vec![
1119                        EventFrame::ResponseStart {
1120                            response_id: response_id.clone(),
1121                        },
1122                        EventFrame::ResponseDelta {
1123                            response_id: response_id.clone(),
1124                            delta: "queued".to_string(),
1125                            channel: ResponseChannel::Text,
1126                        },
1127                        EventFrame::ResponseEnd { response_id },
1128                    ],
1129                    data: json!({}),
1130                })
1131            }
1132        }
1133    }
1134
1135    /// Resolves the model for a prompt, records the message, and returns the response.
1136    pub async fn handle_prompt(
1137        &mut self,
1138        req: PromptRequest,
1139        cli_overrides: &CliRuntimeOverrides,
1140    ) -> Result<PromptResponse> {
1141        let resolved = self.config.resolve_runtime_options(cli_overrides);
1142        let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
1143        let selection = self
1144            .model_registry
1145            .resolve(Some(&requested_model), Some(resolved.provider));
1146        let resolved_model = selection.resolved.id.clone();
1147        let response_id = format!("resp-{}", Uuid::new_v4());
1148
1149        self.hooks
1150            .emit(HookEvent::ResponseStart {
1151                response_id: response_id.clone(),
1152            })
1153            .await;
1154        self.hooks
1155            .emit(HookEvent::ResponseDelta {
1156                response_id: response_id.clone(),
1157                delta: "model-selected".to_string(),
1158            })
1159            .await;
1160        self.hooks
1161            .emit(HookEvent::ResponseEnd {
1162                response_id: response_id.clone(),
1163            })
1164            .await;
1165
1166        let payload = json!({
1167            "provider": resolved.provider.as_str(),
1168            "model": resolved_model.clone(),
1169            "prompt": req.prompt,
1170            "telemetry": resolved.telemetry,
1171            "base_url": resolved.base_url,
1172            "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
1173            "approval_policy": resolved.approval_policy,
1174            "sandbox_mode": resolved.sandbox_mode
1175        });
1176        if let Some(thread_id) = req.thread_id.as_ref() {
1177            self.thread_manager.touch_message(thread_id, &req.prompt)?;
1178            let assistant_message_id = self.thread_manager.store.append_message(
1179                thread_id,
1180                "assistant",
1181                &payload.to_string(),
1182                Some(payload.clone()),
1183            )?;
1184            self.persist_latest_checkpoint(
1185                thread_id,
1186                "prompt_response",
1187                json!({
1188                    "response_id": response_id.clone(),
1189                    "model": resolved_model.clone(),
1190                    "provider": resolved.provider.as_str(),
1191                    "assistant_message_id": assistant_message_id
1192                }),
1193            )?;
1194        }
1195
1196        Ok(PromptResponse {
1197            output: payload.to_string(),
1198            model: resolved_model,
1199            events: vec![
1200                EventFrame::ResponseStart {
1201                    response_id: response_id.clone(),
1202                },
1203                EventFrame::ResponseDelta {
1204                    response_id: response_id.clone(),
1205                    delta: "model-selected".to_string(),
1206                    channel: ResponseChannel::Text,
1207                },
1208                EventFrame::ResponseEnd { response_id },
1209            ],
1210        })
1211    }
1212
1213    /// Evaluates execution policy and dispatches a tool call.
1214    pub async fn invoke_tool(
1215        &self,
1216        call: ToolCall,
1217        approval_mode: AskForApproval,
1218        cwd: &Path,
1219    ) -> Result<Value> {
1220        let fallback_cwd = cwd.display().to_string();
1221        let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
1222        let policy_tool = match &call.payload {
1223            ToolPayload::LocalShell { .. } => "exec_shell",
1224            _ => call.name.as_str(),
1225        };
1226        let policy_path = permission_path_for_call(&call);
1227        let decision = self.exec_policy.check(ExecPolicyContext {
1228            command: &command,
1229            cwd: &policy_cwd,
1230            tool: Some(policy_tool),
1231            path: policy_path.as_deref(),
1232            ask_for_approval: approval_mode,
1233            sandbox_mode: None,
1234        })?;
1235        let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
1236        let response_id = format!("tool-{}", Uuid::new_v4());
1237        let call_id = call
1238            .raw_tool_call_id
1239            .clone()
1240            .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
1241        self.hooks
1242            .emit(HookEvent::ToolLifecycle {
1243                response_id: response_id.clone(),
1244                tool_name: call.name.clone(),
1245                phase: "precheck".to_string(),
1246                payload: precheck.clone(),
1247            })
1248            .await;
1249
1250        if !decision.allow {
1251            let reason = decision.reason().to_string();
1252            let approval_id = format!("approval-{}", Uuid::new_v4());
1253            let error_frame = EventFrame::Error {
1254                response_id: response_id.clone(),
1255                message: reason.clone(),
1256            };
1257            self.hooks
1258                .emit(HookEvent::ApprovalLifecycle {
1259                    approval_id,
1260                    phase: "denied".to_string(),
1261                    reason: Some(reason.clone()),
1262                })
1263                .await;
1264            self.hooks
1265                .emit(HookEvent::GenericEventFrame {
1266                    frame: Box::new(error_frame.clone()),
1267                })
1268                .await;
1269            return Ok(json!({
1270                "ok": false,
1271                "status": "denied",
1272                "execution_kind": execution_kind,
1273                "response_id": response_id,
1274                "precheck": precheck,
1275                "error": reason,
1276                "events": [event_frame_payload(&error_frame)],
1277            }));
1278        }
1279
1280        if decision.requires_approval {
1281            let approval_id = format!("approval-{}", Uuid::new_v4());
1282            let reason = decision.reason().to_string();
1283            let maybe_approval_frame = approval_request_frame(
1284                &decision.requirement,
1285                decision.matched_rule.as_deref(),
1286                call_id,
1287                approval_id.clone(),
1288                response_id.clone(),
1289                command.clone(),
1290                policy_cwd.clone(),
1291            );
1292            self.hooks
1293                .emit(HookEvent::ApprovalLifecycle {
1294                    approval_id: approval_id.clone(),
1295                    phase: "requested".to_string(),
1296                    reason: Some(reason.clone()),
1297                })
1298                .await;
1299            let mut events = Vec::new();
1300            if let Some(frame) = maybe_approval_frame {
1301                self.hooks
1302                    .emit(HookEvent::GenericEventFrame {
1303                        frame: Box::new(frame.clone()),
1304                    })
1305                    .await;
1306                events.push(event_frame_payload(&frame));
1307            }
1308            return Ok(json!({
1309                "ok": false,
1310                "status": "approval_required",
1311                "execution_kind": execution_kind,
1312                "response_id": response_id,
1313                "approval_id": approval_id,
1314                "precheck": precheck,
1315                "error": reason,
1316                "events": events,
1317            }));
1318        }
1319
1320        let start_frame = EventFrame::ToolCallStart {
1321            response_id: response_id.clone(),
1322            tool_name: call.name.clone(),
1323            arguments: tool_payload_value(&call.payload),
1324        };
1325        self.hooks
1326            .emit(HookEvent::GenericEventFrame {
1327                frame: Box::new(start_frame.clone()),
1328            })
1329            .await;
1330        self.hooks
1331            .emit(HookEvent::ToolLifecycle {
1332                response_id: response_id.clone(),
1333                tool_name: call.name.clone(),
1334                phase: "dispatching".to_string(),
1335                payload: json!({
1336                    "call_id": call_id,
1337                    "execution_kind": execution_kind
1338                }),
1339            })
1340            .await;
1341
1342        match self.tool_registry.dispatch(call.clone(), true).await {
1343            Ok(tool_output) => {
1344                let result_frame = EventFrame::ToolCallResult {
1345                    response_id: response_id.clone(),
1346                    tool_name: call.name.clone(),
1347                    output: tool_output_value(&tool_output),
1348                };
1349                self.hooks
1350                    .emit(HookEvent::GenericEventFrame {
1351                        frame: Box::new(result_frame.clone()),
1352                    })
1353                    .await;
1354                self.hooks
1355                    .emit(HookEvent::ToolLifecycle {
1356                        response_id: response_id.clone(),
1357                        tool_name: call.name,
1358                        phase: "completed".to_string(),
1359                        payload: json!({ "ok": true }),
1360                    })
1361                    .await;
1362                Ok(json!({
1363                    "ok": true,
1364                    "status": "completed",
1365                    "execution_kind": execution_kind,
1366                    "response_id": response_id,
1367                    "precheck": precheck,
1368                    "output": tool_output,
1369                    "events": [
1370                        event_frame_payload(&start_frame),
1371                        event_frame_payload(&result_frame)
1372                    ]
1373                }))
1374            }
1375            Err(err) => {
1376                let message = format!("{err:?}");
1377                let error_frame = EventFrame::Error {
1378                    response_id: response_id.clone(),
1379                    message: message.clone(),
1380                };
1381                self.hooks
1382                    .emit(HookEvent::GenericEventFrame {
1383                        frame: Box::new(error_frame.clone()),
1384                    })
1385                    .await;
1386                self.hooks
1387                    .emit(HookEvent::ToolLifecycle {
1388                        response_id: response_id.clone(),
1389                        tool_name: call.name,
1390                        phase: "failed".to_string(),
1391                        payload: json!({ "error": message.clone() }),
1392                    })
1393                    .await;
1394                Ok(json!({
1395                    "ok": false,
1396                    "status": "failed",
1397                    "execution_kind": execution_kind,
1398                    "response_id": response_id,
1399                    "precheck": precheck,
1400                    "error": message,
1401                    "events": [
1402                        event_frame_payload(&start_frame),
1403                        event_frame_payload(&error_frame)
1404                    ]
1405                }))
1406            }
1407        }
1408    }
1409
1410    /// Starts all configured MCP servers and emits startup events via hooks.
1411    pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1412        let mut updates = Vec::new();
1413        let summary = self.mcp_manager.start_all(|update| {
1414            updates.push(update);
1415        });
1416        for update in updates {
1417            let status = match update.status {
1418                McpManagerStartupStatus::Starting => codewhale_protocol::McpStartupStatus::Starting,
1419                McpManagerStartupStatus::Ready => codewhale_protocol::McpStartupStatus::Ready,
1420                McpManagerStartupStatus::Failed { error } => {
1421                    codewhale_protocol::McpStartupStatus::Failed { error }
1422                }
1423                McpManagerStartupStatus::Cancelled => {
1424                    codewhale_protocol::McpStartupStatus::Cancelled
1425                }
1426            };
1427            self.hooks
1428                .emit(HookEvent::GenericEventFrame {
1429                    frame: Box::new(EventFrame::McpStartupUpdate {
1430                        update: codewhale_protocol::McpStartupUpdateEvent {
1431                            server_name: update.server_name,
1432                            status,
1433                        },
1434                    }),
1435                })
1436                .await;
1437        }
1438        self.hooks
1439            .emit(HookEvent::GenericEventFrame {
1440                frame: Box::new(EventFrame::McpStartupComplete {
1441                    summary: codewhale_protocol::McpStartupCompleteEvent {
1442                        ready: summary.ready.clone(),
1443                        failed: summary
1444                            .failed
1445                            .iter()
1446                            .map(|f| codewhale_protocol::McpStartupFailure {
1447                                server_name: f.server_name.clone(),
1448                                error: f.error.clone(),
1449                            })
1450                            .collect(),
1451                        cancelled: summary.cancelled.clone(),
1452                    },
1453                }),
1454            })
1455            .await;
1456        summary
1457    }
1458
1459    /// Returns the current application status including all jobs and their history.
1460    pub fn app_status(&self) -> AppResponse {
1461        let jobs = self.jobs.list();
1462        let events = jobs
1463            .iter()
1464            .flat_map(|job| {
1465                job.history.iter().map(|entry| EventFrame::ResponseDelta {
1466                    response_id: job.id.clone(),
1467                    delta: json!({
1468                        "kind": "job_transition",
1469                        "job_id": job.id.clone(),
1470                        "phase": entry.phase.clone(),
1471                        "status": job_status_to_str(entry.status),
1472                        "progress": entry.progress,
1473                        "detail": entry.detail.clone(),
1474                        "retry": job_retry_to_value(&entry.retry),
1475                        "at": entry.at
1476                    })
1477                    .to_string(),
1478                    channel: ResponseChannel::Text,
1479                })
1480            })
1481            .collect::<Vec<_>>();
1482        AppResponse {
1483            ok: true,
1484            data: json!({
1485                "jobs": jobs.into_iter().map(|job| {
1486                    json!({
1487                        "id": job.id,
1488                        "name": job.name,
1489                        "status": job_status_to_str(job.status),
1490                        "progress": job.progress,
1491                        "detail": job.detail,
1492                        "retry": job_retry_to_value(&job.retry),
1493                        "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1494                    })
1495                }).collect::<Vec<_>>()
1496            }),
1497            events,
1498        }
1499    }
1500
/// Returns the provider stored in the configuration (`config.provider`).
///
/// This reads the field directly; no CLI runtime overrides are applied here
/// (compare `handle_prompt`, which resolves options with overrides first).
pub fn provider_default(&self) -> ProviderKind {
    self.config.provider
}
1505
1506    /// Saves a named checkpoint for a thread.
1507    pub fn save_thread_checkpoint(
1508        &self,
1509        thread_id: &str,
1510        checkpoint_id: &str,
1511        state: &Value,
1512    ) -> Result<()> {
1513        self.thread_manager
1514            .state_store()
1515            .save_checkpoint(thread_id, checkpoint_id, state)
1516    }
1517
1518    /// Loads a checkpoint for a thread. Pass `None` for the latest.
1519    pub fn load_thread_checkpoint(
1520        &self,
1521        thread_id: &str,
1522        checkpoint_id: Option<&str>,
1523    ) -> Result<Option<Value>> {
1524        Ok(self
1525            .thread_manager
1526            .state_store()
1527            .load_checkpoint(thread_id, checkpoint_id)?
1528            .map(|checkpoint| checkpoint.state))
1529    }
1530
1531    /// Enqueues a new background job and persists it immediately.
1532    pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1533        let job = self.jobs.enqueue(name);
1534        self.jobs
1535            .persist_job(self.thread_manager.state_store(), &job.id)?;
1536        Ok(job)
1537    }
1538
1539    /// Transitions a job to running and persists the change.
1540    pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1541        self.jobs.set_running(job_id);
1542        self.jobs
1543            .persist_job(self.thread_manager.state_store(), job_id)
1544    }
1545
1546    /// Updates a job's progress and persists the change.
1547    pub fn update_job_progress(
1548        &mut self,
1549        job_id: &str,
1550        progress: u8,
1551        detail: Option<String>,
1552    ) -> Result<()> {
1553        self.jobs.update_progress(job_id, progress, detail);
1554        self.jobs
1555            .persist_job(self.thread_manager.state_store(), job_id)
1556    }
1557
1558    /// Marks a job as completed and persists the change.
1559    pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1560        self.jobs.complete(job_id);
1561        self.jobs
1562            .persist_job(self.thread_manager.state_store(), job_id)
1563    }
1564
1565    /// Marks a job as failed and persists the change.
1566    pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1567        self.jobs.fail(job_id, detail);
1568        self.jobs
1569            .persist_job(self.thread_manager.state_store(), job_id)
1570    }
1571
1572    /// Cancels a job and persists the change.
1573    pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1574        self.jobs.cancel(job_id);
1575        self.jobs
1576            .persist_job(self.thread_manager.state_store(), job_id)
1577    }
1578
1579    /// Pauses a job and persists the change.
1580    pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1581        self.jobs.pause(job_id, detail);
1582        self.jobs
1583            .persist_job(self.thread_manager.state_store(), job_id)
1584    }
1585
1586    /// Resumes a paused job and persists the change.
1587    pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1588        self.jobs.resume(job_id, detail);
1589        self.jobs
1590            .persist_job(self.thread_manager.state_store(), job_id)
1591    }
1592
    /// Returns the state-transition history recorded for the job `job_id`.
    ///
    /// This is a read-only delegation to the job manager's `history`
    /// lookup; nothing is persisted.
    pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
        self.jobs.history(job_id)
    }
1597}
1598
1599fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1600    ThreadResponse {
1601        thread_id: new.thread.id.clone(),
1602        status: status.to_string(),
1603        thread: Some(new.thread),
1604        threads: Vec::new(),
1605        goal: None,
1606        model: Some(new.model),
1607        model_provider: Some(new.model_provider),
1608        cwd: Some(new.cwd),
1609        approval_policy: new.approval_policy,
1610        sandbox: new.sandbox,
1611        events: Vec::new(),
1612        data: json!({}),
1613    }
1614}
1615
1616fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1617    match initial_history {
1618        InitialHistory::New => "New conversation".to_string(),
1619        InitialHistory::Forked(items) => truncate_preview(
1620            &items
1621                .first()
1622                .map(Value::to_string)
1623                .unwrap_or_else(|| "Forked conversation".to_string()),
1624        ),
1625        InitialHistory::Resumed { history, .. } => truncate_preview(
1626            &history
1627                .first()
1628                .map(Value::to_string)
1629                .unwrap_or_else(|| "Resumed conversation".to_string()),
1630        ),
1631    }
1632}
1633
1634fn permission_path_for_call(call: &ToolCall) -> Option<String> {
1635    match &call.payload {
1636        ToolPayload::Function { arguments } => serde_json::from_str::<Value>(arguments)
1637            .ok()
1638            .and_then(|value| {
1639                value
1640                    .get("path")
1641                    .and_then(Value::as_str)
1642                    .map(str::to_string)
1643            }),
1644        ToolPayload::Mcp { raw_arguments, .. } => raw_arguments
1645            .get("path")
1646            .and_then(Value::as_str)
1647            .map(str::to_string),
1648        ToolPayload::Custom { .. } | ToolPayload::LocalShell { .. } => None,
1649    }
1650}
1651
/// Returns at most the first 120 characters (Unicode scalar values) of `value`.
///
/// Shorter inputs are returned unchanged. The cut always lands on a
/// character boundary, so multi-byte text is never split mid-character.
fn truncate_preview(value: &str) -> String {
    const MAX_PREVIEW_CHARS: usize = 120;
    // Byte offset where character number 121 starts, or the end of the string.
    let cut = value
        .char_indices()
        .nth(MAX_PREVIEW_CHARS)
        .map_or(value.len(), |(offset, _)| offset);
    value[..cut].to_owned()
}
1655
1656fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1657    Thread {
1658        id: thread.id,
1659        preview: thread.preview,
1660        ephemeral: thread.ephemeral,
1661        model_provider: thread.model_provider,
1662        created_at: thread.created_at,
1663        updated_at: thread.updated_at,
1664        status: match thread.status {
1665            PersistedThreadStatus::Running => ThreadStatus::Running,
1666            PersistedThreadStatus::Idle => ThreadStatus::Idle,
1667            PersistedThreadStatus::Completed => ThreadStatus::Completed,
1668            PersistedThreadStatus::Failed => ThreadStatus::Failed,
1669            PersistedThreadStatus::Paused => ThreadStatus::Paused,
1670            PersistedThreadStatus::Archived => ThreadStatus::Archived,
1671        },
1672        path: thread.path,
1673        cwd: thread.cwd,
1674        cli_version: thread.cli_version,
1675        source: match thread.source {
1676            SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
1677            SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
1678            SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
1679            SessionSource::Api => codewhale_protocol::SessionSource::Api,
1680            SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
1681        },
1682        name: thread.name,
1683    }
1684}
1685
1686fn to_protocol_goal(goal: ThreadGoalRecord) -> ThreadGoal {
1687    ThreadGoal {
1688        thread_id: goal.thread_id,
1689        goal_id: goal.goal_id,
1690        objective: goal.objective,
1691        status: to_protocol_goal_status(goal.status),
1692        token_budget: goal.token_budget,
1693        tokens_used: goal.tokens_used,
1694        time_used_seconds: goal.time_used_seconds,
1695        created_at: goal.created_at,
1696        updated_at: goal.updated_at,
1697    }
1698}
1699
1700fn to_protocol_goal_status(status: PersistedThreadGoalStatus) -> ThreadGoalStatus {
1701    match status {
1702        PersistedThreadGoalStatus::Active => ThreadGoalStatus::Active,
1703        PersistedThreadGoalStatus::Paused => ThreadGoalStatus::Paused,
1704        PersistedThreadGoalStatus::Blocked => ThreadGoalStatus::Blocked,
1705        PersistedThreadGoalStatus::UsageLimited => ThreadGoalStatus::UsageLimited,
1706        PersistedThreadGoalStatus::BudgetLimited => ThreadGoalStatus::BudgetLimited,
1707        PersistedThreadGoalStatus::Complete => ThreadGoalStatus::Complete,
1708    }
1709}
1710
1711fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1712    match status {
1713        ThreadStatus::Running => PersistedThreadStatus::Running,
1714        ThreadStatus::Idle => PersistedThreadStatus::Idle,
1715        ThreadStatus::Completed => PersistedThreadStatus::Completed,
1716        ThreadStatus::Failed => PersistedThreadStatus::Failed,
1717        ThreadStatus::Paused => PersistedThreadStatus::Paused,
1718        ThreadStatus::Archived => PersistedThreadStatus::Archived,
1719    }
1720}
1721
1722fn to_persisted_source(source: &codewhale_protocol::SessionSource) -> SessionSource {
1723    match source {
1724        codewhale_protocol::SessionSource::Interactive => SessionSource::Interactive,
1725        codewhale_protocol::SessionSource::Resume => SessionSource::Resume,
1726        codewhale_protocol::SessionSource::Fork => SessionSource::Fork,
1727        codewhale_protocol::SessionSource::Api => SessionSource::Api,
1728        codewhale_protocol::SessionSource::Unknown => SessionSource::Unknown,
1729    }
1730}
1731
1732fn approval_request_frame(
1733    requirement: &ExecApprovalRequirement,
1734    matched_rule: Option<&str>,
1735    call_id: String,
1736    approval_id: String,
1737    turn_id: String,
1738    command: String,
1739    cwd: String,
1740) -> Option<EventFrame> {
1741    let ExecApprovalRequirement::NeedsApproval {
1742        reason,
1743        proposed_execpolicy_amendment,
1744        proposed_network_policy_amendments,
1745    } = requirement
1746    else {
1747        return None;
1748    };
1749
1750    let mut available_decisions = vec![
1751        ReviewDecision::Approved,
1752        ReviewDecision::ApprovedForSession,
1753        ReviewDecision::Denied,
1754        ReviewDecision::Abort,
1755    ];
1756    if proposed_execpolicy_amendment
1757        .as_ref()
1758        .is_some_and(|amendment| !amendment.prefixes.is_empty())
1759    {
1760        available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1761    }
1762    available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1763        |amendment| ReviewDecision::NetworkPolicyAmendment {
1764            host: amendment.host,
1765            action: amendment.action,
1766        },
1767    ));
1768
1769    Some(EventFrame::ExecApprovalRequest {
1770        request: ExecApprovalRequestEvent {
1771            call_id,
1772            approval_id,
1773            turn_id,
1774            command,
1775            cwd,
1776            reason: reason.clone(),
1777            matched_rule: matched_rule.map(|rule| rule.to_string().into_boxed_str()),
1778            network_approval_context: None,
1779            proposed_execpolicy_amendment: proposed_execpolicy_amendment
1780                .as_ref()
1781                .map(|amendment| amendment.prefixes.clone())
1782                .unwrap_or_default(),
1783            proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1784            additional_permissions: Vec::new(),
1785            available_decisions,
1786        },
1787    })
1788}
1789
1790fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1791    match requirement {
1792        ExecApprovalRequirement::Skip {
1793            bypass_sandbox,
1794            proposed_execpolicy_amendment,
1795        } => json!({
1796            "type": "skip",
1797            "bypass_sandbox": bypass_sandbox,
1798            "reason": requirement.reason(),
1799            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1800                .as_ref()
1801                .map(|amendment| amendment.prefixes.clone())
1802                .unwrap_or_default()
1803        }),
1804        ExecApprovalRequirement::NeedsApproval {
1805            reason,
1806            proposed_execpolicy_amendment,
1807            proposed_network_policy_amendments,
1808        } => json!({
1809            "type": "needs_approval",
1810            "reason": reason,
1811            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1812                .as_ref()
1813                .map(|amendment| amendment.prefixes.clone())
1814                .unwrap_or_default(),
1815            "proposed_network_policy_amendments": proposed_network_policy_amendments
1816        }),
1817        ExecApprovalRequirement::Forbidden { reason } => json!({
1818            "type": "forbidden",
1819            "reason": reason
1820        }),
1821    }
1822}
1823
1824fn policy_precheck_payload(
1825    decision: &ExecPolicyDecision,
1826    command: &str,
1827    cwd: &str,
1828    execution_kind: &str,
1829) -> Value {
1830    json!({
1831        "execution_kind": execution_kind,
1832        "command": command,
1833        "cwd": cwd,
1834        "allow": decision.allow,
1835        "requires_approval": decision.requires_approval,
1836        "matched_rule": decision.matched_rule.clone(),
1837        "phase": decision.requirement.phase(),
1838        "reason": decision.reason(),
1839        "requirement": approval_requirement_payload(&decision.requirement)
1840    })
1841}
1842
1843fn tool_payload_value(payload: &ToolPayload) -> Value {
1844    serde_json::to_value(payload).unwrap_or_else(
1845        |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1846    )
1847}
1848
1849fn tool_output_value(output: &codewhale_protocol::ToolOutput) -> Value {
1850    serde_json::to_value(output).unwrap_or_else(
1851        |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1852    )
1853}
1854
1855fn event_frame_payload(frame: &EventFrame) -> Value {
1856    serde_json::to_value(frame)
1857        .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1858}
1859
1860fn json_optional_string(value: &Value) -> Option<String> {
1861    if value.is_null() {
1862        None
1863    } else {
1864        value.as_str().map(ToString::to_string)
1865    }
1866}
1867
1868fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
1869    let Some(value) = value else {
1870        return JobRetryMetadata::default();
1871    };
1872    JobRetryMetadata {
1873        attempt: value
1874            .get("attempt")
1875            .and_then(Value::as_u64)
1876            .unwrap_or(0)
1877            .min(u32::MAX as u64) as u32,
1878        max_attempts: value
1879            .get("max_attempts")
1880            .and_then(Value::as_u64)
1881            .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
1882            .min(u32::MAX as u64) as u32,
1883        backoff_base_ms: value
1884            .get("backoff_base_ms")
1885            .and_then(Value::as_u64)
1886            .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
1887        next_backoff_ms: value
1888            .get("next_backoff_ms")
1889            .and_then(Value::as_u64)
1890            .unwrap_or(0),
1891        next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
1892    }
1893}
1894
1895fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
1896    let status = value
1897        .get("status")
1898        .and_then(Value::as_str)
1899        .and_then(job_status_from_str)?;
1900    Some(JobHistoryEntry {
1901        at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
1902        phase: value
1903            .get("phase")
1904            .and_then(Value::as_str)
1905            .unwrap_or("unknown")
1906            .to_string(),
1907        status,
1908        progress: value
1909            .get("progress")
1910            .and_then(Value::as_u64)
1911            .map(|v| v.min(u8::MAX as u64) as u8),
1912        detail: value.get("detail").and_then(json_optional_string),
1913        retry: parse_retry_metadata(value.get("retry")),
1914    })
1915}
1916
1917fn job_status_to_str(status: JobStatus) -> &'static str {
1918    match status {
1919        JobStatus::Queued => "queued",
1920        JobStatus::Running => "running",
1921        JobStatus::Paused => "paused",
1922        JobStatus::Completed => "completed",
1923        JobStatus::Failed => "failed",
1924        JobStatus::Cancelled => "cancelled",
1925    }
1926}
1927
1928fn job_status_from_str(value: &str) -> Option<JobStatus> {
1929    match value {
1930        "queued" => Some(JobStatus::Queued),
1931        "running" => Some(JobStatus::Running),
1932        "paused" => Some(JobStatus::Paused),
1933        "completed" => Some(JobStatus::Completed),
1934        "failed" => Some(JobStatus::Failed),
1935        "cancelled" => Some(JobStatus::Cancelled),
1936        _ => None,
1937    }
1938}
1939
1940fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
1941    json!({
1942        "attempt": retry.attempt,
1943        "max_attempts": retry.max_attempts,
1944        "backoff_base_ms": retry.backoff_base_ms,
1945        "next_backoff_ms": retry.next_backoff_ms,
1946        "next_retry_at": retry.next_retry_at
1947    })
1948}
1949
1950fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
1951    json!({
1952        "at": entry.at,
1953        "phase": entry.phase.clone(),
1954        "status": job_status_to_str(entry.status),
1955        "progress": entry.progress,
1956        "detail": entry.detail.clone(),
1957        "retry": job_retry_to_value(&entry.retry)
1958    })
1959}
1960
1961fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
1962    match status {
1963        JobStatus::Queued => JobStateStatus::Queued,
1964        JobStatus::Running => JobStateStatus::Running,
1965        JobStatus::Paused => JobStateStatus::Running,
1966        JobStatus::Completed => JobStateStatus::Completed,
1967        JobStatus::Failed => JobStateStatus::Failed,
1968        JobStatus::Cancelled => JobStateStatus::Cancelled,
1969    }
1970}
1971
1972fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
1973    match status {
1974        JobStateStatus::Queued => JobStatus::Queued,
1975        JobStateStatus::Running => JobStatus::Running,
1976        JobStateStatus::Completed => JobStatus::Completed,
1977        JobStateStatus::Failed => JobStatus::Failed,
1978        JobStateStatus::Cancelled => JobStatus::Cancelled,
1979    }
1980}
1981
1982#[cfg(test)]
1983mod tests {
1984    use super::*;
1985    use codewhale_tools::ToolCallSource;
1986
    // ── Tool-call permission paths, approval frames, and JobManager lifecycle ──
1988
1989    #[test]
1990    fn permission_path_for_call_extracts_function_path_argument() {
1991        let call = ToolCall {
1992            name: "read_file".to_string(),
1993            payload: ToolPayload::Function {
1994                arguments: json!({ "path": "README.md" }).to_string(),
1995            },
1996            source: ToolCallSource::Direct,
1997            raw_tool_call_id: None,
1998        };
1999
2000        assert_eq!(
2001            permission_path_for_call(&call).as_deref(),
2002            Some("README.md")
2003        );
2004    }
2005
2006    #[test]
2007    fn permission_path_for_call_extracts_mcp_path_argument() {
2008        let call = ToolCall {
2009            name: "mcp_fs_read".to_string(),
2010            payload: ToolPayload::Mcp {
2011                server: "fs".to_string(),
2012                tool: "read".to_string(),
2013                raw_arguments: json!({ "path": "secrets/token.txt" }),
2014                raw_tool_call_id: None,
2015            },
2016            source: ToolCallSource::Direct,
2017            raw_tool_call_id: None,
2018        };
2019
2020        assert_eq!(
2021            permission_path_for_call(&call).as_deref(),
2022            Some("secrets/token.txt")
2023        );
2024    }
2025
2026    #[test]
2027    fn permission_path_for_call_ignores_shell_payload() {
2028        let call = ToolCall {
2029            name: "exec_shell".to_string(),
2030            payload: ToolPayload::LocalShell {
2031                params: codewhale_protocol::LocalShellParams {
2032                    command: "cargo test".to_string(),
2033                    cwd: None,
2034                    timeout_ms: None,
2035                },
2036            },
2037            source: ToolCallSource::Direct,
2038            raw_tool_call_id: None,
2039        };
2040
2041        assert_eq!(permission_path_for_call(&call), None);
2042    }
2043
2044    #[test]
2045    fn approval_request_frame_includes_matched_rule() {
2046        let requirement = ExecApprovalRequirement::NeedsApproval {
2047            reason: "Typed ask rule 'tool=exec_shell command=cargo test' requires approval."
2048                .to_string(),
2049            proposed_execpolicy_amendment: None,
2050            proposed_network_policy_amendments: Vec::new(),
2051        };
2052
2053        let frame = approval_request_frame(
2054            &requirement,
2055            Some("tool=exec_shell command=cargo test"),
2056            "call-1".to_string(),
2057            "approval-1".to_string(),
2058            "turn-1".to_string(),
2059            "cargo test --workspace".to_string(),
2060            "/repo".to_string(),
2061        )
2062        .expect("approval frame");
2063
2064        let EventFrame::ExecApprovalRequest { request } = frame else {
2065            panic!("expected exec approval request frame");
2066        };
2067        assert_eq!(
2068            request.matched_rule.as_deref(),
2069            Some("tool=exec_shell command=cargo test")
2070        );
2071        assert_eq!(request.reason, requirement.reason());
2072    }
2073
2074    #[test]
2075    fn enqueue_creates_queued_job_with_zero_progress() {
2076        let mut jm = JobManager::default();
2077        let job = jm.enqueue("build");
2078        assert_eq!(job.name, "build");
2079        assert_eq!(job.status, JobStatus::Queued);
2080        assert_eq!(job.progress, Some(0));
2081        assert!(job.detail.is_none());
2082        assert_eq!(job.history.len(), 1);
2083        assert_eq!(job.history[0].phase, "created");
2084    }
2085
2086    #[test]
2087    fn set_running_transitions_from_queued() {
2088        let mut jm = JobManager::default();
2089        let job = jm.enqueue("deploy");
2090        let id = job.id.clone();
2091        jm.set_running(&id);
2092        let jobs = jm.list();
2093        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2094        assert_eq!(updated.status, JobStatus::Running);
2095        assert_eq!(updated.history.last().unwrap().phase, "running");
2096    }
2097
2098    #[test]
2099    fn update_progress_clamps_to_100() {
2100        let mut jm = JobManager::default();
2101        let job = jm.enqueue("task");
2102        let id = job.id.clone();
2103        jm.update_progress(&id, 150, Some("over".to_string()));
2104        let jobs = jm.list();
2105        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2106        assert_eq!(updated.progress, Some(100));
2107    }
2108
2109    #[test]
2110    fn complete_sets_progress_to_100() {
2111        let mut jm = JobManager::default();
2112        let job = jm.enqueue("task");
2113        let id = job.id.clone();
2114        jm.set_running(&id);
2115        jm.complete(&id);
2116        let jobs = jm.list();
2117        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2118        assert_eq!(updated.status, JobStatus::Completed);
2119        assert_eq!(updated.progress, Some(100));
2120    }
2121
2122    #[test]
2123    fn fail_increments_attempt_and_sets_backoff() {
2124        let mut jm = JobManager::default();
2125        let job = jm.enqueue("fragile");
2126        let id = job.id.clone();
2127        jm.set_running(&id);
2128        jm.fail(&id, "crashed");
2129        let jobs = jm.list();
2130        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2131        assert_eq!(updated.status, JobStatus::Failed);
2132        assert_eq!(updated.retry.attempt, 1);
2133        assert!(updated.retry.next_backoff_ms > 0);
2134        assert!(updated.retry.next_retry_at.is_some());
2135        assert_eq!(updated.detail.as_deref(), Some("crashed"));
2136    }
2137
2138    #[test]
2139    fn fail_clears_retry_after_max_attempts() {
2140        let mut jm = JobManager::default();
2141        let job = jm.enqueue("fragile");
2142        let id = job.id.clone();
2143        for _ in 0..=DEFAULT_JOB_MAX_ATTEMPTS {
2144            jm.set_running(&id);
2145            jm.fail(&id, "boom");
2146        }
2147        let jobs = jm.list();
2148        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2149        assert_eq!(updated.retry.attempt, DEFAULT_JOB_MAX_ATTEMPTS);
2150        assert_eq!(updated.retry.next_backoff_ms, 0);
2151        assert!(updated.retry.next_retry_at.is_none());
2152    }
2153
2154    #[test]
2155    fn cancel_sets_status_and_clears_retry() {
2156        let mut jm = JobManager::default();
2157        let job = jm.enqueue("task");
2158        let id = job.id.clone();
2159        jm.cancel(&id);
2160        let jobs = jm.list();
2161        let updated = jobs.iter().find(|j| j.id == id).unwrap();
2162        assert_eq!(updated.status, JobStatus::Cancelled);
2163        assert_eq!(updated.retry.next_backoff_ms, 0);
2164    }
2165
2166    #[test]
2167    fn pause_and_resume_round_trip() {
2168        let mut jm = JobManager::default();
2169        let job = jm.enqueue("task");
2170        let id = job.id.clone();
2171        jm.set_running(&id);
2172        jm.pause(&id, Some("waiting".to_string()));
2173        let jobs = jm.list();
2174        let paused = jobs.iter().find(|j| j.id == id).unwrap();
2175        assert_eq!(paused.status, JobStatus::Paused);
2176        assert_eq!(paused.detail.as_deref(), Some("waiting"));
2177
2178        jm.resume(&id, None);
2179        let jobs = jm.list();
2180        let resumed = jobs.iter().find(|j| j.id == id).unwrap();
2181        assert_eq!(resumed.status, JobStatus::Running);
2182        assert_eq!(resumed.history.last().unwrap().phase, "resumed");
2183    }
2184
2185    #[test]
2186    fn list_returns_jobs_sorted_by_updated_at_desc() {
2187        let mut jm = JobManager::default();
2188        jm.enqueue("first");
2189        jm.enqueue("second");
2190        jm.enqueue("third");
2191        let jobs = jm.list();
2192        assert_eq!(jobs.len(), 3);
2193        for window in jobs.windows(2) {
2194            assert!(window[0].updated_at >= window[1].updated_at);
2195        }
2196    }
2197
2198    #[test]
2199    fn history_returns_entries_for_existing_job() {
2200        let mut jm = JobManager::default();
2201        let job = jm.enqueue("task");
2202        let id = job.id.clone();
2203        jm.set_running(&id);
2204        jm.complete(&id);
2205        let history = jm.history(&id);
2206        assert_eq!(history.len(), 3); // created, running, completed
2207        assert_eq!(history[0].phase, "created");
2208        assert_eq!(history[1].phase, "running");
2209        assert_eq!(history[2].phase, "completed");
2210    }
2211
2212    #[test]
2213    fn history_returns_empty_for_unknown_job() {
2214        let jm = JobManager::default();
2215        assert!(jm.history("nonexistent").is_empty());
2216    }
2217
2218    #[test]
2219    fn resume_pending_requeues_running_and_queued() {
2220        let mut jm = JobManager::default();
2221        let _j1 = jm.enqueue("queued_task");
2222        let j2 = jm.enqueue("running_task");
2223        let j3 = jm.enqueue("completed_task");
2224        let id2 = j2.id.clone();
2225        let id3 = j3.id.clone();
2226        jm.set_running(&id2);
2227        jm.set_running(&id3);
2228        jm.complete(&id3);
2229
2230        let resumed = jm.resume_pending();
2231        assert_eq!(resumed.len(), 2);
2232        for job in &resumed {
2233            assert_eq!(job.status, JobStatus::Queued);
2234        }
2235    }
2236
2237    // ── JobManager: backoff ────────────────────────────────────────────
2238
2239    #[test]
2240    fn deterministic_backoff_zero_on_first_attempt() {
2241        let retry = JobRetryMetadata {
2242            attempt: 0,
2243            ..Default::default()
2244        };
2245        assert_eq!(JobManager::deterministic_backoff_ms(&retry), 0);
2246    }
2247
2248    #[test]
2249    fn deterministic_backoff_exponential_growth() {
2250        let base = DEFAULT_JOB_BACKOFF_BASE_MS;
2251        for attempt in 1..=5 {
2252            let retry = JobRetryMetadata {
2253                attempt,
2254                backoff_base_ms: base,
2255                ..Default::default()
2256            };
2257            let expected = base * 2u64.pow(attempt.saturating_sub(1).min(20));
2258            assert_eq!(
2259                JobManager::deterministic_backoff_ms(&retry),
2260                expected,
2261                "attempt {attempt}"
2262            );
2263        }
2264    }
2265
2266    #[test]
2267    fn deterministic_backoff_saturates_at_high_exponent() {
2268        let retry = JobRetryMetadata {
2269            attempt: 63,
2270            backoff_base_ms: 1000,
2271            ..Default::default()
2272        };
2273        // Should not panic; result saturates
2274        let _ = JobManager::deterministic_backoff_ms(&retry);
2275    }
2276
2277    // ── JobManager: history truncation ─────────────────────────────────
2278
2279    #[test]
2280    fn push_history_truncates_beyond_max() {
2281        let mut jm = JobManager::default();
2282        let job = jm.enqueue("task");
2283        let id = job.id.clone();
2284        // Generate more history entries than the limit
2285        for i in 0..(MAX_JOB_HISTORY_ENTRIES + 20) {
2286            jm.update_progress(&id, (i % 100) as u8, Some(format!("step {i}")));
2287        }
2288        let history = jm.history(&id);
2289        assert_eq!(history.len(), MAX_JOB_HISTORY_ENTRIES);
2290    }
2291
2292    // ── JobManager: persistence encoding/parsing ───────────────────────
2293
2294    #[test]
2295    fn encode_and_parse_persisted_detail_round_trip() {
2296        let mut jm = JobManager::default();
2297        let job = jm.enqueue("task");
2298        let id = job.id.clone();
2299        jm.set_running(&id);
2300        jm.fail(&id, "oops");
2301        let job = jm.list().into_iter().find(|j| j.id == id).unwrap();
2302
2303        let encoded = JobManager::encode_persisted_detail(&job).unwrap().unwrap();
2304        let parsed = JobManager::parse_persisted_detail(Some(&encoded)).unwrap();
2305
2306        assert_eq!(parsed.status, job.status);
2307        assert_eq!(parsed.detail, job.detail);
2308        assert_eq!(parsed.retry.attempt, job.retry.attempt);
2309        assert_eq!(parsed.history.len(), job.history.len());
2310    }
2311
2312    #[test]
2313    fn parse_persisted_detail_returns_none_for_none_input() {
2314        assert!(JobManager::parse_persisted_detail(None).is_none());
2315    }
2316
2317    #[test]
2318    fn parse_persisted_detail_returns_none_for_invalid_json() {
2319        assert!(JobManager::parse_persisted_detail(Some("not json")).is_none());
2320    }
2321
2322    // ── Helper functions ───────────────────────────────────────────────
2323
2324    #[test]
2325    fn job_status_round_trip_str() {
2326        let statuses = [
2327            JobStatus::Queued,
2328            JobStatus::Running,
2329            JobStatus::Paused,
2330            JobStatus::Completed,
2331            JobStatus::Failed,
2332            JobStatus::Cancelled,
2333        ];
2334        for status in &statuses {
2335            let s = job_status_to_str(*status);
2336            let parsed = job_status_from_str(s);
2337            assert_eq!(parsed, Some(*status), "round-trip failed for {s:?}");
2338        }
2339    }
2340
2341    #[test]
2342    fn job_status_from_str_returns_none_for_unknown() {
2343        assert_eq!(job_status_from_str("unknown"), None);
2344        assert_eq!(job_status_from_str(""), None);
2345    }
2346
2347    #[test]
2348    fn truncate_preview_limits_to_120_chars() {
2349        let long = "a".repeat(200);
2350        let truncated = truncate_preview(&long);
2351        assert_eq!(truncated.len(), 120);
2352    }
2353
2354    #[test]
2355    fn truncate_preview_preserves_short_strings() {
2356        let short = "hello";
2357        assert_eq!(truncate_preview(short), "hello");
2358    }
2359
2360    #[test]
2361    fn runtime_status_to_job_state_maps_correctly() {
2362        assert_eq!(
2363            runtime_status_to_job_state(JobStatus::Queued),
2364            JobStateStatus::Queued
2365        );
2366        assert_eq!(
2367            runtime_status_to_job_state(JobStatus::Running),
2368            JobStateStatus::Running
2369        );
2370        assert_eq!(
2371            runtime_status_to_job_state(JobStatus::Paused),
2372            JobStateStatus::Running
2373        );
2374        assert_eq!(
2375            runtime_status_to_job_state(JobStatus::Completed),
2376            JobStateStatus::Completed
2377        );
2378        assert_eq!(
2379            runtime_status_to_job_state(JobStatus::Failed),
2380            JobStateStatus::Failed
2381        );
2382        assert_eq!(
2383            runtime_status_to_job_state(JobStatus::Cancelled),
2384            JobStateStatus::Cancelled
2385        );
2386    }
2387
2388    #[test]
2389    fn job_state_status_to_runtime_maps_correctly() {
2390        assert_eq!(
2391            job_state_status_to_runtime(JobStateStatus::Queued),
2392            JobStatus::Queued
2393        );
2394        assert_eq!(
2395            job_state_status_to_runtime(JobStateStatus::Running),
2396            JobStatus::Running
2397        );
2398        assert_eq!(
2399            job_state_status_to_runtime(JobStateStatus::Completed),
2400            JobStatus::Completed
2401        );
2402        assert_eq!(
2403            job_state_status_to_runtime(JobStateStatus::Failed),
2404            JobStatus::Failed
2405        );
2406        assert_eq!(
2407            job_state_status_to_runtime(JobStateStatus::Cancelled),
2408            JobStatus::Cancelled
2409        );
2410    }
2411
2412    #[test]
2413    fn preview_from_initial_history_new() {
2414        let preview = preview_from_initial_history(&InitialHistory::New);
2415        assert_eq!(preview, "New conversation");
2416    }
2417
2418    #[test]
2419    fn preview_from_initial_history_forked() {
2420        let preview = preview_from_initial_history(&InitialHistory::Forked(vec![json!("hello")]));
2421        assert!(preview.contains("hello"));
2422    }
2423
2424    #[test]
2425    fn preview_from_initial_history_resumed() {
2426        let preview = preview_from_initial_history(&InitialHistory::Resumed {
2427            conversation_id: "test".to_string(),
2428            history: vec![json!("world")],
2429            rollout_path: PathBuf::from("/tmp/test"),
2430        });
2431        assert!(preview.contains("world"));
2432    }
2433
2434    #[test]
2435    fn json_optional_string_handles_null() {
2436        assert!(json_optional_string(&Value::Null).is_none());
2437    }
2438
2439    #[test]
2440    fn json_optional_string_handles_string() {
2441        assert_eq!(
2442            json_optional_string(&Value::String("hello".to_string())),
2443            Some("hello".to_string())
2444        );
2445    }
2446
2447    #[test]
2448    fn json_optional_string_handles_non_string() {
2449        assert!(json_optional_string(&json!(42)).is_none());
2450    }
2451
2452    #[test]
2453    fn parse_retry_metadata_returns_default_for_none() {
2454        let retry = parse_retry_metadata(None);
2455        assert_eq!(retry.attempt, 0);
2456        assert_eq!(retry.max_attempts, DEFAULT_JOB_MAX_ATTEMPTS);
2457        assert_eq!(retry.backoff_base_ms, DEFAULT_JOB_BACKOFF_BASE_MS);
2458    }
2459
2460    #[test]
2461    fn parse_retry_metadata_parses_fields() {
2462        let value = json!({
2463            "attempt": 2,
2464            "max_attempts": 5,
2465            "backoff_base_ms": 1000,
2466            "next_backoff_ms": 2000,
2467            "next_retry_at": 1234567890i64
2468        });
2469        let retry = parse_retry_metadata(Some(&value));
2470        assert_eq!(retry.attempt, 2);
2471        assert_eq!(retry.max_attempts, 5);
2472        assert_eq!(retry.backoff_base_ms, 1000);
2473        assert_eq!(retry.next_backoff_ms, 2000);
2474        assert_eq!(retry.next_retry_at, Some(1234567890));
2475    }
2476
2477    #[test]
2478    fn parse_history_entry_returns_none_without_status() {
2479        let value = json!({"at": 1, "phase": "test"});
2480        assert!(parse_history_entry(&value).is_none());
2481    }
2482
2483    #[test]
2484    fn parse_history_entry_parses_valid_entry() {
2485        let value = json!({
2486            "at": 100,
2487            "phase": "running",
2488            "status": "running",
2489            "progress": 50,
2490            "detail": "working",
2491            "retry": {"attempt": 0, "max_attempts": 3, "backoff_base_ms": 500}
2492        });
2493        let entry = parse_history_entry(&value).unwrap();
2494        assert_eq!(entry.at, 100);
2495        assert_eq!(entry.phase, "running");
2496        assert_eq!(entry.status, JobStatus::Running);
2497        assert_eq!(entry.progress, Some(50));
2498        assert_eq!(entry.detail.as_deref(), Some("working"));
2499    }
2500}