Skip to main content

harn_vm/triggers/
registry.rs

1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use time::OffsetDateTime;
10use uuid::Uuid;
11
12use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
13use crate::llm::trigger_predicate::TriggerPredicateBudget;
14use crate::secrets::{configured_default_chain, SecretProvider};
15use crate::triggers::test_util::clock;
16use crate::trust_graph::AutonomyTier;
17use crate::value::VmClosure;
18
19use super::dispatcher::TriggerRetryConfig;
20use super::flow_control::TriggerFlowControlConfig;
21use super::ProviderId;
22
23#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
24pub struct TriggerId(String);
25
26impl TriggerId {
27    pub fn new(value: impl Into<String>) -> Self {
28        Self(value.into())
29    }
30
31    pub fn as_str(&self) -> &str {
32        &self.0
33    }
34}
35
36impl std::fmt::Display for TriggerId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        self.0.fmt(f)
39    }
40}
41
42#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub enum TriggerState {
45    Registering,
46    Active,
47    Draining,
48    Terminated,
49}
50
51impl TriggerState {
52    pub fn as_str(self) -> &'static str {
53        match self {
54            Self::Registering => "registering",
55            Self::Active => "active",
56            Self::Draining => "draining",
57            Self::Terminated => "terminated",
58        }
59    }
60}
61
62#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum TriggerBindingSource {
65    Manifest,
66    Dynamic,
67}
68
69impl TriggerBindingSource {
70    pub fn as_str(self) -> &'static str {
71        match self {
72            Self::Manifest => "manifest",
73            Self::Dynamic => "dynamic",
74        }
75    }
76}
77
78#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
79#[serde(rename_all = "snake_case")]
80pub enum TriggerBudgetExhaustionStrategy {
81    #[default]
82    False,
83    RetryLater,
84    Fail,
85    Warn,
86}
87
88impl TriggerBudgetExhaustionStrategy {
89    pub fn as_str(self) -> &'static str {
90        match self {
91            Self::False => "false",
92            Self::RetryLater => "retry_later",
93            Self::Fail => "fail",
94            Self::Warn => "warn",
95        }
96    }
97}
98
99#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
100pub struct OrchestratorBudgetConfig {
101    pub daily_cost_usd: Option<f64>,
102    pub hourly_cost_usd: Option<f64>,
103}
104
105#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
106pub struct OrchestratorBudgetSnapshot {
107    pub daily_cost_usd: Option<f64>,
108    pub hourly_cost_usd: Option<f64>,
109    pub cost_today_usd_micros: u64,
110    pub cost_hour_usd_micros: u64,
111    pub day_utc: i32,
112    pub hour_utc: i64,
113}
114
115#[derive(Debug)]
116struct OrchestratorBudgetState {
117    config: OrchestratorBudgetConfig,
118    day_utc: i32,
119    hour_utc: i64,
120    cost_today_usd_micros: u64,
121    cost_hour_usd_micros: u64,
122}
123
124impl Default for OrchestratorBudgetState {
125    fn default() -> Self {
126        Self {
127            config: OrchestratorBudgetConfig::default(),
128            day_utc: utc_day_key(),
129            hour_utc: utc_hour_key(),
130            cost_today_usd_micros: 0,
131            cost_hour_usd_micros: 0,
132        }
133    }
134}
135
136#[derive(Clone)]
137pub enum TriggerHandlerSpec {
138    Local {
139        raw: String,
140        closure: Rc<VmClosure>,
141    },
142    A2a {
143        target: String,
144        allow_cleartext: bool,
145    },
146    Worker {
147        queue: String,
148    },
149}
150
151impl std::fmt::Debug for TriggerHandlerSpec {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        match self {
154            Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
155            Self::A2a {
156                target,
157                allow_cleartext,
158            } => f
159                .debug_struct("A2a")
160                .field("target", target)
161                .field("allow_cleartext", allow_cleartext)
162                .finish(),
163            Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
164        }
165    }
166}
167
168impl TriggerHandlerSpec {
169    pub fn kind(&self) -> &'static str {
170        match self {
171            Self::Local { .. } => "local",
172            Self::A2a { .. } => "a2a",
173            Self::Worker { .. } => "worker",
174        }
175    }
176}
177
178#[derive(Clone)]
179pub struct TriggerPredicateSpec {
180    pub raw: String,
181    pub closure: Rc<VmClosure>,
182}
183
184impl std::fmt::Debug for TriggerPredicateSpec {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        f.debug_struct("TriggerPredicateSpec")
187            .field("raw", &self.raw)
188            .finish()
189    }
190}
191
192#[derive(Clone, Debug)]
193pub struct TriggerBindingSpec {
194    pub id: String,
195    pub source: TriggerBindingSource,
196    pub kind: String,
197    pub provider: ProviderId,
198    pub autonomy_tier: AutonomyTier,
199    pub handler: TriggerHandlerSpec,
200    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
201    pub when: Option<TriggerPredicateSpec>,
202    pub when_budget: Option<TriggerPredicateBudget>,
203    pub retry: TriggerRetryConfig,
204    pub match_events: Vec<String>,
205    pub dedupe_key: Option<String>,
206    pub dedupe_retention_days: u32,
207    pub filter: Option<String>,
208    pub daily_cost_usd: Option<f64>,
209    pub hourly_cost_usd: Option<f64>,
210    pub max_autonomous_decisions_per_hour: Option<u64>,
211    pub max_autonomous_decisions_per_day: Option<u64>,
212    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
213    pub max_concurrent: Option<u32>,
214    pub flow_control: TriggerFlowControlConfig,
215    pub manifest_path: Option<PathBuf>,
216    pub package_name: Option<String>,
217    pub definition_fingerprint: String,
218}
219
/// Live counters of a binding, updated in place through atomics and a mutex.
///
/// `TriggerBinding::metrics_snapshot` copies these into a plain
/// `TriggerMetricsSnapshot`.
#[derive(Debug)]
pub struct TriggerMetrics {
    pub received: AtomicU64,
    pub dispatched: AtomicU64,
    pub failed: AtomicU64,
    pub dlq: AtomicU64,
    pub last_received_ms: Mutex<Option<i64>>,
    pub cost_total_usd_micros: AtomicU64,
    /// Zeroed by `reset_binding_budget_windows` when the UTC day key changes.
    pub cost_today_usd_micros: AtomicU64,
    /// Zeroed by `reset_binding_budget_windows` when the UTC hour key changes.
    pub cost_hour_usd_micros: AtomicU64,
    pub autonomous_decisions_total: AtomicU64,
    /// Zeroed by `reset_binding_budget_windows` when the UTC day key changes.
    pub autonomous_decisions_today: AtomicU64,
    /// Zeroed by `reset_binding_budget_windows` when the UTC hour key changes.
    pub autonomous_decisions_hour: AtomicU64,
}
234
235impl Default for TriggerMetrics {
236    fn default() -> Self {
237        Self {
238            received: AtomicU64::new(0),
239            dispatched: AtomicU64::new(0),
240            failed: AtomicU64::new(0),
241            dlq: AtomicU64::new(0),
242            last_received_ms: Mutex::new(None),
243            cost_total_usd_micros: AtomicU64::new(0),
244            cost_today_usd_micros: AtomicU64::new(0),
245            cost_hour_usd_micros: AtomicU64::new(0),
246            autonomous_decisions_total: AtomicU64::new(0),
247            autonomous_decisions_today: AtomicU64::new(0),
248            autonomous_decisions_hour: AtomicU64::new(0),
249        }
250    }
251}
252
253#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
254pub struct TriggerMetricsSnapshot {
255    pub received: u64,
256    pub dispatched: u64,
257    pub failed: u64,
258    pub dlq: u64,
259    pub in_flight: u64,
260    pub last_received_ms: Option<i64>,
261    pub cost_total_usd_micros: u64,
262    pub cost_today_usd_micros: u64,
263    pub cost_hour_usd_micros: u64,
264    pub autonomous_decisions_total: u64,
265    pub autonomous_decisions_today: u64,
266    pub autonomous_decisions_hour: u64,
267}
268
269pub struct TriggerBinding {
270    pub id: TriggerId,
271    pub version: u32,
272    pub source: TriggerBindingSource,
273    pub kind: String,
274    pub provider: ProviderId,
275    pub autonomy_tier: AutonomyTier,
276    pub handler: TriggerHandlerSpec,
277    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
278    pub when: Option<TriggerPredicateSpec>,
279    pub when_budget: Option<TriggerPredicateBudget>,
280    pub retry: TriggerRetryConfig,
281    pub match_events: Vec<String>,
282    pub dedupe_key: Option<String>,
283    pub dedupe_retention_days: u32,
284    pub filter: Option<String>,
285    pub daily_cost_usd: Option<f64>,
286    pub hourly_cost_usd: Option<f64>,
287    pub max_autonomous_decisions_per_hour: Option<u64>,
288    pub max_autonomous_decisions_per_day: Option<u64>,
289    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
290    pub max_concurrent: Option<u32>,
291    pub flow_control: TriggerFlowControlConfig,
292    pub manifest_path: Option<PathBuf>,
293    pub package_name: Option<String>,
294    pub definition_fingerprint: String,
295    pub state: Mutex<TriggerState>,
296    pub metrics: TriggerMetrics,
297    pub in_flight: AtomicU64,
298    pub cancel_token: Arc<AtomicBool>,
299    pub predicate_state: Mutex<TriggerPredicateState>,
300}
301
/// Mutable per-binding predicate and budget-window state, guarded by
/// `TriggerBinding::predicate_state`.
#[derive(Clone, Debug, Default)]
pub struct TriggerPredicateState {
    /// Day key the binding's daily counters were last reset for.
    pub budget_day_utc: Option<i32>,
    /// Hour key the binding's hourly counters were last reset for.
    pub budget_hour_utc: Option<i64>,
    pub consecutive_failures: u32,
    pub breaker_open_until_ms: Option<i64>,
    /// Most recent predicate cost samples in micro-USD, oldest first; capped
    /// at `PREDICATE_COST_WINDOW` entries by `record_predicate_cost_sample`.
    pub recent_cost_usd_micros: VecDeque<u64>,
}
310
311impl std::fmt::Debug for TriggerBinding {
312    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313        f.debug_struct("TriggerBinding")
314            .field("id", &self.id)
315            .field("version", &self.version)
316            .field("source", &self.source)
317            .field("kind", &self.kind)
318            .field("provider", &self.provider)
319            .field("handler_kind", &self.handler.kind())
320            .field("state", &self.state_snapshot())
321            .finish()
322    }
323}
324
325impl TriggerBinding {
326    pub fn snapshot(&self) -> TriggerBindingSnapshot {
327        TriggerBindingSnapshot {
328            id: self.id.as_str().to_string(),
329            version: self.version,
330            source: self.source,
331            kind: self.kind.clone(),
332            provider: self.provider.as_str().to_string(),
333            autonomy_tier: self.autonomy_tier,
334            handler_kind: self.handler.kind().to_string(),
335            state: self.state_snapshot(),
336            metrics: self.metrics_snapshot(),
337            daily_cost_usd: self.daily_cost_usd,
338            hourly_cost_usd: self.hourly_cost_usd,
339            max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
340            max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
341            on_budget_exhausted: self.on_budget_exhausted,
342        }
343    }
344
345    fn new(spec: TriggerBindingSpec, version: u32) -> Self {
346        Self {
347            id: TriggerId::new(spec.id),
348            version,
349            source: spec.source,
350            kind: spec.kind,
351            provider: spec.provider,
352            autonomy_tier: spec.autonomy_tier,
353            handler: spec.handler,
354            dispatch_priority: spec.dispatch_priority,
355            when: spec.when,
356            when_budget: spec.when_budget,
357            retry: spec.retry,
358            match_events: spec.match_events,
359            dedupe_key: spec.dedupe_key,
360            dedupe_retention_days: spec.dedupe_retention_days,
361            filter: spec.filter,
362            daily_cost_usd: spec.daily_cost_usd,
363            hourly_cost_usd: spec.hourly_cost_usd,
364            max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
365            max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
366            on_budget_exhausted: spec.on_budget_exhausted,
367            max_concurrent: spec.max_concurrent,
368            flow_control: spec.flow_control,
369            manifest_path: spec.manifest_path,
370            package_name: spec.package_name,
371            definition_fingerprint: spec.definition_fingerprint,
372            state: Mutex::new(TriggerState::Registering),
373            metrics: TriggerMetrics::default(),
374            in_flight: AtomicU64::new(0),
375            cancel_token: Arc::new(AtomicBool::new(false)),
376            predicate_state: Mutex::new(TriggerPredicateState::default()),
377        }
378    }
379
380    pub fn binding_key(&self) -> String {
381        format!("{}@v{}", self.id.as_str(), self.version)
382    }
383
384    pub fn state_snapshot(&self) -> TriggerState {
385        *self.state.lock().expect("trigger state poisoned")
386    }
387
388    pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
389        TriggerMetricsSnapshot {
390            received: self.metrics.received.load(Ordering::Relaxed),
391            dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
392            failed: self.metrics.failed.load(Ordering::Relaxed),
393            dlq: self.metrics.dlq.load(Ordering::Relaxed),
394            in_flight: self.in_flight.load(Ordering::Relaxed),
395            last_received_ms: *self
396                .metrics
397                .last_received_ms
398                .lock()
399                .expect("trigger metrics poisoned"),
400            cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
401            cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
402            cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
403            autonomous_decisions_total: self
404                .metrics
405                .autonomous_decisions_total
406                .load(Ordering::Relaxed),
407            autonomous_decisions_today: self
408                .metrics
409                .autonomous_decisions_today
410                .load(Ordering::Relaxed),
411            autonomous_decisions_hour: self
412                .metrics
413                .autonomous_decisions_hour
414                .load(Ordering::Relaxed),
415        }
416    }
417}
418
419#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
420pub struct TriggerBindingSnapshot {
421    pub id: String,
422    pub version: u32,
423    pub source: TriggerBindingSource,
424    pub kind: String,
425    pub provider: String,
426    pub autonomy_tier: AutonomyTier,
427    pub handler_kind: String,
428    pub state: TriggerState,
429    pub metrics: TriggerMetricsSnapshot,
430    pub daily_cost_usd: Option<f64>,
431    pub hourly_cost_usd: Option<f64>,
432    pub max_autonomous_decisions_per_hour: Option<u64>,
433    pub max_autonomous_decisions_per_day: Option<u64>,
434    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
435}
436
/// Result category of a trigger dispatch.
// NOTE(review): the variants line up with the `dispatched`, `failed` and `dlq`
// counters of `TriggerMetrics`; the code that maps them is outside this section.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TriggerDispatchOutcome {
    Dispatched,
    Failed,
    Dlq,
}
443
/// Errors returned by trigger registry operations.
#[derive(Debug)]
pub enum TriggerRegistryError {
    /// The same trigger id was supplied more than once.
    DuplicateId(String),
    /// A spec was rejected; the payload is the full message shown to the user.
    InvalidSpec(String),
    /// No binding exists for the given trigger id.
    UnknownId(String),
    /// The id is known but the requested version could not be resolved.
    UnknownBindingVersion { id: String, version: u32 },
    /// Event-log failure; the payload is the full message shown to the user.
    EventLog(String),
}
452
453impl std::fmt::Display for TriggerRegistryError {
454    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455        match self {
456            Self::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
457            Self::InvalidSpec(message) | Self::EventLog(message) => f.write_str(message),
458            Self::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
459            Self::UnknownBindingVersion { id, version } => {
460                write!(f, "unknown trigger binding '{id}' version {version}")
461            }
462        }
463    }
464}
465
466impl std::error::Error for TriggerRegistryError {}
467
468#[derive(Default)]
469pub struct TriggerRegistry {
470    bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
471    by_provider: BTreeMap<String, BTreeSet<String>>,
472    event_log: Option<Arc<AnyEventLog>>,
473    secret_provider: Option<Arc<dyn SecretProvider>>,
474}
475
476thread_local! {
477    static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
478}
479
480thread_local! {
481    static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
482        RefCell::new(OrchestratorBudgetState::default());
483}
484
// NOTE(review): not referenced in this part of the file; presumably the number
// of terminated versions retained per trigger id. Confirm at the use site.
const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;

// NOTE(review): presumably the event-log topic for lifecycle transitions;
// confirm at the use site.
const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
/// Maximum number of predicate cost samples kept per binding.
const PREDICATE_COST_WINDOW: usize = 100;
489
490#[derive(Clone, Debug, Deserialize)]
491struct LifecycleStateTransitionRecord {
492    id: String,
493    version: u32,
494    #[serde(default)]
495    definition_fingerprint: Option<String>,
496    to_state: TriggerState,
497}
498
499#[derive(Clone, Debug)]
500struct HistoricalLifecycleRecord {
501    occurred_at_ms: i64,
502    transition: LifecycleStateTransitionRecord,
503}
504
505#[derive(Clone, Copy, Debug, PartialEq, Eq)]
506pub struct RecordedTriggerBinding {
507    pub version: u32,
508    pub received_at: OffsetDateTime,
509}
510
/// Pair of optional version numbers produced by a historical lookup.
// NOTE(review): the producer is outside this section; presumably
// `matching_version` is a version whose fingerprint matched and `max_version`
// is the highest version seen. Confirm at the use site.
#[derive(Clone, Copy, Debug, Default)]
struct HistoricalVersionLookup {
    matching_version: Option<u32>,
    max_version: Option<u32>,
}
516
517pub fn clear_trigger_registry() {
518    TRIGGER_REGISTRY.with(|slot| {
519        *slot.borrow_mut() = TriggerRegistry::default();
520    });
521    clear_orchestrator_budget();
522}
523
524pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
525    ORCHESTRATOR_BUDGET.with(|slot| {
526        let mut state = slot.borrow_mut();
527        rollover_orchestrator_budget(&mut state);
528        state.config = config;
529    });
530}
531
532pub fn clear_orchestrator_budget() {
533    ORCHESTRATOR_BUDGET.with(|slot| {
534        *slot.borrow_mut() = OrchestratorBudgetState::default();
535    });
536}
537
538pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
539    ORCHESTRATOR_BUDGET.with(|slot| {
540        let mut state = slot.borrow_mut();
541        rollover_orchestrator_budget(&mut state);
542        OrchestratorBudgetSnapshot {
543            daily_cost_usd: state.config.daily_cost_usd,
544            hourly_cost_usd: state.config.hourly_cost_usd,
545            cost_today_usd_micros: state.cost_today_usd_micros,
546            cost_hour_usd_micros: state.cost_hour_usd_micros,
547            day_utc: state.day_utc,
548            hour_utc: state.hour_utc,
549        }
550    })
551}
552
553pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
554    if cost_usd_micros == 0 {
555        return;
556    }
557    ORCHESTRATOR_BUDGET.with(|slot| {
558        let mut state = slot.borrow_mut();
559        rollover_orchestrator_budget(&mut state);
560        state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
561        state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
562    });
563}
564
565pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
566    ORCHESTRATOR_BUDGET.with(|slot| {
567        let mut state = slot.borrow_mut();
568        rollover_orchestrator_budget(&mut state);
569        if state.config.hourly_cost_usd.is_some_and(|limit| {
570            micros_to_usd(
571                state
572                    .cost_hour_usd_micros
573                    .saturating_add(expected_cost_usd_micros),
574            ) > limit
575        }) {
576            return Some("orchestrator_hourly_budget_exceeded");
577        }
578        if state.config.daily_cost_usd.is_some_and(|limit| {
579            micros_to_usd(
580                state
581                    .cost_today_usd_micros
582                    .saturating_add(expected_cost_usd_micros),
583            ) > limit
584        }) {
585            return Some("orchestrator_daily_budget_exceeded");
586        }
587        None
588    })
589}
590
591pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
592    let today = utc_day_key();
593    let hour = utc_hour_key();
594    let mut state = binding
595        .predicate_state
596        .lock()
597        .expect("trigger predicate state poisoned");
598    if state.budget_day_utc != Some(today) {
599        state.budget_day_utc = Some(today);
600        binding
601            .metrics
602            .cost_today_usd_micros
603            .store(0, Ordering::Relaxed);
604        binding
605            .metrics
606            .autonomous_decisions_today
607            .store(0, Ordering::Relaxed);
608    }
609    if state.budget_hour_utc != Some(hour) {
610        state.budget_hour_utc = Some(hour);
611        binding
612            .metrics
613            .cost_hour_usd_micros
614            .store(0, Ordering::Relaxed);
615        binding
616            .metrics
617            .autonomous_decisions_hour
618            .store(0, Ordering::Relaxed);
619    }
620}
621
622pub fn binding_budget_would_exceed(
623    binding: &TriggerBinding,
624    expected_cost_usd_micros: u64,
625) -> Option<&'static str> {
626    reset_binding_budget_windows(binding);
627    if binding.hourly_cost_usd.is_some_and(|limit| {
628        micros_to_usd(
629            binding
630                .metrics
631                .cost_hour_usd_micros
632                .load(Ordering::Relaxed)
633                .saturating_add(expected_cost_usd_micros),
634        ) > limit
635    }) {
636        return Some("hourly_budget_exceeded");
637    }
638    if binding.daily_cost_usd.is_some_and(|limit| {
639        micros_to_usd(
640            binding
641                .metrics
642                .cost_today_usd_micros
643                .load(Ordering::Relaxed)
644                .saturating_add(expected_cost_usd_micros),
645        ) > limit
646    }) {
647        return Some("daily_budget_exceeded");
648    }
649    None
650}
651
652pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
653    reset_binding_budget_windows(binding);
654    if binding
655        .max_autonomous_decisions_per_hour
656        .is_some_and(|limit| {
657            binding
658                .metrics
659                .autonomous_decisions_hour
660                .load(Ordering::Relaxed)
661                .saturating_add(1)
662                > limit
663        })
664    {
665        return Some("hourly_autonomy_budget_exceeded");
666    }
667    if binding
668        .max_autonomous_decisions_per_day
669        .is_some_and(|limit| {
670            binding
671                .metrics
672                .autonomous_decisions_today
673                .load(Ordering::Relaxed)
674                .saturating_add(1)
675                > limit
676        })
677    {
678        return Some("daily_autonomy_budget_exceeded");
679    }
680    None
681}
682
683pub fn note_autonomous_decision(binding: &TriggerBinding) {
684    reset_binding_budget_windows(binding);
685    binding
686        .metrics
687        .autonomous_decisions_total
688        .fetch_add(1, Ordering::Relaxed);
689    binding
690        .metrics
691        .autonomous_decisions_today
692        .fetch_add(1, Ordering::Relaxed);
693    binding
694        .metrics
695        .autonomous_decisions_hour
696        .fetch_add(1, Ordering::Relaxed);
697}
698
699pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
700    let state = binding
701        .predicate_state
702        .lock()
703        .expect("trigger predicate state poisoned");
704    if !state.recent_cost_usd_micros.is_empty() {
705        let total: u64 = state.recent_cost_usd_micros.iter().copied().sum();
706        return total / state.recent_cost_usd_micros.len() as u64;
707    }
708    binding
709        .when_budget
710        .as_ref()
711        .and_then(|budget| budget.max_cost_usd)
712        .map(usd_to_micros)
713        .unwrap_or_default()
714}
715
716pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
717    let mut state = binding
718        .predicate_state
719        .lock()
720        .expect("trigger predicate state poisoned");
721    state.recent_cost_usd_micros.push_back(cost_usd_micros);
722    while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
723        state.recent_cost_usd_micros.pop_front();
724    }
725}
726
/// Converts a USD amount to micro-USD, rounding up.
///
/// Non-finite, zero and negative inputs yield `0`. Values too large for
/// `u64` saturate at `u64::MAX` (float-to-int `as` casts saturate).
pub fn usd_to_micros(value: f64) -> u64 {
    let is_positive_amount = value.is_finite() && value > 0.0;
    if is_positive_amount {
        let micros = (value * 1_000_000.0).ceil();
        micros as u64
    } else {
        0
    }
}
733
/// Converts a micro-USD amount to USD as a floating-point value.
pub fn micros_to_usd(value: u64) -> f64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    let micros = value as f64;
    micros / MICROS_PER_USD
}
737
738fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
739    let today = utc_day_key();
740    let hour = utc_hour_key();
741    if state.day_utc != today {
742        state.day_utc = today;
743        state.cost_today_usd_micros = 0;
744    }
745    if state.hour_utc != hour {
746        state.hour_utc = hour;
747        state.cost_hour_usd_micros = 0;
748    }
749}
750
751fn utc_day_key() -> i32 {
752    (clock::now_utc().date()
753        - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
754    .whole_days() as i32
755}
756
757fn utc_hour_key() -> i64 {
758    clock::now_utc().unix_timestamp() / 3_600
759}
760
761pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
762    TRIGGER_REGISTRY.with(|slot| {
763        let registry = slot.borrow();
764        let mut snapshots = Vec::new();
765        for bindings in registry.bindings.values() {
766            for binding in bindings {
767                snapshots.push(binding.snapshot());
768            }
769        }
770        snapshots.sort_by(|left, right| {
771            left.id
772                .cmp(&right.id)
773                .then(left.version.cmp(&right.version))
774                .then(left.state.as_str().cmp(right.state.as_str()))
775        });
776        snapshots
777    })
778}
779
780#[allow(clippy::arc_with_non_send_sync)]
781pub fn resolve_trigger_binding_as_of(
782    id: &str,
783    as_of: OffsetDateTime,
784) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
785    let version = binding_version_as_of(id, as_of)?;
786    resolve_trigger_binding_version(id, version)
787}
788
789#[allow(clippy::arc_with_non_send_sync)]
790pub fn resolve_live_or_as_of(
791    id: &str,
792    recorded: RecordedTriggerBinding,
793) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
794    match resolve_live_trigger_binding(id, Some(recorded.version)) {
795        Ok(binding) => Ok(binding),
796        Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
797            let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
798            let mut metadata = BTreeMap::new();
799            metadata.insert("trigger_id".to_string(), serde_json::json!(id));
800            metadata.insert(
801                "recorded_version".to_string(),
802                serde_json::json!(recorded.version),
803            );
804            metadata.insert(
805                "received_at".to_string(),
806                serde_json::json!(recorded
807                    .received_at
808                    .format(&time::format_description::well_known::Rfc3339)
809                    .unwrap_or_else(|_| recorded.received_at.to_string())),
810            );
811            metadata.insert(
812                "resolved_version".to_string(),
813                serde_json::json!(binding.version),
814            );
815            crate::events::log_warn_meta(
816                "replay.binding_version_gc_fallback",
817                "trigger replay fell back to lifecycle history after binding version GC",
818                metadata,
819            );
820            Ok(binding)
821        }
822        Err(error) => Err(error),
823    }
824}
825
826pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
827    TRIGGER_REGISTRY.with(|slot| {
828        let registry = slot.borrow();
829        registry.binding_version_as_of(id, as_of)
830    })
831}
832
833#[allow(clippy::arc_with_non_send_sync)]
834fn resolve_trigger_binding_version(
835    id: &str,
836    version: u32,
837) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
838    TRIGGER_REGISTRY.with(|slot| {
839        let registry = slot.borrow();
840        registry
841            .binding(id, version)
842            .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
843                id: id.to_string(),
844                version,
845            })
846    })
847}
848
849#[allow(clippy::arc_with_non_send_sync)]
850pub fn resolve_live_trigger_binding(
851    id: &str,
852    version: Option<u32>,
853) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
854    TRIGGER_REGISTRY.with(|slot| {
855        let registry = slot.borrow();
856        if let Some(version) = version {
857            let binding = registry.binding(id, version).ok_or_else(|| {
858                TriggerRegistryError::UnknownBindingVersion {
859                    id: id.to_string(),
860                    version,
861                }
862            })?;
863            if binding.state_snapshot() == TriggerState::Terminated {
864                return Err(TriggerRegistryError::UnknownBindingVersion {
865                    id: id.to_string(),
866                    version,
867                });
868            }
869            return Ok(binding);
870        }
871
872        registry
873            .live_bindings_any_source(id)
874            .into_iter()
875            .max_by_key(|binding| binding.version)
876            .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
877    })
878}
879
880pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
881    TRIGGER_REGISTRY.with(|slot| {
882        let registry = slot.borrow();
883        let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
884            return Vec::new();
885        };
886
887        let mut bindings = Vec::new();
888        for id in binding_ids {
889            let Some(versions) = registry.bindings.get(id) else {
890                continue;
891            };
892            for binding in versions {
893                if binding.state_snapshot() != TriggerState::Active {
894                    continue;
895                }
896                if !binding.match_events.is_empty()
897                    && !binding.match_events.iter().any(|kind| kind == &event.kind)
898                {
899                    continue;
900                }
901                bindings.push(binding.clone());
902            }
903        }
904
905        bindings.sort_by(|left, right| {
906            left.id
907                .as_str()
908                .cmp(right.id.as_str())
909                .then(left.version.cmp(&right.version))
910        });
911        bindings
912    })
913}
914
915pub async fn install_manifest_triggers(
916    specs: Vec<TriggerBindingSpec>,
917) -> Result<(), TriggerRegistryError> {
918    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
919        let registry = &mut *slot.borrow_mut();
920        registry.refresh_runtime_context();
921        let mut touched_ids = BTreeSet::new();
922
923        let mut incoming = BTreeMap::new();
924        for spec in specs {
925            let spec_id = spec.id.clone();
926            if spec.source != TriggerBindingSource::Manifest {
927                return Err(TriggerRegistryError::InvalidSpec(format!(
928                    "manifest install received non-manifest trigger '{}'",
929                    spec_id
930                )));
931            }
932            if spec_id.trim().is_empty() {
933                return Err(TriggerRegistryError::InvalidSpec(
934                    "manifest trigger id cannot be empty".to_string(),
935                ));
936            }
937            if incoming.insert(spec_id.clone(), spec).is_some() {
938                return Err(TriggerRegistryError::DuplicateId(spec_id));
939            }
940        }
941
942        let mut lifecycle = Vec::new();
943        let existing_ids: Vec<String> = registry
944            .bindings
945            .iter()
946            .filter(|(_, bindings)| {
947                bindings.iter().any(|binding| {
948                    binding.source == TriggerBindingSource::Manifest
949                        && binding.state_snapshot() != TriggerState::Terminated
950                })
951            })
952            .map(|(id, _)| id.clone())
953            .collect();
954
955        for id in existing_ids {
956            let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
957            let Some(spec) = incoming.remove(&id) else {
958                for binding in live_manifest {
959                    registry.transition_binding_to_draining(&binding, &mut lifecycle);
960                }
961                touched_ids.insert(id.clone());
962                continue;
963            };
964
965            let has_matching_active = live_manifest.iter().any(|binding| {
966                binding.definition_fingerprint == spec.definition_fingerprint
967                    && matches!(
968                        binding.state_snapshot(),
969                        TriggerState::Registering | TriggerState::Active
970                    )
971            });
972            if has_matching_active {
973                continue;
974            }
975
976            for binding in live_manifest {
977                registry.transition_binding_to_draining(&binding, &mut lifecycle);
978            }
979
980            let version = registry.next_version_for_spec(&spec);
981            registry.register_binding(spec, version, &mut lifecycle);
982            touched_ids.insert(id.clone());
983        }
984
985        for spec in incoming.into_values() {
986            touched_ids.insert(spec.id.clone());
987            let version = registry.next_version_for_spec(&spec);
988            registry.register_binding(spec, version, &mut lifecycle);
989        }
990
991        for id in touched_ids {
992            registry.gc_terminated_versions(&id);
993        }
994
995        Ok((registry.event_log.clone(), lifecycle))
996    })?;
997
998    append_lifecycle_events(event_log, events).await
999}
1000
1001pub async fn dynamic_register(
1002    mut spec: TriggerBindingSpec,
1003) -> Result<TriggerId, TriggerRegistryError> {
1004    if spec.id.trim().is_empty() {
1005        spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1006    }
1007    spec.source = TriggerBindingSource::Dynamic;
1008    let id = spec.id.clone();
1009    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1010        let registry = &mut *slot.borrow_mut();
1011        registry.refresh_runtime_context();
1012
1013        if registry.bindings.contains_key(id.as_str()) {
1014            return Err(TriggerRegistryError::DuplicateId(id.clone()));
1015        }
1016
1017        let mut lifecycle = Vec::new();
1018        let version = registry.next_version_for_spec(&spec);
1019        registry.register_binding(spec, version, &mut lifecycle);
1020        Ok((registry.event_log.clone(), lifecycle))
1021    })?;
1022
1023    append_lifecycle_events(event_log, events).await?;
1024    Ok(TriggerId::new(id))
1025}
1026
1027pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1028    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1029        let registry = &mut *slot.borrow_mut();
1030        let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1031        if live_dynamic.is_empty() {
1032            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1033        }
1034
1035        let mut lifecycle = Vec::new();
1036        for binding in live_dynamic {
1037            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1038        }
1039        Ok((registry.event_log.clone(), lifecycle))
1040    })?;
1041
1042    append_lifecycle_events(event_log, events).await
1043}
1044
1045pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1046    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1047        let registry = &mut *slot.borrow_mut();
1048        let live = registry.live_bindings_any_source(id);
1049        if live.is_empty() {
1050            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1051        }
1052
1053        let mut lifecycle = Vec::new();
1054        for binding in live {
1055            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1056        }
1057        Ok((registry.event_log.clone(), lifecycle))
1058    })?;
1059
1060    append_lifecycle_events(event_log, events).await
1061}
1062
1063fn pin_trigger_binding_inner(
1064    id: &str,
1065    version: u32,
1066    allow_terminated: bool,
1067) -> Result<(), TriggerRegistryError> {
1068    TRIGGER_REGISTRY.with(|slot| {
1069        let registry = slot.borrow();
1070        let binding = registry.binding(id, version).ok_or_else(|| {
1071            TriggerRegistryError::UnknownBindingVersion {
1072                id: id.to_string(),
1073                version,
1074            }
1075        })?;
1076        match binding.state_snapshot() {
1077            TriggerState::Terminated if !allow_terminated => {
1078                Err(TriggerRegistryError::InvalidSpec(format!(
1079                    "trigger binding '{}' version {} is terminated",
1080                    id, version
1081                )))
1082            }
1083            _ => {
1084                binding.in_flight.fetch_add(1, Ordering::Relaxed);
1085                Ok(())
1086            }
1087        }
1088    })
1089}
1090
1091pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1092    pin_trigger_binding_inner(id, version, false)
1093}
1094
1095pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1096    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1097        let registry = &mut *slot.borrow_mut();
1098        let binding = registry.binding(id, version).ok_or_else(|| {
1099            TriggerRegistryError::UnknownBindingVersion {
1100                id: id.to_string(),
1101                version,
1102            }
1103        })?;
1104        let current = binding.in_flight.load(Ordering::Relaxed);
1105        if current == 0 {
1106            return Err(TriggerRegistryError::InvalidSpec(format!(
1107                "trigger binding '{}' version {} has no in-flight events",
1108                id, version
1109            )));
1110        }
1111        binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1112
1113        let mut lifecycle = Vec::new();
1114        registry.maybe_finalize_draining(&binding, &mut lifecycle);
1115        registry.gc_terminated_versions(binding.id.as_str());
1116        Ok((registry.event_log.clone(), lifecycle))
1117    })?;
1118
1119    append_lifecycle_events(event_log, events).await
1120}
1121
1122pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1123    begin_in_flight_inner(id, version, false)
1124}
1125
1126pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1127    begin_in_flight_inner(id, version, true)
1128}
1129
1130fn begin_in_flight_inner(
1131    id: &str,
1132    version: u32,
1133    allow_terminated: bool,
1134) -> Result<(), TriggerRegistryError> {
1135    pin_trigger_binding_inner(id, version, allow_terminated)?;
1136    TRIGGER_REGISTRY.with(|slot| {
1137        let registry = slot.borrow();
1138        let binding = registry.binding(id, version).ok_or_else(|| {
1139            TriggerRegistryError::UnknownBindingVersion {
1140                id: id.to_string(),
1141                version,
1142            }
1143        })?;
1144        binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1145        *binding
1146            .metrics
1147            .last_received_ms
1148            .lock()
1149            .expect("trigger metrics poisoned") = Some(now_ms());
1150        Ok(())
1151    })
1152}
1153
1154pub async fn finish_in_flight(
1155    id: &str,
1156    version: u32,
1157    outcome: TriggerDispatchOutcome,
1158) -> Result<(), TriggerRegistryError> {
1159    TRIGGER_REGISTRY.with(|slot| {
1160        let registry = &mut *slot.borrow_mut();
1161        let binding = registry.binding(id, version).ok_or_else(|| {
1162            TriggerRegistryError::UnknownBindingVersion {
1163                id: id.to_string(),
1164                version,
1165            }
1166        })?;
1167        let current = binding.in_flight.load(Ordering::Relaxed);
1168        if current == 0 {
1169            return Err(TriggerRegistryError::InvalidSpec(format!(
1170                "trigger binding '{}' version {} has no in-flight events",
1171                id, version
1172            )));
1173        }
1174        match outcome {
1175            TriggerDispatchOutcome::Dispatched => {
1176                binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1177            }
1178            TriggerDispatchOutcome::Failed => {
1179                binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1180            }
1181            TriggerDispatchOutcome::Dlq => {
1182                binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1183            }
1184        }
1185        Ok(())
1186    })?;
1187
1188    unpin_trigger_binding(id, version).await
1189}
1190
1191impl TriggerRegistry {
1192    fn refresh_runtime_context(&mut self) {
1193        if self.event_log.is_none() {
1194            self.event_log = active_event_log();
1195        }
1196        if self.secret_provider.is_none() {
1197            self.secret_provider = default_secret_provider();
1198        }
1199    }
1200
1201    fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
1202        self.bindings
1203            .get(id)
1204            .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
1205            .cloned()
1206    }
1207
1208    fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
1209        self.bindings
1210            .get(id)
1211            .into_iter()
1212            .flat_map(|bindings| bindings.iter())
1213            .filter(|binding| {
1214                binding.source == source && binding.state_snapshot() != TriggerState::Terminated
1215            })
1216            .cloned()
1217            .collect()
1218    }
1219
1220    fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
1221        self.bindings
1222            .get(id)
1223            .into_iter()
1224            .flat_map(|bindings| bindings.iter())
1225            .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
1226            .cloned()
1227            .collect()
1228    }
1229
1230    fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
1231        if let Some(version) = self
1232            .bindings
1233            .get(spec.id.as_str())
1234            .into_iter()
1235            .flat_map(|bindings| bindings.iter())
1236            .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
1237            .map(|binding| binding.version)
1238        {
1239            return version;
1240        }
1241
1242        let historical =
1243            self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
1244        if let Some(version) = historical.matching_version {
1245            return version;
1246        }
1247
1248        self.bindings
1249            .get(spec.id.as_str())
1250            .into_iter()
1251            .flat_map(|bindings| bindings.iter())
1252            .map(|binding| binding.version)
1253            .chain(historical.max_version)
1254            .max()
1255            .unwrap_or(0)
1256            + 1
1257    }
1258
1259    fn gc_terminated_versions(&mut self, id: &str) {
1260        let Some(bindings) = self.bindings.get_mut(id) else {
1261            return;
1262        };
1263
1264        let mut newest_versions: Vec<u32> =
1265            bindings.iter().map(|binding| binding.version).collect();
1266        newest_versions.sort_unstable_by(|left, right| right.cmp(left));
1267        newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
1268        let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();
1269
1270        bindings.retain(|binding| {
1271            binding.state_snapshot() != TriggerState::Terminated
1272                || retained_versions.contains(&binding.version)
1273        });
1274
1275        if bindings.is_empty() {
1276            self.bindings.remove(id);
1277        }
1278    }
1279
1280    fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
1281        let mut lookup = HistoricalVersionLookup::default();
1282        for record in self.lifecycle_records_for(id) {
1283            lookup.max_version = Some(
1284                lookup
1285                    .max_version
1286                    .unwrap_or(0)
1287                    .max(record.transition.version),
1288            );
1289            if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
1290                lookup.matching_version = Some(record.transition.version);
1291            }
1292        }
1293        lookup
1294    }
1295
1296    fn binding_version_as_of(
1297        &self,
1298        id: &str,
1299        as_of: OffsetDateTime,
1300    ) -> Result<u32, TriggerRegistryError> {
1301        let cutoff_ms = (as_of.unix_timestamp_nanos() / 1_000_000) as i64;
1302        let mut active_version = None;
1303        for record in self.lifecycle_records_for(id) {
1304            if record.occurred_at_ms > cutoff_ms {
1305                break;
1306            }
1307            match record.transition.to_state {
1308                TriggerState::Active => active_version = Some(record.transition.version),
1309                TriggerState::Draining | TriggerState::Terminated => {
1310                    if active_version == Some(record.transition.version) {
1311                        active_version = None;
1312                    }
1313                }
1314                TriggerState::Registering => {}
1315            }
1316        }
1317
1318        active_version.ok_or_else(|| {
1319            TriggerRegistryError::InvalidSpec(format!(
1320                "no active trigger binding '{}' at {}",
1321                id,
1322                as_of
1323                    .format(&time::format_description::well_known::Rfc3339)
1324                    .unwrap_or_else(|_| as_of.to_string())
1325            ))
1326        })
1327    }
1328
1329    fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
1330        let Some(event_log) = self.event_log.as_ref() else {
1331            return Vec::new();
1332        };
1333        let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1334            .expect("static triggers.lifecycle topic should always be valid");
1335        futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
1336            .unwrap_or_default()
1337            .into_iter()
1338            .filter_map(|(_, event)| {
1339                let occurred_at_ms = event.occurred_at_ms;
1340                let transition: LifecycleStateTransitionRecord =
1341                    serde_json::from_value(event.payload).ok()?;
1342                (transition.id == id).then_some(HistoricalLifecycleRecord {
1343                    occurred_at_ms,
1344                    transition,
1345                })
1346            })
1347            .collect()
1348    }
1349
1350    #[allow(clippy::arc_with_non_send_sync)]
1351    fn register_binding(
1352        &mut self,
1353        spec: TriggerBindingSpec,
1354        version: u32,
1355        lifecycle: &mut Vec<LogEvent>,
1356    ) -> Arc<TriggerBinding> {
1357        let binding = Arc::new(TriggerBinding::new(spec, version));
1358        self.by_provider
1359            .entry(binding.provider.as_str().to_string())
1360            .or_default()
1361            .insert(binding.id.as_str().to_string());
1362        self.bindings
1363            .entry(binding.id.as_str().to_string())
1364            .or_default()
1365            .push(binding.clone());
1366        lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
1367        self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
1368        binding
1369    }
1370
1371    fn transition_binding_to_draining(
1372        &self,
1373        binding: &Arc<TriggerBinding>,
1374        lifecycle: &mut Vec<LogEvent>,
1375    ) {
1376        if matches!(binding.state_snapshot(), TriggerState::Terminated) {
1377            return;
1378        }
1379        self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
1380        self.maybe_finalize_draining(binding, lifecycle);
1381    }
1382
1383    fn maybe_finalize_draining(
1384        &self,
1385        binding: &Arc<TriggerBinding>,
1386        lifecycle: &mut Vec<LogEvent>,
1387    ) {
1388        if binding.state_snapshot() == TriggerState::Draining
1389            && binding.in_flight.load(Ordering::Relaxed) == 0
1390        {
1391            self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
1392        }
1393    }
1394
1395    fn transition_binding_state(
1396        &self,
1397        binding: &Arc<TriggerBinding>,
1398        next: TriggerState,
1399        lifecycle: &mut Vec<LogEvent>,
1400    ) {
1401        let mut state = binding.state.lock().expect("trigger state poisoned");
1402        let previous = *state;
1403        if previous == next {
1404            return;
1405        }
1406        *state = next;
1407        drop(state);
1408        lifecycle.push(lifecycle_event(binding, Some(previous), next));
1409    }
1410}
1411
1412fn lifecycle_event(
1413    binding: &TriggerBinding,
1414    from_state: Option<TriggerState>,
1415    to_state: TriggerState,
1416) -> LogEvent {
1417    LogEvent::new(
1418        "state_transition",
1419        serde_json::json!({
1420            "id": binding.id.as_str(),
1421            "binding_key": binding.binding_key(),
1422            "version": binding.version,
1423            "provider": binding.provider.as_str(),
1424            "kind": &binding.kind,
1425            "source": binding.source.as_str(),
1426            "handler_kind": binding.handler.kind(),
1427            "definition_fingerprint": &binding.definition_fingerprint,
1428            "from_state": from_state.map(TriggerState::as_str),
1429            "to_state": to_state.as_str(),
1430        }),
1431    )
1432}
1433
1434async fn append_lifecycle_events(
1435    event_log: Option<Arc<AnyEventLog>>,
1436    events: Vec<LogEvent>,
1437) -> Result<(), TriggerRegistryError> {
1438    let Some(event_log) = event_log else {
1439        return Ok(());
1440    };
1441    if events.is_empty() {
1442        return Ok(());
1443    }
1444
1445    let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1446        .expect("static triggers.lifecycle topic should always be valid");
1447    for event in events {
1448        event_log
1449            .append(&topic, event)
1450            .await
1451            .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1452    }
1453    Ok(())
1454}
1455
1456fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1457    configured_default_chain(default_secret_namespace())
1458        .ok()
1459        .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1460}
1461
/// Resolves the secret namespace used by the default provider chain.
///
/// A non-blank `HARN_SECRET_NAMESPACE` environment variable is returned as is.
/// Otherwise the namespace is `harn/<leaf>`, where `<leaf>` is the final
/// component of the current working directory, or `workspace` when that
/// component is missing, empty, or not valid UTF-8.
fn default_secret_namespace() -> String {
    match std::env::var("HARN_SECRET_NAMESPACE") {
        Ok(explicit) if !explicit.trim().is_empty() => return explicit,
        _ => {}
    }

    let workdir = std::env::current_dir().unwrap_or_default();
    let leaf = match workdir.file_name().and_then(|name| name.to_str()) {
        Some(name) if !name.is_empty() => name,
        _ => "workspace",
    };
    format!("harn/{leaf}")
}
1477
1478fn now_ms() -> i64 {
1479    clock::now_ms()
1480}
1481
1482#[cfg(test)]
1483mod tests {
1484    use super::*;
1485    use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
1486    use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
1487    use std::rc::Rc;
1488    use time::OffsetDateTime;
1489
1490    fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
1491        TriggerBindingSpec {
1492            id: id.to_string(),
1493            source: TriggerBindingSource::Manifest,
1494            kind: "webhook".to_string(),
1495            provider: ProviderId::from("github"),
1496            autonomy_tier: crate::AutonomyTier::ActAuto,
1497            handler: TriggerHandlerSpec::Worker {
1498                queue: format!("{id}-queue"),
1499            },
1500            dispatch_priority: crate::WorkerQueuePriority::Normal,
1501            when: None,
1502            when_budget: None,
1503            retry: TriggerRetryConfig::default(),
1504            match_events: vec!["issues.opened".to_string()],
1505            dedupe_key: Some("event.dedupe_key".to_string()),
1506            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1507            filter: Some("event.kind".to_string()),
1508            daily_cost_usd: Some(5.0),
1509            hourly_cost_usd: None,
1510            max_autonomous_decisions_per_hour: None,
1511            max_autonomous_decisions_per_day: None,
1512            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1513            max_concurrent: Some(10),
1514            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1515            manifest_path: None,
1516            package_name: Some("workspace".to_string()),
1517            definition_fingerprint: fingerprint.to_string(),
1518        }
1519    }
1520
1521    fn dynamic_spec(id: &str) -> TriggerBindingSpec {
1522        TriggerBindingSpec {
1523            id: id.to_string(),
1524            source: TriggerBindingSource::Dynamic,
1525            kind: "webhook".to_string(),
1526            provider: ProviderId::from("github"),
1527            autonomy_tier: crate::AutonomyTier::ActAuto,
1528            handler: TriggerHandlerSpec::Worker {
1529                queue: format!("{id}-queue"),
1530            },
1531            dispatch_priority: crate::WorkerQueuePriority::Normal,
1532            when: None,
1533            when_budget: None,
1534            retry: TriggerRetryConfig::default(),
1535            match_events: vec!["issues.opened".to_string()],
1536            dedupe_key: None,
1537            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1538            filter: None,
1539            daily_cost_usd: None,
1540            hourly_cost_usd: None,
1541            max_autonomous_decisions_per_hour: None,
1542            max_autonomous_decisions_per_day: None,
1543            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1544            max_concurrent: None,
1545            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1546            manifest_path: None,
1547            package_name: None,
1548            definition_fingerprint: format!("dynamic:{id}"),
1549        }
1550    }
1551
1552    #[tokio::test(flavor = "current_thread")]
1553    async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
1554        clear_trigger_registry();
1555
1556        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1557            .await
1558            .expect("manifest trigger installs");
1559
1560        let snapshots = snapshot_trigger_bindings();
1561        assert_eq!(snapshots.len(), 1);
1562        let binding = &snapshots[0];
1563        assert_eq!(binding.id, "github-new-issue");
1564        assert_eq!(binding.version, 1);
1565        assert_eq!(binding.state, TriggerState::Active);
1566        assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());
1567
1568        clear_trigger_registry();
1569    }
1570
1571    #[tokio::test(flavor = "current_thread")]
1572    async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
1573        clear_trigger_registry();
1574
1575        let first = dynamic_register(dynamic_spec("dynamic-a"))
1576            .await
1577            .expect("first dynamic trigger");
1578        let second = dynamic_register(dynamic_spec("dynamic-b"))
1579            .await
1580            .expect("second dynamic trigger");
1581        assert_ne!(first, second);
1582
1583        let error = dynamic_register(dynamic_spec("dynamic-a"))
1584            .await
1585            .expect_err("duplicate id should fail");
1586        assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));
1587
1588        clear_trigger_registry();
1589    }
1590
1591    #[tokio::test(flavor = "current_thread")]
1592    async fn drain_waits_for_in_flight_events_before_terminating() {
1593        clear_trigger_registry();
1594
1595        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1596            .await
1597            .expect("manifest trigger installs");
1598        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1599
1600        drain("github-new-issue").await.expect("drain succeeds");
1601        let binding = snapshot_trigger_bindings()
1602            .into_iter()
1603            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1604            .expect("binding snapshot");
1605        assert_eq!(binding.state, TriggerState::Draining);
1606        assert_eq!(binding.metrics.in_flight, 1);
1607
1608        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1609            .await
1610            .expect("finish in-flight event");
1611        let binding = snapshot_trigger_bindings()
1612            .into_iter()
1613            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1614            .expect("binding snapshot");
1615        assert_eq!(binding.state, TriggerState::Terminated);
1616        assert_eq!(binding.metrics.in_flight, 0);
1617
1618        clear_trigger_registry();
1619    }
1620
1621    #[tokio::test(flavor = "current_thread")]
1622    async fn hot_reload_registers_new_version_while_old_binding_drains() {
1623        clear_trigger_registry();
1624
1625        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1626            .await
1627            .expect("initial manifest trigger installs");
1628        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1629
1630        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1631            .await
1632            .expect("updated manifest trigger installs");
1633
1634        let snapshots = snapshot_trigger_bindings();
1635        assert_eq!(snapshots.len(), 2);
1636        let old = snapshots
1637            .iter()
1638            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1639            .expect("old binding");
1640        let new = snapshots
1641            .iter()
1642            .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
1643            .expect("new binding");
1644        assert_eq!(old.state, TriggerState::Draining);
1645        assert_eq!(new.state, TriggerState::Active);
1646
1647        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1648            .await
1649            .expect("finish old in-flight event");
1650        let old = snapshot_trigger_bindings()
1651            .into_iter()
1652            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1653            .expect("old binding");
1654        assert_eq!(old.state, TriggerState::Terminated);
1655
1656        clear_trigger_registry();
1657    }
1658
1659    #[tokio::test(flavor = "current_thread")]
1660    async fn gc_drops_terminated_versions_beyond_retention_limit() {
1661        clear_trigger_registry();
1662
1663        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1664            .await
1665            .expect("install v1");
1666        begin_in_flight("github-new-issue", 1).expect("pin v1");
1667
1668        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1669            .await
1670            .expect("install v2");
1671        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1672            .await
1673            .expect("finish v1");
1674
1675        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1676            .await
1677            .expect("install v3");
1678
1679        let snapshots = snapshot_trigger_bindings();
1680        let versions: Vec<u32> = snapshots
1681            .into_iter()
1682            .filter(|binding| binding.id == "github-new-issue")
1683            .map(|binding| binding.version)
1684            .collect();
1685        assert_eq!(versions, vec![2, 3]);
1686
1687        clear_trigger_registry();
1688    }
1689
1690    #[tokio::test(flavor = "current_thread")]
1691    async fn lifecycle_transitions_append_to_event_log() {
1692        clear_trigger_registry();
1693        reset_active_event_log();
1694        let tempdir = tempfile::tempdir().expect("tempdir");
1695        let log = install_default_for_base_dir(tempdir.path()).expect("install event log");
1696
1697        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1698            .await
1699            .expect("manifest trigger installs");
1700        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1701        drain("github-new-issue").await.expect("drain succeeds");
1702        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1703            .await
1704            .expect("finish event");
1705
1706        let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
1707        let events = log
1708            .read_range(&topic, None, 32)
1709            .await
1710            .expect("read lifecycle events");
1711        let states: Vec<String> = events
1712            .into_iter()
1713            .filter_map(|(_, event)| {
1714                event
1715                    .payload
1716                    .get("to_state")
1717                    .and_then(|value| value.as_str())
1718                    .map(|value| value.to_string())
1719            })
1720            .collect();
1721        assert_eq!(
1722            states,
1723            vec![
1724                "registering".to_string(),
1725                "active".to_string(),
1726                "draining".to_string(),
1727                "terminated".to_string(),
1728            ]
1729        );
1730
1731        reset_active_event_log();
1732        clear_trigger_registry();
1733    }
1734
1735    #[tokio::test(flavor = "current_thread")]
1736    async fn version_history_reuses_historical_version_after_restart() {
1737        clear_trigger_registry();
1738        reset_active_event_log();
1739        let tempdir = tempfile::tempdir().expect("tempdir");
1740        install_default_for_base_dir(tempdir.path()).expect("install event log");
1741
1742        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1743            .await
1744            .expect("initial manifest trigger installs");
1745        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1746            .await
1747            .expect("updated manifest trigger installs");
1748
1749        clear_trigger_registry();
1750        reset_active_event_log();
1751        install_default_for_base_dir(tempdir.path()).expect("reopen event log");
1752
1753        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1754            .await
1755            .expect("manifest reload reuses historical version");
1756
1757        let binding = snapshot_trigger_bindings()
1758            .into_iter()
1759            .find(|binding| binding.id == "github-new-issue")
1760            .expect("binding snapshot");
1761        assert_eq!(binding.version, 2);
1762
1763        reset_active_event_log();
1764        clear_trigger_registry();
1765    }
1766
1767    #[tokio::test(flavor = "current_thread")]
1768    async fn binding_version_as_of_reports_historical_active_version() {
1769        clear_trigger_registry();
1770        reset_active_event_log();
1771        let tempdir = tempfile::tempdir().expect("tempdir");
1772        install_default_for_base_dir(tempdir.path()).expect("install event log");
1773
1774        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1775            .await
1776            .expect("initial manifest trigger installs");
1777        let before_reload = OffsetDateTime::now_utc();
1778        std::thread::sleep(std::time::Duration::from_millis(10));
1779
1780        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1781            .await
1782            .expect("updated manifest trigger installs");
1783        let after_reload = OffsetDateTime::now_utc();
1784
1785        assert_eq!(
1786            binding_version_as_of("github-new-issue", before_reload)
1787                .expect("version before reload"),
1788            1
1789        );
1790        assert_eq!(
1791            binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
1792            2
1793        );
1794
1795        reset_active_event_log();
1796        clear_trigger_registry();
1797    }
1798
1799    #[tokio::test(flavor = "current_thread")]
1800    async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
1801        clear_trigger_registry();
1802        reset_active_event_log();
1803        let sink = Rc::new(CollectorSink::new());
1804        clear_event_sinks();
1805        add_event_sink(sink.clone());
1806        let tempdir = tempfile::tempdir().expect("tempdir");
1807        install_default_for_base_dir(tempdir.path()).expect("install event log");
1808
1809        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1810            .await
1811            .expect("install v1");
1812        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1813            .await
1814            .expect("install v2");
1815        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1816            .await
1817            .expect("install v3");
1818        let received_at = OffsetDateTime::now_utc();
1819        std::thread::sleep(std::time::Duration::from_millis(10));
1820        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
1821            .await
1822            .expect("install v4");
1823
1824        let binding = resolve_live_or_as_of(
1825            "github-new-issue",
1826            RecordedTriggerBinding {
1827                version: 1,
1828                received_at,
1829            },
1830        )
1831        .expect("resolve fallback binding");
1832        assert_eq!(binding.version, 3);
1833
1834        let warning = sink
1835            .logs
1836            .borrow()
1837            .iter()
1838            .find(|log| log.category == "replay.binding_version_gc_fallback")
1839            .cloned()
1840            .expect("gc fallback warning");
1841        assert_eq!(warning.level, EventLevel::Warn);
1842        assert_eq!(
1843            warning.metadata.get("trigger_id"),
1844            Some(&serde_json::json!("github-new-issue"))
1845        );
1846        assert_eq!(
1847            warning.metadata.get("recorded_version"),
1848            Some(&serde_json::json!(1))
1849        );
1850        assert_eq!(
1851            warning.metadata.get("received_at"),
1852            Some(&serde_json::json!(received_at
1853                .format(&time::format_description::well_known::Rfc3339)
1854                .unwrap_or_else(|_| received_at.to_string())))
1855        );
1856        assert_eq!(
1857            warning.metadata.get("resolved_version"),
1858            Some(&serde_json::json!(3))
1859        );
1860
1861        clear_event_sinks();
1862        crate::events::reset_event_sinks();
1863        reset_active_event_log();
1864        clear_trigger_registry();
1865    }
1866}