1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use time::OffsetDateTime;
10use uuid::Uuid;
11
12use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
13use crate::llm::trigger_predicate::TriggerPredicateBudget;
14use crate::secrets::{configured_default_chain, SecretProvider};
15use crate::triggers::test_util::clock;
16use crate::trust_graph::AutonomyTier;
17use crate::value::VmClosure;
18
19use super::aggregation::TriggerAggregationConfig;
20use super::dispatcher::TriggerRetryConfig;
21use super::flow_control::TriggerFlowControlConfig;
22use super::ProviderId;
23
24#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
25pub struct TriggerId(String);
26
27impl TriggerId {
28 pub fn new(value: impl Into<String>) -> Self {
29 Self(value.into())
30 }
31
32 pub fn as_str(&self) -> &str {
33 &self.0
34 }
35}
36
37impl std::fmt::Display for TriggerId {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 self.0.fmt(f)
40 }
41}
42
43#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(rename_all = "snake_case")]
45pub enum TriggerState {
46 Registering,
47 Active,
48 Paused,
49 Draining,
50 Terminated,
51}
52
53impl TriggerState {
54 pub fn as_str(self) -> &'static str {
55 match self {
56 Self::Registering => "registering",
57 Self::Active => "active",
58 Self::Paused => "paused",
59 Self::Draining => "draining",
60 Self::Terminated => "terminated",
61 }
62 }
63}
64
65#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67pub enum TriggerBindingSource {
68 Manifest,
69 Dynamic,
70}
71
72impl TriggerBindingSource {
73 pub fn as_str(self) -> &'static str {
74 match self {
75 Self::Manifest => "manifest",
76 Self::Dynamic => "dynamic",
77 }
78 }
79}
80
81#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
82#[serde(rename_all = "snake_case")]
83pub enum TriggerBudgetExhaustionStrategy {
84 #[default]
85 False,
86 RetryLater,
87 Fail,
88 Warn,
89}
90
91impl TriggerBudgetExhaustionStrategy {
92 pub fn as_str(self) -> &'static str {
93 match self {
94 Self::False => "false",
95 Self::RetryLater => "retry_later",
96 Self::Fail => "fail",
97 Self::Warn => "warn",
98 }
99 }
100}
101
102#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
103pub struct OrchestratorBudgetConfig {
104 pub daily_cost_usd: Option<f64>,
105 pub hourly_cost_usd: Option<f64>,
106}
107
108#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
109pub struct OrchestratorBudgetSnapshot {
110 pub daily_cost_usd: Option<f64>,
111 pub hourly_cost_usd: Option<f64>,
112 pub cost_today_usd_micros: u64,
113 pub cost_hour_usd_micros: u64,
114 pub day_utc: i32,
115 pub hour_utc: i64,
116}
117
118#[derive(Debug)]
119struct OrchestratorBudgetState {
120 config: OrchestratorBudgetConfig,
121 day_utc: i32,
122 hour_utc: i64,
123 cost_today_usd_micros: u64,
124 cost_hour_usd_micros: u64,
125}
126
127impl Default for OrchestratorBudgetState {
128 fn default() -> Self {
129 Self {
130 config: OrchestratorBudgetConfig::default(),
131 day_utc: utc_day_key(),
132 hour_utc: utc_hour_key(),
133 cost_today_usd_micros: 0,
134 cost_hour_usd_micros: 0,
135 }
136 }
137}
138
139#[derive(Clone)]
140pub enum TriggerHandlerSpec {
141 Local {
142 raw: String,
143 closure: Rc<VmClosure>,
144 },
145 A2a {
146 target: String,
147 allow_cleartext: bool,
148 },
149 Worker {
150 queue: String,
151 },
152 Persona {
153 binding: crate::PersonaRuntimeBinding,
154 },
155 AutoResume {
156 worker_id: String,
157 },
158 SpawnToPool {
163 pool: String,
164 priority_from: Option<String>,
165 key_from: Option<String>,
166 task_factory: Rc<VmClosure>,
167 },
168 ReminderInject {
177 target: TargetExpr,
178 body: String,
179 tags: Vec<String>,
180 ttl_turns: Option<i64>,
181 dedupe_key: Option<String>,
182 propagate: crate::llm::helpers::ReminderPropagate,
183 role_hint: crate::llm::helpers::ReminderRoleHint,
184 preserve_on_compact: bool,
185 },
186 InterruptAndSuspend {
199 target_agents: AgentScope,
200 reason: String,
201 },
202}
203
204#[derive(Clone)]
212pub enum AgentScope {
213 All,
214 Concrete(Vec<String>),
215 Closure(Rc<VmClosure>),
216}
217
218impl AgentScope {
219 pub fn kind(&self) -> &'static str {
220 match self {
221 Self::All => "all",
222 Self::Concrete(_) => "concrete",
223 Self::Closure(_) => "closure",
224 }
225 }
226}
227
228impl std::fmt::Debug for AgentScope {
229 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230 match self {
231 Self::All => f.write_str("All"),
232 Self::Concrete(ids) => f.debug_tuple("Concrete").field(ids).finish(),
233 Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
234 }
235 }
236}
237
238#[derive(Clone)]
245pub enum TargetExpr {
246 Current,
247 Parent,
248 Concrete(String),
249 Closure(Rc<VmClosure>),
250}
251
252impl TargetExpr {
253 pub fn kind(&self) -> &'static str {
254 match self {
255 Self::Current => "current",
256 Self::Parent => "parent",
257 Self::Concrete(_) => "concrete",
258 Self::Closure(_) => "closure",
259 }
260 }
261}
262
263impl std::fmt::Debug for TargetExpr {
264 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265 match self {
266 Self::Current => f.write_str("Current"),
267 Self::Parent => f.write_str("Parent"),
268 Self::Concrete(id) => f.debug_tuple("Concrete").field(id).finish(),
269 Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
270 }
271 }
272}
273
274impl std::fmt::Debug for TriggerHandlerSpec {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 match self {
277 Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
278 Self::A2a {
279 target,
280 allow_cleartext,
281 } => f
282 .debug_struct("A2a")
283 .field("target", target)
284 .field("allow_cleartext", allow_cleartext)
285 .finish(),
286 Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
287 Self::Persona { binding } => f
288 .debug_struct("Persona")
289 .field("name", &binding.name)
290 .finish(),
291 Self::AutoResume { worker_id } => f
292 .debug_struct("AutoResume")
293 .field("worker_id", worker_id)
294 .finish(),
295 Self::SpawnToPool {
296 pool,
297 priority_from,
298 key_from,
299 ..
300 } => f
301 .debug_struct("SpawnToPool")
302 .field("pool", pool)
303 .field("priority_from", priority_from)
304 .field("key_from", key_from)
305 .finish(),
306 Self::ReminderInject {
307 target,
308 body,
309 tags,
310 ttl_turns,
311 dedupe_key,
312 propagate,
313 role_hint,
314 preserve_on_compact,
315 } => f
316 .debug_struct("ReminderInject")
317 .field("target", target)
318 .field("body", body)
319 .field("tags", tags)
320 .field("ttl_turns", ttl_turns)
321 .field("dedupe_key", dedupe_key)
322 .field("propagate", propagate)
323 .field("role_hint", role_hint)
324 .field("preserve_on_compact", preserve_on_compact)
325 .finish(),
326 Self::InterruptAndSuspend {
327 target_agents,
328 reason,
329 } => f
330 .debug_struct("InterruptAndSuspend")
331 .field("target_agents", target_agents)
332 .field("reason", reason)
333 .finish(),
334 }
335 }
336}
337
338impl TriggerHandlerSpec {
339 pub fn kind(&self) -> &'static str {
340 match self {
341 Self::Local { .. } => "local",
342 Self::A2a { .. } => "a2a",
343 Self::Worker { .. } => "worker",
344 Self::Persona { .. } => "persona",
345 Self::AutoResume { .. } => "auto_resume",
346 Self::SpawnToPool { .. } => "spawn_to_pool",
347 Self::ReminderInject { .. } => "reminder_inject",
348 Self::InterruptAndSuspend { .. } => "interrupt_and_suspend",
349 }
350 }
351}
352
353#[derive(Clone)]
354pub struct TriggerPredicateSpec {
355 pub raw: String,
356 pub closure: Rc<VmClosure>,
357}
358
359impl std::fmt::Debug for TriggerPredicateSpec {
360 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361 f.debug_struct("TriggerPredicateSpec")
362 .field("raw", &self.raw)
363 .finish()
364 }
365}
366
367#[derive(Clone, Debug)]
368pub struct TriggerBindingSpec {
369 pub id: String,
370 pub source: TriggerBindingSource,
371 pub kind: String,
372 pub provider: ProviderId,
373 pub autonomy_tier: AutonomyTier,
374 pub handler: TriggerHandlerSpec,
375 pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
376 pub when: Option<TriggerPredicateSpec>,
377 pub when_budget: Option<TriggerPredicateBudget>,
378 pub retry: TriggerRetryConfig,
379 pub match_events: Vec<String>,
380 pub dedupe_key: Option<String>,
381 pub dedupe_retention_days: u32,
382 pub filter: Option<String>,
383 pub daily_cost_usd: Option<f64>,
384 pub hourly_cost_usd: Option<f64>,
385 pub max_autonomous_decisions_per_hour: Option<u64>,
386 pub max_autonomous_decisions_per_day: Option<u64>,
387 pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
388 pub max_concurrent: Option<u32>,
389 pub flow_control: TriggerFlowControlConfig,
390 pub aggregation: Option<TriggerAggregationConfig>,
395 pub manifest_path: Option<PathBuf>,
396 pub package_name: Option<String>,
397 pub definition_fingerprint: String,
398}
399
400#[derive(Debug)]
401pub struct TriggerMetrics {
402 pub received: AtomicU64,
403 pub dispatched: AtomicU64,
404 pub failed: AtomicU64,
405 pub dlq: AtomicU64,
406 pub last_received_ms: Mutex<Option<i64>>,
407 pub cost_total_usd_micros: AtomicU64,
408 pub cost_today_usd_micros: AtomicU64,
409 pub cost_hour_usd_micros: AtomicU64,
410 pub autonomous_decisions_total: AtomicU64,
411 pub autonomous_decisions_today: AtomicU64,
412 pub autonomous_decisions_hour: AtomicU64,
413}
414
415impl Default for TriggerMetrics {
416 fn default() -> Self {
417 Self {
418 received: AtomicU64::new(0),
419 dispatched: AtomicU64::new(0),
420 failed: AtomicU64::new(0),
421 dlq: AtomicU64::new(0),
422 last_received_ms: Mutex::new(None),
423 cost_total_usd_micros: AtomicU64::new(0),
424 cost_today_usd_micros: AtomicU64::new(0),
425 cost_hour_usd_micros: AtomicU64::new(0),
426 autonomous_decisions_total: AtomicU64::new(0),
427 autonomous_decisions_today: AtomicU64::new(0),
428 autonomous_decisions_hour: AtomicU64::new(0),
429 }
430 }
431}
432
433#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
434pub struct TriggerMetricsSnapshot {
435 pub received: u64,
436 pub dispatched: u64,
437 pub failed: u64,
438 pub dlq: u64,
439 pub in_flight: u64,
440 pub last_received_ms: Option<i64>,
441 pub cost_total_usd_micros: u64,
442 pub cost_today_usd_micros: u64,
443 pub cost_hour_usd_micros: u64,
444 pub autonomous_decisions_total: u64,
445 pub autonomous_decisions_today: u64,
446 pub autonomous_decisions_hour: u64,
447}
448
449pub struct TriggerBinding {
450 pub id: TriggerId,
451 pub version: u32,
452 pub source: TriggerBindingSource,
453 pub kind: String,
454 pub provider: ProviderId,
455 pub autonomy_tier: AutonomyTier,
456 pub handler: TriggerHandlerSpec,
457 pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
458 pub when: Option<TriggerPredicateSpec>,
459 pub when_budget: Option<TriggerPredicateBudget>,
460 pub retry: TriggerRetryConfig,
461 pub match_events: Vec<String>,
462 pub dedupe_key: Option<String>,
463 pub dedupe_retention_days: u32,
464 pub filter: Option<String>,
465 pub daily_cost_usd: Option<f64>,
466 pub hourly_cost_usd: Option<f64>,
467 pub max_autonomous_decisions_per_hour: Option<u64>,
468 pub max_autonomous_decisions_per_day: Option<u64>,
469 pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
470 pub max_concurrent: Option<u32>,
471 pub flow_control: TriggerFlowControlConfig,
472 pub aggregation: Option<TriggerAggregationConfig>,
476 pub manifest_path: Option<PathBuf>,
477 pub package_name: Option<String>,
478 pub definition_fingerprint: String,
479 pub state: Mutex<TriggerState>,
480 pub metrics: TriggerMetrics,
481 pub in_flight: AtomicU64,
482 pub cancel_token: Arc<AtomicBool>,
483 pub predicate_state: Mutex<TriggerPredicateState>,
484}
485
486#[derive(Clone, Debug, Default)]
487pub struct TriggerPredicateState {
488 pub budget_day_utc: Option<i32>,
489 pub budget_hour_utc: Option<i64>,
490 pub consecutive_failures: u32,
491 pub breaker_open_until_ms: Option<i64>,
492 pub recent_cost_usd_micros: VecDeque<u64>,
493}
494
495impl std::fmt::Debug for TriggerBinding {
496 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497 f.debug_struct("TriggerBinding")
498 .field("id", &self.id)
499 .field("version", &self.version)
500 .field("source", &self.source)
501 .field("kind", &self.kind)
502 .field("provider", &self.provider)
503 .field("handler_kind", &self.handler.kind())
504 .field("state", &self.state_snapshot())
505 .finish()
506 }
507}
508
509impl TriggerBinding {
510 pub fn snapshot(&self) -> TriggerBindingSnapshot {
511 TriggerBindingSnapshot {
512 id: self.id.as_str().to_string(),
513 version: self.version,
514 source: self.source,
515 kind: self.kind.clone(),
516 provider: self.provider.as_str().to_string(),
517 autonomy_tier: self.autonomy_tier,
518 handler_kind: self.handler.kind().to_string(),
519 state: self.state_snapshot(),
520 metrics: self.metrics_snapshot(),
521 daily_cost_usd: self.daily_cost_usd,
522 hourly_cost_usd: self.hourly_cost_usd,
523 max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
524 max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
525 on_budget_exhausted: self.on_budget_exhausted,
526 }
527 }
528
529 fn new(spec: TriggerBindingSpec, version: u32) -> Self {
530 Self {
531 id: TriggerId::new(spec.id),
532 version,
533 source: spec.source,
534 kind: spec.kind,
535 provider: spec.provider,
536 autonomy_tier: spec.autonomy_tier,
537 handler: spec.handler,
538 dispatch_priority: spec.dispatch_priority,
539 when: spec.when,
540 when_budget: spec.when_budget,
541 retry: spec.retry,
542 match_events: spec.match_events,
543 dedupe_key: spec.dedupe_key,
544 dedupe_retention_days: spec.dedupe_retention_days,
545 filter: spec.filter,
546 daily_cost_usd: spec.daily_cost_usd,
547 hourly_cost_usd: spec.hourly_cost_usd,
548 max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
549 max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
550 on_budget_exhausted: spec.on_budget_exhausted,
551 max_concurrent: spec.max_concurrent,
552 flow_control: spec.flow_control,
553 aggregation: spec.aggregation,
554 manifest_path: spec.manifest_path,
555 package_name: spec.package_name,
556 definition_fingerprint: spec.definition_fingerprint,
557 state: Mutex::new(TriggerState::Registering),
558 metrics: TriggerMetrics::default(),
559 in_flight: AtomicU64::new(0),
560 cancel_token: Arc::new(AtomicBool::new(false)),
561 predicate_state: Mutex::new(TriggerPredicateState::default()),
562 }
563 }
564
565 pub fn binding_key(&self) -> String {
566 format!("{}@v{}", self.id.as_str(), self.version)
567 }
568
569 pub fn state_snapshot(&self) -> TriggerState {
570 *self.state.lock().expect("trigger state poisoned")
571 }
572
573 pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
574 TriggerMetricsSnapshot {
575 received: self.metrics.received.load(Ordering::Relaxed),
576 dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
577 failed: self.metrics.failed.load(Ordering::Relaxed),
578 dlq: self.metrics.dlq.load(Ordering::Relaxed),
579 in_flight: self.in_flight.load(Ordering::Relaxed),
580 last_received_ms: *self
581 .metrics
582 .last_received_ms
583 .lock()
584 .expect("trigger metrics poisoned"),
585 cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
586 cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
587 cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
588 autonomous_decisions_total: self
589 .metrics
590 .autonomous_decisions_total
591 .load(Ordering::Relaxed),
592 autonomous_decisions_today: self
593 .metrics
594 .autonomous_decisions_today
595 .load(Ordering::Relaxed),
596 autonomous_decisions_hour: self
597 .metrics
598 .autonomous_decisions_hour
599 .load(Ordering::Relaxed),
600 }
601 }
602}
603
604#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
605pub struct TriggerBindingSnapshot {
606 pub id: String,
607 pub version: u32,
608 pub source: TriggerBindingSource,
609 pub kind: String,
610 pub provider: String,
611 pub autonomy_tier: AutonomyTier,
612 pub handler_kind: String,
613 pub state: TriggerState,
614 pub metrics: TriggerMetricsSnapshot,
615 pub daily_cost_usd: Option<f64>,
616 pub hourly_cost_usd: Option<f64>,
617 pub max_autonomous_decisions_per_hour: Option<u64>,
618 pub max_autonomous_decisions_per_day: Option<u64>,
619 pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
620}
621
622#[derive(Clone, Copy, Debug, PartialEq, Eq)]
623pub enum TriggerDispatchOutcome {
624 Dispatched,
625 Failed,
626 Dlq,
627}
628
629#[derive(Debug)]
630pub enum TriggerRegistryError {
631 DuplicateId(String),
632 InvalidSpec(String),
633 UnknownId(String),
634 UnknownBindingVersion { id: String, version: u32 },
635 EventLog(String),
636}
637
638impl std::fmt::Display for TriggerRegistryError {
639 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
640 match self {
641 Self::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
642 Self::InvalidSpec(message) | Self::EventLog(message) => f.write_str(message),
643 Self::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
644 Self::UnknownBindingVersion { id, version } => {
645 write!(f, "unknown trigger binding '{id}' version {version}")
646 }
647 }
648 }
649}
650
651impl std::error::Error for TriggerRegistryError {}
652
653#[derive(Default)]
654pub struct TriggerRegistry {
655 bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
656 by_provider: BTreeMap<String, BTreeSet<String>>,
657 event_log: Option<Arc<AnyEventLog>>,
658 secret_provider: Option<Arc<dyn SecretProvider>>,
659}
660
661thread_local! {
662 static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
663}
664
665thread_local! {
666 static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
667 RefCell::new(OrchestratorBudgetState::default());
668}
669
670const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;
671
672const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
673const PREDICATE_COST_WINDOW: usize = 100;
674
675#[derive(Clone, Debug, Deserialize)]
676struct LifecycleStateTransitionRecord {
677 id: String,
678 version: u32,
679 #[serde(default)]
680 definition_fingerprint: Option<String>,
681 to_state: TriggerState,
682}
683
684#[derive(Clone, Debug)]
685struct HistoricalLifecycleRecord {
686 occurred_at_ms: i64,
687 transition: LifecycleStateTransitionRecord,
688}
689
690#[derive(Clone, Copy, Debug, PartialEq, Eq)]
691pub struct RecordedTriggerBinding {
692 pub version: u32,
693 pub received_at: OffsetDateTime,
694}
695
696#[derive(Clone, Copy, Debug, Default)]
697struct HistoricalVersionLookup {
698 matching_version: Option<u32>,
699 max_version: Option<u32>,
700}
701
702pub fn clear_trigger_registry() {
703 TRIGGER_REGISTRY.with(|slot| {
704 *slot.borrow_mut() = TriggerRegistry::default();
705 });
706 clear_orchestrator_budget();
707 super::aggregation::clear_aggregation_state();
708}
709
710pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
711 ORCHESTRATOR_BUDGET.with(|slot| {
712 let mut state = slot.borrow_mut();
713 rollover_orchestrator_budget(&mut state);
714 state.config = config;
715 });
716}
717
718pub fn clear_orchestrator_budget() {
719 ORCHESTRATOR_BUDGET.with(|slot| {
720 *slot.borrow_mut() = OrchestratorBudgetState::default();
721 });
722}
723
724pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
725 ORCHESTRATOR_BUDGET.with(|slot| {
726 let mut state = slot.borrow_mut();
727 rollover_orchestrator_budget(&mut state);
728 OrchestratorBudgetSnapshot {
729 daily_cost_usd: state.config.daily_cost_usd,
730 hourly_cost_usd: state.config.hourly_cost_usd,
731 cost_today_usd_micros: state.cost_today_usd_micros,
732 cost_hour_usd_micros: state.cost_hour_usd_micros,
733 day_utc: state.day_utc,
734 hour_utc: state.hour_utc,
735 }
736 })
737}
738
739pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
740 if cost_usd_micros == 0 {
741 return;
742 }
743 ORCHESTRATOR_BUDGET.with(|slot| {
744 let mut state = slot.borrow_mut();
745 rollover_orchestrator_budget(&mut state);
746 state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
747 state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
748 });
749}
750
751pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
752 ORCHESTRATOR_BUDGET.with(|slot| {
753 let mut state = slot.borrow_mut();
754 rollover_orchestrator_budget(&mut state);
755 if state.config.hourly_cost_usd.is_some_and(|limit| {
756 micros_to_usd(
757 state
758 .cost_hour_usd_micros
759 .saturating_add(expected_cost_usd_micros),
760 ) > limit
761 }) {
762 return Some("orchestrator_hourly_budget_exceeded");
763 }
764 if state.config.daily_cost_usd.is_some_and(|limit| {
765 micros_to_usd(
766 state
767 .cost_today_usd_micros
768 .saturating_add(expected_cost_usd_micros),
769 ) > limit
770 }) {
771 return Some("orchestrator_daily_budget_exceeded");
772 }
773 None
774 })
775}
776
777pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
778 let today = utc_day_key();
779 let hour = utc_hour_key();
780 let mut state = binding
781 .predicate_state
782 .lock()
783 .expect("trigger predicate state poisoned");
784 if state.budget_day_utc != Some(today) {
785 state.budget_day_utc = Some(today);
786 binding
787 .metrics
788 .cost_today_usd_micros
789 .store(0, Ordering::Relaxed);
790 binding
791 .metrics
792 .autonomous_decisions_today
793 .store(0, Ordering::Relaxed);
794 }
795 if state.budget_hour_utc != Some(hour) {
796 state.budget_hour_utc = Some(hour);
797 binding
798 .metrics
799 .cost_hour_usd_micros
800 .store(0, Ordering::Relaxed);
801 binding
802 .metrics
803 .autonomous_decisions_hour
804 .store(0, Ordering::Relaxed);
805 }
806}
807
808pub fn binding_budget_would_exceed(
809 binding: &TriggerBinding,
810 expected_cost_usd_micros: u64,
811) -> Option<&'static str> {
812 reset_binding_budget_windows(binding);
813 if binding.hourly_cost_usd.is_some_and(|limit| {
814 micros_to_usd(
815 binding
816 .metrics
817 .cost_hour_usd_micros
818 .load(Ordering::Relaxed)
819 .saturating_add(expected_cost_usd_micros),
820 ) > limit
821 }) {
822 return Some("hourly_budget_exceeded");
823 }
824 if binding.daily_cost_usd.is_some_and(|limit| {
825 micros_to_usd(
826 binding
827 .metrics
828 .cost_today_usd_micros
829 .load(Ordering::Relaxed)
830 .saturating_add(expected_cost_usd_micros),
831 ) > limit
832 }) {
833 return Some("daily_budget_exceeded");
834 }
835 None
836}
837
838pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
839 reset_binding_budget_windows(binding);
840 if binding
841 .max_autonomous_decisions_per_hour
842 .is_some_and(|limit| {
843 binding
844 .metrics
845 .autonomous_decisions_hour
846 .load(Ordering::Relaxed)
847 .saturating_add(1)
848 > limit
849 })
850 {
851 return Some("hourly_autonomy_budget_exceeded");
852 }
853 if binding
854 .max_autonomous_decisions_per_day
855 .is_some_and(|limit| {
856 binding
857 .metrics
858 .autonomous_decisions_today
859 .load(Ordering::Relaxed)
860 .saturating_add(1)
861 > limit
862 })
863 {
864 return Some("daily_autonomy_budget_exceeded");
865 }
866 None
867}
868
869pub fn note_autonomous_decision(binding: &TriggerBinding) {
870 reset_binding_budget_windows(binding);
871 binding
872 .metrics
873 .autonomous_decisions_total
874 .fetch_add(1, Ordering::Relaxed);
875 binding
876 .metrics
877 .autonomous_decisions_today
878 .fetch_add(1, Ordering::Relaxed);
879 binding
880 .metrics
881 .autonomous_decisions_hour
882 .fetch_add(1, Ordering::Relaxed);
883}
884
885pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
886 let state = binding
887 .predicate_state
888 .lock()
889 .expect("trigger predicate state poisoned");
890 if !state.recent_cost_usd_micros.is_empty() {
891 let total: u64 = state.recent_cost_usd_micros.iter().copied().sum();
892 return total / state.recent_cost_usd_micros.len() as u64;
893 }
894 binding
895 .when_budget
896 .as_ref()
897 .and_then(|budget| budget.max_cost_usd)
898 .map(usd_to_micros)
899 .unwrap_or_default()
900}
901
902pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
903 let mut state = binding
904 .predicate_state
905 .lock()
906 .expect("trigger predicate state poisoned");
907 state.recent_cost_usd_micros.push_back(cost_usd_micros);
908 while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
909 state.recent_cost_usd_micros.pop_front();
910 }
911}
912
913pub fn usd_to_micros(value: f64) -> u64 {
914 if !value.is_finite() || value <= 0.0 {
915 return 0;
916 }
917 (value * 1_000_000.0).ceil() as u64
918}
919
920pub fn micros_to_usd(value: u64) -> f64 {
921 value as f64 / 1_000_000.0
922}
923
924fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
925 let today = utc_day_key();
926 let hour = utc_hour_key();
927 if state.day_utc != today {
928 state.day_utc = today;
929 state.cost_today_usd_micros = 0;
930 }
931 if state.hour_utc != hour {
932 state.hour_utc = hour;
933 state.cost_hour_usd_micros = 0;
934 }
935}
936
937fn utc_day_key() -> i32 {
938 (clock::now_utc().date()
939 - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
940 .whole_days() as i32
941}
942
943fn utc_hour_key() -> i64 {
944 clock::now_utc().unix_timestamp() / 3_600
945}
946
947pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
948 TRIGGER_REGISTRY.with(|slot| {
949 let registry = slot.borrow();
950 let mut snapshots = Vec::new();
951 for bindings in registry.bindings.values() {
952 for binding in bindings {
953 snapshots.push(binding.snapshot());
954 }
955 }
956 snapshots.sort_by(|left, right| {
957 left.id
958 .cmp(&right.id)
959 .then(left.version.cmp(&right.version))
960 .then(left.state.as_str().cmp(right.state.as_str()))
961 });
962 snapshots
963 })
964}
965
966#[allow(clippy::arc_with_non_send_sync)]
967pub fn resolve_trigger_binding_as_of(
968 id: &str,
969 as_of: OffsetDateTime,
970) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
971 let version = binding_version_as_of(id, as_of)?;
972 resolve_trigger_binding_version(id, version)
973}
974
975#[allow(clippy::arc_with_non_send_sync)]
976pub fn resolve_live_or_as_of(
977 id: &str,
978 recorded: RecordedTriggerBinding,
979) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
980 match resolve_live_trigger_binding(id, Some(recorded.version)) {
981 Ok(binding) => Ok(binding),
982 Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
983 let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
984 let mut metadata = BTreeMap::new();
985 metadata.insert("trigger_id".to_string(), serde_json::json!(id));
986 metadata.insert(
987 "recorded_version".to_string(),
988 serde_json::json!(recorded.version),
989 );
990 metadata.insert(
991 "received_at".to_string(),
992 serde_json::json!(recorded
993 .received_at
994 .format(&time::format_description::well_known::Rfc3339)
995 .unwrap_or_else(|_| recorded.received_at.to_string())),
996 );
997 metadata.insert(
998 "resolved_version".to_string(),
999 serde_json::json!(binding.version),
1000 );
1001 crate::events::log_warn_meta(
1002 "replay.binding_version_gc_fallback",
1003 "trigger replay fell back to lifecycle history after binding version GC",
1004 metadata,
1005 );
1006 Ok(binding)
1007 }
1008 Err(error) => Err(error),
1009 }
1010}
1011
1012pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
1013 TRIGGER_REGISTRY.with(|slot| {
1014 let registry = slot.borrow();
1015 registry.binding_version_as_of(id, as_of)
1016 })
1017}
1018
1019#[allow(clippy::arc_with_non_send_sync)]
1020fn resolve_trigger_binding_version(
1021 id: &str,
1022 version: u32,
1023) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1024 TRIGGER_REGISTRY.with(|slot| {
1025 let registry = slot.borrow();
1026 registry
1027 .binding(id, version)
1028 .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
1029 id: id.to_string(),
1030 version,
1031 })
1032 })
1033}
1034
1035#[allow(clippy::arc_with_non_send_sync)]
1036pub fn resolve_live_trigger_binding(
1037 id: &str,
1038 version: Option<u32>,
1039) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1040 TRIGGER_REGISTRY.with(|slot| {
1041 let registry = slot.borrow();
1042 if let Some(version) = version {
1043 let binding = registry.binding(id, version).ok_or_else(|| {
1044 TriggerRegistryError::UnknownBindingVersion {
1045 id: id.to_string(),
1046 version,
1047 }
1048 })?;
1049 if binding.state_snapshot() == TriggerState::Terminated {
1050 return Err(TriggerRegistryError::UnknownBindingVersion {
1051 id: id.to_string(),
1052 version,
1053 });
1054 }
1055 return Ok(binding);
1056 }
1057
1058 registry
1059 .live_bindings_any_source(id)
1060 .into_iter()
1061 .max_by_key(|binding| binding.version)
1062 .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
1063 })
1064}
1065
1066pub(crate) fn channel_bindings_matching(
1072 scope: &str,
1073 scope_id: &str,
1074 name: &str,
1075) -> Vec<Arc<TriggerBinding>> {
1076 TRIGGER_REGISTRY.with(|slot| {
1077 let registry = slot.borrow();
1078 let Some(binding_ids) = registry.by_provider.get("channel") else {
1079 return Vec::new();
1080 };
1081 let mut bindings = Vec::new();
1082 for id in binding_ids {
1083 let Some(versions) = registry.bindings.get(id) else {
1084 continue;
1085 };
1086 for binding in versions {
1087 if binding.state_snapshot() != TriggerState::Active {
1088 continue;
1089 }
1090 let Some(selector_raw) = binding.match_events.first() else {
1091 continue;
1092 };
1093 let Ok(selector) = crate::channels::ChannelSelector::parse(selector_raw) else {
1094 continue;
1095 };
1096 if !selector.matches(scope, scope_id, name, scope_id) {
1100 continue;
1101 }
1102 bindings.push(binding.clone());
1103 }
1104 }
1105 bindings.sort_by(|left, right| {
1106 left.id
1107 .as_str()
1108 .cmp(right.id.as_str())
1109 .then(left.version.cmp(&right.version))
1110 });
1111 bindings
1112 })
1113}
1114
1115pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
1116 TRIGGER_REGISTRY.with(|slot| {
1117 let registry = slot.borrow();
1118 let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
1119 return Vec::new();
1120 };
1121
1122 let mut bindings = Vec::new();
1123 for id in binding_ids {
1124 let Some(versions) = registry.bindings.get(id) else {
1125 continue;
1126 };
1127 for binding in versions {
1128 if binding.state_snapshot() != TriggerState::Active {
1129 continue;
1130 }
1131 if !binding.match_events.is_empty()
1132 && !binding.match_events.iter().any(|kind| kind == &event.kind)
1133 {
1134 continue;
1135 }
1136 bindings.push(binding.clone());
1137 }
1138 }
1139
1140 bindings.sort_by(|left, right| {
1141 left.id
1142 .as_str()
1143 .cmp(right.id.as_str())
1144 .then(left.version.cmp(&right.version))
1145 });
1146 bindings
1147 })
1148}
1149
1150pub async fn install_manifest_triggers(
1151 specs: Vec<TriggerBindingSpec>,
1152) -> Result<(), TriggerRegistryError> {
1153 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1154 let registry = &mut *slot.borrow_mut();
1155 registry.refresh_runtime_context();
1156 let mut touched_ids = BTreeSet::new();
1157
1158 let mut incoming = BTreeMap::new();
1159 for spec in specs {
1160 let spec_id = spec.id.clone();
1161 if spec.source != TriggerBindingSource::Manifest {
1162 return Err(TriggerRegistryError::InvalidSpec(format!(
1163 "manifest install received non-manifest trigger '{}'",
1164 spec_id
1165 )));
1166 }
1167 if spec_id.trim().is_empty() {
1168 return Err(TriggerRegistryError::InvalidSpec(
1169 "manifest trigger id cannot be empty".to_string(),
1170 ));
1171 }
1172 if incoming.insert(spec_id.clone(), spec).is_some() {
1173 return Err(TriggerRegistryError::DuplicateId(spec_id));
1174 }
1175 }
1176
1177 let mut lifecycle = Vec::new();
1178 let existing_ids: Vec<String> = registry
1179 .bindings
1180 .iter()
1181 .filter(|(_, bindings)| {
1182 bindings.iter().any(|binding| {
1183 binding.source == TriggerBindingSource::Manifest
1184 && binding.state_snapshot() != TriggerState::Terminated
1185 })
1186 })
1187 .map(|(id, _)| id.clone())
1188 .collect();
1189
1190 for id in existing_ids {
1191 let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
1192 let Some(spec) = incoming.remove(&id) else {
1193 for binding in live_manifest {
1194 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1195 }
1196 touched_ids.insert(id.clone());
1197 continue;
1198 };
1199
1200 let has_matching_active = live_manifest.iter().any(|binding| {
1201 binding.definition_fingerprint == spec.definition_fingerprint
1202 && matches!(
1203 binding.state_snapshot(),
1204 TriggerState::Registering | TriggerState::Active
1205 )
1206 });
1207 if has_matching_active {
1208 continue;
1209 }
1210
1211 for binding in live_manifest {
1212 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1213 }
1214
1215 let version = registry.next_version_for_spec(&spec);
1216 registry.register_binding(spec, version, &mut lifecycle);
1217 touched_ids.insert(id.clone());
1218 }
1219
1220 for spec in incoming.into_values() {
1221 touched_ids.insert(spec.id.clone());
1222 let version = registry.next_version_for_spec(&spec);
1223 registry.register_binding(spec, version, &mut lifecycle);
1224 }
1225
1226 for id in touched_ids {
1227 registry.gc_terminated_versions(&id);
1228 }
1229
1230 Ok((registry.event_log.clone(), lifecycle))
1231 })?;
1232
1233 append_lifecycle_events(event_log, events).await
1234}
1235
1236pub async fn dynamic_register(
1237 mut spec: TriggerBindingSpec,
1238) -> Result<TriggerId, TriggerRegistryError> {
1239 if spec.id.trim().is_empty() {
1240 spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1241 }
1242 spec.source = TriggerBindingSource::Dynamic;
1243 let id = spec.id.clone();
1244 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1245 let registry = &mut *slot.borrow_mut();
1246 registry.refresh_runtime_context();
1247
1248 if registry.bindings.contains_key(id.as_str()) {
1249 return Err(TriggerRegistryError::DuplicateId(id.clone()));
1250 }
1251
1252 let mut lifecycle = Vec::new();
1253 let version = registry.next_version_for_spec(&spec);
1254 registry.register_binding(spec, version, &mut lifecycle);
1255 Ok((registry.event_log.clone(), lifecycle))
1256 })?;
1257
1258 append_lifecycle_events(event_log, events).await?;
1259 Ok(TriggerId::new(id))
1260}
1261
1262pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1263 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1264 let registry = &mut *slot.borrow_mut();
1265 let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1266 if live_dynamic.is_empty() {
1267 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1268 }
1269
1270 let mut lifecycle = Vec::new();
1271 for binding in live_dynamic {
1272 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1273 }
1274 Ok((registry.event_log.clone(), lifecycle))
1275 })?;
1276
1277 append_lifecycle_events(event_log, events).await
1278}
1279
1280pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1281 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1282 let registry = &mut *slot.borrow_mut();
1283 let live = registry.live_bindings_any_source(id);
1284 if live.is_empty() {
1285 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1286 }
1287
1288 let mut lifecycle = Vec::new();
1289 for binding in live {
1290 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1291 }
1292 Ok((registry.event_log.clone(), lifecycle))
1293 })?;
1294
1295 append_lifecycle_events(event_log, events).await
1296}
1297
1298pub async fn pause(id: &str) -> Result<(), TriggerRegistryError> {
1299 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1300 let registry = &mut *slot.borrow_mut();
1301 let live = registry.live_bindings_any_source(id);
1302 if live.is_empty() {
1303 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1304 }
1305
1306 let mut lifecycle = Vec::new();
1307 for binding in live {
1308 match binding.state_snapshot() {
1309 TriggerState::Registering | TriggerState::Active => {
1310 registry.transition_binding_state(
1311 &binding,
1312 TriggerState::Paused,
1313 &mut lifecycle,
1314 );
1315 }
1316 TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {}
1317 }
1318 }
1319 Ok((registry.event_log.clone(), lifecycle))
1320 })?;
1321
1322 append_lifecycle_events(event_log, events).await
1323}
1324
1325pub async fn resume(id: &str) -> Result<(), TriggerRegistryError> {
1326 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1327 let registry = &mut *slot.borrow_mut();
1328 let live = registry.live_bindings_any_source(id);
1329 if live.is_empty() {
1330 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1331 }
1332
1333 let mut lifecycle = Vec::new();
1334 for binding in live {
1335 if binding.state_snapshot() == TriggerState::Paused {
1336 registry.transition_binding_state(&binding, TriggerState::Active, &mut lifecycle);
1337 }
1338 }
1339 Ok((registry.event_log.clone(), lifecycle))
1340 })?;
1341
1342 append_lifecycle_events(event_log, events).await
1343}
1344
1345fn pin_trigger_binding_inner(
1346 id: &str,
1347 version: u32,
1348 allow_terminated: bool,
1349) -> Result<(), TriggerRegistryError> {
1350 TRIGGER_REGISTRY.with(|slot| {
1351 let registry = slot.borrow();
1352 let binding = registry.binding(id, version).ok_or_else(|| {
1353 TriggerRegistryError::UnknownBindingVersion {
1354 id: id.to_string(),
1355 version,
1356 }
1357 })?;
1358 match binding.state_snapshot() {
1359 TriggerState::Paused => Err(TriggerRegistryError::InvalidSpec(format!(
1360 "trigger binding '{}' version {} is paused",
1361 id, version
1362 ))),
1363 TriggerState::Terminated if !allow_terminated => {
1364 Err(TriggerRegistryError::InvalidSpec(format!(
1365 "trigger binding '{}' version {} is terminated",
1366 id, version
1367 )))
1368 }
1369 _ => {
1370 binding.in_flight.fetch_add(1, Ordering::Relaxed);
1371 Ok(())
1372 }
1373 }
1374 })
1375}
1376
1377pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1378 pin_trigger_binding_inner(id, version, false)
1379}
1380
1381pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1382 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1383 let registry = &mut *slot.borrow_mut();
1384 let binding = registry.binding(id, version).ok_or_else(|| {
1385 TriggerRegistryError::UnknownBindingVersion {
1386 id: id.to_string(),
1387 version,
1388 }
1389 })?;
1390 let current = binding.in_flight.load(Ordering::Relaxed);
1391 if current == 0 {
1392 return Err(TriggerRegistryError::InvalidSpec(format!(
1393 "trigger binding '{}' version {} has no in-flight events",
1394 id, version
1395 )));
1396 }
1397 binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1398
1399 let mut lifecycle = Vec::new();
1400 registry.maybe_finalize_draining(&binding, &mut lifecycle);
1401 registry.gc_terminated_versions(binding.id.as_str());
1402 Ok((registry.event_log.clone(), lifecycle))
1403 })?;
1404
1405 append_lifecycle_events(event_log, events).await
1406}
1407
1408pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1409 begin_in_flight_inner(id, version, false)
1410}
1411
1412pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1413 begin_in_flight_inner(id, version, true)
1414}
1415
1416fn begin_in_flight_inner(
1417 id: &str,
1418 version: u32,
1419 allow_terminated: bool,
1420) -> Result<(), TriggerRegistryError> {
1421 pin_trigger_binding_inner(id, version, allow_terminated)?;
1422 TRIGGER_REGISTRY.with(|slot| {
1423 let registry = slot.borrow();
1424 let binding = registry.binding(id, version).ok_or_else(|| {
1425 TriggerRegistryError::UnknownBindingVersion {
1426 id: id.to_string(),
1427 version,
1428 }
1429 })?;
1430 binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1431 *binding
1432 .metrics
1433 .last_received_ms
1434 .lock()
1435 .expect("trigger metrics poisoned") = Some(now_ms());
1436 Ok(())
1437 })
1438}
1439
1440pub async fn finish_in_flight(
1441 id: &str,
1442 version: u32,
1443 outcome: TriggerDispatchOutcome,
1444) -> Result<(), TriggerRegistryError> {
1445 TRIGGER_REGISTRY.with(|slot| {
1446 let registry = &mut *slot.borrow_mut();
1447 let binding = registry.binding(id, version).ok_or_else(|| {
1448 TriggerRegistryError::UnknownBindingVersion {
1449 id: id.to_string(),
1450 version,
1451 }
1452 })?;
1453 let current = binding.in_flight.load(Ordering::Relaxed);
1454 if current == 0 {
1455 return Err(TriggerRegistryError::InvalidSpec(format!(
1456 "trigger binding '{}' version {} has no in-flight events",
1457 id, version
1458 )));
1459 }
1460 match outcome {
1461 TriggerDispatchOutcome::Dispatched => {
1462 binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1463 }
1464 TriggerDispatchOutcome::Failed => {
1465 binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1466 }
1467 TriggerDispatchOutcome::Dlq => {
1468 binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1469 }
1470 }
1471 Ok(())
1472 })?;
1473
1474 unpin_trigger_binding(id, version).await
1475}
1476
1477impl TriggerRegistry {
1478 fn refresh_runtime_context(&mut self) {
1479 if self.event_log.is_none() {
1480 self.event_log = active_event_log();
1481 }
1482 if self.secret_provider.is_none() {
1483 self.secret_provider = default_secret_provider();
1484 }
1485 }
1486
1487 fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
1488 self.bindings
1489 .get(id)
1490 .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
1491 .cloned()
1492 }
1493
1494 fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
1495 self.bindings
1496 .get(id)
1497 .into_iter()
1498 .flat_map(|bindings| bindings.iter())
1499 .filter(|binding| {
1500 binding.source == source && binding.state_snapshot() != TriggerState::Terminated
1501 })
1502 .cloned()
1503 .collect()
1504 }
1505
1506 fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
1507 self.bindings
1508 .get(id)
1509 .into_iter()
1510 .flat_map(|bindings| bindings.iter())
1511 .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
1512 .cloned()
1513 .collect()
1514 }
1515
1516 fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
1517 if let Some(version) = self
1518 .bindings
1519 .get(spec.id.as_str())
1520 .into_iter()
1521 .flat_map(|bindings| bindings.iter())
1522 .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
1523 .map(|binding| binding.version)
1524 {
1525 return version;
1526 }
1527
1528 let historical =
1529 self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
1530 if let Some(version) = historical.matching_version {
1531 return version;
1532 }
1533
1534 self.bindings
1535 .get(spec.id.as_str())
1536 .into_iter()
1537 .flat_map(|bindings| bindings.iter())
1538 .map(|binding| binding.version)
1539 .chain(historical.max_version)
1540 .max()
1541 .unwrap_or(0)
1542 + 1
1543 }
1544
1545 fn gc_terminated_versions(&mut self, id: &str) {
1546 let Some(bindings) = self.bindings.get_mut(id) else {
1547 return;
1548 };
1549
1550 let mut newest_versions: Vec<u32> =
1551 bindings.iter().map(|binding| binding.version).collect();
1552 newest_versions.sort_unstable_by(|left, right| right.cmp(left));
1553 newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
1554 let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();
1555
1556 bindings.retain(|binding| {
1557 binding.state_snapshot() != TriggerState::Terminated
1558 || retained_versions.contains(&binding.version)
1559 });
1560
1561 if bindings.is_empty() {
1562 self.bindings.remove(id);
1563 }
1564 }
1565
1566 fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
1567 let mut lookup = HistoricalVersionLookup::default();
1568 for record in self.lifecycle_records_for(id) {
1569 lookup.max_version = Some(
1570 lookup
1571 .max_version
1572 .unwrap_or(0)
1573 .max(record.transition.version),
1574 );
1575 if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
1576 lookup.matching_version = Some(record.transition.version);
1577 }
1578 }
1579 lookup
1580 }
1581
1582 fn binding_version_as_of(
1583 &self,
1584 id: &str,
1585 as_of: OffsetDateTime,
1586 ) -> Result<u32, TriggerRegistryError> {
1587 let cutoff_ms = (as_of.unix_timestamp_nanos() / 1_000_000) as i64;
1588 let mut active_version = None;
1589 for record in self.lifecycle_records_for(id) {
1590 if record.occurred_at_ms > cutoff_ms {
1591 break;
1592 }
1593 match record.transition.to_state {
1594 TriggerState::Active => active_version = Some(record.transition.version),
1595 TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {
1596 if active_version == Some(record.transition.version) {
1597 active_version = None;
1598 }
1599 }
1600 TriggerState::Registering => {}
1601 }
1602 }
1603
1604 active_version.ok_or_else(|| {
1605 TriggerRegistryError::InvalidSpec(format!(
1606 "no active trigger binding '{}' at {}",
1607 id,
1608 as_of
1609 .format(&time::format_description::well_known::Rfc3339)
1610 .unwrap_or_else(|_| as_of.to_string())
1611 ))
1612 })
1613 }
1614
1615 fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
1616 let Some(event_log) = self.event_log.as_ref() else {
1617 return Vec::new();
1618 };
1619 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1620 .expect("static triggers.lifecycle topic should always be valid");
1621 futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
1622 .unwrap_or_default()
1623 .into_iter()
1624 .filter_map(|(_, event)| {
1625 let occurred_at_ms = event.occurred_at_ms;
1626 let transition: LifecycleStateTransitionRecord =
1627 serde_json::from_value(event.payload).ok()?;
1628 (transition.id == id).then_some(HistoricalLifecycleRecord {
1629 occurred_at_ms,
1630 transition,
1631 })
1632 })
1633 .collect()
1634 }
1635
1636 #[allow(clippy::arc_with_non_send_sync)]
1637 fn register_binding(
1638 &mut self,
1639 spec: TriggerBindingSpec,
1640 version: u32,
1641 lifecycle: &mut Vec<LogEvent>,
1642 ) -> Arc<TriggerBinding> {
1643 let binding = Arc::new(TriggerBinding::new(spec, version));
1644 self.by_provider
1645 .entry(binding.provider.as_str().to_string())
1646 .or_default()
1647 .insert(binding.id.as_str().to_string());
1648 self.bindings
1649 .entry(binding.id.as_str().to_string())
1650 .or_default()
1651 .push(binding.clone());
1652 lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
1653 self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
1654 binding
1655 }
1656
1657 fn transition_binding_to_draining(
1658 &self,
1659 binding: &Arc<TriggerBinding>,
1660 lifecycle: &mut Vec<LogEvent>,
1661 ) {
1662 if matches!(binding.state_snapshot(), TriggerState::Terminated) {
1663 return;
1664 }
1665 self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
1666 self.maybe_finalize_draining(binding, lifecycle);
1667 }
1668
1669 fn maybe_finalize_draining(
1670 &self,
1671 binding: &Arc<TriggerBinding>,
1672 lifecycle: &mut Vec<LogEvent>,
1673 ) {
1674 if binding.state_snapshot() == TriggerState::Draining
1675 && binding.in_flight.load(Ordering::Relaxed) == 0
1676 {
1677 self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
1678 }
1679 }
1680
1681 fn transition_binding_state(
1682 &self,
1683 binding: &Arc<TriggerBinding>,
1684 next: TriggerState,
1685 lifecycle: &mut Vec<LogEvent>,
1686 ) {
1687 let mut state = binding.state.lock().expect("trigger state poisoned");
1688 let previous = *state;
1689 if previous == next {
1690 return;
1691 }
1692 *state = next;
1693 drop(state);
1694 if next == TriggerState::Terminated && binding.aggregation.is_some() {
1700 let _ = super::aggregation::drop_binding_aggregation(&binding.binding_key());
1701 }
1702 lifecycle.push(lifecycle_event(binding, Some(previous), next));
1703 }
1704}
1705
1706fn lifecycle_event(
1707 binding: &TriggerBinding,
1708 from_state: Option<TriggerState>,
1709 to_state: TriggerState,
1710) -> LogEvent {
1711 LogEvent::new(
1712 "state_transition",
1713 serde_json::json!({
1714 "id": binding.id.as_str(),
1715 "binding_key": binding.binding_key(),
1716 "version": binding.version,
1717 "provider": binding.provider.as_str(),
1718 "kind": &binding.kind,
1719 "source": binding.source.as_str(),
1720 "handler_kind": binding.handler.kind(),
1721 "definition_fingerprint": &binding.definition_fingerprint,
1722 "from_state": from_state.map(TriggerState::as_str),
1723 "to_state": to_state.as_str(),
1724 }),
1725 )
1726}
1727
1728async fn append_lifecycle_events(
1729 event_log: Option<Arc<AnyEventLog>>,
1730 events: Vec<LogEvent>,
1731) -> Result<(), TriggerRegistryError> {
1732 let Some(event_log) = event_log else {
1733 return Ok(());
1734 };
1735 if events.is_empty() {
1736 return Ok(());
1737 }
1738
1739 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1740 .expect("static triggers.lifecycle topic should always be valid");
1741 for event in events {
1742 event_log
1743 .append(&topic, event)
1744 .await
1745 .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1746 }
1747 Ok(())
1748}
1749
1750fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1751 configured_default_chain(default_secret_namespace())
1752 .ok()
1753 .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1754}
1755
1756fn default_secret_namespace() -> String {
1757 if let Ok(namespace) = std::env::var("HARN_SECRET_NAMESPACE") {
1758 if !namespace.trim().is_empty() {
1759 return namespace;
1760 }
1761 }
1762
1763 let cwd = std::env::current_dir().unwrap_or_default();
1764 let leaf = cwd
1765 .file_name()
1766 .and_then(|name| name.to_str())
1767 .filter(|name| !name.is_empty())
1768 .unwrap_or("workspace");
1769 format!("harn/{leaf}")
1770}
1771
1772fn now_ms() -> i64 {
1773 clock::now_ms()
1774}
1775
1776#[cfg(test)]
1777mod tests {
1778 use super::*;
1779 use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
1780 use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
1781 use crate::triggers::test_util::timing::FILE_WATCH_FALLBACK_POLL;
1782 use std::rc::Rc;
1783 use time::OffsetDateTime;
1784
1785 fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
1786 TriggerBindingSpec {
1787 id: id.to_string(),
1788 source: TriggerBindingSource::Manifest,
1789 kind: "webhook".to_string(),
1790 provider: ProviderId::from("github"),
1791 autonomy_tier: crate::AutonomyTier::ActAuto,
1792 handler: TriggerHandlerSpec::Worker {
1793 queue: format!("{id}-queue"),
1794 },
1795 dispatch_priority: crate::WorkerQueuePriority::Normal,
1796 when: None,
1797 when_budget: None,
1798 retry: TriggerRetryConfig::default(),
1799 match_events: vec!["issues.opened".to_string()],
1800 dedupe_key: Some("event.dedupe_key".to_string()),
1801 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1802 filter: Some("event.kind".to_string()),
1803 daily_cost_usd: Some(5.0),
1804 hourly_cost_usd: None,
1805 max_autonomous_decisions_per_hour: None,
1806 max_autonomous_decisions_per_day: None,
1807 on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1808 max_concurrent: Some(10),
1809 flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1810 aggregation: None,
1811 manifest_path: None,
1812 package_name: Some("workspace".to_string()),
1813 definition_fingerprint: fingerprint.to_string(),
1814 }
1815 }
1816
1817 fn dynamic_spec(id: &str) -> TriggerBindingSpec {
1818 TriggerBindingSpec {
1819 id: id.to_string(),
1820 source: TriggerBindingSource::Dynamic,
1821 kind: "webhook".to_string(),
1822 provider: ProviderId::from("github"),
1823 autonomy_tier: crate::AutonomyTier::ActAuto,
1824 handler: TriggerHandlerSpec::Worker {
1825 queue: format!("{id}-queue"),
1826 },
1827 dispatch_priority: crate::WorkerQueuePriority::Normal,
1828 when: None,
1829 when_budget: None,
1830 retry: TriggerRetryConfig::default(),
1831 match_events: vec!["issues.opened".to_string()],
1832 dedupe_key: None,
1833 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1834 filter: None,
1835 daily_cost_usd: None,
1836 hourly_cost_usd: None,
1837 max_autonomous_decisions_per_hour: None,
1838 max_autonomous_decisions_per_day: None,
1839 on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1840 max_concurrent: None,
1841 flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1842 aggregation: None,
1843 manifest_path: None,
1844 package_name: None,
1845 definition_fingerprint: format!("dynamic:{id}"),
1846 }
1847 }
1848
1849 #[tokio::test(flavor = "current_thread")]
1850 async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
1851 clear_trigger_registry();
1852
1853 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1854 .await
1855 .expect("manifest trigger installs");
1856
1857 let snapshots = snapshot_trigger_bindings();
1858 assert_eq!(snapshots.len(), 1);
1859 let binding = &snapshots[0];
1860 assert_eq!(binding.id, "github-new-issue");
1861 assert_eq!(binding.version, 1);
1862 assert_eq!(binding.state, TriggerState::Active);
1863 assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());
1864
1865 clear_trigger_registry();
1866 }
1867
1868 #[tokio::test(flavor = "current_thread")]
1869 async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
1870 clear_trigger_registry();
1871
1872 let first = dynamic_register(dynamic_spec("dynamic-a"))
1873 .await
1874 .expect("first dynamic trigger");
1875 let second = dynamic_register(dynamic_spec("dynamic-b"))
1876 .await
1877 .expect("second dynamic trigger");
1878 assert_ne!(first, second);
1879
1880 let error = dynamic_register(dynamic_spec("dynamic-a"))
1881 .await
1882 .expect_err("duplicate id should fail");
1883 assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));
1884
1885 clear_trigger_registry();
1886 }
1887
1888 #[tokio::test(flavor = "current_thread")]
1889 async fn drain_waits_for_in_flight_events_before_terminating() {
1890 clear_trigger_registry();
1891
1892 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1893 .await
1894 .expect("manifest trigger installs");
1895 begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1896
1897 drain("github-new-issue").await.expect("drain succeeds");
1898 let binding = snapshot_trigger_bindings()
1899 .into_iter()
1900 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1901 .expect("binding snapshot");
1902 assert_eq!(binding.state, TriggerState::Draining);
1903 assert_eq!(binding.metrics.in_flight, 1);
1904
1905 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1906 .await
1907 .expect("finish in-flight event");
1908 let binding = snapshot_trigger_bindings()
1909 .into_iter()
1910 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1911 .expect("binding snapshot");
1912 assert_eq!(binding.state, TriggerState::Terminated);
1913 assert_eq!(binding.metrics.in_flight, 0);
1914
1915 clear_trigger_registry();
1916 }
1917
1918 #[tokio::test(flavor = "current_thread")]
1919 async fn hot_reload_registers_new_version_while_old_binding_drains() {
1920 clear_trigger_registry();
1921
1922 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1923 .await
1924 .expect("initial manifest trigger installs");
1925 begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1926
1927 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1928 .await
1929 .expect("updated manifest trigger installs");
1930
1931 let snapshots = snapshot_trigger_bindings();
1932 assert_eq!(snapshots.len(), 2);
1933 let old = snapshots
1934 .iter()
1935 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1936 .expect("old binding");
1937 let new = snapshots
1938 .iter()
1939 .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
1940 .expect("new binding");
1941 assert_eq!(old.state, TriggerState::Draining);
1942 assert_eq!(new.state, TriggerState::Active);
1943
1944 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1945 .await
1946 .expect("finish old in-flight event");
1947 let old = snapshot_trigger_bindings()
1948 .into_iter()
1949 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1950 .expect("old binding");
1951 assert_eq!(old.state, TriggerState::Terminated);
1952
1953 clear_trigger_registry();
1954 }
1955
1956 #[tokio::test(flavor = "current_thread")]
1957 async fn gc_drops_terminated_versions_beyond_retention_limit() {
1958 clear_trigger_registry();
1959
1960 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1961 .await
1962 .expect("install v1");
1963 begin_in_flight("github-new-issue", 1).expect("pin v1");
1964
1965 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1966 .await
1967 .expect("install v2");
1968 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1969 .await
1970 .expect("finish v1");
1971
1972 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1973 .await
1974 .expect("install v3");
1975
1976 let snapshots = snapshot_trigger_bindings();
1977 let versions: Vec<u32> = snapshots
1978 .into_iter()
1979 .filter(|binding| binding.id == "github-new-issue")
1980 .map(|binding| binding.version)
1981 .collect();
1982 assert_eq!(versions, vec![2, 3]);
1983
1984 clear_trigger_registry();
1985 }
1986
1987 #[tokio::test(flavor = "current_thread")]
1988 async fn lifecycle_transitions_append_to_event_log() {
1989 clear_trigger_registry();
1990 reset_active_event_log();
1991 let tempdir = tempfile::tempdir().expect("tempdir");
1992 let log = install_default_for_base_dir(tempdir.path()).expect("install event log");
1993
1994 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1995 .await
1996 .expect("manifest trigger installs");
1997 begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1998 drain("github-new-issue").await.expect("drain succeeds");
1999 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
2000 .await
2001 .expect("finish event");
2002
2003 let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
2004 let events = log
2005 .read_range(&topic, None, 32)
2006 .await
2007 .expect("read lifecycle events");
2008 let states: Vec<String> = events
2009 .into_iter()
2010 .filter_map(|(_, event)| {
2011 event
2012 .payload
2013 .get("to_state")
2014 .and_then(|value| value.as_str())
2015 .map(|value| value.to_string())
2016 })
2017 .collect();
2018 assert_eq!(
2019 states,
2020 vec![
2021 "registering".to_string(),
2022 "active".to_string(),
2023 "draining".to_string(),
2024 "terminated".to_string(),
2025 ]
2026 );
2027
2028 reset_active_event_log();
2029 clear_trigger_registry();
2030 }
2031
2032 #[tokio::test(flavor = "current_thread")]
2033 async fn version_history_reuses_historical_version_after_restart() {
2034 clear_trigger_registry();
2035 reset_active_event_log();
2036 let tempdir = tempfile::tempdir().expect("tempdir");
2037 install_default_for_base_dir(tempdir.path()).expect("install event log");
2038
2039 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2040 .await
2041 .expect("initial manifest trigger installs");
2042 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2043 .await
2044 .expect("updated manifest trigger installs");
2045
2046 clear_trigger_registry();
2047 reset_active_event_log();
2048 install_default_for_base_dir(tempdir.path()).expect("reopen event log");
2049
2050 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2051 .await
2052 .expect("manifest reload reuses historical version");
2053
2054 let binding = snapshot_trigger_bindings()
2055 .into_iter()
2056 .find(|binding| binding.id == "github-new-issue")
2057 .expect("binding snapshot");
2058 assert_eq!(binding.version, 2);
2059
2060 reset_active_event_log();
2061 clear_trigger_registry();
2062 }
2063
2064 #[tokio::test(flavor = "current_thread")]
2065 async fn binding_version_as_of_reports_historical_active_version() {
2066 clear_trigger_registry();
2067 reset_active_event_log();
2068 let tempdir = tempfile::tempdir().expect("tempdir");
2069 install_default_for_base_dir(tempdir.path()).expect("install event log");
2070
2071 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2072 .await
2073 .expect("initial manifest trigger installs");
2074 let before_reload = OffsetDateTime::now_utc();
2075 std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
2076
2077 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2078 .await
2079 .expect("updated manifest trigger installs");
2080 let after_reload = OffsetDateTime::now_utc();
2081
2082 assert_eq!(
2083 binding_version_as_of("github-new-issue", before_reload)
2084 .expect("version before reload"),
2085 1
2086 );
2087 assert_eq!(
2088 binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
2089 2
2090 );
2091
2092 reset_active_event_log();
2093 clear_trigger_registry();
2094 }
2095
2096 #[tokio::test(flavor = "current_thread")]
2097 async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
2098 clear_trigger_registry();
2099 reset_active_event_log();
2100 let sink = Rc::new(CollectorSink::new());
2101 clear_event_sinks();
2102 add_event_sink(sink.clone());
2103 let tempdir = tempfile::tempdir().expect("tempdir");
2104 install_default_for_base_dir(tempdir.path()).expect("install event log");
2105
2106 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2107 .await
2108 .expect("install v1");
2109 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2110 .await
2111 .expect("install v2");
2112 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
2113 .await
2114 .expect("install v3");
2115 let received_at = OffsetDateTime::now_utc();
2116 std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
2117 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
2118 .await
2119 .expect("install v4");
2120
2121 let binding = resolve_live_or_as_of(
2122 "github-new-issue",
2123 RecordedTriggerBinding {
2124 version: 1,
2125 received_at,
2126 },
2127 )
2128 .expect("resolve fallback binding");
2129 assert_eq!(binding.version, 3);
2130
2131 let warning = sink
2132 .logs
2133 .borrow()
2134 .iter()
2135 .find(|log| log.category == "replay.binding_version_gc_fallback")
2136 .cloned()
2137 .expect("gc fallback warning");
2138 assert_eq!(warning.level, EventLevel::Warn);
2139 assert_eq!(
2140 warning.metadata.get("trigger_id"),
2141 Some(&serde_json::json!("github-new-issue"))
2142 );
2143 assert_eq!(
2144 warning.metadata.get("recorded_version"),
2145 Some(&serde_json::json!(1))
2146 );
2147 assert_eq!(
2148 warning.metadata.get("received_at"),
2149 Some(&serde_json::json!(received_at
2150 .format(&time::format_description::well_known::Rfc3339)
2151 .unwrap_or_else(|_| received_at.to_string())))
2152 );
2153 assert_eq!(
2154 warning.metadata.get("resolved_version"),
2155 Some(&serde_json::json!(3))
2156 );
2157
2158 clear_event_sinks();
2159 crate::events::reset_event_sinks();
2160 reset_active_event_log();
2161 clear_trigger_registry();
2162 }
2163}