1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use codewhale_agent::ModelRegistry;
7use codewhale_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use codewhale_execpolicy::{
9 AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10 ExecPolicyEngine,
11};
12use codewhale_hooks::{HookDispatcher, HookEvent};
13use codewhale_mcp::{
14 McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use codewhale_protocol::{
17 AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18 ResponseChannel, ReviewDecision, Thread, ThreadForkParams, ThreadGoal, ThreadGoalClearParams,
19 ThreadGoalGetParams, ThreadGoalSetParams, ThreadGoalStatus, ThreadListParams, ThreadReadParams,
20 ThreadRequest, ThreadResponse, ThreadResumeParams, ThreadSetNameParams, ThreadStatus,
21 ToolPayload,
22};
23use codewhale_state::{
24 JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadGoalRecord,
25 ThreadGoalStatus as PersistedThreadGoalStatus, ThreadListFilters, ThreadMetadata,
26 ThreadStatus as PersistedThreadStatus,
27};
28use codewhale_tools::{ToolCall, ToolRegistry};
29use serde_json::{Value, json};
30use uuid::Uuid;
31
32#[derive(Debug, Clone)]
34pub enum InitialHistory {
35 New,
37 Forked(Vec<Value>),
39 Resumed {
41 conversation_id: String,
42 history: Vec<Value>,
43 rollout_path: PathBuf,
44 },
45}
46
47#[derive(Debug, Clone)]
49pub struct NewThread {
50 pub thread: Thread,
52 pub model: String,
54 pub model_provider: String,
56 pub cwd: PathBuf,
58 pub approval_policy: Option<String>,
60 pub sandbox: Option<String>,
62}
63
/// Lifecycle state of a job tracked by the job manager.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Created and waiting to run.
    Queued,
    /// Currently executing.
    Running,
    /// Suspended; can be resumed.
    Paused,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
    /// Stopped on request.
    Cancelled,
}
80
/// Value written under `"schema_version"` in the encoded job detail blob.
const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
/// Default `max_attempts` for a job's retry metadata.
const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
/// Default base delay, in milliseconds, for exponential retry backoff.
const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
/// Upper bound on history entries kept per job; older entries are dropped first.
const MAX_JOB_HISTORY_ENTRIES: usize = 64;
85
/// Retry bookkeeping attached to a job.
#[derive(Debug, Clone)]
pub struct JobRetryMetadata {
    /// Counter incremented each time a failure schedules a retry.
    pub attempt: u32,
    /// Failures stop scheduling retries once `attempt` reaches this value.
    pub max_attempts: u32,
    /// Base delay, in milliseconds, for the exponential backoff.
    pub backoff_base_ms: u64,
    /// Delay, in milliseconds, before the next retry (0 when none is scheduled).
    pub next_backoff_ms: u64,
    /// Unix timestamp (seconds) of the next retry, if one is scheduled.
    pub next_retry_at: Option<i64>,
}
100
101impl Default for JobRetryMetadata {
102 fn default() -> Self {
103 Self {
104 attempt: 0,
105 max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
106 backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
107 next_backoff_ms: 0,
108 next_retry_at: None,
109 }
110 }
111}
112
113#[derive(Debug, Clone)]
115pub struct JobHistoryEntry {
116 pub at: i64,
118 pub phase: String,
120 pub status: JobStatus,
122 pub progress: Option<u8>,
124 pub detail: Option<String>,
126 pub retry: JobRetryMetadata,
128}
129
130#[derive(Debug, Clone)]
131struct PersistedJobDetail {
132 pub status: JobStatus,
133 pub detail: Option<String>,
134 pub retry: JobRetryMetadata,
135 pub history: Vec<JobHistoryEntry>,
136}
137
138#[derive(Debug, Clone)]
140pub struct JobRecord {
141 pub id: String,
143 pub name: String,
145 pub status: JobStatus,
147 pub progress: Option<u8>,
149 pub detail: Option<String>,
151 pub retry: JobRetryMetadata,
153 pub history: Vec<JobHistoryEntry>,
155 pub created_at: i64,
157 pub updated_at: i64,
159}
160
161#[derive(Debug, Default)]
163pub struct JobManager {
164 jobs: HashMap<String, JobRecord>,
165}
166
167impl JobManager {
168 fn now_ts() -> i64 {
169 chrono::Utc::now().timestamp()
170 }
171
172 fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
173 if retry.attempt == 0 {
174 return 0;
175 }
176 let exponent = retry.attempt.saturating_sub(1).min(20);
177 let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
178 retry.backoff_base_ms.saturating_mul(multiplier)
179 }
180
181 fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
182 retry.next_backoff_ms = 0;
183 retry.next_retry_at = None;
184 }
185
186 fn push_history(job: &mut JobRecord, phase: &str) {
187 job.history.push(JobHistoryEntry {
188 at: job.updated_at,
189 phase: phase.to_string(),
190 status: job.status,
191 progress: job.progress,
192 detail: job.detail.clone(),
193 retry: job.retry.clone(),
194 });
195 if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
196 let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
197 job.history.drain(0..to_drain);
198 }
199 }
200
201 fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
202 let raw = raw?;
203 let parsed: Value = serde_json::from_str(raw).ok()?;
204 let status = parsed
205 .get("status")
206 .and_then(Value::as_str)
207 .and_then(job_status_from_str)?;
208 let detail = parsed.get("detail").and_then(json_optional_string);
209 let retry = parse_retry_metadata(parsed.get("retry"));
210 let history = parsed
211 .get("history")
212 .and_then(Value::as_array)
213 .map(|items| {
214 items
215 .iter()
216 .filter_map(parse_history_entry)
217 .collect::<Vec<_>>()
218 })
219 .unwrap_or_default();
220 Some(PersistedJobDetail {
221 status,
222 detail,
223 retry,
224 history,
225 })
226 }
227
228 fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
229 let encoded = json!({
230 "schema_version": JOB_DETAIL_SCHEMA_VERSION,
231 "status": job_status_to_str(job.status),
232 "detail": job.detail.clone(),
233 "retry": job_retry_to_value(&job.retry),
234 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
235 })
236 .to_string();
237 Ok(Some(encoded))
238 }
239
240 pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
242 let now = Self::now_ts();
243 let id = format!("job-{}", Uuid::new_v4());
244 let mut job = JobRecord {
245 id: id.clone(),
246 name: name.into(),
247 status: JobStatus::Queued,
248 progress: Some(0),
249 detail: None,
250 retry: JobRetryMetadata::default(),
251 history: Vec::new(),
252 created_at: now,
253 updated_at: now,
254 };
255 Self::push_history(&mut job, "created");
256 self.jobs.insert(id, job.clone());
257 job
258 }
259
260 pub fn set_running(&mut self, id: &str) {
262 if let Some(job) = self.jobs.get_mut(id) {
263 job.status = JobStatus::Running;
264 Self::clear_retry_schedule(&mut job.retry);
265 job.updated_at = Self::now_ts();
266 Self::push_history(job, "running");
267 }
268 }
269
270 pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
272 if let Some(job) = self.jobs.get_mut(id) {
273 job.progress = Some(progress.min(100));
274 job.detail = detail;
275 job.updated_at = Self::now_ts();
276 Self::push_history(job, "progress_updated");
277 }
278 }
279
280 pub fn complete(&mut self, id: &str) {
282 if let Some(job) = self.jobs.get_mut(id) {
283 job.status = JobStatus::Completed;
284 job.progress = Some(100);
285 Self::clear_retry_schedule(&mut job.retry);
286 job.updated_at = Self::now_ts();
287 Self::push_history(job, "completed");
288 }
289 }
290
291 pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
293 if let Some(job) = self.jobs.get_mut(id) {
294 let now = Self::now_ts();
295 job.status = JobStatus::Failed;
296 job.detail = Some(detail.into());
297 if job.retry.attempt < job.retry.max_attempts {
298 job.retry.attempt += 1;
299 job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
300 let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
301 .min(i64::MAX as u64) as i64;
302 job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
303 } else {
304 Self::clear_retry_schedule(&mut job.retry);
305 }
306 job.updated_at = now;
307 Self::push_history(job, "failed");
308 }
309 }
310
311 pub fn cancel(&mut self, id: &str) {
313 if let Some(job) = self.jobs.get_mut(id) {
314 job.status = JobStatus::Cancelled;
315 Self::clear_retry_schedule(&mut job.retry);
316 job.updated_at = Self::now_ts();
317 Self::push_history(job, "cancelled");
318 }
319 }
320
321 pub fn pause(&mut self, id: &str, detail: Option<String>) {
323 if let Some(job) = self.jobs.get_mut(id) {
324 job.status = JobStatus::Paused;
325 if detail.is_some() {
326 job.detail = detail;
327 }
328 job.updated_at = Self::now_ts();
329 Self::push_history(job, "paused");
330 }
331 }
332
333 pub fn resume(&mut self, id: &str, detail: Option<String>) {
335 if let Some(job) = self.jobs.get_mut(id) {
336 job.status = JobStatus::Running;
337 if detail.is_some() {
338 job.detail = detail;
339 }
340 Self::clear_retry_schedule(&mut job.retry);
341 job.updated_at = Self::now_ts();
342 Self::push_history(job, "resumed");
343 }
344 }
345
346 pub fn list(&self) -> Vec<JobRecord> {
348 let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
349 out.sort_by_key(|job| std::cmp::Reverse(job.updated_at));
350 out
351 }
352
353 pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
355 self.jobs
356 .get(id)
357 .map(|job| job.history.clone())
358 .unwrap_or_default()
359 }
360
361 pub fn resume_pending(&mut self) -> Vec<JobRecord> {
363 let mut resumed = Vec::new();
364 for job in self.jobs.values_mut() {
365 if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
366 job.status = JobStatus::Queued;
367 job.updated_at = Self::now_ts();
368 Self::push_history(job, "queued_after_resume");
369 resumed.push(job.clone());
370 }
371 }
372 resumed
373 }
374
375 pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
377 let persisted = store.list_jobs(Some(500))?;
378 for job in persisted {
379 let fallback_status = job_state_status_to_runtime(job.status);
380 let parsed = Self::parse_persisted_detail(job.detail.as_deref());
381 let (status, detail, retry, history) = if let Some(detail_state) = parsed {
382 (
383 detail_state.status,
384 detail_state.detail,
385 detail_state.retry,
386 detail_state.history,
387 )
388 } else {
389 (
390 fallback_status,
391 job.detail,
392 JobRetryMetadata::default(),
393 Vec::new(),
394 )
395 };
396 self.jobs.insert(
397 job.id.clone(),
398 JobRecord {
399 id: job.id,
400 name: job.name,
401 status,
402 progress: job.progress,
403 detail,
404 retry,
405 history,
406 created_at: job.created_at,
407 updated_at: job.updated_at,
408 },
409 );
410 }
411 Ok(())
412 }
413
414 pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
416 let Some(job) = self.jobs.get(id) else {
417 return Ok(());
418 };
419 let encoded_detail = Self::encode_persisted_detail(job)?;
420 store.upsert_job(&JobStateRecord {
421 id: job.id.clone(),
422 name: job.name.clone(),
423 status: runtime_status_to_job_state(job.status),
424 progress: job.progress,
425 detail: encoded_detail,
426 created_at: job.created_at,
427 updated_at: job.updated_at,
428 })
429 }
430
431 pub fn persist_all(&self, store: &StateStore) -> Result<()> {
433 for id in self.jobs.keys() {
434 self.persist_job(store, id)?;
435 }
436 Ok(())
437 }
438}
439
440pub struct ThreadManager {
442 store: StateStore,
443 running_threads: HashMap<String, Thread>,
444 cli_version: String,
445}
446
447impl ThreadManager {
448 pub fn new(store: StateStore) -> Self {
450 Self {
451 store,
452 running_threads: HashMap::new(),
453 cli_version: env!("CARGO_PKG_VERSION").to_string(),
454 }
455 }
456
457 pub fn state_store(&self) -> &StateStore {
459 &self.store
460 }
461
462 pub fn spawn_thread_with_history(
464 &mut self,
465 model_provider: String,
466 cwd: PathBuf,
467 initial_history: InitialHistory,
468 persist_extended_history: bool,
469 ) -> Result<NewThread> {
470 let id = format!("thread-{}", Uuid::new_v4());
471 let now = chrono::Utc::now().timestamp();
472 let preview = preview_from_initial_history(&initial_history);
473 let source = match initial_history {
474 InitialHistory::New => SessionSource::Interactive,
475 InitialHistory::Forked(_) => SessionSource::Fork,
476 InitialHistory::Resumed { .. } => SessionSource::Resume,
477 };
478 let thread = Thread {
479 id: id.clone(),
480 preview,
481 ephemeral: !persist_extended_history,
482 model_provider: model_provider.clone(),
483 created_at: now,
484 updated_at: now,
485 status: ThreadStatus::Running,
486 path: None,
487 cwd: cwd.clone(),
488 cli_version: self.cli_version.clone(),
489 source: match source {
490 SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
491 SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
492 SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
493 SessionSource::Api => codewhale_protocol::SessionSource::Api,
494 SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
495 },
496 name: None,
497 };
498 self.persist_thread(&thread, None)?;
499 match &initial_history {
500 InitialHistory::Forked(items) => {
501 for item in items {
502 self.store.append_message(
503 &thread.id,
504 "history",
505 &item.to_string(),
506 Some(item.clone()),
507 )?;
508 }
509 }
510 InitialHistory::Resumed { history, .. } => {
511 for item in history {
512 self.store.append_message(
513 &thread.id,
514 "history",
515 &item.to_string(),
516 Some(item.clone()),
517 )?;
518 }
519 }
520 InitialHistory::New => {}
521 }
522 self.running_threads
523 .insert(thread.id.clone(), thread.clone());
524 Ok(NewThread {
525 thread,
526 model: "auto".to_string(),
527 model_provider,
528 cwd,
529 approval_policy: None,
530 sandbox: None,
531 })
532 }
533
534 pub fn resume_thread_with_history(
536 &mut self,
537 params: &ThreadResumeParams,
538 fallback_cwd: &Path,
539 model_provider: String,
540 ) -> Result<Option<NewThread>> {
541 if params.history.is_none()
542 && let Some(thread) = self.running_threads.get(¶ms.thread_id).cloned()
543 {
544 return Ok(Some(NewThread {
545 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
546 model_provider: params.model_provider.clone().unwrap_or(model_provider),
547 cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
548 approval_policy: params.approval_policy.clone(),
549 sandbox: params.sandbox.clone(),
550 thread,
551 }));
552 }
553
554 let persisted = self.store.get_thread(¶ms.thread_id)?;
555 let Some(metadata) = persisted else {
556 return Ok(None);
557 };
558 let mut thread = to_protocol_thread(metadata);
559 thread.status = ThreadStatus::Running;
560 thread.updated_at = chrono::Utc::now().timestamp();
561 thread.cwd = params
562 .cwd
563 .clone()
564 .unwrap_or_else(|| fallback_cwd.to_path_buf());
565 self.persist_thread(&thread, None)?;
566 self.running_threads
567 .insert(thread.id.clone(), thread.clone());
568 if let Some(history) = params.history.as_ref() {
569 for item in history {
570 self.store.append_message(
571 &thread.id,
572 "history",
573 &item.to_string(),
574 Some(item.clone()),
575 )?;
576 }
577 }
578
579 Ok(Some(NewThread {
580 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
581 model_provider: params.model_provider.clone().unwrap_or(model_provider),
582 cwd: thread.cwd.clone(),
583 approval_policy: params.approval_policy.clone(),
584 sandbox: params.sandbox.clone(),
585 thread,
586 }))
587 }
588
589 pub fn fork_thread(
591 &mut self,
592 params: &ThreadForkParams,
593 fallback_cwd: &Path,
594 ) -> Result<Option<NewThread>> {
595 let parent = self.store.get_thread(¶ms.thread_id)?;
596 let Some(parent) = parent else {
597 return Ok(None);
598 };
599 let parent_thread = to_protocol_thread(parent);
600 let new = self.spawn_thread_with_history(
601 params
602 .model_provider
603 .clone()
604 .unwrap_or_else(|| parent_thread.model_provider.clone()),
605 params
606 .cwd
607 .clone()
608 .unwrap_or_else(|| fallback_cwd.to_path_buf()),
609 InitialHistory::Forked(vec![json!({
610 "type": "fork",
611 "from_thread_id": parent_thread.id
612 })]),
613 params.persist_extended_history,
614 )?;
615 Ok(Some(new))
616 }
617
618 pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
620 let list = self.store.list_threads(ThreadListFilters {
621 include_archived: params.include_archived,
622 limit: params.limit,
623 })?;
624 Ok(list.into_iter().map(to_protocol_thread).collect())
625 }
626
627 pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
629 Ok(self
630 .store
631 .get_thread(¶ms.thread_id)?
632 .map(to_protocol_thread))
633 }
634
635 pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
637 let Some(mut metadata) = self.store.get_thread(¶ms.thread_id)? else {
638 return Ok(None);
639 };
640 metadata.name = Some(params.name.clone());
641 metadata.updated_at = chrono::Utc::now().timestamp();
642 self.store.upsert_thread(&metadata)?;
643 let updated = to_protocol_thread(metadata);
644 self.running_threads
645 .insert(updated.id.clone(), updated.clone());
646 Ok(Some(updated))
647 }
648
649 pub fn set_thread_goal(&mut self, params: &ThreadGoalSetParams) -> Result<Option<ThreadGoal>> {
651 if self.store.get_thread(¶ms.thread_id)?.is_none() {
652 return Ok(None);
653 }
654 let now = chrono::Utc::now().timestamp();
655 let goal = ThreadGoalRecord {
656 thread_id: params.thread_id.clone(),
657 goal_id: format!("goal-{}", Uuid::new_v4()),
658 objective: params.objective.clone(),
659 status: PersistedThreadGoalStatus::Active,
660 token_budget: params.token_budget,
661 tokens_used: 0,
662 time_used_seconds: 0,
663 created_at: now,
664 updated_at: now,
665 };
666 self.store.upsert_thread_goal(&goal)?;
667 Ok(Some(to_protocol_goal(goal)))
668 }
669
670 pub fn get_thread_goal(&self, params: &ThreadGoalGetParams) -> Result<Option<ThreadGoal>> {
672 Ok(self
673 .store
674 .get_thread_goal(¶ms.thread_id)?
675 .map(to_protocol_goal))
676 }
677
678 pub fn clear_thread_goal(&mut self, params: &ThreadGoalClearParams) -> Result<bool> {
680 self.store.delete_thread_goal(¶ms.thread_id)
681 }
682
683 pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
685 self.store.mark_archived(thread_id)?;
686 if let Some(thread) = self.running_threads.get_mut(thread_id) {
687 thread.status = ThreadStatus::Archived;
688 }
689 Ok(())
690 }
691
692 pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
694 self.store.mark_unarchived(thread_id)?;
695 Ok(())
696 }
697
698 pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
700 let Some(mut metadata) = self.store.get_thread(thread_id)? else {
701 return Ok(());
702 };
703 metadata.updated_at = chrono::Utc::now().timestamp();
704 metadata.preview = truncate_preview(input);
705 metadata.status = PersistedThreadStatus::Running;
706 self.store.upsert_thread(&metadata)?;
707 if let Some(thread) = self.running_threads.get_mut(thread_id) {
708 thread.updated_at = metadata.updated_at;
709 thread.preview = metadata.preview;
710 thread.status = ThreadStatus::Running;
711 }
712 let message_id = self.store.append_message(thread_id, "user", input, None)?;
713 self.store.save_checkpoint(
714 thread_id,
715 "latest",
716 &json!({
717 "reason": "thread_message",
718 "message_id": message_id,
719 "role": "user",
720 "preview": truncate_preview(input),
721 "updated_at": metadata.updated_at
722 }),
723 )?;
724 Ok(())
725 }
726
727 fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
728 self.store.upsert_thread(&ThreadMetadata {
729 id: thread.id.clone(),
730 rollout_path,
731 preview: thread.preview.clone(),
732 ephemeral: thread.ephemeral,
733 model_provider: thread.model_provider.clone(),
734 created_at: thread.created_at,
735 updated_at: thread.updated_at,
736 status: to_persisted_status(&thread.status),
737 path: thread.path.clone(),
738 cwd: thread.cwd.clone(),
739 cli_version: thread.cli_version.clone(),
740 source: to_persisted_source(&thread.source),
741 name: thread.name.clone(),
742 sandbox_policy: None,
743 approval_mode: None,
744 archived: matches!(thread.status, ThreadStatus::Archived),
745 archived_at: None,
746 git_sha: None,
747 git_branch: None,
748 git_origin_url: None,
749 memory_mode: None,
750 current_leaf_id: None,
751 })
752 }
753}
754
755pub struct Runtime {
757 pub config: ConfigToml,
759 pub model_registry: ModelRegistry,
761 pub thread_manager: ThreadManager,
763 pub tool_registry: Arc<ToolRegistry>,
765 pub mcp_manager: Arc<McpManager>,
767 pub exec_policy: ExecPolicyEngine,
769 pub hooks: HookDispatcher,
771 pub jobs: JobManager,
773}
774
775impl Runtime {
776 pub fn new(
778 config: ConfigToml,
779 model_registry: ModelRegistry,
780 state: StateStore,
781 tool_registry: Arc<ToolRegistry>,
782 mcp_manager: Arc<McpManager>,
783 exec_policy: ExecPolicyEngine,
784 hooks: HookDispatcher,
785 ) -> Self {
786 let mut jobs = JobManager::default();
787 if let Err(e) = jobs.load_from_store(&state) {
788 tracing::warn!("Failed to load job store, starting with empty job list: {e}");
789 }
790 Self {
791 config,
792 model_registry,
793 thread_manager: ThreadManager::new(state),
794 tool_registry,
795 mcp_manager,
796 exec_policy,
797 hooks,
798 jobs,
799 }
800 }
801
802 fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
803 let history = self
804 .thread_manager
805 .state_store()
806 .list_messages(thread_id, Some(500))?
807 .into_iter()
808 .map(|message| {
809 json!({
810 "id": message.id,
811 "role": message.role,
812 "content": message.content,
813 "item": message.item,
814 "created_at": message.created_at
815 })
816 })
817 .collect::<Vec<_>>();
818
819 let checkpoint = self
820 .thread_manager
821 .state_store()
822 .load_checkpoint(thread_id, None)?
823 .map(|record| {
824 json!({
825 "checkpoint_id": record.checkpoint_id,
826 "state": record.state,
827 "created_at": record.created_at
828 })
829 });
830
831 let goal = self
832 .thread_manager
833 .state_store()
834 .get_thread_goal(thread_id)?
835 .map(to_protocol_goal);
836
837 Ok(json!({
838 "history": history,
839 "checkpoint": checkpoint,
840 "goal": goal
841 }))
842 }
843
844 fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
845 self.thread_manager.state_store().save_checkpoint(
846 thread_id,
847 "latest",
848 &json!({
849 "reason": reason,
850 "saved_at": chrono::Utc::now().timestamp(),
851 "state": state
852 }),
853 )
854 }
855
856 pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
858 match req {
859 ThreadRequest::Create { .. } => {
860 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
861 let new = self.thread_manager.spawn_thread_with_history(
862 "deepseek".to_string(),
863 cwd,
864 InitialHistory::New,
865 false,
866 )?;
867 let mut response = thread_response_from_new("created", new);
868 response.data = self.persisted_thread_data(&response.thread_id)?;
869 Ok(response)
870 }
871 ThreadRequest::Start(params) => {
872 let cwd = params.cwd.clone().unwrap_or_else(|| {
873 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
874 });
875 let new = self.thread_manager.spawn_thread_with_history(
876 params
877 .model_provider
878 .clone()
879 .unwrap_or_else(|| "deepseek".to_string()),
880 cwd,
881 InitialHistory::New,
882 params.persist_extended_history,
883 )?;
884 let mut response = thread_response_from_new("started", new);
885 response.data = self.persisted_thread_data(&response.thread_id)?;
886 Ok(response)
887 }
888 ThreadRequest::Resume(params) => {
889 let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
890 if let Some(new) = self.thread_manager.resume_thread_with_history(
891 ¶ms,
892 &fallback_cwd,
893 "deepseek".to_string(),
894 )? {
895 let mut response = thread_response_from_new("resumed", new);
896 response.data = self.persisted_thread_data(&response.thread_id)?;
897 Ok(response)
898 } else {
899 Ok(ThreadResponse {
900 thread_id: params.thread_id,
901 status: "missing".to_string(),
902 thread: None,
903 threads: Vec::new(),
904 goal: None,
905 model: None,
906 model_provider: None,
907 cwd: None,
908 approval_policy: params.approval_policy,
909 sandbox: params.sandbox,
910 events: Vec::new(),
911 data: json!({"error":"thread not found"}),
912 })
913 }
914 }
915 ThreadRequest::Fork(params) => {
916 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
917 if let Some(new) = self.thread_manager.fork_thread(¶ms, &cwd)? {
918 let mut response = thread_response_from_new("forked", new);
919 response.data = self.persisted_thread_data(&response.thread_id)?;
920 Ok(response)
921 } else {
922 Ok(ThreadResponse {
923 thread_id: params.thread_id,
924 status: "missing".to_string(),
925 thread: None,
926 threads: Vec::new(),
927 goal: None,
928 model: None,
929 model_provider: None,
930 cwd: None,
931 approval_policy: params.approval_policy,
932 sandbox: params.sandbox,
933 events: Vec::new(),
934 data: json!({"error":"thread not found"}),
935 })
936 }
937 }
938 ThreadRequest::List(params) => Ok(ThreadResponse {
939 thread_id: "list".to_string(),
940 status: "ok".to_string(),
941 thread: None,
942 threads: self.thread_manager.list_threads(¶ms)?,
943 goal: None,
944 model: None,
945 model_provider: None,
946 cwd: None,
947 approval_policy: None,
948 sandbox: None,
949 events: Vec::new(),
950 data: json!({}),
951 }),
952 ThreadRequest::Read(params) => {
953 let id = params.thread_id.clone();
954 let data = self.persisted_thread_data(&id)?;
955 Ok(ThreadResponse {
956 thread_id: id,
957 status: "ok".to_string(),
958 thread: self.thread_manager.read_thread(¶ms)?,
959 threads: Vec::new(),
960 goal: self.thread_manager.get_thread_goal(&ThreadGoalGetParams {
961 thread_id: params.thread_id,
962 })?,
963 model: None,
964 model_provider: None,
965 cwd: None,
966 approval_policy: None,
967 sandbox: None,
968 events: Vec::new(),
969 data,
970 })
971 }
972 ThreadRequest::SetName(params) => Ok(ThreadResponse {
973 thread_id: params.thread_id.clone(),
974 status: "ok".to_string(),
975 thread: self.thread_manager.set_thread_name(¶ms)?,
976 threads: Vec::new(),
977 goal: None,
978 model: None,
979 model_provider: None,
980 cwd: None,
981 approval_policy: None,
982 sandbox: None,
983 events: Vec::new(),
984 data: json!({}),
985 }),
986 ThreadRequest::GoalSet(params) => {
987 let thread_id = params.thread_id.clone();
988 if let Some(goal) = self.thread_manager.set_thread_goal(¶ms)? {
989 Ok(ThreadResponse {
990 thread_id,
991 status: "ok".to_string(),
992 thread: None,
993 threads: Vec::new(),
994 goal: Some(goal.clone()),
995 model: None,
996 model_provider: None,
997 cwd: None,
998 approval_policy: None,
999 sandbox: None,
1000 events: vec![EventFrame::ThreadGoalUpdated { goal: goal.clone() }],
1001 data: json!({ "goal": goal }),
1002 })
1003 } else {
1004 Ok(ThreadResponse {
1005 thread_id,
1006 status: "missing".to_string(),
1007 thread: None,
1008 threads: Vec::new(),
1009 goal: None,
1010 model: None,
1011 model_provider: None,
1012 cwd: None,
1013 approval_policy: None,
1014 sandbox: None,
1015 events: Vec::new(),
1016 data: json!({"error":"thread not found"}),
1017 })
1018 }
1019 }
1020 ThreadRequest::GoalGet(params) => {
1021 let goal = self.thread_manager.get_thread_goal(¶ms)?;
1022 Ok(ThreadResponse {
1023 thread_id: params.thread_id,
1024 status: "ok".to_string(),
1025 thread: None,
1026 threads: Vec::new(),
1027 goal: goal.clone(),
1028 model: None,
1029 model_provider: None,
1030 cwd: None,
1031 approval_policy: None,
1032 sandbox: None,
1033 events: Vec::new(),
1034 data: json!({ "goal": goal }),
1035 })
1036 }
1037 ThreadRequest::GoalClear(params) => {
1038 let thread_id = params.thread_id.clone();
1039 let cleared = self.thread_manager.clear_thread_goal(¶ms)?;
1040 Ok(ThreadResponse {
1041 thread_id: thread_id.clone(),
1042 status: if cleared { "cleared" } else { "empty" }.to_string(),
1043 thread: None,
1044 threads: Vec::new(),
1045 goal: None,
1046 model: None,
1047 model_provider: None,
1048 cwd: None,
1049 approval_policy: None,
1050 sandbox: None,
1051 events: if cleared {
1052 vec![EventFrame::ThreadGoalCleared { thread_id }]
1053 } else {
1054 Vec::new()
1055 },
1056 data: json!({ "cleared": cleared }),
1057 })
1058 }
1059 ThreadRequest::Archive { thread_id } => {
1060 self.thread_manager.archive_thread(&thread_id)?;
1061 Ok(ThreadResponse {
1062 thread_id,
1063 status: "archived".to_string(),
1064 thread: None,
1065 threads: Vec::new(),
1066 goal: None,
1067 model: None,
1068 model_provider: None,
1069 cwd: None,
1070 approval_policy: None,
1071 sandbox: None,
1072 events: Vec::new(),
1073 data: json!({}),
1074 })
1075 }
1076 ThreadRequest::Unarchive { thread_id } => {
1077 self.thread_manager.unarchive_thread(&thread_id)?;
1078 Ok(ThreadResponse {
1079 thread_id,
1080 status: "unarchived".to_string(),
1081 thread: None,
1082 threads: Vec::new(),
1083 goal: None,
1084 model: None,
1085 model_provider: None,
1086 cwd: None,
1087 approval_policy: None,
1088 sandbox: None,
1089 events: Vec::new(),
1090 data: json!({}),
1091 })
1092 }
1093 ThreadRequest::Message { thread_id, input } => {
1094 self.thread_manager.touch_message(&thread_id, &input)?;
1095 let response_id = format!("{thread_id}:{}", input.len());
1096 self.hooks
1097 .emit(HookEvent::ResponseStart {
1098 response_id: response_id.clone(),
1099 })
1100 .await;
1101 self.hooks
1102 .emit(HookEvent::ResponseEnd {
1103 response_id: response_id.clone(),
1104 })
1105 .await;
1106
1107 Ok(ThreadResponse {
1108 thread_id,
1109 status: "accepted".to_string(),
1110 thread: None,
1111 threads: Vec::new(),
1112 goal: None,
1113 model: None,
1114 model_provider: None,
1115 cwd: None,
1116 approval_policy: None,
1117 sandbox: None,
1118 events: vec![
1119 EventFrame::ResponseStart {
1120 response_id: response_id.clone(),
1121 },
1122 EventFrame::ResponseDelta {
1123 response_id: response_id.clone(),
1124 delta: "queued".to_string(),
1125 channel: ResponseChannel::Text,
1126 },
1127 EventFrame::ResponseEnd { response_id },
1128 ],
1129 data: json!({}),
1130 })
1131 }
1132 }
1133 }
1134
1135 pub async fn handle_prompt(
1137 &mut self,
1138 req: PromptRequest,
1139 cli_overrides: &CliRuntimeOverrides,
1140 ) -> Result<PromptResponse> {
1141 let resolved = self.config.resolve_runtime_options(cli_overrides);
1142 let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
1143 let selection = self
1144 .model_registry
1145 .resolve(Some(&requested_model), Some(resolved.provider));
1146 let resolved_model = selection.resolved.id.clone();
1147 let response_id = format!("resp-{}", Uuid::new_v4());
1148
1149 self.hooks
1150 .emit(HookEvent::ResponseStart {
1151 response_id: response_id.clone(),
1152 })
1153 .await;
1154 self.hooks
1155 .emit(HookEvent::ResponseDelta {
1156 response_id: response_id.clone(),
1157 delta: "model-selected".to_string(),
1158 })
1159 .await;
1160 self.hooks
1161 .emit(HookEvent::ResponseEnd {
1162 response_id: response_id.clone(),
1163 })
1164 .await;
1165
1166 let payload = json!({
1167 "provider": resolved.provider.as_str(),
1168 "model": resolved_model.clone(),
1169 "prompt": req.prompt,
1170 "telemetry": resolved.telemetry,
1171 "base_url": resolved.base_url,
1172 "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
1173 "approval_policy": resolved.approval_policy,
1174 "sandbox_mode": resolved.sandbox_mode
1175 });
1176 if let Some(thread_id) = req.thread_id.as_ref() {
1177 self.thread_manager.touch_message(thread_id, &req.prompt)?;
1178 let assistant_message_id = self.thread_manager.store.append_message(
1179 thread_id,
1180 "assistant",
1181 &payload.to_string(),
1182 Some(payload.clone()),
1183 )?;
1184 self.persist_latest_checkpoint(
1185 thread_id,
1186 "prompt_response",
1187 json!({
1188 "response_id": response_id.clone(),
1189 "model": resolved_model.clone(),
1190 "provider": resolved.provider.as_str(),
1191 "assistant_message_id": assistant_message_id
1192 }),
1193 )?;
1194 }
1195
1196 Ok(PromptResponse {
1197 output: payload.to_string(),
1198 model: resolved_model,
1199 events: vec![
1200 EventFrame::ResponseStart {
1201 response_id: response_id.clone(),
1202 },
1203 EventFrame::ResponseDelta {
1204 response_id: response_id.clone(),
1205 delta: "model-selected".to_string(),
1206 channel: ResponseChannel::Text,
1207 },
1208 EventFrame::ResponseEnd { response_id },
1209 ],
1210 })
1211 }
1212
1213 pub async fn invoke_tool(
1215 &self,
1216 call: ToolCall,
1217 approval_mode: AskForApproval,
1218 cwd: &Path,
1219 ) -> Result<Value> {
1220 let fallback_cwd = cwd.display().to_string();
1221 let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
1222 let policy_tool = match &call.payload {
1223 ToolPayload::LocalShell { .. } => "exec_shell",
1224 _ => call.name.as_str(),
1225 };
1226 let policy_path = permission_path_for_call(&call);
1227 let decision = self.exec_policy.check(ExecPolicyContext {
1228 command: &command,
1229 cwd: &policy_cwd,
1230 tool: Some(policy_tool),
1231 path: policy_path.as_deref(),
1232 ask_for_approval: approval_mode,
1233 sandbox_mode: None,
1234 })?;
1235 let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
1236 let response_id = format!("tool-{}", Uuid::new_v4());
1237 let call_id = call
1238 .raw_tool_call_id
1239 .clone()
1240 .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
1241 self.hooks
1242 .emit(HookEvent::ToolLifecycle {
1243 response_id: response_id.clone(),
1244 tool_name: call.name.clone(),
1245 phase: "precheck".to_string(),
1246 payload: precheck.clone(),
1247 })
1248 .await;
1249
1250 if !decision.allow {
1251 let reason = decision.reason().to_string();
1252 let approval_id = format!("approval-{}", Uuid::new_v4());
1253 let error_frame = EventFrame::Error {
1254 response_id: response_id.clone(),
1255 message: reason.clone(),
1256 };
1257 self.hooks
1258 .emit(HookEvent::ApprovalLifecycle {
1259 approval_id,
1260 phase: "denied".to_string(),
1261 reason: Some(reason.clone()),
1262 })
1263 .await;
1264 self.hooks
1265 .emit(HookEvent::GenericEventFrame {
1266 frame: Box::new(error_frame.clone()),
1267 })
1268 .await;
1269 return Ok(json!({
1270 "ok": false,
1271 "status": "denied",
1272 "execution_kind": execution_kind,
1273 "response_id": response_id,
1274 "precheck": precheck,
1275 "error": reason,
1276 "events": [event_frame_payload(&error_frame)],
1277 }));
1278 }
1279
1280 if decision.requires_approval {
1281 let approval_id = format!("approval-{}", Uuid::new_v4());
1282 let reason = decision.reason().to_string();
1283 let maybe_approval_frame = approval_request_frame(
1284 &decision.requirement,
1285 decision.matched_rule.as_deref(),
1286 call_id,
1287 approval_id.clone(),
1288 response_id.clone(),
1289 command.clone(),
1290 policy_cwd.clone(),
1291 );
1292 self.hooks
1293 .emit(HookEvent::ApprovalLifecycle {
1294 approval_id: approval_id.clone(),
1295 phase: "requested".to_string(),
1296 reason: Some(reason.clone()),
1297 })
1298 .await;
1299 let mut events = Vec::new();
1300 if let Some(frame) = maybe_approval_frame {
1301 self.hooks
1302 .emit(HookEvent::GenericEventFrame {
1303 frame: Box::new(frame.clone()),
1304 })
1305 .await;
1306 events.push(event_frame_payload(&frame));
1307 }
1308 return Ok(json!({
1309 "ok": false,
1310 "status": "approval_required",
1311 "execution_kind": execution_kind,
1312 "response_id": response_id,
1313 "approval_id": approval_id,
1314 "precheck": precheck,
1315 "error": reason,
1316 "events": events,
1317 }));
1318 }
1319
1320 let start_frame = EventFrame::ToolCallStart {
1321 response_id: response_id.clone(),
1322 tool_name: call.name.clone(),
1323 arguments: tool_payload_value(&call.payload),
1324 };
1325 self.hooks
1326 .emit(HookEvent::GenericEventFrame {
1327 frame: Box::new(start_frame.clone()),
1328 })
1329 .await;
1330 self.hooks
1331 .emit(HookEvent::ToolLifecycle {
1332 response_id: response_id.clone(),
1333 tool_name: call.name.clone(),
1334 phase: "dispatching".to_string(),
1335 payload: json!({
1336 "call_id": call_id,
1337 "execution_kind": execution_kind
1338 }),
1339 })
1340 .await;
1341
1342 match self.tool_registry.dispatch(call.clone(), true).await {
1343 Ok(tool_output) => {
1344 let result_frame = EventFrame::ToolCallResult {
1345 response_id: response_id.clone(),
1346 tool_name: call.name.clone(),
1347 output: tool_output_value(&tool_output),
1348 };
1349 self.hooks
1350 .emit(HookEvent::GenericEventFrame {
1351 frame: Box::new(result_frame.clone()),
1352 })
1353 .await;
1354 self.hooks
1355 .emit(HookEvent::ToolLifecycle {
1356 response_id: response_id.clone(),
1357 tool_name: call.name,
1358 phase: "completed".to_string(),
1359 payload: json!({ "ok": true }),
1360 })
1361 .await;
1362 Ok(json!({
1363 "ok": true,
1364 "status": "completed",
1365 "execution_kind": execution_kind,
1366 "response_id": response_id,
1367 "precheck": precheck,
1368 "output": tool_output,
1369 "events": [
1370 event_frame_payload(&start_frame),
1371 event_frame_payload(&result_frame)
1372 ]
1373 }))
1374 }
1375 Err(err) => {
1376 let message = format!("{err:?}");
1377 let error_frame = EventFrame::Error {
1378 response_id: response_id.clone(),
1379 message: message.clone(),
1380 };
1381 self.hooks
1382 .emit(HookEvent::GenericEventFrame {
1383 frame: Box::new(error_frame.clone()),
1384 })
1385 .await;
1386 self.hooks
1387 .emit(HookEvent::ToolLifecycle {
1388 response_id: response_id.clone(),
1389 tool_name: call.name,
1390 phase: "failed".to_string(),
1391 payload: json!({ "error": message.clone() }),
1392 })
1393 .await;
1394 Ok(json!({
1395 "ok": false,
1396 "status": "failed",
1397 "execution_kind": execution_kind,
1398 "response_id": response_id,
1399 "precheck": precheck,
1400 "error": message,
1401 "events": [
1402 event_frame_payload(&start_frame),
1403 event_frame_payload(&error_frame)
1404 ]
1405 }))
1406 }
1407 }
1408 }
1409
1410 pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1412 let mut updates = Vec::new();
1413 let summary = self.mcp_manager.start_all(|update| {
1414 updates.push(update);
1415 });
1416 for update in updates {
1417 let status = match update.status {
1418 McpManagerStartupStatus::Starting => codewhale_protocol::McpStartupStatus::Starting,
1419 McpManagerStartupStatus::Ready => codewhale_protocol::McpStartupStatus::Ready,
1420 McpManagerStartupStatus::Failed { error } => {
1421 codewhale_protocol::McpStartupStatus::Failed { error }
1422 }
1423 McpManagerStartupStatus::Cancelled => {
1424 codewhale_protocol::McpStartupStatus::Cancelled
1425 }
1426 };
1427 self.hooks
1428 .emit(HookEvent::GenericEventFrame {
1429 frame: Box::new(EventFrame::McpStartupUpdate {
1430 update: codewhale_protocol::McpStartupUpdateEvent {
1431 server_name: update.server_name,
1432 status,
1433 },
1434 }),
1435 })
1436 .await;
1437 }
1438 self.hooks
1439 .emit(HookEvent::GenericEventFrame {
1440 frame: Box::new(EventFrame::McpStartupComplete {
1441 summary: codewhale_protocol::McpStartupCompleteEvent {
1442 ready: summary.ready.clone(),
1443 failed: summary
1444 .failed
1445 .iter()
1446 .map(|f| codewhale_protocol::McpStartupFailure {
1447 server_name: f.server_name.clone(),
1448 error: f.error.clone(),
1449 })
1450 .collect(),
1451 cancelled: summary.cancelled.clone(),
1452 },
1453 }),
1454 })
1455 .await;
1456 summary
1457 }
1458
1459 pub fn app_status(&self) -> AppResponse {
1461 let jobs = self.jobs.list();
1462 let events = jobs
1463 .iter()
1464 .flat_map(|job| {
1465 job.history.iter().map(|entry| EventFrame::ResponseDelta {
1466 response_id: job.id.clone(),
1467 delta: json!({
1468 "kind": "job_transition",
1469 "job_id": job.id.clone(),
1470 "phase": entry.phase.clone(),
1471 "status": job_status_to_str(entry.status),
1472 "progress": entry.progress,
1473 "detail": entry.detail.clone(),
1474 "retry": job_retry_to_value(&entry.retry),
1475 "at": entry.at
1476 })
1477 .to_string(),
1478 channel: ResponseChannel::Text,
1479 })
1480 })
1481 .collect::<Vec<_>>();
1482 AppResponse {
1483 ok: true,
1484 data: json!({
1485 "jobs": jobs.into_iter().map(|job| {
1486 json!({
1487 "id": job.id,
1488 "name": job.name,
1489 "status": job_status_to_str(job.status),
1490 "progress": job.progress,
1491 "detail": job.detail,
1492 "retry": job_retry_to_value(&job.retry),
1493 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1494 })
1495 }).collect::<Vec<_>>()
1496 }),
1497 events,
1498 }
1499 }
1500
    /// Returns the provider kind stored in this runtime's configuration.
    pub fn provider_default(&self) -> ProviderKind {
        self.config.provider
    }
1505
1506 pub fn save_thread_checkpoint(
1508 &self,
1509 thread_id: &str,
1510 checkpoint_id: &str,
1511 state: &Value,
1512 ) -> Result<()> {
1513 self.thread_manager
1514 .state_store()
1515 .save_checkpoint(thread_id, checkpoint_id, state)
1516 }
1517
1518 pub fn load_thread_checkpoint(
1520 &self,
1521 thread_id: &str,
1522 checkpoint_id: Option<&str>,
1523 ) -> Result<Option<Value>> {
1524 Ok(self
1525 .thread_manager
1526 .state_store()
1527 .load_checkpoint(thread_id, checkpoint_id)?
1528 .map(|checkpoint| checkpoint.state))
1529 }
1530
1531 pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1533 let job = self.jobs.enqueue(name);
1534 self.jobs
1535 .persist_job(self.thread_manager.state_store(), &job.id)?;
1536 Ok(job)
1537 }
1538
1539 pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1541 self.jobs.set_running(job_id);
1542 self.jobs
1543 .persist_job(self.thread_manager.state_store(), job_id)
1544 }
1545
1546 pub fn update_job_progress(
1548 &mut self,
1549 job_id: &str,
1550 progress: u8,
1551 detail: Option<String>,
1552 ) -> Result<()> {
1553 self.jobs.update_progress(job_id, progress, detail);
1554 self.jobs
1555 .persist_job(self.thread_manager.state_store(), job_id)
1556 }
1557
1558 pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1560 self.jobs.complete(job_id);
1561 self.jobs
1562 .persist_job(self.thread_manager.state_store(), job_id)
1563 }
1564
1565 pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1567 self.jobs.fail(job_id, detail);
1568 self.jobs
1569 .persist_job(self.thread_manager.state_store(), job_id)
1570 }
1571
1572 pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1574 self.jobs.cancel(job_id);
1575 self.jobs
1576 .persist_job(self.thread_manager.state_store(), job_id)
1577 }
1578
1579 pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1581 self.jobs.pause(job_id, detail);
1582 self.jobs
1583 .persist_job(self.thread_manager.state_store(), job_id)
1584 }
1585
1586 pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1588 self.jobs.resume(job_id, detail);
1589 self.jobs
1590 .persist_job(self.thread_manager.state_store(), job_id)
1591 }
1592
    /// Returns the history entries the job manager holds for `job_id`.
    pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
        self.jobs.history(job_id)
    }
1597}
1598
1599fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1600 ThreadResponse {
1601 thread_id: new.thread.id.clone(),
1602 status: status.to_string(),
1603 thread: Some(new.thread),
1604 threads: Vec::new(),
1605 goal: None,
1606 model: Some(new.model),
1607 model_provider: Some(new.model_provider),
1608 cwd: Some(new.cwd),
1609 approval_policy: new.approval_policy,
1610 sandbox: new.sandbox,
1611 events: Vec::new(),
1612 data: json!({}),
1613 }
1614}
1615
1616fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1617 match initial_history {
1618 InitialHistory::New => "New conversation".to_string(),
1619 InitialHistory::Forked(items) => truncate_preview(
1620 &items
1621 .first()
1622 .map(Value::to_string)
1623 .unwrap_or_else(|| "Forked conversation".to_string()),
1624 ),
1625 InitialHistory::Resumed { history, .. } => truncate_preview(
1626 &history
1627 .first()
1628 .map(Value::to_string)
1629 .unwrap_or_else(|| "Resumed conversation".to_string()),
1630 ),
1631 }
1632}
1633
1634fn permission_path_for_call(call: &ToolCall) -> Option<String> {
1635 match &call.payload {
1636 ToolPayload::Function { arguments } => serde_json::from_str::<Value>(arguments)
1637 .ok()
1638 .and_then(|value| {
1639 value
1640 .get("path")
1641 .and_then(Value::as_str)
1642 .map(str::to_string)
1643 }),
1644 ToolPayload::Mcp { raw_arguments, .. } => raw_arguments
1645 .get("path")
1646 .and_then(Value::as_str)
1647 .map(str::to_string),
1648 ToolPayload::Custom { .. } | ToolPayload::LocalShell { .. } => None,
1649 }
1650}
1651
/// Returns at most the first 120 characters (`char`s, not bytes) of `value`.
fn truncate_preview(value: &str) -> String {
    const PREVIEW_CHAR_LIMIT: usize = 120;

    // The byte offset of the first character past the limit is where the preview ends;
    // when there is no such character the whole input fits.
    match value.char_indices().nth(PREVIEW_CHAR_LIMIT) {
        Some((cut, _)) => value[..cut].to_string(),
        None => value.to_string(),
    }
}
1655
1656fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1657 Thread {
1658 id: thread.id,
1659 preview: thread.preview,
1660 ephemeral: thread.ephemeral,
1661 model_provider: thread.model_provider,
1662 created_at: thread.created_at,
1663 updated_at: thread.updated_at,
1664 status: match thread.status {
1665 PersistedThreadStatus::Running => ThreadStatus::Running,
1666 PersistedThreadStatus::Idle => ThreadStatus::Idle,
1667 PersistedThreadStatus::Completed => ThreadStatus::Completed,
1668 PersistedThreadStatus::Failed => ThreadStatus::Failed,
1669 PersistedThreadStatus::Paused => ThreadStatus::Paused,
1670 PersistedThreadStatus::Archived => ThreadStatus::Archived,
1671 },
1672 path: thread.path,
1673 cwd: thread.cwd,
1674 cli_version: thread.cli_version,
1675 source: match thread.source {
1676 SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
1677 SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
1678 SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
1679 SessionSource::Api => codewhale_protocol::SessionSource::Api,
1680 SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
1681 },
1682 name: thread.name,
1683 }
1684}
1685
1686fn to_protocol_goal(goal: ThreadGoalRecord) -> ThreadGoal {
1687 ThreadGoal {
1688 thread_id: goal.thread_id,
1689 goal_id: goal.goal_id,
1690 objective: goal.objective,
1691 status: to_protocol_goal_status(goal.status),
1692 token_budget: goal.token_budget,
1693 tokens_used: goal.tokens_used,
1694 time_used_seconds: goal.time_used_seconds,
1695 created_at: goal.created_at,
1696 updated_at: goal.updated_at,
1697 }
1698}
1699
1700fn to_protocol_goal_status(status: PersistedThreadGoalStatus) -> ThreadGoalStatus {
1701 match status {
1702 PersistedThreadGoalStatus::Active => ThreadGoalStatus::Active,
1703 PersistedThreadGoalStatus::Paused => ThreadGoalStatus::Paused,
1704 PersistedThreadGoalStatus::Blocked => ThreadGoalStatus::Blocked,
1705 PersistedThreadGoalStatus::UsageLimited => ThreadGoalStatus::UsageLimited,
1706 PersistedThreadGoalStatus::BudgetLimited => ThreadGoalStatus::BudgetLimited,
1707 PersistedThreadGoalStatus::Complete => ThreadGoalStatus::Complete,
1708 }
1709}
1710
1711fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1712 match status {
1713 ThreadStatus::Running => PersistedThreadStatus::Running,
1714 ThreadStatus::Idle => PersistedThreadStatus::Idle,
1715 ThreadStatus::Completed => PersistedThreadStatus::Completed,
1716 ThreadStatus::Failed => PersistedThreadStatus::Failed,
1717 ThreadStatus::Paused => PersistedThreadStatus::Paused,
1718 ThreadStatus::Archived => PersistedThreadStatus::Archived,
1719 }
1720}
1721
1722fn to_persisted_source(source: &codewhale_protocol::SessionSource) -> SessionSource {
1723 match source {
1724 codewhale_protocol::SessionSource::Interactive => SessionSource::Interactive,
1725 codewhale_protocol::SessionSource::Resume => SessionSource::Resume,
1726 codewhale_protocol::SessionSource::Fork => SessionSource::Fork,
1727 codewhale_protocol::SessionSource::Api => SessionSource::Api,
1728 codewhale_protocol::SessionSource::Unknown => SessionSource::Unknown,
1729 }
1730}
1731
1732fn approval_request_frame(
1733 requirement: &ExecApprovalRequirement,
1734 matched_rule: Option<&str>,
1735 call_id: String,
1736 approval_id: String,
1737 turn_id: String,
1738 command: String,
1739 cwd: String,
1740) -> Option<EventFrame> {
1741 let ExecApprovalRequirement::NeedsApproval {
1742 reason,
1743 proposed_execpolicy_amendment,
1744 proposed_network_policy_amendments,
1745 } = requirement
1746 else {
1747 return None;
1748 };
1749
1750 let mut available_decisions = vec![
1751 ReviewDecision::Approved,
1752 ReviewDecision::ApprovedForSession,
1753 ReviewDecision::Denied,
1754 ReviewDecision::Abort,
1755 ];
1756 if proposed_execpolicy_amendment
1757 .as_ref()
1758 .is_some_and(|amendment| !amendment.prefixes.is_empty())
1759 {
1760 available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1761 }
1762 available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1763 |amendment| ReviewDecision::NetworkPolicyAmendment {
1764 host: amendment.host,
1765 action: amendment.action,
1766 },
1767 ));
1768
1769 Some(EventFrame::ExecApprovalRequest {
1770 request: ExecApprovalRequestEvent {
1771 call_id,
1772 approval_id,
1773 turn_id,
1774 command,
1775 cwd,
1776 reason: reason.clone(),
1777 matched_rule: matched_rule.map(|rule| rule.to_string().into_boxed_str()),
1778 network_approval_context: None,
1779 proposed_execpolicy_amendment: proposed_execpolicy_amendment
1780 .as_ref()
1781 .map(|amendment| amendment.prefixes.clone())
1782 .unwrap_or_default(),
1783 proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1784 additional_permissions: Vec::new(),
1785 available_decisions,
1786 },
1787 })
1788}
1789
1790fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1791 match requirement {
1792 ExecApprovalRequirement::Skip {
1793 bypass_sandbox,
1794 proposed_execpolicy_amendment,
1795 } => json!({
1796 "type": "skip",
1797 "bypass_sandbox": bypass_sandbox,
1798 "reason": requirement.reason(),
1799 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1800 .as_ref()
1801 .map(|amendment| amendment.prefixes.clone())
1802 .unwrap_or_default()
1803 }),
1804 ExecApprovalRequirement::NeedsApproval {
1805 reason,
1806 proposed_execpolicy_amendment,
1807 proposed_network_policy_amendments,
1808 } => json!({
1809 "type": "needs_approval",
1810 "reason": reason,
1811 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1812 .as_ref()
1813 .map(|amendment| amendment.prefixes.clone())
1814 .unwrap_or_default(),
1815 "proposed_network_policy_amendments": proposed_network_policy_amendments
1816 }),
1817 ExecApprovalRequirement::Forbidden { reason } => json!({
1818 "type": "forbidden",
1819 "reason": reason
1820 }),
1821 }
1822}
1823
1824fn policy_precheck_payload(
1825 decision: &ExecPolicyDecision,
1826 command: &str,
1827 cwd: &str,
1828 execution_kind: &str,
1829) -> Value {
1830 json!({
1831 "execution_kind": execution_kind,
1832 "command": command,
1833 "cwd": cwd,
1834 "allow": decision.allow,
1835 "requires_approval": decision.requires_approval,
1836 "matched_rule": decision.matched_rule.clone(),
1837 "phase": decision.requirement.phase(),
1838 "reason": decision.reason(),
1839 "requirement": approval_requirement_payload(&decision.requirement)
1840 })
1841}
1842
1843fn tool_payload_value(payload: &ToolPayload) -> Value {
1844 serde_json::to_value(payload).unwrap_or_else(
1845 |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1846 )
1847}
1848
1849fn tool_output_value(output: &codewhale_protocol::ToolOutput) -> Value {
1850 serde_json::to_value(output).unwrap_or_else(
1851 |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1852 )
1853}
1854
1855fn event_frame_payload(frame: &EventFrame) -> Value {
1856 serde_json::to_value(frame)
1857 .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1858}
1859
1860fn json_optional_string(value: &Value) -> Option<String> {
1861 if value.is_null() {
1862 None
1863 } else {
1864 value.as_str().map(ToString::to_string)
1865 }
1866}
1867
1868fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
1869 let Some(value) = value else {
1870 return JobRetryMetadata::default();
1871 };
1872 JobRetryMetadata {
1873 attempt: value
1874 .get("attempt")
1875 .and_then(Value::as_u64)
1876 .unwrap_or(0)
1877 .min(u32::MAX as u64) as u32,
1878 max_attempts: value
1879 .get("max_attempts")
1880 .and_then(Value::as_u64)
1881 .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
1882 .min(u32::MAX as u64) as u32,
1883 backoff_base_ms: value
1884 .get("backoff_base_ms")
1885 .and_then(Value::as_u64)
1886 .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
1887 next_backoff_ms: value
1888 .get("next_backoff_ms")
1889 .and_then(Value::as_u64)
1890 .unwrap_or(0),
1891 next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
1892 }
1893}
1894
1895fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
1896 let status = value
1897 .get("status")
1898 .and_then(Value::as_str)
1899 .and_then(job_status_from_str)?;
1900 Some(JobHistoryEntry {
1901 at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
1902 phase: value
1903 .get("phase")
1904 .and_then(Value::as_str)
1905 .unwrap_or("unknown")
1906 .to_string(),
1907 status,
1908 progress: value
1909 .get("progress")
1910 .and_then(Value::as_u64)
1911 .map(|v| v.min(u8::MAX as u64) as u8),
1912 detail: value.get("detail").and_then(json_optional_string),
1913 retry: parse_retry_metadata(value.get("retry")),
1914 })
1915}
1916
1917fn job_status_to_str(status: JobStatus) -> &'static str {
1918 match status {
1919 JobStatus::Queued => "queued",
1920 JobStatus::Running => "running",
1921 JobStatus::Paused => "paused",
1922 JobStatus::Completed => "completed",
1923 JobStatus::Failed => "failed",
1924 JobStatus::Cancelled => "cancelled",
1925 }
1926}
1927
1928fn job_status_from_str(value: &str) -> Option<JobStatus> {
1929 match value {
1930 "queued" => Some(JobStatus::Queued),
1931 "running" => Some(JobStatus::Running),
1932 "paused" => Some(JobStatus::Paused),
1933 "completed" => Some(JobStatus::Completed),
1934 "failed" => Some(JobStatus::Failed),
1935 "cancelled" => Some(JobStatus::Cancelled),
1936 _ => None,
1937 }
1938}
1939
1940fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
1941 json!({
1942 "attempt": retry.attempt,
1943 "max_attempts": retry.max_attempts,
1944 "backoff_base_ms": retry.backoff_base_ms,
1945 "next_backoff_ms": retry.next_backoff_ms,
1946 "next_retry_at": retry.next_retry_at
1947 })
1948}
1949
1950fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
1951 json!({
1952 "at": entry.at,
1953 "phase": entry.phase.clone(),
1954 "status": job_status_to_str(entry.status),
1955 "progress": entry.progress,
1956 "detail": entry.detail.clone(),
1957 "retry": job_retry_to_value(&entry.retry)
1958 })
1959}
1960
1961fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
1962 match status {
1963 JobStatus::Queued => JobStateStatus::Queued,
1964 JobStatus::Running => JobStateStatus::Running,
1965 JobStatus::Paused => JobStateStatus::Running,
1966 JobStatus::Completed => JobStateStatus::Completed,
1967 JobStatus::Failed => JobStateStatus::Failed,
1968 JobStatus::Cancelled => JobStateStatus::Cancelled,
1969 }
1970}
1971
1972fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
1973 match status {
1974 JobStateStatus::Queued => JobStatus::Queued,
1975 JobStateStatus::Running => JobStatus::Running,
1976 JobStateStatus::Completed => JobStatus::Completed,
1977 JobStateStatus::Failed => JobStatus::Failed,
1978 JobStateStatus::Cancelled => JobStatus::Cancelled,
1979 }
1980}
1981
1982#[cfg(test)]
1983mod tests {
1984 use super::*;
1985 use codewhale_tools::ToolCallSource;
1986
1987 #[test]
1990 fn permission_path_for_call_extracts_function_path_argument() {
1991 let call = ToolCall {
1992 name: "read_file".to_string(),
1993 payload: ToolPayload::Function {
1994 arguments: json!({ "path": "README.md" }).to_string(),
1995 },
1996 source: ToolCallSource::Direct,
1997 raw_tool_call_id: None,
1998 };
1999
2000 assert_eq!(
2001 permission_path_for_call(&call).as_deref(),
2002 Some("README.md")
2003 );
2004 }
2005
2006 #[test]
2007 fn permission_path_for_call_extracts_mcp_path_argument() {
2008 let call = ToolCall {
2009 name: "mcp_fs_read".to_string(),
2010 payload: ToolPayload::Mcp {
2011 server: "fs".to_string(),
2012 tool: "read".to_string(),
2013 raw_arguments: json!({ "path": "secrets/token.txt" }),
2014 raw_tool_call_id: None,
2015 },
2016 source: ToolCallSource::Direct,
2017 raw_tool_call_id: None,
2018 };
2019
2020 assert_eq!(
2021 permission_path_for_call(&call).as_deref(),
2022 Some("secrets/token.txt")
2023 );
2024 }
2025
2026 #[test]
2027 fn permission_path_for_call_ignores_shell_payload() {
2028 let call = ToolCall {
2029 name: "exec_shell".to_string(),
2030 payload: ToolPayload::LocalShell {
2031 params: codewhale_protocol::LocalShellParams {
2032 command: "cargo test".to_string(),
2033 cwd: None,
2034 timeout_ms: None,
2035 },
2036 },
2037 source: ToolCallSource::Direct,
2038 raw_tool_call_id: None,
2039 };
2040
2041 assert_eq!(permission_path_for_call(&call), None);
2042 }
2043
2044 #[test]
2045 fn approval_request_frame_includes_matched_rule() {
2046 let requirement = ExecApprovalRequirement::NeedsApproval {
2047 reason: "Typed ask rule 'tool=exec_shell command=cargo test' requires approval."
2048 .to_string(),
2049 proposed_execpolicy_amendment: None,
2050 proposed_network_policy_amendments: Vec::new(),
2051 };
2052
2053 let frame = approval_request_frame(
2054 &requirement,
2055 Some("tool=exec_shell command=cargo test"),
2056 "call-1".to_string(),
2057 "approval-1".to_string(),
2058 "turn-1".to_string(),
2059 "cargo test --workspace".to_string(),
2060 "/repo".to_string(),
2061 )
2062 .expect("approval frame");
2063
2064 let EventFrame::ExecApprovalRequest { request } = frame else {
2065 panic!("expected exec approval request frame");
2066 };
2067 assert_eq!(
2068 request.matched_rule.as_deref(),
2069 Some("tool=exec_shell command=cargo test")
2070 );
2071 assert_eq!(request.reason, requirement.reason());
2072 }
2073
2074 #[test]
2075 fn enqueue_creates_queued_job_with_zero_progress() {
2076 let mut jm = JobManager::default();
2077 let job = jm.enqueue("build");
2078 assert_eq!(job.name, "build");
2079 assert_eq!(job.status, JobStatus::Queued);
2080 assert_eq!(job.progress, Some(0));
2081 assert!(job.detail.is_none());
2082 assert_eq!(job.history.len(), 1);
2083 assert_eq!(job.history[0].phase, "created");
2084 }
2085
2086 #[test]
2087 fn set_running_transitions_from_queued() {
2088 let mut jm = JobManager::default();
2089 let job = jm.enqueue("deploy");
2090 let id = job.id.clone();
2091 jm.set_running(&id);
2092 let jobs = jm.list();
2093 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2094 assert_eq!(updated.status, JobStatus::Running);
2095 assert_eq!(updated.history.last().unwrap().phase, "running");
2096 }
2097
2098 #[test]
2099 fn update_progress_clamps_to_100() {
2100 let mut jm = JobManager::default();
2101 let job = jm.enqueue("task");
2102 let id = job.id.clone();
2103 jm.update_progress(&id, 150, Some("over".to_string()));
2104 let jobs = jm.list();
2105 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2106 assert_eq!(updated.progress, Some(100));
2107 }
2108
2109 #[test]
2110 fn complete_sets_progress_to_100() {
2111 let mut jm = JobManager::default();
2112 let job = jm.enqueue("task");
2113 let id = job.id.clone();
2114 jm.set_running(&id);
2115 jm.complete(&id);
2116 let jobs = jm.list();
2117 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2118 assert_eq!(updated.status, JobStatus::Completed);
2119 assert_eq!(updated.progress, Some(100));
2120 }
2121
2122 #[test]
2123 fn fail_increments_attempt_and_sets_backoff() {
2124 let mut jm = JobManager::default();
2125 let job = jm.enqueue("fragile");
2126 let id = job.id.clone();
2127 jm.set_running(&id);
2128 jm.fail(&id, "crashed");
2129 let jobs = jm.list();
2130 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2131 assert_eq!(updated.status, JobStatus::Failed);
2132 assert_eq!(updated.retry.attempt, 1);
2133 assert!(updated.retry.next_backoff_ms > 0);
2134 assert!(updated.retry.next_retry_at.is_some());
2135 assert_eq!(updated.detail.as_deref(), Some("crashed"));
2136 }
2137
2138 #[test]
2139 fn fail_clears_retry_after_max_attempts() {
2140 let mut jm = JobManager::default();
2141 let job = jm.enqueue("fragile");
2142 let id = job.id.clone();
2143 for _ in 0..=DEFAULT_JOB_MAX_ATTEMPTS {
2144 jm.set_running(&id);
2145 jm.fail(&id, "boom");
2146 }
2147 let jobs = jm.list();
2148 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2149 assert_eq!(updated.retry.attempt, DEFAULT_JOB_MAX_ATTEMPTS);
2150 assert_eq!(updated.retry.next_backoff_ms, 0);
2151 assert!(updated.retry.next_retry_at.is_none());
2152 }
2153
2154 #[test]
2155 fn cancel_sets_status_and_clears_retry() {
2156 let mut jm = JobManager::default();
2157 let job = jm.enqueue("task");
2158 let id = job.id.clone();
2159 jm.cancel(&id);
2160 let jobs = jm.list();
2161 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2162 assert_eq!(updated.status, JobStatus::Cancelled);
2163 assert_eq!(updated.retry.next_backoff_ms, 0);
2164 }
2165
2166 #[test]
2167 fn pause_and_resume_round_trip() {
2168 let mut jm = JobManager::default();
2169 let job = jm.enqueue("task");
2170 let id = job.id.clone();
2171 jm.set_running(&id);
2172 jm.pause(&id, Some("waiting".to_string()));
2173 let jobs = jm.list();
2174 let paused = jobs.iter().find(|j| j.id == id).unwrap();
2175 assert_eq!(paused.status, JobStatus::Paused);
2176 assert_eq!(paused.detail.as_deref(), Some("waiting"));
2177
2178 jm.resume(&id, None);
2179 let jobs = jm.list();
2180 let resumed = jobs.iter().find(|j| j.id == id).unwrap();
2181 assert_eq!(resumed.status, JobStatus::Running);
2182 assert_eq!(resumed.history.last().unwrap().phase, "resumed");
2183 }
2184
2185 #[test]
2186 fn list_returns_jobs_sorted_by_updated_at_desc() {
2187 let mut jm = JobManager::default();
2188 jm.enqueue("first");
2189 jm.enqueue("second");
2190 jm.enqueue("third");
2191 let jobs = jm.list();
2192 assert_eq!(jobs.len(), 3);
2193 for window in jobs.windows(2) {
2194 assert!(window[0].updated_at >= window[1].updated_at);
2195 }
2196 }
2197
2198 #[test]
2199 fn history_returns_entries_for_existing_job() {
2200 let mut jm = JobManager::default();
2201 let job = jm.enqueue("task");
2202 let id = job.id.clone();
2203 jm.set_running(&id);
2204 jm.complete(&id);
2205 let history = jm.history(&id);
2206 assert_eq!(history.len(), 3); assert_eq!(history[0].phase, "created");
2208 assert_eq!(history[1].phase, "running");
2209 assert_eq!(history[2].phase, "completed");
2210 }
2211
2212 #[test]
2213 fn history_returns_empty_for_unknown_job() {
2214 let jm = JobManager::default();
2215 assert!(jm.history("nonexistent").is_empty());
2216 }
2217
2218 #[test]
2219 fn resume_pending_requeues_running_and_queued() {
2220 let mut jm = JobManager::default();
2221 let _j1 = jm.enqueue("queued_task");
2222 let j2 = jm.enqueue("running_task");
2223 let j3 = jm.enqueue("completed_task");
2224 let id2 = j2.id.clone();
2225 let id3 = j3.id.clone();
2226 jm.set_running(&id2);
2227 jm.set_running(&id3);
2228 jm.complete(&id3);
2229
2230 let resumed = jm.resume_pending();
2231 assert_eq!(resumed.len(), 2);
2232 for job in &resumed {
2233 assert_eq!(job.status, JobStatus::Queued);
2234 }
2235 }
2236
2237 #[test]
2240 fn deterministic_backoff_zero_on_first_attempt() {
2241 let retry = JobRetryMetadata {
2242 attempt: 0,
2243 ..Default::default()
2244 };
2245 assert_eq!(JobManager::deterministic_backoff_ms(&retry), 0);
2246 }
2247
2248 #[test]
2249 fn deterministic_backoff_exponential_growth() {
2250 let base = DEFAULT_JOB_BACKOFF_BASE_MS;
2251 for attempt in 1..=5 {
2252 let retry = JobRetryMetadata {
2253 attempt,
2254 backoff_base_ms: base,
2255 ..Default::default()
2256 };
2257 let expected = base * 2u64.pow(attempt.saturating_sub(1).min(20));
2258 assert_eq!(
2259 JobManager::deterministic_backoff_ms(&retry),
2260 expected,
2261 "attempt {attempt}"
2262 );
2263 }
2264 }
2265
2266 #[test]
2267 fn deterministic_backoff_saturates_at_high_exponent() {
2268 let retry = JobRetryMetadata {
2269 attempt: 63,
2270 backoff_base_ms: 1000,
2271 ..Default::default()
2272 };
2273 let _ = JobManager::deterministic_backoff_ms(&retry);
2275 }
2276
/// Issuing more progress updates than `MAX_JOB_HISTORY_ENTRIES` must leave
/// the job's history at exactly that many entries.
#[test]
fn push_history_truncates_beyond_max() {
    let mut manager = JobManager::default();
    let queued = manager.enqueue("task");
    let id = queued.id.clone();

    // Overshoot the cap by 20 updates; `i % 100` keeps the progress value
    // below 100, so the cast to `u8` cannot truncate.
    let update_count = MAX_JOB_HISTORY_ENTRIES + 20;
    for i in 0..update_count {
        let progress = (i % 100) as u8;
        manager.update_progress(&id, progress, Some(format!("step {i}")));
    }

    assert_eq!(manager.history(&id).len(), MAX_JOB_HISTORY_ENTRIES);
}
2291
/// Encoding a failed job's persisted detail and parsing it back must
/// preserve the status, the detail, the retry attempt counter and the number
/// of history entries.
#[test]
fn encode_and_parse_persisted_detail_round_trip() {
    let mut manager = JobManager::default();
    let queued = manager.enqueue("task");
    let id = queued.id.clone();

    // Drive the job into the failed state before taking a snapshot of it.
    manager.set_running(&id);
    manager.fail(&id, "oops");
    let snapshot = manager
        .list()
        .into_iter()
        .find(|candidate| candidate.id == id)
        .unwrap();

    let encoded = JobManager::encode_persisted_detail(&snapshot)
        .unwrap()
        .unwrap();
    let decoded = JobManager::parse_persisted_detail(Some(&encoded)).unwrap();

    assert_eq!(decoded.status, snapshot.status);
    assert_eq!(decoded.detail, snapshot.detail);
    assert_eq!(decoded.retry.attempt, snapshot.retry.attempt);
    assert_eq!(decoded.history.len(), snapshot.history.len());
}
2311
/// Parsing an absent persisted detail must yield `None`.
#[test]
fn parse_persisted_detail_returns_none_for_none_input() {
    let parsed = JobManager::parse_persisted_detail(None);
    assert!(parsed.is_none());
}
2316
/// Input that is not valid JSON must be rejected with `None`.
#[test]
fn parse_persisted_detail_returns_none_for_invalid_json() {
    let parsed = JobManager::parse_persisted_detail(Some("not json"));
    assert!(parsed.is_none());
}
2321
/// Every `JobStatus` variant must survive a trip through its string form and
/// back.
#[test]
fn job_status_round_trip_str() {
    let every_status = [
        JobStatus::Queued,
        JobStatus::Running,
        JobStatus::Paused,
        JobStatus::Completed,
        JobStatus::Failed,
        JobStatus::Cancelled,
    ];
    for status in every_status {
        let s = job_status_to_str(status);
        assert_eq!(
            job_status_from_str(s),
            Some(status),
            "round-trip failed for {s:?}"
        );
    }
}
2340
/// Strings that name no status, including the empty string, must parse to
/// `None`.
#[test]
fn job_status_from_str_returns_none_for_unknown() {
    for unrecognized in ["unknown", ""] {
        assert_eq!(job_status_from_str(unrecognized), None);
    }
}
2346
/// A 200-character ASCII input must come back with a length of 120.
/// Note that `len()` measures bytes; the input here is pure ASCII.
#[test]
fn truncate_preview_limits_to_120_chars() {
    let oversized = "a".repeat(200);
    let preview = truncate_preview(&oversized);
    assert_eq!(preview.len(), 120);
}
2353
/// A short input must be returned unchanged.
#[test]
fn truncate_preview_preserves_short_strings() {
    assert_eq!(truncate_preview("hello"), "hello");
}
2359
/// Checks the expected persisted status for every runtime `JobStatus`.
#[test]
fn runtime_status_to_job_state_maps_correctly() {
    // (runtime status, expected persisted status)
    let expectations = [
        (JobStatus::Queued, JobStateStatus::Queued),
        (JobStatus::Running, JobStateStatus::Running),
        // A paused job is expected to be persisted as `Running`.
        (JobStatus::Paused, JobStateStatus::Running),
        (JobStatus::Completed, JobStateStatus::Completed),
        (JobStatus::Failed, JobStateStatus::Failed),
        (JobStatus::Cancelled, JobStateStatus::Cancelled),
    ];
    for (runtime, persisted) in expectations {
        assert_eq!(runtime_status_to_job_state(runtime), persisted);
    }
}
2387
/// Checks the expected runtime `JobStatus` for each persisted status
/// exercised here.
#[test]
fn job_state_status_to_runtime_maps_correctly() {
    // (persisted status, expected runtime status)
    let expectations = [
        (JobStateStatus::Queued, JobStatus::Queued),
        (JobStateStatus::Running, JobStatus::Running),
        (JobStateStatus::Completed, JobStatus::Completed),
        (JobStateStatus::Failed, JobStatus::Failed),
        (JobStateStatus::Cancelled, JobStatus::Cancelled),
    ];
    for (persisted, runtime) in expectations {
        assert_eq!(job_state_status_to_runtime(persisted), runtime);
    }
}
2411
/// A brand-new conversation is expected to use the fixed placeholder preview.
#[test]
fn preview_from_initial_history_new() {
    let fresh = InitialHistory::New;
    assert_eq!(preview_from_initial_history(&fresh), "New conversation");
}
2417
/// The preview of a forked history must contain text from its history item.
#[test]
fn preview_from_initial_history_forked() {
    let forked = InitialHistory::Forked(vec![json!("hello")]);
    let preview = preview_from_initial_history(&forked);
    assert!(preview.contains("hello"));
}
2423
/// The preview of a resumed history must contain text from its history item.
#[test]
fn preview_from_initial_history_resumed() {
    let resumed = InitialHistory::Resumed {
        conversation_id: "test".to_string(),
        history: vec![json!("world")],
        rollout_path: PathBuf::from("/tmp/test"),
    };
    let preview = preview_from_initial_history(&resumed);
    assert!(preview.contains("world"));
}
2433
/// JSON `null` must map to `None`.
#[test]
fn json_optional_string_handles_null() {
    let extracted = json_optional_string(&Value::Null);
    assert!(extracted.is_none());
}
2438
/// A JSON string must be returned as an owned copy of its contents.
#[test]
fn json_optional_string_handles_string() {
    let input = Value::String("hello".to_string());
    let extracted = json_optional_string(&input);
    assert_eq!(extracted, Some("hello".to_string()));
}
2446
/// A JSON value that is not a string (here a number) must map to `None`.
#[test]
fn json_optional_string_handles_non_string() {
    let number = json!(42);
    assert!(json_optional_string(&number).is_none());
}
2451
/// Without any input value, the parsed metadata must start at attempt zero
/// and carry the default attempt limit and base backoff.
#[test]
fn parse_retry_metadata_returns_default_for_none() {
    let defaults = parse_retry_metadata(None);
    assert_eq!(defaults.attempt, 0);
    assert_eq!(defaults.max_attempts, DEFAULT_JOB_MAX_ATTEMPTS);
    assert_eq!(defaults.backoff_base_ms, DEFAULT_JOB_BACKOFF_BASE_MS);
}
2459
/// Every field present in the JSON object must be carried over into the
/// parsed retry metadata.
#[test]
fn parse_retry_metadata_parses_fields() {
    let raw = json!({
        "attempt": 2,
        "max_attempts": 5,
        "backoff_base_ms": 1000,
        "next_backoff_ms": 2000,
        "next_retry_at": 1234567890i64
    });

    let parsed = parse_retry_metadata(Some(&raw));

    assert_eq!(parsed.attempt, 2);
    assert_eq!(parsed.max_attempts, 5);
    assert_eq!(parsed.backoff_base_ms, 1000);
    assert_eq!(parsed.next_backoff_ms, 2000);
    assert_eq!(parsed.next_retry_at, Some(1234567890));
}
2476
/// An entry that lacks the `status` key must be rejected with `None`.
#[test]
fn parse_history_entry_returns_none_without_status() {
    let missing_status = json!({"at": 1, "phase": "test"});
    let parsed = parse_history_entry(&missing_status);
    assert!(parsed.is_none());
}
2482
/// A fully populated entry must parse, with its timestamp, phase, status,
/// progress and detail all carried over.
#[test]
fn parse_history_entry_parses_valid_entry() {
    let raw = json!({
        "at": 100,
        "phase": "running",
        "status": "running",
        "progress": 50,
        "detail": "working",
        "retry": {"attempt": 0, "max_attempts": 3, "backoff_base_ms": 500}
    });

    let parsed = parse_history_entry(&raw).unwrap();

    assert_eq!(parsed.at, 100);
    assert_eq!(parsed.phase, "running");
    assert_eq!(parsed.status, JobStatus::Running);
    assert_eq!(parsed.progress, Some(50));
    assert_eq!(parsed.detail.as_deref(), Some("working"));
}
2500}