Skip to main content

harn_vm/
personas.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::{Arc, OnceLock, RwLock};
4
5use chrono::{TimeZone, Utc};
6use croner::Cron;
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use time::format_description::well_known::Rfc3339;
10use time::OffsetDateTime;
11use uuid::Uuid;
12
13use crate::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
14
/// Name of the persona runtime event topic.
// NOTE(review): presumably the event-log topic the helpers in this module
// read from and append to — the usage sits outside this view, confirm.
pub const PERSONA_RUNTIME_TOPIC: &str = "persona.runtime.events";

/// Default lease time-to-live: five minutes, expressed in milliseconds.
const DEFAULT_LEASE_TTL_MS: i64 = 5 * 60 * 1000;
18
/// Lifecycle state reported for a persona in [`PersonaStatus`].
///
/// Serialized with snake_case variant names; `Idle` is the `Default`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaLifecycleState {
    Inactive,
    Starting,
    /// Starting point of the `persona_status` replay and the state it falls
    /// back to after a run completes or a lease is released/expires.
    #[default]
    Idle,
    /// Derived from `persona.lease.acquired` and `persona.run.started`.
    Running,
    /// Derived from `persona.control.paused`.
    Paused,
    /// Derived from `persona.control.draining`.
    Draining,
    /// Derived from `persona.run.failed`.
    Failed,
    /// Derived from `persona.control.disabled`.
    Disabled,
}
32
33impl PersonaLifecycleState {
34    pub fn as_str(self) -> &'static str {
35        match self {
36            Self::Inactive => "inactive",
37            Self::Starting => "starting",
38            Self::Idle => "idle",
39            Self::Running => "running",
40            Self::Paused => "paused",
41            Self::Draining => "draining",
42            Self::Failed => "failed",
43            Self::Disabled => "disabled",
44        }
45    }
46}
47
/// Optional spending limits configured for a persona.
///
/// `persona_status` passes this policy to `budget_status` together with the
/// recorded spend.
// NOTE(review): `None` presumably means "no limit" for that dimension —
// confirm against `budget_status` / `enforce_budget`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaBudgetPolicy {
    /// Limit in USD per day, if any.
    pub daily_usd: Option<f64>,
    /// Limit in USD per hour, if any.
    pub hourly_usd: Option<f64>,
    /// Limit in USD per run, if any.
    pub run_usd: Option<f64>,
    /// Token limit, if any.
    pub max_tokens: Option<u64>,
}
55
/// Static description of a persona that the runtime functions in this module
/// operate on.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaRuntimeBinding {
    /// Persona name; used as the key when reading and appending persona
    /// events.
    pub name: String,
    /// Optional template reference, copied into statuses and supervision
    /// updates.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Entry workflow identifier, echoed back in [`PersonaStatus`].
    pub entry_workflow: String,
    /// Schedule expressions. `fire_schedule` uses only the first entry and
    /// falls back to `"manual"` when the list is empty.
    #[serde(default)]
    pub schedules: Vec<String>,
    /// Trigger declarations for the persona.
    #[serde(default)]
    pub triggers: Vec<String>,
    /// Budget limits applied when computing [`PersonaBudgetStatus`].
    #[serde(default)]
    pub budget: PersonaBudgetPolicy,
    /// Ordered stage declarations. Each stage names a `@step` and narrows the
    /// runtime capability surface for the duration of that step (see
    /// `crates/harn-vm/src/step_runtime.rs`). Empty means the persona keeps
    /// the ambient `CapabilityPolicy`.
    #[serde(default)]
    pub stages: Vec<StageDecl>,
}
75
/// Per-stage narrowing of the runtime tool surface.
///
/// `name` matches a `@step` declaration on the persona. When that step's
/// frame is entered the runtime pushes a `CapabilityPolicy` whose `tools`
/// allowlist is `allowed_tools` (when set) and whose side-effect ceiling is
/// `side_effect_level` (when set). Both narrow the ambient policy — the
/// existing ceiling still applies, this just adds a tighter scope.
///
/// All optional fields are omitted from the serialized form when `None`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct StageDecl {
    /// Name of the `@step` this declaration applies to.
    pub name: String,
    /// Tool allowlist for the stage. `None` (omitted) inherits the
    /// persona-level allowlist; `Some(vec![])` denies every tool.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub allowed_tools: Option<Vec<String>>,
    /// Optional side-effect ceiling override (`none` / `read_only` /
    /// `workspace_write` / `process_exec` / `network`). Tightens, never
    /// loosens, the ambient ceiling.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub side_effect_level: Option<String>,
    /// Optional agent-loop iteration cap for the stage. Surfaced for
    /// downstream consumers; `agent_loop` reads it via the persona binding.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_iterations: Option<u32>,
    /// Exit transitions that name the next stage on success/failure plus an
    /// optional explicit policy override.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub on_exit: Option<StageExit>,
}
104
/// Exit transitions for a [`StageDecl`].
///
/// Every field is optional and is omitted from the serialized form when
/// `None`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct StageExit {
    /// Transition target when the stage completes.
    // NOTE(review): presumably the name of the next stage — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub on_complete: Option<String>,
    /// Transition target when the stage fails.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub on_failure: Option<String>,
    /// Explicit capability policy override attached to the exit.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_override: Option<crate::orchestration::CapabilityPolicy>,
}
114
/// A lease on a unit of persona work.
///
/// `persona_status` decodes this from the payload of `persona.lease.acquired`
/// events and treats the lease as gone once `expires_at_ms <= now_ms`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaLease {
    /// Lease identifier.
    pub id: String,
    /// Identifier of the party holding the lease.
    pub holder: String,
    /// Key of the work item the lease covers.
    pub work_key: String,
    /// Acquisition timestamp in milliseconds.
    pub acquired_at_ms: i64,
    /// Expiry timestamp in milliseconds; compared against `now_ms`.
    pub expires_at_ms: i64,
}
123
/// Budget snapshot included in [`PersonaStatus`].
///
/// Produced by `budget_status` and then adjusted by `persona_status`, which
/// fills `exhausted`/`reason` from the latest `persona.budget.exhausted`
/// event and `last_receipt_id` from the latest recorded receipt when
/// `budget_status` left them unset.
// NOTE(review): the limit and spend fields are populated by `budget_status`,
// which is outside this view — field comments below are name-based.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaBudgetStatus {
    pub daily_usd: Option<f64>,
    pub hourly_usd: Option<f64>,
    pub run_usd: Option<f64>,
    pub max_tokens: Option<u64>,
    pub spent_today_usd: f64,
    pub spent_this_hour_usd: f64,
    pub spent_last_run_usd: f64,
    pub tokens_today: u64,
    pub remaining_today_usd: Option<f64>,
    pub remaining_hour_usd: Option<f64>,
    /// Whether the budget is considered exhausted.
    pub exhausted: bool,
    /// Reason accompanying the exhaustion, when one is known.
    pub reason: Option<String>,
    /// Identifier of the most recent budget receipt, when one is known.
    pub last_receipt_id: Option<String>,
}
140
/// Point-in-time view of a persona, as computed by `persona_status` from the
/// persona's recorded events.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaStatus {
    /// Persona name, copied from the binding.
    pub name: String,
    /// Template reference, copied from the binding.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Lifecycle state derived from the event replay.
    pub state: PersonaLifecycleState,
    /// Entry workflow, copied from the binding.
    pub entry_workflow: String,
    /// Role label. `persona_status` currently fills this with the binding
    /// name.
    #[serde(default)]
    pub role: String,
    /// Assignment view of the active lease, when there is one.
    #[serde(default)]
    pub current_assignment: Option<PersonaAssignmentStatus>,
    /// Formatted time of the last completed run, if any.
    pub last_run: Option<String>,
    /// Next scheduled run as computed by `next_scheduled_run`, if any.
    pub next_scheduled_run: Option<String>,
    /// Currently held, unexpired lease.
    pub active_lease: Option<PersonaLease>,
    /// Budget snapshot.
    pub budget: PersonaBudgetStatus,
    /// Most recent run-failure or budget-exhaustion message.
    pub last_error: Option<String>,
    /// Number of queued work keys that have no matching completed run.
    pub queued_events: usize,
    /// Queued work items that have no matching completed run.
    #[serde(default)]
    pub queued_work: Vec<PersonaQueuedWork>,
    /// Subset of `queued_work` that `handoff_inbox_item` maps to inbox items.
    #[serde(default)]
    pub handoff_inbox: Vec<PersonaHandoffInboxItem>,
    /// Receipts decoded from `persona.value.*` events, in replay order.
    #[serde(default)]
    pub value_receipts: Vec<PersonaValueReceipt>,
    /// Count of `persona.trigger.dead_lettered` events.
    pub disabled_events: usize,
    /// Policy label for events received while paused; `persona_status`
    /// always reports `"queue_then_drain_on_resume"`.
    pub paused_event_policy: String,
}
167
/// Presentation form of the active lease, exposed as
/// `PersonaStatus::current_assignment`.
///
/// Built by `assignment_status_from_lease` from a [`PersonaLease`].
// NOTE(review): `acquired_at` / `expires_at` are presumably the lease's
// millisecond timestamps rendered as strings — confirm in
// `assignment_status_from_lease`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaAssignmentStatus {
    pub work_key: String,
    pub lease_id: String,
    pub holder: String,
    pub acquired_at: String,
    pub expires_at: String,
}
176
/// A queued unit of work awaiting execution.
///
/// `persona_status` obtains these from `persona.trigger.queued` events via
/// `queued_work_from_event`, keeps one entry per `work_key`, and drops
/// entries whose key has a completed run.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaQueuedWork {
    /// Key identifying the work item; used for de-duplication.
    pub work_key: String,
    /// Provider of the originating trigger.
    pub provider: String,
    /// Kind of the originating trigger.
    pub kind: String,
    /// When the item was queued, as a string.
    pub queued_at: String,
    /// Why the item was queued.
    pub reason: String,
    /// Identifier of the source event, if known.
    pub source_event_id: Option<String>,
    /// Free-form string metadata; defaults to empty when absent.
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
}
188
/// Handoff view of a queued work item.
///
/// `persona_status` derives these from the remaining [`PersonaQueuedWork`]
/// entries via `handoff_inbox_item`, which may skip items.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaHandoffInboxItem {
    /// Key of the underlying queued work item.
    pub work_key: String,
    /// Handoff identifier, if present.
    pub handoff_id: Option<String>,
    /// Handoff kind, if present.
    pub handoff_kind: Option<String>,
    /// Persona that originated the handoff, if present.
    pub source_persona: Option<String>,
    /// Task description, if present.
    pub task: Option<String>,
    /// When the item was queued, as a string.
    pub queued_at: String,
    /// Why the item was queued.
    pub reason: String,
}
199
/// Value-accounting receipt surfaced in `PersonaStatus::value_receipts`.
///
/// `persona_status` decodes these from `persona.value.*` events via
/// `value_receipt_from_event`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaValueReceipt {
    /// Which value event produced the receipt.
    pub kind: PersonaValueEventKind,
    /// Run the receipt belongs to, if any.
    pub run_id: Option<Uuid>,
    /// When the event occurred, as a string.
    pub occurred_at: String,
    /// Cost paid, in USD.
    pub paid_cost_usd: f64,
    /// Cost avoided, in USD.
    pub avoided_cost_usd: f64,
    /// Number of deterministic steps.
    pub deterministic_steps: i64,
    /// Number of LLM steps.
    pub llm_steps: i64,
    /// Arbitrary JSON metadata.
    pub metadata: serde_json::Value,
}
211
/// Normalized description of a trigger or schedule tick handed to
/// `run_for_envelope`.
///
/// `fire_schedule` builds one with provider `"schedule"` and kind
/// `"cron.tick"`; `fire_trigger` builds one through
/// `normalize_trigger_envelope`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaTriggerEnvelope {
    /// Source of the trigger.
    pub provider: String,
    /// Trigger kind within the provider.
    pub kind: String,
    /// Key identifying the subject of the trigger.
    pub subject_key: String,
    /// Identifier of the source event, if any.
    pub source_event_id: Option<String>,
    /// Receipt timestamp in milliseconds.
    pub received_at_ms: i64,
    /// Free-form string metadata; defaults to empty when absent.
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
    /// Raw JSON payload; defaults to JSON `null` when absent.
    #[serde(default)]
    pub raw: serde_json::Value,
}
224
/// Outcome of attempting to run a persona for one envelope.
///
/// Returned by `fire_schedule` and `fire_trigger`, and embedded in
/// [`PersonaReceiptUpdate`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaRunReceipt {
    /// Outcome label.
    pub status: String,
    /// Name of the persona.
    pub persona: String,
    /// Run identifier, if one was assigned; defaults to `None` when absent.
    #[serde(default)]
    pub run_id: Option<Uuid>,
    /// Key of the work item.
    pub work_key: String,
    /// Lease associated with the run, if any.
    pub lease: Option<PersonaLease>,
    /// Whether the work was queued.
    // NOTE(review): presumably "queued instead of run immediately" — confirm
    // in `run_for_envelope`.
    pub queued: bool,
    /// Error message, if any.
    pub error: Option<String>,
    /// Identifier of the associated budget receipt, if any.
    pub budget_receipt_id: Option<String>,
}
237
/// Cost figures supplied by callers of `fire_schedule`, `fire_trigger`, and
/// `record_persona_spend`.
///
/// Only `cost_usd` and `tokens` are required when deserializing; the
/// remaining fields default to zero / JSON `null`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaRunCost {
    /// Cost in USD.
    pub cost_usd: f64,
    /// Token count.
    pub tokens: u64,
    /// Cost avoided, in USD.
    #[serde(default)]
    pub avoided_cost_usd: f64,
    /// Number of deterministic steps.
    #[serde(default)]
    pub deterministic_steps: i64,
    /// Number of LLM steps.
    #[serde(default)]
    pub llm_steps: i64,
    /// Number of frontier escalations.
    #[serde(default)]
    pub frontier_escalations: i64,
    /// Arbitrary JSON metadata.
    #[serde(default)]
    pub metadata: serde_json::Value,
}
253
/// Category of a persona value event.
///
/// Serialized with snake_case variant names (for example `RunStarted`
/// becomes `"run_started"`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaValueEventKind {
    RunStarted,
    RunCompleted,
    AcceptedOutcome,
    FrontierEscalation,
    DeterministicExecution,
    PromotionSavings,
    ApprovalWait,
}
265
266impl PersonaValueEventKind {
267    pub fn as_str(self) -> &'static str {
268        match self {
269            Self::RunStarted => "run_started",
270            Self::RunCompleted => "run_completed",
271            Self::AcceptedOutcome => "accepted_outcome",
272            Self::FrontierEscalation => "frontier_escalation",
273            Self::DeterministicExecution => "deterministic_execution",
274            Self::PromotionSavings => "promotion_savings",
275            Self::ApprovalWait => "approval_wait",
276        }
277    }
278}
279
/// Typed value event delivered to [`PersonaValueSink`] implementations.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaValueEvent {
    /// Persona the event belongs to.
    pub persona_id: String,
    /// Template reference of the persona, if any.
    pub template_ref: Option<String>,
    /// Run the event belongs to, if any.
    pub run_id: Option<Uuid>,
    /// Category of the event.
    pub kind: PersonaValueEventKind,
    /// Cost paid, in USD.
    pub paid_cost_usd: f64,
    /// Cost avoided, in USD.
    pub avoided_cost_usd: f64,
    /// Number of deterministic steps.
    pub deterministic_steps: i64,
    /// Number of LLM steps.
    pub llm_steps: i64,
    /// Arbitrary JSON metadata.
    pub metadata: serde_json::Value,
    /// When the event occurred.
    pub occurred_at: OffsetDateTime,
}
293
/// Receiver for [`PersonaValueEvent`]s.
///
/// Implementations must be `Send + Sync` because sinks are stored behind
/// `Arc` in a process-wide registry (see `register_persona_value_sink`).
pub trait PersonaValueSink: Send + Sync {
    /// Called with each value event routed to this sink.
    fn handle_value_event(&self, event: &PersonaValueEvent);
}
297
/// Append-only multiplexed feed mirroring the harn-cloud
/// `persona_supervision_events` projection. Sinks see the runtime-sourced
/// `queue_position`, `repair_worker_status`, `receipt`, and
/// `checkpoint` (restore-ack) variants; supervisor-sourced variants
/// (`control`, `feedback`, `approval`, `handoff`, and checkpoint writes)
/// originate on the hosted side and aren't routed here.
///
/// Serialized as an internally tagged enum: the snake_case variant name is
/// written to the `update_kind` field alongside the variant's own fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "update_kind", rename_all = "snake_case")]
pub enum PersonaSupervisionEvent {
    /// Queue position change for a work item.
    QueuePosition(PersonaQueuePositionUpdate),
    /// Lifecycle transition of a repair worker.
    RepairWorkerStatus(PersonaRepairWorkerStatusUpdate),
    /// Run receipt.
    Receipt(PersonaReceiptUpdate),
    /// Checkpoint restore acknowledgement.
    Checkpoint(PersonaCheckpointUpdate),
}
312
313impl PersonaSupervisionEvent {
314    pub fn update_kind(&self) -> &'static str {
315        match self {
316            Self::QueuePosition(_) => "queue_position",
317            Self::RepairWorkerStatus(_) => "repair_worker_status",
318            Self::Receipt(_) => "receipt",
319            Self::Checkpoint(_) => "checkpoint",
320        }
321    }
322
323    pub fn persona_id(&self) -> &str {
324        match self {
325            Self::QueuePosition(update) => &update.persona_id,
326            Self::RepairWorkerStatus(update) => &update.persona_id,
327            Self::Receipt(update) => &update.persona_id,
328            Self::Checkpoint(update) => &update.persona_id,
329        }
330    }
331
332    pub fn template_ref(&self) -> Option<&str> {
333        match self {
334            Self::QueuePosition(update) => update.template_ref.as_deref(),
335            Self::RepairWorkerStatus(update) => update.template_ref.as_deref(),
336            Self::Receipt(update) => update.template_ref.as_deref(),
337            Self::Checkpoint(update) => update.template_ref.as_deref(),
338        }
339    }
340
341    pub fn occurred_at_ms(&self) -> i64 {
342        match self {
343            Self::QueuePosition(update) => update.occurred_at_ms,
344            Self::RepairWorkerStatus(update) => update.occurred_at_ms,
345            Self::Receipt(update) => update.occurred_at_ms,
346            Self::Checkpoint(update) => update.occurred_at_ms,
347        }
348    }
349}
350
/// Payload of the `queue_position` supervision event.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaQueuePositionUpdate {
    /// Persona the update belongs to.
    pub persona_id: String,
    /// Template reference of the persona; defaults to `None` when absent.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Key of the work item whose position is reported.
    pub work_key: String,
    /// Number of items in the queue.
    pub queue_depth: i64,
    /// 1-indexed position of `work_key`; `0` means the item left the queue.
    pub position: i64,
    /// When the item was queued, in milliseconds.
    pub queued_at_ms: i64,
    /// When the update occurred, in milliseconds.
    pub occurred_at_ms: i64,
}
363
/// Lifecycle stage of a repair worker, reported through
/// [`PersonaRepairWorkerStatusUpdate`].
///
/// Serialized with snake_case variant names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaRepairWorkerLifecycle {
    Pending,
    Running,
    Verifying,
    Pushing,
    Succeeded,
    Failed,
    Cancelled,
}
375
376impl PersonaRepairWorkerLifecycle {
377    pub fn as_str(self) -> &'static str {
378        match self {
379            Self::Pending => "pending",
380            Self::Running => "running",
381            Self::Verifying => "verifying",
382            Self::Pushing => "pushing",
383            Self::Succeeded => "succeeded",
384            Self::Failed => "failed",
385            Self::Cancelled => "cancelled",
386        }
387    }
388}
389
/// Payload of the `repair_worker_status` supervision event.
///
/// `report_repair_worker_status` fills in `persona_id` (when empty),
/// `template_ref` (when `None`), and `occurred_at_ms` / `last_heartbeat_ms`
/// (when `0`) before recording the update.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaRepairWorkerStatusUpdate {
    /// Persona the update belongs to; an empty string is replaced with the
    /// binding name on report.
    pub persona_id: String,
    /// Template reference; `None` is replaced with the binding's on report.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Identifier of the repair worker.
    pub repair_worker_id: String,
    /// Lifecycle stage being reported.
    pub lifecycle: PersonaRepairWorkerLifecycle,
    /// Work item the repair worker is handling, if known.
    #[serde(default)]
    pub work_key: Option<String>,
    /// Lease associated with the repair worker, if known.
    #[serde(default)]
    pub lease_id: Option<String>,
    /// URL of the worker's scratchpad, if any.
    #[serde(default)]
    pub scratchpad_url: Option<String>,
    /// Last heartbeat in milliseconds; `0` is replaced with `now_ms` on
    /// report.
    pub last_heartbeat_ms: i64,
    /// When the update occurred, in milliseconds; `0` is replaced with
    /// `now_ms` on report.
    pub occurred_at_ms: i64,
}
406
/// Payload of the `receipt` supervision event.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaReceiptUpdate {
    /// Persona the receipt belongs to.
    pub persona_id: String,
    /// Template reference of the persona; defaults to `None` when absent.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// The run receipt being reported.
    pub receipt: PersonaRunReceipt,
    /// When the update occurred, in milliseconds.
    pub occurred_at_ms: i64,
}
415
/// Payload of the `checkpoint` supervision event.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointUpdate {
    /// Persona the checkpoint belongs to.
    pub persona_id: String,
    /// Template reference of the persona; defaults to `None` when absent.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// What happened to the checkpoint.
    pub action: PersonaCheckpointAction,
    /// Identifier of the checkpoint.
    pub checkpoint_id: String,
    /// Work item associated with the checkpoint, if any.
    #[serde(default)]
    pub work_key: Option<String>,
    /// Coordinates the runtime actually resumed from when acking a restore.
    #[serde(default)]
    pub resumed_from: Option<PersonaCheckpointResume>,
    /// When the update occurred, in milliseconds.
    pub occurred_at_ms: i64,
}
430
/// Checkpoint action reported by the runtime.
///
/// Serialized in snake_case. Only the restore acknowledgement exists here.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaCheckpointAction {
    /// The runtime acknowledged a checkpoint-restore request.
    RestoreAcked,
}
436
437impl PersonaCheckpointAction {
438    pub fn as_str(self) -> &'static str {
439        match self {
440            Self::RestoreAcked => "restore_acked",
441        }
442    }
443}
444
/// Coordinates a runtime resumed from when acknowledging a checkpoint
/// restore.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointResume {
    /// Run identifier, if any.
    pub run_id: Option<Uuid>,
    /// Identifier of the active lease at resume time, if any.
    pub lease_id: Option<String>,
    /// Time of the last run in milliseconds, if any.
    pub last_run_ms: Option<i64>,
    /// Work keys that were queued at resume time.
    pub queued_work_keys: Vec<String>,
    /// Optional free-form note; defaults to `None` when absent.
    #[serde(default)]
    pub note: Option<String>,
}
454
/// Receiver for [`PersonaSupervisionEvent`]s.
///
/// Implementations must be `Send + Sync` because sinks are stored behind
/// `Arc` in a process-wide registry (see
/// `register_persona_supervision_sink`).
pub trait PersonaSupervisionSink: Send + Sync {
    /// Called with each supervision event routed to this sink.
    fn handle_supervision_event(&self, event: &PersonaSupervisionEvent);
}
458
/// Registry of shared sinks, each tagged with a unique numeric id.
///
/// Ids are handed out from an atomic counter starting at 1. The sink list is
/// guarded by an `RwLock`; every accessor recovers the guard from a poisoned
/// lock instead of giving up, because the only mutations performed under the
/// lock (`Vec::push` and `Vec::retain`) leave the vector valid even if a
/// panic unwinds through them. Without that recovery a single panic while
/// the lock was held would silently disable every sink for the rest of the
/// process.
struct TypedSinkRegistry<T: ?Sized + Send + Sync> {
    sinks: RwLock<Vec<(u64, Arc<T>)>>,
    next_id: AtomicU64,
}

impl<T: ?Sized + Send + Sync> TypedSinkRegistry<T> {
    /// Creates an empty registry. `const` so it can initialize statics.
    const fn new() -> Self {
        Self {
            sinks: RwLock::new(Vec::new()),
            next_id: AtomicU64::new(1),
        }
    }

    /// Adds `sink` and returns the id to pass to [`Self::unregister`].
    ///
    /// The sink is always stored, even if the lock was poisoned earlier, so
    /// the returned id never refers to a sink that was dropped on the floor.
    fn register(&self, sink: Arc<T>) -> u64 {
        // Relaxed is enough: the counter only needs to hand out unique ids.
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        self.sinks
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push((id, sink));
        id
    }

    /// Removes the sink registered under `id`; unknown ids are ignored.
    fn unregister(&self, id: u64) {
        self.sinks
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .retain(|(existing, _)| *existing != id);
    }

    /// Returns clones of the currently registered sinks in registration
    /// order, so callers can invoke them without holding the lock.
    fn snapshot(&self) -> Vec<Arc<T>> {
        self.sinks
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .iter()
            .map(|(_, sink)| Arc::clone(sink))
            .collect()
    }
}
493
494fn persona_value_sinks() -> &'static TypedSinkRegistry<dyn PersonaValueSink> {
495    static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaValueSink>> = OnceLock::new();
496    REGISTRY.get_or_init(TypedSinkRegistry::new)
497}
498
499fn persona_supervision_sinks() -> &'static TypedSinkRegistry<dyn PersonaSupervisionSink> {
500    static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaSupervisionSink>> = OnceLock::new();
501    REGISTRY.get_or_init(TypedSinkRegistry::new)
502}
503
504#[must_use = "dropping the registration immediately unregisters the sink"]
505pub struct PersonaValueSinkRegistration {
506    id: u64,
507}
508
509impl Drop for PersonaValueSinkRegistration {
510    fn drop(&mut self) {
511        persona_value_sinks().unregister(self.id);
512    }
513}
514
515pub fn register_persona_value_sink(
516    sink: Arc<dyn PersonaValueSink>,
517) -> PersonaValueSinkRegistration {
518    PersonaValueSinkRegistration {
519        id: persona_value_sinks().register(sink),
520    }
521}
522
523#[must_use = "dropping the registration immediately unregisters the sink"]
524pub struct PersonaSupervisionSinkRegistration {
525    id: u64,
526}
527
528impl Drop for PersonaSupervisionSinkRegistration {
529    fn drop(&mut self) {
530        persona_supervision_sinks().unregister(self.id);
531    }
532}
533
534pub fn register_persona_supervision_sink(
535    sink: Arc<dyn PersonaSupervisionSink>,
536) -> PersonaSupervisionSinkRegistration {
537    PersonaSupervisionSinkRegistration {
538        id: persona_supervision_sinks().register(sink),
539    }
540}
541
/// Rebuilds the current [`PersonaStatus`] for `binding` by replaying the
/// persona's recorded events.
///
/// Events are folded in the order `read_persona_events` returns them, so a
/// later event overrides whatever an earlier one established. `now_ms` is
/// used to discard a lease whose `expires_at_ms` has passed and is forwarded
/// to `budget_status` and `next_scheduled_run`.
///
/// # Errors
///
/// Propagates the error string from `read_persona_events`,
/// `queued_work_from_event`, or `value_receipt_from_event`.
pub async fn persona_status(
    log: &Arc<AnyEventLog>,
    binding: &PersonaRuntimeBinding,
    now_ms: i64,
) -> Result<PersonaStatus, String> {
    let events = read_persona_events(log, &binding.name).await?;
    // Accumulators for the replay; a persona with no events is `Idle`.
    let mut state = PersonaLifecycleState::Idle;
    let mut last_run_ms = None;
    let mut active_lease = None;
    let mut last_error = None;
    let mut queued = BTreeSet::<String>::new();
    let mut completed = BTreeSet::<String>::new();
    let mut disabled_events = 0usize;
    let mut budget_receipt = None;
    let mut budget_exhaustion_reason = None;
    // (occurred_at_ms, cost_usd, tokens) for every recorded budget entry.
    let mut spent = Vec::<(i64, f64, u64)>::new();
    let mut queued_work = BTreeMap::<String, PersonaQueuedWork>::new();
    let mut value_receipts = Vec::<PersonaValueReceipt>::new();

    for (_, event) in events {
        match event.kind.as_str() {
            // Control events set the lifecycle state directly.
            "persona.control.paused" => state = PersonaLifecycleState::Paused,
            "persona.control.resumed" => state = PersonaLifecycleState::Idle,
            "persona.control.disabled" => state = PersonaLifecycleState::Disabled,
            "persona.control.draining" => state = PersonaLifecycleState::Draining,
            "persona.lease.acquired" => {
                // A payload that does not decode as a lease is ignored.
                if let Ok(lease) = serde_json::from_value::<PersonaLease>(event.payload.clone()) {
                    active_lease = Some(lease);
                    state = PersonaLifecycleState::Running;
                }
            }
            "persona.lease.released" => {
                active_lease = None;
                // Paused/Disabled are sticky; any other state returns to Idle.
                if !matches!(
                    state,
                    PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
                ) {
                    state = PersonaLifecycleState::Idle;
                }
            }
            "persona.lease.expired" => {
                active_lease = None;
                if !matches!(
                    state,
                    PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
                ) {
                    state = PersonaLifecycleState::Idle;
                }
            }
            "persona.run.started" => state = PersonaLifecycleState::Running,
            "persona.run.completed" => {
                // Prefer the payload's completion time; fall back to the
                // event's own timestamp.
                last_run_ms = event
                    .payload
                    .get("completed_at_ms")
                    .and_then(serde_json::Value::as_i64)
                    .or(Some(event.occurred_at_ms));
                if let Some(work_key) = event
                    .payload
                    .get("work_key")
                    .and_then(serde_json::Value::as_str)
                {
                    completed.insert(work_key.to_string());
                }
                if !matches!(
                    state,
                    PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
                ) {
                    state = PersonaLifecycleState::Idle;
                }
            }
            "persona.run.failed" => {
                state = PersonaLifecycleState::Failed;
                // Overwrites any earlier error, with `None` when the payload
                // has no string `error` field.
                last_error = event
                    .payload
                    .get("error")
                    .and_then(serde_json::Value::as_str)
                    .map(ToString::to_string);
            }
            "persona.trigger.queued" => {
                if let Some(work_key) = event
                    .payload
                    .get("work_key")
                    .and_then(serde_json::Value::as_str)
                {
                    queued.insert(work_key.to_string());
                }
                // Later events for the same work key replace earlier ones.
                if let Some(item) = queued_work_from_event(&event)? {
                    queued_work.insert(item.work_key.clone(), item);
                }
            }
            "persona.trigger.dead_lettered" => disabled_events += 1,
            "persona.budget.recorded" => {
                budget_receipt = event
                    .payload
                    .get("receipt_id")
                    .and_then(serde_json::Value::as_str)
                    .map(ToString::to_string);
                // Missing or mistyped cost/token fields count as zero.
                spent.push((
                    event.occurred_at_ms,
                    event
                        .payload
                        .get("cost_usd")
                        .and_then(serde_json::Value::as_f64)
                        .unwrap_or_default(),
                    event
                        .payload
                        .get("tokens")
                        .and_then(serde_json::Value::as_u64)
                        .unwrap_or_default(),
                ));
            }
            "persona.budget.exhausted" => {
                budget_exhaustion_reason = event
                    .payload
                    .get("reason")
                    .and_then(serde_json::Value::as_str)
                    .map(ToString::to_string);
                // Also replaces `last_error`, clearing it when the payload
                // carries no reason.
                last_error = budget_exhaustion_reason
                    .as_ref()
                    .map(|reason| format!("persona budget exhausted: {reason}"));
                budget_receipt = event
                    .payload
                    .get("receipt_id")
                    .and_then(serde_json::Value::as_str)
                    .map(ToString::to_string);
            }
            kind if kind.starts_with("persona.value.") => {
                if let Some(receipt) = value_receipt_from_event(&event)? {
                    value_receipts.push(receipt);
                }
            }
            // Any other event kind does not affect the status.
            _ => {}
        }
    }

    // A lease past its expiry counts as released even without an explicit
    // `persona.lease.expired` event.
    if let Some(lease) = active_lease.as_ref() {
        if lease.expires_at_ms <= now_ms {
            active_lease = None;
            if !matches!(
                state,
                PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
            ) {
                state = PersonaLifecycleState::Idle;
            }
        }
    }

    // Work whose key has a completed run is no longer queued.
    queued.retain(|work_key| !completed.contains(work_key));
    queued_work.retain(|work_key, _| !completed.contains(work_key));
    let queued_work = queued_work.into_values().collect::<Vec<_>>();
    let handoff_inbox = queued_work
        .iter()
        .filter_map(handoff_inbox_item)
        .collect::<Vec<_>>();

    let mut budget = budget_status(&binding.budget, &spent, now_ms);
    // Fall back to the recorded exhaustion event only when `budget_status`
    // did not report a reason itself.
    if budget.reason.is_none() {
        if let Some(reason) = budget_exhaustion_reason {
            budget.exhausted = true;
            budget.reason = Some(reason);
        }
    }
    if budget.last_receipt_id.is_none() {
        budget.last_receipt_id = budget_receipt;
    }

    let current_assignment = active_lease.as_ref().map(assignment_status_from_lease);

    Ok(PersonaStatus {
        name: binding.name.clone(),
        template_ref: binding.template_ref.clone(),
        state,
        entry_workflow: binding.entry_workflow.clone(),
        // NOTE(review): `role` mirrors the persona name here — confirm that
        // is intended rather than a distinct role value.
        role: binding.name.clone(),
        current_assignment,
        last_run: last_run_ms.map(format_ms),
        next_scheduled_run: next_scheduled_run(binding, last_run_ms, now_ms),
        active_lease,
        budget,
        last_error,
        queued_events: queued.len(),
        queued_work,
        handoff_inbox,
        value_receipts,
        disabled_events,
        paused_event_policy: "queue_then_drain_on_resume".to_string(),
    })
}
730
731pub async fn pause_persona(
732    log: &Arc<AnyEventLog>,
733    binding: &PersonaRuntimeBinding,
734    now_ms: i64,
735) -> Result<PersonaStatus, String> {
736    append_persona_event(
737        log,
738        &binding.name,
739        "persona.control.paused",
740        json!({"paused_at_ms": now_ms, "policy": "queue_then_drain_on_resume"}),
741        now_ms,
742    )
743    .await?;
744    persona_status(log, binding, now_ms).await
745}
746
747pub async fn resume_persona(
748    log: &Arc<AnyEventLog>,
749    binding: &PersonaRuntimeBinding,
750    now_ms: i64,
751) -> Result<PersonaStatus, String> {
752    append_persona_event(
753        log,
754        &binding.name,
755        "persona.control.resumed",
756        json!({"resumed_at_ms": now_ms, "drain": true}),
757        now_ms,
758    )
759    .await?;
760    let queued = queued_events(log, &binding.name).await?;
761    for (envelope, cost) in queued {
762        let _ = run_for_envelope(log, binding, envelope, cost, now_ms).await?;
763    }
764    persona_status(log, binding, now_ms).await
765}
766
767pub async fn disable_persona(
768    log: &Arc<AnyEventLog>,
769    binding: &PersonaRuntimeBinding,
770    now_ms: i64,
771) -> Result<PersonaStatus, String> {
772    append_persona_event(
773        log,
774        &binding.name,
775        "persona.control.disabled",
776        json!({"disabled_at_ms": now_ms}),
777        now_ms,
778    )
779    .await?;
780    persona_status(log, binding, now_ms).await
781}
782
783pub async fn fire_schedule(
784    log: &Arc<AnyEventLog>,
785    binding: &PersonaRuntimeBinding,
786    cost: PersonaRunCost,
787    now_ms: i64,
788) -> Result<PersonaRunReceipt, String> {
789    let schedule = binding
790        .schedules
791        .first()
792        .cloned()
793        .unwrap_or_else(|| "manual".to_string());
794    let envelope = PersonaTriggerEnvelope {
795        provider: "schedule".to_string(),
796        kind: "cron.tick".to_string(),
797        subject_key: format!("schedule:{}:{schedule}:{}", binding.name, format_ms(now_ms)),
798        source_event_id: None,
799        received_at_ms: now_ms,
800        metadata: BTreeMap::from([
801            ("persona".to_string(), binding.name.clone()),
802            ("schedule".to_string(), schedule),
803            ("fired_at".to_string(), format_ms(now_ms)),
804        ]),
805        raw: json!({}),
806    };
807    append_persona_event(
808        log,
809        &binding.name,
810        "persona.schedule.fired",
811        json!({"persona": binding.name, "envelope": envelope}),
812        now_ms,
813    )
814    .await?;
815    run_for_envelope(log, binding, envelope, cost, now_ms).await
816}
817
818pub async fn fire_trigger(
819    log: &Arc<AnyEventLog>,
820    binding: &PersonaRuntimeBinding,
821    provider: &str,
822    kind: &str,
823    metadata: BTreeMap<String, String>,
824    cost: PersonaRunCost,
825    now_ms: i64,
826) -> Result<PersonaRunReceipt, String> {
827    let envelope = normalize_trigger_envelope(provider, kind, metadata, now_ms);
828    append_persona_event(
829        log,
830        &binding.name,
831        "persona.trigger.received",
832        json!({"persona": binding.name, "envelope": envelope}),
833        now_ms,
834    )
835    .await?;
836    run_for_envelope(log, binding, envelope, cost, now_ms).await
837}
838
839pub async fn record_persona_spend(
840    log: &Arc<AnyEventLog>,
841    binding: &PersonaRuntimeBinding,
842    cost: PersonaRunCost,
843    now_ms: i64,
844) -> Result<PersonaBudgetStatus, String> {
845    enforce_budget(log, binding, &cost, now_ms).await?;
846    append_budget_record(log, &binding.name, &cost, None, now_ms).await?;
847    persona_status(log, binding, now_ms)
848        .await
849        .map(|status| status.budget)
850}
851
852/// Report a `repair_worker_status` lifecycle transition for a sandboxed PR
853/// repair run.
854///
855/// Append-only and idempotent on `(repair_worker_id, lifecycle)`: replaying
856/// the same lifecycle is a no-op (`Ok(false)` indicates the event was already
857/// recorded). Hosted runtimes call this when a repair-worker job created by
858/// the persona transitions states.
859pub async fn report_repair_worker_status(
860    log: &Arc<AnyEventLog>,
861    binding: &PersonaRuntimeBinding,
862    status: PersonaRepairWorkerStatusUpdate,
863    now_ms: i64,
864) -> Result<bool, String> {
865    let mut status = status;
866    if status.persona_id.is_empty() {
867        status.persona_id = binding.name.clone();
868    }
869    if status.template_ref.is_none() {
870        status.template_ref = binding.template_ref.clone();
871    }
872    if status.occurred_at_ms == 0 {
873        status.occurred_at_ms = now_ms;
874    }
875    if status.last_heartbeat_ms == 0 {
876        status.last_heartbeat_ms = now_ms;
877    }
878
879    if repair_worker_status_recorded(log, &binding.name, &status).await? {
880        return Ok(false);
881    }
882    append_persona_event(
883        log,
884        &binding.name,
885        "persona.repair_worker.status",
886        serde_json::to_value(&status).map_err(|error| error.to_string())?,
887        status.occurred_at_ms,
888    )
889    .await?;
890    record_persona_supervision_event(
891        log,
892        &binding.name,
893        PersonaSupervisionEvent::RepairWorkerStatus(status),
894    )
895    .await?;
896    Ok(true)
897}
898
899/// Acknowledge a checkpoint-restore request initiated by the supervision API.
900///
901/// Idempotent on `checkpoint_id`: a repeated restore request resolves to the
902/// same ack (returns `Ok(false)`). The runtime emits a typed
903/// `Checkpoint(action: RestoreAcked)` supervision event carrying the
904/// coordinates the runtime resumed from.
905pub async fn restore_persona_checkpoint(
906    log: &Arc<AnyEventLog>,
907    binding: &PersonaRuntimeBinding,
908    request: PersonaCheckpointRestoreRequest,
909    now_ms: i64,
910) -> Result<PersonaCheckpointRestoreOutcome, String> {
911    let PersonaCheckpointRestoreRequest {
912        checkpoint_id,
913        work_key,
914        resumed_from,
915    } = request;
916    let status = persona_status(log, binding, now_ms).await?;
917
918    if let Some(prior) = find_checkpoint_restore_ack(log, &binding.name, &checkpoint_id).await? {
919        return Ok(PersonaCheckpointRestoreOutcome {
920            acked: false,
921            update: prior,
922        });
923    }
924
925    let resume_coordinates = resumed_from.unwrap_or_else(|| PersonaCheckpointResume {
926        run_id: None,
927        lease_id: status.active_lease.as_ref().map(|lease| lease.id.clone()),
928        last_run_ms: status
929            .last_run
930            .as_deref()
931            .and_then(|value| parse_rfc3339_ms(value).ok()),
932        queued_work_keys: status
933            .queued_work
934            .iter()
935            .map(|item| item.work_key.clone())
936            .collect(),
937        note: None,
938    });
939
940    let update = PersonaCheckpointUpdate {
941        persona_id: binding.name.clone(),
942        template_ref: binding.template_ref.clone(),
943        action: PersonaCheckpointAction::RestoreAcked,
944        checkpoint_id: checkpoint_id.clone(),
945        work_key,
946        resumed_from: Some(resume_coordinates),
947        occurred_at_ms: now_ms,
948    };
949
950    append_persona_event(
951        log,
952        &binding.name,
953        "persona.checkpoint.restore_acked",
954        serde_json::to_value(&update).map_err(|error| error.to_string())?,
955        now_ms,
956    )
957    .await?;
958    record_persona_supervision_event(
959        log,
960        &binding.name,
961        PersonaSupervisionEvent::Checkpoint(update.clone()),
962    )
963    .await?;
964    Ok(PersonaCheckpointRestoreOutcome {
965        acked: true,
966        update,
967    })
968}
969
/// Input to [`restore_persona_checkpoint`].
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointRestoreRequest {
    /// Identifier of the checkpoint being restored; the restore ack is
    /// idempotent on this value.
    pub checkpoint_id: String,
    /// Optional work key copied onto the resulting checkpoint update.
    #[serde(default)]
    pub work_key: Option<String>,
    /// Resume coordinates supplied by the caller. When absent,
    /// [`restore_persona_checkpoint`] derives them from the persona status.
    #[serde(default)]
    pub resumed_from: Option<PersonaCheckpointResume>,
}
978
/// Result of [`restore_persona_checkpoint`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointRestoreOutcome {
    /// `true` when this call appended a new restore ack; `false` when an ack
    /// for the same checkpoint id already existed and was reused.
    pub acked: bool,
    /// The restore-ack update: newly written when `acked` is `true`,
    /// otherwise the previously recorded one.
    pub update: PersonaCheckpointUpdate,
}
984
985async fn repair_worker_status_recorded(
986    log: &Arc<AnyEventLog>,
987    persona: &str,
988    update: &PersonaRepairWorkerStatusUpdate,
989) -> Result<bool, String> {
990    let events = read_persona_events(log, persona).await?;
991    Ok(events.into_iter().any(|(_, event)| {
992        event.kind == "persona.repair_worker.status"
993            && event
994                .payload
995                .get("repair_worker_id")
996                .and_then(serde_json::Value::as_str)
997                == Some(update.repair_worker_id.as_str())
998            && event
999                .payload
1000                .get("lifecycle")
1001                .and_then(serde_json::Value::as_str)
1002                == Some(update.lifecycle.as_str())
1003    }))
1004}
1005
1006async fn find_checkpoint_restore_ack(
1007    log: &Arc<AnyEventLog>,
1008    persona: &str,
1009    checkpoint_id: &str,
1010) -> Result<Option<PersonaCheckpointUpdate>, String> {
1011    let events = read_persona_events(log, persona).await?;
1012    for (_, event) in events.into_iter().rev() {
1013        if event.kind != "persona.checkpoint.restore_acked" {
1014            continue;
1015        }
1016        if event
1017            .payload
1018            .get("checkpoint_id")
1019            .and_then(serde_json::Value::as_str)
1020            != Some(checkpoint_id)
1021        {
1022            continue;
1023        }
1024        let update: PersonaCheckpointUpdate =
1025            serde_json::from_value(event.payload).map_err(|error| error.to_string())?;
1026        return Ok(Some(update));
1027    }
1028    Ok(None)
1029}
1030
1031async fn run_for_envelope(
1032    log: &Arc<AnyEventLog>,
1033    binding: &PersonaRuntimeBinding,
1034    envelope: PersonaTriggerEnvelope,
1035    cost: PersonaRunCost,
1036    now_ms: i64,
1037) -> Result<PersonaRunReceipt, String> {
1038    let pre_queue = queue_snapshot(log, binding, now_ms).await?;
1039    let receipt = run_for_envelope_inner(log, binding, envelope, cost, now_ms).await?;
1040    let post_queue = queue_snapshot(log, binding, now_ms).await?;
1041    emit_queue_position_supervision(log, binding, &pre_queue, &post_queue, now_ms).await?;
1042    emit_receipt_supervision(log, binding, &receipt, now_ms).await?;
1043    Ok(receipt)
1044}
1045
1046async fn run_for_envelope_inner(
1047    log: &Arc<AnyEventLog>,
1048    binding: &PersonaRuntimeBinding,
1049    envelope: PersonaTriggerEnvelope,
1050    cost: PersonaRunCost,
1051    now_ms: i64,
1052) -> Result<PersonaRunReceipt, String> {
1053    let status = persona_status(log, binding, now_ms).await?;
1054    match status.state {
1055        PersonaLifecycleState::Paused => {
1056            append_persona_event(
1057                log,
1058                &binding.name,
1059                "persona.trigger.queued",
1060                json!({
1061                    "work_key": envelope.subject_key,
1062                    "envelope": envelope,
1063                    "cost": cost,
1064                    "reason": "paused",
1065                }),
1066                now_ms,
1067            )
1068            .await?;
1069            return Ok(PersonaRunReceipt {
1070                status: "queued".to_string(),
1071                persona: binding.name.clone(),
1072                run_id: None,
1073                work_key: envelope.subject_key,
1074                lease: None,
1075                queued: true,
1076                error: None,
1077                budget_receipt_id: None,
1078            });
1079        }
1080        PersonaLifecycleState::Disabled => {
1081            append_persona_event(
1082                log,
1083                &binding.name,
1084                "persona.trigger.dead_lettered",
1085                json!({
1086                    "work_key": envelope.subject_key,
1087                    "envelope": envelope,
1088                    "reason": "disabled",
1089                }),
1090                now_ms,
1091            )
1092            .await?;
1093            return Ok(PersonaRunReceipt {
1094                status: "dead_lettered".to_string(),
1095                persona: binding.name.clone(),
1096                run_id: None,
1097                work_key: envelope.subject_key,
1098                lease: None,
1099                queued: false,
1100                error: Some("persona is disabled".to_string()),
1101                budget_receipt_id: None,
1102            });
1103        }
1104        _ => {}
1105    }
1106
1107    if let Err(error) = enforce_budget(log, binding, &cost, now_ms).await {
1108        return Ok(PersonaRunReceipt {
1109            status: "budget_exhausted".to_string(),
1110            persona: binding.name.clone(),
1111            run_id: None,
1112            work_key: envelope.subject_key,
1113            lease: None,
1114            queued: false,
1115            error: Some(error),
1116            budget_receipt_id: None,
1117        });
1118    }
1119
1120    if work_completed(log, &binding.name, &envelope.subject_key).await? {
1121        append_persona_event(
1122            log,
1123            &binding.name,
1124            "persona.trigger.duplicate",
1125            json!({
1126                "work_key": envelope.subject_key,
1127                "envelope": envelope,
1128                "reason": "already_completed",
1129            }),
1130            now_ms,
1131        )
1132        .await?;
1133        return Ok(PersonaRunReceipt {
1134            status: "duplicate".to_string(),
1135            persona: binding.name.clone(),
1136            run_id: None,
1137            work_key: envelope.subject_key,
1138            lease: None,
1139            queued: false,
1140            error: None,
1141            budget_receipt_id: None,
1142        });
1143    }
1144
1145    let Some(lease) = acquire_lease(
1146        log,
1147        binding,
1148        &envelope.subject_key,
1149        "persona-runtime",
1150        DEFAULT_LEASE_TTL_MS,
1151        now_ms,
1152    )
1153    .await?
1154    else {
1155        return Ok(PersonaRunReceipt {
1156            status: "lease_busy".to_string(),
1157            persona: binding.name.clone(),
1158            run_id: None,
1159            work_key: envelope.subject_key,
1160            lease: status.active_lease,
1161            queued: false,
1162            error: Some("active lease already owns persona work".to_string()),
1163            budget_receipt_id: None,
1164        });
1165    };
1166
1167    let run_id = Uuid::now_v7();
1168    let value_metadata = run_value_metadata(&envelope, &lease, &cost);
1169    append_persona_event(
1170        log,
1171        &binding.name,
1172        "persona.run.started",
1173        json!({
1174            "work_key": envelope.subject_key,
1175            "run_id": run_id,
1176            "started_at_ms": now_ms,
1177            "entry_workflow": binding.entry_workflow,
1178            "lease_id": lease.id,
1179        }),
1180        now_ms,
1181    )
1182    .await?;
1183    emit_persona_value_event(
1184        log,
1185        binding,
1186        run_id,
1187        PersonaValueEventDelta {
1188            kind: PersonaValueEventKind::RunStarted,
1189            metadata: value_metadata.clone(),
1190            ..Default::default()
1191        },
1192        now_ms,
1193    )
1194    .await?;
1195    let budget_receipt_id =
1196        append_budget_record(log, &binding.name, &cost, Some(&lease.id), now_ms).await?;
1197    if cost.avoided_cost_usd > 0.0 || cost.deterministic_steps > 0 {
1198        emit_persona_value_event(
1199            log,
1200            binding,
1201            run_id,
1202            PersonaValueEventDelta {
1203                kind: PersonaValueEventKind::DeterministicExecution,
1204                avoided_cost_usd: cost.avoided_cost_usd,
1205                deterministic_steps: cost.deterministic_steps.max(1),
1206                metadata: value_metadata.clone(),
1207                ..Default::default()
1208            },
1209            now_ms,
1210        )
1211        .await?;
1212    }
1213    if cost.frontier_escalations > 0 {
1214        emit_persona_value_event(
1215            log,
1216            binding,
1217            run_id,
1218            PersonaValueEventDelta {
1219                kind: PersonaValueEventKind::FrontierEscalation,
1220                paid_cost_usd: cost.cost_usd,
1221                llm_steps: cost.llm_steps.max(cost.frontier_escalations),
1222                metadata: value_metadata.clone(),
1223                ..Default::default()
1224            },
1225            now_ms,
1226        )
1227        .await?;
1228    }
1229    let completion_paid_cost = if cost.frontier_escalations > 0 {
1230        0.0
1231    } else {
1232        cost.cost_usd
1233    };
1234    let completion_llm_steps = if cost.frontier_escalations > 0 {
1235        0
1236    } else {
1237        cost.llm_steps
1238    };
1239    emit_persona_value_event(
1240        log,
1241        binding,
1242        run_id,
1243        PersonaValueEventDelta {
1244            kind: PersonaValueEventKind::RunCompleted,
1245            paid_cost_usd: completion_paid_cost,
1246            llm_steps: completion_llm_steps,
1247            metadata: value_metadata,
1248            ..Default::default()
1249        },
1250        now_ms,
1251    )
1252    .await?;
1253    append_persona_event(
1254        log,
1255        &binding.name,
1256        "persona.run.completed",
1257        json!({
1258            "work_key": envelope.subject_key,
1259            "run_id": run_id,
1260            "completed_at_ms": now_ms,
1261            "entry_workflow": binding.entry_workflow,
1262            "lease_id": lease.id,
1263        }),
1264        now_ms,
1265    )
1266    .await?;
1267    append_persona_event(
1268        log,
1269        &binding.name,
1270        "persona.lease.released",
1271        json!({
1272            "id": lease.id,
1273            "work_key": envelope.subject_key,
1274            "released_at_ms": now_ms,
1275        }),
1276        now_ms,
1277    )
1278    .await?;
1279    Ok(PersonaRunReceipt {
1280        status: "completed".to_string(),
1281        persona: binding.name.clone(),
1282        run_id: Some(run_id),
1283        work_key: envelope.subject_key,
1284        lease: Some(lease),
1285        queued: false,
1286        error: None,
1287        budget_receipt_id: Some(budget_receipt_id),
1288    })
1289}
1290
1291async fn acquire_lease(
1292    log: &Arc<AnyEventLog>,
1293    binding: &PersonaRuntimeBinding,
1294    work_key: &str,
1295    holder: &str,
1296    ttl_ms: i64,
1297    now_ms: i64,
1298) -> Result<Option<PersonaLease>, String> {
1299    let status = persona_status(log, binding, now_ms).await?;
1300    if let Some(lease) = status.active_lease {
1301        if lease.expires_at_ms > now_ms {
1302            append_persona_event(
1303                log,
1304                &binding.name,
1305                "persona.lease.conflict",
1306                json!({
1307                    "active_lease": lease,
1308                    "requested_work_key": work_key,
1309                    "at_ms": now_ms,
1310                }),
1311                now_ms,
1312            )
1313            .await?;
1314            return Ok(None);
1315        }
1316        append_persona_event(
1317            log,
1318            &binding.name,
1319            "persona.lease.expired",
1320            json!({
1321                "id": lease.id,
1322                "work_key": lease.work_key,
1323                "expired_at_ms": now_ms,
1324            }),
1325            now_ms,
1326        )
1327        .await?;
1328    }
1329
1330    let lease = PersonaLease {
1331        id: format!("persona_lease_{}", Uuid::now_v7()),
1332        holder: holder.to_string(),
1333        work_key: work_key.to_string(),
1334        acquired_at_ms: now_ms,
1335        expires_at_ms: now_ms + ttl_ms,
1336    };
1337    append_persona_event(
1338        log,
1339        &binding.name,
1340        "persona.lease.acquired",
1341        serde_json::to_value(&lease).map_err(|error| error.to_string())?,
1342        now_ms,
1343    )
1344    .await?;
1345    Ok(Some(lease))
1346}
1347
1348async fn enforce_budget(
1349    log: &Arc<AnyEventLog>,
1350    binding: &PersonaRuntimeBinding,
1351    cost: &PersonaRunCost,
1352    now_ms: i64,
1353) -> Result<(), String> {
1354    let status = persona_status(log, binding, now_ms).await?;
1355    let reason = if binding
1356        .budget
1357        .run_usd
1358        .is_some_and(|limit| cost.cost_usd > limit)
1359    {
1360        Some("run_usd")
1361    } else if binding
1362        .budget
1363        .daily_usd
1364        .is_some_and(|limit| status.budget.spent_today_usd + cost.cost_usd > limit)
1365    {
1366        Some("daily_usd")
1367    } else if binding
1368        .budget
1369        .hourly_usd
1370        .is_some_and(|limit| status.budget.spent_this_hour_usd + cost.cost_usd > limit)
1371    {
1372        Some("hourly_usd")
1373    } else if binding
1374        .budget
1375        .max_tokens
1376        .is_some_and(|limit| status.budget.tokens_today + cost.tokens > limit)
1377    {
1378        Some("max_tokens")
1379    } else {
1380        None
1381    };
1382
1383    if let Some(reason) = reason {
1384        let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1385        append_persona_event(
1386            log,
1387            &binding.name,
1388            "persona.budget.exhausted",
1389            json!({
1390                "receipt_id": receipt_id,
1391                "reason": reason,
1392                "attempted_cost_usd": cost.cost_usd,
1393                "attempted_tokens": cost.tokens,
1394                "persona": binding.name,
1395            }),
1396            now_ms,
1397        )
1398        .await?;
1399        return Err(format!("persona budget exhausted: {reason}"));
1400    }
1401
1402    Ok(())
1403}
1404
1405async fn append_budget_record(
1406    log: &Arc<AnyEventLog>,
1407    persona: &str,
1408    cost: &PersonaRunCost,
1409    lease_id: Option<&str>,
1410    now_ms: i64,
1411) -> Result<String, String> {
1412    let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1413    append_persona_event(
1414        log,
1415        persona,
1416        "persona.budget.recorded",
1417        json!({
1418            "receipt_id": receipt_id,
1419            "persona": persona,
1420            "cost_usd": cost.cost_usd,
1421            "tokens": cost.tokens,
1422            "lease_id": lease_id,
1423        }),
1424        now_ms,
1425    )
1426    .await?;
1427    Ok(receipt_id)
1428}
1429
1430fn normalize_trigger_envelope(
1431    provider: &str,
1432    kind: &str,
1433    metadata: BTreeMap<String, String>,
1434    now_ms: i64,
1435) -> PersonaTriggerEnvelope {
1436    let provider = provider.to_ascii_lowercase();
1437    let kind = kind.to_string();
1438    let source_event_id = metadata
1439        .get("event_id")
1440        .or_else(|| metadata.get("id"))
1441        .cloned();
1442    let subject_key = match provider.as_str() {
1443        "github" => {
1444            let repo = metadata
1445                .get("repository")
1446                .or_else(|| metadata.get("repository.full_name"))
1447                .cloned()
1448                .unwrap_or_else(|| "unknown".to_string());
1449            if let Some(number) = metadata
1450                .get("pr")
1451                .or_else(|| metadata.get("pull_request.number"))
1452                .or_else(|| metadata.get("number"))
1453            {
1454                format!("github:{repo}:pr:{number}")
1455            } else if let Some(check) = metadata
1456                .get("check_run.name")
1457                .or_else(|| metadata.get("check_name"))
1458            {
1459                format!("github:{repo}:check:{check}")
1460            } else {
1461                format!("github:{repo}:{kind}")
1462            }
1463        }
1464        "linear" => {
1465            let issue = metadata
1466                .get("issue_key")
1467                .or_else(|| metadata.get("issue.identifier"))
1468                .or_else(|| metadata.get("issue_id"))
1469                .or_else(|| metadata.get("id"))
1470                .cloned()
1471                .unwrap_or_else(|| "unknown".to_string());
1472            format!("linear:issue:{issue}")
1473        }
1474        "slack" => {
1475            let channel = metadata
1476                .get("channel")
1477                .or_else(|| metadata.get("channel_id"))
1478                .cloned()
1479                .unwrap_or_else(|| "unknown".to_string());
1480            let ts = metadata
1481                .get("ts")
1482                .or_else(|| metadata.get("event_ts"))
1483                .cloned()
1484                .unwrap_or_else(|| "unknown".to_string());
1485            format!("slack:{channel}:{ts}")
1486        }
1487        "webhook" => metadata
1488            .get("dedupe_key")
1489            .or_else(|| metadata.get("event_id"))
1490            .map(|value| format!("webhook:{value}"))
1491            .unwrap_or_else(|| format!("webhook:{kind}:{}", Uuid::now_v7())),
1492        _ => metadata
1493            .get("dedupe_key")
1494            .or_else(|| metadata.get("event_id"))
1495            .map(|value| format!("{provider}:{kind}:{value}"))
1496            .unwrap_or_else(|| format!("{provider}:{kind}:{}", Uuid::now_v7())),
1497    };
1498
1499    PersonaTriggerEnvelope {
1500        provider,
1501        kind,
1502        subject_key,
1503        source_event_id,
1504        received_at_ms: now_ms,
1505        raw: json!({"metadata": metadata}),
1506        metadata,
1507    }
1508}
1509
1510async fn queued_events(
1511    log: &Arc<AnyEventLog>,
1512    persona: &str,
1513) -> Result<Vec<(PersonaTriggerEnvelope, PersonaRunCost)>, String> {
1514    let events = read_persona_events(log, persona).await?;
1515    let mut queued = BTreeMap::<String, (PersonaTriggerEnvelope, PersonaRunCost)>::new();
1516    let mut completed = BTreeSet::<String>::new();
1517    for (_, event) in events {
1518        match event.kind.as_str() {
1519            "persona.trigger.queued" => {
1520                let Some(envelope) = event.payload.get("envelope") else {
1521                    continue;
1522                };
1523                let envelope: PersonaTriggerEnvelope =
1524                    serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1525                let cost = event
1526                    .payload
1527                    .get("cost")
1528                    .cloned()
1529                    .map(serde_json::from_value::<PersonaRunCost>)
1530                    .transpose()
1531                    .map_err(|error| error.to_string())?
1532                    .unwrap_or_default();
1533                queued.insert(envelope.subject_key.clone(), (envelope, cost));
1534            }
1535            "persona.run.completed" => {
1536                if let Some(work_key) = event
1537                    .payload
1538                    .get("work_key")
1539                    .and_then(serde_json::Value::as_str)
1540                {
1541                    completed.insert(work_key.to_string());
1542                }
1543            }
1544            _ => {}
1545        }
1546    }
1547    queued.retain(|work_key, _| !completed.contains(work_key));
1548    Ok(queued.into_values().collect())
1549}
1550
1551fn assignment_status_from_lease(lease: &PersonaLease) -> PersonaAssignmentStatus {
1552    PersonaAssignmentStatus {
1553        work_key: lease.work_key.clone(),
1554        lease_id: lease.id.clone(),
1555        holder: lease.holder.clone(),
1556        acquired_at: format_ms(lease.acquired_at_ms),
1557        expires_at: format_ms(lease.expires_at_ms),
1558    }
1559}
1560
1561fn queued_work_from_event(event: &LogEvent) -> Result<Option<PersonaQueuedWork>, String> {
1562    let Some(envelope) = event.payload.get("envelope") else {
1563        return Ok(None);
1564    };
1565    let envelope: PersonaTriggerEnvelope =
1566        serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1567    Ok(Some(PersonaQueuedWork {
1568        work_key: envelope.subject_key,
1569        provider: envelope.provider,
1570        kind: envelope.kind,
1571        queued_at: format_ms(event.occurred_at_ms),
1572        reason: event
1573            .payload
1574            .get("reason")
1575            .and_then(serde_json::Value::as_str)
1576            .unwrap_or("queued")
1577            .to_string(),
1578        source_event_id: envelope.source_event_id,
1579        metadata: envelope.metadata,
1580    }))
1581}
1582
1583fn handoff_inbox_item(work: &PersonaQueuedWork) -> Option<PersonaHandoffInboxItem> {
1584    if work.provider != "handoff" && !work.metadata.contains_key("handoff_id") {
1585        return None;
1586    }
1587    Some(PersonaHandoffInboxItem {
1588        work_key: work.work_key.clone(),
1589        handoff_id: work.metadata.get("handoff_id").cloned(),
1590        handoff_kind: work
1591            .metadata
1592            .get("handoff_kind")
1593            .or_else(|| work.metadata.get("kind"))
1594            .cloned(),
1595        source_persona: work.metadata.get("source_persona").cloned(),
1596        task: work.metadata.get("task").cloned(),
1597        queued_at: work.queued_at.clone(),
1598        reason: work.reason.clone(),
1599    })
1600}
1601
1602fn value_receipt_from_event(event: &LogEvent) -> Result<Option<PersonaValueReceipt>, String> {
1603    let Ok(value_event) = serde_json::from_value::<PersonaValueEvent>(event.payload.clone()) else {
1604        return Ok(None);
1605    };
1606    Ok(Some(PersonaValueReceipt {
1607        kind: value_event.kind,
1608        run_id: value_event.run_id,
1609        occurred_at: value_event
1610            .occurred_at
1611            .format(&Rfc3339)
1612            .map_err(|error| error.to_string())?,
1613        paid_cost_usd: value_event.paid_cost_usd,
1614        avoided_cost_usd: value_event.avoided_cost_usd,
1615        deterministic_steps: value_event.deterministic_steps,
1616        llm_steps: value_event.llm_steps,
1617        metadata: value_event.metadata,
1618    }))
1619}
1620
1621async fn work_completed(
1622    log: &Arc<AnyEventLog>,
1623    persona: &str,
1624    work_key: &str,
1625) -> Result<bool, String> {
1626    let events = read_persona_events(log, persona).await?;
1627    Ok(events.into_iter().any(|(_, event)| {
1628        event.kind == "persona.run.completed"
1629            && event
1630                .payload
1631                .get("work_key")
1632                .and_then(serde_json::Value::as_str)
1633                == Some(work_key)
1634    }))
1635}
1636
1637async fn read_persona_events(
1638    log: &Arc<AnyEventLog>,
1639    persona: &str,
1640) -> Result<Vec<(u64, LogEvent)>, String> {
1641    let topic = runtime_topic()?;
1642    Ok(log
1643        .read_range(&topic, None, usize::MAX)
1644        .await
1645        .map_err(|error| error.to_string())?
1646        .into_iter()
1647        .filter(|(_, event)| {
1648            event
1649                .headers
1650                .get("persona")
1651                .is_some_and(|name| name == persona)
1652        })
1653        .collect())
1654}
1655
1656async fn append_persona_event(
1657    log: &Arc<AnyEventLog>,
1658    persona: &str,
1659    kind: &str,
1660    payload: serde_json::Value,
1661    now_ms: i64,
1662) -> Result<u64, String> {
1663    let mut headers = BTreeMap::new();
1664    headers.insert("persona".to_string(), persona.to_string());
1665    forward_persona_run_event(persona, kind, &payload);
1666    let event = LogEvent {
1667        kind: kind.to_string(),
1668        payload,
1669        headers,
1670        occurred_at_ms: now_ms,
1671    };
1672    log.append(&runtime_topic()?, event)
1673        .await
1674        .map_err(|error| error.to_string())
1675}
1676
1677/// Mirror persona-stage transitions onto the run-events sink for
1678/// `harn run --json`. Both `persona.stage.*` (per-stage moves) and
1679/// `persona.run.*` (whole-run lifecycle) kinds are surfaced as
1680/// [`crate::run_events::RunEvent::PersonaStage`]; the `transition`
1681/// field carries the suffix (`started`, `completed`, `handoff_started`,
1682/// `failed`, ...) and `stage` carries the named stage when present.
1683fn forward_persona_run_event(persona: &str, kind: &str, payload: &serde_json::Value) {
1684    if !crate::run_events::sink_active() {
1685        return;
1686    }
1687    let transition = kind
1688        .strip_prefix("persona.stage.")
1689        .or_else(|| kind.strip_prefix("persona.run."));
1690    let Some(transition) = transition else {
1691        return;
1692    };
1693    let stage = payload
1694        .get("stage")
1695        .or_else(|| payload.get("to"))
1696        .or_else(|| payload.get("name"))
1697        .and_then(|value| value.as_str())
1698        .unwrap_or("")
1699        .to_string();
1700    crate::run_events::emit(crate::run_events::RunEvent::PersonaStage {
1701        persona: persona.to_string(),
1702        stage,
1703        transition: transition.to_string(),
1704    });
1705}
1706
/// Per-event inputs for `emit_persona_value_event`: the event kind plus the
/// cost and step counts to attribute to it. Negative numeric values are
/// clamped to zero when the event is built.
struct PersonaValueEventDelta {
    /// Which value event to emit.
    kind: PersonaValueEventKind,
    /// Spend attributed to this event, in USD.
    paid_cost_usd: f64,
    /// Cost avoided, in USD.
    avoided_cost_usd: f64,
    /// Number of deterministic steps attributed to this event.
    deterministic_steps: i64,
    /// Number of LLM steps attributed to this event.
    llm_steps: i64,
    /// Free-form JSON metadata copied onto the emitted event.
    metadata: serde_json::Value,
}
1715
1716impl Default for PersonaValueEventDelta {
1717    fn default() -> Self {
1718        Self {
1719            kind: PersonaValueEventKind::RunCompleted,
1720            paid_cost_usd: 0.0,
1721            avoided_cost_usd: 0.0,
1722            deterministic_steps: 0,
1723            llm_steps: 0,
1724            metadata: serde_json::Value::Null,
1725        }
1726    }
1727}
1728
1729async fn emit_persona_value_event(
1730    log: &Arc<AnyEventLog>,
1731    binding: &PersonaRuntimeBinding,
1732    run_id: Uuid,
1733    delta: PersonaValueEventDelta,
1734    now_ms: i64,
1735) -> Result<(), String> {
1736    let event = PersonaValueEvent {
1737        persona_id: binding.name.clone(),
1738        template_ref: binding.template_ref.clone(),
1739        run_id: Some(run_id),
1740        kind: delta.kind,
1741        paid_cost_usd: delta.paid_cost_usd.max(0.0),
1742        avoided_cost_usd: delta.avoided_cost_usd.max(0.0),
1743        deterministic_steps: delta.deterministic_steps.max(0),
1744        llm_steps: delta.llm_steps.max(0),
1745        metadata: delta.metadata,
1746        occurred_at: offset_datetime_from_ms(now_ms),
1747    };
1748    append_persona_event(
1749        log,
1750        &binding.name,
1751        &format!("persona.value.{}", event.kind.as_str()),
1752        serde_json::to_value(&event).map_err(|error| error.to_string())?,
1753        now_ms,
1754    )
1755    .await?;
1756    emit_persona_value_sink_event(&event);
1757    Ok(())
1758}
1759
1760fn emit_persona_value_sink_event(event: &PersonaValueEvent) {
1761    for sink in persona_value_sinks().snapshot() {
1762        sink.handle_value_event(event);
1763    }
1764}
1765
1766fn emit_persona_supervision_sink_event(event: &PersonaSupervisionEvent) {
1767    for sink in persona_supervision_sinks().snapshot() {
1768        sink.handle_supervision_event(event);
1769    }
1770}
1771
1772async fn record_persona_supervision_event(
1773    log: &Arc<AnyEventLog>,
1774    persona: &str,
1775    event: PersonaSupervisionEvent,
1776) -> Result<(), String> {
1777    let update_kind = event.update_kind();
1778    let occurred_at_ms = event.occurred_at_ms();
1779    append_persona_event(
1780        log,
1781        persona,
1782        &format!("persona.supervision.{update_kind}"),
1783        serde_json::to_value(&event).map_err(|error| error.to_string())?,
1784        occurred_at_ms,
1785    )
1786    .await?;
1787    emit_persona_supervision_sink_event(&event);
1788    Ok(())
1789}
1790
/// One queued work item as captured by `queue_snapshot`.
#[derive(Clone, Debug)]
struct QueueEntry {
    /// Work key of the queued item.
    work_key: String,
    /// When the item was queued, in milliseconds, parsed from the status'
    /// `queued_at` string (the snapshot time is used when parsing fails).
    queued_at_ms: i64,
}
1796
1797async fn queue_snapshot(
1798    log: &Arc<AnyEventLog>,
1799    binding: &PersonaRuntimeBinding,
1800    now_ms: i64,
1801) -> Result<Vec<QueueEntry>, String> {
1802    let status = persona_status(log, binding, now_ms).await?;
1803    Ok(status
1804        .queued_work
1805        .into_iter()
1806        .map(|item| QueueEntry {
1807            queued_at_ms: parse_rfc3339_ms(&item.queued_at).unwrap_or(now_ms),
1808            work_key: item.work_key,
1809        })
1810        .collect())
1811}
1812
1813async fn emit_queue_position_supervision(
1814    log: &Arc<AnyEventLog>,
1815    binding: &PersonaRuntimeBinding,
1816    before: &[QueueEntry],
1817    after: &[QueueEntry],
1818    now_ms: i64,
1819) -> Result<(), String> {
1820    use std::collections::HashSet;
1821    let before_keys: HashSet<&str> = before.iter().map(|e| e.work_key.as_str()).collect();
1822    let after_keys: HashSet<&str> = after.iter().map(|e| e.work_key.as_str()).collect();
1823    let after_depth = after.len() as i64;
1824
1825    for (index, entry) in after.iter().enumerate() {
1826        if !before_keys.contains(entry.work_key.as_str()) {
1827            record_persona_supervision_event(
1828                log,
1829                &binding.name,
1830                PersonaSupervisionEvent::QueuePosition(PersonaQueuePositionUpdate {
1831                    persona_id: binding.name.clone(),
1832                    template_ref: binding.template_ref.clone(),
1833                    work_key: entry.work_key.clone(),
1834                    queue_depth: after_depth,
1835                    position: (index + 1) as i64,
1836                    queued_at_ms: entry.queued_at_ms,
1837                    occurred_at_ms: now_ms,
1838                }),
1839            )
1840            .await?;
1841        }
1842    }
1843    for entry in before {
1844        if !after_keys.contains(entry.work_key.as_str()) {
1845            record_persona_supervision_event(
1846                log,
1847                &binding.name,
1848                PersonaSupervisionEvent::QueuePosition(PersonaQueuePositionUpdate {
1849                    persona_id: binding.name.clone(),
1850                    template_ref: binding.template_ref.clone(),
1851                    work_key: entry.work_key.clone(),
1852                    queue_depth: after_depth,
1853                    position: 0,
1854                    queued_at_ms: entry.queued_at_ms,
1855                    occurred_at_ms: now_ms,
1856                }),
1857            )
1858            .await?;
1859        }
1860    }
1861    Ok(())
1862}
1863
1864async fn emit_receipt_supervision(
1865    log: &Arc<AnyEventLog>,
1866    binding: &PersonaRuntimeBinding,
1867    receipt: &PersonaRunReceipt,
1868    now_ms: i64,
1869) -> Result<(), String> {
1870    record_persona_supervision_event(
1871        log,
1872        &binding.name,
1873        PersonaSupervisionEvent::Receipt(PersonaReceiptUpdate {
1874            persona_id: binding.name.clone(),
1875            template_ref: binding.template_ref.clone(),
1876            receipt: receipt.clone(),
1877            occurred_at_ms: now_ms,
1878        }),
1879    )
1880    .await
1881}
1882
1883fn run_value_metadata(
1884    envelope: &PersonaTriggerEnvelope,
1885    lease: &PersonaLease,
1886    cost: &PersonaRunCost,
1887) -> serde_json::Value {
1888    let mut metadata = serde_json::Map::new();
1889    metadata.insert("work_key".to_string(), json!(envelope.subject_key));
1890    metadata.insert("trigger_provider".to_string(), json!(envelope.provider));
1891    metadata.insert("trigger_kind".to_string(), json!(envelope.kind));
1892    metadata.insert("lease_id".to_string(), json!(lease.id));
1893    metadata.insert("tokens".to_string(), json!(cost.tokens));
1894    if cost.frontier_escalations > 0 {
1895        metadata.insert(
1896            "frontier_escalations".to_string(),
1897            json!(cost.frontier_escalations),
1898        );
1899    }
1900    match &cost.metadata {
1901        serde_json::Value::Null => {}
1902        serde_json::Value::Object(extra) => {
1903            metadata.extend(
1904                extra
1905                    .iter()
1906                    .map(|(key, value)| (key.clone(), value.clone())),
1907            );
1908        }
1909        extra => {
1910            metadata.insert("run_cost_metadata".to_string(), extra.clone());
1911        }
1912    }
1913    serde_json::Value::Object(metadata)
1914}
1915
1916fn budget_status(
1917    policy: &PersonaBudgetPolicy,
1918    spent: &[(i64, f64, u64)],
1919    now_ms: i64,
1920) -> PersonaBudgetStatus {
1921    let day_start = now_ms - (now_ms.rem_euclid(86_400_000));
1922    let hour_start = now_ms - (now_ms.rem_euclid(3_600_000));
1923    let mut spent_today_usd = 0.0;
1924    let mut spent_this_hour_usd = 0.0;
1925    let mut tokens_today = 0u64;
1926    let mut spent_last_run_usd = 0.0;
1927    for (at_ms, cost, tokens) in spent {
1928        spent_last_run_usd = *cost;
1929        if *at_ms >= day_start {
1930            spent_today_usd += cost;
1931            tokens_today += tokens;
1932        }
1933        if *at_ms >= hour_start {
1934            spent_this_hour_usd += cost;
1935        }
1936    }
1937
1938    let remaining_today_usd = policy
1939        .daily_usd
1940        .map(|limit| (limit - spent_today_usd).max(0.0));
1941    let remaining_hour_usd = policy
1942        .hourly_usd
1943        .map(|limit| (limit - spent_this_hour_usd).max(0.0));
1944    let reason = if policy
1945        .daily_usd
1946        .is_some_and(|limit| spent_today_usd >= limit && limit >= 0.0)
1947    {
1948        Some("daily_usd".to_string())
1949    } else if policy
1950        .hourly_usd
1951        .is_some_and(|limit| spent_this_hour_usd >= limit && limit >= 0.0)
1952    {
1953        Some("hourly_usd".to_string())
1954    } else if policy
1955        .max_tokens
1956        .is_some_and(|limit| tokens_today >= limit && limit > 0)
1957    {
1958        Some("max_tokens".to_string())
1959    } else {
1960        None
1961    };
1962
1963    PersonaBudgetStatus {
1964        daily_usd: policy.daily_usd,
1965        hourly_usd: policy.hourly_usd,
1966        run_usd: policy.run_usd,
1967        max_tokens: policy.max_tokens,
1968        spent_today_usd,
1969        spent_this_hour_usd,
1970        spent_last_run_usd,
1971        tokens_today,
1972        remaining_today_usd,
1973        remaining_hour_usd,
1974        exhausted: reason.is_some(),
1975        reason,
1976        last_receipt_id: None,
1977    }
1978}
1979
1980fn next_scheduled_run(
1981    binding: &PersonaRuntimeBinding,
1982    last_run_ms: Option<i64>,
1983    now_ms: i64,
1984) -> Option<String> {
1985    binding
1986        .schedules
1987        .iter()
1988        .filter_map(|schedule| next_cron_ms(schedule, last_run_ms.unwrap_or(now_ms)).ok())
1989        .min()
1990        .map(format_ms)
1991}
1992
1993fn next_cron_ms(schedule: &str, after_ms: i64) -> Result<i64, String> {
1994    let cron = schedule
1995        .parse::<Cron>()
1996        .map_err(|error| error.to_string())?;
1997    let after = Utc
1998        .timestamp_millis_opt(after_ms)
1999        .single()
2000        .ok_or_else(|| "invalid timestamp".to_string())?;
2001    let next = cron
2002        .find_next_occurrence(&after, false)
2003        .map_err(|error| error.to_string())?;
2004    Ok(next.timestamp_millis())
2005}
2006
2007pub fn now_ms() -> i64 {
2008    OffsetDateTime::now_utc().unix_timestamp_nanos() as i64 / 1_000_000
2009}
2010
2011fn offset_datetime_from_ms(ms: i64) -> OffsetDateTime {
2012    OffsetDateTime::from_unix_timestamp_nanos((ms as i128) * 1_000_000)
2013        .unwrap_or(OffsetDateTime::UNIX_EPOCH)
2014}
2015
2016pub fn format_ms(ms: i64) -> String {
2017    offset_datetime_from_ms(ms)
2018        .format(&Rfc3339)
2019        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
2020}
2021
2022pub fn parse_rfc3339_ms(value: &str) -> Result<i64, String> {
2023    let ts = OffsetDateTime::parse(value, &Rfc3339)
2024        .map_err(|error| format!("invalid RFC3339 timestamp '{value}': {error}"))?;
2025    Ok(ts.unix_timestamp_nanos() as i64 / 1_000_000)
2026}
2027
2028fn runtime_topic() -> Result<Topic, String> {
2029    Topic::new(PERSONA_RUNTIME_TOPIC).map_err(|error| error.to_string())
2030}
2031
2032#[cfg(test)]
2033mod tests {
2034    use super::*;
2035    use crate::event_log::{AnyEventLog, MemoryEventLog};
2036    use std::sync::Mutex;
2037
2038    struct CapturingValueSink {
2039        events: Arc<Mutex<Vec<PersonaValueEvent>>>,
2040    }
2041
2042    impl PersonaValueSink for CapturingValueSink {
2043        fn handle_value_event(&self, event: &PersonaValueEvent) {
2044            self.events.lock().unwrap().push(event.clone());
2045        }
2046    }
2047
2048    fn binding() -> PersonaRuntimeBinding {
2049        PersonaRuntimeBinding {
2050            name: "merge_captain".to_string(),
2051            template_ref: Some("software_factory@v0".to_string()),
2052            entry_workflow: "workflows/merge.harn#run".to_string(),
2053            schedules: vec!["*/30 * * * *".to_string()],
2054            triggers: vec!["github.pr_opened".to_string()],
2055            budget: PersonaBudgetPolicy {
2056                daily_usd: Some(0.02),
2057                hourly_usd: None,
2058                run_usd: Some(0.02),
2059                max_tokens: Some(100),
2060            },
2061            stages: Vec::new(),
2062        }
2063    }
2064
2065    fn log() -> Arc<AnyEventLog> {
2066        Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)))
2067    }
2068
2069    #[tokio::test]
2070    async fn schedule_tick_records_lifecycle_status_and_receipt() {
2071        let log = log();
2072        let binding = binding();
2073        let now = parse_rfc3339_ms("2026-04-24T12:30:00Z").unwrap();
2074        let receipt = fire_schedule(
2075            &log,
2076            &binding,
2077            PersonaRunCost {
2078                cost_usd: 0.01,
2079                tokens: 10,
2080                ..Default::default()
2081            },
2082            now,
2083        )
2084        .await
2085        .unwrap();
2086        assert_eq!(receipt.status, "completed");
2087        assert!(receipt.lease.is_some());
2088        let status = persona_status(&log, &binding, now).await.unwrap();
2089        assert_eq!(status.state, PersonaLifecycleState::Idle);
2090        assert_eq!(status.last_run.as_deref(), Some("2026-04-24T12:30:00Z"));
2091        assert!(status.next_scheduled_run.is_some());
2092        assert_eq!(status.budget.spent_today_usd, 0.01);
2093    }
2094
2095    #[tokio::test]
2096    async fn paused_personas_queue_and_resume_drains_once() {
2097        let log = log();
2098        let binding = binding();
2099        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2100        pause_persona(&log, &binding, now).await.unwrap();
2101        let receipt = fire_trigger(
2102            &log,
2103            &binding,
2104            "github",
2105            "pull_request",
2106            BTreeMap::from([
2107                ("repository".to_string(), "burin-labs/harn".to_string()),
2108                ("number".to_string(), "462".to_string()),
2109            ]),
2110            PersonaRunCost::default(),
2111            now,
2112        )
2113        .await
2114        .unwrap();
2115        assert_eq!(receipt.status, "queued");
2116        assert_eq!(
2117            persona_status(&log, &binding, now)
2118                .await
2119                .unwrap()
2120                .queued_events,
2121            1
2122        );
2123        let status = resume_persona(&log, &binding, now + 1000).await.unwrap();
2124        assert_eq!(status.state, PersonaLifecycleState::Idle);
2125        assert_eq!(status.queued_events, 0);
2126    }
2127
2128    #[tokio::test]
2129    async fn resumed_queued_work_reuses_original_budget_cost() {
2130        let log = log();
2131        let mut binding = binding();
2132        binding.budget.run_usd = Some(0.01);
2133        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2134        pause_persona(&log, &binding, now).await.unwrap();
2135        let queued = fire_trigger(
2136            &log,
2137            &binding,
2138            "github",
2139            "pull_request",
2140            BTreeMap::from([
2141                ("repository".to_string(), "burin-labs/harn".to_string()),
2142                ("number".to_string(), "1379".to_string()),
2143            ]),
2144            PersonaRunCost {
2145                cost_usd: 0.02,
2146                tokens: 1,
2147                ..Default::default()
2148            },
2149            now + 1,
2150        )
2151        .await
2152        .unwrap();
2153        assert_eq!(queued.status, "queued");
2154
2155        let status = resume_persona(&log, &binding, now + 2).await.unwrap();
2156
2157        assert_eq!(status.budget.reason.as_deref(), Some("run_usd"));
2158        assert!(status
2159            .last_error
2160            .as_deref()
2161            .is_some_and(|error| error.contains("run_usd")));
2162        assert_eq!(status.queued_events, 1);
2163    }
2164
2165    #[tokio::test]
2166    async fn duplicate_trigger_envelope_is_not_processed_twice() {
2167        let log = log();
2168        let binding = binding();
2169        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2170        let metadata = BTreeMap::from([
2171            ("repository".to_string(), "burin-labs/harn".to_string()),
2172            ("number".to_string(), "462".to_string()),
2173        ]);
2174        let first = fire_trigger(
2175            &log,
2176            &binding,
2177            "github",
2178            "pull_request",
2179            metadata.clone(),
2180            PersonaRunCost::default(),
2181            now,
2182        )
2183        .await
2184        .unwrap();
2185        let second = fire_trigger(
2186            &log,
2187            &binding,
2188            "github",
2189            "pull_request",
2190            metadata,
2191            PersonaRunCost::default(),
2192            now + 1000,
2193        )
2194        .await
2195        .unwrap();
2196        assert_eq!(first.status, "completed");
2197        assert_eq!(second.status, "duplicate");
2198        assert!(second.lease.is_none());
2199    }
2200
2201    #[tokio::test]
2202    async fn disabled_personas_dead_letter_events() {
2203        let log = log();
2204        let binding = binding();
2205        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2206        disable_persona(&log, &binding, now).await.unwrap();
2207        let receipt = fire_trigger(
2208            &log,
2209            &binding,
2210            "slack",
2211            "message",
2212            BTreeMap::from([
2213                ("channel".to_string(), "C123".to_string()),
2214                ("ts".to_string(), "1713988800.000100".to_string()),
2215            ]),
2216            PersonaRunCost::default(),
2217            now,
2218        )
2219        .await
2220        .unwrap();
2221        assert_eq!(receipt.status, "dead_lettered");
2222        let status = persona_status(&log, &binding, now).await.unwrap();
2223        assert_eq!(status.state, PersonaLifecycleState::Disabled);
2224        assert_eq!(status.disabled_events, 1);
2225    }
2226
2227    #[tokio::test]
2228    async fn budget_exhaustion_blocks_expensive_work() {
2229        let log = log();
2230        let mut binding = binding();
2231        binding.budget.daily_usd = Some(0.01);
2232        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2233        let receipt = fire_trigger(
2234            &log,
2235            &binding,
2236            "linear",
2237            "issue",
2238            BTreeMap::from([("issue_key".to_string(), "HAR-462".to_string())]),
2239            PersonaRunCost {
2240                cost_usd: 0.02,
2241                tokens: 1,
2242                ..Default::default()
2243            },
2244            now,
2245        )
2246        .await
2247        .unwrap();
2248        assert_eq!(receipt.status, "budget_exhausted");
2249        let status = persona_status(&log, &binding, now).await.unwrap();
2250        assert_eq!(status.budget.reason.as_deref(), Some("daily_usd"));
2251        assert!(status.budget.exhausted);
2252        assert!(status.last_error.as_deref().unwrap().contains("daily_usd"));
2253    }
2254
2255    #[tokio::test]
2256    async fn deterministic_predicate_hit_emits_value_event_with_avoided_cost() {
2257        let log = log();
2258        let binding = binding();
2259        let captured = Arc::new(Mutex::new(Vec::new()));
2260        let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
2261            events: captured.clone(),
2262        }));
2263        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2264
2265        let receipt = fire_trigger(
2266            &log,
2267            &binding,
2268            "github",
2269            "pull_request",
2270            BTreeMap::from([
2271                ("repository".to_string(), "burin-labs/harn".to_string()),
2272                ("number".to_string(), "715".to_string()),
2273            ]),
2274            PersonaRunCost {
2275                avoided_cost_usd: 0.0042,
2276                deterministic_steps: 1,
2277                metadata: json!({
2278                    "predicate": "pr_already_green",
2279                    "would_have_called_model": "gpt-5.4-mini",
2280                }),
2281                ..Default::default()
2282            },
2283            now,
2284        )
2285        .await
2286        .unwrap();
2287
2288        let run_id = receipt.run_id.expect("completed run has run_id");
2289        let events = captured.lock().unwrap().clone();
2290        let deterministic = events
2291            .iter()
2292            .find(|event| {
2293                event.kind == PersonaValueEventKind::DeterministicExecution
2294                    && event.run_id == Some(run_id)
2295            })
2296            .expect("deterministic execution value event");
2297        assert_eq!(deterministic.persona_id, "merge_captain");
2298        assert_eq!(
2299            deterministic.template_ref.as_deref(),
2300            Some("software_factory@v0")
2301        );
2302        assert_eq!(deterministic.run_id, Some(run_id));
2303        assert_eq!(deterministic.paid_cost_usd, 0.0);
2304        assert_eq!(deterministic.avoided_cost_usd, 0.0042);
2305        assert_eq!(deterministic.deterministic_steps, 1);
2306        assert_eq!(
2307            deterministic.metadata["predicate"].as_str(),
2308            Some("pr_already_green")
2309        );
2310
2311        let persisted = read_persona_events(&log, &binding.name).await.unwrap();
2312        assert!(persisted.iter().any(|(_, event)| {
2313            event.kind == "persona.value.deterministic_execution"
2314                && event.payload["avoided_cost_usd"] == json!(0.0042)
2315        }));
2316    }
2317
2318    #[tokio::test]
2319    async fn frontier_escalation_run_emits_value_event_with_paid_cost() {
2320        let log = log();
2321        let binding = binding();
2322        let captured = Arc::new(Mutex::new(Vec::new()));
2323        let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
2324            events: captured.clone(),
2325        }));
2326        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2327
2328        let receipt = fire_trigger(
2329            &log,
2330            &binding,
2331            "linear",
2332            "issue",
2333            BTreeMap::from([("issue_key".to_string(), "HAR-715".to_string())]),
2334            PersonaRunCost {
2335                cost_usd: 0.011,
2336                tokens: 20,
2337                llm_steps: 1,
2338                frontier_escalations: 1,
2339                metadata: json!({
2340                    "frontier_model": "gpt-5.4",
2341                    "escalation_reason": "high_risk_merge",
2342                }),
2343                ..Default::default()
2344            },
2345            now,
2346        )
2347        .await
2348        .unwrap();
2349
2350        let run_id = receipt.run_id.expect("completed run has run_id");
2351        let events = captured.lock().unwrap().clone();
2352        let escalation = events
2353            .iter()
2354            .find(|event| {
2355                event.kind == PersonaValueEventKind::FrontierEscalation
2356                    && event.run_id == Some(run_id)
2357            })
2358            .expect("frontier escalation value event");
2359        assert_eq!(escalation.run_id, Some(run_id));
2360        assert_eq!(escalation.paid_cost_usd, 0.011);
2361        assert_eq!(escalation.avoided_cost_usd, 0.0);
2362        assert_eq!(escalation.llm_steps, 1);
2363        assert_eq!(
2364            escalation.metadata["frontier_model"].as_str(),
2365            Some("gpt-5.4")
2366        );
2367
2368        let completion = events
2369            .iter()
2370            .find(|event| {
2371                event.kind == PersonaValueEventKind::RunCompleted && event.run_id == Some(run_id)
2372            })
2373            .expect("run completed value event");
2374        assert_eq!(completion.paid_cost_usd, 0.0);
2375    }
2376
2377    struct CapturingSupervisionSink {
2378        events: Arc<Mutex<Vec<PersonaSupervisionEvent>>>,
2379    }
2380
2381    impl PersonaSupervisionSink for CapturingSupervisionSink {
2382        fn handle_supervision_event(&self, event: &PersonaSupervisionEvent) {
2383            self.events.lock().unwrap().push(event.clone());
2384        }
2385    }
2386
2387    fn pr_metadata(repository: &str, number: &str) -> BTreeMap<String, String> {
2388        BTreeMap::from([
2389            ("repository".to_string(), repository.to_string()),
2390            ("number".to_string(), number.to_string()),
2391        ])
2392    }
2393
2394    fn binding_named(name: &str) -> PersonaRuntimeBinding {
2395        PersonaRuntimeBinding {
2396            name: name.to_string(),
2397            ..binding()
2398        }
2399    }
2400
2401    fn supervision_events_for(
2402        captured: &Arc<Mutex<Vec<PersonaSupervisionEvent>>>,
2403        persona: &str,
2404    ) -> Vec<PersonaSupervisionEvent> {
2405        captured
2406            .lock()
2407            .unwrap()
2408            .iter()
2409            .filter(|event| match event {
2410                PersonaSupervisionEvent::QueuePosition(update) => update.persona_id == persona,
2411                PersonaSupervisionEvent::RepairWorkerStatus(update) => update.persona_id == persona,
2412                PersonaSupervisionEvent::Receipt(update) => update.persona_id == persona,
2413                PersonaSupervisionEvent::Checkpoint(update) => update.persona_id == persona,
2414            })
2415            .cloned()
2416            .collect()
2417    }
2418
2419    async fn drive_pause_then_resume(binding: &PersonaRuntimeBinding, now: i64) {
2420        let log = log();
2421        pause_persona(&log, binding, now).await.unwrap();
2422        let _ = fire_trigger(
2423            &log,
2424            binding,
2425            "github",
2426            "pull_request",
2427            pr_metadata("burin-labs/harn", "1480"),
2428            PersonaRunCost::default(),
2429            now,
2430        )
2431        .await
2432        .unwrap();
2433        let _ = resume_persona(&log, binding, now + 1).await.unwrap();
2434        let _ = restore_persona_checkpoint(
2435            &log,
2436            binding,
2437            PersonaCheckpointRestoreRequest {
2438                checkpoint_id: "cp_42".to_string(),
2439                work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2440                resumed_from: Some(PersonaCheckpointResume {
2441                    note: Some("resumed from cp 42".to_string()),
2442                    ..Default::default()
2443                }),
2444            },
2445            now + 2,
2446        )
2447        .await
2448        .unwrap();
2449        let _ = report_repair_worker_status(
2450            &log,
2451            binding,
2452            PersonaRepairWorkerStatusUpdate {
2453                persona_id: String::new(),
2454                template_ref: None,
2455                repair_worker_id: "rw_42".to_string(),
2456                lifecycle: PersonaRepairWorkerLifecycle::Running,
2457                work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2458                lease_id: Some("persona_lease_xyz".to_string()),
2459                scratchpad_url: Some("https://factory.local/rw_42".to_string()),
2460                last_heartbeat_ms: 0,
2461                occurred_at_ms: 0,
2462            },
2463            now + 3,
2464        )
2465        .await
2466        .unwrap();
2467    }
2468
2469    #[tokio::test]
2470    async fn supervision_sink_emits_queue_position_and_receipt() {
2471        let captured = Arc::new(Mutex::new(Vec::new()));
2472        let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2473            events: captured.clone(),
2474        }));
2475        let log = log();
2476        let binding = binding_named("supervision_sink_emits_queue_position_and_receipt");
2477        let now = parse_rfc3339_ms("2026-05-01T00:00:00Z").unwrap();
2478
2479        pause_persona(&log, &binding, now).await.unwrap();
2480        fire_trigger(
2481            &log,
2482            &binding,
2483            "github",
2484            "pull_request",
2485            pr_metadata("burin-labs/harn", "1480"),
2486            PersonaRunCost::default(),
2487            now + 100,
2488        )
2489        .await
2490        .unwrap();
2491        resume_persona(&log, &binding, now + 200).await.unwrap();
2492
2493        let events = supervision_events_for(&captured, &binding.name);
2494        let queue_events: Vec<_> = events
2495            .iter()
2496            .filter_map(|event| match event {
2497                PersonaSupervisionEvent::QueuePosition(update) => Some(update.clone()),
2498                _ => None,
2499            })
2500            .collect();
2501        assert_eq!(queue_events.len(), 2, "enqueue + drain emitted");
2502        assert_eq!(queue_events[0].position, 1);
2503        assert_eq!(queue_events[0].queue_depth, 1);
2504        assert_eq!(queue_events[1].position, 0);
2505        assert_eq!(queue_events[1].queue_depth, 0);
2506        let receipt_events: Vec<_> = events
2507            .iter()
2508            .filter_map(|event| match event {
2509                PersonaSupervisionEvent::Receipt(update) => Some(update.clone()),
2510                _ => None,
2511            })
2512            .collect();
2513        assert_eq!(receipt_events.len(), 2, "queued + drained receipt");
2514        assert_eq!(receipt_events[0].receipt.status, "queued");
2515        assert_eq!(receipt_events[1].receipt.status, "completed");
2516        for event in &receipt_events {
2517            assert_eq!(event.receipt.persona, binding.name);
2518            assert_eq!(event.persona_id, binding.name);
2519            assert_eq!(event.template_ref.as_deref(), Some("software_factory@v0"));
2520        }
2521        let persisted_kinds: Vec<_> = read_persona_events(&log, &binding.name)
2522            .await
2523            .unwrap()
2524            .into_iter()
2525            .map(|(_, event)| event.kind)
2526            .collect();
2527        assert!(
2528            persisted_kinds
2529                .iter()
2530                .any(|kind| kind == "persona.supervision.queue_position"),
2531            "queue_position supervision events should be durable: {persisted_kinds:?}"
2532        );
2533        assert!(
2534            persisted_kinds
2535                .iter()
2536                .any(|kind| kind == "persona.supervision.receipt"),
2537            "receipt supervision events should be durable: {persisted_kinds:?}"
2538        );
2539    }
2540
2541    #[tokio::test]
2542    async fn supervision_sink_emits_repair_worker_status_idempotently() {
2543        let captured = Arc::new(Mutex::new(Vec::new()));
2544        let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2545            events: captured.clone(),
2546        }));
2547        let log = log();
2548        let binding = binding_named("supervision_sink_emits_repair_worker_status_idempotently");
2549        let now = parse_rfc3339_ms("2026-05-01T01:00:00Z").unwrap();
2550        let update = PersonaRepairWorkerStatusUpdate {
2551            persona_id: String::new(),
2552            template_ref: None,
2553            repair_worker_id: "rw_test".to_string(),
2554            lifecycle: PersonaRepairWorkerLifecycle::Running,
2555            work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2556            lease_id: Some("persona_lease_abc".to_string()),
2557            scratchpad_url: Some("https://factory.local/rw_test".to_string()),
2558            last_heartbeat_ms: 0,
2559            occurred_at_ms: 0,
2560        };
2561        let first = report_repair_worker_status(&log, &binding, update.clone(), now)
2562            .await
2563            .unwrap();
2564        let second = report_repair_worker_status(&log, &binding, update.clone(), now + 5)
2565            .await
2566            .unwrap();
2567        assert!(first);
2568        assert!(!second, "second identical lifecycle is idempotent");
2569
2570        let mut next = update.clone();
2571        next.lifecycle = PersonaRepairWorkerLifecycle::Succeeded;
2572        let third = report_repair_worker_status(&log, &binding, next, now + 10)
2573            .await
2574            .unwrap();
2575        assert!(third);
2576
2577        let kinds: Vec<_> = supervision_events_for(&captured, &binding.name)
2578            .into_iter()
2579            .filter_map(|event| match event {
2580                PersonaSupervisionEvent::RepairWorkerStatus(update) => Some(update.lifecycle),
2581                _ => None,
2582            })
2583            .collect();
2584        assert_eq!(
2585            kinds,
2586            vec![
2587                PersonaRepairWorkerLifecycle::Running,
2588                PersonaRepairWorkerLifecycle::Succeeded
2589            ]
2590        );
2591    }
2592
2593    #[tokio::test]
2594    async fn supervision_sink_emits_checkpoint_restore_ack() {
2595        let captured = Arc::new(Mutex::new(Vec::new()));
2596        let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2597            events: captured.clone(),
2598        }));
2599        let log = log();
2600        let binding = binding_named("supervision_sink_emits_checkpoint_restore_ack");
2601        let now = parse_rfc3339_ms("2026-05-01T02:00:00Z").unwrap();
2602        fire_trigger(
2603            &log,
2604            &binding,
2605            "github",
2606            "pull_request",
2607            pr_metadata("burin-labs/harn", "1480"),
2608            PersonaRunCost::default(),
2609            now,
2610        )
2611        .await
2612        .unwrap();
2613
2614        let outcome = restore_persona_checkpoint(
2615            &log,
2616            &binding,
2617            PersonaCheckpointRestoreRequest {
2618                checkpoint_id: "cp_1".to_string(),
2619                work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2620                resumed_from: None,
2621            },
2622            now + 100,
2623        )
2624        .await
2625        .unwrap();
2626        assert!(outcome.acked);
2627        assert_eq!(outcome.update.checkpoint_id, "cp_1");
2628        let resume = outcome
2629            .update
2630            .resumed_from
2631            .as_ref()
2632            .expect("resume coordinates default-derived from status");
2633        assert_eq!(resume.last_run_ms, Some(now));
2634
2635        let replay = restore_persona_checkpoint(
2636            &log,
2637            &binding,
2638            PersonaCheckpointRestoreRequest {
2639                checkpoint_id: "cp_1".to_string(),
2640                work_key: None,
2641                resumed_from: None,
2642            },
2643            now + 200,
2644        )
2645        .await
2646        .unwrap();
2647        assert!(!replay.acked, "duplicate restore is a no-op ack");
2648        assert_eq!(replay.update.occurred_at_ms, now + 100);
2649
2650        let ack_events: Vec<_> = supervision_events_for(&captured, &binding.name)
2651            .into_iter()
2652            .filter_map(|event| match event {
2653                PersonaSupervisionEvent::Checkpoint(update) => Some(update),
2654                _ => None,
2655            })
2656            .collect();
2657        assert_eq!(ack_events.len(), 1, "ack emitted once, replay suppressed");
2658        assert_eq!(ack_events[0].action, PersonaCheckpointAction::RestoreAcked);
2659    }
2660
2661    #[tokio::test]
2662    async fn supervision_sink_replay_is_deterministic_under_recorded_clock() {
2663        use harn_clock::{ClockEventLog, PausedClock, RecordedClock};
2664        use time::OffsetDateTime;
2665
2666        let now_ms = parse_rfc3339_ms("2026-05-01T03:00:00Z").unwrap();
2667        async fn drive(now_ms: i64) -> (Vec<PersonaSupervisionEvent>, Vec<harn_clock::ClockEvent>) {
2668            let captured = Arc::new(Mutex::new(Vec::new()));
2669            let _registration =
2670                register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2671                    events: captured.clone(),
2672                }));
2673            let paused = PausedClock::new(
2674                OffsetDateTime::from_unix_timestamp_nanos((now_ms as i128) * 1_000_000).unwrap(),
2675            );
2676            let recorded = Arc::new(RecordedClock::new(paused, Arc::new(ClockEventLog::new())));
2677            let binding = binding_named("supervision_replay_persona");
2678            let ts = harn_clock::now_wall_ms(&*recorded);
2679            drive_pause_then_resume(&binding, ts).await;
2680            let clock_log = recorded.log().snapshot();
2681            let events = supervision_events_for(&captured, &binding.name);
2682            (events, clock_log)
2683        }
2684
2685        let (events_a, clock_a) = drive(now_ms).await;
2686        let (events_b, clock_b) = drive(now_ms).await;
2687        // Run receipts carry per-run `run_id`s (UUIDv7) and lease ids that
2688        // differ across runs. Normalize them so the deterministic envelope
2689        // remains comparable.
2690        fn normalize(event: &PersonaSupervisionEvent) -> PersonaSupervisionEvent {
2691            match event.clone() {
2692                PersonaSupervisionEvent::Receipt(mut update) => {
2693                    update.receipt.run_id = None;
2694                    if let Some(lease) = update.receipt.lease.as_mut() {
2695                        lease.id = "lease".to_string();
2696                    }
2697                    update.receipt.budget_receipt_id = update
2698                        .receipt
2699                        .budget_receipt_id
2700                        .map(|_| "budget".to_string());
2701                    PersonaSupervisionEvent::Receipt(update)
2702                }
2703                PersonaSupervisionEvent::Checkpoint(mut update) => {
2704                    if let Some(resume) = update.resumed_from.as_mut() {
2705                        resume.run_id = None;
2706                        resume.lease_id = None;
2707                    }
2708                    PersonaSupervisionEvent::Checkpoint(update)
2709                }
2710                other => other,
2711            }
2712        }
2713        let a: Vec<_> = events_a.iter().map(normalize).collect();
2714        let b: Vec<_> = events_b.iter().map(normalize).collect();
2715        assert_eq!(a, b, "supervision sink emits identical event envelopes");
2716        assert_eq!(
2717            clock_a, clock_b,
2718            "recorded clock observation log is identical across replays"
2719        );
2720    }
2721}