1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4
5use anyhow::Result;
6use codewhale_agent::ModelRegistry;
7use codewhale_config::{CliRuntimeOverrides, ConfigToml, ProviderKind};
8use codewhale_execpolicy::{
9 AskForApproval, ExecApprovalRequirement, ExecPolicyContext, ExecPolicyDecision,
10 ExecPolicyEngine,
11};
12use codewhale_hooks::{HookDispatcher, HookEvent};
13use codewhale_mcp::{
14 McpManager, McpStartupCompleteEvent, McpStartupStatus as McpManagerStartupStatus,
15};
16use codewhale_protocol::{
17 AppResponse, EventFrame, ExecApprovalRequestEvent, PromptRequest, PromptResponse,
18 ResponseChannel, ReviewDecision, Thread, ThreadForkParams, ThreadGoal, ThreadGoalClearParams,
19 ThreadGoalGetParams, ThreadGoalProgressParams, ThreadGoalSetParams, ThreadGoalStatus,
20 ThreadListParams, ThreadReadParams, ThreadRequest, ThreadResponse, ThreadResumeParams,
21 ThreadSetNameParams, ThreadStatus, ToolPayload,
22};
23use codewhale_state::{
24 JobStateRecord, JobStateStatus, SessionSource, StateStore, ThreadGoalRecord,
25 ThreadGoalStatus as PersistedThreadGoalStatus, ThreadListFilters, ThreadMetadata,
26 ThreadStatus as PersistedThreadStatus,
27};
28use codewhale_tools::{ToolCall, ToolRegistry};
29use serde_json::{Value, json};
30use uuid::Uuid;
31
/// How the history of a newly spawned thread is seeded.
#[derive(Debug, Clone)]
pub enum InitialHistory {
    /// Start with no prior history.
    New,
    /// Start from the given items, which are stored as the new thread's
    /// initial `"history"` messages.
    Forked(Vec<Value>),
    /// Start from a previously recorded conversation.
    Resumed {
        /// Identifier of the conversation being resumed.
        conversation_id: String,
        /// Items stored as the new thread's initial `"history"` messages.
        history: Vec<Value>,
        /// Path of the rollout associated with the resumed conversation.
        /// NOTE(review): not read by the spawning code visible in this file.
        rollout_path: PathBuf,
    },
}
46
/// Result of spawning, resuming or forking a thread: the thread itself plus
/// the session settings it should run with.
#[derive(Debug, Clone)]
pub struct NewThread {
    /// The thread that was created or resumed.
    pub thread: Thread,
    /// Model identifier; `"auto"` when the caller did not request one.
    pub model: String,
    /// Name of the model provider for this session.
    pub model_provider: String,
    /// Working directory for the session.
    pub cwd: PathBuf,
    /// Approval policy requested by the caller, if any.
    pub approval_policy: Option<String>,
    /// Sandbox setting requested by the caller, if any.
    pub sandbox: Option<String>,
}
63
/// Lifecycle state of a background job tracked by [`JobManager`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Created (or re-queued after a restart) and waiting to run.
    Queued,
    /// Currently executing.
    Running,
    /// Suspended via [`JobManager::pause`].
    Paused,
    /// Finished successfully.
    Completed,
    /// Last attempt failed; a retry may be scheduled in the retry metadata.
    Failed,
    /// Cancelled via [`JobManager::cancel`].
    Cancelled,
}
80
/// Value written as `"schema_version"` in the JSON job detail blob.
const JOB_DETAIL_SCHEMA_VERSION: u8 = 1;
/// Default number of failures that still schedule a retry.
const DEFAULT_JOB_MAX_ATTEMPTS: u32 = 3;
/// Default base delay, in milliseconds, for the exponential retry backoff.
const DEFAULT_JOB_BACKOFF_BASE_MS: u64 = 500;
/// Maximum number of history entries kept per job; older entries are dropped.
const MAX_JOB_HISTORY_ENTRIES: usize = 64;
85
/// Retry bookkeeping attached to a job.
#[derive(Debug, Clone)]
pub struct JobRetryMetadata {
    /// Number of failed attempts that scheduled a retry so far.
    pub attempt: u32,
    /// Failures beyond this count no longer schedule a retry.
    pub max_attempts: u32,
    /// Base delay in milliseconds that the backoff doubles from.
    pub backoff_base_ms: u64,
    /// Delay in milliseconds before the next retry; `0` when none is scheduled.
    pub next_backoff_ms: u64,
    /// Unix timestamp (seconds) of the next retry; `None` when none is scheduled.
    pub next_retry_at: Option<i64>,
}
100
101impl Default for JobRetryMetadata {
102 fn default() -> Self {
103 Self {
104 attempt: 0,
105 max_attempts: DEFAULT_JOB_MAX_ATTEMPTS,
106 backoff_base_ms: DEFAULT_JOB_BACKOFF_BASE_MS,
107 next_backoff_ms: 0,
108 next_retry_at: None,
109 }
110 }
111}
112
/// Snapshot of a job taken each time its state changes.
#[derive(Debug, Clone)]
pub struct JobHistoryEntry {
    /// Unix timestamp (seconds) of the transition (the job's `updated_at`).
    pub at: i64,
    /// Label of the transition, e.g. `"created"`, `"running"`, `"failed"`.
    pub phase: String,
    /// Job status after the transition.
    pub status: JobStatus,
    /// Job progress after the transition.
    pub progress: Option<u8>,
    /// Job detail text after the transition.
    pub detail: Option<String>,
    /// Retry metadata after the transition.
    pub retry: JobRetryMetadata,
}
129
/// Job state decoded from the JSON blob stored in a persisted job's
/// `detail` column.
#[derive(Debug, Clone)]
struct PersistedJobDetail {
    /// Runtime status recorded in the blob.
    pub status: JobStatus,
    /// Human-readable detail text recorded in the blob, if any.
    pub detail: Option<String>,
    /// Retry metadata recorded in the blob.
    pub retry: JobRetryMetadata,
    /// History entries recorded in the blob (unparseable entries are skipped).
    pub history: Vec<JobHistoryEntry>,
}
137
/// In-memory representation of a background job.
#[derive(Debug, Clone)]
pub struct JobRecord {
    /// Unique job identifier (`job-<uuid>` for jobs created by `enqueue`).
    pub id: String,
    /// Caller-supplied job name.
    pub name: String,
    /// Current lifecycle status.
    pub status: JobStatus,
    /// Progress value; `update_progress` clamps it to at most 100.
    pub progress: Option<u8>,
    /// Latest detail or failure message.
    pub detail: Option<String>,
    /// Retry bookkeeping.
    pub retry: JobRetryMetadata,
    /// Bounded list of state snapshots, oldest first.
    pub history: Vec<JobHistoryEntry>,
    /// Unix timestamp (seconds) at which the job was created.
    pub created_at: i64,
    /// Unix timestamp (seconds) of the most recent change.
    pub updated_at: i64,
}
160
/// In-memory registry of background jobs.
#[derive(Debug, Default)]
pub struct JobManager {
    /// All known jobs, keyed by job id.
    jobs: HashMap<String, JobRecord>,
}
166
167impl JobManager {
168 fn now_ts() -> i64 {
169 chrono::Utc::now().timestamp()
170 }
171
172 fn deterministic_backoff_ms(retry: &JobRetryMetadata) -> u64 {
173 if retry.attempt == 0 {
174 return 0;
175 }
176 let exponent = retry.attempt.saturating_sub(1).min(20);
177 let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
178 retry.backoff_base_ms.saturating_mul(multiplier)
179 }
180
181 fn clear_retry_schedule(retry: &mut JobRetryMetadata) {
182 retry.next_backoff_ms = 0;
183 retry.next_retry_at = None;
184 }
185
186 fn push_history(job: &mut JobRecord, phase: &str) {
187 job.history.push(JobHistoryEntry {
188 at: job.updated_at,
189 phase: phase.to_string(),
190 status: job.status,
191 progress: job.progress,
192 detail: job.detail.clone(),
193 retry: job.retry.clone(),
194 });
195 if job.history.len() > MAX_JOB_HISTORY_ENTRIES {
196 let to_drain = job.history.len() - MAX_JOB_HISTORY_ENTRIES;
197 job.history.drain(0..to_drain);
198 }
199 }
200
201 fn parse_persisted_detail(raw: Option<&str>) -> Option<PersistedJobDetail> {
202 let raw = raw?;
203 let parsed: Value = serde_json::from_str(raw).ok()?;
204 let status = parsed
205 .get("status")
206 .and_then(Value::as_str)
207 .and_then(job_status_from_str)?;
208 let detail = parsed.get("detail").and_then(json_optional_string);
209 let retry = parse_retry_metadata(parsed.get("retry"));
210 let history = parsed
211 .get("history")
212 .and_then(Value::as_array)
213 .map(|items| {
214 items
215 .iter()
216 .filter_map(parse_history_entry)
217 .collect::<Vec<_>>()
218 })
219 .unwrap_or_default();
220 Some(PersistedJobDetail {
221 status,
222 detail,
223 retry,
224 history,
225 })
226 }
227
228 fn encode_persisted_detail(job: &JobRecord) -> Result<Option<String>> {
229 let encoded = json!({
230 "schema_version": JOB_DETAIL_SCHEMA_VERSION,
231 "status": job_status_to_str(job.status),
232 "detail": job.detail.clone(),
233 "retry": job_retry_to_value(&job.retry),
234 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
235 })
236 .to_string();
237 Ok(Some(encoded))
238 }
239
240 pub fn enqueue(&mut self, name: impl Into<String>) -> JobRecord {
242 let now = Self::now_ts();
243 let id = format!("job-{}", Uuid::new_v4());
244 let mut job = JobRecord {
245 id: id.clone(),
246 name: name.into(),
247 status: JobStatus::Queued,
248 progress: Some(0),
249 detail: None,
250 retry: JobRetryMetadata::default(),
251 history: Vec::new(),
252 created_at: now,
253 updated_at: now,
254 };
255 Self::push_history(&mut job, "created");
256 self.jobs.insert(id, job.clone());
257 job
258 }
259
260 pub fn set_running(&mut self, id: &str) {
262 if let Some(job) = self.jobs.get_mut(id) {
263 job.status = JobStatus::Running;
264 Self::clear_retry_schedule(&mut job.retry);
265 job.updated_at = Self::now_ts();
266 Self::push_history(job, "running");
267 }
268 }
269
270 pub fn update_progress(&mut self, id: &str, progress: u8, detail: Option<String>) {
272 if let Some(job) = self.jobs.get_mut(id) {
273 job.progress = Some(progress.min(100));
274 job.detail = detail;
275 job.updated_at = Self::now_ts();
276 Self::push_history(job, "progress_updated");
277 }
278 }
279
280 pub fn complete(&mut self, id: &str) {
282 if let Some(job) = self.jobs.get_mut(id) {
283 job.status = JobStatus::Completed;
284 job.progress = Some(100);
285 Self::clear_retry_schedule(&mut job.retry);
286 job.updated_at = Self::now_ts();
287 Self::push_history(job, "completed");
288 }
289 }
290
291 pub fn fail(&mut self, id: &str, detail: impl Into<String>) {
293 if let Some(job) = self.jobs.get_mut(id) {
294 let now = Self::now_ts();
295 job.status = JobStatus::Failed;
296 job.detail = Some(detail.into());
297 if job.retry.attempt < job.retry.max_attempts {
298 job.retry.attempt += 1;
299 job.retry.next_backoff_ms = Self::deterministic_backoff_ms(&job.retry);
300 let delay_secs = ((job.retry.next_backoff_ms.saturating_add(999)) / 1000)
301 .min(i64::MAX as u64) as i64;
302 job.retry.next_retry_at = Some(now.saturating_add(delay_secs));
303 } else {
304 Self::clear_retry_schedule(&mut job.retry);
305 }
306 job.updated_at = now;
307 Self::push_history(job, "failed");
308 }
309 }
310
311 pub fn cancel(&mut self, id: &str) {
313 if let Some(job) = self.jobs.get_mut(id) {
314 job.status = JobStatus::Cancelled;
315 Self::clear_retry_schedule(&mut job.retry);
316 job.updated_at = Self::now_ts();
317 Self::push_history(job, "cancelled");
318 }
319 }
320
321 pub fn pause(&mut self, id: &str, detail: Option<String>) {
323 if let Some(job) = self.jobs.get_mut(id) {
324 job.status = JobStatus::Paused;
325 if detail.is_some() {
326 job.detail = detail;
327 }
328 job.updated_at = Self::now_ts();
329 Self::push_history(job, "paused");
330 }
331 }
332
333 pub fn resume(&mut self, id: &str, detail: Option<String>) {
335 if let Some(job) = self.jobs.get_mut(id) {
336 job.status = JobStatus::Running;
337 if detail.is_some() {
338 job.detail = detail;
339 }
340 Self::clear_retry_schedule(&mut job.retry);
341 job.updated_at = Self::now_ts();
342 Self::push_history(job, "resumed");
343 }
344 }
345
346 pub fn list(&self) -> Vec<JobRecord> {
348 let mut out = self.jobs.values().cloned().collect::<Vec<_>>();
349 out.sort_by_key(|job| std::cmp::Reverse(job.updated_at));
350 out
351 }
352
353 pub fn history(&self, id: &str) -> Vec<JobHistoryEntry> {
355 self.jobs
356 .get(id)
357 .map(|job| job.history.clone())
358 .unwrap_or_default()
359 }
360
361 pub fn resume_pending(&mut self) -> Vec<JobRecord> {
363 let mut resumed = Vec::new();
364 for job in self.jobs.values_mut() {
365 if matches!(job.status, JobStatus::Queued | JobStatus::Running) {
366 job.status = JobStatus::Queued;
367 job.updated_at = Self::now_ts();
368 Self::push_history(job, "queued_after_resume");
369 resumed.push(job.clone());
370 }
371 }
372 resumed
373 }
374
375 pub fn load_from_store(&mut self, store: &StateStore) -> Result<()> {
377 let persisted = store.list_jobs(Some(500))?;
378 for job in persisted {
379 let fallback_status = job_state_status_to_runtime(job.status);
380 let parsed = Self::parse_persisted_detail(job.detail.as_deref());
381 let (status, detail, retry, history) = if let Some(detail_state) = parsed {
382 (
383 detail_state.status,
384 detail_state.detail,
385 detail_state.retry,
386 detail_state.history,
387 )
388 } else {
389 (
390 fallback_status,
391 job.detail,
392 JobRetryMetadata::default(),
393 Vec::new(),
394 )
395 };
396 self.jobs.insert(
397 job.id.clone(),
398 JobRecord {
399 id: job.id,
400 name: job.name,
401 status,
402 progress: job.progress,
403 detail,
404 retry,
405 history,
406 created_at: job.created_at,
407 updated_at: job.updated_at,
408 },
409 );
410 }
411 Ok(())
412 }
413
414 pub fn persist_job(&self, store: &StateStore, id: &str) -> Result<()> {
416 let Some(job) = self.jobs.get(id) else {
417 return Ok(());
418 };
419 let encoded_detail = Self::encode_persisted_detail(job)?;
420 store.upsert_job(&JobStateRecord {
421 id: job.id.clone(),
422 name: job.name.clone(),
423 status: runtime_status_to_job_state(job.status),
424 progress: job.progress,
425 detail: encoded_detail,
426 created_at: job.created_at,
427 updated_at: job.updated_at,
428 })
429 }
430
431 pub fn persist_all(&self, store: &StateStore) -> Result<()> {
433 for id in self.jobs.keys() {
434 self.persist_job(store, id)?;
435 }
436 Ok(())
437 }
438}
439
/// Owns the persistent state store and tracks the threads that are live in
/// this process.
pub struct ThreadManager {
    /// Persistent store for threads, messages, checkpoints and goals.
    store: StateStore,
    /// Threads currently live in this process, keyed by thread id.
    running_threads: HashMap<String, Thread>,
    /// Crate version (`CARGO_PKG_VERSION`) stamped onto newly created threads.
    cli_version: String,
}
446
447impl ThreadManager {
448 pub fn new(store: StateStore) -> Self {
450 Self {
451 store,
452 running_threads: HashMap::new(),
453 cli_version: env!("CARGO_PKG_VERSION").to_string(),
454 }
455 }
456
457 pub fn state_store(&self) -> &StateStore {
459 &self.store
460 }
461
462 pub fn spawn_thread_with_history(
464 &mut self,
465 model_provider: String,
466 cwd: PathBuf,
467 initial_history: InitialHistory,
468 persist_extended_history: bool,
469 ) -> Result<NewThread> {
470 let id = format!("thread-{}", Uuid::new_v4());
471 let now = chrono::Utc::now().timestamp();
472 let preview = preview_from_initial_history(&initial_history);
473 let source = match initial_history {
474 InitialHistory::New => SessionSource::Interactive,
475 InitialHistory::Forked(_) => SessionSource::Fork,
476 InitialHistory::Resumed { .. } => SessionSource::Resume,
477 };
478 let thread = Thread {
479 id: id.clone(),
480 preview,
481 ephemeral: !persist_extended_history,
482 model_provider: model_provider.clone(),
483 created_at: now,
484 updated_at: now,
485 status: ThreadStatus::Running,
486 path: None,
487 cwd: cwd.clone(),
488 cli_version: self.cli_version.clone(),
489 source: match source {
490 SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
491 SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
492 SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
493 SessionSource::Api => codewhale_protocol::SessionSource::Api,
494 SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
495 },
496 name: None,
497 };
498 self.persist_thread(&thread, None)?;
499 match &initial_history {
500 InitialHistory::Forked(items) => {
501 for item in items {
502 self.store.append_message(
503 &thread.id,
504 "history",
505 &item.to_string(),
506 Some(item.clone()),
507 )?;
508 }
509 }
510 InitialHistory::Resumed { history, .. } => {
511 for item in history {
512 self.store.append_message(
513 &thread.id,
514 "history",
515 &item.to_string(),
516 Some(item.clone()),
517 )?;
518 }
519 }
520 InitialHistory::New => {}
521 }
522 self.running_threads
523 .insert(thread.id.clone(), thread.clone());
524 Ok(NewThread {
525 thread,
526 model: "auto".to_string(),
527 model_provider,
528 cwd,
529 approval_policy: None,
530 sandbox: None,
531 })
532 }
533
534 pub fn resume_thread_with_history(
536 &mut self,
537 params: &ThreadResumeParams,
538 fallback_cwd: &Path,
539 model_provider: String,
540 ) -> Result<Option<NewThread>> {
541 if params.history.is_none()
542 && let Some(thread) = self.running_threads.get(¶ms.thread_id).cloned()
543 {
544 return Ok(Some(NewThread {
545 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
546 model_provider: params.model_provider.clone().unwrap_or(model_provider),
547 cwd: params.cwd.clone().unwrap_or_else(|| thread.cwd.clone()),
548 approval_policy: params.approval_policy.clone(),
549 sandbox: params.sandbox.clone(),
550 thread,
551 }));
552 }
553
554 let persisted = self.store.get_thread(¶ms.thread_id)?;
555 let Some(metadata) = persisted else {
556 return Ok(None);
557 };
558 let mut thread = to_protocol_thread(metadata);
559 thread.status = ThreadStatus::Running;
560 thread.updated_at = chrono::Utc::now().timestamp();
561 thread.cwd = params
562 .cwd
563 .clone()
564 .unwrap_or_else(|| fallback_cwd.to_path_buf());
565 self.persist_thread(&thread, None)?;
566 self.running_threads
567 .insert(thread.id.clone(), thread.clone());
568 if let Some(history) = params.history.as_ref() {
569 for item in history {
570 self.store.append_message(
571 &thread.id,
572 "history",
573 &item.to_string(),
574 Some(item.clone()),
575 )?;
576 }
577 }
578
579 Ok(Some(NewThread {
580 model: params.model.clone().unwrap_or_else(|| "auto".to_string()),
581 model_provider: params.model_provider.clone().unwrap_or(model_provider),
582 cwd: thread.cwd.clone(),
583 approval_policy: params.approval_policy.clone(),
584 sandbox: params.sandbox.clone(),
585 thread,
586 }))
587 }
588
589 pub fn fork_thread(
591 &mut self,
592 params: &ThreadForkParams,
593 fallback_cwd: &Path,
594 ) -> Result<Option<NewThread>> {
595 let parent = self.store.get_thread(¶ms.thread_id)?;
596 let Some(parent) = parent else {
597 return Ok(None);
598 };
599 let parent_thread = to_protocol_thread(parent);
600 let new = self.spawn_thread_with_history(
601 params
602 .model_provider
603 .clone()
604 .unwrap_or_else(|| parent_thread.model_provider.clone()),
605 params
606 .cwd
607 .clone()
608 .unwrap_or_else(|| fallback_cwd.to_path_buf()),
609 InitialHistory::Forked(vec![json!({
610 "type": "fork",
611 "from_thread_id": parent_thread.id
612 })]),
613 params.persist_extended_history,
614 )?;
615 Ok(Some(new))
616 }
617
618 pub fn list_threads(&self, params: &ThreadListParams) -> Result<Vec<Thread>> {
620 let list = self.store.list_threads(ThreadListFilters {
621 include_archived: params.include_archived,
622 limit: params.limit,
623 })?;
624 Ok(list.into_iter().map(to_protocol_thread).collect())
625 }
626
627 pub fn read_thread(&self, params: &ThreadReadParams) -> Result<Option<Thread>> {
629 Ok(self
630 .store
631 .get_thread(¶ms.thread_id)?
632 .map(to_protocol_thread))
633 }
634
635 pub fn set_thread_name(&mut self, params: &ThreadSetNameParams) -> Result<Option<Thread>> {
637 let Some(mut metadata) = self.store.get_thread(¶ms.thread_id)? else {
638 return Ok(None);
639 };
640 metadata.name = Some(params.name.clone());
641 metadata.updated_at = chrono::Utc::now().timestamp();
642 self.store.upsert_thread(&metadata)?;
643 let updated = to_protocol_thread(metadata);
644 self.running_threads
645 .insert(updated.id.clone(), updated.clone());
646 Ok(Some(updated))
647 }
648
649 pub fn set_thread_goal(&mut self, params: &ThreadGoalSetParams) -> Result<Option<ThreadGoal>> {
651 if self.store.get_thread(¶ms.thread_id)?.is_none() {
652 return Ok(None);
653 }
654 let now = chrono::Utc::now().timestamp();
655 let goal = ThreadGoalRecord {
656 thread_id: params.thread_id.clone(),
657 goal_id: format!("goal-{}", Uuid::new_v4()),
658 objective: params.objective.clone(),
659 status: PersistedThreadGoalStatus::Active,
660 token_budget: params.token_budget,
661 tokens_used: 0,
662 time_used_seconds: 0,
663 continuation_count: 0,
664 created_at: now,
665 updated_at: now,
666 };
667 self.store.upsert_thread_goal(&goal)?;
668 Ok(Some(to_protocol_goal(goal)))
669 }
670
671 pub fn get_thread_goal(&self, params: &ThreadGoalGetParams) -> Result<Option<ThreadGoal>> {
673 Ok(self
674 .store
675 .get_thread_goal(¶ms.thread_id)?
676 .map(to_protocol_goal))
677 }
678
679 pub fn record_thread_goal_progress(
681 &mut self,
682 params: &ThreadGoalProgressParams,
683 ) -> Result<Option<ThreadGoal>> {
684 if self.store.get_thread(¶ms.thread_id)?.is_none() {
685 return Ok(None);
686 }
687
688 let now = chrono::Utc::now().timestamp();
689 let mut goal = if params.token_delta != 0 || params.time_delta_seconds != 0 {
690 self.store.record_thread_goal_usage(
691 ¶ms.thread_id,
692 params.token_delta,
693 params.time_delta_seconds,
694 now,
695 )?
696 } else {
697 self.store.get_thread_goal(¶ms.thread_id)?
698 };
699
700 if params.record_continuation {
701 goal = self
702 .store
703 .record_thread_goal_continuation(¶ms.thread_id, now)?;
704 }
705
706 Ok(goal.map(to_protocol_goal))
707 }
708
709 pub fn clear_thread_goal(&mut self, params: &ThreadGoalClearParams) -> Result<bool> {
711 self.store.delete_thread_goal(¶ms.thread_id)
712 }
713
714 pub fn archive_thread(&mut self, thread_id: &str) -> Result<()> {
716 self.store.mark_archived(thread_id)?;
717 if let Some(thread) = self.running_threads.get_mut(thread_id) {
718 thread.status = ThreadStatus::Archived;
719 }
720 Ok(())
721 }
722
723 pub fn unarchive_thread(&mut self, thread_id: &str) -> Result<()> {
725 self.store.mark_unarchived(thread_id)?;
726 Ok(())
727 }
728
729 pub fn touch_message(&mut self, thread_id: &str, input: &str) -> Result<()> {
731 let Some(mut metadata) = self.store.get_thread(thread_id)? else {
732 return Ok(());
733 };
734 metadata.updated_at = chrono::Utc::now().timestamp();
735 metadata.preview = truncate_preview(input);
736 metadata.status = PersistedThreadStatus::Running;
737 self.store.upsert_thread(&metadata)?;
738 if let Some(thread) = self.running_threads.get_mut(thread_id) {
739 thread.updated_at = metadata.updated_at;
740 thread.preview = metadata.preview;
741 thread.status = ThreadStatus::Running;
742 }
743 let message_id = self.store.append_message(thread_id, "user", input, None)?;
744 self.store.save_checkpoint(
745 thread_id,
746 "latest",
747 &json!({
748 "reason": "thread_message",
749 "message_id": message_id,
750 "role": "user",
751 "preview": truncate_preview(input),
752 "updated_at": metadata.updated_at
753 }),
754 )?;
755 Ok(())
756 }
757
758 fn persist_thread(&self, thread: &Thread, rollout_path: Option<PathBuf>) -> Result<()> {
759 self.store.upsert_thread(&ThreadMetadata {
760 id: thread.id.clone(),
761 rollout_path,
762 preview: thread.preview.clone(),
763 ephemeral: thread.ephemeral,
764 model_provider: thread.model_provider.clone(),
765 created_at: thread.created_at,
766 updated_at: thread.updated_at,
767 status: to_persisted_status(&thread.status),
768 path: thread.path.clone(),
769 cwd: thread.cwd.clone(),
770 cli_version: thread.cli_version.clone(),
771 source: to_persisted_source(&thread.source),
772 name: thread.name.clone(),
773 sandbox_policy: None,
774 approval_mode: None,
775 archived: matches!(thread.status, ThreadStatus::Archived),
776 archived_at: None,
777 git_sha: None,
778 git_branch: None,
779 git_origin_url: None,
780 memory_mode: None,
781 current_leaf_id: None,
782 })
783 }
784}
785
/// Top-level runtime state that the request handlers operate on.
pub struct Runtime {
    /// Loaded configuration; runtime options are resolved from it per prompt.
    pub config: ConfigToml,
    /// Registry used to resolve the requested model and provider.
    pub model_registry: ModelRegistry,
    /// Thread lifecycle manager; it also owns the state store.
    pub thread_manager: ThreadManager,
    /// Shared registry of tools.
    pub tool_registry: Arc<ToolRegistry>,
    /// Shared MCP manager.
    pub mcp_manager: Arc<McpManager>,
    /// Execution policy engine.
    pub exec_policy: ExecPolicyEngine,
    /// Dispatcher that receives response lifecycle hook events.
    pub hooks: HookDispatcher,
    /// Background job registry, loaded from the state store at construction.
    pub jobs: JobManager,
}
805
806impl Runtime {
807 pub fn new(
809 config: ConfigToml,
810 model_registry: ModelRegistry,
811 state: StateStore,
812 tool_registry: Arc<ToolRegistry>,
813 mcp_manager: Arc<McpManager>,
814 exec_policy: ExecPolicyEngine,
815 hooks: HookDispatcher,
816 ) -> Self {
817 let mut jobs = JobManager::default();
818 if let Err(e) = jobs.load_from_store(&state) {
819 tracing::warn!("Failed to load job store, starting with empty job list: {e}");
820 }
821 Self {
822 config,
823 model_registry,
824 thread_manager: ThreadManager::new(state),
825 tool_registry,
826 mcp_manager,
827 exec_policy,
828 hooks,
829 jobs,
830 }
831 }
832
833 fn persisted_thread_data(&self, thread_id: &str) -> Result<Value> {
834 let history = self
835 .thread_manager
836 .state_store()
837 .list_messages(thread_id, Some(500))?
838 .into_iter()
839 .map(|message| {
840 json!({
841 "id": message.id,
842 "role": message.role,
843 "content": message.content,
844 "item": message.item,
845 "created_at": message.created_at
846 })
847 })
848 .collect::<Vec<_>>();
849
850 let checkpoint = self
851 .thread_manager
852 .state_store()
853 .load_checkpoint(thread_id, None)?
854 .map(|record| {
855 json!({
856 "checkpoint_id": record.checkpoint_id,
857 "state": record.state,
858 "created_at": record.created_at
859 })
860 });
861
862 let goal = self
863 .thread_manager
864 .state_store()
865 .get_thread_goal(thread_id)?
866 .map(to_protocol_goal);
867
868 Ok(json!({
869 "history": history,
870 "checkpoint": checkpoint,
871 "goal": goal
872 }))
873 }
874
875 fn persist_latest_checkpoint(&self, thread_id: &str, reason: &str, state: Value) -> Result<()> {
876 self.thread_manager.state_store().save_checkpoint(
877 thread_id,
878 "latest",
879 &json!({
880 "reason": reason,
881 "saved_at": chrono::Utc::now().timestamp(),
882 "state": state
883 }),
884 )
885 }
886
887 pub async fn handle_thread(&mut self, req: ThreadRequest) -> Result<ThreadResponse> {
889 match req {
890 ThreadRequest::Create { .. } => {
891 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
892 let new = self.thread_manager.spawn_thread_with_history(
893 "deepseek".to_string(),
894 cwd,
895 InitialHistory::New,
896 false,
897 )?;
898 let mut response = thread_response_from_new("created", new);
899 response.data = self.persisted_thread_data(&response.thread_id)?;
900 Ok(response)
901 }
902 ThreadRequest::Start(params) => {
903 let cwd = params.cwd.clone().unwrap_or_else(|| {
904 std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
905 });
906 let new = self.thread_manager.spawn_thread_with_history(
907 params
908 .model_provider
909 .clone()
910 .unwrap_or_else(|| "deepseek".to_string()),
911 cwd,
912 InitialHistory::New,
913 params.persist_extended_history,
914 )?;
915 let mut response = thread_response_from_new("started", new);
916 response.data = self.persisted_thread_data(&response.thread_id)?;
917 Ok(response)
918 }
919 ThreadRequest::Resume(params) => {
920 let fallback_cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
921 if let Some(new) = self.thread_manager.resume_thread_with_history(
922 ¶ms,
923 &fallback_cwd,
924 "deepseek".to_string(),
925 )? {
926 let mut response = thread_response_from_new("resumed", new);
927 response.data = self.persisted_thread_data(&response.thread_id)?;
928 Ok(response)
929 } else {
930 Ok(ThreadResponse {
931 thread_id: params.thread_id,
932 status: "missing".to_string(),
933 thread: None,
934 threads: Vec::new(),
935 goal: None,
936 model: None,
937 model_provider: None,
938 cwd: None,
939 approval_policy: params.approval_policy,
940 sandbox: params.sandbox,
941 events: Vec::new(),
942 data: json!({"error":"thread not found"}),
943 })
944 }
945 }
946 ThreadRequest::Fork(params) => {
947 let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
948 if let Some(new) = self.thread_manager.fork_thread(¶ms, &cwd)? {
949 let mut response = thread_response_from_new("forked", new);
950 response.data = self.persisted_thread_data(&response.thread_id)?;
951 Ok(response)
952 } else {
953 Ok(ThreadResponse {
954 thread_id: params.thread_id,
955 status: "missing".to_string(),
956 thread: None,
957 threads: Vec::new(),
958 goal: None,
959 model: None,
960 model_provider: None,
961 cwd: None,
962 approval_policy: params.approval_policy,
963 sandbox: params.sandbox,
964 events: Vec::new(),
965 data: json!({"error":"thread not found"}),
966 })
967 }
968 }
969 ThreadRequest::List(params) => Ok(ThreadResponse {
970 thread_id: "list".to_string(),
971 status: "ok".to_string(),
972 thread: None,
973 threads: self.thread_manager.list_threads(¶ms)?,
974 goal: None,
975 model: None,
976 model_provider: None,
977 cwd: None,
978 approval_policy: None,
979 sandbox: None,
980 events: Vec::new(),
981 data: json!({}),
982 }),
983 ThreadRequest::Read(params) => {
984 let id = params.thread_id.clone();
985 let data = self.persisted_thread_data(&id)?;
986 Ok(ThreadResponse {
987 thread_id: id,
988 status: "ok".to_string(),
989 thread: self.thread_manager.read_thread(¶ms)?,
990 threads: Vec::new(),
991 goal: self.thread_manager.get_thread_goal(&ThreadGoalGetParams {
992 thread_id: params.thread_id,
993 })?,
994 model: None,
995 model_provider: None,
996 cwd: None,
997 approval_policy: None,
998 sandbox: None,
999 events: Vec::new(),
1000 data,
1001 })
1002 }
1003 ThreadRequest::SetName(params) => Ok(ThreadResponse {
1004 thread_id: params.thread_id.clone(),
1005 status: "ok".to_string(),
1006 thread: self.thread_manager.set_thread_name(¶ms)?,
1007 threads: Vec::new(),
1008 goal: None,
1009 model: None,
1010 model_provider: None,
1011 cwd: None,
1012 approval_policy: None,
1013 sandbox: None,
1014 events: Vec::new(),
1015 data: json!({}),
1016 }),
1017 ThreadRequest::GoalSet(params) => {
1018 let thread_id = params.thread_id.clone();
1019 if let Some(goal) = self.thread_manager.set_thread_goal(¶ms)? {
1020 Ok(ThreadResponse {
1021 thread_id,
1022 status: "ok".to_string(),
1023 thread: None,
1024 threads: Vec::new(),
1025 goal: Some(goal.clone()),
1026 model: None,
1027 model_provider: None,
1028 cwd: None,
1029 approval_policy: None,
1030 sandbox: None,
1031 events: vec![EventFrame::ThreadGoalUpdated { goal: goal.clone() }],
1032 data: json!({ "goal": goal }),
1033 })
1034 } else {
1035 Ok(ThreadResponse {
1036 thread_id,
1037 status: "missing".to_string(),
1038 thread: None,
1039 threads: Vec::new(),
1040 goal: None,
1041 model: None,
1042 model_provider: None,
1043 cwd: None,
1044 approval_policy: None,
1045 sandbox: None,
1046 events: Vec::new(),
1047 data: json!({"error":"thread not found"}),
1048 })
1049 }
1050 }
1051 ThreadRequest::GoalGet(params) => {
1052 let goal = self.thread_manager.get_thread_goal(¶ms)?;
1053 Ok(ThreadResponse {
1054 thread_id: params.thread_id,
1055 status: "ok".to_string(),
1056 thread: None,
1057 threads: Vec::new(),
1058 goal: goal.clone(),
1059 model: None,
1060 model_provider: None,
1061 cwd: None,
1062 approval_policy: None,
1063 sandbox: None,
1064 events: Vec::new(),
1065 data: json!({ "goal": goal }),
1066 })
1067 }
1068 ThreadRequest::GoalClear(params) => {
1069 let thread_id = params.thread_id.clone();
1070 let cleared = self.thread_manager.clear_thread_goal(¶ms)?;
1071 Ok(ThreadResponse {
1072 thread_id: thread_id.clone(),
1073 status: if cleared { "cleared" } else { "empty" }.to_string(),
1074 thread: None,
1075 threads: Vec::new(),
1076 goal: None,
1077 model: None,
1078 model_provider: None,
1079 cwd: None,
1080 approval_policy: None,
1081 sandbox: None,
1082 events: if cleared {
1083 vec![EventFrame::ThreadGoalCleared { thread_id }]
1084 } else {
1085 Vec::new()
1086 },
1087 data: json!({ "cleared": cleared }),
1088 })
1089 }
1090 ThreadRequest::GoalRecordProgress(params) => {
1091 let thread_id = params.thread_id.clone();
1092 if let Some(goal) = self.thread_manager.record_thread_goal_progress(¶ms)? {
1093 Ok(ThreadResponse {
1094 thread_id,
1095 status: "ok".to_string(),
1096 thread: None,
1097 threads: Vec::new(),
1098 goal: Some(goal.clone()),
1099 model: None,
1100 model_provider: None,
1101 cwd: None,
1102 approval_policy: None,
1103 sandbox: None,
1104 events: vec![EventFrame::ThreadGoalUpdated { goal: goal.clone() }],
1105 data: json!({ "goal": goal }),
1106 })
1107 } else {
1108 Ok(ThreadResponse {
1109 thread_id,
1110 status: "missing".to_string(),
1111 thread: None,
1112 threads: Vec::new(),
1113 goal: None,
1114 model: None,
1115 model_provider: None,
1116 cwd: None,
1117 approval_policy: None,
1118 sandbox: None,
1119 events: Vec::new(),
1120 data: json!({"error":"thread or goal not found"}),
1121 })
1122 }
1123 }
1124 ThreadRequest::Archive { thread_id } => {
1125 self.thread_manager.archive_thread(&thread_id)?;
1126 Ok(ThreadResponse {
1127 thread_id,
1128 status: "archived".to_string(),
1129 thread: None,
1130 threads: Vec::new(),
1131 goal: None,
1132 model: None,
1133 model_provider: None,
1134 cwd: None,
1135 approval_policy: None,
1136 sandbox: None,
1137 events: Vec::new(),
1138 data: json!({}),
1139 })
1140 }
1141 ThreadRequest::Unarchive { thread_id } => {
1142 self.thread_manager.unarchive_thread(&thread_id)?;
1143 Ok(ThreadResponse {
1144 thread_id,
1145 status: "unarchived".to_string(),
1146 thread: None,
1147 threads: Vec::new(),
1148 goal: None,
1149 model: None,
1150 model_provider: None,
1151 cwd: None,
1152 approval_policy: None,
1153 sandbox: None,
1154 events: Vec::new(),
1155 data: json!({}),
1156 })
1157 }
1158 ThreadRequest::Message { thread_id, input } => {
1159 self.thread_manager.touch_message(&thread_id, &input)?;
1160 let response_id = format!("{thread_id}:{}", input.len());
1161 self.hooks
1162 .emit(HookEvent::ResponseStart {
1163 response_id: response_id.clone(),
1164 })
1165 .await;
1166 self.hooks
1167 .emit(HookEvent::ResponseEnd {
1168 response_id: response_id.clone(),
1169 })
1170 .await;
1171
1172 Ok(ThreadResponse {
1173 thread_id,
1174 status: "accepted".to_string(),
1175 thread: None,
1176 threads: Vec::new(),
1177 goal: None,
1178 model: None,
1179 model_provider: None,
1180 cwd: None,
1181 approval_policy: None,
1182 sandbox: None,
1183 events: vec![
1184 EventFrame::ResponseStart {
1185 response_id: response_id.clone(),
1186 },
1187 EventFrame::ResponseDelta {
1188 response_id: response_id.clone(),
1189 delta: "queued".to_string(),
1190 channel: ResponseChannel::Text,
1191 },
1192 EventFrame::ResponseEnd { response_id },
1193 ],
1194 data: json!({}),
1195 })
1196 }
1197 }
1198 }
1199
1200 pub async fn handle_prompt(
1202 &mut self,
1203 req: PromptRequest,
1204 cli_overrides: &CliRuntimeOverrides,
1205 ) -> Result<PromptResponse> {
1206 let resolved = self.config.resolve_runtime_options(cli_overrides);
1207 let requested_model = req.model.clone().unwrap_or_else(|| resolved.model.clone());
1208 let selection = self
1209 .model_registry
1210 .resolve(Some(&requested_model), Some(resolved.provider));
1211 let resolved_model = selection.resolved.id.clone();
1212 let response_id = format!("resp-{}", Uuid::new_v4());
1213
1214 self.hooks
1215 .emit(HookEvent::ResponseStart {
1216 response_id: response_id.clone(),
1217 })
1218 .await;
1219 self.hooks
1220 .emit(HookEvent::ResponseDelta {
1221 response_id: response_id.clone(),
1222 delta: "model-selected".to_string(),
1223 })
1224 .await;
1225 self.hooks
1226 .emit(HookEvent::ResponseEnd {
1227 response_id: response_id.clone(),
1228 })
1229 .await;
1230
1231 let payload = json!({
1232 "provider": resolved.provider.as_str(),
1233 "model": resolved_model.clone(),
1234 "prompt": req.prompt,
1235 "telemetry": resolved.telemetry,
1236 "base_url": resolved.base_url,
1237 "has_api_key": resolved.api_key.as_ref().is_some_and(|k| !k.trim().is_empty()),
1238 "approval_policy": resolved.approval_policy,
1239 "sandbox_mode": resolved.sandbox_mode
1240 });
1241 if let Some(thread_id) = req.thread_id.as_ref() {
1242 self.thread_manager.touch_message(thread_id, &req.prompt)?;
1243 let assistant_message_id = self.thread_manager.store.append_message(
1244 thread_id,
1245 "assistant",
1246 &payload.to_string(),
1247 Some(payload.clone()),
1248 )?;
1249 self.persist_latest_checkpoint(
1250 thread_id,
1251 "prompt_response",
1252 json!({
1253 "response_id": response_id.clone(),
1254 "model": resolved_model.clone(),
1255 "provider": resolved.provider.as_str(),
1256 "assistant_message_id": assistant_message_id
1257 }),
1258 )?;
1259 }
1260
1261 Ok(PromptResponse {
1262 output: payload.to_string(),
1263 model: resolved_model,
1264 events: vec![
1265 EventFrame::ResponseStart {
1266 response_id: response_id.clone(),
1267 },
1268 EventFrame::ResponseDelta {
1269 response_id: response_id.clone(),
1270 delta: "model-selected".to_string(),
1271 channel: ResponseChannel::Text,
1272 },
1273 EventFrame::ResponseEnd { response_id },
1274 ],
1275 })
1276 }
1277
1278 pub async fn invoke_tool(
1280 &self,
1281 call: ToolCall,
1282 approval_mode: AskForApproval,
1283 cwd: &Path,
1284 ) -> Result<Value> {
1285 let fallback_cwd = cwd.display().to_string();
1286 let (command, policy_cwd, execution_kind) = call.execution_subject(&fallback_cwd);
1287 let policy_tool = match &call.payload {
1288 ToolPayload::LocalShell { .. } => "exec_shell",
1289 _ => call.name.as_str(),
1290 };
1291 let policy_path = permission_path_for_call(&call);
1292 let decision = self.exec_policy.check(ExecPolicyContext {
1293 command: &command,
1294 cwd: &policy_cwd,
1295 tool: Some(policy_tool),
1296 path: policy_path.as_deref(),
1297 ask_for_approval: approval_mode,
1298 sandbox_mode: None,
1299 })?;
1300 let precheck = policy_precheck_payload(&decision, &command, &policy_cwd, execution_kind);
1301 let response_id = format!("tool-{}", Uuid::new_v4());
1302 let call_id = call
1303 .raw_tool_call_id
1304 .clone()
1305 .unwrap_or_else(|| format!("tool-call-{}", Uuid::new_v4()));
1306 self.hooks
1307 .emit(HookEvent::ToolLifecycle {
1308 response_id: response_id.clone(),
1309 tool_name: call.name.clone(),
1310 phase: "precheck".to_string(),
1311 payload: precheck.clone(),
1312 })
1313 .await;
1314
1315 if !decision.allow {
1316 let reason = decision.reason().to_string();
1317 let approval_id = format!("approval-{}", Uuid::new_v4());
1318 let error_frame = EventFrame::Error {
1319 response_id: response_id.clone(),
1320 message: reason.clone(),
1321 };
1322 self.hooks
1323 .emit(HookEvent::ApprovalLifecycle {
1324 approval_id,
1325 phase: "denied".to_string(),
1326 reason: Some(reason.clone()),
1327 })
1328 .await;
1329 self.hooks
1330 .emit(HookEvent::GenericEventFrame {
1331 frame: Box::new(error_frame.clone()),
1332 })
1333 .await;
1334 return Ok(json!({
1335 "ok": false,
1336 "status": "denied",
1337 "execution_kind": execution_kind,
1338 "response_id": response_id,
1339 "precheck": precheck,
1340 "error": reason,
1341 "events": [event_frame_payload(&error_frame)],
1342 }));
1343 }
1344
1345 if decision.requires_approval {
1346 let approval_id = format!("approval-{}", Uuid::new_v4());
1347 let reason = decision.reason().to_string();
1348 let maybe_approval_frame = approval_request_frame(
1349 &decision.requirement,
1350 decision.matched_rule.as_deref(),
1351 call_id,
1352 approval_id.clone(),
1353 response_id.clone(),
1354 command.clone(),
1355 policy_cwd.clone(),
1356 );
1357 self.hooks
1358 .emit(HookEvent::ApprovalLifecycle {
1359 approval_id: approval_id.clone(),
1360 phase: "requested".to_string(),
1361 reason: Some(reason.clone()),
1362 })
1363 .await;
1364 let mut events = Vec::new();
1365 if let Some(frame) = maybe_approval_frame {
1366 self.hooks
1367 .emit(HookEvent::GenericEventFrame {
1368 frame: Box::new(frame.clone()),
1369 })
1370 .await;
1371 events.push(event_frame_payload(&frame));
1372 }
1373 return Ok(json!({
1374 "ok": false,
1375 "status": "approval_required",
1376 "execution_kind": execution_kind,
1377 "response_id": response_id,
1378 "approval_id": approval_id,
1379 "precheck": precheck,
1380 "error": reason,
1381 "events": events,
1382 }));
1383 }
1384
1385 let start_frame = EventFrame::ToolCallStart {
1386 response_id: response_id.clone(),
1387 tool_name: call.name.clone(),
1388 arguments: tool_payload_value(&call.payload),
1389 };
1390 self.hooks
1391 .emit(HookEvent::GenericEventFrame {
1392 frame: Box::new(start_frame.clone()),
1393 })
1394 .await;
1395 self.hooks
1396 .emit(HookEvent::ToolLifecycle {
1397 response_id: response_id.clone(),
1398 tool_name: call.name.clone(),
1399 phase: "dispatching".to_string(),
1400 payload: json!({
1401 "call_id": call_id,
1402 "execution_kind": execution_kind
1403 }),
1404 })
1405 .await;
1406
1407 match self.tool_registry.dispatch(call.clone(), true).await {
1408 Ok(tool_output) => {
1409 let result_frame = EventFrame::ToolCallResult {
1410 response_id: response_id.clone(),
1411 tool_name: call.name.clone(),
1412 output: tool_output_value(&tool_output),
1413 };
1414 self.hooks
1415 .emit(HookEvent::GenericEventFrame {
1416 frame: Box::new(result_frame.clone()),
1417 })
1418 .await;
1419 self.hooks
1420 .emit(HookEvent::ToolLifecycle {
1421 response_id: response_id.clone(),
1422 tool_name: call.name,
1423 phase: "completed".to_string(),
1424 payload: json!({ "ok": true }),
1425 })
1426 .await;
1427 Ok(json!({
1428 "ok": true,
1429 "status": "completed",
1430 "execution_kind": execution_kind,
1431 "response_id": response_id,
1432 "precheck": precheck,
1433 "output": tool_output,
1434 "events": [
1435 event_frame_payload(&start_frame),
1436 event_frame_payload(&result_frame)
1437 ]
1438 }))
1439 }
1440 Err(err) => {
1441 let message = format!("{err:?}");
1442 let error_frame = EventFrame::Error {
1443 response_id: response_id.clone(),
1444 message: message.clone(),
1445 };
1446 self.hooks
1447 .emit(HookEvent::GenericEventFrame {
1448 frame: Box::new(error_frame.clone()),
1449 })
1450 .await;
1451 self.hooks
1452 .emit(HookEvent::ToolLifecycle {
1453 response_id: response_id.clone(),
1454 tool_name: call.name,
1455 phase: "failed".to_string(),
1456 payload: json!({ "error": message.clone() }),
1457 })
1458 .await;
1459 Ok(json!({
1460 "ok": false,
1461 "status": "failed",
1462 "execution_kind": execution_kind,
1463 "response_id": response_id,
1464 "precheck": precheck,
1465 "error": message,
1466 "events": [
1467 event_frame_payload(&start_frame),
1468 event_frame_payload(&error_frame)
1469 ]
1470 }))
1471 }
1472 }
1473 }
1474
1475 pub async fn mcp_startup(&self) -> McpStartupCompleteEvent {
1477 let mut updates = Vec::new();
1478 let summary = self.mcp_manager.start_all(|update| {
1479 updates.push(update);
1480 });
1481 for update in updates {
1482 let status = match update.status {
1483 McpManagerStartupStatus::Starting => codewhale_protocol::McpStartupStatus::Starting,
1484 McpManagerStartupStatus::Ready => codewhale_protocol::McpStartupStatus::Ready,
1485 McpManagerStartupStatus::Failed { error } => {
1486 codewhale_protocol::McpStartupStatus::Failed { error }
1487 }
1488 McpManagerStartupStatus::Cancelled => {
1489 codewhale_protocol::McpStartupStatus::Cancelled
1490 }
1491 };
1492 self.hooks
1493 .emit(HookEvent::GenericEventFrame {
1494 frame: Box::new(EventFrame::McpStartupUpdate {
1495 update: codewhale_protocol::McpStartupUpdateEvent {
1496 server_name: update.server_name,
1497 status,
1498 },
1499 }),
1500 })
1501 .await;
1502 }
1503 self.hooks
1504 .emit(HookEvent::GenericEventFrame {
1505 frame: Box::new(EventFrame::McpStartupComplete {
1506 summary: codewhale_protocol::McpStartupCompleteEvent {
1507 ready: summary.ready.clone(),
1508 failed: summary
1509 .failed
1510 .iter()
1511 .map(|f| codewhale_protocol::McpStartupFailure {
1512 server_name: f.server_name.clone(),
1513 error: f.error.clone(),
1514 })
1515 .collect(),
1516 cancelled: summary.cancelled.clone(),
1517 },
1518 }),
1519 })
1520 .await;
1521 summary
1522 }
1523
1524 pub fn app_status(&self) -> AppResponse {
1526 let jobs = self.jobs.list();
1527 let events = jobs
1528 .iter()
1529 .flat_map(|job| {
1530 job.history.iter().map(|entry| EventFrame::ResponseDelta {
1531 response_id: job.id.clone(),
1532 delta: json!({
1533 "kind": "job_transition",
1534 "job_id": job.id.clone(),
1535 "phase": entry.phase.clone(),
1536 "status": job_status_to_str(entry.status),
1537 "progress": entry.progress,
1538 "detail": entry.detail.clone(),
1539 "retry": job_retry_to_value(&entry.retry),
1540 "at": entry.at
1541 })
1542 .to_string(),
1543 channel: ResponseChannel::Text,
1544 })
1545 })
1546 .collect::<Vec<_>>();
1547 AppResponse {
1548 ok: true,
1549 data: json!({
1550 "jobs": jobs.into_iter().map(|job| {
1551 json!({
1552 "id": job.id,
1553 "name": job.name,
1554 "status": job_status_to_str(job.status),
1555 "progress": job.progress,
1556 "detail": job.detail,
1557 "retry": job_retry_to_value(&job.retry),
1558 "history": job.history.iter().map(job_history_to_value).collect::<Vec<_>>()
1559 })
1560 }).collect::<Vec<_>>()
1561 }),
1562 events,
1563 }
1564 }
1565
1566 pub fn provider_default(&self) -> ProviderKind {
1568 self.config.provider
1569 }
1570
1571 pub fn save_thread_checkpoint(
1573 &self,
1574 thread_id: &str,
1575 checkpoint_id: &str,
1576 state: &Value,
1577 ) -> Result<()> {
1578 self.thread_manager
1579 .state_store()
1580 .save_checkpoint(thread_id, checkpoint_id, state)
1581 }
1582
1583 pub fn load_thread_checkpoint(
1585 &self,
1586 thread_id: &str,
1587 checkpoint_id: Option<&str>,
1588 ) -> Result<Option<Value>> {
1589 Ok(self
1590 .thread_manager
1591 .state_store()
1592 .load_checkpoint(thread_id, checkpoint_id)?
1593 .map(|checkpoint| checkpoint.state))
1594 }
1595
1596 pub fn enqueue_job(&mut self, name: impl Into<String>) -> Result<JobRecord> {
1598 let job = self.jobs.enqueue(name);
1599 self.jobs
1600 .persist_job(self.thread_manager.state_store(), &job.id)?;
1601 Ok(job)
1602 }
1603
1604 pub fn set_job_running(&mut self, job_id: &str) -> Result<()> {
1606 self.jobs.set_running(job_id);
1607 self.jobs
1608 .persist_job(self.thread_manager.state_store(), job_id)
1609 }
1610
1611 pub fn update_job_progress(
1613 &mut self,
1614 job_id: &str,
1615 progress: u8,
1616 detail: Option<String>,
1617 ) -> Result<()> {
1618 self.jobs.update_progress(job_id, progress, detail);
1619 self.jobs
1620 .persist_job(self.thread_manager.state_store(), job_id)
1621 }
1622
1623 pub fn complete_job(&mut self, job_id: &str) -> Result<()> {
1625 self.jobs.complete(job_id);
1626 self.jobs
1627 .persist_job(self.thread_manager.state_store(), job_id)
1628 }
1629
1630 pub fn fail_job(&mut self, job_id: &str, detail: impl Into<String>) -> Result<()> {
1632 self.jobs.fail(job_id, detail);
1633 self.jobs
1634 .persist_job(self.thread_manager.state_store(), job_id)
1635 }
1636
1637 pub fn cancel_job(&mut self, job_id: &str) -> Result<()> {
1639 self.jobs.cancel(job_id);
1640 self.jobs
1641 .persist_job(self.thread_manager.state_store(), job_id)
1642 }
1643
1644 pub fn pause_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1646 self.jobs.pause(job_id, detail);
1647 self.jobs
1648 .persist_job(self.thread_manager.state_store(), job_id)
1649 }
1650
1651 pub fn resume_job(&mut self, job_id: &str, detail: Option<String>) -> Result<()> {
1653 self.jobs.resume(job_id, detail);
1654 self.jobs
1655 .persist_job(self.thread_manager.state_store(), job_id)
1656 }
1657
1658 pub fn job_history(&self, job_id: &str) -> Vec<JobHistoryEntry> {
1660 self.jobs.history(job_id)
1661 }
1662}
1663
1664fn thread_response_from_new(status: &str, new: NewThread) -> ThreadResponse {
1665 ThreadResponse {
1666 thread_id: new.thread.id.clone(),
1667 status: status.to_string(),
1668 thread: Some(new.thread),
1669 threads: Vec::new(),
1670 goal: None,
1671 model: Some(new.model),
1672 model_provider: Some(new.model_provider),
1673 cwd: Some(new.cwd),
1674 approval_policy: new.approval_policy,
1675 sandbox: new.sandbox,
1676 events: Vec::new(),
1677 data: json!({}),
1678 }
1679}
1680
1681fn preview_from_initial_history(initial_history: &InitialHistory) -> String {
1682 match initial_history {
1683 InitialHistory::New => "New conversation".to_string(),
1684 InitialHistory::Forked(items) => truncate_preview(
1685 &items
1686 .first()
1687 .map(Value::to_string)
1688 .unwrap_or_else(|| "Forked conversation".to_string()),
1689 ),
1690 InitialHistory::Resumed { history, .. } => truncate_preview(
1691 &history
1692 .first()
1693 .map(Value::to_string)
1694 .unwrap_or_else(|| "Resumed conversation".to_string()),
1695 ),
1696 }
1697}
1698
1699fn permission_path_for_call(call: &ToolCall) -> Option<String> {
1700 match &call.payload {
1701 ToolPayload::Function { arguments } => serde_json::from_str::<Value>(arguments)
1702 .ok()
1703 .and_then(|value| {
1704 value
1705 .get("path")
1706 .and_then(Value::as_str)
1707 .map(str::to_string)
1708 }),
1709 ToolPayload::Mcp { raw_arguments, .. } => raw_arguments
1710 .get("path")
1711 .and_then(Value::as_str)
1712 .map(str::to_string),
1713 ToolPayload::Custom { .. } | ToolPayload::LocalShell { .. } => None,
1714 }
1715}
1716
/// Returns at most the first 120 characters (not bytes) of `value`.
fn truncate_preview(value: &str) -> String {
    // The byte offset of the 121st character, if any, marks where to cut.
    match value.char_indices().nth(120) {
        Some((cut, _)) => value[..cut].to_string(),
        None => value.to_string(),
    }
}
1720
1721fn to_protocol_thread(thread: ThreadMetadata) -> Thread {
1722 Thread {
1723 id: thread.id,
1724 preview: thread.preview,
1725 ephemeral: thread.ephemeral,
1726 model_provider: thread.model_provider,
1727 created_at: thread.created_at,
1728 updated_at: thread.updated_at,
1729 status: match thread.status {
1730 PersistedThreadStatus::Running => ThreadStatus::Running,
1731 PersistedThreadStatus::Idle => ThreadStatus::Idle,
1732 PersistedThreadStatus::Completed => ThreadStatus::Completed,
1733 PersistedThreadStatus::Failed => ThreadStatus::Failed,
1734 PersistedThreadStatus::Paused => ThreadStatus::Paused,
1735 PersistedThreadStatus::Archived => ThreadStatus::Archived,
1736 },
1737 path: thread.path,
1738 cwd: thread.cwd,
1739 cli_version: thread.cli_version,
1740 source: match thread.source {
1741 SessionSource::Interactive => codewhale_protocol::SessionSource::Interactive,
1742 SessionSource::Resume => codewhale_protocol::SessionSource::Resume,
1743 SessionSource::Fork => codewhale_protocol::SessionSource::Fork,
1744 SessionSource::Api => codewhale_protocol::SessionSource::Api,
1745 SessionSource::Unknown => codewhale_protocol::SessionSource::Unknown,
1746 },
1747 name: thread.name,
1748 }
1749}
1750
1751fn to_protocol_goal(goal: ThreadGoalRecord) -> ThreadGoal {
1752 ThreadGoal {
1753 thread_id: goal.thread_id,
1754 goal_id: goal.goal_id,
1755 objective: goal.objective,
1756 status: to_protocol_goal_status(goal.status),
1757 token_budget: goal.token_budget,
1758 tokens_used: goal.tokens_used,
1759 time_used_seconds: goal.time_used_seconds,
1760 continuation_count: goal.continuation_count,
1761 created_at: goal.created_at,
1762 updated_at: goal.updated_at,
1763 }
1764}
1765
1766fn to_protocol_goal_status(status: PersistedThreadGoalStatus) -> ThreadGoalStatus {
1767 match status {
1768 PersistedThreadGoalStatus::Active => ThreadGoalStatus::Active,
1769 PersistedThreadGoalStatus::Paused => ThreadGoalStatus::Paused,
1770 PersistedThreadGoalStatus::Blocked => ThreadGoalStatus::Blocked,
1771 PersistedThreadGoalStatus::UsageLimited => ThreadGoalStatus::UsageLimited,
1772 PersistedThreadGoalStatus::BudgetLimited => ThreadGoalStatus::BudgetLimited,
1773 PersistedThreadGoalStatus::Complete => ThreadGoalStatus::Complete,
1774 }
1775}
1776
1777fn to_persisted_status(status: &ThreadStatus) -> PersistedThreadStatus {
1778 match status {
1779 ThreadStatus::Running => PersistedThreadStatus::Running,
1780 ThreadStatus::Idle => PersistedThreadStatus::Idle,
1781 ThreadStatus::Completed => PersistedThreadStatus::Completed,
1782 ThreadStatus::Failed => PersistedThreadStatus::Failed,
1783 ThreadStatus::Paused => PersistedThreadStatus::Paused,
1784 ThreadStatus::Archived => PersistedThreadStatus::Archived,
1785 }
1786}
1787
1788fn to_persisted_source(source: &codewhale_protocol::SessionSource) -> SessionSource {
1789 match source {
1790 codewhale_protocol::SessionSource::Interactive => SessionSource::Interactive,
1791 codewhale_protocol::SessionSource::Resume => SessionSource::Resume,
1792 codewhale_protocol::SessionSource::Fork => SessionSource::Fork,
1793 codewhale_protocol::SessionSource::Api => SessionSource::Api,
1794 codewhale_protocol::SessionSource::Unknown => SessionSource::Unknown,
1795 }
1796}
1797
1798fn approval_request_frame(
1799 requirement: &ExecApprovalRequirement,
1800 matched_rule: Option<&str>,
1801 call_id: String,
1802 approval_id: String,
1803 turn_id: String,
1804 command: String,
1805 cwd: String,
1806) -> Option<EventFrame> {
1807 let ExecApprovalRequirement::NeedsApproval {
1808 reason,
1809 proposed_execpolicy_amendment,
1810 proposed_network_policy_amendments,
1811 } = requirement
1812 else {
1813 return None;
1814 };
1815
1816 let mut available_decisions = vec![
1817 ReviewDecision::Approved,
1818 ReviewDecision::ApprovedForSession,
1819 ReviewDecision::Denied,
1820 ReviewDecision::Abort,
1821 ];
1822 if proposed_execpolicy_amendment
1823 .as_ref()
1824 .is_some_and(|amendment| !amendment.prefixes.is_empty())
1825 {
1826 available_decisions.push(ReviewDecision::ApprovedExecpolicyAmendment);
1827 }
1828 available_decisions.extend(proposed_network_policy_amendments.iter().cloned().map(
1829 |amendment| ReviewDecision::NetworkPolicyAmendment {
1830 host: amendment.host,
1831 action: amendment.action,
1832 },
1833 ));
1834
1835 Some(EventFrame::ExecApprovalRequest {
1836 request: ExecApprovalRequestEvent {
1837 call_id,
1838 approval_id,
1839 turn_id,
1840 command,
1841 cwd,
1842 reason: reason.clone(),
1843 matched_rule: matched_rule.map(|rule| rule.to_string().into_boxed_str()),
1844 network_approval_context: None,
1845 proposed_execpolicy_amendment: proposed_execpolicy_amendment
1846 .as_ref()
1847 .map(|amendment| amendment.prefixes.clone())
1848 .unwrap_or_default(),
1849 proposed_network_policy_amendments: proposed_network_policy_amendments.clone(),
1850 additional_permissions: Vec::new(),
1851 available_decisions,
1852 },
1853 })
1854}
1855
1856fn approval_requirement_payload(requirement: &ExecApprovalRequirement) -> Value {
1857 match requirement {
1858 ExecApprovalRequirement::Skip {
1859 bypass_sandbox,
1860 proposed_execpolicy_amendment,
1861 } => json!({
1862 "type": "skip",
1863 "bypass_sandbox": bypass_sandbox,
1864 "reason": requirement.reason(),
1865 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1866 .as_ref()
1867 .map(|amendment| amendment.prefixes.clone())
1868 .unwrap_or_default()
1869 }),
1870 ExecApprovalRequirement::NeedsApproval {
1871 reason,
1872 proposed_execpolicy_amendment,
1873 proposed_network_policy_amendments,
1874 } => json!({
1875 "type": "needs_approval",
1876 "reason": reason,
1877 "proposed_execpolicy_amendment": proposed_execpolicy_amendment
1878 .as_ref()
1879 .map(|amendment| amendment.prefixes.clone())
1880 .unwrap_or_default(),
1881 "proposed_network_policy_amendments": proposed_network_policy_amendments
1882 }),
1883 ExecApprovalRequirement::Forbidden { reason } => json!({
1884 "type": "forbidden",
1885 "reason": reason
1886 }),
1887 }
1888}
1889
1890fn policy_precheck_payload(
1891 decision: &ExecPolicyDecision,
1892 command: &str,
1893 cwd: &str,
1894 execution_kind: &str,
1895) -> Value {
1896 json!({
1897 "execution_kind": execution_kind,
1898 "command": command,
1899 "cwd": cwd,
1900 "allow": decision.allow,
1901 "requires_approval": decision.requires_approval,
1902 "matched_rule": decision.matched_rule.clone(),
1903 "phase": decision.requirement.phase(),
1904 "reason": decision.reason(),
1905 "requirement": approval_requirement_payload(&decision.requirement)
1906 })
1907}
1908
1909fn tool_payload_value(payload: &ToolPayload) -> Value {
1910 serde_json::to_value(payload).unwrap_or_else(
1911 |_| json!({"type":"serialization_error","message":"tool payload unavailable"}),
1912 )
1913}
1914
1915fn tool_output_value(output: &codewhale_protocol::ToolOutput) -> Value {
1916 serde_json::to_value(output).unwrap_or_else(
1917 |_| json!({"type":"serialization_error","message":"tool output unavailable"}),
1918 )
1919}
1920
1921fn event_frame_payload(frame: &EventFrame) -> Value {
1922 serde_json::to_value(frame)
1923 .unwrap_or_else(|_| json!({"event":"error","message":"failed to encode event frame"}))
1924}
1925
1926fn json_optional_string(value: &Value) -> Option<String> {
1927 if value.is_null() {
1928 None
1929 } else {
1930 value.as_str().map(ToString::to_string)
1931 }
1932}
1933
1934fn parse_retry_metadata(value: Option<&Value>) -> JobRetryMetadata {
1935 let Some(value) = value else {
1936 return JobRetryMetadata::default();
1937 };
1938 JobRetryMetadata {
1939 attempt: value
1940 .get("attempt")
1941 .and_then(Value::as_u64)
1942 .unwrap_or(0)
1943 .min(u32::MAX as u64) as u32,
1944 max_attempts: value
1945 .get("max_attempts")
1946 .and_then(Value::as_u64)
1947 .unwrap_or(DEFAULT_JOB_MAX_ATTEMPTS as u64)
1948 .min(u32::MAX as u64) as u32,
1949 backoff_base_ms: value
1950 .get("backoff_base_ms")
1951 .and_then(Value::as_u64)
1952 .unwrap_or(DEFAULT_JOB_BACKOFF_BASE_MS),
1953 next_backoff_ms: value
1954 .get("next_backoff_ms")
1955 .and_then(Value::as_u64)
1956 .unwrap_or(0),
1957 next_retry_at: value.get("next_retry_at").and_then(Value::as_i64),
1958 }
1959}
1960
1961fn parse_history_entry(value: &Value) -> Option<JobHistoryEntry> {
1962 let status = value
1963 .get("status")
1964 .and_then(Value::as_str)
1965 .and_then(job_status_from_str)?;
1966 Some(JobHistoryEntry {
1967 at: value.get("at").and_then(Value::as_i64).unwrap_or(0),
1968 phase: value
1969 .get("phase")
1970 .and_then(Value::as_str)
1971 .unwrap_or("unknown")
1972 .to_string(),
1973 status,
1974 progress: value
1975 .get("progress")
1976 .and_then(Value::as_u64)
1977 .map(|v| v.min(u8::MAX as u64) as u8),
1978 detail: value.get("detail").and_then(json_optional_string),
1979 retry: parse_retry_metadata(value.get("retry")),
1980 })
1981}
1982
1983fn job_status_to_str(status: JobStatus) -> &'static str {
1984 match status {
1985 JobStatus::Queued => "queued",
1986 JobStatus::Running => "running",
1987 JobStatus::Paused => "paused",
1988 JobStatus::Completed => "completed",
1989 JobStatus::Failed => "failed",
1990 JobStatus::Cancelled => "cancelled",
1991 }
1992}
1993
1994fn job_status_from_str(value: &str) -> Option<JobStatus> {
1995 match value {
1996 "queued" => Some(JobStatus::Queued),
1997 "running" => Some(JobStatus::Running),
1998 "paused" => Some(JobStatus::Paused),
1999 "completed" => Some(JobStatus::Completed),
2000 "failed" => Some(JobStatus::Failed),
2001 "cancelled" => Some(JobStatus::Cancelled),
2002 _ => None,
2003 }
2004}
2005
2006fn job_retry_to_value(retry: &JobRetryMetadata) -> Value {
2007 json!({
2008 "attempt": retry.attempt,
2009 "max_attempts": retry.max_attempts,
2010 "backoff_base_ms": retry.backoff_base_ms,
2011 "next_backoff_ms": retry.next_backoff_ms,
2012 "next_retry_at": retry.next_retry_at
2013 })
2014}
2015
2016fn job_history_to_value(entry: &JobHistoryEntry) -> Value {
2017 json!({
2018 "at": entry.at,
2019 "phase": entry.phase.clone(),
2020 "status": job_status_to_str(entry.status),
2021 "progress": entry.progress,
2022 "detail": entry.detail.clone(),
2023 "retry": job_retry_to_value(&entry.retry)
2024 })
2025}
2026
2027fn runtime_status_to_job_state(status: JobStatus) -> JobStateStatus {
2028 match status {
2029 JobStatus::Queued => JobStateStatus::Queued,
2030 JobStatus::Running => JobStateStatus::Running,
2031 JobStatus::Paused => JobStateStatus::Running,
2032 JobStatus::Completed => JobStateStatus::Completed,
2033 JobStatus::Failed => JobStateStatus::Failed,
2034 JobStatus::Cancelled => JobStateStatus::Cancelled,
2035 }
2036}
2037
2038fn job_state_status_to_runtime(status: JobStateStatus) -> JobStatus {
2039 match status {
2040 JobStateStatus::Queued => JobStatus::Queued,
2041 JobStateStatus::Running => JobStatus::Running,
2042 JobStateStatus::Completed => JobStatus::Completed,
2043 JobStateStatus::Failed => JobStatus::Failed,
2044 JobStateStatus::Cancelled => JobStatus::Cancelled,
2045 }
2046}
2047
2048#[cfg(test)]
2049mod tests {
2050 use super::*;
2051 use codewhale_tools::ToolCallSource;
2052
2053 fn temp_core_state(name: &str) -> StateStore {
2054 let dir =
2055 std::env::temp_dir().join(format!("codewhale-core-{name}-{}", Uuid::new_v4().simple()));
2056 std::fs::create_dir_all(&dir).expect("create temp state dir");
2057 StateStore::open(Some(dir.join("state.db"))).expect("open state store")
2058 }
2059
2060 fn test_thread_metadata(id: &str) -> ThreadMetadata {
2061 ThreadMetadata {
2062 id: id.to_string(),
2063 rollout_path: None,
2064 preview: "test thread".to_string(),
2065 ephemeral: false,
2066 model_provider: "deepseek".to_string(),
2067 created_at: 10,
2068 updated_at: 10,
2069 status: PersistedThreadStatus::Running,
2070 path: None,
2071 cwd: PathBuf::from("/tmp/codewhale"),
2072 cli_version: "0.0.0-test".to_string(),
2073 source: SessionSource::Interactive,
2074 name: None,
2075 sandbox_policy: None,
2076 approval_mode: None,
2077 archived: false,
2078 archived_at: None,
2079 git_sha: None,
2080 git_branch: None,
2081 git_origin_url: None,
2082 memory_mode: None,
2083 current_leaf_id: None,
2084 }
2085 }
2086
2087 #[test]
2090 fn permission_path_for_call_extracts_function_path_argument() {
2091 let call = ToolCall {
2092 name: "read_file".to_string(),
2093 payload: ToolPayload::Function {
2094 arguments: json!({ "path": "README.md" }).to_string(),
2095 },
2096 source: ToolCallSource::Direct,
2097 raw_tool_call_id: None,
2098 };
2099
2100 assert_eq!(
2101 permission_path_for_call(&call).as_deref(),
2102 Some("README.md")
2103 );
2104 }
2105
2106 #[test]
2107 fn permission_path_for_call_extracts_mcp_path_argument() {
2108 let call = ToolCall {
2109 name: "mcp_fs_read".to_string(),
2110 payload: ToolPayload::Mcp {
2111 server: "fs".to_string(),
2112 tool: "read".to_string(),
2113 raw_arguments: json!({ "path": "secrets/token.txt" }),
2114 raw_tool_call_id: None,
2115 },
2116 source: ToolCallSource::Direct,
2117 raw_tool_call_id: None,
2118 };
2119
2120 assert_eq!(
2121 permission_path_for_call(&call).as_deref(),
2122 Some("secrets/token.txt")
2123 );
2124 }
2125
2126 #[test]
2127 fn permission_path_for_call_ignores_shell_payload() {
2128 let call = ToolCall {
2129 name: "exec_shell".to_string(),
2130 payload: ToolPayload::LocalShell {
2131 params: codewhale_protocol::LocalShellParams {
2132 command: "cargo test".to_string(),
2133 cwd: None,
2134 timeout_ms: None,
2135 },
2136 },
2137 source: ToolCallSource::Direct,
2138 raw_tool_call_id: None,
2139 };
2140
2141 assert_eq!(permission_path_for_call(&call), None);
2142 }
2143
2144 #[test]
2145 fn thread_goal_progress_accumulates_durable_accounting() {
2146 let store = temp_core_state("thread-goal-progress");
2147 store
2148 .upsert_thread(&test_thread_metadata("thread-1"))
2149 .expect("upsert thread");
2150 let mut manager = ThreadManager::new(store);
2151 manager
2152 .set_thread_goal(&ThreadGoalSetParams {
2153 thread_id: "thread-1".to_string(),
2154 objective: "Carry the goal across turns".to_string(),
2155 token_budget: Some(2_000),
2156 })
2157 .expect("set goal")
2158 .expect("goal exists");
2159
2160 let updated = manager
2161 .record_thread_goal_progress(&ThreadGoalProgressParams {
2162 thread_id: "thread-1".to_string(),
2163 token_delta: 750,
2164 time_delta_seconds: 12,
2165 record_continuation: true,
2166 })
2167 .expect("record progress")
2168 .expect("goal exists");
2169
2170 assert_eq!(updated.tokens_used, 750);
2171 assert_eq!(updated.time_used_seconds, 12);
2172 assert_eq!(updated.continuation_count, 1);
2173
2174 let persisted = manager
2175 .get_thread_goal(&ThreadGoalGetParams {
2176 thread_id: "thread-1".to_string(),
2177 })
2178 .expect("read goal")
2179 .expect("goal exists");
2180 assert_eq!(persisted.tokens_used, 750);
2181 assert_eq!(persisted.time_used_seconds, 12);
2182 assert_eq!(persisted.continuation_count, 1);
2183 }
2184
2185 #[test]
2186 fn approval_request_frame_includes_matched_rule() {
2187 let requirement = ExecApprovalRequirement::NeedsApproval {
2188 reason: "Typed ask rule 'tool=exec_shell command=cargo test' requires approval."
2189 .to_string(),
2190 proposed_execpolicy_amendment: None,
2191 proposed_network_policy_amendments: Vec::new(),
2192 };
2193
2194 let frame = approval_request_frame(
2195 &requirement,
2196 Some("tool=exec_shell command=cargo test"),
2197 "call-1".to_string(),
2198 "approval-1".to_string(),
2199 "turn-1".to_string(),
2200 "cargo test --workspace".to_string(),
2201 "/repo".to_string(),
2202 )
2203 .expect("approval frame");
2204
2205 let EventFrame::ExecApprovalRequest { request } = frame else {
2206 panic!("expected exec approval request frame");
2207 };
2208 assert_eq!(
2209 request.matched_rule.as_deref(),
2210 Some("tool=exec_shell command=cargo test")
2211 );
2212 assert_eq!(request.reason, requirement.reason());
2213 }
2214
2215 #[test]
2216 fn enqueue_creates_queued_job_with_zero_progress() {
2217 let mut jm = JobManager::default();
2218 let job = jm.enqueue("build");
2219 assert_eq!(job.name, "build");
2220 assert_eq!(job.status, JobStatus::Queued);
2221 assert_eq!(job.progress, Some(0));
2222 assert!(job.detail.is_none());
2223 assert_eq!(job.history.len(), 1);
2224 assert_eq!(job.history[0].phase, "created");
2225 }
2226
2227 #[test]
2228 fn set_running_transitions_from_queued() {
2229 let mut jm = JobManager::default();
2230 let job = jm.enqueue("deploy");
2231 let id = job.id.clone();
2232 jm.set_running(&id);
2233 let jobs = jm.list();
2234 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2235 assert_eq!(updated.status, JobStatus::Running);
2236 assert_eq!(updated.history.last().unwrap().phase, "running");
2237 }
2238
2239 #[test]
2240 fn update_progress_clamps_to_100() {
2241 let mut jm = JobManager::default();
2242 let job = jm.enqueue("task");
2243 let id = job.id.clone();
2244 jm.update_progress(&id, 150, Some("over".to_string()));
2245 let jobs = jm.list();
2246 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2247 assert_eq!(updated.progress, Some(100));
2248 }
2249
2250 #[test]
2251 fn complete_sets_progress_to_100() {
2252 let mut jm = JobManager::default();
2253 let job = jm.enqueue("task");
2254 let id = job.id.clone();
2255 jm.set_running(&id);
2256 jm.complete(&id);
2257 let jobs = jm.list();
2258 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2259 assert_eq!(updated.status, JobStatus::Completed);
2260 assert_eq!(updated.progress, Some(100));
2261 }
2262
2263 #[test]
2264 fn fail_increments_attempt_and_sets_backoff() {
2265 let mut jm = JobManager::default();
2266 let job = jm.enqueue("fragile");
2267 let id = job.id.clone();
2268 jm.set_running(&id);
2269 jm.fail(&id, "crashed");
2270 let jobs = jm.list();
2271 let updated = jobs.iter().find(|j| j.id == id).unwrap();
2272 assert_eq!(updated.status, JobStatus::Failed);
2273 assert_eq!(updated.retry.attempt, 1);
2274 assert!(updated.retry.next_backoff_ms > 0);
2275 assert!(updated.retry.next_retry_at.is_some());
2276 assert_eq!(updated.detail.as_deref(), Some("crashed"));
2277 }
2278
/// Failing one more time than `DEFAULT_JOB_MAX_ATTEMPTS` must leave the
/// attempt counter at the maximum with no further retry scheduled.
#[test]
fn fail_clears_retry_after_max_attempts() {
    let mut manager = JobManager::default();
    let job_id = manager.enqueue("fragile").id.clone();

    // The inclusive range yields DEFAULT_JOB_MAX_ATTEMPTS + 1 run/fail cycles.
    (0..=DEFAULT_JOB_MAX_ATTEMPTS).for_each(|_| {
        manager.set_running(&job_id);
        manager.fail(&job_id, "boom");
    });

    let snapshot = manager
        .list()
        .into_iter()
        .find(|candidate| candidate.id == job_id)
        .unwrap();
    let retry = &snapshot.retry;
    assert_eq!(retry.attempt, DEFAULT_JOB_MAX_ATTEMPTS);
    assert_eq!(retry.next_backoff_ms, 0);
    assert!(retry.next_retry_at.is_none());
}
2294
/// Cancelling a job must mark it `Cancelled` and leave no pending backoff.
#[test]
fn cancel_sets_status_and_clears_retry() {
    let mut manager = JobManager::default();
    let job_id = manager.enqueue("task").id.clone();

    manager.cancel(&job_id);

    let snapshot = manager
        .list()
        .into_iter()
        .find(|candidate| candidate.id == job_id)
        .unwrap();
    assert_eq!(snapshot.status, JobStatus::Cancelled);
    assert_eq!(snapshot.retry.next_backoff_ms, 0);
}
2306
/// Pausing a running job records the supplied detail; resuming it returns
/// the job to `Running` and logs a `"resumed"` history entry.
#[test]
fn pause_and_resume_round_trip() {
    let mut manager = JobManager::default();
    let job_id = manager.enqueue("task").id.clone();
    manager.set_running(&job_id);

    // Phase 1: pause and inspect the paused snapshot.
    manager.pause(&job_id, Some(String::from("waiting")));
    {
        let paused = manager
            .list()
            .into_iter()
            .find(|candidate| candidate.id == job_id)
            .unwrap();
        assert_eq!(paused.status, JobStatus::Paused);
        assert_eq!(paused.detail.as_deref(), Some("waiting"));
    }

    // Phase 2: resume without a detail message and inspect again.
    manager.resume(&job_id, None);
    let resumed = manager
        .list()
        .into_iter()
        .find(|candidate| candidate.id == job_id)
        .unwrap();
    assert_eq!(resumed.status, JobStatus::Running);
    assert_eq!(resumed.history.last().unwrap().phase, "resumed");
}
2325
/// `list` must return every enqueued job, ordered so that `updated_at` never
/// increases from one element to the next.
#[test]
fn list_returns_jobs_sorted_by_updated_at_desc() {
    let mut manager = JobManager::default();
    for label in ["first", "second", "third"] {
        manager.enqueue(label);
    }

    let listed = manager.list();
    assert_eq!(listed.len(), 3);

    // Every adjacent pair must be in non-increasing `updated_at` order.
    let is_descending = listed
        .windows(2)
        .all(|pair| pair[0].updated_at >= pair[1].updated_at);
    assert!(is_descending);
}
2338
/// Enqueue, run, and complete must yield exactly three history entries in
/// the order `"created"`, `"running"`, `"completed"`.
#[test]
fn history_returns_entries_for_existing_job() {
    let mut manager = JobManager::default();
    let job_id = manager.enqueue("task").id.clone();
    manager.set_running(&job_id);
    manager.complete(&job_id);

    let entries = manager.history(&job_id);
    let expected_phases = ["created", "running", "completed"];

    // Length is checked first so the pairwise comparison below cannot
    // silently skip a missing or extra entry.
    assert_eq!(entries.len(), 3);
    for (entry, expected_phase) in entries.iter().zip(expected_phases) {
        assert_eq!(entry.phase, expected_phase);
    }
}
2352
/// Asking for the history of an id that was never enqueued yields no entries.
#[test]
fn history_returns_empty_for_unknown_job() {
    let manager = JobManager::default();
    let entries = manager.history("nonexistent");
    assert!(entries.is_empty());
}
2358
/// With one queued, one running, and one completed job, `resume_pending`
/// must return two jobs, and both must be in the `Queued` state.
#[test]
fn resume_pending_requeues_running_and_queued() {
    let mut manager = JobManager::default();

    // Stays queued for the whole test.
    let _queued = manager.enqueue("queued_task");
    let running_id = manager.enqueue("running_task").id.clone();
    let completed_id = manager.enqueue("completed_task").id.clone();

    manager.set_running(&running_id);
    manager.set_running(&completed_id);
    manager.complete(&completed_id);

    let requeued = manager.resume_pending();
    assert_eq!(requeued.len(), 2);
    for pending in requeued.iter() {
        assert_eq!(pending.status, JobStatus::Queued);
    }
}
2377
/// With an attempt counter of zero the computed backoff must be zero.
#[test]
fn deterministic_backoff_zero_on_first_attempt() {
    let no_attempts_yet = JobRetryMetadata {
        attempt: 0,
        ..Default::default()
    };

    let backoff_ms = JobManager::deterministic_backoff_ms(&no_attempts_yet);

    assert_eq!(backoff_ms, 0);
}
2388
/// For attempts 1 through 5 the backoff must equal the base delay doubled
/// once per attempt after the first.
#[test]
fn deterministic_backoff_exponential_growth() {
    let base_ms = DEFAULT_JOB_BACKOFF_BASE_MS;
    for attempt in 1..=5 {
        let retry = JobRetryMetadata {
            attempt,
            backoff_base_ms: base_ms,
            ..Default::default()
        };

        // The doubling exponent is `attempt - 1`, capped at 20 in the
        // expected-value computation.
        let exponent = retry.attempt.saturating_sub(1).min(20);
        let expected_ms = base_ms * (1u64 << exponent);
        let actual_ms = JobManager::deterministic_backoff_ms(&retry);

        assert_eq!(actual_ms, expected_ms, "attempt {attempt}");
    }
}
2406
/// A very large attempt counter must not panic (for example on arithmetic
/// overflow in a debug build) and must not collapse the delay below the
/// configured base.
///
/// Without the floor assertion, a release build whose arithmetic wrapped
/// around to a tiny or zero delay would still pass this test.
#[test]
fn deterministic_backoff_saturates_at_high_exponent() {
    let retry = JobRetryMetadata {
        attempt: 63,
        backoff_base_ms: 1000,
        ..Default::default()
    };

    // Reaching the assertion at all proves the call did not panic.
    let backoff_ms = JobManager::deterministic_backoff_ms(&retry);

    assert!(
        backoff_ms >= retry.backoff_base_ms,
        "backoff {backoff_ms}ms fell below the base delay of {}ms",
        retry.backoff_base_ms
    );
}
2417
/// Writing more progress updates than `MAX_JOB_HISTORY_ENTRIES` must leave
/// the history holding exactly `MAX_JOB_HISTORY_ENTRIES` entries.
#[test]
fn push_history_truncates_beyond_max() {
    let mut manager = JobManager::default();
    let job_id = manager.enqueue("task").id.clone();

    let total_updates = MAX_JOB_HISTORY_ENTRIES + 20;
    for step in 0..total_updates {
        // `step % 100` is always below 100, so narrowing to `u8` is lossless.
        let percent = (step % 100) as u8;
        manager.update_progress(&job_id, percent, Some(format!("step {step}")));
    }

    let retained = manager.history(&job_id).len();
    assert_eq!(retained, MAX_JOB_HISTORY_ENTRIES);
}
2432
/// Encoding a failed job's persisted detail and parsing it back must
/// preserve status, detail text, retry attempt, and history length.
#[test]
fn encode_and_parse_persisted_detail_round_trip() {
    let mut manager = JobManager::default();
    let job_id = manager.enqueue("task").id.clone();
    manager.set_running(&job_id);
    manager.fail(&job_id, "oops");

    let original = manager
        .list()
        .into_iter()
        .find(|candidate| candidate.id == job_id)
        .unwrap();

    // First unwrap: encoding succeeded; second unwrap: it produced a payload.
    let payload = JobManager::encode_persisted_detail(&original)
        .unwrap()
        .unwrap();
    let decoded = JobManager::parse_persisted_detail(Some(&payload)).unwrap();

    assert_eq!(decoded.status, original.status);
    assert_eq!(decoded.detail, original.detail);
    assert_eq!(decoded.retry.attempt, original.retry.attempt);
    assert_eq!(decoded.history.len(), original.history.len());
}
2452
/// Parsing an absent persisted detail yields `None`.
#[test]
fn parse_persisted_detail_returns_none_for_none_input() {
    let parsed = JobManager::parse_persisted_detail(None);
    assert!(parsed.is_none());
}
2457
/// Parsing text that is not JSON yields `None` rather than a value.
#[test]
fn parse_persisted_detail_returns_none_for_invalid_json() {
    let malformed = "not json";
    let parsed = JobManager::parse_persisted_detail(Some(malformed));
    assert!(parsed.is_none());
}
2462
/// Every `JobStatus` variant must survive conversion to its string form and
/// back again.
#[test]
fn job_status_round_trip_str() {
    let every_status = [
        JobStatus::Queued,
        JobStatus::Running,
        JobStatus::Paused,
        JobStatus::Completed,
        JobStatus::Failed,
        JobStatus::Cancelled,
    ];

    for status in every_status {
        let text = job_status_to_str(status);
        let decoded = job_status_from_str(text);
        assert_eq!(decoded, Some(status), "round-trip failed for {text:?}");
    }
}
2481
/// Unrecognised status text, including the empty string, maps to `None`.
#[test]
fn job_status_from_str_returns_none_for_unknown() {
    for unrecognised in ["unknown", ""] {
        assert_eq!(job_status_from_str(unrecognised), None);
    }
}
2487
/// A 200-character ASCII input must come back with a length of 120.
#[test]
fn truncate_preview_limits_to_120_chars() {
    // Single-byte characters only, so byte length and character count agree.
    let oversized = "a".repeat(200);

    let preview = truncate_preview(&oversized);

    assert_eq!(preview.len(), 120);
}
2494
/// Input shorter than the preview limit is returned unchanged.
#[test]
fn truncate_preview_preserves_short_strings() {
    let preview = truncate_preview("hello");
    assert_eq!(preview, "hello");
}
2500
/// Checks the runtime-to-persisted status mapping for every `JobStatus`
/// variant. `Paused` is expected to persist as `Running`.
#[test]
fn runtime_status_to_job_state_maps_correctly() {
    // (runtime status, expected persisted status)
    let expectations = [
        (JobStatus::Queued, JobStateStatus::Queued),
        (JobStatus::Running, JobStateStatus::Running),
        (JobStatus::Paused, JobStateStatus::Running),
        (JobStatus::Completed, JobStateStatus::Completed),
        (JobStatus::Failed, JobStateStatus::Failed),
        (JobStatus::Cancelled, JobStateStatus::Cancelled),
    ];

    for (runtime, persisted) in expectations {
        assert_eq!(runtime_status_to_job_state(runtime), persisted);
    }
}
2528
/// Checks the persisted-to-runtime status mapping for the queued, running,
/// completed, failed, and cancelled persisted states.
#[test]
fn job_state_status_to_runtime_maps_correctly() {
    // (persisted status, expected runtime status)
    let expectations = [
        (JobStateStatus::Queued, JobStatus::Queued),
        (JobStateStatus::Running, JobStatus::Running),
        (JobStateStatus::Completed, JobStatus::Completed),
        (JobStateStatus::Failed, JobStatus::Failed),
        (JobStateStatus::Cancelled, JobStatus::Cancelled),
    ];

    for (persisted, runtime) in expectations {
        assert_eq!(job_state_status_to_runtime(persisted), runtime);
    }
}
2552
/// A brand-new conversation gets the fixed placeholder preview text.
#[test]
fn preview_from_initial_history_new() {
    let initial = InitialHistory::New;
    assert_eq!(preview_from_initial_history(&initial), "New conversation");
}
2558
/// The preview of a forked history must include text from its history items.
#[test]
fn preview_from_initial_history_forked() {
    let forked = InitialHistory::Forked(vec![json!("hello")]);

    let preview = preview_from_initial_history(&forked);

    assert!(preview.contains("hello"));
}
2564
/// The preview of a resumed history must include text from its history items.
#[test]
fn preview_from_initial_history_resumed() {
    let resumed = InitialHistory::Resumed {
        conversation_id: String::from("test"),
        history: vec![json!("world")],
        rollout_path: PathBuf::from("/tmp/test"),
    };

    let preview = preview_from_initial_history(&resumed);

    assert!(preview.contains("world"));
}
2574
/// JSON `null` produces no string.
#[test]
fn json_optional_string_handles_null() {
    let null_value = Value::Null;
    let extracted = json_optional_string(&null_value);
    assert!(extracted.is_none());
}
2579
/// A JSON string value is returned as an owned string with the same text.
#[test]
fn json_optional_string_handles_string() {
    let string_value = Value::String(String::from("hello"));

    let extracted = json_optional_string(&string_value);

    assert_eq!(extracted, Some(String::from("hello")));
}
2587
/// A JSON value that is not a string (here a number) produces no string.
#[test]
fn json_optional_string_handles_non_string() {
    let number_value = json!(42);
    let extracted = json_optional_string(&number_value);
    assert!(extracted.is_none());
}
2592
/// With no input value, retry metadata starts at attempt zero and takes the
/// default attempt limit and backoff base.
#[test]
fn parse_retry_metadata_returns_default_for_none() {
    let defaults = parse_retry_metadata(None);

    assert_eq!(defaults.attempt, 0);
    assert_eq!(defaults.max_attempts, DEFAULT_JOB_MAX_ATTEMPTS);
    assert_eq!(defaults.backoff_base_ms, DEFAULT_JOB_BACKOFF_BASE_MS);
}
2600
/// Every retry field present in the JSON object must be carried over to the
/// parsed metadata unchanged.
#[test]
fn parse_retry_metadata_parses_fields() {
    let raw = json!({
        "attempt": 2,
        "max_attempts": 5,
        "backoff_base_ms": 1000,
        "next_backoff_ms": 2000,
        "next_retry_at": 1234567890i64
    });

    let parsed = parse_retry_metadata(Some(&raw));

    assert_eq!(parsed.attempt, 2);
    assert_eq!(parsed.max_attempts, 5);
    assert_eq!(parsed.backoff_base_ms, 1000);
    assert_eq!(parsed.next_backoff_ms, 2000);
    assert_eq!(parsed.next_retry_at, Some(1234567890));
}
2617
/// A history entry object lacking the `"status"` key must be rejected.
#[test]
fn parse_history_entry_returns_none_without_status() {
    let missing_status = json!({"at": 1, "phase": "test"});
    let parsed = parse_history_entry(&missing_status);
    assert!(parsed.is_none());
}
2623
/// A fully populated history entry object must parse, with timestamp,
/// phase, status, progress, and detail matching the JSON input.
#[test]
fn parse_history_entry_parses_valid_entry() {
    let raw = json!({
        "at": 100,
        "phase": "running",
        "status": "running",
        "progress": 50,
        "detail": "working",
        "retry": {"attempt": 0, "max_attempts": 3, "backoff_base_ms": 500}
    });

    let parsed = parse_history_entry(&raw).unwrap();

    assert_eq!(parsed.at, 100);
    assert_eq!(parsed.phase, "running");
    assert_eq!(parsed.status, JobStatus::Running);
    assert_eq!(parsed.progress, Some(50));
    assert_eq!(parsed.detail.as_deref(), Some("working"));
}
2641}