1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::{Arc, OnceLock, RwLock};
4
5use chrono::{TimeZone, Utc};
6use croner::Cron;
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use time::format_description::well_known::Rfc3339;
10use time::OffsetDateTime;
11use uuid::Uuid;
12
13use crate::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
14
15pub const PERSONA_RUNTIME_TOPIC: &str = "persona.runtime.events";
16
17const DEFAULT_LEASE_TTL_MS: i64 = 5 * 60 * 1000;
18
19#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "snake_case")]
21pub enum PersonaLifecycleState {
22 Inactive,
23 Starting,
24 #[default]
25 Idle,
26 Running,
27 Paused,
28 Draining,
29 Failed,
30 Disabled,
31}
32
33impl PersonaLifecycleState {
34 pub fn as_str(self) -> &'static str {
35 match self {
36 Self::Inactive => "inactive",
37 Self::Starting => "starting",
38 Self::Idle => "idle",
39 Self::Running => "running",
40 Self::Paused => "paused",
41 Self::Draining => "draining",
42 Self::Failed => "failed",
43 Self::Disabled => "disabled",
44 }
45 }
46}
47
48#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
49pub struct PersonaBudgetPolicy {
50 pub daily_usd: Option<f64>,
51 pub hourly_usd: Option<f64>,
52 pub run_usd: Option<f64>,
53 pub max_tokens: Option<u64>,
54}
55
56#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
57pub struct PersonaRuntimeBinding {
58 pub name: String,
59 #[serde(default)]
60 pub template_ref: Option<String>,
61 pub entry_workflow: String,
62 #[serde(default)]
63 pub schedules: Vec<String>,
64 #[serde(default)]
65 pub triggers: Vec<String>,
66 #[serde(default)]
67 pub budget: PersonaBudgetPolicy,
68}
69
70#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
71pub struct PersonaLease {
72 pub id: String,
73 pub holder: String,
74 pub work_key: String,
75 pub acquired_at_ms: i64,
76 pub expires_at_ms: i64,
77}
78
79#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
80pub struct PersonaBudgetStatus {
81 pub daily_usd: Option<f64>,
82 pub hourly_usd: Option<f64>,
83 pub run_usd: Option<f64>,
84 pub max_tokens: Option<u64>,
85 pub spent_today_usd: f64,
86 pub spent_this_hour_usd: f64,
87 pub spent_last_run_usd: f64,
88 pub tokens_today: u64,
89 pub remaining_today_usd: Option<f64>,
90 pub remaining_hour_usd: Option<f64>,
91 pub exhausted: bool,
92 pub reason: Option<String>,
93 pub last_receipt_id: Option<String>,
94}
95
96#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
97pub struct PersonaStatus {
98 pub name: String,
99 #[serde(default)]
100 pub template_ref: Option<String>,
101 pub state: PersonaLifecycleState,
102 pub entry_workflow: String,
103 #[serde(default)]
104 pub role: String,
105 #[serde(default)]
106 pub current_assignment: Option<PersonaAssignmentStatus>,
107 pub last_run: Option<String>,
108 pub next_scheduled_run: Option<String>,
109 pub active_lease: Option<PersonaLease>,
110 pub budget: PersonaBudgetStatus,
111 pub last_error: Option<String>,
112 pub queued_events: usize,
113 #[serde(default)]
114 pub queued_work: Vec<PersonaQueuedWork>,
115 #[serde(default)]
116 pub handoff_inbox: Vec<PersonaHandoffInboxItem>,
117 #[serde(default)]
118 pub value_receipts: Vec<PersonaValueReceipt>,
119 pub disabled_events: usize,
120 pub paused_event_policy: String,
121}
122
123#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
124pub struct PersonaAssignmentStatus {
125 pub work_key: String,
126 pub lease_id: String,
127 pub holder: String,
128 pub acquired_at: String,
129 pub expires_at: String,
130}
131
132#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
133pub struct PersonaQueuedWork {
134 pub work_key: String,
135 pub provider: String,
136 pub kind: String,
137 pub queued_at: String,
138 pub reason: String,
139 pub source_event_id: Option<String>,
140 #[serde(default)]
141 pub metadata: BTreeMap<String, String>,
142}
143
144#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
145pub struct PersonaHandoffInboxItem {
146 pub work_key: String,
147 pub handoff_id: Option<String>,
148 pub handoff_kind: Option<String>,
149 pub source_persona: Option<String>,
150 pub task: Option<String>,
151 pub queued_at: String,
152 pub reason: String,
153}
154
155#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
156pub struct PersonaValueReceipt {
157 pub kind: PersonaValueEventKind,
158 pub run_id: Option<Uuid>,
159 pub occurred_at: String,
160 pub paid_cost_usd: f64,
161 pub avoided_cost_usd: f64,
162 pub deterministic_steps: i64,
163 pub llm_steps: i64,
164 pub metadata: serde_json::Value,
165}
166
167#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
168pub struct PersonaTriggerEnvelope {
169 pub provider: String,
170 pub kind: String,
171 pub subject_key: String,
172 pub source_event_id: Option<String>,
173 pub received_at_ms: i64,
174 #[serde(default)]
175 pub metadata: BTreeMap<String, String>,
176 #[serde(default)]
177 pub raw: serde_json::Value,
178}
179
180#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
181pub struct PersonaRunReceipt {
182 pub status: String,
183 pub persona: String,
184 #[serde(default)]
185 pub run_id: Option<Uuid>,
186 pub work_key: String,
187 pub lease: Option<PersonaLease>,
188 pub queued: bool,
189 pub error: Option<String>,
190 pub budget_receipt_id: Option<String>,
191}
192
193#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
194pub struct PersonaRunCost {
195 pub cost_usd: f64,
196 pub tokens: u64,
197 #[serde(default)]
198 pub avoided_cost_usd: f64,
199 #[serde(default)]
200 pub deterministic_steps: i64,
201 #[serde(default)]
202 pub llm_steps: i64,
203 #[serde(default)]
204 pub frontier_escalations: i64,
205 #[serde(default)]
206 pub metadata: serde_json::Value,
207}
208
209#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
210#[serde(rename_all = "snake_case")]
211pub enum PersonaValueEventKind {
212 RunStarted,
213 RunCompleted,
214 AcceptedOutcome,
215 FrontierEscalation,
216 DeterministicExecution,
217 PromotionSavings,
218 ApprovalWait,
219}
220
221impl PersonaValueEventKind {
222 pub fn as_str(self) -> &'static str {
223 match self {
224 Self::RunStarted => "run_started",
225 Self::RunCompleted => "run_completed",
226 Self::AcceptedOutcome => "accepted_outcome",
227 Self::FrontierEscalation => "frontier_escalation",
228 Self::DeterministicExecution => "deterministic_execution",
229 Self::PromotionSavings => "promotion_savings",
230 Self::ApprovalWait => "approval_wait",
231 }
232 }
233}
234
235#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
236pub struct PersonaValueEvent {
237 pub persona_id: String,
238 pub template_ref: Option<String>,
239 pub run_id: Option<Uuid>,
240 pub kind: PersonaValueEventKind,
241 pub paid_cost_usd: f64,
242 pub avoided_cost_usd: f64,
243 pub deterministic_steps: i64,
244 pub llm_steps: i64,
245 pub metadata: serde_json::Value,
246 pub occurred_at: OffsetDateTime,
247}
248
249pub trait PersonaValueSink: Send + Sync {
250 fn handle_value_event(&self, event: &PersonaValueEvent);
251}
252
253type PersonaValueSinkRegistry = RwLock<Vec<(u64, Arc<dyn PersonaValueSink>)>>;
254
255fn persona_value_sinks() -> &'static PersonaValueSinkRegistry {
256 static REGISTRY: OnceLock<PersonaValueSinkRegistry> = OnceLock::new();
257 REGISTRY.get_or_init(|| RwLock::new(Vec::new()))
258}
259
260fn next_persona_value_sink_id() -> u64 {
261 static NEXT_ID: AtomicU64 = AtomicU64::new(1);
262 NEXT_ID.fetch_add(1, Ordering::Relaxed)
263}
264
265pub struct PersonaValueSinkRegistration {
266 id: u64,
267}
268
269impl Drop for PersonaValueSinkRegistration {
270 fn drop(&mut self) {
271 if let Ok(mut sinks) = persona_value_sinks().write() {
272 sinks.retain(|(id, _)| *id != self.id);
273 }
274 }
275}
276
277pub fn register_persona_value_sink(
278 sink: Arc<dyn PersonaValueSink>,
279) -> PersonaValueSinkRegistration {
280 let id = next_persona_value_sink_id();
281 if let Ok(mut sinks) = persona_value_sinks().write() {
282 sinks.push((id, sink));
283 }
284 PersonaValueSinkRegistration { id }
285}
286
287pub async fn persona_status(
288 log: &Arc<AnyEventLog>,
289 binding: &PersonaRuntimeBinding,
290 now_ms: i64,
291) -> Result<PersonaStatus, String> {
292 let events = read_persona_events(log, &binding.name).await?;
293 let mut state = PersonaLifecycleState::Idle;
294 let mut last_run_ms = None;
295 let mut active_lease = None;
296 let mut last_error = None;
297 let mut queued = BTreeSet::<String>::new();
298 let mut completed = BTreeSet::<String>::new();
299 let mut disabled_events = 0usize;
300 let mut budget_receipt = None;
301 let mut budget_exhaustion_reason = None;
302 let mut spent = Vec::<(i64, f64, u64)>::new();
303 let mut queued_work = BTreeMap::<String, PersonaQueuedWork>::new();
304 let mut value_receipts = Vec::<PersonaValueReceipt>::new();
305
306 for (_, event) in events {
307 match event.kind.as_str() {
308 "persona.control.paused" => state = PersonaLifecycleState::Paused,
309 "persona.control.resumed" => state = PersonaLifecycleState::Idle,
310 "persona.control.disabled" => state = PersonaLifecycleState::Disabled,
311 "persona.control.draining" => state = PersonaLifecycleState::Draining,
312 "persona.lease.acquired" => {
313 if let Ok(lease) = serde_json::from_value::<PersonaLease>(event.payload.clone()) {
314 active_lease = Some(lease);
315 state = PersonaLifecycleState::Running;
316 }
317 }
318 "persona.lease.released" => {
319 active_lease = None;
320 if !matches!(
321 state,
322 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
323 ) {
324 state = PersonaLifecycleState::Idle;
325 }
326 }
327 "persona.lease.expired" => {
328 active_lease = None;
329 if !matches!(
330 state,
331 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
332 ) {
333 state = PersonaLifecycleState::Idle;
334 }
335 }
336 "persona.run.started" => state = PersonaLifecycleState::Running,
337 "persona.run.completed" => {
338 last_run_ms = event
339 .payload
340 .get("completed_at_ms")
341 .and_then(serde_json::Value::as_i64)
342 .or(Some(event.occurred_at_ms));
343 if let Some(work_key) = event
344 .payload
345 .get("work_key")
346 .and_then(serde_json::Value::as_str)
347 {
348 completed.insert(work_key.to_string());
349 }
350 if !matches!(
351 state,
352 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
353 ) {
354 state = PersonaLifecycleState::Idle;
355 }
356 }
357 "persona.run.failed" => {
358 state = PersonaLifecycleState::Failed;
359 last_error = event
360 .payload
361 .get("error")
362 .and_then(serde_json::Value::as_str)
363 .map(ToString::to_string);
364 }
365 "persona.trigger.queued" => {
366 if let Some(work_key) = event
367 .payload
368 .get("work_key")
369 .and_then(serde_json::Value::as_str)
370 {
371 queued.insert(work_key.to_string());
372 }
373 if let Some(item) = queued_work_from_event(&event)? {
374 queued_work.insert(item.work_key.clone(), item);
375 }
376 }
377 "persona.trigger.dead_lettered" => disabled_events += 1,
378 "persona.budget.recorded" => {
379 budget_receipt = event
380 .payload
381 .get("receipt_id")
382 .and_then(serde_json::Value::as_str)
383 .map(ToString::to_string);
384 spent.push((
385 event.occurred_at_ms,
386 event
387 .payload
388 .get("cost_usd")
389 .and_then(serde_json::Value::as_f64)
390 .unwrap_or_default(),
391 event
392 .payload
393 .get("tokens")
394 .and_then(serde_json::Value::as_u64)
395 .unwrap_or_default(),
396 ));
397 }
398 "persona.budget.exhausted" => {
399 budget_exhaustion_reason = event
400 .payload
401 .get("reason")
402 .and_then(serde_json::Value::as_str)
403 .map(ToString::to_string);
404 last_error = budget_exhaustion_reason
405 .as_ref()
406 .map(|reason| format!("persona budget exhausted: {reason}"));
407 budget_receipt = event
408 .payload
409 .get("receipt_id")
410 .and_then(serde_json::Value::as_str)
411 .map(ToString::to_string);
412 }
413 kind if kind.starts_with("persona.value.") => {
414 if let Some(receipt) = value_receipt_from_event(&event)? {
415 value_receipts.push(receipt);
416 }
417 }
418 _ => {}
419 }
420 }
421
422 if let Some(lease) = active_lease.as_ref() {
423 if lease.expires_at_ms <= now_ms {
424 active_lease = None;
425 if !matches!(
426 state,
427 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
428 ) {
429 state = PersonaLifecycleState::Idle;
430 }
431 }
432 }
433
434 queued.retain(|work_key| !completed.contains(work_key));
435 queued_work.retain(|work_key, _| !completed.contains(work_key));
436 let queued_work = queued_work.into_values().collect::<Vec<_>>();
437 let handoff_inbox = queued_work
438 .iter()
439 .filter_map(handoff_inbox_item)
440 .collect::<Vec<_>>();
441
442 let mut budget = budget_status(&binding.budget, &spent, now_ms);
443 if budget.reason.is_none() {
444 if let Some(reason) = budget_exhaustion_reason {
445 budget.exhausted = true;
446 budget.reason = Some(reason);
447 }
448 }
449 if budget.last_receipt_id.is_none() {
450 budget.last_receipt_id = budget_receipt;
451 }
452
453 let current_assignment = active_lease.as_ref().map(assignment_status_from_lease);
454
455 Ok(PersonaStatus {
456 name: binding.name.clone(),
457 template_ref: binding.template_ref.clone(),
458 state,
459 entry_workflow: binding.entry_workflow.clone(),
460 role: binding.name.clone(),
461 current_assignment,
462 last_run: last_run_ms.map(format_ms),
463 next_scheduled_run: next_scheduled_run(binding, last_run_ms, now_ms),
464 active_lease,
465 budget,
466 last_error,
467 queued_events: queued.len(),
468 queued_work,
469 handoff_inbox,
470 value_receipts,
471 disabled_events,
472 paused_event_policy: "queue_then_drain_on_resume".to_string(),
473 })
474}
475
476pub async fn pause_persona(
477 log: &Arc<AnyEventLog>,
478 binding: &PersonaRuntimeBinding,
479 now_ms: i64,
480) -> Result<PersonaStatus, String> {
481 append_persona_event(
482 log,
483 &binding.name,
484 "persona.control.paused",
485 json!({"paused_at_ms": now_ms, "policy": "queue_then_drain_on_resume"}),
486 now_ms,
487 )
488 .await?;
489 persona_status(log, binding, now_ms).await
490}
491
492pub async fn resume_persona(
493 log: &Arc<AnyEventLog>,
494 binding: &PersonaRuntimeBinding,
495 now_ms: i64,
496) -> Result<PersonaStatus, String> {
497 append_persona_event(
498 log,
499 &binding.name,
500 "persona.control.resumed",
501 json!({"resumed_at_ms": now_ms, "drain": true}),
502 now_ms,
503 )
504 .await?;
505 let queued = queued_events(log, &binding.name).await?;
506 for (envelope, cost) in queued {
507 let _ = run_for_envelope(log, binding, envelope, cost, now_ms).await?;
508 }
509 persona_status(log, binding, now_ms).await
510}
511
512pub async fn disable_persona(
513 log: &Arc<AnyEventLog>,
514 binding: &PersonaRuntimeBinding,
515 now_ms: i64,
516) -> Result<PersonaStatus, String> {
517 append_persona_event(
518 log,
519 &binding.name,
520 "persona.control.disabled",
521 json!({"disabled_at_ms": now_ms}),
522 now_ms,
523 )
524 .await?;
525 persona_status(log, binding, now_ms).await
526}
527
528pub async fn fire_schedule(
529 log: &Arc<AnyEventLog>,
530 binding: &PersonaRuntimeBinding,
531 cost: PersonaRunCost,
532 now_ms: i64,
533) -> Result<PersonaRunReceipt, String> {
534 let schedule = binding
535 .schedules
536 .first()
537 .cloned()
538 .unwrap_or_else(|| "manual".to_string());
539 let envelope = PersonaTriggerEnvelope {
540 provider: "schedule".to_string(),
541 kind: "cron.tick".to_string(),
542 subject_key: format!("schedule:{}:{schedule}:{}", binding.name, format_ms(now_ms)),
543 source_event_id: None,
544 received_at_ms: now_ms,
545 metadata: BTreeMap::from([
546 ("persona".to_string(), binding.name.clone()),
547 ("schedule".to_string(), schedule),
548 ("fired_at".to_string(), format_ms(now_ms)),
549 ]),
550 raw: json!({}),
551 };
552 append_persona_event(
553 log,
554 &binding.name,
555 "persona.schedule.fired",
556 json!({"persona": binding.name, "envelope": envelope}),
557 now_ms,
558 )
559 .await?;
560 run_for_envelope(log, binding, envelope, cost, now_ms).await
561}
562
563pub async fn fire_trigger(
564 log: &Arc<AnyEventLog>,
565 binding: &PersonaRuntimeBinding,
566 provider: &str,
567 kind: &str,
568 metadata: BTreeMap<String, String>,
569 cost: PersonaRunCost,
570 now_ms: i64,
571) -> Result<PersonaRunReceipt, String> {
572 let envelope = normalize_trigger_envelope(provider, kind, metadata, now_ms);
573 append_persona_event(
574 log,
575 &binding.name,
576 "persona.trigger.received",
577 json!({"persona": binding.name, "envelope": envelope}),
578 now_ms,
579 )
580 .await?;
581 run_for_envelope(log, binding, envelope, cost, now_ms).await
582}
583
584pub async fn record_persona_spend(
585 log: &Arc<AnyEventLog>,
586 binding: &PersonaRuntimeBinding,
587 cost: PersonaRunCost,
588 now_ms: i64,
589) -> Result<PersonaBudgetStatus, String> {
590 enforce_budget(log, binding, &cost, now_ms).await?;
591 append_budget_record(log, &binding.name, &cost, None, now_ms).await?;
592 persona_status(log, binding, now_ms)
593 .await
594 .map(|status| status.budget)
595}
596
597async fn run_for_envelope(
598 log: &Arc<AnyEventLog>,
599 binding: &PersonaRuntimeBinding,
600 envelope: PersonaTriggerEnvelope,
601 cost: PersonaRunCost,
602 now_ms: i64,
603) -> Result<PersonaRunReceipt, String> {
604 let status = persona_status(log, binding, now_ms).await?;
605 match status.state {
606 PersonaLifecycleState::Paused => {
607 append_persona_event(
608 log,
609 &binding.name,
610 "persona.trigger.queued",
611 json!({
612 "work_key": envelope.subject_key,
613 "envelope": envelope,
614 "cost": cost,
615 "reason": "paused",
616 }),
617 now_ms,
618 )
619 .await?;
620 return Ok(PersonaRunReceipt {
621 status: "queued".to_string(),
622 persona: binding.name.clone(),
623 run_id: None,
624 work_key: envelope.subject_key,
625 lease: None,
626 queued: true,
627 error: None,
628 budget_receipt_id: None,
629 });
630 }
631 PersonaLifecycleState::Disabled => {
632 append_persona_event(
633 log,
634 &binding.name,
635 "persona.trigger.dead_lettered",
636 json!({
637 "work_key": envelope.subject_key,
638 "envelope": envelope,
639 "reason": "disabled",
640 }),
641 now_ms,
642 )
643 .await?;
644 return Ok(PersonaRunReceipt {
645 status: "dead_lettered".to_string(),
646 persona: binding.name.clone(),
647 run_id: None,
648 work_key: envelope.subject_key,
649 lease: None,
650 queued: false,
651 error: Some("persona is disabled".to_string()),
652 budget_receipt_id: None,
653 });
654 }
655 _ => {}
656 }
657
658 if let Err(error) = enforce_budget(log, binding, &cost, now_ms).await {
659 return Ok(PersonaRunReceipt {
660 status: "budget_exhausted".to_string(),
661 persona: binding.name.clone(),
662 run_id: None,
663 work_key: envelope.subject_key,
664 lease: None,
665 queued: false,
666 error: Some(error),
667 budget_receipt_id: None,
668 });
669 }
670
671 if work_completed(log, &binding.name, &envelope.subject_key).await? {
672 append_persona_event(
673 log,
674 &binding.name,
675 "persona.trigger.duplicate",
676 json!({
677 "work_key": envelope.subject_key,
678 "envelope": envelope,
679 "reason": "already_completed",
680 }),
681 now_ms,
682 )
683 .await?;
684 return Ok(PersonaRunReceipt {
685 status: "duplicate".to_string(),
686 persona: binding.name.clone(),
687 run_id: None,
688 work_key: envelope.subject_key,
689 lease: None,
690 queued: false,
691 error: None,
692 budget_receipt_id: None,
693 });
694 }
695
696 let Some(lease) = acquire_lease(
697 log,
698 binding,
699 &envelope.subject_key,
700 "persona-runtime",
701 DEFAULT_LEASE_TTL_MS,
702 now_ms,
703 )
704 .await?
705 else {
706 return Ok(PersonaRunReceipt {
707 status: "lease_busy".to_string(),
708 persona: binding.name.clone(),
709 run_id: None,
710 work_key: envelope.subject_key,
711 lease: status.active_lease,
712 queued: false,
713 error: Some("active lease already owns persona work".to_string()),
714 budget_receipt_id: None,
715 });
716 };
717
718 let run_id = Uuid::now_v7();
719 let value_metadata = run_value_metadata(&envelope, &lease, &cost);
720 append_persona_event(
721 log,
722 &binding.name,
723 "persona.run.started",
724 json!({
725 "work_key": envelope.subject_key,
726 "run_id": run_id,
727 "started_at_ms": now_ms,
728 "entry_workflow": binding.entry_workflow,
729 "lease_id": lease.id,
730 }),
731 now_ms,
732 )
733 .await?;
734 emit_persona_value_event(
735 log,
736 binding,
737 run_id,
738 PersonaValueEventDelta {
739 kind: PersonaValueEventKind::RunStarted,
740 metadata: value_metadata.clone(),
741 ..Default::default()
742 },
743 now_ms,
744 )
745 .await?;
746 let budget_receipt_id =
747 append_budget_record(log, &binding.name, &cost, Some(&lease.id), now_ms).await?;
748 if cost.avoided_cost_usd > 0.0 || cost.deterministic_steps > 0 {
749 emit_persona_value_event(
750 log,
751 binding,
752 run_id,
753 PersonaValueEventDelta {
754 kind: PersonaValueEventKind::DeterministicExecution,
755 avoided_cost_usd: cost.avoided_cost_usd,
756 deterministic_steps: cost.deterministic_steps.max(1),
757 metadata: value_metadata.clone(),
758 ..Default::default()
759 },
760 now_ms,
761 )
762 .await?;
763 }
764 if cost.frontier_escalations > 0 {
765 emit_persona_value_event(
766 log,
767 binding,
768 run_id,
769 PersonaValueEventDelta {
770 kind: PersonaValueEventKind::FrontierEscalation,
771 paid_cost_usd: cost.cost_usd,
772 llm_steps: cost.llm_steps.max(cost.frontier_escalations),
773 metadata: value_metadata.clone(),
774 ..Default::default()
775 },
776 now_ms,
777 )
778 .await?;
779 }
780 let completion_paid_cost = if cost.frontier_escalations > 0 {
781 0.0
782 } else {
783 cost.cost_usd
784 };
785 let completion_llm_steps = if cost.frontier_escalations > 0 {
786 0
787 } else {
788 cost.llm_steps
789 };
790 emit_persona_value_event(
791 log,
792 binding,
793 run_id,
794 PersonaValueEventDelta {
795 kind: PersonaValueEventKind::RunCompleted,
796 paid_cost_usd: completion_paid_cost,
797 llm_steps: completion_llm_steps,
798 metadata: value_metadata,
799 ..Default::default()
800 },
801 now_ms,
802 )
803 .await?;
804 append_persona_event(
805 log,
806 &binding.name,
807 "persona.run.completed",
808 json!({
809 "work_key": envelope.subject_key,
810 "run_id": run_id,
811 "completed_at_ms": now_ms,
812 "entry_workflow": binding.entry_workflow,
813 "lease_id": lease.id,
814 }),
815 now_ms,
816 )
817 .await?;
818 append_persona_event(
819 log,
820 &binding.name,
821 "persona.lease.released",
822 json!({
823 "id": lease.id,
824 "work_key": envelope.subject_key,
825 "released_at_ms": now_ms,
826 }),
827 now_ms,
828 )
829 .await?;
830 Ok(PersonaRunReceipt {
831 status: "completed".to_string(),
832 persona: binding.name.clone(),
833 run_id: Some(run_id),
834 work_key: envelope.subject_key,
835 lease: Some(lease),
836 queued: false,
837 error: None,
838 budget_receipt_id: Some(budget_receipt_id),
839 })
840}
841
842async fn acquire_lease(
843 log: &Arc<AnyEventLog>,
844 binding: &PersonaRuntimeBinding,
845 work_key: &str,
846 holder: &str,
847 ttl_ms: i64,
848 now_ms: i64,
849) -> Result<Option<PersonaLease>, String> {
850 let status = persona_status(log, binding, now_ms).await?;
851 if let Some(lease) = status.active_lease {
852 if lease.expires_at_ms > now_ms {
853 append_persona_event(
854 log,
855 &binding.name,
856 "persona.lease.conflict",
857 json!({
858 "active_lease": lease,
859 "requested_work_key": work_key,
860 "at_ms": now_ms,
861 }),
862 now_ms,
863 )
864 .await?;
865 return Ok(None);
866 }
867 append_persona_event(
868 log,
869 &binding.name,
870 "persona.lease.expired",
871 json!({
872 "id": lease.id,
873 "work_key": lease.work_key,
874 "expired_at_ms": now_ms,
875 }),
876 now_ms,
877 )
878 .await?;
879 }
880
881 let lease = PersonaLease {
882 id: format!("persona_lease_{}", Uuid::now_v7()),
883 holder: holder.to_string(),
884 work_key: work_key.to_string(),
885 acquired_at_ms: now_ms,
886 expires_at_ms: now_ms + ttl_ms,
887 };
888 append_persona_event(
889 log,
890 &binding.name,
891 "persona.lease.acquired",
892 serde_json::to_value(&lease).map_err(|error| error.to_string())?,
893 now_ms,
894 )
895 .await?;
896 Ok(Some(lease))
897}
898
899async fn enforce_budget(
900 log: &Arc<AnyEventLog>,
901 binding: &PersonaRuntimeBinding,
902 cost: &PersonaRunCost,
903 now_ms: i64,
904) -> Result<(), String> {
905 let status = persona_status(log, binding, now_ms).await?;
906 let reason = if binding
907 .budget
908 .run_usd
909 .is_some_and(|limit| cost.cost_usd > limit)
910 {
911 Some("run_usd")
912 } else if binding
913 .budget
914 .daily_usd
915 .is_some_and(|limit| status.budget.spent_today_usd + cost.cost_usd > limit)
916 {
917 Some("daily_usd")
918 } else if binding
919 .budget
920 .hourly_usd
921 .is_some_and(|limit| status.budget.spent_this_hour_usd + cost.cost_usd > limit)
922 {
923 Some("hourly_usd")
924 } else if binding
925 .budget
926 .max_tokens
927 .is_some_and(|limit| status.budget.tokens_today + cost.tokens > limit)
928 {
929 Some("max_tokens")
930 } else {
931 None
932 };
933
934 if let Some(reason) = reason {
935 let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
936 append_persona_event(
937 log,
938 &binding.name,
939 "persona.budget.exhausted",
940 json!({
941 "receipt_id": receipt_id,
942 "reason": reason,
943 "attempted_cost_usd": cost.cost_usd,
944 "attempted_tokens": cost.tokens,
945 "persona": binding.name,
946 }),
947 now_ms,
948 )
949 .await?;
950 return Err(format!("persona budget exhausted: {reason}"));
951 }
952
953 Ok(())
954}
955
956async fn append_budget_record(
957 log: &Arc<AnyEventLog>,
958 persona: &str,
959 cost: &PersonaRunCost,
960 lease_id: Option<&str>,
961 now_ms: i64,
962) -> Result<String, String> {
963 let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
964 append_persona_event(
965 log,
966 persona,
967 "persona.budget.recorded",
968 json!({
969 "receipt_id": receipt_id,
970 "persona": persona,
971 "cost_usd": cost.cost_usd,
972 "tokens": cost.tokens,
973 "lease_id": lease_id,
974 }),
975 now_ms,
976 )
977 .await?;
978 Ok(receipt_id)
979}
980
981fn normalize_trigger_envelope(
982 provider: &str,
983 kind: &str,
984 metadata: BTreeMap<String, String>,
985 now_ms: i64,
986) -> PersonaTriggerEnvelope {
987 let provider = provider.to_ascii_lowercase();
988 let kind = kind.to_string();
989 let source_event_id = metadata
990 .get("event_id")
991 .or_else(|| metadata.get("id"))
992 .cloned();
993 let subject_key = match provider.as_str() {
994 "github" => {
995 let repo = metadata
996 .get("repository")
997 .or_else(|| metadata.get("repository.full_name"))
998 .cloned()
999 .unwrap_or_else(|| "unknown".to_string());
1000 if let Some(number) = metadata
1001 .get("pr")
1002 .or_else(|| metadata.get("pull_request.number"))
1003 .or_else(|| metadata.get("number"))
1004 {
1005 format!("github:{repo}:pr:{number}")
1006 } else if let Some(check) = metadata
1007 .get("check_run.name")
1008 .or_else(|| metadata.get("check_name"))
1009 {
1010 format!("github:{repo}:check:{check}")
1011 } else {
1012 format!("github:{repo}:{kind}")
1013 }
1014 }
1015 "linear" => {
1016 let issue = metadata
1017 .get("issue_key")
1018 .or_else(|| metadata.get("issue.identifier"))
1019 .or_else(|| metadata.get("issue_id"))
1020 .or_else(|| metadata.get("id"))
1021 .cloned()
1022 .unwrap_or_else(|| "unknown".to_string());
1023 format!("linear:issue:{issue}")
1024 }
1025 "slack" => {
1026 let channel = metadata
1027 .get("channel")
1028 .or_else(|| metadata.get("channel_id"))
1029 .cloned()
1030 .unwrap_or_else(|| "unknown".to_string());
1031 let ts = metadata
1032 .get("ts")
1033 .or_else(|| metadata.get("event_ts"))
1034 .cloned()
1035 .unwrap_or_else(|| "unknown".to_string());
1036 format!("slack:{channel}:{ts}")
1037 }
1038 "webhook" => metadata
1039 .get("dedupe_key")
1040 .or_else(|| metadata.get("event_id"))
1041 .map(|value| format!("webhook:{value}"))
1042 .unwrap_or_else(|| format!("webhook:{kind}:{}", Uuid::now_v7())),
1043 _ => metadata
1044 .get("dedupe_key")
1045 .or_else(|| metadata.get("event_id"))
1046 .map(|value| format!("{provider}:{kind}:{value}"))
1047 .unwrap_or_else(|| format!("{provider}:{kind}:{}", Uuid::now_v7())),
1048 };
1049
1050 PersonaTriggerEnvelope {
1051 provider,
1052 kind,
1053 subject_key,
1054 source_event_id,
1055 received_at_ms: now_ms,
1056 raw: json!({"metadata": metadata}),
1057 metadata,
1058 }
1059}
1060
1061async fn queued_events(
1062 log: &Arc<AnyEventLog>,
1063 persona: &str,
1064) -> Result<Vec<(PersonaTriggerEnvelope, PersonaRunCost)>, String> {
1065 let events = read_persona_events(log, persona).await?;
1066 let mut queued = BTreeMap::<String, (PersonaTriggerEnvelope, PersonaRunCost)>::new();
1067 let mut completed = BTreeSet::<String>::new();
1068 for (_, event) in events {
1069 match event.kind.as_str() {
1070 "persona.trigger.queued" => {
1071 let Some(envelope) = event.payload.get("envelope") else {
1072 continue;
1073 };
1074 let envelope: PersonaTriggerEnvelope =
1075 serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1076 let cost = event
1077 .payload
1078 .get("cost")
1079 .cloned()
1080 .map(serde_json::from_value::<PersonaRunCost>)
1081 .transpose()
1082 .map_err(|error| error.to_string())?
1083 .unwrap_or_default();
1084 queued.insert(envelope.subject_key.clone(), (envelope, cost));
1085 }
1086 "persona.run.completed" => {
1087 if let Some(work_key) = event
1088 .payload
1089 .get("work_key")
1090 .and_then(serde_json::Value::as_str)
1091 {
1092 completed.insert(work_key.to_string());
1093 }
1094 }
1095 _ => {}
1096 }
1097 }
1098 queued.retain(|work_key, _| !completed.contains(work_key));
1099 Ok(queued.into_values().collect())
1100}
1101
1102fn assignment_status_from_lease(lease: &PersonaLease) -> PersonaAssignmentStatus {
1103 PersonaAssignmentStatus {
1104 work_key: lease.work_key.clone(),
1105 lease_id: lease.id.clone(),
1106 holder: lease.holder.clone(),
1107 acquired_at: format_ms(lease.acquired_at_ms),
1108 expires_at: format_ms(lease.expires_at_ms),
1109 }
1110}
1111
1112fn queued_work_from_event(event: &LogEvent) -> Result<Option<PersonaQueuedWork>, String> {
1113 let Some(envelope) = event.payload.get("envelope") else {
1114 return Ok(None);
1115 };
1116 let envelope: PersonaTriggerEnvelope =
1117 serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1118 Ok(Some(PersonaQueuedWork {
1119 work_key: envelope.subject_key,
1120 provider: envelope.provider,
1121 kind: envelope.kind,
1122 queued_at: format_ms(event.occurred_at_ms),
1123 reason: event
1124 .payload
1125 .get("reason")
1126 .and_then(serde_json::Value::as_str)
1127 .unwrap_or("queued")
1128 .to_string(),
1129 source_event_id: envelope.source_event_id,
1130 metadata: envelope.metadata,
1131 }))
1132}
1133
1134fn handoff_inbox_item(work: &PersonaQueuedWork) -> Option<PersonaHandoffInboxItem> {
1135 if work.provider != "handoff" && !work.metadata.contains_key("handoff_id") {
1136 return None;
1137 }
1138 Some(PersonaHandoffInboxItem {
1139 work_key: work.work_key.clone(),
1140 handoff_id: work.metadata.get("handoff_id").cloned(),
1141 handoff_kind: work
1142 .metadata
1143 .get("handoff_kind")
1144 .or_else(|| work.metadata.get("kind"))
1145 .cloned(),
1146 source_persona: work.metadata.get("source_persona").cloned(),
1147 task: work.metadata.get("task").cloned(),
1148 queued_at: work.queued_at.clone(),
1149 reason: work.reason.clone(),
1150 })
1151}
1152
1153fn value_receipt_from_event(event: &LogEvent) -> Result<Option<PersonaValueReceipt>, String> {
1154 let Ok(value_event) = serde_json::from_value::<PersonaValueEvent>(event.payload.clone()) else {
1155 return Ok(None);
1156 };
1157 Ok(Some(PersonaValueReceipt {
1158 kind: value_event.kind,
1159 run_id: value_event.run_id,
1160 occurred_at: value_event
1161 .occurred_at
1162 .format(&Rfc3339)
1163 .map_err(|error| error.to_string())?,
1164 paid_cost_usd: value_event.paid_cost_usd,
1165 avoided_cost_usd: value_event.avoided_cost_usd,
1166 deterministic_steps: value_event.deterministic_steps,
1167 llm_steps: value_event.llm_steps,
1168 metadata: value_event.metadata,
1169 }))
1170}
1171
1172async fn work_completed(
1173 log: &Arc<AnyEventLog>,
1174 persona: &str,
1175 work_key: &str,
1176) -> Result<bool, String> {
1177 let events = read_persona_events(log, persona).await?;
1178 Ok(events.into_iter().any(|(_, event)| {
1179 event.kind == "persona.run.completed"
1180 && event
1181 .payload
1182 .get("work_key")
1183 .and_then(serde_json::Value::as_str)
1184 == Some(work_key)
1185 }))
1186}
1187
1188async fn read_persona_events(
1189 log: &Arc<AnyEventLog>,
1190 persona: &str,
1191) -> Result<Vec<(u64, LogEvent)>, String> {
1192 let topic = runtime_topic()?;
1193 Ok(log
1194 .read_range(&topic, None, usize::MAX)
1195 .await
1196 .map_err(|error| error.to_string())?
1197 .into_iter()
1198 .filter(|(_, event)| {
1199 event
1200 .headers
1201 .get("persona")
1202 .is_some_and(|name| name == persona)
1203 })
1204 .collect())
1205}
1206
1207async fn append_persona_event(
1208 log: &Arc<AnyEventLog>,
1209 persona: &str,
1210 kind: &str,
1211 payload: serde_json::Value,
1212 now_ms: i64,
1213) -> Result<u64, String> {
1214 let mut headers = BTreeMap::new();
1215 headers.insert("persona".to_string(), persona.to_string());
1216 let event = LogEvent {
1217 kind: kind.to_string(),
1218 payload,
1219 headers,
1220 occurred_at_ms: now_ms,
1221 };
1222 log.append(&runtime_topic()?, event)
1223 .await
1224 .map_err(|error| error.to_string())
1225}
1226
1227struct PersonaValueEventDelta {
1228 kind: PersonaValueEventKind,
1229 paid_cost_usd: f64,
1230 avoided_cost_usd: f64,
1231 deterministic_steps: i64,
1232 llm_steps: i64,
1233 metadata: serde_json::Value,
1234}
1235
1236impl Default for PersonaValueEventDelta {
1237 fn default() -> Self {
1238 Self {
1239 kind: PersonaValueEventKind::RunCompleted,
1240 paid_cost_usd: 0.0,
1241 avoided_cost_usd: 0.0,
1242 deterministic_steps: 0,
1243 llm_steps: 0,
1244 metadata: serde_json::Value::Null,
1245 }
1246 }
1247}
1248
1249async fn emit_persona_value_event(
1250 log: &Arc<AnyEventLog>,
1251 binding: &PersonaRuntimeBinding,
1252 run_id: Uuid,
1253 delta: PersonaValueEventDelta,
1254 now_ms: i64,
1255) -> Result<(), String> {
1256 let event = PersonaValueEvent {
1257 persona_id: binding.name.clone(),
1258 template_ref: binding.template_ref.clone(),
1259 run_id: Some(run_id),
1260 kind: delta.kind,
1261 paid_cost_usd: delta.paid_cost_usd.max(0.0),
1262 avoided_cost_usd: delta.avoided_cost_usd.max(0.0),
1263 deterministic_steps: delta.deterministic_steps.max(0),
1264 llm_steps: delta.llm_steps.max(0),
1265 metadata: delta.metadata,
1266 occurred_at: offset_datetime_from_ms(now_ms),
1267 };
1268 append_persona_event(
1269 log,
1270 &binding.name,
1271 &format!("persona.value.{}", event.kind.as_str()),
1272 serde_json::to_value(&event).map_err(|error| error.to_string())?,
1273 now_ms,
1274 )
1275 .await?;
1276 emit_persona_value_sink_event(&event);
1277 Ok(())
1278}
1279
1280fn emit_persona_value_sink_event(event: &PersonaValueEvent) {
1281 let sinks = persona_value_sinks()
1282 .read()
1283 .map(|sinks| {
1284 sinks
1285 .iter()
1286 .map(|(_, sink)| Arc::clone(sink))
1287 .collect::<Vec<_>>()
1288 })
1289 .unwrap_or_default();
1290 for sink in sinks {
1291 sink.handle_value_event(event);
1292 }
1293}
1294
1295fn run_value_metadata(
1296 envelope: &PersonaTriggerEnvelope,
1297 lease: &PersonaLease,
1298 cost: &PersonaRunCost,
1299) -> serde_json::Value {
1300 let mut metadata = serde_json::Map::new();
1301 metadata.insert("work_key".to_string(), json!(envelope.subject_key));
1302 metadata.insert("trigger_provider".to_string(), json!(envelope.provider));
1303 metadata.insert("trigger_kind".to_string(), json!(envelope.kind));
1304 metadata.insert("lease_id".to_string(), json!(lease.id));
1305 metadata.insert("tokens".to_string(), json!(cost.tokens));
1306 if cost.frontier_escalations > 0 {
1307 metadata.insert(
1308 "frontier_escalations".to_string(),
1309 json!(cost.frontier_escalations),
1310 );
1311 }
1312 match &cost.metadata {
1313 serde_json::Value::Null => {}
1314 serde_json::Value::Object(extra) => {
1315 metadata.extend(
1316 extra
1317 .iter()
1318 .map(|(key, value)| (key.clone(), value.clone())),
1319 );
1320 }
1321 extra => {
1322 metadata.insert("run_cost_metadata".to_string(), extra.clone());
1323 }
1324 }
1325 serde_json::Value::Object(metadata)
1326}
1327
1328fn budget_status(
1329 policy: &PersonaBudgetPolicy,
1330 spent: &[(i64, f64, u64)],
1331 now_ms: i64,
1332) -> PersonaBudgetStatus {
1333 let day_start = now_ms - (now_ms.rem_euclid(86_400_000));
1334 let hour_start = now_ms - (now_ms.rem_euclid(3_600_000));
1335 let mut spent_today_usd = 0.0;
1336 let mut spent_this_hour_usd = 0.0;
1337 let mut tokens_today = 0u64;
1338 let mut spent_last_run_usd = 0.0;
1339 for (at_ms, cost, tokens) in spent {
1340 spent_last_run_usd = *cost;
1341 if *at_ms >= day_start {
1342 spent_today_usd += cost;
1343 tokens_today += tokens;
1344 }
1345 if *at_ms >= hour_start {
1346 spent_this_hour_usd += cost;
1347 }
1348 }
1349
1350 let remaining_today_usd = policy
1351 .daily_usd
1352 .map(|limit| (limit - spent_today_usd).max(0.0));
1353 let remaining_hour_usd = policy
1354 .hourly_usd
1355 .map(|limit| (limit - spent_this_hour_usd).max(0.0));
1356 let reason = if policy
1357 .daily_usd
1358 .is_some_and(|limit| spent_today_usd >= limit && limit >= 0.0)
1359 {
1360 Some("daily_usd".to_string())
1361 } else if policy
1362 .hourly_usd
1363 .is_some_and(|limit| spent_this_hour_usd >= limit && limit >= 0.0)
1364 {
1365 Some("hourly_usd".to_string())
1366 } else if policy
1367 .max_tokens
1368 .is_some_and(|limit| tokens_today >= limit && limit > 0)
1369 {
1370 Some("max_tokens".to_string())
1371 } else {
1372 None
1373 };
1374
1375 PersonaBudgetStatus {
1376 daily_usd: policy.daily_usd,
1377 hourly_usd: policy.hourly_usd,
1378 run_usd: policy.run_usd,
1379 max_tokens: policy.max_tokens,
1380 spent_today_usd,
1381 spent_this_hour_usd,
1382 spent_last_run_usd,
1383 tokens_today,
1384 remaining_today_usd,
1385 remaining_hour_usd,
1386 exhausted: reason.is_some(),
1387 reason,
1388 last_receipt_id: None,
1389 }
1390}
1391
1392fn next_scheduled_run(
1393 binding: &PersonaRuntimeBinding,
1394 last_run_ms: Option<i64>,
1395 now_ms: i64,
1396) -> Option<String> {
1397 binding
1398 .schedules
1399 .iter()
1400 .filter_map(|schedule| next_cron_ms(schedule, last_run_ms.unwrap_or(now_ms)).ok())
1401 .min()
1402 .map(format_ms)
1403}
1404
1405fn next_cron_ms(schedule: &str, after_ms: i64) -> Result<i64, String> {
1406 let cron = schedule
1407 .parse::<Cron>()
1408 .map_err(|error| error.to_string())?;
1409 let after = Utc
1410 .timestamp_millis_opt(after_ms)
1411 .single()
1412 .ok_or_else(|| "invalid timestamp".to_string())?;
1413 let next = cron
1414 .find_next_occurrence(&after, false)
1415 .map_err(|error| error.to_string())?;
1416 Ok(next.timestamp_millis())
1417}
1418
1419pub fn now_ms() -> i64 {
1420 OffsetDateTime::now_utc().unix_timestamp_nanos() as i64 / 1_000_000
1421}
1422
1423fn offset_datetime_from_ms(ms: i64) -> OffsetDateTime {
1424 OffsetDateTime::from_unix_timestamp_nanos((ms as i128) * 1_000_000)
1425 .unwrap_or(OffsetDateTime::UNIX_EPOCH)
1426}
1427
1428pub fn format_ms(ms: i64) -> String {
1429 offset_datetime_from_ms(ms)
1430 .format(&Rfc3339)
1431 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
1432}
1433
1434pub fn parse_rfc3339_ms(value: &str) -> Result<i64, String> {
1435 let ts = OffsetDateTime::parse(value, &Rfc3339)
1436 .map_err(|error| format!("invalid RFC3339 timestamp '{value}': {error}"))?;
1437 Ok(ts.unix_timestamp_nanos() as i64 / 1_000_000)
1438}
1439
1440fn runtime_topic() -> Result<Topic, String> {
1441 Topic::new(PERSONA_RUNTIME_TOPIC).map_err(|error| error.to_string())
1442}
1443
1444#[cfg(test)]
1445mod tests {
1446 use super::*;
1447 use crate::event_log::{AnyEventLog, MemoryEventLog};
1448 use std::sync::Mutex;
1449
1450 struct CapturingValueSink {
1451 events: Arc<Mutex<Vec<PersonaValueEvent>>>,
1452 }
1453
1454 impl PersonaValueSink for CapturingValueSink {
1455 fn handle_value_event(&self, event: &PersonaValueEvent) {
1456 self.events.lock().unwrap().push(event.clone());
1457 }
1458 }
1459
1460 fn binding() -> PersonaRuntimeBinding {
1461 PersonaRuntimeBinding {
1462 name: "merge_captain".to_string(),
1463 template_ref: Some("software_factory@v0".to_string()),
1464 entry_workflow: "workflows/merge.harn#run".to_string(),
1465 schedules: vec!["*/30 * * * *".to_string()],
1466 triggers: vec!["github.pr_opened".to_string()],
1467 budget: PersonaBudgetPolicy {
1468 daily_usd: Some(0.02),
1469 hourly_usd: None,
1470 run_usd: Some(0.02),
1471 max_tokens: Some(100),
1472 },
1473 }
1474 }
1475
1476 fn log() -> Arc<AnyEventLog> {
1477 Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)))
1478 }
1479
1480 #[tokio::test]
1481 async fn schedule_tick_records_lifecycle_status_and_receipt() {
1482 let log = log();
1483 let binding = binding();
1484 let now = parse_rfc3339_ms("2026-04-24T12:30:00Z").unwrap();
1485 let receipt = fire_schedule(
1486 &log,
1487 &binding,
1488 PersonaRunCost {
1489 cost_usd: 0.01,
1490 tokens: 10,
1491 ..Default::default()
1492 },
1493 now,
1494 )
1495 .await
1496 .unwrap();
1497 assert_eq!(receipt.status, "completed");
1498 assert!(receipt.lease.is_some());
1499 let status = persona_status(&log, &binding, now).await.unwrap();
1500 assert_eq!(status.state, PersonaLifecycleState::Idle);
1501 assert_eq!(status.last_run.as_deref(), Some("2026-04-24T12:30:00Z"));
1502 assert!(status.next_scheduled_run.is_some());
1503 assert_eq!(status.budget.spent_today_usd, 0.01);
1504 }
1505
1506 #[tokio::test]
1507 async fn paused_personas_queue_and_resume_drains_once() {
1508 let log = log();
1509 let binding = binding();
1510 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
1511 pause_persona(&log, &binding, now).await.unwrap();
1512 let receipt = fire_trigger(
1513 &log,
1514 &binding,
1515 "github",
1516 "pull_request",
1517 BTreeMap::from([
1518 ("repository".to_string(), "burin-labs/harn".to_string()),
1519 ("number".to_string(), "462".to_string()),
1520 ]),
1521 PersonaRunCost::default(),
1522 now,
1523 )
1524 .await
1525 .unwrap();
1526 assert_eq!(receipt.status, "queued");
1527 assert_eq!(
1528 persona_status(&log, &binding, now)
1529 .await
1530 .unwrap()
1531 .queued_events,
1532 1
1533 );
1534 let status = resume_persona(&log, &binding, now + 1000).await.unwrap();
1535 assert_eq!(status.state, PersonaLifecycleState::Idle);
1536 assert_eq!(status.queued_events, 0);
1537 }
1538
1539 #[tokio::test]
1540 async fn resumed_queued_work_reuses_original_budget_cost() {
1541 let log = log();
1542 let mut binding = binding();
1543 binding.budget.run_usd = Some(0.01);
1544 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
1545 pause_persona(&log, &binding, now).await.unwrap();
1546 let queued = fire_trigger(
1547 &log,
1548 &binding,
1549 "github",
1550 "pull_request",
1551 BTreeMap::from([
1552 ("repository".to_string(), "burin-labs/harn".to_string()),
1553 ("number".to_string(), "1379".to_string()),
1554 ]),
1555 PersonaRunCost {
1556 cost_usd: 0.02,
1557 tokens: 1,
1558 ..Default::default()
1559 },
1560 now + 1,
1561 )
1562 .await
1563 .unwrap();
1564 assert_eq!(queued.status, "queued");
1565
1566 let status = resume_persona(&log, &binding, now + 2).await.unwrap();
1567
1568 assert_eq!(status.budget.reason.as_deref(), Some("run_usd"));
1569 assert!(status
1570 .last_error
1571 .as_deref()
1572 .is_some_and(|error| error.contains("run_usd")));
1573 assert_eq!(status.queued_events, 1);
1574 }
1575
1576 #[tokio::test]
1577 async fn duplicate_trigger_envelope_is_not_processed_twice() {
1578 let log = log();
1579 let binding = binding();
1580 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
1581 let metadata = BTreeMap::from([
1582 ("repository".to_string(), "burin-labs/harn".to_string()),
1583 ("number".to_string(), "462".to_string()),
1584 ]);
1585 let first = fire_trigger(
1586 &log,
1587 &binding,
1588 "github",
1589 "pull_request",
1590 metadata.clone(),
1591 PersonaRunCost::default(),
1592 now,
1593 )
1594 .await
1595 .unwrap();
1596 let second = fire_trigger(
1597 &log,
1598 &binding,
1599 "github",
1600 "pull_request",
1601 metadata,
1602 PersonaRunCost::default(),
1603 now + 1000,
1604 )
1605 .await
1606 .unwrap();
1607 assert_eq!(first.status, "completed");
1608 assert_eq!(second.status, "duplicate");
1609 assert!(second.lease.is_none());
1610 }
1611
1612 #[tokio::test]
1613 async fn disabled_personas_dead_letter_events() {
1614 let log = log();
1615 let binding = binding();
1616 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
1617 disable_persona(&log, &binding, now).await.unwrap();
1618 let receipt = fire_trigger(
1619 &log,
1620 &binding,
1621 "slack",
1622 "message",
1623 BTreeMap::from([
1624 ("channel".to_string(), "C123".to_string()),
1625 ("ts".to_string(), "1713988800.000100".to_string()),
1626 ]),
1627 PersonaRunCost::default(),
1628 now,
1629 )
1630 .await
1631 .unwrap();
1632 assert_eq!(receipt.status, "dead_lettered");
1633 let status = persona_status(&log, &binding, now).await.unwrap();
1634 assert_eq!(status.state, PersonaLifecycleState::Disabled);
1635 assert_eq!(status.disabled_events, 1);
1636 }
1637
1638 #[tokio::test]
1639 async fn budget_exhaustion_blocks_expensive_work() {
1640 let log = log();
1641 let mut binding = binding();
1642 binding.budget.daily_usd = Some(0.01);
1643 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
1644 let receipt = fire_trigger(
1645 &log,
1646 &binding,
1647 "linear",
1648 "issue",
1649 BTreeMap::from([("issue_key".to_string(), "HAR-462".to_string())]),
1650 PersonaRunCost {
1651 cost_usd: 0.02,
1652 tokens: 1,
1653 ..Default::default()
1654 },
1655 now,
1656 )
1657 .await
1658 .unwrap();
1659 assert_eq!(receipt.status, "budget_exhausted");
1660 let status = persona_status(&log, &binding, now).await.unwrap();
1661 assert_eq!(status.budget.reason.as_deref(), Some("daily_usd"));
1662 assert!(status.budget.exhausted);
1663 assert!(status.last_error.as_deref().unwrap().contains("daily_usd"));
1664 }
1665
1666 #[tokio::test]
1667 async fn deterministic_predicate_hit_emits_value_event_with_avoided_cost() {
1668 let log = log();
1669 let binding = binding();
1670 let captured = Arc::new(Mutex::new(Vec::new()));
1671 let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
1672 events: captured.clone(),
1673 }));
1674 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
1675
1676 let receipt = fire_trigger(
1677 &log,
1678 &binding,
1679 "github",
1680 "pull_request",
1681 BTreeMap::from([
1682 ("repository".to_string(), "burin-labs/harn".to_string()),
1683 ("number".to_string(), "715".to_string()),
1684 ]),
1685 PersonaRunCost {
1686 avoided_cost_usd: 0.0042,
1687 deterministic_steps: 1,
1688 metadata: json!({
1689 "predicate": "pr_already_green",
1690 "would_have_called_model": "gpt-5.4-mini",
1691 }),
1692 ..Default::default()
1693 },
1694 now,
1695 )
1696 .await
1697 .unwrap();
1698
1699 let run_id = receipt.run_id.expect("completed run has run_id");
1700 let events = captured.lock().unwrap().clone();
1701 let deterministic = events
1702 .iter()
1703 .find(|event| {
1704 event.kind == PersonaValueEventKind::DeterministicExecution
1705 && event.run_id == Some(run_id)
1706 })
1707 .expect("deterministic execution value event");
1708 assert_eq!(deterministic.persona_id, "merge_captain");
1709 assert_eq!(
1710 deterministic.template_ref.as_deref(),
1711 Some("software_factory@v0")
1712 );
1713 assert_eq!(deterministic.run_id, Some(run_id));
1714 assert_eq!(deterministic.paid_cost_usd, 0.0);
1715 assert_eq!(deterministic.avoided_cost_usd, 0.0042);
1716 assert_eq!(deterministic.deterministic_steps, 1);
1717 assert_eq!(
1718 deterministic.metadata["predicate"].as_str(),
1719 Some("pr_already_green")
1720 );
1721
1722 let persisted = read_persona_events(&log, &binding.name).await.unwrap();
1723 assert!(persisted.iter().any(|(_, event)| {
1724 event.kind == "persona.value.deterministic_execution"
1725 && event.payload["avoided_cost_usd"] == json!(0.0042)
1726 }));
1727 }
1728
1729 #[tokio::test]
1730 async fn frontier_escalation_run_emits_value_event_with_paid_cost() {
1731 let log = log();
1732 let binding = binding();
1733 let captured = Arc::new(Mutex::new(Vec::new()));
1734 let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
1735 events: captured.clone(),
1736 }));
1737 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
1738
1739 let receipt = fire_trigger(
1740 &log,
1741 &binding,
1742 "linear",
1743 "issue",
1744 BTreeMap::from([("issue_key".to_string(), "HAR-715".to_string())]),
1745 PersonaRunCost {
1746 cost_usd: 0.011,
1747 tokens: 20,
1748 llm_steps: 1,
1749 frontier_escalations: 1,
1750 metadata: json!({
1751 "frontier_model": "gpt-5.4",
1752 "escalation_reason": "high_risk_merge",
1753 }),
1754 ..Default::default()
1755 },
1756 now,
1757 )
1758 .await
1759 .unwrap();
1760
1761 let run_id = receipt.run_id.expect("completed run has run_id");
1762 let events = captured.lock().unwrap().clone();
1763 let escalation = events
1764 .iter()
1765 .find(|event| {
1766 event.kind == PersonaValueEventKind::FrontierEscalation
1767 && event.run_id == Some(run_id)
1768 })
1769 .expect("frontier escalation value event");
1770 assert_eq!(escalation.run_id, Some(run_id));
1771 assert_eq!(escalation.paid_cost_usd, 0.011);
1772 assert_eq!(escalation.avoided_cost_usd, 0.0);
1773 assert_eq!(escalation.llm_steps, 1);
1774 assert_eq!(
1775 escalation.metadata["frontier_model"].as_str(),
1776 Some("gpt-5.4")
1777 );
1778
1779 let completion = events
1780 .iter()
1781 .find(|event| {
1782 event.kind == PersonaValueEventKind::RunCompleted && event.run_id == Some(run_id)
1783 })
1784 .expect("run completed value event");
1785 assert_eq!(completion.paid_cost_usd, 0.0);
1786 }
1787}