Skip to main content

codewhale_core/
lib.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use codewhale_agent::ModelRegistry;
7use codewhale_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use codewhale_execpolicy::{
9    AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10    ExecPolicyEngine,
11};
12use codewhale_hooks::{HookDispatcher, HookEvent};
13use codewhale_mcp::{
14    McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use codewhale_protocol::{
17    AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18    ResponseChannel, ReviewDecision, Thread, ThreadForkParams, ThreadListParams, ThreadReadParams,
19    ThreadRequest, ThreadResponse, ThreadResumeParams, ThreadSetNameParams, ThreadStatus,
20    ToolPayload,
21};
22use codewhale_state::{
23    JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadListFilters, ThreadMetadata,
24    ThreadStatus as PersistedThreadStatus,
25};
26use codewhale_tools::{ToolCall, ToolRegistry};
27use serde_json::{Value, json};
28use uuid::Uuid;
29
/// History a thread starts with when it is spawned.
#[derive(Debug, Clone)]
pub enum InitialHistory {
    /// No prior history.
    New,
    /// History items for a forked thread; `spawn_thread_with_history` appends each
    /// item to the new thread as a `"history"` message.
    Forked(Vec<Value>),
    /// History carried over from an earlier conversation.
    Resumed {
        // NOTE(review): presumably the id of the conversation being resumed; it is
        // not read anywhere in the visible part of this file — confirm.
        conversation_id: String,
        /// Items appended to the new thread as `"history"` messages.
        history: Vec<Value>,
        // NOTE(review): presumably the rollout file of the resumed conversation; it
        // is not read anywhere in the visible part of this file — confirm.
        rollout_path: PathBuf,
    },
}
40
/// Result of spawning, resuming, or forking a thread: the thread itself plus the
/// settings that were resolved for it.
#[derive(Debug, Clone)]
pub struct NewThread {
    /// The protocol-level thread record.
    pub thread: Thread,
    /// Model name; `ThreadManager` uses `"auto"` when the request does not name one.
    pub model: String,
    /// Name of the model provider selected for the thread.
    pub model_provider: String,
    /// Working directory resolved for the thread.
    pub cwd: PathBuf,
    /// Approval policy passed through from the request, if any.
    pub approval_policy: Option<String>,
    /// Sandbox setting passed through from the request, if any.
    pub sandbox: Option<String>,
}
50
/// Runtime status of a job tracked by [`JobManager`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Set by `JobManager::enqueue` and `JobManager::resume_pending`.
    Queued,
    /// Set by `JobManager::set_running` and `JobManager::resume`.
    Running,
    /// Set by `JobManager::pause`.
    Paused,
    /// Set by `JobManager::complete`.
    Completed,
    /// Set by `JobManager::fail`.
    Failed,
    /// Set by `JobManager::cancel`.
    Cancelled,
}
60
/// Value written under the `schema_version` key of the encoded job detail.
const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
/// Default `max_attempts` of a fresh `JobRetryMetadata`.
const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
/// Default `backoff_base_ms` (milliseconds) of a fresh `JobRetryMetadata`.
const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
/// Maximum number of history entries kept per job; the oldest entries are dropped first.
const MAX_JOB_HISTORY_ENTRIES: usize = 64;
65
/// Retry bookkeeping attached to a job.
#[derive(Debug, Clone)]
pub struct JobRetryMetadata {
    /// Number of failures counted so far; incremented by `JobManager::fail`
    /// while it is below `max_attempts`.
    pub attempt: u32,
    /// Upper bound for `attempt`; once reached, `fail` stops scheduling retries.
    pub max_attempts: u32,
    /// Base delay in milliseconds; doubled for each attempt after the first.
    pub backoff_base_ms: u64,
    /// Delay in milliseconds computed for the next retry, or 0 when none is scheduled.
    pub next_backoff_ms: u64,
    /// Timestamp (same unit as `JobManager::now_ts`) at which the next retry is
    /// due, or `None` when no retry is scheduled.
    pub next_retry_at: Option<i64>,
}
74
75impl Default for JobRetryMetadata {
76    fn default() -> Self {
77        Self {
78            attempt: 0,
79            max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
80            backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
81            next_backoff_ms: 0,
82            next_retry_at: None,
83        }
84    }
85}
86
/// Snapshot of a job's state recorded each time the job changes.
#[derive(Debug, Clone)]
pub struct JobHistoryEntry {
    /// The job's `updated_at` value when the entry was recorded.
    pub at: i64,
    /// Label of the transition that produced the entry (e.g. `"created"`, `"failed"`).
    pub phase: String,
    /// Job status at the time of the entry.
    pub status: JobStatus,
    /// Job progress at the time of the entry.
    pub progress: Option<u8>,
    /// Job detail text at the time of the entry.
    pub detail: Option<String>,
    /// Copy of the job's retry metadata at the time of the entry.
    pub retry: JobRetryMetadata,
}
96
/// Decoded form of the JSON document that `JobManager` stores in a persisted
/// job's `detail` column.
#[derive(Debug, Clone)]
struct PersistedJobDetail {
    /// Runtime status read from the `status` key.
    pub status: JobStatus,
    /// Detail text read from the `detail` key.
    pub detail: Option<String>,
    /// Retry metadata read from the `retry` key.
    pub retry: JobRetryMetadata,
    /// History entries read from the `history` key (empty when absent).
    pub history: Vec<JobHistoryEntry>,
}
104
/// In-memory record of a single job.
#[derive(Debug, Clone)]
pub struct JobRecord {
    /// Job identifier; `JobManager::enqueue` generates it as `job-<uuid>`.
    pub id: String,
    /// Name supplied when the job was enqueued.
    pub name: String,
    /// Current status.
    pub status: JobStatus,
    /// Progress value; `update_progress` caps it at 100 and `complete` sets it to 100.
    pub progress: Option<u8>,
    /// Free-form detail text (progress note, failure message, ...).
    pub detail: Option<String>,
    /// Retry bookkeeping.
    pub retry: JobRetryMetadata,
    /// State snapshots, oldest first, capped at `MAX_JOB_HISTORY_ENTRIES`.
    pub history: Vec<JobHistoryEntry>,
    /// Timestamp taken when the job was created.
    pub created_at: i64,
    /// Timestamp of the most recent state change.
    pub updated_at: i64,
}
117
/// In-memory registry of jobs, with helpers to load it from and persist it to a
/// `StateStore`.
#[derive(Debug, Default)]
pub struct JobManager {
    /// Jobs keyed by their `id`.
    jobs: HashMap<String, JobRecord>,
}
122
123impl JobManager {
124    fn now_ts() -> i64 {
125        chrono::Utc::now().timestamp()
126    }
127
128    fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
129        if retry.attempt == 0 {
130            return 0;
131        }
132        let exponent = retry.attempt.saturating_sub(1).min(20);
133        let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
134        retry.backoff_base_ms.saturating_mul(multiplier)
135    }
136
137    fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
138        retry.next_backoff_ms = 0;
139        retry.next_retry_at = None;
140    }
141
142    fn push_history(job: &mut JobRecord, phase: &str) {
143        job.history.push(JobHistoryEntry {
144            at: job.updated_at,
145            phase: phase.to_string(),
146            status: job.status,
147            progress: job.progress,
148            detail: job.detail.clone(),
149            retry: job.retry.clone(),
150        });
151        if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
152            let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
153            job.history.drain(0..to_drain);
154        }
155    }
156
157    fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
158        let raw = raw?;
159        let parsed: Value = serde_json::from_str(raw).ok()?;
160        let status = parsed
161            .get("status")
162            .and_then(Value::as_str)
163            .and_then(job_status_from_str)?;
164        let detail = parsed.get("detail").and_then(json_optional_string);
165        let retry = parse_retry_metadata(parsed.get("retry"));
166        let history = parsed
167            .get("history")
168            .and_then(Value::as_array)
169            .map(|items| {
170                items
171                    .iter()
172                    .filter_map(parse_history_entry)
173                    .collect::<Vec<_>>()
174            })
175            .unwrap_or_default();
176        Some(PersistedJobDetail {
177            status,
178            detail,
179            retry,
180            history,
181        })
182    }
183
184    fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
185        let encoded = json!({
186            "schema_version": JOB_DETAIL_SCHEMA_VERSION,
187            "status": job_status_to_str(job.status),
188            "detail": job.detail.clone(),
189            "retry": job_retry_to_value(&job.retry),
190            "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
191        })
192        .to_string();
193        Ok(Some(encoded))
194    }
195
196    pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
197        let now = Self::now_ts();
198        let id = format!("job-{}", Uuid::new_v4());
199        let mut job = JobRecord {
200            id: id.clone(),
201            name: name.into(),
202            status: JobStatus::Queued,
203            progress: Some(0),
204            detail: None,
205            retry: JobRetryMetadata::default(),
206            history: Vec::new(),
207            created_at: now,
208            updated_at: now,
209        };
210        Self::push_history(&mut job, "created");
211        self.jobs.insert(id, job.clone());
212        job
213    }
214
215    pub fn set_running(&mut self, id: &str) {
216        if let Some(job) = self.jobs.get_mut(id) {
217            job.status = JobStatus::Running;
218            Self::clear_retry_schedule(&mut job.retry);
219            job.updated_at = Self::now_ts();
220            Self::push_history(job, "running");
221        }
222    }
223
224    pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
225        if let Some(job) = self.jobs.get_mut(id) {
226            job.progress = Some(progress.min(100));
227            job.detail = detail;
228            job.updated_at = Self::now_ts();
229            Self::push_history(job, "progress_updated");
230        }
231    }
232
233    pub fn complete(&mut self, id: &str) {
234        if let Some(job) = self.jobs.get_mut(id) {
235            job.status = JobStatus::Completed;
236            job.progress = Some(100);
237            Self::clear_retry_schedule(&mut job.retry);
238            job.updated_at = Self::now_ts();
239            Self::push_history(job, "completed");
240        }
241    }
242
243    pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
244        if let Some(job) = self.jobs.get_mut(id) {
245            let now = Self::now_ts();
246            job.status = JobStatus::Failed;
247            job.detail = Some(detail.into());
248            if job.retry.attempt < job.retry.max_attempts {
249                job.retry.attempt += 1;
250                job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
251                let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
252                    .min(i64::MAX as u64) as i64;
253                job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
254            } else {
255                Self::clear_retry_schedule(&mut job.retry);
256            }
257            job.updated_at = now;
258            Self::push_history(job, "failed");
259        }
260    }
261
262    pub fn cancel(&mut self, id: &str) {
263        if let Some(job) = self.jobs.get_mut(id) {
264            job.status = JobStatus::Cancelled;
265            Self::clear_retry_schedule(&mut job.retry);
266            job.updated_at = Self::now_ts();
267            Self::push_history(job, "cancelled");
268        }
269    }
270
271    pub fn pause(&mut self, id: &str, detail: Option<String>) {
272        if let Some(job) = self.jobs.get_mut(id) {
273            job.status = JobStatus::Paused;
274            if detail.is_some() {
275                job.detail = detail;
276            }
277            job.updated_at = Self::now_ts();
278            Self::push_history(job, "paused");
279        }
280    }
281
282    pub fn resume(&mut self, id: &str, detail: Option<String>) {
283        if let Some(job) = self.jobs.get_mut(id) {
284            job.status = JobStatus::Running;
285            if detail.is_some() {
286                job.detail = detail;
287            }
288            Self::clear_retry_schedule(&mut job.retry);
289            job.updated_at = Self::now_ts();
290            Self::push_history(job, "resumed");
291        }
292    }
293
294    pub fn list(&self) -> Vec<JobRecord> {
295        let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
296        out.sort_by_key(|job| std::cmp::Reverse(job.updated_at));
297        out
298    }
299
300    pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
301        self.jobs
302            .get(id)
303            .map(|job| job.history.clone())
304            .unwrap_or_default()
305    }
306
307    pub fn resume_pending(&mut self) -> Vec<JobRecord> {
308        let mut resumed = Vec::new();
309        for job in self.jobs.values_mut() {
310            if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
311                job.status = JobStatus::Queued;
312                job.updated_at = Self::now_ts();
313                Self::push_history(job, "queued_after_resume");
314                resumed.push(job.clone());
315            }
316        }
317        resumed
318    }
319
320    pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
321        let persisted = store.list_jobs(Some(500))?;
322        for job in persisted {
323            let fallback_status = job_state_status_to_runtime(job.status);
324            let parsed = Self::parse_persisted_detail(job.detail.as_deref());
325            let (status, detail, retry, history) = if let Some(detail_state) = parsed {
326                (
327                    detail_state.status,
328                    detail_state.detail,
329                    detail_state.retry,
330                    detail_state.history,
331                )
332            } else {
333                (
334                    fallback_status,
335                    job.detail,
336                    JobRetryMetadata::default(),
337                    Vec::new(),
338                )
339            };
340            self.jobs.insert(
341                job.id.clone(),
342                JobRecord {
343                    id: job.id,
344                    name: job.name,
345                    status,
346                    progress: job.progress,
347                    detail,
348                    retry,
349                    history,
350                    created_at: job.created_at,
351                    updated_at: job.updated_at,
352                },
353            );
354        }
355        Ok(())
356    }
357
358    pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
359        let Some(job) = self.jobs.get(id) else {
360            return Ok(());
361        };
362        let encoded_detail = Self::encode_persisted_detail(job)?;
363        store.upsert_job(&JobStateRecord {
364            id: job.id.clone(),
365            name: job.name.clone(),
366            status: runtime_status_to_job_state(job.status),
367            progress: job.progress,
368            detail: encoded_detail,
369            created_at: job.created_at,
370            updated_at: job.updated_at,
371        })
372    }
373
374    pub fn persist_all(&self, store: &StateStore) -> Result<()> {
375        for id in self.jobs.keys() {
376            self.persist_job(store, id)?;
377        }
378        Ok(())
379    }
380}
381
/// Creates, resumes, forks and updates threads, persisting them through a
/// `StateStore` and keeping the ones touched in this process in memory.
pub struct ThreadManager {
    /// Backing store for thread metadata, messages and checkpoints.
    store: StateStore,
    /// Threads spawned or resumed through this manager, keyed by thread id.
    running_threads: HashMap<String, Thread>,
    /// Crate version (`CARGO_PKG_VERSION`) stamped onto newly spawned threads.
    cli_version: String,
}
387
388impl ThreadManager {
389    pub fn new(store: StateStore) -> Self {
390        Self {
391            store,
392            running_threads: HashMap::new(),
393            cli_version: env!("CARGO_PKG_VERSION").to_string(),
394        }
395    }
396
    /// Returns a shared reference to the backing `StateStore`.
    pub fn state_store(&self) -> &StateStore {
        &self.store
    }
400
401    pub fn spawn_thread_with_history(
402        &mut self,
403        model_provider: String,
404        cwd: PathBuf,
405        initial_history: InitialHistory,
406        persist_extended_history: bool,
407    ) -> Result<NewThread> {
408        let id = format!("thread-{}", Uuid::new_v4());
409        let now = chrono::Utc::now().timestamp();
410        let preview = preview_from_initial_history(&initial_history);
411        let source = match initial_history {
412            InitialHistory::New => SessionSource::Interactive,
413            InitialHistory::Forked(_) => SessionSource::Fork,
414            InitialHistory::Resumed { .. } => SessionSource::Resume,
415        };
416        let thread = Thread {
417            id: id.clone(),
418            preview,
419            ephemeral: !persist_extended_history,
420            model_provider: model_provider.clone(),
421            created_at: now,
422            updated_at: now,
423            status: ThreadStatus::Running,
424            path: None,
425            cwd: cwd.clone(),
426            cli_version: self.cli_version.clone(),
427            source: match source {
428                SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
429                SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
430                SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
431                SessionSource::Api => codewhale_protocol::SessionSource::Api,
432                SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
433            },
434            name: None,
435        };
436        self.persist_thread(&thread, None)?;
437        match &initial_history {
438            InitialHistory::Forked(items) => {
439                for item in items {
440                    self.store.append_message(
441                        &thread.id,
442                        "history",
443                        &item.to_string(),
444                        Some(item.clone()),
445                    )?;
446                }
447            }
448            InitialHistory::Resumed { history, .. } => {
449                for item in history {
450                    self.store.append_message(
451                        &thread.id,
452                        "history",
453                        &item.to_string(),
454                        Some(item.clone()),
455                    )?;
456                }
457            }
458            InitialHistory::New => {}
459        }
460        self.running_threads
461            .insert(thread.id.clone(), thread.clone());
462        Ok(NewThread {
463            thread,
464            model: "auto".to_string(),
465            model_provider,
466            cwd,
467            approval_policy: None,
468            sandbox: None,
469        })
470    }
471
472    pub fn resume_thread_with_history(
473        &mut self,
474        params: &ThreadResumeParams,
475        fallback_cwd: &Path,
476        model_provider: String,
477    ) -> Result<Option<NewThread>> {
478        if params.history.is_none()
479            && let Some(thread) = self.running_threads.get(&params.thread_id).cloned()
480        {
481            return Ok(Some(NewThread {
482                model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
483                model_provider: params.model_provider.clone().unwrap_or(model_provider),
484                cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
485                approval_policy: params.approval_policy.clone(),
486                sandbox: params.sandbox.clone(),
487                thread,
488            }));
489        }
490
491        let persisted = self.store.get_thread(&params.thread_id)?;
492        let Some(metadata) = persisted else {
493            return Ok(None);
494        };
495        let mut thread = to_protocol_thread(metadata);
496        thread.status = ThreadStatus::Running;
497        thread.updated_at = chrono::Utc::now().timestamp();
498        thread.cwd = params
499            .cwd
500            .clone()
501            .unwrap_or_else(|| fallback_cwd.to_path_buf());
502        self.persist_thread(&thread, None)?;
503        self.running_threads
504            .insert(thread.id.clone(), thread.clone());
505        if let Some(history) = params.history.as_ref() {
506            for item in history {
507                self.store.append_message(
508                    &thread.id,
509                    "history",
510                    &item.to_string(),
511                    Some(item.clone()),
512                )?;
513            }
514        }
515
516        Ok(Some(NewThread {
517            model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
518            model_provider: params.model_provider.clone().unwrap_or(model_provider),
519            cwd: thread.cwd.clone(),
520            approval_policy: params.approval_policy.clone(),
521            sandbox: params.sandbox.clone(),
522            thread,
523        }))
524    }
525
526    pub fn fork_thread(
527        &mut self,
528        params: &ThreadForkParams,
529        fallback_cwd: &Path,
530    ) -> Result<Option<NewThread>> {
531        let parent = self.store.get_thread(&params.thread_id)?;
532        let Some(parent) = parent else {
533            return Ok(None);
534        };
535        let parent_thread = to_protocol_thread(parent);
536        let new = self.spawn_thread_with_history(
537            params
538                .model_provider
539                .clone()
540                .unwrap_or_else(|| parent_thread.model_provider.clone()),
541            params
542                .cwd
543                .clone()
544                .unwrap_or_else(|| fallback_cwd.to_path_buf()),
545            InitialHistory::Forked(vec![json!({
546                "type": "fork",
547                "from_thread_id": parent_thread.id
548            })]),
549            params.persist_extended_history,
550        )?;
551        Ok(Some(new))
552    }
553
554    pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
555        let list = self.store.list_threads(ThreadListFilters {
556            include_archived: params.include_archived,
557            limit: params.limit,
558        })?;
559        Ok(list.into_iter().map(to_protocol_thread).collect())
560    }
561
562    pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
563        Ok(self
564            .store
565            .get_thread(&params.thread_id)?
566            .map(to_protocol_thread))
567    }
568
569    pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
570        let Some(mut metadata) = self.store.get_thread(&params.thread_id)? else {
571            return Ok(None);
572        };
573        metadata.name = Some(params.name.clone());
574        metadata.updated_at = chrono::Utc::now().timestamp();
575        self.store.upsert_thread(&metadata)?;
576        let updated = to_protocol_thread(metadata);
577        self.running_threads
578            .insert(updated.id.clone(), updated.clone());
579        Ok(Some(updated))
580    }
581
582    pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
583        self.store.mark_archived(thread_id)?;
584        if let Some(thread) = self.running_threads.get_mut(thread_id) {
585            thread.status = ThreadStatus::Archived;
586        }
587        Ok(())
588    }
589
590    pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
591        self.store.mark_unarchived(thread_id)?;
592        Ok(())
593    }
594
595    pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
596        let Some(mut metadata) = self.store.get_thread(thread_id)? else {
597            return Ok(());
598        };
599        metadata.updated_at = chrono::Utc::now().timestamp();
600        metadata.preview = truncate_preview(input);
601        metadata.status = PersistedThreadStatus::Running;
602        self.store.upsert_thread(&metadata)?;
603        if let Some(thread) = self.running_threads.get_mut(thread_id) {
604            thread.updated_at = metadata.updated_at;
605            thread.preview = metadata.preview;
606            thread.status = ThreadStatus::Running;
607        }
608        let message_id = self.store.append_message(thread_id, "user", input, None)?;
609        self.store.save_checkpoint(
610            thread_id,
611            "latest",
612            &json!({
613                "reason": "thread_message",
614                "message_id": message_id,
615                "role": "user",
616                "preview": truncate_preview(input),
617                "updated_at": metadata.updated_at
618            }),
619        )?;
620        Ok(())
621    }
622
623    fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
624        self.store.upsert_thread(&ThreadMetadata {
625            id: thread.id.clone(),
626            rollout_path,
627            preview: thread.preview.clone(),
628            ephemeral: thread.ephemeral,
629            model_provider: thread.model_provider.clone(),
630            created_at: thread.created_at,
631            updated_at: thread.updated_at,
632            status: to_persisted_status(&thread.status),
633            path: thread.path.clone(),
634            cwd: thread.cwd.clone(),
635            cli_version: thread.cli_version.clone(),
636            source: to_persisted_source(&thread.source),
637            name: thread.name.clone(),
638            sandbox_policy: None,
639            approval_mode: None,
640            archived: matches!(thread.status, ThreadStatus::Archived),
641            archived_at: None,
642            git_sha: None,
643            git_branch: None,
644            git_origin_url: None,
645            memory_mode: None,
646        })
647    }
648}
649
/// Top-level runtime bundling configuration, the thread and job managers, and
/// the tool, MCP, exec-policy and hook subsystems.
pub struct Runtime {
    /// Loaded configuration.
    pub config: ConfigToml,
    /// Registry of available models.
    pub model_registry: ModelRegistry,
    /// Thread lifecycle manager; also owns the `StateStore`.
    pub thread_manager: ThreadManager,
    /// Shared tool registry.
    pub tool_registry: Arc<ToolRegistry>,
    /// Shared MCP manager.
    pub mcp_manager: Arc<McpManager>,
    /// Execution policy engine.
    pub exec_policy: ExecPolicyEngine,
    /// Hook dispatcher used to emit `HookEvent`s.
    pub hooks: HookDispatcher,
    /// In-memory job registry, pre-loaded from the state store in `Runtime::new`.
    pub jobs: JobManager,
}
660
661impl Runtime {
662    pub fn new(
663        config: ConfigToml,
664        model_registry: ModelRegistry,
665        state: StateStore,
666        tool_registry: Arc<ToolRegistry>,
667        mcp_manager: Arc<McpManager>,
668        exec_policy: ExecPolicyEngine,
669        hooks: HookDispatcher,
670    ) -> Self {
671        let mut jobs = JobManager::default();
672        let _ = jobs.load_from_store(&state);
673        Self {
674            config,
675            model_registry,
676            thread_manager: ThreadManager::new(state),
677            tool_registry,
678            mcp_manager,
679            exec_policy,
680            hooks,
681            jobs,
682        }
683    }
684
685    fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
686        let history = self
687            .thread_manager
688            .state_store()
689            .list_messages(thread_id, Some(500))?
690            .into_iter()
691            .map(|message| {
692                json!({
693                    "id": message.id,
694                    "role": message.role,
695                    "content": message.content,
696                    "item": message.item,
697                    "created_at": message.created_at
698                })
699            })
700            .collect::<Vec<_>>();
701
702        let checkpoint = self
703            .thread_manager
704            .state_store()
705            .load_checkpoint(thread_id, None)?
706            .map(|record| {
707                json!({
708                    "checkpoint_id": record.checkpoint_id,
709                    "state": record.state,
710                    "created_at": record.created_at
711                })
712            });
713
714        Ok(json!({
715            "history": history,
716            "checkpoint": checkpoint
717        }))
718    }
719
720    fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
721        self.thread_manager.state_store().save_checkpoint(
722            thread_id,
723            "latest",
724            &json!({
725                "reason": reason,
726                "saved_at": chrono::Utc::now().timestamp(),
727                "state": state
728            }),
729        )
730    }
731
732    pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
733        match req {
734            ThreadRequest::Create { .. } => {
735                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
736                let new = self.thread_manager.spawn_thread_with_history(
737                    "deepseek".to_string(),
738                    cwd,
739                    InitialHistory::New,
740                    false,
741                )?;
742                let mut response = thread_response_from_new("created", new);
743                response.data = self.persisted_thread_data(&response.thread_id)?;
744                Ok(response)
745            }
746            ThreadRequest::Start(params) => {
747                let cwd = params.cwd.clone().unwrap_or_else(|| {
748                    std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
749                });
750                let new = self.thread_manager.spawn_thread_with_history(
751                    params
752                        .model_provider
753                        .clone()
754                        .unwrap_or_else(|| "deepseek".to_string()),
755                    cwd,
756                    InitialHistory::New,
757                    params.persist_extended_history,
758                )?;
759                let mut response = thread_response_from_new("started", new);
760                response.data = self.persisted_thread_data(&response.thread_id)?;
761                Ok(response)
762            }
763            ThreadRequest::Resume(params) => {
764                let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
765                if let Some(new) = self.thread_manager.resume_thread_with_history(
766                    &params,
767                    &fallback_cwd,
768                    "deepseek".to_string(),
769                )? {
770                    let mut response = thread_response_from_new("resumed", new);
771                    response.data = self.persisted_thread_data(&response.thread_id)?;
772                    Ok(response)
773                } else {
774                    Ok(ThreadResponse {
775                        thread_id: params.thread_id,
776                        status: "missing".to_string(),
777                        thread: None,
778                        threads: Vec::new(),
779                        model: None,
780                        model_provider: None,
781                        cwd: None,
782                        approval_policy: params.approval_policy,
783                        sandbox: params.sandbox,
784                        events: Vec::new(),
785                        data: json!({"error":"thread not found"}),
786                    })
787                }
788            }
789            ThreadRequest::Fork(params) => {
790                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
791                if let Some(new) = self.thread_manager.fork_thread(&params, &cwd)? {
792                    let mut response = thread_response_from_new("forked", new);
793                    response.data = self.persisted_thread_data(&response.thread_id)?;
794                    Ok(response)
795                } else {
796                    Ok(ThreadResponse {
797                        thread_id: params.thread_id,
798                        status: "missing".to_string(),
799                        thread: None,
800                        threads: Vec::new(),
801                        model: None,
802                        model_provider: None,
803                        cwd: None,
804                        approval_policy: params.approval_policy,
805                        sandbox: params.sandbox,
806                        events: Vec::new(),
807                        data: json!({"error":"thread not found"}),
808                    })
809                }
810            }
811            ThreadRequest::List(params) => Ok(ThreadResponse {
812                thread_id: "list".to_string(),
813                status: "ok".to_string(),
814                thread: None,
815                threads: self.thread_manager.list_threads(&params)?,
816                model: None,
817                model_provider: None,
818                cwd: None,
819                approval_policy: None,
820                sandbox: None,
821                events: Vec::new(),
822                data: json!({}),
823            }),
824            ThreadRequest::Read(params) => {
825                let id = params.thread_id.clone();
826                let data = self.persisted_thread_data(&id)?;
827                Ok(ThreadResponse {
828                    thread_id: id,
829                    status: "ok".to_string(),
830                    thread: self.thread_manager.read_thread(&params)?,
831                    threads: Vec::new(),
832                    model: None,
833                    model_provider: None,
834                    cwd: None,
835                    approval_policy: None,
836                    sandbox: None,
837                    events: Vec::new(),
838                    data,
839                })
840            }
841            ThreadRequest::SetName(params) => Ok(ThreadResponse {
842                thread_id: params.thread_id.clone(),
843                status: "ok".to_string(),
844                thread: self.thread_manager.set_thread_name(&params)?,
845                threads: Vec::new(),
846                model: None,
847                model_provider: None,
848                cwd: None,
849                approval_policy: None,
850                sandbox: None,
851                events: Vec::new(),
852                data: json!({}),
853            }),
854            ThreadRequest::Archive { thread_id } => {
855                self.thread_manager.archive_thread(&thread_id)?;
856                Ok(ThreadResponse {
857                    thread_id,
858                    status: "archived".to_string(),
859                    thread: None,
860                    threads: Vec::new(),
861                    model: None,
862                    model_provider: None,
863                    cwd: None,
864                    approval_policy: None,
865                    sandbox: None,
866                    events: Vec::new(),
867                    data: json!({}),
868                })
869            }
870            ThreadRequest::Unarchive { thread_id } => {
871                self.thread_manager.unarchive_thread(&thread_id)?;
872                Ok(ThreadResponse {
873                    thread_id,
874                    status: "unarchived".to_string(),
875                    thread: None,
876                    threads: Vec::new(),
877                    model: None,
878                    model_provider: None,
879                    cwd: None,
880                    approval_policy: None,
881                    sandbox: None,
882                    events: Vec::new(),
883                    data: json!({}),
884                })
885            }
886            ThreadRequest::Message { thread_id, input } => {
887                self.thread_manager.touch_message(&thread_id, &input)?;
888                let response_id = format!("{thread_id}:{}", input.len());
889                self.hooks
890                    .emit(HookEvent::ResponseStart {
891                        response_id: response_id.clone(),
892                    })
893                    .await;
894                self.hooks
895                    .emit(HookEvent::ResponseEnd {
896                        response_id: response_id.clone(),
897                    })
898                    .await;
899
900                Ok(ThreadResponse {
901                    thread_id,
902                    status: "accepted".to_string(),
903                    thread: None,
904                    threads: Vec::new(),
905                    model: None,
906                    model_provider: None,
907                    cwd: None,
908                    approval_policy: None,
909                    sandbox: None,
910                    events: vec![
911                        EventFrame::ResponseStart {
912                            response_id: response_id.clone(),
913                        },
914                        EventFrame::ResponseDelta {
915                            response_id: response_id.clone(),
916                            delta: "queued".to_string(),
917                            channel: ResponseChannel::Text,
918                        },
919                        EventFrame::ResponseEnd { response_id },
920                    ],
921                    data: json!({}),
922                })
923            }
924        }
925    }
926
927    pub async fn handle_prompt(
928        &mut self,
929        req: PromptRequest,
930        cli_overrides: &CliRuntimeOverrides,
931    ) -> Result<PromptResponse> {
932        let resolved = self.config.resolve_runtime_options(cli_overrides);
933        let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
934        let selection = self
935            .model_registry
936            .resolve(Some(&requested_model), Some(resolved.provider));
937        let resolved_model = selection.resolved.id.clone();
938        let response_id = format!("resp-{}", Uuid::new_v4());
939
940        self.hooks
941            .emit(HookEvent::ResponseStart {
942                response_id: response_id.clone(),
943            })
944            .await;
945        self.hooks
946            .emit(HookEvent::ResponseDelta {
947                response_id: response_id.clone(),
948                delta: "model-selected".to_string(),
949            })
950            .await;
951        self.hooks
952            .emit(HookEvent::ResponseEnd {
953                response_id: response_id.clone(),
954            })
955            .await;
956
957        let payload = json!({
958            "provider": resolved.provider.as_str(),
959            "model": resolved_model.clone(),
960            "prompt": req.prompt,
961            "telemetry": resolved.telemetry,
962            "base_url": resolved.base_url,
963            "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
964            "approval_policy": resolved.approval_policy,
965            "sandbox_mode": resolved.sandbox_mode
966        });
967        if let Some(thread_id) = req.thread_id.as_ref() {
968            self.thread_manager.touch_message(thread_id, &req.prompt)?;
969            let assistant_message_id = self.thread_manager.store.append_message(
970                thread_id,
971                "assistant",
972                &payload.to_string(),
973                Some(payload.clone()),
974            )?;
975            self.persist_latest_checkpoint(
976                thread_id,
977                "prompt_response",
978                json!({
979                    "response_id": response_id.clone(),
980                    "model": resolved_model.clone(),
981                    "provider": resolved.provider.as_str(),
982                    "assistant_message_id": assistant_message_id
983                }),
984            )?;
985        }
986
987        Ok(PromptResponse {
988            output: payload.to_string(),
989            model: resolved_model,
990            events: vec![
991                EventFrame::ResponseStart {
992                    response_id: response_id.clone(),
993                },
994                EventFrame::ResponseDelta {
995                    response_id: response_id.clone(),
996                    delta: "model-selected".to_string(),
997                    channel: ResponseChannel::Text,
998                },
999                EventFrame::ResponseEnd { response_id },
1000            ],
1001        })
1002    }
1003
1004    pub async fn invoke_tool(
1005        &self,
1006        call: ToolCall,
1007        approval_mode: AskForApproval,
1008        cwd: &Path,
1009    ) -> Result<Value> {
1010        let fallback_cwd = cwd.display().to_string();
1011        let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
1012        let decision = self.exec_policy.check(ExecPolicyContext {
1013            command: &command,
1014            cwd: &policy_cwd,
1015            ask_for_approval: approval_mode,
1016            sandbox_mode: None,
1017        })?;
1018        let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
1019        let response_id = format!("tool-{}", Uuid::new_v4());
1020        let call_id = call
1021            .raw_tool_call_id
1022            .clone()
1023            .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
1024        self.hooks
1025            .emit(HookEvent::ToolLifecycle {
1026                response_id: response_id.clone(),
1027                tool_name: call.name.clone(),
1028                phase: "precheck".to_string(),
1029                payload: precheck.clone(),
1030            })
1031            .await;
1032
1033        if !decision.allow {
1034            let reason = decision.reason().to_string();
1035            let approval_id = format!("approval-{}", Uuid::new_v4());
1036            let error_frame = EventFrame::Error {
1037                response_id: response_id.clone(),
1038                message: reason.clone(),
1039            };
1040            self.hooks
1041                .emit(HookEvent::ApprovalLifecycle {
1042                    approval_id,
1043                    phase: "denied".to_string(),
1044                    reason: Some(reason.clone()),
1045                })
1046                .await;
1047            self.hooks
1048                .emit(HookEvent::GenericEventFrame {
1049                    frame: error_frame.clone(),
1050                })
1051                .await;
1052            return Ok(json!({
1053                "ok": false,
1054                "status": "denied",
1055                "execution_kind": execution_kind,
1056                "response_id": response_id,
1057                "precheck": precheck,
1058                "error": reason,
1059                "events": [event_frame_payload(&error_frame)],
1060            }));
1061        }
1062
1063        if decision.requires_approval {
1064            let approval_id = format!("approval-{}", Uuid::new_v4());
1065            let reason = decision.reason().to_string();
1066            let maybe_approval_frame = approval_request_frame(
1067                &decision.requirement,
1068                call_id,
1069                approval_id.clone(),
1070                response_id.clone(),
1071                command.clone(),
1072                policy_cwd.clone(),
1073            );
1074            self.hooks
1075                .emit(HookEvent::ApprovalLifecycle {
1076                    approval_id: approval_id.clone(),
1077                    phase: "requested".to_string(),
1078                    reason: Some(reason.clone()),
1079                })
1080                .await;
1081            let mut events = Vec::new();
1082            if let Some(frame) = maybe_approval_frame {
1083                self.hooks
1084                    .emit(HookEvent::GenericEventFrame {
1085                        frame: frame.clone(),
1086                    })
1087                    .await;
1088                events.push(event_frame_payload(&frame));
1089            }
1090            return Ok(json!({
1091                "ok": false,
1092                "status": "approval_required",
1093                "execution_kind": execution_kind,
1094                "response_id": response_id,
1095                "approval_id": approval_id,
1096                "precheck": precheck,
1097                "error": reason,
1098                "events": events,
1099            }));
1100        }
1101
1102        let start_frame = EventFrame::ToolCallStart {
1103            response_id: response_id.clone(),
1104            tool_name: call.name.clone(),
1105            arguments: tool_payload_value(&call.payload),
1106        };
1107        self.hooks
1108            .emit(HookEvent::GenericEventFrame {
1109                frame: start_frame.clone(),
1110            })
1111            .await;
1112        self.hooks
1113            .emit(HookEvent::ToolLifecycle {
1114                response_id: response_id.clone(),
1115                tool_name: call.name.clone(),
1116                phase: "dispatching".to_string(),
1117                payload: json!({
1118                    "call_id": call_id,
1119                    "execution_kind": execution_kind
1120                }),
1121            })
1122            .await;
1123
1124        match self.tool_registry.dispatch(call.clone(), true).await {
1125            Ok(tool_output) => {
1126                let result_frame = EventFrame::ToolCallResult {
1127                    response_id: response_id.clone(),
1128                    tool_name: call.name.clone(),
1129                    output: tool_output_value(&tool_output),
1130                };
1131                self.hooks
1132                    .emit(HookEvent::GenericEventFrame {
1133                        frame: result_frame.clone(),
1134                    })
1135                    .await;
1136                self.hooks
1137                    .emit(HookEvent::ToolLifecycle {
1138                        response_id: response_id.clone(),
1139                        tool_name: call.name,
1140                        phase: "completed".to_string(),
1141                        payload: json!({ "ok": true }),
1142                    })
1143                    .await;
1144                Ok(json!({
1145                    "ok": true,
1146                    "status": "completed",
1147                    "execution_kind": execution_kind,
1148                    "response_id": response_id,
1149                    "precheck": precheck,
1150                    "output": tool_output,
1151                    "events": [
1152                        event_frame_payload(&start_frame),
1153                        event_frame_payload(&result_frame)
1154                    ]
1155                }))
1156            }
1157            Err(err) => {
1158                let message = format!("{err:?}");
1159                let error_frame = EventFrame::Error {
1160                    response_id: response_id.clone(),
1161                    message: message.clone(),
1162                };
1163                self.hooks
1164                    .emit(HookEvent::GenericEventFrame {
1165                        frame: error_frame.clone(),
1166                    })
1167                    .await;
1168                self.hooks
1169                    .emit(HookEvent::ToolLifecycle {
1170                        response_id: response_id.clone(),
1171                        tool_name: call.name,
1172                        phase: "failed".to_string(),
1173                        payload: json!({ "error": message.clone() }),
1174                    })
1175                    .await;
1176                Ok(json!({
1177                    "ok": false,
1178                    "status": "failed",
1179                    "execution_kind": execution_kind,
1180                    "response_id": response_id,
1181                    "precheck": precheck,
1182                    "error": message,
1183                    "events": [
1184                        event_frame_payload(&start_frame),
1185                        event_frame_payload(&error_frame)
1186                    ]
1187                }))
1188            }
1189        }
1190    }
1191
1192    pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1193        let mut updates = Vec::new();
1194        let summary = self.mcp_manager.start_all(|update| {
1195            updates.push(update);
1196        });
1197        for update in updates {
1198            let status = match update.status {
1199                McpManagerStartupStatus::Starting => codewhale_protocol::McpStartupStatus::Starting,
1200                McpManagerStartupStatus::Ready => codewhale_protocol::McpStartupStatus::Ready,
1201                McpManagerStartupStatus::Failed { error } => {
1202                    codewhale_protocol::McpStartupStatus::Failed { error }
1203                }
1204                McpManagerStartupStatus::Cancelled => {
1205                    codewhale_protocol::McpStartupStatus::Cancelled
1206                }
1207            };
1208            self.hooks
1209                .emit(HookEvent::GenericEventFrame {
1210                    frame: EventFrame::McpStartupUpdate {
1211                        update: codewhale_protocol::McpStartupUpdateEvent {
1212                            server_name: update.server_name,
1213                            status,
1214                        },
1215                    },
1216                })
1217                .await;
1218        }
1219        self.hooks
1220            .emit(HookEvent::GenericEventFrame {
1221                frame: EventFrame::McpStartupComplete {
1222                    summary: codewhale_protocol::McpStartupCompleteEvent {
1223                        ready: summary.ready.clone(),
1224                        failed: summary
1225                            .failed
1226                            .iter()
1227                            .map(|f| codewhale_protocol::McpStartupFailure {
1228                                server_name: f.server_name.clone(),
1229                                error: f.error.clone(),
1230                            })
1231                            .collect(),
1232                        cancelled: summary.cancelled.clone(),
1233                    },
1234                },
1235            })
1236            .await;
1237        summary
1238    }
1239
1240    pub fn app_status(&self) -> AppResponse {
1241        let jobs = self.jobs.list();
1242        let events = jobs
1243            .iter()
1244            .flat_map(|job| {
1245                job.history.iter().map(|entry| EventFrame::ResponseDelta {
1246                    response_id: job.id.clone(),
1247                    delta: json!({
1248                        "kind": "job_transition",
1249                        "job_id": job.id.clone(),
1250                        "phase": entry.phase.clone(),
1251                        "status": job_status_to_str(entry.status),
1252                        "progress": entry.progress,
1253                        "detail": entry.detail.clone(),
1254                        "retry": job_retry_to_value(&entry.retry),
1255                        "at": entry.at
1256                    })
1257                    .to_string(),
1258                    channel: ResponseChannel::Text,
1259                })
1260            })
1261            .collect::<Vec<_>>();
1262        AppResponse {
1263            ok: true,
1264            data: json!({
1265                "jobs": jobs.into_iter().map(|job| {
1266                    json!({
1267                        "id": job.id,
1268                        "name": job.name,
1269                        "status": job_status_to_str(job.status),
1270                        "progress": job.progress,
1271                        "detail": job.detail,
1272                        "retry": job_retry_to_value(&job.retry),
1273                        "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1274                    })
1275                }).collect::<Vec<_>>()
1276            }),
1277            events,
1278        }
1279    }
1280
1281    pub fn provider_default(&self) -> ProviderKind {
1282        self.config.provider
1283    }
1284
1285    pub fn save_thread_checkpoint(
1286        &self,
1287        thread_id: &str,
1288        checkpoint_id: &str,
1289        state: &Value,
1290    ) -> Result<()> {
1291        self.thread_manager
1292            .state_store()
1293            .save_checkpoint(thread_id, checkpoint_id, state)
1294    }
1295
1296    pub fn load_thread_checkpoint(
1297        &self,
1298        thread_id: &str,
1299        checkpoint_id: Option<&str>,
1300    ) -> Result<Option<Value>> {
1301        Ok(self
1302            .thread_manager
1303            .state_store()
1304            .load_checkpoint(thread_id, checkpoint_id)?
1305            .map(|checkpoint| checkpoint.state))
1306    }
1307
1308    pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1309        let job = self.jobs.enqueue(name);
1310        self.jobs
1311            .persist_job(self.thread_manager.state_store(), &job.id)?;
1312        Ok(job)
1313    }
1314
1315    pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1316        self.jobs.set_running(job_id);
1317        self.jobs
1318            .persist_job(self.thread_manager.state_store(), job_id)
1319    }
1320
1321    pub fn update_job_progress(
1322        &mut self,
1323        job_id: &str,
1324        progress: u8,
1325        detail: Option<String>,
1326    ) -> Result<()> {
1327        self.jobs.update_progress(job_id, progress, detail);
1328        self.jobs
1329            .persist_job(self.thread_manager.state_store(), job_id)
1330    }
1331
1332    pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1333        self.jobs.complete(job_id);
1334        self.jobs
1335            .persist_job(self.thread_manager.state_store(), job_id)
1336    }
1337
1338    pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1339        self.jobs.fail(job_id, detail);
1340        self.jobs
1341            .persist_job(self.thread_manager.state_store(), job_id)
1342    }
1343
1344    pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1345        self.jobs.cancel(job_id);
1346        self.jobs
1347            .persist_job(self.thread_manager.state_store(), job_id)
1348    }
1349
1350    pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1351        self.jobs.pause(job_id, detail);
1352        self.jobs
1353            .persist_job(self.thread_manager.state_store(), job_id)
1354    }
1355
1356    pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1357        self.jobs.resume(job_id, detail);
1358        self.jobs
1359            .persist_job(self.thread_manager.state_store(), job_id)
1360    }
1361
1362    pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
1363        self.jobs.history(job_id)
1364    }
1365}
1366
1367fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1368    ThreadResponse {
1369        thread_id: new.thread.id.clone(),
1370        status: status.to_string(),
1371        thread: Some(new.thread),
1372        threads: Vec::new(),
1373        model: Some(new.model),
1374        model_provider: Some(new.model_provider),
1375        cwd: Some(new.cwd),
1376        approval_policy: new.approval_policy,
1377        sandbox: new.sandbox,
1378        events: Vec::new(),
1379        data: json!({}),
1380    }
1381}
1382
1383fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1384    match initial_history {
1385        InitialHistory::New => "New conversation".to_string(),
1386        InitialHistory::Forked(items) => truncate_preview(
1387            &items
1388                .first()
1389                .map(Value::to_string)
1390                .unwrap_or_else(|| "Forked conversation".to_string()),
1391        ),
1392        InitialHistory::Resumed { history, .. } => truncate_preview(
1393            &history
1394                .first()
1395                .map(Value::to_string)
1396                .unwrap_or_else(|| "Resumed conversation".to_string()),
1397        ),
1398    }
1399}
1400
/// Returns at most the first 120 characters (Unicode scalar values) of `value`.
fn truncate_preview(value: &str) -> String {
    const MAX_PREVIEW_CHARS: usize = 120;
    // The byte offset of the 121st character, if any, is exactly where the preview ends.
    match value.char_indices().nth(MAX_PREVIEW_CHARS) {
        Some((cut, _)) => value[..cut].to_string(),
        None => value.to_string(),
    }
}
1404
1405fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1406    Thread {
1407        id: thread.id,
1408        preview: thread.preview,
1409        ephemeral: thread.ephemeral,
1410        model_provider: thread.model_provider,
1411        created_at: thread.created_at,
1412        updated_at: thread.updated_at,
1413        status: match thread.status {
1414            PersistedThreadStatus::Running => ThreadStatus::Running,
1415            PersistedThreadStatus::Idle => ThreadStatus::Idle,
1416            PersistedThreadStatus::Completed => ThreadStatus::Completed,
1417            PersistedThreadStatus::Failed => ThreadStatus::Failed,
1418            PersistedThreadStatus::Paused => ThreadStatus::Paused,
1419            PersistedThreadStatus::Archived => ThreadStatus::Archived,
1420        },
1421        path: thread.path,
1422        cwd: thread.cwd,
1423        cli_version: thread.cli_version,
1424        source: match thread.source {
1425            SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
1426            SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
1427            SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
1428            SessionSource::Api => codewhale_protocol::SessionSource::Api,
1429            SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
1430        },
1431        name: thread.name,
1432    }
1433}
1434
1435fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1436    match status {
1437        ThreadStatus::Running => PersistedThreadStatus::Running,
1438        ThreadStatus::Idle => PersistedThreadStatus::Idle,
1439        ThreadStatus::Completed => PersistedThreadStatus::Completed,
1440        ThreadStatus::Failed => PersistedThreadStatus::Failed,
1441        ThreadStatus::Paused => PersistedThreadStatus::Paused,
1442        ThreadStatus::Archived => PersistedThreadStatus::Archived,
1443    }
1444}
1445
1446fn to_persisted_source(source: &codewhale_protocol::SessionSource) -> SessionSource {
1447    match source {
1448        codewhale_protocol::SessionSource::Interactive => SessionSource::Interactive,
1449        codewhale_protocol::SessionSource::Resume => SessionSource::Resume,
1450        codewhale_protocol::SessionSource::Fork => SessionSource::Fork,
1451        codewhale_protocol::SessionSource::Api => SessionSource::Api,
1452        codewhale_protocol::SessionSource::Unknown => SessionSource::Unknown,
1453    }
1454}
1455
1456fn approval_request_frame(
1457    requirement: &ExecApprovalRequirement,
1458    call_id: String,
1459    approval_id: String,
1460    turn_id: String,
1461    command: String,
1462    cwd: String,
1463) -> Option<EventFrame> {
1464    let ExecApprovalRequirement::NeedsApproval {
1465        reason,
1466        proposed_execpolicy_amendment,
1467        proposed_network_policy_amendments,
1468    } = requirement
1469    else {
1470        return None;
1471    };
1472
1473    let mut available_decisions = vec![
1474        ReviewDecision::Approved,
1475        ReviewDecision::ApprovedForSession,
1476        ReviewDecision::Denied,
1477        ReviewDecision::Abort,
1478    ];
1479    if proposed_execpolicy_amendment
1480        .as_ref()
1481        .is_some_and(|amendment| !amendment.prefixes.is_empty())
1482    {
1483        available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1484    }
1485    available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1486        |amendment| ReviewDecision::NetworkPolicyAmendment {
1487            host: amendment.host,
1488            action: amendment.action,
1489        },
1490    ));
1491
1492    Some(EventFrame::ExecApprovalRequest {
1493        request: ExecApprovalRequestEvent {
1494            call_id,
1495            approval_id,
1496            turn_id,
1497            command,
1498            cwd,
1499            reason: reason.clone(),
1500            network_approval_context: None,
1501            proposed_execpolicy_amendment: proposed_execpolicy_amendment
1502                .as_ref()
1503                .map(|amendment| amendment.prefixes.clone())
1504                .unwrap_or_default(),
1505            proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1506            additional_permissions: Vec::new(),
1507            available_decisions,
1508        },
1509    })
1510}
1511
1512fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1513    match requirement {
1514        ExecApprovalRequirement::Skip {
1515            bypass_sandbox,
1516            proposed_execpolicy_amendment,
1517        } => json!({
1518            "type": "skip",
1519            "bypass_sandbox": bypass_sandbox,
1520            "reason": requirement.reason(),
1521            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1522                .as_ref()
1523                .map(|amendment| amendment.prefixes.clone())
1524                .unwrap_or_default()
1525        }),
1526        ExecApprovalRequirement::NeedsApproval {
1527            reason,
1528            proposed_execpolicy_amendment,
1529            proposed_network_policy_amendments,
1530        } => json!({
1531            "type": "needs_approval",
1532            "reason": reason,
1533            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1534                .as_ref()
1535                .map(|amendment| amendment.prefixes.clone())
1536                .unwrap_or_default(),
1537            "proposed_network_policy_amendments": proposed_network_policy_amendments
1538        }),
1539        ExecApprovalRequirement::Forbidden { reason } => json!({
1540            "type": "forbidden",
1541            "reason": reason
1542        }),
1543    }
1544}
1545
1546fn policy_precheck_payload(
1547    decision: &ExecPolicyDecision,
1548    command: &str,
1549    cwd: &str,
1550    execution_kind: &str,
1551) -> Value {
1552    json!({
1553        "execution_kind": execution_kind,
1554        "command": command,
1555        "cwd": cwd,
1556        "allow": decision.allow,
1557        "requires_approval": decision.requires_approval,
1558        "matched_rule": decision.matched_rule.clone(),
1559        "phase": decision.requirement.phase(),
1560        "reason": decision.reason(),
1561        "requirement": approval_requirement_payload(&decision.requirement)
1562    })
1563}
1564
1565fn tool_payload_value(payload: &ToolPayload) -> Value {
1566    serde_json::to_value(payload).unwrap_or_else(
1567        |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1568    )
1569}
1570
1571fn tool_output_value(output: &codewhale_protocol::ToolOutput) -> Value {
1572    serde_json::to_value(output).unwrap_or_else(
1573        |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1574    )
1575}
1576
1577fn event_frame_payload(frame: &EventFrame) -> Value {
1578    serde_json::to_value(frame)
1579        .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1580}
1581
1582fn json_optional_string(value: &Value) -> Option<String> {
1583    if value.is_null() {
1584        None
1585    } else {
1586        value.as_str().map(ToString::to_string)
1587    }
1588}
1589
1590fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
1591    let Some(value) = value else {
1592        return JobRetryMetadata::default();
1593    };
1594    JobRetryMetadata {
1595        attempt: value
1596            .get("attempt")
1597            .and_then(Value::as_u64)
1598            .unwrap_or(0)
1599            .min(u32::MAX as u64) as u32,
1600        max_attempts: value
1601            .get("max_attempts")
1602            .and_then(Value::as_u64)
1603            .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
1604            .min(u32::MAX as u64) as u32,
1605        backoff_base_ms: value
1606            .get("backoff_base_ms")
1607            .and_then(Value::as_u64)
1608            .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
1609        next_backoff_ms: value
1610            .get("next_backoff_ms")
1611            .and_then(Value::as_u64)
1612            .unwrap_or(0),
1613        next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
1614    }
1615}
1616
1617fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
1618    let status = value
1619        .get("status")
1620        .and_then(Value::as_str)
1621        .and_then(job_status_from_str)?;
1622    Some(JobHistoryEntry {
1623        at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
1624        phase: value
1625            .get("phase")
1626            .and_then(Value::as_str)
1627            .unwrap_or("unknown")
1628            .to_string(),
1629        status,
1630        progress: value
1631            .get("progress")
1632            .and_then(Value::as_u64)
1633            .map(|v| v.min(u8::MAX as u64) as u8),
1634        detail: value.get("detail").and_then(json_optional_string),
1635        retry: parse_retry_metadata(value.get("retry")),
1636    })
1637}
1638
1639fn job_status_to_str(status: JobStatus) -> &'static str {
1640    match status {
1641        JobStatus::Queued => "queued",
1642        JobStatus::Running => "running",
1643        JobStatus::Paused => "paused",
1644        JobStatus::Completed => "completed",
1645        JobStatus::Failed => "failed",
1646        JobStatus::Cancelled => "cancelled",
1647    }
1648}
1649
1650fn job_status_from_str(value: &str) -> Option<JobStatus> {
1651    match value {
1652        "queued" => Some(JobStatus::Queued),
1653        "running" => Some(JobStatus::Running),
1654        "paused" => Some(JobStatus::Paused),
1655        "completed" => Some(JobStatus::Completed),
1656        "failed" => Some(JobStatus::Failed),
1657        "cancelled" => Some(JobStatus::Cancelled),
1658        _ => None,
1659    }
1660}
1661
1662fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
1663    json!({
1664        "attempt": retry.attempt,
1665        "max_attempts": retry.max_attempts,
1666        "backoff_base_ms": retry.backoff_base_ms,
1667        "next_backoff_ms": retry.next_backoff_ms,
1668        "next_retry_at": retry.next_retry_at
1669    })
1670}
1671
1672fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
1673    json!({
1674        "at": entry.at,
1675        "phase": entry.phase.clone(),
1676        "status": job_status_to_str(entry.status),
1677        "progress": entry.progress,
1678        "detail": entry.detail.clone(),
1679        "retry": job_retry_to_value(&entry.retry)
1680    })
1681}
1682
1683fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
1684    match status {
1685        JobStatus::Queued => JobStateStatus::Queued,
1686        JobStatus::Running => JobStateStatus::Running,
1687        JobStatus::Paused => JobStateStatus::Running,
1688        JobStatus::Completed => JobStateStatus::Completed,
1689        JobStatus::Failed => JobStateStatus::Failed,
1690        JobStatus::Cancelled => JobStateStatus::Cancelled,
1691    }
1692}
1693
1694fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
1695    match status {
1696        JobStateStatus::Queued => JobStatus::Queued,
1697        JobStateStatus::Running => JobStatus::Running,
1698        JobStateStatus::Completed => JobStatus::Completed,
1699        JobStateStatus::Failed => JobStatus::Failed,
1700        JobStateStatus::Cancelled => JobStatus::Cancelled,
1701    }
1702}