Skip to main content

harn_vm/triggers/
registry.rs

1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet, VecDeque};
4use std::path::PathBuf;
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7
8use time::OffsetDateTime;
9use uuid::Uuid;
10
11use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
12use crate::llm::trigger_predicate::TriggerPredicateBudget;
13use crate::secrets::{configured_default_chain, SecretProvider};
14use crate::triggers::test_util::clock;
15use crate::trust_graph::AutonomyTier;
16use crate::value::VmClosure;
17
18use super::aggregation::TriggerAggregationConfig;
19use super::dispatcher::TriggerRetryConfig;
20use super::flow_control::TriggerFlowControlConfig;
21use super::ProviderId;
22
23#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
24pub struct TriggerId(String);
25
26impl TriggerId {
27    pub fn new(value: impl Into<String>) -> Self {
28        Self(value.into())
29    }
30
31    pub fn as_str(&self) -> &str {
32        &self.0
33    }
34}
35
36impl std::fmt::Display for TriggerId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        self.0.fmt(f)
39    }
40}
41
42#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub enum TriggerState {
45    Registering,
46    Active,
47    Paused,
48    Draining,
49    Terminated,
50}
51
52impl TriggerState {
53    pub fn as_str(self) -> &'static str {
54        match self {
55            Self::Registering => "registering",
56            Self::Active => "active",
57            Self::Paused => "paused",
58            Self::Draining => "draining",
59            Self::Terminated => "terminated",
60        }
61    }
62}
63
64#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
65#[serde(rename_all = "snake_case")]
66pub enum TriggerBindingSource {
67    Manifest,
68    Dynamic,
69}
70
71impl TriggerBindingSource {
72    pub fn as_str(self) -> &'static str {
73        match self {
74            Self::Manifest => "manifest",
75            Self::Dynamic => "dynamic",
76        }
77    }
78}
79
80#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
81#[serde(rename_all = "snake_case")]
82pub enum TriggerBudgetExhaustionStrategy {
83    #[default]
84    False,
85    RetryLater,
86    Fail,
87    Warn,
88}
89
90impl TriggerBudgetExhaustionStrategy {
91    pub fn as_str(self) -> &'static str {
92        match self {
93            Self::False => "false",
94            Self::RetryLater => "retry_later",
95            Self::Fail => "fail",
96            Self::Warn => "warn",
97        }
98    }
99}
100
/// Orchestrator-wide spend limits in USD. A `None` limit disables the check
/// for that window in `orchestrator_budget_would_exceed`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct OrchestratorBudgetConfig {
    /// Cap on spend within the current UTC day window.
    pub daily_cost_usd: Option<f64>,
    /// Cap on spend within the current UTC hour window.
    pub hourly_cost_usd: Option<f64>,
}

/// Copy of the orchestrator budget configuration and accumulated spend, as
/// returned by `snapshot_orchestrator_budget` after window rollover.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct OrchestratorBudgetSnapshot {
    /// Configured daily cap in USD, if any.
    pub daily_cost_usd: Option<f64>,
    /// Configured hourly cap in USD, if any.
    pub hourly_cost_usd: Option<f64>,
    /// Spend recorded in the current day window, in micro-USD (1e-6 USD).
    pub cost_today_usd_micros: u64,
    /// Spend recorded in the current hour window, in micro-USD (1e-6 USD).
    pub cost_hour_usd_micros: u64,
    /// Day-window key, as produced by `utc_day_key`.
    pub day_utc: i32,
    /// Hour-window key, as produced by `utc_hour_key`.
    pub hour_utc: i64,
}
116
117#[derive(Debug)]
118struct OrchestratorBudgetState {
119    config: OrchestratorBudgetConfig,
120    day_utc: i32,
121    hour_utc: i64,
122    cost_today_usd_micros: u64,
123    cost_hour_usd_micros: u64,
124}
125
126impl Default for OrchestratorBudgetState {
127    fn default() -> Self {
128        Self {
129            config: OrchestratorBudgetConfig::default(),
130            day_utc: utc_day_key(),
131            hour_utc: utc_hour_key(),
132            cost_today_usd_micros: 0,
133            cost_hour_usd_micros: 0,
134        }
135    }
136}
137
/// Handler a trigger binding dispatches to. Each variant maps to a stable
/// label via [`TriggerHandlerSpec::kind`]; the hand-written `Debug` impl omits
/// closures (`Local::closure`, `SpawnToPool::task_factory`).
#[derive(Clone)]
pub enum TriggerHandlerSpec {
    /// Handler backed by a local VM closure. Only `raw` appears in `Debug`
    /// output (presumably the handler's textual reference; confirm).
    Local {
        raw: String,
        closure: Arc<VmClosure>,
    },
    /// Handler addressed by `target` (presumably an agent-to-agent endpoint;
    /// confirm). NOTE(review): `allow_cleartext` presumably permits non-TLS
    /// transport to `target`; confirm against the dispatcher.
    A2a {
        target: String,
        allow_cleartext: bool,
    },
    /// Handler that targets the worker queue named `queue` (inferred from the
    /// field name; confirm against the dispatcher).
    Worker {
        queue: String,
    },
    /// Handler bound to a persona runtime binding; `Debug` shows only the
    /// binding's `name`.
    Persona {
        binding: crate::PersonaRuntimeBinding,
    },
    /// Handler keyed by `worker_id` (presumably resumes that worker; confirm).
    AutoResume {
        worker_id: String,
    },
    /// Composes triggers (#1870) with named agent pools (#1883). On match,
    /// the dispatcher resolves `pool` by name, invokes `task_factory(event)`
    /// to build the per-event closure, and submits it to the pool with the
    /// pool's queue strategy + backpressure policy applied (PL-04 / #1889).
    SpawnToPool {
        pool: String,
        priority_from: Option<String>,
        key_from: Option<String>,
        task_factory: Arc<VmClosure>,
    },
    /// Composes triggers (#1870) with the reminder pipeline (#1815) by
    /// injecting a `SystemReminder` into a target agent session's transcript
    /// when the trigger matches (CH-05 / #1876). The reminder appears at the
    /// session's next turn boundary — same as any other reminder. No spawn,
    /// no resume, no signal. `body` is a prompt-template string rendered
    /// against `{event}` / `{batch.events}` per match. `target` resolution
    /// happens at dispatch time via [`TargetExpr`]; missing-target dispatches
    /// are recorded as audit events rather than crashing the trigger.
    ReminderInject {
        target: TargetExpr,
        body: String,
        tags: Vec<String>,
        ttl_turns: Option<i64>,
        dedupe_key: Option<String>,
        propagate: crate::llm::helpers::ReminderPropagate,
        role_hint: crate::llm::helpers::ReminderRoleHint,
        preserve_on_compact: bool,
    },
    /// CH-10 (#1910): emergency "panic" broadcast. When the trigger fires,
    /// every running worker matched by `target_agents` is suspended
    /// synchronously via the cooperative suspend pipeline used by
    /// `suspend_agent` (#1837), bypassing the normal turn-boundary
    /// delivery contract. Already-suspended or terminal workers are skipped
    /// (no double-suspend, no error). `reason` is propagated to the
    /// suspension envelope, the `WorkerSuspended` event, and the
    /// `triggers.interrupt_and_suspend.audit` audit entry per suspension.
    /// A no-target dispatch records a single roll-up audit entry and
    /// returns a `broadcast` result with `suspended_count: 0` — graceful
    /// no-op rather than dispatch failure, mirroring the `ReminderInject`
    /// missing-target contract.
    InterruptAndSuspend {
        target_agents: AgentScope,
        reason: String,
    },
}
202
203/// Resolution mode for the target worker set of a
204/// [`TriggerHandlerSpec::InterruptAndSuspend`] dispatch. `All` enumerates
205/// every entry in the local worker registry (the org-scoped "panic"
206/// broadcast); `Concrete` carries an explicit worker-id list from the
207/// registration; `Closure` defers resolution to a user closure that runs at
208/// dispatch time with the event payload bound to its first argument and
209/// must return a list of worker-id strings.
210#[derive(Clone)]
211pub enum AgentScope {
212    All,
213    Concrete(Vec<String>),
214    Closure(Arc<VmClosure>),
215}
216
217impl AgentScope {
218    pub fn kind(&self) -> &'static str {
219        match self {
220            Self::All => "all",
221            Self::Concrete(_) => "concrete",
222            Self::Closure(_) => "closure",
223        }
224    }
225}
226
227impl std::fmt::Debug for AgentScope {
228    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
229        match self {
230            Self::All => f.write_str("All"),
231            Self::Concrete(ids) => f.debug_tuple("Concrete").field(ids).finish(),
232            Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
233        }
234    }
235}
236
237/// Resolution mode for the target session of a [`TriggerHandlerSpec::ReminderInject`]
238/// dispatch. `Current` walks the current-session thread-local; `Parent` resolves
239/// the active session's parent in the agent-session lineage; `Concrete` carries
240/// a literal session id from the registration; `Closure` defers resolution to
241/// a user closure that runs at dispatch time with the event payload bound to
242/// its first argument and must return the session id as a string.
243#[derive(Clone)]
244pub enum TargetExpr {
245    Current,
246    Parent,
247    Concrete(String),
248    Closure(Arc<VmClosure>),
249}
250
251impl TargetExpr {
252    pub fn kind(&self) -> &'static str {
253        match self {
254            Self::Current => "current",
255            Self::Parent => "parent",
256            Self::Concrete(_) => "concrete",
257            Self::Closure(_) => "closure",
258        }
259    }
260}
261
262impl std::fmt::Debug for TargetExpr {
263    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264        match self {
265            Self::Current => f.write_str("Current"),
266            Self::Parent => f.write_str("Parent"),
267            Self::Concrete(id) => f.debug_tuple("Concrete").field(id).finish(),
268            Self::Closure(_) => f.write_str("Closure(<vm_closure>)"),
269        }
270    }
271}
272
273impl std::fmt::Debug for TriggerHandlerSpec {
274    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275        match self {
276            Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
277            Self::A2a {
278                target,
279                allow_cleartext,
280            } => f
281                .debug_struct("A2a")
282                .field("target", target)
283                .field("allow_cleartext", allow_cleartext)
284                .finish(),
285            Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
286            Self::Persona { binding } => f
287                .debug_struct("Persona")
288                .field("name", &binding.name)
289                .finish(),
290            Self::AutoResume { worker_id } => f
291                .debug_struct("AutoResume")
292                .field("worker_id", worker_id)
293                .finish(),
294            Self::SpawnToPool {
295                pool,
296                priority_from,
297                key_from,
298                ..
299            } => f
300                .debug_struct("SpawnToPool")
301                .field("pool", pool)
302                .field("priority_from", priority_from)
303                .field("key_from", key_from)
304                .finish(),
305            Self::ReminderInject {
306                target,
307                body,
308                tags,
309                ttl_turns,
310                dedupe_key,
311                propagate,
312                role_hint,
313                preserve_on_compact,
314            } => f
315                .debug_struct("ReminderInject")
316                .field("target", target)
317                .field("body", body)
318                .field("tags", tags)
319                .field("ttl_turns", ttl_turns)
320                .field("dedupe_key", dedupe_key)
321                .field("propagate", propagate)
322                .field("role_hint", role_hint)
323                .field("preserve_on_compact", preserve_on_compact)
324                .finish(),
325            Self::InterruptAndSuspend {
326                target_agents,
327                reason,
328            } => f
329                .debug_struct("InterruptAndSuspend")
330                .field("target_agents", target_agents)
331                .field("reason", reason)
332                .finish(),
333        }
334    }
335}
336
337impl TriggerHandlerSpec {
338    pub fn kind(&self) -> &'static str {
339        match self {
340            Self::Local { .. } => "local",
341            Self::A2a { .. } => "a2a",
342            Self::Worker { .. } => "worker",
343            Self::Persona { .. } => "persona",
344            Self::AutoResume { .. } => "auto_resume",
345            Self::SpawnToPool { .. } => "spawn_to_pool",
346            Self::ReminderInject { .. } => "reminder_inject",
347            Self::InterruptAndSuspend { .. } => "interrupt_and_suspend",
348        }
349    }
350}
351
352#[derive(Clone)]
353pub struct TriggerPredicateSpec {
354    pub raw: String,
355    pub closure: Arc<VmClosure>,
356}
357
358impl std::fmt::Debug for TriggerPredicateSpec {
359    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
360        f.debug_struct("TriggerPredicateSpec")
361            .field("raw", &self.raw)
362            .finish()
363    }
364}
365
/// Declarative description of a trigger binding. `TriggerBinding::new`
/// consumes it, moving every field into the binding and wrapping `id` in a
/// [`TriggerId`].
#[derive(Clone, Debug)]
pub struct TriggerBindingSpec {
    pub id: String,
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: ProviderId,
    pub autonomy_tier: AutonomyTier,
    pub handler: TriggerHandlerSpec,
    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
    /// Optional predicate gating dispatch. NOTE(review): evaluation happens
    /// outside this section.
    pub when: Option<TriggerPredicateSpec>,
    /// Budget for the `when` predicate. Its `max_cost_usd` serves as the
    /// expected per-evaluation cost until cost samples have been recorded
    /// (see `expected_predicate_cost_usd_micros`).
    pub when_budget: Option<TriggerPredicateBudget>,
    pub retry: TriggerRetryConfig,
    pub match_events: Vec<String>,
    pub dedupe_key: Option<String>,
    pub dedupe_retention_days: u32,
    pub filter: Option<String>,
    /// Per-binding spend caps in USD, checked by `binding_budget_would_exceed`;
    /// `None` disables the corresponding check.
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    /// Caps on autonomous decisions per UTC hour / day, checked by
    /// `binding_autonomy_budget_would_exceed`; `None` disables the check.
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
    pub max_concurrent: Option<u32>,
    pub flow_control: TriggerFlowControlConfig,
    /// CH-04 (#1875): optional aggregation buffer. When set, the
    /// dispatcher accumulates matching events into a per-(binding,
    /// partition_key) buffer and dispatches the handler with a batched
    /// event when `count` is reached or the `window` elapses.
    pub aggregation: Option<TriggerAggregationConfig>,
    pub manifest_path: Option<PathBuf>,
    pub package_name: Option<String>,
    pub definition_fingerprint: String,
}
398
/// Live counters for a single trigger binding.
///
/// `TriggerBinding::metrics_snapshot` reads every counter with
/// `Ordering::Relaxed` and copies it into a [`TriggerMetricsSnapshot`].
/// Cost counters are in micro-USD (1e-6 USD). The `*_today` / `*_hour`
/// counters are zeroed by `reset_binding_budget_windows` when the UTC day /
/// hour window changes; the `*_total` counters are not reset there.
///
/// Every counter starts at zero and `last_received_ms` starts at `None`.
/// Those are exactly the `Default` values of `AtomicU64` and
/// `Mutex<Option<i64>>`, so `Default` is derived instead of hand-written.
/// A newly added counter therefore cannot be forgotten in a manual initializer.
#[derive(Debug, Default)]
pub struct TriggerMetrics {
    pub received: AtomicU64,
    pub dispatched: AtomicU64,
    pub failed: AtomicU64,
    pub dlq: AtomicU64,
    /// Millisecond timestamp of the most recent receipt, if any (epoch base
    /// not established in this section; confirm).
    pub last_received_ms: Mutex<Option<i64>>,
    pub cost_total_usd_micros: AtomicU64,
    pub cost_today_usd_micros: AtomicU64,
    pub cost_hour_usd_micros: AtomicU64,
    pub autonomous_decisions_total: AtomicU64,
    pub autonomous_decisions_today: AtomicU64,
    pub autonomous_decisions_hour: AtomicU64,
}
431
/// Plain-value copy of a binding's [`TriggerMetrics`] plus its `in_flight`
/// counter, built by `TriggerBinding::metrics_snapshot`. Cost fields are in
/// micro-USD (1e-6 USD).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TriggerMetricsSnapshot {
    pub received: u64,
    pub dispatched: u64,
    pub failed: u64,
    pub dlq: u64,
    /// Copied from `TriggerBinding::in_flight`, not from `TriggerMetrics`.
    pub in_flight: u64,
    pub last_received_ms: Option<i64>,
    pub cost_total_usd_micros: u64,
    pub cost_today_usd_micros: u64,
    pub cost_hour_usd_micros: u64,
    pub autonomous_decisions_total: u64,
    pub autonomous_decisions_today: u64,
    pub autonomous_decisions_hour: u64,
}
447
/// A registered, versioned trigger binding.
///
/// Holds the configuration moved out of a [`TriggerBindingSpec`] plus
/// runtime state: lifecycle state, metrics, an in-flight counter, a cancel
/// flag and predicate/budget-window bookkeeping. Built by
/// `TriggerBinding::new`.
pub struct TriggerBinding {
    /// Binding id (from `TriggerBindingSpec::id`).
    pub id: TriggerId,
    /// Version number; `binding_key` renders `id` and `version` as `<id>@v<version>`.
    pub version: u32,
    pub source: TriggerBindingSource,
    pub kind: String,
    pub provider: ProviderId,
    pub autonomy_tier: AutonomyTier,
    pub handler: TriggerHandlerSpec,
    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
    pub when: Option<TriggerPredicateSpec>,
    /// Predicate budget; its `max_cost_usd` is the expected-cost fallback used
    /// by `expected_predicate_cost_usd_micros` before any samples exist.
    pub when_budget: Option<TriggerPredicateBudget>,
    pub retry: TriggerRetryConfig,
    pub match_events: Vec<String>,
    pub dedupe_key: Option<String>,
    pub dedupe_retention_days: u32,
    pub filter: Option<String>,
    /// Per-binding spend caps in USD (`None` = unlimited), checked by
    /// `binding_budget_would_exceed`.
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    /// Caps on autonomous decisions (`None` = unlimited), checked by
    /// `binding_autonomy_budget_would_exceed`.
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
    pub max_concurrent: Option<u32>,
    pub flow_control: TriggerFlowControlConfig,
    /// CH-04 (#1875): see `TriggerBindingSpec::aggregation`. Moved out of the
    /// spec at registration (see `TriggerBinding::new`) so the dispatcher does
    /// not need to lock the registry for every emit.
    pub aggregation: Option<TriggerAggregationConfig>,
    pub manifest_path: Option<PathBuf>,
    pub package_name: Option<String>,
    pub definition_fingerprint: String,
    /// Current lifecycle state; `new` initializes it to `Registering`.
    pub state: Mutex<TriggerState>,
    /// Counters surfaced through `metrics_snapshot`.
    pub metrics: TriggerMetrics,
    /// In-flight counter, reported as `TriggerMetricsSnapshot::in_flight`.
    pub in_flight: AtomicU64,
    /// Shared flag, `false` after `new`. Presumably set to request
    /// cancellation of outstanding work; confirm against the dispatcher.
    pub cancel_token: Arc<AtomicBool>,
    /// Budget-window keys and recent predicate cost samples.
    pub predicate_state: Mutex<TriggerPredicateState>,
}
484
/// Per-binding predicate and budget-window bookkeeping, stored behind the
/// `TriggerBinding::predicate_state` mutex.
#[derive(Clone, Debug, Default)]
pub struct TriggerPredicateState {
    /// UTC day key last applied by `reset_binding_budget_windows`; `None`
    /// until the first call.
    pub budget_day_utc: Option<i32>,
    /// UTC hour key last applied by `reset_binding_budget_windows`; `None`
    /// until the first call.
    pub budget_hour_utc: Option<i64>,
    /// Not touched in this section; presumably a circuit-breaker failure
    /// count for predicate evaluation (confirm).
    pub consecutive_failures: u32,
    /// Not touched in this section; presumably a millisecond timestamp until
    /// which the predicate breaker stays open (confirm).
    pub breaker_open_until_ms: Option<i64>,
    /// Most recent predicate costs in micro-USD, oldest at the front, capped
    /// at `PREDICATE_COST_WINDOW` entries by `record_predicate_cost_sample`.
    pub recent_cost_usd_micros: VecDeque<u64>,
}
493
494impl std::fmt::Debug for TriggerBinding {
495    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
496        f.debug_struct("TriggerBinding")
497            .field("id", &self.id)
498            .field("version", &self.version)
499            .field("source", &self.source)
500            .field("kind", &self.kind)
501            .field("provider", &self.provider)
502            .field("handler_kind", &self.handler.kind())
503            .field("state", &self.state_snapshot())
504            .finish()
505    }
506}
507
508impl TriggerBinding {
509    pub fn snapshot(&self) -> TriggerBindingSnapshot {
510        TriggerBindingSnapshot {
511            id: self.id.as_str().to_string(),
512            version: self.version,
513            source: self.source,
514            kind: self.kind.clone(),
515            provider: self.provider.as_str().to_string(),
516            autonomy_tier: self.autonomy_tier,
517            handler_kind: self.handler.kind().to_string(),
518            state: self.state_snapshot(),
519            metrics: self.metrics_snapshot(),
520            daily_cost_usd: self.daily_cost_usd,
521            hourly_cost_usd: self.hourly_cost_usd,
522            max_autonomous_decisions_per_hour: self.max_autonomous_decisions_per_hour,
523            max_autonomous_decisions_per_day: self.max_autonomous_decisions_per_day,
524            on_budget_exhausted: self.on_budget_exhausted,
525        }
526    }
527
528    fn new(spec: TriggerBindingSpec, version: u32) -> Self {
529        Self {
530            id: TriggerId::new(spec.id),
531            version,
532            source: spec.source,
533            kind: spec.kind,
534            provider: spec.provider,
535            autonomy_tier: spec.autonomy_tier,
536            handler: spec.handler,
537            dispatch_priority: spec.dispatch_priority,
538            when: spec.when,
539            when_budget: spec.when_budget,
540            retry: spec.retry,
541            match_events: spec.match_events,
542            dedupe_key: spec.dedupe_key,
543            dedupe_retention_days: spec.dedupe_retention_days,
544            filter: spec.filter,
545            daily_cost_usd: spec.daily_cost_usd,
546            hourly_cost_usd: spec.hourly_cost_usd,
547            max_autonomous_decisions_per_hour: spec.max_autonomous_decisions_per_hour,
548            max_autonomous_decisions_per_day: spec.max_autonomous_decisions_per_day,
549            on_budget_exhausted: spec.on_budget_exhausted,
550            max_concurrent: spec.max_concurrent,
551            flow_control: spec.flow_control,
552            aggregation: spec.aggregation,
553            manifest_path: spec.manifest_path,
554            package_name: spec.package_name,
555            definition_fingerprint: spec.definition_fingerprint,
556            state: Mutex::new(TriggerState::Registering),
557            metrics: TriggerMetrics::default(),
558            in_flight: AtomicU64::new(0),
559            cancel_token: Arc::new(AtomicBool::new(false)),
560            predicate_state: Mutex::new(TriggerPredicateState::default()),
561        }
562    }
563
564    pub fn binding_key(&self) -> String {
565        format!("{}@v{}", self.id.as_str(), self.version)
566    }
567
568    pub fn state_snapshot(&self) -> TriggerState {
569        *self.state.lock().expect("trigger state poisoned")
570    }
571
572    pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
573        TriggerMetricsSnapshot {
574            received: self.metrics.received.load(Ordering::Relaxed),
575            dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
576            failed: self.metrics.failed.load(Ordering::Relaxed),
577            dlq: self.metrics.dlq.load(Ordering::Relaxed),
578            in_flight: self.in_flight.load(Ordering::Relaxed),
579            last_received_ms: *self
580                .metrics
581                .last_received_ms
582                .lock()
583                .expect("trigger metrics poisoned"),
584            cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
585            cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
586            cost_hour_usd_micros: self.metrics.cost_hour_usd_micros.load(Ordering::Relaxed),
587            autonomous_decisions_total: self
588                .metrics
589                .autonomous_decisions_total
590                .load(Ordering::Relaxed),
591            autonomous_decisions_today: self
592                .metrics
593                .autonomous_decisions_today
594                .load(Ordering::Relaxed),
595            autonomous_decisions_hour: self
596                .metrics
597                .autonomous_decisions_hour
598                .load(Ordering::Relaxed),
599        }
600    }
601}
602
/// Serializable copy of a binding's identity, limits, state and metrics,
/// produced by `TriggerBinding::snapshot`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerBindingSnapshot {
    pub id: String,
    pub version: u32,
    pub source: TriggerBindingSource,
    pub kind: String,
    /// Provider id rendered from `ProviderId::as_str`.
    pub provider: String,
    pub autonomy_tier: AutonomyTier,
    /// Label from `TriggerHandlerSpec::kind` (e.g. `"local"`, `"spawn_to_pool"`).
    pub handler_kind: String,
    pub state: TriggerState,
    pub metrics: TriggerMetricsSnapshot,
    pub daily_cost_usd: Option<f64>,
    pub hourly_cost_usd: Option<f64>,
    pub max_autonomous_decisions_per_hour: Option<u64>,
    pub max_autonomous_decisions_per_day: Option<u64>,
    pub on_budget_exhausted: TriggerBudgetExhaustionStrategy,
}
620
/// Outcome of a dispatch attempt. NOTE(review): not used in this section;
/// `Dlq` presumably means the event was routed to a dead-letter queue (cf.
/// the `dlq` metrics counter). Confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TriggerDispatchOutcome {
    Dispatched,
    Failed,
    Dlq,
}
627
/// Errors produced by trigger-registry operations.
#[derive(Debug)]
pub enum TriggerRegistryError {
    DuplicateId(String),
    InvalidSpec(String),
    UnknownId(String),
    UnknownBindingVersion { id: String, version: u32 },
    EventLog(String),
}

impl std::fmt::Display for TriggerRegistryError {
    /// `InvalidSpec` and `EventLog` carry a ready-made message that is written
    /// verbatim; the other variants render a fixed template.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TriggerRegistryError::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
            TriggerRegistryError::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
            TriggerRegistryError::UnknownBindingVersion { id, version } => {
                write!(f, "unknown trigger binding '{id}' version {version}")
            }
            TriggerRegistryError::InvalidSpec(message) => f.write_str(message),
            TriggerRegistryError::EventLog(message) => f.write_str(message),
        }
    }
}

impl std::error::Error for TriggerRegistryError {}
651
/// Registry of trigger bindings. One instance lives in the
/// `TRIGGER_REGISTRY` thread-local and is replaced wholesale by
/// `clear_trigger_registry`.
#[derive(Default)]
pub struct TriggerRegistry {
    /// Binding versions grouped by a string key (presumably the trigger id;
    /// confirm).
    bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
    /// Index from a provider key to a set of strings (presumably binding ids;
    /// confirm).
    by_provider: BTreeMap<String, BTreeSet<String>>,
    /// Optional event-log handle; `None` by default.
    event_log: Option<Arc<AnyEventLog>>,
    /// Optional secret provider; `None` by default.
    secret_provider: Option<Arc<dyn SecretProvider>>,
}
659
thread_local! {
    // Trigger registry for the current thread; `clear_trigger_registry` resets it.
    static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
}

thread_local! {
    // Orchestrator-wide limits and accumulated spend for the current thread.
    static ORCHESTRATOR_BUDGET: RefCell<OrchestratorBudgetState> =
        RefCell::new(OrchestratorBudgetState::default());
}

// Not used in this section; presumably the number of terminated binding
// versions retained per id (confirm).
const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;

// Event-log topic name for trigger lifecycle events (usage not shown here).
const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
// Maximum number of predicate cost samples kept by `record_predicate_cost_sample`.
const PREDICATE_COST_WINDOW: usize = 100;
673
/// Fields deserialized from a lifecycle state-transition payload
/// (presumably read from the `triggers.lifecycle` topic; confirm).
/// `definition_fingerprint` becomes `None` when absent from the payload.
#[derive(Clone, Debug, Deserialize)]
struct LifecycleStateTransitionRecord {
    id: String,
    version: u32,
    #[serde(default)]
    definition_fingerprint: Option<String>,
    to_state: TriggerState,
}

/// A lifecycle transition paired with the time it occurred, in milliseconds
/// (epoch base not established here; confirm).
#[derive(Clone, Debug)]
struct HistoricalLifecycleRecord {
    occurred_at_ms: i64,
    transition: LifecycleStateTransitionRecord,
}

/// A binding version paired with a receipt timestamp. NOTE(review): usage is
/// outside this section; semantics inferred from field names.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RecordedTriggerBinding {
    pub version: u32,
    pub received_at: OffsetDateTime,
}

/// Result of a historical version scan: the version matching the lookup
/// criterion, if any, and the highest version seen. NOTE(review): inferred
/// from field names; the scan itself is outside this section.
#[derive(Clone, Copy, Debug, Default)]
struct HistoricalVersionLookup {
    matching_version: Option<u32>,
    max_version: Option<u32>,
}
700
701pub fn clear_trigger_registry() {
702    TRIGGER_REGISTRY.with(|slot| {
703        *slot.borrow_mut() = TriggerRegistry::default();
704    });
705    clear_orchestrator_budget();
706    super::aggregation::clear_aggregation_state();
707}
708
709pub fn install_orchestrator_budget(config: OrchestratorBudgetConfig) {
710    ORCHESTRATOR_BUDGET.with(|slot| {
711        let mut state = slot.borrow_mut();
712        rollover_orchestrator_budget(&mut state);
713        state.config = config;
714    });
715}
716
717pub fn clear_orchestrator_budget() {
718    ORCHESTRATOR_BUDGET.with(|slot| {
719        *slot.borrow_mut() = OrchestratorBudgetState::default();
720    });
721}
722
723pub fn snapshot_orchestrator_budget() -> OrchestratorBudgetSnapshot {
724    ORCHESTRATOR_BUDGET.with(|slot| {
725        let mut state = slot.borrow_mut();
726        rollover_orchestrator_budget(&mut state);
727        OrchestratorBudgetSnapshot {
728            daily_cost_usd: state.config.daily_cost_usd,
729            hourly_cost_usd: state.config.hourly_cost_usd,
730            cost_today_usd_micros: state.cost_today_usd_micros,
731            cost_hour_usd_micros: state.cost_hour_usd_micros,
732            day_utc: state.day_utc,
733            hour_utc: state.hour_utc,
734        }
735    })
736}
737
738pub fn note_orchestrator_budget_cost(cost_usd_micros: u64) {
739    if cost_usd_micros == 0 {
740        return;
741    }
742    ORCHESTRATOR_BUDGET.with(|slot| {
743        let mut state = slot.borrow_mut();
744        rollover_orchestrator_budget(&mut state);
745        state.cost_today_usd_micros = state.cost_today_usd_micros.saturating_add(cost_usd_micros);
746        state.cost_hour_usd_micros = state.cost_hour_usd_micros.saturating_add(cost_usd_micros);
747    });
748}
749
750pub fn orchestrator_budget_would_exceed(expected_cost_usd_micros: u64) -> Option<&'static str> {
751    ORCHESTRATOR_BUDGET.with(|slot| {
752        let mut state = slot.borrow_mut();
753        rollover_orchestrator_budget(&mut state);
754        if state.config.hourly_cost_usd.is_some_and(|limit| {
755            micros_to_usd(
756                state
757                    .cost_hour_usd_micros
758                    .saturating_add(expected_cost_usd_micros),
759            ) > limit
760        }) {
761            return Some("orchestrator_hourly_budget_exceeded");
762        }
763        if state.config.daily_cost_usd.is_some_and(|limit| {
764            micros_to_usd(
765                state
766                    .cost_today_usd_micros
767                    .saturating_add(expected_cost_usd_micros),
768            ) > limit
769        }) {
770            return Some("orchestrator_daily_budget_exceeded");
771        }
772        None
773    })
774}
775
776pub fn reset_binding_budget_windows(binding: &TriggerBinding) {
777    let today = utc_day_key();
778    let hour = utc_hour_key();
779    let mut state = binding
780        .predicate_state
781        .lock()
782        .expect("trigger predicate state poisoned");
783    if state.budget_day_utc != Some(today) {
784        state.budget_day_utc = Some(today);
785        binding
786            .metrics
787            .cost_today_usd_micros
788            .store(0, Ordering::Relaxed);
789        binding
790            .metrics
791            .autonomous_decisions_today
792            .store(0, Ordering::Relaxed);
793    }
794    if state.budget_hour_utc != Some(hour) {
795        state.budget_hour_utc = Some(hour);
796        binding
797            .metrics
798            .cost_hour_usd_micros
799            .store(0, Ordering::Relaxed);
800        binding
801            .metrics
802            .autonomous_decisions_hour
803            .store(0, Ordering::Relaxed);
804    }
805}
806
807pub fn binding_budget_would_exceed(
808    binding: &TriggerBinding,
809    expected_cost_usd_micros: u64,
810) -> Option<&'static str> {
811    reset_binding_budget_windows(binding);
812    if binding.hourly_cost_usd.is_some_and(|limit| {
813        micros_to_usd(
814            binding
815                .metrics
816                .cost_hour_usd_micros
817                .load(Ordering::Relaxed)
818                .saturating_add(expected_cost_usd_micros),
819        ) > limit
820    }) {
821        return Some("hourly_budget_exceeded");
822    }
823    if binding.daily_cost_usd.is_some_and(|limit| {
824        micros_to_usd(
825            binding
826                .metrics
827                .cost_today_usd_micros
828                .load(Ordering::Relaxed)
829                .saturating_add(expected_cost_usd_micros),
830        ) > limit
831    }) {
832        return Some("daily_budget_exceeded");
833    }
834    None
835}
836
837pub fn binding_autonomy_budget_would_exceed(binding: &TriggerBinding) -> Option<&'static str> {
838    reset_binding_budget_windows(binding);
839    if binding
840        .max_autonomous_decisions_per_hour
841        .is_some_and(|limit| {
842            binding
843                .metrics
844                .autonomous_decisions_hour
845                .load(Ordering::Relaxed)
846                .saturating_add(1)
847                > limit
848        })
849    {
850        return Some("hourly_autonomy_budget_exceeded");
851    }
852    if binding
853        .max_autonomous_decisions_per_day
854        .is_some_and(|limit| {
855            binding
856                .metrics
857                .autonomous_decisions_today
858                .load(Ordering::Relaxed)
859                .saturating_add(1)
860                > limit
861        })
862    {
863        return Some("daily_autonomy_budget_exceeded");
864    }
865    None
866}
867
868pub fn note_autonomous_decision(binding: &TriggerBinding) {
869    reset_binding_budget_windows(binding);
870    binding
871        .metrics
872        .autonomous_decisions_total
873        .fetch_add(1, Ordering::Relaxed);
874    binding
875        .metrics
876        .autonomous_decisions_today
877        .fetch_add(1, Ordering::Relaxed);
878    binding
879        .metrics
880        .autonomous_decisions_hour
881        .fetch_add(1, Ordering::Relaxed);
882}
883
884pub fn expected_predicate_cost_usd_micros(binding: &TriggerBinding) -> u64 {
885    let state = binding
886        .predicate_state
887        .lock()
888        .expect("trigger predicate state poisoned");
889    if let Some(average) = average_cost_sample_micros(&state.recent_cost_usd_micros) {
890        return average;
891    }
892    binding
893        .when_budget
894        .as_ref()
895        .and_then(|budget| budget.max_cost_usd)
896        .map(usd_to_micros)
897        .unwrap_or_default()
898}
899
900pub fn record_predicate_cost_sample(binding: &TriggerBinding, cost_usd_micros: u64) {
901    let mut state = binding
902        .predicate_state
903        .lock()
904        .expect("trigger predicate state poisoned");
905    state.recent_cost_usd_micros.push_back(cost_usd_micros);
906    while state.recent_cost_usd_micros.len() > PREDICATE_COST_WINDOW {
907        state.recent_cost_usd_micros.pop_front();
908    }
909}
910
/// Returns the integer mean of `samples`, rounded down, or `None` when there
/// are no samples.
///
/// The sum is accumulated in `u128` so that many large `u64` samples cannot
/// overflow. The mean never exceeds the largest sample, so the final
/// narrowing cast back to `u64` is lossless.
fn average_cost_sample_micros(samples: &VecDeque<u64>) -> Option<u64> {
    let count = samples.len() as u128;
    (count > 0).then(|| {
        let total = samples.iter().copied().map(u128::from).sum::<u128>();
        (total / count) as u64
    })
}
918
/// Converts a USD amount to whole micro-USD, rounding any fractional micro up.
///
/// NaN, infinities, zero and negative amounts all map to 0. Results too
/// large for `u64` saturate, because float-to-int `as` casts saturate.
pub fn usd_to_micros(value: f64) -> u64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    if value.is_finite() && value > 0.0 {
        (value * MICROS_PER_USD).ceil() as u64
    } else {
        0
    }
}
925
/// Converts whole micro-USD back into a floating-point USD amount.
pub fn micros_to_usd(value: u64) -> f64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    (value as f64) / MICROS_PER_USD
}
929
930fn rollover_orchestrator_budget(state: &mut OrchestratorBudgetState) {
931    let today = utc_day_key();
932    let hour = utc_hour_key();
933    if state.day_utc != today {
934        state.day_utc = today;
935        state.cost_today_usd_micros = 0;
936    }
937    if state.hour_utc != hour {
938        state.hour_utc = hour;
939        state.cost_hour_usd_micros = 0;
940    }
941}
942
943fn utc_day_key() -> i32 {
944    (clock::now_utc().date()
945        - time::Date::from_calendar_date(1970, time::Month::January, 1).expect("valid epoch date"))
946    .whole_days() as i32
947}
948
949fn utc_hour_key() -> i64 {
950    clock::now_utc().unix_timestamp() / 3_600
951}
952
953pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
954    TRIGGER_REGISTRY.with(|slot| {
955        let registry = slot.borrow();
956        let mut snapshots = Vec::new();
957        for bindings in registry.bindings.values() {
958            for binding in bindings {
959                snapshots.push(binding.snapshot());
960            }
961        }
962        snapshots.sort_by(|left, right| {
963            left.id
964                .cmp(&right.id)
965                .then(left.version.cmp(&right.version))
966                .then(left.state.as_str().cmp(right.state.as_str()))
967        });
968        snapshots
969    })
970}
971
972#[allow(clippy::arc_with_non_send_sync)]
973pub fn resolve_trigger_binding_as_of(
974    id: &str,
975    as_of: OffsetDateTime,
976) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
977    let version = binding_version_as_of(id, as_of)?;
978    resolve_trigger_binding_version(id, version)
979}
980
981#[allow(clippy::arc_with_non_send_sync)]
982pub fn resolve_live_or_as_of(
983    id: &str,
984    recorded: RecordedTriggerBinding,
985) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
986    match resolve_live_trigger_binding(id, Some(recorded.version)) {
987        Ok(binding) => Ok(binding),
988        Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
989            let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
990            let mut metadata = BTreeMap::new();
991            metadata.insert("trigger_id".to_string(), serde_json::json!(id));
992            metadata.insert(
993                "recorded_version".to_string(),
994                serde_json::json!(recorded.version),
995            );
996            metadata.insert(
997                "received_at".to_string(),
998                serde_json::json!(recorded
999                    .received_at
1000                    .format(&time::format_description::well_known::Rfc3339)
1001                    .unwrap_or_else(|_| recorded.received_at.to_string())),
1002            );
1003            metadata.insert(
1004                "resolved_version".to_string(),
1005                serde_json::json!(binding.version),
1006            );
1007            crate::events::log_warn_meta(
1008                "replay.binding_version_gc_fallback",
1009                "trigger replay fell back to lifecycle history after binding version GC",
1010                metadata,
1011            );
1012            Ok(binding)
1013        }
1014        Err(error) => Err(error),
1015    }
1016}
1017
1018pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
1019    TRIGGER_REGISTRY.with(|slot| {
1020        let registry = slot.borrow();
1021        registry.binding_version_as_of(id, as_of)
1022    })
1023}
1024
1025#[allow(clippy::arc_with_non_send_sync)]
1026fn resolve_trigger_binding_version(
1027    id: &str,
1028    version: u32,
1029) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1030    TRIGGER_REGISTRY.with(|slot| {
1031        let registry = slot.borrow();
1032        registry
1033            .binding(id, version)
1034            .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
1035                id: id.to_string(),
1036                version,
1037            })
1038    })
1039}
1040
1041#[allow(clippy::arc_with_non_send_sync)]
1042pub fn resolve_live_trigger_binding(
1043    id: &str,
1044    version: Option<u32>,
1045) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
1046    TRIGGER_REGISTRY.with(|slot| {
1047        let registry = slot.borrow();
1048        if let Some(version) = version {
1049            let binding = registry.binding(id, version).ok_or_else(|| {
1050                TriggerRegistryError::UnknownBindingVersion {
1051                    id: id.to_string(),
1052                    version,
1053                }
1054            })?;
1055            if binding.state_snapshot() == TriggerState::Terminated {
1056                return Err(TriggerRegistryError::UnknownBindingVersion {
1057                    id: id.to_string(),
1058                    version,
1059                });
1060            }
1061            return Ok(binding);
1062        }
1063
1064        registry
1065            .live_bindings_any_source(id)
1066            .into_iter()
1067            .max_by_key(|binding| binding.version)
1068            .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
1069    })
1070}
1071
1072/// Snapshot active channel-source trigger bindings whose `channel:` selector
1073/// in `match_events[0]` matches the supplied (scope, scope_id, name) tuple.
1074///
1075/// Used by [`crate::channels`] fan-out (CH-02 / #1872). Returns bindings
1076/// sorted by id then version so dispatch ordering is deterministic.
1077pub(crate) fn channel_bindings_matching(
1078    scope: &str,
1079    scope_id: &str,
1080    name: &str,
1081) -> Vec<Arc<TriggerBinding>> {
1082    TRIGGER_REGISTRY.with(|slot| {
1083        let registry = slot.borrow();
1084        let Some(binding_ids) = registry.by_provider.get("channel") else {
1085            return Vec::new();
1086        };
1087        let mut bindings = Vec::new();
1088        for id in binding_ids {
1089            let Some(versions) = registry.bindings.get(id) else {
1090                continue;
1091            };
1092            for binding in versions {
1093                if binding.state_snapshot() != TriggerState::Active {
1094                    continue;
1095                }
1096                let Some(selector_raw) = binding.match_events.first() else {
1097                    continue;
1098                };
1099                let Ok(selector) = crate::channels::ChannelSelector::parse(selector_raw) else {
1100                    continue;
1101                };
1102                // For tenant-default ("Current") selectors we resolve "current"
1103                // to the scope_id of the emit, since the trigger and the
1104                // emitter share the same registry / runtime context.
1105                if !selector.matches(scope, scope_id, name, scope_id) {
1106                    continue;
1107                }
1108                bindings.push(binding.clone());
1109            }
1110        }
1111        bindings.sort_by(|left, right| {
1112            left.id
1113                .as_str()
1114                .cmp(right.id.as_str())
1115                .then(left.version.cmp(&right.version))
1116        });
1117        bindings
1118    })
1119}
1120
1121pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
1122    TRIGGER_REGISTRY.with(|slot| {
1123        let registry = slot.borrow();
1124        let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
1125            return Vec::new();
1126        };
1127
1128        let mut bindings = Vec::new();
1129        for id in binding_ids {
1130            let Some(versions) = registry.bindings.get(id) else {
1131                continue;
1132            };
1133            for binding in versions {
1134                if binding.state_snapshot() != TriggerState::Active {
1135                    continue;
1136                }
1137                if !binding.match_events.is_empty()
1138                    && !binding.match_events.iter().any(|kind| kind == &event.kind)
1139                {
1140                    continue;
1141                }
1142                bindings.push(binding.clone());
1143            }
1144        }
1145
1146        bindings.sort_by(|left, right| {
1147            left.id
1148                .as_str()
1149                .cmp(right.id.as_str())
1150                .then(left.version.cmp(&right.version))
1151        });
1152        bindings
1153    })
1154}
1155
/// Reconciles the registry's manifest-sourced bindings with `specs`.
///
/// All specs are validated before anything is registered or drained. Each
/// must have `source == Manifest`, a non-blank id, and an id unique within
/// `specs`.
///
/// For every id that currently has a non-terminated manifest binding:
/// - if the id is absent from `specs`, its live manifest versions are drained;
/// - if a live version with the same definition fingerprint is Registering or
///   Active, it is left untouched;
/// - otherwise the live versions are drained and the spec is registered under
///   `next_version_for_spec`.
///
/// Specs for ids with no live manifest binding are registered fresh. Ids that
/// were drained or registered then have old Terminated versions garbage
/// collected.
///
/// # Errors
/// Returns `InvalidSpec` or `DuplicateId` for bad input. Returns `EventLog`
/// when appending the resulting lifecycle events fails.
pub async fn install_manifest_triggers(
    specs: Vec<TriggerBindingSpec>,
) -> Result<(), TriggerRegistryError> {
    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
        let registry = &mut *slot.borrow_mut();
        registry.refresh_runtime_context();
        let mut touched_ids = BTreeSet::new();

        // Validate every spec up front so a bad spec leaves no bindings
        // half-installed.
        let mut incoming = BTreeMap::new();
        for spec in specs {
            let spec_id = spec.id.clone();
            if spec.source != TriggerBindingSource::Manifest {
                return Err(TriggerRegistryError::InvalidSpec(format!(
                    "manifest install received non-manifest trigger '{spec_id}'"
                )));
            }
            if spec_id.trim().is_empty() {
                return Err(TriggerRegistryError::InvalidSpec(
                    "manifest trigger id cannot be empty".to_string(),
                ));
            }
            if incoming.insert(spec_id.clone(), spec).is_some() {
                return Err(TriggerRegistryError::DuplicateId(spec_id));
            }
        }

        let mut lifecycle = Vec::new();
        // Ids that currently own at least one non-terminated manifest binding.
        let existing_ids: Vec<String> = registry
            .bindings
            .iter()
            .filter(|(_, bindings)| {
                bindings.iter().any(|binding| {
                    binding.source == TriggerBindingSource::Manifest
                        && binding.state_snapshot() != TriggerState::Terminated
                })
            })
            .map(|(id, _)| id.clone())
            .collect();

        for id in existing_ids {
            let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
            // Removed from the manifest: drain every live manifest version.
            let Some(spec) = incoming.remove(&id) else {
                for binding in live_manifest {
                    registry.transition_binding_to_draining(&binding, &mut lifecycle);
                }
                touched_ids.insert(id.clone());
                continue;
            };

            // Unchanged definition already serving: nothing to do. A Paused
            // version with the same fingerprint does not count here, so it is
            // drained and the spec re-registered below.
            let has_matching_active = live_manifest.iter().any(|binding| {
                binding.definition_fingerprint == spec.definition_fingerprint
                    && matches!(
                        binding.state_snapshot(),
                        TriggerState::Registering | TriggerState::Active
                    )
            });
            if has_matching_active {
                continue;
            }

            // Changed definition: retire the old versions, then register the new one.
            for binding in live_manifest {
                registry.transition_binding_to_draining(&binding, &mut lifecycle);
            }

            let version = registry.next_version_for_spec(&spec);
            registry.register_binding(spec, version, &mut lifecycle);
            touched_ids.insert(id.clone());
        }

        // Whatever remains in `incoming` has no live manifest binding yet.
        for spec in incoming.into_values() {
            touched_ids.insert(spec.id.clone());
            let version = registry.next_version_for_spec(&spec);
            registry.register_binding(spec, version, &mut lifecycle);
        }

        for id in touched_ids {
            registry.gc_terminated_versions(&id);
        }

        Ok((registry.event_log.clone(), lifecycle))
    })?;

    // The registry borrow ended with the closure above, so it is not held
    // across this await.
    append_lifecycle_events(event_log, events).await
}
1240
1241pub async fn dynamic_register(
1242    mut spec: TriggerBindingSpec,
1243) -> Result<TriggerId, TriggerRegistryError> {
1244    if spec.id.trim().is_empty() {
1245        spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
1246    }
1247    spec.source = TriggerBindingSource::Dynamic;
1248    let id = spec.id.clone();
1249    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1250        let registry = &mut *slot.borrow_mut();
1251        registry.refresh_runtime_context();
1252
1253        if registry.bindings.contains_key(id.as_str()) {
1254            return Err(TriggerRegistryError::DuplicateId(id.clone()));
1255        }
1256
1257        let mut lifecycle = Vec::new();
1258        let version = registry.next_version_for_spec(&spec);
1259        registry.register_binding(spec, version, &mut lifecycle);
1260        Ok((registry.event_log.clone(), lifecycle))
1261    })?;
1262
1263    append_lifecycle_events(event_log, events).await?;
1264    Ok(TriggerId::new(id))
1265}
1266
1267pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
1268    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1269        let registry = &mut *slot.borrow_mut();
1270        let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
1271        if live_dynamic.is_empty() {
1272            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1273        }
1274
1275        let mut lifecycle = Vec::new();
1276        for binding in live_dynamic {
1277            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1278        }
1279        Ok((registry.event_log.clone(), lifecycle))
1280    })?;
1281
1282    append_lifecycle_events(event_log, events).await
1283}
1284
1285pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
1286    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1287        let registry = &mut *slot.borrow_mut();
1288        let live = registry.live_bindings_any_source(id);
1289        if live.is_empty() {
1290            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1291        }
1292
1293        let mut lifecycle = Vec::new();
1294        for binding in live {
1295            registry.transition_binding_to_draining(&binding, &mut lifecycle);
1296        }
1297        Ok((registry.event_log.clone(), lifecycle))
1298    })?;
1299
1300    append_lifecycle_events(event_log, events).await
1301}
1302
1303pub async fn pause(id: &str) -> Result<(), TriggerRegistryError> {
1304    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1305        let registry = &mut *slot.borrow_mut();
1306        let live = registry.live_bindings_any_source(id);
1307        if live.is_empty() {
1308            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1309        }
1310
1311        let mut lifecycle = Vec::new();
1312        for binding in live {
1313            match binding.state_snapshot() {
1314                TriggerState::Registering | TriggerState::Active => {
1315                    registry.transition_binding_state(
1316                        &binding,
1317                        TriggerState::Paused,
1318                        &mut lifecycle,
1319                    );
1320                }
1321                TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {}
1322            }
1323        }
1324        Ok((registry.event_log.clone(), lifecycle))
1325    })?;
1326
1327    append_lifecycle_events(event_log, events).await
1328}
1329
1330pub async fn resume(id: &str) -> Result<(), TriggerRegistryError> {
1331    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1332        let registry = &mut *slot.borrow_mut();
1333        let live = registry.live_bindings_any_source(id);
1334        if live.is_empty() {
1335            return Err(TriggerRegistryError::UnknownId(id.to_string()));
1336        }
1337
1338        let mut lifecycle = Vec::new();
1339        for binding in live {
1340            if binding.state_snapshot() == TriggerState::Paused {
1341                registry.transition_binding_state(&binding, TriggerState::Active, &mut lifecycle);
1342            }
1343        }
1344        Ok((registry.event_log.clone(), lifecycle))
1345    })?;
1346
1347    append_lifecycle_events(event_log, events).await
1348}
1349
1350fn pin_trigger_binding_inner(
1351    id: &str,
1352    version: u32,
1353    allow_terminated: bool,
1354) -> Result<(), TriggerRegistryError> {
1355    TRIGGER_REGISTRY.with(|slot| {
1356        let registry = slot.borrow();
1357        let binding = registry.binding(id, version).ok_or_else(|| {
1358            TriggerRegistryError::UnknownBindingVersion {
1359                id: id.to_string(),
1360                version,
1361            }
1362        })?;
1363        match binding.state_snapshot() {
1364            TriggerState::Paused => Err(TriggerRegistryError::InvalidSpec(format!(
1365                "trigger binding '{id}' version {version} is paused"
1366            ))),
1367            TriggerState::Terminated if !allow_terminated => {
1368                Err(TriggerRegistryError::InvalidSpec(format!(
1369                    "trigger binding '{id}' version {version} is terminated"
1370                )))
1371            }
1372            _ => {
1373                binding.in_flight.fetch_add(1, Ordering::Relaxed);
1374                Ok(())
1375            }
1376        }
1377    })
1378}
1379
1380pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1381    pin_trigger_binding_inner(id, version, false)
1382}
1383
1384pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1385    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
1386        let registry = &mut *slot.borrow_mut();
1387        let binding = registry.binding(id, version).ok_or_else(|| {
1388            TriggerRegistryError::UnknownBindingVersion {
1389                id: id.to_string(),
1390                version,
1391            }
1392        })?;
1393        let current = binding.in_flight.load(Ordering::Relaxed);
1394        if current == 0 {
1395            return Err(TriggerRegistryError::InvalidSpec(format!(
1396                "trigger binding '{id}' version {version} has no in-flight events"
1397            )));
1398        }
1399        binding.in_flight.fetch_sub(1, Ordering::Relaxed);
1400
1401        let mut lifecycle = Vec::new();
1402        registry.maybe_finalize_draining(&binding, &mut lifecycle);
1403        registry.gc_terminated_versions(binding.id.as_str());
1404        Ok((registry.event_log.clone(), lifecycle))
1405    })?;
1406
1407    append_lifecycle_events(event_log, events).await
1408}
1409
1410pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1411    begin_in_flight_inner(id, version, false)
1412}
1413
1414pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
1415    begin_in_flight_inner(id, version, true)
1416}
1417
1418fn begin_in_flight_inner(
1419    id: &str,
1420    version: u32,
1421    allow_terminated: bool,
1422) -> Result<(), TriggerRegistryError> {
1423    pin_trigger_binding_inner(id, version, allow_terminated)?;
1424    TRIGGER_REGISTRY.with(|slot| {
1425        let registry = slot.borrow();
1426        let binding = registry.binding(id, version).ok_or_else(|| {
1427            TriggerRegistryError::UnknownBindingVersion {
1428                id: id.to_string(),
1429                version,
1430            }
1431        })?;
1432        binding.metrics.received.fetch_add(1, Ordering::Relaxed);
1433        *binding
1434            .metrics
1435            .last_received_ms
1436            .lock()
1437            .expect("trigger metrics poisoned") = Some(now_ms());
1438        Ok(())
1439    })
1440}
1441
1442pub async fn finish_in_flight(
1443    id: &str,
1444    version: u32,
1445    outcome: TriggerDispatchOutcome,
1446) -> Result<(), TriggerRegistryError> {
1447    TRIGGER_REGISTRY.with(|slot| {
1448        let registry = &mut *slot.borrow_mut();
1449        let binding = registry.binding(id, version).ok_or_else(|| {
1450            TriggerRegistryError::UnknownBindingVersion {
1451                id: id.to_string(),
1452                version,
1453            }
1454        })?;
1455        let current = binding.in_flight.load(Ordering::Relaxed);
1456        if current == 0 {
1457            return Err(TriggerRegistryError::InvalidSpec(format!(
1458                "trigger binding '{id}' version {version} has no in-flight events"
1459            )));
1460        }
1461        match outcome {
1462            TriggerDispatchOutcome::Dispatched => {
1463                binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
1464            }
1465            TriggerDispatchOutcome::Failed => {
1466                binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
1467            }
1468            TriggerDispatchOutcome::Dlq => {
1469                binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
1470            }
1471        }
1472        Ok(())
1473    })?;
1474
1475    unpin_trigger_binding(id, version).await
1476}
1477
impl TriggerRegistry {
    /// Fills in the event log and secret provider from the ambient runtime
    /// when they are still unset; values that are already present are kept.
    fn refresh_runtime_context(&mut self) {
        if self.event_log.is_none() {
            self.event_log = active_event_log();
        }
        if self.secret_provider.is_none() {
            self.secret_provider = default_secret_provider();
        }
    }

    /// Returns the in-memory binding for `id` at `version`, in any state.
    ///
    /// If several entries carry the same version, the first one in the id's
    /// list is returned.
    fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
        self.bindings
            .get(id)
            .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
            .cloned()
    }

    /// Returns every non-terminated version of `id` registered from `source`.
    fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
        self.bindings
            .get(id)
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .filter(|binding| {
                binding.source == source && binding.state_snapshot() != TriggerState::Terminated
            })
            .cloned()
            .collect()
    }

    /// Returns every non-terminated version of `id`, whatever its source.
    fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
        self.bindings
            .get(id)
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
            .cloned()
            .collect()
    }

    /// Picks the version number under which `spec` will be registered.
    ///
    /// Order of preference:
    /// 1. the version of the first in-memory binding of this id with the same
    ///    definition fingerprint, in any state;
    /// 2. the last version with that fingerprint found in the lifecycle log;
    /// 3. one past the highest version seen in memory or in the log (1 when
    ///    neither has any).
    ///
    /// NOTE(review): case 1 also matches Paused, Draining or Terminated
    /// bindings still held in `self.bindings`. `register_binding` then pushes
    /// a second `TriggerBinding` with the same version, and `binding()` keeps
    /// returning the older entry. Confirm whether reuse should replace or
    /// avoid the stale entry.
    fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
        if let Some(version) = self
            .bindings
            .get(spec.id.as_str())
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
            .map(|binding| binding.version)
        {
            return version;
        }

        let historical =
            self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
        if let Some(version) = historical.matching_version {
            return version;
        }

        self.bindings
            .get(spec.id.as_str())
            .into_iter()
            .flat_map(|bindings| bindings.iter())
            .map(|binding| binding.version)
            .chain(historical.max_version)
            .max()
            .unwrap_or(0)
            + 1
    }

    /// Drops Terminated versions of `id` that are not among the
    /// `TERMINATED_VERSION_RETENTION_LIMIT` highest version numbers held for
    /// the id.
    ///
    /// Non-terminated versions are always kept, but they still occupy slots
    /// in that ranking. The id is removed entirely once no versions remain.
    fn gc_terminated_versions(&mut self, id: &str) {
        let Some(bindings) = self.bindings.get_mut(id) else {
            return;
        };

        let mut newest_versions: Vec<u32> =
            bindings.iter().map(|binding| binding.version).collect();
        newest_versions.sort_unstable_by(|left, right| right.cmp(left));
        newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
        let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();

        bindings.retain(|binding| {
            binding.state_snapshot() != TriggerState::Terminated
                || retained_versions.contains(&binding.version)
        });

        if bindings.is_empty() {
            self.bindings.remove(id);
        }
    }

    /// Scans the lifecycle log for `id`.
    ///
    /// Records the highest version seen, and the version from the last record
    /// (in log order) whose definition fingerprint equals `fingerprint`.
    fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
        let mut lookup = HistoricalVersionLookup::default();
        for record in self.lifecycle_records_for(id) {
            lookup.max_version = Some(
                lookup
                    .max_version
                    .unwrap_or(0)
                    .max(record.transition.version),
            );
            if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
                lookup.matching_version = Some(record.transition.version);
            }
        }
        lookup
    }

    /// Replays the lifecycle log for `id` up to `as_of` and returns the
    /// version that was Active at that instant.
    ///
    /// A version stops counting as active once it moves to Paused, Draining
    /// or Terminated; a later transition back to Active reinstates it.
    ///
    /// # Errors
    /// Returns `InvalidSpec` when no version was active at `as_of`.
    fn binding_version_as_of(
        &self,
        id: &str,
        as_of: OffsetDateTime,
    ) -> Result<u32, TriggerRegistryError> {
        let cutoff_ms = harn_clock::offset_datetime_to_ms(as_of);
        let mut active_version = None;
        for record in self.lifecycle_records_for(id) {
            // Stopping at the first later record assumes the log yields
            // records in occurred_at_ms order — presumably append order; confirm.
            if record.occurred_at_ms > cutoff_ms {
                break;
            }
            match record.transition.to_state {
                TriggerState::Active => active_version = Some(record.transition.version),
                TriggerState::Paused | TriggerState::Draining | TriggerState::Terminated => {
                    if active_version == Some(record.transition.version) {
                        active_version = None;
                    }
                }
                TriggerState::Registering => {}
            }
        }

        active_version.ok_or_else(|| {
            TriggerRegistryError::InvalidSpec(format!(
                "no active trigger binding '{}' at {}",
                id,
                as_of
                    .format(&time::format_description::well_known::Rfc3339)
                    .unwrap_or_else(|_| as_of.to_string())
            ))
        })
    }

    /// Reads the whole `TRIGGERS_LIFECYCLE_TOPIC` topic and returns the
    /// state-transition records for `id`, in the order the log returns them.
    ///
    /// Returns an empty list when no event log is configured or the read
    /// fails. Payloads that do not deserialize are skipped.
    ///
    /// NOTE(review): this drives an async read with
    /// `futures::executor::block_on` and scans the full topic on every call.
    /// It is reached from async entry points (e.g. `install_manifest_triggers`
    /// and `dynamic_register` via `next_version_for_spec`). Confirm the event
    /// log backend cannot stall or deadlock when polled this way.
    fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
        let Some(event_log) = self.event_log.as_ref() else {
            return Vec::new();
        };
        let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
            .expect("static triggers.lifecycle topic should always be valid");
        futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
            .unwrap_or_default()
            .into_iter()
            .filter_map(|(_, event)| {
                let occurred_at_ms = event.occurred_at_ms;
                let transition: LifecycleStateTransitionRecord =
                    serde_json::from_value(event.payload).ok()?;
                (transition.id == id).then_some(HistoricalLifecycleRecord {
                    occurred_at_ms,
                    transition,
                })
            })
            .collect()
    }

    /// Builds binding `version` from `spec` and indexes it by provider and id.
    ///
    /// The binding is appended to the id's version list (without replacing
    /// any existing entry). A Registering event is recorded, followed by a
    /// transition to Active.
    #[allow(clippy::arc_with_non_send_sync)]
    fn register_binding(
        &mut self,
        spec: TriggerBindingSpec,
        version: u32,
        lifecycle: &mut Vec<LogEvent>,
    ) -> Arc<TriggerBinding> {
        let binding = Arc::new(TriggerBinding::new(spec, version));
        self.by_provider
            .entry(binding.provider.as_str().to_string())
            .or_default()
            .insert(binding.id.as_str().to_string());
        self.bindings
            .entry(binding.id.as_str().to_string())
            .or_default()
            .push(binding.clone());
        lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
        self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
        binding
    }

    /// Moves a non-terminated binding to Draining, then on to Terminated if
    /// nothing is in flight. Terminated bindings are left untouched.
    fn transition_binding_to_draining(
        &self,
        binding: &Arc<TriggerBinding>,
        lifecycle: &mut Vec<LogEvent>,
    ) {
        if matches!(binding.state_snapshot(), TriggerState::Terminated) {
            return;
        }
        self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
        self.maybe_finalize_draining(binding, lifecycle);
    }

    /// Terminates a Draining binding once its in-flight count is zero.
    fn maybe_finalize_draining(
        &self,
        binding: &Arc<TriggerBinding>,
        lifecycle: &mut Vec<LogEvent>,
    ) {
        if binding.state_snapshot() == TriggerState::Draining
            && binding.in_flight.load(Ordering::Relaxed) == 0
        {
            self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
        }
    }

    /// Sets the binding's state to `next` and records a lifecycle event.
    ///
    /// A transition to the current state is a no-op and records nothing. The
    /// state lock is released before aggregation cleanup and event recording.
    fn transition_binding_state(
        &self,
        binding: &Arc<TriggerBinding>,
        next: TriggerState,
        lifecycle: &mut Vec<LogEvent>,
    ) {
        let mut state = binding.state.lock().expect("trigger state poisoned");
        let previous = *state;
        if previous == next {
            return;
        }
        *state = next;
        drop(state);
        // CH-04 (#1875): drop any pending aggregation buffers when the
        // binding terminates so a long-lived buffer doesn't fire after
        // the trigger has been removed. Leftover events are discarded —
        // matches the in-flight drain contract elsewhere in the
        // registry (the trigger is gone; we'd have nowhere to dispatch).
        if next == TriggerState::Terminated && binding.aggregation.is_some() {
            let _ = super::aggregation::drop_binding_aggregation(&binding.binding_key());
        }
        lifecycle.push(lifecycle_event(binding, Some(previous), next));
    }
}
1706
1707fn lifecycle_event(
1708    binding: &TriggerBinding,
1709    from_state: Option<TriggerState>,
1710    to_state: TriggerState,
1711) -> LogEvent {
1712    LogEvent::new(
1713        "state_transition",
1714        serde_json::json!({
1715            "id": binding.id.as_str(),
1716            "binding_key": binding.binding_key(),
1717            "version": binding.version,
1718            "provider": binding.provider.as_str(),
1719            "kind": &binding.kind,
1720            "source": binding.source.as_str(),
1721            "handler_kind": binding.handler.kind(),
1722            "definition_fingerprint": &binding.definition_fingerprint,
1723            "from_state": from_state.map(TriggerState::as_str),
1724            "to_state": to_state.as_str(),
1725        }),
1726    )
1727}
1728
1729async fn append_lifecycle_events(
1730    event_log: Option<Arc<AnyEventLog>>,
1731    events: Vec<LogEvent>,
1732) -> Result<(), TriggerRegistryError> {
1733    let Some(event_log) = event_log else {
1734        return Ok(());
1735    };
1736    if events.is_empty() {
1737        return Ok(());
1738    }
1739
1740    let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1741        .expect("static triggers.lifecycle topic should always be valid");
1742    for event in events {
1743        event_log
1744            .append(&topic, event)
1745            .await
1746            .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1747    }
1748    Ok(())
1749}
1750
1751fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1752    configured_default_chain(default_secret_namespace())
1753        .ok()
1754        .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1755}
1756
/// Resolves the namespace used for the default secret-provider chain.
///
/// If `HARN_SECRET_NAMESPACE` is set to valid Unicode and is not blank after
/// trimming, it is returned exactly as given, surrounding whitespace
/// included. Otherwise the result is `harn/<leaf>`. Here `<leaf>` is the
/// final component of the current working directory. It falls back to
/// `workspace` if the directory cannot be determined or has no non-empty
/// UTF-8 final component.
fn default_secret_namespace() -> String {
    let explicit = std::env::var("HARN_SECRET_NAMESPACE")
        .ok()
        .filter(|value| !value.trim().is_empty());
    match explicit {
        Some(namespace) => namespace,
        None => {
            let working_dir = std::env::current_dir().unwrap_or_default();
            let leaf = working_dir
                .file_name()
                .and_then(|component| component.to_str())
                .filter(|component| !component.is_empty())
                .unwrap_or("workspace");
            format!("harn/{leaf}")
        }
    }
}
1772
1773fn now_ms() -> i64 {
1774    clock::now_ms()
1775}
1776
1777#[cfg(test)]
1778mod tests {
    // Unit tests for the trigger registry.
    //
    // The registry functions exercised here (`install_manifest_triggers`,
    // `dynamic_register`, `drain`, `begin_in_flight`/`finish_in_flight`,
    // `snapshot_trigger_bindings`, ...) take no registry handle. Each async
    // test therefore calls `clear_trigger_registry()` on entry and again on
    // exit, so it starts and ends with an empty registry. Tests that install
    // an on-disk event log also call `reset_active_event_log()` at both ends.
1779    use super::*;
1780    use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
1781    use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
1782    use std::rc::Rc;
1783
    /// Fixture: a manifest-sourced GitHub `webhook` binding matching
    /// `issues.opened`, routed to the worker queue `"{id}-queue"`.
    ///
    /// It populates the dedupe key, filter, `daily_cost_usd = 5.0`,
    /// `max_concurrent = 10` and `package_name = "workspace"`. `fingerprint`
    /// becomes the binding's definition fingerprint. The reload tests below
    /// pass a different value per install ("v1", "v2", ...) to obtain
    /// successive binding versions.
1784    fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
1785        TriggerBindingSpec {
1786            id: id.to_string(),
1787            source: TriggerBindingSource::Manifest,
1788            kind: "webhook".to_string(),
1789            provider: ProviderId::from("github"),
1790            autonomy_tier: crate::AutonomyTier::ActAuto,
1791            handler: TriggerHandlerSpec::Worker {
1792                queue: format!("{id}-queue"),
1793            },
1794            dispatch_priority: crate::WorkerQueuePriority::Normal,
1795            when: None,
1796            when_budget: None,
1797            retry: TriggerRetryConfig::default(),
1798            match_events: vec!["issues.opened".to_string()],
1799            dedupe_key: Some("event.dedupe_key".to_string()),
1800            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1801            filter: Some("event.kind".to_string()),
1802            daily_cost_usd: Some(5.0),
1803            hourly_cost_usd: None,
1804            max_autonomous_decisions_per_hour: None,
1805            max_autonomous_decisions_per_day: None,
1806            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1807            max_concurrent: Some(10),
1808            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1809            aggregation: None,
1810            manifest_path: None,
1811            package_name: Some("workspace".to_string()),
1812            definition_fingerprint: fingerprint.to_string(),
1813        }
1814    }
1815
    /// Fixture: a dynamically registered GitHub `webhook` binding matching
    /// `issues.opened`, routed to the worker queue `"{id}-queue"`.
    ///
    /// The dedupe key, filter, cost fields, concurrency limit and package name
    /// are all unset. The definition fingerprint is `"dynamic:{id}"`.
1816    fn dynamic_spec(id: &str) -> TriggerBindingSpec {
1817        TriggerBindingSpec {
1818            id: id.to_string(),
1819            source: TriggerBindingSource::Dynamic,
1820            kind: "webhook".to_string(),
1821            provider: ProviderId::from("github"),
1822            autonomy_tier: crate::AutonomyTier::ActAuto,
1823            handler: TriggerHandlerSpec::Worker {
1824                queue: format!("{id}-queue"),
1825            },
1826            dispatch_priority: crate::WorkerQueuePriority::Normal,
1827            when: None,
1828            when_budget: None,
1829            retry: TriggerRetryConfig::default(),
1830            match_events: vec!["issues.opened".to_string()],
1831            dedupe_key: None,
1832            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1833            filter: None,
1834            daily_cost_usd: None,
1835            hourly_cost_usd: None,
1836            max_autonomous_decisions_per_hour: None,
1837            max_autonomous_decisions_per_day: None,
1838            on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1839            max_concurrent: None,
1840            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1841            aggregation: None,
1842            manifest_path: None,
1843            package_name: None,
1844            definition_fingerprint: format!("dynamic:{id}"),
1845        }
1846    }
1847
    /// A freshly installed manifest trigger is the only binding in the
    /// snapshot. It is `Active` at version 1 with default (zeroed) metrics.
1848    #[tokio::test(flavor = "current_thread")]
1849    async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
1850        clear_trigger_registry();
1851
1852        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1853            .await
1854            .expect("manifest trigger installs");
1855
1856        let snapshots = snapshot_trigger_bindings();
1857        assert_eq!(snapshots.len(), 1);
1858        let binding = &snapshots[0];
1859        assert_eq!(binding.id, "github-new-issue");
1860        assert_eq!(binding.version, 1);
1861        assert_eq!(binding.state, TriggerState::Active);
1862        assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());
1863
1864        clear_trigger_registry();
1865    }
1866
    /// Registering two different dynamic ids returns distinct values.
    /// Registering an id that is already present fails with `DuplicateId`.
1867    #[tokio::test(flavor = "current_thread")]
1868    async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
1869        clear_trigger_registry();
1870
1871        let first = dynamic_register(dynamic_spec("dynamic-a"))
1872            .await
1873            .expect("first dynamic trigger");
1874        let second = dynamic_register(dynamic_spec("dynamic-b"))
1875            .await
1876            .expect("second dynamic trigger");
1877        assert_ne!(first, second);
1878
        // Same id as `first`: must be rejected rather than silently replaced.
1879        let error = dynamic_register(dynamic_spec("dynamic-a"))
1880            .await
1881            .expect_err("duplicate id should fail");
1882        assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));
1883
1884        clear_trigger_registry();
1885    }
1886
    /// Two `u64::MAX` predicate-cost samples would overflow a naive sum. The
    /// expected (average) cost must still come out as `u64::MAX`. This test
    /// builds a standalone `TriggerBinding` and does not touch the registry.
1887    #[test]
1888    fn expected_predicate_cost_average_does_not_overflow() {
1889        let binding = TriggerBinding::new(manifest_spec("costed", "v1"), 1);
1890        record_predicate_cost_sample(&binding, u64::MAX);
1891        record_predicate_cost_sample(&binding, u64::MAX);
1892
1893        assert_eq!(expected_predicate_cost_usd_micros(&binding), u64::MAX);
1894    }
1895
    /// `drain` on a binding with one in-flight event only moves it to
    /// `Draining` (in-flight count still 1). Finishing that event takes it to
    /// `Terminated` with an in-flight count of 0.
1896    #[tokio::test(flavor = "current_thread")]
1897    async fn drain_waits_for_in_flight_events_before_terminating() {
1898        clear_trigger_registry();
1899
1900        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1901            .await
1902            .expect("manifest trigger installs");
1903        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1904
1905        drain("github-new-issue").await.expect("drain succeeds");
1906        let binding = snapshot_trigger_bindings()
1907            .into_iter()
1908            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1909            .expect("binding snapshot");
1910        assert_eq!(binding.state, TriggerState::Draining);
1911        assert_eq!(binding.metrics.in_flight, 1);
1912
1913        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1914            .await
1915            .expect("finish in-flight event");
1916        let binding = snapshot_trigger_bindings()
1917            .into_iter()
1918            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1919            .expect("binding snapshot");
1920        assert_eq!(binding.state, TriggerState::Terminated);
1921        assert_eq!(binding.metrics.in_flight, 0);
1922
1923        clear_trigger_registry();
1924    }
1925
    /// Installing a changed definition (fingerprint "v2") while v1 has an
    /// in-flight event keeps both bindings: v1 `Draining` and v2 `Active`.
    /// v1 reaches `Terminated` once its in-flight event finishes.
1926    #[tokio::test(flavor = "current_thread")]
1927    async fn hot_reload_registers_new_version_while_old_binding_drains() {
1928        clear_trigger_registry();
1929
1930        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1931            .await
1932            .expect("initial manifest trigger installs");
1933        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1934
1935        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1936            .await
1937            .expect("updated manifest trigger installs");
1938
1939        let snapshots = snapshot_trigger_bindings();
1940        assert_eq!(snapshots.len(), 2);
1941        let old = snapshots
1942            .iter()
1943            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1944            .expect("old binding");
1945        let new = snapshots
1946            .iter()
1947            .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
1948            .expect("new binding");
1949        assert_eq!(old.state, TriggerState::Draining);
1950        assert_eq!(new.state, TriggerState::Active);
1951
1952        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1953            .await
1954            .expect("finish old in-flight event");
1955        let old = snapshot_trigger_bindings()
1956            .into_iter()
1957            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1958            .expect("old binding");
1959        assert_eq!(old.state, TriggerState::Terminated);
1960
1961        clear_trigger_registry();
1962    }
1963
    /// v1 is pinned by an in-flight event across the v2 install and is then
    /// released (so, as in the hot-reload test, it terminates). Installing v3
    /// afterwards leaves only versions 2 and 3 in the snapshot, in that order,
    /// so v1 has been dropped. NOTE(review): the retention limit itself is
    /// defined outside this test; this only pins its effect for three versions.
1964    #[tokio::test(flavor = "current_thread")]
1965    async fn gc_drops_terminated_versions_beyond_retention_limit() {
1966        clear_trigger_registry();
1967
1968        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1969            .await
1970            .expect("install v1");
1971        begin_in_flight("github-new-issue", 1).expect("pin v1");
1972
1973        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1974            .await
1975            .expect("install v2");
1976        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1977            .await
1978            .expect("finish v1");
1979
1980        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1981            .await
1982            .expect("install v3");
1983
1984        let snapshots = snapshot_trigger_bindings();
1985        let versions: Vec<u32> = snapshots
1986            .into_iter()
1987            .filter(|binding| binding.id == "github-new-issue")
1988            .map(|binding| binding.version)
1989            .collect();
1990        assert_eq!(versions, vec![2, 3]);
1991
1992        clear_trigger_registry();
1993    }
1994
    /// An install → drain → finish sequence is recorded on the
    /// `triggers.lifecycle` topic as `to_state` values registering → active →
    /// draining → terminated, in that order. This also pins that an install
    /// emits both a `registering` and an `active` transition.
1995    #[tokio::test(flavor = "current_thread")]
1996    async fn lifecycle_transitions_append_to_event_log() {
1997        clear_trigger_registry();
1998        reset_active_event_log();
1999        let tempdir = tempfile::tempdir().expect("tempdir");
2000        let log = install_default_for_base_dir(tempdir.path()).expect("install event log");
2001
2002        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2003            .await
2004            .expect("manifest trigger installs");
2005        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
2006        drain("github-new-issue").await.expect("drain succeeds");
2007        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
2008            .await
2009            .expect("finish event");
2010
2011        let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
2012        let events = log
2013            .read_range(&topic, None, 32)
2014            .await
2015            .expect("read lifecycle events");
        // Keep only each event's `to_state`; events without one are skipped.
2016        let states: Vec<String> = events
2017            .into_iter()
2018            .filter_map(|(_, event)| {
2019                event
2020                    .payload
2021                    .get("to_state")
2022                    .and_then(|value| value.as_str())
2023                    .map(|value| value.to_string())
2024            })
2025            .collect();
2026        assert_eq!(
2027            states,
2028            vec![
2029                "registering".to_string(),
2030                "active".to_string(),
2031                "draining".to_string(),
2032                "terminated".to_string(),
2033            ]
2034        );
2035
2036        reset_active_event_log();
2037        clear_trigger_registry();
2038    }
2039
    /// Simulates a restart: after v1 and v2 are installed, the registry is
    /// cleared and the event log is reopened from the same directory.
    /// Reinstalling the "v2" definition must then come back as version 2. The
    /// only state carried over is the on-disk event log, so the historical
    /// version is recovered from it rather than a fresh number being assigned.
2040    #[tokio::test(flavor = "current_thread")]
2041    async fn version_history_reuses_historical_version_after_restart() {
2042        clear_trigger_registry();
2043        reset_active_event_log();
2044        let tempdir = tempfile::tempdir().expect("tempdir");
2045        install_default_for_base_dir(tempdir.path()).expect("install event log");
2046
2047        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2048            .await
2049            .expect("initial manifest trigger installs");
2050        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2051            .await
2052            .expect("updated manifest trigger installs");
2053
        // "Restart": drop registry state, then reopen the same on-disk log.
2054        clear_trigger_registry();
2055        reset_active_event_log();
2056        install_default_for_base_dir(tempdir.path()).expect("reopen event log");
2057
2058        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2059            .await
2060            .expect("manifest reload reuses historical version");
2061
2062        let binding = snapshot_trigger_bindings()
2063            .into_iter()
2064            .find(|binding| binding.id == "github-new-issue")
2065            .expect("binding snapshot");
2066        assert_eq!(binding.version, 2);
2067
2068        reset_active_event_log();
2069        clear_trigger_registry();
2070    }
2071
    /// `binding_version_as_of` reports version 1 for a timestamp taken before
    /// the v2 reload and version 2 for a timestamp taken after it.
2072    #[tokio::test(flavor = "current_thread")]
2073    async fn binding_version_as_of_reports_historical_active_version() {
2074        clear_trigger_registry();
2075        reset_active_event_log();
2076        let tempdir = tempfile::tempdir().expect("tempdir");
2077        install_default_for_base_dir(tempdir.path()).expect("install event log");
2078
2079        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2080            .await
2081            .expect("initial manifest trigger installs");
2082        // Capture before_reload from real wall-clock. The event log's
2083        // occurred_at_ms also uses the real wall-clock (util::now_ms via
2084        // std::time::SystemTime), so the same reference frame applies.
2085        // The 50ms sleep gives at least one full timer-tick on any POSIX
2086        // platform (even those with a ~15ms tick), ensuring v2's event
2087        // timestamp is strictly after before_reload.
2088        let before_reload = OffsetDateTime::now_utc();
2089        std::thread::sleep(std::time::Duration::from_millis(50));
2090
2091        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2092            .await
2093            .expect("updated manifest trigger installs");
2094        let after_reload = OffsetDateTime::now_utc();
2095
2096        assert_eq!(
2097            binding_version_as_of("github-new-issue", before_reload)
2098                .expect("version before reload"),
2099            1
2100        );
2101        assert_eq!(
2102            binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
2103            2
2104        );
2105
2106        reset_active_event_log();
2107        clear_trigger_registry();
2108    }
2109
    /// v1–v3 are installed, `received_at` is captured, and then v4 is
    /// installed. Resolving a recording that names version 1, which is no
    /// longer live, falls back to the version active at `received_at` (v3).
    /// It also emits a structured `Warn` log in category
    /// `replay.binding_version_gc_fallback`. That log carries `trigger_id`,
    /// `recorded_version`, `resolved_version`, and `received_at` as RFC 3339
    /// text.
2110    #[tokio::test(flavor = "current_thread")]
2111    async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
2112        clear_trigger_registry();
2113        reset_active_event_log();
        // Route structured logs into a collector so the warning can be inspected.
2114        let sink = Rc::new(CollectorSink::new());
2115        clear_event_sinks();
2116        add_event_sink(sink.clone());
2117        let tempdir = tempfile::tempdir().expect("tempdir");
2118        install_default_for_base_dir(tempdir.path()).expect("install event log");
2119
2120        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
2121            .await
2122            .expect("install v1");
2123        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
2124            .await
2125            .expect("install v2");
2126        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
2127            .await
2128            .expect("install v3");
2129        // received_at uses the real wall-clock (same reference frame as the
2130        // event log's occurred_at_ms). The 50ms sleep guarantees v4's event
2131        // timestamp is strictly after received_at on any POSIX platform.
2132        let received_at = OffsetDateTime::now_utc();
2133        std::thread::sleep(std::time::Duration::from_millis(50));
2134        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
2135            .await
2136            .expect("install v4");
2137
2138        let binding = resolve_live_or_as_of(
2139            "github-new-issue",
2140            RecordedTriggerBinding {
2141                version: 1,
2142                received_at,
2143            },
2144        )
2145        .expect("resolve fallback binding");
2146        assert_eq!(binding.version, 3);
2147
2148        let warning = sink
2149            .logs
2150            .borrow()
2151            .iter()
2152            .find(|log| log.category == "replay.binding_version_gc_fallback")
2153            .cloned()
2154            .expect("gc fallback warning");
2155        assert_eq!(warning.level, EventLevel::Warn);
2156        assert_eq!(
2157            warning.metadata.get("trigger_id"),
2158            Some(&serde_json::json!("github-new-issue"))
2159        );
2160        assert_eq!(
2161            warning.metadata.get("recorded_version"),
2162            Some(&serde_json::json!(1))
2163        );
        // Expected value mirrors the RFC 3339 rendering (with a Display
        // fallback) used for `received_at` in the warning metadata.
2164        assert_eq!(
2165            warning.metadata.get("received_at"),
2166            Some(&serde_json::json!(received_at
2167                .format(&time::format_description::well_known::Rfc3339)
2168                .unwrap_or_else(|_| received_at.to_string())))
2169        );
2170        assert_eq!(
2171            warning.metadata.get("resolved_version"),
2172            Some(&serde_json::json!(3))
2173        );
2174
        // Remove the collector sink and reset shared state for later tests.
2175        clear_event_sinks();
2176        crate::events::reset_event_sinks();
2177        reset_active_event_log();
2178        clear_trigger_registry();
2179    }
2180}