Skip to main content

harn_vm/triggers/
registry.rs

1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use time::OffsetDateTime;
10use uuid::Uuid;
11
12use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
13use crate::llm::trigger_predicate::TriggerPredicateBudget;
14use crate::secrets::{configured_default_chain, SecretProvider};
15use crate::triggers::test_util::clock;
16use crate::trust_graph::AutonomyTier;
17use crate::value::VmClosure;
18
19use super::dispatcher::TriggerRetryConfig;
20use super::flow_control::TriggerFlowControlConfig;
21use super::ProviderId;
22
23#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
24pub struct TriggerId(String);
25
26impl TriggerId {
27    pub fn new(value: impl Into<String>) -> Self {
28        Self(value.into())
29    }
30
31    pub fn as_str(&self) -> &str {
32        &self.0
33    }
34}
35
36impl std::fmt::Display for TriggerId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        self.0.fmt(f)
39    }
40}
41
42#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub enum TriggerState {
45    Registering,
46    Active,
47    Paused,
48    Draining,
49    Terminated,
50}
51
52impl TriggerState {
53    pub fn as_str(self) -> &'static str {
54        match self {
55            Self::Registering => "registering",
56            Self::Active => "active",
57            Self::Paused => "paused",
58            Self::Draining => "draining",
59            Self::Terminated => "terminated",
60        }
61    }
62}
63
64#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
65#[serde(rename_all = "snake_case")]
66pub enum TriggerBindingSource {
67    Manifest,
68    Dynamic,
69}
70
71impl TriggerBindingSource {
72    pub fn as_str(self) -> &'static str {
73        match self {
74            Self::Manifest => "manifest",
75            Self::Dynamic => "dynamic",
76        }
77    }
78}
79
80#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
81#[serde(rename_all = "snake_case")]
82pub enum TriggerBudgetExhaustionStrategy {
83    #[default]
84    False,
85    RetryLater,
86    Fail,
87    Warn,
88}
89
90impl TriggerBudgetExhaustionStrategy {
91    pub fn as_str(self) -> &'static str {
92        match self {
93            Self::False => "false",
94            Self::RetryLater => "retry_later",
95            Self::Fail => "fail",
96            Self::Warn => "warn",
97        }
98    }
99}
100
101#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
102pub struct OrchestratorBudgetConfig {
103    pub daily_cost_usd: Option<f64>,
104    pub hourly_cost_usd: Option<f64>,
105}
106
107#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
108pub struct OrchestratorBudgetSnapshot {
109    pub daily_cost_usd: Option<f64>,
110    pub hourly_cost_usd: Option<f64>,
111    pub cost_today_usd_micros: u64,
112    pub cost_hour_usd_micros: u64,
113    pub day_utc: i32,
114    pub hour_utc: i64,
115}
116
117#[derive(Debug)]
118struct OrchestratorBudgetState {
119    config: OrchestratorBudgetConfig,
120    day_utc: i32,
121    hour_utc: i64,
122    cost_today_usd_micros: u64,
123    cost_hour_usd_micros: u64,
124}
125
126impl Default for OrchestratorBudgetState {
127    fn default() -> Self {
128        Self {
129            config: OrchestratorBudgetConfig::default(),
130            day_utc: utc_day_key(),
131            hour_utc: utc_hour_key(),
132            cost_today_usd_micros: 0,
133            cost_hour_usd_micros: 0,
134        }
135    }
136}
137
138#[derive(Clone)]
139pub enum TriggerHandlerSpec {
140    Local {
141        raw: String,
142        closure: Rc<VmClosure>,
143    },
144    A2a {
145        target: String,
146        allow_cleartext: bool,
147    },
148    Worker {
149        queue: String,
150    },
151    Persona {
152        binding: crate::PersonaRuntimeBinding,
153    },
154}
155
156impl std::fmt::Debug for TriggerHandlerSpec {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        match self {
159            Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
160            Self::A2a {
161                target,
162                allow_cleartext,
163            } => f
164                .debug_struct("A2a")
165                .field("target", target)
166                .field("allow_cleartext", allow_cleartext)
167                .finish(),
168            Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
169            Self::Persona { binding } => f
170                .debug_struct("Persona")
171                .field("name", &binding.name)
172                .finish(),
173        }
174    }
175}
176
177impl TriggerHandlerSpec {
178    pub fn kind(&self) -> &'static str {
179        match self {
180            Self::Local { .. } => "local",
181            Self::A2a { .. } => "a2a",
182            Self::Worker { .. } => "worker",
183            Self::Persona { .. } => "persona",
184        }
185    }
186}
187
188#[derive(Clone)]
189pub struct TriggerPredicateSpec {
190    pub raw: String,
191    pub closure: Rc<VmClosure>,
192}
193
194impl std::fmt::Debug for TriggerPredicateSpec {
195    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196        f.debug_struct("TriggerPredicateSpec")
197            .field("raw", &self.raw)
198            .finish()
199    }
200}
201
202#[derive(Clone, Debug)]
203pub struct TriggerBindingSpec {
204    pub id: String,
205    pub source: TriggerBindingSource,
206    pub kind: String,
207    pub provider: ProviderId,
208    pub autonomy_tier: AutonomyTier,
209    pub handler: TriggerHandlerSpec,
210    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
211    pub when: Option<TriggerPredicateSpec>,
212    pub when_budget: Option<TriggerPredicateBudget>,
213    pub retry: TriggerRetryConfig,
214    pub match_events: Vec<String>,
215    pub dedupe_key: Option<String>,
216    pub dedupe_retention_days: u32,
217    pub filter: Option<String>,
218    pub daily_cost_usd: Option<f64>,
219    pub hourly_cost_usd: Option<f64>,
220    pub max_autonomous_decisions_per_hour: Option<u64>,
221    pub max_autonomous_decisions_per_day: Option<u64>,
222    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
223    pub max_concurrent: Option<u32>,
224    pub flow_control: TriggerFlowControlConfig,
225    pub manifest_path: Option<PathBuf>,
226    pub package_name: Option<String>,
227    pub definition_fingerprint: String,
228}
229
/// Live counters for a single trigger binding.
///
/// `Default` is derived: every counter starts at zero and `last_received_ms` starts as `None`,
/// exactly as the previous hand-written implementation did.
#[derive(Debug, Default)]
pub struct TriggerMetrics {
    pub received: AtomicU64,
    pub dispatched: AtomicU64,
    pub failed: AtomicU64,
    pub dlq: AtomicU64,
    /// Millisecond value recorded for the most recent event, if any.
    pub last_received_ms: Mutex<Option<i64>>,
    /// Total cost in millionths of a USD.
    pub cost_total_usd_micros: AtomicU64,
    /// Cost in the current UTC day window; zeroed by `reset_binding_budget_windows`.
    pub cost_today_usd_micros: AtomicU64,
    /// Cost in the current UTC hour window; zeroed by `reset_binding_budget_windows`.
    pub cost_hour_usd_micros: AtomicU64,
    pub autonomous_decisions_total: AtomicU64,
    /// Decisions in the current UTC day window; zeroed by `reset_binding_budget_windows`.
    pub autonomous_decisions_today: AtomicU64,
    /// Decisions in the current UTC hour window; zeroed by `reset_binding_budget_windows`.
    pub autonomous_decisions_hour: AtomicU64,
}
262
263#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
264pub struct TriggerMetricsSnapshot {
265    pub received: u64,
266    pub dispatched: u64,
267    pub failed: u64,
268    pub dlq: u64,
269    pub in_flight: u64,
270    pub last_received_ms: Option<i64>,
271    pub cost_total_usd_micros: u64,
272    pub cost_today_usd_micros: u64,
273    pub cost_hour_usd_micros: u64,
274    pub autonomous_decisions_total: u64,
275    pub autonomous_decisions_today: u64,
276    pub autonomous_decisions_hour: u64,
277}
278
279pub struct TriggerBinding {
280    pub id: TriggerId,
281    pub version: u32,
282    pub source: TriggerBindingSource,
283    pub kind: String,
284    pub provider: ProviderId,
285    pub autonomy_tier: AutonomyTier,
286    pub handler: TriggerHandlerSpec,
287    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
288    pub when: Option<TriggerPredicateSpec>,
289    pub when_budget: Option<TriggerPredicateBudget>,
290    pub retry: TriggerRetryConfig,
291    pub match_events: Vec<String>,
292    pub dedupe_key: Option<String>,
293    pub dedupe_retention_days: u32,
294    pub filter: Option<String>,
295    pub daily_cost_usd: Option<f64>,
296    pub hourly_cost_usd: Option<f64>,
297    pub max_autonomous_decisions_per_hour: Option<u64>,
298    pub max_autonomous_decisions_per_day: Option<u64>,
299    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
300    pub max_concurrent: Option<u32>,
301    pub flow_control: TriggerFlowControlConfig,
302    pub manifest_path: Option<PathBuf>,
303    pub package_name: Option<String>,
304    pub definition_fingerprint: String,
305    pub state: Mutex<TriggerState>,
306    pub metrics: TriggerMetrics,
307    pub in_flight: AtomicU64,
308    pub cancel_token: Arc<AtomicBool>,
309    pub predicate_state: Mutex<TriggerPredicateState>,
310}
311
/// Mutable per-binding predicate bookkeeping, guarded by `TriggerBinding::predicate_state`.
#[derive(Clone, Debug, Default)]
pub struct TriggerPredicateState {
    /// Day window the binding's daily counters belong to; `None` until first checked.
    pub budget_day_utc: Option<i32>,
    /// Hour window the binding's hourly counters belong to; `None` until first checked.
    pub budget_hour_utc: Option<i64>,
    pub consecutive_failures: u32,
    pub breaker_open_until_ms: Option<i64>,
    /// Most recent predicate cost samples (millionths of a USD), oldest at the front.
    pub recent_cost_usd_micros: VecDeque<u64>,
}
320
321impl std::fmt::Debug for TriggerBinding {
322    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323        f.debug_struct("TriggerBinding")
324            .field("id", &self.id)
325            .field("version", &self.version)
326            .field("source", &self.source)
327            .field("kind", &self.kind)
328            .field("provider", &self.provider)
329            .field("handler_kind", &self.handler.kind())
330            .field("state", &self.state_snapshot())
331            .finish()
332    }
333}
334
335impl TriggerBinding {
336    pub fn snapshot(&self) -> TriggerBindingSnapshot {
337        TriggerBindingSnapshot {
338            id: self.id.as_str().to_string(),
339            version: self.version,
340            source: self.source,
341            kind: self.kind.clone(),
342            provider: self.provider.as_str().to_string(),
343            autonomy_tier: self.autonomy_tier,
344            handler_kind: self.handler.kind().to_string(),
345            state: self.state_snapshot(),
346            metrics: self.metrics_snapshot(),
347            daily_cost_usd: self.daily_cost_usd,
348            hourly_cost_usd: self.hourly_cost_usd,
349            max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
350            max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
351            on_budget_exhausted: self.on_budget_exhausted,
352        }
353    }
354
355    fn new(spec: TriggerBindingSpec, version: u32) -> Self {
356        Self {
357            id: TriggerId::new(spec.id),
358            version,
359            source: spec.source,
360            kind: spec.kind,
361            provider: spec.provider,
362            autonomy_tier: spec.autonomy_tier,
363            handler: spec.handler,
364            dispatch_priority: spec.dispatch_priority,
365            when: spec.when,
366            when_budget: spec.when_budget,
367            retry: spec.retry,
368            match_events: spec.match_events,
369            dedupe_key: spec.dedupe_key,
370            dedupe_retention_days: spec.dedupe_retention_days,
371            filter: spec.filter,
372            daily_cost_usd: spec.daily_cost_usd,
373            hourly_cost_usd: spec.hourly_cost_usd,
374            max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
375            max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
376            on_budget_exhausted: spec.on_budget_exhausted,
377            max_concurrent: spec.max_concurrent,
378            flow_control: spec.flow_control,
379            manifest_path: spec.manifest_path,
380            package_name: spec.package_name,
381            definition_fingerprint: spec.definition_fingerprint,
382            state: Mutex::new(TriggerState::Registering),
383            metrics: TriggerMetrics::default(),
384            in_flight: AtomicU64::new(0),
385            cancel_token: Arc::new(AtomicBool::new(false)),
386            predicate_state: Mutex::new(TriggerPredicateState::default()),
387        }
388    }
389
390    pub fn binding_key(&self) -> String {
391        format!("{}@v{}", self.id.as_str(), self.version)
392    }
393
394    pub fn state_snapshot(&self) -> TriggerState {
395        *self.state.lock().expect("trigger state poisoned")
396    }
397
398    pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
399        TriggerMetricsSnapshot {
400            received: self.metrics.received.load(Ordering::Relaxed),
401            dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
402            failed: self.metrics.failed.load(Ordering::Relaxed),
403            dlq: self.metrics.dlq.load(Ordering::Relaxed),
404            in_flight: self.in_flight.load(Ordering::Relaxed),
405            last_received_ms: *self
406                .metrics
407                .last_received_ms
408                .lock()
409                .expect("trigger metrics poisoned"),
410            cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
411            cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
412            cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
413            autonomous_decisions_total: self
414                .metrics
415                .autonomous_decisions_total
416                .load(Ordering::Relaxed),
417            autonomous_decisions_today: self
418                .metrics
419                .autonomous_decisions_today
420                .load(Ordering::Relaxed),
421            autonomous_decisions_hour: self
422                .metrics
423                .autonomous_decisions_hour
424                .load(Ordering::Relaxed),
425        }
426    }
427}
428
429#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
430pub struct TriggerBindingSnapshot {
431    pub id: String,
432    pub version: u32,
433    pub source: TriggerBindingSource,
434    pub kind: String,
435    pub provider: String,
436    pub autonomy_tier: AutonomyTier,
437    pub handler_kind: String,
438    pub state: TriggerState,
439    pub metrics: TriggerMetricsSnapshot,
440    pub daily_cost_usd: Option<f64>,
441    pub hourly_cost_usd: Option<f64>,
442    pub max_autonomous_decisions_per_hour: Option<u64>,
443    pub max_autonomous_decisions_per_day: Option<u64>,
444    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
445}
446
/// Result category of a single dispatch attempt.
///
/// The variant names mirror the `dispatched`, `failed` and `dlq` counters of `TriggerMetrics`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TriggerDispatchOutcome {
    Dispatched,
    Failed,
    Dlq,
}
453
/// Errors returned by trigger registry operations.
#[derive(Debug)]
pub enum TriggerRegistryError {
    /// The same trigger id was supplied more than once; carries the id.
    DuplicateId(String),
    /// A binding spec was rejected; carries the full message.
    InvalidSpec(String),
    /// No live binding exists for the id.
    UnknownId(String),
    /// The requested version of the binding is missing or no longer live.
    UnknownBindingVersion { id: String, version: u32 },
    /// Carries an event-log related message.
    EventLog(String),
}
462
463impl std::fmt::Display for TriggerRegistryError {
464    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
465        match self {
466            Self::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
467            Self::InvalidSpec(message) | Self::EventLog(message) => f.write_str(message),
468            Self::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
469            Self::UnknownBindingVersion { id, version } => {
470                write!(f, "unknown trigger binding '{id}' version {version}")
471            }
472        }
473    }
474}
475
476impl std::error::Error for TriggerRegistryError {}
477
478#[derive(Default)]
479pub struct TriggerRegistry {
480    bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
481    by_provider: BTreeMap<String, BTreeSet<String>>,
482    event_log: Option<Arc<AnyEventLog>>,
483    secret_provider: Option<Arc<dyn SecretProvider>>,
484}
485
486thread_local! {
487    static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
488}
489
490thread_local! {
491    static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
492        RefCell::new(OrchestratorBudgetState::default());
493}
494
// NOTE(review): presumably how many terminated versions of a binding are kept before being
// dropped — the use site is outside this section; confirm there.
const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;

/// Event-log topic name for trigger lifecycle records.
const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
/// Maximum number of predicate cost samples kept per binding for averaging.
const PREDICATE_COST_WINDOW: usize = 100;
499
500#[derive(Clone, Debug, Deserialize)]
501struct LifecycleStateTransitionRecord {
502    id: String,
503    version: u32,
504    #[serde(default)]
505    definition_fingerprint: Option<String>,
506    to_state: TriggerState,
507}
508
509#[derive(Clone, Debug)]
510struct HistoricalLifecycleRecord {
511    occurred_at_ms: i64,
512    transition: LifecycleStateTransitionRecord,
513}
514
515#[derive(Clone, Copy, Debug, PartialEq, Eq)]
516pub struct RecordedTriggerBinding {
517    pub version: u32,
518    pub received_at: OffsetDateTime,
519}
520
/// Result of scanning historical versions; both fields default to `None`.
#[derive(Clone, Copy, Debug, Default)]
struct HistoricalVersionLookup {
    // NOTE(review): presumably the version whose definition matched the lookup — confirm at
    // the use site, which is outside this section.
    matching_version: Option<u32>,
    /// Highest version seen, if any.
    max_version: Option<u32>,
}
526
527pub fn clear_trigger_registry() {
528    TRIGGER_REGISTRY.with(|slot| {
529        *slot.borrow_mut() = TriggerRegistry::default();
530    });
531    clear_orchestrator_budget();
532}
533
534pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
535    ORCHESTRATOR_BUDGET.with(|slot| {
536        let mut state = slot.borrow_mut();
537        rollover_orchestrator_budget(&mut state);
538        state.config = config;
539    });
540}
541
542pub fn clear_orchestrator_budget() {
543    ORCHESTRATOR_BUDGET.with(|slot| {
544        *slot.borrow_mut() = OrchestratorBudgetState::default();
545    });
546}
547
548pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
549    ORCHESTRATOR_BUDGET.with(|slot| {
550        let mut state = slot.borrow_mut();
551        rollover_orchestrator_budget(&mut state);
552        OrchestratorBudgetSnapshot {
553            daily_cost_usd: state.config.daily_cost_usd,
554            hourly_cost_usd: state.config.hourly_cost_usd,
555            cost_today_usd_micros: state.cost_today_usd_micros,
556            cost_hour_usd_micros: state.cost_hour_usd_micros,
557            day_utc: state.day_utc,
558            hour_utc: state.hour_utc,
559        }
560    })
561}
562
563pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
564    if cost_usd_micros == 0 {
565        return;
566    }
567    ORCHESTRATOR_BUDGET.with(|slot| {
568        let mut state = slot.borrow_mut();
569        rollover_orchestrator_budget(&mut state);
570        state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
571        state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
572    });
573}
574
575pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
576    ORCHESTRATOR_BUDGET.with(|slot| {
577        let mut state = slot.borrow_mut();
578        rollover_orchestrator_budget(&mut state);
579        if state.config.hourly_cost_usd.is_some_and(|limit| {
580            micros_to_usd(
581                state
582                    .cost_hour_usd_micros
583                    .saturating_add(expected_cost_usd_micros),
584            ) > limit
585        }) {
586            return Some("orchestrator_hourly_budget_exceeded");
587        }
588        if state.config.daily_cost_usd.is_some_and(|limit| {
589            micros_to_usd(
590                state
591                    .cost_today_usd_micros
592                    .saturating_add(expected_cost_usd_micros),
593            ) > limit
594        }) {
595            return Some("orchestrator_daily_budget_exceeded");
596        }
597        None
598    })
599}
600
601pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
602    let today = utc_day_key();
603    let hour = utc_hour_key();
604    let mut state = binding
605        .predicate_state
606        .lock()
607        .expect("trigger predicate state poisoned");
608    if state.budget_day_utc != Some(today) {
609        state.budget_day_utc = Some(today);
610        binding
611            .metrics
612            .cost_today_usd_micros
613            .store(0, Ordering::Relaxed);
614        binding
615            .metrics
616            .autonomous_decisions_today
617            .store(0, Ordering::Relaxed);
618    }
619    if state.budget_hour_utc != Some(hour) {
620        state.budget_hour_utc = Some(hour);
621        binding
622            .metrics
623            .cost_hour_usd_micros
624            .store(0, Ordering::Relaxed);
625        binding
626            .metrics
627            .autonomous_decisions_hour
628            .store(0, Ordering::Relaxed);
629    }
630}
631
632pub fn binding_budget_would_exceed(
633    binding: &TriggerBinding,
634    expected_cost_usd_micros: u64,
635) -> Option<&'static str> {
636    reset_binding_budget_windows(binding);
637    if binding.hourly_cost_usd.is_some_and(|limit| {
638        micros_to_usd(
639            binding
640                .metrics
641                .cost_hour_usd_micros
642                .load(Ordering::Relaxed)
643                .saturating_add(expected_cost_usd_micros),
644        ) > limit
645    }) {
646        return Some("hourly_budget_exceeded");
647    }
648    if binding.daily_cost_usd.is_some_and(|limit| {
649        micros_to_usd(
650            binding
651                .metrics
652                .cost_today_usd_micros
653                .load(Ordering::Relaxed)
654                .saturating_add(expected_cost_usd_micros),
655        ) > limit
656    }) {
657        return Some("daily_budget_exceeded");
658    }
659    None
660}
661
662pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
663    reset_binding_budget_windows(binding);
664    if binding
665        .max_autonomous_decisions_per_hour
666        .is_some_and(|limit| {
667            binding
668                .metrics
669                .autonomous_decisions_hour
670                .load(Ordering::Relaxed)
671                .saturating_add(1)
672                > limit
673        })
674    {
675        return Some("hourly_autonomy_budget_exceeded");
676    }
677    if binding
678        .max_autonomous_decisions_per_day
679        .is_some_and(|limit| {
680            binding
681                .metrics
682                .autonomous_decisions_today
683                .load(Ordering::Relaxed)
684                .saturating_add(1)
685                > limit
686        })
687    {
688        return Some("daily_autonomy_budget_exceeded");
689    }
690    None
691}
692
693pub fn note_autonomous_decision(binding: &TriggerBinding) {
694    reset_binding_budget_windows(binding);
695    binding
696        .metrics
697        .autonomous_decisions_total
698        .fetch_add(1, Ordering::Relaxed);
699    binding
700        .metrics
701        .autonomous_decisions_today
702        .fetch_add(1, Ordering::Relaxed);
703    binding
704        .metrics
705        .autonomous_decisions_hour
706        .fetch_add(1, Ordering::Relaxed);
707}
708
709pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
710    let state = binding
711        .predicate_state
712        .lock()
713        .expect("trigger predicate state poisoned");
714    if !state.recent_cost_usd_micros.is_empty() {
715        let total: u64 = state.recent_cost_usd_micros.iter().copied().sum();
716        return total / state.recent_cost_usd_micros.len() as u64;
717    }
718    binding
719        .when_budget
720        .as_ref()
721        .and_then(|budget| budget.max_cost_usd)
722        .map(usd_to_micros)
723        .unwrap_or_default()
724}
725
726pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
727    let mut state = binding
728        .predicate_state
729        .lock()
730        .expect("trigger predicate state poisoned");
731    state.recent_cost_usd_micros.push_back(cost_usd_micros);
732    while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
733        state.recent_cost_usd_micros.pop_front();
734    }
735}
736
/// Converts a USD amount to millionths of a USD, rounding up.
///
/// Non-finite, zero and negative inputs yield `0`. Values too large for `u64` saturate at
/// `u64::MAX`, because float-to-integer `as` casts saturate.
pub fn usd_to_micros(value: f64) -> u64 {
    if !value.is_finite() || value <= 0.0 {
        return 0;
    }
    // Round up so a fractional micro-dollar is never under-counted against a budget.
    (value * 1_000_000.0).ceil() as u64
}
743
/// Converts millionths of a USD to a USD amount.
///
/// Values above 2^53 cannot all be represented exactly as `f64`.
pub fn micros_to_usd(value: u64) -> f64 {
    value as f64 / 1_000_000.0
}
747
748fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
749    let today = utc_day_key();
750    let hour = utc_hour_key();
751    if state.day_utc != today {
752        state.day_utc = today;
753        state.cost_today_usd_micros = 0;
754    }
755    if state.hour_utc != hour {
756        state.hour_utc = hour;
757        state.cost_hour_usd_micros = 0;
758    }
759}
760
761fn utc_day_key() -> i32 {
762    (clock::now_utc().date()
763        - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
764    .whole_days() as i32
765}
766
767fn utc_hour_key() -> i64 {
768    clock::now_utc().unix_timestamp() / 3_600
769}
770
771pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
772    TRIGGER_REGISTRY.with(|slot| {
773        let registry = slot.borrow();
774        let mut snapshots = Vec::new();
775        for bindings in registry.bindings.values() {
776            for binding in bindings {
777                snapshots.push(binding.snapshot());
778            }
779        }
780        snapshots.sort_by(|left, right| {
781            left.id
782                .cmp(&right.id)
783                .then(left.version.cmp(&right.version))
784                .then(left.state.as_str().cmp(right.state.as_str()))
785        });
786        snapshots
787    })
788}
789
790#[allow(clippy::arc_with_non_send_sync)]
791pub fn resolve_trigger_binding_as_of(
792    id: &str,
793    as_of: OffsetDateTime,
794) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
795    let version = binding_version_as_of(id, as_of)?;
796    resolve_trigger_binding_version(id, version)
797}
798
799#[allow(clippy::arc_with_non_send_sync)]
800pub fn resolve_live_or_as_of(
801    id: &str,
802    recorded: RecordedTriggerBinding,
803) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
804    match resolve_live_trigger_binding(id, Some(recorded.version)) {
805        Ok(binding) => Ok(binding),
806        Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
807            let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
808            let mut metadata = BTreeMap::new();
809            metadata.insert("trigger_id".to_string(), serde_json::json!(id));
810            metadata.insert(
811                "recorded_version".to_string(),
812                serde_json::json!(recorded.version),
813            );
814            metadata.insert(
815                "received_at".to_string(),
816                serde_json::json!(recorded
817                    .received_at
818                    .format(&time::format_description::well_known::Rfc3339)
819                    .unwrap_or_else(|_| recorded.received_at.to_string())),
820            );
821            metadata.insert(
822                "resolved_version".to_string(),
823                serde_json::json!(binding.version),
824            );
825            crate::events::log_warn_meta(
826                "replay.binding_version_gc_fallback",
827                "trigger replay fell back to lifecycle history after binding version GC",
828                metadata,
829            );
830            Ok(binding)
831        }
832        Err(error) => Err(error),
833    }
834}
835
836pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
837    TRIGGER_REGISTRY.with(|slot| {
838        let registry = slot.borrow();
839        registry.binding_version_as_of(id, as_of)
840    })
841}
842
843#[allow(clippy::arc_with_non_send_sync)]
844fn resolve_trigger_binding_version(
845    id: &str,
846    version: u32,
847) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
848    TRIGGER_REGISTRY.with(|slot| {
849        let registry = slot.borrow();
850        registry
851            .binding(id, version)
852            .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
853                id: id.to_string(),
854                version,
855            })
856    })
857}
858
859#[allow(clippy::arc_with_non_send_sync)]
860pub fn resolve_live_trigger_binding(
861    id: &str,
862    version: Option<u32>,
863) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
864    TRIGGER_REGISTRY.with(|slot| {
865        let registry = slot.borrow();
866        if let Some(version) = version {
867            let binding = registry.binding(id, version).ok_or_else(|| {
868                TriggerRegistryError::UnknownBindingVersion {
869                    id: id.to_string(),
870                    version,
871                }
872            })?;
873            if binding.state_snapshot() == TriggerState::Terminated {
874                return Err(TriggerRegistryError::UnknownBindingVersion {
875                    id: id.to_string(),
876                    version,
877                });
878            }
879            return Ok(binding);
880        }
881
882        registry
883            .live_bindings_any_source(id)
884            .into_iter()
885            .max_by_key(|binding| binding.version)
886            .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
887    })
888}
889
890pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
891    TRIGGER_REGISTRY.with(|slot| {
892        let registry = slot.borrow();
893        let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
894            return Vec::new();
895        };
896
897        let mut bindings = Vec::new();
898        for id in binding_ids {
899            let Some(versions) = registry.bindings.get(id) else {
900                continue;
901            };
902            for binding in versions {
903                if binding.state_snapshot() != TriggerState::Active {
904                    continue;
905                }
906                if !binding.match_events.is_empty()
907                    && !binding.match_events.iter().any(|kind| kind == &event.kind)
908                {
909                    continue;
910                }
911                bindings.push(binding.clone());
912            }
913        }
914
915        bindings.sort_by(|left, right| {
916            left.id
917                .as_str()
918                .cmp(right.id.as_str())
919                .then(left.version.cmp(&right.version))
920        });
921        bindings
922    })
923}
924
925pub async fn install_manifest_triggers(
926    specs: Vec<TriggerBindingSpec>,
927) -> Result<(), TriggerRegistryError> {
928    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
929        let registry = &mut *slot.borrow_mut();
930        registry.refresh_runtime_context();
931        let mut touched_ids = BTreeSet::new();
932
933        let mut incoming = BTreeMap::new();
934        for spec in specs {
935            let spec_id = spec.id.clone();
936            if spec.source != TriggerBindingSource::Manifest {
937                return Err(TriggerRegistryError::InvalidSpec(format!(
938                    "manifest install received non-manifest trigger '{}'",
939                    spec_id
940                )));
941            }
942            if spec_id.trim().is_empty() {
943                return Err(TriggerRegistryError::InvalidSpec(
944                    "manifest trigger id cannot be empty".to_string(),
945                ));
946            }
947            if incoming.insert(spec_id.clone(), spec).is_some() {
948                return Err(TriggerRegistryError::DuplicateId(spec_id));
949            }
950        }
951
952        let mut lifecycle = Vec::new();
953        let existing_ids: Vec<String> = registry
954            .bindings
955            .iter()
956            .filter(|(_, bindings)| {
957                bindings.iter().any(|binding| {
958                    binding.source == TriggerBindingSource::Manifest
959                        && binding.state_snapshot() != TriggerState::Terminated
960                })
961            })
962            .map(|(id, _)| id.clone())
963            .collect();
964
965        for id in existing_ids {
966            let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
967            let Some(spec) = incoming.remove(&id) else {
968                for binding in live_manifest {
969                    registry.transition_binding_to_draining(&binding, &mut lifecycle);
970                }
971                touched_ids.insert(id.clone());
972                continue;
973            };
974
975            let has_matching_active = live_manifest.iter().any(|binding| {
976                binding.definition_fingerprint == spec.definition_fingerprint
977                    && matches!(
978                        binding.state_snapshot(),
979                        TriggerState::Registering | TriggerState::Active
980                    )
981            });
982            if has_matching_active {
983                continue;
984            }
985
986            for binding in live_manifest {
987                registry.transition_binding_to_draining(&binding, &mut lifecycle);
988            }
989
990            let version = registry.next_version_for_spec(&spec);
991            registry.register_binding(spec, version, &mut lifecycle);
992            touched_ids.insert(id.clone());
993        }
994
995        for spec in incoming.into_values() {
996            touched_ids.insert(spec.id.clone());
997            let version = registry.next_version_for_spec(&spec);
998            registry.register_binding(spec, version, &mut lifecycle);
999        }
1000
1001        for id in touched_ids {
1002            registry.gc_terminated_versions(&id);
1003        }
1004
1005        Ok((registry.event_log.clone(), lifecycle))
1006    })?;
1007
1008    append_lifecycle_events(event_log, events).await
1009}
1010
1011pub async fn dynamic_register(
1012    mut spec: TriggerBindingSpec,
1013) -> Result<TriggerId, TriggerRegistryError> {
1014    if spec.id.trim().is_empty() {
1015        spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1016    }
1017    spec.source = TriggerBindingSource::Dynamic;
1018    let id = spec.id.clone();
1019    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1020        let registry = &mut *slot.borrow_mut();
1021        registry.refresh_runtime_context();
1022
1023        if registry.bindings.contains_key(id.as_str()) {
1024            return Err(TriggerRegistryError::DuplicateId(id.clone()));
1025        }
1026
1027        let mut lifecycle = Vec::new();
1028        let version = registry.next_version_for_spec(&spec);
1029        registry.register_binding(spec, version, &mut lifecycle);
1030        Ok((registry.event_log.clone(), lifecycle))
1031    })?;
1032
1033    append_lifecycle_events(event_log, events).await?;
1034    Ok(TriggerId::new(id))
1035}
1036
1037pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1038    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1039        let registry = &mut *slot.borrow_mut();
1040        let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1041        if live_dynamic.is_empty() {
1042            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1043        }
1044
1045        let mut lifecycle = Vec::new();
1046        for binding in live_dynamic {
1047            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1048        }
1049        Ok((registry.event_log.clone(), lifecycle))
1050    })?;
1051
1052    append_lifecycle_events(event_log, events).await
1053}
1054
1055pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1056    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1057        let registry = &mut *slot.borrow_mut();
1058        let live = registry.live_bindings_any_source(id);
1059        if live.is_empty() {
1060            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1061        }
1062
1063        let mut lifecycle = Vec::new();
1064        for binding in live {
1065            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1066        }
1067        Ok((registry.event_log.clone(), lifecycle))
1068    })?;
1069
1070    append_lifecycle_events(event_log, events).await
1071}
1072
1073pub async fn pause(id: &str) -> Result<(), TriggerRegistryError> {
1074    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1075        let registry = &mut *slot.borrow_mut();
1076        let live = registry.live_bindings_any_source(id);
1077        if live.is_empty() {
1078            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1079        }
1080
1081        let mut lifecycle = Vec::new();
1082        for binding in live {
1083            match binding.state_snapshot() {
1084                TriggerState::Registering | TriggerState::Active => {
1085                    registry.transition_binding_state(
1086                        &binding,
1087                        TriggerState::Paused,
1088                        &mut lifecycle,
1089                    );
1090                }
1091                TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {}
1092            }
1093        }
1094        Ok((registry.event_log.clone(), lifecycle))
1095    })?;
1096
1097    append_lifecycle_events(event_log, events).await
1098}
1099
1100pub async fn resume(id: &str) -> Result<(), TriggerRegistryError> {
1101    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1102        let registry = &mut *slot.borrow_mut();
1103        let live = registry.live_bindings_any_source(id);
1104        if live.is_empty() {
1105            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1106        }
1107
1108        let mut lifecycle = Vec::new();
1109        for binding in live {
1110            if binding.state_snapshot() == TriggerState::Paused {
1111                registry.transition_binding_state(&binding, TriggerState::Active, &mut lifecycle);
1112            }
1113        }
1114        Ok((registry.event_log.clone(), lifecycle))
1115    })?;
1116
1117    append_lifecycle_events(event_log, events).await
1118}
1119
1120fn pin_trigger_binding_inner(
1121    id: &str,
1122    version: u32,
1123    allow_terminated: bool,
1124) -> Result<(), TriggerRegistryError> {
1125    TRIGGER_REGISTRY.with(|slot| {
1126        let registry = slot.borrow();
1127        let binding = registry.binding(id, version).ok_or_else(|| {
1128            TriggerRegistryError::UnknownBindingVersion {
1129                id: id.to_string(),
1130                version,
1131            }
1132        })?;
1133        match binding.state_snapshot() {
1134            TriggerState::Paused => Err(TriggerRegistryError::InvalidSpec(format!(
1135                "trigger binding '{}' version {} is paused",
1136                id, version
1137            ))),
1138            TriggerState::Terminated if !allow_terminated => {
1139                Err(TriggerRegistryError::InvalidSpec(format!(
1140                    "trigger binding '{}' version {} is terminated",
1141                    id, version
1142                )))
1143            }
1144            _ => {
1145                binding.in_flight.fetch_add(1, Ordering::Relaxed);
1146                Ok(())
1147            }
1148        }
1149    })
1150}
1151
1152pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1153    pin_trigger_binding_inner(id, version, false)
1154}
1155
1156pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1157    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1158        let registry = &mut *slot.borrow_mut();
1159        let binding = registry.binding(id, version).ok_or_else(|| {
1160            TriggerRegistryError::UnknownBindingVersion {
1161                id: id.to_string(),
1162                version,
1163            }
1164        })?;
1165        let current = binding.in_flight.load(Ordering::Relaxed);
1166        if current == 0 {
1167            return Err(TriggerRegistryError::InvalidSpec(format!(
1168                "trigger binding '{}' version {} has no in-flight events",
1169                id, version
1170            )));
1171        }
1172        binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1173
1174        let mut lifecycle = Vec::new();
1175        registry.maybe_finalize_draining(&binding, &mut lifecycle);
1176        registry.gc_terminated_versions(binding.id.as_str());
1177        Ok((registry.event_log.clone(), lifecycle))
1178    })?;
1179
1180    append_lifecycle_events(event_log, events).await
1181}
1182
1183pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1184    begin_in_flight_inner(id, version, false)
1185}
1186
1187pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1188    begin_in_flight_inner(id, version, true)
1189}
1190
1191fn begin_in_flight_inner(
1192    id: &str,
1193    version: u32,
1194    allow_terminated: bool,
1195) -> Result<(), TriggerRegistryError> {
1196    pin_trigger_binding_inner(id, version, allow_terminated)?;
1197    TRIGGER_REGISTRY.with(|slot| {
1198        let registry = slot.borrow();
1199        let binding = registry.binding(id, version).ok_or_else(|| {
1200            TriggerRegistryError::UnknownBindingVersion {
1201                id: id.to_string(),
1202                version,
1203            }
1204        })?;
1205        binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1206        *binding
1207            .metrics
1208            .last_received_ms
1209            .lock()
1210            .expect("trigger metrics poisoned") = Some(now_ms());
1211        Ok(())
1212    })
1213}
1214
1215pub async fn finish_in_flight(
1216    id: &str,
1217    version: u32,
1218    outcome: TriggerDispatchOutcome,
1219) -> Result<(), TriggerRegistryError> {
1220    TRIGGER_REGISTRY.with(|slot| {
1221        let registry = &mut *slot.borrow_mut();
1222        let binding = registry.binding(id, version).ok_or_else(|| {
1223            TriggerRegistryError::UnknownBindingVersion {
1224                id: id.to_string(),
1225                version,
1226            }
1227        })?;
1228        let current = binding.in_flight.load(Ordering::Relaxed);
1229        if current == 0 {
1230            return Err(TriggerRegistryError::InvalidSpec(format!(
1231                "trigger binding '{}' version {} has no in-flight events",
1232                id, version
1233            )));
1234        }
1235        match outcome {
1236            TriggerDispatchOutcome::Dispatched => {
1237                binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1238            }
1239            TriggerDispatchOutcome::Failed => {
1240                binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1241            }
1242            TriggerDispatchOutcome::Dlq => {
1243                binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1244            }
1245        }
1246        Ok(())
1247    })?;
1248
1249    unpin_trigger_binding(id, version).await
1250}
1251
1252impl TriggerRegistry {
1253    fn refresh_runtime_context(&mut self) {
1254        if self.event_log.is_none() {
1255            self.event_log = active_event_log();
1256        }
1257        if self.secret_provider.is_none() {
1258            self.secret_provider = default_secret_provider();
1259        }
1260    }
1261
1262    fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
1263        self.bindings
1264            .get(id)
1265            .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
1266            .cloned()
1267    }
1268
1269    fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
1270        self.bindings
1271            .get(id)
1272            .into_iter()
1273            .flat_map(|bindings| bindings.iter())
1274            .filter(|binding| {
1275                binding.source == source && binding.state_snapshot() != TriggerState::Terminated
1276            })
1277            .cloned()
1278            .collect()
1279    }
1280
1281    fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
1282        self.bindings
1283            .get(id)
1284            .into_iter()
1285            .flat_map(|bindings| bindings.iter())
1286            .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
1287            .cloned()
1288            .collect()
1289    }
1290
1291    fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
1292        if let Some(version) = self
1293            .bindings
1294            .get(spec.id.as_str())
1295            .into_iter()
1296            .flat_map(|bindings| bindings.iter())
1297            .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
1298            .map(|binding| binding.version)
1299        {
1300            return version;
1301        }
1302
1303        let historical =
1304            self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
1305        if let Some(version) = historical.matching_version {
1306            return version;
1307        }
1308
1309        self.bindings
1310            .get(spec.id.as_str())
1311            .into_iter()
1312            .flat_map(|bindings| bindings.iter())
1313            .map(|binding| binding.version)
1314            .chain(historical.max_version)
1315            .max()
1316            .unwrap_or(0)
1317            + 1
1318    }
1319
1320    fn gc_terminated_versions(&mut self, id: &str) {
1321        let Some(bindings) = self.bindings.get_mut(id) else {
1322            return;
1323        };
1324
1325        let mut newest_versions: Vec<u32> =
1326            bindings.iter().map(|binding| binding.version).collect();
1327        newest_versions.sort_unstable_by(|left, right| right.cmp(left));
1328        newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
1329        let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();
1330
1331        bindings.retain(|binding| {
1332            binding.state_snapshot() != TriggerState::Terminated
1333                || retained_versions.contains(&binding.version)
1334        });
1335
1336        if bindings.is_empty() {
1337            self.bindings.remove(id);
1338        }
1339    }
1340
1341    fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
1342        let mut lookup = HistoricalVersionLookup::default();
1343        for record in self.lifecycle_records_for(id) {
1344            lookup.max_version = Some(
1345                lookup
1346                    .max_version
1347                    .unwrap_or(0)
1348                    .max(record.transition.version),
1349            );
1350            if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
1351                lookup.matching_version = Some(record.transition.version);
1352            }
1353        }
1354        lookup
1355    }
1356
1357    fn binding_version_as_of(
1358        &self,
1359        id: &str,
1360        as_of: OffsetDateTime,
1361    ) -> Result<u32, TriggerRegistryError> {
1362        let cutoff_ms = (as_of.unix_timestamp_nanos() / 1_000_000) as i64;
1363        let mut active_version = None;
1364        for record in self.lifecycle_records_for(id) {
1365            if record.occurred_at_ms > cutoff_ms {
1366                break;
1367            }
1368            match record.transition.to_state {
1369                TriggerState::Active => active_version = Some(record.transition.version),
1370                TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {
1371                    if active_version == Some(record.transition.version) {
1372                        active_version = None;
1373                    }
1374                }
1375                TriggerState::Registering => {}
1376            }
1377        }
1378
1379        active_version.ok_or_else(|| {
1380            TriggerRegistryError::InvalidSpec(format!(
1381                "no active trigger binding '{}' at {}",
1382                id,
1383                as_of
1384                    .format(&time::format_description::well_known::Rfc3339)
1385                    .unwrap_or_else(|_| as_of.to_string())
1386            ))
1387        })
1388    }
1389
1390    fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
1391        let Some(event_log) = self.event_log.as_ref() else {
1392            return Vec::new();
1393        };
1394        let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1395            .expect("static triggers.lifecycle topic should always be valid");
1396        futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
1397            .unwrap_or_default()
1398            .into_iter()
1399            .filter_map(|(_, event)| {
1400                let occurred_at_ms = event.occurred_at_ms;
1401                let transition: LifecycleStateTransitionRecord =
1402                    serde_json::from_value(event.payload).ok()?;
1403                (transition.id == id).then_some(HistoricalLifecycleRecord {
1404                    occurred_at_ms,
1405                    transition,
1406                })
1407            })
1408            .collect()
1409    }
1410
1411    #[allow(clippy::arc_with_non_send_sync)]
1412    fn register_binding(
1413        &mut self,
1414        spec: TriggerBindingSpec,
1415        version: u32,
1416        lifecycle: &mut Vec<LogEvent>,
1417    ) -> Arc<TriggerBinding> {
1418        let binding = Arc::new(TriggerBinding::new(spec, version));
1419        self.by_provider
1420            .entry(binding.provider.as_str().to_string())
1421            .or_default()
1422            .insert(binding.id.as_str().to_string());
1423        self.bindings
1424            .entry(binding.id.as_str().to_string())
1425            .or_default()
1426            .push(binding.clone());
1427        lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
1428        self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
1429        binding
1430    }
1431
1432    fn transition_binding_to_draining(
1433        &self,
1434        binding: &Arc<TriggerBinding>,
1435        lifecycle: &mut Vec<LogEvent>,
1436    ) {
1437        if matches!(binding.state_snapshot(), TriggerState::Terminated) {
1438            return;
1439        }
1440        self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
1441        self.maybe_finalize_draining(binding, lifecycle);
1442    }
1443
1444    fn maybe_finalize_draining(
1445        &self,
1446        binding: &Arc<TriggerBinding>,
1447        lifecycle: &mut Vec<LogEvent>,
1448    ) {
1449        if binding.state_snapshot() == TriggerState::Draining
1450            && binding.in_flight.load(Ordering::Relaxed) == 0
1451        {
1452            self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
1453        }
1454    }
1455
1456    fn transition_binding_state(
1457        &self,
1458        binding: &Arc<TriggerBinding>,
1459        next: TriggerState,
1460        lifecycle: &mut Vec<LogEvent>,
1461    ) {
1462        let mut state = binding.state.lock().expect("trigger state poisoned");
1463        let previous = *state;
1464        if previous == next {
1465            return;
1466        }
1467        *state = next;
1468        drop(state);
1469        lifecycle.push(lifecycle_event(binding, Some(previous), next));
1470    }
1471}
1472
1473fn lifecycle_event(
1474    binding: &TriggerBinding,
1475    from_state: Option<TriggerState>,
1476    to_state: TriggerState,
1477) -> LogEvent {
1478    LogEvent::new(
1479        "state_transition",
1480        serde_json::json!({
1481            "id": binding.id.as_str(),
1482            "binding_key": binding.binding_key(),
1483            "version": binding.version,
1484            "provider": binding.provider.as_str(),
1485            "kind": &binding.kind,
1486            "source": binding.source.as_str(),
1487            "handler_kind": binding.handler.kind(),
1488            "definition_fingerprint": &binding.definition_fingerprint,
1489            "from_state": from_state.map(TriggerState::as_str),
1490            "to_state": to_state.as_str(),
1491        }),
1492    )
1493}
1494
1495async fn append_lifecycle_events(
1496    event_log: Option<Arc<AnyEventLog>>,
1497    events: Vec<LogEvent>,
1498) -> Result<(), TriggerRegistryError> {
1499    let Some(event_log) = event_log else {
1500        return Ok(());
1501    };
1502    if events.is_empty() {
1503        return Ok(());
1504    }
1505
1506    let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1507        .expect("static triggers.lifecycle topic should always be valid");
1508    for event in events {
1509        event_log
1510            .append(&topic, event)
1511            .await
1512            .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1513    }
1514    Ok(())
1515}
1516
1517fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1518    configured_default_chain(default_secret_namespace())
1519        .ok()
1520        .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1521}
1522
/// Resolves the namespace used for the default secret provider chain.
///
/// The `HARN_SECRET_NAMESPACE` environment variable wins when it holds a
/// non-blank value; surrounding whitespace is stripped so that a padded value
/// (for example one with a trailing newline) does not end up in the
/// namespace. Otherwise the namespace is `harn/<leaf>`, where `<leaf>` is the
/// final component of the current working directory, or `workspace` when that
/// component is unavailable or not valid UTF-8.
fn default_secret_namespace() -> String {
    if let Ok(raw) = std::env::var("HARN_SECRET_NAMESPACE") {
        // Return the same trimmed value that the emptiness check is made on.
        let namespace = raw.trim();
        if !namespace.is_empty() {
            return namespace.to_string();
        }
    }

    let cwd = std::env::current_dir().unwrap_or_default();
    let leaf = cwd
        .file_name()
        .and_then(|name| name.to_str())
        .filter(|name| !name.is_empty())
        .unwrap_or("workspace");
    format!("harn/{leaf}")
}
1538
1539fn now_ms() -> i64 {
1540    clock::now_ms()
1541}
1542
1543#[cfg(test)]
1544mod tests {
1545    use super::*;
1546    use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
1547    use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
1548    use crate::triggers::test_util::timing::FILE_WATCH_FALLBACK_POLL;
1549    use std::rc::Rc;
1550    use time::OffsetDateTime;
1551
1552    fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
1553        TriggerBindingSpec {
1554            id: id.to_string(),
1555            source: TriggerBindingSource::Manifest,
1556            kind: "webhook".to_string(),
1557            provider: ProviderId::from("github"),
1558            autonomy_tier: crate::AutonomyTier::ActAuto,
1559            handler: TriggerHandlerSpec::Worker {
1560                queue: format!("{id}-queue"),
1561            },
1562            dispatch_priority: crate::WorkerQueuePriority::Normal,
1563            when: None,
1564            when_budget: None,
1565            retry: TriggerRetryConfig::default(),
1566            match_events: vec!["issues.opened".to_string()],
1567            dedupe_key: Some("event.dedupe_key".to_string()),
1568            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1569            filter: Some("event.kind".to_string()),
1570            daily_cost_usd: Some(5.0),
1571            hourly_cost_usd: None,
1572            max_autonomous_decisions_per_hour: None,
1573            max_autonomous_decisions_per_day: None,
1574            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1575            max_concurrent: Some(10),
1576            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1577            manifest_path: None,
1578            package_name: Some("workspace".to_string()),
1579            definition_fingerprint: fingerprint.to_string(),
1580        }
1581    }
1582
1583    fn dynamic_spec(id: &str) -> TriggerBindingSpec {
1584        TriggerBindingSpec {
1585            id: id.to_string(),
1586            source: TriggerBindingSource::Dynamic,
1587            kind: "webhook".to_string(),
1588            provider: ProviderId::from("github"),
1589            autonomy_tier: crate::AutonomyTier::ActAuto,
1590            handler: TriggerHandlerSpec::Worker {
1591                queue: format!("{id}-queue"),
1592            },
1593            dispatch_priority: crate::WorkerQueuePriority::Normal,
1594            when: None,
1595            when_budget: None,
1596            retry: TriggerRetryConfig::default(),
1597            match_events: vec!["issues.opened".to_string()],
1598            dedupe_key: None,
1599            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1600            filter: None,
1601            daily_cost_usd: None,
1602            hourly_cost_usd: None,
1603            max_autonomous_decisions_per_hour: None,
1604            max_autonomous_decisions_per_day: None,
1605            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1606            max_concurrent: None,
1607            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1608            manifest_path: None,
1609            package_name: None,
1610            definition_fingerprint: format!("dynamic:{id}"),
1611        }
1612    }
1613
1614    #[tokio::test(flavor = "current_thread")]
1615    async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
1616        clear_trigger_registry();
1617
1618        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1619            .await
1620            .expect("manifest trigger installs");
1621
1622        let snapshots = snapshot_trigger_bindings();
1623        assert_eq!(snapshots.len(), 1);
1624        let binding = &snapshots[0];
1625        assert_eq!(binding.id, "github-new-issue");
1626        assert_eq!(binding.version, 1);
1627        assert_eq!(binding.state, TriggerState::Active);
1628        assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());
1629
1630        clear_trigger_registry();
1631    }
1632
1633    #[tokio::test(flavor = "current_thread")]
1634    async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
1635        clear_trigger_registry();
1636
1637        let first = dynamic_register(dynamic_spec("dynamic-a"))
1638            .await
1639            .expect("first dynamic trigger");
1640        let second = dynamic_register(dynamic_spec("dynamic-b"))
1641            .await
1642            .expect("second dynamic trigger");
1643        assert_ne!(first, second);
1644
1645        let error = dynamic_register(dynamic_spec("dynamic-a"))
1646            .await
1647            .expect_err("duplicate id should fail");
1648        assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));
1649
1650        clear_trigger_registry();
1651    }
1652
1653    #[tokio::test(flavor = "current_thread")]
1654    async fn drain_waits_for_in_flight_events_before_terminating() {
1655        clear_trigger_registry();
1656
1657        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1658            .await
1659            .expect("manifest trigger installs");
1660        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1661
1662        drain("github-new-issue").await.expect("drain succeeds");
1663        let binding = snapshot_trigger_bindings()
1664            .into_iter()
1665            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1666            .expect("binding snapshot");
1667        assert_eq!(binding.state, TriggerState::Draining);
1668        assert_eq!(binding.metrics.in_flight, 1);
1669
1670        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1671            .await
1672            .expect("finish in-flight event");
1673        let binding = snapshot_trigger_bindings()
1674            .into_iter()
1675            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1676            .expect("binding snapshot");
1677        assert_eq!(binding.state, TriggerState::Terminated);
1678        assert_eq!(binding.metrics.in_flight, 0);
1679
1680        clear_trigger_registry();
1681    }
1682
1683    #[tokio::test(flavor = "current_thread")]
1684    async fn hot_reload_registers_new_version_while_old_binding_drains() {
1685        clear_trigger_registry();
1686
1687        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1688            .await
1689            .expect("initial manifest trigger installs");
1690        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1691
1692        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1693            .await
1694            .expect("updated manifest trigger installs");
1695
1696        let snapshots = snapshot_trigger_bindings();
1697        assert_eq!(snapshots.len(), 2);
1698        let old = snapshots
1699            .iter()
1700            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1701            .expect("old binding");
1702        let new = snapshots
1703            .iter()
1704            .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
1705            .expect("new binding");
1706        assert_eq!(old.state, TriggerState::Draining);
1707        assert_eq!(new.state, TriggerState::Active);
1708
1709        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1710            .await
1711            .expect("finish old in-flight event");
1712        let old = snapshot_trigger_bindings()
1713            .into_iter()
1714            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1715            .expect("old binding");
1716        assert_eq!(old.state, TriggerState::Terminated);
1717
1718        clear_trigger_registry();
1719    }
1720
1721    #[tokio::test(flavor = "current_thread")]
1722    async fn gc_drops_terminated_versions_beyond_retention_limit() {
1723        clear_trigger_registry();
1724
1725        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1726            .await
1727            .expect("install v1");
1728        begin_in_flight("github-new-issue", 1).expect("pin v1");
1729
1730        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1731            .await
1732            .expect("install v2");
1733        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1734            .await
1735            .expect("finish v1");
1736
1737        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1738            .await
1739            .expect("install v3");
1740
1741        let snapshots = snapshot_trigger_bindings();
1742        let versions: Vec<u32> = snapshots
1743            .into_iter()
1744            .filter(|binding| binding.id == "github-new-issue")
1745            .map(|binding| binding.version)
1746            .collect();
1747        assert_eq!(versions, vec![2, 3]);
1748
1749        clear_trigger_registry();
1750    }
1751
1752    #[tokio::test(flavor = "current_thread")]
1753    async fn lifecycle_transitions_append_to_event_log() {
1754        clear_trigger_registry();
1755        reset_active_event_log();
1756        let tempdir = tempfile::tempdir().expect("tempdir");
1757        let log = install_default_for_base_dir(tempdir.path()).expect("install event log");
1758
1759        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1760            .await
1761            .expect("manifest trigger installs");
1762        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1763        drain("github-new-issue").await.expect("drain succeeds");
1764        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1765            .await
1766            .expect("finish event");
1767
1768        let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
1769        let events = log
1770            .read_range(&topic, None, 32)
1771            .await
1772            .expect("read lifecycle events");
1773        let states: Vec<String> = events
1774            .into_iter()
1775            .filter_map(|(_, event)| {
1776                event
1777                    .payload
1778                    .get("to_state")
1779                    .and_then(|value| value.as_str())
1780                    .map(|value| value.to_string())
1781            })
1782            .collect();
1783        assert_eq!(
1784            states,
1785            vec![
1786                "registering".to_string(),
1787                "active".to_string(),
1788                "draining".to_string(),
1789                "terminated".to_string(),
1790            ]
1791        );
1792
1793        reset_active_event_log();
1794        clear_trigger_registry();
1795    }
1796
1797    #[tokio::test(flavor = "current_thread")]
1798    async fn version_history_reuses_historical_version_after_restart() {
1799        clear_trigger_registry();
1800        reset_active_event_log();
1801        let tempdir = tempfile::tempdir().expect("tempdir");
1802        install_default_for_base_dir(tempdir.path()).expect("install event log");
1803
1804        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1805            .await
1806            .expect("initial manifest trigger installs");
1807        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1808            .await
1809            .expect("updated manifest trigger installs");
1810
1811        clear_trigger_registry();
1812        reset_active_event_log();
1813        install_default_for_base_dir(tempdir.path()).expect("reopen event log");
1814
1815        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1816            .await
1817            .expect("manifest reload reuses historical version");
1818
1819        let binding = snapshot_trigger_bindings()
1820            .into_iter()
1821            .find(|binding| binding.id == "github-new-issue")
1822            .expect("binding snapshot");
1823        assert_eq!(binding.version, 2);
1824
1825        reset_active_event_log();
1826        clear_trigger_registry();
1827    }
1828
1829    #[tokio::test(flavor = "current_thread")]
1830    async fn binding_version_as_of_reports_historical_active_version() {
1831        clear_trigger_registry();
1832        reset_active_event_log();
1833        let tempdir = tempfile::tempdir().expect("tempdir");
1834        install_default_for_base_dir(tempdir.path()).expect("install event log");
1835
1836        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1837            .await
1838            .expect("initial manifest trigger installs");
1839        let before_reload = OffsetDateTime::now_utc();
1840        std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
1841
1842        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1843            .await
1844            .expect("updated manifest trigger installs");
1845        let after_reload = OffsetDateTime::now_utc();
1846
1847        assert_eq!(
1848            binding_version_as_of("github-new-issue", before_reload)
1849                .expect("version before reload"),
1850            1
1851        );
1852        assert_eq!(
1853            binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
1854            2
1855        );
1856
1857        reset_active_event_log();
1858        clear_trigger_registry();
1859    }
1860
1861    #[tokio::test(flavor = "current_thread")]
1862    async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
1863        clear_trigger_registry();
1864        reset_active_event_log();
1865        let sink = Rc::new(CollectorSink::new());
1866        clear_event_sinks();
1867        add_event_sink(sink.clone());
1868        let tempdir = tempfile::tempdir().expect("tempdir");
1869        install_default_for_base_dir(tempdir.path()).expect("install event log");
1870
1871        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1872            .await
1873            .expect("install v1");
1874        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1875            .await
1876            .expect("install v2");
1877        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1878            .await
1879            .expect("install v3");
1880        let received_at = OffsetDateTime::now_utc();
1881        std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
1882        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
1883            .await
1884            .expect("install v4");
1885
1886        let binding = resolve_live_or_as_of(
1887            "github-new-issue",
1888            RecordedTriggerBinding {
1889                version: 1,
1890                received_at,
1891            },
1892        )
1893        .expect("resolve fallback binding");
1894        assert_eq!(binding.version, 3);
1895
1896        let warning = sink
1897            .logs
1898            .borrow()
1899            .iter()
1900            .find(|log| log.category == "replay.binding_version_gc_fallback")
1901            .cloned()
1902            .expect("gc fallback warning");
1903        assert_eq!(warning.level, EventLevel::Warn);
1904        assert_eq!(
1905            warning.metadata.get("trigger_id"),
1906            Some(&serde_json::json!("github-new-issue"))
1907        );
1908        assert_eq!(
1909            warning.metadata.get("recorded_version"),
1910            Some(&serde_json::json!(1))
1911        );
1912        assert_eq!(
1913            warning.metadata.get("received_at"),
1914            Some(&serde_json::json!(received_at
1915                .format(&time::format_description::well_known::Rfc3339)
1916                .unwrap_or_else(|_| received_at.to_string())))
1917        );
1918        assert_eq!(
1919            warning.metadata.get("resolved_version"),
1920            Some(&serde_json::json!(3))
1921        );
1922
1923        clear_event_sinks();
1924        crate::events::reset_event_sinks();
1925        reset_active_event_log();
1926        clear_trigger_registry();
1927    }
1928}