1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::{Arc, OnceLock, RwLock};
4
5use chrono::{TimeZone, Utc};
6use croner::Cron;
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use time::format_description::well_known::Rfc3339;
10use time::OffsetDateTime;
11use uuid::Uuid;
12
13use crate::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
14
/// Topic name string for persona runtime events.
pub const PERSONA_RUNTIME_TOPIC: &str = "persona.runtime.events";

/// Lease time-to-live in milliseconds (five minutes) that `run_for_envelope_inner`
/// passes to `acquire_lease` when it starts a run.
const DEFAULT_LEASE_TTL_MS: i64 = 5 * 60 * 1_000;
18
19#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "snake_case")]
21pub enum PersonaLifecycleState {
22 Inactive,
23 Starting,
24 #[default]
25 Idle,
26 Running,
27 Paused,
28 Draining,
29 Failed,
30 Disabled,
31}
32
33impl PersonaLifecycleState {
34 pub fn as_str(self) -> &'static str {
35 match self {
36 Self::Inactive => "inactive",
37 Self::Starting => "starting",
38 Self::Idle => "idle",
39 Self::Running => "running",
40 Self::Paused => "paused",
41 Self::Draining => "draining",
42 Self::Failed => "failed",
43 Self::Disabled => "disabled",
44 }
45 }
46}
47
48#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
49pub struct PersonaBudgetPolicy {
50 pub daily_usd: Option<f64>,
51 pub hourly_usd: Option<f64>,
52 pub run_usd: Option<f64>,
53 pub max_tokens: Option<u64>,
54}
55
56#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
57pub struct PersonaRuntimeBinding {
58 pub name: String,
59 #[serde(default)]
60 pub template_ref: Option<String>,
61 pub entry_workflow: String,
62 #[serde(default)]
63 pub schedules: Vec<String>,
64 #[serde(default)]
65 pub triggers: Vec<String>,
66 #[serde(default)]
67 pub budget: PersonaBudgetPolicy,
68}
69
70#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
71pub struct PersonaLease {
72 pub id: String,
73 pub holder: String,
74 pub work_key: String,
75 pub acquired_at_ms: i64,
76 pub expires_at_ms: i64,
77}
78
79#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
80pub struct PersonaBudgetStatus {
81 pub daily_usd: Option<f64>,
82 pub hourly_usd: Option<f64>,
83 pub run_usd: Option<f64>,
84 pub max_tokens: Option<u64>,
85 pub spent_today_usd: f64,
86 pub spent_this_hour_usd: f64,
87 pub spent_last_run_usd: f64,
88 pub tokens_today: u64,
89 pub remaining_today_usd: Option<f64>,
90 pub remaining_hour_usd: Option<f64>,
91 pub exhausted: bool,
92 pub reason: Option<String>,
93 pub last_receipt_id: Option<String>,
94}
95
96#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
97pub struct PersonaStatus {
98 pub name: String,
99 #[serde(default)]
100 pub template_ref: Option<String>,
101 pub state: PersonaLifecycleState,
102 pub entry_workflow: String,
103 #[serde(default)]
104 pub role: String,
105 #[serde(default)]
106 pub current_assignment: Option<PersonaAssignmentStatus>,
107 pub last_run: Option<String>,
108 pub next_scheduled_run: Option<String>,
109 pub active_lease: Option<PersonaLease>,
110 pub budget: PersonaBudgetStatus,
111 pub last_error: Option<String>,
112 pub queued_events: usize,
113 #[serde(default)]
114 pub queued_work: Vec<PersonaQueuedWork>,
115 #[serde(default)]
116 pub handoff_inbox: Vec<PersonaHandoffInboxItem>,
117 #[serde(default)]
118 pub value_receipts: Vec<PersonaValueReceipt>,
119 pub disabled_events: usize,
120 pub paused_event_policy: String,
121}
122
123#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
124pub struct PersonaAssignmentStatus {
125 pub work_key: String,
126 pub lease_id: String,
127 pub holder: String,
128 pub acquired_at: String,
129 pub expires_at: String,
130}
131
132#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
133pub struct PersonaQueuedWork {
134 pub work_key: String,
135 pub provider: String,
136 pub kind: String,
137 pub queued_at: String,
138 pub reason: String,
139 pub source_event_id: Option<String>,
140 #[serde(default)]
141 pub metadata: BTreeMap<String, String>,
142}
143
144#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
145pub struct PersonaHandoffInboxItem {
146 pub work_key: String,
147 pub handoff_id: Option<String>,
148 pub handoff_kind: Option<String>,
149 pub source_persona: Option<String>,
150 pub task: Option<String>,
151 pub queued_at: String,
152 pub reason: String,
153}
154
155#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
156pub struct PersonaValueReceipt {
157 pub kind: PersonaValueEventKind,
158 pub run_id: Option<Uuid>,
159 pub occurred_at: String,
160 pub paid_cost_usd: f64,
161 pub avoided_cost_usd: f64,
162 pub deterministic_steps: i64,
163 pub llm_steps: i64,
164 pub metadata: serde_json::Value,
165}
166
167#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
168pub struct PersonaTriggerEnvelope {
169 pub provider: String,
170 pub kind: String,
171 pub subject_key: String,
172 pub source_event_id: Option<String>,
173 pub received_at_ms: i64,
174 #[serde(default)]
175 pub metadata: BTreeMap<String, String>,
176 #[serde(default)]
177 pub raw: serde_json::Value,
178}
179
180#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
181pub struct PersonaRunReceipt {
182 pub status: String,
183 pub persona: String,
184 #[serde(default)]
185 pub run_id: Option<Uuid>,
186 pub work_key: String,
187 pub lease: Option<PersonaLease>,
188 pub queued: bool,
189 pub error: Option<String>,
190 pub budget_receipt_id: Option<String>,
191}
192
193#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
194pub struct PersonaRunCost {
195 pub cost_usd: f64,
196 pub tokens: u64,
197 #[serde(default)]
198 pub avoided_cost_usd: f64,
199 #[serde(default)]
200 pub deterministic_steps: i64,
201 #[serde(default)]
202 pub llm_steps: i64,
203 #[serde(default)]
204 pub frontier_escalations: i64,
205 #[serde(default)]
206 pub metadata: serde_json::Value,
207}
208
209#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
210#[serde(rename_all = "snake_case")]
211pub enum PersonaValueEventKind {
212 RunStarted,
213 RunCompleted,
214 AcceptedOutcome,
215 FrontierEscalation,
216 DeterministicExecution,
217 PromotionSavings,
218 ApprovalWait,
219}
220
221impl PersonaValueEventKind {
222 pub fn as_str(self) -> &'static str {
223 match self {
224 Self::RunStarted => "run_started",
225 Self::RunCompleted => "run_completed",
226 Self::AcceptedOutcome => "accepted_outcome",
227 Self::FrontierEscalation => "frontier_escalation",
228 Self::DeterministicExecution => "deterministic_execution",
229 Self::PromotionSavings => "promotion_savings",
230 Self::ApprovalWait => "approval_wait",
231 }
232 }
233}
234
235#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
236pub struct PersonaValueEvent {
237 pub persona_id: String,
238 pub template_ref: Option<String>,
239 pub run_id: Option<Uuid>,
240 pub kind: PersonaValueEventKind,
241 pub paid_cost_usd: f64,
242 pub avoided_cost_usd: f64,
243 pub deterministic_steps: i64,
244 pub llm_steps: i64,
245 pub metadata: serde_json::Value,
246 pub occurred_at: OffsetDateTime,
247}
248
249pub trait PersonaValueSink: Send + Sync {
250 fn handle_value_event(&self, event: &PersonaValueEvent);
251}
252
253#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
260#[serde(tag = "update_kind", rename_all = "snake_case")]
261pub enum PersonaSupervisionEvent {
262 QueuePosition(PersonaQueuePositionUpdate),
263 RepairWorkerStatus(PersonaRepairWorkerStatusUpdate),
264 Receipt(PersonaReceiptUpdate),
265 Checkpoint(PersonaCheckpointUpdate),
266}
267
268#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
269pub struct PersonaQueuePositionUpdate {
270 pub persona_id: String,
271 #[serde(default)]
272 pub template_ref: Option<String>,
273 pub work_key: String,
274 pub queue_depth: i64,
275 pub position: i64,
277 pub queued_at_ms: i64,
278 pub occurred_at_ms: i64,
279}
280
281#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
282#[serde(rename_all = "snake_case")]
283pub enum PersonaRepairWorkerLifecycle {
284 Pending,
285 Running,
286 Verifying,
287 Pushing,
288 Succeeded,
289 Failed,
290 Cancelled,
291}
292
293impl PersonaRepairWorkerLifecycle {
294 pub fn as_str(self) -> &'static str {
295 match self {
296 Self::Pending => "pending",
297 Self::Running => "running",
298 Self::Verifying => "verifying",
299 Self::Pushing => "pushing",
300 Self::Succeeded => "succeeded",
301 Self::Failed => "failed",
302 Self::Cancelled => "cancelled",
303 }
304 }
305}
306
307#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
308pub struct PersonaRepairWorkerStatusUpdate {
309 pub persona_id: String,
310 #[serde(default)]
311 pub template_ref: Option<String>,
312 pub repair_worker_id: String,
313 pub lifecycle: PersonaRepairWorkerLifecycle,
314 #[serde(default)]
315 pub work_key: Option<String>,
316 #[serde(default)]
317 pub lease_id: Option<String>,
318 #[serde(default)]
319 pub scratchpad_url: Option<String>,
320 pub last_heartbeat_ms: i64,
321 pub occurred_at_ms: i64,
322}
323
324#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
325pub struct PersonaReceiptUpdate {
326 pub persona_id: String,
327 #[serde(default)]
328 pub template_ref: Option<String>,
329 pub receipt: PersonaRunReceipt,
330 pub occurred_at_ms: i64,
331}
332
333#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
334pub struct PersonaCheckpointUpdate {
335 pub persona_id: String,
336 #[serde(default)]
337 pub template_ref: Option<String>,
338 pub action: PersonaCheckpointAction,
339 pub checkpoint_id: String,
340 #[serde(default)]
341 pub work_key: Option<String>,
342 #[serde(default)]
344 pub resumed_from: Option<PersonaCheckpointResume>,
345 pub occurred_at_ms: i64,
346}
347
348#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
349#[serde(rename_all = "snake_case")]
350pub enum PersonaCheckpointAction {
351 RestoreAcked,
352}
353
354impl PersonaCheckpointAction {
355 pub fn as_str(self) -> &'static str {
356 match self {
357 Self::RestoreAcked => "restore_acked",
358 }
359 }
360}
361
362#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
363pub struct PersonaCheckpointResume {
364 pub run_id: Option<Uuid>,
365 pub lease_id: Option<String>,
366 pub last_run_ms: Option<i64>,
367 pub queued_work_keys: Vec<String>,
368 #[serde(default)]
369 pub note: Option<String>,
370}
371
372pub trait PersonaSupervisionSink: Send + Sync {
373 fn handle_supervision_event(&self, event: &PersonaSupervisionEvent);
374}
375
/// Registry of shared sinks of type `T`, each stored with the numeric id that
/// was handed out when it was registered.
struct TypedSinkRegistry<T: ?Sized + Send + Sync> {
    /// Registered sinks, paired with their registration ids.
    sinks: RwLock<Vec<(u64, Arc<T>)>>,
    /// Next id to hand out; starts at 1.
    next_id: AtomicU64,
}

impl<T: ?Sized + Send + Sync> TypedSinkRegistry<T> {
    /// Creates an empty registry. `const` so it can initialise a static.
    const fn new() -> Self {
        Self {
            sinks: RwLock::new(Vec::new()),
            next_id: AtomicU64::new(1),
        }
    }

    /// Stores `sink` and returns the id to pass to [`Self::unregister`].
    ///
    /// A poisoned lock is recovered rather than ignored: previously the sink
    /// was silently dropped while the caller still received an id for it.
    fn register(&self, sink: Arc<T>) -> u64 {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        self.sinks
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push((id, sink));
        id
    }

    /// Removes the sink registered under `id`; unknown ids are ignored.
    ///
    /// Also works on a poisoned lock, so a dropped registration can no longer
    /// leave its sink behind.
    fn unregister(&self, id: u64) {
        self.sinks
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .retain(|(existing, _)| *existing != id);
    }

    /// Returns clones of all registered sinks in registration order.
    ///
    /// The lock is released before the vector is returned, so callers invoke
    /// sinks without holding it. A poisoned lock still yields the stored sinks
    /// instead of an empty list.
    fn snapshot(&self) -> Vec<Arc<T>> {
        self.sinks
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .iter()
            .map(|(_, sink)| Arc::clone(sink))
            .collect()
    }
}
410
411fn persona_value_sinks() -> &'static TypedSinkRegistry<dyn PersonaValueSink> {
412 static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaValueSink>> = OnceLock::new();
413 REGISTRY.get_or_init(TypedSinkRegistry::new)
414}
415
416fn persona_supervision_sinks() -> &'static TypedSinkRegistry<dyn PersonaSupervisionSink> {
417 static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaSupervisionSink>> = OnceLock::new();
418 REGISTRY.get_or_init(TypedSinkRegistry::new)
419}
420
421#[must_use = "dropping the registration immediately unregisters the sink"]
422pub struct PersonaValueSinkRegistration {
423 id: u64,
424}
425
426impl Drop for PersonaValueSinkRegistration {
427 fn drop(&mut self) {
428 persona_value_sinks().unregister(self.id);
429 }
430}
431
432pub fn register_persona_value_sink(
433 sink: Arc<dyn PersonaValueSink>,
434) -> PersonaValueSinkRegistration {
435 PersonaValueSinkRegistration {
436 id: persona_value_sinks().register(sink),
437 }
438}
439
440#[must_use = "dropping the registration immediately unregisters the sink"]
441pub struct PersonaSupervisionSinkRegistration {
442 id: u64,
443}
444
445impl Drop for PersonaSupervisionSinkRegistration {
446 fn drop(&mut self) {
447 persona_supervision_sinks().unregister(self.id);
448 }
449}
450
451pub fn register_persona_supervision_sink(
452 sink: Arc<dyn PersonaSupervisionSink>,
453) -> PersonaSupervisionSinkRegistration {
454 PersonaSupervisionSinkRegistration {
455 id: persona_supervision_sinks().register(sink),
456 }
457}
458
459pub async fn persona_status(
460 log: &Arc<AnyEventLog>,
461 binding: &PersonaRuntimeBinding,
462 now_ms: i64,
463) -> Result<PersonaStatus, String> {
464 let events = read_persona_events(log, &binding.name).await?;
465 let mut state = PersonaLifecycleState::Idle;
466 let mut last_run_ms = None;
467 let mut active_lease = None;
468 let mut last_error = None;
469 let mut queued = BTreeSet::<String>::new();
470 let mut completed = BTreeSet::<String>::new();
471 let mut disabled_events = 0usize;
472 let mut budget_receipt = None;
473 let mut budget_exhaustion_reason = None;
474 let mut spent = Vec::<(i64, f64, u64)>::new();
475 let mut queued_work = BTreeMap::<String, PersonaQueuedWork>::new();
476 let mut value_receipts = Vec::<PersonaValueReceipt>::new();
477
478 for (_, event) in events {
479 match event.kind.as_str() {
480 "persona.control.paused" => state = PersonaLifecycleState::Paused,
481 "persona.control.resumed" => state = PersonaLifecycleState::Idle,
482 "persona.control.disabled" => state = PersonaLifecycleState::Disabled,
483 "persona.control.draining" => state = PersonaLifecycleState::Draining,
484 "persona.lease.acquired" => {
485 if let Ok(lease) = serde_json::from_value::<PersonaLease>(event.payload.clone()) {
486 active_lease = Some(lease);
487 state = PersonaLifecycleState::Running;
488 }
489 }
490 "persona.lease.released" => {
491 active_lease = None;
492 if !matches!(
493 state,
494 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
495 ) {
496 state = PersonaLifecycleState::Idle;
497 }
498 }
499 "persona.lease.expired" => {
500 active_lease = None;
501 if !matches!(
502 state,
503 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
504 ) {
505 state = PersonaLifecycleState::Idle;
506 }
507 }
508 "persona.run.started" => state = PersonaLifecycleState::Running,
509 "persona.run.completed" => {
510 last_run_ms = event
511 .payload
512 .get("completed_at_ms")
513 .and_then(serde_json::Value::as_i64)
514 .or(Some(event.occurred_at_ms));
515 if let Some(work_key) = event
516 .payload
517 .get("work_key")
518 .and_then(serde_json::Value::as_str)
519 {
520 completed.insert(work_key.to_string());
521 }
522 if !matches!(
523 state,
524 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
525 ) {
526 state = PersonaLifecycleState::Idle;
527 }
528 }
529 "persona.run.failed" => {
530 state = PersonaLifecycleState::Failed;
531 last_error = event
532 .payload
533 .get("error")
534 .and_then(serde_json::Value::as_str)
535 .map(ToString::to_string);
536 }
537 "persona.trigger.queued" => {
538 if let Some(work_key) = event
539 .payload
540 .get("work_key")
541 .and_then(serde_json::Value::as_str)
542 {
543 queued.insert(work_key.to_string());
544 }
545 if let Some(item) = queued_work_from_event(&event)? {
546 queued_work.insert(item.work_key.clone(), item);
547 }
548 }
549 "persona.trigger.dead_lettered" => disabled_events += 1,
550 "persona.budget.recorded" => {
551 budget_receipt = event
552 .payload
553 .get("receipt_id")
554 .and_then(serde_json::Value::as_str)
555 .map(ToString::to_string);
556 spent.push((
557 event.occurred_at_ms,
558 event
559 .payload
560 .get("cost_usd")
561 .and_then(serde_json::Value::as_f64)
562 .unwrap_or_default(),
563 event
564 .payload
565 .get("tokens")
566 .and_then(serde_json::Value::as_u64)
567 .unwrap_or_default(),
568 ));
569 }
570 "persona.budget.exhausted" => {
571 budget_exhaustion_reason = event
572 .payload
573 .get("reason")
574 .and_then(serde_json::Value::as_str)
575 .map(ToString::to_string);
576 last_error = budget_exhaustion_reason
577 .as_ref()
578 .map(|reason| format!("persona budget exhausted: {reason}"));
579 budget_receipt = event
580 .payload
581 .get("receipt_id")
582 .and_then(serde_json::Value::as_str)
583 .map(ToString::to_string);
584 }
585 kind if kind.starts_with("persona.value.") => {
586 if let Some(receipt) = value_receipt_from_event(&event)? {
587 value_receipts.push(receipt);
588 }
589 }
590 _ => {}
591 }
592 }
593
594 if let Some(lease) = active_lease.as_ref() {
595 if lease.expires_at_ms <= now_ms {
596 active_lease = None;
597 if !matches!(
598 state,
599 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
600 ) {
601 state = PersonaLifecycleState::Idle;
602 }
603 }
604 }
605
606 queued.retain(|work_key| !completed.contains(work_key));
607 queued_work.retain(|work_key, _| !completed.contains(work_key));
608 let queued_work = queued_work.into_values().collect::<Vec<_>>();
609 let handoff_inbox = queued_work
610 .iter()
611 .filter_map(handoff_inbox_item)
612 .collect::<Vec<_>>();
613
614 let mut budget = budget_status(&binding.budget, &spent, now_ms);
615 if budget.reason.is_none() {
616 if let Some(reason) = budget_exhaustion_reason {
617 budget.exhausted = true;
618 budget.reason = Some(reason);
619 }
620 }
621 if budget.last_receipt_id.is_none() {
622 budget.last_receipt_id = budget_receipt;
623 }
624
625 let current_assignment = active_lease.as_ref().map(assignment_status_from_lease);
626
627 Ok(PersonaStatus {
628 name: binding.name.clone(),
629 template_ref: binding.template_ref.clone(),
630 state,
631 entry_workflow: binding.entry_workflow.clone(),
632 role: binding.name.clone(),
633 current_assignment,
634 last_run: last_run_ms.map(format_ms),
635 next_scheduled_run: next_scheduled_run(binding, last_run_ms, now_ms),
636 active_lease,
637 budget,
638 last_error,
639 queued_events: queued.len(),
640 queued_work,
641 handoff_inbox,
642 value_receipts,
643 disabled_events,
644 paused_event_policy: "queue_then_drain_on_resume".to_string(),
645 })
646}
647
648pub async fn pause_persona(
649 log: &Arc<AnyEventLog>,
650 binding: &PersonaRuntimeBinding,
651 now_ms: i64,
652) -> Result<PersonaStatus, String> {
653 append_persona_event(
654 log,
655 &binding.name,
656 "persona.control.paused",
657 json!({"paused_at_ms": now_ms, "policy": "queue_then_drain_on_resume"}),
658 now_ms,
659 )
660 .await?;
661 persona_status(log, binding, now_ms).await
662}
663
664pub async fn resume_persona(
665 log: &Arc<AnyEventLog>,
666 binding: &PersonaRuntimeBinding,
667 now_ms: i64,
668) -> Result<PersonaStatus, String> {
669 append_persona_event(
670 log,
671 &binding.name,
672 "persona.control.resumed",
673 json!({"resumed_at_ms": now_ms, "drain": true}),
674 now_ms,
675 )
676 .await?;
677 let queued = queued_events(log, &binding.name).await?;
678 for (envelope, cost) in queued {
679 let _ = run_for_envelope(log, binding, envelope, cost, now_ms).await?;
680 }
681 persona_status(log, binding, now_ms).await
682}
683
684pub async fn disable_persona(
685 log: &Arc<AnyEventLog>,
686 binding: &PersonaRuntimeBinding,
687 now_ms: i64,
688) -> Result<PersonaStatus, String> {
689 append_persona_event(
690 log,
691 &binding.name,
692 "persona.control.disabled",
693 json!({"disabled_at_ms": now_ms}),
694 now_ms,
695 )
696 .await?;
697 persona_status(log, binding, now_ms).await
698}
699
700pub async fn fire_schedule(
701 log: &Arc<AnyEventLog>,
702 binding: &PersonaRuntimeBinding,
703 cost: PersonaRunCost,
704 now_ms: i64,
705) -> Result<PersonaRunReceipt, String> {
706 let schedule = binding
707 .schedules
708 .first()
709 .cloned()
710 .unwrap_or_else(|| "manual".to_string());
711 let envelope = PersonaTriggerEnvelope {
712 provider: "schedule".to_string(),
713 kind: "cron.tick".to_string(),
714 subject_key: format!("schedule:{}:{schedule}:{}", binding.name, format_ms(now_ms)),
715 source_event_id: None,
716 received_at_ms: now_ms,
717 metadata: BTreeMap::from([
718 ("persona".to_string(), binding.name.clone()),
719 ("schedule".to_string(), schedule),
720 ("fired_at".to_string(), format_ms(now_ms)),
721 ]),
722 raw: json!({}),
723 };
724 append_persona_event(
725 log,
726 &binding.name,
727 "persona.schedule.fired",
728 json!({"persona": binding.name, "envelope": envelope}),
729 now_ms,
730 )
731 .await?;
732 run_for_envelope(log, binding, envelope, cost, now_ms).await
733}
734
735pub async fn fire_trigger(
736 log: &Arc<AnyEventLog>,
737 binding: &PersonaRuntimeBinding,
738 provider: &str,
739 kind: &str,
740 metadata: BTreeMap<String, String>,
741 cost: PersonaRunCost,
742 now_ms: i64,
743) -> Result<PersonaRunReceipt, String> {
744 let envelope = normalize_trigger_envelope(provider, kind, metadata, now_ms);
745 append_persona_event(
746 log,
747 &binding.name,
748 "persona.trigger.received",
749 json!({"persona": binding.name, "envelope": envelope}),
750 now_ms,
751 )
752 .await?;
753 run_for_envelope(log, binding, envelope, cost, now_ms).await
754}
755
756pub async fn record_persona_spend(
757 log: &Arc<AnyEventLog>,
758 binding: &PersonaRuntimeBinding,
759 cost: PersonaRunCost,
760 now_ms: i64,
761) -> Result<PersonaBudgetStatus, String> {
762 enforce_budget(log, binding, &cost, now_ms).await?;
763 append_budget_record(log, &binding.name, &cost, None, now_ms).await?;
764 persona_status(log, binding, now_ms)
765 .await
766 .map(|status| status.budget)
767}
768
769pub async fn report_repair_worker_status(
777 log: &Arc<AnyEventLog>,
778 binding: &PersonaRuntimeBinding,
779 status: PersonaRepairWorkerStatusUpdate,
780 now_ms: i64,
781) -> Result<bool, String> {
782 let mut status = status;
783 if status.persona_id.is_empty() {
784 status.persona_id = binding.name.clone();
785 }
786 if status.template_ref.is_none() {
787 status.template_ref = binding.template_ref.clone();
788 }
789 if status.occurred_at_ms == 0 {
790 status.occurred_at_ms = now_ms;
791 }
792 if status.last_heartbeat_ms == 0 {
793 status.last_heartbeat_ms = now_ms;
794 }
795
796 if repair_worker_status_recorded(log, &binding.name, &status).await? {
797 return Ok(false);
798 }
799 append_persona_event(
800 log,
801 &binding.name,
802 "persona.repair_worker.status",
803 serde_json::to_value(&status).map_err(|error| error.to_string())?,
804 status.occurred_at_ms,
805 )
806 .await?;
807 emit_persona_supervision_sink_event(&PersonaSupervisionEvent::RepairWorkerStatus(status));
808 Ok(true)
809}
810
811pub async fn restore_persona_checkpoint(
818 log: &Arc<AnyEventLog>,
819 binding: &PersonaRuntimeBinding,
820 request: PersonaCheckpointRestoreRequest,
821 now_ms: i64,
822) -> Result<PersonaCheckpointRestoreOutcome, String> {
823 let PersonaCheckpointRestoreRequest {
824 checkpoint_id,
825 work_key,
826 resumed_from,
827 } = request;
828 let status = persona_status(log, binding, now_ms).await?;
829
830 if let Some(prior) = find_checkpoint_restore_ack(log, &binding.name, &checkpoint_id).await? {
831 return Ok(PersonaCheckpointRestoreOutcome {
832 acked: false,
833 update: prior,
834 });
835 }
836
837 let resume_coordinates = resumed_from.unwrap_or_else(|| PersonaCheckpointResume {
838 run_id: None,
839 lease_id: status.active_lease.as_ref().map(|lease| lease.id.clone()),
840 last_run_ms: status
841 .last_run
842 .as_deref()
843 .and_then(|value| parse_rfc3339_ms(value).ok()),
844 queued_work_keys: status
845 .queued_work
846 .iter()
847 .map(|item| item.work_key.clone())
848 .collect(),
849 note: None,
850 });
851
852 let update = PersonaCheckpointUpdate {
853 persona_id: binding.name.clone(),
854 template_ref: binding.template_ref.clone(),
855 action: PersonaCheckpointAction::RestoreAcked,
856 checkpoint_id: checkpoint_id.clone(),
857 work_key,
858 resumed_from: Some(resume_coordinates),
859 occurred_at_ms: now_ms,
860 };
861
862 append_persona_event(
863 log,
864 &binding.name,
865 "persona.checkpoint.restore_acked",
866 serde_json::to_value(&update).map_err(|error| error.to_string())?,
867 now_ms,
868 )
869 .await?;
870 emit_persona_supervision_sink_event(&PersonaSupervisionEvent::Checkpoint(update.clone()));
871 Ok(PersonaCheckpointRestoreOutcome {
872 acked: true,
873 update,
874 })
875}
876
877#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
878pub struct PersonaCheckpointRestoreRequest {
879 pub checkpoint_id: String,
880 #[serde(default)]
881 pub work_key: Option<String>,
882 #[serde(default)]
883 pub resumed_from: Option<PersonaCheckpointResume>,
884}
885
886#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
887pub struct PersonaCheckpointRestoreOutcome {
888 pub acked: bool,
889 pub update: PersonaCheckpointUpdate,
890}
891
892async fn repair_worker_status_recorded(
893 log: &Arc<AnyEventLog>,
894 persona: &str,
895 update: &PersonaRepairWorkerStatusUpdate,
896) -> Result<bool, String> {
897 let events = read_persona_events(log, persona).await?;
898 Ok(events.into_iter().any(|(_, event)| {
899 event.kind == "persona.repair_worker.status"
900 && event
901 .payload
902 .get("repair_worker_id")
903 .and_then(serde_json::Value::as_str)
904 == Some(update.repair_worker_id.as_str())
905 && event
906 .payload
907 .get("lifecycle")
908 .and_then(serde_json::Value::as_str)
909 == Some(update.lifecycle.as_str())
910 }))
911}
912
913async fn find_checkpoint_restore_ack(
914 log: &Arc<AnyEventLog>,
915 persona: &str,
916 checkpoint_id: &str,
917) -> Result<Option<PersonaCheckpointUpdate>, String> {
918 let events = read_persona_events(log, persona).await?;
919 for (_, event) in events.into_iter().rev() {
920 if event.kind != "persona.checkpoint.restore_acked" {
921 continue;
922 }
923 if event
924 .payload
925 .get("checkpoint_id")
926 .and_then(serde_json::Value::as_str)
927 != Some(checkpoint_id)
928 {
929 continue;
930 }
931 let update: PersonaCheckpointUpdate =
932 serde_json::from_value(event.payload).map_err(|error| error.to_string())?;
933 return Ok(Some(update));
934 }
935 Ok(None)
936}
937
938async fn run_for_envelope(
939 log: &Arc<AnyEventLog>,
940 binding: &PersonaRuntimeBinding,
941 envelope: PersonaTriggerEnvelope,
942 cost: PersonaRunCost,
943 now_ms: i64,
944) -> Result<PersonaRunReceipt, String> {
945 let pre_queue = queue_snapshot(log, binding, now_ms).await?;
946 let receipt = run_for_envelope_inner(log, binding, envelope, cost, now_ms).await?;
947 let post_queue = queue_snapshot(log, binding, now_ms).await?;
948 emit_queue_position_supervision(binding, &pre_queue, &post_queue, now_ms);
949 emit_receipt_supervision(binding, &receipt, now_ms);
950 Ok(receipt)
951}
952
953async fn run_for_envelope_inner(
954 log: &Arc<AnyEventLog>,
955 binding: &PersonaRuntimeBinding,
956 envelope: PersonaTriggerEnvelope,
957 cost: PersonaRunCost,
958 now_ms: i64,
959) -> Result<PersonaRunReceipt, String> {
960 let status = persona_status(log, binding, now_ms).await?;
961 match status.state {
962 PersonaLifecycleState::Paused => {
963 append_persona_event(
964 log,
965 &binding.name,
966 "persona.trigger.queued",
967 json!({
968 "work_key": envelope.subject_key,
969 "envelope": envelope,
970 "cost": cost,
971 "reason": "paused",
972 }),
973 now_ms,
974 )
975 .await?;
976 return Ok(PersonaRunReceipt {
977 status: "queued".to_string(),
978 persona: binding.name.clone(),
979 run_id: None,
980 work_key: envelope.subject_key,
981 lease: None,
982 queued: true,
983 error: None,
984 budget_receipt_id: None,
985 });
986 }
987 PersonaLifecycleState::Disabled => {
988 append_persona_event(
989 log,
990 &binding.name,
991 "persona.trigger.dead_lettered",
992 json!({
993 "work_key": envelope.subject_key,
994 "envelope": envelope,
995 "reason": "disabled",
996 }),
997 now_ms,
998 )
999 .await?;
1000 return Ok(PersonaRunReceipt {
1001 status: "dead_lettered".to_string(),
1002 persona: binding.name.clone(),
1003 run_id: None,
1004 work_key: envelope.subject_key,
1005 lease: None,
1006 queued: false,
1007 error: Some("persona is disabled".to_string()),
1008 budget_receipt_id: None,
1009 });
1010 }
1011 _ => {}
1012 }
1013
1014 if let Err(error) = enforce_budget(log, binding, &cost, now_ms).await {
1015 return Ok(PersonaRunReceipt {
1016 status: "budget_exhausted".to_string(),
1017 persona: binding.name.clone(),
1018 run_id: None,
1019 work_key: envelope.subject_key,
1020 lease: None,
1021 queued: false,
1022 error: Some(error),
1023 budget_receipt_id: None,
1024 });
1025 }
1026
1027 if work_completed(log, &binding.name, &envelope.subject_key).await? {
1028 append_persona_event(
1029 log,
1030 &binding.name,
1031 "persona.trigger.duplicate",
1032 json!({
1033 "work_key": envelope.subject_key,
1034 "envelope": envelope,
1035 "reason": "already_completed",
1036 }),
1037 now_ms,
1038 )
1039 .await?;
1040 return Ok(PersonaRunReceipt {
1041 status: "duplicate".to_string(),
1042 persona: binding.name.clone(),
1043 run_id: None,
1044 work_key: envelope.subject_key,
1045 lease: None,
1046 queued: false,
1047 error: None,
1048 budget_receipt_id: None,
1049 });
1050 }
1051
1052 let Some(lease) = acquire_lease(
1053 log,
1054 binding,
1055 &envelope.subject_key,
1056 "persona-runtime",
1057 DEFAULT_LEASE_TTL_MS,
1058 now_ms,
1059 )
1060 .await?
1061 else {
1062 return Ok(PersonaRunReceipt {
1063 status: "lease_busy".to_string(),
1064 persona: binding.name.clone(),
1065 run_id: None,
1066 work_key: envelope.subject_key,
1067 lease: status.active_lease,
1068 queued: false,
1069 error: Some("active lease already owns persona work".to_string()),
1070 budget_receipt_id: None,
1071 });
1072 };
1073
1074 let run_id = Uuid::now_v7();
1075 let value_metadata = run_value_metadata(&envelope, &lease, &cost);
1076 append_persona_event(
1077 log,
1078 &binding.name,
1079 "persona.run.started",
1080 json!({
1081 "work_key": envelope.subject_key,
1082 "run_id": run_id,
1083 "started_at_ms": now_ms,
1084 "entry_workflow": binding.entry_workflow,
1085 "lease_id": lease.id,
1086 }),
1087 now_ms,
1088 )
1089 .await?;
1090 emit_persona_value_event(
1091 log,
1092 binding,
1093 run_id,
1094 PersonaValueEventDelta {
1095 kind: PersonaValueEventKind::RunStarted,
1096 metadata: value_metadata.clone(),
1097 ..Default::default()
1098 },
1099 now_ms,
1100 )
1101 .await?;
1102 let budget_receipt_id =
1103 append_budget_record(log, &binding.name, &cost, Some(&lease.id), now_ms).await?;
1104 if cost.avoided_cost_usd > 0.0 || cost.deterministic_steps > 0 {
1105 emit_persona_value_event(
1106 log,
1107 binding,
1108 run_id,
1109 PersonaValueEventDelta {
1110 kind: PersonaValueEventKind::DeterministicExecution,
1111 avoided_cost_usd: cost.avoided_cost_usd,
1112 deterministic_steps: cost.deterministic_steps.max(1),
1113 metadata: value_metadata.clone(),
1114 ..Default::default()
1115 },
1116 now_ms,
1117 )
1118 .await?;
1119 }
1120 if cost.frontier_escalations > 0 {
1121 emit_persona_value_event(
1122 log,
1123 binding,
1124 run_id,
1125 PersonaValueEventDelta {
1126 kind: PersonaValueEventKind::FrontierEscalation,
1127 paid_cost_usd: cost.cost_usd,
1128 llm_steps: cost.llm_steps.max(cost.frontier_escalations),
1129 metadata: value_metadata.clone(),
1130 ..Default::default()
1131 },
1132 now_ms,
1133 )
1134 .await?;
1135 }
1136 let completion_paid_cost = if cost.frontier_escalations > 0 {
1137 0.0
1138 } else {
1139 cost.cost_usd
1140 };
1141 let completion_llm_steps = if cost.frontier_escalations > 0 {
1142 0
1143 } else {
1144 cost.llm_steps
1145 };
1146 emit_persona_value_event(
1147 log,
1148 binding,
1149 run_id,
1150 PersonaValueEventDelta {
1151 kind: PersonaValueEventKind::RunCompleted,
1152 paid_cost_usd: completion_paid_cost,
1153 llm_steps: completion_llm_steps,
1154 metadata: value_metadata,
1155 ..Default::default()
1156 },
1157 now_ms,
1158 )
1159 .await?;
1160 append_persona_event(
1161 log,
1162 &binding.name,
1163 "persona.run.completed",
1164 json!({
1165 "work_key": envelope.subject_key,
1166 "run_id": run_id,
1167 "completed_at_ms": now_ms,
1168 "entry_workflow": binding.entry_workflow,
1169 "lease_id": lease.id,
1170 }),
1171 now_ms,
1172 )
1173 .await?;
1174 append_persona_event(
1175 log,
1176 &binding.name,
1177 "persona.lease.released",
1178 json!({
1179 "id": lease.id,
1180 "work_key": envelope.subject_key,
1181 "released_at_ms": now_ms,
1182 }),
1183 now_ms,
1184 )
1185 .await?;
1186 Ok(PersonaRunReceipt {
1187 status: "completed".to_string(),
1188 persona: binding.name.clone(),
1189 run_id: Some(run_id),
1190 work_key: envelope.subject_key,
1191 lease: Some(lease),
1192 queued: false,
1193 error: None,
1194 budget_receipt_id: Some(budget_receipt_id),
1195 })
1196}
1197
1198async fn acquire_lease(
1199 log: &Arc<AnyEventLog>,
1200 binding: &PersonaRuntimeBinding,
1201 work_key: &str,
1202 holder: &str,
1203 ttl_ms: i64,
1204 now_ms: i64,
1205) -> Result<Option<PersonaLease>, String> {
1206 let status = persona_status(log, binding, now_ms).await?;
1207 if let Some(lease) = status.active_lease {
1208 if lease.expires_at_ms > now_ms {
1209 append_persona_event(
1210 log,
1211 &binding.name,
1212 "persona.lease.conflict",
1213 json!({
1214 "active_lease": lease,
1215 "requested_work_key": work_key,
1216 "at_ms": now_ms,
1217 }),
1218 now_ms,
1219 )
1220 .await?;
1221 return Ok(None);
1222 }
1223 append_persona_event(
1224 log,
1225 &binding.name,
1226 "persona.lease.expired",
1227 json!({
1228 "id": lease.id,
1229 "work_key": lease.work_key,
1230 "expired_at_ms": now_ms,
1231 }),
1232 now_ms,
1233 )
1234 .await?;
1235 }
1236
1237 let lease = PersonaLease {
1238 id: format!("persona_lease_{}", Uuid::now_v7()),
1239 holder: holder.to_string(),
1240 work_key: work_key.to_string(),
1241 acquired_at_ms: now_ms,
1242 expires_at_ms: now_ms + ttl_ms,
1243 };
1244 append_persona_event(
1245 log,
1246 &binding.name,
1247 "persona.lease.acquired",
1248 serde_json::to_value(&lease).map_err(|error| error.to_string())?,
1249 now_ms,
1250 )
1251 .await?;
1252 Ok(Some(lease))
1253}
1254
1255async fn enforce_budget(
1256 log: &Arc<AnyEventLog>,
1257 binding: &PersonaRuntimeBinding,
1258 cost: &PersonaRunCost,
1259 now_ms: i64,
1260) -> Result<(), String> {
1261 let status = persona_status(log, binding, now_ms).await?;
1262 let reason = if binding
1263 .budget
1264 .run_usd
1265 .is_some_and(|limit| cost.cost_usd > limit)
1266 {
1267 Some("run_usd")
1268 } else if binding
1269 .budget
1270 .daily_usd
1271 .is_some_and(|limit| status.budget.spent_today_usd + cost.cost_usd > limit)
1272 {
1273 Some("daily_usd")
1274 } else if binding
1275 .budget
1276 .hourly_usd
1277 .is_some_and(|limit| status.budget.spent_this_hour_usd + cost.cost_usd > limit)
1278 {
1279 Some("hourly_usd")
1280 } else if binding
1281 .budget
1282 .max_tokens
1283 .is_some_and(|limit| status.budget.tokens_today + cost.tokens > limit)
1284 {
1285 Some("max_tokens")
1286 } else {
1287 None
1288 };
1289
1290 if let Some(reason) = reason {
1291 let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1292 append_persona_event(
1293 log,
1294 &binding.name,
1295 "persona.budget.exhausted",
1296 json!({
1297 "receipt_id": receipt_id,
1298 "reason": reason,
1299 "attempted_cost_usd": cost.cost_usd,
1300 "attempted_tokens": cost.tokens,
1301 "persona": binding.name,
1302 }),
1303 now_ms,
1304 )
1305 .await?;
1306 return Err(format!("persona budget exhausted: {reason}"));
1307 }
1308
1309 Ok(())
1310}
1311
1312async fn append_budget_record(
1313 log: &Arc<AnyEventLog>,
1314 persona: &str,
1315 cost: &PersonaRunCost,
1316 lease_id: Option<&str>,
1317 now_ms: i64,
1318) -> Result<String, String> {
1319 let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1320 append_persona_event(
1321 log,
1322 persona,
1323 "persona.budget.recorded",
1324 json!({
1325 "receipt_id": receipt_id,
1326 "persona": persona,
1327 "cost_usd": cost.cost_usd,
1328 "tokens": cost.tokens,
1329 "lease_id": lease_id,
1330 }),
1331 now_ms,
1332 )
1333 .await?;
1334 Ok(receipt_id)
1335}
1336
1337fn normalize_trigger_envelope(
1338 provider: &str,
1339 kind: &str,
1340 metadata: BTreeMap<String, String>,
1341 now_ms: i64,
1342) -> PersonaTriggerEnvelope {
1343 let provider = provider.to_ascii_lowercase();
1344 let kind = kind.to_string();
1345 let source_event_id = metadata
1346 .get("event_id")
1347 .or_else(|| metadata.get("id"))
1348 .cloned();
1349 let subject_key = match provider.as_str() {
1350 "github" => {
1351 let repo = metadata
1352 .get("repository")
1353 .or_else(|| metadata.get("repository.full_name"))
1354 .cloned()
1355 .unwrap_or_else(|| "unknown".to_string());
1356 if let Some(number) = metadata
1357 .get("pr")
1358 .or_else(|| metadata.get("pull_request.number"))
1359 .or_else(|| metadata.get("number"))
1360 {
1361 format!("github:{repo}:pr:{number}")
1362 } else if let Some(check) = metadata
1363 .get("check_run.name")
1364 .or_else(|| metadata.get("check_name"))
1365 {
1366 format!("github:{repo}:check:{check}")
1367 } else {
1368 format!("github:{repo}:{kind}")
1369 }
1370 }
1371 "linear" => {
1372 let issue = metadata
1373 .get("issue_key")
1374 .or_else(|| metadata.get("issue.identifier"))
1375 .or_else(|| metadata.get("issue_id"))
1376 .or_else(|| metadata.get("id"))
1377 .cloned()
1378 .unwrap_or_else(|| "unknown".to_string());
1379 format!("linear:issue:{issue}")
1380 }
1381 "slack" => {
1382 let channel = metadata
1383 .get("channel")
1384 .or_else(|| metadata.get("channel_id"))
1385 .cloned()
1386 .unwrap_or_else(|| "unknown".to_string());
1387 let ts = metadata
1388 .get("ts")
1389 .or_else(|| metadata.get("event_ts"))
1390 .cloned()
1391 .unwrap_or_else(|| "unknown".to_string());
1392 format!("slack:{channel}:{ts}")
1393 }
1394 "webhook" => metadata
1395 .get("dedupe_key")
1396 .or_else(|| metadata.get("event_id"))
1397 .map(|value| format!("webhook:{value}"))
1398 .unwrap_or_else(|| format!("webhook:{kind}:{}", Uuid::now_v7())),
1399 _ => metadata
1400 .get("dedupe_key")
1401 .or_else(|| metadata.get("event_id"))
1402 .map(|value| format!("{provider}:{kind}:{value}"))
1403 .unwrap_or_else(|| format!("{provider}:{kind}:{}", Uuid::now_v7())),
1404 };
1405
1406 PersonaTriggerEnvelope {
1407 provider,
1408 kind,
1409 subject_key,
1410 source_event_id,
1411 received_at_ms: now_ms,
1412 raw: json!({"metadata": metadata}),
1413 metadata,
1414 }
1415}
1416
1417async fn queued_events(
1418 log: &Arc<AnyEventLog>,
1419 persona: &str,
1420) -> Result<Vec<(PersonaTriggerEnvelope, PersonaRunCost)>, String> {
1421 let events = read_persona_events(log, persona).await?;
1422 let mut queued = BTreeMap::<String, (PersonaTriggerEnvelope, PersonaRunCost)>::new();
1423 let mut completed = BTreeSet::<String>::new();
1424 for (_, event) in events {
1425 match event.kind.as_str() {
1426 "persona.trigger.queued" => {
1427 let Some(envelope) = event.payload.get("envelope") else {
1428 continue;
1429 };
1430 let envelope: PersonaTriggerEnvelope =
1431 serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1432 let cost = event
1433 .payload
1434 .get("cost")
1435 .cloned()
1436 .map(serde_json::from_value::<PersonaRunCost>)
1437 .transpose()
1438 .map_err(|error| error.to_string())?
1439 .unwrap_or_default();
1440 queued.insert(envelope.subject_key.clone(), (envelope, cost));
1441 }
1442 "persona.run.completed" => {
1443 if let Some(work_key) = event
1444 .payload
1445 .get("work_key")
1446 .and_then(serde_json::Value::as_str)
1447 {
1448 completed.insert(work_key.to_string());
1449 }
1450 }
1451 _ => {}
1452 }
1453 }
1454 queued.retain(|work_key, _| !completed.contains(work_key));
1455 Ok(queued.into_values().collect())
1456}
1457
1458fn assignment_status_from_lease(lease: &PersonaLease) -> PersonaAssignmentStatus {
1459 PersonaAssignmentStatus {
1460 work_key: lease.work_key.clone(),
1461 lease_id: lease.id.clone(),
1462 holder: lease.holder.clone(),
1463 acquired_at: format_ms(lease.acquired_at_ms),
1464 expires_at: format_ms(lease.expires_at_ms),
1465 }
1466}
1467
1468fn queued_work_from_event(event: &LogEvent) -> Result<Option<PersonaQueuedWork>, String> {
1469 let Some(envelope) = event.payload.get("envelope") else {
1470 return Ok(None);
1471 };
1472 let envelope: PersonaTriggerEnvelope =
1473 serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1474 Ok(Some(PersonaQueuedWork {
1475 work_key: envelope.subject_key,
1476 provider: envelope.provider,
1477 kind: envelope.kind,
1478 queued_at: format_ms(event.occurred_at_ms),
1479 reason: event
1480 .payload
1481 .get("reason")
1482 .and_then(serde_json::Value::as_str)
1483 .unwrap_or("queued")
1484 .to_string(),
1485 source_event_id: envelope.source_event_id,
1486 metadata: envelope.metadata,
1487 }))
1488}
1489
1490fn handoff_inbox_item(work: &PersonaQueuedWork) -> Option<PersonaHandoffInboxItem> {
1491 if work.provider != "handoff" && !work.metadata.contains_key("handoff_id") {
1492 return None;
1493 }
1494 Some(PersonaHandoffInboxItem {
1495 work_key: work.work_key.clone(),
1496 handoff_id: work.metadata.get("handoff_id").cloned(),
1497 handoff_kind: work
1498 .metadata
1499 .get("handoff_kind")
1500 .or_else(|| work.metadata.get("kind"))
1501 .cloned(),
1502 source_persona: work.metadata.get("source_persona").cloned(),
1503 task: work.metadata.get("task").cloned(),
1504 queued_at: work.queued_at.clone(),
1505 reason: work.reason.clone(),
1506 })
1507}
1508
1509fn value_receipt_from_event(event: &LogEvent) -> Result<Option<PersonaValueReceipt>, String> {
1510 let Ok(value_event) = serde_json::from_value::<PersonaValueEvent>(event.payload.clone()) else {
1511 return Ok(None);
1512 };
1513 Ok(Some(PersonaValueReceipt {
1514 kind: value_event.kind,
1515 run_id: value_event.run_id,
1516 occurred_at: value_event
1517 .occurred_at
1518 .format(&Rfc3339)
1519 .map_err(|error| error.to_string())?,
1520 paid_cost_usd: value_event.paid_cost_usd,
1521 avoided_cost_usd: value_event.avoided_cost_usd,
1522 deterministic_steps: value_event.deterministic_steps,
1523 llm_steps: value_event.llm_steps,
1524 metadata: value_event.metadata,
1525 }))
1526}
1527
1528async fn work_completed(
1529 log: &Arc<AnyEventLog>,
1530 persona: &str,
1531 work_key: &str,
1532) -> Result<bool, String> {
1533 let events = read_persona_events(log, persona).await?;
1534 Ok(events.into_iter().any(|(_, event)| {
1535 event.kind == "persona.run.completed"
1536 && event
1537 .payload
1538 .get("work_key")
1539 .and_then(serde_json::Value::as_str)
1540 == Some(work_key)
1541 }))
1542}
1543
1544async fn read_persona_events(
1545 log: &Arc<AnyEventLog>,
1546 persona: &str,
1547) -> Result<Vec<(u64, LogEvent)>, String> {
1548 let topic = runtime_topic()?;
1549 Ok(log
1550 .read_range(&topic, None, usize::MAX)
1551 .await
1552 .map_err(|error| error.to_string())?
1553 .into_iter()
1554 .filter(|(_, event)| {
1555 event
1556 .headers
1557 .get("persona")
1558 .is_some_and(|name| name == persona)
1559 })
1560 .collect())
1561}
1562
1563async fn append_persona_event(
1564 log: &Arc<AnyEventLog>,
1565 persona: &str,
1566 kind: &str,
1567 payload: serde_json::Value,
1568 now_ms: i64,
1569) -> Result<u64, String> {
1570 let mut headers = BTreeMap::new();
1571 headers.insert("persona".to_string(), persona.to_string());
1572 let event = LogEvent {
1573 kind: kind.to_string(),
1574 payload,
1575 headers,
1576 occurred_at_ms: now_ms,
1577 };
1578 log.append(&runtime_topic()?, event)
1579 .await
1580 .map_err(|error| error.to_string())
1581}
1582
/// Per-event inputs handed to `emit_persona_value_event`, which copies them into the
/// `PersonaValueEvent` it emits.
struct PersonaValueEventDelta {
    /// Kind of value event to emit.
    kind: PersonaValueEventKind,
    /// Paid cost in USD attributed to this event.
    paid_cost_usd: f64,
    /// Avoided cost in USD attributed to this event.
    avoided_cost_usd: f64,
    /// Number of deterministic steps attributed to this event.
    deterministic_steps: i64,
    /// Number of LLM steps attributed to this event.
    llm_steps: i64,
    /// Free-form JSON metadata attached to the event.
    metadata: serde_json::Value,
}
1591
impl Default for PersonaValueEventDelta {
    /// A `RunCompleted` delta with zero costs, zero steps, and `null` metadata, so call sites
    /// can set only the fields they need via struct-update syntax.
    fn default() -> Self {
        Self {
            kind: PersonaValueEventKind::RunCompleted,
            paid_cost_usd: 0.0,
            avoided_cost_usd: 0.0,
            deterministic_steps: 0,
            llm_steps: 0,
            metadata: serde_json::Value::Null,
        }
    }
}
1604
1605async fn emit_persona_value_event(
1606 log: &Arc<AnyEventLog>,
1607 binding: &PersonaRuntimeBinding,
1608 run_id: Uuid,
1609 delta: PersonaValueEventDelta,
1610 now_ms: i64,
1611) -> Result<(), String> {
1612 let event = PersonaValueEvent {
1613 persona_id: binding.name.clone(),
1614 template_ref: binding.template_ref.clone(),
1615 run_id: Some(run_id),
1616 kind: delta.kind,
1617 paid_cost_usd: delta.paid_cost_usd.max(0.0),
1618 avoided_cost_usd: delta.avoided_cost_usd.max(0.0),
1619 deterministic_steps: delta.deterministic_steps.max(0),
1620 llm_steps: delta.llm_steps.max(0),
1621 metadata: delta.metadata,
1622 occurred_at: offset_datetime_from_ms(now_ms),
1623 };
1624 append_persona_event(
1625 log,
1626 &binding.name,
1627 &format!("persona.value.{}", event.kind.as_str()),
1628 serde_json::to_value(&event).map_err(|error| error.to_string())?,
1629 now_ms,
1630 )
1631 .await?;
1632 emit_persona_value_sink_event(&event);
1633 Ok(())
1634}
1635
1636fn emit_persona_value_sink_event(event: &PersonaValueEvent) {
1637 for sink in persona_value_sinks().snapshot() {
1638 sink.handle_value_event(event);
1639 }
1640}
1641
1642fn emit_persona_supervision_sink_event(event: &PersonaSupervisionEvent) {
1643 for sink in persona_supervision_sinks().snapshot() {
1644 sink.handle_supervision_event(event);
1645 }
1646}
1647
/// One queued work item as captured by `queue_snapshot`.
#[derive(Clone, Debug)]
struct QueueEntry {
    /// Work key of the queued item.
    work_key: String,
    /// Time the item was queued, in milliseconds since the Unix epoch.
    queued_at_ms: i64,
}
1653
1654async fn queue_snapshot(
1655 log: &Arc<AnyEventLog>,
1656 binding: &PersonaRuntimeBinding,
1657 now_ms: i64,
1658) -> Result<Vec<QueueEntry>, String> {
1659 let status = persona_status(log, binding, now_ms).await?;
1660 Ok(status
1661 .queued_work
1662 .into_iter()
1663 .map(|item| QueueEntry {
1664 queued_at_ms: parse_rfc3339_ms(&item.queued_at).unwrap_or(now_ms),
1665 work_key: item.work_key,
1666 })
1667 .collect())
1668}
1669
1670fn emit_queue_position_supervision(
1671 binding: &PersonaRuntimeBinding,
1672 before: &[QueueEntry],
1673 after: &[QueueEntry],
1674 now_ms: i64,
1675) {
1676 use std::collections::HashSet;
1677 let before_keys: HashSet<&str> = before.iter().map(|e| e.work_key.as_str()).collect();
1678 let after_keys: HashSet<&str> = after.iter().map(|e| e.work_key.as_str()).collect();
1679 let after_depth = after.len() as i64;
1680
1681 for (index, entry) in after.iter().enumerate() {
1682 if !before_keys.contains(entry.work_key.as_str()) {
1683 emit_persona_supervision_sink_event(&PersonaSupervisionEvent::QueuePosition(
1684 PersonaQueuePositionUpdate {
1685 persona_id: binding.name.clone(),
1686 template_ref: binding.template_ref.clone(),
1687 work_key: entry.work_key.clone(),
1688 queue_depth: after_depth,
1689 position: (index + 1) as i64,
1690 queued_at_ms: entry.queued_at_ms,
1691 occurred_at_ms: now_ms,
1692 },
1693 ));
1694 }
1695 }
1696 for entry in before {
1697 if !after_keys.contains(entry.work_key.as_str()) {
1698 emit_persona_supervision_sink_event(&PersonaSupervisionEvent::QueuePosition(
1699 PersonaQueuePositionUpdate {
1700 persona_id: binding.name.clone(),
1701 template_ref: binding.template_ref.clone(),
1702 work_key: entry.work_key.clone(),
1703 queue_depth: after_depth,
1704 position: 0,
1705 queued_at_ms: entry.queued_at_ms,
1706 occurred_at_ms: now_ms,
1707 },
1708 ));
1709 }
1710 }
1711}
1712
1713fn emit_receipt_supervision(
1714 binding: &PersonaRuntimeBinding,
1715 receipt: &PersonaRunReceipt,
1716 now_ms: i64,
1717) {
1718 emit_persona_supervision_sink_event(&PersonaSupervisionEvent::Receipt(PersonaReceiptUpdate {
1719 persona_id: binding.name.clone(),
1720 template_ref: binding.template_ref.clone(),
1721 receipt: receipt.clone(),
1722 occurred_at_ms: now_ms,
1723 }));
1724}
1725
1726fn run_value_metadata(
1727 envelope: &PersonaTriggerEnvelope,
1728 lease: &PersonaLease,
1729 cost: &PersonaRunCost,
1730) -> serde_json::Value {
1731 let mut metadata = serde_json::Map::new();
1732 metadata.insert("work_key".to_string(), json!(envelope.subject_key));
1733 metadata.insert("trigger_provider".to_string(), json!(envelope.provider));
1734 metadata.insert("trigger_kind".to_string(), json!(envelope.kind));
1735 metadata.insert("lease_id".to_string(), json!(lease.id));
1736 metadata.insert("tokens".to_string(), json!(cost.tokens));
1737 if cost.frontier_escalations > 0 {
1738 metadata.insert(
1739 "frontier_escalations".to_string(),
1740 json!(cost.frontier_escalations),
1741 );
1742 }
1743 match &cost.metadata {
1744 serde_json::Value::Null => {}
1745 serde_json::Value::Object(extra) => {
1746 metadata.extend(
1747 extra
1748 .iter()
1749 .map(|(key, value)| (key.clone(), value.clone())),
1750 );
1751 }
1752 extra => {
1753 metadata.insert("run_cost_metadata".to_string(), extra.clone());
1754 }
1755 }
1756 serde_json::Value::Object(metadata)
1757}
1758
1759fn budget_status(
1760 policy: &PersonaBudgetPolicy,
1761 spent: &[(i64, f64, u64)],
1762 now_ms: i64,
1763) -> PersonaBudgetStatus {
1764 let day_start = now_ms - (now_ms.rem_euclid(86_400_000));
1765 let hour_start = now_ms - (now_ms.rem_euclid(3_600_000));
1766 let mut spent_today_usd = 0.0;
1767 let mut spent_this_hour_usd = 0.0;
1768 let mut tokens_today = 0u64;
1769 let mut spent_last_run_usd = 0.0;
1770 for (at_ms, cost, tokens) in spent {
1771 spent_last_run_usd = *cost;
1772 if *at_ms >= day_start {
1773 spent_today_usd += cost;
1774 tokens_today += tokens;
1775 }
1776 if *at_ms >= hour_start {
1777 spent_this_hour_usd += cost;
1778 }
1779 }
1780
1781 let remaining_today_usd = policy
1782 .daily_usd
1783 .map(|limit| (limit - spent_today_usd).max(0.0));
1784 let remaining_hour_usd = policy
1785 .hourly_usd
1786 .map(|limit| (limit - spent_this_hour_usd).max(0.0));
1787 let reason = if policy
1788 .daily_usd
1789 .is_some_and(|limit| spent_today_usd >= limit && limit >= 0.0)
1790 {
1791 Some("daily_usd".to_string())
1792 } else if policy
1793 .hourly_usd
1794 .is_some_and(|limit| spent_this_hour_usd >= limit && limit >= 0.0)
1795 {
1796 Some("hourly_usd".to_string())
1797 } else if policy
1798 .max_tokens
1799 .is_some_and(|limit| tokens_today >= limit && limit > 0)
1800 {
1801 Some("max_tokens".to_string())
1802 } else {
1803 None
1804 };
1805
1806 PersonaBudgetStatus {
1807 daily_usd: policy.daily_usd,
1808 hourly_usd: policy.hourly_usd,
1809 run_usd: policy.run_usd,
1810 max_tokens: policy.max_tokens,
1811 spent_today_usd,
1812 spent_this_hour_usd,
1813 spent_last_run_usd,
1814 tokens_today,
1815 remaining_today_usd,
1816 remaining_hour_usd,
1817 exhausted: reason.is_some(),
1818 reason,
1819 last_receipt_id: None,
1820 }
1821}
1822
1823fn next_scheduled_run(
1824 binding: &PersonaRuntimeBinding,
1825 last_run_ms: Option<i64>,
1826 now_ms: i64,
1827) -> Option<String> {
1828 binding
1829 .schedules
1830 .iter()
1831 .filter_map(|schedule| next_cron_ms(schedule, last_run_ms.unwrap_or(now_ms)).ok())
1832 .min()
1833 .map(format_ms)
1834}
1835
1836fn next_cron_ms(schedule: &str, after_ms: i64) -> Result<i64, String> {
1837 let cron = schedule
1838 .parse::<Cron>()
1839 .map_err(|error| error.to_string())?;
1840 let after = Utc
1841 .timestamp_millis_opt(after_ms)
1842 .single()
1843 .ok_or_else(|| "invalid timestamp".to_string())?;
1844 let next = cron
1845 .find_next_occurrence(&after, false)
1846 .map_err(|error| error.to_string())?;
1847 Ok(next.timestamp_millis())
1848}
1849
1850pub fn now_ms() -> i64 {
1851 OffsetDateTime::now_utc().unix_timestamp_nanos() as i64 / 1_000_000
1852}
1853
1854fn offset_datetime_from_ms(ms: i64) -> OffsetDateTime {
1855 OffsetDateTime::from_unix_timestamp_nanos((ms as i128) * 1_000_000)
1856 .unwrap_or(OffsetDateTime::UNIX_EPOCH)
1857}
1858
1859pub fn format_ms(ms: i64) -> String {
1860 offset_datetime_from_ms(ms)
1861 .format(&Rfc3339)
1862 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
1863}
1864
1865pub fn parse_rfc3339_ms(value: &str) -> Result<i64, String> {
1866 let ts = OffsetDateTime::parse(value, &Rfc3339)
1867 .map_err(|error| format!("invalid RFC3339 timestamp '{value}': {error}"))?;
1868 Ok(ts.unix_timestamp_nanos() as i64 / 1_000_000)
1869}
1870
1871fn runtime_topic() -> Result<Topic, String> {
1872 Topic::new(PERSONA_RUNTIME_TOPIC).map_err(|error| error.to_string())
1873}
1874
1875#[cfg(test)]
1876mod tests {
1877 use super::*;
1878 use crate::event_log::{AnyEventLog, MemoryEventLog};
1879 use std::sync::Mutex;
1880
1881 struct CapturingValueSink {
1882 events: Arc<Mutex<Vec<PersonaValueEvent>>>,
1883 }
1884
1885 impl PersonaValueSink for CapturingValueSink {
1886 fn handle_value_event(&self, event: &PersonaValueEvent) {
1887 self.events.lock().unwrap().push(event.clone());
1888 }
1889 }
1890
1891 fn binding() -> PersonaRuntimeBinding {
1892 PersonaRuntimeBinding {
1893 name: "merge_captain".to_string(),
1894 template_ref: Some("software_factory@v0".to_string()),
1895 entry_workflow: "workflows/merge.harn#run".to_string(),
1896 schedules: vec!["*/30 * * * *".to_string()],
1897 triggers: vec!["github.pr_opened".to_string()],
1898 budget: PersonaBudgetPolicy {
1899 daily_usd: Some(0.02),
1900 hourly_usd: None,
1901 run_usd: Some(0.02),
1902 max_tokens: Some(100),
1903 },
1904 }
1905 }
1906
1907 fn log() -> Arc<AnyEventLog> {
1908 Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)))
1909 }
1910
1911 #[tokio::test]
1912 async fn schedule_tick_records_lifecycle_status_and_receipt() {
1913 let log = log();
1914 let binding = binding();
1915 let now = parse_rfc3339_ms("2026-04-24T12:30:00Z").unwrap();
1916 let receipt = fire_schedule(
1917 &log,
1918 &binding,
1919 PersonaRunCost {
1920 cost_usd: 0.01,
1921 tokens: 10,
1922 ..Default::default()
1923 },
1924 now,
1925 )
1926 .await
1927 .unwrap();
1928 assert_eq!(receipt.status, "completed");
1929 assert!(receipt.lease.is_some());
1930 let status = persona_status(&log, &binding, now).await.unwrap();
1931 assert_eq!(status.state, PersonaLifecycleState::Idle);
1932 assert_eq!(status.last_run.as_deref(), Some("2026-04-24T12:30:00Z"));
1933 assert!(status.next_scheduled_run.is_some());
1934 assert_eq!(status.budget.spent_today_usd, 0.01);
1935 }
1936
1937 #[tokio::test]
1938 async fn paused_personas_queue_and_resume_drains_once() {
1939 let log = log();
1940 let binding = binding();
1941 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
1942 pause_persona(&log, &binding, now).await.unwrap();
1943 let receipt = fire_trigger(
1944 &log,
1945 &binding,
1946 "github",
1947 "pull_request",
1948 BTreeMap::from([
1949 ("repository".to_string(), "burin-labs/harn".to_string()),
1950 ("number".to_string(), "462".to_string()),
1951 ]),
1952 PersonaRunCost::default(),
1953 now,
1954 )
1955 .await
1956 .unwrap();
1957 assert_eq!(receipt.status, "queued");
1958 assert_eq!(
1959 persona_status(&log, &binding, now)
1960 .await
1961 .unwrap()
1962 .queued_events,
1963 1
1964 );
1965 let status = resume_persona(&log, &binding, now + 1000).await.unwrap();
1966 assert_eq!(status.state, PersonaLifecycleState::Idle);
1967 assert_eq!(status.queued_events, 0);
1968 }
1969
1970 #[tokio::test]
1971 async fn resumed_queued_work_reuses_original_budget_cost() {
1972 let log = log();
1973 let mut binding = binding();
1974 binding.budget.run_usd = Some(0.01);
1975 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
1976 pause_persona(&log, &binding, now).await.unwrap();
1977 let queued = fire_trigger(
1978 &log,
1979 &binding,
1980 "github",
1981 "pull_request",
1982 BTreeMap::from([
1983 ("repository".to_string(), "burin-labs/harn".to_string()),
1984 ("number".to_string(), "1379".to_string()),
1985 ]),
1986 PersonaRunCost {
1987 cost_usd: 0.02,
1988 tokens: 1,
1989 ..Default::default()
1990 },
1991 now + 1,
1992 )
1993 .await
1994 .unwrap();
1995 assert_eq!(queued.status, "queued");
1996
1997 let status = resume_persona(&log, &binding, now + 2).await.unwrap();
1998
1999 assert_eq!(status.budget.reason.as_deref(), Some("run_usd"));
2000 assert!(status
2001 .last_error
2002 .as_deref()
2003 .is_some_and(|error| error.contains("run_usd")));
2004 assert_eq!(status.queued_events, 1);
2005 }
2006
2007 #[tokio::test]
2008 async fn duplicate_trigger_envelope_is_not_processed_twice() {
2009 let log = log();
2010 let binding = binding();
2011 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2012 let metadata = BTreeMap::from([
2013 ("repository".to_string(), "burin-labs/harn".to_string()),
2014 ("number".to_string(), "462".to_string()),
2015 ]);
2016 let first = fire_trigger(
2017 &log,
2018 &binding,
2019 "github",
2020 "pull_request",
2021 metadata.clone(),
2022 PersonaRunCost::default(),
2023 now,
2024 )
2025 .await
2026 .unwrap();
2027 let second = fire_trigger(
2028 &log,
2029 &binding,
2030 "github",
2031 "pull_request",
2032 metadata,
2033 PersonaRunCost::default(),
2034 now + 1000,
2035 )
2036 .await
2037 .unwrap();
2038 assert_eq!(first.status, "completed");
2039 assert_eq!(second.status, "duplicate");
2040 assert!(second.lease.is_none());
2041 }
2042
2043 #[tokio::test]
2044 async fn disabled_personas_dead_letter_events() {
2045 let log = log();
2046 let binding = binding();
2047 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2048 disable_persona(&log, &binding, now).await.unwrap();
2049 let receipt = fire_trigger(
2050 &log,
2051 &binding,
2052 "slack",
2053 "message",
2054 BTreeMap::from([
2055 ("channel".to_string(), "C123".to_string()),
2056 ("ts".to_string(), "1713988800.000100".to_string()),
2057 ]),
2058 PersonaRunCost::default(),
2059 now,
2060 )
2061 .await
2062 .unwrap();
2063 assert_eq!(receipt.status, "dead_lettered");
2064 let status = persona_status(&log, &binding, now).await.unwrap();
2065 assert_eq!(status.state, PersonaLifecycleState::Disabled);
2066 assert_eq!(status.disabled_events, 1);
2067 }
2068
2069 #[tokio::test]
2070 async fn budget_exhaustion_blocks_expensive_work() {
2071 let log = log();
2072 let mut binding = binding();
2073 binding.budget.daily_usd = Some(0.01);
2074 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2075 let receipt = fire_trigger(
2076 &log,
2077 &binding,
2078 "linear",
2079 "issue",
2080 BTreeMap::from([("issue_key".to_string(), "HAR-462".to_string())]),
2081 PersonaRunCost {
2082 cost_usd: 0.02,
2083 tokens: 1,
2084 ..Default::default()
2085 },
2086 now,
2087 )
2088 .await
2089 .unwrap();
2090 assert_eq!(receipt.status, "budget_exhausted");
2091 let status = persona_status(&log, &binding, now).await.unwrap();
2092 assert_eq!(status.budget.reason.as_deref(), Some("daily_usd"));
2093 assert!(status.budget.exhausted);
2094 assert!(status.last_error.as_deref().unwrap().contains("daily_usd"));
2095 }
2096
2097 #[tokio::test]
2098 async fn deterministic_predicate_hit_emits_value_event_with_avoided_cost() {
2099 let log = log();
2100 let binding = binding();
2101 let captured = Arc::new(Mutex::new(Vec::new()));
2102 let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
2103 events: captured.clone(),
2104 }));
2105 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2106
2107 let receipt = fire_trigger(
2108 &log,
2109 &binding,
2110 "github",
2111 "pull_request",
2112 BTreeMap::from([
2113 ("repository".to_string(), "burin-labs/harn".to_string()),
2114 ("number".to_string(), "715".to_string()),
2115 ]),
2116 PersonaRunCost {
2117 avoided_cost_usd: 0.0042,
2118 deterministic_steps: 1,
2119 metadata: json!({
2120 "predicate": "pr_already_green",
2121 "would_have_called_model": "gpt-5.4-mini",
2122 }),
2123 ..Default::default()
2124 },
2125 now,
2126 )
2127 .await
2128 .unwrap();
2129
2130 let run_id = receipt.run_id.expect("completed run has run_id");
2131 let events = captured.lock().unwrap().clone();
2132 let deterministic = events
2133 .iter()
2134 .find(|event| {
2135 event.kind == PersonaValueEventKind::DeterministicExecution
2136 && event.run_id == Some(run_id)
2137 })
2138 .expect("deterministic execution value event");
2139 assert_eq!(deterministic.persona_id, "merge_captain");
2140 assert_eq!(
2141 deterministic.template_ref.as_deref(),
2142 Some("software_factory@v0")
2143 );
2144 assert_eq!(deterministic.run_id, Some(run_id));
2145 assert_eq!(deterministic.paid_cost_usd, 0.0);
2146 assert_eq!(deterministic.avoided_cost_usd, 0.0042);
2147 assert_eq!(deterministic.deterministic_steps, 1);
2148 assert_eq!(
2149 deterministic.metadata["predicate"].as_str(),
2150 Some("pr_already_green")
2151 );
2152
2153 let persisted = read_persona_events(&log, &binding.name).await.unwrap();
2154 assert!(persisted.iter().any(|(_, event)| {
2155 event.kind == "persona.value.deterministic_execution"
2156 && event.payload["avoided_cost_usd"] == json!(0.0042)
2157 }));
2158 }
2159
2160 #[tokio::test]
2161 async fn frontier_escalation_run_emits_value_event_with_paid_cost() {
2162 let log = log();
2163 let binding = binding();
2164 let captured = Arc::new(Mutex::new(Vec::new()));
2165 let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
2166 events: captured.clone(),
2167 }));
2168 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2169
2170 let receipt = fire_trigger(
2171 &log,
2172 &binding,
2173 "linear",
2174 "issue",
2175 BTreeMap::from([("issue_key".to_string(), "HAR-715".to_string())]),
2176 PersonaRunCost {
2177 cost_usd: 0.011,
2178 tokens: 20,
2179 llm_steps: 1,
2180 frontier_escalations: 1,
2181 metadata: json!({
2182 "frontier_model": "gpt-5.4",
2183 "escalation_reason": "high_risk_merge",
2184 }),
2185 ..Default::default()
2186 },
2187 now,
2188 )
2189 .await
2190 .unwrap();
2191
2192 let run_id = receipt.run_id.expect("completed run has run_id");
2193 let events = captured.lock().unwrap().clone();
2194 let escalation = events
2195 .iter()
2196 .find(|event| {
2197 event.kind == PersonaValueEventKind::FrontierEscalation
2198 && event.run_id == Some(run_id)
2199 })
2200 .expect("frontier escalation value event");
2201 assert_eq!(escalation.run_id, Some(run_id));
2202 assert_eq!(escalation.paid_cost_usd, 0.011);
2203 assert_eq!(escalation.avoided_cost_usd, 0.0);
2204 assert_eq!(escalation.llm_steps, 1);
2205 assert_eq!(
2206 escalation.metadata["frontier_model"].as_str(),
2207 Some("gpt-5.4")
2208 );
2209
2210 let completion = events
2211 .iter()
2212 .find(|event| {
2213 event.kind == PersonaValueEventKind::RunCompleted && event.run_id == Some(run_id)
2214 })
2215 .expect("run completed value event");
2216 assert_eq!(completion.paid_cost_usd, 0.0);
2217 }
2218
2219 struct CapturingSupervisionSink {
2220 events: Arc<Mutex<Vec<PersonaSupervisionEvent>>>,
2221 }
2222
2223 impl PersonaSupervisionSink for CapturingSupervisionSink {
2224 fn handle_supervision_event(&self, event: &PersonaSupervisionEvent) {
2225 self.events.lock().unwrap().push(event.clone());
2226 }
2227 }
2228
2229 fn pr_metadata(repository: &str, number: &str) -> BTreeMap<String, String> {
2230 BTreeMap::from([
2231 ("repository".to_string(), repository.to_string()),
2232 ("number".to_string(), number.to_string()),
2233 ])
2234 }
2235
2236 fn binding_named(name: &str) -> PersonaRuntimeBinding {
2237 PersonaRuntimeBinding {
2238 name: name.to_string(),
2239 ..binding()
2240 }
2241 }
2242
2243 fn supervision_events_for(
2244 captured: &Arc<Mutex<Vec<PersonaSupervisionEvent>>>,
2245 persona: &str,
2246 ) -> Vec<PersonaSupervisionEvent> {
2247 captured
2248 .lock()
2249 .unwrap()
2250 .iter()
2251 .filter(|event| match event {
2252 PersonaSupervisionEvent::QueuePosition(update) => update.persona_id == persona,
2253 PersonaSupervisionEvent::RepairWorkerStatus(update) => update.persona_id == persona,
2254 PersonaSupervisionEvent::Receipt(update) => update.persona_id == persona,
2255 PersonaSupervisionEvent::Checkpoint(update) => update.persona_id == persona,
2256 })
2257 .cloned()
2258 .collect()
2259 }
2260
2261 async fn drive_pause_then_resume(binding: &PersonaRuntimeBinding, now: i64) {
2262 let log = log();
2263 pause_persona(&log, binding, now).await.unwrap();
2264 let _ = fire_trigger(
2265 &log,
2266 binding,
2267 "github",
2268 "pull_request",
2269 pr_metadata("burin-labs/harn", "1480"),
2270 PersonaRunCost::default(),
2271 now,
2272 )
2273 .await
2274 .unwrap();
2275 let _ = resume_persona(&log, binding, now + 1).await.unwrap();
2276 let _ = restore_persona_checkpoint(
2277 &log,
2278 binding,
2279 PersonaCheckpointRestoreRequest {
2280 checkpoint_id: "cp_42".to_string(),
2281 work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2282 resumed_from: Some(PersonaCheckpointResume {
2283 note: Some("resumed from cp 42".to_string()),
2284 ..Default::default()
2285 }),
2286 },
2287 now + 2,
2288 )
2289 .await
2290 .unwrap();
2291 let _ = report_repair_worker_status(
2292 &log,
2293 binding,
2294 PersonaRepairWorkerStatusUpdate {
2295 persona_id: String::new(),
2296 template_ref: None,
2297 repair_worker_id: "rw_42".to_string(),
2298 lifecycle: PersonaRepairWorkerLifecycle::Running,
2299 work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2300 lease_id: Some("persona_lease_xyz".to_string()),
2301 scratchpad_url: Some("https://factory.local/rw_42".to_string()),
2302 last_heartbeat_ms: 0,
2303 occurred_at_ms: 0,
2304 },
2305 now + 3,
2306 )
2307 .await
2308 .unwrap();
2309 }
2310
2311 #[tokio::test]
2312 async fn supervision_sink_emits_queue_position_and_receipt() {
2313 let captured = Arc::new(Mutex::new(Vec::new()));
2314 let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2315 events: captured.clone(),
2316 }));
2317 let log = log();
2318 let binding = binding_named("supervision_sink_emits_queue_position_and_receipt");
2319 let now = parse_rfc3339_ms("2026-05-01T00:00:00Z").unwrap();
2320
2321 pause_persona(&log, &binding, now).await.unwrap();
2322 fire_trigger(
2323 &log,
2324 &binding,
2325 "github",
2326 "pull_request",
2327 pr_metadata("burin-labs/harn", "1480"),
2328 PersonaRunCost::default(),
2329 now + 100,
2330 )
2331 .await
2332 .unwrap();
2333 resume_persona(&log, &binding, now + 200).await.unwrap();
2334
2335 let events = supervision_events_for(&captured, &binding.name);
2336 let queue_events: Vec<_> = events
2337 .iter()
2338 .filter_map(|event| match event {
2339 PersonaSupervisionEvent::QueuePosition(update) => Some(update.clone()),
2340 _ => None,
2341 })
2342 .collect();
2343 assert_eq!(queue_events.len(), 2, "enqueue + drain emitted");
2344 assert_eq!(queue_events[0].position, 1);
2345 assert_eq!(queue_events[0].queue_depth, 1);
2346 assert_eq!(queue_events[1].position, 0);
2347 assert_eq!(queue_events[1].queue_depth, 0);
2348 let receipt_events: Vec<_> = events
2349 .iter()
2350 .filter_map(|event| match event {
2351 PersonaSupervisionEvent::Receipt(update) => Some(update.clone()),
2352 _ => None,
2353 })
2354 .collect();
2355 assert_eq!(receipt_events.len(), 2, "queued + drained receipt");
2356 assert_eq!(receipt_events[0].receipt.status, "queued");
2357 assert_eq!(receipt_events[1].receipt.status, "completed");
2358 for event in &receipt_events {
2359 assert_eq!(event.receipt.persona, binding.name);
2360 assert_eq!(event.persona_id, binding.name);
2361 assert_eq!(event.template_ref.as_deref(), Some("software_factory@v0"));
2362 }
2363 }
2364
2365 #[tokio::test]
2366 async fn supervision_sink_emits_repair_worker_status_idempotently() {
2367 let captured = Arc::new(Mutex::new(Vec::new()));
2368 let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2369 events: captured.clone(),
2370 }));
2371 let log = log();
2372 let binding = binding_named("supervision_sink_emits_repair_worker_status_idempotently");
2373 let now = parse_rfc3339_ms("2026-05-01T01:00:00Z").unwrap();
2374 let update = PersonaRepairWorkerStatusUpdate {
2375 persona_id: String::new(),
2376 template_ref: None,
2377 repair_worker_id: "rw_test".to_string(),
2378 lifecycle: PersonaRepairWorkerLifecycle::Running,
2379 work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2380 lease_id: Some("persona_lease_abc".to_string()),
2381 scratchpad_url: Some("https://factory.local/rw_test".to_string()),
2382 last_heartbeat_ms: 0,
2383 occurred_at_ms: 0,
2384 };
2385 let first = report_repair_worker_status(&log, &binding, update.clone(), now)
2386 .await
2387 .unwrap();
2388 let second = report_repair_worker_status(&log, &binding, update.clone(), now + 5)
2389 .await
2390 .unwrap();
2391 assert!(first);
2392 assert!(!second, "second identical lifecycle is idempotent");
2393
2394 let mut next = update.clone();
2395 next.lifecycle = PersonaRepairWorkerLifecycle::Succeeded;
2396 let third = report_repair_worker_status(&log, &binding, next, now + 10)
2397 .await
2398 .unwrap();
2399 assert!(third);
2400
2401 let kinds: Vec<_> = supervision_events_for(&captured, &binding.name)
2402 .into_iter()
2403 .filter_map(|event| match event {
2404 PersonaSupervisionEvent::RepairWorkerStatus(update) => Some(update.lifecycle),
2405 _ => None,
2406 })
2407 .collect();
2408 assert_eq!(
2409 kinds,
2410 vec![
2411 PersonaRepairWorkerLifecycle::Running,
2412 PersonaRepairWorkerLifecycle::Succeeded
2413 ]
2414 );
2415 }
2416
2417 #[tokio::test]
2418 async fn supervision_sink_emits_checkpoint_restore_ack() {
2419 let captured = Arc::new(Mutex::new(Vec::new()));
2420 let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2421 events: captured.clone(),
2422 }));
2423 let log = log();
2424 let binding = binding_named("supervision_sink_emits_checkpoint_restore_ack");
2425 let now = parse_rfc3339_ms("2026-05-01T02:00:00Z").unwrap();
2426 fire_trigger(
2427 &log,
2428 &binding,
2429 "github",
2430 "pull_request",
2431 pr_metadata("burin-labs/harn", "1480"),
2432 PersonaRunCost::default(),
2433 now,
2434 )
2435 .await
2436 .unwrap();
2437
2438 let outcome = restore_persona_checkpoint(
2439 &log,
2440 &binding,
2441 PersonaCheckpointRestoreRequest {
2442 checkpoint_id: "cp_1".to_string(),
2443 work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2444 resumed_from: None,
2445 },
2446 now + 100,
2447 )
2448 .await
2449 .unwrap();
2450 assert!(outcome.acked);
2451 assert_eq!(outcome.update.checkpoint_id, "cp_1");
2452 let resume = outcome
2453 .update
2454 .resumed_from
2455 .as_ref()
2456 .expect("resume coordinates default-derived from status");
2457 assert_eq!(resume.last_run_ms, Some(now));
2458
2459 let replay = restore_persona_checkpoint(
2460 &log,
2461 &binding,
2462 PersonaCheckpointRestoreRequest {
2463 checkpoint_id: "cp_1".to_string(),
2464 work_key: None,
2465 resumed_from: None,
2466 },
2467 now + 200,
2468 )
2469 .await
2470 .unwrap();
2471 assert!(!replay.acked, "duplicate restore is a no-op ack");
2472 assert_eq!(replay.update.occurred_at_ms, now + 100);
2473
2474 let ack_events: Vec<_> = supervision_events_for(&captured, &binding.name)
2475 .into_iter()
2476 .filter_map(|event| match event {
2477 PersonaSupervisionEvent::Checkpoint(update) => Some(update),
2478 _ => None,
2479 })
2480 .collect();
2481 assert_eq!(ack_events.len(), 1, "ack emitted once, replay suppressed");
2482 assert_eq!(ack_events[0].action, PersonaCheckpointAction::RestoreAcked);
2483 }
2484
2485 #[tokio::test]
2486 async fn supervision_sink_replay_is_deterministic_under_recorded_clock() {
2487 use harn_clock::{ClockEventLog, PausedClock, RecordedClock};
2488 use time::OffsetDateTime;
2489
2490 let now_ms = parse_rfc3339_ms("2026-05-01T03:00:00Z").unwrap();
2491 async fn drive(now_ms: i64) -> (Vec<PersonaSupervisionEvent>, Vec<harn_clock::ClockEvent>) {
2492 let captured = Arc::new(Mutex::new(Vec::new()));
2493 let _registration =
2494 register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2495 events: captured.clone(),
2496 }));
2497 let paused = PausedClock::new(
2498 OffsetDateTime::from_unix_timestamp_nanos((now_ms as i128) * 1_000_000).unwrap(),
2499 );
2500 let recorded = Arc::new(RecordedClock::new(paused, Arc::new(ClockEventLog::new())));
2501 let binding = binding_named("supervision_replay_persona");
2502 let ts = harn_clock::now_wall_ms(&*recorded);
2503 drive_pause_then_resume(&binding, ts).await;
2504 let clock_log = recorded.log().snapshot();
2505 let events = supervision_events_for(&captured, &binding.name);
2506 (events, clock_log)
2507 }
2508
2509 let (events_a, clock_a) = drive(now_ms).await;
2510 let (events_b, clock_b) = drive(now_ms).await;
2511 fn normalize(event: &PersonaSupervisionEvent) -> PersonaSupervisionEvent {
2515 match event.clone() {
2516 PersonaSupervisionEvent::Receipt(mut update) => {
2517 update.receipt.run_id = None;
2518 if let Some(lease) = update.receipt.lease.as_mut() {
2519 lease.id = "lease".to_string();
2520 }
2521 update.receipt.budget_receipt_id = update
2522 .receipt
2523 .budget_receipt_id
2524 .map(|_| "budget".to_string());
2525 PersonaSupervisionEvent::Receipt(update)
2526 }
2527 PersonaSupervisionEvent::Checkpoint(mut update) => {
2528 if let Some(resume) = update.resumed_from.as_mut() {
2529 resume.run_id = None;
2530 resume.lease_id = None;
2531 }
2532 PersonaSupervisionEvent::Checkpoint(update)
2533 }
2534 other => other,
2535 }
2536 }
2537 let a: Vec<_> = events_a.iter().map(normalize).collect();
2538 let b: Vec<_> = events_b.iter().map(normalize).collect();
2539 assert_eq!(a, b, "supervision sink emits identical event envelopes");
2540 assert_eq!(
2541 clock_a, clock_b,
2542 "recorded clock observation log is identical across replays"
2543 );
2544 }
2545}