1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::{Arc, OnceLock, RwLock};
4
5use chrono::{TimeZone, Utc};
6use croner::Cron;
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use time::format_description::well_known::Rfc3339;
10use time::OffsetDateTime;
11use uuid::Uuid;
12
13use crate::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
14
/// Topic name for persona runtime events.
///
/// NOTE(review): no use of this constant is visible in this part of the file;
/// presumably the event-log helpers append to and read from this topic — confirm.
pub const PERSONA_RUNTIME_TOPIC: &str = "persona.runtime.events";

/// Default lease time-to-live in milliseconds (5 minutes). Passed to
/// `acquire_lease` when a run is started for a trigger envelope.
const DEFAULT_LEASE_TTL_MS: i64 = 5 * 60 * 1000;
18
/// Lifecycle state reported for a persona.
///
/// Serialized in snake_case. `Idle` is the default and is also the state
/// `persona_status` starts from before replaying the persona's events.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaLifecycleState {
    /// Not assigned by `persona_status`; presumably set elsewhere — confirm.
    Inactive,
    /// Not assigned by `persona_status`; presumably set elsewhere — confirm.
    Starting,
    /// No run in progress. Restored after a resume, a lease release/expiry, or
    /// a completed run (unless the persona is paused or disabled).
    #[default]
    Idle,
    /// A lease was acquired or a run was started.
    Running,
    /// Set by `persona.control.paused`; incoming triggers are queued.
    Paused,
    /// Set by `persona.control.draining`.
    Draining,
    /// Set by `persona.run.failed`.
    Failed,
    /// Set by `persona.control.disabled`; incoming triggers are dead-lettered.
    Disabled,
}
32
33impl PersonaLifecycleState {
34 pub fn as_str(self) -> &'static str {
35 match self {
36 Self::Inactive => "inactive",
37 Self::Starting => "starting",
38 Self::Idle => "idle",
39 Self::Running => "running",
40 Self::Paused => "paused",
41 Self::Draining => "draining",
42 Self::Failed => "failed",
43 Self::Disabled => "disabled",
44 }
45 }
46}
47
/// Optional spending limits configured for a persona.
///
/// `persona_status` hands this policy to `budget_status` together with the
/// recorded spend. NOTE(review): the field names suggest per-day, per-hour and
/// per-run USD caps plus a token cap, with `None` meaning "no limit"; the
/// enforcement code is outside this view — confirm there.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaBudgetPolicy {
    pub daily_usd: Option<f64>,
    pub hourly_usd: Option<f64>,
    pub run_usd: Option<f64>,
    pub max_tokens: Option<u64>,
}
55
/// Static configuration that binds a persona to the runtime.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaRuntimeBinding {
    /// Persona name; used as the persona key when appending and reading events
    /// and copied into `PersonaStatus::name` and `PersonaStatus::role`.
    pub name: String,
    /// Optional template reference, copied into statuses and supervision updates.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Workflow recorded as `entry_workflow` on run started/completed events.
    pub entry_workflow: String,
    /// Schedule expressions; `fire_schedule` uses the first one, or `"manual"`
    /// when the list is empty.
    #[serde(default)]
    pub schedules: Vec<String>,
    /// Trigger names. NOTE(review): not read by the code visible here — confirm usage.
    #[serde(default)]
    pub triggers: Vec<String>,
    /// Budget policy evaluated against recorded spend.
    #[serde(default)]
    pub budget: PersonaBudgetPolicy,
}
69
/// A lease over a unit of persona work.
///
/// `persona_status` deserializes this from the payload of
/// `persona.lease.acquired` events and treats the lease as lapsed once
/// `expires_at_ms <= now_ms`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaLease {
    pub id: String,
    pub holder: String,
    pub work_key: String,
    pub acquired_at_ms: i64,
    pub expires_at_ms: i64,
}
78
/// Budget limits and spend reported as part of a persona's status.
///
/// Produced by `budget_status` (defined outside this view). `persona_status`
/// then fills `exhausted`/`reason` from the latest `persona.budget.exhausted`
/// event when no reason is set, and fills `last_receipt_id` from the latest
/// budget event when it is unset.
/// NOTE(review): the first four fields mirror `PersonaBudgetPolicy`; the exact
/// windows behind the `spent_*` / `remaining_*` fields are computed elsewhere — confirm.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaBudgetStatus {
    pub daily_usd: Option<f64>,
    pub hourly_usd: Option<f64>,
    pub run_usd: Option<f64>,
    pub max_tokens: Option<u64>,
    pub spent_today_usd: f64,
    pub spent_this_hour_usd: f64,
    pub spent_last_run_usd: f64,
    pub tokens_today: u64,
    pub remaining_today_usd: Option<f64>,
    pub remaining_hour_usd: Option<f64>,
    pub exhausted: bool,
    pub reason: Option<String>,
    pub last_receipt_id: Option<String>,
}
95
/// Point-in-time view of a persona, derived by `persona_status` from the
/// persona's event history.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaStatus {
    /// Persona name, copied from the binding.
    pub name: String,
    /// Template reference, copied from the binding.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Lifecycle state after replaying all events and applying lease expiry.
    pub state: PersonaLifecycleState,
    /// Entry workflow, copied from the binding.
    pub entry_workflow: String,
    /// Currently set to the binding's name by `persona_status`.
    #[serde(default)]
    pub role: String,
    /// Assignment derived from the active lease, if one is held.
    #[serde(default)]
    pub current_assignment: Option<PersonaAssignmentStatus>,
    /// Time of the last completed run, formatted with `format_ms`.
    pub last_run: Option<String>,
    /// Result of `next_scheduled_run` for the binding.
    pub next_scheduled_run: Option<String>,
    /// Lease that has been acquired and has neither been released nor expired.
    pub active_lease: Option<PersonaLease>,
    /// Budget limits and spend.
    pub budget: PersonaBudgetStatus,
    /// Error from the latest `persona.run.failed` or budget-exhausted event.
    pub last_error: Option<String>,
    /// Number of queued work keys that have no completed run.
    pub queued_events: usize,
    /// Queued work items that have no completed run.
    #[serde(default)]
    pub queued_work: Vec<PersonaQueuedWork>,
    /// Queued work items that `handoff_inbox_item` maps to inbox entries.
    #[serde(default)]
    pub handoff_inbox: Vec<PersonaHandoffInboxItem>,
    /// Receipts built from events whose kind starts with `persona.value.`.
    #[serde(default)]
    pub value_receipts: Vec<PersonaValueReceipt>,
    /// Number of `persona.trigger.dead_lettered` events seen.
    pub disabled_events: usize,
    /// `persona_status` always reports `"queue_then_drain_on_resume"`.
    pub paused_event_policy: String,
}
122
/// Description of the work a persona currently holds a lease for.
///
/// `persona_status` builds it from the active lease via
/// `assignment_status_from_lease` (defined outside this view).
/// NOTE(review): `acquired_at` / `expires_at` are presumably formatted
/// timestamps derived from the lease's millisecond fields — confirm.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaAssignmentStatus {
    pub work_key: String,
    pub lease_id: String,
    pub holder: String,
    pub acquired_at: String,
    pub expires_at: String,
}
131
/// A unit of work that was queued for a persona and has not completed.
///
/// `persona_status` obtains these from `persona.trigger.queued` events via
/// `queued_work_from_event` and keys them by `work_key`, so a later event for
/// the same key replaces the earlier entry.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaQueuedWork {
    pub work_key: String,
    pub provider: String,
    pub kind: String,
    pub queued_at: String,
    pub reason: String,
    pub source_event_id: Option<String>,
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
}
143
/// Handoff view of a queued work item.
///
/// `persona_status` derives these from the queued work list via
/// `handoff_inbox_item` (defined outside this view), which may skip items.
/// NOTE(review): the optional fields are presumably read from the queued
/// work's metadata — confirm against `handoff_inbox_item`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaHandoffInboxItem {
    pub work_key: String,
    pub handoff_id: Option<String>,
    pub handoff_kind: Option<String>,
    pub source_persona: Option<String>,
    pub task: Option<String>,
    pub queued_at: String,
    pub reason: String,
}
154
/// Status-facing record of a value event.
///
/// `persona_status` collects these from events whose kind starts with
/// `persona.value.` via `value_receipt_from_event` (defined outside this view).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaValueReceipt {
    pub kind: PersonaValueEventKind,
    pub run_id: Option<Uuid>,
    pub occurred_at: String,
    pub paid_cost_usd: f64,
    pub avoided_cost_usd: f64,
    pub deterministic_steps: i64,
    pub llm_steps: i64,
    pub metadata: serde_json::Value,
}
166
/// Normalized description of something that asks a persona to run.
///
/// Built by `fire_schedule` for cron ticks and by `normalize_trigger_envelope`
/// for external triggers.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaTriggerEnvelope {
    /// Source of the trigger; `"schedule"` for schedule ticks.
    pub provider: String,
    /// Trigger kind; `"cron.tick"` for schedule ticks.
    pub kind: String,
    /// Used as the run's `work_key` in events and receipts.
    pub subject_key: String,
    pub source_event_id: Option<String>,
    pub received_at_ms: i64,
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
    #[serde(default)]
    pub raw: serde_json::Value,
}
179
/// Outcome of asking a persona to handle a trigger envelope.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaRunReceipt {
    /// Outcome label. The run path in this file produces `"queued"`,
    /// `"dead_lettered"`, `"budget_exhausted"`, `"duplicate"`, `"lease_busy"`
    /// and `"completed"`.
    pub status: String,
    /// Name of the persona the receipt belongs to.
    pub persona: String,
    /// Run identifier; `None` when no run was started.
    #[serde(default)]
    pub run_id: Option<Uuid>,
    /// The envelope's `subject_key`.
    pub work_key: String,
    /// Lease associated with the outcome, if any; for `"lease_busy"` this is
    /// the lease reported as active when the request was evaluated.
    pub lease: Option<PersonaLease>,
    /// `true` when the trigger was queued because the persona is paused.
    pub queued: bool,
    /// Human-readable error for non-successful outcomes.
    pub error: Option<String>,
    pub budget_receipt_id: Option<String>,
}
192
/// Cost and step accounting supplied by the caller for a run.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaRunCost {
    /// Paid cost in USD.
    pub cost_usd: f64,
    pub tokens: u64,
    /// A value greater than zero makes the run emit a
    /// `DeterministicExecution` value event.
    #[serde(default)]
    pub avoided_cost_usd: f64,
    /// A value greater than zero makes the run emit a
    /// `DeterministicExecution` value event.
    #[serde(default)]
    pub deterministic_steps: i64,
    #[serde(default)]
    pub llm_steps: i64,
    /// A value greater than zero makes the run emit a `FrontierEscalation`
    /// value event that carries the paid cost.
    #[serde(default)]
    pub frontier_escalations: i64,
    #[serde(default)]
    pub metadata: serde_json::Value,
}
208
/// Kinds of value events a persona can emit. Serialized in snake_case.
///
/// The run path in this file emits `RunStarted`, `DeterministicExecution`,
/// `FrontierEscalation` and `RunCompleted`. NOTE(review): the remaining kinds
/// are presumably emitted elsewhere — confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaValueEventKind {
    RunStarted,
    RunCompleted,
    AcceptedOutcome,
    FrontierEscalation,
    DeterministicExecution,
    PromotionSavings,
    ApprovalWait,
}
220
221impl PersonaValueEventKind {
222 pub fn as_str(self) -> &'static str {
223 match self {
224 Self::RunStarted => "run_started",
225 Self::RunCompleted => "run_completed",
226 Self::AcceptedOutcome => "accepted_outcome",
227 Self::FrontierEscalation => "frontier_escalation",
228 Self::DeterministicExecution => "deterministic_execution",
229 Self::PromotionSavings => "promotion_savings",
230 Self::ApprovalWait => "approval_wait",
231 }
232 }
233}
234
/// Value event delivered to registered [`PersonaValueSink`]s.
///
/// Carries the persona, the optional run, the kind of value and the cost/step
/// figures attributed to it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaValueEvent {
    pub persona_id: String,
    pub template_ref: Option<String>,
    pub run_id: Option<Uuid>,
    pub kind: PersonaValueEventKind,
    pub paid_cost_usd: f64,
    pub avoided_cost_usd: f64,
    pub deterministic_steps: i64,
    pub llm_steps: i64,
    pub metadata: serde_json::Value,
    pub occurred_at: OffsetDateTime,
}
248
/// Receiver for persona value events.
///
/// Implementations are registered with `register_persona_value_sink` and must
/// be `Send + Sync` because they are shared behind an `Arc`.
pub trait PersonaValueSink: Send + Sync {
    /// Called with each value event delivered to this sink.
    fn handle_value_event(&self, event: &PersonaValueEvent);
}
252
/// Supervision update about a persona.
///
/// Serialized as an internally tagged enum: the variant name, in snake_case,
/// is stored under the `update_kind` key next to the variant's own fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "update_kind", rename_all = "snake_case")]
pub enum PersonaSupervisionEvent {
    /// Queue position of a piece of work.
    QueuePosition(PersonaQueuePositionUpdate),
    /// Lifecycle report from a repair worker.
    RepairWorkerStatus(PersonaRepairWorkerStatusUpdate),
    /// Receipt produced for a run request.
    Receipt(PersonaReceiptUpdate),
    /// Checkpoint action, such as an acknowledged restore.
    Checkpoint(PersonaCheckpointUpdate),
}
267
268impl PersonaSupervisionEvent {
269 pub fn update_kind(&self) -> &'static str {
270 match self {
271 Self::QueuePosition(_) => "queue_position",
272 Self::RepairWorkerStatus(_) => "repair_worker_status",
273 Self::Receipt(_) => "receipt",
274 Self::Checkpoint(_) => "checkpoint",
275 }
276 }
277
278 pub fn persona_id(&self) -> &str {
279 match self {
280 Self::QueuePosition(update) => &update.persona_id,
281 Self::RepairWorkerStatus(update) => &update.persona_id,
282 Self::Receipt(update) => &update.persona_id,
283 Self::Checkpoint(update) => &update.persona_id,
284 }
285 }
286
287 pub fn template_ref(&self) -> Option<&str> {
288 match self {
289 Self::QueuePosition(update) => update.template_ref.as_deref(),
290 Self::RepairWorkerStatus(update) => update.template_ref.as_deref(),
291 Self::Receipt(update) => update.template_ref.as_deref(),
292 Self::Checkpoint(update) => update.template_ref.as_deref(),
293 }
294 }
295
296 pub fn occurred_at_ms(&self) -> i64 {
297 match self {
298 Self::QueuePosition(update) => update.occurred_at_ms,
299 Self::RepairWorkerStatus(update) => update.occurred_at_ms,
300 Self::Receipt(update) => update.occurred_at_ms,
301 Self::Checkpoint(update) => update.occurred_at_ms,
302 }
303 }
304}
305
/// Supervision update describing where a piece of work sits in a persona's queue.
///
/// NOTE(review): whether `position` is zero- or one-based, and how it relates
/// to `queue_depth`, is decided by the emitter outside this view — confirm.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaQueuePositionUpdate {
    pub persona_id: String,
    #[serde(default)]
    pub template_ref: Option<String>,
    pub work_key: String,
    pub queue_depth: i64,
    pub position: i64,
    pub queued_at_ms: i64,
    pub occurred_at_ms: i64,
}
318
/// Lifecycle phase reported by a repair worker. Serialized in snake_case.
///
/// NOTE(review): the variant order reads like the usual progression
/// (pending → running → verifying → pushing → terminal state), but no
/// ordering is enforced by the code visible here — confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaRepairWorkerLifecycle {
    Pending,
    Running,
    Verifying,
    Pushing,
    Succeeded,
    Failed,
    Cancelled,
}
330
331impl PersonaRepairWorkerLifecycle {
332 pub fn as_str(self) -> &'static str {
333 match self {
334 Self::Pending => "pending",
335 Self::Running => "running",
336 Self::Verifying => "verifying",
337 Self::Pushing => "pushing",
338 Self::Succeeded => "succeeded",
339 Self::Failed => "failed",
340 Self::Cancelled => "cancelled",
341 }
342 }
343}
344
/// Status report from a repair worker acting for a persona.
///
/// `report_repair_worker_status` fills in blanks before recording: an empty
/// `persona_id` and a missing `template_ref` come from the binding, and zero
/// `occurred_at_ms` / `last_heartbeat_ms` become the caller's `now_ms`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaRepairWorkerStatusUpdate {
    pub persona_id: String,
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Together with `lifecycle`, identifies a report for de-duplication.
    pub repair_worker_id: String,
    pub lifecycle: PersonaRepairWorkerLifecycle,
    #[serde(default)]
    pub work_key: Option<String>,
    #[serde(default)]
    pub lease_id: Option<String>,
    #[serde(default)]
    pub scratchpad_url: Option<String>,
    pub last_heartbeat_ms: i64,
    pub occurred_at_ms: i64,
}
361
/// Supervision update that carries the receipt of a run request.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaReceiptUpdate {
    pub persona_id: String,
    #[serde(default)]
    pub template_ref: Option<String>,
    /// The receipt being reported.
    pub receipt: PersonaRunReceipt,
    pub occurred_at_ms: i64,
}
370
/// Supervision update about a checkpoint action.
///
/// `restore_persona_checkpoint` stores this as the payload of
/// `persona.checkpoint.restore_acked` events, and
/// `find_checkpoint_restore_ack` parses it back from them.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointUpdate {
    pub persona_id: String,
    #[serde(default)]
    pub template_ref: Option<String>,
    pub action: PersonaCheckpointAction,
    /// Identifier used to detect an already-acknowledged restore.
    pub checkpoint_id: String,
    #[serde(default)]
    pub work_key: Option<String>,
    /// Coordinates the persona resumes from.
    #[serde(default)]
    pub resumed_from: Option<PersonaCheckpointResume>,
    pub occurred_at_ms: i64,
}
385
/// Action recorded in a checkpoint update. Serialized in snake_case.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersonaCheckpointAction {
    /// A checkpoint restore was acknowledged.
    RestoreAcked,
}
391
392impl PersonaCheckpointAction {
393 pub fn as_str(self) -> &'static str {
394 match self {
395 Self::RestoreAcked => "restore_acked",
396 }
397 }
398}
399
/// Coordinates describing where a persona resumes after a checkpoint restore.
///
/// When a restore request does not provide one, `restore_persona_checkpoint`
/// builds it from the persona's current status: the active lease id, the last
/// run time and the keys of the queued work, with `run_id` and `note` unset.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointResume {
    pub run_id: Option<Uuid>,
    pub lease_id: Option<String>,
    pub last_run_ms: Option<i64>,
    pub queued_work_keys: Vec<String>,
    #[serde(default)]
    pub note: Option<String>,
}
409
/// Receiver for persona supervision events.
///
/// Implementations are registered with `register_persona_supervision_sink`
/// and must be `Send + Sync` because they are shared behind an `Arc`.
pub trait PersonaSupervisionSink: Send + Sync {
    /// Called with each supervision event delivered to this sink.
    fn handle_supervision_event(&self, event: &PersonaSupervisionEvent);
}
413
/// Registry of shared sinks of type `T`, each tagged with a unique numeric id.
///
/// Sinks are kept in registration order behind an `RwLock`; ids are handed out
/// by an atomic counter that starts at 1.
struct TypedSinkRegistry<T: ?Sized + Send + Sync> {
    sinks: RwLock<Vec<(u64, Arc<T>)>>,
    next_id: AtomicU64,
}

impl<T: ?Sized + Send + Sync> TypedSinkRegistry<T> {
    /// Creates an empty registry. `const` so it can back a lazily-initialized static.
    const fn new() -> Self {
        Self {
            sinks: RwLock::new(Vec::new()),
            next_id: AtomicU64::new(1),
        }
    }

    /// Adds `sink` and returns the id that `unregister` accepts.
    ///
    /// A poisoned lock is recovered instead of being treated as a failure, so
    /// a returned id always refers to a sink that was actually stored.
    fn register(&self, sink: Arc<T>) -> u64 {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        self.sinks
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push((id, sink));
        id
    }

    /// Removes the sink registered under `id`; unknown ids are ignored.
    ///
    /// A poisoned lock is recovered so the sink is still removed.
    fn unregister(&self, id: u64) {
        self.sinks
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .retain(|(existing, _)| *existing != id);
    }

    /// Returns the registered sinks in registration order.
    ///
    /// The clones are taken under the read lock, so callers can invoke the
    /// sinks afterwards without holding the lock. A poisoned lock is recovered
    /// rather than reported as "no sinks".
    fn snapshot(&self) -> Vec<Arc<T>> {
        self.sinks
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .iter()
            .map(|(_, sink)| Arc::clone(sink))
            .collect()
    }
}
448
449fn persona_value_sinks() -> &'static TypedSinkRegistry<dyn PersonaValueSink> {
450 static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaValueSink>> = OnceLock::new();
451 REGISTRY.get_or_init(TypedSinkRegistry::new)
452}
453
454fn persona_supervision_sinks() -> &'static TypedSinkRegistry<dyn PersonaSupervisionSink> {
455 static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaSupervisionSink>> = OnceLock::new();
456 REGISTRY.get_or_init(TypedSinkRegistry::new)
457}
458
/// Guard returned by `register_persona_value_sink`.
///
/// Holds the id the sink was registered under; dropping the guard removes
/// that sink from the value-sink registry.
#[must_use = "dropping the registration immediately unregisters the sink"]
pub struct PersonaValueSinkRegistration {
    id: u64,
}
463
464impl Drop for PersonaValueSinkRegistration {
465 fn drop(&mut self) {
466 persona_value_sinks().unregister(self.id);
467 }
468}
469
470pub fn register_persona_value_sink(
471 sink: Arc<dyn PersonaValueSink>,
472) -> PersonaValueSinkRegistration {
473 PersonaValueSinkRegistration {
474 id: persona_value_sinks().register(sink),
475 }
476}
477
/// Guard returned by `register_persona_supervision_sink`.
///
/// Holds the id the sink was registered under; dropping the guard removes
/// that sink from the supervision-sink registry.
#[must_use = "dropping the registration immediately unregisters the sink"]
pub struct PersonaSupervisionSinkRegistration {
    id: u64,
}
482
483impl Drop for PersonaSupervisionSinkRegistration {
484 fn drop(&mut self) {
485 persona_supervision_sinks().unregister(self.id);
486 }
487}
488
489pub fn register_persona_supervision_sink(
490 sink: Arc<dyn PersonaSupervisionSink>,
491) -> PersonaSupervisionSinkRegistration {
492 PersonaSupervisionSinkRegistration {
493 id: persona_supervision_sinks().register(sink),
494 }
495}
496
497pub async fn persona_status(
498 log: &Arc<AnyEventLog>,
499 binding: &PersonaRuntimeBinding,
500 now_ms: i64,
501) -> Result<PersonaStatus, String> {
502 let events = read_persona_events(log, &binding.name).await?;
503 let mut state = PersonaLifecycleState::Idle;
504 let mut last_run_ms = None;
505 let mut active_lease = None;
506 let mut last_error = None;
507 let mut queued = BTreeSet::<String>::new();
508 let mut completed = BTreeSet::<String>::new();
509 let mut disabled_events = 0usize;
510 let mut budget_receipt = None;
511 let mut budget_exhaustion_reason = None;
512 let mut spent = Vec::<(i64, f64, u64)>::new();
513 let mut queued_work = BTreeMap::<String, PersonaQueuedWork>::new();
514 let mut value_receipts = Vec::<PersonaValueReceipt>::new();
515
516 for (_, event) in events {
517 match event.kind.as_str() {
518 "persona.control.paused" => state = PersonaLifecycleState::Paused,
519 "persona.control.resumed" => state = PersonaLifecycleState::Idle,
520 "persona.control.disabled" => state = PersonaLifecycleState::Disabled,
521 "persona.control.draining" => state = PersonaLifecycleState::Draining,
522 "persona.lease.acquired" => {
523 if let Ok(lease) = serde_json::from_value::<PersonaLease>(event.payload.clone()) {
524 active_lease = Some(lease);
525 state = PersonaLifecycleState::Running;
526 }
527 }
528 "persona.lease.released" => {
529 active_lease = None;
530 if !matches!(
531 state,
532 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
533 ) {
534 state = PersonaLifecycleState::Idle;
535 }
536 }
537 "persona.lease.expired" => {
538 active_lease = None;
539 if !matches!(
540 state,
541 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
542 ) {
543 state = PersonaLifecycleState::Idle;
544 }
545 }
546 "persona.run.started" => state = PersonaLifecycleState::Running,
547 "persona.run.completed" => {
548 last_run_ms = event
549 .payload
550 .get("completed_at_ms")
551 .and_then(serde_json::Value::as_i64)
552 .or(Some(event.occurred_at_ms));
553 if let Some(work_key) = event
554 .payload
555 .get("work_key")
556 .and_then(serde_json::Value::as_str)
557 {
558 completed.insert(work_key.to_string());
559 }
560 if !matches!(
561 state,
562 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
563 ) {
564 state = PersonaLifecycleState::Idle;
565 }
566 }
567 "persona.run.failed" => {
568 state = PersonaLifecycleState::Failed;
569 last_error = event
570 .payload
571 .get("error")
572 .and_then(serde_json::Value::as_str)
573 .map(ToString::to_string);
574 }
575 "persona.trigger.queued" => {
576 if let Some(work_key) = event
577 .payload
578 .get("work_key")
579 .and_then(serde_json::Value::as_str)
580 {
581 queued.insert(work_key.to_string());
582 }
583 if let Some(item) = queued_work_from_event(&event)? {
584 queued_work.insert(item.work_key.clone(), item);
585 }
586 }
587 "persona.trigger.dead_lettered" => disabled_events += 1,
588 "persona.budget.recorded" => {
589 budget_receipt = event
590 .payload
591 .get("receipt_id")
592 .and_then(serde_json::Value::as_str)
593 .map(ToString::to_string);
594 spent.push((
595 event.occurred_at_ms,
596 event
597 .payload
598 .get("cost_usd")
599 .and_then(serde_json::Value::as_f64)
600 .unwrap_or_default(),
601 event
602 .payload
603 .get("tokens")
604 .and_then(serde_json::Value::as_u64)
605 .unwrap_or_default(),
606 ));
607 }
608 "persona.budget.exhausted" => {
609 budget_exhaustion_reason = event
610 .payload
611 .get("reason")
612 .and_then(serde_json::Value::as_str)
613 .map(ToString::to_string);
614 last_error = budget_exhaustion_reason
615 .as_ref()
616 .map(|reason| format!("persona budget exhausted: {reason}"));
617 budget_receipt = event
618 .payload
619 .get("receipt_id")
620 .and_then(serde_json::Value::as_str)
621 .map(ToString::to_string);
622 }
623 kind if kind.starts_with("persona.value.") => {
624 if let Some(receipt) = value_receipt_from_event(&event)? {
625 value_receipts.push(receipt);
626 }
627 }
628 _ => {}
629 }
630 }
631
632 if let Some(lease) = active_lease.as_ref() {
633 if lease.expires_at_ms <= now_ms {
634 active_lease = None;
635 if !matches!(
636 state,
637 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
638 ) {
639 state = PersonaLifecycleState::Idle;
640 }
641 }
642 }
643
644 queued.retain(|work_key| !completed.contains(work_key));
645 queued_work.retain(|work_key, _| !completed.contains(work_key));
646 let queued_work = queued_work.into_values().collect::<Vec<_>>();
647 let handoff_inbox = queued_work
648 .iter()
649 .filter_map(handoff_inbox_item)
650 .collect::<Vec<_>>();
651
652 let mut budget = budget_status(&binding.budget, &spent, now_ms);
653 if budget.reason.is_none() {
654 if let Some(reason) = budget_exhaustion_reason {
655 budget.exhausted = true;
656 budget.reason = Some(reason);
657 }
658 }
659 if budget.last_receipt_id.is_none() {
660 budget.last_receipt_id = budget_receipt;
661 }
662
663 let current_assignment = active_lease.as_ref().map(assignment_status_from_lease);
664
665 Ok(PersonaStatus {
666 name: binding.name.clone(),
667 template_ref: binding.template_ref.clone(),
668 state,
669 entry_workflow: binding.entry_workflow.clone(),
670 role: binding.name.clone(),
671 current_assignment,
672 last_run: last_run_ms.map(format_ms),
673 next_scheduled_run: next_scheduled_run(binding, last_run_ms, now_ms),
674 active_lease,
675 budget,
676 last_error,
677 queued_events: queued.len(),
678 queued_work,
679 handoff_inbox,
680 value_receipts,
681 disabled_events,
682 paused_event_policy: "queue_then_drain_on_resume".to_string(),
683 })
684}
685
686pub async fn pause_persona(
687 log: &Arc<AnyEventLog>,
688 binding: &PersonaRuntimeBinding,
689 now_ms: i64,
690) -> Result<PersonaStatus, String> {
691 append_persona_event(
692 log,
693 &binding.name,
694 "persona.control.paused",
695 json!({"paused_at_ms": now_ms, "policy": "queue_then_drain_on_resume"}),
696 now_ms,
697 )
698 .await?;
699 persona_status(log, binding, now_ms).await
700}
701
702pub async fn resume_persona(
703 log: &Arc<AnyEventLog>,
704 binding: &PersonaRuntimeBinding,
705 now_ms: i64,
706) -> Result<PersonaStatus, String> {
707 append_persona_event(
708 log,
709 &binding.name,
710 "persona.control.resumed",
711 json!({"resumed_at_ms": now_ms, "drain": true}),
712 now_ms,
713 )
714 .await?;
715 let queued = queued_events(log, &binding.name).await?;
716 for (envelope, cost) in queued {
717 let _ = run_for_envelope(log, binding, envelope, cost, now_ms).await?;
718 }
719 persona_status(log, binding, now_ms).await
720}
721
722pub async fn disable_persona(
723 log: &Arc<AnyEventLog>,
724 binding: &PersonaRuntimeBinding,
725 now_ms: i64,
726) -> Result<PersonaStatus, String> {
727 append_persona_event(
728 log,
729 &binding.name,
730 "persona.control.disabled",
731 json!({"disabled_at_ms": now_ms}),
732 now_ms,
733 )
734 .await?;
735 persona_status(log, binding, now_ms).await
736}
737
738pub async fn fire_schedule(
739 log: &Arc<AnyEventLog>,
740 binding: &PersonaRuntimeBinding,
741 cost: PersonaRunCost,
742 now_ms: i64,
743) -> Result<PersonaRunReceipt, String> {
744 let schedule = binding
745 .schedules
746 .first()
747 .cloned()
748 .unwrap_or_else(|| "manual".to_string());
749 let envelope = PersonaTriggerEnvelope {
750 provider: "schedule".to_string(),
751 kind: "cron.tick".to_string(),
752 subject_key: format!("schedule:{}:{schedule}:{}", binding.name, format_ms(now_ms)),
753 source_event_id: None,
754 received_at_ms: now_ms,
755 metadata: BTreeMap::from([
756 ("persona".to_string(), binding.name.clone()),
757 ("schedule".to_string(), schedule),
758 ("fired_at".to_string(), format_ms(now_ms)),
759 ]),
760 raw: json!({}),
761 };
762 append_persona_event(
763 log,
764 &binding.name,
765 "persona.schedule.fired",
766 json!({"persona": binding.name, "envelope": envelope}),
767 now_ms,
768 )
769 .await?;
770 run_for_envelope(log, binding, envelope, cost, now_ms).await
771}
772
773pub async fn fire_trigger(
774 log: &Arc<AnyEventLog>,
775 binding: &PersonaRuntimeBinding,
776 provider: &str,
777 kind: &str,
778 metadata: BTreeMap<String, String>,
779 cost: PersonaRunCost,
780 now_ms: i64,
781) -> Result<PersonaRunReceipt, String> {
782 let envelope = normalize_trigger_envelope(provider, kind, metadata, now_ms);
783 append_persona_event(
784 log,
785 &binding.name,
786 "persona.trigger.received",
787 json!({"persona": binding.name, "envelope": envelope}),
788 now_ms,
789 )
790 .await?;
791 run_for_envelope(log, binding, envelope, cost, now_ms).await
792}
793
794pub async fn record_persona_spend(
795 log: &Arc<AnyEventLog>,
796 binding: &PersonaRuntimeBinding,
797 cost: PersonaRunCost,
798 now_ms: i64,
799) -> Result<PersonaBudgetStatus, String> {
800 enforce_budget(log, binding, &cost, now_ms).await?;
801 append_budget_record(log, &binding.name, &cost, None, now_ms).await?;
802 persona_status(log, binding, now_ms)
803 .await
804 .map(|status| status.budget)
805}
806
807pub async fn report_repair_worker_status(
815 log: &Arc<AnyEventLog>,
816 binding: &PersonaRuntimeBinding,
817 status: PersonaRepairWorkerStatusUpdate,
818 now_ms: i64,
819) -> Result<bool, String> {
820 let mut status = status;
821 if status.persona_id.is_empty() {
822 status.persona_id = binding.name.clone();
823 }
824 if status.template_ref.is_none() {
825 status.template_ref = binding.template_ref.clone();
826 }
827 if status.occurred_at_ms == 0 {
828 status.occurred_at_ms = now_ms;
829 }
830 if status.last_heartbeat_ms == 0 {
831 status.last_heartbeat_ms = now_ms;
832 }
833
834 if repair_worker_status_recorded(log, &binding.name, &status).await? {
835 return Ok(false);
836 }
837 append_persona_event(
838 log,
839 &binding.name,
840 "persona.repair_worker.status",
841 serde_json::to_value(&status).map_err(|error| error.to_string())?,
842 status.occurred_at_ms,
843 )
844 .await?;
845 record_persona_supervision_event(
846 log,
847 &binding.name,
848 PersonaSupervisionEvent::RepairWorkerStatus(status),
849 )
850 .await?;
851 Ok(true)
852}
853
854pub async fn restore_persona_checkpoint(
861 log: &Arc<AnyEventLog>,
862 binding: &PersonaRuntimeBinding,
863 request: PersonaCheckpointRestoreRequest,
864 now_ms: i64,
865) -> Result<PersonaCheckpointRestoreOutcome, String> {
866 let PersonaCheckpointRestoreRequest {
867 checkpoint_id,
868 work_key,
869 resumed_from,
870 } = request;
871 let status = persona_status(log, binding, now_ms).await?;
872
873 if let Some(prior) = find_checkpoint_restore_ack(log, &binding.name, &checkpoint_id).await? {
874 return Ok(PersonaCheckpointRestoreOutcome {
875 acked: false,
876 update: prior,
877 });
878 }
879
880 let resume_coordinates = resumed_from.unwrap_or_else(|| PersonaCheckpointResume {
881 run_id: None,
882 lease_id: status.active_lease.as_ref().map(|lease| lease.id.clone()),
883 last_run_ms: status
884 .last_run
885 .as_deref()
886 .and_then(|value| parse_rfc3339_ms(value).ok()),
887 queued_work_keys: status
888 .queued_work
889 .iter()
890 .map(|item| item.work_key.clone())
891 .collect(),
892 note: None,
893 });
894
895 let update = PersonaCheckpointUpdate {
896 persona_id: binding.name.clone(),
897 template_ref: binding.template_ref.clone(),
898 action: PersonaCheckpointAction::RestoreAcked,
899 checkpoint_id: checkpoint_id.clone(),
900 work_key,
901 resumed_from: Some(resume_coordinates),
902 occurred_at_ms: now_ms,
903 };
904
905 append_persona_event(
906 log,
907 &binding.name,
908 "persona.checkpoint.restore_acked",
909 serde_json::to_value(&update).map_err(|error| error.to_string())?,
910 now_ms,
911 )
912 .await?;
913 record_persona_supervision_event(
914 log,
915 &binding.name,
916 PersonaSupervisionEvent::Checkpoint(update.clone()),
917 )
918 .await?;
919 Ok(PersonaCheckpointRestoreOutcome {
920 acked: true,
921 update,
922 })
923}
924
/// Request to acknowledge a checkpoint restore for a persona.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointRestoreRequest {
    /// Identifier of the checkpoint; a restore is acknowledged at most once per id.
    pub checkpoint_id: String,
    /// Optional work key copied into the recorded update.
    #[serde(default)]
    pub work_key: Option<String>,
    /// Resume coordinates; when absent they are derived from the persona's status.
    #[serde(default)]
    pub resumed_from: Option<PersonaCheckpointResume>,
}
933
/// Result of a checkpoint restore request.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointRestoreOutcome {
    /// `true` when this call recorded the acknowledgement; `false` when an
    /// earlier acknowledgement for the same checkpoint id was found.
    pub acked: bool,
    /// The recorded update: new when `acked` is `true`, the earlier one otherwise.
    pub update: PersonaCheckpointUpdate,
}
939
940async fn repair_worker_status_recorded(
941 log: &Arc<AnyEventLog>,
942 persona: &str,
943 update: &PersonaRepairWorkerStatusUpdate,
944) -> Result<bool, String> {
945 let events = read_persona_events(log, persona).await?;
946 Ok(events.into_iter().any(|(_, event)| {
947 event.kind == "persona.repair_worker.status"
948 && event
949 .payload
950 .get("repair_worker_id")
951 .and_then(serde_json::Value::as_str)
952 == Some(update.repair_worker_id.as_str())
953 && event
954 .payload
955 .get("lifecycle")
956 .and_then(serde_json::Value::as_str)
957 == Some(update.lifecycle.as_str())
958 }))
959}
960
961async fn find_checkpoint_restore_ack(
962 log: &Arc<AnyEventLog>,
963 persona: &str,
964 checkpoint_id: &str,
965) -> Result<Option<PersonaCheckpointUpdate>, String> {
966 let events = read_persona_events(log, persona).await?;
967 for (_, event) in events.into_iter().rev() {
968 if event.kind != "persona.checkpoint.restore_acked" {
969 continue;
970 }
971 if event
972 .payload
973 .get("checkpoint_id")
974 .and_then(serde_json::Value::as_str)
975 != Some(checkpoint_id)
976 {
977 continue;
978 }
979 let update: PersonaCheckpointUpdate =
980 serde_json::from_value(event.payload).map_err(|error| error.to_string())?;
981 return Ok(Some(update));
982 }
983 Ok(None)
984}
985
986async fn run_for_envelope(
987 log: &Arc<AnyEventLog>,
988 binding: &PersonaRuntimeBinding,
989 envelope: PersonaTriggerEnvelope,
990 cost: PersonaRunCost,
991 now_ms: i64,
992) -> Result<PersonaRunReceipt, String> {
993 let pre_queue = queue_snapshot(log, binding, now_ms).await?;
994 let receipt = run_for_envelope_inner(log, binding, envelope, cost, now_ms).await?;
995 let post_queue = queue_snapshot(log, binding, now_ms).await?;
996 emit_queue_position_supervision(log, binding, &pre_queue, &post_queue, now_ms).await?;
997 emit_receipt_supervision(log, binding, &receipt, now_ms).await?;
998 Ok(receipt)
999}
1000
1001async fn run_for_envelope_inner(
1002 log: &Arc<AnyEventLog>,
1003 binding: &PersonaRuntimeBinding,
1004 envelope: PersonaTriggerEnvelope,
1005 cost: PersonaRunCost,
1006 now_ms: i64,
1007) -> Result<PersonaRunReceipt, String> {
1008 let status = persona_status(log, binding, now_ms).await?;
1009 match status.state {
1010 PersonaLifecycleState::Paused => {
1011 append_persona_event(
1012 log,
1013 &binding.name,
1014 "persona.trigger.queued",
1015 json!({
1016 "work_key": envelope.subject_key,
1017 "envelope": envelope,
1018 "cost": cost,
1019 "reason": "paused",
1020 }),
1021 now_ms,
1022 )
1023 .await?;
1024 return Ok(PersonaRunReceipt {
1025 status: "queued".to_string(),
1026 persona: binding.name.clone(),
1027 run_id: None,
1028 work_key: envelope.subject_key,
1029 lease: None,
1030 queued: true,
1031 error: None,
1032 budget_receipt_id: None,
1033 });
1034 }
1035 PersonaLifecycleState::Disabled => {
1036 append_persona_event(
1037 log,
1038 &binding.name,
1039 "persona.trigger.dead_lettered",
1040 json!({
1041 "work_key": envelope.subject_key,
1042 "envelope": envelope,
1043 "reason": "disabled",
1044 }),
1045 now_ms,
1046 )
1047 .await?;
1048 return Ok(PersonaRunReceipt {
1049 status: "dead_lettered".to_string(),
1050 persona: binding.name.clone(),
1051 run_id: None,
1052 work_key: envelope.subject_key,
1053 lease: None,
1054 queued: false,
1055 error: Some("persona is disabled".to_string()),
1056 budget_receipt_id: None,
1057 });
1058 }
1059 _ => {}
1060 }
1061
1062 if let Err(error) = enforce_budget(log, binding, &cost, now_ms).await {
1063 return Ok(PersonaRunReceipt {
1064 status: "budget_exhausted".to_string(),
1065 persona: binding.name.clone(),
1066 run_id: None,
1067 work_key: envelope.subject_key,
1068 lease: None,
1069 queued: false,
1070 error: Some(error),
1071 budget_receipt_id: None,
1072 });
1073 }
1074
1075 if work_completed(log, &binding.name, &envelope.subject_key).await? {
1076 append_persona_event(
1077 log,
1078 &binding.name,
1079 "persona.trigger.duplicate",
1080 json!({
1081 "work_key": envelope.subject_key,
1082 "envelope": envelope,
1083 "reason": "already_completed",
1084 }),
1085 now_ms,
1086 )
1087 .await?;
1088 return Ok(PersonaRunReceipt {
1089 status: "duplicate".to_string(),
1090 persona: binding.name.clone(),
1091 run_id: None,
1092 work_key: envelope.subject_key,
1093 lease: None,
1094 queued: false,
1095 error: None,
1096 budget_receipt_id: None,
1097 });
1098 }
1099
1100 let Some(lease) = acquire_lease(
1101 log,
1102 binding,
1103 &envelope.subject_key,
1104 "persona-runtime",
1105 DEFAULT_LEASE_TTL_MS,
1106 now_ms,
1107 )
1108 .await?
1109 else {
1110 return Ok(PersonaRunReceipt {
1111 status: "lease_busy".to_string(),
1112 persona: binding.name.clone(),
1113 run_id: None,
1114 work_key: envelope.subject_key,
1115 lease: status.active_lease,
1116 queued: false,
1117 error: Some("active lease already owns persona work".to_string()),
1118 budget_receipt_id: None,
1119 });
1120 };
1121
1122 let run_id = Uuid::now_v7();
1123 let value_metadata = run_value_metadata(&envelope, &lease, &cost);
1124 append_persona_event(
1125 log,
1126 &binding.name,
1127 "persona.run.started",
1128 json!({
1129 "work_key": envelope.subject_key,
1130 "run_id": run_id,
1131 "started_at_ms": now_ms,
1132 "entry_workflow": binding.entry_workflow,
1133 "lease_id": lease.id,
1134 }),
1135 now_ms,
1136 )
1137 .await?;
1138 emit_persona_value_event(
1139 log,
1140 binding,
1141 run_id,
1142 PersonaValueEventDelta {
1143 kind: PersonaValueEventKind::RunStarted,
1144 metadata: value_metadata.clone(),
1145 ..Default::default()
1146 },
1147 now_ms,
1148 )
1149 .await?;
1150 let budget_receipt_id =
1151 append_budget_record(log, &binding.name, &cost, Some(&lease.id), now_ms).await?;
1152 if cost.avoided_cost_usd > 0.0 || cost.deterministic_steps > 0 {
1153 emit_persona_value_event(
1154 log,
1155 binding,
1156 run_id,
1157 PersonaValueEventDelta {
1158 kind: PersonaValueEventKind::DeterministicExecution,
1159 avoided_cost_usd: cost.avoided_cost_usd,
1160 deterministic_steps: cost.deterministic_steps.max(1),
1161 metadata: value_metadata.clone(),
1162 ..Default::default()
1163 },
1164 now_ms,
1165 )
1166 .await?;
1167 }
1168 if cost.frontier_escalations > 0 {
1169 emit_persona_value_event(
1170 log,
1171 binding,
1172 run_id,
1173 PersonaValueEventDelta {
1174 kind: PersonaValueEventKind::FrontierEscalation,
1175 paid_cost_usd: cost.cost_usd,
1176 llm_steps: cost.llm_steps.max(cost.frontier_escalations),
1177 metadata: value_metadata.clone(),
1178 ..Default::default()
1179 },
1180 now_ms,
1181 )
1182 .await?;
1183 }
1184 let completion_paid_cost = if cost.frontier_escalations > 0 {
1185 0.0
1186 } else {
1187 cost.cost_usd
1188 };
1189 let completion_llm_steps = if cost.frontier_escalations > 0 {
1190 0
1191 } else {
1192 cost.llm_steps
1193 };
1194 emit_persona_value_event(
1195 log,
1196 binding,
1197 run_id,
1198 PersonaValueEventDelta {
1199 kind: PersonaValueEventKind::RunCompleted,
1200 paid_cost_usd: completion_paid_cost,
1201 llm_steps: completion_llm_steps,
1202 metadata: value_metadata,
1203 ..Default::default()
1204 },
1205 now_ms,
1206 )
1207 .await?;
1208 append_persona_event(
1209 log,
1210 &binding.name,
1211 "persona.run.completed",
1212 json!({
1213 "work_key": envelope.subject_key,
1214 "run_id": run_id,
1215 "completed_at_ms": now_ms,
1216 "entry_workflow": binding.entry_workflow,
1217 "lease_id": lease.id,
1218 }),
1219 now_ms,
1220 )
1221 .await?;
1222 append_persona_event(
1223 log,
1224 &binding.name,
1225 "persona.lease.released",
1226 json!({
1227 "id": lease.id,
1228 "work_key": envelope.subject_key,
1229 "released_at_ms": now_ms,
1230 }),
1231 now_ms,
1232 )
1233 .await?;
1234 Ok(PersonaRunReceipt {
1235 status: "completed".to_string(),
1236 persona: binding.name.clone(),
1237 run_id: Some(run_id),
1238 work_key: envelope.subject_key,
1239 lease: Some(lease),
1240 queued: false,
1241 error: None,
1242 budget_receipt_id: Some(budget_receipt_id),
1243 })
1244}
1245
1246async fn acquire_lease(
1247 log: &Arc<AnyEventLog>,
1248 binding: &PersonaRuntimeBinding,
1249 work_key: &str,
1250 holder: &str,
1251 ttl_ms: i64,
1252 now_ms: i64,
1253) -> Result<Option<PersonaLease>, String> {
1254 let status = persona_status(log, binding, now_ms).await?;
1255 if let Some(lease) = status.active_lease {
1256 if lease.expires_at_ms > now_ms {
1257 append_persona_event(
1258 log,
1259 &binding.name,
1260 "persona.lease.conflict",
1261 json!({
1262 "active_lease": lease,
1263 "requested_work_key": work_key,
1264 "at_ms": now_ms,
1265 }),
1266 now_ms,
1267 )
1268 .await?;
1269 return Ok(None);
1270 }
1271 append_persona_event(
1272 log,
1273 &binding.name,
1274 "persona.lease.expired",
1275 json!({
1276 "id": lease.id,
1277 "work_key": lease.work_key,
1278 "expired_at_ms": now_ms,
1279 }),
1280 now_ms,
1281 )
1282 .await?;
1283 }
1284
1285 let lease = PersonaLease {
1286 id: format!("persona_lease_{}", Uuid::now_v7()),
1287 holder: holder.to_string(),
1288 work_key: work_key.to_string(),
1289 acquired_at_ms: now_ms,
1290 expires_at_ms: now_ms + ttl_ms,
1291 };
1292 append_persona_event(
1293 log,
1294 &binding.name,
1295 "persona.lease.acquired",
1296 serde_json::to_value(&lease).map_err(|error| error.to_string())?,
1297 now_ms,
1298 )
1299 .await?;
1300 Ok(Some(lease))
1301}
1302
1303async fn enforce_budget(
1304 log: &Arc<AnyEventLog>,
1305 binding: &PersonaRuntimeBinding,
1306 cost: &PersonaRunCost,
1307 now_ms: i64,
1308) -> Result<(), String> {
1309 let status = persona_status(log, binding, now_ms).await?;
1310 let reason = if binding
1311 .budget
1312 .run_usd
1313 .is_some_and(|limit| cost.cost_usd > limit)
1314 {
1315 Some("run_usd")
1316 } else if binding
1317 .budget
1318 .daily_usd
1319 .is_some_and(|limit| status.budget.spent_today_usd + cost.cost_usd > limit)
1320 {
1321 Some("daily_usd")
1322 } else if binding
1323 .budget
1324 .hourly_usd
1325 .is_some_and(|limit| status.budget.spent_this_hour_usd + cost.cost_usd > limit)
1326 {
1327 Some("hourly_usd")
1328 } else if binding
1329 .budget
1330 .max_tokens
1331 .is_some_and(|limit| status.budget.tokens_today + cost.tokens > limit)
1332 {
1333 Some("max_tokens")
1334 } else {
1335 None
1336 };
1337
1338 if let Some(reason) = reason {
1339 let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1340 append_persona_event(
1341 log,
1342 &binding.name,
1343 "persona.budget.exhausted",
1344 json!({
1345 "receipt_id": receipt_id,
1346 "reason": reason,
1347 "attempted_cost_usd": cost.cost_usd,
1348 "attempted_tokens": cost.tokens,
1349 "persona": binding.name,
1350 }),
1351 now_ms,
1352 )
1353 .await?;
1354 return Err(format!("persona budget exhausted: {reason}"));
1355 }
1356
1357 Ok(())
1358}
1359
1360async fn append_budget_record(
1361 log: &Arc<AnyEventLog>,
1362 persona: &str,
1363 cost: &PersonaRunCost,
1364 lease_id: Option<&str>,
1365 now_ms: i64,
1366) -> Result<String, String> {
1367 let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1368 append_persona_event(
1369 log,
1370 persona,
1371 "persona.budget.recorded",
1372 json!({
1373 "receipt_id": receipt_id,
1374 "persona": persona,
1375 "cost_usd": cost.cost_usd,
1376 "tokens": cost.tokens,
1377 "lease_id": lease_id,
1378 }),
1379 now_ms,
1380 )
1381 .await?;
1382 Ok(receipt_id)
1383}
1384
1385fn normalize_trigger_envelope(
1386 provider: &str,
1387 kind: &str,
1388 metadata: BTreeMap<String, String>,
1389 now_ms: i64,
1390) -> PersonaTriggerEnvelope {
1391 let provider = provider.to_ascii_lowercase();
1392 let kind = kind.to_string();
1393 let source_event_id = metadata
1394 .get("event_id")
1395 .or_else(|| metadata.get("id"))
1396 .cloned();
1397 let subject_key = match provider.as_str() {
1398 "github" => {
1399 let repo = metadata
1400 .get("repository")
1401 .or_else(|| metadata.get("repository.full_name"))
1402 .cloned()
1403 .unwrap_or_else(|| "unknown".to_string());
1404 if let Some(number) = metadata
1405 .get("pr")
1406 .or_else(|| metadata.get("pull_request.number"))
1407 .or_else(|| metadata.get("number"))
1408 {
1409 format!("github:{repo}:pr:{number}")
1410 } else if let Some(check) = metadata
1411 .get("check_run.name")
1412 .or_else(|| metadata.get("check_name"))
1413 {
1414 format!("github:{repo}:check:{check}")
1415 } else {
1416 format!("github:{repo}:{kind}")
1417 }
1418 }
1419 "linear" => {
1420 let issue = metadata
1421 .get("issue_key")
1422 .or_else(|| metadata.get("issue.identifier"))
1423 .or_else(|| metadata.get("issue_id"))
1424 .or_else(|| metadata.get("id"))
1425 .cloned()
1426 .unwrap_or_else(|| "unknown".to_string());
1427 format!("linear:issue:{issue}")
1428 }
1429 "slack" => {
1430 let channel = metadata
1431 .get("channel")
1432 .or_else(|| metadata.get("channel_id"))
1433 .cloned()
1434 .unwrap_or_else(|| "unknown".to_string());
1435 let ts = metadata
1436 .get("ts")
1437 .or_else(|| metadata.get("event_ts"))
1438 .cloned()
1439 .unwrap_or_else(|| "unknown".to_string());
1440 format!("slack:{channel}:{ts}")
1441 }
1442 "webhook" => metadata
1443 .get("dedupe_key")
1444 .or_else(|| metadata.get("event_id"))
1445 .map(|value| format!("webhook:{value}"))
1446 .unwrap_or_else(|| format!("webhook:{kind}:{}", Uuid::now_v7())),
1447 _ => metadata
1448 .get("dedupe_key")
1449 .or_else(|| metadata.get("event_id"))
1450 .map(|value| format!("{provider}:{kind}:{value}"))
1451 .unwrap_or_else(|| format!("{provider}:{kind}:{}", Uuid::now_v7())),
1452 };
1453
1454 PersonaTriggerEnvelope {
1455 provider,
1456 kind,
1457 subject_key,
1458 source_event_id,
1459 received_at_ms: now_ms,
1460 raw: json!({"metadata": metadata}),
1461 metadata,
1462 }
1463}
1464
1465async fn queued_events(
1466 log: &Arc<AnyEventLog>,
1467 persona: &str,
1468) -> Result<Vec<(PersonaTriggerEnvelope, PersonaRunCost)>, String> {
1469 let events = read_persona_events(log, persona).await?;
1470 let mut queued = BTreeMap::<String, (PersonaTriggerEnvelope, PersonaRunCost)>::new();
1471 let mut completed = BTreeSet::<String>::new();
1472 for (_, event) in events {
1473 match event.kind.as_str() {
1474 "persona.trigger.queued" => {
1475 let Some(envelope) = event.payload.get("envelope") else {
1476 continue;
1477 };
1478 let envelope: PersonaTriggerEnvelope =
1479 serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1480 let cost = event
1481 .payload
1482 .get("cost")
1483 .cloned()
1484 .map(serde_json::from_value::<PersonaRunCost>)
1485 .transpose()
1486 .map_err(|error| error.to_string())?
1487 .unwrap_or_default();
1488 queued.insert(envelope.subject_key.clone(), (envelope, cost));
1489 }
1490 "persona.run.completed" => {
1491 if let Some(work_key) = event
1492 .payload
1493 .get("work_key")
1494 .and_then(serde_json::Value::as_str)
1495 {
1496 completed.insert(work_key.to_string());
1497 }
1498 }
1499 _ => {}
1500 }
1501 }
1502 queued.retain(|work_key, _| !completed.contains(work_key));
1503 Ok(queued.into_values().collect())
1504}
1505
1506fn assignment_status_from_lease(lease: &PersonaLease) -> PersonaAssignmentStatus {
1507 PersonaAssignmentStatus {
1508 work_key: lease.work_key.clone(),
1509 lease_id: lease.id.clone(),
1510 holder: lease.holder.clone(),
1511 acquired_at: format_ms(lease.acquired_at_ms),
1512 expires_at: format_ms(lease.expires_at_ms),
1513 }
1514}
1515
1516fn queued_work_from_event(event: &LogEvent) -> Result<Option<PersonaQueuedWork>, String> {
1517 let Some(envelope) = event.payload.get("envelope") else {
1518 return Ok(None);
1519 };
1520 let envelope: PersonaTriggerEnvelope =
1521 serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1522 Ok(Some(PersonaQueuedWork {
1523 work_key: envelope.subject_key,
1524 provider: envelope.provider,
1525 kind: envelope.kind,
1526 queued_at: format_ms(event.occurred_at_ms),
1527 reason: event
1528 .payload
1529 .get("reason")
1530 .and_then(serde_json::Value::as_str)
1531 .unwrap_or("queued")
1532 .to_string(),
1533 source_event_id: envelope.source_event_id,
1534 metadata: envelope.metadata,
1535 }))
1536}
1537
1538fn handoff_inbox_item(work: &PersonaQueuedWork) -> Option<PersonaHandoffInboxItem> {
1539 if work.provider != "handoff" && !work.metadata.contains_key("handoff_id") {
1540 return None;
1541 }
1542 Some(PersonaHandoffInboxItem {
1543 work_key: work.work_key.clone(),
1544 handoff_id: work.metadata.get("handoff_id").cloned(),
1545 handoff_kind: work
1546 .metadata
1547 .get("handoff_kind")
1548 .or_else(|| work.metadata.get("kind"))
1549 .cloned(),
1550 source_persona: work.metadata.get("source_persona").cloned(),
1551 task: work.metadata.get("task").cloned(),
1552 queued_at: work.queued_at.clone(),
1553 reason: work.reason.clone(),
1554 })
1555}
1556
1557fn value_receipt_from_event(event: &LogEvent) -> Result<Option<PersonaValueReceipt>, String> {
1558 let Ok(value_event) = serde_json::from_value::<PersonaValueEvent>(event.payload.clone()) else {
1559 return Ok(None);
1560 };
1561 Ok(Some(PersonaValueReceipt {
1562 kind: value_event.kind,
1563 run_id: value_event.run_id,
1564 occurred_at: value_event
1565 .occurred_at
1566 .format(&Rfc3339)
1567 .map_err(|error| error.to_string())?,
1568 paid_cost_usd: value_event.paid_cost_usd,
1569 avoided_cost_usd: value_event.avoided_cost_usd,
1570 deterministic_steps: value_event.deterministic_steps,
1571 llm_steps: value_event.llm_steps,
1572 metadata: value_event.metadata,
1573 }))
1574}
1575
1576async fn work_completed(
1577 log: &Arc<AnyEventLog>,
1578 persona: &str,
1579 work_key: &str,
1580) -> Result<bool, String> {
1581 let events = read_persona_events(log, persona).await?;
1582 Ok(events.into_iter().any(|(_, event)| {
1583 event.kind == "persona.run.completed"
1584 && event
1585 .payload
1586 .get("work_key")
1587 .and_then(serde_json::Value::as_str)
1588 == Some(work_key)
1589 }))
1590}
1591
1592async fn read_persona_events(
1593 log: &Arc<AnyEventLog>,
1594 persona: &str,
1595) -> Result<Vec<(u64, LogEvent)>, String> {
1596 let topic = runtime_topic()?;
1597 Ok(log
1598 .read_range(&topic, None, usize::MAX)
1599 .await
1600 .map_err(|error| error.to_string())?
1601 .into_iter()
1602 .filter(|(_, event)| {
1603 event
1604 .headers
1605 .get("persona")
1606 .is_some_and(|name| name == persona)
1607 })
1608 .collect())
1609}
1610
1611async fn append_persona_event(
1612 log: &Arc<AnyEventLog>,
1613 persona: &str,
1614 kind: &str,
1615 payload: serde_json::Value,
1616 now_ms: i64,
1617) -> Result<u64, String> {
1618 let mut headers = BTreeMap::new();
1619 headers.insert("persona".to_string(), persona.to_string());
1620 let event = LogEvent {
1621 kind: kind.to_string(),
1622 payload,
1623 headers,
1624 occurred_at_ms: now_ms,
1625 };
1626 log.append(&runtime_topic()?, event)
1627 .await
1628 .map_err(|error| error.to_string())
1629}
1630
1631struct PersonaValueEventDelta {
1632 kind: PersonaValueEventKind,
1633 paid_cost_usd: f64,
1634 avoided_cost_usd: f64,
1635 deterministic_steps: i64,
1636 llm_steps: i64,
1637 metadata: serde_json::Value,
1638}
1639
1640impl Default for PersonaValueEventDelta {
1641 fn default() -> Self {
1642 Self {
1643 kind: PersonaValueEventKind::RunCompleted,
1644 paid_cost_usd: 0.0,
1645 avoided_cost_usd: 0.0,
1646 deterministic_steps: 0,
1647 llm_steps: 0,
1648 metadata: serde_json::Value::Null,
1649 }
1650 }
1651}
1652
1653async fn emit_persona_value_event(
1654 log: &Arc<AnyEventLog>,
1655 binding: &PersonaRuntimeBinding,
1656 run_id: Uuid,
1657 delta: PersonaValueEventDelta,
1658 now_ms: i64,
1659) -> Result<(), String> {
1660 let event = PersonaValueEvent {
1661 persona_id: binding.name.clone(),
1662 template_ref: binding.template_ref.clone(),
1663 run_id: Some(run_id),
1664 kind: delta.kind,
1665 paid_cost_usd: delta.paid_cost_usd.max(0.0),
1666 avoided_cost_usd: delta.avoided_cost_usd.max(0.0),
1667 deterministic_steps: delta.deterministic_steps.max(0),
1668 llm_steps: delta.llm_steps.max(0),
1669 metadata: delta.metadata,
1670 occurred_at: offset_datetime_from_ms(now_ms),
1671 };
1672 append_persona_event(
1673 log,
1674 &binding.name,
1675 &format!("persona.value.{}", event.kind.as_str()),
1676 serde_json::to_value(&event).map_err(|error| error.to_string())?,
1677 now_ms,
1678 )
1679 .await?;
1680 emit_persona_value_sink_event(&event);
1681 Ok(())
1682}
1683
1684fn emit_persona_value_sink_event(event: &PersonaValueEvent) {
1685 for sink in persona_value_sinks().snapshot() {
1686 sink.handle_value_event(event);
1687 }
1688}
1689
1690fn emit_persona_supervision_sink_event(event: &PersonaSupervisionEvent) {
1691 for sink in persona_supervision_sinks().snapshot() {
1692 sink.handle_supervision_event(event);
1693 }
1694}
1695
1696async fn record_persona_supervision_event(
1697 log: &Arc<AnyEventLog>,
1698 persona: &str,
1699 event: PersonaSupervisionEvent,
1700) -> Result<(), String> {
1701 let update_kind = event.update_kind();
1702 let occurred_at_ms = event.occurred_at_ms();
1703 append_persona_event(
1704 log,
1705 persona,
1706 &format!("persona.supervision.{update_kind}"),
1707 serde_json::to_value(&event).map_err(|error| error.to_string())?,
1708 occurred_at_ms,
1709 )
1710 .await?;
1711 emit_persona_supervision_sink_event(&event);
1712 Ok(())
1713}
1714
/// One queued work item as captured by `queue_snapshot`, used to diff queue contents.
#[derive(Clone, Debug)]
struct QueueEntry {
    /// Key identifying the queued work.
    work_key: String,
    /// When the work was queued, in milliseconds since the Unix epoch.
    queued_at_ms: i64,
}
1720
1721async fn queue_snapshot(
1722 log: &Arc<AnyEventLog>,
1723 binding: &PersonaRuntimeBinding,
1724 now_ms: i64,
1725) -> Result<Vec<QueueEntry>, String> {
1726 let status = persona_status(log, binding, now_ms).await?;
1727 Ok(status
1728 .queued_work
1729 .into_iter()
1730 .map(|item| QueueEntry {
1731 queued_at_ms: parse_rfc3339_ms(&item.queued_at).unwrap_or(now_ms),
1732 work_key: item.work_key,
1733 })
1734 .collect())
1735}
1736
1737async fn emit_queue_position_supervision(
1738 log: &Arc<AnyEventLog>,
1739 binding: &PersonaRuntimeBinding,
1740 before: &[QueueEntry],
1741 after: &[QueueEntry],
1742 now_ms: i64,
1743) -> Result<(), String> {
1744 use std::collections::HashSet;
1745 let before_keys: HashSet<&str> = before.iter().map(|e| e.work_key.as_str()).collect();
1746 let after_keys: HashSet<&str> = after.iter().map(|e| e.work_key.as_str()).collect();
1747 let after_depth = after.len() as i64;
1748
1749 for (index, entry) in after.iter().enumerate() {
1750 if !before_keys.contains(entry.work_key.as_str()) {
1751 record_persona_supervision_event(
1752 log,
1753 &binding.name,
1754 PersonaSupervisionEvent::QueuePosition(PersonaQueuePositionUpdate {
1755 persona_id: binding.name.clone(),
1756 template_ref: binding.template_ref.clone(),
1757 work_key: entry.work_key.clone(),
1758 queue_depth: after_depth,
1759 position: (index + 1) as i64,
1760 queued_at_ms: entry.queued_at_ms,
1761 occurred_at_ms: now_ms,
1762 }),
1763 )
1764 .await?;
1765 }
1766 }
1767 for entry in before {
1768 if !after_keys.contains(entry.work_key.as_str()) {
1769 record_persona_supervision_event(
1770 log,
1771 &binding.name,
1772 PersonaSupervisionEvent::QueuePosition(PersonaQueuePositionUpdate {
1773 persona_id: binding.name.clone(),
1774 template_ref: binding.template_ref.clone(),
1775 work_key: entry.work_key.clone(),
1776 queue_depth: after_depth,
1777 position: 0,
1778 queued_at_ms: entry.queued_at_ms,
1779 occurred_at_ms: now_ms,
1780 }),
1781 )
1782 .await?;
1783 }
1784 }
1785 Ok(())
1786}
1787
1788async fn emit_receipt_supervision(
1789 log: &Arc<AnyEventLog>,
1790 binding: &PersonaRuntimeBinding,
1791 receipt: &PersonaRunReceipt,
1792 now_ms: i64,
1793) -> Result<(), String> {
1794 record_persona_supervision_event(
1795 log,
1796 &binding.name,
1797 PersonaSupervisionEvent::Receipt(PersonaReceiptUpdate {
1798 persona_id: binding.name.clone(),
1799 template_ref: binding.template_ref.clone(),
1800 receipt: receipt.clone(),
1801 occurred_at_ms: now_ms,
1802 }),
1803 )
1804 .await
1805}
1806
1807fn run_value_metadata(
1808 envelope: &PersonaTriggerEnvelope,
1809 lease: &PersonaLease,
1810 cost: &PersonaRunCost,
1811) -> serde_json::Value {
1812 let mut metadata = serde_json::Map::new();
1813 metadata.insert("work_key".to_string(), json!(envelope.subject_key));
1814 metadata.insert("trigger_provider".to_string(), json!(envelope.provider));
1815 metadata.insert("trigger_kind".to_string(), json!(envelope.kind));
1816 metadata.insert("lease_id".to_string(), json!(lease.id));
1817 metadata.insert("tokens".to_string(), json!(cost.tokens));
1818 if cost.frontier_escalations > 0 {
1819 metadata.insert(
1820 "frontier_escalations".to_string(),
1821 json!(cost.frontier_escalations),
1822 );
1823 }
1824 match &cost.metadata {
1825 serde_json::Value::Null => {}
1826 serde_json::Value::Object(extra) => {
1827 metadata.extend(
1828 extra
1829 .iter()
1830 .map(|(key, value)| (key.clone(), value.clone())),
1831 );
1832 }
1833 extra => {
1834 metadata.insert("run_cost_metadata".to_string(), extra.clone());
1835 }
1836 }
1837 serde_json::Value::Object(metadata)
1838}
1839
1840fn budget_status(
1841 policy: &PersonaBudgetPolicy,
1842 spent: &[(i64, f64, u64)],
1843 now_ms: i64,
1844) -> PersonaBudgetStatus {
1845 let day_start = now_ms - (now_ms.rem_euclid(86_400_000));
1846 let hour_start = now_ms - (now_ms.rem_euclid(3_600_000));
1847 let mut spent_today_usd = 0.0;
1848 let mut spent_this_hour_usd = 0.0;
1849 let mut tokens_today = 0u64;
1850 let mut spent_last_run_usd = 0.0;
1851 for (at_ms, cost, tokens) in spent {
1852 spent_last_run_usd = *cost;
1853 if *at_ms >= day_start {
1854 spent_today_usd += cost;
1855 tokens_today += tokens;
1856 }
1857 if *at_ms >= hour_start {
1858 spent_this_hour_usd += cost;
1859 }
1860 }
1861
1862 let remaining_today_usd = policy
1863 .daily_usd
1864 .map(|limit| (limit - spent_today_usd).max(0.0));
1865 let remaining_hour_usd = policy
1866 .hourly_usd
1867 .map(|limit| (limit - spent_this_hour_usd).max(0.0));
1868 let reason = if policy
1869 .daily_usd
1870 .is_some_and(|limit| spent_today_usd >= limit && limit >= 0.0)
1871 {
1872 Some("daily_usd".to_string())
1873 } else if policy
1874 .hourly_usd
1875 .is_some_and(|limit| spent_this_hour_usd >= limit && limit >= 0.0)
1876 {
1877 Some("hourly_usd".to_string())
1878 } else if policy
1879 .max_tokens
1880 .is_some_and(|limit| tokens_today >= limit && limit > 0)
1881 {
1882 Some("max_tokens".to_string())
1883 } else {
1884 None
1885 };
1886
1887 PersonaBudgetStatus {
1888 daily_usd: policy.daily_usd,
1889 hourly_usd: policy.hourly_usd,
1890 run_usd: policy.run_usd,
1891 max_tokens: policy.max_tokens,
1892 spent_today_usd,
1893 spent_this_hour_usd,
1894 spent_last_run_usd,
1895 tokens_today,
1896 remaining_today_usd,
1897 remaining_hour_usd,
1898 exhausted: reason.is_some(),
1899 reason,
1900 last_receipt_id: None,
1901 }
1902}
1903
1904fn next_scheduled_run(
1905 binding: &PersonaRuntimeBinding,
1906 last_run_ms: Option<i64>,
1907 now_ms: i64,
1908) -> Option<String> {
1909 binding
1910 .schedules
1911 .iter()
1912 .filter_map(|schedule| next_cron_ms(schedule, last_run_ms.unwrap_or(now_ms)).ok())
1913 .min()
1914 .map(format_ms)
1915}
1916
1917fn next_cron_ms(schedule: &str, after_ms: i64) -> Result<i64, String> {
1918 let cron = schedule
1919 .parse::<Cron>()
1920 .map_err(|error| error.to_string())?;
1921 let after = Utc
1922 .timestamp_millis_opt(after_ms)
1923 .single()
1924 .ok_or_else(|| "invalid timestamp".to_string())?;
1925 let next = cron
1926 .find_next_occurrence(&after, false)
1927 .map_err(|error| error.to_string())?;
1928 Ok(next.timestamp_millis())
1929}
1930
1931pub fn now_ms() -> i64 {
1932 OffsetDateTime::now_utc().unix_timestamp_nanos() as i64 / 1_000_000
1933}
1934
1935fn offset_datetime_from_ms(ms: i64) -> OffsetDateTime {
1936 OffsetDateTime::from_unix_timestamp_nanos((ms as i128) * 1_000_000)
1937 .unwrap_or(OffsetDateTime::UNIX_EPOCH)
1938}
1939
1940pub fn format_ms(ms: i64) -> String {
1941 offset_datetime_from_ms(ms)
1942 .format(&Rfc3339)
1943 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
1944}
1945
1946pub fn parse_rfc3339_ms(value: &str) -> Result<i64, String> {
1947 let ts = OffsetDateTime::parse(value, &Rfc3339)
1948 .map_err(|error| format!("invalid RFC3339 timestamp '{value}': {error}"))?;
1949 Ok(ts.unix_timestamp_nanos() as i64 / 1_000_000)
1950}
1951
1952fn runtime_topic() -> Result<Topic, String> {
1953 Topic::new(PERSONA_RUNTIME_TOPIC).map_err(|error| error.to_string())
1954}
1955
1956#[cfg(test)]
1957mod tests {
1958 use super::*;
1959 use crate::event_log::{AnyEventLog, MemoryEventLog};
1960 use std::sync::Mutex;
1961
1962 struct CapturingValueSink {
1963 events: Arc<Mutex<Vec<PersonaValueEvent>>>,
1964 }
1965
1966 impl PersonaValueSink for CapturingValueSink {
1967 fn handle_value_event(&self, event: &PersonaValueEvent) {
1968 self.events.lock().unwrap().push(event.clone());
1969 }
1970 }
1971
1972 fn binding() -> PersonaRuntimeBinding {
1973 PersonaRuntimeBinding {
1974 name: "merge_captain".to_string(),
1975 template_ref: Some("software_factory@v0".to_string()),
1976 entry_workflow: "workflows/merge.harn#run".to_string(),
1977 schedules: vec!["*/30 * * * *".to_string()],
1978 triggers: vec!["github.pr_opened".to_string()],
1979 budget: PersonaBudgetPolicy {
1980 daily_usd: Some(0.02),
1981 hourly_usd: None,
1982 run_usd: Some(0.02),
1983 max_tokens: Some(100),
1984 },
1985 }
1986 }
1987
1988 fn log() -> Arc<AnyEventLog> {
1989 Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)))
1990 }
1991
1992 #[tokio::test]
1993 async fn schedule_tick_records_lifecycle_status_and_receipt() {
1994 let log = log();
1995 let binding = binding();
1996 let now = parse_rfc3339_ms("2026-04-24T12:30:00Z").unwrap();
1997 let receipt = fire_schedule(
1998 &log,
1999 &binding,
2000 PersonaRunCost {
2001 cost_usd: 0.01,
2002 tokens: 10,
2003 ..Default::default()
2004 },
2005 now,
2006 )
2007 .await
2008 .unwrap();
2009 assert_eq!(receipt.status, "completed");
2010 assert!(receipt.lease.is_some());
2011 let status = persona_status(&log, &binding, now).await.unwrap();
2012 assert_eq!(status.state, PersonaLifecycleState::Idle);
2013 assert_eq!(status.last_run.as_deref(), Some("2026-04-24T12:30:00Z"));
2014 assert!(status.next_scheduled_run.is_some());
2015 assert_eq!(status.budget.spent_today_usd, 0.01);
2016 }
2017
2018 #[tokio::test]
2019 async fn paused_personas_queue_and_resume_drains_once() {
2020 let log = log();
2021 let binding = binding();
2022 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2023 pause_persona(&log, &binding, now).await.unwrap();
2024 let receipt = fire_trigger(
2025 &log,
2026 &binding,
2027 "github",
2028 "pull_request",
2029 BTreeMap::from([
2030 ("repository".to_string(), "burin-labs/harn".to_string()),
2031 ("number".to_string(), "462".to_string()),
2032 ]),
2033 PersonaRunCost::default(),
2034 now,
2035 )
2036 .await
2037 .unwrap();
2038 assert_eq!(receipt.status, "queued");
2039 assert_eq!(
2040 persona_status(&log, &binding, now)
2041 .await
2042 .unwrap()
2043 .queued_events,
2044 1
2045 );
2046 let status = resume_persona(&log, &binding, now + 1000).await.unwrap();
2047 assert_eq!(status.state, PersonaLifecycleState::Idle);
2048 assert_eq!(status.queued_events, 0);
2049 }
2050
2051 #[tokio::test]
2052 async fn resumed_queued_work_reuses_original_budget_cost() {
2053 let log = log();
2054 let mut binding = binding();
2055 binding.budget.run_usd = Some(0.01);
2056 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2057 pause_persona(&log, &binding, now).await.unwrap();
2058 let queued = fire_trigger(
2059 &log,
2060 &binding,
2061 "github",
2062 "pull_request",
2063 BTreeMap::from([
2064 ("repository".to_string(), "burin-labs/harn".to_string()),
2065 ("number".to_string(), "1379".to_string()),
2066 ]),
2067 PersonaRunCost {
2068 cost_usd: 0.02,
2069 tokens: 1,
2070 ..Default::default()
2071 },
2072 now + 1,
2073 )
2074 .await
2075 .unwrap();
2076 assert_eq!(queued.status, "queued");
2077
2078 let status = resume_persona(&log, &binding, now + 2).await.unwrap();
2079
2080 assert_eq!(status.budget.reason.as_deref(), Some("run_usd"));
2081 assert!(status
2082 .last_error
2083 .as_deref()
2084 .is_some_and(|error| error.contains("run_usd")));
2085 assert_eq!(status.queued_events, 1);
2086 }
2087
2088 #[tokio::test]
2089 async fn duplicate_trigger_envelope_is_not_processed_twice() {
2090 let log = log();
2091 let binding = binding();
2092 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2093 let metadata = BTreeMap::from([
2094 ("repository".to_string(), "burin-labs/harn".to_string()),
2095 ("number".to_string(), "462".to_string()),
2096 ]);
2097 let first = fire_trigger(
2098 &log,
2099 &binding,
2100 "github",
2101 "pull_request",
2102 metadata.clone(),
2103 PersonaRunCost::default(),
2104 now,
2105 )
2106 .await
2107 .unwrap();
2108 let second = fire_trigger(
2109 &log,
2110 &binding,
2111 "github",
2112 "pull_request",
2113 metadata,
2114 PersonaRunCost::default(),
2115 now + 1000,
2116 )
2117 .await
2118 .unwrap();
2119 assert_eq!(first.status, "completed");
2120 assert_eq!(second.status, "duplicate");
2121 assert!(second.lease.is_none());
2122 }
2123
2124 #[tokio::test]
2125 async fn disabled_personas_dead_letter_events() {
2126 let log = log();
2127 let binding = binding();
2128 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2129 disable_persona(&log, &binding, now).await.unwrap();
2130 let receipt = fire_trigger(
2131 &log,
2132 &binding,
2133 "slack",
2134 "message",
2135 BTreeMap::from([
2136 ("channel".to_string(), "C123".to_string()),
2137 ("ts".to_string(), "1713988800.000100".to_string()),
2138 ]),
2139 PersonaRunCost::default(),
2140 now,
2141 )
2142 .await
2143 .unwrap();
2144 assert_eq!(receipt.status, "dead_lettered");
2145 let status = persona_status(&log, &binding, now).await.unwrap();
2146 assert_eq!(status.state, PersonaLifecycleState::Disabled);
2147 assert_eq!(status.disabled_events, 1);
2148 }
2149
2150 #[tokio::test]
2151 async fn budget_exhaustion_blocks_expensive_work() {
2152 let log = log();
2153 let mut binding = binding();
2154 binding.budget.daily_usd = Some(0.01);
2155 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2156 let receipt = fire_trigger(
2157 &log,
2158 &binding,
2159 "linear",
2160 "issue",
2161 BTreeMap::from([("issue_key".to_string(), "HAR-462".to_string())]),
2162 PersonaRunCost {
2163 cost_usd: 0.02,
2164 tokens: 1,
2165 ..Default::default()
2166 },
2167 now,
2168 )
2169 .await
2170 .unwrap();
2171 assert_eq!(receipt.status, "budget_exhausted");
2172 let status = persona_status(&log, &binding, now).await.unwrap();
2173 assert_eq!(status.budget.reason.as_deref(), Some("daily_usd"));
2174 assert!(status.budget.exhausted);
2175 assert!(status.last_error.as_deref().unwrap().contains("daily_usd"));
2176 }
2177
2178 #[tokio::test]
2179 async fn deterministic_predicate_hit_emits_value_event_with_avoided_cost() {
2180 let log = log();
2181 let binding = binding();
2182 let captured = Arc::new(Mutex::new(Vec::new()));
2183 let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
2184 events: captured.clone(),
2185 }));
2186 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2187
2188 let receipt = fire_trigger(
2189 &log,
2190 &binding,
2191 "github",
2192 "pull_request",
2193 BTreeMap::from([
2194 ("repository".to_string(), "burin-labs/harn".to_string()),
2195 ("number".to_string(), "715".to_string()),
2196 ]),
2197 PersonaRunCost {
2198 avoided_cost_usd: 0.0042,
2199 deterministic_steps: 1,
2200 metadata: json!({
2201 "predicate": "pr_already_green",
2202 "would_have_called_model": "gpt-5.4-mini",
2203 }),
2204 ..Default::default()
2205 },
2206 now,
2207 )
2208 .await
2209 .unwrap();
2210
2211 let run_id = receipt.run_id.expect("completed run has run_id");
2212 let events = captured.lock().unwrap().clone();
2213 let deterministic = events
2214 .iter()
2215 .find(|event| {
2216 event.kind == PersonaValueEventKind::DeterministicExecution
2217 && event.run_id == Some(run_id)
2218 })
2219 .expect("deterministic execution value event");
2220 assert_eq!(deterministic.persona_id, "merge_captain");
2221 assert_eq!(
2222 deterministic.template_ref.as_deref(),
2223 Some("software_factory@v0")
2224 );
2225 assert_eq!(deterministic.run_id, Some(run_id));
2226 assert_eq!(deterministic.paid_cost_usd, 0.0);
2227 assert_eq!(deterministic.avoided_cost_usd, 0.0042);
2228 assert_eq!(deterministic.deterministic_steps, 1);
2229 assert_eq!(
2230 deterministic.metadata["predicate"].as_str(),
2231 Some("pr_already_green")
2232 );
2233
2234 let persisted = read_persona_events(&log, &binding.name).await.unwrap();
2235 assert!(persisted.iter().any(|(_, event)| {
2236 event.kind == "persona.value.deterministic_execution"
2237 && event.payload["avoided_cost_usd"] == json!(0.0042)
2238 }));
2239 }
2240
2241 #[tokio::test]
2242 async fn frontier_escalation_run_emits_value_event_with_paid_cost() {
2243 let log = log();
2244 let binding = binding();
2245 let captured = Arc::new(Mutex::new(Vec::new()));
2246 let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
2247 events: captured.clone(),
2248 }));
2249 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2250
2251 let receipt = fire_trigger(
2252 &log,
2253 &binding,
2254 "linear",
2255 "issue",
2256 BTreeMap::from([("issue_key".to_string(), "HAR-715".to_string())]),
2257 PersonaRunCost {
2258 cost_usd: 0.011,
2259 tokens: 20,
2260 llm_steps: 1,
2261 frontier_escalations: 1,
2262 metadata: json!({
2263 "frontier_model": "gpt-5.4",
2264 "escalation_reason": "high_risk_merge",
2265 }),
2266 ..Default::default()
2267 },
2268 now,
2269 )
2270 .await
2271 .unwrap();
2272
2273 let run_id = receipt.run_id.expect("completed run has run_id");
2274 let events = captured.lock().unwrap().clone();
2275 let escalation = events
2276 .iter()
2277 .find(|event| {
2278 event.kind == PersonaValueEventKind::FrontierEscalation
2279 && event.run_id == Some(run_id)
2280 })
2281 .expect("frontier escalation value event");
2282 assert_eq!(escalation.run_id, Some(run_id));
2283 assert_eq!(escalation.paid_cost_usd, 0.011);
2284 assert_eq!(escalation.avoided_cost_usd, 0.0);
2285 assert_eq!(escalation.llm_steps, 1);
2286 assert_eq!(
2287 escalation.metadata["frontier_model"].as_str(),
2288 Some("gpt-5.4")
2289 );
2290
2291 let completion = events
2292 .iter()
2293 .find(|event| {
2294 event.kind == PersonaValueEventKind::RunCompleted && event.run_id == Some(run_id)
2295 })
2296 .expect("run completed value event");
2297 assert_eq!(completion.paid_cost_usd, 0.0);
2298 }
2299
2300 struct CapturingSupervisionSink {
2301 events: Arc<Mutex<Vec<PersonaSupervisionEvent>>>,
2302 }
2303
2304 impl PersonaSupervisionSink for CapturingSupervisionSink {
2305 fn handle_supervision_event(&self, event: &PersonaSupervisionEvent) {
2306 self.events.lock().unwrap().push(event.clone());
2307 }
2308 }
2309
/// Builds trigger metadata for a pull request: a two-entry map holding the
/// given `repository` and `number` under the keys of the same names.
fn pr_metadata(repository: &str, number: &str) -> BTreeMap<String, String> {
    let mut metadata = BTreeMap::new();
    metadata.insert(String::from("repository"), repository.to_owned());
    metadata.insert(String::from("number"), number.to_owned());
    metadata
}
2316
2317 fn binding_named(name: &str) -> PersonaRuntimeBinding {
2318 PersonaRuntimeBinding {
2319 name: name.to_string(),
2320 ..binding()
2321 }
2322 }
2323
2324 fn supervision_events_for(
2325 captured: &Arc<Mutex<Vec<PersonaSupervisionEvent>>>,
2326 persona: &str,
2327 ) -> Vec<PersonaSupervisionEvent> {
2328 captured
2329 .lock()
2330 .unwrap()
2331 .iter()
2332 .filter(|event| match event {
2333 PersonaSupervisionEvent::QueuePosition(update) => update.persona_id == persona,
2334 PersonaSupervisionEvent::RepairWorkerStatus(update) => update.persona_id == persona,
2335 PersonaSupervisionEvent::Receipt(update) => update.persona_id == persona,
2336 PersonaSupervisionEvent::Checkpoint(update) => update.persona_id == persona,
2337 })
2338 .cloned()
2339 .collect()
2340 }
2341
2342 async fn drive_pause_then_resume(binding: &PersonaRuntimeBinding, now: i64) {
2343 let log = log();
2344 pause_persona(&log, binding, now).await.unwrap();
2345 let _ = fire_trigger(
2346 &log,
2347 binding,
2348 "github",
2349 "pull_request",
2350 pr_metadata("burin-labs/harn", "1480"),
2351 PersonaRunCost::default(),
2352 now,
2353 )
2354 .await
2355 .unwrap();
2356 let _ = resume_persona(&log, binding, now + 1).await.unwrap();
2357 let _ = restore_persona_checkpoint(
2358 &log,
2359 binding,
2360 PersonaCheckpointRestoreRequest {
2361 checkpoint_id: "cp_42".to_string(),
2362 work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2363 resumed_from: Some(PersonaCheckpointResume {
2364 note: Some("resumed from cp 42".to_string()),
2365 ..Default::default()
2366 }),
2367 },
2368 now + 2,
2369 )
2370 .await
2371 .unwrap();
2372 let _ = report_repair_worker_status(
2373 &log,
2374 binding,
2375 PersonaRepairWorkerStatusUpdate {
2376 persona_id: String::new(),
2377 template_ref: None,
2378 repair_worker_id: "rw_42".to_string(),
2379 lifecycle: PersonaRepairWorkerLifecycle::Running,
2380 work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2381 lease_id: Some("persona_lease_xyz".to_string()),
2382 scratchpad_url: Some("https://factory.local/rw_42".to_string()),
2383 last_heartbeat_ms: 0,
2384 occurred_at_ms: 0,
2385 },
2386 now + 3,
2387 )
2388 .await
2389 .unwrap();
2390 }
2391
2392 #[tokio::test]
2393 async fn supervision_sink_emits_queue_position_and_receipt() {
2394 let captured = Arc::new(Mutex::new(Vec::new()));
2395 let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2396 events: captured.clone(),
2397 }));
2398 let log = log();
2399 let binding = binding_named("supervision_sink_emits_queue_position_and_receipt");
2400 let now = parse_rfc3339_ms("2026-05-01T00:00:00Z").unwrap();
2401
2402 pause_persona(&log, &binding, now).await.unwrap();
2403 fire_trigger(
2404 &log,
2405 &binding,
2406 "github",
2407 "pull_request",
2408 pr_metadata("burin-labs/harn", "1480"),
2409 PersonaRunCost::default(),
2410 now + 100,
2411 )
2412 .await
2413 .unwrap();
2414 resume_persona(&log, &binding, now + 200).await.unwrap();
2415
2416 let events = supervision_events_for(&captured, &binding.name);
2417 let queue_events: Vec<_> = events
2418 .iter()
2419 .filter_map(|event| match event {
2420 PersonaSupervisionEvent::QueuePosition(update) => Some(update.clone()),
2421 _ => None,
2422 })
2423 .collect();
2424 assert_eq!(queue_events.len(), 2, "enqueue + drain emitted");
2425 assert_eq!(queue_events[0].position, 1);
2426 assert_eq!(queue_events[0].queue_depth, 1);
2427 assert_eq!(queue_events[1].position, 0);
2428 assert_eq!(queue_events[1].queue_depth, 0);
2429 let receipt_events: Vec<_> = events
2430 .iter()
2431 .filter_map(|event| match event {
2432 PersonaSupervisionEvent::Receipt(update) => Some(update.clone()),
2433 _ => None,
2434 })
2435 .collect();
2436 assert_eq!(receipt_events.len(), 2, "queued + drained receipt");
2437 assert_eq!(receipt_events[0].receipt.status, "queued");
2438 assert_eq!(receipt_events[1].receipt.status, "completed");
2439 for event in &receipt_events {
2440 assert_eq!(event.receipt.persona, binding.name);
2441 assert_eq!(event.persona_id, binding.name);
2442 assert_eq!(event.template_ref.as_deref(), Some("software_factory@v0"));
2443 }
2444 let persisted_kinds: Vec<_> = read_persona_events(&log, &binding.name)
2445 .await
2446 .unwrap()
2447 .into_iter()
2448 .map(|(_, event)| event.kind)
2449 .collect();
2450 assert!(
2451 persisted_kinds
2452 .iter()
2453 .any(|kind| kind == "persona.supervision.queue_position"),
2454 "queue_position supervision events should be durable: {persisted_kinds:?}"
2455 );
2456 assert!(
2457 persisted_kinds
2458 .iter()
2459 .any(|kind| kind == "persona.supervision.receipt"),
2460 "receipt supervision events should be durable: {persisted_kinds:?}"
2461 );
2462 }
2463
2464 #[tokio::test]
2465 async fn supervision_sink_emits_repair_worker_status_idempotently() {
2466 let captured = Arc::new(Mutex::new(Vec::new()));
2467 let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2468 events: captured.clone(),
2469 }));
2470 let log = log();
2471 let binding = binding_named("supervision_sink_emits_repair_worker_status_idempotently");
2472 let now = parse_rfc3339_ms("2026-05-01T01:00:00Z").unwrap();
2473 let update = PersonaRepairWorkerStatusUpdate {
2474 persona_id: String::new(),
2475 template_ref: None,
2476 repair_worker_id: "rw_test".to_string(),
2477 lifecycle: PersonaRepairWorkerLifecycle::Running,
2478 work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2479 lease_id: Some("persona_lease_abc".to_string()),
2480 scratchpad_url: Some("https://factory.local/rw_test".to_string()),
2481 last_heartbeat_ms: 0,
2482 occurred_at_ms: 0,
2483 };
2484 let first = report_repair_worker_status(&log, &binding, update.clone(), now)
2485 .await
2486 .unwrap();
2487 let second = report_repair_worker_status(&log, &binding, update.clone(), now + 5)
2488 .await
2489 .unwrap();
2490 assert!(first);
2491 assert!(!second, "second identical lifecycle is idempotent");
2492
2493 let mut next = update.clone();
2494 next.lifecycle = PersonaRepairWorkerLifecycle::Succeeded;
2495 let third = report_repair_worker_status(&log, &binding, next, now + 10)
2496 .await
2497 .unwrap();
2498 assert!(third);
2499
2500 let kinds: Vec<_> = supervision_events_for(&captured, &binding.name)
2501 .into_iter()
2502 .filter_map(|event| match event {
2503 PersonaSupervisionEvent::RepairWorkerStatus(update) => Some(update.lifecycle),
2504 _ => None,
2505 })
2506 .collect();
2507 assert_eq!(
2508 kinds,
2509 vec![
2510 PersonaRepairWorkerLifecycle::Running,
2511 PersonaRepairWorkerLifecycle::Succeeded
2512 ]
2513 );
2514 }
2515
2516 #[tokio::test]
2517 async fn supervision_sink_emits_checkpoint_restore_ack() {
2518 let captured = Arc::new(Mutex::new(Vec::new()));
2519 let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2520 events: captured.clone(),
2521 }));
2522 let log = log();
2523 let binding = binding_named("supervision_sink_emits_checkpoint_restore_ack");
2524 let now = parse_rfc3339_ms("2026-05-01T02:00:00Z").unwrap();
2525 fire_trigger(
2526 &log,
2527 &binding,
2528 "github",
2529 "pull_request",
2530 pr_metadata("burin-labs/harn", "1480"),
2531 PersonaRunCost::default(),
2532 now,
2533 )
2534 .await
2535 .unwrap();
2536
2537 let outcome = restore_persona_checkpoint(
2538 &log,
2539 &binding,
2540 PersonaCheckpointRestoreRequest {
2541 checkpoint_id: "cp_1".to_string(),
2542 work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2543 resumed_from: None,
2544 },
2545 now + 100,
2546 )
2547 .await
2548 .unwrap();
2549 assert!(outcome.acked);
2550 assert_eq!(outcome.update.checkpoint_id, "cp_1");
2551 let resume = outcome
2552 .update
2553 .resumed_from
2554 .as_ref()
2555 .expect("resume coordinates default-derived from status");
2556 assert_eq!(resume.last_run_ms, Some(now));
2557
2558 let replay = restore_persona_checkpoint(
2559 &log,
2560 &binding,
2561 PersonaCheckpointRestoreRequest {
2562 checkpoint_id: "cp_1".to_string(),
2563 work_key: None,
2564 resumed_from: None,
2565 },
2566 now + 200,
2567 )
2568 .await
2569 .unwrap();
2570 assert!(!replay.acked, "duplicate restore is a no-op ack");
2571 assert_eq!(replay.update.occurred_at_ms, now + 100);
2572
2573 let ack_events: Vec<_> = supervision_events_for(&captured, &binding.name)
2574 .into_iter()
2575 .filter_map(|event| match event {
2576 PersonaSupervisionEvent::Checkpoint(update) => Some(update),
2577 _ => None,
2578 })
2579 .collect();
2580 assert_eq!(ack_events.len(), 1, "ack emitted once, replay suppressed");
2581 assert_eq!(ack_events[0].action, PersonaCheckpointAction::RestoreAcked);
2582 }
2583
2584 #[tokio::test]
2585 async fn supervision_sink_replay_is_deterministic_under_recorded_clock() {
2586 use harn_clock::{ClockEventLog, PausedClock, RecordedClock};
2587 use time::OffsetDateTime;
2588
2589 let now_ms = parse_rfc3339_ms("2026-05-01T03:00:00Z").unwrap();
2590 async fn drive(now_ms: i64) -> (Vec<PersonaSupervisionEvent>, Vec<harn_clock::ClockEvent>) {
2591 let captured = Arc::new(Mutex::new(Vec::new()));
2592 let _registration =
2593 register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2594 events: captured.clone(),
2595 }));
2596 let paused = PausedClock::new(
2597 OffsetDateTime::from_unix_timestamp_nanos((now_ms as i128) * 1_000_000).unwrap(),
2598 );
2599 let recorded = Arc::new(RecordedClock::new(paused, Arc::new(ClockEventLog::new())));
2600 let binding = binding_named("supervision_replay_persona");
2601 let ts = harn_clock::now_wall_ms(&*recorded);
2602 drive_pause_then_resume(&binding, ts).await;
2603 let clock_log = recorded.log().snapshot();
2604 let events = supervision_events_for(&captured, &binding.name);
2605 (events, clock_log)
2606 }
2607
2608 let (events_a, clock_a) = drive(now_ms).await;
2609 let (events_b, clock_b) = drive(now_ms).await;
2610 fn normalize(event: &PersonaSupervisionEvent) -> PersonaSupervisionEvent {
2614 match event.clone() {
2615 PersonaSupervisionEvent::Receipt(mut update) => {
2616 update.receipt.run_id = None;
2617 if let Some(lease) = update.receipt.lease.as_mut() {
2618 lease.id = "lease".to_string();
2619 }
2620 update.receipt.budget_receipt_id = update
2621 .receipt
2622 .budget_receipt_id
2623 .map(|_| "budget".to_string());
2624 PersonaSupervisionEvent::Receipt(update)
2625 }
2626 PersonaSupervisionEvent::Checkpoint(mut update) => {
2627 if let Some(resume) = update.resumed_from.as_mut() {
2628 resume.run_id = None;
2629 resume.lease_id = None;
2630 }
2631 PersonaSupervisionEvent::Checkpoint(update)
2632 }
2633 other => other,
2634 }
2635 }
2636 let a: Vec<_> = events_a.iter().map(normalize).collect();
2637 let b: Vec<_> = events_b.iter().map(normalize).collect();
2638 assert_eq!(a, b, "supervision sink emits identical event envelopes");
2639 assert_eq!(
2640 clock_a, clock_b,
2641 "recorded clock observation log is identical across replays"
2642 );
2643 }
2644}