Skip to main content

harn_vm/
personas.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::{Arc, OnceLock, RwLock};
4
5use chrono::{TimeZone, Utc};
6use croner::Cron;
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use time::format_description::well_known::Rfc3339;
10use time::OffsetDateTime;
11use uuid::Uuid;
12
13use crate::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
14
/// Topic name for persona runtime events.
pub const PERSONA_RUNTIME_TOPIC: &str = "persona.runtime.events";

/// Default lease time-to-live: five minutes expressed in milliseconds.
const DEFAULT_LEASE_TTL_MS: i64 = 300_000;
18
19#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "snake_case")]
21pub enum PersonaLifecycleState {
22    Inactive,
23    Starting,
24    #[default]
25    Idle,
26    Running,
27    Paused,
28    Draining,
29    Failed,
30    Disabled,
31}
32
33impl PersonaLifecycleState {
34    pub fn as_str(self) -> &'static str {
35        match self {
36            Self::Inactive => "inactive",
37            Self::Starting => "starting",
38            Self::Idle => "idle",
39            Self::Running => "running",
40            Self::Paused => "paused",
41            Self::Draining => "draining",
42            Self::Failed => "failed",
43            Self::Disabled => "disabled",
44        }
45    }
46}
47
48#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
49pub struct PersonaBudgetPolicy {
50    pub daily_usd: Option<f64>,
51    pub hourly_usd: Option<f64>,
52    pub run_usd: Option<f64>,
53    pub max_tokens: Option<u64>,
54}
55
56#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
57pub struct PersonaRuntimeBinding {
58    pub name: String,
59    #[serde(default)]
60    pub template_ref: Option<String>,
61    pub entry_workflow: String,
62    #[serde(default)]
63    pub schedules: Vec<String>,
64    #[serde(default)]
65    pub triggers: Vec<String>,
66    #[serde(default)]
67    pub budget: PersonaBudgetPolicy,
68}
69
70#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
71pub struct PersonaLease {
72    pub id: String,
73    pub holder: String,
74    pub work_key: String,
75    pub acquired_at_ms: i64,
76    pub expires_at_ms: i64,
77}
78
79#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
80pub struct PersonaBudgetStatus {
81    pub daily_usd: Option<f64>,
82    pub hourly_usd: Option<f64>,
83    pub run_usd: Option<f64>,
84    pub max_tokens: Option<u64>,
85    pub spent_today_usd: f64,
86    pub spent_this_hour_usd: f64,
87    pub spent_last_run_usd: f64,
88    pub tokens_today: u64,
89    pub remaining_today_usd: Option<f64>,
90    pub remaining_hour_usd: Option<f64>,
91    pub exhausted: bool,
92    pub reason: Option<String>,
93    pub last_receipt_id: Option<String>,
94}
95
96#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
97pub struct PersonaStatus {
98    pub name: String,
99    #[serde(default)]
100    pub template_ref: Option<String>,
101    pub state: PersonaLifecycleState,
102    pub entry_workflow: String,
103    #[serde(default)]
104    pub role: String,
105    #[serde(default)]
106    pub current_assignment: Option<PersonaAssignmentStatus>,
107    pub last_run: Option<String>,
108    pub next_scheduled_run: Option<String>,
109    pub active_lease: Option<PersonaLease>,
110    pub budget: PersonaBudgetStatus,
111    pub last_error: Option<String>,
112    pub queued_events: usize,
113    #[serde(default)]
114    pub queued_work: Vec<PersonaQueuedWork>,
115    #[serde(default)]
116    pub handoff_inbox: Vec<PersonaHandoffInboxItem>,
117    #[serde(default)]
118    pub value_receipts: Vec<PersonaValueReceipt>,
119    pub disabled_events: usize,
120    pub paused_event_policy: String,
121}
122
123#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
124pub struct PersonaAssignmentStatus {
125    pub work_key: String,
126    pub lease_id: String,
127    pub holder: String,
128    pub acquired_at: String,
129    pub expires_at: String,
130}
131
132#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
133pub struct PersonaQueuedWork {
134    pub work_key: String,
135    pub provider: String,
136    pub kind: String,
137    pub queued_at: String,
138    pub reason: String,
139    pub source_event_id: Option<String>,
140    #[serde(default)]
141    pub metadata: BTreeMap<String, String>,
142}
143
144#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
145pub struct PersonaHandoffInboxItem {
146    pub work_key: String,
147    pub handoff_id: Option<String>,
148    pub handoff_kind: Option<String>,
149    pub source_persona: Option<String>,
150    pub task: Option<String>,
151    pub queued_at: String,
152    pub reason: String,
153}
154
155#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
156pub struct PersonaValueReceipt {
157    pub kind: PersonaValueEventKind,
158    pub run_id: Option<Uuid>,
159    pub occurred_at: String,
160    pub paid_cost_usd: f64,
161    pub avoided_cost_usd: f64,
162    pub deterministic_steps: i64,
163    pub llm_steps: i64,
164    pub metadata: serde_json::Value,
165}
166
167#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
168pub struct PersonaTriggerEnvelope {
169    pub provider: String,
170    pub kind: String,
171    pub subject_key: String,
172    pub source_event_id: Option<String>,
173    pub received_at_ms: i64,
174    #[serde(default)]
175    pub metadata: BTreeMap<String, String>,
176    #[serde(default)]
177    pub raw: serde_json::Value,
178}
179
180#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
181pub struct PersonaRunReceipt {
182    pub status: String,
183    pub persona: String,
184    #[serde(default)]
185    pub run_id: Option<Uuid>,
186    pub work_key: String,
187    pub lease: Option<PersonaLease>,
188    pub queued: bool,
189    pub error: Option<String>,
190    pub budget_receipt_id: Option<String>,
191}
192
193#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
194pub struct PersonaRunCost {
195    pub cost_usd: f64,
196    pub tokens: u64,
197    #[serde(default)]
198    pub avoided_cost_usd: f64,
199    #[serde(default)]
200    pub deterministic_steps: i64,
201    #[serde(default)]
202    pub llm_steps: i64,
203    #[serde(default)]
204    pub frontier_escalations: i64,
205    #[serde(default)]
206    pub metadata: serde_json::Value,
207}
208
209#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
210#[serde(rename_all = "snake_case")]
211pub enum PersonaValueEventKind {
212    RunStarted,
213    RunCompleted,
214    AcceptedOutcome,
215    FrontierEscalation,
216    DeterministicExecution,
217    PromotionSavings,
218    ApprovalWait,
219}
220
221impl PersonaValueEventKind {
222    pub fn as_str(self) -> &'static str {
223        match self {
224            Self::RunStarted => "run_started",
225            Self::RunCompleted => "run_completed",
226            Self::AcceptedOutcome => "accepted_outcome",
227            Self::FrontierEscalation => "frontier_escalation",
228            Self::DeterministicExecution => "deterministic_execution",
229            Self::PromotionSavings => "promotion_savings",
230            Self::ApprovalWait => "approval_wait",
231        }
232    }
233}
234
235#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
236pub struct PersonaValueEvent {
237    pub persona_id: String,
238    pub template_ref: Option<String>,
239    pub run_id: Option<Uuid>,
240    pub kind: PersonaValueEventKind,
241    pub paid_cost_usd: f64,
242    pub avoided_cost_usd: f64,
243    pub deterministic_steps: i64,
244    pub llm_steps: i64,
245    pub metadata: serde_json::Value,
246    pub occurred_at: OffsetDateTime,
247}
248
249pub trait PersonaValueSink: Send + Sync {
250    fn handle_value_event(&self, event: &PersonaValueEvent);
251}
252
253/// Append-only multiplexed feed mirroring the harn-cloud
254/// `persona_supervision_events` projection. Sinks see the runtime-sourced
255/// `queue_position`, `repair_worker_status`, `receipt`, and
256/// `checkpoint` (restore-ack) variants; supervisor-sourced variants
257/// (`control`, `feedback`, `approval`, `handoff`, and checkpoint writes)
258/// originate on the hosted side and aren't routed here.
259#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
260#[serde(tag = "update_kind", rename_all = "snake_case")]
261pub enum PersonaSupervisionEvent {
262    QueuePosition(PersonaQueuePositionUpdate),
263    RepairWorkerStatus(PersonaRepairWorkerStatusUpdate),
264    Receipt(PersonaReceiptUpdate),
265    Checkpoint(PersonaCheckpointUpdate),
266}
267
268#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
269pub struct PersonaQueuePositionUpdate {
270    pub persona_id: String,
271    #[serde(default)]
272    pub template_ref: Option<String>,
273    pub work_key: String,
274    pub queue_depth: i64,
275    /// 1-indexed position of `work_key`; `0` means the item left the queue.
276    pub position: i64,
277    pub queued_at_ms: i64,
278    pub occurred_at_ms: i64,
279}
280
281#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
282#[serde(rename_all = "snake_case")]
283pub enum PersonaRepairWorkerLifecycle {
284    Pending,
285    Running,
286    Verifying,
287    Pushing,
288    Succeeded,
289    Failed,
290    Cancelled,
291}
292
293impl PersonaRepairWorkerLifecycle {
294    pub fn as_str(self) -> &'static str {
295        match self {
296            Self::Pending => "pending",
297            Self::Running => "running",
298            Self::Verifying => "verifying",
299            Self::Pushing => "pushing",
300            Self::Succeeded => "succeeded",
301            Self::Failed => "failed",
302            Self::Cancelled => "cancelled",
303        }
304    }
305}
306
307#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
308pub struct PersonaRepairWorkerStatusUpdate {
309    pub persona_id: String,
310    #[serde(default)]
311    pub template_ref: Option<String>,
312    pub repair_worker_id: String,
313    pub lifecycle: PersonaRepairWorkerLifecycle,
314    #[serde(default)]
315    pub work_key: Option<String>,
316    #[serde(default)]
317    pub lease_id: Option<String>,
318    #[serde(default)]
319    pub scratchpad_url: Option<String>,
320    pub last_heartbeat_ms: i64,
321    pub occurred_at_ms: i64,
322}
323
324#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
325pub struct PersonaReceiptUpdate {
326    pub persona_id: String,
327    #[serde(default)]
328    pub template_ref: Option<String>,
329    pub receipt: PersonaRunReceipt,
330    pub occurred_at_ms: i64,
331}
332
333#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
334pub struct PersonaCheckpointUpdate {
335    pub persona_id: String,
336    #[serde(default)]
337    pub template_ref: Option<String>,
338    pub action: PersonaCheckpointAction,
339    pub checkpoint_id: String,
340    #[serde(default)]
341    pub work_key: Option<String>,
342    /// Coordinates the runtime actually resumed from when acking a restore.
343    #[serde(default)]
344    pub resumed_from: Option<PersonaCheckpointResume>,
345    pub occurred_at_ms: i64,
346}
347
348#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
349#[serde(rename_all = "snake_case")]
350pub enum PersonaCheckpointAction {
351    RestoreAcked,
352}
353
354impl PersonaCheckpointAction {
355    pub fn as_str(self) -> &'static str {
356        match self {
357            Self::RestoreAcked => "restore_acked",
358        }
359    }
360}
361
362#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
363pub struct PersonaCheckpointResume {
364    pub run_id: Option<Uuid>,
365    pub lease_id: Option<String>,
366    pub last_run_ms: Option<i64>,
367    pub queued_work_keys: Vec<String>,
368    #[serde(default)]
369    pub note: Option<String>,
370}
371
372pub trait PersonaSupervisionSink: Send + Sync {
373    fn handle_supervision_event(&self, event: &PersonaSupervisionEvent);
374}
375
376struct TypedSinkRegistry<T: ?Sized + Send + Sync> {
377    sinks: RwLock<Vec<(u64, Arc<T>)>>,
378    next_id: AtomicU64,
379}
380
381impl<T: ?Sized + Send + Sync> TypedSinkRegistry<T> {
382    const fn new() -> Self {
383        Self {
384            sinks: RwLock::new(Vec::new()),
385            next_id: AtomicU64::new(1),
386        }
387    }
388
389    fn register(&self, sink: Arc<T>) -> u64 {
390        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
391        if let Ok(mut sinks) = self.sinks.write() {
392            sinks.push((id, sink));
393        }
394        id
395    }
396
397    fn unregister(&self, id: u64) {
398        if let Ok(mut sinks) = self.sinks.write() {
399            sinks.retain(|(existing, _)| *existing != id);
400        }
401    }
402
403    fn snapshot(&self) -> Vec<Arc<T>> {
404        self.sinks
405            .read()
406            .map(|sinks| sinks.iter().map(|(_, sink)| Arc::clone(sink)).collect())
407            .unwrap_or_default()
408    }
409}
410
411fn persona_value_sinks() -> &'static TypedSinkRegistry<dyn PersonaValueSink> {
412    static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaValueSink>> = OnceLock::new();
413    REGISTRY.get_or_init(TypedSinkRegistry::new)
414}
415
416fn persona_supervision_sinks() -> &'static TypedSinkRegistry<dyn PersonaSupervisionSink> {
417    static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaSupervisionSink>> = OnceLock::new();
418    REGISTRY.get_or_init(TypedSinkRegistry::new)
419}
420
421#[must_use = "dropping the registration immediately unregisters the sink"]
422pub struct PersonaValueSinkRegistration {
423    id: u64,
424}
425
426impl Drop for PersonaValueSinkRegistration {
427    fn drop(&mut self) {
428        persona_value_sinks().unregister(self.id);
429    }
430}
431
432pub fn register_persona_value_sink(
433    sink: Arc<dyn PersonaValueSink>,
434) -> PersonaValueSinkRegistration {
435    PersonaValueSinkRegistration {
436        id: persona_value_sinks().register(sink),
437    }
438}
439
440#[must_use = "dropping the registration immediately unregisters the sink"]
441pub struct PersonaSupervisionSinkRegistration {
442    id: u64,
443}
444
445impl Drop for PersonaSupervisionSinkRegistration {
446    fn drop(&mut self) {
447        persona_supervision_sinks().unregister(self.id);
448    }
449}
450
451pub fn register_persona_supervision_sink(
452    sink: Arc<dyn PersonaSupervisionSink>,
453) -> PersonaSupervisionSinkRegistration {
454    PersonaSupervisionSinkRegistration {
455        id: persona_supervision_sinks().register(sink),
456    }
457}
458
459pub async fn persona_status(
460    log: &Arc<AnyEventLog>,
461    binding: &PersonaRuntimeBinding,
462    now_ms: i64,
463) -> Result<PersonaStatus, String> {
464    let events = read_persona_events(log, &binding.name).await?;
465    let mut state = PersonaLifecycleState::Idle;
466    let mut last_run_ms = None;
467    let mut active_lease = None;
468    let mut last_error = None;
469    let mut queued = BTreeSet::<String>::new();
470    let mut completed = BTreeSet::<String>::new();
471    let mut disabled_events = 0usize;
472    let mut budget_receipt = None;
473    let mut budget_exhaustion_reason = None;
474    let mut spent = Vec::<(i64, f64, u64)>::new();
475    let mut queued_work = BTreeMap::<String, PersonaQueuedWork>::new();
476    let mut value_receipts = Vec::<PersonaValueReceipt>::new();
477
478    for (_, event) in events {
479        match event.kind.as_str() {
480            "persona.control.paused" => state = PersonaLifecycleState::Paused,
481            "persona.control.resumed" => state = PersonaLifecycleState::Idle,
482            "persona.control.disabled" => state = PersonaLifecycleState::Disabled,
483            "persona.control.draining" => state = PersonaLifecycleState::Draining,
484            "persona.lease.acquired" => {
485                if let Ok(lease) = serde_json::from_value::<PersonaLease>(event.payload.clone()) {
486                    active_lease = Some(lease);
487                    state = PersonaLifecycleState::Running;
488                }
489            }
490            "persona.lease.released" => {
491                active_lease = None;
492                if !matches!(
493                    state,
494                    PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
495                ) {
496                    state = PersonaLifecycleState::Idle;
497                }
498            }
499            "persona.lease.expired" => {
500                active_lease = None;
501                if !matches!(
502                    state,
503                    PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
504                ) {
505                    state = PersonaLifecycleState::Idle;
506                }
507            }
508            "persona.run.started" => state = PersonaLifecycleState::Running,
509            "persona.run.completed" => {
510                last_run_ms = event
511                    .payload
512                    .get("completed_at_ms")
513                    .and_then(serde_json::Value::as_i64)
514                    .or(Some(event.occurred_at_ms));
515                if let Some(work_key) = event
516                    .payload
517                    .get("work_key")
518                    .and_then(serde_json::Value::as_str)
519                {
520                    completed.insert(work_key.to_string());
521                }
522                if !matches!(
523                    state,
524                    PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
525                ) {
526                    state = PersonaLifecycleState::Idle;
527                }
528            }
529            "persona.run.failed" => {
530                state = PersonaLifecycleState::Failed;
531                last_error = event
532                    .payload
533                    .get("error")
534                    .and_then(serde_json::Value::as_str)
535                    .map(ToString::to_string);
536            }
537            "persona.trigger.queued" => {
538                if let Some(work_key) = event
539                    .payload
540                    .get("work_key")
541                    .and_then(serde_json::Value::as_str)
542                {
543                    queued.insert(work_key.to_string());
544                }
545                if let Some(item) = queued_work_from_event(&event)? {
546                    queued_work.insert(item.work_key.clone(), item);
547                }
548            }
549            "persona.trigger.dead_lettered" => disabled_events += 1,
550            "persona.budget.recorded" => {
551                budget_receipt = event
552                    .payload
553                    .get("receipt_id")
554                    .and_then(serde_json::Value::as_str)
555                    .map(ToString::to_string);
556                spent.push((
557                    event.occurred_at_ms,
558                    event
559                        .payload
560                        .get("cost_usd")
561                        .and_then(serde_json::Value::as_f64)
562                        .unwrap_or_default(),
563                    event
564                        .payload
565                        .get("tokens")
566                        .and_then(serde_json::Value::as_u64)
567                        .unwrap_or_default(),
568                ));
569            }
570            "persona.budget.exhausted" => {
571                budget_exhaustion_reason = event
572                    .payload
573                    .get("reason")
574                    .and_then(serde_json::Value::as_str)
575                    .map(ToString::to_string);
576                last_error = budget_exhaustion_reason
577                    .as_ref()
578                    .map(|reason| format!("persona budget exhausted: {reason}"));
579                budget_receipt = event
580                    .payload
581                    .get("receipt_id")
582                    .and_then(serde_json::Value::as_str)
583                    .map(ToString::to_string);
584            }
585            kind if kind.starts_with("persona.value.") => {
586                if let Some(receipt) = value_receipt_from_event(&event)? {
587                    value_receipts.push(receipt);
588                }
589            }
590            _ => {}
591        }
592    }
593
594    if let Some(lease) = active_lease.as_ref() {
595        if lease.expires_at_ms <= now_ms {
596            active_lease = None;
597            if !matches!(
598                state,
599                PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
600            ) {
601                state = PersonaLifecycleState::Idle;
602            }
603        }
604    }
605
606    queued.retain(|work_key| !completed.contains(work_key));
607    queued_work.retain(|work_key, _| !completed.contains(work_key));
608    let queued_work = queued_work.into_values().collect::<Vec<_>>();
609    let handoff_inbox = queued_work
610        .iter()
611        .filter_map(handoff_inbox_item)
612        .collect::<Vec<_>>();
613
614    let mut budget = budget_status(&binding.budget, &spent, now_ms);
615    if budget.reason.is_none() {
616        if let Some(reason) = budget_exhaustion_reason {
617            budget.exhausted = true;
618            budget.reason = Some(reason);
619        }
620    }
621    if budget.last_receipt_id.is_none() {
622        budget.last_receipt_id = budget_receipt;
623    }
624
625    let current_assignment = active_lease.as_ref().map(assignment_status_from_lease);
626
627    Ok(PersonaStatus {
628        name: binding.name.clone(),
629        template_ref: binding.template_ref.clone(),
630        state,
631        entry_workflow: binding.entry_workflow.clone(),
632        role: binding.name.clone(),
633        current_assignment,
634        last_run: last_run_ms.map(format_ms),
635        next_scheduled_run: next_scheduled_run(binding, last_run_ms, now_ms),
636        active_lease,
637        budget,
638        last_error,
639        queued_events: queued.len(),
640        queued_work,
641        handoff_inbox,
642        value_receipts,
643        disabled_events,
644        paused_event_policy: "queue_then_drain_on_resume".to_string(),
645    })
646}
647
648pub async fn pause_persona(
649    log: &Arc<AnyEventLog>,
650    binding: &PersonaRuntimeBinding,
651    now_ms: i64,
652) -> Result<PersonaStatus, String> {
653    append_persona_event(
654        log,
655        &binding.name,
656        "persona.control.paused",
657        json!({"paused_at_ms": now_ms, "policy": "queue_then_drain_on_resume"}),
658        now_ms,
659    )
660    .await?;
661    persona_status(log, binding, now_ms).await
662}
663
664pub async fn resume_persona(
665    log: &Arc<AnyEventLog>,
666    binding: &PersonaRuntimeBinding,
667    now_ms: i64,
668) -> Result<PersonaStatus, String> {
669    append_persona_event(
670        log,
671        &binding.name,
672        "persona.control.resumed",
673        json!({"resumed_at_ms": now_ms, "drain": true}),
674        now_ms,
675    )
676    .await?;
677    let queued = queued_events(log, &binding.name).await?;
678    for (envelope, cost) in queued {
679        let _ = run_for_envelope(log, binding, envelope, cost, now_ms).await?;
680    }
681    persona_status(log, binding, now_ms).await
682}
683
684pub async fn disable_persona(
685    log: &Arc<AnyEventLog>,
686    binding: &PersonaRuntimeBinding,
687    now_ms: i64,
688) -> Result<PersonaStatus, String> {
689    append_persona_event(
690        log,
691        &binding.name,
692        "persona.control.disabled",
693        json!({"disabled_at_ms": now_ms}),
694        now_ms,
695    )
696    .await?;
697    persona_status(log, binding, now_ms).await
698}
699
700pub async fn fire_schedule(
701    log: &Arc<AnyEventLog>,
702    binding: &PersonaRuntimeBinding,
703    cost: PersonaRunCost,
704    now_ms: i64,
705) -> Result<PersonaRunReceipt, String> {
706    let schedule = binding
707        .schedules
708        .first()
709        .cloned()
710        .unwrap_or_else(|| "manual".to_string());
711    let envelope = PersonaTriggerEnvelope {
712        provider: "schedule".to_string(),
713        kind: "cron.tick".to_string(),
714        subject_key: format!("schedule:{}:{schedule}:{}", binding.name, format_ms(now_ms)),
715        source_event_id: None,
716        received_at_ms: now_ms,
717        metadata: BTreeMap::from([
718            ("persona".to_string(), binding.name.clone()),
719            ("schedule".to_string(), schedule),
720            ("fired_at".to_string(), format_ms(now_ms)),
721        ]),
722        raw: json!({}),
723    };
724    append_persona_event(
725        log,
726        &binding.name,
727        "persona.schedule.fired",
728        json!({"persona": binding.name, "envelope": envelope}),
729        now_ms,
730    )
731    .await?;
732    run_for_envelope(log, binding, envelope, cost, now_ms).await
733}
734
735pub async fn fire_trigger(
736    log: &Arc<AnyEventLog>,
737    binding: &PersonaRuntimeBinding,
738    provider: &str,
739    kind: &str,
740    metadata: BTreeMap<String, String>,
741    cost: PersonaRunCost,
742    now_ms: i64,
743) -> Result<PersonaRunReceipt, String> {
744    let envelope = normalize_trigger_envelope(provider, kind, metadata, now_ms);
745    append_persona_event(
746        log,
747        &binding.name,
748        "persona.trigger.received",
749        json!({"persona": binding.name, "envelope": envelope}),
750        now_ms,
751    )
752    .await?;
753    run_for_envelope(log, binding, envelope, cost, now_ms).await
754}
755
756pub async fn record_persona_spend(
757    log: &Arc<AnyEventLog>,
758    binding: &PersonaRuntimeBinding,
759    cost: PersonaRunCost,
760    now_ms: i64,
761) -> Result<PersonaBudgetStatus, String> {
762    enforce_budget(log, binding, &cost, now_ms).await?;
763    append_budget_record(log, &binding.name, &cost, None, now_ms).await?;
764    persona_status(log, binding, now_ms)
765        .await
766        .map(|status| status.budget)
767}
768
769/// Report a `repair_worker_status` lifecycle transition for a sandboxed PR
770/// repair run.
771///
772/// Append-only and idempotent on `(repair_worker_id, lifecycle)`: replaying
773/// the same lifecycle is a no-op (`Ok(false)` indicates the event was already
774/// recorded). Hosted runtimes call this when a repair-worker job created by
775/// the persona transitions states.
776pub async fn report_repair_worker_status(
777    log: &Arc<AnyEventLog>,
778    binding: &PersonaRuntimeBinding,
779    status: PersonaRepairWorkerStatusUpdate,
780    now_ms: i64,
781) -> Result<bool, String> {
782    let mut status = status;
783    if status.persona_id.is_empty() {
784        status.persona_id = binding.name.clone();
785    }
786    if status.template_ref.is_none() {
787        status.template_ref = binding.template_ref.clone();
788    }
789    if status.occurred_at_ms == 0 {
790        status.occurred_at_ms = now_ms;
791    }
792    if status.last_heartbeat_ms == 0 {
793        status.last_heartbeat_ms = now_ms;
794    }
795
796    if repair_worker_status_recorded(log, &binding.name, &status).await? {
797        return Ok(false);
798    }
799    append_persona_event(
800        log,
801        &binding.name,
802        "persona.repair_worker.status",
803        serde_json::to_value(&status).map_err(|error| error.to_string())?,
804        status.occurred_at_ms,
805    )
806    .await?;
807    emit_persona_supervision_sink_event(&PersonaSupervisionEvent::RepairWorkerStatus(status));
808    Ok(true)
809}
810
811/// Acknowledge a checkpoint-restore request initiated by the supervision API.
812///
813/// Idempotent on `checkpoint_id`: a repeated restore request resolves to the
814/// same ack (returns `Ok(false)`). The runtime emits a typed
815/// `Checkpoint(action: RestoreAcked)` supervision event carrying the
816/// coordinates the runtime resumed from.
817pub async fn restore_persona_checkpoint(
818    log: &Arc<AnyEventLog>,
819    binding: &PersonaRuntimeBinding,
820    request: PersonaCheckpointRestoreRequest,
821    now_ms: i64,
822) -> Result<PersonaCheckpointRestoreOutcome, String> {
823    let PersonaCheckpointRestoreRequest {
824        checkpoint_id,
825        work_key,
826        resumed_from,
827    } = request;
828    let status = persona_status(log, binding, now_ms).await?;
829
830    if let Some(prior) = find_checkpoint_restore_ack(log, &binding.name, &checkpoint_id).await? {
831        return Ok(PersonaCheckpointRestoreOutcome {
832            acked: false,
833            update: prior,
834        });
835    }
836
837    let resume_coordinates = resumed_from.unwrap_or_else(|| PersonaCheckpointResume {
838        run_id: None,
839        lease_id: status.active_lease.as_ref().map(|lease| lease.id.clone()),
840        last_run_ms: status
841            .last_run
842            .as_deref()
843            .and_then(|value| parse_rfc3339_ms(value).ok()),
844        queued_work_keys: status
845            .queued_work
846            .iter()
847            .map(|item| item.work_key.clone())
848            .collect(),
849        note: None,
850    });
851
852    let update = PersonaCheckpointUpdate {
853        persona_id: binding.name.clone(),
854        template_ref: binding.template_ref.clone(),
855        action: PersonaCheckpointAction::RestoreAcked,
856        checkpoint_id: checkpoint_id.clone(),
857        work_key,
858        resumed_from: Some(resume_coordinates),
859        occurred_at_ms: now_ms,
860    };
861
862    append_persona_event(
863        log,
864        &binding.name,
865        "persona.checkpoint.restore_acked",
866        serde_json::to_value(&update).map_err(|error| error.to_string())?,
867        now_ms,
868    )
869    .await?;
870    emit_persona_supervision_sink_event(&PersonaSupervisionEvent::Checkpoint(update.clone()));
871    Ok(PersonaCheckpointRestoreOutcome {
872        acked: true,
873        update,
874    })
875}
876
877#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
878pub struct PersonaCheckpointRestoreRequest {
879    pub checkpoint_id: String,
880    #[serde(default)]
881    pub work_key: Option<String>,
882    #[serde(default)]
883    pub resumed_from: Option<PersonaCheckpointResume>,
884}
885
886#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
887pub struct PersonaCheckpointRestoreOutcome {
888    pub acked: bool,
889    pub update: PersonaCheckpointUpdate,
890}
891
892async fn repair_worker_status_recorded(
893    log: &Arc<AnyEventLog>,
894    persona: &str,
895    update: &PersonaRepairWorkerStatusUpdate,
896) -> Result<bool, String> {
897    let events = read_persona_events(log, persona).await?;
898    Ok(events.into_iter().any(|(_, event)| {
899        event.kind == "persona.repair_worker.status"
900            && event
901                .payload
902                .get("repair_worker_id")
903                .and_then(serde_json::Value::as_str)
904                == Some(update.repair_worker_id.as_str())
905            && event
906                .payload
907                .get("lifecycle")
908                .and_then(serde_json::Value::as_str)
909                == Some(update.lifecycle.as_str())
910    }))
911}
912
913async fn find_checkpoint_restore_ack(
914    log: &Arc<AnyEventLog>,
915    persona: &str,
916    checkpoint_id: &str,
917) -> Result<Option<PersonaCheckpointUpdate>, String> {
918    let events = read_persona_events(log, persona).await?;
919    for (_, event) in events.into_iter().rev() {
920        if event.kind != "persona.checkpoint.restore_acked" {
921            continue;
922        }
923        if event
924            .payload
925            .get("checkpoint_id")
926            .and_then(serde_json::Value::as_str)
927            != Some(checkpoint_id)
928        {
929            continue;
930        }
931        let update: PersonaCheckpointUpdate =
932            serde_json::from_value(event.payload).map_err(|error| error.to_string())?;
933        return Ok(Some(update));
934    }
935    Ok(None)
936}
937
938async fn run_for_envelope(
939    log: &Arc<AnyEventLog>,
940    binding: &PersonaRuntimeBinding,
941    envelope: PersonaTriggerEnvelope,
942    cost: PersonaRunCost,
943    now_ms: i64,
944) -> Result<PersonaRunReceipt, String> {
945    let pre_queue = queue_snapshot(log, binding, now_ms).await?;
946    let receipt = run_for_envelope_inner(log, binding, envelope, cost, now_ms).await?;
947    let post_queue = queue_snapshot(log, binding, now_ms).await?;
948    emit_queue_position_supervision(binding, &pre_queue, &post_queue, now_ms);
949    emit_receipt_supervision(binding, &receipt, now_ms);
950    Ok(receipt)
951}
952
953async fn run_for_envelope_inner(
954    log: &Arc<AnyEventLog>,
955    binding: &PersonaRuntimeBinding,
956    envelope: PersonaTriggerEnvelope,
957    cost: PersonaRunCost,
958    now_ms: i64,
959) -> Result<PersonaRunReceipt, String> {
960    let status = persona_status(log, binding, now_ms).await?;
961    match status.state {
962        PersonaLifecycleState::Paused => {
963            append_persona_event(
964                log,
965                &binding.name,
966                "persona.trigger.queued",
967                json!({
968                    "work_key": envelope.subject_key,
969                    "envelope": envelope,
970                    "cost": cost,
971                    "reason": "paused",
972                }),
973                now_ms,
974            )
975            .await?;
976            return Ok(PersonaRunReceipt {
977                status: "queued".to_string(),
978                persona: binding.name.clone(),
979                run_id: None,
980                work_key: envelope.subject_key,
981                lease: None,
982                queued: true,
983                error: None,
984                budget_receipt_id: None,
985            });
986        }
987        PersonaLifecycleState::Disabled => {
988            append_persona_event(
989                log,
990                &binding.name,
991                "persona.trigger.dead_lettered",
992                json!({
993                    "work_key": envelope.subject_key,
994                    "envelope": envelope,
995                    "reason": "disabled",
996                }),
997                now_ms,
998            )
999            .await?;
1000            return Ok(PersonaRunReceipt {
1001                status: "dead_lettered".to_string(),
1002                persona: binding.name.clone(),
1003                run_id: None,
1004                work_key: envelope.subject_key,
1005                lease: None,
1006                queued: false,
1007                error: Some("persona is disabled".to_string()),
1008                budget_receipt_id: None,
1009            });
1010        }
1011        _ => {}
1012    }
1013
1014    if let Err(error) = enforce_budget(log, binding, &cost, now_ms).await {
1015        return Ok(PersonaRunReceipt {
1016            status: "budget_exhausted".to_string(),
1017            persona: binding.name.clone(),
1018            run_id: None,
1019            work_key: envelope.subject_key,
1020            lease: None,
1021            queued: false,
1022            error: Some(error),
1023            budget_receipt_id: None,
1024        });
1025    }
1026
1027    if work_completed(log, &binding.name, &envelope.subject_key).await? {
1028        append_persona_event(
1029            log,
1030            &binding.name,
1031            "persona.trigger.duplicate",
1032            json!({
1033                "work_key": envelope.subject_key,
1034                "envelope": envelope,
1035                "reason": "already_completed",
1036            }),
1037            now_ms,
1038        )
1039        .await?;
1040        return Ok(PersonaRunReceipt {
1041            status: "duplicate".to_string(),
1042            persona: binding.name.clone(),
1043            run_id: None,
1044            work_key: envelope.subject_key,
1045            lease: None,
1046            queued: false,
1047            error: None,
1048            budget_receipt_id: None,
1049        });
1050    }
1051
1052    let Some(lease) = acquire_lease(
1053        log,
1054        binding,
1055        &envelope.subject_key,
1056        "persona-runtime",
1057        DEFAULT_LEASE_TTL_MS,
1058        now_ms,
1059    )
1060    .await?
1061    else {
1062        return Ok(PersonaRunReceipt {
1063            status: "lease_busy".to_string(),
1064            persona: binding.name.clone(),
1065            run_id: None,
1066            work_key: envelope.subject_key,
1067            lease: status.active_lease,
1068            queued: false,
1069            error: Some("active lease already owns persona work".to_string()),
1070            budget_receipt_id: None,
1071        });
1072    };
1073
1074    let run_id = Uuid::now_v7();
1075    let value_metadata = run_value_metadata(&envelope, &lease, &cost);
1076    append_persona_event(
1077        log,
1078        &binding.name,
1079        "persona.run.started",
1080        json!({
1081            "work_key": envelope.subject_key,
1082            "run_id": run_id,
1083            "started_at_ms": now_ms,
1084            "entry_workflow": binding.entry_workflow,
1085            "lease_id": lease.id,
1086        }),
1087        now_ms,
1088    )
1089    .await?;
1090    emit_persona_value_event(
1091        log,
1092        binding,
1093        run_id,
1094        PersonaValueEventDelta {
1095            kind: PersonaValueEventKind::RunStarted,
1096            metadata: value_metadata.clone(),
1097            ..Default::default()
1098        },
1099        now_ms,
1100    )
1101    .await?;
1102    let budget_receipt_id =
1103        append_budget_record(log, &binding.name, &cost, Some(&lease.id), now_ms).await?;
1104    if cost.avoided_cost_usd > 0.0 || cost.deterministic_steps > 0 {
1105        emit_persona_value_event(
1106            log,
1107            binding,
1108            run_id,
1109            PersonaValueEventDelta {
1110                kind: PersonaValueEventKind::DeterministicExecution,
1111                avoided_cost_usd: cost.avoided_cost_usd,
1112                deterministic_steps: cost.deterministic_steps.max(1),
1113                metadata: value_metadata.clone(),
1114                ..Default::default()
1115            },
1116            now_ms,
1117        )
1118        .await?;
1119    }
1120    if cost.frontier_escalations > 0 {
1121        emit_persona_value_event(
1122            log,
1123            binding,
1124            run_id,
1125            PersonaValueEventDelta {
1126                kind: PersonaValueEventKind::FrontierEscalation,
1127                paid_cost_usd: cost.cost_usd,
1128                llm_steps: cost.llm_steps.max(cost.frontier_escalations),
1129                metadata: value_metadata.clone(),
1130                ..Default::default()
1131            },
1132            now_ms,
1133        )
1134        .await?;
1135    }
1136    let completion_paid_cost = if cost.frontier_escalations > 0 {
1137        0.0
1138    } else {
1139        cost.cost_usd
1140    };
1141    let completion_llm_steps = if cost.frontier_escalations > 0 {
1142        0
1143    } else {
1144        cost.llm_steps
1145    };
1146    emit_persona_value_event(
1147        log,
1148        binding,
1149        run_id,
1150        PersonaValueEventDelta {
1151            kind: PersonaValueEventKind::RunCompleted,
1152            paid_cost_usd: completion_paid_cost,
1153            llm_steps: completion_llm_steps,
1154            metadata: value_metadata,
1155            ..Default::default()
1156        },
1157        now_ms,
1158    )
1159    .await?;
1160    append_persona_event(
1161        log,
1162        &binding.name,
1163        "persona.run.completed",
1164        json!({
1165            "work_key": envelope.subject_key,
1166            "run_id": run_id,
1167            "completed_at_ms": now_ms,
1168            "entry_workflow": binding.entry_workflow,
1169            "lease_id": lease.id,
1170        }),
1171        now_ms,
1172    )
1173    .await?;
1174    append_persona_event(
1175        log,
1176        &binding.name,
1177        "persona.lease.released",
1178        json!({
1179            "id": lease.id,
1180            "work_key": envelope.subject_key,
1181            "released_at_ms": now_ms,
1182        }),
1183        now_ms,
1184    )
1185    .await?;
1186    Ok(PersonaRunReceipt {
1187        status: "completed".to_string(),
1188        persona: binding.name.clone(),
1189        run_id: Some(run_id),
1190        work_key: envelope.subject_key,
1191        lease: Some(lease),
1192        queued: false,
1193        error: None,
1194        budget_receipt_id: Some(budget_receipt_id),
1195    })
1196}
1197
1198async fn acquire_lease(
1199    log: &Arc<AnyEventLog>,
1200    binding: &PersonaRuntimeBinding,
1201    work_key: &str,
1202    holder: &str,
1203    ttl_ms: i64,
1204    now_ms: i64,
1205) -> Result<Option<PersonaLease>, String> {
1206    let status = persona_status(log, binding, now_ms).await?;
1207    if let Some(lease) = status.active_lease {
1208        if lease.expires_at_ms > now_ms {
1209            append_persona_event(
1210                log,
1211                &binding.name,
1212                "persona.lease.conflict",
1213                json!({
1214                    "active_lease": lease,
1215                    "requested_work_key": work_key,
1216                    "at_ms": now_ms,
1217                }),
1218                now_ms,
1219            )
1220            .await?;
1221            return Ok(None);
1222        }
1223        append_persona_event(
1224            log,
1225            &binding.name,
1226            "persona.lease.expired",
1227            json!({
1228                "id": lease.id,
1229                "work_key": lease.work_key,
1230                "expired_at_ms": now_ms,
1231            }),
1232            now_ms,
1233        )
1234        .await?;
1235    }
1236
1237    let lease = PersonaLease {
1238        id: format!("persona_lease_{}", Uuid::now_v7()),
1239        holder: holder.to_string(),
1240        work_key: work_key.to_string(),
1241        acquired_at_ms: now_ms,
1242        expires_at_ms: now_ms + ttl_ms,
1243    };
1244    append_persona_event(
1245        log,
1246        &binding.name,
1247        "persona.lease.acquired",
1248        serde_json::to_value(&lease).map_err(|error| error.to_string())?,
1249        now_ms,
1250    )
1251    .await?;
1252    Ok(Some(lease))
1253}
1254
1255async fn enforce_budget(
1256    log: &Arc<AnyEventLog>,
1257    binding: &PersonaRuntimeBinding,
1258    cost: &PersonaRunCost,
1259    now_ms: i64,
1260) -> Result<(), String> {
1261    let status = persona_status(log, binding, now_ms).await?;
1262    let reason = if binding
1263        .budget
1264        .run_usd
1265        .is_some_and(|limit| cost.cost_usd > limit)
1266    {
1267        Some("run_usd")
1268    } else if binding
1269        .budget
1270        .daily_usd
1271        .is_some_and(|limit| status.budget.spent_today_usd + cost.cost_usd > limit)
1272    {
1273        Some("daily_usd")
1274    } else if binding
1275        .budget
1276        .hourly_usd
1277        .is_some_and(|limit| status.budget.spent_this_hour_usd + cost.cost_usd > limit)
1278    {
1279        Some("hourly_usd")
1280    } else if binding
1281        .budget
1282        .max_tokens
1283        .is_some_and(|limit| status.budget.tokens_today + cost.tokens > limit)
1284    {
1285        Some("max_tokens")
1286    } else {
1287        None
1288    };
1289
1290    if let Some(reason) = reason {
1291        let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1292        append_persona_event(
1293            log,
1294            &binding.name,
1295            "persona.budget.exhausted",
1296            json!({
1297                "receipt_id": receipt_id,
1298                "reason": reason,
1299                "attempted_cost_usd": cost.cost_usd,
1300                "attempted_tokens": cost.tokens,
1301                "persona": binding.name,
1302            }),
1303            now_ms,
1304        )
1305        .await?;
1306        return Err(format!("persona budget exhausted: {reason}"));
1307    }
1308
1309    Ok(())
1310}
1311
1312async fn append_budget_record(
1313    log: &Arc<AnyEventLog>,
1314    persona: &str,
1315    cost: &PersonaRunCost,
1316    lease_id: Option<&str>,
1317    now_ms: i64,
1318) -> Result<String, String> {
1319    let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1320    append_persona_event(
1321        log,
1322        persona,
1323        "persona.budget.recorded",
1324        json!({
1325            "receipt_id": receipt_id,
1326            "persona": persona,
1327            "cost_usd": cost.cost_usd,
1328            "tokens": cost.tokens,
1329            "lease_id": lease_id,
1330        }),
1331        now_ms,
1332    )
1333    .await?;
1334    Ok(receipt_id)
1335}
1336
1337fn normalize_trigger_envelope(
1338    provider: &str,
1339    kind: &str,
1340    metadata: BTreeMap<String, String>,
1341    now_ms: i64,
1342) -> PersonaTriggerEnvelope {
1343    let provider = provider.to_ascii_lowercase();
1344    let kind = kind.to_string();
1345    let source_event_id = metadata
1346        .get("event_id")
1347        .or_else(|| metadata.get("id"))
1348        .cloned();
1349    let subject_key = match provider.as_str() {
1350        "github" => {
1351            let repo = metadata
1352                .get("repository")
1353                .or_else(|| metadata.get("repository.full_name"))
1354                .cloned()
1355                .unwrap_or_else(|| "unknown".to_string());
1356            if let Some(number) = metadata
1357                .get("pr")
1358                .or_else(|| metadata.get("pull_request.number"))
1359                .or_else(|| metadata.get("number"))
1360            {
1361                format!("github:{repo}:pr:{number}")
1362            } else if let Some(check) = metadata
1363                .get("check_run.name")
1364                .or_else(|| metadata.get("check_name"))
1365            {
1366                format!("github:{repo}:check:{check}")
1367            } else {
1368                format!("github:{repo}:{kind}")
1369            }
1370        }
1371        "linear" => {
1372            let issue = metadata
1373                .get("issue_key")
1374                .or_else(|| metadata.get("issue.identifier"))
1375                .or_else(|| metadata.get("issue_id"))
1376                .or_else(|| metadata.get("id"))
1377                .cloned()
1378                .unwrap_or_else(|| "unknown".to_string());
1379            format!("linear:issue:{issue}")
1380        }
1381        "slack" => {
1382            let channel = metadata
1383                .get("channel")
1384                .or_else(|| metadata.get("channel_id"))
1385                .cloned()
1386                .unwrap_or_else(|| "unknown".to_string());
1387            let ts = metadata
1388                .get("ts")
1389                .or_else(|| metadata.get("event_ts"))
1390                .cloned()
1391                .unwrap_or_else(|| "unknown".to_string());
1392            format!("slack:{channel}:{ts}")
1393        }
1394        "webhook" => metadata
1395            .get("dedupe_key")
1396            .or_else(|| metadata.get("event_id"))
1397            .map(|value| format!("webhook:{value}"))
1398            .unwrap_or_else(|| format!("webhook:{kind}:{}", Uuid::now_v7())),
1399        _ => metadata
1400            .get("dedupe_key")
1401            .or_else(|| metadata.get("event_id"))
1402            .map(|value| format!("{provider}:{kind}:{value}"))
1403            .unwrap_or_else(|| format!("{provider}:{kind}:{}", Uuid::now_v7())),
1404    };
1405
1406    PersonaTriggerEnvelope {
1407        provider,
1408        kind,
1409        subject_key,
1410        source_event_id,
1411        received_at_ms: now_ms,
1412        raw: json!({"metadata": metadata}),
1413        metadata,
1414    }
1415}
1416
1417async fn queued_events(
1418    log: &Arc<AnyEventLog>,
1419    persona: &str,
1420) -> Result<Vec<(PersonaTriggerEnvelope, PersonaRunCost)>, String> {
1421    let events = read_persona_events(log, persona).await?;
1422    let mut queued = BTreeMap::<String, (PersonaTriggerEnvelope, PersonaRunCost)>::new();
1423    let mut completed = BTreeSet::<String>::new();
1424    for (_, event) in events {
1425        match event.kind.as_str() {
1426            "persona.trigger.queued" => {
1427                let Some(envelope) = event.payload.get("envelope") else {
1428                    continue;
1429                };
1430                let envelope: PersonaTriggerEnvelope =
1431                    serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1432                let cost = event
1433                    .payload
1434                    .get("cost")
1435                    .cloned()
1436                    .map(serde_json::from_value::<PersonaRunCost>)
1437                    .transpose()
1438                    .map_err(|error| error.to_string())?
1439                    .unwrap_or_default();
1440                queued.insert(envelope.subject_key.clone(), (envelope, cost));
1441            }
1442            "persona.run.completed" => {
1443                if let Some(work_key) = event
1444                    .payload
1445                    .get("work_key")
1446                    .and_then(serde_json::Value::as_str)
1447                {
1448                    completed.insert(work_key.to_string());
1449                }
1450            }
1451            _ => {}
1452        }
1453    }
1454    queued.retain(|work_key, _| !completed.contains(work_key));
1455    Ok(queued.into_values().collect())
1456}
1457
1458fn assignment_status_from_lease(lease: &PersonaLease) -> PersonaAssignmentStatus {
1459    PersonaAssignmentStatus {
1460        work_key: lease.work_key.clone(),
1461        lease_id: lease.id.clone(),
1462        holder: lease.holder.clone(),
1463        acquired_at: format_ms(lease.acquired_at_ms),
1464        expires_at: format_ms(lease.expires_at_ms),
1465    }
1466}
1467
1468fn queued_work_from_event(event: &LogEvent) -> Result<Option<PersonaQueuedWork>, String> {
1469    let Some(envelope) = event.payload.get("envelope") else {
1470        return Ok(None);
1471    };
1472    let envelope: PersonaTriggerEnvelope =
1473        serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1474    Ok(Some(PersonaQueuedWork {
1475        work_key: envelope.subject_key,
1476        provider: envelope.provider,
1477        kind: envelope.kind,
1478        queued_at: format_ms(event.occurred_at_ms),
1479        reason: event
1480            .payload
1481            .get("reason")
1482            .and_then(serde_json::Value::as_str)
1483            .unwrap_or("queued")
1484            .to_string(),
1485        source_event_id: envelope.source_event_id,
1486        metadata: envelope.metadata,
1487    }))
1488}
1489
1490fn handoff_inbox_item(work: &PersonaQueuedWork) -> Option<PersonaHandoffInboxItem> {
1491    if work.provider != "handoff" && !work.metadata.contains_key("handoff_id") {
1492        return None;
1493    }
1494    Some(PersonaHandoffInboxItem {
1495        work_key: work.work_key.clone(),
1496        handoff_id: work.metadata.get("handoff_id").cloned(),
1497        handoff_kind: work
1498            .metadata
1499            .get("handoff_kind")
1500            .or_else(|| work.metadata.get("kind"))
1501            .cloned(),
1502        source_persona: work.metadata.get("source_persona").cloned(),
1503        task: work.metadata.get("task").cloned(),
1504        queued_at: work.queued_at.clone(),
1505        reason: work.reason.clone(),
1506    })
1507}
1508
1509fn value_receipt_from_event(event: &LogEvent) -> Result<Option<PersonaValueReceipt>, String> {
1510    let Ok(value_event) = serde_json::from_value::<PersonaValueEvent>(event.payload.clone()) else {
1511        return Ok(None);
1512    };
1513    Ok(Some(PersonaValueReceipt {
1514        kind: value_event.kind,
1515        run_id: value_event.run_id,
1516        occurred_at: value_event
1517            .occurred_at
1518            .format(&Rfc3339)
1519            .map_err(|error| error.to_string())?,
1520        paid_cost_usd: value_event.paid_cost_usd,
1521        avoided_cost_usd: value_event.avoided_cost_usd,
1522        deterministic_steps: value_event.deterministic_steps,
1523        llm_steps: value_event.llm_steps,
1524        metadata: value_event.metadata,
1525    }))
1526}
1527
1528async fn work_completed(
1529    log: &Arc<AnyEventLog>,
1530    persona: &str,
1531    work_key: &str,
1532) -> Result<bool, String> {
1533    let events = read_persona_events(log, persona).await?;
1534    Ok(events.into_iter().any(|(_, event)| {
1535        event.kind == "persona.run.completed"
1536            && event
1537                .payload
1538                .get("work_key")
1539                .and_then(serde_json::Value::as_str)
1540                == Some(work_key)
1541    }))
1542}
1543
1544async fn read_persona_events(
1545    log: &Arc<AnyEventLog>,
1546    persona: &str,
1547) -> Result<Vec<(u64, LogEvent)>, String> {
1548    let topic = runtime_topic()?;
1549    Ok(log
1550        .read_range(&topic, None, usize::MAX)
1551        .await
1552        .map_err(|error| error.to_string())?
1553        .into_iter()
1554        .filter(|(_, event)| {
1555            event
1556                .headers
1557                .get("persona")
1558                .is_some_and(|name| name == persona)
1559        })
1560        .collect())
1561}
1562
1563async fn append_persona_event(
1564    log: &Arc<AnyEventLog>,
1565    persona: &str,
1566    kind: &str,
1567    payload: serde_json::Value,
1568    now_ms: i64,
1569) -> Result<u64, String> {
1570    let mut headers = BTreeMap::new();
1571    headers.insert("persona".to_string(), persona.to_string());
1572    let event = LogEvent {
1573        kind: kind.to_string(),
1574        payload,
1575        headers,
1576        occurred_at_ms: now_ms,
1577    };
1578    log.append(&runtime_topic()?, event)
1579        .await
1580        .map_err(|error| error.to_string())
1581}
1582
/// Per-call inputs to `emit_persona_value_event`.
///
/// Callers build it with struct-update syntax over `Default::default()`, so
/// they only set the fields relevant to a given event kind.
struct PersonaValueEventDelta {
    /// Kind of value event to record.
    kind: PersonaValueEventKind,
    /// Paid cost in USD attributed to this event (negative values are clamped
    /// to zero when emitted).
    paid_cost_usd: f64,
    /// Cost in USD avoided by this event (clamped to zero when emitted).
    avoided_cost_usd: f64,
    /// Deterministic step count (clamped to zero when emitted).
    deterministic_steps: i64,
    /// LLM step count (clamped to zero when emitted).
    llm_steps: i64,
    /// Free-form JSON metadata copied onto the emitted event.
    metadata: serde_json::Value,
}
1591
1592impl Default for PersonaValueEventDelta {
1593    fn default() -> Self {
1594        Self {
1595            kind: PersonaValueEventKind::RunCompleted,
1596            paid_cost_usd: 0.0,
1597            avoided_cost_usd: 0.0,
1598            deterministic_steps: 0,
1599            llm_steps: 0,
1600            metadata: serde_json::Value::Null,
1601        }
1602    }
1603}
1604
1605async fn emit_persona_value_event(
1606    log: &Arc<AnyEventLog>,
1607    binding: &PersonaRuntimeBinding,
1608    run_id: Uuid,
1609    delta: PersonaValueEventDelta,
1610    now_ms: i64,
1611) -> Result<(), String> {
1612    let event = PersonaValueEvent {
1613        persona_id: binding.name.clone(),
1614        template_ref: binding.template_ref.clone(),
1615        run_id: Some(run_id),
1616        kind: delta.kind,
1617        paid_cost_usd: delta.paid_cost_usd.max(0.0),
1618        avoided_cost_usd: delta.avoided_cost_usd.max(0.0),
1619        deterministic_steps: delta.deterministic_steps.max(0),
1620        llm_steps: delta.llm_steps.max(0),
1621        metadata: delta.metadata,
1622        occurred_at: offset_datetime_from_ms(now_ms),
1623    };
1624    append_persona_event(
1625        log,
1626        &binding.name,
1627        &format!("persona.value.{}", event.kind.as_str()),
1628        serde_json::to_value(&event).map_err(|error| error.to_string())?,
1629        now_ms,
1630    )
1631    .await?;
1632    emit_persona_value_sink_event(&event);
1633    Ok(())
1634}
1635
1636fn emit_persona_value_sink_event(event: &PersonaValueEvent) {
1637    for sink in persona_value_sinks().snapshot() {
1638        sink.handle_value_event(event);
1639    }
1640}
1641
1642fn emit_persona_supervision_sink_event(event: &PersonaSupervisionEvent) {
1643    for sink in persona_supervision_sinks().snapshot() {
1644        sink.handle_supervision_event(event);
1645    }
1646}
1647
/// Minimal view of one queued work item. Used to compare the queue snapshots
/// taken before and after a run.
#[derive(Clone, Debug)]
struct QueueEntry {
    /// Work key (the trigger's subject key) of the queued item.
    work_key: String,
    /// When the item was queued, in Unix epoch milliseconds.
    queued_at_ms: i64,
}
1653
1654async fn queue_snapshot(
1655    log: &Arc<AnyEventLog>,
1656    binding: &PersonaRuntimeBinding,
1657    now_ms: i64,
1658) -> Result<Vec<QueueEntry>, String> {
1659    let status = persona_status(log, binding, now_ms).await?;
1660    Ok(status
1661        .queued_work
1662        .into_iter()
1663        .map(|item| QueueEntry {
1664            queued_at_ms: parse_rfc3339_ms(&item.queued_at).unwrap_or(now_ms),
1665            work_key: item.work_key,
1666        })
1667        .collect())
1668}
1669
1670fn emit_queue_position_supervision(
1671    binding: &PersonaRuntimeBinding,
1672    before: &[QueueEntry],
1673    after: &[QueueEntry],
1674    now_ms: i64,
1675) {
1676    use std::collections::HashSet;
1677    let before_keys: HashSet<&str> = before.iter().map(|e| e.work_key.as_str()).collect();
1678    let after_keys: HashSet<&str> = after.iter().map(|e| e.work_key.as_str()).collect();
1679    let after_depth = after.len() as i64;
1680
1681    for (index, entry) in after.iter().enumerate() {
1682        if !before_keys.contains(entry.work_key.as_str()) {
1683            emit_persona_supervision_sink_event(&PersonaSupervisionEvent::QueuePosition(
1684                PersonaQueuePositionUpdate {
1685                    persona_id: binding.name.clone(),
1686                    template_ref: binding.template_ref.clone(),
1687                    work_key: entry.work_key.clone(),
1688                    queue_depth: after_depth,
1689                    position: (index + 1) as i64,
1690                    queued_at_ms: entry.queued_at_ms,
1691                    occurred_at_ms: now_ms,
1692                },
1693            ));
1694        }
1695    }
1696    for entry in before {
1697        if !after_keys.contains(entry.work_key.as_str()) {
1698            emit_persona_supervision_sink_event(&PersonaSupervisionEvent::QueuePosition(
1699                PersonaQueuePositionUpdate {
1700                    persona_id: binding.name.clone(),
1701                    template_ref: binding.template_ref.clone(),
1702                    work_key: entry.work_key.clone(),
1703                    queue_depth: after_depth,
1704                    position: 0,
1705                    queued_at_ms: entry.queued_at_ms,
1706                    occurred_at_ms: now_ms,
1707                },
1708            ));
1709        }
1710    }
1711}
1712
1713fn emit_receipt_supervision(
1714    binding: &PersonaRuntimeBinding,
1715    receipt: &PersonaRunReceipt,
1716    now_ms: i64,
1717) {
1718    emit_persona_supervision_sink_event(&PersonaSupervisionEvent::Receipt(PersonaReceiptUpdate {
1719        persona_id: binding.name.clone(),
1720        template_ref: binding.template_ref.clone(),
1721        receipt: receipt.clone(),
1722        occurred_at_ms: now_ms,
1723    }));
1724}
1725
1726fn run_value_metadata(
1727    envelope: &PersonaTriggerEnvelope,
1728    lease: &PersonaLease,
1729    cost: &PersonaRunCost,
1730) -> serde_json::Value {
1731    let mut metadata = serde_json::Map::new();
1732    metadata.insert("work_key".to_string(), json!(envelope.subject_key));
1733    metadata.insert("trigger_provider".to_string(), json!(envelope.provider));
1734    metadata.insert("trigger_kind".to_string(), json!(envelope.kind));
1735    metadata.insert("lease_id".to_string(), json!(lease.id));
1736    metadata.insert("tokens".to_string(), json!(cost.tokens));
1737    if cost.frontier_escalations > 0 {
1738        metadata.insert(
1739            "frontier_escalations".to_string(),
1740            json!(cost.frontier_escalations),
1741        );
1742    }
1743    match &cost.metadata {
1744        serde_json::Value::Null => {}
1745        serde_json::Value::Object(extra) => {
1746            metadata.extend(
1747                extra
1748                    .iter()
1749                    .map(|(key, value)| (key.clone(), value.clone())),
1750            );
1751        }
1752        extra => {
1753            metadata.insert("run_cost_metadata".to_string(), extra.clone());
1754        }
1755    }
1756    serde_json::Value::Object(metadata)
1757}
1758
1759fn budget_status(
1760    policy: &PersonaBudgetPolicy,
1761    spent: &[(i64, f64, u64)],
1762    now_ms: i64,
1763) -> PersonaBudgetStatus {
1764    let day_start = now_ms - (now_ms.rem_euclid(86_400_000));
1765    let hour_start = now_ms - (now_ms.rem_euclid(3_600_000));
1766    let mut spent_today_usd = 0.0;
1767    let mut spent_this_hour_usd = 0.0;
1768    let mut tokens_today = 0u64;
1769    let mut spent_last_run_usd = 0.0;
1770    for (at_ms, cost, tokens) in spent {
1771        spent_last_run_usd = *cost;
1772        if *at_ms >= day_start {
1773            spent_today_usd += cost;
1774            tokens_today += tokens;
1775        }
1776        if *at_ms >= hour_start {
1777            spent_this_hour_usd += cost;
1778        }
1779    }
1780
1781    let remaining_today_usd = policy
1782        .daily_usd
1783        .map(|limit| (limit - spent_today_usd).max(0.0));
1784    let remaining_hour_usd = policy
1785        .hourly_usd
1786        .map(|limit| (limit - spent_this_hour_usd).max(0.0));
1787    let reason = if policy
1788        .daily_usd
1789        .is_some_and(|limit| spent_today_usd >= limit && limit >= 0.0)
1790    {
1791        Some("daily_usd".to_string())
1792    } else if policy
1793        .hourly_usd
1794        .is_some_and(|limit| spent_this_hour_usd >= limit && limit >= 0.0)
1795    {
1796        Some("hourly_usd".to_string())
1797    } else if policy
1798        .max_tokens
1799        .is_some_and(|limit| tokens_today >= limit && limit > 0)
1800    {
1801        Some("max_tokens".to_string())
1802    } else {
1803        None
1804    };
1805
1806    PersonaBudgetStatus {
1807        daily_usd: policy.daily_usd,
1808        hourly_usd: policy.hourly_usd,
1809        run_usd: policy.run_usd,
1810        max_tokens: policy.max_tokens,
1811        spent_today_usd,
1812        spent_this_hour_usd,
1813        spent_last_run_usd,
1814        tokens_today,
1815        remaining_today_usd,
1816        remaining_hour_usd,
1817        exhausted: reason.is_some(),
1818        reason,
1819        last_receipt_id: None,
1820    }
1821}
1822
1823fn next_scheduled_run(
1824    binding: &PersonaRuntimeBinding,
1825    last_run_ms: Option<i64>,
1826    now_ms: i64,
1827) -> Option<String> {
1828    binding
1829        .schedules
1830        .iter()
1831        .filter_map(|schedule| next_cron_ms(schedule, last_run_ms.unwrap_or(now_ms)).ok())
1832        .min()
1833        .map(format_ms)
1834}
1835
1836fn next_cron_ms(schedule: &str, after_ms: i64) -> Result<i64, String> {
1837    let cron = schedule
1838        .parse::<Cron>()
1839        .map_err(|error| error.to_string())?;
1840    let after = Utc
1841        .timestamp_millis_opt(after_ms)
1842        .single()
1843        .ok_or_else(|| "invalid timestamp".to_string())?;
1844    let next = cron
1845        .find_next_occurrence(&after, false)
1846        .map_err(|error| error.to_string())?;
1847    Ok(next.timestamp_millis())
1848}
1849
1850pub fn now_ms() -> i64 {
1851    OffsetDateTime::now_utc().unix_timestamp_nanos() as i64 / 1_000_000
1852}
1853
1854fn offset_datetime_from_ms(ms: i64) -> OffsetDateTime {
1855    OffsetDateTime::from_unix_timestamp_nanos((ms as i128) * 1_000_000)
1856        .unwrap_or(OffsetDateTime::UNIX_EPOCH)
1857}
1858
1859pub fn format_ms(ms: i64) -> String {
1860    offset_datetime_from_ms(ms)
1861        .format(&Rfc3339)
1862        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
1863}
1864
1865pub fn parse_rfc3339_ms(value: &str) -> Result<i64, String> {
1866    let ts = OffsetDateTime::parse(value, &Rfc3339)
1867        .map_err(|error| format!("invalid RFC3339 timestamp '{value}': {error}"))?;
1868    Ok(ts.unix_timestamp_nanos() as i64 / 1_000_000)
1869}
1870
1871fn runtime_topic() -> Result<Topic, String> {
1872    Topic::new(PERSONA_RUNTIME_TOPIC).map_err(|error| error.to_string())
1873}
1874
// Unit tests for the persona runtime: lifecycle transitions, queueing while
// paused, de-duplication, dead-lettering, budget enforcement, value events and
// supervision-sink events.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::{AnyEventLog, MemoryEventLog};
    use std::sync::Mutex;

    // Test value sink: appends a clone of every `PersonaValueEvent` it receives
    // to a shared vector the test can inspect afterwards.
    struct CapturingValueSink {
        events: Arc<Mutex<Vec<PersonaValueEvent>>>,
    }

    impl PersonaValueSink for CapturingValueSink {
        fn handle_value_event(&self, event: &PersonaValueEvent) {
            self.events.lock().unwrap().push(event.clone());
        }
    }

    // Baseline binding shared by most tests. It is named "merge_captain", runs on a
    // `*/30 * * * *` cron schedule, and has a daily budget of 0.02 USD, a per-run
    // budget of 0.02 USD and a 100-token cap. Individual tests tweak fields as needed.
    fn binding() -> PersonaRuntimeBinding {
        PersonaRuntimeBinding {
            name: "merge_captain".to_string(),
            template_ref: Some("software_factory@v0".to_string()),
            entry_workflow: "workflows/merge.harn#run".to_string(),
            schedules: vec!["*/30 * * * *".to_string()],
            triggers: vec!["github.pr_opened".to_string()],
            budget: PersonaBudgetPolicy {
                daily_usd: Some(0.02),
                hourly_usd: None,
                run_usd: Some(0.02),
                max_tokens: Some(100),
            },
        }
    }

    // Fresh in-memory event log for each test. The `64` is presumably the
    // log's capacity; confirm against `MemoryEventLog::new`.
    fn log() -> Arc<AnyEventLog> {
        Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)))
    }

    // A schedule tick that costs 0.01 USD completes under a lease. Afterwards the
    // status is back to Idle, records the tick as `last_run`, has a next scheduled
    // run, and reports the 0.01 USD spend for the day.
    #[tokio::test]
    async fn schedule_tick_records_lifecycle_status_and_receipt() {
        let log = log();
        let binding = binding();
        let now = parse_rfc3339_ms("2026-04-24T12:30:00Z").unwrap();
        let receipt = fire_schedule(
            &log,
            &binding,
            PersonaRunCost {
                cost_usd: 0.01,
                tokens: 10,
                ..Default::default()
            },
            now,
        )
        .await
        .unwrap();
        assert_eq!(receipt.status, "completed");
        assert!(receipt.lease.is_some());
        let status = persona_status(&log, &binding, now).await.unwrap();
        assert_eq!(status.state, PersonaLifecycleState::Idle);
        assert_eq!(status.last_run.as_deref(), Some("2026-04-24T12:30:00Z"));
        assert!(status.next_scheduled_run.is_some());
        assert_eq!(status.budget.spent_today_usd, 0.01);
    }

    // A trigger fired while paused is queued (one queued event). Resuming drains
    // the queue and returns the persona to Idle with nothing left queued.
    #[tokio::test]
    async fn paused_personas_queue_and_resume_drains_once() {
        let log = log();
        let binding = binding();
        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
        pause_persona(&log, &binding, now).await.unwrap();
        let receipt = fire_trigger(
            &log,
            &binding,
            "github",
            "pull_request",
            BTreeMap::from([
                ("repository".to_string(), "burin-labs/harn".to_string()),
                ("number".to_string(), "462".to_string()),
            ]),
            PersonaRunCost::default(),
            now,
        )
        .await
        .unwrap();
        assert_eq!(receipt.status, "queued");
        assert_eq!(
            persona_status(&log, &binding, now)
                .await
                .unwrap()
                .queued_events,
            1
        );
        let status = resume_persona(&log, &binding, now + 1000).await.unwrap();
        assert_eq!(status.state, PersonaLifecycleState::Idle);
        assert_eq!(status.queued_events, 0);
    }

    // Work queued while paused keeps the cost it was queued with (0.02 USD).
    // With a 0.01 USD per-run limit, the drain on resume is rejected with reason
    // "run_usd", the error mentions run_usd, and the event stays queued.
    #[tokio::test]
    async fn resumed_queued_work_reuses_original_budget_cost() {
        let log = log();
        let mut binding = binding();
        binding.budget.run_usd = Some(0.01);
        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
        pause_persona(&log, &binding, now).await.unwrap();
        let queued = fire_trigger(
            &log,
            &binding,
            "github",
            "pull_request",
            BTreeMap::from([
                ("repository".to_string(), "burin-labs/harn".to_string()),
                ("number".to_string(), "1379".to_string()),
            ]),
            PersonaRunCost {
                cost_usd: 0.02,
                tokens: 1,
                ..Default::default()
            },
            now + 1,
        )
        .await
        .unwrap();
        assert_eq!(queued.status, "queued");

        let status = resume_persona(&log, &binding, now + 2).await.unwrap();

        assert_eq!(status.budget.reason.as_deref(), Some("run_usd"));
        assert!(status
            .last_error
            .as_deref()
            .is_some_and(|error| error.contains("run_usd")));
        assert_eq!(status.queued_events, 1);
    }

    // Re-firing a trigger with identical provider/kind/metadata is reported as
    // "duplicate" and is not granted a lease, even at a later timestamp.
    #[tokio::test]
    async fn duplicate_trigger_envelope_is_not_processed_twice() {
        let log = log();
        let binding = binding();
        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
        let metadata = BTreeMap::from([
            ("repository".to_string(), "burin-labs/harn".to_string()),
            ("number".to_string(), "462".to_string()),
        ]);
        let first = fire_trigger(
            &log,
            &binding,
            "github",
            "pull_request",
            metadata.clone(),
            PersonaRunCost::default(),
            now,
        )
        .await
        .unwrap();
        let second = fire_trigger(
            &log,
            &binding,
            "github",
            "pull_request",
            metadata,
            PersonaRunCost::default(),
            now + 1000,
        )
        .await
        .unwrap();
        assert_eq!(first.status, "completed");
        assert_eq!(second.status, "duplicate");
        assert!(second.lease.is_none());
    }

    // Triggers delivered to a disabled persona are dead-lettered. The status stays
    // Disabled and counts one disabled event.
    #[tokio::test]
    async fn disabled_personas_dead_letter_events() {
        let log = log();
        let binding = binding();
        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
        disable_persona(&log, &binding, now).await.unwrap();
        let receipt = fire_trigger(
            &log,
            &binding,
            "slack",
            "message",
            BTreeMap::from([
                ("channel".to_string(), "C123".to_string()),
                ("ts".to_string(), "1713988800.000100".to_string()),
            ]),
            PersonaRunCost::default(),
            now,
        )
        .await
        .unwrap();
        assert_eq!(receipt.status, "dead_lettered");
        let status = persona_status(&log, &binding, now).await.unwrap();
        assert_eq!(status.state, PersonaLifecycleState::Disabled);
        assert_eq!(status.disabled_events, 1);
    }

    // A 0.02 USD run against a 0.01 USD daily limit is refused with status
    // "budget_exhausted". The status then reports daily_usd as the exhaustion
    // reason and last_error mentions it.
    #[tokio::test]
    async fn budget_exhaustion_blocks_expensive_work() {
        let log = log();
        let mut binding = binding();
        binding.budget.daily_usd = Some(0.01);
        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
        let receipt = fire_trigger(
            &log,
            &binding,
            "linear",
            "issue",
            BTreeMap::from([("issue_key".to_string(), "HAR-462".to_string())]),
            PersonaRunCost {
                cost_usd: 0.02,
                tokens: 1,
                ..Default::default()
            },
            now,
        )
        .await
        .unwrap();
        assert_eq!(receipt.status, "budget_exhausted");
        let status = persona_status(&log, &binding, now).await.unwrap();
        assert_eq!(status.budget.reason.as_deref(), Some("daily_usd"));
        assert!(status.budget.exhausted);
        assert!(status.last_error.as_deref().unwrap().contains("daily_usd"));
    }

    // A run that took one deterministic step and avoided 0.0042 USD of model cost
    // emits a DeterministicExecution value event for that run_id. The event has
    // zero paid cost, the avoided cost and the caller metadata, and it is also
    // persisted to the event log as "persona.value.deterministic_execution".
    #[tokio::test]
    async fn deterministic_predicate_hit_emits_value_event_with_avoided_cost() {
        let log = log();
        let binding = binding();
        let captured = Arc::new(Mutex::new(Vec::new()));
        // Held for the whole test; presumably dropping it unregisters the sink.
        let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
            events: captured.clone(),
        }));
        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();

        let receipt = fire_trigger(
            &log,
            &binding,
            "github",
            "pull_request",
            BTreeMap::from([
                ("repository".to_string(), "burin-labs/harn".to_string()),
                ("number".to_string(), "715".to_string()),
            ]),
            PersonaRunCost {
                avoided_cost_usd: 0.0042,
                deterministic_steps: 1,
                metadata: json!({
                    "predicate": "pr_already_green",
                    "would_have_called_model": "gpt-5.4-mini",
                }),
                ..Default::default()
            },
            now,
        )
        .await
        .unwrap();

        // Match on run_id so events from other tests sharing the sink are ignored.
        let run_id = receipt.run_id.expect("completed run has run_id");
        let events = captured.lock().unwrap().clone();
        let deterministic = events
            .iter()
            .find(|event| {
                event.kind == PersonaValueEventKind::DeterministicExecution
                    && event.run_id == Some(run_id)
            })
            .expect("deterministic execution value event");
        assert_eq!(deterministic.persona_id, "merge_captain");
        assert_eq!(
            deterministic.template_ref.as_deref(),
            Some("software_factory@v0")
        );
        assert_eq!(deterministic.run_id, Some(run_id));
        assert_eq!(deterministic.paid_cost_usd, 0.0);
        assert_eq!(deterministic.avoided_cost_usd, 0.0042);
        assert_eq!(deterministic.deterministic_steps, 1);
        assert_eq!(
            deterministic.metadata["predicate"].as_str(),
            Some("pr_already_green")
        );

        let persisted = read_persona_events(&log, &binding.name).await.unwrap();
        assert!(persisted.iter().any(|(_, event)| {
            event.kind == "persona.value.deterministic_execution"
                && event.payload["avoided_cost_usd"] == json!(0.0042)
        }));
    }

    // A run with one frontier escalation emits a FrontierEscalation value event
    // carrying the 0.011 USD paid cost and the escalation metadata. The
    // RunCompleted event for the same run reports a paid cost of 0.0.
    // NOTE(review): presumably this avoids double-counting the spend; confirm.
    #[tokio::test]
    async fn frontier_escalation_run_emits_value_event_with_paid_cost() {
        let log = log();
        let binding = binding();
        let captured = Arc::new(Mutex::new(Vec::new()));
        let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
            events: captured.clone(),
        }));
        let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();

        let receipt = fire_trigger(
            &log,
            &binding,
            "linear",
            "issue",
            BTreeMap::from([("issue_key".to_string(), "HAR-715".to_string())]),
            PersonaRunCost {
                cost_usd: 0.011,
                tokens: 20,
                llm_steps: 1,
                frontier_escalations: 1,
                metadata: json!({
                    "frontier_model": "gpt-5.4",
                    "escalation_reason": "high_risk_merge",
                }),
                ..Default::default()
            },
            now,
        )
        .await
        .unwrap();

        let run_id = receipt.run_id.expect("completed run has run_id");
        let events = captured.lock().unwrap().clone();
        let escalation = events
            .iter()
            .find(|event| {
                event.kind == PersonaValueEventKind::FrontierEscalation
                    && event.run_id == Some(run_id)
            })
            .expect("frontier escalation value event");
        assert_eq!(escalation.run_id, Some(run_id));
        assert_eq!(escalation.paid_cost_usd, 0.011);
        assert_eq!(escalation.avoided_cost_usd, 0.0);
        assert_eq!(escalation.llm_steps, 1);
        assert_eq!(
            escalation.metadata["frontier_model"].as_str(),
            Some("gpt-5.4")
        );

        let completion = events
            .iter()
            .find(|event| {
                event.kind == PersonaValueEventKind::RunCompleted && event.run_id == Some(run_id)
            })
            .expect("run completed value event");
        assert_eq!(completion.paid_cost_usd, 0.0);
    }

    // Test supervision sink: appends a clone of every `PersonaSupervisionEvent`
    // it receives to a shared vector.
    struct CapturingSupervisionSink {
        events: Arc<Mutex<Vec<PersonaSupervisionEvent>>>,
    }

    impl PersonaSupervisionSink for CapturingSupervisionSink {
        fn handle_supervision_event(&self, event: &PersonaSupervisionEvent) {
            self.events.lock().unwrap().push(event.clone());
        }
    }

    // Trigger metadata identifying a GitHub pull request.
    fn pr_metadata(repository: &str, number: &str) -> BTreeMap<String, String> {
        BTreeMap::from([
            ("repository".to_string(), repository.to_string()),
            ("number".to_string(), number.to_string()),
        ])
    }

    // Baseline binding under a caller-chosen name. Supervision tests use unique
    // names and filter captured events by persona, presumably because the sink
    // registry is shared across concurrently running tests.
    fn binding_named(name: &str) -> PersonaRuntimeBinding {
        PersonaRuntimeBinding {
            name: name.to_string(),
            ..binding()
        }
    }

    // Returns the captured supervision events (of every variant) whose
    // `persona_id` equals `persona`, in capture order.
    fn supervision_events_for(
        captured: &Arc<Mutex<Vec<PersonaSupervisionEvent>>>,
        persona: &str,
    ) -> Vec<PersonaSupervisionEvent> {
        captured
            .lock()
            .unwrap()
            .iter()
            .filter(|event| match event {
                PersonaSupervisionEvent::QueuePosition(update) => update.persona_id == persona,
                PersonaSupervisionEvent::RepairWorkerStatus(update) => update.persona_id == persona,
                PersonaSupervisionEvent::Receipt(update) => update.persona_id == persona,
                PersonaSupervisionEvent::Checkpoint(update) => update.persona_id == persona,
            })
            .cloned()
            .collect()
    }

    // Drives a fixed scenario against a fresh log, at `now` through `now + 3`:
    // pause, a queued PR trigger, resume, a checkpoint restore and a repair-worker
    // "Running" report. Results are only unwrapped; callers inspect the events
    // their registered sinks captured.
    async fn drive_pause_then_resume(binding: &PersonaRuntimeBinding, now: i64) {
        let log = log();
        pause_persona(&log, binding, now).await.unwrap();
        let _ = fire_trigger(
            &log,
            binding,
            "github",
            "pull_request",
            pr_metadata("burin-labs/harn", "1480"),
            PersonaRunCost::default(),
            now,
        )
        .await
        .unwrap();
        let _ = resume_persona(&log, binding, now + 1).await.unwrap();
        let _ = restore_persona_checkpoint(
            &log,
            binding,
            PersonaCheckpointRestoreRequest {
                checkpoint_id: "cp_42".to_string(),
                work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
                resumed_from: Some(PersonaCheckpointResume {
                    note: Some("resumed from cp 42".to_string()),
                    ..Default::default()
                }),
            },
            now + 2,
        )
        .await
        .unwrap();
        let _ = report_repair_worker_status(
            &log,
            binding,
            PersonaRepairWorkerStatusUpdate {
                persona_id: String::new(),
                template_ref: None,
                repair_worker_id: "rw_42".to_string(),
                lifecycle: PersonaRepairWorkerLifecycle::Running,
                work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
                lease_id: Some("persona_lease_xyz".to_string()),
                scratchpad_url: Some("https://factory.local/rw_42".to_string()),
                last_heartbeat_ms: 0,
                occurred_at_ms: 0,
            },
            now + 3,
        )
        .await
        .unwrap();
    }

    // Pause, trigger, resume produces two QueuePosition events: enqueue at
    // position/depth 1, then drain to 0/0. It also produces two Receipt events,
    // "queued" then "completed", each tagged with the persona and template.
    #[tokio::test]
    async fn supervision_sink_emits_queue_position_and_receipt() {
        let captured = Arc::new(Mutex::new(Vec::new()));
        let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
            events: captured.clone(),
        }));
        let log = log();
        let binding = binding_named("supervision_sink_emits_queue_position_and_receipt");
        let now = parse_rfc3339_ms("2026-05-01T00:00:00Z").unwrap();

        pause_persona(&log, &binding, now).await.unwrap();
        fire_trigger(
            &log,
            &binding,
            "github",
            "pull_request",
            pr_metadata("burin-labs/harn", "1480"),
            PersonaRunCost::default(),
            now + 100,
        )
        .await
        .unwrap();
        resume_persona(&log, &binding, now + 200).await.unwrap();

        let events = supervision_events_for(&captured, &binding.name);
        let queue_events: Vec<_> = events
            .iter()
            .filter_map(|event| match event {
                PersonaSupervisionEvent::QueuePosition(update) => Some(update.clone()),
                _ => None,
            })
            .collect();
        assert_eq!(queue_events.len(), 2, "enqueue + drain emitted");
        assert_eq!(queue_events[0].position, 1);
        assert_eq!(queue_events[0].queue_depth, 1);
        assert_eq!(queue_events[1].position, 0);
        assert_eq!(queue_events[1].queue_depth, 0);
        let receipt_events: Vec<_> = events
            .iter()
            .filter_map(|event| match event {
                PersonaSupervisionEvent::Receipt(update) => Some(update.clone()),
                _ => None,
            })
            .collect();
        assert_eq!(receipt_events.len(), 2, "queued + drained receipt");
        assert_eq!(receipt_events[0].receipt.status, "queued");
        assert_eq!(receipt_events[1].receipt.status, "completed");
        for event in &receipt_events {
            assert_eq!(event.receipt.persona, binding.name);
            assert_eq!(event.persona_id, binding.name);
            assert_eq!(event.template_ref.as_deref(), Some("software_factory@v0"));
        }
    }

    // Re-reporting the same lifecycle for a repair worker returns `false` and
    // emits nothing new. A changed lifecycle returns `true` and emits again, so the
    // sink sees exactly [Running, Succeeded].
    #[tokio::test]
    async fn supervision_sink_emits_repair_worker_status_idempotently() {
        let captured = Arc::new(Mutex::new(Vec::new()));
        let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
            events: captured.clone(),
        }));
        let log = log();
        let binding = binding_named("supervision_sink_emits_repair_worker_status_idempotently");
        let now = parse_rfc3339_ms("2026-05-01T01:00:00Z").unwrap();
        let update = PersonaRepairWorkerStatusUpdate {
            persona_id: String::new(),
            template_ref: None,
            repair_worker_id: "rw_test".to_string(),
            lifecycle: PersonaRepairWorkerLifecycle::Running,
            work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
            lease_id: Some("persona_lease_abc".to_string()),
            scratchpad_url: Some("https://factory.local/rw_test".to_string()),
            last_heartbeat_ms: 0,
            occurred_at_ms: 0,
        };
        let first = report_repair_worker_status(&log, &binding, update.clone(), now)
            .await
            .unwrap();
        let second = report_repair_worker_status(&log, &binding, update.clone(), now + 5)
            .await
            .unwrap();
        assert!(first);
        assert!(!second, "second identical lifecycle is idempotent");

        let mut next = update.clone();
        next.lifecycle = PersonaRepairWorkerLifecycle::Succeeded;
        let third = report_repair_worker_status(&log, &binding, next, now + 10)
            .await
            .unwrap();
        assert!(third);

        let kinds: Vec<_> = supervision_events_for(&captured, &binding.name)
            .into_iter()
            .filter_map(|event| match event {
                PersonaSupervisionEvent::RepairWorkerStatus(update) => Some(update.lifecycle),
                _ => None,
            })
            .collect();
        assert_eq!(
            kinds,
            vec![
                PersonaRepairWorkerLifecycle::Running,
                PersonaRepairWorkerLifecycle::Succeeded
            ]
        );
    }

    // The first restore of "cp_1" is acked, with resume coordinates filled in from
    // status (last_run_ms equals the earlier trigger time). A second restore of the
    // same checkpoint is not acked and returns the original update
    // (occurred_at_ms = now + 100). Only one Checkpoint event reaches the sink.
    #[tokio::test]
    async fn supervision_sink_emits_checkpoint_restore_ack() {
        let captured = Arc::new(Mutex::new(Vec::new()));
        let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
            events: captured.clone(),
        }));
        let log = log();
        let binding = binding_named("supervision_sink_emits_checkpoint_restore_ack");
        let now = parse_rfc3339_ms("2026-05-01T02:00:00Z").unwrap();
        fire_trigger(
            &log,
            &binding,
            "github",
            "pull_request",
            pr_metadata("burin-labs/harn", "1480"),
            PersonaRunCost::default(),
            now,
        )
        .await
        .unwrap();

        let outcome = restore_persona_checkpoint(
            &log,
            &binding,
            PersonaCheckpointRestoreRequest {
                checkpoint_id: "cp_1".to_string(),
                work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
                resumed_from: None,
            },
            now + 100,
        )
        .await
        .unwrap();
        assert!(outcome.acked);
        assert_eq!(outcome.update.checkpoint_id, "cp_1");
        let resume = outcome
            .update
            .resumed_from
            .as_ref()
            .expect("resume coordinates default-derived from status");
        assert_eq!(resume.last_run_ms, Some(now));

        let replay = restore_persona_checkpoint(
            &log,
            &binding,
            PersonaCheckpointRestoreRequest {
                checkpoint_id: "cp_1".to_string(),
                work_key: None,
                resumed_from: None,
            },
            now + 200,
        )
        .await
        .unwrap();
        assert!(!replay.acked, "duplicate restore is a no-op ack");
        assert_eq!(replay.update.occurred_at_ms, now + 100);

        let ack_events: Vec<_> = supervision_events_for(&captured, &binding.name)
            .into_iter()
            .filter_map(|event| match event {
                PersonaSupervisionEvent::Checkpoint(update) => Some(update),
                _ => None,
            })
            .collect();
        assert_eq!(ack_events.len(), 1, "ack emitted once, replay suppressed");
        assert_eq!(ack_events[0].action, PersonaCheckpointAction::RestoreAcked);
    }

    // Runs `drive_pause_then_resume` twice from the same paused, recorded clock.
    // After normalizing per-run identifiers, it asserts that both runs emit
    // identical supervision events and record identical clock observations.
    #[tokio::test]
    async fn supervision_sink_replay_is_deterministic_under_recorded_clock() {
        use harn_clock::{ClockEventLog, PausedClock, RecordedClock};
        use time::OffsetDateTime;

        let now_ms = parse_rfc3339_ms("2026-05-01T03:00:00Z").unwrap();
        // One replay: register a fresh capturing sink, read the start time from a
        // RecordedClock wrapping a PausedClock, drive the scenario, and return the
        // captured events plus the clock's observation log.
        async fn drive(now_ms: i64) -> (Vec<PersonaSupervisionEvent>, Vec<harn_clock::ClockEvent>) {
            let captured = Arc::new(Mutex::new(Vec::new()));
            let _registration =
                register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
                    events: captured.clone(),
                }));
            let paused = PausedClock::new(
                OffsetDateTime::from_unix_timestamp_nanos((now_ms as i128) * 1_000_000).unwrap(),
            );
            let recorded = Arc::new(RecordedClock::new(paused, Arc::new(ClockEventLog::new())));
            let binding = binding_named("supervision_replay_persona");
            let ts = harn_clock::now_wall_ms(&*recorded);
            drive_pause_then_resume(&binding, ts).await;
            let clock_log = recorded.log().snapshot();
            let events = supervision_events_for(&captured, &binding.name);
            (events, clock_log)
        }

        let (events_a, clock_a) = drive(now_ms).await;
        let (events_b, clock_b) = drive(now_ms).await;
        // Run receipts carry per-run `run_id`s (UUIDv7) and lease ids that
        // differ across runs. Normalize them so the deterministic envelope
        // remains comparable.
        fn normalize(event: &PersonaSupervisionEvent) -> PersonaSupervisionEvent {
            match event.clone() {
                PersonaSupervisionEvent::Receipt(mut update) => {
                    update.receipt.run_id = None;
                    if let Some(lease) = update.receipt.lease.as_mut() {
                        lease.id = "lease".to_string();
                    }
                    update.receipt.budget_receipt_id = update
                        .receipt
                        .budget_receipt_id
                        .map(|_| "budget".to_string());
                    PersonaSupervisionEvent::Receipt(update)
                }
                PersonaSupervisionEvent::Checkpoint(mut update) => {
                    if let Some(resume) = update.resumed_from.as_mut() {
                        resume.run_id = None;
                        resume.lease_id = None;
                    }
                    PersonaSupervisionEvent::Checkpoint(update)
                }
                other => other,
            }
        }
        let a: Vec<_> = events_a.iter().map(normalize).collect();
        let b: Vec<_> = events_b.iter().map(normalize).collect();
        assert_eq!(a, b, "supervision sink emits identical event envelopes");
        assert_eq!(
            clock_a, clock_b,
            "recorded clock observation log is identical across replays"
        );
    }
}