1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use codewhale_agent::ModelRegistry;
7use codewhale_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use codewhale_execpolicy::{
9 AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10 ExecPolicyEngine,
11};
12use codewhale_hooks::{HookDispatcher, HookEvent};
13use codewhale_mcp::{
14 McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use codewhale_protocol::{
17 AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18 ResponseChannel, ReviewDecision, Thread, ThreadForkParams, ThreadGoal, ThreadGoalClearParams,
19 ThreadGoalGetParams, ThreadGoalProgressParams, ThreadGoalSetParams, ThreadGoalStatus,
20 ThreadListParams, ThreadReadParams, ThreadRequest, ThreadResponse, ThreadResumeParams,
21 ThreadSetNameParams, ThreadStatus, ToolPayload, UserInputRequestEvent,
22};
23use codewhale_state::{
24 JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadGoalRecord,
25 ThreadGoalStatus as PersistedThreadGoalStatus, ThreadListFilters, ThreadMetadata,
26 ThreadStatus as PersistedThreadStatus,
27};
28use codewhale_tools::{ToolCall, ToolRegistry};
29use serde_json::{Value, json};
30use uuid::Uuid;
31
32#[derive(Debug, Clone)]
34pub enum InitialHistory {
35 New,
37 Forked(Vec<Value>),
39 Resumed {
41 conversation_id: String,
42 history: Vec<Value>,
43 rollout_path: PathBuf,
44 },
45}
46
47#[derive(Debug, Clone)]
49pub struct NewThread {
50 pub thread: Thread,
52 pub model: String,
54 pub model_provider: String,
56 pub cwd: PathBuf,
58 pub approval_policy: Option<String>,
60 pub sandbox: Option<String>,
62}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum JobStatus {
67 Queued,
69 Running,
71 Paused,
73 Completed,
75 Failed,
77 Cancelled,
79}
80
81const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
82const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
83const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
84const MAX_JOB_HISTORY_ENTRIES: usize = 64;
85
86#[derive(Debug, Clone)]
88pub struct JobRetryMetadata {
89 pub attempt: u32,
91 pub max_attempts: u32,
93 pub backoff_base_ms: u64,
95 pub next_backoff_ms: u64,
97 pub next_retry_at: Option<i64>,
99}
100
101impl Default for JobRetryMetadata {
102 fn default() -> Self {
103 Self {
104 attempt: 0,
105 max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
106 backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
107 next_backoff_ms: 0,
108 next_retry_at: None,
109 }
110 }
111}
112
113#[derive(Debug, Clone)]
115pub struct JobHistoryEntry {
116 pub at: i64,
118 pub phase: String,
120 pub status: JobStatus,
122 pub progress: Option<u8>,
124 pub detail: Option<String>,
126 pub retry: JobRetryMetadata,
128}
129
130#[derive(Debug, Clone)]
131struct PersistedJobDetail {
132 pub status: JobStatus,
133 pub detail: Option<String>,
134 pub retry: JobRetryMetadata,
135 pub history: Vec<JobHistoryEntry>,
136}
137
138#[derive(Debug, Clone)]
140pub struct JobRecord {
141 pub id: String,
143 pub name: String,
145 pub status: JobStatus,
147 pub progress: Option<u8>,
149 pub detail: Option<String>,
151 pub retry: JobRetryMetadata,
153 pub history: Vec<JobHistoryEntry>,
155 pub created_at: i64,
157 pub updated_at: i64,
159}
160
161#[derive(Debug, Default)]
163pub struct JobManager {
164 jobs: HashMap<String, JobRecord>,
165}
166
167impl JobManager {
168 fn now_ts() -> i64 {
169 chrono::Utc::now().timestamp()
170 }
171
172 fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
173 if retry.attempt == 0 {
174 return 0;
175 }
176 let exponent = retry.attempt.saturating_sub(1).min(20);
177 let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
178 retry.backoff_base_ms.saturating_mul(multiplier)
179 }
180
181 fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
182 retry.next_backoff_ms = 0;
183 retry.next_retry_at = None;
184 }
185
186 fn push_history(job: &mut JobRecord, phase: &str) {
187 job.history.push(JobHistoryEntry {
188 at: job.updated_at,
189 phase: phase.to_string(),
190 status: job.status,
191 progress: job.progress,
192 detail: job.detail.clone(),
193 retry: job.retry.clone(),
194 });
195 if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
196 let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
197 job.history.drain(0..to_drain);
198 }
199 }
200
201 fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
202 let raw = raw?;
203 let parsed: Value = serde_json::from_str(raw).ok()?;
204 let status = parsed
205 .get("status")
206 .and_then(Value::as_str)
207 .and_then(job_status_from_str)?;
208 let detail = parsed.get("detail").and_then(json_optional_string);
209 let retry = parse_retry_metadata(parsed.get("retry"));
210 let history = parsed
211 .get("history")
212 .and_then(Value::as_array)
213 .map(|items| {
214 items
215 .iter()
216 .filter_map(parse_history_entry)
217 .collect::<Vec<_>>()
218 })
219 .unwrap_or_default();
220 Some(PersistedJobDetail {
221 status,
222 detail,
223 retry,
224 history,
225 })
226 }
227
228 fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
229 let encoded = json!({
230 "schema_version": JOB_DETAIL_SCHEMA_VERSION,
231 "status": job_status_to_str(job.status),
232 "detail": job.detail.clone(),
233 "retry": job_retry_to_value(&job.retry),
234 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
235 })
236 .to_string();
237 Ok(Some(encoded))
238 }
239
240 pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
242 let now = Self::now_ts();
243 let id = format!("job-{}", Uuid::new_v4());
244 let mut job = JobRecord {
245 id: id.clone(),
246 name: name.into(),
247 status: JobStatus::Queued,
248 progress: Some(0),
249 detail: None,
250 retry: JobRetryMetadata::default(),
251 history: Vec::new(),
252 created_at: now,
253 updated_at: now,
254 };
255 Self::push_history(&mut job, "created");
256 self.jobs.insert(id, job.clone());
257 job
258 }
259
260 pub fn set_running(&mut self, id: &str) {
262 if let Some(job) = self.jobs.get_mut(id) {
263 job.status = JobStatus::Running;
264 Self::clear_retry_schedule(&mut job.retry);
265 job.updated_at = Self::now_ts();
266 Self::push_history(job, "running");
267 }
268 }
269
270 pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
272 if let Some(job) = self.jobs.get_mut(id) {
273 job.progress = Some(progress.min(100));
274 job.detail = detail;
275 job.updated_at = Self::now_ts();
276 Self::push_history(job, "progress_updated");
277 }
278 }
279
280 pub fn complete(&mut self, id: &str) {
282 if let Some(job) = self.jobs.get_mut(id) {
283 job.status = JobStatus::Completed;
284 job.progress = Some(100);
285 Self::clear_retry_schedule(&mut job.retry);
286 job.updated_at = Self::now_ts();
287 Self::push_history(job, "completed");
288 }
289 }
290
291 pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
293 if let Some(job) = self.jobs.get_mut(id) {
294 let now = Self::now_ts();
295 job.status = JobStatus::Failed;
296 job.detail = Some(detail.into());
297 if job.retry.attempt < job.retry.max_attempts {
298 job.retry.attempt += 1;
299 job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
300 let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
301 .min(i64::MAX as u64) as i64;
302 job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
303 } else {
304 Self::clear_retry_schedule(&mut job.retry);
305 }
306 job.updated_at = now;
307 Self::push_history(job, "failed");
308 }
309 }
310
311 pub fn cancel(&mut self, id: &str) {
313 if let Some(job) = self.jobs.get_mut(id) {
314 job.status = JobStatus::Cancelled;
315 Self::clear_retry_schedule(&mut job.retry);
316 job.updated_at = Self::now_ts();
317 Self::push_history(job, "cancelled");
318 }
319 }
320
321 pub fn pause(&mut self, id: &str, detail: Option<String>) {
323 if let Some(job) = self.jobs.get_mut(id) {
324 job.status = JobStatus::Paused;
325 if detail.is_some() {
326 job.detail = detail;
327 }
328 job.updated_at = Self::now_ts();
329 Self::push_history(job, "paused");
330 }
331 }
332
333 pub fn resume(&mut self, id: &str, detail: Option<String>) {
335 if let Some(job) = self.jobs.get_mut(id) {
336 job.status = JobStatus::Running;
337 if detail.is_some() {
338 job.detail = detail;
339 }
340 Self::clear_retry_schedule(&mut job.retry);
341 job.updated_at = Self::now_ts();
342 Self::push_history(job, "resumed");
343 }
344 }
345
346 pub fn list(&self) -> Vec<JobRecord> {
348 let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
349 out.sort_by_key(|job| std::cmp::Reverse(job.updated_at));
350 out
351 }
352
353 pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
355 self.jobs
356 .get(id)
357 .map(|job| job.history.clone())
358 .unwrap_or_default()
359 }
360
361 pub fn resume_pending(&mut self) -> Vec<JobRecord> {
363 let mut resumed = Vec::new();
364 for job in self.jobs.values_mut() {
365 if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
366 job.status = JobStatus::Queued;
367 job.updated_at = Self::now_ts();
368 Self::push_history(job, "queued_after_resume");
369 resumed.push(job.clone());
370 }
371 }
372 resumed
373 }
374
375 pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
377 let persisted = store.list_jobs(Some(500))?;
378 for job in persisted {
379 let fallback_status = job_state_status_to_runtime(job.status);
380 let parsed = Self::parse_persisted_detail(job.detail.as_deref());
381 let (status, detail, retry, history) = if let Some(detail_state) = parsed {
382 (
383 detail_state.status,
384 detail_state.detail,
385 detail_state.retry,
386 detail_state.history,
387 )
388 } else {
389 (
390 fallback_status,
391 job.detail,
392 JobRetryMetadata::default(),
393 Vec::new(),
394 )
395 };
396 self.jobs.insert(
397 job.id.clone(),
398 JobRecord {
399 id: job.id,
400 name: job.name,
401 status,
402 progress: job.progress,
403 detail,
404 retry,
405 history,
406 created_at: job.created_at,
407 updated_at: job.updated_at,
408 },
409 );
410 }
411 Ok(())
412 }
413
414 pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
416 let Some(job) = self.jobs.get(id) else {
417 return Ok(());
418 };
419 let encoded_detail = Self::encode_persisted_detail(job)?;
420 store.upsert_job(&JobStateRecord {
421 id: job.id.clone(),
422 name: job.name.clone(),
423 status: runtime_status_to_job_state(job.status),
424 progress: job.progress,
425 detail: encoded_detail,
426 created_at: job.created_at,
427 updated_at: job.updated_at,
428 })
429 }
430
431 pub fn persist_all(&self, store: &StateStore) -> Result<()> {
433 for id in self.jobs.keys() {
434 self.persist_job(store, id)?;
435 }
436 Ok(())
437 }
438}
439
440pub struct ThreadManager {
442 store: StateStore,
443 running_threads: HashMap<String, Thread>,
444 cli_version: String,
445}
446
447impl ThreadManager {
448 pub fn new(store: StateStore) -> Self {
450 Self {
451 store,
452 running_threads: HashMap::new(),
453 cli_version: env!("CARGO_PKG_VERSION").to_string(),
454 }
455 }
456
457 pub fn state_store(&self) -> &StateStore {
459 &self.store
460 }
461
462 pub fn spawn_thread_with_history(
464 &mut self,
465 model_provider: String,
466 cwd: PathBuf,
467 initial_history: InitialHistory,
468 persist_extended_history: bool,
469 ) -> Result<NewThread> {
470 let id = format!("thread-{}", Uuid::new_v4());
471 let now = chrono::Utc::now().timestamp();
472 let preview = preview_from_initial_history(&initial_history);
473 let source = match initial_history {
474 InitialHistory::New => SessionSource::Interactive,
475 InitialHistory::Forked(_) => SessionSource::Fork,
476 InitialHistory::Resumed { .. } => SessionSource::Resume,
477 };
478 let thread = Thread {
479 id: id.clone(),
480 preview,
481 ephemeral: !persist_extended_history,
482 model_provider: model_provider.clone(),
483 created_at: now,
484 updated_at: now,
485 status: ThreadStatus::Running,
486 path: None,
487 cwd: cwd.clone(),
488 cli_version: self.cli_version.clone(),
489 source: match source {
490 SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
491 SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
492 SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
493 SessionSource::Api => codewhale_protocol::SessionSource::Api,
494 SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
495 },
496 name: None,
497 };
498 self.persist_thread(&thread, None)?;
499 match &initial_history {
500 InitialHistory::Forked(items) => {
501 for item in items {
502 self.store.append_message(
503 &thread.id,
504 "history",
505 &item.to_string(),
506 Some(item.clone()),
507 )?;
508 }
509 }
510 InitialHistory::Resumed { history, .. } => {
511 for item in history {
512 self.store.append_message(
513 &thread.id,
514 "history",
515 &item.to_string(),
516 Some(item.clone()),
517 )?;
518 }
519 }
520 InitialHistory::New => {}
521 }
522 self.running_threads
523 .insert(thread.id.clone(), thread.clone());
524 Ok(NewThread {
525 thread,
526 model: "auto".to_string(),
527 model_provider,
528 cwd,
529 approval_policy: None,
530 sandbox: None,
531 })
532 }
533
534 pub fn resume_thread_with_history(
536 &mut self,
537 params: &ThreadResumeParams,
538 fallback_cwd: &Path,
539 model_provider: String,
540 ) -> Result<Option<NewThread>> {
541 if params.history.is_none()
542 && let Some(thread) = self.running_threads.get(¶ms.thread_id).cloned()
543 {
544 return Ok(Some(NewThread {
545 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
546 model_provider: params.model_provider.clone().unwrap_or(model_provider),
547 cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
548 approval_policy: params.approval_policy.clone(),
549 sandbox: params.sandbox.clone(),
550 thread,
551 }));
552 }
553
554 let persisted = self.store.get_thread(¶ms.thread_id)?;
555 let Some(metadata) = persisted else {
556 return Ok(None);
557 };
558 let mut thread = to_protocol_thread(metadata);
559 thread.status = ThreadStatus::Running;
560 thread.updated_at = chrono::Utc::now().timestamp();
561 thread.cwd = params
562 .cwd
563 .clone()
564 .unwrap_or_else(|| fallback_cwd.to_path_buf());
565 self.persist_thread(&thread, None)?;
566 self.running_threads
567 .insert(thread.id.clone(), thread.clone());
568 if let Some(history) = params.history.as_ref() {
569 for item in history {
570 self.store.append_message(
571 &thread.id,
572 "history",
573 &item.to_string(),
574 Some(item.clone()),
575 )?;
576 }
577 }
578
579 Ok(Some(NewThread {
580 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
581 model_provider: params.model_provider.clone().unwrap_or(model_provider),
582 cwd: thread.cwd.clone(),
583 approval_policy: params.approval_policy.clone(),
584 sandbox: params.sandbox.clone(),
585 thread,
586 }))
587 }
588
589 pub fn fork_thread(
591 &mut self,
592 params: &ThreadForkParams,
593 fallback_cwd: &Path,
594 ) -> Result<Option<NewThread>> {
595 let parent = self.store.get_thread(¶ms.thread_id)?;
596 let Some(parent) = parent else {
597 return Ok(None);
598 };
599 let parent_thread = to_protocol_thread(parent);
600 let new = self.spawn_thread_with_history(
601 params
602 .model_provider
603 .clone()
604 .unwrap_or_else(|| parent_thread.model_provider.clone()),
605 params
606 .cwd
607 .clone()
608 .unwrap_or_else(|| fallback_cwd.to_path_buf()),
609 InitialHistory::Forked(vec![json!({
610 "type": "fork",
611 "from_thread_id": parent_thread.id
612 })]),
613 params.persist_extended_history,
614 )?;
615 Ok(Some(new))
616 }
617
618 pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
620 let list = self.store.list_threads(ThreadListFilters {
621 include_archived: params.include_archived,
622 limit: params.limit,
623 })?;
624 Ok(list.into_iter().map(to_protocol_thread).collect())
625 }
626
627 pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
629 Ok(self
630 .store
631 .get_thread(¶ms.thread_id)?
632 .map(to_protocol_thread))
633 }
634
635 pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
637 let Some(mut metadata) = self.store.get_thread(¶ms.thread_id)? else {
638 return Ok(None);
639 };
640 metadata.name = Some(params.name.clone());
641 metadata.updated_at = chrono::Utc::now().timestamp();
642 self.store.upsert_thread(&metadata)?;
643 let updated = to_protocol_thread(metadata);
644 self.running_threads
645 .insert(updated.id.clone(), updated.clone());
646 Ok(Some(updated))
647 }
648
649 pub fn set_thread_goal(&mut self, params: &ThreadGoalSetParams) -> Result<Option<ThreadGoal>> {
651 if self.store.get_thread(¶ms.thread_id)?.is_none() {
652 return Ok(None);
653 }
654 let now = chrono::Utc::now().timestamp();
655 let goal = ThreadGoalRecord {
656 thread_id: params.thread_id.clone(),
657 goal_id: format!("goal-{}", Uuid::new_v4()),
658 objective: params.objective.clone(),
659 status: PersistedThreadGoalStatus::Active,
660 token_budget: params.token_budget,
661 tokens_used: 0,
662 time_used_seconds: 0,
663 continuation_count: 0,
664 created_at: now,
665 updated_at: now,
666 };
667 self.store.upsert_thread_goal(&goal)?;
668 Ok(Some(to_protocol_goal(goal)))
669 }
670
671 pub fn get_thread_goal(&self, params: &ThreadGoalGetParams) -> Result<Option<ThreadGoal>> {
673 Ok(self
674 .store
675 .get_thread_goal(¶ms.thread_id)?
676 .map(to_protocol_goal))
677 }
678
679 pub fn record_thread_goal_progress(
681 &mut self,
682 params: &ThreadGoalProgressParams,
683 ) -> Result<Option<ThreadGoal>> {
684 if self.store.get_thread(¶ms.thread_id)?.is_none() {
685 return Ok(None);
686 }
687
688 let now = chrono::Utc::now().timestamp();
689 let mut goal = if params.token_delta != 0 || params.time_delta_seconds != 0 {
690 self.store.record_thread_goal_usage(
691 ¶ms.thread_id,
692 params.token_delta,
693 params.time_delta_seconds,
694 now,
695 )?
696 } else {
697 self.store.get_thread_goal(¶ms.thread_id)?
698 };
699
700 if params.record_continuation {
701 goal = self
702 .store
703 .record_thread_goal_continuation(¶ms.thread_id, now)?;
704 }
705
706 Ok(goal.map(to_protocol_goal))
707 }
708
709 pub fn clear_thread_goal(&mut self, params: &ThreadGoalClearParams) -> Result<bool> {
711 self.store.delete_thread_goal(¶ms.thread_id)
712 }
713
714 pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
716 self.store.mark_archived(thread_id)?;
717 if let Some(thread) = self.running_threads.get_mut(thread_id) {
718 thread.status = ThreadStatus::Archived;
719 }
720 Ok(())
721 }
722
723 pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
725 self.store.mark_unarchived(thread_id)?;
726 Ok(())
727 }
728
729 pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
731 let Some(mut metadata) = self.store.get_thread(thread_id)? else {
732 return Ok(());
733 };
734 metadata.updated_at = chrono::Utc::now().timestamp();
735 metadata.preview = truncate_preview(input);
736 metadata.status = PersistedThreadStatus::Running;
737 self.store.upsert_thread(&metadata)?;
738 if let Some(thread) = self.running_threads.get_mut(thread_id) {
739 thread.updated_at = metadata.updated_at;
740 thread.preview = metadata.preview;
741 thread.status = ThreadStatus::Running;
742 }
743 let message_id = self.store.append_message(thread_id, "user", input, None)?;
744 self.store.save_checkpoint(
745 thread_id,
746 "latest",
747 &json!({
748 "reason": "thread_message",
749 "message_id": message_id,
750 "role": "user",
751 "preview": truncate_preview(input),
752 "updated_at": metadata.updated_at
753 }),
754 )?;
755 Ok(())
756 }
757
758 fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
759 self.store.upsert_thread(&ThreadMetadata {
760 id: thread.id.clone(),
761 rollout_path,
762 preview: thread.preview.clone(),
763 ephemeral: thread.ephemeral,
764 model_provider: thread.model_provider.clone(),
765 created_at: thread.created_at,
766 updated_at: thread.updated_at,
767 status: to_persisted_status(&thread.status),
768 path: thread.path.clone(),
769 cwd: thread.cwd.clone(),
770 cli_version: thread.cli_version.clone(),
771 source: to_persisted_source(&thread.source),
772 name: thread.name.clone(),
773 sandbox_policy: None,
774 approval_mode: None,
775 archived: matches!(thread.status, ThreadStatus::Archived),
776 archived_at: None,
777 git_sha: None,
778 git_branch: None,
779 git_origin_url: None,
780 memory_mode: None,
781 current_leaf_id: None,
782 })
783 }
784}
785
786pub struct Runtime {
788 pub config: ConfigToml,
790 pub model_registry: ModelRegistry,
792 pub thread_manager: ThreadManager,
794 pub tool_registry: Arc<ToolRegistry>,
796 pub mcp_manager: Arc<McpManager>,
798 pub exec_policy: ExecPolicyEngine,
800 pub hooks: HookDispatcher,
802 pub jobs: JobManager,
804}
805
806impl Runtime {
807 pub fn new(
809 config: ConfigToml,
810 model_registry: ModelRegistry,
811 state: StateStore,
812 tool_registry: Arc<ToolRegistry>,
813 mcp_manager: Arc<McpManager>,
814 exec_policy: ExecPolicyEngine,
815 hooks: HookDispatcher,
816 ) -> Self {
817 let mut jobs = JobManager::default();
818 if let Err(e) = jobs.load_from_store(&state) {
819 tracing::warn!("Failed to load job store, starting with empty job list: {e}");
820 }
821 Self {
822 config,
823 model_registry,
824 thread_manager: ThreadManager::new(state),
825 tool_registry,
826 mcp_manager,
827 exec_policy,
828 hooks,
829 jobs,
830 }
831 }
832
833 fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
834 let history = self
835 .thread_manager
836 .state_store()
837 .list_messages(thread_id, Some(500))?
838 .into_iter()
839 .map(|message| {
840 json!({
841 "id": message.id,
842 "role": message.role,
843 "content": message.content,
844 "item": message.item,
845 "created_at": message.created_at
846 })
847 })
848 .collect::<Vec<_>>();
849
850 let checkpoint = self
851 .thread_manager
852 .state_store()
853 .load_checkpoint(thread_id, None)?
854 .map(|record| {
855 json!({
856 "checkpoint_id": record.checkpoint_id,
857 "state": record.state,
858 "created_at": record.created_at
859 })
860 });
861
862 let goal = self
863 .thread_manager
864 .state_store()
865 .get_thread_goal(thread_id)?
866 .map(to_protocol_goal);
867
868 Ok(json!({
869 "history": history,
870 "checkpoint": checkpoint,
871 "goal": goal
872 }))
873 }
874
875 fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
876 self.thread_manager.state_store().save_checkpoint(
877 thread_id,
878 "latest",
879 &json!({
880 "reason": reason,
881 "saved_at": chrono::Utc::now().timestamp(),
882 "state": state
883 }),
884 )
885 }
886
887 pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
889 match req {
890 ThreadRequest::Create { .. } => {
891 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
892 let new = self.thread_manager.spawn_thread_with_history(
893 "deepseek".to_string(),
894 cwd,
895 InitialHistory::New,
896 false,
897 )?;
898 let mut response = thread_response_from_new("created", new);
899 response.data = self.persisted_thread_data(&response.thread_id)?;
900 Ok(response)
901 }
902 ThreadRequest::Start(params) => {
903 let cwd = params.cwd.clone().unwrap_or_else(|| {
904 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
905 });
906 let new = self.thread_manager.spawn_thread_with_history(
907 params
908 .model_provider
909 .clone()
910 .unwrap_or_else(|| "deepseek".to_string()),
911 cwd,
912 InitialHistory::New,
913 params.persist_extended_history,
914 )?;
915 let mut response = thread_response_from_new("started", new);
916 response.data = self.persisted_thread_data(&response.thread_id)?;
917 Ok(response)
918 }
919 ThreadRequest::Resume(params) => {
920 let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
921 if let Some(new) = self.thread_manager.resume_thread_with_history(
922 ¶ms,
923 &fallback_cwd,
924 "deepseek".to_string(),
925 )? {
926 let mut response = thread_response_from_new("resumed", new);
927 response.data = self.persisted_thread_data(&response.thread_id)?;
928 Ok(response)
929 } else {
930 Ok(ThreadResponse {
931 thread_id: params.thread_id,
932 status: "missing".to_string(),
933 thread: None,
934 threads: Vec::new(),
935 goal: None,
936 model: None,
937 model_provider: None,
938 cwd: None,
939 approval_policy: params.approval_policy,
940 sandbox: params.sandbox,
941 events: Vec::new(),
942 data: json!({"error":"thread not found"}),
943 })
944 }
945 }
946 ThreadRequest::Fork(params) => {
947 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
948 if let Some(new) = self.thread_manager.fork_thread(¶ms, &cwd)? {
949 let mut response = thread_response_from_new("forked", new);
950 response.data = self.persisted_thread_data(&response.thread_id)?;
951 Ok(response)
952 } else {
953 Ok(ThreadResponse {
954 thread_id: params.thread_id,
955 status: "missing".to_string(),
956 thread: None,
957 threads: Vec::new(),
958 goal: None,
959 model: None,
960 model_provider: None,
961 cwd: None,
962 approval_policy: params.approval_policy,
963 sandbox: params.sandbox,
964 events: Vec::new(),
965 data: json!({"error":"thread not found"}),
966 })
967 }
968 }
969 ThreadRequest::List(params) => Ok(ThreadResponse {
970 thread_id: "list".to_string(),
971 status: "ok".to_string(),
972 thread: None,
973 threads: self.thread_manager.list_threads(¶ms)?,
974 goal: None,
975 model: None,
976 model_provider: None,
977 cwd: None,
978 approval_policy: None,
979 sandbox: None,
980 events: Vec::new(),
981 data: json!({}),
982 }),
983 ThreadRequest::Read(params) => {
984 let id = params.thread_id.clone();
985 let data = self.persisted_thread_data(&id)?;
986 Ok(ThreadResponse {
987 thread_id: id,
988 status: "ok".to_string(),
989 thread: self.thread_manager.read_thread(¶ms)?,
990 threads: Vec::new(),
991 goal: self.thread_manager.get_thread_goal(&ThreadGoalGetParams {
992 thread_id: params.thread_id,
993 })?,
994 model: None,
995 model_provider: None,
996 cwd: None,
997 approval_policy: None,
998 sandbox: None,
999 events: Vec::new(),
1000 data,
1001 })
1002 }
1003 ThreadRequest::SetName(params) => Ok(ThreadResponse {
1004 thread_id: params.thread_id.clone(),
1005 status: "ok".to_string(),
1006 thread: self.thread_manager.set_thread_name(¶ms)?,
1007 threads: Vec::new(),
1008 goal: None,
1009 model: None,
1010 model_provider: None,
1011 cwd: None,
1012 approval_policy: None,
1013 sandbox: None,
1014 events: Vec::new(),
1015 data: json!({}),
1016 }),
1017 ThreadRequest::GoalSet(params) => {
1018 let thread_id = params.thread_id.clone();
1019 if let Some(goal) = self.thread_manager.set_thread_goal(¶ms)? {
1020 Ok(ThreadResponse {
1021 thread_id,
1022 status: "ok".to_string(),
1023 thread: None,
1024 threads: Vec::new(),
1025 goal: Some(goal.clone()),
1026 model: None,
1027 model_provider: None,
1028 cwd: None,
1029 approval_policy: None,
1030 sandbox: None,
1031 events: vec![EventFrame::ThreadGoalUpdated { goal: goal.clone() }],
1032 data: json!({ "goal": goal }),
1033 })
1034 } else {
1035 Ok(ThreadResponse {
1036 thread_id,
1037 status: "missing".to_string(),
1038 thread: None,
1039 threads: Vec::new(),
1040 goal: None,
1041 model: None,
1042 model_provider: None,
1043 cwd: None,
1044 approval_policy: None,
1045 sandbox: None,
1046 events: Vec::new(),
1047 data: json!({"error":"thread not found"}),
1048 })
1049 }
1050 }
1051 ThreadRequest::GoalGet(params) => {
1052 let goal = self.thread_manager.get_thread_goal(¶ms)?;
1053 Ok(ThreadResponse {
1054 thread_id: params.thread_id,
1055 status: "ok".to_string(),
1056 thread: None,
1057 threads: Vec::new(),
1058 goal: goal.clone(),
1059 model: None,
1060 model_provider: None,
1061 cwd: None,
1062 approval_policy: None,
1063 sandbox: None,
1064 events: Vec::new(),
1065 data: json!({ "goal": goal }),
1066 })
1067 }
1068 ThreadRequest::GoalClear(params) => {
1069 let thread_id = params.thread_id.clone();
1070 let cleared = self.thread_manager.clear_thread_goal(¶ms)?;
1071 Ok(ThreadResponse {
1072 thread_id: thread_id.clone(),
1073 status: if cleared { "cleared" } else { "empty" }.to_string(),
1074 thread: None,
1075 threads: Vec::new(),
1076 goal: None,
1077 model: None,
1078 model_provider: None,
1079 cwd: None,
1080 approval_policy: None,
1081 sandbox: None,
1082 events: if cleared {
1083 vec![EventFrame::ThreadGoalCleared { thread_id }]
1084 } else {
1085 Vec::new()
1086 },
1087 data: json!({ "cleared": cleared }),
1088 })
1089 }
1090 ThreadRequest::GoalRecordProgress(params) => {
1091 let thread_id = params.thread_id.clone();
1092 if let Some(goal) = self.thread_manager.record_thread_goal_progress(¶ms)? {
1093 Ok(ThreadResponse {
1094 thread_id,
1095 status: "ok".to_string(),
1096 thread: None,
1097 threads: Vec::new(),
1098 goal: Some(goal.clone()),
1099 model: None,
1100 model_provider: None,
1101 cwd: None,
1102 approval_policy: None,
1103 sandbox: None,
1104 events: vec![EventFrame::ThreadGoalUpdated { goal: goal.clone() }],
1105 data: json!({ "goal": goal }),
1106 })
1107 } else {
1108 Ok(ThreadResponse {
1109 thread_id,
1110 status: "missing".to_string(),
1111 thread: None,
1112 threads: Vec::new(),
1113 goal: None,
1114 model: None,
1115 model_provider: None,
1116 cwd: None,
1117 approval_policy: None,
1118 sandbox: None,
1119 events: Vec::new(),
1120 data: json!({"error":"thread or goal not found"}),
1121 })
1122 }
1123 }
1124 ThreadRequest::Archive { thread_id } => {
1125 self.thread_manager.archive_thread(&thread_id)?;
1126 Ok(ThreadResponse {
1127 thread_id,
1128 status: "archived".to_string(),
1129 thread: None,
1130 threads: Vec::new(),
1131 goal: None,
1132 model: None,
1133 model_provider: None,
1134 cwd: None,
1135 approval_policy: None,
1136 sandbox: None,
1137 events: Vec::new(),
1138 data: json!({}),
1139 })
1140 }
1141 ThreadRequest::Unarchive { thread_id } => {
1142 self.thread_manager.unarchive_thread(&thread_id)?;
1143 Ok(ThreadResponse {
1144 thread_id,
1145 status: "unarchived".to_string(),
1146 thread: None,
1147 threads: Vec::new(),
1148 goal: None,
1149 model: None,
1150 model_provider: None,
1151 cwd: None,
1152 approval_policy: None,
1153 sandbox: None,
1154 events: Vec::new(),
1155 data: json!({}),
1156 })
1157 }
1158 ThreadRequest::Message { thread_id, input } => {
1159 self.thread_manager.touch_message(&thread_id, &input)?;
1160 let response_id = format!("{thread_id}:{}", input.len());
1161 self.hooks
1162 .emit(HookEvent::ResponseStart {
1163 response_id: response_id.clone(),
1164 })
1165 .await;
1166 self.hooks
1167 .emit(HookEvent::ResponseEnd {
1168 response_id: response_id.clone(),
1169 })
1170 .await;
1171
1172 Ok(ThreadResponse {
1173 thread_id,
1174 status: "accepted".to_string(),
1175 thread: None,
1176 threads: Vec::new(),
1177 goal: None,
1178 model: None,
1179 model_provider: None,
1180 cwd: None,
1181 approval_policy: None,
1182 sandbox: None,
1183 events: vec![
1184 EventFrame::ResponseStart {
1185 response_id: response_id.clone(),
1186 },
1187 EventFrame::ResponseDelta {
1188 response_id: response_id.clone(),
1189 delta: "queued".to_string(),
1190 channel: ResponseChannel::Text,
1191 },
1192 EventFrame::ResponseEnd { response_id },
1193 ],
1194 data: json!({}),
1195 })
1196 }
1197 }
1198 }
1199
1200 pub async fn handle_prompt(
1202 &mut self,
1203 req: PromptRequest,
1204 cli_overrides: &CliRuntimeOverrides,
1205 ) -> Result<PromptResponse> {
1206 let resolved = self.config.resolve_runtime_options(cli_overrides);
1207 let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
1208 let selection = self
1209 .model_registry
1210 .resolve(Some(&requested_model), Some(resolved.provider));
1211 let resolved_model = selection.resolved.id.clone();
1212 let response_id = format!("resp-{}", Uuid::new_v4());
1213
1214 self.hooks
1215 .emit(HookEvent::ResponseStart {
1216 response_id: response_id.clone(),
1217 })
1218 .await;
1219 self.hooks
1220 .emit(HookEvent::ResponseDelta {
1221 response_id: response_id.clone(),
1222 delta: "model-selected".to_string(),
1223 })
1224 .await;
1225 self.hooks
1226 .emit(HookEvent::ResponseEnd {
1227 response_id: response_id.clone(),
1228 })
1229 .await;
1230
1231 let payload = json!({
1232 "provider": resolved.provider.as_str(),
1233 "model": resolved_model.clone(),
1234 "prompt": req.prompt,
1235 "telemetry": resolved.telemetry,
1236 "base_url": resolved.base_url,
1237 "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
1238 "approval_policy": resolved.approval_policy,
1239 "sandbox_mode": resolved.sandbox_mode
1240 });
1241 if let Some(thread_id) = req.thread_id.as_ref() {
1242 self.thread_manager.touch_message(thread_id, &req.prompt)?;
1243 let assistant_message_id = self.thread_manager.store.append_message(
1244 thread_id,
1245 "assistant",
1246 &payload.to_string(),
1247 Some(payload.clone()),
1248 )?;
1249 self.persist_latest_checkpoint(
1250 thread_id,
1251 "prompt_response",
1252 json!({
1253 "response_id": response_id.clone(),
1254 "model": resolved_model.clone(),
1255 "provider": resolved.provider.as_str(),
1256 "assistant_message_id": assistant_message_id
1257 }),
1258 )?;
1259 }
1260
1261 Ok(PromptResponse {
1262 output: payload.to_string(),
1263 model: resolved_model,
1264 events: vec![
1265 EventFrame::ResponseStart {
1266 response_id: response_id.clone(),
1267 },
1268 EventFrame::ResponseDelta {
1269 response_id: response_id.clone(),
1270 delta: "model-selected".to_string(),
1271 channel: ResponseChannel::Text,
1272 },
1273 EventFrame::ResponseEnd { response_id },
1274 ],
1275 })
1276 }
1277
1278 pub async fn invoke_tool(
1280 &self,
1281 call: ToolCall,
1282 approval_mode: AskForApproval,
1283 cwd: &Path,
1284 ) -> Result<Value> {
1285 let fallback_cwd = cwd.display().to_string();
1286 let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
1287 let policy_tool = match &call.payload {
1288 ToolPayload::LocalShell { .. } => "exec_shell",
1289 _ => call.name.as_str(),
1290 };
1291 let policy_path = permission_path_for_call(&call);
1292 let decision = self.exec_policy.check(ExecPolicyContext {
1293 command: &command,
1294 cwd: &policy_cwd,
1295 tool: Some(policy_tool),
1296 path: policy_path.as_deref(),
1297 ask_for_approval: approval_mode,
1298 sandbox_mode: None,
1299 })?;
1300 let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
1301 let response_id = format!("tool-{}", Uuid::new_v4());
1302 let call_id = call
1303 .raw_tool_call_id
1304 .clone()
1305 .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
1306 self.hooks
1307 .emit(HookEvent::ToolLifecycle {
1308 response_id: response_id.clone(),
1309 tool_name: call.name.clone(),
1310 phase: "precheck".to_string(),
1311 payload: precheck.clone(),
1312 })
1313 .await;
1314
1315 if !decision.allow {
1316 let reason = decision.reason().to_string();
1317 let approval_id = format!("approval-{}", Uuid::new_v4());
1318 let error_frame = EventFrame::Error {
1319 response_id: response_id.clone(),
1320 message: reason.clone(),
1321 };
1322 self.hooks
1323 .emit(HookEvent::ApprovalLifecycle {
1324 approval_id,
1325 phase: "denied".to_string(),
1326 reason: Some(reason.clone()),
1327 })
1328 .await;
1329 self.hooks
1330 .emit(HookEvent::GenericEventFrame {
1331 frame: Box::new(error_frame.clone()),
1332 })
1333 .await;
1334 return Ok(json!({
1335 "ok": false,
1336 "status": "denied",
1337 "execution_kind": execution_kind,
1338 "response_id": response_id,
1339 "precheck": precheck,
1340 "error": reason,
1341 "events": [event_frame_payload(&error_frame)],
1342 }));
1343 }
1344
1345 if decision.requires_approval {
1346 let approval_id = format!("approval-{}", Uuid::new_v4());
1347 let reason = decision.reason().to_string();
1348 let maybe_approval_frame = approval_request_frame(
1349 &decision.requirement,
1350 decision.matched_rule.as_deref(),
1351 call_id,
1352 approval_id.clone(),
1353 response_id.clone(),
1354 command.clone(),
1355 policy_cwd.clone(),
1356 );
1357 self.hooks
1358 .emit(HookEvent::ApprovalLifecycle {
1359 approval_id: approval_id.clone(),
1360 phase: "requested".to_string(),
1361 reason: Some(reason.clone()),
1362 })
1363 .await;
1364 let mut events = Vec::new();
1365 if let Some(frame) = maybe_approval_frame {
1366 self.hooks
1367 .emit(HookEvent::GenericEventFrame {
1368 frame: Box::new(frame.clone()),
1369 })
1370 .await;
1371 events.push(event_frame_payload(&frame));
1372 }
1373 return Ok(json!({
1374 "ok": false,
1375 "status": "approval_required",
1376 "execution_kind": execution_kind,
1377 "response_id": response_id,
1378 "approval_id": approval_id,
1379 "precheck": precheck,
1380 "error": reason,
1381 "events": events,
1382 }));
1383 }
1384
1385 if call.name == REQUEST_USER_INPUT_TOOL_NAME {
1394 let request_id = format!("user-input-{}", Uuid::new_v4());
1395 let arguments = match &call.payload {
1396 ToolPayload::Function { arguments } => arguments.as_str(),
1397 _ => "",
1400 };
1401 let maybe_frame = user_input_request_frame(
1402 call_id.clone(),
1403 response_id.clone(),
1404 request_id.clone(),
1405 arguments,
1406 );
1407 let mut events = Vec::new();
1408 if let Some(frame) = maybe_frame {
1409 self.hooks
1410 .emit(HookEvent::GenericEventFrame {
1411 frame: Box::new(frame.clone()),
1412 })
1413 .await;
1414 events.push(event_frame_payload(&frame));
1415 }
1416 return Ok(json!({
1417 "ok": false,
1418 "status": "user_input_required",
1419 "execution_kind": execution_kind,
1420 "response_id": response_id,
1421 "request_id": request_id,
1422 "precheck": precheck,
1423 "events": events,
1424 }));
1425 }
1426
1427 let start_frame = EventFrame::ToolCallStart {
1428 response_id: response_id.clone(),
1429 tool_name: call.name.clone(),
1430 arguments: tool_payload_value(&call.payload),
1431 };
1432 self.hooks
1433 .emit(HookEvent::GenericEventFrame {
1434 frame: Box::new(start_frame.clone()),
1435 })
1436 .await;
1437 self.hooks
1438 .emit(HookEvent::ToolLifecycle {
1439 response_id: response_id.clone(),
1440 tool_name: call.name.clone(),
1441 phase: "dispatching".to_string(),
1442 payload: json!({
1443 "call_id": call_id,
1444 "execution_kind": execution_kind
1445 }),
1446 })
1447 .await;
1448
1449 match self.tool_registry.dispatch(call.clone(), true).await {
1450 Ok(tool_output) => {
1451 let result_frame = EventFrame::ToolCallResult {
1452 response_id: response_id.clone(),
1453 tool_name: call.name.clone(),
1454 output: tool_output_value(&tool_output),
1455 };
1456 self.hooks
1457 .emit(HookEvent::GenericEventFrame {
1458 frame: Box::new(result_frame.clone()),
1459 })
1460 .await;
1461 self.hooks
1462 .emit(HookEvent::ToolLifecycle {
1463 response_id: response_id.clone(),
1464 tool_name: call.name,
1465 phase: "completed".to_string(),
1466 payload: json!({ "ok": true }),
1467 })
1468 .await;
1469 Ok(json!({
1470 "ok": true,
1471 "status": "completed",
1472 "execution_kind": execution_kind,
1473 "response_id": response_id,
1474 "precheck": precheck,
1475 "output": tool_output,
1476 "events": [
1477 event_frame_payload(&start_frame),
1478 event_frame_payload(&result_frame)
1479 ]
1480 }))
1481 }
1482 Err(err) => {
1483 let message = format!("{err:?}");
1484 let error_frame = EventFrame::Error {
1485 response_id: response_id.clone(),
1486 message: message.clone(),
1487 };
1488 self.hooks
1489 .emit(HookEvent::GenericEventFrame {
1490 frame: Box::new(error_frame.clone()),
1491 })
1492 .await;
1493 self.hooks
1494 .emit(HookEvent::ToolLifecycle {
1495 response_id: response_id.clone(),
1496 tool_name: call.name,
1497 phase: "failed".to_string(),
1498 payload: json!({ "error": message.clone() }),
1499 })
1500 .await;
1501 Ok(json!({
1502 "ok": false,
1503 "status": "failed",
1504 "execution_kind": execution_kind,
1505 "response_id": response_id,
1506 "precheck": precheck,
1507 "error": message,
1508 "events": [
1509 event_frame_payload(&start_frame),
1510 event_frame_payload(&error_frame)
1511 ]
1512 }))
1513 }
1514 }
1515 }
1516
1517 pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1519 let mut updates = Vec::new();
1520 let summary = self.mcp_manager.start_all(|update| {
1521 updates.push(update);
1522 });
1523 for update in updates {
1524 let status = match update.status {
1525 McpManagerStartupStatus::Starting => codewhale_protocol::McpStartupStatus::Starting,
1526 McpManagerStartupStatus::Ready => codewhale_protocol::McpStartupStatus::Ready,
1527 McpManagerStartupStatus::Failed { error } => {
1528 codewhale_protocol::McpStartupStatus::Failed { error }
1529 }
1530 McpManagerStartupStatus::Cancelled => {
1531 codewhale_protocol::McpStartupStatus::Cancelled
1532 }
1533 };
1534 self.hooks
1535 .emit(HookEvent::GenericEventFrame {
1536 frame: Box::new(EventFrame::McpStartupUpdate {
1537 update: codewhale_protocol::McpStartupUpdateEvent {
1538 server_name: update.server_name,
1539 status,
1540 },
1541 }),
1542 })
1543 .await;
1544 }
1545 self.hooks
1546 .emit(HookEvent::GenericEventFrame {
1547 frame: Box::new(EventFrame::McpStartupComplete {
1548 summary: codewhale_protocol::McpStartupCompleteEvent {
1549 ready: summary.ready.clone(),
1550 failed: summary
1551 .failed
1552 .iter()
1553 .map(|f| codewhale_protocol::McpStartupFailure {
1554 server_name: f.server_name.clone(),
1555 error: f.error.clone(),
1556 })
1557 .collect(),
1558 cancelled: summary.cancelled.clone(),
1559 },
1560 }),
1561 })
1562 .await;
1563 summary
1564 }
1565
1566 pub fn app_status(&self) -> AppResponse {
1568 let jobs = self.jobs.list();
1569 let events = jobs
1570 .iter()
1571 .flat_map(|job| {
1572 job.history.iter().map(|entry| EventFrame::ResponseDelta {
1573 response_id: job.id.clone(),
1574 delta: json!({
1575 "kind": "job_transition",
1576 "job_id": job.id.clone(),
1577 "phase": entry.phase.clone(),
1578 "status": job_status_to_str(entry.status),
1579 "progress": entry.progress,
1580 "detail": entry.detail.clone(),
1581 "retry": job_retry_to_value(&entry.retry),
1582 "at": entry.at
1583 })
1584 .to_string(),
1585 channel: ResponseChannel::Text,
1586 })
1587 })
1588 .collect::<Vec<_>>();
1589 AppResponse {
1590 ok: true,
1591 data: json!({
1592 "jobs": jobs.into_iter().map(|job| {
1593 json!({
1594 "id": job.id,
1595 "name": job.name,
1596 "status": job_status_to_str(job.status),
1597 "progress": job.progress,
1598 "detail": job.detail,
1599 "retry": job_retry_to_value(&job.retry),
1600 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1601 })
1602 }).collect::<Vec<_>>()
1603 }),
1604 events,
1605 }
1606 }
1607
1608 pub fn provider_default(&self) -> ProviderKind {
1610 self.config.provider
1611 }
1612
1613 pub fn save_thread_checkpoint(
1615 &self,
1616 thread_id: &str,
1617 checkpoint_id: &str,
1618 state: &Value,
1619 ) -> Result<()> {
1620 self.thread_manager
1621 .state_store()
1622 .save_checkpoint(thread_id, checkpoint_id, state)
1623 }
1624
1625 pub fn load_thread_checkpoint(
1627 &self,
1628 thread_id: &str,
1629 checkpoint_id: Option<&str>,
1630 ) -> Result<Option<Value>> {
1631 Ok(self
1632 .thread_manager
1633 .state_store()
1634 .load_checkpoint(thread_id, checkpoint_id)?
1635 .map(|checkpoint| checkpoint.state))
1636 }
1637
1638 pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1640 let job = self.jobs.enqueue(name);
1641 self.jobs
1642 .persist_job(self.thread_manager.state_store(), &job.id)?;
1643 Ok(job)
1644 }
1645
1646 pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1648 self.jobs.set_running(job_id);
1649 self.jobs
1650 .persist_job(self.thread_manager.state_store(), job_id)
1651 }
1652
1653 pub fn update_job_progress(
1655 &mut self,
1656 job_id: &str,
1657 progress: u8,
1658 detail: Option<String>,
1659 ) -> Result<()> {
1660 self.jobs.update_progress(job_id, progress, detail);
1661 self.jobs
1662 .persist_job(self.thread_manager.state_store(), job_id)
1663 }
1664
1665 pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1667 self.jobs.complete(job_id);
1668 self.jobs
1669 .persist_job(self.thread_manager.state_store(), job_id)
1670 }
1671
1672 pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1674 self.jobs.fail(job_id, detail);
1675 self.jobs
1676 .persist_job(self.thread_manager.state_store(), job_id)
1677 }
1678
1679 pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1681 self.jobs.cancel(job_id);
1682 self.jobs
1683 .persist_job(self.thread_manager.state_store(), job_id)
1684 }
1685
1686 pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1688 self.jobs.pause(job_id, detail);
1689 self.jobs
1690 .persist_job(self.thread_manager.state_store(), job_id)
1691 }
1692
1693 pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1695 self.jobs.resume(job_id, detail);
1696 self.jobs
1697 .persist_job(self.thread_manager.state_store(), job_id)
1698 }
1699
1700 pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
1702 self.jobs.history(job_id)
1703 }
1704}
1705
1706fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1707 ThreadResponse {
1708 thread_id: new.thread.id.clone(),
1709 status: status.to_string(),
1710 thread: Some(new.thread),
1711 threads: Vec::new(),
1712 goal: None,
1713 model: Some(new.model),
1714 model_provider: Some(new.model_provider),
1715 cwd: Some(new.cwd),
1716 approval_policy: new.approval_policy,
1717 sandbox: new.sandbox,
1718 events: Vec::new(),
1719 data: json!({}),
1720 }
1721}
1722
1723fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1724 match initial_history {
1725 InitialHistory::New => "New conversation".to_string(),
1726 InitialHistory::Forked(items) => truncate_preview(
1727 &items
1728 .first()
1729 .map(Value::to_string)
1730 .unwrap_or_else(|| "Forked conversation".to_string()),
1731 ),
1732 InitialHistory::Resumed { history, .. } => truncate_preview(
1733 &history
1734 .first()
1735 .map(Value::to_string)
1736 .unwrap_or_else(|| "Resumed conversation".to_string()),
1737 ),
1738 }
1739}
1740
1741fn permission_path_for_call(call: &ToolCall) -> Option<String> {
1742 match &call.payload {
1743 ToolPayload::Function { arguments } => serde_json::from_str::<Value>(arguments)
1744 .ok()
1745 .and_then(|value| {
1746 value
1747 .get("path")
1748 .and_then(Value::as_str)
1749 .map(str::to_string)
1750 }),
1751 ToolPayload::Mcp { raw_arguments, .. } => raw_arguments
1752 .get("path")
1753 .and_then(Value::as_str)
1754 .map(str::to_string),
1755 ToolPayload::Custom { .. } | ToolPayload::LocalShell { .. } => None,
1756 }
1757}
1758
1759fn truncate_preview(value: &str) -> String {
1760 value.chars().take(120).collect()
1761}
1762
1763fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1764 Thread {
1765 id: thread.id,
1766 preview: thread.preview,
1767 ephemeral: thread.ephemeral,
1768 model_provider: thread.model_provider,
1769 created_at: thread.created_at,
1770 updated_at: thread.updated_at,
1771 status: match thread.status {
1772 PersistedThreadStatus::Running => ThreadStatus::Running,
1773 PersistedThreadStatus::Idle => ThreadStatus::Idle,
1774 PersistedThreadStatus::Completed => ThreadStatus::Completed,
1775 PersistedThreadStatus::Failed => ThreadStatus::Failed,
1776 PersistedThreadStatus::Paused => ThreadStatus::Paused,
1777 PersistedThreadStatus::Archived => ThreadStatus::Archived,
1778 },
1779 path: thread.path,
1780 cwd: thread.cwd,
1781 cli_version: thread.cli_version,
1782 source: match thread.source {
1783 SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
1784 SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
1785 SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
1786 SessionSource::Api => codewhale_protocol::SessionSource::Api,
1787 SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
1788 },
1789 name: thread.name,
1790 }
1791}
1792
1793fn to_protocol_goal(goal: ThreadGoalRecord) -> ThreadGoal {
1794 ThreadGoal {
1795 thread_id: goal.thread_id,
1796 goal_id: goal.goal_id,
1797 objective: goal.objective,
1798 status: to_protocol_goal_status(goal.status),
1799 token_budget: goal.token_budget,
1800 tokens_used: goal.tokens_used,
1801 time_used_seconds: goal.time_used_seconds,
1802 continuation_count: goal.continuation_count,
1803 created_at: goal.created_at,
1804 updated_at: goal.updated_at,
1805 }
1806}
1807
1808fn to_protocol_goal_status(status: PersistedThreadGoalStatus) -> ThreadGoalStatus {
1809 match status {
1810 PersistedThreadGoalStatus::Active => ThreadGoalStatus::Active,
1811 PersistedThreadGoalStatus::Paused => ThreadGoalStatus::Paused,
1812 PersistedThreadGoalStatus::Blocked => ThreadGoalStatus::Blocked,
1813 PersistedThreadGoalStatus::UsageLimited => ThreadGoalStatus::UsageLimited,
1814 PersistedThreadGoalStatus::BudgetLimited => ThreadGoalStatus::BudgetLimited,
1815 PersistedThreadGoalStatus::Complete => ThreadGoalStatus::Complete,
1816 }
1817}
1818
1819fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1820 match status {
1821 ThreadStatus::Running => PersistedThreadStatus::Running,
1822 ThreadStatus::Idle => PersistedThreadStatus::Idle,
1823 ThreadStatus::Completed => PersistedThreadStatus::Completed,
1824 ThreadStatus::Failed => PersistedThreadStatus::Failed,
1825 ThreadStatus::Paused => PersistedThreadStatus::Paused,
1826 ThreadStatus::Archived => PersistedThreadStatus::Archived,
1827 }
1828}
1829
1830fn to_persisted_source(source: &codewhale_protocol::SessionSource) -> SessionSource {
1831 match source {
1832 codewhale_protocol::SessionSource::Interactive => SessionSource::Interactive,
1833 codewhale_protocol::SessionSource::Resume => SessionSource::Resume,
1834 codewhale_protocol::SessionSource::Fork => SessionSource::Fork,
1835 codewhale_protocol::SessionSource::Api => SessionSource::Api,
1836 codewhale_protocol::SessionSource::Unknown => SessionSource::Unknown,
1837 }
1838}
1839
1840fn approval_request_frame(
1841 requirement: &ExecApprovalRequirement,
1842 matched_rule: Option<&str>,
1843 call_id: String,
1844 approval_id: String,
1845 turn_id: String,
1846 command: String,
1847 cwd: String,
1848) -> Option<EventFrame> {
1849 let ExecApprovalRequirement::NeedsApproval {
1850 reason,
1851 proposed_execpolicy_amendment,
1852 proposed_network_policy_amendments,
1853 } = requirement
1854 else {
1855 return None;
1856 };
1857
1858 let mut available_decisions = vec![
1859 ReviewDecision::Approved,
1860 ReviewDecision::ApprovedForSession,
1861 ReviewDecision::Denied,
1862 ReviewDecision::Abort,
1863 ];
1864 if proposed_execpolicy_amendment
1865 .as_ref()
1866 .is_some_and(|amendment| !amendment.prefixes.is_empty())
1867 {
1868 available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1869 }
1870 available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1871 |amendment| ReviewDecision::NetworkPolicyAmendment {
1872 host: amendment.host,
1873 action: amendment.action,
1874 },
1875 ));
1876
1877 Some(EventFrame::ExecApprovalRequest {
1878 request: ExecApprovalRequestEvent {
1879 call_id,
1880 approval_id,
1881 turn_id,
1882 command,
1883 cwd,
1884 reason: reason.clone(),
1885 matched_rule: matched_rule.map(|rule| rule.to_string().into_boxed_str()),
1886 network_approval_context: None,
1887 proposed_execpolicy_amendment: proposed_execpolicy_amendment
1888 .as_ref()
1889 .map(|amendment| amendment.prefixes.clone())
1890 .unwrap_or_default(),
1891 proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1892 additional_permissions: Vec::new(),
1893 available_decisions,
1894 },
1895 })
1896}
1897
1898fn user_input_request_frame(
1906 call_id: String,
1907 turn_id: String,
1908 request_id: String,
1909 arguments: &str,
1910) -> Option<EventFrame> {
1911 let parsed: Value = serde_json::from_str(arguments).ok()?;
1912 let questions = parsed.get("questions").cloned().filter(Value::is_array)?;
1916 let request = UserInputRequestEvent {
1917 call_id,
1918 turn_id,
1919 request_id,
1920 questions: serde_json::from_value(questions).ok()?,
1921 };
1922 Some(EventFrame::UserInputRequest { request })
1923}
1924
1925fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1926 match requirement {
1927 ExecApprovalRequirement::Skip {
1928 bypass_sandbox,
1929 proposed_execpolicy_amendment,
1930 } => json!({
1931 "type": "skip",
1932 "bypass_sandbox": bypass_sandbox,
1933 "reason": requirement.reason(),
1934 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1935 .as_ref()
1936 .map(|amendment| amendment.prefixes.clone())
1937 .unwrap_or_default()
1938 }),
1939 ExecApprovalRequirement::NeedsApproval {
1940 reason,
1941 proposed_execpolicy_amendment,
1942 proposed_network_policy_amendments,
1943 } => json!({
1944 "type": "needs_approval",
1945 "reason": reason,
1946 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1947 .as_ref()
1948 .map(|amendment| amendment.prefixes.clone())
1949 .unwrap_or_default(),
1950 "proposed_network_policy_amendments": proposed_network_policy_amendments
1951 }),
1952 ExecApprovalRequirement::Forbidden { reason } => json!({
1953 "type": "forbidden",
1954 "reason": reason
1955 }),
1956 }
1957}
1958
1959fn policy_precheck_payload(
1960 decision: &ExecPolicyDecision,
1961 command: &str,
1962 cwd: &str,
1963 execution_kind: &str,
1964) -> Value {
1965 json!({
1966 "execution_kind": execution_kind,
1967 "command": command,
1968 "cwd": cwd,
1969 "allow": decision.allow,
1970 "requires_approval": decision.requires_approval,
1971 "matched_rule": decision.matched_rule.clone(),
1972 "phase": decision.requirement.phase(),
1973 "reason": decision.reason(),
1974 "requirement": approval_requirement_payload(&decision.requirement)
1975 })
1976}
1977
1978fn tool_payload_value(payload: &ToolPayload) -> Value {
1979 serde_json::to_value(payload).unwrap_or_else(
1980 |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1981 )
1982}
1983
1984fn tool_output_value(output: &codewhale_protocol::ToolOutput) -> Value {
1985 serde_json::to_value(output).unwrap_or_else(
1986 |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1987 )
1988}
1989
1990fn event_frame_payload(frame: &EventFrame) -> Value {
1991 serde_json::to_value(frame)
1992 .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1993}
1994
1995const REQUEST_USER_INPUT_TOOL_NAME: &str = "request_user_input";
2001
2002fn json_optional_string(value: &Value) -> Option<String> {
2003 if value.is_null() {
2004 None
2005 } else {
2006 value.as_str().map(ToString::to_string)
2007 }
2008}
2009
2010fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
2011 let Some(value) = value else {
2012 return JobRetryMetadata::default();
2013 };
2014 JobRetryMetadata {
2015 attempt: value
2016 .get("attempt")
2017 .and_then(Value::as_u64)
2018 .unwrap_or(0)
2019 .min(u32::MAX as u64) as u32,
2020 max_attempts: value
2021 .get("max_attempts")
2022 .and_then(Value::as_u64)
2023 .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
2024 .min(u32::MAX as u64) as u32,
2025 backoff_base_ms: value
2026 .get("backoff_base_ms")
2027 .and_then(Value::as_u64)
2028 .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
2029 next_backoff_ms: value
2030 .get("next_backoff_ms")
2031 .and_then(Value::as_u64)
2032 .unwrap_or(0),
2033 next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
2034 }
2035}
2036
2037fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
2038 let status = value
2039 .get("status")
2040 .and_then(Value::as_str)
2041 .and_then(job_status_from_str)?;
2042 Some(JobHistoryEntry {
2043 at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
2044 phase: value
2045 .get("phase")
2046 .and_then(Value::as_str)
2047 .unwrap_or("unknown")
2048 .to_string(),
2049 status,
2050 progress: value
2051 .get("progress")
2052 .and_then(Value::as_u64)
2053 .map(|v| v.min(u8::MAX as u64) as u8),
2054 detail: value.get("detail").and_then(json_optional_string),
2055 retry: parse_retry_metadata(value.get("retry")),
2056 })
2057}
2058
2059fn job_status_to_str(status: JobStatus) -> &'static str {
2060 match status {
2061 JobStatus::Queued => "queued",
2062 JobStatus::Running => "running",
2063 JobStatus::Paused => "paused",
2064 JobStatus::Completed => "completed",
2065 JobStatus::Failed => "failed",
2066 JobStatus::Cancelled => "cancelled",
2067 }
2068}
2069
2070fn job_status_from_str(value: &str) -> Option<JobStatus> {
2071 match value {
2072 "queued" => Some(JobStatus::Queued),
2073 "running" => Some(JobStatus::Running),
2074 "paused" => Some(JobStatus::Paused),
2075 "completed" => Some(JobStatus::Completed),
2076 "failed" => Some(JobStatus::Failed),
2077 "cancelled" => Some(JobStatus::Cancelled),
2078 _ => None,
2079 }
2080}
2081
2082fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
2083 json!({
2084 "attempt": retry.attempt,
2085 "max_attempts": retry.max_attempts,
2086 "backoff_base_ms": retry.backoff_base_ms,
2087 "next_backoff_ms": retry.next_backoff_ms,
2088 "next_retry_at": retry.next_retry_at
2089 })
2090}
2091
2092fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
2093 json!({
2094 "at": entry.at,
2095 "phase": entry.phase.clone(),
2096 "status": job_status_to_str(entry.status),
2097 "progress": entry.progress,
2098 "detail": entry.detail.clone(),
2099 "retry": job_retry_to_value(&entry.retry)
2100 })
2101}
2102
2103fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
2104 match status {
2105 JobStatus::Queued => JobStateStatus::Queued,
2106 JobStatus::Running => JobStateStatus::Running,
2107 JobStatus::Paused => JobStateStatus::Running,
2108 JobStatus::Completed => JobStateStatus::Completed,
2109 JobStatus::Failed => JobStateStatus::Failed,
2110 JobStatus::Cancelled => JobStateStatus::Cancelled,
2111 }
2112}
2113
2114fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
2115 match status {
2116 JobStateStatus::Queued => JobStatus::Queued,
2117 JobStateStatus::Running => JobStatus::Running,
2118 JobStateStatus::Completed => JobStatus::Completed,
2119 JobStateStatus::Failed => JobStatus::Failed,
2120 JobStateStatus::Cancelled => JobStatus::Cancelled,
2121 }
2122}
2123
2124#[cfg(test)]
2125mod tests {
2126 use super::*;
2127 use codewhale_tools::ToolCallSource;
2128
2129 fn temp_core_state(name: &str) -> StateStore {
2130 let dir =
2131 std::env::temp_dir().join(format!("codewhale-core-{name}-{}", Uuid::new_v4().simple()));
2132 std::fs::create_dir_all(&dir).expect("create temp state dir");
2133 StateStore::open(Some(dir.join("state.db"))).expect("open state store")
2134 }
2135
2136 fn test_thread_metadata(id: &str) -> ThreadMetadata {
2137 ThreadMetadata {
2138 id: id.to_string(),
2139 rollout_path: None,
2140 preview: "test thread".to_string(),
2141 ephemeral: false,
2142 model_provider: "deepseek".to_string(),
2143 created_at: 10,
2144 updated_at: 10,
2145 status: PersistedThreadStatus::Running,
2146 path: None,
2147 cwd: PathBuf::from("/tmp/codewhale"),
2148 cli_version: "0.0.0-test".to_string(),
2149 source: SessionSource::Interactive,
2150 name: None,
2151 sandbox_policy: None,
2152 approval_mode: None,
2153 archived: false,
2154 archived_at: None,
2155 git_sha: None,
2156 git_branch: None,
2157 git_origin_url: None,
2158 memory_mode: None,
2159 current_leaf_id: None,
2160 }
2161 }
2162
2163 #[test]
2166 fn permission_path_for_call_extracts_function_path_argument() {
2167 let call = ToolCall {
2168 name: "read_file".to_string(),
2169 payload: ToolPayload::Function {
2170 arguments: json!({ "path": "README.md" }).to_string(),
2171 },
2172 source: ToolCallSource::Direct,
2173 raw_tool_call_id: None,
2174 };
2175
2176 assert_eq!(
2177 permission_path_for_call(&call).as_deref(),
2178 Some("README.md")
2179 );
2180 }
2181
2182 #[test]
2183 fn permission_path_for_call_extracts_mcp_path_argument() {
2184 let call = ToolCall {
2185 name: "mcp_fs_read".to_string(),
2186 payload: ToolPayload::Mcp {
2187 server: "fs".to_string(),
2188 tool: "read".to_string(),
2189 raw_arguments: json!({ "path": "secrets/token.txt" }),
2190 raw_tool_call_id: None,
2191 },
2192 source: ToolCallSource::Direct,
2193 raw_tool_call_id: None,
2194 };
2195
2196 assert_eq!(
2197 permission_path_for_call(&call).as_deref(),
2198 Some("secrets/token.txt")
2199 );
2200 }
2201
2202 #[test]
2203 fn permission_path_for_call_ignores_shell_payload() {
2204 let call = ToolCall {
2205 name: "exec_shell".to_string(),
2206 payload: ToolPayload::LocalShell {
2207 params: codewhale_protocol::LocalShellParams {
2208 command: "cargo test".to_string(),
2209 cwd: None,
2210 timeout_ms: None,
2211 },
2212 },
2213 source: ToolCallSource::Direct,
2214 raw_tool_call_id: None,
2215 };
2216
2217 assert_eq!(permission_path_for_call(&call), None);
2218 }
2219
2220 #[test]
2221 fn thread_goal_progress_accumulates_durable_accounting() {
2222 let store = temp_core_state("thread-goal-progress");
2223 store
2224 .upsert_thread(&test_thread_metadata("thread-1"))
2225 .expect("upsert thread");
2226 let mut manager = ThreadManager::new(store);
2227 manager
2228 .set_thread_goal(&ThreadGoalSetParams {
2229 thread_id: "thread-1".to_string(),
2230 objective: "Carry the goal across turns".to_string(),
2231 token_budget: Some(2_000),
2232 })
2233 .expect("set goal")
2234 .expect("goal exists");
2235
2236 let updated = manager
2237 .record_thread_goal_progress(&ThreadGoalProgressParams {
2238 thread_id: "thread-1".to_string(),
2239 token_delta: 750,
2240 time_delta_seconds: 12,
2241 record_continuation: true,
2242 })
2243 .expect("record progress")
2244 .expect("goal exists");
2245
2246 assert_eq!(updated.tokens_used, 750);
2247 assert_eq!(updated.time_used_seconds, 12);
2248 assert_eq!(updated.continuation_count, 1);
2249
2250 let persisted = manager
2251 .get_thread_goal(&ThreadGoalGetParams {
2252 thread_id: "thread-1".to_string(),
2253 })
2254 .expect("read goal")
2255 .expect("goal exists");
2256 assert_eq!(persisted.tokens_used, 750);
2257 assert_eq!(persisted.time_used_seconds, 12);
2258 assert_eq!(persisted.continuation_count, 1);
2259 }
2260
2261 #[test]
2262 fn approval_request_frame_includes_matched_rule() {
2263 let requirement = ExecApprovalRequirement::NeedsApproval {
2264 reason: "Typed ask rule 'tool=exec_shell command=cargo test' requires approval."
2265 .to_string(),
2266 proposed_execpolicy_amendment: None,
2267 proposed_network_policy_amendments: Vec::new(),
2268 };
2269
2270 let frame = approval_request_frame(
2271 &requirement,
2272 Some("tool=exec_shell command=cargo test"),
2273 "call-1".to_string(),
2274 "approval-1".to_string(),
2275 "turn-1".to_string(),
2276 "cargo test --workspace".to_string(),
2277 "/repo".to_string(),
2278 )
2279 .expect("approval frame");
2280
2281 let EventFrame::ExecApprovalRequest { request } = frame else {
2282 panic!("expected exec approval request frame");
2283 };
2284 assert_eq!(
2285 request.matched_rule.as_deref(),
2286 Some("tool=exec_shell command=cargo test")
2287 );
2288 assert_eq!(request.reason, requirement.reason());
2289 }
2290
2291 #[test]
2292 fn user_input_request_frame_lifts_questions_from_arguments() {
2293 let arguments = r#"{"questions":[{"header":"Scope","id":"scope","question":"Which?","options":[{"label":"A","description":"a"},{"label":"B","description":"b"}],"allow_free_text":true}]}"#;
2297 let frame = user_input_request_frame(
2298 "call-1".to_string(),
2299 "turn-1".to_string(),
2300 "ui-1".to_string(),
2301 arguments,
2302 )
2303 .expect("user input frame");
2304
2305 let EventFrame::UserInputRequest { request } = frame else {
2306 panic!("expected user_input_request frame");
2307 };
2308 assert_eq!(request.call_id, "call-1");
2309 assert_eq!(request.turn_id, "turn-1");
2310 assert_eq!(request.request_id, "ui-1");
2311 assert_eq!(request.questions.len(), 1);
2312 assert_eq!(request.questions[0].id, "scope");
2313 assert!(request.questions[0].allow_free_text);
2314 assert!(!request.questions[0].multi_select);
2316 assert_eq!(request.questions[0].options.len(), 2);
2317 }
2318
2319 #[test]
2320 fn user_input_request_frame_returns_none_on_invalid_arguments() {
2321 let frame = user_input_request_frame(
2324 "call-1".to_string(),
2325 "turn-1".to_string(),
2326 "ui-1".to_string(),
2327 "not json",
2328 );
2329 assert!(frame.is_none());
2330
2331 let frame = user_input_request_frame(
2333 "call-1".to_string(),
2334 "turn-1".to_string(),
2335 "ui-1".to_string(),
2336 r#"{"foo":"bar"}"#,
2337 );
2338 assert!(frame.is_none());
2339 }
2340
2341 #[test]
2342 fn enqueue_creates_queued_job_with_zero_progress() {
2343 let mut jm = JobManager::default();
2344 let job = jm.enqueue("build");
2345 assert_eq!(job.name, "build");
2346 assert_eq!(job.status, JobStatus::Queued);
2347 assert_eq!(job.progress, Some(0));
2348 assert!(job.detail.is_none());
2349 assert_eq!(job.history.len(), 1);
2350 assert_eq!(job.history[0].phase, "created");
2351 }
2352
2353 #[test]
2354 fn set_running_transitions_from_queued() {
2355 let mut jm = JobManager::default();
2356 let job = jm.enqueue("deploy");
2357 let id = job.id.clone();
2358 jm.set_running(&id);
2359 let jobs = jm.list();
2360 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2361 assert_eq!(updated.status, JobStatus::Running);
2362 assert_eq!(updated.history.last().unwrap().phase, "running");
2363 }
2364
2365 #[test]
2366 fn update_progress_clamps_to_100() {
2367 let mut jm = JobManager::default();
2368 let job = jm.enqueue("task");
2369 let id = job.id.clone();
2370 jm.update_progress(&id, 150, Some("over".to_string()));
2371 let jobs = jm.list();
2372 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2373 assert_eq!(updated.progress, Some(100));
2374 }
2375
2376 #[test]
2377 fn complete_sets_progress_to_100() {
2378 let mut jm = JobManager::default();
2379 let job = jm.enqueue("task");
2380 let id = job.id.clone();
2381 jm.set_running(&id);
2382 jm.complete(&id);
2383 let jobs = jm.list();
2384 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2385 assert_eq!(updated.status, JobStatus::Completed);
2386 assert_eq!(updated.progress, Some(100));
2387 }
2388
2389 #[test]
2390 fn fail_increments_attempt_and_sets_backoff() {
2391 let mut jm = JobManager::default();
2392 let job = jm.enqueue("fragile");
2393 let id = job.id.clone();
2394 jm.set_running(&id);
2395 jm.fail(&id, "crashed");
2396 let jobs = jm.list();
2397 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2398 assert_eq!(updated.status, JobStatus::Failed);
2399 assert_eq!(updated.retry.attempt, 1);
2400 assert!(updated.retry.next_backoff_ms > 0);
2401 assert!(updated.retry.next_retry_at.is_some());
2402 assert_eq!(updated.detail.as_deref(), Some("crashed"));
2403 }
2404
2405 #[test]
2406 fn fail_clears_retry_after_max_attempts() {
2407 let mut jm = JobManager::default();
2408 let job = jm.enqueue("fragile");
2409 let id = job.id.clone();
2410 for _ in 0..=DEFAULT_JOB_MAX_ATTEMPTS {
2411 jm.set_running(&id);
2412 jm.fail(&id, "boom");
2413 }
2414 let jobs = jm.list();
2415 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2416 assert_eq!(updated.retry.attempt, DEFAULT_JOB_MAX_ATTEMPTS);
2417 assert_eq!(updated.retry.next_backoff_ms, 0);
2418 assert!(updated.retry.next_retry_at.is_none());
2419 }
2420
2421 #[test]
2422 fn cancel_sets_status_and_clears_retry() {
2423 let mut jm = JobManager::default();
2424 let job = jm.enqueue("task");
2425 let id = job.id.clone();
2426 jm.cancel(&id);
2427 let jobs = jm.list();
2428 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2429 assert_eq!(updated.status, JobStatus::Cancelled);
2430 assert_eq!(updated.retry.next_backoff_ms, 0);
2431 }
2432
2433 #[test]
2434 fn pause_and_resume_round_trip() {
2435 let mut jm = JobManager::default();
2436 let job = jm.enqueue("task");
2437 let id = job.id.clone();
2438 jm.set_running(&id);
2439 jm.pause(&id, Some("waiting".to_string()));
2440 let jobs = jm.list();
2441 let paused = jobs.iter().find(|j| j.id == id).unwrap();
2442 assert_eq!(paused.status, JobStatus::Paused);
2443 assert_eq!(paused.detail.as_deref(), Some("waiting"));
2444
2445 jm.resume(&id, None);
2446 let jobs = jm.list();
2447 let resumed = jobs.iter().find(|j| j.id == id).unwrap();
2448 assert_eq!(resumed.status, JobStatus::Running);
2449 assert_eq!(resumed.history.last().unwrap().phase, "resumed");
2450 }
2451
2452 #[test]
2453 fn list_returns_jobs_sorted_by_updated_at_desc() {
2454 let mut jm = JobManager::default();
2455 jm.enqueue("first");
2456 jm.enqueue("second");
2457 jm.enqueue("third");
2458 let jobs = jm.list();
2459 assert_eq!(jobs.len(), 3);
2460 for window in jobs.windows(2) {
2461 assert!(window[0].updated_at >= window[1].updated_at);
2462 }
2463 }
2464
2465 #[test]
2466 fn history_returns_entries_for_existing_job() {
2467 let mut jm = JobManager::default();
2468 let job = jm.enqueue("task");
2469 let id = job.id.clone();
2470 jm.set_running(&id);
2471 jm.complete(&id);
2472 let history = jm.history(&id);
2473 assert_eq!(history.len(), 3); assert_eq!(history[0].phase, "created");
2475 assert_eq!(history[1].phase, "running");
2476 assert_eq!(history[2].phase, "completed");
2477 }
2478
2479 #[test]
2480 fn history_returns_empty_for_unknown_job() {
2481 let jm = JobManager::default();
2482 assert!(jm.history("nonexistent").is_empty());
2483 }
2484
2485 #[test]
2486 fn resume_pending_requeues_running_and_queued() {
2487 let mut jm = JobManager::default();
2488 let _j1 = jm.enqueue("queued_task");
2489 let j2 = jm.enqueue("running_task");
2490 let j3 = jm.enqueue("completed_task");
2491 let id2 = j2.id.clone();
2492 let id3 = j3.id.clone();
2493 jm.set_running(&id2);
2494 jm.set_running(&id3);
2495 jm.complete(&id3);
2496
2497 let resumed = jm.resume_pending();
2498 assert_eq!(resumed.len(), 2);
2499 for job in &resumed {
2500 assert_eq!(job.status, JobStatus::Queued);
2501 }
2502 }
2503
2504 #[test]
2507 fn deterministic_backoff_zero_on_first_attempt() {
2508 let retry = JobRetryMetadata {
2509 attempt: 0,
2510 ..Default::default()
2511 };
2512 assert_eq!(JobManager::deterministic_backoff_ms(&retry), 0);
2513 }
2514
2515 #[test]
2516 fn deterministic_backoff_exponential_growth() {
2517 let base = DEFAULT_JOB_BACKOFF_BASE_MS;
2518 for attempt in 1..=5 {
2519 let retry = JobRetryMetadata {
2520 attempt,
2521 backoff_base_ms: base,
2522 ..Default::default()
2523 };
2524 let expected = base * 2u64.pow(attempt.saturating_sub(1).min(20));
2525 assert_eq!(
2526 JobManager::deterministic_backoff_ms(&retry),
2527 expected,
2528 "attempt {attempt}"
2529 );
2530 }
2531 }
2532
2533 #[test]
2534 fn deterministic_backoff_saturates_at_high_exponent() {
2535 let retry = JobRetryMetadata {
2536 attempt: 63,
2537 backoff_base_ms: 1000,
2538 ..Default::default()
2539 };
2540 let _ = JobManager::deterministic_backoff_ms(&retry);
2542 }
2543
2544 #[test]
2547 fn push_history_truncates_beyond_max() {
2548 let mut jm = JobManager::default();
2549 let job = jm.enqueue("task");
2550 let id = job.id.clone();
2551 for i in 0..(MAX_JOB_HISTORY_ENTRIES + 20) {
2553 jm.update_progress(&id, (i % 100) as u8, Some(format!("step {i}")));
2554 }
2555 let history = jm.history(&id);
2556 assert_eq!(history.len(), MAX_JOB_HISTORY_ENTRIES);
2557 }
2558
2559 #[test]
2562 fn encode_and_parse_persisted_detail_round_trip() {
2563 let mut jm = JobManager::default();
2564 let job = jm.enqueue("task");
2565 let id = job.id.clone();
2566 jm.set_running(&id);
2567 jm.fail(&id, "oops");
2568 let job = jm.list().into_iter().find(|j| j.id == id).unwrap();
2569
2570 let encoded = JobManager::encode_persisted_detail(&job).unwrap().unwrap();
2571 let parsed = JobManager::parse_persisted_detail(Some(&encoded)).unwrap();
2572
2573 assert_eq!(parsed.status, job.status);
2574 assert_eq!(parsed.detail, job.detail);
2575 assert_eq!(parsed.retry.attempt, job.retry.attempt);
2576 assert_eq!(parsed.history.len(), job.history.len());
2577 }
2578
2579 #[test]
2580 fn parse_persisted_detail_returns_none_for_none_input() {
2581 assert!(JobManager::parse_persisted_detail(None).is_none());
2582 }
2583
2584 #[test]
2585 fn parse_persisted_detail_returns_none_for_invalid_json() {
2586 assert!(JobManager::parse_persisted_detail(Some("not json")).is_none());
2587 }
2588
2589 #[test]
2592 fn job_status_round_trip_str() {
2593 let statuses = [
2594 JobStatus::Queued,
2595 JobStatus::Running,
2596 JobStatus::Paused,
2597 JobStatus::Completed,
2598 JobStatus::Failed,
2599 JobStatus::Cancelled,
2600 ];
2601 for status in &statuses {
2602 let s = job_status_to_str(*status);
2603 let parsed = job_status_from_str(s);
2604 assert_eq!(parsed, Some(*status), "round-trip failed for {s:?}");
2605 }
2606 }
2607
2608 #[test]
2609 fn job_status_from_str_returns_none_for_unknown() {
2610 assert_eq!(job_status_from_str("unknown"), None);
2611 assert_eq!(job_status_from_str(""), None);
2612 }
2613
2614 #[test]
2615 fn truncate_preview_limits_to_120_chars() {
2616 let long = "a".repeat(200);
2617 let truncated = truncate_preview(&long);
2618 assert_eq!(truncated.len(), 120);
2619 }
2620
2621 #[test]
2622 fn truncate_preview_preserves_short_strings() {
2623 let short = "hello";
2624 assert_eq!(truncate_preview(short), "hello");
2625 }
2626
2627 #[test]
2628 fn runtime_status_to_job_state_maps_correctly() {
2629 assert_eq!(
2630 runtime_status_to_job_state(JobStatus::Queued),
2631 JobStateStatus::Queued
2632 );
2633 assert_eq!(
2634 runtime_status_to_job_state(JobStatus::Running),
2635 JobStateStatus::Running
2636 );
2637 assert_eq!(
2638 runtime_status_to_job_state(JobStatus::Paused),
2639 JobStateStatus::Running
2640 );
2641 assert_eq!(
2642 runtime_status_to_job_state(JobStatus::Completed),
2643 JobStateStatus::Completed
2644 );
2645 assert_eq!(
2646 runtime_status_to_job_state(JobStatus::Failed),
2647 JobStateStatus::Failed
2648 );
2649 assert_eq!(
2650 runtime_status_to_job_state(JobStatus::Cancelled),
2651 JobStateStatus::Cancelled
2652 );
2653 }
2654
2655 #[test]
2656 fn job_state_status_to_runtime_maps_correctly() {
2657 assert_eq!(
2658 job_state_status_to_runtime(JobStateStatus::Queued),
2659 JobStatus::Queued
2660 );
2661 assert_eq!(
2662 job_state_status_to_runtime(JobStateStatus::Running),
2663 JobStatus::Running
2664 );
2665 assert_eq!(
2666 job_state_status_to_runtime(JobStateStatus::Completed),
2667 JobStatus::Completed
2668 );
2669 assert_eq!(
2670 job_state_status_to_runtime(JobStateStatus::Failed),
2671 JobStatus::Failed
2672 );
2673 assert_eq!(
2674 job_state_status_to_runtime(JobStateStatus::Cancelled),
2675 JobStatus::Cancelled
2676 );
2677 }
2678
2679 #[test]
2680 fn preview_from_initial_history_new() {
2681 let preview = preview_from_initial_history(&InitialHistory::New);
2682 assert_eq!(preview, "New conversation");
2683 }
2684
2685 #[test]
2686 fn preview_from_initial_history_forked() {
2687 let preview = preview_from_initial_history(&InitialHistory::Forked(vec![json!("hello")]));
2688 assert!(preview.contains("hello"));
2689 }
2690
2691 #[test]
2692 fn preview_from_initial_history_resumed() {
2693 let preview = preview_from_initial_history(&InitialHistory::Resumed {
2694 conversation_id: "test".to_string(),
2695 history: vec![json!("world")],
2696 rollout_path: PathBuf::from("/tmp/test"),
2697 });
2698 assert!(preview.contains("world"));
2699 }
2700
2701 #[test]
2702 fn json_optional_string_handles_null() {
2703 assert!(json_optional_string(&Value::Null).is_none());
2704 }
2705
2706 #[test]
2707 fn json_optional_string_handles_string() {
2708 assert_eq!(
2709 json_optional_string(&Value::String("hello".to_string())),
2710 Some("hello".to_string())
2711 );
2712 }
2713
2714 #[test]
2715 fn json_optional_string_handles_non_string() {
2716 assert!(json_optional_string(&json!(42)).is_none());
2717 }
2718
2719 #[test]
2720 fn parse_retry_metadata_returns_default_for_none() {
2721 let retry = parse_retry_metadata(None);
2722 assert_eq!(retry.attempt, 0);
2723 assert_eq!(retry.max_attempts, DEFAULT_JOB_MAX_ATTEMPTS);
2724 assert_eq!(retry.backoff_base_ms, DEFAULT_JOB_BACKOFF_BASE_MS);
2725 }
2726
2727 #[test]
2728 fn parse_retry_metadata_parses_fields() {
2729 let value = json!({
2730 "attempt": 2,
2731 "max_attempts": 5,
2732 "backoff_base_ms": 1000,
2733 "next_backoff_ms": 2000,
2734 "next_retry_at": 1234567890i64
2735 });
2736 let retry = parse_retry_metadata(Some(&value));
2737 assert_eq!(retry.attempt, 2);
2738 assert_eq!(retry.max_attempts, 5);
2739 assert_eq!(retry.backoff_base_ms, 1000);
2740 assert_eq!(retry.next_backoff_ms, 2000);
2741 assert_eq!(retry.next_retry_at, Some(1234567890));
2742 }
2743
2744 #[test]
2745 fn parse_history_entry_returns_none_without_status() {
2746 let value = json!({"at": 1, "phase": "test"});
2747 assert!(parse_history_entry(&value).is_none());
2748 }
2749
2750 #[test]
2751 fn parse_history_entry_parses_valid_entry() {
2752 let value = json!({
2753 "at": 100,
2754 "phase": "running",
2755 "status": "running",
2756 "progress": 50,
2757 "detail": "working",
2758 "retry": {"attempt": 0, "max_attempts": 3, "backoff_base_ms": 500}
2759 });
2760 let entry = parse_history_entry(&value).unwrap();
2761 assert_eq!(entry.at, 100);
2762 assert_eq!(entry.phase, "running");
2763 assert_eq!(entry.status, JobStatus::Running);
2764 assert_eq!(entry.progress, Some(50));
2765 assert_eq!(entry.detail.as_deref(), Some("working"));
2766 }
2767}