1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use time::OffsetDateTime;
10use uuid::Uuid;
11
12use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
13use crate::llm::trigger_predicate::TriggerPredicateBudget;
14use crate::secrets::{configured_default_chain, SecretProvider};
15use crate::triggers::test_util::clock;
16use crate::trust_graph::AutonomyTier;
17use crate::value::VmClosure;
18
19use super::dispatcher::TriggerRetryConfig;
20use super::flow_control::TriggerFlowControlConfig;
21use super::ProviderId;
22
23#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
24pub struct TriggerId(String);
25
26impl TriggerId {
27 pub fn new(value: impl Into<String>) -> Self {
28 Self(value.into())
29 }
30
31 pub fn as_str(&self) -> &str {
32 &self.0
33 }
34}
35
36impl std::fmt::Display for TriggerId {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 self.0.fmt(f)
39 }
40}
41
42#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub enum TriggerState {
45 Registering,
46 Active,
47 Draining,
48 Terminated,
49}
50
51impl TriggerState {
52 pub fn as_str(self) -> &'static str {
53 match self {
54 Self::Registering => "registering",
55 Self::Active => "active",
56 Self::Draining => "draining",
57 Self::Terminated => "terminated",
58 }
59 }
60}
61
62#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum TriggerBindingSource {
65 Manifest,
66 Dynamic,
67}
68
69impl TriggerBindingSource {
70 pub fn as_str(self) -> &'static str {
71 match self {
72 Self::Manifest => "manifest",
73 Self::Dynamic => "dynamic",
74 }
75 }
76}
77
78#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
79#[serde(rename_all = "snake_case")]
80pub enum TriggerBudgetExhaustionStrategy {
81 #[default]
82 False,
83 RetryLater,
84 Fail,
85 Warn,
86}
87
88impl TriggerBudgetExhaustionStrategy {
89 pub fn as_str(self) -> &'static str {
90 match self {
91 Self::False => "false",
92 Self::RetryLater => "retry_later",
93 Self::Fail => "fail",
94 Self::Warn => "warn",
95 }
96 }
97}
98
/// Orchestrator-wide USD spend limits. A `None` limit disables the
/// corresponding check in `orchestrator_budget_would_exceed`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct OrchestratorBudgetConfig {
    /// Limit for the current UTC day window, in USD.
    pub daily_cost_usd: Option<f64>,
    /// Limit for the current UTC hour window, in USD.
    pub hourly_cost_usd: Option<f64>,
}

/// Point-in-time view of the orchestrator budget, as produced by
/// `snapshot_orchestrator_budget`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct OrchestratorBudgetSnapshot {
    /// Configured daily limit in USD, if any.
    pub daily_cost_usd: Option<f64>,
    /// Configured hourly limit in USD, if any.
    pub hourly_cost_usd: Option<f64>,
    /// Spend recorded in the current day window, in USD micros.
    pub cost_today_usd_micros: u64,
    /// Spend recorded in the current hour window, in USD micros.
    pub cost_hour_usd_micros: u64,
    /// Day window key: whole days since 1970-01-01 (see `utc_day_key`).
    pub day_utc: i32,
    /// Hour window key: Unix seconds / 3600 (see `utc_hour_key`).
    pub hour_utc: i64,
}
114
115#[derive(Debug)]
116struct OrchestratorBudgetState {
117 config: OrchestratorBudgetConfig,
118 day_utc: i32,
119 hour_utc: i64,
120 cost_today_usd_micros: u64,
121 cost_hour_usd_micros: u64,
122}
123
124impl Default for OrchestratorBudgetState {
125 fn default() -> Self {
126 Self {
127 config: OrchestratorBudgetConfig::default(),
128 day_utc: utc_day_key(),
129 hour_utc: utc_hour_key(),
130 cost_today_usd_micros: 0,
131 cost_hour_usd_micros: 0,
132 }
133 }
134}
135
136#[derive(Clone)]
137pub enum TriggerHandlerSpec {
138 Local {
139 raw: String,
140 closure: Rc<VmClosure>,
141 },
142 A2a {
143 target: String,
144 allow_cleartext: bool,
145 },
146 Worker {
147 queue: String,
148 },
149 Persona {
150 binding: crate::PersonaRuntimeBinding,
151 },
152}
153
154impl std::fmt::Debug for TriggerHandlerSpec {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 match self {
157 Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
158 Self::A2a {
159 target,
160 allow_cleartext,
161 } => f
162 .debug_struct("A2a")
163 .field("target", target)
164 .field("allow_cleartext", allow_cleartext)
165 .finish(),
166 Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
167 Self::Persona { binding } => f
168 .debug_struct("Persona")
169 .field("name", &binding.name)
170 .finish(),
171 }
172 }
173}
174
175impl TriggerHandlerSpec {
176 pub fn kind(&self) -> &'static str {
177 match self {
178 Self::Local { .. } => "local",
179 Self::A2a { .. } => "a2a",
180 Self::Worker { .. } => "worker",
181 Self::Persona { .. } => "persona",
182 }
183 }
184}
185
186#[derive(Clone)]
187pub struct TriggerPredicateSpec {
188 pub raw: String,
189 pub closure: Rc<VmClosure>,
190}
191
192impl std::fmt::Debug for TriggerPredicateSpec {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 f.debug_struct("TriggerPredicateSpec")
195 .field("raw", &self.raw)
196 .finish()
197 }
198}
199
/// Declarative description of a trigger binding, as accepted by
/// `install_manifest_triggers` and `dynamic_register`.
///
/// `TriggerBinding::new` copies every field into a versioned `TriggerBinding`.
#[derive(Clone, Debug)]
pub struct TriggerBindingSpec {
    /// Trigger id. `dynamic_register` replaces a blank id with a generated one;
    /// `install_manifest_triggers` rejects blank ids.
    pub id: String,
    /// Origin of the spec. `dynamic_register` forces `Dynamic`;
    /// `install_manifest_triggers` requires `Manifest`.
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: ProviderId,
    pub autonomy_tier: AutonomyTier,
    pub handler: TriggerHandlerSpec,
    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
    /// Optional `when` predicate.
    pub when: Option<TriggerPredicateSpec>,
    /// Predicate budget; its `max_cost_usd` is the fallback cost estimate in
    /// `expected_predicate_cost_usd_micros`.
    pub when_budget: Option<TriggerPredicateBudget>,
    pub retry: TriggerRetryConfig,
    /// Event kinds to accept; an empty list accepts every kind
    /// (see `matching_bindings`).
    pub match_events: Vec<String>,
    pub dedupe_key: Option<String>,
    pub dedupe_retention_days: u32,
    pub filter: Option<String>,
    /// Per-binding USD limits checked by `binding_budget_would_exceed`.
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    /// Autonomous-decision caps checked by `binding_autonomy_budget_would_exceed`.
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
    pub max_concurrent: Option<u32>,
    pub flow_control: TriggerFlowControlConfig,
    pub manifest_path: Option<PathBuf>,
    pub package_name: Option<String>,
    /// Definition fingerprint. `install_manifest_triggers` keeps an existing
    /// registering/active binding when this is unchanged.
    pub definition_fingerprint: String,
}
227
/// Live counters for one trigger binding version.
///
/// Counters are atomics so they can be updated through a shared
/// `&TriggerBinding`; `TriggerBinding::metrics_snapshot` reads them with
/// `Ordering::Relaxed`. `reset_binding_budget_windows` zeroes the `*_today` and
/// `*_hour` counters when the UTC day or hour key changes.
///
/// `Default` is derived: every `AtomicU64` starts at 0 and `last_received_ms`
/// starts as `None`.
#[derive(Debug, Default)]
pub struct TriggerMetrics {
    pub received: AtomicU64,
    pub dispatched: AtomicU64,
    pub failed: AtomicU64,
    pub dlq: AtomicU64,
    /// Millisecond timestamp of the last received event, if any.
    pub last_received_ms: Mutex<Option<i64>>,
    /// Cost counters in USD micros.
    pub cost_total_usd_micros: AtomicU64,
    pub cost_today_usd_micros: AtomicU64,
    pub cost_hour_usd_micros: AtomicU64,
    /// Autonomous decision counters (see `note_autonomous_decision`).
    pub autonomous_decisions_total: AtomicU64,
    pub autonomous_decisions_today: AtomicU64,
    pub autonomous_decisions_hour: AtomicU64,
}
260
/// Plain-value copy of a binding's `TriggerMetrics`, plus its in-flight count,
/// as built by `TriggerBinding::metrics_snapshot`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TriggerMetricsSnapshot {
    pub received: u64,
    pub dispatched: u64,
    pub failed: u64,
    pub dlq: u64,
    /// Copied from `TriggerBinding::in_flight`, not from `TriggerMetrics`.
    pub in_flight: u64,
    pub last_received_ms: Option<i64>,
    /// Cost counters in USD micros.
    pub cost_total_usd_micros: u64,
    pub cost_today_usd_micros: u64,
    pub cost_hour_usd_micros: u64,
    pub autonomous_decisions_total: u64,
    pub autonomous_decisions_today: u64,
    pub autonomous_decisions_hour: u64,
}
276
/// A registered, versioned trigger binding: the spec's configuration plus
/// runtime state. Created by `TriggerBinding::new` from a `TriggerBindingSpec`.
pub struct TriggerBinding {
    pub id: TriggerId,
    /// Version number assigned at registration.
    pub version: u32,
    // Configuration fields below are copied verbatim from `TriggerBindingSpec`.
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: ProviderId,
    pub autonomy_tier: AutonomyTier,
    pub handler: TriggerHandlerSpec,
    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
    pub when: Option<TriggerPredicateSpec>,
    pub when_budget: Option<TriggerPredicateBudget>,
    pub retry: TriggerRetryConfig,
    pub match_events: Vec<String>,
    pub dedupe_key: Option<String>,
    pub dedupe_retention_days: u32,
    pub filter: Option<String>,
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
    pub max_concurrent: Option<u32>,
    pub flow_control: TriggerFlowControlConfig,
    pub manifest_path: Option<PathBuf>,
    pub package_name: Option<String>,
    pub definition_fingerprint: String,
    /// Current lifecycle state; starts as `Registering`.
    pub state: Mutex<TriggerState>,
    pub metrics: TriggerMetrics,
    /// Pinned in-flight events: incremented by `pin_trigger_binding_inner`,
    /// decremented by `unpin_trigger_binding`.
    pub in_flight: AtomicU64,
    /// Starts as `false`. NOTE(review): presumably a cancellation flag shared
    /// with running handlers; not used in the visible part of this file.
    pub cancel_token: Arc<AtomicBool>,
    /// Budget-window keys and predicate cost samples.
    pub predicate_state: Mutex<TriggerPredicateState>,
}
309
/// Per-binding mutable state guarded by `TriggerBinding::predicate_state`.
#[derive(Clone, Debug, Default)]
pub struct TriggerPredicateState {
    /// Day key the `*_today` metric counters belong to
    /// (see `reset_binding_budget_windows`).
    pub budget_day_utc: Option<i32>,
    /// Hour key the `*_hour` metric counters belong to.
    pub budget_hour_utc: Option<i64>,
    /// NOTE(review): presumably predicate circuit-breaker bookkeeping; these two
    /// fields are not touched in the visible part of this file.
    pub consecutive_failures: u32,
    pub breaker_open_until_ms: Option<i64>,
    /// Recent predicate cost samples in USD micros, oldest first, capped at
    /// `PREDICATE_COST_WINDOW` by `record_predicate_cost_sample`.
    pub recent_cost_usd_micros: VecDeque<u64>,
}
318
319impl std::fmt::Debug for TriggerBinding {
320 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321 f.debug_struct("TriggerBinding")
322 .field("id", &self.id)
323 .field("version", &self.version)
324 .field("source", &self.source)
325 .field("kind", &self.kind)
326 .field("provider", &self.provider)
327 .field("handler_kind", &self.handler.kind())
328 .field("state", &self.state_snapshot())
329 .finish()
330 }
331}
332
333impl TriggerBinding {
334 pub fn snapshot(&self) -> TriggerBindingSnapshot {
335 TriggerBindingSnapshot {
336 id: self.id.as_str().to_string(),
337 version: self.version,
338 source: self.source,
339 kind: self.kind.clone(),
340 provider: self.provider.as_str().to_string(),
341 autonomy_tier: self.autonomy_tier,
342 handler_kind: self.handler.kind().to_string(),
343 state: self.state_snapshot(),
344 metrics: self.metrics_snapshot(),
345 daily_cost_usd: self.daily_cost_usd,
346 hourly_cost_usd: self.hourly_cost_usd,
347 max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
348 max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
349 on_budget_exhausted: self.on_budget_exhausted,
350 }
351 }
352
353 fn new(spec: TriggerBindingSpec, version: u32) -> Self {
354 Self {
355 id: TriggerId::new(spec.id),
356 version,
357 source: spec.source,
358 kind: spec.kind,
359 provider: spec.provider,
360 autonomy_tier: spec.autonomy_tier,
361 handler: spec.handler,
362 dispatch_priority: spec.dispatch_priority,
363 when: spec.when,
364 when_budget: spec.when_budget,
365 retry: spec.retry,
366 match_events: spec.match_events,
367 dedupe_key: spec.dedupe_key,
368 dedupe_retention_days: spec.dedupe_retention_days,
369 filter: spec.filter,
370 daily_cost_usd: spec.daily_cost_usd,
371 hourly_cost_usd: spec.hourly_cost_usd,
372 max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
373 max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
374 on_budget_exhausted: spec.on_budget_exhausted,
375 max_concurrent: spec.max_concurrent,
376 flow_control: spec.flow_control,
377 manifest_path: spec.manifest_path,
378 package_name: spec.package_name,
379 definition_fingerprint: spec.definition_fingerprint,
380 state: Mutex::new(TriggerState::Registering),
381 metrics: TriggerMetrics::default(),
382 in_flight: AtomicU64::new(0),
383 cancel_token: Arc::new(AtomicBool::new(false)),
384 predicate_state: Mutex::new(TriggerPredicateState::default()),
385 }
386 }
387
388 pub fn binding_key(&self) -> String {
389 format!("{}@v{}", self.id.as_str(), self.version)
390 }
391
392 pub fn state_snapshot(&self) -> TriggerState {
393 *self.state.lock().expect("trigger state poisoned")
394 }
395
396 pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
397 TriggerMetricsSnapshot {
398 received: self.metrics.received.load(Ordering::Relaxed),
399 dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
400 failed: self.metrics.failed.load(Ordering::Relaxed),
401 dlq: self.metrics.dlq.load(Ordering::Relaxed),
402 in_flight: self.in_flight.load(Ordering::Relaxed),
403 last_received_ms: *self
404 .metrics
405 .last_received_ms
406 .lock()
407 .expect("trigger metrics poisoned"),
408 cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
409 cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
410 cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
411 autonomous_decisions_total: self
412 .metrics
413 .autonomous_decisions_total
414 .load(Ordering::Relaxed),
415 autonomous_decisions_today: self
416 .metrics
417 .autonomous_decisions_today
418 .load(Ordering::Relaxed),
419 autonomous_decisions_hour: self
420 .metrics
421 .autonomous_decisions_hour
422 .load(Ordering::Relaxed),
423 }
424 }
425}
426
/// Serializable view of a `TriggerBinding`, built by `TriggerBinding::snapshot`
/// and listed by `snapshot_trigger_bindings`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBindingSnapshot {
    pub id: String,
    pub version: u32,
    pub source: TriggerBindingSource,
    pub kind: String,
    /// `ProviderId::as_str` of the binding's provider.
    pub provider: String,
    pub autonomy_tier: AutonomyTier,
    /// `TriggerHandlerSpec::kind` label ("local", "a2a", "worker", "persona").
    pub handler_kind: String,
    pub state: TriggerState,
    pub metrics: TriggerMetricsSnapshot,
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
}
444
/// Outcome of a trigger dispatch.
///
/// NOTE(review): not used in the visible part of this file; the variants
/// presumably correspond to the `dispatched` / `failed` / `dlq` metric counters.
/// Confirm against callers.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TriggerDispatchOutcome {
    Dispatched,
    Failed,
    Dlq,
}
451
/// Errors returned by the trigger registry API.
#[derive(Debug)]
pub enum TriggerRegistryError {
    /// An id is already registered, or appears twice in one install.
    DuplicateId(String),
    /// A spec or request was rejected; the payload is the full message.
    InvalidSpec(String),
    /// No live binding exists for the id.
    UnknownId(String),
    /// The requested binding version is missing (or not usable).
    UnknownBindingVersion { id: String, version: u32 },
    /// Event-log failure; the payload is the full message.
    EventLog(String),
}

impl std::fmt::Display for TriggerRegistryError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TriggerRegistryError::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
            TriggerRegistryError::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
            TriggerRegistryError::UnknownBindingVersion { id, version } => {
                write!(f, "unknown trigger binding '{id}' version {version}")
            }
            // Message-carrying variants are printed verbatim.
            TriggerRegistryError::InvalidSpec(message) => f.write_str(message),
            TriggerRegistryError::EventLog(message) => f.write_str(message),
        }
    }
}

impl std::error::Error for TriggerRegistryError {}
475
/// In-memory registry of trigger bindings, held per thread in `TRIGGER_REGISTRY`.
#[derive(Default)]
pub struct TriggerRegistry {
    /// Trigger id -> every retained version of that binding (live and terminated).
    bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
    /// Provider id -> trigger ids; `matching_bindings` uses it to find
    /// candidates for an event's provider.
    by_provider: BTreeMap<String, BTreeSet<String>>,
    /// Event log handed to `append_lifecycle_events` after each mutation.
    event_log: Option<Arc<AnyEventLog>>,
    /// NOTE(review): not read in the visible part of this file; presumably set
    /// by `refresh_runtime_context` — confirm.
    secret_provider: Option<Arc<dyn SecretProvider>>,
}
483
// Per-thread trigger registry. The registry functions in this file access it
// through `TRIGGER_REGISTRY.with`, so each thread has an independent set of
// bindings.
thread_local! {
    static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
}

// Per-thread orchestrator-wide budget: configured limits plus spend in the
// current UTC day/hour windows.
thread_local! {
    static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
        RefCell::new(OrchestratorBudgetState::default());
}

// NOTE(review): not referenced in the visible part of this file; presumably the
// number of terminated versions `gc_terminated_versions` keeps per id — confirm.
const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;

// NOTE(review): presumably the event-log topic lifecycle events go to (see
// `append_lifecycle_events`, defined elsewhere) — confirm.
const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
// Maximum number of predicate cost samples `record_predicate_cost_sample` keeps
// per binding.
const PREDICATE_COST_WINDOW: usize = 100;
497
/// Deserialized state transition of one binding version.
///
/// NOTE(review): presumably decoded from events on `TRIGGERS_LIFECYCLE_TOPIC`;
/// the reader is not visible here. `definition_fingerprint` defaults to `None`
/// when the field is absent.
#[derive(Clone, Debug, Deserialize)]
struct LifecycleStateTransitionRecord {
    id: String,
    version: u32,
    #[serde(default)]
    definition_fingerprint: Option<String>,
    to_state: TriggerState,
}

/// A transition paired with the time it occurred, in milliseconds.
/// Presumably used for as-of version lookups (`binding_version_as_of`) — confirm.
#[derive(Clone, Debug)]
struct HistoricalLifecycleRecord {
    occurred_at_ms: i64,
    transition: LifecycleStateTransitionRecord,
}

/// The binding version an event was recorded against and when it was
/// received; consumed by `resolve_live_or_as_of`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RecordedTriggerBinding {
    pub version: u32,
    pub received_at: OffsetDateTime,
}

/// Result accumulator for a historical version search. NOTE(review): field
/// semantics (matching vs. highest version seen) are inferred from names;
/// the consumer is not visible here.
#[derive(Clone, Copy, Debug, Default)]
struct HistoricalVersionLookup {
    matching_version: Option<u32>,
    max_version: Option<u32>,
}
524
525pub fn clear_trigger_registry() {
526 TRIGGER_REGISTRY.with(|slot| {
527 *slot.borrow_mut() = TriggerRegistry::default();
528 });
529 clear_orchestrator_budget();
530}
531
532pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
533 ORCHESTRATOR_BUDGET.with(|slot| {
534 let mut state = slot.borrow_mut();
535 rollover_orchestrator_budget(&mut state);
536 state.config = config;
537 });
538}
539
540pub fn clear_orchestrator_budget() {
541 ORCHESTRATOR_BUDGET.with(|slot| {
542 *slot.borrow_mut() = OrchestratorBudgetState::default();
543 });
544}
545
546pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
547 ORCHESTRATOR_BUDGET.with(|slot| {
548 let mut state = slot.borrow_mut();
549 rollover_orchestrator_budget(&mut state);
550 OrchestratorBudgetSnapshot {
551 daily_cost_usd: state.config.daily_cost_usd,
552 hourly_cost_usd: state.config.hourly_cost_usd,
553 cost_today_usd_micros: state.cost_today_usd_micros,
554 cost_hour_usd_micros: state.cost_hour_usd_micros,
555 day_utc: state.day_utc,
556 hour_utc: state.hour_utc,
557 }
558 })
559}
560
561pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
562 if cost_usd_micros == 0 {
563 return;
564 }
565 ORCHESTRATOR_BUDGET.with(|slot| {
566 let mut state = slot.borrow_mut();
567 rollover_orchestrator_budget(&mut state);
568 state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
569 state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
570 });
571}
572
573pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
574 ORCHESTRATOR_BUDGET.with(|slot| {
575 let mut state = slot.borrow_mut();
576 rollover_orchestrator_budget(&mut state);
577 if state.config.hourly_cost_usd.is_some_and(|limit| {
578 micros_to_usd(
579 state
580 .cost_hour_usd_micros
581 .saturating_add(expected_cost_usd_micros),
582 ) > limit
583 }) {
584 return Some("orchestrator_hourly_budget_exceeded");
585 }
586 if state.config.daily_cost_usd.is_some_and(|limit| {
587 micros_to_usd(
588 state
589 .cost_today_usd_micros
590 .saturating_add(expected_cost_usd_micros),
591 ) > limit
592 }) {
593 return Some("orchestrator_daily_budget_exceeded");
594 }
595 None
596 })
597}
598
599pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
600 let today = utc_day_key();
601 let hour = utc_hour_key();
602 let mut state = binding
603 .predicate_state
604 .lock()
605 .expect("trigger predicate state poisoned");
606 if state.budget_day_utc != Some(today) {
607 state.budget_day_utc = Some(today);
608 binding
609 .metrics
610 .cost_today_usd_micros
611 .store(0, Ordering::Relaxed);
612 binding
613 .metrics
614 .autonomous_decisions_today
615 .store(0, Ordering::Relaxed);
616 }
617 if state.budget_hour_utc != Some(hour) {
618 state.budget_hour_utc = Some(hour);
619 binding
620 .metrics
621 .cost_hour_usd_micros
622 .store(0, Ordering::Relaxed);
623 binding
624 .metrics
625 .autonomous_decisions_hour
626 .store(0, Ordering::Relaxed);
627 }
628}
629
630pub fn binding_budget_would_exceed(
631 binding: &TriggerBinding,
632 expected_cost_usd_micros: u64,
633) -> Option<&'static str> {
634 reset_binding_budget_windows(binding);
635 if binding.hourly_cost_usd.is_some_and(|limit| {
636 micros_to_usd(
637 binding
638 .metrics
639 .cost_hour_usd_micros
640 .load(Ordering::Relaxed)
641 .saturating_add(expected_cost_usd_micros),
642 ) > limit
643 }) {
644 return Some("hourly_budget_exceeded");
645 }
646 if binding.daily_cost_usd.is_some_and(|limit| {
647 micros_to_usd(
648 binding
649 .metrics
650 .cost_today_usd_micros
651 .load(Ordering::Relaxed)
652 .saturating_add(expected_cost_usd_micros),
653 ) > limit
654 }) {
655 return Some("daily_budget_exceeded");
656 }
657 None
658}
659
660pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
661 reset_binding_budget_windows(binding);
662 if binding
663 .max_autonomous_decisions_per_hour
664 .is_some_and(|limit| {
665 binding
666 .metrics
667 .autonomous_decisions_hour
668 .load(Ordering::Relaxed)
669 .saturating_add(1)
670 > limit
671 })
672 {
673 return Some("hourly_autonomy_budget_exceeded");
674 }
675 if binding
676 .max_autonomous_decisions_per_day
677 .is_some_and(|limit| {
678 binding
679 .metrics
680 .autonomous_decisions_today
681 .load(Ordering::Relaxed)
682 .saturating_add(1)
683 > limit
684 })
685 {
686 return Some("daily_autonomy_budget_exceeded");
687 }
688 None
689}
690
691pub fn note_autonomous_decision(binding: &TriggerBinding) {
692 reset_binding_budget_windows(binding);
693 binding
694 .metrics
695 .autonomous_decisions_total
696 .fetch_add(1, Ordering::Relaxed);
697 binding
698 .metrics
699 .autonomous_decisions_today
700 .fetch_add(1, Ordering::Relaxed);
701 binding
702 .metrics
703 .autonomous_decisions_hour
704 .fetch_add(1, Ordering::Relaxed);
705}
706
707pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
708 let state = binding
709 .predicate_state
710 .lock()
711 .expect("trigger predicate state poisoned");
712 if !state.recent_cost_usd_micros.is_empty() {
713 let total: u64 = state.recent_cost_usd_micros.iter().copied().sum();
714 return total / state.recent_cost_usd_micros.len() as u64;
715 }
716 binding
717 .when_budget
718 .as_ref()
719 .and_then(|budget| budget.max_cost_usd)
720 .map(usd_to_micros)
721 .unwrap_or_default()
722}
723
724pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
725 let mut state = binding
726 .predicate_state
727 .lock()
728 .expect("trigger predicate state poisoned");
729 state.recent_cost_usd_micros.push_back(cost_usd_micros);
730 while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
731 state.recent_cost_usd_micros.pop_front();
732 }
733}
734
/// Converts a USD amount to whole micro-dollars, rounding up.
///
/// Non-finite inputs (NaN, ±inf) and non-positive inputs map to 0. Values too
/// large for `u64` saturate at `u64::MAX`, because float-to-int `as` casts
/// saturate.
pub fn usd_to_micros(value: f64) -> u64 {
    let convertible = value.is_finite() && value > 0.0;
    if convertible {
        let scaled = value * 1_000_000.0;
        scaled.ceil() as u64
    } else {
        0
    }
}

/// Converts micro-dollars back to USD as an `f64`; precision may be lost for
/// very large values.
pub fn micros_to_usd(value: u64) -> f64 {
    let micros = value as f64;
    micros / 1_000_000.0
}
745
746fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
747 let today = utc_day_key();
748 let hour = utc_hour_key();
749 if state.day_utc != today {
750 state.day_utc = today;
751 state.cost_today_usd_micros = 0;
752 }
753 if state.hour_utc != hour {
754 state.hour_utc = hour;
755 state.cost_hour_usd_micros = 0;
756 }
757}
758
759fn utc_day_key() -> i32 {
760 (clock::now_utc().date()
761 - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
762 .whole_days() as i32
763}
764
765fn utc_hour_key() -> i64 {
766 clock::now_utc().unix_timestamp() / 3_600
767}
768
769pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
770 TRIGGER_REGISTRY.with(|slot| {
771 let registry = slot.borrow();
772 let mut snapshots = Vec::new();
773 for bindings in registry.bindings.values() {
774 for binding in bindings {
775 snapshots.push(binding.snapshot());
776 }
777 }
778 snapshots.sort_by(|left, right| {
779 left.id
780 .cmp(&right.id)
781 .then(left.version.cmp(&right.version))
782 .then(left.state.as_str().cmp(right.state.as_str()))
783 });
784 snapshots
785 })
786}
787
788#[allow(clippy::arc_with_non_send_sync)]
789pub fn resolve_trigger_binding_as_of(
790 id: &str,
791 as_of: OffsetDateTime,
792) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
793 let version = binding_version_as_of(id, as_of)?;
794 resolve_trigger_binding_version(id, version)
795}
796
797#[allow(clippy::arc_with_non_send_sync)]
798pub fn resolve_live_or_as_of(
799 id: &str,
800 recorded: RecordedTriggerBinding,
801) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
802 match resolve_live_trigger_binding(id, Some(recorded.version)) {
803 Ok(binding) => Ok(binding),
804 Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
805 let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
806 let mut metadata = BTreeMap::new();
807 metadata.insert("trigger_id".to_string(), serde_json::json!(id));
808 metadata.insert(
809 "recorded_version".to_string(),
810 serde_json::json!(recorded.version),
811 );
812 metadata.insert(
813 "received_at".to_string(),
814 serde_json::json!(recorded
815 .received_at
816 .format(&time::format_description::well_known::Rfc3339)
817 .unwrap_or_else(|_| recorded.received_at.to_string())),
818 );
819 metadata.insert(
820 "resolved_version".to_string(),
821 serde_json::json!(binding.version),
822 );
823 crate::events::log_warn_meta(
824 "replay.binding_version_gc_fallback",
825 "trigger replay fell back to lifecycle history after binding version GC",
826 metadata,
827 );
828 Ok(binding)
829 }
830 Err(error) => Err(error),
831 }
832}
833
834pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
835 TRIGGER_REGISTRY.with(|slot| {
836 let registry = slot.borrow();
837 registry.binding_version_as_of(id, as_of)
838 })
839}
840
841#[allow(clippy::arc_with_non_send_sync)]
842fn resolve_trigger_binding_version(
843 id: &str,
844 version: u32,
845) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
846 TRIGGER_REGISTRY.with(|slot| {
847 let registry = slot.borrow();
848 registry
849 .binding(id, version)
850 .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
851 id: id.to_string(),
852 version,
853 })
854 })
855}
856
857#[allow(clippy::arc_with_non_send_sync)]
858pub fn resolve_live_trigger_binding(
859 id: &str,
860 version: Option<u32>,
861) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
862 TRIGGER_REGISTRY.with(|slot| {
863 let registry = slot.borrow();
864 if let Some(version) = version {
865 let binding = registry.binding(id, version).ok_or_else(|| {
866 TriggerRegistryError::UnknownBindingVersion {
867 id: id.to_string(),
868 version,
869 }
870 })?;
871 if binding.state_snapshot() == TriggerState::Terminated {
872 return Err(TriggerRegistryError::UnknownBindingVersion {
873 id: id.to_string(),
874 version,
875 });
876 }
877 return Ok(binding);
878 }
879
880 registry
881 .live_bindings_any_source(id)
882 .into_iter()
883 .max_by_key(|binding| binding.version)
884 .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
885 })
886}
887
888pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
889 TRIGGER_REGISTRY.with(|slot| {
890 let registry = slot.borrow();
891 let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
892 return Vec::new();
893 };
894
895 let mut bindings = Vec::new();
896 for id in binding_ids {
897 let Some(versions) = registry.bindings.get(id) else {
898 continue;
899 };
900 for binding in versions {
901 if binding.state_snapshot() != TriggerState::Active {
902 continue;
903 }
904 if !binding.match_events.is_empty()
905 && !binding.match_events.iter().any(|kind| kind == &event.kind)
906 {
907 continue;
908 }
909 bindings.push(binding.clone());
910 }
911 }
912
913 bindings.sort_by(|left, right| {
914 left.id
915 .as_str()
916 .cmp(right.id.as_str())
917 .then(left.version.cmp(&right.version))
918 });
919 bindings
920 })
921}
922
/// Reconciles this thread's manifest-sourced bindings with `specs`.
///
/// Every spec must have `source == Manifest`, a non-blank id, and an id not
/// repeated in `specs`. Otherwise an error is returned before any binding is
/// registered or drained. Then, per id:
/// - ids with a live manifest binding but no incoming spec have those bindings
///   moved to draining;
/// - if a live manifest binding with the same `definition_fingerprint` is
///   already `Registering` or `Active`, the id is left as is;
/// - otherwise its live manifest bindings are drained and the spec is
///   registered under a new version;
/// - specs whose id has no live manifest binding are registered fresh.
///
/// Every touched id has its terminated versions garbage-collected. The
/// lifecycle events produced are appended to the event log after the registry
/// borrow is released.
///
/// # Errors
/// `InvalidSpec` or `DuplicateId` for bad input; errors from
/// `append_lifecycle_events` are propagated.
pub async fn install_manifest_triggers(
    specs: Vec<TriggerBindingSpec>,
) -> Result<(), TriggerRegistryError> {
    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
        let registry = &mut *slot.borrow_mut();
        registry.refresh_runtime_context();
        // Ids whose terminated versions should be garbage-collected at the end.
        let mut touched_ids = BTreeSet::new();

        // Validate every spec and index it by id before mutating any binding.
        let mut incoming = BTreeMap::new();
        for spec in specs {
            let spec_id = spec.id.clone();
            if spec.source != TriggerBindingSource::Manifest {
                return Err(TriggerRegistryError::InvalidSpec(format!(
                    "manifest install received non-manifest trigger '{}'",
                    spec_id
                )));
            }
            if spec_id.trim().is_empty() {
                return Err(TriggerRegistryError::InvalidSpec(
                    "manifest trigger id cannot be empty".to_string(),
                ));
            }
            if incoming.insert(spec_id.clone(), spec).is_some() {
                return Err(TriggerRegistryError::DuplicateId(spec_id));
            }
        }

        let mut lifecycle = Vec::new();
        // Ids that currently have at least one non-terminated manifest binding.
        let existing_ids: Vec<String> = registry
            .bindings
            .iter()
            .filter(|(_, bindings)| {
                bindings.iter().any(|binding| {
                    binding.source == TriggerBindingSource::Manifest
                        && binding.state_snapshot() != TriggerState::Terminated
                })
            })
            .map(|(id, _)| id.clone())
            .collect();

        for id in existing_ids {
            let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
            // Id dropped from the manifest: drain all of its live manifest bindings.
            let Some(spec) = incoming.remove(&id) else {
                for binding in live_manifest {
                    registry.transition_binding_to_draining(&binding, &mut lifecycle);
                }
                touched_ids.insert(id.clone());
                continue;
            };

            // Unchanged definition already registering/active: keep it as is
            // (the id is not added to `touched_ids`).
            let has_matching_active = live_manifest.iter().any(|binding| {
                binding.definition_fingerprint == spec.definition_fingerprint
                    && matches!(
                        binding.state_snapshot(),
                        TriggerState::Registering | TriggerState::Active
                    )
            });
            if has_matching_active {
                continue;
            }

            // Changed definition: drain the old versions, then register the new one.
            for binding in live_manifest {
                registry.transition_binding_to_draining(&binding, &mut lifecycle);
            }

            let version = registry.next_version_for_spec(&spec);
            registry.register_binding(spec, version, &mut lifecycle);
            touched_ids.insert(id.clone());
        }

        // Remaining specs have no live manifest binding yet: register them fresh.
        for spec in incoming.into_values() {
            touched_ids.insert(spec.id.clone());
            let version = registry.next_version_for_spec(&spec);
            registry.register_binding(spec, version, &mut lifecycle);
        }

        for id in touched_ids {
            registry.gc_terminated_versions(&id);
        }

        Ok((registry.event_log.clone(), lifecycle))
    })?;

    // Awaited only after the thread-local borrow above has been released.
    append_lifecycle_events(event_log, events).await
}
1008
1009pub async fn dynamic_register(
1010 mut spec: TriggerBindingSpec,
1011) -> Result<TriggerId, TriggerRegistryError> {
1012 if spec.id.trim().is_empty() {
1013 spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1014 }
1015 spec.source = TriggerBindingSource::Dynamic;
1016 let id = spec.id.clone();
1017 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1018 let registry = &mut *slot.borrow_mut();
1019 registry.refresh_runtime_context();
1020
1021 if registry.bindings.contains_key(id.as_str()) {
1022 return Err(TriggerRegistryError::DuplicateId(id.clone()));
1023 }
1024
1025 let mut lifecycle = Vec::new();
1026 let version = registry.next_version_for_spec(&spec);
1027 registry.register_binding(spec, version, &mut lifecycle);
1028 Ok((registry.event_log.clone(), lifecycle))
1029 })?;
1030
1031 append_lifecycle_events(event_log, events).await?;
1032 Ok(TriggerId::new(id))
1033}
1034
1035pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1036 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1037 let registry = &mut *slot.borrow_mut();
1038 let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1039 if live_dynamic.is_empty() {
1040 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1041 }
1042
1043 let mut lifecycle = Vec::new();
1044 for binding in live_dynamic {
1045 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1046 }
1047 Ok((registry.event_log.clone(), lifecycle))
1048 })?;
1049
1050 append_lifecycle_events(event_log, events).await
1051}
1052
1053pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1054 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1055 let registry = &mut *slot.borrow_mut();
1056 let live = registry.live_bindings_any_source(id);
1057 if live.is_empty() {
1058 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1059 }
1060
1061 let mut lifecycle = Vec::new();
1062 for binding in live {
1063 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1064 }
1065 Ok((registry.event_log.clone(), lifecycle))
1066 })?;
1067
1068 append_lifecycle_events(event_log, events).await
1069}
1070
1071fn pin_trigger_binding_inner(
1072 id: &str,
1073 version: u32,
1074 allow_terminated: bool,
1075) -> Result<(), TriggerRegistryError> {
1076 TRIGGER_REGISTRY.with(|slot| {
1077 let registry = slot.borrow();
1078 let binding = registry.binding(id, version).ok_or_else(|| {
1079 TriggerRegistryError::UnknownBindingVersion {
1080 id: id.to_string(),
1081 version,
1082 }
1083 })?;
1084 match binding.state_snapshot() {
1085 TriggerState::Terminated if !allow_terminated => {
1086 Err(TriggerRegistryError::InvalidSpec(format!(
1087 "trigger binding '{}' version {} is terminated",
1088 id, version
1089 )))
1090 }
1091 _ => {
1092 binding.in_flight.fetch_add(1, Ordering::Relaxed);
1093 Ok(())
1094 }
1095 }
1096 })
1097}
1098
1099pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1100 pin_trigger_binding_inner(id, version, false)
1101}
1102
1103pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1104 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1105 let registry = &mut *slot.borrow_mut();
1106 let binding = registry.binding(id, version).ok_or_else(|| {
1107 TriggerRegistryError::UnknownBindingVersion {
1108 id: id.to_string(),
1109 version,
1110 }
1111 })?;
1112 let current = binding.in_flight.load(Ordering::Relaxed);
1113 if current == 0 {
1114 return Err(TriggerRegistryError::InvalidSpec(format!(
1115 "trigger binding '{}' version {} has no in-flight events",
1116 id, version
1117 )));
1118 }
1119 binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1120
1121 let mut lifecycle = Vec::new();
1122 registry.maybe_finalize_draining(&binding, &mut lifecycle);
1123 registry.gc_terminated_versions(binding.id.as_str());
1124 Ok((registry.event_log.clone(), lifecycle))
1125 })?;
1126
1127 append_lifecycle_events(event_log, events).await
1128}
1129
1130pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1131 begin_in_flight_inner(id, version, false)
1132}
1133
1134pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1135 begin_in_flight_inner(id, version, true)
1136}
1137
1138fn begin_in_flight_inner(
1139 id: &str,
1140 version: u32,
1141 allow_terminated: bool,
1142) -> Result<(), TriggerRegistryError> {
1143 pin_trigger_binding_inner(id, version, allow_terminated)?;
1144 TRIGGER_REGISTRY.with(|slot| {
1145 let registry = slot.borrow();
1146 let binding = registry.binding(id, version).ok_or_else(|| {
1147 TriggerRegistryError::UnknownBindingVersion {
1148 id: id.to_string(),
1149 version,
1150 }
1151 })?;
1152 binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1153 *binding
1154 .metrics
1155 .last_received_ms
1156 .lock()
1157 .expect("trigger metrics poisoned") = Some(now_ms());
1158 Ok(())
1159 })
1160}
1161
1162pub async fn finish_in_flight(
1163 id: &str,
1164 version: u32,
1165 outcome: TriggerDispatchOutcome,
1166) -> Result<(), TriggerRegistryError> {
1167 TRIGGER_REGISTRY.with(|slot| {
1168 let registry = &mut *slot.borrow_mut();
1169 let binding = registry.binding(id, version).ok_or_else(|| {
1170 TriggerRegistryError::UnknownBindingVersion {
1171 id: id.to_string(),
1172 version,
1173 }
1174 })?;
1175 let current = binding.in_flight.load(Ordering::Relaxed);
1176 if current == 0 {
1177 return Err(TriggerRegistryError::InvalidSpec(format!(
1178 "trigger binding '{}' version {} has no in-flight events",
1179 id, version
1180 )));
1181 }
1182 match outcome {
1183 TriggerDispatchOutcome::Dispatched => {
1184 binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1185 }
1186 TriggerDispatchOutcome::Failed => {
1187 binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1188 }
1189 TriggerDispatchOutcome::Dlq => {
1190 binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1191 }
1192 }
1193 Ok(())
1194 })?;
1195
1196 unpin_trigger_binding(id, version).await
1197}
1198
1199impl TriggerRegistry {
1200 fn refresh_runtime_context(&mut self) {
1201 if self.event_log.is_none() {
1202 self.event_log = active_event_log();
1203 }
1204 if self.secret_provider.is_none() {
1205 self.secret_provider = default_secret_provider();
1206 }
1207 }
1208
1209 fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
1210 self.bindings
1211 .get(id)
1212 .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
1213 .cloned()
1214 }
1215
1216 fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
1217 self.bindings
1218 .get(id)
1219 .into_iter()
1220 .flat_map(|bindings| bindings.iter())
1221 .filter(|binding| {
1222 binding.source == source && binding.state_snapshot() != TriggerState::Terminated
1223 })
1224 .cloned()
1225 .collect()
1226 }
1227
1228 fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
1229 self.bindings
1230 .get(id)
1231 .into_iter()
1232 .flat_map(|bindings| bindings.iter())
1233 .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
1234 .cloned()
1235 .collect()
1236 }
1237
1238 fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
1239 if let Some(version) = self
1240 .bindings
1241 .get(spec.id.as_str())
1242 .into_iter()
1243 .flat_map(|bindings| bindings.iter())
1244 .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
1245 .map(|binding| binding.version)
1246 {
1247 return version;
1248 }
1249
1250 let historical =
1251 self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
1252 if let Some(version) = historical.matching_version {
1253 return version;
1254 }
1255
1256 self.bindings
1257 .get(spec.id.as_str())
1258 .into_iter()
1259 .flat_map(|bindings| bindings.iter())
1260 .map(|binding| binding.version)
1261 .chain(historical.max_version)
1262 .max()
1263 .unwrap_or(0)
1264 + 1
1265 }
1266
1267 fn gc_terminated_versions(&mut self, id: &str) {
1268 let Some(bindings) = self.bindings.get_mut(id) else {
1269 return;
1270 };
1271
1272 let mut newest_versions: Vec<u32> =
1273 bindings.iter().map(|binding| binding.version).collect();
1274 newest_versions.sort_unstable_by(|left, right| right.cmp(left));
1275 newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
1276 let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();
1277
1278 bindings.retain(|binding| {
1279 binding.state_snapshot() != TriggerState::Terminated
1280 || retained_versions.contains(&binding.version)
1281 });
1282
1283 if bindings.is_empty() {
1284 self.bindings.remove(id);
1285 }
1286 }
1287
1288 fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
1289 let mut lookup = HistoricalVersionLookup::default();
1290 for record in self.lifecycle_records_for(id) {
1291 lookup.max_version = Some(
1292 lookup
1293 .max_version
1294 .unwrap_or(0)
1295 .max(record.transition.version),
1296 );
1297 if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
1298 lookup.matching_version = Some(record.transition.version);
1299 }
1300 }
1301 lookup
1302 }
1303
1304 fn binding_version_as_of(
1305 &self,
1306 id: &str,
1307 as_of: OffsetDateTime,
1308 ) -> Result<u32, TriggerRegistryError> {
1309 let cutoff_ms = (as_of.unix_timestamp_nanos() / 1_000_000) as i64;
1310 let mut active_version = None;
1311 for record in self.lifecycle_records_for(id) {
1312 if record.occurred_at_ms > cutoff_ms {
1313 break;
1314 }
1315 match record.transition.to_state {
1316 TriggerState::Active => active_version = Some(record.transition.version),
1317 TriggerState::Draining | TriggerState::Terminated => {
1318 if active_version == Some(record.transition.version) {
1319 active_version = None;
1320 }
1321 }
1322 TriggerState::Registering => {}
1323 }
1324 }
1325
1326 active_version.ok_or_else(|| {
1327 TriggerRegistryError::InvalidSpec(format!(
1328 "no active trigger binding '{}' at {}",
1329 id,
1330 as_of
1331 .format(&time::format_description::well_known::Rfc3339)
1332 .unwrap_or_else(|_| as_of.to_string())
1333 ))
1334 })
1335 }
1336
1337 fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
1338 let Some(event_log) = self.event_log.as_ref() else {
1339 return Vec::new();
1340 };
1341 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1342 .expect("static triggers.lifecycle topic should always be valid");
1343 futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
1344 .unwrap_or_default()
1345 .into_iter()
1346 .filter_map(|(_, event)| {
1347 let occurred_at_ms = event.occurred_at_ms;
1348 let transition: LifecycleStateTransitionRecord =
1349 serde_json::from_value(event.payload).ok()?;
1350 (transition.id == id).then_some(HistoricalLifecycleRecord {
1351 occurred_at_ms,
1352 transition,
1353 })
1354 })
1355 .collect()
1356 }
1357
1358 #[allow(clippy::arc_with_non_send_sync)]
1359 fn register_binding(
1360 &mut self,
1361 spec: TriggerBindingSpec,
1362 version: u32,
1363 lifecycle: &mut Vec<LogEvent>,
1364 ) -> Arc<TriggerBinding> {
1365 let binding = Arc::new(TriggerBinding::new(spec, version));
1366 self.by_provider
1367 .entry(binding.provider.as_str().to_string())
1368 .or_default()
1369 .insert(binding.id.as_str().to_string());
1370 self.bindings
1371 .entry(binding.id.as_str().to_string())
1372 .or_default()
1373 .push(binding.clone());
1374 lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
1375 self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
1376 binding
1377 }
1378
1379 fn transition_binding_to_draining(
1380 &self,
1381 binding: &Arc<TriggerBinding>,
1382 lifecycle: &mut Vec<LogEvent>,
1383 ) {
1384 if matches!(binding.state_snapshot(), TriggerState::Terminated) {
1385 return;
1386 }
1387 self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
1388 self.maybe_finalize_draining(binding, lifecycle);
1389 }
1390
1391 fn maybe_finalize_draining(
1392 &self,
1393 binding: &Arc<TriggerBinding>,
1394 lifecycle: &mut Vec<LogEvent>,
1395 ) {
1396 if binding.state_snapshot() == TriggerState::Draining
1397 && binding.in_flight.load(Ordering::Relaxed) == 0
1398 {
1399 self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
1400 }
1401 }
1402
1403 fn transition_binding_state(
1404 &self,
1405 binding: &Arc<TriggerBinding>,
1406 next: TriggerState,
1407 lifecycle: &mut Vec<LogEvent>,
1408 ) {
1409 let mut state = binding.state.lock().expect("trigger state poisoned");
1410 let previous = *state;
1411 if previous == next {
1412 return;
1413 }
1414 *state = next;
1415 drop(state);
1416 lifecycle.push(lifecycle_event(binding, Some(previous), next));
1417 }
1418}
1419
1420fn lifecycle_event(
1421 binding: &TriggerBinding,
1422 from_state: Option<TriggerState>,
1423 to_state: TriggerState,
1424) -> LogEvent {
1425 LogEvent::new(
1426 "state_transition",
1427 serde_json::json!({
1428 "id": binding.id.as_str(),
1429 "binding_key": binding.binding_key(),
1430 "version": binding.version,
1431 "provider": binding.provider.as_str(),
1432 "kind": &binding.kind,
1433 "source": binding.source.as_str(),
1434 "handler_kind": binding.handler.kind(),
1435 "definition_fingerprint": &binding.definition_fingerprint,
1436 "from_state": from_state.map(TriggerState::as_str),
1437 "to_state": to_state.as_str(),
1438 }),
1439 )
1440}
1441
1442async fn append_lifecycle_events(
1443 event_log: Option<Arc<AnyEventLog>>,
1444 events: Vec<LogEvent>,
1445) -> Result<(), TriggerRegistryError> {
1446 let Some(event_log) = event_log else {
1447 return Ok(());
1448 };
1449 if events.is_empty() {
1450 return Ok(());
1451 }
1452
1453 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1454 .expect("static triggers.lifecycle topic should always be valid");
1455 for event in events {
1456 event_log
1457 .append(&topic, event)
1458 .await
1459 .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1460 }
1461 Ok(())
1462}
1463
1464fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1465 configured_default_chain(default_secret_namespace())
1466 .ok()
1467 .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1468}
1469
/// Resolves the namespace used by the default secret-provider chain.
///
/// Resolution order:
/// 1. `HARN_SECRET_NAMESPACE`, with surrounding whitespace trimmed, when it is set,
///    valid Unicode, and not blank.
/// 2. `harn/<leaf>`, where `<leaf>` is the final component of the current working
///    directory. `workspace` is used instead when the directory cannot be read, has
///    no final component (e.g. `/`), or is not valid UTF-8.
fn default_secret_namespace() -> String {
    if let Ok(namespace) = std::env::var("HARN_SECRET_NAMESPACE") {
        // Trim the value that is returned, not just the value that is checked, so
        // stray whitespace from a shell export or `.env` file does not end up in
        // secret lookups.
        let trimmed = namespace.trim();
        if !trimmed.is_empty() {
            return trimmed.to_string();
        }
    }

    let cwd = std::env::current_dir().unwrap_or_default();
    let leaf = cwd
        .file_name()
        .and_then(|name| name.to_str())
        .filter(|name| !name.is_empty())
        .unwrap_or("workspace");
    format!("harn/{leaf}")
}
1485
1486fn now_ms() -> i64 {
1487 clock::now_ms()
1488}
1489
// Unit tests for the trigger registry. Each test clears the registry at start and
// end. Tests that install an on-disk event log also reset the active event log, so
// that state does not leak into other tests.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
    use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
    use crate::triggers::test_util::timing::FILE_WATCH_FALLBACK_POLL;
    use std::rc::Rc;
    use time::OffsetDateTime;

    /// Manifest-sourced GitHub `issues.opened` webhook spec for `id`. Passing a
    /// different `fingerprint` models an edited manifest definition.
    fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
        TriggerBindingSpec {
            id: id.to_string(),
            source: TriggerBindingSource::Manifest,
            kind: "webhook".to_string(),
            provider: ProviderId::from("github"),
            autonomy_tier: crate::AutonomyTier::ActAuto,
            handler: TriggerHandlerSpec::Worker {
                queue: format!("{id}-queue"),
            },
            dispatch_priority: crate::WorkerQueuePriority::Normal,
            when: None,
            when_budget: None,
            retry: TriggerRetryConfig::default(),
            match_events: vec!["issues.opened".to_string()],
            dedupe_key: Some("event.dedupe_key".to_string()),
            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
            filter: Some("event.kind".to_string()),
            daily_cost_usd: Some(5.0),
            hourly_cost_usd: None,
            max_autonomous_decisions_per_hour: None,
            max_autonomous_decisions_per_day: None,
            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
            max_concurrent: Some(10),
            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
            manifest_path: None,
            package_name: Some("workspace".to_string()),
            definition_fingerprint: fingerprint.to_string(),
        }
    }

    /// Dynamically registered GitHub webhook spec for `id`. It has no dedupe key,
    /// filter or cost caps, and its fingerprint is `dynamic:{id}`.
    fn dynamic_spec(id: &str) -> TriggerBindingSpec {
        TriggerBindingSpec {
            id: id.to_string(),
            source: TriggerBindingSource::Dynamic,
            kind: "webhook".to_string(),
            provider: ProviderId::from("github"),
            autonomy_tier: crate::AutonomyTier::ActAuto,
            handler: TriggerHandlerSpec::Worker {
                queue: format!("{id}-queue"),
            },
            dispatch_priority: crate::WorkerQueuePriority::Normal,
            when: None,
            when_budget: None,
            retry: TriggerRetryConfig::default(),
            match_events: vec!["issues.opened".to_string()],
            dedupe_key: None,
            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
            filter: None,
            daily_cost_usd: None,
            hourly_cost_usd: None,
            max_autonomous_decisions_per_hour: None,
            max_autonomous_decisions_per_day: None,
            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
            max_concurrent: None,
            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
            manifest_path: None,
            package_name: None,
            definition_fingerprint: format!("dynamic:{id}"),
        }
    }

    // A freshly installed manifest trigger is version 1, Active, with default metrics.
    #[tokio::test(flavor = "current_thread")]
    async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("manifest trigger installs");

        let snapshots = snapshot_trigger_bindings();
        assert_eq!(snapshots.len(), 1);
        let binding = &snapshots[0];
        assert_eq!(binding.id, "github-new-issue");
        assert_eq!(binding.version, 1);
        assert_eq!(binding.state, TriggerState::Active);
        assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());

        clear_trigger_registry();
    }

    // Distinct ids register independently. Registering an id that is already live
    // fails with `DuplicateId`.
    #[tokio::test(flavor = "current_thread")]
    async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
        clear_trigger_registry();

        let first = dynamic_register(dynamic_spec("dynamic-a"))
            .await
            .expect("first dynamic trigger");
        let second = dynamic_register(dynamic_spec("dynamic-b"))
            .await
            .expect("second dynamic trigger");
        assert_ne!(first, second);

        let error = dynamic_register(dynamic_spec("dynamic-a"))
            .await
            .expect_err("duplicate id should fail");
        assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));

        clear_trigger_registry();
    }

    // While an event is in flight, `drain` leaves the binding `Draining`. Releasing
    // the last in-flight event terminates it.
    #[tokio::test(flavor = "current_thread")]
    async fn drain_waits_for_in_flight_events_before_terminating() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("manifest trigger installs");
        begin_in_flight("github-new-issue", 1).expect("start in-flight event");

        drain("github-new-issue").await.expect("drain succeeds");
        let binding = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("binding snapshot");
        assert_eq!(binding.state, TriggerState::Draining);
        assert_eq!(binding.metrics.in_flight, 1);

        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish in-flight event");
        let binding = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("binding snapshot");
        assert_eq!(binding.state, TriggerState::Terminated);
        assert_eq!(binding.metrics.in_flight, 0);

        clear_trigger_registry();
    }

    // Installing a changed definition (new fingerprint) registers version 2 as
    // Active, while pinned version 1 drains and terminates once its event finishes.
    #[tokio::test(flavor = "current_thread")]
    async fn hot_reload_registers_new_version_while_old_binding_drains() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("initial manifest trigger installs");
        begin_in_flight("github-new-issue", 1).expect("start in-flight event");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("updated manifest trigger installs");

        let snapshots = snapshot_trigger_bindings();
        assert_eq!(snapshots.len(), 2);
        let old = snapshots
            .iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("old binding");
        let new = snapshots
            .iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
            .expect("new binding");
        assert_eq!(old.state, TriggerState::Draining);
        assert_eq!(new.state, TriggerState::Active);

        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish old in-flight event");
        let old = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("old binding");
        assert_eq!(old.state, TriggerState::Terminated);

        clear_trigger_registry();
    }

    // After three definitions, terminated v1 falls outside the retention window and
    // is dropped. v2 (presumably terminated once v3 replaces it) and v3 remain.
    #[tokio::test(flavor = "current_thread")]
    async fn gc_drops_terminated_versions_beyond_retention_limit() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("install v1");
        begin_in_flight("github-new-issue", 1).expect("pin v1");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("install v2");
        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish v1");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
            .await
            .expect("install v3");

        let snapshots = snapshot_trigger_bindings();
        let versions: Vec<u32> = snapshots
            .into_iter()
            .filter(|binding| binding.id == "github-new-issue")
            .map(|binding| binding.version)
            .collect();
        assert_eq!(versions, vec![2, 3]);

        clear_trigger_registry();
    }

    // The full lifecycle of a binding appears in the `triggers.lifecycle` topic in
    // order: registering -> active -> draining -> terminated.
    #[tokio::test(flavor = "current_thread")]
    async fn lifecycle_transitions_append_to_event_log() {
        clear_trigger_registry();
        reset_active_event_log();
        let tempdir = tempfile::tempdir().expect("tempdir");
        let log = install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("manifest trigger installs");
        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
        drain("github-new-issue").await.expect("drain succeeds");
        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish event");

        let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
        let events = log
            .read_range(&topic, None, 32)
            .await
            .expect("read lifecycle events");
        let states: Vec<String> = events
            .into_iter()
            .filter_map(|(_, event)| {
                event
                    .payload
                    .get("to_state")
                    .and_then(|value| value.as_str())
                    .map(|value| value.to_string())
            })
            .collect();
        assert_eq!(
            states,
            vec![
                "registering".to_string(),
                "active".to_string(),
                "draining".to_string(),
                "terminated".to_string(),
            ]
        );

        reset_active_event_log();
        clear_trigger_registry();
    }

    // Simulated restart: the in-memory registry is cleared and the same on-disk log
    // is reopened. Reinstalling the v2 definition then reuses version 2 from the
    // lifecycle history instead of allocating a new version.
    #[tokio::test(flavor = "current_thread")]
    async fn version_history_reuses_historical_version_after_restart() {
        clear_trigger_registry();
        reset_active_event_log();
        let tempdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("initial manifest trigger installs");
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("updated manifest trigger installs");

        clear_trigger_registry();
        reset_active_event_log();
        install_default_for_base_dir(tempdir.path()).expect("reopen event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("manifest reload reuses historical version");

        let binding = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue")
            .expect("binding snapshot");
        assert_eq!(binding.version, 2);

        reset_active_event_log();
        clear_trigger_registry();
    }

    // An as-of lookup reports v1 for a timestamp before the reload and v2 for one after.
    #[tokio::test(flavor = "current_thread")]
    async fn binding_version_as_of_reports_historical_active_version() {
        clear_trigger_registry();
        reset_active_event_log();
        let tempdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("initial manifest trigger installs");
        let before_reload = OffsetDateTime::now_utc();
        // Sleep so the v2 lifecycle events are timestamped after `before_reload`;
        // the as-of lookup compares millisecond timestamps.
        std::thread::sleep(FILE_WATCH_FALLBACK_POLL);

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("updated manifest trigger installs");
        let after_reload = OffsetDateTime::now_utc();

        assert_eq!(
            binding_version_as_of("github-new-issue", before_reload)
                .expect("version before reload"),
            1
        );
        assert_eq!(
            binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
            2
        );

        reset_active_event_log();
        clear_trigger_registry();
    }

    // Recorded version 1 is expected to be gone after four reloads. Resolution falls
    // back to the version active at `received_at` (v3) and emits a structured
    // `replay.binding_version_gc_fallback` warning.
    #[tokio::test(flavor = "current_thread")]
    async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
        clear_trigger_registry();
        reset_active_event_log();
        let sink = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(sink.clone());
        let tempdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("install v1");
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("install v2");
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
            .await
            .expect("install v3");
        let received_at = OffsetDateTime::now_utc();
        // Sleep so v4's activation is timestamped strictly after `received_at`.
        std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
            .await
            .expect("install v4");

        let binding = resolve_live_or_as_of(
            "github-new-issue",
            RecordedTriggerBinding {
                version: 1,
                received_at,
            },
        )
        .expect("resolve fallback binding");
        assert_eq!(binding.version, 3);

        let warning = sink
            .logs
            .borrow()
            .iter()
            .find(|log| log.category == "replay.binding_version_gc_fallback")
            .cloned()
            .expect("gc fallback warning");
        assert_eq!(warning.level, EventLevel::Warn);
        assert_eq!(
            warning.metadata.get("trigger_id"),
            Some(&serde_json::json!("github-new-issue"))
        );
        assert_eq!(
            warning.metadata.get("recorded_version"),
            Some(&serde_json::json!(1))
        );
        assert_eq!(
            warning.metadata.get("received_at"),
            Some(&serde_json::json!(received_at
                .format(&time::format_description::well_known::Rfc3339)
                .unwrap_or_else(|_| received_at.to_string())))
        );
        assert_eq!(
            warning.metadata.get("resolved_version"),
            Some(&serde_json::json!(3))
        );

        clear_event_sinks();
        crate::events::reset_event_sinks();
        reset_active_event_log();
        clear_trigger_registry();
    }
}