Skip to main content

harn_vm/triggers/
registry.rs

1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use time::OffsetDateTime;
10use uuid::Uuid;
11
12use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
13use crate::llm::trigger_predicate::TriggerPredicateBudget;
14use crate::secrets::{configured_default_chain, SecretProvider};
15use crate::triggers::test_util::clock;
16use crate::trust_graph::AutonomyTier;
17use crate::value::VmClosure;
18
19use super::dispatcher::TriggerRetryConfig;
20use super::flow_control::TriggerFlowControlConfig;
21use super::ProviderId;
22
23#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
24pub struct TriggerId(String);
25
26impl TriggerId {
27    pub fn new(value: impl Into<String>) -> Self {
28        Self(value.into())
29    }
30
31    pub fn as_str(&self) -> &str {
32        &self.0
33    }
34}
35
36impl std::fmt::Display for TriggerId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        self.0.fmt(f)
39    }
40}
41
42#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub enum TriggerState {
45    Registering,
46    Active,
47    Draining,
48    Terminated,
49}
50
51impl TriggerState {
52    pub fn as_str(self) -> &'static str {
53        match self {
54            Self::Registering => "registering",
55            Self::Active => "active",
56            Self::Draining => "draining",
57            Self::Terminated => "terminated",
58        }
59    }
60}
61
62#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum TriggerBindingSource {
65    Manifest,
66    Dynamic,
67}
68
69impl TriggerBindingSource {
70    pub fn as_str(self) -> &'static str {
71        match self {
72            Self::Manifest => "manifest",
73            Self::Dynamic => "dynamic",
74        }
75    }
76}
77
78#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
79#[serde(rename_all = "snake_case")]
80pub enum TriggerBudgetExhaustionStrategy {
81    #[default]
82    False,
83    RetryLater,
84    Fail,
85    Warn,
86}
87
88impl TriggerBudgetExhaustionStrategy {
89    pub fn as_str(self) -> &'static str {
90        match self {
91            Self::False => "false",
92            Self::RetryLater => "retry_later",
93            Self::Fail => "fail",
94            Self::Warn => "warn",
95        }
96    }
97}
98
99#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
100pub struct OrchestratorBudgetConfig {
101    pub daily_cost_usd: Option<f64>,
102    pub hourly_cost_usd: Option<f64>,
103}
104
105#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
106pub struct OrchestratorBudgetSnapshot {
107    pub daily_cost_usd: Option<f64>,
108    pub hourly_cost_usd: Option<f64>,
109    pub cost_today_usd_micros: u64,
110    pub cost_hour_usd_micros: u64,
111    pub day_utc: i32,
112    pub hour_utc: i64,
113}
114
115#[derive(Debug)]
116struct OrchestratorBudgetState {
117    config: OrchestratorBudgetConfig,
118    day_utc: i32,
119    hour_utc: i64,
120    cost_today_usd_micros: u64,
121    cost_hour_usd_micros: u64,
122}
123
124impl Default for OrchestratorBudgetState {
125    fn default() -> Self {
126        Self {
127            config: OrchestratorBudgetConfig::default(),
128            day_utc: utc_day_key(),
129            hour_utc: utc_hour_key(),
130            cost_today_usd_micros: 0,
131            cost_hour_usd_micros: 0,
132        }
133    }
134}
135
136#[derive(Clone)]
137pub enum TriggerHandlerSpec {
138    Local {
139        raw: String,
140        closure: Rc<VmClosure>,
141    },
142    A2a {
143        target: String,
144        allow_cleartext: bool,
145    },
146    Worker {
147        queue: String,
148    },
149    Persona {
150        binding: crate::PersonaRuntimeBinding,
151    },
152}
153
154impl std::fmt::Debug for TriggerHandlerSpec {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        match self {
157            Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
158            Self::A2a {
159                target,
160                allow_cleartext,
161            } => f
162                .debug_struct("A2a")
163                .field("target", target)
164                .field("allow_cleartext", allow_cleartext)
165                .finish(),
166            Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
167            Self::Persona { binding } => f
168                .debug_struct("Persona")
169                .field("name", &binding.name)
170                .finish(),
171        }
172    }
173}
174
175impl TriggerHandlerSpec {
176    pub fn kind(&self) -> &'static str {
177        match self {
178            Self::Local { .. } => "local",
179            Self::A2a { .. } => "a2a",
180            Self::Worker { .. } => "worker",
181            Self::Persona { .. } => "persona",
182        }
183    }
184}
185
186#[derive(Clone)]
187pub struct TriggerPredicateSpec {
188    pub raw: String,
189    pub closure: Rc<VmClosure>,
190}
191
192impl std::fmt::Debug for TriggerPredicateSpec {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        f.debug_struct("TriggerPredicateSpec")
195            .field("raw", &self.raw)
196            .finish()
197    }
198}
199
200#[derive(Clone, Debug)]
201pub struct TriggerBindingSpec {
202    pub id: String,
203    pub source: TriggerBindingSource,
204    pub kind: String,
205    pub provider: ProviderId,
206    pub autonomy_tier: AutonomyTier,
207    pub handler: TriggerHandlerSpec,
208    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
209    pub when: Option<TriggerPredicateSpec>,
210    pub when_budget: Option<TriggerPredicateBudget>,
211    pub retry: TriggerRetryConfig,
212    pub match_events: Vec<String>,
213    pub dedupe_key: Option<String>,
214    pub dedupe_retention_days: u32,
215    pub filter: Option<String>,
216    pub daily_cost_usd: Option<f64>,
217    pub hourly_cost_usd: Option<f64>,
218    pub max_autonomous_decisions_per_hour: Option<u64>,
219    pub max_autonomous_decisions_per_day: Option<u64>,
220    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
221    pub max_concurrent: Option<u32>,
222    pub flow_control: TriggerFlowControlConfig,
223    pub manifest_path: Option<PathBuf>,
224    pub package_name: Option<String>,
225    pub definition_fingerprint: String,
226}
227
/// Live per-binding counters, updated through shared references.
///
/// Cost counters are in micro-USD (1e-6 USD). The `*_today` / `*_hour`
/// counters hold totals for the current UTC day/hour window and are zeroed
/// when that window rolls over.
///
/// `Default` is derived: `AtomicU64::default()` is 0 and
/// `Mutex<Option<i64>>::default()` wraps `None`. This is exactly what the
/// previous hand-written impl produced, without the risk of a new field being
/// forgotten there.
#[derive(Debug, Default)]
pub struct TriggerMetrics {
    pub received: AtomicU64,
    pub dispatched: AtomicU64,
    pub failed: AtomicU64,
    pub dlq: AtomicU64,
    pub last_received_ms: Mutex<Option<i64>>,
    pub cost_total_usd_micros: AtomicU64,
    pub cost_today_usd_micros: AtomicU64,
    pub cost_hour_usd_micros: AtomicU64,
    pub autonomous_decisions_total: AtomicU64,
    pub autonomous_decisions_today: AtomicU64,
    pub autonomous_decisions_hour: AtomicU64,
}
260
261#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
262pub struct TriggerMetricsSnapshot {
263    pub received: u64,
264    pub dispatched: u64,
265    pub failed: u64,
266    pub dlq: u64,
267    pub in_flight: u64,
268    pub last_received_ms: Option<i64>,
269    pub cost_total_usd_micros: u64,
270    pub cost_today_usd_micros: u64,
271    pub cost_hour_usd_micros: u64,
272    pub autonomous_decisions_total: u64,
273    pub autonomous_decisions_today: u64,
274    pub autonomous_decisions_hour: u64,
275}
276
277pub struct TriggerBinding {
278    pub id: TriggerId,
279    pub version: u32,
280    pub source: TriggerBindingSource,
281    pub kind: String,
282    pub provider: ProviderId,
283    pub autonomy_tier: AutonomyTier,
284    pub handler: TriggerHandlerSpec,
285    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
286    pub when: Option<TriggerPredicateSpec>,
287    pub when_budget: Option<TriggerPredicateBudget>,
288    pub retry: TriggerRetryConfig,
289    pub match_events: Vec<String>,
290    pub dedupe_key: Option<String>,
291    pub dedupe_retention_days: u32,
292    pub filter: Option<String>,
293    pub daily_cost_usd: Option<f64>,
294    pub hourly_cost_usd: Option<f64>,
295    pub max_autonomous_decisions_per_hour: Option<u64>,
296    pub max_autonomous_decisions_per_day: Option<u64>,
297    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
298    pub max_concurrent: Option<u32>,
299    pub flow_control: TriggerFlowControlConfig,
300    pub manifest_path: Option<PathBuf>,
301    pub package_name: Option<String>,
302    pub definition_fingerprint: String,
303    pub state: Mutex<TriggerState>,
304    pub metrics: TriggerMetrics,
305    pub in_flight: AtomicU64,
306    pub cancel_token: Arc<AtomicBool>,
307    pub predicate_state: Mutex<TriggerPredicateState>,
308}
309
/// Per-binding bookkeeping guarded by `TriggerBinding::predicate_state`.
///
/// `budget_day_utc` / `budget_hour_utc` record which UTC window the binding's
/// day/hour counters belong to (`None` until first checked).
/// `recent_cost_usd_micros` is a rolling window of predicate cost samples in micro-USD.
#[derive(Clone, Debug, Default)]
pub struct TriggerPredicateState {
    pub budget_day_utc: Option<i32>,
    pub budget_hour_utc: Option<i64>,
    pub consecutive_failures: u32,
    pub breaker_open_until_ms: Option<i64>,
    pub recent_cost_usd_micros: VecDeque<u64>,
}
318
319impl std::fmt::Debug for TriggerBinding {
320    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321        f.debug_struct("TriggerBinding")
322            .field("id", &self.id)
323            .field("version", &self.version)
324            .field("source", &self.source)
325            .field("kind", &self.kind)
326            .field("provider", &self.provider)
327            .field("handler_kind", &self.handler.kind())
328            .field("state", &self.state_snapshot())
329            .finish()
330    }
331}
332
333impl TriggerBinding {
334    pub fn snapshot(&self) -> TriggerBindingSnapshot {
335        TriggerBindingSnapshot {
336            id: self.id.as_str().to_string(),
337            version: self.version,
338            source: self.source,
339            kind: self.kind.clone(),
340            provider: self.provider.as_str().to_string(),
341            autonomy_tier: self.autonomy_tier,
342            handler_kind: self.handler.kind().to_string(),
343            state: self.state_snapshot(),
344            metrics: self.metrics_snapshot(),
345            daily_cost_usd: self.daily_cost_usd,
346            hourly_cost_usd: self.hourly_cost_usd,
347            max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
348            max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
349            on_budget_exhausted: self.on_budget_exhausted,
350        }
351    }
352
353    fn new(spec: TriggerBindingSpec, version: u32) -> Self {
354        Self {
355            id: TriggerId::new(spec.id),
356            version,
357            source: spec.source,
358            kind: spec.kind,
359            provider: spec.provider,
360            autonomy_tier: spec.autonomy_tier,
361            handler: spec.handler,
362            dispatch_priority: spec.dispatch_priority,
363            when: spec.when,
364            when_budget: spec.when_budget,
365            retry: spec.retry,
366            match_events: spec.match_events,
367            dedupe_key: spec.dedupe_key,
368            dedupe_retention_days: spec.dedupe_retention_days,
369            filter: spec.filter,
370            daily_cost_usd: spec.daily_cost_usd,
371            hourly_cost_usd: spec.hourly_cost_usd,
372            max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
373            max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
374            on_budget_exhausted: spec.on_budget_exhausted,
375            max_concurrent: spec.max_concurrent,
376            flow_control: spec.flow_control,
377            manifest_path: spec.manifest_path,
378            package_name: spec.package_name,
379            definition_fingerprint: spec.definition_fingerprint,
380            state: Mutex::new(TriggerState::Registering),
381            metrics: TriggerMetrics::default(),
382            in_flight: AtomicU64::new(0),
383            cancel_token: Arc::new(AtomicBool::new(false)),
384            predicate_state: Mutex::new(TriggerPredicateState::default()),
385        }
386    }
387
388    pub fn binding_key(&self) -> String {
389        format!("{}@v{}", self.id.as_str(), self.version)
390    }
391
392    pub fn state_snapshot(&self) -> TriggerState {
393        *self.state.lock().expect("trigger state poisoned")
394    }
395
396    pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
397        TriggerMetricsSnapshot {
398            received: self.metrics.received.load(Ordering::Relaxed),
399            dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
400            failed: self.metrics.failed.load(Ordering::Relaxed),
401            dlq: self.metrics.dlq.load(Ordering::Relaxed),
402            in_flight: self.in_flight.load(Ordering::Relaxed),
403            last_received_ms: *self
404                .metrics
405                .last_received_ms
406                .lock()
407                .expect("trigger metrics poisoned"),
408            cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
409            cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
410            cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
411            autonomous_decisions_total: self
412                .metrics
413                .autonomous_decisions_total
414                .load(Ordering::Relaxed),
415            autonomous_decisions_today: self
416                .metrics
417                .autonomous_decisions_today
418                .load(Ordering::Relaxed),
419            autonomous_decisions_hour: self
420                .metrics
421                .autonomous_decisions_hour
422                .load(Ordering::Relaxed),
423        }
424    }
425}
426
427#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
428pub struct TriggerBindingSnapshot {
429    pub id: String,
430    pub version: u32,
431    pub source: TriggerBindingSource,
432    pub kind: String,
433    pub provider: String,
434    pub autonomy_tier: AutonomyTier,
435    pub handler_kind: String,
436    pub state: TriggerState,
437    pub metrics: TriggerMetricsSnapshot,
438    pub daily_cost_usd: Option<f64>,
439    pub hourly_cost_usd: Option<f64>,
440    pub max_autonomous_decisions_per_hour: Option<u64>,
441    pub max_autonomous_decisions_per_day: Option<u64>,
442    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
443}
444
/// Terminal result of dispatching a trigger event.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TriggerDispatchOutcome {
    /// The event was dispatched.
    Dispatched,
    /// Dispatch failed.
    Failed,
    /// The event ended up in the dead-letter queue.
    Dlq,
}
451
/// Errors raised by trigger registry operations.
#[derive(Debug)]
pub enum TriggerRegistryError {
    DuplicateId(String),
    InvalidSpec(String),
    UnknownId(String),
    UnknownBindingVersion { id: String, version: u32 },
    EventLog(String),
}

impl std::fmt::Display for TriggerRegistryError {
    /// `InvalidSpec` and `EventLog` print their message verbatim; the other
    /// variants render a sentence naming the offending id (and version).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TriggerRegistryError::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
            TriggerRegistryError::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
            TriggerRegistryError::UnknownBindingVersion { id, version } => {
                write!(f, "unknown trigger binding '{id}' version {version}")
            }
            TriggerRegistryError::InvalidSpec(message) => f.write_str(message),
            TriggerRegistryError::EventLog(message) => f.write_str(message),
        }
    }
}

impl std::error::Error for TriggerRegistryError {}
475
476#[derive(Default)]
477pub struct TriggerRegistry {
478    bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
479    by_provider: BTreeMap<String, BTreeSet<String>>,
480    event_log: Option<Arc<AnyEventLog>>,
481    secret_provider: Option<Arc<dyn SecretProvider>>,
482}
483
484thread_local! {
485    static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
486}
487
488thread_local! {
489    static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
490        RefCell::new(OrchestratorBudgetState::default());
491}
492
493const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;
494
495const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
496const PREDICATE_COST_WINDOW: usize = 100;
497
498#[derive(Clone, Debug, Deserialize)]
499struct LifecycleStateTransitionRecord {
500    id: String,
501    version: u32,
502    #[serde(default)]
503    definition_fingerprint: Option<String>,
504    to_state: TriggerState,
505}
506
507#[derive(Clone, Debug)]
508struct HistoricalLifecycleRecord {
509    occurred_at_ms: i64,
510    transition: LifecycleStateTransitionRecord,
511}
512
513#[derive(Clone, Copy, Debug, PartialEq, Eq)]
514pub struct RecordedTriggerBinding {
515    pub version: u32,
516    pub received_at: OffsetDateTime,
517}
518
519#[derive(Clone, Copy, Debug, Default)]
520struct HistoricalVersionLookup {
521    matching_version: Option<u32>,
522    max_version: Option<u32>,
523}
524
525pub fn clear_trigger_registry() {
526    TRIGGER_REGISTRY.with(|slot| {
527        *slot.borrow_mut() = TriggerRegistry::default();
528    });
529    clear_orchestrator_budget();
530}
531
532pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
533    ORCHESTRATOR_BUDGET.with(|slot| {
534        let mut state = slot.borrow_mut();
535        rollover_orchestrator_budget(&mut state);
536        state.config = config;
537    });
538}
539
540pub fn clear_orchestrator_budget() {
541    ORCHESTRATOR_BUDGET.with(|slot| {
542        *slot.borrow_mut() = OrchestratorBudgetState::default();
543    });
544}
545
546pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
547    ORCHESTRATOR_BUDGET.with(|slot| {
548        let mut state = slot.borrow_mut();
549        rollover_orchestrator_budget(&mut state);
550        OrchestratorBudgetSnapshot {
551            daily_cost_usd: state.config.daily_cost_usd,
552            hourly_cost_usd: state.config.hourly_cost_usd,
553            cost_today_usd_micros: state.cost_today_usd_micros,
554            cost_hour_usd_micros: state.cost_hour_usd_micros,
555            day_utc: state.day_utc,
556            hour_utc: state.hour_utc,
557        }
558    })
559}
560
561pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
562    if cost_usd_micros == 0 {
563        return;
564    }
565    ORCHESTRATOR_BUDGET.with(|slot| {
566        let mut state = slot.borrow_mut();
567        rollover_orchestrator_budget(&mut state);
568        state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
569        state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
570    });
571}
572
573pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
574    ORCHESTRATOR_BUDGET.with(|slot| {
575        let mut state = slot.borrow_mut();
576        rollover_orchestrator_budget(&mut state);
577        if state.config.hourly_cost_usd.is_some_and(|limit| {
578            micros_to_usd(
579                state
580                    .cost_hour_usd_micros
581                    .saturating_add(expected_cost_usd_micros),
582            ) > limit
583        }) {
584            return Some("orchestrator_hourly_budget_exceeded");
585        }
586        if state.config.daily_cost_usd.is_some_and(|limit| {
587            micros_to_usd(
588                state
589                    .cost_today_usd_micros
590                    .saturating_add(expected_cost_usd_micros),
591            ) > limit
592        }) {
593            return Some("orchestrator_daily_budget_exceeded");
594        }
595        None
596    })
597}
598
599pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
600    let today = utc_day_key();
601    let hour = utc_hour_key();
602    let mut state = binding
603        .predicate_state
604        .lock()
605        .expect("trigger predicate state poisoned");
606    if state.budget_day_utc != Some(today) {
607        state.budget_day_utc = Some(today);
608        binding
609            .metrics
610            .cost_today_usd_micros
611            .store(0, Ordering::Relaxed);
612        binding
613            .metrics
614            .autonomous_decisions_today
615            .store(0, Ordering::Relaxed);
616    }
617    if state.budget_hour_utc != Some(hour) {
618        state.budget_hour_utc = Some(hour);
619        binding
620            .metrics
621            .cost_hour_usd_micros
622            .store(0, Ordering::Relaxed);
623        binding
624            .metrics
625            .autonomous_decisions_hour
626            .store(0, Ordering::Relaxed);
627    }
628}
629
630pub fn binding_budget_would_exceed(
631    binding: &TriggerBinding,
632    expected_cost_usd_micros: u64,
633) -> Option<&'static str> {
634    reset_binding_budget_windows(binding);
635    if binding.hourly_cost_usd.is_some_and(|limit| {
636        micros_to_usd(
637            binding
638                .metrics
639                .cost_hour_usd_micros
640                .load(Ordering::Relaxed)
641                .saturating_add(expected_cost_usd_micros),
642        ) > limit
643    }) {
644        return Some("hourly_budget_exceeded");
645    }
646    if binding.daily_cost_usd.is_some_and(|limit| {
647        micros_to_usd(
648            binding
649                .metrics
650                .cost_today_usd_micros
651                .load(Ordering::Relaxed)
652                .saturating_add(expected_cost_usd_micros),
653        ) > limit
654    }) {
655        return Some("daily_budget_exceeded");
656    }
657    None
658}
659
660pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
661    reset_binding_budget_windows(binding);
662    if binding
663        .max_autonomous_decisions_per_hour
664        .is_some_and(|limit| {
665            binding
666                .metrics
667                .autonomous_decisions_hour
668                .load(Ordering::Relaxed)
669                .saturating_add(1)
670                > limit
671        })
672    {
673        return Some("hourly_autonomy_budget_exceeded");
674    }
675    if binding
676        .max_autonomous_decisions_per_day
677        .is_some_and(|limit| {
678            binding
679                .metrics
680                .autonomous_decisions_today
681                .load(Ordering::Relaxed)
682                .saturating_add(1)
683                > limit
684        })
685    {
686        return Some("daily_autonomy_budget_exceeded");
687    }
688    None
689}
690
691pub fn note_autonomous_decision(binding: &TriggerBinding) {
692    reset_binding_budget_windows(binding);
693    binding
694        .metrics
695        .autonomous_decisions_total
696        .fetch_add(1, Ordering::Relaxed);
697    binding
698        .metrics
699        .autonomous_decisions_today
700        .fetch_add(1, Ordering::Relaxed);
701    binding
702        .metrics
703        .autonomous_decisions_hour
704        .fetch_add(1, Ordering::Relaxed);
705}
706
707pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
708    let state = binding
709        .predicate_state
710        .lock()
711        .expect("trigger predicate state poisoned");
712    if !state.recent_cost_usd_micros.is_empty() {
713        let total: u64 = state.recent_cost_usd_micros.iter().copied().sum();
714        return total / state.recent_cost_usd_micros.len() as u64;
715    }
716    binding
717        .when_budget
718        .as_ref()
719        .and_then(|budget| budget.max_cost_usd)
720        .map(usd_to_micros)
721        .unwrap_or_default()
722}
723
724pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
725    let mut state = binding
726        .predicate_state
727        .lock()
728        .expect("trigger predicate state poisoned");
729    state.recent_cost_usd_micros.push_back(cost_usd_micros);
730    while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
731        state.recent_cost_usd_micros.pop_front();
732    }
733}
734
/// Converts a USD amount to whole micro-USD, rounding up.
///
/// NaN, infinities, zero and negative amounts map to 0. Amounts too large for
/// `u64` saturate at `u64::MAX` (float-to-int `as` casts saturate).
pub fn usd_to_micros(value: f64) -> u64 {
    if value.is_finite() && value > 0.0 {
        (value * 1_000_000.0).ceil() as u64
    } else {
        0
    }
}
741
/// Converts micro-USD back to a USD amount.
pub fn micros_to_usd(value: u64) -> f64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    let micros = value as f64;
    micros / MICROS_PER_USD
}
745
746fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
747    let today = utc_day_key();
748    let hour = utc_hour_key();
749    if state.day_utc != today {
750        state.day_utc = today;
751        state.cost_today_usd_micros = 0;
752    }
753    if state.hour_utc != hour {
754        state.hour_utc = hour;
755        state.cost_hour_usd_micros = 0;
756    }
757}
758
759fn utc_day_key() -> i32 {
760    (clock::now_utc().date()
761        - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
762    .whole_days() as i32
763}
764
765fn utc_hour_key() -> i64 {
766    clock::now_utc().unix_timestamp() / 3_600
767}
768
769pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
770    TRIGGER_REGISTRY.with(|slot| {
771        let registry = slot.borrow();
772        let mut snapshots = Vec::new();
773        for bindings in registry.bindings.values() {
774            for binding in bindings {
775                snapshots.push(binding.snapshot());
776            }
777        }
778        snapshots.sort_by(|left, right| {
779            left.id
780                .cmp(&right.id)
781                .then(left.version.cmp(&right.version))
782                .then(left.state.as_str().cmp(right.state.as_str()))
783        });
784        snapshots
785    })
786}
787
788#[allow(clippy::arc_with_non_send_sync)]
789pub fn resolve_trigger_binding_as_of(
790    id: &str,
791    as_of: OffsetDateTime,
792) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
793    let version = binding_version_as_of(id, as_of)?;
794    resolve_trigger_binding_version(id, version)
795}
796
797#[allow(clippy::arc_with_non_send_sync)]
798pub fn resolve_live_or_as_of(
799    id: &str,
800    recorded: RecordedTriggerBinding,
801) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
802    match resolve_live_trigger_binding(id, Some(recorded.version)) {
803        Ok(binding) => Ok(binding),
804        Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
805            let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
806            let mut metadata = BTreeMap::new();
807            metadata.insert("trigger_id".to_string(), serde_json::json!(id));
808            metadata.insert(
809                "recorded_version".to_string(),
810                serde_json::json!(recorded.version),
811            );
812            metadata.insert(
813                "received_at".to_string(),
814                serde_json::json!(recorded
815                    .received_at
816                    .format(&time::format_description::well_known::Rfc3339)
817                    .unwrap_or_else(|_| recorded.received_at.to_string())),
818            );
819            metadata.insert(
820                "resolved_version".to_string(),
821                serde_json::json!(binding.version),
822            );
823            crate::events::log_warn_meta(
824                "replay.binding_version_gc_fallback",
825                "trigger replay fell back to lifecycle history after binding version GC",
826                metadata,
827            );
828            Ok(binding)
829        }
830        Err(error) => Err(error),
831    }
832}
833
834pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
835    TRIGGER_REGISTRY.with(|slot| {
836        let registry = slot.borrow();
837        registry.binding_version_as_of(id, as_of)
838    })
839}
840
841#[allow(clippy::arc_with_non_send_sync)]
842fn resolve_trigger_binding_version(
843    id: &str,
844    version: u32,
845) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
846    TRIGGER_REGISTRY.with(|slot| {
847        let registry = slot.borrow();
848        registry
849            .binding(id, version)
850            .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
851                id: id.to_string(),
852                version,
853            })
854    })
855}
856
857#[allow(clippy::arc_with_non_send_sync)]
858pub fn resolve_live_trigger_binding(
859    id: &str,
860    version: Option<u32>,
861) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
862    TRIGGER_REGISTRY.with(|slot| {
863        let registry = slot.borrow();
864        if let Some(version) = version {
865            let binding = registry.binding(id, version).ok_or_else(|| {
866                TriggerRegistryError::UnknownBindingVersion {
867                    id: id.to_string(),
868                    version,
869                }
870            })?;
871            if binding.state_snapshot() == TriggerState::Terminated {
872                return Err(TriggerRegistryError::UnknownBindingVersion {
873                    id: id.to_string(),
874                    version,
875                });
876            }
877            return Ok(binding);
878        }
879
880        registry
881            .live_bindings_any_source(id)
882            .into_iter()
883            .max_by_key(|binding| binding.version)
884            .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
885    })
886}
887
888pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
889    TRIGGER_REGISTRY.with(|slot| {
890        let registry = slot.borrow();
891        let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
892            return Vec::new();
893        };
894
895        let mut bindings = Vec::new();
896        for id in binding_ids {
897            let Some(versions) = registry.bindings.get(id) else {
898                continue;
899            };
900            for binding in versions {
901                if binding.state_snapshot() != TriggerState::Active {
902                    continue;
903                }
904                if !binding.match_events.is_empty()
905                    && !binding.match_events.iter().any(|kind| kind == &event.kind)
906                {
907                    continue;
908                }
909                bindings.push(binding.clone());
910            }
911        }
912
913        bindings.sort_by(|left, right| {
914            left.id
915                .as_str()
916                .cmp(right.id.as_str())
917                .then(left.version.cmp(&right.version))
918        });
919        bindings
920    })
921}
922
923pub async fn install_manifest_triggers(
924    specs: Vec<TriggerBindingSpec>,
925) -> Result<(), TriggerRegistryError> {
926    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
927        let registry = &mut *slot.borrow_mut();
928        registry.refresh_runtime_context();
929        let mut touched_ids = BTreeSet::new();
930
931        let mut incoming = BTreeMap::new();
932        for spec in specs {
933            let spec_id = spec.id.clone();
934            if spec.source != TriggerBindingSource::Manifest {
935                return Err(TriggerRegistryError::InvalidSpec(format!(
936                    "manifest install received non-manifest trigger '{}'",
937                    spec_id
938                )));
939            }
940            if spec_id.trim().is_empty() {
941                return Err(TriggerRegistryError::InvalidSpec(
942                    "manifest trigger id cannot be empty".to_string(),
943                ));
944            }
945            if incoming.insert(spec_id.clone(), spec).is_some() {
946                return Err(TriggerRegistryError::DuplicateId(spec_id));
947            }
948        }
949
950        let mut lifecycle = Vec::new();
951        let existing_ids: Vec<String> = registry
952            .bindings
953            .iter()
954            .filter(|(_, bindings)| {
955                bindings.iter().any(|binding| {
956                    binding.source == TriggerBindingSource::Manifest
957                        && binding.state_snapshot() != TriggerState::Terminated
958                })
959            })
960            .map(|(id, _)| id.clone())
961            .collect();
962
963        for id in existing_ids {
964            let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
965            let Some(spec) = incoming.remove(&id) else {
966                for binding in live_manifest {
967                    registry.transition_binding_to_draining(&binding, &mut lifecycle);
968                }
969                touched_ids.insert(id.clone());
970                continue;
971            };
972
973            let has_matching_active = live_manifest.iter().any(|binding| {
974                binding.definition_fingerprint == spec.definition_fingerprint
975                    && matches!(
976                        binding.state_snapshot(),
977                        TriggerState::Registering | TriggerState::Active
978                    )
979            });
980            if has_matching_active {
981                continue;
982            }
983
984            for binding in live_manifest {
985                registry.transition_binding_to_draining(&binding, &mut lifecycle);
986            }
987
988            let version = registry.next_version_for_spec(&spec);
989            registry.register_binding(spec, version, &mut lifecycle);
990            touched_ids.insert(id.clone());
991        }
992
993        for spec in incoming.into_values() {
994            touched_ids.insert(spec.id.clone());
995            let version = registry.next_version_for_spec(&spec);
996            registry.register_binding(spec, version, &mut lifecycle);
997        }
998
999        for id in touched_ids {
1000            registry.gc_terminated_versions(&id);
1001        }
1002
1003        Ok((registry.event_log.clone(), lifecycle))
1004    })?;
1005
1006    append_lifecycle_events(event_log, events).await
1007}
1008
1009pub async fn dynamic_register(
1010    mut spec: TriggerBindingSpec,
1011) -> Result<TriggerId, TriggerRegistryError> {
1012    if spec.id.trim().is_empty() {
1013        spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1014    }
1015    spec.source = TriggerBindingSource::Dynamic;
1016    let id = spec.id.clone();
1017    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1018        let registry = &mut *slot.borrow_mut();
1019        registry.refresh_runtime_context();
1020
1021        if registry.bindings.contains_key(id.as_str()) {
1022            return Err(TriggerRegistryError::DuplicateId(id.clone()));
1023        }
1024
1025        let mut lifecycle = Vec::new();
1026        let version = registry.next_version_for_spec(&spec);
1027        registry.register_binding(spec, version, &mut lifecycle);
1028        Ok((registry.event_log.clone(), lifecycle))
1029    })?;
1030
1031    append_lifecycle_events(event_log, events).await?;
1032    Ok(TriggerId::new(id))
1033}
1034
1035pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1036    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1037        let registry = &mut *slot.borrow_mut();
1038        let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1039        if live_dynamic.is_empty() {
1040            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1041        }
1042
1043        let mut lifecycle = Vec::new();
1044        for binding in live_dynamic {
1045            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1046        }
1047        Ok((registry.event_log.clone(), lifecycle))
1048    })?;
1049
1050    append_lifecycle_events(event_log, events).await
1051}
1052
1053pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1054    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1055        let registry = &mut *slot.borrow_mut();
1056        let live = registry.live_bindings_any_source(id);
1057        if live.is_empty() {
1058            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1059        }
1060
1061        let mut lifecycle = Vec::new();
1062        for binding in live {
1063            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1064        }
1065        Ok((registry.event_log.clone(), lifecycle))
1066    })?;
1067
1068    append_lifecycle_events(event_log, events).await
1069}
1070
1071fn pin_trigger_binding_inner(
1072    id: &str,
1073    version: u32,
1074    allow_terminated: bool,
1075) -> Result<(), TriggerRegistryError> {
1076    TRIGGER_REGISTRY.with(|slot| {
1077        let registry = slot.borrow();
1078        let binding = registry.binding(id, version).ok_or_else(|| {
1079            TriggerRegistryError::UnknownBindingVersion {
1080                id: id.to_string(),
1081                version,
1082            }
1083        })?;
1084        match binding.state_snapshot() {
1085            TriggerState::Terminated if !allow_terminated => {
1086                Err(TriggerRegistryError::InvalidSpec(format!(
1087                    "trigger binding '{}' version {} is terminated",
1088                    id, version
1089                )))
1090            }
1091            _ => {
1092                binding.in_flight.fetch_add(1, Ordering::Relaxed);
1093                Ok(())
1094            }
1095        }
1096    })
1097}
1098
1099pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1100    pin_trigger_binding_inner(id, version, false)
1101}
1102
1103pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1104    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1105        let registry = &mut *slot.borrow_mut();
1106        let binding = registry.binding(id, version).ok_or_else(|| {
1107            TriggerRegistryError::UnknownBindingVersion {
1108                id: id.to_string(),
1109                version,
1110            }
1111        })?;
1112        let current = binding.in_flight.load(Ordering::Relaxed);
1113        if current == 0 {
1114            return Err(TriggerRegistryError::InvalidSpec(format!(
1115                "trigger binding '{}' version {} has no in-flight events",
1116                id, version
1117            )));
1118        }
1119        binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1120
1121        let mut lifecycle = Vec::new();
1122        registry.maybe_finalize_draining(&binding, &mut lifecycle);
1123        registry.gc_terminated_versions(binding.id.as_str());
1124        Ok((registry.event_log.clone(), lifecycle))
1125    })?;
1126
1127    append_lifecycle_events(event_log, events).await
1128}
1129
1130pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1131    begin_in_flight_inner(id, version, false)
1132}
1133
1134pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1135    begin_in_flight_inner(id, version, true)
1136}
1137
1138fn begin_in_flight_inner(
1139    id: &str,
1140    version: u32,
1141    allow_terminated: bool,
1142) -> Result<(), TriggerRegistryError> {
1143    pin_trigger_binding_inner(id, version, allow_terminated)?;
1144    TRIGGER_REGISTRY.with(|slot| {
1145        let registry = slot.borrow();
1146        let binding = registry.binding(id, version).ok_or_else(|| {
1147            TriggerRegistryError::UnknownBindingVersion {
1148                id: id.to_string(),
1149                version,
1150            }
1151        })?;
1152        binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1153        *binding
1154            .metrics
1155            .last_received_ms
1156            .lock()
1157            .expect("trigger metrics poisoned") = Some(now_ms());
1158        Ok(())
1159    })
1160}
1161
1162pub async fn finish_in_flight(
1163    id: &str,
1164    version: u32,
1165    outcome: TriggerDispatchOutcome,
1166) -> Result<(), TriggerRegistryError> {
1167    TRIGGER_REGISTRY.with(|slot| {
1168        let registry = &mut *slot.borrow_mut();
1169        let binding = registry.binding(id, version).ok_or_else(|| {
1170            TriggerRegistryError::UnknownBindingVersion {
1171                id: id.to_string(),
1172                version,
1173            }
1174        })?;
1175        let current = binding.in_flight.load(Ordering::Relaxed);
1176        if current == 0 {
1177            return Err(TriggerRegistryError::InvalidSpec(format!(
1178                "trigger binding '{}' version {} has no in-flight events",
1179                id, version
1180            )));
1181        }
1182        match outcome {
1183            TriggerDispatchOutcome::Dispatched => {
1184                binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1185            }
1186            TriggerDispatchOutcome::Failed => {
1187                binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1188            }
1189            TriggerDispatchOutcome::Dlq => {
1190                binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1191            }
1192        }
1193        Ok(())
1194    })?;
1195
1196    unpin_trigger_binding(id, version).await
1197}
1198
1199impl TriggerRegistry {
1200    fn refresh_runtime_context(&mut self) {
1201        if self.event_log.is_none() {
1202            self.event_log = active_event_log();
1203        }
1204        if self.secret_provider.is_none() {
1205            self.secret_provider = default_secret_provider();
1206        }
1207    }
1208
1209    fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
1210        self.bindings
1211            .get(id)
1212            .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
1213            .cloned()
1214    }
1215
1216    fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
1217        self.bindings
1218            .get(id)
1219            .into_iter()
1220            .flat_map(|bindings| bindings.iter())
1221            .filter(|binding| {
1222                binding.source == source && binding.state_snapshot() != TriggerState::Terminated
1223            })
1224            .cloned()
1225            .collect()
1226    }
1227
1228    fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
1229        self.bindings
1230            .get(id)
1231            .into_iter()
1232            .flat_map(|bindings| bindings.iter())
1233            .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
1234            .cloned()
1235            .collect()
1236    }
1237
1238    fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
1239        if let Some(version) = self
1240            .bindings
1241            .get(spec.id.as_str())
1242            .into_iter()
1243            .flat_map(|bindings| bindings.iter())
1244            .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
1245            .map(|binding| binding.version)
1246        {
1247            return version;
1248        }
1249
1250        let historical =
1251            self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
1252        if let Some(version) = historical.matching_version {
1253            return version;
1254        }
1255
1256        self.bindings
1257            .get(spec.id.as_str())
1258            .into_iter()
1259            .flat_map(|bindings| bindings.iter())
1260            .map(|binding| binding.version)
1261            .chain(historical.max_version)
1262            .max()
1263            .unwrap_or(0)
1264            + 1
1265    }
1266
1267    fn gc_terminated_versions(&mut self, id: &str) {
1268        let Some(bindings) = self.bindings.get_mut(id) else {
1269            return;
1270        };
1271
1272        let mut newest_versions: Vec<u32> =
1273            bindings.iter().map(|binding| binding.version).collect();
1274        newest_versions.sort_unstable_by(|left, right| right.cmp(left));
1275        newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
1276        let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();
1277
1278        bindings.retain(|binding| {
1279            binding.state_snapshot() != TriggerState::Terminated
1280                || retained_versions.contains(&binding.version)
1281        });
1282
1283        if bindings.is_empty() {
1284            self.bindings.remove(id);
1285        }
1286    }
1287
1288    fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
1289        let mut lookup = HistoricalVersionLookup::default();
1290        for record in self.lifecycle_records_for(id) {
1291            lookup.max_version = Some(
1292                lookup
1293                    .max_version
1294                    .unwrap_or(0)
1295                    .max(record.transition.version),
1296            );
1297            if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
1298                lookup.matching_version = Some(record.transition.version);
1299            }
1300        }
1301        lookup
1302    }
1303
1304    fn binding_version_as_of(
1305        &self,
1306        id: &str,
1307        as_of: OffsetDateTime,
1308    ) -> Result<u32, TriggerRegistryError> {
1309        let cutoff_ms = (as_of.unix_timestamp_nanos() / 1_000_000) as i64;
1310        let mut active_version = None;
1311        for record in self.lifecycle_records_for(id) {
1312            if record.occurred_at_ms > cutoff_ms {
1313                break;
1314            }
1315            match record.transition.to_state {
1316                TriggerState::Active => active_version = Some(record.transition.version),
1317                TriggerState::Draining | TriggerState::Terminated => {
1318                    if active_version == Some(record.transition.version) {
1319                        active_version = None;
1320                    }
1321                }
1322                TriggerState::Registering => {}
1323            }
1324        }
1325
1326        active_version.ok_or_else(|| {
1327            TriggerRegistryError::InvalidSpec(format!(
1328                "no active trigger binding '{}' at {}",
1329                id,
1330                as_of
1331                    .format(&time::format_description::well_known::Rfc3339)
1332                    .unwrap_or_else(|_| as_of.to_string())
1333            ))
1334        })
1335    }
1336
1337    fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
1338        let Some(event_log) = self.event_log.as_ref() else {
1339            return Vec::new();
1340        };
1341        let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1342            .expect("static triggers.lifecycle topic should always be valid");
1343        futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
1344            .unwrap_or_default()
1345            .into_iter()
1346            .filter_map(|(_, event)| {
1347                let occurred_at_ms = event.occurred_at_ms;
1348                let transition: LifecycleStateTransitionRecord =
1349                    serde_json::from_value(event.payload).ok()?;
1350                (transition.id == id).then_some(HistoricalLifecycleRecord {
1351                    occurred_at_ms,
1352                    transition,
1353                })
1354            })
1355            .collect()
1356    }
1357
1358    #[allow(clippy::arc_with_non_send_sync)]
1359    fn register_binding(
1360        &mut self,
1361        spec: TriggerBindingSpec,
1362        version: u32,
1363        lifecycle: &mut Vec<LogEvent>,
1364    ) -> Arc<TriggerBinding> {
1365        let binding = Arc::new(TriggerBinding::new(spec, version));
1366        self.by_provider
1367            .entry(binding.provider.as_str().to_string())
1368            .or_default()
1369            .insert(binding.id.as_str().to_string());
1370        self.bindings
1371            .entry(binding.id.as_str().to_string())
1372            .or_default()
1373            .push(binding.clone());
1374        lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
1375        self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
1376        binding
1377    }
1378
1379    fn transition_binding_to_draining(
1380        &self,
1381        binding: &Arc<TriggerBinding>,
1382        lifecycle: &mut Vec<LogEvent>,
1383    ) {
1384        if matches!(binding.state_snapshot(), TriggerState::Terminated) {
1385            return;
1386        }
1387        self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
1388        self.maybe_finalize_draining(binding, lifecycle);
1389    }
1390
1391    fn maybe_finalize_draining(
1392        &self,
1393        binding: &Arc<TriggerBinding>,
1394        lifecycle: &mut Vec<LogEvent>,
1395    ) {
1396        if binding.state_snapshot() == TriggerState::Draining
1397            && binding.in_flight.load(Ordering::Relaxed) == 0
1398        {
1399            self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
1400        }
1401    }
1402
1403    fn transition_binding_state(
1404        &self,
1405        binding: &Arc<TriggerBinding>,
1406        next: TriggerState,
1407        lifecycle: &mut Vec<LogEvent>,
1408    ) {
1409        let mut state = binding.state.lock().expect("trigger state poisoned");
1410        let previous = *state;
1411        if previous == next {
1412            return;
1413        }
1414        *state = next;
1415        drop(state);
1416        lifecycle.push(lifecycle_event(binding, Some(previous), next));
1417    }
1418}
1419
1420fn lifecycle_event(
1421    binding: &TriggerBinding,
1422    from_state: Option<TriggerState>,
1423    to_state: TriggerState,
1424) -> LogEvent {
1425    LogEvent::new(
1426        "state_transition",
1427        serde_json::json!({
1428            "id": binding.id.as_str(),
1429            "binding_key": binding.binding_key(),
1430            "version": binding.version,
1431            "provider": binding.provider.as_str(),
1432            "kind": &binding.kind,
1433            "source": binding.source.as_str(),
1434            "handler_kind": binding.handler.kind(),
1435            "definition_fingerprint": &binding.definition_fingerprint,
1436            "from_state": from_state.map(TriggerState::as_str),
1437            "to_state": to_state.as_str(),
1438        }),
1439    )
1440}
1441
1442async fn append_lifecycle_events(
1443    event_log: Option<Arc<AnyEventLog>>,
1444    events: Vec<LogEvent>,
1445) -> Result<(), TriggerRegistryError> {
1446    let Some(event_log) = event_log else {
1447        return Ok(());
1448    };
1449    if events.is_empty() {
1450        return Ok(());
1451    }
1452
1453    let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1454        .expect("static triggers.lifecycle topic should always be valid");
1455    for event in events {
1456        event_log
1457            .append(&topic, event)
1458            .await
1459            .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1460    }
1461    Ok(())
1462}
1463
1464fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1465    configured_default_chain(default_secret_namespace())
1466        .ok()
1467        .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1468}
1469
/// Resolves the secret namespace.
///
/// A non-blank `HARN_SECRET_NAMESPACE` environment variable is returned verbatim.
/// Otherwise the result is `harn/<leaf>`, where `<leaf>` is the final component of the
/// current working directory, or `workspace` if that component is unavailable, non-UTF-8,
/// or empty.
fn default_secret_namespace() -> String {
    let explicit = std::env::var("HARN_SECRET_NAMESPACE")
        .ok()
        .filter(|value| !value.trim().is_empty());
    if let Some(namespace) = explicit {
        return namespace;
    }

    let cwd = std::env::current_dir().unwrap_or_default();
    let leaf = match cwd.file_name().and_then(|name| name.to_str()) {
        Some(name) if !name.is_empty() => name,
        _ => "workspace",
    };
    format!("harn/{}", leaf)
}
1485
1486fn now_ms() -> i64 {
1487    clock::now_ms()
1488}
1489
1490#[cfg(test)]
1491mod tests {
1492    use super::*;
1493    use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
1494    use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
1495    use crate::triggers::test_util::timing::FILE_WATCH_FALLBACK_POLL;
1496    use std::rc::Rc;
1497    use time::OffsetDateTime;
1498
1499    fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
1500        TriggerBindingSpec {
1501            id: id.to_string(),
1502            source: TriggerBindingSource::Manifest,
1503            kind: "webhook".to_string(),
1504            provider: ProviderId::from("github"),
1505            autonomy_tier: crate::AutonomyTier::ActAuto,
1506            handler: TriggerHandlerSpec::Worker {
1507                queue: format!("{id}-queue"),
1508            },
1509            dispatch_priority: crate::WorkerQueuePriority::Normal,
1510            when: None,
1511            when_budget: None,
1512            retry: TriggerRetryConfig::default(),
1513            match_events: vec!["issues.opened".to_string()],
1514            dedupe_key: Some("event.dedupe_key".to_string()),
1515            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1516            filter: Some("event.kind".to_string()),
1517            daily_cost_usd: Some(5.0),
1518            hourly_cost_usd: None,
1519            max_autonomous_decisions_per_hour: None,
1520            max_autonomous_decisions_per_day: None,
1521            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1522            max_concurrent: Some(10),
1523            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1524            manifest_path: None,
1525            package_name: Some("workspace".to_string()),
1526            definition_fingerprint: fingerprint.to_string(),
1527        }
1528    }
1529
1530    fn dynamic_spec(id: &str) -> TriggerBindingSpec {
1531        TriggerBindingSpec {
1532            id: id.to_string(),
1533            source: TriggerBindingSource::Dynamic,
1534            kind: "webhook".to_string(),
1535            provider: ProviderId::from("github"),
1536            autonomy_tier: crate::AutonomyTier::ActAuto,
1537            handler: TriggerHandlerSpec::Worker {
1538                queue: format!("{id}-queue"),
1539            },
1540            dispatch_priority: crate::WorkerQueuePriority::Normal,
1541            when: None,
1542            when_budget: None,
1543            retry: TriggerRetryConfig::default(),
1544            match_events: vec!["issues.opened".to_string()],
1545            dedupe_key: None,
1546            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1547            filter: None,
1548            daily_cost_usd: None,
1549            hourly_cost_usd: None,
1550            max_autonomous_decisions_per_hour: None,
1551            max_autonomous_decisions_per_day: None,
1552            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1553            max_concurrent: None,
1554            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1555            manifest_path: None,
1556            package_name: None,
1557            definition_fingerprint: format!("dynamic:{id}"),
1558        }
1559    }
1560
1561    #[tokio::test(flavor = "current_thread")]
1562    async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
1563        clear_trigger_registry();
1564
1565        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1566            .await
1567            .expect("manifest trigger installs");
1568
1569        let snapshots = snapshot_trigger_bindings();
1570        assert_eq!(snapshots.len(), 1);
1571        let binding = &snapshots[0];
1572        assert_eq!(binding.id, "github-new-issue");
1573        assert_eq!(binding.version, 1);
1574        assert_eq!(binding.state, TriggerState::Active);
1575        assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());
1576
1577        clear_trigger_registry();
1578    }
1579
1580    #[tokio::test(flavor = "current_thread")]
1581    async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
1582        clear_trigger_registry();
1583
1584        let first = dynamic_register(dynamic_spec("dynamic-a"))
1585            .await
1586            .expect("first dynamic trigger");
1587        let second = dynamic_register(dynamic_spec("dynamic-b"))
1588            .await
1589            .expect("second dynamic trigger");
1590        assert_ne!(first, second);
1591
1592        let error = dynamic_register(dynamic_spec("dynamic-a"))
1593            .await
1594            .expect_err("duplicate id should fail");
1595        assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));
1596
1597        clear_trigger_registry();
1598    }
1599
1600    #[tokio::test(flavor = "current_thread")]
1601    async fn drain_waits_for_in_flight_events_before_terminating() {
1602        clear_trigger_registry();
1603
1604        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1605            .await
1606            .expect("manifest trigger installs");
1607        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1608
1609        drain("github-new-issue").await.expect("drain succeeds");
1610        let binding = snapshot_trigger_bindings()
1611            .into_iter()
1612            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1613            .expect("binding snapshot");
1614        assert_eq!(binding.state, TriggerState::Draining);
1615        assert_eq!(binding.metrics.in_flight, 1);
1616
1617        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1618            .await
1619            .expect("finish in-flight event");
1620        let binding = snapshot_trigger_bindings()
1621            .into_iter()
1622            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1623            .expect("binding snapshot");
1624        assert_eq!(binding.state, TriggerState::Terminated);
1625        assert_eq!(binding.metrics.in_flight, 0);
1626
1627        clear_trigger_registry();
1628    }
1629
1630    #[tokio::test(flavor = "current_thread")]
1631    async fn hot_reload_registers_new_version_while_old_binding_drains() {
1632        clear_trigger_registry();
1633
1634        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1635            .await
1636            .expect("initial manifest trigger installs");
1637        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1638
1639        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1640            .await
1641            .expect("updated manifest trigger installs");
1642
1643        let snapshots = snapshot_trigger_bindings();
1644        assert_eq!(snapshots.len(), 2);
1645        let old = snapshots
1646            .iter()
1647            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1648            .expect("old binding");
1649        let new = snapshots
1650            .iter()
1651            .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
1652            .expect("new binding");
1653        assert_eq!(old.state, TriggerState::Draining);
1654        assert_eq!(new.state, TriggerState::Active);
1655
1656        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1657            .await
1658            .expect("finish old in-flight event");
1659        let old = snapshot_trigger_bindings()
1660            .into_iter()
1661            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1662            .expect("old binding");
1663        assert_eq!(old.state, TriggerState::Terminated);
1664
1665        clear_trigger_registry();
1666    }
1667
1668    #[tokio::test(flavor = "current_thread")]
1669    async fn gc_drops_terminated_versions_beyond_retention_limit() {
1670        clear_trigger_registry();
1671
1672        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1673            .await
1674            .expect("install v1");
1675        begin_in_flight("github-new-issue", 1).expect("pin v1");
1676
1677        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1678            .await
1679            .expect("install v2");
1680        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1681            .await
1682            .expect("finish v1");
1683
1684        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1685            .await
1686            .expect("install v3");
1687
1688        let snapshots = snapshot_trigger_bindings();
1689        let versions: Vec<u32> = snapshots
1690            .into_iter()
1691            .filter(|binding| binding.id == "github-new-issue")
1692            .map(|binding| binding.version)
1693            .collect();
1694        assert_eq!(versions, vec![2, 3]);
1695
1696        clear_trigger_registry();
1697    }
1698
1699    #[tokio::test(flavor = "current_thread")]
1700    async fn lifecycle_transitions_append_to_event_log() {
1701        clear_trigger_registry();
1702        reset_active_event_log();
1703        let tempdir = tempfile::tempdir().expect("tempdir");
1704        let log = install_default_for_base_dir(tempdir.path()).expect("install event log");
1705
1706        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1707            .await
1708            .expect("manifest trigger installs");
1709        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1710        drain("github-new-issue").await.expect("drain succeeds");
1711        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1712            .await
1713            .expect("finish event");
1714
1715        let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
1716        let events = log
1717            .read_range(&topic, None, 32)
1718            .await
1719            .expect("read lifecycle events");
1720        let states: Vec<String> = events
1721            .into_iter()
1722            .filter_map(|(_, event)| {
1723                event
1724                    .payload
1725                    .get("to_state")
1726                    .and_then(|value| value.as_str())
1727                    .map(|value| value.to_string())
1728            })
1729            .collect();
1730        assert_eq!(
1731            states,
1732            vec![
1733                "registering".to_string(),
1734                "active".to_string(),
1735                "draining".to_string(),
1736                "terminated".to_string(),
1737            ]
1738        );
1739
1740        reset_active_event_log();
1741        clear_trigger_registry();
1742    }
1743
1744    #[tokio::test(flavor = "current_thread")]
1745    async fn version_history_reuses_historical_version_after_restart() {
1746        clear_trigger_registry();
1747        reset_active_event_log();
1748        let tempdir = tempfile::tempdir().expect("tempdir");
1749        install_default_for_base_dir(tempdir.path()).expect("install event log");
1750
1751        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1752            .await
1753            .expect("initial manifest trigger installs");
1754        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1755            .await
1756            .expect("updated manifest trigger installs");
1757
1758        clear_trigger_registry();
1759        reset_active_event_log();
1760        install_default_for_base_dir(tempdir.path()).expect("reopen event log");
1761
1762        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1763            .await
1764            .expect("manifest reload reuses historical version");
1765
1766        let binding = snapshot_trigger_bindings()
1767            .into_iter()
1768            .find(|binding| binding.id == "github-new-issue")
1769            .expect("binding snapshot");
1770        assert_eq!(binding.version, 2);
1771
1772        reset_active_event_log();
1773        clear_trigger_registry();
1774    }
1775
1776    #[tokio::test(flavor = "current_thread")]
1777    async fn binding_version_as_of_reports_historical_active_version() {
1778        clear_trigger_registry();
1779        reset_active_event_log();
1780        let tempdir = tempfile::tempdir().expect("tempdir");
1781        install_default_for_base_dir(tempdir.path()).expect("install event log");
1782
1783        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1784            .await
1785            .expect("initial manifest trigger installs");
1786        let before_reload = OffsetDateTime::now_utc();
1787        std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
1788
1789        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1790            .await
1791            .expect("updated manifest trigger installs");
1792        let after_reload = OffsetDateTime::now_utc();
1793
1794        assert_eq!(
1795            binding_version_as_of("github-new-issue", before_reload)
1796                .expect("version before reload"),
1797            1
1798        );
1799        assert_eq!(
1800            binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
1801            2
1802        );
1803
1804        reset_active_event_log();
1805        clear_trigger_registry();
1806    }
1807
1808    #[tokio::test(flavor = "current_thread")]
1809    async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
1810        clear_trigger_registry();
1811        reset_active_event_log();
1812        let sink = Rc::new(CollectorSink::new());
1813        clear_event_sinks();
1814        add_event_sink(sink.clone());
1815        let tempdir = tempfile::tempdir().expect("tempdir");
1816        install_default_for_base_dir(tempdir.path()).expect("install event log");
1817
1818        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1819            .await
1820            .expect("install v1");
1821        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1822            .await
1823            .expect("install v2");
1824        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1825            .await
1826            .expect("install v3");
1827        let received_at = OffsetDateTime::now_utc();
1828        std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
1829        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
1830            .await
1831            .expect("install v4");
1832
1833        let binding = resolve_live_or_as_of(
1834            "github-new-issue",
1835            RecordedTriggerBinding {
1836                version: 1,
1837                received_at,
1838            },
1839        )
1840        .expect("resolve fallback binding");
1841        assert_eq!(binding.version, 3);
1842
1843        let warning = sink
1844            .logs
1845            .borrow()
1846            .iter()
1847            .find(|log| log.category == "replay.binding_version_gc_fallback")
1848            .cloned()
1849            .expect("gc fallback warning");
1850        assert_eq!(warning.level, EventLevel::Warn);
1851        assert_eq!(
1852            warning.metadata.get("trigger_id"),
1853            Some(&serde_json::json!("github-new-issue"))
1854        );
1855        assert_eq!(
1856            warning.metadata.get("recorded_version"),
1857            Some(&serde_json::json!(1))
1858        );
1859        assert_eq!(
1860            warning.metadata.get("received_at"),
1861            Some(&serde_json::json!(received_at
1862                .format(&time::format_description::well_known::Rfc3339)
1863                .unwrap_or_else(|_| received_at.to_string())))
1864        );
1865        assert_eq!(
1866            warning.metadata.get("resolved_version"),
1867            Some(&serde_json::json!(3))
1868        );
1869
1870        clear_event_sinks();
1871        crate::events::reset_event_sinks();
1872        reset_active_event_log();
1873        clear_trigger_registry();
1874    }
1875}