1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use time::OffsetDateTime;
10use uuid::Uuid;
11
12use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
13use crate::llm::trigger_predicate::TriggerPredicateBudget;
14use crate::secrets::{configured_default_chain, SecretProvider};
15use crate::triggers::test_util::clock;
16use crate::trust_graph::AutonomyTier;
17use crate::value::VmClosure;
18
19use super::dispatcher::TriggerRetryConfig;
20use super::flow_control::TriggerFlowControlConfig;
21use super::ProviderId;
22
23#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
24pub struct TriggerId(String);
25
26impl TriggerId {
27 pub fn new(value: impl Into<String>) -> Self {
28 Self(value.into())
29 }
30
31 pub fn as_str(&self) -> &str {
32 &self.0
33 }
34}
35
36impl std::fmt::Display for TriggerId {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 self.0.fmt(f)
39 }
40}
41
42#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub enum TriggerState {
45 Registering,
46 Active,
47 Draining,
48 Terminated,
49}
50
51impl TriggerState {
52 pub fn as_str(self) -> &'static str {
53 match self {
54 Self::Registering => "registering",
55 Self::Active => "active",
56 Self::Draining => "draining",
57 Self::Terminated => "terminated",
58 }
59 }
60}
61
62#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum TriggerBindingSource {
65 Manifest,
66 Dynamic,
67}
68
69impl TriggerBindingSource {
70 pub fn as_str(self) -> &'static str {
71 match self {
72 Self::Manifest => "manifest",
73 Self::Dynamic => "dynamic",
74 }
75 }
76}
77
78#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
79#[serde(rename_all = "snake_case")]
80pub enum TriggerBudgetExhaustionStrategy {
81 #[default]
82 False,
83 RetryLater,
84 Fail,
85 Warn,
86}
87
88impl TriggerBudgetExhaustionStrategy {
89 pub fn as_str(self) -> &'static str {
90 match self {
91 Self::False => "false",
92 Self::RetryLater => "retry_later",
93 Self::Fail => "fail",
94 Self::Warn => "warn",
95 }
96 }
97}
98
99#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
100pub struct OrchestratorBudgetConfig {
101 pub daily_cost_usd: Option<f64>,
102 pub hourly_cost_usd: Option<f64>,
103}
104
105#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
106pub struct OrchestratorBudgetSnapshot {
107 pub daily_cost_usd: Option<f64>,
108 pub hourly_cost_usd: Option<f64>,
109 pub cost_today_usd_micros: u64,
110 pub cost_hour_usd_micros: u64,
111 pub day_utc: i32,
112 pub hour_utc: i64,
113}
114
115#[derive(Debug)]
116struct OrchestratorBudgetState {
117 config: OrchestratorBudgetConfig,
118 day_utc: i32,
119 hour_utc: i64,
120 cost_today_usd_micros: u64,
121 cost_hour_usd_micros: u64,
122}
123
124impl Default for OrchestratorBudgetState {
125 fn default() -> Self {
126 Self {
127 config: OrchestratorBudgetConfig::default(),
128 day_utc: utc_day_key(),
129 hour_utc: utc_hour_key(),
130 cost_today_usd_micros: 0,
131 cost_hour_usd_micros: 0,
132 }
133 }
134}
135
136#[derive(Clone)]
137pub enum TriggerHandlerSpec {
138 Local {
139 raw: String,
140 closure: Rc<VmClosure>,
141 },
142 A2a {
143 target: String,
144 allow_cleartext: bool,
145 },
146 Worker {
147 queue: String,
148 },
149}
150
151impl std::fmt::Debug for TriggerHandlerSpec {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 match self {
154 Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
155 Self::A2a {
156 target,
157 allow_cleartext,
158 } => f
159 .debug_struct("A2a")
160 .field("target", target)
161 .field("allow_cleartext", allow_cleartext)
162 .finish(),
163 Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
164 }
165 }
166}
167
168impl TriggerHandlerSpec {
169 pub fn kind(&self) -> &'static str {
170 match self {
171 Self::Local { .. } => "local",
172 Self::A2a { .. } => "a2a",
173 Self::Worker { .. } => "worker",
174 }
175 }
176}
177
178#[derive(Clone)]
179pub struct TriggerPredicateSpec {
180 pub raw: String,
181 pub closure: Rc<VmClosure>,
182}
183
184impl std::fmt::Debug for TriggerPredicateSpec {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 f.debug_struct("TriggerPredicateSpec")
187 .field("raw", &self.raw)
188 .finish()
189 }
190}
191
192#[derive(Clone, Debug)]
193pub struct TriggerBindingSpec {
194 pub id: String,
195 pub source: TriggerBindingSource,
196 pub kind: String,
197 pub provider: ProviderId,
198 pub autonomy_tier: AutonomyTier,
199 pub handler: TriggerHandlerSpec,
200 pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
201 pub when: Option<TriggerPredicateSpec>,
202 pub when_budget: Option<TriggerPredicateBudget>,
203 pub retry: TriggerRetryConfig,
204 pub match_events: Vec<String>,
205 pub dedupe_key: Option<String>,
206 pub dedupe_retention_days: u32,
207 pub filter: Option<String>,
208 pub daily_cost_usd: Option<f64>,
209 pub hourly_cost_usd: Option<f64>,
210 pub max_autonomous_decisions_per_hour: Option<u64>,
211 pub max_autonomous_decisions_per_day: Option<u64>,
212 pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
213 pub max_concurrent: Option<u32>,
214 pub flow_control: TriggerFlowControlConfig,
215 pub manifest_path: Option<PathBuf>,
216 pub package_name: Option<String>,
217 pub definition_fingerprint: String,
218}
219
/// Live counters kept for one trigger binding.
///
/// Cost counters are in USD micros. The `*_today` / `*_hour` counters are reset
/// by `reset_binding_budget_windows` when the UTC day/hour rolls over.
///
/// `Default` is derived: every `AtomicU64` starts at 0 and `last_received_ms`
/// starts as `None`, exactly what the former hand-written impl produced.
#[derive(Debug, Default)]
pub struct TriggerMetrics {
    pub received: AtomicU64,
    pub dispatched: AtomicU64,
    pub failed: AtomicU64,
    pub dlq: AtomicU64,
    pub last_received_ms: Mutex<Option<i64>>,
    pub cost_total_usd_micros: AtomicU64,
    pub cost_today_usd_micros: AtomicU64,
    pub cost_hour_usd_micros: AtomicU64,
    pub autonomous_decisions_total: AtomicU64,
    pub autonomous_decisions_today: AtomicU64,
    pub autonomous_decisions_hour: AtomicU64,
}
252
253#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
254pub struct TriggerMetricsSnapshot {
255 pub received: u64,
256 pub dispatched: u64,
257 pub failed: u64,
258 pub dlq: u64,
259 pub in_flight: u64,
260 pub last_received_ms: Option<i64>,
261 pub cost_total_usd_micros: u64,
262 pub cost_today_usd_micros: u64,
263 pub cost_hour_usd_micros: u64,
264 pub autonomous_decisions_total: u64,
265 pub autonomous_decisions_today: u64,
266 pub autonomous_decisions_hour: u64,
267}
268
269pub struct TriggerBinding {
270 pub id: TriggerId,
271 pub version: u32,
272 pub source: TriggerBindingSource,
273 pub kind: String,
274 pub provider: ProviderId,
275 pub autonomy_tier: AutonomyTier,
276 pub handler: TriggerHandlerSpec,
277 pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
278 pub when: Option<TriggerPredicateSpec>,
279 pub when_budget: Option<TriggerPredicateBudget>,
280 pub retry: TriggerRetryConfig,
281 pub match_events: Vec<String>,
282 pub dedupe_key: Option<String>,
283 pub dedupe_retention_days: u32,
284 pub filter: Option<String>,
285 pub daily_cost_usd: Option<f64>,
286 pub hourly_cost_usd: Option<f64>,
287 pub max_autonomous_decisions_per_hour: Option<u64>,
288 pub max_autonomous_decisions_per_day: Option<u64>,
289 pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
290 pub max_concurrent: Option<u32>,
291 pub flow_control: TriggerFlowControlConfig,
292 pub manifest_path: Option<PathBuf>,
293 pub package_name: Option<String>,
294 pub definition_fingerprint: String,
295 pub state: Mutex<TriggerState>,
296 pub metrics: TriggerMetrics,
297 pub in_flight: AtomicU64,
298 pub cancel_token: Arc<AtomicBool>,
299 pub predicate_state: Mutex<TriggerPredicateState>,
300}
301
/// Per-binding bookkeeping guarded by `TriggerBinding::predicate_state`.
#[derive(Clone, Debug, Default)]
pub struct TriggerPredicateState {
    /// Day key the binding's daily counters currently belong to (`None` until first reset).
    pub budget_day_utc: Option<i32>,
    /// Hour key the binding's hourly counters currently belong to.
    pub budget_hour_utc: Option<i64>,
    pub consecutive_failures: u32,
    pub breaker_open_until_ms: Option<i64>,
    /// Most recent predicate costs in USD micros (bounded by `PREDICATE_COST_WINDOW`).
    pub recent_cost_usd_micros: VecDeque<u64>,
}
310
311impl std::fmt::Debug for TriggerBinding {
312 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 f.debug_struct("TriggerBinding")
314 .field("id", &self.id)
315 .field("version", &self.version)
316 .field("source", &self.source)
317 .field("kind", &self.kind)
318 .field("provider", &self.provider)
319 .field("handler_kind", &self.handler.kind())
320 .field("state", &self.state_snapshot())
321 .finish()
322 }
323}
324
325impl TriggerBinding {
326 pub fn snapshot(&self) -> TriggerBindingSnapshot {
327 TriggerBindingSnapshot {
328 id: self.id.as_str().to_string(),
329 version: self.version,
330 source: self.source,
331 kind: self.kind.clone(),
332 provider: self.provider.as_str().to_string(),
333 autonomy_tier: self.autonomy_tier,
334 handler_kind: self.handler.kind().to_string(),
335 state: self.state_snapshot(),
336 metrics: self.metrics_snapshot(),
337 daily_cost_usd: self.daily_cost_usd,
338 hourly_cost_usd: self.hourly_cost_usd,
339 max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
340 max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
341 on_budget_exhausted: self.on_budget_exhausted,
342 }
343 }
344
345 fn new(spec: TriggerBindingSpec, version: u32) -> Self {
346 Self {
347 id: TriggerId::new(spec.id),
348 version,
349 source: spec.source,
350 kind: spec.kind,
351 provider: spec.provider,
352 autonomy_tier: spec.autonomy_tier,
353 handler: spec.handler,
354 dispatch_priority: spec.dispatch_priority,
355 when: spec.when,
356 when_budget: spec.when_budget,
357 retry: spec.retry,
358 match_events: spec.match_events,
359 dedupe_key: spec.dedupe_key,
360 dedupe_retention_days: spec.dedupe_retention_days,
361 filter: spec.filter,
362 daily_cost_usd: spec.daily_cost_usd,
363 hourly_cost_usd: spec.hourly_cost_usd,
364 max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
365 max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
366 on_budget_exhausted: spec.on_budget_exhausted,
367 max_concurrent: spec.max_concurrent,
368 flow_control: spec.flow_control,
369 manifest_path: spec.manifest_path,
370 package_name: spec.package_name,
371 definition_fingerprint: spec.definition_fingerprint,
372 state: Mutex::new(TriggerState::Registering),
373 metrics: TriggerMetrics::default(),
374 in_flight: AtomicU64::new(0),
375 cancel_token: Arc::new(AtomicBool::new(false)),
376 predicate_state: Mutex::new(TriggerPredicateState::default()),
377 }
378 }
379
380 pub fn binding_key(&self) -> String {
381 format!("{}@v{}", self.id.as_str(), self.version)
382 }
383
384 pub fn state_snapshot(&self) -> TriggerState {
385 *self.state.lock().expect("trigger state poisoned")
386 }
387
388 pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
389 TriggerMetricsSnapshot {
390 received: self.metrics.received.load(Ordering::Relaxed),
391 dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
392 failed: self.metrics.failed.load(Ordering::Relaxed),
393 dlq: self.metrics.dlq.load(Ordering::Relaxed),
394 in_flight: self.in_flight.load(Ordering::Relaxed),
395 last_received_ms: *self
396 .metrics
397 .last_received_ms
398 .lock()
399 .expect("trigger metrics poisoned"),
400 cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
401 cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
402 cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
403 autonomous_decisions_total: self
404 .metrics
405 .autonomous_decisions_total
406 .load(Ordering::Relaxed),
407 autonomous_decisions_today: self
408 .metrics
409 .autonomous_decisions_today
410 .load(Ordering::Relaxed),
411 autonomous_decisions_hour: self
412 .metrics
413 .autonomous_decisions_hour
414 .load(Ordering::Relaxed),
415 }
416 }
417}
418
419#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
420pub struct TriggerBindingSnapshot {
421 pub id: String,
422 pub version: u32,
423 pub source: TriggerBindingSource,
424 pub kind: String,
425 pub provider: String,
426 pub autonomy_tier: AutonomyTier,
427 pub handler_kind: String,
428 pub state: TriggerState,
429 pub metrics: TriggerMetricsSnapshot,
430 pub daily_cost_usd: Option<f64>,
431 pub hourly_cost_usd: Option<f64>,
432 pub max_autonomous_decisions_per_hour: Option<u64>,
433 pub max_autonomous_decisions_per_day: Option<u64>,
434 pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
435}
436
/// Final result of dispatching an event to a binding's handler.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TriggerDispatchOutcome {
    /// Handler accepted the event.
    Dispatched,
    /// Dispatch failed.
    Failed,
    /// Event was routed to the dead-letter queue.
    Dlq,
}
443
/// Errors returned by trigger registry operations.
#[derive(Debug)]
pub enum TriggerRegistryError {
    /// A trigger id is already registered, or repeated within one manifest install.
    DuplicateId(String),
    /// Rejected request; the payload is the complete human-readable message.
    InvalidSpec(String),
    /// No live binding exists for this trigger id.
    UnknownId(String),
    /// The requested version of a trigger is missing (or not usable for the call).
    UnknownBindingVersion { id: String, version: u32 },
    /// Event-log failure; the payload is the complete message.
    EventLog(String),
}

impl std::fmt::Display for TriggerRegistryError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
            Self::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
            Self::UnknownBindingVersion { id, version } => {
                write!(f, "unknown trigger binding '{id}' version {version}")
            }
            // Message-carrying variants are printed verbatim.
            Self::InvalidSpec(message) => f.write_str(message),
            Self::EventLog(message) => f.write_str(message),
        }
    }
}

impl std::error::Error for TriggerRegistryError {}
467
468#[derive(Default)]
469pub struct TriggerRegistry {
470 bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
471 by_provider: BTreeMap<String, BTreeSet<String>>,
472 event_log: Option<Arc<AnyEventLog>>,
473 secret_provider: Option<Arc<dyn SecretProvider>>,
474}
475
476thread_local! {
477 static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
478}
479
480thread_local! {
481 static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
482 RefCell::new(OrchestratorBudgetState::default());
483}
484
485const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;
486
487const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
488const PREDICATE_COST_WINDOW: usize = 100;
489
490#[derive(Clone, Debug, Deserialize)]
491struct LifecycleStateTransitionRecord {
492 id: String,
493 version: u32,
494 #[serde(default)]
495 definition_fingerprint: Option<String>,
496 to_state: TriggerState,
497}
498
499#[derive(Clone, Debug)]
500struct HistoricalLifecycleRecord {
501 occurred_at_ms: i64,
502 transition: LifecycleStateTransitionRecord,
503}
504
505#[derive(Clone, Copy, Debug, PartialEq, Eq)]
506pub struct RecordedTriggerBinding {
507 pub version: u32,
508 pub received_at: OffsetDateTime,
509}
510
/// Accumulator for a historical version search (used outside this view).
#[derive(Clone, Copy, Debug, Default)]
struct HistoricalVersionLookup {
    /// Version that matched the search criteria, if any.
    matching_version: Option<u32>,
    /// Highest version seen during the search, if any.
    max_version: Option<u32>,
}
516
517pub fn clear_trigger_registry() {
518 TRIGGER_REGISTRY.with(|slot| {
519 *slot.borrow_mut() = TriggerRegistry::default();
520 });
521 clear_orchestrator_budget();
522}
523
524pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
525 ORCHESTRATOR_BUDGET.with(|slot| {
526 let mut state = slot.borrow_mut();
527 rollover_orchestrator_budget(&mut state);
528 state.config = config;
529 });
530}
531
532pub fn clear_orchestrator_budget() {
533 ORCHESTRATOR_BUDGET.with(|slot| {
534 *slot.borrow_mut() = OrchestratorBudgetState::default();
535 });
536}
537
538pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
539 ORCHESTRATOR_BUDGET.with(|slot| {
540 let mut state = slot.borrow_mut();
541 rollover_orchestrator_budget(&mut state);
542 OrchestratorBudgetSnapshot {
543 daily_cost_usd: state.config.daily_cost_usd,
544 hourly_cost_usd: state.config.hourly_cost_usd,
545 cost_today_usd_micros: state.cost_today_usd_micros,
546 cost_hour_usd_micros: state.cost_hour_usd_micros,
547 day_utc: state.day_utc,
548 hour_utc: state.hour_utc,
549 }
550 })
551}
552
553pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
554 if cost_usd_micros == 0 {
555 return;
556 }
557 ORCHESTRATOR_BUDGET.with(|slot| {
558 let mut state = slot.borrow_mut();
559 rollover_orchestrator_budget(&mut state);
560 state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
561 state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
562 });
563}
564
565pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
566 ORCHESTRATOR_BUDGET.with(|slot| {
567 let mut state = slot.borrow_mut();
568 rollover_orchestrator_budget(&mut state);
569 if state.config.hourly_cost_usd.is_some_and(|limit| {
570 micros_to_usd(
571 state
572 .cost_hour_usd_micros
573 .saturating_add(expected_cost_usd_micros),
574 ) > limit
575 }) {
576 return Some("orchestrator_hourly_budget_exceeded");
577 }
578 if state.config.daily_cost_usd.is_some_and(|limit| {
579 micros_to_usd(
580 state
581 .cost_today_usd_micros
582 .saturating_add(expected_cost_usd_micros),
583 ) > limit
584 }) {
585 return Some("orchestrator_daily_budget_exceeded");
586 }
587 None
588 })
589}
590
591pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
592 let today = utc_day_key();
593 let hour = utc_hour_key();
594 let mut state = binding
595 .predicate_state
596 .lock()
597 .expect("trigger predicate state poisoned");
598 if state.budget_day_utc != Some(today) {
599 state.budget_day_utc = Some(today);
600 binding
601 .metrics
602 .cost_today_usd_micros
603 .store(0, Ordering::Relaxed);
604 binding
605 .metrics
606 .autonomous_decisions_today
607 .store(0, Ordering::Relaxed);
608 }
609 if state.budget_hour_utc != Some(hour) {
610 state.budget_hour_utc = Some(hour);
611 binding
612 .metrics
613 .cost_hour_usd_micros
614 .store(0, Ordering::Relaxed);
615 binding
616 .metrics
617 .autonomous_decisions_hour
618 .store(0, Ordering::Relaxed);
619 }
620}
621
622pub fn binding_budget_would_exceed(
623 binding: &TriggerBinding,
624 expected_cost_usd_micros: u64,
625) -> Option<&'static str> {
626 reset_binding_budget_windows(binding);
627 if binding.hourly_cost_usd.is_some_and(|limit| {
628 micros_to_usd(
629 binding
630 .metrics
631 .cost_hour_usd_micros
632 .load(Ordering::Relaxed)
633 .saturating_add(expected_cost_usd_micros),
634 ) > limit
635 }) {
636 return Some("hourly_budget_exceeded");
637 }
638 if binding.daily_cost_usd.is_some_and(|limit| {
639 micros_to_usd(
640 binding
641 .metrics
642 .cost_today_usd_micros
643 .load(Ordering::Relaxed)
644 .saturating_add(expected_cost_usd_micros),
645 ) > limit
646 }) {
647 return Some("daily_budget_exceeded");
648 }
649 None
650}
651
652pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
653 reset_binding_budget_windows(binding);
654 if binding
655 .max_autonomous_decisions_per_hour
656 .is_some_and(|limit| {
657 binding
658 .metrics
659 .autonomous_decisions_hour
660 .load(Ordering::Relaxed)
661 .saturating_add(1)
662 > limit
663 })
664 {
665 return Some("hourly_autonomy_budget_exceeded");
666 }
667 if binding
668 .max_autonomous_decisions_per_day
669 .is_some_and(|limit| {
670 binding
671 .metrics
672 .autonomous_decisions_today
673 .load(Ordering::Relaxed)
674 .saturating_add(1)
675 > limit
676 })
677 {
678 return Some("daily_autonomy_budget_exceeded");
679 }
680 None
681}
682
683pub fn note_autonomous_decision(binding: &TriggerBinding) {
684 reset_binding_budget_windows(binding);
685 binding
686 .metrics
687 .autonomous_decisions_total
688 .fetch_add(1, Ordering::Relaxed);
689 binding
690 .metrics
691 .autonomous_decisions_today
692 .fetch_add(1, Ordering::Relaxed);
693 binding
694 .metrics
695 .autonomous_decisions_hour
696 .fetch_add(1, Ordering::Relaxed);
697}
698
699pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
700 let state = binding
701 .predicate_state
702 .lock()
703 .expect("trigger predicate state poisoned");
704 if !state.recent_cost_usd_micros.is_empty() {
705 let total: u64 = state.recent_cost_usd_micros.iter().copied().sum();
706 return total / state.recent_cost_usd_micros.len() as u64;
707 }
708 binding
709 .when_budget
710 .as_ref()
711 .and_then(|budget| budget.max_cost_usd)
712 .map(usd_to_micros)
713 .unwrap_or_default()
714}
715
716pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
717 let mut state = binding
718 .predicate_state
719 .lock()
720 .expect("trigger predicate state poisoned");
721 state.recent_cost_usd_micros.push_back(cost_usd_micros);
722 while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
723 state.recent_cost_usd_micros.pop_front();
724 }
725}
726
/// Converts a USD amount to whole micros, rounding up.
///
/// Non-finite, zero and negative amounts map to 0; huge values saturate at `u64::MAX`.
pub fn usd_to_micros(value: f64) -> u64 {
    let is_positive_finite = value.is_finite() && value > 0.0;
    if is_positive_finite {
        (value * 1_000_000.0).ceil() as u64
    } else {
        0
    }
}
733
/// Converts USD micros back to a floating-point USD amount.
pub fn micros_to_usd(value: u64) -> f64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    (value as f64) / MICROS_PER_USD
}
737
738fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
739 let today = utc_day_key();
740 let hour = utc_hour_key();
741 if state.day_utc != today {
742 state.day_utc = today;
743 state.cost_today_usd_micros = 0;
744 }
745 if state.hour_utc != hour {
746 state.hour_utc = hour;
747 state.cost_hour_usd_micros = 0;
748 }
749}
750
751fn utc_day_key() -> i32 {
752 (clock::now_utc().date()
753 - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
754 .whole_days() as i32
755}
756
757fn utc_hour_key() -> i64 {
758 clock::now_utc().unix_timestamp() / 3_600
759}
760
761pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
762 TRIGGER_REGISTRY.with(|slot| {
763 let registry = slot.borrow();
764 let mut snapshots = Vec::new();
765 for bindings in registry.bindings.values() {
766 for binding in bindings {
767 snapshots.push(binding.snapshot());
768 }
769 }
770 snapshots.sort_by(|left, right| {
771 left.id
772 .cmp(&right.id)
773 .then(left.version.cmp(&right.version))
774 .then(left.state.as_str().cmp(right.state.as_str()))
775 });
776 snapshots
777 })
778}
779
780#[allow(clippy::arc_with_non_send_sync)]
781pub fn resolve_trigger_binding_as_of(
782 id: &str,
783 as_of: OffsetDateTime,
784) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
785 let version = binding_version_as_of(id, as_of)?;
786 resolve_trigger_binding_version(id, version)
787}
788
789#[allow(clippy::arc_with_non_send_sync)]
790pub fn resolve_live_or_as_of(
791 id: &str,
792 recorded: RecordedTriggerBinding,
793) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
794 match resolve_live_trigger_binding(id, Some(recorded.version)) {
795 Ok(binding) => Ok(binding),
796 Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
797 let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
798 let mut metadata = BTreeMap::new();
799 metadata.insert("trigger_id".to_string(), serde_json::json!(id));
800 metadata.insert(
801 "recorded_version".to_string(),
802 serde_json::json!(recorded.version),
803 );
804 metadata.insert(
805 "received_at".to_string(),
806 serde_json::json!(recorded
807 .received_at
808 .format(&time::format_description::well_known::Rfc3339)
809 .unwrap_or_else(|_| recorded.received_at.to_string())),
810 );
811 metadata.insert(
812 "resolved_version".to_string(),
813 serde_json::json!(binding.version),
814 );
815 crate::events::log_warn_meta(
816 "replay.binding_version_gc_fallback",
817 "trigger replay fell back to lifecycle history after binding version GC",
818 metadata,
819 );
820 Ok(binding)
821 }
822 Err(error) => Err(error),
823 }
824}
825
826pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
827 TRIGGER_REGISTRY.with(|slot| {
828 let registry = slot.borrow();
829 registry.binding_version_as_of(id, as_of)
830 })
831}
832
833#[allow(clippy::arc_with_non_send_sync)]
834fn resolve_trigger_binding_version(
835 id: &str,
836 version: u32,
837) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
838 TRIGGER_REGISTRY.with(|slot| {
839 let registry = slot.borrow();
840 registry
841 .binding(id, version)
842 .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
843 id: id.to_string(),
844 version,
845 })
846 })
847}
848
849#[allow(clippy::arc_with_non_send_sync)]
850pub fn resolve_live_trigger_binding(
851 id: &str,
852 version: Option<u32>,
853) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
854 TRIGGER_REGISTRY.with(|slot| {
855 let registry = slot.borrow();
856 if let Some(version) = version {
857 let binding = registry.binding(id, version).ok_or_else(|| {
858 TriggerRegistryError::UnknownBindingVersion {
859 id: id.to_string(),
860 version,
861 }
862 })?;
863 if binding.state_snapshot() == TriggerState::Terminated {
864 return Err(TriggerRegistryError::UnknownBindingVersion {
865 id: id.to_string(),
866 version,
867 });
868 }
869 return Ok(binding);
870 }
871
872 registry
873 .live_bindings_any_source(id)
874 .into_iter()
875 .max_by_key(|binding| binding.version)
876 .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
877 })
878}
879
880pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
881 TRIGGER_REGISTRY.with(|slot| {
882 let registry = slot.borrow();
883 let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
884 return Vec::new();
885 };
886
887 let mut bindings = Vec::new();
888 for id in binding_ids {
889 let Some(versions) = registry.bindings.get(id) else {
890 continue;
891 };
892 for binding in versions {
893 if binding.state_snapshot() != TriggerState::Active {
894 continue;
895 }
896 if !binding.match_events.is_empty()
897 && !binding.match_events.iter().any(|kind| kind == &event.kind)
898 {
899 continue;
900 }
901 bindings.push(binding.clone());
902 }
903 }
904
905 bindings.sort_by(|left, right| {
906 left.id
907 .as_str()
908 .cmp(right.id.as_str())
909 .then(left.version.cmp(&right.version))
910 });
911 bindings
912 })
913}
914
915pub async fn install_manifest_triggers(
916 specs: Vec<TriggerBindingSpec>,
917) -> Result<(), TriggerRegistryError> {
918 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
919 let registry = &mut *slot.borrow_mut();
920 registry.refresh_runtime_context();
921 let mut touched_ids = BTreeSet::new();
922
923 let mut incoming = BTreeMap::new();
924 for spec in specs {
925 let spec_id = spec.id.clone();
926 if spec.source != TriggerBindingSource::Manifest {
927 return Err(TriggerRegistryError::InvalidSpec(format!(
928 "manifest install received non-manifest trigger '{}'",
929 spec_id
930 )));
931 }
932 if spec_id.trim().is_empty() {
933 return Err(TriggerRegistryError::InvalidSpec(
934 "manifest trigger id cannot be empty".to_string(),
935 ));
936 }
937 if incoming.insert(spec_id.clone(), spec).is_some() {
938 return Err(TriggerRegistryError::DuplicateId(spec_id));
939 }
940 }
941
942 let mut lifecycle = Vec::new();
943 let existing_ids: Vec<String> = registry
944 .bindings
945 .iter()
946 .filter(|(_, bindings)| {
947 bindings.iter().any(|binding| {
948 binding.source == TriggerBindingSource::Manifest
949 && binding.state_snapshot() != TriggerState::Terminated
950 })
951 })
952 .map(|(id, _)| id.clone())
953 .collect();
954
955 for id in existing_ids {
956 let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
957 let Some(spec) = incoming.remove(&id) else {
958 for binding in live_manifest {
959 registry.transition_binding_to_draining(&binding, &mut lifecycle);
960 }
961 touched_ids.insert(id.clone());
962 continue;
963 };
964
965 let has_matching_active = live_manifest.iter().any(|binding| {
966 binding.definition_fingerprint == spec.definition_fingerprint
967 && matches!(
968 binding.state_snapshot(),
969 TriggerState::Registering | TriggerState::Active
970 )
971 });
972 if has_matching_active {
973 continue;
974 }
975
976 for binding in live_manifest {
977 registry.transition_binding_to_draining(&binding, &mut lifecycle);
978 }
979
980 let version = registry.next_version_for_spec(&spec);
981 registry.register_binding(spec, version, &mut lifecycle);
982 touched_ids.insert(id.clone());
983 }
984
985 for spec in incoming.into_values() {
986 touched_ids.insert(spec.id.clone());
987 let version = registry.next_version_for_spec(&spec);
988 registry.register_binding(spec, version, &mut lifecycle);
989 }
990
991 for id in touched_ids {
992 registry.gc_terminated_versions(&id);
993 }
994
995 Ok((registry.event_log.clone(), lifecycle))
996 })?;
997
998 append_lifecycle_events(event_log, events).await
999}
1000
1001pub async fn dynamic_register(
1002 mut spec: TriggerBindingSpec,
1003) -> Result<TriggerId, TriggerRegistryError> {
1004 if spec.id.trim().is_empty() {
1005 spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1006 }
1007 spec.source = TriggerBindingSource::Dynamic;
1008 let id = spec.id.clone();
1009 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1010 let registry = &mut *slot.borrow_mut();
1011 registry.refresh_runtime_context();
1012
1013 if registry.bindings.contains_key(id.as_str()) {
1014 return Err(TriggerRegistryError::DuplicateId(id.clone()));
1015 }
1016
1017 let mut lifecycle = Vec::new();
1018 let version = registry.next_version_for_spec(&spec);
1019 registry.register_binding(spec, version, &mut lifecycle);
1020 Ok((registry.event_log.clone(), lifecycle))
1021 })?;
1022
1023 append_lifecycle_events(event_log, events).await?;
1024 Ok(TriggerId::new(id))
1025}
1026
1027pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1028 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1029 let registry = &mut *slot.borrow_mut();
1030 let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1031 if live_dynamic.is_empty() {
1032 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1033 }
1034
1035 let mut lifecycle = Vec::new();
1036 for binding in live_dynamic {
1037 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1038 }
1039 Ok((registry.event_log.clone(), lifecycle))
1040 })?;
1041
1042 append_lifecycle_events(event_log, events).await
1043}
1044
1045pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1046 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1047 let registry = &mut *slot.borrow_mut();
1048 let live = registry.live_bindings_any_source(id);
1049 if live.is_empty() {
1050 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1051 }
1052
1053 let mut lifecycle = Vec::new();
1054 for binding in live {
1055 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1056 }
1057 Ok((registry.event_log.clone(), lifecycle))
1058 })?;
1059
1060 append_lifecycle_events(event_log, events).await
1061}
1062
1063fn pin_trigger_binding_inner(
1064 id: &str,
1065 version: u32,
1066 allow_terminated: bool,
1067) -> Result<(), TriggerRegistryError> {
1068 TRIGGER_REGISTRY.with(|slot| {
1069 let registry = slot.borrow();
1070 let binding = registry.binding(id, version).ok_or_else(|| {
1071 TriggerRegistryError::UnknownBindingVersion {
1072 id: id.to_string(),
1073 version,
1074 }
1075 })?;
1076 match binding.state_snapshot() {
1077 TriggerState::Terminated if !allow_terminated => {
1078 Err(TriggerRegistryError::InvalidSpec(format!(
1079 "trigger binding '{}' version {} is terminated",
1080 id, version
1081 )))
1082 }
1083 _ => {
1084 binding.in_flight.fetch_add(1, Ordering::Relaxed);
1085 Ok(())
1086 }
1087 }
1088 })
1089}
1090
1091pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1092 pin_trigger_binding_inner(id, version, false)
1093}
1094
1095pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1096 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1097 let registry = &mut *slot.borrow_mut();
1098 let binding = registry.binding(id, version).ok_or_else(|| {
1099 TriggerRegistryError::UnknownBindingVersion {
1100 id: id.to_string(),
1101 version,
1102 }
1103 })?;
1104 let current = binding.in_flight.load(Ordering::Relaxed);
1105 if current == 0 {
1106 return Err(TriggerRegistryError::InvalidSpec(format!(
1107 "trigger binding '{}' version {} has no in-flight events",
1108 id, version
1109 )));
1110 }
1111 binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1112
1113 let mut lifecycle = Vec::new();
1114 registry.maybe_finalize_draining(&binding, &mut lifecycle);
1115 registry.gc_terminated_versions(binding.id.as_str());
1116 Ok((registry.event_log.clone(), lifecycle))
1117 })?;
1118
1119 append_lifecycle_events(event_log, events).await
1120}
1121
1122pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1123 begin_in_flight_inner(id, version, false)
1124}
1125
1126pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1127 begin_in_flight_inner(id, version, true)
1128}
1129
1130fn begin_in_flight_inner(
1131 id: &str,
1132 version: u32,
1133 allow_terminated: bool,
1134) -> Result<(), TriggerRegistryError> {
1135 pin_trigger_binding_inner(id, version, allow_terminated)?;
1136 TRIGGER_REGISTRY.with(|slot| {
1137 let registry = slot.borrow();
1138 let binding = registry.binding(id, version).ok_or_else(|| {
1139 TriggerRegistryError::UnknownBindingVersion {
1140 id: id.to_string(),
1141 version,
1142 }
1143 })?;
1144 binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1145 *binding
1146 .metrics
1147 .last_received_ms
1148 .lock()
1149 .expect("trigger metrics poisoned") = Some(now_ms());
1150 Ok(())
1151 })
1152}
1153
1154pub async fn finish_in_flight(
1155 id: &str,
1156 version: u32,
1157 outcome: TriggerDispatchOutcome,
1158) -> Result<(), TriggerRegistryError> {
1159 TRIGGER_REGISTRY.with(|slot| {
1160 let registry = &mut *slot.borrow_mut();
1161 let binding = registry.binding(id, version).ok_or_else(|| {
1162 TriggerRegistryError::UnknownBindingVersion {
1163 id: id.to_string(),
1164 version,
1165 }
1166 })?;
1167 let current = binding.in_flight.load(Ordering::Relaxed);
1168 if current == 0 {
1169 return Err(TriggerRegistryError::InvalidSpec(format!(
1170 "trigger binding '{}' version {} has no in-flight events",
1171 id, version
1172 )));
1173 }
1174 match outcome {
1175 TriggerDispatchOutcome::Dispatched => {
1176 binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1177 }
1178 TriggerDispatchOutcome::Failed => {
1179 binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1180 }
1181 TriggerDispatchOutcome::Dlq => {
1182 binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1183 }
1184 }
1185 Ok(())
1186 })?;
1187
1188 unpin_trigger_binding(id, version).await
1189}
1190
1191impl TriggerRegistry {
1192 fn refresh_runtime_context(&mut self) {
1193 if self.event_log.is_none() {
1194 self.event_log = active_event_log();
1195 }
1196 if self.secret_provider.is_none() {
1197 self.secret_provider = default_secret_provider();
1198 }
1199 }
1200
1201 fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
1202 self.bindings
1203 .get(id)
1204 .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
1205 .cloned()
1206 }
1207
1208 fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
1209 self.bindings
1210 .get(id)
1211 .into_iter()
1212 .flat_map(|bindings| bindings.iter())
1213 .filter(|binding| {
1214 binding.source == source && binding.state_snapshot() != TriggerState::Terminated
1215 })
1216 .cloned()
1217 .collect()
1218 }
1219
1220 fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
1221 self.bindings
1222 .get(id)
1223 .into_iter()
1224 .flat_map(|bindings| bindings.iter())
1225 .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
1226 .cloned()
1227 .collect()
1228 }
1229
1230 fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
1231 if let Some(version) = self
1232 .bindings
1233 .get(spec.id.as_str())
1234 .into_iter()
1235 .flat_map(|bindings| bindings.iter())
1236 .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
1237 .map(|binding| binding.version)
1238 {
1239 return version;
1240 }
1241
1242 let historical =
1243 self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
1244 if let Some(version) = historical.matching_version {
1245 return version;
1246 }
1247
1248 self.bindings
1249 .get(spec.id.as_str())
1250 .into_iter()
1251 .flat_map(|bindings| bindings.iter())
1252 .map(|binding| binding.version)
1253 .chain(historical.max_version)
1254 .max()
1255 .unwrap_or(0)
1256 + 1
1257 }
1258
1259 fn gc_terminated_versions(&mut self, id: &str) {
1260 let Some(bindings) = self.bindings.get_mut(id) else {
1261 return;
1262 };
1263
1264 let mut newest_versions: Vec<u32> =
1265 bindings.iter().map(|binding| binding.version).collect();
1266 newest_versions.sort_unstable_by(|left, right| right.cmp(left));
1267 newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
1268 let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();
1269
1270 bindings.retain(|binding| {
1271 binding.state_snapshot() != TriggerState::Terminated
1272 || retained_versions.contains(&binding.version)
1273 });
1274
1275 if bindings.is_empty() {
1276 self.bindings.remove(id);
1277 }
1278 }
1279
1280 fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
1281 let mut lookup = HistoricalVersionLookup::default();
1282 for record in self.lifecycle_records_for(id) {
1283 lookup.max_version = Some(
1284 lookup
1285 .max_version
1286 .unwrap_or(0)
1287 .max(record.transition.version),
1288 );
1289 if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
1290 lookup.matching_version = Some(record.transition.version);
1291 }
1292 }
1293 lookup
1294 }
1295
1296 fn binding_version_as_of(
1297 &self,
1298 id: &str,
1299 as_of: OffsetDateTime,
1300 ) -> Result<u32, TriggerRegistryError> {
1301 let cutoff_ms = (as_of.unix_timestamp_nanos() / 1_000_000) as i64;
1302 let mut active_version = None;
1303 for record in self.lifecycle_records_for(id) {
1304 if record.occurred_at_ms > cutoff_ms {
1305 break;
1306 }
1307 match record.transition.to_state {
1308 TriggerState::Active => active_version = Some(record.transition.version),
1309 TriggerState::Draining | TriggerState::Terminated => {
1310 if active_version == Some(record.transition.version) {
1311 active_version = None;
1312 }
1313 }
1314 TriggerState::Registering => {}
1315 }
1316 }
1317
1318 active_version.ok_or_else(|| {
1319 TriggerRegistryError::InvalidSpec(format!(
1320 "no active trigger binding '{}' at {}",
1321 id,
1322 as_of
1323 .format(&time::format_description::well_known::Rfc3339)
1324 .unwrap_or_else(|_| as_of.to_string())
1325 ))
1326 })
1327 }
1328
1329 fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
1330 let Some(event_log) = self.event_log.as_ref() else {
1331 return Vec::new();
1332 };
1333 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1334 .expect("static triggers.lifecycle topic should always be valid");
1335 futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
1336 .unwrap_or_default()
1337 .into_iter()
1338 .filter_map(|(_, event)| {
1339 let occurred_at_ms = event.occurred_at_ms;
1340 let transition: LifecycleStateTransitionRecord =
1341 serde_json::from_value(event.payload).ok()?;
1342 (transition.id == id).then_some(HistoricalLifecycleRecord {
1343 occurred_at_ms,
1344 transition,
1345 })
1346 })
1347 .collect()
1348 }
1349
1350 #[allow(clippy::arc_with_non_send_sync)]
1351 fn register_binding(
1352 &mut self,
1353 spec: TriggerBindingSpec,
1354 version: u32,
1355 lifecycle: &mut Vec<LogEvent>,
1356 ) -> Arc<TriggerBinding> {
1357 let binding = Arc::new(TriggerBinding::new(spec, version));
1358 self.by_provider
1359 .entry(binding.provider.as_str().to_string())
1360 .or_default()
1361 .insert(binding.id.as_str().to_string());
1362 self.bindings
1363 .entry(binding.id.as_str().to_string())
1364 .or_default()
1365 .push(binding.clone());
1366 lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
1367 self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
1368 binding
1369 }
1370
1371 fn transition_binding_to_draining(
1372 &self,
1373 binding: &Arc<TriggerBinding>,
1374 lifecycle: &mut Vec<LogEvent>,
1375 ) {
1376 if matches!(binding.state_snapshot(), TriggerState::Terminated) {
1377 return;
1378 }
1379 self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
1380 self.maybe_finalize_draining(binding, lifecycle);
1381 }
1382
1383 fn maybe_finalize_draining(
1384 &self,
1385 binding: &Arc<TriggerBinding>,
1386 lifecycle: &mut Vec<LogEvent>,
1387 ) {
1388 if binding.state_snapshot() == TriggerState::Draining
1389 && binding.in_flight.load(Ordering::Relaxed) == 0
1390 {
1391 self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
1392 }
1393 }
1394
1395 fn transition_binding_state(
1396 &self,
1397 binding: &Arc<TriggerBinding>,
1398 next: TriggerState,
1399 lifecycle: &mut Vec<LogEvent>,
1400 ) {
1401 let mut state = binding.state.lock().expect("trigger state poisoned");
1402 let previous = *state;
1403 if previous == next {
1404 return;
1405 }
1406 *state = next;
1407 drop(state);
1408 lifecycle.push(lifecycle_event(binding, Some(previous), next));
1409 }
1410}
1411
1412fn lifecycle_event(
1413 binding: &TriggerBinding,
1414 from_state: Option<TriggerState>,
1415 to_state: TriggerState,
1416) -> LogEvent {
1417 LogEvent::new(
1418 "state_transition",
1419 serde_json::json!({
1420 "id": binding.id.as_str(),
1421 "binding_key": binding.binding_key(),
1422 "version": binding.version,
1423 "provider": binding.provider.as_str(),
1424 "kind": &binding.kind,
1425 "source": binding.source.as_str(),
1426 "handler_kind": binding.handler.kind(),
1427 "definition_fingerprint": &binding.definition_fingerprint,
1428 "from_state": from_state.map(TriggerState::as_str),
1429 "to_state": to_state.as_str(),
1430 }),
1431 )
1432}
1433
1434async fn append_lifecycle_events(
1435 event_log: Option<Arc<AnyEventLog>>,
1436 events: Vec<LogEvent>,
1437) -> Result<(), TriggerRegistryError> {
1438 let Some(event_log) = event_log else {
1439 return Ok(());
1440 };
1441 if events.is_empty() {
1442 return Ok(());
1443 }
1444
1445 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1446 .expect("static triggers.lifecycle topic should always be valid");
1447 for event in events {
1448 event_log
1449 .append(&topic, event)
1450 .await
1451 .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1452 }
1453 Ok(())
1454}
1455
1456fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1457 configured_default_chain(default_secret_namespace())
1458 .ok()
1459 .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1460}
1461
/// Resolves the namespace used to scope secrets for this workspace.
///
/// Resolution order:
/// 1. `HARN_SECRET_NAMESPACE`, with surrounding whitespace removed, when it is
///    set, valid Unicode, and not blank.
/// 2. `harn/<leaf>`, where `<leaf>` is the last component of the current
///    working directory, or `workspace` when that cannot be determined.
fn default_secret_namespace() -> String {
    if let Ok(namespace) = std::env::var("HARN_SECRET_NAMESPACE") {
        let namespace = namespace.trim();
        if !namespace.is_empty() {
            // Return the trimmed value. The blankness check already treats
            // surrounding whitespace as insignificant, so keeping it would make
            // `" prod "` and `"prod"` address different secret namespaces.
            return namespace.to_string();
        }
    }

    let cwd = std::env::current_dir().unwrap_or_default();
    let leaf = cwd
        .file_name()
        .and_then(|name| name.to_str())
        .filter(|name| !name.is_empty())
        .unwrap_or("workspace");
    format!("harn/{leaf}")
}
1477
1478fn now_ms() -> i64 {
1479 clock::now_ms()
1480}
1481
#[cfg(test)]
mod tests {
    //! Tests for the trigger registry.
    //!
    //! Covered here: manifest and dynamic registration, draining with
    //! in-flight events, hot reload, terminated-version GC, lifecycle events
    //! written to the event log, and historical version lookups backed by
    //! that log.
    //!
    //! Each test clears the registry (and, where used, the active event log)
    //! before and after running.

    use super::*;
    use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
    use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
    use std::rc::Rc;
    use time::OffsetDateTime;

    /// Builds a manifest-sourced webhook spec for `id`.
    ///
    /// `fingerprint` is the definition fingerprint. Changing it simulates an
    /// edited manifest entry, which gets a new version.
    fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
        TriggerBindingSpec {
            id: id.to_string(),
            source: TriggerBindingSource::Manifest,
            kind: "webhook".to_string(),
            provider: ProviderId::from("github"),
            autonomy_tier: crate::AutonomyTier::ActAuto,
            handler: TriggerHandlerSpec::Worker {
                queue: format!("{id}-queue"),
            },
            dispatch_priority: crate::WorkerQueuePriority::Normal,
            when: None,
            when_budget: None,
            retry: TriggerRetryConfig::default(),
            match_events: vec!["issues.opened".to_string()],
            dedupe_key: Some("event.dedupe_key".to_string()),
            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
            filter: Some("event.kind".to_string()),
            daily_cost_usd: Some(5.0),
            hourly_cost_usd: None,
            max_autonomous_decisions_per_hour: None,
            max_autonomous_decisions_per_day: None,
            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
            max_concurrent: Some(10),
            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
            manifest_path: None,
            package_name: Some("workspace".to_string()),
            definition_fingerprint: fingerprint.to_string(),
        }
    }

    /// Builds a minimal dynamically-registered webhook spec for `id`.
    ///
    /// Its fingerprint is derived from the id (`dynamic:<id>`).
    fn dynamic_spec(id: &str) -> TriggerBindingSpec {
        TriggerBindingSpec {
            id: id.to_string(),
            source: TriggerBindingSource::Dynamic,
            kind: "webhook".to_string(),
            provider: ProviderId::from("github"),
            autonomy_tier: crate::AutonomyTier::ActAuto,
            handler: TriggerHandlerSpec::Worker {
                queue: format!("{id}-queue"),
            },
            dispatch_priority: crate::WorkerQueuePriority::Normal,
            when: None,
            when_budget: None,
            retry: TriggerRetryConfig::default(),
            match_events: vec!["issues.opened".to_string()],
            dedupe_key: None,
            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
            filter: None,
            daily_cost_usd: None,
            hourly_cost_usd: None,
            max_autonomous_decisions_per_hour: None,
            max_autonomous_decisions_per_day: None,
            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
            max_concurrent: None,
            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
            manifest_path: None,
            package_name: None,
            definition_fingerprint: format!("dynamic:{id}"),
        }
    }

    /// A freshly installed manifest trigger becomes version 1, `Active`, with
    /// all metrics at their defaults.
    #[tokio::test(flavor = "current_thread")]
    async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("manifest trigger installs");

        let snapshots = snapshot_trigger_bindings();
        assert_eq!(snapshots.len(), 1);
        let binding = &snapshots[0];
        assert_eq!(binding.id, "github-new-issue");
        assert_eq!(binding.version, 1);
        assert_eq!(binding.state, TriggerState::Active);
        assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());

        clear_trigger_registry();
    }

    /// Distinct dynamic ids get distinct `TriggerId`s. Registering an id that
    /// is already live fails with `DuplicateId`.
    #[tokio::test(flavor = "current_thread")]
    async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
        clear_trigger_registry();

        let first = dynamic_register(dynamic_spec("dynamic-a"))
            .await
            .expect("first dynamic trigger");
        let second = dynamic_register(dynamic_spec("dynamic-b"))
            .await
            .expect("second dynamic trigger");
        assert_ne!(first, second);

        let error = dynamic_register(dynamic_spec("dynamic-a"))
            .await
            .expect_err("duplicate id should fail");
        assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));

        clear_trigger_registry();
    }

    /// Draining a binding with one in-flight event leaves it `Draining`. It
    /// only becomes `Terminated` once that event finishes.
    #[tokio::test(flavor = "current_thread")]
    async fn drain_waits_for_in_flight_events_before_terminating() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("manifest trigger installs");
        begin_in_flight("github-new-issue", 1).expect("start in-flight event");

        drain("github-new-issue").await.expect("drain succeeds");
        let binding = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("binding snapshot");
        assert_eq!(binding.state, TriggerState::Draining);
        assert_eq!(binding.metrics.in_flight, 1);

        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish in-flight event");
        let binding = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("binding snapshot");
        assert_eq!(binding.state, TriggerState::Terminated);
        assert_eq!(binding.metrics.in_flight, 0);

        clear_trigger_registry();
    }

    /// Reinstalling with a changed fingerprint registers version 2 as `Active`.
    /// Version 1, which has an event in flight, is `Draining` and terminates
    /// when that event finishes.
    #[tokio::test(flavor = "current_thread")]
    async fn hot_reload_registers_new_version_while_old_binding_drains() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("initial manifest trigger installs");
        begin_in_flight("github-new-issue", 1).expect("start in-flight event");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("updated manifest trigger installs");

        let snapshots = snapshot_trigger_bindings();
        assert_eq!(snapshots.len(), 2);
        let old = snapshots
            .iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("old binding");
        let new = snapshots
            .iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
            .expect("new binding");
        assert_eq!(old.state, TriggerState::Draining);
        assert_eq!(new.state, TriggerState::Active);

        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish old in-flight event");
        let old = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("old binding");
        assert_eq!(old.state, TriggerState::Terminated);

        clear_trigger_registry();
    }

    /// After three reloads, only versions 2 and 3 remain in the snapshot, so
    /// the terminated version 1 has been garbage collected.
    /// NOTE(review): the expected `[2, 3]` presumably reflects
    /// `TERMINATED_VERSION_RETENTION_LIMIT` (not visible here). Confirm when
    /// changing that constant.
    #[tokio::test(flavor = "current_thread")]
    async fn gc_drops_terminated_versions_beyond_retention_limit() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("install v1");
        begin_in_flight("github-new-issue", 1).expect("pin v1");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("install v2");
        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish v1");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
            .await
            .expect("install v3");

        let snapshots = snapshot_trigger_bindings();
        let versions: Vec<u32> = snapshots
            .into_iter()
            .filter(|binding| binding.id == "github-new-issue")
            .map(|binding| binding.version)
            .collect();
        assert_eq!(versions, vec![2, 3]);

        clear_trigger_registry();
    }

    /// A full install -> drain -> finish cycle writes exactly the
    /// registering, active, draining and terminated transitions, in order,
    /// to the `triggers.lifecycle` topic.
    #[tokio::test(flavor = "current_thread")]
    async fn lifecycle_transitions_append_to_event_log() {
        clear_trigger_registry();
        reset_active_event_log();
        let tempdir = tempfile::tempdir().expect("tempdir");
        let log = install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("manifest trigger installs");
        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
        drain("github-new-issue").await.expect("drain succeeds");
        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish event");

        let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
        let events = log
            .read_range(&topic, None, 32)
            .await
            .expect("read lifecycle events");
        let states: Vec<String> = events
            .into_iter()
            .filter_map(|(_, event)| {
                event
                    .payload
                    .get("to_state")
                    .and_then(|value| value.as_str())
                    .map(|value| value.to_string())
            })
            .collect();
        assert_eq!(
            states,
            vec![
                "registering".to_string(),
                "active".to_string(),
                "draining".to_string(),
                "terminated".to_string(),
            ]
        );

        reset_active_event_log();
        clear_trigger_registry();
    }

    /// After the in-memory registry is cleared and the same on-disk event log
    /// is reopened (simulating a restart), reinstalling fingerprint `v2` reuses
    /// version 2 from lifecycle history instead of allocating a new number.
    #[tokio::test(flavor = "current_thread")]
    async fn version_history_reuses_historical_version_after_restart() {
        clear_trigger_registry();
        reset_active_event_log();
        let tempdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("initial manifest trigger installs");
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("updated manifest trigger installs");

        // Simulate a restart: drop in-memory state, keep the on-disk log.
        clear_trigger_registry();
        reset_active_event_log();
        install_default_for_base_dir(tempdir.path()).expect("reopen event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("manifest reload reuses historical version");

        let binding = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue")
            .expect("binding snapshot");
        assert_eq!(binding.version, 2);

        reset_active_event_log();
        clear_trigger_registry();
    }

    /// `binding_version_as_of` reports version 1 for a time before the reload
    /// and version 2 for a time after it.
    #[tokio::test(flavor = "current_thread")]
    async fn binding_version_as_of_reports_historical_active_version() {
        clear_trigger_registry();
        reset_active_event_log();
        let tempdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("initial manifest trigger installs");
        let before_reload = OffsetDateTime::now_utc();
        // Intended to give the reload's lifecycle events a later millisecond
        // timestamp than `before_reload`. NOTE(review): this blocks the runtime
        // thread, which is tolerable on a current_thread test runtime.
        std::thread::sleep(std::time::Duration::from_millis(10));

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("updated manifest trigger installs");
        let after_reload = OffsetDateTime::now_utc();

        assert_eq!(
            binding_version_as_of("github-new-issue", before_reload)
                .expect("version before reload"),
            1
        );
        assert_eq!(
            binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
            2
        );

        reset_active_event_log();
        clear_trigger_registry();
    }

    /// Resolving a recorded binding whose version (1) has been garbage
    /// collected falls back to the version active at `received_at` (3). It
    /// also emits a `replay.binding_version_gc_fallback` warning with the
    /// trigger id, recorded version, RFC 3339 `received_at`, and resolved
    /// version as metadata.
    #[tokio::test(flavor = "current_thread")]
    async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
        clear_trigger_registry();
        reset_active_event_log();
        let sink = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(sink.clone());
        let tempdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("install v1");
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("install v2");
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
            .await
            .expect("install v3");
        let received_at = OffsetDateTime::now_utc();
        // Intended to timestamp the v4 install after `received_at`, so v3 is the
        // version active at that instant.
        std::thread::sleep(std::time::Duration::from_millis(10));
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
            .await
            .expect("install v4");

        let binding = resolve_live_or_as_of(
            "github-new-issue",
            RecordedTriggerBinding {
                version: 1,
                received_at,
            },
        )
        .expect("resolve fallback binding");
        assert_eq!(binding.version, 3);

        let warning = sink
            .logs
            .borrow()
            .iter()
            .find(|log| log.category == "replay.binding_version_gc_fallback")
            .cloned()
            .expect("gc fallback warning");
        assert_eq!(warning.level, EventLevel::Warn);
        assert_eq!(
            warning.metadata.get("trigger_id"),
            Some(&serde_json::json!("github-new-issue"))
        );
        assert_eq!(
            warning.metadata.get("recorded_version"),
            Some(&serde_json::json!(1))
        );
        assert_eq!(
            warning.metadata.get("received_at"),
            Some(&serde_json::json!(received_at
                .format(&time::format_description::well_known::Rfc3339)
                .unwrap_or_else(|_| received_at.to_string())))
        );
        assert_eq!(
            warning.metadata.get("resolved_version"),
            Some(&serde_json::json!(3))
        );

        clear_event_sinks();
        crate::events::reset_event_sinks();
        reset_active_event_log();
        clear_trigger_registry();
    }
}