1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use deepseek_agent::ModelRegistry;
7use deepseek_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use deepseek_execpolicy::{
9 AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10 ExecPolicyEngine,
11};
12use deepseek_hooks::{HookDispatcher, HookEvent};
13use deepseek_mcp::{
14 McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use deepseek_protocol::{
17 AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18 ReviewDecision, Thread, ThreadForkParams, ThreadListParams, ThreadReadParams, ThreadRequest,
19 ThreadResponse, ThreadResumeParams, ThreadSetNameParams, ThreadStatus, ToolPayload,
20};
21use deepseek_state::{
22 JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadListFilters, ThreadMetadata,
23 ThreadStatus as PersistedThreadStatus,
24};
25use deepseek_tools::{ToolCall, ToolRegistry};
26use serde_json::{Value, json};
27use uuid::Uuid;
28
/// History a thread is seeded with when it is spawned.
///
/// The variant also determines the session source recorded on the thread
/// (interactive, fork or resume).
#[derive(Debug, Clone)]
pub enum InitialHistory {
    /// Start with no prior history.
    New,
    /// Items stored as `"history"` messages on the forked thread.
    Forked(Vec<Value>),
    /// History carried over from an earlier conversation.
    Resumed {
        conversation_id: String,
        /// Items stored as `"history"` messages on the new thread.
        history: Vec<Value>,
        rollout_path: PathBuf,
    },
}
39
/// A thread handed back by the thread manager after spawning, resuming or
/// forking, together with the settings that apply to it.
#[derive(Debug, Clone)]
pub struct NewThread {
    /// The protocol-level thread record.
    pub thread: Thread,
    /// Model identifier; the thread manager uses `"auto"` when none is requested.
    pub model: String,
    pub model_provider: String,
    /// Working directory associated with the thread.
    pub cwd: PathBuf,
    pub approval_policy: Option<String>,
    pub sandbox: Option<String>,
}
49
/// Runtime lifecycle state of a job tracked by `JobManager`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Initial state assigned by `JobManager::enqueue`.
    Queued,
    Running,
    Paused,
    Completed,
    Failed,
    Cancelled,
}
59
/// Value written as `schema_version` in the persisted job detail JSON.
const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
/// Default for `JobRetryMetadata::max_attempts`.
const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
/// Default for `JobRetryMetadata::backoff_base_ms`.
const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
/// Maximum number of history entries kept per job; older entries are dropped first.
const MAX_JOB_HISTORY_ENTRIES: usize = 64;
64
/// Retry bookkeeping attached to a job and to each of its history entries.
#[derive(Debug, Clone)]
pub struct JobRetryMetadata {
    /// Number of failures counted so far (incremented by `JobManager::fail`).
    pub attempt: u32,
    /// Once `attempt` reaches this value, further failures schedule no retry.
    pub max_attempts: u32,
    /// Base delay, in milliseconds, that is doubled for each additional attempt.
    pub backoff_base_ms: u64,
    /// Delay computed for the next retry, in milliseconds; 0 when nothing is scheduled.
    pub next_backoff_ms: u64,
    /// Unix timestamp (seconds) of the scheduled retry, if any.
    pub next_retry_at: Option<i64>,
}
73
74impl Default for JobRetryMetadata {
75 fn default() -> Self {
76 Self {
77 attempt: 0,
78 max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
79 backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
80 next_backoff_ms: 0,
81 next_retry_at: None,
82 }
83 }
84}
85
/// Snapshot of a job taken each time its state changes.
#[derive(Debug, Clone)]
pub struct JobHistoryEntry {
    /// The job's `updated_at` value (Unix seconds) when the snapshot was taken.
    pub at: i64,
    /// Label of the transition, e.g. `"created"`, `"running"` or `"failed"`.
    pub phase: String,
    pub status: JobStatus,
    pub progress: Option<u8>,
    pub detail: Option<String>,
    pub retry: JobRetryMetadata,
}
95
/// Job state decoded from the JSON document stored in a persisted job's
/// `detail` column.
#[derive(Debug, Clone)]
struct PersistedJobDetail {
    pub status: JobStatus,
    pub detail: Option<String>,
    pub retry: JobRetryMetadata,
    pub history: Vec<JobHistoryEntry>,
}
103
/// In-memory representation of a job managed by `JobManager`.
#[derive(Debug, Clone)]
pub struct JobRecord {
    /// Identifier of the form `job-<uuid>` for jobs created by `JobManager::enqueue`.
    pub id: String,
    pub name: String,
    pub status: JobStatus,
    /// Progress value; `JobManager` clamps updates to at most 100.
    pub progress: Option<u8>,
    pub detail: Option<String>,
    pub retry: JobRetryMetadata,
    /// State-change snapshots, oldest first, capped at `MAX_JOB_HISTORY_ENTRIES`.
    pub history: Vec<JobHistoryEntry>,
    /// Creation time as Unix seconds.
    pub created_at: i64,
    /// Time of the last state change as Unix seconds.
    pub updated_at: i64,
}
116
/// In-memory registry of jobs keyed by job id.
#[derive(Debug, Default)]
pub struct JobManager {
    jobs: HashMap<String, JobRecord>,
}
121
122impl JobManager {
123 fn now_ts() -> i64 {
124 chrono::Utc::now().timestamp()
125 }
126
127 fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
128 if retry.attempt == 0 {
129 return 0;
130 }
131 let exponent = retry.attempt.saturating_sub(1).min(20);
132 let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
133 retry.backoff_base_ms.saturating_mul(multiplier)
134 }
135
136 fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
137 retry.next_backoff_ms = 0;
138 retry.next_retry_at = None;
139 }
140
141 fn push_history(job: &mut JobRecord, phase: &str) {
142 job.history.push(JobHistoryEntry {
143 at: job.updated_at,
144 phase: phase.to_string(),
145 status: job.status,
146 progress: job.progress,
147 detail: job.detail.clone(),
148 retry: job.retry.clone(),
149 });
150 if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
151 let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
152 job.history.drain(0..to_drain);
153 }
154 }
155
156 fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
157 let raw = raw?;
158 let parsed: Value = serde_json::from_str(raw).ok()?;
159 let status = parsed
160 .get("status")
161 .and_then(Value::as_str)
162 .and_then(job_status_from_str)?;
163 let detail = parsed.get("detail").and_then(json_optional_string);
164 let retry = parse_retry_metadata(parsed.get("retry"));
165 let history = parsed
166 .get("history")
167 .and_then(Value::as_array)
168 .map(|items| {
169 items
170 .iter()
171 .filter_map(parse_history_entry)
172 .collect::<Vec<_>>()
173 })
174 .unwrap_or_default();
175 Some(PersistedJobDetail {
176 status,
177 detail,
178 retry,
179 history,
180 })
181 }
182
183 fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
184 let encoded = json!({
185 "schema_version": JOB_DETAIL_SCHEMA_VERSION,
186 "status": job_status_to_str(job.status),
187 "detail": job.detail.clone(),
188 "retry": job_retry_to_value(&job.retry),
189 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
190 })
191 .to_string();
192 Ok(Some(encoded))
193 }
194
195 pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
196 let now = Self::now_ts();
197 let id = format!("job-{}", Uuid::new_v4());
198 let mut job = JobRecord {
199 id: id.clone(),
200 name: name.into(),
201 status: JobStatus::Queued,
202 progress: Some(0),
203 detail: None,
204 retry: JobRetryMetadata::default(),
205 history: Vec::new(),
206 created_at: now,
207 updated_at: now,
208 };
209 Self::push_history(&mut job, "created");
210 self.jobs.insert(id, job.clone());
211 job
212 }
213
214 pub fn set_running(&mut self, id: &str) {
215 if let Some(job) = self.jobs.get_mut(id) {
216 job.status = JobStatus::Running;
217 Self::clear_retry_schedule(&mut job.retry);
218 job.updated_at = Self::now_ts();
219 Self::push_history(job, "running");
220 }
221 }
222
223 pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
224 if let Some(job) = self.jobs.get_mut(id) {
225 job.progress = Some(progress.min(100));
226 job.detail = detail;
227 job.updated_at = Self::now_ts();
228 Self::push_history(job, "progress_updated");
229 }
230 }
231
232 pub fn complete(&mut self, id: &str) {
233 if let Some(job) = self.jobs.get_mut(id) {
234 job.status = JobStatus::Completed;
235 job.progress = Some(100);
236 Self::clear_retry_schedule(&mut job.retry);
237 job.updated_at = Self::now_ts();
238 Self::push_history(job, "completed");
239 }
240 }
241
242 pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
243 if let Some(job) = self.jobs.get_mut(id) {
244 let now = Self::now_ts();
245 job.status = JobStatus::Failed;
246 job.detail = Some(detail.into());
247 if job.retry.attempt < job.retry.max_attempts {
248 job.retry.attempt += 1;
249 job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
250 let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
251 .min(i64::MAX as u64) as i64;
252 job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
253 } else {
254 Self::clear_retry_schedule(&mut job.retry);
255 }
256 job.updated_at = now;
257 Self::push_history(job, "failed");
258 }
259 }
260
261 pub fn cancel(&mut self, id: &str) {
262 if let Some(job) = self.jobs.get_mut(id) {
263 job.status = JobStatus::Cancelled;
264 Self::clear_retry_schedule(&mut job.retry);
265 job.updated_at = Self::now_ts();
266 Self::push_history(job, "cancelled");
267 }
268 }
269
270 pub fn pause(&mut self, id: &str, detail: Option<String>) {
271 if let Some(job) = self.jobs.get_mut(id) {
272 job.status = JobStatus::Paused;
273 if detail.is_some() {
274 job.detail = detail;
275 }
276 job.updated_at = Self::now_ts();
277 Self::push_history(job, "paused");
278 }
279 }
280
281 pub fn resume(&mut self, id: &str, detail: Option<String>) {
282 if let Some(job) = self.jobs.get_mut(id) {
283 job.status = JobStatus::Running;
284 if detail.is_some() {
285 job.detail = detail;
286 }
287 Self::clear_retry_schedule(&mut job.retry);
288 job.updated_at = Self::now_ts();
289 Self::push_history(job, "resumed");
290 }
291 }
292
293 pub fn list(&self) -> Vec<JobRecord> {
294 let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
295 out.sort_by_key(|job| -job.updated_at);
296 out
297 }
298
299 pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
300 self.jobs
301 .get(id)
302 .map(|job| job.history.clone())
303 .unwrap_or_default()
304 }
305
306 pub fn resume_pending(&mut self) -> Vec<JobRecord> {
307 let mut resumed = Vec::new();
308 for job in self.jobs.values_mut() {
309 if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
310 job.status = JobStatus::Queued;
311 job.updated_at = Self::now_ts();
312 Self::push_history(job, "queued_after_resume");
313 resumed.push(job.clone());
314 }
315 }
316 resumed
317 }
318
319 pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
320 let persisted = store.list_jobs(Some(500))?;
321 for job in persisted {
322 let fallback_status = job_state_status_to_runtime(job.status.clone());
323 let parsed = Self::parse_persisted_detail(job.detail.as_deref());
324 let (status, detail, retry, history) = if let Some(detail_state) = parsed {
325 (
326 detail_state.status,
327 detail_state.detail,
328 detail_state.retry,
329 detail_state.history,
330 )
331 } else {
332 (
333 fallback_status,
334 job.detail,
335 JobRetryMetadata::default(),
336 Vec::new(),
337 )
338 };
339 self.jobs.insert(
340 job.id.clone(),
341 JobRecord {
342 id: job.id,
343 name: job.name,
344 status,
345 progress: job.progress,
346 detail,
347 retry,
348 history,
349 created_at: job.created_at,
350 updated_at: job.updated_at,
351 },
352 );
353 }
354 Ok(())
355 }
356
357 pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
358 let Some(job) = self.jobs.get(id) else {
359 return Ok(());
360 };
361 let encoded_detail = Self::encode_persisted_detail(job)?;
362 store.upsert_job(&JobStateRecord {
363 id: job.id.clone(),
364 name: job.name.clone(),
365 status: runtime_status_to_job_state(job.status),
366 progress: job.progress,
367 detail: encoded_detail,
368 created_at: job.created_at,
369 updated_at: job.updated_at,
370 })
371 }
372
373 pub fn persist_all(&self, store: &StateStore) -> Result<()> {
374 for id in self.jobs.keys() {
375 self.persist_job(store, id)?;
376 }
377 Ok(())
378 }
379}
380
/// Creates, resumes and updates threads, persisting them through a `StateStore`.
pub struct ThreadManager {
    /// Backing store for thread metadata, messages and checkpoints.
    store: StateStore,
    /// Threads spawned or resumed through this manager, keyed by thread id.
    running_threads: HashMap<String, Thread>,
    /// Crate version (`CARGO_PKG_VERSION`) stamped on newly spawned threads.
    cli_version: String,
}
386
387impl ThreadManager {
388 pub fn new(store: StateStore) -> Self {
389 Self {
390 store,
391 running_threads: HashMap::new(),
392 cli_version: env!("CARGO_PKG_VERSION").to_string(),
393 }
394 }
395
396 pub fn state_store(&self) -> &StateStore {
397 &self.store
398 }
399
400 pub fn spawn_thread_with_history(
401 &mut self,
402 model_provider: String,
403 cwd: PathBuf,
404 initial_history: InitialHistory,
405 persist_extended_history: bool,
406 ) -> Result<NewThread> {
407 let id = format!("thread-{}", Uuid::new_v4());
408 let now = chrono::Utc::now().timestamp();
409 let preview = preview_from_initial_history(&initial_history);
410 let source = match initial_history {
411 InitialHistory::New => SessionSource::Interactive,
412 InitialHistory::Forked(_) => SessionSource::Fork,
413 InitialHistory::Resumed { .. } => SessionSource::Resume,
414 };
415 let thread = Thread {
416 id: id.clone(),
417 preview,
418 ephemeral: !persist_extended_history,
419 model_provider: model_provider.clone(),
420 created_at: now,
421 updated_at: now,
422 status: ThreadStatus::Running,
423 path: None,
424 cwd: cwd.clone(),
425 cli_version: self.cli_version.clone(),
426 source: match source {
427 SessionSource::Interactive => deepseek_protocol::SessionSource::Interactive,
428 SessionSource::Resume => deepseek_protocol::SessionSource::Resume,
429 SessionSource::Fork => deepseek_protocol::SessionSource::Fork,
430 SessionSource::Api => deepseek_protocol::SessionSource::Api,
431 SessionSource::Unknown => deepseek_protocol::SessionSource::Unknown,
432 },
433 name: None,
434 };
435 self.persist_thread(&thread, None)?;
436 match &initial_history {
437 InitialHistory::Forked(items) => {
438 for item in items {
439 self.store.append_message(
440 &thread.id,
441 "history",
442 &item.to_string(),
443 Some(item.clone()),
444 )?;
445 }
446 }
447 InitialHistory::Resumed { history, .. } => {
448 for item in history {
449 self.store.append_message(
450 &thread.id,
451 "history",
452 &item.to_string(),
453 Some(item.clone()),
454 )?;
455 }
456 }
457 InitialHistory::New => {}
458 }
459 self.running_threads
460 .insert(thread.id.clone(), thread.clone());
461 Ok(NewThread {
462 thread,
463 model: "auto".to_string(),
464 model_provider,
465 cwd,
466 approval_policy: None,
467 sandbox: None,
468 })
469 }
470
471 pub fn resume_thread_with_history(
472 &mut self,
473 params: &ThreadResumeParams,
474 fallback_cwd: &Path,
475 model_provider: String,
476 ) -> Result<Option<NewThread>> {
477 if params.history.is_none()
478 && let Some(thread) = self.running_threads.get(¶ms.thread_id).cloned()
479 {
480 return Ok(Some(NewThread {
481 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
482 model_provider: params.model_provider.clone().unwrap_or(model_provider),
483 cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
484 approval_policy: params.approval_policy.clone(),
485 sandbox: params.sandbox.clone(),
486 thread,
487 }));
488 }
489
490 let persisted = self.store.get_thread(¶ms.thread_id)?;
491 let Some(metadata) = persisted else {
492 return Ok(None);
493 };
494 let mut thread = to_protocol_thread(metadata);
495 thread.status = ThreadStatus::Running;
496 thread.updated_at = chrono::Utc::now().timestamp();
497 thread.cwd = params
498 .cwd
499 .clone()
500 .unwrap_or_else(|| fallback_cwd.to_path_buf());
501 self.persist_thread(&thread, None)?;
502 self.running_threads
503 .insert(thread.id.clone(), thread.clone());
504 if let Some(history) = params.history.as_ref() {
505 for item in history {
506 self.store.append_message(
507 &thread.id,
508 "history",
509 &item.to_string(),
510 Some(item.clone()),
511 )?;
512 }
513 }
514
515 Ok(Some(NewThread {
516 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
517 model_provider: params.model_provider.clone().unwrap_or(model_provider),
518 cwd: thread.cwd.clone(),
519 approval_policy: params.approval_policy.clone(),
520 sandbox: params.sandbox.clone(),
521 thread,
522 }))
523 }
524
525 pub fn fork_thread(
526 &mut self,
527 params: &ThreadForkParams,
528 fallback_cwd: &Path,
529 ) -> Result<Option<NewThread>> {
530 let parent = self.store.get_thread(¶ms.thread_id)?;
531 let Some(parent) = parent else {
532 return Ok(None);
533 };
534 let parent_thread = to_protocol_thread(parent);
535 let new = self.spawn_thread_with_history(
536 params
537 .model_provider
538 .clone()
539 .unwrap_or_else(|| parent_thread.model_provider.clone()),
540 params
541 .cwd
542 .clone()
543 .unwrap_or_else(|| fallback_cwd.to_path_buf()),
544 InitialHistory::Forked(vec![json!({
545 "type": "fork",
546 "from_thread_id": parent_thread.id
547 })]),
548 params.persist_extended_history,
549 )?;
550 Ok(Some(new))
551 }
552
553 pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
554 let list = self.store.list_threads(ThreadListFilters {
555 include_archived: params.include_archived,
556 limit: params.limit,
557 })?;
558 Ok(list.into_iter().map(to_protocol_thread).collect())
559 }
560
561 pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
562 Ok(self
563 .store
564 .get_thread(¶ms.thread_id)?
565 .map(to_protocol_thread))
566 }
567
568 pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
569 let Some(mut metadata) = self.store.get_thread(¶ms.thread_id)? else {
570 return Ok(None);
571 };
572 metadata.name = Some(params.name.clone());
573 metadata.updated_at = chrono::Utc::now().timestamp();
574 self.store.upsert_thread(&metadata)?;
575 let updated = to_protocol_thread(metadata);
576 self.running_threads
577 .insert(updated.id.clone(), updated.clone());
578 Ok(Some(updated))
579 }
580
581 pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
582 self.store.mark_archived(thread_id)?;
583 if let Some(thread) = self.running_threads.get_mut(thread_id) {
584 thread.status = ThreadStatus::Archived;
585 }
586 Ok(())
587 }
588
589 pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
590 self.store.mark_unarchived(thread_id)?;
591 Ok(())
592 }
593
594 pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
595 let Some(mut metadata) = self.store.get_thread(thread_id)? else {
596 return Ok(());
597 };
598 metadata.updated_at = chrono::Utc::now().timestamp();
599 metadata.preview = truncate_preview(input);
600 metadata.status = PersistedThreadStatus::Running;
601 self.store.upsert_thread(&metadata)?;
602 if let Some(thread) = self.running_threads.get_mut(thread_id) {
603 thread.updated_at = metadata.updated_at;
604 thread.preview = metadata.preview;
605 thread.status = ThreadStatus::Running;
606 }
607 let message_id = self.store.append_message(thread_id, "user", input, None)?;
608 self.store.save_checkpoint(
609 thread_id,
610 "latest",
611 &json!({
612 "reason": "thread_message",
613 "message_id": message_id,
614 "role": "user",
615 "preview": truncate_preview(input),
616 "updated_at": metadata.updated_at
617 }),
618 )?;
619 Ok(())
620 }
621
622 fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
623 self.store.upsert_thread(&ThreadMetadata {
624 id: thread.id.clone(),
625 rollout_path,
626 preview: thread.preview.clone(),
627 ephemeral: thread.ephemeral,
628 model_provider: thread.model_provider.clone(),
629 created_at: thread.created_at,
630 updated_at: thread.updated_at,
631 status: to_persisted_status(&thread.status),
632 path: thread.path.clone(),
633 cwd: thread.cwd.clone(),
634 cli_version: thread.cli_version.clone(),
635 source: to_persisted_source(&thread.source),
636 name: thread.name.clone(),
637 sandbox_policy: None,
638 approval_mode: None,
639 archived: matches!(thread.status, ThreadStatus::Archived),
640 archived_at: None,
641 git_sha: None,
642 git_branch: None,
643 git_origin_url: None,
644 memory_mode: None,
645 })
646 }
647}
648
/// Top-level runtime state: configuration plus the registries, managers and
/// policy engine used to serve thread, prompt and tool requests.
pub struct Runtime {
    pub config: ConfigToml,
    pub model_registry: ModelRegistry,
    pub thread_manager: ThreadManager,
    pub tool_registry: Arc<ToolRegistry>,
    pub mcp_manager: Arc<McpManager>,
    pub exec_policy: ExecPolicyEngine,
    pub hooks: HookDispatcher,
    pub jobs: JobManager,
}
659
660impl Runtime {
661 pub fn new(
662 config: ConfigToml,
663 model_registry: ModelRegistry,
664 state: StateStore,
665 tool_registry: Arc<ToolRegistry>,
666 mcp_manager: Arc<McpManager>,
667 exec_policy: ExecPolicyEngine,
668 hooks: HookDispatcher,
669 ) -> Self {
670 let mut jobs = JobManager::default();
671 let _ = jobs.load_from_store(&state);
672 Self {
673 config,
674 model_registry,
675 thread_manager: ThreadManager::new(state),
676 tool_registry,
677 mcp_manager,
678 exec_policy,
679 hooks,
680 jobs,
681 }
682 }
683
684 fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
685 let history = self
686 .thread_manager
687 .state_store()
688 .list_messages(thread_id, Some(500))?
689 .into_iter()
690 .map(|message| {
691 json!({
692 "id": message.id,
693 "role": message.role,
694 "content": message.content,
695 "item": message.item,
696 "created_at": message.created_at
697 })
698 })
699 .collect::<Vec<_>>();
700
701 let checkpoint = self
702 .thread_manager
703 .state_store()
704 .load_checkpoint(thread_id, None)?
705 .map(|record| {
706 json!({
707 "checkpoint_id": record.checkpoint_id,
708 "state": record.state,
709 "created_at": record.created_at
710 })
711 });
712
713 Ok(json!({
714 "history": history,
715 "checkpoint": checkpoint
716 }))
717 }
718
719 fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
720 self.thread_manager.state_store().save_checkpoint(
721 thread_id,
722 "latest",
723 &json!({
724 "reason": reason,
725 "saved_at": chrono::Utc::now().timestamp(),
726 "state": state
727 }),
728 )
729 }
730
731 pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
732 match req {
733 ThreadRequest::Create { .. } => {
734 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
735 let new = self.thread_manager.spawn_thread_with_history(
736 "deepseek".to_string(),
737 cwd,
738 InitialHistory::New,
739 false,
740 )?;
741 let mut response = thread_response_from_new("created", new);
742 response.data = self.persisted_thread_data(&response.thread_id)?;
743 Ok(response)
744 }
745 ThreadRequest::Start(params) => {
746 let cwd = params.cwd.clone().unwrap_or_else(|| {
747 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
748 });
749 let new = self.thread_manager.spawn_thread_with_history(
750 params
751 .model_provider
752 .clone()
753 .unwrap_or_else(|| "deepseek".to_string()),
754 cwd,
755 InitialHistory::New,
756 params.persist_extended_history,
757 )?;
758 let mut response = thread_response_from_new("started", new);
759 response.data = self.persisted_thread_data(&response.thread_id)?;
760 Ok(response)
761 }
762 ThreadRequest::Resume(params) => {
763 let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
764 if let Some(new) = self.thread_manager.resume_thread_with_history(
765 ¶ms,
766 &fallback_cwd,
767 "deepseek".to_string(),
768 )? {
769 let mut response = thread_response_from_new("resumed", new);
770 response.data = self.persisted_thread_data(&response.thread_id)?;
771 Ok(response)
772 } else {
773 Ok(ThreadResponse {
774 thread_id: params.thread_id,
775 status: "missing".to_string(),
776 thread: None,
777 threads: Vec::new(),
778 model: None,
779 model_provider: None,
780 cwd: None,
781 approval_policy: params.approval_policy,
782 sandbox: params.sandbox,
783 events: Vec::new(),
784 data: json!({"error":"thread not found"}),
785 })
786 }
787 }
788 ThreadRequest::Fork(params) => {
789 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
790 if let Some(new) = self.thread_manager.fork_thread(¶ms, &cwd)? {
791 let mut response = thread_response_from_new("forked", new);
792 response.data = self.persisted_thread_data(&response.thread_id)?;
793 Ok(response)
794 } else {
795 Ok(ThreadResponse {
796 thread_id: params.thread_id,
797 status: "missing".to_string(),
798 thread: None,
799 threads: Vec::new(),
800 model: None,
801 model_provider: None,
802 cwd: None,
803 approval_policy: params.approval_policy,
804 sandbox: params.sandbox,
805 events: Vec::new(),
806 data: json!({"error":"thread not found"}),
807 })
808 }
809 }
810 ThreadRequest::List(params) => Ok(ThreadResponse {
811 thread_id: "list".to_string(),
812 status: "ok".to_string(),
813 thread: None,
814 threads: self.thread_manager.list_threads(¶ms)?,
815 model: None,
816 model_provider: None,
817 cwd: None,
818 approval_policy: None,
819 sandbox: None,
820 events: Vec::new(),
821 data: json!({}),
822 }),
823 ThreadRequest::Read(params) => {
824 let id = params.thread_id.clone();
825 let data = self.persisted_thread_data(&id)?;
826 Ok(ThreadResponse {
827 thread_id: id,
828 status: "ok".to_string(),
829 thread: self.thread_manager.read_thread(¶ms)?,
830 threads: Vec::new(),
831 model: None,
832 model_provider: None,
833 cwd: None,
834 approval_policy: None,
835 sandbox: None,
836 events: Vec::new(),
837 data,
838 })
839 }
840 ThreadRequest::SetName(params) => Ok(ThreadResponse {
841 thread_id: params.thread_id.clone(),
842 status: "ok".to_string(),
843 thread: self.thread_manager.set_thread_name(¶ms)?,
844 threads: Vec::new(),
845 model: None,
846 model_provider: None,
847 cwd: None,
848 approval_policy: None,
849 sandbox: None,
850 events: Vec::new(),
851 data: json!({}),
852 }),
853 ThreadRequest::Archive { thread_id } => {
854 self.thread_manager.archive_thread(&thread_id)?;
855 Ok(ThreadResponse {
856 thread_id,
857 status: "archived".to_string(),
858 thread: None,
859 threads: Vec::new(),
860 model: None,
861 model_provider: None,
862 cwd: None,
863 approval_policy: None,
864 sandbox: None,
865 events: Vec::new(),
866 data: json!({}),
867 })
868 }
869 ThreadRequest::Unarchive { thread_id } => {
870 self.thread_manager.unarchive_thread(&thread_id)?;
871 Ok(ThreadResponse {
872 thread_id,
873 status: "unarchived".to_string(),
874 thread: None,
875 threads: Vec::new(),
876 model: None,
877 model_provider: None,
878 cwd: None,
879 approval_policy: None,
880 sandbox: None,
881 events: Vec::new(),
882 data: json!({}),
883 })
884 }
885 ThreadRequest::Message { thread_id, input } => {
886 self.thread_manager.touch_message(&thread_id, &input)?;
887 let response_id = format!("{thread_id}:{}", input.len());
888 self.hooks
889 .emit(HookEvent::ResponseStart {
890 response_id: response_id.clone(),
891 })
892 .await;
893 self.hooks
894 .emit(HookEvent::ResponseEnd {
895 response_id: response_id.clone(),
896 })
897 .await;
898
899 Ok(ThreadResponse {
900 thread_id,
901 status: "accepted".to_string(),
902 thread: None,
903 threads: Vec::new(),
904 model: None,
905 model_provider: None,
906 cwd: None,
907 approval_policy: None,
908 sandbox: None,
909 events: vec![
910 EventFrame::ResponseStart {
911 response_id: response_id.clone(),
912 },
913 EventFrame::ResponseDelta {
914 response_id: response_id.clone(),
915 delta: "queued".to_string(),
916 },
917 EventFrame::ResponseEnd { response_id },
918 ],
919 data: json!({}),
920 })
921 }
922 }
923 }
924
925 pub async fn handle_prompt(
926 &mut self,
927 req: PromptRequest,
928 cli_overrides: &CliRuntimeOverrides,
929 ) -> Result<PromptResponse> {
930 let resolved = self.config.resolve_runtime_options(cli_overrides);
931 let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
932 let selection = self
933 .model_registry
934 .resolve(Some(&requested_model), Some(resolved.provider));
935 let resolved_model = selection.resolved.id.clone();
936 let response_id = format!("resp-{}", Uuid::new_v4());
937
938 self.hooks
939 .emit(HookEvent::ResponseStart {
940 response_id: response_id.clone(),
941 })
942 .await;
943 self.hooks
944 .emit(HookEvent::ResponseDelta {
945 response_id: response_id.clone(),
946 delta: "model-selected".to_string(),
947 })
948 .await;
949 self.hooks
950 .emit(HookEvent::ResponseEnd {
951 response_id: response_id.clone(),
952 })
953 .await;
954
955 let payload = json!({
956 "provider": resolved.provider.as_str(),
957 "model": resolved_model.clone(),
958 "prompt": req.prompt,
959 "telemetry": resolved.telemetry,
960 "base_url": resolved.base_url,
961 "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
962 "approval_policy": resolved.approval_policy,
963 "sandbox_mode": resolved.sandbox_mode
964 });
965 if let Some(thread_id) = req.thread_id.as_ref() {
966 self.thread_manager.touch_message(thread_id, &req.prompt)?;
967 let assistant_message_id = self.thread_manager.store.append_message(
968 thread_id,
969 "assistant",
970 &payload.to_string(),
971 Some(payload.clone()),
972 )?;
973 self.persist_latest_checkpoint(
974 thread_id,
975 "prompt_response",
976 json!({
977 "response_id": response_id.clone(),
978 "model": resolved_model.clone(),
979 "provider": resolved.provider.as_str(),
980 "assistant_message_id": assistant_message_id
981 }),
982 )?;
983 }
984
985 Ok(PromptResponse {
986 output: payload.to_string(),
987 model: resolved_model,
988 events: vec![
989 EventFrame::ResponseStart {
990 response_id: response_id.clone(),
991 },
992 EventFrame::ResponseDelta {
993 response_id: response_id.clone(),
994 delta: "model-selected".to_string(),
995 },
996 EventFrame::ResponseEnd { response_id },
997 ],
998 })
999 }
1000
1001 pub async fn invoke_tool(
1002 &self,
1003 call: ToolCall,
1004 approval_mode: AskForApproval,
1005 cwd: &Path,
1006 ) -> Result<Value> {
1007 let fallback_cwd = cwd.display().to_string();
1008 let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
1009 let decision = self.exec_policy.check(ExecPolicyContext {
1010 command: &command,
1011 cwd: &policy_cwd,
1012 ask_for_approval: approval_mode,
1013 sandbox_mode: None,
1014 })?;
1015 let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
1016 let response_id = format!("tool-{}", Uuid::new_v4());
1017 let call_id = call
1018 .raw_tool_call_id
1019 .clone()
1020 .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
1021 self.hooks
1022 .emit(HookEvent::ToolLifecycle {
1023 response_id: response_id.clone(),
1024 tool_name: call.name.clone(),
1025 phase: "precheck".to_string(),
1026 payload: precheck.clone(),
1027 })
1028 .await;
1029
1030 if !decision.allow {
1031 let reason = decision.reason().to_string();
1032 let approval_id = format!("approval-{}", Uuid::new_v4());
1033 let error_frame = EventFrame::Error {
1034 response_id: response_id.clone(),
1035 message: reason.clone(),
1036 };
1037 self.hooks
1038 .emit(HookEvent::ApprovalLifecycle {
1039 approval_id,
1040 phase: "denied".to_string(),
1041 reason: Some(reason.clone()),
1042 })
1043 .await;
1044 self.hooks
1045 .emit(HookEvent::GenericEventFrame {
1046 frame: error_frame.clone(),
1047 })
1048 .await;
1049 return Ok(json!({
1050 "ok": false,
1051 "status": "denied",
1052 "execution_kind": execution_kind,
1053 "response_id": response_id,
1054 "precheck": precheck,
1055 "error": reason,
1056 "events": [event_frame_payload(&error_frame)],
1057 }));
1058 }
1059
1060 if decision.requires_approval {
1061 let approval_id = format!("approval-{}", Uuid::new_v4());
1062 let reason = decision.reason().to_string();
1063 let maybe_approval_frame = approval_request_frame(
1064 &decision.requirement,
1065 call_id,
1066 approval_id.clone(),
1067 response_id.clone(),
1068 command.clone(),
1069 policy_cwd.clone(),
1070 );
1071 self.hooks
1072 .emit(HookEvent::ApprovalLifecycle {
1073 approval_id: approval_id.clone(),
1074 phase: "requested".to_string(),
1075 reason: Some(reason.clone()),
1076 })
1077 .await;
1078 let mut events = Vec::new();
1079 if let Some(frame) = maybe_approval_frame {
1080 self.hooks
1081 .emit(HookEvent::GenericEventFrame {
1082 frame: frame.clone(),
1083 })
1084 .await;
1085 events.push(event_frame_payload(&frame));
1086 }
1087 return Ok(json!({
1088 "ok": false,
1089 "status": "approval_required",
1090 "execution_kind": execution_kind,
1091 "response_id": response_id,
1092 "approval_id": approval_id,
1093 "precheck": precheck,
1094 "error": reason,
1095 "events": events,
1096 }));
1097 }
1098
1099 let start_frame = EventFrame::ToolCallStart {
1100 response_id: response_id.clone(),
1101 tool_name: call.name.clone(),
1102 arguments: tool_payload_value(&call.payload),
1103 };
1104 self.hooks
1105 .emit(HookEvent::GenericEventFrame {
1106 frame: start_frame.clone(),
1107 })
1108 .await;
1109 self.hooks
1110 .emit(HookEvent::ToolLifecycle {
1111 response_id: response_id.clone(),
1112 tool_name: call.name.clone(),
1113 phase: "dispatching".to_string(),
1114 payload: json!({
1115 "call_id": call_id,
1116 "execution_kind": execution_kind
1117 }),
1118 })
1119 .await;
1120
1121 match self.tool_registry.dispatch(call.clone(), true).await {
1122 Ok(tool_output) => {
1123 let result_frame = EventFrame::ToolCallResult {
1124 response_id: response_id.clone(),
1125 tool_name: call.name.clone(),
1126 output: tool_output_value(&tool_output),
1127 };
1128 self.hooks
1129 .emit(HookEvent::GenericEventFrame {
1130 frame: result_frame.clone(),
1131 })
1132 .await;
1133 self.hooks
1134 .emit(HookEvent::ToolLifecycle {
1135 response_id: response_id.clone(),
1136 tool_name: call.name,
1137 phase: "completed".to_string(),
1138 payload: json!({ "ok": true }),
1139 })
1140 .await;
1141 Ok(json!({
1142 "ok": true,
1143 "status": "completed",
1144 "execution_kind": execution_kind,
1145 "response_id": response_id,
1146 "precheck": precheck,
1147 "output": tool_output,
1148 "events": [
1149 event_frame_payload(&start_frame),
1150 event_frame_payload(&result_frame)
1151 ]
1152 }))
1153 }
1154 Err(err) => {
1155 let message = format!("{err:?}");
1156 let error_frame = EventFrame::Error {
1157 response_id: response_id.clone(),
1158 message: message.clone(),
1159 };
1160 self.hooks
1161 .emit(HookEvent::GenericEventFrame {
1162 frame: error_frame.clone(),
1163 })
1164 .await;
1165 self.hooks
1166 .emit(HookEvent::ToolLifecycle {
1167 response_id: response_id.clone(),
1168 tool_name: call.name,
1169 phase: "failed".to_string(),
1170 payload: json!({ "error": message.clone() }),
1171 })
1172 .await;
1173 Ok(json!({
1174 "ok": false,
1175 "status": "failed",
1176 "execution_kind": execution_kind,
1177 "response_id": response_id,
1178 "precheck": precheck,
1179 "error": message,
1180 "events": [
1181 event_frame_payload(&start_frame),
1182 event_frame_payload(&error_frame)
1183 ]
1184 }))
1185 }
1186 }
1187 }
1188
1189 pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1190 let mut updates = Vec::new();
1191 let summary = self.mcp_manager.start_all(|update| {
1192 updates.push(update);
1193 });
1194 for update in updates {
1195 let status = match update.status {
1196 McpManagerStartupStatus::Starting => deepseek_protocol::McpStartupStatus::Starting,
1197 McpManagerStartupStatus::Ready => deepseek_protocol::McpStartupStatus::Ready,
1198 McpManagerStartupStatus::Failed { error } => {
1199 deepseek_protocol::McpStartupStatus::Failed { error }
1200 }
1201 McpManagerStartupStatus::Cancelled => {
1202 deepseek_protocol::McpStartupStatus::Cancelled
1203 }
1204 };
1205 self.hooks
1206 .emit(HookEvent::GenericEventFrame {
1207 frame: EventFrame::McpStartupUpdate {
1208 update: deepseek_protocol::McpStartupUpdateEvent {
1209 server_name: update.server_name,
1210 status,
1211 },
1212 },
1213 })
1214 .await;
1215 }
1216 self.hooks
1217 .emit(HookEvent::GenericEventFrame {
1218 frame: EventFrame::McpStartupComplete {
1219 summary: deepseek_protocol::McpStartupCompleteEvent {
1220 ready: summary.ready.clone(),
1221 failed: summary
1222 .failed
1223 .iter()
1224 .map(|f| deepseek_protocol::McpStartupFailure {
1225 server_name: f.server_name.clone(),
1226 error: f.error.clone(),
1227 })
1228 .collect(),
1229 cancelled: summary.cancelled.clone(),
1230 },
1231 },
1232 })
1233 .await;
1234 summary
1235 }
1236
1237 pub fn app_status(&self) -> AppResponse {
1238 let jobs = self.jobs.list();
1239 let events = jobs
1240 .iter()
1241 .flat_map(|job| {
1242 job.history.iter().map(|entry| EventFrame::ResponseDelta {
1243 response_id: job.id.clone(),
1244 delta: json!({
1245 "kind": "job_transition",
1246 "job_id": job.id.clone(),
1247 "phase": entry.phase.clone(),
1248 "status": job_status_to_str(entry.status),
1249 "progress": entry.progress,
1250 "detail": entry.detail.clone(),
1251 "retry": job_retry_to_value(&entry.retry),
1252 "at": entry.at
1253 })
1254 .to_string(),
1255 })
1256 })
1257 .collect::<Vec<_>>();
1258 AppResponse {
1259 ok: true,
1260 data: json!({
1261 "jobs": jobs.into_iter().map(|job| {
1262 json!({
1263 "id": job.id,
1264 "name": job.name,
1265 "status": job_status_to_str(job.status),
1266 "progress": job.progress,
1267 "detail": job.detail,
1268 "retry": job_retry_to_value(&job.retry),
1269 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1270 })
1271 }).collect::<Vec<_>>()
1272 }),
1273 events,
1274 }
1275 }
1276
1277 pub fn provider_default(&self) -> ProviderKind {
1278 self.config.provider
1279 }
1280
1281 pub fn save_thread_checkpoint(
1282 &self,
1283 thread_id: &str,
1284 checkpoint_id: &str,
1285 state: &Value,
1286 ) -> Result<()> {
1287 self.thread_manager
1288 .state_store()
1289 .save_checkpoint(thread_id, checkpoint_id, state)
1290 }
1291
1292 pub fn load_thread_checkpoint(
1293 &self,
1294 thread_id: &str,
1295 checkpoint_id: Option<&str>,
1296 ) -> Result<Option<Value>> {
1297 Ok(self
1298 .thread_manager
1299 .state_store()
1300 .load_checkpoint(thread_id, checkpoint_id)?
1301 .map(|checkpoint| checkpoint.state))
1302 }
1303
1304 pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1305 let job = self.jobs.enqueue(name);
1306 self.jobs
1307 .persist_job(self.thread_manager.state_store(), &job.id)?;
1308 Ok(job)
1309 }
1310
1311 pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1312 self.jobs.set_running(job_id);
1313 self.jobs
1314 .persist_job(self.thread_manager.state_store(), job_id)
1315 }
1316
1317 pub fn update_job_progress(
1318 &mut self,
1319 job_id: &str,
1320 progress: u8,
1321 detail: Option<String>,
1322 ) -> Result<()> {
1323 self.jobs.update_progress(job_id, progress, detail);
1324 self.jobs
1325 .persist_job(self.thread_manager.state_store(), job_id)
1326 }
1327
1328 pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1329 self.jobs.complete(job_id);
1330 self.jobs
1331 .persist_job(self.thread_manager.state_store(), job_id)
1332 }
1333
1334 pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1335 self.jobs.fail(job_id, detail);
1336 self.jobs
1337 .persist_job(self.thread_manager.state_store(), job_id)
1338 }
1339
1340 pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1341 self.jobs.cancel(job_id);
1342 self.jobs
1343 .persist_job(self.thread_manager.state_store(), job_id)
1344 }
1345
1346 pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1347 self.jobs.pause(job_id, detail);
1348 self.jobs
1349 .persist_job(self.thread_manager.state_store(), job_id)
1350 }
1351
1352 pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1353 self.jobs.resume(job_id, detail);
1354 self.jobs
1355 .persist_job(self.thread_manager.state_store(), job_id)
1356 }
1357
1358 pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
1359 self.jobs.history(job_id)
1360 }
1361}
1362
1363fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1364 ThreadResponse {
1365 thread_id: new.thread.id.clone(),
1366 status: status.to_string(),
1367 thread: Some(new.thread),
1368 threads: Vec::new(),
1369 model: Some(new.model),
1370 model_provider: Some(new.model_provider),
1371 cwd: Some(new.cwd),
1372 approval_policy: new.approval_policy,
1373 sandbox: new.sandbox,
1374 events: Vec::new(),
1375 data: json!({}),
1376 }
1377}
1378
1379fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1380 match initial_history {
1381 InitialHistory::New => "New conversation".to_string(),
1382 InitialHistory::Forked(items) => truncate_preview(
1383 &items
1384 .first()
1385 .map(Value::to_string)
1386 .unwrap_or_else(|| "Forked conversation".to_string()),
1387 ),
1388 InitialHistory::Resumed { history, .. } => truncate_preview(
1389 &history
1390 .first()
1391 .map(Value::to_string)
1392 .unwrap_or_else(|| "Resumed conversation".to_string()),
1393 ),
1394 }
1395}
1396
/// Returns at most the first 120 characters (Unicode scalar values) of `value`.
fn truncate_preview(value: &str) -> String {
    // The byte offset of the 121st character is where the first 120 end.
    match value.char_indices().nth(120) {
        Some((cut, _)) => value[..cut].to_string(),
        None => value.to_string(),
    }
}
1400
1401fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1402 Thread {
1403 id: thread.id,
1404 preview: thread.preview,
1405 ephemeral: thread.ephemeral,
1406 model_provider: thread.model_provider,
1407 created_at: thread.created_at,
1408 updated_at: thread.updated_at,
1409 status: match thread.status {
1410 PersistedThreadStatus::Running => ThreadStatus::Running,
1411 PersistedThreadStatus::Idle => ThreadStatus::Idle,
1412 PersistedThreadStatus::Completed => ThreadStatus::Completed,
1413 PersistedThreadStatus::Failed => ThreadStatus::Failed,
1414 PersistedThreadStatus::Paused => ThreadStatus::Paused,
1415 PersistedThreadStatus::Archived => ThreadStatus::Archived,
1416 },
1417 path: thread.path,
1418 cwd: thread.cwd,
1419 cli_version: thread.cli_version,
1420 source: match thread.source {
1421 SessionSource::Interactive => deepseek_protocol::SessionSource::Interactive,
1422 SessionSource::Resume => deepseek_protocol::SessionSource::Resume,
1423 SessionSource::Fork => deepseek_protocol::SessionSource::Fork,
1424 SessionSource::Api => deepseek_protocol::SessionSource::Api,
1425 SessionSource::Unknown => deepseek_protocol::SessionSource::Unknown,
1426 },
1427 name: thread.name,
1428 }
1429}
1430
1431fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1432 match status {
1433 ThreadStatus::Running => PersistedThreadStatus::Running,
1434 ThreadStatus::Idle => PersistedThreadStatus::Idle,
1435 ThreadStatus::Completed => PersistedThreadStatus::Completed,
1436 ThreadStatus::Failed => PersistedThreadStatus::Failed,
1437 ThreadStatus::Paused => PersistedThreadStatus::Paused,
1438 ThreadStatus::Archived => PersistedThreadStatus::Archived,
1439 }
1440}
1441
1442fn to_persisted_source(source: &deepseek_protocol::SessionSource) -> SessionSource {
1443 match source {
1444 deepseek_protocol::SessionSource::Interactive => SessionSource::Interactive,
1445 deepseek_protocol::SessionSource::Resume => SessionSource::Resume,
1446 deepseek_protocol::SessionSource::Fork => SessionSource::Fork,
1447 deepseek_protocol::SessionSource::Api => SessionSource::Api,
1448 deepseek_protocol::SessionSource::Unknown => SessionSource::Unknown,
1449 }
1450}
1451
1452fn approval_request_frame(
1453 requirement: &ExecApprovalRequirement,
1454 call_id: String,
1455 approval_id: String,
1456 turn_id: String,
1457 command: String,
1458 cwd: String,
1459) -> Option<EventFrame> {
1460 let ExecApprovalRequirement::NeedsApproval {
1461 reason,
1462 proposed_execpolicy_amendment,
1463 proposed_network_policy_amendments,
1464 } = requirement
1465 else {
1466 return None;
1467 };
1468
1469 let mut available_decisions = vec![
1470 ReviewDecision::Approved,
1471 ReviewDecision::ApprovedForSession,
1472 ReviewDecision::Denied,
1473 ReviewDecision::Abort,
1474 ];
1475 if proposed_execpolicy_amendment
1476 .as_ref()
1477 .is_some_and(|amendment| !amendment.prefixes.is_empty())
1478 {
1479 available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1480 }
1481 available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1482 |amendment| ReviewDecision::NetworkPolicyAmendment {
1483 host: amendment.host,
1484 action: amendment.action,
1485 },
1486 ));
1487
1488 Some(EventFrame::ExecApprovalRequest {
1489 request: ExecApprovalRequestEvent {
1490 call_id,
1491 approval_id,
1492 turn_id,
1493 command,
1494 cwd,
1495 reason: reason.clone(),
1496 network_approval_context: None,
1497 proposed_execpolicy_amendment: proposed_execpolicy_amendment
1498 .as_ref()
1499 .map(|amendment| amendment.prefixes.clone())
1500 .unwrap_or_default(),
1501 proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1502 additional_permissions: Vec::new(),
1503 available_decisions,
1504 },
1505 })
1506}
1507
1508fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1509 match requirement {
1510 ExecApprovalRequirement::Skip {
1511 bypass_sandbox,
1512 proposed_execpolicy_amendment,
1513 } => json!({
1514 "type": "skip",
1515 "bypass_sandbox": bypass_sandbox,
1516 "reason": requirement.reason(),
1517 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1518 .as_ref()
1519 .map(|amendment| amendment.prefixes.clone())
1520 .unwrap_or_default()
1521 }),
1522 ExecApprovalRequirement::NeedsApproval {
1523 reason,
1524 proposed_execpolicy_amendment,
1525 proposed_network_policy_amendments,
1526 } => json!({
1527 "type": "needs_approval",
1528 "reason": reason,
1529 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1530 .as_ref()
1531 .map(|amendment| amendment.prefixes.clone())
1532 .unwrap_or_default(),
1533 "proposed_network_policy_amendments": proposed_network_policy_amendments
1534 }),
1535 ExecApprovalRequirement::Forbidden { reason } => json!({
1536 "type": "forbidden",
1537 "reason": reason
1538 }),
1539 }
1540}
1541
1542fn policy_precheck_payload(
1543 decision: &ExecPolicyDecision,
1544 command: &str,
1545 cwd: &str,
1546 execution_kind: &str,
1547) -> Value {
1548 json!({
1549 "execution_kind": execution_kind,
1550 "command": command,
1551 "cwd": cwd,
1552 "allow": decision.allow,
1553 "requires_approval": decision.requires_approval,
1554 "matched_rule": decision.matched_rule.clone(),
1555 "phase": decision.requirement.phase(),
1556 "reason": decision.reason(),
1557 "requirement": approval_requirement_payload(&decision.requirement)
1558 })
1559}
1560
1561fn tool_payload_value(payload: &ToolPayload) -> Value {
1562 serde_json::to_value(payload).unwrap_or_else(
1563 |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1564 )
1565}
1566
1567fn tool_output_value(output: &deepseek_protocol::ToolOutput) -> Value {
1568 serde_json::to_value(output).unwrap_or_else(
1569 |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1570 )
1571}
1572
1573fn event_frame_payload(frame: &EventFrame) -> Value {
1574 serde_json::to_value(frame)
1575 .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1576}
1577
1578fn json_optional_string(value: &Value) -> Option<String> {
1579 if value.is_null() {
1580 None
1581 } else {
1582 value.as_str().map(ToString::to_string)
1583 }
1584}
1585
1586fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
1587 let Some(value) = value else {
1588 return JobRetryMetadata::default();
1589 };
1590 JobRetryMetadata {
1591 attempt: value
1592 .get("attempt")
1593 .and_then(Value::as_u64)
1594 .unwrap_or(0)
1595 .min(u32::MAX as u64) as u32,
1596 max_attempts: value
1597 .get("max_attempts")
1598 .and_then(Value::as_u64)
1599 .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
1600 .min(u32::MAX as u64) as u32,
1601 backoff_base_ms: value
1602 .get("backoff_base_ms")
1603 .and_then(Value::as_u64)
1604 .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
1605 next_backoff_ms: value
1606 .get("next_backoff_ms")
1607 .and_then(Value::as_u64)
1608 .unwrap_or(0),
1609 next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
1610 }
1611}
1612
1613fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
1614 let status = value
1615 .get("status")
1616 .and_then(Value::as_str)
1617 .and_then(job_status_from_str)?;
1618 Some(JobHistoryEntry {
1619 at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
1620 phase: value
1621 .get("phase")
1622 .and_then(Value::as_str)
1623 .unwrap_or("unknown")
1624 .to_string(),
1625 status,
1626 progress: value
1627 .get("progress")
1628 .and_then(Value::as_u64)
1629 .map(|v| v.min(u8::MAX as u64) as u8),
1630 detail: value.get("detail").and_then(json_optional_string),
1631 retry: parse_retry_metadata(value.get("retry")),
1632 })
1633}
1634
1635fn job_status_to_str(status: JobStatus) -> &'static str {
1636 match status {
1637 JobStatus::Queued => "queued",
1638 JobStatus::Running => "running",
1639 JobStatus::Paused => "paused",
1640 JobStatus::Completed => "completed",
1641 JobStatus::Failed => "failed",
1642 JobStatus::Cancelled => "cancelled",
1643 }
1644}
1645
1646fn job_status_from_str(value: &str) -> Option<JobStatus> {
1647 match value {
1648 "queued" => Some(JobStatus::Queued),
1649 "running" => Some(JobStatus::Running),
1650 "paused" => Some(JobStatus::Paused),
1651 "completed" => Some(JobStatus::Completed),
1652 "failed" => Some(JobStatus::Failed),
1653 "cancelled" => Some(JobStatus::Cancelled),
1654 _ => None,
1655 }
1656}
1657
1658fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
1659 json!({
1660 "attempt": retry.attempt,
1661 "max_attempts": retry.max_attempts,
1662 "backoff_base_ms": retry.backoff_base_ms,
1663 "next_backoff_ms": retry.next_backoff_ms,
1664 "next_retry_at": retry.next_retry_at
1665 })
1666}
1667
1668fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
1669 json!({
1670 "at": entry.at,
1671 "phase": entry.phase.clone(),
1672 "status": job_status_to_str(entry.status),
1673 "progress": entry.progress,
1674 "detail": entry.detail.clone(),
1675 "retry": job_retry_to_value(&entry.retry)
1676 })
1677}
1678
1679fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
1680 match status {
1681 JobStatus::Queued => JobStateStatus::Queued,
1682 JobStatus::Running => JobStateStatus::Running,
1683 JobStatus::Paused => JobStateStatus::Running,
1684 JobStatus::Completed => JobStateStatus::Completed,
1685 JobStatus::Failed => JobStateStatus::Failed,
1686 JobStatus::Cancelled => JobStateStatus::Cancelled,
1687 }
1688}
1689
1690fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
1691 match status {
1692 JobStateStatus::Queued => JobStatus::Queued,
1693 JobStateStatus::Running => JobStatus::Running,
1694 JobStateStatus::Completed => JobStatus::Completed,
1695 JobStateStatus::Failed => JobStatus::Failed,
1696 JobStateStatus::Cancelled => JobStatus::Cancelled,
1697 }
1698}