1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::{Arc, OnceLock, RwLock};
4
5use chrono::{TimeZone, Utc};
6use croner::Cron;
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use time::format_description::well_known::Rfc3339;
10use time::OffsetDateTime;
11use uuid::Uuid;
12
13use crate::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
14
/// Name of the event-log topic for persona runtime events.
// NOTE(review): presumably the topic the persona event read/append helpers use —
// confirm against their definitions elsewhere in this file.
pub const PERSONA_RUNTIME_TOPIC: &str = "persona.runtime.events";
16
/// Default lease duration in milliseconds (five minutes).
///
/// Passed to `acquire_lease` when the runtime starts work for a trigger envelope.
const DEFAULT_LEASE_TTL_MS: i64 = 5 * 60 * 1000;
18
19#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "snake_case")]
21pub enum PersonaLifecycleState {
22 Inactive,
23 Starting,
24 #[default]
25 Idle,
26 Running,
27 Paused,
28 Draining,
29 Failed,
30 Disabled,
31}
32
33impl PersonaLifecycleState {
34 pub fn as_str(self) -> &'static str {
35 match self {
36 Self::Inactive => "inactive",
37 Self::Starting => "starting",
38 Self::Idle => "idle",
39 Self::Running => "running",
40 Self::Paused => "paused",
41 Self::Draining => "draining",
42 Self::Failed => "failed",
43 Self::Disabled => "disabled",
44 }
45 }
46}
47
/// Optional budget limits configured for a persona.
///
/// Every field is optional and the struct's `Default` leaves all of them `None`.
// NOTE(review): `None` presumably means "no limit" for that dimension — confirm
// against the budget helpers that consume this policy.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaBudgetPolicy {
    /// Limit for the daily window, in USD (per the field name).
    pub daily_usd: Option<f64>,
    /// Limit for the hourly window, in USD (per the field name).
    pub hourly_usd: Option<f64>,
    /// Limit for a single run, in USD (per the field name).
    pub run_usd: Option<f64>,
    /// Token limit. NOTE(review): the window it applies to is not visible here.
    pub max_tokens: Option<u64>,
}
55
/// Configuration that binds a persona to the runtime.
///
/// Fields marked `#[serde(default)]` may be omitted from serialized input.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaRuntimeBinding {
    /// Persona name; the runtime functions in this file use it as the persona
    /// identifier when reading and appending events.
    pub name: String,
    /// Optional template reference, copied into statuses and supervision updates.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Entry workflow identifier, recorded in the `persona.run.started` payload.
    pub entry_workflow: String,
    /// Schedule expressions; `fire_schedule` uses the first entry, or `"manual"`
    /// when the list is empty.
    #[serde(default)]
    pub schedules: Vec<String>,
    /// Trigger declarations. NOTE(review): format is not visible in this chunk.
    #[serde(default)]
    pub triggers: Vec<String>,
    /// Budget limits handed to the budget helpers.
    #[serde(default)]
    pub budget: PersonaBudgetPolicy,
    /// Declared stages for this persona.
    #[serde(default)]
    pub stages: Vec<StageDecl>,
}
75
/// Declaration of a single persona stage.
///
/// All optional fields default to `None` on input and are omitted from serialized
/// output when `None`.
// NOTE(review): the semantics of each field are not exercised in this chunk; the
// per-field notes below follow the field names only.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct StageDecl {
    /// Stage name.
    pub name: String,
    /// Optional list of tool names associated with this stage.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub allowed_tools: Option<Vec<String>>,
    /// Optional side-effect level label.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub side_effect_level: Option<String>,
    /// Optional iteration cap.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_iterations: Option<u32>,
    /// Optional exit configuration.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub on_exit: Option<StageExit>,
}
104
/// Exit configuration for a stage.
///
/// Every field defaults to `None` on input and is skipped on output when `None`.
// NOTE(review): how these values are interpreted is not visible in this chunk.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct StageExit {
    /// Optional value for the completion path (per the field name).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub on_complete: Option<String>,
    /// Optional value for the failure path (per the field name).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub on_failure: Option<String>,
    /// Optional capability policy override.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub policy_override: Option<crate::orchestration::CapabilityPolicy>,
}
114
/// A lease over a piece of persona work.
///
/// `persona_status` deserializes this from the payload of `persona.lease.acquired`
/// events and treats the lease as gone once `expires_at_ms <= now_ms`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaLease {
    /// Lease identifier; recorded as `lease_id` in run events.
    pub id: String,
    /// Name of the lease holder.
    pub holder: String,
    /// Key of the work item the lease covers.
    pub work_key: String,
    /// Acquisition time in milliseconds.
    pub acquired_at_ms: i64,
    /// Expiry time in milliseconds, compared against the caller's `now_ms`.
    pub expires_at_ms: i64,
}
123
/// Budget limits and spend for a persona, as reported inside [`PersonaStatus`].
///
/// `persona_status` obtains this from the `budget_status` helper and then fills in
/// `exhausted`, `reason` and `last_receipt_id` from logged budget events when the
/// helper left them unset.
// NOTE(review): the first four fields mirror `PersonaBudgetPolicy`; how the spend
// and remaining values are computed lives in `budget_status`, outside this chunk.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaBudgetStatus {
    /// Daily limit in USD, if any.
    pub daily_usd: Option<f64>,
    /// Hourly limit in USD, if any.
    pub hourly_usd: Option<f64>,
    /// Per-run limit in USD, if any.
    pub run_usd: Option<f64>,
    /// Token limit, if any.
    pub max_tokens: Option<u64>,
    /// Spend attributed to the current day, in USD.
    pub spent_today_usd: f64,
    /// Spend attributed to the current hour, in USD.
    pub spent_this_hour_usd: f64,
    /// Spend attributed to the last run, in USD.
    pub spent_last_run_usd: f64,
    /// Tokens attributed to the current day.
    pub tokens_today: u64,
    /// Remaining daily budget in USD, if reported.
    pub remaining_today_usd: Option<f64>,
    /// Remaining hourly budget in USD, if reported.
    pub remaining_hour_usd: Option<f64>,
    /// Whether the budget is considered exhausted.
    pub exhausted: bool,
    /// Reason for exhaustion, if any.
    pub reason: Option<String>,
    /// Identifier of the most recent budget receipt, if any.
    pub last_receipt_id: Option<String>,
}
140
/// Snapshot of a persona's runtime state, as assembled by `persona_status`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaStatus {
    /// Persona name, copied from the binding.
    pub name: String,
    /// Template reference, copied from the binding.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Lifecycle state derived from the replayed events.
    pub state: PersonaLifecycleState,
    /// Entry workflow, copied from the binding.
    pub entry_workflow: String,
    /// Role label; `persona_status` currently fills it with the binding name.
    #[serde(default)]
    pub role: String,
    /// Assignment view derived from the active lease, when there is one.
    #[serde(default)]
    pub current_assignment: Option<PersonaAssignmentStatus>,
    /// Formatted timestamp of the most recent `persona.run.completed` event.
    pub last_run: Option<String>,
    /// Next scheduled run, as computed by the `next_scheduled_run` helper.
    pub next_scheduled_run: Option<String>,
    /// Lease currently held, if it has not expired relative to `now_ms`.
    pub active_lease: Option<PersonaLease>,
    /// Budget limits and spend.
    pub budget: PersonaBudgetStatus,
    /// Most recent error recorded by a failed run or a budget-exhaustion event.
    pub last_error: Option<String>,
    /// Number of queued work keys that have not been completed.
    pub queued_events: usize,
    /// Queued work items that have not been completed.
    #[serde(default)]
    pub queued_work: Vec<PersonaQueuedWork>,
    /// Handoff items derived from `queued_work`.
    #[serde(default)]
    pub handoff_inbox: Vec<PersonaHandoffInboxItem>,
    /// Receipts parsed from `persona.value.*` events.
    #[serde(default)]
    pub value_receipts: Vec<PersonaValueReceipt>,
    /// Number of `persona.trigger.dead_lettered` events seen.
    pub disabled_events: usize,
    /// Policy label for events received while paused; `persona_status` sets
    /// `"queue_then_drain_on_resume"`.
    pub paused_event_policy: String,
}
167
/// Assignment view of an active lease.
///
/// `persona_status` builds this from the active [`PersonaLease`] via
/// `assignment_status_from_lease`.
// NOTE(review): `acquired_at` / `expires_at` are presumably formatted timestamps of
// the lease's millisecond fields — confirm in `assignment_status_from_lease`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaAssignmentStatus {
    /// Key of the assigned work item.
    pub work_key: String,
    /// Identifier of the lease backing the assignment.
    pub lease_id: String,
    /// Holder of the lease.
    pub holder: String,
    /// When the lease was acquired, as a string.
    pub acquired_at: String,
    /// When the lease expires, as a string.
    pub expires_at: String,
}
176
/// A queued work item reported in [`PersonaStatus::queued_work`].
///
/// `persona_status` obtains these from `persona.trigger.queued` events through
/// `queued_work_from_event` and keys them by `work_key`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaQueuedWork {
    /// Key identifying the work item; used to drop items that have completed.
    pub work_key: String,
    /// Provider of the originating trigger.
    pub provider: String,
    /// Kind of the originating trigger.
    pub kind: String,
    /// When the item was queued, as a string.
    pub queued_at: String,
    /// Why the item was queued.
    pub reason: String,
    /// Identifier of the source event, if known.
    pub source_event_id: Option<String>,
    /// Additional string metadata; defaults to empty when absent from input.
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
}
188
/// A handoff entry reported in [`PersonaStatus::handoff_inbox`].
///
/// `persona_status` derives these from queued work items through
/// `handoff_inbox_item`; items for which that helper returns `None` are left out.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaHandoffInboxItem {
    /// Key of the underlying queued work item.
    pub work_key: String,
    /// Handoff identifier, if present.
    pub handoff_id: Option<String>,
    /// Handoff kind, if present.
    pub handoff_kind: Option<String>,
    /// Persona the handoff came from, if present.
    pub source_persona: Option<String>,
    /// Task description, if present.
    pub task: Option<String>,
    /// When the item was queued, as a string.
    pub queued_at: String,
    /// Why the item was queued.
    pub reason: String,
}
199
/// A value receipt reported in [`PersonaStatus::value_receipts`].
///
/// `persona_status` parses these from events whose kind starts with
/// `persona.value.` using `value_receipt_from_event`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaValueReceipt {
    /// Kind of value event this receipt records.
    pub kind: PersonaValueEventKind,
    /// Run the receipt belongs to, if any.
    pub run_id: Option<Uuid>,
    /// When the event occurred, as a string.
    pub occurred_at: String,
    /// Paid cost in USD.
    pub paid_cost_usd: f64,
    /// Avoided cost in USD.
    pub avoided_cost_usd: f64,
    /// Number of deterministic steps.
    pub deterministic_steps: i64,
    /// Number of LLM steps.
    pub llm_steps: i64,
    /// Free-form JSON metadata.
    pub metadata: serde_json::Value,
}
211
/// Normalized description of a trigger or schedule tick delivered to a persona.
///
/// The runtime uses `subject_key` as the work key when queueing, dead-lettering,
/// de-duplicating and leasing work for the envelope.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaTriggerEnvelope {
    /// Trigger provider; `fire_schedule` uses `"schedule"`.
    pub provider: String,
    /// Trigger kind; `fire_schedule` uses `"cron.tick"`.
    pub kind: String,
    /// Key identifying the subject of the trigger; doubles as the work key.
    pub subject_key: String,
    /// Identifier of the source event, if any.
    pub source_event_id: Option<String>,
    /// Time the trigger was received, in milliseconds.
    pub received_at_ms: i64,
    /// String metadata; defaults to empty when absent from input.
    #[serde(default)]
    pub metadata: BTreeMap<String, String>,
    /// Raw JSON payload; defaults to JSON `null` when absent from input.
    #[serde(default)]
    pub raw: serde_json::Value,
}
224
/// Outcome of asking the runtime to run work for a trigger envelope.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaRunReceipt {
    /// Outcome label. Values produced in this file include `"queued"`,
    /// `"dead_lettered"`, `"budget_exhausted"`, `"duplicate"` and `"lease_busy"`.
    pub status: String,
    /// Name of the persona the receipt is for.
    pub persona: String,
    /// Identifier of the run, when one was started.
    #[serde(default)]
    pub run_id: Option<Uuid>,
    /// Work key the receipt refers to (the envelope's `subject_key`).
    pub work_key: String,
    /// Lease associated with the outcome, if any.
    pub lease: Option<PersonaLease>,
    /// `true` when the work was queued instead of run.
    pub queued: bool,
    /// Error message for non-successful outcomes, if any.
    pub error: Option<String>,
    /// Identifier of the budget receipt recorded for the run, if any.
    pub budget_receipt_id: Option<String>,
}
237
/// Cost and step accounting supplied with a run or a spend record.
///
/// Fields marked `#[serde(default)]` may be omitted from serialized input.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaRunCost {
    /// Paid cost in USD.
    pub cost_usd: f64,
    /// Tokens consumed.
    pub tokens: u64,
    /// Avoided cost in USD; a positive value makes the runtime emit a
    /// `DeterministicExecution` value event.
    #[serde(default)]
    pub avoided_cost_usd: f64,
    /// Deterministic step count; a positive value also triggers the
    /// `DeterministicExecution` value event.
    #[serde(default)]
    pub deterministic_steps: i64,
    /// LLM step count.
    #[serde(default)]
    pub llm_steps: i64,
    /// Frontier escalation count; a positive value makes the runtime emit a
    /// `FrontierEscalation` value event.
    #[serde(default)]
    pub frontier_escalations: i64,
    /// Free-form JSON metadata; defaults to JSON `null`.
    #[serde(default)]
    pub metadata: serde_json::Value,
}
253
254#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
255#[serde(rename_all = "snake_case")]
256pub enum PersonaValueEventKind {
257 RunStarted,
258 RunCompleted,
259 AcceptedOutcome,
260 FrontierEscalation,
261 DeterministicExecution,
262 PromotionSavings,
263 ApprovalWait,
264}
265
266impl PersonaValueEventKind {
267 pub fn as_str(self) -> &'static str {
268 match self {
269 Self::RunStarted => "run_started",
270 Self::RunCompleted => "run_completed",
271 Self::AcceptedOutcome => "accepted_outcome",
272 Self::FrontierEscalation => "frontier_escalation",
273 Self::DeterministicExecution => "deterministic_execution",
274 Self::PromotionSavings => "promotion_savings",
275 Self::ApprovalWait => "approval_wait",
276 }
277 }
278}
279
/// A value event delivered to [`PersonaValueSink`] implementations.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaValueEvent {
    /// Identifier of the persona the event belongs to.
    pub persona_id: String,
    /// Template reference of the persona, if any.
    pub template_ref: Option<String>,
    /// Run the event belongs to, if any.
    pub run_id: Option<Uuid>,
    /// Kind of value event.
    pub kind: PersonaValueEventKind,
    /// Paid cost in USD.
    pub paid_cost_usd: f64,
    /// Avoided cost in USD.
    pub avoided_cost_usd: f64,
    /// Number of deterministic steps.
    pub deterministic_steps: i64,
    /// Number of LLM steps.
    pub llm_steps: i64,
    /// Free-form JSON metadata.
    pub metadata: serde_json::Value,
    /// When the event occurred.
    pub occurred_at: OffsetDateTime,
}
293
/// Receiver for persona value events.
///
/// Implementations must be `Send + Sync`; they are registered as
/// `Arc<dyn PersonaValueSink>` through `register_persona_value_sink`.
pub trait PersonaValueSink: Send + Sync {
    /// Called with each value event delivered to this sink.
    fn handle_value_event(&self, event: &PersonaValueEvent);
}
297
298#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
305#[serde(tag = "update_kind", rename_all = "snake_case")]
306pub enum PersonaSupervisionEvent {
307 QueuePosition(PersonaQueuePositionUpdate),
308 RepairWorkerStatus(PersonaRepairWorkerStatusUpdate),
309 Receipt(PersonaReceiptUpdate),
310 Checkpoint(PersonaCheckpointUpdate),
311}
312
313impl PersonaSupervisionEvent {
314 pub fn update_kind(&self) -> &'static str {
315 match self {
316 Self::QueuePosition(_) => "queue_position",
317 Self::RepairWorkerStatus(_) => "repair_worker_status",
318 Self::Receipt(_) => "receipt",
319 Self::Checkpoint(_) => "checkpoint",
320 }
321 }
322
323 pub fn persona_id(&self) -> &str {
324 match self {
325 Self::QueuePosition(update) => &update.persona_id,
326 Self::RepairWorkerStatus(update) => &update.persona_id,
327 Self::Receipt(update) => &update.persona_id,
328 Self::Checkpoint(update) => &update.persona_id,
329 }
330 }
331
332 pub fn template_ref(&self) -> Option<&str> {
333 match self {
334 Self::QueuePosition(update) => update.template_ref.as_deref(),
335 Self::RepairWorkerStatus(update) => update.template_ref.as_deref(),
336 Self::Receipt(update) => update.template_ref.as_deref(),
337 Self::Checkpoint(update) => update.template_ref.as_deref(),
338 }
339 }
340
341 pub fn occurred_at_ms(&self) -> i64 {
342 match self {
343 Self::QueuePosition(update) => update.occurred_at_ms,
344 Self::RepairWorkerStatus(update) => update.occurred_at_ms,
345 Self::Receipt(update) => update.occurred_at_ms,
346 Self::Checkpoint(update) => update.occurred_at_ms,
347 }
348 }
349}
350
/// Supervision update describing a work item's place in a persona's queue.
// NOTE(review): whether `position` is zero- or one-based is not visible in this
// chunk — confirm where these updates are emitted.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaQueuePositionUpdate {
    /// Identifier of the persona.
    pub persona_id: String,
    /// Template reference of the persona, if any.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Key of the queued work item.
    pub work_key: String,
    /// Depth of the queue.
    pub queue_depth: i64,
    /// Position of the work item in the queue.
    pub position: i64,
    /// When the item was queued, in milliseconds.
    pub queued_at_ms: i64,
    /// When this update occurred, in milliseconds.
    pub occurred_at_ms: i64,
}
363
364#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
365#[serde(rename_all = "snake_case")]
366pub enum PersonaRepairWorkerLifecycle {
367 Pending,
368 Running,
369 Verifying,
370 Pushing,
371 Succeeded,
372 Failed,
373 Cancelled,
374}
375
376impl PersonaRepairWorkerLifecycle {
377 pub fn as_str(self) -> &'static str {
378 match self {
379 Self::Pending => "pending",
380 Self::Running => "running",
381 Self::Verifying => "verifying",
382 Self::Pushing => "pushing",
383 Self::Succeeded => "succeeded",
384 Self::Failed => "failed",
385 Self::Cancelled => "cancelled",
386 }
387 }
388}
389
/// Status report for a repair worker attached to a persona.
///
/// `report_repair_worker_status` fills in `persona_id`, `template_ref`,
/// `occurred_at_ms` and `last_heartbeat_ms` from the binding and `now_ms` when they
/// are empty, `None` or `0` respectively.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaRepairWorkerStatusUpdate {
    /// Identifier of the persona; an empty string is replaced with the binding name.
    pub persona_id: String,
    /// Template reference; `None` is replaced with the binding's value.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Identifier of the repair worker.
    pub repair_worker_id: String,
    /// Lifecycle phase being reported.
    pub lifecycle: PersonaRepairWorkerLifecycle,
    /// Work key the worker is handling, if any.
    #[serde(default)]
    pub work_key: Option<String>,
    /// Lease the worker is operating under, if any.
    #[serde(default)]
    pub lease_id: Option<String>,
    /// URL of the worker's scratchpad, if any.
    #[serde(default)]
    pub scratchpad_url: Option<String>,
    /// Last heartbeat time in milliseconds; `0` is replaced with `now_ms`.
    pub last_heartbeat_ms: i64,
    /// When this update occurred, in milliseconds; `0` is replaced with `now_ms`.
    pub occurred_at_ms: i64,
}
406
/// Supervision update that carries a [`PersonaRunReceipt`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaReceiptUpdate {
    /// Identifier of the persona.
    pub persona_id: String,
    /// Template reference of the persona, if any.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// The run receipt being reported.
    pub receipt: PersonaRunReceipt,
    /// When this update occurred, in milliseconds.
    pub occurred_at_ms: i64,
}
415
/// Supervision update describing a checkpoint action.
///
/// `restore_persona_checkpoint` serializes this as the payload of
/// `persona.checkpoint.restore_acked` events, and `find_checkpoint_restore_ack`
/// deserializes it back from them.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointUpdate {
    /// Identifier of the persona.
    pub persona_id: String,
    /// Template reference of the persona, if any.
    #[serde(default)]
    pub template_ref: Option<String>,
    /// Checkpoint action that was taken.
    pub action: PersonaCheckpointAction,
    /// Identifier of the checkpoint; used to find an earlier acknowledgement.
    pub checkpoint_id: String,
    /// Work key associated with the checkpoint, if any.
    #[serde(default)]
    pub work_key: Option<String>,
    /// Coordinates the persona resumes from, if recorded.
    #[serde(default)]
    pub resumed_from: Option<PersonaCheckpointResume>,
    /// When this update occurred, in milliseconds.
    pub occurred_at_ms: i64,
}
430
431#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
432#[serde(rename_all = "snake_case")]
433pub enum PersonaCheckpointAction {
434 RestoreAcked,
435}
436
437impl PersonaCheckpointAction {
438 pub fn as_str(self) -> &'static str {
439 match self {
440 Self::RestoreAcked => "restore_acked",
441 }
442 }
443}
444
/// Coordinates describing where a persona resumes after a checkpoint restore.
///
/// When a restore request does not supply one, `restore_persona_checkpoint` builds
/// it from the current status: the active lease id, the last run time and the work
/// keys of the queued work, with `run_id` and `note` left as `None`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointResume {
    /// Run to resume, if any.
    pub run_id: Option<Uuid>,
    /// Lease to resume under, if any.
    pub lease_id: Option<String>,
    /// Time of the last run in milliseconds, if known.
    pub last_run_ms: Option<i64>,
    /// Work keys that were queued at the time of the restore.
    pub queued_work_keys: Vec<String>,
    /// Optional free-form note.
    #[serde(default)]
    pub note: Option<String>,
}
454
/// Receiver for persona supervision events.
///
/// Implementations must be `Send + Sync`; they are registered as
/// `Arc<dyn PersonaSupervisionSink>` through `register_persona_supervision_sink`.
pub trait PersonaSupervisionSink: Send + Sync {
    /// Called with each supervision event delivered to this sink.
    fn handle_supervision_event(&self, event: &PersonaSupervisionEvent);
}
458
/// Registry of shared sinks of type `T`, each tagged with a unique numeric id.
///
/// A poisoned lock is recovered rather than ignored: the guarded value is a plain
/// `Vec` that `push` and `retain` cannot leave half-updated, so continuing to use it
/// is safe. Previously a poisoned lock made `register` drop the sink while still
/// returning an id, and made `snapshot` report no sinks at all.
struct TypedSinkRegistry<T: ?Sized + Send + Sync> {
    /// Registered sinks paired with the id returned by `register`.
    sinks: RwLock<Vec<(u64, Arc<T>)>>,
    /// Next id to hand out; the first registration receives `1`.
    next_id: AtomicU64,
}

impl<T: ?Sized + Send + Sync> TypedSinkRegistry<T> {
    /// Creates an empty registry. `const` so it can initialize statics.
    const fn new() -> Self {
        Self {
            sinks: RwLock::new(Vec::new()),
            next_id: AtomicU64::new(1),
        }
    }

    /// Adds `sink` to the registry and returns the id to pass to `unregister`.
    fn register(&self, sink: Arc<T>) -> u64 {
        // Ids only need to be unique, not ordered against other memory accesses,
        // so a relaxed increment is sufficient.
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let mut sinks = self
            .sinks
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        sinks.push((id, sink));
        id
    }

    /// Removes the sink registered under `id`; unknown ids are ignored.
    fn unregister(&self, id: u64) {
        let mut sinks = self
            .sinks
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        sinks.retain(|(existing, _)| *existing != id);
    }

    /// Returns clones of the registered sinks, in registration order.
    ///
    /// The lock is released before the vector is returned, so callers can invoke
    /// the sinks without holding it.
    fn snapshot(&self) -> Vec<Arc<T>> {
        let sinks = self
            .sinks
            .read()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        sinks.iter().map(|(_, sink)| Arc::clone(sink)).collect()
    }
}
493
494fn persona_value_sinks() -> &'static TypedSinkRegistry<dyn PersonaValueSink> {
495 static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaValueSink>> = OnceLock::new();
496 REGISTRY.get_or_init(TypedSinkRegistry::new)
497}
498
499fn persona_supervision_sinks() -> &'static TypedSinkRegistry<dyn PersonaSupervisionSink> {
500 static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaSupervisionSink>> = OnceLock::new();
501 REGISTRY.get_or_init(TypedSinkRegistry::new)
502}
503
504#[must_use = "dropping the registration immediately unregisters the sink"]
505pub struct PersonaValueSinkRegistration {
506 id: u64,
507}
508
509impl Drop for PersonaValueSinkRegistration {
510 fn drop(&mut self) {
511 persona_value_sinks().unregister(self.id);
512 }
513}
514
515pub fn register_persona_value_sink(
516 sink: Arc<dyn PersonaValueSink>,
517) -> PersonaValueSinkRegistration {
518 PersonaValueSinkRegistration {
519 id: persona_value_sinks().register(sink),
520 }
521}
522
523#[must_use = "dropping the registration immediately unregisters the sink"]
524pub struct PersonaSupervisionSinkRegistration {
525 id: u64,
526}
527
528impl Drop for PersonaSupervisionSinkRegistration {
529 fn drop(&mut self) {
530 persona_supervision_sinks().unregister(self.id);
531 }
532}
533
534pub fn register_persona_supervision_sink(
535 sink: Arc<dyn PersonaSupervisionSink>,
536) -> PersonaSupervisionSinkRegistration {
537 PersonaSupervisionSinkRegistration {
538 id: persona_supervision_sinks().register(sink),
539 }
540}
541
542pub async fn persona_status(
543 log: &Arc<AnyEventLog>,
544 binding: &PersonaRuntimeBinding,
545 now_ms: i64,
546) -> Result<PersonaStatus, String> {
547 let events = read_persona_events(log, &binding.name).await?;
548 let mut state = PersonaLifecycleState::Idle;
549 let mut last_run_ms = None;
550 let mut active_lease = None;
551 let mut last_error = None;
552 let mut queued = BTreeSet::<String>::new();
553 let mut completed = BTreeSet::<String>::new();
554 let mut disabled_events = 0usize;
555 let mut budget_receipt = None;
556 let mut budget_exhaustion_reason = None;
557 let mut spent = Vec::<(i64, f64, u64)>::new();
558 let mut queued_work = BTreeMap::<String, PersonaQueuedWork>::new();
559 let mut value_receipts = Vec::<PersonaValueReceipt>::new();
560
561 for (_, event) in events {
562 match event.kind.as_str() {
563 "persona.control.paused" => state = PersonaLifecycleState::Paused,
564 "persona.control.resumed" => state = PersonaLifecycleState::Idle,
565 "persona.control.disabled" => state = PersonaLifecycleState::Disabled,
566 "persona.control.draining" => state = PersonaLifecycleState::Draining,
567 "persona.lease.acquired" => {
568 if let Ok(lease) = serde_json::from_value::<PersonaLease>(event.payload.clone()) {
569 active_lease = Some(lease);
570 state = PersonaLifecycleState::Running;
571 }
572 }
573 "persona.lease.released" => {
574 active_lease = None;
575 if !matches!(
576 state,
577 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
578 ) {
579 state = PersonaLifecycleState::Idle;
580 }
581 }
582 "persona.lease.expired" => {
583 active_lease = None;
584 if !matches!(
585 state,
586 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
587 ) {
588 state = PersonaLifecycleState::Idle;
589 }
590 }
591 "persona.run.started" => state = PersonaLifecycleState::Running,
592 "persona.run.completed" => {
593 last_run_ms = event
594 .payload
595 .get("completed_at_ms")
596 .and_then(serde_json::Value::as_i64)
597 .or(Some(event.occurred_at_ms));
598 if let Some(work_key) = event
599 .payload
600 .get("work_key")
601 .and_then(serde_json::Value::as_str)
602 {
603 completed.insert(work_key.to_string());
604 }
605 if !matches!(
606 state,
607 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
608 ) {
609 state = PersonaLifecycleState::Idle;
610 }
611 }
612 "persona.run.failed" => {
613 state = PersonaLifecycleState::Failed;
614 last_error = event
615 .payload
616 .get("error")
617 .and_then(serde_json::Value::as_str)
618 .map(ToString::to_string);
619 }
620 "persona.trigger.queued" => {
621 if let Some(work_key) = event
622 .payload
623 .get("work_key")
624 .and_then(serde_json::Value::as_str)
625 {
626 queued.insert(work_key.to_string());
627 }
628 if let Some(item) = queued_work_from_event(&event)? {
629 queued_work.insert(item.work_key.clone(), item);
630 }
631 }
632 "persona.trigger.dead_lettered" => disabled_events += 1,
633 "persona.budget.recorded" => {
634 budget_receipt = event
635 .payload
636 .get("receipt_id")
637 .and_then(serde_json::Value::as_str)
638 .map(ToString::to_string);
639 spent.push((
640 event.occurred_at_ms,
641 event
642 .payload
643 .get("cost_usd")
644 .and_then(serde_json::Value::as_f64)
645 .unwrap_or_default(),
646 event
647 .payload
648 .get("tokens")
649 .and_then(serde_json::Value::as_u64)
650 .unwrap_or_default(),
651 ));
652 }
653 "persona.budget.exhausted" => {
654 budget_exhaustion_reason = event
655 .payload
656 .get("reason")
657 .and_then(serde_json::Value::as_str)
658 .map(ToString::to_string);
659 last_error = budget_exhaustion_reason
660 .as_ref()
661 .map(|reason| format!("persona budget exhausted: {reason}"));
662 budget_receipt = event
663 .payload
664 .get("receipt_id")
665 .and_then(serde_json::Value::as_str)
666 .map(ToString::to_string);
667 }
668 kind if kind.starts_with("persona.value.") => {
669 if let Some(receipt) = value_receipt_from_event(&event)? {
670 value_receipts.push(receipt);
671 }
672 }
673 _ => {}
674 }
675 }
676
677 if let Some(lease) = active_lease.as_ref() {
678 if lease.expires_at_ms <= now_ms {
679 active_lease = None;
680 if !matches!(
681 state,
682 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
683 ) {
684 state = PersonaLifecycleState::Idle;
685 }
686 }
687 }
688
689 queued.retain(|work_key| !completed.contains(work_key));
690 queued_work.retain(|work_key, _| !completed.contains(work_key));
691 let queued_work = queued_work.into_values().collect::<Vec<_>>();
692 let handoff_inbox = queued_work
693 .iter()
694 .filter_map(handoff_inbox_item)
695 .collect::<Vec<_>>();
696
697 let mut budget = budget_status(&binding.budget, &spent, now_ms);
698 if budget.reason.is_none() {
699 if let Some(reason) = budget_exhaustion_reason {
700 budget.exhausted = true;
701 budget.reason = Some(reason);
702 }
703 }
704 if budget.last_receipt_id.is_none() {
705 budget.last_receipt_id = budget_receipt;
706 }
707
708 let current_assignment = active_lease.as_ref().map(assignment_status_from_lease);
709
710 Ok(PersonaStatus {
711 name: binding.name.clone(),
712 template_ref: binding.template_ref.clone(),
713 state,
714 entry_workflow: binding.entry_workflow.clone(),
715 role: binding.name.clone(),
716 current_assignment,
717 last_run: last_run_ms.map(format_ms),
718 next_scheduled_run: next_scheduled_run(binding, last_run_ms, now_ms),
719 active_lease,
720 budget,
721 last_error,
722 queued_events: queued.len(),
723 queued_work,
724 handoff_inbox,
725 value_receipts,
726 disabled_events,
727 paused_event_policy: "queue_then_drain_on_resume".to_string(),
728 })
729}
730
731pub async fn pause_persona(
732 log: &Arc<AnyEventLog>,
733 binding: &PersonaRuntimeBinding,
734 now_ms: i64,
735) -> Result<PersonaStatus, String> {
736 append_persona_event(
737 log,
738 &binding.name,
739 "persona.control.paused",
740 json!({"paused_at_ms": now_ms, "policy": "queue_then_drain_on_resume"}),
741 now_ms,
742 )
743 .await?;
744 persona_status(log, binding, now_ms).await
745}
746
747pub async fn resume_persona(
748 log: &Arc<AnyEventLog>,
749 binding: &PersonaRuntimeBinding,
750 now_ms: i64,
751) -> Result<PersonaStatus, String> {
752 append_persona_event(
753 log,
754 &binding.name,
755 "persona.control.resumed",
756 json!({"resumed_at_ms": now_ms, "drain": true}),
757 now_ms,
758 )
759 .await?;
760 let queued = queued_events(log, &binding.name).await?;
761 for (envelope, cost) in queued {
762 let _ = run_for_envelope(log, binding, envelope, cost, now_ms).await?;
763 }
764 persona_status(log, binding, now_ms).await
765}
766
767pub async fn disable_persona(
768 log: &Arc<AnyEventLog>,
769 binding: &PersonaRuntimeBinding,
770 now_ms: i64,
771) -> Result<PersonaStatus, String> {
772 append_persona_event(
773 log,
774 &binding.name,
775 "persona.control.disabled",
776 json!({"disabled_at_ms": now_ms}),
777 now_ms,
778 )
779 .await?;
780 persona_status(log, binding, now_ms).await
781}
782
783pub async fn fire_schedule(
784 log: &Arc<AnyEventLog>,
785 binding: &PersonaRuntimeBinding,
786 cost: PersonaRunCost,
787 now_ms: i64,
788) -> Result<PersonaRunReceipt, String> {
789 let schedule = binding
790 .schedules
791 .first()
792 .cloned()
793 .unwrap_or_else(|| "manual".to_string());
794 let envelope = PersonaTriggerEnvelope {
795 provider: "schedule".to_string(),
796 kind: "cron.tick".to_string(),
797 subject_key: format!("schedule:{}:{schedule}:{}", binding.name, format_ms(now_ms)),
798 source_event_id: None,
799 received_at_ms: now_ms,
800 metadata: BTreeMap::from([
801 ("persona".to_string(), binding.name.clone()),
802 ("schedule".to_string(), schedule),
803 ("fired_at".to_string(), format_ms(now_ms)),
804 ]),
805 raw: json!({}),
806 };
807 append_persona_event(
808 log,
809 &binding.name,
810 "persona.schedule.fired",
811 json!({"persona": binding.name, "envelope": envelope}),
812 now_ms,
813 )
814 .await?;
815 run_for_envelope(log, binding, envelope, cost, now_ms).await
816}
817
818pub async fn fire_trigger(
819 log: &Arc<AnyEventLog>,
820 binding: &PersonaRuntimeBinding,
821 provider: &str,
822 kind: &str,
823 metadata: BTreeMap<String, String>,
824 cost: PersonaRunCost,
825 now_ms: i64,
826) -> Result<PersonaRunReceipt, String> {
827 let envelope = normalize_trigger_envelope(provider, kind, metadata, now_ms);
828 append_persona_event(
829 log,
830 &binding.name,
831 "persona.trigger.received",
832 json!({"persona": binding.name, "envelope": envelope}),
833 now_ms,
834 )
835 .await?;
836 run_for_envelope(log, binding, envelope, cost, now_ms).await
837}
838
839pub async fn record_persona_spend(
840 log: &Arc<AnyEventLog>,
841 binding: &PersonaRuntimeBinding,
842 cost: PersonaRunCost,
843 now_ms: i64,
844) -> Result<PersonaBudgetStatus, String> {
845 enforce_budget(log, binding, &cost, now_ms).await?;
846 append_budget_record(log, &binding.name, &cost, None, now_ms).await?;
847 persona_status(log, binding, now_ms)
848 .await
849 .map(|status| status.budget)
850}
851
852pub async fn report_repair_worker_status(
860 log: &Arc<AnyEventLog>,
861 binding: &PersonaRuntimeBinding,
862 status: PersonaRepairWorkerStatusUpdate,
863 now_ms: i64,
864) -> Result<bool, String> {
865 let mut status = status;
866 if status.persona_id.is_empty() {
867 status.persona_id = binding.name.clone();
868 }
869 if status.template_ref.is_none() {
870 status.template_ref = binding.template_ref.clone();
871 }
872 if status.occurred_at_ms == 0 {
873 status.occurred_at_ms = now_ms;
874 }
875 if status.last_heartbeat_ms == 0 {
876 status.last_heartbeat_ms = now_ms;
877 }
878
879 if repair_worker_status_recorded(log, &binding.name, &status).await? {
880 return Ok(false);
881 }
882 append_persona_event(
883 log,
884 &binding.name,
885 "persona.repair_worker.status",
886 serde_json::to_value(&status).map_err(|error| error.to_string())?,
887 status.occurred_at_ms,
888 )
889 .await?;
890 record_persona_supervision_event(
891 log,
892 &binding.name,
893 PersonaSupervisionEvent::RepairWorkerStatus(status),
894 )
895 .await?;
896 Ok(true)
897}
898
899pub async fn restore_persona_checkpoint(
906 log: &Arc<AnyEventLog>,
907 binding: &PersonaRuntimeBinding,
908 request: PersonaCheckpointRestoreRequest,
909 now_ms: i64,
910) -> Result<PersonaCheckpointRestoreOutcome, String> {
911 let PersonaCheckpointRestoreRequest {
912 checkpoint_id,
913 work_key,
914 resumed_from,
915 } = request;
916 let status = persona_status(log, binding, now_ms).await?;
917
918 if let Some(prior) = find_checkpoint_restore_ack(log, &binding.name, &checkpoint_id).await? {
919 return Ok(PersonaCheckpointRestoreOutcome {
920 acked: false,
921 update: prior,
922 });
923 }
924
925 let resume_coordinates = resumed_from.unwrap_or_else(|| PersonaCheckpointResume {
926 run_id: None,
927 lease_id: status.active_lease.as_ref().map(|lease| lease.id.clone()),
928 last_run_ms: status
929 .last_run
930 .as_deref()
931 .and_then(|value| parse_rfc3339_ms(value).ok()),
932 queued_work_keys: status
933 .queued_work
934 .iter()
935 .map(|item| item.work_key.clone())
936 .collect(),
937 note: None,
938 });
939
940 let update = PersonaCheckpointUpdate {
941 persona_id: binding.name.clone(),
942 template_ref: binding.template_ref.clone(),
943 action: PersonaCheckpointAction::RestoreAcked,
944 checkpoint_id: checkpoint_id.clone(),
945 work_key,
946 resumed_from: Some(resume_coordinates),
947 occurred_at_ms: now_ms,
948 };
949
950 append_persona_event(
951 log,
952 &binding.name,
953 "persona.checkpoint.restore_acked",
954 serde_json::to_value(&update).map_err(|error| error.to_string())?,
955 now_ms,
956 )
957 .await?;
958 record_persona_supervision_event(
959 log,
960 &binding.name,
961 PersonaSupervisionEvent::Checkpoint(update.clone()),
962 )
963 .await?;
964 Ok(PersonaCheckpointRestoreOutcome {
965 acked: true,
966 update,
967 })
968}
969
/// Request to acknowledge a checkpoint restore, consumed by
/// `restore_persona_checkpoint`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointRestoreRequest {
    /// Identifier of the checkpoint; also used to detect an earlier acknowledgement.
    pub checkpoint_id: String,
    /// Work key to record on the acknowledgement, if any.
    #[serde(default)]
    pub work_key: Option<String>,
    /// Explicit resume coordinates; when `None`, they are derived from the
    /// persona's current status.
    #[serde(default)]
    pub resumed_from: Option<PersonaCheckpointResume>,
}
978
/// Result of `restore_persona_checkpoint`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct PersonaCheckpointRestoreOutcome {
    /// `true` when this call recorded a new acknowledgement; `false` when an
    /// earlier acknowledgement for the same checkpoint was found and returned.
    pub acked: bool,
    /// The acknowledgement update: newly written, or the earlier one from the log.
    pub update: PersonaCheckpointUpdate,
}
984
985async fn repair_worker_status_recorded(
986 log: &Arc<AnyEventLog>,
987 persona: &str,
988 update: &PersonaRepairWorkerStatusUpdate,
989) -> Result<bool, String> {
990 let events = read_persona_events(log, persona).await?;
991 Ok(events.into_iter().any(|(_, event)| {
992 event.kind == "persona.repair_worker.status"
993 && event
994 .payload
995 .get("repair_worker_id")
996 .and_then(serde_json::Value::as_str)
997 == Some(update.repair_worker_id.as_str())
998 && event
999 .payload
1000 .get("lifecycle")
1001 .and_then(serde_json::Value::as_str)
1002 == Some(update.lifecycle.as_str())
1003 }))
1004}
1005
1006async fn find_checkpoint_restore_ack(
1007 log: &Arc<AnyEventLog>,
1008 persona: &str,
1009 checkpoint_id: &str,
1010) -> Result<Option<PersonaCheckpointUpdate>, String> {
1011 let events = read_persona_events(log, persona).await?;
1012 for (_, event) in events.into_iter().rev() {
1013 if event.kind != "persona.checkpoint.restore_acked" {
1014 continue;
1015 }
1016 if event
1017 .payload
1018 .get("checkpoint_id")
1019 .and_then(serde_json::Value::as_str)
1020 != Some(checkpoint_id)
1021 {
1022 continue;
1023 }
1024 let update: PersonaCheckpointUpdate =
1025 serde_json::from_value(event.payload).map_err(|error| error.to_string())?;
1026 return Ok(Some(update));
1027 }
1028 Ok(None)
1029}
1030
1031async fn run_for_envelope(
1032 log: &Arc<AnyEventLog>,
1033 binding: &PersonaRuntimeBinding,
1034 envelope: PersonaTriggerEnvelope,
1035 cost: PersonaRunCost,
1036 now_ms: i64,
1037) -> Result<PersonaRunReceipt, String> {
1038 let pre_queue = queue_snapshot(log, binding, now_ms).await?;
1039 let receipt = run_for_envelope_inner(log, binding, envelope, cost, now_ms).await?;
1040 let post_queue = queue_snapshot(log, binding, now_ms).await?;
1041 emit_queue_position_supervision(log, binding, &pre_queue, &post_queue, now_ms).await?;
1042 emit_receipt_supervision(log, binding, &receipt, now_ms).await?;
1043 Ok(receipt)
1044}
1045
1046async fn run_for_envelope_inner(
1047 log: &Arc<AnyEventLog>,
1048 binding: &PersonaRuntimeBinding,
1049 envelope: PersonaTriggerEnvelope,
1050 cost: PersonaRunCost,
1051 now_ms: i64,
1052) -> Result<PersonaRunReceipt, String> {
1053 let status = persona_status(log, binding, now_ms).await?;
1054 match status.state {
1055 PersonaLifecycleState::Paused => {
1056 append_persona_event(
1057 log,
1058 &binding.name,
1059 "persona.trigger.queued",
1060 json!({
1061 "work_key": envelope.subject_key,
1062 "envelope": envelope,
1063 "cost": cost,
1064 "reason": "paused",
1065 }),
1066 now_ms,
1067 )
1068 .await?;
1069 return Ok(PersonaRunReceipt {
1070 status: "queued".to_string(),
1071 persona: binding.name.clone(),
1072 run_id: None,
1073 work_key: envelope.subject_key,
1074 lease: None,
1075 queued: true,
1076 error: None,
1077 budget_receipt_id: None,
1078 });
1079 }
1080 PersonaLifecycleState::Disabled => {
1081 append_persona_event(
1082 log,
1083 &binding.name,
1084 "persona.trigger.dead_lettered",
1085 json!({
1086 "work_key": envelope.subject_key,
1087 "envelope": envelope,
1088 "reason": "disabled",
1089 }),
1090 now_ms,
1091 )
1092 .await?;
1093 return Ok(PersonaRunReceipt {
1094 status: "dead_lettered".to_string(),
1095 persona: binding.name.clone(),
1096 run_id: None,
1097 work_key: envelope.subject_key,
1098 lease: None,
1099 queued: false,
1100 error: Some("persona is disabled".to_string()),
1101 budget_receipt_id: None,
1102 });
1103 }
1104 _ => {}
1105 }
1106
1107 if let Err(error) = enforce_budget(log, binding, &cost, now_ms).await {
1108 return Ok(PersonaRunReceipt {
1109 status: "budget_exhausted".to_string(),
1110 persona: binding.name.clone(),
1111 run_id: None,
1112 work_key: envelope.subject_key,
1113 lease: None,
1114 queued: false,
1115 error: Some(error),
1116 budget_receipt_id: None,
1117 });
1118 }
1119
1120 if work_completed(log, &binding.name, &envelope.subject_key).await? {
1121 append_persona_event(
1122 log,
1123 &binding.name,
1124 "persona.trigger.duplicate",
1125 json!({
1126 "work_key": envelope.subject_key,
1127 "envelope": envelope,
1128 "reason": "already_completed",
1129 }),
1130 now_ms,
1131 )
1132 .await?;
1133 return Ok(PersonaRunReceipt {
1134 status: "duplicate".to_string(),
1135 persona: binding.name.clone(),
1136 run_id: None,
1137 work_key: envelope.subject_key,
1138 lease: None,
1139 queued: false,
1140 error: None,
1141 budget_receipt_id: None,
1142 });
1143 }
1144
1145 let Some(lease) = acquire_lease(
1146 log,
1147 binding,
1148 &envelope.subject_key,
1149 "persona-runtime",
1150 DEFAULT_LEASE_TTL_MS,
1151 now_ms,
1152 )
1153 .await?
1154 else {
1155 return Ok(PersonaRunReceipt {
1156 status: "lease_busy".to_string(),
1157 persona: binding.name.clone(),
1158 run_id: None,
1159 work_key: envelope.subject_key,
1160 lease: status.active_lease,
1161 queued: false,
1162 error: Some("active lease already owns persona work".to_string()),
1163 budget_receipt_id: None,
1164 });
1165 };
1166
1167 let run_id = Uuid::now_v7();
1168 let value_metadata = run_value_metadata(&envelope, &lease, &cost);
1169 append_persona_event(
1170 log,
1171 &binding.name,
1172 "persona.run.started",
1173 json!({
1174 "work_key": envelope.subject_key,
1175 "run_id": run_id,
1176 "started_at_ms": now_ms,
1177 "entry_workflow": binding.entry_workflow,
1178 "lease_id": lease.id,
1179 }),
1180 now_ms,
1181 )
1182 .await?;
1183 emit_persona_value_event(
1184 log,
1185 binding,
1186 run_id,
1187 PersonaValueEventDelta {
1188 kind: PersonaValueEventKind::RunStarted,
1189 metadata: value_metadata.clone(),
1190 ..Default::default()
1191 },
1192 now_ms,
1193 )
1194 .await?;
1195 let budget_receipt_id =
1196 append_budget_record(log, &binding.name, &cost, Some(&lease.id), now_ms).await?;
1197 if cost.avoided_cost_usd > 0.0 || cost.deterministic_steps > 0 {
1198 emit_persona_value_event(
1199 log,
1200 binding,
1201 run_id,
1202 PersonaValueEventDelta {
1203 kind: PersonaValueEventKind::DeterministicExecution,
1204 avoided_cost_usd: cost.avoided_cost_usd,
1205 deterministic_steps: cost.deterministic_steps.max(1),
1206 metadata: value_metadata.clone(),
1207 ..Default::default()
1208 },
1209 now_ms,
1210 )
1211 .await?;
1212 }
1213 if cost.frontier_escalations > 0 {
1214 emit_persona_value_event(
1215 log,
1216 binding,
1217 run_id,
1218 PersonaValueEventDelta {
1219 kind: PersonaValueEventKind::FrontierEscalation,
1220 paid_cost_usd: cost.cost_usd,
1221 llm_steps: cost.llm_steps.max(cost.frontier_escalations),
1222 metadata: value_metadata.clone(),
1223 ..Default::default()
1224 },
1225 now_ms,
1226 )
1227 .await?;
1228 }
1229 let completion_paid_cost = if cost.frontier_escalations > 0 {
1230 0.0
1231 } else {
1232 cost.cost_usd
1233 };
1234 let completion_llm_steps = if cost.frontier_escalations > 0 {
1235 0
1236 } else {
1237 cost.llm_steps
1238 };
1239 emit_persona_value_event(
1240 log,
1241 binding,
1242 run_id,
1243 PersonaValueEventDelta {
1244 kind: PersonaValueEventKind::RunCompleted,
1245 paid_cost_usd: completion_paid_cost,
1246 llm_steps: completion_llm_steps,
1247 metadata: value_metadata,
1248 ..Default::default()
1249 },
1250 now_ms,
1251 )
1252 .await?;
1253 append_persona_event(
1254 log,
1255 &binding.name,
1256 "persona.run.completed",
1257 json!({
1258 "work_key": envelope.subject_key,
1259 "run_id": run_id,
1260 "completed_at_ms": now_ms,
1261 "entry_workflow": binding.entry_workflow,
1262 "lease_id": lease.id,
1263 }),
1264 now_ms,
1265 )
1266 .await?;
1267 append_persona_event(
1268 log,
1269 &binding.name,
1270 "persona.lease.released",
1271 json!({
1272 "id": lease.id,
1273 "work_key": envelope.subject_key,
1274 "released_at_ms": now_ms,
1275 }),
1276 now_ms,
1277 )
1278 .await?;
1279 Ok(PersonaRunReceipt {
1280 status: "completed".to_string(),
1281 persona: binding.name.clone(),
1282 run_id: Some(run_id),
1283 work_key: envelope.subject_key,
1284 lease: Some(lease),
1285 queued: false,
1286 error: None,
1287 budget_receipt_id: Some(budget_receipt_id),
1288 })
1289}
1290
1291async fn acquire_lease(
1292 log: &Arc<AnyEventLog>,
1293 binding: &PersonaRuntimeBinding,
1294 work_key: &str,
1295 holder: &str,
1296 ttl_ms: i64,
1297 now_ms: i64,
1298) -> Result<Option<PersonaLease>, String> {
1299 let status = persona_status(log, binding, now_ms).await?;
1300 if let Some(lease) = status.active_lease {
1301 if lease.expires_at_ms > now_ms {
1302 append_persona_event(
1303 log,
1304 &binding.name,
1305 "persona.lease.conflict",
1306 json!({
1307 "active_lease": lease,
1308 "requested_work_key": work_key,
1309 "at_ms": now_ms,
1310 }),
1311 now_ms,
1312 )
1313 .await?;
1314 return Ok(None);
1315 }
1316 append_persona_event(
1317 log,
1318 &binding.name,
1319 "persona.lease.expired",
1320 json!({
1321 "id": lease.id,
1322 "work_key": lease.work_key,
1323 "expired_at_ms": now_ms,
1324 }),
1325 now_ms,
1326 )
1327 .await?;
1328 }
1329
1330 let lease = PersonaLease {
1331 id: format!("persona_lease_{}", Uuid::now_v7()),
1332 holder: holder.to_string(),
1333 work_key: work_key.to_string(),
1334 acquired_at_ms: now_ms,
1335 expires_at_ms: now_ms + ttl_ms,
1336 };
1337 append_persona_event(
1338 log,
1339 &binding.name,
1340 "persona.lease.acquired",
1341 serde_json::to_value(&lease).map_err(|error| error.to_string())?,
1342 now_ms,
1343 )
1344 .await?;
1345 Ok(Some(lease))
1346}
1347
1348async fn enforce_budget(
1349 log: &Arc<AnyEventLog>,
1350 binding: &PersonaRuntimeBinding,
1351 cost: &PersonaRunCost,
1352 now_ms: i64,
1353) -> Result<(), String> {
1354 let status = persona_status(log, binding, now_ms).await?;
1355 let reason = if binding
1356 .budget
1357 .run_usd
1358 .is_some_and(|limit| cost.cost_usd > limit)
1359 {
1360 Some("run_usd")
1361 } else if binding
1362 .budget
1363 .daily_usd
1364 .is_some_and(|limit| status.budget.spent_today_usd + cost.cost_usd > limit)
1365 {
1366 Some("daily_usd")
1367 } else if binding
1368 .budget
1369 .hourly_usd
1370 .is_some_and(|limit| status.budget.spent_this_hour_usd + cost.cost_usd > limit)
1371 {
1372 Some("hourly_usd")
1373 } else if binding
1374 .budget
1375 .max_tokens
1376 .is_some_and(|limit| status.budget.tokens_today + cost.tokens > limit)
1377 {
1378 Some("max_tokens")
1379 } else {
1380 None
1381 };
1382
1383 if let Some(reason) = reason {
1384 let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1385 append_persona_event(
1386 log,
1387 &binding.name,
1388 "persona.budget.exhausted",
1389 json!({
1390 "receipt_id": receipt_id,
1391 "reason": reason,
1392 "attempted_cost_usd": cost.cost_usd,
1393 "attempted_tokens": cost.tokens,
1394 "persona": binding.name,
1395 }),
1396 now_ms,
1397 )
1398 .await?;
1399 return Err(format!("persona budget exhausted: {reason}"));
1400 }
1401
1402 Ok(())
1403}
1404
1405async fn append_budget_record(
1406 log: &Arc<AnyEventLog>,
1407 persona: &str,
1408 cost: &PersonaRunCost,
1409 lease_id: Option<&str>,
1410 now_ms: i64,
1411) -> Result<String, String> {
1412 let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1413 append_persona_event(
1414 log,
1415 persona,
1416 "persona.budget.recorded",
1417 json!({
1418 "receipt_id": receipt_id,
1419 "persona": persona,
1420 "cost_usd": cost.cost_usd,
1421 "tokens": cost.tokens,
1422 "lease_id": lease_id,
1423 }),
1424 now_ms,
1425 )
1426 .await?;
1427 Ok(receipt_id)
1428}
1429
1430fn normalize_trigger_envelope(
1431 provider: &str,
1432 kind: &str,
1433 metadata: BTreeMap<String, String>,
1434 now_ms: i64,
1435) -> PersonaTriggerEnvelope {
1436 let provider = provider.to_ascii_lowercase();
1437 let kind = kind.to_string();
1438 let source_event_id = metadata
1439 .get("event_id")
1440 .or_else(|| metadata.get("id"))
1441 .cloned();
1442 let subject_key = match provider.as_str() {
1443 "github" => {
1444 let repo = metadata
1445 .get("repository")
1446 .or_else(|| metadata.get("repository.full_name"))
1447 .cloned()
1448 .unwrap_or_else(|| "unknown".to_string());
1449 if let Some(number) = metadata
1450 .get("pr")
1451 .or_else(|| metadata.get("pull_request.number"))
1452 .or_else(|| metadata.get("number"))
1453 {
1454 format!("github:{repo}:pr:{number}")
1455 } else if let Some(check) = metadata
1456 .get("check_run.name")
1457 .or_else(|| metadata.get("check_name"))
1458 {
1459 format!("github:{repo}:check:{check}")
1460 } else {
1461 format!("github:{repo}:{kind}")
1462 }
1463 }
1464 "linear" => {
1465 let issue = metadata
1466 .get("issue_key")
1467 .or_else(|| metadata.get("issue.identifier"))
1468 .or_else(|| metadata.get("issue_id"))
1469 .or_else(|| metadata.get("id"))
1470 .cloned()
1471 .unwrap_or_else(|| "unknown".to_string());
1472 format!("linear:issue:{issue}")
1473 }
1474 "slack" => {
1475 let channel = metadata
1476 .get("channel")
1477 .or_else(|| metadata.get("channel_id"))
1478 .cloned()
1479 .unwrap_or_else(|| "unknown".to_string());
1480 let ts = metadata
1481 .get("ts")
1482 .or_else(|| metadata.get("event_ts"))
1483 .cloned()
1484 .unwrap_or_else(|| "unknown".to_string());
1485 format!("slack:{channel}:{ts}")
1486 }
1487 "webhook" => metadata
1488 .get("dedupe_key")
1489 .or_else(|| metadata.get("event_id"))
1490 .map(|value| format!("webhook:{value}"))
1491 .unwrap_or_else(|| format!("webhook:{kind}:{}", Uuid::now_v7())),
1492 _ => metadata
1493 .get("dedupe_key")
1494 .or_else(|| metadata.get("event_id"))
1495 .map(|value| format!("{provider}:{kind}:{value}"))
1496 .unwrap_or_else(|| format!("{provider}:{kind}:{}", Uuid::now_v7())),
1497 };
1498
1499 PersonaTriggerEnvelope {
1500 provider,
1501 kind,
1502 subject_key,
1503 source_event_id,
1504 received_at_ms: now_ms,
1505 raw: json!({"metadata": metadata}),
1506 metadata,
1507 }
1508}
1509
1510async fn queued_events(
1511 log: &Arc<AnyEventLog>,
1512 persona: &str,
1513) -> Result<Vec<(PersonaTriggerEnvelope, PersonaRunCost)>, String> {
1514 let events = read_persona_events(log, persona).await?;
1515 let mut queued = BTreeMap::<String, (PersonaTriggerEnvelope, PersonaRunCost)>::new();
1516 let mut completed = BTreeSet::<String>::new();
1517 for (_, event) in events {
1518 match event.kind.as_str() {
1519 "persona.trigger.queued" => {
1520 let Some(envelope) = event.payload.get("envelope") else {
1521 continue;
1522 };
1523 let envelope: PersonaTriggerEnvelope =
1524 serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1525 let cost = event
1526 .payload
1527 .get("cost")
1528 .cloned()
1529 .map(serde_json::from_value::<PersonaRunCost>)
1530 .transpose()
1531 .map_err(|error| error.to_string())?
1532 .unwrap_or_default();
1533 queued.insert(envelope.subject_key.clone(), (envelope, cost));
1534 }
1535 "persona.run.completed" => {
1536 if let Some(work_key) = event
1537 .payload
1538 .get("work_key")
1539 .and_then(serde_json::Value::as_str)
1540 {
1541 completed.insert(work_key.to_string());
1542 }
1543 }
1544 _ => {}
1545 }
1546 }
1547 queued.retain(|work_key, _| !completed.contains(work_key));
1548 Ok(queued.into_values().collect())
1549}
1550
1551fn assignment_status_from_lease(lease: &PersonaLease) -> PersonaAssignmentStatus {
1552 PersonaAssignmentStatus {
1553 work_key: lease.work_key.clone(),
1554 lease_id: lease.id.clone(),
1555 holder: lease.holder.clone(),
1556 acquired_at: format_ms(lease.acquired_at_ms),
1557 expires_at: format_ms(lease.expires_at_ms),
1558 }
1559}
1560
1561fn queued_work_from_event(event: &LogEvent) -> Result<Option<PersonaQueuedWork>, String> {
1562 let Some(envelope) = event.payload.get("envelope") else {
1563 return Ok(None);
1564 };
1565 let envelope: PersonaTriggerEnvelope =
1566 serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1567 Ok(Some(PersonaQueuedWork {
1568 work_key: envelope.subject_key,
1569 provider: envelope.provider,
1570 kind: envelope.kind,
1571 queued_at: format_ms(event.occurred_at_ms),
1572 reason: event
1573 .payload
1574 .get("reason")
1575 .and_then(serde_json::Value::as_str)
1576 .unwrap_or("queued")
1577 .to_string(),
1578 source_event_id: envelope.source_event_id,
1579 metadata: envelope.metadata,
1580 }))
1581}
1582
1583fn handoff_inbox_item(work: &PersonaQueuedWork) -> Option<PersonaHandoffInboxItem> {
1584 if work.provider != "handoff" && !work.metadata.contains_key("handoff_id") {
1585 return None;
1586 }
1587 Some(PersonaHandoffInboxItem {
1588 work_key: work.work_key.clone(),
1589 handoff_id: work.metadata.get("handoff_id").cloned(),
1590 handoff_kind: work
1591 .metadata
1592 .get("handoff_kind")
1593 .or_else(|| work.metadata.get("kind"))
1594 .cloned(),
1595 source_persona: work.metadata.get("source_persona").cloned(),
1596 task: work.metadata.get("task").cloned(),
1597 queued_at: work.queued_at.clone(),
1598 reason: work.reason.clone(),
1599 })
1600}
1601
1602fn value_receipt_from_event(event: &LogEvent) -> Result<Option<PersonaValueReceipt>, String> {
1603 let Ok(value_event) = serde_json::from_value::<PersonaValueEvent>(event.payload.clone()) else {
1604 return Ok(None);
1605 };
1606 Ok(Some(PersonaValueReceipt {
1607 kind: value_event.kind,
1608 run_id: value_event.run_id,
1609 occurred_at: value_event
1610 .occurred_at
1611 .format(&Rfc3339)
1612 .map_err(|error| error.to_string())?,
1613 paid_cost_usd: value_event.paid_cost_usd,
1614 avoided_cost_usd: value_event.avoided_cost_usd,
1615 deterministic_steps: value_event.deterministic_steps,
1616 llm_steps: value_event.llm_steps,
1617 metadata: value_event.metadata,
1618 }))
1619}
1620
1621async fn work_completed(
1622 log: &Arc<AnyEventLog>,
1623 persona: &str,
1624 work_key: &str,
1625) -> Result<bool, String> {
1626 let events = read_persona_events(log, persona).await?;
1627 Ok(events.into_iter().any(|(_, event)| {
1628 event.kind == "persona.run.completed"
1629 && event
1630 .payload
1631 .get("work_key")
1632 .and_then(serde_json::Value::as_str)
1633 == Some(work_key)
1634 }))
1635}
1636
1637async fn read_persona_events(
1638 log: &Arc<AnyEventLog>,
1639 persona: &str,
1640) -> Result<Vec<(u64, LogEvent)>, String> {
1641 let topic = runtime_topic()?;
1642 Ok(log
1643 .read_range(&topic, None, usize::MAX)
1644 .await
1645 .map_err(|error| error.to_string())?
1646 .into_iter()
1647 .filter(|(_, event)| {
1648 event
1649 .headers
1650 .get("persona")
1651 .is_some_and(|name| name == persona)
1652 })
1653 .collect())
1654}
1655
1656async fn append_persona_event(
1657 log: &Arc<AnyEventLog>,
1658 persona: &str,
1659 kind: &str,
1660 payload: serde_json::Value,
1661 now_ms: i64,
1662) -> Result<u64, String> {
1663 let mut headers = BTreeMap::new();
1664 headers.insert("persona".to_string(), persona.to_string());
1665 forward_persona_run_event(persona, kind, &payload);
1666 let event = LogEvent {
1667 kind: kind.to_string(),
1668 payload,
1669 headers,
1670 occurred_at_ms: now_ms,
1671 };
1672 log.append(&runtime_topic()?, event)
1673 .await
1674 .map_err(|error| error.to_string())
1675}
1676
1677fn forward_persona_run_event(persona: &str, kind: &str, payload: &serde_json::Value) {
1684 if !crate::run_events::sink_active() {
1685 return;
1686 }
1687 let transition = kind
1688 .strip_prefix("persona.stage.")
1689 .or_else(|| kind.strip_prefix("persona.run."));
1690 let Some(transition) = transition else {
1691 return;
1692 };
1693 let stage = payload
1694 .get("stage")
1695 .or_else(|| payload.get("to"))
1696 .or_else(|| payload.get("name"))
1697 .and_then(|value| value.as_str())
1698 .unwrap_or("")
1699 .to_string();
1700 crate::run_events::emit(crate::run_events::RunEvent::PersonaStage {
1701 persona: persona.to_string(),
1702 stage,
1703 transition: transition.to_string(),
1704 });
1705}
1706
1707struct PersonaValueEventDelta {
1708 kind: PersonaValueEventKind,
1709 paid_cost_usd: f64,
1710 avoided_cost_usd: f64,
1711 deterministic_steps: i64,
1712 llm_steps: i64,
1713 metadata: serde_json::Value,
1714}
1715
1716impl Default for PersonaValueEventDelta {
1717 fn default() -> Self {
1718 Self {
1719 kind: PersonaValueEventKind::RunCompleted,
1720 paid_cost_usd: 0.0,
1721 avoided_cost_usd: 0.0,
1722 deterministic_steps: 0,
1723 llm_steps: 0,
1724 metadata: serde_json::Value::Null,
1725 }
1726 }
1727}
1728
1729async fn emit_persona_value_event(
1730 log: &Arc<AnyEventLog>,
1731 binding: &PersonaRuntimeBinding,
1732 run_id: Uuid,
1733 delta: PersonaValueEventDelta,
1734 now_ms: i64,
1735) -> Result<(), String> {
1736 let event = PersonaValueEvent {
1737 persona_id: binding.name.clone(),
1738 template_ref: binding.template_ref.clone(),
1739 run_id: Some(run_id),
1740 kind: delta.kind,
1741 paid_cost_usd: delta.paid_cost_usd.max(0.0),
1742 avoided_cost_usd: delta.avoided_cost_usd.max(0.0),
1743 deterministic_steps: delta.deterministic_steps.max(0),
1744 llm_steps: delta.llm_steps.max(0),
1745 metadata: delta.metadata,
1746 occurred_at: offset_datetime_from_ms(now_ms),
1747 };
1748 append_persona_event(
1749 log,
1750 &binding.name,
1751 &format!("persona.value.{}", event.kind.as_str()),
1752 serde_json::to_value(&event).map_err(|error| error.to_string())?,
1753 now_ms,
1754 )
1755 .await?;
1756 emit_persona_value_sink_event(&event);
1757 Ok(())
1758}
1759
1760fn emit_persona_value_sink_event(event: &PersonaValueEvent) {
1761 for sink in persona_value_sinks().snapshot() {
1762 sink.handle_value_event(event);
1763 }
1764}
1765
1766fn emit_persona_supervision_sink_event(event: &PersonaSupervisionEvent) {
1767 for sink in persona_supervision_sinks().snapshot() {
1768 sink.handle_supervision_event(event);
1769 }
1770}
1771
1772async fn record_persona_supervision_event(
1773 log: &Arc<AnyEventLog>,
1774 persona: &str,
1775 event: PersonaSupervisionEvent,
1776) -> Result<(), String> {
1777 let update_kind = event.update_kind();
1778 let occurred_at_ms = event.occurred_at_ms();
1779 append_persona_event(
1780 log,
1781 persona,
1782 &format!("persona.supervision.{update_kind}"),
1783 serde_json::to_value(&event).map_err(|error| error.to_string())?,
1784 occurred_at_ms,
1785 )
1786 .await?;
1787 emit_persona_supervision_sink_event(&event);
1788 Ok(())
1789}
1790
1791#[derive(Clone, Debug)]
1792struct QueueEntry {
1793 work_key: String,
1794 queued_at_ms: i64,
1795}
1796
1797async fn queue_snapshot(
1798 log: &Arc<AnyEventLog>,
1799 binding: &PersonaRuntimeBinding,
1800 now_ms: i64,
1801) -> Result<Vec<QueueEntry>, String> {
1802 let status = persona_status(log, binding, now_ms).await?;
1803 Ok(status
1804 .queued_work
1805 .into_iter()
1806 .map(|item| QueueEntry {
1807 queued_at_ms: parse_rfc3339_ms(&item.queued_at).unwrap_or(now_ms),
1808 work_key: item.work_key,
1809 })
1810 .collect())
1811}
1812
1813async fn emit_queue_position_supervision(
1814 log: &Arc<AnyEventLog>,
1815 binding: &PersonaRuntimeBinding,
1816 before: &[QueueEntry],
1817 after: &[QueueEntry],
1818 now_ms: i64,
1819) -> Result<(), String> {
1820 use std::collections::HashSet;
1821 let before_keys: HashSet<&str> = before.iter().map(|e| e.work_key.as_str()).collect();
1822 let after_keys: HashSet<&str> = after.iter().map(|e| e.work_key.as_str()).collect();
1823 let after_depth = after.len() as i64;
1824
1825 for (index, entry) in after.iter().enumerate() {
1826 if !before_keys.contains(entry.work_key.as_str()) {
1827 record_persona_supervision_event(
1828 log,
1829 &binding.name,
1830 PersonaSupervisionEvent::QueuePosition(PersonaQueuePositionUpdate {
1831 persona_id: binding.name.clone(),
1832 template_ref: binding.template_ref.clone(),
1833 work_key: entry.work_key.clone(),
1834 queue_depth: after_depth,
1835 position: (index + 1) as i64,
1836 queued_at_ms: entry.queued_at_ms,
1837 occurred_at_ms: now_ms,
1838 }),
1839 )
1840 .await?;
1841 }
1842 }
1843 for entry in before {
1844 if !after_keys.contains(entry.work_key.as_str()) {
1845 record_persona_supervision_event(
1846 log,
1847 &binding.name,
1848 PersonaSupervisionEvent::QueuePosition(PersonaQueuePositionUpdate {
1849 persona_id: binding.name.clone(),
1850 template_ref: binding.template_ref.clone(),
1851 work_key: entry.work_key.clone(),
1852 queue_depth: after_depth,
1853 position: 0,
1854 queued_at_ms: entry.queued_at_ms,
1855 occurred_at_ms: now_ms,
1856 }),
1857 )
1858 .await?;
1859 }
1860 }
1861 Ok(())
1862}
1863
1864async fn emit_receipt_supervision(
1865 log: &Arc<AnyEventLog>,
1866 binding: &PersonaRuntimeBinding,
1867 receipt: &PersonaRunReceipt,
1868 now_ms: i64,
1869) -> Result<(), String> {
1870 record_persona_supervision_event(
1871 log,
1872 &binding.name,
1873 PersonaSupervisionEvent::Receipt(PersonaReceiptUpdate {
1874 persona_id: binding.name.clone(),
1875 template_ref: binding.template_ref.clone(),
1876 receipt: receipt.clone(),
1877 occurred_at_ms: now_ms,
1878 }),
1879 )
1880 .await
1881}
1882
1883fn run_value_metadata(
1884 envelope: &PersonaTriggerEnvelope,
1885 lease: &PersonaLease,
1886 cost: &PersonaRunCost,
1887) -> serde_json::Value {
1888 let mut metadata = serde_json::Map::new();
1889 metadata.insert("work_key".to_string(), json!(envelope.subject_key));
1890 metadata.insert("trigger_provider".to_string(), json!(envelope.provider));
1891 metadata.insert("trigger_kind".to_string(), json!(envelope.kind));
1892 metadata.insert("lease_id".to_string(), json!(lease.id));
1893 metadata.insert("tokens".to_string(), json!(cost.tokens));
1894 if cost.frontier_escalations > 0 {
1895 metadata.insert(
1896 "frontier_escalations".to_string(),
1897 json!(cost.frontier_escalations),
1898 );
1899 }
1900 match &cost.metadata {
1901 serde_json::Value::Null => {}
1902 serde_json::Value::Object(extra) => {
1903 metadata.extend(
1904 extra
1905 .iter()
1906 .map(|(key, value)| (key.clone(), value.clone())),
1907 );
1908 }
1909 extra => {
1910 metadata.insert("run_cost_metadata".to_string(), extra.clone());
1911 }
1912 }
1913 serde_json::Value::Object(metadata)
1914}
1915
1916fn budget_status(
1917 policy: &PersonaBudgetPolicy,
1918 spent: &[(i64, f64, u64)],
1919 now_ms: i64,
1920) -> PersonaBudgetStatus {
1921 let day_start = now_ms - (now_ms.rem_euclid(86_400_000));
1922 let hour_start = now_ms - (now_ms.rem_euclid(3_600_000));
1923 let mut spent_today_usd = 0.0;
1924 let mut spent_this_hour_usd = 0.0;
1925 let mut tokens_today = 0u64;
1926 let mut spent_last_run_usd = 0.0;
1927 for (at_ms, cost, tokens) in spent {
1928 spent_last_run_usd = *cost;
1929 if *at_ms >= day_start {
1930 spent_today_usd += cost;
1931 tokens_today += tokens;
1932 }
1933 if *at_ms >= hour_start {
1934 spent_this_hour_usd += cost;
1935 }
1936 }
1937
1938 let remaining_today_usd = policy
1939 .daily_usd
1940 .map(|limit| (limit - spent_today_usd).max(0.0));
1941 let remaining_hour_usd = policy
1942 .hourly_usd
1943 .map(|limit| (limit - spent_this_hour_usd).max(0.0));
1944 let reason = if policy
1945 .daily_usd
1946 .is_some_and(|limit| spent_today_usd >= limit && limit >= 0.0)
1947 {
1948 Some("daily_usd".to_string())
1949 } else if policy
1950 .hourly_usd
1951 .is_some_and(|limit| spent_this_hour_usd >= limit && limit >= 0.0)
1952 {
1953 Some("hourly_usd".to_string())
1954 } else if policy
1955 .max_tokens
1956 .is_some_and(|limit| tokens_today >= limit && limit > 0)
1957 {
1958 Some("max_tokens".to_string())
1959 } else {
1960 None
1961 };
1962
1963 PersonaBudgetStatus {
1964 daily_usd: policy.daily_usd,
1965 hourly_usd: policy.hourly_usd,
1966 run_usd: policy.run_usd,
1967 max_tokens: policy.max_tokens,
1968 spent_today_usd,
1969 spent_this_hour_usd,
1970 spent_last_run_usd,
1971 tokens_today,
1972 remaining_today_usd,
1973 remaining_hour_usd,
1974 exhausted: reason.is_some(),
1975 reason,
1976 last_receipt_id: None,
1977 }
1978}
1979
1980fn next_scheduled_run(
1981 binding: &PersonaRuntimeBinding,
1982 last_run_ms: Option<i64>,
1983 now_ms: i64,
1984) -> Option<String> {
1985 binding
1986 .schedules
1987 .iter()
1988 .filter_map(|schedule| next_cron_ms(schedule, last_run_ms.unwrap_or(now_ms)).ok())
1989 .min()
1990 .map(format_ms)
1991}
1992
1993fn next_cron_ms(schedule: &str, after_ms: i64) -> Result<i64, String> {
1994 let cron = schedule
1995 .parse::<Cron>()
1996 .map_err(|error| error.to_string())?;
1997 let after = Utc
1998 .timestamp_millis_opt(after_ms)
1999 .single()
2000 .ok_or_else(|| "invalid timestamp".to_string())?;
2001 let next = cron
2002 .find_next_occurrence(&after, false)
2003 .map_err(|error| error.to_string())?;
2004 Ok(next.timestamp_millis())
2005}
2006
2007pub fn now_ms() -> i64 {
2008 OffsetDateTime::now_utc().unix_timestamp_nanos() as i64 / 1_000_000
2009}
2010
2011fn offset_datetime_from_ms(ms: i64) -> OffsetDateTime {
2012 OffsetDateTime::from_unix_timestamp_nanos((ms as i128) * 1_000_000)
2013 .unwrap_or(OffsetDateTime::UNIX_EPOCH)
2014}
2015
2016pub fn format_ms(ms: i64) -> String {
2017 offset_datetime_from_ms(ms)
2018 .format(&Rfc3339)
2019 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
2020}
2021
2022pub fn parse_rfc3339_ms(value: &str) -> Result<i64, String> {
2023 let ts = OffsetDateTime::parse(value, &Rfc3339)
2024 .map_err(|error| format!("invalid RFC3339 timestamp '{value}': {error}"))?;
2025 Ok(ts.unix_timestamp_nanos() as i64 / 1_000_000)
2026}
2027
2028fn runtime_topic() -> Result<Topic, String> {
2029 Topic::new(PERSONA_RUNTIME_TOPIC).map_err(|error| error.to_string())
2030}
2031
2032#[cfg(test)]
2033mod tests {
2034 use super::*;
2035 use crate::event_log::{AnyEventLog, MemoryEventLog};
2036 use std::sync::Mutex;
2037
2038 struct CapturingValueSink {
2039 events: Arc<Mutex<Vec<PersonaValueEvent>>>,
2040 }
2041
2042 impl PersonaValueSink for CapturingValueSink {
2043 fn handle_value_event(&self, event: &PersonaValueEvent) {
2044 self.events.lock().unwrap().push(event.clone());
2045 }
2046 }
2047
2048 fn binding() -> PersonaRuntimeBinding {
2049 PersonaRuntimeBinding {
2050 name: "merge_captain".to_string(),
2051 template_ref: Some("software_factory@v0".to_string()),
2052 entry_workflow: "workflows/merge.harn#run".to_string(),
2053 schedules: vec!["*/30 * * * *".to_string()],
2054 triggers: vec!["github.pr_opened".to_string()],
2055 budget: PersonaBudgetPolicy {
2056 daily_usd: Some(0.02),
2057 hourly_usd: None,
2058 run_usd: Some(0.02),
2059 max_tokens: Some(100),
2060 },
2061 stages: Vec::new(),
2062 }
2063 }
2064
2065 fn log() -> Arc<AnyEventLog> {
2066 Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)))
2067 }
2068
2069 #[tokio::test]
2070 async fn schedule_tick_records_lifecycle_status_and_receipt() {
2071 let log = log();
2072 let binding = binding();
2073 let now = parse_rfc3339_ms("2026-04-24T12:30:00Z").unwrap();
2074 let receipt = fire_schedule(
2075 &log,
2076 &binding,
2077 PersonaRunCost {
2078 cost_usd: 0.01,
2079 tokens: 10,
2080 ..Default::default()
2081 },
2082 now,
2083 )
2084 .await
2085 .unwrap();
2086 assert_eq!(receipt.status, "completed");
2087 assert!(receipt.lease.is_some());
2088 let status = persona_status(&log, &binding, now).await.unwrap();
2089 assert_eq!(status.state, PersonaLifecycleState::Idle);
2090 assert_eq!(status.last_run.as_deref(), Some("2026-04-24T12:30:00Z"));
2091 assert!(status.next_scheduled_run.is_some());
2092 assert_eq!(status.budget.spent_today_usd, 0.01);
2093 }
2094
2095 #[tokio::test]
2096 async fn paused_personas_queue_and_resume_drains_once() {
2097 let log = log();
2098 let binding = binding();
2099 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2100 pause_persona(&log, &binding, now).await.unwrap();
2101 let receipt = fire_trigger(
2102 &log,
2103 &binding,
2104 "github",
2105 "pull_request",
2106 BTreeMap::from([
2107 ("repository".to_string(), "burin-labs/harn".to_string()),
2108 ("number".to_string(), "462".to_string()),
2109 ]),
2110 PersonaRunCost::default(),
2111 now,
2112 )
2113 .await
2114 .unwrap();
2115 assert_eq!(receipt.status, "queued");
2116 assert_eq!(
2117 persona_status(&log, &binding, now)
2118 .await
2119 .unwrap()
2120 .queued_events,
2121 1
2122 );
2123 let status = resume_persona(&log, &binding, now + 1000).await.unwrap();
2124 assert_eq!(status.state, PersonaLifecycleState::Idle);
2125 assert_eq!(status.queued_events, 0);
2126 }
2127
2128 #[tokio::test]
2129 async fn resumed_queued_work_reuses_original_budget_cost() {
2130 let log = log();
2131 let mut binding = binding();
2132 binding.budget.run_usd = Some(0.01);
2133 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2134 pause_persona(&log, &binding, now).await.unwrap();
2135 let queued = fire_trigger(
2136 &log,
2137 &binding,
2138 "github",
2139 "pull_request",
2140 BTreeMap::from([
2141 ("repository".to_string(), "burin-labs/harn".to_string()),
2142 ("number".to_string(), "1379".to_string()),
2143 ]),
2144 PersonaRunCost {
2145 cost_usd: 0.02,
2146 tokens: 1,
2147 ..Default::default()
2148 },
2149 now + 1,
2150 )
2151 .await
2152 .unwrap();
2153 assert_eq!(queued.status, "queued");
2154
2155 let status = resume_persona(&log, &binding, now + 2).await.unwrap();
2156
2157 assert_eq!(status.budget.reason.as_deref(), Some("run_usd"));
2158 assert!(status
2159 .last_error
2160 .as_deref()
2161 .is_some_and(|error| error.contains("run_usd")));
2162 assert_eq!(status.queued_events, 1);
2163 }
2164
2165 #[tokio::test]
2166 async fn duplicate_trigger_envelope_is_not_processed_twice() {
2167 let log = log();
2168 let binding = binding();
2169 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2170 let metadata = BTreeMap::from([
2171 ("repository".to_string(), "burin-labs/harn".to_string()),
2172 ("number".to_string(), "462".to_string()),
2173 ]);
2174 let first = fire_trigger(
2175 &log,
2176 &binding,
2177 "github",
2178 "pull_request",
2179 metadata.clone(),
2180 PersonaRunCost::default(),
2181 now,
2182 )
2183 .await
2184 .unwrap();
2185 let second = fire_trigger(
2186 &log,
2187 &binding,
2188 "github",
2189 "pull_request",
2190 metadata,
2191 PersonaRunCost::default(),
2192 now + 1000,
2193 )
2194 .await
2195 .unwrap();
2196 assert_eq!(first.status, "completed");
2197 assert_eq!(second.status, "duplicate");
2198 assert!(second.lease.is_none());
2199 }
2200
2201 #[tokio::test]
2202 async fn disabled_personas_dead_letter_events() {
2203 let log = log();
2204 let binding = binding();
2205 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2206 disable_persona(&log, &binding, now).await.unwrap();
2207 let receipt = fire_trigger(
2208 &log,
2209 &binding,
2210 "slack",
2211 "message",
2212 BTreeMap::from([
2213 ("channel".to_string(), "C123".to_string()),
2214 ("ts".to_string(), "1713988800.000100".to_string()),
2215 ]),
2216 PersonaRunCost::default(),
2217 now,
2218 )
2219 .await
2220 .unwrap();
2221 assert_eq!(receipt.status, "dead_lettered");
2222 let status = persona_status(&log, &binding, now).await.unwrap();
2223 assert_eq!(status.state, PersonaLifecycleState::Disabled);
2224 assert_eq!(status.disabled_events, 1);
2225 }
2226
2227 #[tokio::test]
2228 async fn budget_exhaustion_blocks_expensive_work() {
2229 let log = log();
2230 let mut binding = binding();
2231 binding.budget.daily_usd = Some(0.01);
2232 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2233 let receipt = fire_trigger(
2234 &log,
2235 &binding,
2236 "linear",
2237 "issue",
2238 BTreeMap::from([("issue_key".to_string(), "HAR-462".to_string())]),
2239 PersonaRunCost {
2240 cost_usd: 0.02,
2241 tokens: 1,
2242 ..Default::default()
2243 },
2244 now,
2245 )
2246 .await
2247 .unwrap();
2248 assert_eq!(receipt.status, "budget_exhausted");
2249 let status = persona_status(&log, &binding, now).await.unwrap();
2250 assert_eq!(status.budget.reason.as_deref(), Some("daily_usd"));
2251 assert!(status.budget.exhausted);
2252 assert!(status.last_error.as_deref().unwrap().contains("daily_usd"));
2253 }
2254
2255 #[tokio::test]
2256 async fn deterministic_predicate_hit_emits_value_event_with_avoided_cost() {
2257 let log = log();
2258 let binding = binding();
2259 let captured = Arc::new(Mutex::new(Vec::new()));
2260 let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
2261 events: captured.clone(),
2262 }));
2263 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2264
2265 let receipt = fire_trigger(
2266 &log,
2267 &binding,
2268 "github",
2269 "pull_request",
2270 BTreeMap::from([
2271 ("repository".to_string(), "burin-labs/harn".to_string()),
2272 ("number".to_string(), "715".to_string()),
2273 ]),
2274 PersonaRunCost {
2275 avoided_cost_usd: 0.0042,
2276 deterministic_steps: 1,
2277 metadata: json!({
2278 "predicate": "pr_already_green",
2279 "would_have_called_model": "gpt-5.4-mini",
2280 }),
2281 ..Default::default()
2282 },
2283 now,
2284 )
2285 .await
2286 .unwrap();
2287
2288 let run_id = receipt.run_id.expect("completed run has run_id");
2289 let events = captured.lock().unwrap().clone();
2290 let deterministic = events
2291 .iter()
2292 .find(|event| {
2293 event.kind == PersonaValueEventKind::DeterministicExecution
2294 && event.run_id == Some(run_id)
2295 })
2296 .expect("deterministic execution value event");
2297 assert_eq!(deterministic.persona_id, "merge_captain");
2298 assert_eq!(
2299 deterministic.template_ref.as_deref(),
2300 Some("software_factory@v0")
2301 );
2302 assert_eq!(deterministic.run_id, Some(run_id));
2303 assert_eq!(deterministic.paid_cost_usd, 0.0);
2304 assert_eq!(deterministic.avoided_cost_usd, 0.0042);
2305 assert_eq!(deterministic.deterministic_steps, 1);
2306 assert_eq!(
2307 deterministic.metadata["predicate"].as_str(),
2308 Some("pr_already_green")
2309 );
2310
2311 let persisted = read_persona_events(&log, &binding.name).await.unwrap();
2312 assert!(persisted.iter().any(|(_, event)| {
2313 event.kind == "persona.value.deterministic_execution"
2314 && event.payload["avoided_cost_usd"] == json!(0.0042)
2315 }));
2316 }
2317
2318 #[tokio::test]
2319 async fn frontier_escalation_run_emits_value_event_with_paid_cost() {
2320 let log = log();
2321 let binding = binding();
2322 let captured = Arc::new(Mutex::new(Vec::new()));
2323 let _registration = register_persona_value_sink(Arc::new(CapturingValueSink {
2324 events: captured.clone(),
2325 }));
2326 let now = parse_rfc3339_ms("2026-04-24T12:00:00Z").unwrap();
2327
2328 let receipt = fire_trigger(
2329 &log,
2330 &binding,
2331 "linear",
2332 "issue",
2333 BTreeMap::from([("issue_key".to_string(), "HAR-715".to_string())]),
2334 PersonaRunCost {
2335 cost_usd: 0.011,
2336 tokens: 20,
2337 llm_steps: 1,
2338 frontier_escalations: 1,
2339 metadata: json!({
2340 "frontier_model": "gpt-5.4",
2341 "escalation_reason": "high_risk_merge",
2342 }),
2343 ..Default::default()
2344 },
2345 now,
2346 )
2347 .await
2348 .unwrap();
2349
2350 let run_id = receipt.run_id.expect("completed run has run_id");
2351 let events = captured.lock().unwrap().clone();
2352 let escalation = events
2353 .iter()
2354 .find(|event| {
2355 event.kind == PersonaValueEventKind::FrontierEscalation
2356 && event.run_id == Some(run_id)
2357 })
2358 .expect("frontier escalation value event");
2359 assert_eq!(escalation.run_id, Some(run_id));
2360 assert_eq!(escalation.paid_cost_usd, 0.011);
2361 assert_eq!(escalation.avoided_cost_usd, 0.0);
2362 assert_eq!(escalation.llm_steps, 1);
2363 assert_eq!(
2364 escalation.metadata["frontier_model"].as_str(),
2365 Some("gpt-5.4")
2366 );
2367
2368 let completion = events
2369 .iter()
2370 .find(|event| {
2371 event.kind == PersonaValueEventKind::RunCompleted && event.run_id == Some(run_id)
2372 })
2373 .expect("run completed value event");
2374 assert_eq!(completion.paid_cost_usd, 0.0);
2375 }
2376
2377 struct CapturingSupervisionSink {
2378 events: Arc<Mutex<Vec<PersonaSupervisionEvent>>>,
2379 }
2380
2381 impl PersonaSupervisionSink for CapturingSupervisionSink {
2382 fn handle_supervision_event(&self, event: &PersonaSupervisionEvent) {
2383 self.events.lock().unwrap().push(event.clone());
2384 }
2385 }
2386
/// Builds the trigger metadata map for a pull request: the `repository`
/// and `number` keys mapped to owned copies of the given values.
fn pr_metadata(repository: &str, number: &str) -> BTreeMap<String, String> {
    let mut metadata = BTreeMap::new();
    metadata.insert("repository".to_string(), repository.to_string());
    metadata.insert("number".to_string(), number.to_string());
    metadata
}
2393
2394 fn binding_named(name: &str) -> PersonaRuntimeBinding {
2395 PersonaRuntimeBinding {
2396 name: name.to_string(),
2397 ..binding()
2398 }
2399 }
2400
2401 fn supervision_events_for(
2402 captured: &Arc<Mutex<Vec<PersonaSupervisionEvent>>>,
2403 persona: &str,
2404 ) -> Vec<PersonaSupervisionEvent> {
2405 captured
2406 .lock()
2407 .unwrap()
2408 .iter()
2409 .filter(|event| match event {
2410 PersonaSupervisionEvent::QueuePosition(update) => update.persona_id == persona,
2411 PersonaSupervisionEvent::RepairWorkerStatus(update) => update.persona_id == persona,
2412 PersonaSupervisionEvent::Receipt(update) => update.persona_id == persona,
2413 PersonaSupervisionEvent::Checkpoint(update) => update.persona_id == persona,
2414 })
2415 .cloned()
2416 .collect()
2417 }
2418
2419 async fn drive_pause_then_resume(binding: &PersonaRuntimeBinding, now: i64) {
2420 let log = log();
2421 pause_persona(&log, binding, now).await.unwrap();
2422 let _ = fire_trigger(
2423 &log,
2424 binding,
2425 "github",
2426 "pull_request",
2427 pr_metadata("burin-labs/harn", "1480"),
2428 PersonaRunCost::default(),
2429 now,
2430 )
2431 .await
2432 .unwrap();
2433 let _ = resume_persona(&log, binding, now + 1).await.unwrap();
2434 let _ = restore_persona_checkpoint(
2435 &log,
2436 binding,
2437 PersonaCheckpointRestoreRequest {
2438 checkpoint_id: "cp_42".to_string(),
2439 work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2440 resumed_from: Some(PersonaCheckpointResume {
2441 note: Some("resumed from cp 42".to_string()),
2442 ..Default::default()
2443 }),
2444 },
2445 now + 2,
2446 )
2447 .await
2448 .unwrap();
2449 let _ = report_repair_worker_status(
2450 &log,
2451 binding,
2452 PersonaRepairWorkerStatusUpdate {
2453 persona_id: String::new(),
2454 template_ref: None,
2455 repair_worker_id: "rw_42".to_string(),
2456 lifecycle: PersonaRepairWorkerLifecycle::Running,
2457 work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2458 lease_id: Some("persona_lease_xyz".to_string()),
2459 scratchpad_url: Some("https://factory.local/rw_42".to_string()),
2460 last_heartbeat_ms: 0,
2461 occurred_at_ms: 0,
2462 },
2463 now + 3,
2464 )
2465 .await
2466 .unwrap();
2467 }
2468
2469 #[tokio::test]
2470 async fn supervision_sink_emits_queue_position_and_receipt() {
2471 let captured = Arc::new(Mutex::new(Vec::new()));
2472 let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2473 events: captured.clone(),
2474 }));
2475 let log = log();
2476 let binding = binding_named("supervision_sink_emits_queue_position_and_receipt");
2477 let now = parse_rfc3339_ms("2026-05-01T00:00:00Z").unwrap();
2478
2479 pause_persona(&log, &binding, now).await.unwrap();
2480 fire_trigger(
2481 &log,
2482 &binding,
2483 "github",
2484 "pull_request",
2485 pr_metadata("burin-labs/harn", "1480"),
2486 PersonaRunCost::default(),
2487 now + 100,
2488 )
2489 .await
2490 .unwrap();
2491 resume_persona(&log, &binding, now + 200).await.unwrap();
2492
2493 let events = supervision_events_for(&captured, &binding.name);
2494 let queue_events: Vec<_> = events
2495 .iter()
2496 .filter_map(|event| match event {
2497 PersonaSupervisionEvent::QueuePosition(update) => Some(update.clone()),
2498 _ => None,
2499 })
2500 .collect();
2501 assert_eq!(queue_events.len(), 2, "enqueue + drain emitted");
2502 assert_eq!(queue_events[0].position, 1);
2503 assert_eq!(queue_events[0].queue_depth, 1);
2504 assert_eq!(queue_events[1].position, 0);
2505 assert_eq!(queue_events[1].queue_depth, 0);
2506 let receipt_events: Vec<_> = events
2507 .iter()
2508 .filter_map(|event| match event {
2509 PersonaSupervisionEvent::Receipt(update) => Some(update.clone()),
2510 _ => None,
2511 })
2512 .collect();
2513 assert_eq!(receipt_events.len(), 2, "queued + drained receipt");
2514 assert_eq!(receipt_events[0].receipt.status, "queued");
2515 assert_eq!(receipt_events[1].receipt.status, "completed");
2516 for event in &receipt_events {
2517 assert_eq!(event.receipt.persona, binding.name);
2518 assert_eq!(event.persona_id, binding.name);
2519 assert_eq!(event.template_ref.as_deref(), Some("software_factory@v0"));
2520 }
2521 let persisted_kinds: Vec<_> = read_persona_events(&log, &binding.name)
2522 .await
2523 .unwrap()
2524 .into_iter()
2525 .map(|(_, event)| event.kind)
2526 .collect();
2527 assert!(
2528 persisted_kinds
2529 .iter()
2530 .any(|kind| kind == "persona.supervision.queue_position"),
2531 "queue_position supervision events should be durable: {persisted_kinds:?}"
2532 );
2533 assert!(
2534 persisted_kinds
2535 .iter()
2536 .any(|kind| kind == "persona.supervision.receipt"),
2537 "receipt supervision events should be durable: {persisted_kinds:?}"
2538 );
2539 }
2540
2541 #[tokio::test]
2542 async fn supervision_sink_emits_repair_worker_status_idempotently() {
2543 let captured = Arc::new(Mutex::new(Vec::new()));
2544 let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2545 events: captured.clone(),
2546 }));
2547 let log = log();
2548 let binding = binding_named("supervision_sink_emits_repair_worker_status_idempotently");
2549 let now = parse_rfc3339_ms("2026-05-01T01:00:00Z").unwrap();
2550 let update = PersonaRepairWorkerStatusUpdate {
2551 persona_id: String::new(),
2552 template_ref: None,
2553 repair_worker_id: "rw_test".to_string(),
2554 lifecycle: PersonaRepairWorkerLifecycle::Running,
2555 work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2556 lease_id: Some("persona_lease_abc".to_string()),
2557 scratchpad_url: Some("https://factory.local/rw_test".to_string()),
2558 last_heartbeat_ms: 0,
2559 occurred_at_ms: 0,
2560 };
2561 let first = report_repair_worker_status(&log, &binding, update.clone(), now)
2562 .await
2563 .unwrap();
2564 let second = report_repair_worker_status(&log, &binding, update.clone(), now + 5)
2565 .await
2566 .unwrap();
2567 assert!(first);
2568 assert!(!second, "second identical lifecycle is idempotent");
2569
2570 let mut next = update.clone();
2571 next.lifecycle = PersonaRepairWorkerLifecycle::Succeeded;
2572 let third = report_repair_worker_status(&log, &binding, next, now + 10)
2573 .await
2574 .unwrap();
2575 assert!(third);
2576
2577 let kinds: Vec<_> = supervision_events_for(&captured, &binding.name)
2578 .into_iter()
2579 .filter_map(|event| match event {
2580 PersonaSupervisionEvent::RepairWorkerStatus(update) => Some(update.lifecycle),
2581 _ => None,
2582 })
2583 .collect();
2584 assert_eq!(
2585 kinds,
2586 vec![
2587 PersonaRepairWorkerLifecycle::Running,
2588 PersonaRepairWorkerLifecycle::Succeeded
2589 ]
2590 );
2591 }
2592
2593 #[tokio::test]
2594 async fn supervision_sink_emits_checkpoint_restore_ack() {
2595 let captured = Arc::new(Mutex::new(Vec::new()));
2596 let _registration = register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2597 events: captured.clone(),
2598 }));
2599 let log = log();
2600 let binding = binding_named("supervision_sink_emits_checkpoint_restore_ack");
2601 let now = parse_rfc3339_ms("2026-05-01T02:00:00Z").unwrap();
2602 fire_trigger(
2603 &log,
2604 &binding,
2605 "github",
2606 "pull_request",
2607 pr_metadata("burin-labs/harn", "1480"),
2608 PersonaRunCost::default(),
2609 now,
2610 )
2611 .await
2612 .unwrap();
2613
2614 let outcome = restore_persona_checkpoint(
2615 &log,
2616 &binding,
2617 PersonaCheckpointRestoreRequest {
2618 checkpoint_id: "cp_1".to_string(),
2619 work_key: Some("github:burin-labs/harn:pr:1480".to_string()),
2620 resumed_from: None,
2621 },
2622 now + 100,
2623 )
2624 .await
2625 .unwrap();
2626 assert!(outcome.acked);
2627 assert_eq!(outcome.update.checkpoint_id, "cp_1");
2628 let resume = outcome
2629 .update
2630 .resumed_from
2631 .as_ref()
2632 .expect("resume coordinates default-derived from status");
2633 assert_eq!(resume.last_run_ms, Some(now));
2634
2635 let replay = restore_persona_checkpoint(
2636 &log,
2637 &binding,
2638 PersonaCheckpointRestoreRequest {
2639 checkpoint_id: "cp_1".to_string(),
2640 work_key: None,
2641 resumed_from: None,
2642 },
2643 now + 200,
2644 )
2645 .await
2646 .unwrap();
2647 assert!(!replay.acked, "duplicate restore is a no-op ack");
2648 assert_eq!(replay.update.occurred_at_ms, now + 100);
2649
2650 let ack_events: Vec<_> = supervision_events_for(&captured, &binding.name)
2651 .into_iter()
2652 .filter_map(|event| match event {
2653 PersonaSupervisionEvent::Checkpoint(update) => Some(update),
2654 _ => None,
2655 })
2656 .collect();
2657 assert_eq!(ack_events.len(), 1, "ack emitted once, replay suppressed");
2658 assert_eq!(ack_events[0].action, PersonaCheckpointAction::RestoreAcked);
2659 }
2660
2661 #[tokio::test]
2662 async fn supervision_sink_replay_is_deterministic_under_recorded_clock() {
2663 use harn_clock::{ClockEventLog, PausedClock, RecordedClock};
2664 use time::OffsetDateTime;
2665
2666 let now_ms = parse_rfc3339_ms("2026-05-01T03:00:00Z").unwrap();
2667 async fn drive(now_ms: i64) -> (Vec<PersonaSupervisionEvent>, Vec<harn_clock::ClockEvent>) {
2668 let captured = Arc::new(Mutex::new(Vec::new()));
2669 let _registration =
2670 register_persona_supervision_sink(Arc::new(CapturingSupervisionSink {
2671 events: captured.clone(),
2672 }));
2673 let paused = PausedClock::new(
2674 OffsetDateTime::from_unix_timestamp_nanos((now_ms as i128) * 1_000_000).unwrap(),
2675 );
2676 let recorded = Arc::new(RecordedClock::new(paused, Arc::new(ClockEventLog::new())));
2677 let binding = binding_named("supervision_replay_persona");
2678 let ts = harn_clock::now_wall_ms(&*recorded);
2679 drive_pause_then_resume(&binding, ts).await;
2680 let clock_log = recorded.log().snapshot();
2681 let events = supervision_events_for(&captured, &binding.name);
2682 (events, clock_log)
2683 }
2684
2685 let (events_a, clock_a) = drive(now_ms).await;
2686 let (events_b, clock_b) = drive(now_ms).await;
2687 fn normalize(event: &PersonaSupervisionEvent) -> PersonaSupervisionEvent {
2691 match event.clone() {
2692 PersonaSupervisionEvent::Receipt(mut update) => {
2693 update.receipt.run_id = None;
2694 if let Some(lease) = update.receipt.lease.as_mut() {
2695 lease.id = "lease".to_string();
2696 }
2697 update.receipt.budget_receipt_id = update
2698 .receipt
2699 .budget_receipt_id
2700 .map(|_| "budget".to_string());
2701 PersonaSupervisionEvent::Receipt(update)
2702 }
2703 PersonaSupervisionEvent::Checkpoint(mut update) => {
2704 if let Some(resume) = update.resumed_from.as_mut() {
2705 resume.run_id = None;
2706 resume.lease_id = None;
2707 }
2708 PersonaSupervisionEvent::Checkpoint(update)
2709 }
2710 other => other,
2711 }
2712 }
2713 let a: Vec<_> = events_a.iter().map(normalize).collect();
2714 let b: Vec<_> = events_b.iter().map(normalize).collect();
2715 assert_eq!(a, b, "supervision sink emits identical event envelopes");
2716 assert_eq!(
2717 clock_a, clock_b,
2718 "recorded clock observation log is identical across replays"
2719 );
2720 }
2721}