Skip to main content

harn_vm/triggers/
registry.rs

1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet};
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use time::OffsetDateTime;
10use uuid::Uuid;
11
12use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
13use crate::llm::trigger_predicate::TriggerPredicateBudget;
14use crate::secrets::{configured_default_chain, SecretProvider};
15use crate::triggers::test_util::clock;
16use crate::trust_graph::AutonomyTier;
17use crate::value::VmClosure;
18
19use super::dispatcher::TriggerRetryConfig;
20use super::flow_control::TriggerFlowControlConfig;
21use super::ProviderId;
22
23#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
24pub struct TriggerId(String);
25
26impl TriggerId {
27    pub fn new(value: impl Into<String>) -> Self {
28        Self(value.into())
29    }
30
31    pub fn as_str(&self) -> &str {
32        &self.0
33    }
34}
35
36impl std::fmt::Display for TriggerId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        self.0.fmt(f)
39    }
40}
41
42#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub enum TriggerState {
45    Registering,
46    Active,
47    Draining,
48    Terminated,
49}
50
51impl TriggerState {
52    pub fn as_str(self) -> &'static str {
53        match self {
54            Self::Registering => "registering",
55            Self::Active => "active",
56            Self::Draining => "draining",
57            Self::Terminated => "terminated",
58        }
59    }
60}
61
62#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum TriggerBindingSource {
65    Manifest,
66    Dynamic,
67}
68
69impl TriggerBindingSource {
70    pub fn as_str(self) -> &'static str {
71        match self {
72            Self::Manifest => "manifest",
73            Self::Dynamic => "dynamic",
74        }
75    }
76}
77
78#[derive(Clone)]
79pub enum TriggerHandlerSpec {
80    Local {
81        raw: String,
82        closure: Rc<VmClosure>,
83    },
84    A2a {
85        target: String,
86        allow_cleartext: bool,
87    },
88    Worker {
89        queue: String,
90    },
91}
92
93impl std::fmt::Debug for TriggerHandlerSpec {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        match self {
96            Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
97            Self::A2a {
98                target,
99                allow_cleartext,
100            } => f
101                .debug_struct("A2a")
102                .field("target", target)
103                .field("allow_cleartext", allow_cleartext)
104                .finish(),
105            Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
106        }
107    }
108}
109
110impl TriggerHandlerSpec {
111    pub fn kind(&self) -> &'static str {
112        match self {
113            Self::Local { .. } => "local",
114            Self::A2a { .. } => "a2a",
115            Self::Worker { .. } => "worker",
116        }
117    }
118}
119
120#[derive(Clone)]
121pub struct TriggerPredicateSpec {
122    pub raw: String,
123    pub closure: Rc<VmClosure>,
124}
125
126impl std::fmt::Debug for TriggerPredicateSpec {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        f.debug_struct("TriggerPredicateSpec")
129            .field("raw", &self.raw)
130            .finish()
131    }
132}
133
134#[derive(Clone, Debug)]
135pub struct TriggerBindingSpec {
136    pub id: String,
137    pub source: TriggerBindingSource,
138    pub kind: String,
139    pub provider: ProviderId,
140    pub autonomy_tier: AutonomyTier,
141    pub handler: TriggerHandlerSpec,
142    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
143    pub when: Option<TriggerPredicateSpec>,
144    pub when_budget: Option<TriggerPredicateBudget>,
145    pub retry: TriggerRetryConfig,
146    pub match_events: Vec<String>,
147    pub dedupe_key: Option<String>,
148    pub dedupe_retention_days: u32,
149    pub filter: Option<String>,
150    pub daily_cost_usd: Option<f64>,
151    pub max_concurrent: Option<u32>,
152    pub flow_control: TriggerFlowControlConfig,
153    pub manifest_path: Option<PathBuf>,
154    pub package_name: Option<String>,
155    pub definition_fingerprint: String,
156}
157
158#[derive(Debug)]
159pub struct TriggerMetrics {
160    pub received: AtomicU64,
161    pub dispatched: AtomicU64,
162    pub failed: AtomicU64,
163    pub dlq: AtomicU64,
164    pub last_received_ms: Mutex<Option<i64>>,
165    pub cost_total_usd_micros: AtomicU64,
166    pub cost_today_usd_micros: AtomicU64,
167}
168
169impl Default for TriggerMetrics {
170    fn default() -> Self {
171        Self {
172            received: AtomicU64::new(0),
173            dispatched: AtomicU64::new(0),
174            failed: AtomicU64::new(0),
175            dlq: AtomicU64::new(0),
176            last_received_ms: Mutex::new(None),
177            cost_total_usd_micros: AtomicU64::new(0),
178            cost_today_usd_micros: AtomicU64::new(0),
179        }
180    }
181}
182
183#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
184pub struct TriggerMetricsSnapshot {
185    pub received: u64,
186    pub dispatched: u64,
187    pub failed: u64,
188    pub dlq: u64,
189    pub in_flight: u64,
190    pub last_received_ms: Option<i64>,
191    pub cost_total_usd_micros: u64,
192    pub cost_today_usd_micros: u64,
193}
194
195pub struct TriggerBinding {
196    pub id: TriggerId,
197    pub version: u32,
198    pub source: TriggerBindingSource,
199    pub kind: String,
200    pub provider: ProviderId,
201    pub autonomy_tier: AutonomyTier,
202    pub handler: TriggerHandlerSpec,
203    pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
204    pub when: Option<TriggerPredicateSpec>,
205    pub when_budget: Option<TriggerPredicateBudget>,
206    pub retry: TriggerRetryConfig,
207    pub match_events: Vec<String>,
208    pub dedupe_key: Option<String>,
209    pub dedupe_retention_days: u32,
210    pub filter: Option<String>,
211    pub daily_cost_usd: Option<f64>,
212    pub max_concurrent: Option<u32>,
213    pub flow_control: TriggerFlowControlConfig,
214    pub manifest_path: Option<PathBuf>,
215    pub package_name: Option<String>,
216    pub definition_fingerprint: String,
217    pub state: Mutex<TriggerState>,
218    pub metrics: TriggerMetrics,
219    pub in_flight: AtomicU64,
220    pub cancel_token: Arc<AtomicBool>,
221    pub predicate_state: Mutex<TriggerPredicateState>,
222}
223
224#[derive(Clone, Debug, Default)]
225pub struct TriggerPredicateState {
226    pub budget_day_utc: Option<i32>,
227    pub consecutive_failures: u32,
228    pub breaker_open_until_ms: Option<i64>,
229}
230
231impl std::fmt::Debug for TriggerBinding {
232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233        f.debug_struct("TriggerBinding")
234            .field("id", &self.id)
235            .field("version", &self.version)
236            .field("source", &self.source)
237            .field("kind", &self.kind)
238            .field("provider", &self.provider)
239            .field("handler_kind", &self.handler.kind())
240            .field("state", &self.state_snapshot())
241            .finish()
242    }
243}
244
245impl TriggerBinding {
246    pub fn snapshot(&self) -> TriggerBindingSnapshot {
247        TriggerBindingSnapshot {
248            id: self.id.as_str().to_string(),
249            version: self.version,
250            source: self.source,
251            kind: self.kind.clone(),
252            provider: self.provider.as_str().to_string(),
253            autonomy_tier: self.autonomy_tier,
254            handler_kind: self.handler.kind().to_string(),
255            state: self.state_snapshot(),
256            metrics: self.metrics_snapshot(),
257        }
258    }
259
260    fn new(spec: TriggerBindingSpec, version: u32) -> Self {
261        Self {
262            id: TriggerId::new(spec.id),
263            version,
264            source: spec.source,
265            kind: spec.kind,
266            provider: spec.provider,
267            autonomy_tier: spec.autonomy_tier,
268            handler: spec.handler,
269            dispatch_priority: spec.dispatch_priority,
270            when: spec.when,
271            when_budget: spec.when_budget,
272            retry: spec.retry,
273            match_events: spec.match_events,
274            dedupe_key: spec.dedupe_key,
275            dedupe_retention_days: spec.dedupe_retention_days,
276            filter: spec.filter,
277            daily_cost_usd: spec.daily_cost_usd,
278            max_concurrent: spec.max_concurrent,
279            flow_control: spec.flow_control,
280            manifest_path: spec.manifest_path,
281            package_name: spec.package_name,
282            definition_fingerprint: spec.definition_fingerprint,
283            state: Mutex::new(TriggerState::Registering),
284            metrics: TriggerMetrics::default(),
285            in_flight: AtomicU64::new(0),
286            cancel_token: Arc::new(AtomicBool::new(false)),
287            predicate_state: Mutex::new(TriggerPredicateState::default()),
288        }
289    }
290
291    pub fn binding_key(&self) -> String {
292        format!("{}@v{}", self.id.as_str(), self.version)
293    }
294
295    pub fn state_snapshot(&self) -> TriggerState {
296        *self.state.lock().expect("trigger state poisoned")
297    }
298
299    pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
300        TriggerMetricsSnapshot {
301            received: self.metrics.received.load(Ordering::Relaxed),
302            dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
303            failed: self.metrics.failed.load(Ordering::Relaxed),
304            dlq: self.metrics.dlq.load(Ordering::Relaxed),
305            in_flight: self.in_flight.load(Ordering::Relaxed),
306            last_received_ms: *self
307                .metrics
308                .last_received_ms
309                .lock()
310                .expect("trigger metrics poisoned"),
311            cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
312            cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
313        }
314    }
315}
316
317#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
318pub struct TriggerBindingSnapshot {
319    pub id: String,
320    pub version: u32,
321    pub source: TriggerBindingSource,
322    pub kind: String,
323    pub provider: String,
324    pub autonomy_tier: AutonomyTier,
325    pub handler_kind: String,
326    pub state: TriggerState,
327    pub metrics: TriggerMetricsSnapshot,
328}
329
330#[derive(Clone, Copy, Debug, PartialEq, Eq)]
331pub enum TriggerDispatchOutcome {
332    Dispatched,
333    Failed,
334    Dlq,
335}
336
337#[derive(Debug)]
338pub enum TriggerRegistryError {
339    DuplicateId(String),
340    InvalidSpec(String),
341    UnknownId(String),
342    UnknownBindingVersion { id: String, version: u32 },
343    EventLog(String),
344}
345
346impl std::fmt::Display for TriggerRegistryError {
347    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348        match self {
349            Self::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
350            Self::InvalidSpec(message) | Self::EventLog(message) => f.write_str(message),
351            Self::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
352            Self::UnknownBindingVersion { id, version } => {
353                write!(f, "unknown trigger binding '{id}' version {version}")
354            }
355        }
356    }
357}
358
359impl std::error::Error for TriggerRegistryError {}
360
361#[derive(Default)]
362pub struct TriggerRegistry {
363    bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
364    by_provider: BTreeMap<String, BTreeSet<String>>,
365    event_log: Option<Arc<AnyEventLog>>,
366    secret_provider: Option<Arc<dyn SecretProvider>>,
367}
368
369thread_local! {
370    static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
371}
372
373const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;
374
375const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
376
377#[derive(Clone, Debug, Deserialize)]
378struct LifecycleStateTransitionRecord {
379    id: String,
380    version: u32,
381    #[serde(default)]
382    definition_fingerprint: Option<String>,
383    to_state: TriggerState,
384}
385
386#[derive(Clone, Debug)]
387struct HistoricalLifecycleRecord {
388    occurred_at_ms: i64,
389    transition: LifecycleStateTransitionRecord,
390}
391
392#[derive(Clone, Copy, Debug, PartialEq, Eq)]
393pub struct RecordedTriggerBinding {
394    pub version: u32,
395    pub received_at: OffsetDateTime,
396}
397
398#[derive(Clone, Copy, Debug, Default)]
399struct HistoricalVersionLookup {
400    matching_version: Option<u32>,
401    max_version: Option<u32>,
402}
403
404pub fn clear_trigger_registry() {
405    TRIGGER_REGISTRY.with(|slot| {
406        *slot.borrow_mut() = TriggerRegistry::default();
407    });
408}
409
410pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
411    TRIGGER_REGISTRY.with(|slot| {
412        let registry = slot.borrow();
413        let mut snapshots = Vec::new();
414        for bindings in registry.bindings.values() {
415            for binding in bindings {
416                snapshots.push(binding.snapshot());
417            }
418        }
419        snapshots.sort_by(|left, right| {
420            left.id
421                .cmp(&right.id)
422                .then(left.version.cmp(&right.version))
423                .then(left.state.as_str().cmp(right.state.as_str()))
424        });
425        snapshots
426    })
427}
428
429#[allow(clippy::arc_with_non_send_sync)]
430pub fn resolve_trigger_binding_as_of(
431    id: &str,
432    as_of: OffsetDateTime,
433) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
434    let version = binding_version_as_of(id, as_of)?;
435    resolve_trigger_binding_version(id, version)
436}
437
438#[allow(clippy::arc_with_non_send_sync)]
439pub fn resolve_live_or_as_of(
440    id: &str,
441    recorded: RecordedTriggerBinding,
442) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
443    match resolve_live_trigger_binding(id, Some(recorded.version)) {
444        Ok(binding) => Ok(binding),
445        Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
446            let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
447            let mut metadata = BTreeMap::new();
448            metadata.insert("trigger_id".to_string(), serde_json::json!(id));
449            metadata.insert(
450                "recorded_version".to_string(),
451                serde_json::json!(recorded.version),
452            );
453            metadata.insert(
454                "received_at".to_string(),
455                serde_json::json!(recorded
456                    .received_at
457                    .format(&time::format_description::well_known::Rfc3339)
458                    .unwrap_or_else(|_| recorded.received_at.to_string())),
459            );
460            metadata.insert(
461                "resolved_version".to_string(),
462                serde_json::json!(binding.version),
463            );
464            crate::events::log_warn_meta(
465                "replay.binding_version_gc_fallback",
466                "trigger replay fell back to lifecycle history after binding version GC",
467                metadata,
468            );
469            Ok(binding)
470        }
471        Err(error) => Err(error),
472    }
473}
474
475pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
476    TRIGGER_REGISTRY.with(|slot| {
477        let registry = slot.borrow();
478        registry.binding_version_as_of(id, as_of)
479    })
480}
481
482#[allow(clippy::arc_with_non_send_sync)]
483fn resolve_trigger_binding_version(
484    id: &str,
485    version: u32,
486) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
487    TRIGGER_REGISTRY.with(|slot| {
488        let registry = slot.borrow();
489        registry
490            .binding(id, version)
491            .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
492                id: id.to_string(),
493                version,
494            })
495    })
496}
497
498#[allow(clippy::arc_with_non_send_sync)]
499pub fn resolve_live_trigger_binding(
500    id: &str,
501    version: Option<u32>,
502) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
503    TRIGGER_REGISTRY.with(|slot| {
504        let registry = slot.borrow();
505        if let Some(version) = version {
506            let binding = registry.binding(id, version).ok_or_else(|| {
507                TriggerRegistryError::UnknownBindingVersion {
508                    id: id.to_string(),
509                    version,
510                }
511            })?;
512            if binding.state_snapshot() == TriggerState::Terminated {
513                return Err(TriggerRegistryError::UnknownBindingVersion {
514                    id: id.to_string(),
515                    version,
516                });
517            }
518            return Ok(binding);
519        }
520
521        registry
522            .live_bindings_any_source(id)
523            .into_iter()
524            .max_by_key(|binding| binding.version)
525            .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
526    })
527}
528
529pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
530    TRIGGER_REGISTRY.with(|slot| {
531        let registry = slot.borrow();
532        let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
533            return Vec::new();
534        };
535
536        let mut bindings = Vec::new();
537        for id in binding_ids {
538            let Some(versions) = registry.bindings.get(id) else {
539                continue;
540            };
541            for binding in versions {
542                if binding.state_snapshot() != TriggerState::Active {
543                    continue;
544                }
545                if !binding.match_events.is_empty()
546                    && !binding.match_events.iter().any(|kind| kind == &event.kind)
547                {
548                    continue;
549                }
550                bindings.push(binding.clone());
551            }
552        }
553
554        bindings.sort_by(|left, right| {
555            left.id
556                .as_str()
557                .cmp(right.id.as_str())
558                .then(left.version.cmp(&right.version))
559        });
560        bindings
561    })
562}
563
564pub async fn install_manifest_triggers(
565    specs: Vec<TriggerBindingSpec>,
566) -> Result<(), TriggerRegistryError> {
567    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
568        let registry = &mut *slot.borrow_mut();
569        registry.refresh_runtime_context();
570        let mut touched_ids = BTreeSet::new();
571
572        let mut incoming = BTreeMap::new();
573        for spec in specs {
574            let spec_id = spec.id.clone();
575            if spec.source != TriggerBindingSource::Manifest {
576                return Err(TriggerRegistryError::InvalidSpec(format!(
577                    "manifest install received non-manifest trigger '{}'",
578                    spec_id
579                )));
580            }
581            if spec_id.trim().is_empty() {
582                return Err(TriggerRegistryError::InvalidSpec(
583                    "manifest trigger id cannot be empty".to_string(),
584                ));
585            }
586            if incoming.insert(spec_id.clone(), spec).is_some() {
587                return Err(TriggerRegistryError::DuplicateId(spec_id));
588            }
589        }
590
591        let mut lifecycle = Vec::new();
592        let existing_ids: Vec<String> = registry
593            .bindings
594            .iter()
595            .filter(|(_, bindings)| {
596                bindings.iter().any(|binding| {
597                    binding.source == TriggerBindingSource::Manifest
598                        && binding.state_snapshot() != TriggerState::Terminated
599                })
600            })
601            .map(|(id, _)| id.clone())
602            .collect();
603
604        for id in existing_ids {
605            let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
606            let Some(spec) = incoming.remove(&id) else {
607                for binding in live_manifest {
608                    registry.transition_binding_to_draining(&binding, &mut lifecycle);
609                }
610                touched_ids.insert(id.clone());
611                continue;
612            };
613
614            let has_matching_active = live_manifest.iter().any(|binding| {
615                binding.definition_fingerprint == spec.definition_fingerprint
616                    && matches!(
617                        binding.state_snapshot(),
618                        TriggerState::Registering | TriggerState::Active
619                    )
620            });
621            if has_matching_active {
622                continue;
623            }
624
625            for binding in live_manifest {
626                registry.transition_binding_to_draining(&binding, &mut lifecycle);
627            }
628
629            let version = registry.next_version_for_spec(&spec);
630            registry.register_binding(spec, version, &mut lifecycle);
631            touched_ids.insert(id.clone());
632        }
633
634        for spec in incoming.into_values() {
635            touched_ids.insert(spec.id.clone());
636            let version = registry.next_version_for_spec(&spec);
637            registry.register_binding(spec, version, &mut lifecycle);
638        }
639
640        for id in touched_ids {
641            registry.gc_terminated_versions(&id);
642        }
643
644        Ok((registry.event_log.clone(), lifecycle))
645    })?;
646
647    append_lifecycle_events(event_log, events).await
648}
649
650pub async fn dynamic_register(
651    mut spec: TriggerBindingSpec,
652) -> Result<TriggerId, TriggerRegistryError> {
653    if spec.id.trim().is_empty() {
654        spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
655    }
656    spec.source = TriggerBindingSource::Dynamic;
657    let id = spec.id.clone();
658    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
659        let registry = &mut *slot.borrow_mut();
660        registry.refresh_runtime_context();
661
662        if registry.bindings.contains_key(id.as_str()) {
663            return Err(TriggerRegistryError::DuplicateId(id.clone()));
664        }
665
666        let mut lifecycle = Vec::new();
667        let version = registry.next_version_for_spec(&spec);
668        registry.register_binding(spec, version, &mut lifecycle);
669        Ok((registry.event_log.clone(), lifecycle))
670    })?;
671
672    append_lifecycle_events(event_log, events).await?;
673    Ok(TriggerId::new(id))
674}
675
676pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
677    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
678        let registry = &mut *slot.borrow_mut();
679        let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
680        if live_dynamic.is_empty() {
681            return Err(TriggerRegistryError::UnknownId(id.to_string()));
682        }
683
684        let mut lifecycle = Vec::new();
685        for binding in live_dynamic {
686            registry.transition_binding_to_draining(&binding, &mut lifecycle);
687        }
688        Ok((registry.event_log.clone(), lifecycle))
689    })?;
690
691    append_lifecycle_events(event_log, events).await
692}
693
694pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
695    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
696        let registry = &mut *slot.borrow_mut();
697        let live = registry.live_bindings_any_source(id);
698        if live.is_empty() {
699            return Err(TriggerRegistryError::UnknownId(id.to_string()));
700        }
701
702        let mut lifecycle = Vec::new();
703        for binding in live {
704            registry.transition_binding_to_draining(&binding, &mut lifecycle);
705        }
706        Ok((registry.event_log.clone(), lifecycle))
707    })?;
708
709    append_lifecycle_events(event_log, events).await
710}
711
712fn pin_trigger_binding_inner(
713    id: &str,
714    version: u32,
715    allow_terminated: bool,
716) -> Result<(), TriggerRegistryError> {
717    TRIGGER_REGISTRY.with(|slot| {
718        let registry = slot.borrow();
719        let binding = registry.binding(id, version).ok_or_else(|| {
720            TriggerRegistryError::UnknownBindingVersion {
721                id: id.to_string(),
722                version,
723            }
724        })?;
725        match binding.state_snapshot() {
726            TriggerState::Terminated if !allow_terminated => {
727                Err(TriggerRegistryError::InvalidSpec(format!(
728                    "trigger binding '{}' version {} is terminated",
729                    id, version
730                )))
731            }
732            _ => {
733                binding.in_flight.fetch_add(1, Ordering::Relaxed);
734                Ok(())
735            }
736        }
737    })
738}
739
740pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
741    pin_trigger_binding_inner(id, version, false)
742}
743
744pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
745    let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
746        let registry = &mut *slot.borrow_mut();
747        let binding = registry.binding(id, version).ok_or_else(|| {
748            TriggerRegistryError::UnknownBindingVersion {
749                id: id.to_string(),
750                version,
751            }
752        })?;
753        let current = binding.in_flight.load(Ordering::Relaxed);
754        if current == 0 {
755            return Err(TriggerRegistryError::InvalidSpec(format!(
756                "trigger binding '{}' version {} has no in-flight events",
757                id, version
758            )));
759        }
760        binding.in_flight.fetch_sub(1, Ordering::Relaxed);
761
762        let mut lifecycle = Vec::new();
763        registry.maybe_finalize_draining(&binding, &mut lifecycle);
764        registry.gc_terminated_versions(binding.id.as_str());
765        Ok((registry.event_log.clone(), lifecycle))
766    })?;
767
768    append_lifecycle_events(event_log, events).await
769}
770
771pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
772    begin_in_flight_inner(id, version, false)
773}
774
775pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
776    begin_in_flight_inner(id, version, true)
777}
778
779fn begin_in_flight_inner(
780    id: &str,
781    version: u32,
782    allow_terminated: bool,
783) -> Result<(), TriggerRegistryError> {
784    pin_trigger_binding_inner(id, version, allow_terminated)?;
785    TRIGGER_REGISTRY.with(|slot| {
786        let registry = slot.borrow();
787        let binding = registry.binding(id, version).ok_or_else(|| {
788            TriggerRegistryError::UnknownBindingVersion {
789                id: id.to_string(),
790                version,
791            }
792        })?;
793        binding.metrics.received.fetch_add(1, Ordering::Relaxed);
794        *binding
795            .metrics
796            .last_received_ms
797            .lock()
798            .expect("trigger metrics poisoned") = Some(now_ms());
799        Ok(())
800    })
801}
802
803pub async fn finish_in_flight(
804    id: &str,
805    version: u32,
806    outcome: TriggerDispatchOutcome,
807) -> Result<(), TriggerRegistryError> {
808    TRIGGER_REGISTRY.with(|slot| {
809        let registry = &mut *slot.borrow_mut();
810        let binding = registry.binding(id, version).ok_or_else(|| {
811            TriggerRegistryError::UnknownBindingVersion {
812                id: id.to_string(),
813                version,
814            }
815        })?;
816        let current = binding.in_flight.load(Ordering::Relaxed);
817        if current == 0 {
818            return Err(TriggerRegistryError::InvalidSpec(format!(
819                "trigger binding '{}' version {} has no in-flight events",
820                id, version
821            )));
822        }
823        match outcome {
824            TriggerDispatchOutcome::Dispatched => {
825                binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
826            }
827            TriggerDispatchOutcome::Failed => {
828                binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
829            }
830            TriggerDispatchOutcome::Dlq => {
831                binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
832            }
833        }
834        Ok(())
835    })?;
836
837    unpin_trigger_binding(id, version).await
838}
839
840impl TriggerRegistry {
841    fn refresh_runtime_context(&mut self) {
842        if self.event_log.is_none() {
843            self.event_log = active_event_log();
844        }
845        if self.secret_provider.is_none() {
846            self.secret_provider = default_secret_provider();
847        }
848    }
849
850    fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
851        self.bindings
852            .get(id)
853            .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
854            .cloned()
855    }
856
857    fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
858        self.bindings
859            .get(id)
860            .into_iter()
861            .flat_map(|bindings| bindings.iter())
862            .filter(|binding| {
863                binding.source == source && binding.state_snapshot() != TriggerState::Terminated
864            })
865            .cloned()
866            .collect()
867    }
868
869    fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
870        self.bindings
871            .get(id)
872            .into_iter()
873            .flat_map(|bindings| bindings.iter())
874            .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
875            .cloned()
876            .collect()
877    }
878
879    fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
880        if let Some(version) = self
881            .bindings
882            .get(spec.id.as_str())
883            .into_iter()
884            .flat_map(|bindings| bindings.iter())
885            .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
886            .map(|binding| binding.version)
887        {
888            return version;
889        }
890
891        let historical =
892            self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
893        if let Some(version) = historical.matching_version {
894            return version;
895        }
896
897        self.bindings
898            .get(spec.id.as_str())
899            .into_iter()
900            .flat_map(|bindings| bindings.iter())
901            .map(|binding| binding.version)
902            .chain(historical.max_version)
903            .max()
904            .unwrap_or(0)
905            + 1
906    }
907
908    fn gc_terminated_versions(&mut self, id: &str) {
909        let Some(bindings) = self.bindings.get_mut(id) else {
910            return;
911        };
912
913        let mut newest_versions: Vec<u32> =
914            bindings.iter().map(|binding| binding.version).collect();
915        newest_versions.sort_unstable_by(|left, right| right.cmp(left));
916        newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
917        let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();
918
919        bindings.retain(|binding| {
920            binding.state_snapshot() != TriggerState::Terminated
921                || retained_versions.contains(&binding.version)
922        });
923
924        if bindings.is_empty() {
925            self.bindings.remove(id);
926        }
927    }
928
929    fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
930        let mut lookup = HistoricalVersionLookup::default();
931        for record in self.lifecycle_records_for(id) {
932            lookup.max_version = Some(
933                lookup
934                    .max_version
935                    .unwrap_or(0)
936                    .max(record.transition.version),
937            );
938            if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
939                lookup.matching_version = Some(record.transition.version);
940            }
941        }
942        lookup
943    }
944
945    fn binding_version_as_of(
946        &self,
947        id: &str,
948        as_of: OffsetDateTime,
949    ) -> Result<u32, TriggerRegistryError> {
950        let cutoff_ms = (as_of.unix_timestamp_nanos() / 1_000_000) as i64;
951        let mut active_version = None;
952        for record in self.lifecycle_records_for(id) {
953            if record.occurred_at_ms > cutoff_ms {
954                break;
955            }
956            match record.transition.to_state {
957                TriggerState::Active => active_version = Some(record.transition.version),
958                TriggerState::Draining | TriggerState::Terminated => {
959                    if active_version == Some(record.transition.version) {
960                        active_version = None;
961                    }
962                }
963                TriggerState::Registering => {}
964            }
965        }
966
967        active_version.ok_or_else(|| {
968            TriggerRegistryError::InvalidSpec(format!(
969                "no active trigger binding '{}' at {}",
970                id,
971                as_of
972                    .format(&time::format_description::well_known::Rfc3339)
973                    .unwrap_or_else(|_| as_of.to_string())
974            ))
975        })
976    }
977
978    fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
979        let Some(event_log) = self.event_log.as_ref() else {
980            return Vec::new();
981        };
982        let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
983            .expect("static triggers.lifecycle topic should always be valid");
984        futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
985            .unwrap_or_default()
986            .into_iter()
987            .filter_map(|(_, event)| {
988                let occurred_at_ms = event.occurred_at_ms;
989                let transition: LifecycleStateTransitionRecord =
990                    serde_json::from_value(event.payload).ok()?;
991                (transition.id == id).then_some(HistoricalLifecycleRecord {
992                    occurred_at_ms,
993                    transition,
994                })
995            })
996            .collect()
997    }
998
999    #[allow(clippy::arc_with_non_send_sync)]
1000    fn register_binding(
1001        &mut self,
1002        spec: TriggerBindingSpec,
1003        version: u32,
1004        lifecycle: &mut Vec<LogEvent>,
1005    ) -> Arc<TriggerBinding> {
1006        let binding = Arc::new(TriggerBinding::new(spec, version));
1007        self.by_provider
1008            .entry(binding.provider.as_str().to_string())
1009            .or_default()
1010            .insert(binding.id.as_str().to_string());
1011        self.bindings
1012            .entry(binding.id.as_str().to_string())
1013            .or_default()
1014            .push(binding.clone());
1015        lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
1016        self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
1017        binding
1018    }
1019
1020    fn transition_binding_to_draining(
1021        &self,
1022        binding: &Arc<TriggerBinding>,
1023        lifecycle: &mut Vec<LogEvent>,
1024    ) {
1025        if matches!(binding.state_snapshot(), TriggerState::Terminated) {
1026            return;
1027        }
1028        self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
1029        self.maybe_finalize_draining(binding, lifecycle);
1030    }
1031
1032    fn maybe_finalize_draining(
1033        &self,
1034        binding: &Arc<TriggerBinding>,
1035        lifecycle: &mut Vec<LogEvent>,
1036    ) {
1037        if binding.state_snapshot() == TriggerState::Draining
1038            && binding.in_flight.load(Ordering::Relaxed) == 0
1039        {
1040            self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
1041        }
1042    }
1043
1044    fn transition_binding_state(
1045        &self,
1046        binding: &Arc<TriggerBinding>,
1047        next: TriggerState,
1048        lifecycle: &mut Vec<LogEvent>,
1049    ) {
1050        let mut state = binding.state.lock().expect("trigger state poisoned");
1051        let previous = *state;
1052        if previous == next {
1053            return;
1054        }
1055        *state = next;
1056        drop(state);
1057        lifecycle.push(lifecycle_event(binding, Some(previous), next));
1058    }
1059}
1060
1061fn lifecycle_event(
1062    binding: &TriggerBinding,
1063    from_state: Option<TriggerState>,
1064    to_state: TriggerState,
1065) -> LogEvent {
1066    LogEvent::new(
1067        "state_transition",
1068        serde_json::json!({
1069            "id": binding.id.as_str(),
1070            "binding_key": binding.binding_key(),
1071            "version": binding.version,
1072            "provider": binding.provider.as_str(),
1073            "kind": &binding.kind,
1074            "source": binding.source.as_str(),
1075            "handler_kind": binding.handler.kind(),
1076            "definition_fingerprint": &binding.definition_fingerprint,
1077            "from_state": from_state.map(TriggerState::as_str),
1078            "to_state": to_state.as_str(),
1079        }),
1080    )
1081}
1082
1083async fn append_lifecycle_events(
1084    event_log: Option<Arc<AnyEventLog>>,
1085    events: Vec<LogEvent>,
1086) -> Result<(), TriggerRegistryError> {
1087    let Some(event_log) = event_log else {
1088        return Ok(());
1089    };
1090    if events.is_empty() {
1091        return Ok(());
1092    }
1093
1094    let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1095        .expect("static triggers.lifecycle topic should always be valid");
1096    for event in events {
1097        event_log
1098            .append(&topic, event)
1099            .await
1100            .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1101    }
1102    Ok(())
1103}
1104
1105fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1106    configured_default_chain(default_secret_namespace())
1107        .ok()
1108        .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1109}
1110
1111fn default_secret_namespace() -> String {
1112    if let Ok(namespace) = std::env::var("HARN_SECRET_NAMESPACE") {
1113        if !namespace.trim().is_empty() {
1114            return namespace;
1115        }
1116    }
1117
1118    let cwd = std::env::current_dir().unwrap_or_default();
1119    let leaf = cwd
1120        .file_name()
1121        .and_then(|name| name.to_str())
1122        .filter(|name| !name.is_empty())
1123        .unwrap_or("workspace");
1124    format!("harn/{leaf}")
1125}
1126
1127fn now_ms() -> i64 {
1128    clock::now_ms()
1129}
1130
1131#[cfg(test)]
1132mod tests {
1133    use super::*;
1134    use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
1135    use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
1136    use std::rc::Rc;
1137    use time::OffsetDateTime;
1138
1139    fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
1140        TriggerBindingSpec {
1141            id: id.to_string(),
1142            source: TriggerBindingSource::Manifest,
1143            kind: "webhook".to_string(),
1144            provider: ProviderId::from("github"),
1145            autonomy_tier: crate::AutonomyTier::ActAuto,
1146            handler: TriggerHandlerSpec::Worker {
1147                queue: format!("{id}-queue"),
1148            },
1149            dispatch_priority: crate::WorkerQueuePriority::Normal,
1150            when: None,
1151            when_budget: None,
1152            retry: TriggerRetryConfig::default(),
1153            match_events: vec!["issues.opened".to_string()],
1154            dedupe_key: Some("event.dedupe_key".to_string()),
1155            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1156            filter: Some("event.kind".to_string()),
1157            daily_cost_usd: Some(5.0),
1158            max_concurrent: Some(10),
1159            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1160            manifest_path: None,
1161            package_name: Some("workspace".to_string()),
1162            definition_fingerprint: fingerprint.to_string(),
1163        }
1164    }
1165
1166    fn dynamic_spec(id: &str) -> TriggerBindingSpec {
1167        TriggerBindingSpec {
1168            id: id.to_string(),
1169            source: TriggerBindingSource::Dynamic,
1170            kind: "webhook".to_string(),
1171            provider: ProviderId::from("github"),
1172            autonomy_tier: crate::AutonomyTier::ActAuto,
1173            handler: TriggerHandlerSpec::Worker {
1174                queue: format!("{id}-queue"),
1175            },
1176            dispatch_priority: crate::WorkerQueuePriority::Normal,
1177            when: None,
1178            when_budget: None,
1179            retry: TriggerRetryConfig::default(),
1180            match_events: vec!["issues.opened".to_string()],
1181            dedupe_key: None,
1182            dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1183            filter: None,
1184            daily_cost_usd: None,
1185            max_concurrent: None,
1186            flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1187            manifest_path: None,
1188            package_name: None,
1189            definition_fingerprint: format!("dynamic:{id}"),
1190        }
1191    }
1192
1193    #[tokio::test(flavor = "current_thread")]
1194    async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
1195        clear_trigger_registry();
1196
1197        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1198            .await
1199            .expect("manifest trigger installs");
1200
1201        let snapshots = snapshot_trigger_bindings();
1202        assert_eq!(snapshots.len(), 1);
1203        let binding = &snapshots[0];
1204        assert_eq!(binding.id, "github-new-issue");
1205        assert_eq!(binding.version, 1);
1206        assert_eq!(binding.state, TriggerState::Active);
1207        assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());
1208
1209        clear_trigger_registry();
1210    }
1211
1212    #[tokio::test(flavor = "current_thread")]
1213    async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
1214        clear_trigger_registry();
1215
1216        let first = dynamic_register(dynamic_spec("dynamic-a"))
1217            .await
1218            .expect("first dynamic trigger");
1219        let second = dynamic_register(dynamic_spec("dynamic-b"))
1220            .await
1221            .expect("second dynamic trigger");
1222        assert_ne!(first, second);
1223
1224        let error = dynamic_register(dynamic_spec("dynamic-a"))
1225            .await
1226            .expect_err("duplicate id should fail");
1227        assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));
1228
1229        clear_trigger_registry();
1230    }
1231
1232    #[tokio::test(flavor = "current_thread")]
1233    async fn drain_waits_for_in_flight_events_before_terminating() {
1234        clear_trigger_registry();
1235
1236        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1237            .await
1238            .expect("manifest trigger installs");
1239        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1240
1241        drain("github-new-issue").await.expect("drain succeeds");
1242        let binding = snapshot_trigger_bindings()
1243            .into_iter()
1244            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1245            .expect("binding snapshot");
1246        assert_eq!(binding.state, TriggerState::Draining);
1247        assert_eq!(binding.metrics.in_flight, 1);
1248
1249        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1250            .await
1251            .expect("finish in-flight event");
1252        let binding = snapshot_trigger_bindings()
1253            .into_iter()
1254            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1255            .expect("binding snapshot");
1256        assert_eq!(binding.state, TriggerState::Terminated);
1257        assert_eq!(binding.metrics.in_flight, 0);
1258
1259        clear_trigger_registry();
1260    }
1261
1262    #[tokio::test(flavor = "current_thread")]
1263    async fn hot_reload_registers_new_version_while_old_binding_drains() {
1264        clear_trigger_registry();
1265
1266        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1267            .await
1268            .expect("initial manifest trigger installs");
1269        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1270
1271        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1272            .await
1273            .expect("updated manifest trigger installs");
1274
1275        let snapshots = snapshot_trigger_bindings();
1276        assert_eq!(snapshots.len(), 2);
1277        let old = snapshots
1278            .iter()
1279            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1280            .expect("old binding");
1281        let new = snapshots
1282            .iter()
1283            .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
1284            .expect("new binding");
1285        assert_eq!(old.state, TriggerState::Draining);
1286        assert_eq!(new.state, TriggerState::Active);
1287
1288        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1289            .await
1290            .expect("finish old in-flight event");
1291        let old = snapshot_trigger_bindings()
1292            .into_iter()
1293            .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1294            .expect("old binding");
1295        assert_eq!(old.state, TriggerState::Terminated);
1296
1297        clear_trigger_registry();
1298    }
1299
1300    #[tokio::test(flavor = "current_thread")]
1301    async fn gc_drops_terminated_versions_beyond_retention_limit() {
1302        clear_trigger_registry();
1303
1304        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1305            .await
1306            .expect("install v1");
1307        begin_in_flight("github-new-issue", 1).expect("pin v1");
1308
1309        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1310            .await
1311            .expect("install v2");
1312        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1313            .await
1314            .expect("finish v1");
1315
1316        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1317            .await
1318            .expect("install v3");
1319
1320        let snapshots = snapshot_trigger_bindings();
1321        let versions: Vec<u32> = snapshots
1322            .into_iter()
1323            .filter(|binding| binding.id == "github-new-issue")
1324            .map(|binding| binding.version)
1325            .collect();
1326        assert_eq!(versions, vec![2, 3]);
1327
1328        clear_trigger_registry();
1329    }
1330
1331    #[tokio::test(flavor = "current_thread")]
1332    async fn lifecycle_transitions_append_to_event_log() {
1333        clear_trigger_registry();
1334        reset_active_event_log();
1335        let tempdir = tempfile::tempdir().expect("tempdir");
1336        let log = install_default_for_base_dir(tempdir.path()).expect("install event log");
1337
1338        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1339            .await
1340            .expect("manifest trigger installs");
1341        begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1342        drain("github-new-issue").await.expect("drain succeeds");
1343        finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1344            .await
1345            .expect("finish event");
1346
1347        let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
1348        let events = log
1349            .read_range(&topic, None, 32)
1350            .await
1351            .expect("read lifecycle events");
1352        let states: Vec<String> = events
1353            .into_iter()
1354            .filter_map(|(_, event)| {
1355                event
1356                    .payload
1357                    .get("to_state")
1358                    .and_then(|value| value.as_str())
1359                    .map(|value| value.to_string())
1360            })
1361            .collect();
1362        assert_eq!(
1363            states,
1364            vec![
1365                "registering".to_string(),
1366                "active".to_string(),
1367                "draining".to_string(),
1368                "terminated".to_string(),
1369            ]
1370        );
1371
1372        reset_active_event_log();
1373        clear_trigger_registry();
1374    }
1375
1376    #[tokio::test(flavor = "current_thread")]
1377    async fn version_history_reuses_historical_version_after_restart() {
1378        clear_trigger_registry();
1379        reset_active_event_log();
1380        let tempdir = tempfile::tempdir().expect("tempdir");
1381        install_default_for_base_dir(tempdir.path()).expect("install event log");
1382
1383        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1384            .await
1385            .expect("initial manifest trigger installs");
1386        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1387            .await
1388            .expect("updated manifest trigger installs");
1389
1390        clear_trigger_registry();
1391        reset_active_event_log();
1392        install_default_for_base_dir(tempdir.path()).expect("reopen event log");
1393
1394        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1395            .await
1396            .expect("manifest reload reuses historical version");
1397
1398        let binding = snapshot_trigger_bindings()
1399            .into_iter()
1400            .find(|binding| binding.id == "github-new-issue")
1401            .expect("binding snapshot");
1402        assert_eq!(binding.version, 2);
1403
1404        reset_active_event_log();
1405        clear_trigger_registry();
1406    }
1407
1408    #[tokio::test(flavor = "current_thread")]
1409    async fn binding_version_as_of_reports_historical_active_version() {
1410        clear_trigger_registry();
1411        reset_active_event_log();
1412        let tempdir = tempfile::tempdir().expect("tempdir");
1413        install_default_for_base_dir(tempdir.path()).expect("install event log");
1414
1415        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1416            .await
1417            .expect("initial manifest trigger installs");
1418        let before_reload = OffsetDateTime::now_utc();
1419        std::thread::sleep(std::time::Duration::from_millis(10));
1420
1421        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1422            .await
1423            .expect("updated manifest trigger installs");
1424        let after_reload = OffsetDateTime::now_utc();
1425
1426        assert_eq!(
1427            binding_version_as_of("github-new-issue", before_reload)
1428                .expect("version before reload"),
1429            1
1430        );
1431        assert_eq!(
1432            binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
1433            2
1434        );
1435
1436        reset_active_event_log();
1437        clear_trigger_registry();
1438    }
1439
1440    #[tokio::test(flavor = "current_thread")]
1441    async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
1442        clear_trigger_registry();
1443        reset_active_event_log();
1444        let sink = Rc::new(CollectorSink::new());
1445        clear_event_sinks();
1446        add_event_sink(sink.clone());
1447        let tempdir = tempfile::tempdir().expect("tempdir");
1448        install_default_for_base_dir(tempdir.path()).expect("install event log");
1449
1450        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1451            .await
1452            .expect("install v1");
1453        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1454            .await
1455            .expect("install v2");
1456        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1457            .await
1458            .expect("install v3");
1459        let received_at = OffsetDateTime::now_utc();
1460        std::thread::sleep(std::time::Duration::from_millis(10));
1461        install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
1462            .await
1463            .expect("install v4");
1464
1465        let binding = resolve_live_or_as_of(
1466            "github-new-issue",
1467            RecordedTriggerBinding {
1468                version: 1,
1469                received_at,
1470            },
1471        )
1472        .expect("resolve fallback binding");
1473        assert_eq!(binding.version, 3);
1474
1475        let warning = sink
1476            .logs
1477            .borrow()
1478            .iter()
1479            .find(|log| log.category == "replay.binding_version_gc_fallback")
1480            .cloned()
1481            .expect("gc fallback warning");
1482        assert_eq!(warning.level, EventLevel::Warn);
1483        assert_eq!(
1484            warning.metadata.get("trigger_id"),
1485            Some(&serde_json::json!("github-new-issue"))
1486        );
1487        assert_eq!(
1488            warning.metadata.get("recorded_version"),
1489            Some(&serde_json::json!(1))
1490        );
1491        assert_eq!(
1492            warning.metadata.get("received_at"),
1493            Some(&serde_json::json!(received_at
1494                .format(&time::format_description::well_known::Rfc3339)
1495                .unwrap_or_else(|_| received_at.to_string())))
1496        );
1497        assert_eq!(
1498            warning.metadata.get("resolved_version"),
1499            Some(&serde_json::json!(3))
1500        );
1501
1502        clear_event_sinks();
1503        crate::events::reset_event_sinks();
1504        reset_active_event_log();
1505        clear_trigger_registry();
1506    }
1507}