1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use time::OffsetDateTime;
10use uuid::Uuid;
11
12use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
13use crate::llm::trigger_predicate::TriggerPredicateBudget;
14use crate::secrets::{configured_default_chain, SecretProvider};
15use crate::triggers::test_util::clock;
16use crate::trust_graph::AutonomyTier;
17use crate::value::VmClosure;
18
19use super::aggregation::TriggerAggregationConfig;
20use super::dispatcher::TriggerRetryConfig;
21use super::flow_control::TriggerFlowControlConfig;
22use super::ProviderId;
23
24#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
25pub struct TriggerId(String);
26
27impl TriggerId {
28 pub fn new(value: impl Into<String>) -> Self {
29 Self(value.into())
30 }
31
32 pub fn as_str(&self) -> &str {
33 &self.0
34 }
35}
36
37impl std::fmt::Display for TriggerId {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 self.0.fmt(f)
40 }
41}
42
43#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(rename_all = "snake_case")]
45pub enum TriggerState {
46 Registering,
47 Active,
48 Paused,
49 Draining,
50 Terminated,
51}
52
53impl TriggerState {
54 pub fn as_str(self) -> &'static str {
55 match self {
56 Self::Registering => "registering",
57 Self::Active => "active",
58 Self::Paused => "paused",
59 Self::Draining => "draining",
60 Self::Terminated => "terminated",
61 }
62 }
63}
64
65#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67pub enum TriggerBindingSource {
68 Manifest,
69 Dynamic,
70}
71
72impl TriggerBindingSource {
73 pub fn as_str(self) -> &'static str {
74 match self {
75 Self::Manifest => "manifest",
76 Self::Dynamic => "dynamic",
77 }
78 }
79}
80
81#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
82#[serde(rename_all = "snake_case")]
83pub enum TriggerBudgetExhaustionStrategy {
84 #[default]
85 False,
86 RetryLater,
87 Fail,
88 Warn,
89}
90
91impl TriggerBudgetExhaustionStrategy {
92 pub fn as_str(self) -> &'static str {
93 match self {
94 Self::False => "false",
95 Self::RetryLater => "retry_later",
96 Self::Fail => "fail",
97 Self::Warn => "warn",
98 }
99 }
100}
101
102#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
103pub struct OrchestratorBudgetConfig {
104 pub daily_cost_usd: Option<f64>,
105 pub hourly_cost_usd: Option<f64>,
106}
107
108#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
109pub struct OrchestratorBudgetSnapshot {
110 pub daily_cost_usd: Option<f64>,
111 pub hourly_cost_usd: Option<f64>,
112 pub cost_today_usd_micros: u64,
113 pub cost_hour_usd_micros: u64,
114 pub day_utc: i32,
115 pub hour_utc: i64,
116}
117
118#[derive(Debug)]
119struct OrchestratorBudgetState {
120 config: OrchestratorBudgetConfig,
121 day_utc: i32,
122 hour_utc: i64,
123 cost_today_usd_micros: u64,
124 cost_hour_usd_micros: u64,
125}
126
127impl Default for OrchestratorBudgetState {
128 fn default() -> Self {
129 Self {
130 config: OrchestratorBudgetConfig::default(),
131 day_utc: utc_day_key(),
132 hour_utc: utc_hour_key(),
133 cost_today_usd_micros: 0,
134 cost_hour_usd_micros: 0,
135 }
136 }
137}
138
139#[derive(Clone)]
140pub enum TriggerHandlerSpec {
141 Local {
142 raw: String,
143 closure: Rc<VmClosure>,
144 },
145 A2a {
146 target: String,
147 allow_cleartext: bool,
148 },
149 Worker {
150 queue: String,
151 },
152 Persona {
153 binding: crate::PersonaRuntimeBinding,
154 },
155 AutoResume {
156 worker_id: String,
157 },
158 SpawnToPool {
163 pool: String,
164 priority_from: Option<String>,
165 key_from: Option<String>,
166 task_factory: Rc<VmClosure>,
167 },
168 ReminderInject {
177 target: TargetExpr,
178 body: String,
179 tags: Vec<String>,
180 ttl_turns: Option<i64>,
181 dedupe_key: Option<String>,
182 propagate: crate::llm::helpers::ReminderPropagate,
183 role_hint: crate::llm::helpers::ReminderRoleHint,
184 preserve_on_compact: bool,
185 },
186 InterruptAndSuspend {
199 target_agents: AgentScope,
200 reason: String,
201 },
202}
203
204#[derive(Clone)]
212pub enum AgentScope {
213 All,
214 Concrete(Vec<String>),
215 Closure(Rc<VmClosure>),
216}
217
218impl AgentScope {
219 pub fn kind(&self) -> &'static str {
220 match self {
221 Self::All => "all",
222 Self::Concrete(_) => "concrete",
223 Self::Closure(_) => "closure",
224 }
225 }
226}
227
228impl std::fmt::Debug for AgentScope {
229 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230 match self {
231 Self::All => f.write_str("All"),
232 Self::Concrete(ids) => f.debug_tuple("Concrete").field(ids).finish(),
233 Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
234 }
235 }
236}
237
238#[derive(Clone)]
245pub enum TargetExpr {
246 Current,
247 Parent,
248 Concrete(String),
249 Closure(Rc<VmClosure>),
250}
251
252impl TargetExpr {
253 pub fn kind(&self) -> &'static str {
254 match self {
255 Self::Current => "current",
256 Self::Parent => "parent",
257 Self::Concrete(_) => "concrete",
258 Self::Closure(_) => "closure",
259 }
260 }
261}
262
263impl std::fmt::Debug for TargetExpr {
264 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265 match self {
266 Self::Current => f.write_str("Current"),
267 Self::Parent => f.write_str("Parent"),
268 Self::Concrete(id) => f.debug_tuple("Concrete").field(id).finish(),
269 Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
270 }
271 }
272}
273
274impl std::fmt::Debug for TriggerHandlerSpec {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 match self {
277 Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
278 Self::A2a {
279 target,
280 allow_cleartext,
281 } => f
282 .debug_struct("A2a")
283 .field("target", target)
284 .field("allow_cleartext", allow_cleartext)
285 .finish(),
286 Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
287 Self::Persona { binding } => f
288 .debug_struct("Persona")
289 .field("name", &binding.name)
290 .finish(),
291 Self::AutoResume { worker_id } => f
292 .debug_struct("AutoResume")
293 .field("worker_id", worker_id)
294 .finish(),
295 Self::SpawnToPool {
296 pool,
297 priority_from,
298 key_from,
299 ..
300 } => f
301 .debug_struct("SpawnToPool")
302 .field("pool", pool)
303 .field("priority_from", priority_from)
304 .field("key_from", key_from)
305 .finish(),
306 Self::ReminderInject {
307 target,
308 body,
309 tags,
310 ttl_turns,
311 dedupe_key,
312 propagate,
313 role_hint,
314 preserve_on_compact,
315 } => f
316 .debug_struct("ReminderInject")
317 .field("target", target)
318 .field("body", body)
319 .field("tags", tags)
320 .field("ttl_turns", ttl_turns)
321 .field("dedupe_key", dedupe_key)
322 .field("propagate", propagate)
323 .field("role_hint", role_hint)
324 .field("preserve_on_compact", preserve_on_compact)
325 .finish(),
326 Self::InterruptAndSuspend {
327 target_agents,
328 reason,
329 } => f
330 .debug_struct("InterruptAndSuspend")
331 .field("target_agents", target_agents)
332 .field("reason", reason)
333 .finish(),
334 }
335 }
336}
337
338impl TriggerHandlerSpec {
339 pub fn kind(&self) -> &'static str {
340 match self {
341 Self::Local { .. } => "local",
342 Self::A2a { .. } => "a2a",
343 Self::Worker { .. } => "worker",
344 Self::Persona { .. } => "persona",
345 Self::AutoResume { .. } => "auto_resume",
346 Self::SpawnToPool { .. } => "spawn_to_pool",
347 Self::ReminderInject { .. } => "reminder_inject",
348 Self::InterruptAndSuspend { .. } => "interrupt_and_suspend",
349 }
350 }
351}
352
353#[derive(Clone)]
354pub struct TriggerPredicateSpec {
355 pub raw: String,
356 pub closure: Rc<VmClosure>,
357}
358
359impl std::fmt::Debug for TriggerPredicateSpec {
360 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361 f.debug_struct("TriggerPredicateSpec")
362 .field("raw", &self.raw)
363 .finish()
364 }
365}
366
367#[derive(Clone, Debug)]
368pub struct TriggerBindingSpec {
369 pub id: String,
370 pub source: TriggerBindingSource,
371 pub kind: String,
372 pub provider: ProviderId,
373 pub autonomy_tier: AutonomyTier,
374 pub handler: TriggerHandlerSpec,
375 pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
376 pub when: Option<TriggerPredicateSpec>,
377 pub when_budget: Option<TriggerPredicateBudget>,
378 pub retry: TriggerRetryConfig,
379 pub match_events: Vec<String>,
380 pub dedupe_key: Option<String>,
381 pub dedupe_retention_days: u32,
382 pub filter: Option<String>,
383 pub daily_cost_usd: Option<f64>,
384 pub hourly_cost_usd: Option<f64>,
385 pub max_autonomous_decisions_per_hour: Option<u64>,
386 pub max_autonomous_decisions_per_day: Option<u64>,
387 pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
388 pub max_concurrent: Option<u32>,
389 pub flow_control: TriggerFlowControlConfig,
390 pub aggregation: Option<TriggerAggregationConfig>,
395 pub manifest_path: Option<PathBuf>,
396 pub package_name: Option<String>,
397 pub definition_fingerprint: String,
398}
399
400#[derive(Debug)]
401pub struct TriggerMetrics {
402 pub received: AtomicU64,
403 pub dispatched: AtomicU64,
404 pub failed: AtomicU64,
405 pub dlq: AtomicU64,
406 pub last_received_ms: Mutex<Option<i64>>,
407 pub cost_total_usd_micros: AtomicU64,
408 pub cost_today_usd_micros: AtomicU64,
409 pub cost_hour_usd_micros: AtomicU64,
410 pub autonomous_decisions_total: AtomicU64,
411 pub autonomous_decisions_today: AtomicU64,
412 pub autonomous_decisions_hour: AtomicU64,
413}
414
415impl Default for TriggerMetrics {
416 fn default() -> Self {
417 Self {
418 received: AtomicU64::new(0),
419 dispatched: AtomicU64::new(0),
420 failed: AtomicU64::new(0),
421 dlq: AtomicU64::new(0),
422 last_received_ms: Mutex::new(None),
423 cost_total_usd_micros: AtomicU64::new(0),
424 cost_today_usd_micros: AtomicU64::new(0),
425 cost_hour_usd_micros: AtomicU64::new(0),
426 autonomous_decisions_total: AtomicU64::new(0),
427 autonomous_decisions_today: AtomicU64::new(0),
428 autonomous_decisions_hour: AtomicU64::new(0),
429 }
430 }
431}
432
433#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
434pub struct TriggerMetricsSnapshot {
435 pub received: u64,
436 pub dispatched: u64,
437 pub failed: u64,
438 pub dlq: u64,
439 pub in_flight: u64,
440 pub last_received_ms: Option<i64>,
441 pub cost_total_usd_micros: u64,
442 pub cost_today_usd_micros: u64,
443 pub cost_hour_usd_micros: u64,
444 pub autonomous_decisions_total: u64,
445 pub autonomous_decisions_today: u64,
446 pub autonomous_decisions_hour: u64,
447}
448
449pub struct TriggerBinding {
450 pub id: TriggerId,
451 pub version: u32,
452 pub source: TriggerBindingSource,
453 pub kind: String,
454 pub provider: ProviderId,
455 pub autonomy_tier: AutonomyTier,
456 pub handler: TriggerHandlerSpec,
457 pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
458 pub when: Option<TriggerPredicateSpec>,
459 pub when_budget: Option<TriggerPredicateBudget>,
460 pub retry: TriggerRetryConfig,
461 pub match_events: Vec<String>,
462 pub dedupe_key: Option<String>,
463 pub dedupe_retention_days: u32,
464 pub filter: Option<String>,
465 pub daily_cost_usd: Option<f64>,
466 pub hourly_cost_usd: Option<f64>,
467 pub max_autonomous_decisions_per_hour: Option<u64>,
468 pub max_autonomous_decisions_per_day: Option<u64>,
469 pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
470 pub max_concurrent: Option<u32>,
471 pub flow_control: TriggerFlowControlConfig,
472 pub aggregation: Option<TriggerAggregationConfig>,
476 pub manifest_path: Option<PathBuf>,
477 pub package_name: Option<String>,
478 pub definition_fingerprint: String,
479 pub state: Mutex<TriggerState>,
480 pub metrics: TriggerMetrics,
481 pub in_flight: AtomicU64,
482 pub cancel_token: Arc<AtomicBool>,
483 pub predicate_state: Mutex<TriggerPredicateState>,
484}
485
486#[derive(Clone, Debug, Default)]
487pub struct TriggerPredicateState {
488 pub budget_day_utc: Option<i32>,
489 pub budget_hour_utc: Option<i64>,
490 pub consecutive_failures: u32,
491 pub breaker_open_until_ms: Option<i64>,
492 pub recent_cost_usd_micros: VecDeque<u64>,
493}
494
495impl std::fmt::Debug for TriggerBinding {
496 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497 f.debug_struct("TriggerBinding")
498 .field("id", &self.id)
499 .field("version", &self.version)
500 .field("source", &self.source)
501 .field("kind", &self.kind)
502 .field("provider", &self.provider)
503 .field("handler_kind", &self.handler.kind())
504 .field("state", &self.state_snapshot())
505 .finish()
506 }
507}
508
509impl TriggerBinding {
510 pub fn snapshot(&self) -> TriggerBindingSnapshot {
511 TriggerBindingSnapshot {
512 id: self.id.as_str().to_string(),
513 version: self.version,
514 source: self.source,
515 kind: self.kind.clone(),
516 provider: self.provider.as_str().to_string(),
517 autonomy_tier: self.autonomy_tier,
518 handler_kind: self.handler.kind().to_string(),
519 state: self.state_snapshot(),
520 metrics: self.metrics_snapshot(),
521 daily_cost_usd: self.daily_cost_usd,
522 hourly_cost_usd: self.hourly_cost_usd,
523 max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
524 max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
525 on_budget_exhausted: self.on_budget_exhausted,
526 }
527 }
528
529 fn new(spec: TriggerBindingSpec, version: u32) -> Self {
530 Self {
531 id: TriggerId::new(spec.id),
532 version,
533 source: spec.source,
534 kind: spec.kind,
535 provider: spec.provider,
536 autonomy_tier: spec.autonomy_tier,
537 handler: spec.handler,
538 dispatch_priority: spec.dispatch_priority,
539 when: spec.when,
540 when_budget: spec.when_budget,
541 retry: spec.retry,
542 match_events: spec.match_events,
543 dedupe_key: spec.dedupe_key,
544 dedupe_retention_days: spec.dedupe_retention_days,
545 filter: spec.filter,
546 daily_cost_usd: spec.daily_cost_usd,
547 hourly_cost_usd: spec.hourly_cost_usd,
548 max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
549 max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
550 on_budget_exhausted: spec.on_budget_exhausted,
551 max_concurrent: spec.max_concurrent,
552 flow_control: spec.flow_control,
553 aggregation: spec.aggregation,
554 manifest_path: spec.manifest_path,
555 package_name: spec.package_name,
556 definition_fingerprint: spec.definition_fingerprint,
557 state: Mutex::new(TriggerState::Registering),
558 metrics: TriggerMetrics::default(),
559 in_flight: AtomicU64::new(0),
560 cancel_token: Arc::new(AtomicBool::new(false)),
561 predicate_state: Mutex::new(TriggerPredicateState::default()),
562 }
563 }
564
565 pub fn binding_key(&self) -> String {
566 format!("{}@v{}", self.id.as_str(), self.version)
567 }
568
569 pub fn state_snapshot(&self) -> TriggerState {
570 *self.state.lock().expect("trigger state poisoned")
571 }
572
573 pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
574 TriggerMetricsSnapshot {
575 received: self.metrics.received.load(Ordering::Relaxed),
576 dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
577 failed: self.metrics.failed.load(Ordering::Relaxed),
578 dlq: self.metrics.dlq.load(Ordering::Relaxed),
579 in_flight: self.in_flight.load(Ordering::Relaxed),
580 last_received_ms: *self
581 .metrics
582 .last_received_ms
583 .lock()
584 .expect("trigger metrics poisoned"),
585 cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
586 cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
587 cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
588 autonomous_decisions_total: self
589 .metrics
590 .autonomous_decisions_total
591 .load(Ordering::Relaxed),
592 autonomous_decisions_today: self
593 .metrics
594 .autonomous_decisions_today
595 .load(Ordering::Relaxed),
596 autonomous_decisions_hour: self
597 .metrics
598 .autonomous_decisions_hour
599 .load(Ordering::Relaxed),
600 }
601 }
602}
603
604#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
605pub struct TriggerBindingSnapshot {
606 pub id: String,
607 pub version: u32,
608 pub source: TriggerBindingSource,
609 pub kind: String,
610 pub provider: String,
611 pub autonomy_tier: AutonomyTier,
612 pub handler_kind: String,
613 pub state: TriggerState,
614 pub metrics: TriggerMetricsSnapshot,
615 pub daily_cost_usd: Option<f64>,
616 pub hourly_cost_usd: Option<f64>,
617 pub max_autonomous_decisions_per_hour: Option<u64>,
618 pub max_autonomous_decisions_per_day: Option<u64>,
619 pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
620}
621
622#[derive(Clone, Copy, Debug, PartialEq, Eq)]
623pub enum TriggerDispatchOutcome {
624 Dispatched,
625 Failed,
626 Dlq,
627}
628
629#[derive(Debug)]
630pub enum TriggerRegistryError {
631 DuplicateId(String),
632 InvalidSpec(String),
633 UnknownId(String),
634 UnknownBindingVersion { id: String, version: u32 },
635 EventLog(String),
636}
637
638impl std::fmt::Display for TriggerRegistryError {
639 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
640 match self {
641 Self::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
642 Self::InvalidSpec(message) | Self::EventLog(message) => f.write_str(message),
643 Self::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
644 Self::UnknownBindingVersion { id, version } => {
645 write!(f, "unknown trigger binding '{id}' version {version}")
646 }
647 }
648 }
649}
650
651impl std::error::Error for TriggerRegistryError {}
652
653#[derive(Default)]
654pub struct TriggerRegistry {
655 bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
656 by_provider: BTreeMap<String, BTreeSet<String>>,
657 event_log: Option<Arc<AnyEventLog>>,
658 secret_provider: Option<Arc<dyn SecretProvider>>,
659}
660
661thread_local! {
662 static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
663}
664
665thread_local! {
666 static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
667 RefCell::new(OrchestratorBudgetState::default());
668}
669
670const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;
671
672const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
673const PREDICATE_COST_WINDOW: usize = 100;
674
675#[derive(Clone, Debug, Deserialize)]
676struct LifecycleStateTransitionRecord {
677 id: String,
678 version: u32,
679 #[serde(default)]
680 definition_fingerprint: Option<String>,
681 to_state: TriggerState,
682}
683
684#[derive(Clone, Debug)]
685struct HistoricalLifecycleRecord {
686 occurred_at_ms: i64,
687 transition: LifecycleStateTransitionRecord,
688}
689
690#[derive(Clone, Copy, Debug, PartialEq, Eq)]
691pub struct RecordedTriggerBinding {
692 pub version: u32,
693 pub received_at: OffsetDateTime,
694}
695
696#[derive(Clone, Copy, Debug, Default)]
697struct HistoricalVersionLookup {
698 matching_version: Option<u32>,
699 max_version: Option<u32>,
700}
701
702pub fn clear_trigger_registry() {
703 TRIGGER_REGISTRY.with(|slot| {
704 *slot.borrow_mut() = TriggerRegistry::default();
705 });
706 clear_orchestrator_budget();
707 super::aggregation::clear_aggregation_state();
708}
709
710pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
711 ORCHESTRATOR_BUDGET.with(|slot| {
712 let mut state = slot.borrow_mut();
713 rollover_orchestrator_budget(&mut state);
714 state.config = config;
715 });
716}
717
718pub fn clear_orchestrator_budget() {
719 ORCHESTRATOR_BUDGET.with(|slot| {
720 *slot.borrow_mut() = OrchestratorBudgetState::default();
721 });
722}
723
724pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
725 ORCHESTRATOR_BUDGET.with(|slot| {
726 let mut state = slot.borrow_mut();
727 rollover_orchestrator_budget(&mut state);
728 OrchestratorBudgetSnapshot {
729 daily_cost_usd: state.config.daily_cost_usd,
730 hourly_cost_usd: state.config.hourly_cost_usd,
731 cost_today_usd_micros: state.cost_today_usd_micros,
732 cost_hour_usd_micros: state.cost_hour_usd_micros,
733 day_utc: state.day_utc,
734 hour_utc: state.hour_utc,
735 }
736 })
737}
738
739pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
740 if cost_usd_micros == 0 {
741 return;
742 }
743 ORCHESTRATOR_BUDGET.with(|slot| {
744 let mut state = slot.borrow_mut();
745 rollover_orchestrator_budget(&mut state);
746 state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
747 state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
748 });
749}
750
751pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
752 ORCHESTRATOR_BUDGET.with(|slot| {
753 let mut state = slot.borrow_mut();
754 rollover_orchestrator_budget(&mut state);
755 if state.config.hourly_cost_usd.is_some_and(|limit| {
756 micros_to_usd(
757 state
758 .cost_hour_usd_micros
759 .saturating_add(expected_cost_usd_micros),
760 ) > limit
761 }) {
762 return Some("orchestrator_hourly_budget_exceeded");
763 }
764 if state.config.daily_cost_usd.is_some_and(|limit| {
765 micros_to_usd(
766 state
767 .cost_today_usd_micros
768 .saturating_add(expected_cost_usd_micros),
769 ) > limit
770 }) {
771 return Some("orchestrator_daily_budget_exceeded");
772 }
773 None
774 })
775}
776
777pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
778 let today = utc_day_key();
779 let hour = utc_hour_key();
780 let mut state = binding
781 .predicate_state
782 .lock()
783 .expect("trigger predicate state poisoned");
784 if state.budget_day_utc != Some(today) {
785 state.budget_day_utc = Some(today);
786 binding
787 .metrics
788 .cost_today_usd_micros
789 .store(0, Ordering::Relaxed);
790 binding
791 .metrics
792 .autonomous_decisions_today
793 .store(0, Ordering::Relaxed);
794 }
795 if state.budget_hour_utc != Some(hour) {
796 state.budget_hour_utc = Some(hour);
797 binding
798 .metrics
799 .cost_hour_usd_micros
800 .store(0, Ordering::Relaxed);
801 binding
802 .metrics
803 .autonomous_decisions_hour
804 .store(0, Ordering::Relaxed);
805 }
806}
807
808pub fn binding_budget_would_exceed(
809 binding: &TriggerBinding,
810 expected_cost_usd_micros: u64,
811) -> Option<&'static str> {
812 reset_binding_budget_windows(binding);
813 if binding.hourly_cost_usd.is_some_and(|limit| {
814 micros_to_usd(
815 binding
816 .metrics
817 .cost_hour_usd_micros
818 .load(Ordering::Relaxed)
819 .saturating_add(expected_cost_usd_micros),
820 ) > limit
821 }) {
822 return Some("hourly_budget_exceeded");
823 }
824 if binding.daily_cost_usd.is_some_and(|limit| {
825 micros_to_usd(
826 binding
827 .metrics
828 .cost_today_usd_micros
829 .load(Ordering::Relaxed)
830 .saturating_add(expected_cost_usd_micros),
831 ) > limit
832 }) {
833 return Some("daily_budget_exceeded");
834 }
835 None
836}
837
838pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
839 reset_binding_budget_windows(binding);
840 if binding
841 .max_autonomous_decisions_per_hour
842 .is_some_and(|limit| {
843 binding
844 .metrics
845 .autonomous_decisions_hour
846 .load(Ordering::Relaxed)
847 .saturating_add(1)
848 > limit
849 })
850 {
851 return Some("hourly_autonomy_budget_exceeded");
852 }
853 if binding
854 .max_autonomous_decisions_per_day
855 .is_some_and(|limit| {
856 binding
857 .metrics
858 .autonomous_decisions_today
859 .load(Ordering::Relaxed)
860 .saturating_add(1)
861 > limit
862 })
863 {
864 return Some("daily_autonomy_budget_exceeded");
865 }
866 None
867}
868
869pub fn note_autonomous_decision(binding: &TriggerBinding) {
870 reset_binding_budget_windows(binding);
871 binding
872 .metrics
873 .autonomous_decisions_total
874 .fetch_add(1, Ordering::Relaxed);
875 binding
876 .metrics
877 .autonomous_decisions_today
878 .fetch_add(1, Ordering::Relaxed);
879 binding
880 .metrics
881 .autonomous_decisions_hour
882 .fetch_add(1, Ordering::Relaxed);
883}
884
885pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
886 let state = binding
887 .predicate_state
888 .lock()
889 .expect("trigger predicate state poisoned");
890 if !state.recent_cost_usd_micros.is_empty() {
891 let total: u64 = state.recent_cost_usd_micros.iter().copied().sum();
892 return total / state.recent_cost_usd_micros.len() as u64;
893 }
894 binding
895 .when_budget
896 .as_ref()
897 .and_then(|budget| budget.max_cost_usd)
898 .map(usd_to_micros)
899 .unwrap_or_default()
900}
901
902pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
903 let mut state = binding
904 .predicate_state
905 .lock()
906 .expect("trigger predicate state poisoned");
907 state.recent_cost_usd_micros.push_back(cost_usd_micros);
908 while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
909 state.recent_cost_usd_micros.pop_front();
910 }
911}
912
913pub fn usd_to_micros(value: f64) -> u64 {
914 if !value.is_finite() || value <= 0.0 {
915 return 0;
916 }
917 (value * 1_000_000.0).ceil() as u64
918}
919
920pub fn micros_to_usd(value: u64) -> f64 {
921 value as f64 / 1_000_000.0
922}
923
924fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
925 let today = utc_day_key();
926 let hour = utc_hour_key();
927 if state.day_utc != today {
928 state.day_utc = today;
929 state.cost_today_usd_micros = 0;
930 }
931 if state.hour_utc != hour {
932 state.hour_utc = hour;
933 state.cost_hour_usd_micros = 0;
934 }
935}
936
937fn utc_day_key() -> i32 {
938 (clock::now_utc().date()
939 - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
940 .whole_days() as i32
941}
942
943fn utc_hour_key() -> i64 {
944 clock::now_utc().unix_timestamp() / 3_600
945}
946
947pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
948 TRIGGER_REGISTRY.with(|slot| {
949 let registry = slot.borrow();
950 let mut snapshots = Vec::new();
951 for bindings in registry.bindings.values() {
952 for binding in bindings {
953 snapshots.push(binding.snapshot());
954 }
955 }
956 snapshots.sort_by(|left, right| {
957 left.id
958 .cmp(&right.id)
959 .then(left.version.cmp(&right.version))
960 .then(left.state.as_str().cmp(right.state.as_str()))
961 });
962 snapshots
963 })
964}
965
966#[allow(clippy::arc_with_non_send_sync)]
967pub fn resolve_trigger_binding_as_of(
968 id: &str,
969 as_of: OffsetDateTime,
970) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
971 let version = binding_version_as_of(id, as_of)?;
972 resolve_trigger_binding_version(id, version)
973}
974
975#[allow(clippy::arc_with_non_send_sync)]
976pub fn resolve_live_or_as_of(
977 id: &str,
978 recorded: RecordedTriggerBinding,
979) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
980 match resolve_live_trigger_binding(id, Some(recorded.version)) {
981 Ok(binding) => Ok(binding),
982 Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
983 let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
984 let mut metadata = BTreeMap::new();
985 metadata.insert("trigger_id".to_string(), serde_json::json!(id));
986 metadata.insert(
987 "recorded_version".to_string(),
988 serde_json::json!(recorded.version),
989 );
990 metadata.insert(
991 "received_at".to_string(),
992 serde_json::json!(recorded
993 .received_at
994 .format(&time::format_description::well_known::Rfc3339)
995 .unwrap_or_else(|_| recorded.received_at.to_string())),
996 );
997 metadata.insert(
998 "resolved_version".to_string(),
999 serde_json::json!(binding.version),
1000 );
1001 crate::events::log_warn_meta(
1002 "replay.binding_version_gc_fallback",
1003 "trigger replay fell back to lifecycle history after binding version GC",
1004 metadata,
1005 );
1006 Ok(binding)
1007 }
1008 Err(error) => Err(error),
1009 }
1010}
1011
1012pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
1013 TRIGGER_REGISTRY.with(|slot| {
1014 let registry = slot.borrow();
1015 registry.binding_version_as_of(id, as_of)
1016 })
1017}
1018
1019#[allow(clippy::arc_with_non_send_sync)]
1020fn resolve_trigger_binding_version(
1021 id: &str,
1022 version: u32,
1023) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1024 TRIGGER_REGISTRY.with(|slot| {
1025 let registry = slot.borrow();
1026 registry
1027 .binding(id, version)
1028 .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
1029 id: id.to_string(),
1030 version,
1031 })
1032 })
1033}
1034
1035#[allow(clippy::arc_with_non_send_sync)]
1036pub fn resolve_live_trigger_binding(
1037 id: &str,
1038 version: Option<u32>,
1039) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1040 TRIGGER_REGISTRY.with(|slot| {
1041 let registry = slot.borrow();
1042 if let Some(version) = version {
1043 let binding = registry.binding(id, version).ok_or_else(|| {
1044 TriggerRegistryError::UnknownBindingVersion {
1045 id: id.to_string(),
1046 version,
1047 }
1048 })?;
1049 if binding.state_snapshot() == TriggerState::Terminated {
1050 return Err(TriggerRegistryError::UnknownBindingVersion {
1051 id: id.to_string(),
1052 version,
1053 });
1054 }
1055 return Ok(binding);
1056 }
1057
1058 registry
1059 .live_bindings_any_source(id)
1060 .into_iter()
1061 .max_by_key(|binding| binding.version)
1062 .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
1063 })
1064}
1065
1066pub(crate) fn channel_bindings_matching(
1072 scope: &str,
1073 scope_id: &str,
1074 name: &str,
1075) -> Vec<Arc<TriggerBinding>> {
1076 TRIGGER_REGISTRY.with(|slot| {
1077 let registry = slot.borrow();
1078 let Some(binding_ids) = registry.by_provider.get("channel") else {
1079 return Vec::new();
1080 };
1081 let mut bindings = Vec::new();
1082 for id in binding_ids {
1083 let Some(versions) = registry.bindings.get(id) else {
1084 continue;
1085 };
1086 for binding in versions {
1087 if binding.state_snapshot() != TriggerState::Active {
1088 continue;
1089 }
1090 let Some(selector_raw) = binding.match_events.first() else {
1091 continue;
1092 };
1093 let Ok(selector) = crate::channels::ChannelSelector::parse(selector_raw) else {
1094 continue;
1095 };
1096 if !selector.matches(scope, scope_id, name, scope_id) {
1100 continue;
1101 }
1102 bindings.push(binding.clone());
1103 }
1104 }
1105 bindings.sort_by(|left, right| {
1106 left.id
1107 .as_str()
1108 .cmp(right.id.as_str())
1109 .then(left.version.cmp(&right.version))
1110 });
1111 bindings
1112 })
1113}
1114
1115pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
1116 TRIGGER_REGISTRY.with(|slot| {
1117 let registry = slot.borrow();
1118 let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
1119 return Vec::new();
1120 };
1121
1122 let mut bindings = Vec::new();
1123 for id in binding_ids {
1124 let Some(versions) = registry.bindings.get(id) else {
1125 continue;
1126 };
1127 for binding in versions {
1128 if binding.state_snapshot() != TriggerState::Active {
1129 continue;
1130 }
1131 if !binding.match_events.is_empty()
1132 && !binding.match_events.iter().any(|kind| kind == &event.kind)
1133 {
1134 continue;
1135 }
1136 bindings.push(binding.clone());
1137 }
1138 }
1139
1140 bindings.sort_by(|left, right| {
1141 left.id
1142 .as_str()
1143 .cmp(right.id.as_str())
1144 .then(left.version.cmp(&right.version))
1145 });
1146 bindings
1147 })
1148}
1149
1150pub async fn install_manifest_triggers(
1151 specs: Vec<TriggerBindingSpec>,
1152) -> Result<(), TriggerRegistryError> {
1153 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1154 let registry = &mut *slot.borrow_mut();
1155 registry.refresh_runtime_context();
1156 let mut touched_ids = BTreeSet::new();
1157
1158 let mut incoming = BTreeMap::new();
1159 for spec in specs {
1160 let spec_id = spec.id.clone();
1161 if spec.source != TriggerBindingSource::Manifest {
1162 return Err(TriggerRegistryError::InvalidSpec(format!(
1163 "manifest install received non-manifest trigger '{spec_id}'"
1164 )));
1165 }
1166 if spec_id.trim().is_empty() {
1167 return Err(TriggerRegistryError::InvalidSpec(
1168 "manifest trigger id cannot be empty".to_string(),
1169 ));
1170 }
1171 if incoming.insert(spec_id.clone(), spec).is_some() {
1172 return Err(TriggerRegistryError::DuplicateId(spec_id));
1173 }
1174 }
1175
1176 let mut lifecycle = Vec::new();
1177 let existing_ids: Vec<String> = registry
1178 .bindings
1179 .iter()
1180 .filter(|(_, bindings)| {
1181 bindings.iter().any(|binding| {
1182 binding.source == TriggerBindingSource::Manifest
1183 && binding.state_snapshot() != TriggerState::Terminated
1184 })
1185 })
1186 .map(|(id, _)| id.clone())
1187 .collect();
1188
1189 for id in existing_ids {
1190 let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
1191 let Some(spec) = incoming.remove(&id) else {
1192 for binding in live_manifest {
1193 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1194 }
1195 touched_ids.insert(id.clone());
1196 continue;
1197 };
1198
1199 let has_matching_active = live_manifest.iter().any(|binding| {
1200 binding.definition_fingerprint == spec.definition_fingerprint
1201 && matches!(
1202 binding.state_snapshot(),
1203 TriggerState::Registering | TriggerState::Active
1204 )
1205 });
1206 if has_matching_active {
1207 continue;
1208 }
1209
1210 for binding in live_manifest {
1211 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1212 }
1213
1214 let version = registry.next_version_for_spec(&spec);
1215 registry.register_binding(spec, version, &mut lifecycle);
1216 touched_ids.insert(id.clone());
1217 }
1218
1219 for spec in incoming.into_values() {
1220 touched_ids.insert(spec.id.clone());
1221 let version = registry.next_version_for_spec(&spec);
1222 registry.register_binding(spec, version, &mut lifecycle);
1223 }
1224
1225 for id in touched_ids {
1226 registry.gc_terminated_versions(&id);
1227 }
1228
1229 Ok((registry.event_log.clone(), lifecycle))
1230 })?;
1231
1232 append_lifecycle_events(event_log, events).await
1233}
1234
1235pub async fn dynamic_register(
1236 mut spec: TriggerBindingSpec,
1237) -> Result<TriggerId, TriggerRegistryError> {
1238 if spec.id.trim().is_empty() {
1239 spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1240 }
1241 spec.source = TriggerBindingSource::Dynamic;
1242 let id = spec.id.clone();
1243 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1244 let registry = &mut *slot.borrow_mut();
1245 registry.refresh_runtime_context();
1246
1247 if registry.bindings.contains_key(id.as_str()) {
1248 return Err(TriggerRegistryError::DuplicateId(id.clone()));
1249 }
1250
1251 let mut lifecycle = Vec::new();
1252 let version = registry.next_version_for_spec(&spec);
1253 registry.register_binding(spec, version, &mut lifecycle);
1254 Ok((registry.event_log.clone(), lifecycle))
1255 })?;
1256
1257 append_lifecycle_events(event_log, events).await?;
1258 Ok(TriggerId::new(id))
1259}
1260
1261pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1262 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1263 let registry = &mut *slot.borrow_mut();
1264 let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1265 if live_dynamic.is_empty() {
1266 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1267 }
1268
1269 let mut lifecycle = Vec::new();
1270 for binding in live_dynamic {
1271 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1272 }
1273 Ok((registry.event_log.clone(), lifecycle))
1274 })?;
1275
1276 append_lifecycle_events(event_log, events).await
1277}
1278
1279pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1280 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1281 let registry = &mut *slot.borrow_mut();
1282 let live = registry.live_bindings_any_source(id);
1283 if live.is_empty() {
1284 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1285 }
1286
1287 let mut lifecycle = Vec::new();
1288 for binding in live {
1289 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1290 }
1291 Ok((registry.event_log.clone(), lifecycle))
1292 })?;
1293
1294 append_lifecycle_events(event_log, events).await
1295}
1296
1297pub async fn pause(id: &str) -> Result<(), TriggerRegistryError> {
1298 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1299 let registry = &mut *slot.borrow_mut();
1300 let live = registry.live_bindings_any_source(id);
1301 if live.is_empty() {
1302 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1303 }
1304
1305 let mut lifecycle = Vec::new();
1306 for binding in live {
1307 match binding.state_snapshot() {
1308 TriggerState::Registering | TriggerState::Active => {
1309 registry.transition_binding_state(
1310 &binding,
1311 TriggerState::Paused,
1312 &mut lifecycle,
1313 );
1314 }
1315 TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {}
1316 }
1317 }
1318 Ok((registry.event_log.clone(), lifecycle))
1319 })?;
1320
1321 append_lifecycle_events(event_log, events).await
1322}
1323
1324pub async fn resume(id: &str) -> Result<(), TriggerRegistryError> {
1325 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1326 let registry = &mut *slot.borrow_mut();
1327 let live = registry.live_bindings_any_source(id);
1328 if live.is_empty() {
1329 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1330 }
1331
1332 let mut lifecycle = Vec::new();
1333 for binding in live {
1334 if binding.state_snapshot() == TriggerState::Paused {
1335 registry.transition_binding_state(&binding, TriggerState::Active, &mut lifecycle);
1336 }
1337 }
1338 Ok((registry.event_log.clone(), lifecycle))
1339 })?;
1340
1341 append_lifecycle_events(event_log, events).await
1342}
1343
1344fn pin_trigger_binding_inner(
1345 id: &str,
1346 version: u32,
1347 allow_terminated: bool,
1348) -> Result<(), TriggerRegistryError> {
1349 TRIGGER_REGISTRY.with(|slot| {
1350 let registry = slot.borrow();
1351 let binding = registry.binding(id, version).ok_or_else(|| {
1352 TriggerRegistryError::UnknownBindingVersion {
1353 id: id.to_string(),
1354 version,
1355 }
1356 })?;
1357 match binding.state_snapshot() {
1358 TriggerState::Paused => Err(TriggerRegistryError::InvalidSpec(format!(
1359 "trigger binding '{id}' version {version} is paused"
1360 ))),
1361 TriggerState::Terminated if !allow_terminated => {
1362 Err(TriggerRegistryError::InvalidSpec(format!(
1363 "trigger binding '{id}' version {version} is terminated"
1364 )))
1365 }
1366 _ => {
1367 binding.in_flight.fetch_add(1, Ordering::Relaxed);
1368 Ok(())
1369 }
1370 }
1371 })
1372}
1373
1374pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1375 pin_trigger_binding_inner(id, version, false)
1376}
1377
1378pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1379 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1380 let registry = &mut *slot.borrow_mut();
1381 let binding = registry.binding(id, version).ok_or_else(|| {
1382 TriggerRegistryError::UnknownBindingVersion {
1383 id: id.to_string(),
1384 version,
1385 }
1386 })?;
1387 let current = binding.in_flight.load(Ordering::Relaxed);
1388 if current == 0 {
1389 return Err(TriggerRegistryError::InvalidSpec(format!(
1390 "trigger binding '{id}' version {version} has no in-flight events"
1391 )));
1392 }
1393 binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1394
1395 let mut lifecycle = Vec::new();
1396 registry.maybe_finalize_draining(&binding, &mut lifecycle);
1397 registry.gc_terminated_versions(binding.id.as_str());
1398 Ok((registry.event_log.clone(), lifecycle))
1399 })?;
1400
1401 append_lifecycle_events(event_log, events).await
1402}
1403
1404pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1405 begin_in_flight_inner(id, version, false)
1406}
1407
1408pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1409 begin_in_flight_inner(id, version, true)
1410}
1411
1412fn begin_in_flight_inner(
1413 id: &str,
1414 version: u32,
1415 allow_terminated: bool,
1416) -> Result<(), TriggerRegistryError> {
1417 pin_trigger_binding_inner(id, version, allow_terminated)?;
1418 TRIGGER_REGISTRY.with(|slot| {
1419 let registry = slot.borrow();
1420 let binding = registry.binding(id, version).ok_or_else(|| {
1421 TriggerRegistryError::UnknownBindingVersion {
1422 id: id.to_string(),
1423 version,
1424 }
1425 })?;
1426 binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1427 *binding
1428 .metrics
1429 .last_received_ms
1430 .lock()
1431 .expect("trigger metrics poisoned") = Some(now_ms());
1432 Ok(())
1433 })
1434}
1435
1436pub async fn finish_in_flight(
1437 id: &str,
1438 version: u32,
1439 outcome: TriggerDispatchOutcome,
1440) -> Result<(), TriggerRegistryError> {
1441 TRIGGER_REGISTRY.with(|slot| {
1442 let registry = &mut *slot.borrow_mut();
1443 let binding = registry.binding(id, version).ok_or_else(|| {
1444 TriggerRegistryError::UnknownBindingVersion {
1445 id: id.to_string(),
1446 version,
1447 }
1448 })?;
1449 let current = binding.in_flight.load(Ordering::Relaxed);
1450 if current == 0 {
1451 return Err(TriggerRegistryError::InvalidSpec(format!(
1452 "trigger binding '{id}' version {version} has no in-flight events"
1453 )));
1454 }
1455 match outcome {
1456 TriggerDispatchOutcome::Dispatched => {
1457 binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1458 }
1459 TriggerDispatchOutcome::Failed => {
1460 binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1461 }
1462 TriggerDispatchOutcome::Dlq => {
1463 binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1464 }
1465 }
1466 Ok(())
1467 })?;
1468
1469 unpin_trigger_binding(id, version).await
1470}
1471
1472impl TriggerRegistry {
1473 fn refresh_runtime_context(&mut self) {
1474 if self.event_log.is_none() {
1475 self.event_log = active_event_log();
1476 }
1477 if self.secret_provider.is_none() {
1478 self.secret_provider = default_secret_provider();
1479 }
1480 }
1481
1482 fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
1483 self.bindings
1484 .get(id)
1485 .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
1486 .cloned()
1487 }
1488
1489 fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
1490 self.bindings
1491 .get(id)
1492 .into_iter()
1493 .flat_map(|bindings| bindings.iter())
1494 .filter(|binding| {
1495 binding.source == source && binding.state_snapshot() != TriggerState::Terminated
1496 })
1497 .cloned()
1498 .collect()
1499 }
1500
1501 fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
1502 self.bindings
1503 .get(id)
1504 .into_iter()
1505 .flat_map(|bindings| bindings.iter())
1506 .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
1507 .cloned()
1508 .collect()
1509 }
1510
1511 fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
1512 if let Some(version) = self
1513 .bindings
1514 .get(spec.id.as_str())
1515 .into_iter()
1516 .flat_map(|bindings| bindings.iter())
1517 .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
1518 .map(|binding| binding.version)
1519 {
1520 return version;
1521 }
1522
1523 let historical =
1524 self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
1525 if let Some(version) = historical.matching_version {
1526 return version;
1527 }
1528
1529 self.bindings
1530 .get(spec.id.as_str())
1531 .into_iter()
1532 .flat_map(|bindings| bindings.iter())
1533 .map(|binding| binding.version)
1534 .chain(historical.max_version)
1535 .max()
1536 .unwrap_or(0)
1537 + 1
1538 }
1539
1540 fn gc_terminated_versions(&mut self, id: &str) {
1541 let Some(bindings) = self.bindings.get_mut(id) else {
1542 return;
1543 };
1544
1545 let mut newest_versions: Vec<u32> =
1546 bindings.iter().map(|binding| binding.version).collect();
1547 newest_versions.sort_unstable_by(|left, right| right.cmp(left));
1548 newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
1549 let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();
1550
1551 bindings.retain(|binding| {
1552 binding.state_snapshot() != TriggerState::Terminated
1553 || retained_versions.contains(&binding.version)
1554 });
1555
1556 if bindings.is_empty() {
1557 self.bindings.remove(id);
1558 }
1559 }
1560
1561 fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
1562 let mut lookup = HistoricalVersionLookup::default();
1563 for record in self.lifecycle_records_for(id) {
1564 lookup.max_version = Some(
1565 lookup
1566 .max_version
1567 .unwrap_or(0)
1568 .max(record.transition.version),
1569 );
1570 if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
1571 lookup.matching_version = Some(record.transition.version);
1572 }
1573 }
1574 lookup
1575 }
1576
1577 fn binding_version_as_of(
1578 &self,
1579 id: &str,
1580 as_of: OffsetDateTime,
1581 ) -> Result<u32, TriggerRegistryError> {
1582 let cutoff_ms = harn_clock::offset_datetime_to_ms(as_of);
1583 let mut active_version = None;
1584 for record in self.lifecycle_records_for(id) {
1585 if record.occurred_at_ms > cutoff_ms {
1586 break;
1587 }
1588 match record.transition.to_state {
1589 TriggerState::Active => active_version = Some(record.transition.version),
1590 TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {
1591 if active_version == Some(record.transition.version) {
1592 active_version = None;
1593 }
1594 }
1595 TriggerState::Registering => {}
1596 }
1597 }
1598
1599 active_version.ok_or_else(|| {
1600 TriggerRegistryError::InvalidSpec(format!(
1601 "no active trigger binding '{}' at {}",
1602 id,
1603 as_of
1604 .format(&time::format_description::well_known::Rfc3339)
1605 .unwrap_or_else(|_| as_of.to_string())
1606 ))
1607 })
1608 }
1609
1610 fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
1611 let Some(event_log) = self.event_log.as_ref() else {
1612 return Vec::new();
1613 };
1614 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1615 .expect("static triggers.lifecycle topic should always be valid");
1616 futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
1617 .unwrap_or_default()
1618 .into_iter()
1619 .filter_map(|(_, event)| {
1620 let occurred_at_ms = event.occurred_at_ms;
1621 let transition: LifecycleStateTransitionRecord =
1622 serde_json::from_value(event.payload).ok()?;
1623 (transition.id == id).then_some(HistoricalLifecycleRecord {
1624 occurred_at_ms,
1625 transition,
1626 })
1627 })
1628 .collect()
1629 }
1630
1631 #[allow(clippy::arc_with_non_send_sync)]
1632 fn register_binding(
1633 &mut self,
1634 spec: TriggerBindingSpec,
1635 version: u32,
1636 lifecycle: &mut Vec<LogEvent>,
1637 ) -> Arc<TriggerBinding> {
1638 let binding = Arc::new(TriggerBinding::new(spec, version));
1639 self.by_provider
1640 .entry(binding.provider.as_str().to_string())
1641 .or_default()
1642 .insert(binding.id.as_str().to_string());
1643 self.bindings
1644 .entry(binding.id.as_str().to_string())
1645 .or_default()
1646 .push(binding.clone());
1647 lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
1648 self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
1649 binding
1650 }
1651
1652 fn transition_binding_to_draining(
1653 &self,
1654 binding: &Arc<TriggerBinding>,
1655 lifecycle: &mut Vec<LogEvent>,
1656 ) {
1657 if matches!(binding.state_snapshot(), TriggerState::Terminated) {
1658 return;
1659 }
1660 self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
1661 self.maybe_finalize_draining(binding, lifecycle);
1662 }
1663
1664 fn maybe_finalize_draining(
1665 &self,
1666 binding: &Arc<TriggerBinding>,
1667 lifecycle: &mut Vec<LogEvent>,
1668 ) {
1669 if binding.state_snapshot() == TriggerState::Draining
1670 && binding.in_flight.load(Ordering::Relaxed) == 0
1671 {
1672 self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
1673 }
1674 }
1675
1676 fn transition_binding_state(
1677 &self,
1678 binding: &Arc<TriggerBinding>,
1679 next: TriggerState,
1680 lifecycle: &mut Vec<LogEvent>,
1681 ) {
1682 let mut state = binding.state.lock().expect("trigger state poisoned");
1683 let previous = *state;
1684 if previous == next {
1685 return;
1686 }
1687 *state = next;
1688 drop(state);
1689 if next == TriggerState::Terminated && binding.aggregation.is_some() {
1695 let _ = super::aggregation::drop_binding_aggregation(&binding.binding_key());
1696 }
1697 lifecycle.push(lifecycle_event(binding, Some(previous), next));
1698 }
1699}
1700
1701fn lifecycle_event(
1702 binding: &TriggerBinding,
1703 from_state: Option<TriggerState>,
1704 to_state: TriggerState,
1705) -> LogEvent {
1706 LogEvent::new(
1707 "state_transition",
1708 serde_json::json!({
1709 "id": binding.id.as_str(),
1710 "binding_key": binding.binding_key(),
1711 "version": binding.version,
1712 "provider": binding.provider.as_str(),
1713 "kind": &binding.kind,
1714 "source": binding.source.as_str(),
1715 "handler_kind": binding.handler.kind(),
1716 "definition_fingerprint": &binding.definition_fingerprint,
1717 "from_state": from_state.map(TriggerState::as_str),
1718 "to_state": to_state.as_str(),
1719 }),
1720 )
1721}
1722
1723async fn append_lifecycle_events(
1724 event_log: Option<Arc<AnyEventLog>>,
1725 events: Vec<LogEvent>,
1726) -> Result<(), TriggerRegistryError> {
1727 let Some(event_log) = event_log else {
1728 return Ok(());
1729 };
1730 if events.is_empty() {
1731 return Ok(());
1732 }
1733
1734 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1735 .expect("static triggers.lifecycle topic should always be valid");
1736 for event in events {
1737 event_log
1738 .append(&topic, event)
1739 .await
1740 .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1741 }
1742 Ok(())
1743}
1744
1745fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1746 configured_default_chain(default_secret_namespace())
1747 .ok()
1748 .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1749}
1750
1751fn default_secret_namespace() -> String {
1752 if let Ok(namespace) = std::env::var("HARN_SECRET_NAMESPACE") {
1753 if !namespace.trim().is_empty() {
1754 return namespace;
1755 }
1756 }
1757
1758 let cwd = std::env::current_dir().unwrap_or_default();
1759 let leaf = cwd
1760 .file_name()
1761 .and_then(|name| name.to_str())
1762 .filter(|name| !name.is_empty())
1763 .unwrap_or("workspace");
1764 format!("harn/{leaf}")
1765}
1766
1767fn now_ms() -> i64 {
1768 clock::now_ms()
1769}
1770
1771#[cfg(test)]
1772mod tests {
1773 use super::*;
1774 use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
1775 use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
1776 use crate::triggers::test_util::timing::FILE_WATCH_FALLBACK_POLL;
1777 use std::rc::Rc;
1778 use time::OffsetDateTime;
1779
1780 fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
1781 TriggerBindingSpec {
1782 id: id.to_string(),
1783 source: TriggerBindingSource::Manifest,
1784 kind: "webhook".to_string(),
1785 provider: ProviderId::from("github"),
1786 autonomy_tier: crate::AutonomyTier::ActAuto,
1787 handler: TriggerHandlerSpec::Worker {
1788 queue: format!("{id}-queue"),
1789 },
1790 dispatch_priority: crate::WorkerQueuePriority::Normal,
1791 when: None,
1792 when_budget: None,
1793 retry: TriggerRetryConfig::default(),
1794 match_events: vec!["issues.opened".to_string()],
1795 dedupe_key: Some("event.dedupe_key".to_string()),
1796 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1797 filter: Some("event.kind".to_string()),
1798 daily_cost_usd: Some(5.0),
1799 hourly_cost_usd: None,
1800 max_autonomous_decisions_per_hour: None,
1801 max_autonomous_decisions_per_day: None,
1802 on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1803 max_concurrent: Some(10),
1804 flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1805 aggregation: None,
1806 manifest_path: None,
1807 package_name: Some("workspace".to_string()),
1808 definition_fingerprint: fingerprint.to_string(),
1809 }
1810 }
1811
1812 fn dynamic_spec(id: &str) -> TriggerBindingSpec {
1813 TriggerBindingSpec {
1814 id: id.to_string(),
1815 source: TriggerBindingSource::Dynamic,
1816 kind: "webhook".to_string(),
1817 provider: ProviderId::from("github"),
1818 autonomy_tier: crate::AutonomyTier::ActAuto,
1819 handler: TriggerHandlerSpec::Worker {
1820 queue: format!("{id}-queue"),
1821 },
1822 dispatch_priority: crate::WorkerQueuePriority::Normal,
1823 when: None,
1824 when_budget: None,
1825 retry: TriggerRetryConfig::default(),
1826 match_events: vec!["issues.opened".to_string()],
1827 dedupe_key: None,
1828 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1829 filter: None,
1830 daily_cost_usd: None,
1831 hourly_cost_usd: None,
1832 max_autonomous_decisions_per_hour: None,
1833 max_autonomous_decisions_per_day: None,
1834 on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1835 max_concurrent: None,
1836 flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1837 aggregation: None,
1838 manifest_path: None,
1839 package_name: None,
1840 definition_fingerprint: format!("dynamic:{id}"),
1841 }
1842 }
1843
1844 #[tokio::test(flavor = "current_thread")]
1845 async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
1846 clear_trigger_registry();
1847
1848 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1849 .await
1850 .expect("manifest trigger installs");
1851
1852 let snapshots = snapshot_trigger_bindings();
1853 assert_eq!(snapshots.len(), 1);
1854 let binding = &snapshots[0];
1855 assert_eq!(binding.id, "github-new-issue");
1856 assert_eq!(binding.version, 1);
1857 assert_eq!(binding.state, TriggerState::Active);
1858 assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());
1859
1860 clear_trigger_registry();
1861 }
1862
1863 #[tokio::test(flavor = "current_thread")]
1864 async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
1865 clear_trigger_registry();
1866
1867 let first = dynamic_register(dynamic_spec("dynamic-a"))
1868 .await
1869 .expect("first dynamic trigger");
1870 let second = dynamic_register(dynamic_spec("dynamic-b"))
1871 .await
1872 .expect("second dynamic trigger");
1873 assert_ne!(first, second);
1874
1875 let error = dynamic_register(dynamic_spec("dynamic-a"))
1876 .await
1877 .expect_err("duplicate id should fail");
1878 assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));
1879
1880 clear_trigger_registry();
1881 }
1882
1883 #[tokio::test(flavor = "current_thread")]
1884 async fn drain_waits_for_in_flight_events_before_terminating() {
1885 clear_trigger_registry();
1886
1887 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1888 .await
1889 .expect("manifest trigger installs");
1890 begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1891
1892 drain("github-new-issue").await.expect("drain succeeds");
1893 let binding = snapshot_trigger_bindings()
1894 .into_iter()
1895 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1896 .expect("binding snapshot");
1897 assert_eq!(binding.state, TriggerState::Draining);
1898 assert_eq!(binding.metrics.in_flight, 1);
1899
1900 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1901 .await
1902 .expect("finish in-flight event");
1903 let binding = snapshot_trigger_bindings()
1904 .into_iter()
1905 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1906 .expect("binding snapshot");
1907 assert_eq!(binding.state, TriggerState::Terminated);
1908 assert_eq!(binding.metrics.in_flight, 0);
1909
1910 clear_trigger_registry();
1911 }
1912
1913 #[tokio::test(flavor = "current_thread")]
1914 async fn hot_reload_registers_new_version_while_old_binding_drains() {
1915 clear_trigger_registry();
1916
1917 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1918 .await
1919 .expect("initial manifest trigger installs");
1920 begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1921
1922 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1923 .await
1924 .expect("updated manifest trigger installs");
1925
1926 let snapshots = snapshot_trigger_bindings();
1927 assert_eq!(snapshots.len(), 2);
1928 let old = snapshots
1929 .iter()
1930 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1931 .expect("old binding");
1932 let new = snapshots
1933 .iter()
1934 .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
1935 .expect("new binding");
1936 assert_eq!(old.state, TriggerState::Draining);
1937 assert_eq!(new.state, TriggerState::Active);
1938
1939 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1940 .await
1941 .expect("finish old in-flight event");
1942 let old = snapshot_trigger_bindings()
1943 .into_iter()
1944 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1945 .expect("old binding");
1946 assert_eq!(old.state, TriggerState::Terminated);
1947
1948 clear_trigger_registry();
1949 }
1950
1951 #[tokio::test(flavor = "current_thread")]
1952 async fn gc_drops_terminated_versions_beyond_retention_limit() {
1953 clear_trigger_registry();
1954
1955 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1956 .await
1957 .expect("install v1");
1958 begin_in_flight("github-new-issue", 1).expect("pin v1");
1959
1960 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1961 .await
1962 .expect("install v2");
1963 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1964 .await
1965 .expect("finish v1");
1966
1967 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1968 .await
1969 .expect("install v3");
1970
1971 let snapshots = snapshot_trigger_bindings();
1972 let versions: Vec<u32> = snapshots
1973 .into_iter()
1974 .filter(|binding| binding.id == "github-new-issue")
1975 .map(|binding| binding.version)
1976 .collect();
1977 assert_eq!(versions, vec![2, 3]);
1978
1979 clear_trigger_registry();
1980 }
1981
1982 #[tokio::test(flavor = "current_thread")]
1983 async fn lifecycle_transitions_append_to_event_log() {
1984 clear_trigger_registry();
1985 reset_active_event_log();
1986 let tempdir = tempfile::tempdir().expect("tempdir");
1987 let log = install_default_for_base_dir(tempdir.path()).expect("install event log");
1988
1989 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1990 .await
1991 .expect("manifest trigger installs");
1992 begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1993 drain("github-new-issue").await.expect("drain succeeds");
1994 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1995 .await
1996 .expect("finish event");
1997
1998 let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
1999 let events = log
2000 .read_range(&topic, None, 32)
2001 .await
2002 .expect("read lifecycle events");
2003 let states: Vec<String> = events
2004 .into_iter()
2005 .filter_map(|(_, event)| {
2006 event
2007 .payload
2008 .get("to_state")
2009 .and_then(|value| value.as_str())
2010 .map(|value| value.to_string())
2011 })
2012 .collect();
2013 assert_eq!(
2014 states,
2015 vec![
2016 "registering".to_string(),
2017 "active".to_string(),
2018 "draining".to_string(),
2019 "terminated".to_string(),
2020 ]
2021 );
2022
2023 reset_active_event_log();
2024 clear_trigger_registry();
2025 }
2026
2027 #[tokio::test(flavor = "current_thread")]
2028 async fn version_history_reuses_historical_version_after_restart() {
2029 clear_trigger_registry();
2030 reset_active_event_log();
2031 let tempdir = tempfile::tempdir().expect("tempdir");
2032 install_default_for_base_dir(tempdir.path()).expect("install event log");
2033
2034 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2035 .await
2036 .expect("initial manifest trigger installs");
2037 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2038 .await
2039 .expect("updated manifest trigger installs");
2040
2041 clear_trigger_registry();
2042 reset_active_event_log();
2043 install_default_for_base_dir(tempdir.path()).expect("reopen event log");
2044
2045 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2046 .await
2047 .expect("manifest reload reuses historical version");
2048
2049 let binding = snapshot_trigger_bindings()
2050 .into_iter()
2051 .find(|binding| binding.id == "github-new-issue")
2052 .expect("binding snapshot");
2053 assert_eq!(binding.version, 2);
2054
2055 reset_active_event_log();
2056 clear_trigger_registry();
2057 }
2058
2059 #[tokio::test(flavor = "current_thread")]
2060 async fn binding_version_as_of_reports_historical_active_version() {
2061 clear_trigger_registry();
2062 reset_active_event_log();
2063 let tempdir = tempfile::tempdir().expect("tempdir");
2064 install_default_for_base_dir(tempdir.path()).expect("install event log");
2065
2066 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2067 .await
2068 .expect("initial manifest trigger installs");
2069 let before_reload = OffsetDateTime::now_utc();
2070 std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
2071
2072 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2073 .await
2074 .expect("updated manifest trigger installs");
2075 let after_reload = OffsetDateTime::now_utc();
2076
2077 assert_eq!(
2078 binding_version_as_of("github-new-issue", before_reload)
2079 .expect("version before reload"),
2080 1
2081 );
2082 assert_eq!(
2083 binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
2084 2
2085 );
2086
2087 reset_active_event_log();
2088 clear_trigger_registry();
2089 }
2090
2091 #[tokio::test(flavor = "current_thread")]
2092 async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
2093 clear_trigger_registry();
2094 reset_active_event_log();
2095 let sink = Rc::new(CollectorSink::new());
2096 clear_event_sinks();
2097 add_event_sink(sink.clone());
2098 let tempdir = tempfile::tempdir().expect("tempdir");
2099 install_default_for_base_dir(tempdir.path()).expect("install event log");
2100
2101 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2102 .await
2103 .expect("install v1");
2104 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2105 .await
2106 .expect("install v2");
2107 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
2108 .await
2109 .expect("install v3");
2110 let received_at = OffsetDateTime::now_utc();
2111 std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
2112 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
2113 .await
2114 .expect("install v4");
2115
2116 let binding = resolve_live_or_as_of(
2117 "github-new-issue",
2118 RecordedTriggerBinding {
2119 version: 1,
2120 received_at,
2121 },
2122 )
2123 .expect("resolve fallback binding");
2124 assert_eq!(binding.version, 3);
2125
2126 let warning = sink
2127 .logs
2128 .borrow()
2129 .iter()
2130 .find(|log| log.category == "replay.binding_version_gc_fallback")
2131 .cloned()
2132 .expect("gc fallback warning");
2133 assert_eq!(warning.level, EventLevel::Warn);
2134 assert_eq!(
2135 warning.metadata.get("trigger_id"),
2136 Some(&serde_json::json!("github-new-issue"))
2137 );
2138 assert_eq!(
2139 warning.metadata.get("recorded_version"),
2140 Some(&serde_json::json!(1))
2141 );
2142 assert_eq!(
2143 warning.metadata.get("received_at"),
2144 Some(&serde_json::json!(received_at
2145 .format(&time::format_description::well_known::Rfc3339)
2146 .unwrap_or_else(|_| received_at.to_string())))
2147 );
2148 assert_eq!(
2149 warning.metadata.get("resolved_version"),
2150 Some(&serde_json::json!(3))
2151 );
2152
2153 clear_event_sinks();
2154 crate::events::reset_event_sinks();
2155 reset_active_event_log();
2156 clear_trigger_registry();
2157 }
2158}