1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use codewhale_agent::ModelRegistry;
7use codewhale_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use codewhale_execpolicy::{
9 AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10 ExecPolicyEngine,
11};
12use codewhale_hooks::{HookDispatcher, HookEvent};
13use codewhale_mcp::{
14 McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use codewhale_protocol::{
17 AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18 ResponseChannel, ReviewDecision, Thread, ThreadForkParams, ThreadListParams, ThreadReadParams,
19 ThreadRequest, ThreadResponse, ThreadResumeParams, ThreadSetNameParams, ThreadStatus,
20 ToolPayload,
21};
22use codewhale_state::{
23 JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadListFilters, ThreadMetadata,
24 ThreadStatus as PersistedThreadStatus,
25};
26use codewhale_tools::{ToolCall, ToolRegistry};
27use serde_json::{Value, json};
28use uuid::Uuid;
29
30#[derive(Debug, Clone)]
32pub enum InitialHistory {
33 New,
35 Forked(Vec<Value>),
37 Resumed {
39 conversation_id: String,
40 history: Vec<Value>,
41 rollout_path: PathBuf,
42 },
43}
44
45#[derive(Debug, Clone)]
47pub struct NewThread {
48 pub thread: Thread,
50 pub model: String,
52 pub model_provider: String,
54 pub cwd: PathBuf,
56 pub approval_policy: Option<String>,
58 pub sandbox: Option<String>,
60}
61
/// Lifecycle state of a background job tracked by `JobManager`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Registered and waiting to run.
    Queued,
    /// Currently executing.
    Running,
    /// Suspended; may be resumed later.
    Paused,
    /// Finished successfully.
    Completed,
    /// Finished with an error (a retry may be scheduled).
    Failed,
    /// Stopped on request.
    Cancelled,
}
78
/// Value written under `"schema_version"` in the encoded job detail document.
const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
/// Default number of failures that still schedule a retry.
const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
/// Default base delay, in milliseconds, for the retry backoff.
const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
/// Upper bound on history entries retained per job; older entries are dropped.
const MAX_JOB_HISTORY_ENTRIES: usize = 64;
83
/// Retry bookkeeping attached to a job.
#[derive(Debug, Clone)]
pub struct JobRetryMetadata {
    /// Number of failures that have scheduled a retry so far.
    pub attempt: u32,
    /// Once `attempt` reaches this value, further failures schedule no retry.
    pub max_attempts: u32,
    /// Base delay in milliseconds; doubled for each attempt after the first.
    pub backoff_base_ms: u64,
    /// Delay in milliseconds chosen for the pending retry (`0` when none).
    pub next_backoff_ms: u64,
    /// Unix timestamp (seconds) of the pending retry, if one is scheduled.
    pub next_retry_at: Option<i64>,
}
98
99impl Default for JobRetryMetadata {
100 fn default() -> Self {
101 Self {
102 attempt: 0,
103 max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
104 backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
105 next_backoff_ms: 0,
106 next_retry_at: None,
107 }
108 }
109}
110
111#[derive(Debug, Clone)]
113pub struct JobHistoryEntry {
114 pub at: i64,
116 pub phase: String,
118 pub status: JobStatus,
120 pub progress: Option<u8>,
122 pub detail: Option<String>,
124 pub retry: JobRetryMetadata,
126}
127
128#[derive(Debug, Clone)]
129struct PersistedJobDetail {
130 pub status: JobStatus,
131 pub detail: Option<String>,
132 pub retry: JobRetryMetadata,
133 pub history: Vec<JobHistoryEntry>,
134}
135
136#[derive(Debug, Clone)]
138pub struct JobRecord {
139 pub id: String,
141 pub name: String,
143 pub status: JobStatus,
145 pub progress: Option<u8>,
147 pub detail: Option<String>,
149 pub retry: JobRetryMetadata,
151 pub history: Vec<JobHistoryEntry>,
153 pub created_at: i64,
155 pub updated_at: i64,
157}
158
159#[derive(Debug, Default)]
161pub struct JobManager {
162 jobs: HashMap<String, JobRecord>,
163}
164
165impl JobManager {
166 fn now_ts() -> i64 {
167 chrono::Utc::now().timestamp()
168 }
169
170 fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
171 if retry.attempt == 0 {
172 return 0;
173 }
174 let exponent = retry.attempt.saturating_sub(1).min(20);
175 let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
176 retry.backoff_base_ms.saturating_mul(multiplier)
177 }
178
179 fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
180 retry.next_backoff_ms = 0;
181 retry.next_retry_at = None;
182 }
183
184 fn push_history(job: &mut JobRecord, phase: &str) {
185 job.history.push(JobHistoryEntry {
186 at: job.updated_at,
187 phase: phase.to_string(),
188 status: job.status,
189 progress: job.progress,
190 detail: job.detail.clone(),
191 retry: job.retry.clone(),
192 });
193 if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
194 let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
195 job.history.drain(0..to_drain);
196 }
197 }
198
199 fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
200 let raw = raw?;
201 let parsed: Value = serde_json::from_str(raw).ok()?;
202 let status = parsed
203 .get("status")
204 .and_then(Value::as_str)
205 .and_then(job_status_from_str)?;
206 let detail = parsed.get("detail").and_then(json_optional_string);
207 let retry = parse_retry_metadata(parsed.get("retry"));
208 let history = parsed
209 .get("history")
210 .and_then(Value::as_array)
211 .map(|items| {
212 items
213 .iter()
214 .filter_map(parse_history_entry)
215 .collect::<Vec<_>>()
216 })
217 .unwrap_or_default();
218 Some(PersistedJobDetail {
219 status,
220 detail,
221 retry,
222 history,
223 })
224 }
225
226 fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
227 let encoded = json!({
228 "schema_version": JOB_DETAIL_SCHEMA_VERSION,
229 "status": job_status_to_str(job.status),
230 "detail": job.detail.clone(),
231 "retry": job_retry_to_value(&job.retry),
232 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
233 })
234 .to_string();
235 Ok(Some(encoded))
236 }
237
238 pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
240 let now = Self::now_ts();
241 let id = format!("job-{}", Uuid::new_v4());
242 let mut job = JobRecord {
243 id: id.clone(),
244 name: name.into(),
245 status: JobStatus::Queued,
246 progress: Some(0),
247 detail: None,
248 retry: JobRetryMetadata::default(),
249 history: Vec::new(),
250 created_at: now,
251 updated_at: now,
252 };
253 Self::push_history(&mut job, "created");
254 self.jobs.insert(id, job.clone());
255 job
256 }
257
258 pub fn set_running(&mut self, id: &str) {
260 if let Some(job) = self.jobs.get_mut(id) {
261 job.status = JobStatus::Running;
262 Self::clear_retry_schedule(&mut job.retry);
263 job.updated_at = Self::now_ts();
264 Self::push_history(job, "running");
265 }
266 }
267
268 pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
270 if let Some(job) = self.jobs.get_mut(id) {
271 job.progress = Some(progress.min(100));
272 job.detail = detail;
273 job.updated_at = Self::now_ts();
274 Self::push_history(job, "progress_updated");
275 }
276 }
277
278 pub fn complete(&mut self, id: &str) {
280 if let Some(job) = self.jobs.get_mut(id) {
281 job.status = JobStatus::Completed;
282 job.progress = Some(100);
283 Self::clear_retry_schedule(&mut job.retry);
284 job.updated_at = Self::now_ts();
285 Self::push_history(job, "completed");
286 }
287 }
288
289 pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
291 if let Some(job) = self.jobs.get_mut(id) {
292 let now = Self::now_ts();
293 job.status = JobStatus::Failed;
294 job.detail = Some(detail.into());
295 if job.retry.attempt < job.retry.max_attempts {
296 job.retry.attempt += 1;
297 job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
298 let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
299 .min(i64::MAX as u64) as i64;
300 job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
301 } else {
302 Self::clear_retry_schedule(&mut job.retry);
303 }
304 job.updated_at = now;
305 Self::push_history(job, "failed");
306 }
307 }
308
309 pub fn cancel(&mut self, id: &str) {
311 if let Some(job) = self.jobs.get_mut(id) {
312 job.status = JobStatus::Cancelled;
313 Self::clear_retry_schedule(&mut job.retry);
314 job.updated_at = Self::now_ts();
315 Self::push_history(job, "cancelled");
316 }
317 }
318
319 pub fn pause(&mut self, id: &str, detail: Option<String>) {
321 if let Some(job) = self.jobs.get_mut(id) {
322 job.status = JobStatus::Paused;
323 if detail.is_some() {
324 job.detail = detail;
325 }
326 job.updated_at = Self::now_ts();
327 Self::push_history(job, "paused");
328 }
329 }
330
331 pub fn resume(&mut self, id: &str, detail: Option<String>) {
333 if let Some(job) = self.jobs.get_mut(id) {
334 job.status = JobStatus::Running;
335 if detail.is_some() {
336 job.detail = detail;
337 }
338 Self::clear_retry_schedule(&mut job.retry);
339 job.updated_at = Self::now_ts();
340 Self::push_history(job, "resumed");
341 }
342 }
343
344 pub fn list(&self) -> Vec<JobRecord> {
346 let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
347 out.sort_by_key(|job| std::cmp::Reverse(job.updated_at));
348 out
349 }
350
351 pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
353 self.jobs
354 .get(id)
355 .map(|job| job.history.clone())
356 .unwrap_or_default()
357 }
358
359 pub fn resume_pending(&mut self) -> Vec<JobRecord> {
361 let mut resumed = Vec::new();
362 for job in self.jobs.values_mut() {
363 if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
364 job.status = JobStatus::Queued;
365 job.updated_at = Self::now_ts();
366 Self::push_history(job, "queued_after_resume");
367 resumed.push(job.clone());
368 }
369 }
370 resumed
371 }
372
373 pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
375 let persisted = store.list_jobs(Some(500))?;
376 for job in persisted {
377 let fallback_status = job_state_status_to_runtime(job.status);
378 let parsed = Self::parse_persisted_detail(job.detail.as_deref());
379 let (status, detail, retry, history) = if let Some(detail_state) = parsed {
380 (
381 detail_state.status,
382 detail_state.detail,
383 detail_state.retry,
384 detail_state.history,
385 )
386 } else {
387 (
388 fallback_status,
389 job.detail,
390 JobRetryMetadata::default(),
391 Vec::new(),
392 )
393 };
394 self.jobs.insert(
395 job.id.clone(),
396 JobRecord {
397 id: job.id,
398 name: job.name,
399 status,
400 progress: job.progress,
401 detail,
402 retry,
403 history,
404 created_at: job.created_at,
405 updated_at: job.updated_at,
406 },
407 );
408 }
409 Ok(())
410 }
411
412 pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
414 let Some(job) = self.jobs.get(id) else {
415 return Ok(());
416 };
417 let encoded_detail = Self::encode_persisted_detail(job)?;
418 store.upsert_job(&JobStateRecord {
419 id: job.id.clone(),
420 name: job.name.clone(),
421 status: runtime_status_to_job_state(job.status),
422 progress: job.progress,
423 detail: encoded_detail,
424 created_at: job.created_at,
425 updated_at: job.updated_at,
426 })
427 }
428
429 pub fn persist_all(&self, store: &StateStore) -> Result<()> {
431 for id in self.jobs.keys() {
432 self.persist_job(store, id)?;
433 }
434 Ok(())
435 }
436}
437
438pub struct ThreadManager {
440 store: StateStore,
441 running_threads: HashMap<String, Thread>,
442 cli_version: String,
443}
444
445impl ThreadManager {
446 pub fn new(store: StateStore) -> Self {
448 Self {
449 store,
450 running_threads: HashMap::new(),
451 cli_version: env!("CARGO_PKG_VERSION").to_string(),
452 }
453 }
454
455 pub fn state_store(&self) -> &StateStore {
457 &self.store
458 }
459
460 pub fn spawn_thread_with_history(
462 &mut self,
463 model_provider: String,
464 cwd: PathBuf,
465 initial_history: InitialHistory,
466 persist_extended_history: bool,
467 ) -> Result<NewThread> {
468 let id = format!("thread-{}", Uuid::new_v4());
469 let now = chrono::Utc::now().timestamp();
470 let preview = preview_from_initial_history(&initial_history);
471 let source = match initial_history {
472 InitialHistory::New => SessionSource::Interactive,
473 InitialHistory::Forked(_) => SessionSource::Fork,
474 InitialHistory::Resumed { .. } => SessionSource::Resume,
475 };
476 let thread = Thread {
477 id: id.clone(),
478 preview,
479 ephemeral: !persist_extended_history,
480 model_provider: model_provider.clone(),
481 created_at: now,
482 updated_at: now,
483 status: ThreadStatus::Running,
484 path: None,
485 cwd: cwd.clone(),
486 cli_version: self.cli_version.clone(),
487 source: match source {
488 SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
489 SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
490 SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
491 SessionSource::Api => codewhale_protocol::SessionSource::Api,
492 SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
493 },
494 name: None,
495 };
496 self.persist_thread(&thread, None)?;
497 match &initial_history {
498 InitialHistory::Forked(items) => {
499 for item in items {
500 self.store.append_message(
501 &thread.id,
502 "history",
503 &item.to_string(),
504 Some(item.clone()),
505 )?;
506 }
507 }
508 InitialHistory::Resumed { history, .. } => {
509 for item in history {
510 self.store.append_message(
511 &thread.id,
512 "history",
513 &item.to_string(),
514 Some(item.clone()),
515 )?;
516 }
517 }
518 InitialHistory::New => {}
519 }
520 self.running_threads
521 .insert(thread.id.clone(), thread.clone());
522 Ok(NewThread {
523 thread,
524 model: "auto".to_string(),
525 model_provider,
526 cwd,
527 approval_policy: None,
528 sandbox: None,
529 })
530 }
531
532 pub fn resume_thread_with_history(
534 &mut self,
535 params: &ThreadResumeParams,
536 fallback_cwd: &Path,
537 model_provider: String,
538 ) -> Result<Option<NewThread>> {
539 if params.history.is_none()
540 && let Some(thread) = self.running_threads.get(¶ms.thread_id).cloned()
541 {
542 return Ok(Some(NewThread {
543 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
544 model_provider: params.model_provider.clone().unwrap_or(model_provider),
545 cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
546 approval_policy: params.approval_policy.clone(),
547 sandbox: params.sandbox.clone(),
548 thread,
549 }));
550 }
551
552 let persisted = self.store.get_thread(¶ms.thread_id)?;
553 let Some(metadata) = persisted else {
554 return Ok(None);
555 };
556 let mut thread = to_protocol_thread(metadata);
557 thread.status = ThreadStatus::Running;
558 thread.updated_at = chrono::Utc::now().timestamp();
559 thread.cwd = params
560 .cwd
561 .clone()
562 .unwrap_or_else(|| fallback_cwd.to_path_buf());
563 self.persist_thread(&thread, None)?;
564 self.running_threads
565 .insert(thread.id.clone(), thread.clone());
566 if let Some(history) = params.history.as_ref() {
567 for item in history {
568 self.store.append_message(
569 &thread.id,
570 "history",
571 &item.to_string(),
572 Some(item.clone()),
573 )?;
574 }
575 }
576
577 Ok(Some(NewThread {
578 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
579 model_provider: params.model_provider.clone().unwrap_or(model_provider),
580 cwd: thread.cwd.clone(),
581 approval_policy: params.approval_policy.clone(),
582 sandbox: params.sandbox.clone(),
583 thread,
584 }))
585 }
586
587 pub fn fork_thread(
589 &mut self,
590 params: &ThreadForkParams,
591 fallback_cwd: &Path,
592 ) -> Result<Option<NewThread>> {
593 let parent = self.store.get_thread(¶ms.thread_id)?;
594 let Some(parent) = parent else {
595 return Ok(None);
596 };
597 let parent_thread = to_protocol_thread(parent);
598 let new = self.spawn_thread_with_history(
599 params
600 .model_provider
601 .clone()
602 .unwrap_or_else(|| parent_thread.model_provider.clone()),
603 params
604 .cwd
605 .clone()
606 .unwrap_or_else(|| fallback_cwd.to_path_buf()),
607 InitialHistory::Forked(vec![json!({
608 "type": "fork",
609 "from_thread_id": parent_thread.id
610 })]),
611 params.persist_extended_history,
612 )?;
613 Ok(Some(new))
614 }
615
616 pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
618 let list = self.store.list_threads(ThreadListFilters {
619 include_archived: params.include_archived,
620 limit: params.limit,
621 })?;
622 Ok(list.into_iter().map(to_protocol_thread).collect())
623 }
624
625 pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
627 Ok(self
628 .store
629 .get_thread(¶ms.thread_id)?
630 .map(to_protocol_thread))
631 }
632
633 pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
635 let Some(mut metadata) = self.store.get_thread(¶ms.thread_id)? else {
636 return Ok(None);
637 };
638 metadata.name = Some(params.name.clone());
639 metadata.updated_at = chrono::Utc::now().timestamp();
640 self.store.upsert_thread(&metadata)?;
641 let updated = to_protocol_thread(metadata);
642 self.running_threads
643 .insert(updated.id.clone(), updated.clone());
644 Ok(Some(updated))
645 }
646
647 pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
649 self.store.mark_archived(thread_id)?;
650 if let Some(thread) = self.running_threads.get_mut(thread_id) {
651 thread.status = ThreadStatus::Archived;
652 }
653 Ok(())
654 }
655
656 pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
658 self.store.mark_unarchived(thread_id)?;
659 Ok(())
660 }
661
662 pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
664 let Some(mut metadata) = self.store.get_thread(thread_id)? else {
665 return Ok(());
666 };
667 metadata.updated_at = chrono::Utc::now().timestamp();
668 metadata.preview = truncate_preview(input);
669 metadata.status = PersistedThreadStatus::Running;
670 self.store.upsert_thread(&metadata)?;
671 if let Some(thread) = self.running_threads.get_mut(thread_id) {
672 thread.updated_at = metadata.updated_at;
673 thread.preview = metadata.preview;
674 thread.status = ThreadStatus::Running;
675 }
676 let message_id = self.store.append_message(thread_id, "user", input, None)?;
677 self.store.save_checkpoint(
678 thread_id,
679 "latest",
680 &json!({
681 "reason": "thread_message",
682 "message_id": message_id,
683 "role": "user",
684 "preview": truncate_preview(input),
685 "updated_at": metadata.updated_at
686 }),
687 )?;
688 Ok(())
689 }
690
691 fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
692 self.store.upsert_thread(&ThreadMetadata {
693 id: thread.id.clone(),
694 rollout_path,
695 preview: thread.preview.clone(),
696 ephemeral: thread.ephemeral,
697 model_provider: thread.model_provider.clone(),
698 created_at: thread.created_at,
699 updated_at: thread.updated_at,
700 status: to_persisted_status(&thread.status),
701 path: thread.path.clone(),
702 cwd: thread.cwd.clone(),
703 cli_version: thread.cli_version.clone(),
704 source: to_persisted_source(&thread.source),
705 name: thread.name.clone(),
706 sandbox_policy: None,
707 approval_mode: None,
708 archived: matches!(thread.status, ThreadStatus::Archived),
709 archived_at: None,
710 git_sha: None,
711 git_branch: None,
712 git_origin_url: None,
713 memory_mode: None,
714 current_leaf_id: None,
715 })
716 }
717}
718
719pub struct Runtime {
721 pub config: ConfigToml,
723 pub model_registry: ModelRegistry,
725 pub thread_manager: ThreadManager,
727 pub tool_registry: Arc<ToolRegistry>,
729 pub mcp_manager: Arc<McpManager>,
731 pub exec_policy: ExecPolicyEngine,
733 pub hooks: HookDispatcher,
735 pub jobs: JobManager,
737}
738
739impl Runtime {
740 pub fn new(
742 config: ConfigToml,
743 model_registry: ModelRegistry,
744 state: StateStore,
745 tool_registry: Arc<ToolRegistry>,
746 mcp_manager: Arc<McpManager>,
747 exec_policy: ExecPolicyEngine,
748 hooks: HookDispatcher,
749 ) -> Self {
750 let mut jobs = JobManager::default();
751 let _ = jobs.load_from_store(&state);
752 Self {
753 config,
754 model_registry,
755 thread_manager: ThreadManager::new(state),
756 tool_registry,
757 mcp_manager,
758 exec_policy,
759 hooks,
760 jobs,
761 }
762 }
763
764 fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
765 let history = self
766 .thread_manager
767 .state_store()
768 .list_messages(thread_id, Some(500))?
769 .into_iter()
770 .map(|message| {
771 json!({
772 "id": message.id,
773 "role": message.role,
774 "content": message.content,
775 "item": message.item,
776 "created_at": message.created_at
777 })
778 })
779 .collect::<Vec<_>>();
780
781 let checkpoint = self
782 .thread_manager
783 .state_store()
784 .load_checkpoint(thread_id, None)?
785 .map(|record| {
786 json!({
787 "checkpoint_id": record.checkpoint_id,
788 "state": record.state,
789 "created_at": record.created_at
790 })
791 });
792
793 Ok(json!({
794 "history": history,
795 "checkpoint": checkpoint
796 }))
797 }
798
799 fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
800 self.thread_manager.state_store().save_checkpoint(
801 thread_id,
802 "latest",
803 &json!({
804 "reason": reason,
805 "saved_at": chrono::Utc::now().timestamp(),
806 "state": state
807 }),
808 )
809 }
810
811 pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
813 match req {
814 ThreadRequest::Create { .. } => {
815 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
816 let new = self.thread_manager.spawn_thread_with_history(
817 "deepseek".to_string(),
818 cwd,
819 InitialHistory::New,
820 false,
821 )?;
822 let mut response = thread_response_from_new("created", new);
823 response.data = self.persisted_thread_data(&response.thread_id)?;
824 Ok(response)
825 }
826 ThreadRequest::Start(params) => {
827 let cwd = params.cwd.clone().unwrap_or_else(|| {
828 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
829 });
830 let new = self.thread_manager.spawn_thread_with_history(
831 params
832 .model_provider
833 .clone()
834 .unwrap_or_else(|| "deepseek".to_string()),
835 cwd,
836 InitialHistory::New,
837 params.persist_extended_history,
838 )?;
839 let mut response = thread_response_from_new("started", new);
840 response.data = self.persisted_thread_data(&response.thread_id)?;
841 Ok(response)
842 }
843 ThreadRequest::Resume(params) => {
844 let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
845 if let Some(new) = self.thread_manager.resume_thread_with_history(
846 ¶ms,
847 &fallback_cwd,
848 "deepseek".to_string(),
849 )? {
850 let mut response = thread_response_from_new("resumed", new);
851 response.data = self.persisted_thread_data(&response.thread_id)?;
852 Ok(response)
853 } else {
854 Ok(ThreadResponse {
855 thread_id: params.thread_id,
856 status: "missing".to_string(),
857 thread: None,
858 threads: Vec::new(),
859 model: None,
860 model_provider: None,
861 cwd: None,
862 approval_policy: params.approval_policy,
863 sandbox: params.sandbox,
864 events: Vec::new(),
865 data: json!({"error":"thread not found"}),
866 })
867 }
868 }
869 ThreadRequest::Fork(params) => {
870 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
871 if let Some(new) = self.thread_manager.fork_thread(¶ms, &cwd)? {
872 let mut response = thread_response_from_new("forked", new);
873 response.data = self.persisted_thread_data(&response.thread_id)?;
874 Ok(response)
875 } else {
876 Ok(ThreadResponse {
877 thread_id: params.thread_id,
878 status: "missing".to_string(),
879 thread: None,
880 threads: Vec::new(),
881 model: None,
882 model_provider: None,
883 cwd: None,
884 approval_policy: params.approval_policy,
885 sandbox: params.sandbox,
886 events: Vec::new(),
887 data: json!({"error":"thread not found"}),
888 })
889 }
890 }
891 ThreadRequest::List(params) => Ok(ThreadResponse {
892 thread_id: "list".to_string(),
893 status: "ok".to_string(),
894 thread: None,
895 threads: self.thread_manager.list_threads(¶ms)?,
896 model: None,
897 model_provider: None,
898 cwd: None,
899 approval_policy: None,
900 sandbox: None,
901 events: Vec::new(),
902 data: json!({}),
903 }),
904 ThreadRequest::Read(params) => {
905 let id = params.thread_id.clone();
906 let data = self.persisted_thread_data(&id)?;
907 Ok(ThreadResponse {
908 thread_id: id,
909 status: "ok".to_string(),
910 thread: self.thread_manager.read_thread(¶ms)?,
911 threads: Vec::new(),
912 model: None,
913 model_provider: None,
914 cwd: None,
915 approval_policy: None,
916 sandbox: None,
917 events: Vec::new(),
918 data,
919 })
920 }
921 ThreadRequest::SetName(params) => Ok(ThreadResponse {
922 thread_id: params.thread_id.clone(),
923 status: "ok".to_string(),
924 thread: self.thread_manager.set_thread_name(¶ms)?,
925 threads: Vec::new(),
926 model: None,
927 model_provider: None,
928 cwd: None,
929 approval_policy: None,
930 sandbox: None,
931 events: Vec::new(),
932 data: json!({}),
933 }),
934 ThreadRequest::Archive { thread_id } => {
935 self.thread_manager.archive_thread(&thread_id)?;
936 Ok(ThreadResponse {
937 thread_id,
938 status: "archived".to_string(),
939 thread: None,
940 threads: Vec::new(),
941 model: None,
942 model_provider: None,
943 cwd: None,
944 approval_policy: None,
945 sandbox: None,
946 events: Vec::new(),
947 data: json!({}),
948 })
949 }
950 ThreadRequest::Unarchive { thread_id } => {
951 self.thread_manager.unarchive_thread(&thread_id)?;
952 Ok(ThreadResponse {
953 thread_id,
954 status: "unarchived".to_string(),
955 thread: None,
956 threads: Vec::new(),
957 model: None,
958 model_provider: None,
959 cwd: None,
960 approval_policy: None,
961 sandbox: None,
962 events: Vec::new(),
963 data: json!({}),
964 })
965 }
966 ThreadRequest::Message { thread_id, input } => {
967 self.thread_manager.touch_message(&thread_id, &input)?;
968 let response_id = format!("{thread_id}:{}", input.len());
969 self.hooks
970 .emit(HookEvent::ResponseStart {
971 response_id: response_id.clone(),
972 })
973 .await;
974 self.hooks
975 .emit(HookEvent::ResponseEnd {
976 response_id: response_id.clone(),
977 })
978 .await;
979
980 Ok(ThreadResponse {
981 thread_id,
982 status: "accepted".to_string(),
983 thread: None,
984 threads: Vec::new(),
985 model: None,
986 model_provider: None,
987 cwd: None,
988 approval_policy: None,
989 sandbox: None,
990 events: vec![
991 EventFrame::ResponseStart {
992 response_id: response_id.clone(),
993 },
994 EventFrame::ResponseDelta {
995 response_id: response_id.clone(),
996 delta: "queued".to_string(),
997 channel: ResponseChannel::Text,
998 },
999 EventFrame::ResponseEnd { response_id },
1000 ],
1001 data: json!({}),
1002 })
1003 }
1004 }
1005 }
1006
1007 pub async fn handle_prompt(
1009 &mut self,
1010 req: PromptRequest,
1011 cli_overrides: &CliRuntimeOverrides,
1012 ) -> Result<PromptResponse> {
1013 let resolved = self.config.resolve_runtime_options(cli_overrides);
1014 let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
1015 let selection = self
1016 .model_registry
1017 .resolve(Some(&requested_model), Some(resolved.provider));
1018 let resolved_model = selection.resolved.id.clone();
1019 let response_id = format!("resp-{}", Uuid::new_v4());
1020
1021 self.hooks
1022 .emit(HookEvent::ResponseStart {
1023 response_id: response_id.clone(),
1024 })
1025 .await;
1026 self.hooks
1027 .emit(HookEvent::ResponseDelta {
1028 response_id: response_id.clone(),
1029 delta: "model-selected".to_string(),
1030 })
1031 .await;
1032 self.hooks
1033 .emit(HookEvent::ResponseEnd {
1034 response_id: response_id.clone(),
1035 })
1036 .await;
1037
1038 let payload = json!({
1039 "provider": resolved.provider.as_str(),
1040 "model": resolved_model.clone(),
1041 "prompt": req.prompt,
1042 "telemetry": resolved.telemetry,
1043 "base_url": resolved.base_url,
1044 "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
1045 "approval_policy": resolved.approval_policy,
1046 "sandbox_mode": resolved.sandbox_mode
1047 });
1048 if let Some(thread_id) = req.thread_id.as_ref() {
1049 self.thread_manager.touch_message(thread_id, &req.prompt)?;
1050 let assistant_message_id = self.thread_manager.store.append_message(
1051 thread_id,
1052 "assistant",
1053 &payload.to_string(),
1054 Some(payload.clone()),
1055 )?;
1056 self.persist_latest_checkpoint(
1057 thread_id,
1058 "prompt_response",
1059 json!({
1060 "response_id": response_id.clone(),
1061 "model": resolved_model.clone(),
1062 "provider": resolved.provider.as_str(),
1063 "assistant_message_id": assistant_message_id
1064 }),
1065 )?;
1066 }
1067
1068 Ok(PromptResponse {
1069 output: payload.to_string(),
1070 model: resolved_model,
1071 events: vec![
1072 EventFrame::ResponseStart {
1073 response_id: response_id.clone(),
1074 },
1075 EventFrame::ResponseDelta {
1076 response_id: response_id.clone(),
1077 delta: "model-selected".to_string(),
1078 channel: ResponseChannel::Text,
1079 },
1080 EventFrame::ResponseEnd { response_id },
1081 ],
1082 })
1083 }
1084
1085 pub async fn invoke_tool(
1087 &self,
1088 call: ToolCall,
1089 approval_mode: AskForApproval,
1090 cwd: &Path,
1091 ) -> Result<Value> {
1092 let fallback_cwd = cwd.display().to_string();
1093 let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
1094 let policy_tool = match &call.payload {
1095 ToolPayload::LocalShell { .. } => "exec_shell",
1096 _ => call.name.as_str(),
1097 };
1098 let decision = self.exec_policy.check(ExecPolicyContext {
1099 command: &command,
1100 cwd: &policy_cwd,
1101 tool: Some(policy_tool),
1102 path: None,
1103 ask_for_approval: approval_mode,
1104 sandbox_mode: None,
1105 })?;
1106 let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
1107 let response_id = format!("tool-{}", Uuid::new_v4());
1108 let call_id = call
1109 .raw_tool_call_id
1110 .clone()
1111 .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
1112 self.hooks
1113 .emit(HookEvent::ToolLifecycle {
1114 response_id: response_id.clone(),
1115 tool_name: call.name.clone(),
1116 phase: "precheck".to_string(),
1117 payload: precheck.clone(),
1118 })
1119 .await;
1120
1121 if !decision.allow {
1122 let reason = decision.reason().to_string();
1123 let approval_id = format!("approval-{}", Uuid::new_v4());
1124 let error_frame = EventFrame::Error {
1125 response_id: response_id.clone(),
1126 message: reason.clone(),
1127 };
1128 self.hooks
1129 .emit(HookEvent::ApprovalLifecycle {
1130 approval_id,
1131 phase: "denied".to_string(),
1132 reason: Some(reason.clone()),
1133 })
1134 .await;
1135 self.hooks
1136 .emit(HookEvent::GenericEventFrame {
1137 frame: error_frame.clone(),
1138 })
1139 .await;
1140 return Ok(json!({
1141 "ok": false,
1142 "status": "denied",
1143 "execution_kind": execution_kind,
1144 "response_id": response_id,
1145 "precheck": precheck,
1146 "error": reason,
1147 "events": [event_frame_payload(&error_frame)],
1148 }));
1149 }
1150
1151 if decision.requires_approval {
1152 let approval_id = format!("approval-{}", Uuid::new_v4());
1153 let reason = decision.reason().to_string();
1154 let maybe_approval_frame = approval_request_frame(
1155 &decision.requirement,
1156 call_id,
1157 approval_id.clone(),
1158 response_id.clone(),
1159 command.clone(),
1160 policy_cwd.clone(),
1161 );
1162 self.hooks
1163 .emit(HookEvent::ApprovalLifecycle {
1164 approval_id: approval_id.clone(),
1165 phase: "requested".to_string(),
1166 reason: Some(reason.clone()),
1167 })
1168 .await;
1169 let mut events = Vec::new();
1170 if let Some(frame) = maybe_approval_frame {
1171 self.hooks
1172 .emit(HookEvent::GenericEventFrame {
1173 frame: frame.clone(),
1174 })
1175 .await;
1176 events.push(event_frame_payload(&frame));
1177 }
1178 return Ok(json!({
1179 "ok": false,
1180 "status": "approval_required",
1181 "execution_kind": execution_kind,
1182 "response_id": response_id,
1183 "approval_id": approval_id,
1184 "precheck": precheck,
1185 "error": reason,
1186 "events": events,
1187 }));
1188 }
1189
1190 let start_frame = EventFrame::ToolCallStart {
1191 response_id: response_id.clone(),
1192 tool_name: call.name.clone(),
1193 arguments: tool_payload_value(&call.payload),
1194 };
1195 self.hooks
1196 .emit(HookEvent::GenericEventFrame {
1197 frame: start_frame.clone(),
1198 })
1199 .await;
1200 self.hooks
1201 .emit(HookEvent::ToolLifecycle {
1202 response_id: response_id.clone(),
1203 tool_name: call.name.clone(),
1204 phase: "dispatching".to_string(),
1205 payload: json!({
1206 "call_id": call_id,
1207 "execution_kind": execution_kind
1208 }),
1209 })
1210 .await;
1211
1212 match self.tool_registry.dispatch(call.clone(), true).await {
1213 Ok(tool_output) => {
1214 let result_frame = EventFrame::ToolCallResult {
1215 response_id: response_id.clone(),
1216 tool_name: call.name.clone(),
1217 output: tool_output_value(&tool_output),
1218 };
1219 self.hooks
1220 .emit(HookEvent::GenericEventFrame {
1221 frame: result_frame.clone(),
1222 })
1223 .await;
1224 self.hooks
1225 .emit(HookEvent::ToolLifecycle {
1226 response_id: response_id.clone(),
1227 tool_name: call.name,
1228 phase: "completed".to_string(),
1229 payload: json!({ "ok": true }),
1230 })
1231 .await;
1232 Ok(json!({
1233 "ok": true,
1234 "status": "completed",
1235 "execution_kind": execution_kind,
1236 "response_id": response_id,
1237 "precheck": precheck,
1238 "output": tool_output,
1239 "events": [
1240 event_frame_payload(&start_frame),
1241 event_frame_payload(&result_frame)
1242 ]
1243 }))
1244 }
1245 Err(err) => {
1246 let message = format!("{err:?}");
1247 let error_frame = EventFrame::Error {
1248 response_id: response_id.clone(),
1249 message: message.clone(),
1250 };
1251 self.hooks
1252 .emit(HookEvent::GenericEventFrame {
1253 frame: error_frame.clone(),
1254 })
1255 .await;
1256 self.hooks
1257 .emit(HookEvent::ToolLifecycle {
1258 response_id: response_id.clone(),
1259 tool_name: call.name,
1260 phase: "failed".to_string(),
1261 payload: json!({ "error": message.clone() }),
1262 })
1263 .await;
1264 Ok(json!({
1265 "ok": false,
1266 "status": "failed",
1267 "execution_kind": execution_kind,
1268 "response_id": response_id,
1269 "precheck": precheck,
1270 "error": message,
1271 "events": [
1272 event_frame_payload(&start_frame),
1273 event_frame_payload(&error_frame)
1274 ]
1275 }))
1276 }
1277 }
1278 }
1279
1280 pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1282 let mut updates = Vec::new();
1283 let summary = self.mcp_manager.start_all(|update| {
1284 updates.push(update);
1285 });
1286 for update in updates {
1287 let status = match update.status {
1288 McpManagerStartupStatus::Starting => codewhale_protocol::McpStartupStatus::Starting,
1289 McpManagerStartupStatus::Ready => codewhale_protocol::McpStartupStatus::Ready,
1290 McpManagerStartupStatus::Failed { error } => {
1291 codewhale_protocol::McpStartupStatus::Failed { error }
1292 }
1293 McpManagerStartupStatus::Cancelled => {
1294 codewhale_protocol::McpStartupStatus::Cancelled
1295 }
1296 };
1297 self.hooks
1298 .emit(HookEvent::GenericEventFrame {
1299 frame: EventFrame::McpStartupUpdate {
1300 update: codewhale_protocol::McpStartupUpdateEvent {
1301 server_name: update.server_name,
1302 status,
1303 },
1304 },
1305 })
1306 .await;
1307 }
1308 self.hooks
1309 .emit(HookEvent::GenericEventFrame {
1310 frame: EventFrame::McpStartupComplete {
1311 summary: codewhale_protocol::McpStartupCompleteEvent {
1312 ready: summary.ready.clone(),
1313 failed: summary
1314 .failed
1315 .iter()
1316 .map(|f| codewhale_protocol::McpStartupFailure {
1317 server_name: f.server_name.clone(),
1318 error: f.error.clone(),
1319 })
1320 .collect(),
1321 cancelled: summary.cancelled.clone(),
1322 },
1323 },
1324 })
1325 .await;
1326 summary
1327 }
1328
1329 pub fn app_status(&self) -> AppResponse {
1331 let jobs = self.jobs.list();
1332 let events = jobs
1333 .iter()
1334 .flat_map(|job| {
1335 job.history.iter().map(|entry| EventFrame::ResponseDelta {
1336 response_id: job.id.clone(),
1337 delta: json!({
1338 "kind": "job_transition",
1339 "job_id": job.id.clone(),
1340 "phase": entry.phase.clone(),
1341 "status": job_status_to_str(entry.status),
1342 "progress": entry.progress,
1343 "detail": entry.detail.clone(),
1344 "retry": job_retry_to_value(&entry.retry),
1345 "at": entry.at
1346 })
1347 .to_string(),
1348 channel: ResponseChannel::Text,
1349 })
1350 })
1351 .collect::<Vec<_>>();
1352 AppResponse {
1353 ok: true,
1354 data: json!({
1355 "jobs": jobs.into_iter().map(|job| {
1356 json!({
1357 "id": job.id,
1358 "name": job.name,
1359 "status": job_status_to_str(job.status),
1360 "progress": job.progress,
1361 "detail": job.detail,
1362 "retry": job_retry_to_value(&job.retry),
1363 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1364 })
1365 }).collect::<Vec<_>>()
1366 }),
1367 events,
1368 }
1369 }
1370
    /// Returns the provider kind stored in this instance's configuration.
    pub fn provider_default(&self) -> ProviderKind {
        self.config.provider
    }
1375
1376 pub fn save_thread_checkpoint(
1378 &self,
1379 thread_id: &str,
1380 checkpoint_id: &str,
1381 state: &Value,
1382 ) -> Result<()> {
1383 self.thread_manager
1384 .state_store()
1385 .save_checkpoint(thread_id, checkpoint_id, state)
1386 }
1387
1388 pub fn load_thread_checkpoint(
1390 &self,
1391 thread_id: &str,
1392 checkpoint_id: Option<&str>,
1393 ) -> Result<Option<Value>> {
1394 Ok(self
1395 .thread_manager
1396 .state_store()
1397 .load_checkpoint(thread_id, checkpoint_id)?
1398 .map(|checkpoint| checkpoint.state))
1399 }
1400
1401 pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1403 let job = self.jobs.enqueue(name);
1404 self.jobs
1405 .persist_job(self.thread_manager.state_store(), &job.id)?;
1406 Ok(job)
1407 }
1408
1409 pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1411 self.jobs.set_running(job_id);
1412 self.jobs
1413 .persist_job(self.thread_manager.state_store(), job_id)
1414 }
1415
1416 pub fn update_job_progress(
1418 &mut self,
1419 job_id: &str,
1420 progress: u8,
1421 detail: Option<String>,
1422 ) -> Result<()> {
1423 self.jobs.update_progress(job_id, progress, detail);
1424 self.jobs
1425 .persist_job(self.thread_manager.state_store(), job_id)
1426 }
1427
1428 pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1430 self.jobs.complete(job_id);
1431 self.jobs
1432 .persist_job(self.thread_manager.state_store(), job_id)
1433 }
1434
1435 pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1437 self.jobs.fail(job_id, detail);
1438 self.jobs
1439 .persist_job(self.thread_manager.state_store(), job_id)
1440 }
1441
1442 pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1444 self.jobs.cancel(job_id);
1445 self.jobs
1446 .persist_job(self.thread_manager.state_store(), job_id)
1447 }
1448
1449 pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1451 self.jobs.pause(job_id, detail);
1452 self.jobs
1453 .persist_job(self.thread_manager.state_store(), job_id)
1454 }
1455
1456 pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1458 self.jobs.resume(job_id, detail);
1459 self.jobs
1460 .persist_job(self.thread_manager.state_store(), job_id)
1461 }
1462
    /// Returns the history entries the job manager holds for `job_id`.
    ///
    /// The unit tests in this file show the manager returning an empty list for an
    /// unknown id.
    pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
        self.jobs.history(job_id)
    }
1467}
1468
1469fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1470 ThreadResponse {
1471 thread_id: new.thread.id.clone(),
1472 status: status.to_string(),
1473 thread: Some(new.thread),
1474 threads: Vec::new(),
1475 model: Some(new.model),
1476 model_provider: Some(new.model_provider),
1477 cwd: Some(new.cwd),
1478 approval_policy: new.approval_policy,
1479 sandbox: new.sandbox,
1480 events: Vec::new(),
1481 data: json!({}),
1482 }
1483}
1484
1485fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1486 match initial_history {
1487 InitialHistory::New => "New conversation".to_string(),
1488 InitialHistory::Forked(items) => truncate_preview(
1489 &items
1490 .first()
1491 .map(Value::to_string)
1492 .unwrap_or_else(|| "Forked conversation".to_string()),
1493 ),
1494 InitialHistory::Resumed { history, .. } => truncate_preview(
1495 &history
1496 .first()
1497 .map(Value::to_string)
1498 .unwrap_or_else(|| "Resumed conversation".to_string()),
1499 ),
1500 }
1501}
1502
/// Returns at most the first 120 characters (Unicode scalar values) of `value`.
fn truncate_preview(value: &str) -> String {
    // The byte offset of the 121st character, if any, is where the preview ends.
    match value.char_indices().nth(120) {
        Some((cut, _)) => value[..cut].to_string(),
        None => value.to_string(),
    }
}
1506
1507fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1508 Thread {
1509 id: thread.id,
1510 preview: thread.preview,
1511 ephemeral: thread.ephemeral,
1512 model_provider: thread.model_provider,
1513 created_at: thread.created_at,
1514 updated_at: thread.updated_at,
1515 status: match thread.status {
1516 PersistedThreadStatus::Running => ThreadStatus::Running,
1517 PersistedThreadStatus::Idle => ThreadStatus::Idle,
1518 PersistedThreadStatus::Completed => ThreadStatus::Completed,
1519 PersistedThreadStatus::Failed => ThreadStatus::Failed,
1520 PersistedThreadStatus::Paused => ThreadStatus::Paused,
1521 PersistedThreadStatus::Archived => ThreadStatus::Archived,
1522 },
1523 path: thread.path,
1524 cwd: thread.cwd,
1525 cli_version: thread.cli_version,
1526 source: match thread.source {
1527 SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
1528 SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
1529 SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
1530 SessionSource::Api => codewhale_protocol::SessionSource::Api,
1531 SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
1532 },
1533 name: thread.name,
1534 }
1535}
1536
1537fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1538 match status {
1539 ThreadStatus::Running => PersistedThreadStatus::Running,
1540 ThreadStatus::Idle => PersistedThreadStatus::Idle,
1541 ThreadStatus::Completed => PersistedThreadStatus::Completed,
1542 ThreadStatus::Failed => PersistedThreadStatus::Failed,
1543 ThreadStatus::Paused => PersistedThreadStatus::Paused,
1544 ThreadStatus::Archived => PersistedThreadStatus::Archived,
1545 }
1546}
1547
1548fn to_persisted_source(source: &codewhale_protocol::SessionSource) -> SessionSource {
1549 match source {
1550 codewhale_protocol::SessionSource::Interactive => SessionSource::Interactive,
1551 codewhale_protocol::SessionSource::Resume => SessionSource::Resume,
1552 codewhale_protocol::SessionSource::Fork => SessionSource::Fork,
1553 codewhale_protocol::SessionSource::Api => SessionSource::Api,
1554 codewhale_protocol::SessionSource::Unknown => SessionSource::Unknown,
1555 }
1556}
1557
1558fn approval_request_frame(
1559 requirement: &ExecApprovalRequirement,
1560 call_id: String,
1561 approval_id: String,
1562 turn_id: String,
1563 command: String,
1564 cwd: String,
1565) -> Option<EventFrame> {
1566 let ExecApprovalRequirement::NeedsApproval {
1567 reason,
1568 proposed_execpolicy_amendment,
1569 proposed_network_policy_amendments,
1570 } = requirement
1571 else {
1572 return None;
1573 };
1574
1575 let mut available_decisions = vec![
1576 ReviewDecision::Approved,
1577 ReviewDecision::ApprovedForSession,
1578 ReviewDecision::Denied,
1579 ReviewDecision::Abort,
1580 ];
1581 if proposed_execpolicy_amendment
1582 .as_ref()
1583 .is_some_and(|amendment| !amendment.prefixes.is_empty())
1584 {
1585 available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1586 }
1587 available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1588 |amendment| ReviewDecision::NetworkPolicyAmendment {
1589 host: amendment.host,
1590 action: amendment.action,
1591 },
1592 ));
1593
1594 Some(EventFrame::ExecApprovalRequest {
1595 request: ExecApprovalRequestEvent {
1596 call_id,
1597 approval_id,
1598 turn_id,
1599 command,
1600 cwd,
1601 reason: reason.clone(),
1602 network_approval_context: None,
1603 proposed_execpolicy_amendment: proposed_execpolicy_amendment
1604 .as_ref()
1605 .map(|amendment| amendment.prefixes.clone())
1606 .unwrap_or_default(),
1607 proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1608 additional_permissions: Vec::new(),
1609 available_decisions,
1610 },
1611 })
1612}
1613
1614fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1615 match requirement {
1616 ExecApprovalRequirement::Skip {
1617 bypass_sandbox,
1618 proposed_execpolicy_amendment,
1619 } => json!({
1620 "type": "skip",
1621 "bypass_sandbox": bypass_sandbox,
1622 "reason": requirement.reason(),
1623 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1624 .as_ref()
1625 .map(|amendment| amendment.prefixes.clone())
1626 .unwrap_or_default()
1627 }),
1628 ExecApprovalRequirement::NeedsApproval {
1629 reason,
1630 proposed_execpolicy_amendment,
1631 proposed_network_policy_amendments,
1632 } => json!({
1633 "type": "needs_approval",
1634 "reason": reason,
1635 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1636 .as_ref()
1637 .map(|amendment| amendment.prefixes.clone())
1638 .unwrap_or_default(),
1639 "proposed_network_policy_amendments": proposed_network_policy_amendments
1640 }),
1641 ExecApprovalRequirement::Forbidden { reason } => json!({
1642 "type": "forbidden",
1643 "reason": reason
1644 }),
1645 }
1646}
1647
1648fn policy_precheck_payload(
1649 decision: &ExecPolicyDecision,
1650 command: &str,
1651 cwd: &str,
1652 execution_kind: &str,
1653) -> Value {
1654 json!({
1655 "execution_kind": execution_kind,
1656 "command": command,
1657 "cwd": cwd,
1658 "allow": decision.allow,
1659 "requires_approval": decision.requires_approval,
1660 "matched_rule": decision.matched_rule.clone(),
1661 "phase": decision.requirement.phase(),
1662 "reason": decision.reason(),
1663 "requirement": approval_requirement_payload(&decision.requirement)
1664 })
1665}
1666
1667fn tool_payload_value(payload: &ToolPayload) -> Value {
1668 serde_json::to_value(payload).unwrap_or_else(
1669 |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1670 )
1671}
1672
1673fn tool_output_value(output: &codewhale_protocol::ToolOutput) -> Value {
1674 serde_json::to_value(output).unwrap_or_else(
1675 |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1676 )
1677}
1678
1679fn event_frame_payload(frame: &EventFrame) -> Value {
1680 serde_json::to_value(frame)
1681 .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1682}
1683
1684fn json_optional_string(value: &Value) -> Option<String> {
1685 if value.is_null() {
1686 None
1687 } else {
1688 value.as_str().map(ToString::to_string)
1689 }
1690}
1691
1692fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
1693 let Some(value) = value else {
1694 return JobRetryMetadata::default();
1695 };
1696 JobRetryMetadata {
1697 attempt: value
1698 .get("attempt")
1699 .and_then(Value::as_u64)
1700 .unwrap_or(0)
1701 .min(u32::MAX as u64) as u32,
1702 max_attempts: value
1703 .get("max_attempts")
1704 .and_then(Value::as_u64)
1705 .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
1706 .min(u32::MAX as u64) as u32,
1707 backoff_base_ms: value
1708 .get("backoff_base_ms")
1709 .and_then(Value::as_u64)
1710 .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
1711 next_backoff_ms: value
1712 .get("next_backoff_ms")
1713 .and_then(Value::as_u64)
1714 .unwrap_or(0),
1715 next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
1716 }
1717}
1718
1719fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
1720 let status = value
1721 .get("status")
1722 .and_then(Value::as_str)
1723 .and_then(job_status_from_str)?;
1724 Some(JobHistoryEntry {
1725 at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
1726 phase: value
1727 .get("phase")
1728 .and_then(Value::as_str)
1729 .unwrap_or("unknown")
1730 .to_string(),
1731 status,
1732 progress: value
1733 .get("progress")
1734 .and_then(Value::as_u64)
1735 .map(|v| v.min(u8::MAX as u64) as u8),
1736 detail: value.get("detail").and_then(json_optional_string),
1737 retry: parse_retry_metadata(value.get("retry")),
1738 })
1739}
1740
1741fn job_status_to_str(status: JobStatus) -> &'static str {
1742 match status {
1743 JobStatus::Queued => "queued",
1744 JobStatus::Running => "running",
1745 JobStatus::Paused => "paused",
1746 JobStatus::Completed => "completed",
1747 JobStatus::Failed => "failed",
1748 JobStatus::Cancelled => "cancelled",
1749 }
1750}
1751
1752fn job_status_from_str(value: &str) -> Option<JobStatus> {
1753 match value {
1754 "queued" => Some(JobStatus::Queued),
1755 "running" => Some(JobStatus::Running),
1756 "paused" => Some(JobStatus::Paused),
1757 "completed" => Some(JobStatus::Completed),
1758 "failed" => Some(JobStatus::Failed),
1759 "cancelled" => Some(JobStatus::Cancelled),
1760 _ => None,
1761 }
1762}
1763
1764fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
1765 json!({
1766 "attempt": retry.attempt,
1767 "max_attempts": retry.max_attempts,
1768 "backoff_base_ms": retry.backoff_base_ms,
1769 "next_backoff_ms": retry.next_backoff_ms,
1770 "next_retry_at": retry.next_retry_at
1771 })
1772}
1773
1774fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
1775 json!({
1776 "at": entry.at,
1777 "phase": entry.phase.clone(),
1778 "status": job_status_to_str(entry.status),
1779 "progress": entry.progress,
1780 "detail": entry.detail.clone(),
1781 "retry": job_retry_to_value(&entry.retry)
1782 })
1783}
1784
1785fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
1786 match status {
1787 JobStatus::Queued => JobStateStatus::Queued,
1788 JobStatus::Running => JobStateStatus::Running,
1789 JobStatus::Paused => JobStateStatus::Running,
1790 JobStatus::Completed => JobStateStatus::Completed,
1791 JobStatus::Failed => JobStateStatus::Failed,
1792 JobStatus::Cancelled => JobStateStatus::Cancelled,
1793 }
1794}
1795
1796fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
1797 match status {
1798 JobStateStatus::Queued => JobStatus::Queued,
1799 JobStateStatus::Running => JobStatus::Running,
1800 JobStateStatus::Completed => JobStatus::Completed,
1801 JobStateStatus::Failed => JobStatus::Failed,
1802 JobStateStatus::Cancelled => JobStatus::Cancelled,
1803 }
1804}
1805
#[cfg(test)]
mod tests {
    use super::*;

    // ---- JobManager lifecycle ----

    /// A freshly enqueued job is queued at 0% with a single "created" history entry.
    #[test]
    fn enqueue_creates_queued_job_with_zero_progress() {
        let mut manager = JobManager::default();
        let created = manager.enqueue("build");

        assert_eq!(created.name, "build");
        assert_eq!(created.status, JobStatus::Queued);
        assert_eq!(created.progress, Some(0));
        assert!(created.detail.is_none());
        assert_eq!(created.history.len(), 1);
        assert_eq!(created.history[0].phase, "created");
    }

    /// `set_running` moves a queued job to running and records the transition.
    #[test]
    fn set_running_transitions_from_queued() {
        let mut manager = JobManager::default();
        let id = manager.enqueue("deploy").id;
        manager.set_running(&id);

        let job = manager.list().into_iter().find(|j| j.id == id).unwrap();
        assert_eq!(job.status, JobStatus::Running);
        assert_eq!(job.history.last().unwrap().phase, "running");
    }

    /// Progress values above 100 are stored as 100.
    #[test]
    fn update_progress_clamps_to_100() {
        let mut manager = JobManager::default();
        let id = manager.enqueue("task").id;
        manager.update_progress(&id, 150, Some("over".to_string()));

        let job = manager.list().into_iter().find(|j| j.id == id).unwrap();
        assert_eq!(job.progress, Some(100));
    }

    /// Completing a job sets both the completed status and 100% progress.
    #[test]
    fn complete_sets_progress_to_100() {
        let mut manager = JobManager::default();
        let id = manager.enqueue("task").id;
        manager.set_running(&id);
        manager.complete(&id);

        let job = manager.list().into_iter().find(|j| j.id == id).unwrap();
        assert_eq!(job.status, JobStatus::Completed);
        assert_eq!(job.progress, Some(100));
    }

    /// The first failure counts one attempt and schedules a retry.
    #[test]
    fn fail_increments_attempt_and_sets_backoff() {
        let mut manager = JobManager::default();
        let id = manager.enqueue("fragile").id;
        manager.set_running(&id);
        manager.fail(&id, "crashed");

        let job = manager.list().into_iter().find(|j| j.id == id).unwrap();
        assert_eq!(job.status, JobStatus::Failed);
        assert_eq!(job.retry.attempt, 1);
        assert!(job.retry.next_backoff_ms > 0);
        assert!(job.retry.next_retry_at.is_some());
        assert_eq!(job.detail.as_deref(), Some("crashed"));
    }

    /// Failing more often than the attempt budget allows leaves no retry scheduled.
    #[test]
    fn fail_clears_retry_after_max_attempts() {
        let mut manager = JobManager::default();
        let id = manager.enqueue("fragile").id;
        for _ in 0..=DEFAULT_JOB_MAX_ATTEMPTS {
            manager.set_running(&id);
            manager.fail(&id, "boom");
        }

        let job = manager.list().into_iter().find(|j| j.id == id).unwrap();
        assert_eq!(job.retry.attempt, DEFAULT_JOB_MAX_ATTEMPTS);
        assert_eq!(job.retry.next_backoff_ms, 0);
        assert!(job.retry.next_retry_at.is_none());
    }

    /// Cancelling a job marks it cancelled with no pending backoff.
    #[test]
    fn cancel_sets_status_and_clears_retry() {
        let mut manager = JobManager::default();
        let id = manager.enqueue("task").id;
        manager.cancel(&id);

        let job = manager.list().into_iter().find(|j| j.id == id).unwrap();
        assert_eq!(job.status, JobStatus::Cancelled);
        assert_eq!(job.retry.next_backoff_ms, 0);
    }

    /// A running job can be paused (keeping the detail text) and resumed again.
    #[test]
    fn pause_and_resume_round_trip() {
        let mut manager = JobManager::default();
        let id = manager.enqueue("task").id;
        manager.set_running(&id);

        manager.pause(&id, Some("waiting".to_string()));
        let paused = manager.list().into_iter().find(|j| j.id == id).unwrap();
        assert_eq!(paused.status, JobStatus::Paused);
        assert_eq!(paused.detail.as_deref(), Some("waiting"));

        manager.resume(&id, None);
        let resumed = manager.list().into_iter().find(|j| j.id == id).unwrap();
        assert_eq!(resumed.status, JobStatus::Running);
        assert_eq!(resumed.history.last().unwrap().phase, "resumed");
    }

    /// `list` returns jobs ordered from most to least recently updated.
    #[test]
    fn list_returns_jobs_sorted_by_updated_at_desc() {
        let mut manager = JobManager::default();
        for name in ["first", "second", "third"] {
            manager.enqueue(name);
        }

        let jobs = manager.list();
        assert_eq!(jobs.len(), 3);
        for pair in jobs.windows(2) {
            assert!(pair[0].updated_at >= pair[1].updated_at);
        }
    }

    /// History lists every transition of a job in the order it happened.
    #[test]
    fn history_returns_entries_for_existing_job() {
        let mut manager = JobManager::default();
        let id = manager.enqueue("task").id;
        manager.set_running(&id);
        manager.complete(&id);

        let history = manager.history(&id);
        let phases: Vec<&str> = history.iter().map(|entry| entry.phase.as_str()).collect();
        assert_eq!(phases, ["created", "running", "completed"]);
    }

    /// Asking for the history of an unknown job yields an empty list.
    #[test]
    fn history_returns_empty_for_unknown_job() {
        let manager = JobManager::default();
        assert!(manager.history("nonexistent").is_empty());
    }

    /// `resume_pending` re-queues queued and running jobs and skips completed ones.
    #[test]
    fn resume_pending_requeues_running_and_queued() {
        let mut manager = JobManager::default();
        manager.enqueue("queued_task");
        let running_id = manager.enqueue("running_task").id;
        let completed_id = manager.enqueue("completed_task").id;
        manager.set_running(&running_id);
        manager.set_running(&completed_id);
        manager.complete(&completed_id);

        let resumed = manager.resume_pending();
        assert_eq!(resumed.len(), 2);
        for job in &resumed {
            assert_eq!(job.status, JobStatus::Queued);
        }
    }

    // ---- Backoff computation ----

    /// Attempt zero has no backoff.
    #[test]
    fn deterministic_backoff_zero_on_first_attempt() {
        let retry = JobRetryMetadata {
            attempt: 0,
            ..Default::default()
        };
        assert_eq!(JobManager::deterministic_backoff_ms(&retry), 0);
    }

    /// Backoff doubles with each attempt, starting at the base delay.
    #[test]
    fn deterministic_backoff_exponential_growth() {
        let base = DEFAULT_JOB_BACKOFF_BASE_MS;
        for attempt in 1..=5u32 {
            let exponent = attempt.saturating_sub(1).min(20);
            let expected = base * 2u64.pow(exponent);
            let retry = JobRetryMetadata {
                attempt,
                backoff_base_ms: base,
                ..Default::default()
            };
            assert_eq!(
                JobManager::deterministic_backoff_ms(&retry),
                expected,
                "attempt {attempt}"
            );
        }
    }

    /// A very large attempt count must not panic; the result itself is not checked.
    #[test]
    fn deterministic_backoff_saturates_at_high_exponent() {
        let retry = JobRetryMetadata {
            attempt: 63,
            backoff_base_ms: 1000,
            ..Default::default()
        };
        let _ = JobManager::deterministic_backoff_ms(&retry);
    }

    // ---- History bounds ----

    /// History never grows beyond `MAX_JOB_HISTORY_ENTRIES`.
    #[test]
    fn push_history_truncates_beyond_max() {
        let mut manager = JobManager::default();
        let id = manager.enqueue("task").id;
        let updates = MAX_JOB_HISTORY_ENTRIES + 20;
        for step in 0..updates {
            manager.update_progress(&id, (step % 100) as u8, Some(format!("step {step}")));
        }

        assert_eq!(manager.history(&id).len(), MAX_JOB_HISTORY_ENTRIES);
    }

    // ---- Persisted detail encoding ----

    /// Encoding a job's detail and parsing it back preserves the key fields.
    #[test]
    fn encode_and_parse_persisted_detail_round_trip() {
        let mut manager = JobManager::default();
        let id = manager.enqueue("task").id;
        manager.set_running(&id);
        manager.fail(&id, "oops");
        let job = manager.list().into_iter().find(|j| j.id == id).unwrap();

        let encoded = JobManager::encode_persisted_detail(&job).unwrap().unwrap();
        let parsed = JobManager::parse_persisted_detail(Some(&encoded)).unwrap();

        assert_eq!(parsed.status, job.status);
        assert_eq!(parsed.detail, job.detail);
        assert_eq!(parsed.retry.attempt, job.retry.attempt);
        assert_eq!(parsed.history.len(), job.history.len());
    }

    #[test]
    fn parse_persisted_detail_returns_none_for_none_input() {
        let parsed = JobManager::parse_persisted_detail(None);
        assert!(parsed.is_none());
    }

    #[test]
    fn parse_persisted_detail_returns_none_for_invalid_json() {
        let parsed = JobManager::parse_persisted_detail(Some("not json"));
        assert!(parsed.is_none());
    }

    // ---- Status string conversions ----

    /// Every status survives a trip through its string label.
    #[test]
    fn job_status_round_trip_str() {
        let statuses = [
            JobStatus::Queued,
            JobStatus::Running,
            JobStatus::Paused,
            JobStatus::Completed,
            JobStatus::Failed,
            JobStatus::Cancelled,
        ];
        for status in statuses.iter().copied() {
            let s = job_status_to_str(status);
            assert_eq!(
                job_status_from_str(s),
                Some(status),
                "round-trip failed for {s:?}"
            );
        }
    }

    #[test]
    fn job_status_from_str_returns_none_for_unknown() {
        for label in ["unknown", ""] {
            assert_eq!(job_status_from_str(label), None);
        }
    }

    // ---- Preview helpers ----

    #[test]
    fn truncate_preview_limits_to_120_chars() {
        let oversized = "a".repeat(200);
        assert_eq!(truncate_preview(&oversized).len(), 120);
    }

    #[test]
    fn truncate_preview_preserves_short_strings() {
        assert_eq!(truncate_preview("hello"), "hello");
    }

    // ---- Runtime <-> persisted status mapping ----

    /// Paused has no persisted counterpart and is stored as running.
    #[test]
    fn runtime_status_to_job_state_maps_correctly() {
        let cases = [
            (JobStatus::Queued, JobStateStatus::Queued),
            (JobStatus::Running, JobStateStatus::Running),
            (JobStatus::Paused, JobStateStatus::Running),
            (JobStatus::Completed, JobStateStatus::Completed),
            (JobStatus::Failed, JobStateStatus::Failed),
            (JobStatus::Cancelled, JobStateStatus::Cancelled),
        ];
        for (runtime, persisted) in cases {
            assert_eq!(runtime_status_to_job_state(runtime), persisted);
        }
    }

    #[test]
    fn job_state_status_to_runtime_maps_correctly() {
        let cases = [
            (JobStateStatus::Queued, JobStatus::Queued),
            (JobStateStatus::Running, JobStatus::Running),
            (JobStateStatus::Completed, JobStatus::Completed),
            (JobStateStatus::Failed, JobStatus::Failed),
            (JobStateStatus::Cancelled, JobStatus::Cancelled),
        ];
        for (persisted, runtime) in cases {
            assert_eq!(job_state_status_to_runtime(persisted), runtime);
        }
    }

    // ---- Preview from initial history ----

    #[test]
    fn preview_from_initial_history_new() {
        let history = InitialHistory::New;
        assert_eq!(preview_from_initial_history(&history), "New conversation");
    }

    #[test]
    fn preview_from_initial_history_forked() {
        let history = InitialHistory::Forked(vec![json!("hello")]);
        let preview = preview_from_initial_history(&history);
        assert!(preview.contains("hello"));
    }

    #[test]
    fn preview_from_initial_history_resumed() {
        let history = InitialHistory::Resumed {
            conversation_id: "test".to_string(),
            history: vec![json!("world")],
            rollout_path: PathBuf::from("/tmp/test"),
        };
        let preview = preview_from_initial_history(&history);
        assert!(preview.contains("world"));
    }

    // ---- JSON parsing helpers ----

    #[test]
    fn json_optional_string_handles_null() {
        let parsed = json_optional_string(&Value::Null);
        assert!(parsed.is_none());
    }

    #[test]
    fn json_optional_string_handles_string() {
        let input = Value::String("hello".to_string());
        assert_eq!(json_optional_string(&input), Some("hello".to_string()));
    }

    #[test]
    fn json_optional_string_handles_non_string() {
        let parsed = json_optional_string(&json!(42));
        assert!(parsed.is_none());
    }

    #[test]
    fn parse_retry_metadata_returns_default_for_none() {
        let parsed = parse_retry_metadata(None);
        assert_eq!(parsed.attempt, 0);
        assert_eq!(parsed.max_attempts, DEFAULT_JOB_MAX_ATTEMPTS);
        assert_eq!(parsed.backoff_base_ms, DEFAULT_JOB_BACKOFF_BASE_MS);
    }

    #[test]
    fn parse_retry_metadata_parses_fields() {
        let raw = json!({
            "attempt": 2,
            "max_attempts": 5,
            "backoff_base_ms": 1000,
            "next_backoff_ms": 2000,
            "next_retry_at": 1234567890i64
        });

        let parsed = parse_retry_metadata(Some(&raw));
        assert_eq!(parsed.attempt, 2);
        assert_eq!(parsed.max_attempts, 5);
        assert_eq!(parsed.backoff_base_ms, 1000);
        assert_eq!(parsed.next_backoff_ms, 2000);
        assert_eq!(parsed.next_retry_at, Some(1234567890));
    }

    #[test]
    fn parse_history_entry_returns_none_without_status() {
        let raw = json!({"at": 1, "phase": "test"});
        assert!(parse_history_entry(&raw).is_none());
    }

    #[test]
    fn parse_history_entry_parses_valid_entry() {
        let raw = json!({
            "at": 100,
            "phase": "running",
            "status": "running",
            "progress": 50,
            "detail": "working",
            "retry": {"attempt": 0, "max_attempts": 3, "backoff_base_ms": 500}
        });

        let parsed = parse_history_entry(&raw).unwrap();
        assert_eq!(parsed.at, 100);
        assert_eq!(parsed.phase, "running");
        assert_eq!(parsed.status, JobStatus::Running);
        assert_eq!(parsed.progress, Some(50));
        assert_eq!(parsed.detail.as_deref(), Some("working"));
    }
}