1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7
8use time::OffsetDateTime;
9use uuid::Uuid;
10
11use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
12use crate::llm::trigger_predicate::TriggerPredicateBudget;
13use crate::secrets::{configured_default_chain, SecretProvider};
14use crate::triggers::test_util::clock;
15use crate::trust_graph::AutonomyTier;
16use crate::value::VmClosure;
17
18use super::aggregation::TriggerAggregationConfig;
19use super::dispatcher::TriggerRetryConfig;
20use super::flow_control::TriggerFlowControlConfig;
21use super::ProviderId;
22
/// Identifier of a trigger, stored as a newtype around a `String`.
///
/// Ordering, equality and hashing are all derived from the wrapped string.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct TriggerId(String);
25
26impl TriggerId {
27 pub fn new(value: impl Into<String>) -> Self {
28 Self(value.into())
29 }
30
31 pub fn as_str(&self) -> &str {
32 &self.0
33 }
34}
35
36impl std::fmt::Display for TriggerId {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 self.0.fmt(f)
39 }
40}
41
/// Lifecycle state of a trigger binding, serialized in `snake_case`.
///
/// `TriggerBinding::new` starts every binding in `Registering`; the dispatch
/// lookups in this file only return bindings that are `Active`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TriggerState {
    Registering,
    Active,
    Paused,
    Draining,
    Terminated,
}
51
52impl TriggerState {
53 pub fn as_str(self) -> &'static str {
54 match self {
55 Self::Registering => "registering",
56 Self::Active => "active",
57 Self::Paused => "paused",
58 Self::Draining => "draining",
59 Self::Terminated => "terminated",
60 }
61 }
62}
63
/// Where a trigger binding came from, serialized in `snake_case`.
///
/// `install_manifest_triggers` rejects any spec whose source is not `Manifest`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TriggerBindingSource {
    Manifest,
    Dynamic,
}
70
71impl TriggerBindingSource {
72 pub fn as_str(self) -> &'static str {
73 match self {
74 Self::Manifest => "manifest",
75 Self::Dynamic => "dynamic",
76 }
77 }
78}
79
/// Strategy selected for a binding when its budget is exhausted.
///
/// Serialized in `snake_case`; `False` is the default variant. How each
/// strategy is acted on is decided by callers outside this section of the file.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TriggerBudgetExhaustionStrategy {
    #[default]
    False,
    RetryLater,
    Fail,
    Warn,
}
89
90impl TriggerBudgetExhaustionStrategy {
91 pub fn as_str(self) -> &'static str {
92 match self {
93 Self::False => "false",
94 Self::RetryLater => "retry_later",
95 Self::Fail => "fail",
96 Self::Warn => "warn",
97 }
98 }
99}
100
/// Optional orchestrator-wide cost caps, expressed in USD.
///
/// A `None` cap is never checked by `orchestrator_budget_would_exceed`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct OrchestratorBudgetConfig {
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
}
106
/// Point-in-time copy of the orchestrator budget, as returned by
/// `snapshot_orchestrator_budget`.
///
/// The two caps are copied from the installed config; the `*_usd_micros`
/// fields are the accumulated costs for the current day/hour window, and
/// `day_utc` / `hour_utc` are the window keys those costs belong to.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct OrchestratorBudgetSnapshot {
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    pub cost_today_usd_micros: u64,
    pub cost_hour_usd_micros: u64,
    pub day_utc: i32,
    pub hour_utc: i64,
}
116
/// Mutable orchestrator budget bookkeeping held in the `ORCHESTRATOR_BUDGET`
/// thread-local.
///
/// `rollover_orchestrator_budget` zeroes a cost counter whenever its window
/// key (`day_utc` / `hour_utc`) no longer matches the current clock.
#[derive(Debug)]
struct OrchestratorBudgetState {
    config: OrchestratorBudgetConfig,
    day_utc: i32,
    hour_utc: i64,
    cost_today_usd_micros: u64,
    cost_hour_usd_micros: u64,
}
125
126impl Default for OrchestratorBudgetState {
127 fn default() -> Self {
128 Self {
129 config: OrchestratorBudgetConfig::default(),
130 day_utc: utc_day_key(),
131 hour_utc: utc_hour_key(),
132 cost_today_usd_micros: 0,
133 cost_hour_usd_micros: 0,
134 }
135 }
136}
137
/// The handler a trigger binding dispatches to.
///
/// `kind()` returns a stable label per variant. The hand-written `Debug`
/// impl prints only plain-data fields and omits the `VmClosure` payloads.
#[derive(Clone)]
pub enum TriggerHandlerSpec {
    /// Handler backed by a VM closure, alongside its `raw` string form.
    Local {
        raw: String,
        closure: Arc<VmClosure>,
    },
    /// Handler addressed by an A2A `target` string.
    A2a {
        target: String,
        allow_cleartext: bool,
    },
    /// Handler identified by a worker `queue` name.
    Worker {
        queue: String,
    },
    /// Handler bound to a persona runtime binding.
    Persona {
        binding: crate::PersonaRuntimeBinding,
    },
    /// Handler identified by a `worker_id`.
    AutoResume {
        worker_id: String,
    },
    /// Handler naming a `pool` plus a VM closure `task_factory`.
    // NOTE(review): `priority_from` / `key_from` look like selectors into the
    // event; confirm against the dispatcher.
    SpawnToPool {
        pool: String,
        priority_from: Option<String>,
        key_from: Option<String>,
        task_factory: Arc<VmClosure>,
    },
    /// Handler carrying a reminder body, tags, and delivery options for a target.
    ReminderInject {
        target: TargetExpr,
        body: String,
        tags: Vec<String>,
        ttl_turns: Option<i64>,
        dedupe_key: Option<String>,
        propagate: crate::llm::helpers::ReminderPropagate,
        role_hint: crate::llm::helpers::ReminderRoleHint,
        preserve_on_compact: bool,
    },
    /// Handler carrying an agent scope and a `reason` string.
    InterruptAndSuspend {
        target_agents: AgentScope,
        reason: String,
    },
}
202
/// Set of agents targeted by `TriggerHandlerSpec::InterruptAndSuspend`.
///
/// Either every agent, an explicit list of strings, or a VM closure.
// NOTE(review): the strings in `Concrete` are presumably agent ids, and the
// closure presumably computes the scope at dispatch time; confirm with the caller.
#[derive(Clone)]
pub enum AgentScope {
    All,
    Concrete(Vec<String>),
    Closure(Arc<VmClosure>),
}
216
217impl AgentScope {
218 pub fn kind(&self) -> &'static str {
219 match self {
220 Self::All => "all",
221 Self::Concrete(_) => "concrete",
222 Self::Closure(_) => "closure",
223 }
224 }
225}
226
227impl std::fmt::Debug for AgentScope {
228 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229 match self {
230 Self::All => f.write_str("All"),
231 Self::Concrete(ids) => f.debug_tuple("Concrete").field(ids).finish(),
232 Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
233 }
234 }
235}
236
/// Target selector used by `TriggerHandlerSpec::ReminderInject`.
///
/// One of two fixed selectors (`Current`, `Parent`), an explicit string, or
/// a VM closure.
// NOTE(review): what `Current` / `Parent` resolve to is decided outside this
// section; confirm before relying on it.
#[derive(Clone)]
pub enum TargetExpr {
    Current,
    Parent,
    Concrete(String),
    Closure(Arc<VmClosure>),
}
250
251impl TargetExpr {
252 pub fn kind(&self) -> &'static str {
253 match self {
254 Self::Current => "current",
255 Self::Parent => "parent",
256 Self::Concrete(_) => "concrete",
257 Self::Closure(_) => "closure",
258 }
259 }
260}
261
262impl std::fmt::Debug for TargetExpr {
263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264 match self {
265 Self::Current => f.write_str("Current"),
266 Self::Parent => f.write_str("Parent"),
267 Self::Concrete(id) => f.debug_tuple("Concrete").field(id).finish(),
268 Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
269 }
270 }
271}
272
273impl std::fmt::Debug for TriggerHandlerSpec {
274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275 match self {
276 Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
277 Self::A2a {
278 target,
279 allow_cleartext,
280 } => f
281 .debug_struct("A2a")
282 .field("target", target)
283 .field("allow_cleartext", allow_cleartext)
284 .finish(),
285 Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
286 Self::Persona { binding } => f
287 .debug_struct("Persona")
288 .field("name", &binding.name)
289 .finish(),
290 Self::AutoResume { worker_id } => f
291 .debug_struct("AutoResume")
292 .field("worker_id", worker_id)
293 .finish(),
294 Self::SpawnToPool {
295 pool,
296 priority_from,
297 key_from,
298 ..
299 } => f
300 .debug_struct("SpawnToPool")
301 .field("pool", pool)
302 .field("priority_from", priority_from)
303 .field("key_from", key_from)
304 .finish(),
305 Self::ReminderInject {
306 target,
307 body,
308 tags,
309 ttl_turns,
310 dedupe_key,
311 propagate,
312 role_hint,
313 preserve_on_compact,
314 } => f
315 .debug_struct("ReminderInject")
316 .field("target", target)
317 .field("body", body)
318 .field("tags", tags)
319 .field("ttl_turns", ttl_turns)
320 .field("dedupe_key", dedupe_key)
321 .field("propagate", propagate)
322 .field("role_hint", role_hint)
323 .field("preserve_on_compact", preserve_on_compact)
324 .finish(),
325 Self::InterruptAndSuspend {
326 target_agents,
327 reason,
328 } => f
329 .debug_struct("InterruptAndSuspend")
330 .field("target_agents", target_agents)
331 .field("reason", reason)
332 .finish(),
333 }
334 }
335}
336
337impl TriggerHandlerSpec {
338 pub fn kind(&self) -> &'static str {
339 match self {
340 Self::Local { .. } => "local",
341 Self::A2a { .. } => "a2a",
342 Self::Worker { .. } => "worker",
343 Self::Persona { .. } => "persona",
344 Self::AutoResume { .. } => "auto_resume",
345 Self::SpawnToPool { .. } => "spawn_to_pool",
346 Self::ReminderInject { .. } => "reminder_inject",
347 Self::InterruptAndSuspend { .. } => "interrupt_and_suspend",
348 }
349 }
350}
351
/// A trigger's `when` predicate: its `raw` string form plus a VM closure.
///
/// The hand-written `Debug` impl prints only `raw`.
#[derive(Clone)]
pub struct TriggerPredicateSpec {
    pub raw: String,
    pub closure: Arc<VmClosure>,
}
357
358impl std::fmt::Debug for TriggerPredicateSpec {
359 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360 f.debug_struct("TriggerPredicateSpec")
361 .field("raw", &self.raw)
362 .finish()
363 }
364}
365
/// Declarative description of a trigger binding, before it is registered.
///
/// `TriggerBinding::new` moves every field of a spec into the runtime
/// binding, wrapping `id` in a `TriggerId`, and adds the runtime-only state.
#[derive(Clone, Debug)]
pub struct TriggerBindingSpec {
    pub id: String,
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: ProviderId,
    pub autonomy_tier: AutonomyTier,
    pub handler: TriggerHandlerSpec,
    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
    pub when: Option<TriggerPredicateSpec>,
    pub when_budget: Option<TriggerPredicateBudget>,
    pub retry: TriggerRetryConfig,
    // Event kinds this binding accepts; `matching_bindings` treats an empty
    // list as "accept every kind".
    pub match_events: Vec<String>,
    pub dedupe_key: Option<String>,
    pub dedupe_retention_days: u32,
    pub filter: Option<String>,
    // Per-binding cost caps in USD, checked by `binding_budget_would_exceed`.
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    // Per-binding decision caps, checked by `binding_autonomy_budget_would_exceed`.
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
    pub max_concurrent: Option<u32>,
    pub flow_control: TriggerFlowControlConfig,
    pub aggregation: Option<TriggerAggregationConfig>,
    pub manifest_path: Option<PathBuf>,
    pub package_name: Option<String>,
    // Compared by `install_manifest_triggers` to decide whether a live
    // manifest binding already matches an incoming spec.
    pub definition_fingerprint: String,
}
398
/// Live counters for a single trigger binding.
///
/// The `*_today` / `*_hour` counters are zeroed by
/// `reset_binding_budget_windows` when the UTC day or hour key changes; the
/// `*_total` counters are not reset in this section of the file.
#[derive(Debug)]
pub struct TriggerMetrics {
    pub received: AtomicU64,
    pub dispatched: AtomicU64,
    pub failed: AtomicU64,
    pub dlq: AtomicU64,
    pub last_received_ms: Mutex<Option<i64>>,
    pub cost_total_usd_micros: AtomicU64,
    pub cost_today_usd_micros: AtomicU64,
    pub cost_hour_usd_micros: AtomicU64,
    pub autonomous_decisions_total: AtomicU64,
    pub autonomous_decisions_today: AtomicU64,
    pub autonomous_decisions_hour: AtomicU64,
}
413
414impl Default for TriggerMetrics {
415 fn default() -> Self {
416 Self {
417 received: AtomicU64::new(0),
418 dispatched: AtomicU64::new(0),
419 failed: AtomicU64::new(0),
420 dlq: AtomicU64::new(0),
421 last_received_ms: Mutex::new(None),
422 cost_total_usd_micros: AtomicU64::new(0),
423 cost_today_usd_micros: AtomicU64::new(0),
424 cost_hour_usd_micros: AtomicU64::new(0),
425 autonomous_decisions_total: AtomicU64::new(0),
426 autonomous_decisions_today: AtomicU64::new(0),
427 autonomous_decisions_hour: AtomicU64::new(0),
428 }
429 }
430}
431
/// Plain-value copy of a binding's counters, produced by
/// `TriggerBinding::metrics_snapshot`.
///
/// `in_flight` is read from the binding's own `in_flight` counter; every
/// other field is read from `TriggerMetrics`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TriggerMetricsSnapshot {
    pub received: u64,
    pub dispatched: u64,
    pub failed: u64,
    pub dlq: u64,
    pub in_flight: u64,
    pub last_received_ms: Option<i64>,
    pub cost_total_usd_micros: u64,
    pub cost_today_usd_micros: u64,
    pub cost_hour_usd_micros: u64,
    pub autonomous_decisions_total: u64,
    pub autonomous_decisions_today: u64,
    pub autonomous_decisions_hour: u64,
}
447
/// A registered, versioned trigger binding together with its runtime state.
///
/// The configuration fields are copied from a `TriggerBindingSpec` by
/// `TriggerBinding::new`. The fields from `state` onward are runtime-only
/// and use interior mutability (`Mutex` / atomics).
pub struct TriggerBinding {
    pub id: TriggerId,
    pub version: u32,
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: ProviderId,
    pub autonomy_tier: AutonomyTier,
    pub handler: TriggerHandlerSpec,
    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
    pub when: Option<TriggerPredicateSpec>,
    pub when_budget: Option<TriggerPredicateBudget>,
    pub retry: TriggerRetryConfig,
    pub match_events: Vec<String>,
    pub dedupe_key: Option<String>,
    pub dedupe_retention_days: u32,
    pub filter: Option<String>,
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
    pub max_concurrent: Option<u32>,
    pub flow_control: TriggerFlowControlConfig,
    pub aggregation: Option<TriggerAggregationConfig>,
    pub manifest_path: Option<PathBuf>,
    pub package_name: Option<String>,
    pub definition_fingerprint: String,
    // Runtime state below; initialised by `TriggerBinding::new`.
    // Starts as `TriggerState::Registering`.
    pub state: Mutex<TriggerState>,
    pub metrics: TriggerMetrics,
    // Starts at 0.
    pub in_flight: AtomicU64,
    // Starts as `false`.
    pub cancel_token: Arc<AtomicBool>,
    pub predicate_state: Mutex<TriggerPredicateState>,
}
484
/// Mutable per-binding bookkeeping for budgets and predicate cost tracking.
#[derive(Clone, Debug, Default)]
pub struct TriggerPredicateState {
    // Day/hour keys of the window the binding's `*_today` / `*_hour` counters
    // belong to; maintained by `reset_binding_budget_windows`.
    pub budget_day_utc: Option<i32>,
    pub budget_hour_utc: Option<i64>,
    // NOTE(review): the next two fields look like circuit-breaker state for
    // predicate evaluation, but they are not touched in this section; confirm.
    pub consecutive_failures: u32,
    pub breaker_open_until_ms: Option<i64>,
    // Recent cost samples in micro-USD; `record_predicate_cost_sample` keeps
    // at most `PREDICATE_COST_WINDOW` of them.
    pub recent_cost_usd_micros: VecDeque<u64>,
}
493
494impl std::fmt::Debug for TriggerBinding {
495 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
496 f.debug_struct("TriggerBinding")
497 .field("id", &self.id)
498 .field("version", &self.version)
499 .field("source", &self.source)
500 .field("kind", &self.kind)
501 .field("provider", &self.provider)
502 .field("handler_kind", &self.handler.kind())
503 .field("state", &self.state_snapshot())
504 .finish()
505 }
506}
507
508impl TriggerBinding {
509 pub fn snapshot(&self) -> TriggerBindingSnapshot {
510 TriggerBindingSnapshot {
511 id: self.id.as_str().to_string(),
512 version: self.version,
513 source: self.source,
514 kind: self.kind.clone(),
515 provider: self.provider.as_str().to_string(),
516 autonomy_tier: self.autonomy_tier,
517 handler_kind: self.handler.kind().to_string(),
518 state: self.state_snapshot(),
519 metrics: self.metrics_snapshot(),
520 daily_cost_usd: self.daily_cost_usd,
521 hourly_cost_usd: self.hourly_cost_usd,
522 max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
523 max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
524 on_budget_exhausted: self.on_budget_exhausted,
525 }
526 }
527
528 fn new(spec: TriggerBindingSpec, version: u32) -> Self {
529 Self {
530 id: TriggerId::new(spec.id),
531 version,
532 source: spec.source,
533 kind: spec.kind,
534 provider: spec.provider,
535 autonomy_tier: spec.autonomy_tier,
536 handler: spec.handler,
537 dispatch_priority: spec.dispatch_priority,
538 when: spec.when,
539 when_budget: spec.when_budget,
540 retry: spec.retry,
541 match_events: spec.match_events,
542 dedupe_key: spec.dedupe_key,
543 dedupe_retention_days: spec.dedupe_retention_days,
544 filter: spec.filter,
545 daily_cost_usd: spec.daily_cost_usd,
546 hourly_cost_usd: spec.hourly_cost_usd,
547 max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
548 max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
549 on_budget_exhausted: spec.on_budget_exhausted,
550 max_concurrent: spec.max_concurrent,
551 flow_control: spec.flow_control,
552 aggregation: spec.aggregation,
553 manifest_path: spec.manifest_path,
554 package_name: spec.package_name,
555 definition_fingerprint: spec.definition_fingerprint,
556 state: Mutex::new(TriggerState::Registering),
557 metrics: TriggerMetrics::default(),
558 in_flight: AtomicU64::new(0),
559 cancel_token: Arc::new(AtomicBool::new(false)),
560 predicate_state: Mutex::new(TriggerPredicateState::default()),
561 }
562 }
563
564 pub fn binding_key(&self) -> String {
565 format!("{}@v{}", self.id.as_str(), self.version)
566 }
567
568 pub fn state_snapshot(&self) -> TriggerState {
569 *self.state.lock().expect("trigger state poisoned")
570 }
571
572 pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
573 TriggerMetricsSnapshot {
574 received: self.metrics.received.load(Ordering::Relaxed),
575 dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
576 failed: self.metrics.failed.load(Ordering::Relaxed),
577 dlq: self.metrics.dlq.load(Ordering::Relaxed),
578 in_flight: self.in_flight.load(Ordering::Relaxed),
579 last_received_ms: *self
580 .metrics
581 .last_received_ms
582 .lock()
583 .expect("trigger metrics poisoned"),
584 cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
585 cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
586 cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
587 autonomous_decisions_total: self
588 .metrics
589 .autonomous_decisions_total
590 .load(Ordering::Relaxed),
591 autonomous_decisions_today: self
592 .metrics
593 .autonomous_decisions_today
594 .load(Ordering::Relaxed),
595 autonomous_decisions_hour: self
596 .metrics
597 .autonomous_decisions_hour
598 .load(Ordering::Relaxed),
599 }
600 }
601}
602
/// Serializable, point-in-time view of a `TriggerBinding`, built by
/// `TriggerBinding::snapshot`.
///
/// Identity fields are flattened to strings and the handler is reduced to
/// its `kind()` label.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBindingSnapshot {
    pub id: String,
    pub version: u32,
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: String,
    pub autonomy_tier: AutonomyTier,
    pub handler_kind: String,
    pub state: TriggerState,
    pub metrics: TriggerMetricsSnapshot,
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
}
620
/// Result category of a single trigger dispatch.
// NOTE(review): the variants presumably map onto the `dispatched` / `failed` /
// `dlq` counters in `TriggerMetrics`; the code that records them is outside
// this section, so confirm there.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TriggerDispatchOutcome {
    Dispatched,
    Failed,
    Dlq,
}
627
/// Errors returned by trigger registry operations.
///
/// `InvalidSpec` and `EventLog` carry a ready-made message that `Display`
/// prints verbatim; the other variants carry the offending id and/or version.
#[derive(Debug)]
pub enum TriggerRegistryError {
    DuplicateId(String),
    InvalidSpec(String),
    UnknownId(String),
    UnknownBindingVersion { id: String, version: u32 },
    EventLog(String),
}
636
637impl std::fmt::Display for TriggerRegistryError {
638 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
639 match self {
640 Self::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
641 Self::InvalidSpec(message) | Self::EventLog(message) => f.write_str(message),
642 Self::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
643 Self::UnknownBindingVersion { id, version } => {
644 write!(f, "unknown trigger binding '{id}' version {version}")
645 }
646 }
647 }
648}
649
// Marker impl: no variant wraps another error, so the default `source()` (None) is kept.
impl std::error::Error for TriggerRegistryError {}
651
/// Registry of trigger bindings, held in the `TRIGGER_REGISTRY` thread-local.
#[derive(Default)]
pub struct TriggerRegistry {
    // Trigger id -> the versions of that binding known to the registry.
    bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
    // Provider name -> ids of the triggers registered for that provider.
    by_provider: BTreeMap<String, BTreeSet<String>>,
    event_log: Option<Arc<AnyEventLog>>,
    secret_provider: Option<Arc<dyn SecretProvider>>,
}
659
// Trigger registry for the current thread; each thread gets its own empty
// registry on first access.
thread_local! {
    static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
}

// Orchestrator budget state for the current thread, initialised from the
// clock on first access.
thread_local! {
    static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
        RefCell::new(OrchestratorBudgetState::default());
}
668
// NOTE(review): presumably the number of terminated versions kept per trigger
// before garbage collection; the code that uses it is outside this section.
const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;

// NOTE(review): presumably the event-log topic for lifecycle transitions; confirm at the use site.
const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
// Maximum number of cost samples `record_predicate_cost_sample` keeps per binding.
const PREDICATE_COST_WINDOW: usize = 100;
673
/// Deserialized form of a binding state transition.
///
/// `definition_fingerprint` falls back to `None` when the field is absent
/// from the serialized record.
// NOTE(review): presumably read back from the lifecycle event log; the reader
// is outside this section.
#[derive(Clone, Debug, Deserialize)]
struct LifecycleStateTransitionRecord {
    id: String,
    version: u32,
    #[serde(default)]
    definition_fingerprint: Option<String>,
    to_state: TriggerState,
}
682
/// A lifecycle transition paired with a timestamp.
// NOTE(review): `occurred_at_ms` is presumably Unix epoch milliseconds; confirm at the producer.
#[derive(Clone, Debug)]
struct HistoricalLifecycleRecord {
    occurred_at_ms: i64,
    transition: LifecycleStateTransitionRecord,
}
688
/// Binding version and receive time recorded for an event.
///
/// `resolve_live_or_as_of` first looks up `version` among live bindings and
/// falls back to resolving by `received_at` when that version is unavailable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RecordedTriggerBinding {
    pub version: u32,
    pub received_at: OffsetDateTime,
}
694
/// Result of looking up binding versions in recorded history.
// NOTE(review): presumably `matching_version` is the version whose fingerprint
// matched and `max_version` is the highest version seen; the producer is
// outside this section, so confirm there.
#[derive(Clone, Copy, Debug, Default)]
struct HistoricalVersionLookup {
    matching_version: Option<u32>,
    max_version: Option<u32>,
}
700
701pub fn clear_trigger_registry() {
702 TRIGGER_REGISTRY.with(|slot| {
703 *slot.borrow_mut() = TriggerRegistry::default();
704 });
705 clear_orchestrator_budget();
706 super::aggregation::clear_aggregation_state();
707}
708
709pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
710 ORCHESTRATOR_BUDGET.with(|slot| {
711 let mut state = slot.borrow_mut();
712 rollover_orchestrator_budget(&mut state);
713 state.config = config;
714 });
715}
716
717pub fn clear_orchestrator_budget() {
718 ORCHESTRATOR_BUDGET.with(|slot| {
719 *slot.borrow_mut() = OrchestratorBudgetState::default();
720 });
721}
722
723pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
724 ORCHESTRATOR_BUDGET.with(|slot| {
725 let mut state = slot.borrow_mut();
726 rollover_orchestrator_budget(&mut state);
727 OrchestratorBudgetSnapshot {
728 daily_cost_usd: state.config.daily_cost_usd,
729 hourly_cost_usd: state.config.hourly_cost_usd,
730 cost_today_usd_micros: state.cost_today_usd_micros,
731 cost_hour_usd_micros: state.cost_hour_usd_micros,
732 day_utc: state.day_utc,
733 hour_utc: state.hour_utc,
734 }
735 })
736}
737
738pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
739 if cost_usd_micros == 0 {
740 return;
741 }
742 ORCHESTRATOR_BUDGET.with(|slot| {
743 let mut state = slot.borrow_mut();
744 rollover_orchestrator_budget(&mut state);
745 state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
746 state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
747 });
748}
749
750pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
751 ORCHESTRATOR_BUDGET.with(|slot| {
752 let mut state = slot.borrow_mut();
753 rollover_orchestrator_budget(&mut state);
754 if state.config.hourly_cost_usd.is_some_and(|limit| {
755 micros_to_usd(
756 state
757 .cost_hour_usd_micros
758 .saturating_add(expected_cost_usd_micros),
759 ) > limit
760 }) {
761 return Some("orchestrator_hourly_budget_exceeded");
762 }
763 if state.config.daily_cost_usd.is_some_and(|limit| {
764 micros_to_usd(
765 state
766 .cost_today_usd_micros
767 .saturating_add(expected_cost_usd_micros),
768 ) > limit
769 }) {
770 return Some("orchestrator_daily_budget_exceeded");
771 }
772 None
773 })
774}
775
776pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
777 let today = utc_day_key();
778 let hour = utc_hour_key();
779 let mut state = binding
780 .predicate_state
781 .lock()
782 .expect("trigger predicate state poisoned");
783 if state.budget_day_utc != Some(today) {
784 state.budget_day_utc = Some(today);
785 binding
786 .metrics
787 .cost_today_usd_micros
788 .store(0, Ordering::Relaxed);
789 binding
790 .metrics
791 .autonomous_decisions_today
792 .store(0, Ordering::Relaxed);
793 }
794 if state.budget_hour_utc != Some(hour) {
795 state.budget_hour_utc = Some(hour);
796 binding
797 .metrics
798 .cost_hour_usd_micros
799 .store(0, Ordering::Relaxed);
800 binding
801 .metrics
802 .autonomous_decisions_hour
803 .store(0, Ordering::Relaxed);
804 }
805}
806
807pub fn binding_budget_would_exceed(
808 binding: &TriggerBinding,
809 expected_cost_usd_micros: u64,
810) -> Option<&'static str> {
811 reset_binding_budget_windows(binding);
812 if binding.hourly_cost_usd.is_some_and(|limit| {
813 micros_to_usd(
814 binding
815 .metrics
816 .cost_hour_usd_micros
817 .load(Ordering::Relaxed)
818 .saturating_add(expected_cost_usd_micros),
819 ) > limit
820 }) {
821 return Some("hourly_budget_exceeded");
822 }
823 if binding.daily_cost_usd.is_some_and(|limit| {
824 micros_to_usd(
825 binding
826 .metrics
827 .cost_today_usd_micros
828 .load(Ordering::Relaxed)
829 .saturating_add(expected_cost_usd_micros),
830 ) > limit
831 }) {
832 return Some("daily_budget_exceeded");
833 }
834 None
835}
836
837pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
838 reset_binding_budget_windows(binding);
839 if binding
840 .max_autonomous_decisions_per_hour
841 .is_some_and(|limit| {
842 binding
843 .metrics
844 .autonomous_decisions_hour
845 .load(Ordering::Relaxed)
846 .saturating_add(1)
847 > limit
848 })
849 {
850 return Some("hourly_autonomy_budget_exceeded");
851 }
852 if binding
853 .max_autonomous_decisions_per_day
854 .is_some_and(|limit| {
855 binding
856 .metrics
857 .autonomous_decisions_today
858 .load(Ordering::Relaxed)
859 .saturating_add(1)
860 > limit
861 })
862 {
863 return Some("daily_autonomy_budget_exceeded");
864 }
865 None
866}
867
868pub fn note_autonomous_decision(binding: &TriggerBinding) {
869 reset_binding_budget_windows(binding);
870 binding
871 .metrics
872 .autonomous_decisions_total
873 .fetch_add(1, Ordering::Relaxed);
874 binding
875 .metrics
876 .autonomous_decisions_today
877 .fetch_add(1, Ordering::Relaxed);
878 binding
879 .metrics
880 .autonomous_decisions_hour
881 .fetch_add(1, Ordering::Relaxed);
882}
883
884pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
885 let state = binding
886 .predicate_state
887 .lock()
888 .expect("trigger predicate state poisoned");
889 if let Some(average) = average_cost_sample_micros(&state.recent_cost_usd_micros) {
890 return average;
891 }
892 binding
893 .when_budget
894 .as_ref()
895 .and_then(|budget| budget.max_cost_usd)
896 .map(usd_to_micros)
897 .unwrap_or_default()
898}
899
900pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
901 let mut state = binding
902 .predicate_state
903 .lock()
904 .expect("trigger predicate state poisoned");
905 state.recent_cost_usd_micros.push_back(cost_usd_micros);
906 while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
907 state.recent_cost_usd_micros.pop_front();
908 }
909}
910
/// Returns the integer mean (rounded down) of `samples`, or `None` when empty.
///
/// The sum is accumulated in `u128`, so it cannot overflow for any number of
/// `u64` samples.
fn average_cost_sample_micros(samples: &VecDeque<u64>) -> Option<u64> {
    match samples.len() {
        0 => None,
        count => {
            let sum = samples
                .iter()
                .fold(0u128, |running, &sample| running + u128::from(sample));
            // The mean of `u64` values always fits back into a `u64`.
            Some((sum / count as u128) as u64)
        }
    }
}
918
/// Converts a USD amount to micro-USD, rounding up.
///
/// Non-finite, zero and negative inputs all map to 0.
pub fn usd_to_micros(value: f64) -> u64 {
    let usable = value.is_finite() && value > 0.0;
    if usable {
        let micros = value * 1_000_000.0;
        micros.ceil() as u64
    } else {
        0
    }
}
925
/// Converts a micro-USD amount to USD as a floating-point value.
pub fn micros_to_usd(value: u64) -> f64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    let micros = value as f64;
    micros / MICROS_PER_USD
}
929
930fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
931 let today = utc_day_key();
932 let hour = utc_hour_key();
933 if state.day_utc != today {
934 state.day_utc = today;
935 state.cost_today_usd_micros = 0;
936 }
937 if state.hour_utc != hour {
938 state.hour_utc = hour;
939 state.cost_hour_usd_micros = 0;
940 }
941}
942
943fn utc_day_key() -> i32 {
944 (clock::now_utc().date()
945 - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
946 .whole_days() as i32
947}
948
949fn utc_hour_key() -> i64 {
950 clock::now_utc().unix_timestamp() / 3_600
951}
952
953pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
954 TRIGGER_REGISTRY.with(|slot| {
955 let registry = slot.borrow();
956 let mut snapshots = Vec::new();
957 for bindings in registry.bindings.values() {
958 for binding in bindings {
959 snapshots.push(binding.snapshot());
960 }
961 }
962 snapshots.sort_by(|left, right| {
963 left.id
964 .cmp(&right.id)
965 .then(left.version.cmp(&right.version))
966 .then(left.state.as_str().cmp(right.state.as_str()))
967 });
968 snapshots
969 })
970}
971
972#[allow(clippy::arc_with_non_send_sync)]
973pub fn resolve_trigger_binding_as_of(
974 id: &str,
975 as_of: OffsetDateTime,
976) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
977 let version = binding_version_as_of(id, as_of)?;
978 resolve_trigger_binding_version(id, version)
979}
980
981#[allow(clippy::arc_with_non_send_sync)]
982pub fn resolve_live_or_as_of(
983 id: &str,
984 recorded: RecordedTriggerBinding,
985) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
986 match resolve_live_trigger_binding(id, Some(recorded.version)) {
987 Ok(binding) => Ok(binding),
988 Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
989 let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
990 let mut metadata = BTreeMap::new();
991 metadata.insert("trigger_id".to_string(), serde_json::json!(id));
992 metadata.insert(
993 "recorded_version".to_string(),
994 serde_json::json!(recorded.version),
995 );
996 metadata.insert(
997 "received_at".to_string(),
998 serde_json::json!(recorded
999 .received_at
1000 .format(&time::format_description::well_known::Rfc3339)
1001 .unwrap_or_else(|_| recorded.received_at.to_string())),
1002 );
1003 metadata.insert(
1004 "resolved_version".to_string(),
1005 serde_json::json!(binding.version),
1006 );
1007 crate::events::log_warn_meta(
1008 "replay.binding_version_gc_fallback",
1009 "trigger replay fell back to lifecycle history after binding version GC",
1010 metadata,
1011 );
1012 Ok(binding)
1013 }
1014 Err(error) => Err(error),
1015 }
1016}
1017
1018pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
1019 TRIGGER_REGISTRY.with(|slot| {
1020 let registry = slot.borrow();
1021 registry.binding_version_as_of(id, as_of)
1022 })
1023}
1024
1025#[allow(clippy::arc_with_non_send_sync)]
1026fn resolve_trigger_binding_version(
1027 id: &str,
1028 version: u32,
1029) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1030 TRIGGER_REGISTRY.with(|slot| {
1031 let registry = slot.borrow();
1032 registry
1033 .binding(id, version)
1034 .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
1035 id: id.to_string(),
1036 version,
1037 })
1038 })
1039}
1040
1041#[allow(clippy::arc_with_non_send_sync)]
1042pub fn resolve_live_trigger_binding(
1043 id: &str,
1044 version: Option<u32>,
1045) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1046 TRIGGER_REGISTRY.with(|slot| {
1047 let registry = slot.borrow();
1048 if let Some(version) = version {
1049 let binding = registry.binding(id, version).ok_or_else(|| {
1050 TriggerRegistryError::UnknownBindingVersion {
1051 id: id.to_string(),
1052 version,
1053 }
1054 })?;
1055 if binding.state_snapshot() == TriggerState::Terminated {
1056 return Err(TriggerRegistryError::UnknownBindingVersion {
1057 id: id.to_string(),
1058 version,
1059 });
1060 }
1061 return Ok(binding);
1062 }
1063
1064 registry
1065 .live_bindings_any_source(id)
1066 .into_iter()
1067 .max_by_key(|binding| binding.version)
1068 .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
1069 })
1070}
1071
1072pub(crate) fn channel_bindings_matching(
1078 scope: &str,
1079 scope_id: &str,
1080 name: &str,
1081) -> Vec<Arc<TriggerBinding>> {
1082 TRIGGER_REGISTRY.with(|slot| {
1083 let registry = slot.borrow();
1084 let Some(binding_ids) = registry.by_provider.get("channel") else {
1085 return Vec::new();
1086 };
1087 let mut bindings = Vec::new();
1088 for id in binding_ids {
1089 let Some(versions) = registry.bindings.get(id) else {
1090 continue;
1091 };
1092 for binding in versions {
1093 if binding.state_snapshot() != TriggerState::Active {
1094 continue;
1095 }
1096 let Some(selector_raw) = binding.match_events.first() else {
1097 continue;
1098 };
1099 let Ok(selector) = crate::channels::ChannelSelector::parse(selector_raw) else {
1100 continue;
1101 };
1102 if !selector.matches(scope, scope_id, name, scope_id) {
1106 continue;
1107 }
1108 bindings.push(binding.clone());
1109 }
1110 }
1111 bindings.sort_by(|left, right| {
1112 left.id
1113 .as_str()
1114 .cmp(right.id.as_str())
1115 .then(left.version.cmp(&right.version))
1116 });
1117 bindings
1118 })
1119}
1120
1121pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
1122 TRIGGER_REGISTRY.with(|slot| {
1123 let registry = slot.borrow();
1124 let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
1125 return Vec::new();
1126 };
1127
1128 let mut bindings = Vec::new();
1129 for id in binding_ids {
1130 let Some(versions) = registry.bindings.get(id) else {
1131 continue;
1132 };
1133 for binding in versions {
1134 if binding.state_snapshot() != TriggerState::Active {
1135 continue;
1136 }
1137 if !binding.match_events.is_empty()
1138 && !binding.match_events.iter().any(|kind| kind == &event.kind)
1139 {
1140 continue;
1141 }
1142 bindings.push(binding.clone());
1143 }
1144 }
1145
1146 bindings.sort_by(|left, right| {
1147 left.id
1148 .as_str()
1149 .cmp(right.id.as_str())
1150 .then(left.version.cmp(&right.version))
1151 });
1152 bindings
1153 })
1154}
1155
1156pub async fn install_manifest_triggers(
1157 specs: Vec<TriggerBindingSpec>,
1158) -> Result<(), TriggerRegistryError> {
1159 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1160 let registry = &mut *slot.borrow_mut();
1161 registry.refresh_runtime_context();
1162 let mut touched_ids = BTreeSet::new();
1163
1164 let mut incoming = BTreeMap::new();
1165 for spec in specs {
1166 let spec_id = spec.id.clone();
1167 if spec.source != TriggerBindingSource::Manifest {
1168 return Err(TriggerRegistryError::InvalidSpec(format!(
1169 "manifest install received non-manifest trigger '{spec_id}'"
1170 )));
1171 }
1172 if spec_id.trim().is_empty() {
1173 return Err(TriggerRegistryError::InvalidSpec(
1174 "manifest trigger id cannot be empty".to_string(),
1175 ));
1176 }
1177 if incoming.insert(spec_id.clone(), spec).is_some() {
1178 return Err(TriggerRegistryError::DuplicateId(spec_id));
1179 }
1180 }
1181
1182 let mut lifecycle = Vec::new();
1183 let existing_ids: Vec<String> = registry
1184 .bindings
1185 .iter()
1186 .filter(|(_, bindings)| {
1187 bindings.iter().any(|binding| {
1188 binding.source == TriggerBindingSource::Manifest
1189 && binding.state_snapshot() != TriggerState::Terminated
1190 })
1191 })
1192 .map(|(id, _)| id.clone())
1193 .collect();
1194
1195 for id in existing_ids {
1196 let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
1197 let Some(spec) = incoming.remove(&id) else {
1198 for binding in live_manifest {
1199 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1200 }
1201 touched_ids.insert(id.clone());
1202 continue;
1203 };
1204
1205 let has_matching_active = live_manifest.iter().any(|binding| {
1206 binding.definition_fingerprint == spec.definition_fingerprint
1207 && matches!(
1208 binding.state_snapshot(),
1209 TriggerState::Registering | TriggerState::Active
1210 )
1211 });
1212 if has_matching_active {
1213 continue;
1214 }
1215
1216 for binding in live_manifest {
1217 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1218 }
1219
1220 let version = registry.next_version_for_spec(&spec);
1221 registry.register_binding(spec, version, &mut lifecycle);
1222 touched_ids.insert(id.clone());
1223 }
1224
1225 for spec in incoming.into_values() {
1226 touched_ids.insert(spec.id.clone());
1227 let version = registry.next_version_for_spec(&spec);
1228 registry.register_binding(spec, version, &mut lifecycle);
1229 }
1230
1231 for id in touched_ids {
1232 registry.gc_terminated_versions(&id);
1233 }
1234
1235 Ok((registry.event_log.clone(), lifecycle))
1236 })?;
1237
1238 append_lifecycle_events(event_log, events).await
1239}
1240
1241pub async fn dynamic_register(
1242 mut spec: TriggerBindingSpec,
1243) -> Result<TriggerId, TriggerRegistryError> {
1244 if spec.id.trim().is_empty() {
1245 spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1246 }
1247 spec.source = TriggerBindingSource::Dynamic;
1248 let id = spec.id.clone();
1249 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1250 let registry = &mut *slot.borrow_mut();
1251 registry.refresh_runtime_context();
1252
1253 if registry.bindings.contains_key(id.as_str()) {
1254 return Err(TriggerRegistryError::DuplicateId(id.clone()));
1255 }
1256
1257 let mut lifecycle = Vec::new();
1258 let version = registry.next_version_for_spec(&spec);
1259 registry.register_binding(spec, version, &mut lifecycle);
1260 Ok((registry.event_log.clone(), lifecycle))
1261 })?;
1262
1263 append_lifecycle_events(event_log, events).await?;
1264 Ok(TriggerId::new(id))
1265}
1266
1267pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1268 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1269 let registry = &mut *slot.borrow_mut();
1270 let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1271 if live_dynamic.is_empty() {
1272 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1273 }
1274
1275 let mut lifecycle = Vec::new();
1276 for binding in live_dynamic {
1277 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1278 }
1279 Ok((registry.event_log.clone(), lifecycle))
1280 })?;
1281
1282 append_lifecycle_events(event_log, events).await
1283}
1284
1285pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1286 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1287 let registry = &mut *slot.borrow_mut();
1288 let live = registry.live_bindings_any_source(id);
1289 if live.is_empty() {
1290 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1291 }
1292
1293 let mut lifecycle = Vec::new();
1294 for binding in live {
1295 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1296 }
1297 Ok((registry.event_log.clone(), lifecycle))
1298 })?;
1299
1300 append_lifecycle_events(event_log, events).await
1301}
1302
1303pub async fn pause(id: &str) -> Result<(), TriggerRegistryError> {
1304 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1305 let registry = &mut *slot.borrow_mut();
1306 let live = registry.live_bindings_any_source(id);
1307 if live.is_empty() {
1308 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1309 }
1310
1311 let mut lifecycle = Vec::new();
1312 for binding in live {
1313 match binding.state_snapshot() {
1314 TriggerState::Registering | TriggerState::Active => {
1315 registry.transition_binding_state(
1316 &binding,
1317 TriggerState::Paused,
1318 &mut lifecycle,
1319 );
1320 }
1321 TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {}
1322 }
1323 }
1324 Ok((registry.event_log.clone(), lifecycle))
1325 })?;
1326
1327 append_lifecycle_events(event_log, events).await
1328}
1329
1330pub async fn resume(id: &str) -> Result<(), TriggerRegistryError> {
1331 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1332 let registry = &mut *slot.borrow_mut();
1333 let live = registry.live_bindings_any_source(id);
1334 if live.is_empty() {
1335 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1336 }
1337
1338 let mut lifecycle = Vec::new();
1339 for binding in live {
1340 if binding.state_snapshot() == TriggerState::Paused {
1341 registry.transition_binding_state(&binding, TriggerState::Active, &mut lifecycle);
1342 }
1343 }
1344 Ok((registry.event_log.clone(), lifecycle))
1345 })?;
1346
1347 append_lifecycle_events(event_log, events).await
1348}
1349
1350fn pin_trigger_binding_inner(
1351 id: &str,
1352 version: u32,
1353 allow_terminated: bool,
1354) -> Result<(), TriggerRegistryError> {
1355 TRIGGER_REGISTRY.with(|slot| {
1356 let registry = slot.borrow();
1357 let binding = registry.binding(id, version).ok_or_else(|| {
1358 TriggerRegistryError::UnknownBindingVersion {
1359 id: id.to_string(),
1360 version,
1361 }
1362 })?;
1363 match binding.state_snapshot() {
1364 TriggerState::Paused => Err(TriggerRegistryError::InvalidSpec(format!(
1365 "trigger binding '{id}' version {version} is paused"
1366 ))),
1367 TriggerState::Terminated if !allow_terminated => {
1368 Err(TriggerRegistryError::InvalidSpec(format!(
1369 "trigger binding '{id}' version {version} is terminated"
1370 )))
1371 }
1372 _ => {
1373 binding.in_flight.fetch_add(1, Ordering::Relaxed);
1374 Ok(())
1375 }
1376 }
1377 })
1378}
1379
/// Pins binding `id`/`version` by incrementing its in-flight counter.
///
/// Delegates to `pin_trigger_binding_inner` with `allow_terminated = false`,
/// so paused and terminated bindings are rejected with `InvalidSpec` and an
/// unknown id/version yields `UnknownBindingVersion`.
pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
    pin_trigger_binding_inner(id, version, false)
}
1383
1384pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1385 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1386 let registry = &mut *slot.borrow_mut();
1387 let binding = registry.binding(id, version).ok_or_else(|| {
1388 TriggerRegistryError::UnknownBindingVersion {
1389 id: id.to_string(),
1390 version,
1391 }
1392 })?;
1393 let current = binding.in_flight.load(Ordering::Relaxed);
1394 if current == 0 {
1395 return Err(TriggerRegistryError::InvalidSpec(format!(
1396 "trigger binding '{id}' version {version} has no in-flight events"
1397 )));
1398 }
1399 binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1400
1401 let mut lifecycle = Vec::new();
1402 registry.maybe_finalize_draining(&binding, &mut lifecycle);
1403 registry.gc_terminated_versions(binding.id.as_str());
1404 Ok((registry.event_log.clone(), lifecycle))
1405 })?;
1406
1407 append_lifecycle_events(event_log, events).await
1408}
1409
/// Marks the start of an event dispatch on binding `id`/`version`.
///
/// Delegates to `begin_in_flight_inner` with `allow_terminated = false`, which
/// pins the binding and updates its received metrics.
pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
    begin_in_flight_inner(id, version, false)
}
1413
/// Replay variant of `begin_in_flight`.
///
/// Delegates to `begin_in_flight_inner` with `allow_terminated = true`, so a
/// terminated binding may still be pinned; paused bindings are still rejected.
pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
    begin_in_flight_inner(id, version, true)
}
1417
1418fn begin_in_flight_inner(
1419 id: &str,
1420 version: u32,
1421 allow_terminated: bool,
1422) -> Result<(), TriggerRegistryError> {
1423 pin_trigger_binding_inner(id, version, allow_terminated)?;
1424 TRIGGER_REGISTRY.with(|slot| {
1425 let registry = slot.borrow();
1426 let binding = registry.binding(id, version).ok_or_else(|| {
1427 TriggerRegistryError::UnknownBindingVersion {
1428 id: id.to_string(),
1429 version,
1430 }
1431 })?;
1432 binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1433 *binding
1434 .metrics
1435 .last_received_ms
1436 .lock()
1437 .expect("trigger metrics poisoned") = Some(now_ms());
1438 Ok(())
1439 })
1440}
1441
1442pub async fn finish_in_flight(
1443 id: &str,
1444 version: u32,
1445 outcome: TriggerDispatchOutcome,
1446) -> Result<(), TriggerRegistryError> {
1447 TRIGGER_REGISTRY.with(|slot| {
1448 let registry = &mut *slot.borrow_mut();
1449 let binding = registry.binding(id, version).ok_or_else(|| {
1450 TriggerRegistryError::UnknownBindingVersion {
1451 id: id.to_string(),
1452 version,
1453 }
1454 })?;
1455 let current = binding.in_flight.load(Ordering::Relaxed);
1456 if current == 0 {
1457 return Err(TriggerRegistryError::InvalidSpec(format!(
1458 "trigger binding '{id}' version {version} has no in-flight events"
1459 )));
1460 }
1461 match outcome {
1462 TriggerDispatchOutcome::Dispatched => {
1463 binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1464 }
1465 TriggerDispatchOutcome::Failed => {
1466 binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1467 }
1468 TriggerDispatchOutcome::Dlq => {
1469 binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1470 }
1471 }
1472 Ok(())
1473 })?;
1474
1475 unpin_trigger_binding(id, version).await
1476}
1477
1478impl TriggerRegistry {
1479 fn refresh_runtime_context(&mut self) {
1480 if self.event_log.is_none() {
1481 self.event_log = active_event_log();
1482 }
1483 if self.secret_provider.is_none() {
1484 self.secret_provider = default_secret_provider();
1485 }
1486 }
1487
1488 fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
1489 self.bindings
1490 .get(id)
1491 .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
1492 .cloned()
1493 }
1494
1495 fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
1496 self.bindings
1497 .get(id)
1498 .into_iter()
1499 .flat_map(|bindings| bindings.iter())
1500 .filter(|binding| {
1501 binding.source == source && binding.state_snapshot() != TriggerState::Terminated
1502 })
1503 .cloned()
1504 .collect()
1505 }
1506
1507 fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
1508 self.bindings
1509 .get(id)
1510 .into_iter()
1511 .flat_map(|bindings| bindings.iter())
1512 .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
1513 .cloned()
1514 .collect()
1515 }
1516
1517 fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
1518 if let Some(version) = self
1519 .bindings
1520 .get(spec.id.as_str())
1521 .into_iter()
1522 .flat_map(|bindings| bindings.iter())
1523 .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
1524 .map(|binding| binding.version)
1525 {
1526 return version;
1527 }
1528
1529 let historical =
1530 self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
1531 if let Some(version) = historical.matching_version {
1532 return version;
1533 }
1534
1535 self.bindings
1536 .get(spec.id.as_str())
1537 .into_iter()
1538 .flat_map(|bindings| bindings.iter())
1539 .map(|binding| binding.version)
1540 .chain(historical.max_version)
1541 .max()
1542 .unwrap_or(0)
1543 + 1
1544 }
1545
1546 fn gc_terminated_versions(&mut self, id: &str) {
1547 let Some(bindings) = self.bindings.get_mut(id) else {
1548 return;
1549 };
1550
1551 let mut newest_versions: Vec<u32> =
1552 bindings.iter().map(|binding| binding.version).collect();
1553 newest_versions.sort_unstable_by(|left, right| right.cmp(left));
1554 newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
1555 let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();
1556
1557 bindings.retain(|binding| {
1558 binding.state_snapshot() != TriggerState::Terminated
1559 || retained_versions.contains(&binding.version)
1560 });
1561
1562 if bindings.is_empty() {
1563 self.bindings.remove(id);
1564 }
1565 }
1566
1567 fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
1568 let mut lookup = HistoricalVersionLookup::default();
1569 for record in self.lifecycle_records_for(id) {
1570 lookup.max_version = Some(
1571 lookup
1572 .max_version
1573 .unwrap_or(0)
1574 .max(record.transition.version),
1575 );
1576 if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
1577 lookup.matching_version = Some(record.transition.version);
1578 }
1579 }
1580 lookup
1581 }
1582
1583 fn binding_version_as_of(
1584 &self,
1585 id: &str,
1586 as_of: OffsetDateTime,
1587 ) -> Result<u32, TriggerRegistryError> {
1588 let cutoff_ms = harn_clock::offset_datetime_to_ms(as_of);
1589 let mut active_version = None;
1590 for record in self.lifecycle_records_for(id) {
1591 if record.occurred_at_ms > cutoff_ms {
1592 break;
1593 }
1594 match record.transition.to_state {
1595 TriggerState::Active => active_version = Some(record.transition.version),
1596 TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {
1597 if active_version == Some(record.transition.version) {
1598 active_version = None;
1599 }
1600 }
1601 TriggerState::Registering => {}
1602 }
1603 }
1604
1605 active_version.ok_or_else(|| {
1606 TriggerRegistryError::InvalidSpec(format!(
1607 "no active trigger binding '{}' at {}",
1608 id,
1609 as_of
1610 .format(&time::format_description::well_known::Rfc3339)
1611 .unwrap_or_else(|_| as_of.to_string())
1612 ))
1613 })
1614 }
1615
1616 fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
1617 let Some(event_log) = self.event_log.as_ref() else {
1618 return Vec::new();
1619 };
1620 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1621 .expect("static triggers.lifecycle topic should always be valid");
1622 futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
1623 .unwrap_or_default()
1624 .into_iter()
1625 .filter_map(|(_, event)| {
1626 let occurred_at_ms = event.occurred_at_ms;
1627 let transition: LifecycleStateTransitionRecord =
1628 serde_json::from_value(event.payload).ok()?;
1629 (transition.id == id).then_some(HistoricalLifecycleRecord {
1630 occurred_at_ms,
1631 transition,
1632 })
1633 })
1634 .collect()
1635 }
1636
1637 #[allow(clippy::arc_with_non_send_sync)]
1638 fn register_binding(
1639 &mut self,
1640 spec: TriggerBindingSpec,
1641 version: u32,
1642 lifecycle: &mut Vec<LogEvent>,
1643 ) -> Arc<TriggerBinding> {
1644 let binding = Arc::new(TriggerBinding::new(spec, version));
1645 self.by_provider
1646 .entry(binding.provider.as_str().to_string())
1647 .or_default()
1648 .insert(binding.id.as_str().to_string());
1649 self.bindings
1650 .entry(binding.id.as_str().to_string())
1651 .or_default()
1652 .push(binding.clone());
1653 lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
1654 self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
1655 binding
1656 }
1657
1658 fn transition_binding_to_draining(
1659 &self,
1660 binding: &Arc<TriggerBinding>,
1661 lifecycle: &mut Vec<LogEvent>,
1662 ) {
1663 if matches!(binding.state_snapshot(), TriggerState::Terminated) {
1664 return;
1665 }
1666 self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
1667 self.maybe_finalize_draining(binding, lifecycle);
1668 }
1669
1670 fn maybe_finalize_draining(
1671 &self,
1672 binding: &Arc<TriggerBinding>,
1673 lifecycle: &mut Vec<LogEvent>,
1674 ) {
1675 if binding.state_snapshot() == TriggerState::Draining
1676 && binding.in_flight.load(Ordering::Relaxed) == 0
1677 {
1678 self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
1679 }
1680 }
1681
1682 fn transition_binding_state(
1683 &self,
1684 binding: &Arc<TriggerBinding>,
1685 next: TriggerState,
1686 lifecycle: &mut Vec<LogEvent>,
1687 ) {
1688 let mut state = binding.state.lock().expect("trigger state poisoned");
1689 let previous = *state;
1690 if previous == next {
1691 return;
1692 }
1693 *state = next;
1694 drop(state);
1695 if next == TriggerState::Terminated && binding.aggregation.is_some() {
1701 let _ = super::aggregation::drop_binding_aggregation(&binding.binding_key());
1702 }
1703 lifecycle.push(lifecycle_event(binding, Some(previous), next));
1704 }
1705}
1706
1707fn lifecycle_event(
1708 binding: &TriggerBinding,
1709 from_state: Option<TriggerState>,
1710 to_state: TriggerState,
1711) -> LogEvent {
1712 LogEvent::new(
1713 "state_transition",
1714 serde_json::json!({
1715 "id": binding.id.as_str(),
1716 "binding_key": binding.binding_key(),
1717 "version": binding.version,
1718 "provider": binding.provider.as_str(),
1719 "kind": &binding.kind,
1720 "source": binding.source.as_str(),
1721 "handler_kind": binding.handler.kind(),
1722 "definition_fingerprint": &binding.definition_fingerprint,
1723 "from_state": from_state.map(TriggerState::as_str),
1724 "to_state": to_state.as_str(),
1725 }),
1726 )
1727}
1728
1729async fn append_lifecycle_events(
1730 event_log: Option<Arc<AnyEventLog>>,
1731 events: Vec<LogEvent>,
1732) -> Result<(), TriggerRegistryError> {
1733 let Some(event_log) = event_log else {
1734 return Ok(());
1735 };
1736 if events.is_empty() {
1737 return Ok(());
1738 }
1739
1740 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1741 .expect("static triggers.lifecycle topic should always be valid");
1742 for event in events {
1743 event_log
1744 .append(&topic, event)
1745 .await
1746 .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1747 }
1748 Ok(())
1749}
1750
1751fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1752 configured_default_chain(default_secret_namespace())
1753 .ok()
1754 .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1755}
1756
/// Resolves the secret namespace used when none is configured explicitly.
///
/// A non-blank `HARN_SECRET_NAMESPACE` environment variable wins and is
/// returned unchanged. Otherwise the namespace is `harn/<leaf>`, where
/// `<leaf>` is the final component of the current working directory, or
/// `workspace` when that component is missing, empty or not valid UTF-8.
fn default_secret_namespace() -> String {
    let from_env = std::env::var("HARN_SECRET_NAMESPACE")
        .ok()
        .filter(|value| !value.trim().is_empty());
    if let Some(namespace) = from_env {
        return namespace;
    }

    let working_dir = std::env::current_dir().unwrap_or_default();
    let leaf = match working_dir.file_name().and_then(|name| name.to_str()) {
        Some(name) if !name.is_empty() => name,
        _ => "workspace",
    };
    format!("harn/{leaf}")
}
1772
/// Returns the current time in milliseconds as reported by the shared `clock`
/// helper, so every timestamp in this module comes from the same source.
fn now_ms() -> i64 {
    clock::now_ms()
}
1776
// Unit tests for the trigger registry lifecycle: registration, draining,
// hot reload, garbage collection and event-log backed version history.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
    use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
    use std::rc::Rc;

    /// Builds a manifest-sourced webhook spec for the `github` provider with
    /// the given id and definition fingerprint.
    fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
        TriggerBindingSpec {
            id: id.to_string(),
            source: TriggerBindingSource::Manifest,
            kind: "webhook".to_string(),
            provider: ProviderId::from("github"),
            autonomy_tier: crate::AutonomyTier::ActAuto,
            handler: TriggerHandlerSpec::Worker {
                queue: format!("{id}-queue"),
            },
            dispatch_priority: crate::WorkerQueuePriority::Normal,
            when: None,
            when_budget: None,
            retry: TriggerRetryConfig::default(),
            match_events: vec!["issues.opened".to_string()],
            dedupe_key: Some("event.dedupe_key".to_string()),
            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
            filter: Some("event.kind".to_string()),
            daily_cost_usd: Some(5.0),
            hourly_cost_usd: None,
            max_autonomous_decisions_per_hour: None,
            max_autonomous_decisions_per_day: None,
            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
            max_concurrent: Some(10),
            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
            aggregation: None,
            manifest_path: None,
            package_name: Some("workspace".to_string()),
            definition_fingerprint: fingerprint.to_string(),
        }
    }

    /// Builds a dynamically sourced webhook spec for the `github` provider;
    /// the fingerprint is derived from the id.
    fn dynamic_spec(id: &str) -> TriggerBindingSpec {
        TriggerBindingSpec {
            id: id.to_string(),
            source: TriggerBindingSource::Dynamic,
            kind: "webhook".to_string(),
            provider: ProviderId::from("github"),
            autonomy_tier: crate::AutonomyTier::ActAuto,
            handler: TriggerHandlerSpec::Worker {
                queue: format!("{id}-queue"),
            },
            dispatch_priority: crate::WorkerQueuePriority::Normal,
            when: None,
            when_budget: None,
            retry: TriggerRetryConfig::default(),
            match_events: vec!["issues.opened".to_string()],
            dedupe_key: None,
            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
            filter: None,
            daily_cost_usd: None,
            hourly_cost_usd: None,
            max_autonomous_decisions_per_hour: None,
            max_autonomous_decisions_per_day: None,
            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
            max_concurrent: None,
            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
            aggregation: None,
            manifest_path: None,
            package_name: None,
            definition_fingerprint: format!("dynamic:{id}"),
        }
    }

    /// A freshly installed manifest trigger is version 1, `Active`, and has
    /// default (zeroed) metrics.
    #[tokio::test(flavor = "current_thread")]
    async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("manifest trigger installs");

        let snapshots = snapshot_trigger_bindings();
        assert_eq!(snapshots.len(), 1);
        let binding = &snapshots[0];
        assert_eq!(binding.id, "github-new-issue");
        assert_eq!(binding.version, 1);
        assert_eq!(binding.state, TriggerState::Active);
        assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());

        clear_trigger_registry();
    }

    /// Distinct dynamic ids register independently; re-registering an id that
    /// is already present fails with `DuplicateId`.
    #[tokio::test(flavor = "current_thread")]
    async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
        clear_trigger_registry();

        let first = dynamic_register(dynamic_spec("dynamic-a"))
            .await
            .expect("first dynamic trigger");
        let second = dynamic_register(dynamic_spec("dynamic-b"))
            .await
            .expect("second dynamic trigger");
        assert_ne!(first, second);

        let error = dynamic_register(dynamic_spec("dynamic-a"))
            .await
            .expect_err("duplicate id should fail");
        assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));

        clear_trigger_registry();
    }

    /// Two `u64::MAX` cost samples must still average to `u64::MAX` instead of
    /// overflowing.
    #[test]
    fn expected_predicate_cost_average_does_not_overflow() {
        let binding = TriggerBinding::new(manifest_spec("costed", "v1"), 1);
        record_predicate_cost_sample(&binding, u64::MAX);
        record_predicate_cost_sample(&binding, u64::MAX);

        assert_eq!(expected_predicate_cost_usd_micros(&binding), u64::MAX);
    }

    /// Draining a binding with an in-flight event keeps it in `Draining` until
    /// the event finishes, after which it becomes `Terminated`.
    #[tokio::test(flavor = "current_thread")]
    async fn drain_waits_for_in_flight_events_before_terminating() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("manifest trigger installs");
        begin_in_flight("github-new-issue", 1).expect("start in-flight event");

        drain("github-new-issue").await.expect("drain succeeds");
        let binding = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("binding snapshot");
        assert_eq!(binding.state, TriggerState::Draining);
        assert_eq!(binding.metrics.in_flight, 1);

        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish in-flight event");
        let binding = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("binding snapshot");
        assert_eq!(binding.state, TriggerState::Terminated);
        assert_eq!(binding.metrics.in_flight, 0);

        clear_trigger_registry();
    }

    /// Installing a changed definition registers version 2 as `Active` while
    /// version 1, which still has an in-flight event, drains and then
    /// terminates once that event finishes.
    #[tokio::test(flavor = "current_thread")]
    async fn hot_reload_registers_new_version_while_old_binding_drains() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("initial manifest trigger installs");
        begin_in_flight("github-new-issue", 1).expect("start in-flight event");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("updated manifest trigger installs");

        let snapshots = snapshot_trigger_bindings();
        assert_eq!(snapshots.len(), 2);
        let old = snapshots
            .iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("old binding");
        let new = snapshots
            .iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
            .expect("new binding");
        assert_eq!(old.state, TriggerState::Draining);
        assert_eq!(new.state, TriggerState::Active);

        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish old in-flight event");
        let old = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("old binding");
        assert_eq!(old.state, TriggerState::Terminated);

        clear_trigger_registry();
    }

    /// After three successive definitions the terminated version 1 must have
    /// been garbage-collected, leaving only versions 2 and 3.
    #[tokio::test(flavor = "current_thread")]
    async fn gc_drops_terminated_versions_beyond_retention_limit() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("install v1");
        begin_in_flight("github-new-issue", 1).expect("pin v1");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("install v2");
        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish v1");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
            .await
            .expect("install v3");

        let snapshots = snapshot_trigger_bindings();
        let versions: Vec<u32> = snapshots
            .into_iter()
            .filter(|binding| binding.id == "github-new-issue")
            .map(|binding| binding.version)
            .collect();
        assert_eq!(versions, vec![2, 3]);

        clear_trigger_registry();
    }

    /// A full register → drain → finish cycle writes the four expected state
    /// transitions to the `triggers.lifecycle` topic, in order.
    #[tokio::test(flavor = "current_thread")]
    async fn lifecycle_transitions_append_to_event_log() {
        clear_trigger_registry();
        reset_active_event_log();
        let tempdir = tempfile::tempdir().expect("tempdir");
        let log = install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("manifest trigger installs");
        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
        drain("github-new-issue").await.expect("drain succeeds");
        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish event");

        let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
        let events = log
            .read_range(&topic, None, 32)
            .await
            .expect("read lifecycle events");
        let states: Vec<String> = events
            .into_iter()
            .filter_map(|(_, event)| {
                event
                    .payload
                    .get("to_state")
                    .and_then(|value| value.as_str())
                    .map(|value| value.to_string())
            })
            .collect();
        assert_eq!(
            states,
            vec![
                "registering".to_string(),
                "active".to_string(),
                "draining".to_string(),
                "terminated".to_string(),
            ]
        );

        reset_active_event_log();
        clear_trigger_registry();
    }

    /// After the in-memory registry is cleared and the event log reopened,
    /// re-installing a known fingerprint gets its historical version (2) back.
    #[tokio::test(flavor = "current_thread")]
    async fn version_history_reuses_historical_version_after_restart() {
        clear_trigger_registry();
        reset_active_event_log();
        let tempdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("initial manifest trigger installs");
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("updated manifest trigger installs");

        // Simulate a restart: drop in-memory state, reopen the same log directory.
        clear_trigger_registry();
        reset_active_event_log();
        install_default_for_base_dir(tempdir.path()).expect("reopen event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("manifest reload reuses historical version");

        let binding = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue")
            .expect("binding snapshot");
        assert_eq!(binding.version, 2);

        reset_active_event_log();
        clear_trigger_registry();
    }

    /// `binding_version_as_of` reports version 1 for a timestamp taken before
    /// the reload and version 2 for one taken after it.
    #[tokio::test(flavor = "current_thread")]
    async fn binding_version_as_of_reports_historical_active_version() {
        clear_trigger_registry();
        reset_active_event_log();
        let tempdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("initial manifest trigger installs");
        let before_reload = OffsetDateTime::now_utc();
        // Presumably separates `before_reload` from the reload's lifecycle
        // events by more than the timestamp resolution — TODO confirm.
        std::thread::sleep(std::time::Duration::from_millis(50));

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("updated manifest trigger installs");
        let after_reload = OffsetDateTime::now_utc();

        assert_eq!(
            binding_version_as_of("github-new-issue", before_reload)
                .expect("version before reload"),
            1
        );
        assert_eq!(
            binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
            2
        );

        reset_active_event_log();
        clear_trigger_registry();
    }

    /// When the recorded version (1) is no longer in the registry, resolution
    /// falls back to the version active at `received_at` (3) and emits a
    /// structured `replay.binding_version_gc_fallback` warning.
    #[tokio::test(flavor = "current_thread")]
    async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
        clear_trigger_registry();
        reset_active_event_log();
        let sink = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(sink.clone());
        let tempdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("install v1");
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("install v2");
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
            .await
            .expect("install v3");
        let received_at = OffsetDateTime::now_utc();
        // Presumably keeps the v4 install strictly later than `received_at`
        // at the timestamp resolution in use — TODO confirm.
        std::thread::sleep(std::time::Duration::from_millis(50));
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
            .await
            .expect("install v4");

        let binding = resolve_live_or_as_of(
            "github-new-issue",
            RecordedTriggerBinding {
                version: 1,
                received_at,
            },
        )
        .expect("resolve fallback binding");
        assert_eq!(binding.version, 3);

        let warning = sink
            .logs
            .borrow()
            .iter()
            .find(|log| log.category == "replay.binding_version_gc_fallback")
            .cloned()
            .expect("gc fallback warning");
        assert_eq!(warning.level, EventLevel::Warn);
        assert_eq!(
            warning.metadata.get("trigger_id"),
            Some(&serde_json::json!("github-new-issue"))
        );
        assert_eq!(
            warning.metadata.get("recorded_version"),
            Some(&serde_json::json!(1))
        );
        assert_eq!(
            warning.metadata.get("received_at"),
            Some(&serde_json::json!(received_at
                .format(&time::format_description::well_known::Rfc3339)
                .unwrap_or_else(|_| received_at.to_string())))
        );
        assert_eq!(
            warning.metadata.get("resolved_version"),
            Some(&serde_json::json!(3))
        );

        clear_event_sinks();
        crate::events::reset_event_sinks();
        reset_active_event_log();
        clear_trigger_registry();
    }
}