1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use time::OffsetDateTime;
10use uuid::Uuid;
11
12use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
13use crate::llm::trigger_predicate::TriggerPredicateBudget;
14use crate::secrets::{configured_default_chain, SecretProvider};
15use crate::triggers::test_util::clock;
16use crate::trust_graph::AutonomyTier;
17use crate::value::VmClosure;
18
19use super::dispatcher::TriggerRetryConfig;
20use super::flow_control::TriggerFlowControlConfig;
21use super::ProviderId;
22
/// Newtype wrapper around the `String` that identifies a trigger.
///
/// Ordering, equality and hashing are all derived from the wrapped string.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct TriggerId(String);
25
26impl TriggerId {
27 pub fn new(value: impl Into<String>) -> Self {
28 Self(value.into())
29 }
30
31 pub fn as_str(&self) -> &str {
32 &self.0
33 }
34}
35
36impl std::fmt::Display for TriggerId {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 self.0.fmt(f)
39 }
40}
41
/// Lifecycle state of a trigger binding.
///
/// Serialized with `snake_case` variant names, which match the labels
/// returned by `TriggerState::as_str`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TriggerState {
    /// Initial state assigned by `TriggerBinding::new`.
    Registering,
    /// The only state in which `matching_bindings` returns a binding.
    Active,
    /// State set by `pause`; `resume` moves such bindings back to `Active`.
    Paused,
    /// NOTE(review): presumably "no new work, finishing in-flight work" —
    /// the transition logic is not visible in this part of the file.
    Draining,
    /// Bindings in this state are reported as unknown by
    /// `resolve_live_trigger_binding`.
    Terminated,
}
51
52impl TriggerState {
53 pub fn as_str(self) -> &'static str {
54 match self {
55 Self::Registering => "registering",
56 Self::Active => "active",
57 Self::Paused => "paused",
58 Self::Draining => "draining",
59 Self::Terminated => "terminated",
60 }
61 }
62}
63
/// Where a trigger binding came from.
///
/// Serialized with `snake_case` variant names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TriggerBindingSource {
    /// Required source for specs passed to `install_manifest_triggers`.
    Manifest,
    /// Source forced onto specs by `dynamic_register`.
    Dynamic,
}
70
71impl TriggerBindingSource {
72 pub fn as_str(self) -> &'static str {
73 match self {
74 Self::Manifest => "manifest",
75 Self::Dynamic => "dynamic",
76 }
77 }
78}
79
/// Strategy selected for a binding via its `on_budget_exhausted` field.
///
/// Serialized with `snake_case` variant names; `False` is the default.
/// NOTE(review): the behaviour attached to each variant is implemented
/// outside this part of the file — confirm against the dispatcher.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TriggerBudgetExhaustionStrategy {
    #[default]
    False,
    RetryLater,
    Fail,
    Warn,
}
89
90impl TriggerBudgetExhaustionStrategy {
91 pub fn as_str(self) -> &'static str {
92 match self {
93 Self::False => "false",
94 Self::RetryLater => "retry_later",
95 Self::Fail => "fail",
96 Self::Warn => "warn",
97 }
98 }
99}
100
/// Optional orchestrator-wide spending limits, expressed in USD.
///
/// A `None` limit is never reported as exceeded by
/// `orchestrator_budget_would_exceed`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct OrchestratorBudgetConfig {
    /// Limit compared against the cost accumulated in the current UTC day.
    pub daily_cost_usd: Option<f64>,
    /// Limit compared against the cost accumulated in the current UTC hour.
    pub hourly_cost_usd: Option<f64>,
}
106
/// Serializable copy of the orchestrator budget state, as produced by
/// `snapshot_orchestrator_budget`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct OrchestratorBudgetSnapshot {
    /// Configured daily limit in USD, if any.
    pub daily_cost_usd: Option<f64>,
    /// Configured hourly limit in USD, if any.
    pub hourly_cost_usd: Option<f64>,
    /// Cost accumulated in the current day window, in micro-USD.
    pub cost_today_usd_micros: u64,
    /// Cost accumulated in the current hour window, in micro-USD.
    pub cost_hour_usd_micros: u64,
    /// Day window key (whole days since 1970-01-01, see `utc_day_key`).
    pub day_utc: i32,
    /// Hour window key (Unix timestamp divided by 3600, see `utc_hour_key`).
    pub hour_utc: i64,
}
116
/// Mutable orchestrator budget bookkeeping kept in the `ORCHESTRATOR_BUDGET`
/// thread-local.
#[derive(Debug)]
struct OrchestratorBudgetState {
    /// Currently installed limits.
    config: OrchestratorBudgetConfig,
    /// Day window the daily counter belongs to (see `utc_day_key`).
    day_utc: i32,
    /// Hour window the hourly counter belongs to (see `utc_hour_key`).
    hour_utc: i64,
    /// Cost accumulated in the current day window, in micro-USD.
    cost_today_usd_micros: u64,
    /// Cost accumulated in the current hour window, in micro-USD.
    cost_hour_usd_micros: u64,
}
125
126impl Default for OrchestratorBudgetState {
127 fn default() -> Self {
128 Self {
129 config: OrchestratorBudgetConfig::default(),
130 day_utc: utc_day_key(),
131 hour_utc: utc_hour_key(),
132 cost_today_usd_micros: 0,
133 cost_hour_usd_micros: 0,
134 }
135 }
136}
137
/// Describes what handles events for a trigger binding.
///
/// `TriggerHandlerSpec::kind` maps each variant to a short label.
#[derive(Clone)]
pub enum TriggerHandlerSpec {
    /// Handler backed by a VM closure; `raw` is the only part shown by the
    /// `Debug` impl.
    Local {
        raw: String,
        closure: Rc<VmClosure>,
    },
    /// NOTE(review): presumably an agent-to-agent target; `allow_cleartext`
    /// semantics are defined outside this part of the file.
    A2a {
        target: String,
        allow_cleartext: bool,
    },
    /// Handler identified by a worker queue name.
    Worker {
        queue: String,
    },
    /// Handler bound to a persona runtime binding.
    Persona {
        binding: crate::PersonaRuntimeBinding,
    },
}
155
156impl std::fmt::Debug for TriggerHandlerSpec {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 match self {
159 Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
160 Self::A2a {
161 target,
162 allow_cleartext,
163 } => f
164 .debug_struct("A2a")
165 .field("target", target)
166 .field("allow_cleartext", allow_cleartext)
167 .finish(),
168 Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
169 Self::Persona { binding } => f
170 .debug_struct("Persona")
171 .field("name", &binding.name)
172 .finish(),
173 }
174 }
175}
176
177impl TriggerHandlerSpec {
178 pub fn kind(&self) -> &'static str {
179 match self {
180 Self::Local { .. } => "local",
181 Self::A2a { .. } => "a2a",
182 Self::Worker { .. } => "worker",
183 Self::Persona { .. } => "persona",
184 }
185 }
186}
187
/// Predicate attached to a binding through its `when` field.
#[derive(Clone)]
pub struct TriggerPredicateSpec {
    /// Text form of the predicate; the only field shown by the `Debug` impl.
    pub raw: String,
    /// VM closure for the predicate.
    pub closure: Rc<VmClosure>,
}
193
194impl std::fmt::Debug for TriggerPredicateSpec {
195 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196 f.debug_struct("TriggerPredicateSpec")
197 .field("raw", &self.raw)
198 .finish()
199 }
200}
201
/// Declarative description of a trigger binding.
///
/// `TriggerBinding::new` copies every field of the spec into the resulting
/// binding (wrapping `id` in a `TriggerId`) and adds runtime state on top.
#[derive(Clone, Debug)]
pub struct TriggerBindingSpec {
    /// Binding id; `dynamic_register` generates one when this is blank.
    pub id: String,
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: ProviderId,
    pub autonomy_tier: AutonomyTier,
    pub handler: TriggerHandlerSpec,
    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
    pub when: Option<TriggerPredicateSpec>,
    pub when_budget: Option<TriggerPredicateBudget>,
    pub retry: TriggerRetryConfig,
    /// Event kinds to match; an empty list matches every kind
    /// (see `matching_bindings`).
    pub match_events: Vec<String>,
    pub dedupe_key: Option<String>,
    pub dedupe_retention_days: u32,
    pub filter: Option<String>,
    /// Optional per-binding daily cost limit in USD.
    pub daily_cost_usd: Option<f64>,
    /// Optional per-binding hourly cost limit in USD.
    pub hourly_cost_usd: Option<f64>,
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
    pub max_concurrent: Option<u32>,
    pub flow_control: TriggerFlowControlConfig,
    pub manifest_path: Option<PathBuf>,
    pub package_name: Option<String>,
    /// Compared by `install_manifest_triggers` to decide whether an existing
    /// binding already matches an incoming spec.
    pub definition_fingerprint: String,
}
229
/// Live counters kept for a single trigger binding.
///
/// The `*_today` and `*_hour` counters are zeroed by
/// `reset_binding_budget_windows` when the UTC day or hour key changes.
#[derive(Debug)]
pub struct TriggerMetrics {
    pub received: AtomicU64,
    pub dispatched: AtomicU64,
    pub failed: AtomicU64,
    pub dlq: AtomicU64,
    /// NOTE(review): presumably a millisecond timestamp of the latest
    /// received event — the writer is outside this part of the file.
    pub last_received_ms: Mutex<Option<i64>>,
    pub cost_total_usd_micros: AtomicU64,
    pub cost_today_usd_micros: AtomicU64,
    pub cost_hour_usd_micros: AtomicU64,
    pub autonomous_decisions_total: AtomicU64,
    pub autonomous_decisions_today: AtomicU64,
    pub autonomous_decisions_hour: AtomicU64,
}
244
245impl Default for TriggerMetrics {
246 fn default() -> Self {
247 Self {
248 received: AtomicU64::new(0),
249 dispatched: AtomicU64::new(0),
250 failed: AtomicU64::new(0),
251 dlq: AtomicU64::new(0),
252 last_received_ms: Mutex::new(None),
253 cost_total_usd_micros: AtomicU64::new(0),
254 cost_today_usd_micros: AtomicU64::new(0),
255 cost_hour_usd_micros: AtomicU64::new(0),
256 autonomous_decisions_total: AtomicU64::new(0),
257 autonomous_decisions_today: AtomicU64::new(0),
258 autonomous_decisions_hour: AtomicU64::new(0),
259 }
260 }
261}
262
/// Plain-value copy of a binding's counters, produced by
/// `TriggerBinding::metrics_snapshot`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TriggerMetricsSnapshot {
    pub received: u64,
    pub dispatched: u64,
    pub failed: u64,
    pub dlq: u64,
    /// Copied from the binding's `in_flight` counter, not from `TriggerMetrics`.
    pub in_flight: u64,
    pub last_received_ms: Option<i64>,
    pub cost_total_usd_micros: u64,
    pub cost_today_usd_micros: u64,
    pub cost_hour_usd_micros: u64,
    pub autonomous_decisions_total: u64,
    pub autonomous_decisions_today: u64,
    pub autonomous_decisions_hour: u64,
}
278
/// A registered, versioned trigger binding: the fields of a
/// `TriggerBindingSpec` plus runtime state.
pub struct TriggerBinding {
    pub id: TriggerId,
    /// Version of this binding; combined with `id` by `binding_key`.
    pub version: u32,
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: ProviderId,
    pub autonomy_tier: AutonomyTier,
    pub handler: TriggerHandlerSpec,
    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
    pub when: Option<TriggerPredicateSpec>,
    pub when_budget: Option<TriggerPredicateBudget>,
    pub retry: TriggerRetryConfig,
    /// Event kinds to match; empty matches every kind (see `matching_bindings`).
    pub match_events: Vec<String>,
    pub dedupe_key: Option<String>,
    pub dedupe_retention_days: u32,
    pub filter: Option<String>,
    /// Daily cost limit in USD checked by `binding_budget_would_exceed`.
    pub daily_cost_usd: Option<f64>,
    /// Hourly cost limit in USD checked by `binding_budget_would_exceed`.
    pub hourly_cost_usd: Option<f64>,
    /// Hourly limit checked by `binding_autonomy_budget_would_exceed`.
    pub max_autonomous_decisions_per_hour: Option<u64>,
    /// Daily limit checked by `binding_autonomy_budget_would_exceed`.
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
    pub max_concurrent: Option<u32>,
    pub flow_control: TriggerFlowControlConfig,
    pub manifest_path: Option<PathBuf>,
    pub package_name: Option<String>,
    pub definition_fingerprint: String,
    /// Current lifecycle state; starts as `TriggerState::Registering`.
    pub state: Mutex<TriggerState>,
    pub metrics: TriggerMetrics,
    /// Starts at zero and is reported in metrics snapshots.
    pub in_flight: AtomicU64,
    /// Starts as `false`.
    /// NOTE(review): presumably signals cancellation to in-flight work —
    /// confirm against the dispatcher.
    pub cancel_token: Arc<AtomicBool>,
    /// Budget-window keys and recent predicate cost samples.
    pub predicate_state: Mutex<TriggerPredicateState>,
}
311
/// Mutable per-binding bookkeeping guarded by `TriggerBinding::predicate_state`.
#[derive(Clone, Debug, Default)]
pub struct TriggerPredicateState {
    /// Day key the daily counters were last reset for
    /// (see `reset_binding_budget_windows`).
    pub budget_day_utc: Option<i32>,
    /// Hour key the hourly counters were last reset for.
    pub budget_hour_utc: Option<i64>,
    /// NOTE(review): presumably feeds a circuit breaker together with
    /// `breaker_open_until_ms`; not used in this part of the file.
    pub consecutive_failures: u32,
    pub breaker_open_until_ms: Option<i64>,
    /// Recent predicate cost samples in micro-USD, capped at
    /// `PREDICATE_COST_WINDOW` entries by `record_predicate_cost_sample`.
    pub recent_cost_usd_micros: VecDeque<u64>,
}
320
321impl std::fmt::Debug for TriggerBinding {
322 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323 f.debug_struct("TriggerBinding")
324 .field("id", &self.id)
325 .field("version", &self.version)
326 .field("source", &self.source)
327 .field("kind", &self.kind)
328 .field("provider", &self.provider)
329 .field("handler_kind", &self.handler.kind())
330 .field("state", &self.state_snapshot())
331 .finish()
332 }
333}
334
335impl TriggerBinding {
336 pub fn snapshot(&self) -> TriggerBindingSnapshot {
337 TriggerBindingSnapshot {
338 id: self.id.as_str().to_string(),
339 version: self.version,
340 source: self.source,
341 kind: self.kind.clone(),
342 provider: self.provider.as_str().to_string(),
343 autonomy_tier: self.autonomy_tier,
344 handler_kind: self.handler.kind().to_string(),
345 state: self.state_snapshot(),
346 metrics: self.metrics_snapshot(),
347 daily_cost_usd: self.daily_cost_usd,
348 hourly_cost_usd: self.hourly_cost_usd,
349 max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
350 max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
351 on_budget_exhausted: self.on_budget_exhausted,
352 }
353 }
354
355 fn new(spec: TriggerBindingSpec, version: u32) -> Self {
356 Self {
357 id: TriggerId::new(spec.id),
358 version,
359 source: spec.source,
360 kind: spec.kind,
361 provider: spec.provider,
362 autonomy_tier: spec.autonomy_tier,
363 handler: spec.handler,
364 dispatch_priority: spec.dispatch_priority,
365 when: spec.when,
366 when_budget: spec.when_budget,
367 retry: spec.retry,
368 match_events: spec.match_events,
369 dedupe_key: spec.dedupe_key,
370 dedupe_retention_days: spec.dedupe_retention_days,
371 filter: spec.filter,
372 daily_cost_usd: spec.daily_cost_usd,
373 hourly_cost_usd: spec.hourly_cost_usd,
374 max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
375 max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
376 on_budget_exhausted: spec.on_budget_exhausted,
377 max_concurrent: spec.max_concurrent,
378 flow_control: spec.flow_control,
379 manifest_path: spec.manifest_path,
380 package_name: spec.package_name,
381 definition_fingerprint: spec.definition_fingerprint,
382 state: Mutex::new(TriggerState::Registering),
383 metrics: TriggerMetrics::default(),
384 in_flight: AtomicU64::new(0),
385 cancel_token: Arc::new(AtomicBool::new(false)),
386 predicate_state: Mutex::new(TriggerPredicateState::default()),
387 }
388 }
389
390 pub fn binding_key(&self) -> String {
391 format!("{}@v{}", self.id.as_str(), self.version)
392 }
393
394 pub fn state_snapshot(&self) -> TriggerState {
395 *self.state.lock().expect("trigger state poisoned")
396 }
397
398 pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
399 TriggerMetricsSnapshot {
400 received: self.metrics.received.load(Ordering::Relaxed),
401 dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
402 failed: self.metrics.failed.load(Ordering::Relaxed),
403 dlq: self.metrics.dlq.load(Ordering::Relaxed),
404 in_flight: self.in_flight.load(Ordering::Relaxed),
405 last_received_ms: *self
406 .metrics
407 .last_received_ms
408 .lock()
409 .expect("trigger metrics poisoned"),
410 cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
411 cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
412 cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
413 autonomous_decisions_total: self
414 .metrics
415 .autonomous_decisions_total
416 .load(Ordering::Relaxed),
417 autonomous_decisions_today: self
418 .metrics
419 .autonomous_decisions_today
420 .load(Ordering::Relaxed),
421 autonomous_decisions_hour: self
422 .metrics
423 .autonomous_decisions_hour
424 .load(Ordering::Relaxed),
425 }
426 }
427}
428
/// Serializable point-in-time view of a `TriggerBinding`, produced by
/// `TriggerBinding::snapshot`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBindingSnapshot {
    pub id: String,
    pub version: u32,
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: String,
    pub autonomy_tier: AutonomyTier,
    /// Label returned by `TriggerHandlerSpec::kind`.
    pub handler_kind: String,
    pub state: TriggerState,
    pub metrics: TriggerMetricsSnapshot,
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
}
446
/// Result category of a dispatch attempt.
/// NOTE(review): the variant names mirror the `dispatched`, `failed` and `dlq`
/// counters in `TriggerMetrics`; the code that maps one to the other is not
/// visible in this part of the file.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TriggerDispatchOutcome {
    Dispatched,
    Failed,
    Dlq,
}
453
/// Errors returned by trigger registry operations.
#[derive(Debug)]
pub enum TriggerRegistryError {
    /// A trigger with this id is already present.
    DuplicateId(String),
    /// The spec or request was rejected; the payload is the full message.
    InvalidSpec(String),
    /// No live binding exists for this id.
    UnknownId(String),
    /// No usable binding exists for this id/version pair.
    UnknownBindingVersion { id: String, version: u32 },
    /// Event-log failure; the payload is the full message.
    EventLog(String),
}
462
463impl std::fmt::Display for TriggerRegistryError {
464 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
465 match self {
466 Self::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
467 Self::InvalidSpec(message) | Self::EventLog(message) => f.write_str(message),
468 Self::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
469 Self::UnknownBindingVersion { id, version } => {
470 write!(f, "unknown trigger binding '{id}' version {version}")
471 }
472 }
473 }
474}
475
476impl std::error::Error for TriggerRegistryError {}
477
/// In-memory registry of trigger bindings, stored in the `TRIGGER_REGISTRY`
/// thread-local.
#[derive(Default)]
pub struct TriggerRegistry {
    /// All versions of each binding, keyed by trigger id.
    bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
    /// Trigger ids grouped by provider name; used by `matching_bindings`.
    by_provider: BTreeMap<String, BTreeSet<String>>,
    /// Event log handed to `append_lifecycle_events` after registry changes.
    event_log: Option<Arc<AnyEventLog>>,
    /// NOTE(review): presumably refreshed by `refresh_runtime_context` —
    /// that method is outside this part of the file.
    secret_provider: Option<Arc<dyn SecretProvider>>,
}
485
// Per-thread trigger registry: each thread gets its own independent
// `TriggerRegistry`, starting out empty.
thread_local! {
    static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
}

// Per-thread orchestrator budget state, initialised from
// `OrchestratorBudgetState::default()`.
thread_local! {
    static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
        RefCell::new(OrchestratorBudgetState::default());
}
494
// NOTE(review): presumably the number of terminated versions kept per trigger
// id by `gc_terminated_versions` — the usage is outside this part of the file.
const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;

// NOTE(review): presumably the event-log topic for lifecycle records — the
// usage is outside this part of the file.
const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
// Maximum number of predicate cost samples retained per binding
// (enforced by `record_predicate_cost_sample`).
const PREDICATE_COST_WINDOW: usize = 100;
499
/// Deserialized form of a lifecycle state-transition record.
/// NOTE(review): presumably read back from the lifecycle event log — the
/// reader is outside this part of the file.
#[derive(Clone, Debug, Deserialize)]
struct LifecycleStateTransitionRecord {
    id: String,
    version: u32,
    /// Optional on the wire; a missing value deserializes to `None`.
    #[serde(default)]
    definition_fingerprint: Option<String>,
    to_state: TriggerState,
}
508
/// A lifecycle transition paired with the time it occurred.
#[derive(Clone, Debug)]
struct HistoricalLifecycleRecord {
    /// NOTE(review): presumably milliseconds since the Unix epoch — confirm
    /// against the code that populates it.
    occurred_at_ms: i64,
    transition: LifecycleStateTransitionRecord,
}
514
/// Binding version and receive time recorded for an event; consumed by
/// `resolve_live_or_as_of`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RecordedTriggerBinding {
    /// Version tried first against the live registry.
    pub version: u32,
    /// Timestamp used for the historical lookup when that version is gone.
    pub received_at: OffsetDateTime,
}
520
/// Result holder for a historical version search.
/// NOTE(review): the producing code is outside this part of the file; the
/// field meanings below are inferred from their names only.
#[derive(Clone, Copy, Debug, Default)]
struct HistoricalVersionLookup {
    // presumably the version whose definition matched — TODO confirm
    matching_version: Option<u32>,
    // presumably the highest version seen in history — TODO confirm
    max_version: Option<u32>,
}
526
527pub fn clear_trigger_registry() {
528 TRIGGER_REGISTRY.with(|slot| {
529 *slot.borrow_mut() = TriggerRegistry::default();
530 });
531 clear_orchestrator_budget();
532}
533
534pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
535 ORCHESTRATOR_BUDGET.with(|slot| {
536 let mut state = slot.borrow_mut();
537 rollover_orchestrator_budget(&mut state);
538 state.config = config;
539 });
540}
541
542pub fn clear_orchestrator_budget() {
543 ORCHESTRATOR_BUDGET.with(|slot| {
544 *slot.borrow_mut() = OrchestratorBudgetState::default();
545 });
546}
547
548pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
549 ORCHESTRATOR_BUDGET.with(|slot| {
550 let mut state = slot.borrow_mut();
551 rollover_orchestrator_budget(&mut state);
552 OrchestratorBudgetSnapshot {
553 daily_cost_usd: state.config.daily_cost_usd,
554 hourly_cost_usd: state.config.hourly_cost_usd,
555 cost_today_usd_micros: state.cost_today_usd_micros,
556 cost_hour_usd_micros: state.cost_hour_usd_micros,
557 day_utc: state.day_utc,
558 hour_utc: state.hour_utc,
559 }
560 })
561}
562
563pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
564 if cost_usd_micros == 0 {
565 return;
566 }
567 ORCHESTRATOR_BUDGET.with(|slot| {
568 let mut state = slot.borrow_mut();
569 rollover_orchestrator_budget(&mut state);
570 state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
571 state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
572 });
573}
574
575pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
576 ORCHESTRATOR_BUDGET.with(|slot| {
577 let mut state = slot.borrow_mut();
578 rollover_orchestrator_budget(&mut state);
579 if state.config.hourly_cost_usd.is_some_and(|limit| {
580 micros_to_usd(
581 state
582 .cost_hour_usd_micros
583 .saturating_add(expected_cost_usd_micros),
584 ) > limit
585 }) {
586 return Some("orchestrator_hourly_budget_exceeded");
587 }
588 if state.config.daily_cost_usd.is_some_and(|limit| {
589 micros_to_usd(
590 state
591 .cost_today_usd_micros
592 .saturating_add(expected_cost_usd_micros),
593 ) > limit
594 }) {
595 return Some("orchestrator_daily_budget_exceeded");
596 }
597 None
598 })
599}
600
601pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
602 let today = utc_day_key();
603 let hour = utc_hour_key();
604 let mut state = binding
605 .predicate_state
606 .lock()
607 .expect("trigger predicate state poisoned");
608 if state.budget_day_utc != Some(today) {
609 state.budget_day_utc = Some(today);
610 binding
611 .metrics
612 .cost_today_usd_micros
613 .store(0, Ordering::Relaxed);
614 binding
615 .metrics
616 .autonomous_decisions_today
617 .store(0, Ordering::Relaxed);
618 }
619 if state.budget_hour_utc != Some(hour) {
620 state.budget_hour_utc = Some(hour);
621 binding
622 .metrics
623 .cost_hour_usd_micros
624 .store(0, Ordering::Relaxed);
625 binding
626 .metrics
627 .autonomous_decisions_hour
628 .store(0, Ordering::Relaxed);
629 }
630}
631
632pub fn binding_budget_would_exceed(
633 binding: &TriggerBinding,
634 expected_cost_usd_micros: u64,
635) -> Option<&'static str> {
636 reset_binding_budget_windows(binding);
637 if binding.hourly_cost_usd.is_some_and(|limit| {
638 micros_to_usd(
639 binding
640 .metrics
641 .cost_hour_usd_micros
642 .load(Ordering::Relaxed)
643 .saturating_add(expected_cost_usd_micros),
644 ) > limit
645 }) {
646 return Some("hourly_budget_exceeded");
647 }
648 if binding.daily_cost_usd.is_some_and(|limit| {
649 micros_to_usd(
650 binding
651 .metrics
652 .cost_today_usd_micros
653 .load(Ordering::Relaxed)
654 .saturating_add(expected_cost_usd_micros),
655 ) > limit
656 }) {
657 return Some("daily_budget_exceeded");
658 }
659 None
660}
661
662pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
663 reset_binding_budget_windows(binding);
664 if binding
665 .max_autonomous_decisions_per_hour
666 .is_some_and(|limit| {
667 binding
668 .metrics
669 .autonomous_decisions_hour
670 .load(Ordering::Relaxed)
671 .saturating_add(1)
672 > limit
673 })
674 {
675 return Some("hourly_autonomy_budget_exceeded");
676 }
677 if binding
678 .max_autonomous_decisions_per_day
679 .is_some_and(|limit| {
680 binding
681 .metrics
682 .autonomous_decisions_today
683 .load(Ordering::Relaxed)
684 .saturating_add(1)
685 > limit
686 })
687 {
688 return Some("daily_autonomy_budget_exceeded");
689 }
690 None
691}
692
693pub fn note_autonomous_decision(binding: &TriggerBinding) {
694 reset_binding_budget_windows(binding);
695 binding
696 .metrics
697 .autonomous_decisions_total
698 .fetch_add(1, Ordering::Relaxed);
699 binding
700 .metrics
701 .autonomous_decisions_today
702 .fetch_add(1, Ordering::Relaxed);
703 binding
704 .metrics
705 .autonomous_decisions_hour
706 .fetch_add(1, Ordering::Relaxed);
707}
708
709pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
710 let state = binding
711 .predicate_state
712 .lock()
713 .expect("trigger predicate state poisoned");
714 if !state.recent_cost_usd_micros.is_empty() {
715 let total: u64 = state.recent_cost_usd_micros.iter().copied().sum();
716 return total / state.recent_cost_usd_micros.len() as u64;
717 }
718 binding
719 .when_budget
720 .as_ref()
721 .and_then(|budget| budget.max_cost_usd)
722 .map(usd_to_micros)
723 .unwrap_or_default()
724}
725
726pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
727 let mut state = binding
728 .predicate_state
729 .lock()
730 .expect("trigger predicate state poisoned");
731 state.recent_cost_usd_micros.push_back(cost_usd_micros);
732 while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
733 state.recent_cost_usd_micros.pop_front();
734 }
735}
736
/// Converts a USD amount to micro-USD, rounding up.
///
/// Non-finite, zero and negative inputs all map to `0`.
pub fn usd_to_micros(value: f64) -> u64 {
    let is_positive_amount = value.is_finite() && value > 0.0;
    if is_positive_amount {
        let micros = value * 1_000_000.0;
        micros.ceil() as u64
    } else {
        0
    }
}
743
/// Converts a micro-USD amount to USD as a floating-point value.
pub fn micros_to_usd(value: u64) -> f64 {
    let micros = value as f64;
    micros / 1_000_000.0
}
747
748fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
749 let today = utc_day_key();
750 let hour = utc_hour_key();
751 if state.day_utc != today {
752 state.day_utc = today;
753 state.cost_today_usd_micros = 0;
754 }
755 if state.hour_utc != hour {
756 state.hour_utc = hour;
757 state.cost_hour_usd_micros = 0;
758 }
759}
760
761fn utc_day_key() -> i32 {
762 (clock::now_utc().date()
763 - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
764 .whole_days() as i32
765}
766
767fn utc_hour_key() -> i64 {
768 clock::now_utc().unix_timestamp() / 3_600
769}
770
771pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
772 TRIGGER_REGISTRY.with(|slot| {
773 let registry = slot.borrow();
774 let mut snapshots = Vec::new();
775 for bindings in registry.bindings.values() {
776 for binding in bindings {
777 snapshots.push(binding.snapshot());
778 }
779 }
780 snapshots.sort_by(|left, right| {
781 left.id
782 .cmp(&right.id)
783 .then(left.version.cmp(&right.version))
784 .then(left.state.as_str().cmp(right.state.as_str()))
785 });
786 snapshots
787 })
788}
789
790#[allow(clippy::arc_with_non_send_sync)]
791pub fn resolve_trigger_binding_as_of(
792 id: &str,
793 as_of: OffsetDateTime,
794) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
795 let version = binding_version_as_of(id, as_of)?;
796 resolve_trigger_binding_version(id, version)
797}
798
799#[allow(clippy::arc_with_non_send_sync)]
800pub fn resolve_live_or_as_of(
801 id: &str,
802 recorded: RecordedTriggerBinding,
803) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
804 match resolve_live_trigger_binding(id, Some(recorded.version)) {
805 Ok(binding) => Ok(binding),
806 Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
807 let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
808 let mut metadata = BTreeMap::new();
809 metadata.insert("trigger_id".to_string(), serde_json::json!(id));
810 metadata.insert(
811 "recorded_version".to_string(),
812 serde_json::json!(recorded.version),
813 );
814 metadata.insert(
815 "received_at".to_string(),
816 serde_json::json!(recorded
817 .received_at
818 .format(&time::format_description::well_known::Rfc3339)
819 .unwrap_or_else(|_| recorded.received_at.to_string())),
820 );
821 metadata.insert(
822 "resolved_version".to_string(),
823 serde_json::json!(binding.version),
824 );
825 crate::events::log_warn_meta(
826 "replay.binding_version_gc_fallback",
827 "trigger replay fell back to lifecycle history after binding version GC",
828 metadata,
829 );
830 Ok(binding)
831 }
832 Err(error) => Err(error),
833 }
834}
835
836pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
837 TRIGGER_REGISTRY.with(|slot| {
838 let registry = slot.borrow();
839 registry.binding_version_as_of(id, as_of)
840 })
841}
842
843#[allow(clippy::arc_with_non_send_sync)]
844fn resolve_trigger_binding_version(
845 id: &str,
846 version: u32,
847) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
848 TRIGGER_REGISTRY.with(|slot| {
849 let registry = slot.borrow();
850 registry
851 .binding(id, version)
852 .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
853 id: id.to_string(),
854 version,
855 })
856 })
857}
858
859#[allow(clippy::arc_with_non_send_sync)]
860pub fn resolve_live_trigger_binding(
861 id: &str,
862 version: Option<u32>,
863) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
864 TRIGGER_REGISTRY.with(|slot| {
865 let registry = slot.borrow();
866 if let Some(version) = version {
867 let binding = registry.binding(id, version).ok_or_else(|| {
868 TriggerRegistryError::UnknownBindingVersion {
869 id: id.to_string(),
870 version,
871 }
872 })?;
873 if binding.state_snapshot() == TriggerState::Terminated {
874 return Err(TriggerRegistryError::UnknownBindingVersion {
875 id: id.to_string(),
876 version,
877 });
878 }
879 return Ok(binding);
880 }
881
882 registry
883 .live_bindings_any_source(id)
884 .into_iter()
885 .max_by_key(|binding| binding.version)
886 .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
887 })
888}
889
890pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
891 TRIGGER_REGISTRY.with(|slot| {
892 let registry = slot.borrow();
893 let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
894 return Vec::new();
895 };
896
897 let mut bindings = Vec::new();
898 for id in binding_ids {
899 let Some(versions) = registry.bindings.get(id) else {
900 continue;
901 };
902 for binding in versions {
903 if binding.state_snapshot() != TriggerState::Active {
904 continue;
905 }
906 if !binding.match_events.is_empty()
907 && !binding.match_events.iter().any(|kind| kind == &event.kind)
908 {
909 continue;
910 }
911 bindings.push(binding.clone());
912 }
913 }
914
915 bindings.sort_by(|left, right| {
916 left.id
917 .as_str()
918 .cmp(right.id.as_str())
919 .then(left.version.cmp(&right.version))
920 });
921 bindings
922 })
923}
924
/// Reconciles this thread's registry with the given set of manifest specs,
/// then appends the resulting lifecycle events to the event log.
///
/// For each existing manifest trigger id that has a non-terminated binding:
/// - if the id is absent from `specs`, its live manifest bindings are moved
///   to draining;
/// - if a live binding with the same definition fingerprint is already
///   `Registering` or `Active`, nothing changes;
/// - otherwise the live bindings are drained and a new version is registered.
///
/// Specs whose id had no such existing binding are registered as new.
///
/// # Errors
/// - `InvalidSpec` if a spec is not `Manifest`-sourced or has a blank id.
/// - `DuplicateId` if two specs share an id.
/// - Whatever `append_lifecycle_events` returns.
///
/// Validation errors are returned before any binding is transitioned or
/// registered.
pub async fn install_manifest_triggers(
    specs: Vec<TriggerBindingSpec>,
) -> Result<(), TriggerRegistryError> {
    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
        let registry = &mut *slot.borrow_mut();
        registry.refresh_runtime_context();
        // Ids whose binding list changed; these are garbage-collected below.
        let mut touched_ids = BTreeSet::new();

        // Validate every spec and index it by id before mutating anything.
        let mut incoming = BTreeMap::new();
        for spec in specs {
            let spec_id = spec.id.clone();
            if spec.source != TriggerBindingSource::Manifest {
                return Err(TriggerRegistryError::InvalidSpec(format!(
                    "manifest install received non-manifest trigger '{}'",
                    spec_id
                )));
            }
            if spec_id.trim().is_empty() {
                return Err(TriggerRegistryError::InvalidSpec(
                    "manifest trigger id cannot be empty".to_string(),
                ));
            }
            if incoming.insert(spec_id.clone(), spec).is_some() {
                return Err(TriggerRegistryError::DuplicateId(spec_id));
            }
        }

        let mut lifecycle = Vec::new();
        // Ids that currently have at least one non-terminated manifest binding.
        let existing_ids: Vec<String> = registry
            .bindings
            .iter()
            .filter(|(_, bindings)| {
                bindings.iter().any(|binding| {
                    binding.source == TriggerBindingSource::Manifest
                        && binding.state_snapshot() != TriggerState::Terminated
                })
            })
            .map(|(id, _)| id.clone())
            .collect();

        for id in existing_ids {
            let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
            // Removed from the manifest: drain everything that is still live.
            let Some(spec) = incoming.remove(&id) else {
                for binding in live_manifest {
                    registry.transition_binding_to_draining(&binding, &mut lifecycle);
                }
                touched_ids.insert(id.clone());
                continue;
            };

            // Unchanged definition that is already registering/active: keep it.
            let has_matching_active = live_manifest.iter().any(|binding| {
                binding.definition_fingerprint == spec.definition_fingerprint
                    && matches!(
                        binding.state_snapshot(),
                        TriggerState::Registering | TriggerState::Active
                    )
            });
            if has_matching_active {
                continue;
            }

            // Changed definition: drain the old bindings, then register the new
            // version.
            for binding in live_manifest {
                registry.transition_binding_to_draining(&binding, &mut lifecycle);
            }

            let version = registry.next_version_for_spec(&spec);
            registry.register_binding(spec, version, &mut lifecycle);
            touched_ids.insert(id.clone());
        }

        // Whatever is left in `incoming` had no live manifest binding.
        for spec in incoming.into_values() {
            touched_ids.insert(spec.id.clone());
            let version = registry.next_version_for_spec(&spec);
            registry.register_binding(spec, version, &mut lifecycle);
        }

        for id in touched_ids {
            registry.gc_terminated_versions(&id);
        }

        Ok((registry.event_log.clone(), lifecycle))
    })?;

    // The registry borrow is released before awaiting the event-log append.
    append_lifecycle_events(event_log, events).await
}
1010
1011pub async fn dynamic_register(
1012 mut spec: TriggerBindingSpec,
1013) -> Result<TriggerId, TriggerRegistryError> {
1014 if spec.id.trim().is_empty() {
1015 spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1016 }
1017 spec.source = TriggerBindingSource::Dynamic;
1018 let id = spec.id.clone();
1019 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1020 let registry = &mut *slot.borrow_mut();
1021 registry.refresh_runtime_context();
1022
1023 if registry.bindings.contains_key(id.as_str()) {
1024 return Err(TriggerRegistryError::DuplicateId(id.clone()));
1025 }
1026
1027 let mut lifecycle = Vec::new();
1028 let version = registry.next_version_for_spec(&spec);
1029 registry.register_binding(spec, version, &mut lifecycle);
1030 Ok((registry.event_log.clone(), lifecycle))
1031 })?;
1032
1033 append_lifecycle_events(event_log, events).await?;
1034 Ok(TriggerId::new(id))
1035}
1036
1037pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1038 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1039 let registry = &mut *slot.borrow_mut();
1040 let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1041 if live_dynamic.is_empty() {
1042 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1043 }
1044
1045 let mut lifecycle = Vec::new();
1046 for binding in live_dynamic {
1047 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1048 }
1049 Ok((registry.event_log.clone(), lifecycle))
1050 })?;
1051
1052 append_lifecycle_events(event_log, events).await
1053}
1054
1055pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1056 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1057 let registry = &mut *slot.borrow_mut();
1058 let live = registry.live_bindings_any_source(id);
1059 if live.is_empty() {
1060 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1061 }
1062
1063 let mut lifecycle = Vec::new();
1064 for binding in live {
1065 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1066 }
1067 Ok((registry.event_log.clone(), lifecycle))
1068 })?;
1069
1070 append_lifecycle_events(event_log, events).await
1071}
1072
1073pub async fn pause(id: &str) -> Result<(), TriggerRegistryError> {
1074 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1075 let registry = &mut *slot.borrow_mut();
1076 let live = registry.live_bindings_any_source(id);
1077 if live.is_empty() {
1078 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1079 }
1080
1081 let mut lifecycle = Vec::new();
1082 for binding in live {
1083 match binding.state_snapshot() {
1084 TriggerState::Registering | TriggerState::Active => {
1085 registry.transition_binding_state(
1086 &binding,
1087 TriggerState::Paused,
1088 &mut lifecycle,
1089 );
1090 }
1091 TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {}
1092 }
1093 }
1094 Ok((registry.event_log.clone(), lifecycle))
1095 })?;
1096
1097 append_lifecycle_events(event_log, events).await
1098}
1099
1100pub async fn resume(id: &str) -> Result<(), TriggerRegistryError> {
1101 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1102 let registry = &mut *slot.borrow_mut();
1103 let live = registry.live_bindings_any_source(id);
1104 if live.is_empty() {
1105 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1106 }
1107
1108 let mut lifecycle = Vec::new();
1109 for binding in live {
1110 if binding.state_snapshot() == TriggerState::Paused {
1111 registry.transition_binding_state(&binding, TriggerState::Active, &mut lifecycle);
1112 }
1113 }
1114 Ok((registry.event_log.clone(), lifecycle))
1115 })?;
1116
1117 append_lifecycle_events(event_log, events).await
1118}
1119
1120fn pin_trigger_binding_inner(
1121 id: &str,
1122 version: u32,
1123 allow_terminated: bool,
1124) -> Result<(), TriggerRegistryError> {
1125 TRIGGER_REGISTRY.with(|slot| {
1126 let registry = slot.borrow();
1127 let binding = registry.binding(id, version).ok_or_else(|| {
1128 TriggerRegistryError::UnknownBindingVersion {
1129 id: id.to_string(),
1130 version,
1131 }
1132 })?;
1133 match binding.state_snapshot() {
1134 TriggerState::Paused => Err(TriggerRegistryError::InvalidSpec(format!(
1135 "trigger binding '{}' version {} is paused",
1136 id, version
1137 ))),
1138 TriggerState::Terminated if !allow_terminated => {
1139 Err(TriggerRegistryError::InvalidSpec(format!(
1140 "trigger binding '{}' version {} is terminated",
1141 id, version
1142 )))
1143 }
1144 _ => {
1145 binding.in_flight.fetch_add(1, Ordering::Relaxed);
1146 Ok(())
1147 }
1148 }
1149 })
1150}
1151
1152pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1153 pin_trigger_binding_inner(id, version, false)
1154}
1155
1156pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1157 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1158 let registry = &mut *slot.borrow_mut();
1159 let binding = registry.binding(id, version).ok_or_else(|| {
1160 TriggerRegistryError::UnknownBindingVersion {
1161 id: id.to_string(),
1162 version,
1163 }
1164 })?;
1165 let current = binding.in_flight.load(Ordering::Relaxed);
1166 if current == 0 {
1167 return Err(TriggerRegistryError::InvalidSpec(format!(
1168 "trigger binding '{}' version {} has no in-flight events",
1169 id, version
1170 )));
1171 }
1172 binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1173
1174 let mut lifecycle = Vec::new();
1175 registry.maybe_finalize_draining(&binding, &mut lifecycle);
1176 registry.gc_terminated_versions(binding.id.as_str());
1177 Ok((registry.event_log.clone(), lifecycle))
1178 })?;
1179
1180 append_lifecycle_events(event_log, events).await
1181}
1182
1183pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1184 begin_in_flight_inner(id, version, false)
1185}
1186
1187pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1188 begin_in_flight_inner(id, version, true)
1189}
1190
1191fn begin_in_flight_inner(
1192 id: &str,
1193 version: u32,
1194 allow_terminated: bool,
1195) -> Result<(), TriggerRegistryError> {
1196 pin_trigger_binding_inner(id, version, allow_terminated)?;
1197 TRIGGER_REGISTRY.with(|slot| {
1198 let registry = slot.borrow();
1199 let binding = registry.binding(id, version).ok_or_else(|| {
1200 TriggerRegistryError::UnknownBindingVersion {
1201 id: id.to_string(),
1202 version,
1203 }
1204 })?;
1205 binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1206 *binding
1207 .metrics
1208 .last_received_ms
1209 .lock()
1210 .expect("trigger metrics poisoned") = Some(now_ms());
1211 Ok(())
1212 })
1213}
1214
1215pub async fn finish_in_flight(
1216 id: &str,
1217 version: u32,
1218 outcome: TriggerDispatchOutcome,
1219) -> Result<(), TriggerRegistryError> {
1220 TRIGGER_REGISTRY.with(|slot| {
1221 let registry = &mut *slot.borrow_mut();
1222 let binding = registry.binding(id, version).ok_or_else(|| {
1223 TriggerRegistryError::UnknownBindingVersion {
1224 id: id.to_string(),
1225 version,
1226 }
1227 })?;
1228 let current = binding.in_flight.load(Ordering::Relaxed);
1229 if current == 0 {
1230 return Err(TriggerRegistryError::InvalidSpec(format!(
1231 "trigger binding '{}' version {} has no in-flight events",
1232 id, version
1233 )));
1234 }
1235 match outcome {
1236 TriggerDispatchOutcome::Dispatched => {
1237 binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1238 }
1239 TriggerDispatchOutcome::Failed => {
1240 binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1241 }
1242 TriggerDispatchOutcome::Dlq => {
1243 binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1244 }
1245 }
1246 Ok(())
1247 })?;
1248
1249 unpin_trigger_binding(id, version).await
1250}
1251
1252impl TriggerRegistry {
1253 fn refresh_runtime_context(&mut self) {
1254 if self.event_log.is_none() {
1255 self.event_log = active_event_log();
1256 }
1257 if self.secret_provider.is_none() {
1258 self.secret_provider = default_secret_provider();
1259 }
1260 }
1261
1262 fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
1263 self.bindings
1264 .get(id)
1265 .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
1266 .cloned()
1267 }
1268
1269 fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
1270 self.bindings
1271 .get(id)
1272 .into_iter()
1273 .flat_map(|bindings| bindings.iter())
1274 .filter(|binding| {
1275 binding.source == source && binding.state_snapshot() != TriggerState::Terminated
1276 })
1277 .cloned()
1278 .collect()
1279 }
1280
1281 fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
1282 self.bindings
1283 .get(id)
1284 .into_iter()
1285 .flat_map(|bindings| bindings.iter())
1286 .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
1287 .cloned()
1288 .collect()
1289 }
1290
1291 fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
1292 if let Some(version) = self
1293 .bindings
1294 .get(spec.id.as_str())
1295 .into_iter()
1296 .flat_map(|bindings| bindings.iter())
1297 .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
1298 .map(|binding| binding.version)
1299 {
1300 return version;
1301 }
1302
1303 let historical =
1304 self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
1305 if let Some(version) = historical.matching_version {
1306 return version;
1307 }
1308
1309 self.bindings
1310 .get(spec.id.as_str())
1311 .into_iter()
1312 .flat_map(|bindings| bindings.iter())
1313 .map(|binding| binding.version)
1314 .chain(historical.max_version)
1315 .max()
1316 .unwrap_or(0)
1317 + 1
1318 }
1319
1320 fn gc_terminated_versions(&mut self, id: &str) {
1321 let Some(bindings) = self.bindings.get_mut(id) else {
1322 return;
1323 };
1324
1325 let mut newest_versions: Vec<u32> =
1326 bindings.iter().map(|binding| binding.version).collect();
1327 newest_versions.sort_unstable_by(|left, right| right.cmp(left));
1328 newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
1329 let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();
1330
1331 bindings.retain(|binding| {
1332 binding.state_snapshot() != TriggerState::Terminated
1333 || retained_versions.contains(&binding.version)
1334 });
1335
1336 if bindings.is_empty() {
1337 self.bindings.remove(id);
1338 }
1339 }
1340
1341 fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
1342 let mut lookup = HistoricalVersionLookup::default();
1343 for record in self.lifecycle_records_for(id) {
1344 lookup.max_version = Some(
1345 lookup
1346 .max_version
1347 .unwrap_or(0)
1348 .max(record.transition.version),
1349 );
1350 if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
1351 lookup.matching_version = Some(record.transition.version);
1352 }
1353 }
1354 lookup
1355 }
1356
1357 fn binding_version_as_of(
1358 &self,
1359 id: &str,
1360 as_of: OffsetDateTime,
1361 ) -> Result<u32, TriggerRegistryError> {
1362 let cutoff_ms = (as_of.unix_timestamp_nanos() / 1_000_000) as i64;
1363 let mut active_version = None;
1364 for record in self.lifecycle_records_for(id) {
1365 if record.occurred_at_ms > cutoff_ms {
1366 break;
1367 }
1368 match record.transition.to_state {
1369 TriggerState::Active => active_version = Some(record.transition.version),
1370 TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {
1371 if active_version == Some(record.transition.version) {
1372 active_version = None;
1373 }
1374 }
1375 TriggerState::Registering => {}
1376 }
1377 }
1378
1379 active_version.ok_or_else(|| {
1380 TriggerRegistryError::InvalidSpec(format!(
1381 "no active trigger binding '{}' at {}",
1382 id,
1383 as_of
1384 .format(&time::format_description::well_known::Rfc3339)
1385 .unwrap_or_else(|_| as_of.to_string())
1386 ))
1387 })
1388 }
1389
1390 fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
1391 let Some(event_log) = self.event_log.as_ref() else {
1392 return Vec::new();
1393 };
1394 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1395 .expect("static triggers.lifecycle topic should always be valid");
1396 futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
1397 .unwrap_or_default()
1398 .into_iter()
1399 .filter_map(|(_, event)| {
1400 let occurred_at_ms = event.occurred_at_ms;
1401 let transition: LifecycleStateTransitionRecord =
1402 serde_json::from_value(event.payload).ok()?;
1403 (transition.id == id).then_some(HistoricalLifecycleRecord {
1404 occurred_at_ms,
1405 transition,
1406 })
1407 })
1408 .collect()
1409 }
1410
1411 #[allow(clippy::arc_with_non_send_sync)]
1412 fn register_binding(
1413 &mut self,
1414 spec: TriggerBindingSpec,
1415 version: u32,
1416 lifecycle: &mut Vec<LogEvent>,
1417 ) -> Arc<TriggerBinding> {
1418 let binding = Arc::new(TriggerBinding::new(spec, version));
1419 self.by_provider
1420 .entry(binding.provider.as_str().to_string())
1421 .or_default()
1422 .insert(binding.id.as_str().to_string());
1423 self.bindings
1424 .entry(binding.id.as_str().to_string())
1425 .or_default()
1426 .push(binding.clone());
1427 lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
1428 self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
1429 binding
1430 }
1431
1432 fn transition_binding_to_draining(
1433 &self,
1434 binding: &Arc<TriggerBinding>,
1435 lifecycle: &mut Vec<LogEvent>,
1436 ) {
1437 if matches!(binding.state_snapshot(), TriggerState::Terminated) {
1438 return;
1439 }
1440 self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
1441 self.maybe_finalize_draining(binding, lifecycle);
1442 }
1443
1444 fn maybe_finalize_draining(
1445 &self,
1446 binding: &Arc<TriggerBinding>,
1447 lifecycle: &mut Vec<LogEvent>,
1448 ) {
1449 if binding.state_snapshot() == TriggerState::Draining
1450 && binding.in_flight.load(Ordering::Relaxed) == 0
1451 {
1452 self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
1453 }
1454 }
1455
1456 fn transition_binding_state(
1457 &self,
1458 binding: &Arc<TriggerBinding>,
1459 next: TriggerState,
1460 lifecycle: &mut Vec<LogEvent>,
1461 ) {
1462 let mut state = binding.state.lock().expect("trigger state poisoned");
1463 let previous = *state;
1464 if previous == next {
1465 return;
1466 }
1467 *state = next;
1468 drop(state);
1469 lifecycle.push(lifecycle_event(binding, Some(previous), next));
1470 }
1471}
1472
1473fn lifecycle_event(
1474 binding: &TriggerBinding,
1475 from_state: Option<TriggerState>,
1476 to_state: TriggerState,
1477) -> LogEvent {
1478 LogEvent::new(
1479 "state_transition",
1480 serde_json::json!({
1481 "id": binding.id.as_str(),
1482 "binding_key": binding.binding_key(),
1483 "version": binding.version,
1484 "provider": binding.provider.as_str(),
1485 "kind": &binding.kind,
1486 "source": binding.source.as_str(),
1487 "handler_kind": binding.handler.kind(),
1488 "definition_fingerprint": &binding.definition_fingerprint,
1489 "from_state": from_state.map(TriggerState::as_str),
1490 "to_state": to_state.as_str(),
1491 }),
1492 )
1493}
1494
1495async fn append_lifecycle_events(
1496 event_log: Option<Arc<AnyEventLog>>,
1497 events: Vec<LogEvent>,
1498) -> Result<(), TriggerRegistryError> {
1499 let Some(event_log) = event_log else {
1500 return Ok(());
1501 };
1502 if events.is_empty() {
1503 return Ok(());
1504 }
1505
1506 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1507 .expect("static triggers.lifecycle topic should always be valid");
1508 for event in events {
1509 event_log
1510 .append(&topic, event)
1511 .await
1512 .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1513 }
1514 Ok(())
1515}
1516
1517fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1518 configured_default_chain(default_secret_namespace())
1519 .ok()
1520 .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1521}
1522
/// Resolves the namespace used for the default secret provider.
///
/// `HARN_SECRET_NAMESPACE` wins when it is set to something that is not blank
/// (the value is returned exactly as provided). Otherwise the namespace is
/// `harn/<leaf>`, where `<leaf>` is the final component of the current working
/// directory, or `workspace` when that component is unavailable.
fn default_secret_namespace() -> String {
    match std::env::var("HARN_SECRET_NAMESPACE") {
        Ok(explicit) if !explicit.trim().is_empty() => explicit,
        _ => {
            let cwd = std::env::current_dir().unwrap_or_default();
            let leaf = cwd
                .file_name()
                .and_then(|component| component.to_str())
                .filter(|component| !component.is_empty())
                .unwrap_or("workspace");
            format!("harn/{leaf}")
        }
    }
}
1538
1539fn now_ms() -> i64 {
1540 clock::now_ms()
1541}
1542
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
    use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
    use crate::triggers::test_util::timing::FILE_WATCH_FALLBACK_POLL;
    use std::rc::Rc;
    use time::OffsetDateTime;

    /// Trigger id shared by the manifest-based tests.
    const ISSUE_TRIGGER: &str = "github-new-issue";

    /// Fetches the current snapshot of `ISSUE_TRIGGER` at the given version,
    /// panicking with the supplied message when it is missing.
    macro_rules! issue_binding {
        ($version:expr, $context:expr) => {
            snapshot_trigger_bindings()
                .into_iter()
                .find(|candidate| candidate.id == ISSUE_TRIGGER && candidate.version == $version)
                .expect($context)
        };
    }

    /// Manifest-sourced webhook spec for `id` with the given definition fingerprint.
    fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
        TriggerBindingSpec {
            id: id.to_string(),
            source: TriggerBindingSource::Manifest,
            kind: "webhook".to_string(),
            provider: ProviderId::from("github"),
            autonomy_tier: crate::AutonomyTier::ActAuto,
            handler: TriggerHandlerSpec::Worker {
                queue: format!("{id}-queue"),
            },
            dispatch_priority: crate::WorkerQueuePriority::Normal,
            when: None,
            when_budget: None,
            retry: TriggerRetryConfig::default(),
            match_events: vec!["issues.opened".to_string()],
            dedupe_key: Some("event.dedupe_key".to_string()),
            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
            filter: Some("event.kind".to_string()),
            daily_cost_usd: Some(5.0),
            hourly_cost_usd: None,
            max_autonomous_decisions_per_hour: None,
            max_autonomous_decisions_per_day: None,
            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
            max_concurrent: Some(10),
            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
            manifest_path: None,
            package_name: Some("workspace".to_string()),
            definition_fingerprint: fingerprint.to_string(),
        }
    }

    /// Dynamically-registered webhook spec for `id`: the manifest spec with the
    /// optional limits cleared and a `dynamic:<id>` fingerprint.
    fn dynamic_spec(id: &str) -> TriggerBindingSpec {
        TriggerBindingSpec {
            source: TriggerBindingSource::Dynamic,
            dedupe_key: None,
            filter: None,
            daily_cost_usd: None,
            max_concurrent: None,
            package_name: None,
            definition_fingerprint: format!("dynamic:{id}"),
            ..manifest_spec(id, "")
        }
    }

    /// Installs a single manifest spec for `ISSUE_TRIGGER` with `fingerprint`,
    /// panicking with `expectation` on failure.
    async fn install_issue_trigger(fingerprint: &str, expectation: &str) {
        install_manifest_triggers(vec![manifest_spec(ISSUE_TRIGGER, fingerprint)])
            .await
            .expect(expectation);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
        clear_trigger_registry();

        install_issue_trigger("v1", "manifest trigger installs").await;

        let snapshots = snapshot_trigger_bindings();
        assert_eq!(snapshots.len(), 1);
        let only = &snapshots[0];
        assert_eq!(only.id, ISSUE_TRIGGER);
        assert_eq!(only.version, 1);
        assert_eq!(only.state, TriggerState::Active);
        assert_eq!(only.metrics, TriggerMetricsSnapshot::default());

        clear_trigger_registry();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
        clear_trigger_registry();

        let id_a = dynamic_register(dynamic_spec("dynamic-a"))
            .await
            .expect("first dynamic trigger");
        let id_b = dynamic_register(dynamic_spec("dynamic-b"))
            .await
            .expect("second dynamic trigger");
        assert_ne!(id_a, id_b);

        let rejection = dynamic_register(dynamic_spec("dynamic-a"))
            .await
            .expect_err("duplicate id should fail");
        assert!(matches!(rejection, TriggerRegistryError::DuplicateId(_)));

        clear_trigger_registry();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn drain_waits_for_in_flight_events_before_terminating() {
        clear_trigger_registry();

        install_issue_trigger("v1", "manifest trigger installs").await;
        begin_in_flight(ISSUE_TRIGGER, 1).expect("start in-flight event");

        drain(ISSUE_TRIGGER).await.expect("drain succeeds");
        let while_busy = issue_binding!(1, "binding snapshot");
        assert_eq!(while_busy.state, TriggerState::Draining);
        assert_eq!(while_busy.metrics.in_flight, 1);

        finish_in_flight(ISSUE_TRIGGER, 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish in-flight event");
        let once_idle = issue_binding!(1, "binding snapshot");
        assert_eq!(once_idle.state, TriggerState::Terminated);
        assert_eq!(once_idle.metrics.in_flight, 0);

        clear_trigger_registry();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn hot_reload_registers_new_version_while_old_binding_drains() {
        clear_trigger_registry();

        install_issue_trigger("v1", "initial manifest trigger installs").await;
        begin_in_flight(ISSUE_TRIGGER, 1).expect("start in-flight event");

        install_issue_trigger("v2", "updated manifest trigger installs").await;

        let snapshots = snapshot_trigger_bindings();
        assert_eq!(snapshots.len(), 2);
        let previous = snapshots
            .iter()
            .find(|candidate| candidate.id == ISSUE_TRIGGER && candidate.version == 1)
            .expect("old binding");
        let replacement = snapshots
            .iter()
            .find(|candidate| candidate.id == ISSUE_TRIGGER && candidate.version == 2)
            .expect("new binding");
        assert_eq!(previous.state, TriggerState::Draining);
        assert_eq!(replacement.state, TriggerState::Active);

        finish_in_flight(ISSUE_TRIGGER, 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish old in-flight event");
        let previous = issue_binding!(1, "old binding");
        assert_eq!(previous.state, TriggerState::Terminated);

        clear_trigger_registry();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn gc_drops_terminated_versions_beyond_retention_limit() {
        clear_trigger_registry();

        install_issue_trigger("v1", "install v1").await;
        begin_in_flight(ISSUE_TRIGGER, 1).expect("pin v1");

        install_issue_trigger("v2", "install v2").await;
        finish_in_flight(ISSUE_TRIGGER, 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish v1");

        install_issue_trigger("v3", "install v3").await;

        let remaining: Vec<u32> = snapshot_trigger_bindings()
            .into_iter()
            .filter(|candidate| candidate.id == ISSUE_TRIGGER)
            .map(|candidate| candidate.version)
            .collect();
        assert_eq!(remaining, vec![2, 3]);

        clear_trigger_registry();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn lifecycle_transitions_append_to_event_log() {
        clear_trigger_registry();
        reset_active_event_log();
        let workdir = tempfile::tempdir().expect("tempdir");
        let log = install_default_for_base_dir(workdir.path()).expect("install event log");

        install_issue_trigger("v1", "manifest trigger installs").await;
        begin_in_flight(ISSUE_TRIGGER, 1).expect("start in-flight event");
        drain(ISSUE_TRIGGER).await.expect("drain succeeds");
        finish_in_flight(ISSUE_TRIGGER, 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish event");

        let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
        let recorded = log
            .read_range(&topic, None, 32)
            .await
            .expect("read lifecycle events");
        let states: Vec<String> = recorded
            .into_iter()
            .filter_map(|(_, event)| {
                event
                    .payload
                    .get("to_state")
                    .and_then(|state| state.as_str())
                    .map(str::to_owned)
            })
            .collect();
        assert_eq!(states, ["registering", "active", "draining", "terminated"]);

        reset_active_event_log();
        clear_trigger_registry();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn version_history_reuses_historical_version_after_restart() {
        clear_trigger_registry();
        reset_active_event_log();
        let workdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(workdir.path()).expect("install event log");

        install_issue_trigger("v1", "initial manifest trigger installs").await;
        install_issue_trigger("v2", "updated manifest trigger installs").await;

        // Simulate a restart: wipe in-memory state, then reopen the same log.
        clear_trigger_registry();
        reset_active_event_log();
        install_default_for_base_dir(workdir.path()).expect("reopen event log");

        install_issue_trigger("v2", "manifest reload reuses historical version").await;

        let reloaded = snapshot_trigger_bindings()
            .into_iter()
            .find(|candidate| candidate.id == ISSUE_TRIGGER)
            .expect("binding snapshot");
        assert_eq!(reloaded.version, 2);

        reset_active_event_log();
        clear_trigger_registry();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn binding_version_as_of_reports_historical_active_version() {
        clear_trigger_registry();
        reset_active_event_log();
        let workdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(workdir.path()).expect("install event log");

        install_issue_trigger("v1", "initial manifest trigger installs").await;
        let before_reload = OffsetDateTime::now_utc();
        std::thread::sleep(FILE_WATCH_FALLBACK_POLL);

        install_issue_trigger("v2", "updated manifest trigger installs").await;
        let after_reload = OffsetDateTime::now_utc();

        let version_before =
            binding_version_as_of(ISSUE_TRIGGER, before_reload).expect("version before reload");
        assert_eq!(version_before, 1);
        let version_after =
            binding_version_as_of(ISSUE_TRIGGER, after_reload).expect("version after reload");
        assert_eq!(version_after, 2);

        reset_active_event_log();
        clear_trigger_registry();
    }

    #[tokio::test(flavor = "current_thread")]
    async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
        clear_trigger_registry();
        reset_active_event_log();
        let sink = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(sink.clone());
        let workdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(workdir.path()).expect("install event log");

        install_issue_trigger("v1", "install v1").await;
        install_issue_trigger("v2", "install v2").await;
        install_issue_trigger("v3", "install v3").await;
        let received_at = OffsetDateTime::now_utc();
        std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
        install_issue_trigger("v4", "install v4").await;

        let resolved = resolve_live_or_as_of(
            ISSUE_TRIGGER,
            RecordedTriggerBinding {
                version: 1,
                received_at,
            },
        )
        .expect("resolve fallback binding");
        assert_eq!(resolved.version, 3);

        let warning = sink
            .logs
            .borrow()
            .iter()
            .find(|entry| entry.category == "replay.binding_version_gc_fallback")
            .cloned()
            .expect("gc fallback warning");
        assert_eq!(warning.level, EventLevel::Warn);

        let received_at_text = received_at
            .format(&time::format_description::well_known::Rfc3339)
            .unwrap_or_else(|_| received_at.to_string());
        let expected_metadata = [
            ("trigger_id", serde_json::json!(ISSUE_TRIGGER)),
            ("recorded_version", serde_json::json!(1)),
            ("received_at", serde_json::json!(received_at_text)),
            ("resolved_version", serde_json::json!(3)),
        ];
        for (key, expected) in &expected_metadata {
            assert_eq!(warning.metadata.get(*key), Some(expected));
        }

        clear_event_sinks();
        crate::events::reset_event_sinks();
        reset_active_event_log();
        clear_trigger_registry();
    }
}