1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::{Arc, OnceLock, RwLock};
4
5use chrono::{TimeZone, Utc};
6use croner::Cron;
7use serde::{Deserialize, Serialize};
8use serde_json::json;
9use time::format_description::well_known::Rfc3339;
10use time::OffsetDateTime;
11use uuid::Uuid;
12
13use crate::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
14
/// Name of the event-log topic reserved for persona runtime events.
pub const PERSONA_RUNTIME_TOPIC: &str = "persona.runtime.events";

/// Lifetime requested for a freshly acquired work lease: five minutes, in milliseconds.
const DEFAULT_LEASE_TTL_MS: i64 = 300_000;
18
19#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "snake_case")]
21pub enum PersonaLifecycleState {
22 Inactive,
23 Starting,
24 #[default]
25 Idle,
26 Running,
27 Paused,
28 Draining,
29 Failed,
30 Disabled,
31}
32
33impl PersonaLifecycleState {
34 pub fn as_str(self) -> &'static str {
35 match self {
36 Self::Inactive => "inactive",
37 Self::Starting => "starting",
38 Self::Idle => "idle",
39 Self::Running => "running",
40 Self::Paused => "paused",
41 Self::Draining => "draining",
42 Self::Failed => "failed",
43 Self::Disabled => "disabled",
44 }
45 }
46}
47
48#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
49pub struct PersonaBudgetPolicy {
50 pub daily_usd: Option<f64>,
51 pub hourly_usd: Option<f64>,
52 pub run_usd: Option<f64>,
53 pub max_tokens: Option<u64>,
54}
55
56#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
57pub struct PersonaRuntimeBinding {
58 pub name: String,
59 #[serde(default)]
60 pub template_ref: Option<String>,
61 pub entry_workflow: String,
62 #[serde(default)]
63 pub schedules: Vec<String>,
64 #[serde(default)]
65 pub triggers: Vec<String>,
66 #[serde(default)]
67 pub budget: PersonaBudgetPolicy,
68 #[serde(default)]
73 pub stages: Vec<StageDecl>,
74}
75
76#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
84pub struct StageDecl {
85 pub name: String,
86 #[serde(default, skip_serializing_if = "Option::is_none")]
89 pub allowed_tools: Option<Vec<String>>,
90 #[serde(default, skip_serializing_if = "Option::is_none")]
94 pub side_effect_level: Option<String>,
95 #[serde(default, skip_serializing_if = "Option::is_none")]
98 pub max_iterations: Option<u32>,
99 #[serde(default, skip_serializing_if = "Option::is_none")]
102 pub on_exit: Option<StageExit>,
103}
104
105#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
106pub struct StageExit {
107 #[serde(default, skip_serializing_if = "Option::is_none")]
108 pub on_complete: Option<String>,
109 #[serde(default, skip_serializing_if = "Option::is_none")]
110 pub on_failure: Option<String>,
111 #[serde(default, skip_serializing_if = "Option::is_none")]
112 pub policy_override: Option<crate::orchestration::CapabilityPolicy>,
113}
114
115#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
116pub struct PersonaLease {
117 pub id: String,
118 pub holder: String,
119 pub work_key: String,
120 pub acquired_at_ms: i64,
121 pub expires_at_ms: i64,
122}
123
124#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
125pub struct PersonaBudgetStatus {
126 pub daily_usd: Option<f64>,
127 pub hourly_usd: Option<f64>,
128 pub run_usd: Option<f64>,
129 pub max_tokens: Option<u64>,
130 pub spent_today_usd: f64,
131 pub spent_this_hour_usd: f64,
132 pub spent_last_run_usd: f64,
133 pub tokens_today: u64,
134 pub remaining_today_usd: Option<f64>,
135 pub remaining_hour_usd: Option<f64>,
136 pub exhausted: bool,
137 pub reason: Option<String>,
138 pub last_receipt_id: Option<String>,
139}
140
141#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
142pub struct PersonaStatus {
143 pub name: String,
144 #[serde(default)]
145 pub template_ref: Option<String>,
146 pub state: PersonaLifecycleState,
147 pub entry_workflow: String,
148 #[serde(default)]
149 pub role: String,
150 #[serde(default)]
151 pub current_assignment: Option<PersonaAssignmentStatus>,
152 pub last_run: Option<String>,
153 pub next_scheduled_run: Option<String>,
154 pub active_lease: Option<PersonaLease>,
155 pub budget: PersonaBudgetStatus,
156 pub last_error: Option<String>,
157 pub queued_events: usize,
158 #[serde(default)]
159 pub queued_work: Vec<PersonaQueuedWork>,
160 #[serde(default)]
161 pub handoff_inbox: Vec<PersonaHandoffInboxItem>,
162 #[serde(default)]
163 pub value_receipts: Vec<PersonaValueReceipt>,
164 pub disabled_events: usize,
165 pub paused_event_policy: String,
166}
167
168#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
169pub struct PersonaAssignmentStatus {
170 pub work_key: String,
171 pub lease_id: String,
172 pub holder: String,
173 pub acquired_at: String,
174 pub expires_at: String,
175}
176
177#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
178pub struct PersonaQueuedWork {
179 pub work_key: String,
180 pub provider: String,
181 pub kind: String,
182 pub queued_at: String,
183 pub reason: String,
184 pub source_event_id: Option<String>,
185 #[serde(default)]
186 pub metadata: BTreeMap<String, String>,
187}
188
189#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
190pub struct PersonaHandoffInboxItem {
191 pub work_key: String,
192 pub handoff_id: Option<String>,
193 pub handoff_kind: Option<String>,
194 pub source_persona: Option<String>,
195 pub task: Option<String>,
196 pub queued_at: String,
197 pub reason: String,
198}
199
200#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
201pub struct PersonaValueReceipt {
202 pub kind: PersonaValueEventKind,
203 pub run_id: Option<Uuid>,
204 pub occurred_at: String,
205 pub paid_cost_usd: f64,
206 pub avoided_cost_usd: f64,
207 pub deterministic_steps: i64,
208 pub llm_steps: i64,
209 pub metadata: serde_json::Value,
210}
211
212#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
213pub struct PersonaTriggerEnvelope {
214 pub provider: String,
215 pub kind: String,
216 pub subject_key: String,
217 pub source_event_id: Option<String>,
218 pub received_at_ms: i64,
219 #[serde(default)]
220 pub metadata: BTreeMap<String, String>,
221 #[serde(default)]
222 pub raw: serde_json::Value,
223}
224
225#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
226pub struct PersonaRunReceipt {
227 pub status: String,
228 pub persona: String,
229 #[serde(default)]
230 pub run_id: Option<Uuid>,
231 pub work_key: String,
232 pub lease: Option<PersonaLease>,
233 pub queued: bool,
234 pub error: Option<String>,
235 pub budget_receipt_id: Option<String>,
236}
237
238#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
239pub struct PersonaRunCost {
240 pub cost_usd: f64,
241 pub tokens: u64,
242 #[serde(default)]
243 pub avoided_cost_usd: f64,
244 #[serde(default)]
245 pub deterministic_steps: i64,
246 #[serde(default)]
247 pub llm_steps: i64,
248 #[serde(default)]
249 pub frontier_escalations: i64,
250 #[serde(default)]
251 pub metadata: serde_json::Value,
252}
253
254#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
255#[serde(rename_all = "snake_case")]
256pub enum PersonaValueEventKind {
257 RunStarted,
258 RunCompleted,
259 AcceptedOutcome,
260 FrontierEscalation,
261 DeterministicExecution,
262 PromotionSavings,
263 ApprovalWait,
264}
265
266impl PersonaValueEventKind {
267 pub fn as_str(self) -> &'static str {
268 match self {
269 Self::RunStarted => "run_started",
270 Self::RunCompleted => "run_completed",
271 Self::AcceptedOutcome => "accepted_outcome",
272 Self::FrontierEscalation => "frontier_escalation",
273 Self::DeterministicExecution => "deterministic_execution",
274 Self::PromotionSavings => "promotion_savings",
275 Self::ApprovalWait => "approval_wait",
276 }
277 }
278}
279
280#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
281pub struct PersonaValueEvent {
282 pub persona_id: String,
283 pub template_ref: Option<String>,
284 pub run_id: Option<Uuid>,
285 pub kind: PersonaValueEventKind,
286 pub paid_cost_usd: f64,
287 pub avoided_cost_usd: f64,
288 pub deterministic_steps: i64,
289 pub llm_steps: i64,
290 pub metadata: serde_json::Value,
291 pub occurred_at: OffsetDateTime,
292}
293
294pub trait PersonaValueSink: Send + Sync {
295 fn handle_value_event(&self, event: &PersonaValueEvent);
296}
297
298#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
305#[serde(tag = "update_kind", rename_all = "snake_case")]
306pub enum PersonaSupervisionEvent {
307 QueuePosition(PersonaQueuePositionUpdate),
308 RepairWorkerStatus(PersonaRepairWorkerStatusUpdate),
309 Receipt(PersonaReceiptUpdate),
310 Checkpoint(PersonaCheckpointUpdate),
311}
312
313impl PersonaSupervisionEvent {
314 pub fn update_kind(&self) -> &'static str {
315 match self {
316 Self::QueuePosition(_) => "queue_position",
317 Self::RepairWorkerStatus(_) => "repair_worker_status",
318 Self::Receipt(_) => "receipt",
319 Self::Checkpoint(_) => "checkpoint",
320 }
321 }
322
323 pub fn persona_id(&self) -> &str {
324 match self {
325 Self::QueuePosition(update) => &update.persona_id,
326 Self::RepairWorkerStatus(update) => &update.persona_id,
327 Self::Receipt(update) => &update.persona_id,
328 Self::Checkpoint(update) => &update.persona_id,
329 }
330 }
331
332 pub fn template_ref(&self) -> Option<&str> {
333 match self {
334 Self::QueuePosition(update) => update.template_ref.as_deref(),
335 Self::RepairWorkerStatus(update) => update.template_ref.as_deref(),
336 Self::Receipt(update) => update.template_ref.as_deref(),
337 Self::Checkpoint(update) => update.template_ref.as_deref(),
338 }
339 }
340
341 pub fn occurred_at_ms(&self) -> i64 {
342 match self {
343 Self::QueuePosition(update) => update.occurred_at_ms,
344 Self::RepairWorkerStatus(update) => update.occurred_at_ms,
345 Self::Receipt(update) => update.occurred_at_ms,
346 Self::Checkpoint(update) => update.occurred_at_ms,
347 }
348 }
349}
350
351#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
352pub struct PersonaQueuePositionUpdate {
353 pub persona_id: String,
354 #[serde(default)]
355 pub template_ref: Option<String>,
356 pub work_key: String,
357 pub queue_depth: i64,
358 pub position: i64,
360 pub queued_at_ms: i64,
361 pub occurred_at_ms: i64,
362}
363
364#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
365#[serde(rename_all = "snake_case")]
366pub enum PersonaRepairWorkerLifecycle {
367 Pending,
368 Running,
369 Verifying,
370 Pushing,
371 Succeeded,
372 Failed,
373 Cancelled,
374}
375
376impl PersonaRepairWorkerLifecycle {
377 pub fn as_str(self) -> &'static str {
378 match self {
379 Self::Pending => "pending",
380 Self::Running => "running",
381 Self::Verifying => "verifying",
382 Self::Pushing => "pushing",
383 Self::Succeeded => "succeeded",
384 Self::Failed => "failed",
385 Self::Cancelled => "cancelled",
386 }
387 }
388}
389
390#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
391pub struct PersonaRepairWorkerStatusUpdate {
392 pub persona_id: String,
393 #[serde(default)]
394 pub template_ref: Option<String>,
395 pub repair_worker_id: String,
396 pub lifecycle: PersonaRepairWorkerLifecycle,
397 #[serde(default)]
398 pub work_key: Option<String>,
399 #[serde(default)]
400 pub lease_id: Option<String>,
401 #[serde(default)]
402 pub scratchpad_url: Option<String>,
403 pub last_heartbeat_ms: i64,
404 pub occurred_at_ms: i64,
405}
406
407#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
408pub struct PersonaReceiptUpdate {
409 pub persona_id: String,
410 #[serde(default)]
411 pub template_ref: Option<String>,
412 pub receipt: PersonaRunReceipt,
413 pub occurred_at_ms: i64,
414}
415
416#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
417pub struct PersonaCheckpointUpdate {
418 pub persona_id: String,
419 #[serde(default)]
420 pub template_ref: Option<String>,
421 pub action: PersonaCheckpointAction,
422 pub checkpoint_id: String,
423 #[serde(default)]
424 pub work_key: Option<String>,
425 #[serde(default)]
427 pub resumed_from: Option<PersonaCheckpointResume>,
428 pub occurred_at_ms: i64,
429}
430
431#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
432#[serde(rename_all = "snake_case")]
433pub enum PersonaCheckpointAction {
434 RestoreAcked,
435}
436
437impl PersonaCheckpointAction {
438 pub fn as_str(self) -> &'static str {
439 match self {
440 Self::RestoreAcked => "restore_acked",
441 }
442 }
443}
444
445#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
446pub struct PersonaCheckpointResume {
447 pub run_id: Option<Uuid>,
448 pub lease_id: Option<String>,
449 pub last_run_ms: Option<i64>,
450 pub queued_work_keys: Vec<String>,
451 #[serde(default)]
452 pub note: Option<String>,
453}
454
455pub trait PersonaSupervisionSink: Send + Sync {
456 fn handle_supervision_event(&self, event: &PersonaSupervisionEvent);
457}
458
/// Thread-safe list of sinks of trait-object type `T`.
///
/// Each entry is tagged with a unique id so that it can be removed again
/// with [`TypedSinkRegistry::unregister`].
struct TypedSinkRegistry<T: ?Sized + Send + Sync> {
    sinks: RwLock<Vec<(u64, Arc<T>)>>,
    next_id: AtomicU64,
}

impl<T: ?Sized + Send + Sync> TypedSinkRegistry<T> {
    /// Creates an empty registry. Ids are handed out starting at 1.
    const fn new() -> Self {
        Self {
            sinks: RwLock::new(Vec::new()),
            next_id: AtomicU64::new(1),
        }
    }

    /// Adds `sink` and returns the id to pass to [`TypedSinkRegistry::unregister`].
    ///
    /// A poisoned lock is recovered instead of ignored. The guarded `Vec` is
    /// only ever pushed to or filtered, so a panic elsewhere cannot leave it
    /// torn. Silently skipping the push while still returning an id would make
    /// the registration a no-op the caller cannot detect.
    fn register(&self, sink: Arc<T>) -> u64 {
        // `fetch_add` is atomic, so ids are unique; no cross-thread ordering is needed.
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        self.sinks
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push((id, sink));
        id
    }

    /// Removes the sink registered under `id`. Unknown ids are ignored.
    fn unregister(&self, id: u64) {
        self.sinks
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .retain(|(existing, _)| *existing != id);
    }

    /// Clones the current sinks out in registration order.
    ///
    /// Callers can then invoke them without holding the lock.
    fn snapshot(&self) -> Vec<Arc<T>> {
        self.sinks
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .iter()
            .map(|(_, sink)| Arc::clone(sink))
            .collect()
    }
}
493
494fn persona_value_sinks() -> &'static TypedSinkRegistry<dyn PersonaValueSink> {
495 static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaValueSink>> = OnceLock::new();
496 REGISTRY.get_or_init(TypedSinkRegistry::new)
497}
498
499fn persona_supervision_sinks() -> &'static TypedSinkRegistry<dyn PersonaSupervisionSink> {
500 static REGISTRY: OnceLock<TypedSinkRegistry<dyn PersonaSupervisionSink>> = OnceLock::new();
501 REGISTRY.get_or_init(TypedSinkRegistry::new)
502}
503
504#[must_use = "dropping the registration immediately unregisters the sink"]
505pub struct PersonaValueSinkRegistration {
506 id: u64,
507}
508
509impl Drop for PersonaValueSinkRegistration {
510 fn drop(&mut self) {
511 persona_value_sinks().unregister(self.id);
512 }
513}
514
515pub fn register_persona_value_sink(
516 sink: Arc<dyn PersonaValueSink>,
517) -> PersonaValueSinkRegistration {
518 PersonaValueSinkRegistration {
519 id: persona_value_sinks().register(sink),
520 }
521}
522
523#[must_use = "dropping the registration immediately unregisters the sink"]
524pub struct PersonaSupervisionSinkRegistration {
525 id: u64,
526}
527
528impl Drop for PersonaSupervisionSinkRegistration {
529 fn drop(&mut self) {
530 persona_supervision_sinks().unregister(self.id);
531 }
532}
533
534pub fn register_persona_supervision_sink(
535 sink: Arc<dyn PersonaSupervisionSink>,
536) -> PersonaSupervisionSinkRegistration {
537 PersonaSupervisionSinkRegistration {
538 id: persona_supervision_sinks().register(sink),
539 }
540}
541
542pub async fn persona_status(
543 log: &Arc<AnyEventLog>,
544 binding: &PersonaRuntimeBinding,
545 now_ms: i64,
546) -> Result<PersonaStatus, String> {
547 let events = read_persona_events(log, &binding.name).await?;
548 let mut state = PersonaLifecycleState::Idle;
549 let mut last_run_ms = None;
550 let mut active_lease = None;
551 let mut last_error = None;
552 let mut queued = BTreeSet::<String>::new();
553 let mut completed = BTreeSet::<String>::new();
554 let mut disabled_events = 0usize;
555 let mut budget_receipt = None;
556 let mut budget_exhaustion_reason = None;
557 let mut spent = Vec::<(i64, f64, u64)>::new();
558 let mut queued_work = BTreeMap::<String, PersonaQueuedWork>::new();
559 let mut value_receipts = Vec::<PersonaValueReceipt>::new();
560
561 for (_, event) in events {
562 match event.kind.as_str() {
563 "persona.control.paused" => state = PersonaLifecycleState::Paused,
564 "persona.control.resumed" => state = PersonaLifecycleState::Idle,
565 "persona.control.disabled" => state = PersonaLifecycleState::Disabled,
566 "persona.control.draining" => state = PersonaLifecycleState::Draining,
567 "persona.lease.acquired" => {
568 if let Ok(lease) = serde_json::from_value::<PersonaLease>(event.payload.clone()) {
569 active_lease = Some(lease);
570 state = PersonaLifecycleState::Running;
571 }
572 }
573 "persona.lease.released" => {
574 active_lease = None;
575 if !matches!(
576 state,
577 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
578 ) {
579 state = PersonaLifecycleState::Idle;
580 }
581 }
582 "persona.lease.expired" => {
583 active_lease = None;
584 if !matches!(
585 state,
586 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
587 ) {
588 state = PersonaLifecycleState::Idle;
589 }
590 }
591 "persona.run.started" => state = PersonaLifecycleState::Running,
592 "persona.run.completed" => {
593 last_run_ms = event
594 .payload
595 .get("completed_at_ms")
596 .and_then(serde_json::Value::as_i64)
597 .or(Some(event.occurred_at_ms));
598 if let Some(work_key) = event
599 .payload
600 .get("work_key")
601 .and_then(serde_json::Value::as_str)
602 {
603 completed.insert(work_key.to_string());
604 }
605 if !matches!(
606 state,
607 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
608 ) {
609 state = PersonaLifecycleState::Idle;
610 }
611 }
612 "persona.run.failed" => {
613 state = PersonaLifecycleState::Failed;
614 last_error = event
615 .payload
616 .get("error")
617 .and_then(serde_json::Value::as_str)
618 .map(ToString::to_string);
619 }
620 "persona.trigger.queued" => {
621 if let Some(work_key) = event
622 .payload
623 .get("work_key")
624 .and_then(serde_json::Value::as_str)
625 {
626 queued.insert(work_key.to_string());
627 }
628 if let Some(item) = queued_work_from_event(&event)? {
629 queued_work.insert(item.work_key.clone(), item);
630 }
631 }
632 "persona.trigger.dead_lettered" => disabled_events += 1,
633 "persona.budget.recorded" => {
634 budget_receipt = event
635 .payload
636 .get("receipt_id")
637 .and_then(serde_json::Value::as_str)
638 .map(ToString::to_string);
639 spent.push((
640 event.occurred_at_ms,
641 event
642 .payload
643 .get("cost_usd")
644 .and_then(serde_json::Value::as_f64)
645 .unwrap_or_default(),
646 event
647 .payload
648 .get("tokens")
649 .and_then(serde_json::Value::as_u64)
650 .unwrap_or_default(),
651 ));
652 }
653 "persona.budget.exhausted" => {
654 budget_exhaustion_reason = event
655 .payload
656 .get("reason")
657 .and_then(serde_json::Value::as_str)
658 .map(ToString::to_string);
659 last_error = budget_exhaustion_reason
660 .as_ref()
661 .map(|reason| format!("persona budget exhausted: {reason}"));
662 budget_receipt = event
663 .payload
664 .get("receipt_id")
665 .and_then(serde_json::Value::as_str)
666 .map(ToString::to_string);
667 }
668 kind if kind.starts_with("persona.value.") => {
669 if let Some(receipt) = value_receipt_from_event(&event)? {
670 value_receipts.push(receipt);
671 }
672 }
673 _ => {}
674 }
675 }
676
677 if let Some(lease) = active_lease.as_ref() {
678 if lease.expires_at_ms <= now_ms {
679 active_lease = None;
680 if !matches!(
681 state,
682 PersonaLifecycleState::Paused | PersonaLifecycleState::Disabled
683 ) {
684 state = PersonaLifecycleState::Idle;
685 }
686 }
687 }
688
689 queued.retain(|work_key| !completed.contains(work_key));
690 queued_work.retain(|work_key, _| !completed.contains(work_key));
691 let queued_work = queued_work.into_values().collect::<Vec<_>>();
692 let handoff_inbox = queued_work
693 .iter()
694 .filter_map(handoff_inbox_item)
695 .collect::<Vec<_>>();
696
697 let mut budget = budget_status(&binding.budget, &spent, now_ms);
698 if budget.reason.is_none() {
699 if let Some(reason) = budget_exhaustion_reason {
700 budget.exhausted = true;
701 budget.reason = Some(reason);
702 }
703 }
704 if budget.last_receipt_id.is_none() {
705 budget.last_receipt_id = budget_receipt;
706 }
707
708 let current_assignment = active_lease.as_ref().map(assignment_status_from_lease);
709
710 Ok(PersonaStatus {
711 name: binding.name.clone(),
712 template_ref: binding.template_ref.clone(),
713 state,
714 entry_workflow: binding.entry_workflow.clone(),
715 role: binding.name.clone(),
716 current_assignment,
717 last_run: last_run_ms.map(format_ms),
718 next_scheduled_run: next_scheduled_run(binding, last_run_ms, now_ms),
719 active_lease,
720 budget,
721 last_error,
722 queued_events: queued.len(),
723 queued_work,
724 handoff_inbox,
725 value_receipts,
726 disabled_events,
727 paused_event_policy: "queue_then_drain_on_resume".to_string(),
728 })
729}
730
731pub async fn pause_persona(
732 log: &Arc<AnyEventLog>,
733 binding: &PersonaRuntimeBinding,
734 now_ms: i64,
735) -> Result<PersonaStatus, String> {
736 append_persona_event(
737 log,
738 &binding.name,
739 "persona.control.paused",
740 json!({"paused_at_ms": now_ms, "policy": "queue_then_drain_on_resume"}),
741 now_ms,
742 )
743 .await?;
744 persona_status(log, binding, now_ms).await
745}
746
747pub async fn resume_persona(
748 log: &Arc<AnyEventLog>,
749 binding: &PersonaRuntimeBinding,
750 now_ms: i64,
751) -> Result<PersonaStatus, String> {
752 append_persona_event(
753 log,
754 &binding.name,
755 "persona.control.resumed",
756 json!({"resumed_at_ms": now_ms, "drain": true}),
757 now_ms,
758 )
759 .await?;
760 let queued = queued_events(log, &binding.name).await?;
761 for (envelope, cost) in queued {
762 let _ = run_for_envelope(log, binding, envelope, cost, now_ms).await?;
763 }
764 persona_status(log, binding, now_ms).await
765}
766
767pub async fn disable_persona(
768 log: &Arc<AnyEventLog>,
769 binding: &PersonaRuntimeBinding,
770 now_ms: i64,
771) -> Result<PersonaStatus, String> {
772 append_persona_event(
773 log,
774 &binding.name,
775 "persona.control.disabled",
776 json!({"disabled_at_ms": now_ms}),
777 now_ms,
778 )
779 .await?;
780 persona_status(log, binding, now_ms).await
781}
782
783pub async fn fire_schedule(
784 log: &Arc<AnyEventLog>,
785 binding: &PersonaRuntimeBinding,
786 cost: PersonaRunCost,
787 now_ms: i64,
788) -> Result<PersonaRunReceipt, String> {
789 let schedule = binding
790 .schedules
791 .first()
792 .cloned()
793 .unwrap_or_else(|| "manual".to_string());
794 let envelope = PersonaTriggerEnvelope {
795 provider: "schedule".to_string(),
796 kind: "cron.tick".to_string(),
797 subject_key: format!("schedule:{}:{schedule}:{}", binding.name, format_ms(now_ms)),
798 source_event_id: None,
799 received_at_ms: now_ms,
800 metadata: BTreeMap::from([
801 ("persona".to_string(), binding.name.clone()),
802 ("schedule".to_string(), schedule),
803 ("fired_at".to_string(), format_ms(now_ms)),
804 ]),
805 raw: json!({}),
806 };
807 append_persona_event(
808 log,
809 &binding.name,
810 "persona.schedule.fired",
811 json!({"persona": binding.name, "envelope": envelope}),
812 now_ms,
813 )
814 .await?;
815 run_for_envelope(log, binding, envelope, cost, now_ms).await
816}
817
818pub async fn fire_trigger(
819 log: &Arc<AnyEventLog>,
820 binding: &PersonaRuntimeBinding,
821 provider: &str,
822 kind: &str,
823 metadata: BTreeMap<String, String>,
824 cost: PersonaRunCost,
825 now_ms: i64,
826) -> Result<PersonaRunReceipt, String> {
827 let envelope = normalize_trigger_envelope(provider, kind, metadata, now_ms);
828 append_persona_event(
829 log,
830 &binding.name,
831 "persona.trigger.received",
832 json!({"persona": binding.name, "envelope": envelope}),
833 now_ms,
834 )
835 .await?;
836 run_for_envelope(log, binding, envelope, cost, now_ms).await
837}
838
839pub async fn record_persona_spend(
840 log: &Arc<AnyEventLog>,
841 binding: &PersonaRuntimeBinding,
842 cost: PersonaRunCost,
843 now_ms: i64,
844) -> Result<PersonaBudgetStatus, String> {
845 enforce_budget(log, binding, &cost, now_ms).await?;
846 append_budget_record(log, &binding.name, &cost, None, now_ms).await?;
847 persona_status(log, binding, now_ms)
848 .await
849 .map(|status| status.budget)
850}
851
852pub async fn report_repair_worker_status(
860 log: &Arc<AnyEventLog>,
861 binding: &PersonaRuntimeBinding,
862 status: PersonaRepairWorkerStatusUpdate,
863 now_ms: i64,
864) -> Result<bool, String> {
865 let mut status = status;
866 if status.persona_id.is_empty() {
867 status.persona_id = binding.name.clone();
868 }
869 if status.template_ref.is_none() {
870 status.template_ref = binding.template_ref.clone();
871 }
872 if status.occurred_at_ms == 0 {
873 status.occurred_at_ms = now_ms;
874 }
875 if status.last_heartbeat_ms == 0 {
876 status.last_heartbeat_ms = now_ms;
877 }
878
879 if repair_worker_status_recorded(log, &binding.name, &status).await? {
880 return Ok(false);
881 }
882 append_persona_event(
883 log,
884 &binding.name,
885 "persona.repair_worker.status",
886 serde_json::to_value(&status).map_err(|error| error.to_string())?,
887 status.occurred_at_ms,
888 )
889 .await?;
890 record_persona_supervision_event(
891 log,
892 &binding.name,
893 PersonaSupervisionEvent::RepairWorkerStatus(status),
894 )
895 .await?;
896 Ok(true)
897}
898
899pub async fn restore_persona_checkpoint(
906 log: &Arc<AnyEventLog>,
907 binding: &PersonaRuntimeBinding,
908 request: PersonaCheckpointRestoreRequest,
909 now_ms: i64,
910) -> Result<PersonaCheckpointRestoreOutcome, String> {
911 let PersonaCheckpointRestoreRequest {
912 checkpoint_id,
913 work_key,
914 resumed_from,
915 } = request;
916 let status = persona_status(log, binding, now_ms).await?;
917
918 if let Some(prior) = find_checkpoint_restore_ack(log, &binding.name, &checkpoint_id).await? {
919 return Ok(PersonaCheckpointRestoreOutcome {
920 acked: false,
921 update: prior,
922 });
923 }
924
925 let resume_coordinates = resumed_from.unwrap_or_else(|| PersonaCheckpointResume {
926 run_id: None,
927 lease_id: status.active_lease.as_ref().map(|lease| lease.id.clone()),
928 last_run_ms: status
929 .last_run
930 .as_deref()
931 .and_then(|value| parse_rfc3339_ms(value).ok()),
932 queued_work_keys: status
933 .queued_work
934 .iter()
935 .map(|item| item.work_key.clone())
936 .collect(),
937 note: None,
938 });
939
940 let update = PersonaCheckpointUpdate {
941 persona_id: binding.name.clone(),
942 template_ref: binding.template_ref.clone(),
943 action: PersonaCheckpointAction::RestoreAcked,
944 checkpoint_id: checkpoint_id.clone(),
945 work_key,
946 resumed_from: Some(resume_coordinates),
947 occurred_at_ms: now_ms,
948 };
949
950 append_persona_event(
951 log,
952 &binding.name,
953 "persona.checkpoint.restore_acked",
954 serde_json::to_value(&update).map_err(|error| error.to_string())?,
955 now_ms,
956 )
957 .await?;
958 record_persona_supervision_event(
959 log,
960 &binding.name,
961 PersonaSupervisionEvent::Checkpoint(update.clone()),
962 )
963 .await?;
964 Ok(PersonaCheckpointRestoreOutcome {
965 acked: true,
966 update,
967 })
968}
969
970#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
971pub struct PersonaCheckpointRestoreRequest {
972 pub checkpoint_id: String,
973 #[serde(default)]
974 pub work_key: Option<String>,
975 #[serde(default)]
976 pub resumed_from: Option<PersonaCheckpointResume>,
977}
978
979#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
980pub struct PersonaCheckpointRestoreOutcome {
981 pub acked: bool,
982 pub update: PersonaCheckpointUpdate,
983}
984
985async fn repair_worker_status_recorded(
986 log: &Arc<AnyEventLog>,
987 persona: &str,
988 update: &PersonaRepairWorkerStatusUpdate,
989) -> Result<bool, String> {
990 let events = read_persona_events(log, persona).await?;
991 Ok(events.into_iter().any(|(_, event)| {
992 event.kind == "persona.repair_worker.status"
993 && event
994 .payload
995 .get("repair_worker_id")
996 .and_then(serde_json::Value::as_str)
997 == Some(update.repair_worker_id.as_str())
998 && event
999 .payload
1000 .get("lifecycle")
1001 .and_then(serde_json::Value::as_str)
1002 == Some(update.lifecycle.as_str())
1003 }))
1004}
1005
1006async fn find_checkpoint_restore_ack(
1007 log: &Arc<AnyEventLog>,
1008 persona: &str,
1009 checkpoint_id: &str,
1010) -> Result<Option<PersonaCheckpointUpdate>, String> {
1011 let events = read_persona_events(log, persona).await?;
1012 for (_, event) in events.into_iter().rev() {
1013 if event.kind != "persona.checkpoint.restore_acked" {
1014 continue;
1015 }
1016 if event
1017 .payload
1018 .get("checkpoint_id")
1019 .and_then(serde_json::Value::as_str)
1020 != Some(checkpoint_id)
1021 {
1022 continue;
1023 }
1024 let update: PersonaCheckpointUpdate =
1025 serde_json::from_value(event.payload).map_err(|error| error.to_string())?;
1026 return Ok(Some(update));
1027 }
1028 Ok(None)
1029}
1030
1031async fn run_for_envelope(
1032 log: &Arc<AnyEventLog>,
1033 binding: &PersonaRuntimeBinding,
1034 envelope: PersonaTriggerEnvelope,
1035 cost: PersonaRunCost,
1036 now_ms: i64,
1037) -> Result<PersonaRunReceipt, String> {
1038 let pre_queue = queue_snapshot(log, binding, now_ms).await?;
1039 let receipt = run_for_envelope_inner(log, binding, envelope, cost, now_ms).await?;
1040 let post_queue = queue_snapshot(log, binding, now_ms).await?;
1041 emit_queue_position_supervision(log, binding, &pre_queue, &post_queue, now_ms).await?;
1042 emit_receipt_supervision(log, binding, &receipt, now_ms).await?;
1043 Ok(receipt)
1044}
1045
1046async fn run_for_envelope_inner(
1047 log: &Arc<AnyEventLog>,
1048 binding: &PersonaRuntimeBinding,
1049 envelope: PersonaTriggerEnvelope,
1050 cost: PersonaRunCost,
1051 now_ms: i64,
1052) -> Result<PersonaRunReceipt, String> {
1053 let status = persona_status(log, binding, now_ms).await?;
1054 match status.state {
1055 PersonaLifecycleState::Paused => {
1056 append_persona_event(
1057 log,
1058 &binding.name,
1059 "persona.trigger.queued",
1060 json!({
1061 "work_key": envelope.subject_key,
1062 "envelope": envelope,
1063 "cost": cost,
1064 "reason": "paused",
1065 }),
1066 now_ms,
1067 )
1068 .await?;
1069 return Ok(PersonaRunReceipt {
1070 status: "queued".to_string(),
1071 persona: binding.name.clone(),
1072 run_id: None,
1073 work_key: envelope.subject_key,
1074 lease: None,
1075 queued: true,
1076 error: None,
1077 budget_receipt_id: None,
1078 });
1079 }
1080 PersonaLifecycleState::Disabled => {
1081 append_persona_event(
1082 log,
1083 &binding.name,
1084 "persona.trigger.dead_lettered",
1085 json!({
1086 "work_key": envelope.subject_key,
1087 "envelope": envelope,
1088 "reason": "disabled",
1089 }),
1090 now_ms,
1091 )
1092 .await?;
1093 return Ok(PersonaRunReceipt {
1094 status: "dead_lettered".to_string(),
1095 persona: binding.name.clone(),
1096 run_id: None,
1097 work_key: envelope.subject_key,
1098 lease: None,
1099 queued: false,
1100 error: Some("persona is disabled".to_string()),
1101 budget_receipt_id: None,
1102 });
1103 }
1104 _ => {}
1105 }
1106
1107 if let Err(error) = enforce_budget(log, binding, &cost, now_ms).await {
1108 return Ok(PersonaRunReceipt {
1109 status: "budget_exhausted".to_string(),
1110 persona: binding.name.clone(),
1111 run_id: None,
1112 work_key: envelope.subject_key,
1113 lease: None,
1114 queued: false,
1115 error: Some(error),
1116 budget_receipt_id: None,
1117 });
1118 }
1119
1120 if work_completed(log, &binding.name, &envelope.subject_key).await? {
1121 append_persona_event(
1122 log,
1123 &binding.name,
1124 "persona.trigger.duplicate",
1125 json!({
1126 "work_key": envelope.subject_key,
1127 "envelope": envelope,
1128 "reason": "already_completed",
1129 }),
1130 now_ms,
1131 )
1132 .await?;
1133 return Ok(PersonaRunReceipt {
1134 status: "duplicate".to_string(),
1135 persona: binding.name.clone(),
1136 run_id: None,
1137 work_key: envelope.subject_key,
1138 lease: None,
1139 queued: false,
1140 error: None,
1141 budget_receipt_id: None,
1142 });
1143 }
1144
1145 let Some(lease) = acquire_lease(
1146 log,
1147 binding,
1148 &envelope.subject_key,
1149 "persona-runtime",
1150 DEFAULT_LEASE_TTL_MS,
1151 now_ms,
1152 )
1153 .await?
1154 else {
1155 return Ok(PersonaRunReceipt {
1156 status: "lease_busy".to_string(),
1157 persona: binding.name.clone(),
1158 run_id: None,
1159 work_key: envelope.subject_key,
1160 lease: status.active_lease,
1161 queued: false,
1162 error: Some("active lease already owns persona work".to_string()),
1163 budget_receipt_id: None,
1164 });
1165 };
1166
1167 let run_id = Uuid::now_v7();
1168 let value_metadata = run_value_metadata(&envelope, &lease, &cost);
1169 append_persona_event(
1170 log,
1171 &binding.name,
1172 "persona.run.started",
1173 json!({
1174 "work_key": envelope.subject_key,
1175 "run_id": run_id,
1176 "started_at_ms": now_ms,
1177 "entry_workflow": binding.entry_workflow,
1178 "lease_id": lease.id,
1179 }),
1180 now_ms,
1181 )
1182 .await?;
1183 emit_persona_value_event(
1184 log,
1185 binding,
1186 run_id,
1187 PersonaValueEventDelta {
1188 kind: PersonaValueEventKind::RunStarted,
1189 metadata: value_metadata.clone(),
1190 ..Default::default()
1191 },
1192 now_ms,
1193 )
1194 .await?;
1195 let budget_receipt_id =
1196 append_budget_record(log, &binding.name, &cost, Some(&lease.id), now_ms).await?;
1197 if cost.avoided_cost_usd > 0.0 || cost.deterministic_steps > 0 {
1198 emit_persona_value_event(
1199 log,
1200 binding,
1201 run_id,
1202 PersonaValueEventDelta {
1203 kind: PersonaValueEventKind::DeterministicExecution,
1204 avoided_cost_usd: cost.avoided_cost_usd,
1205 deterministic_steps: cost.deterministic_steps.max(1),
1206 metadata: value_metadata.clone(),
1207 ..Default::default()
1208 },
1209 now_ms,
1210 )
1211 .await?;
1212 }
1213 if cost.frontier_escalations > 0 {
1214 emit_persona_value_event(
1215 log,
1216 binding,
1217 run_id,
1218 PersonaValueEventDelta {
1219 kind: PersonaValueEventKind::FrontierEscalation,
1220 paid_cost_usd: cost.cost_usd,
1221 llm_steps: cost.llm_steps.max(cost.frontier_escalations),
1222 metadata: value_metadata.clone(),
1223 ..Default::default()
1224 },
1225 now_ms,
1226 )
1227 .await?;
1228 }
1229 let completion_paid_cost = if cost.frontier_escalations > 0 {
1230 0.0
1231 } else {
1232 cost.cost_usd
1233 };
1234 let completion_llm_steps = if cost.frontier_escalations > 0 {
1235 0
1236 } else {
1237 cost.llm_steps
1238 };
1239 emit_persona_value_event(
1240 log,
1241 binding,
1242 run_id,
1243 PersonaValueEventDelta {
1244 kind: PersonaValueEventKind::RunCompleted,
1245 paid_cost_usd: completion_paid_cost,
1246 llm_steps: completion_llm_steps,
1247 metadata: value_metadata,
1248 ..Default::default()
1249 },
1250 now_ms,
1251 )
1252 .await?;
1253 append_persona_event(
1254 log,
1255 &binding.name,
1256 "persona.run.completed",
1257 json!({
1258 "work_key": envelope.subject_key,
1259 "run_id": run_id,
1260 "completed_at_ms": now_ms,
1261 "entry_workflow": binding.entry_workflow,
1262 "lease_id": lease.id,
1263 }),
1264 now_ms,
1265 )
1266 .await?;
1267 append_persona_event(
1268 log,
1269 &binding.name,
1270 "persona.lease.released",
1271 json!({
1272 "id": lease.id,
1273 "work_key": envelope.subject_key,
1274 "released_at_ms": now_ms,
1275 }),
1276 now_ms,
1277 )
1278 .await?;
1279 Ok(PersonaRunReceipt {
1280 status: "completed".to_string(),
1281 persona: binding.name.clone(),
1282 run_id: Some(run_id),
1283 work_key: envelope.subject_key,
1284 lease: Some(lease),
1285 queued: false,
1286 error: None,
1287 budget_receipt_id: Some(budget_receipt_id),
1288 })
1289}
1290
1291async fn acquire_lease(
1292 log: &Arc<AnyEventLog>,
1293 binding: &PersonaRuntimeBinding,
1294 work_key: &str,
1295 holder: &str,
1296 ttl_ms: i64,
1297 now_ms: i64,
1298) -> Result<Option<PersonaLease>, String> {
1299 let status = persona_status(log, binding, now_ms).await?;
1300 if let Some(lease) = status.active_lease {
1301 if lease.expires_at_ms > now_ms {
1302 append_persona_event(
1303 log,
1304 &binding.name,
1305 "persona.lease.conflict",
1306 json!({
1307 "active_lease": lease,
1308 "requested_work_key": work_key,
1309 "at_ms": now_ms,
1310 }),
1311 now_ms,
1312 )
1313 .await?;
1314 return Ok(None);
1315 }
1316 append_persona_event(
1317 log,
1318 &binding.name,
1319 "persona.lease.expired",
1320 json!({
1321 "id": lease.id,
1322 "work_key": lease.work_key,
1323 "expired_at_ms": now_ms,
1324 }),
1325 now_ms,
1326 )
1327 .await?;
1328 }
1329
1330 let lease = PersonaLease {
1331 id: format!("persona_lease_{}", Uuid::now_v7()),
1332 holder: holder.to_string(),
1333 work_key: work_key.to_string(),
1334 acquired_at_ms: now_ms,
1335 expires_at_ms: now_ms + ttl_ms,
1336 };
1337 append_persona_event(
1338 log,
1339 &binding.name,
1340 "persona.lease.acquired",
1341 serde_json::to_value(&lease).map_err(|error| error.to_string())?,
1342 now_ms,
1343 )
1344 .await?;
1345 Ok(Some(lease))
1346}
1347
1348async fn enforce_budget(
1349 log: &Arc<AnyEventLog>,
1350 binding: &PersonaRuntimeBinding,
1351 cost: &PersonaRunCost,
1352 now_ms: i64,
1353) -> Result<(), String> {
1354 let status = persona_status(log, binding, now_ms).await?;
1355 let reason = if binding
1356 .budget
1357 .run_usd
1358 .is_some_and(|limit| cost.cost_usd > limit)
1359 {
1360 Some("run_usd")
1361 } else if binding
1362 .budget
1363 .daily_usd
1364 .is_some_and(|limit| status.budget.spent_today_usd + cost.cost_usd > limit)
1365 {
1366 Some("daily_usd")
1367 } else if binding
1368 .budget
1369 .hourly_usd
1370 .is_some_and(|limit| status.budget.spent_this_hour_usd + cost.cost_usd > limit)
1371 {
1372 Some("hourly_usd")
1373 } else if binding
1374 .budget
1375 .max_tokens
1376 .is_some_and(|limit| status.budget.tokens_today + cost.tokens > limit)
1377 {
1378 Some("max_tokens")
1379 } else {
1380 None
1381 };
1382
1383 if let Some(reason) = reason {
1384 let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1385 append_persona_event(
1386 log,
1387 &binding.name,
1388 "persona.budget.exhausted",
1389 json!({
1390 "receipt_id": receipt_id,
1391 "reason": reason,
1392 "attempted_cost_usd": cost.cost_usd,
1393 "attempted_tokens": cost.tokens,
1394 "persona": binding.name,
1395 }),
1396 now_ms,
1397 )
1398 .await?;
1399 return Err(format!("persona budget exhausted: {reason}"));
1400 }
1401
1402 Ok(())
1403}
1404
1405async fn append_budget_record(
1406 log: &Arc<AnyEventLog>,
1407 persona: &str,
1408 cost: &PersonaRunCost,
1409 lease_id: Option<&str>,
1410 now_ms: i64,
1411) -> Result<String, String> {
1412 let receipt_id = format!("persona_budget_{}", Uuid::now_v7());
1413 append_persona_event(
1414 log,
1415 persona,
1416 "persona.budget.recorded",
1417 json!({
1418 "receipt_id": receipt_id,
1419 "persona": persona,
1420 "cost_usd": cost.cost_usd,
1421 "tokens": cost.tokens,
1422 "lease_id": lease_id,
1423 }),
1424 now_ms,
1425 )
1426 .await?;
1427 Ok(receipt_id)
1428}
1429
1430fn normalize_trigger_envelope(
1431 provider: &str,
1432 kind: &str,
1433 metadata: BTreeMap<String, String>,
1434 now_ms: i64,
1435) -> PersonaTriggerEnvelope {
1436 let provider = provider.to_ascii_lowercase();
1437 let kind = kind.to_string();
1438 let source_event_id = metadata
1439 .get("event_id")
1440 .or_else(|| metadata.get("id"))
1441 .cloned();
1442 let subject_key = match provider.as_str() {
1443 "github" => {
1444 let repo = metadata
1445 .get("repository")
1446 .or_else(|| metadata.get("repository.full_name"))
1447 .cloned()
1448 .unwrap_or_else(|| "unknown".to_string());
1449 if let Some(number) = metadata
1450 .get("pr")
1451 .or_else(|| metadata.get("pull_request.number"))
1452 .or_else(|| metadata.get("number"))
1453 {
1454 format!("github:{repo}:pr:{number}")
1455 } else if let Some(check) = metadata
1456 .get("check_run.name")
1457 .or_else(|| metadata.get("check_name"))
1458 {
1459 format!("github:{repo}:check:{check}")
1460 } else {
1461 format!("github:{repo}:{kind}")
1462 }
1463 }
1464 "linear" => {
1465 let issue = metadata
1466 .get("issue_key")
1467 .or_else(|| metadata.get("issue.identifier"))
1468 .or_else(|| metadata.get("issue_id"))
1469 .or_else(|| metadata.get("id"))
1470 .cloned()
1471 .unwrap_or_else(|| "unknown".to_string());
1472 format!("linear:issue:{issue}")
1473 }
1474 "slack" => {
1475 let channel = metadata
1476 .get("channel")
1477 .or_else(|| metadata.get("channel_id"))
1478 .cloned()
1479 .unwrap_or_else(|| "unknown".to_string());
1480 let ts = metadata
1481 .get("ts")
1482 .or_else(|| metadata.get("event_ts"))
1483 .cloned()
1484 .unwrap_or_else(|| "unknown".to_string());
1485 format!("slack:{channel}:{ts}")
1486 }
1487 "webhook" => metadata
1488 .get("dedupe_key")
1489 .or_else(|| metadata.get("event_id"))
1490 .map(|value| format!("webhook:{value}"))
1491 .unwrap_or_else(|| format!("webhook:{kind}:{}", Uuid::now_v7())),
1492 _ => metadata
1493 .get("dedupe_key")
1494 .or_else(|| metadata.get("event_id"))
1495 .map(|value| format!("{provider}:{kind}:{value}"))
1496 .unwrap_or_else(|| format!("{provider}:{kind}:{}", Uuid::now_v7())),
1497 };
1498
1499 PersonaTriggerEnvelope {
1500 provider,
1501 kind,
1502 subject_key,
1503 source_event_id,
1504 received_at_ms: now_ms,
1505 raw: json!({"metadata": metadata}),
1506 metadata,
1507 }
1508}
1509
1510async fn queued_events(
1511 log: &Arc<AnyEventLog>,
1512 persona: &str,
1513) -> Result<Vec<(PersonaTriggerEnvelope, PersonaRunCost)>, String> {
1514 let events = read_persona_events(log, persona).await?;
1515 let mut queued = BTreeMap::<String, (PersonaTriggerEnvelope, PersonaRunCost)>::new();
1516 let mut completed = BTreeSet::<String>::new();
1517 for (_, event) in events {
1518 match event.kind.as_str() {
1519 "persona.trigger.queued" => {
1520 let Some(envelope) = event.payload.get("envelope") else {
1521 continue;
1522 };
1523 let envelope: PersonaTriggerEnvelope =
1524 serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1525 let cost = event
1526 .payload
1527 .get("cost")
1528 .cloned()
1529 .map(serde_json::from_value::<PersonaRunCost>)
1530 .transpose()
1531 .map_err(|error| error.to_string())?
1532 .unwrap_or_default();
1533 queued.insert(envelope.subject_key.clone(), (envelope, cost));
1534 }
1535 "persona.run.completed" => {
1536 if let Some(work_key) = event
1537 .payload
1538 .get("work_key")
1539 .and_then(serde_json::Value::as_str)
1540 {
1541 completed.insert(work_key.to_string());
1542 }
1543 }
1544 _ => {}
1545 }
1546 }
1547 queued.retain(|work_key, _| !completed.contains(work_key));
1548 Ok(queued.into_values().collect())
1549}
1550
1551fn assignment_status_from_lease(lease: &PersonaLease) -> PersonaAssignmentStatus {
1552 PersonaAssignmentStatus {
1553 work_key: lease.work_key.clone(),
1554 lease_id: lease.id.clone(),
1555 holder: lease.holder.clone(),
1556 acquired_at: format_ms(lease.acquired_at_ms),
1557 expires_at: format_ms(lease.expires_at_ms),
1558 }
1559}
1560
1561fn queued_work_from_event(event: &LogEvent) -> Result<Option<PersonaQueuedWork>, String> {
1562 let Some(envelope) = event.payload.get("envelope") else {
1563 return Ok(None);
1564 };
1565 let envelope: PersonaTriggerEnvelope =
1566 serde_json::from_value(envelope.clone()).map_err(|error| error.to_string())?;
1567 Ok(Some(PersonaQueuedWork {
1568 work_key: envelope.subject_key,
1569 provider: envelope.provider,
1570 kind: envelope.kind,
1571 queued_at: format_ms(event.occurred_at_ms),
1572 reason: event
1573 .payload
1574 .get("reason")
1575 .and_then(serde_json::Value::as_str)
1576 .unwrap_or("queued")
1577 .to_string(),
1578 source_event_id: envelope.source_event_id,
1579 metadata: envelope.metadata,
1580 }))
1581}
1582
1583fn handoff_inbox_item(work: &PersonaQueuedWork) -> Option<PersonaHandoffInboxItem> {
1584 if work.provider != "handoff" && !work.metadata.contains_key("handoff_id") {
1585 return None;
1586 }
1587 Some(PersonaHandoffInboxItem {
1588 work_key: work.work_key.clone(),
1589 handoff_id: work.metadata.get("handoff_id").cloned(),
1590 handoff_kind: work
1591 .metadata
1592 .get("handoff_kind")
1593 .or_else(|| work.metadata.get("kind"))
1594 .cloned(),
1595 source_persona: work.metadata.get("source_persona").cloned(),
1596 task: work.metadata.get("task").cloned(),
1597 queued_at: work.queued_at.clone(),
1598 reason: work.reason.clone(),
1599 })
1600}
1601
1602fn value_receipt_from_event(event: &LogEvent) -> Result<Option<PersonaValueReceipt>, String> {
1603 let Ok(value_event) = serde_json::from_value::<PersonaValueEvent>(event.payload.clone()) else {
1604 return Ok(None);
1605 };
1606 Ok(Some(PersonaValueReceipt {
1607 kind: value_event.kind,
1608 run_id: value_event.run_id,
1609 occurred_at: value_event
1610 .occurred_at
1611 .format(&Rfc3339)
1612 .map_err(|error| error.to_string())?,
1613 paid_cost_usd: value_event.paid_cost_usd,
1614 avoided_cost_usd: value_event.avoided_cost_usd,
1615 deterministic_steps: value_event.deterministic_steps,
1616 llm_steps: value_event.llm_steps,
1617 metadata: value_event.metadata,
1618 }))
1619}
1620
1621async fn work_completed(
1622 log: &Arc<AnyEventLog>,
1623 persona: &str,
1624 work_key: &str,
1625) -> Result<bool, String> {
1626 let events = read_persona_events(log, persona).await?;
1627 Ok(events.into_iter().any(|(_, event)| {
1628 event.kind == "persona.run.completed"
1629 && event
1630 .payload
1631 .get("work_key")
1632 .and_then(serde_json::Value::as_str)
1633 == Some(work_key)
1634 }))
1635}
1636
1637async fn read_persona_events(
1638 log: &Arc<AnyEventLog>,
1639 persona: &str,
1640) -> Result<Vec<(u64, LogEvent)>, String> {
1641 let topic = runtime_topic()?;
1642 Ok(log
1643 .read_range(&topic, None, usize::MAX)
1644 .await
1645 .map_err(|error| error.to_string())?
1646 .into_iter()
1647 .filter(|(_, event)| {
1648 event
1649 .headers
1650 .get("persona")
1651 .is_some_and(|name| name == persona)
1652 })
1653 .collect())
1654}
1655
1656async fn append_persona_event(
1657 log: &Arc<AnyEventLog>,
1658 persona: &str,
1659 kind: &str,
1660 payload: serde_json::Value,
1661 now_ms: i64,
1662) -> Result<u64, String> {
1663 let mut headers = BTreeMap::new();
1664 headers.insert("persona".to_string(), persona.to_string());
1665 forward_persona_run_event(persona, kind, &payload);
1666 let event = LogEvent {
1667 kind: kind.to_string(),
1668 payload,
1669 headers,
1670 occurred_at_ms: now_ms,
1671 };
1672 log.append(&runtime_topic()?, event)
1673 .await
1674 .map_err(|error| error.to_string())
1675}
1676
/// Mirrors `persona.stage.*` and `persona.run.*` log events to the run-event
/// sink as `RunEvent::PersonaStage`.
///
/// Does nothing when no run-event sink is active, or when `kind` has neither
/// prefix. The transition is the remainder of `kind` after the prefix; the
/// `persona.stage.` prefix is tried first.
///
/// The stage name comes from the first of the payload keys `stage`, `to`,
/// `name` that is present. If none is present, or the chosen value is not a
/// JSON string, the stage is empty.
fn forward_persona_run_event(persona: &str, kind: &str, payload: &serde_json::Value) {
    // Skip all work when nobody is listening for run events.
    if !crate::run_events::sink_active() {
        return;
    }
    let transition = kind
        .strip_prefix("persona.stage.")
        .or_else(|| kind.strip_prefix("persona.run."));
    // Events outside the stage/run families (lease, budget, value, ...) are not forwarded.
    let Some(transition) = transition else {
        return;
    };
    // Note: a present-but-non-string key does not fall through to the next key.
    let stage = payload
        .get("stage")
        .or_else(|| payload.get("to"))
        .or_else(|| payload.get("name"))
        .and_then(|value| value.as_str())
        .unwrap_or("")
        .to_string();
    crate::run_events::emit(crate::run_events::RunEvent::PersonaStage {
        persona: persona.to_string(),
        stage,
        transition: transition.to_string(),
    });
}
1706
/// Caller-supplied part of a `PersonaValueEvent`.
///
/// `emit_persona_value_event` fills in persona identity, run id and timestamp,
/// and clamps every numeric field below to be non-negative before logging.
struct PersonaValueEventDelta {
    /// Which value event this is; also selects the `persona.value.<kind>` log kind.
    kind: PersonaValueEventKind,
    /// Paid cost in USD attributed to this event.
    paid_cost_usd: f64,
    /// Cost in USD reported as avoided (set for deterministic-execution events by the run path).
    avoided_cost_usd: f64,
    /// Number of deterministic steps attributed to this event.
    deterministic_steps: i64,
    /// Number of LLM steps attributed to this event.
    llm_steps: i64,
    /// Free-form JSON metadata copied onto the event (`null` by default).
    metadata: serde_json::Value,
}
1715
1716impl Default for PersonaValueEventDelta {
1717 fn default() -> Self {
1718 Self {
1719 kind: PersonaValueEventKind::RunCompleted,
1720 paid_cost_usd: 0.0,
1721 avoided_cost_usd: 0.0,
1722 deterministic_steps: 0,
1723 llm_steps: 0,
1724 metadata: serde_json::Value::Null,
1725 }
1726 }
1727}
1728
1729async fn emit_persona_value_event(
1730 log: &Arc<AnyEventLog>,
1731 binding: &PersonaRuntimeBinding,
1732 run_id: Uuid,
1733 delta: PersonaValueEventDelta,
1734 now_ms: i64,
1735) -> Result<(), String> {
1736 let event = PersonaValueEvent {
1737 persona_id: binding.name.clone(),
1738 template_ref: binding.template_ref.clone(),
1739 run_id: Some(run_id),
1740 kind: delta.kind,
1741 paid_cost_usd: delta.paid_cost_usd.max(0.0),
1742 avoided_cost_usd: delta.avoided_cost_usd.max(0.0),
1743 deterministic_steps: delta.deterministic_steps.max(0),
1744 llm_steps: delta.llm_steps.max(0),
1745 metadata: delta.metadata,
1746 occurred_at: offset_datetime_from_ms(now_ms),
1747 };
1748 append_persona_event(
1749 log,
1750 &binding.name,
1751 &format!("persona.value.{}", event.kind.as_str()),
1752 serde_json::to_value(&event).map_err(|error| error.to_string())?,
1753 now_ms,
1754 )
1755 .await?;
1756 emit_persona_value_sink_event(&event);
1757 Ok(())
1758}
1759
1760fn emit_persona_value_sink_event(event: &PersonaValueEvent) {
1761 for sink in persona_value_sinks().snapshot() {
1762 sink.handle_value_event(event);
1763 }
1764}
1765
1766fn emit_persona_supervision_sink_event(event: &PersonaSupervisionEvent) {
1767 for sink in persona_supervision_sinks().snapshot() {
1768 sink.handle_supervision_event(event);
1769 }
1770}
1771
1772async fn record_persona_supervision_event(
1773 log: &Arc<AnyEventLog>,
1774 persona: &str,
1775 event: PersonaSupervisionEvent,
1776) -> Result<(), String> {
1777 let update_kind = event.update_kind();
1778 let occurred_at_ms = event.occurred_at_ms();
1779 append_persona_event(
1780 log,
1781 persona,
1782 &format!("persona.supervision.{update_kind}"),
1783 serde_json::to_value(&event).map_err(|error| error.to_string())?,
1784 occurred_at_ms,
1785 )
1786 .await?;
1787 emit_persona_supervision_sink_event(&event);
1788 Ok(())
1789}
1790
/// Minimal view of a queued work item, used to diff queue snapshots for
/// queue-position supervision events.
#[derive(Clone, Debug)]
struct QueueEntry {
    /// Work key identifying the queued item.
    work_key: String,
    /// When the item was queued, in ms since the Unix epoch. `queue_snapshot`
    /// substitutes the snapshot time if the stored timestamp cannot be parsed.
    queued_at_ms: i64,
}
1796
1797async fn queue_snapshot(
1798 log: &Arc<AnyEventLog>,
1799 binding: &PersonaRuntimeBinding,
1800 now_ms: i64,
1801) -> Result<Vec<QueueEntry>, String> {
1802 let status = persona_status(log, binding, now_ms).await?;
1803 Ok(status
1804 .queued_work
1805 .into_iter()
1806 .map(|item| QueueEntry {
1807 queued_at_ms: parse_rfc3339_ms(&item.queued_at).unwrap_or(now_ms),
1808 work_key: item.work_key,
1809 })
1810 .collect())
1811}
1812
1813async fn emit_queue_position_supervision(
1814 log: &Arc<AnyEventLog>,
1815 binding: &PersonaRuntimeBinding,
1816 before: &[QueueEntry],
1817 after: &[QueueEntry],
1818 now_ms: i64,
1819) -> Result<(), String> {
1820 use std::collections::HashSet;
1821 let before_keys: HashSet<&str> = before.iter().map(|e| e.work_key.as_str()).collect();
1822 let after_keys: HashSet<&str> = after.iter().map(|e| e.work_key.as_str()).collect();
1823 let after_depth = after.len() as i64;
1824
1825 for (index, entry) in after.iter().enumerate() {
1826 if !before_keys.contains(entry.work_key.as_str()) {
1827 record_persona_supervision_event(
1828 log,
1829 &binding.name,
1830 PersonaSupervisionEvent::QueuePosition(PersonaQueuePositionUpdate {
1831 persona_id: binding.name.clone(),
1832 template_ref: binding.template_ref.clone(),
1833 work_key: entry.work_key.clone(),
1834 queue_depth: after_depth,
1835 position: (index + 1) as i64,
1836 queued_at_ms: entry.queued_at_ms,
1837 occurred_at_ms: now_ms,
1838 }),
1839 )
1840 .await?;
1841 }
1842 }
1843 for entry in before {
1844 if !after_keys.contains(entry.work_key.as_str()) {
1845 record_persona_supervision_event(
1846 log,
1847 &binding.name,
1848 PersonaSupervisionEvent::QueuePosition(PersonaQueuePositionUpdate {
1849 persona_id: binding.name.clone(),
1850 template_ref: binding.template_ref.clone(),
1851 work_key: entry.work_key.clone(),
1852 queue_depth: after_depth,
1853 position: 0,
1854 queued_at_ms: entry.queued_at_ms,
1855 occurred_at_ms: now_ms,
1856 }),
1857 )
1858 .await?;
1859 }
1860 }
1861 Ok(())
1862}
1863
1864async fn emit_receipt_supervision(
1865 log: &Arc<AnyEventLog>,
1866 binding: &PersonaRuntimeBinding,
1867 receipt: &PersonaRunReceipt,
1868 now_ms: i64,
1869) -> Result<(), String> {
1870 record_persona_supervision_event(
1871 log,
1872 &binding.name,
1873 PersonaSupervisionEvent::Receipt(PersonaReceiptUpdate {
1874 persona_id: binding.name.clone(),
1875 template_ref: binding.template_ref.clone(),
1876 receipt: receipt.clone(),
1877 occurred_at_ms: now_ms,
1878 }),
1879 )
1880 .await
1881}
1882
1883fn run_value_metadata(
1884 envelope: &PersonaTriggerEnvelope,
1885 lease: &PersonaLease,
1886 cost: &PersonaRunCost,
1887) -> serde_json::Value {
1888 let mut metadata = serde_json::Map::new();
1889 metadata.insert("work_key".to_string(), json!(envelope.subject_key));
1890 metadata.insert("trigger_provider".to_string(), json!(envelope.provider));
1891 metadata.insert("trigger_kind".to_string(), json!(envelope.kind));
1892 metadata.insert("lease_id".to_string(), json!(lease.id));
1893 metadata.insert("tokens".to_string(), json!(cost.tokens));
1894 if cost.frontier_escalations > 0 {
1895 metadata.insert(
1896 "frontier_escalations".to_string(),
1897 json!(cost.frontier_escalations),
1898 );
1899 }
1900 match &cost.metadata {
1901 serde_json::Value::Null => {}
1902 serde_json::Value::Object(extra) => {
1903 metadata.extend(
1904 extra
1905 .iter()
1906 .map(|(key, value)| (key.clone(), value.clone())),
1907 );
1908 }
1909 extra => {
1910 metadata.insert("run_cost_metadata".to_string(), extra.clone());
1911 }
1912 }
1913 serde_json::Value::Object(metadata)
1914}
1915
1916fn budget_status(
1917 policy: &PersonaBudgetPolicy,
1918 spent: &[(i64, f64, u64)],
1919 now_ms: i64,
1920) -> PersonaBudgetStatus {
1921 let day_start = now_ms - (now_ms.rem_euclid(86_400_000));
1922 let hour_start = now_ms - (now_ms.rem_euclid(3_600_000));
1923 let mut spent_today_usd = 0.0;
1924 let mut spent_this_hour_usd = 0.0;
1925 let mut tokens_today = 0u64;
1926 let mut spent_last_run_usd = 0.0;
1927 for (at_ms, cost, tokens) in spent {
1928 spent_last_run_usd = *cost;
1929 if *at_ms >= day_start {
1930 spent_today_usd += cost;
1931 tokens_today += tokens;
1932 }
1933 if *at_ms >= hour_start {
1934 spent_this_hour_usd += cost;
1935 }
1936 }
1937
1938 let remaining_today_usd = policy
1939 .daily_usd
1940 .map(|limit| (limit - spent_today_usd).max(0.0));
1941 let remaining_hour_usd = policy
1942 .hourly_usd
1943 .map(|limit| (limit - spent_this_hour_usd).max(0.0));
1944 let reason = if policy
1945 .daily_usd
1946 .is_some_and(|limit| spent_today_usd >= limit && limit >= 0.0)
1947 {
1948 Some("daily_usd".to_string())
1949 } else if policy
1950 .hourly_usd
1951 .is_some_and(|limit| spent_this_hour_usd >= limit && limit >= 0.0)
1952 {
1953 Some("hourly_usd".to_string())
1954 } else if policy
1955 .max_tokens
1956 .is_some_and(|limit| tokens_today >= limit && limit > 0)
1957 {
1958 Some("max_tokens".to_string())
1959 } else {
1960 None
1961 };
1962
1963 PersonaBudgetStatus {
1964 daily_usd: policy.daily_usd,
1965 hourly_usd: policy.hourly_usd,
1966 run_usd: policy.run_usd,
1967 max_tokens: policy.max_tokens,
1968 spent_today_usd,
1969 spent_this_hour_usd,
1970 spent_last_run_usd,
1971 tokens_today,
1972 remaining_today_usd,
1973 remaining_hour_usd,
1974 exhausted: reason.is_some(),
1975 reason,
1976 last_receipt_id: None,
1977 }
1978}
1979
1980fn next_scheduled_run(
1981 binding: &PersonaRuntimeBinding,
1982 last_run_ms: Option<i64>,
1983 now_ms: i64,
1984) -> Option<String> {
1985 binding
1986 .schedules
1987 .iter()
1988 .filter_map(|schedule| next_cron_ms(schedule, last_run_ms.unwrap_or(now_ms)).ok())
1989 .min()
1990 .map(format_ms)
1991}
1992
1993fn next_cron_ms(schedule: &str, after_ms: i64) -> Result<i64, String> {
1994 let cron = schedule
1995 .parse::<Cron>()
1996 .map_err(|error| error.to_string())?;
1997 let after = Utc
1998 .timestamp_millis_opt(after_ms)
1999 .single()
2000 .ok_or_else(|| "invalid timestamp".to_string())?;
2001 let next = cron
2002 .find_next_occurrence(&after, false)
2003 .map_err(|error| error.to_string())?;
2004 Ok(next.timestamp_millis())
2005}
2006
2007pub fn now_ms() -> i64 {
2008 harn_clock::offset_datetime_to_ms(OffsetDateTime::now_utc())
2009}
2010
2011fn offset_datetime_from_ms(ms: i64) -> OffsetDateTime {
2012 OffsetDateTime::from_unix_timestamp_nanos((ms as i128) * 1_000_000)
2013 .unwrap_or(OffsetDateTime::UNIX_EPOCH)
2014}
2015
2016pub fn format_ms(ms: i64) -> String {
2017 offset_datetime_from_ms(ms)
2018 .format(&Rfc3339)
2019 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
2020}
2021
2022pub fn parse_rfc3339_ms(value: &str) -> Result<i64, String> {
2023 let ts = OffsetDateTime::parse(value, &Rfc3339)
2024 .map_err(|error| format!("invalid RFC3339 timestamp '{value}': {error}"))?;
2025 Ok(harn_clock::offset_datetime_to_ms(ts))
2026}
2027
2028fn runtime_topic() -> Result<Topic, String> {
2029 Topic::new(PERSONA_RUNTIME_TOPIC).map_err(|error| error.to_string())
2030}
2031
// Unit tests for this module live in the separate `tests` submodule file.
#[cfg(test)]
mod tests;