// harn_vm/personas.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::{Arc, OnceLock, RwLock};
4
5use chrono::{TimeZone, Utc};
6use croner::Cron;
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use time::format_description::well_known::Rfc3339;
10use time::OffsetDateTime;
11use uuid::Uuid;
12
13use crate::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
14
/// Topic name for persona runtime events.
///
/// NOTE(review): the code that opens this topic is outside this excerpt;
/// presumably every persona event is appended under it — confirm.
pub const PERSONA_RUNTIME_TOPIC: &str = "persona.runtime.events";

/// Lease time-to-live in milliseconds (five minutes) that `run_for_envelope`
/// passes to `acquire_lease` when it starts a run.
const DEFAULT_LEASE_TTL_MS: i64 = 5 * 60 * 1000;
18
19#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "snake_case")]
21pub enum PersonaLifecycleState {
22    Inactive,
23    Starting,
24    #[default]
25    Idle,
26    Running,
27    Paused,
28    Draining,
29    Failed,
30    Disabled,
31}
32
33impl PersonaLifecycleState {
34    pub fn as_str(self) -> &'static str {
35        match self {
36            Self::Inactive => "inactive",
37            Self::Starting => "starting",
38            Self::Idle => "idle",
39            Self::Running => "running",
40            Self::Paused => "paused",
41            Self::Draining => "draining",
42            Self::Failed => "failed",
43            Self::Disabled => "disabled",
44        }
45    }
46}
47
48#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
49pub struct PersonaBudgetPolicy {
50    pub daily_usd: Option<f64>,
51    pub hourly_usd: Option<f64>,
52    pub run_usd: Option<f64>,
53    pub max_tokens: Option<u64>,
54}
55
56#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
57pub struct PersonaRuntimeBinding {
58    pub name: String,
59    #[serde(default)]
60    pub template_ref: Option<String>,
61    pub entry_workflow: String,
62    #[serde(default)]
63    pub schedules: Vec<String>,
64    #[serde(default)]
65    pub triggers: Vec<String>,
66    #[serde(default)]
67    pub budget: PersonaBudgetPolicy,
68}
69
70#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
71pub struct PersonaLease {
72    pub id: String,
73    pub holder: String,
74    pub work_key: String,
75    pub acquired_at_ms: i64,
76    pub expires_at_ms: i64,
77}
78
79#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
80pub struct PersonaBudgetStatus {
81    pub daily_usd: Option<f64>,
82    pub hourly_usd: Option<f64>,
83    pub run_usd: Option<f64>,
84    pub max_tokens: Option<u64>,
85    pub spent_today_usd: f64,
86    pub spent_this_hour_usd: f64,
87    pub spent_last_run_usd: f64,
88    pub tokens_today: u64,
89    pub remaining_today_usd: Option<f64>,
90    pub remaining_hour_usd: Option<f64>,
91    pub exhausted: bool,
92    pub reason: Option<String>,
93    pub last_receipt_id: Option<String>,
94}
95
96#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
97pub struct PersonaStatus {
98    pub name: String,
99    #[serde(default)]
100    pub template_ref: Option<String>,
101    pub state: PersonaLifecycleState,
102    pub entry_workflow: String,
103    #[serde(default)]
104    pub role: String,
105    #[serde(default)]
106    pub current_assignment: Option<PersonaAssignmentStatus>,
107    pub last_run: Option<String>,
108    pub next_scheduled_run: Option<String>,
109    pub active_lease: Option<PersonaLease>,
110    pub budget: PersonaBudgetStatus,
111    pub last_error: Option<String>,
112    pub queued_events: usize,
113    #[serde(default)]
114    pub queued_work: Vec<PersonaQueuedWork>,
115    #[serde(default)]
116    pub handoff_inbox: Vec<PersonaHandoffInboxItem>,
117    #[serde(default)]
118    pub value_receipts: Vec<PersonaValueReceipt>,
119    pub disabled_events: usize,
120    pub paused_event_policy: String,
121}
122
123#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
124pub struct PersonaAssignmentStatus {
125    pub work_key: String,
126    pub lease_id: String,
127    pub holder: String,
128    pub acquired_at: String,
129    pub expires_at: String,
130}
131
132#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
133pub struct PersonaQueuedWork {
134    pub work_key: String,
135    pub provider: String,
136    pub kind: String,
137    pub queued_at: String,
138    pub reason: String,
139    pub source_event_id: Option<String>,
140    #[serde(default)]
141    pub metadata: BTreeMap<String, String>,
142}
143
144#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
145pub struct PersonaHandoffInboxItem {
146    pub work_key: String,
147    pub handoff_id: Option<String>,
148    pub handoff_kind: Option<String>,
149    pub source_persona: Option<String>,
150    pub task: Option<String>,
151    pub queued_at: String,
152    pub reason: String,
153}
154
155#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
156pub struct PersonaValueReceipt {
157    pub kind: PersonaValueEventKind,
158    pub run_id: Option<Uuid>,
159    pub occurred_at: String,
160    pub paid_cost_usd: f64,
161    pub avoided_cost_usd: f64,
162    pub deterministic_steps: i64,
163    pub llm_steps: i64,
164    pub metadata: serde_json::Value,
165}
166
167#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
168pub struct PersonaTriggerEnvelope {
169    pub provider: String,
170    pub kind: String,
171    pub subject_key: String,
172    pub source_event_id: Option<String>,
173    pub received_at_ms: i64,
174    #[serde(default)]
175    pub metadata: BTreeMap<String, String>,
176    #[serde(default)]
177    pub raw: serde_json::Value,
178}
179
180#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
181pub struct PersonaRunReceipt {
182    pub status: String,
183    pub persona: String,
184    #[serde(default)]
185    pub run_id: Option<Uuid>,
186    pub work_key: String,
187    pub lease: Option<PersonaLease>,
188    pub queued: bool,
189    pub error: Option<String>,
190    pub budget_receipt_id: Option<String>,
191}
192
193#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
194pub struct PersonaRunCost {
195    pub cost_usd: f64,
196    pub tokens: u64,
197    #[serde(default)]
198    pub avoided_cost_usd: f64,
199    #[serde(default)]
200    pub deterministic_steps: i64,
201    #[serde(default)]
202    pub llm_steps: i64,
203    #[serde(default)]
204    pub frontier_escalations: i64,
205    #[serde(default)]
206    pub metadata: serde_json::Value,
207}
208
209#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
210#[serde(rename_all = "snake_case")]
211pub enum PersonaValueEventKind {
212    RunStarted,
213    RunCompleted,
214    AcceptedOutcome,
215    FrontierEscalation,
216    DeterministicExecution,
217    PromotionSavings,
218    ApprovalWait,
219}
220
221impl PersonaValueEventKind {
222    pub fn as_str(self) -> &'static str {
223        match self {
224            Self::RunStarted => "run_started",
225            Self::RunCompleted => "run_completed",
226            Self::AcceptedOutcome => "accepted_outcome",
227            Self::FrontierEscalation => "frontier_escalation",
228            Self::DeterministicExecution => "deterministic_execution",
229            Self::PromotionSavings => "promotion_savings",
230            Self::ApprovalWait => "approval_wait",
231        }
232    }
233}
234
235#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
236pub struct PersonaValueEvent {
237    pub persona_id: String,
238    pub template_ref: Option<String>,
239    pub run_id: Option<Uuid>,
240    pub kind: PersonaValueEventKind,
241    pub paid_cost_usd: f64,
242    pub avoided_cost_usd: f64,
243    pub deterministic_steps: i64,
244    pub llm_steps: i64,
245    pub metadata: serde_json::Value,
246    pub occurred_at: OffsetDateTime,
247}
248
249pub trait PersonaValueSink: Send + Sync {
250    fn handle_value_event(&self, event: &PersonaValueEvent);
251}
252
253type PersonaValueSinkRegistry = RwLock<Vec<(u64, Arc<dyn PersonaValueSink>)>>;
254
255fn persona_value_sinks() -> &'static PersonaValueSinkRegistry {
256    static REGISTRY: OnceLock<PersonaValueSinkRegistry> = OnceLock::new();
257    REGISTRY.get_or_init(|| RwLock::new(Vec::new()))
258}
259
/// Hands out a fresh sink id, starting at 1 and increasing by one per call.
///
/// `Relaxed` ordering is enough here: the counter only has to produce
/// distinct values and does not order any other memory accesses.
fn next_persona_value_sink_id() -> u64 {
    static NEXT_ID: AtomicU64 = AtomicU64::new(1);
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}
264
/// Guard returned by `register_persona_value_sink`.
///
/// Dropping the guard removes the corresponding sink from the registry, so
/// it must be kept alive for as long as the sink should receive events.
#[must_use = "dropping the registration immediately unregisters the sink"]
pub struct PersonaValueSinkRegistration {
    /// Registry id of the sink this guard removes on drop.
    id: u64,
}
268
269impl Drop for PersonaValueSinkRegistration {
270    fn drop(&mut self) {
271        if let Ok(mut sinks) = persona_value_sinks().write() {
272            sinks.retain(|(id, _)| *id != self.id);
273        }
274    }
275}
276
277pub fn register_persona_value_sink(
278    sink: Arc<dyn PersonaValueSink>,
279) -> PersonaValueSinkRegistration {
280    let id = next_persona_value_sink_id();
281    if let Ok(mut sinks) = persona_value_sinks().write() {
282        sinks.push((id, sink));
283    }
284    PersonaValueSinkRegistration { id }
285}
286
287pub async fn persona_status(
288    log: &Arc<AnyEventLog>,
289    binding: &PersonaRuntimeBinding,
290    now_ms: i64,
291) -> Result<PersonaStatus, String> {
292    let events = read_persona_events(log, &binding.name).await?;
293    let mut state = PersonaLifecycleState::Idle;
294    let mut last_run_ms = None;
295    let mut active_lease = None;
296    let mut last_error = None;
297    let mut queued = BTreeSet::<String>::new();
298    let mut completed = BTreeSet::<String>::new();
299    let mut disabled_events = 0usize;
300    let mut budget_receipt = None;
301    let mut budget_exhaustion_reason = None;
302    let mut spent = Vec::<(i64, f64, u64)>::new();
303    let mut queued_work = BTreeMap::<String, PersonaQueuedWork>::new();
304    let mut value_receipts = Vec::<PersonaValueReceipt>::new();
305
306    for (_, event) in events {
307        match event.kind.as_str() {
308            "persona.control.paused" => state = PersonaLifecycleState::Paused,
309            "persona.control.resumed" => state = PersonaLifecycleState::Idle,
310            "persona.control.disabled" => state = PersonaLifecycleState::Disabled,
311            "persona.control.draining" => state = PersonaLifecycleState::Draining,
312            "persona.lease.acquired" => {
313                if let Ok(lease) = serde_json::from_value::<PersonaLease>(event.payload.clone()) {
314                    active_lease = Some(lease);
315                    state = PersonaLifecycleState::Running;
316                }
317            }
318            "persona.lease.released" => {
319                active_lease = None;
320                if !matches!(
321                    state,
322                    PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
323                ) {
324                    state = PersonaLifecycleState::Idle;
325                }
326            }
327            "persona.lease.expired" => {
328                active_lease = None;
329                if !matches!(
330                    state,
331                    PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
332                ) {
333                    state = PersonaLifecycleState::Idle;
334                }
335            }
336            "persona.run.started" => state = PersonaLifecycleState::Running,
337            "persona.run.completed" => {
338                last_run_ms = event
339                    .payload
340                    .get("completed_at_ms")
341                    .and_then(serde_json::Value::as_i64)
342                    .or(Some(event.occurred_at_ms));
343                if let Some(work_key) = event
344                    .payload
345                    .get("work_key")
346                    .and_then(serde_json::Value::as_str)
347                {
348                    completed.insert(work_key.to_string());
349                }
350                if !matches!(
351                    state,
352                    PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
353                ) {
354                    state = PersonaLifecycleState::Idle;
355                }
356            }
357            "persona.run.failed" => {
358                state = PersonaLifecycleState::Failed;
359                last_error = event
360                    .payload
361                    .get("error")
362                    .and_then(serde_json::Value::as_str)
363                    .map(ToString::to_string);
364            }
365            "persona.trigger.queued" => {
366                if let Some(work_key) = event
367                    .payload
368                    .get("work_key")
369                    .and_then(serde_json::Value::as_str)
370                {
371                    queued.insert(work_key.to_string());
372                }
373                if let Some(item) = queued_work_from_event(&event)? {
374                    queued_work.insert(item.work_key.clone(), item);
375                }
376            }
377            "persona.trigger.dead_lettered" => disabled_events += 1,
378            "persona.budget.recorded" => {
379                budget_receipt = event
380                    .payload
381                    .get("receipt_id")
382                    .and_then(serde_json::Value::as_str)
383                    .map(ToString::to_string);
384                spent.push((
385                    event.occurred_at_ms,
386                    event
387                        .payload
388                        .get("cost_usd")
389                        .and_then(serde_json::Value::as_f64)
390                        .unwrap_or_default(),
391                    event
392                        .payload
393                        .get("tokens")
394                        .and_then(serde_json::Value::as_u64)
395                        .unwrap_or_default(),
396                ));
397            }
398            "persona.budget.exhausted" => {
399                budget_exhaustion_reason = event
400                    .payload
401                    .get("reason")
402                    .and_then(serde_json::Value::as_str)
403                    .map(ToString::to_string);
404                last_error = budget_exhaustion_reason
405                    .as_ref()
406                    .map(|reason| format!("persona budget exhausted: {reason}"));
407                budget_receipt = event
408                    .payload
409                    .get("receipt_id")
410                    .and_then(serde_json::Value::as_str)
411                    .map(ToString::to_string);
412            }
413            kind if kind.starts_with("persona.value.") => {
414                if let Some(receipt) = value_receipt_from_event(&event)? {
415                    value_receipts.push(receipt);
416                }
417            }
418            _ => {}
419        }
420    }
421
422    if let Some(lease) = active_lease.as_ref() {
423        if lease.expires_at_ms <= now_ms {
424            active_lease = None;
425            if !matches!(
426                state,
427                PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
428            ) {
429                state = PersonaLifecycleState::Idle;
430            }
431        }
432    }
433
434    queued.retain(|work_key| !completed.contains(work_key));
435    queued_work.retain(|work_key, _| !completed.contains(work_key));
436    let queued_work = queued_work.into_values().collect::<Vec<_>>();
437    let handoff_inbox = queued_work
438        .iter()
439        .filter_map(handoff_inbox_item)
440        .collect::<Vec<_>>();
441
442    let mut budget = budget_status(&binding.budget, &spent, now_ms);
443    if budget.reason.is_none() {
444        if let Some(reason) = budget_exhaustion_reason {
445            budget.exhausted = true;
446            budget.reason = Some(reason);
447        }
448    }
449    if budget.last_receipt_id.is_none() {
450        budget.last_receipt_id = budget_receipt;
451    }
452
453    let current_assignment = active_lease.as_ref().map(assignment_status_from_lease);
454
455    Ok(PersonaStatus {
456        name: binding.name.clone(),
457        template_ref: binding.template_ref.clone(),
458        state,
459        entry_workflow: binding.entry_workflow.clone(),
460        role: binding.name.clone(),
461        current_assignment,
462        last_run: last_run_ms.map(format_ms),
463        next_scheduled_run: next_scheduled_run(binding, last_run_ms, now_ms),
464        active_lease,
465        budget,
466        last_error,
467        queued_events: queued.len(),
468        queued_work,
469        handoff_inbox,
470        value_receipts,
471        disabled_events,
472        paused_event_policy: "queue_then_drain_on_resume".to_string(),
473    })
474}
475
476pub async fn pause_persona(
477    log: &Arc<AnyEventLog>,
478    binding: &PersonaRuntimeBinding,
479    now_ms: i64,
480) -> Result<PersonaStatus, String> {
481    append_persona_event(
482        log,
483        &binding.name,
484        "persona.control.paused",
485        json!({"paused_at_ms": now_ms, "policy": "queue_then_drain_on_resume"}),
486        now_ms,
487    )
488    .await?;
489    persona_status(log, binding, now_ms).await
490}
491
492pub async fn resume_persona(
493    log: &Arc<AnyEventLog>,
494    binding: &PersonaRuntimeBinding,
495    now_ms: i64,
496) -> Result<PersonaStatus, String> {
497    append_persona_event(
498        log,
499        &binding.name,
500        "persona.control.resumed",
501        json!({"resumed_at_ms": now_ms, "drain": true}),
502        now_ms,
503    )
504    .await?;
505    let queued = queued_events(log, &binding.name).await?;
506    for (envelope, cost) in queued {
507        let _ = run_for_envelope(log, binding, envelope, cost, now_ms).await?;
508    }
509    persona_status(log, binding, now_ms).await
510}
511
512pub async fn disable_persona(
513    log: &Arc<AnyEventLog>,
514    binding: &PersonaRuntimeBinding,
515    now_ms: i64,
516) -> Result<PersonaStatus, String> {
517    append_persona_event(
518        log,
519        &binding.name,
520        "persona.control.disabled",
521        json!({"disabled_at_ms": now_ms}),
522        now_ms,
523    )
524    .await?;
525    persona_status(log, binding, now_ms).await
526}
527
528pub async fn fire_schedule(
529    log: &Arc<AnyEventLog>,
530    binding: &PersonaRuntimeBinding,
531    cost: PersonaRunCost,
532    now_ms: i64,
533) -> Result<PersonaRunReceipt, String> {
534    let schedule = binding
535        .schedules
536        .first()
537        .cloned()
538        .unwrap_or_else(|| "manual".to_string());
539    let envelope = PersonaTriggerEnvelope {
540        provider: "schedule".to_string(),
541        kind: "cron.tick".to_string(),
542        subject_key: format!("schedule:{}:{schedule}:{}", binding.name, format_ms(now_ms)),
543        source_event_id: None,
544        received_at_ms: now_ms,
545        metadata: BTreeMap::from([
546            ("persona".to_string(), binding.name.clone()),
547            ("schedule".to_string(), schedule),
548            ("fired_at".to_string(), format_ms(now_ms)),
549        ]),
550        raw: json!({}),
551    };
552    append_persona_event(
553        log,
554        &binding.name,
555        "persona.schedule.fired",
556        json!({"persona": binding.name, "envelope": envelope}),
557        now_ms,
558    )
559    .await?;
560    run_for_envelope(log, binding, envelope, cost, now_ms).await
561}
562
563pub async fn fire_trigger(
564    log: &Arc<AnyEventLog>,
565    binding: &PersonaRuntimeBinding,
566    provider: &str,
567    kind: &str,
568    metadata: BTreeMap<String, String>,
569    cost: PersonaRunCost,
570    now_ms: i64,
571) -> Result<PersonaRunReceipt, String> {
572    let envelope = normalize_trigger_envelope(provider, kind, metadata, now_ms);
573    append_persona_event(
574        log,
575        &binding.name,
576        "persona.trigger.received",
577        json!({"persona": binding.name, "envelope": envelope}),
578        now_ms,
579    )
580    .await?;
581    run_for_envelope(log, binding, envelope, cost, now_ms).await
582}
583
584pub async fn record_persona_spend(
585    log: &Arc<AnyEventLog>,
586    binding: &PersonaRuntimeBinding,
587    cost: PersonaRunCost,
588    now_ms: i64,
589) -> Result<PersonaBudgetStatus, String> {
590    enforce_budget(log, binding, &cost, now_ms).await?;
591    append_budget_record(log, &binding.name, &cost, None, now_ms).await?;
592    persona_status(log, binding, now_ms)
593        .await
594        .map(|status| status.budget)
595}
596
597async fn run_for_envelope(
598    log: &Arc<AnyEventLog>,
599    binding: &PersonaRuntimeBinding,
600    envelope: PersonaTriggerEnvelope,
601    cost: PersonaRunCost,
602    now_ms: i64,
603) -> Result<PersonaRunReceipt, String> {
604    let status = persona_status(log, binding, now_ms).await?;
605    match status.state {
606        PersonaLifecycleState::Paused => {
607            append_persona_event(
608                log,
609                &binding.name,
610                "persona.trigger.queued",
611                json!({
612                    "work_key": envelope.subject_key,
613                    "envelope": envelope,
614                    "cost": cost,
615                    "reason": "paused",
616                }),
617                now_ms,
618            )
619            .await?;
620            return Ok(PersonaRunReceipt {
621                status: "queued".to_string(),
622                persona: binding.name.clone(),
623                run_id: None,
624                work_key: envelope.subject_key,
625                lease: None,
626                queued: true,
627                error: None,
628                budget_receipt_id: None,
629            });
630        }
631        PersonaLifecycleState::Disabled => {
632            append_persona_event(
633                log,
634                &binding.name,
635                "persona.trigger.dead_lettered",
636                json!({
637                    "work_key": envelope.subject_key,
638                    "envelope": envelope,
639                    "reason": "disabled",
640                }),
641                now_ms,
642            )
643            .await?;
644            return Ok(PersonaRunReceipt {
645                status: "dead_lettered".to_string(),
646                persona: binding.name.clone(),
647                run_id: None,
648                work_key: envelope.subject_key,
649                lease: None,
650                queued: false,
651                error: Some("persona is disabled".to_string()),
652                budget_receipt_id: None,
653            });
654        }
655        _ => {}
656    }
657
658    if let Err(error) = enforce_budget(log, binding, &cost, now_ms).await {
659        return Ok(PersonaRunReceipt {
660            status: "budget_exhausted".to_string(),
661            persona: binding.name.clone(),
662            run_id: None,
663            work_key: envelope.subject_key,
664            lease: None,
665            queued: false,
666            error: Some(error),
667            budget_receipt_id: None,
668        });
669    }
670
671    if work_completed(log, &binding.name, &envelope.subject_key).await? {
672        append_persona_event(
673            log,
674            &binding.name,
675            "persona.trigger.duplicate",
676            json!({
677                "work_key": envelope.subject_key,
678                "envelope": envelope,
679                "reason": "already_completed",
680            }),
681            now_ms,
682        )
683        .await?;
684        return Ok(PersonaRunReceipt {
685            status: "duplicate".to_string(),
686            persona: binding.name.clone(),
687            run_id: None,
688            work_key: envelope.subject_key,
689            lease: None,
690            queued: false,
691            error: None,
692            budget_receipt_id: None,
693        });
694    }
695
696    let Some(lease) = acquire_lease(
697        log,
698        binding,
699        &envelope.subject_key,
700        "persona-runtime",
701        DEFAULT_LEASE_TTL_MS,
702        now_ms,
703    )
704    .await?
705    else {
706        return Ok(PersonaRunReceipt {
707            status: "lease_busy".to_string(),
708            persona: binding.name.clone(),
709            run_id: None,
710            work_key: envelope.subject_key,
711            lease: status.active_lease,
712            queued: false,
713            error: Some("active lease already owns persona work".to_string()),
714            budget_receipt_id: None,
715        });
716    };
717
718    let run_id = Uuid::now_v7();
719    let value_metadata = run_value_metadata(&envelope, &lease, &cost);
720    append_persona_event(
721        log,
722        &binding.name,
723        "persona.run.started",
724        json!({
725            "work_key": envelope.subject_key,
726            "run_id": run_id,
727            "started_at_ms": now_ms,
728            "entry_workflow": binding.entry_workflow,
729            "lease_id": lease.id,
730        }),
731        now_ms,
732    )
733    .await?;
734    emit_persona_value_event(
735        log,
736        binding,
737        run_id,
738        PersonaValueEventDelta {
739            kind: PersonaValueEventKind::RunStarted,
740            metadata: value_metadata.clone(),
741            ..Default::default()
742        },
743        now_ms,
744    )
745    .await?;
746    let budget_receipt_id =
747        append_budget_record(log, &binding.name, &cost, Some(&lease.id), now_ms).await?;
748    if cost.avoided_cost_usd > 0.0 || cost.deterministic_steps > 0 {
749        emit_persona_value_event(
750            log,
751            binding,
752            run_id,
753            PersonaValueEventDelta {
754                kind: PersonaValueEventKind::DeterministicExecution,
755                avoided_cost_usd: cost.avoided_cost_usd,
756                deterministic_steps: cost.deterministic_steps.max(1),
757                metadata: value_metadata.clone(),
758                ..Default::default()
759            },
760            now_ms,
761        )
762        .await?;
763    }
764    if cost.frontier_escalations > 0 {
765        emit_persona_value_event(
766            log,
767            binding,
768            run_id,
769            PersonaValueEventDelta {
770                kind: PersonaValueEventKind::FrontierEscalation,
771                paid_cost_usd: cost.cost_usd,
772                llm_steps: cost.llm_steps.max(cost.frontier_escalations),
773                metadata: value_metadata.clone(),
774                ..Default::default()
775            },
776            now_ms,
777        )
778        .await?;
779    }
780    let completion_paid_cost = if cost.frontier_escalations > 0 {
781        0.0
782    } else {
783        cost.cost_usd
784    };
785    let completion_llm_steps = if cost.frontier_escalations > 0 {
786        0
787    } else {
788        cost.llm_steps
789    };
790    emit_persona_value_event(
791        log,
792        binding,
793        run_id,
794        PersonaValueEventDelta {
795            kind: PersonaValueEventKind::RunCompleted,
796            paid_cost_usd: completion_paid_cost,
797            llm_steps: completion_llm_steps,
798            metadata: value_metadata,
799            ..Default::default()
800        },
801        now_ms,
802    )
803    .await?;
804    append_persona_event(
805        log,
806        &binding.name,
807        "persona.run.completed",
808        json!({
809            "work_key": envelope.subject_key,
810            "run_id": run_id,
811            "completed_at_ms": now_ms,
812            "entry_workflow": binding.entry_workflow,
813            "lease_id": lease.id,
814        }),
815        now_ms,
816    )
817    .await?;
818    append_persona_event(
819        log,
820        &binding.name,
821        "persona.lease.released",
822        json!({
823            "id": lease.id,
824            "work_key": envelope.subject_key,
825            "released_at_ms": now_ms,
826        }),
827        now_ms,
828    )
829    .await?;
830    Ok(PersonaRunReceipt {
831        status: "completed".to_string(),
832        persona: binding.name.clone(),
833        run_id: Some(run_id),
834        work_key: envelope.subject_key,
835        lease: Some(lease),
836        queued: false,
837        error: None,
838        budget_receipt_id: Some(budget_receipt_id),
839    })
840}
841
842async fn acquire_lease(
843    log: &Arc<AnyEventLog>,
844    binding: &PersonaRuntimeBinding,
845    work_key: &str,
846    holder: &str,
847    ttl_ms: i64,
848    now_ms: i64,
849) -> Result<Option<PersonaLease>, String> {
850    let status = persona_status(log, binding, now_ms).await?;
851    if let Some(lease) = status.active_lease {
852        if lease.expires_at_ms > now_ms {
853            append_persona_event(
854                log,
855                &binding.name,
856                "persona.lease.conflict",
857                json!({
858                    "active_lease": lease,
859                    "requested_work_key": work_key,
860                    "at_ms": now_ms,
861                }),
862                now_ms,
863            )
864            .await?;
865            return Ok(None);
866        }
867        append_persona_event(
868            log,
869            &binding.name,
870            "persona.lease.expired",
871            json!({
872                "id": lease.id,
873                "work_key": lease.work_key,
874                "expired_at_ms": now_ms,
875            }),
876            now_ms,
877        )
878        .await?;
879    }
880
881    let lease = PersonaLease {
882        id: format!("persona_lease_{}", Uuid::now_v7()),
883        holder: holder.to_string(),
884        work_key: work_key.to_string(),
885        acquired_at_ms: now_ms,
886        expires_at_ms: now_ms + ttl_ms,
887    };
888    append_persona_event(
889        log,
890        &binding.name,
891        "persona.lease.acquired",
892        serde_json::to_value(&lease).map_err(|error| error.to_string())?,
893        now_ms,
894    )
895    .await?;
896    Ok(Some(lease))
897}
898
899async fn enforce_budget(
900    log: &Arc<AnyEventLog>,
901    binding: &PersonaRuntimeBinding,
902    cost: &PersonaRunCost,
903    now_ms: i64,
904) -> Result<(), String> {
905    let status = persona_status(log, binding, now_ms).await?;
906    let reason = if binding
907        .budget
908        .run_usd
909        .is_some_and(|limit| cost.cost_usd > limit)
910    {
911        Some("run_usd")
912    } else if binding
913        .budget
914        .daily_usd
915        .is_some_and(|limit| status.budget.spent_today_usd + cost.cost_usd > limit)
916    {
917        Some("daily_usd")
918    } else if binding
919        .budget
920        .hourly_usd
921        .is_some_and(|limit| status.budget.spent_this_hour_usd + cost.cost_usd > limit)
922    {
923        Some("hourly_usd")
924    } else if binding
925        .budget
926        .max_tokens
927        .is_some_and(|limit| status.budget.tokens_today + cost.tokens > limit)
928    {
929        Some("max_tokens")
930    } else {
931        None
932    };
933
934    if let Some(reason) = reason {
935        let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
936        append_persona_event(
937            log,
938            &binding.name,
939            "persona.budget.exhausted",
940            json!({
941                "receipt_id": receipt_id,
942                "reason": reason,
943                "attempted_cost_usd": cost.cost_usd,
944                "attempted_tokens": cost.tokens,
945                "persona": binding.name,
946            }),
947            now_ms,
948        )
949        .await?;
950        return Err(format!("persona budget exhausted: {reason}"));
951    }
952
953    Ok(())
954}
955
956async fn append_budget_record(
957    log: &Arc<AnyEventLog>,
958    persona: &str,
959    cost: &PersonaRunCost,
960    lease_id: Option<&str>,
961    now_ms: i64,
962) -> Result<String, String> {
963    let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
964    append_persona_event(
965        log,
966        persona,
967        "persona.budget.recorded",
968        json!({
969            "receipt_id": receipt_id,
970            "persona": persona,
971            "cost_usd": cost.cost_usd,
972            "tokens": cost.tokens,
973            "lease_id": lease_id,
974        }),
975        now_ms,
976    )
977    .await?;
978    Ok(receipt_id)
979}
980
981fn normalize_trigger_envelope(
982    provider: &str,
983    kind: &str,
984    metadata: BTreeMap<String, String>,
985    now_ms: i64,
986) -> PersonaTriggerEnvelope {
987    let provider = provider.to_ascii_lowercase();
988    let kind = kind.to_string();
989    let source_event_id = metadata
990        .get("event_id")
991        .or_else(|| metadata.get("id"))
992        .cloned();
993    let subject_key = match provider.as_str() {
994        "github" => {
995            let repo = metadata
996                .get("repository")
997                .or_else(|| metadata.get("repository.full_name"))
998                .cloned()
999                .unwrap_or_else(|| "unknown".to_string());
1000            if let Some(number) = metadata
1001                .get("pr")
1002                .or_else(|| metadata.get("pull_request.number"))
1003                .or_else(|| metadata.get("number"))
1004            {
1005                format!("github:{repo}:pr:{number}")
1006            } else if let Some(check) = metadata
1007                .get("check_run.name")
1008                .or_else(|| metadata.get("check_name"))
1009            {
1010                format!("github:{repo}:check:{check}")
1011            } else {
1012                format!("github:{repo}:{kind}")
1013            }
1014        }
1015        "linear" => {
1016            let issue = metadata
1017                .get("issue_key")
1018                .or_else(|| metadata.get("issue.identifier"))
1019                .or_else(|| metadata.get("issue_id"))
1020                .or_else(|| metadata.get("id"))
1021                .cloned()
1022                .unwrap_or_else(|| "unknown".to_string());
1023            format!("linear:issue:{issue}")
1024        }
1025        "slack" => {
1026            let channel = metadata
1027                .get("channel")
1028                .or_else(|| metadata.get("channel_id"))
1029                .cloned()
1030                .unwrap_or_else(|| "unknown".to_string());
1031            let ts = metadata
1032                .get("ts")
1033                .or_else(|| metadata.get("event_ts"))
1034                .cloned()
1035                .unwrap_or_else(|| "unknown".to_string());
1036            format!("slack:{channel}:{ts}")
1037        }
1038        "webhook" => metadata
1039            .get("dedupe_key")
1040            .or_else(|| metadata.get("event_id"))
1041            .map(|value| format!("webhook:{value}"))
1042            .unwrap_or_else(|| format!("webhook:{kind}:{}", Uuid::now_v7())),
1043        _ => metadata
1044            .get("dedupe_key")
1045            .or_else(|| metadata.get("event_id"))
1046            .map(|value| format!("{provider}:{kind}:{value}"))
1047            .unwrap_or_else(|| format!("{provider}:{kind}:{}", Uuid::now_v7())),
1048    };
1049
1050    PersonaTriggerEnvelope {
1051        provider,
1052        kind,
1053        subject_key,
1054        source_event_id,
1055        received_at_ms: now_ms,
1056        raw: json!({"metadata": metadata}),
1057        metadata,
1058    }
1059}
1060
1061async fn queued_events(
1062    log: &Arc<AnyEventLog>,
1063    persona: &str,
1064) -> Result<Vec<(PersonaTriggerEnvelope, PersonaRunCost)>, String> {
1065    let events = read_persona_events(log, persona).await?;
1066    let mut queued = BTreeMap::<String, (PersonaTriggerEnvelope, PersonaRunCost)>::new();
1067    let mut completed = BTreeSet::<String>::new();
1068    for (_, event) in events {
1069        match event.kind.as_str() {
1070            "persona.trigger.queued" => {
1071                let Some(envelope) = event.payload.get("envelope") else {
1072                    continue;
1073                };
1074                let envelope: PersonaTriggerEnvelope =
1075                    serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1076                let cost = event
1077                    .payload
1078                    .get("cost")
1079                    .cloned()
1080                    .map(serde_json::from_value::<PersonaRunCost>)
1081                    .transpose()
1082                    .map_err(|error| error.to_string())?
1083                    .unwrap_or_default();
1084                queued.insert(envelope.subject_key.clone(), (envelope, cost));
1085            }
1086            "persona.run.completed" => {
1087                if let Some(work_key) = event
1088                    .payload
1089                    .get("work_key")
1090                    .and_then(serde_json::Value::as_str)
1091                {
1092                    completed.insert(work_key.to_string());
1093                }
1094            }
1095            _ => {}
1096        }
1097    }
1098    queued.retain(|work_key, _| !completed.contains(work_key));
1099    Ok(queued.into_values().collect())
1100}
1101
1102fn assignment_status_from_lease(lease: &PersonaLease) -> PersonaAssignmentStatus {
1103    PersonaAssignmentStatus {
1104        work_key: lease.work_key.clone(),
1105        lease_id: lease.id.clone(),
1106        holder: lease.holder.clone(),
1107        acquired_at: format_ms(lease.acquired_at_ms),
1108        expires_at: format_ms(lease.expires_at_ms),
1109    }
1110}
1111
1112fn queued_work_from_event(event: &LogEvent) -> Result<Option<PersonaQueuedWork>, String> {
1113    let Some(envelope) = event.payload.get("envelope") else {
1114        return Ok(None);
1115    };
1116    let envelope: PersonaTriggerEnvelope =
1117        serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1118    Ok(Some(PersonaQueuedWork {
1119        work_key: envelope.subject_key,
1120        provider: envelope.provider,
1121        kind: envelope.kind,
1122        queued_at: format_ms(event.occurred_at_ms),
1123        reason: event
1124            .payload
1125            .get("reason")
1126            .and_then(serde_json::Value::as_str)
1127            .unwrap_or("queued")
1128            .to_string(),
1129        source_event_id: envelope.source_event_id,
1130        metadata: envelope.metadata,
1131    }))
1132}
1133
1134fn handoff_inbox_item(work: &PersonaQueuedWork) -> Option<PersonaHandoffInboxItem> {
1135    if work.provider != "handoff" && !work.metadata.contains_key("handoff_id") {
1136        return None;
1137    }
1138    Some(PersonaHandoffInboxItem {
1139        work_key: work.work_key.clone(),
1140        handoff_id: work.metadata.get("handoff_id").cloned(),
1141        handoff_kind: work
1142            .metadata
1143            .get("handoff_kind")
1144            .or_else(|| work.metadata.get("kind"))
1145            .cloned(),
1146        source_persona: work.metadata.get("source_persona").cloned(),
1147        task: work.metadata.get("task").cloned(),
1148        queued_at: work.queued_at.clone(),
1149        reason: work.reason.clone(),
1150    })
1151}
1152
1153fn value_receipt_from_event(event: &LogEvent) -> Result<Option<PersonaValueReceipt>, String> {
1154    let Ok(value_event) = serde_json::from_value::<PersonaValueEvent>(event.payload.clone()) else {
1155        return Ok(None);
1156    };
1157    Ok(Some(PersonaValueReceipt {
1158        kind: value_event.kind,
1159        run_id: value_event.run_id,
1160        occurred_at: value_event
1161            .occurred_at
1162            .format(&Rfc3339)
1163            .map_err(|error| error.to_string())?,
1164        paid_cost_usd: value_event.paid_cost_usd,
1165        avoided_cost_usd: value_event.avoided_cost_usd,
1166        deterministic_steps: value_event.deterministic_steps,
1167        llm_steps: value_event.llm_steps,
1168        metadata: value_event.metadata,
1169    }))
1170}
1171
1172async fn work_completed(
1173    log: &Arc<AnyEventLog>,
1174    persona: &str,
1175    work_key: &str,
1176) -> Result<bool, String> {
1177    let events = read_persona_events(log, persona).await?;
1178    Ok(events.into_iter().any(|(_, event)| {
1179        event.kind == "persona.run.completed"
1180            && event
1181                .payload
1182                .get("work_key")
1183                .and_then(serde_json::Value::as_str)
1184                == Some(work_key)
1185    }))
1186}
1187
1188async fn read_persona_events(
1189    log: &Arc<AnyEventLog>,
1190    persona: &str,
1191) -> Result<Vec<(u64, LogEvent)>, String> {
1192    let topic = runtime_topic()?;
1193    Ok(log
1194        .read_range(&topic, None, usize::MAX)
1195        .await
1196        .map_err(|error| error.to_string())?
1197        .into_iter()
1198        .filter(|(_, event)| {
1199            event
1200                .headers
1201                .get("persona")
1202                .is_some_and(|name| name == persona)
1203        })
1204        .collect())
1205}
1206
1207async fn append_persona_event(
1208    log: &Arc<AnyEventLog>,
1209    persona: &str,
1210    kind: &str,
1211    payload: serde_json::Value,
1212    now_ms: i64,
1213) -> Result<u64, String> {
1214    let mut headers = BTreeMap::new();
1215    headers.insert("persona".to_string(), persona.to_string());
1216    let event = LogEvent {
1217        kind: kind.to_string(),
1218        payload,
1219        headers,
1220        occurred_at_ms: now_ms,
1221    };
1222    log.append(&runtime_topic()?, event)
1223        .await
1224        .map_err(|error| error.to_string())
1225}
1226
1227struct PersonaValueEventDelta {
1228    kind: PersonaValueEventKind,
1229    paid_cost_usd: f64,
1230    avoided_cost_usd: f64,
1231    deterministic_steps: i64,
1232    llm_steps: i64,
1233    metadata: serde_json::Value,
1234}
1235
1236impl Default for PersonaValueEventDelta {
1237    fn default() -> Self {
1238        Self {
1239            kind: PersonaValueEventKind::RunCompleted,
1240            paid_cost_usd: 0.0,
1241            avoided_cost_usd: 0.0,
1242            deterministic_steps: 0,
1243            llm_steps: 0,
1244            metadata: serde_json::Value::Null,
1245        }
1246    }
1247}
1248
1249async fn emit_persona_value_event(
1250    log: &Arc<AnyEventLog>,
1251    binding: &PersonaRuntimeBinding,
1252    run_id: Uuid,
1253    delta: PersonaValueEventDelta,
1254    now_ms: i64,
1255) -> Result<(), String> {
1256    let event = PersonaValueEvent {
1257        persona_id: binding.name.clone(),
1258        template_ref: binding.template_ref.clone(),
1259        run_id: Some(run_id),
1260        kind: delta.kind,
1261        paid_cost_usd: delta.paid_cost_usd.max(0.0),
1262        avoided_cost_usd: delta.avoided_cost_usd.max(0.0),
1263        deterministic_steps: delta.deterministic_steps.max(0),
1264        llm_steps: delta.llm_steps.max(0),
1265        metadata: delta.metadata,
1266        occurred_at: offset_datetime_from_ms(now_ms),
1267    };
1268    append_persona_event(
1269        log,
1270        &binding.name,
1271        &format!("persona.value.{}", event.kind.as_str()),
1272        serde_json::to_value(&event).map_err(|error| error.to_string())?,
1273        now_ms,
1274    )
1275    .await?;
1276    emit_persona_value_sink_event(&event);
1277    Ok(())
1278}
1279
1280fn emit_persona_value_sink_event(event: &PersonaValueEvent) {
1281    let sinks = persona_value_sinks()
1282        .read()
1283        .map(|sinks| {
1284            sinks
1285                .iter()
1286                .map(|(_, sink)| Arc::clone(sink))
1287                .collect::<Vec<_>>()
1288        })
1289        .unwrap_or_default();
1290    for sink in sinks {
1291        sink.handle_value_event(event);
1292    }
1293}
1294
1295fn run_value_metadata(
1296    envelope: &PersonaTriggerEnvelope,
1297    lease: &PersonaLease,
1298    cost: &PersonaRunCost,
1299) -> serde_json::Value {
1300    let mut metadata = serde_json::Map::new();
1301    metadata.insert("work_key".to_string(), json!(envelope.subject_key));
1302    metadata.insert("trigger_provider".to_string(), json!(envelope.provider));
1303    metadata.insert("trigger_kind".to_string(), json!(envelope.kind));
1304    metadata.insert("lease_id".to_string(), json!(lease.id));
1305    metadata.insert("tokens".to_string(), json!(cost.tokens));
1306    if cost.frontier_escalations > 0 {
1307        metadata.insert(
1308            "frontier_escalations".to_string(),
1309            json!(cost.frontier_escalations),
1310        );
1311    }
1312    match &cost.metadata {
1313        serde_json::Value::Null => {}
1314        serde_json::Value::Object(extra) => {
1315            metadata.extend(
1316                extra
1317                    .iter()
1318                    .map(|(key, value)| (key.clone(), value.clone())),
1319            );
1320        }
1321        extra => {
1322            metadata.insert("run_cost_metadata".to_string(), extra.clone());
1323        }
1324    }
1325    serde_json::Value::Object(metadata)
1326}
1327
1328fn budget_status(
1329    policy: &PersonaBudgetPolicy,
1330    spent: &[(i64, f64, u64)],
1331    now_ms: i64,
1332) -> PersonaBudgetStatus {
1333    let day_start = now_ms - (now_ms.rem_euclid(86_400_000));
1334    let hour_start = now_ms - (now_ms.rem_euclid(3_600_000));
1335    let mut spent_today_usd = 0.0;
1336    let mut spent_this_hour_usd = 0.0;
1337    let mut tokens_today = 0u64;
1338    let mut spent_last_run_usd = 0.0;
1339    for (at_ms, cost, tokens) in spent {
1340        spent_last_run_usd = *cost;
1341        if *at_ms >= day_start {
1342            spent_today_usd += cost;
1343            tokens_today += tokens;
1344        }
1345        if *at_ms >= hour_start {
1346            spent_this_hour_usd += cost;
1347        }
1348    }
1349
1350    let remaining_today_usd = policy
1351        .daily_usd
1352        .map(|limit| (limit - spent_today_usd).max(0.0));
1353    let remaining_hour_usd = policy
1354        .hourly_usd
1355        .map(|limit| (limit - spent_this_hour_usd).max(0.0));
1356    let reason = if policy
1357        .daily_usd
1358        .is_some_and(|limit| spent_today_usd >= limit && limit >= 0.0)
1359    {
1360        Some("daily_usd".to_string())
1361    } else if policy
1362        .hourly_usd
1363        .is_some_and(|limit| spent_this_hour_usd >= limit && limit >= 0.0)
1364    {
1365        Some("hourly_usd".to_string())
1366    } else if policy
1367        .max_tokens
1368        .is_some_and(|limit| tokens_today >= limit && limit > 0)
1369    {
1370        Some("max_tokens".to_string())
1371    } else {
1372        None
1373    };
1374
1375    PersonaBudgetStatus {
1376        daily_usd: policy.daily_usd,
1377        hourly_usd: policy.hourly_usd,
1378        run_usd: policy.run_usd,
1379        max_tokens: policy.max_tokens,
1380        spent_today_usd,
1381        spent_this_hour_usd,
1382        spent_last_run_usd,
1383        tokens_today,
1384        remaining_today_usd,
1385        remaining_hour_usd,
1386        exhausted: reason.is_some(),
1387        reason,
1388        last_receipt_id: None,
1389    }
1390}
1391
1392fn next_scheduled_run(
1393    binding: &PersonaRuntimeBinding,
1394    last_run_ms: Option<i64>,
1395    now_ms: i64,
1396) -> Option<String> {
1397    binding
1398        .schedules
1399        .iter()
1400        .filter_map(|schedule| next_cron_ms(schedule, last_run_ms.unwrap_or(now_ms)).ok())
1401        .min()
1402        .map(format_ms)
1403}
1404
1405fn next_cron_ms(schedule: &str, after_ms: i64) -> Result<i64, String> {
1406    let cron = schedule
1407        .parse::<Cron>()
1408        .map_err(|error| error.to_string())?;
1409    let after = Utc
1410        .timestamp_millis_opt(after_ms)
1411        .single()
1412        .ok_or_else(|| "invalid timestamp".to_string())?;
1413    let next = cron
1414        .find_next_occurrence(&after, false)
1415        .map_err(|error| error.to_string())?;
1416    Ok(next.timestamp_millis())
1417}
1418
1419pub fn now_ms() -> i64 {
1420    OffsetDateTime::now_utc().unix_timestamp_nanos() as i64 / 1_000_000
1421}
1422
1423fn offset_datetime_from_ms(ms: i64) -> OffsetDateTime {
1424    OffsetDateTime::from_unix_timestamp_nanos((ms as i128) * 1_000_000)
1425        .unwrap_or(OffsetDateTime::UNIX_EPOCH)
1426}
1427
1428pub fn format_ms(ms: i64) -> String {
1429    offset_datetime_from_ms(ms)
1430        .format(&Rfc3339)
1431        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
1432}
1433
1434pub fn parse_rfc3339_ms(value: &str) -> Result<i64, String> {
1435    let ts = OffsetDateTime::parse(value, &Rfc3339)
1436        .map_err(|error| format!("invalid RFC3339 timestamp '{value}': {error}"))?;
1437    Ok(ts.unix_timestamp_nanos() as i64 / 1_000_000)
1438}
1439
1440fn runtime_topic() -> Result<Topic, String> {
1441    Topic::new(PERSONA_RUNTIME_TOPIC).map_err(|error| error.to_string())
1442}
1443
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::{AnyEventLog, MemoryEventLog};
    use std::sync::Mutex;

    /// Timestamp used by most scenarios.
    const NOON: &str = "2026-04-24T12:00:00Z";

    /// Value sink that keeps a copy of every event it receives.
    struct CapturingValueSink {
        events: Arc<Mutex<Vec<PersonaValueEvent>>>,
    }

    impl PersonaValueSink for CapturingValueSink {
        fn handle_value_event(&self, event: &PersonaValueEvent) {
            self.events.lock().unwrap().push(event.clone());
        }
    }

    /// Persona binding shared by the scenarios below.
    fn binding() -> PersonaRuntimeBinding {
        let budget = PersonaBudgetPolicy {
            daily_usd: Some(0.02),
            hourly_usd: None,
            run_usd: Some(0.02),
            max_tokens: Some(100),
        };
        PersonaRuntimeBinding {
            name: "merge_captain".to_string(),
            template_ref: Some("software_factory@v0".to_string()),
            entry_workflow: "workflows/merge.harn#run".to_string(),
            schedules: vec!["*/30 * * * *".to_string()],
            triggers: vec!["github.pr_opened".to_string()],
            budget,
        }
    }

    /// Fresh in-memory event log.
    fn log() -> Arc<AnyEventLog> {
        Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)))
    }

    /// Parses an RFC3339 literal into milliseconds, panicking on bad input.
    fn at(timestamp: &str) -> i64 {
        parse_rfc3339_ms(timestamp).unwrap()
    }

    /// GitHub trigger metadata for pull request `number` of the test repository.
    fn github_pr(number: &str) -> BTreeMap<String, String> {
        BTreeMap::from([
            ("repository".to_string(), "burin-labs/harn".to_string()),
            ("number".to_string(), number.to_string()),
        ])
    }

    /// Linear trigger metadata for the issue identified by `key`.
    fn linear_issue(key: &str) -> BTreeMap<String, String> {
        BTreeMap::from([("issue_key".to_string(), key.to_string())])
    }

    #[tokio::test]
    async fn schedule_tick_records_lifecycle_status_and_receipt() {
        let event_log = log();
        let persona = binding();
        let tick = at("2026-04-24T12:30:00Z");
        let cost = PersonaRunCost {
            cost_usd: 0.01,
            tokens: 10,
            ..Default::default()
        };

        let receipt = fire_schedule(&event_log, &persona, cost, tick)
            .await
            .unwrap();
        assert_eq!(receipt.status, "completed");
        assert!(receipt.lease.is_some());

        let status = persona_status(&event_log, &persona, tick).await.unwrap();
        assert_eq!(status.state, PersonaLifecycleState::Idle);
        assert_eq!(status.last_run.as_deref(), Some("2026-04-24T12:30:00Z"));
        assert!(status.next_scheduled_run.is_some());
        assert_eq!(status.budget.spent_today_usd, 0.01);
    }

    #[tokio::test]
    async fn paused_personas_queue_and_resume_drains_once() {
        let event_log = log();
        let persona = binding();
        let start = at(NOON);
        pause_persona(&event_log, &persona, start).await.unwrap();

        let receipt = fire_trigger(
            &event_log,
            &persona,
            "github",
            "pull_request",
            github_pr("462"),
            PersonaRunCost::default(),
            start,
        )
        .await
        .unwrap();
        assert_eq!(receipt.status, "queued");

        let while_paused = persona_status(&event_log, &persona, start).await.unwrap();
        assert_eq!(while_paused.queued_events, 1);

        let resumed = resume_persona(&event_log, &persona, start + 1000)
            .await
            .unwrap();
        assert_eq!(resumed.state, PersonaLifecycleState::Idle);
        assert_eq!(resumed.queued_events, 0);
    }

    #[tokio::test]
    async fn resumed_queued_work_reuses_original_budget_cost() {
        let event_log = log();
        let mut persona = binding();
        persona.budget.run_usd = Some(0.01);
        let start = at(NOON);
        pause_persona(&event_log, &persona, start).await.unwrap();

        // The queued run costs more than the per-run limit set above.
        let over_limit = PersonaRunCost {
            cost_usd: 0.02,
            tokens: 1,
            ..Default::default()
        };
        let queued = fire_trigger(
            &event_log,
            &persona,
            "github",
            "pull_request",
            github_pr("1379"),
            over_limit,
            start + 1,
        )
        .await
        .unwrap();
        assert_eq!(queued.status, "queued");

        let status = resume_persona(&event_log, &persona, start + 2)
            .await
            .unwrap();

        assert_eq!(status.budget.reason.as_deref(), Some("run_usd"));
        assert!(status
            .last_error
            .as_deref()
            .is_some_and(|error| error.contains("run_usd")));
        assert_eq!(status.queued_events, 1);
    }

    #[tokio::test]
    async fn duplicate_trigger_envelope_is_not_processed_twice() {
        let event_log = log();
        let persona = binding();
        let start = at(NOON);
        let metadata = github_pr("462");

        let first = fire_trigger(
            &event_log,
            &persona,
            "github",
            "pull_request",
            metadata.clone(),
            PersonaRunCost::default(),
            start,
        )
        .await
        .unwrap();
        let second = fire_trigger(
            &event_log,
            &persona,
            "github",
            "pull_request",
            metadata,
            PersonaRunCost::default(),
            start + 1000,
        )
        .await
        .unwrap();

        assert_eq!(first.status, "completed");
        assert_eq!(second.status, "duplicate");
        assert!(second.lease.is_none());
    }

    #[tokio::test]
    async fn disabled_personas_dead_letter_events() {
        let event_log = log();
        let persona = binding();
        let start = at(NOON);
        disable_persona(&event_log, &persona, start).await.unwrap();

        let slack_message = BTreeMap::from([
            ("channel".to_string(), "C123".to_string()),
            ("ts".to_string(), "1713988800.000100".to_string()),
        ]);
        let receipt = fire_trigger(
            &event_log,
            &persona,
            "slack",
            "message",
            slack_message,
            PersonaRunCost::default(),
            start,
        )
        .await
        .unwrap();
        assert_eq!(receipt.status, "dead_lettered");

        let status = persona_status(&event_log, &persona, start).await.unwrap();
        assert_eq!(status.state, PersonaLifecycleState::Disabled);
        assert_eq!(status.disabled_events, 1);
    }

    #[tokio::test]
    async fn budget_exhaustion_blocks_expensive_work() {
        let event_log = log();
        let mut persona = binding();
        persona.budget.daily_usd = Some(0.01);
        let start = at(NOON);

        let expensive = PersonaRunCost {
            cost_usd: 0.02,
            tokens: 1,
            ..Default::default()
        };
        let receipt = fire_trigger(
            &event_log,
            &persona,
            "linear",
            "issue",
            linear_issue("HAR-462"),
            expensive,
            start,
        )
        .await
        .unwrap();
        assert_eq!(receipt.status, "budget_exhausted");

        let status = persona_status(&event_log, &persona, start).await.unwrap();
        assert_eq!(status.budget.reason.as_deref(), Some("daily_usd"));
        assert!(status.budget.exhausted);
        assert!(status.last_error.as_deref().unwrap().contains("daily_usd"));
    }

    #[tokio::test]
    async fn deterministic_predicate_hit_emits_value_event_with_avoided_cost() {
        let event_log = log();
        let persona = binding();
        let captured = Arc::new(Mutex::new(Vec::new()));
        let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
            events: captured.clone(),
        }));
        let start = at(NOON);

        let deterministic_cost = PersonaRunCost {
            avoided_cost_usd: 0.0042,
            deterministic_steps: 1,
            metadata: json!({
                "predicate": "pr_already_green",
                "would_have_called_model": "gpt-5.4-mini",
            }),
            ..Default::default()
        };
        let receipt = fire_trigger(
            &event_log,
            &persona,
            "github",
            "pull_request",
            github_pr("715"),
            deterministic_cost,
            start,
        )
        .await
        .unwrap();

        let run_id = receipt.run_id.expect("completed run has run_id");
        // Match on the run id as well as the kind when picking out the event.
        let seen = captured.lock().unwrap().clone();
        let deterministic = seen
            .iter()
            .find(|event| {
                event.kind == PersonaValueEventKind::DeterministicExecution
                    && event.run_id == Some(run_id)
            })
            .expect("deterministic execution value event");
        assert_eq!(deterministic.persona_id, "merge_captain");
        assert_eq!(
            deterministic.template_ref.as_deref(),
            Some("software_factory@v0")
        );
        assert_eq!(deterministic.run_id, Some(run_id));
        assert_eq!(deterministic.paid_cost_usd, 0.0);
        assert_eq!(deterministic.avoided_cost_usd, 0.0042);
        assert_eq!(deterministic.deterministic_steps, 1);
        assert_eq!(
            deterministic.metadata["predicate"].as_str(),
            Some("pr_already_green")
        );

        let persisted = read_persona_events(&event_log, &persona.name)
            .await
            .unwrap();
        let was_persisted = persisted.iter().any(|(_, event)| {
            event.kind == "persona.value.deterministic_execution"
                && event.payload["avoided_cost_usd"] == json!(0.0042)
        });
        assert!(was_persisted);
    }

    #[tokio::test]
    async fn frontier_escalation_run_emits_value_event_with_paid_cost() {
        let event_log = log();
        let persona = binding();
        let captured = Arc::new(Mutex::new(Vec::new()));
        let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
            events: captured.clone(),
        }));
        let start = at(NOON);

        let escalated_cost = PersonaRunCost {
            cost_usd: 0.011,
            tokens: 20,
            llm_steps: 1,
            frontier_escalations: 1,
            metadata: json!({
                "frontier_model": "gpt-5.4",
                "escalation_reason": "high_risk_merge",
            }),
            ..Default::default()
        };
        let receipt = fire_trigger(
            &event_log,
            &persona,
            "linear",
            "issue",
            linear_issue("HAR-715"),
            escalated_cost,
            start,
        )
        .await
        .unwrap();

        let run_id = receipt.run_id.expect("completed run has run_id");
        let seen = captured.lock().unwrap().clone();
        let escalation = seen
            .iter()
            .find(|event| {
                event.kind == PersonaValueEventKind::FrontierEscalation
                    && event.run_id == Some(run_id)
            })
            .expect("frontier escalation value event");
        assert_eq!(escalation.run_id, Some(run_id));
        assert_eq!(escalation.paid_cost_usd, 0.011);
        assert_eq!(escalation.avoided_cost_usd, 0.0);
        assert_eq!(escalation.llm_steps, 1);
        assert_eq!(
            escalation.metadata["frontier_model"].as_str(),
            Some("gpt-5.4")
        );

        let completion = seen
            .iter()
            .find(|event| {
                event.kind == PersonaValueEventKind::RunCompleted && event.run_id == Some(run_id)
            })
            .expect("run completed value event");
        assert_eq!(completion.paid_cost_usd, 0.0);
    }
}