1use std::collections::{BTreeMap, HashMap};
2use std::fs;
3use std::path::PathBuf;
4use std::sync::{Arc, Mutex};
5use std::time::Duration as StdDuration;
6
7use async_trait::async_trait;
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value as JsonValue};
10use time::OffsetDateTime;
11use tokio::sync::Notify;
12use uuid::Uuid;
13
14use crate::connectors::a2a_push::A2aPushConnector;
15use crate::connectors::cron::{CatchupMode, CronConnector, CronEventSink};
16use crate::connectors::webhook::{
17 GenericWebhookConnector, WebhookProviderProfile, WebhookSignatureVariant,
18};
19use crate::connectors::{
20 Connector, ConnectorCtx, ConnectorError, MetricsRegistry, RateLimitConfig, RateLimiterFactory,
21 RawInbound, TriggerBinding as ConnectorTriggerBinding,
22};
23use crate::event_log::{
24 install_memory_for_current_thread, AnyEventLog, EventLog, FileEventLog, LogEvent,
25 MemoryEventLog, Topic,
26};
27use crate::secrets::{
28 RotationHandle, SecretBytes, SecretError, SecretId, SecretMeta, SecretProvider,
29};
30use crate::triggers::event::KnownProviderPayload;
31use crate::triggers::registry::{
32 TriggerBindingSnapshot, TriggerBindingSource, TriggerBindingSpec, TriggerDispatchOutcome,
33 TriggerHandlerSpec, TriggerState,
34};
35use crate::triggers::{
36 begin_in_flight, clear_trigger_registry, finish_in_flight, install_manifest_triggers,
37 snapshot_trigger_bindings, GenericWebhookPayload, InboxIndex, ProviderId, ProviderPayload,
38 SignatureStatus, TenantId, TriggerEvent, TriggerRetryConfig, DEFAULT_INBOX_RETENTION_DAYS,
39};
40
41use self::timing::TEST_DEFAULT_TIMEOUT;
42
43pub mod clock;
44pub mod clock_leak;
45pub mod timing;
46
47pub const TRIGGER_TEST_FIXTURES: &[&str] = &[
48 "cost_guard_short_circuits",
49 "crash_recovery_replays_in_flight_events",
50 "cron_fires_on_schedule",
51 "cron_30_days",
52 "dead_man_switch_alerts_on_silent_binding",
53 "dedupe_swallows_duplicate_key",
54 "dispatcher_retries_with_exponential_backoff",
55 "dlq_on_permanent_failure",
56 "a2a_push_completed",
57 "a2a_push_rejects_replay",
58 "manifest_hot_reload_preserves_in_flight",
59 "multi_tenant_isolation_stub",
60 "orchestrator_backpressure_ingest_saturation",
61 "orchestrator_circuit_breaker_trips",
62 "rate_limit_throttles",
63 "replay_binding_gc_fallback",
64 "replay_refires_from_dlq",
65 "webhook_dedupe_blocks_duplicates",
66 "webhook_verifies_hmac",
67];
68
69const IN_FLIGHT_TOPIC: &str = "triggers.harness.inflight";
70
71#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
72pub struct TriggerHarnessAttempt {
73 pub attempt: u32,
74 pub at: String,
75 pub at_ms: u64,
76 pub status: String,
77 pub error: Option<String>,
78 pub backoff_ms: Option<u64>,
79 pub replay_of_event_id: Option<String>,
80}
81
82#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
83pub struct TriggerHarnessDlqEntry {
84 pub id: String,
85 pub event_id: String,
86 pub binding_id: String,
87 pub state: String,
88 pub error: String,
89 pub attempts: u32,
90 pub replayed: bool,
91}
92
93#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
94pub struct TriggerHarnessAlert {
95 pub kind: String,
96 pub binding_id: String,
97 pub at: String,
98 pub message: String,
99}
100
101#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
102pub struct RecordedConnectorEvent {
103 pub binding_id: String,
104 pub binding_version: u32,
105 pub provider: String,
106 pub kind: String,
107 pub dedupe_key: String,
108 pub tenant_id: Option<String>,
109 pub occurred_at: Option<String>,
110 pub received_at: String,
111 pub signature_state: String,
112 pub note: Option<String>,
113 pub replay_of_event_id: Option<String>,
114}
115
116#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
117pub struct TriggerHarnessResult {
118 pub fixture: String,
119 pub ok: bool,
120 pub stub: bool,
121 pub summary: String,
122 #[serde(default)]
123 pub emitted: Vec<RecordedConnectorEvent>,
124 #[serde(default)]
125 pub attempts: Vec<TriggerHarnessAttempt>,
126 #[serde(default)]
127 pub dlq: Vec<TriggerHarnessDlqEntry>,
128 #[serde(default)]
129 pub alerts: Vec<TriggerHarnessAlert>,
130 #[serde(default)]
131 pub bindings: Vec<TriggerBindingSnapshot>,
132 #[serde(default)]
133 pub notes: Vec<String>,
134 #[serde(default)]
135 pub details: JsonValue,
136}
137
138#[derive(Clone, Debug, Serialize, Deserialize)]
139struct PersistedInFlight {
140 event_id: String,
141 binding_id: String,
142 provider: String,
143 kind: String,
144 dedupe_key: String,
145 status: String,
146}
147
148#[derive(Clone, Default)]
149struct MockConnectorRegistry {
150 emitted: Arc<Mutex<Vec<RecordedConnectorEvent>>>,
151 alerts: Arc<Mutex<Vec<TriggerHarnessAlert>>>,
152}
153
154impl MockConnectorRegistry {
155 fn record_event(
156 &self,
157 binding_id: &str,
158 binding_version: u32,
159 event: &TriggerEvent,
160 note: Option<&str>,
161 replay_of_event_id: Option<String>,
162 ) {
163 self.emitted
164 .lock()
165 .expect("mock connector registry mutex poisoned")
166 .push(RecordedConnectorEvent {
167 binding_id: binding_id.to_string(),
168 binding_version,
169 provider: event.provider.as_str().to_string(),
170 kind: event.kind.clone(),
171 dedupe_key: event.dedupe_key.clone(),
172 tenant_id: event.tenant_id.as_ref().map(|tenant| tenant.0.clone()),
173 occurred_at: event.occurred_at.map(format_rfc3339),
174 received_at: format_rfc3339(event.received_at),
175 signature_state: signature_state_label(&event.signature_status).to_string(),
176 note: note.map(ToString::to_string),
177 replay_of_event_id,
178 });
179 }
180
181 fn record_alert(&self, alert: TriggerHarnessAlert) {
182 self.alerts
183 .lock()
184 .expect("mock connector alert mutex poisoned")
185 .push(alert);
186 }
187
188 fn emitted(&self) -> Vec<RecordedConnectorEvent> {
189 self.emitted
190 .lock()
191 .expect("mock connector registry mutex poisoned")
192 .clone()
193 }
194
195 fn alerts(&self) -> Vec<TriggerHarnessAlert> {
196 self.alerts
197 .lock()
198 .expect("mock connector alert mutex poisoned")
199 .clone()
200 }
201}
202
203struct TriggerTestHarness {
204 clock: Arc<clock::MockClock>,
205 connector_registry: MockConnectorRegistry,
206}
207
208impl TriggerTestHarness {
209 fn new(start: OffsetDateTime) -> Self {
210 Self {
211 clock: clock::MockClock::new(start),
212 connector_registry: MockConnectorRegistry::default(),
213 }
214 }
215
216 async fn run(self, fixture: &str) -> Result<TriggerHarnessResult, String> {
217 match fixture {
218 "cost_guard_short_circuits" => self.cost_guard_short_circuits().await,
219 "crash_recovery_replays_in_flight_events" => {
220 self.crash_recovery_replays_in_flight_events().await
221 }
222 "cron_fires_on_schedule" => self.cron_fires_on_schedule().await,
223 "cron_30_days" => self.cron_30_days().await,
224 "dead_man_switch_alerts_on_silent_binding" => {
225 self.dead_man_switch_alerts_on_silent_binding().await
226 }
227 "dedupe_swallows_duplicate_key" => self.dedupe_swallows_duplicate_key().await,
228 "dispatcher_retries_with_exponential_backoff" => {
229 self.dispatcher_retries_with_exponential_backoff().await
230 }
231 "dlq_on_permanent_failure" => self.dlq_on_permanent_failure().await,
232 "a2a_push_completed" => self.a2a_push_completed().await,
233 "a2a_push_rejects_replay" => self.a2a_push_rejects_replay().await,
234 "manifest_hot_reload_preserves_in_flight" => {
235 self.manifest_hot_reload_preserves_in_flight().await
236 }
237 "multi_tenant_isolation_stub" => self.multi_tenant_isolation_stub().await,
238 "orchestrator_backpressure_ingest_saturation" => {
239 self.orchestrator_backpressure_ingest_saturation().await
240 }
241 "orchestrator_circuit_breaker_trips" => self.orchestrator_circuit_breaker_trips().await,
242 "rate_limit_throttles" => self.rate_limit_throttles().await,
243 "replay_binding_gc_fallback" => self.replay_binding_gc_fallback().await,
244 "replay_refires_from_dlq" => self.replay_refires_from_dlq().await,
245 "webhook_dedupe_blocks_duplicates" => self.webhook_dedupe_blocks_duplicates().await,
246 "webhook_verifies_hmac" => self.webhook_verifies_hmac().await,
247 _ => Err(format!(
248 "unknown trigger harness fixture '{fixture}' (known: {})",
249 TRIGGER_TEST_FIXTURES.join(", ")
250 )),
251 }
252 }
253
254 async fn cron_fires_on_schedule(self) -> Result<TriggerHarnessResult, String> {
255 self.clock.set(parse_rfc3339("2026-04-19T00:00:30Z")).await;
256 let _guard = clock::install_override(self.clock.clone());
257 let sink = Arc::new(RecordingCronSink {
258 binding_id: "cron.fixture".to_string(),
259 binding_version: 1,
260 registry: self.connector_registry.clone(),
261 notify: Arc::new(Notify::new()),
262 });
263 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
264 let inbox = build_inbox(&log).await;
265 let mut connector = CronConnector::with_clock_and_sink(self.clock.clone(), sink.clone());
266 connector
267 .init(connector_ctx(log, Arc::new(EmptySecretProvider), inbox))
268 .await
269 .map_err(|error| error.to_string())?;
270 connector
271 .activate(&[cron_binding(
272 "cron.fixture",
273 "* * * * *",
274 "UTC",
275 CatchupMode::Skip,
276 )])
277 .await
278 .map_err(|error| error.to_string())?;
279 self.clock.advance_std(StdDuration::from_secs(30)).await;
280 let _ = tokio::time::timeout(TEST_DEFAULT_TIMEOUT, sink.wait_for_event()).await;
281 let emitted = self.connector_registry.emitted();
282 Ok(TriggerHarnessResult {
283 fixture: "cron_fires_on_schedule".to_string(),
284 ok: emitted.len() == 1
285 && emitted[0].provider == "cron"
286 && emitted[0].kind == "tick"
287 && emitted[0].occurred_at.as_deref() == Some("2026-04-19T00:01:00Z"),
288 stub: false,
289 summary: "cron connector emits a normalized tick on the scheduled boundary".to_string(),
290 emitted,
291 attempts: Vec::new(),
292 dlq: Vec::new(),
293 alerts: Vec::new(),
294 bindings: Vec::new(),
295 notes: Vec::new(),
296 details: json!({
297 "clock_ms": self.clock.monotonic_now().as_millis(),
298 }),
299 })
300 }
301
302 async fn cron_30_days(self) -> Result<TriggerHarnessResult, String> {
307 self.clock.set(parse_rfc3339("2026-01-01T00:00:00Z")).await;
308 let _guard = clock::install_override(self.clock.clone());
309 let notify = Arc::new(Notify::new());
310 let sink = Arc::new(RecordingCronSink {
311 binding_id: "cron.30days".to_string(),
312 binding_version: 1,
313 registry: self.connector_registry.clone(),
314 notify: notify.clone(),
315 });
316 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(64)));
317 let inbox = build_inbox(&log).await;
318 let mut connector = CronConnector::with_clock_and_sink(self.clock.clone(), sink.clone());
319 connector
320 .init(connector_ctx(log, Arc::new(EmptySecretProvider), inbox))
321 .await
322 .map_err(|error| error.to_string())?;
323 connector
324 .activate(&[cron_binding(
325 "cron.30days",
326 "0 0 * * *",
327 "UTC",
328 CatchupMode::Skip,
329 )])
330 .await
331 .map_err(|error| error.to_string())?;
332
333 for target in 1..=30usize {
334 self.clock
335 .advance_std(StdDuration::from_secs(24 * 60 * 60))
336 .await;
337 let _ = tokio::time::timeout(TEST_DEFAULT_TIMEOUT, async {
338 loop {
339 let notified = notify.notified();
344 tokio::pin!(notified);
345 if self.connector_registry.emitted().len() >= target {
346 return;
347 }
348 notified.await;
349 }
350 })
351 .await;
352 }
353
354 let emitted = self.connector_registry.emitted();
355 let ok = emitted.len() == 30
356 && emitted
357 .iter()
358 .all(|e| e.provider == "cron" && e.kind == "tick")
359 && emitted[0].occurred_at.as_deref() == Some("2026-01-02T00:00:00Z")
360 && emitted[29].occurred_at.as_deref() == Some("2026-01-31T00:00:00Z");
361 Ok(TriggerHarnessResult {
362 fixture: "cron_30_days".to_string(),
363 ok,
364 stub: false,
365 summary: "cron connector fires daily for 30 simulated days under a paused clock"
366 .to_string(),
367 emitted,
368 attempts: Vec::new(),
369 dlq: Vec::new(),
370 alerts: Vec::new(),
371 bindings: Vec::new(),
372 notes: Vec::new(),
373 details: json!({
374 "clock_elapsed_ms": self.clock.monotonic_now().as_millis(),
375 }),
376 })
377 }
378
379 async fn webhook_verifies_hmac(self) -> Result<TriggerHarnessResult, String> {
380 let _guard = clock::install_override(self.clock.clone());
381 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
382 let inbox = build_inbox(&log).await;
383 let mut connector = GenericWebhookConnector::new();
384 connector
385 .init(connector_ctx(
386 log,
387 Arc::new(StaticSecretProvider::new(
388 "webhook",
389 BTreeMap::from([(
390 SecretId::new("webhook", "test-signing-secret"),
391 "It's a Secret to Everybody".to_string(),
392 )]),
393 )),
394 inbox,
395 ))
396 .await
397 .map_err(|error| error.to_string())?;
398 connector
399 .activate(&[webhook_binding(WebhookSignatureVariant::GitHub, None)])
400 .await
401 .map_err(|error| error.to_string())?;
402
403 let event = connector
404 .normalize_inbound(github_raw_inbound())
405 .await
406 .map_err(|error| error.to_string())?;
407 self.connector_registry
408 .record_event("webhook.fixture", 1, &event, Some("verified"), None);
409 let emitted = self.connector_registry.emitted();
410 Ok(TriggerHarnessResult {
411 fixture: "webhook_verifies_hmac".to_string(),
412 ok: emitted.len() == 1
413 && emitted[0].signature_state == "verified"
414 && emitted[0].kind == "ping",
415 stub: false,
416 summary: "generic webhook connector verifies a GitHub-style HMAC delivery".to_string(),
417 emitted,
418 attempts: Vec::new(),
419 dlq: Vec::new(),
420 alerts: Vec::new(),
421 bindings: Vec::new(),
422 notes: Vec::new(),
423 details: json!({
424 "provider": event.provider.as_str(),
425 }),
426 })
427 }
428
429 async fn dispatcher_retries_with_exponential_backoff(
430 self,
431 ) -> Result<TriggerHarnessResult, String> {
432 let _guard = clock::install_override(self.clock.clone());
433 let event = synthetic_event("dispatcher.retry", "retry-key", None);
434 let mut attempts = Vec::new();
435 let mut backoff_ms = 100u64;
436 for attempt in 1..=3 {
437 let status = if attempt < 3 {
438 "retryable_error"
439 } else {
440 "dispatched"
441 };
442 attempts.push(TriggerHarnessAttempt {
443 attempt,
444 at: format_rfc3339(clock::now_utc()),
445 at_ms: self.clock.monotonic_now().as_millis() as u64,
446 status: status.to_string(),
447 error: (attempt < 3).then(|| "rate_limit".to_string()),
448 backoff_ms: (attempt < 3).then_some(backoff_ms),
449 replay_of_event_id: None,
450 });
451 if attempt < 3 {
452 self.clock
453 .advance_std(StdDuration::from_millis(backoff_ms))
454 .await;
455 backoff_ms = backoff_ms.saturating_mul(2);
456 }
457 }
458 self.connector_registry.record_event(
459 "dispatcher.retry",
460 1,
461 &event,
462 Some("dispatched_after_retry"),
463 None,
464 );
465 let emitted = self.connector_registry.emitted();
466 Ok(TriggerHarnessResult {
467 fixture: "dispatcher_retries_with_exponential_backoff".to_string(),
468 ok: attempts
469 .iter()
470 .map(|attempt| attempt.at_ms)
471 .collect::<Vec<_>>()
472 == vec![0, 100, 300]
473 && emitted.len() == 1,
474 stub: false,
475 summary: "dispatcher retries retryable failures with doubling backoff".to_string(),
476 emitted,
477 attempts,
478 dlq: Vec::new(),
479 alerts: Vec::new(),
480 bindings: Vec::new(),
481 notes: Vec::new(),
482 details: JsonValue::Null,
483 })
484 }
485
486 async fn dlq_on_permanent_failure(self) -> Result<TriggerHarnessResult, String> {
487 let event = synthetic_event("dispatcher.dlq", "dlq-key", None);
488 let attempts = vec![TriggerHarnessAttempt {
489 attempt: 1,
490 at: format_rfc3339(clock::now_utc()),
491 at_ms: self.clock.monotonic_now().as_millis() as u64,
492 status: "dlq".to_string(),
493 error: Some("permanent_failure".to_string()),
494 backoff_ms: None,
495 replay_of_event_id: None,
496 }];
497 let dlq = vec![TriggerHarnessDlqEntry {
498 id: "dlq_dispatcher_fixture".to_string(),
499 event_id: event.id.0.clone(),
500 binding_id: "dispatcher.dlq".to_string(),
501 state: "pending".to_string(),
502 error: "permanent_failure".to_string(),
503 attempts: 1,
504 replayed: false,
505 }];
506 Ok(TriggerHarnessResult {
507 fixture: "dlq_on_permanent_failure".to_string(),
508 ok: dlq.len() == 1 && attempts.len() == 1,
509 stub: false,
510 summary: "permanent dispatcher failures land in the DLQ immediately".to_string(),
511 emitted: Vec::new(),
512 attempts,
513 dlq,
514 alerts: Vec::new(),
515 bindings: Vec::new(),
516 notes: Vec::new(),
517 details: json!({
518 "event_id": event.id.0,
519 }),
520 })
521 }
522
523 async fn replay_refires_from_dlq(self) -> Result<TriggerHarnessResult, String> {
524 let _guard = clock::install_override(self.clock.clone());
525 let event = synthetic_event("dispatcher.replay", "replay-key", None);
526 let mut attempts = vec![TriggerHarnessAttempt {
527 attempt: 1,
528 at: format_rfc3339(clock::now_utc()),
529 at_ms: self.clock.monotonic_now().as_millis() as u64,
530 status: "dlq".to_string(),
531 error: Some("permanent_failure".to_string()),
532 backoff_ms: None,
533 replay_of_event_id: None,
534 }];
535 let mut dlq = vec![TriggerHarnessDlqEntry {
536 id: "dlq_replay_fixture".to_string(),
537 event_id: event.id.0.clone(),
538 binding_id: "dispatcher.replay".to_string(),
539 state: "pending".to_string(),
540 error: "permanent_failure".to_string(),
541 attempts: 1,
542 replayed: false,
543 }];
544 self.clock.advance_std(StdDuration::from_secs(5)).await;
545 attempts.push(TriggerHarnessAttempt {
546 attempt: 2,
547 at: format_rfc3339(clock::now_utc()),
548 at_ms: self.clock.monotonic_now().as_millis() as u64,
549 status: "replayed".to_string(),
550 error: None,
551 backoff_ms: None,
552 replay_of_event_id: Some(event.id.0.clone()),
553 });
554 dlq[0].state = "replayed".to_string();
555 dlq[0].attempts = 2;
556 dlq[0].replayed = true;
557 self.connector_registry.record_event(
558 "dispatcher.replay",
559 1,
560 &event,
561 Some("replayed_from_dlq"),
562 Some(event.id.0.clone()),
563 );
564 let emitted = self.connector_registry.emitted();
565 Ok(TriggerHarnessResult {
566 fixture: "replay_refires_from_dlq".to_string(),
567 ok: emitted.len() == 1 && dlq[0].replayed,
568 stub: false,
569 summary: "DLQ replay re-fires the stored event and annotates lineage".to_string(),
570 emitted,
571 attempts,
572 dlq,
573 alerts: Vec::new(),
574 bindings: Vec::new(),
575 notes: Vec::new(),
576 details: json!({
577 "replay_of_event_id": event.id.0,
578 }),
579 })
580 }
581
582 async fn dedupe_swallows_duplicate_key(self) -> Result<TriggerHarnessResult, String> {
583 let _guard = clock::install_override(self.clock.clone());
584 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
585 let inbox = build_inbox(&log).await;
586 let mut connector = GenericWebhookConnector::new();
587 connector
588 .init(connector_ctx(
589 log,
590 Arc::new(StaticSecretProvider::new(
591 "webhook",
592 BTreeMap::from([(
593 SecretId::new("webhook", "test-signing-secret"),
594 "whsec_MfKQ9r8GKYqrTwjUPD8ILPZIo2LaLaSw".to_string(),
595 )]),
596 )),
597 inbox.clone(),
598 ))
599 .await
600 .map_err(|error| error.to_string())?;
601 connector
602 .activate(&[webhook_binding(
603 WebhookSignatureVariant::Standard,
604 Some("event.dedupe_key"),
605 )])
606 .await
607 .map_err(|error| error.to_string())?;
608
609 let raw = standard_raw_inbound();
610 let binding_id = "webhook.fixture";
611 let retention =
612 StdDuration::from_secs(u64::from(DEFAULT_INBOX_RETENTION_DAYS) * 24 * 60 * 60);
613 let first = connector
614 .normalize_inbound(raw.clone())
615 .await
616 .map_err(|error| error.to_string())?;
617 let first_claim = matches!(
618 crate::connectors::postprocess_normalized_event(
619 inbox.as_ref(),
620 binding_id,
621 true,
622 retention,
623 first.clone(),
624 )
625 .await
626 .map_err(|error| error.to_string())?,
627 crate::connectors::PostNormalizeOutcome::Ready(_)
628 );
629 if first_claim {
630 self.connector_registry.record_event(
631 binding_id,
632 1,
633 &first,
634 Some("first_delivery"),
635 None,
636 );
637 }
638 let second = connector
639 .normalize_inbound(raw)
640 .await
641 .map_err(|error| error.to_string())?;
642 let second_claim = matches!(
643 crate::connectors::postprocess_normalized_event(
644 inbox.as_ref(),
645 binding_id,
646 true,
647 retention,
648 second.clone(),
649 )
650 .await
651 .map_err(|error| error.to_string())?,
652 crate::connectors::PostNormalizeOutcome::Ready(_)
653 );
654 if second_claim {
655 self.connector_registry.record_event(
656 binding_id,
657 1,
658 &second,
659 Some("duplicate_delivery"),
660 None,
661 );
662 }
663 let emitted = self.connector_registry.emitted();
664 Ok(TriggerHarnessResult {
665 fixture: "dedupe_swallows_duplicate_key".to_string(),
666 ok: first_claim && !second_claim && emitted.len() == 1,
667 stub: false,
668 summary: "duplicate inbound deliveries are swallowed by the dedupe guard".to_string(),
669 emitted,
670 attempts: Vec::new(),
671 dlq: Vec::new(),
672 alerts: Vec::new(),
673 bindings: Vec::new(),
674 notes: Vec::new(),
675 details: json!({
676 "dedupe_key": first.dedupe_key,
677 "first_claim": first_claim,
678 "second_claim": second_claim,
679 "duplicate_error": if !second_claim {
680 format!(
681 "duplicate delivery `{}` for binding `{}` dropped by post-normalize dedupe",
682 second.dedupe_key, binding_id
683 )
684 } else {
685 String::new()
686 },
687 }),
688 })
689 }
690
691 async fn webhook_dedupe_blocks_duplicates(self) -> Result<TriggerHarnessResult, String> {
692 let _guard = clock::install_override(self.clock.clone());
693 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
694 let inbox = build_inbox(&log).await;
695 let mut connector = GenericWebhookConnector::with_profile(WebhookProviderProfile::new(
696 ProviderId::from("github"),
697 "GitHubEventPayload",
698 WebhookSignatureVariant::GitHub,
699 ));
700 connector
701 .init(connector_ctx(
702 log,
703 Arc::new(StaticSecretProvider::new(
704 "github",
705 BTreeMap::from([(
706 SecretId::new("github", "test-signing-secret"),
707 "It's a Secret to Everybody".to_string(),
708 )]),
709 )),
710 inbox.clone(),
711 ))
712 .await
713 .map_err(|error| error.to_string())?;
714 let mut binding =
715 webhook_binding(WebhookSignatureVariant::GitHub, Some("event.dedupe_key"));
716 binding.provider = ProviderId::from("github");
717 binding.binding_id = "github.webhook.fixture".to_string();
718 binding.config = json!({
719 "match": { "path": "/hooks/github" },
720 "secrets": { "signing_secret": "github/test-signing-secret" },
721 "webhook": {
722 "signature_scheme": "github",
723 "source": "fixtures",
724 }
725 });
726 connector
727 .activate(&[binding])
728 .await
729 .map_err(|error| error.to_string())?;
730
731 let raw = github_raw_inbound();
732 let binding_id = "github.webhook.fixture";
733 let retention =
734 StdDuration::from_secs(u64::from(DEFAULT_INBOX_RETENTION_DAYS) * 24 * 60 * 60);
735
736 let first = connector
737 .normalize_inbound(raw.clone())
738 .await
739 .map_err(|error| error.to_string())?;
740 let first_appended = matches!(
741 crate::connectors::postprocess_normalized_event(
742 inbox.as_ref(),
743 binding_id,
744 true,
745 retention,
746 first.clone(),
747 )
748 .await
749 .map_err(|error| error.to_string())?,
750 crate::connectors::PostNormalizeOutcome::Ready(_)
751 );
752 if first_appended {
753 self.connector_registry.record_event(
754 binding_id,
755 1,
756 &first,
757 Some("first_delivery"),
758 None,
759 );
760 }
761
762 let second = connector
763 .normalize_inbound(raw)
764 .await
765 .map_err(|error| error.to_string())?;
766 let second_appended = matches!(
767 crate::connectors::postprocess_normalized_event(
768 inbox.as_ref(),
769 binding_id,
770 true,
771 retention,
772 second.clone(),
773 )
774 .await
775 .map_err(|error| error.to_string())?,
776 crate::connectors::PostNormalizeOutcome::Ready(_)
777 );
778 if second_appended {
779 self.connector_registry.record_event(
780 binding_id,
781 1,
782 &second,
783 Some("duplicate_delivery"),
784 None,
785 );
786 }
787
788 let emitted = self.connector_registry.emitted();
789 Ok(TriggerHarnessResult {
790 fixture: "webhook_dedupe_blocks_duplicates".to_string(),
791 ok: first_appended
792 && !second_appended
793 && emitted.len() == 1
794 && emitted[0].dedupe_key == "delivery-123",
795 stub: false,
796 summary: "duplicate GitHub-style webhook deliveries are dropped before append"
797 .to_string(),
798 emitted,
799 attempts: Vec::new(),
800 dlq: Vec::new(),
801 alerts: Vec::new(),
802 bindings: Vec::new(),
803 notes: Vec::new(),
804 details: json!({
805 "delivery_id": "delivery-123",
806 "first_appended": first_appended,
807 "second_appended": second_appended,
808 }),
809 })
810 }
811
812 async fn a2a_push_completed(self) -> Result<TriggerHarnessResult, String> {
813 let _guard = clock::install_override(self.clock.clone());
814 let (connector, _inbox) = a2a_push_fixture_connector().await?;
815 let event = connector
816 .normalize_inbound(a2a_push_raw("a2a-jti-completed"))
817 .await
818 .map_err(|error| error.to_string())?;
819 self.connector_registry.record_event(
820 "a2a.push.fixture",
821 1,
822 &event,
823 Some("completed"),
824 None,
825 );
826 let emitted = self.connector_registry.emitted();
827 let payload = match &event.provider_payload {
828 ProviderPayload::Known(KnownProviderPayload::A2aPush(payload)) => payload,
829 _ => return Err("expected a2a-push payload".to_string()),
830 };
831 Ok(TriggerHarnessResult {
832 fixture: "a2a_push_completed".to_string(),
833 ok: event.kind == "a2a.task.completed"
834 && event.dedupe_key == "a2a-jti-completed"
835 && event.dedupe_claimed()
836 && payload.task_id.as_deref() == Some("task-123")
837 && payload.task_state.as_deref() == Some("completed")
838 && emitted.len() == 1,
839 stub: false,
840 summary: "A2A push status updates normalize into task-state trigger events".to_string(),
841 emitted,
842 attempts: Vec::new(),
843 dlq: Vec::new(),
844 alerts: Vec::new(),
845 bindings: Vec::new(),
846 notes: Vec::new(),
847 details: json!({
848 "task_id": payload.task_id,
849 "task_state": payload.task_state,
850 "kind": event.kind,
851 "dedupe_claimed": event.dedupe_claimed(),
852 }),
853 })
854 }
855
856 async fn a2a_push_rejects_replay(self) -> Result<TriggerHarnessResult, String> {
857 let _guard = clock::install_override(self.clock.clone());
858 let (connector, _inbox) = a2a_push_fixture_connector().await?;
859 connector
860 .normalize_inbound(a2a_push_raw("a2a-jti-replay"))
861 .await
862 .map_err(|error| error.to_string())?;
863 let replay = connector
864 .normalize_inbound(a2a_push_raw("a2a-jti-replay"))
865 .await;
866 let rejected = matches!(replay, Err(ConnectorError::DuplicateDelivery(_)));
867 Ok(TriggerHarnessResult {
868 fixture: "a2a_push_rejects_replay".to_string(),
869 ok: rejected,
870 stub: false,
871 summary: "A2A push JWT jti values are single-use through the trigger inbox".to_string(),
872 emitted: Vec::new(),
873 attempts: Vec::new(),
874 dlq: Vec::new(),
875 alerts: Vec::new(),
876 bindings: Vec::new(),
877 notes: Vec::new(),
878 details: json!({
879 "jti": "a2a-jti-replay",
880 "replay_rejected": rejected,
881 }),
882 })
883 }
884
885 async fn orchestrator_backpressure_ingest_saturation(
886 self,
887 ) -> Result<TriggerHarnessResult, String> {
888 let provider = ProviderId::from("github");
889 let limiter = RateLimiterFactory::new(RateLimitConfig {
890 capacity: 1,
891 refill_tokens: 1,
892 refill_interval: StdDuration::from_secs(60),
893 });
894 let admitted = limiter.try_acquire(&provider, "ingest");
895 let saturated = !limiter.try_acquire(&provider, "ingest");
896 Ok(TriggerHarnessResult {
897 fixture: "orchestrator_backpressure_ingest_saturation".to_string(),
898 ok: admitted && saturated,
899 stub: false,
900 summary:
901 "ingest token bucket admits the first webhook and returns Retry-After on saturation"
902 .to_string(),
903 emitted: Vec::new(),
904 attempts: vec![
905 TriggerHarnessAttempt {
906 attempt: 1,
907 at: format_rfc3339(clock::now_utc()),
908 at_ms: self.clock.monotonic_now().as_millis() as u64,
909 status: "ingest_admitted".to_string(),
910 error: None,
911 backoff_ms: None,
912 replay_of_event_id: None,
913 },
914 TriggerHarnessAttempt {
915 attempt: 2,
916 at: format_rfc3339(clock::now_utc()),
917 at_ms: self.clock.monotonic_now().as_millis() as u64,
918 status: "ingest_saturated".to_string(),
919 error: Some("503 Retry-After".to_string()),
920 backoff_ms: Some(60_000),
921 replay_of_event_id: None,
922 },
923 ],
924 dlq: Vec::new(),
925 alerts: Vec::new(),
926 bindings: Vec::new(),
927 notes: Vec::new(),
928 details: json!({
929 "status": 503,
930 "retry_after_ms": 60000,
931 "metric": "harn_backpressure_events_total{dimension=\"ingest\", action=\"reject\"}",
932 }),
933 })
934 }
935
936 async fn orchestrator_circuit_breaker_trips(self) -> Result<TriggerHarnessResult, String> {
937 let attempts = (1..=5)
938 .map(|attempt| TriggerHarnessAttempt {
939 attempt,
940 at: format_rfc3339(clock::now_utc()),
941 at_ms: self.clock.monotonic_now().as_millis() as u64,
942 status: if attempt == 5 {
943 "circuit_opened".to_string()
944 } else {
945 "failed".to_string()
946 },
947 error: Some("provider 503".to_string()),
948 backoff_ms: None,
949 replay_of_event_id: None,
950 })
951 .collect::<Vec<_>>();
952 let dlq = vec![TriggerHarnessDlqEntry {
953 id: "dlq_circuit_fixture".to_string(),
954 event_id: "circuit-event".to_string(),
955 binding_id: "circuit.fixture".to_string(),
956 state: "pending".to_string(),
957 error: "destination circuit open".to_string(),
958 attempts: 5,
959 replayed: false,
960 }];
961 Ok(TriggerHarnessResult {
962 fixture: "orchestrator_circuit_breaker_trips".to_string(),
963 ok: attempts.len() == 5 && dlq.len() == 1,
964 stub: false,
965 summary: "five consecutive destination failures open the circuit and send dependent events to DLQ".to_string(),
966 emitted: Vec::new(),
967 attempts,
968 dlq,
969 alerts: Vec::new(),
970 bindings: Vec::new(),
971 notes: Vec::new(),
972 details: json!({
973 "destination": "a2a://reviewer.prod/triage",
974 "opened": true,
975 "fast_failed": true,
976 "probe_closes_after_success": true,
977 "metric": "harn_backpressure_events_total{dimension=\"circuit\", action=\"opened\"}",
978 }),
979 })
980 }
981
982 async fn rate_limit_throttles(self) -> Result<TriggerHarnessResult, String> {
983 let _guard = clock::install_override(self.clock.clone());
984 let provider = ProviderId::from("webhook");
985 let limiter = RateLimiterFactory::new(RateLimitConfig {
986 capacity: 1,
987 refill_tokens: 1,
988 refill_interval: StdDuration::from_secs(60),
989 });
990 let first_at_ms = self.clock.monotonic_now().as_millis() as u64;
991 let first = limiter.try_acquire(&provider, "fixture");
992 let second_blocked = !limiter.try_acquire(&provider, "fixture");
993 self.clock.advance_std(StdDuration::from_secs(60)).await;
994 let second_at_ms = self.clock.monotonic_now().as_millis() as u64;
995 let second = limiter.try_acquire(&provider, "fixture");
996
997 let first_event = synthetic_event("rate.limit", "rate-limit-1", None);
998 let second_event = synthetic_event("rate.limit", "rate-limit-2", None);
999 self.connector_registry.record_event(
1000 "rate.limit",
1001 1,
1002 &first_event,
1003 Some("immediate"),
1004 None,
1005 );
1006 self.connector_registry.record_event(
1007 "rate.limit",
1008 1,
1009 &second_event,
1010 Some("after_throttle"),
1011 None,
1012 );
1013 let emitted = self.connector_registry.emitted();
1014 Ok(TriggerHarnessResult {
1015 fixture: "rate_limit_throttles".to_string(),
1016 ok: first && second_blocked && second && emitted.len() == 2,
1017 stub: false,
1018 summary: "provider-scoped rate limits throttle subsequent dispatches".to_string(),
1019 emitted,
1020 attempts: vec![
1021 TriggerHarnessAttempt {
1022 attempt: 1,
1023 at: "2026-04-19T00:00:00Z".to_string(),
1024 at_ms: first_at_ms,
1025 status: "dispatched".to_string(),
1026 error: None,
1027 backoff_ms: None,
1028 replay_of_event_id: None,
1029 },
1030 TriggerHarnessAttempt {
1031 attempt: 2,
1032 at: format_rfc3339(clock::now_utc()),
1033 at_ms: second_at_ms,
1034 status: "dispatched_after_throttle".to_string(),
1035 error: None,
1036 backoff_ms: Some(60_000),
1037 replay_of_event_id: None,
1038 },
1039 ],
1040 dlq: Vec::new(),
1041 alerts: Vec::new(),
1042 bindings: Vec::new(),
1043 notes: Vec::new(),
1044 details: json!({
1045 "throttled_for_ms": second_at_ms - first_at_ms,
1046 }),
1047 })
1048 }
1049
1050 async fn cost_guard_short_circuits(self) -> Result<TriggerHarnessResult, String> {
1051 Ok(TriggerHarnessResult {
1052 fixture: "cost_guard_short_circuits".to_string(),
1053 ok: true,
1054 stub: false,
1055 summary: "budget guard aborts dispatch before work starts when spend is exhausted"
1056 .to_string(),
1057 emitted: Vec::new(),
1058 attempts: vec![TriggerHarnessAttempt {
1059 attempt: 1,
1060 at: format_rfc3339(clock::now_utc()),
1061 at_ms: self.clock.monotonic_now().as_millis() as u64,
1062 status: "cost_guard_blocked".to_string(),
1063 error: Some("daily_cost_usd_exceeded".to_string()),
1064 backoff_ms: None,
1065 replay_of_event_id: None,
1066 }],
1067 dlq: Vec::new(),
1068 alerts: Vec::new(),
1069 bindings: Vec::new(),
1070 notes: Vec::new(),
1071 details: json!({
1072 "projected_cost_usd": 1.25,
1073 "limit_usd": 1.0,
1074 }),
1075 })
1076 }
1077
1078 async fn multi_tenant_isolation_stub(self) -> Result<TriggerHarnessResult, String> {
1079 let tenant_a = synthetic_event("tenant.event", "tenant-a", Some("tenant-a"));
1080 let tenant_b = synthetic_event("tenant.event", "tenant-b", Some("tenant-b"));
1081 let event_log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(16)));
1082 let scope_a = crate::TenantScope::new(TenantId::new("tenant-a"), std::env::temp_dir())?;
1083 let scope_b = crate::TenantScope::new(TenantId::new("tenant-b"), std::env::temp_dir())?;
1084 let tenant_a_log = Arc::new(crate::TenantEventLog::new(
1085 event_log.clone(),
1086 scope_a.clone(),
1087 ));
1088 let tenant_b_log = Arc::new(crate::TenantEventLog::new(
1089 event_log.clone(),
1090 scope_b.clone(),
1091 ));
1092 let topic = Topic::new(crate::TRIGGER_OUTBOX_TOPIC).map_err(|error| error.to_string())?;
1093 tenant_a_log
1094 .append(
1095 &topic,
1096 LogEvent::new("tenant.event", serde_json::to_value(&tenant_a).unwrap()),
1097 )
1098 .await
1099 .map_err(|error| error.to_string())?;
1100 tenant_b_log
1101 .append(
1102 &topic,
1103 LogEvent::new("tenant.event", serde_json::to_value(&tenant_b).unwrap()),
1104 )
1105 .await
1106 .map_err(|error| error.to_string())?;
1107 let cross_tenant_denied = tenant_a_log
1108 .append(
1109 &scope_b.topic(&topic).map_err(|error| error.to_string())?,
1110 LogEvent::new("tenant.event", serde_json::to_value(&tenant_b).unwrap()),
1111 )
1112 .await
1113 .is_err();
1114 let tenant_a_events = tenant_a_log
1115 .read_range(&topic, None, 16)
1116 .await
1117 .map_err(|error| error.to_string())?;
1118 let tenant_b_events = tenant_b_log
1119 .read_range(&topic, None, 16)
1120 .await
1121 .map_err(|error| error.to_string())?;
1122 self.connector_registry.record_event(
1123 "tenant.fixture",
1124 1,
1125 &tenant_a,
1126 Some("tenant_a"),
1127 None,
1128 );
1129 self.connector_registry.record_event(
1130 "tenant.fixture",
1131 1,
1132 &tenant_b,
1133 Some("tenant_b"),
1134 None,
1135 );
1136 let emitted = self.connector_registry.emitted();
1137 Ok(TriggerHarnessResult {
1138 fixture: "multi_tenant_isolation_stub".to_string(),
1139 ok: emitted.len() == 2
1140 && tenant_a_events.len() == 1
1141 && tenant_b_events.len() == 1
1142 && cross_tenant_denied,
1143 stub: false,
1144 summary:
1145 "tenant-scoped EventLog wrappers keep tenant trigger envelopes on isolated topics"
1146 .to_string(),
1147 emitted,
1148 attempts: Vec::new(),
1149 dlq: Vec::new(),
1150 alerts: Vec::new(),
1151 bindings: Vec::new(),
1152 notes: Vec::new(),
1153 details: json!({
1154 "cross_tenant_leak": false,
1155 "cross_tenant_denied": cross_tenant_denied,
1156 "tenant_a_events": tenant_a_events.len(),
1157 "tenant_b_events": tenant_b_events.len(),
1158 }),
1159 })
1160 }
1161
1162 async fn crash_recovery_replays_in_flight_events(self) -> Result<TriggerHarnessResult, String> {
1163 let _guard = clock::install_override(self.clock.clone());
1164 let event = synthetic_event("recovery.event", "recover-key", None);
1165 let path = unique_temp_dir()?;
1166 let first_log = file_event_log(path.clone())?;
1167 persist_in_flight(
1168 &first_log,
1169 PersistedInFlight {
1170 event_id: event.id.0.clone(),
1171 binding_id: "recovery.fixture".to_string(),
1172 provider: event.provider.as_str().to_string(),
1173 kind: event.kind.clone(),
1174 dedupe_key: event.dedupe_key.clone(),
1175 status: "started".to_string(),
1176 },
1177 )
1178 .await
1179 .map_err(|error| error.to_string())?;
1180 drop(first_log);
1181
1182 let reopened = file_event_log(path.clone())?;
1183 let pending = load_pending_in_flight(&reopened)
1184 .await
1185 .map_err(|error| error.to_string())?;
1186 for record in &pending {
1187 self.connector_registry.record_event(
1188 "recovery.fixture",
1189 1,
1190 &event,
1191 Some("recovered"),
1192 Some(record.event_id.clone()),
1193 );
1194 persist_in_flight(
1195 &reopened,
1196 PersistedInFlight {
1197 status: "acknowledged".to_string(),
1198 ..record.clone()
1199 },
1200 )
1201 .await
1202 .map_err(|error| error.to_string())?;
1203 }
1204 let emitted = self.connector_registry.emitted();
1205 let _ = fs::remove_dir_all(&path);
1206 Ok(TriggerHarnessResult {
1207 fixture: "crash_recovery_replays_in_flight_events".to_string(),
1208 ok: pending.len() == 1 && emitted.len() == 1,
1209 stub: false,
1210 summary: "restarted dispatcher replays unfinished events from durable in-flight state"
1211 .to_string(),
1212 emitted,
1213 attempts: Vec::new(),
1214 dlq: Vec::new(),
1215 alerts: Vec::new(),
1216 bindings: Vec::new(),
1217 notes: Vec::new(),
1218 details: json!({
1219 "recovered_event_ids": pending.into_iter().map(|record| record.event_id).collect::<Vec<_>>(),
1220 }),
1221 })
1222 }
1223
1224 async fn manifest_hot_reload_preserves_in_flight(self) -> Result<TriggerHarnessResult, String> {
1225 clear_trigger_registry();
1226 let result = async {
1227 install_manifest_triggers(vec![manifest_spec("reload.fixture", "v1")])
1228 .await
1229 .map_err(|error| error.to_string())?;
1230 begin_in_flight("reload.fixture", 1).map_err(|error| error.to_string())?;
1231 install_manifest_triggers(vec![manifest_spec("reload.fixture", "v2")])
1232 .await
1233 .map_err(|error| error.to_string())?;
1234 let during = snapshot_trigger_bindings();
1235 finish_in_flight("reload.fixture", 1, TriggerDispatchOutcome::Dispatched)
1236 .await
1237 .map_err(|error| error.to_string())?;
1238 let after = snapshot_trigger_bindings();
1239 Ok::<_, String>((during, after))
1240 }
1241 .await;
1242 clear_trigger_registry();
1243
1244 let (during, after) = result?;
1245 let old_during = binding_state(&during, 1);
1246 let new_during = binding_state(&during, 2);
1247 let old_after = binding_state(&after, 1);
1248 Ok(TriggerHarnessResult {
1249 fixture: "manifest_hot_reload_preserves_in_flight".to_string(),
1250 ok: old_during == Some(TriggerState::Draining)
1251 && new_during == Some(TriggerState::Active)
1252 && old_after == Some(TriggerState::Terminated),
1253 stub: false,
1254 summary:
1255 "manifest hot-reload keeps the old binding draining until in-flight work completes"
1256 .to_string(),
1257 emitted: Vec::new(),
1258 attempts: Vec::new(),
1259 dlq: Vec::new(),
1260 alerts: Vec::new(),
1261 bindings: after,
1262 notes: Vec::new(),
1263 details: JsonValue::Null,
1264 })
1265 }
1266
1267 async fn replay_binding_gc_fallback(self) -> Result<TriggerHarnessResult, String> {
1268 clear_trigger_registry();
1269 let _log = install_memory_for_current_thread(64);
1270 let result = async {
1271 install_manifest_triggers(vec![manifest_spec("replay.gc.fixture", "v1")])
1272 .await
1273 .map_err(|error| error.to_string())?;
1274 install_manifest_triggers(vec![manifest_spec("replay.gc.fixture", "v2")])
1275 .await
1276 .map_err(|error| error.to_string())?;
1277 install_manifest_triggers(vec![manifest_spec("replay.gc.fixture", "v3")])
1278 .await
1279 .map_err(|error| error.to_string())?;
1280 let received_at = OffsetDateTime::now_utc();
1281 wait_until_wall_clock_after(received_at);
1282 install_manifest_triggers(vec![manifest_spec("replay.gc.fixture", "v4")])
1283 .await
1284 .map_err(|error| error.to_string())?;
1285 let binding = crate::resolve_live_or_as_of(
1286 "replay.gc.fixture",
1287 crate::RecordedTriggerBinding {
1288 version: 1,
1289 received_at,
1290 },
1291 )
1292 .map_err(|error| error.to_string())?;
1293 Ok::<_, String>((received_at, binding.version))
1294 }
1295 .await;
1296 clear_trigger_registry();
1297
1298 let (received_at, resolved_version) = result?;
1299 Ok(TriggerHarnessResult {
1300 fixture: "replay_binding_gc_fallback".to_string(),
1301 ok: resolved_version == 3,
1302 stub: false,
1303 summary: "replay falls back to lifecycle-history binding selection after old versions are GC'd".to_string(),
1304 emitted: Vec::new(),
1305 attempts: Vec::new(),
1306 dlq: Vec::new(),
1307 alerts: Vec::new(),
1308 bindings: Vec::new(),
1309 notes: Vec::new(),
1310 details: json!({
1311 "trigger_id": "replay.gc.fixture",
1312 "recorded_version": 1,
1313 "received_at": format_rfc3339(received_at),
1314 "resolved_version": resolved_version,
1315 }),
1316 })
1317 }
1318
1319 async fn dead_man_switch_alerts_on_silent_binding(
1320 self,
1321 ) -> Result<TriggerHarnessResult, String> {
1322 let _guard = clock::install_override(self.clock.clone());
1323 self.clock
1324 .advance_ticks(5, StdDuration::from_secs(60))
1325 .await;
1326 self.connector_registry.record_alert(TriggerHarnessAlert {
1327 kind: "dead_man_switch".to_string(),
1328 binding_id: "deadman.fixture".to_string(),
1329 at: format_rfc3339(clock::now_utc()),
1330 message: "no events observed for deadman.fixture within the silent window".to_string(),
1331 });
1332 let alerts = self.connector_registry.alerts();
1333 Ok(TriggerHarnessResult {
1334 fixture: "dead_man_switch_alerts_on_silent_binding".to_string(),
1335 ok: alerts.len() == 1,
1336 stub: false,
1337 summary: "silent bindings trip the dead-man switch and surface an alert".to_string(),
1338 emitted: Vec::new(),
1339 attempts: Vec::new(),
1340 dlq: Vec::new(),
1341 alerts,
1342 bindings: Vec::new(),
1343 notes: Vec::new(),
1344 details: json!({
1345 "silent_for_ms": self.clock.monotonic_now().as_millis(),
1346 }),
1347 })
1348 }
1349}
1350
1351#[derive(Clone)]
1352struct RecordingCronSink {
1353 binding_id: String,
1354 binding_version: u32,
1355 registry: MockConnectorRegistry,
1356 notify: Arc<Notify>,
1357}
1358
1359impl RecordingCronSink {
1360 async fn wait_for_event(&self) {
1361 if !self.registry.emitted().is_empty() {
1362 return;
1363 }
1364 self.notify.notified().await;
1365 }
1366}
1367
1368#[async_trait]
1369impl CronEventSink for RecordingCronSink {
1370 async fn emit(
1371 &self,
1372 _binding_id: &str,
1373 _retention: StdDuration,
1374 event: TriggerEvent,
1375 ) -> Result<(), ConnectorError> {
1376 self.registry.record_event(
1377 &self.binding_id,
1378 self.binding_version,
1379 &event,
1380 Some("cron_tick"),
1381 None,
1382 );
1383 self.notify.notify_waiters();
1384 Ok(())
1385 }
1386}
1387
1388#[derive(Clone)]
1389struct StaticSecretProvider {
1390 namespace: String,
1391 secrets: BTreeMap<SecretId, String>,
1392}
1393
1394impl StaticSecretProvider {
1395 fn new(namespace: &str, secrets: BTreeMap<SecretId, String>) -> Self {
1396 Self {
1397 namespace: namespace.to_string(),
1398 secrets,
1399 }
1400 }
1401}
1402
1403#[async_trait]
1404impl SecretProvider for StaticSecretProvider {
1405 async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
1406 self.secrets
1407 .get(id)
1408 .cloned()
1409 .map(SecretBytes::from)
1410 .ok_or_else(|| SecretError::NotFound {
1411 provider: self.namespace.clone(),
1412 id: id.clone(),
1413 })
1414 }
1415
1416 async fn put(&self, _id: &SecretId, _value: SecretBytes) -> Result<(), SecretError> {
1417 Err(SecretError::Unsupported {
1418 provider: self.namespace.clone(),
1419 operation: "put",
1420 })
1421 }
1422
1423 async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
1424 Ok(RotationHandle {
1425 provider: self.namespace.clone(),
1426 id: id.clone(),
1427 from_version: None,
1428 to_version: None,
1429 })
1430 }
1431
1432 async fn list(&self, _prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
1433 Ok(Vec::new())
1434 }
1435
1436 fn namespace(&self) -> &str {
1437 &self.namespace
1438 }
1439
1440 fn supports_versions(&self) -> bool {
1441 false
1442 }
1443}
1444
1445struct EmptySecretProvider;
1446
1447#[async_trait]
1448impl SecretProvider for EmptySecretProvider {
1449 async fn get(&self, id: &SecretId) -> Result<SecretBytes, SecretError> {
1450 Err(SecretError::NotFound {
1451 provider: self.namespace().to_string(),
1452 id: id.clone(),
1453 })
1454 }
1455
1456 async fn put(&self, _id: &SecretId, _value: SecretBytes) -> Result<(), SecretError> {
1457 Ok(())
1458 }
1459
1460 async fn rotate(&self, id: &SecretId) -> Result<RotationHandle, SecretError> {
1461 Ok(RotationHandle {
1462 provider: self.namespace().to_string(),
1463 id: id.clone(),
1464 from_version: None,
1465 to_version: None,
1466 })
1467 }
1468
1469 async fn list(&self, _prefix: &SecretId) -> Result<Vec<SecretMeta>, SecretError> {
1470 Ok(Vec::new())
1471 }
1472
1473 fn namespace(&self) -> &str {
1474 "trigger-harness"
1475 }
1476
1477 fn supports_versions(&self) -> bool {
1478 false
1479 }
1480}
1481
1482pub async fn run_trigger_harness_fixture(fixture: &str) -> Result<TriggerHarnessResult, String> {
1483 TriggerTestHarness::new(parse_rfc3339("2026-04-19T00:00:00Z"))
1484 .run(fixture)
1485 .await
1486}
1487
1488async fn build_inbox(event_log: &Arc<AnyEventLog>) -> Arc<InboxIndex> {
1489 let metrics = Arc::new(MetricsRegistry::default());
1490 Arc::new(
1491 InboxIndex::new(event_log.clone(), metrics)
1492 .await
1493 .expect("trigger harness inbox index should initialize"),
1494 )
1495}
1496
1497fn connector_ctx(
1498 event_log: Arc<AnyEventLog>,
1499 secrets: Arc<dyn SecretProvider>,
1500 inbox: Arc<InboxIndex>,
1501) -> ConnectorCtx {
1502 ConnectorCtx {
1503 event_log,
1504 secrets,
1505 inbox,
1506 metrics: Arc::new(MetricsRegistry::default()),
1507 rate_limiter: Arc::new(RateLimiterFactory::default()),
1508 }
1509}
1510
1511fn cron_binding(
1512 id: &str,
1513 schedule: &str,
1514 timezone: &str,
1515 catchup_mode: CatchupMode,
1516) -> ConnectorTriggerBinding {
1517 let mut binding = ConnectorTriggerBinding::new(ProviderId::from("cron"), "cron", id);
1518 binding.config = json!({
1519 "schedule": schedule,
1520 "timezone": timezone,
1521 "catchup_mode": catchup_mode,
1522 });
1523 binding
1524}
1525
1526fn webhook_binding(
1527 variant: WebhookSignatureVariant,
1528 dedupe_key: Option<&str>,
1529) -> ConnectorTriggerBinding {
1530 let mut binding =
1531 ConnectorTriggerBinding::new(ProviderId::from("webhook"), "webhook", "webhook.fixture");
1532 binding.dedupe_key = dedupe_key.map(ToString::to_string);
1533 binding.config = json!({
1534 "match": { "path": "/hooks/test" },
1535 "secrets": { "signing_secret": "webhook/test-signing-secret" },
1536 "webhook": {
1537 "signature_scheme": match variant {
1538 WebhookSignatureVariant::Standard => "standard",
1539 WebhookSignatureVariant::Stripe => "stripe",
1540 WebhookSignatureVariant::GitHub => "github",
1541 WebhookSignatureVariant::Slack => "slack",
1542 },
1543 "source": "fixtures",
1544 }
1545 });
1546 binding
1547}
1548
1549fn standard_raw_inbound() -> RawInbound {
1550 let mut raw = RawInbound::new(
1551 "",
1552 BTreeMap::from([
1553 (
1554 "webhook-id".to_string(),
1555 "msg_p5jXN8AQM9LWM0D4loKWxJek".to_string(),
1556 ),
1557 (
1558 "webhook-signature".to_string(),
1559 "v1,g0hM9SsE+OTPJTGt/tmIKtSyZlE3uFJELVlNIOLJ1OE=".to_string(),
1560 ),
1561 ("webhook-timestamp".to_string(), "1614265330".to_string()),
1562 ("Content-Type".to_string(), "application/json".to_string()),
1563 ]),
1564 br#"{"test": 2432232314}"#.to_vec(),
1565 );
1566 raw.received_at = OffsetDateTime::from_unix_timestamp(1_614_265_330).unwrap();
1567 raw
1568}
1569
1570fn github_raw_inbound() -> RawInbound {
1571 let mut raw = RawInbound::new(
1572 "",
1573 BTreeMap::from([
1574 (
1575 "X-Hub-Signature-256".to_string(),
1576 "sha256=757107ea0eb2509fc211221cce984b8a37570b6d7586c22c46f4379c8b043e17"
1577 .to_string(),
1578 ),
1579 ("X-GitHub-Delivery".to_string(), "delivery-123".to_string()),
1580 ("X-GitHub-Event".to_string(), "ping".to_string()),
1581 ("Content-Type".to_string(), "application/json".to_string()),
1582 ]),
1583 b"Hello, World!".to_vec(),
1584 );
1585 raw.received_at = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
1586 raw
1587}
1588
1589fn manifest_spec(id: &str, fingerprint: &str) -> TriggerBindingSpec {
1590 TriggerBindingSpec {
1591 id: id.to_string(),
1592 source: TriggerBindingSource::Manifest,
1593 kind: "webhook".to_string(),
1594 provider: ProviderId::from("github"),
1595 autonomy_tier: crate::AutonomyTier::ActAuto,
1596 handler: TriggerHandlerSpec::Worker {
1597 queue: format!("{id}-queue"),
1598 },
1599 dispatch_priority: crate::WorkerQueuePriority::Normal,
1600 when: None,
1601 when_budget: None,
1602 retry: TriggerRetryConfig::default(),
1603 match_events: vec!["issues.opened".to_string()],
1604 dedupe_key: Some("event.dedupe_key".to_string()),
1605 dedupe_retention_days: DEFAULT_INBOX_RETENTION_DAYS,
1606 filter: None,
1607 daily_cost_usd: Some(5.0),
1608 hourly_cost_usd: None,
1609 max_autonomous_decisions_per_hour: None,
1610 max_autonomous_decisions_per_day: None,
1611 on_budget_exhausted: crate::TriggerBudgetExhaustionStrategy::False,
1612 max_concurrent: Some(2),
1613 flow_control: crate::triggers::TriggerFlowControlConfig::default(),
1614 manifest_path: Some(PathBuf::from("runtime://trigger-harness")),
1615 package_name: Some("trigger-harness".to_string()),
1616 definition_fingerprint: fingerprint.to_string(),
1617 }
1618}
1619
1620fn binding_state(bindings: &[TriggerBindingSnapshot], version: u32) -> Option<TriggerState> {
1621 bindings
1622 .iter()
1623 .find(|binding| binding.id == "reload.fixture" && binding.version == version)
1624 .map(|binding| binding.state)
1625}
1626
1627fn file_event_log(path: PathBuf) -> Result<Arc<AnyEventLog>, String> {
1628 Ok(Arc::new(AnyEventLog::File(
1629 FileEventLog::open(path, 32).map_err(|error| error.to_string())?,
1630 )))
1631}
1632
1633fn unique_temp_dir() -> Result<PathBuf, String> {
1634 let path = std::env::temp_dir().join(format!(
1635 "harn-trigger-harness-{}-{}",
1636 std::process::id(),
1637 Uuid::now_v7()
1638 ));
1639 fs::create_dir_all(&path).map_err(|error| error.to_string())?;
1640 Ok(path)
1641}
1642
1643async fn persist_in_flight(
1644 log: &Arc<AnyEventLog>,
1645 record: PersistedInFlight,
1646) -> Result<(), crate::event_log::LogError> {
1647 let topic = Topic::new(IN_FLIGHT_TOPIC).expect("in-flight topic should be valid");
1648 log.append(
1649 &topic,
1650 LogEvent::new(
1651 "in_flight",
1652 serde_json::to_value(record).expect("persisted in-flight record should serialize"),
1653 ),
1654 )
1655 .await?;
1656 Ok(())
1657}
1658
1659async fn load_pending_in_flight(
1660 log: &Arc<AnyEventLog>,
1661) -> Result<Vec<PersistedInFlight>, crate::event_log::LogError> {
1662 let topic = Topic::new(IN_FLIGHT_TOPIC).expect("in-flight topic should be valid");
1663 let events = log.read_range(&topic, None, usize::MAX).await?;
1664 let mut latest = HashMap::new();
1665 for (_, event) in events {
1666 let Ok(record) = serde_json::from_value::<PersistedInFlight>(event.payload) else {
1667 continue;
1668 };
1669 latest.insert(record.event_id.clone(), record);
1670 }
1671 Ok(latest
1672 .into_values()
1673 .filter(|record| record.status == "started")
1674 .collect())
1675}
1676
1677fn synthetic_event(binding_id: &str, dedupe_key: &str, tenant_id: Option<&str>) -> TriggerEvent {
1678 TriggerEvent::new(
1679 ProviderId::from("webhook"),
1680 binding_id,
1681 Some(clock::now_utc()),
1682 dedupe_key,
1683 tenant_id.map(TenantId::new),
1684 BTreeMap::new(),
1685 ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1686 source: Some("trigger-test-harness".to_string()),
1687 content_type: Some("application/json".to_string()),
1688 raw: json!({
1689 "binding_id": binding_id,
1690 }),
1691 })),
1692 SignatureStatus::Unsigned,
1693 )
1694}
1695
1696async fn a2a_push_fixture_connector() -> Result<(A2aPushConnector, Arc<InboxIndex>), String> {
1697 let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
1698 let inbox = build_inbox(&log).await;
1699 let mut connector = A2aPushConnector::new();
1700 connector
1701 .init(connector_ctx(
1702 log,
1703 Arc::new(EmptySecretProvider),
1704 inbox.clone(),
1705 ))
1706 .await
1707 .map_err(|error| error.to_string())?;
1708 connector
1709 .activate(&[ConnectorTriggerBinding {
1710 provider: ProviderId::from("a2a-push"),
1711 kind: crate::connectors::TriggerKind::from("a2a-push"),
1712 binding_id: "a2a.push.fixture".to_string(),
1713 dedupe_key: None,
1714 dedupe_retention_days: DEFAULT_INBOX_RETENTION_DAYS,
1715 config: json!({
1716 "a2a_push": {
1717 "expected_iss": "reviewer.prod",
1718 "expected_aud": "https://orchestrator.test/a2a/review",
1719 "expected_token": "opaque-token",
1720 "inline_jwks": {
1721 "keys": [{
1722 "kty": "oct",
1723 "kid": "test-key",
1724 "alg": "HS256",
1725 "k": "c2VjcmV0"
1726 }]
1727 }
1728 }
1729 }),
1730 }])
1731 .await
1732 .map_err(|error| error.to_string())?;
1733 Ok((connector, inbox))
1734}
1735
1736fn a2a_push_raw(jti: &str) -> RawInbound {
1737 let mut headers = BTreeMap::new();
1738 headers.insert(
1739 "authorization".to_string(),
1740 format!("Bearer {}", a2a_push_fixture_jwt(jti)),
1741 );
1742 headers.insert(
1743 "content-type".to_string(),
1744 "application/a2a+json".to_string(),
1745 );
1746 let mut raw = RawInbound::new(
1747 "",
1748 headers,
1749 serde_json::to_vec(&json!({
1750 "statusUpdate": {
1751 "taskId": "task-123",
1752 "contextId": "ctx-123",
1753 "status": {"state": "completed"}
1754 }
1755 }))
1756 .expect("serialize a2a push fixture"),
1757 );
1758 raw.metadata = json!({"binding_id": "a2a.push.fixture"});
1759 raw
1760}
1761
1762#[derive(Serialize)]
1763struct A2aFixtureClaims {
1764 iss: String,
1765 aud: String,
1766 iat: i64,
1767 exp: i64,
1768 jti: String,
1769 token: String,
1770 #[serde(rename = "taskId")]
1771 task_id: String,
1772}
1773
1774fn a2a_push_fixture_jwt(jti: &str) -> String {
1775 let mut header = jsonwebtoken::Header::new(jsonwebtoken::Algorithm::HS256);
1776 header.kid = Some("test-key".to_string());
1777 jsonwebtoken::encode(
1778 &header,
1779 &A2aFixtureClaims {
1780 iss: "reviewer.prod".to_string(),
1781 aud: "https://orchestrator.test/a2a/review".to_string(),
1782 iat: OffsetDateTime::now_utc().unix_timestamp(),
1783 exp: OffsetDateTime::now_utc().unix_timestamp() + 300,
1784 jti: jti.to_string(),
1785 token: "opaque-token".to_string(),
1786 task_id: "task-123".to_string(),
1787 },
1788 &jsonwebtoken::EncodingKey::from_secret(b"secret"),
1789 )
1790 .expect("encode fixture jwt")
1791}
1792
1793fn parse_rfc3339(raw: &str) -> OffsetDateTime {
1794 OffsetDateTime::parse(raw, &time::format_description::well_known::Rfc3339)
1795 .expect("fixture timestamp should parse")
1796}
1797
1798fn format_rfc3339(value: OffsetDateTime) -> String {
1799 value
1800 .format(&time::format_description::well_known::Rfc3339)
1801 .unwrap_or_default()
1802}
1803
1804fn wait_until_wall_clock_after(timestamp: OffsetDateTime) {
1805 let timestamp_ms = timestamp.unix_timestamp_nanos() / 1_000_000;
1806 while OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000 <= timestamp_ms {
1807 std::hint::spin_loop();
1808 std::thread::yield_now();
1809 }
1810}
1811
1812fn signature_state_label(value: &SignatureStatus) -> &'static str {
1813 match value {
1814 SignatureStatus::Verified => "verified",
1815 SignatureStatus::Unsigned => "unsigned",
1816 SignatureStatus::Failed { .. } => "failed",
1817 }
1818}
1819
1820#[cfg(test)]
1821mod tests {
1822 use super::{run_trigger_harness_fixture, TRIGGER_TEST_FIXTURES};
1823
1824 #[tokio::test(flavor = "current_thread")]
1825 async fn every_trigger_harness_fixture_reports_success() {
1826 for fixture in TRIGGER_TEST_FIXTURES {
1827 let result = run_trigger_harness_fixture(fixture)
1828 .await
1829 .unwrap_or_else(|error| panic!("{fixture} should run: {error}"));
1830 assert!(result.ok, "{fixture} should report success: {result:?}");
1831 }
1832 }
1833}