Skip to main content

harn_vm/
personas.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::{Arc, OnceLock, RwLock};
4
5use chrono::{TimeZone, Utc};
6use croner::Cron;
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use time::format_description::well_known::Rfc3339;
10use time::OffsetDateTime;
11use uuid::Uuid;
12
13use crate::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
14
/// Topic name for persona runtime events.
// NOTE(review): presumably the event-log topic the persona helpers read from
// and append to; the code that uses it is outside this part of the file.
pub const PERSONA_RUNTIME_TOPIC: &str = "persona.runtime.events";

/// Default lease time-to-live: five minutes, expressed in milliseconds.
const DEFAULT_LEASE_TTL_MS: i64 = 5 * 60 * 1000;
18
/// Lifecycle state reported for a persona.
///
/// Serialized by serde using `snake_case` variant names; `Idle` is the
/// `Default` variant. `persona_status` derives `Idle`, `Running`, `Paused`,
/// `Draining`, `Failed` and `Disabled` from the event log; it never assigns
/// `Inactive` or `Starting`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaLifecycleState {
    Inactive,
    Starting,
    #[default]
    Idle,
    Running,
    Paused,
    Draining,
    Failed,
    Disabled,
}
32
33impl PersonaLifecycleState {
34    pub fn as_str(self) -> &'static str {
35        match self {
36            Self::Inactive => "inactive",
37            Self::Starting => "starting",
38            Self::Idle => "idle",
39            Self::Running => "running",
40            Self::Paused => "paused",
41            Self::Draining => "draining",
42            Self::Failed => "failed",
43            Self::Disabled => "disabled",
44        }
45    }
46}
47
/// Spending limits configured on a persona binding.
///
/// Every limit is optional and the derived `Default` leaves all of them
/// unset. `persona_status` hands this policy to `budget_status` when it builds
/// the reported budget.
// NOTE(review): the field names suggest USD ceilings per day / hour / run and a
// token ceiling; the enforcement code is not visible here — confirm there.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaBudgetPolicy {
    pub daily_usd: Option<f64>,
    pub hourly_usd: Option<f64>,
    pub run_usd: Option<f64>,
    pub max_tokens: Option<u64>,
}
55
/// Description of a persona that the runtime functions in this module act on.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaRuntimeBinding {
    /// Persona identifier; passed as the persona key to every event-log read
    /// and append in this module.
    pub name: String,
    /// Optional template reference, copied onto statuses and supervision
    /// updates.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Entry workflow, echoed verbatim in `PersonaStatus::entry_workflow`.
    pub entry_workflow: String,
    /// Schedule entries (presumably cron expressions — confirm).
    /// `fire_schedule` uses the first entry and falls back to `"manual"` when
    /// the list is empty.
    #[serde(default)]
    pub schedules: Vec<String>,
    /// Trigger identifiers. NOTE(review): not read by the code visible here.
    #[serde(default)]
    pub triggers: Vec<String>,
    /// Budget limits applied to this persona.
    #[serde(default)]
    pub budget: PersonaBudgetPolicy,
}
69
/// Lease carried by a `persona.lease.acquired` event.
///
/// `persona_status` deserializes it from that event's payload and stops
/// reporting it as active once `expires_at_ms <= now_ms`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaLease {
    pub id: String,
    pub holder: String,
    pub work_key: String,
    pub acquired_at_ms: i64,
    pub expires_at_ms: i64,
}
78
/// Budget section of a [`PersonaStatus`].
///
/// The value is produced by `budget_status` (defined outside this view) from
/// the binding's policy and the recorded spend. `persona_status` then fills in
/// two gaps: when `reason` is still `None` and a `persona.budget.exhausted`
/// event supplied a reason, it sets `exhausted = true` and copies that reason;
/// when `last_receipt_id` is still `None` it copies the most recent receipt id
/// seen in budget events.
// NOTE(review): the first four fields share their names with
// `PersonaBudgetPolicy`; presumably they echo the configured limits — confirm
// in `budget_status`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaBudgetStatus {
    pub daily_usd: Option<f64>,
    pub hourly_usd: Option<f64>,
    pub run_usd: Option<f64>,
    pub max_tokens: Option<u64>,
    pub spent_today_usd: f64,
    pub spent_this_hour_usd: f64,
    pub spent_last_run_usd: f64,
    pub tokens_today: u64,
    pub remaining_today_usd: Option<f64>,
    pub remaining_hour_usd: Option<f64>,
    pub exhausted: bool,
    pub reason: Option<String>,
    pub last_receipt_id: Option<String>,
}
95
/// Snapshot of a persona as computed by `persona_status` from its event log.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaStatus {
    /// Copy of the binding's `name`.
    pub name: String,
    /// Copy of the binding's `template_ref`.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Lifecycle state derived from the replayed events.
    pub state: PersonaLifecycleState,
    /// Copy of the binding's `entry_workflow`.
    pub entry_workflow: String,
    /// `persona_status` currently fills this with the binding's `name`.
    #[serde(default)]
    pub role: String,
    /// Assignment view of `active_lease`; `None` when no lease is active.
    #[serde(default)]
    pub current_assignment: Option<PersonaAssignmentStatus>,
    /// Time of the last completed run, formatted by `format_ms`.
    pub last_run: Option<String>,
    /// Result of `next_scheduled_run` for the binding.
    pub next_scheduled_run: Option<String>,
    /// Most recently acquired lease that has not been released or expired.
    pub active_lease: Option<PersonaLease>,
    pub budget: PersonaBudgetStatus,
    /// Error from the latest `persona.run.failed` or
    /// `persona.budget.exhausted` event, whichever came last.
    pub last_error: Option<String>,
    /// Number of queued work keys that have no matching completed run.
    pub queued_events: usize,
    /// Queued work items that have no matching completed run.
    #[serde(default)]
    pub queued_work: Vec<PersonaQueuedWork>,
    /// Entries of `queued_work` that `handoff_inbox_item` maps to a handoff.
    #[serde(default)]
    pub handoff_inbox: Vec<PersonaHandoffInboxItem>,
    /// Receipts decoded from `persona.value.*` events, in log order.
    #[serde(default)]
    pub value_receipts: Vec<PersonaValueReceipt>,
    /// Count of `persona.trigger.dead_lettered` events.
    pub disabled_events: usize,
    /// `persona_status` always reports `"queue_then_drain_on_resume"`.
    pub paused_event_policy: String,
}
122
/// Assignment view of the active lease, as shown in
/// `PersonaStatus::current_assignment`.
///
/// Built by `assignment_status_from_lease`, which is defined outside this
/// view.
// NOTE(review): `acquired_at` / `expires_at` are presumably the lease's
// millisecond timestamps rendered as strings — confirm in that helper.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaAssignmentStatus {
    pub work_key: String,
    pub lease_id: String,
    pub holder: String,
    pub acquired_at: String,
    pub expires_at: String,
}
131
/// A queued work item reported in `PersonaStatus::queued_work`.
///
/// `persona_status` obtains these from `persona.trigger.queued` events via
/// `queued_work_from_event` (defined outside this view), keeps one item per
/// `work_key`, and drops items whose `work_key` has a completed run.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaQueuedWork {
    pub work_key: String,
    pub provider: String,
    pub kind: String,
    pub queued_at: String,
    pub reason: String,
    pub source_event_id: Option<String>,
    /// Defaults to an empty map when absent from the serialized form.
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
}
143
/// Handoff entry reported in `PersonaStatus::handoff_inbox`.
///
/// `persona_status` derives these from the remaining queued work through
/// `handoff_inbox_item` (defined outside this view); queued items for which
/// that helper returns `None` are left out.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaHandoffInboxItem {
    pub work_key: String,
    pub handoff_id: Option<String>,
    pub handoff_kind: Option<String>,
    pub source_persona: Option<String>,
    pub task: Option<String>,
    pub queued_at: String,
    pub reason: String,
}
154
/// Value receipt reported in `PersonaStatus::value_receipts`.
///
/// `persona_status` collects one receipt for each event whose kind starts with
/// `persona.value.` and that `value_receipt_from_event` (defined outside this
/// view) decodes successfully.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaValueReceipt {
    pub kind: PersonaValueEventKind,
    pub run_id: Option<Uuid>,
    pub occurred_at: String,
    pub paid_cost_usd: f64,
    pub avoided_cost_usd: f64,
    pub deterministic_steps: i64,
    pub llm_steps: i64,
    pub metadata: serde_json::Value,
}
166
/// Normalized description of the trigger that starts a persona run.
///
/// `fire_schedule` builds one with provider `"schedule"` and kind
/// `"cron.tick"`; `fire_trigger` obtains one from `normalize_trigger_envelope`.
/// Both record the envelope in the event log and pass it to
/// `run_for_envelope`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaTriggerEnvelope {
    pub provider: String,
    pub kind: String,
    /// For schedule ticks: `schedule:{persona}:{schedule}:{formatted time}`.
    pub subject_key: String,
    pub source_event_id: Option<String>,
    pub received_at_ms: i64,
    /// Defaults to an empty map when absent from the serialized form.
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
    /// Defaults to JSON `null` when absent; `fire_schedule` sets `{}`.
    #[serde(default)]
    pub raw: serde_json::Value,
}
179
/// Outcome of handling one trigger envelope.
///
/// Returned by `fire_schedule` and `fire_trigger` (both delegate to
/// `run_for_envelope`) and embedded in [`PersonaReceiptUpdate`].
// NOTE(review): the meaning of `status` / `queued` / `error` is decided in
// `run_for_envelope`, which extends beyond this view — confirm there.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaRunReceipt {
    pub status: String,
    pub persona: String,
    /// Defaults to `None` when absent from the serialized form.
    #[serde(default)]
    pub run_id: Option<Uuid>,
    pub work_key: String,
    pub lease: Option<PersonaLease>,
    pub queued: bool,
    pub error: Option<String>,
    pub budget_receipt_id: Option<String>,
}
192
/// Cost figures supplied by the caller for a run or a spend record.
///
/// Accepted by `fire_schedule`, `fire_trigger` and `record_persona_spend`.
/// Only `cost_usd` and `tokens` are required in serialized input; every other
/// field falls back to its type's default when missing.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaRunCost {
    pub cost_usd: f64,
    pub tokens: u64,
    #[serde(default)]
    pub avoided_cost_usd: f64,
    #[serde(default)]
    pub deterministic_steps: i64,
    #[serde(default)]
    pub llm_steps: i64,
    #[serde(default)]
    pub frontier_escalations: i64,
    #[serde(default)]
    pub metadata: serde_json::Value,
}
208
/// Kind of a persona value event or receipt.
///
/// Serialized by serde using `snake_case` variant names, which are the same
/// labels `as_str` returns.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaValueEventKind {
    RunStarted,
    RunCompleted,
    AcceptedOutcome,
    FrontierEscalation,
    DeterministicExecution,
    PromotionSavings,
    ApprovalWait,
}
220
221impl PersonaValueEventKind {
222    pub fn as_str(self) -> &'static str {
223        match self {
224            Self::RunStarted => "run_started",
225            Self::RunCompleted => "run_completed",
226            Self::AcceptedOutcome => "accepted_outcome",
227            Self::FrontierEscalation => "frontier_escalation",
228            Self::DeterministicExecution => "deterministic_execution",
229            Self::PromotionSavings => "promotion_savings",
230            Self::ApprovalWait => "approval_wait",
231        }
232    }
233}
234
/// Value event handed to [`PersonaValueSink::handle_value_event`].
// NOTE(review): the code that builds and dispatches these events is outside
// this view; field meanings beyond their names should be confirmed there.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaValueEvent {
    pub persona_id: String,
    pub template_ref: Option<String>,
    pub run_id: Option<Uuid>,
    pub kind: PersonaValueEventKind,
    pub paid_cost_usd: f64,
    pub avoided_cost_usd: f64,
    pub deterministic_steps: i64,
    pub llm_steps: i64,
    pub metadata: serde_json::Value,
    pub occurred_at: OffsetDateTime,
}
248
/// Receiver for [`PersonaValueEvent`]s.
///
/// Implementations must be `Send + Sync`; they are installed with
/// `register_persona_value_sink` as `Arc<dyn PersonaValueSink>`.
pub trait PersonaValueSink: Send + Sync {
    /// Called with a borrowed value event.
    fn handle_value_event(&self, event: &PersonaValueEvent);
}
252
/// Append-only multiplexed feed mirroring the harn-cloud
/// `persona_supervision_events` projection. Sinks see the runtime-sourced
/// `queue_position`, `repair_worker_status`, `receipt`, and
/// `checkpoint` (restore-ack) variants; supervisor-sourced variants
/// (`control`, `feedback`, `approval`, `handoff`, and checkpoint writes)
/// originate on the hosted side and aren't routed here.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// `snake_case` under the `update_kind` key, alongside the payload's own
/// fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "update_kind", rename_all = "snake_case")]
pub enum PersonaSupervisionEvent {
    QueuePosition(PersonaQueuePositionUpdate),
    RepairWorkerStatus(PersonaRepairWorkerStatusUpdate),
    Receipt(PersonaReceiptUpdate),
    Checkpoint(PersonaCheckpointUpdate),
}
267
268impl PersonaSupervisionEvent {
269    pub fn update_kind(&self) -> &'static str {
270        match self {
271            Self::QueuePosition(_) => "queue_position",
272            Self::RepairWorkerStatus(_) => "repair_worker_status",
273            Self::Receipt(_) => "receipt",
274            Self::Checkpoint(_) => "checkpoint",
275        }
276    }
277
278    pub fn persona_id(&self) -> &str {
279        match self {
280            Self::QueuePosition(update) => &update.persona_id,
281            Self::RepairWorkerStatus(update) => &update.persona_id,
282            Self::Receipt(update) => &update.persona_id,
283            Self::Checkpoint(update) => &update.persona_id,
284        }
285    }
286
287    pub fn template_ref(&self) -> Option<&str> {
288        match self {
289            Self::QueuePosition(update) => update.template_ref.as_deref(),
290            Self::RepairWorkerStatus(update) => update.template_ref.as_deref(),
291            Self::Receipt(update) => update.template_ref.as_deref(),
292            Self::Checkpoint(update) => update.template_ref.as_deref(),
293        }
294    }
295
296    pub fn occurred_at_ms(&self) -> i64 {
297        match self {
298            Self::QueuePosition(update) => update.occurred_at_ms,
299            Self::RepairWorkerStatus(update) => update.occurred_at_ms,
300            Self::Receipt(update) => update.occurred_at_ms,
301            Self::Checkpoint(update) => update.occurred_at_ms,
302        }
303    }
304}
305
/// Payload of [`PersonaSupervisionEvent::QueuePosition`]: where one work item
/// sits in a persona's queue.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaQueuePositionUpdate {
    pub persona_id: String,
    /// Defaults to `None` when absent from the serialized form.
    #[serde(default)]
    pub template_ref: Option<String>,
    pub work_key: String,
    pub queue_depth: i64,
    /// 1-indexed position of `work_key`; `0` means the item left the queue.
    pub position: i64,
    pub queued_at_ms: i64,
    pub occurred_at_ms: i64,
}
318
/// Lifecycle stage of a repair worker.
///
/// Serialized by serde using `snake_case` variant names, which are the same
/// labels `as_str` returns. `repair_worker_status_recorded` compares those
/// labels against stored event payloads to detect replays.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaRepairWorkerLifecycle {
    Pending,
    Running,
    Verifying,
    Pushing,
    Succeeded,
    Failed,
    Cancelled,
}
330
331impl PersonaRepairWorkerLifecycle {
332    pub fn as_str(self) -> &'static str {
333        match self {
334            Self::Pending => "pending",
335            Self::Running => "running",
336            Self::Verifying => "verifying",
337            Self::Pushing => "pushing",
338            Self::Succeeded => "succeeded",
339            Self::Failed => "failed",
340            Self::Cancelled => "cancelled",
341        }
342    }
343}
344
/// Payload of [`PersonaSupervisionEvent::RepairWorkerStatus`] and the input to
/// `report_repair_worker_status`.
///
/// `report_repair_worker_status` fills gaps before recording: an empty
/// `persona_id` and a missing `template_ref` are taken from the binding, and a
/// zero `occurred_at_ms` or `last_heartbeat_ms` is replaced with the supplied
/// `now_ms`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaRepairWorkerStatusUpdate {
    pub persona_id: String,
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Together with `lifecycle`, the key used to detect replayed reports.
    pub repair_worker_id: String,
    pub lifecycle: PersonaRepairWorkerLifecycle,
    #[serde(default)]
    pub work_key: Option<String>,
    #[serde(default)]
    pub lease_id: Option<String>,
    #[serde(default)]
    pub scratchpad_url: Option<String>,
    pub last_heartbeat_ms: i64,
    pub occurred_at_ms: i64,
}
361
/// Payload of [`PersonaSupervisionEvent::Receipt`]: a run receipt tagged with
/// the persona it belongs to and the time it occurred.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaReceiptUpdate {
    pub persona_id: String,
    /// Defaults to `None` when absent from the serialized form.
    #[serde(default)]
    pub template_ref: Option<String>,
    pub receipt: PersonaRunReceipt,
    pub occurred_at_ms: i64,
}
370
/// Payload of [`PersonaSupervisionEvent::Checkpoint`].
///
/// `restore_persona_checkpoint` builds one per acknowledged restore and stores
/// it as the payload of a `persona.checkpoint.restore_acked` event;
/// `find_checkpoint_restore_ack` deserializes it back from that payload.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointUpdate {
    pub persona_id: String,
    #[serde(default)]
    pub template_ref: Option<String>,
    pub action: PersonaCheckpointAction,
    /// Identifier used to detect repeated restore requests.
    pub checkpoint_id: String,
    #[serde(default)]
    pub work_key: Option<String>,
    /// Coordinates the runtime actually resumed from when acking a restore.
    #[serde(default)]
    pub resumed_from: Option<PersonaCheckpointResume>,
    pub occurred_at_ms: i64,
}
385
/// Action carried by a [`PersonaCheckpointUpdate`].
///
/// Serialized by serde in `snake_case`. `RestoreAcked` is the only variant and
/// is what `restore_persona_checkpoint` emits.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaCheckpointAction {
    RestoreAcked,
}
391
392impl PersonaCheckpointAction {
393    pub fn as_str(self) -> &'static str {
394        match self {
395            Self::RestoreAcked => "restore_acked",
396        }
397    }
398}
399
/// Coordinates a runtime resumed from when acknowledging a checkpoint restore.
///
/// When a restore request does not supply these, `restore_persona_checkpoint`
/// derives them from the current persona status: `lease_id` from the active
/// lease, `last_run_ms` by parsing the status' `last_run`, and
/// `queued_work_keys` from the queued work; `run_id` and `note` are left
/// `None`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointResume {
    pub run_id: Option<Uuid>,
    pub lease_id: Option<String>,
    pub last_run_ms: Option<i64>,
    pub queued_work_keys: Vec<String>,
    /// Defaults to `None` when absent from the serialized form.
    #[serde(default)]
    pub note: Option<String>,
}
409
/// Receiver for [`PersonaSupervisionEvent`]s.
///
/// Implementations must be `Send + Sync`; they are installed with
/// `register_persona_supervision_sink` as `Arc<dyn PersonaSupervisionSink>`.
pub trait PersonaSupervisionSink: Send + Sync {
    /// Called with a borrowed supervision event.
    fn handle_supervision_event(&self, event: &PersonaSupervisionEvent);
}
413
/// Registry of shared sink handles, each stored under a unique numeric id.
///
/// Ids come from an atomic counter that starts at 1, so an id is never reused
/// within one registry. The sink list sits behind an `RwLock`.
///
/// Lock poisoning is tolerated: if a panic ever unwinds while the lock is
/// held, later calls recover the guard instead of failing. The protected
/// value is a plain `Vec` that each operation leaves structurally valid, and
/// giving up would silently lose registrations (the caller would still get an
/// id back), leak sinks on unregister, and hide every sink from `snapshot`.
struct TypedSinkRegistry<T: ?Sized + Send + Sync> {
    sinks: RwLock<Vec<(u64, Arc<T>)>>,
    next_id: AtomicU64,
}

impl<T: ?Sized + Send + Sync> TypedSinkRegistry<T> {
    /// Creates an empty registry; `const` so it can initialise a static.
    const fn new() -> Self {
        Self {
            sinks: RwLock::new(Vec::new()),
            next_id: AtomicU64::new(1),
        }
    }

    /// Stores `sink` and returns the id to pass to [`Self::unregister`].
    fn register(&self, sink: Arc<T>) -> u64 {
        // Only uniqueness matters for ids, so relaxed ordering is enough.
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        self.sinks
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push((id, sink));
        id
    }

    /// Removes the sink stored under `id`; unknown ids are ignored.
    fn unregister(&self, id: u64) {
        self.sinks
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .retain(|(existing, _)| *existing != id);
    }

    /// Returns clones of the registered sink handles in registration order.
    ///
    /// The lock is released before the vector is returned, so callers can
    /// invoke the sinks without holding it.
    fn snapshot(&self) -> Vec<Arc<T>> {
        self.sinks
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .iter()
            .map(|(_, sink)| Arc::clone(sink))
            .collect()
    }
}
448
449fn persona_value_sinks() -> &'static TypedSinkRegistry<dyn PersonaValueSink> {
450    static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaValueSink>> = OnceLock::new();
451    REGISTRY.get_or_init(TypedSinkRegistry::new)
452}
453
454fn persona_supervision_sinks() -> &'static TypedSinkRegistry<dyn PersonaSupervisionSink> {
455    static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaSupervisionSink>> = OnceLock::new();
456    REGISTRY.get_or_init(TypedSinkRegistry::new)
457}
458
459#[must_use = "dropping the registration immediately unregisters the sink"]
460pub struct PersonaValueSinkRegistration {
461    id: u64,
462}
463
464impl Drop for PersonaValueSinkRegistration {
465    fn drop(&mut self) {
466        persona_value_sinks().unregister(self.id);
467    }
468}
469
470pub fn register_persona_value_sink(
471    sink: Arc<dyn PersonaValueSink>,
472) -> PersonaValueSinkRegistration {
473    PersonaValueSinkRegistration {
474        id: persona_value_sinks().register(sink),
475    }
476}
477
478#[must_use = "dropping the registration immediately unregisters the sink"]
479pub struct PersonaSupervisionSinkRegistration {
480    id: u64,
481}
482
483impl Drop for PersonaSupervisionSinkRegistration {
484    fn drop(&mut self) {
485        persona_supervision_sinks().unregister(self.id);
486    }
487}
488
489pub fn register_persona_supervision_sink(
490    sink: Arc<dyn PersonaSupervisionSink>,
491) -> PersonaSupervisionSinkRegistration {
492    PersonaSupervisionSinkRegistration {
493        id: persona_supervision_sinks().register(sink),
494    }
495}
496
497pub async fn persona_status(
498    log: &Arc<AnyEventLog>,
499    binding: &PersonaRuntimeBinding,
500    now_ms: i64,
501) -> Result<PersonaStatus, String> {
502    let events = read_persona_events(log, &binding.name).await?;
503    let mut state = PersonaLifecycleState::Idle;
504    let mut last_run_ms = None;
505    let mut active_lease = None;
506    let mut last_error = None;
507    let mut queued = BTreeSet::<String>::new();
508    let mut completed = BTreeSet::<String>::new();
509    let mut disabled_events = 0usize;
510    let mut budget_receipt = None;
511    let mut budget_exhaustion_reason = None;
512    let mut spent = Vec::<(i64, f64, u64)>::new();
513    let mut queued_work = BTreeMap::<String, PersonaQueuedWork>::new();
514    let mut value_receipts = Vec::<PersonaValueReceipt>::new();
515
516    for (_, event) in events {
517        match event.kind.as_str() {
518            "persona.control.paused" => state = PersonaLifecycleState::Paused,
519            "persona.control.resumed" => state = PersonaLifecycleState::Idle,
520            "persona.control.disabled" => state = PersonaLifecycleState::Disabled,
521            "persona.control.draining" => state = PersonaLifecycleState::Draining,
522            "persona.lease.acquired" => {
523                if let Ok(lease) = serde_json::from_value::<PersonaLease>(event.payload.clone()) {
524                    active_lease = Some(lease);
525                    state = PersonaLifecycleState::Running;
526                }
527            }
528            "persona.lease.released" => {
529                active_lease = None;
530                if !matches!(
531                    state,
532                    PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
533                ) {
534                    state = PersonaLifecycleState::Idle;
535                }
536            }
537            "persona.lease.expired" => {
538                active_lease = None;
539                if !matches!(
540                    state,
541                    PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
542                ) {
543                    state = PersonaLifecycleState::Idle;
544                }
545            }
546            "persona.run.started" => state = PersonaLifecycleState::Running,
547            "persona.run.completed" => {
548                last_run_ms = event
549                    .payload
550                    .get("completed_at_ms")
551                    .and_then(serde_json::Value::as_i64)
552                    .or(Some(event.occurred_at_ms));
553                if let Some(work_key) = event
554                    .payload
555                    .get("work_key")
556                    .and_then(serde_json::Value::as_str)
557                {
558                    completed.insert(work_key.to_string());
559                }
560                if !matches!(
561                    state,
562                    PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
563                ) {
564                    state = PersonaLifecycleState::Idle;
565                }
566            }
567            "persona.run.failed" => {
568                state = PersonaLifecycleState::Failed;
569                last_error = event
570                    .payload
571                    .get("error")
572                    .and_then(serde_json::Value::as_str)
573                    .map(ToString::to_string);
574            }
575            "persona.trigger.queued" => {
576                if let Some(work_key) = event
577                    .payload
578                    .get("work_key")
579                    .and_then(serde_json::Value::as_str)
580                {
581                    queued.insert(work_key.to_string());
582                }
583                if let Some(item) = queued_work_from_event(&event)? {
584                    queued_work.insert(item.work_key.clone(), item);
585                }
586            }
587            "persona.trigger.dead_lettered" => disabled_events += 1,
588            "persona.budget.recorded" => {
589                budget_receipt = event
590                    .payload
591                    .get("receipt_id")
592                    .and_then(serde_json::Value::as_str)
593                    .map(ToString::to_string);
594                spent.push((
595                    event.occurred_at_ms,
596                    event
597                        .payload
598                        .get("cost_usd")
599                        .and_then(serde_json::Value::as_f64)
600                        .unwrap_or_default(),
601                    event
602                        .payload
603                        .get("tokens")
604                        .and_then(serde_json::Value::as_u64)
605                        .unwrap_or_default(),
606                ));
607            }
608            "persona.budget.exhausted" => {
609                budget_exhaustion_reason = event
610                    .payload
611                    .get("reason")
612                    .and_then(serde_json::Value::as_str)
613                    .map(ToString::to_string);
614                last_error = budget_exhaustion_reason
615                    .as_ref()
616                    .map(|reason| format!("persona budget exhausted: {reason}"));
617                budget_receipt = event
618                    .payload
619                    .get("receipt_id")
620                    .and_then(serde_json::Value::as_str)
621                    .map(ToString::to_string);
622            }
623            kind if kind.starts_with("persona.value.") => {
624                if let Some(receipt) = value_receipt_from_event(&event)? {
625                    value_receipts.push(receipt);
626                }
627            }
628            _ => {}
629        }
630    }
631
632    if let Some(lease) = active_lease.as_ref() {
633        if lease.expires_at_ms <= now_ms {
634            active_lease = None;
635            if !matches!(
636                state,
637                PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
638            ) {
639                state = PersonaLifecycleState::Idle;
640            }
641        }
642    }
643
644    queued.retain(|work_key| !completed.contains(work_key));
645    queued_work.retain(|work_key, _| !completed.contains(work_key));
646    let queued_work = queued_work.into_values().collect::<Vec<_>>();
647    let handoff_inbox = queued_work
648        .iter()
649        .filter_map(handoff_inbox_item)
650        .collect::<Vec<_>>();
651
652    let mut budget = budget_status(&binding.budget, &spent, now_ms);
653    if budget.reason.is_none() {
654        if let Some(reason) = budget_exhaustion_reason {
655            budget.exhausted = true;
656            budget.reason = Some(reason);
657        }
658    }
659    if budget.last_receipt_id.is_none() {
660        budget.last_receipt_id = budget_receipt;
661    }
662
663    let current_assignment = active_lease.as_ref().map(assignment_status_from_lease);
664
665    Ok(PersonaStatus {
666        name: binding.name.clone(),
667        template_ref: binding.template_ref.clone(),
668        state,
669        entry_workflow: binding.entry_workflow.clone(),
670        role: binding.name.clone(),
671        current_assignment,
672        last_run: last_run_ms.map(format_ms),
673        next_scheduled_run: next_scheduled_run(binding, last_run_ms, now_ms),
674        active_lease,
675        budget,
676        last_error,
677        queued_events: queued.len(),
678        queued_work,
679        handoff_inbox,
680        value_receipts,
681        disabled_events,
682        paused_event_policy: "queue_then_drain_on_resume".to_string(),
683    })
684}
685
686pub async fn pause_persona(
687    log: &Arc<AnyEventLog>,
688    binding: &PersonaRuntimeBinding,
689    now_ms: i64,
690) -> Result<PersonaStatus, String> {
691    append_persona_event(
692        log,
693        &binding.name,
694        "persona.control.paused",
695        json!({"paused_at_ms": now_ms, "policy": "queue_then_drain_on_resume"}),
696        now_ms,
697    )
698    .await?;
699    persona_status(log, binding, now_ms).await
700}
701
702pub async fn resume_persona(
703    log: &Arc<AnyEventLog>,
704    binding: &PersonaRuntimeBinding,
705    now_ms: i64,
706) -> Result<PersonaStatus, String> {
707    append_persona_event(
708        log,
709        &binding.name,
710        "persona.control.resumed",
711        json!({"resumed_at_ms": now_ms, "drain": true}),
712        now_ms,
713    )
714    .await?;
715    let queued = queued_events(log, &binding.name).await?;
716    for (envelope, cost) in queued {
717        let _ = run_for_envelope(log, binding, envelope, cost, now_ms).await?;
718    }
719    persona_status(log, binding, now_ms).await
720}
721
722pub async fn disable_persona(
723    log: &Arc<AnyEventLog>,
724    binding: &PersonaRuntimeBinding,
725    now_ms: i64,
726) -> Result<PersonaStatus, String> {
727    append_persona_event(
728        log,
729        &binding.name,
730        "persona.control.disabled",
731        json!({"disabled_at_ms": now_ms}),
732        now_ms,
733    )
734    .await?;
735    persona_status(log, binding, now_ms).await
736}
737
738pub async fn fire_schedule(
739    log: &Arc<AnyEventLog>,
740    binding: &PersonaRuntimeBinding,
741    cost: PersonaRunCost,
742    now_ms: i64,
743) -> Result<PersonaRunReceipt, String> {
744    let schedule = binding
745        .schedules
746        .first()
747        .cloned()
748        .unwrap_or_else(|| "manual".to_string());
749    let envelope = PersonaTriggerEnvelope {
750        provider: "schedule".to_string(),
751        kind: "cron.tick".to_string(),
752        subject_key: format!("schedule:{}:{schedule}:{}", binding.name, format_ms(now_ms)),
753        source_event_id: None,
754        received_at_ms: now_ms,
755        metadata: BTreeMap::from([
756            ("persona".to_string(), binding.name.clone()),
757            ("schedule".to_string(), schedule),
758            ("fired_at".to_string(), format_ms(now_ms)),
759        ]),
760        raw: json!({}),
761    };
762    append_persona_event(
763        log,
764        &binding.name,
765        "persona.schedule.fired",
766        json!({"persona": binding.name, "envelope": envelope}),
767        now_ms,
768    )
769    .await?;
770    run_for_envelope(log, binding, envelope, cost, now_ms).await
771}
772
773pub async fn fire_trigger(
774    log: &Arc<AnyEventLog>,
775    binding: &PersonaRuntimeBinding,
776    provider: &str,
777    kind: &str,
778    metadata: BTreeMap<String, String>,
779    cost: PersonaRunCost,
780    now_ms: i64,
781) -> Result<PersonaRunReceipt, String> {
782    let envelope = normalize_trigger_envelope(provider, kind, metadata, now_ms);
783    append_persona_event(
784        log,
785        &binding.name,
786        "persona.trigger.received",
787        json!({"persona": binding.name, "envelope": envelope}),
788        now_ms,
789    )
790    .await?;
791    run_for_envelope(log, binding, envelope, cost, now_ms).await
792}
793
794pub async fn record_persona_spend(
795    log: &Arc<AnyEventLog>,
796    binding: &PersonaRuntimeBinding,
797    cost: PersonaRunCost,
798    now_ms: i64,
799) -> Result<PersonaBudgetStatus, String> {
800    enforce_budget(log, binding, &cost, now_ms).await?;
801    append_budget_record(log, &binding.name, &cost, None, now_ms).await?;
802    persona_status(log, binding, now_ms)
803        .await
804        .map(|status| status.budget)
805}
806
807/// Report a `repair_worker_status` lifecycle transition for a sandboxed PR
808/// repair run.
809///
810/// Append-only and idempotent on `(repair_worker_id, lifecycle)`: replaying
811/// the same lifecycle is a no-op (`Ok(false)` indicates the event was already
812/// recorded). Hosted runtimes call this when a repair-worker job created by
813/// the persona transitions states.
814pub async fn report_repair_worker_status(
815    log: &Arc<AnyEventLog>,
816    binding: &PersonaRuntimeBinding,
817    status: PersonaRepairWorkerStatusUpdate,
818    now_ms: i64,
819) -> Result<bool, String> {
820    let mut status = status;
821    if status.persona_id.is_empty() {
822        status.persona_id = binding.name.clone();
823    }
824    if status.template_ref.is_none() {
825        status.template_ref = binding.template_ref.clone();
826    }
827    if status.occurred_at_ms == 0 {
828        status.occurred_at_ms = now_ms;
829    }
830    if status.last_heartbeat_ms == 0 {
831        status.last_heartbeat_ms = now_ms;
832    }
833
834    if repair_worker_status_recorded(log, &binding.name, &status).await? {
835        return Ok(false);
836    }
837    append_persona_event(
838        log,
839        &binding.name,
840        "persona.repair_worker.status",
841        serde_json::to_value(&status).map_err(|error| error.to_string())?,
842        status.occurred_at_ms,
843    )
844    .await?;
845    record_persona_supervision_event(
846        log,
847        &binding.name,
848        PersonaSupervisionEvent::RepairWorkerStatus(status),
849    )
850    .await?;
851    Ok(true)
852}
853
854/// Acknowledge a checkpoint-restore request initiated by the supervision API.
855///
856/// Idempotent on `checkpoint_id`: a repeated restore request resolves to the
857/// same ack (returns `Ok(false)`). The runtime emits a typed
858/// `Checkpoint(action: RestoreAcked)` supervision event carrying the
859/// coordinates the runtime resumed from.
860pub async fn restore_persona_checkpoint(
861    log: &Arc<AnyEventLog>,
862    binding: &PersonaRuntimeBinding,
863    request: PersonaCheckpointRestoreRequest,
864    now_ms: i64,
865) -> Result<PersonaCheckpointRestoreOutcome, String> {
866    let PersonaCheckpointRestoreRequest {
867        checkpoint_id,
868        work_key,
869        resumed_from,
870    } = request;
871    let status = persona_status(log, binding, now_ms).await?;
872
873    if let Some(prior) = find_checkpoint_restore_ack(log, &binding.name, &checkpoint_id).await? {
874        return Ok(PersonaCheckpointRestoreOutcome {
875            acked: false,
876            update: prior,
877        });
878    }
879
880    let resume_coordinates = resumed_from.unwrap_or_else(|| PersonaCheckpointResume {
881        run_id: None,
882        lease_id: status.active_lease.as_ref().map(|lease| lease.id.clone()),
883        last_run_ms: status
884            .last_run
885            .as_deref()
886            .and_then(|value| parse_rfc3339_ms(value).ok()),
887        queued_work_keys: status
888            .queued_work
889            .iter()
890            .map(|item| item.work_key.clone())
891            .collect(),
892        note: None,
893    });
894
895    let update = PersonaCheckpointUpdate {
896        persona_id: binding.name.clone(),
897        template_ref: binding.template_ref.clone(),
898        action: PersonaCheckpointAction::RestoreAcked,
899        checkpoint_id: checkpoint_id.clone(),
900        work_key,
901        resumed_from: Some(resume_coordinates),
902        occurred_at_ms: now_ms,
903    };
904
905    append_persona_event(
906        log,
907        &binding.name,
908        "persona.checkpoint.restore_acked",
909        serde_json::to_value(&update).map_err(|error| error.to_string())?,
910        now_ms,
911    )
912    .await?;
913    record_persona_supervision_event(
914        log,
915        &binding.name,
916        PersonaSupervisionEvent::Checkpoint(update.clone()),
917    )
918    .await?;
919    Ok(PersonaCheckpointRestoreOutcome {
920        acked: true,
921        update,
922    })
923}
924
/// Input to `restore_persona_checkpoint`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointRestoreRequest {
    /// Identifier the restore is deduplicated on.
    pub checkpoint_id: String,
    /// Copied onto the resulting [`PersonaCheckpointUpdate`].
    #[serde(default)]
    pub work_key: Option<String>,
    /// Resume coordinates to report; when `None`, `restore_persona_checkpoint`
    /// derives them from the current persona status.
    #[serde(default)]
    pub resumed_from: Option<PersonaCheckpointResume>,
}
933
934#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
935pub struct PersonaCheckpointRestoreOutcome {
936    pub acked: bool,
937    pub update: PersonaCheckpointUpdate,
938}
939
940async fn repair_worker_status_recorded(
941    log: &Arc<AnyEventLog>,
942    persona: &str,
943    update: &PersonaRepairWorkerStatusUpdate,
944) -> Result<bool, String> {
945    let events = read_persona_events(log, persona).await?;
946    Ok(events.into_iter().any(|(_, event)| {
947        event.kind == "persona.repair_worker.status"
948            && event
949                .payload
950                .get("repair_worker_id")
951                .and_then(serde_json::Value::as_str)
952                == Some(update.repair_worker_id.as_str())
953            && event
954                .payload
955                .get("lifecycle")
956                .and_then(serde_json::Value::as_str)
957                == Some(update.lifecycle.as_str())
958    }))
959}
960
961async fn find_checkpoint_restore_ack(
962    log: &Arc<AnyEventLog>,
963    persona: &str,
964    checkpoint_id: &str,
965) -> Result<Option<PersonaCheckpointUpdate>, String> {
966    let events = read_persona_events(log, persona).await?;
967    for (_, event) in events.into_iter().rev() {
968        if event.kind != "persona.checkpoint.restore_acked" {
969            continue;
970        }
971        if event
972            .payload
973            .get("checkpoint_id")
974            .and_then(serde_json::Value::as_str)
975            != Some(checkpoint_id)
976        {
977            continue;
978        }
979        let update: PersonaCheckpointUpdate =
980            serde_json::from_value(event.payload).map_err(|error| error.to_string())?;
981        return Ok(Some(update));
982    }
983    Ok(None)
984}
985
986async fn run_for_envelope(
987    log: &Arc<AnyEventLog>,
988    binding: &PersonaRuntimeBinding,
989    envelope: PersonaTriggerEnvelope,
990    cost: PersonaRunCost,
991    now_ms: i64,
992) -> Result<PersonaRunReceipt, String> {
993    let pre_queue = queue_snapshot(log, binding, now_ms).await?;
994    let receipt = run_for_envelope_inner(log, binding, envelope, cost, now_ms).await?;
995    let post_queue = queue_snapshot(log, binding, now_ms).await?;
996    emit_queue_position_supervision(log, binding, &pre_queue, &post_queue, now_ms).await?;
997    emit_receipt_supervision(log, binding, &receipt, now_ms).await?;
998    Ok(receipt)
999}
1000
1001async fn run_for_envelope_inner(
1002    log: &Arc<AnyEventLog>,
1003    binding: &PersonaRuntimeBinding,
1004    envelope: PersonaTriggerEnvelope,
1005    cost: PersonaRunCost,
1006    now_ms: i64,
1007) -> Result<PersonaRunReceipt, String> {
1008    let status = persona_status(log, binding, now_ms).await?;
1009    match status.state {
1010        PersonaLifecycleState::Paused => {
1011            append_persona_event(
1012                log,
1013                &binding.name,
1014                "persona.trigger.queued",
1015                json!({
1016                    "work_key": envelope.subject_key,
1017                    "envelope": envelope,
1018                    "cost": cost,
1019                    "reason": "paused",
1020                }),
1021                now_ms,
1022            )
1023            .await?;
1024            return Ok(PersonaRunReceipt {
1025                status: "queued".to_string(),
1026                persona: binding.name.clone(),
1027                run_id: None,
1028                work_key: envelope.subject_key,
1029                lease: None,
1030                queued: true,
1031                error: None,
1032                budget_receipt_id: None,
1033            });
1034        }
1035        PersonaLifecycleState::Disabled => {
1036            append_persona_event(
1037                log,
1038                &binding.name,
1039                "persona.trigger.dead_lettered",
1040                json!({
1041                    "work_key": envelope.subject_key,
1042                    "envelope": envelope,
1043                    "reason": "disabled",
1044                }),
1045                now_ms,
1046            )
1047            .await?;
1048            return Ok(PersonaRunReceipt {
1049                status: "dead_lettered".to_string(),
1050                persona: binding.name.clone(),
1051                run_id: None,
1052                work_key: envelope.subject_key,
1053                lease: None,
1054                queued: false,
1055                error: Some("persona is disabled".to_string()),
1056                budget_receipt_id: None,
1057            });
1058        }
1059        _ => {}
1060    }
1061
1062    if let Err(error) = enforce_budget(log, binding, &cost, now_ms).await {
1063        return Ok(PersonaRunReceipt {
1064            status: "budget_exhausted".to_string(),
1065            persona: binding.name.clone(),
1066            run_id: None,
1067            work_key: envelope.subject_key,
1068            lease: None,
1069            queued: false,
1070            error: Some(error),
1071            budget_receipt_id: None,
1072        });
1073    }
1074
1075    if work_completed(log, &binding.name, &envelope.subject_key).await? {
1076        append_persona_event(
1077            log,
1078            &binding.name,
1079            "persona.trigger.duplicate",
1080            json!({
1081                "work_key": envelope.subject_key,
1082                "envelope": envelope,
1083                "reason": "already_completed",
1084            }),
1085            now_ms,
1086        )
1087        .await?;
1088        return Ok(PersonaRunReceipt {
1089            status: "duplicate".to_string(),
1090            persona: binding.name.clone(),
1091            run_id: None,
1092            work_key: envelope.subject_key,
1093            lease: None,
1094            queued: false,
1095            error: None,
1096            budget_receipt_id: None,
1097        });
1098    }
1099
1100    let Some(lease) = acquire_lease(
1101        log,
1102        binding,
1103        &envelope.subject_key,
1104        "persona-runtime",
1105        DEFAULT_LEASE_TTL_MS,
1106        now_ms,
1107    )
1108    .await?
1109    else {
1110        return Ok(PersonaRunReceipt {
1111            status: "lease_busy".to_string(),
1112            persona: binding.name.clone(),
1113            run_id: None,
1114            work_key: envelope.subject_key,
1115            lease: status.active_lease,
1116            queued: false,
1117            error: Some("active lease already owns persona work".to_string()),
1118            budget_receipt_id: None,
1119        });
1120    };
1121
1122    let run_id = Uuid::now_v7();
1123    let value_metadata = run_value_metadata(&envelope, &lease, &cost);
1124    append_persona_event(
1125        log,
1126        &binding.name,
1127        "persona.run.started",
1128        json!({
1129            "work_key": envelope.subject_key,
1130            "run_id": run_id,
1131            "started_at_ms": now_ms,
1132            "entry_workflow": binding.entry_workflow,
1133            "lease_id": lease.id,
1134        }),
1135        now_ms,
1136    )
1137    .await?;
1138    emit_persona_value_event(
1139        log,
1140        binding,
1141        run_id,
1142        PersonaValueEventDelta {
1143            kind: PersonaValueEventKind::RunStarted,
1144            metadata: value_metadata.clone(),
1145            ..Default::default()
1146        },
1147        now_ms,
1148    )
1149    .await?;
1150    let budget_receipt_id =
1151        append_budget_record(log, &binding.name, &cost, Some(&lease.id), now_ms).await?;
1152    if cost.avoided_cost_usd > 0.0 || cost.deterministic_steps > 0 {
1153        emit_persona_value_event(
1154            log,
1155            binding,
1156            run_id,
1157            PersonaValueEventDelta {
1158                kind: PersonaValueEventKind::DeterministicExecution,
1159                avoided_cost_usd: cost.avoided_cost_usd,
1160                deterministic_steps: cost.deterministic_steps.max(1),
1161                metadata: value_metadata.clone(),
1162                ..Default::default()
1163            },
1164            now_ms,
1165        )
1166        .await?;
1167    }
1168    if cost.frontier_escalations > 0 {
1169        emit_persona_value_event(
1170            log,
1171            binding,
1172            run_id,
1173            PersonaValueEventDelta {
1174                kind: PersonaValueEventKind::FrontierEscalation,
1175                paid_cost_usd: cost.cost_usd,
1176                llm_steps: cost.llm_steps.max(cost.frontier_escalations),
1177                metadata: value_metadata.clone(),
1178                ..Default::default()
1179            },
1180            now_ms,
1181        )
1182        .await?;
1183    }
1184    let completion_paid_cost = if cost.frontier_escalations > 0 {
1185        0.0
1186    } else {
1187        cost.cost_usd
1188    };
1189    let completion_llm_steps = if cost.frontier_escalations > 0 {
1190        0
1191    } else {
1192        cost.llm_steps
1193    };
1194    emit_persona_value_event(
1195        log,
1196        binding,
1197        run_id,
1198        PersonaValueEventDelta {
1199            kind: PersonaValueEventKind::RunCompleted,
1200            paid_cost_usd: completion_paid_cost,
1201            llm_steps: completion_llm_steps,
1202            metadata: value_metadata,
1203            ..Default::default()
1204        },
1205        now_ms,
1206    )
1207    .await?;
1208    append_persona_event(
1209        log,
1210        &binding.name,
1211        "persona.run.completed",
1212        json!({
1213            "work_key": envelope.subject_key,
1214            "run_id": run_id,
1215            "completed_at_ms": now_ms,
1216            "entry_workflow": binding.entry_workflow,
1217            "lease_id": lease.id,
1218        }),
1219        now_ms,
1220    )
1221    .await?;
1222    append_persona_event(
1223        log,
1224        &binding.name,
1225        "persona.lease.released",
1226        json!({
1227            "id": lease.id,
1228            "work_key": envelope.subject_key,
1229            "released_at_ms": now_ms,
1230        }),
1231        now_ms,
1232    )
1233    .await?;
1234    Ok(PersonaRunReceipt {
1235        status: "completed".to_string(),
1236        persona: binding.name.clone(),
1237        run_id: Some(run_id),
1238        work_key: envelope.subject_key,
1239        lease: Some(lease),
1240        queued: false,
1241        error: None,
1242        budget_receipt_id: Some(budget_receipt_id),
1243    })
1244}
1245
1246async fn acquire_lease(
1247    log: &Arc<AnyEventLog>,
1248    binding: &PersonaRuntimeBinding,
1249    work_key: &str,
1250    holder: &str,
1251    ttl_ms: i64,
1252    now_ms: i64,
1253) -> Result<Option<PersonaLease>, String> {
1254    let status = persona_status(log, binding, now_ms).await?;
1255    if let Some(lease) = status.active_lease {
1256        if lease.expires_at_ms > now_ms {
1257            append_persona_event(
1258                log,
1259                &binding.name,
1260                "persona.lease.conflict",
1261                json!({
1262                    "active_lease": lease,
1263                    "requested_work_key": work_key,
1264                    "at_ms": now_ms,
1265                }),
1266                now_ms,
1267            )
1268            .await?;
1269            return Ok(None);
1270        }
1271        append_persona_event(
1272            log,
1273            &binding.name,
1274            "persona.lease.expired",
1275            json!({
1276                "id": lease.id,
1277                "work_key": lease.work_key,
1278                "expired_at_ms": now_ms,
1279            }),
1280            now_ms,
1281        )
1282        .await?;
1283    }
1284
1285    let lease = PersonaLease {
1286        id: format!("persona_lease_{}", Uuid::now_v7()),
1287        holder: holder.to_string(),
1288        work_key: work_key.to_string(),
1289        acquired_at_ms: now_ms,
1290        expires_at_ms: now_ms + ttl_ms,
1291    };
1292    append_persona_event(
1293        log,
1294        &binding.name,
1295        "persona.lease.acquired",
1296        serde_json::to_value(&lease).map_err(|error| error.to_string())?,
1297        now_ms,
1298    )
1299    .await?;
1300    Ok(Some(lease))
1301}
1302
1303async fn enforce_budget(
1304    log: &Arc<AnyEventLog>,
1305    binding: &PersonaRuntimeBinding,
1306    cost: &PersonaRunCost,
1307    now_ms: i64,
1308) -> Result<(), String> {
1309    let status = persona_status(log, binding, now_ms).await?;
1310    let reason = if binding
1311        .budget
1312        .run_usd
1313        .is_some_and(|limit| cost.cost_usd > limit)
1314    {
1315        Some("run_usd")
1316    } else if binding
1317        .budget
1318        .daily_usd
1319        .is_some_and(|limit| status.budget.spent_today_usd + cost.cost_usd > limit)
1320    {
1321        Some("daily_usd")
1322    } else if binding
1323        .budget
1324        .hourly_usd
1325        .is_some_and(|limit| status.budget.spent_this_hour_usd + cost.cost_usd > limit)
1326    {
1327        Some("hourly_usd")
1328    } else if binding
1329        .budget
1330        .max_tokens
1331        .is_some_and(|limit| status.budget.tokens_today + cost.tokens > limit)
1332    {
1333        Some("max_tokens")
1334    } else {
1335        None
1336    };
1337
1338    if let Some(reason) = reason {
1339        let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1340        append_persona_event(
1341            log,
1342            &binding.name,
1343            "persona.budget.exhausted",
1344            json!({
1345                "receipt_id": receipt_id,
1346                "reason": reason,
1347                "attempted_cost_usd": cost.cost_usd,
1348                "attempted_tokens": cost.tokens,
1349                "persona": binding.name,
1350            }),
1351            now_ms,
1352        )
1353        .await?;
1354        return Err(format!("persona budget exhausted: {reason}"));
1355    }
1356
1357    Ok(())
1358}
1359
1360async fn append_budget_record(
1361    log: &Arc<AnyEventLog>,
1362    persona: &str,
1363    cost: &PersonaRunCost,
1364    lease_id: Option<&str>,
1365    now_ms: i64,
1366) -> Result<String, String> {
1367    let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1368    append_persona_event(
1369        log,
1370        persona,
1371        "persona.budget.recorded",
1372        json!({
1373            "receipt_id": receipt_id,
1374            "persona": persona,
1375            "cost_usd": cost.cost_usd,
1376            "tokens": cost.tokens,
1377            "lease_id": lease_id,
1378        }),
1379        now_ms,
1380    )
1381    .await?;
1382    Ok(receipt_id)
1383}
1384
1385fn normalize_trigger_envelope(
1386    provider: &str,
1387    kind: &str,
1388    metadata: BTreeMap<String, String>,
1389    now_ms: i64,
1390) -> PersonaTriggerEnvelope {
1391    let provider = provider.to_ascii_lowercase();
1392    let kind = kind.to_string();
1393    let source_event_id = metadata
1394        .get("event_id")
1395        .or_else(|| metadata.get("id"))
1396        .cloned();
1397    let subject_key = match provider.as_str() {
1398        "github" => {
1399            let repo = metadata
1400                .get("repository")
1401                .or_else(|| metadata.get("repository.full_name"))
1402                .cloned()
1403                .unwrap_or_else(|| "unknown".to_string());
1404            if let Some(number) = metadata
1405                .get("pr")
1406                .or_else(|| metadata.get("pull_request.number"))
1407                .or_else(|| metadata.get("number"))
1408            {
1409                format!("github:{repo}:pr:{number}")
1410            } else if let Some(check) = metadata
1411                .get("check_run.name")
1412                .or_else(|| metadata.get("check_name"))
1413            {
1414                format!("github:{repo}:check:{check}")
1415            } else {
1416                format!("github:{repo}:{kind}")
1417            }
1418        }
1419        "linear" => {
1420            let issue = metadata
1421                .get("issue_key")
1422                .or_else(|| metadata.get("issue.identifier"))
1423                .or_else(|| metadata.get("issue_id"))
1424                .or_else(|| metadata.get("id"))
1425                .cloned()
1426                .unwrap_or_else(|| "unknown".to_string());
1427            format!("linear:issue:{issue}")
1428        }
1429        "slack" => {
1430            let channel = metadata
1431                .get("channel")
1432                .or_else(|| metadata.get("channel_id"))
1433                .cloned()
1434                .unwrap_or_else(|| "unknown".to_string());
1435            let ts = metadata
1436                .get("ts")
1437                .or_else(|| metadata.get("event_ts"))
1438                .cloned()
1439                .unwrap_or_else(|| "unknown".to_string());
1440            format!("slack:{channel}:{ts}")
1441        }
1442        "webhook" => metadata
1443            .get("dedupe_key")
1444            .or_else(|| metadata.get("event_id"))
1445            .map(|value| format!("webhook:{value}"))
1446            .unwrap_or_else(|| format!("webhook:{kind}:{}", Uuid::now_v7())),
1447        _ => metadata
1448            .get("dedupe_key")
1449            .or_else(|| metadata.get("event_id"))
1450            .map(|value| format!("{provider}:{kind}:{value}"))
1451            .unwrap_or_else(|| format!("{provider}:{kind}:{}", Uuid::now_v7())),
1452    };
1453
1454    PersonaTriggerEnvelope {
1455        provider,
1456        kind,
1457        subject_key,
1458        source_event_id,
1459        received_at_ms: now_ms,
1460        raw: json!({"metadata": metadata}),
1461        metadata,
1462    }
1463}
1464
1465async fn queued_events(
1466    log: &Arc<AnyEventLog>,
1467    persona: &str,
1468) -> Result<Vec<(PersonaTriggerEnvelope, PersonaRunCost)>, String> {
1469    let events = read_persona_events(log, persona).await?;
1470    let mut queued = BTreeMap::<String, (PersonaTriggerEnvelope, PersonaRunCost)>::new();
1471    let mut completed = BTreeSet::<String>::new();
1472    for (_, event) in events {
1473        match event.kind.as_str() {
1474            "persona.trigger.queued" => {
1475                let Some(envelope) = event.payload.get("envelope") else {
1476                    continue;
1477                };
1478                let envelope: PersonaTriggerEnvelope =
1479                    serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1480                let cost = event
1481                    .payload
1482                    .get("cost")
1483                    .cloned()
1484                    .map(serde_json::from_value::<PersonaRunCost>)
1485                    .transpose()
1486                    .map_err(|error| error.to_string())?
1487                    .unwrap_or_default();
1488                queued.insert(envelope.subject_key.clone(), (envelope, cost));
1489            }
1490            "persona.run.completed" => {
1491                if let Some(work_key) = event
1492                    .payload
1493                    .get("work_key")
1494                    .and_then(serde_json::Value::as_str)
1495                {
1496                    completed.insert(work_key.to_string());
1497                }
1498            }
1499            _ => {}
1500        }
1501    }
1502    queued.retain(|work_key, _| !completed.contains(work_key));
1503    Ok(queued.into_values().collect())
1504}
1505
1506fn assignment_status_from_lease(lease: &PersonaLease) -> PersonaAssignmentStatus {
1507    PersonaAssignmentStatus {
1508        work_key: lease.work_key.clone(),
1509        lease_id: lease.id.clone(),
1510        holder: lease.holder.clone(),
1511        acquired_at: format_ms(lease.acquired_at_ms),
1512        expires_at: format_ms(lease.expires_at_ms),
1513    }
1514}
1515
1516fn queued_work_from_event(event: &LogEvent) -> Result<Option<PersonaQueuedWork>, String> {
1517    let Some(envelope) = event.payload.get("envelope") else {
1518        return Ok(None);
1519    };
1520    let envelope: PersonaTriggerEnvelope =
1521        serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1522    Ok(Some(PersonaQueuedWork {
1523        work_key: envelope.subject_key,
1524        provider: envelope.provider,
1525        kind: envelope.kind,
1526        queued_at: format_ms(event.occurred_at_ms),
1527        reason: event
1528            .payload
1529            .get("reason")
1530            .and_then(serde_json::Value::as_str)
1531            .unwrap_or("queued")
1532            .to_string(),
1533        source_event_id: envelope.source_event_id,
1534        metadata: envelope.metadata,
1535    }))
1536}
1537
1538fn handoff_inbox_item(work: &PersonaQueuedWork) -> Option<PersonaHandoffInboxItem> {
1539    if work.provider != "handoff" && !work.metadata.contains_key("handoff_id") {
1540        return None;
1541    }
1542    Some(PersonaHandoffInboxItem {
1543        work_key: work.work_key.clone(),
1544        handoff_id: work.metadata.get("handoff_id").cloned(),
1545        handoff_kind: work
1546            .metadata
1547            .get("handoff_kind")
1548            .or_else(|| work.metadata.get("kind"))
1549            .cloned(),
1550        source_persona: work.metadata.get("source_persona").cloned(),
1551        task: work.metadata.get("task").cloned(),
1552        queued_at: work.queued_at.clone(),
1553        reason: work.reason.clone(),
1554    })
1555}
1556
1557fn value_receipt_from_event(event: &LogEvent) -> Result<Option<PersonaValueReceipt>, String> {
1558    let Ok(value_event) = serde_json::from_value::<PersonaValueEvent>(event.payload.clone()) else {
1559        return Ok(None);
1560    };
1561    Ok(Some(PersonaValueReceipt {
1562        kind: value_event.kind,
1563        run_id: value_event.run_id,
1564        occurred_at: value_event
1565            .occurred_at
1566            .format(&Rfc3339)
1567            .map_err(|error| error.to_string())?,
1568        paid_cost_usd: value_event.paid_cost_usd,
1569        avoided_cost_usd: value_event.avoided_cost_usd,
1570        deterministic_steps: value_event.deterministic_steps,
1571        llm_steps: value_event.llm_steps,
1572        metadata: value_event.metadata,
1573    }))
1574}
1575
1576async fn work_completed(
1577    log: &Arc<AnyEventLog>,
1578    persona: &str,
1579    work_key: &str,
1580) -> Result<bool, String> {
1581    let events = read_persona_events(log, persona).await?;
1582    Ok(events.into_iter().any(|(_, event)| {
1583        event.kind == "persona.run.completed"
1584            && event
1585                .payload
1586                .get("work_key")
1587                .and_then(serde_json::Value::as_str)
1588                == Some(work_key)
1589    }))
1590}
1591
1592async fn read_persona_events(
1593    log: &Arc<AnyEventLog>,
1594    persona: &str,
1595) -> Result<Vec<(u64, LogEvent)>, String> {
1596    let topic = runtime_topic()?;
1597    Ok(log
1598        .read_range(&topic, None, usize::MAX)
1599        .await
1600        .map_err(|error| error.to_string())?
1601        .into_iter()
1602        .filter(|(_, event)| {
1603            event
1604                .headers
1605                .get("persona")
1606                .is_some_and(|name| name == persona)
1607        })
1608        .collect())
1609}
1610
1611async fn append_persona_event(
1612    log: &Arc<AnyEventLog>,
1613    persona: &str,
1614    kind: &str,
1615    payload: serde_json::Value,
1616    now_ms: i64,
1617) -> Result<u64, String> {
1618    let mut headers = BTreeMap::new();
1619    headers.insert("persona".to_string(), persona.to_string());
1620    let event = LogEvent {
1621        kind: kind.to_string(),
1622        payload,
1623        headers,
1624        occurred_at_ms: now_ms,
1625    };
1626    log.append(&runtime_topic()?, event)
1627        .await
1628        .map_err(|error| error.to_string())
1629}
1630
1631struct PersonaValueEventDelta {
1632    kind: PersonaValueEventKind,
1633    paid_cost_usd: f64,
1634    avoided_cost_usd: f64,
1635    deterministic_steps: i64,
1636    llm_steps: i64,
1637    metadata: serde_json::Value,
1638}
1639
1640impl Default for PersonaValueEventDelta {
1641    fn default() -> Self {
1642        Self {
1643            kind: PersonaValueEventKind::RunCompleted,
1644            paid_cost_usd: 0.0,
1645            avoided_cost_usd: 0.0,
1646            deterministic_steps: 0,
1647            llm_steps: 0,
1648            metadata: serde_json::Value::Null,
1649        }
1650    }
1651}
1652
1653async fn emit_persona_value_event(
1654    log: &Arc<AnyEventLog>,
1655    binding: &PersonaRuntimeBinding,
1656    run_id: Uuid,
1657    delta: PersonaValueEventDelta,
1658    now_ms: i64,
1659) -> Result<(), String> {
1660    let event = PersonaValueEvent {
1661        persona_id: binding.name.clone(),
1662        template_ref: binding.template_ref.clone(),
1663        run_id: Some(run_id),
1664        kind: delta.kind,
1665        paid_cost_usd: delta.paid_cost_usd.max(0.0),
1666        avoided_cost_usd: delta.avoided_cost_usd.max(0.0),
1667        deterministic_steps: delta.deterministic_steps.max(0),
1668        llm_steps: delta.llm_steps.max(0),
1669        metadata: delta.metadata,
1670        occurred_at: offset_datetime_from_ms(now_ms),
1671    };
1672    append_persona_event(
1673        log,
1674        &binding.name,
1675        &format!("persona.value.{}", event.kind.as_str()),
1676        serde_json::to_value(&event).map_err(|error| error.to_string())?,
1677        now_ms,
1678    )
1679    .await?;
1680    emit_persona_value_sink_event(&event);
1681    Ok(())
1682}
1683
1684fn emit_persona_value_sink_event(event: &PersonaValueEvent) {
1685    for sink in persona_value_sinks().snapshot() {
1686        sink.handle_value_event(event);
1687    }
1688}
1689
1690fn emit_persona_supervision_sink_event(event: &PersonaSupervisionEvent) {
1691    for sink in persona_supervision_sinks().snapshot() {
1692        sink.handle_supervision_event(event);
1693    }
1694}
1695
1696async fn record_persona_supervision_event(
1697    log: &Arc<AnyEventLog>,
1698    persona: &str,
1699    event: PersonaSupervisionEvent,
1700) -> Result<(), String> {
1701    let update_kind = event.update_kind();
1702    let occurred_at_ms = event.occurred_at_ms();
1703    append_persona_event(
1704        log,
1705        persona,
1706        &format!("persona.supervision.{update_kind}"),
1707        serde_json::to_value(&event).map_err(|error| error.to_string())?,
1708        occurred_at_ms,
1709    )
1710    .await?;
1711    emit_persona_supervision_sink_event(&event);
1712    Ok(())
1713}
1714
/// One queued work item as captured in a queue snapshot.
#[derive(Clone, Debug)]
struct QueueEntry {
    /// Work key identifying the queued item.
    work_key: String,
    /// Time the item was queued, in milliseconds.
    queued_at_ms: i64,
}
1720
1721async fn queue_snapshot(
1722    log: &Arc<AnyEventLog>,
1723    binding: &PersonaRuntimeBinding,
1724    now_ms: i64,
1725) -> Result<Vec<QueueEntry>, String> {
1726    let status = persona_status(log, binding, now_ms).await?;
1727    Ok(status
1728        .queued_work
1729        .into_iter()
1730        .map(|item| QueueEntry {
1731            queued_at_ms: parse_rfc3339_ms(&item.queued_at).unwrap_or(now_ms),
1732            work_key: item.work_key,
1733        })
1734        .collect())
1735}
1736
1737async fn emit_queue_position_supervision(
1738    log: &Arc<AnyEventLog>,
1739    binding: &PersonaRuntimeBinding,
1740    before: &[QueueEntry],
1741    after: &[QueueEntry],
1742    now_ms: i64,
1743) -> Result<(), String> {
1744    use std::collections::HashSet;
1745    let before_keys: HashSet<&str> = before.iter().map(|e| e.work_key.as_str()).collect();
1746    let after_keys: HashSet<&str> = after.iter().map(|e| e.work_key.as_str()).collect();
1747    let after_depth = after.len() as i64;
1748
1749    for (index, entry) in after.iter().enumerate() {
1750        if !before_keys.contains(entry.work_key.as_str()) {
1751            record_persona_supervision_event(
1752                log,
1753                &binding.name,
1754                PersonaSupervisionEvent::QueuePosition(PersonaQueuePositionUpdate {
1755                    persona_id: binding.name.clone(),
1756                    template_ref: binding.template_ref.clone(),
1757                    work_key: entry.work_key.clone(),
1758                    queue_depth: after_depth,
1759                    position: (index + 1) as i64,
1760                    queued_at_ms: entry.queued_at_ms,
1761                    occurred_at_ms: now_ms,
1762                }),
1763            )
1764            .await?;
1765        }
1766    }
1767    for entry in before {
1768        if !after_keys.contains(entry.work_key.as_str()) {
1769            record_persona_supervision_event(
1770                log,
1771                &binding.name,
1772                PersonaSupervisionEvent::QueuePosition(PersonaQueuePositionUpdate {
1773                    persona_id: binding.name.clone(),
1774                    template_ref: binding.template_ref.clone(),
1775                    work_key: entry.work_key.clone(),
1776                    queue_depth: after_depth,
1777                    position: 0,
1778                    queued_at_ms: entry.queued_at_ms,
1779                    occurred_at_ms: now_ms,
1780                }),
1781            )
1782            .await?;
1783        }
1784    }
1785    Ok(())
1786}
1787
1788async fn emit_receipt_supervision(
1789    log: &Arc<AnyEventLog>,
1790    binding: &PersonaRuntimeBinding,
1791    receipt: &PersonaRunReceipt,
1792    now_ms: i64,
1793) -> Result<(), String> {
1794    record_persona_supervision_event(
1795        log,
1796        &binding.name,
1797        PersonaSupervisionEvent::Receipt(PersonaReceiptUpdate {
1798            persona_id: binding.name.clone(),
1799            template_ref: binding.template_ref.clone(),
1800            receipt: receipt.clone(),
1801            occurred_at_ms: now_ms,
1802        }),
1803    )
1804    .await
1805}
1806
1807fn run_value_metadata(
1808    envelope: &PersonaTriggerEnvelope,
1809    lease: &PersonaLease,
1810    cost: &PersonaRunCost,
1811) -> serde_json::Value {
1812    let mut metadata = serde_json::Map::new();
1813    metadata.insert("work_key".to_string(), json!(envelope.subject_key));
1814    metadata.insert("trigger_provider".to_string(), json!(envelope.provider));
1815    metadata.insert("trigger_kind".to_string(), json!(envelope.kind));
1816    metadata.insert("lease_id".to_string(), json!(lease.id));
1817    metadata.insert("tokens".to_string(), json!(cost.tokens));
1818    if cost.frontier_escalations > 0 {
1819        metadata.insert(
1820            "frontier_escalations".to_string(),
1821            json!(cost.frontier_escalations),
1822        );
1823    }
1824    match &cost.metadata {
1825        serde_json::Value::Null => {}
1826        serde_json::Value::Object(extra) => {
1827            metadata.extend(
1828                extra
1829                    .iter()
1830                    .map(|(key, value)| (key.clone(), value.clone())),
1831            );
1832        }
1833        extra => {
1834            metadata.insert("run_cost_metadata".to_string(), extra.clone());
1835        }
1836    }
1837    serde_json::Value::Object(metadata)
1838}
1839
1840fn budget_status(
1841    policy: &PersonaBudgetPolicy,
1842    spent: &[(i64, f64, u64)],
1843    now_ms: i64,
1844) -> PersonaBudgetStatus {
1845    let day_start = now_ms - (now_ms.rem_euclid(86_400_000));
1846    let hour_start = now_ms - (now_ms.rem_euclid(3_600_000));
1847    let mut spent_today_usd = 0.0;
1848    let mut spent_this_hour_usd = 0.0;
1849    let mut tokens_today = 0u64;
1850    let mut spent_last_run_usd = 0.0;
1851    for (at_ms, cost, tokens) in spent {
1852        spent_last_run_usd = *cost;
1853        if *at_ms >= day_start {
1854            spent_today_usd += cost;
1855            tokens_today += tokens;
1856        }
1857        if *at_ms >= hour_start {
1858            spent_this_hour_usd += cost;
1859        }
1860    }
1861
1862    let remaining_today_usd = policy
1863        .daily_usd
1864        .map(|limit| (limit - spent_today_usd).max(0.0));
1865    let remaining_hour_usd = policy
1866        .hourly_usd
1867        .map(|limit| (limit - spent_this_hour_usd).max(0.0));
1868    let reason = if policy
1869        .daily_usd
1870        .is_some_and(|limit| spent_today_usd >= limit && limit >= 0.0)
1871    {
1872        Some("daily_usd".to_string())
1873    } else if policy
1874        .hourly_usd
1875        .is_some_and(|limit| spent_this_hour_usd >= limit && limit >= 0.0)
1876    {
1877        Some("hourly_usd".to_string())
1878    } else if policy
1879        .max_tokens
1880        .is_some_and(|limit| tokens_today >= limit && limit > 0)
1881    {
1882        Some("max_tokens".to_string())
1883    } else {
1884        None
1885    };
1886
1887    PersonaBudgetStatus {
1888        daily_usd: policy.daily_usd,
1889        hourly_usd: policy.hourly_usd,
1890        run_usd: policy.run_usd,
1891        max_tokens: policy.max_tokens,
1892        spent_today_usd,
1893        spent_this_hour_usd,
1894        spent_last_run_usd,
1895        tokens_today,
1896        remaining_today_usd,
1897        remaining_hour_usd,
1898        exhausted: reason.is_some(),
1899        reason,
1900        last_receipt_id: None,
1901    }
1902}
1903
1904fn next_scheduled_run(
1905    binding: &PersonaRuntimeBinding,
1906    last_run_ms: Option<i64>,
1907    now_ms: i64,
1908) -> Option<String> {
1909    binding
1910        .schedules
1911        .iter()
1912        .filter_map(|schedule| next_cron_ms(schedule, last_run_ms.unwrap_or(now_ms)).ok())
1913        .min()
1914        .map(format_ms)
1915}
1916
1917fn next_cron_ms(schedule: &str, after_ms: i64) -> Result<i64, String> {
1918    let cron = schedule
1919        .parse::<Cron>()
1920        .map_err(|error| error.to_string())?;
1921    let after = Utc
1922        .timestamp_millis_opt(after_ms)
1923        .single()
1924        .ok_or_else(|| "invalid timestamp".to_string())?;
1925    let next = cron
1926        .find_next_occurrence(&after, false)
1927        .map_err(|error| error.to_string())?;
1928    Ok(next.timestamp_millis())
1929}
1930
1931pub fn now_ms() -> i64 {
1932    OffsetDateTime::now_utc().unix_timestamp_nanos() as i64 / 1_000_000
1933}
1934
1935fn offset_datetime_from_ms(ms: i64) -> OffsetDateTime {
1936    OffsetDateTime::from_unix_timestamp_nanos((ms as i128) * 1_000_000)
1937        .unwrap_or(OffsetDateTime::UNIX_EPOCH)
1938}
1939
1940pub fn format_ms(ms: i64) -> String {
1941    offset_datetime_from_ms(ms)
1942        .format(&Rfc3339)
1943        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
1944}
1945
1946pub fn parse_rfc3339_ms(value: &str) -> Result<i64, String> {
1947    let ts = OffsetDateTime::parse(value, &Rfc3339)
1948        .map_err(|error| format!("invalid RFC3339 timestamp '{value}': {error}"))?;
1949    Ok(ts.unix_timestamp_nanos() as i64 / 1_000_000)
1950}
1951
1952fn runtime_topic() -> Result<Topic, String> {
1953    Topic::new(PERSONA_RUNTIME_TOPIC).map_err(|error| error.to_string())
1954}
1955
1956#[cfg(test)]
1957mod tests {
1958    use super::*;
1959    use crate::event_log::{AnyEventLog, MemoryEventLog};
1960    use std::sync::Mutex;
1961
1962    struct CapturingValueSink {
1963        events: Arc<Mutex<Vec<PersonaValueEvent>>>,
1964    }
1965
1966    impl PersonaValueSink for CapturingValueSink {
1967        fn handle_value_event(&self, event: &PersonaValueEvent) {
1968            self.events.lock().unwrap().push(event.clone());
1969        }
1970    }
1971
1972    fn binding() -> PersonaRuntimeBinding {
1973        PersonaRuntimeBinding {
1974            name: "merge_captain".to_string(),
1975            template_ref: Some("software_factory@v0".to_string()),
1976            entry_workflow: "workflows/merge.harn#run".to_string(),
1977            schedules: vec!["*/30 * * * *".to_string()],
1978            triggers: vec!["github.pr_opened".to_string()],
1979            budget: PersonaBudgetPolicy {
1980                daily_usd: Some(0.02),
1981                hourly_usd: None,
1982                run_usd: Some(0.02),
1983                max_tokens: Some(100),
1984            },
1985        }
1986    }
1987
1988    fn log() -> Arc<AnyEventLog> {
1989        Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)))
1990    }
1991
1992    #[tokio::test]
1993    async fn schedule_tick_records_lifecycle_status_and_receipt() {
1994        let log = log();
1995        let binding = binding();
1996        let now = parse_rfc3339_ms("2026-04-24T12:30:00Z").unwrap();
1997        let receipt = fire_schedule(
1998            &log,
1999            &binding,
2000            PersonaRunCost {
2001                cost_usd: 0.01,
2002                tokens: 10,
2003                ..Default::default()
2004            },
2005            now,
2006        )
2007        .await
2008        .unwrap();
2009        assert_eq!(receipt.status, "completed");
2010        assert!(receipt.lease.is_some());
2011        let status = persona_status(&log, &binding, now).await.unwrap();
2012        assert_eq!(status.state, PersonaLifecycleState::Idle);
2013        assert_eq!(status.last_run.as_deref(), Some("2026-04-24T12:30:00Z"));
2014        assert!(status.next_scheduled_run.is_some());
2015        assert_eq!(status.budget.spent_today_usd, 0.01);
2016    }
2017
2018    #[tokio::test]
2019    async fn paused_personas_queue_and_resume_drains_once() {
2020        let log = log();
2021        let binding = binding();
2022        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2023        pause_persona(&log, &binding, now).await.unwrap();
2024        let receipt = fire_trigger(
2025            &log,
2026            &binding,
2027            "github",
2028            "pull_request",
2029            BTreeMap::from([
2030                ("repository".to_string(), "burin-labs/harn".to_string()),
2031                ("number".to_string(), "462".to_string()),
2032            ]),
2033            PersonaRunCost::default(),
2034            now,
2035        )
2036        .await
2037        .unwrap();
2038        assert_eq!(receipt.status, "queued");
2039        assert_eq!(
2040            persona_status(&log, &binding, now)
2041                .await
2042                .unwrap()
2043                .queued_events,
2044            1
2045        );
2046        let status = resume_persona(&log, &binding, now + 1000).await.unwrap();
2047        assert_eq!(status.state, PersonaLifecycleState::Idle);
2048        assert_eq!(status.queued_events, 0);
2049    }
2050
2051    #[tokio::test]
2052    async fn resumed_queued_work_reuses_original_budget_cost() {
2053        let log = log();
2054        let mut binding = binding();
2055        binding.budget.run_usd = Some(0.01);
2056        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2057        pause_persona(&log, &binding, now).await.unwrap();
2058        let queued = fire_trigger(
2059            &log,
2060            &binding,
2061            "github",
2062            "pull_request",
2063            BTreeMap::from([
2064                ("repository".to_string(), "burin-labs/harn".to_string()),
2065                ("number".to_string(), "1379".to_string()),
2066            ]),
2067            PersonaRunCost {
2068                cost_usd: 0.02,
2069                tokens: 1,
2070                ..Default::default()
2071            },
2072            now + 1,
2073        )
2074        .await
2075        .unwrap();
2076        assert_eq!(queued.status, "queued");
2077
2078        let status = resume_persona(&log, &binding, now + 2).await.unwrap();
2079
2080        assert_eq!(status.budget.reason.as_deref(), Some("run_usd"));
2081        assert!(status
2082            .last_error
2083            .as_deref()
2084            .is_some_and(|error| error.contains("run_usd")));
2085        assert_eq!(status.queued_events, 1);
2086    }
2087
2088    #[tokio::test]
2089    async fn duplicate_trigger_envelope_is_not_processed_twice() {
2090        let log = log();
2091        let binding = binding();
2092        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2093        let metadata = BTreeMap::from([
2094            ("repository".to_string(), "burin-labs/harn".to_string()),
2095            ("number".to_string(), "462".to_string()),
2096        ]);
2097        let first = fire_trigger(
2098            &log,
2099            &binding,
2100            "github",
2101            "pull_request",
2102            metadata.clone(),
2103            PersonaRunCost::default(),
2104            now,
2105        )
2106        .await
2107        .unwrap();
2108        let second = fire_trigger(
2109            &log,
2110            &binding,
2111            "github",
2112            "pull_request",
2113            metadata,
2114            PersonaRunCost::default(),
2115            now + 1000,
2116        )
2117        .await
2118        .unwrap();
2119        assert_eq!(first.status, "completed");
2120        assert_eq!(second.status, "duplicate");
2121        assert!(second.lease.is_none());
2122    }
2123
2124    #[tokio::test]
2125    async fn disabled_personas_dead_letter_events() {
2126        let log = log();
2127        let binding = binding();
2128        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2129        disable_persona(&log, &binding, now).await.unwrap();
2130        let receipt = fire_trigger(
2131            &log,
2132            &binding,
2133            "slack",
2134            "message",
2135            BTreeMap::from([
2136                ("channel".to_string(), "C123".to_string()),
2137                ("ts".to_string(), "1713988800.000100".to_string()),
2138            ]),
2139            PersonaRunCost::default(),
2140            now,
2141        )
2142        .await
2143        .unwrap();
2144        assert_eq!(receipt.status, "dead_lettered");
2145        let status = persona_status(&log, &binding, now).await.unwrap();
2146        assert_eq!(status.state, PersonaLifecycleState::Disabled);
2147        assert_eq!(status.disabled_events, 1);
2148    }
2149
2150    #[tokio::test]
2151    async fn budget_exhaustion_blocks_expensive_work() {
2152        let log = log();
2153        let mut binding = binding();
2154        binding.budget.daily_usd = Some(0.01);
2155        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2156        let receipt = fire_trigger(
2157            &log,
2158            &binding,
2159            "linear",
2160            "issue",
2161            BTreeMap::from([("issue_key".to_string(), "HAR-462".to_string())]),
2162            PersonaRunCost {
2163                cost_usd: 0.02,
2164                tokens: 1,
2165                ..Default::default()
2166            },
2167            now,
2168        )
2169        .await
2170        .unwrap();
2171        assert_eq!(receipt.status, "budget_exhausted");
2172        let status = persona_status(&log, &binding, now).await.unwrap();
2173        assert_eq!(status.budget.reason.as_deref(), Some("daily_usd"));
2174        assert!(status.budget.exhausted);
2175        assert!(status.last_error.as_deref().unwrap().contains("daily_usd"));
2176    }
2177
2178    #[tokio::test]
2179    async fn deterministic_predicate_hit_emits_value_event_with_avoided_cost() {
2180        let log = log();
2181        let binding = binding();
2182        let captured = Arc::new(Mutex::new(Vec::new()));
2183        let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
2184            events: captured.clone(),
2185        }));
2186        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2187
2188        let receipt = fire_trigger(
2189            &log,
2190            &binding,
2191            "github",
2192            "pull_request",
2193            BTreeMap::from([
2194                ("repository".to_string(), "burin-labs/harn".to_string()),
2195                ("number".to_string(), "715".to_string()),
2196            ]),
2197            PersonaRunCost {
2198                avoided_cost_usd: 0.0042,
2199                deterministic_steps: 1,
2200                metadata: json!({
2201                    "predicate": "pr_already_green",
2202                    "would_have_called_model": "gpt-5.4-mini",
2203                }),
2204                ..Default::default()
2205            },
2206            now,
2207        )
2208        .await
2209        .unwrap();
2210
2211        let run_id = receipt.run_id.expect("completed run has run_id");
2212        let events = captured.lock().unwrap().clone();
2213        let deterministic = events
2214            .iter()
2215            .find(|event| {
2216                event.kind == PersonaValueEventKind::DeterministicExecution
2217                    && event.run_id == Some(run_id)
2218            })
2219            .expect("deterministic execution value event");
2220        assert_eq!(deterministic.persona_id, "merge_captain");
2221        assert_eq!(
2222            deterministic.template_ref.as_deref(),
2223            Some("software_factory@v0")
2224        );
2225        assert_eq!(deterministic.run_id, Some(run_id));
2226        assert_eq!(deterministic.paid_cost_usd, 0.0);
2227        assert_eq!(deterministic.avoided_cost_usd, 0.0042);
2228        assert_eq!(deterministic.deterministic_steps, 1);
2229        assert_eq!(
2230            deterministic.metadata["predicate"].as_str(),
2231            Some("pr_already_green")
2232        );
2233
2234        let persisted = read_persona_events(&log, &binding.name).await.unwrap();
2235        assert!(persisted.iter().any(|(_, event)| {
2236            event.kind == "persona.value.deterministic_execution"
2237                && event.payload["avoided_cost_usd"] == json!(0.0042)
2238        }));
2239    }
2240
2241    #[tokio::test]
2242    async fn frontier_escalation_run_emits_value_event_with_paid_cost() {
2243        let log = log();
2244        let binding = binding();
2245        let captured = Arc::new(Mutex::new(Vec::new()));
2246        let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
2247            events: captured.clone(),
2248        }));
2249        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2250
2251        let receipt = fire_trigger(
2252            &log,
2253            &binding,
2254            "linear",
2255            "issue",
2256            BTreeMap::from([("issue_key".to_string(), "HAR-715".to_string())]),
2257            PersonaRunCost {
2258                cost_usd: 0.011,
2259                tokens: 20,
2260                llm_steps: 1,
2261                frontier_escalations: 1,
2262                metadata: json!({
2263                    "frontier_model": "gpt-5.4",
2264                    "escalation_reason": "high_risk_merge",
2265                }),
2266                ..Default::default()
2267            },
2268            now,
2269        )
2270        .await
2271        .unwrap();
2272
2273        let run_id = receipt.run_id.expect("completed run has run_id");
2274        let events = captured.lock().unwrap().clone();
2275        let escalation = events
2276            .iter()
2277            .find(|event| {
2278                event.kind == PersonaValueEventKind::FrontierEscalation
2279                    && event.run_id == Some(run_id)
2280            })
2281            .expect("frontier escalation value event");
2282        assert_eq!(escalation.run_id, Some(run_id));
2283        assert_eq!(escalation.paid_cost_usd, 0.011);
2284        assert_eq!(escalation.avoided_cost_usd, 0.0);
2285        assert_eq!(escalation.llm_steps, 1);
2286        assert_eq!(
2287            escalation.metadata["frontier_model"].as_str(),
2288            Some("gpt-5.4")
2289        );
2290
2291        let completion = events
2292            .iter()
2293            .find(|event| {
2294                event.kind == PersonaValueEventKind::RunCompleted && event.run_id == Some(run_id)
2295            })
2296            .expect("run completed value event");
2297        assert_eq!(completion.paid_cost_usd, 0.0);
2298    }
2299
2300    struct CapturingSupervisionSink {
2301        events: Arc<Mutex<Vec<PersonaSupervisionEvent>>>,
2302    }
2303
2304    impl PersonaSupervisionSink for CapturingSupervisionSink {
2305        fn handle_supervision_event(&self, event: &PersonaSupervisionEvent) {
2306            self.events.lock().unwrap().push(event.clone());
2307        }
2308    }
2309
2310    fn pr_metadata(repository: &str, number: &str) -> BTreeMap<String, String> {
2311        BTreeMap::from([
2312            ("repository".to_string(), repository.to_string()),
2313            ("number".to_string(), number.to_string()),
2314        ])
2315    }
2316
2317    fn binding_named(name: &str) -> PersonaRuntimeBinding {
2318        PersonaRuntimeBinding {
2319            name: name.to_string(),
2320            ..binding()
2321        }
2322    }
2323
2324    fn supervision_events_for(
2325        captured: &Arc<Mutex<Vec<PersonaSupervisionEvent>>>,
2326        persona: &str,
2327    ) -> Vec<PersonaSupervisionEvent> {
2328        captured
2329            .lock()
2330            .unwrap()
2331            .iter()
2332            .filter(|event| match event {
2333                PersonaSupervisionEvent::QueuePosition(update) => update.persona_id == persona,
2334                PersonaSupervisionEvent::RepairWorkerStatus(update) => update.persona_id == persona,
2335                PersonaSupervisionEvent::Receipt(update) => update.persona_id == persona,
2336                PersonaSupervisionEvent::Checkpoint(update) => update.persona_id == persona,
2337            })
2338            .cloned()
2339            .collect()
2340    }
2341
2342    async fn drive_pause_then_resume(binding: &PersonaRuntimeBinding, now: i64) {
2343        let log = log();
2344        pause_persona(&log, binding, now).await.unwrap();
2345        let _ = fire_trigger(
2346            &log,
2347            binding,
2348            "github",
2349            "pull_request",
2350            pr_metadata("burin-labs/harn", "1480"),
2351            PersonaRunCost::default(),
2352            now,
2353        )
2354        .await
2355        .unwrap();
2356        let _ = resume_persona(&log, binding, now + 1).await.unwrap();
2357        let _ = restore_persona_checkpoint(
2358            &log,
2359            binding,
2360            PersonaCheckpointRestoreRequest {
2361                checkpoint_id: "cp_42".to_string(),
2362                work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2363                resumed_from: Some(PersonaCheckpointResume {
2364                    note: Some("resumed from cp 42".to_string()),
2365                    ..Default::default()
2366                }),
2367            },
2368            now + 2,
2369        )
2370        .await
2371        .unwrap();
2372        let _ = report_repair_worker_status(
2373            &log,
2374            binding,
2375            PersonaRepairWorkerStatusUpdate {
2376                persona_id: String::new(),
2377                template_ref: None,
2378                repair_worker_id: "rw_42".to_string(),
2379                lifecycle: PersonaRepairWorkerLifecycle::Running,
2380                work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2381                lease_id: Some("persona_lease_xyz".to_string()),
2382                scratchpad_url: Some("https://factory.local/rw_42".to_string()),
2383                last_heartbeat_ms: 0,
2384                occurred_at_ms: 0,
2385            },
2386            now + 3,
2387        )
2388        .await
2389        .unwrap();
2390    }
2391
2392    #[tokio::test]
2393    async fn supervision_sink_emits_queue_position_and_receipt() {
2394        let captured = Arc::new(Mutex::new(Vec::new()));
2395        let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2396            events: captured.clone(),
2397        }));
2398        let log = log();
2399        let binding = binding_named("supervision_sink_emits_queue_position_and_receipt");
2400        let now = parse_rfc3339_ms("2026-05-01T00:00:00Z").unwrap();
2401
2402        pause_persona(&log, &binding, now).await.unwrap();
2403        fire_trigger(
2404            &log,
2405            &binding,
2406            "github",
2407            "pull_request",
2408            pr_metadata("burin-labs/harn", "1480"),
2409            PersonaRunCost::default(),
2410            now + 100,
2411        )
2412        .await
2413        .unwrap();
2414        resume_persona(&log, &binding, now + 200).await.unwrap();
2415
2416        let events = supervision_events_for(&captured, &binding.name);
2417        let queue_events: Vec<_> = events
2418            .iter()
2419            .filter_map(|event| match event {
2420                PersonaSupervisionEvent::QueuePosition(update) => Some(update.clone()),
2421                _ => None,
2422            })
2423            .collect();
2424        assert_eq!(queue_events.len(), 2, "enqueue + drain emitted");
2425        assert_eq!(queue_events[0].position, 1);
2426        assert_eq!(queue_events[0].queue_depth, 1);
2427        assert_eq!(queue_events[1].position, 0);
2428        assert_eq!(queue_events[1].queue_depth, 0);
2429        let receipt_events: Vec<_> = events
2430            .iter()
2431            .filter_map(|event| match event {
2432                PersonaSupervisionEvent::Receipt(update) => Some(update.clone()),
2433                _ => None,
2434            })
2435            .collect();
2436        assert_eq!(receipt_events.len(), 2, "queued + drained receipt");
2437        assert_eq!(receipt_events[0].receipt.status, "queued");
2438        assert_eq!(receipt_events[1].receipt.status, "completed");
2439        for event in &receipt_events {
2440            assert_eq!(event.receipt.persona, binding.name);
2441            assert_eq!(event.persona_id, binding.name);
2442            assert_eq!(event.template_ref.as_deref(), Some("software_factory@v0"));
2443        }
2444        let persisted_kinds: Vec<_> = read_persona_events(&log, &binding.name)
2445            .await
2446            .unwrap()
2447            .into_iter()
2448            .map(|(_, event)| event.kind)
2449            .collect();
2450        assert!(
2451            persisted_kinds
2452                .iter()
2453                .any(|kind| kind == "persona.supervision.queue_position"),
2454            "queue_position supervision events should be durable: {persisted_kinds:?}"
2455        );
2456        assert!(
2457            persisted_kinds
2458                .iter()
2459                .any(|kind| kind == "persona.supervision.receipt"),
2460            "receipt supervision events should be durable: {persisted_kinds:?}"
2461        );
2462    }
2463
2464    #[tokio::test]
2465    async fn supervision_sink_emits_repair_worker_status_idempotently() {
2466        let captured = Arc::new(Mutex::new(Vec::new()));
2467        let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2468            events: captured.clone(),
2469        }));
2470        let log = log();
2471        let binding = binding_named("supervision_sink_emits_repair_worker_status_idempotently");
2472        let now = parse_rfc3339_ms("2026-05-01T01:00:00Z").unwrap();
2473        let update = PersonaRepairWorkerStatusUpdate {
2474            persona_id: String::new(),
2475            template_ref: None,
2476            repair_worker_id: "rw_test".to_string(),
2477            lifecycle: PersonaRepairWorkerLifecycle::Running,
2478            work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2479            lease_id: Some("persona_lease_abc".to_string()),
2480            scratchpad_url: Some("https://factory.local/rw_test".to_string()),
2481            last_heartbeat_ms: 0,
2482            occurred_at_ms: 0,
2483        };
2484        let first = report_repair_worker_status(&log, &binding, update.clone(), now)
2485            .await
2486            .unwrap();
2487        let second = report_repair_worker_status(&log, &binding, update.clone(), now + 5)
2488            .await
2489            .unwrap();
2490        assert!(first);
2491        assert!(!second, "second identical lifecycle is idempotent");
2492
2493        let mut next = update.clone();
2494        next.lifecycle = PersonaRepairWorkerLifecycle::Succeeded;
2495        let third = report_repair_worker_status(&log, &binding, next, now + 10)
2496            .await
2497            .unwrap();
2498        assert!(third);
2499
2500        let kinds: Vec<_> = supervision_events_for(&captured, &binding.name)
2501            .into_iter()
2502            .filter_map(|event| match event {
2503                PersonaSupervisionEvent::RepairWorkerStatus(update) => Some(update.lifecycle),
2504                _ => None,
2505            })
2506            .collect();
2507        assert_eq!(
2508            kinds,
2509            vec![
2510                PersonaRepairWorkerLifecycle::Running,
2511                PersonaRepairWorkerLifecycle::Succeeded
2512            ]
2513        );
2514    }
2515
2516    #[tokio::test]
2517    async fn supervision_sink_emits_checkpoint_restore_ack() {
2518        let captured = Arc::new(Mutex::new(Vec::new()));
2519        let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2520            events: captured.clone(),
2521        }));
2522        let log = log();
2523        let binding = binding_named("supervision_sink_emits_checkpoint_restore_ack");
2524        let now = parse_rfc3339_ms("2026-05-01T02:00:00Z").unwrap();
2525        fire_trigger(
2526            &log,
2527            &binding,
2528            "github",
2529            "pull_request",
2530            pr_metadata("burin-labs/harn", "1480"),
2531            PersonaRunCost::default(),
2532            now,
2533        )
2534        .await
2535        .unwrap();
2536
2537        let outcome = restore_persona_checkpoint(
2538            &log,
2539            &binding,
2540            PersonaCheckpointRestoreRequest {
2541                checkpoint_id: "cp_1".to_string(),
2542                work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2543                resumed_from: None,
2544            },
2545            now + 100,
2546        )
2547        .await
2548        .unwrap();
2549        assert!(outcome.acked);
2550        assert_eq!(outcome.update.checkpoint_id, "cp_1");
2551        let resume = outcome
2552            .update
2553            .resumed_from
2554            .as_ref()
2555            .expect("resume coordinates default-derived from status");
2556        assert_eq!(resume.last_run_ms, Some(now));
2557
2558        let replay = restore_persona_checkpoint(
2559            &log,
2560            &binding,
2561            PersonaCheckpointRestoreRequest {
2562                checkpoint_id: "cp_1".to_string(),
2563                work_key: None,
2564                resumed_from: None,
2565            },
2566            now + 200,
2567        )
2568        .await
2569        .unwrap();
2570        assert!(!replay.acked, "duplicate restore is a no-op ack");
2571        assert_eq!(replay.update.occurred_at_ms, now + 100);
2572
2573        let ack_events: Vec<_> = supervision_events_for(&captured, &binding.name)
2574            .into_iter()
2575            .filter_map(|event| match event {
2576                PersonaSupervisionEvent::Checkpoint(update) => Some(update),
2577                _ => None,
2578            })
2579            .collect();
2580        assert_eq!(ack_events.len(), 1, "ack emitted once, replay suppressed");
2581        assert_eq!(ack_events[0].action, PersonaCheckpointAction::RestoreAcked);
2582    }
2583
2584    #[tokio::test]
2585    async fn supervision_sink_replay_is_deterministic_under_recorded_clock() {
2586        use harn_clock::{ClockEventLog, PausedClock, RecordedClock};
2587        use time::OffsetDateTime;
2588
2589        let now_ms = parse_rfc3339_ms("2026-05-01T03:00:00Z").unwrap();
2590        async fn drive(now_ms: i64) -> (Vec<PersonaSupervisionEvent>, Vec<harn_clock::ClockEvent>) {
2591            let captured = Arc::new(Mutex::new(Vec::new()));
2592            let _registration =
2593                register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2594                    events: captured.clone(),
2595                }));
2596            let paused = PausedClock::new(
2597                OffsetDateTime::from_unix_timestamp_nanos((now_ms as i128) * 1_000_000).unwrap(),
2598            );
2599            let recorded = Arc::new(RecordedClock::new(paused, Arc::new(ClockEventLog::new())));
2600            let binding = binding_named("supervision_replay_persona");
2601            let ts = harn_clock::now_wall_ms(&*recorded);
2602            drive_pause_then_resume(&binding, ts).await;
2603            let clock_log = recorded.log().snapshot();
2604            let events = supervision_events_for(&captured, &binding.name);
2605            (events, clock_log)
2606        }
2607
2608        let (events_a, clock_a) = drive(now_ms).await;
2609        let (events_b, clock_b) = drive(now_ms).await;
2610        // Run receipts carry per-run `run_id`s (UUIDv7) and lease ids that
2611        // differ across runs. Normalize them so the deterministic envelope
2612        // remains comparable.
2613        fn normalize(event: &PersonaSupervisionEvent) -> PersonaSupervisionEvent {
2614            match event.clone() {
2615                PersonaSupervisionEvent::Receipt(mut update) => {
2616                    update.receipt.run_id = None;
2617                    if let Some(lease) = update.receipt.lease.as_mut() {
2618                        lease.id = "lease".to_string();
2619                    }
2620                    update.receipt.budget_receipt_id = update
2621                        .receipt
2622                        .budget_receipt_id
2623                        .map(|_| "budget".to_string());
2624                    PersonaSupervisionEvent::Receipt(update)
2625                }
2626                PersonaSupervisionEvent::Checkpoint(mut update) => {
2627                    if let Some(resume) = update.resumed_from.as_mut() {
2628                        resume.run_id = None;
2629                        resume.lease_id = None;
2630                    }
2631                    PersonaSupervisionEvent::Checkpoint(update)
2632                }
2633                other => other,
2634            }
2635        }
2636        let a: Vec<_> = events_a.iter().map(normalize).collect();
2637        let b: Vec<_> = events_b.iter().map(normalize).collect();
2638        assert_eq!(a, b, "supervision sink emits identical event envelopes");
2639        assert_eq!(
2640            clock_a, clock_b,
2641            "recorded clock observation log is identical across replays"
2642        );
2643    }
2644}