// codewhale_core/lib.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use codewhale_agent::ModelRegistry;
7use codewhale_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use codewhale_execpolicy::{
9    AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10    ExecPolicyEngine,
11};
12use codewhale_hooks::{HookDispatcher, HookEvent};
13use codewhale_mcp::{
14    McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use codewhale_protocol::{
17    AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18    ResponseChannel, ReviewDecision, Thread, ThreadForkParams, ThreadListParams, ThreadReadParams,
19    ThreadRequest, ThreadResponse, ThreadResumeParams, ThreadSetNameParams, ThreadStatus,
20    ToolPayload,
21};
22use codewhale_state::{
23    JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadListFilters, ThreadMetadata,
24    ThreadStatus as PersistedThreadStatus,
25};
26use codewhale_tools::{ToolCall, ToolRegistry};
27use serde_json::{Value, json};
28use uuid::Uuid;
29
/// How a new thread's conversation history is initialized.
///
/// The variant also selects the session source recorded on the spawned
/// thread: interactive, fork, or resume.
#[derive(Debug, Clone)]
pub enum InitialHistory {
    /// Start with an empty conversation.
    New,
    /// Forked from an existing thread with the given history items.
    Forked(Vec<Value>),
    /// Resumed from a persisted thread with its full history.
    Resumed {
        /// Conversation identifier carried alongside the resumed history.
        conversation_id: String,
        /// History items to replay into the thread's message log.
        history: Vec<Value>,
        /// Rollout path associated with the resumed conversation.
        rollout_path: PathBuf,
    },
}
44
/// Result of spawning or resuming a thread.
#[derive(Debug, Clone)]
pub struct NewThread {
    /// The thread metadata.
    pub thread: Thread,
    /// Model identifier; `"auto"` when no specific model was requested.
    pub model: String,
    /// Provider that serves the model.
    pub model_provider: String,
    /// Working directory for the thread.
    pub cwd: PathBuf,
    /// Approval policy override, if any.
    pub approval_policy: Option<String>,
    /// Sandbox mode override, if any.
    pub sandbox: Option<String>,
}
61
/// Status of a background job.
///
/// `Queued` and `Running` are the states that `JobManager::resume_pending`
/// resets to `Queued`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Waiting to be picked up.
    Queued,
    /// Currently executing.
    Running,
    /// Temporarily paused.
    Paused,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
    /// Cancelled by the user.
    Cancelled,
}
78
/// Value written to the `schema_version` field of the encoded job detail JSON.
const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
/// Default retry limit used by `JobRetryMetadata::default`.
const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
/// Default backoff base, in milliseconds, used by `JobRetryMetadata::default`.
const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
/// Maximum history entries kept per job; the oldest entries are dropped first.
const MAX_JOB_HISTORY_ENTRIES: usize = 64;
83
/// Retry state for a job that failed and may be retried.
#[derive(Debug, Clone)]
pub struct JobRetryMetadata {
    /// Current attempt number (0 = not yet retried).
    pub attempt: u32,
    /// Maximum number of retry attempts before giving up.
    pub max_attempts: u32,
    /// Base delay in milliseconds for exponential backoff.
    pub backoff_base_ms: u64,
    /// Computed delay in milliseconds until the next retry (0 when nothing is scheduled).
    pub next_backoff_ms: u64,
    /// Unix timestamp (seconds) when the next retry should be attempted, if one is scheduled.
    pub next_retry_at: Option<i64>,
}
98
99impl Default for JobRetryMetadata {
100    fn default() -> Self {
101        Self {
102            attempt: 0,
103            max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
104            backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
105            next_backoff_ms: 0,
106            next_retry_at: None,
107        }
108    }
109}
110
/// A single entry in a job's history log.
///
/// Each entry is a snapshot of the job taken at the moment of a transition.
#[derive(Debug, Clone)]
pub struct JobHistoryEntry {
    /// Timestamp when this entry was recorded (the job's `updated_at` at that time).
    pub at: i64,
    /// Phase name (e.g., "created", "running", "failed").
    pub phase: String,
    /// Job status at this point in time.
    pub status: JobStatus,
    /// Progress percentage at this point, if available.
    pub progress: Option<u8>,
    /// Human-readable detail message.
    pub detail: Option<String>,
    /// Retry state snapshot at this point.
    pub retry: JobRetryMetadata,
}
127
/// Extended job state decoded from the JSON stored in a persisted job's
/// `detail` column.
#[derive(Debug, Clone)]
struct PersistedJobDetail {
    /// Runtime status recorded in the JSON document.
    pub status: JobStatus,
    /// Human-readable detail message, if one was stored.
    pub detail: Option<String>,
    /// Retry state recorded in the JSON document.
    pub retry: JobRetryMetadata,
    /// History entries that could be decoded from the JSON document.
    pub history: Vec<JobHistoryEntry>,
}
135
/// A complete job record with all metadata and history.
#[derive(Debug, Clone)]
pub struct JobRecord {
    /// Unique job identifier (`job-<uuid>` for jobs created by `JobManager::enqueue`).
    pub id: String,
    /// Human-readable job name.
    pub name: String,
    /// Current job status.
    pub status: JobStatus,
    /// Current progress percentage (0-100).
    pub progress: Option<u8>,
    /// Human-readable detail about the current state.
    pub detail: Option<String>,
    /// Retry state for failed jobs.
    pub retry: JobRetryMetadata,
    /// Chronological history of state transitions (bounded in length).
    pub history: Vec<JobHistoryEntry>,
    /// Timestamp when the job was created.
    pub created_at: i64,
    /// Timestamp of the last state change.
    pub updated_at: i64,
}
158
/// Manages background jobs with retry logic and persistence.
///
/// Jobs are held in memory; they reach the state store only through the
/// explicit `persist_job` / `persist_all` calls.
#[derive(Debug, Default)]
pub struct JobManager {
    /// In-memory job table keyed by job id.
    jobs: HashMap<String, JobRecord>,
}
164
165impl JobManager {
166    fn now_ts() -> i64 {
167        chrono::Utc::now().timestamp()
168    }
169
170    fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
171        if retry.attempt == 0 {
172            return 0;
173        }
174        let exponent = retry.attempt.saturating_sub(1).min(20);
175        let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
176        retry.backoff_base_ms.saturating_mul(multiplier)
177    }
178
179    fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
180        retry.next_backoff_ms = 0;
181        retry.next_retry_at = None;
182    }
183
184    fn push_history(job: &mut JobRecord, phase: &str) {
185        job.history.push(JobHistoryEntry {
186            at: job.updated_at,
187            phase: phase.to_string(),
188            status: job.status,
189            progress: job.progress,
190            detail: job.detail.clone(),
191            retry: job.retry.clone(),
192        });
193        if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
194            let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
195            job.history.drain(0..to_drain);
196        }
197    }
198
199    fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
200        let raw = raw?;
201        let parsed: Value = serde_json::from_str(raw).ok()?;
202        let status = parsed
203            .get("status")
204            .and_then(Value::as_str)
205            .and_then(job_status_from_str)?;
206        let detail = parsed.get("detail").and_then(json_optional_string);
207        let retry = parse_retry_metadata(parsed.get("retry"));
208        let history = parsed
209            .get("history")
210            .and_then(Value::as_array)
211            .map(|items| {
212                items
213                    .iter()
214                    .filter_map(parse_history_entry)
215                    .collect::<Vec<_>>()
216            })
217            .unwrap_or_default();
218        Some(PersistedJobDetail {
219            status,
220            detail,
221            retry,
222            history,
223        })
224    }
225
226    fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
227        let encoded = json!({
228            "schema_version": JOB_DETAIL_SCHEMA_VERSION,
229            "status": job_status_to_str(job.status),
230            "detail": job.detail.clone(),
231            "retry": job_retry_to_value(&job.retry),
232            "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
233        })
234        .to_string();
235        Ok(Some(encoded))
236    }
237
238    /// Enqueues a new job and returns its record.
239    pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
240        let now = Self::now_ts();
241        let id = format!("job-{}", Uuid::new_v4());
242        let mut job = JobRecord {
243            id: id.clone(),
244            name: name.into(),
245            status: JobStatus::Queued,
246            progress: Some(0),
247            detail: None,
248            retry: JobRetryMetadata::default(),
249            history: Vec::new(),
250            created_at: now,
251            updated_at: now,
252        };
253        Self::push_history(&mut job, "created");
254        self.jobs.insert(id, job.clone());
255        job
256    }
257
258    /// Transitions a job to running and clears its retry schedule.
259    pub fn set_running(&mut self, id: &str) {
260        if let Some(job) = self.jobs.get_mut(id) {
261            job.status = JobStatus::Running;
262            Self::clear_retry_schedule(&mut job.retry);
263            job.updated_at = Self::now_ts();
264            Self::push_history(job, "running");
265        }
266    }
267
268    /// Updates a job's progress (clamped to 100) and optional detail message.
269    pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
270        if let Some(job) = self.jobs.get_mut(id) {
271            job.progress = Some(progress.min(100));
272            job.detail = detail;
273            job.updated_at = Self::now_ts();
274            Self::push_history(job, "progress_updated");
275        }
276    }
277
278    /// Marks a job as completed with 100% progress and clears its retry schedule.
279    pub fn complete(&mut self, id: &str) {
280        if let Some(job) = self.jobs.get_mut(id) {
281            job.status = JobStatus::Completed;
282            job.progress = Some(100);
283            Self::clear_retry_schedule(&mut job.retry);
284            job.updated_at = Self::now_ts();
285            Self::push_history(job, "completed");
286        }
287    }
288
289    /// Marks a job as failed and schedules a retry if attempts remain.
290    pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
291        if let Some(job) = self.jobs.get_mut(id) {
292            let now = Self::now_ts();
293            job.status = JobStatus::Failed;
294            job.detail = Some(detail.into());
295            if job.retry.attempt < job.retry.max_attempts {
296                job.retry.attempt += 1;
297                job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
298                let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
299                    .min(i64::MAX as u64) as i64;
300                job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
301            } else {
302                Self::clear_retry_schedule(&mut job.retry);
303            }
304            job.updated_at = now;
305            Self::push_history(job, "failed");
306        }
307    }
308
309    /// Cancels a job and clears any pending retry schedule.
310    pub fn cancel(&mut self, id: &str) {
311        if let Some(job) = self.jobs.get_mut(id) {
312            job.status = JobStatus::Cancelled;
313            Self::clear_retry_schedule(&mut job.retry);
314            job.updated_at = Self::now_ts();
315            Self::push_history(job, "cancelled");
316        }
317    }
318
319    /// Pauses a job, optionally updating its detail message.
320    pub fn pause(&mut self, id: &str, detail: Option<String>) {
321        if let Some(job) = self.jobs.get_mut(id) {
322            job.status = JobStatus::Paused;
323            if detail.is_some() {
324                job.detail = detail;
325            }
326            job.updated_at = Self::now_ts();
327            Self::push_history(job, "paused");
328        }
329    }
330
331    /// Resumes a paused or failed job back to running status.
332    pub fn resume(&mut self, id: &str, detail: Option<String>) {
333        if let Some(job) = self.jobs.get_mut(id) {
334            job.status = JobStatus::Running;
335            if detail.is_some() {
336                job.detail = detail;
337            }
338            Self::clear_retry_schedule(&mut job.retry);
339            job.updated_at = Self::now_ts();
340            Self::push_history(job, "resumed");
341        }
342    }
343
344    /// Returns all jobs sorted by most recently updated first.
345    pub fn list(&self) -> Vec<JobRecord> {
346        let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
347        out.sort_by_key(|job| std::cmp::Reverse(job.updated_at));
348        out
349    }
350
351    /// Returns the history entries for a job, or an empty vec if not found.
352    pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
353        self.jobs
354            .get(id)
355            .map(|job| job.history.clone())
356            .unwrap_or_default()
357    }
358
359    /// Resets queued or running jobs back to queued on application resume.
360    pub fn resume_pending(&mut self) -> Vec<JobRecord> {
361        let mut resumed = Vec::new();
362        for job in self.jobs.values_mut() {
363            if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
364                job.status = JobStatus::Queued;
365                job.updated_at = Self::now_ts();
366                Self::push_history(job, "queued_after_resume");
367                resumed.push(job.clone());
368            }
369        }
370        resumed
371    }
372
373    /// Loads jobs from the state store, deserializing extended detail when available.
374    pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
375        let persisted = store.list_jobs(Some(500))?;
376        for job in persisted {
377            let fallback_status = job_state_status_to_runtime(job.status);
378            let parsed = Self::parse_persisted_detail(job.detail.as_deref());
379            let (status, detail, retry, history) = if let Some(detail_state) = parsed {
380                (
381                    detail_state.status,
382                    detail_state.detail,
383                    detail_state.retry,
384                    detail_state.history,
385                )
386            } else {
387                (
388                    fallback_status,
389                    job.detail,
390                    JobRetryMetadata::default(),
391                    Vec::new(),
392                )
393            };
394            self.jobs.insert(
395                job.id.clone(),
396                JobRecord {
397                    id: job.id,
398                    name: job.name,
399                    status,
400                    progress: job.progress,
401                    detail,
402                    retry,
403                    history,
404                    created_at: job.created_at,
405                    updated_at: job.updated_at,
406                },
407            );
408        }
409        Ok(())
410    }
411
412    /// Persists a single job's current state to the state store.
413    pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
414        let Some(job) = self.jobs.get(id) else {
415            return Ok(());
416        };
417        let encoded_detail = Self::encode_persisted_detail(job)?;
418        store.upsert_job(&JobStateRecord {
419            id: job.id.clone(),
420            name: job.name.clone(),
421            status: runtime_status_to_job_state(job.status),
422            progress: job.progress,
423            detail: encoded_detail,
424            created_at: job.created_at,
425            updated_at: job.updated_at,
426        })
427    }
428
429    /// Persists all in-memory jobs to the state store.
430    pub fn persist_all(&self, store: &StateStore) -> Result<()> {
431        for id in self.jobs.keys() {
432            self.persist_job(store, id)?;
433        }
434        Ok(())
435    }
436}
437
/// Manages thread lifecycle: spawn, resume, fork, archive, and persistence.
pub struct ThreadManager {
    /// Backing store for thread metadata, messages, and checkpoints.
    store: StateStore,
    /// In-memory copies of threads spawned or resumed through this manager, keyed by thread id.
    running_threads: HashMap<String, Thread>,
    /// Crate version captured at construction and stamped onto new threads.
    cli_version: String,
}
444
445impl ThreadManager {
446    /// Creates a new `ThreadManager` backed by the given state store.
447    pub fn new(store: StateStore) -> Self {
448        Self {
449            store,
450            running_threads: HashMap::new(),
451            cli_version: env!("CARGO_PKG_VERSION").to_string(),
452        }
453    }
454
    /// Returns a shared reference to the underlying state store.
    pub fn state_store(&self) -> &StateStore {
        &self.store
    }
459
460    /// Spawns a new thread with the given initial history and persists it.
461    pub fn spawn_thread_with_history(
462        &mut self,
463        model_provider: String,
464        cwd: PathBuf,
465        initial_history: InitialHistory,
466        persist_extended_history: bool,
467    ) -> Result<NewThread> {
468        let id = format!("thread-{}", Uuid::new_v4());
469        let now = chrono::Utc::now().timestamp();
470        let preview = preview_from_initial_history(&initial_history);
471        let source = match initial_history {
472            InitialHistory::New => SessionSource::Interactive,
473            InitialHistory::Forked(_) => SessionSource::Fork,
474            InitialHistory::Resumed { .. } => SessionSource::Resume,
475        };
476        let thread = Thread {
477            id: id.clone(),
478            preview,
479            ephemeral: !persist_extended_history,
480            model_provider: model_provider.clone(),
481            created_at: now,
482            updated_at: now,
483            status: ThreadStatus::Running,
484            path: None,
485            cwd: cwd.clone(),
486            cli_version: self.cli_version.clone(),
487            source: match source {
488                SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
489                SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
490                SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
491                SessionSource::Api => codewhale_protocol::SessionSource::Api,
492                SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
493            },
494            name: None,
495        };
496        self.persist_thread(&thread, None)?;
497        match &initial_history {
498            InitialHistory::Forked(items) => {
499                for item in items {
500                    self.store.append_message(
501                        &thread.id,
502                        "history",
503                        &item.to_string(),
504                        Some(item.clone()),
505                    )?;
506                }
507            }
508            InitialHistory::Resumed { history, .. } => {
509                for item in history {
510                    self.store.append_message(
511                        &thread.id,
512                        "history",
513                        &item.to_string(),
514                        Some(item.clone()),
515                    )?;
516                }
517            }
518            InitialHistory::New => {}
519        }
520        self.running_threads
521            .insert(thread.id.clone(), thread.clone());
522        Ok(NewThread {
523            thread,
524            model: "auto".to_string(),
525            model_provider,
526            cwd,
527            approval_policy: None,
528            sandbox: None,
529        })
530    }
531
532    /// Resumes an existing thread, returning `None` if not found.
533    pub fn resume_thread_with_history(
534        &mut self,
535        params: &ThreadResumeParams,
536        fallback_cwd: &Path,
537        model_provider: String,
538    ) -> Result<Option<NewThread>> {
539        if params.history.is_none()
540            && let Some(thread) = self.running_threads.get(&params.thread_id).cloned()
541        {
542            return Ok(Some(NewThread {
543                model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
544                model_provider: params.model_provider.clone().unwrap_or(model_provider),
545                cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
546                approval_policy: params.approval_policy.clone(),
547                sandbox: params.sandbox.clone(),
548                thread,
549            }));
550        }
551
552        let persisted = self.store.get_thread(&params.thread_id)?;
553        let Some(metadata) = persisted else {
554            return Ok(None);
555        };
556        let mut thread = to_protocol_thread(metadata);
557        thread.status = ThreadStatus::Running;
558        thread.updated_at = chrono::Utc::now().timestamp();
559        thread.cwd = params
560            .cwd
561            .clone()
562            .unwrap_or_else(|| fallback_cwd.to_path_buf());
563        self.persist_thread(&thread, None)?;
564        self.running_threads
565            .insert(thread.id.clone(), thread.clone());
566        if let Some(history) = params.history.as_ref() {
567            for item in history {
568                self.store.append_message(
569                    &thread.id,
570                    "history",
571                    &item.to_string(),
572                    Some(item.clone()),
573                )?;
574            }
575        }
576
577        Ok(Some(NewThread {
578            model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
579            model_provider: params.model_provider.clone().unwrap_or(model_provider),
580            cwd: thread.cwd.clone(),
581            approval_policy: params.approval_policy.clone(),
582            sandbox: params.sandbox.clone(),
583            thread,
584        }))
585    }
586
587    /// Forks an existing thread into a new one, inheriting the parent's provider.
588    pub fn fork_thread(
589        &mut self,
590        params: &ThreadForkParams,
591        fallback_cwd: &Path,
592    ) -> Result<Option<NewThread>> {
593        let parent = self.store.get_thread(&params.thread_id)?;
594        let Some(parent) = parent else {
595            return Ok(None);
596        };
597        let parent_thread = to_protocol_thread(parent);
598        let new = self.spawn_thread_with_history(
599            params
600                .model_provider
601                .clone()
602                .unwrap_or_else(|| parent_thread.model_provider.clone()),
603            params
604                .cwd
605                .clone()
606                .unwrap_or_else(|| fallback_cwd.to_path_buf()),
607            InitialHistory::Forked(vec![json!({
608                "type": "fork",
609                "from_thread_id": parent_thread.id
610            })]),
611            params.persist_extended_history,
612        )?;
613        Ok(Some(new))
614    }
615
616    /// Lists threads matching the given filter parameters.
617    pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
618        let list = self.store.list_threads(ThreadListFilters {
619            include_archived: params.include_archived,
620            limit: params.limit,
621        })?;
622        Ok(list.into_iter().map(to_protocol_thread).collect())
623    }
624
625    /// Reads a single thread by id, or `None` if not found.
626    pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
627        Ok(self
628            .store
629            .get_thread(&params.thread_id)?
630            .map(to_protocol_thread))
631    }
632
633    /// Sets the display name for a thread, returning the updated thread or `None`.
634    pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
635        let Some(mut metadata) = self.store.get_thread(&params.thread_id)? else {
636            return Ok(None);
637        };
638        metadata.name = Some(params.name.clone());
639        metadata.updated_at = chrono::Utc::now().timestamp();
640        self.store.upsert_thread(&metadata)?;
641        let updated = to_protocol_thread(metadata);
642        self.running_threads
643            .insert(updated.id.clone(), updated.clone());
644        Ok(Some(updated))
645    }
646
647    /// Archives a thread so it no longer appears in default listings.
648    pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
649        self.store.mark_archived(thread_id)?;
650        if let Some(thread) = self.running_threads.get_mut(thread_id) {
651            thread.status = ThreadStatus::Archived;
652        }
653        Ok(())
654    }
655
656    /// Restores an archived thread to active status.
657    pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
658        self.store.mark_unarchived(thread_id)?;
659        Ok(())
660    }
661
662    /// Records a user message in a thread and updates its preview and timestamp.
663    pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
664        let Some(mut metadata) = self.store.get_thread(thread_id)? else {
665            return Ok(());
666        };
667        metadata.updated_at = chrono::Utc::now().timestamp();
668        metadata.preview = truncate_preview(input);
669        metadata.status = PersistedThreadStatus::Running;
670        self.store.upsert_thread(&metadata)?;
671        if let Some(thread) = self.running_threads.get_mut(thread_id) {
672            thread.updated_at = metadata.updated_at;
673            thread.preview = metadata.preview;
674            thread.status = ThreadStatus::Running;
675        }
676        let message_id = self.store.append_message(thread_id, "user", input, None)?;
677        self.store.save_checkpoint(
678            thread_id,
679            "latest",
680            &json!({
681                "reason": "thread_message",
682                "message_id": message_id,
683                "role": "user",
684                "preview": truncate_preview(input),
685                "updated_at": metadata.updated_at
686            }),
687        )?;
688        Ok(())
689    }
690
691    fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
692        self.store.upsert_thread(&ThreadMetadata {
693            id: thread.id.clone(),
694            rollout_path,
695            preview: thread.preview.clone(),
696            ephemeral: thread.ephemeral,
697            model_provider: thread.model_provider.clone(),
698            created_at: thread.created_at,
699            updated_at: thread.updated_at,
700            status: to_persisted_status(&thread.status),
701            path: thread.path.clone(),
702            cwd: thread.cwd.clone(),
703            cli_version: thread.cli_version.clone(),
704            source: to_persisted_source(&thread.source),
705            name: thread.name.clone(),
706            sandbox_policy: None,
707            approval_mode: None,
708            archived: matches!(thread.status, ThreadStatus::Archived),
709            archived_at: None,
710            git_sha: None,
711            git_branch: None,
712            git_origin_url: None,
713            memory_mode: None,
714            current_leaf_id: None,
715        })
716    }
717}
718
/// Top-level runtime combining config, model registry, threads, tools, MCP, and hooks.
pub struct Runtime {
    /// Resolved application configuration.
    pub config: ConfigToml,
    /// Registry of available model providers.
    pub model_registry: ModelRegistry,
    /// Manages conversation thread lifecycle; also owns the state store.
    pub thread_manager: ThreadManager,
    /// Registry of callable tools.
    pub tool_registry: Arc<ToolRegistry>,
    /// Manager for MCP server connections.
    pub mcp_manager: Arc<McpManager>,
    /// Engine for evaluating execution policy decisions.
    pub exec_policy: ExecPolicyEngine,
    /// Dispatcher for lifecycle hooks.
    pub hooks: HookDispatcher,
    /// Manager for background job lifecycle.
    pub jobs: JobManager,
}
738
739impl Runtime {
740    /// Constructs a new `Runtime`, loading existing jobs from the state store.
741    pub fn new(
742        config: ConfigToml,
743        model_registry: ModelRegistry,
744        state: StateStore,
745        tool_registry: Arc<ToolRegistry>,
746        mcp_manager: Arc<McpManager>,
747        exec_policy: ExecPolicyEngine,
748        hooks: HookDispatcher,
749    ) -> Self {
750        let mut jobs = JobManager::default();
751        let _ = jobs.load_from_store(&state);
752        Self {
753            config,
754            model_registry,
755            thread_manager: ThreadManager::new(state),
756            tool_registry,
757            mcp_manager,
758            exec_policy,
759            hooks,
760            jobs,
761        }
762    }
763
764    fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
765        let history = self
766            .thread_manager
767            .state_store()
768            .list_messages(thread_id, Some(500))?
769            .into_iter()
770            .map(|message| {
771                json!({
772                    "id": message.id,
773                    "role": message.role,
774                    "content": message.content,
775                    "item": message.item,
776                    "created_at": message.created_at
777                })
778            })
779            .collect::<Vec<_>>();
780
781        let checkpoint = self
782            .thread_manager
783            .state_store()
784            .load_checkpoint(thread_id, None)?
785            .map(|record| {
786                json!({
787                    "checkpoint_id": record.checkpoint_id,
788                    "state": record.state,
789                    "created_at": record.created_at
790                })
791            });
792
793        Ok(json!({
794            "history": history,
795            "checkpoint": checkpoint
796        }))
797    }
798
799    fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
800        self.thread_manager.state_store().save_checkpoint(
801            thread_id,
802            "latest",
803            &json!({
804                "reason": reason,
805                "saved_at": chrono::Utc::now().timestamp(),
806                "state": state
807            }),
808        )
809    }
810
811    /// Dispatches a thread request (create, start, resume, fork, list, read, etc.).
812    pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
813        match req {
814            ThreadRequest::Create { .. } => {
815                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
816                let new = self.thread_manager.spawn_thread_with_history(
817                    "deepseek".to_string(),
818                    cwd,
819                    InitialHistory::New,
820                    false,
821                )?;
822                let mut response = thread_response_from_new("created", new);
823                response.data = self.persisted_thread_data(&response.thread_id)?;
824                Ok(response)
825            }
826            ThreadRequest::Start(params) => {
827                let cwd = params.cwd.clone().unwrap_or_else(|| {
828                    std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
829                });
830                let new = self.thread_manager.spawn_thread_with_history(
831                    params
832                        .model_provider
833                        .clone()
834                        .unwrap_or_else(|| "deepseek".to_string()),
835                    cwd,
836                    InitialHistory::New,
837                    params.persist_extended_history,
838                )?;
839                let mut response = thread_response_from_new("started", new);
840                response.data = self.persisted_thread_data(&response.thread_id)?;
841                Ok(response)
842            }
843            ThreadRequest::Resume(params) => {
844                let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
845                if let Some(new) = self.thread_manager.resume_thread_with_history(
846                    &params,
847                    &fallback_cwd,
848                    "deepseek".to_string(),
849                )? {
850                    let mut response = thread_response_from_new("resumed", new);
851                    response.data = self.persisted_thread_data(&response.thread_id)?;
852                    Ok(response)
853                } else {
854                    Ok(ThreadResponse {
855                        thread_id: params.thread_id,
856                        status: "missing".to_string(),
857                        thread: None,
858                        threads: Vec::new(),
859                        model: None,
860                        model_provider: None,
861                        cwd: None,
862                        approval_policy: params.approval_policy,
863                        sandbox: params.sandbox,
864                        events: Vec::new(),
865                        data: json!({"error":"thread not found"}),
866                    })
867                }
868            }
869            ThreadRequest::Fork(params) => {
870                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
871                if let Some(new) = self.thread_manager.fork_thread(&params, &cwd)? {
872                    let mut response = thread_response_from_new("forked", new);
873                    response.data = self.persisted_thread_data(&response.thread_id)?;
874                    Ok(response)
875                } else {
876                    Ok(ThreadResponse {
877                        thread_id: params.thread_id,
878                        status: "missing".to_string(),
879                        thread: None,
880                        threads: Vec::new(),
881                        model: None,
882                        model_provider: None,
883                        cwd: None,
884                        approval_policy: params.approval_policy,
885                        sandbox: params.sandbox,
886                        events: Vec::new(),
887                        data: json!({"error":"thread not found"}),
888                    })
889                }
890            }
891            ThreadRequest::List(params) => Ok(ThreadResponse {
892                thread_id: "list".to_string(),
893                status: "ok".to_string(),
894                thread: None,
895                threads: self.thread_manager.list_threads(&params)?,
896                model: None,
897                model_provider: None,
898                cwd: None,
899                approval_policy: None,
900                sandbox: None,
901                events: Vec::new(),
902                data: json!({}),
903            }),
904            ThreadRequest::Read(params) => {
905                let id = params.thread_id.clone();
906                let data = self.persisted_thread_data(&id)?;
907                Ok(ThreadResponse {
908                    thread_id: id,
909                    status: "ok".to_string(),
910                    thread: self.thread_manager.read_thread(&params)?,
911                    threads: Vec::new(),
912                    model: None,
913                    model_provider: None,
914                    cwd: None,
915                    approval_policy: None,
916                    sandbox: None,
917                    events: Vec::new(),
918                    data,
919                })
920            }
921            ThreadRequest::SetName(params) => Ok(ThreadResponse {
922                thread_id: params.thread_id.clone(),
923                status: "ok".to_string(),
924                thread: self.thread_manager.set_thread_name(&params)?,
925                threads: Vec::new(),
926                model: None,
927                model_provider: None,
928                cwd: None,
929                approval_policy: None,
930                sandbox: None,
931                events: Vec::new(),
932                data: json!({}),
933            }),
934            ThreadRequest::Archive { thread_id } => {
935                self.thread_manager.archive_thread(&thread_id)?;
936                Ok(ThreadResponse {
937                    thread_id,
938                    status: "archived".to_string(),
939                    thread: None,
940                    threads: Vec::new(),
941                    model: None,
942                    model_provider: None,
943                    cwd: None,
944                    approval_policy: None,
945                    sandbox: None,
946                    events: Vec::new(),
947                    data: json!({}),
948                })
949            }
950            ThreadRequest::Unarchive { thread_id } => {
951                self.thread_manager.unarchive_thread(&thread_id)?;
952                Ok(ThreadResponse {
953                    thread_id,
954                    status: "unarchived".to_string(),
955                    thread: None,
956                    threads: Vec::new(),
957                    model: None,
958                    model_provider: None,
959                    cwd: None,
960                    approval_policy: None,
961                    sandbox: None,
962                    events: Vec::new(),
963                    data: json!({}),
964                })
965            }
966            ThreadRequest::Message { thread_id, input } => {
967                self.thread_manager.touch_message(&thread_id, &input)?;
968                let response_id = format!("{thread_id}:{}", input.len());
969                self.hooks
970                    .emit(HookEvent::ResponseStart {
971                        response_id: response_id.clone(),
972                    })
973                    .await;
974                self.hooks
975                    .emit(HookEvent::ResponseEnd {
976                        response_id: response_id.clone(),
977                    })
978                    .await;
979
980                Ok(ThreadResponse {
981                    thread_id,
982                    status: "accepted".to_string(),
983                    thread: None,
984                    threads: Vec::new(),
985                    model: None,
986                    model_provider: None,
987                    cwd: None,
988                    approval_policy: None,
989                    sandbox: None,
990                    events: vec![
991                        EventFrame::ResponseStart {
992                            response_id: response_id.clone(),
993                        },
994                        EventFrame::ResponseDelta {
995                            response_id: response_id.clone(),
996                            delta: "queued".to_string(),
997                            channel: ResponseChannel::Text,
998                        },
999                        EventFrame::ResponseEnd { response_id },
1000                    ],
1001                    data: json!({}),
1002                })
1003            }
1004        }
1005    }
1006
1007    /// Resolves the model for a prompt, records the message, and returns the response.
1008    pub async fn handle_prompt(
1009        &mut self,
1010        req: PromptRequest,
1011        cli_overrides: &CliRuntimeOverrides,
1012    ) -> Result<PromptResponse> {
1013        let resolved = self.config.resolve_runtime_options(cli_overrides);
1014        let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
1015        let selection = self
1016            .model_registry
1017            .resolve(Some(&requested_model), Some(resolved.provider));
1018        let resolved_model = selection.resolved.id.clone();
1019        let response_id = format!("resp-{}", Uuid::new_v4());
1020
1021        self.hooks
1022            .emit(HookEvent::ResponseStart {
1023                response_id: response_id.clone(),
1024            })
1025            .await;
1026        self.hooks
1027            .emit(HookEvent::ResponseDelta {
1028                response_id: response_id.clone(),
1029                delta: "model-selected".to_string(),
1030            })
1031            .await;
1032        self.hooks
1033            .emit(HookEvent::ResponseEnd {
1034                response_id: response_id.clone(),
1035            })
1036            .await;
1037
1038        let payload = json!({
1039            "provider": resolved.provider.as_str(),
1040            "model": resolved_model.clone(),
1041            "prompt": req.prompt,
1042            "telemetry": resolved.telemetry,
1043            "base_url": resolved.base_url,
1044            "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
1045            "approval_policy": resolved.approval_policy,
1046            "sandbox_mode": resolved.sandbox_mode
1047        });
1048        if let Some(thread_id) = req.thread_id.as_ref() {
1049            self.thread_manager.touch_message(thread_id, &req.prompt)?;
1050            let assistant_message_id = self.thread_manager.store.append_message(
1051                thread_id,
1052                "assistant",
1053                &payload.to_string(),
1054                Some(payload.clone()),
1055            )?;
1056            self.persist_latest_checkpoint(
1057                thread_id,
1058                "prompt_response",
1059                json!({
1060                    "response_id": response_id.clone(),
1061                    "model": resolved_model.clone(),
1062                    "provider": resolved.provider.as_str(),
1063                    "assistant_message_id": assistant_message_id
1064                }),
1065            )?;
1066        }
1067
1068        Ok(PromptResponse {
1069            output: payload.to_string(),
1070            model: resolved_model,
1071            events: vec![
1072                EventFrame::ResponseStart {
1073                    response_id: response_id.clone(),
1074                },
1075                EventFrame::ResponseDelta {
1076                    response_id: response_id.clone(),
1077                    delta: "model-selected".to_string(),
1078                    channel: ResponseChannel::Text,
1079                },
1080                EventFrame::ResponseEnd { response_id },
1081            ],
1082        })
1083    }
1084
1085    /// Evaluates execution policy and dispatches a tool call.
1086    pub async fn invoke_tool(
1087        &self,
1088        call: ToolCall,
1089        approval_mode: AskForApproval,
1090        cwd: &Path,
1091    ) -> Result<Value> {
1092        let fallback_cwd = cwd.display().to_string();
1093        let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
1094        let policy_tool = match &call.payload {
1095            ToolPayload::LocalShell { .. } => "exec_shell",
1096            _ => call.name.as_str(),
1097        };
1098        let decision = self.exec_policy.check(ExecPolicyContext {
1099            command: &command,
1100            cwd: &policy_cwd,
1101            tool: Some(policy_tool),
1102            path: None,
1103            ask_for_approval: approval_mode,
1104            sandbox_mode: None,
1105        })?;
1106        let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
1107        let response_id = format!("tool-{}", Uuid::new_v4());
1108        let call_id = call
1109            .raw_tool_call_id
1110            .clone()
1111            .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
1112        self.hooks
1113            .emit(HookEvent::ToolLifecycle {
1114                response_id: response_id.clone(),
1115                tool_name: call.name.clone(),
1116                phase: "precheck".to_string(),
1117                payload: precheck.clone(),
1118            })
1119            .await;
1120
1121        if !decision.allow {
1122            let reason = decision.reason().to_string();
1123            let approval_id = format!("approval-{}", Uuid::new_v4());
1124            let error_frame = EventFrame::Error {
1125                response_id: response_id.clone(),
1126                message: reason.clone(),
1127            };
1128            self.hooks
1129                .emit(HookEvent::ApprovalLifecycle {
1130                    approval_id,
1131                    phase: "denied".to_string(),
1132                    reason: Some(reason.clone()),
1133                })
1134                .await;
1135            self.hooks
1136                .emit(HookEvent::GenericEventFrame {
1137                    frame: error_frame.clone(),
1138                })
1139                .await;
1140            return Ok(json!({
1141                "ok": false,
1142                "status": "denied",
1143                "execution_kind": execution_kind,
1144                "response_id": response_id,
1145                "precheck": precheck,
1146                "error": reason,
1147                "events": [event_frame_payload(&error_frame)],
1148            }));
1149        }
1150
1151        if decision.requires_approval {
1152            let approval_id = format!("approval-{}", Uuid::new_v4());
1153            let reason = decision.reason().to_string();
1154            let maybe_approval_frame = approval_request_frame(
1155                &decision.requirement,
1156                call_id,
1157                approval_id.clone(),
1158                response_id.clone(),
1159                command.clone(),
1160                policy_cwd.clone(),
1161            );
1162            self.hooks
1163                .emit(HookEvent::ApprovalLifecycle {
1164                    approval_id: approval_id.clone(),
1165                    phase: "requested".to_string(),
1166                    reason: Some(reason.clone()),
1167                })
1168                .await;
1169            let mut events = Vec::new();
1170            if let Some(frame) = maybe_approval_frame {
1171                self.hooks
1172                    .emit(HookEvent::GenericEventFrame {
1173                        frame: frame.clone(),
1174                    })
1175                    .await;
1176                events.push(event_frame_payload(&frame));
1177            }
1178            return Ok(json!({
1179                "ok": false,
1180                "status": "approval_required",
1181                "execution_kind": execution_kind,
1182                "response_id": response_id,
1183                "approval_id": approval_id,
1184                "precheck": precheck,
1185                "error": reason,
1186                "events": events,
1187            }));
1188        }
1189
1190        let start_frame = EventFrame::ToolCallStart {
1191            response_id: response_id.clone(),
1192            tool_name: call.name.clone(),
1193            arguments: tool_payload_value(&call.payload),
1194        };
1195        self.hooks
1196            .emit(HookEvent::GenericEventFrame {
1197                frame: start_frame.clone(),
1198            })
1199            .await;
1200        self.hooks
1201            .emit(HookEvent::ToolLifecycle {
1202                response_id: response_id.clone(),
1203                tool_name: call.name.clone(),
1204                phase: "dispatching".to_string(),
1205                payload: json!({
1206                    "call_id": call_id,
1207                    "execution_kind": execution_kind
1208                }),
1209            })
1210            .await;
1211
1212        match self.tool_registry.dispatch(call.clone(), true).await {
1213            Ok(tool_output) => {
1214                let result_frame = EventFrame::ToolCallResult {
1215                    response_id: response_id.clone(),
1216                    tool_name: call.name.clone(),
1217                    output: tool_output_value(&tool_output),
1218                };
1219                self.hooks
1220                    .emit(HookEvent::GenericEventFrame {
1221                        frame: result_frame.clone(),
1222                    })
1223                    .await;
1224                self.hooks
1225                    .emit(HookEvent::ToolLifecycle {
1226                        response_id: response_id.clone(),
1227                        tool_name: call.name,
1228                        phase: "completed".to_string(),
1229                        payload: json!({ "ok": true }),
1230                    })
1231                    .await;
1232                Ok(json!({
1233                    "ok": true,
1234                    "status": "completed",
1235                    "execution_kind": execution_kind,
1236                    "response_id": response_id,
1237                    "precheck": precheck,
1238                    "output": tool_output,
1239                    "events": [
1240                        event_frame_payload(&start_frame),
1241                        event_frame_payload(&result_frame)
1242                    ]
1243                }))
1244            }
1245            Err(err) => {
1246                let message = format!("{err:?}");
1247                let error_frame = EventFrame::Error {
1248                    response_id: response_id.clone(),
1249                    message: message.clone(),
1250                };
1251                self.hooks
1252                    .emit(HookEvent::GenericEventFrame {
1253                        frame: error_frame.clone(),
1254                    })
1255                    .await;
1256                self.hooks
1257                    .emit(HookEvent::ToolLifecycle {
1258                        response_id: response_id.clone(),
1259                        tool_name: call.name,
1260                        phase: "failed".to_string(),
1261                        payload: json!({ "error": message.clone() }),
1262                    })
1263                    .await;
1264                Ok(json!({
1265                    "ok": false,
1266                    "status": "failed",
1267                    "execution_kind": execution_kind,
1268                    "response_id": response_id,
1269                    "precheck": precheck,
1270                    "error": message,
1271                    "events": [
1272                        event_frame_payload(&start_frame),
1273                        event_frame_payload(&error_frame)
1274                    ]
1275                }))
1276            }
1277        }
1278    }
1279
1280    /// Starts all configured MCP servers and emits startup events via hooks.
1281    pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1282        let mut updates = Vec::new();
1283        let summary = self.mcp_manager.start_all(|update| {
1284            updates.push(update);
1285        });
1286        for update in updates {
1287            let status = match update.status {
1288                McpManagerStartupStatus::Starting => codewhale_protocol::McpStartupStatus::Starting,
1289                McpManagerStartupStatus::Ready => codewhale_protocol::McpStartupStatus::Ready,
1290                McpManagerStartupStatus::Failed { error } => {
1291                    codewhale_protocol::McpStartupStatus::Failed { error }
1292                }
1293                McpManagerStartupStatus::Cancelled => {
1294                    codewhale_protocol::McpStartupStatus::Cancelled
1295                }
1296            };
1297            self.hooks
1298                .emit(HookEvent::GenericEventFrame {
1299                    frame: EventFrame::McpStartupUpdate {
1300                        update: codewhale_protocol::McpStartupUpdateEvent {
1301                            server_name: update.server_name,
1302                            status,
1303                        },
1304                    },
1305                })
1306                .await;
1307        }
1308        self.hooks
1309            .emit(HookEvent::GenericEventFrame {
1310                frame: EventFrame::McpStartupComplete {
1311                    summary: codewhale_protocol::McpStartupCompleteEvent {
1312                        ready: summary.ready.clone(),
1313                        failed: summary
1314                            .failed
1315                            .iter()
1316                            .map(|f| codewhale_protocol::McpStartupFailure {
1317                                server_name: f.server_name.clone(),
1318                                error: f.error.clone(),
1319                            })
1320                            .collect(),
1321                        cancelled: summary.cancelled.clone(),
1322                    },
1323                },
1324            })
1325            .await;
1326        summary
1327    }
1328
1329    /// Returns the current application status including all jobs and their history.
1330    pub fn app_status(&self) -> AppResponse {
1331        let jobs = self.jobs.list();
1332        let events = jobs
1333            .iter()
1334            .flat_map(|job| {
1335                job.history.iter().map(|entry| EventFrame::ResponseDelta {
1336                    response_id: job.id.clone(),
1337                    delta: json!({
1338                        "kind": "job_transition",
1339                        "job_id": job.id.clone(),
1340                        "phase": entry.phase.clone(),
1341                        "status": job_status_to_str(entry.status),
1342                        "progress": entry.progress,
1343                        "detail": entry.detail.clone(),
1344                        "retry": job_retry_to_value(&entry.retry),
1345                        "at": entry.at
1346                    })
1347                    .to_string(),
1348                    channel: ResponseChannel::Text,
1349                })
1350            })
1351            .collect::<Vec<_>>();
1352        AppResponse {
1353            ok: true,
1354            data: json!({
1355                "jobs": jobs.into_iter().map(|job| {
1356                    json!({
1357                        "id": job.id,
1358                        "name": job.name,
1359                        "status": job_status_to_str(job.status),
1360                        "progress": job.progress,
1361                        "detail": job.detail,
1362                        "retry": job_retry_to_value(&job.retry),
1363                        "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1364                    })
1365                }).collect::<Vec<_>>()
1366            }),
1367            events,
1368        }
1369    }
1370
1371    /// Returns the default model provider from the resolved configuration.
1372    pub fn provider_default(&self) -> ProviderKind {
1373        self.config.provider
1374    }
1375
1376    /// Saves a named checkpoint for a thread.
1377    pub fn save_thread_checkpoint(
1378        &self,
1379        thread_id: &str,
1380        checkpoint_id: &str,
1381        state: &Value,
1382    ) -> Result<()> {
1383        self.thread_manager
1384            .state_store()
1385            .save_checkpoint(thread_id, checkpoint_id, state)
1386    }
1387
1388    /// Loads a checkpoint for a thread. Pass `None` for the latest.
1389    pub fn load_thread_checkpoint(
1390        &self,
1391        thread_id: &str,
1392        checkpoint_id: Option<&str>,
1393    ) -> Result<Option<Value>> {
1394        Ok(self
1395            .thread_manager
1396            .state_store()
1397            .load_checkpoint(thread_id, checkpoint_id)?
1398            .map(|checkpoint| checkpoint.state))
1399    }
1400
1401    /// Enqueues a new background job and persists it immediately.
1402    pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1403        let job = self.jobs.enqueue(name);
1404        self.jobs
1405            .persist_job(self.thread_manager.state_store(), &job.id)?;
1406        Ok(job)
1407    }
1408
1409    /// Transitions a job to running and persists the change.
1410    pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1411        self.jobs.set_running(job_id);
1412        self.jobs
1413            .persist_job(self.thread_manager.state_store(), job_id)
1414    }
1415
1416    /// Updates a job's progress and persists the change.
1417    pub fn update_job_progress(
1418        &mut self,
1419        job_id: &str,
1420        progress: u8,
1421        detail: Option<String>,
1422    ) -> Result<()> {
1423        self.jobs.update_progress(job_id, progress, detail);
1424        self.jobs
1425            .persist_job(self.thread_manager.state_store(), job_id)
1426    }
1427
1428    /// Marks a job as completed and persists the change.
1429    pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1430        self.jobs.complete(job_id);
1431        self.jobs
1432            .persist_job(self.thread_manager.state_store(), job_id)
1433    }
1434
1435    /// Marks a job as failed and persists the change.
1436    pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1437        self.jobs.fail(job_id, detail);
1438        self.jobs
1439            .persist_job(self.thread_manager.state_store(), job_id)
1440    }
1441
1442    /// Cancels a job and persists the change.
1443    pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1444        self.jobs.cancel(job_id);
1445        self.jobs
1446            .persist_job(self.thread_manager.state_store(), job_id)
1447    }
1448
1449    /// Pauses a job and persists the change.
1450    pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1451        self.jobs.pause(job_id, detail);
1452        self.jobs
1453            .persist_job(self.thread_manager.state_store(), job_id)
1454    }
1455
1456    /// Resumes a paused job and persists the change.
1457    pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1458        self.jobs.resume(job_id, detail);
1459        self.jobs
1460            .persist_job(self.thread_manager.state_store(), job_id)
1461    }
1462
1463    /// Returns the state-transition history for a job.
1464    pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
1465        self.jobs.history(job_id)
1466    }
1467}
1468
1469fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1470    ThreadResponse {
1471        thread_id: new.thread.id.clone(),
1472        status: status.to_string(),
1473        thread: Some(new.thread),
1474        threads: Vec::new(),
1475        model: Some(new.model),
1476        model_provider: Some(new.model_provider),
1477        cwd: Some(new.cwd),
1478        approval_policy: new.approval_policy,
1479        sandbox: new.sandbox,
1480        events: Vec::new(),
1481        data: json!({}),
1482    }
1483}
1484
1485fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1486    match initial_history {
1487        InitialHistory::New => "New conversation".to_string(),
1488        InitialHistory::Forked(items) => truncate_preview(
1489            &items
1490                .first()
1491                .map(Value::to_string)
1492                .unwrap_or_else(|| "Forked conversation".to_string()),
1493        ),
1494        InitialHistory::Resumed { history, .. } => truncate_preview(
1495            &history
1496                .first()
1497                .map(Value::to_string)
1498                .unwrap_or_else(|| "Resumed conversation".to_string()),
1499        ),
1500    }
1501}
1502
/// Returns at most the first 120 characters of `value`.
///
/// The limit counts Unicode scalar values (`char`s), not bytes, so the cut
/// never lands inside a multi-byte character.
fn truncate_preview(value: &str) -> String {
    // The byte offset of the 121st character, if any, is where the cut goes.
    match value.char_indices().nth(120) {
        Some((cut, _)) => value[..cut].to_string(),
        None => value.to_string(),
    }
}
1506
1507fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1508    Thread {
1509        id: thread.id,
1510        preview: thread.preview,
1511        ephemeral: thread.ephemeral,
1512        model_provider: thread.model_provider,
1513        created_at: thread.created_at,
1514        updated_at: thread.updated_at,
1515        status: match thread.status {
1516            PersistedThreadStatus::Running => ThreadStatus::Running,
1517            PersistedThreadStatus::Idle => ThreadStatus::Idle,
1518            PersistedThreadStatus::Completed => ThreadStatus::Completed,
1519            PersistedThreadStatus::Failed => ThreadStatus::Failed,
1520            PersistedThreadStatus::Paused => ThreadStatus::Paused,
1521            PersistedThreadStatus::Archived => ThreadStatus::Archived,
1522        },
1523        path: thread.path,
1524        cwd: thread.cwd,
1525        cli_version: thread.cli_version,
1526        source: match thread.source {
1527            SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
1528            SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
1529            SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
1530            SessionSource::Api => codewhale_protocol::SessionSource::Api,
1531            SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
1532        },
1533        name: thread.name,
1534    }
1535}
1536
1537fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1538    match status {
1539        ThreadStatus::Running => PersistedThreadStatus::Running,
1540        ThreadStatus::Idle => PersistedThreadStatus::Idle,
1541        ThreadStatus::Completed => PersistedThreadStatus::Completed,
1542        ThreadStatus::Failed => PersistedThreadStatus::Failed,
1543        ThreadStatus::Paused => PersistedThreadStatus::Paused,
1544        ThreadStatus::Archived => PersistedThreadStatus::Archived,
1545    }
1546}
1547
1548fn to_persisted_source(source: &codewhale_protocol::SessionSource) -> SessionSource {
1549    match source {
1550        codewhale_protocol::SessionSource::Interactive => SessionSource::Interactive,
1551        codewhale_protocol::SessionSource::Resume => SessionSource::Resume,
1552        codewhale_protocol::SessionSource::Fork => SessionSource::Fork,
1553        codewhale_protocol::SessionSource::Api => SessionSource::Api,
1554        codewhale_protocol::SessionSource::Unknown => SessionSource::Unknown,
1555    }
1556}
1557
1558fn approval_request_frame(
1559    requirement: &ExecApprovalRequirement,
1560    call_id: String,
1561    approval_id: String,
1562    turn_id: String,
1563    command: String,
1564    cwd: String,
1565) -> Option<EventFrame> {
1566    let ExecApprovalRequirement::NeedsApproval {
1567        reason,
1568        proposed_execpolicy_amendment,
1569        proposed_network_policy_amendments,
1570    } = requirement
1571    else {
1572        return None;
1573    };
1574
1575    let mut available_decisions = vec![
1576        ReviewDecision::Approved,
1577        ReviewDecision::ApprovedForSession,
1578        ReviewDecision::Denied,
1579        ReviewDecision::Abort,
1580    ];
1581    if proposed_execpolicy_amendment
1582        .as_ref()
1583        .is_some_and(|amendment| !amendment.prefixes.is_empty())
1584    {
1585        available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1586    }
1587    available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1588        |amendment| ReviewDecision::NetworkPolicyAmendment {
1589            host: amendment.host,
1590            action: amendment.action,
1591        },
1592    ));
1593
1594    Some(EventFrame::ExecApprovalRequest {
1595        request: ExecApprovalRequestEvent {
1596            call_id,
1597            approval_id,
1598            turn_id,
1599            command,
1600            cwd,
1601            reason: reason.clone(),
1602            network_approval_context: None,
1603            proposed_execpolicy_amendment: proposed_execpolicy_amendment
1604                .as_ref()
1605                .map(|amendment| amendment.prefixes.clone())
1606                .unwrap_or_default(),
1607            proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1608            additional_permissions: Vec::new(),
1609            available_decisions,
1610        },
1611    })
1612}
1613
1614fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1615    match requirement {
1616        ExecApprovalRequirement::Skip {
1617            bypass_sandbox,
1618            proposed_execpolicy_amendment,
1619        } => json!({
1620            "type": "skip",
1621            "bypass_sandbox": bypass_sandbox,
1622            "reason": requirement.reason(),
1623            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1624                .as_ref()
1625                .map(|amendment| amendment.prefixes.clone())
1626                .unwrap_or_default()
1627        }),
1628        ExecApprovalRequirement::NeedsApproval {
1629            reason,
1630            proposed_execpolicy_amendment,
1631            proposed_network_policy_amendments,
1632        } => json!({
1633            "type": "needs_approval",
1634            "reason": reason,
1635            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1636                .as_ref()
1637                .map(|amendment| amendment.prefixes.clone())
1638                .unwrap_or_default(),
1639            "proposed_network_policy_amendments": proposed_network_policy_amendments
1640        }),
1641        ExecApprovalRequirement::Forbidden { reason } => json!({
1642            "type": "forbidden",
1643            "reason": reason
1644        }),
1645    }
1646}
1647
1648fn policy_precheck_payload(
1649    decision: &ExecPolicyDecision,
1650    command: &str,
1651    cwd: &str,
1652    execution_kind: &str,
1653) -> Value {
1654    json!({
1655        "execution_kind": execution_kind,
1656        "command": command,
1657        "cwd": cwd,
1658        "allow": decision.allow,
1659        "requires_approval": decision.requires_approval,
1660        "matched_rule": decision.matched_rule.clone(),
1661        "phase": decision.requirement.phase(),
1662        "reason": decision.reason(),
1663        "requirement": approval_requirement_payload(&decision.requirement)
1664    })
1665}
1666
1667fn tool_payload_value(payload: &ToolPayload) -> Value {
1668    serde_json::to_value(payload).unwrap_or_else(
1669        |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1670    )
1671}
1672
1673fn tool_output_value(output: &codewhale_protocol::ToolOutput) -> Value {
1674    serde_json::to_value(output).unwrap_or_else(
1675        |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1676    )
1677}
1678
1679fn event_frame_payload(frame: &EventFrame) -> Value {
1680    serde_json::to_value(frame)
1681        .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1682}
1683
1684fn json_optional_string(value: &Value) -> Option<String> {
1685    if value.is_null() {
1686        None
1687    } else {
1688        value.as_str().map(ToString::to_string)
1689    }
1690}
1691
1692fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
1693    let Some(value) = value else {
1694        return JobRetryMetadata::default();
1695    };
1696    JobRetryMetadata {
1697        attempt: value
1698            .get("attempt")
1699            .and_then(Value::as_u64)
1700            .unwrap_or(0)
1701            .min(u32::MAX as u64) as u32,
1702        max_attempts: value
1703            .get("max_attempts")
1704            .and_then(Value::as_u64)
1705            .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
1706            .min(u32::MAX as u64) as u32,
1707        backoff_base_ms: value
1708            .get("backoff_base_ms")
1709            .and_then(Value::as_u64)
1710            .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
1711        next_backoff_ms: value
1712            .get("next_backoff_ms")
1713            .and_then(Value::as_u64)
1714            .unwrap_or(0),
1715        next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
1716    }
1717}
1718
1719fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
1720    let status = value
1721        .get("status")
1722        .and_then(Value::as_str)
1723        .and_then(job_status_from_str)?;
1724    Some(JobHistoryEntry {
1725        at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
1726        phase: value
1727            .get("phase")
1728            .and_then(Value::as_str)
1729            .unwrap_or("unknown")
1730            .to_string(),
1731        status,
1732        progress: value
1733            .get("progress")
1734            .and_then(Value::as_u64)
1735            .map(|v| v.min(u8::MAX as u64) as u8),
1736        detail: value.get("detail").and_then(json_optional_string),
1737        retry: parse_retry_metadata(value.get("retry")),
1738    })
1739}
1740
1741fn job_status_to_str(status: JobStatus) -> &'static str {
1742    match status {
1743        JobStatus::Queued => "queued",
1744        JobStatus::Running => "running",
1745        JobStatus::Paused => "paused",
1746        JobStatus::Completed => "completed",
1747        JobStatus::Failed => "failed",
1748        JobStatus::Cancelled => "cancelled",
1749    }
1750}
1751
1752fn job_status_from_str(value: &str) -> Option<JobStatus> {
1753    match value {
1754        "queued" => Some(JobStatus::Queued),
1755        "running" => Some(JobStatus::Running),
1756        "paused" => Some(JobStatus::Paused),
1757        "completed" => Some(JobStatus::Completed),
1758        "failed" => Some(JobStatus::Failed),
1759        "cancelled" => Some(JobStatus::Cancelled),
1760        _ => None,
1761    }
1762}
1763
1764fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
1765    json!({
1766        "attempt": retry.attempt,
1767        "max_attempts": retry.max_attempts,
1768        "backoff_base_ms": retry.backoff_base_ms,
1769        "next_backoff_ms": retry.next_backoff_ms,
1770        "next_retry_at": retry.next_retry_at
1771    })
1772}
1773
1774fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
1775    json!({
1776        "at": entry.at,
1777        "phase": entry.phase.clone(),
1778        "status": job_status_to_str(entry.status),
1779        "progress": entry.progress,
1780        "detail": entry.detail.clone(),
1781        "retry": job_retry_to_value(&entry.retry)
1782    })
1783}
1784
1785fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
1786    match status {
1787        JobStatus::Queued => JobStateStatus::Queued,
1788        JobStatus::Running => JobStateStatus::Running,
1789        JobStatus::Paused => JobStateStatus::Running,
1790        JobStatus::Completed => JobStateStatus::Completed,
1791        JobStatus::Failed => JobStateStatus::Failed,
1792        JobStatus::Cancelled => JobStateStatus::Cancelled,
1793    }
1794}
1795
1796fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
1797    match status {
1798        JobStateStatus::Queued => JobStatus::Queued,
1799        JobStateStatus::Running => JobStatus::Running,
1800        JobStateStatus::Completed => JobStatus::Completed,
1801        JobStateStatus::Failed => JobStatus::Failed,
1802        JobStateStatus::Cancelled => JobStatus::Cancelled,
1803    }
1804}
1805
// Unit tests for the job manager lifecycle, retry backoff, history
// truncation, persistence encoding, and the free helper functions above.
#[cfg(test)]
mod tests {
    use super::*;

    // ── JobManager: lifecycle ──────────────────────────────────────────

    #[test]
    fn enqueue_creates_queued_job_with_zero_progress() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("build");
        assert_eq!(job.name, "build");
        assert_eq!(job.status, JobStatus::Queued);
        assert_eq!(job.progress, Some(0));
        assert!(job.detail.is_none());
        assert_eq!(job.history.len(), 1);
        assert_eq!(job.history[0].phase, "created");
    }

    #[test]
    fn set_running_transitions_from_queued() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("deploy");
        let id = job.id.clone();
        jm.set_running(&id);
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.status, JobStatus::Running);
        assert_eq!(updated.history.last().unwrap().phase, "running");
    }

    #[test]
    fn update_progress_clamps_to_100() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.update_progress(&id, 150, Some("over".to_string()));
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.progress, Some(100));
    }

    #[test]
    fn complete_sets_progress_to_100() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.complete(&id);
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.status, JobStatus::Completed);
        assert_eq!(updated.progress, Some(100));
    }

    #[test]
    fn fail_increments_attempt_and_sets_backoff() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("fragile");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.fail(&id, "crashed");
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.status, JobStatus::Failed);
        assert_eq!(updated.retry.attempt, 1);
        assert!(updated.retry.next_backoff_ms > 0);
        assert!(updated.retry.next_retry_at.is_some());
        assert_eq!(updated.detail.as_deref(), Some("crashed"));
    }

    #[test]
    fn fail_clears_retry_after_max_attempts() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("fragile");
        let id = job.id.clone();
        // One failure more than the limit, so the retry budget is exhausted.
        for _ in 0..=DEFAULT_JOB_MAX_ATTEMPTS {
            jm.set_running(&id);
            jm.fail(&id, "boom");
        }
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.retry.attempt, DEFAULT_JOB_MAX_ATTEMPTS);
        assert_eq!(updated.retry.next_backoff_ms, 0);
        assert!(updated.retry.next_retry_at.is_none());
    }

    #[test]
    fn cancel_sets_status_and_clears_retry() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.cancel(&id);
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.status, JobStatus::Cancelled);
        assert_eq!(updated.retry.next_backoff_ms, 0);
    }

    #[test]
    fn pause_and_resume_round_trip() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.pause(&id, Some("waiting".to_string()));
        let jobs = jm.list();
        let paused = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(paused.status, JobStatus::Paused);
        assert_eq!(paused.detail.as_deref(), Some("waiting"));

        jm.resume(&id, None);
        let jobs = jm.list();
        let resumed = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(resumed.status, JobStatus::Running);
        assert_eq!(resumed.history.last().unwrap().phase, "resumed");
    }

    #[test]
    fn list_returns_jobs_sorted_by_updated_at_desc() {
        let mut jm = JobManager::default();
        jm.enqueue("first");
        jm.enqueue("second");
        jm.enqueue("third");
        let jobs = jm.list();
        assert_eq!(jobs.len(), 3);
        for window in jobs.windows(2) {
            assert!(window[0].updated_at >= window[1].updated_at);
        }
    }

    #[test]
    fn history_returns_entries_for_existing_job() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.complete(&id);
        let history = jm.history(&id);
        assert_eq!(history.len(), 3); // created, running, completed
        assert_eq!(history[0].phase, "created");
        assert_eq!(history[1].phase, "running");
        assert_eq!(history[2].phase, "completed");
    }

    #[test]
    fn history_returns_empty_for_unknown_job() {
        let jm = JobManager::default();
        assert!(jm.history("nonexistent").is_empty());
    }

    #[test]
    fn resume_pending_requeues_running_and_queued() {
        let mut jm = JobManager::default();
        let _j1 = jm.enqueue("queued_task");
        let j2 = jm.enqueue("running_task");
        let j3 = jm.enqueue("completed_task");
        let id2 = j2.id.clone();
        let id3 = j3.id.clone();
        jm.set_running(&id2);
        jm.set_running(&id3);
        jm.complete(&id3);

        // Only the queued and the running job are expected back.
        let resumed = jm.resume_pending();
        assert_eq!(resumed.len(), 2);
        for job in &resumed {
            assert_eq!(job.status, JobStatus::Queued);
        }
    }

    // ── JobManager: backoff ────────────────────────────────────────────

    #[test]
    fn deterministic_backoff_zero_on_first_attempt() {
        let retry = JobRetryMetadata {
            attempt: 0,
            ..Default::default()
        };
        assert_eq!(JobManager::deterministic_backoff_ms(&retry), 0);
    }

    #[test]
    fn deterministic_backoff_exponential_growth() {
        let base = DEFAULT_JOB_BACKOFF_BASE_MS;
        for attempt in 1..=5 {
            let retry = JobRetryMetadata {
                attempt,
                backoff_base_ms: base,
                ..Default::default()
            };
            let expected = base * 2u64.pow(attempt.saturating_sub(1).min(20));
            assert_eq!(
                JobManager::deterministic_backoff_ms(&retry),
                expected,
                "attempt {attempt}"
            );
        }
    }

    #[test]
    fn deterministic_backoff_saturates_at_high_exponent() {
        let retry = JobRetryMetadata {
            attempt: 63,
            backoff_base_ms: 1000,
            ..Default::default()
        };
        // Should not panic; result saturates
        let _ = JobManager::deterministic_backoff_ms(&retry);
    }

    // ── JobManager: history truncation ─────────────────────────────────

    #[test]
    fn push_history_truncates_beyond_max() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        // Generate more history entries than the limit
        for i in 0..(MAX_JOB_HISTORY_ENTRIES + 20) {
            jm.update_progress(&id, (i % 100) as u8, Some(format!("step {i}")));
        }
        let history = jm.history(&id);
        assert_eq!(history.len(), MAX_JOB_HISTORY_ENTRIES);
    }

    // ── JobManager: persistence encoding/parsing ───────────────────────

    #[test]
    fn encode_and_parse_persisted_detail_round_trip() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.fail(&id, "oops");
        let job = jm.list().into_iter().find(|j| j.id == id).unwrap();

        let encoded = JobManager::encode_persisted_detail(&job).unwrap().unwrap();
        let parsed = JobManager::parse_persisted_detail(Some(&encoded)).unwrap();

        assert_eq!(parsed.status, job.status);
        assert_eq!(parsed.detail, job.detail);
        assert_eq!(parsed.retry.attempt, job.retry.attempt);
        assert_eq!(parsed.history.len(), job.history.len());
    }

    #[test]
    fn parse_persisted_detail_returns_none_for_none_input() {
        assert!(JobManager::parse_persisted_detail(None).is_none());
    }

    #[test]
    fn parse_persisted_detail_returns_none_for_invalid_json() {
        assert!(JobManager::parse_persisted_detail(Some("not json")).is_none());
    }

    // ── Helper functions ───────────────────────────────────────────────

    #[test]
    fn job_status_round_trip_str() {
        let statuses = [
            JobStatus::Queued,
            JobStatus::Running,
            JobStatus::Paused,
            JobStatus::Completed,
            JobStatus::Failed,
            JobStatus::Cancelled,
        ];
        for status in &statuses {
            let s = job_status_to_str(*status);
            let parsed = job_status_from_str(s);
            assert_eq!(parsed, Some(*status), "round-trip failed for {s:?}");
        }
    }

    #[test]
    fn job_status_from_str_returns_none_for_unknown() {
        assert_eq!(job_status_from_str("unknown"), None);
        assert_eq!(job_status_from_str(""), None);
    }

    #[test]
    fn truncate_preview_limits_to_120_chars() {
        let long = "a".repeat(200);
        let truncated = truncate_preview(&long);
        assert_eq!(truncated.len(), 120);
    }

    #[test]
    fn truncate_preview_preserves_short_strings() {
        let short = "hello";
        assert_eq!(truncate_preview(short), "hello");
    }

    #[test]
    fn runtime_status_to_job_state_maps_correctly() {
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Queued),
            JobStateStatus::Queued
        );
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Running),
            JobStateStatus::Running
        );
        // Paused is persisted as Running.
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Paused),
            JobStateStatus::Running
        );
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Completed),
            JobStateStatus::Completed
        );
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Failed),
            JobStateStatus::Failed
        );
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Cancelled),
            JobStateStatus::Cancelled
        );
    }

    #[test]
    fn job_state_status_to_runtime_maps_correctly() {
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Queued),
            JobStatus::Queued
        );
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Running),
            JobStatus::Running
        );
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Completed),
            JobStatus::Completed
        );
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Failed),
            JobStatus::Failed
        );
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Cancelled),
            JobStatus::Cancelled
        );
    }

    #[test]
    fn preview_from_initial_history_new() {
        let preview = preview_from_initial_history(&InitialHistory::New);
        assert_eq!(preview, "New conversation");
    }

    #[test]
    fn preview_from_initial_history_forked() {
        let preview = preview_from_initial_history(&InitialHistory::Forked(vec![json!("hello")]));
        assert!(preview.contains("hello"));
    }

    #[test]
    fn preview_from_initial_history_resumed() {
        let preview = preview_from_initial_history(&InitialHistory::Resumed {
            conversation_id: "test".to_string(),
            history: vec![json!("world")],
            rollout_path: PathBuf::from("/tmp/test"),
        });
        assert!(preview.contains("world"));
    }

    #[test]
    fn json_optional_string_handles_null() {
        assert!(json_optional_string(&Value::Null).is_none());
    }

    #[test]
    fn json_optional_string_handles_string() {
        assert_eq!(
            json_optional_string(&Value::String("hello".to_string())),
            Some("hello".to_string())
        );
    }

    #[test]
    fn json_optional_string_handles_non_string() {
        assert!(json_optional_string(&json!(42)).is_none());
    }

    #[test]
    fn parse_retry_metadata_returns_default_for_none() {
        let retry = parse_retry_metadata(None);
        assert_eq!(retry.attempt, 0);
        assert_eq!(retry.max_attempts, DEFAULT_JOB_MAX_ATTEMPTS);
        assert_eq!(retry.backoff_base_ms, DEFAULT_JOB_BACKOFF_BASE_MS);
    }

    #[test]
    fn parse_retry_metadata_parses_fields() {
        let value = json!({
            "attempt": 2,
            "max_attempts": 5,
            "backoff_base_ms": 1000,
            "next_backoff_ms": 2000,
            "next_retry_at": 1234567890i64
        });
        let retry = parse_retry_metadata(Some(&value));
        assert_eq!(retry.attempt, 2);
        assert_eq!(retry.max_attempts, 5);
        assert_eq!(retry.backoff_base_ms, 1000);
        assert_eq!(retry.next_backoff_ms, 2000);
        assert_eq!(retry.next_retry_at, Some(1234567890));
    }

    #[test]
    fn parse_retry_metadata_saturates_oversized_counters() {
        // Counters that do not fit in a u32 clamp to u32::MAX instead of wrapping.
        let value = json!({"attempt": u64::MAX, "max_attempts": u64::MAX});
        let retry = parse_retry_metadata(Some(&value));
        assert_eq!(retry.attempt, u32::MAX);
        assert_eq!(retry.max_attempts, u32::MAX);
    }

    #[test]
    fn parse_history_entry_returns_none_without_status() {
        let value = json!({"at": 1, "phase": "test"});
        assert!(parse_history_entry(&value).is_none());
    }

    #[test]
    fn parse_history_entry_parses_valid_entry() {
        let value = json!({
            "at": 100,
            "phase": "running",
            "status": "running",
            "progress": 50,
            "detail": "working",
            "retry": {"attempt": 0, "max_attempts": 3, "backoff_base_ms": 500}
        });
        let entry = parse_history_entry(&value).unwrap();
        assert_eq!(entry.at, 100);
        assert_eq!(entry.phase, "running");
        assert_eq!(entry.status, JobStatus::Running);
        assert_eq!(entry.progress, Some(50));
        assert_eq!(entry.detail.as_deref(), Some("working"));
        // The nested retry object is parsed too; absent keys take their fallbacks.
        assert_eq!(entry.retry.attempt, 0);
        assert_eq!(entry.retry.max_attempts, 3);
        assert_eq!(entry.retry.backoff_base_ms, 500);
        assert_eq!(entry.retry.next_backoff_ms, 0);
        assert_eq!(entry.retry.next_retry_at, None);
    }

    #[test]
    fn parse_history_entry_saturates_oversized_progress() {
        // Progress values above u8::MAX clamp to u8::MAX instead of wrapping.
        let value = json!({"status": "running", "progress": 300});
        let entry = parse_history_entry(&value).unwrap();
        assert_eq!(entry.progress, Some(u8::MAX));
    }
}