Skip to main content

deepseek_core/
lib.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use deepseek_agent::ModelRegistry;
7use deepseek_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use deepseek_execpolicy::{
9    AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10    ExecPolicyEngine,
11};
12use deepseek_hooks::{HookDispatcher, HookEvent};
13use deepseek_mcp::{
14    McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use deepseek_protocol::{
17    AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18    ReviewDecision, Thread, ThreadForkParams, ThreadListParams, ThreadReadParams, ThreadRequest,
19    ThreadResponse, ThreadResumeParams, ThreadSetNameParams, ThreadStatus, ToolPayload,
20};
21use deepseek_state::{
22    JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadListFilters, ThreadMetadata,
23    ThreadStatus as PersistedThreadStatus,
24};
25use deepseek_tools::{ToolCall, ToolRegistry};
26use serde_json::{Value, json};
27use uuid::Uuid;
28
29#[derive(Debug, Clone)]
30pub enum InitialHistory {
31    New,
32    Forked(Vec<Value>),
33    Resumed {
34        conversation_id: String,
35        history: Vec<Value>,
36        rollout_path: PathBuf,
37    },
38}
39
40#[derive(Debug, Clone)]
41pub struct NewThread {
42    pub thread: Thread,
43    pub model: String,
44    pub model_provider: String,
45    pub cwd: PathBuf,
46    pub approval_policy: Option<String>,
47    pub sandbox: Option<String>,
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub enum JobStatus {
52    Queued,
53    Running,
54    Paused,
55    Completed,
56    Failed,
57    Cancelled,
58}
59
60const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
61const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
62const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
63const MAX_JOB_HISTORY_ENTRIES: usize = 64;
64
65#[derive(Debug, Clone)]
66pub struct JobRetryMetadata {
67    pub attempt: u32,
68    pub max_attempts: u32,
69    pub backoff_base_ms: u64,
70    pub next_backoff_ms: u64,
71    pub next_retry_at: Option<i64>,
72}
73
74impl Default for JobRetryMetadata {
75    fn default() -> Self {
76        Self {
77            attempt: 0,
78            max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
79            backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
80            next_backoff_ms: 0,
81            next_retry_at: None,
82        }
83    }
84}
85
86#[derive(Debug, Clone)]
87pub struct JobHistoryEntry {
88    pub at: i64,
89    pub phase: String,
90    pub status: JobStatus,
91    pub progress: Option<u8>,
92    pub detail: Option<String>,
93    pub retry: JobRetryMetadata,
94}
95
96#[derive(Debug, Clone)]
97struct PersistedJobDetail {
98    pub status: JobStatus,
99    pub detail: Option<String>,
100    pub retry: JobRetryMetadata,
101    pub history: Vec<JobHistoryEntry>,
102}
103
104#[derive(Debug, Clone)]
105pub struct JobRecord {
106    pub id: String,
107    pub name: String,
108    pub status: JobStatus,
109    pub progress: Option<u8>,
110    pub detail: Option<String>,
111    pub retry: JobRetryMetadata,
112    pub history: Vec<JobHistoryEntry>,
113    pub created_at: i64,
114    pub updated_at: i64,
115}
116
117#[derive(Debug, Default)]
118pub struct JobManager {
119    jobs: HashMap<String, JobRecord>,
120}
121
122impl JobManager {
123    fn now_ts() -> i64 {
124        chrono::Utc::now().timestamp()
125    }
126
127    fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
128        if retry.attempt == 0 {
129            return 0;
130        }
131        let exponent = retry.attempt.saturating_sub(1).min(20);
132        let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
133        retry.backoff_base_ms.saturating_mul(multiplier)
134    }
135
136    fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
137        retry.next_backoff_ms = 0;
138        retry.next_retry_at = None;
139    }
140
141    fn push_history(job: &mut JobRecord, phase: &str) {
142        job.history.push(JobHistoryEntry {
143            at: job.updated_at,
144            phase: phase.to_string(),
145            status: job.status,
146            progress: job.progress,
147            detail: job.detail.clone(),
148            retry: job.retry.clone(),
149        });
150        if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
151            let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
152            job.history.drain(0..to_drain);
153        }
154    }
155
156    fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
157        let raw = raw?;
158        let parsed: Value = serde_json::from_str(raw).ok()?;
159        let status = parsed
160            .get("status")
161            .and_then(Value::as_str)
162            .and_then(job_status_from_str)?;
163        let detail = parsed.get("detail").and_then(json_optional_string);
164        let retry = parse_retry_metadata(parsed.get("retry"));
165        let history = parsed
166            .get("history")
167            .and_then(Value::as_array)
168            .map(|items| {
169                items
170                    .iter()
171                    .filter_map(parse_history_entry)
172                    .collect::<Vec<_>>()
173            })
174            .unwrap_or_default();
175        Some(PersistedJobDetail {
176            status,
177            detail,
178            retry,
179            history,
180        })
181    }
182
183    fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
184        let encoded = json!({
185            "schema_version": JOB_DETAIL_SCHEMA_VERSION,
186            "status": job_status_to_str(job.status),
187            "detail": job.detail.clone(),
188            "retry": job_retry_to_value(&job.retry),
189            "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
190        })
191        .to_string();
192        Ok(Some(encoded))
193    }
194
195    pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
196        let now = Self::now_ts();
197        let id = format!("job-{}", Uuid::new_v4());
198        let mut job = JobRecord {
199            id: id.clone(),
200            name: name.into(),
201            status: JobStatus::Queued,
202            progress: Some(0),
203            detail: None,
204            retry: JobRetryMetadata::default(),
205            history: Vec::new(),
206            created_at: now,
207            updated_at: now,
208        };
209        Self::push_history(&mut job, "created");
210        self.jobs.insert(id, job.clone());
211        job
212    }
213
214    pub fn set_running(&mut self, id: &str) {
215        if let Some(job) = self.jobs.get_mut(id) {
216            job.status = JobStatus::Running;
217            Self::clear_retry_schedule(&mut job.retry);
218            job.updated_at = Self::now_ts();
219            Self::push_history(job, "running");
220        }
221    }
222
223    pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
224        if let Some(job) = self.jobs.get_mut(id) {
225            job.progress = Some(progress.min(100));
226            job.detail = detail;
227            job.updated_at = Self::now_ts();
228            Self::push_history(job, "progress_updated");
229        }
230    }
231
232    pub fn complete(&mut self, id: &str) {
233        if let Some(job) = self.jobs.get_mut(id) {
234            job.status = JobStatus::Completed;
235            job.progress = Some(100);
236            Self::clear_retry_schedule(&mut job.retry);
237            job.updated_at = Self::now_ts();
238            Self::push_history(job, "completed");
239        }
240    }
241
242    pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
243        if let Some(job) = self.jobs.get_mut(id) {
244            let now = Self::now_ts();
245            job.status = JobStatus::Failed;
246            job.detail = Some(detail.into());
247            if job.retry.attempt < job.retry.max_attempts {
248                job.retry.attempt += 1;
249                job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
250                let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
251                    .min(i64::MAX as u64) as i64;
252                job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
253            } else {
254                Self::clear_retry_schedule(&mut job.retry);
255            }
256            job.updated_at = now;
257            Self::push_history(job, "failed");
258        }
259    }
260
261    pub fn cancel(&mut self, id: &str) {
262        if let Some(job) = self.jobs.get_mut(id) {
263            job.status = JobStatus::Cancelled;
264            Self::clear_retry_schedule(&mut job.retry);
265            job.updated_at = Self::now_ts();
266            Self::push_history(job, "cancelled");
267        }
268    }
269
270    pub fn pause(&mut self, id: &str, detail: Option<String>) {
271        if let Some(job) = self.jobs.get_mut(id) {
272            job.status = JobStatus::Paused;
273            if detail.is_some() {
274                job.detail = detail;
275            }
276            job.updated_at = Self::now_ts();
277            Self::push_history(job, "paused");
278        }
279    }
280
281    pub fn resume(&mut self, id: &str, detail: Option<String>) {
282        if let Some(job) = self.jobs.get_mut(id) {
283            job.status = JobStatus::Running;
284            if detail.is_some() {
285                job.detail = detail;
286            }
287            Self::clear_retry_schedule(&mut job.retry);
288            job.updated_at = Self::now_ts();
289            Self::push_history(job, "resumed");
290        }
291    }
292
293    pub fn list(&self) -> Vec<JobRecord> {
294        let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
295        out.sort_by_key(|job| -job.updated_at);
296        out
297    }
298
299    pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
300        self.jobs
301            .get(id)
302            .map(|job| job.history.clone())
303            .unwrap_or_default()
304    }
305
306    pub fn resume_pending(&mut self) -> Vec<JobRecord> {
307        let mut resumed = Vec::new();
308        for job in self.jobs.values_mut() {
309            if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
310                job.status = JobStatus::Queued;
311                job.updated_at = Self::now_ts();
312                Self::push_history(job, "queued_after_resume");
313                resumed.push(job.clone());
314            }
315        }
316        resumed
317    }
318
319    pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
320        let persisted = store.list_jobs(Some(500))?;
321        for job in persisted {
322            let fallback_status = job_state_status_to_runtime(job.status.clone());
323            let parsed = Self::parse_persisted_detail(job.detail.as_deref());
324            let (status, detail, retry, history) = if let Some(detail_state) = parsed {
325                (
326                    detail_state.status,
327                    detail_state.detail,
328                    detail_state.retry,
329                    detail_state.history,
330                )
331            } else {
332                (
333                    fallback_status,
334                    job.detail,
335                    JobRetryMetadata::default(),
336                    Vec::new(),
337                )
338            };
339            self.jobs.insert(
340                job.id.clone(),
341                JobRecord {
342                    id: job.id,
343                    name: job.name,
344                    status,
345                    progress: job.progress,
346                    detail,
347                    retry,
348                    history,
349                    created_at: job.created_at,
350                    updated_at: job.updated_at,
351                },
352            );
353        }
354        Ok(())
355    }
356
357    pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
358        let Some(job) = self.jobs.get(id) else {
359            return Ok(());
360        };
361        let encoded_detail = Self::encode_persisted_detail(job)?;
362        store.upsert_job(&JobStateRecord {
363            id: job.id.clone(),
364            name: job.name.clone(),
365            status: runtime_status_to_job_state(job.status),
366            progress: job.progress,
367            detail: encoded_detail,
368            created_at: job.created_at,
369            updated_at: job.updated_at,
370        })
371    }
372
373    pub fn persist_all(&self, store: &StateStore) -> Result<()> {
374        for id in self.jobs.keys() {
375            self.persist_job(store, id)?;
376        }
377        Ok(())
378    }
379}
380
381pub struct ThreadManager {
382    store: StateStore,
383    running_threads: HashMap<String, Thread>,
384    cli_version: String,
385}
386
387impl ThreadManager {
388    pub fn new(store: StateStore) -> Self {
389        Self {
390            store,
391            running_threads: HashMap::new(),
392            cli_version: env!("CARGO_PKG_VERSION").to_string(),
393        }
394    }
395
396    pub fn state_store(&self) -> &StateStore {
397        &self.store
398    }
399
400    pub fn spawn_thread_with_history(
401        &mut self,
402        model_provider: String,
403        cwd: PathBuf,
404        initial_history: InitialHistory,
405        persist_extended_history: bool,
406    ) -> Result<NewThread> {
407        let id = format!("thread-{}", Uuid::new_v4());
408        let now = chrono::Utc::now().timestamp();
409        let preview = preview_from_initial_history(&initial_history);
410        let source = match initial_history {
411            InitialHistory::New => SessionSource::Interactive,
412            InitialHistory::Forked(_) => SessionSource::Fork,
413            InitialHistory::Resumed { .. } => SessionSource::Resume,
414        };
415        let thread = Thread {
416            id: id.clone(),
417            preview,
418            ephemeral: !persist_extended_history,
419            model_provider: model_provider.clone(),
420            created_at: now,
421            updated_at: now,
422            status: ThreadStatus::Running,
423            path: None,
424            cwd: cwd.clone(),
425            cli_version: self.cli_version.clone(),
426            source: match source {
427                SessionSource::Interactive => deepseek_protocol::SessionSource::Interactive,
428                SessionSource::Resume => deepseek_protocol::SessionSource::Resume,
429                SessionSource::Fork => deepseek_protocol::SessionSource::Fork,
430                SessionSource::Api => deepseek_protocol::SessionSource::Api,
431                SessionSource::Unknown => deepseek_protocol::SessionSource::Unknown,
432            },
433            name: None,
434        };
435        self.persist_thread(&thread, None)?;
436        match &initial_history {
437            InitialHistory::Forked(items) => {
438                for item in items {
439                    self.store.append_message(
440                        &thread.id,
441                        "history",
442                        &item.to_string(),
443                        Some(item.clone()),
444                    )?;
445                }
446            }
447            InitialHistory::Resumed { history, .. } => {
448                for item in history {
449                    self.store.append_message(
450                        &thread.id,
451                        "history",
452                        &item.to_string(),
453                        Some(item.clone()),
454                    )?;
455                }
456            }
457            InitialHistory::New => {}
458        }
459        self.running_threads
460            .insert(thread.id.clone(), thread.clone());
461        Ok(NewThread {
462            thread,
463            model: "auto".to_string(),
464            model_provider,
465            cwd,
466            approval_policy: None,
467            sandbox: None,
468        })
469    }
470
471    pub fn resume_thread_with_history(
472        &mut self,
473        params: &ThreadResumeParams,
474        fallback_cwd: &Path,
475        model_provider: String,
476    ) -> Result<Option<NewThread>> {
477        if params.history.is_none()
478            && let Some(thread) = self.running_threads.get(&params.thread_id).cloned()
479        {
480            return Ok(Some(NewThread {
481                model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
482                model_provider: params.model_provider.clone().unwrap_or(model_provider),
483                cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
484                approval_policy: params.approval_policy.clone(),
485                sandbox: params.sandbox.clone(),
486                thread,
487            }));
488        }
489
490        let persisted = self.store.get_thread(&params.thread_id)?;
491        let Some(metadata) = persisted else {
492            return Ok(None);
493        };
494        let mut thread = to_protocol_thread(metadata);
495        thread.status = ThreadStatus::Running;
496        thread.updated_at = chrono::Utc::now().timestamp();
497        thread.cwd = params
498            .cwd
499            .clone()
500            .unwrap_or_else(|| fallback_cwd.to_path_buf());
501        self.persist_thread(&thread, None)?;
502        self.running_threads
503            .insert(thread.id.clone(), thread.clone());
504        if let Some(history) = params.history.as_ref() {
505            for item in history {
506                self.store.append_message(
507                    &thread.id,
508                    "history",
509                    &item.to_string(),
510                    Some(item.clone()),
511                )?;
512            }
513        }
514
515        Ok(Some(NewThread {
516            model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
517            model_provider: params.model_provider.clone().unwrap_or(model_provider),
518            cwd: thread.cwd.clone(),
519            approval_policy: params.approval_policy.clone(),
520            sandbox: params.sandbox.clone(),
521            thread,
522        }))
523    }
524
525    pub fn fork_thread(
526        &mut self,
527        params: &ThreadForkParams,
528        fallback_cwd: &Path,
529    ) -> Result<Option<NewThread>> {
530        let parent = self.store.get_thread(&params.thread_id)?;
531        let Some(parent) = parent else {
532            return Ok(None);
533        };
534        let parent_thread = to_protocol_thread(parent);
535        let new = self.spawn_thread_with_history(
536            params
537                .model_provider
538                .clone()
539                .unwrap_or_else(|| parent_thread.model_provider.clone()),
540            params
541                .cwd
542                .clone()
543                .unwrap_or_else(|| fallback_cwd.to_path_buf()),
544            InitialHistory::Forked(vec![json!({
545                "type": "fork",
546                "from_thread_id": parent_thread.id
547            })]),
548            params.persist_extended_history,
549        )?;
550        Ok(Some(new))
551    }
552
553    pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
554        let list = self.store.list_threads(ThreadListFilters {
555            include_archived: params.include_archived,
556            limit: params.limit,
557        })?;
558        Ok(list.into_iter().map(to_protocol_thread).collect())
559    }
560
561    pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
562        Ok(self
563            .store
564            .get_thread(&params.thread_id)?
565            .map(to_protocol_thread))
566    }
567
568    pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
569        let Some(mut metadata) = self.store.get_thread(&params.thread_id)? else {
570            return Ok(None);
571        };
572        metadata.name = Some(params.name.clone());
573        metadata.updated_at = chrono::Utc::now().timestamp();
574        self.store.upsert_thread(&metadata)?;
575        let updated = to_protocol_thread(metadata);
576        self.running_threads
577            .insert(updated.id.clone(), updated.clone());
578        Ok(Some(updated))
579    }
580
581    pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
582        self.store.mark_archived(thread_id)?;
583        if let Some(thread) = self.running_threads.get_mut(thread_id) {
584            thread.status = ThreadStatus::Archived;
585        }
586        Ok(())
587    }
588
589    pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
590        self.store.mark_unarchived(thread_id)?;
591        Ok(())
592    }
593
594    pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
595        let Some(mut metadata) = self.store.get_thread(thread_id)? else {
596            return Ok(());
597        };
598        metadata.updated_at = chrono::Utc::now().timestamp();
599        metadata.preview = truncate_preview(input);
600        metadata.status = PersistedThreadStatus::Running;
601        self.store.upsert_thread(&metadata)?;
602        if let Some(thread) = self.running_threads.get_mut(thread_id) {
603            thread.updated_at = metadata.updated_at;
604            thread.preview = metadata.preview;
605            thread.status = ThreadStatus::Running;
606        }
607        let message_id = self.store.append_message(thread_id, "user", input, None)?;
608        self.store.save_checkpoint(
609            thread_id,
610            "latest",
611            &json!({
612                "reason": "thread_message",
613                "message_id": message_id,
614                "role": "user",
615                "preview": truncate_preview(input),
616                "updated_at": metadata.updated_at
617            }),
618        )?;
619        Ok(())
620    }
621
622    fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
623        self.store.upsert_thread(&ThreadMetadata {
624            id: thread.id.clone(),
625            rollout_path,
626            preview: thread.preview.clone(),
627            ephemeral: thread.ephemeral,
628            model_provider: thread.model_provider.clone(),
629            created_at: thread.created_at,
630            updated_at: thread.updated_at,
631            status: to_persisted_status(&thread.status),
632            path: thread.path.clone(),
633            cwd: thread.cwd.clone(),
634            cli_version: thread.cli_version.clone(),
635            source: to_persisted_source(&thread.source),
636            name: thread.name.clone(),
637            sandbox_policy: None,
638            approval_mode: None,
639            archived: matches!(thread.status, ThreadStatus::Archived),
640            archived_at: None,
641            git_sha: None,
642            git_branch: None,
643            git_origin_url: None,
644            memory_mode: None,
645        })
646    }
647}
648
649pub struct Runtime {
650    pub config: ConfigToml,
651    pub model_registry: ModelRegistry,
652    pub thread_manager: ThreadManager,
653    pub tool_registry: Arc<ToolRegistry>,
654    pub mcp_manager: Arc<McpManager>,
655    pub exec_policy: ExecPolicyEngine,
656    pub hooks: HookDispatcher,
657    pub jobs: JobManager,
658}
659
660impl Runtime {
661    pub fn new(
662        config: ConfigToml,
663        model_registry: ModelRegistry,
664        state: StateStore,
665        tool_registry: Arc<ToolRegistry>,
666        mcp_manager: Arc<McpManager>,
667        exec_policy: ExecPolicyEngine,
668        hooks: HookDispatcher,
669    ) -> Self {
670        let mut jobs = JobManager::default();
671        let _ = jobs.load_from_store(&state);
672        Self {
673            config,
674            model_registry,
675            thread_manager: ThreadManager::new(state),
676            tool_registry,
677            mcp_manager,
678            exec_policy,
679            hooks,
680            jobs,
681        }
682    }
683
684    fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
685        let history = self
686            .thread_manager
687            .state_store()
688            .list_messages(thread_id, Some(500))?
689            .into_iter()
690            .map(|message| {
691                json!({
692                    "id": message.id,
693                    "role": message.role,
694                    "content": message.content,
695                    "item": message.item,
696                    "created_at": message.created_at
697                })
698            })
699            .collect::<Vec<_>>();
700
701        let checkpoint = self
702            .thread_manager
703            .state_store()
704            .load_checkpoint(thread_id, None)?
705            .map(|record| {
706                json!({
707                    "checkpoint_id": record.checkpoint_id,
708                    "state": record.state,
709                    "created_at": record.created_at
710                })
711            });
712
713        Ok(json!({
714            "history": history,
715            "checkpoint": checkpoint
716        }))
717    }
718
719    fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
720        self.thread_manager.state_store().save_checkpoint(
721            thread_id,
722            "latest",
723            &json!({
724                "reason": reason,
725                "saved_at": chrono::Utc::now().timestamp(),
726                "state": state
727            }),
728        )
729    }
730
731    pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
732        match req {
733            ThreadRequest::Create { .. } => {
734                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
735                let new = self.thread_manager.spawn_thread_with_history(
736                    "deepseek".to_string(),
737                    cwd,
738                    InitialHistory::New,
739                    false,
740                )?;
741                let mut response = thread_response_from_new("created", new);
742                response.data = self.persisted_thread_data(&response.thread_id)?;
743                Ok(response)
744            }
745            ThreadRequest::Start(params) => {
746                let cwd = params.cwd.clone().unwrap_or_else(|| {
747                    std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
748                });
749                let new = self.thread_manager.spawn_thread_with_history(
750                    params
751                        .model_provider
752                        .clone()
753                        .unwrap_or_else(|| "deepseek".to_string()),
754                    cwd,
755                    InitialHistory::New,
756                    params.persist_extended_history,
757                )?;
758                let mut response = thread_response_from_new("started", new);
759                response.data = self.persisted_thread_data(&response.thread_id)?;
760                Ok(response)
761            }
762            ThreadRequest::Resume(params) => {
763                let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
764                if let Some(new) = self.thread_manager.resume_thread_with_history(
765                    &params,
766                    &fallback_cwd,
767                    "deepseek".to_string(),
768                )? {
769                    let mut response = thread_response_from_new("resumed", new);
770                    response.data = self.persisted_thread_data(&response.thread_id)?;
771                    Ok(response)
772                } else {
773                    Ok(ThreadResponse {
774                        thread_id: params.thread_id,
775                        status: "missing".to_string(),
776                        thread: None,
777                        threads: Vec::new(),
778                        model: None,
779                        model_provider: None,
780                        cwd: None,
781                        approval_policy: params.approval_policy,
782                        sandbox: params.sandbox,
783                        events: Vec::new(),
784                        data: json!({"error":"thread not found"}),
785                    })
786                }
787            }
788            ThreadRequest::Fork(params) => {
789                let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
790                if let Some(new) = self.thread_manager.fork_thread(&params, &cwd)? {
791                    let mut response = thread_response_from_new("forked", new);
792                    response.data = self.persisted_thread_data(&response.thread_id)?;
793                    Ok(response)
794                } else {
795                    Ok(ThreadResponse {
796                        thread_id: params.thread_id,
797                        status: "missing".to_string(),
798                        thread: None,
799                        threads: Vec::new(),
800                        model: None,
801                        model_provider: None,
802                        cwd: None,
803                        approval_policy: params.approval_policy,
804                        sandbox: params.sandbox,
805                        events: Vec::new(),
806                        data: json!({"error":"thread not found"}),
807                    })
808                }
809            }
810            ThreadRequest::List(params) => Ok(ThreadResponse {
811                thread_id: "list".to_string(),
812                status: "ok".to_string(),
813                thread: None,
814                threads: self.thread_manager.list_threads(&params)?,
815                model: None,
816                model_provider: None,
817                cwd: None,
818                approval_policy: None,
819                sandbox: None,
820                events: Vec::new(),
821                data: json!({}),
822            }),
823            ThreadRequest::Read(params) => {
824                let id = params.thread_id.clone();
825                let data = self.persisted_thread_data(&id)?;
826                Ok(ThreadResponse {
827                    thread_id: id,
828                    status: "ok".to_string(),
829                    thread: self.thread_manager.read_thread(&params)?,
830                    threads: Vec::new(),
831                    model: None,
832                    model_provider: None,
833                    cwd: None,
834                    approval_policy: None,
835                    sandbox: None,
836                    events: Vec::new(),
837                    data,
838                })
839            }
840            ThreadRequest::SetName(params) => Ok(ThreadResponse {
841                thread_id: params.thread_id.clone(),
842                status: "ok".to_string(),
843                thread: self.thread_manager.set_thread_name(&params)?,
844                threads: Vec::new(),
845                model: None,
846                model_provider: None,
847                cwd: None,
848                approval_policy: None,
849                sandbox: None,
850                events: Vec::new(),
851                data: json!({}),
852            }),
853            ThreadRequest::Archive { thread_id } => {
854                self.thread_manager.archive_thread(&thread_id)?;
855                Ok(ThreadResponse {
856                    thread_id,
857                    status: "archived".to_string(),
858                    thread: None,
859                    threads: Vec::new(),
860                    model: None,
861                    model_provider: None,
862                    cwd: None,
863                    approval_policy: None,
864                    sandbox: None,
865                    events: Vec::new(),
866                    data: json!({}),
867                })
868            }
869            ThreadRequest::Unarchive { thread_id } => {
870                self.thread_manager.unarchive_thread(&thread_id)?;
871                Ok(ThreadResponse {
872                    thread_id,
873                    status: "unarchived".to_string(),
874                    thread: None,
875                    threads: Vec::new(),
876                    model: None,
877                    model_provider: None,
878                    cwd: None,
879                    approval_policy: None,
880                    sandbox: None,
881                    events: Vec::new(),
882                    data: json!({}),
883                })
884            }
885            ThreadRequest::Message { thread_id, input } => {
886                self.thread_manager.touch_message(&thread_id, &input)?;
887                let response_id = format!("{thread_id}:{}", input.len());
888                self.hooks
889                    .emit(HookEvent::ResponseStart {
890                        response_id: response_id.clone(),
891                    })
892                    .await;
893                self.hooks
894                    .emit(HookEvent::ResponseEnd {
895                        response_id: response_id.clone(),
896                    })
897                    .await;
898
899                Ok(ThreadResponse {
900                    thread_id,
901                    status: "accepted".to_string(),
902                    thread: None,
903                    threads: Vec::new(),
904                    model: None,
905                    model_provider: None,
906                    cwd: None,
907                    approval_policy: None,
908                    sandbox: None,
909                    events: vec![
910                        EventFrame::ResponseStart {
911                            response_id: response_id.clone(),
912                        },
913                        EventFrame::ResponseDelta {
914                            response_id: response_id.clone(),
915                            delta: "queued".to_string(),
916                        },
917                        EventFrame::ResponseEnd { response_id },
918                    ],
919                    data: json!({}),
920                })
921            }
922        }
923    }
924
925    pub async fn handle_prompt(
926        &mut self,
927        req: PromptRequest,
928        cli_overrides: &CliRuntimeOverrides,
929    ) -> Result<PromptResponse> {
930        let resolved = self.config.resolve_runtime_options(cli_overrides);
931        let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
932        let selection = self
933            .model_registry
934            .resolve(Some(&requested_model), Some(resolved.provider));
935        let resolved_model = selection.resolved.id.clone();
936        let response_id = format!("resp-{}", Uuid::new_v4());
937
938        self.hooks
939            .emit(HookEvent::ResponseStart {
940                response_id: response_id.clone(),
941            })
942            .await;
943        self.hooks
944            .emit(HookEvent::ResponseDelta {
945                response_id: response_id.clone(),
946                delta: "model-selected".to_string(),
947            })
948            .await;
949        self.hooks
950            .emit(HookEvent::ResponseEnd {
951                response_id: response_id.clone(),
952            })
953            .await;
954
955        let payload = json!({
956            "provider": resolved.provider.as_str(),
957            "model": resolved_model.clone(),
958            "prompt": req.prompt,
959            "telemetry": resolved.telemetry,
960            "base_url": resolved.base_url,
961            "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
962            "approval_policy": resolved.approval_policy,
963            "sandbox_mode": resolved.sandbox_mode
964        });
965        if let Some(thread_id) = req.thread_id.as_ref() {
966            self.thread_manager.touch_message(thread_id, &req.prompt)?;
967            let assistant_message_id = self.thread_manager.store.append_message(
968                thread_id,
969                "assistant",
970                &payload.to_string(),
971                Some(payload.clone()),
972            )?;
973            self.persist_latest_checkpoint(
974                thread_id,
975                "prompt_response",
976                json!({
977                    "response_id": response_id.clone(),
978                    "model": resolved_model.clone(),
979                    "provider": resolved.provider.as_str(),
980                    "assistant_message_id": assistant_message_id
981                }),
982            )?;
983        }
984
985        Ok(PromptResponse {
986            output: payload.to_string(),
987            model: resolved_model,
988            events: vec![
989                EventFrame::ResponseStart {
990                    response_id: response_id.clone(),
991                },
992                EventFrame::ResponseDelta {
993                    response_id: response_id.clone(),
994                    delta: "model-selected".to_string(),
995                },
996                EventFrame::ResponseEnd { response_id },
997            ],
998        })
999    }
1000
1001    pub async fn invoke_tool(
1002        &self,
1003        call: ToolCall,
1004        approval_mode: AskForApproval,
1005        cwd: &Path,
1006    ) -> Result<Value> {
1007        let fallback_cwd = cwd.display().to_string();
1008        let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
1009        let decision = self.exec_policy.check(ExecPolicyContext {
1010            command: &command,
1011            cwd: &policy_cwd,
1012            ask_for_approval: approval_mode,
1013            sandbox_mode: None,
1014        })?;
1015        let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
1016        let response_id = format!("tool-{}", Uuid::new_v4());
1017        let call_id = call
1018            .raw_tool_call_id
1019            .clone()
1020            .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
1021        self.hooks
1022            .emit(HookEvent::ToolLifecycle {
1023                response_id: response_id.clone(),
1024                tool_name: call.name.clone(),
1025                phase: "precheck".to_string(),
1026                payload: precheck.clone(),
1027            })
1028            .await;
1029
1030        if !decision.allow {
1031            let reason = decision.reason().to_string();
1032            let approval_id = format!("approval-{}", Uuid::new_v4());
1033            let error_frame = EventFrame::Error {
1034                response_id: response_id.clone(),
1035                message: reason.clone(),
1036            };
1037            self.hooks
1038                .emit(HookEvent::ApprovalLifecycle {
1039                    approval_id,
1040                    phase: "denied".to_string(),
1041                    reason: Some(reason.clone()),
1042                })
1043                .await;
1044            self.hooks
1045                .emit(HookEvent::GenericEventFrame {
1046                    frame: error_frame.clone(),
1047                })
1048                .await;
1049            return Ok(json!({
1050                "ok": false,
1051                "status": "denied",
1052                "execution_kind": execution_kind,
1053                "response_id": response_id,
1054                "precheck": precheck,
1055                "error": reason,
1056                "events": [event_frame_payload(&error_frame)],
1057            }));
1058        }
1059
1060        if decision.requires_approval {
1061            let approval_id = format!("approval-{}", Uuid::new_v4());
1062            let reason = decision.reason().to_string();
1063            let maybe_approval_frame = approval_request_frame(
1064                &decision.requirement,
1065                call_id,
1066                approval_id.clone(),
1067                response_id.clone(),
1068                command.clone(),
1069                policy_cwd.clone(),
1070            );
1071            self.hooks
1072                .emit(HookEvent::ApprovalLifecycle {
1073                    approval_id: approval_id.clone(),
1074                    phase: "requested".to_string(),
1075                    reason: Some(reason.clone()),
1076                })
1077                .await;
1078            let mut events = Vec::new();
1079            if let Some(frame) = maybe_approval_frame {
1080                self.hooks
1081                    .emit(HookEvent::GenericEventFrame {
1082                        frame: frame.clone(),
1083                    })
1084                    .await;
1085                events.push(event_frame_payload(&frame));
1086            }
1087            return Ok(json!({
1088                "ok": false,
1089                "status": "approval_required",
1090                "execution_kind": execution_kind,
1091                "response_id": response_id,
1092                "approval_id": approval_id,
1093                "precheck": precheck,
1094                "error": reason,
1095                "events": events,
1096            }));
1097        }
1098
1099        let start_frame = EventFrame::ToolCallStart {
1100            response_id: response_id.clone(),
1101            tool_name: call.name.clone(),
1102            arguments: tool_payload_value(&call.payload),
1103        };
1104        self.hooks
1105            .emit(HookEvent::GenericEventFrame {
1106                frame: start_frame.clone(),
1107            })
1108            .await;
1109        self.hooks
1110            .emit(HookEvent::ToolLifecycle {
1111                response_id: response_id.clone(),
1112                tool_name: call.name.clone(),
1113                phase: "dispatching".to_string(),
1114                payload: json!({
1115                    "call_id": call_id,
1116                    "execution_kind": execution_kind
1117                }),
1118            })
1119            .await;
1120
1121        match self.tool_registry.dispatch(call.clone(), true).await {
1122            Ok(tool_output) => {
1123                let result_frame = EventFrame::ToolCallResult {
1124                    response_id: response_id.clone(),
1125                    tool_name: call.name.clone(),
1126                    output: tool_output_value(&tool_output),
1127                };
1128                self.hooks
1129                    .emit(HookEvent::GenericEventFrame {
1130                        frame: result_frame.clone(),
1131                    })
1132                    .await;
1133                self.hooks
1134                    .emit(HookEvent::ToolLifecycle {
1135                        response_id: response_id.clone(),
1136                        tool_name: call.name,
1137                        phase: "completed".to_string(),
1138                        payload: json!({ "ok": true }),
1139                    })
1140                    .await;
1141                Ok(json!({
1142                    "ok": true,
1143                    "status": "completed",
1144                    "execution_kind": execution_kind,
1145                    "response_id": response_id,
1146                    "precheck": precheck,
1147                    "output": tool_output,
1148                    "events": [
1149                        event_frame_payload(&start_frame),
1150                        event_frame_payload(&result_frame)
1151                    ]
1152                }))
1153            }
1154            Err(err) => {
1155                let message = format!("{err:?}");
1156                let error_frame = EventFrame::Error {
1157                    response_id: response_id.clone(),
1158                    message: message.clone(),
1159                };
1160                self.hooks
1161                    .emit(HookEvent::GenericEventFrame {
1162                        frame: error_frame.clone(),
1163                    })
1164                    .await;
1165                self.hooks
1166                    .emit(HookEvent::ToolLifecycle {
1167                        response_id: response_id.clone(),
1168                        tool_name: call.name,
1169                        phase: "failed".to_string(),
1170                        payload: json!({ "error": message.clone() }),
1171                    })
1172                    .await;
1173                Ok(json!({
1174                    "ok": false,
1175                    "status": "failed",
1176                    "execution_kind": execution_kind,
1177                    "response_id": response_id,
1178                    "precheck": precheck,
1179                    "error": message,
1180                    "events": [
1181                        event_frame_payload(&start_frame),
1182                        event_frame_payload(&error_frame)
1183                    ]
1184                }))
1185            }
1186        }
1187    }
1188
1189    pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1190        let mut updates = Vec::new();
1191        let summary = self.mcp_manager.start_all(|update| {
1192            updates.push(update);
1193        });
1194        for update in updates {
1195            let status = match update.status {
1196                McpManagerStartupStatus::Starting => deepseek_protocol::McpStartupStatus::Starting,
1197                McpManagerStartupStatus::Ready => deepseek_protocol::McpStartupStatus::Ready,
1198                McpManagerStartupStatus::Failed { error } => {
1199                    deepseek_protocol::McpStartupStatus::Failed { error }
1200                }
1201                McpManagerStartupStatus::Cancelled => {
1202                    deepseek_protocol::McpStartupStatus::Cancelled
1203                }
1204            };
1205            self.hooks
1206                .emit(HookEvent::GenericEventFrame {
1207                    frame: EventFrame::McpStartupUpdate {
1208                        update: deepseek_protocol::McpStartupUpdateEvent {
1209                            server_name: update.server_name,
1210                            status,
1211                        },
1212                    },
1213                })
1214                .await;
1215        }
1216        self.hooks
1217            .emit(HookEvent::GenericEventFrame {
1218                frame: EventFrame::McpStartupComplete {
1219                    summary: deepseek_protocol::McpStartupCompleteEvent {
1220                        ready: summary.ready.clone(),
1221                        failed: summary
1222                            .failed
1223                            .iter()
1224                            .map(|f| deepseek_protocol::McpStartupFailure {
1225                                server_name: f.server_name.clone(),
1226                                error: f.error.clone(),
1227                            })
1228                            .collect(),
1229                        cancelled: summary.cancelled.clone(),
1230                    },
1231                },
1232            })
1233            .await;
1234        summary
1235    }
1236
1237    pub fn app_status(&self) -> AppResponse {
1238        let jobs = self.jobs.list();
1239        let events = jobs
1240            .iter()
1241            .flat_map(|job| {
1242                job.history.iter().map(|entry| EventFrame::ResponseDelta {
1243                    response_id: job.id.clone(),
1244                    delta: json!({
1245                        "kind": "job_transition",
1246                        "job_id": job.id.clone(),
1247                        "phase": entry.phase.clone(),
1248                        "status": job_status_to_str(entry.status),
1249                        "progress": entry.progress,
1250                        "detail": entry.detail.clone(),
1251                        "retry": job_retry_to_value(&entry.retry),
1252                        "at": entry.at
1253                    })
1254                    .to_string(),
1255                })
1256            })
1257            .collect::<Vec<_>>();
1258        AppResponse {
1259            ok: true,
1260            data: json!({
1261                "jobs": jobs.into_iter().map(|job| {
1262                    json!({
1263                        "id": job.id,
1264                        "name": job.name,
1265                        "status": job_status_to_str(job.status),
1266                        "progress": job.progress,
1267                        "detail": job.detail,
1268                        "retry": job_retry_to_value(&job.retry),
1269                        "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1270                    })
1271                }).collect::<Vec<_>>()
1272            }),
1273            events,
1274        }
1275    }
1276
1277    pub fn provider_default(&self) -> ProviderKind {
1278        self.config.provider
1279    }
1280
1281    pub fn save_thread_checkpoint(
1282        &self,
1283        thread_id: &str,
1284        checkpoint_id: &str,
1285        state: &Value,
1286    ) -> Result<()> {
1287        self.thread_manager
1288            .state_store()
1289            .save_checkpoint(thread_id, checkpoint_id, state)
1290    }
1291
1292    pub fn load_thread_checkpoint(
1293        &self,
1294        thread_id: &str,
1295        checkpoint_id: Option<&str>,
1296    ) -> Result<Option<Value>> {
1297        Ok(self
1298            .thread_manager
1299            .state_store()
1300            .load_checkpoint(thread_id, checkpoint_id)?
1301            .map(|checkpoint| checkpoint.state))
1302    }
1303
1304    pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1305        let job = self.jobs.enqueue(name);
1306        self.jobs
1307            .persist_job(self.thread_manager.state_store(), &job.id)?;
1308        Ok(job)
1309    }
1310
1311    pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1312        self.jobs.set_running(job_id);
1313        self.jobs
1314            .persist_job(self.thread_manager.state_store(), job_id)
1315    }
1316
1317    pub fn update_job_progress(
1318        &mut self,
1319        job_id: &str,
1320        progress: u8,
1321        detail: Option<String>,
1322    ) -> Result<()> {
1323        self.jobs.update_progress(job_id, progress, detail);
1324        self.jobs
1325            .persist_job(self.thread_manager.state_store(), job_id)
1326    }
1327
1328    pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1329        self.jobs.complete(job_id);
1330        self.jobs
1331            .persist_job(self.thread_manager.state_store(), job_id)
1332    }
1333
1334    pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1335        self.jobs.fail(job_id, detail);
1336        self.jobs
1337            .persist_job(self.thread_manager.state_store(), job_id)
1338    }
1339
1340    pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1341        self.jobs.cancel(job_id);
1342        self.jobs
1343            .persist_job(self.thread_manager.state_store(), job_id)
1344    }
1345
1346    pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1347        self.jobs.pause(job_id, detail);
1348        self.jobs
1349            .persist_job(self.thread_manager.state_store(), job_id)
1350    }
1351
1352    pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1353        self.jobs.resume(job_id, detail);
1354        self.jobs
1355            .persist_job(self.thread_manager.state_store(), job_id)
1356    }
1357
1358    pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
1359        self.jobs.history(job_id)
1360    }
1361}
1362
1363fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1364    ThreadResponse {
1365        thread_id: new.thread.id.clone(),
1366        status: status.to_string(),
1367        thread: Some(new.thread),
1368        threads: Vec::new(),
1369        model: Some(new.model),
1370        model_provider: Some(new.model_provider),
1371        cwd: Some(new.cwd),
1372        approval_policy: new.approval_policy,
1373        sandbox: new.sandbox,
1374        events: Vec::new(),
1375        data: json!({}),
1376    }
1377}
1378
1379fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1380    match initial_history {
1381        InitialHistory::New => "New conversation".to_string(),
1382        InitialHistory::Forked(items) => truncate_preview(
1383            &items
1384                .first()
1385                .map(Value::to_string)
1386                .unwrap_or_else(|| "Forked conversation".to_string()),
1387        ),
1388        InitialHistory::Resumed { history, .. } => truncate_preview(
1389            &history
1390                .first()
1391                .map(Value::to_string)
1392                .unwrap_or_else(|| "Resumed conversation".to_string()),
1393        ),
1394    }
1395}
1396
1397fn truncate_preview(value: &str) -> String {
1398    value.chars().take(120).collect()
1399}
1400
1401fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1402    Thread {
1403        id: thread.id,
1404        preview: thread.preview,
1405        ephemeral: thread.ephemeral,
1406        model_provider: thread.model_provider,
1407        created_at: thread.created_at,
1408        updated_at: thread.updated_at,
1409        status: match thread.status {
1410            PersistedThreadStatus::Running => ThreadStatus::Running,
1411            PersistedThreadStatus::Idle => ThreadStatus::Idle,
1412            PersistedThreadStatus::Completed => ThreadStatus::Completed,
1413            PersistedThreadStatus::Failed => ThreadStatus::Failed,
1414            PersistedThreadStatus::Paused => ThreadStatus::Paused,
1415            PersistedThreadStatus::Archived => ThreadStatus::Archived,
1416        },
1417        path: thread.path,
1418        cwd: thread.cwd,
1419        cli_version: thread.cli_version,
1420        source: match thread.source {
1421            SessionSource::Interactive => deepseek_protocol::SessionSource::Interactive,
1422            SessionSource::Resume => deepseek_protocol::SessionSource::Resume,
1423            SessionSource::Fork => deepseek_protocol::SessionSource::Fork,
1424            SessionSource::Api => deepseek_protocol::SessionSource::Api,
1425            SessionSource::Unknown => deepseek_protocol::SessionSource::Unknown,
1426        },
1427        name: thread.name,
1428    }
1429}
1430
1431fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1432    match status {
1433        ThreadStatus::Running => PersistedThreadStatus::Running,
1434        ThreadStatus::Idle => PersistedThreadStatus::Idle,
1435        ThreadStatus::Completed => PersistedThreadStatus::Completed,
1436        ThreadStatus::Failed => PersistedThreadStatus::Failed,
1437        ThreadStatus::Paused => PersistedThreadStatus::Paused,
1438        ThreadStatus::Archived => PersistedThreadStatus::Archived,
1439    }
1440}
1441
1442fn to_persisted_source(source: &deepseek_protocol::SessionSource) -> SessionSource {
1443    match source {
1444        deepseek_protocol::SessionSource::Interactive => SessionSource::Interactive,
1445        deepseek_protocol::SessionSource::Resume => SessionSource::Resume,
1446        deepseek_protocol::SessionSource::Fork => SessionSource::Fork,
1447        deepseek_protocol::SessionSource::Api => SessionSource::Api,
1448        deepseek_protocol::SessionSource::Unknown => SessionSource::Unknown,
1449    }
1450}
1451
1452fn approval_request_frame(
1453    requirement: &ExecApprovalRequirement,
1454    call_id: String,
1455    approval_id: String,
1456    turn_id: String,
1457    command: String,
1458    cwd: String,
1459) -> Option<EventFrame> {
1460    let ExecApprovalRequirement::NeedsApproval {
1461        reason,
1462        proposed_execpolicy_amendment,
1463        proposed_network_policy_amendments,
1464    } = requirement
1465    else {
1466        return None;
1467    };
1468
1469    let mut available_decisions = vec![
1470        ReviewDecision::Approved,
1471        ReviewDecision::ApprovedForSession,
1472        ReviewDecision::Denied,
1473        ReviewDecision::Abort,
1474    ];
1475    if proposed_execpolicy_amendment
1476        .as_ref()
1477        .is_some_and(|amendment| !amendment.prefixes.is_empty())
1478    {
1479        available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1480    }
1481    available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1482        |amendment| ReviewDecision::NetworkPolicyAmendment {
1483            host: amendment.host,
1484            action: amendment.action,
1485        },
1486    ));
1487
1488    Some(EventFrame::ExecApprovalRequest {
1489        request: ExecApprovalRequestEvent {
1490            call_id,
1491            approval_id,
1492            turn_id,
1493            command,
1494            cwd,
1495            reason: reason.clone(),
1496            network_approval_context: None,
1497            proposed_execpolicy_amendment: proposed_execpolicy_amendment
1498                .as_ref()
1499                .map(|amendment| amendment.prefixes.clone())
1500                .unwrap_or_default(),
1501            proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1502            additional_permissions: Vec::new(),
1503            available_decisions,
1504        },
1505    })
1506}
1507
1508fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1509    match requirement {
1510        ExecApprovalRequirement::Skip {
1511            bypass_sandbox,
1512            proposed_execpolicy_amendment,
1513        } => json!({
1514            "type": "skip",
1515            "bypass_sandbox": bypass_sandbox,
1516            "reason": requirement.reason(),
1517            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1518                .as_ref()
1519                .map(|amendment| amendment.prefixes.clone())
1520                .unwrap_or_default()
1521        }),
1522        ExecApprovalRequirement::NeedsApproval {
1523            reason,
1524            proposed_execpolicy_amendment,
1525            proposed_network_policy_amendments,
1526        } => json!({
1527            "type": "needs_approval",
1528            "reason": reason,
1529            "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1530                .as_ref()
1531                .map(|amendment| amendment.prefixes.clone())
1532                .unwrap_or_default(),
1533            "proposed_network_policy_amendments": proposed_network_policy_amendments
1534        }),
1535        ExecApprovalRequirement::Forbidden { reason } => json!({
1536            "type": "forbidden",
1537            "reason": reason
1538        }),
1539    }
1540}
1541
1542fn policy_precheck_payload(
1543    decision: &ExecPolicyDecision,
1544    command: &str,
1545    cwd: &str,
1546    execution_kind: &str,
1547) -> Value {
1548    json!({
1549        "execution_kind": execution_kind,
1550        "command": command,
1551        "cwd": cwd,
1552        "allow": decision.allow,
1553        "requires_approval": decision.requires_approval,
1554        "matched_rule": decision.matched_rule.clone(),
1555        "phase": decision.requirement.phase(),
1556        "reason": decision.reason(),
1557        "requirement": approval_requirement_payload(&decision.requirement)
1558    })
1559}
1560
1561fn tool_payload_value(payload: &ToolPayload) -> Value {
1562    serde_json::to_value(payload).unwrap_or_else(
1563        |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1564    )
1565}
1566
1567fn tool_output_value(output: &deepseek_protocol::ToolOutput) -> Value {
1568    serde_json::to_value(output).unwrap_or_else(
1569        |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1570    )
1571}
1572
1573fn event_frame_payload(frame: &EventFrame) -> Value {
1574    serde_json::to_value(frame)
1575        .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1576}
1577
1578fn json_optional_string(value: &Value) -> Option<String> {
1579    if value.is_null() {
1580        None
1581    } else {
1582        value.as_str().map(ToString::to_string)
1583    }
1584}
1585
1586fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
1587    let Some(value) = value else {
1588        return JobRetryMetadata::default();
1589    };
1590    JobRetryMetadata {
1591        attempt: value
1592            .get("attempt")
1593            .and_then(Value::as_u64)
1594            .unwrap_or(0)
1595            .min(u32::MAX as u64) as u32,
1596        max_attempts: value
1597            .get("max_attempts")
1598            .and_then(Value::as_u64)
1599            .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
1600            .min(u32::MAX as u64) as u32,
1601        backoff_base_ms: value
1602            .get("backoff_base_ms")
1603            .and_then(Value::as_u64)
1604            .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
1605        next_backoff_ms: value
1606            .get("next_backoff_ms")
1607            .and_then(Value::as_u64)
1608            .unwrap_or(0),
1609        next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
1610    }
1611}
1612
1613fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
1614    let status = value
1615        .get("status")
1616        .and_then(Value::as_str)
1617        .and_then(job_status_from_str)?;
1618    Some(JobHistoryEntry {
1619        at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
1620        phase: value
1621            .get("phase")
1622            .and_then(Value::as_str)
1623            .unwrap_or("unknown")
1624            .to_string(),
1625        status,
1626        progress: value
1627            .get("progress")
1628            .and_then(Value::as_u64)
1629            .map(|v| v.min(u8::MAX as u64) as u8),
1630        detail: value.get("detail").and_then(json_optional_string),
1631        retry: parse_retry_metadata(value.get("retry")),
1632    })
1633}
1634
1635fn job_status_to_str(status: JobStatus) -> &'static str {
1636    match status {
1637        JobStatus::Queued => "queued",
1638        JobStatus::Running => "running",
1639        JobStatus::Paused => "paused",
1640        JobStatus::Completed => "completed",
1641        JobStatus::Failed => "failed",
1642        JobStatus::Cancelled => "cancelled",
1643    }
1644}
1645
1646fn job_status_from_str(value: &str) -> Option<JobStatus> {
1647    match value {
1648        "queued" => Some(JobStatus::Queued),
1649        "running" => Some(JobStatus::Running),
1650        "paused" => Some(JobStatus::Paused),
1651        "completed" => Some(JobStatus::Completed),
1652        "failed" => Some(JobStatus::Failed),
1653        "cancelled" => Some(JobStatus::Cancelled),
1654        _ => None,
1655    }
1656}
1657
1658fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
1659    json!({
1660        "attempt": retry.attempt,
1661        "max_attempts": retry.max_attempts,
1662        "backoff_base_ms": retry.backoff_base_ms,
1663        "next_backoff_ms": retry.next_backoff_ms,
1664        "next_retry_at": retry.next_retry_at
1665    })
1666}
1667
1668fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
1669    json!({
1670        "at": entry.at,
1671        "phase": entry.phase.clone(),
1672        "status": job_status_to_str(entry.status),
1673        "progress": entry.progress,
1674        "detail": entry.detail.clone(),
1675        "retry": job_retry_to_value(&entry.retry)
1676    })
1677}
1678
1679fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
1680    match status {
1681        JobStatus::Queued => JobStateStatus::Queued,
1682        JobStatus::Running => JobStateStatus::Running,
1683        JobStatus::Paused => JobStateStatus::Running,
1684        JobStatus::Completed => JobStateStatus::Completed,
1685        JobStatus::Failed => JobStateStatus::Failed,
1686        JobStatus::Cancelled => JobStateStatus::Cancelled,
1687    }
1688}
1689
1690fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
1691    match status {
1692        JobStateStatus::Queued => JobStatus::Queued,
1693        JobStateStatus::Running => JobStatus::Running,
1694        JobStateStatus::Completed => JobStatus::Completed,
1695        JobStateStatus::Failed => JobStatus::Failed,
1696        JobStateStatus::Cancelled => JobStatus::Cancelled,
1697    }
1698}