1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use codewhale_agent::ModelRegistry;
7use codewhale_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use codewhale_execpolicy::{
9 AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10 ExecPolicyEngine,
11};
12use codewhale_hooks::{HookDispatcher, HookEvent};
13use codewhale_mcp::{
14 McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use codewhale_protocol::{
17 AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18 ResponseChannel, ReviewDecision, Thread, ThreadForkParams, ThreadListParams, ThreadReadParams,
19 ThreadRequest, ThreadResponse, ThreadResumeParams, ThreadSetNameParams, ThreadStatus,
20 ToolPayload,
21};
22use codewhale_state::{
23 JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadListFilters, ThreadMetadata,
24 ThreadStatus as PersistedThreadStatus,
25};
26use codewhale_tools::{ToolCall, ToolRegistry};
27use serde_json::{Value, json};
28use uuid::Uuid;
29
/// How the message history of a newly spawned thread is seeded.
#[derive(Debug, Clone)]
pub enum InitialHistory {
    /// Start with no prior history items.
    New,
    /// Seed the thread with the given items; the thread is recorded as a fork.
    Forked(Vec<Value>),
    /// Seed the thread with items from an earlier conversation; the thread is
    /// recorded as a resume.
    Resumed {
        /// Identifier of the conversation being resumed (presumably the
        /// original thread id — confirm against callers).
        conversation_id: String,
        /// History items appended to the new thread.
        history: Vec<Value>,
        /// Rollout path associated with the resumed conversation.
        rollout_path: PathBuf,
    },
}
40
/// Result of spawning, resuming, or forking a thread: the thread itself plus
/// the effective session settings chosen for it.
#[derive(Debug, Clone)]
pub struct NewThread {
    /// The thread that was created or resumed.
    pub thread: Thread,
    /// Model identifier for the session (`"auto"` when none was requested).
    pub model: String,
    /// Model provider name for the session.
    pub model_provider: String,
    /// Working directory for the session.
    pub cwd: PathBuf,
    /// Approval policy requested by the caller, if any.
    pub approval_policy: Option<String>,
    /// Sandbox setting requested by the caller, if any.
    pub sandbox: Option<String>,
}
50
/// Runtime lifecycle state of a job tracked by [`JobManager`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Waiting to run (initial state, and the state jobs return to after
    /// `resume_pending`).
    Queued,
    /// Currently executing.
    Running,
    /// Paused via `JobManager::pause`.
    Paused,
    /// Finished successfully.
    Completed,
    /// Last attempt failed; a retry may be scheduled in the retry metadata.
    Failed,
    /// Cancelled via `JobManager::cancel`.
    Cancelled,
}
60
// Value written as `schema_version` in the JSON blob stored in a job's detail column.
const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
// Default cap on how many failures get a retry scheduled.
const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
// Default base delay (milliseconds) for the exponential retry backoff.
const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
// Maximum number of history entries kept per job; older entries are dropped first.
const MAX_JOB_HISTORY_ENTRIES: usize = 64;
65
/// Retry bookkeeping attached to a job.
#[derive(Debug, Clone)]
pub struct JobRetryMetadata {
    /// Number of failures for which a retry has been scheduled so far.
    pub attempt: u32,
    /// Once `attempt` reaches this value, further failures schedule no retry.
    pub max_attempts: u32,
    /// Base delay in milliseconds used by the exponential backoff.
    pub backoff_base_ms: u64,
    /// Delay in milliseconds computed for the pending retry; `0` when none is scheduled.
    pub next_backoff_ms: u64,
    /// Unix timestamp (seconds) of the pending retry; `None` when none is scheduled.
    pub next_retry_at: Option<i64>,
}
74
impl Default for JobRetryMetadata {
    /// Fresh retry state: no attempts made, default limits, nothing scheduled.
    fn default() -> Self {
        Self {
            attempt: 0,
            max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
            backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
            next_backoff_ms: 0,
            next_retry_at: None,
        }
    }
}
86
/// Snapshot of a job taken each time it changes state.
#[derive(Debug, Clone)]
pub struct JobHistoryEntry {
    /// The job's `updated_at` timestamp at the moment the entry was recorded.
    pub at: i64,
    /// Label of the transition, e.g. `"created"`, `"running"`, `"failed"`.
    pub phase: String,
    /// Job status after the transition.
    pub status: JobStatus,
    /// Job progress after the transition.
    pub progress: Option<u8>,
    /// Job detail text after the transition.
    pub detail: Option<String>,
    /// Retry metadata after the transition.
    pub retry: JobRetryMetadata,
}
96
/// Decoded form of the JSON blob that `JobManager` stores in a job's
/// persisted detail column.
#[derive(Debug, Clone)]
struct PersistedJobDetail {
    /// Runtime status recorded in the blob.
    pub status: JobStatus,
    /// Human-readable detail text recorded in the blob.
    pub detail: Option<String>,
    /// Retry metadata recorded in the blob.
    pub retry: JobRetryMetadata,
    /// History entries recorded in the blob (entries that fail to parse are skipped).
    pub history: Vec<JobHistoryEntry>,
}
104
/// In-memory record of a job tracked by [`JobManager`].
#[derive(Debug, Clone)]
pub struct JobRecord {
    /// Job identifier (`job-<uuid>` for jobs created by `enqueue`).
    pub id: String,
    /// Caller-supplied job name.
    pub name: String,
    /// Current lifecycle status.
    pub status: JobStatus,
    /// Progress value; `update_progress` clamps it to at most 100.
    pub progress: Option<u8>,
    /// Latest detail or failure message.
    pub detail: Option<String>,
    /// Retry bookkeeping.
    pub retry: JobRetryMetadata,
    /// Bounded list of state-transition snapshots, oldest first.
    pub history: Vec<JobHistoryEntry>,
    /// Unix timestamp (seconds) when the job was created.
    pub created_at: i64,
    /// Unix timestamp (seconds) of the last state change.
    pub updated_at: i64,
}
117
/// In-memory registry of jobs, keyed by job id, with helpers to load from and
/// persist to a `StateStore`.
#[derive(Debug, Default)]
pub struct JobManager {
    // All known jobs, keyed by `JobRecord::id`.
    jobs: HashMap<String, JobRecord>,
}
122
123impl JobManager {
124 fn now_ts() -> i64 {
125 chrono::Utc::now().timestamp()
126 }
127
128 fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
129 if retry.attempt == 0 {
130 return 0;
131 }
132 let exponent = retry.attempt.saturating_sub(1).min(20);
133 let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
134 retry.backoff_base_ms.saturating_mul(multiplier)
135 }
136
137 fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
138 retry.next_backoff_ms = 0;
139 retry.next_retry_at = None;
140 }
141
142 fn push_history(job: &mut JobRecord, phase: &str) {
143 job.history.push(JobHistoryEntry {
144 at: job.updated_at,
145 phase: phase.to_string(),
146 status: job.status,
147 progress: job.progress,
148 detail: job.detail.clone(),
149 retry: job.retry.clone(),
150 });
151 if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
152 let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
153 job.history.drain(0..to_drain);
154 }
155 }
156
157 fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
158 let raw = raw?;
159 let parsed: Value = serde_json::from_str(raw).ok()?;
160 let status = parsed
161 .get("status")
162 .and_then(Value::as_str)
163 .and_then(job_status_from_str)?;
164 let detail = parsed.get("detail").and_then(json_optional_string);
165 let retry = parse_retry_metadata(parsed.get("retry"));
166 let history = parsed
167 .get("history")
168 .and_then(Value::as_array)
169 .map(|items| {
170 items
171 .iter()
172 .filter_map(parse_history_entry)
173 .collect::<Vec<_>>()
174 })
175 .unwrap_or_default();
176 Some(PersistedJobDetail {
177 status,
178 detail,
179 retry,
180 history,
181 })
182 }
183
184 fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
185 let encoded = json!({
186 "schema_version": JOB_DETAIL_SCHEMA_VERSION,
187 "status": job_status_to_str(job.status),
188 "detail": job.detail.clone(),
189 "retry": job_retry_to_value(&job.retry),
190 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
191 })
192 .to_string();
193 Ok(Some(encoded))
194 }
195
196 pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
197 let now = Self::now_ts();
198 let id = format!("job-{}", Uuid::new_v4());
199 let mut job = JobRecord {
200 id: id.clone(),
201 name: name.into(),
202 status: JobStatus::Queued,
203 progress: Some(0),
204 detail: None,
205 retry: JobRetryMetadata::default(),
206 history: Vec::new(),
207 created_at: now,
208 updated_at: now,
209 };
210 Self::push_history(&mut job, "created");
211 self.jobs.insert(id, job.clone());
212 job
213 }
214
215 pub fn set_running(&mut self, id: &str) {
216 if let Some(job) = self.jobs.get_mut(id) {
217 job.status = JobStatus::Running;
218 Self::clear_retry_schedule(&mut job.retry);
219 job.updated_at = Self::now_ts();
220 Self::push_history(job, "running");
221 }
222 }
223
224 pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
225 if let Some(job) = self.jobs.get_mut(id) {
226 job.progress = Some(progress.min(100));
227 job.detail = detail;
228 job.updated_at = Self::now_ts();
229 Self::push_history(job, "progress_updated");
230 }
231 }
232
233 pub fn complete(&mut self, id: &str) {
234 if let Some(job) = self.jobs.get_mut(id) {
235 job.status = JobStatus::Completed;
236 job.progress = Some(100);
237 Self::clear_retry_schedule(&mut job.retry);
238 job.updated_at = Self::now_ts();
239 Self::push_history(job, "completed");
240 }
241 }
242
243 pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
244 if let Some(job) = self.jobs.get_mut(id) {
245 let now = Self::now_ts();
246 job.status = JobStatus::Failed;
247 job.detail = Some(detail.into());
248 if job.retry.attempt < job.retry.max_attempts {
249 job.retry.attempt += 1;
250 job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
251 let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
252 .min(i64::MAX as u64) as i64;
253 job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
254 } else {
255 Self::clear_retry_schedule(&mut job.retry);
256 }
257 job.updated_at = now;
258 Self::push_history(job, "failed");
259 }
260 }
261
262 pub fn cancel(&mut self, id: &str) {
263 if let Some(job) = self.jobs.get_mut(id) {
264 job.status = JobStatus::Cancelled;
265 Self::clear_retry_schedule(&mut job.retry);
266 job.updated_at = Self::now_ts();
267 Self::push_history(job, "cancelled");
268 }
269 }
270
271 pub fn pause(&mut self, id: &str, detail: Option<String>) {
272 if let Some(job) = self.jobs.get_mut(id) {
273 job.status = JobStatus::Paused;
274 if detail.is_some() {
275 job.detail = detail;
276 }
277 job.updated_at = Self::now_ts();
278 Self::push_history(job, "paused");
279 }
280 }
281
282 pub fn resume(&mut self, id: &str, detail: Option<String>) {
283 if let Some(job) = self.jobs.get_mut(id) {
284 job.status = JobStatus::Running;
285 if detail.is_some() {
286 job.detail = detail;
287 }
288 Self::clear_retry_schedule(&mut job.retry);
289 job.updated_at = Self::now_ts();
290 Self::push_history(job, "resumed");
291 }
292 }
293
294 pub fn list(&self) -> Vec<JobRecord> {
295 let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
296 out.sort_by_key(|job| std::cmp::Reverse(job.updated_at));
297 out
298 }
299
300 pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
301 self.jobs
302 .get(id)
303 .map(|job| job.history.clone())
304 .unwrap_or_default()
305 }
306
307 pub fn resume_pending(&mut self) -> Vec<JobRecord> {
308 let mut resumed = Vec::new();
309 for job in self.jobs.values_mut() {
310 if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
311 job.status = JobStatus::Queued;
312 job.updated_at = Self::now_ts();
313 Self::push_history(job, "queued_after_resume");
314 resumed.push(job.clone());
315 }
316 }
317 resumed
318 }
319
320 pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
321 let persisted = store.list_jobs(Some(500))?;
322 for job in persisted {
323 let fallback_status = job_state_status_to_runtime(job.status);
324 let parsed = Self::parse_persisted_detail(job.detail.as_deref());
325 let (status, detail, retry, history) = if let Some(detail_state) = parsed {
326 (
327 detail_state.status,
328 detail_state.detail,
329 detail_state.retry,
330 detail_state.history,
331 )
332 } else {
333 (
334 fallback_status,
335 job.detail,
336 JobRetryMetadata::default(),
337 Vec::new(),
338 )
339 };
340 self.jobs.insert(
341 job.id.clone(),
342 JobRecord {
343 id: job.id,
344 name: job.name,
345 status,
346 progress: job.progress,
347 detail,
348 retry,
349 history,
350 created_at: job.created_at,
351 updated_at: job.updated_at,
352 },
353 );
354 }
355 Ok(())
356 }
357
358 pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
359 let Some(job) = self.jobs.get(id) else {
360 return Ok(());
361 };
362 let encoded_detail = Self::encode_persisted_detail(job)?;
363 store.upsert_job(&JobStateRecord {
364 id: job.id.clone(),
365 name: job.name.clone(),
366 status: runtime_status_to_job_state(job.status),
367 progress: job.progress,
368 detail: encoded_detail,
369 created_at: job.created_at,
370 updated_at: job.updated_at,
371 })
372 }
373
374 pub fn persist_all(&self, store: &StateStore) -> Result<()> {
375 for id in self.jobs.keys() {
376 self.persist_job(store, id)?;
377 }
378 Ok(())
379 }
380}
381
/// Creates, resumes, forks and updates threads, persisting them through a
/// `StateStore` and caching running ones in memory.
pub struct ThreadManager {
    // Backing persistence for thread metadata, messages and checkpoints.
    store: StateStore,
    // In-memory cache of threads, keyed by thread id.
    running_threads: HashMap<String, Thread>,
    // Crate version (`CARGO_PKG_VERSION`) stamped onto new threads.
    cli_version: String,
}
387
388impl ThreadManager {
389 pub fn new(store: StateStore) -> Self {
390 Self {
391 store,
392 running_threads: HashMap::new(),
393 cli_version: env!("CARGO_PKG_VERSION").to_string(),
394 }
395 }
396
397 pub fn state_store(&self) -> &StateStore {
398 &self.store
399 }
400
401 pub fn spawn_thread_with_history(
402 &mut self,
403 model_provider: String,
404 cwd: PathBuf,
405 initial_history: InitialHistory,
406 persist_extended_history: bool,
407 ) -> Result<NewThread> {
408 let id = format!("thread-{}", Uuid::new_v4());
409 let now = chrono::Utc::now().timestamp();
410 let preview = preview_from_initial_history(&initial_history);
411 let source = match initial_history {
412 InitialHistory::New => SessionSource::Interactive,
413 InitialHistory::Forked(_) => SessionSource::Fork,
414 InitialHistory::Resumed { .. } => SessionSource::Resume,
415 };
416 let thread = Thread {
417 id: id.clone(),
418 preview,
419 ephemeral: !persist_extended_history,
420 model_provider: model_provider.clone(),
421 created_at: now,
422 updated_at: now,
423 status: ThreadStatus::Running,
424 path: None,
425 cwd: cwd.clone(),
426 cli_version: self.cli_version.clone(),
427 source: match source {
428 SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
429 SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
430 SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
431 SessionSource::Api => codewhale_protocol::SessionSource::Api,
432 SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
433 },
434 name: None,
435 };
436 self.persist_thread(&thread, None)?;
437 match &initial_history {
438 InitialHistory::Forked(items) => {
439 for item in items {
440 self.store.append_message(
441 &thread.id,
442 "history",
443 &item.to_string(),
444 Some(item.clone()),
445 )?;
446 }
447 }
448 InitialHistory::Resumed { history, .. } => {
449 for item in history {
450 self.store.append_message(
451 &thread.id,
452 "history",
453 &item.to_string(),
454 Some(item.clone()),
455 )?;
456 }
457 }
458 InitialHistory::New => {}
459 }
460 self.running_threads
461 .insert(thread.id.clone(), thread.clone());
462 Ok(NewThread {
463 thread,
464 model: "auto".to_string(),
465 model_provider,
466 cwd,
467 approval_policy: None,
468 sandbox: None,
469 })
470 }
471
472 pub fn resume_thread_with_history(
473 &mut self,
474 params: &ThreadResumeParams,
475 fallback_cwd: &Path,
476 model_provider: String,
477 ) -> Result<Option<NewThread>> {
478 if params.history.is_none()
479 && let Some(thread) = self.running_threads.get(¶ms.thread_id).cloned()
480 {
481 return Ok(Some(NewThread {
482 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
483 model_provider: params.model_provider.clone().unwrap_or(model_provider),
484 cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
485 approval_policy: params.approval_policy.clone(),
486 sandbox: params.sandbox.clone(),
487 thread,
488 }));
489 }
490
491 let persisted = self.store.get_thread(¶ms.thread_id)?;
492 let Some(metadata) = persisted else {
493 return Ok(None);
494 };
495 let mut thread = to_protocol_thread(metadata);
496 thread.status = ThreadStatus::Running;
497 thread.updated_at = chrono::Utc::now().timestamp();
498 thread.cwd = params
499 .cwd
500 .clone()
501 .unwrap_or_else(|| fallback_cwd.to_path_buf());
502 self.persist_thread(&thread, None)?;
503 self.running_threads
504 .insert(thread.id.clone(), thread.clone());
505 if let Some(history) = params.history.as_ref() {
506 for item in history {
507 self.store.append_message(
508 &thread.id,
509 "history",
510 &item.to_string(),
511 Some(item.clone()),
512 )?;
513 }
514 }
515
516 Ok(Some(NewThread {
517 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
518 model_provider: params.model_provider.clone().unwrap_or(model_provider),
519 cwd: thread.cwd.clone(),
520 approval_policy: params.approval_policy.clone(),
521 sandbox: params.sandbox.clone(),
522 thread,
523 }))
524 }
525
526 pub fn fork_thread(
527 &mut self,
528 params: &ThreadForkParams,
529 fallback_cwd: &Path,
530 ) -> Result<Option<NewThread>> {
531 let parent = self.store.get_thread(¶ms.thread_id)?;
532 let Some(parent) = parent else {
533 return Ok(None);
534 };
535 let parent_thread = to_protocol_thread(parent);
536 let new = self.spawn_thread_with_history(
537 params
538 .model_provider
539 .clone()
540 .unwrap_or_else(|| parent_thread.model_provider.clone()),
541 params
542 .cwd
543 .clone()
544 .unwrap_or_else(|| fallback_cwd.to_path_buf()),
545 InitialHistory::Forked(vec![json!({
546 "type": "fork",
547 "from_thread_id": parent_thread.id
548 })]),
549 params.persist_extended_history,
550 )?;
551 Ok(Some(new))
552 }
553
554 pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
555 let list = self.store.list_threads(ThreadListFilters {
556 include_archived: params.include_archived,
557 limit: params.limit,
558 })?;
559 Ok(list.into_iter().map(to_protocol_thread).collect())
560 }
561
562 pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
563 Ok(self
564 .store
565 .get_thread(¶ms.thread_id)?
566 .map(to_protocol_thread))
567 }
568
569 pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
570 let Some(mut metadata) = self.store.get_thread(¶ms.thread_id)? else {
571 return Ok(None);
572 };
573 metadata.name = Some(params.name.clone());
574 metadata.updated_at = chrono::Utc::now().timestamp();
575 self.store.upsert_thread(&metadata)?;
576 let updated = to_protocol_thread(metadata);
577 self.running_threads
578 .insert(updated.id.clone(), updated.clone());
579 Ok(Some(updated))
580 }
581
582 pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
583 self.store.mark_archived(thread_id)?;
584 if let Some(thread) = self.running_threads.get_mut(thread_id) {
585 thread.status = ThreadStatus::Archived;
586 }
587 Ok(())
588 }
589
590 pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
591 self.store.mark_unarchived(thread_id)?;
592 Ok(())
593 }
594
595 pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
596 let Some(mut metadata) = self.store.get_thread(thread_id)? else {
597 return Ok(());
598 };
599 metadata.updated_at = chrono::Utc::now().timestamp();
600 metadata.preview = truncate_preview(input);
601 metadata.status = PersistedThreadStatus::Running;
602 self.store.upsert_thread(&metadata)?;
603 if let Some(thread) = self.running_threads.get_mut(thread_id) {
604 thread.updated_at = metadata.updated_at;
605 thread.preview = metadata.preview;
606 thread.status = ThreadStatus::Running;
607 }
608 let message_id = self.store.append_message(thread_id, "user", input, None)?;
609 self.store.save_checkpoint(
610 thread_id,
611 "latest",
612 &json!({
613 "reason": "thread_message",
614 "message_id": message_id,
615 "role": "user",
616 "preview": truncate_preview(input),
617 "updated_at": metadata.updated_at
618 }),
619 )?;
620 Ok(())
621 }
622
623 fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
624 self.store.upsert_thread(&ThreadMetadata {
625 id: thread.id.clone(),
626 rollout_path,
627 preview: thread.preview.clone(),
628 ephemeral: thread.ephemeral,
629 model_provider: thread.model_provider.clone(),
630 created_at: thread.created_at,
631 updated_at: thread.updated_at,
632 status: to_persisted_status(&thread.status),
633 path: thread.path.clone(),
634 cwd: thread.cwd.clone(),
635 cli_version: thread.cli_version.clone(),
636 source: to_persisted_source(&thread.source),
637 name: thread.name.clone(),
638 sandbox_policy: None,
639 approval_mode: None,
640 archived: matches!(thread.status, ThreadStatus::Archived),
641 archived_at: None,
642 git_sha: None,
643 git_branch: None,
644 git_origin_url: None,
645 memory_mode: None,
646 })
647 }
648}
649
/// Top-level runtime tying together configuration, thread and job state,
/// tool dispatch, execution policy and lifecycle hooks.
pub struct Runtime {
    /// Loaded configuration; used to resolve per-request runtime options.
    pub config: ConfigToml,
    /// Resolves requested model names to concrete models.
    pub model_registry: ModelRegistry,
    /// Thread persistence and running-thread cache.
    pub thread_manager: ThreadManager,
    /// Registry that tool calls are dispatched through.
    pub tool_registry: Arc<ToolRegistry>,
    /// Manager for MCP servers.
    pub mcp_manager: Arc<McpManager>,
    /// Policy engine consulted before a tool call is dispatched.
    pub exec_policy: ExecPolicyEngine,
    /// Dispatcher that receives response, tool and approval lifecycle events.
    pub hooks: HookDispatcher,
    /// In-memory job registry, loaded from the state store at construction.
    pub jobs: JobManager,
}
660
661impl Runtime {
662 pub fn new(
663 config: ConfigToml,
664 model_registry: ModelRegistry,
665 state: StateStore,
666 tool_registry: Arc<ToolRegistry>,
667 mcp_manager: Arc<McpManager>,
668 exec_policy: ExecPolicyEngine,
669 hooks: HookDispatcher,
670 ) -> Self {
671 let mut jobs = JobManager::default();
672 let _ = jobs.load_from_store(&state);
673 Self {
674 config,
675 model_registry,
676 thread_manager: ThreadManager::new(state),
677 tool_registry,
678 mcp_manager,
679 exec_policy,
680 hooks,
681 jobs,
682 }
683 }
684
685 fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
686 let history = self
687 .thread_manager
688 .state_store()
689 .list_messages(thread_id, Some(500))?
690 .into_iter()
691 .map(|message| {
692 json!({
693 "id": message.id,
694 "role": message.role,
695 "content": message.content,
696 "item": message.item,
697 "created_at": message.created_at
698 })
699 })
700 .collect::<Vec<_>>();
701
702 let checkpoint = self
703 .thread_manager
704 .state_store()
705 .load_checkpoint(thread_id, None)?
706 .map(|record| {
707 json!({
708 "checkpoint_id": record.checkpoint_id,
709 "state": record.state,
710 "created_at": record.created_at
711 })
712 });
713
714 Ok(json!({
715 "history": history,
716 "checkpoint": checkpoint
717 }))
718 }
719
720 fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
721 self.thread_manager.state_store().save_checkpoint(
722 thread_id,
723 "latest",
724 &json!({
725 "reason": reason,
726 "saved_at": chrono::Utc::now().timestamp(),
727 "state": state
728 }),
729 )
730 }
731
732 pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
733 match req {
734 ThreadRequest::Create { .. } => {
735 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
736 let new = self.thread_manager.spawn_thread_with_history(
737 "deepseek".to_string(),
738 cwd,
739 InitialHistory::New,
740 false,
741 )?;
742 let mut response = thread_response_from_new("created", new);
743 response.data = self.persisted_thread_data(&response.thread_id)?;
744 Ok(response)
745 }
746 ThreadRequest::Start(params) => {
747 let cwd = params.cwd.clone().unwrap_or_else(|| {
748 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
749 });
750 let new = self.thread_manager.spawn_thread_with_history(
751 params
752 .model_provider
753 .clone()
754 .unwrap_or_else(|| "deepseek".to_string()),
755 cwd,
756 InitialHistory::New,
757 params.persist_extended_history,
758 )?;
759 let mut response = thread_response_from_new("started", new);
760 response.data = self.persisted_thread_data(&response.thread_id)?;
761 Ok(response)
762 }
763 ThreadRequest::Resume(params) => {
764 let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
765 if let Some(new) = self.thread_manager.resume_thread_with_history(
766 ¶ms,
767 &fallback_cwd,
768 "deepseek".to_string(),
769 )? {
770 let mut response = thread_response_from_new("resumed", new);
771 response.data = self.persisted_thread_data(&response.thread_id)?;
772 Ok(response)
773 } else {
774 Ok(ThreadResponse {
775 thread_id: params.thread_id,
776 status: "missing".to_string(),
777 thread: None,
778 threads: Vec::new(),
779 model: None,
780 model_provider: None,
781 cwd: None,
782 approval_policy: params.approval_policy,
783 sandbox: params.sandbox,
784 events: Vec::new(),
785 data: json!({"error":"thread not found"}),
786 })
787 }
788 }
789 ThreadRequest::Fork(params) => {
790 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
791 if let Some(new) = self.thread_manager.fork_thread(¶ms, &cwd)? {
792 let mut response = thread_response_from_new("forked", new);
793 response.data = self.persisted_thread_data(&response.thread_id)?;
794 Ok(response)
795 } else {
796 Ok(ThreadResponse {
797 thread_id: params.thread_id,
798 status: "missing".to_string(),
799 thread: None,
800 threads: Vec::new(),
801 model: None,
802 model_provider: None,
803 cwd: None,
804 approval_policy: params.approval_policy,
805 sandbox: params.sandbox,
806 events: Vec::new(),
807 data: json!({"error":"thread not found"}),
808 })
809 }
810 }
811 ThreadRequest::List(params) => Ok(ThreadResponse {
812 thread_id: "list".to_string(),
813 status: "ok".to_string(),
814 thread: None,
815 threads: self.thread_manager.list_threads(¶ms)?,
816 model: None,
817 model_provider: None,
818 cwd: None,
819 approval_policy: None,
820 sandbox: None,
821 events: Vec::new(),
822 data: json!({}),
823 }),
824 ThreadRequest::Read(params) => {
825 let id = params.thread_id.clone();
826 let data = self.persisted_thread_data(&id)?;
827 Ok(ThreadResponse {
828 thread_id: id,
829 status: "ok".to_string(),
830 thread: self.thread_manager.read_thread(¶ms)?,
831 threads: Vec::new(),
832 model: None,
833 model_provider: None,
834 cwd: None,
835 approval_policy: None,
836 sandbox: None,
837 events: Vec::new(),
838 data,
839 })
840 }
841 ThreadRequest::SetName(params) => Ok(ThreadResponse {
842 thread_id: params.thread_id.clone(),
843 status: "ok".to_string(),
844 thread: self.thread_manager.set_thread_name(¶ms)?,
845 threads: Vec::new(),
846 model: None,
847 model_provider: None,
848 cwd: None,
849 approval_policy: None,
850 sandbox: None,
851 events: Vec::new(),
852 data: json!({}),
853 }),
854 ThreadRequest::Archive { thread_id } => {
855 self.thread_manager.archive_thread(&thread_id)?;
856 Ok(ThreadResponse {
857 thread_id,
858 status: "archived".to_string(),
859 thread: None,
860 threads: Vec::new(),
861 model: None,
862 model_provider: None,
863 cwd: None,
864 approval_policy: None,
865 sandbox: None,
866 events: Vec::new(),
867 data: json!({}),
868 })
869 }
870 ThreadRequest::Unarchive { thread_id } => {
871 self.thread_manager.unarchive_thread(&thread_id)?;
872 Ok(ThreadResponse {
873 thread_id,
874 status: "unarchived".to_string(),
875 thread: None,
876 threads: Vec::new(),
877 model: None,
878 model_provider: None,
879 cwd: None,
880 approval_policy: None,
881 sandbox: None,
882 events: Vec::new(),
883 data: json!({}),
884 })
885 }
886 ThreadRequest::Message { thread_id, input } => {
887 self.thread_manager.touch_message(&thread_id, &input)?;
888 let response_id = format!("{thread_id}:{}", input.len());
889 self.hooks
890 .emit(HookEvent::ResponseStart {
891 response_id: response_id.clone(),
892 })
893 .await;
894 self.hooks
895 .emit(HookEvent::ResponseEnd {
896 response_id: response_id.clone(),
897 })
898 .await;
899
900 Ok(ThreadResponse {
901 thread_id,
902 status: "accepted".to_string(),
903 thread: None,
904 threads: Vec::new(),
905 model: None,
906 model_provider: None,
907 cwd: None,
908 approval_policy: None,
909 sandbox: None,
910 events: vec![
911 EventFrame::ResponseStart {
912 response_id: response_id.clone(),
913 },
914 EventFrame::ResponseDelta {
915 response_id: response_id.clone(),
916 delta: "queued".to_string(),
917 channel: ResponseChannel::Text,
918 },
919 EventFrame::ResponseEnd { response_id },
920 ],
921 data: json!({}),
922 })
923 }
924 }
925 }
926
927 pub async fn handle_prompt(
928 &mut self,
929 req: PromptRequest,
930 cli_overrides: &CliRuntimeOverrides,
931 ) -> Result<PromptResponse> {
932 let resolved = self.config.resolve_runtime_options(cli_overrides);
933 let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
934 let selection = self
935 .model_registry
936 .resolve(Some(&requested_model), Some(resolved.provider));
937 let resolved_model = selection.resolved.id.clone();
938 let response_id = format!("resp-{}", Uuid::new_v4());
939
940 self.hooks
941 .emit(HookEvent::ResponseStart {
942 response_id: response_id.clone(),
943 })
944 .await;
945 self.hooks
946 .emit(HookEvent::ResponseDelta {
947 response_id: response_id.clone(),
948 delta: "model-selected".to_string(),
949 })
950 .await;
951 self.hooks
952 .emit(HookEvent::ResponseEnd {
953 response_id: response_id.clone(),
954 })
955 .await;
956
957 let payload = json!({
958 "provider": resolved.provider.as_str(),
959 "model": resolved_model.clone(),
960 "prompt": req.prompt,
961 "telemetry": resolved.telemetry,
962 "base_url": resolved.base_url,
963 "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
964 "approval_policy": resolved.approval_policy,
965 "sandbox_mode": resolved.sandbox_mode
966 });
967 if let Some(thread_id) = req.thread_id.as_ref() {
968 self.thread_manager.touch_message(thread_id, &req.prompt)?;
969 let assistant_message_id = self.thread_manager.store.append_message(
970 thread_id,
971 "assistant",
972 &payload.to_string(),
973 Some(payload.clone()),
974 )?;
975 self.persist_latest_checkpoint(
976 thread_id,
977 "prompt_response",
978 json!({
979 "response_id": response_id.clone(),
980 "model": resolved_model.clone(),
981 "provider": resolved.provider.as_str(),
982 "assistant_message_id": assistant_message_id
983 }),
984 )?;
985 }
986
987 Ok(PromptResponse {
988 output: payload.to_string(),
989 model: resolved_model,
990 events: vec![
991 EventFrame::ResponseStart {
992 response_id: response_id.clone(),
993 },
994 EventFrame::ResponseDelta {
995 response_id: response_id.clone(),
996 delta: "model-selected".to_string(),
997 channel: ResponseChannel::Text,
998 },
999 EventFrame::ResponseEnd { response_id },
1000 ],
1001 })
1002 }
1003
    /// Runs a tool call through the execution policy and, if permitted,
    /// dispatches it through the tool registry.
    ///
    /// The result is always a JSON object with `ok`, `status`,
    /// `execution_kind`, `response_id`, `precheck` and `events`, where
    /// `status` is one of:
    /// - `"denied"` — the policy disallows the call (`error` holds the reason);
    /// - `"approval_required"` — approval is needed first (`approval_id` and
    ///   `error` are included; nothing is dispatched);
    /// - `"completed"` — the tool ran (`output` holds its result);
    /// - `"failed"` — dispatch returned an error (`error` holds its debug text).
    ///
    /// Hooks are emitted along the way for the precheck, approval lifecycle,
    /// event frames and tool lifecycle phases.
    ///
    /// # Errors
    /// Only a failure of the policy check itself is returned as `Err`;
    /// dispatch failures are reported inside the `Ok` JSON.
    pub async fn invoke_tool(
        &self,
        call: ToolCall,
        approval_mode: AskForApproval,
        cwd: &Path,
    ) -> Result<Value> {
        let fallback_cwd = cwd.display().to_string();
        let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
        let decision = self.exec_policy.check(ExecPolicyContext {
            command: &command,
            cwd: &policy_cwd,
            ask_for_approval: approval_mode,
            sandbox_mode: None,
        })?;
        let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
        let response_id = format!("tool-{}", Uuid::new_v4());
        // Reuse the caller-supplied tool call id when present; otherwise mint one.
        let call_id = call
            .raw_tool_call_id
            .clone()
            .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
        self.hooks
            .emit(HookEvent::ToolLifecycle {
                response_id: response_id.clone(),
                tool_name: call.name.clone(),
                phase: "precheck".to_string(),
                payload: precheck.clone(),
            })
            .await;

        // Outright denial: report it and stop without dispatching.
        if !decision.allow {
            let reason = decision.reason().to_string();
            let approval_id = format!("approval-{}", Uuid::new_v4());
            let error_frame = EventFrame::Error {
                response_id: response_id.clone(),
                message: reason.clone(),
            };
            self.hooks
                .emit(HookEvent::ApprovalLifecycle {
                    approval_id,
                    phase: "denied".to_string(),
                    reason: Some(reason.clone()),
                })
                .await;
            self.hooks
                .emit(HookEvent::GenericEventFrame {
                    frame: error_frame.clone(),
                })
                .await;
            return Ok(json!({
                "ok": false,
                "status": "denied",
                "execution_kind": execution_kind,
                "response_id": response_id,
                "precheck": precheck,
                "error": reason,
                "events": [event_frame_payload(&error_frame)],
            }));
        }

        // Allowed but gated on approval: surface the request and stop.
        if decision.requires_approval {
            let approval_id = format!("approval-{}", Uuid::new_v4());
            let reason = decision.reason().to_string();
            let maybe_approval_frame = approval_request_frame(
                &decision.requirement,
                call_id,
                approval_id.clone(),
                response_id.clone(),
                command.clone(),
                policy_cwd.clone(),
            );
            self.hooks
                .emit(HookEvent::ApprovalLifecycle {
                    approval_id: approval_id.clone(),
                    phase: "requested".to_string(),
                    reason: Some(reason.clone()),
                })
                .await;
            let mut events = Vec::new();
            if let Some(frame) = maybe_approval_frame {
                self.hooks
                    .emit(HookEvent::GenericEventFrame {
                        frame: frame.clone(),
                    })
                    .await;
                events.push(event_frame_payload(&frame));
            }
            return Ok(json!({
                "ok": false,
                "status": "approval_required",
                "execution_kind": execution_kind,
                "response_id": response_id,
                "approval_id": approval_id,
                "precheck": precheck,
                "error": reason,
                "events": events,
            }));
        }

        // Permitted without approval: announce the call, then dispatch it.
        let start_frame = EventFrame::ToolCallStart {
            response_id: response_id.clone(),
            tool_name: call.name.clone(),
            arguments: tool_payload_value(&call.payload),
        };
        self.hooks
            .emit(HookEvent::GenericEventFrame {
                frame: start_frame.clone(),
            })
            .await;
        self.hooks
            .emit(HookEvent::ToolLifecycle {
                response_id: response_id.clone(),
                tool_name: call.name.clone(),
                phase: "dispatching".to_string(),
                payload: json!({
                    "call_id": call_id,
                    "execution_kind": execution_kind
                }),
            })
            .await;

        // `call` is cloned for dispatch because its name is still needed for
        // the lifecycle hooks emitted afterwards.
        match self.tool_registry.dispatch(call.clone(), true).await {
            Ok(tool_output) => {
                let result_frame = EventFrame::ToolCallResult {
                    response_id: response_id.clone(),
                    tool_name: call.name.clone(),
                    output: tool_output_value(&tool_output),
                };
                self.hooks
                    .emit(HookEvent::GenericEventFrame {
                        frame: result_frame.clone(),
                    })
                    .await;
                self.hooks
                    .emit(HookEvent::ToolLifecycle {
                        response_id: response_id.clone(),
                        tool_name: call.name,
                        phase: "completed".to_string(),
                        payload: json!({ "ok": true }),
                    })
                    .await;
                Ok(json!({
                    "ok": true,
                    "status": "completed",
                    "execution_kind": execution_kind,
                    "response_id": response_id,
                    "precheck": precheck,
                    "output": tool_output,
                    "events": [
                        event_frame_payload(&start_frame),
                        event_frame_payload(&result_frame)
                    ]
                }))
            }
            Err(err) => {
                // The error is rendered with its `Debug` formatting.
                let message = format!("{err:?}");
                let error_frame = EventFrame::Error {
                    response_id: response_id.clone(),
                    message: message.clone(),
                };
                self.hooks
                    .emit(HookEvent::GenericEventFrame {
                        frame: error_frame.clone(),
                    })
                    .await;
                self.hooks
                    .emit(HookEvent::ToolLifecycle {
                        response_id: response_id.clone(),
                        tool_name: call.name,
                        phase: "failed".to_string(),
                        payload: json!({ "error": message.clone() }),
                    })
                    .await;
                Ok(json!({
                    "ok": false,
                    "status": "failed",
                    "execution_kind": execution_kind,
                    "response_id": response_id,
                    "precheck": precheck,
                    "error": message,
                    "events": [
                        event_frame_payload(&start_frame),
                        event_frame_payload(&error_frame)
                    ]
                }))
            }
        }
    }
1191
1192 pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1193 let mut updates = Vec::new();
1194 let summary = self.mcp_manager.start_all(|update| {
1195 updates.push(update);
1196 });
1197 for update in updates {
1198 let status = match update.status {
1199 McpManagerStartupStatus::Starting => codewhale_protocol::McpStartupStatus::Starting,
1200 McpManagerStartupStatus::Ready => codewhale_protocol::McpStartupStatus::Ready,
1201 McpManagerStartupStatus::Failed { error } => {
1202 codewhale_protocol::McpStartupStatus::Failed { error }
1203 }
1204 McpManagerStartupStatus::Cancelled => {
1205 codewhale_protocol::McpStartupStatus::Cancelled
1206 }
1207 };
1208 self.hooks
1209 .emit(HookEvent::GenericEventFrame {
1210 frame: EventFrame::McpStartupUpdate {
1211 update: codewhale_protocol::McpStartupUpdateEvent {
1212 server_name: update.server_name,
1213 status,
1214 },
1215 },
1216 })
1217 .await;
1218 }
1219 self.hooks
1220 .emit(HookEvent::GenericEventFrame {
1221 frame: EventFrame::McpStartupComplete {
1222 summary: codewhale_protocol::McpStartupCompleteEvent {
1223 ready: summary.ready.clone(),
1224 failed: summary
1225 .failed
1226 .iter()
1227 .map(|f| codewhale_protocol::McpStartupFailure {
1228 server_name: f.server_name.clone(),
1229 error: f.error.clone(),
1230 })
1231 .collect(),
1232 cancelled: summary.cancelled.clone(),
1233 },
1234 },
1235 })
1236 .await;
1237 summary
1238 }
1239
1240 pub fn app_status(&self) -> AppResponse {
1241 let jobs = self.jobs.list();
1242 let events = jobs
1243 .iter()
1244 .flat_map(|job| {
1245 job.history.iter().map(|entry| EventFrame::ResponseDelta {
1246 response_id: job.id.clone(),
1247 delta: json!({
1248 "kind": "job_transition",
1249 "job_id": job.id.clone(),
1250 "phase": entry.phase.clone(),
1251 "status": job_status_to_str(entry.status),
1252 "progress": entry.progress,
1253 "detail": entry.detail.clone(),
1254 "retry": job_retry_to_value(&entry.retry),
1255 "at": entry.at
1256 })
1257 .to_string(),
1258 channel: ResponseChannel::Text,
1259 })
1260 })
1261 .collect::<Vec<_>>();
1262 AppResponse {
1263 ok: true,
1264 data: json!({
1265 "jobs": jobs.into_iter().map(|job| {
1266 json!({
1267 "id": job.id,
1268 "name": job.name,
1269 "status": job_status_to_str(job.status),
1270 "progress": job.progress,
1271 "detail": job.detail,
1272 "retry": job_retry_to_value(&job.retry),
1273 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1274 })
1275 }).collect::<Vec<_>>()
1276 }),
1277 events,
1278 }
1279 }
1280
1281 pub fn provider_default(&self) -> ProviderKind {
1282 self.config.provider
1283 }
1284
1285 pub fn save_thread_checkpoint(
1286 &self,
1287 thread_id: &str,
1288 checkpoint_id: &str,
1289 state: &Value,
1290 ) -> Result<()> {
1291 self.thread_manager
1292 .state_store()
1293 .save_checkpoint(thread_id, checkpoint_id, state)
1294 }
1295
1296 pub fn load_thread_checkpoint(
1297 &self,
1298 thread_id: &str,
1299 checkpoint_id: Option<&str>,
1300 ) -> Result<Option<Value>> {
1301 Ok(self
1302 .thread_manager
1303 .state_store()
1304 .load_checkpoint(thread_id, checkpoint_id)?
1305 .map(|checkpoint| checkpoint.state))
1306 }
1307
1308 pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1309 let job = self.jobs.enqueue(name);
1310 self.jobs
1311 .persist_job(self.thread_manager.state_store(), &job.id)?;
1312 Ok(job)
1313 }
1314
1315 pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1316 self.jobs.set_running(job_id);
1317 self.jobs
1318 .persist_job(self.thread_manager.state_store(), job_id)
1319 }
1320
1321 pub fn update_job_progress(
1322 &mut self,
1323 job_id: &str,
1324 progress: u8,
1325 detail: Option<String>,
1326 ) -> Result<()> {
1327 self.jobs.update_progress(job_id, progress, detail);
1328 self.jobs
1329 .persist_job(self.thread_manager.state_store(), job_id)
1330 }
1331
1332 pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1333 self.jobs.complete(job_id);
1334 self.jobs
1335 .persist_job(self.thread_manager.state_store(), job_id)
1336 }
1337
1338 pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1339 self.jobs.fail(job_id, detail);
1340 self.jobs
1341 .persist_job(self.thread_manager.state_store(), job_id)
1342 }
1343
1344 pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1345 self.jobs.cancel(job_id);
1346 self.jobs
1347 .persist_job(self.thread_manager.state_store(), job_id)
1348 }
1349
1350 pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1351 self.jobs.pause(job_id, detail);
1352 self.jobs
1353 .persist_job(self.thread_manager.state_store(), job_id)
1354 }
1355
1356 pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1357 self.jobs.resume(job_id, detail);
1358 self.jobs
1359 .persist_job(self.thread_manager.state_store(), job_id)
1360 }
1361
1362 pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
1363 self.jobs.history(job_id)
1364 }
1365}
1366
1367fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1368 ThreadResponse {
1369 thread_id: new.thread.id.clone(),
1370 status: status.to_string(),
1371 thread: Some(new.thread),
1372 threads: Vec::new(),
1373 model: Some(new.model),
1374 model_provider: Some(new.model_provider),
1375 cwd: Some(new.cwd),
1376 approval_policy: new.approval_policy,
1377 sandbox: new.sandbox,
1378 events: Vec::new(),
1379 data: json!({}),
1380 }
1381}
1382
1383fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1384 match initial_history {
1385 InitialHistory::New => "New conversation".to_string(),
1386 InitialHistory::Forked(items) => truncate_preview(
1387 &items
1388 .first()
1389 .map(Value::to_string)
1390 .unwrap_or_else(|| "Forked conversation".to_string()),
1391 ),
1392 InitialHistory::Resumed { history, .. } => truncate_preview(
1393 &history
1394 .first()
1395 .map(Value::to_string)
1396 .unwrap_or_else(|| "Resumed conversation".to_string()),
1397 ),
1398 }
1399}
1400
/// Returns at most the first 120 characters of `value`.
///
/// The limit counts `char`s, not bytes, so multi-byte characters are never
/// split. Input of 120 characters or fewer is returned whole.
fn truncate_preview(value: &str) -> String {
    // The byte offset of the 121st character (if any) is where the cut goes.
    match value.char_indices().nth(120) {
        Some((cutoff, _)) => value[..cutoff].to_owned(),
        None => value.to_owned(),
    }
}
1404
1405fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1406 Thread {
1407 id: thread.id,
1408 preview: thread.preview,
1409 ephemeral: thread.ephemeral,
1410 model_provider: thread.model_provider,
1411 created_at: thread.created_at,
1412 updated_at: thread.updated_at,
1413 status: match thread.status {
1414 PersistedThreadStatus::Running => ThreadStatus::Running,
1415 PersistedThreadStatus::Idle => ThreadStatus::Idle,
1416 PersistedThreadStatus::Completed => ThreadStatus::Completed,
1417 PersistedThreadStatus::Failed => ThreadStatus::Failed,
1418 PersistedThreadStatus::Paused => ThreadStatus::Paused,
1419 PersistedThreadStatus::Archived => ThreadStatus::Archived,
1420 },
1421 path: thread.path,
1422 cwd: thread.cwd,
1423 cli_version: thread.cli_version,
1424 source: match thread.source {
1425 SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
1426 SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
1427 SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
1428 SessionSource::Api => codewhale_protocol::SessionSource::Api,
1429 SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
1430 },
1431 name: thread.name,
1432 }
1433}
1434
1435fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1436 match status {
1437 ThreadStatus::Running => PersistedThreadStatus::Running,
1438 ThreadStatus::Idle => PersistedThreadStatus::Idle,
1439 ThreadStatus::Completed => PersistedThreadStatus::Completed,
1440 ThreadStatus::Failed => PersistedThreadStatus::Failed,
1441 ThreadStatus::Paused => PersistedThreadStatus::Paused,
1442 ThreadStatus::Archived => PersistedThreadStatus::Archived,
1443 }
1444}
1445
1446fn to_persisted_source(source: &codewhale_protocol::SessionSource) -> SessionSource {
1447 match source {
1448 codewhale_protocol::SessionSource::Interactive => SessionSource::Interactive,
1449 codewhale_protocol::SessionSource::Resume => SessionSource::Resume,
1450 codewhale_protocol::SessionSource::Fork => SessionSource::Fork,
1451 codewhale_protocol::SessionSource::Api => SessionSource::Api,
1452 codewhale_protocol::SessionSource::Unknown => SessionSource::Unknown,
1453 }
1454}
1455
1456fn approval_request_frame(
1457 requirement: &ExecApprovalRequirement,
1458 call_id: String,
1459 approval_id: String,
1460 turn_id: String,
1461 command: String,
1462 cwd: String,
1463) -> Option<EventFrame> {
1464 let ExecApprovalRequirement::NeedsApproval {
1465 reason,
1466 proposed_execpolicy_amendment,
1467 proposed_network_policy_amendments,
1468 } = requirement
1469 else {
1470 return None;
1471 };
1472
1473 let mut available_decisions = vec![
1474 ReviewDecision::Approved,
1475 ReviewDecision::ApprovedForSession,
1476 ReviewDecision::Denied,
1477 ReviewDecision::Abort,
1478 ];
1479 if proposed_execpolicy_amendment
1480 .as_ref()
1481 .is_some_and(|amendment| !amendment.prefixes.is_empty())
1482 {
1483 available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1484 }
1485 available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1486 |amendment| ReviewDecision::NetworkPolicyAmendment {
1487 host: amendment.host,
1488 action: amendment.action,
1489 },
1490 ));
1491
1492 Some(EventFrame::ExecApprovalRequest {
1493 request: ExecApprovalRequestEvent {
1494 call_id,
1495 approval_id,
1496 turn_id,
1497 command,
1498 cwd,
1499 reason: reason.clone(),
1500 network_approval_context: None,
1501 proposed_execpolicy_amendment: proposed_execpolicy_amendment
1502 .as_ref()
1503 .map(|amendment| amendment.prefixes.clone())
1504 .unwrap_or_default(),
1505 proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1506 additional_permissions: Vec::new(),
1507 available_decisions,
1508 },
1509 })
1510}
1511
1512fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1513 match requirement {
1514 ExecApprovalRequirement::Skip {
1515 bypass_sandbox,
1516 proposed_execpolicy_amendment,
1517 } => json!({
1518 "type": "skip",
1519 "bypass_sandbox": bypass_sandbox,
1520 "reason": requirement.reason(),
1521 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1522 .as_ref()
1523 .map(|amendment| amendment.prefixes.clone())
1524 .unwrap_or_default()
1525 }),
1526 ExecApprovalRequirement::NeedsApproval {
1527 reason,
1528 proposed_execpolicy_amendment,
1529 proposed_network_policy_amendments,
1530 } => json!({
1531 "type": "needs_approval",
1532 "reason": reason,
1533 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1534 .as_ref()
1535 .map(|amendment| amendment.prefixes.clone())
1536 .unwrap_or_default(),
1537 "proposed_network_policy_amendments": proposed_network_policy_amendments
1538 }),
1539 ExecApprovalRequirement::Forbidden { reason } => json!({
1540 "type": "forbidden",
1541 "reason": reason
1542 }),
1543 }
1544}
1545
1546fn policy_precheck_payload(
1547 decision: &ExecPolicyDecision,
1548 command: &str,
1549 cwd: &str,
1550 execution_kind: &str,
1551) -> Value {
1552 json!({
1553 "execution_kind": execution_kind,
1554 "command": command,
1555 "cwd": cwd,
1556 "allow": decision.allow,
1557 "requires_approval": decision.requires_approval,
1558 "matched_rule": decision.matched_rule.clone(),
1559 "phase": decision.requirement.phase(),
1560 "reason": decision.reason(),
1561 "requirement": approval_requirement_payload(&decision.requirement)
1562 })
1563}
1564
1565fn tool_payload_value(payload: &ToolPayload) -> Value {
1566 serde_json::to_value(payload).unwrap_or_else(
1567 |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1568 )
1569}
1570
1571fn tool_output_value(output: &codewhale_protocol::ToolOutput) -> Value {
1572 serde_json::to_value(output).unwrap_or_else(
1573 |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1574 )
1575}
1576
1577fn event_frame_payload(frame: &EventFrame) -> Value {
1578 serde_json::to_value(frame)
1579 .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1580}
1581
1582fn json_optional_string(value: &Value) -> Option<String> {
1583 if value.is_null() {
1584 None
1585 } else {
1586 value.as_str().map(ToString::to_string)
1587 }
1588}
1589
1590fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
1591 let Some(value) = value else {
1592 return JobRetryMetadata::default();
1593 };
1594 JobRetryMetadata {
1595 attempt: value
1596 .get("attempt")
1597 .and_then(Value::as_u64)
1598 .unwrap_or(0)
1599 .min(u32::MAX as u64) as u32,
1600 max_attempts: value
1601 .get("max_attempts")
1602 .and_then(Value::as_u64)
1603 .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
1604 .min(u32::MAX as u64) as u32,
1605 backoff_base_ms: value
1606 .get("backoff_base_ms")
1607 .and_then(Value::as_u64)
1608 .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
1609 next_backoff_ms: value
1610 .get("next_backoff_ms")
1611 .and_then(Value::as_u64)
1612 .unwrap_or(0),
1613 next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
1614 }
1615}
1616
1617fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
1618 let status = value
1619 .get("status")
1620 .and_then(Value::as_str)
1621 .and_then(job_status_from_str)?;
1622 Some(JobHistoryEntry {
1623 at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
1624 phase: value
1625 .get("phase")
1626 .and_then(Value::as_str)
1627 .unwrap_or("unknown")
1628 .to_string(),
1629 status,
1630 progress: value
1631 .get("progress")
1632 .and_then(Value::as_u64)
1633 .map(|v| v.min(u8::MAX as u64) as u8),
1634 detail: value.get("detail").and_then(json_optional_string),
1635 retry: parse_retry_metadata(value.get("retry")),
1636 })
1637}
1638
1639fn job_status_to_str(status: JobStatus) -> &'static str {
1640 match status {
1641 JobStatus::Queued => "queued",
1642 JobStatus::Running => "running",
1643 JobStatus::Paused => "paused",
1644 JobStatus::Completed => "completed",
1645 JobStatus::Failed => "failed",
1646 JobStatus::Cancelled => "cancelled",
1647 }
1648}
1649
1650fn job_status_from_str(value: &str) -> Option<JobStatus> {
1651 match value {
1652 "queued" => Some(JobStatus::Queued),
1653 "running" => Some(JobStatus::Running),
1654 "paused" => Some(JobStatus::Paused),
1655 "completed" => Some(JobStatus::Completed),
1656 "failed" => Some(JobStatus::Failed),
1657 "cancelled" => Some(JobStatus::Cancelled),
1658 _ => None,
1659 }
1660}
1661
1662fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
1663 json!({
1664 "attempt": retry.attempt,
1665 "max_attempts": retry.max_attempts,
1666 "backoff_base_ms": retry.backoff_base_ms,
1667 "next_backoff_ms": retry.next_backoff_ms,
1668 "next_retry_at": retry.next_retry_at
1669 })
1670}
1671
1672fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
1673 json!({
1674 "at": entry.at,
1675 "phase": entry.phase.clone(),
1676 "status": job_status_to_str(entry.status),
1677 "progress": entry.progress,
1678 "detail": entry.detail.clone(),
1679 "retry": job_retry_to_value(&entry.retry)
1680 })
1681}
1682
1683fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
1684 match status {
1685 JobStatus::Queued => JobStateStatus::Queued,
1686 JobStatus::Running => JobStateStatus::Running,
1687 JobStatus::Paused => JobStateStatus::Running,
1688 JobStatus::Completed => JobStateStatus::Completed,
1689 JobStatus::Failed => JobStateStatus::Failed,
1690 JobStatus::Cancelled => JobStateStatus::Cancelled,
1691 }
1692}
1693
1694fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
1695 match status {
1696 JobStateStatus::Queued => JobStatus::Queued,
1697 JobStateStatus::Running => JobStatus::Running,
1698 JobStateStatus::Completed => JobStatus::Completed,
1699 JobStateStatus::Failed => JobStatus::Failed,
1700 JobStateStatus::Cancelled => JobStatus::Cancelled,
1701 }
1702}