Skip to main content

codewhale_core/
lib.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use codewhale_agent::ModelRegistry;
7use codewhale_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use codewhale_execpolicy::{
9    AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10    ExecPolicyEngine,
11};
12use codewhale_hooks::{HookDispatcher, HookEvent};
13use codewhale_mcp::{
14    McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use codewhale_protocol::{
17    AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18    ResponseChannel, ReviewDecision, Thread, ThreadForkParams, ThreadListParams, ThreadReadParams,
19    ThreadRequest, ThreadResponse, ThreadResumeParams, ThreadSetNameParams, ThreadStatus,
20    ToolPayload,
21};
22use codewhale_state::{
23    JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadListFilters, ThreadMetadata,
24    ThreadStatus as PersistedThreadStatus,
25};
26use codewhale_tools::{ToolCall, ToolRegistry};
27use serde_json::{Value, json};
28use uuid::Uuid;
29
30/// How a new thread's conversation history is initialized.
31#[derive(Debug, Clone)]
32pub enum InitialHistory {
33    /// Start with an empty conversation.
34    New,
35    /// Forked from an existing thread with the given history items.
36    Forked(Vec<Value>),
37    /// Resumed from a persisted thread with its full history.
38    Resumed {
39        conversation_id: String,
40        history: Vec<Value>,
41        rollout_path: PathBuf,
42    },
43}
44
45/// Result of spawning or resuming a thread.
46#[derive(Debug, Clone)]
47pub struct NewThread {
48    /// The thread metadata.
49    pub thread: Thread,
50    /// Resolved model identifier.
51    pub model: String,
52    /// Provider that serves the model.
53    pub model_provider: String,
54    /// Working directory for the thread.
55    pub cwd: PathBuf,
56    /// Approval policy override, if any.
57    pub approval_policy: Option<String>,
58    /// Sandbox mode override, if any.
59    pub sandbox: Option<String>,
60}
61
/// Lifecycle state of a background job.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Enqueued and waiting to start.
    Queued,
    /// Actively executing.
    Running,
    /// Suspended; can be resumed later.
    Paused,
    /// Ran to completion without error.
    Completed,
    /// Stopped because of an error.
    Failed,
    /// Stopped on request before finishing.
    Cancelled,
}
78
/// Value written under `"schema_version"` in the encoded job detail document.
const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
/// Default for [`JobRetryMetadata::max_attempts`].
const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
/// Default for [`JobRetryMetadata::backoff_base_ms`], in milliseconds.
const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
/// Upper bound on the number of history entries kept per job; older entries are dropped first.
const MAX_JOB_HISTORY_ENTRIES: usize = 64;
83
/// Bookkeeping for retrying a job after it fails.
#[derive(Debug, Clone)]
pub struct JobRetryMetadata {
    /// Number of retries scheduled so far (0 means the job has never been retried).
    pub attempt: u32,
    /// Retry budget: once `attempt` reaches this value no further retry is scheduled.
    pub max_attempts: u32,
    /// Base delay, in milliseconds, that the exponential backoff starts from.
    pub backoff_base_ms: u64,
    /// Delay, in milliseconds, computed for the next retry (0 when none is scheduled).
    pub next_backoff_ms: u64,
    /// Unix timestamp (seconds) at which the next retry is due, if one is scheduled.
    pub next_retry_at: Option<i64>,
}
98
99impl Default for JobRetryMetadata {
100    fn default() -> Self {
101        Self {
102            attempt: 0,
103            max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
104            backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
105            next_backoff_ms: 0,
106            next_retry_at: None,
107        }
108    }
109}
110
111/// A single entry in a job's history log.
112#[derive(Debug, Clone)]
113pub struct JobHistoryEntry {
114    /// Timestamp when this entry was recorded.
115    pub at: i64,
116    /// Phase name (e.g., "created", "running", "failed").
117    pub phase: String,
118    /// Job status at this point in time.
119    pub status: JobStatus,
120    /// Progress percentage at this point, if available.
121    pub progress: Option<u8>,
122    /// Human-readable detail message.
123    pub detail: Option<String>,
124    /// Retry state snapshot at this point.
125    pub retry: JobRetryMetadata,
126}
127
128#[derive(Debug, Clone)]
129struct PersistedJobDetail {
130    pub status: JobStatus,
131    pub detail: Option<String>,
132    pub retry: JobRetryMetadata,
133    pub history: Vec<JobHistoryEntry>,
134}
135
136/// A complete job record with all metadata and history.
137#[derive(Debug, Clone)]
138pub struct JobRecord {
139    /// Unique job identifier.
140    pub id: String,
141    /// Human-readable job name.
142    pub name: String,
143    /// Current job status.
144    pub status: JobStatus,
145    /// Current progress percentage (0-100).
146    pub progress: Option<u8>,
147    /// Human-readable detail about the current state.
148    pub detail: Option<String>,
149    /// Retry state for failed jobs.
150    pub retry: JobRetryMetadata,
151    /// Chronological history of state transitions.
152    pub history: Vec<JobHistoryEntry>,
153    /// Timestamp when the job was created.
154    pub created_at: i64,
155    /// Timestamp of the last state change.
156    pub updated_at: i64,
157}
158
159/// Manages background jobs with retry logic and persistence.
160#[derive(Debug, Default)]
161pub struct JobManager {
162    jobs: HashMap<String, JobRecord>,
163}
164
165impl JobManager {
166    fn now_ts() -> i64 {
167        chrono::Utc::now().timestamp()
168    }
169
170    fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
171        if retry.attempt == 0 {
172            return 0;
173        }
174        let exponent = retry.attempt.saturating_sub(1).min(20);
175        let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
176        retry.backoff_base_ms.saturating_mul(multiplier)
177    }
178
179    fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
180        retry.next_backoff_ms = 0;
181        retry.next_retry_at = None;
182    }
183
184    fn push_history(job: &mut JobRecord, phase: &str) {
185        job.history.push(JobHistoryEntry {
186            at: job.updated_at,
187            phase: phase.to_string(),
188            status: job.status,
189            progress: job.progress,
190            detail: job.detail.clone(),
191            retry: job.retry.clone(),
192        });
193        if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
194            let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
195            job.history.drain(0..to_drain);
196        }
197    }
198
199    fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
200        let raw = raw?;
201        let parsed: Value = serde_json::from_str(raw).ok()?;
202        let status = parsed
203            .get("status")
204            .and_then(Value::as_str)
205            .and_then(job_status_from_str)?;
206        let detail = parsed.get("detail").and_then(json_optional_string);
207        let retry = parse_retry_metadata(parsed.get("retry"));
208        let history = parsed
209            .get("history")
210            .and_then(Value::as_array)
211            .map(|items| {
212                items
213                    .iter()
214                    .filter_map(parse_history_entry)
215                    .collect::<Vec<_>>()
216            })
217            .unwrap_or_default();
218        Some(PersistedJobDetail {
219            status,
220            detail,
221            retry,
222            history,
223        })
224    }
225
226    fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
227        let encoded = json!({
228            "schema_version": JOB_DETAIL_SCHEMA_VERSION,
229            "status": job_status_to_str(job.status),
230            "detail": job.detail.clone(),
231            "retry": job_retry_to_value(&job.retry),
232            "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
233        })
234        .to_string();
235        Ok(Some(encoded))
236    }
237
238    /// Enqueues a new job and returns its record.
239    pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
240        let now = Self::now_ts();
241        let id = format!("job-{}", Uuid::new_v4());
242        let mut job = JobRecord {
243            id: id.clone(),
244            name: name.into(),
245            status: JobStatus::Queued,
246            progress: Some(0),
247            detail: None,
248            retry: JobRetryMetadata::default(),
249            history: Vec::new(),
250            created_at: now,
251            updated_at: now,
252        };
253        Self::push_history(&mut job, "created");
254        self.jobs.insert(id, job.clone());
255        job
256    }
257
258    /// Transitions a job to running and clears its retry schedule.
259    pub fn set_running(&mut self, id: &str) {
260        if let Some(job) = self.jobs.get_mut(id) {
261            job.status = JobStatus::Running;
262            Self::clear_retry_schedule(&mut job.retry);
263            job.updated_at = Self::now_ts();
264            Self::push_history(job, "running");
265        }
266    }
267
268    /// Updates a job's progress (clamped to 100) and optional detail message.
269    pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
270        if let Some(job) = self.jobs.get_mut(id) {
271            job.progress = Some(progress.min(100));
272            job.detail = detail;
273            job.updated_at = Self::now_ts();
274            Self::push_history(job, "progress_updated");
275        }
276    }
277
278    /// Marks a job as completed with 100% progress and clears its retry schedule.
279    pub fn complete(&mut self, id: &str) {
280        if let Some(job) = self.jobs.get_mut(id) {
281            job.status = JobStatus::Completed;
282            job.progress = Some(100);
283            Self::clear_retry_schedule(&mut job.retry);
284            job.updated_at = Self::now_ts();
285            Self::push_history(job, "completed");
286        }
287    }
288
289    /// Marks a job as failed and schedules a retry if attempts remain.
290    pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
291        if let Some(job) = self.jobs.get_mut(id) {
292            let now = Self::now_ts();
293            job.status = JobStatus::Failed;
294            job.detail = Some(detail.into());
295            if job.retry.attempt < job.retry.max_attempts {
296                job.retry.attempt += 1;
297                job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
298                let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
299                    .min(i64::MAX as u64) as i64;
300                job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
301            } else {
302                Self::clear_retry_schedule(&mut job.retry);
303            }
304            job.updated_at = now;
305            Self::push_history(job, "failed");
306        }
307    }
308
309    /// Cancels a job and clears any pending retry schedule.
310    pub fn cancel(&mut self, id: &str) {
311        if let Some(job) = self.jobs.get_mut(id) {
312            job.status = JobStatus::Cancelled;
313            Self::clear_retry_schedule(&mut job.retry);
314            job.updated_at = Self::now_ts();
315            Self::push_history(job, "cancelled");
316        }
317    }
318
319    /// Pauses a job, optionally updating its detail message.
320    pub fn pause(&mut self, id: &str, detail: Option<String>) {
321        if let Some(job) = self.jobs.get_mut(id) {
322            job.status = JobStatus::Paused;
323            if detail.is_some() {
324                job.detail = detail;
325            }
326            job.updated_at = Self::now_ts();
327            Self::push_history(job, "paused");
328        }
329    }
330
331    /// Resumes a paused or failed job back to running status.
332    pub fn resume(&mut self, id: &str, detail: Option<String>) {
333        if let Some(job) = self.jobs.get_mut(id) {
334            job.status = JobStatus::Running;
335            if detail.is_some() {
336                job.detail = detail;
337            }
338            Self::clear_retry_schedule(&mut job.retry);
339            job.updated_at = Self::now_ts();
340            Self::push_history(job, "resumed");
341        }
342    }
343
344    /// Returns all jobs sorted by most recently updated first.
345    pub fn list(&self) -> Vec<JobRecord> {
346        let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
347        out.sort_by_key(|job| std::cmp::Reverse(job.updated_at));
348        out
349    }
350
351    /// Returns the history entries for a job, or an empty vec if not found.
352    pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
353        self.jobs
354            .get(id)
355            .map(|job| job.history.clone())
356            .unwrap_or_default()
357    }
358
359    /// Resets queued or running jobs back to queued on application resume.
360    pub fn resume_pending(&mut self) -> Vec<JobRecord> {
361        let mut resumed = Vec::new();
362        for job in self.jobs.values_mut() {
363            if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
364                job.status = JobStatus::Queued;
365                job.updated_at = Self::now_ts();
366                Self::push_history(job, "queued_after_resume");
367                resumed.push(job.clone());
368            }
369        }
370        resumed
371    }
372
373    /// Loads jobs from the state store, deserializing extended detail when available.
374    pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
375        let persisted = store.list_jobs(Some(500))?;
376        for job in persisted {
377            let fallback_status = job_state_status_to_runtime(job.status);
378            let parsed = Self::parse_persisted_detail(job.detail.as_deref());
379            let (status, detail, retry, history) = if let Some(detail_state) = parsed {
380                (
381                    detail_state.status,
382                    detail_state.detail,
383                    detail_state.retry,
384                    detail_state.history,
385                )
386            } else {
387                (
388                    fallback_status,
389                    job.detail,
390                    JobRetryMetadata::default(),
391                    Vec::new(),
392                )
393            };
394            self.jobs.insert(
395                job.id.clone(),
396                JobRecord {
397                    id: job.id,
398                    name: job.name,
399                    status,
400                    progress: job.progress,
401                    detail,
402                    retry,
403                    history,
404                    created_at: job.created_at,
405                    updated_at: job.updated_at,
406                },
407            );
408        }
409        Ok(())
410    }
411
412    /// Persists a single job's current state to the state store.
413    pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
414        let Some(job) = self.jobs.get(id) else {
415            return Ok(());
416        };
417        let encoded_detail = Self::encode_persisted_detail(job)?;
418        store.upsert_job(&JobStateRecord {
419            id: job.id.clone(),
420            name: job.name.clone(),
421            status: runtime_status_to_job_state(job.status),
422            progress: job.progress,
423            detail: encoded_detail,
424            created_at: job.created_at,
425            updated_at: job.updated_at,
426        })
427    }
428
429    /// Persists all in-memory jobs to the state store.
430    pub fn persist_all(&self, store: &StateStore) -> Result<()> {
431        for id in self.jobs.keys() {
432            self.persist_job(store, id)?;
433        }
434        Ok(())
435    }
436}
437
438/// Manages thread lifecycle: spawn, resume, fork, archive, and persistence.
439pub struct ThreadManager {
440    store: StateStore,
441    running_threads: HashMap<String, Thread>,
442    cli_version: String,
443}
444
445impl ThreadManager {
446    /// Creates a new `ThreadManager` backed by the given state store.
447    pub fn new(store: StateStore) -> Self {
448        Self {
449            store,
450            running_threads: HashMap::new(),
451            cli_version: env!("CARGO_PKG_VERSION").to_string(),
452        }
453    }
454
455    /// Returns a reference to the underlying state store.
456    pub fn state_store(&self) -> &StateStore {
457        &self.store
458    }
459
460    /// Spawns a new thread with the given initial history and persists it.
461    pub fn spawn_thread_with_history(
462        &mut self,
463        model_provider: String,
464        cwd: PathBuf,
465        initial_history: InitialHistory,
466        persist_extended_history: bool,
467    ) -> Result<NewThread> {
468        let id = format!("thread-{}", Uuid::new_v4());
469        let now = chrono::Utc::now().timestamp();
470        let preview = preview_from_initial_history(&initial_history);
471        let source = match initial_history {
472            InitialHistory::New => SessionSource::Interactive,
473            InitialHistory::Forked(_) => SessionSource::Fork,
474            InitialHistory::Resumed { .. } => SessionSource::Resume,
475        };
476        let thread = Thread {
477            id: id.clone(),
478            preview,
479            ephemeral: !persist_extended_history,
480            model_provider: model_provider.clone(),
481            created_at: now,
482            updated_at: now,
483            status: ThreadStatus::Running,
484            path: None,
485            cwd: cwd.clone(),
486            cli_version: self.cli_version.clone(),
487            source: match source {
488                SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
489                SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
490                SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
491                SessionSource::Api => codewhale_protocol::SessionSource::Api,
492                SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
493            },
494            name: None,
495        };
496        self.persist_thread(&thread, None)?;
497        match &initial_history {
498            InitialHistory::Forked(items) => {
499                for item in items {
500                    self.store.append_message(
501                        &thread.id,
502                        "history",
503                        &item.to_string(),
504                        Some(item.clone()),
505                    )?;
506                }
507            }
508            InitialHistory::Resumed { history, .. } => {
509                for item in history {
510                    self.store.append_message(
511                        &thread.id,
512                        "history",
513                        &item.to_string(),
514                        Some(item.clone()),
515                    )?;
516                }
517            }
518            InitialHistory::New => {}
519        }
520        self.running_threads
521            .insert(thread.id.clone(), thread.clone());
522        Ok(NewThread {
523            thread,
524            model: "auto".to_string(),
525            model_provider,
526            cwd,
527            approval_policy: None,
528            sandbox: None,
529        })
530    }
531
532    /// Resumes an existing thread, returning `None` if not found.
533    pub fn resume_thread_with_history(
534        &mut self,
535        params: &ThreadResumeParams,
536        fallback_cwd: &Path,
537        model_provider: String,
538    ) -> Result<Option<NewThread>> {
539        if params.history.is_none()
540            && let Some(thread) = self.running_threads.get(&params.thread_id).cloned()
541        {
542            return Ok(Some(NewThread {
543                model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
544                model_provider: params.model_provider.clone().unwrap_or(model_provider),
545                cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
546                approval_policy: params.approval_policy.clone(),
547                sandbox: params.sandbox.clone(),
548                thread,
549            }));
550        }
551
552        let persisted = self.store.get_thread(&params.thread_id)?;
553        let Some(metadata) = persisted else {
554            return Ok(None);
555        };
556        let mut thread = to_protocol_thread(metadata);
557        thread.status = ThreadStatus::Running;
558        thread.updated_at = chrono::Utc::now().timestamp();
559        thread.cwd = params
560            .cwd
561            .clone()
562            .unwrap_or_else(|| fallback_cwd.to_path_buf());
563        self.persist_thread(&thread, None)?;
564        self.running_threads
565            .insert(thread.id.clone(), thread.clone());
566        if let Some(history) = params.history.as_ref() {
567            for item in history {
568                self.store.append_message(
569                    &thread.id,
570                    "history",
571                    &item.to_string(),
572                    Some(item.clone()),
573                )?;
574            }
575        }
576
577        Ok(Some(NewThread {
578            model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
579            model_provider: params.model_provider.clone().unwrap_or(model_provider),
580            cwd: thread.cwd.clone(),
581            approval_policy: params.approval_policy.clone(),
582            sandbox: params.sandbox.clone(),
583            thread,
584        }))
585    }
586
587    /// Forks an existing thread into a new one, inheriting the parent's provider.
588    pub fn fork_thread(
589        &mut self,
590        params: &ThreadForkParams,
591        fallback_cwd: &Path,
592    ) -> Result<Option<NewThread>> {
593        let parent = self.store.get_thread(&params.thread_id)?;
594        let Some(parent) = parent else {
595            return Ok(None);
596        };
597        let parent_thread = to_protocol_thread(parent);
598        let new = self.spawn_thread_with_history(
599            params
600                .model_provider
601                .clone()
602                .unwrap_or_else(|| parent_thread.model_provider.clone()),
603            params
604                .cwd
605                .clone()
606                .unwrap_or_else(|| fallback_cwd.to_path_buf()),
607            InitialHistory::Forked(vec![json!({
608                "type": "fork",
609                "from_thread_id": parent_thread.id
610            })]),
611            params.persist_extended_history,
612        )?;
613        Ok(Some(new))
614    }
615
616    /// Lists threads matching the given filter parameters.
617    pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
618        let list = self.store.list_threads(ThreadListFilters {
619            include_archived: params.include_archived,
620            limit: params.limit,
621        })?;
622        Ok(list.into_iter().map(to_protocol_thread).collect())
623    }
624
625    /// Reads a single thread by id, or `None` if not found.
626    pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
627        Ok(self
628            .store
629            .get_thread(&params.thread_id)?
630            .map(to_protocol_thread))
631    }
632
633    /// Sets the display name for a thread, returning the updated thread or `None`.
634    pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
635        let Some(mut metadata) = self.store.get_thread(&params.thread_id)? else {
636            return Ok(None);
637        };
638        metadata.name = Some(params.name.clone());
639        metadata.updated_at = chrono::Utc::now().timestamp();
640        self.store.upsert_thread(&metadata)?;
641        let updated = to_protocol_thread(metadata);
642        self.running_threads
643            .insert(updated.id.clone(), updated.clone());
644        Ok(Some(updated))
645    }
646
647    /// Archives a thread so it no longer appears in default listings.
648    pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
649        self.store.mark_archived(thread_id)?;
650        if let Some(thread) = self.running_threads.get_mut(thread_id) {
651            thread.status = ThreadStatus::Archived;
652        }
653        Ok(())
654    }
655
656    /// Restores an archived thread to active status.
657    pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
658        self.store.mark_unarchived(thread_id)?;
659        Ok(())
660    }
661
662    /// Records a user message in a thread and updates its preview and timestamp.
663    pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
664        let Some(mut metadata) = self.store.get_thread(thread_id)? else {
665            return Ok(());
666        };
667        metadata.updated_at = chrono::Utc::now().timestamp();
668        metadata.preview = truncate_preview(input);
669        metadata.status = PersistedThreadStatus::Running;
670        self.store.upsert_thread(&metadata)?;
671        if let Some(thread) = self.running_threads.get_mut(thread_id) {
672            thread.updated_at = metadata.updated_at;
673            thread.preview = metadata.preview;
674            thread.status = ThreadStatus::Running;
675        }
676        let message_id = self.store.append_message(thread_id, "user", input, None)?;
677        self.store.save_checkpoint(
678            thread_id,
679            "latest",
680            &json!({
681                "reason": "thread_message",
682                "message_id": message_id,
683                "role": "user",
684                "preview": truncate_preview(input),
685                "updated_at": metadata.updated_at
686            }),
687        )?;
688        Ok(())
689    }
690
691    fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
692        self.store.upsert_thread(&ThreadMetadata {
693            id: thread.id.clone(),
694            rollout_path,
695            preview: thread.preview.clone(),
696            ephemeral: thread.ephemeral,
697            model_provider: thread.model_provider.clone(),
698            created_at: thread.created_at,
699            updated_at: thread.updated_at,
700            status: to_persisted_status(&thread.status),
701            path: thread.path.clone(),
702            cwd: thread.cwd.clone(),
703            cli_version: thread.cli_version.clone(),
704            source: to_persisted_source(&thread.source),
705            name: thread.name.clone(),
706            sandbox_policy: None,
707            approval_mode: None,
708            archived: matches!(thread.status, ThreadStatus::Archived),
709            archived_at: None,
710            git_sha: None,
711            git_branch: None,
712            git_origin_url: None,
713            memory_mode: None,
714            current_leaf_id: None,
715        })
716    }
717}
718
719/// Top-level runtime combining config, model registry, threads, tools, MCP, and hooks.
720pub struct Runtime {
721    /// Resolved application configuration.
722    pub config: ConfigToml,
723    /// Registry of available model providers.
724    pub model_registry: ModelRegistry,
725    /// Manages conversation thread lifecycle.
726    pub thread_manager: ThreadManager,
727    /// Registry of callable tools.
728    pub tool_registry: Arc<ToolRegistry>,
729    /// Manager for MCP server connections.
730    pub mcp_manager: Arc<McpManager>,
731    /// Engine for evaluating execution policy decisions.
732    pub exec_policy: ExecPolicyEngine,
733    /// Dispatcher for lifecycle hooks.
734    pub hooks: HookDispatcher,
735    /// Manager for background job lifecycle.
736    pub jobs: JobManager,
737}
738
739impl Runtime {
740    /// Constructs a new `Runtime`, loading existing jobs from the state store.
741    pub fn new(
742        config: ConfigToml,
743        model_registry: ModelRegistry,
744        state: StateStore,
745        tool_registry: Arc<ToolRegistry>,
746        mcp_manager: Arc<McpManager>,
747        exec_policy: ExecPolicyEngine,
748        hooks: HookDispatcher,
749    ) -> Self {
750        let mut jobs = JobManager::default();
751        if let Err(e) = jobs.load_from_store(&state) {
752            tracing::warn!("Failed to load job store, starting with empty job list: {e}");
753        }
754        Self {
755            config,
756            model_registry,
757            thread_manager: ThreadManager::new(state),
758            tool_registry,
759            mcp_manager,
760            exec_policy,
761            hooks,
762            jobs,
763        }
764    }
765
766    fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
767        let history = self
768            .thread_manager
769            .state_store()
770            .list_messages(thread_id, Some(500))?
771            .into_iter()
772            .map(|message| {
773                json!({
774                    "id": message.id,
775                    "role": message.role,
776                    "content": message.content,
777                    "item": message.item,
778                    "created_at": message.created_at
779                })
780            })
781            .collect::<Vec<_>>();
782
783        let checkpoint = self
784            .thread_manager
785            .state_store()
786            .load_checkpoint(thread_id, None)?
787            .map(|record| {
788                json!({
789                    "checkpoint_id": record.checkpoint_id,
790                    "state": record.state,
791                    "created_at": record.created_at
792                })
793            });
794
795        Ok(json!({
796            "history": history,
797            "checkpoint": checkpoint
798        }))
799    }
800
801    fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
802        self.thread_manager.state_store().save_checkpoint(
803            thread_id,
804            "latest",
805            &json!({
806                "reason": reason,
807                "saved_at": chrono::Utc::now().timestamp(),
808                "state": state
809            }),
810        )
811    }
812
813    /// Dispatches a thread request (create, start, resume, fork, list, read, etc.).
814    pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
815        match req {
816            ThreadRequest::Create { .. } => {
817                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
818                let new = self.thread_manager.spawn_thread_with_history(
819                    "deepseek".to_string(),
820                    cwd,
821                    InitialHistory::New,
822                    false,
823                )?;
824                let mut response = thread_response_from_new("created", new);
825                response.data = self.persisted_thread_data(&response.thread_id)?;
826                Ok(response)
827            }
828            ThreadRequest::Start(params) => {
829                let cwd = params.cwd.clone().unwrap_or_else(|| {
830                    std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
831                });
832                let new = self.thread_manager.spawn_thread_with_history(
833                    params
834                        .model_provider
835                        .clone()
836                        .unwrap_or_else(|| "deepseek".to_string()),
837                    cwd,
838                    InitialHistory::New,
839                    params.persist_extended_history,
840                )?;
841                let mut response = thread_response_from_new("started", new);
842                response.data = self.persisted_thread_data(&response.thread_id)?;
843                Ok(response)
844            }
845            ThreadRequest::Resume(params) => {
846                let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
847                if let Some(new) = self.thread_manager.resume_thread_with_history(
848                    &params,
849                    &fallback_cwd,
850                    "deepseek".to_string(),
851                )? {
852                    let mut response = thread_response_from_new("resumed", new);
853                    response.data = self.persisted_thread_data(&response.thread_id)?;
854                    Ok(response)
855                } else {
856                    Ok(ThreadResponse {
857                        thread_id: params.thread_id,
858                        status: "missing".to_string(),
859                        thread: None,
860                        threads: Vec::new(),
861                        model: None,
862                        model_provider: None,
863                        cwd: None,
864                        approval_policy: params.approval_policy,
865                        sandbox: params.sandbox,
866                        events: Vec::new(),
867                        data: json!({"error":"thread not found"}),
868                    })
869                }
870            }
871            ThreadRequest::Fork(params) => {
872                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
873                if let Some(new) = self.thread_manager.fork_thread(&params, &cwd)? {
874                    let mut response = thread_response_from_new("forked", new);
875                    response.data = self.persisted_thread_data(&response.thread_id)?;
876                    Ok(response)
877                } else {
878                    Ok(ThreadResponse {
879                        thread_id: params.thread_id,
880                        status: "missing".to_string(),
881                        thread: None,
882                        threads: Vec::new(),
883                        model: None,
884                        model_provider: None,
885                        cwd: None,
886                        approval_policy: params.approval_policy,
887                        sandbox: params.sandbox,
888                        events: Vec::new(),
889                        data: json!({"error":"thread not found"}),
890                    })
891                }
892            }
893            ThreadRequest::List(params) => Ok(ThreadResponse {
894                thread_id: "list".to_string(),
895                status: "ok".to_string(),
896                thread: None,
897                threads: self.thread_manager.list_threads(&params)?,
898                model: None,
899                model_provider: None,
900                cwd: None,
901                approval_policy: None,
902                sandbox: None,
903                events: Vec::new(),
904                data: json!({}),
905            }),
906            ThreadRequest::Read(params) => {
907                let id = params.thread_id.clone();
908                let data = self.persisted_thread_data(&id)?;
909                Ok(ThreadResponse {
910                    thread_id: id,
911                    status: "ok".to_string(),
912                    thread: self.thread_manager.read_thread(&params)?,
913                    threads: Vec::new(),
914                    model: None,
915                    model_provider: None,
916                    cwd: None,
917                    approval_policy: None,
918                    sandbox: None,
919                    events: Vec::new(),
920                    data,
921                })
922            }
923            ThreadRequest::SetName(params) => Ok(ThreadResponse {
924                thread_id: params.thread_id.clone(),
925                status: "ok".to_string(),
926                thread: self.thread_manager.set_thread_name(&params)?,
927                threads: Vec::new(),
928                model: None,
929                model_provider: None,
930                cwd: None,
931                approval_policy: None,
932                sandbox: None,
933                events: Vec::new(),
934                data: json!({}),
935            }),
936            ThreadRequest::Archive { thread_id } => {
937                self.thread_manager.archive_thread(&thread_id)?;
938                Ok(ThreadResponse {
939                    thread_id,
940                    status: "archived".to_string(),
941                    thread: None,
942                    threads: Vec::new(),
943                    model: None,
944                    model_provider: None,
945                    cwd: None,
946                    approval_policy: None,
947                    sandbox: None,
948                    events: Vec::new(),
949                    data: json!({}),
950                })
951            }
952            ThreadRequest::Unarchive { thread_id } => {
953                self.thread_manager.unarchive_thread(&thread_id)?;
954                Ok(ThreadResponse {
955                    thread_id,
956                    status: "unarchived".to_string(),
957                    thread: None,
958                    threads: Vec::new(),
959                    model: None,
960                    model_provider: None,
961                    cwd: None,
962                    approval_policy: None,
963                    sandbox: None,
964                    events: Vec::new(),
965                    data: json!({}),
966                })
967            }
968            ThreadRequest::Message { thread_id, input } => {
969                self.thread_manager.touch_message(&thread_id, &input)?;
970                let response_id = format!("{thread_id}:{}", input.len());
971                self.hooks
972                    .emit(HookEvent::ResponseStart {
973                        response_id: response_id.clone(),
974                    })
975                    .await;
976                self.hooks
977                    .emit(HookEvent::ResponseEnd {
978                        response_id: response_id.clone(),
979                    })
980                    .await;
981
982                Ok(ThreadResponse {
983                    thread_id,
984                    status: "accepted".to_string(),
985                    thread: None,
986                    threads: Vec::new(),
987                    model: None,
988                    model_provider: None,
989                    cwd: None,
990                    approval_policy: None,
991                    sandbox: None,
992                    events: vec![
993                        EventFrame::ResponseStart {
994                            response_id: response_id.clone(),
995                        },
996                        EventFrame::ResponseDelta {
997                            response_id: response_id.clone(),
998                            delta: "queued".to_string(),
999                            channel: ResponseChannel::Text,
1000                        },
1001                        EventFrame::ResponseEnd { response_id },
1002                    ],
1003                    data: json!({}),
1004                })
1005            }
1006        }
1007    }
1008
1009    /// Resolves the model for a prompt, records the message, and returns the response.
1010    pub async fn handle_prompt(
1011        &mut self,
1012        req: PromptRequest,
1013        cli_overrides: &CliRuntimeOverrides,
1014    ) -> Result<PromptResponse> {
1015        let resolved = self.config.resolve_runtime_options(cli_overrides);
1016        let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
1017        let selection = self
1018            .model_registry
1019            .resolve(Some(&requested_model), Some(resolved.provider));
1020        let resolved_model = selection.resolved.id.clone();
1021        let response_id = format!("resp-{}", Uuid::new_v4());
1022
1023        self.hooks
1024            .emit(HookEvent::ResponseStart {
1025                response_id: response_id.clone(),
1026            })
1027            .await;
1028        self.hooks
1029            .emit(HookEvent::ResponseDelta {
1030                response_id: response_id.clone(),
1031                delta: "model-selected".to_string(),
1032            })
1033            .await;
1034        self.hooks
1035            .emit(HookEvent::ResponseEnd {
1036                response_id: response_id.clone(),
1037            })
1038            .await;
1039
1040        let payload = json!({
1041            "provider": resolved.provider.as_str(),
1042            "model": resolved_model.clone(),
1043            "prompt": req.prompt,
1044            "telemetry": resolved.telemetry,
1045            "base_url": resolved.base_url,
1046            "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
1047            "approval_policy": resolved.approval_policy,
1048            "sandbox_mode": resolved.sandbox_mode
1049        });
1050        if let Some(thread_id) = req.thread_id.as_ref() {
1051            self.thread_manager.touch_message(thread_id, &req.prompt)?;
1052            let assistant_message_id = self.thread_manager.store.append_message(
1053                thread_id,
1054                "assistant",
1055                &payload.to_string(),
1056                Some(payload.clone()),
1057            )?;
1058            self.persist_latest_checkpoint(
1059                thread_id,
1060                "prompt_response",
1061                json!({
1062                    "response_id": response_id.clone(),
1063                    "model": resolved_model.clone(),
1064                    "provider": resolved.provider.as_str(),
1065                    "assistant_message_id": assistant_message_id
1066                }),
1067            )?;
1068        }
1069
1070        Ok(PromptResponse {
1071            output: payload.to_string(),
1072            model: resolved_model,
1073            events: vec![
1074                EventFrame::ResponseStart {
1075                    response_id: response_id.clone(),
1076                },
1077                EventFrame::ResponseDelta {
1078                    response_id: response_id.clone(),
1079                    delta: "model-selected".to_string(),
1080                    channel: ResponseChannel::Text,
1081                },
1082                EventFrame::ResponseEnd { response_id },
1083            ],
1084        })
1085    }
1086
1087    /// Evaluates execution policy and dispatches a tool call.
1088    pub async fn invoke_tool(
1089        &self,
1090        call: ToolCall,
1091        approval_mode: AskForApproval,
1092        cwd: &Path,
1093    ) -> Result<Value> {
1094        let fallback_cwd = cwd.display().to_string();
1095        let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
1096        let policy_tool = match &call.payload {
1097            ToolPayload::LocalShell { .. } => "exec_shell",
1098            _ => call.name.as_str(),
1099        };
1100        let policy_path = permission_path_for_call(&call);
1101        let decision = self.exec_policy.check(ExecPolicyContext {
1102            command: &command,
1103            cwd: &policy_cwd,
1104            tool: Some(policy_tool),
1105            path: policy_path.as_deref(),
1106            ask_for_approval: approval_mode,
1107            sandbox_mode: None,
1108        })?;
1109        let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
1110        let response_id = format!("tool-{}", Uuid::new_v4());
1111        let call_id = call
1112            .raw_tool_call_id
1113            .clone()
1114            .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
1115        self.hooks
1116            .emit(HookEvent::ToolLifecycle {
1117                response_id: response_id.clone(),
1118                tool_name: call.name.clone(),
1119                phase: "precheck".to_string(),
1120                payload: precheck.clone(),
1121            })
1122            .await;
1123
1124        if !decision.allow {
1125            let reason = decision.reason().to_string();
1126            let approval_id = format!("approval-{}", Uuid::new_v4());
1127            let error_frame = EventFrame::Error {
1128                response_id: response_id.clone(),
1129                message: reason.clone(),
1130            };
1131            self.hooks
1132                .emit(HookEvent::ApprovalLifecycle {
1133                    approval_id,
1134                    phase: "denied".to_string(),
1135                    reason: Some(reason.clone()),
1136                })
1137                .await;
1138            self.hooks
1139                .emit(HookEvent::GenericEventFrame {
1140                    frame: error_frame.clone(),
1141                })
1142                .await;
1143            return Ok(json!({
1144                "ok": false,
1145                "status": "denied",
1146                "execution_kind": execution_kind,
1147                "response_id": response_id,
1148                "precheck": precheck,
1149                "error": reason,
1150                "events": [event_frame_payload(&error_frame)],
1151            }));
1152        }
1153
1154        if decision.requires_approval {
1155            let approval_id = format!("approval-{}", Uuid::new_v4());
1156            let reason = decision.reason().to_string();
1157            let maybe_approval_frame = approval_request_frame(
1158                &decision.requirement,
1159                call_id,
1160                approval_id.clone(),
1161                response_id.clone(),
1162                command.clone(),
1163                policy_cwd.clone(),
1164            );
1165            self.hooks
1166                .emit(HookEvent::ApprovalLifecycle {
1167                    approval_id: approval_id.clone(),
1168                    phase: "requested".to_string(),
1169                    reason: Some(reason.clone()),
1170                })
1171                .await;
1172            let mut events = Vec::new();
1173            if let Some(frame) = maybe_approval_frame {
1174                self.hooks
1175                    .emit(HookEvent::GenericEventFrame {
1176                        frame: frame.clone(),
1177                    })
1178                    .await;
1179                events.push(event_frame_payload(&frame));
1180            }
1181            return Ok(json!({
1182                "ok": false,
1183                "status": "approval_required",
1184                "execution_kind": execution_kind,
1185                "response_id": response_id,
1186                "approval_id": approval_id,
1187                "precheck": precheck,
1188                "error": reason,
1189                "events": events,
1190            }));
1191        }
1192
1193        let start_frame = EventFrame::ToolCallStart {
1194            response_id: response_id.clone(),
1195            tool_name: call.name.clone(),
1196            arguments: tool_payload_value(&call.payload),
1197        };
1198        self.hooks
1199            .emit(HookEvent::GenericEventFrame {
1200                frame: start_frame.clone(),
1201            })
1202            .await;
1203        self.hooks
1204            .emit(HookEvent::ToolLifecycle {
1205                response_id: response_id.clone(),
1206                tool_name: call.name.clone(),
1207                phase: "dispatching".to_string(),
1208                payload: json!({
1209                    "call_id": call_id,
1210                    "execution_kind": execution_kind
1211                }),
1212            })
1213            .await;
1214
1215        match self.tool_registry.dispatch(call.clone(), true).await {
1216            Ok(tool_output) => {
1217                let result_frame = EventFrame::ToolCallResult {
1218                    response_id: response_id.clone(),
1219                    tool_name: call.name.clone(),
1220                    output: tool_output_value(&tool_output),
1221                };
1222                self.hooks
1223                    .emit(HookEvent::GenericEventFrame {
1224                        frame: result_frame.clone(),
1225                    })
1226                    .await;
1227                self.hooks
1228                    .emit(HookEvent::ToolLifecycle {
1229                        response_id: response_id.clone(),
1230                        tool_name: call.name,
1231                        phase: "completed".to_string(),
1232                        payload: json!({ "ok": true }),
1233                    })
1234                    .await;
1235                Ok(json!({
1236                    "ok": true,
1237                    "status": "completed",
1238                    "execution_kind": execution_kind,
1239                    "response_id": response_id,
1240                    "precheck": precheck,
1241                    "output": tool_output,
1242                    "events": [
1243                        event_frame_payload(&start_frame),
1244                        event_frame_payload(&result_frame)
1245                    ]
1246                }))
1247            }
1248            Err(err) => {
1249                let message = format!("{err:?}");
1250                let error_frame = EventFrame::Error {
1251                    response_id: response_id.clone(),
1252                    message: message.clone(),
1253                };
1254                self.hooks
1255                    .emit(HookEvent::GenericEventFrame {
1256                        frame: error_frame.clone(),
1257                    })
1258                    .await;
1259                self.hooks
1260                    .emit(HookEvent::ToolLifecycle {
1261                        response_id: response_id.clone(),
1262                        tool_name: call.name,
1263                        phase: "failed".to_string(),
1264                        payload: json!({ "error": message.clone() }),
1265                    })
1266                    .await;
1267                Ok(json!({
1268                    "ok": false,
1269                    "status": "failed",
1270                    "execution_kind": execution_kind,
1271                    "response_id": response_id,
1272                    "precheck": precheck,
1273                    "error": message,
1274                    "events": [
1275                        event_frame_payload(&start_frame),
1276                        event_frame_payload(&error_frame)
1277                    ]
1278                }))
1279            }
1280        }
1281    }
1282
1283    /// Starts all configured MCP servers and emits startup events via hooks.
1284    pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1285        let mut updates = Vec::new();
1286        let summary = self.mcp_manager.start_all(|update| {
1287            updates.push(update);
1288        });
1289        for update in updates {
1290            let status = match update.status {
1291                McpManagerStartupStatus::Starting => codewhale_protocol::McpStartupStatus::Starting,
1292                McpManagerStartupStatus::Ready => codewhale_protocol::McpStartupStatus::Ready,
1293                McpManagerStartupStatus::Failed { error } => {
1294                    codewhale_protocol::McpStartupStatus::Failed { error }
1295                }
1296                McpManagerStartupStatus::Cancelled => {
1297                    codewhale_protocol::McpStartupStatus::Cancelled
1298                }
1299            };
1300            self.hooks
1301                .emit(HookEvent::GenericEventFrame {
1302                    frame: EventFrame::McpStartupUpdate {
1303                        update: codewhale_protocol::McpStartupUpdateEvent {
1304                            server_name: update.server_name,
1305                            status,
1306                        },
1307                    },
1308                })
1309                .await;
1310        }
1311        self.hooks
1312            .emit(HookEvent::GenericEventFrame {
1313                frame: EventFrame::McpStartupComplete {
1314                    summary: codewhale_protocol::McpStartupCompleteEvent {
1315                        ready: summary.ready.clone(),
1316                        failed: summary
1317                            .failed
1318                            .iter()
1319                            .map(|f| codewhale_protocol::McpStartupFailure {
1320                                server_name: f.server_name.clone(),
1321                                error: f.error.clone(),
1322                            })
1323                            .collect(),
1324                        cancelled: summary.cancelled.clone(),
1325                    },
1326                },
1327            })
1328            .await;
1329        summary
1330    }
1331
1332    /// Returns the current application status including all jobs and their history.
1333    pub fn app_status(&self) -> AppResponse {
1334        let jobs = self.jobs.list();
1335        let events = jobs
1336            .iter()
1337            .flat_map(|job| {
1338                job.history.iter().map(|entry| EventFrame::ResponseDelta {
1339                    response_id: job.id.clone(),
1340                    delta: json!({
1341                        "kind": "job_transition",
1342                        "job_id": job.id.clone(),
1343                        "phase": entry.phase.clone(),
1344                        "status": job_status_to_str(entry.status),
1345                        "progress": entry.progress,
1346                        "detail": entry.detail.clone(),
1347                        "retry": job_retry_to_value(&entry.retry),
1348                        "at": entry.at
1349                    })
1350                    .to_string(),
1351                    channel: ResponseChannel::Text,
1352                })
1353            })
1354            .collect::<Vec<_>>();
1355        AppResponse {
1356            ok: true,
1357            data: json!({
1358                "jobs": jobs.into_iter().map(|job| {
1359                    json!({
1360                        "id": job.id,
1361                        "name": job.name,
1362                        "status": job_status_to_str(job.status),
1363                        "progress": job.progress,
1364                        "detail": job.detail,
1365                        "retry": job_retry_to_value(&job.retry),
1366                        "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1367                    })
1368                }).collect::<Vec<_>>()
1369            }),
1370            events,
1371        }
1372    }
1373
1374    /// Returns the default model provider from the resolved configuration.
1375    pub fn provider_default(&self) -> ProviderKind {
1376        self.config.provider
1377    }
1378
1379    /// Saves a named checkpoint for a thread.
1380    pub fn save_thread_checkpoint(
1381        &self,
1382        thread_id: &str,
1383        checkpoint_id: &str,
1384        state: &Value,
1385    ) -> Result<()> {
1386        self.thread_manager
1387            .state_store()
1388            .save_checkpoint(thread_id, checkpoint_id, state)
1389    }
1390
1391    /// Loads a checkpoint for a thread. Pass `None` for the latest.
1392    pub fn load_thread_checkpoint(
1393        &self,
1394        thread_id: &str,
1395        checkpoint_id: Option<&str>,
1396    ) -> Result<Option<Value>> {
1397        Ok(self
1398            .thread_manager
1399            .state_store()
1400            .load_checkpoint(thread_id, checkpoint_id)?
1401            .map(|checkpoint| checkpoint.state))
1402    }
1403
1404    /// Enqueues a new background job and persists it immediately.
1405    pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1406        let job = self.jobs.enqueue(name);
1407        self.jobs
1408            .persist_job(self.thread_manager.state_store(), &job.id)?;
1409        Ok(job)
1410    }
1411
1412    /// Transitions a job to running and persists the change.
1413    pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1414        self.jobs.set_running(job_id);
1415        self.jobs
1416            .persist_job(self.thread_manager.state_store(), job_id)
1417    }
1418
1419    /// Updates a job's progress and persists the change.
1420    pub fn update_job_progress(
1421        &mut self,
1422        job_id: &str,
1423        progress: u8,
1424        detail: Option<String>,
1425    ) -> Result<()> {
1426        self.jobs.update_progress(job_id, progress, detail);
1427        self.jobs
1428            .persist_job(self.thread_manager.state_store(), job_id)
1429    }
1430
1431    /// Marks a job as completed and persists the change.
1432    pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1433        self.jobs.complete(job_id);
1434        self.jobs
1435            .persist_job(self.thread_manager.state_store(), job_id)
1436    }
1437
1438    /// Marks a job as failed and persists the change.
1439    pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1440        self.jobs.fail(job_id, detail);
1441        self.jobs
1442            .persist_job(self.thread_manager.state_store(), job_id)
1443    }
1444
1445    /// Cancels a job and persists the change.
1446    pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1447        self.jobs.cancel(job_id);
1448        self.jobs
1449            .persist_job(self.thread_manager.state_store(), job_id)
1450    }
1451
1452    /// Pauses a job and persists the change.
1453    pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1454        self.jobs.pause(job_id, detail);
1455        self.jobs
1456            .persist_job(self.thread_manager.state_store(), job_id)
1457    }
1458
1459    /// Resumes a paused job and persists the change.
1460    pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1461        self.jobs.resume(job_id, detail);
1462        self.jobs
1463            .persist_job(self.thread_manager.state_store(), job_id)
1464    }
1465
1466    /// Returns the state-transition history for a job.
1467    pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
1468        self.jobs.history(job_id)
1469    }
1470}
1471
1472fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1473    ThreadResponse {
1474        thread_id: new.thread.id.clone(),
1475        status: status.to_string(),
1476        thread: Some(new.thread),
1477        threads: Vec::new(),
1478        model: Some(new.model),
1479        model_provider: Some(new.model_provider),
1480        cwd: Some(new.cwd),
1481        approval_policy: new.approval_policy,
1482        sandbox: new.sandbox,
1483        events: Vec::new(),
1484        data: json!({}),
1485    }
1486}
1487
1488fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1489    match initial_history {
1490        InitialHistory::New => "New conversation".to_string(),
1491        InitialHistory::Forked(items) => truncate_preview(
1492            &items
1493                .first()
1494                .map(Value::to_string)
1495                .unwrap_or_else(|| "Forked conversation".to_string()),
1496        ),
1497        InitialHistory::Resumed { history, .. } => truncate_preview(
1498            &history
1499                .first()
1500                .map(Value::to_string)
1501                .unwrap_or_else(|| "Resumed conversation".to_string()),
1502        ),
1503    }
1504}
1505
1506fn permission_path_for_call(call: &ToolCall) -> Option<String> {
1507    match &call.payload {
1508        ToolPayload::Function { arguments } => serde_json::from_str::<Value>(arguments)
1509            .ok()
1510            .and_then(|value| {
1511                value
1512                    .get("path")
1513                    .and_then(Value::as_str)
1514                    .map(str::to_string)
1515            }),
1516        ToolPayload::Mcp { raw_arguments, .. } => raw_arguments
1517            .get("path")
1518            .and_then(Value::as_str)
1519            .map(str::to_string),
1520        ToolPayload::Custom { .. } | ToolPayload::LocalShell { .. } => None,
1521    }
1522}
1523
/// Returns at most the first 120 characters (Unicode scalar values) of `value`.
fn truncate_preview(value: &str) -> String {
    // Byte offset at which the 121st character starts, if the text is that long.
    match value.char_indices().nth(120) {
        Some((cut, _)) => value[..cut].to_string(),
        None => value.to_string(),
    }
}
1527
1528fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1529    Thread {
1530        id: thread.id,
1531        preview: thread.preview,
1532        ephemeral: thread.ephemeral,
1533        model_provider: thread.model_provider,
1534        created_at: thread.created_at,
1535        updated_at: thread.updated_at,
1536        status: match thread.status {
1537            PersistedThreadStatus::Running => ThreadStatus::Running,
1538            PersistedThreadStatus::Idle => ThreadStatus::Idle,
1539            PersistedThreadStatus::Completed => ThreadStatus::Completed,
1540            PersistedThreadStatus::Failed => ThreadStatus::Failed,
1541            PersistedThreadStatus::Paused => ThreadStatus::Paused,
1542            PersistedThreadStatus::Archived => ThreadStatus::Archived,
1543        },
1544        path: thread.path,
1545        cwd: thread.cwd,
1546        cli_version: thread.cli_version,
1547        source: match thread.source {
1548            SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
1549            SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
1550            SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
1551            SessionSource::Api => codewhale_protocol::SessionSource::Api,
1552            SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
1553        },
1554        name: thread.name,
1555    }
1556}
1557
1558fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1559    match status {
1560        ThreadStatus::Running => PersistedThreadStatus::Running,
1561        ThreadStatus::Idle => PersistedThreadStatus::Idle,
1562        ThreadStatus::Completed => PersistedThreadStatus::Completed,
1563        ThreadStatus::Failed => PersistedThreadStatus::Failed,
1564        ThreadStatus::Paused => PersistedThreadStatus::Paused,
1565        ThreadStatus::Archived => PersistedThreadStatus::Archived,
1566    }
1567}
1568
1569fn to_persisted_source(source: &codewhale_protocol::SessionSource) -> SessionSource {
1570    match source {
1571        codewhale_protocol::SessionSource::Interactive => SessionSource::Interactive,
1572        codewhale_protocol::SessionSource::Resume => SessionSource::Resume,
1573        codewhale_protocol::SessionSource::Fork => SessionSource::Fork,
1574        codewhale_protocol::SessionSource::Api => SessionSource::Api,
1575        codewhale_protocol::SessionSource::Unknown => SessionSource::Unknown,
1576    }
1577}
1578
1579fn approval_request_frame(
1580    requirement: &ExecApprovalRequirement,
1581    call_id: String,
1582    approval_id: String,
1583    turn_id: String,
1584    command: String,
1585    cwd: String,
1586) -> Option<EventFrame> {
1587    let ExecApprovalRequirement::NeedsApproval {
1588        reason,
1589        proposed_execpolicy_amendment,
1590        proposed_network_policy_amendments,
1591    } = requirement
1592    else {
1593        return None;
1594    };
1595
1596    let mut available_decisions = vec![
1597        ReviewDecision::Approved,
1598        ReviewDecision::ApprovedForSession,
1599        ReviewDecision::Denied,
1600        ReviewDecision::Abort,
1601    ];
1602    if proposed_execpolicy_amendment
1603        .as_ref()
1604        .is_some_and(|amendment| !amendment.prefixes.is_empty())
1605    {
1606        available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1607    }
1608    available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1609        |amendment| ReviewDecision::NetworkPolicyAmendment {
1610            host: amendment.host,
1611            action: amendment.action,
1612        },
1613    ));
1614
1615    Some(EventFrame::ExecApprovalRequest {
1616        request: ExecApprovalRequestEvent {
1617            call_id,
1618            approval_id,
1619            turn_id,
1620            command,
1621            cwd,
1622            reason: reason.clone(),
1623            network_approval_context: None,
1624            proposed_execpolicy_amendment: proposed_execpolicy_amendment
1625                .as_ref()
1626                .map(|amendment| amendment.prefixes.clone())
1627                .unwrap_or_default(),
1628            proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1629            additional_permissions: Vec::new(),
1630            available_decisions,
1631        },
1632    })
1633}
1634
1635fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1636    match requirement {
1637        ExecApprovalRequirement::Skip {
1638            bypass_sandbox,
1639            proposed_execpolicy_amendment,
1640        } => json!({
1641            "type": "skip",
1642            "bypass_sandbox": bypass_sandbox,
1643            "reason": requirement.reason(),
1644            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1645                .as_ref()
1646                .map(|amendment| amendment.prefixes.clone())
1647                .unwrap_or_default()
1648        }),
1649        ExecApprovalRequirement::NeedsApproval {
1650            reason,
1651            proposed_execpolicy_amendment,
1652            proposed_network_policy_amendments,
1653        } => json!({
1654            "type": "needs_approval",
1655            "reason": reason,
1656            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1657                .as_ref()
1658                .map(|amendment| amendment.prefixes.clone())
1659                .unwrap_or_default(),
1660            "proposed_network_policy_amendments": proposed_network_policy_amendments
1661        }),
1662        ExecApprovalRequirement::Forbidden { reason } => json!({
1663            "type": "forbidden",
1664            "reason": reason
1665        }),
1666    }
1667}
1668
1669fn policy_precheck_payload(
1670    decision: &ExecPolicyDecision,
1671    command: &str,
1672    cwd: &str,
1673    execution_kind: &str,
1674) -> Value {
1675    json!({
1676        "execution_kind": execution_kind,
1677        "command": command,
1678        "cwd": cwd,
1679        "allow": decision.allow,
1680        "requires_approval": decision.requires_approval,
1681        "matched_rule": decision.matched_rule.clone(),
1682        "phase": decision.requirement.phase(),
1683        "reason": decision.reason(),
1684        "requirement": approval_requirement_payload(&decision.requirement)
1685    })
1686}
1687
1688fn tool_payload_value(payload: &ToolPayload) -> Value {
1689    serde_json::to_value(payload).unwrap_or_else(
1690        |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1691    )
1692}
1693
1694fn tool_output_value(output: &codewhale_protocol::ToolOutput) -> Value {
1695    serde_json::to_value(output).unwrap_or_else(
1696        |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1697    )
1698}
1699
1700fn event_frame_payload(frame: &EventFrame) -> Value {
1701    serde_json::to_value(frame)
1702        .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1703}
1704
1705fn json_optional_string(value: &Value) -> Option<String> {
1706    if value.is_null() {
1707        None
1708    } else {
1709        value.as_str().map(ToString::to_string)
1710    }
1711}
1712
1713fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
1714    let Some(value) = value else {
1715        return JobRetryMetadata::default();
1716    };
1717    JobRetryMetadata {
1718        attempt: value
1719            .get("attempt")
1720            .and_then(Value::as_u64)
1721            .unwrap_or(0)
1722            .min(u32::MAX as u64) as u32,
1723        max_attempts: value
1724            .get("max_attempts")
1725            .and_then(Value::as_u64)
1726            .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
1727            .min(u32::MAX as u64) as u32,
1728        backoff_base_ms: value
1729            .get("backoff_base_ms")
1730            .and_then(Value::as_u64)
1731            .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
1732        next_backoff_ms: value
1733            .get("next_backoff_ms")
1734            .and_then(Value::as_u64)
1735            .unwrap_or(0),
1736        next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
1737    }
1738}
1739
1740fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
1741    let status = value
1742        .get("status")
1743        .and_then(Value::as_str)
1744        .and_then(job_status_from_str)?;
1745    Some(JobHistoryEntry {
1746        at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
1747        phase: value
1748            .get("phase")
1749            .and_then(Value::as_str)
1750            .unwrap_or("unknown")
1751            .to_string(),
1752        status,
1753        progress: value
1754            .get("progress")
1755            .and_then(Value::as_u64)
1756            .map(|v| v.min(u8::MAX as u64) as u8),
1757        detail: value.get("detail").and_then(json_optional_string),
1758        retry: parse_retry_metadata(value.get("retry")),
1759    })
1760}
1761
1762fn job_status_to_str(status: JobStatus) -> &'static str {
1763    match status {
1764        JobStatus::Queued => "queued",
1765        JobStatus::Running => "running",
1766        JobStatus::Paused => "paused",
1767        JobStatus::Completed => "completed",
1768        JobStatus::Failed => "failed",
1769        JobStatus::Cancelled => "cancelled",
1770    }
1771}
1772
1773fn job_status_from_str(value: &str) -> Option<JobStatus> {
1774    match value {
1775        "queued" => Some(JobStatus::Queued),
1776        "running" => Some(JobStatus::Running),
1777        "paused" => Some(JobStatus::Paused),
1778        "completed" => Some(JobStatus::Completed),
1779        "failed" => Some(JobStatus::Failed),
1780        "cancelled" => Some(JobStatus::Cancelled),
1781        _ => None,
1782    }
1783}
1784
1785fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
1786    json!({
1787        "attempt": retry.attempt,
1788        "max_attempts": retry.max_attempts,
1789        "backoff_base_ms": retry.backoff_base_ms,
1790        "next_backoff_ms": retry.next_backoff_ms,
1791        "next_retry_at": retry.next_retry_at
1792    })
1793}
1794
1795fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
1796    json!({
1797        "at": entry.at,
1798        "phase": entry.phase.clone(),
1799        "status": job_status_to_str(entry.status),
1800        "progress": entry.progress,
1801        "detail": entry.detail.clone(),
1802        "retry": job_retry_to_value(&entry.retry)
1803    })
1804}
1805
1806fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
1807    match status {
1808        JobStatus::Queued => JobStateStatus::Queued,
1809        JobStatus::Running => JobStateStatus::Running,
1810        JobStatus::Paused => JobStateStatus::Running,
1811        JobStatus::Completed => JobStateStatus::Completed,
1812        JobStatus::Failed => JobStateStatus::Failed,
1813        JobStatus::Cancelled => JobStateStatus::Cancelled,
1814    }
1815}
1816
1817fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
1818    match status {
1819        JobStateStatus::Queued => JobStatus::Queued,
1820        JobStateStatus::Running => JobStatus::Running,
1821        JobStateStatus::Completed => JobStatus::Completed,
1822        JobStateStatus::Failed => JobStatus::Failed,
1823        JobStateStatus::Cancelled => JobStatus::Cancelled,
1824    }
1825}
1826
// Unit tests for `permission_path_for_call`, `JobManager`, and the free
// conversion / parsing helpers defined in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use codewhale_tools::ToolCallSource;

    // ── permission_path_for_call ───────────────────────────────────────

    // Function payloads carry their arguments as a JSON-encoded string.
    #[test]
    fn permission_path_for_call_extracts_function_path_argument() {
        let call = ToolCall {
            name: "read_file".to_string(),
            payload: ToolPayload::Function {
                arguments: json!({ "path": "README.md" }).to_string(),
            },
            source: ToolCallSource::Direct,
            raw_tool_call_id: None,
        };

        assert_eq!(
            permission_path_for_call(&call).as_deref(),
            Some("README.md")
        );
    }

    // MCP payloads carry their arguments as a structured JSON value.
    #[test]
    fn permission_path_for_call_extracts_mcp_path_argument() {
        let call = ToolCall {
            name: "mcp_fs_read".to_string(),
            payload: ToolPayload::Mcp {
                server: "fs".to_string(),
                tool: "read".to_string(),
                raw_arguments: json!({ "path": "secrets/token.txt" }),
                raw_tool_call_id: None,
            },
            source: ToolCallSource::Direct,
            raw_tool_call_id: None,
        };

        assert_eq!(
            permission_path_for_call(&call).as_deref(),
            Some("secrets/token.txt")
        );
    }

    // Shell payloads are expected to yield no permission path.
    #[test]
    fn permission_path_for_call_ignores_shell_payload() {
        let call = ToolCall {
            name: "exec_shell".to_string(),
            payload: ToolPayload::LocalShell {
                params: codewhale_protocol::LocalShellParams {
                    command: "cargo test".to_string(),
                    cwd: None,
                    timeout_ms: None,
                },
            },
            source: ToolCallSource::Direct,
            raw_tool_call_id: None,
        };

        assert_eq!(permission_path_for_call(&call), None);
    }

    // ── JobManager: lifecycle ──────────────────────────────────────────

    #[test]
    fn enqueue_creates_queued_job_with_zero_progress() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("build");
        assert_eq!(job.name, "build");
        assert_eq!(job.status, JobStatus::Queued);
        assert_eq!(job.progress, Some(0));
        assert!(job.detail.is_none());
        assert_eq!(job.history.len(), 1);
        assert_eq!(job.history[0].phase, "created");
    }

    #[test]
    fn set_running_transitions_from_queued() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("deploy");
        let id = job.id.clone();
        jm.set_running(&id);
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.status, JobStatus::Running);
        assert_eq!(updated.history.last().unwrap().phase, "running");
    }

    #[test]
    fn update_progress_clamps_to_100() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.update_progress(&id, 150, Some("over".to_string()));
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.progress, Some(100));
    }

    #[test]
    fn complete_sets_progress_to_100() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.complete(&id);
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.status, JobStatus::Completed);
        assert_eq!(updated.progress, Some(100));
    }

    #[test]
    fn fail_increments_attempt_and_sets_backoff() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("fragile");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.fail(&id, "crashed");
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.status, JobStatus::Failed);
        assert_eq!(updated.retry.attempt, 1);
        assert!(updated.retry.next_backoff_ms > 0);
        assert!(updated.retry.next_retry_at.is_some());
        assert_eq!(updated.detail.as_deref(), Some("crashed"));
    }

    // The inclusive range fails the job one more time than the configured
    // maximum; the attempt counter is expected to stop at the maximum and the
    // retry schedule to be cleared.
    #[test]
    fn fail_clears_retry_after_max_attempts() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("fragile");
        let id = job.id.clone();
        for _ in 0..=DEFAULT_JOB_MAX_ATTEMPTS {
            jm.set_running(&id);
            jm.fail(&id, "boom");
        }
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.retry.attempt, DEFAULT_JOB_MAX_ATTEMPTS);
        assert_eq!(updated.retry.next_backoff_ms, 0);
        assert!(updated.retry.next_retry_at.is_none());
    }

    #[test]
    fn cancel_sets_status_and_clears_retry() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.cancel(&id);
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.status, JobStatus::Cancelled);
        assert_eq!(updated.retry.next_backoff_ms, 0);
    }

    #[test]
    fn pause_and_resume_round_trip() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.pause(&id, Some("waiting".to_string()));
        let jobs = jm.list();
        let paused = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(paused.status, JobStatus::Paused);
        assert_eq!(paused.detail.as_deref(), Some("waiting"));

        jm.resume(&id, None);
        let jobs = jm.list();
        let resumed = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(resumed.status, JobStatus::Running);
        assert_eq!(resumed.history.last().unwrap().phase, "resumed");
    }

    // Only checks that adjacent entries are in non-increasing `updated_at`
    // order; jobs with equal timestamps satisfy it in any order.
    #[test]
    fn list_returns_jobs_sorted_by_updated_at_desc() {
        let mut jm = JobManager::default();
        jm.enqueue("first");
        jm.enqueue("second");
        jm.enqueue("third");
        let jobs = jm.list();
        assert_eq!(jobs.len(), 3);
        for window in jobs.windows(2) {
            assert!(window[0].updated_at >= window[1].updated_at);
        }
    }

    #[test]
    fn history_returns_entries_for_existing_job() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.complete(&id);
        let history = jm.history(&id);
        assert_eq!(history.len(), 3); // created, running, completed
        assert_eq!(history[0].phase, "created");
        assert_eq!(history[1].phase, "running");
        assert_eq!(history[2].phase, "completed");
    }

    #[test]
    fn history_returns_empty_for_unknown_job() {
        let jm = JobManager::default();
        assert!(jm.history("nonexistent").is_empty());
    }

    // Three jobs are set up (queued, running, completed); two are expected
    // back from `resume_pending`, both reset to `Queued`.
    #[test]
    fn resume_pending_requeues_running_and_queued() {
        let mut jm = JobManager::default();
        let _j1 = jm.enqueue("queued_task");
        let j2 = jm.enqueue("running_task");
        let j3 = jm.enqueue("completed_task");
        let id2 = j2.id.clone();
        let id3 = j3.id.clone();
        jm.set_running(&id2);
        jm.set_running(&id3);
        jm.complete(&id3);

        let resumed = jm.resume_pending();
        assert_eq!(resumed.len(), 2);
        for job in &resumed {
            assert_eq!(job.status, JobStatus::Queued);
        }
    }

    // ── JobManager: backoff ────────────────────────────────────────────

    #[test]
    fn deterministic_backoff_zero_on_first_attempt() {
        let retry = JobRetryMetadata {
            attempt: 0,
            ..Default::default()
        };
        assert_eq!(JobManager::deterministic_backoff_ms(&retry), 0);
    }

    // Expected delay is `base * 2^(attempt - 1)`, with the exponent capped at
    // 20 in the expectation.
    #[test]
    fn deterministic_backoff_exponential_growth() {
        let base = DEFAULT_JOB_BACKOFF_BASE_MS;
        for attempt in 1..=5 {
            let retry = JobRetryMetadata {
                attempt,
                backoff_base_ms: base,
                ..Default::default()
            };
            let expected = base * 2u64.pow(attempt.saturating_sub(1).min(20));
            assert_eq!(
                JobManager::deterministic_backoff_ms(&retry),
                expected,
                "attempt {attempt}"
            );
        }
    }

    // Smoke test only: the returned value is discarded, so this passes as
    // long as the call does not panic.
    #[test]
    fn deterministic_backoff_saturates_at_high_exponent() {
        let retry = JobRetryMetadata {
            attempt: 63,
            backoff_base_ms: 1000,
            ..Default::default()
        };
        // Should not panic; result saturates
        let _ = JobManager::deterministic_backoff_ms(&retry);
    }

    // ── JobManager: history truncation ─────────────────────────────────

    #[test]
    fn push_history_truncates_beyond_max() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        // Generate more history entries than the limit
        for i in 0..(MAX_JOB_HISTORY_ENTRIES + 20) {
            jm.update_progress(&id, (i % 100) as u8, Some(format!("step {i}")));
        }
        let history = jm.history(&id);
        assert_eq!(history.len(), MAX_JOB_HISTORY_ENTRIES);
    }

    // ── JobManager: persistence encoding/parsing ───────────────────────

    // Round-trips a failed job through encode/parse and compares status,
    // detail, attempt count and history length.
    #[test]
    fn encode_and_parse_persisted_detail_round_trip() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.fail(&id, "oops");
        let job = jm.list().into_iter().find(|j| j.id == id).unwrap();

        let encoded = JobManager::encode_persisted_detail(&job).unwrap().unwrap();
        let parsed = JobManager::parse_persisted_detail(Some(&encoded)).unwrap();

        assert_eq!(parsed.status, job.status);
        assert_eq!(parsed.detail, job.detail);
        assert_eq!(parsed.retry.attempt, job.retry.attempt);
        assert_eq!(parsed.history.len(), job.history.len());
    }

    #[test]
    fn parse_persisted_detail_returns_none_for_none_input() {
        assert!(JobManager::parse_persisted_detail(None).is_none());
    }

    #[test]
    fn parse_persisted_detail_returns_none_for_invalid_json() {
        assert!(JobManager::parse_persisted_detail(Some("not json")).is_none());
    }

    // ── Helper functions ───────────────────────────────────────────────

    // Every status must survive `job_status_to_str` -> `job_status_from_str`.
    #[test]
    fn job_status_round_trip_str() {
        let statuses = [
            JobStatus::Queued,
            JobStatus::Running,
            JobStatus::Paused,
            JobStatus::Completed,
            JobStatus::Failed,
            JobStatus::Cancelled,
        ];
        for status in &statuses {
            let s = job_status_to_str(*status);
            let parsed = job_status_from_str(s);
            assert_eq!(parsed, Some(*status), "round-trip failed for {s:?}");
        }
    }

    #[test]
    fn job_status_from_str_returns_none_for_unknown() {
        assert_eq!(job_status_from_str("unknown"), None);
        assert_eq!(job_status_from_str(""), None);
    }

    // The input is pure ASCII, so the byte length checked here equals the
    // character count.
    #[test]
    fn truncate_preview_limits_to_120_chars() {
        let long = "a".repeat(200);
        let truncated = truncate_preview(&long);
        assert_eq!(truncated.len(), 120);
    }

    #[test]
    fn truncate_preview_preserves_short_strings() {
        let short = "hello";
        assert_eq!(truncate_preview(short), "hello");
    }

    // `Paused` is expected to be stored as `Running`; all other statuses map
    // to the variant of the same name.
    #[test]
    fn runtime_status_to_job_state_maps_correctly() {
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Queued),
            JobStateStatus::Queued
        );
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Running),
            JobStateStatus::Running
        );
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Paused),
            JobStateStatus::Running
        );
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Completed),
            JobStateStatus::Completed
        );
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Failed),
            JobStateStatus::Failed
        );
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Cancelled),
            JobStateStatus::Cancelled
        );
    }

    #[test]
    fn job_state_status_to_runtime_maps_correctly() {
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Queued),
            JobStatus::Queued
        );
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Running),
            JobStatus::Running
        );
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Completed),
            JobStatus::Completed
        );
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Failed),
            JobStatus::Failed
        );
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Cancelled),
            JobStatus::Cancelled
        );
    }

    #[test]
    fn preview_from_initial_history_new() {
        let preview = preview_from_initial_history(&InitialHistory::New);
        assert_eq!(preview, "New conversation");
    }

    #[test]
    fn preview_from_initial_history_forked() {
        let preview = preview_from_initial_history(&InitialHistory::Forked(vec![json!("hello")]));
        assert!(preview.contains("hello"));
    }

    #[test]
    fn preview_from_initial_history_resumed() {
        let preview = preview_from_initial_history(&InitialHistory::Resumed {
            conversation_id: "test".to_string(),
            history: vec![json!("world")],
            rollout_path: PathBuf::from("/tmp/test"),
        });
        assert!(preview.contains("world"));
    }

    #[test]
    fn json_optional_string_handles_null() {
        assert!(json_optional_string(&Value::Null).is_none());
    }

    #[test]
    fn json_optional_string_handles_string() {
        assert_eq!(
            json_optional_string(&Value::String("hello".to_string())),
            Some("hello".to_string())
        );
    }

    #[test]
    fn json_optional_string_handles_non_string() {
        assert!(json_optional_string(&json!(42)).is_none());
    }

    #[test]
    fn parse_retry_metadata_returns_default_for_none() {
        let retry = parse_retry_metadata(None);
        assert_eq!(retry.attempt, 0);
        assert_eq!(retry.max_attempts, DEFAULT_JOB_MAX_ATTEMPTS);
        assert_eq!(retry.backoff_base_ms, DEFAULT_JOB_BACKOFF_BASE_MS);
    }

    #[test]
    fn parse_retry_metadata_parses_fields() {
        let value = json!({
            "attempt": 2,
            "max_attempts": 5,
            "backoff_base_ms": 1000,
            "next_backoff_ms": 2000,
            "next_retry_at": 1234567890i64
        });
        let retry = parse_retry_metadata(Some(&value));
        assert_eq!(retry.attempt, 2);
        assert_eq!(retry.max_attempts, 5);
        assert_eq!(retry.backoff_base_ms, 1000);
        assert_eq!(retry.next_backoff_ms, 2000);
        assert_eq!(retry.next_retry_at, Some(1234567890));
    }

    // `status` is the only field whose absence makes parsing fail.
    #[test]
    fn parse_history_entry_returns_none_without_status() {
        let value = json!({"at": 1, "phase": "test"});
        assert!(parse_history_entry(&value).is_none());
    }

    #[test]
    fn parse_history_entry_parses_valid_entry() {
        let value = json!({
            "at": 100,
            "phase": "running",
            "status": "running",
            "progress": 50,
            "detail": "working",
            "retry": {"attempt": 0, "max_attempts": 3, "backoff_base_ms": 500}
        });
        let entry = parse_history_entry(&value).unwrap();
        assert_eq!(entry.at, 100);
        assert_eq!(entry.phase, "running");
        assert_eq!(entry.status, JobStatus::Running);
        assert_eq!(entry.progress, Some(50));
        assert_eq!(entry.detail.as_deref(), Some("working"));
    }
}