1use serde::{Deserialize, Serialize};
2use std::cell::RefCell;
3use std::collections::{BTreeMap, BTreeSet};
4use std::path::PathBuf;
5use std::rc::Rc;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8
9use time::OffsetDateTime;
10use uuid::Uuid;
11
12use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogEvent, Topic};
13use crate::llm::trigger_predicate::TriggerPredicateBudget;
14use crate::secrets::{configured_default_chain, SecretProvider};
15use crate::triggers::test_util::clock;
16use crate::trust_graph::AutonomyTier;
17use crate::value::VmClosure;
18
19use super::dispatcher::TriggerRetryConfig;
20use super::flow_control::TriggerFlowControlConfig;
21use super::ProviderId;
22
23#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
24pub struct TriggerId(String);
25
26impl TriggerId {
27 pub fn new(value: impl Into<String>) -> Self {
28 Self(value.into())
29 }
30
31 pub fn as_str(&self) -> &str {
32 &self.0
33 }
34}
35
36impl std::fmt::Display for TriggerId {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 self.0.fmt(f)
39 }
40}
41
42#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub enum TriggerState {
45 Registering,
46 Active,
47 Draining,
48 Terminated,
49}
50
51impl TriggerState {
52 pub fn as_str(self) -> &'static str {
53 match self {
54 Self::Registering => "registering",
55 Self::Active => "active",
56 Self::Draining => "draining",
57 Self::Terminated => "terminated",
58 }
59 }
60}
61
62#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum TriggerBindingSource {
65 Manifest,
66 Dynamic,
67}
68
69impl TriggerBindingSource {
70 pub fn as_str(self) -> &'static str {
71 match self {
72 Self::Manifest => "manifest",
73 Self::Dynamic => "dynamic",
74 }
75 }
76}
77
78#[derive(Clone)]
79pub enum TriggerHandlerSpec {
80 Local {
81 raw: String,
82 closure: Rc<VmClosure>,
83 },
84 A2a {
85 target: String,
86 allow_cleartext: bool,
87 },
88 Worker {
89 queue: String,
90 },
91}
92
93impl std::fmt::Debug for TriggerHandlerSpec {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 match self {
96 Self::Local { raw, .. } => f.debug_struct("Local").field("raw", raw).finish(),
97 Self::A2a {
98 target,
99 allow_cleartext,
100 } => f
101 .debug_struct("A2a")
102 .field("target", target)
103 .field("allow_cleartext", allow_cleartext)
104 .finish(),
105 Self::Worker { queue } => f.debug_struct("Worker").field("queue", queue).finish(),
106 }
107 }
108}
109
110impl TriggerHandlerSpec {
111 pub fn kind(&self) -> &'static str {
112 match self {
113 Self::Local { .. } => "local",
114 Self::A2a { .. } => "a2a",
115 Self::Worker { .. } => "worker",
116 }
117 }
118}
119
120#[derive(Clone)]
121pub struct TriggerPredicateSpec {
122 pub raw: String,
123 pub closure: Rc<VmClosure>,
124}
125
126impl std::fmt::Debug for TriggerPredicateSpec {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 f.debug_struct("TriggerPredicateSpec")
129 .field("raw", &self.raw)
130 .finish()
131 }
132}
133
134#[derive(Clone, Debug)]
135pub struct TriggerBindingSpec {
136 pub id: String,
137 pub source: TriggerBindingSource,
138 pub kind: String,
139 pub provider: ProviderId,
140 pub autonomy_tier: AutonomyTier,
141 pub handler: TriggerHandlerSpec,
142 pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
143 pub when: Option<TriggerPredicateSpec>,
144 pub when_budget: Option<TriggerPredicateBudget>,
145 pub retry: TriggerRetryConfig,
146 pub match_events: Vec<String>,
147 pub dedupe_key: Option<String>,
148 pub dedupe_retention_days: u32,
149 pub filter: Option<String>,
150 pub daily_cost_usd: Option<f64>,
151 pub max_concurrent: Option<u32>,
152 pub flow_control: TriggerFlowControlConfig,
153 pub manifest_path: Option<PathBuf>,
154 pub package_name: Option<String>,
155 pub definition_fingerprint: String,
156}
157
158#[derive(Debug)]
159pub struct TriggerMetrics {
160 pub received: AtomicU64,
161 pub dispatched: AtomicU64,
162 pub failed: AtomicU64,
163 pub dlq: AtomicU64,
164 pub last_received_ms: Mutex<Option<i64>>,
165 pub cost_total_usd_micros: AtomicU64,
166 pub cost_today_usd_micros: AtomicU64,
167}
168
169impl Default for TriggerMetrics {
170 fn default() -> Self {
171 Self {
172 received: AtomicU64::new(0),
173 dispatched: AtomicU64::new(0),
174 failed: AtomicU64::new(0),
175 dlq: AtomicU64::new(0),
176 last_received_ms: Mutex::new(None),
177 cost_total_usd_micros: AtomicU64::new(0),
178 cost_today_usd_micros: AtomicU64::new(0),
179 }
180 }
181}
182
183#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
184pub struct TriggerMetricsSnapshot {
185 pub received: u64,
186 pub dispatched: u64,
187 pub failed: u64,
188 pub dlq: u64,
189 pub in_flight: u64,
190 pub last_received_ms: Option<i64>,
191 pub cost_total_usd_micros: u64,
192 pub cost_today_usd_micros: u64,
193}
194
195pub struct TriggerBinding {
196 pub id: TriggerId,
197 pub version: u32,
198 pub source: TriggerBindingSource,
199 pub kind: String,
200 pub provider: ProviderId,
201 pub autonomy_tier: AutonomyTier,
202 pub handler: TriggerHandlerSpec,
203 pub dispatch_priority: super::worker_queue::WorkerQueuePriority,
204 pub when: Option<TriggerPredicateSpec>,
205 pub when_budget: Option<TriggerPredicateBudget>,
206 pub retry: TriggerRetryConfig,
207 pub match_events: Vec<String>,
208 pub dedupe_key: Option<String>,
209 pub dedupe_retention_days: u32,
210 pub filter: Option<String>,
211 pub daily_cost_usd: Option<f64>,
212 pub max_concurrent: Option<u32>,
213 pub flow_control: TriggerFlowControlConfig,
214 pub manifest_path: Option<PathBuf>,
215 pub package_name: Option<String>,
216 pub definition_fingerprint: String,
217 pub state: Mutex<TriggerState>,
218 pub metrics: TriggerMetrics,
219 pub in_flight: AtomicU64,
220 pub cancel_token: Arc<AtomicBool>,
221 pub predicate_state: Mutex<TriggerPredicateState>,
222}
223
224#[derive(Clone, Debug, Default)]
225pub struct TriggerPredicateState {
226 pub budget_day_utc: Option<i32>,
227 pub consecutive_failures: u32,
228 pub breaker_open_until_ms: Option<i64>,
229}
230
231impl std::fmt::Debug for TriggerBinding {
232 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
233 f.debug_struct("TriggerBinding")
234 .field("id", &self.id)
235 .field("version", &self.version)
236 .field("source", &self.source)
237 .field("kind", &self.kind)
238 .field("provider", &self.provider)
239 .field("handler_kind", &self.handler.kind())
240 .field("state", &self.state_snapshot())
241 .finish()
242 }
243}
244
245impl TriggerBinding {
246 pub fn snapshot(&self) -> TriggerBindingSnapshot {
247 TriggerBindingSnapshot {
248 id: self.id.as_str().to_string(),
249 version: self.version,
250 source: self.source,
251 kind: self.kind.clone(),
252 provider: self.provider.as_str().to_string(),
253 autonomy_tier: self.autonomy_tier,
254 handler_kind: self.handler.kind().to_string(),
255 state: self.state_snapshot(),
256 metrics: self.metrics_snapshot(),
257 }
258 }
259
260 fn new(spec: TriggerBindingSpec, version: u32) -> Self {
261 Self {
262 id: TriggerId::new(spec.id),
263 version,
264 source: spec.source,
265 kind: spec.kind,
266 provider: spec.provider,
267 autonomy_tier: spec.autonomy_tier,
268 handler: spec.handler,
269 dispatch_priority: spec.dispatch_priority,
270 when: spec.when,
271 when_budget: spec.when_budget,
272 retry: spec.retry,
273 match_events: spec.match_events,
274 dedupe_key: spec.dedupe_key,
275 dedupe_retention_days: spec.dedupe_retention_days,
276 filter: spec.filter,
277 daily_cost_usd: spec.daily_cost_usd,
278 max_concurrent: spec.max_concurrent,
279 flow_control: spec.flow_control,
280 manifest_path: spec.manifest_path,
281 package_name: spec.package_name,
282 definition_fingerprint: spec.definition_fingerprint,
283 state: Mutex::new(TriggerState::Registering),
284 metrics: TriggerMetrics::default(),
285 in_flight: AtomicU64::new(0),
286 cancel_token: Arc::new(AtomicBool::new(false)),
287 predicate_state: Mutex::new(TriggerPredicateState::default()),
288 }
289 }
290
291 pub fn binding_key(&self) -> String {
292 format!("{}@v{}", self.id.as_str(), self.version)
293 }
294
295 pub fn state_snapshot(&self) -> TriggerState {
296 *self.state.lock().expect("trigger state poisoned")
297 }
298
299 pub fn metrics_snapshot(&self) -> TriggerMetricsSnapshot {
300 TriggerMetricsSnapshot {
301 received: self.metrics.received.load(Ordering::Relaxed),
302 dispatched: self.metrics.dispatched.load(Ordering::Relaxed),
303 failed: self.metrics.failed.load(Ordering::Relaxed),
304 dlq: self.metrics.dlq.load(Ordering::Relaxed),
305 in_flight: self.in_flight.load(Ordering::Relaxed),
306 last_received_ms: *self
307 .metrics
308 .last_received_ms
309 .lock()
310 .expect("trigger metrics poisoned"),
311 cost_total_usd_micros: self.metrics.cost_total_usd_micros.load(Ordering::Relaxed),
312 cost_today_usd_micros: self.metrics.cost_today_usd_micros.load(Ordering::Relaxed),
313 }
314 }
315}
316
317#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
318pub struct TriggerBindingSnapshot {
319 pub id: String,
320 pub version: u32,
321 pub source: TriggerBindingSource,
322 pub kind: String,
323 pub provider: String,
324 pub autonomy_tier: AutonomyTier,
325 pub handler_kind: String,
326 pub state: TriggerState,
327 pub metrics: TriggerMetricsSnapshot,
328}
329
330#[derive(Clone, Copy, Debug, PartialEq, Eq)]
331pub enum TriggerDispatchOutcome {
332 Dispatched,
333 Failed,
334 Dlq,
335}
336
337#[derive(Debug)]
338pub enum TriggerRegistryError {
339 DuplicateId(String),
340 InvalidSpec(String),
341 UnknownId(String),
342 UnknownBindingVersion { id: String, version: u32 },
343 EventLog(String),
344}
345
346impl std::fmt::Display for TriggerRegistryError {
347 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348 match self {
349 Self::DuplicateId(id) => write!(f, "duplicate trigger id '{id}'"),
350 Self::InvalidSpec(message) | Self::EventLog(message) => f.write_str(message),
351 Self::UnknownId(id) => write!(f, "unknown trigger id '{id}'"),
352 Self::UnknownBindingVersion { id, version } => {
353 write!(f, "unknown trigger binding '{id}' version {version}")
354 }
355 }
356 }
357}
358
359impl std::error::Error for TriggerRegistryError {}
360
361#[derive(Default)]
362pub struct TriggerRegistry {
363 bindings: BTreeMap<String, Vec<Arc<TriggerBinding>>>,
364 by_provider: BTreeMap<String, BTreeSet<String>>,
365 event_log: Option<Arc<AnyEventLog>>,
366 secret_provider: Option<Arc<dyn SecretProvider>>,
367}
368
369thread_local! {
370 static TRIGGER_REGISTRY: RefCell<TriggerRegistry> = RefCell::new(TriggerRegistry::default());
371}
372
373const TERMINATED_VERSION_RETENTION_LIMIT: usize = 2;
374
375const TRIGGERS_LIFECYCLE_TOPIC: &str = "triggers.lifecycle";
376
377#[derive(Clone, Debug, Deserialize)]
378struct LifecycleStateTransitionRecord {
379 id: String,
380 version: u32,
381 #[serde(default)]
382 definition_fingerprint: Option<String>,
383 to_state: TriggerState,
384}
385
386#[derive(Clone, Debug)]
387struct HistoricalLifecycleRecord {
388 occurred_at_ms: i64,
389 transition: LifecycleStateTransitionRecord,
390}
391
392#[derive(Clone, Copy, Debug, PartialEq, Eq)]
393pub struct RecordedTriggerBinding {
394 pub version: u32,
395 pub received_at: OffsetDateTime,
396}
397
398#[derive(Clone, Copy, Debug, Default)]
399struct HistoricalVersionLookup {
400 matching_version: Option<u32>,
401 max_version: Option<u32>,
402}
403
404pub fn clear_trigger_registry() {
405 TRIGGER_REGISTRY.with(|slot| {
406 *slot.borrow_mut() = TriggerRegistry::default();
407 });
408}
409
410pub fn snapshot_trigger_bindings() -> Vec<TriggerBindingSnapshot> {
411 TRIGGER_REGISTRY.with(|slot| {
412 let registry = slot.borrow();
413 let mut snapshots = Vec::new();
414 for bindings in registry.bindings.values() {
415 for binding in bindings {
416 snapshots.push(binding.snapshot());
417 }
418 }
419 snapshots.sort_by(|left, right| {
420 left.id
421 .cmp(&right.id)
422 .then(left.version.cmp(&right.version))
423 .then(left.state.as_str().cmp(right.state.as_str()))
424 });
425 snapshots
426 })
427}
428
429#[allow(clippy::arc_with_non_send_sync)]
430pub fn resolve_trigger_binding_as_of(
431 id: &str,
432 as_of: OffsetDateTime,
433) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
434 let version = binding_version_as_of(id, as_of)?;
435 resolve_trigger_binding_version(id, version)
436}
437
438#[allow(clippy::arc_with_non_send_sync)]
439pub fn resolve_live_or_as_of(
440 id: &str,
441 recorded: RecordedTriggerBinding,
442) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
443 match resolve_live_trigger_binding(id, Some(recorded.version)) {
444 Ok(binding) => Ok(binding),
445 Err(TriggerRegistryError::UnknownBindingVersion { .. }) => {
446 let binding = resolve_trigger_binding_as_of(id, recorded.received_at)?;
447 let mut metadata = BTreeMap::new();
448 metadata.insert("trigger_id".to_string(), serde_json::json!(id));
449 metadata.insert(
450 "recorded_version".to_string(),
451 serde_json::json!(recorded.version),
452 );
453 metadata.insert(
454 "received_at".to_string(),
455 serde_json::json!(recorded
456 .received_at
457 .format(&time::format_description::well_known::Rfc3339)
458 .unwrap_or_else(|_| recorded.received_at.to_string())),
459 );
460 metadata.insert(
461 "resolved_version".to_string(),
462 serde_json::json!(binding.version),
463 );
464 crate::events::log_warn_meta(
465 "replay.binding_version_gc_fallback",
466 "trigger replay fell back to lifecycle history after binding version GC",
467 metadata,
468 );
469 Ok(binding)
470 }
471 Err(error) => Err(error),
472 }
473}
474
475pub fn binding_version_as_of(id: &str, as_of: OffsetDateTime) -> Result<u32, TriggerRegistryError> {
476 TRIGGER_REGISTRY.with(|slot| {
477 let registry = slot.borrow();
478 registry.binding_version_as_of(id, as_of)
479 })
480}
481
482#[allow(clippy::arc_with_non_send_sync)]
483fn resolve_trigger_binding_version(
484 id: &str,
485 version: u32,
486) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
487 TRIGGER_REGISTRY.with(|slot| {
488 let registry = slot.borrow();
489 registry
490 .binding(id, version)
491 .ok_or_else(|| TriggerRegistryError::UnknownBindingVersion {
492 id: id.to_string(),
493 version,
494 })
495 })
496}
497
498#[allow(clippy::arc_with_non_send_sync)]
499pub fn resolve_live_trigger_binding(
500 id: &str,
501 version: Option<u32>,
502) -> Result<Arc<TriggerBinding>, TriggerRegistryError> {
503 TRIGGER_REGISTRY.with(|slot| {
504 let registry = slot.borrow();
505 if let Some(version) = version {
506 let binding = registry.binding(id, version).ok_or_else(|| {
507 TriggerRegistryError::UnknownBindingVersion {
508 id: id.to_string(),
509 version,
510 }
511 })?;
512 if binding.state_snapshot() == TriggerState::Terminated {
513 return Err(TriggerRegistryError::UnknownBindingVersion {
514 id: id.to_string(),
515 version,
516 });
517 }
518 return Ok(binding);
519 }
520
521 registry
522 .live_bindings_any_source(id)
523 .into_iter()
524 .max_by_key(|binding| binding.version)
525 .ok_or_else(|| TriggerRegistryError::UnknownId(id.to_string()))
526 })
527}
528
529pub(crate) fn matching_bindings(event: &super::TriggerEvent) -> Vec<Arc<TriggerBinding>> {
530 TRIGGER_REGISTRY.with(|slot| {
531 let registry = slot.borrow();
532 let Some(binding_ids) = registry.by_provider.get(event.provider.as_str()) else {
533 return Vec::new();
534 };
535
536 let mut bindings = Vec::new();
537 for id in binding_ids {
538 let Some(versions) = registry.bindings.get(id) else {
539 continue;
540 };
541 for binding in versions {
542 if binding.state_snapshot() != TriggerState::Active {
543 continue;
544 }
545 if !binding.match_events.is_empty()
546 && !binding.match_events.iter().any(|kind| kind == &event.kind)
547 {
548 continue;
549 }
550 bindings.push(binding.clone());
551 }
552 }
553
554 bindings.sort_by(|left, right| {
555 left.id
556 .as_str()
557 .cmp(right.id.as_str())
558 .then(left.version.cmp(&right.version))
559 });
560 bindings
561 })
562}
563
564pub async fn install_manifest_triggers(
565 specs: Vec<TriggerBindingSpec>,
566) -> Result<(), TriggerRegistryError> {
567 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
568 let registry = &mut *slot.borrow_mut();
569 registry.refresh_runtime_context();
570 let mut touched_ids = BTreeSet::new();
571
572 let mut incoming = BTreeMap::new();
573 for spec in specs {
574 let spec_id = spec.id.clone();
575 if spec.source != TriggerBindingSource::Manifest {
576 return Err(TriggerRegistryError::InvalidSpec(format!(
577 "manifest install received non-manifest trigger '{}'",
578 spec_id
579 )));
580 }
581 if spec_id.trim().is_empty() {
582 return Err(TriggerRegistryError::InvalidSpec(
583 "manifest trigger id cannot be empty".to_string(),
584 ));
585 }
586 if incoming.insert(spec_id.clone(), spec).is_some() {
587 return Err(TriggerRegistryError::DuplicateId(spec_id));
588 }
589 }
590
591 let mut lifecycle = Vec::new();
592 let existing_ids: Vec<String> = registry
593 .bindings
594 .iter()
595 .filter(|(_, bindings)| {
596 bindings.iter().any(|binding| {
597 binding.source == TriggerBindingSource::Manifest
598 && binding.state_snapshot() != TriggerState::Terminated
599 })
600 })
601 .map(|(id, _)| id.clone())
602 .collect();
603
604 for id in existing_ids {
605 let live_manifest = registry.live_bindings(&id, TriggerBindingSource::Manifest);
606 let Some(spec) = incoming.remove(&id) else {
607 for binding in live_manifest {
608 registry.transition_binding_to_draining(&binding, &mut lifecycle);
609 }
610 touched_ids.insert(id.clone());
611 continue;
612 };
613
614 let has_matching_active = live_manifest.iter().any(|binding| {
615 binding.definition_fingerprint == spec.definition_fingerprint
616 && matches!(
617 binding.state_snapshot(),
618 TriggerState::Registering | TriggerState::Active
619 )
620 });
621 if has_matching_active {
622 continue;
623 }
624
625 for binding in live_manifest {
626 registry.transition_binding_to_draining(&binding, &mut lifecycle);
627 }
628
629 let version = registry.next_version_for_spec(&spec);
630 registry.register_binding(spec, version, &mut lifecycle);
631 touched_ids.insert(id.clone());
632 }
633
634 for spec in incoming.into_values() {
635 touched_ids.insert(spec.id.clone());
636 let version = registry.next_version_for_spec(&spec);
637 registry.register_binding(spec, version, &mut lifecycle);
638 }
639
640 for id in touched_ids {
641 registry.gc_terminated_versions(&id);
642 }
643
644 Ok((registry.event_log.clone(), lifecycle))
645 })?;
646
647 append_lifecycle_events(event_log, events).await
648}
649
650pub async fn dynamic_register(
651 mut spec: TriggerBindingSpec,
652) -> Result<TriggerId, TriggerRegistryError> {
653 if spec.id.trim().is_empty() {
654 spec.id = format!("dynamic_trigger_{}", Uuid::now_v7());
655 }
656 spec.source = TriggerBindingSource::Dynamic;
657 let id = spec.id.clone();
658 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
659 let registry = &mut *slot.borrow_mut();
660 registry.refresh_runtime_context();
661
662 if registry.bindings.contains_key(id.as_str()) {
663 return Err(TriggerRegistryError::DuplicateId(id.clone()));
664 }
665
666 let mut lifecycle = Vec::new();
667 let version = registry.next_version_for_spec(&spec);
668 registry.register_binding(spec, version, &mut lifecycle);
669 Ok((registry.event_log.clone(), lifecycle))
670 })?;
671
672 append_lifecycle_events(event_log, events).await?;
673 Ok(TriggerId::new(id))
674}
675
676pub async fn dynamic_deregister(id: &str) -> Result<(), TriggerRegistryError> {
677 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
678 let registry = &mut *slot.borrow_mut();
679 let live_dynamic = registry.live_bindings(id, TriggerBindingSource::Dynamic);
680 if live_dynamic.is_empty() {
681 return Err(TriggerRegistryError::UnknownId(id.to_string()));
682 }
683
684 let mut lifecycle = Vec::new();
685 for binding in live_dynamic {
686 registry.transition_binding_to_draining(&binding, &mut lifecycle);
687 }
688 Ok((registry.event_log.clone(), lifecycle))
689 })?;
690
691 append_lifecycle_events(event_log, events).await
692}
693
694pub async fn drain(id: &str) -> Result<(), TriggerRegistryError> {
695 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
696 let registry = &mut *slot.borrow_mut();
697 let live = registry.live_bindings_any_source(id);
698 if live.is_empty() {
699 return Err(TriggerRegistryError::UnknownId(id.to_string()));
700 }
701
702 let mut lifecycle = Vec::new();
703 for binding in live {
704 registry.transition_binding_to_draining(&binding, &mut lifecycle);
705 }
706 Ok((registry.event_log.clone(), lifecycle))
707 })?;
708
709 append_lifecycle_events(event_log, events).await
710}
711
712fn pin_trigger_binding_inner(
713 id: &str,
714 version: u32,
715 allow_terminated: bool,
716) -> Result<(), TriggerRegistryError> {
717 TRIGGER_REGISTRY.with(|slot| {
718 let registry = slot.borrow();
719 let binding = registry.binding(id, version).ok_or_else(|| {
720 TriggerRegistryError::UnknownBindingVersion {
721 id: id.to_string(),
722 version,
723 }
724 })?;
725 match binding.state_snapshot() {
726 TriggerState::Terminated if !allow_terminated => {
727 Err(TriggerRegistryError::InvalidSpec(format!(
728 "trigger binding '{}' version {} is terminated",
729 id, version
730 )))
731 }
732 _ => {
733 binding.in_flight.fetch_add(1, Ordering::Relaxed);
734 Ok(())
735 }
736 }
737 })
738}
739
740pub fn pin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
741 pin_trigger_binding_inner(id, version, false)
742}
743
744pub async fn unpin_trigger_binding(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
745 let (event_log, events) = TRIGGER_REGISTRY.with(|slot| {
746 let registry = &mut *slot.borrow_mut();
747 let binding = registry.binding(id, version).ok_or_else(|| {
748 TriggerRegistryError::UnknownBindingVersion {
749 id: id.to_string(),
750 version,
751 }
752 })?;
753 let current = binding.in_flight.load(Ordering::Relaxed);
754 if current == 0 {
755 return Err(TriggerRegistryError::InvalidSpec(format!(
756 "trigger binding '{}' version {} has no in-flight events",
757 id, version
758 )));
759 }
760 binding.in_flight.fetch_sub(1, Ordering::Relaxed);
761
762 let mut lifecycle = Vec::new();
763 registry.maybe_finalize_draining(&binding, &mut lifecycle);
764 registry.gc_terminated_versions(binding.id.as_str());
765 Ok((registry.event_log.clone(), lifecycle))
766 })?;
767
768 append_lifecycle_events(event_log, events).await
769}
770
771pub fn begin_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
772 begin_in_flight_inner(id, version, false)
773}
774
775pub(crate) fn begin_replay_in_flight(id: &str, version: u32) -> Result<(), TriggerRegistryError> {
776 begin_in_flight_inner(id, version, true)
777}
778
779fn begin_in_flight_inner(
780 id: &str,
781 version: u32,
782 allow_terminated: bool,
783) -> Result<(), TriggerRegistryError> {
784 pin_trigger_binding_inner(id, version, allow_terminated)?;
785 TRIGGER_REGISTRY.with(|slot| {
786 let registry = slot.borrow();
787 let binding = registry.binding(id, version).ok_or_else(|| {
788 TriggerRegistryError::UnknownBindingVersion {
789 id: id.to_string(),
790 version,
791 }
792 })?;
793 binding.metrics.received.fetch_add(1, Ordering::Relaxed);
794 *binding
795 .metrics
796 .last_received_ms
797 .lock()
798 .expect("trigger metrics poisoned") = Some(now_ms());
799 Ok(())
800 })
801}
802
803pub async fn finish_in_flight(
804 id: &str,
805 version: u32,
806 outcome: TriggerDispatchOutcome,
807) -> Result<(), TriggerRegistryError> {
808 TRIGGER_REGISTRY.with(|slot| {
809 let registry = &mut *slot.borrow_mut();
810 let binding = registry.binding(id, version).ok_or_else(|| {
811 TriggerRegistryError::UnknownBindingVersion {
812 id: id.to_string(),
813 version,
814 }
815 })?;
816 let current = binding.in_flight.load(Ordering::Relaxed);
817 if current == 0 {
818 return Err(TriggerRegistryError::InvalidSpec(format!(
819 "trigger binding '{}' version {} has no in-flight events",
820 id, version
821 )));
822 }
823 match outcome {
824 TriggerDispatchOutcome::Dispatched => {
825 binding.metrics.dispatched.fetch_add(1, Ordering::Relaxed);
826 }
827 TriggerDispatchOutcome::Failed => {
828 binding.metrics.failed.fetch_add(1, Ordering::Relaxed);
829 }
830 TriggerDispatchOutcome::Dlq => {
831 binding.metrics.dlq.fetch_add(1, Ordering::Relaxed);
832 }
833 }
834 Ok(())
835 })?;
836
837 unpin_trigger_binding(id, version).await
838}
839
840impl TriggerRegistry {
841 fn refresh_runtime_context(&mut self) {
842 if self.event_log.is_none() {
843 self.event_log = active_event_log();
844 }
845 if self.secret_provider.is_none() {
846 self.secret_provider = default_secret_provider();
847 }
848 }
849
850 fn binding(&self, id: &str, version: u32) -> Option<Arc<TriggerBinding>> {
851 self.bindings
852 .get(id)
853 .and_then(|bindings| bindings.iter().find(|binding| binding.version == version))
854 .cloned()
855 }
856
857 fn live_bindings(&self, id: &str, source: TriggerBindingSource) -> Vec<Arc<TriggerBinding>> {
858 self.bindings
859 .get(id)
860 .into_iter()
861 .flat_map(|bindings| bindings.iter())
862 .filter(|binding| {
863 binding.source == source && binding.state_snapshot() != TriggerState::Terminated
864 })
865 .cloned()
866 .collect()
867 }
868
869 fn live_bindings_any_source(&self, id: &str) -> Vec<Arc<TriggerBinding>> {
870 self.bindings
871 .get(id)
872 .into_iter()
873 .flat_map(|bindings| bindings.iter())
874 .filter(|binding| binding.state_snapshot() != TriggerState::Terminated)
875 .cloned()
876 .collect()
877 }
878
879 fn next_version_for_spec(&self, spec: &TriggerBindingSpec) -> u32 {
880 if let Some(version) = self
881 .bindings
882 .get(spec.id.as_str())
883 .into_iter()
884 .flat_map(|bindings| bindings.iter())
885 .find(|binding| binding.definition_fingerprint == spec.definition_fingerprint)
886 .map(|binding| binding.version)
887 {
888 return version;
889 }
890
891 let historical =
892 self.historical_versions_for(spec.id.as_str(), spec.definition_fingerprint.as_str());
893 if let Some(version) = historical.matching_version {
894 return version;
895 }
896
897 self.bindings
898 .get(spec.id.as_str())
899 .into_iter()
900 .flat_map(|bindings| bindings.iter())
901 .map(|binding| binding.version)
902 .chain(historical.max_version)
903 .max()
904 .unwrap_or(0)
905 + 1
906 }
907
908 fn gc_terminated_versions(&mut self, id: &str) {
909 let Some(bindings) = self.bindings.get_mut(id) else {
910 return;
911 };
912
913 let mut newest_versions: Vec<u32> =
914 bindings.iter().map(|binding| binding.version).collect();
915 newest_versions.sort_unstable_by(|left, right| right.cmp(left));
916 newest_versions.truncate(TERMINATED_VERSION_RETENTION_LIMIT);
917 let retained_versions: BTreeSet<u32> = newest_versions.into_iter().collect();
918
919 bindings.retain(|binding| {
920 binding.state_snapshot() != TriggerState::Terminated
921 || retained_versions.contains(&binding.version)
922 });
923
924 if bindings.is_empty() {
925 self.bindings.remove(id);
926 }
927 }
928
929 fn historical_versions_for(&self, id: &str, fingerprint: &str) -> HistoricalVersionLookup {
930 let mut lookup = HistoricalVersionLookup::default();
931 for record in self.lifecycle_records_for(id) {
932 lookup.max_version = Some(
933 lookup
934 .max_version
935 .unwrap_or(0)
936 .max(record.transition.version),
937 );
938 if record.transition.definition_fingerprint.as_deref() == Some(fingerprint) {
939 lookup.matching_version = Some(record.transition.version);
940 }
941 }
942 lookup
943 }
944
945 fn binding_version_as_of(
946 &self,
947 id: &str,
948 as_of: OffsetDateTime,
949 ) -> Result<u32, TriggerRegistryError> {
950 let cutoff_ms = (as_of.unix_timestamp_nanos() / 1_000_000) as i64;
951 let mut active_version = None;
952 for record in self.lifecycle_records_for(id) {
953 if record.occurred_at_ms > cutoff_ms {
954 break;
955 }
956 match record.transition.to_state {
957 TriggerState::Active => active_version = Some(record.transition.version),
958 TriggerState::Draining | TriggerState::Terminated => {
959 if active_version == Some(record.transition.version) {
960 active_version = None;
961 }
962 }
963 TriggerState::Registering => {}
964 }
965 }
966
967 active_version.ok_or_else(|| {
968 TriggerRegistryError::InvalidSpec(format!(
969 "no active trigger binding '{}' at {}",
970 id,
971 as_of
972 .format(&time::format_description::well_known::Rfc3339)
973 .unwrap_or_else(|_| as_of.to_string())
974 ))
975 })
976 }
977
978 fn lifecycle_records_for(&self, id: &str) -> Vec<HistoricalLifecycleRecord> {
979 let Some(event_log) = self.event_log.as_ref() else {
980 return Vec::new();
981 };
982 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
983 .expect("static triggers.lifecycle topic should always be valid");
984 futures::executor::block_on(event_log.read_range(&topic, None, usize::MAX))
985 .unwrap_or_default()
986 .into_iter()
987 .filter_map(|(_, event)| {
988 let occurred_at_ms = event.occurred_at_ms;
989 let transition: LifecycleStateTransitionRecord =
990 serde_json::from_value(event.payload).ok()?;
991 (transition.id == id).then_some(HistoricalLifecycleRecord {
992 occurred_at_ms,
993 transition,
994 })
995 })
996 .collect()
997 }
998
999 #[allow(clippy::arc_with_non_send_sync)]
1000 fn register_binding(
1001 &mut self,
1002 spec: TriggerBindingSpec,
1003 version: u32,
1004 lifecycle: &mut Vec<LogEvent>,
1005 ) -> Arc<TriggerBinding> {
1006 let binding = Arc::new(TriggerBinding::new(spec, version));
1007 self.by_provider
1008 .entry(binding.provider.as_str().to_string())
1009 .or_default()
1010 .insert(binding.id.as_str().to_string());
1011 self.bindings
1012 .entry(binding.id.as_str().to_string())
1013 .or_default()
1014 .push(binding.clone());
1015 lifecycle.push(lifecycle_event(&binding, None, TriggerState::Registering));
1016 self.transition_binding_state(&binding, TriggerState::Active, lifecycle);
1017 binding
1018 }
1019
1020 fn transition_binding_to_draining(
1021 &self,
1022 binding: &Arc<TriggerBinding>,
1023 lifecycle: &mut Vec<LogEvent>,
1024 ) {
1025 if matches!(binding.state_snapshot(), TriggerState::Terminated) {
1026 return;
1027 }
1028 self.transition_binding_state(binding, TriggerState::Draining, lifecycle);
1029 self.maybe_finalize_draining(binding, lifecycle);
1030 }
1031
1032 fn maybe_finalize_draining(
1033 &self,
1034 binding: &Arc<TriggerBinding>,
1035 lifecycle: &mut Vec<LogEvent>,
1036 ) {
1037 if binding.state_snapshot() == TriggerState::Draining
1038 && binding.in_flight.load(Ordering::Relaxed) == 0
1039 {
1040 self.transition_binding_state(binding, TriggerState::Terminated, lifecycle);
1041 }
1042 }
1043
1044 fn transition_binding_state(
1045 &self,
1046 binding: &Arc<TriggerBinding>,
1047 next: TriggerState,
1048 lifecycle: &mut Vec<LogEvent>,
1049 ) {
1050 let mut state = binding.state.lock().expect("trigger state poisoned");
1051 let previous = *state;
1052 if previous == next {
1053 return;
1054 }
1055 *state = next;
1056 drop(state);
1057 lifecycle.push(lifecycle_event(binding, Some(previous), next));
1058 }
1059}
1060
1061fn lifecycle_event(
1062 binding: &TriggerBinding,
1063 from_state: Option<TriggerState>,
1064 to_state: TriggerState,
1065) -> LogEvent {
1066 LogEvent::new(
1067 "state_transition",
1068 serde_json::json!({
1069 "id": binding.id.as_str(),
1070 "binding_key": binding.binding_key(),
1071 "version": binding.version,
1072 "provider": binding.provider.as_str(),
1073 "kind": &binding.kind,
1074 "source": binding.source.as_str(),
1075 "handler_kind": binding.handler.kind(),
1076 "definition_fingerprint": &binding.definition_fingerprint,
1077 "from_state": from_state.map(TriggerState::as_str),
1078 "to_state": to_state.as_str(),
1079 }),
1080 )
1081}
1082
1083async fn append_lifecycle_events(
1084 event_log: Option<Arc<AnyEventLog>>,
1085 events: Vec<LogEvent>,
1086) -> Result<(), TriggerRegistryError> {
1087 let Some(event_log) = event_log else {
1088 return Ok(());
1089 };
1090 if events.is_empty() {
1091 return Ok(());
1092 }
1093
1094 let topic = Topic::new(TRIGGERS_LIFECYCLE_TOPIC)
1095 .expect("static triggers.lifecycle topic should always be valid");
1096 for event in events {
1097 event_log
1098 .append(&topic, event)
1099 .await
1100 .map_err(|error| TriggerRegistryError::EventLog(error.to_string()))?;
1101 }
1102 Ok(())
1103}
1104
1105fn default_secret_provider() -> Option<Arc<dyn SecretProvider>> {
1106 configured_default_chain(default_secret_namespace())
1107 .ok()
1108 .map(|provider| Arc::new(provider) as Arc<dyn SecretProvider>)
1109}
1110
1111fn default_secret_namespace() -> String {
1112 if let Ok(namespace) = std::env::var("HARN_SECRET_NAMESPACE") {
1113 if !namespace.trim().is_empty() {
1114 return namespace;
1115 }
1116 }
1117
1118 let cwd = std::env::current_dir().unwrap_or_default();
1119 let leaf = cwd
1120 .file_name()
1121 .and_then(|name| name.to_str())
1122 .filter(|name| !name.is_empty())
1123 .unwrap_or("workspace");
1124 format!("harn/{leaf}")
1125}
1126
1127fn now_ms() -> i64 {
1128 clock::now_ms()
1129}
1130
1131#[cfg(test)]
1132mod tests {
1133 use super::*;
1134 use crate::event_log::{install_default_for_base_dir, reset_active_event_log};
1135 use crate::events::{add_event_sink, clear_event_sinks, CollectorSink, EventLevel};
1136 use std::rc::Rc;
1137 use time::OffsetDateTime;
1138
1139 fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
1140 TriggerBindingSpec {
1141 id: id.to_string(),
1142 source: TriggerBindingSource::Manifest,
1143 kind: "webhook".to_string(),
1144 provider: ProviderId::from("github"),
1145 autonomy_tier: crate::AutonomyTier::ActAuto,
1146 handler: TriggerHandlerSpec::Worker {
1147 queue: format!("{id}-queue"),
1148 },
1149 dispatch_priority: crate::WorkerQueuePriority::Normal,
1150 when: None,
1151 when_budget: None,
1152 retry: TriggerRetryConfig::default(),
1153 match_events: vec!["issues.opened".to_string()],
1154 dedupe_key: Some("event.dedupe_key".to_string()),
1155 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1156 filter: Some("event.kind".to_string()),
1157 daily_cost_usd: Some(5.0),
1158 max_concurrent: Some(10),
1159 flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1160 manifest_path: None,
1161 package_name: Some("workspace".to_string()),
1162 definition_fingerprint: fingerprint.to_string(),
1163 }
1164 }
1165
1166 fn dynamic_spec(id: &str) -> TriggerBindingSpec {
1167 TriggerBindingSpec {
1168 id: id.to_string(),
1169 source: TriggerBindingSource::Dynamic,
1170 kind: "webhook".to_string(),
1171 provider: ProviderId::from("github"),
1172 autonomy_tier: crate::AutonomyTier::ActAuto,
1173 handler: TriggerHandlerSpec::Worker {
1174 queue: format!("{id}-queue"),
1175 },
1176 dispatch_priority: crate::WorkerQueuePriority::Normal,
1177 when: None,
1178 when_budget: None,
1179 retry: TriggerRetryConfig::default(),
1180 match_events: vec!["issues.opened".to_string()],
1181 dedupe_key: None,
1182 dedupe_retention_days: crate::triggers::DEFAULT_INBOX_RETENTION_DAYS,
1183 filter: None,
1184 daily_cost_usd: None,
1185 max_concurrent: None,
1186 flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1187 manifest_path: None,
1188 package_name: None,
1189 definition_fingerprint: format!("dynamic:{id}"),
1190 }
1191 }
1192
1193 #[tokio::test(flavor = "current_thread")]
1194 async fn manifest_loaded_trigger_registers_with_zeroed_metrics() {
1195 clear_trigger_registry();
1196
1197 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1198 .await
1199 .expect("manifest trigger installs");
1200
1201 let snapshots = snapshot_trigger_bindings();
1202 assert_eq!(snapshots.len(), 1);
1203 let binding = &snapshots[0];
1204 assert_eq!(binding.id, "github-new-issue");
1205 assert_eq!(binding.version, 1);
1206 assert_eq!(binding.state, TriggerState::Active);
1207 assert_eq!(binding.metrics, TriggerMetricsSnapshot::default());
1208
1209 clear_trigger_registry();
1210 }
1211
1212 #[tokio::test(flavor = "current_thread")]
1213 async fn dynamic_register_assigns_unique_ids_and_rejects_duplicates() {
1214 clear_trigger_registry();
1215
1216 let first = dynamic_register(dynamic_spec("dynamic-a"))
1217 .await
1218 .expect("first dynamic trigger");
1219 let second = dynamic_register(dynamic_spec("dynamic-b"))
1220 .await
1221 .expect("second dynamic trigger");
1222 assert_ne!(first, second);
1223
1224 let error = dynamic_register(dynamic_spec("dynamic-a"))
1225 .await
1226 .expect_err("duplicate id should fail");
1227 assert!(matches!(error, TriggerRegistryError::DuplicateId(_)));
1228
1229 clear_trigger_registry();
1230 }
1231
1232 #[tokio::test(flavor = "current_thread")]
1233 async fn drain_waits_for_in_flight_events_before_terminating() {
1234 clear_trigger_registry();
1235
1236 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1237 .await
1238 .expect("manifest trigger installs");
1239 begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1240
1241 drain("github-new-issue").await.expect("drain succeeds");
1242 let binding = snapshot_trigger_bindings()
1243 .into_iter()
1244 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1245 .expect("binding snapshot");
1246 assert_eq!(binding.state, TriggerState::Draining);
1247 assert_eq!(binding.metrics.in_flight, 1);
1248
1249 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1250 .await
1251 .expect("finish in-flight event");
1252 let binding = snapshot_trigger_bindings()
1253 .into_iter()
1254 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1255 .expect("binding snapshot");
1256 assert_eq!(binding.state, TriggerState::Terminated);
1257 assert_eq!(binding.metrics.in_flight, 0);
1258
1259 clear_trigger_registry();
1260 }
1261
1262 #[tokio::test(flavor = "current_thread")]
1263 async fn hot_reload_registers_new_version_while_old_binding_drains() {
1264 clear_trigger_registry();
1265
1266 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1267 .await
1268 .expect("initial manifest trigger installs");
1269 begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1270
1271 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1272 .await
1273 .expect("updated manifest trigger installs");
1274
1275 let snapshots = snapshot_trigger_bindings();
1276 assert_eq!(snapshots.len(), 2);
1277 let old = snapshots
1278 .iter()
1279 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1280 .expect("old binding");
1281 let new = snapshots
1282 .iter()
1283 .find(|binding| binding.id == "github-new-issue" && binding.version == 2)
1284 .expect("new binding");
1285 assert_eq!(old.state, TriggerState::Draining);
1286 assert_eq!(new.state, TriggerState::Active);
1287
1288 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1289 .await
1290 .expect("finish old in-flight event");
1291 let old = snapshot_trigger_bindings()
1292 .into_iter()
1293 .find(|binding| binding.id == "github-new-issue" && binding.version == 1)
1294 .expect("old binding");
1295 assert_eq!(old.state, TriggerState::Terminated);
1296
1297 clear_trigger_registry();
1298 }
1299
1300 #[tokio::test(flavor = "current_thread")]
1301 async fn gc_drops_terminated_versions_beyond_retention_limit() {
1302 clear_trigger_registry();
1303
1304 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1305 .await
1306 .expect("install v1");
1307 begin_in_flight("github-new-issue", 1).expect("pin v1");
1308
1309 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1310 .await
1311 .expect("install v2");
1312 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1313 .await
1314 .expect("finish v1");
1315
1316 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1317 .await
1318 .expect("install v3");
1319
1320 let snapshots = snapshot_trigger_bindings();
1321 let versions: Vec<u32> = snapshots
1322 .into_iter()
1323 .filter(|binding| binding.id == "github-new-issue")
1324 .map(|binding| binding.version)
1325 .collect();
1326 assert_eq!(versions, vec![2, 3]);
1327
1328 clear_trigger_registry();
1329 }
1330
1331 #[tokio::test(flavor = "current_thread")]
1332 async fn lifecycle_transitions_append_to_event_log() {
1333 clear_trigger_registry();
1334 reset_active_event_log();
1335 let tempdir = tempfile::tempdir().expect("tempdir");
1336 let log = install_default_for_base_dir(tempdir.path()).expect("install event log");
1337
1338 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1339 .await
1340 .expect("manifest trigger installs");
1341 begin_in_flight("github-new-issue", 1).expect("start in-flight event");
1342 drain("github-new-issue").await.expect("drain succeeds");
1343 finish_in_flight("github-new-issue", 1, TriggerDispatchOutcome::Dispatched)
1344 .await
1345 .expect("finish event");
1346
1347 let topic = Topic::new("triggers.lifecycle").expect("valid lifecycle topic");
1348 let events = log
1349 .read_range(&topic, None, 32)
1350 .await
1351 .expect("read lifecycle events");
1352 let states: Vec<String> = events
1353 .into_iter()
1354 .filter_map(|(_, event)| {
1355 event
1356 .payload
1357 .get("to_state")
1358 .and_then(|value| value.as_str())
1359 .map(|value| value.to_string())
1360 })
1361 .collect();
1362 assert_eq!(
1363 states,
1364 vec![
1365 "registering".to_string(),
1366 "active".to_string(),
1367 "draining".to_string(),
1368 "terminated".to_string(),
1369 ]
1370 );
1371
1372 reset_active_event_log();
1373 clear_trigger_registry();
1374 }
1375
1376 #[tokio::test(flavor = "current_thread")]
1377 async fn version_history_reuses_historical_version_after_restart() {
1378 clear_trigger_registry();
1379 reset_active_event_log();
1380 let tempdir = tempfile::tempdir().expect("tempdir");
1381 install_default_for_base_dir(tempdir.path()).expect("install event log");
1382
1383 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1384 .await
1385 .expect("initial manifest trigger installs");
1386 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1387 .await
1388 .expect("updated manifest trigger installs");
1389
1390 clear_trigger_registry();
1391 reset_active_event_log();
1392 install_default_for_base_dir(tempdir.path()).expect("reopen event log");
1393
1394 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1395 .await
1396 .expect("manifest reload reuses historical version");
1397
1398 let binding = snapshot_trigger_bindings()
1399 .into_iter()
1400 .find(|binding| binding.id == "github-new-issue")
1401 .expect("binding snapshot");
1402 assert_eq!(binding.version, 2);
1403
1404 reset_active_event_log();
1405 clear_trigger_registry();
1406 }
1407
1408 #[tokio::test(flavor = "current_thread")]
1409 async fn binding_version_as_of_reports_historical_active_version() {
1410 clear_trigger_registry();
1411 reset_active_event_log();
1412 let tempdir = tempfile::tempdir().expect("tempdir");
1413 install_default_for_base_dir(tempdir.path()).expect("install event log");
1414
1415 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1416 .await
1417 .expect("initial manifest trigger installs");
1418 let before_reload = OffsetDateTime::now_utc();
1419 std::thread::sleep(std::time::Duration::from_millis(10));
1420
1421 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1422 .await
1423 .expect("updated manifest trigger installs");
1424 let after_reload = OffsetDateTime::now_utc();
1425
1426 assert_eq!(
1427 binding_version_as_of("github-new-issue", before_reload)
1428 .expect("version before reload"),
1429 1
1430 );
1431 assert_eq!(
1432 binding_version_as_of("github-new-issue", after_reload).expect("version after reload"),
1433 2
1434 );
1435
1436 reset_active_event_log();
1437 clear_trigger_registry();
1438 }
1439
1440 #[tokio::test(flavor = "current_thread")]
1441 async fn resolve_live_or_as_of_logs_structured_gc_fallback() {
1442 clear_trigger_registry();
1443 reset_active_event_log();
1444 let sink = Rc::new(CollectorSink::new());
1445 clear_event_sinks();
1446 add_event_sink(sink.clone());
1447 let tempdir = tempfile::tempdir().expect("tempdir");
1448 install_default_for_base_dir(tempdir.path()).expect("install event log");
1449
1450 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v1")])
1451 .await
1452 .expect("install v1");
1453 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v2")])
1454 .await
1455 .expect("install v2");
1456 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v3")])
1457 .await
1458 .expect("install v3");
1459 let received_at = OffsetDateTime::now_utc();
1460 std::thread::sleep(std::time::Duration::from_millis(10));
1461 install_manifest_triggers(vec![manifest_spec("github-new-issue", "v4")])
1462 .await
1463 .expect("install v4");
1464
1465 let binding = resolve_live_or_as_of(
1466 "github-new-issue",
1467 RecordedTriggerBinding {
1468 version: 1,
1469 received_at,
1470 },
1471 )
1472 .expect("resolve fallback binding");
1473 assert_eq!(binding.version, 3);
1474
1475 let warning = sink
1476 .logs
1477 .borrow()
1478 .iter()
1479 .find(|log| log.category == "replay.binding_version_gc_fallback")
1480 .cloned()
1481 .expect("gc fallback warning");
1482 assert_eq!(warning.level, EventLevel::Warn);
1483 assert_eq!(
1484 warning.metadata.get("trigger_id"),
1485 Some(&serde_json::json!("github-new-issue"))
1486 );
1487 assert_eq!(
1488 warning.metadata.get("recorded_version"),
1489 Some(&serde_json::json!(1))
1490 );
1491 assert_eq!(
1492 warning.metadata.get("received_at"),
1493 Some(&serde_json::json!(received_at
1494 .format(&time::format_description::well_known::Rfc3339)
1495 .unwrap_or_else(|_| received_at.to_string())))
1496 );
1497 assert_eq!(
1498 warning.metadata.get("resolved_version"),
1499 Some(&serde_json::json!(3))
1500 );
1501
1502 clear_event_sinks();
1503 crate::events::reset_event_sinks();
1504 reset_active_event_log();
1505 clear_trigger_registry();
1506 }
1507}