Skip to main content

harn_vm/triggers/
registry.rs

1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use time::OffsetDateTime;
10use uuid::Uuid;
11
12use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
13use crate::llm::trigger_predicate::TriggerPredicateBudget;
14use crate::secrets::{configured_default_chain, SecretProvider};
15use crate::triggers::test_util::clock;
16use crate::trust_graph::AutonomyTier;
17use crate::value::VmClosure;
18
19use super::aggregation::TriggerAggregationConfig;
20use super::dispatcher::TriggerRetryConfig;
21use super::flow_control::TriggerFlowControlConfig;
22use super::ProviderId;
23
24#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
25pub struct TriggerId(String);
26
27impl TriggerId {
28    pub fn new(value: impl Into<String>) -> Self {
29        Self(value.into())
30    }
31
32    pub fn as_str(&self) -> &str {
33        &self.0
34    }
35}
36
37impl std::fmt::Display for TriggerId {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        self.0.fmt(f)
40    }
41}
42
43#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(rename_all = "snake_case")]
45pub enum TriggerState {
46    Registering,
47    Active,
48    Paused,
49    Draining,
50    Terminated,
51}
52
53impl TriggerState {
54    pub fn as_str(self) -> &'static str {
55        match self {
56            Self::Registering => "registering",
57            Self::Active => "active",
58            Self::Paused => "paused",
59            Self::Draining => "draining",
60            Self::Terminated => "terminated",
61        }
62    }
63}
64
65#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67pub enum TriggerBindingSource {
68    Manifest,
69    Dynamic,
70}
71
72impl TriggerBindingSource {
73    pub fn as_str(self) -> &'static str {
74        match self {
75            Self::Manifest => "manifest",
76            Self::Dynamic => "dynamic",
77        }
78    }
79}
80
81#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
82#[serde(rename_all = "snake_case")]
83pub enum TriggerBudgetExhaustionStrategy {
84    #[default]
85    False,
86    RetryLater,
87    Fail,
88    Warn,
89}
90
91impl TriggerBudgetExhaustionStrategy {
92    pub fn as_str(self) -> &'static str {
93        match self {
94            Self::False => "false",
95            Self::RetryLater => "retry_later",
96            Self::Fail => "fail",
97            Self::Warn => "warn",
98        }
99    }
100}
101
102#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
103pub struct OrchestratorBudgetConfig {
104    pub daily_cost_usd: Option<f64>,
105    pub hourly_cost_usd: Option<f64>,
106}
107
108#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
109pub struct OrchestratorBudgetSnapshot {
110    pub daily_cost_usd: Option<f64>,
111    pub hourly_cost_usd: Option<f64>,
112    pub cost_today_usd_micros: u64,
113    pub cost_hour_usd_micros: u64,
114    pub day_utc: i32,
115    pub hour_utc: i64,
116}
117
118#[derive(Debug)]
119struct OrchestratorBudgetState {
120    config: OrchestratorBudgetConfig,
121    day_utc: i32,
122    hour_utc: i64,
123    cost_today_usd_micros: u64,
124    cost_hour_usd_micros: u64,
125}
126
127impl Default for OrchestratorBudgetState {
128    fn default() -> Self {
129        Self {
130            config: OrchestratorBudgetConfig::default(),
131            day_utc: utc_day_key(),
132            hour_utc: utc_hour_key(),
133            cost_today_usd_micros: 0,
134            cost_hour_usd_micros: 0,
135        }
136    }
137}
138
139#[derive(Clone)]
140pub enum TriggerHandlerSpec {
141    Local {
142        raw: String,
143        closure: Rc<VmClosure>,
144    },
145    A2a {
146        target: String,
147        allow_cleartext: bool,
148    },
149    Worker {
150        queue: String,
151    },
152    Persona {
153        binding: crate::PersonaRuntimeBinding,
154    },
155    AutoResume {
156        worker_id: String,
157    },
158    /// Composes triggers (#1870) with named agent pools (#1883). On match,
159    /// the dispatcher resolves `pool` by name, invokes `task_factory(event)`
160    /// to build the per-event closure, and submits it to the pool with the
161    /// pool's queue strategy + backpressure policy applied (PL-04 / #1889).
162    SpawnToPool {
163        pool: String,
164        priority_from: Option<String>,
165        key_from: Option<String>,
166        task_factory: Rc<VmClosure>,
167    },
168    /// Composes triggers (#1870) with the reminder pipeline (#1815) by
169    /// injecting a `SystemReminder` into a target agent session's transcript
170    /// when the trigger matches (CH-05 / #1876). The reminder appears at the
171    /// session's next turn boundary — same as any other reminder. No spawn,
172    /// no resume, no signal. `body` is a prompt-template string rendered
173    /// against `{event}` / `{batch.events}` per match. `target` resolution
174    /// happens at dispatch time via [`TargetExpr`]; missing-target dispatches
175    /// are recorded as audit events rather than crashing the trigger.
176    ReminderInject {
177        target: TargetExpr,
178        body: String,
179        tags: Vec<String>,
180        ttl_turns: Option<i64>,
181        dedupe_key: Option<String>,
182        propagate: crate::llm::helpers::ReminderPropagate,
183        role_hint: crate::llm::helpers::ReminderRoleHint,
184        preserve_on_compact: bool,
185    },
186    /// CH-10 (#1910): emergency "panic" broadcast. When the trigger fires,
187    /// every running worker matched by `target_agents` is suspended
188    /// synchronously via the cooperative suspend pipeline used by
189    /// `suspend_agent` (#1837), bypassing the normal turn-boundary
190    /// delivery contract. Already-suspended or terminal workers are skipped
191    /// (no double-suspend, no error). `reason` is propagated to the
192    /// suspension envelope, the `WorkerSuspended` event, and the
193    /// `triggers.interrupt_and_suspend.audit` audit entry per suspension.
194    /// A no-target dispatch records a single roll-up audit entry and
195    /// returns a `broadcast` result with `suspended_count: 0` — graceful
196    /// no-op rather than dispatch failure, mirroring the `ReminderInject`
197    /// missing-target contract.
198    InterruptAndSuspend {
199        target_agents: AgentScope,
200        reason: String,
201    },
202}
203
204/// Resolution mode for the target worker set of a
205/// [`TriggerHandlerSpec::InterruptAndSuspend`] dispatch. `All` enumerates
206/// every entry in the local worker registry (the org-scoped "panic"
207/// broadcast); `Concrete` carries an explicit worker-id list from the
208/// registration; `Closure` defers resolution to a user closure that runs at
209/// dispatch time with the event payload bound to its first argument and
210/// must return a list of worker-id strings.
211#[derive(Clone)]
212pub enum AgentScope {
213    All,
214    Concrete(Vec<String>),
215    Closure(Rc<VmClosure>),
216}
217
218impl AgentScope {
219    pub fn kind(&self) -> &'static str {
220        match self {
221            Self::All => "all",
222            Self::Concrete(_) => "concrete",
223            Self::Closure(_) => "closure",
224        }
225    }
226}
227
228impl std::fmt::Debug for AgentScope {
229    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230        match self {
231            Self::All => f.write_str("All"),
232            Self::Concrete(ids) => f.debug_tuple("Concrete").field(ids).finish(),
233            Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
234        }
235    }
236}
237
238/// Resolution mode for the target session of a [`TriggerHandlerSpec::ReminderInject`]
239/// dispatch. `Current` walks the current-session thread-local; `Parent` resolves
240/// the active session's parent in the agent-session lineage; `Concrete` carries
241/// a literal session id from the registration; `Closure` defers resolution to
242/// a user closure that runs at dispatch time with the event payload bound to
243/// its first argument and must return the session id as a string.
244#[derive(Clone)]
245pub enum TargetExpr {
246    Current,
247    Parent,
248    Concrete(String),
249    Closure(Rc<VmClosure>),
250}
251
252impl TargetExpr {
253    pub fn kind(&self) -> &'static str {
254        match self {
255            Self::Current => "current",
256            Self::Parent => "parent",
257            Self::Concrete(_) => "concrete",
258            Self::Closure(_) => "closure",
259        }
260    }
261}
262
263impl std::fmt::Debug for TargetExpr {
264    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265        match self {
266            Self::Current => f.write_str("Current"),
267            Self::Parent => f.write_str("Parent"),
268            Self::Concrete(id) => f.debug_tuple("Concrete").field(id).finish(),
269            Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
270        }
271    }
272}
273
274impl std::fmt::Debug for TriggerHandlerSpec {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        match self {
277            Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
278            Self::A2a {
279                target,
280                allow_cleartext,
281            } => f
282                .debug_struct("A2a")
283                .field("target", target)
284                .field("allow_cleartext", allow_cleartext)
285                .finish(),
286            Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
287            Self::Persona { binding } => f
288                .debug_struct("Persona")
289                .field("name", &binding.name)
290                .finish(),
291            Self::AutoResume { worker_id } => f
292                .debug_struct("AutoResume")
293                .field("worker_id", worker_id)
294                .finish(),
295            Self::SpawnToPool {
296                pool,
297                priority_from,
298                key_from,
299                ..
300            } => f
301                .debug_struct("SpawnToPool")
302                .field("pool", pool)
303                .field("priority_from", priority_from)
304                .field("key_from", key_from)
305                .finish(),
306            Self::ReminderInject {
307                target,
308                body,
309                tags,
310                ttl_turns,
311                dedupe_key,
312                propagate,
313                role_hint,
314                preserve_on_compact,
315            } => f
316                .debug_struct("ReminderInject")
317                .field("target", target)
318                .field("body", body)
319                .field("tags", tags)
320                .field("ttl_turns", ttl_turns)
321                .field("dedupe_key", dedupe_key)
322                .field("propagate", propagate)
323                .field("role_hint", role_hint)
324                .field("preserve_on_compact", preserve_on_compact)
325                .finish(),
326            Self::InterruptAndSuspend {
327                target_agents,
328                reason,
329            } => f
330                .debug_struct("InterruptAndSuspend")
331                .field("target_agents", target_agents)
332                .field("reason", reason)
333                .finish(),
334        }
335    }
336}
337
338impl TriggerHandlerSpec {
339    pub fn kind(&self) -> &'static str {
340        match self {
341            Self::Local { .. } => "local",
342            Self::A2a { .. } => "a2a",
343            Self::Worker { .. } => "worker",
344            Self::Persona { .. } => "persona",
345            Self::AutoResume { .. } => "auto_resume",
346            Self::SpawnToPool { .. } => "spawn_to_pool",
347            Self::ReminderInject { .. } => "reminder_inject",
348            Self::InterruptAndSuspend { .. } => "interrupt_and_suspend",
349        }
350    }
351}
352
353#[derive(Clone)]
354pub struct TriggerPredicateSpec {
355    pub raw: String,
356    pub closure: Rc<VmClosure>,
357}
358
359impl std::fmt::Debug for TriggerPredicateSpec {
360    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361        f.debug_struct("TriggerPredicateSpec")
362            .field("raw", &self.raw)
363            .finish()
364    }
365}
366
367#[derive(Clone, Debug)]
368pub struct TriggerBindingSpec {
369    pub id: String,
370    pub source: TriggerBindingSource,
371    pub kind: String,
372    pub provider: ProviderId,
373    pub autonomy_tier: AutonomyTier,
374    pub handler: TriggerHandlerSpec,
375    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
376    pub when: Option<TriggerPredicateSpec>,
377    pub when_budget: Option<TriggerPredicateBudget>,
378    pub retry: TriggerRetryConfig,
379    pub match_events: Vec<String>,
380    pub dedupe_key: Option<String>,
381    pub dedupe_retention_days: u32,
382    pub filter: Option<String>,
383    pub daily_cost_usd: Option<f64>,
384    pub hourly_cost_usd: Option<f64>,
385    pub max_autonomous_decisions_per_hour: Option<u64>,
386    pub max_autonomous_decisions_per_day: Option<u64>,
387    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
388    pub max_concurrent: Option<u32>,
389    pub flow_control: TriggerFlowControlConfig,
390    /// CH-04 (#1875): optional aggregation buffer. When set, the
391    /// dispatcher accumulates matching events into a per-(binding,
392    /// partition_key) buffer and dispatches the handler with a batched
393    /// event when `count` is reached or the `window` elapses.
394    pub aggregation: Option<TriggerAggregationConfig>,
395    pub manifest_path: Option<PathBuf>,
396    pub package_name: Option<String>,
397    pub definition_fingerprint: String,
398}
399
/// Live counters for a single trigger binding.
///
/// `Default` is derived: every counter starts at zero and `last_received_ms`
/// starts as `None`. `TriggerBinding::metrics_snapshot` copies these values
/// into a plain [`TriggerMetricsSnapshot`].
#[derive(Debug, Default)]
pub struct TriggerMetrics {
    pub received: AtomicU64,
    pub dispatched: AtomicU64,
    pub failed: AtomicU64,
    pub dlq: AtomicU64,
    pub last_received_ms: Mutex<Option<i64>>,
    pub cost_total_usd_micros: AtomicU64,
    /// Reset to zero by `reset_binding_budget_windows` when the UTC day changes.
    pub cost_today_usd_micros: AtomicU64,
    /// Reset to zero by `reset_binding_budget_windows` when the UTC hour changes.
    pub cost_hour_usd_micros: AtomicU64,
    pub autonomous_decisions_total: AtomicU64,
    /// Reset to zero by `reset_binding_budget_windows` when the UTC day changes.
    pub autonomous_decisions_today: AtomicU64,
    /// Reset to zero by `reset_binding_budget_windows` when the UTC hour changes.
    pub autonomous_decisions_hour: AtomicU64,
}
432
433#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
434pub struct TriggerMetricsSnapshot {
435    pub received: u64,
436    pub dispatched: u64,
437    pub failed: u64,
438    pub dlq: u64,
439    pub in_flight: u64,
440    pub last_received_ms: Option<i64>,
441    pub cost_total_usd_micros: u64,
442    pub cost_today_usd_micros: u64,
443    pub cost_hour_usd_micros: u64,
444    pub autonomous_decisions_total: u64,
445    pub autonomous_decisions_today: u64,
446    pub autonomous_decisions_hour: u64,
447}
448
449pub struct TriggerBinding {
450    pub id: TriggerId,
451    pub version: u32,
452    pub source: TriggerBindingSource,
453    pub kind: String,
454    pub provider: ProviderId,
455    pub autonomy_tier: AutonomyTier,
456    pub handler: TriggerHandlerSpec,
457    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
458    pub when: Option<TriggerPredicateSpec>,
459    pub when_budget: Option<TriggerPredicateBudget>,
460    pub retry: TriggerRetryConfig,
461    pub match_events: Vec<String>,
462    pub dedupe_key: Option<String>,
463    pub dedupe_retention_days: u32,
464    pub filter: Option<String>,
465    pub daily_cost_usd: Option<f64>,
466    pub hourly_cost_usd: Option<f64>,
467    pub max_autonomous_decisions_per_hour: Option<u64>,
468    pub max_autonomous_decisions_per_day: Option<u64>,
469    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
470    pub max_concurrent: Option<u32>,
471    pub flow_control: TriggerFlowControlConfig,
472    /// CH-04 (#1875): see `TriggerBindingSpec::aggregation`. Cloned at
473    /// registration so the dispatcher does not need to lock the registry
474    /// for every emit.
475    pub aggregation: Option<TriggerAggregationConfig>,
476    pub manifest_path: Option<PathBuf>,
477    pub package_name: Option<String>,
478    pub definition_fingerprint: String,
479    pub state: Mutex<TriggerState>,
480    pub metrics: TriggerMetrics,
481    pub in_flight: AtomicU64,
482    pub cancel_token: Arc<AtomicBool>,
483    pub predicate_state: Mutex<TriggerPredicateState>,
484}
485
/// Mutable per-binding bookkeeping guarded by `TriggerBinding::predicate_state`.
#[derive(Clone, Debug, Default)]
pub struct TriggerPredicateState {
    /// UTC day key for which the binding's daily counters were last reset;
    /// `None` until `reset_binding_budget_windows` first runs.
    pub budget_day_utc: Option<i32>,
    /// UTC hour key for which the binding's hourly counters were last reset;
    /// `None` until `reset_binding_budget_windows` first runs.
    pub budget_hour_utc: Option<i64>,
    // NOTE(review): the next two fields are not touched in the reviewed part
    // of the file; presumably circuit-breaker bookkeeping — confirm.
    pub consecutive_failures: u32,
    pub breaker_open_until_ms: Option<i64>,
    /// Most recent predicate cost samples in millionths of a USD, oldest
    /// first; `record_predicate_cost_sample` keeps the length bounded.
    pub recent_cost_usd_micros: VecDeque<u64>,
}
494
495impl std::fmt::Debug for TriggerBinding {
496    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
497        f.debug_struct("TriggerBinding")
498            .field("id", &self.id)
499            .field("version", &self.version)
500            .field("source", &self.source)
501            .field("kind", &self.kind)
502            .field("provider", &self.provider)
503            .field("handler_kind", &self.handler.kind())
504            .field("state", &self.state_snapshot())
505            .finish()
506    }
507}
508
509impl TriggerBinding {
510    pub fn snapshot(&self) -> TriggerBindingSnapshot {
511        TriggerBindingSnapshot {
512            id: self.id.as_str().to_string(),
513            version: self.version,
514            source: self.source,
515            kind: self.kind.clone(),
516            provider: self.provider.as_str().to_string(),
517            autonomy_tier: self.autonomy_tier,
518            handler_kind: self.handler.kind().to_string(),
519            state: self.state_snapshot(),
520            metrics: self.metrics_snapshot(),
521            daily_cost_usd: self.daily_cost_usd,
522            hourly_cost_usd: self.hourly_cost_usd,
523            max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
524            max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
525            on_budget_exhausted: self.on_budget_exhausted,
526        }
527    }
528
529    fn new(spec: TriggerBindingSpec, version: u32) -> Self {
530        Self {
531            id: TriggerId::new(spec.id),
532            version,
533            source: spec.source,
534            kind: spec.kind,
535            provider: spec.provider,
536            autonomy_tier: spec.autonomy_tier,
537            handler: spec.handler,
538            dispatch_priority: spec.dispatch_priority,
539            when: spec.when,
540            when_budget: spec.when_budget,
541            retry: spec.retry,
542            match_events: spec.match_events,
543            dedupe_key: spec.dedupe_key,
544            dedupe_retention_days: spec.dedupe_retention_days,
545            filter: spec.filter,
546            daily_cost_usd: spec.daily_cost_usd,
547            hourly_cost_usd: spec.hourly_cost_usd,
548            max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
549            max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
550            on_budget_exhausted: spec.on_budget_exhausted,
551            max_concurrent: spec.max_concurrent,
552            flow_control: spec.flow_control,
553            aggregation: spec.aggregation,
554            manifest_path: spec.manifest_path,
555            package_name: spec.package_name,
556            definition_fingerprint: spec.definition_fingerprint,
557            state: Mutex::new(TriggerState::Registering),
558            metrics: TriggerMetrics::default(),
559            in_flight: AtomicU64::new(0),
560            cancel_token: Arc::new(AtomicBool::new(false)),
561            predicate_state: Mutex::new(TriggerPredicateState::default()),
562        }
563    }
564
565    pub fn binding_key(&self) -> String {
566        format!("{}@v{}", self.id.as_str(), self.version)
567    }
568
569    pub fn state_snapshot(&self) -> TriggerState {
570        *self.state.lock().expect("trigger state poisoned")
571    }
572
573    pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
574        TriggerMetricsSnapshot {
575            received: self.metrics.received.load(Ordering::Relaxed),
576            dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
577            failed: self.metrics.failed.load(Ordering::Relaxed),
578            dlq: self.metrics.dlq.load(Ordering::Relaxed),
579            in_flight: self.in_flight.load(Ordering::Relaxed),
580            last_received_ms: *self
581                .metrics
582                .last_received_ms
583                .lock()
584                .expect("trigger metrics poisoned"),
585            cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
586            cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
587            cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
588            autonomous_decisions_total: self
589                .metrics
590                .autonomous_decisions_total
591                .load(Ordering::Relaxed),
592            autonomous_decisions_today: self
593                .metrics
594                .autonomous_decisions_today
595                .load(Ordering::Relaxed),
596            autonomous_decisions_hour: self
597                .metrics
598                .autonomous_decisions_hour
599                .load(Ordering::Relaxed),
600        }
601    }
602}
603
604#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
605pub struct TriggerBindingSnapshot {
606    pub id: String,
607    pub version: u32,
608    pub source: TriggerBindingSource,
609    pub kind: String,
610    pub provider: String,
611    pub autonomy_tier: AutonomyTier,
612    pub handler_kind: String,
613    pub state: TriggerState,
614    pub metrics: TriggerMetricsSnapshot,
615    pub daily_cost_usd: Option<f64>,
616    pub hourly_cost_usd: Option<f64>,
617    pub max_autonomous_decisions_per_hour: Option<u64>,
618    pub max_autonomous_decisions_per_day: Option<u64>,
619    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
620}
621
/// Outcome label for a dispatch; the variant names mirror the `dispatched`,
/// `failed`, and `dlq` counters in `TriggerMetrics`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TriggerDispatchOutcome {
    Dispatched,
    Failed,
    Dlq,
}
628
/// Errors reported by trigger-registry operations.
#[derive(Debug)]
pub enum TriggerRegistryError {
    /// Displayed as `duplicate trigger id '<id>'`.
    DuplicateId(String),
    /// Carries a ready-made message that is displayed verbatim.
    InvalidSpec(String),
    /// Displayed as `unknown trigger id '<id>'`.
    UnknownId(String),
    /// Displayed as `unknown trigger binding '<id>' version <version>`.
    UnknownBindingVersion { id: String, version: u32 },
    /// Carries a ready-made message that is displayed verbatim.
    EventLog(String),
}

impl std::fmt::Display for TriggerRegistryError {
    /// Writes the human-readable message for the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidSpec(message) => f.write_str(message),
            Self::EventLog(message) => f.write_str(message),
            Self::DuplicateId(id) => f.write_fmt(format_args!("duplicate trigger id '{id}'")),
            Self::UnknownId(id) => f.write_fmt(format_args!("unknown trigger id '{id}'")),
            Self::UnknownBindingVersion { id, version } => f.write_fmt(format_args!(
                "unknown trigger binding '{id}' version {version}"
            )),
        }
    }
}

impl std::error::Error for TriggerRegistryError {}
652
653#[derive(Default)]
654pub struct TriggerRegistry {
655    bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
656    by_provider: BTreeMap<String, BTreeSet<String>>,
657    event_log: Option<Arc<AnyEventLog>>,
658    secret_provider: Option<Arc<dyn SecretProvider>>,
659}
660
661thread_local! {
662    static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
663}
664
665thread_local! {
666    static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
667        RefCell::new(OrchestratorBudgetState::default());
668}
669
// NOTE(review): not referenced in the reviewed part of the file; presumably
// the number of terminated binding versions kept per trigger — confirm.
const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;

// NOTE(review): not referenced in the reviewed part of the file; presumably
// the event-log topic for lifecycle transitions — confirm.
const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
/// Maximum number of predicate cost samples kept per binding by
/// `record_predicate_cost_sample`.
const PREDICATE_COST_WINDOW: usize = 100;
674
675#[derive(Clone, Debug, Deserialize)]
676struct LifecycleStateTransitionRecord {
677    id: String,
678    version: u32,
679    #[serde(default)]
680    definition_fingerprint: Option<String>,
681    to_state: TriggerState,
682}
683
684#[derive(Clone, Debug)]
685struct HistoricalLifecycleRecord {
686    occurred_at_ms: i64,
687    transition: LifecycleStateTransitionRecord,
688}
689
690#[derive(Clone, Copy, Debug, PartialEq, Eq)]
691pub struct RecordedTriggerBinding {
692    pub version: u32,
693    pub received_at: OffsetDateTime,
694}
695
/// Result of a lookup over historical versions; both fields default to `None`.
// NOTE(review): filled in outside the reviewed part of the file.
#[derive(Clone, Copy, Debug, Default)]
struct HistoricalVersionLookup {
    matching_version: Option<u32>,
    max_version: Option<u32>,
}
701
702pub fn clear_trigger_registry() {
703    TRIGGER_REGISTRY.with(|slot| {
704        *slot.borrow_mut() = TriggerRegistry::default();
705    });
706    clear_orchestrator_budget();
707    super::aggregation::clear_aggregation_state();
708}
709
710pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
711    ORCHESTRATOR_BUDGET.with(|slot| {
712        let mut state = slot.borrow_mut();
713        rollover_orchestrator_budget(&mut state);
714        state.config = config;
715    });
716}
717
718pub fn clear_orchestrator_budget() {
719    ORCHESTRATOR_BUDGET.with(|slot| {
720        *slot.borrow_mut() = OrchestratorBudgetState::default();
721    });
722}
723
724pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
725    ORCHESTRATOR_BUDGET.with(|slot| {
726        let mut state = slot.borrow_mut();
727        rollover_orchestrator_budget(&mut state);
728        OrchestratorBudgetSnapshot {
729            daily_cost_usd: state.config.daily_cost_usd,
730            hourly_cost_usd: state.config.hourly_cost_usd,
731            cost_today_usd_micros: state.cost_today_usd_micros,
732            cost_hour_usd_micros: state.cost_hour_usd_micros,
733            day_utc: state.day_utc,
734            hour_utc: state.hour_utc,
735        }
736    })
737}
738
739pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
740    if cost_usd_micros == 0 {
741        return;
742    }
743    ORCHESTRATOR_BUDGET.with(|slot| {
744        let mut state = slot.borrow_mut();
745        rollover_orchestrator_budget(&mut state);
746        state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
747        state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
748    });
749}
750
751pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
752    ORCHESTRATOR_BUDGET.with(|slot| {
753        let mut state = slot.borrow_mut();
754        rollover_orchestrator_budget(&mut state);
755        if state.config.hourly_cost_usd.is_some_and(|limit| {
756            micros_to_usd(
757                state
758                    .cost_hour_usd_micros
759                    .saturating_add(expected_cost_usd_micros),
760            ) > limit
761        }) {
762            return Some("orchestrator_hourly_budget_exceeded");
763        }
764        if state.config.daily_cost_usd.is_some_and(|limit| {
765            micros_to_usd(
766                state
767                    .cost_today_usd_micros
768                    .saturating_add(expected_cost_usd_micros),
769            ) > limit
770        }) {
771            return Some("orchestrator_daily_budget_exceeded");
772        }
773        None
774    })
775}
776
777pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
778    let today = utc_day_key();
779    let hour = utc_hour_key();
780    let mut state = binding
781        .predicate_state
782        .lock()
783        .expect("trigger predicate state poisoned");
784    if state.budget_day_utc != Some(today) {
785        state.budget_day_utc = Some(today);
786        binding
787            .metrics
788            .cost_today_usd_micros
789            .store(0, Ordering::Relaxed);
790        binding
791            .metrics
792            .autonomous_decisions_today
793            .store(0, Ordering::Relaxed);
794    }
795    if state.budget_hour_utc != Some(hour) {
796        state.budget_hour_utc = Some(hour);
797        binding
798            .metrics
799            .cost_hour_usd_micros
800            .store(0, Ordering::Relaxed);
801        binding
802            .metrics
803            .autonomous_decisions_hour
804            .store(0, Ordering::Relaxed);
805    }
806}
807
808pub fn binding_budget_would_exceed(
809    binding: &TriggerBinding,
810    expected_cost_usd_micros: u64,
811) -> Option<&'static str> {
812    reset_binding_budget_windows(binding);
813    if binding.hourly_cost_usd.is_some_and(|limit| {
814        micros_to_usd(
815            binding
816                .metrics
817                .cost_hour_usd_micros
818                .load(Ordering::Relaxed)
819                .saturating_add(expected_cost_usd_micros),
820        ) > limit
821    }) {
822        return Some("hourly_budget_exceeded");
823    }
824    if binding.daily_cost_usd.is_some_and(|limit| {
825        micros_to_usd(
826            binding
827                .metrics
828                .cost_today_usd_micros
829                .load(Ordering::Relaxed)
830                .saturating_add(expected_cost_usd_micros),
831        ) > limit
832    }) {
833        return Some("daily_budget_exceeded");
834    }
835    None
836}
837
838pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
839    reset_binding_budget_windows(binding);
840    if binding
841        .max_autonomous_decisions_per_hour
842        .is_some_and(|limit| {
843            binding
844                .metrics
845                .autonomous_decisions_hour
846                .load(Ordering::Relaxed)
847                .saturating_add(1)
848                > limit
849        })
850    {
851        return Some("hourly_autonomy_budget_exceeded");
852    }
853    if binding
854        .max_autonomous_decisions_per_day
855        .is_some_and(|limit| {
856            binding
857                .metrics
858                .autonomous_decisions_today
859                .load(Ordering::Relaxed)
860                .saturating_add(1)
861                > limit
862        })
863    {
864        return Some("daily_autonomy_budget_exceeded");
865    }
866    None
867}
868
869pub fn note_autonomous_decision(binding: &TriggerBinding) {
870    reset_binding_budget_windows(binding);
871    binding
872        .metrics
873        .autonomous_decisions_total
874        .fetch_add(1, Ordering::Relaxed);
875    binding
876        .metrics
877        .autonomous_decisions_today
878        .fetch_add(1, Ordering::Relaxed);
879    binding
880        .metrics
881        .autonomous_decisions_hour
882        .fetch_add(1, Ordering::Relaxed);
883}
884
885pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
886    let state = binding
887        .predicate_state
888        .lock()
889        .expect("trigger predicate state poisoned");
890    if !state.recent_cost_usd_micros.is_empty() {
891        let total: u64 = state.recent_cost_usd_micros.iter().copied().sum();
892        return total / state.recent_cost_usd_micros.len() as u64;
893    }
894    binding
895        .when_budget
896        .as_ref()
897        .and_then(|budget| budget.max_cost_usd)
898        .map(usd_to_micros)
899        .unwrap_or_default()
900}
901
902pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
903    let mut state = binding
904        .predicate_state
905        .lock()
906        .expect("trigger predicate state poisoned");
907    state.recent_cost_usd_micros.push_back(cost_usd_micros);
908    while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
909        state.recent_cost_usd_micros.pop_front();
910    }
911}
912
/// Converts a USD amount to whole micro-dollars, rounding up.
///
/// Non-finite (NaN, ±infinity), zero and negative inputs all map to `0`.
pub fn usd_to_micros(value: f64) -> u64 {
    let is_positive_amount = value.is_finite() && value > 0.0;
    if is_positive_amount {
        // Round up so a fractional micro is never under-counted.
        (value * 1_000_000.0).ceil() as u64
    } else {
        0
    }
}
919
/// Converts a micro-dollar amount to USD as a floating-point value.
pub fn micros_to_usd(value: u64) -> f64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    let micros = value as f64;
    micros / MICROS_PER_USD
}
923
924fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
925    let today = utc_day_key();
926    let hour = utc_hour_key();
927    if state.day_utc != today {
928        state.day_utc = today;
929        state.cost_today_usd_micros = 0;
930    }
931    if state.hour_utc != hour {
932        state.hour_utc = hour;
933        state.cost_hour_usd_micros = 0;
934    }
935}
936
937fn utc_day_key() -> i32 {
938    (clock::now_utc().date()
939        - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
940    .whole_days() as i32
941}
942
943fn utc_hour_key() -> i64 {
944    clock::now_utc().unix_timestamp() / 3_600
945}
946
947pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
948    TRIGGER_REGISTRY.with(|slot| {
949        let registry = slot.borrow();
950        let mut snapshots = Vec::new();
951        for bindings in registry.bindings.values() {
952            for binding in bindings {
953                snapshots.push(binding.snapshot());
954            }
955        }
956        snapshots.sort_by(|left, right| {
957            left.id
958                .cmp(&right.id)
959                .then(left.version.cmp(&right.version))
960                .then(left.state.as_str().cmp(right.state.as_str()))
961        });
962        snapshots
963    })
964}
965
966#[allow(clippy::arc_with_non_send_sync)]
967pub fn resolve_trigger_binding_as_of(
968    id: &str,
969    as_of: OffsetDateTime,
970) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
971    let version = binding_version_as_of(id, as_of)?;
972    resolve_trigger_binding_version(id, version)
973}
974
975#[allow(clippy::arc_with_non_send_sync)]
976pub fn resolve_live_or_as_of(
977    id: &str,
978    recorded: RecordedTriggerBinding,
979) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
980    match resolve_live_trigger_binding(id, Some(recorded.version)) {
981        Ok(binding) => Ok(binding),
982        Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
983            let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
984            let mut metadata = BTreeMap::new();
985            metadata.insert("trigger_id".to_string(), serde_json::json!(id));
986            metadata.insert(
987                "recorded_version".to_string(),
988                serde_json::json!(recorded.version),
989            );
990            metadata.insert(
991                "received_at".to_string(),
992                serde_json::json!(recorded
993                    .received_at
994                    .format(&time::format_description::well_known::Rfc3339)
995                    .unwrap_or_else(|_| recorded.received_at.to_string())),
996            );
997            metadata.insert(
998                "resolved_version".to_string(),
999                serde_json::json!(binding.version),
1000            );
1001            crate::events::log_warn_meta(
1002                "replay.binding_version_gc_fallback",
1003                "trigger replay fell back to lifecycle history after binding version GC",
1004                metadata,
1005            );
1006            Ok(binding)
1007        }
1008        Err(error) => Err(error),
1009    }
1010}
1011
1012pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
1013    TRIGGER_REGISTRY.with(|slot| {
1014        let registry = slot.borrow();
1015        registry.binding_version_as_of(id, as_of)
1016    })
1017}
1018
1019#[allow(clippy::arc_with_non_send_sync)]
1020fn resolve_trigger_binding_version(
1021    id: &str,
1022    version: u32,
1023) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1024    TRIGGER_REGISTRY.with(|slot| {
1025        let registry = slot.borrow();
1026        registry
1027            .binding(id, version)
1028            .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
1029                id: id.to_string(),
1030                version,
1031            })
1032    })
1033}
1034
1035#[allow(clippy::arc_with_non_send_sync)]
1036pub fn resolve_live_trigger_binding(
1037    id: &str,
1038    version: Option<u32>,
1039) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1040    TRIGGER_REGISTRY.with(|slot| {
1041        let registry = slot.borrow();
1042        if let Some(version) = version {
1043            let binding = registry.binding(id, version).ok_or_else(|| {
1044                TriggerRegistryError::UnknownBindingVersion {
1045                    id: id.to_string(),
1046                    version,
1047                }
1048            })?;
1049            if binding.state_snapshot() == TriggerState::Terminated {
1050                return Err(TriggerRegistryError::UnknownBindingVersion {
1051                    id: id.to_string(),
1052                    version,
1053                });
1054            }
1055            return Ok(binding);
1056        }
1057
1058        registry
1059            .live_bindings_any_source(id)
1060            .into_iter()
1061            .max_by_key(|binding| binding.version)
1062            .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
1063    })
1064}
1065
1066/// Snapshot active channel-source trigger bindings whose `channel:` selector
1067/// in `match_events[0]` matches the supplied (scope, scope_id, name) tuple.
1068///
1069/// Used by [`crate::channels`] fan-out (CH-02 / #1872). Returns bindings
1070/// sorted by id then version so dispatch ordering is deterministic.
1071pub(crate) fn channel_bindings_matching(
1072    scope: &str,
1073    scope_id: &str,
1074    name: &str,
1075) -> Vec<Arc<TriggerBinding>> {
1076    TRIGGER_REGISTRY.with(|slot| {
1077        let registry = slot.borrow();
1078        let Some(binding_ids) = registry.by_provider.get("channel") else {
1079            return Vec::new();
1080        };
1081        let mut bindings = Vec::new();
1082        for id in binding_ids {
1083            let Some(versions) = registry.bindings.get(id) else {
1084                continue;
1085            };
1086            for binding in versions {
1087                if binding.state_snapshot() != TriggerState::Active {
1088                    continue;
1089                }
1090                let Some(selector_raw) = binding.match_events.first() else {
1091                    continue;
1092                };
1093                let Ok(selector) = crate::channels::ChannelSelector::parse(selector_raw) else {
1094                    continue;
1095                };
1096                // For tenant-default ("Current") selectors we resolve "current"
1097                // to the scope_id of the emit, since the trigger and the
1098                // emitter share the same registry / runtime context.
1099                if !selector.matches(scope, scope_id, name, scope_id) {
1100                    continue;
1101                }
1102                bindings.push(binding.clone());
1103            }
1104        }
1105        bindings.sort_by(|left, right| {
1106            left.id
1107                .as_str()
1108                .cmp(right.id.as_str())
1109                .then(left.version.cmp(&right.version))
1110        });
1111        bindings
1112    })
1113}
1114
1115pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
1116    TRIGGER_REGISTRY.with(|slot| {
1117        let registry = slot.borrow();
1118        let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
1119            return Vec::new();
1120        };
1121
1122        let mut bindings = Vec::new();
1123        for id in binding_ids {
1124            let Some(versions) = registry.bindings.get(id) else {
1125                continue;
1126            };
1127            for binding in versions {
1128                if binding.state_snapshot() != TriggerState::Active {
1129                    continue;
1130                }
1131                if !binding.match_events.is_empty()
1132                    && !binding.match_events.iter().any(|kind| kind == &event.kind)
1133                {
1134                    continue;
1135                }
1136                bindings.push(binding.clone());
1137            }
1138        }
1139
1140        bindings.sort_by(|left, right| {
1141            left.id
1142                .as_str()
1143                .cmp(right.id.as_str())
1144                .then(left.version.cmp(&right.version))
1145        });
1146        bindings
1147    })
1148}
1149
/// Reconciles the registry's manifest-sourced bindings with `specs`.
///
/// * Manifest triggers that are live in the registry but absent from `specs`
///   are moved to draining.
/// * A trigger present in both is left untouched when a live binding with the
///   same definition fingerprint is already registering or active; otherwise
///   its live manifest bindings are drained and the new spec is registered.
/// * Specs with no live manifest binding are registered.
///
/// The resulting lifecycle events are appended to the event log (if any) after
/// the registry borrow has been released.
///
/// # Errors
///
/// * `InvalidSpec` — a spec is not manifest-sourced, or has a blank id.
/// * `DuplicateId` — two specs share an id.
/// * `EventLog` — appending a lifecycle event failed.
///
/// Spec validation completes before any binding is drained or registered.
pub async fn install_manifest_triggers(
    specs: Vec<TriggerBindingSpec>,
) -> Result<(), TriggerRegistryError> {
    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
        let registry = &mut *slot.borrow_mut();
        registry.refresh_runtime_context();
        // Ids whose version list changed and therefore needs a GC pass below.
        let mut touched_ids = BTreeSet::new();

        // Validate the incoming specs and index them by id.
        let mut incoming = BTreeMap::new();
        for spec in specs {
            let spec_id = spec.id.clone();
            if spec.source != TriggerBindingSource::Manifest {
                return Err(TriggerRegistryError::InvalidSpec(format!(
                    "manifest install received non-manifest trigger '{spec_id}'"
                )));
            }
            if spec_id.trim().is_empty() {
                return Err(TriggerRegistryError::InvalidSpec(
                    "manifest trigger id cannot be empty".to_string(),
                ));
            }
            if incoming.insert(spec_id.clone(), spec).is_some() {
                return Err(TriggerRegistryError::DuplicateId(spec_id));
            }
        }

        let mut lifecycle = Vec::new();
        // Ids that currently have at least one non-terminated manifest binding.
        let existing_ids: Vec<String> = registry
            .bindings
            .iter()
            .filter(|(_, bindings)| {
                bindings.iter().any(|binding| {
                    binding.source == TriggerBindingSource::Manifest
                        && binding.state_snapshot() != TriggerState::Terminated
                })
            })
            .map(|(id, _)| id.clone())
            .collect();

        for id in existing_ids {
            let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
            // Removed from the manifest: drain every live binding for the id.
            let Some(spec) = incoming.remove(&id) else {
                for binding in live_manifest {
                    registry.transition_binding_to_draining(&binding, &mut lifecycle);
                }
                touched_ids.insert(id.clone());
                continue;
            };

            // Unchanged definition that is already registering/active: keep it.
            let has_matching_active = live_manifest.iter().any(|binding| {
                binding.definition_fingerprint == spec.definition_fingerprint
                    && matches!(
                        binding.state_snapshot(),
                        TriggerState::Registering | TriggerState::Active
                    )
            });
            if has_matching_active {
                continue;
            }

            // Definition changed (or the matching binding is paused/draining):
            // drain the old bindings, then register the replacement.
            for binding in live_manifest {
                registry.transition_binding_to_draining(&binding, &mut lifecycle);
            }

            let version = registry.next_version_for_spec(&spec);
            registry.register_binding(spec, version, &mut lifecycle);
            touched_ids.insert(id.clone());
        }

        // Whatever is left in `incoming` had no live manifest binding.
        for spec in incoming.into_values() {
            touched_ids.insert(spec.id.clone());
            let version = registry.next_version_for_spec(&spec);
            registry.register_binding(spec, version, &mut lifecycle);
        }

        for id in touched_ids {
            registry.gc_terminated_versions(&id);
        }

        Ok((registry.event_log.clone(), lifecycle))
    })?;

    append_lifecycle_events(event_log, events).await
}
1234
1235pub async fn dynamic_register(
1236    mut spec: TriggerBindingSpec,
1237) -> Result<TriggerId, TriggerRegistryError> {
1238    if spec.id.trim().is_empty() {
1239        spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1240    }
1241    spec.source = TriggerBindingSource::Dynamic;
1242    let id = spec.id.clone();
1243    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1244        let registry = &mut *slot.borrow_mut();
1245        registry.refresh_runtime_context();
1246
1247        if registry.bindings.contains_key(id.as_str()) {
1248            return Err(TriggerRegistryError::DuplicateId(id.clone()));
1249        }
1250
1251        let mut lifecycle = Vec::new();
1252        let version = registry.next_version_for_spec(&spec);
1253        registry.register_binding(spec, version, &mut lifecycle);
1254        Ok((registry.event_log.clone(), lifecycle))
1255    })?;
1256
1257    append_lifecycle_events(event_log, events).await?;
1258    Ok(TriggerId::new(id))
1259}
1260
1261pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1262    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1263        let registry = &mut *slot.borrow_mut();
1264        let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1265        if live_dynamic.is_empty() {
1266            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1267        }
1268
1269        let mut lifecycle = Vec::new();
1270        for binding in live_dynamic {
1271            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1272        }
1273        Ok((registry.event_log.clone(), lifecycle))
1274    })?;
1275
1276    append_lifecycle_events(event_log, events).await
1277}
1278
1279pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1280    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1281        let registry = &mut *slot.borrow_mut();
1282        let live = registry.live_bindings_any_source(id);
1283        if live.is_empty() {
1284            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1285        }
1286
1287        let mut lifecycle = Vec::new();
1288        for binding in live {
1289            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1290        }
1291        Ok((registry.event_log.clone(), lifecycle))
1292    })?;
1293
1294    append_lifecycle_events(event_log, events).await
1295}
1296
1297pub async fn pause(id: &str) -> Result<(), TriggerRegistryError> {
1298    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1299        let registry = &mut *slot.borrow_mut();
1300        let live = registry.live_bindings_any_source(id);
1301        if live.is_empty() {
1302            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1303        }
1304
1305        let mut lifecycle = Vec::new();
1306        for binding in live {
1307            match binding.state_snapshot() {
1308                TriggerState::Registering | TriggerState::Active => {
1309                    registry.transition_binding_state(
1310                        &binding,
1311                        TriggerState::Paused,
1312                        &mut lifecycle,
1313                    );
1314                }
1315                TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {}
1316            }
1317        }
1318        Ok((registry.event_log.clone(), lifecycle))
1319    })?;
1320
1321    append_lifecycle_events(event_log, events).await
1322}
1323
1324pub async fn resume(id: &str) -> Result<(), TriggerRegistryError> {
1325    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1326        let registry = &mut *slot.borrow_mut();
1327        let live = registry.live_bindings_any_source(id);
1328        if live.is_empty() {
1329            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1330        }
1331
1332        let mut lifecycle = Vec::new();
1333        for binding in live {
1334            if binding.state_snapshot() == TriggerState::Paused {
1335                registry.transition_binding_state(&binding, TriggerState::Active, &mut lifecycle);
1336            }
1337        }
1338        Ok((registry.event_log.clone(), lifecycle))
1339    })?;
1340
1341    append_lifecycle_events(event_log, events).await
1342}
1343
1344fn pin_trigger_binding_inner(
1345    id: &str,
1346    version: u32,
1347    allow_terminated: bool,
1348) -> Result<(), TriggerRegistryError> {
1349    TRIGGER_REGISTRY.with(|slot| {
1350        let registry = slot.borrow();
1351        let binding = registry.binding(id, version).ok_or_else(|| {
1352            TriggerRegistryError::UnknownBindingVersion {
1353                id: id.to_string(),
1354                version,
1355            }
1356        })?;
1357        match binding.state_snapshot() {
1358            TriggerState::Paused => Err(TriggerRegistryError::InvalidSpec(format!(
1359                "trigger binding '{id}' version {version} is paused"
1360            ))),
1361            TriggerState::Terminated if !allow_terminated => {
1362                Err(TriggerRegistryError::InvalidSpec(format!(
1363                    "trigger binding '{id}' version {version} is terminated"
1364                )))
1365            }
1366            _ => {
1367                binding.in_flight.fetch_add(1, Ordering::Relaxed);
1368                Ok(())
1369            }
1370        }
1371    })
1372}
1373
1374pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1375    pin_trigger_binding_inner(id, version, false)
1376}
1377
1378pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1379    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1380        let registry = &mut *slot.borrow_mut();
1381        let binding = registry.binding(id, version).ok_or_else(|| {
1382            TriggerRegistryError::UnknownBindingVersion {
1383                id: id.to_string(),
1384                version,
1385            }
1386        })?;
1387        let current = binding.in_flight.load(Ordering::Relaxed);
1388        if current == 0 {
1389            return Err(TriggerRegistryError::InvalidSpec(format!(
1390                "trigger binding '{id}' version {version} has no in-flight events"
1391            )));
1392        }
1393        binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1394
1395        let mut lifecycle = Vec::new();
1396        registry.maybe_finalize_draining(&binding, &mut lifecycle);
1397        registry.gc_terminated_versions(binding.id.as_str());
1398        Ok((registry.event_log.clone(), lifecycle))
1399    })?;
1400
1401    append_lifecycle_events(event_log, events).await
1402}
1403
1404pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1405    begin_in_flight_inner(id, version, false)
1406}
1407
1408pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1409    begin_in_flight_inner(id, version, true)
1410}
1411
1412fn begin_in_flight_inner(
1413    id: &str,
1414    version: u32,
1415    allow_terminated: bool,
1416) -> Result<(), TriggerRegistryError> {
1417    pin_trigger_binding_inner(id, version, allow_terminated)?;
1418    TRIGGER_REGISTRY.with(|slot| {
1419        let registry = slot.borrow();
1420        let binding = registry.binding(id, version).ok_or_else(|| {
1421            TriggerRegistryError::UnknownBindingVersion {
1422                id: id.to_string(),
1423                version,
1424            }
1425        })?;
1426        binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1427        *binding
1428            .metrics
1429            .last_received_ms
1430            .lock()
1431            .expect("trigger metrics poisoned") = Some(now_ms());
1432        Ok(())
1433    })
1434}
1435
1436pub async fn finish_in_flight(
1437    id: &str,
1438    version: u32,
1439    outcome: TriggerDispatchOutcome,
1440) -> Result<(), TriggerRegistryError> {
1441    TRIGGER_REGISTRY.with(|slot| {
1442        let registry = &mut *slot.borrow_mut();
1443        let binding = registry.binding(id, version).ok_or_else(|| {
1444            TriggerRegistryError::UnknownBindingVersion {
1445                id: id.to_string(),
1446                version,
1447            }
1448        })?;
1449        let current = binding.in_flight.load(Ordering::Relaxed);
1450        if current == 0 {
1451            return Err(TriggerRegistryError::InvalidSpec(format!(
1452                "trigger binding '{id}' version {version} has no in-flight events"
1453            )));
1454        }
1455        match outcome {
1456            TriggerDispatchOutcome::Dispatched => {
1457                binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1458            }
1459            TriggerDispatchOutcome::Failed => {
1460                binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1461            }
1462            TriggerDispatchOutcome::Dlq => {
1463                binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1464            }
1465        }
1466        Ok(())
1467    })?;
1468
1469    unpin_trigger_binding(id, version).await
1470}
1471
/// Internal registry operations: lookup, versioning, garbage collection,
/// lifecycle-history queries and state transitions.
impl TriggerRegistry {
    /// Fills in the event log and secret provider from the ambient runtime,
    /// but only for whichever of the two is still unset.
    fn refresh_runtime_context(&mut self) {
        if self.event_log.is_none() {
            self.event_log = active_event_log();
        }
        if self.secret_provider.is_none() {
            self.secret_provider = default_secret_provider();
        }
    }

    /// Returns the binding with the exact `id` and `version`, in any state.
    fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
        self.bindings
            .get(id)
            .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
            .cloned()
    }

    /// Returns the non-terminated bindings of `id` that come from `source`.
    fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
        self.bindings
            .get(id)
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .filter(|binding| {
                binding.source == source && binding.state_snapshot() != TriggerState::Terminated
            })
            .cloned()
            .collect()
    }

    /// Returns the non-terminated bindings of `id`, whatever their source.
    fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
        self.bindings
            .get(id)
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
            .cloned()
            .collect()
    }

    /// Picks the version number to register `spec` under.
    ///
    /// A definition fingerprint that was seen before keeps its version: first
    /// an in-memory binding with the same fingerprint is consulted, then the
    /// lifecycle history. Otherwise the result is one more than the highest
    /// version known in memory or in history (`1` when there is none).
    fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
        if let Some(version) = self
            .bindings
            .get(spec.id.as_str())
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
            .map(|binding| binding.version)
        {
            return version;
        }

        let historical =
            self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
        if let Some(version) = historical.matching_version {
            return version;
        }

        self.bindings
            .get(spec.id.as_str())
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .map(|binding| binding.version)
            .chain(historical.max_version)
            .max()
            .unwrap_or(0)
            + 1
    }

    /// Drops old terminated versions of `id`.
    ///
    /// Non-terminated bindings are always kept. A terminated binding survives
    /// only if its version is among the `TERMINATED_VERSION_RETENTION_LIMIT`
    /// highest versions held for the id (counting bindings in every state).
    /// When nothing is left, the id's entry is removed from `bindings`.
    ///
    /// NOTE(review): `by_provider` is not pruned here, so it can keep ids that
    /// no longer have an entry in `bindings` — confirm this is intentional.
    fn gc_terminated_versions(&mut self, id: &str) {
        let Some(bindings) = self.bindings.get_mut(id) else {
            return;
        };

        // Highest versions first, cut down to the retention limit.
        let mut newest_versions: Vec<u32> =
            bindings.iter().map(|binding| binding.version).collect();
        newest_versions.sort_unstable_by(|left, right| right.cmp(left));
        newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
        let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();

        bindings.retain(|binding| {
            binding.state_snapshot() != TriggerState::Terminated
                || retained_versions.contains(&binding.version)
        });

        if bindings.is_empty() {
            self.bindings.remove(id);
        }
    }

    /// Scans the lifecycle history of `id` and reports the highest version
    /// seen, plus the version of the last record whose definition fingerprint
    /// equals `fingerprint` (if any).
    fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
        let mut lookup = HistoricalVersionLookup::default();
        for record in self.lifecycle_records_for(id) {
            lookup.max_version = Some(
                lookup
                    .max_version
                    .unwrap_or(0)
                    .max(record.transition.version),
            );
            if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
                lookup.matching_version = Some(record.transition.version);
            }
        }
        lookup
    }

    /// Replays the lifecycle history of `id` up to and including `as_of` and
    /// returns the version that was active at that point.
    ///
    /// A transition to `Active` makes that version the active one; a
    /// transition of the currently active version to `Paused`, `Draining` or
    /// `Terminated` clears it. `Registering` records are ignored.
    ///
    /// # Errors
    ///
    /// Returns `InvalidSpec` when no version was active at `as_of`.
    fn binding_version_as_of(
        &self,
        id: &str,
        as_of: OffsetDateTime,
    ) -> Result<u32, TriggerRegistryError> {
        let cutoff_ms = harn_clock::offset_datetime_to_ms(as_of);
        let mut active_version = None;
        for record in self.lifecycle_records_for(id) {
            // Stops at the first record past the cutoff; presumably records
            // come back in chronological order — verify against the event log.
            if record.occurred_at_ms > cutoff_ms {
                break;
            }
            match record.transition.to_state {
                TriggerState::Active => active_version = Some(record.transition.version),
                TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {
                    if active_version == Some(record.transition.version) {
                        active_version = None;
                    }
                }
                TriggerState::Registering => {}
            }
        }

        active_version.ok_or_else(|| {
            TriggerRegistryError::InvalidSpec(format!(
                "no active trigger binding '{}' at {}",
                id,
                as_of
                    .format(&time::format_description::well_known::Rfc3339)
                    .unwrap_or_else(|_| as_of.to_string())
            ))
        })
    }

    /// Reads the lifecycle topic from the event log and returns the records
    /// that belong to `id`.
    ///
    /// Returns an empty list when no event log is configured. A failed read is
    /// treated as an empty history, and events whose payload does not
    /// deserialize as a lifecycle transition are skipped.
    ///
    /// NOTE(review): this blocks the calling thread on the read and requests
    /// the whole topic (`usize::MAX`) on every call.
    fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
        let Some(event_log) = self.event_log.as_ref() else {
            return Vec::new();
        };
        let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
            .expect("static triggers.lifecycle topic should always be valid");
        futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
            .unwrap_or_default()
            .into_iter()
            .filter_map(|(_, event)| {
                let occurred_at_ms = event.occurred_at_ms;
                let transition: LifecycleStateTransitionRecord =
                    serde_json::from_value(event.payload).ok()?;
                (transition.id == id).then_some(HistoricalLifecycleRecord {
                    occurred_at_ms,
                    transition,
                })
            })
            .collect()
    }

    /// Creates a binding for `spec` at `version`, indexes it by provider and
    /// by id, and activates it.
    ///
    /// Pushes a `Registering` lifecycle event (with no previous state) onto
    /// `lifecycle`, followed by the event for the transition to `Active`.
    #[allow(clippy::arc_with_non_send_sync)]
    fn register_binding(
        &mut self,
        spec: TriggerBindingSpec,
        version: u32,
        lifecycle: &mut Vec<LogEvent>,
    ) -> Arc<TriggerBinding> {
        let binding = Arc::new(TriggerBinding::new(spec, version));
        self.by_provider
            .entry(binding.provider.as_str().to_string())
            .or_default()
            .insert(binding.id.as_str().to_string());
        self.bindings
            .entry(binding.id.as_str().to_string())
            .or_default()
            .push(binding.clone());
        lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
        self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
        binding
    }

    /// Moves `binding` to `Draining` and, if nothing is in flight, straight on
    /// to `Terminated`. Does nothing for an already terminated binding.
    fn transition_binding_to_draining(
        &self,
        binding: &Arc<TriggerBinding>,
        lifecycle: &mut Vec<LogEvent>,
    ) {
        if matches!(binding.state_snapshot(), TriggerState::Terminated) {
            return;
        }
        self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
        self.maybe_finalize_draining(binding, lifecycle);
    }

    /// Terminates `binding` if it is draining and has no in-flight events.
    fn maybe_finalize_draining(
        &self,
        binding: &Arc<TriggerBinding>,
        lifecycle: &mut Vec<LogEvent>,
    ) {
        if binding.state_snapshot() == TriggerState::Draining
            && binding.in_flight.load(Ordering::Relaxed) == 0
        {
            self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
        }
    }

    /// Sets the state of `binding` to `next` and pushes the matching lifecycle
    /// event onto `lifecycle`. A transition to the current state is a no-op
    /// and pushes nothing.
    ///
    /// # Panics
    ///
    /// Panics if the binding's state mutex is poisoned.
    fn transition_binding_state(
        &self,
        binding: &Arc<TriggerBinding>,
        next: TriggerState,
        lifecycle: &mut Vec<LogEvent>,
    ) {
        let mut state = binding.state.lock().expect("trigger state poisoned");
        let previous = *state;
        if previous == next {
            return;
        }
        *state = next;
        // Release the state lock before the follow-up work below.
        drop(state);
        // CH-04 (#1875): drop any pending aggregation buffers when the
        // binding terminates so a long-lived buffer doesn't fire after
        // the trigger has been removed. Leftover events are discarded —
        // matches the in-flight drain contract elsewhere in the
        // registry (the trigger is gone; we'd have nowhere to dispatch).
        if next == TriggerState::Terminated && binding.aggregation.is_some() {
            let _ = super::aggregation::drop_binding_aggregation(&binding.binding_key());
        }
        lifecycle.push(lifecycle_event(binding, Some(previous), next));
    }
}
1700
1701fn lifecycle_event(
1702    binding: &TriggerBinding,
1703    from_state: Option<TriggerState>,
1704    to_state: TriggerState,
1705) -> LogEvent {
1706    LogEvent::new(
1707        "state_transition",
1708        serde_json::json!({
1709            "id": binding.id.as_str(),
1710            "binding_key": binding.binding_key(),
1711            "version": binding.version,
1712            "provider": binding.provider.as_str(),
1713            "kind": &binding.kind,
1714            "source": binding.source.as_str(),
1715            "handler_kind": binding.handler.kind(),
1716            "definition_fingerprint": &binding.definition_fingerprint,
1717            "from_state": from_state.map(TriggerState::as_str),
1718            "to_state": to_state.as_str(),
1719        }),
1720    )
1721}
1722
1723async fn append_lifecycle_events(
1724    event_log: Option<Arc<AnyEventLog>>,
1725    events: Vec<LogEvent>,
1726) -> Result<(), TriggerRegistryError> {
1727    let Some(event_log) = event_log else {
1728        return Ok(());
1729    };
1730    if events.is_empty() {
1731        return Ok(());
1732    }
1733
1734    let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1735        .expect("static triggers.lifecycle topic should always be valid");
1736    for event in events {
1737        event_log
1738            .append(&topic, event)
1739            .await
1740            .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1741    }
1742    Ok(())
1743}
1744
1745fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1746    configured_default_chain(default_secret_namespace())
1747        .ok()
1748        .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1749}
1750
/// Resolves the secret namespace used when none is configured explicitly.
///
/// `HARN_SECRET_NAMESPACE` wins when it is set to something other than
/// whitespace; the value is returned exactly as read (it is not trimmed).
/// Otherwise the namespace is `harn/<leaf>`, where `<leaf>` is the final
/// component of the current working directory, or `workspace` when that
/// component is unavailable, not valid UTF-8, or empty.
fn default_secret_namespace() -> String {
    match std::env::var("HARN_SECRET_NAMESPACE") {
        Ok(namespace) if !namespace.trim().is_empty() => return namespace,
        _ => {}
    }

    // An unreadable working directory degrades to an empty path, which has no
    // file name and therefore falls through to the "workspace" default.
    let cwd = std::env::current_dir().unwrap_or_default();
    let leaf = match cwd.file_name().and_then(|name| name.to_str()) {
        Some(name) if !name.is_empty() => name,
        _ => "workspace",
    };
    format!("harn/{leaf}")
}
1766
1767fn now_ms() -> i64 {
1768    clock::now_ms()
1769}
1770
1771#[cfg(test)]
1772mod tests {
1773    use super::*;
1774    use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
1775    use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
1776    use crate::triggers::test_util::timing::FILE_WATCH_FALLBACK_POLL;
1777    use std::rc::Rc;
1778    use time::OffsetDateTime;
1779
1780    fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
1781        TriggerBindingSpec {
1782            id: id.to_string(),
1783            source: TriggerBindingSource::Manifest,
1784            kind: "webhook".to_string(),
1785            provider: ProviderId::from("github"),
1786            autonomy_tier: crate::AutonomyTier::ActAuto,
1787            handler: TriggerHandlerSpec::Worker {
1788                queue: format!("{id}-queue"),
1789            },
1790            dispatch_priority: crate::WorkerQueuePriority::Normal,
1791            when: None,
1792            when_budget: None,
1793            retry: TriggerRetryConfig::default(),
1794            match_events: vec!["issues.opened".to_string()],
1795            dedupe_key: Some("event.dedupe_key".to_string()),
1796            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1797            filter: Some("event.kind".to_string()),
1798            daily_cost_usd: Some(5.0),
1799            hourly_cost_usd: None,
1800            max_autonomous_decisions_per_hour: None,
1801            max_autonomous_decisions_per_day: None,
1802            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1803            max_concurrent: Some(10),
1804            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1805            aggregation: None,
1806            manifest_path: None,
1807            package_name: Some("workspace".to_string()),
1808            definition_fingerprint: fingerprint.to_string(),
1809        }
1810    }
1811
1812    fn dynamic_spec(id: &str) -> TriggerBindingSpec {
1813        TriggerBindingSpec {
1814            id: id.to_string(),
1815            source: TriggerBindingSource::Dynamic,
1816            kind: "webhook".to_string(),
1817            provider: ProviderId::from("github"),
1818            autonomy_tier: crate::AutonomyTier::ActAuto,
1819            handler: TriggerHandlerSpec::Worker {
1820                queue: format!("{id}-queue"),
1821            },
1822            dispatch_priority: crate::WorkerQueuePriority::Normal,
1823            when: None,
1824            when_budget: None,
1825            retry: TriggerRetryConfig::default(),
1826            match_events: vec!["issues.opened".to_string()],
1827            dedupe_key: None,
1828            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1829            filter: None,
1830            daily_cost_usd: None,
1831            hourly_cost_usd: None,
1832            max_autonomous_decisions_per_hour: None,
1833            max_autonomous_decisions_per_day: None,
1834            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1835            max_concurrent: None,
1836            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1837            aggregation: None,
1838            manifest_path: None,
1839            package_name: None,
1840            definition_fingerprint: format!("dynamic:{id}"),
1841        }
1842    }
1843
1844    #[tokio::test(flavor = "current_thread")]
1845    async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
1846        clear_trigger_registry();
1847
1848        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1849            .await
1850            .expect("manifest trigger installs");
1851
1852        let snapshots = snapshot_trigger_bindings();
1853        assert_eq!(snapshots.len(), 1);
1854        let binding = &snapshots[0];
1855        assert_eq!(binding.id, "github-new-issue");
1856        assert_eq!(binding.version, 1);
1857        assert_eq!(binding.state, TriggerState::Active);
1858        assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());
1859
1860        clear_trigger_registry();
1861    }
1862
1863    #[tokio::test(flavor = "current_thread")]
1864    async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
1865        clear_trigger_registry();
1866
1867        let first = dynamic_register(dynamic_spec("dynamic-a"))
1868            .await
1869            .expect("first dynamic trigger");
1870        let second = dynamic_register(dynamic_spec("dynamic-b"))
1871            .await
1872            .expect("second dynamic trigger");
1873        assert_ne!(first, second);
1874
1875        let error = dynamic_register(dynamic_spec("dynamic-a"))
1876            .await
1877            .expect_err("duplicate id should fail");
1878        assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));
1879
1880        clear_trigger_registry();
1881    }
1882
1883    #[tokio::test(flavor = "current_thread")]
1884    async fn drain_waits_for_in_flight_events_before_terminating() {
1885        clear_trigger_registry();
1886
1887        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1888            .await
1889            .expect("manifest trigger installs");
1890        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1891
1892        drain("github-new-issue").await.expect("drain succeeds");
1893        let binding = snapshot_trigger_bindings()
1894            .into_iter()
1895            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1896            .expect("binding snapshot");
1897        assert_eq!(binding.state, TriggerState::Draining);
1898        assert_eq!(binding.metrics.in_flight, 1);
1899
1900        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1901            .await
1902            .expect("finish in-flight event");
1903        let binding = snapshot_trigger_bindings()
1904            .into_iter()
1905            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1906            .expect("binding snapshot");
1907        assert_eq!(binding.state, TriggerState::Terminated);
1908        assert_eq!(binding.metrics.in_flight, 0);
1909
1910        clear_trigger_registry();
1911    }
1912
1913    #[tokio::test(flavor = "current_thread")]
1914    async fn hot_reload_registers_new_version_while_old_binding_drains() {
1915        clear_trigger_registry();
1916
1917        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1918            .await
1919            .expect("initial manifest trigger installs");
1920        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1921
1922        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1923            .await
1924            .expect("updated manifest trigger installs");
1925
1926        let snapshots = snapshot_trigger_bindings();
1927        assert_eq!(snapshots.len(), 2);
1928        let old = snapshots
1929            .iter()
1930            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1931            .expect("old binding");
1932        let new = snapshots
1933            .iter()
1934            .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
1935            .expect("new binding");
1936        assert_eq!(old.state, TriggerState::Draining);
1937        assert_eq!(new.state, TriggerState::Active);
1938
1939        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1940            .await
1941            .expect("finish old in-flight event");
1942        let old = snapshot_trigger_bindings()
1943            .into_iter()
1944            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1945            .expect("old binding");
1946        assert_eq!(old.state, TriggerState::Terminated);
1947
1948        clear_trigger_registry();
1949    }
1950
1951    #[tokio::test(flavor = "current_thread")]
1952    async fn gc_drops_terminated_versions_beyond_retention_limit() {
1953        clear_trigger_registry();
1954
1955        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1956            .await
1957            .expect("install v1");
1958        begin_in_flight("github-new-issue", 1).expect("pin v1");
1959
1960        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1961            .await
1962            .expect("install v2");
1963        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1964            .await
1965            .expect("finish v1");
1966
1967        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1968            .await
1969            .expect("install v3");
1970
1971        let snapshots = snapshot_trigger_bindings();
1972        let versions: Vec<u32> = snapshots
1973            .into_iter()
1974            .filter(|binding| binding.id == "github-new-issue")
1975            .map(|binding| binding.version)
1976            .collect();
1977        assert_eq!(versions, vec![2, 3]);
1978
1979        clear_trigger_registry();
1980    }
1981
1982    #[tokio::test(flavor = "current_thread")]
1983    async fn lifecycle_transitions_append_to_event_log() {
1984        clear_trigger_registry();
1985        reset_active_event_log();
1986        let tempdir = tempfile::tempdir().expect("tempdir");
1987        let log = install_default_for_base_dir(tempdir.path()).expect("install event log");
1988
1989        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1990            .await
1991            .expect("manifest trigger installs");
1992        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1993        drain("github-new-issue").await.expect("drain succeeds");
1994        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1995            .await
1996            .expect("finish event");
1997
1998        let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
1999        let events = log
2000            .read_range(&topic, None, 32)
2001            .await
2002            .expect("read lifecycle events");
2003        let states: Vec<String> = events
2004            .into_iter()
2005            .filter_map(|(_, event)| {
2006                event
2007                    .payload
2008                    .get("to_state")
2009                    .and_then(|value| value.as_str())
2010                    .map(|value| value.to_string())
2011            })
2012            .collect();
2013        assert_eq!(
2014            states,
2015            vec![
2016                "registering".to_string(),
2017                "active".to_string(),
2018                "draining".to_string(),
2019                "terminated".to_string(),
2020            ]
2021        );
2022
2023        reset_active_event_log();
2024        clear_trigger_registry();
2025    }
2026
2027    #[tokio::test(flavor = "current_thread")]
2028    async fn version_history_reuses_historical_version_after_restart() {
2029        clear_trigger_registry();
2030        reset_active_event_log();
2031        let tempdir = tempfile::tempdir().expect("tempdir");
2032        install_default_for_base_dir(tempdir.path()).expect("install event log");
2033
2034        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2035            .await
2036            .expect("initial manifest trigger installs");
2037        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2038            .await
2039            .expect("updated manifest trigger installs");
2040
2041        clear_trigger_registry();
2042        reset_active_event_log();
2043        install_default_for_base_dir(tempdir.path()).expect("reopen event log");
2044
2045        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2046            .await
2047            .expect("manifest reload reuses historical version");
2048
2049        let binding = snapshot_trigger_bindings()
2050            .into_iter()
2051            .find(|binding| binding.id == "github-new-issue")
2052            .expect("binding snapshot");
2053        assert_eq!(binding.version, 2);
2054
2055        reset_active_event_log();
2056        clear_trigger_registry();
2057    }
2058
2059    #[tokio::test(flavor = "current_thread")]
2060    async fn binding_version_as_of_reports_historical_active_version() {
2061        clear_trigger_registry();
2062        reset_active_event_log();
2063        let tempdir = tempfile::tempdir().expect("tempdir");
2064        install_default_for_base_dir(tempdir.path()).expect("install event log");
2065
2066        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2067            .await
2068            .expect("initial manifest trigger installs");
2069        let before_reload = OffsetDateTime::now_utc();
2070        std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
2071
2072        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2073            .await
2074            .expect("updated manifest trigger installs");
2075        let after_reload = OffsetDateTime::now_utc();
2076
2077        assert_eq!(
2078            binding_version_as_of("github-new-issue", before_reload)
2079                .expect("version before reload"),
2080            1
2081        );
2082        assert_eq!(
2083            binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
2084            2
2085        );
2086
2087        reset_active_event_log();
2088        clear_trigger_registry();
2089    }
2090
2091    #[tokio::test(flavor = "current_thread")]
2092    async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
2093        clear_trigger_registry();
2094        reset_active_event_log();
2095        let sink = Rc::new(CollectorSink::new());
2096        clear_event_sinks();
2097        add_event_sink(sink.clone());
2098        let tempdir = tempfile::tempdir().expect("tempdir");
2099        install_default_for_base_dir(tempdir.path()).expect("install event log");
2100
2101        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2102            .await
2103            .expect("install v1");
2104        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2105            .await
2106            .expect("install v2");
2107        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
2108            .await
2109            .expect("install v3");
2110        let received_at = OffsetDateTime::now_utc();
2111        std::thread::sleep(FILE_WATCH_FALLBACK_POLL);
2112        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
2113            .await
2114            .expect("install v4");
2115
2116        let binding = resolve_live_or_as_of(
2117            "github-new-issue",
2118            RecordedTriggerBinding {
2119                version: 1,
2120                received_at,
2121            },
2122        )
2123        .expect("resolve fallback binding");
2124        assert_eq!(binding.version, 3);
2125
2126        let warning = sink
2127            .logs
2128            .borrow()
2129            .iter()
2130            .find(|log| log.category == "replay.binding_version_gc_fallback")
2131            .cloned()
2132            .expect("gc fallback warning");
2133        assert_eq!(warning.level, EventLevel::Warn);
2134        assert_eq!(
2135            warning.metadata.get("trigger_id"),
2136            Some(&serde_json::json!("github-new-issue"))
2137        );
2138        assert_eq!(
2139            warning.metadata.get("recorded_version"),
2140            Some(&serde_json::json!(1))
2141        );
2142        assert_eq!(
2143            warning.metadata.get("received_at"),
2144            Some(&serde_json::json!(received_at
2145                .format(&time::format_description::well_known::Rfc3339)
2146                .unwrap_or_else(|_| received_at.to_string())))
2147        );
2148        assert_eq!(
2149            warning.metadata.get("resolved_version"),
2150            Some(&serde_json::json!(3))
2151        );
2152
2153        clear_event_sinks();
2154        crate::events::reset_event_sinks();
2155        reset_active_event_log();
2156        clear_trigger_registry();
2157    }
2158}