Skip to main content

harn_vm/triggers/
registry.rs

1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7
8use time::OffsetDateTime;
9use uuid::Uuid;
10
11use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
12use crate::llm::trigger_predicate::TriggerPredicateBudget;
13use crate::secrets::{configured_default_chain, SecretProvider};
14use crate::triggers::test_util::clock;
15use crate::trust_graph::AutonomyTier;
16use crate::value::VmClosure;
17
18use super::aggregation::TriggerAggregationConfig;
19use super::dispatcher::TriggerRetryConfig;
20use super::flow_control::TriggerFlowControlConfig;
21use super::ProviderId;
22
23#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
24pub struct TriggerId(String);
25
26impl TriggerId {
27    pub fn new(value: impl Into<String>) -> Self {
28        Self(value.into())
29    }
30
31    pub fn as_str(&self) -> &str {
32        &self.0
33    }
34}
35
36impl std::fmt::Display for TriggerId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        self.0.fmt(f)
39    }
40}
41
42#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub enum TriggerState {
45    Registering,
46    Active,
47    Paused,
48    Draining,
49    Terminated,
50}
51
52impl TriggerState {
53    pub fn as_str(self) -> &'static str {
54        match self {
55            Self::Registering => "registering",
56            Self::Active => "active",
57            Self::Paused => "paused",
58            Self::Draining => "draining",
59            Self::Terminated => "terminated",
60        }
61    }
62}
63
64#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
65#[serde(rename_all = "snake_case")]
66pub enum TriggerBindingSource {
67    Manifest,
68    Dynamic,
69}
70
71impl TriggerBindingSource {
72    pub fn as_str(self) -> &'static str {
73        match self {
74            Self::Manifest => "manifest",
75            Self::Dynamic => "dynamic",
76        }
77    }
78}
79
80#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
81#[serde(rename_all = "snake_case")]
82pub enum TriggerBudgetExhaustionStrategy {
83    #[default]
84    False,
85    RetryLater,
86    Fail,
87    Warn,
88}
89
90impl TriggerBudgetExhaustionStrategy {
91    pub fn as_str(self) -> &'static str {
92        match self {
93            Self::False => "false",
94            Self::RetryLater => "retry_later",
95            Self::Fail => "fail",
96            Self::Warn => "warn",
97        }
98    }
99}
100
/// Orchestrator-wide spend caps in US dollars, installed with
/// `install_orchestrator_budget`. A cap left as `None` is never enforced.
101#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
102pub struct OrchestratorBudgetConfig {
    /// Cap on spend accrued in the current day window.
103    pub daily_cost_usd: Option<f64>,
    /// Cap on spend accrued in the current hour window.
104    pub hourly_cost_usd: Option<f64>,
105}
106
/// Point-in-time copy of the orchestrator budget, returned by
/// `snapshot_orchestrator_budget` after the day/hour windows are rolled over.
107#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
108pub struct OrchestratorBudgetSnapshot {
    /// Installed daily cap (copied from the config).
109    pub daily_cost_usd: Option<f64>,
    /// Installed hourly cap (copied from the config).
110    pub hourly_cost_usd: Option<f64>,
    /// Spend accrued in the current day window, in "usd micros" (converted to
    /// dollars with `micros_to_usd`; presumably 1e-6 USD per unit — confirm).
111    pub cost_today_usd_micros: u64,
    /// Spend accrued in the current hour window, in the same unit.
112    pub cost_hour_usd_micros: u64,
    /// Key of the current day window (initially produced by `utc_day_key`).
113    pub day_utc: i32,
    /// Key of the current hour window (initially produced by `utc_hour_key`).
114    pub hour_utc: i64,
115}
116
117#[derive(Debug)]
118struct OrchestratorBudgetState {
119    config: OrchestratorBudgetConfig,
120    day_utc: i32,
121    hour_utc: i64,
122    cost_today_usd_micros: u64,
123    cost_hour_usd_micros: u64,
124}
125
126impl Default for OrchestratorBudgetState {
127    fn default() -> Self {
128        Self {
129            config: OrchestratorBudgetConfig::default(),
130            day_utc: utc_day_key(),
131            hour_utc: utc_hour_key(),
132            cost_today_usd_micros: 0,
133            cost_hour_usd_micros: 0,
134        }
135    }
136}
137
/// What a trigger does when it matches. [`TriggerHandlerSpec::kind`] maps each
/// variant to a stable snake_case tag.
138#[derive(Clone)]
139pub enum TriggerHandlerSpec {
    /// Closure handler: `closure` is the VM closure to run and `raw` is the
    /// text it was declared with (only `raw` appears in `Debug` output).
140    Local {
141        raw: String,
142        closure: Arc<VmClosure>,
143    },
    /// A2A handler naming a `target`. NOTE(review): `allow_cleartext`
    /// presumably permits a non-TLS transport — confirm in the dispatcher.
144    A2a {
145        target: String,
146        allow_cleartext: bool,
147    },
    /// Worker handler naming the `queue` to route to (presumably a
    /// `worker_queue` queue — confirm in the dispatcher).
148    Worker {
149        queue: String,
150    },
    /// Persona handler carrying a runtime binding (`Debug` shows only
    /// `binding.name`).
151    Persona {
152        binding: crate::PersonaRuntimeBinding,
153    },
    /// Eval-pack handler: a `target`, a boxed pack manifest (`Debug` shows only
    /// its `id`), and optional free-form JSON `ledger_options`.
154    EvalPack {
155        target: String,
156        manifest: Box<crate::orchestration::EvalPackManifest>,
157        ledger_options: Option<serde_json::Value>,
158    },
    /// Auto-resume handler for the worker identified by `worker_id`; the
    /// resume semantics live in the dispatcher (not visible here).
159    AutoResume {
160        worker_id: String,
161    },
162    /// Composes triggers (#1870) with named agent pools (#1883). On match,
163    /// the dispatcher resolves `pool` by name, invokes `task_factory(event)`
164    /// to build the per-event closure, and submits it to the pool with the
165    /// pool's queue strategy + backpressure policy applied (PL-04 / #1889).
166    SpawnToPool {
167        pool: String,
168        priority_from: Option<String>,
169        key_from: Option<String>,
170        task_factory: Arc<VmClosure>,
171    },
172    /// Composes triggers (#1870) with the reminder pipeline (#1815) by
173    /// injecting a `SystemReminder` into a target agent session's transcript
174    /// when the trigger matches (CH-05 / #1876). The reminder appears at the
175    /// session's next turn boundary — same as any other reminder. No spawn,
176    /// no resume, no signal. `body` is a prompt-template string rendered
177    /// against `{event}` / `{batch.events}` per match. `target` resolution
178    /// happens at dispatch time via [`TargetExpr`]; missing-target dispatches
179    /// are recorded as audit events rather than crashing the trigger.
180    ReminderInject {
181        target: TargetExpr,
182        body: String,
183        tags: Vec<String>,
184        ttl_turns: Option<i64>,
185        dedupe_key: Option<String>,
186        propagate: crate::llm::helpers::ReminderPropagate,
187        role_hint: crate::llm::helpers::ReminderRoleHint,
188        preserve_on_compact: bool,
189    },
190    /// CH-10 (#1910): emergency "panic" broadcast. When the trigger fires,
191    /// every running worker matched by `target_agents` is suspended
192    /// synchronously via the cooperative suspend pipeline used by
193    /// `suspend_agent` (#1837), bypassing the normal turn-boundary
194    /// delivery contract. Already-suspended or terminal workers are skipped
195    /// (no double-suspend, no error). `reason` is propagated to the
196    /// suspension envelope, the `WorkerSuspended` event, and the
197    /// `triggers.interrupt_and_suspend.audit` audit entry per suspension.
198    /// A no-target dispatch records a single roll-up audit entry and
199    /// returns a `broadcast` result with `suspended_count: 0` — graceful
200    /// no-op rather than dispatch failure, mirroring the `ReminderInject`
201    /// missing-target contract.
202    InterruptAndSuspend {
203        target_agents: AgentScope,
204        reason: String,
205    },
206}
207
208/// Resolution mode for the target worker set of a
209/// [`TriggerHandlerSpec::InterruptAndSuspend`] dispatch. `All` enumerates
210/// every entry in the local worker registry (the org-scoped "panic"
211/// broadcast); `Concrete` carries an explicit worker-id list from the
212/// registration; `Closure` defers resolution to a user closure that runs at
213/// dispatch time with the event payload bound to its first argument and
214/// must return a list of worker-id strings.
215#[derive(Clone)]
216pub enum AgentScope {
217    All,
218    Concrete(Vec<String>),
219    Closure(Arc<VmClosure>),
220}
221
222impl AgentScope {
223    pub fn kind(&self) -> &'static str {
224        match self {
225            Self::All => "all",
226            Self::Concrete(_) => "concrete",
227            Self::Closure(_) => "closure",
228        }
229    }
230}
231
232impl std::fmt::Debug for AgentScope {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        match self {
235            Self::All => f.write_str("All"),
236            Self::Concrete(ids) => f.debug_tuple("Concrete").field(ids).finish(),
237            Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
238        }
239    }
240}
241
242/// Resolution mode for the target session of a [`TriggerHandlerSpec::ReminderInject`]
243/// dispatch. `Current` walks the current-session thread-local; `Parent` resolves
244/// the active session's parent in the agent-session lineage; `Concrete` carries
245/// a literal session id from the registration; `Closure` defers resolution to
246/// a user closure that runs at dispatch time with the event payload bound to
247/// its first argument and must return the session id as a string.
248#[derive(Clone)]
249pub enum TargetExpr {
250    Current,
251    Parent,
252    Concrete(String),
253    Closure(Arc<VmClosure>),
254}
255
256impl TargetExpr {
257    pub fn kind(&self) -> &'static str {
258        match self {
259            Self::Current => "current",
260            Self::Parent => "parent",
261            Self::Concrete(_) => "concrete",
262            Self::Closure(_) => "closure",
263        }
264    }
265}
266
267impl std::fmt::Debug for TargetExpr {
268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        match self {
270            Self::Current => f.write_str("Current"),
271            Self::Parent => f.write_str("Parent"),
272            Self::Concrete(id) => f.debug_tuple("Concrete").field(id).finish(),
273            Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
274        }
275    }
276}
277
278impl std::fmt::Debug for TriggerHandlerSpec {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        match self {
281            Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
282            Self::A2a {
283                target,
284                allow_cleartext,
285            } => f
286                .debug_struct("A2a")
287                .field("target", target)
288                .field("allow_cleartext", allow_cleartext)
289                .finish(),
290            Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
291            Self::Persona { binding } => f
292                .debug_struct("Persona")
293                .field("name", &binding.name)
294                .finish(),
295            Self::EvalPack {
296                target,
297                manifest,
298                ledger_options,
299            } => f
300                .debug_struct("EvalPack")
301                .field("target", target)
302                .field("pack_id", &manifest.id)
303                .field("ledger_options", ledger_options)
304                .finish(),
305            Self::AutoResume { worker_id } => f
306                .debug_struct("AutoResume")
307                .field("worker_id", worker_id)
308                .finish(),
309            Self::SpawnToPool {
310                pool,
311                priority_from,
312                key_from,
313                ..
314            } => f
315                .debug_struct("SpawnToPool")
316                .field("pool", pool)
317                .field("priority_from", priority_from)
318                .field("key_from", key_from)
319                .finish(),
320            Self::ReminderInject {
321                target,
322                body,
323                tags,
324                ttl_turns,
325                dedupe_key,
326                propagate,
327                role_hint,
328                preserve_on_compact,
329            } => f
330                .debug_struct("ReminderInject")
331                .field("target", target)
332                .field("body", body)
333                .field("tags", tags)
334                .field("ttl_turns", ttl_turns)
335                .field("dedupe_key", dedupe_key)
336                .field("propagate", propagate)
337                .field("role_hint", role_hint)
338                .field("preserve_on_compact", preserve_on_compact)
339                .finish(),
340            Self::InterruptAndSuspend {
341                target_agents,
342                reason,
343            } => f
344                .debug_struct("InterruptAndSuspend")
345                .field("target_agents", target_agents)
346                .field("reason", reason)
347                .finish(),
348        }
349    }
350}
351
352impl TriggerHandlerSpec {
353    pub fn kind(&self) -> &'static str {
354        match self {
355            Self::Local { .. } => "local",
356            Self::A2a { .. } => "a2a",
357            Self::Worker { .. } => "worker",
358            Self::Persona { .. } => "persona",
359            Self::EvalPack { .. } => "eval_pack",
360            Self::AutoResume { .. } => "auto_resume",
361            Self::SpawnToPool { .. } => "spawn_to_pool",
362            Self::ReminderInject { .. } => "reminder_inject",
363            Self::InterruptAndSuspend { .. } => "interrupt_and_suspend",
364        }
365    }
366}
367
368#[derive(Clone)]
369pub struct TriggerPredicateSpec {
370    pub raw: String,
371    pub closure: Arc<VmClosure>,
372}
373
374impl std::fmt::Debug for TriggerPredicateSpec {
375    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376        f.debug_struct("TriggerPredicateSpec")
377            .field("raw", &self.raw)
378            .finish()
379    }
380}
381
/// Declarative input for registering a trigger binding.
///
/// `TriggerBinding::new` moves every field into the runtime binding and adds
/// a version, lifecycle state, metrics and predicate bookkeeping.
382#[derive(Clone, Debug)]
383pub struct TriggerBindingSpec {
    /// Trigger id; wrapped in a `TriggerId` on registration.
384    pub id: String,
385    pub source: TriggerBindingSource,
386    pub kind: String,
387    pub provider: ProviderId,
388    pub autonomy_tier: AutonomyTier,
389    pub handler: TriggerHandlerSpec,
390    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
    /// Optional `when` predicate and its evaluation budget. When no cost
    /// samples exist yet, `when_budget.max_cost_usd` is used as the expected
    /// predicate cost.
391    pub when: Option<TriggerPredicateSpec>,
392    pub when_budget: Option<TriggerPredicateBudget>,
393    pub retry: TriggerRetryConfig,
394    pub match_events: Vec<String>,
395    pub dedupe_key: Option<String>,
396    pub dedupe_retention_days: u32,
397    pub filter: Option<String>,
    /// Per-binding USD caps checked by `binding_budget_would_exceed`; `None`
    /// disables the corresponding check.
398    pub daily_cost_usd: Option<f64>,
399    pub hourly_cost_usd: Option<f64>,
    /// Caps on autonomous decisions checked by
    /// `binding_autonomy_budget_would_exceed`; `None` disables the check.
400    pub max_autonomous_decisions_per_hour: Option<u64>,
401    pub max_autonomous_decisions_per_day: Option<u64>,
    /// Policy once a budget is exhausted (`False` by default).
402    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
403    pub max_concurrent: Option<u32>,
404    pub flow_control: TriggerFlowControlConfig,
405    /// CH-04 (#1875): optional aggregation buffer. When set, the
406    /// dispatcher accumulates matching events into a per-(binding,
407    /// partition_key) buffer and dispatches the handler with a batched
408    /// event when `count` is reached or the `window` elapses.
409    pub aggregation: Option<TriggerAggregationConfig>,
410    pub manifest_path: Option<PathBuf>,
411    pub package_name: Option<String>,
    // NOTE(review): presumably a digest of the binding definition, used to tell
    // whether a re-registration changed anything — confirm.
412    pub definition_fingerprint: String,
413}
414
/// Live counters for a single trigger binding.
///
/// Every counter starts at zero and `last_received_ms` starts as `None`. Both
/// come from `#[derive(Default)]`: `AtomicU64::default()` is `0` and
/// `Mutex<Option<_>>::default()` wraps `None`. This replaces a hand-written
/// `Default` that had to be edited for every new field.
///
/// The `*_today` / `*_hour` counters are windowed: `reset_binding_budget_windows`
/// zeroes them when the day or hour key changes. The `*_total` counters are not
/// reset there.
#[derive(Debug, Default)]
pub struct TriggerMetrics {
    // Event lifecycle counters (semantics maintained by the dispatcher).
    pub received: AtomicU64,
    pub dispatched: AtomicU64,
    pub failed: AtomicU64,
    pub dlq: AtomicU64,
    /// Time of the most recent receipt (presumably epoch milliseconds — confirm).
    pub last_received_ms: Mutex<Option<i64>>,
    /// Accumulated cost in "usd micros", charged by `note_binding_budget_cost`.
    pub cost_total_usd_micros: AtomicU64,
    pub cost_today_usd_micros: AtomicU64,
    pub cost_hour_usd_micros: AtomicU64,
    /// Autonomous decisions, counted by `note_autonomous_decision`.
    pub autonomous_decisions_total: AtomicU64,
    pub autonomous_decisions_today: AtomicU64,
    pub autonomous_decisions_hour: AtomicU64,
}
447
/// Plain-value copy of a binding's `TriggerMetrics`, plus its in-flight count,
/// produced by `TriggerBinding::metrics_snapshot`. Each counter is read with an
/// individual relaxed load, so under concurrent updates the values need not be
/// mutually consistent.
448#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
449pub struct TriggerMetricsSnapshot {
450    pub received: u64,
451    pub dispatched: u64,
452    pub failed: u64,
453    pub dlq: u64,
    /// Copied from `TriggerBinding::in_flight`, not from `TriggerMetrics`.
454    pub in_flight: u64,
455    pub last_received_ms: Option<i64>,
456    pub cost_total_usd_micros: u64,
457    pub cost_today_usd_micros: u64,
458    pub cost_hour_usd_micros: u64,
459    pub autonomous_decisions_total: u64,
460    pub autonomous_decisions_today: u64,
461    pub autonomous_decisions_hour: u64,
462}
463
/// A registered, versioned trigger binding.
///
/// Built by `TriggerBinding::new` from a `TriggerBindingSpec`, whose fields are
/// moved over unchanged. Runtime state is added on top: lifecycle state
/// (initially `TriggerState::Registering`), metrics, an in-flight counter, a
/// cancel flag and predicate/budget bookkeeping.
464pub struct TriggerBinding {
465    pub id: TriggerId,
    /// Registration version; `binding_key()` renders `"{id}@v{version}"`.
466    pub version: u32,
467    pub source: TriggerBindingSource,
468    pub kind: String,
469    pub provider: ProviderId,
470    pub autonomy_tier: AutonomyTier,
471    pub handler: TriggerHandlerSpec,
472    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
473    pub when: Option<TriggerPredicateSpec>,
474    pub when_budget: Option<TriggerPredicateBudget>,
475    pub retry: TriggerRetryConfig,
476    pub match_events: Vec<String>,
477    pub dedupe_key: Option<String>,
478    pub dedupe_retention_days: u32,
479    pub filter: Option<String>,
480    pub daily_cost_usd: Option<f64>,
481    pub hourly_cost_usd: Option<f64>,
482    pub max_autonomous_decisions_per_hour: Option<u64>,
483    pub max_autonomous_decisions_per_day: Option<u64>,
484    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
485    pub max_concurrent: Option<u32>,
486    pub flow_control: TriggerFlowControlConfig,
487    /// CH-04 (#1875): see `TriggerBindingSpec::aggregation`. Cloned at
488    /// registration so the dispatcher does not need to lock the registry
489    /// for every emit.
490    pub aggregation: Option<TriggerAggregationConfig>,
491    pub manifest_path: Option<PathBuf>,
492    pub package_name: Option<String>,
493    pub definition_fingerprint: String,
    /// Current lifecycle state; read with `state_snapshot()`.
494    pub state: Mutex<TriggerState>,
495    pub metrics: TriggerMetrics,
    /// Number of in-flight dispatches, reported in `metrics_snapshot()`.
496    pub in_flight: AtomicU64,
    /// Shared cancellation flag, initially `false`. NOTE(review): presumably
    /// set to ask in-flight work to stop — confirm at the use site.
497    pub cancel_token: Arc<AtomicBool>,
    /// Budget-window keys, breaker state and recent predicate cost samples.
498    pub predicate_state: Mutex<TriggerPredicateState>,
499}
500
/// Mutable per-binding bookkeeping stored behind `TriggerBinding::predicate_state`.
501#[derive(Clone, Debug, Default)]
502pub struct TriggerPredicateState {
    /// Day/hour keys of the current budget windows. When these differ from the
    /// current keys, `reset_binding_budget_windows` zeroes the windowed counters.
503    pub budget_day_utc: Option<i32>,
504    pub budget_hour_utc: Option<i64>,
    // NOTE(review): presumably circuit-breaker state for predicate evaluation
    // (failure streak and a reopen deadline in ms); not used in this section.
505    pub consecutive_failures: u32,
506    pub breaker_open_until_ms: Option<i64>,
    /// Recent predicate cost samples, trimmed to `PREDICATE_COST_WINDOW`
    /// entries. `expected_predicate_cost_usd_micros` derives its estimate from
    /// them via `average_cost_sample_micros`.
507    pub recent_cost_usd_micros: VecDeque<u64>,
508}
509
510impl std::fmt::Debug for TriggerBinding {
511    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
512        f.debug_struct("TriggerBinding")
513            .field("id", &self.id)
514            .field("version", &self.version)
515            .field("source", &self.source)
516            .field("kind", &self.kind)
517            .field("provider", &self.provider)
518            .field("handler_kind", &self.handler.kind())
519            .field("state", &self.state_snapshot())
520            .finish()
521    }
522}
523
524impl TriggerBinding {
525    pub fn snapshot(&self) -> TriggerBindingSnapshot {
526        TriggerBindingSnapshot {
527            id: self.id.as_str().to_string(),
528            version: self.version,
529            source: self.source,
530            kind: self.kind.clone(),
531            provider: self.provider.as_str().to_string(),
532            autonomy_tier: self.autonomy_tier,
533            handler_kind: self.handler.kind().to_string(),
534            state: self.state_snapshot(),
535            metrics: self.metrics_snapshot(),
536            daily_cost_usd: self.daily_cost_usd,
537            hourly_cost_usd: self.hourly_cost_usd,
538            max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
539            max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
540            on_budget_exhausted: self.on_budget_exhausted,
541        }
542    }
543
544    fn new(spec: TriggerBindingSpec, version: u32) -> Self {
545        Self {
546            id: TriggerId::new(spec.id),
547            version,
548            source: spec.source,
549            kind: spec.kind,
550            provider: spec.provider,
551            autonomy_tier: spec.autonomy_tier,
552            handler: spec.handler,
553            dispatch_priority: spec.dispatch_priority,
554            when: spec.when,
555            when_budget: spec.when_budget,
556            retry: spec.retry,
557            match_events: spec.match_events,
558            dedupe_key: spec.dedupe_key,
559            dedupe_retention_days: spec.dedupe_retention_days,
560            filter: spec.filter,
561            daily_cost_usd: spec.daily_cost_usd,
562            hourly_cost_usd: spec.hourly_cost_usd,
563            max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
564            max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
565            on_budget_exhausted: spec.on_budget_exhausted,
566            max_concurrent: spec.max_concurrent,
567            flow_control: spec.flow_control,
568            aggregation: spec.aggregation,
569            manifest_path: spec.manifest_path,
570            package_name: spec.package_name,
571            definition_fingerprint: spec.definition_fingerprint,
572            state: Mutex::new(TriggerState::Registering),
573            metrics: TriggerMetrics::default(),
574            in_flight: AtomicU64::new(0),
575            cancel_token: Arc::new(AtomicBool::new(false)),
576            predicate_state: Mutex::new(TriggerPredicateState::default()),
577        }
578    }
579
580    pub fn binding_key(&self) -> String {
581        format!("{}@v{}", self.id.as_str(), self.version)
582    }
583
584    pub fn state_snapshot(&self) -> TriggerState {
585        *self.state.lock().expect("trigger state poisoned")
586    }
587
588    pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
589        TriggerMetricsSnapshot {
590            received: self.metrics.received.load(Ordering::Relaxed),
591            dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
592            failed: self.metrics.failed.load(Ordering::Relaxed),
593            dlq: self.metrics.dlq.load(Ordering::Relaxed),
594            in_flight: self.in_flight.load(Ordering::Relaxed),
595            last_received_ms: *self
596                .metrics
597                .last_received_ms
598                .lock()
599                .expect("trigger metrics poisoned"),
600            cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
601            cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
602            cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
603            autonomous_decisions_total: self
604                .metrics
605                .autonomous_decisions_total
606                .load(Ordering::Relaxed),
607            autonomous_decisions_today: self
608                .metrics
609                .autonomous_decisions_today
610                .load(Ordering::Relaxed),
611            autonomous_decisions_hour: self
612                .metrics
613                .autonomous_decisions_hour
614                .load(Ordering::Relaxed),
615        }
616    }
617}
618
/// Serializable summary of a `TriggerBinding`, produced by
/// `TriggerBinding::snapshot`. The id, provider and handler kind are flattened
/// to plain strings.
619#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
620pub struct TriggerBindingSnapshot {
621    pub id: String,
622    pub version: u32,
623    pub source: TriggerBindingSource,
624    pub kind: String,
    /// `ProviderId::as_str()` of the binding's provider.
625    pub provider: String,
626    pub autonomy_tier: AutonomyTier,
    /// Tag from `TriggerHandlerSpec::kind` (e.g. `"local"`, `"spawn_to_pool"`).
627    pub handler_kind: String,
    /// Lifecycle state when the snapshot was taken.
628    pub state: TriggerState,
629    pub metrics: TriggerMetricsSnapshot,
    /// Budget limits copied from the binding; `None` means the limit is not enforced.
630    pub daily_cost_usd: Option<f64>,
631    pub hourly_cost_usd: Option<f64>,
632    pub max_autonomous_decisions_per_hour: Option<u64>,
633    pub max_autonomous_decisions_per_day: Option<u64>,
634    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
635}
636
/// Outcome of a dispatch attempt. NOTE(review): the variants appear to mirror
/// the `dispatched` / `failed` / `dlq` counters in `TriggerMetrics` — confirm.
637#[derive(Clone, Copy, Debug, PartialEq, Eq)]
638pub enum TriggerDispatchOutcome {
639    Dispatched,
640    Failed,
    /// Presumably: the event was routed to the dead-letter queue.
641    Dlq,
642}
643
/// Errors surfaced by trigger-registry operations.
#[derive(Debug)]
pub enum TriggerRegistryError {
    DuplicateId(String),
    InvalidSpec(String),
    UnknownId(String),
    UnknownBindingVersion { id: String, version: u32 },
    EventLog(String),
}

impl std::fmt::Display for TriggerRegistryError {
    /// Id-based variants get a templated message. `InvalidSpec` and `EventLog`
    /// carry a free-form message that is written verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TriggerRegistryError::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
            TriggerRegistryError::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
            TriggerRegistryError::UnknownBindingVersion { id, version } => {
                write!(f, "unknown trigger binding '{id}' version {version}")
            }
            TriggerRegistryError::InvalidSpec(message) => f.write_str(message),
            TriggerRegistryError::EventLog(message) => f.write_str(message),
        }
    }
}

impl std::error::Error for TriggerRegistryError {}
667
/// Per-thread registry of trigger bindings, with optional event-log and
/// secret-provider handles.
668#[derive(Default)]
669pub struct TriggerRegistry {
    /// Bindings grouped by key (presumably the trigger id), each entry holding
    /// that id's registered versions — confirm against the registration code.
670    bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
    /// Secondary index keyed by provider; presumably provider id -> trigger ids.
671    by_provider: BTreeMap<String, BTreeSet<String>>,
672    event_log: Option<Arc<AnyEventLog>>,
673    secret_provider: Option<Arc<dyn SecretProvider>>,
674}
675
// One registry per thread; `clear_trigger_registry` resets it to `Default`.
676thread_local! {
677    static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
678}
679
// Orchestrator-wide budget accounting, also held per thread.
680thread_local! {
681    static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
682        RefCell::new(OrchestratorBudgetState::default());
683}
684
// NOTE(review): presumably the number of terminated versions kept per trigger
// id before older ones are pruned — confirm at the use site.
685const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;
686
// Event-log topic name for trigger lifecycle records (presumably the source of
// the entries parsed as `LifecycleStateTransitionRecord` — confirm).
687const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
// Maximum number of recent predicate cost samples kept per binding.
688const PREDICATE_COST_WINDOW: usize = 100;
689
/// Minimal deserialized view of a lifecycle state-transition entry. Only the
/// fields declared here are read; serde ignores any other fields by default.
690#[derive(Clone, Debug, Deserialize)]
691struct LifecycleStateTransitionRecord {
692    id: String,
693    version: u32,
    // A missing fingerprint deserializes as `None` thanks to `#[serde(default)]`.
694    #[serde(default)]
695    definition_fingerprint: Option<String>,
696    to_state: TriggerState,
697}
698
/// A lifecycle transition paired with its occurrence time (presumably epoch
/// milliseconds — confirm).
699#[derive(Clone, Debug)]
700struct HistoricalLifecycleRecord {
701    occurred_at_ms: i64,
702    transition: LifecycleStateTransitionRecord,
703}
704
/// A binding version plus a timestamp (`received_at`; presumably when the
/// triggering event was received — confirm).
705#[derive(Clone, Copy, Debug, PartialEq, Eq)]
706pub struct RecordedTriggerBinding {
707    pub version: u32,
708    pub received_at: OffsetDateTime,
709}
710
/// Result of scanning historical lifecycle records for a trigger.
/// NOTE(review): presumably `matching_version` is the version whose definition
/// fingerprint matched and `max_version` the highest version seen — confirm.
711#[derive(Clone, Copy, Debug, Default)]
712struct HistoricalVersionLookup {
713    matching_version: Option<u32>,
714    max_version: Option<u32>,
715}
716
717pub fn clear_trigger_registry() {
718    TRIGGER_REGISTRY.with(|slot| {
719        *slot.borrow_mut() = TriggerRegistry::default();
720    });
721    clear_orchestrator_budget();
722    super::aggregation::clear_aggregation_state();
723}
724
725pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
726    ORCHESTRATOR_BUDGET.with(|slot| {
727        let mut state = slot.borrow_mut();
728        rollover_orchestrator_budget(&mut state);
729        state.config = config;
730    });
731}
732
733pub fn clear_orchestrator_budget() {
734    ORCHESTRATOR_BUDGET.with(|slot| {
735        *slot.borrow_mut() = OrchestratorBudgetState::default();
736    });
737}
738
739pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
740    ORCHESTRATOR_BUDGET.with(|slot| {
741        let mut state = slot.borrow_mut();
742        rollover_orchestrator_budget(&mut state);
743        OrchestratorBudgetSnapshot {
744            daily_cost_usd: state.config.daily_cost_usd,
745            hourly_cost_usd: state.config.hourly_cost_usd,
746            cost_today_usd_micros: state.cost_today_usd_micros,
747            cost_hour_usd_micros: state.cost_hour_usd_micros,
748            day_utc: state.day_utc,
749            hour_utc: state.hour_utc,
750        }
751    })
752}
753
754pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
755    if cost_usd_micros == 0 {
756        return;
757    }
758    ORCHESTRATOR_BUDGET.with(|slot| {
759        let mut state = slot.borrow_mut();
760        rollover_orchestrator_budget(&mut state);
761        state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
762        state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
763    });
764}
765
766pub(crate) fn note_binding_budget_cost(binding: &TriggerBinding, cost_usd_micros: u64) {
767    if cost_usd_micros == 0 {
768        return;
769    }
770    reset_binding_budget_windows(binding);
771    binding
772        .metrics
773        .cost_total_usd_micros
774        .fetch_add(cost_usd_micros, Ordering::Relaxed);
775    binding
776        .metrics
777        .cost_today_usd_micros
778        .fetch_add(cost_usd_micros, Ordering::Relaxed);
779    binding
780        .metrics
781        .cost_hour_usd_micros
782        .fetch_add(cost_usd_micros, Ordering::Relaxed);
783}
784
785pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
786    ORCHESTRATOR_BUDGET.with(|slot| {
787        let mut state = slot.borrow_mut();
788        rollover_orchestrator_budget(&mut state);
789        if state.config.hourly_cost_usd.is_some_and(|limit| {
790            micros_to_usd(
791                state
792                    .cost_hour_usd_micros
793                    .saturating_add(expected_cost_usd_micros),
794            ) > limit
795        }) {
796            return Some("orchestrator_hourly_budget_exceeded");
797        }
798        if state.config.daily_cost_usd.is_some_and(|limit| {
799            micros_to_usd(
800                state
801                    .cost_today_usd_micros
802                    .saturating_add(expected_cost_usd_micros),
803            ) > limit
804        }) {
805            return Some("orchestrator_daily_budget_exceeded");
806        }
807        None
808    })
809}
810
811pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
812    let today = utc_day_key();
813    let hour = utc_hour_key();
814    let mut state = binding
815        .predicate_state
816        .lock()
817        .expect("trigger predicate state poisoned");
818    if state.budget_day_utc != Some(today) {
819        state.budget_day_utc = Some(today);
820        binding
821            .metrics
822            .cost_today_usd_micros
823            .store(0, Ordering::Relaxed);
824        binding
825            .metrics
826            .autonomous_decisions_today
827            .store(0, Ordering::Relaxed);
828    }
829    if state.budget_hour_utc != Some(hour) {
830        state.budget_hour_utc = Some(hour);
831        binding
832            .metrics
833            .cost_hour_usd_micros
834            .store(0, Ordering::Relaxed);
835        binding
836            .metrics
837            .autonomous_decisions_hour
838            .store(0, Ordering::Relaxed);
839    }
840}
841
842pub fn binding_budget_would_exceed(
843    binding: &TriggerBinding,
844    expected_cost_usd_micros: u64,
845) -> Option<&'static str> {
846    reset_binding_budget_windows(binding);
847    if binding.hourly_cost_usd.is_some_and(|limit| {
848        micros_to_usd(
849            binding
850                .metrics
851                .cost_hour_usd_micros
852                .load(Ordering::Relaxed)
853                .saturating_add(expected_cost_usd_micros),
854        ) > limit
855    }) {
856        return Some("hourly_budget_exceeded");
857    }
858    if binding.daily_cost_usd.is_some_and(|limit| {
859        micros_to_usd(
860            binding
861                .metrics
862                .cost_today_usd_micros
863                .load(Ordering::Relaxed)
864                .saturating_add(expected_cost_usd_micros),
865        ) > limit
866    }) {
867        return Some("daily_budget_exceeded");
868    }
869    None
870}
871
872pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
873    reset_binding_budget_windows(binding);
874    if binding
875        .max_autonomous_decisions_per_hour
876        .is_some_and(|limit| {
877            binding
878                .metrics
879                .autonomous_decisions_hour
880                .load(Ordering::Relaxed)
881                .saturating_add(1)
882                > limit
883        })
884    {
885        return Some("hourly_autonomy_budget_exceeded");
886    }
887    if binding
888        .max_autonomous_decisions_per_day
889        .is_some_and(|limit| {
890            binding
891                .metrics
892                .autonomous_decisions_today
893                .load(Ordering::Relaxed)
894                .saturating_add(1)
895                > limit
896        })
897    {
898        return Some("daily_autonomy_budget_exceeded");
899    }
900    None
901}
902
903pub fn note_autonomous_decision(binding: &TriggerBinding) {
904    reset_binding_budget_windows(binding);
905    binding
906        .metrics
907        .autonomous_decisions_total
908        .fetch_add(1, Ordering::Relaxed);
909    binding
910        .metrics
911        .autonomous_decisions_today
912        .fetch_add(1, Ordering::Relaxed);
913    binding
914        .metrics
915        .autonomous_decisions_hour
916        .fetch_add(1, Ordering::Relaxed);
917}
918
919pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
920    let state = binding
921        .predicate_state
922        .lock()
923        .expect("trigger predicate state poisoned");
924    if let Some(average) = average_cost_sample_micros(&state.recent_cost_usd_micros) {
925        return average;
926    }
927    binding
928        .when_budget
929        .as_ref()
930        .and_then(|budget| budget.max_cost_usd)
931        .map(usd_to_micros)
932        .unwrap_or_default()
933}
934
935pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
936    let mut state = binding
937        .predicate_state
938        .lock()
939        .expect("trigger predicate state poisoned");
940    state.recent_cost_usd_micros.push_back(cost_usd_micros);
941    while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
942        state.recent_cost_usd_micros.pop_front();
943    }
944}
945
/// Integer (floor) mean of `samples`, or `None` when there are none.
///
/// The sum is accumulated in `u128`, so it cannot overflow. The mean of
/// `u64` values always fits back into `u64`.
fn average_cost_sample_micros(samples: &VecDeque<u64>) -> Option<u64> {
    match samples.len() {
        0 => None,
        count => {
            let sum: u128 = samples.iter().copied().map(u128::from).sum();
            Some((sum / count as u128) as u64)
        }
    }
}
953
/// Converts a USD amount to whole micro-dollars, rounding any fractional
/// micro-dollar up.
///
/// Non-finite, zero and negative inputs map to 0.
///
/// A decimal amount such as `0.064003` is not exactly representable in
/// binary. Its scaled value can land one ULP *above* the intended integer,
/// and a bare `ceil` would then charge an extra micro-dollar. Results within
/// a few ULPs of an integer are therefore snapped to that integer before
/// rounding up.
pub fn usd_to_micros(value: f64) -> u64 {
    if !value.is_finite() || value <= 0.0 {
        return 0;
    }
    let scaled = value * 1_000_000.0;
    let nearest = scaled.round();
    // Relative tolerance of a handful of ULPs: far larger than the
    // representation/multiplication error, far smaller than one micro-dollar.
    let tolerance = scaled * 16.0 * f64::EPSILON;
    let micros = if (scaled - nearest).abs() <= tolerance {
        nearest
    } else {
        scaled.ceil()
    };
    // Float-to-int `as` casts saturate, so enormous amounts clamp to u64::MAX.
    micros as u64
}
960
/// Converts a micro-dollar amount back to (floating-point) USD.
pub fn micros_to_usd(value: u64) -> f64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    let micros = value as f64;
    micros / MICROS_PER_USD
}
964
965fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
966    let today = utc_day_key();
967    let hour = utc_hour_key();
968    if state.day_utc != today {
969        state.day_utc = today;
970        state.cost_today_usd_micros = 0;
971    }
972    if state.hour_utc != hour {
973        state.hour_utc = hour;
974        state.cost_hour_usd_micros = 0;
975    }
976}
977
978fn utc_day_key() -> i32 {
979    (clock::now_utc().date()
980        - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
981    .whole_days() as i32
982}
983
984fn utc_hour_key() -> i64 {
985    clock::now_utc().unix_timestamp() / 3_600
986}
987
988pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
989    TRIGGER_REGISTRY.with(|slot| {
990        let registry = slot.borrow();
991        let mut snapshots = Vec::new();
992        for bindings in registry.bindings.values() {
993            for binding in bindings {
994                snapshots.push(binding.snapshot());
995            }
996        }
997        snapshots.sort_by(|left, right| {
998            left.id
999                .cmp(&right.id)
1000                .then(left.version.cmp(&right.version))
1001                .then(left.state.as_str().cmp(right.state.as_str()))
1002        });
1003        snapshots
1004    })
1005}
1006
1007#[allow(clippy::arc_with_non_send_sync)]
1008pub fn resolve_trigger_binding_as_of(
1009    id: &str,
1010    as_of: OffsetDateTime,
1011) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1012    let version = binding_version_as_of(id, as_of)?;
1013    resolve_trigger_binding_version(id, version)
1014}
1015
1016#[allow(clippy::arc_with_non_send_sync)]
1017pub fn resolve_live_or_as_of(
1018    id: &str,
1019    recorded: RecordedTriggerBinding,
1020) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1021    match resolve_live_trigger_binding(id, Some(recorded.version)) {
1022        Ok(binding) => Ok(binding),
1023        Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
1024            let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
1025            let mut metadata = BTreeMap::new();
1026            metadata.insert("trigger_id".to_string(), serde_json::json!(id));
1027            metadata.insert(
1028                "recorded_version".to_string(),
1029                serde_json::json!(recorded.version),
1030            );
1031            metadata.insert(
1032                "received_at".to_string(),
1033                serde_json::json!(recorded
1034                    .received_at
1035                    .format(&time::format_description::well_known::Rfc3339)
1036                    .unwrap_or_else(|_| recorded.received_at.to_string())),
1037            );
1038            metadata.insert(
1039                "resolved_version".to_string(),
1040                serde_json::json!(binding.version),
1041            );
1042            crate::events::log_warn_meta(
1043                "replay.binding_version_gc_fallback",
1044                "trigger replay fell back to lifecycle history after binding version GC",
1045                metadata,
1046            );
1047            Ok(binding)
1048        }
1049        Err(error) => Err(error),
1050    }
1051}
1052
1053pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
1054    TRIGGER_REGISTRY.with(|slot| {
1055        let registry = slot.borrow();
1056        registry.binding_version_as_of(id, as_of)
1057    })
1058}
1059
1060#[allow(clippy::arc_with_non_send_sync)]
1061fn resolve_trigger_binding_version(
1062    id: &str,
1063    version: u32,
1064) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1065    TRIGGER_REGISTRY.with(|slot| {
1066        let registry = slot.borrow();
1067        registry
1068            .binding(id, version)
1069            .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
1070                id: id.to_string(),
1071                version,
1072            })
1073    })
1074}
1075
1076#[allow(clippy::arc_with_non_send_sync)]
1077pub fn resolve_live_trigger_binding(
1078    id: &str,
1079    version: Option<u32>,
1080) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1081    TRIGGER_REGISTRY.with(|slot| {
1082        let registry = slot.borrow();
1083        if let Some(version) = version {
1084            let binding = registry.binding(id, version).ok_or_else(|| {
1085                TriggerRegistryError::UnknownBindingVersion {
1086                    id: id.to_string(),
1087                    version,
1088                }
1089            })?;
1090            if binding.state_snapshot() == TriggerState::Terminated {
1091                return Err(TriggerRegistryError::UnknownBindingVersion {
1092                    id: id.to_string(),
1093                    version,
1094                });
1095            }
1096            return Ok(binding);
1097        }
1098
1099        registry
1100            .live_bindings_any_source(id)
1101            .into_iter()
1102            .max_by_key(|binding| binding.version)
1103            .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
1104    })
1105}
1106
1107/// Snapshot active channel-source trigger bindings whose `channel:` selector
1108/// in `match_events[0]` matches the supplied (scope, scope_id, name) tuple.
1109///
1110/// Used by [`crate::channels`] fan-out (CH-02 / #1872). Returns bindings
1111/// sorted by id then version so dispatch ordering is deterministic.
1112pub(crate) fn channel_bindings_matching(
1113    scope: &str,
1114    scope_id: &str,
1115    name: &str,
1116) -> Vec<Arc<TriggerBinding>> {
1117    TRIGGER_REGISTRY.with(|slot| {
1118        let registry = slot.borrow();
1119        let Some(binding_ids) = registry.by_provider.get("channel") else {
1120            return Vec::new();
1121        };
1122        let mut bindings = Vec::new();
1123        for id in binding_ids {
1124            let Some(versions) = registry.bindings.get(id) else {
1125                continue;
1126            };
1127            for binding in versions {
1128                if binding.state_snapshot() != TriggerState::Active {
1129                    continue;
1130                }
1131                let Some(selector_raw) = binding.match_events.first() else {
1132                    continue;
1133                };
1134                let Ok(selector) = crate::channels::ChannelSelector::parse(selector_raw) else {
1135                    continue;
1136                };
1137                // For tenant-default ("Current") selectors we resolve "current"
1138                // to the scope_id of the emit, since the trigger and the
1139                // emitter share the same registry / runtime context.
1140                if !selector.matches(scope, scope_id, name, scope_id) {
1141                    continue;
1142                }
1143                bindings.push(binding.clone());
1144            }
1145        }
1146        bindings.sort_by(|left, right| {
1147            left.id
1148                .as_str()
1149                .cmp(right.id.as_str())
1150                .then(left.version.cmp(&right.version))
1151        });
1152        bindings
1153    })
1154}
1155
1156pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
1157    TRIGGER_REGISTRY.with(|slot| {
1158        let registry = slot.borrow();
1159        let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
1160            return Vec::new();
1161        };
1162
1163        let mut bindings = Vec::new();
1164        for id in binding_ids {
1165            let Some(versions) = registry.bindings.get(id) else {
1166                continue;
1167            };
1168            for binding in versions {
1169                if binding.state_snapshot() != TriggerState::Active {
1170                    continue;
1171                }
1172                if !binding.match_events.is_empty()
1173                    && !binding
1174                        .match_events
1175                        .iter()
1176                        .any(|kind| trigger_event_kind_matches(event, kind))
1177                {
1178                    continue;
1179                }
1180                bindings.push(binding.clone());
1181            }
1182        }
1183
1184        bindings.sort_by(|left, right| {
1185            left.id
1186                .as_str()
1187                .cmp(right.id.as_str())
1188                .then(left.version.cmp(&right.version))
1189        });
1190        bindings
1191    })
1192}
1193
1194fn trigger_event_kind_matches(event: &super::TriggerEvent, expected: &str) -> bool {
1195    if expected == event.kind {
1196        return true;
1197    }
1198    expected
1199        .strip_prefix(event.provider.as_str())
1200        .and_then(|rest| rest.strip_prefix('.'))
1201        .is_some_and(|kind| kind == event.kind)
1202}
1203
/// Reconciles the registry's manifest-sourced bindings with `specs`, the full
/// set of triggers declared by the current manifest.
///
/// Validation runs before any binding is touched. Every spec must have
/// `source == Manifest` and a non-blank id (`InvalidSpec` otherwise), and ids
/// must be unique within `specs` (`DuplicateId` otherwise).
///
/// Then, for each id that currently has a live (non-terminated) manifest
/// binding:
/// - if the id is no longer declared, its live manifest bindings are drained;
/// - if a `Registering`/`Active` binding already carries the spec's
///   definition fingerprint, nothing changes;
/// - otherwise the live manifest bindings are drained and the spec is
///   registered under the version chosen by `next_version_for_spec`.
///
/// Remaining specs (ids without a live manifest binding) are registered
/// fresh. Ids that changed have their terminated versions garbage-collected.
/// The collected lifecycle events are appended to the event log after the
/// registry borrow has been released.
pub async fn install_manifest_triggers(
    specs: Vec<TriggerBindingSpec>,
) -> Result<(), TriggerRegistryError> {
    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
        let registry = &mut *slot.borrow_mut();
        registry.refresh_runtime_context();
        // Ids whose bindings changed in this call; only these are GC'd below.
        let mut touched_ids = BTreeSet::new();

        // Validate the incoming specs and index them by id.
        let mut incoming = BTreeMap::new();
        for spec in specs {
            let spec_id = spec.id.clone();
            if spec.source != TriggerBindingSource::Manifest {
                return Err(TriggerRegistryError::InvalidSpec(format!(
                    "manifest install received non-manifest trigger '{spec_id}'"
                )));
            }
            if spec_id.trim().is_empty() {
                return Err(TriggerRegistryError::InvalidSpec(
                    "manifest trigger id cannot be empty".to_string(),
                ));
            }
            if incoming.insert(spec_id.clone(), spec).is_some() {
                return Err(TriggerRegistryError::DuplicateId(spec_id));
            }
        }

        let mut lifecycle = Vec::new();
        // Ids that currently have at least one live manifest binding. They are
        // collected up front so the registry can be mutated while iterating.
        let existing_ids: Vec<String> = registry
            .bindings
            .iter()
            .filter(|(_, bindings)| {
                bindings.iter().any(|binding| {
                    binding.source == TriggerBindingSource::Manifest
                        && binding.state_snapshot() != TriggerState::Terminated
                })
            })
            .map(|(id, _)| id.clone())
            .collect();

        for id in existing_ids {
            let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
            // The id is no longer declared: drain everything still live.
            let Some(spec) = incoming.remove(&id) else {
                for binding in live_manifest {
                    registry.transition_binding_to_draining(&binding, &mut lifecycle);
                }
                touched_ids.insert(id.clone());
                continue;
            };

            // The same definition is already registering/serving: keep it.
            let has_matching_active = live_manifest.iter().any(|binding| {
                binding.definition_fingerprint == spec.definition_fingerprint
                    && matches!(
                        binding.state_snapshot(),
                        TriggerState::Registering | TriggerState::Active
                    )
            });
            if has_matching_active {
                continue;
            }

            // The definition changed, or only a paused/draining copy exists:
            // drain the live bindings and register the spec afresh.
            for binding in live_manifest {
                registry.transition_binding_to_draining(&binding, &mut lifecycle);
            }

            let version = registry.next_version_for_spec(&spec);
            registry.register_binding(spec, version, &mut lifecycle);
            touched_ids.insert(id.clone());
        }

        // Specs whose ids had no live manifest binding (new ids, or ids with
        // only non-manifest or terminated bindings).
        for spec in incoming.into_values() {
            touched_ids.insert(spec.id.clone());
            let version = registry.next_version_for_spec(&spec);
            registry.register_binding(spec, version, &mut lifecycle);
        }

        for id in touched_ids {
            registry.gc_terminated_versions(&id);
        }

        Ok((registry.event_log.clone(), lifecycle))
    })?;

    // Persist the lifecycle events only after the RefCell borrow has ended.
    append_lifecycle_events(event_log, events).await
}
1288
1289pub async fn dynamic_register(
1290    mut spec: TriggerBindingSpec,
1291) -> Result<TriggerId, TriggerRegistryError> {
1292    if spec.id.trim().is_empty() {
1293        spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1294    }
1295    spec.source = TriggerBindingSource::Dynamic;
1296    let id = spec.id.clone();
1297    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1298        let registry = &mut *slot.borrow_mut();
1299        registry.refresh_runtime_context();
1300
1301        if registry.bindings.contains_key(id.as_str()) {
1302            return Err(TriggerRegistryError::DuplicateId(id.clone()));
1303        }
1304
1305        let mut lifecycle = Vec::new();
1306        let version = registry.next_version_for_spec(&spec);
1307        registry.register_binding(spec, version, &mut lifecycle);
1308        Ok((registry.event_log.clone(), lifecycle))
1309    })?;
1310
1311    append_lifecycle_events(event_log, events).await?;
1312    Ok(TriggerId::new(id))
1313}
1314
1315pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1316    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1317        let registry = &mut *slot.borrow_mut();
1318        let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1319        if live_dynamic.is_empty() {
1320            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1321        }
1322
1323        let mut lifecycle = Vec::new();
1324        for binding in live_dynamic {
1325            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1326        }
1327        Ok((registry.event_log.clone(), lifecycle))
1328    })?;
1329
1330    append_lifecycle_events(event_log, events).await
1331}
1332
1333pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1334    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1335        let registry = &mut *slot.borrow_mut();
1336        let live = registry.live_bindings_any_source(id);
1337        if live.is_empty() {
1338            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1339        }
1340
1341        let mut lifecycle = Vec::new();
1342        for binding in live {
1343            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1344        }
1345        Ok((registry.event_log.clone(), lifecycle))
1346    })?;
1347
1348    append_lifecycle_events(event_log, events).await
1349}
1350
1351pub async fn pause(id: &str) -> Result<(), TriggerRegistryError> {
1352    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1353        let registry = &mut *slot.borrow_mut();
1354        let live = registry.live_bindings_any_source(id);
1355        if live.is_empty() {
1356            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1357        }
1358
1359        let mut lifecycle = Vec::new();
1360        for binding in live {
1361            match binding.state_snapshot() {
1362                TriggerState::Registering | TriggerState::Active => {
1363                    registry.transition_binding_state(
1364                        &binding,
1365                        TriggerState::Paused,
1366                        &mut lifecycle,
1367                    );
1368                }
1369                TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {}
1370            }
1371        }
1372        Ok((registry.event_log.clone(), lifecycle))
1373    })?;
1374
1375    append_lifecycle_events(event_log, events).await
1376}
1377
1378pub async fn resume(id: &str) -> Result<(), TriggerRegistryError> {
1379    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1380        let registry = &mut *slot.borrow_mut();
1381        let live = registry.live_bindings_any_source(id);
1382        if live.is_empty() {
1383            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1384        }
1385
1386        let mut lifecycle = Vec::new();
1387        for binding in live {
1388            if binding.state_snapshot() == TriggerState::Paused {
1389                registry.transition_binding_state(&binding, TriggerState::Active, &mut lifecycle);
1390            }
1391        }
1392        Ok((registry.event_log.clone(), lifecycle))
1393    })?;
1394
1395    append_lifecycle_events(event_log, events).await
1396}
1397
1398fn pin_trigger_binding_inner(
1399    id: &str,
1400    version: u32,
1401    allow_terminated: bool,
1402) -> Result<(), TriggerRegistryError> {
1403    TRIGGER_REGISTRY.with(|slot| {
1404        let registry = slot.borrow();
1405        let binding = registry.binding(id, version).ok_or_else(|| {
1406            TriggerRegistryError::UnknownBindingVersion {
1407                id: id.to_string(),
1408                version,
1409            }
1410        })?;
1411        match binding.state_snapshot() {
1412            TriggerState::Paused => Err(TriggerRegistryError::InvalidSpec(format!(
1413                "trigger binding '{id}' version {version} is paused"
1414            ))),
1415            TriggerState::Terminated if !allow_terminated => {
1416                Err(TriggerRegistryError::InvalidSpec(format!(
1417                    "trigger binding '{id}' version {version} is terminated"
1418                )))
1419            }
1420            _ => {
1421                binding.in_flight.fetch_add(1, Ordering::Relaxed);
1422                Ok(())
1423            }
1424        }
1425    })
1426}
1427
1428pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1429    pin_trigger_binding_inner(id, version, false)
1430}
1431
1432pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1433    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1434        let registry = &mut *slot.borrow_mut();
1435        let binding = registry.binding(id, version).ok_or_else(|| {
1436            TriggerRegistryError::UnknownBindingVersion {
1437                id: id.to_string(),
1438                version,
1439            }
1440        })?;
1441        let current = binding.in_flight.load(Ordering::Relaxed);
1442        if current == 0 {
1443            return Err(TriggerRegistryError::InvalidSpec(format!(
1444                "trigger binding '{id}' version {version} has no in-flight events"
1445            )));
1446        }
1447        binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1448
1449        let mut lifecycle = Vec::new();
1450        registry.maybe_finalize_draining(&binding, &mut lifecycle);
1451        registry.gc_terminated_versions(binding.id.as_str());
1452        Ok((registry.event_log.clone(), lifecycle))
1453    })?;
1454
1455    append_lifecycle_events(event_log, events).await
1456}
1457
1458pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1459    begin_in_flight_inner(id, version, false)
1460}
1461
1462pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1463    begin_in_flight_inner(id, version, true)
1464}
1465
1466fn begin_in_flight_inner(
1467    id: &str,
1468    version: u32,
1469    allow_terminated: bool,
1470) -> Result<(), TriggerRegistryError> {
1471    pin_trigger_binding_inner(id, version, allow_terminated)?;
1472    TRIGGER_REGISTRY.with(|slot| {
1473        let registry = slot.borrow();
1474        let binding = registry.binding(id, version).ok_or_else(|| {
1475            TriggerRegistryError::UnknownBindingVersion {
1476                id: id.to_string(),
1477                version,
1478            }
1479        })?;
1480        binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1481        *binding
1482            .metrics
1483            .last_received_ms
1484            .lock()
1485            .expect("trigger metrics poisoned") = Some(now_ms());
1486        Ok(())
1487    })
1488}
1489
1490pub async fn finish_in_flight(
1491    id: &str,
1492    version: u32,
1493    outcome: TriggerDispatchOutcome,
1494) -> Result<(), TriggerRegistryError> {
1495    TRIGGER_REGISTRY.with(|slot| {
1496        let registry = &mut *slot.borrow_mut();
1497        let binding = registry.binding(id, version).ok_or_else(|| {
1498            TriggerRegistryError::UnknownBindingVersion {
1499                id: id.to_string(),
1500                version,
1501            }
1502        })?;
1503        let current = binding.in_flight.load(Ordering::Relaxed);
1504        if current == 0 {
1505            return Err(TriggerRegistryError::InvalidSpec(format!(
1506                "trigger binding '{id}' version {version} has no in-flight events"
1507            )));
1508        }
1509        match outcome {
1510            TriggerDispatchOutcome::Dispatched => {
1511                binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1512            }
1513            TriggerDispatchOutcome::Failed => {
1514                binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1515            }
1516            TriggerDispatchOutcome::Dlq => {
1517                binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1518            }
1519        }
1520        Ok(())
1521    })?;
1522
1523    unpin_trigger_binding(id, version).await
1524}
1525
impl TriggerRegistry {
    /// Fills in runtime dependencies that may not have existed when the
    /// registry was created: the active event log and the default secret
    /// provider. Slots that are already populated are left as they are.
    fn refresh_runtime_context(&mut self) {
        if self.event_log.is_none() {
            self.event_log = active_event_log();
        }
        if self.secret_provider.is_none() {
            self.secret_provider = default_secret_provider();
        }
    }

    /// Returns the stored binding for `id` at exactly `version`, in any state.
    /// Terminated bindings are included until GC removes them.
    ///
    /// NOTE(review): `next_version_for_spec` reuses the version of any stored
    /// binding with the same definition fingerprint, so two stored bindings
    /// can share a version. This returns the first one pushed; confirm that is
    /// the instance pin/unpin callers expect.
    fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
        self.bindings
            .get(id)
            .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
            .cloned()
    }

    /// Returns every non-terminated binding for `id` whose source is `source`.
    fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
        self.bindings
            .get(id)
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .filter(|binding| {
                binding.source == source && binding.state_snapshot() != TriggerState::Terminated
            })
            .cloned()
            .collect()
    }

    /// Returns every non-terminated binding for `id`, whatever its source.
    fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
        self.bindings
            .get(id)
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
            .cloned()
            .collect()
    }

    /// Picks the version number to use when registering `spec`.
    ///
    /// In order of preference:
    /// 1. the version of any stored binding (any state) with the same
    ///    definition fingerprint;
    /// 2. the last version the lifecycle log recorded for that fingerprint;
    /// 3. one past the highest version seen in the stored bindings or the
    ///    lifecycle log, so the first version of a brand-new id is 1.
    fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
        if let Some(version) = self
            .bindings
            .get(spec.id.as_str())
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
            .map(|binding| binding.version)
        {
            return version;
        }

        let historical =
            self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
        if let Some(version) = historical.matching_version {
            return version;
        }

        self.bindings
            .get(spec.id.as_str())
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .map(|binding| binding.version)
            .chain(historical.max_version)
            .max()
            .unwrap_or(0)
            + 1
    }

    /// Prunes terminated bindings for `id` that fall outside the retention
    /// window, and removes the id's entry if no bindings remain.
    ///
    /// The window is the `TERMINATED_VERSION_RETENTION_LIMIT` highest version
    /// numbers among *all* stored bindings for the id, live ones included. A
    /// terminated binding survives only if its version is in that window.
    /// Non-terminated bindings are never removed.
    fn gc_terminated_versions(&mut self, id: &str) {
        let Some(bindings) = self.bindings.get_mut(id) else {
            return;
        };

        // Highest versions first, cut down to the retention window.
        let mut newest_versions: Vec<u32> =
            bindings.iter().map(|binding| binding.version).collect();
        newest_versions.sort_unstable_by(|left, right| right.cmp(left));
        newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
        let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();

        bindings.retain(|binding| {
            binding.state_snapshot() != TriggerState::Terminated
                || retained_versions.contains(&binding.version)
        });

        if bindings.is_empty() {
            // NOTE(review): the id is not removed from `by_provider` here. The
            // `by_provider` walkers in this module skip ids that are missing
            // from `bindings`.
            self.bindings.remove(id);
        }
    }

    /// Scans the lifecycle log for `id`. Returns the highest version it
    /// mentions and the most recently recorded version carrying
    /// `fingerprint`.
    fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
        let mut lookup = HistoricalVersionLookup::default();
        for record in self.lifecycle_records_for(id) {
            lookup.max_version = Some(
                lookup
                    .max_version
                    .unwrap_or(0)
                    .max(record.transition.version),
            );
            if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
                lookup.matching_version = Some(record.transition.version);
            }
        }
        lookup
    }

    /// Replays `id`'s lifecycle records up to and including `as_of` and
    /// returns the version that was `Active` at that moment.
    ///
    /// A `Paused`/`Draining`/`Terminated` transition of the currently active
    /// version clears it; transitions of other versions are ignored.
    ///
    /// # Errors
    /// `InvalidSpec` when no version was active at `as_of`.
    fn binding_version_as_of(
        &self,
        id: &str,
        as_of: OffsetDateTime,
    ) -> Result<u32, TriggerRegistryError> {
        let cutoff_ms = harn_clock::offset_datetime_to_ms(as_of);
        let mut active_version = None;
        for record in self.lifecycle_records_for(id) {
            // Assumes records arrive in chronological order (presumably the
            // log's append order; confirm). Everything after the first record
            // past the cutoff is skipped.
            if record.occurred_at_ms > cutoff_ms {
                break;
            }
            match record.transition.to_state {
                TriggerState::Active => active_version = Some(record.transition.version),
                TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {
                    if active_version == Some(record.transition.version) {
                        active_version = None;
                    }
                }
                TriggerState::Registering => {}
            }
        }

        active_version.ok_or_else(|| {
            TriggerRegistryError::InvalidSpec(format!(
                "no active trigger binding '{}' at {}",
                id,
                as_of
                    .format(&time::format_description::well_known::Rfc3339)
                    .unwrap_or_else(|_| as_of.to_string())
            ))
        })
    }

    /// Loads every lifecycle transition record for `id` from the event log.
    ///
    /// Returns an empty list when no event log is attached or the read fails.
    /// Events whose payload does not decode as a transition record are
    /// skipped.
    ///
    /// NOTE(review): this reads the whole lifecycle topic on every call and
    /// drives the async read with `futures::executor::block_on`. Confirm that
    /// is safe when reached from inside an async runtime (for example via
    /// `install_manifest_triggers`).
    fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
        let Some(event_log) = self.event_log.as_ref() else {
            return Vec::new();
        };
        let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
            .expect("static triggers.lifecycle topic should always be valid");
        futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
            .unwrap_or_default()
            .into_iter()
            .filter_map(|(_, event)| {
                let occurred_at_ms = event.occurred_at_ms;
                let transition: LifecycleStateTransitionRecord =
                    serde_json::from_value(event.payload).ok()?;
                (transition.id == id).then_some(HistoricalLifecycleRecord {
                    occurred_at_ms,
                    transition,
                })
            })
            .collect()
    }

    /// Creates a binding for `spec` at `version` and indexes it under its
    /// provider and id. Queues a `Registering` lifecycle event followed by the
    /// transition to `Active`, then returns the new binding.
    #[allow(clippy::arc_with_non_send_sync)]
    fn register_binding(
        &mut self,
        spec: TriggerBindingSpec,
        version: u32,
        lifecycle: &mut Vec<LogEvent>,
    ) -> Arc<TriggerBinding> {
        let binding = Arc::new(TriggerBinding::new(spec, version));
        self.by_provider
            .entry(binding.provider.as_str().to_string())
            .or_default()
            .insert(binding.id.as_str().to_string());
        self.bindings
            .entry(binding.id.as_str().to_string())
            .or_default()
            .push(binding.clone());
        lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
        self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
        binding
    }

    /// Moves a non-terminated binding to `Draining`. If it has no in-flight
    /// events it is terminated immediately. Terminated bindings are left
    /// alone.
    fn transition_binding_to_draining(
        &self,
        binding: &Arc<TriggerBinding>,
        lifecycle: &mut Vec<LogEvent>,
    ) {
        if matches!(binding.state_snapshot(), TriggerState::Terminated) {
            return;
        }
        self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
        self.maybe_finalize_draining(binding, lifecycle);
    }

    /// Terminates a `Draining` binding once its in-flight count is zero.
    fn maybe_finalize_draining(
        &self,
        binding: &Arc<TriggerBinding>,
        lifecycle: &mut Vec<LogEvent>,
    ) {
        if binding.state_snapshot() == TriggerState::Draining
            && binding.in_flight.load(Ordering::Relaxed) == 0
        {
            self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
        }
    }

    /// Sets `binding`'s state to `next` and queues a lifecycle event that
    /// records the previous state. Does nothing if the binding is already in
    /// `next`.
    fn transition_binding_state(
        &self,
        binding: &Arc<TriggerBinding>,
        next: TriggerState,
        lifecycle: &mut Vec<LogEvent>,
    ) {
        let mut state = binding.state.lock().expect("trigger state poisoned");
        let previous = *state;
        if previous == next {
            return;
        }
        *state = next;
        // Release the state lock before the follow-up work below.
        drop(state);
        // CH-04 (#1875): drop any pending aggregation buffers when the
        // binding terminates so a long-lived buffer doesn't fire after
        // the trigger has been removed. Leftover events are discarded —
        // matches the in-flight drain contract elsewhere in the
        // registry (the trigger is gone; we'd have nowhere to dispatch).
        if next == TriggerState::Terminated && binding.aggregation.is_some() {
            let _ = super::aggregation::drop_binding_aggregation(&binding.binding_key());
        }
        lifecycle.push(lifecycle_event(binding, Some(previous), next));
    }
}
1754
1755fn lifecycle_event(
1756    binding: &TriggerBinding,
1757    from_state: Option<TriggerState>,
1758    to_state: TriggerState,
1759) -> LogEvent {
1760    LogEvent::new(
1761        "state_transition",
1762        serde_json::json!({
1763            "id": binding.id.as_str(),
1764            "binding_key": binding.binding_key(),
1765            "version": binding.version,
1766            "provider": binding.provider.as_str(),
1767            "kind": &binding.kind,
1768            "source": binding.source.as_str(),
1769            "handler_kind": binding.handler.kind(),
1770            "definition_fingerprint": &binding.definition_fingerprint,
1771            "from_state": from_state.map(TriggerState::as_str),
1772            "to_state": to_state.as_str(),
1773        }),
1774    )
1775}
1776
1777async fn append_lifecycle_events(
1778    event_log: Option<Arc<AnyEventLog>>,
1779    events: Vec<LogEvent>,
1780) -> Result<(), TriggerRegistryError> {
1781    let Some(event_log) = event_log else {
1782        return Ok(());
1783    };
1784    if events.is_empty() {
1785        return Ok(());
1786    }
1787
1788    let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1789        .expect("static triggers.lifecycle topic should always be valid");
1790    for event in events {
1791        event_log
1792            .append(&topic, event)
1793            .await
1794            .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1795    }
1796    Ok(())
1797}
1798
1799fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1800    configured_default_chain(default_secret_namespace())
1801        .ok()
1802        .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1803}
1804
/// Resolves the namespace used to scope the default secret-provider chain.
///
/// Resolution order:
/// 1. `HARN_SECRET_NAMESPACE`, with surrounding whitespace trimmed, when it is
///    valid UTF-8 and contains at least one non-whitespace character.
/// 2. Otherwise `harn/<leaf>`, where `<leaf>` is the final component of the
///    current working directory. `<leaf>` falls back to `workspace` when:
///    - the cwd cannot be read;
///    - the cwd has no final component (e.g. `/`);
///    - the final component is not valid UTF-8.
fn default_secret_namespace() -> String {
    if let Ok(raw) = std::env::var("HARN_SECRET_NAMESPACE") {
        // Return the trimmed value so the result agrees with the emptiness
        // check. Surrounding whitespace (e.g. a trailing newline from a value
        // read out of a file) is never a meaningful part of a namespace.
        let namespace = raw.trim();
        if !namespace.is_empty() {
            return namespace.to_string();
        }
    }

    let cwd = std::env::current_dir().unwrap_or_default();
    let leaf = cwd
        .file_name()
        .and_then(|name| name.to_str())
        .filter(|name| !name.is_empty())
        .unwrap_or("workspace");
    format!("harn/{leaf}")
}
1820
1821fn now_ms() -> i64 {
1822    clock::now_ms()
1823}
1824
// Unit tests for the trigger registry. They cover:
// - registration and duplicate detection;
// - drain and in-flight accounting;
// - hot reload and terminated-version GC;
// - lifecycle events written to the event log;
// - historical version resolution (`binding_version_as_of`,
//   `resolve_live_or_as_of`).
//
// Each test calls `clear_trigger_registry()` at both start and end. Tests that
// install an event log also call `reset_active_event_log()`. The leading
// clears also guard against state left behind by a test that panicked before
// reaching its trailing cleanup.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event_log::{
        install_default_for_base_dir, pin_test_occurred_at_ms, reset_active_event_log,
    };
    use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
    use std::rc::Rc;

    /// Build the `OffsetDateTime` that corresponds to a pinned event-log
    /// `occurred_at_ms`, so `as_of` cutoffs can be expressed in the same
    /// reference frame as `pin_test_occurred_at_ms` without touching the wall
    /// clock.
    fn offset_from_ms(ms: i64) -> OffsetDateTime {
        OffsetDateTime::from_unix_timestamp_nanos((ms as i128) * 1_000_000)
            .expect("epoch ms within OffsetDateTime range")
    }

    /// Manifest-sourced GitHub `webhook` spec that matches `issues.opened` and
    /// routes to a `<id>-queue` worker.
    ///
    /// `fingerprint` becomes the definition fingerprint. Reinstalling the same
    /// `id` with a different fingerprint simulates an edited manifest entry,
    /// which the tests below expect to produce a new version.
    fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
        TriggerBindingSpec {
            id: id.to_string(),
            source: TriggerBindingSource::Manifest,
            kind: "webhook".to_string(),
            provider: ProviderId::from("github"),
            autonomy_tier: crate::AutonomyTier::ActAuto,
            handler: TriggerHandlerSpec::Worker {
                queue: format!("{id}-queue"),
            },
            dispatch_priority: crate::WorkerQueuePriority::Normal,
            when: None,
            when_budget: None,
            retry: TriggerRetryConfig::default(),
            match_events: vec!["issues.opened".to_string()],
            dedupe_key: Some("event.dedupe_key".to_string()),
            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
            filter: Some("event.kind".to_string()),
            daily_cost_usd: Some(5.0),
            hourly_cost_usd: None,
            max_autonomous_decisions_per_hour: None,
            max_autonomous_decisions_per_day: None,
            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
            max_concurrent: Some(10),
            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
            aggregation: None,
            manifest_path: None,
            package_name: Some("workspace".to_string()),
            definition_fingerprint: fingerprint.to_string(),
        }
    }

    /// Dynamically registered counterpart of `manifest_spec`.
    ///
    /// It has no dedupe key, filter, cost limit, concurrency limit or package
    /// name. The definition fingerprint is derived from `id`.
    fn dynamic_spec(id: &str) -> TriggerBindingSpec {
        TriggerBindingSpec {
            id: id.to_string(),
            source: TriggerBindingSource::Dynamic,
            kind: "webhook".to_string(),
            provider: ProviderId::from("github"),
            autonomy_tier: crate::AutonomyTier::ActAuto,
            handler: TriggerHandlerSpec::Worker {
                queue: format!("{id}-queue"),
            },
            dispatch_priority: crate::WorkerQueuePriority::Normal,
            when: None,
            when_budget: None,
            retry: TriggerRetryConfig::default(),
            match_events: vec!["issues.opened".to_string()],
            dedupe_key: None,
            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
            filter: None,
            daily_cost_usd: None,
            hourly_cost_usd: None,
            max_autonomous_decisions_per_hour: None,
            max_autonomous_decisions_per_day: None,
            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
            max_concurrent: None,
            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
            aggregation: None,
            manifest_path: None,
            package_name: None,
            definition_fingerprint: format!("dynamic:{id}"),
        }
    }

    /// A freshly installed manifest trigger is the only binding, at version 1,
    /// `Active`, with default metrics.
    #[tokio::test(flavor = "current_thread")]
    async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("manifest trigger installs");

        let snapshots = snapshot_trigger_bindings();
        assert_eq!(snapshots.len(), 1);
        let binding = &snapshots[0];
        assert_eq!(binding.id, "github-new-issue");
        assert_eq!(binding.version, 1);
        assert_eq!(binding.state, TriggerState::Active);
        assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());

        clear_trigger_registry();
    }

    /// Distinct dynamic ids yield distinct results. Registering an id that is
    /// already present fails with `DuplicateId`.
    #[tokio::test(flavor = "current_thread")]
    async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
        clear_trigger_registry();

        let first = dynamic_register(dynamic_spec("dynamic-a"))
            .await
            .expect("first dynamic trigger");
        let second = dynamic_register(dynamic_spec("dynamic-b"))
            .await
            .expect("second dynamic trigger");
        assert_ne!(first, second);

        let error = dynamic_register(dynamic_spec("dynamic-a"))
            .await
            .expect_err("duplicate id should fail");
        assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));

        clear_trigger_registry();
    }

    /// Averaging two `u64::MAX` predicate-cost samples must yield `u64::MAX`
    /// rather than overflowing.
    #[test]
    fn expected_predicate_cost_average_does_not_overflow() {
        let binding = TriggerBinding::new(manifest_spec("costed", "v1"), 1);
        record_predicate_cost_sample(&binding, u64::MAX);
        record_predicate_cost_sample(&binding, u64::MAX);

        assert_eq!(expected_predicate_cost_usd_micros(&binding), u64::MAX);
    }

    /// Draining a binding with one event in flight leaves it `Draining`.
    /// Finishing that event moves it to `Terminated` with zero in flight.
    #[tokio::test(flavor = "current_thread")]
    async fn drain_waits_for_in_flight_events_before_terminating() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("manifest trigger installs");
        begin_in_flight("github-new-issue", 1).expect("start in-flight event");

        drain("github-new-issue").await.expect("drain succeeds");
        let binding = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("binding snapshot");
        assert_eq!(binding.state, TriggerState::Draining);
        assert_eq!(binding.metrics.in_flight, 1);

        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish in-flight event");
        let binding = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("binding snapshot");
        assert_eq!(binding.state, TriggerState::Terminated);
        assert_eq!(binding.metrics.in_flight, 0);

        clear_trigger_registry();
    }

    /// Reinstalling with a changed fingerprint registers version 2 as
    /// `Active`. Version 1, which still holds an in-flight event, drains and
    /// becomes `Terminated` once that event finishes.
    #[tokio::test(flavor = "current_thread")]
    async fn hot_reload_registers_new_version_while_old_binding_drains() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("initial manifest trigger installs");
        begin_in_flight("github-new-issue", 1).expect("start in-flight event");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("updated manifest trigger installs");

        let snapshots = snapshot_trigger_bindings();
        assert_eq!(snapshots.len(), 2);
        let old = snapshots
            .iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("old binding");
        let new = snapshots
            .iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
            .expect("new binding");
        assert_eq!(old.state, TriggerState::Draining);
        assert_eq!(new.state, TriggerState::Active);

        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish old in-flight event");
        let old = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
            .expect("old binding");
        assert_eq!(old.state, TriggerState::Terminated);

        clear_trigger_registry();
    }

    /// Scenario: v1 terminates after v2 replaces it, then v3 is installed.
    /// Afterwards only versions 2 and 3 remain in the snapshot, in that order.
    /// NOTE(review): the exact retention limit lives outside this module.
    #[tokio::test(flavor = "current_thread")]
    async fn gc_drops_terminated_versions_beyond_retention_limit() {
        clear_trigger_registry();

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("install v1");
        begin_in_flight("github-new-issue", 1).expect("pin v1");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("install v2");
        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish v1");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
            .await
            .expect("install v3");

        let snapshots = snapshot_trigger_bindings();
        let versions: Vec<u32> = snapshots
            .into_iter()
            .filter(|binding| binding.id == "github-new-issue")
            .map(|binding| binding.version)
            .collect();
        assert_eq!(versions, vec![2, 3]);

        clear_trigger_registry();
    }

    /// Install, drain and in-flight completion write `registering`, `active`,
    /// `draining` and `terminated`, in that order, to the lifecycle topic of
    /// the installed event log.
    #[tokio::test(flavor = "current_thread")]
    async fn lifecycle_transitions_append_to_event_log() {
        clear_trigger_registry();
        reset_active_event_log();
        let tempdir = tempfile::tempdir().expect("tempdir");
        let log = install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("manifest trigger installs");
        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
        drain("github-new-issue").await.expect("drain succeeds");
        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
            .await
            .expect("finish event");

        // The topic name is spelled out literally rather than via the module
        // constant, which also pins the on-disk topic name.
        let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
        let events = log
            .read_range(&topic, None, 32)
            .await
            .expect("read lifecycle events");
        let states: Vec<String> = events
            .into_iter()
            .filter_map(|(_, event)| {
                event
                    .payload
                    .get("to_state")
                    .and_then(|value| value.as_str())
                    .map(|value| value.to_string())
            })
            .collect();
        assert_eq!(
            states,
            vec![
                "registering".to_string(),
                "active".to_string(),
                "draining".to_string(),
                "terminated".to_string(),
            ]
        );

        reset_active_event_log();
        clear_trigger_registry();
    }

    /// Versions 1 and 2 are installed, then the test simulates a restart by
    /// clearing the registry and reopening the same on-disk event log.
    /// Reinstalling the v2 fingerprint afterwards reuses version 2 from the
    /// log history.
    #[tokio::test(flavor = "current_thread")]
    async fn version_history_reuses_historical_version_after_restart() {
        clear_trigger_registry();
        reset_active_event_log();
        let tempdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(tempdir.path()).expect("install event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("initial manifest trigger installs");
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("updated manifest trigger installs");

        clear_trigger_registry();
        reset_active_event_log();
        install_default_for_base_dir(tempdir.path()).expect("reopen event log");

        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("manifest reload reuses historical version");

        let binding = snapshot_trigger_bindings()
            .into_iter()
            .find(|binding| binding.id == "github-new-issue")
            .expect("binding snapshot");
        assert_eq!(binding.version, 2);

        reset_active_event_log();
        clear_trigger_registry();
    }

    /// `binding_version_as_of` reports version 1 at the v1 install time and
    /// version 2 at the v2 install time.
    #[tokio::test(flavor = "current_thread")]
    async fn binding_version_as_of_reports_historical_active_version() {
        clear_trigger_registry();
        reset_active_event_log();
        let tempdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(tempdir.path()).expect("install event log");

        // Pin the event-log timestamp instead of sleeping between captures:
        // v1's lifecycle events stamp at T1, v2's at T2 > T1, with zero
        // wall-clock dependence. `before_reload` sits at T1 (so v1 is active
        // but v2 is not yet), `after_reload` at T2 (so v2 is active).
        const T1_MS: i64 = 1_700_000_000_000;
        const T2_MS: i64 = T1_MS + 50;
        let before_reload = offset_from_ms(T1_MS);
        let after_reload = offset_from_ms(T2_MS);

        let clock = pin_test_occurred_at_ms(T1_MS);
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("initial manifest trigger installs");
        drop(clock);

        let clock = pin_test_occurred_at_ms(T2_MS);
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("updated manifest trigger installs");
        drop(clock);

        assert_eq!(
            binding_version_as_of("github-new-issue", before_reload)
                .expect("version before reload"),
            1
        );
        assert_eq!(
            binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
            2
        );

        reset_active_event_log();
        clear_trigger_registry();
    }

    /// Resolving a recorded v1 binding after v4 has been installed falls back
    /// to v3, the version active at `received_at`. The fallback emits a `Warn`
    /// log in category `replay.binding_version_gc_fallback` with metadata
    /// `trigger_id`, `recorded_version`, `received_at` (RFC 3339) and
    /// `resolved_version`.
    #[tokio::test(flavor = "current_thread")]
    async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
        clear_trigger_registry();
        reset_active_event_log();
        let sink = Rc::new(CollectorSink::new());
        clear_event_sinks();
        add_event_sink(sink.clone());
        let tempdir = tempfile::tempdir().expect("tempdir");
        install_default_for_base_dir(tempdir.path()).expect("install event log");

        // Pin the event-log timestamps: v1..v3 land at T1, v4 at T2 > T1, with
        // `received_at` at T1 so the gc fallback resolves to v3 (the latest
        // binding active at-or-before `received_at`) while v4 is strictly later.
        const T1_MS: i64 = 1_700_000_000_000;
        const T2_MS: i64 = T1_MS + 50;
        let received_at = offset_from_ms(T1_MS);

        let clock = pin_test_occurred_at_ms(T1_MS);
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
            .await
            .expect("install v1");
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
            .await
            .expect("install v2");
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
            .await
            .expect("install v3");
        drop(clock);

        let clock = pin_test_occurred_at_ms(T2_MS);
        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
            .await
            .expect("install v4");
        drop(clock);

        let binding = resolve_live_or_as_of(
            "github-new-issue",
            RecordedTriggerBinding {
                version: 1,
                received_at,
            },
        )
        .expect("resolve fallback binding");
        assert_eq!(binding.version, 3);

        let warning = sink
            .logs
            .borrow()
            .iter()
            .find(|log| log.category == "replay.binding_version_gc_fallback")
            .cloned()
            .expect("gc fallback warning");
        assert_eq!(warning.level, EventLevel::Warn);
        assert_eq!(
            warning.metadata.get("trigger_id"),
            Some(&serde_json::json!("github-new-issue"))
        );
        assert_eq!(
            warning.metadata.get("recorded_version"),
            Some(&serde_json::json!(1))
        );
        assert_eq!(
            warning.metadata.get("received_at"),
            Some(&serde_json::json!(received_at
                .format(&time::format_description::well_known::Rfc3339)
                .unwrap_or_else(|_| received_at.to_string())))
        );
        assert_eq!(
            warning.metadata.get("resolved_version"),
            Some(&serde_json::json!(3))
        );

        // Remove the collector sink and restore the default sinks for later tests.
        clear_event_sinks();
        crate::events::reset_event_sinks();
        reset_active_event_log();
        clear_trigger_registry();
    }
}