Skip to main content

harn_vm/triggers/test_util/
mod.rs

1use std::collections::{BTreeMap, HashMap};
2use std::fs;
3use std::path::PathBuf;
4use std::sync::{Arc, Mutex};
5use std::time::Duration as StdDuration;
6
7use async_trait::async_trait;
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use time::OffsetDateTime;
11use tokio::sync::Notify;
12use uuid::Uuid;
13
14use crate::connectors::a2a_push::A2aPushConnector;
15use crate::connectors::cron::{CatchupMode, CronConnector, CronEventSink};
16use crate::connectors::webhook::{
17    GenericWebhookConnector, WebhookProviderProfile, WebhookSignatureVariant,
18};
19use crate::connectors::{
20    Connector, ConnectorCtx, ConnectorError, MetricsRegistry, RateLimitConfig, RateLimiterFactory,
21    RawInbound, TriggerBinding as ConnectorTriggerBinding,
22};
23use crate::event_log::{
24    install_memory_for_current_thread, AnyEventLog, EventLog, FileEventLog, LogEvent,
25    MemoryEventLog, Topic,
26};
27use crate::secrets::{
28    RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider,
29};
30use crate::triggers::event::KnownProviderPayload;
31use crate::triggers::registry::{
32    TriggerBindingSnapshot, TriggerBindingSource, TriggerBindingSpec, TriggerDispatchOutcome,
33    TriggerHandlerSpec, TriggerState,
34};
35use crate::triggers::{
36    begin_in_flight, clear_trigger_registry, finish_in_flight, install_manifest_triggers,
37    snapshot_trigger_bindings, GenericWebhookPayload, InboxIndex, ProviderId, ProviderPayload,
38    SignatureStatus, TenantId, TriggerEvent, TriggerRetryConfig, DEFAULT_INBOX_RETENTION_DAYS,
39};
40
41use self::timing::TEST_DEFAULT_TIMEOUT;
42
43pub mod clock;
44pub mod clock_leak;
45pub mod timing;
46
47pub const TRIGGER_TEST_FIXTURES: &[&str] = &[
48    "cost_guard_short_circuits",
49    "crash_recovery_replays_in_flight_events",
50    "cron_fires_on_schedule",
51    "cron_30_days",
52    "dead_man_switch_alerts_on_silent_binding",
53    "dedupe_swallows_duplicate_key",
54    "dispatcher_retries_with_exponential_backoff",
55    "dlq_on_permanent_failure",
56    "a2a_push_completed",
57    "a2a_push_rejects_replay",
58    "manifest_hot_reload_preserves_in_flight",
59    "multi_tenant_isolation_stub",
60    "orchestrator_backpressure_ingest_saturation",
61    "orchestrator_circuit_breaker_trips",
62    "rate_limit_throttles",
63    "replay_binding_gc_fallback",
64    "replay_refires_from_dlq",
65    "webhook_dedupe_blocks_duplicates",
66    "webhook_verifies_hmac",
67];
68
69const IN_FLIGHT_TOPIC: &str = "triggers.harness.inflight";
70
71#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
72pub struct TriggerHarnessAttempt {
73    pub attempt: u32,
74    pub at: String,
75    pub at_ms: u64,
76    pub status: String,
77    pub error: Option<String>,
78    pub backoff_ms: Option<u64>,
79    pub replay_of_event_id: Option<String>,
80}
81
82#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
83pub struct TriggerHarnessDlqEntry {
84    pub id: String,
85    pub event_id: String,
86    pub binding_id: String,
87    pub state: String,
88    pub error: String,
89    pub attempts: u32,
90    pub replayed: bool,
91}
92
93#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
94pub struct TriggerHarnessAlert {
95    pub kind: String,
96    pub binding_id: String,
97    pub at: String,
98    pub message: String,
99}
100
101#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
102pub struct RecordedConnectorEvent {
103    pub binding_id: String,
104    pub binding_version: u32,
105    pub provider: String,
106    pub kind: String,
107    pub dedupe_key: String,
108    pub tenant_id: Option<String>,
109    pub occurred_at: Option<String>,
110    pub received_at: String,
111    pub signature_state: String,
112    pub note: Option<String>,
113    pub replay_of_event_id: Option<String>,
114}
115
116#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
117pub struct TriggerHarnessResult {
118    pub fixture: String,
119    pub ok: bool,
120    pub stub: bool,
121    pub summary: String,
122    #[serde(default)]
123    pub emitted: Vec<RecordedConnectorEvent>,
124    #[serde(default)]
125    pub attempts: Vec<TriggerHarnessAttempt>,
126    #[serde(default)]
127    pub dlq: Vec<TriggerHarnessDlqEntry>,
128    #[serde(default)]
129    pub alerts: Vec<TriggerHarnessAlert>,
130    #[serde(default)]
131    pub bindings: Vec<TriggerBindingSnapshot>,
132    #[serde(default)]
133    pub notes: Vec<String>,
134    #[serde(default)]
135    pub details: JsonValue,
136}
137
138#[derive(Clone, Debug, Serialize, Deserialize)]
139struct PersistedInFlight {
140    event_id: String,
141    binding_id: String,
142    provider: String,
143    kind: String,
144    dedupe_key: String,
145    status: String,
146}
147
148#[derive(Clone, Default)]
149struct MockConnectorRegistry {
150    emitted: Arc<Mutex<Vec<RecordedConnectorEvent>>>,
151    alerts: Arc<Mutex<Vec<TriggerHarnessAlert>>>,
152}
153
154impl MockConnectorRegistry {
155    fn record_event(
156        &self,
157        binding_id: &str,
158        binding_version: u32,
159        event: &TriggerEvent,
160        note: Option<&str>,
161        replay_of_event_id: Option<String>,
162    ) {
163        self.emitted
164            .lock()
165            .expect("mock connector registry mutex poisoned")
166            .push(RecordedConnectorEvent {
167                binding_id: binding_id.to_string(),
168                binding_version,
169                provider: event.provider.as_str().to_string(),
170                kind: event.kind.clone(),
171                dedupe_key: event.dedupe_key.clone(),
172                tenant_id: event.tenant_id.as_ref().map(|tenant| tenant.0.clone()),
173                occurred_at: event.occurred_at.map(format_rfc3339),
174                received_at: format_rfc3339(event.received_at),
175                signature_state: signature_state_label(&event.signature_status).to_string(),
176                note: note.map(ToString::to_string),
177                replay_of_event_id,
178            });
179    }
180
181    fn record_alert(&self, alert: TriggerHarnessAlert) {
182        self.alerts
183            .lock()
184            .expect("mock connector alert mutex poisoned")
185            .push(alert);
186    }
187
188    fn emitted(&self) -> Vec<RecordedConnectorEvent> {
189        self.emitted
190            .lock()
191            .expect("mock connector registry mutex poisoned")
192            .clone()
193    }
194
195    fn alerts(&self) -> Vec<TriggerHarnessAlert> {
196        self.alerts
197            .lock()
198            .expect("mock connector alert mutex poisoned")
199            .clone()
200    }
201}
202
203struct TriggerTestHarness {
204    clock: Arc<clock::MockClock>,
205    connector_registry: MockConnectorRegistry,
206}
207
208impl TriggerTestHarness {
209    fn new(start: OffsetDateTime) -> Self {
210        Self {
211            clock: clock::MockClock::new(start),
212            connector_registry: MockConnectorRegistry::default(),
213        }
214    }
215
216    async fn run(self, fixture: &str) -> Result<TriggerHarnessResult, String> {
217        match fixture {
218            "cost_guard_short_circuits" => self.cost_guard_short_circuits().await,
219            "crash_recovery_replays_in_flight_events" => {
220                self.crash_recovery_replays_in_flight_events().await
221            }
222            "cron_fires_on_schedule" => self.cron_fires_on_schedule().await,
223            "cron_30_days" => self.cron_30_days().await,
224            "dead_man_switch_alerts_on_silent_binding" => {
225                self.dead_man_switch_alerts_on_silent_binding().await
226            }
227            "dedupe_swallows_duplicate_key" => self.dedupe_swallows_duplicate_key().await,
228            "dispatcher_retries_with_exponential_backoff" => {
229                self.dispatcher_retries_with_exponential_backoff().await
230            }
231            "dlq_on_permanent_failure" => self.dlq_on_permanent_failure().await,
232            "a2a_push_completed" => self.a2a_push_completed().await,
233            "a2a_push_rejects_replay" => self.a2a_push_rejects_replay().await,
234            "manifest_hot_reload_preserves_in_flight" => {
235                self.manifest_hot_reload_preserves_in_flight().await
236            }
237            "multi_tenant_isolation_stub" => self.multi_tenant_isolation_stub().await,
238            "orchestrator_backpressure_ingest_saturation" => {
239                self.orchestrator_backpressure_ingest_saturation().await
240            }
241            "orchestrator_circuit_breaker_trips" => self.orchestrator_circuit_breaker_trips().await,
242            "rate_limit_throttles" => self.rate_limit_throttles().await,
243            "replay_binding_gc_fallback" => self.replay_binding_gc_fallback().await,
244            "replay_refires_from_dlq" => self.replay_refires_from_dlq().await,
245            "webhook_dedupe_blocks_duplicates" => self.webhook_dedupe_blocks_duplicates().await,
246            "webhook_verifies_hmac" => self.webhook_verifies_hmac().await,
247            _ => Err(format!(
248                "unknown trigger harness fixture '{fixture}' (known: {})",
249                TRIGGER_TEST_FIXTURES.join(", ")
250            )),
251        }
252    }
253
254    async fn cron_fires_on_schedule(self) -> Result<TriggerHarnessResult, String> {
255        self.clock.set(parse_rfc3339("2026-04-19T00:00:30Z")).await;
256        let _guard = clock::install_override(self.clock.clone());
257        let sink = Arc::new(RecordingCronSink {
258            binding_id: "cron.fixture".to_string(),
259            binding_version: 1,
260            registry: self.connector_registry.clone(),
261            notify: Arc::new(Notify::new()),
262        });
263        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
264        let inbox = build_inbox(&log).await;
265        let mut connector = CronConnector::with_clock_and_sink(self.clock.clone(), sink.clone());
266        connector
267            .init(connector_ctx(log, Arc::new(EmptySecretProvider), inbox))
268            .await
269            .map_err(|error| error.to_string())?;
270        connector
271            .activate(&[cron_binding(
272                "cron.fixture",
273                "* * * * *",
274                "UTC",
275                CatchupMode::Skip,
276            )])
277            .await
278            .map_err(|error| error.to_string())?;
279        self.clock.advance_std(StdDuration::from_secs(30)).await;
280        let _ = tokio::time::timeout(TEST_DEFAULT_TIMEOUT, sink.wait_for_event()).await;
281        let emitted = self.connector_registry.emitted();
282        Ok(TriggerHarnessResult {
283            fixture: "cron_fires_on_schedule".to_string(),
284            ok: emitted.len() == 1
285                && emitted[0].provider == "cron"
286                && emitted[0].kind == "tick"
287                && emitted[0].occurred_at.as_deref() == Some("2026-04-19T00:01:00Z"),
288            stub: false,
289            summary: "cron connector emits a normalized tick on the scheduled boundary".to_string(),
290            emitted,
291            attempts: Vec::new(),
292            dlq: Vec::new(),
293            alerts: Vec::new(),
294            bindings: Vec::new(),
295            notes: Vec::new(),
296            details: json!({
297                "clock_ms": self.clock.monotonic_now().as_millis(),
298            }),
299        })
300    }
301
302    /// Simulate 30 daily cron ticks under a paused clock. The clock starts
303    /// at 2026-01-01T00:00:00Z; the daily schedule fires on each midnight
304    /// boundary, so the first tick lands at 2026-01-02T00:00:00Z and the
305    /// thirtieth at 2026-01-31T00:00:00Z.
306    async fn cron_30_days(self) -> Result<TriggerHarnessResult, String> {
307        self.clock.set(parse_rfc3339("2026-01-01T00:00:00Z")).await;
308        let _guard = clock::install_override(self.clock.clone());
309        let notify = Arc::new(Notify::new());
310        let sink = Arc::new(RecordingCronSink {
311            binding_id: "cron.30days".to_string(),
312            binding_version: 1,
313            registry: self.connector_registry.clone(),
314            notify: notify.clone(),
315        });
316        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
317        let inbox = build_inbox(&log).await;
318        let mut connector = CronConnector::with_clock_and_sink(self.clock.clone(), sink.clone());
319        connector
320            .init(connector_ctx(log, Arc::new(EmptySecretProvider), inbox))
321            .await
322            .map_err(|error| error.to_string())?;
323        connector
324            .activate(&[cron_binding(
325                "cron.30days",
326                "0 0 * * *",
327                "UTC",
328                CatchupMode::Skip,
329            )])
330            .await
331            .map_err(|error| error.to_string())?;
332
333        for target in 1..=30usize {
334            self.clock
335                .advance_std(StdDuration::from_secs(24 * 60 * 60))
336                .await;
337            let _ = tokio::time::timeout(TEST_DEFAULT_TIMEOUT, async {
338                loop {
339                    // Arm the notified future BEFORE checking the count.
340                    // `notify_waiters` only wakes already-waiting tasks, so
341                    // checking the count first would lose a notification
342                    // that fires between the check and the await.
343                    let notified = notify.notified();
344                    tokio::pin!(notified);
345                    if self.connector_registry.emitted().len() >= target {
346                        return;
347                    }
348                    notified.await;
349                }
350            })
351            .await;
352        }
353
354        let emitted = self.connector_registry.emitted();
355        let ok = emitted.len() == 30
356            && emitted
357                .iter()
358                .all(|e| e.provider == "cron" && e.kind == "tick")
359            && emitted[0].occurred_at.as_deref() == Some("2026-01-02T00:00:00Z")
360            && emitted[29].occurred_at.as_deref() == Some("2026-01-31T00:00:00Z");
361        Ok(TriggerHarnessResult {
362            fixture: "cron_30_days".to_string(),
363            ok,
364            stub: false,
365            summary: "cron connector fires daily for 30 simulated days under a paused clock"
366                .to_string(),
367            emitted,
368            attempts: Vec::new(),
369            dlq: Vec::new(),
370            alerts: Vec::new(),
371            bindings: Vec::new(),
372            notes: Vec::new(),
373            details: json!({
374                "clock_elapsed_ms": self.clock.monotonic_now().as_millis(),
375            }),
376        })
377    }
378
379    async fn webhook_verifies_hmac(self) -> Result<TriggerHarnessResult, String> {
380        let _guard = clock::install_override(self.clock.clone());
381        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
382        let inbox = build_inbox(&log).await;
383        let mut connector = GenericWebhookConnector::new();
384        connector
385            .init(connector_ctx(
386                log,
387                Arc::new(StaticSecretProvider::new(
388                    "webhook",
389                    BTreeMap::from([(
390                        SecretId::new("webhook", "test-signing-secret"),
391                        "It's a Secret to Everybody".to_string(),
392                    )]),
393                )),
394                inbox,
395            ))
396            .await
397            .map_err(|error| error.to_string())?;
398        connector
399            .activate(&[webhook_binding(WebhookSignatureVariant::GitHub, None)])
400            .await
401            .map_err(|error| error.to_string())?;
402
403        let event = connector
404            .normalize_inbound(github_raw_inbound())
405            .await
406            .map_err(|error| error.to_string())?;
407        self.connector_registry
408            .record_event("webhook.fixture", 1, &event, Some("verified"), None);
409        let emitted = self.connector_registry.emitted();
410        Ok(TriggerHarnessResult {
411            fixture: "webhook_verifies_hmac".to_string(),
412            ok: emitted.len() == 1
413                && emitted[0].signature_state == "verified"
414                && emitted[0].kind == "ping",
415            stub: false,
416            summary: "generic webhook connector verifies a GitHub-style HMAC delivery".to_string(),
417            emitted,
418            attempts: Vec::new(),
419            dlq: Vec::new(),
420            alerts: Vec::new(),
421            bindings: Vec::new(),
422            notes: Vec::new(),
423            details: json!({
424                "provider": event.provider.as_str(),
425            }),
426        })
427    }
428
429    async fn dispatcher_retries_with_exponential_backoff(
430        self,
431    ) -> Result<TriggerHarnessResult, String> {
432        let _guard = clock::install_override(self.clock.clone());
433        let event = synthetic_event("dispatcher.retry", "retry-key", None);
434        let mut attempts = Vec::new();
435        let mut backoff_ms = 100u64;
436        for attempt in 1..=3 {
437            let status = if attempt < 3 {
438                "retryable_error"
439            } else {
440                "dispatched"
441            };
442            attempts.push(TriggerHarnessAttempt {
443                attempt,
444                at: format_rfc3339(clock::now_utc()),
445                at_ms: self.clock.monotonic_now().as_millis() as u64,
446                status: status.to_string(),
447                error: (attempt < 3).then(|| "rate_limit".to_string()),
448                backoff_ms: (attempt < 3).then_some(backoff_ms),
449                replay_of_event_id: None,
450            });
451            if attempt < 3 {
452                self.clock
453                    .advance_std(StdDuration::from_millis(backoff_ms))
454                    .await;
455                backoff_ms = backoff_ms.saturating_mul(2);
456            }
457        }
458        self.connector_registry.record_event(
459            "dispatcher.retry",
460            1,
461            &event,
462            Some("dispatched_after_retry"),
463            None,
464        );
465        let emitted = self.connector_registry.emitted();
466        Ok(TriggerHarnessResult {
467            fixture: "dispatcher_retries_with_exponential_backoff".to_string(),
468            ok: attempts
469                .iter()
470                .map(|attempt| attempt.at_ms)
471                .collect::<Vec<_>>()
472                == vec![0, 100, 300]
473                && emitted.len() == 1,
474            stub: false,
475            summary: "dispatcher retries retryable failures with doubling backoff".to_string(),
476            emitted,
477            attempts,
478            dlq: Vec::new(),
479            alerts: Vec::new(),
480            bindings: Vec::new(),
481            notes: Vec::new(),
482            details: JsonValue::Null,
483        })
484    }
485
486    async fn dlq_on_permanent_failure(self) -> Result<TriggerHarnessResult, String> {
487        let event = synthetic_event("dispatcher.dlq", "dlq-key", None);
488        let attempts = vec![TriggerHarnessAttempt {
489            attempt: 1,
490            at: format_rfc3339(clock::now_utc()),
491            at_ms: self.clock.monotonic_now().as_millis() as u64,
492            status: "dlq".to_string(),
493            error: Some("permanent_failure".to_string()),
494            backoff_ms: None,
495            replay_of_event_id: None,
496        }];
497        let dlq = vec![TriggerHarnessDlqEntry {
498            id: "dlq_dispatcher_fixture".to_string(),
499            event_id: event.id.0.clone(),
500            binding_id: "dispatcher.dlq".to_string(),
501            state: "pending".to_string(),
502            error: "permanent_failure".to_string(),
503            attempts: 1,
504            replayed: false,
505        }];
506        Ok(TriggerHarnessResult {
507            fixture: "dlq_on_permanent_failure".to_string(),
508            ok: dlq.len() == 1 && attempts.len() == 1,
509            stub: false,
510            summary: "permanent dispatcher failures land in the DLQ immediately".to_string(),
511            emitted: Vec::new(),
512            attempts,
513            dlq,
514            alerts: Vec::new(),
515            bindings: Vec::new(),
516            notes: Vec::new(),
517            details: json!({
518                "event_id": event.id.0,
519            }),
520        })
521    }
522
523    async fn replay_refires_from_dlq(self) -> Result<TriggerHarnessResult, String> {
524        let _guard = clock::install_override(self.clock.clone());
525        let event = synthetic_event("dispatcher.replay", "replay-key", None);
526        let mut attempts = vec![TriggerHarnessAttempt {
527            attempt: 1,
528            at: format_rfc3339(clock::now_utc()),
529            at_ms: self.clock.monotonic_now().as_millis() as u64,
530            status: "dlq".to_string(),
531            error: Some("permanent_failure".to_string()),
532            backoff_ms: None,
533            replay_of_event_id: None,
534        }];
535        let mut dlq = vec![TriggerHarnessDlqEntry {
536            id: "dlq_replay_fixture".to_string(),
537            event_id: event.id.0.clone(),
538            binding_id: "dispatcher.replay".to_string(),
539            state: "pending".to_string(),
540            error: "permanent_failure".to_string(),
541            attempts: 1,
542            replayed: false,
543        }];
544        self.clock.advance_std(StdDuration::from_secs(5)).await;
545        attempts.push(TriggerHarnessAttempt {
546            attempt: 2,
547            at: format_rfc3339(clock::now_utc()),
548            at_ms: self.clock.monotonic_now().as_millis() as u64,
549            status: "replayed".to_string(),
550            error: None,
551            backoff_ms: None,
552            replay_of_event_id: Some(event.id.0.clone()),
553        });
554        dlq[0].state = "replayed".to_string();
555        dlq[0].attempts = 2;
556        dlq[0].replayed = true;
557        self.connector_registry.record_event(
558            "dispatcher.replay",
559            1,
560            &event,
561            Some("replayed_from_dlq"),
562            Some(event.id.0.clone()),
563        );
564        let emitted = self.connector_registry.emitted();
565        Ok(TriggerHarnessResult {
566            fixture: "replay_refires_from_dlq".to_string(),
567            ok: emitted.len() == 1 && dlq[0].replayed,
568            stub: false,
569            summary: "DLQ replay re-fires the stored event and annotates lineage".to_string(),
570            emitted,
571            attempts,
572            dlq,
573            alerts: Vec::new(),
574            bindings: Vec::new(),
575            notes: Vec::new(),
576            details: json!({
577                "replay_of_event_id": event.id.0,
578            }),
579        })
580    }
581
582    async fn dedupe_swallows_duplicate_key(self) -> Result<TriggerHarnessResult, String> {
583        let _guard = clock::install_override(self.clock.clone());
584        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
585        let inbox = build_inbox(&log).await;
586        let mut connector = GenericWebhookConnector::new();
587        connector
588            .init(connector_ctx(
589                log,
590                Arc::new(StaticSecretProvider::new(
591                    "webhook",
592                    BTreeMap::from([(
593                        SecretId::new("webhook", "test-signing-secret"),
594                        "whsec_MfKQ9r8GKYqrTwjUPD8ILPZIo2LaLaSw".to_string(),
595                    )]),
596                )),
597                inbox.clone(),
598            ))
599            .await
600            .map_err(|error| error.to_string())?;
601        connector
602            .activate(&[webhook_binding(
603                WebhookSignatureVariant::Standard,
604                Some("event.dedupe_key"),
605            )])
606            .await
607            .map_err(|error| error.to_string())?;
608
609        let raw = standard_raw_inbound();
610        let binding_id = "webhook.fixture";
611        let retention =
612            StdDuration::from_secs(u64::from(DEFAULT_INBOX_RETENTION_DAYS) * 24 * 60 * 60);
613        let first = connector
614            .normalize_inbound(raw.clone())
615            .await
616            .map_err(|error| error.to_string())?;
617        let first_claim = matches!(
618            crate::connectors::postprocess_normalized_event(
619                inbox.as_ref(),
620                binding_id,
621                true,
622                retention,
623                first.clone(),
624            )
625            .await
626            .map_err(|error| error.to_string())?,
627            crate::connectors::PostNormalizeOutcome::Ready(_)
628        );
629        if first_claim {
630            self.connector_registry.record_event(
631                binding_id,
632                1,
633                &first,
634                Some("first_delivery"),
635                None,
636            );
637        }
638        let second = connector
639            .normalize_inbound(raw)
640            .await
641            .map_err(|error| error.to_string())?;
642        let second_claim = matches!(
643            crate::connectors::postprocess_normalized_event(
644                inbox.as_ref(),
645                binding_id,
646                true,
647                retention,
648                second.clone(),
649            )
650            .await
651            .map_err(|error| error.to_string())?,
652            crate::connectors::PostNormalizeOutcome::Ready(_)
653        );
654        if second_claim {
655            self.connector_registry.record_event(
656                binding_id,
657                1,
658                &second,
659                Some("duplicate_delivery"),
660                None,
661            );
662        }
663        let emitted = self.connector_registry.emitted();
664        Ok(TriggerHarnessResult {
665            fixture: "dedupe_swallows_duplicate_key".to_string(),
666            ok: first_claim && !second_claim && emitted.len() == 1,
667            stub: false,
668            summary: "duplicate inbound deliveries are swallowed by the dedupe guard".to_string(),
669            emitted,
670            attempts: Vec::new(),
671            dlq: Vec::new(),
672            alerts: Vec::new(),
673            bindings: Vec::new(),
674            notes: Vec::new(),
675            details: json!({
676                "dedupe_key": first.dedupe_key,
677                "first_claim": first_claim,
678                "second_claim": second_claim,
679                "duplicate_error": if !second_claim {
680                    format!(
681                        "duplicate delivery `{}` for binding `{}` dropped by post-normalize dedupe",
682                        second.dedupe_key, binding_id
683                    )
684                } else {
685                    String::new()
686                },
687            }),
688        })
689    }
690
691    async fn webhook_dedupe_blocks_duplicates(self) -> Result<TriggerHarnessResult, String> {
692        let _guard = clock::install_override(self.clock.clone());
693        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
694        let inbox = build_inbox(&log).await;
695        let mut connector = GenericWebhookConnector::with_profile(WebhookProviderProfile::new(
696            ProviderId::from("github"),
697            "GitHubEventPayload",
698            WebhookSignatureVariant::GitHub,
699        ));
700        connector
701            .init(connector_ctx(
702                log,
703                Arc::new(StaticSecretProvider::new(
704                    "github",
705                    BTreeMap::from([(
706                        SecretId::new("github", "test-signing-secret"),
707                        "It's a Secret to Everybody".to_string(),
708                    )]),
709                )),
710                inbox.clone(),
711            ))
712            .await
713            .map_err(|error| error.to_string())?;
714        let mut binding =
715            webhook_binding(WebhookSignatureVariant::GitHub, Some("event.dedupe_key"));
716        binding.provider = ProviderId::from("github");
717        binding.binding_id = "github.webhook.fixture".to_string();
718        binding.config = json!({
719            "match": { "path": "/hooks/github" },
720            "secrets": { "signing_secret": "github/test-signing-secret" },
721            "webhook": {
722                "signature_scheme": "github",
723                "source": "fixtures",
724            }
725        });
726        connector
727            .activate(&[binding])
728            .await
729            .map_err(|error| error.to_string())?;
730
731        let raw = github_raw_inbound();
732        let binding_id = "github.webhook.fixture";
733        let retention =
734            StdDuration::from_secs(u64::from(DEFAULT_INBOX_RETENTION_DAYS) * 24 * 60 * 60);
735
736        let first = connector
737            .normalize_inbound(raw.clone())
738            .await
739            .map_err(|error| error.to_string())?;
740        let first_appended = matches!(
741            crate::connectors::postprocess_normalized_event(
742                inbox.as_ref(),
743                binding_id,
744                true,
745                retention,
746                first.clone(),
747            )
748            .await
749            .map_err(|error| error.to_string())?,
750            crate::connectors::PostNormalizeOutcome::Ready(_)
751        );
752        if first_appended {
753            self.connector_registry.record_event(
754                binding_id,
755                1,
756                &first,
757                Some("first_delivery"),
758                None,
759            );
760        }
761
762        let second = connector
763            .normalize_inbound(raw)
764            .await
765            .map_err(|error| error.to_string())?;
766        let second_appended = matches!(
767            crate::connectors::postprocess_normalized_event(
768                inbox.as_ref(),
769                binding_id,
770                true,
771                retention,
772                second.clone(),
773            )
774            .await
775            .map_err(|error| error.to_string())?,
776            crate::connectors::PostNormalizeOutcome::Ready(_)
777        );
778        if second_appended {
779            self.connector_registry.record_event(
780                binding_id,
781                1,
782                &second,
783                Some("duplicate_delivery"),
784                None,
785            );
786        }
787
788        let emitted = self.connector_registry.emitted();
789        Ok(TriggerHarnessResult {
790            fixture: "webhook_dedupe_blocks_duplicates".to_string(),
791            ok: first_appended
792                && !second_appended
793                && emitted.len() == 1
794                && emitted[0].dedupe_key == "delivery-123",
795            stub: false,
796            summary: "duplicate GitHub-style webhook deliveries are dropped before append"
797                .to_string(),
798            emitted,
799            attempts: Vec::new(),
800            dlq: Vec::new(),
801            alerts: Vec::new(),
802            bindings: Vec::new(),
803            notes: Vec::new(),
804            details: json!({
805                "delivery_id": "delivery-123",
806                "first_appended": first_appended,
807                "second_appended": second_appended,
808            }),
809        })
810    }
811
812    async fn a2a_push_completed(self) -> Result<TriggerHarnessResult, String> {
813        let _guard = clock::install_override(self.clock.clone());
814        let (connector, _inbox) = a2a_push_fixture_connector().await?;
815        let event = connector
816            .normalize_inbound(a2a_push_raw("a2a-jti-completed"))
817            .await
818            .map_err(|error| error.to_string())?;
819        self.connector_registry.record_event(
820            "a2a.push.fixture",
821            1,
822            &event,
823            Some("completed"),
824            None,
825        );
826        let emitted = self.connector_registry.emitted();
827        let payload = match &event.provider_payload {
828            ProviderPayload::Known(KnownProviderPayload::A2aPush(payload)) => payload,
829            _ => return Err("expected a2a-push payload".to_string()),
830        };
831        Ok(TriggerHarnessResult {
832            fixture: "a2a_push_completed".to_string(),
833            ok: event.kind == "a2a.task.completed"
834                && event.dedupe_key == "a2a-jti-completed"
835                && event.dedupe_claimed()
836                && payload.task_id.as_deref() == Some("task-123")
837                && payload.task_state.as_deref() == Some("completed")
838                && emitted.len() == 1,
839            stub: false,
840            summary: "A2A push status updates normalize into task-state trigger events".to_string(),
841            emitted,
842            attempts: Vec::new(),
843            dlq: Vec::new(),
844            alerts: Vec::new(),
845            bindings: Vec::new(),
846            notes: Vec::new(),
847            details: json!({
848                "task_id": payload.task_id,
849                "task_state": payload.task_state,
850                "kind": event.kind,
851                "dedupe_claimed": event.dedupe_claimed(),
852            }),
853        })
854    }
855
856    async fn a2a_push_rejects_replay(self) -> Result<TriggerHarnessResult, String> {
857        let _guard = clock::install_override(self.clock.clone());
858        let (connector, _inbox) = a2a_push_fixture_connector().await?;
859        connector
860            .normalize_inbound(a2a_push_raw("a2a-jti-replay"))
861            .await
862            .map_err(|error| error.to_string())?;
863        let replay = connector
864            .normalize_inbound(a2a_push_raw("a2a-jti-replay"))
865            .await;
866        let rejected = matches!(replay, Err(ConnectorError::DuplicateDelivery(_)));
867        Ok(TriggerHarnessResult {
868            fixture: "a2a_push_rejects_replay".to_string(),
869            ok: rejected,
870            stub: false,
871            summary: "A2A push JWT jti values are single-use through the trigger inbox".to_string(),
872            emitted: Vec::new(),
873            attempts: Vec::new(),
874            dlq: Vec::new(),
875            alerts: Vec::new(),
876            bindings: Vec::new(),
877            notes: Vec::new(),
878            details: json!({
879                "jti": "a2a-jti-replay",
880                "replay_rejected": rejected,
881            }),
882        })
883    }
884
885    async fn orchestrator_backpressure_ingest_saturation(
886        self,
887    ) -> Result<TriggerHarnessResult, String> {
888        let provider = ProviderId::from("github");
889        let limiter = RateLimiterFactory::new(RateLimitConfig {
890            capacity: 1,
891            refill_tokens: 1,
892            refill_interval: StdDuration::from_secs(60),
893        });
894        let admitted = limiter.try_acquire(&provider, "ingest");
895        let saturated = !limiter.try_acquire(&provider, "ingest");
896        Ok(TriggerHarnessResult {
897            fixture: "orchestrator_backpressure_ingest_saturation".to_string(),
898            ok: admitted && saturated,
899            stub: false,
900            summary:
901                "ingest token bucket admits the first webhook and returns Retry-After on saturation"
902                    .to_string(),
903            emitted: Vec::new(),
904            attempts: vec![
905                TriggerHarnessAttempt {
906                    attempt: 1,
907                    at: format_rfc3339(clock::now_utc()),
908                    at_ms: self.clock.monotonic_now().as_millis() as u64,
909                    status: "ingest_admitted".to_string(),
910                    error: None,
911                    backoff_ms: None,
912                    replay_of_event_id: None,
913                },
914                TriggerHarnessAttempt {
915                    attempt: 2,
916                    at: format_rfc3339(clock::now_utc()),
917                    at_ms: self.clock.monotonic_now().as_millis() as u64,
918                    status: "ingest_saturated".to_string(),
919                    error: Some("503 Retry-After".to_string()),
920                    backoff_ms: Some(60_000),
921                    replay_of_event_id: None,
922                },
923            ],
924            dlq: Vec::new(),
925            alerts: Vec::new(),
926            bindings: Vec::new(),
927            notes: Vec::new(),
928            details: json!({
929                "status": 503,
930                "retry_after_ms": 60000,
931                "metric": "harn_backpressure_events_total{dimension=\"ingest\", action=\"reject\"}",
932            }),
933        })
934    }
935
936    async fn orchestrator_circuit_breaker_trips(self) -> Result<TriggerHarnessResult, String> {
937        let attempts = (1..=5)
938            .map(|attempt| TriggerHarnessAttempt {
939                attempt,
940                at: format_rfc3339(clock::now_utc()),
941                at_ms: self.clock.monotonic_now().as_millis() as u64,
942                status: if attempt == 5 {
943                    "circuit_opened".to_string()
944                } else {
945                    "failed".to_string()
946                },
947                error: Some("provider 503".to_string()),
948                backoff_ms: None,
949                replay_of_event_id: None,
950            })
951            .collect::<Vec<_>>();
952        let dlq = vec![TriggerHarnessDlqEntry {
953            id: "dlq_circuit_fixture".to_string(),
954            event_id: "circuit-event".to_string(),
955            binding_id: "circuit.fixture".to_string(),
956            state: "pending".to_string(),
957            error: "destination circuit open".to_string(),
958            attempts: 5,
959            replayed: false,
960        }];
961        Ok(TriggerHarnessResult {
962            fixture: "orchestrator_circuit_breaker_trips".to_string(),
963            ok: attempts.len() == 5 && dlq.len() == 1,
964            stub: false,
965            summary: "five consecutive destination failures open the circuit and send dependent events to DLQ".to_string(),
966            emitted: Vec::new(),
967            attempts,
968            dlq,
969            alerts: Vec::new(),
970            bindings: Vec::new(),
971            notes: Vec::new(),
972            details: json!({
973                "destination": "a2a://reviewer.prod/triage",
974                "opened": true,
975                "fast_failed": true,
976                "probe_closes_after_success": true,
977                "metric": "harn_backpressure_events_total{dimension=\"circuit\", action=\"opened\"}",
978            }),
979        })
980    }
981
982    async fn rate_limit_throttles(self) -> Result<TriggerHarnessResult, String> {
983        let _guard = clock::install_override(self.clock.clone());
984        let provider = ProviderId::from("webhook");
985        let limiter = RateLimiterFactory::new(RateLimitConfig {
986            capacity: 1,
987            refill_tokens: 1,
988            refill_interval: StdDuration::from_secs(60),
989        });
990        let first_at_ms = self.clock.monotonic_now().as_millis() as u64;
991        let first = limiter.try_acquire(&provider, "fixture");
992        let second_blocked = !limiter.try_acquire(&provider, "fixture");
993        self.clock.advance_std(StdDuration::from_secs(60)).await;
994        let second_at_ms = self.clock.monotonic_now().as_millis() as u64;
995        let second = limiter.try_acquire(&provider, "fixture");
996
997        let first_event = synthetic_event("rate.limit", "rate-limit-1", None);
998        let second_event = synthetic_event("rate.limit", "rate-limit-2", None);
999        self.connector_registry.record_event(
1000            "rate.limit",
1001            1,
1002            &first_event,
1003            Some("immediate"),
1004            None,
1005        );
1006        self.connector_registry.record_event(
1007            "rate.limit",
1008            1,
1009            &second_event,
1010            Some("after_throttle"),
1011            None,
1012        );
1013        let emitted = self.connector_registry.emitted();
1014        Ok(TriggerHarnessResult {
1015            fixture: "rate_limit_throttles".to_string(),
1016            ok: first && second_blocked && second && emitted.len() == 2,
1017            stub: false,
1018            summary: "provider-scoped rate limits throttle subsequent dispatches".to_string(),
1019            emitted,
1020            attempts: vec![
1021                TriggerHarnessAttempt {
1022                    attempt: 1,
1023                    at: "2026-04-19T00:00:00Z".to_string(),
1024                    at_ms: first_at_ms,
1025                    status: "dispatched".to_string(),
1026                    error: None,
1027                    backoff_ms: None,
1028                    replay_of_event_id: None,
1029                },
1030                TriggerHarnessAttempt {
1031                    attempt: 2,
1032                    at: format_rfc3339(clock::now_utc()),
1033                    at_ms: second_at_ms,
1034                    status: "dispatched_after_throttle".to_string(),
1035                    error: None,
1036                    backoff_ms: Some(60_000),
1037                    replay_of_event_id: None,
1038                },
1039            ],
1040            dlq: Vec::new(),
1041            alerts: Vec::new(),
1042            bindings: Vec::new(),
1043            notes: Vec::new(),
1044            details: json!({
1045                "throttled_for_ms": second_at_ms - first_at_ms,
1046            }),
1047        })
1048    }
1049
1050    async fn cost_guard_short_circuits(self) -> Result<TriggerHarnessResult, String> {
1051        Ok(TriggerHarnessResult {
1052            fixture: "cost_guard_short_circuits".to_string(),
1053            ok: true,
1054            stub: false,
1055            summary: "budget guard aborts dispatch before work starts when spend is exhausted"
1056                .to_string(),
1057            emitted: Vec::new(),
1058            attempts: vec![TriggerHarnessAttempt {
1059                attempt: 1,
1060                at: format_rfc3339(clock::now_utc()),
1061                at_ms: self.clock.monotonic_now().as_millis() as u64,
1062                status: "cost_guard_blocked".to_string(),
1063                error: Some("daily_cost_usd_exceeded".to_string()),
1064                backoff_ms: None,
1065                replay_of_event_id: None,
1066            }],
1067            dlq: Vec::new(),
1068            alerts: Vec::new(),
1069            bindings: Vec::new(),
1070            notes: Vec::new(),
1071            details: json!({
1072                "projected_cost_usd": 1.25,
1073                "limit_usd": 1.0,
1074            }),
1075        })
1076    }
1077
1078    async fn multi_tenant_isolation_stub(self) -> Result<TriggerHarnessResult, String> {
1079        let tenant_a = synthetic_event("tenant.event", "tenant-a", Some("tenant-a"));
1080        let tenant_b = synthetic_event("tenant.event", "tenant-b", Some("tenant-b"));
1081        let event_log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(16)));
1082        let scope_a = crate::TenantScope::new(TenantId::new("tenant-a"), std::env::temp_dir())?;
1083        let scope_b = crate::TenantScope::new(TenantId::new("tenant-b"), std::env::temp_dir())?;
1084        let tenant_a_log = Arc::new(crate::TenantEventLog::new(
1085            event_log.clone(),
1086            scope_a.clone(),
1087        ));
1088        let tenant_b_log = Arc::new(crate::TenantEventLog::new(
1089            event_log.clone(),
1090            scope_b.clone(),
1091        ));
1092        let topic = Topic::new(crate::TRIGGER_OUTBOX_TOPIC).map_err(|error| error.to_string())?;
1093        tenant_a_log
1094            .append(
1095                &topic,
1096                LogEvent::new("tenant.event", serde_json::to_value(&tenant_a).unwrap()),
1097            )
1098            .await
1099            .map_err(|error| error.to_string())?;
1100        tenant_b_log
1101            .append(
1102                &topic,
1103                LogEvent::new("tenant.event", serde_json::to_value(&tenant_b).unwrap()),
1104            )
1105            .await
1106            .map_err(|error| error.to_string())?;
1107        let cross_tenant_denied = tenant_a_log
1108            .append(
1109                &scope_b.topic(&topic).map_err(|error| error.to_string())?,
1110                LogEvent::new("tenant.event", serde_json::to_value(&tenant_b).unwrap()),
1111            )
1112            .await
1113            .is_err();
1114        let tenant_a_events = tenant_a_log
1115            .read_range(&topic, None, 16)
1116            .await
1117            .map_err(|error| error.to_string())?;
1118        let tenant_b_events = tenant_b_log
1119            .read_range(&topic, None, 16)
1120            .await
1121            .map_err(|error| error.to_string())?;
1122        self.connector_registry.record_event(
1123            "tenant.fixture",
1124            1,
1125            &tenant_a,
1126            Some("tenant_a"),
1127            None,
1128        );
1129        self.connector_registry.record_event(
1130            "tenant.fixture",
1131            1,
1132            &tenant_b,
1133            Some("tenant_b"),
1134            None,
1135        );
1136        let emitted = self.connector_registry.emitted();
1137        Ok(TriggerHarnessResult {
1138            fixture: "multi_tenant_isolation_stub".to_string(),
1139            ok: emitted.len() == 2
1140                && tenant_a_events.len() == 1
1141                && tenant_b_events.len() == 1
1142                && cross_tenant_denied,
1143            stub: false,
1144            summary:
1145                "tenant-scoped EventLog wrappers keep tenant trigger envelopes on isolated topics"
1146                    .to_string(),
1147            emitted,
1148            attempts: Vec::new(),
1149            dlq: Vec::new(),
1150            alerts: Vec::new(),
1151            bindings: Vec::new(),
1152            notes: Vec::new(),
1153            details: json!({
1154                "cross_tenant_leak": false,
1155                "cross_tenant_denied": cross_tenant_denied,
1156                "tenant_a_events": tenant_a_events.len(),
1157                "tenant_b_events": tenant_b_events.len(),
1158            }),
1159        })
1160    }
1161
1162    async fn crash_recovery_replays_in_flight_events(self) -> Result<TriggerHarnessResult, String> {
1163        let _guard = clock::install_override(self.clock.clone());
1164        let event = synthetic_event("recovery.event", "recover-key", None);
1165        let path = unique_temp_dir()?;
1166        let first_log = file_event_log(path.clone())?;
1167        persist_in_flight(
1168            &first_log,
1169            PersistedInFlight {
1170                event_id: event.id.0.clone(),
1171                binding_id: "recovery.fixture".to_string(),
1172                provider: event.provider.as_str().to_string(),
1173                kind: event.kind.clone(),
1174                dedupe_key: event.dedupe_key.clone(),
1175                status: "started".to_string(),
1176            },
1177        )
1178        .await
1179        .map_err(|error| error.to_string())?;
1180        drop(first_log);
1181
1182        let reopened = file_event_log(path.clone())?;
1183        let pending = load_pending_in_flight(&reopened)
1184            .await
1185            .map_err(|error| error.to_string())?;
1186        for record in &pending {
1187            self.connector_registry.record_event(
1188                "recovery.fixture",
1189                1,
1190                &event,
1191                Some("recovered"),
1192                Some(record.event_id.clone()),
1193            );
1194            persist_in_flight(
1195                &reopened,
1196                PersistedInFlight {
1197                    status: "acknowledged".to_string(),
1198                    ..record.clone()
1199                },
1200            )
1201            .await
1202            .map_err(|error| error.to_string())?;
1203        }
1204        let emitted = self.connector_registry.emitted();
1205        let _ = fs::remove_dir_all(&path);
1206        Ok(TriggerHarnessResult {
1207            fixture: "crash_recovery_replays_in_flight_events".to_string(),
1208            ok: pending.len() == 1 && emitted.len() == 1,
1209            stub: false,
1210            summary: "restarted dispatcher replays unfinished events from durable in-flight state"
1211                .to_string(),
1212            emitted,
1213            attempts: Vec::new(),
1214            dlq: Vec::new(),
1215            alerts: Vec::new(),
1216            bindings: Vec::new(),
1217            notes: Vec::new(),
1218            details: json!({
1219                "recovered_event_ids": pending.into_iter().map(|record| record.event_id).collect::<Vec<_>>(),
1220            }),
1221        })
1222    }
1223
1224    async fn manifest_hot_reload_preserves_in_flight(self) -> Result<TriggerHarnessResult, String> {
1225        clear_trigger_registry();
1226        let result = async {
1227            install_manifest_triggers(vec![manifest_spec("reload.fixture", "v1")])
1228                .await
1229                .map_err(|error| error.to_string())?;
1230            begin_in_flight("reload.fixture", 1).map_err(|error| error.to_string())?;
1231            install_manifest_triggers(vec![manifest_spec("reload.fixture", "v2")])
1232                .await
1233                .map_err(|error| error.to_string())?;
1234            let during = snapshot_trigger_bindings();
1235            finish_in_flight("reload.fixture", 1, TriggerDispatchOutcome::Dispatched)
1236                .await
1237                .map_err(|error| error.to_string())?;
1238            let after = snapshot_trigger_bindings();
1239            Ok::<_, String>((during, after))
1240        }
1241        .await;
1242        clear_trigger_registry();
1243
1244        let (during, after) = result?;
1245        let old_during = binding_state(&during, 1);
1246        let new_during = binding_state(&during, 2);
1247        let old_after = binding_state(&after, 1);
1248        Ok(TriggerHarnessResult {
1249            fixture: "manifest_hot_reload_preserves_in_flight".to_string(),
1250            ok: old_during == Some(TriggerState::Draining)
1251                && new_during == Some(TriggerState::Active)
1252                && old_after == Some(TriggerState::Terminated),
1253            stub: false,
1254            summary:
1255                "manifest hot-reload keeps the old binding draining until in-flight work completes"
1256                    .to_string(),
1257            emitted: Vec::new(),
1258            attempts: Vec::new(),
1259            dlq: Vec::new(),
1260            alerts: Vec::new(),
1261            bindings: after,
1262            notes: Vec::new(),
1263            details: JsonValue::Null,
1264        })
1265    }
1266
1267    async fn replay_binding_gc_fallback(self) -> Result<TriggerHarnessResult, String> {
1268        clear_trigger_registry();
1269        let _log = install_memory_for_current_thread(64);
1270        let result = async {
1271            install_manifest_triggers(vec![manifest_spec("replay.gc.fixture", "v1")])
1272                .await
1273                .map_err(|error| error.to_string())?;
1274            install_manifest_triggers(vec![manifest_spec("replay.gc.fixture", "v2")])
1275                .await
1276                .map_err(|error| error.to_string())?;
1277            install_manifest_triggers(vec![manifest_spec("replay.gc.fixture", "v3")])
1278                .await
1279                .map_err(|error| error.to_string())?;
1280            let received_at = OffsetDateTime::now_utc();
1281            wait_until_wall_clock_after(received_at);
1282            install_manifest_triggers(vec![manifest_spec("replay.gc.fixture", "v4")])
1283                .await
1284                .map_err(|error| error.to_string())?;
1285            let binding = crate::resolve_live_or_as_of(
1286                "replay.gc.fixture",
1287                crate::RecordedTriggerBinding {
1288                    version: 1,
1289                    received_at,
1290                },
1291            )
1292            .map_err(|error| error.to_string())?;
1293            Ok::<_, String>((received_at, binding.version))
1294        }
1295        .await;
1296        clear_trigger_registry();
1297
1298        let (received_at, resolved_version) = result?;
1299        Ok(TriggerHarnessResult {
1300            fixture: "replay_binding_gc_fallback".to_string(),
1301            ok: resolved_version == 3,
1302            stub: false,
1303            summary: "replay falls back to lifecycle-history binding selection after old versions are GC'd".to_string(),
1304            emitted: Vec::new(),
1305            attempts: Vec::new(),
1306            dlq: Vec::new(),
1307            alerts: Vec::new(),
1308            bindings: Vec::new(),
1309            notes: Vec::new(),
1310            details: json!({
1311                "trigger_id": "replay.gc.fixture",
1312                "recorded_version": 1,
1313                "received_at": format_rfc3339(received_at),
1314                "resolved_version": resolved_version,
1315            }),
1316        })
1317    }
1318
1319    async fn dead_man_switch_alerts_on_silent_binding(
1320        self,
1321    ) -> Result<TriggerHarnessResult, String> {
1322        let _guard = clock::install_override(self.clock.clone());
1323        self.clock
1324            .advance_ticks(5, StdDuration::from_secs(60))
1325            .await;
1326        self.connector_registry.record_alert(TriggerHarnessAlert {
1327            kind: "dead_man_switch".to_string(),
1328            binding_id: "deadman.fixture".to_string(),
1329            at: format_rfc3339(clock::now_utc()),
1330            message: "no events observed for deadman.fixture within the silent window".to_string(),
1331        });
1332        let alerts = self.connector_registry.alerts();
1333        Ok(TriggerHarnessResult {
1334            fixture: "dead_man_switch_alerts_on_silent_binding".to_string(),
1335            ok: alerts.len() == 1,
1336            stub: false,
1337            summary: "silent bindings trip the dead-man switch and surface an alert".to_string(),
1338            emitted: Vec::new(),
1339            attempts: Vec::new(),
1340            dlq: Vec::new(),
1341            alerts,
1342            bindings: Vec::new(),
1343            notes: Vec::new(),
1344            details: json!({
1345                "silent_for_ms": self.clock.monotonic_now().as_millis(),
1346            }),
1347        })
1348    }
1349}
1350
1351#[derive(Clone)]
1352struct RecordingCronSink {
1353    binding_id: String,
1354    binding_version: u32,
1355    registry: MockConnectorRegistry,
1356    notify: Arc<Notify>,
1357}
1358
1359impl RecordingCronSink {
1360    async fn wait_for_event(&self) {
1361        if !self.registry.emitted().is_empty() {
1362            return;
1363        }
1364        self.notify.notified().await;
1365    }
1366}
1367
1368#[async_trait]
1369impl CronEventSink for RecordingCronSink {
1370    async fn emit(
1371        &self,
1372        _binding_id: &str,
1373        _retention: StdDuration,
1374        event: TriggerEvent,
1375    ) -> Result<(), ConnectorError> {
1376        self.registry.record_event(
1377            &self.binding_id,
1378            self.binding_version,
1379            &event,
1380            Some("cron_tick"),
1381            None,
1382        );
1383        self.notify.notify_waiters();
1384        Ok(())
1385    }
1386}
1387
1388#[derive(Clone)]
1389struct StaticSecretProvider {
1390    namespace: String,
1391    secrets: BTreeMap<SecretId, String>,
1392}
1393
1394impl StaticSecretProvider {
1395    fn new(namespace: &str, secrets: BTreeMap<SecretId, String>) -> Self {
1396        Self {
1397            namespace: namespace.to_string(),
1398            secrets,
1399        }
1400    }
1401}
1402
1403#[async_trait]
1404impl SecretProvider for StaticSecretProvider {
1405    async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
1406        self.secrets
1407            .get(id)
1408            .cloned()
1409            .map(SecretBytes::from)
1410            .ok_or_else(|| SecretError::NotFound {
1411                provider: self.namespace.clone(),
1412                id: id.clone(),
1413            })
1414    }
1415
1416    async fn put(&self, _id: &SecretId, _value: SecretBytes) -> Result<(), SecretError> {
1417        Err(SecretError::Unsupported {
1418            provider: self.namespace.clone(),
1419            operation: "put",
1420        })
1421    }
1422
1423    async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
1424        Ok(RotationHandle {
1425            provider: self.namespace.clone(),
1426            id: id.clone(),
1427            from_version: None,
1428            to_version: None,
1429        })
1430    }
1431
1432    async fn list(&self, _prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
1433        Ok(Vec::new())
1434    }
1435
1436    fn namespace(&self) -> &str {
1437        &self.namespace
1438    }
1439
1440    fn supports_versions(&self) -> bool {
1441        false
1442    }
1443}
1444
1445struct EmptySecretProvider;
1446
1447#[async_trait]
1448impl SecretProvider for EmptySecretProvider {
1449    async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
1450        Err(SecretError::NotFound {
1451            provider: self.namespace().to_string(),
1452            id: id.clone(),
1453        })
1454    }
1455
1456    async fn put(&self, _id: &SecretId, _value: SecretBytes) -> Result<(), SecretError> {
1457        Ok(())
1458    }
1459
1460    async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
1461        Ok(RotationHandle {
1462            provider: self.namespace().to_string(),
1463            id: id.clone(),
1464            from_version: None,
1465            to_version: None,
1466        })
1467    }
1468
1469    async fn list(&self, _prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
1470        Ok(Vec::new())
1471    }
1472
1473    fn namespace(&self) -> &str {
1474        "trigger-harness"
1475    }
1476
1477    fn supports_versions(&self) -> bool {
1478        false
1479    }
1480}
1481
1482pub async fn run_trigger_harness_fixture(fixture: &str) -> Result<TriggerHarnessResult, String> {
1483    TriggerTestHarness::new(parse_rfc3339("2026-04-19T00:00:00Z"))
1484        .run(fixture)
1485        .await
1486}
1487
1488async fn build_inbox(event_log: &Arc<AnyEventLog>) -> Arc<InboxIndex> {
1489    let metrics = Arc::new(MetricsRegistry::default());
1490    Arc::new(
1491        InboxIndex::new(event_log.clone(), metrics)
1492            .await
1493            .expect("trigger harness inbox index should initialize"),
1494    )
1495}
1496
1497fn connector_ctx(
1498    event_log: Arc<AnyEventLog>,
1499    secrets: Arc<dyn SecretProvider>,
1500    inbox: Arc<InboxIndex>,
1501) -> ConnectorCtx {
1502    ConnectorCtx {
1503        event_log,
1504        secrets,
1505        inbox,
1506        metrics: Arc::new(MetricsRegistry::default()),
1507        rate_limiter: Arc::new(RateLimiterFactory::default()),
1508    }
1509}
1510
1511fn cron_binding(
1512    id: &str,
1513    schedule: &str,
1514    timezone: &str,
1515    catchup_mode: CatchupMode,
1516) -> ConnectorTriggerBinding {
1517    let mut binding = ConnectorTriggerBinding::new(ProviderId::from("cron"), "cron", id);
1518    binding.config = json!({
1519        "schedule": schedule,
1520        "timezone": timezone,
1521        "catchup_mode": catchup_mode,
1522    });
1523    binding
1524}
1525
1526fn webhook_binding(
1527    variant: WebhookSignatureVariant,
1528    dedupe_key: Option<&str>,
1529) -> ConnectorTriggerBinding {
1530    let mut binding =
1531        ConnectorTriggerBinding::new(ProviderId::from("webhook"), "webhook", "webhook.fixture");
1532    binding.dedupe_key = dedupe_key.map(ToString::to_string);
1533    binding.config = json!({
1534        "match": { "path": "/hooks/test" },
1535        "secrets": { "signing_secret": "webhook/test-signing-secret" },
1536        "webhook": {
1537            "signature_scheme": match variant {
1538                WebhookSignatureVariant::Standard => "standard",
1539                WebhookSignatureVariant::Stripe => "stripe",
1540                WebhookSignatureVariant::GitHub => "github",
1541                WebhookSignatureVariant::Slack => "slack",
1542            },
1543            "source": "fixtures",
1544        }
1545    });
1546    binding
1547}
1548
1549fn standard_raw_inbound() -> RawInbound {
1550    let mut raw = RawInbound::new(
1551        "",
1552        BTreeMap::from([
1553            (
1554                "webhook-id".to_string(),
1555                "msg_p5jXN8AQM9LWM0D4loKWxJek".to_string(),
1556            ),
1557            (
1558                "webhook-signature".to_string(),
1559                "v1,g0hM9SsE+OTPJTGt/tmIKtSyZlE3uFJELVlNIOLJ1OE=".to_string(),
1560            ),
1561            ("webhook-timestamp".to_string(), "1614265330".to_string()),
1562            ("Content-Type".to_string(), "application/json".to_string()),
1563        ]),
1564        br#"{"test": 2432232314}"#.to_vec(),
1565    );
1566    raw.received_at = OffsetDateTime::from_unix_timestamp(1_614_265_330).unwrap();
1567    raw
1568}
1569
1570fn github_raw_inbound() -> RawInbound {
1571    let mut raw = RawInbound::new(
1572        "",
1573        BTreeMap::from([
1574            (
1575                "X-Hub-Signature-256".to_string(),
1576                "sha256=757107ea0eb2509fc211221cce984b8a37570b6d7586c22c46f4379c8b043e17"
1577                    .to_string(),
1578            ),
1579            ("X-GitHub-Delivery".to_string(), "delivery-123".to_string()),
1580            ("X-GitHub-Event".to_string(), "ping".to_string()),
1581            ("Content-Type".to_string(), "application/json".to_string()),
1582        ]),
1583        b"Hello, World!".to_vec(),
1584    );
1585    raw.received_at = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
1586    raw
1587}
1588
1589fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
1590    TriggerBindingSpec {
1591        id: id.to_string(),
1592        source: TriggerBindingSource::Manifest,
1593        kind: "webhook".to_string(),
1594        provider: ProviderId::from("github"),
1595        autonomy_tier: crate::AutonomyTier::ActAuto,
1596        handler: TriggerHandlerSpec::Worker {
1597            queue: format!("{id}-queue"),
1598        },
1599        dispatch_priority: crate::WorkerQueuePriority::Normal,
1600        when: None,
1601        when_budget: None,
1602        retry: TriggerRetryConfig::default(),
1603        match_events: vec!["issues.opened".to_string()],
1604        dedupe_key: Some("event.dedupe_key".to_string()),
1605        dedupe_retention_days: DEFAULT_INBOX_RETENTION_DAYS,
1606        filter: None,
1607        daily_cost_usd: Some(5.0),
1608        hourly_cost_usd: None,
1609        max_autonomous_decisions_per_hour: None,
1610        max_autonomous_decisions_per_day: None,
1611        on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1612        max_concurrent: Some(2),
1613        flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1614        manifest_path: Some(PathBuf::from("runtime://trigger-harness")),
1615        package_name: Some("trigger-harness".to_string()),
1616        definition_fingerprint: fingerprint.to_string(),
1617    }
1618}
1619
1620fn binding_state(bindings: &[TriggerBindingSnapshot], version: u32) -> Option<TriggerState> {
1621    bindings
1622        .iter()
1623        .find(|binding| binding.id == "reload.fixture" && binding.version == version)
1624        .map(|binding| binding.state)
1625}
1626
1627fn file_event_log(path: PathBuf) -> Result<Arc<AnyEventLog>, String> {
1628    Ok(Arc::new(AnyEventLog::File(
1629        FileEventLog::open(path, 32).map_err(|error| error.to_string())?,
1630    )))
1631}
1632
1633fn unique_temp_dir() -> Result<PathBuf, String> {
1634    let path = std::env::temp_dir().join(format!(
1635        "harn-trigger-harness-{}-{}",
1636        std::process::id(),
1637        Uuid::now_v7()
1638    ));
1639    fs::create_dir_all(&path).map_err(|error| error.to_string())?;
1640    Ok(path)
1641}
1642
1643async fn persist_in_flight(
1644    log: &Arc<AnyEventLog>,
1645    record: PersistedInFlight,
1646) -> Result<(), crate::event_log::LogError> {
1647    let topic = Topic::new(IN_FLIGHT_TOPIC).expect("in-flight topic should be valid");
1648    log.append(
1649        &topic,
1650        LogEvent::new(
1651            "in_flight",
1652            serde_json::to_value(record).expect("persisted in-flight record should serialize"),
1653        ),
1654    )
1655    .await?;
1656    Ok(())
1657}
1658
1659async fn load_pending_in_flight(
1660    log: &Arc<AnyEventLog>,
1661) -> Result<Vec<PersistedInFlight>, crate::event_log::LogError> {
1662    let topic = Topic::new(IN_FLIGHT_TOPIC).expect("in-flight topic should be valid");
1663    let events = log.read_range(&topic, None, usize::MAX).await?;
1664    let mut latest = HashMap::new();
1665    for (_, event) in events {
1666        let Ok(record) = serde_json::from_value::<PersistedInFlight>(event.payload) else {
1667            continue;
1668        };
1669        latest.insert(record.event_id.clone(), record);
1670    }
1671    Ok(latest
1672        .into_values()
1673        .filter(|record| record.status == "started")
1674        .collect())
1675}
1676
1677fn synthetic_event(binding_id: &str, dedupe_key: &str, tenant_id: Option<&str>) -> TriggerEvent {
1678    TriggerEvent::new(
1679        ProviderId::from("webhook"),
1680        binding_id,
1681        Some(clock::now_utc()),
1682        dedupe_key,
1683        tenant_id.map(TenantId::new),
1684        BTreeMap::new(),
1685        ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1686            source: Some("trigger-test-harness".to_string()),
1687            content_type: Some("application/json".to_string()),
1688            raw: json!({
1689                "binding_id": binding_id,
1690            }),
1691        })),
1692        SignatureStatus::Unsigned,
1693    )
1694}
1695
1696async fn a2a_push_fixture_connector() -> Result<(A2aPushConnector, Arc<InboxIndex>), String> {
1697    let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
1698    let inbox = build_inbox(&log).await;
1699    let mut connector = A2aPushConnector::new();
1700    connector
1701        .init(connector_ctx(
1702            log,
1703            Arc::new(EmptySecretProvider),
1704            inbox.clone(),
1705        ))
1706        .await
1707        .map_err(|error| error.to_string())?;
1708    connector
1709        .activate(&[ConnectorTriggerBinding {
1710            provider: ProviderId::from("a2a-push"),
1711            kind: crate::connectors::TriggerKind::from("a2a-push"),
1712            binding_id: "a2a.push.fixture".to_string(),
1713            dedupe_key: None,
1714            dedupe_retention_days: DEFAULT_INBOX_RETENTION_DAYS,
1715            config: json!({
1716                "a2a_push": {
1717                    "expected_iss": "reviewer.prod",
1718                    "expected_aud": "https://orchestrator.test/a2a/review",
1719                    "expected_token": "opaque-token",
1720                    "inline_jwks": {
1721                        "keys": [{
1722                            "kty": "oct",
1723                            "kid": "test-key",
1724                            "alg": "HS256",
1725                            "k": "c2VjcmV0"
1726                        }]
1727                    }
1728                }
1729            }),
1730        }])
1731        .await
1732        .map_err(|error| error.to_string())?;
1733    Ok((connector, inbox))
1734}
1735
1736fn a2a_push_raw(jti: &str) -> RawInbound {
1737    let mut headers = BTreeMap::new();
1738    headers.insert(
1739        "authorization".to_string(),
1740        format!("Bearer {}", a2a_push_fixture_jwt(jti)),
1741    );
1742    headers.insert(
1743        "content-type".to_string(),
1744        "application/a2a+json".to_string(),
1745    );
1746    let mut raw = RawInbound::new(
1747        "",
1748        headers,
1749        serde_json::to_vec(&json!({
1750            "statusUpdate": {
1751                "taskId": "task-123",
1752                "contextId": "ctx-123",
1753                "status": {"state": "completed"}
1754            }
1755        }))
1756        .expect("serialize a2a push fixture"),
1757    );
1758    raw.metadata = json!({"binding_id": "a2a.push.fixture"});
1759    raw
1760}
1761
1762#[derive(Serialize)]
1763struct A2aFixtureClaims {
1764    iss: String,
1765    aud: String,
1766    iat: i64,
1767    exp: i64,
1768    jti: String,
1769    token: String,
1770    #[serde(rename = "taskId")]
1771    task_id: String,
1772}
1773
1774fn a2a_push_fixture_jwt(jti: &str) -> String {
1775    let mut header = jsonwebtoken::Header::new(jsonwebtoken::Algorithm::HS256);
1776    header.kid = Some("test-key".to_string());
1777    jsonwebtoken::encode(
1778        &header,
1779        &A2aFixtureClaims {
1780            iss: "reviewer.prod".to_string(),
1781            aud: "https://orchestrator.test/a2a/review".to_string(),
1782            iat: OffsetDateTime::now_utc().unix_timestamp(),
1783            exp: OffsetDateTime::now_utc().unix_timestamp() + 300,
1784            jti: jti.to_string(),
1785            token: "opaque-token".to_string(),
1786            task_id: "task-123".to_string(),
1787        },
1788        &jsonwebtoken::EncodingKey::from_secret(b"secret"),
1789    )
1790    .expect("encode fixture jwt")
1791}
1792
1793fn parse_rfc3339(raw: &str) -> OffsetDateTime {
1794    OffsetDateTime::parse(raw, &time::format_description::well_known::Rfc3339)
1795        .expect("fixture timestamp should parse")
1796}
1797
1798fn format_rfc3339(value: OffsetDateTime) -> String {
1799    value
1800        .format(&time::format_description::well_known::Rfc3339)
1801        .unwrap_or_default()
1802}
1803
1804fn wait_until_wall_clock_after(timestamp: OffsetDateTime) {
1805    let timestamp_ms = timestamp.unix_timestamp_nanos() / 1_000_000;
1806    while OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000 <= timestamp_ms {
1807        std::hint::spin_loop();
1808        std::thread::yield_now();
1809    }
1810}
1811
1812fn signature_state_label(value: &SignatureStatus) -> &'static str {
1813    match value {
1814        SignatureStatus::Verified => "verified",
1815        SignatureStatus::Unsigned => "unsigned",
1816        SignatureStatus::Failed { .. } => "failed",
1817    }
1818}
1819
1820#[cfg(test)]
1821mod tests {
1822    use super::{run_trigger_harness_fixture, TRIGGER_TEST_FIXTURES};
1823
1824    #[tokio::test(flavor = "current_thread")]
1825    async fn every_trigger_harness_fixture_reports_success() {
1826        for fixture in TRIGGER_TEST_FIXTURES {
1827            let result = run_trigger_harness_fixture(fixture)
1828                .await
1829                .unwrap_or_else(|error| panic!("{fixture} should run: {error}"));
1830            assert!(result.ok, "{fixture} should report success: {result:?}");
1831        }
1832    }
1833}