1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7
8use time::OffsetDateTime;
9use uuid::Uuid;
10
11use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
12use crate::llm::trigger_predicate::TriggerPredicateBudget;
13use crate::secrets::{configured_default_chain, SecretProvider};
14use crate::triggers::test_util::clock;
15use crate::trust_graph::AutonomyTier;
16use crate::value::VmClosure;
17
18use super::aggregation::TriggerAggregationConfig;
19use super::dispatcher::TriggerRetryConfig;
20use super::flow_control::TriggerFlowControlConfig;
21use super::ProviderId;
22
/// Identifier of a trigger, stored as an owned `String`.
///
/// Derives equality, ordering and hashing, so it can serve as a map or set
/// key, and derives serde `Serialize`/`Deserialize`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct TriggerId(String);
25
26impl TriggerId {
27 pub fn new(value: impl Into<String>) -> Self {
28 Self(value.into())
29 }
30
31 pub fn as_str(&self) -> &str {
32 &self.0
33 }
34}
35
/// Formats a `TriggerId` by delegating to the wrapped string.
impl std::fmt::Display for TriggerId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Forward to the inner `String`'s `fmt`, so formatter flags passed in
        // `f` are handled by the string's own formatting impl.
        self.0.fmt(f)
    }
}
41
/// Lifecycle state of a trigger binding.
///
/// Serialized with serde in `snake_case`. A freshly constructed
/// `TriggerBinding` starts in `Registering`; only `Active` bindings are
/// returned by the event-matching helpers in this file, and `Terminated`
/// bindings are rejected by `resolve_live_trigger_binding`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TriggerState {
    Registering,
    Active,
    Paused,
    Draining,
    Terminated,
}
51
52impl TriggerState {
53 pub fn as_str(self) -> &'static str {
54 match self {
55 Self::Registering => "registering",
56 Self::Active => "active",
57 Self::Paused => "paused",
58 Self::Draining => "draining",
59 Self::Terminated => "terminated",
60 }
61 }
62}
63
/// Where a trigger binding was declared.
///
/// Serialized with serde in `snake_case`. `install_manifest_triggers` rejects
/// any spec whose source is not `Manifest`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TriggerBindingSource {
    Manifest,
    Dynamic,
}
70
71impl TriggerBindingSource {
72 pub fn as_str(self) -> &'static str {
73 match self {
74 Self::Manifest => "manifest",
75 Self::Dynamic => "dynamic",
76 }
77 }
78}
79
/// Strategy selected by a binding's `on_budget_exhausted` field.
///
/// Serialized with serde in `snake_case`; the default variant is `False`.
// NOTE(review): how each variant is acted on is decided by code outside this
// view — confirm against the dispatcher before documenting per-variant effects.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TriggerBudgetExhaustionStrategy {
    #[default]
    False,
    RetryLater,
    Fail,
    Warn,
}
89
90impl TriggerBudgetExhaustionStrategy {
91 pub fn as_str(self) -> &'static str {
92 match self {
93 Self::False => "false",
94 Self::RetryLater => "retry_later",
95 Self::Fail => "fail",
96 Self::Warn => "warn",
97 }
98 }
99}
100
/// Optional orchestrator-wide spending limits, in USD.
///
/// A `None` limit is never reported as exceeded by
/// `orchestrator_budget_would_exceed`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct OrchestratorBudgetConfig {
    /// Limit compared against the current UTC day's accumulated cost.
    pub daily_cost_usd: Option<f64>,
    /// Limit compared against the current UTC hour's accumulated cost.
    pub hourly_cost_usd: Option<f64>,
}
106
/// Point-in-time copy of the orchestrator budget, as produced by
/// `snapshot_orchestrator_budget`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct OrchestratorBudgetSnapshot {
    /// Configured daily limit in USD, if any.
    pub daily_cost_usd: Option<f64>,
    /// Configured hourly limit in USD, if any.
    pub hourly_cost_usd: Option<f64>,
    /// Cost accumulated in the current day window, in millionths of a USD.
    pub cost_today_usd_micros: u64,
    /// Cost accumulated in the current hour window, in millionths of a USD.
    pub cost_hour_usd_micros: u64,
    /// Day window key: whole days since 1970-01-01 (see `utc_day_key`).
    pub day_utc: i32,
    /// Hour window key: Unix timestamp divided by 3600 (see `utc_hour_key`).
    pub hour_utc: i64,
}
116
/// Mutable orchestrator budget bookkeeping held in the `ORCHESTRATOR_BUDGET`
/// thread-local.
#[derive(Debug)]
struct OrchestratorBudgetState {
    /// Limits installed by `install_orchestrator_budget`.
    config: OrchestratorBudgetConfig,
    /// Day key the daily counter belongs to; a mismatch resets the counter.
    day_utc: i32,
    /// Hour key the hourly counter belongs to; a mismatch resets the counter.
    hour_utc: i64,
    /// Cost accumulated for `day_utc`, in millionths of a USD.
    cost_today_usd_micros: u64,
    /// Cost accumulated for `hour_utc`, in millionths of a USD.
    cost_hour_usd_micros: u64,
}
125
126impl Default for OrchestratorBudgetState {
127 fn default() -> Self {
128 Self {
129 config: OrchestratorBudgetConfig::default(),
130 day_utc: utc_day_key(),
131 hour_utc: utc_hour_key(),
132 cost_today_usd_micros: 0,
133 cost_hour_usd_micros: 0,
134 }
135 }
136}
137
/// Describes what a trigger binding hands matching events to.
///
/// `kind()` maps every variant to a stable snake_case label. `Debug` is
/// implemented by hand because some variants carry `VmClosure` values, which
/// the manual impl leaves out of the output.
// NOTE(review): per-variant runtime behavior lives in the dispatcher, outside
// this view; the notes below only restate what the fields are.
#[derive(Clone)]
pub enum TriggerHandlerSpec {
    /// VM closure handler; `raw` is the only field shown in `Debug` output.
    Local {
        raw: String,
        closure: Arc<VmClosure>,
    },
    /// Handler addressed by a `target` string, with a flag named
    /// `allow_cleartext`.
    A2a {
        target: String,
        allow_cleartext: bool,
    },
    /// Handler identified by a queue name.
    Worker {
        queue: String,
    },
    /// Handler bound to a persona runtime binding.
    Persona {
        binding: crate::PersonaRuntimeBinding,
    },
    /// Handler carrying an eval-pack manifest (boxed) plus optional JSON
    /// ledger options.
    EvalPack {
        target: String,
        manifest: Box<crate::orchestration::EvalPackManifest>,
        ledger_options: Option<serde_json::Value>,
    },
    /// Handler identified by a worker id.
    AutoResume {
        worker_id: String,
    },
    /// Handler naming a pool, with optional `priority_from` / `key_from`
    /// strings and a VM closure `task_factory`.
    SpawnToPool {
        pool: String,
        priority_from: Option<String>,
        key_from: Option<String>,
        task_factory: Arc<VmClosure>,
    },
    /// Handler describing a reminder: target expression, body text, tags and
    /// related options.
    ReminderInject {
        target: TargetExpr,
        body: String,
        tags: Vec<String>,
        ttl_turns: Option<i64>,
        dedupe_key: Option<String>,
        propagate: crate::llm::helpers::ReminderPropagate,
        role_hint: crate::llm::helpers::ReminderRoleHint,
        preserve_on_compact: bool,
    },
    /// Handler naming a scope of agents and a reason string.
    InterruptAndSuspend {
        target_agents: AgentScope,
        reason: String,
    },
}
207
/// Selects which agents an `InterruptAndSuspend` handler targets.
#[derive(Clone)]
pub enum AgentScope {
    /// No explicit list; labelled `"all"` by `kind()`.
    All,
    /// An explicit list of strings (shown as-is in `Debug` output).
    Concrete(Vec<String>),
    /// A VM closure; `Debug` prints a placeholder instead of the closure.
    Closure(Arc<VmClosure>),
}
221
222impl AgentScope {
223 pub fn kind(&self) -> &'static str {
224 match self {
225 Self::All => "all",
226 Self::Concrete(_) => "concrete",
227 Self::Closure(_) => "closure",
228 }
229 }
230}
231
232impl std::fmt::Debug for AgentScope {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 match self {
235 Self::All => f.write_str("All"),
236 Self::Concrete(ids) => f.debug_tuple("Concrete").field(ids).finish(),
237 Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
238 }
239 }
240}
241
/// Target selector used by the `ReminderInject` handler.
#[derive(Clone)]
pub enum TargetExpr {
    /// Labelled `"current"` by `kind()`.
    Current,
    /// Labelled `"parent"` by `kind()`.
    Parent,
    /// An explicit string target (shown as-is in `Debug` output).
    Concrete(String),
    /// A VM closure; `Debug` prints a placeholder instead of the closure.
    Closure(Arc<VmClosure>),
}
255
256impl TargetExpr {
257 pub fn kind(&self) -> &'static str {
258 match self {
259 Self::Current => "current",
260 Self::Parent => "parent",
261 Self::Concrete(_) => "concrete",
262 Self::Closure(_) => "closure",
263 }
264 }
265}
266
267impl std::fmt::Debug for TargetExpr {
268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269 match self {
270 Self::Current => f.write_str("Current"),
271 Self::Parent => f.write_str("Parent"),
272 Self::Concrete(id) => f.debug_tuple("Concrete").field(id).finish(),
273 Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
274 }
275 }
276}
277
278impl std::fmt::Debug for TriggerHandlerSpec {
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 match self {
281 Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
282 Self::A2a {
283 target,
284 allow_cleartext,
285 } => f
286 .debug_struct("A2a")
287 .field("target", target)
288 .field("allow_cleartext", allow_cleartext)
289 .finish(),
290 Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
291 Self::Persona { binding } => f
292 .debug_struct("Persona")
293 .field("name", &binding.name)
294 .finish(),
295 Self::EvalPack {
296 target,
297 manifest,
298 ledger_options,
299 } => f
300 .debug_struct("EvalPack")
301 .field("target", target)
302 .field("pack_id", &manifest.id)
303 .field("ledger_options", ledger_options)
304 .finish(),
305 Self::AutoResume { worker_id } => f
306 .debug_struct("AutoResume")
307 .field("worker_id", worker_id)
308 .finish(),
309 Self::SpawnToPool {
310 pool,
311 priority_from,
312 key_from,
313 ..
314 } => f
315 .debug_struct("SpawnToPool")
316 .field("pool", pool)
317 .field("priority_from", priority_from)
318 .field("key_from", key_from)
319 .finish(),
320 Self::ReminderInject {
321 target,
322 body,
323 tags,
324 ttl_turns,
325 dedupe_key,
326 propagate,
327 role_hint,
328 preserve_on_compact,
329 } => f
330 .debug_struct("ReminderInject")
331 .field("target", target)
332 .field("body", body)
333 .field("tags", tags)
334 .field("ttl_turns", ttl_turns)
335 .field("dedupe_key", dedupe_key)
336 .field("propagate", propagate)
337 .field("role_hint", role_hint)
338 .field("preserve_on_compact", preserve_on_compact)
339 .finish(),
340 Self::InterruptAndSuspend {
341 target_agents,
342 reason,
343 } => f
344 .debug_struct("InterruptAndSuspend")
345 .field("target_agents", target_agents)
346 .field("reason", reason)
347 .finish(),
348 }
349 }
350}
351
352impl TriggerHandlerSpec {
353 pub fn kind(&self) -> &'static str {
354 match self {
355 Self::Local { .. } => "local",
356 Self::A2a { .. } => "a2a",
357 Self::Worker { .. } => "worker",
358 Self::Persona { .. } => "persona",
359 Self::EvalPack { .. } => "eval_pack",
360 Self::AutoResume { .. } => "auto_resume",
361 Self::SpawnToPool { .. } => "spawn_to_pool",
362 Self::ReminderInject { .. } => "reminder_inject",
363 Self::InterruptAndSuspend { .. } => "interrupt_and_suspend",
364 }
365 }
366}
367
/// Predicate attached to a binding through its `when` field.
#[derive(Clone)]
pub struct TriggerPredicateSpec {
    /// String form of the predicate; the only field shown in `Debug` output.
    // NOTE(review): presumably the predicate's source text — confirm at the construction site.
    pub raw: String,
    /// VM closure for the predicate (omitted from `Debug` output).
    pub closure: Arc<VmClosure>,
}
373
374impl std::fmt::Debug for TriggerPredicateSpec {
375 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376 f.debug_struct("TriggerPredicateSpec")
377 .field("raw", &self.raw)
378 .finish()
379 }
380}
381
/// Declarative description of a trigger binding.
///
/// `TriggerBinding::new` moves every field into the runtime `TriggerBinding`
/// (wrapping `id` in a `TriggerId`) and adds version, state, metrics and
/// predicate bookkeeping.
#[derive(Clone, Debug)]
pub struct TriggerBindingSpec {
    /// Trigger id; `install_manifest_triggers` rejects empty/blank ids and
    /// duplicates within one install.
    pub id: String,
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: ProviderId,
    pub autonomy_tier: AutonomyTier,
    pub handler: TriggerHandlerSpec,
    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
    /// Optional predicate and its budget.
    pub when: Option<TriggerPredicateSpec>,
    pub when_budget: Option<TriggerPredicateBudget>,
    pub retry: TriggerRetryConfig,
    /// Event kinds to match; an empty list matches every event of the
    /// provider in `matching_bindings`.
    pub match_events: Vec<String>,
    pub dedupe_key: Option<String>,
    pub dedupe_retention_days: u32,
    pub filter: Option<String>,
    /// Per-binding cost limits in USD, checked by `binding_budget_would_exceed`.
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    /// Per-binding decision-count limits, checked by
    /// `binding_autonomy_budget_would_exceed`.
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
    pub max_concurrent: Option<u32>,
    pub flow_control: TriggerFlowControlConfig,
    pub aggregation: Option<TriggerAggregationConfig>,
    pub manifest_path: Option<PathBuf>,
    pub package_name: Option<String>,
    pub definition_fingerprint: String,
}
414
/// Live per-binding counters, updated in place through atomics.
///
/// Costs are tracked in millionths of a USD. The `*_today` and `*_hour`
/// counters are zeroed by `reset_binding_budget_windows` when the UTC day or
/// hour key changes.
#[derive(Debug)]
pub struct TriggerMetrics {
    pub received: AtomicU64,
    pub dispatched: AtomicU64,
    pub failed: AtomicU64,
    pub dlq: AtomicU64,
    /// Guarded by a mutex rather than an atomic because the value is optional.
    pub last_received_ms: Mutex<Option<i64>>,
    pub cost_total_usd_micros: AtomicU64,
    pub cost_today_usd_micros: AtomicU64,
    pub cost_hour_usd_micros: AtomicU64,
    pub autonomous_decisions_total: AtomicU64,
    pub autonomous_decisions_today: AtomicU64,
    pub autonomous_decisions_hour: AtomicU64,
}
429
430impl Default for TriggerMetrics {
431 fn default() -> Self {
432 Self {
433 received: AtomicU64::new(0),
434 dispatched: AtomicU64::new(0),
435 failed: AtomicU64::new(0),
436 dlq: AtomicU64::new(0),
437 last_received_ms: Mutex::new(None),
438 cost_total_usd_micros: AtomicU64::new(0),
439 cost_today_usd_micros: AtomicU64::new(0),
440 cost_hour_usd_micros: AtomicU64::new(0),
441 autonomous_decisions_total: AtomicU64::new(0),
442 autonomous_decisions_today: AtomicU64::new(0),
443 autonomous_decisions_hour: AtomicU64::new(0),
444 }
445 }
446}
447
/// Plain-value copy of a binding's counters, produced by
/// `TriggerBinding::metrics_snapshot`.
///
/// Mirrors `TriggerMetrics` field for field and adds `in_flight`, which is
/// read from the binding's own `in_flight` counter.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TriggerMetricsSnapshot {
    pub received: u64,
    pub dispatched: u64,
    pub failed: u64,
    pub dlq: u64,
    pub in_flight: u64,
    pub last_received_ms: Option<i64>,
    pub cost_total_usd_micros: u64,
    pub cost_today_usd_micros: u64,
    pub cost_hour_usd_micros: u64,
    pub autonomous_decisions_total: u64,
    pub autonomous_decisions_today: u64,
    pub autonomous_decisions_hour: u64,
}
463
/// One version of a registered trigger: the configuration copied from a
/// `TriggerBindingSpec` plus runtime state.
///
/// `binding_key()` identifies a binding as `"{id}@v{version}"`.
pub struct TriggerBinding {
    pub id: TriggerId,
    /// Version number supplied to `TriggerBinding::new`.
    pub version: u32,
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: ProviderId,
    pub autonomy_tier: AutonomyTier,
    pub handler: TriggerHandlerSpec,
    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
    pub when: Option<TriggerPredicateSpec>,
    pub when_budget: Option<TriggerPredicateBudget>,
    pub retry: TriggerRetryConfig,
    pub match_events: Vec<String>,
    pub dedupe_key: Option<String>,
    pub dedupe_retention_days: u32,
    pub filter: Option<String>,
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
    pub max_concurrent: Option<u32>,
    pub flow_control: TriggerFlowControlConfig,
    pub aggregation: Option<TriggerAggregationConfig>,
    pub manifest_path: Option<PathBuf>,
    pub package_name: Option<String>,
    pub definition_fingerprint: String,
    /// Current lifecycle state; starts as `Registering`.
    pub state: Mutex<TriggerState>,
    /// Live counters; see `TriggerMetrics`.
    pub metrics: TriggerMetrics,
    /// Starts at zero; reported as `in_flight` in metrics snapshots.
    pub in_flight: AtomicU64,
    /// Shared flag, initially `false`.
    // NOTE(review): presumably set to request cancellation of in-flight work — confirm at the use site.
    pub cancel_token: Arc<AtomicBool>,
    /// Budget-window keys and recent predicate cost samples.
    pub predicate_state: Mutex<TriggerPredicateState>,
}
500
/// Per-binding bookkeeping kept behind `TriggerBinding::predicate_state`.
#[derive(Clone, Debug, Default)]
pub struct TriggerPredicateState {
    /// UTC day key last seen by `reset_binding_budget_windows`; `None` until
    /// the first reset.
    pub budget_day_utc: Option<i32>,
    /// UTC hour key last seen by `reset_binding_budget_windows`; `None` until
    /// the first reset.
    pub budget_hour_utc: Option<i64>,
    // NOTE(review): `consecutive_failures` and `breaker_open_until_ms` are not
    // touched in the visible part of this file; they look like circuit-breaker
    // state — confirm at the use site.
    pub consecutive_failures: u32,
    pub breaker_open_until_ms: Option<i64>,
    /// Most recent predicate cost samples (micro-USD), oldest first, capped at
    /// `PREDICATE_COST_WINDOW` entries by `record_predicate_cost_sample`.
    pub recent_cost_usd_micros: VecDeque<u64>,
}
509
510impl std::fmt::Debug for TriggerBinding {
511 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
512 f.debug_struct("TriggerBinding")
513 .field("id", &self.id)
514 .field("version", &self.version)
515 .field("source", &self.source)
516 .field("kind", &self.kind)
517 .field("provider", &self.provider)
518 .field("handler_kind", &self.handler.kind())
519 .field("state", &self.state_snapshot())
520 .finish()
521 }
522}
523
524impl TriggerBinding {
525 pub fn snapshot(&self) -> TriggerBindingSnapshot {
526 TriggerBindingSnapshot {
527 id: self.id.as_str().to_string(),
528 version: self.version,
529 source: self.source,
530 kind: self.kind.clone(),
531 provider: self.provider.as_str().to_string(),
532 autonomy_tier: self.autonomy_tier,
533 handler_kind: self.handler.kind().to_string(),
534 state: self.state_snapshot(),
535 metrics: self.metrics_snapshot(),
536 daily_cost_usd: self.daily_cost_usd,
537 hourly_cost_usd: self.hourly_cost_usd,
538 max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
539 max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
540 on_budget_exhausted: self.on_budget_exhausted,
541 }
542 }
543
544 fn new(spec: TriggerBindingSpec, version: u32) -> Self {
545 Self {
546 id: TriggerId::new(spec.id),
547 version,
548 source: spec.source,
549 kind: spec.kind,
550 provider: spec.provider,
551 autonomy_tier: spec.autonomy_tier,
552 handler: spec.handler,
553 dispatch_priority: spec.dispatch_priority,
554 when: spec.when,
555 when_budget: spec.when_budget,
556 retry: spec.retry,
557 match_events: spec.match_events,
558 dedupe_key: spec.dedupe_key,
559 dedupe_retention_days: spec.dedupe_retention_days,
560 filter: spec.filter,
561 daily_cost_usd: spec.daily_cost_usd,
562 hourly_cost_usd: spec.hourly_cost_usd,
563 max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
564 max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
565 on_budget_exhausted: spec.on_budget_exhausted,
566 max_concurrent: spec.max_concurrent,
567 flow_control: spec.flow_control,
568 aggregation: spec.aggregation,
569 manifest_path: spec.manifest_path,
570 package_name: spec.package_name,
571 definition_fingerprint: spec.definition_fingerprint,
572 state: Mutex::new(TriggerState::Registering),
573 metrics: TriggerMetrics::default(),
574 in_flight: AtomicU64::new(0),
575 cancel_token: Arc::new(AtomicBool::new(false)),
576 predicate_state: Mutex::new(TriggerPredicateState::default()),
577 }
578 }
579
580 pub fn binding_key(&self) -> String {
581 format!("{}@v{}", self.id.as_str(), self.version)
582 }
583
584 pub fn state_snapshot(&self) -> TriggerState {
585 *self.state.lock().expect("trigger state poisoned")
586 }
587
588 pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
589 TriggerMetricsSnapshot {
590 received: self.metrics.received.load(Ordering::Relaxed),
591 dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
592 failed: self.metrics.failed.load(Ordering::Relaxed),
593 dlq: self.metrics.dlq.load(Ordering::Relaxed),
594 in_flight: self.in_flight.load(Ordering::Relaxed),
595 last_received_ms: *self
596 .metrics
597 .last_received_ms
598 .lock()
599 .expect("trigger metrics poisoned"),
600 cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
601 cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
602 cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
603 autonomous_decisions_total: self
604 .metrics
605 .autonomous_decisions_total
606 .load(Ordering::Relaxed),
607 autonomous_decisions_today: self
608 .metrics
609 .autonomous_decisions_today
610 .load(Ordering::Relaxed),
611 autonomous_decisions_hour: self
612 .metrics
613 .autonomous_decisions_hour
614 .load(Ordering::Relaxed),
615 }
616 }
617}
618
/// Serializable view of a `TriggerBinding`, produced by
/// `TriggerBinding::snapshot`.
///
/// Identity fields are flattened to strings, the handler is reduced to its
/// kind label, and state and metrics are copied at snapshot time.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBindingSnapshot {
    pub id: String,
    pub version: u32,
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: String,
    pub autonomy_tier: AutonomyTier,
    /// Label returned by `TriggerHandlerSpec::kind`.
    pub handler_kind: String,
    pub state: TriggerState,
    pub metrics: TriggerMetricsSnapshot,
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
}
636
/// Result category of a single dispatch attempt.
// NOTE(review): the variants line up with the `dispatched`, `failed` and
// `dlq` counters in `TriggerMetrics`; the code that applies them is outside
// this view — confirm there.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TriggerDispatchOutcome {
    Dispatched,
    Failed,
    Dlq,
}
643
/// Errors returned by trigger registry operations.
#[derive(Debug)]
pub enum TriggerRegistryError {
    /// The same trigger id appeared more than once; carries the id.
    DuplicateId(String),
    /// A spec was rejected; carries the full message shown by `Display`.
    InvalidSpec(String),
    /// No live binding exists for the id; carries the id.
    UnknownId(String),
    /// The requested version of the id is absent or terminated.
    UnknownBindingVersion { id: String, version: u32 },
    /// Carries the full message shown by `Display`.
    // NOTE(review): presumably wraps an event-log failure — confirm at the construction site.
    EventLog(String),
}
652
653impl std::fmt::Display for TriggerRegistryError {
654 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
655 match self {
656 Self::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
657 Self::InvalidSpec(message) | Self::EventLog(message) => f.write_str(message),
658 Self::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
659 Self::UnknownBindingVersion { id, version } => {
660 write!(f, "unknown trigger binding '{id}' version {version}")
661 }
662 }
663 }
664}
665
// Marker impl using the trait's default methods; no underlying `source` is exposed.
impl std::error::Error for TriggerRegistryError {}
667
/// Registry of trigger bindings, held in the `TRIGGER_REGISTRY` thread-local.
#[derive(Default)]
pub struct TriggerRegistry {
    /// Trigger id -> the versions of that binding currently held.
    bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
    /// Provider name -> ids of bindings for that provider; the ids index
    /// into `bindings`.
    by_provider: BTreeMap<String, BTreeSet<String>>,
    // NOTE(review): `event_log` and `secret_provider` are populated outside
    // this view (see `refresh_runtime_context`) — confirm there.
    event_log: Option<Arc<AnyEventLog>>,
    secret_provider: Option<Arc<dyn SecretProvider>>,
}
675
// Per-thread trigger registry: each thread sees its own independent instance,
// starting empty. Reset by `clear_trigger_registry`.
thread_local! {
    static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
}
679
// Per-thread orchestrator budget state: each thread sees its own independent
// instance. Reset by `clear_orchestrator_budget`.
thread_local! {
    static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
        RefCell::new(OrchestratorBudgetState::default());
}
684
// NOTE(review): not referenced in the visible part of this file; presumably
// caps how many terminated versions of a binding are retained — confirm at
// the use site.
const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;

// NOTE(review): looks like the event-log topic name for trigger lifecycle
// records — confirm at the use site.
const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
// Maximum number of predicate cost samples kept per binding by
// `record_predicate_cost_sample`; the oldest samples are dropped first.
const PREDICATE_COST_WINDOW: usize = 100;
689
/// Deserialized form of a lifecycle state-transition record.
#[derive(Clone, Debug, Deserialize)]
struct LifecycleStateTransitionRecord {
    id: String,
    version: u32,
    /// Optional in the serialized form; a missing value deserializes to `None`.
    #[serde(default)]
    definition_fingerprint: Option<String>,
    /// State the binding moved into.
    to_state: TriggerState,
}
698
/// A lifecycle transition paired with a millisecond timestamp.
// NOTE(review): `occurred_at_ms` is presumably a Unix-epoch millisecond value —
// confirm where these records are built.
#[derive(Clone, Debug)]
struct HistoricalLifecycleRecord {
    occurred_at_ms: i64,
    transition: LifecycleStateTransitionRecord,
}
704
/// Binding reference recorded with an event, consumed by
/// `resolve_live_or_as_of`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RecordedTriggerBinding {
    /// Version tried first against the live registry.
    pub version: u32,
    /// Timestamp used for the as-of lookup when that version is not live.
    pub received_at: OffsetDateTime,
}
710
/// Result pair of a version search; both fields default to `None`.
// NOTE(review): produced and consumed outside the visible part of this file —
// the names suggest "version that matched" and "highest version seen"; confirm
// at the use site.
#[derive(Clone, Copy, Debug, Default)]
struct HistoricalVersionLookup {
    matching_version: Option<u32>,
    max_version: Option<u32>,
}
716
717pub fn clear_trigger_registry() {
718 TRIGGER_REGISTRY.with(|slot| {
719 *slot.borrow_mut() = TriggerRegistry::default();
720 });
721 clear_orchestrator_budget();
722 super::aggregation::clear_aggregation_state();
723}
724
725pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
726 ORCHESTRATOR_BUDGET.with(|slot| {
727 let mut state = slot.borrow_mut();
728 rollover_orchestrator_budget(&mut state);
729 state.config = config;
730 });
731}
732
733pub fn clear_orchestrator_budget() {
734 ORCHESTRATOR_BUDGET.with(|slot| {
735 *slot.borrow_mut() = OrchestratorBudgetState::default();
736 });
737}
738
739pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
740 ORCHESTRATOR_BUDGET.with(|slot| {
741 let mut state = slot.borrow_mut();
742 rollover_orchestrator_budget(&mut state);
743 OrchestratorBudgetSnapshot {
744 daily_cost_usd: state.config.daily_cost_usd,
745 hourly_cost_usd: state.config.hourly_cost_usd,
746 cost_today_usd_micros: state.cost_today_usd_micros,
747 cost_hour_usd_micros: state.cost_hour_usd_micros,
748 day_utc: state.day_utc,
749 hour_utc: state.hour_utc,
750 }
751 })
752}
753
754pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
755 if cost_usd_micros == 0 {
756 return;
757 }
758 ORCHESTRATOR_BUDGET.with(|slot| {
759 let mut state = slot.borrow_mut();
760 rollover_orchestrator_budget(&mut state);
761 state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
762 state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
763 });
764}
765
766pub(crate) fn note_binding_budget_cost(binding: &TriggerBinding, cost_usd_micros: u64) {
767 if cost_usd_micros == 0 {
768 return;
769 }
770 reset_binding_budget_windows(binding);
771 binding
772 .metrics
773 .cost_total_usd_micros
774 .fetch_add(cost_usd_micros, Ordering::Relaxed);
775 binding
776 .metrics
777 .cost_today_usd_micros
778 .fetch_add(cost_usd_micros, Ordering::Relaxed);
779 binding
780 .metrics
781 .cost_hour_usd_micros
782 .fetch_add(cost_usd_micros, Ordering::Relaxed);
783}
784
785pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
786 ORCHESTRATOR_BUDGET.with(|slot| {
787 let mut state = slot.borrow_mut();
788 rollover_orchestrator_budget(&mut state);
789 if state.config.hourly_cost_usd.is_some_and(|limit| {
790 micros_to_usd(
791 state
792 .cost_hour_usd_micros
793 .saturating_add(expected_cost_usd_micros),
794 ) > limit
795 }) {
796 return Some("orchestrator_hourly_budget_exceeded");
797 }
798 if state.config.daily_cost_usd.is_some_and(|limit| {
799 micros_to_usd(
800 state
801 .cost_today_usd_micros
802 .saturating_add(expected_cost_usd_micros),
803 ) > limit
804 }) {
805 return Some("orchestrator_daily_budget_exceeded");
806 }
807 None
808 })
809}
810
811pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
812 let today = utc_day_key();
813 let hour = utc_hour_key();
814 let mut state = binding
815 .predicate_state
816 .lock()
817 .expect("trigger predicate state poisoned");
818 if state.budget_day_utc != Some(today) {
819 state.budget_day_utc = Some(today);
820 binding
821 .metrics
822 .cost_today_usd_micros
823 .store(0, Ordering::Relaxed);
824 binding
825 .metrics
826 .autonomous_decisions_today
827 .store(0, Ordering::Relaxed);
828 }
829 if state.budget_hour_utc != Some(hour) {
830 state.budget_hour_utc = Some(hour);
831 binding
832 .metrics
833 .cost_hour_usd_micros
834 .store(0, Ordering::Relaxed);
835 binding
836 .metrics
837 .autonomous_decisions_hour
838 .store(0, Ordering::Relaxed);
839 }
840}
841
842pub fn binding_budget_would_exceed(
843 binding: &TriggerBinding,
844 expected_cost_usd_micros: u64,
845) -> Option<&'static str> {
846 reset_binding_budget_windows(binding);
847 if binding.hourly_cost_usd.is_some_and(|limit| {
848 micros_to_usd(
849 binding
850 .metrics
851 .cost_hour_usd_micros
852 .load(Ordering::Relaxed)
853 .saturating_add(expected_cost_usd_micros),
854 ) > limit
855 }) {
856 return Some("hourly_budget_exceeded");
857 }
858 if binding.daily_cost_usd.is_some_and(|limit| {
859 micros_to_usd(
860 binding
861 .metrics
862 .cost_today_usd_micros
863 .load(Ordering::Relaxed)
864 .saturating_add(expected_cost_usd_micros),
865 ) > limit
866 }) {
867 return Some("daily_budget_exceeded");
868 }
869 None
870}
871
872pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
873 reset_binding_budget_windows(binding);
874 if binding
875 .max_autonomous_decisions_per_hour
876 .is_some_and(|limit| {
877 binding
878 .metrics
879 .autonomous_decisions_hour
880 .load(Ordering::Relaxed)
881 .saturating_add(1)
882 > limit
883 })
884 {
885 return Some("hourly_autonomy_budget_exceeded");
886 }
887 if binding
888 .max_autonomous_decisions_per_day
889 .is_some_and(|limit| {
890 binding
891 .metrics
892 .autonomous_decisions_today
893 .load(Ordering::Relaxed)
894 .saturating_add(1)
895 > limit
896 })
897 {
898 return Some("daily_autonomy_budget_exceeded");
899 }
900 None
901}
902
903pub fn note_autonomous_decision(binding: &TriggerBinding) {
904 reset_binding_budget_windows(binding);
905 binding
906 .metrics
907 .autonomous_decisions_total
908 .fetch_add(1, Ordering::Relaxed);
909 binding
910 .metrics
911 .autonomous_decisions_today
912 .fetch_add(1, Ordering::Relaxed);
913 binding
914 .metrics
915 .autonomous_decisions_hour
916 .fetch_add(1, Ordering::Relaxed);
917}
918
919pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
920 let state = binding
921 .predicate_state
922 .lock()
923 .expect("trigger predicate state poisoned");
924 if let Some(average) = average_cost_sample_micros(&state.recent_cost_usd_micros) {
925 return average;
926 }
927 binding
928 .when_budget
929 .as_ref()
930 .and_then(|budget| budget.max_cost_usd)
931 .map(usd_to_micros)
932 .unwrap_or_default()
933}
934
935pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
936 let mut state = binding
937 .predicate_state
938 .lock()
939 .expect("trigger predicate state poisoned");
940 state.recent_cost_usd_micros.push_back(cost_usd_micros);
941 while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
942 state.recent_cost_usd_micros.pop_front();
943 }
944}
945
/// Returns the integer mean (rounded down) of the samples, or `None` when
/// there are none. The sum is accumulated in `u128` so it cannot overflow.
fn average_cost_sample_micros(samples: &VecDeque<u64>) -> Option<u64> {
    let count = samples.len();
    if count == 0 {
        return None;
    }
    let sum = samples
        .iter()
        .fold(0u128, |acc, &sample| acc + u128::from(sample));
    Some((sum / count as u128) as u64)
}
953
/// Converts a USD amount to millionths of a USD, rounding up.
///
/// Non-finite values (NaN, infinities) and values at or below zero yield `0`.
pub fn usd_to_micros(value: f64) -> u64 {
    if value.is_finite() && value > 0.0 {
        let micros = value * 1_000_000.0;
        micros.ceil() as u64
    } else {
        0
    }
}
960
/// Converts millionths of a USD to a USD amount as `f64`.
pub fn micros_to_usd(value: u64) -> f64 {
    let micros = value as f64;
    micros / 1_000_000.0
}
964
965fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
966 let today = utc_day_key();
967 let hour = utc_hour_key();
968 if state.day_utc != today {
969 state.day_utc = today;
970 state.cost_today_usd_micros = 0;
971 }
972 if state.hour_utc != hour {
973 state.hour_utc = hour;
974 state.cost_hour_usd_micros = 0;
975 }
976}
977
978fn utc_day_key() -> i32 {
979 (clock::now_utc().date()
980 - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
981 .whole_days() as i32
982}
983
984fn utc_hour_key() -> i64 {
985 clock::now_utc().unix_timestamp() / 3_600
986}
987
988pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
989 TRIGGER_REGISTRY.with(|slot| {
990 let registry = slot.borrow();
991 let mut snapshots = Vec::new();
992 for bindings in registry.bindings.values() {
993 for binding in bindings {
994 snapshots.push(binding.snapshot());
995 }
996 }
997 snapshots.sort_by(|left, right| {
998 left.id
999 .cmp(&right.id)
1000 .then(left.version.cmp(&right.version))
1001 .then(left.state.as_str().cmp(right.state.as_str()))
1002 });
1003 snapshots
1004 })
1005}
1006
1007#[allow(clippy::arc_with_non_send_sync)]
1008pub fn resolve_trigger_binding_as_of(
1009 id: &str,
1010 as_of: OffsetDateTime,
1011) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1012 let version = binding_version_as_of(id, as_of)?;
1013 resolve_trigger_binding_version(id, version)
1014}
1015
1016#[allow(clippy::arc_with_non_send_sync)]
1017pub fn resolve_live_or_as_of(
1018 id: &str,
1019 recorded: RecordedTriggerBinding,
1020) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1021 match resolve_live_trigger_binding(id, Some(recorded.version)) {
1022 Ok(binding) => Ok(binding),
1023 Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
1024 let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
1025 let mut metadata = BTreeMap::new();
1026 metadata.insert("trigger_id".to_string(), serde_json::json!(id));
1027 metadata.insert(
1028 "recorded_version".to_string(),
1029 serde_json::json!(recorded.version),
1030 );
1031 metadata.insert(
1032 "received_at".to_string(),
1033 serde_json::json!(recorded
1034 .received_at
1035 .format(&time::format_description::well_known::Rfc3339)
1036 .unwrap_or_else(|_| recorded.received_at.to_string())),
1037 );
1038 metadata.insert(
1039 "resolved_version".to_string(),
1040 serde_json::json!(binding.version),
1041 );
1042 crate::events::log_warn_meta(
1043 "replay.binding_version_gc_fallback",
1044 "trigger replay fell back to lifecycle history after binding version GC",
1045 metadata,
1046 );
1047 Ok(binding)
1048 }
1049 Err(error) => Err(error),
1050 }
1051}
1052
1053pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
1054 TRIGGER_REGISTRY.with(|slot| {
1055 let registry = slot.borrow();
1056 registry.binding_version_as_of(id, as_of)
1057 })
1058}
1059
1060#[allow(clippy::arc_with_non_send_sync)]
1061fn resolve_trigger_binding_version(
1062 id: &str,
1063 version: u32,
1064) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1065 TRIGGER_REGISTRY.with(|slot| {
1066 let registry = slot.borrow();
1067 registry
1068 .binding(id, version)
1069 .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
1070 id: id.to_string(),
1071 version,
1072 })
1073 })
1074}
1075
1076#[allow(clippy::arc_with_non_send_sync)]
1077pub fn resolve_live_trigger_binding(
1078 id: &str,
1079 version: Option<u32>,
1080) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1081 TRIGGER_REGISTRY.with(|slot| {
1082 let registry = slot.borrow();
1083 if let Some(version) = version {
1084 let binding = registry.binding(id, version).ok_or_else(|| {
1085 TriggerRegistryError::UnknownBindingVersion {
1086 id: id.to_string(),
1087 version,
1088 }
1089 })?;
1090 if binding.state_snapshot() == TriggerState::Terminated {
1091 return Err(TriggerRegistryError::UnknownBindingVersion {
1092 id: id.to_string(),
1093 version,
1094 });
1095 }
1096 return Ok(binding);
1097 }
1098
1099 registry
1100 .live_bindings_any_source(id)
1101 .into_iter()
1102 .max_by_key(|binding| binding.version)
1103 .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
1104 })
1105}
1106
1107pub(crate) fn channel_bindings_matching(
1113 scope: &str,
1114 scope_id: &str,
1115 name: &str,
1116) -> Vec<Arc<TriggerBinding>> {
1117 TRIGGER_REGISTRY.with(|slot| {
1118 let registry = slot.borrow();
1119 let Some(binding_ids) = registry.by_provider.get("channel") else {
1120 return Vec::new();
1121 };
1122 let mut bindings = Vec::new();
1123 for id in binding_ids {
1124 let Some(versions) = registry.bindings.get(id) else {
1125 continue;
1126 };
1127 for binding in versions {
1128 if binding.state_snapshot() != TriggerState::Active {
1129 continue;
1130 }
1131 let Some(selector_raw) = binding.match_events.first() else {
1132 continue;
1133 };
1134 let Ok(selector) = crate::channels::ChannelSelector::parse(selector_raw) else {
1135 continue;
1136 };
1137 if !selector.matches(scope, scope_id, name, scope_id) {
1141 continue;
1142 }
1143 bindings.push(binding.clone());
1144 }
1145 }
1146 bindings.sort_by(|left, right| {
1147 left.id
1148 .as_str()
1149 .cmp(right.id.as_str())
1150 .then(left.version.cmp(&right.version))
1151 });
1152 bindings
1153 })
1154}
1155
1156pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
1157 TRIGGER_REGISTRY.with(|slot| {
1158 let registry = slot.borrow();
1159 let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
1160 return Vec::new();
1161 };
1162
1163 let mut bindings = Vec::new();
1164 for id in binding_ids {
1165 let Some(versions) = registry.bindings.get(id) else {
1166 continue;
1167 };
1168 for binding in versions {
1169 if binding.state_snapshot() != TriggerState::Active {
1170 continue;
1171 }
1172 if !binding.match_events.is_empty()
1173 && !binding
1174 .match_events
1175 .iter()
1176 .any(|kind| trigger_event_kind_matches(event, kind))
1177 {
1178 continue;
1179 }
1180 bindings.push(binding.clone());
1181 }
1182 }
1183
1184 bindings.sort_by(|left, right| {
1185 left.id
1186 .as_str()
1187 .cmp(right.id.as_str())
1188 .then(left.version.cmp(&right.version))
1189 });
1190 bindings
1191 })
1192}
1193
1194fn trigger_event_kind_matches(event: &super::TriggerEvent, expected: &str) -> bool {
1195 if expected == event.kind {
1196 return true;
1197 }
1198 expected
1199 .strip_prefix(event.provider.as_str())
1200 .and_then(|rest| rest.strip_prefix('.'))
1201 .is_some_and(|kind| kind == event.kind)
1202}
1203
1204pub async fn install_manifest_triggers(
1205 specs: Vec<TriggerBindingSpec>,
1206) -> Result<(), TriggerRegistryError> {
1207 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1208 let registry = &mut *slot.borrow_mut();
1209 registry.refresh_runtime_context();
1210 let mut touched_ids = BTreeSet::new();
1211
1212 let mut incoming = BTreeMap::new();
1213 for spec in specs {
1214 let spec_id = spec.id.clone();
1215 if spec.source != TriggerBindingSource::Manifest {
1216 return Err(TriggerRegistryError::InvalidSpec(format!(
1217 "manifest install received non-manifest trigger '{spec_id}'"
1218 )));
1219 }
1220 if spec_id.trim().is_empty() {
1221 return Err(TriggerRegistryError::InvalidSpec(
1222 "manifest trigger id cannot be empty".to_string(),
1223 ));
1224 }
1225 if incoming.insert(spec_id.clone(), spec).is_some() {
1226 return Err(TriggerRegistryError::DuplicateId(spec_id));
1227 }
1228 }
1229
1230 let mut lifecycle = Vec::new();
1231 let existing_ids: Vec<String> = registry
1232 .bindings
1233 .iter()
1234 .filter(|(_, bindings)| {
1235 bindings.iter().any(|binding| {
1236 binding.source == TriggerBindingSource::Manifest
1237 && binding.state_snapshot() != TriggerState::Terminated
1238 })
1239 })
1240 .map(|(id, _)| id.clone())
1241 .collect();
1242
1243 for id in existing_ids {
1244 let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
1245 let Some(spec) = incoming.remove(&id) else {
1246 for binding in live_manifest {
1247 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1248 }
1249 touched_ids.insert(id.clone());
1250 continue;
1251 };
1252
1253 let has_matching_active = live_manifest.iter().any(|binding| {
1254 binding.definition_fingerprint == spec.definition_fingerprint
1255 && matches!(
1256 binding.state_snapshot(),
1257 TriggerState::Registering | TriggerState::Active
1258 )
1259 });
1260 if has_matching_active {
1261 continue;
1262 }
1263
1264 for binding in live_manifest {
1265 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1266 }
1267
1268 let version = registry.next_version_for_spec(&spec);
1269 registry.register_binding(spec, version, &mut lifecycle);
1270 touched_ids.insert(id.clone());
1271 }
1272
1273 for spec in incoming.into_values() {
1274 touched_ids.insert(spec.id.clone());
1275 let version = registry.next_version_for_spec(&spec);
1276 registry.register_binding(spec, version, &mut lifecycle);
1277 }
1278
1279 for id in touched_ids {
1280 registry.gc_terminated_versions(&id);
1281 }
1282
1283 Ok((registry.event_log.clone(), lifecycle))
1284 })?;
1285
1286 append_lifecycle_events(event_log, events).await
1287}
1288
1289pub async fn dynamic_register(
1290 mut spec: TriggerBindingSpec,
1291) -> Result<TriggerId, TriggerRegistryError> {
1292 if spec.id.trim().is_empty() {
1293 spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1294 }
1295 spec.source = TriggerBindingSource::Dynamic;
1296 let id = spec.id.clone();
1297 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1298 let registry = &mut *slot.borrow_mut();
1299 registry.refresh_runtime_context();
1300
1301 if registry.bindings.contains_key(id.as_str()) {
1302 return Err(TriggerRegistryError::DuplicateId(id.clone()));
1303 }
1304
1305 let mut lifecycle = Vec::new();
1306 let version = registry.next_version_for_spec(&spec);
1307 registry.register_binding(spec, version, &mut lifecycle);
1308 Ok((registry.event_log.clone(), lifecycle))
1309 })?;
1310
1311 append_lifecycle_events(event_log, events).await?;
1312 Ok(TriggerId::new(id))
1313}
1314
1315pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1316 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1317 let registry = &mut *slot.borrow_mut();
1318 let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1319 if live_dynamic.is_empty() {
1320 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1321 }
1322
1323 let mut lifecycle = Vec::new();
1324 for binding in live_dynamic {
1325 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1326 }
1327 Ok((registry.event_log.clone(), lifecycle))
1328 })?;
1329
1330 append_lifecycle_events(event_log, events).await
1331}
1332
1333pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1334 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1335 let registry = &mut *slot.borrow_mut();
1336 let live = registry.live_bindings_any_source(id);
1337 if live.is_empty() {
1338 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1339 }
1340
1341 let mut lifecycle = Vec::new();
1342 for binding in live {
1343 registry.transition_binding_to_draining(&binding, &mut lifecycle);
1344 }
1345 Ok((registry.event_log.clone(), lifecycle))
1346 })?;
1347
1348 append_lifecycle_events(event_log, events).await
1349}
1350
1351pub async fn pause(id: &str) -> Result<(), TriggerRegistryError> {
1352 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1353 let registry = &mut *slot.borrow_mut();
1354 let live = registry.live_bindings_any_source(id);
1355 if live.is_empty() {
1356 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1357 }
1358
1359 let mut lifecycle = Vec::new();
1360 for binding in live {
1361 match binding.state_snapshot() {
1362 TriggerState::Registering | TriggerState::Active => {
1363 registry.transition_binding_state(
1364 &binding,
1365 TriggerState::Paused,
1366 &mut lifecycle,
1367 );
1368 }
1369 TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {}
1370 }
1371 }
1372 Ok((registry.event_log.clone(), lifecycle))
1373 })?;
1374
1375 append_lifecycle_events(event_log, events).await
1376}
1377
1378pub async fn resume(id: &str) -> Result<(), TriggerRegistryError> {
1379 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1380 let registry = &mut *slot.borrow_mut();
1381 let live = registry.live_bindings_any_source(id);
1382 if live.is_empty() {
1383 return Err(TriggerRegistryError::UnknownId(id.to_string()));
1384 }
1385
1386 let mut lifecycle = Vec::new();
1387 for binding in live {
1388 if binding.state_snapshot() == TriggerState::Paused {
1389 registry.transition_binding_state(&binding, TriggerState::Active, &mut lifecycle);
1390 }
1391 }
1392 Ok((registry.event_log.clone(), lifecycle))
1393 })?;
1394
1395 append_lifecycle_events(event_log, events).await
1396}
1397
1398fn pin_trigger_binding_inner(
1399 id: &str,
1400 version: u32,
1401 allow_terminated: bool,
1402) -> Result<(), TriggerRegistryError> {
1403 TRIGGER_REGISTRY.with(|slot| {
1404 let registry = slot.borrow();
1405 let binding = registry.binding(id, version).ok_or_else(|| {
1406 TriggerRegistryError::UnknownBindingVersion {
1407 id: id.to_string(),
1408 version,
1409 }
1410 })?;
1411 match binding.state_snapshot() {
1412 TriggerState::Paused => Err(TriggerRegistryError::InvalidSpec(format!(
1413 "trigger binding '{id}' version {version} is paused"
1414 ))),
1415 TriggerState::Terminated if !allow_terminated => {
1416 Err(TriggerRegistryError::InvalidSpec(format!(
1417 "trigger binding '{id}' version {version} is terminated"
1418 )))
1419 }
1420 _ => {
1421 binding.in_flight.fetch_add(1, Ordering::Relaxed);
1422 Ok(())
1423 }
1424 }
1425 })
1426}
1427
1428pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1429 pin_trigger_binding_inner(id, version, false)
1430}
1431
1432pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1433 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1434 let registry = &mut *slot.borrow_mut();
1435 let binding = registry.binding(id, version).ok_or_else(|| {
1436 TriggerRegistryError::UnknownBindingVersion {
1437 id: id.to_string(),
1438 version,
1439 }
1440 })?;
1441 let current = binding.in_flight.load(Ordering::Relaxed);
1442 if current == 0 {
1443 return Err(TriggerRegistryError::InvalidSpec(format!(
1444 "trigger binding '{id}' version {version} has no in-flight events"
1445 )));
1446 }
1447 binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1448
1449 let mut lifecycle = Vec::new();
1450 registry.maybe_finalize_draining(&binding, &mut lifecycle);
1451 registry.gc_terminated_versions(binding.id.as_str());
1452 Ok((registry.event_log.clone(), lifecycle))
1453 })?;
1454
1455 append_lifecycle_events(event_log, events).await
1456}
1457
1458pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1459 begin_in_flight_inner(id, version, false)
1460}
1461
1462pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1463 begin_in_flight_inner(id, version, true)
1464}
1465
1466fn begin_in_flight_inner(
1467 id: &str,
1468 version: u32,
1469 allow_terminated: bool,
1470) -> Result<(), TriggerRegistryError> {
1471 pin_trigger_binding_inner(id, version, allow_terminated)?;
1472 TRIGGER_REGISTRY.with(|slot| {
1473 let registry = slot.borrow();
1474 let binding = registry.binding(id, version).ok_or_else(|| {
1475 TriggerRegistryError::UnknownBindingVersion {
1476 id: id.to_string(),
1477 version,
1478 }
1479 })?;
1480 binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1481 *binding
1482 .metrics
1483 .last_received_ms
1484 .lock()
1485 .expect("trigger metrics poisoned") = Some(now_ms());
1486 Ok(())
1487 })
1488}
1489
1490pub async fn finish_in_flight(
1491 id: &str,
1492 version: u32,
1493 outcome: TriggerDispatchOutcome,
1494) -> Result<(), TriggerRegistryError> {
1495 TRIGGER_REGISTRY.with(|slot| {
1496 let registry = &mut *slot.borrow_mut();
1497 let binding = registry.binding(id, version).ok_or_else(|| {
1498 TriggerRegistryError::UnknownBindingVersion {
1499 id: id.to_string(),
1500 version,
1501 }
1502 })?;
1503 let current = binding.in_flight.load(Ordering::Relaxed);
1504 if current == 0 {
1505 return Err(TriggerRegistryError::InvalidSpec(format!(
1506 "trigger binding '{id}' version {version} has no in-flight events"
1507 )));
1508 }
1509 match outcome {
1510 TriggerDispatchOutcome::Dispatched => {
1511 binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1512 }
1513 TriggerDispatchOutcome::Failed => {
1514 binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1515 }
1516 TriggerDispatchOutcome::Dlq => {
1517 binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1518 }
1519 }
1520 Ok(())
1521 })?;
1522
1523 unpin_trigger_binding(id, version).await
1524}
1525
1526impl TriggerRegistry {
1527 fn refresh_runtime_context(&mut self) {
1528 if self.event_log.is_none() {
1529 self.event_log = active_event_log();
1530 }
1531 if self.secret_provider.is_none() {
1532 self.secret_provider = default_secret_provider();
1533 }
1534 }
1535
1536 fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
1537 self.bindings
1538 .get(id)
1539 .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
1540 .cloned()
1541 }
1542
1543 fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
1544 self.bindings
1545 .get(id)
1546 .into_iter()
1547 .flat_map(|bindings| bindings.iter())
1548 .filter(|binding| {
1549 binding.source == source && binding.state_snapshot() != TriggerState::Terminated
1550 })
1551 .cloned()
1552 .collect()
1553 }
1554
1555 fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
1556 self.bindings
1557 .get(id)
1558 .into_iter()
1559 .flat_map(|bindings| bindings.iter())
1560 .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
1561 .cloned()
1562 .collect()
1563 }
1564
1565 fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
1566 if let Some(version) = self
1567 .bindings
1568 .get(spec.id.as_str())
1569 .into_iter()
1570 .flat_map(|bindings| bindings.iter())
1571 .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
1572 .map(|binding| binding.version)
1573 {
1574 return version;
1575 }
1576
1577 let historical =
1578 self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
1579 if let Some(version) = historical.matching_version {
1580 return version;
1581 }
1582
1583 self.bindings
1584 .get(spec.id.as_str())
1585 .into_iter()
1586 .flat_map(|bindings| bindings.iter())
1587 .map(|binding| binding.version)
1588 .chain(historical.max_version)
1589 .max()
1590 .unwrap_or(0)
1591 + 1
1592 }
1593
1594 fn gc_terminated_versions(&mut self, id: &str) {
1595 let Some(bindings) = self.bindings.get_mut(id) else {
1596 return;
1597 };
1598
1599 let mut newest_versions: Vec<u32> =
1600 bindings.iter().map(|binding| binding.version).collect();
1601 newest_versions.sort_unstable_by(|left, right| right.cmp(left));
1602 newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
1603 let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();
1604
1605 bindings.retain(|binding| {
1606 binding.state_snapshot() != TriggerState::Terminated
1607 || retained_versions.contains(&binding.version)
1608 });
1609
1610 if bindings.is_empty() {
1611 self.bindings.remove(id);
1612 }
1613 }
1614
1615 fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
1616 let mut lookup = HistoricalVersionLookup::default();
1617 for record in self.lifecycle_records_for(id) {
1618 lookup.max_version = Some(
1619 lookup
1620 .max_version
1621 .unwrap_or(0)
1622 .max(record.transition.version),
1623 );
1624 if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
1625 lookup.matching_version = Some(record.transition.version);
1626 }
1627 }
1628 lookup
1629 }
1630
1631 fn binding_version_as_of(
1632 &self,
1633 id: &str,
1634 as_of: OffsetDateTime,
1635 ) -> Result<u32, TriggerRegistryError> {
1636 let cutoff_ms = harn_clock::offset_datetime_to_ms(as_of);
1637 let mut active_version = None;
1638 for record in self.lifecycle_records_for(id) {
1639 if record.occurred_at_ms > cutoff_ms {
1640 break;
1641 }
1642 match record.transition.to_state {
1643 TriggerState::Active => active_version = Some(record.transition.version),
1644 TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {
1645 if active_version == Some(record.transition.version) {
1646 active_version = None;
1647 }
1648 }
1649 TriggerState::Registering => {}
1650 }
1651 }
1652
1653 active_version.ok_or_else(|| {
1654 TriggerRegistryError::InvalidSpec(format!(
1655 "no active trigger binding '{}' at {}",
1656 id,
1657 as_of
1658 .format(&time::format_description::well_known::Rfc3339)
1659 .unwrap_or_else(|_| as_of.to_string())
1660 ))
1661 })
1662 }
1663
1664 fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
1665 let Some(event_log) = self.event_log.as_ref() else {
1666 return Vec::new();
1667 };
1668 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1669 .expect("static triggers.lifecycle topic should always be valid");
1670 futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
1671 .unwrap_or_default()
1672 .into_iter()
1673 .filter_map(|(_, event)| {
1674 let occurred_at_ms = event.occurred_at_ms;
1675 let transition: LifecycleStateTransitionRecord =
1676 serde_json::from_value(event.payload).ok()?;
1677 (transition.id == id).then_some(HistoricalLifecycleRecord {
1678 occurred_at_ms,
1679 transition,
1680 })
1681 })
1682 .collect()
1683 }
1684
1685 #[allow(clippy::arc_with_non_send_sync)]
1686 fn register_binding(
1687 &mut self,
1688 spec: TriggerBindingSpec,
1689 version: u32,
1690 lifecycle: &mut Vec<LogEvent>,
1691 ) -> Arc<TriggerBinding> {
1692 let binding = Arc::new(TriggerBinding::new(spec, version));
1693 self.by_provider
1694 .entry(binding.provider.as_str().to_string())
1695 .or_default()
1696 .insert(binding.id.as_str().to_string());
1697 self.bindings
1698 .entry(binding.id.as_str().to_string())
1699 .or_default()
1700 .push(binding.clone());
1701 lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
1702 self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
1703 binding
1704 }
1705
1706 fn transition_binding_to_draining(
1707 &self,
1708 binding: &Arc<TriggerBinding>,
1709 lifecycle: &mut Vec<LogEvent>,
1710 ) {
1711 if matches!(binding.state_snapshot(), TriggerState::Terminated) {
1712 return;
1713 }
1714 self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
1715 self.maybe_finalize_draining(binding, lifecycle);
1716 }
1717
1718 fn maybe_finalize_draining(
1719 &self,
1720 binding: &Arc<TriggerBinding>,
1721 lifecycle: &mut Vec<LogEvent>,
1722 ) {
1723 if binding.state_snapshot() == TriggerState::Draining
1724 && binding.in_flight.load(Ordering::Relaxed) == 0
1725 {
1726 self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
1727 }
1728 }
1729
1730 fn transition_binding_state(
1731 &self,
1732 binding: &Arc<TriggerBinding>,
1733 next: TriggerState,
1734 lifecycle: &mut Vec<LogEvent>,
1735 ) {
1736 let mut state = binding.state.lock().expect("trigger state poisoned");
1737 let previous = *state;
1738 if previous == next {
1739 return;
1740 }
1741 *state = next;
1742 drop(state);
1743 if next == TriggerState::Terminated && binding.aggregation.is_some() {
1749 let _ = super::aggregation::drop_binding_aggregation(&binding.binding_key());
1750 }
1751 lifecycle.push(lifecycle_event(binding, Some(previous), next));
1752 }
1753}
1754
1755fn lifecycle_event(
1756 binding: &TriggerBinding,
1757 from_state: Option<TriggerState>,
1758 to_state: TriggerState,
1759) -> LogEvent {
1760 LogEvent::new(
1761 "state_transition",
1762 serde_json::json!({
1763 "id": binding.id.as_str(),
1764 "binding_key": binding.binding_key(),
1765 "version": binding.version,
1766 "provider": binding.provider.as_str(),
1767 "kind": &binding.kind,
1768 "source": binding.source.as_str(),
1769 "handler_kind": binding.handler.kind(),
1770 "definition_fingerprint": &binding.definition_fingerprint,
1771 "from_state": from_state.map(TriggerState::as_str),
1772 "to_state": to_state.as_str(),
1773 }),
1774 )
1775}
1776
1777async fn append_lifecycle_events(
1778 event_log: Option<Arc<AnyEventLog>>,
1779 events: Vec<LogEvent>,
1780) -> Result<(), TriggerRegistryError> {
1781 let Some(event_log) = event_log else {
1782 return Ok(());
1783 };
1784 if events.is_empty() {
1785 return Ok(());
1786 }
1787
1788 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1789 .expect("static triggers.lifecycle topic should always be valid");
1790 for event in events {
1791 event_log
1792 .append(&topic, event)
1793 .await
1794 .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1795 }
1796 Ok(())
1797}
1798
1799fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1800 configured_default_chain(default_secret_namespace())
1801 .ok()
1802 .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1803}
1804
/// Resolves the namespace used for the default secret provider.
///
/// `HARN_SECRET_NAMESPACE` wins when it is set to something other than
/// whitespace (the value is returned untrimmed). Otherwise the namespace is
/// `harn/<leaf>`, where `<leaf>` is the final component of the current
/// working directory, or `workspace` when that component is unavailable,
/// empty, or not valid UTF-8.
fn default_secret_namespace() -> String {
    let explicit = std::env::var("HARN_SECRET_NAMESPACE")
        .ok()
        .filter(|value| !value.trim().is_empty());

    explicit.unwrap_or_else(|| {
        // An unreadable working directory degrades to an empty path, which
        // has no file name and therefore falls back to "workspace".
        let cwd = std::env::current_dir().unwrap_or_default();
        let leaf = match cwd.file_name().and_then(|name| name.to_str()) {
            Some(name) if !name.is_empty() => name,
            _ => "workspace",
        };
        format!("harn/{leaf}")
    })
}
1820
/// Returns the current timestamp as reported by `clock::now_ms` (the `clock`
/// helper this file imports from `crate::triggers::test_util`).
///
/// NOTE(review): presumably Unix-epoch milliseconds — confirm against `clock`.
fn now_ms() -> i64 {
    clock::now_ms()
}
1824
1825#[cfg(test)]
1826mod tests {
1827 use super::*;
1828 use crate::event_log::{
1829 install_default_for_base_dir, pin_test_occurred_at_ms, reset_active_event_log,
1830 };
1831 use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
1832 use std::rc::Rc;
1833
1834 fn offset_from_ms(ms: i64) -> OffsetDateTime {
1839 OffsetDateTime::from_unix_timestamp_nanos((ms as i128) * 1_000_000)
1840 .expect("epoch ms within OffsetDateTime range")
1841 }
1842
1843 fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
1844 TriggerBindingSpec {
1845 id: id.to_string(),
1846 source: TriggerBindingSource::Manifest,
1847 kind: "webhook".to_string(),
1848 provider: ProviderId::from("github"),
1849 autonomy_tier: crate::AutonomyTier::ActAuto,
1850 handler: TriggerHandlerSpec::Worker {
1851 queue: format!("{id}-queue"),
1852 },
1853 dispatch_priority: crate::WorkerQueuePriority::Normal,
1854 when: None,
1855 when_budget: None,
1856 retry: TriggerRetryConfig::default(),
1857 match_events: vec!["issues.opened".to_string()],
1858 dedupe_key: Some("event.dedupe_key".to_string()),
1859 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1860 filter: Some("event.kind".to_string()),
1861 daily_cost_usd: Some(5.0),
1862 hourly_cost_usd: None,
1863 max_autonomous_decisions_per_hour: None,
1864 max_autonomous_decisions_per_day: None,
1865 on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1866 max_concurrent: Some(10),
1867 flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1868 aggregation: None,
1869 manifest_path: None,
1870 package_name: Some("workspace".to_string()),
1871 definition_fingerprint: fingerprint.to_string(),
1872 }
1873 }
1874
1875 fn dynamic_spec(id: &str) -> TriggerBindingSpec {
1876 TriggerBindingSpec {
1877 id: id.to_string(),
1878 source: TriggerBindingSource::Dynamic,
1879 kind: "webhook".to_string(),
1880 provider: ProviderId::from("github"),
1881 autonomy_tier: crate::AutonomyTier::ActAuto,
1882 handler: TriggerHandlerSpec::Worker {
1883 queue: format!("{id}-queue"),
1884 },
1885 dispatch_priority: crate::WorkerQueuePriority::Normal,
1886 when: None,
1887 when_budget: None,
1888 retry: TriggerRetryConfig::default(),
1889 match_events: vec!["issues.opened".to_string()],
1890 dedupe_key: None,
1891 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1892 filter: None,
1893 daily_cost_usd: None,
1894 hourly_cost_usd: None,
1895 max_autonomous_decisions_per_hour: None,
1896 max_autonomous_decisions_per_day: None,
1897 on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1898 max_concurrent: None,
1899 flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1900 aggregation: None,
1901 manifest_path: None,
1902 package_name: None,
1903 definition_fingerprint: format!("dynamic:{id}"),
1904 }
1905 }
1906
1907 #[tokio::test(flavor = "current_thread")]
1908 async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
1909 clear_trigger_registry();
1910
1911 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1912 .await
1913 .expect("manifest trigger installs");
1914
1915 let snapshots = snapshot_trigger_bindings();
1916 assert_eq!(snapshots.len(), 1);
1917 let binding = &snapshots[0];
1918 assert_eq!(binding.id, "github-new-issue");
1919 assert_eq!(binding.version, 1);
1920 assert_eq!(binding.state, TriggerState::Active);
1921 assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());
1922
1923 clear_trigger_registry();
1924 }
1925
1926 #[tokio::test(flavor = "current_thread")]
1927 async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
1928 clear_trigger_registry();
1929
1930 let first = dynamic_register(dynamic_spec("dynamic-a"))
1931 .await
1932 .expect("first dynamic trigger");
1933 let second = dynamic_register(dynamic_spec("dynamic-b"))
1934 .await
1935 .expect("second dynamic trigger");
1936 assert_ne!(first, second);
1937
1938 let error = dynamic_register(dynamic_spec("dynamic-a"))
1939 .await
1940 .expect_err("duplicate id should fail");
1941 assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));
1942
1943 clear_trigger_registry();
1944 }
1945
1946 #[test]
1947 fn expected_predicate_cost_average_does_not_overflow() {
1948 let binding = TriggerBinding::new(manifest_spec("costed", "v1"), 1);
1949 record_predicate_cost_sample(&binding, u64::MAX);
1950 record_predicate_cost_sample(&binding, u64::MAX);
1951
1952 assert_eq!(expected_predicate_cost_usd_micros(&binding), u64::MAX);
1953 }
1954
1955 #[tokio::test(flavor = "current_thread")]
1956 async fn drain_waits_for_in_flight_events_before_terminating() {
1957 clear_trigger_registry();
1958
1959 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1960 .await
1961 .expect("manifest trigger installs");
1962 begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1963
1964 drain("github-new-issue").await.expect("drain succeeds");
1965 let binding = snapshot_trigger_bindings()
1966 .into_iter()
1967 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1968 .expect("binding snapshot");
1969 assert_eq!(binding.state, TriggerState::Draining);
1970 assert_eq!(binding.metrics.in_flight, 1);
1971
1972 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1973 .await
1974 .expect("finish in-flight event");
1975 let binding = snapshot_trigger_bindings()
1976 .into_iter()
1977 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1978 .expect("binding snapshot");
1979 assert_eq!(binding.state, TriggerState::Terminated);
1980 assert_eq!(binding.metrics.in_flight, 0);
1981
1982 clear_trigger_registry();
1983 }
1984
1985 #[tokio::test(flavor = "current_thread")]
1986 async fn hot_reload_registers_new_version_while_old_binding_drains() {
1987 clear_trigger_registry();
1988
1989 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1990 .await
1991 .expect("initial manifest trigger installs");
1992 begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1993
1994 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1995 .await
1996 .expect("updated manifest trigger installs");
1997
1998 let snapshots = snapshot_trigger_bindings();
1999 assert_eq!(snapshots.len(), 2);
2000 let old = snapshots
2001 .iter()
2002 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
2003 .expect("old binding");
2004 let new = snapshots
2005 .iter()
2006 .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
2007 .expect("new binding");
2008 assert_eq!(old.state, TriggerState::Draining);
2009 assert_eq!(new.state, TriggerState::Active);
2010
2011 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
2012 .await
2013 .expect("finish old in-flight event");
2014 let old = snapshot_trigger_bindings()
2015 .into_iter()
2016 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
2017 .expect("old binding");
2018 assert_eq!(old.state, TriggerState::Terminated);
2019
2020 clear_trigger_registry();
2021 }
2022
2023 #[tokio::test(flavor = "current_thread")]
2024 async fn gc_drops_terminated_versions_beyond_retention_limit() {
2025 clear_trigger_registry();
2026
2027 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2028 .await
2029 .expect("install v1");
2030 begin_in_flight("github-new-issue", 1).expect("pin v1");
2031
2032 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2033 .await
2034 .expect("install v2");
2035 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
2036 .await
2037 .expect("finish v1");
2038
2039 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
2040 .await
2041 .expect("install v3");
2042
2043 let snapshots = snapshot_trigger_bindings();
2044 let versions: Vec<u32> = snapshots
2045 .into_iter()
2046 .filter(|binding| binding.id == "github-new-issue")
2047 .map(|binding| binding.version)
2048 .collect();
2049 assert_eq!(versions, vec![2, 3]);
2050
2051 clear_trigger_registry();
2052 }
2053
2054 #[tokio::test(flavor = "current_thread")]
2055 async fn lifecycle_transitions_append_to_event_log() {
2056 clear_trigger_registry();
2057 reset_active_event_log();
2058 let tempdir = tempfile::tempdir().expect("tempdir");
2059 let log = install_default_for_base_dir(tempdir.path()).expect("install event log");
2060
2061 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2062 .await
2063 .expect("manifest trigger installs");
2064 begin_in_flight("github-new-issue", 1).expect("start in-flight event");
2065 drain("github-new-issue").await.expect("drain succeeds");
2066 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
2067 .await
2068 .expect("finish event");
2069
2070 let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
2071 let events = log
2072 .read_range(&topic, None, 32)
2073 .await
2074 .expect("read lifecycle events");
2075 let states: Vec<String> = events
2076 .into_iter()
2077 .filter_map(|(_, event)| {
2078 event
2079 .payload
2080 .get("to_state")
2081 .and_then(|value| value.as_str())
2082 .map(|value| value.to_string())
2083 })
2084 .collect();
2085 assert_eq!(
2086 states,
2087 vec![
2088 "registering".to_string(),
2089 "active".to_string(),
2090 "draining".to_string(),
2091 "terminated".to_string(),
2092 ]
2093 );
2094
2095 reset_active_event_log();
2096 clear_trigger_registry();
2097 }
2098
2099 #[tokio::test(flavor = "current_thread")]
2100 async fn version_history_reuses_historical_version_after_restart() {
2101 clear_trigger_registry();
2102 reset_active_event_log();
2103 let tempdir = tempfile::tempdir().expect("tempdir");
2104 install_default_for_base_dir(tempdir.path()).expect("install event log");
2105
2106 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2107 .await
2108 .expect("initial manifest trigger installs");
2109 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2110 .await
2111 .expect("updated manifest trigger installs");
2112
2113 clear_trigger_registry();
2114 reset_active_event_log();
2115 install_default_for_base_dir(tempdir.path()).expect("reopen event log");
2116
2117 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2118 .await
2119 .expect("manifest reload reuses historical version");
2120
2121 let binding = snapshot_trigger_bindings()
2122 .into_iter()
2123 .find(|binding| binding.id == "github-new-issue")
2124 .expect("binding snapshot");
2125 assert_eq!(binding.version, 2);
2126
2127 reset_active_event_log();
2128 clear_trigger_registry();
2129 }
2130
2131 #[tokio::test(flavor = "current_thread")]
2132 async fn binding_version_as_of_reports_historical_active_version() {
2133 clear_trigger_registry();
2134 reset_active_event_log();
2135 let tempdir = tempfile::tempdir().expect("tempdir");
2136 install_default_for_base_dir(tempdir.path()).expect("install event log");
2137
2138 const T1_MS: i64 = 1_700_000_000_000;
2143 const T2_MS: i64 = T1_MS + 50;
2144 let before_reload = offset_from_ms(T1_MS);
2145 let after_reload = offset_from_ms(T2_MS);
2146
2147 let clock = pin_test_occurred_at_ms(T1_MS);
2148 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2149 .await
2150 .expect("initial manifest trigger installs");
2151 drop(clock);
2152
2153 let clock = pin_test_occurred_at_ms(T2_MS);
2154 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2155 .await
2156 .expect("updated manifest trigger installs");
2157 drop(clock);
2158
2159 assert_eq!(
2160 binding_version_as_of("github-new-issue", before_reload)
2161 .expect("version before reload"),
2162 1
2163 );
2164 assert_eq!(
2165 binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
2166 2
2167 );
2168
2169 reset_active_event_log();
2170 clear_trigger_registry();
2171 }
2172
2173 #[tokio::test(flavor = "current_thread")]
2174 async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
2175 clear_trigger_registry();
2176 reset_active_event_log();
2177 let sink = Rc::new(CollectorSink::new());
2178 clear_event_sinks();
2179 add_event_sink(sink.clone());
2180 let tempdir = tempfile::tempdir().expect("tempdir");
2181 install_default_for_base_dir(tempdir.path()).expect("install event log");
2182
2183 const T1_MS: i64 = 1_700_000_000_000;
2187 const T2_MS: i64 = T1_MS + 50;
2188 let received_at = offset_from_ms(T1_MS);
2189
2190 let clock = pin_test_occurred_at_ms(T1_MS);
2191 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2192 .await
2193 .expect("install v1");
2194 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2195 .await
2196 .expect("install v2");
2197 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
2198 .await
2199 .expect("install v3");
2200 drop(clock);
2201
2202 let clock = pin_test_occurred_at_ms(T2_MS);
2203 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
2204 .await
2205 .expect("install v4");
2206 drop(clock);
2207
2208 let binding = resolve_live_or_as_of(
2209 "github-new-issue",
2210 RecordedTriggerBinding {
2211 version: 1,
2212 received_at,
2213 },
2214 )
2215 .expect("resolve fallback binding");
2216 assert_eq!(binding.version, 3);
2217
2218 let warning = sink
2219 .logs
2220 .borrow()
2221 .iter()
2222 .find(|log| log.category == "replay.binding_version_gc_fallback")
2223 .cloned()
2224 .expect("gc fallback warning");
2225 assert_eq!(warning.level, EventLevel::Warn);
2226 assert_eq!(
2227 warning.metadata.get("trigger_id"),
2228 Some(&serde_json::json!("github-new-issue"))
2229 );
2230 assert_eq!(
2231 warning.metadata.get("recorded_version"),
2232 Some(&serde_json::json!(1))
2233 );
2234 assert_eq!(
2235 warning.metadata.get("received_at"),
2236 Some(&serde_json::json!(received_at
2237 .format(&time::format_description::well_known::Rfc3339)
2238 .unwrap_or_else(|_| received_at.to_string())))
2239 );
2240 assert_eq!(
2241 warning.metadata.get("resolved_version"),
2242 Some(&serde_json::json!(3))
2243 );
2244
2245 clear_event_sinks();
2246 crate::events::reset_event_sinks();
2247 reset_active_event_log();
2248 clear_trigger_registry();
2249 }
2250}