1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Serialize};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
/// Replacement text stored by `redact_headers` for every header value the
/// redaction policy decides to hide.
const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
14#[serde(transparent)]
15pub struct TriggerEventId(pub String);
16
17impl TriggerEventId {
18 pub fn new() -> Self {
19 Self(format!("trigger_evt_{}", Uuid::now_v7()))
20 }
21}
22
23impl Default for TriggerEventId {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
30#[serde(transparent)]
31pub struct ProviderId(pub String);
32
33impl ProviderId {
34 pub fn new(value: impl Into<String>) -> Self {
35 Self(value.into())
36 }
37
38 pub fn as_str(&self) -> &str {
39 self.0.as_str()
40 }
41}
42
43impl From<&str> for ProviderId {
44 fn from(value: &str) -> Self {
45 Self::new(value)
46 }
47}
48
49impl From<String> for ProviderId {
50 fn from(value: String) -> Self {
51 Self::new(value)
52 }
53}
54
55#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
56#[serde(transparent)]
57pub struct TraceId(pub String);
58
59impl TraceId {
60 pub fn new() -> Self {
61 Self(format!("trace_{}", Uuid::now_v7()))
62 }
63}
64
65impl Default for TraceId {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
72#[serde(transparent)]
73pub struct TenantId(pub String);
74
75impl TenantId {
76 pub fn new(value: impl Into<String>) -> Self {
77 Self(value.into())
78 }
79}
80
/// Signature state recorded on a [`TriggerEvent`].
///
/// Serialized as an internally tagged object: the `state` field holds the
/// snake_case variant name (`verified`, `unsigned`, `failed`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum SignatureStatus {
    /// Serialized as `{"state": "verified"}`.
    Verified,
    /// Serialized as `{"state": "unsigned"}`.
    Unsigned,
    /// Serialized as `{"state": "failed", "reason": ...}`; `reason` is
    /// free-form text supplied by whoever constructs the value.
    Failed { reason: String },
}
88
/// Fields shared by every normalized GitHub payload (populated by
/// `github_payload`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubEventCommon {
    /// The event kind string passed to the normalizer.
    pub event: String,
    /// Top-level `action` member of the raw body, when it is a string.
    pub action: Option<String>,
    /// Value of the `X-GitHub-Delivery` request header, when present.
    pub delivery_id: Option<String>,
    /// `installation.id` of the raw body, when it is an integer.
    pub installation_id: Option<i64>,
    /// Copy of the complete raw JSON body.
    pub raw: JsonValue,
}
97
/// Payload built by `github_payload` for the `issues` kind.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubIssuesEventPayload {
    /// Shared GitHub fields, flattened into this object when (de)serialized.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    /// Raw `issue` member of the body; JSON null when absent.
    pub issue: JsonValue,
}
104
/// Payload built by `github_payload` for the `pull_request` kind.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPullRequestEventPayload {
    /// Shared GitHub fields, flattened into this object when (de)serialized.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    /// Raw `pull_request` member of the body; JSON null when absent.
    pub pull_request: JsonValue,
}
111
/// Payload built by `github_payload` for the `issue_comment` kind.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubIssueCommentEventPayload {
    /// Shared GitHub fields, flattened into this object when (de)serialized.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    /// Raw `issue` member of the body; JSON null when absent.
    pub issue: JsonValue,
    /// Raw `comment` member of the body; JSON null when absent.
    pub comment: JsonValue,
}
119
/// Payload built by `github_payload` for the `pull_request_review` kind.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPullRequestReviewEventPayload {
    /// Shared GitHub fields, flattened into this object when (de)serialized.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    /// Raw `pull_request` member of the body; JSON null when absent.
    pub pull_request: JsonValue,
    /// Raw `review` member of the body; JSON null when absent.
    pub review: JsonValue,
}
127
/// Payload built by `github_payload` for the `push` kind.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPushEventPayload {
    /// Shared GitHub fields, flattened into this object when (de)serialized.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    /// Elements of the raw `commits` array; empty when the member is absent
    /// or not an array. Defaults to empty when missing on deserialization.
    #[serde(default)]
    pub commits: Vec<JsonValue>,
    /// Raw `distinct_size` member, when it is an integer.
    pub distinct_size: Option<i64>,
}
136
/// Payload built by `github_payload` for the `workflow_run` kind.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubWorkflowRunEventPayload {
    /// Shared GitHub fields, flattened into this object when (de)serialized.
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    /// Raw `workflow_run` member of the body; JSON null when absent.
    pub workflow_run: JsonValue,
}
143
/// Normalized GitHub payload; `github_payload` picks the variant from the
/// event kind and falls back to `Other` for kinds it does not special-case.
///
/// The enum is `untagged`, so no variant name is written when serializing.
/// NOTE(review): untagged deserialization selects the first variant whose
/// shape matches — confirm round-trips pick the intended variant.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum GitHubEventPayload {
    /// Kind `issues`.
    Issues(GitHubIssuesEventPayload),
    /// Kind `pull_request`.
    PullRequest(GitHubPullRequestEventPayload),
    /// Kind `issue_comment`.
    IssueComment(GitHubIssueCommentEventPayload),
    /// Kind `pull_request_review`.
    PullRequestReview(GitHubPullRequestReviewEventPayload),
    /// Kind `push`.
    Push(GitHubPushEventPayload),
    /// Kind `workflow_run`.
    WorkflowRun(GitHubWorkflowRunEventPayload),
    /// Any other kind; only the shared fields are extracted.
    Other(GitHubEventCommon),
}
155
/// Fields shared by every normalized Slack payload (populated by
/// `slack_payload`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackEventCommon {
    /// The event kind string passed to the normalizer.
    pub event: String,
    /// Top-level `event_id` member of the raw body, when it is a string.
    pub event_id: Option<String>,
    /// Top-level `api_app_id` member of the raw body, when it is a string.
    pub api_app_id: Option<String>,
    /// Top-level `team_id` member of the raw body, when it is a string.
    pub team_id: Option<String>,
    /// Channel id resolved by `slack_channel_id` from the nested `event`.
    pub channel_id: Option<String>,
    /// User id resolved by `slack_user_id` from the nested `event`.
    pub user_id: Option<String>,
    /// `event.event_ts` of the raw body, when it is a string.
    pub event_ts: Option<String>,
    /// Copy of the complete raw JSON body.
    pub raw: JsonValue,
}
167
/// Payload built by `slack_payload` for the `message.channels` kind.
///
/// Every optional field is the same-named string member of the nested
/// `event` object, or `None` when that member is absent or not a string.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackMessageChannelsEventPayload {
    /// Shared Slack fields, flattened into this object when (de)serialized.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    /// `event.subtype`.
    pub subtype: Option<String>,
    /// `event.channel`.
    pub channel: Option<String>,
    /// `event.user`.
    pub user: Option<String>,
    /// `event.text`.
    pub text: Option<String>,
    /// `event.ts`.
    pub ts: Option<String>,
    /// `event.thread_ts`.
    pub thread_ts: Option<String>,
}
179
/// Payload built by `slack_payload` for the `app_mention` kind.
///
/// Every optional field is the same-named string member of the nested
/// `event` object, or `None` when that member is absent or not a string.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackAppMentionEventPayload {
    /// Shared Slack fields, flattened into this object when (de)serialized.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    /// `event.channel`.
    pub channel: Option<String>,
    /// `event.user`.
    pub user: Option<String>,
    /// `event.text`.
    pub text: Option<String>,
    /// `event.ts`.
    pub ts: Option<String>,
    /// `event.thread_ts`.
    pub thread_ts: Option<String>,
}
190
/// Payload built by `slack_payload` for the `reaction_added` kind.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackReactionAddedEventPayload {
    /// Shared Slack fields, flattened into this object when (de)serialized.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    /// `event.reaction`, when it is a string.
    pub reaction: Option<String>,
    /// `event.item_user`, when it is a string.
    pub item_user: Option<String>,
    /// Raw `event.item` value; JSON null when absent.
    pub item: JsonValue,
}
199
/// Payload built by `slack_payload` for the `team_join` kind.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackTeamJoinEventPayload {
    /// Shared Slack fields, flattened into this object when (de)serialized.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    /// Raw `event.user` value; JSON null when absent.
    pub user: JsonValue,
}
206
/// Payload built by `slack_payload` for the `channel_created` kind.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackChannelCreatedEventPayload {
    /// Shared Slack fields, flattened into this object when (de)serialized.
    #[serde(flatten)]
    pub common: SlackEventCommon,
    /// Raw `event.channel` value; JSON null when absent.
    pub channel: JsonValue,
}
213
/// Normalized Slack payload; `slack_payload` picks the variant from the
/// event kind and falls back to `Other` for kinds it does not special-case.
///
/// The enum is `untagged`, so no variant name is written when serializing.
/// NOTE(review): untagged deserialization selects the first variant whose
/// shape matches — confirm round-trips pick the intended variant.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum SlackEventPayload {
    /// Kind `message.channels`.
    MessageChannels(SlackMessageChannelsEventPayload),
    /// Kind `app_mention`.
    AppMention(SlackAppMentionEventPayload),
    /// Kind `reaction_added`.
    ReactionAdded(SlackReactionAddedEventPayload),
    /// Kind `team_join`.
    TeamJoin(SlackTeamJoinEventPayload),
    /// Kind `channel_created`.
    ChannelCreated(SlackChannelCreatedEventPayload),
    /// Any other kind; only the shared fields are extracted.
    Other(SlackEventCommon),
}
224
/// Normalized Linear payload (populated by `linear_payload`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearEventPayload {
    /// Top-level `action` member of the raw body, when it is a string.
    pub action: Option<String>,
    /// Top-level `organizationId` member of the raw body, when it is a string.
    pub organization_id: Option<String>,
    /// Value of the `Linear-Request-Timestamp` request header, when present.
    pub webhook_timestamp: Option<String>,
    /// The complete raw JSON body.
    pub raw: JsonValue,
}
232
/// Normalized Notion payload (populated by `notion_payload`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct NotionEventPayload {
    /// The event kind string passed to the normalizer.
    pub event: String,
    /// Top-level `workspace_id` member of the raw body, when it is a string.
    pub workspace_id: Option<String>,
    /// Value of the `request-id` header, else of `x-request-id`, when present.
    pub request_id: Option<String>,
    /// The complete raw JSON body.
    pub raw: JsonValue,
}
240
/// Normalized cron payload (populated by `cron_payload`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct CronEventPayload {
    /// Top-level `cron_id` member of the raw body, when it is a string.
    pub cron_id: Option<String>,
    /// Top-level `schedule` member of the raw body, when it is a string.
    pub schedule: Option<String>,
    /// Tick time, (de)serialized as an RFC 3339 string. The normalizer parses
    /// the raw `tick_at` member and substitutes the current time when that
    /// member is missing or not valid RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub tick_at: OffsetDateTime,
    /// The complete raw JSON body.
    pub raw: JsonValue,
}
249
/// Normalized payload of the generic `webhook` provider (populated by
/// `webhook_payload`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GenericWebhookPayload {
    /// Value of the `X-Webhook-Source` request header, when present.
    pub source: Option<String>,
    /// Value of the `Content-Type` request header, when present.
    pub content_type: Option<String>,
    /// The complete raw JSON body.
    pub raw: JsonValue,
}
256
/// Normalized payload of the `a2a-push` provider (populated by
/// `a2a_push_payload`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct A2aPushPayload {
    /// Top-level `task_id` member of the raw body, when it is a string.
    pub task_id: Option<String>,
    /// Top-level `sender` member of the raw body, when it is a string.
    pub sender: Option<String>,
    /// The complete raw JSON body.
    pub raw: JsonValue,
}
263
/// Payload of a provider that is not one of the [`KnownProviderPayload`]
/// variants.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ExtensionProviderPayload {
    /// Provider name; returned by `ProviderPayload::provider`.
    pub provider: String,
    /// Schema name string supplied by whoever builds the payload.
    pub schema_name: String,
    /// The raw JSON body.
    pub raw: JsonValue,
}
270
/// Provider-specific payload carried by a [`TriggerEvent`].
///
/// The enum is `untagged`: the wrapped value is (de)serialized directly, and
/// deserialization tries `Known` before `Extension`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ProviderPayload {
    /// Payload of one of the providers enumerated in [`KnownProviderPayload`].
    Known(KnownProviderPayload),
    /// Payload of any other provider.
    Extension(ExtensionProviderPayload),
}
277
278impl ProviderPayload {
279 pub fn provider(&self) -> &str {
280 match self {
281 Self::Known(known) => known.provider(),
282 Self::Extension(payload) => payload.provider.as_str(),
283 }
284 }
285
286 pub fn normalize(
287 provider: &ProviderId,
288 kind: &str,
289 headers: &BTreeMap<String, String>,
290 raw: JsonValue,
291 ) -> Result<Self, ProviderCatalogError> {
292 provider_catalog()
293 .read()
294 .expect("provider catalog poisoned")
295 .normalize(provider, kind, headers, raw)
296 }
297}
298
/// Payloads of the providers this file knows natively.
///
/// Internally tagged: the `provider` field of the serialized object holds the
/// renamed variant tag shown on each variant.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "provider")]
pub enum KnownProviderPayload {
    /// Tag `github`.
    #[serde(rename = "github")]
    GitHub(GitHubEventPayload),
    /// Tag `slack`; the payload is boxed.
    #[serde(rename = "slack")]
    Slack(Box<SlackEventPayload>),
    /// Tag `linear`.
    #[serde(rename = "linear")]
    Linear(LinearEventPayload),
    /// Tag `notion`.
    #[serde(rename = "notion")]
    Notion(NotionEventPayload),
    /// Tag `cron`.
    #[serde(rename = "cron")]
    Cron(CronEventPayload),
    /// Tag `webhook`.
    #[serde(rename = "webhook")]
    Webhook(GenericWebhookPayload),
    /// Tag `a2a-push`.
    #[serde(rename = "a2a-push")]
    A2aPush(A2aPushPayload),
}
317
318impl KnownProviderPayload {
319 pub fn provider(&self) -> &str {
320 match self {
321 Self::GitHub(_) => "github",
322 Self::Slack(_) => "slack",
323 Self::Linear(_) => "linear",
324 Self::Notion(_) => "notion",
325 Self::Cron(_) => "cron",
326 Self::Webhook(_) => "webhook",
327 Self::A2aPush(_) => "a2a-push",
328 }
329 }
330}
331
/// One inbound event together with its metadata and normalized payload.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerEvent {
    /// Event identifier; `TriggerEvent::new` generates a fresh one.
    pub id: TriggerEventId,
    /// Provider the event came from.
    pub provider: ProviderId,
    /// Event kind string.
    pub kind: String,
    /// Set by `TriggerEvent::new` from `clock::now_utc()`; (de)serialized as
    /// an RFC 3339 string.
    #[serde(with = "time::serde::rfc3339")]
    pub received_at: OffsetDateTime,
    /// Optional timestamp supplied by the caller; (de)serialized as an
    /// RFC 3339 string or null.
    #[serde(with = "time::serde::rfc3339::option")]
    pub occurred_at: Option<OffsetDateTime>,
    /// Caller-supplied deduplication key.
    pub dedupe_key: String,
    /// Trace identifier; `TriggerEvent::new` generates a fresh one.
    pub trace_id: TraceId,
    /// Optional tenant the event belongs to.
    pub tenant_id: Option<TenantId>,
    /// Header name/value pairs supplied by the caller.
    pub headers: BTreeMap<String, String>,
    /// Optional list of JSON values; `None` after `TriggerEvent::new`, omitted
    /// from the serialized form when `None` and defaulted when missing.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch: Option<Vec<JsonValue>>,
    /// Provider-specific payload.
    pub provider_payload: ProviderPayload,
    /// Signature state supplied by the caller.
    pub signature_status: SignatureStatus,
    /// In-memory flag, never (de)serialized; starts `false` and is set by
    /// `mark_dedupe_claimed`.
    #[serde(skip)]
    pub dedupe_claimed: bool,
}
352
353impl TriggerEvent {
354 #[allow(clippy::too_many_arguments)]
355 pub fn new(
356 provider: ProviderId,
357 kind: impl Into<String>,
358 occurred_at: Option<OffsetDateTime>,
359 dedupe_key: impl Into<String>,
360 tenant_id: Option<TenantId>,
361 headers: BTreeMap<String, String>,
362 provider_payload: ProviderPayload,
363 signature_status: SignatureStatus,
364 ) -> Self {
365 Self {
366 id: TriggerEventId::new(),
367 provider,
368 kind: kind.into(),
369 received_at: clock::now_utc(),
370 occurred_at,
371 dedupe_key: dedupe_key.into(),
372 trace_id: TraceId::new(),
373 tenant_id,
374 headers,
375 batch: None,
376 provider_payload,
377 signature_status,
378 dedupe_claimed: false,
379 }
380 }
381
382 pub fn dedupe_claimed(&self) -> bool {
383 self.dedupe_claimed
384 }
385
386 pub fn mark_dedupe_claimed(&mut self) {
387 self.dedupe_claimed = true;
388 }
389}
390
/// Decides which header values may be kept verbatim and which are redacted.
///
/// Header names are compared after ASCII lowercasing.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRedactionPolicy {
    /// Lowercased header names that are always kept.
    safe_exact_names: BTreeSet<String>,
}

impl HeaderRedactionPolicy {
    /// Adds `name` (lowercased) to the set of headers that are always kept.
    pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
        let mut normalized: String = name.into();
        normalized.make_ascii_lowercase();
        self.safe_exact_names.insert(normalized);
        self
    }

    /// Returns `true` when the header is explicitly safe: listed in the
    /// policy, one of the built-in names, or matching one of the name
    /// patterns (`*-event`, `*-delivery`, `*timestamp*`, `*request-id*`).
    fn should_keep(&self, name: &str) -> bool {
        // Names that are kept regardless of the policy's own set.
        const BUILTIN_SAFE: &[&str] = &[
            "user-agent",
            "request-id",
            "x-request-id",
            "x-correlation-id",
            "content-type",
            "content-length",
            "x-github-event",
            "x-github-delivery",
            "x-github-hook-id",
            "x-hub-signature-256",
            "x-slack-request-timestamp",
            "x-slack-signature",
            "x-linear-signature",
            "x-notion-signature",
            "x-a2a-signature",
            "x-a2a-delivery",
        ];

        let folded = name.to_ascii_lowercase();
        let folded = folded.as_str();

        self.safe_exact_names.contains(folded)
            || BUILTIN_SAFE.contains(&folded)
            || folded.ends_with("-event")
            || folded.ends_with("-delivery")
            || folded.contains("timestamp")
            || folded.contains("request-id")
    }

    /// Returns `true` when the header value must be replaced: the name is not
    /// explicitly safe and contains a sensitive fragment.
    fn should_redact(&self, name: &str) -> bool {
        const SENSITIVE_FRAGMENTS: &[&str] = &["authorization", "cookie", "secret", "token", "key"];

        let folded = name.to_ascii_lowercase();
        // Safe names win over sensitive fragments.
        !self.should_keep(folded.as_str())
            && SENSITIVE_FRAGMENTS
                .iter()
                .any(|fragment| folded.contains(*fragment))
    }
}
444
445impl Default for HeaderRedactionPolicy {
446 fn default() -> Self {
447 Self {
448 safe_exact_names: BTreeSet::from([
449 "content-length".to_string(),
450 "content-type".to_string(),
451 "request-id".to_string(),
452 "user-agent".to_string(),
453 "x-a2a-delivery".to_string(),
454 "x-a2a-signature".to_string(),
455 "x-correlation-id".to_string(),
456 "x-github-delivery".to_string(),
457 "x-github-event".to_string(),
458 "x-github-hook-id".to_string(),
459 "x-hub-signature-256".to_string(),
460 "x-linear-signature".to_string(),
461 "x-notion-signature".to_string(),
462 "x-request-id".to_string(),
463 "x-slack-request-timestamp".to_string(),
464 "x-slack-signature".to_string(),
465 ]),
466 }
467 }
468}
469
470pub fn redact_headers(
471 headers: &BTreeMap<String, String>,
472 policy: &HeaderRedactionPolicy,
473) -> BTreeMap<String, String> {
474 headers
475 .iter()
476 .map(|(name, value)| {
477 if policy.should_redact(name) {
478 (name.clone(), REDACTED_HEADER_VALUE.to_string())
479 } else {
480 (name.clone(), value.clone())
481 }
482 })
483 .collect()
484}
485
/// One secret a provider declares in its metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderSecretRequirement {
    /// Secret name (the built-in providers use `signing_secret`).
    pub name: String,
    /// Whether the secret is mandatory; `required_secret_names` lists only
    /// requirements where this is `true`.
    pub required: bool,
    /// Namespace string (the built-in providers use their provider name).
    pub namespace: String,
}
492
/// One named outbound method listed in a provider's metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderOutboundMethod {
    /// Method name.
    pub name: String,
}
497
/// Declares how a provider's inbound requests are signed.
///
/// Internally tagged: the `kind` field holds the snake_case variant name.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum SignatureVerificationMetadata {
    /// No signature verification declared (the default).
    #[default]
    None,
    /// HMAC-based signature description.
    Hmac {
        /// Variant label (built-ins use `github`, `slack`, `standard`).
        variant: String,
        /// Set to `true` by `hmac_signature_metadata`.
        /// NOTE(review): presumably "sign the raw body" — confirm with the verifier.
        raw_body: bool,
        /// Name of the header carrying the signature.
        signature_header: String,
        /// Name of the header carrying the timestamp, if any.
        timestamp_header: Option<String>,
        /// Name of the header carrying the delivery/message id, if any.
        id_header: Option<String>,
        /// Default tolerance in seconds, if any (built-ins use 300).
        default_tolerance_secs: Option<i64>,
        /// Digest name (`sha256` for all built-ins).
        digest: String,
        /// Signature encoding name (built-ins use `hex` or `base64`).
        encoding: String,
    },
}
514
/// Runtime information recorded in a provider's metadata.
///
/// Internally tagged: the `kind` field holds the snake_case variant name.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ProviderRuntimeMetadata {
    /// Provider backed by a built-in connector.
    Builtin {
        /// Connector name (built-ins use `webhook`, `slack`, `cron`).
        connector: String,
        /// Signature variant used by default, if any.
        default_signature_variant: Option<String>,
    },
    /// Default value; used by the built-in entries for `linear`, `notion`
    /// and `a2a-push`.
    #[default]
    Placeholder,
}
525
/// Descriptive metadata of one registered provider.
///
/// All list and enum fields default when missing on deserialization.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
pub struct ProviderMetadata {
    /// Provider name.
    pub provider: String,
    /// Kinds the provider supports; checked by `supports_kind`.
    #[serde(default)]
    pub kinds: Vec<String>,
    /// Name of the payload schema.
    pub schema_name: String,
    /// Outbound methods the provider offers (empty for all built-ins).
    #[serde(default)]
    pub outbound_methods: Vec<ProviderOutboundMethod>,
    /// Secrets the provider declares.
    #[serde(default)]
    pub secret_requirements: Vec<ProviderSecretRequirement>,
    /// Signature scheme description.
    #[serde(default)]
    pub signature_verification: SignatureVerificationMetadata,
    /// Runtime/connector description.
    #[serde(default)]
    pub runtime: ProviderRuntimeMetadata,
}
541
542impl ProviderMetadata {
543 pub fn supports_kind(&self, kind: &str) -> bool {
544 self.kinds.iter().any(|candidate| candidate == kind)
545 }
546
547 pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
548 self.secret_requirements
549 .iter()
550 .filter(|requirement| requirement.required)
551 .map(|requirement| requirement.name.as_str())
552 }
553}
554
555pub trait ProviderSchema: Send + Sync {
556 fn provider_id(&self) -> &'static str;
557 fn harn_schema_name(&self) -> &'static str;
558 fn metadata(&self) -> ProviderMetadata {
559 ProviderMetadata {
560 provider: self.provider_id().to_string(),
561 schema_name: self.harn_schema_name().to_string(),
562 ..ProviderMetadata::default()
563 }
564 }
565 fn normalize(
566 &self,
567 kind: &str,
568 headers: &BTreeMap<String, String>,
569 raw: JsonValue,
570 ) -> Result<ProviderPayload, ProviderCatalogError>;
571}
572
/// Errors reported by [`ProviderCatalog`] operations; each variant carries
/// the provider name involved.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProviderCatalogError {
    /// A schema is already registered under this provider name.
    DuplicateProvider(String),
    /// No schema is registered under this provider name.
    UnknownProvider(String),
}

impl std::fmt::Display for ProviderCatalogError {
    /// Writes a one-line, human-readable description naming the provider.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::UnknownProvider(provider) => {
                format!("provider `{provider}` is not registered")
            }
            Self::DuplicateProvider(provider) => {
                format!("provider `{provider}` is already registered")
            }
        };
        f.write_str(&message)
    }
}

impl std::error::Error for ProviderCatalogError {}
591
592#[derive(Clone, Default)]
593pub struct ProviderCatalog {
594 providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
595}
596
597impl ProviderCatalog {
598 pub fn with_defaults() -> Self {
599 let mut catalog = Self::default();
600 for schema in default_provider_schemas() {
601 catalog
602 .register(schema)
603 .expect("default providers must register cleanly");
604 }
605 catalog
606 }
607
608 pub fn register(
609 &mut self,
610 schema: Arc<dyn ProviderSchema>,
611 ) -> Result<(), ProviderCatalogError> {
612 let provider = schema.provider_id().to_string();
613 if self.providers.contains_key(provider.as_str()) {
614 return Err(ProviderCatalogError::DuplicateProvider(provider));
615 }
616 self.providers.insert(provider, schema);
617 Ok(())
618 }
619
620 pub fn normalize(
621 &self,
622 provider: &ProviderId,
623 kind: &str,
624 headers: &BTreeMap<String, String>,
625 raw: JsonValue,
626 ) -> Result<ProviderPayload, ProviderCatalogError> {
627 let schema = self
628 .providers
629 .get(provider.as_str())
630 .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
631 schema.normalize(kind, headers, raw)
632 }
633
634 pub fn schema_names(&self) -> BTreeMap<String, String> {
635 self.providers
636 .iter()
637 .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
638 .collect()
639 }
640
641 pub fn entries(&self) -> Vec<ProviderMetadata> {
642 self.providers
643 .values()
644 .map(|schema| schema.metadata())
645 .collect()
646 }
647
648 pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
649 self.providers.get(provider).map(|schema| schema.metadata())
650 }
651}
652
653pub fn register_provider_schema(
654 schema: Arc<dyn ProviderSchema>,
655) -> Result<(), ProviderCatalogError> {
656 provider_catalog()
657 .write()
658 .expect("provider catalog poisoned")
659 .register(schema)
660}
661
662pub fn reset_provider_catalog() {
663 *provider_catalog()
664 .write()
665 .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
666}
667
668pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
669 provider_catalog()
670 .read()
671 .expect("provider catalog poisoned")
672 .schema_names()
673}
674
675pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
676 provider_catalog()
677 .read()
678 .expect("provider catalog poisoned")
679 .entries()
680}
681
682pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
683 provider_catalog()
684 .read()
685 .expect("provider catalog poisoned")
686 .metadata_for(provider)
687}
688
689fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
690 static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
691 PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
692}
693
/// [`ProviderSchema`] implementation used for the providers shipped with
/// this file; it stores its answers as plain data.
struct BuiltinProviderSchema {
    /// Value returned by `provider_id()`.
    provider_id: &'static str,
    /// Value returned by `harn_schema_name()`.
    harn_schema_name: &'static str,
    /// Value cloned and returned by `metadata()`.
    metadata: ProviderMetadata,
    /// Infallible normalizer called by `normalize()` with the kind, the
    /// headers and the raw body.
    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
}
700
701impl ProviderSchema for BuiltinProviderSchema {
702 fn provider_id(&self) -> &'static str {
703 self.provider_id
704 }
705
706 fn harn_schema_name(&self) -> &'static str {
707 self.harn_schema_name
708 }
709
710 fn metadata(&self) -> ProviderMetadata {
711 self.metadata.clone()
712 }
713
714 fn normalize(
715 &self,
716 kind: &str,
717 headers: &BTreeMap<String, String>,
718 raw: JsonValue,
719 ) -> Result<ProviderPayload, ProviderCatalogError> {
720 Ok((self.normalize)(kind, headers, raw))
721 }
722}
723
724fn provider_metadata_entry(
725 provider: &str,
726 kinds: &[&str],
727 schema_name: &str,
728 signature_verification: SignatureVerificationMetadata,
729 secret_requirements: Vec<ProviderSecretRequirement>,
730 runtime: ProviderRuntimeMetadata,
731) -> ProviderMetadata {
732 ProviderMetadata {
733 provider: provider.to_string(),
734 kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
735 schema_name: schema_name.to_string(),
736 outbound_methods: Vec::new(),
737 secret_requirements,
738 signature_verification,
739 runtime,
740 }
741}
742
743fn hmac_signature_metadata(
744 variant: &str,
745 signature_header: &str,
746 timestamp_header: Option<&str>,
747 id_header: Option<&str>,
748 default_tolerance_secs: Option<i64>,
749 encoding: &str,
750) -> SignatureVerificationMetadata {
751 SignatureVerificationMetadata::Hmac {
752 variant: variant.to_string(),
753 raw_body: true,
754 signature_header: signature_header.to_string(),
755 timestamp_header: timestamp_header.map(ToString::to_string),
756 id_header: id_header.map(ToString::to_string),
757 default_tolerance_secs,
758 digest: "sha256".to_string(),
759 encoding: encoding.to_string(),
760 }
761}
762
763fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
764 ProviderSecretRequirement {
765 name: name.to_string(),
766 required: true,
767 namespace: namespace.to_string(),
768 }
769}
770
771fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
772 vec![
773 Arc::new(BuiltinProviderSchema {
774 provider_id: "github",
775 harn_schema_name: "GitHubEventPayload",
776 metadata: provider_metadata_entry(
777 "github",
778 &["webhook"],
779 "GitHubEventPayload",
780 hmac_signature_metadata(
781 "github",
782 "X-Hub-Signature-256",
783 None,
784 Some("X-GitHub-Delivery"),
785 None,
786 "hex",
787 ),
788 vec![required_secret("signing_secret", "github")],
789 ProviderRuntimeMetadata::Builtin {
790 connector: "webhook".to_string(),
791 default_signature_variant: Some("github".to_string()),
792 },
793 ),
794 normalize: github_payload,
795 }),
796 Arc::new(BuiltinProviderSchema {
797 provider_id: "slack",
798 harn_schema_name: "SlackEventPayload",
799 metadata: provider_metadata_entry(
800 "slack",
801 &["webhook"],
802 "SlackEventPayload",
803 hmac_signature_metadata(
804 "slack",
805 "X-Slack-Signature",
806 Some("X-Slack-Request-Timestamp"),
807 None,
808 Some(300),
809 "hex",
810 ),
811 vec![required_secret("signing_secret", "slack")],
812 ProviderRuntimeMetadata::Builtin {
813 connector: "slack".to_string(),
814 default_signature_variant: Some("slack".to_string()),
815 },
816 ),
817 normalize: slack_payload,
818 }),
819 Arc::new(BuiltinProviderSchema {
820 provider_id: "linear",
821 harn_schema_name: "LinearEventPayload",
822 metadata: provider_metadata_entry(
823 "linear",
824 &["webhook"],
825 "LinearEventPayload",
826 SignatureVerificationMetadata::None,
827 Vec::new(),
828 ProviderRuntimeMetadata::Placeholder,
829 ),
830 normalize: linear_payload,
831 }),
832 Arc::new(BuiltinProviderSchema {
833 provider_id: "notion",
834 harn_schema_name: "NotionEventPayload",
835 metadata: provider_metadata_entry(
836 "notion",
837 &["webhook"],
838 "NotionEventPayload",
839 SignatureVerificationMetadata::None,
840 Vec::new(),
841 ProviderRuntimeMetadata::Placeholder,
842 ),
843 normalize: notion_payload,
844 }),
845 Arc::new(BuiltinProviderSchema {
846 provider_id: "cron",
847 harn_schema_name: "CronEventPayload",
848 metadata: provider_metadata_entry(
849 "cron",
850 &["cron"],
851 "CronEventPayload",
852 SignatureVerificationMetadata::None,
853 Vec::new(),
854 ProviderRuntimeMetadata::Builtin {
855 connector: "cron".to_string(),
856 default_signature_variant: None,
857 },
858 ),
859 normalize: cron_payload,
860 }),
861 Arc::new(BuiltinProviderSchema {
862 provider_id: "webhook",
863 harn_schema_name: "GenericWebhookPayload",
864 metadata: provider_metadata_entry(
865 "webhook",
866 &["webhook"],
867 "GenericWebhookPayload",
868 hmac_signature_metadata(
869 "standard",
870 "webhook-signature",
871 Some("webhook-timestamp"),
872 Some("webhook-id"),
873 Some(300),
874 "base64",
875 ),
876 vec![required_secret("signing_secret", "webhook")],
877 ProviderRuntimeMetadata::Builtin {
878 connector: "webhook".to_string(),
879 default_signature_variant: Some("standard".to_string()),
880 },
881 ),
882 normalize: webhook_payload,
883 }),
884 Arc::new(BuiltinProviderSchema {
885 provider_id: "a2a-push",
886 harn_schema_name: "A2aPushPayload",
887 metadata: provider_metadata_entry(
888 "a2a-push",
889 &["a2a-push"],
890 "A2aPushPayload",
891 SignatureVerificationMetadata::None,
892 Vec::new(),
893 ProviderRuntimeMetadata::Placeholder,
894 ),
895 normalize: a2a_push_payload,
896 }),
897 ]
898}
899
900fn github_payload(
901 kind: &str,
902 headers: &BTreeMap<String, String>,
903 raw: JsonValue,
904) -> ProviderPayload {
905 let common = GitHubEventCommon {
906 event: kind.to_string(),
907 action: raw
908 .get("action")
909 .and_then(JsonValue::as_str)
910 .map(ToString::to_string),
911 delivery_id: headers.get("X-GitHub-Delivery").cloned(),
912 installation_id: raw
913 .get("installation")
914 .and_then(|value| value.get("id"))
915 .and_then(JsonValue::as_i64),
916 raw: raw.clone(),
917 };
918 let payload = match kind {
919 "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
920 common,
921 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
922 }),
923 "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
924 common,
925 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
926 }),
927 "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
928 common,
929 issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
930 comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
931 }),
932 "pull_request_review" => {
933 GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
934 common,
935 pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
936 review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
937 })
938 }
939 "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
940 common,
941 commits: raw
942 .get("commits")
943 .and_then(JsonValue::as_array)
944 .cloned()
945 .unwrap_or_default(),
946 distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
947 }),
948 "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
949 common,
950 workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
951 }),
952 _ => GitHubEventPayload::Other(common),
953 };
954 ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
955}
956
957fn slack_payload(
958 kind: &str,
959 _headers: &BTreeMap<String, String>,
960 raw: JsonValue,
961) -> ProviderPayload {
962 let event = raw.get("event");
963 let common = SlackEventCommon {
964 event: kind.to_string(),
965 event_id: raw
966 .get("event_id")
967 .and_then(JsonValue::as_str)
968 .map(ToString::to_string),
969 api_app_id: raw
970 .get("api_app_id")
971 .and_then(JsonValue::as_str)
972 .map(ToString::to_string),
973 team_id: raw
974 .get("team_id")
975 .and_then(JsonValue::as_str)
976 .map(ToString::to_string),
977 channel_id: slack_channel_id(event),
978 user_id: slack_user_id(event),
979 event_ts: event
980 .and_then(|value| value.get("event_ts"))
981 .and_then(JsonValue::as_str)
982 .map(ToString::to_string),
983 raw: raw.clone(),
984 };
985 let payload = match kind {
986 "message.channels" => {
987 SlackEventPayload::MessageChannels(SlackMessageChannelsEventPayload {
988 subtype: event
989 .and_then(|value| value.get("subtype"))
990 .and_then(JsonValue::as_str)
991 .map(ToString::to_string),
992 channel: event
993 .and_then(|value| value.get("channel"))
994 .and_then(JsonValue::as_str)
995 .map(ToString::to_string),
996 user: event
997 .and_then(|value| value.get("user"))
998 .and_then(JsonValue::as_str)
999 .map(ToString::to_string),
1000 text: event
1001 .and_then(|value| value.get("text"))
1002 .and_then(JsonValue::as_str)
1003 .map(ToString::to_string),
1004 ts: event
1005 .and_then(|value| value.get("ts"))
1006 .and_then(JsonValue::as_str)
1007 .map(ToString::to_string),
1008 thread_ts: event
1009 .and_then(|value| value.get("thread_ts"))
1010 .and_then(JsonValue::as_str)
1011 .map(ToString::to_string),
1012 common,
1013 })
1014 }
1015 "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1016 channel: event
1017 .and_then(|value| value.get("channel"))
1018 .and_then(JsonValue::as_str)
1019 .map(ToString::to_string),
1020 user: event
1021 .and_then(|value| value.get("user"))
1022 .and_then(JsonValue::as_str)
1023 .map(ToString::to_string),
1024 text: event
1025 .and_then(|value| value.get("text"))
1026 .and_then(JsonValue::as_str)
1027 .map(ToString::to_string),
1028 ts: event
1029 .and_then(|value| value.get("ts"))
1030 .and_then(JsonValue::as_str)
1031 .map(ToString::to_string),
1032 thread_ts: event
1033 .and_then(|value| value.get("thread_ts"))
1034 .and_then(JsonValue::as_str)
1035 .map(ToString::to_string),
1036 common,
1037 }),
1038 "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1039 reaction: event
1040 .and_then(|value| value.get("reaction"))
1041 .and_then(JsonValue::as_str)
1042 .map(ToString::to_string),
1043 item_user: event
1044 .and_then(|value| value.get("item_user"))
1045 .and_then(JsonValue::as_str)
1046 .map(ToString::to_string),
1047 item: event
1048 .and_then(|value| value.get("item"))
1049 .cloned()
1050 .unwrap_or(JsonValue::Null),
1051 common,
1052 }),
1053 "team_join" => SlackEventPayload::TeamJoin(SlackTeamJoinEventPayload {
1054 user: event
1055 .and_then(|value| value.get("user"))
1056 .cloned()
1057 .unwrap_or(JsonValue::Null),
1058 common,
1059 }),
1060 "channel_created" => SlackEventPayload::ChannelCreated(SlackChannelCreatedEventPayload {
1061 channel: event
1062 .and_then(|value| value.get("channel"))
1063 .cloned()
1064 .unwrap_or(JsonValue::Null),
1065 common,
1066 }),
1067 _ => SlackEventPayload::Other(common),
1068 };
1069 ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1070}
1071
1072fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1073 event
1074 .and_then(|value| value.get("channel"))
1075 .and_then(JsonValue::as_str)
1076 .map(ToString::to_string)
1077 .or_else(|| {
1078 event
1079 .and_then(|value| value.get("item"))
1080 .and_then(|value| value.get("channel"))
1081 .and_then(JsonValue::as_str)
1082 .map(ToString::to_string)
1083 })
1084 .or_else(|| {
1085 event
1086 .and_then(|value| value.get("channel"))
1087 .and_then(|value| value.get("id"))
1088 .and_then(JsonValue::as_str)
1089 .map(ToString::to_string)
1090 })
1091}
1092
1093fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1094 event
1095 .and_then(|value| value.get("user"))
1096 .and_then(JsonValue::as_str)
1097 .map(ToString::to_string)
1098 .or_else(|| {
1099 event
1100 .and_then(|value| value.get("user"))
1101 .and_then(|value| value.get("id"))
1102 .and_then(JsonValue::as_str)
1103 .map(ToString::to_string)
1104 })
1105}
1106
1107fn linear_payload(
1108 _kind: &str,
1109 headers: &BTreeMap<String, String>,
1110 raw: JsonValue,
1111) -> ProviderPayload {
1112 let action = raw
1113 .get("action")
1114 .and_then(JsonValue::as_str)
1115 .map(ToString::to_string);
1116 let organization_id = raw
1117 .get("organizationId")
1118 .and_then(JsonValue::as_str)
1119 .map(ToString::to_string);
1120 ProviderPayload::Known(KnownProviderPayload::Linear(LinearEventPayload {
1121 action,
1122 organization_id,
1123 webhook_timestamp: headers.get("Linear-Request-Timestamp").cloned(),
1124 raw,
1125 }))
1126}
1127
1128fn notion_payload(
1129 kind: &str,
1130 headers: &BTreeMap<String, String>,
1131 raw: JsonValue,
1132) -> ProviderPayload {
1133 let workspace_id = raw
1134 .get("workspace_id")
1135 .and_then(JsonValue::as_str)
1136 .map(ToString::to_string);
1137 ProviderPayload::Known(KnownProviderPayload::Notion(NotionEventPayload {
1138 event: kind.to_string(),
1139 workspace_id,
1140 request_id: headers
1141 .get("request-id")
1142 .cloned()
1143 .or_else(|| headers.get("x-request-id").cloned()),
1144 raw,
1145 }))
1146}
1147
1148fn cron_payload(
1149 _kind: &str,
1150 _headers: &BTreeMap<String, String>,
1151 raw: JsonValue,
1152) -> ProviderPayload {
1153 let cron_id = raw
1154 .get("cron_id")
1155 .and_then(JsonValue::as_str)
1156 .map(ToString::to_string);
1157 let schedule = raw
1158 .get("schedule")
1159 .and_then(JsonValue::as_str)
1160 .map(ToString::to_string);
1161 let tick_at = raw
1162 .get("tick_at")
1163 .and_then(JsonValue::as_str)
1164 .and_then(parse_rfc3339)
1165 .unwrap_or_else(OffsetDateTime::now_utc);
1166 ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
1167 cron_id,
1168 schedule,
1169 tick_at,
1170 raw,
1171 }))
1172}
1173
1174fn webhook_payload(
1175 _kind: &str,
1176 headers: &BTreeMap<String, String>,
1177 raw: JsonValue,
1178) -> ProviderPayload {
1179 ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1180 source: headers.get("X-Webhook-Source").cloned(),
1181 content_type: headers.get("Content-Type").cloned(),
1182 raw,
1183 }))
1184}
1185
1186fn a2a_push_payload(
1187 _kind: &str,
1188 _headers: &BTreeMap<String, String>,
1189 raw: JsonValue,
1190) -> ProviderPayload {
1191 let task_id = raw
1192 .get("task_id")
1193 .and_then(JsonValue::as_str)
1194 .map(ToString::to_string);
1195 let sender = raw
1196 .get("sender")
1197 .and_then(JsonValue::as_str)
1198 .map(ToString::to_string);
1199 ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
1200 task_id,
1201 sender,
1202 raw,
1203 }))
1204}
1205
1206fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
1207 OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
1208}
1209
#[cfg(test)]
mod tests {
    use super::*;

    /// Request headers used as a shared fixture by the tests below.
    fn sample_headers() -> BTreeMap<String, String> {
        [
            ("Authorization", "Bearer secret"),
            ("Cookie", "session=abc"),
            ("User-Agent", "GitHub-Hookshot/123"),
            ("X-GitHub-Delivery", "delivery-123"),
            ("X-GitHub-Event", "issues"),
            ("X-Webhook-Token", "token"),
        ]
        .into_iter()
        .map(|(name, value)| (name.to_string(), value.to_string()))
        .collect()
    }

    /// The default policy passes `User-Agent` and `X-GitHub-Delivery` through
    /// and replaces `Authorization`, `Cookie` and `X-Webhook-Token` with the
    /// redaction marker.
    #[test]
    fn default_redaction_policy_keeps_safe_headers() {
        let policy = HeaderRedactionPolicy::default();
        let redacted = redact_headers(&sample_headers(), &policy);

        let expectations = [
            ("User-Agent", "GitHub-Hookshot/123"),
            ("X-GitHub-Delivery", "delivery-123"),
            ("Authorization", REDACTED_HEADER_VALUE),
            ("Cookie", REDACTED_HEADER_VALUE),
            ("X-Webhook-Token", REDACTED_HEADER_VALUE),
        ];
        for (name, expected) in expectations {
            assert_eq!(redacted.get(name).unwrap(), expected);
        }
    }

    /// Registering the same provider id twice yields `DuplicateProvider`.
    #[test]
    fn provider_catalog_rejects_duplicates() {
        // Each call builds a fresh, identical schema for the "github" provider.
        let github_schema = || BuiltinProviderSchema {
            provider_id: "github",
            harn_schema_name: "GitHubEventPayload",
            metadata: provider_metadata_entry(
                "github",
                &["webhook"],
                "GitHubEventPayload",
                SignatureVerificationMetadata::None,
                Vec::new(),
                ProviderRuntimeMetadata::Placeholder,
            ),
            normalize: github_payload,
        };

        let mut catalog = ProviderCatalog::default();
        catalog.register(Arc::new(github_schema())).unwrap();

        let duplicate = catalog.register(Arc::new(github_schema()));
        assert_eq!(
            duplicate.unwrap_err(),
            ProviderCatalogError::DuplicateProvider("github".to_string())
        );
    }

    /// Exactly four registered providers report a builtin runtime: cron,
    /// github, slack and webhook.
    #[test]
    fn registered_provider_metadata_marks_builtin_connectors() {
        let entries = registered_provider_metadata();
        let mut builtin: Vec<&ProviderMetadata> = Vec::new();
        for entry in entries.iter() {
            if matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }) {
                builtin.push(entry);
            }
        }

        assert_eq!(builtin.len(), 4);
        for expected in ["cron", "github", "slack", "webhook"] {
            assert!(builtin.iter().any(|entry| entry.provider == expected));
        }
    }

    /// Serializing, deserializing and serializing again produces an equal
    /// event and an equal JSON value.
    #[test]
    fn trigger_event_round_trip_is_stable() {
        let provider = ProviderId::from("github");
        let raw_headers = sample_headers();
        let redacted = redact_headers(&raw_headers, &HeaderRedactionPolicy::default());
        let body = serde_json::json!({
            "action": "opened",
            "installation": {"id": 42},
            "issue": {"number": 99}
        });
        let provider_payload =
            ProviderPayload::normalize(&provider, "issues", &raw_headers, body).unwrap();

        let event = TriggerEvent {
            id: TriggerEventId("trigger_evt_fixed".to_string()),
            provider,
            kind: "issues".to_string(),
            received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
            occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
            dedupe_key: "delivery-123".to_string(),
            trace_id: TraceId("trace_fixed".to_string()),
            tenant_id: Some(TenantId("tenant_1".to_string())),
            headers: redacted,
            provider_payload,
            signature_status: SignatureStatus::Verified,
            dedupe_claimed: false,
            batch: None,
        };

        let first_pass = serde_json::to_value(&event).unwrap();
        let decoded: TriggerEvent = serde_json::from_value(first_pass.clone()).unwrap();
        let second_pass = serde_json::to_value(&decoded).unwrap();
        assert_eq!(decoded, event);
        assert_eq!(first_pass, second_pass);
    }

    /// Normalizing a payload for an unregistered provider id fails with
    /// `UnknownProvider`.
    #[test]
    fn unknown_provider_errors() {
        let provider = ProviderId::from("custom-provider");
        let no_headers = BTreeMap::new();
        let outcome = ProviderPayload::normalize(
            &provider,
            "thing.happened",
            &no_headers,
            serde_json::json!({"ok": true}),
        );
        assert_eq!(
            outcome.unwrap_err(),
            ProviderCatalogError::UnknownProvider("custom-provider".to_string())
        );
    }
}