1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use codewhale_agent::ModelRegistry;
7use codewhale_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use codewhale_execpolicy::{
9 AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10 ExecPolicyEngine,
11};
12use codewhale_hooks::{HookDispatcher, HookEvent};
13use codewhale_mcp::{
14 McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use codewhale_protocol::{
17 AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18 ResponseChannel, ReviewDecision, Thread, ThreadForkParams, ThreadListParams, ThreadReadParams,
19 ThreadRequest, ThreadResponse, ThreadResumeParams, ThreadSetNameParams, ThreadStatus,
20 ToolPayload,
21};
22use codewhale_state::{
23 JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadListFilters, ThreadMetadata,
24 ThreadStatus as PersistedThreadStatus,
25};
26use codewhale_tools::{ToolCall, ToolRegistry};
27use serde_json::{Value, json};
28use uuid::Uuid;
29
30#[derive(Debug, Clone)]
32pub enum InitialHistory {
33 New,
35 Forked(Vec<Value>),
37 Resumed {
39 conversation_id: String,
40 history: Vec<Value>,
41 rollout_path: PathBuf,
42 },
43}
44
45#[derive(Debug, Clone)]
47pub struct NewThread {
48 pub thread: Thread,
50 pub model: String,
52 pub model_provider: String,
54 pub cwd: PathBuf,
56 pub approval_policy: Option<String>,
58 pub sandbox: Option<String>,
60}
61
/// Lifecycle state of a job tracked by the runtime.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Waiting to be picked up.
    Queued,
    /// Currently executing.
    Running,
    /// Temporarily suspended.
    Paused,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
    /// Cancelled before completion.
    Cancelled,
}
78
/// Value written under `schema_version` in the encoded job detail.
const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
/// Default upper bound on retry attempts for a job.
const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
/// Default base delay, in milliseconds, for retry backoff.
const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
/// Maximum number of history entries retained per job.
const MAX_JOB_HISTORY_ENTRIES: usize = 64;
83
/// Retry bookkeeping attached to a job.
#[derive(Debug, Clone)]
pub struct JobRetryMetadata {
    /// Number of retry attempts recorded so far.
    pub attempt: u32,
    /// Maximum number of attempts before retries stop being scheduled.
    pub max_attempts: u32,
    /// Base backoff delay in milliseconds.
    pub backoff_base_ms: u64,
    /// Delay in milliseconds before the next retry (0 when none is scheduled).
    pub next_backoff_ms: u64,
    /// Unix timestamp (seconds) of the next scheduled retry, if any.
    pub next_retry_at: Option<i64>,
}
98
99impl Default for JobRetryMetadata {
100 fn default() -> Self {
101 Self {
102 attempt: 0,
103 max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
104 backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
105 next_backoff_ms: 0,
106 next_retry_at: None,
107 }
108 }
109}
110
111#[derive(Debug, Clone)]
113pub struct JobHistoryEntry {
114 pub at: i64,
116 pub phase: String,
118 pub status: JobStatus,
120 pub progress: Option<u8>,
122 pub detail: Option<String>,
124 pub retry: JobRetryMetadata,
126}
127
128#[derive(Debug, Clone)]
129struct PersistedJobDetail {
130 pub status: JobStatus,
131 pub detail: Option<String>,
132 pub retry: JobRetryMetadata,
133 pub history: Vec<JobHistoryEntry>,
134}
135
136#[derive(Debug, Clone)]
138pub struct JobRecord {
139 pub id: String,
141 pub name: String,
143 pub status: JobStatus,
145 pub progress: Option<u8>,
147 pub detail: Option<String>,
149 pub retry: JobRetryMetadata,
151 pub history: Vec<JobHistoryEntry>,
153 pub created_at: i64,
155 pub updated_at: i64,
157}
158
159#[derive(Debug, Default)]
161pub struct JobManager {
162 jobs: HashMap<String, JobRecord>,
163}
164
165impl JobManager {
166 fn now_ts() -> i64 {
167 chrono::Utc::now().timestamp()
168 }
169
170 fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
171 if retry.attempt == 0 {
172 return 0;
173 }
174 let exponent = retry.attempt.saturating_sub(1).min(20);
175 let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
176 retry.backoff_base_ms.saturating_mul(multiplier)
177 }
178
179 fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
180 retry.next_backoff_ms = 0;
181 retry.next_retry_at = None;
182 }
183
184 fn push_history(job: &mut JobRecord, phase: &str) {
185 job.history.push(JobHistoryEntry {
186 at: job.updated_at,
187 phase: phase.to_string(),
188 status: job.status,
189 progress: job.progress,
190 detail: job.detail.clone(),
191 retry: job.retry.clone(),
192 });
193 if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
194 let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
195 job.history.drain(0..to_drain);
196 }
197 }
198
199 fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
200 let raw = raw?;
201 let parsed: Value = serde_json::from_str(raw).ok()?;
202 let status = parsed
203 .get("status")
204 .and_then(Value::as_str)
205 .and_then(job_status_from_str)?;
206 let detail = parsed.get("detail").and_then(json_optional_string);
207 let retry = parse_retry_metadata(parsed.get("retry"));
208 let history = parsed
209 .get("history")
210 .and_then(Value::as_array)
211 .map(|items| {
212 items
213 .iter()
214 .filter_map(parse_history_entry)
215 .collect::<Vec<_>>()
216 })
217 .unwrap_or_default();
218 Some(PersistedJobDetail {
219 status,
220 detail,
221 retry,
222 history,
223 })
224 }
225
226 fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
227 let encoded = json!({
228 "schema_version": JOB_DETAIL_SCHEMA_VERSION,
229 "status": job_status_to_str(job.status),
230 "detail": job.detail.clone(),
231 "retry": job_retry_to_value(&job.retry),
232 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
233 })
234 .to_string();
235 Ok(Some(encoded))
236 }
237
238 pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
240 let now = Self::now_ts();
241 let id = format!("job-{}", Uuid::new_v4());
242 let mut job = JobRecord {
243 id: id.clone(),
244 name: name.into(),
245 status: JobStatus::Queued,
246 progress: Some(0),
247 detail: None,
248 retry: JobRetryMetadata::default(),
249 history: Vec::new(),
250 created_at: now,
251 updated_at: now,
252 };
253 Self::push_history(&mut job, "created");
254 self.jobs.insert(id, job.clone());
255 job
256 }
257
258 pub fn set_running(&mut self, id: &str) {
260 if let Some(job) = self.jobs.get_mut(id) {
261 job.status = JobStatus::Running;
262 Self::clear_retry_schedule(&mut job.retry);
263 job.updated_at = Self::now_ts();
264 Self::push_history(job, "running");
265 }
266 }
267
268 pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
270 if let Some(job) = self.jobs.get_mut(id) {
271 job.progress = Some(progress.min(100));
272 job.detail = detail;
273 job.updated_at = Self::now_ts();
274 Self::push_history(job, "progress_updated");
275 }
276 }
277
278 pub fn complete(&mut self, id: &str) {
280 if let Some(job) = self.jobs.get_mut(id) {
281 job.status = JobStatus::Completed;
282 job.progress = Some(100);
283 Self::clear_retry_schedule(&mut job.retry);
284 job.updated_at = Self::now_ts();
285 Self::push_history(job, "completed");
286 }
287 }
288
289 pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
291 if let Some(job) = self.jobs.get_mut(id) {
292 let now = Self::now_ts();
293 job.status = JobStatus::Failed;
294 job.detail = Some(detail.into());
295 if job.retry.attempt < job.retry.max_attempts {
296 job.retry.attempt += 1;
297 job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
298 let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
299 .min(i64::MAX as u64) as i64;
300 job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
301 } else {
302 Self::clear_retry_schedule(&mut job.retry);
303 }
304 job.updated_at = now;
305 Self::push_history(job, "failed");
306 }
307 }
308
309 pub fn cancel(&mut self, id: &str) {
311 if let Some(job) = self.jobs.get_mut(id) {
312 job.status = JobStatus::Cancelled;
313 Self::clear_retry_schedule(&mut job.retry);
314 job.updated_at = Self::now_ts();
315 Self::push_history(job, "cancelled");
316 }
317 }
318
319 pub fn pause(&mut self, id: &str, detail: Option<String>) {
321 if let Some(job) = self.jobs.get_mut(id) {
322 job.status = JobStatus::Paused;
323 if detail.is_some() {
324 job.detail = detail;
325 }
326 job.updated_at = Self::now_ts();
327 Self::push_history(job, "paused");
328 }
329 }
330
331 pub fn resume(&mut self, id: &str, detail: Option<String>) {
333 if let Some(job) = self.jobs.get_mut(id) {
334 job.status = JobStatus::Running;
335 if detail.is_some() {
336 job.detail = detail;
337 }
338 Self::clear_retry_schedule(&mut job.retry);
339 job.updated_at = Self::now_ts();
340 Self::push_history(job, "resumed");
341 }
342 }
343
344 pub fn list(&self) -> Vec<JobRecord> {
346 let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
347 out.sort_by_key(|job| std::cmp::Reverse(job.updated_at));
348 out
349 }
350
351 pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
353 self.jobs
354 .get(id)
355 .map(|job| job.history.clone())
356 .unwrap_or_default()
357 }
358
359 pub fn resume_pending(&mut self) -> Vec<JobRecord> {
361 let mut resumed = Vec::new();
362 for job in self.jobs.values_mut() {
363 if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
364 job.status = JobStatus::Queued;
365 job.updated_at = Self::now_ts();
366 Self::push_history(job, "queued_after_resume");
367 resumed.push(job.clone());
368 }
369 }
370 resumed
371 }
372
373 pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
375 let persisted = store.list_jobs(Some(500))?;
376 for job in persisted {
377 let fallback_status = job_state_status_to_runtime(job.status);
378 let parsed = Self::parse_persisted_detail(job.detail.as_deref());
379 let (status, detail, retry, history) = if let Some(detail_state) = parsed {
380 (
381 detail_state.status,
382 detail_state.detail,
383 detail_state.retry,
384 detail_state.history,
385 )
386 } else {
387 (
388 fallback_status,
389 job.detail,
390 JobRetryMetadata::default(),
391 Vec::new(),
392 )
393 };
394 self.jobs.insert(
395 job.id.clone(),
396 JobRecord {
397 id: job.id,
398 name: job.name,
399 status,
400 progress: job.progress,
401 detail,
402 retry,
403 history,
404 created_at: job.created_at,
405 updated_at: job.updated_at,
406 },
407 );
408 }
409 Ok(())
410 }
411
412 pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
414 let Some(job) = self.jobs.get(id) else {
415 return Ok(());
416 };
417 let encoded_detail = Self::encode_persisted_detail(job)?;
418 store.upsert_job(&JobStateRecord {
419 id: job.id.clone(),
420 name: job.name.clone(),
421 status: runtime_status_to_job_state(job.status),
422 progress: job.progress,
423 detail: encoded_detail,
424 created_at: job.created_at,
425 updated_at: job.updated_at,
426 })
427 }
428
429 pub fn persist_all(&self, store: &StateStore) -> Result<()> {
431 for id in self.jobs.keys() {
432 self.persist_job(store, id)?;
433 }
434 Ok(())
435 }
436}
437
438pub struct ThreadManager {
440 store: StateStore,
441 running_threads: HashMap<String, Thread>,
442 cli_version: String,
443}
444
445impl ThreadManager {
446 pub fn new(store: StateStore) -> Self {
448 Self {
449 store,
450 running_threads: HashMap::new(),
451 cli_version: env!("CARGO_PKG_VERSION").to_string(),
452 }
453 }
454
455 pub fn state_store(&self) -> &StateStore {
457 &self.store
458 }
459
460 pub fn spawn_thread_with_history(
462 &mut self,
463 model_provider: String,
464 cwd: PathBuf,
465 initial_history: InitialHistory,
466 persist_extended_history: bool,
467 ) -> Result<NewThread> {
468 let id = format!("thread-{}", Uuid::new_v4());
469 let now = chrono::Utc::now().timestamp();
470 let preview = preview_from_initial_history(&initial_history);
471 let source = match initial_history {
472 InitialHistory::New => SessionSource::Interactive,
473 InitialHistory::Forked(_) => SessionSource::Fork,
474 InitialHistory::Resumed { .. } => SessionSource::Resume,
475 };
476 let thread = Thread {
477 id: id.clone(),
478 preview,
479 ephemeral: !persist_extended_history,
480 model_provider: model_provider.clone(),
481 created_at: now,
482 updated_at: now,
483 status: ThreadStatus::Running,
484 path: None,
485 cwd: cwd.clone(),
486 cli_version: self.cli_version.clone(),
487 source: match source {
488 SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
489 SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
490 SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
491 SessionSource::Api => codewhale_protocol::SessionSource::Api,
492 SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
493 },
494 name: None,
495 };
496 self.persist_thread(&thread, None)?;
497 match &initial_history {
498 InitialHistory::Forked(items) => {
499 for item in items {
500 self.store.append_message(
501 &thread.id,
502 "history",
503 &item.to_string(),
504 Some(item.clone()),
505 )?;
506 }
507 }
508 InitialHistory::Resumed { history, .. } => {
509 for item in history {
510 self.store.append_message(
511 &thread.id,
512 "history",
513 &item.to_string(),
514 Some(item.clone()),
515 )?;
516 }
517 }
518 InitialHistory::New => {}
519 }
520 self.running_threads
521 .insert(thread.id.clone(), thread.clone());
522 Ok(NewThread {
523 thread,
524 model: "auto".to_string(),
525 model_provider,
526 cwd,
527 approval_policy: None,
528 sandbox: None,
529 })
530 }
531
532 pub fn resume_thread_with_history(
534 &mut self,
535 params: &ThreadResumeParams,
536 fallback_cwd: &Path,
537 model_provider: String,
538 ) -> Result<Option<NewThread>> {
539 if params.history.is_none()
540 && let Some(thread) = self.running_threads.get(¶ms.thread_id).cloned()
541 {
542 return Ok(Some(NewThread {
543 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
544 model_provider: params.model_provider.clone().unwrap_or(model_provider),
545 cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
546 approval_policy: params.approval_policy.clone(),
547 sandbox: params.sandbox.clone(),
548 thread,
549 }));
550 }
551
552 let persisted = self.store.get_thread(¶ms.thread_id)?;
553 let Some(metadata) = persisted else {
554 return Ok(None);
555 };
556 let mut thread = to_protocol_thread(metadata);
557 thread.status = ThreadStatus::Running;
558 thread.updated_at = chrono::Utc::now().timestamp();
559 thread.cwd = params
560 .cwd
561 .clone()
562 .unwrap_or_else(|| fallback_cwd.to_path_buf());
563 self.persist_thread(&thread, None)?;
564 self.running_threads
565 .insert(thread.id.clone(), thread.clone());
566 if let Some(history) = params.history.as_ref() {
567 for item in history {
568 self.store.append_message(
569 &thread.id,
570 "history",
571 &item.to_string(),
572 Some(item.clone()),
573 )?;
574 }
575 }
576
577 Ok(Some(NewThread {
578 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
579 model_provider: params.model_provider.clone().unwrap_or(model_provider),
580 cwd: thread.cwd.clone(),
581 approval_policy: params.approval_policy.clone(),
582 sandbox: params.sandbox.clone(),
583 thread,
584 }))
585 }
586
587 pub fn fork_thread(
589 &mut self,
590 params: &ThreadForkParams,
591 fallback_cwd: &Path,
592 ) -> Result<Option<NewThread>> {
593 let parent = self.store.get_thread(¶ms.thread_id)?;
594 let Some(parent) = parent else {
595 return Ok(None);
596 };
597 let parent_thread = to_protocol_thread(parent);
598 let new = self.spawn_thread_with_history(
599 params
600 .model_provider
601 .clone()
602 .unwrap_or_else(|| parent_thread.model_provider.clone()),
603 params
604 .cwd
605 .clone()
606 .unwrap_or_else(|| fallback_cwd.to_path_buf()),
607 InitialHistory::Forked(vec![json!({
608 "type": "fork",
609 "from_thread_id": parent_thread.id
610 })]),
611 params.persist_extended_history,
612 )?;
613 Ok(Some(new))
614 }
615
616 pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
618 let list = self.store.list_threads(ThreadListFilters {
619 include_archived: params.include_archived,
620 limit: params.limit,
621 })?;
622 Ok(list.into_iter().map(to_protocol_thread).collect())
623 }
624
625 pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
627 Ok(self
628 .store
629 .get_thread(¶ms.thread_id)?
630 .map(to_protocol_thread))
631 }
632
633 pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
635 let Some(mut metadata) = self.store.get_thread(¶ms.thread_id)? else {
636 return Ok(None);
637 };
638 metadata.name = Some(params.name.clone());
639 metadata.updated_at = chrono::Utc::now().timestamp();
640 self.store.upsert_thread(&metadata)?;
641 let updated = to_protocol_thread(metadata);
642 self.running_threads
643 .insert(updated.id.clone(), updated.clone());
644 Ok(Some(updated))
645 }
646
647 pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
649 self.store.mark_archived(thread_id)?;
650 if let Some(thread) = self.running_threads.get_mut(thread_id) {
651 thread.status = ThreadStatus::Archived;
652 }
653 Ok(())
654 }
655
656 pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
658 self.store.mark_unarchived(thread_id)?;
659 Ok(())
660 }
661
662 pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
664 let Some(mut metadata) = self.store.get_thread(thread_id)? else {
665 return Ok(());
666 };
667 metadata.updated_at = chrono::Utc::now().timestamp();
668 metadata.preview = truncate_preview(input);
669 metadata.status = PersistedThreadStatus::Running;
670 self.store.upsert_thread(&metadata)?;
671 if let Some(thread) = self.running_threads.get_mut(thread_id) {
672 thread.updated_at = metadata.updated_at;
673 thread.preview = metadata.preview;
674 thread.status = ThreadStatus::Running;
675 }
676 let message_id = self.store.append_message(thread_id, "user", input, None)?;
677 self.store.save_checkpoint(
678 thread_id,
679 "latest",
680 &json!({
681 "reason": "thread_message",
682 "message_id": message_id,
683 "role": "user",
684 "preview": truncate_preview(input),
685 "updated_at": metadata.updated_at
686 }),
687 )?;
688 Ok(())
689 }
690
691 fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
692 self.store.upsert_thread(&ThreadMetadata {
693 id: thread.id.clone(),
694 rollout_path,
695 preview: thread.preview.clone(),
696 ephemeral: thread.ephemeral,
697 model_provider: thread.model_provider.clone(),
698 created_at: thread.created_at,
699 updated_at: thread.updated_at,
700 status: to_persisted_status(&thread.status),
701 path: thread.path.clone(),
702 cwd: thread.cwd.clone(),
703 cli_version: thread.cli_version.clone(),
704 source: to_persisted_source(&thread.source),
705 name: thread.name.clone(),
706 sandbox_policy: None,
707 approval_mode: None,
708 archived: matches!(thread.status, ThreadStatus::Archived),
709 archived_at: None,
710 git_sha: None,
711 git_branch: None,
712 git_origin_url: None,
713 memory_mode: None,
714 current_leaf_id: None,
715 })
716 }
717}
718
719pub struct Runtime {
721 pub config: ConfigToml,
723 pub model_registry: ModelRegistry,
725 pub thread_manager: ThreadManager,
727 pub tool_registry: Arc<ToolRegistry>,
729 pub mcp_manager: Arc<McpManager>,
731 pub exec_policy: ExecPolicyEngine,
733 pub hooks: HookDispatcher,
735 pub jobs: JobManager,
737}
738
739impl Runtime {
740 pub fn new(
742 config: ConfigToml,
743 model_registry: ModelRegistry,
744 state: StateStore,
745 tool_registry: Arc<ToolRegistry>,
746 mcp_manager: Arc<McpManager>,
747 exec_policy: ExecPolicyEngine,
748 hooks: HookDispatcher,
749 ) -> Self {
750 let mut jobs = JobManager::default();
751 if let Err(e) = jobs.load_from_store(&state) {
752 tracing::warn!("Failed to load job store, starting with empty job list: {e}");
753 }
754 Self {
755 config,
756 model_registry,
757 thread_manager: ThreadManager::new(state),
758 tool_registry,
759 mcp_manager,
760 exec_policy,
761 hooks,
762 jobs,
763 }
764 }
765
766 fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
767 let history = self
768 .thread_manager
769 .state_store()
770 .list_messages(thread_id, Some(500))?
771 .into_iter()
772 .map(|message| {
773 json!({
774 "id": message.id,
775 "role": message.role,
776 "content": message.content,
777 "item": message.item,
778 "created_at": message.created_at
779 })
780 })
781 .collect::<Vec<_>>();
782
783 let checkpoint = self
784 .thread_manager
785 .state_store()
786 .load_checkpoint(thread_id, None)?
787 .map(|record| {
788 json!({
789 "checkpoint_id": record.checkpoint_id,
790 "state": record.state,
791 "created_at": record.created_at
792 })
793 });
794
795 Ok(json!({
796 "history": history,
797 "checkpoint": checkpoint
798 }))
799 }
800
801 fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
802 self.thread_manager.state_store().save_checkpoint(
803 thread_id,
804 "latest",
805 &json!({
806 "reason": reason,
807 "saved_at": chrono::Utc::now().timestamp(),
808 "state": state
809 }),
810 )
811 }
812
813 pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
815 match req {
816 ThreadRequest::Create { .. } => {
817 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
818 let new = self.thread_manager.spawn_thread_with_history(
819 "deepseek".to_string(),
820 cwd,
821 InitialHistory::New,
822 false,
823 )?;
824 let mut response = thread_response_from_new("created", new);
825 response.data = self.persisted_thread_data(&response.thread_id)?;
826 Ok(response)
827 }
828 ThreadRequest::Start(params) => {
829 let cwd = params.cwd.clone().unwrap_or_else(|| {
830 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
831 });
832 let new = self.thread_manager.spawn_thread_with_history(
833 params
834 .model_provider
835 .clone()
836 .unwrap_or_else(|| "deepseek".to_string()),
837 cwd,
838 InitialHistory::New,
839 params.persist_extended_history,
840 )?;
841 let mut response = thread_response_from_new("started", new);
842 response.data = self.persisted_thread_data(&response.thread_id)?;
843 Ok(response)
844 }
845 ThreadRequest::Resume(params) => {
846 let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
847 if let Some(new) = self.thread_manager.resume_thread_with_history(
848 ¶ms,
849 &fallback_cwd,
850 "deepseek".to_string(),
851 )? {
852 let mut response = thread_response_from_new("resumed", new);
853 response.data = self.persisted_thread_data(&response.thread_id)?;
854 Ok(response)
855 } else {
856 Ok(ThreadResponse {
857 thread_id: params.thread_id,
858 status: "missing".to_string(),
859 thread: None,
860 threads: Vec::new(),
861 model: None,
862 model_provider: None,
863 cwd: None,
864 approval_policy: params.approval_policy,
865 sandbox: params.sandbox,
866 events: Vec::new(),
867 data: json!({"error":"thread not found"}),
868 })
869 }
870 }
871 ThreadRequest::Fork(params) => {
872 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
873 if let Some(new) = self.thread_manager.fork_thread(¶ms, &cwd)? {
874 let mut response = thread_response_from_new("forked", new);
875 response.data = self.persisted_thread_data(&response.thread_id)?;
876 Ok(response)
877 } else {
878 Ok(ThreadResponse {
879 thread_id: params.thread_id,
880 status: "missing".to_string(),
881 thread: None,
882 threads: Vec::new(),
883 model: None,
884 model_provider: None,
885 cwd: None,
886 approval_policy: params.approval_policy,
887 sandbox: params.sandbox,
888 events: Vec::new(),
889 data: json!({"error":"thread not found"}),
890 })
891 }
892 }
893 ThreadRequest::List(params) => Ok(ThreadResponse {
894 thread_id: "list".to_string(),
895 status: "ok".to_string(),
896 thread: None,
897 threads: self.thread_manager.list_threads(¶ms)?,
898 model: None,
899 model_provider: None,
900 cwd: None,
901 approval_policy: None,
902 sandbox: None,
903 events: Vec::new(),
904 data: json!({}),
905 }),
906 ThreadRequest::Read(params) => {
907 let id = params.thread_id.clone();
908 let data = self.persisted_thread_data(&id)?;
909 Ok(ThreadResponse {
910 thread_id: id,
911 status: "ok".to_string(),
912 thread: self.thread_manager.read_thread(¶ms)?,
913 threads: Vec::new(),
914 model: None,
915 model_provider: None,
916 cwd: None,
917 approval_policy: None,
918 sandbox: None,
919 events: Vec::new(),
920 data,
921 })
922 }
923 ThreadRequest::SetName(params) => Ok(ThreadResponse {
924 thread_id: params.thread_id.clone(),
925 status: "ok".to_string(),
926 thread: self.thread_manager.set_thread_name(¶ms)?,
927 threads: Vec::new(),
928 model: None,
929 model_provider: None,
930 cwd: None,
931 approval_policy: None,
932 sandbox: None,
933 events: Vec::new(),
934 data: json!({}),
935 }),
936 ThreadRequest::Archive { thread_id } => {
937 self.thread_manager.archive_thread(&thread_id)?;
938 Ok(ThreadResponse {
939 thread_id,
940 status: "archived".to_string(),
941 thread: None,
942 threads: Vec::new(),
943 model: None,
944 model_provider: None,
945 cwd: None,
946 approval_policy: None,
947 sandbox: None,
948 events: Vec::new(),
949 data: json!({}),
950 })
951 }
952 ThreadRequest::Unarchive { thread_id } => {
953 self.thread_manager.unarchive_thread(&thread_id)?;
954 Ok(ThreadResponse {
955 thread_id,
956 status: "unarchived".to_string(),
957 thread: None,
958 threads: Vec::new(),
959 model: None,
960 model_provider: None,
961 cwd: None,
962 approval_policy: None,
963 sandbox: None,
964 events: Vec::new(),
965 data: json!({}),
966 })
967 }
968 ThreadRequest::Message { thread_id, input } => {
969 self.thread_manager.touch_message(&thread_id, &input)?;
970 let response_id = format!("{thread_id}:{}", input.len());
971 self.hooks
972 .emit(HookEvent::ResponseStart {
973 response_id: response_id.clone(),
974 })
975 .await;
976 self.hooks
977 .emit(HookEvent::ResponseEnd {
978 response_id: response_id.clone(),
979 })
980 .await;
981
982 Ok(ThreadResponse {
983 thread_id,
984 status: "accepted".to_string(),
985 thread: None,
986 threads: Vec::new(),
987 model: None,
988 model_provider: None,
989 cwd: None,
990 approval_policy: None,
991 sandbox: None,
992 events: vec![
993 EventFrame::ResponseStart {
994 response_id: response_id.clone(),
995 },
996 EventFrame::ResponseDelta {
997 response_id: response_id.clone(),
998 delta: "queued".to_string(),
999 channel: ResponseChannel::Text,
1000 },
1001 EventFrame::ResponseEnd { response_id },
1002 ],
1003 data: json!({}),
1004 })
1005 }
1006 }
1007 }
1008
1009 pub async fn handle_prompt(
1011 &mut self,
1012 req: PromptRequest,
1013 cli_overrides: &CliRuntimeOverrides,
1014 ) -> Result<PromptResponse> {
1015 let resolved = self.config.resolve_runtime_options(cli_overrides);
1016 let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
1017 let selection = self
1018 .model_registry
1019 .resolve(Some(&requested_model), Some(resolved.provider));
1020 let resolved_model = selection.resolved.id.clone();
1021 let response_id = format!("resp-{}", Uuid::new_v4());
1022
1023 self.hooks
1024 .emit(HookEvent::ResponseStart {
1025 response_id: response_id.clone(),
1026 })
1027 .await;
1028 self.hooks
1029 .emit(HookEvent::ResponseDelta {
1030 response_id: response_id.clone(),
1031 delta: "model-selected".to_string(),
1032 })
1033 .await;
1034 self.hooks
1035 .emit(HookEvent::ResponseEnd {
1036 response_id: response_id.clone(),
1037 })
1038 .await;
1039
1040 let payload = json!({
1041 "provider": resolved.provider.as_str(),
1042 "model": resolved_model.clone(),
1043 "prompt": req.prompt,
1044 "telemetry": resolved.telemetry,
1045 "base_url": resolved.base_url,
1046 "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
1047 "approval_policy": resolved.approval_policy,
1048 "sandbox_mode": resolved.sandbox_mode
1049 });
1050 if let Some(thread_id) = req.thread_id.as_ref() {
1051 self.thread_manager.touch_message(thread_id, &req.prompt)?;
1052 let assistant_message_id = self.thread_manager.store.append_message(
1053 thread_id,
1054 "assistant",
1055 &payload.to_string(),
1056 Some(payload.clone()),
1057 )?;
1058 self.persist_latest_checkpoint(
1059 thread_id,
1060 "prompt_response",
1061 json!({
1062 "response_id": response_id.clone(),
1063 "model": resolved_model.clone(),
1064 "provider": resolved.provider.as_str(),
1065 "assistant_message_id": assistant_message_id
1066 }),
1067 )?;
1068 }
1069
1070 Ok(PromptResponse {
1071 output: payload.to_string(),
1072 model: resolved_model,
1073 events: vec![
1074 EventFrame::ResponseStart {
1075 response_id: response_id.clone(),
1076 },
1077 EventFrame::ResponseDelta {
1078 response_id: response_id.clone(),
1079 delta: "model-selected".to_string(),
1080 channel: ResponseChannel::Text,
1081 },
1082 EventFrame::ResponseEnd { response_id },
1083 ],
1084 })
1085 }
1086
1087 pub async fn invoke_tool(
1089 &self,
1090 call: ToolCall,
1091 approval_mode: AskForApproval,
1092 cwd: &Path,
1093 ) -> Result<Value> {
1094 let fallback_cwd = cwd.display().to_string();
1095 let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
1096 let policy_tool = match &call.payload {
1097 ToolPayload::LocalShell { .. } => "exec_shell",
1098 _ => call.name.as_str(),
1099 };
1100 let policy_path = permission_path_for_call(&call);
1101 let decision = self.exec_policy.check(ExecPolicyContext {
1102 command: &command,
1103 cwd: &policy_cwd,
1104 tool: Some(policy_tool),
1105 path: policy_path.as_deref(),
1106 ask_for_approval: approval_mode,
1107 sandbox_mode: None,
1108 })?;
1109 let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
1110 let response_id = format!("tool-{}", Uuid::new_v4());
1111 let call_id = call
1112 .raw_tool_call_id
1113 .clone()
1114 .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
1115 self.hooks
1116 .emit(HookEvent::ToolLifecycle {
1117 response_id: response_id.clone(),
1118 tool_name: call.name.clone(),
1119 phase: "precheck".to_string(),
1120 payload: precheck.clone(),
1121 })
1122 .await;
1123
1124 if !decision.allow {
1125 let reason = decision.reason().to_string();
1126 let approval_id = format!("approval-{}", Uuid::new_v4());
1127 let error_frame = EventFrame::Error {
1128 response_id: response_id.clone(),
1129 message: reason.clone(),
1130 };
1131 self.hooks
1132 .emit(HookEvent::ApprovalLifecycle {
1133 approval_id,
1134 phase: "denied".to_string(),
1135 reason: Some(reason.clone()),
1136 })
1137 .await;
1138 self.hooks
1139 .emit(HookEvent::GenericEventFrame {
1140 frame: error_frame.clone(),
1141 })
1142 .await;
1143 return Ok(json!({
1144 "ok": false,
1145 "status": "denied",
1146 "execution_kind": execution_kind,
1147 "response_id": response_id,
1148 "precheck": precheck,
1149 "error": reason,
1150 "events": [event_frame_payload(&error_frame)],
1151 }));
1152 }
1153
1154 if decision.requires_approval {
1155 let approval_id = format!("approval-{}", Uuid::new_v4());
1156 let reason = decision.reason().to_string();
1157 let maybe_approval_frame = approval_request_frame(
1158 &decision.requirement,
1159 call_id,
1160 approval_id.clone(),
1161 response_id.clone(),
1162 command.clone(),
1163 policy_cwd.clone(),
1164 );
1165 self.hooks
1166 .emit(HookEvent::ApprovalLifecycle {
1167 approval_id: approval_id.clone(),
1168 phase: "requested".to_string(),
1169 reason: Some(reason.clone()),
1170 })
1171 .await;
1172 let mut events = Vec::new();
1173 if let Some(frame) = maybe_approval_frame {
1174 self.hooks
1175 .emit(HookEvent::GenericEventFrame {
1176 frame: frame.clone(),
1177 })
1178 .await;
1179 events.push(event_frame_payload(&frame));
1180 }
1181 return Ok(json!({
1182 "ok": false,
1183 "status": "approval_required",
1184 "execution_kind": execution_kind,
1185 "response_id": response_id,
1186 "approval_id": approval_id,
1187 "precheck": precheck,
1188 "error": reason,
1189 "events": events,
1190 }));
1191 }
1192
1193 let start_frame = EventFrame::ToolCallStart {
1194 response_id: response_id.clone(),
1195 tool_name: call.name.clone(),
1196 arguments: tool_payload_value(&call.payload),
1197 };
1198 self.hooks
1199 .emit(HookEvent::GenericEventFrame {
1200 frame: start_frame.clone(),
1201 })
1202 .await;
1203 self.hooks
1204 .emit(HookEvent::ToolLifecycle {
1205 response_id: response_id.clone(),
1206 tool_name: call.name.clone(),
1207 phase: "dispatching".to_string(),
1208 payload: json!({
1209 "call_id": call_id,
1210 "execution_kind": execution_kind
1211 }),
1212 })
1213 .await;
1214
1215 match self.tool_registry.dispatch(call.clone(), true).await {
1216 Ok(tool_output) => {
1217 let result_frame = EventFrame::ToolCallResult {
1218 response_id: response_id.clone(),
1219 tool_name: call.name.clone(),
1220 output: tool_output_value(&tool_output),
1221 };
1222 self.hooks
1223 .emit(HookEvent::GenericEventFrame {
1224 frame: result_frame.clone(),
1225 })
1226 .await;
1227 self.hooks
1228 .emit(HookEvent::ToolLifecycle {
1229 response_id: response_id.clone(),
1230 tool_name: call.name,
1231 phase: "completed".to_string(),
1232 payload: json!({ "ok": true }),
1233 })
1234 .await;
1235 Ok(json!({
1236 "ok": true,
1237 "status": "completed",
1238 "execution_kind": execution_kind,
1239 "response_id": response_id,
1240 "precheck": precheck,
1241 "output": tool_output,
1242 "events": [
1243 event_frame_payload(&start_frame),
1244 event_frame_payload(&result_frame)
1245 ]
1246 }))
1247 }
1248 Err(err) => {
1249 let message = format!("{err:?}");
1250 let error_frame = EventFrame::Error {
1251 response_id: response_id.clone(),
1252 message: message.clone(),
1253 };
1254 self.hooks
1255 .emit(HookEvent::GenericEventFrame {
1256 frame: error_frame.clone(),
1257 })
1258 .await;
1259 self.hooks
1260 .emit(HookEvent::ToolLifecycle {
1261 response_id: response_id.clone(),
1262 tool_name: call.name,
1263 phase: "failed".to_string(),
1264 payload: json!({ "error": message.clone() }),
1265 })
1266 .await;
1267 Ok(json!({
1268 "ok": false,
1269 "status": "failed",
1270 "execution_kind": execution_kind,
1271 "response_id": response_id,
1272 "precheck": precheck,
1273 "error": message,
1274 "events": [
1275 event_frame_payload(&start_frame),
1276 event_frame_payload(&error_frame)
1277 ]
1278 }))
1279 }
1280 }
1281 }
1282
1283 pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1285 let mut updates = Vec::new();
1286 let summary = self.mcp_manager.start_all(|update| {
1287 updates.push(update);
1288 });
1289 for update in updates {
1290 let status = match update.status {
1291 McpManagerStartupStatus::Starting => codewhale_protocol::McpStartupStatus::Starting,
1292 McpManagerStartupStatus::Ready => codewhale_protocol::McpStartupStatus::Ready,
1293 McpManagerStartupStatus::Failed { error } => {
1294 codewhale_protocol::McpStartupStatus::Failed { error }
1295 }
1296 McpManagerStartupStatus::Cancelled => {
1297 codewhale_protocol::McpStartupStatus::Cancelled
1298 }
1299 };
1300 self.hooks
1301 .emit(HookEvent::GenericEventFrame {
1302 frame: EventFrame::McpStartupUpdate {
1303 update: codewhale_protocol::McpStartupUpdateEvent {
1304 server_name: update.server_name,
1305 status,
1306 },
1307 },
1308 })
1309 .await;
1310 }
1311 self.hooks
1312 .emit(HookEvent::GenericEventFrame {
1313 frame: EventFrame::McpStartupComplete {
1314 summary: codewhale_protocol::McpStartupCompleteEvent {
1315 ready: summary.ready.clone(),
1316 failed: summary
1317 .failed
1318 .iter()
1319 .map(|f| codewhale_protocol::McpStartupFailure {
1320 server_name: f.server_name.clone(),
1321 error: f.error.clone(),
1322 })
1323 .collect(),
1324 cancelled: summary.cancelled.clone(),
1325 },
1326 },
1327 })
1328 .await;
1329 summary
1330 }
1331
1332 pub fn app_status(&self) -> AppResponse {
1334 let jobs = self.jobs.list();
1335 let events = jobs
1336 .iter()
1337 .flat_map(|job| {
1338 job.history.iter().map(|entry| EventFrame::ResponseDelta {
1339 response_id: job.id.clone(),
1340 delta: json!({
1341 "kind": "job_transition",
1342 "job_id": job.id.clone(),
1343 "phase": entry.phase.clone(),
1344 "status": job_status_to_str(entry.status),
1345 "progress": entry.progress,
1346 "detail": entry.detail.clone(),
1347 "retry": job_retry_to_value(&entry.retry),
1348 "at": entry.at
1349 })
1350 .to_string(),
1351 channel: ResponseChannel::Text,
1352 })
1353 })
1354 .collect::<Vec<_>>();
1355 AppResponse {
1356 ok: true,
1357 data: json!({
1358 "jobs": jobs.into_iter().map(|job| {
1359 json!({
1360 "id": job.id,
1361 "name": job.name,
1362 "status": job_status_to_str(job.status),
1363 "progress": job.progress,
1364 "detail": job.detail,
1365 "retry": job_retry_to_value(&job.retry),
1366 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1367 })
1368 }).collect::<Vec<_>>()
1369 }),
1370 events,
1371 }
1372 }
1373
1374 pub fn provider_default(&self) -> ProviderKind {
1376 self.config.provider
1377 }
1378
1379 pub fn save_thread_checkpoint(
1381 &self,
1382 thread_id: &str,
1383 checkpoint_id: &str,
1384 state: &Value,
1385 ) -> Result<()> {
1386 self.thread_manager
1387 .state_store()
1388 .save_checkpoint(thread_id, checkpoint_id, state)
1389 }
1390
1391 pub fn load_thread_checkpoint(
1393 &self,
1394 thread_id: &str,
1395 checkpoint_id: Option<&str>,
1396 ) -> Result<Option<Value>> {
1397 Ok(self
1398 .thread_manager
1399 .state_store()
1400 .load_checkpoint(thread_id, checkpoint_id)?
1401 .map(|checkpoint| checkpoint.state))
1402 }
1403
1404 pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1406 let job = self.jobs.enqueue(name);
1407 self.jobs
1408 .persist_job(self.thread_manager.state_store(), &job.id)?;
1409 Ok(job)
1410 }
1411
1412 pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1414 self.jobs.set_running(job_id);
1415 self.jobs
1416 .persist_job(self.thread_manager.state_store(), job_id)
1417 }
1418
1419 pub fn update_job_progress(
1421 &mut self,
1422 job_id: &str,
1423 progress: u8,
1424 detail: Option<String>,
1425 ) -> Result<()> {
1426 self.jobs.update_progress(job_id, progress, detail);
1427 self.jobs
1428 .persist_job(self.thread_manager.state_store(), job_id)
1429 }
1430
1431 pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1433 self.jobs.complete(job_id);
1434 self.jobs
1435 .persist_job(self.thread_manager.state_store(), job_id)
1436 }
1437
1438 pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1440 self.jobs.fail(job_id, detail);
1441 self.jobs
1442 .persist_job(self.thread_manager.state_store(), job_id)
1443 }
1444
1445 pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1447 self.jobs.cancel(job_id);
1448 self.jobs
1449 .persist_job(self.thread_manager.state_store(), job_id)
1450 }
1451
1452 pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1454 self.jobs.pause(job_id, detail);
1455 self.jobs
1456 .persist_job(self.thread_manager.state_store(), job_id)
1457 }
1458
1459 pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1461 self.jobs.resume(job_id, detail);
1462 self.jobs
1463 .persist_job(self.thread_manager.state_store(), job_id)
1464 }
1465
1466 pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
1468 self.jobs.history(job_id)
1469 }
1470}
1471
1472fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1473 ThreadResponse {
1474 thread_id: new.thread.id.clone(),
1475 status: status.to_string(),
1476 thread: Some(new.thread),
1477 threads: Vec::new(),
1478 model: Some(new.model),
1479 model_provider: Some(new.model_provider),
1480 cwd: Some(new.cwd),
1481 approval_policy: new.approval_policy,
1482 sandbox: new.sandbox,
1483 events: Vec::new(),
1484 data: json!({}),
1485 }
1486}
1487
1488fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1489 match initial_history {
1490 InitialHistory::New => "New conversation".to_string(),
1491 InitialHistory::Forked(items) => truncate_preview(
1492 &items
1493 .first()
1494 .map(Value::to_string)
1495 .unwrap_or_else(|| "Forked conversation".to_string()),
1496 ),
1497 InitialHistory::Resumed { history, .. } => truncate_preview(
1498 &history
1499 .first()
1500 .map(Value::to_string)
1501 .unwrap_or_else(|| "Resumed conversation".to_string()),
1502 ),
1503 }
1504}
1505
1506fn permission_path_for_call(call: &ToolCall) -> Option<String> {
1507 match &call.payload {
1508 ToolPayload::Function { arguments } => serde_json::from_str::<Value>(arguments)
1509 .ok()
1510 .and_then(|value| {
1511 value
1512 .get("path")
1513 .and_then(Value::as_str)
1514 .map(str::to_string)
1515 }),
1516 ToolPayload::Mcp { raw_arguments, .. } => raw_arguments
1517 .get("path")
1518 .and_then(Value::as_str)
1519 .map(str::to_string),
1520 ToolPayload::Custom { .. } | ToolPayload::LocalShell { .. } => None,
1521 }
1522}
1523
/// Returns at most the first 120 characters (Unicode scalar values) of `value`.
///
/// Strings of 120 characters or fewer are returned unchanged.
fn truncate_preview(value: &str) -> String {
    const PREVIEW_MAX_CHARS: usize = 120;

    // The byte offset of the first character past the limit is always a char boundary,
    // so slicing there cannot panic.
    match value.char_indices().nth(PREVIEW_MAX_CHARS) {
        Some((cutoff, _)) => value[..cutoff].to_string(),
        None => value.to_string(),
    }
}
1527
1528fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1529 Thread {
1530 id: thread.id,
1531 preview: thread.preview,
1532 ephemeral: thread.ephemeral,
1533 model_provider: thread.model_provider,
1534 created_at: thread.created_at,
1535 updated_at: thread.updated_at,
1536 status: match thread.status {
1537 PersistedThreadStatus::Running => ThreadStatus::Running,
1538 PersistedThreadStatus::Idle => ThreadStatus::Idle,
1539 PersistedThreadStatus::Completed => ThreadStatus::Completed,
1540 PersistedThreadStatus::Failed => ThreadStatus::Failed,
1541 PersistedThreadStatus::Paused => ThreadStatus::Paused,
1542 PersistedThreadStatus::Archived => ThreadStatus::Archived,
1543 },
1544 path: thread.path,
1545 cwd: thread.cwd,
1546 cli_version: thread.cli_version,
1547 source: match thread.source {
1548 SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
1549 SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
1550 SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
1551 SessionSource::Api => codewhale_protocol::SessionSource::Api,
1552 SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
1553 },
1554 name: thread.name,
1555 }
1556}
1557
1558fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1559 match status {
1560 ThreadStatus::Running => PersistedThreadStatus::Running,
1561 ThreadStatus::Idle => PersistedThreadStatus::Idle,
1562 ThreadStatus::Completed => PersistedThreadStatus::Completed,
1563 ThreadStatus::Failed => PersistedThreadStatus::Failed,
1564 ThreadStatus::Paused => PersistedThreadStatus::Paused,
1565 ThreadStatus::Archived => PersistedThreadStatus::Archived,
1566 }
1567}
1568
1569fn to_persisted_source(source: &codewhale_protocol::SessionSource) -> SessionSource {
1570 match source {
1571 codewhale_protocol::SessionSource::Interactive => SessionSource::Interactive,
1572 codewhale_protocol::SessionSource::Resume => SessionSource::Resume,
1573 codewhale_protocol::SessionSource::Fork => SessionSource::Fork,
1574 codewhale_protocol::SessionSource::Api => SessionSource::Api,
1575 codewhale_protocol::SessionSource::Unknown => SessionSource::Unknown,
1576 }
1577}
1578
1579fn approval_request_frame(
1580 requirement: &ExecApprovalRequirement,
1581 call_id: String,
1582 approval_id: String,
1583 turn_id: String,
1584 command: String,
1585 cwd: String,
1586) -> Option<EventFrame> {
1587 let ExecApprovalRequirement::NeedsApproval {
1588 reason,
1589 proposed_execpolicy_amendment,
1590 proposed_network_policy_amendments,
1591 } = requirement
1592 else {
1593 return None;
1594 };
1595
1596 let mut available_decisions = vec![
1597 ReviewDecision::Approved,
1598 ReviewDecision::ApprovedForSession,
1599 ReviewDecision::Denied,
1600 ReviewDecision::Abort,
1601 ];
1602 if proposed_execpolicy_amendment
1603 .as_ref()
1604 .is_some_and(|amendment| !amendment.prefixes.is_empty())
1605 {
1606 available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1607 }
1608 available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1609 |amendment| ReviewDecision::NetworkPolicyAmendment {
1610 host: amendment.host,
1611 action: amendment.action,
1612 },
1613 ));
1614
1615 Some(EventFrame::ExecApprovalRequest {
1616 request: ExecApprovalRequestEvent {
1617 call_id,
1618 approval_id,
1619 turn_id,
1620 command,
1621 cwd,
1622 reason: reason.clone(),
1623 network_approval_context: None,
1624 proposed_execpolicy_amendment: proposed_execpolicy_amendment
1625 .as_ref()
1626 .map(|amendment| amendment.prefixes.clone())
1627 .unwrap_or_default(),
1628 proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1629 additional_permissions: Vec::new(),
1630 available_decisions,
1631 },
1632 })
1633}
1634
1635fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1636 match requirement {
1637 ExecApprovalRequirement::Skip {
1638 bypass_sandbox,
1639 proposed_execpolicy_amendment,
1640 } => json!({
1641 "type": "skip",
1642 "bypass_sandbox": bypass_sandbox,
1643 "reason": requirement.reason(),
1644 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1645 .as_ref()
1646 .map(|amendment| amendment.prefixes.clone())
1647 .unwrap_or_default()
1648 }),
1649 ExecApprovalRequirement::NeedsApproval {
1650 reason,
1651 proposed_execpolicy_amendment,
1652 proposed_network_policy_amendments,
1653 } => json!({
1654 "type": "needs_approval",
1655 "reason": reason,
1656 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1657 .as_ref()
1658 .map(|amendment| amendment.prefixes.clone())
1659 .unwrap_or_default(),
1660 "proposed_network_policy_amendments": proposed_network_policy_amendments
1661 }),
1662 ExecApprovalRequirement::Forbidden { reason } => json!({
1663 "type": "forbidden",
1664 "reason": reason
1665 }),
1666 }
1667}
1668
1669fn policy_precheck_payload(
1670 decision: &ExecPolicyDecision,
1671 command: &str,
1672 cwd: &str,
1673 execution_kind: &str,
1674) -> Value {
1675 json!({
1676 "execution_kind": execution_kind,
1677 "command": command,
1678 "cwd": cwd,
1679 "allow": decision.allow,
1680 "requires_approval": decision.requires_approval,
1681 "matched_rule": decision.matched_rule.clone(),
1682 "phase": decision.requirement.phase(),
1683 "reason": decision.reason(),
1684 "requirement": approval_requirement_payload(&decision.requirement)
1685 })
1686}
1687
1688fn tool_payload_value(payload: &ToolPayload) -> Value {
1689 serde_json::to_value(payload).unwrap_or_else(
1690 |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1691 )
1692}
1693
1694fn tool_output_value(output: &codewhale_protocol::ToolOutput) -> Value {
1695 serde_json::to_value(output).unwrap_or_else(
1696 |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1697 )
1698}
1699
1700fn event_frame_payload(frame: &EventFrame) -> Value {
1701 serde_json::to_value(frame)
1702 .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1703}
1704
1705fn json_optional_string(value: &Value) -> Option<String> {
1706 if value.is_null() {
1707 None
1708 } else {
1709 value.as_str().map(ToString::to_string)
1710 }
1711}
1712
1713fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
1714 let Some(value) = value else {
1715 return JobRetryMetadata::default();
1716 };
1717 JobRetryMetadata {
1718 attempt: value
1719 .get("attempt")
1720 .and_then(Value::as_u64)
1721 .unwrap_or(0)
1722 .min(u32::MAX as u64) as u32,
1723 max_attempts: value
1724 .get("max_attempts")
1725 .and_then(Value::as_u64)
1726 .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
1727 .min(u32::MAX as u64) as u32,
1728 backoff_base_ms: value
1729 .get("backoff_base_ms")
1730 .and_then(Value::as_u64)
1731 .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
1732 next_backoff_ms: value
1733 .get("next_backoff_ms")
1734 .and_then(Value::as_u64)
1735 .unwrap_or(0),
1736 next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
1737 }
1738}
1739
1740fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
1741 let status = value
1742 .get("status")
1743 .and_then(Value::as_str)
1744 .and_then(job_status_from_str)?;
1745 Some(JobHistoryEntry {
1746 at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
1747 phase: value
1748 .get("phase")
1749 .and_then(Value::as_str)
1750 .unwrap_or("unknown")
1751 .to_string(),
1752 status,
1753 progress: value
1754 .get("progress")
1755 .and_then(Value::as_u64)
1756 .map(|v| v.min(u8::MAX as u64) as u8),
1757 detail: value.get("detail").and_then(json_optional_string),
1758 retry: parse_retry_metadata(value.get("retry")),
1759 })
1760}
1761
1762fn job_status_to_str(status: JobStatus) -> &'static str {
1763 match status {
1764 JobStatus::Queued => "queued",
1765 JobStatus::Running => "running",
1766 JobStatus::Paused => "paused",
1767 JobStatus::Completed => "completed",
1768 JobStatus::Failed => "failed",
1769 JobStatus::Cancelled => "cancelled",
1770 }
1771}
1772
1773fn job_status_from_str(value: &str) -> Option<JobStatus> {
1774 match value {
1775 "queued" => Some(JobStatus::Queued),
1776 "running" => Some(JobStatus::Running),
1777 "paused" => Some(JobStatus::Paused),
1778 "completed" => Some(JobStatus::Completed),
1779 "failed" => Some(JobStatus::Failed),
1780 "cancelled" => Some(JobStatus::Cancelled),
1781 _ => None,
1782 }
1783}
1784
1785fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
1786 json!({
1787 "attempt": retry.attempt,
1788 "max_attempts": retry.max_attempts,
1789 "backoff_base_ms": retry.backoff_base_ms,
1790 "next_backoff_ms": retry.next_backoff_ms,
1791 "next_retry_at": retry.next_retry_at
1792 })
1793}
1794
1795fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
1796 json!({
1797 "at": entry.at,
1798 "phase": entry.phase.clone(),
1799 "status": job_status_to_str(entry.status),
1800 "progress": entry.progress,
1801 "detail": entry.detail.clone(),
1802 "retry": job_retry_to_value(&entry.retry)
1803 })
1804}
1805
1806fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
1807 match status {
1808 JobStatus::Queued => JobStateStatus::Queued,
1809 JobStatus::Running => JobStateStatus::Running,
1810 JobStatus::Paused => JobStateStatus::Running,
1811 JobStatus::Completed => JobStateStatus::Completed,
1812 JobStatus::Failed => JobStateStatus::Failed,
1813 JobStatus::Cancelled => JobStateStatus::Cancelled,
1814 }
1815}
1816
1817fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
1818 match status {
1819 JobStateStatus::Queued => JobStatus::Queued,
1820 JobStateStatus::Running => JobStatus::Running,
1821 JobStateStatus::Completed => JobStatus::Completed,
1822 JobStateStatus::Failed => JobStatus::Failed,
1823 JobStateStatus::Cancelled => JobStatus::Cancelled,
1824 }
1825}
1826
/// Unit tests for the free helper functions in this file and for `JobManager`.
#[cfg(test)]
mod tests {
    use super::*;
    use codewhale_tools::ToolCallSource;

    // ---- permission_path_for_call -------------------------------------------------

    #[test]
    fn permission_path_for_call_extracts_function_path_argument() {
        let call = ToolCall {
            name: "read_file".to_string(),
            payload: ToolPayload::Function {
                arguments: json!({ "path": "README.md" }).to_string(),
            },
            source: ToolCallSource::Direct,
            raw_tool_call_id: None,
        };

        assert_eq!(
            permission_path_for_call(&call).as_deref(),
            Some("README.md")
        );
    }

    #[test]
    fn permission_path_for_call_extracts_mcp_path_argument() {
        let call = ToolCall {
            name: "mcp_fs_read".to_string(),
            payload: ToolPayload::Mcp {
                server: "fs".to_string(),
                tool: "read".to_string(),
                raw_arguments: json!({ "path": "secrets/token.txt" }),
                raw_tool_call_id: None,
            },
            source: ToolCallSource::Direct,
            raw_tool_call_id: None,
        };

        assert_eq!(
            permission_path_for_call(&call).as_deref(),
            Some("secrets/token.txt")
        );
    }

    #[test]
    fn permission_path_for_call_ignores_shell_payload() {
        let call = ToolCall {
            name: "exec_shell".to_string(),
            payload: ToolPayload::LocalShell {
                params: codewhale_protocol::LocalShellParams {
                    command: "cargo test".to_string(),
                    cwd: None,
                    timeout_ms: None,
                },
            },
            source: ToolCallSource::Direct,
            raw_tool_call_id: None,
        };

        assert_eq!(permission_path_for_call(&call), None);
    }

    // ---- JobManager state transitions ---------------------------------------------

    #[test]
    fn enqueue_creates_queued_job_with_zero_progress() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("build");
        assert_eq!(job.name, "build");
        assert_eq!(job.status, JobStatus::Queued);
        assert_eq!(job.progress, Some(0));
        assert!(job.detail.is_none());
        assert_eq!(job.history.len(), 1);
        assert_eq!(job.history[0].phase, "created");
    }

    #[test]
    fn set_running_transitions_from_queued() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("deploy");
        let id = job.id.clone();
        jm.set_running(&id);
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.status, JobStatus::Running);
        assert_eq!(updated.history.last().unwrap().phase, "running");
    }

    #[test]
    fn update_progress_clamps_to_100() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.update_progress(&id, 150, Some("over".to_string()));
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.progress, Some(100));
    }

    #[test]
    fn complete_sets_progress_to_100() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.complete(&id);
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.status, JobStatus::Completed);
        assert_eq!(updated.progress, Some(100));
    }

    #[test]
    fn fail_increments_attempt_and_sets_backoff() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("fragile");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.fail(&id, "crashed");
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.status, JobStatus::Failed);
        assert_eq!(updated.retry.attempt, 1);
        assert!(updated.retry.next_backoff_ms > 0);
        assert!(updated.retry.next_retry_at.is_some());
        assert_eq!(updated.detail.as_deref(), Some("crashed"));
    }

    #[test]
    fn fail_clears_retry_after_max_attempts() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("fragile");
        let id = job.id.clone();
        // One more failure than the attempt budget allows.
        for _ in 0..=DEFAULT_JOB_MAX_ATTEMPTS {
            jm.set_running(&id);
            jm.fail(&id, "boom");
        }
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.retry.attempt, DEFAULT_JOB_MAX_ATTEMPTS);
        assert_eq!(updated.retry.next_backoff_ms, 0);
        assert!(updated.retry.next_retry_at.is_none());
    }

    #[test]
    fn cancel_sets_status_and_clears_retry() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.cancel(&id);
        let jobs = jm.list();
        let updated = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(updated.status, JobStatus::Cancelled);
        assert_eq!(updated.retry.next_backoff_ms, 0);
    }

    #[test]
    fn pause_and_resume_round_trip() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.pause(&id, Some("waiting".to_string()));
        let jobs = jm.list();
        let paused = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(paused.status, JobStatus::Paused);
        assert_eq!(paused.detail.as_deref(), Some("waiting"));

        jm.resume(&id, None);
        let jobs = jm.list();
        let resumed = jobs.iter().find(|j| j.id == id).unwrap();
        assert_eq!(resumed.status, JobStatus::Running);
        assert_eq!(resumed.history.last().unwrap().phase, "resumed");
    }

    #[test]
    fn list_returns_jobs_sorted_by_updated_at_desc() {
        let mut jm = JobManager::default();
        jm.enqueue("first");
        jm.enqueue("second");
        jm.enqueue("third");
        let jobs = jm.list();
        assert_eq!(jobs.len(), 3);
        for window in jobs.windows(2) {
            assert!(window[0].updated_at >= window[1].updated_at);
        }
    }

    #[test]
    fn history_returns_entries_for_existing_job() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.complete(&id);
        let history = jm.history(&id);
        // One entry per transition: created, running, completed.
        assert_eq!(history.len(), 3);
        assert_eq!(history[0].phase, "created");
        assert_eq!(history[1].phase, "running");
        assert_eq!(history[2].phase, "completed");
    }

    #[test]
    fn history_returns_empty_for_unknown_job() {
        let jm = JobManager::default();
        assert!(jm.history("nonexistent").is_empty());
    }

    #[test]
    fn resume_pending_requeues_running_and_queued() {
        let mut jm = JobManager::default();
        let _j1 = jm.enqueue("queued_task");
        let j2 = jm.enqueue("running_task");
        let j3 = jm.enqueue("completed_task");
        let id2 = j2.id.clone();
        let id3 = j3.id.clone();
        jm.set_running(&id2);
        jm.set_running(&id3);
        jm.complete(&id3);

        // Only the queued and the running job are pending; the completed one is not.
        let resumed = jm.resume_pending();
        assert_eq!(resumed.len(), 2);
        for job in &resumed {
            assert_eq!(job.status, JobStatus::Queued);
        }
    }

    // ---- deterministic backoff ----------------------------------------------------

    #[test]
    fn deterministic_backoff_zero_on_first_attempt() {
        let retry = JobRetryMetadata {
            attempt: 0,
            ..Default::default()
        };
        assert_eq!(JobManager::deterministic_backoff_ms(&retry), 0);
    }

    #[test]
    fn deterministic_backoff_exponential_growth() {
        let base = DEFAULT_JOB_BACKOFF_BASE_MS;
        for attempt in 1..=5 {
            let retry = JobRetryMetadata {
                attempt,
                backoff_base_ms: base,
                ..Default::default()
            };
            let expected = base * 2u64.pow(attempt.saturating_sub(1).min(20));
            assert_eq!(
                JobManager::deterministic_backoff_ms(&retry),
                expected,
                "attempt {attempt}"
            );
        }
    }

    #[test]
    fn deterministic_backoff_saturates_at_high_exponent() {
        let retry = JobRetryMetadata {
            attempt: 63,
            backoff_base_ms: 1000,
            ..Default::default()
        };
        // Deliberately no assertion on the value: the test passes as long as the call
        // returns instead of panicking (e.g. on arithmetic overflow in a debug build).
        let _ = JobManager::deterministic_backoff_ms(&retry);
    }

    // ---- history bounds and persistence -------------------------------------------

    #[test]
    fn push_history_truncates_beyond_max() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        for i in 0..(MAX_JOB_HISTORY_ENTRIES + 20) {
            jm.update_progress(&id, (i % 100) as u8, Some(format!("step {i}")));
        }
        let history = jm.history(&id);
        assert_eq!(history.len(), MAX_JOB_HISTORY_ENTRIES);
    }

    #[test]
    fn encode_and_parse_persisted_detail_round_trip() {
        let mut jm = JobManager::default();
        let job = jm.enqueue("task");
        let id = job.id.clone();
        jm.set_running(&id);
        jm.fail(&id, "oops");
        let job = jm.list().into_iter().find(|j| j.id == id).unwrap();

        let encoded = JobManager::encode_persisted_detail(&job).unwrap().unwrap();
        let parsed = JobManager::parse_persisted_detail(Some(&encoded)).unwrap();

        assert_eq!(parsed.status, job.status);
        assert_eq!(parsed.detail, job.detail);
        assert_eq!(parsed.retry.attempt, job.retry.attempt);
        assert_eq!(parsed.history.len(), job.history.len());
    }

    #[test]
    fn parse_persisted_detail_returns_none_for_none_input() {
        assert!(JobManager::parse_persisted_detail(None).is_none());
    }

    #[test]
    fn parse_persisted_detail_returns_none_for_invalid_json() {
        assert!(JobManager::parse_persisted_detail(Some("not json")).is_none());
    }

    // ---- status string mapping ----------------------------------------------------

    #[test]
    fn job_status_round_trip_str() {
        let statuses = [
            JobStatus::Queued,
            JobStatus::Running,
            JobStatus::Paused,
            JobStatus::Completed,
            JobStatus::Failed,
            JobStatus::Cancelled,
        ];
        for status in &statuses {
            let s = job_status_to_str(*status);
            let parsed = job_status_from_str(s);
            assert_eq!(parsed, Some(*status), "round-trip failed for {s:?}");
        }
    }

    #[test]
    fn job_status_from_str_returns_none_for_unknown() {
        assert_eq!(job_status_from_str("unknown"), None);
        assert_eq!(job_status_from_str(""), None);
    }

    // ---- truncate_preview ---------------------------------------------------------

    #[test]
    fn truncate_preview_limits_to_120_chars() {
        let long = "a".repeat(200);
        let truncated = truncate_preview(&long);
        // The limit is expressed in characters, so count characters rather than bytes.
        assert_eq!(truncated.chars().count(), 120);
    }

    #[test]
    fn truncate_preview_counts_chars_not_bytes() {
        // U+00E9 occupies two bytes in UTF-8, so 120 characters are 240 bytes.
        let wide = "\u{e9}".repeat(200);
        let truncated = truncate_preview(&wide);
        assert_eq!(truncated.chars().count(), 120);
        assert_eq!(truncated.len(), 240);
    }

    #[test]
    fn truncate_preview_preserves_short_strings() {
        let short = "hello";
        assert_eq!(truncate_preview(short), "hello");
    }

    // ---- runtime <-> persisted job status -----------------------------------------

    #[test]
    fn runtime_status_to_job_state_maps_correctly() {
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Queued),
            JobStateStatus::Queued
        );
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Running),
            JobStateStatus::Running
        );
        // Paused is persisted as Running.
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Paused),
            JobStateStatus::Running
        );
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Completed),
            JobStateStatus::Completed
        );
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Failed),
            JobStateStatus::Failed
        );
        assert_eq!(
            runtime_status_to_job_state(JobStatus::Cancelled),
            JobStateStatus::Cancelled
        );
    }

    #[test]
    fn job_state_status_to_runtime_maps_correctly() {
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Queued),
            JobStatus::Queued
        );
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Running),
            JobStatus::Running
        );
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Completed),
            JobStatus::Completed
        );
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Failed),
            JobStatus::Failed
        );
        assert_eq!(
            job_state_status_to_runtime(JobStateStatus::Cancelled),
            JobStatus::Cancelled
        );
    }

    // ---- preview_from_initial_history ---------------------------------------------

    #[test]
    fn preview_from_initial_history_new() {
        let preview = preview_from_initial_history(&InitialHistory::New);
        assert_eq!(preview, "New conversation");
    }

    #[test]
    fn preview_from_initial_history_forked() {
        let preview = preview_from_initial_history(&InitialHistory::Forked(vec![json!("hello")]));
        assert!(preview.contains("hello"));
    }

    #[test]
    fn preview_from_initial_history_resumed() {
        let preview = preview_from_initial_history(&InitialHistory::Resumed {
            conversation_id: "test".to_string(),
            history: vec![json!("world")],
            rollout_path: PathBuf::from("/tmp/test"),
        });
        assert!(preview.contains("world"));
    }

    // ---- json_optional_string -----------------------------------------------------

    #[test]
    fn json_optional_string_handles_null() {
        assert!(json_optional_string(&Value::Null).is_none());
    }

    #[test]
    fn json_optional_string_handles_string() {
        assert_eq!(
            json_optional_string(&Value::String("hello".to_string())),
            Some("hello".to_string())
        );
    }

    #[test]
    fn json_optional_string_handles_non_string() {
        assert!(json_optional_string(&json!(42)).is_none());
    }

    // ---- parse_retry_metadata -----------------------------------------------------

    #[test]
    fn parse_retry_metadata_returns_default_for_none() {
        let retry = parse_retry_metadata(None);
        assert_eq!(retry.attempt, 0);
        assert_eq!(retry.max_attempts, DEFAULT_JOB_MAX_ATTEMPTS);
        assert_eq!(retry.backoff_base_ms, DEFAULT_JOB_BACKOFF_BASE_MS);
    }

    #[test]
    fn parse_retry_metadata_parses_fields() {
        let value = json!({
            "attempt": 2,
            "max_attempts": 5,
            "backoff_base_ms": 1000,
            "next_backoff_ms": 2000,
            "next_retry_at": 1234567890i64
        });
        let retry = parse_retry_metadata(Some(&value));
        assert_eq!(retry.attempt, 2);
        assert_eq!(retry.max_attempts, 5);
        assert_eq!(retry.backoff_base_ms, 1000);
        assert_eq!(retry.next_backoff_ms, 2000);
        assert_eq!(retry.next_retry_at, Some(1234567890));
    }

    #[test]
    fn parse_retry_metadata_saturates_oversized_counters() {
        // Values beyond u32::MAX must clamp instead of wrapping around.
        let value = json!({ "attempt": u64::MAX, "max_attempts": u64::MAX });
        let retry = parse_retry_metadata(Some(&value));
        assert_eq!(retry.attempt, u32::MAX);
        assert_eq!(retry.max_attempts, u32::MAX);
    }

    // ---- parse_history_entry ------------------------------------------------------

    #[test]
    fn parse_history_entry_returns_none_without_status() {
        let value = json!({"at": 1, "phase": "test"});
        assert!(parse_history_entry(&value).is_none());
    }

    #[test]
    fn parse_history_entry_rejects_unknown_status() {
        let value = json!({"at": 1, "phase": "test", "status": "bogus"});
        assert!(parse_history_entry(&value).is_none());
    }

    #[test]
    fn parse_history_entry_saturates_oversized_progress() {
        // Progress beyond u8::MAX must clamp instead of wrapping around.
        let value = json!({"status": "running", "progress": 1000});
        let entry = parse_history_entry(&value).unwrap();
        assert_eq!(entry.progress, Some(u8::MAX));
    }

    #[test]
    fn parse_history_entry_parses_valid_entry() {
        let value = json!({
            "at": 100,
            "phase": "running",
            "status": "running",
            "progress": 50,
            "detail": "working",
            "retry": {"attempt": 0, "max_attempts": 3, "backoff_base_ms": 500}
        });
        let entry = parse_history_entry(&value).unwrap();
        assert_eq!(entry.at, 100);
        assert_eq!(entry.phase, "running");
        assert_eq!(entry.status, JobStatus::Running);
        assert_eq!(entry.progress, Some(50));
        assert_eq!(entry.detail.as_deref(), Some("working"));
    }
}