Skip to main content

harn_vm/triggers/
registry.rs

1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use time::OffsetDateTime;
10use uuid::Uuid;
11
12use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
13use crate::llm::trigger_predicate::TriggerPredicateBudget;
14use crate::secrets::{configured_default_chain, SecretProvider};
15use crate::triggers::test_util::clock;
16use crate::trust_graph::AutonomyTier;
17use crate::value::VmClosure;
18
19use super::aggregation::TriggerAggregationConfig;
20use super::dispatcher::TriggerRetryConfig;
21use super::flow_control::TriggerFlowControlConfig;
22use super::ProviderId;
23
24#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
25pub struct TriggerId(String);
26
27impl TriggerId {
28    pub fn new(value: impl Into<String>) -> Self {
29        Self(value.into())
30    }
31
32    pub fn as_str(&self) -> &str {
33        &self.0
34    }
35}
36
37impl std::fmt::Display for TriggerId {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        self.0.fmt(f)
40    }
41}
42
43#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(rename_all = "snake_case")]
45pub enum TriggerState {
46    Registering,
47    Active,
48    Paused,
49    Draining,
50    Terminated,
51}
52
53impl TriggerState {
54    pub fn as_str(self) -> &'static str {
55        match self {
56            Self::Registering => "registering",
57            Self::Active => "active",
58            Self::Paused => "paused",
59            Self::Draining => "draining",
60            Self::Terminated => "terminated",
61        }
62    }
63}
64
65#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67pub enum TriggerBindingSource {
68    Manifest,
69    Dynamic,
70}
71
72impl TriggerBindingSource {
73    pub fn as_str(self) -> &'static str {
74        match self {
75            Self::Manifest => "manifest",
76            Self::Dynamic => "dynamic",
77        }
78    }
79}
80
81#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
82#[serde(rename_all = "snake_case")]
83pub enum TriggerBudgetExhaustionStrategy {
84    #[default]
85    False,
86    RetryLater,
87    Fail,
88    Warn,
89}
90
91impl TriggerBudgetExhaustionStrategy {
92    pub fn as_str(self) -> &'static str {
93        match self {
94            Self::False => "false",
95            Self::RetryLater => "retry_later",
96            Self::Fail => "fail",
97            Self::Warn => "warn",
98        }
99    }
100}
101
/// Orchestrator-wide spend limits, installed with
/// `install_orchestrator_budget`.
///
/// Both limits are optional; `orchestrator_budget_would_exceed` only checks a
/// window whose limit is `Some`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct OrchestratorBudgetConfig {
    /// Limit in USD compared against the projected spend of the current day.
    pub daily_cost_usd: Option<f64>,
    /// Limit in USD compared against the projected spend of the current hour.
    pub hourly_cost_usd: Option<f64>,
}
107
/// Point-in-time copy of the orchestrator budget, as returned by
/// `snapshot_orchestrator_budget`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct OrchestratorBudgetSnapshot {
    /// Configured daily limit in USD, if any.
    pub daily_cost_usd: Option<f64>,
    /// Configured hourly limit in USD, if any.
    pub hourly_cost_usd: Option<f64>,
    /// Spend accumulated in the current day window, in micro-USD.
    pub cost_today_usd_micros: u64,
    /// Spend accumulated in the current hour window, in micro-USD.
    pub cost_hour_usd_micros: u64,
    /// Day key of the current window (whole days since 1970-01-01).
    pub day_utc: i32,
    /// Hour key of the current window (Unix timestamp divided by 3600).
    pub hour_utc: i64,
}
117
118#[derive(Debug)]
119struct OrchestratorBudgetState {
120    config: OrchestratorBudgetConfig,
121    day_utc: i32,
122    hour_utc: i64,
123    cost_today_usd_micros: u64,
124    cost_hour_usd_micros: u64,
125}
126
127impl Default for OrchestratorBudgetState {
128    fn default() -> Self {
129        Self {
130            config: OrchestratorBudgetConfig::default(),
131            day_utc: utc_day_key(),
132            hour_utc: utc_hour_key(),
133            cost_today_usd_micros: 0,
134            cost_hour_usd_micros: 0,
135        }
136    }
137}
138
/// What a trigger binding does when it matches. Each variant has a stable
/// label exposed through `TriggerHandlerSpec::kind`.
#[derive(Clone)]
pub enum TriggerHandlerSpec {
    /// Handler backed by a VM closure; `raw` is the only field shown in the
    /// `Debug` output.
    Local {
        raw: String,
        closure: Rc<VmClosure>,
    },
    /// Handler addressed by an A2A `target` string.
    A2a {
        target: String,
        allow_cleartext: bool,
    },
    /// Handler that names a worker `queue`.
    Worker {
        queue: String,
    },
    /// Handler bound to a persona runtime binding.
    Persona {
        binding: crate::PersonaRuntimeBinding,
    },
    /// Handler that names a `worker_id`.
    AutoResume {
        worker_id: String,
    },
    /// Composes triggers (#1870) with named agent pools (#1883). On match,
    /// the dispatcher resolves `pool` by name, invokes `task_factory(event)`
    /// to build the per-event closure, and submits it to the pool with the
    /// pool's queue strategy + backpressure policy applied (PL-04 / #1889).
    SpawnToPool {
        pool: String,
        priority_from: Option<String>,
        key_from: Option<String>,
        task_factory: Rc<VmClosure>,
    },
    /// Composes triggers (#1870) with the reminder pipeline (#1815) by
    /// injecting a `SystemReminder` into a target agent session's transcript
    /// when the trigger matches (CH-05 / #1876). The reminder appears at the
    /// session's next turn boundary — same as any other reminder. No spawn,
    /// no resume, no signal. `body` is a prompt-template string rendered
    /// against `{event}` / `{batch.events}` per match. `target` resolution
    /// happens at dispatch time via [`TargetExpr`]; missing-target dispatches
    /// are recorded as audit events rather than crashing the trigger.
    ReminderInject {
        target: TargetExpr,
        body: String,
        tags: Vec<String>,
        ttl_turns: Option<i64>,
        dedupe_key: Option<String>,
        propagate: crate::llm::helpers::ReminderPropagate,
        role_hint: crate::llm::helpers::ReminderRoleHint,
        preserve_on_compact: bool,
    },
    /// CH-10 (#1910): emergency "panic" broadcast. When the trigger fires,
    /// every running worker matched by `target_agents` is suspended
    /// synchronously via the cooperative suspend pipeline used by
    /// `suspend_agent` (#1837), bypassing the normal turn-boundary
    /// delivery contract. Already-suspended or terminal workers are skipped
    /// (no double-suspend, no error). `reason` is propagated to the
    /// suspension envelope, the `WorkerSuspended` event, and the
    /// `triggers.interrupt_and_suspend.audit` audit entry per suspension.
    /// A no-target dispatch records a single roll-up audit entry and
    /// returns a `broadcast` result with `suspended_count: 0` — graceful
    /// no-op rather than dispatch failure, mirroring the `ReminderInject`
    /// missing-target contract.
    InterruptAndSuspend {
        target_agents: AgentScope,
        reason: String,
    },
}
203
204/// Resolution mode for the target worker set of a
205/// [`TriggerHandlerSpec::InterruptAndSuspend`] dispatch. `All` enumerates
206/// every entry in the local worker registry (the org-scoped "panic"
207/// broadcast); `Concrete` carries an explicit worker-id list from the
208/// registration; `Closure` defers resolution to a user closure that runs at
209/// dispatch time with the event payload bound to its first argument and
210/// must return a list of worker-id strings.
211#[derive(Clone)]
212pub enum AgentScope {
213    All,
214    Concrete(Vec<String>),
215    Closure(Rc<VmClosure>),
216}
217
218impl AgentScope {
219    pub fn kind(&self) -> &'static str {
220        match self {
221            Self::All => "all",
222            Self::Concrete(_) => "concrete",
223            Self::Closure(_) => "closure",
224        }
225    }
226}
227
228impl std::fmt::Debug for AgentScope {
229    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230        match self {
231            Self::All => f.write_str("All"),
232            Self::Concrete(ids) => f.debug_tuple("Concrete").field(ids).finish(),
233            Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
234        }
235    }
236}
237
238/// Resolution mode for the target session of a [`TriggerHandlerSpec::ReminderInject`]
239/// dispatch. `Current` walks the current-session thread-local; `Parent` resolves
240/// the active session's parent in the agent-session lineage; `Concrete` carries
241/// a literal session id from the registration; `Closure` defers resolution to
242/// a user closure that runs at dispatch time with the event payload bound to
243/// its first argument and must return the session id as a string.
244#[derive(Clone)]
245pub enum TargetExpr {
246    Current,
247    Parent,
248    Concrete(String),
249    Closure(Rc<VmClosure>),
250}
251
252impl TargetExpr {
253    pub fn kind(&self) -> &'static str {
254        match self {
255            Self::Current => "current",
256            Self::Parent => "parent",
257            Self::Concrete(_) => "concrete",
258            Self::Closure(_) => "closure",
259        }
260    }
261}
262
263impl std::fmt::Debug for TargetExpr {
264    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265        match self {
266            Self::Current => f.write_str("Current"),
267            Self::Parent => f.write_str("Parent"),
268            Self::Concrete(id) => f.debug_tuple("Concrete").field(id).finish(),
269            Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
270        }
271    }
272}
273
274impl std::fmt::Debug for TriggerHandlerSpec {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        match self {
277            Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
278            Self::A2a {
279                target,
280                allow_cleartext,
281            } => f
282                .debug_struct("A2a")
283                .field("target", target)
284                .field("allow_cleartext", allow_cleartext)
285                .finish(),
286            Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
287            Self::Persona { binding } => f
288                .debug_struct("Persona")
289                .field("name", &binding.name)
290                .finish(),
291            Self::AutoResume { worker_id } => f
292                .debug_struct("AutoResume")
293                .field("worker_id", worker_id)
294                .finish(),
295            Self::SpawnToPool {
296                pool,
297                priority_from,
298                key_from,
299                ..
300            } => f
301                .debug_struct("SpawnToPool")
302                .field("pool", pool)
303                .field("priority_from", priority_from)
304                .field("key_from", key_from)
305                .finish(),
306            Self::ReminderInject {
307                target,
308                body,
309                tags,
310                ttl_turns,
311                dedupe_key,
312                propagate,
313                role_hint,
314                preserve_on_compact,
315            } => f
316                .debug_struct("ReminderInject")
317                .field("target", target)
318                .field("body", body)
319                .field("tags", tags)
320                .field("ttl_turns", ttl_turns)
321                .field("dedupe_key", dedupe_key)
322                .field("propagate", propagate)
323                .field("role_hint", role_hint)
324                .field("preserve_on_compact", preserve_on_compact)
325                .finish(),
326            Self::InterruptAndSuspend {
327                target_agents,
328                reason,
329            } => f
330                .debug_struct("InterruptAndSuspend")
331                .field("target_agents", target_agents)
332                .field("reason", reason)
333                .finish(),
334        }
335    }
336}
337
338impl TriggerHandlerSpec {
339    pub fn kind(&self) -> &'static str {
340        match self {
341            Self::Local { .. } => "local",
342            Self::A2a { .. } => "a2a",
343            Self::Worker { .. } => "worker",
344            Self::Persona { .. } => "persona",
345            Self::AutoResume { .. } => "auto_resume",
346            Self::SpawnToPool { .. } => "spawn_to_pool",
347            Self::ReminderInject { .. } => "reminder_inject",
348            Self::InterruptAndSuspend { .. } => "interrupt_and_suspend",
349        }
350    }
351}
352
353#[derive(Clone)]
354pub struct TriggerPredicateSpec {
355    pub raw: String,
356    pub closure: Rc<VmClosure>,
357}
358
359impl std::fmt::Debug for TriggerPredicateSpec {
360    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361        f.debug_struct("TriggerPredicateSpec")
362            .field("raw", &self.raw)
363            .finish()
364    }
365}
366
/// Declarative description of a trigger binding.
///
/// `TriggerBinding::new` moves every field of the spec into the resulting
/// binding (wrapping `id` in a `TriggerId`) and adds the runtime-only state.
#[derive(Clone, Debug)]
pub struct TriggerBindingSpec {
    /// Raw id text; becomes the binding's `TriggerId`.
    pub id: String,
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: ProviderId,
    pub autonomy_tier: AutonomyTier,
    pub handler: TriggerHandlerSpec,
    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
    /// Optional predicate and its budget.
    pub when: Option<TriggerPredicateSpec>,
    pub when_budget: Option<TriggerPredicateBudget>,
    pub retry: TriggerRetryConfig,
    pub match_events: Vec<String>,
    pub dedupe_key: Option<String>,
    pub dedupe_retention_days: u32,
    pub filter: Option<String>,
    /// Per-binding cost limits in USD, checked by `binding_budget_would_exceed`.
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    /// Per-binding caps on autonomous decisions, checked by
    /// `binding_autonomy_budget_would_exceed`.
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
    pub max_concurrent: Option<u32>,
    pub flow_control: TriggerFlowControlConfig,
    /// CH-04 (#1875): optional aggregation buffer. When set, the
    /// dispatcher accumulates matching events into a per-(binding,
    /// partition_key) buffer and dispatches the handler with a batched
    /// event when `count` is reached or the `window` elapses.
    pub aggregation: Option<TriggerAggregationConfig>,
    pub manifest_path: Option<PathBuf>,
    pub package_name: Option<String>,
    pub definition_fingerprint: String,
}
399
/// Live counters for a single trigger binding.
///
/// The derived `Default` starts every counter at zero and
/// `last_received_ms` at `None`.
#[derive(Debug, Default)]
pub struct TriggerMetrics {
    pub received: AtomicU64,
    pub dispatched: AtomicU64,
    pub failed: AtomicU64,
    pub dlq: AtomicU64,
    pub last_received_ms: Mutex<Option<i64>>,
    pub cost_total_usd_micros: AtomicU64,
    pub cost_today_usd_micros: AtomicU64,
    pub cost_hour_usd_micros: AtomicU64,
    pub autonomous_decisions_total: AtomicU64,
    pub autonomous_decisions_today: AtomicU64,
    pub autonomous_decisions_hour: AtomicU64,
}
432
/// Plain-value copy of a binding's `TriggerMetrics`, plus its in-flight
/// count, as produced by `TriggerBinding::metrics_snapshot`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TriggerMetricsSnapshot {
    pub received: u64,
    pub dispatched: u64,
    pub failed: u64,
    pub dlq: u64,
    /// Copied from `TriggerBinding::in_flight`, not from `TriggerMetrics`.
    pub in_flight: u64,
    pub last_received_ms: Option<i64>,
    pub cost_total_usd_micros: u64,
    pub cost_today_usd_micros: u64,
    pub cost_hour_usd_micros: u64,
    pub autonomous_decisions_total: u64,
    pub autonomous_decisions_today: u64,
    pub autonomous_decisions_hour: u64,
}
448
/// A registered trigger binding: the fields of its `TriggerBindingSpec` plus
/// a version number and runtime state (lifecycle state, metrics, in-flight
/// counter, cancel token, predicate bookkeeping).
pub struct TriggerBinding {
    pub id: TriggerId,
    /// Version of this binding; combined with `id` by `binding_key`.
    pub version: u32,
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: ProviderId,
    pub autonomy_tier: AutonomyTier,
    pub handler: TriggerHandlerSpec,
    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
    pub when: Option<TriggerPredicateSpec>,
    pub when_budget: Option<TriggerPredicateBudget>,
    pub retry: TriggerRetryConfig,
    pub match_events: Vec<String>,
    pub dedupe_key: Option<String>,
    pub dedupe_retention_days: u32,
    pub filter: Option<String>,
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
    pub max_concurrent: Option<u32>,
    pub flow_control: TriggerFlowControlConfig,
    /// CH-04 (#1875): see `TriggerBindingSpec::aggregation`. Cloned at
    /// registration so the dispatcher does not need to lock the registry
    /// for every emit.
    pub aggregation: Option<TriggerAggregationConfig>,
    pub manifest_path: Option<PathBuf>,
    pub package_name: Option<String>,
    pub definition_fingerprint: String,
    /// Lifecycle state; initialised to `TriggerState::Registering`.
    pub state: Mutex<TriggerState>,
    /// Live counters; all start at zero.
    pub metrics: TriggerMetrics,
    /// In-flight counter; starts at zero and is reported in metrics snapshots.
    pub in_flight: AtomicU64,
    /// Shared cancellation flag; starts as `false`.
    pub cancel_token: Arc<AtomicBool>,
    /// Budget-window keys and predicate cost samples for this binding.
    pub predicate_state: Mutex<TriggerPredicateState>,
}
485
/// Per-binding predicate and budget bookkeeping, kept behind
/// `TriggerBinding::predicate_state`.
#[derive(Clone, Debug, Default)]
pub struct TriggerPredicateState {
    /// Day key the daily counters currently belong to; `None` until the first
    /// call to `reset_binding_budget_windows`.
    pub budget_day_utc: Option<i32>,
    /// Hour key the hourly counters currently belong to; `None` until the
    /// first call to `reset_binding_budget_windows`.
    pub budget_hour_utc: Option<i64>,
    pub consecutive_failures: u32,
    // NOTE(review): presumably a circuit-breaker deadline in epoch
    // milliseconds — not used in the visible part of this file; confirm.
    pub breaker_open_until_ms: Option<i64>,
    /// Most recent predicate cost samples in micro-USD, oldest first;
    /// `record_predicate_cost_sample` caps it at `PREDICATE_COST_WINDOW`.
    pub recent_cost_usd_micros: VecDeque<u64>,
}
494
495impl std::fmt::Debug for TriggerBinding {
496    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497        f.debug_struct("TriggerBinding")
498            .field("id", &self.id)
499            .field("version", &self.version)
500            .field("source", &self.source)
501            .field("kind", &self.kind)
502            .field("provider", &self.provider)
503            .field("handler_kind", &self.handler.kind())
504            .field("state", &self.state_snapshot())
505            .finish()
506    }
507}
508
509impl TriggerBinding {
510    pub fn snapshot(&self) -> TriggerBindingSnapshot {
511        TriggerBindingSnapshot {
512            id: self.id.as_str().to_string(),
513            version: self.version,
514            source: self.source,
515            kind: self.kind.clone(),
516            provider: self.provider.as_str().to_string(),
517            autonomy_tier: self.autonomy_tier,
518            handler_kind: self.handler.kind().to_string(),
519            state: self.state_snapshot(),
520            metrics: self.metrics_snapshot(),
521            daily_cost_usd: self.daily_cost_usd,
522            hourly_cost_usd: self.hourly_cost_usd,
523            max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
524            max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
525            on_budget_exhausted: self.on_budget_exhausted,
526        }
527    }
528
529    fn new(spec: TriggerBindingSpec, version: u32) -> Self {
530        Self {
531            id: TriggerId::new(spec.id),
532            version,
533            source: spec.source,
534            kind: spec.kind,
535            provider: spec.provider,
536            autonomy_tier: spec.autonomy_tier,
537            handler: spec.handler,
538            dispatch_priority: spec.dispatch_priority,
539            when: spec.when,
540            when_budget: spec.when_budget,
541            retry: spec.retry,
542            match_events: spec.match_events,
543            dedupe_key: spec.dedupe_key,
544            dedupe_retention_days: spec.dedupe_retention_days,
545            filter: spec.filter,
546            daily_cost_usd: spec.daily_cost_usd,
547            hourly_cost_usd: spec.hourly_cost_usd,
548            max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
549            max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
550            on_budget_exhausted: spec.on_budget_exhausted,
551            max_concurrent: spec.max_concurrent,
552            flow_control: spec.flow_control,
553            aggregation: spec.aggregation,
554            manifest_path: spec.manifest_path,
555            package_name: spec.package_name,
556            definition_fingerprint: spec.definition_fingerprint,
557            state: Mutex::new(TriggerState::Registering),
558            metrics: TriggerMetrics::default(),
559            in_flight: AtomicU64::new(0),
560            cancel_token: Arc::new(AtomicBool::new(false)),
561            predicate_state: Mutex::new(TriggerPredicateState::default()),
562        }
563    }
564
565    pub fn binding_key(&self) -> String {
566        format!("{}@v{}", self.id.as_str(), self.version)
567    }
568
569    pub fn state_snapshot(&self) -> TriggerState {
570        *self.state.lock().expect("trigger state poisoned")
571    }
572
573    pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
574        TriggerMetricsSnapshot {
575            received: self.metrics.received.load(Ordering::Relaxed),
576            dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
577            failed: self.metrics.failed.load(Ordering::Relaxed),
578            dlq: self.metrics.dlq.load(Ordering::Relaxed),
579            in_flight: self.in_flight.load(Ordering::Relaxed),
580            last_received_ms: *self
581                .metrics
582                .last_received_ms
583                .lock()
584                .expect("trigger metrics poisoned"),
585            cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
586            cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
587            cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
588            autonomous_decisions_total: self
589                .metrics
590                .autonomous_decisions_total
591                .load(Ordering::Relaxed),
592            autonomous_decisions_today: self
593                .metrics
594                .autonomous_decisions_today
595                .load(Ordering::Relaxed),
596            autonomous_decisions_hour: self
597                .metrics
598                .autonomous_decisions_hour
599                .load(Ordering::Relaxed),
600        }
601    }
602}
603
/// Serializable view of a `TriggerBinding`, produced by
/// `TriggerBinding::snapshot`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBindingSnapshot {
    /// Trigger id as plain text.
    pub id: String,
    pub version: u32,
    pub source: TriggerBindingSource,
    pub kind: String,
    /// Provider id as plain text.
    pub provider: String,
    pub autonomy_tier: AutonomyTier,
    /// Label returned by `TriggerHandlerSpec::kind` for the binding's handler.
    pub handler_kind: String,
    /// Lifecycle state at the time the snapshot was taken.
    pub state: TriggerState,
    pub metrics: TriggerMetricsSnapshot,
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
}
621
/// Outcome of a trigger dispatch.
// NOTE(review): the variants mirror the `dispatched` / `failed` / `dlq`
// counters on `TriggerMetrics`; the code that maps outcomes onto those
// counters is not visible in this part of the file — confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TriggerDispatchOutcome {
    Dispatched,
    Failed,
    Dlq,
}
628
/// Errors reported by the trigger registry.
#[derive(Debug)]
pub enum TriggerRegistryError {
    /// The named trigger id is already taken.
    DuplicateId(String),
    /// A spec was rejected; the payload is the full message.
    InvalidSpec(String),
    /// No trigger with the given id is known.
    UnknownId(String),
    /// The id/version pair does not name a known binding.
    UnknownBindingVersion { id: String, version: u32 },
    /// An event-log failure; the payload is the full message.
    EventLog(String),
}

impl std::fmt::Display for TriggerRegistryError {
    /// Message-carrying variants print their payload verbatim; the others
    /// wrap the offending id (and version) in a fixed sentence.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidSpec(message) => f.write_str(message),
            Self::EventLog(message) => f.write_str(message),
            Self::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
            Self::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
            Self::UnknownBindingVersion { id, version } => {
                write!(f, "unknown trigger binding '{id}' version {version}")
            }
        }
    }
}

impl std::error::Error for TriggerRegistryError {}
652
/// Registry of trigger bindings. The derived `Default` yields empty maps and
/// no event log or secret provider.
#[derive(Default)]
pub struct TriggerRegistry {
    // NOTE(review): presumably keyed by trigger id, with one list entry per
    // registered version — confirm against the registration code.
    bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
    // NOTE(review): presumably provider id -> ids of the triggers bound to
    // it — confirm against the registration code.
    by_provider: BTreeMap<String, BTreeSet<String>>,
    event_log: Option<Arc<AnyEventLog>>,
    secret_provider: Option<Arc<dyn SecretProvider>>,
}
660
// Per-thread trigger registry; replaced with a fresh default by
// `clear_trigger_registry`.
thread_local! {
    static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
}

// Per-thread orchestrator budget state; replaced with a fresh default by
// `clear_orchestrator_budget`.
thread_local! {
    static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
        RefCell::new(OrchestratorBudgetState::default());
}

// NOTE(review): presumably the number of terminated binding versions kept per
// trigger — its use is outside the visible part of this file; confirm.
const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;

// NOTE(review): presumably the event-log topic for lifecycle transitions —
// its use is outside the visible part of this file; confirm.
const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
// Maximum number of predicate cost samples kept per binding by
// `record_predicate_cost_sample`.
const PREDICATE_COST_WINDOW: usize = 100;
674
/// Deserialized form of a lifecycle state-transition record.
#[derive(Clone, Debug, Deserialize)]
struct LifecycleStateTransitionRecord {
    id: String,
    version: u32,
    /// Falls back to `None` when the field is missing from the input.
    #[serde(default)]
    definition_fingerprint: Option<String>,
    /// State the binding moved to.
    to_state: TriggerState,
}
683
/// A lifecycle transition paired with the time it occurred.
#[derive(Clone, Debug)]
struct HistoricalLifecycleRecord {
    // NOTE(review): presumably epoch milliseconds, going by the `_ms` suffix — confirm.
    occurred_at_ms: i64,
    transition: LifecycleStateTransitionRecord,
}
689
/// A binding version paired with a receive timestamp.
// NOTE(review): how `received_at` is populated is not visible in this part of
// the file — confirm against the code that constructs this type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RecordedTriggerBinding {
    pub version: u32,
    pub received_at: OffsetDateTime,
}
695
/// Result of a lookup over historical binding versions; both fields default
/// to `None`.
// NOTE(review): presumably `matching_version` is the version whose definition
// matched the query and `max_version` the highest version seen — the lookup
// code is outside the visible part of this file; confirm.
#[derive(Clone, Copy, Debug, Default)]
struct HistoricalVersionLookup {
    matching_version: Option<u32>,
    max_version: Option<u32>,
}
701
702pub fn clear_trigger_registry() {
703    TRIGGER_REGISTRY.with(|slot| {
704        *slot.borrow_mut() = TriggerRegistry::default();
705    });
706    clear_orchestrator_budget();
707    super::aggregation::clear_aggregation_state();
708}
709
710pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
711    ORCHESTRATOR_BUDGET.with(|slot| {
712        let mut state = slot.borrow_mut();
713        rollover_orchestrator_budget(&mut state);
714        state.config = config;
715    });
716}
717
718pub fn clear_orchestrator_budget() {
719    ORCHESTRATOR_BUDGET.with(|slot| {
720        *slot.borrow_mut() = OrchestratorBudgetState::default();
721    });
722}
723
724pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
725    ORCHESTRATOR_BUDGET.with(|slot| {
726        let mut state = slot.borrow_mut();
727        rollover_orchestrator_budget(&mut state);
728        OrchestratorBudgetSnapshot {
729            daily_cost_usd: state.config.daily_cost_usd,
730            hourly_cost_usd: state.config.hourly_cost_usd,
731            cost_today_usd_micros: state.cost_today_usd_micros,
732            cost_hour_usd_micros: state.cost_hour_usd_micros,
733            day_utc: state.day_utc,
734            hour_utc: state.hour_utc,
735        }
736    })
737}
738
739pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
740    if cost_usd_micros == 0 {
741        return;
742    }
743    ORCHESTRATOR_BUDGET.with(|slot| {
744        let mut state = slot.borrow_mut();
745        rollover_orchestrator_budget(&mut state);
746        state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
747        state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
748    });
749}
750
751pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
752    ORCHESTRATOR_BUDGET.with(|slot| {
753        let mut state = slot.borrow_mut();
754        rollover_orchestrator_budget(&mut state);
755        if state.config.hourly_cost_usd.is_some_and(|limit| {
756            micros_to_usd(
757                state
758                    .cost_hour_usd_micros
759                    .saturating_add(expected_cost_usd_micros),
760            ) > limit
761        }) {
762            return Some("orchestrator_hourly_budget_exceeded");
763        }
764        if state.config.daily_cost_usd.is_some_and(|limit| {
765            micros_to_usd(
766                state
767                    .cost_today_usd_micros
768                    .saturating_add(expected_cost_usd_micros),
769            ) > limit
770        }) {
771            return Some("orchestrator_daily_budget_exceeded");
772        }
773        None
774    })
775}
776
777pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
778    let today = utc_day_key();
779    let hour = utc_hour_key();
780    let mut state = binding
781        .predicate_state
782        .lock()
783        .expect("trigger predicate state poisoned");
784    if state.budget_day_utc != Some(today) {
785        state.budget_day_utc = Some(today);
786        binding
787            .metrics
788            .cost_today_usd_micros
789            .store(0, Ordering::Relaxed);
790        binding
791            .metrics
792            .autonomous_decisions_today
793            .store(0, Ordering::Relaxed);
794    }
795    if state.budget_hour_utc != Some(hour) {
796        state.budget_hour_utc = Some(hour);
797        binding
798            .metrics
799            .cost_hour_usd_micros
800            .store(0, Ordering::Relaxed);
801        binding
802            .metrics
803            .autonomous_decisions_hour
804            .store(0, Ordering::Relaxed);
805    }
806}
807
808pub fn binding_budget_would_exceed(
809    binding: &TriggerBinding,
810    expected_cost_usd_micros: u64,
811) -> Option<&'static str> {
812    reset_binding_budget_windows(binding);
813    if binding.hourly_cost_usd.is_some_and(|limit| {
814        micros_to_usd(
815            binding
816                .metrics
817                .cost_hour_usd_micros
818                .load(Ordering::Relaxed)
819                .saturating_add(expected_cost_usd_micros),
820        ) > limit
821    }) {
822        return Some("hourly_budget_exceeded");
823    }
824    if binding.daily_cost_usd.is_some_and(|limit| {
825        micros_to_usd(
826            binding
827                .metrics
828                .cost_today_usd_micros
829                .load(Ordering::Relaxed)
830                .saturating_add(expected_cost_usd_micros),
831        ) > limit
832    }) {
833        return Some("daily_budget_exceeded");
834    }
835    None
836}
837
838pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
839    reset_binding_budget_windows(binding);
840    if binding
841        .max_autonomous_decisions_per_hour
842        .is_some_and(|limit| {
843            binding
844                .metrics
845                .autonomous_decisions_hour
846                .load(Ordering::Relaxed)
847                .saturating_add(1)
848                > limit
849        })
850    {
851        return Some("hourly_autonomy_budget_exceeded");
852    }
853    if binding
854        .max_autonomous_decisions_per_day
855        .is_some_and(|limit| {
856            binding
857                .metrics
858                .autonomous_decisions_today
859                .load(Ordering::Relaxed)
860                .saturating_add(1)
861                > limit
862        })
863    {
864        return Some("daily_autonomy_budget_exceeded");
865    }
866    None
867}
868
869pub fn note_autonomous_decision(binding: &TriggerBinding) {
870    reset_binding_budget_windows(binding);
871    binding
872        .metrics
873        .autonomous_decisions_total
874        .fetch_add(1, Ordering::Relaxed);
875    binding
876        .metrics
877        .autonomous_decisions_today
878        .fetch_add(1, Ordering::Relaxed);
879    binding
880        .metrics
881        .autonomous_decisions_hour
882        .fetch_add(1, Ordering::Relaxed);
883}
884
885pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
886    let state = binding
887        .predicate_state
888        .lock()
889        .expect("trigger predicate state poisoned");
890    if !state.recent_cost_usd_micros.is_empty() {
891        let total: u64 = state.recent_cost_usd_micros.iter().copied().sum();
892        return total / state.recent_cost_usd_micros.len() as u64;
893    }
894    binding
895        .when_budget
896        .as_ref()
897        .and_then(|budget| budget.max_cost_usd)
898        .map(usd_to_micros)
899        .unwrap_or_default()
900}
901
902pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
903    let mut state = binding
904        .predicate_state
905        .lock()
906        .expect("trigger predicate state poisoned");
907    state.recent_cost_usd_micros.push_back(cost_usd_micros);
908    while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
909        state.recent_cost_usd_micros.pop_front();
910    }
911}
912
/// Converts a USD amount to whole micro-dollars, rounding up.
///
/// Non-finite, zero and negative inputs all map to `0`. Values too large for
/// `u64` saturate, as `as` casts from `f64` do.
pub fn usd_to_micros(value: f64) -> u64 {
    let is_positive_amount = value.is_finite() && value > 0.0;
    if is_positive_amount {
        // Round up so a fractional micro is never under-counted.
        (value * 1_000_000.0).ceil() as u64
    } else {
        0
    }
}
919
/// Converts a micro-dollar amount to USD as a floating-point value.
pub fn micros_to_usd(value: u64) -> f64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    (value as f64) / MICROS_PER_USD
}
923
924fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
925    let today = utc_day_key();
926    let hour = utc_hour_key();
927    if state.day_utc != today {
928        state.day_utc = today;
929        state.cost_today_usd_micros = 0;
930    }
931    if state.hour_utc != hour {
932        state.hour_utc = hour;
933        state.cost_hour_usd_micros = 0;
934    }
935}
936
937fn utc_day_key() -> i32 {
938    (clock::now_utc().date()
939        - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
940    .whole_days() as i32
941}
942
943fn utc_hour_key() -> i64 {
944    clock::now_utc().unix_timestamp() / 3_600
945}
946
947pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
948    TRIGGER_REGISTRY.with(|slot| {
949        let registry = slot.borrow();
950        let mut snapshots = Vec::new();
951        for bindings in registry.bindings.values() {
952            for binding in bindings {
953                snapshots.push(binding.snapshot());
954            }
955        }
956        snapshots.sort_by(|left, right| {
957            left.id
958                .cmp(&right.id)
959                .then(left.version.cmp(&right.version))
960                .then(left.state.as_str().cmp(right.state.as_str()))
961        });
962        snapshots
963    })
964}
965
966#[allow(clippy::arc_with_non_send_sync)]
967pub fn resolve_trigger_binding_as_of(
968    id: &str,
969    as_of: OffsetDateTime,
970) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
971    let version = binding_version_as_of(id, as_of)?;
972    resolve_trigger_binding_version(id, version)
973}
974
975#[allow(clippy::arc_with_non_send_sync)]
976pub fn resolve_live_or_as_of(
977    id: &str,
978    recorded: RecordedTriggerBinding,
979) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
980    match resolve_live_trigger_binding(id, Some(recorded.version)) {
981        Ok(binding) => Ok(binding),
982        Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
983            let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
984            let mut metadata = BTreeMap::new();
985            metadata.insert("trigger_id".to_string(), serde_json::json!(id));
986            metadata.insert(
987                "recorded_version".to_string(),
988                serde_json::json!(recorded.version),
989            );
990            metadata.insert(
991                "received_at".to_string(),
992                serde_json::json!(recorded
993                    .received_at
994                    .format(&time::format_description::well_known::Rfc3339)
995                    .unwrap_or_else(|_| recorded.received_at.to_string())),
996            );
997            metadata.insert(
998                "resolved_version".to_string(),
999                serde_json::json!(binding.version),
1000            );
1001            crate::events::log_warn_meta(
1002                "replay.binding_version_gc_fallback",
1003                "trigger replay fell back to lifecycle history after binding version GC",
1004                metadata,
1005            );
1006            Ok(binding)
1007        }
1008        Err(error) => Err(error),
1009    }
1010}
1011
1012pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
1013    TRIGGER_REGISTRY.with(|slot| {
1014        let registry = slot.borrow();
1015        registry.binding_version_as_of(id, as_of)
1016    })
1017}
1018
1019#[allow(clippy::arc_with_non_send_sync)]
1020fn resolve_trigger_binding_version(
1021    id: &str,
1022    version: u32,
1023) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1024    TRIGGER_REGISTRY.with(|slot| {
1025        let registry = slot.borrow();
1026        registry
1027            .binding(id, version)
1028            .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
1029                id: id.to_string(),
1030                version,
1031            })
1032    })
1033}
1034
1035#[allow(clippy::arc_with_non_send_sync)]
1036pub fn resolve_live_trigger_binding(
1037    id: &str,
1038    version: Option<u32>,
1039) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1040    TRIGGER_REGISTRY.with(|slot| {
1041        let registry = slot.borrow();
1042        if let Some(version) = version {
1043            let binding = registry.binding(id, version).ok_or_else(|| {
1044                TriggerRegistryError::UnknownBindingVersion {
1045                    id: id.to_string(),
1046                    version,
1047                }
1048            })?;
1049            if binding.state_snapshot() == TriggerState::Terminated {
1050                return Err(TriggerRegistryError::UnknownBindingVersion {
1051                    id: id.to_string(),
1052                    version,
1053                });
1054            }
1055            return Ok(binding);
1056        }
1057
1058        registry
1059            .live_bindings_any_source(id)
1060            .into_iter()
1061            .max_by_key(|binding| binding.version)
1062            .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
1063    })
1064}
1065
1066/// Snapshot active channel-source trigger bindings whose `channel:` selector
1067/// in `match_events[0]` matches the supplied (scope, scope_id, name) tuple.
1068///
1069/// Used by [`crate::channels`] fan-out (CH-02 / #1872). Returns bindings
1070/// sorted by id then version so dispatch ordering is deterministic.
1071pub(crate) fn channel_bindings_matching(
1072    scope: &str,
1073    scope_id: &str,
1074    name: &str,
1075) -> Vec<Arc<TriggerBinding>> {
1076    TRIGGER_REGISTRY.with(|slot| {
1077        let registry = slot.borrow();
1078        let Some(binding_ids) = registry.by_provider.get("channel") else {
1079            return Vec::new();
1080        };
1081        let mut bindings = Vec::new();
1082        for id in binding_ids {
1083            let Some(versions) = registry.bindings.get(id) else {
1084                continue;
1085            };
1086            for binding in versions {
1087                if binding.state_snapshot() != TriggerState::Active {
1088                    continue;
1089                }
1090                let Some(selector_raw) = binding.match_events.first() else {
1091                    continue;
1092                };
1093                let Ok(selector) = crate::channels::ChannelSelector::parse(selector_raw) else {
1094                    continue;
1095                };
1096                // For tenant-default ("Current") selectors we resolve "current"
1097                // to the scope_id of the emit, since the trigger and the
1098                // emitter share the same registry / runtime context.
1099                if !selector.matches(scope, scope_id, name, scope_id) {
1100                    continue;
1101                }
1102                bindings.push(binding.clone());
1103            }
1104        }
1105        bindings.sort_by(|left, right| {
1106            left.id
1107                .as_str()
1108                .cmp(right.id.as_str())
1109                .then(left.version.cmp(&right.version))
1110        });
1111        bindings
1112    })
1113}
1114
1115pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
1116    TRIGGER_REGISTRY.with(|slot| {
1117        let registry = slot.borrow();
1118        let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
1119            return Vec::new();
1120        };
1121
1122        let mut bindings = Vec::new();
1123        for id in binding_ids {
1124            let Some(versions) = registry.bindings.get(id) else {
1125                continue;
1126            };
1127            for binding in versions {
1128                if binding.state_snapshot() != TriggerState::Active {
1129                    continue;
1130                }
1131                if !binding.match_events.is_empty()
1132                    && !binding.match_events.iter().any(|kind| kind == &event.kind)
1133                {
1134                    continue;
1135                }
1136                bindings.push(binding.clone());
1137            }
1138        }
1139
1140        bindings.sort_by(|left, right| {
1141            left.id
1142                .as_str()
1143                .cmp(right.id.as_str())
1144                .then(left.version.cmp(&right.version))
1145        });
1146        bindings
1147    })
1148}
1149
/// Reconciles the registry's manifest-sourced bindings with `specs`.
///
/// Every incoming spec is validated before any binding is transitioned. Then:
///
/// * ids that currently have a non-terminated manifest binding but are absent
///   from `specs` have those bindings moved to draining;
/// * ids present in both are left untouched when a registering/active binding
///   already has the same definition fingerprint; otherwise the live bindings
///   are drained and the spec is registered as a new binding;
/// * ids only present in `specs` are registered.
///
/// Touched ids are garbage-collected afterwards, and the collected lifecycle
/// events are appended to the event log once the registry borrow is released.
///
/// # Errors
///
/// * `InvalidSpec` if a spec's source is not `Manifest` or its id is blank.
/// * `DuplicateId` if two specs share an id.
/// * `EventLog` if appending a lifecycle event fails.
pub async fn install_manifest_triggers(
    specs: Vec<TriggerBindingSpec>,
) -> Result<(), TriggerRegistryError> {
    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
        let registry = &mut *slot.borrow_mut();
        registry.refresh_runtime_context();
        let mut touched_ids = BTreeSet::new();

        // Validate and index the incoming specs by id before mutating bindings.
        let mut incoming = BTreeMap::new();
        for spec in specs {
            let spec_id = spec.id.clone();
            if spec.source != TriggerBindingSource::Manifest {
                return Err(TriggerRegistryError::InvalidSpec(format!(
                    "manifest install received non-manifest trigger '{}'",
                    spec_id
                )));
            }
            if spec_id.trim().is_empty() {
                return Err(TriggerRegistryError::InvalidSpec(
                    "manifest trigger id cannot be empty".to_string(),
                ));
            }
            if incoming.insert(spec_id.clone(), spec).is_some() {
                return Err(TriggerRegistryError::DuplicateId(spec_id));
            }
        }

        let mut lifecycle = Vec::new();
        // Ids that still have at least one non-terminated manifest binding.
        // Collected up front so the registry can be mutated while iterating.
        let existing_ids: Vec<String> = registry
            .bindings
            .iter()
            .filter(|(_, bindings)| {
                bindings.iter().any(|binding| {
                    binding.source == TriggerBindingSource::Manifest
                        && binding.state_snapshot() != TriggerState::Terminated
                })
            })
            .map(|(id, _)| id.clone())
            .collect();

        for id in existing_ids {
            let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
            // Removed from the manifest: drain every live manifest binding.
            let Some(spec) = incoming.remove(&id) else {
                for binding in live_manifest {
                    registry.transition_binding_to_draining(&binding, &mut lifecycle);
                }
                touched_ids.insert(id.clone());
                continue;
            };

            // Unchanged definition that is already registering/active: keep it.
            let has_matching_active = live_manifest.iter().any(|binding| {
                binding.definition_fingerprint == spec.definition_fingerprint
                    && matches!(
                        binding.state_snapshot(),
                        TriggerState::Registering | TriggerState::Active
                    )
            });
            if has_matching_active {
                continue;
            }

            // Changed definition: drain the old bindings, then register the
            // replacement so drain events precede the new registration events.
            for binding in live_manifest {
                registry.transition_binding_to_draining(&binding, &mut lifecycle);
            }

            let version = registry.next_version_for_spec(&spec);
            registry.register_binding(spec, version, &mut lifecycle);
            touched_ids.insert(id.clone());
        }

        // Whatever is left in `incoming` had no live manifest binding.
        for spec in incoming.into_values() {
            touched_ids.insert(spec.id.clone());
            let version = registry.next_version_for_spec(&spec);
            registry.register_binding(spec, version, &mut lifecycle);
        }

        for id in touched_ids {
            registry.gc_terminated_versions(&id);
        }

        Ok((registry.event_log.clone(), lifecycle))
    })?;

    append_lifecycle_events(event_log, events).await
}
1235
1236pub async fn dynamic_register(
1237    mut spec: TriggerBindingSpec,
1238) -> Result<TriggerId, TriggerRegistryError> {
1239    if spec.id.trim().is_empty() {
1240        spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1241    }
1242    spec.source = TriggerBindingSource::Dynamic;
1243    let id = spec.id.clone();
1244    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1245        let registry = &mut *slot.borrow_mut();
1246        registry.refresh_runtime_context();
1247
1248        if registry.bindings.contains_key(id.as_str()) {
1249            return Err(TriggerRegistryError::DuplicateId(id.clone()));
1250        }
1251
1252        let mut lifecycle = Vec::new();
1253        let version = registry.next_version_for_spec(&spec);
1254        registry.register_binding(spec, version, &mut lifecycle);
1255        Ok((registry.event_log.clone(), lifecycle))
1256    })?;
1257
1258    append_lifecycle_events(event_log, events).await?;
1259    Ok(TriggerId::new(id))
1260}
1261
1262pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1263    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1264        let registry = &mut *slot.borrow_mut();
1265        let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1266        if live_dynamic.is_empty() {
1267            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1268        }
1269
1270        let mut lifecycle = Vec::new();
1271        for binding in live_dynamic {
1272            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1273        }
1274        Ok((registry.event_log.clone(), lifecycle))
1275    })?;
1276
1277    append_lifecycle_events(event_log, events).await
1278}
1279
1280pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1281    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1282        let registry = &mut *slot.borrow_mut();
1283        let live = registry.live_bindings_any_source(id);
1284        if live.is_empty() {
1285            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1286        }
1287
1288        let mut lifecycle = Vec::new();
1289        for binding in live {
1290            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1291        }
1292        Ok((registry.event_log.clone(), lifecycle))
1293    })?;
1294
1295    append_lifecycle_events(event_log, events).await
1296}
1297
1298pub async fn pause(id: &str) -> Result<(), TriggerRegistryError> {
1299    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1300        let registry = &mut *slot.borrow_mut();
1301        let live = registry.live_bindings_any_source(id);
1302        if live.is_empty() {
1303            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1304        }
1305
1306        let mut lifecycle = Vec::new();
1307        for binding in live {
1308            match binding.state_snapshot() {
1309                TriggerState::Registering | TriggerState::Active => {
1310                    registry.transition_binding_state(
1311                        &binding,
1312                        TriggerState::Paused,
1313                        &mut lifecycle,
1314                    );
1315                }
1316                TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {}
1317            }
1318        }
1319        Ok((registry.event_log.clone(), lifecycle))
1320    })?;
1321
1322    append_lifecycle_events(event_log, events).await
1323}
1324
1325pub async fn resume(id: &str) -> Result<(), TriggerRegistryError> {
1326    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1327        let registry = &mut *slot.borrow_mut();
1328        let live = registry.live_bindings_any_source(id);
1329        if live.is_empty() {
1330            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1331        }
1332
1333        let mut lifecycle = Vec::new();
1334        for binding in live {
1335            if binding.state_snapshot() == TriggerState::Paused {
1336                registry.transition_binding_state(&binding, TriggerState::Active, &mut lifecycle);
1337            }
1338        }
1339        Ok((registry.event_log.clone(), lifecycle))
1340    })?;
1341
1342    append_lifecycle_events(event_log, events).await
1343}
1344
1345fn pin_trigger_binding_inner(
1346    id: &str,
1347    version: u32,
1348    allow_terminated: bool,
1349) -> Result<(), TriggerRegistryError> {
1350    TRIGGER_REGISTRY.with(|slot| {
1351        let registry = slot.borrow();
1352        let binding = registry.binding(id, version).ok_or_else(|| {
1353            TriggerRegistryError::UnknownBindingVersion {
1354                id: id.to_string(),
1355                version,
1356            }
1357        })?;
1358        match binding.state_snapshot() {
1359            TriggerState::Paused => Err(TriggerRegistryError::InvalidSpec(format!(
1360                "trigger binding '{}' version {} is paused",
1361                id, version
1362            ))),
1363            TriggerState::Terminated if !allow_terminated => {
1364                Err(TriggerRegistryError::InvalidSpec(format!(
1365                    "trigger binding '{}' version {} is terminated",
1366                    id, version
1367                )))
1368            }
1369            _ => {
1370                binding.in_flight.fetch_add(1, Ordering::Relaxed);
1371                Ok(())
1372            }
1373        }
1374    })
1375}
1376
1377pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1378    pin_trigger_binding_inner(id, version, false)
1379}
1380
1381pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1382    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1383        let registry = &mut *slot.borrow_mut();
1384        let binding = registry.binding(id, version).ok_or_else(|| {
1385            TriggerRegistryError::UnknownBindingVersion {
1386                id: id.to_string(),
1387                version,
1388            }
1389        })?;
1390        let current = binding.in_flight.load(Ordering::Relaxed);
1391        if current == 0 {
1392            return Err(TriggerRegistryError::InvalidSpec(format!(
1393                "trigger binding '{}' version {} has no in-flight events",
1394                id, version
1395            )));
1396        }
1397        binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1398
1399        let mut lifecycle = Vec::new();
1400        registry.maybe_finalize_draining(&binding, &mut lifecycle);
1401        registry.gc_terminated_versions(binding.id.as_str());
1402        Ok((registry.event_log.clone(), lifecycle))
1403    })?;
1404
1405    append_lifecycle_events(event_log, events).await
1406}
1407
1408pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1409    begin_in_flight_inner(id, version, false)
1410}
1411
1412pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1413    begin_in_flight_inner(id, version, true)
1414}
1415
1416fn begin_in_flight_inner(
1417    id: &str,
1418    version: u32,
1419    allow_terminated: bool,
1420) -> Result<(), TriggerRegistryError> {
1421    pin_trigger_binding_inner(id, version, allow_terminated)?;
1422    TRIGGER_REGISTRY.with(|slot| {
1423        let registry = slot.borrow();
1424        let binding = registry.binding(id, version).ok_or_else(|| {
1425            TriggerRegistryError::UnknownBindingVersion {
1426                id: id.to_string(),
1427                version,
1428            }
1429        })?;
1430        binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1431        *binding
1432            .metrics
1433            .last_received_ms
1434            .lock()
1435            .expect("trigger metrics poisoned") = Some(now_ms());
1436        Ok(())
1437    })
1438}
1439
1440pub async fn finish_in_flight(
1441    id: &str,
1442    version: u32,
1443    outcome: TriggerDispatchOutcome,
1444) -> Result<(), TriggerRegistryError> {
1445    TRIGGER_REGISTRY.with(|slot| {
1446        let registry = &mut *slot.borrow_mut();
1447        let binding = registry.binding(id, version).ok_or_else(|| {
1448            TriggerRegistryError::UnknownBindingVersion {
1449                id: id.to_string(),
1450                version,
1451            }
1452        })?;
1453        let current = binding.in_flight.load(Ordering::Relaxed);
1454        if current == 0 {
1455            return Err(TriggerRegistryError::InvalidSpec(format!(
1456                "trigger binding '{}' version {} has no in-flight events",
1457                id, version
1458            )));
1459        }
1460        match outcome {
1461            TriggerDispatchOutcome::Dispatched => {
1462                binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1463            }
1464            TriggerDispatchOutcome::Failed => {
1465                binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1466            }
1467            TriggerDispatchOutcome::Dlq => {
1468                binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1469            }
1470        }
1471        Ok(())
1472    })?;
1473
1474    unpin_trigger_binding(id, version).await
1475}
1476
/// Internal registry operations: lookups, version allocation, garbage
/// collection of terminated versions, lifecycle-history replay and the
/// binding state machine.
impl TriggerRegistry {
    /// Lazily fills in the event log and secret provider. Each is fetched
    /// only while still unset, so an already-present value is never replaced.
    fn refresh_runtime_context(&mut self) {
        if self.event_log.is_none() {
            self.event_log = active_event_log();
        }
        if self.secret_provider.is_none() {
            self.secret_provider = default_secret_provider();
        }
    }

    /// Returns the first stored binding of `id` whose version equals
    /// `version`, in any state.
    fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
        self.bindings
            .get(id)
            .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
            .cloned()
    }

    /// Returns the non-terminated bindings of `id` that came from `source`.
    fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
        self.bindings
            .get(id)
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .filter(|binding| {
                binding.source == source && binding.state_snapshot() != TriggerState::Terminated
            })
            .cloned()
            .collect()
    }

    /// Returns the non-terminated bindings of `id` regardless of source.
    fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
        self.bindings
            .get(id)
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
            .cloned()
            .collect()
    }

    /// Chooses the version number to register `spec` under.
    ///
    /// Preference order: the version of a stored binding with the same
    /// definition fingerprint; then the version recorded in lifecycle history
    /// for that fingerprint; otherwise one more than the highest version
    /// known from stored bindings or history (`1` when none is known).
    ///
    /// NOTE(review): because a stored binding's version can be reused here,
    /// `register_binding` may push a second entry with the same version, and
    /// `binding()` returns the first match — confirm this is intended.
    fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
        if let Some(version) = self
            .bindings
            .get(spec.id.as_str())
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
            .map(|binding| binding.version)
        {
            return version;
        }

        let historical =
            self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
        if let Some(version) = historical.matching_version {
            return version;
        }

        self.bindings
            .get(spec.id.as_str())
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .map(|binding| binding.version)
            .chain(historical.max_version)
            .max()
            .unwrap_or(0)
            + 1
    }

    /// Drops old terminated bindings of `id`.
    ///
    /// Non-terminated bindings are always kept. A terminated binding is kept
    /// only if its version is among the `TERMINATED_VERSION_RETENTION_LIMIT`
    /// highest versions stored for the id (counting bindings in every state).
    /// The id's entry is removed entirely once no binding remains.
    fn gc_terminated_versions(&mut self, id: &str) {
        let Some(bindings) = self.bindings.get_mut(id) else {
            return;
        };

        // Newest-first list of versions, cut down to the retention limit.
        let mut newest_versions: Vec<u32> =
            bindings.iter().map(|binding| binding.version).collect();
        newest_versions.sort_unstable_by(|left, right| right.cmp(left));
        newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
        let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();

        bindings.retain(|binding| {
            binding.state_snapshot() != TriggerState::Terminated
                || retained_versions.contains(&binding.version)
        });

        if bindings.is_empty() {
            self.bindings.remove(id);
        }
    }

    /// Scans the lifecycle history of `id` and reports the highest version
    /// seen plus the version of the last record carrying `fingerprint`.
    fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
        let mut lookup = HistoricalVersionLookup::default();
        for record in self.lifecycle_records_for(id) {
            lookup.max_version = Some(
                lookup
                    .max_version
                    .unwrap_or(0)
                    .max(record.transition.version),
            );
            if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
                lookup.matching_version = Some(record.transition.version);
            }
        }
        lookup
    }

    /// Replays the lifecycle history of `id` up to `as_of` (inclusive, at
    /// millisecond resolution) and returns the version that was active then.
    ///
    /// Replay stops at the first record later than the cutoff, which assumes
    /// the records arrive in chronological order — TODO confirm.
    ///
    /// # Errors
    ///
    /// Returns `InvalidSpec` when no version was active at `as_of`.
    fn binding_version_as_of(
        &self,
        id: &str,
        as_of: OffsetDateTime,
    ) -> Result<u32, TriggerRegistryError> {
        let cutoff_ms = (as_of.unix_timestamp_nanos() / 1_000_000) as i64;
        let mut active_version = None;
        for record in self.lifecycle_records_for(id) {
            if record.occurred_at_ms > cutoff_ms {
                break;
            }
            match record.transition.to_state {
                TriggerState::Active => active_version = Some(record.transition.version),
                // Leaving the active state only clears the marker when it is
                // the currently tracked version that left.
                TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {
                    if active_version == Some(record.transition.version) {
                        active_version = None;
                    }
                }
                TriggerState::Registering => {}
            }
        }

        active_version.ok_or_else(|| {
            TriggerRegistryError::InvalidSpec(format!(
                "no active trigger binding '{}' at {}",
                id,
                as_of
                    .format(&time::format_description::well_known::Rfc3339)
                    .unwrap_or_else(|_| as_of.to_string())
            ))
        })
    }

    /// Reads the lifecycle topic from the event log and returns the
    /// state-transition records that belong to `id`.
    ///
    /// Returns an empty list when no event log is attached or the read fails.
    /// Events whose payload does not deserialize are skipped. The read blocks
    /// the calling thread via `futures::executor::block_on`.
    fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
        let Some(event_log) = self.event_log.as_ref() else {
            return Vec::new();
        };
        let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
            .expect("static triggers.lifecycle topic should always be valid");
        // `None` start with a `usize::MAX` limit: presumably the whole topic
        // from the beginning — verify against `EventLog::read_range`.
        futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
            .unwrap_or_default()
            .into_iter()
            .filter_map(|(_, event)| {
                let occurred_at_ms = event.occurred_at_ms;
                let transition: LifecycleStateTransitionRecord =
                    serde_json::from_value(event.payload).ok()?;
                (transition.id == id).then_some(HistoricalLifecycleRecord {
                    occurred_at_ms,
                    transition,
                })
            })
            .collect()
    }

    /// Creates a binding for `spec` at `version`, indexes it by provider and
    /// id, and activates it.
    ///
    /// Pushes a `Registering` lifecycle event (with no previous state)
    /// followed by the transition to `Active`.
    #[allow(clippy::arc_with_non_send_sync)]
    fn register_binding(
        &mut self,
        spec: TriggerBindingSpec,
        version: u32,
        lifecycle: &mut Vec<LogEvent>,
    ) -> Arc<TriggerBinding> {
        let binding = Arc::new(TriggerBinding::new(spec, version));
        self.by_provider
            .entry(binding.provider.as_str().to_string())
            .or_default()
            .insert(binding.id.as_str().to_string());
        self.bindings
            .entry(binding.id.as_str().to_string())
            .or_default()
            .push(binding.clone());
        lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
        self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
        binding
    }

    /// Moves `binding` to `Draining` and immediately finalizes it when
    /// nothing is in flight. Already-terminated bindings are left alone.
    fn transition_binding_to_draining(
        &self,
        binding: &Arc<TriggerBinding>,
        lifecycle: &mut Vec<LogEvent>,
    ) {
        if matches!(binding.state_snapshot(), TriggerState::Terminated) {
            return;
        }
        self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
        self.maybe_finalize_draining(binding, lifecycle);
    }

    /// Terminates `binding` if it is draining and its in-flight counter is
    /// zero; otherwise does nothing.
    fn maybe_finalize_draining(
        &self,
        binding: &Arc<TriggerBinding>,
        lifecycle: &mut Vec<LogEvent>,
    ) {
        if binding.state_snapshot() == TriggerState::Draining
            && binding.in_flight.load(Ordering::Relaxed) == 0
        {
            self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
        }
    }

    /// Sets the state of `binding` to `next` and pushes the matching
    /// lifecycle event. A transition to the current state is a no-op and
    /// emits nothing.
    ///
    /// # Panics
    ///
    /// Panics if the binding's state mutex is poisoned.
    fn transition_binding_state(
        &self,
        binding: &Arc<TriggerBinding>,
        next: TriggerState,
        lifecycle: &mut Vec<LogEvent>,
    ) {
        let mut state = binding.state.lock().expect("trigger state poisoned");
        let previous = *state;
        if previous == next {
            return;
        }
        *state = next;
        // Release the state lock before touching aggregation or building events.
        drop(state);
        // CH-04 (#1875): drop any pending aggregation buffers when the
        // binding terminates so a long-lived buffer doesn't fire after
        // the trigger has been removed. Leftover events are discarded —
        // matches the in-flight drain contract elsewhere in the
        // registry (the trigger is gone; we'd have nowhere to dispatch).
        if next == TriggerState::Terminated && binding.aggregation.is_some() {
            let _ = super::aggregation::drop_binding_aggregation(&binding.binding_key());
        }
        lifecycle.push(lifecycle_event(binding, Some(previous), next));
    }
}
1705
1706fn lifecycle_event(
1707    binding: &TriggerBinding,
1708    from_state: Option<TriggerState>,
1709    to_state: TriggerState,
1710) -> LogEvent {
1711    LogEvent::new(
1712        "state_transition",
1713        serde_json::json!({
1714            "id": binding.id.as_str(),
1715            "binding_key": binding.binding_key(),
1716            "version": binding.version,
1717            "provider": binding.provider.as_str(),
1718            "kind": &binding.kind,
1719            "source": binding.source.as_str(),
1720            "handler_kind": binding.handler.kind(),
1721            "definition_fingerprint": &binding.definition_fingerprint,
1722            "from_state": from_state.map(TriggerState::as_str),
1723            "to_state": to_state.as_str(),
1724        }),
1725    )
1726}
1727
1728async fn append_lifecycle_events(
1729    event_log: Option<Arc<AnyEventLog>>,
1730    events: Vec<LogEvent>,
1731) -> Result<(), TriggerRegistryError> {
1732    let Some(event_log) = event_log else {
1733        return Ok(());
1734    };
1735    if events.is_empty() {
1736        return Ok(());
1737    }
1738
1739    let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1740        .expect("static triggers.lifecycle topic should always be valid");
1741    for event in events {
1742        event_log
1743            .append(&topic, event)
1744            .await
1745            .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1746    }
1747    Ok(())
1748}
1749
1750fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1751    configured_default_chain(default_secret_namespace())
1752        .ok()
1753        .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1754}
1755
/// Resolves the secret namespace used when none is supplied explicitly.
///
/// Prefers the `HARN_SECRET_NAMESPACE` environment variable when it is set to
/// something other than whitespace (the value is returned as-is, untrimmed).
/// Otherwise falls back to `harn/<leaf>`, where `<leaf>` is the final
/// component of the current working directory, or `workspace` when that
/// component is unavailable, empty, or not valid UTF-8.
fn default_secret_namespace() -> String {
    match std::env::var("HARN_SECRET_NAMESPACE") {
        Ok(configured) if !configured.trim().is_empty() => return configured,
        _ => {}
    }

    let working_dir = std::env::current_dir().unwrap_or_default();
    let leaf = match working_dir.file_name().and_then(|name| name.to_str()) {
        Some(name) if !name.is_empty() => name,
        _ => "workspace",
    };
    format!("harn/{leaf}")
}
1771
1772fn now_ms() -> i64 {
1773    clock::now_ms()
1774}
1775
1776#[cfg(test)]
1777mod tests {
1778    use super::*;
1779    use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
1780    use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
1781    use crate::triggers::test_util::timing::FILE_WATCH_FALLBACK_POLL;
1782    use std::rc::Rc;
1783    use time::OffsetDateTime;
1784
1785    fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
1786        TriggerBindingSpec {
1787            id: id.to_string(),
1788            source: TriggerBindingSource::Manifest,
1789            kind: "webhook".to_string(),
1790            provider: ProviderId::from("github"),
1791            autonomy_tier: crate::AutonomyTier::ActAuto,
1792            handler: TriggerHandlerSpec::Worker {
1793                queue: format!("{id}-queue"),
1794            },
1795            dispatch_priority: crate::WorkerQueuePriority::Normal,
1796            when: None,
1797            when_budget: None,
1798            retry: TriggerRetryConfig::default(),
1799            match_events: vec!["issues.opened".to_string()],
1800            dedupe_key: Some("event.dedupe_key".to_string()),
1801            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1802            filter: Some("event.kind".to_string()),
1803            daily_cost_usd: Some(5.0),
1804            hourly_cost_usd: None,
1805            max_autonomous_decisions_per_hour: None,
1806            max_autonomous_decisions_per_day: None,
1807            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1808            max_concurrent: Some(10),
1809            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1810            aggregation: None,
1811            manifest_path: None,
1812            package_name: Some("workspace".to_string()),
1813            definition_fingerprint: fingerprint.to_string(),
1814        }
1815    }
1816
1817    fn dynamic_spec(id: &str) -> TriggerBindingSpec {
1818        TriggerBindingSpec {
1819            id: id.to_string(),
1820            source: TriggerBindingSource::Dynamic,
1821            kind: "webhook".to_string(),
1822            provider: ProviderId::from("github"),
1823            autonomy_tier: crate::AutonomyTier::ActAuto,
1824            handler: TriggerHandlerSpec::Worker {
1825                queue: format!("{id}-queue"),
1826            },
1827            dispatch_priority: crate::WorkerQueuePriority::Normal,
1828            when: None,
1829            when_budget: None,
1830            retry: TriggerRetryConfig::default(),
1831            match_events: vec!["issues.opened".to_string()],
1832            dedupe_key: None,
1833            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1834            filter: None,
1835            daily_cost_usd: None,
1836            hourly_cost_usd: None,
1837            max_autonomous_decisions_per_hour: None,
1838            max_autonomous_decisions_per_day: None,
1839            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1840            max_concurrent: None,
1841            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1842            aggregation: None,
1843            manifest_path: None,
1844            package_name: None,
1845            definition_fingerprint: format!("dynamic:{id}"),
1846        }
1847    }
1848
1849    #[tokio::test(flavor = "current_thread")]
1850    async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
1851        clear_trigger_registry();
1852
1853        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1854            .await
1855            .expect("manifest trigger installs");
1856
1857        let snapshots = snapshot_trigger_bindings();
1858        assert_eq!(snapshots.len(), 1);
1859        let binding = &snapshots[0];
1860        assert_eq!(binding.id, "github-new-issue");
1861        assert_eq!(binding.version, 1);
1862        assert_eq!(binding.state, TriggerState::Active);
1863        assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());
1864
1865        clear_trigger_registry();
1866    }
1867
1868    #[tokio::test(flavor = "current_thread")]
1869    async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
1870        clear_trigger_registry();
1871
1872        let first = dynamic_register(dynamic_spec("dynamic-a"))
1873            .await
1874            .expect("first dynamic trigger");
1875        let second = dynamic_register(dynamic_spec("dynamic-b"))
1876            .await
1877            .expect("second dynamic trigger");
1878        assert_ne!(first, second);
1879
1880        let error = dynamic_register(dynamic_spec("dynamic-a"))
1881            .await
1882            .expect_err("duplicate id should fail");
1883        assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));
1884
1885        clear_trigger_registry();
1886    }
1887
1888    #[tokio::test(flavor = "current_thread")]
1889    async fn drain_waits_for_in_flight_events_before_terminating() {
1890        clear_trigger_registry();
1891
1892        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1893            .await
1894            .expect("manifest trigger installs");
1895        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1896
1897        drain("github-new-issue").await.expect("drain succeeds");
1898        let binding = snapshot_trigger_bindings()
1899            .into_iter()
1900            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1901            .expect("binding snapshot");
1902        assert_eq!(binding.state, TriggerState::Draining);
1903        assert_eq!(binding.metrics.in_flight, 1);
1904
1905        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1906            .await
1907            .expect("finish in-flight event");
1908        let binding = snapshot_trigger_bindings()
1909            .into_iter()
1910            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1911            .expect("binding snapshot");
1912        assert_eq!(binding.state, TriggerState::Terminated);
1913        assert_eq!(binding.metrics.in_flight, 0);
1914
1915        clear_trigger_registry();
1916    }
1917
1918    #[tokio::test(flavor = "current_thread")]
1919    async fn hot_reload_registers_new_version_while_old_binding_drains() {
1920        clear_trigger_registry();
1921
1922        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1923            .await
1924            .expect("initial manifest trigger installs");
1925        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1926
1927        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1928            .await
1929            .expect("updated manifest trigger installs");
1930
1931        let snapshots = snapshot_trigger_bindings();
1932        assert_eq!(snapshots.len(), 2);
1933        let old = snapshots
1934            .iter()
1935            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1936            .expect("old binding");
1937        let new = snapshots
1938            .iter()
1939            .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
1940            .expect("new binding");
1941        assert_eq!(old.state, TriggerState::Draining);
1942        assert_eq!(new.state, TriggerState::Active);
1943
1944        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1945            .await
1946            .expect("finish old in-flight event");
1947        let old = snapshot_trigger_bindings()
1948            .into_iter()
1949            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1950            .expect("old binding");
1951        assert_eq!(old.state, TriggerState::Terminated);
1952
1953        clear_trigger_registry();
1954    }
1955
1956    #[tokio::test(flavor = "current_thread")]
1957    async fn gc_drops_terminated_versions_beyond_retention_limit() {
1958        clear_trigger_registry();
1959
1960        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1961            .await
1962            .expect("install v1");
1963        begin_in_flight("github-new-issue", 1).expect("pin v1");
1964
1965        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1966            .await
1967            .expect("install v2");
1968        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1969            .await
1970            .expect("finish v1");
1971
1972        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1973            .await
1974            .expect("install v3");
1975
1976        let snapshots = snapshot_trigger_bindings();
1977        let versions: Vec<u32> = snapshots
1978            .into_iter()
1979            .filter(|binding| binding.id == "github-new-issue")
1980            .map(|binding| binding.version)
1981            .collect();
1982        assert_eq!(versions, vec![2, 3]);
1983
1984        clear_trigger_registry();
1985    }
1986
1987    #[tokio::test(flavor = "current_thread")]
1988    async fn lifecycle_transitions_append_to_event_log() {
1989        clear_trigger_registry();
1990        reset_active_event_log();
1991        let tempdir = tempfile::tempdir().expect("tempdir");
1992        let log = install_default_for_base_dir(tempdir.path()).expect("install event log");
1993
1994        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1995            .await
1996            .expect("manifest trigger installs");
1997        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1998        drain("github-new-issue").await.expect("drain succeeds");
1999        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
2000            .await
2001            .expect("finish event");
2002
2003        let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
2004        let events = log
2005            .read_range(&topic, None, 32)
2006            .await
2007            .expect("read lifecycle events");
2008        let states: Vec<String> = events
2009            .into_iter()
2010            .filter_map(|(_, event)| {
2011                event
2012                    .payload
2013                    .get("to_state")
2014                    .and_then(|value| value.as_str())
2015                    .map(|value| value.to_string())
2016            })
2017            .collect();
2018        assert_eq!(
2019            states,
2020            vec![
2021                "registering".to_string(),
2022                "active".to_string(),
2023                "draining".to_string(),
2024                "terminated".to_string(),
2025            ]
2026        );
2027
2028        reset_active_event_log();
2029        clear_trigger_registry();
2030    }
2031
2032    #[tokio::test(flavor = "current_thread")]
2033    async fn version_history_reuses_historical_version_after_restart() {
2034        clear_trigger_registry();
2035        reset_active_event_log();
2036        let tempdir = tempfile::tempdir().expect("tempdir");
2037        install_default_for_base_dir(tempdir.path()).expect("install event log");
2038
2039        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2040            .await
2041            .expect("initial manifest trigger installs");
2042        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2043            .await
2044            .expect("updated manifest trigger installs");
2045
2046        clear_trigger_registry();
2047        reset_active_event_log();
2048        install_default_for_base_dir(tempdir.path()).expect("reopen event log");
2049
2050        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2051            .await
2052            .expect("manifest reload reuses historical version");
2053
2054        let binding = snapshot_trigger_bindings()
2055            .into_iter()
2056            .find(|binding| binding.id == "github-new-issue")
2057            .expect("binding snapshot");
2058        assert_eq!(binding.version, 2);
2059
2060        reset_active_event_log();
2061        clear_trigger_registry();
2062    }
2063
2064    #[tokio::test(flavor = "current_thread")]
2065    async fn binding_version_as_of_reports_historical_active_version() {
2066        clear_trigger_registry();
2067        reset_active_event_log();
2068        let tempdir = tempfile::tempdir().expect("tempdir");
2069        install_default_for_base_dir(tempdir.path()).expect("install event log");
2070
2071        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2072            .await
2073            .expect("initial manifest trigger installs");
2074        let before_reload = OffsetDateTime::now_utc();
2075        std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
2076
2077        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2078            .await
2079            .expect("updated manifest trigger installs");
2080        let after_reload = OffsetDateTime::now_utc();
2081
2082        assert_eq!(
2083            binding_version_as_of("github-new-issue", before_reload)
2084                .expect("version before reload"),
2085            1
2086        );
2087        assert_eq!(
2088            binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
2089            2
2090        );
2091
2092        reset_active_event_log();
2093        clear_trigger_registry();
2094    }
2095
2096    #[tokio::test(flavor = "current_thread")]
2097    async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
2098        clear_trigger_registry();
2099        reset_active_event_log();
2100        let sink = Rc::new(CollectorSink::new());
2101        clear_event_sinks();
2102        add_event_sink(sink.clone());
2103        let tempdir = tempfile::tempdir().expect("tempdir");
2104        install_default_for_base_dir(tempdir.path()).expect("install event log");
2105
2106        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2107            .await
2108            .expect("install v1");
2109        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2110            .await
2111            .expect("install v2");
2112        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
2113            .await
2114            .expect("install v3");
2115        let received_at = OffsetDateTime::now_utc();
2116        std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
2117        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
2118            .await
2119            .expect("install v4");
2120
2121        let binding = resolve_live_or_as_of(
2122            "github-new-issue",
2123            RecordedTriggerBinding {
2124                version: 1,
2125                received_at,
2126            },
2127        )
2128        .expect("resolve fallback binding");
2129        assert_eq!(binding.version, 3);
2130
2131        let warning = sink
2132            .logs
2133            .borrow()
2134            .iter()
2135            .find(|log| log.category == "replay.binding_version_gc_fallback")
2136            .cloned()
2137            .expect("gc fallback warning");
2138        assert_eq!(warning.level, EventLevel::Warn);
2139        assert_eq!(
2140            warning.metadata.get("trigger_id"),
2141            Some(&serde_json::json!("github-new-issue"))
2142        );
2143        assert_eq!(
2144            warning.metadata.get("recorded_version"),
2145            Some(&serde_json::json!(1))
2146        );
2147        assert_eq!(
2148            warning.metadata.get("received_at"),
2149            Some(&serde_json::json!(received_at
2150                .format(&time::format_description::well_known::Rfc3339)
2151                .unwrap_or_else(|_| received_at.to_string())))
2152        );
2153        assert_eq!(
2154            warning.metadata.get("resolved_version"),
2155            Some(&serde_json::json!(3))
2156        );
2157
2158        clear_event_sinks();
2159        crate::events::reset_event_sinks();
2160        reset_active_event_log();
2161        clear_trigger_registry();
2162    }
2163}