Skip to main content

harn_vm/triggers/
event.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
/// Replacement text stored in place of a header value that gets redacted.
const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13fn serialize_optional_bytes_b64<S>(
14    value: &Option<Vec<u8>>,
15    serializer: S,
16) -> Result<S::Ok, S::Error>
17where
18    S: Serializer,
19{
20    use base64::Engine;
21
22    match value {
23        Some(bytes) => {
24            serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
25        }
26        None => serializer.serialize_none(),
27    }
28}
29
30fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
31where
32    D: Deserializer<'de>,
33{
34    use base64::Engine;
35
36    let encoded = Option::<String>::deserialize(deserializer)?;
37    encoded
38        .map(|text| {
39            base64::engine::general_purpose::STANDARD
40                .decode(text.as_bytes())
41                .map_err(serde::de::Error::custom)
42        })
43        .transpose()
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
47#[serde(transparent)]
48pub struct TriggerEventId(pub String);
49
50impl TriggerEventId {
51    pub fn new() -> Self {
52        Self(format!("trigger_evt_{}", Uuid::now_v7()))
53    }
54}
55
56impl Default for TriggerEventId {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
63#[serde(transparent)]
64pub struct ProviderId(pub String);
65
66impl ProviderId {
67    pub fn new(value: impl Into<String>) -> Self {
68        Self(value.into())
69    }
70
71    pub fn as_str(&self) -> &str {
72        self.0.as_str()
73    }
74}
75
76impl From<&str> for ProviderId {
77    fn from(value: &str) -> Self {
78        Self::new(value)
79    }
80}
81
82impl From<String> for ProviderId {
83    fn from(value: String) -> Self {
84        Self::new(value)
85    }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
89#[serde(transparent)]
90pub struct TraceId(pub String);
91
92impl TraceId {
93    pub fn new() -> Self {
94        Self(format!("trace_{}", Uuid::now_v7()))
95    }
96}
97
98impl Default for TraceId {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
105#[serde(transparent)]
106pub struct TenantId(pub String);
107
108impl TenantId {
109    pub fn new(value: impl Into<String>) -> Self {
110        Self(value.into())
111    }
112}
113
/// Outcome of signature verification recorded on a [`TriggerEvent`].
///
/// Serialized as an internally tagged object: the variant name (snake_case)
/// is stored under the `state` key.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum SignatureStatus {
    Verified,
    Unsigned,
    /// Verification failed; `reason` carries the explanation text.
    Failed { reason: String },
}
121
/// Fields shared by every GitHub payload struct in this module.
///
/// Each specific payload embeds this struct with `#[serde(flatten)]`, so these
/// fields appear as top-level keys of the serialized payload.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubEventCommon {
    pub event: String,
    pub action: Option<String>,
    pub delivery_id: Option<String>,
    pub installation_id: Option<i64>,
    // NOTE(review): presumably the unmodified provider body — confirm against the normalizer.
    pub raw: JsonValue,
}
130
/// GitHub payload carrying an `issue` object next to the flattened common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubIssuesEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub issue: JsonValue,
}
137
/// GitHub payload carrying a `pull_request` object next to the flattened common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPullRequestEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub pull_request: JsonValue,
}
144
/// GitHub payload carrying both an `issue` and a `comment` object next to the
/// flattened common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubIssueCommentEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub issue: JsonValue,
    pub comment: JsonValue,
}
152
/// GitHub payload carrying both a `pull_request` and a `review` object next to
/// the flattened common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPullRequestReviewEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub pull_request: JsonValue,
    pub review: JsonValue,
}
160
/// GitHub payload carrying a list of commits next to the flattened common fields.
///
/// Neither extra field is required when deserializing: `commits` defaults to
/// an empty list and `distinct_size` is optional.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubPushEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    #[serde(default)]
    pub commits: Vec<JsonValue>,
    pub distinct_size: Option<i64>,
}
169
/// GitHub payload carrying a `workflow_run` object next to the flattened common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubWorkflowRunEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub workflow_run: JsonValue,
}
176
/// GitHub payload carrying a `deployment_status` and a `deployment` object next
/// to the flattened common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubDeploymentStatusEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub deployment_status: JsonValue,
    pub deployment: JsonValue,
}
184
/// GitHub payload carrying a `check_run` object next to the flattened common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GitHubCheckRunEventPayload {
    #[serde(flatten)]
    pub common: GitHubEventCommon,
    pub check_run: JsonValue,
}
191
192#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
193#[serde(untagged)]
194pub enum GitHubEventPayload {
195    Issues(GitHubIssuesEventPayload),
196    PullRequest(GitHubPullRequestEventPayload),
197    IssueComment(GitHubIssueCommentEventPayload),
198    PullRequestReview(GitHubPullRequestReviewEventPayload),
199    Push(GitHubPushEventPayload),
200    WorkflowRun(GitHubWorkflowRunEventPayload),
201    DeploymentStatus(GitHubDeploymentStatusEventPayload),
202    CheckRun(GitHubCheckRunEventPayload),
203    Other(GitHubEventCommon),
204}
205
/// Fields shared by every Slack payload struct in this module.
///
/// Each specific payload embeds this struct with `#[serde(flatten)]`, so these
/// fields appear as top-level keys of the serialized payload.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackEventCommon {
    pub event: String,
    pub event_id: Option<String>,
    pub api_app_id: Option<String>,
    pub team_id: Option<String>,
    pub channel_id: Option<String>,
    pub user_id: Option<String>,
    pub event_ts: Option<String>,
    // NOTE(review): presumably the unmodified provider body — confirm against the normalizer.
    pub raw: JsonValue,
}
217
/// Slack message payload: the flattened common fields plus message details.
///
/// Every message-specific field is optional, so any object that satisfies
/// [`SlackEventCommon`] also deserializes as this struct.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackMessageEventPayload {
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub subtype: Option<String>,
    pub channel_type: Option<String>,
    pub channel: Option<String>,
    pub user: Option<String>,
    pub text: Option<String>,
    pub ts: Option<String>,
    pub thread_ts: Option<String>,
}
230
/// Slack app-mention payload: the flattened common fields plus optional
/// channel, user, text and timestamp details.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackAppMentionEventPayload {
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub channel: Option<String>,
    pub user: Option<String>,
    pub text: Option<String>,
    pub ts: Option<String>,
    pub thread_ts: Option<String>,
}
241
/// Slack reaction-added payload: the flattened common fields plus the
/// reaction details. `item` is the only required extra key.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackReactionAddedEventPayload {
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub reaction: Option<String>,
    pub item_user: Option<String>,
    pub item: JsonValue,
}
250
/// Slack app-home-opened payload: the flattened common fields plus the
/// home-tab details. `view` is the only required extra key.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackAppHomeOpenedEventPayload {
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub user: Option<String>,
    pub channel: Option<String>,
    pub tab: Option<String>,
    pub view: JsonValue,
}
260
/// Slack assistant-thread-started payload: the flattened common fields plus
/// the required `assistant_thread` and `context` objects.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SlackAssistantThreadStartedEventPayload {
    #[serde(flatten)]
    pub common: SlackEventCommon,
    pub assistant_thread: JsonValue,
    pub thread_ts: Option<String>,
    pub context: JsonValue,
}
269
270#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
271#[serde(untagged)]
272pub enum SlackEventPayload {
273    Message(SlackMessageEventPayload),
274    AppMention(SlackAppMentionEventPayload),
275    ReactionAdded(SlackReactionAddedEventPayload),
276    AppHomeOpened(SlackAppHomeOpenedEventPayload),
277    AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
278    Other(SlackEventCommon),
279}
280
/// Fields shared by every Linear payload struct in this module.
///
/// Each specific payload embeds this struct with `#[serde(flatten)]`, so these
/// fields appear as top-level keys of the serialized payload.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearEventCommon {
    pub event: String,
    pub action: Option<String>,
    pub delivery_id: Option<String>,
    pub organization_id: Option<String>,
    // NOTE(review): unit (seconds vs. milliseconds) is not visible here — confirm.
    pub webhook_timestamp: Option<i64>,
    pub webhook_id: Option<String>,
    pub url: Option<String>,
    pub created_at: Option<String>,
    pub actor: JsonValue,
    // NOTE(review): presumably the unmodified provider body — confirm against the normalizer.
    pub raw: JsonValue,
}
294
/// One changed field of a Linear issue together with its previous value.
///
/// `Serialize`/`Deserialize` are implemented by hand for this type: the wire
/// form is an object with a `field_name` discriminator and a `previous` value
/// (plus `field` for [`LinearIssueChange::Other`]).
#[derive(Clone, Debug, PartialEq)]
pub enum LinearIssueChange {
    Title { previous: Option<String> },
    Description { previous: Option<String> },
    Priority { previous: Option<i64> },
    Estimate { previous: Option<i64> },
    StateId { previous: Option<String> },
    TeamId { previous: Option<String> },
    AssigneeId { previous: Option<String> },
    ProjectId { previous: Option<String> },
    CycleId { previous: Option<String> },
    DueDate { previous: Option<String> },
    ParentId { previous: Option<String> },
    SortOrder { previous: Option<f64> },
    LabelIds { previous: Vec<String> },
    CompletedAt { previous: Option<String> },
    /// A field without a dedicated variant; `previous` is kept as raw JSON.
    Other { field: String, previous: JsonValue },
}
313
314impl Serialize for LinearIssueChange {
315    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
316    where
317        S: Serializer,
318    {
319        let value = match self {
320            Self::Title { previous } => {
321                serde_json::json!({ "field_name": "title", "previous": previous })
322            }
323            Self::Description { previous } => {
324                serde_json::json!({ "field_name": "description", "previous": previous })
325            }
326            Self::Priority { previous } => {
327                serde_json::json!({ "field_name": "priority", "previous": previous })
328            }
329            Self::Estimate { previous } => {
330                serde_json::json!({ "field_name": "estimate", "previous": previous })
331            }
332            Self::StateId { previous } => {
333                serde_json::json!({ "field_name": "state_id", "previous": previous })
334            }
335            Self::TeamId { previous } => {
336                serde_json::json!({ "field_name": "team_id", "previous": previous })
337            }
338            Self::AssigneeId { previous } => {
339                serde_json::json!({ "field_name": "assignee_id", "previous": previous })
340            }
341            Self::ProjectId { previous } => {
342                serde_json::json!({ "field_name": "project_id", "previous": previous })
343            }
344            Self::CycleId { previous } => {
345                serde_json::json!({ "field_name": "cycle_id", "previous": previous })
346            }
347            Self::DueDate { previous } => {
348                serde_json::json!({ "field_name": "due_date", "previous": previous })
349            }
350            Self::ParentId { previous } => {
351                serde_json::json!({ "field_name": "parent_id", "previous": previous })
352            }
353            Self::SortOrder { previous } => {
354                serde_json::json!({ "field_name": "sort_order", "previous": previous })
355            }
356            Self::LabelIds { previous } => {
357                serde_json::json!({ "field_name": "label_ids", "previous": previous })
358            }
359            Self::CompletedAt { previous } => {
360                serde_json::json!({ "field_name": "completed_at", "previous": previous })
361            }
362            Self::Other { field, previous } => {
363                serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
364            }
365        };
366        value.serialize(serializer)
367    }
368}
369
370impl<'de> Deserialize<'de> for LinearIssueChange {
371    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
372    where
373        D: Deserializer<'de>,
374    {
375        let value = JsonValue::deserialize(deserializer)?;
376        let field_name = value
377            .get("field_name")
378            .and_then(JsonValue::as_str)
379            .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
380        let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
381        Ok(match field_name {
382            "title" => Self::Title {
383                previous: previous.as_str().map(ToString::to_string),
384            },
385            "description" => Self::Description {
386                previous: previous.as_str().map(ToString::to_string),
387            },
388            "priority" => Self::Priority {
389                previous: parse_json_i64ish(&previous),
390            },
391            "estimate" => Self::Estimate {
392                previous: parse_json_i64ish(&previous),
393            },
394            "state_id" => Self::StateId {
395                previous: previous.as_str().map(ToString::to_string),
396            },
397            "team_id" => Self::TeamId {
398                previous: previous.as_str().map(ToString::to_string),
399            },
400            "assignee_id" => Self::AssigneeId {
401                previous: previous.as_str().map(ToString::to_string),
402            },
403            "project_id" => Self::ProjectId {
404                previous: previous.as_str().map(ToString::to_string),
405            },
406            "cycle_id" => Self::CycleId {
407                previous: previous.as_str().map(ToString::to_string),
408            },
409            "due_date" => Self::DueDate {
410                previous: previous.as_str().map(ToString::to_string),
411            },
412            "parent_id" => Self::ParentId {
413                previous: previous.as_str().map(ToString::to_string),
414            },
415            "sort_order" => Self::SortOrder {
416                previous: previous.as_f64(),
417            },
418            "label_ids" => Self::LabelIds {
419                previous: parse_string_array(&previous),
420            },
421            "completed_at" => Self::CompletedAt {
422                previous: previous.as_str().map(ToString::to_string),
423            },
424            "other" => Self::Other {
425                field: value
426                    .get("field")
427                    .and_then(JsonValue::as_str)
428                    .map(ToString::to_string)
429                    .unwrap_or_else(|| "unknown".to_string()),
430                previous,
431            },
432            other => Self::Other {
433                field: other.to_string(),
434                previous,
435            },
436        })
437    }
438}
439
/// Linear payload carrying an `issue` object and the list of changed fields
/// next to the flattened common fields. `changes` defaults to empty when absent.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearIssueEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub issue: JsonValue,
    #[serde(default)]
    pub changes: Vec<LinearIssueChange>,
}
448
/// Linear payload carrying a `comment` object next to the flattened common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearIssueCommentEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub comment: JsonValue,
}
455
/// Linear payload carrying a `label` object next to the flattened common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearIssueLabelEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub label: JsonValue,
}
462
/// Linear payload carrying a `project` object next to the flattened common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearProjectEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub project: JsonValue,
}
469
/// Linear payload carrying a `cycle` object next to the flattened common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearCycleEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub cycle: JsonValue,
}
476
/// Linear payload carrying a `customer` object next to the flattened common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearCustomerEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub customer: JsonValue,
}
483
/// Linear payload carrying a `customer_request` object next to the flattened
/// common fields.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LinearCustomerRequestEventPayload {
    #[serde(flatten)]
    pub common: LinearEventCommon,
    pub customer_request: JsonValue,
}
490
/// Any Linear payload, serialized without a variant tag.
///
/// The enum is `#[serde(untagged)]`, so deserialization tries the variants in
/// declaration order. Every variant before `Other` requires a differently
/// named key (`issue`, `comment`, `label`, `project`, `cycle`, `customer`,
/// `customer_request`); `Other` is the fallback and must stay last.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum LinearEventPayload {
    Issue(LinearIssueEventPayload),
    IssueComment(LinearIssueCommentEventPayload),
    IssueLabel(LinearIssueLabelEventPayload),
    Project(LinearProjectEventPayload),
    Cycle(LinearCycleEventPayload),
    Customer(LinearCustomerEventPayload),
    CustomerRequest(LinearCustomerRequestEventPayload),
    Other(LinearEventCommon),
}
503
/// Details of a Notion change, attached to [`NotionEventPayload::polled`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct NotionPolledChangeEvent {
    pub resource: String,
    pub source_id: String,
    pub entity_id: String,
    // NOTE(review): presumably the polling cursor for this source — confirm with the poller.
    pub high_water_mark: String,
    /// Earlier state of the entity, when one is available.
    pub before: Option<JsonValue>,
    pub after: JsonValue,
}
513
/// Notion payload. Only `event` and `raw` are required; everything else is optional.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct NotionEventPayload {
    pub event: String,
    pub workspace_id: Option<String>,
    pub request_id: Option<String>,
    pub subscription_id: Option<String>,
    pub integration_id: Option<String>,
    pub attempt_number: Option<u32>,
    pub entity_id: Option<String>,
    pub entity_type: Option<String>,
    pub api_version: Option<String>,
    pub verification_token: Option<String>,
    /// Change details; NOTE(review): presumably set only for polled events — confirm.
    pub polled: Option<NotionPolledChangeEvent>,
    pub raw: JsonValue,
}
529
/// Cron payload. `tick_at` is (de)serialized as an RFC 3339 timestamp.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct CronEventPayload {
    pub cron_id: Option<String>,
    pub schedule: Option<String>,
    #[serde(with = "time::serde::rfc3339")]
    pub tick_at: OffsetDateTime,
    pub raw: JsonValue,
}
538
/// Payload of a generic webhook: optional source and content type plus the raw body.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct GenericWebhookPayload {
    pub source: Option<String>,
    pub content_type: Option<String>,
    pub raw: JsonValue,
}
545
/// Payload of an `a2a-push` event. Only `raw` and `kind` are required.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct A2aPushPayload {
    pub task_id: Option<String>,
    pub task_state: Option<String>,
    pub artifact: Option<JsonValue>,
    pub sender: Option<String>,
    pub raw: JsonValue,
    pub kind: String,
}
555
/// Payload shape shared by the stream-style providers (it backs the `kafka`,
/// `nats`, `pulsar`, `postgres-cdc`, `email` and `websocket` variants of
/// [`KnownProviderPayload`]). `headers` defaults to an empty map when absent.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct StreamEventPayload {
    pub event: String,
    pub source: Option<String>,
    pub stream: Option<String>,
    pub partition: Option<String>,
    pub offset: Option<String>,
    pub key: Option<String>,
    pub timestamp: Option<String>,
    #[serde(default)]
    pub headers: BTreeMap<String, String>,
    pub raw: JsonValue,
}
569
/// Payload of a provider that has no dedicated [`KnownProviderPayload`] variant;
/// it names its provider and schema explicitly and carries the raw body.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ExtensionProviderPayload {
    pub provider: String,
    pub schema_name: String,
    pub raw: JsonValue,
}
576
577#[allow(clippy::large_enum_variant)]
578#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
579#[serde(untagged)]
580pub enum ProviderPayload {
581    Known(KnownProviderPayload),
582    Extension(ExtensionProviderPayload),
583}
584
585impl ProviderPayload {
586    pub fn provider(&self) -> &str {
587        match self {
588            Self::Known(known) => known.provider(),
589            Self::Extension(payload) => payload.provider.as_str(),
590        }
591    }
592
593    pub fn normalize(
594        provider: &ProviderId,
595        kind: &str,
596        headers: &BTreeMap<String, String>,
597        raw: JsonValue,
598    ) -> Result<Self, ProviderCatalogError> {
599        provider_catalog()
600            .read()
601            .expect("provider catalog poisoned")
602            .normalize(provider, kind, headers, raw)
603    }
604}
605
/// Payloads of the providers with a dedicated variant.
///
/// Serialized as an internally tagged enum: the provider name is stored under
/// the `provider` key alongside the payload's own fields. The tag strings are
/// the same ones returned by [`KnownProviderPayload::provider`].
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "provider")]
pub enum KnownProviderPayload {
    #[serde(rename = "github")]
    GitHub(GitHubEventPayload),
    #[serde(rename = "slack")]
    Slack(Box<SlackEventPayload>),
    #[serde(rename = "linear")]
    Linear(LinearEventPayload),
    #[serde(rename = "notion")]
    Notion(Box<NotionEventPayload>),
    #[serde(rename = "cron")]
    Cron(CronEventPayload),
    #[serde(rename = "webhook")]
    Webhook(GenericWebhookPayload),
    #[serde(rename = "a2a-push")]
    A2aPush(A2aPushPayload),
    #[serde(rename = "kafka")]
    Kafka(StreamEventPayload),
    #[serde(rename = "nats")]
    Nats(StreamEventPayload),
    #[serde(rename = "pulsar")]
    Pulsar(StreamEventPayload),
    #[serde(rename = "postgres-cdc")]
    PostgresCdc(StreamEventPayload),
    #[serde(rename = "email")]
    Email(StreamEventPayload),
    #[serde(rename = "websocket")]
    Websocket(StreamEventPayload),
}

impl KnownProviderPayload {
    /// Provider name of this payload; must stay in sync with the
    /// `#[serde(rename = …)]` tag of the corresponding variant.
    pub fn provider(&self) -> &str {
        match self {
            Self::GitHub(_) => "github",
            Self::Slack(_) => "slack",
            Self::Linear(_) => "linear",
            Self::Notion(_) => "notion",
            Self::Cron(_) => "cron",
            Self::Webhook(_) => "webhook",
            Self::A2aPush(_) => "a2a-push",
            Self::Kafka(_) => "kafka",
            Self::Nats(_) => "nats",
            Self::Pulsar(_) => "pulsar",
            Self::PostgresCdc(_) => "postgres-cdc",
            Self::Email(_) => "email",
            Self::Websocket(_) => "websocket",
        }
    }
}
656
/// A single inbound trigger event with its envelope metadata and normalized payload.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TriggerEvent {
    pub id: TriggerEventId,
    pub provider: ProviderId,
    pub kind: String,
    /// Serialized as an RFC 3339 timestamp.
    #[serde(with = "time::serde::rfc3339")]
    pub received_at: OffsetDateTime,
    /// Serialized as an optional RFC 3339 timestamp.
    #[serde(with = "time::serde::rfc3339::option")]
    pub occurred_at: Option<OffsetDateTime>,
    pub dedupe_key: String,
    pub trace_id: TraceId,
    pub tenant_id: Option<TenantId>,
    pub headers: BTreeMap<String, String>,
    /// Omitted from the serialized form when `None`; absent means `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch: Option<Vec<JsonValue>>,
    /// Raw request bytes, serialized as a base64 string; omitted when `None`.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        serialize_with = "serialize_optional_bytes_b64",
        deserialize_with = "deserialize_optional_bytes_b64"
    )]
    pub raw_body: Option<Vec<u8>>,
    pub provider_payload: ProviderPayload,
    pub signature_status: SignatureStatus,
    /// In-memory flag only: never serialized, and `false` after deserialization.
    #[serde(skip)]
    pub dedupe_claimed: bool,
}
684
685impl TriggerEvent {
686    #[allow(clippy::too_many_arguments)]
687    pub fn new(
688        provider: ProviderId,
689        kind: impl Into<String>,
690        occurred_at: Option<OffsetDateTime>,
691        dedupe_key: impl Into<String>,
692        tenant_id: Option<TenantId>,
693        headers: BTreeMap<String, String>,
694        provider_payload: ProviderPayload,
695        signature_status: SignatureStatus,
696    ) -> Self {
697        Self {
698            id: TriggerEventId::new(),
699            provider,
700            kind: kind.into(),
701            received_at: clock::now_utc(),
702            occurred_at,
703            dedupe_key: dedupe_key.into(),
704            trace_id: TraceId::new(),
705            tenant_id,
706            headers,
707            batch: None,
708            raw_body: None,
709            provider_payload,
710            signature_status,
711            dedupe_claimed: false,
712        }
713    }
714
715    pub fn dedupe_claimed(&self) -> bool {
716        self.dedupe_claimed
717    }
718
719    pub fn mark_dedupe_claimed(&mut self) {
720        self.dedupe_claimed = true;
721    }
722}
723
/// Decides which header values may be kept verbatim and which must be redacted.
///
/// All name checks are ASCII-case-insensitive: names are lowercased before
/// being compared.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRedactionPolicy {
    /// Lowercased header names that are always kept.
    safe_exact_names: BTreeSet<String>,
}

impl HeaderRedactionPolicy {
    /// Returns the policy with `name` (lowercased) added to the always-kept set.
    pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
        let lowered = name.into().to_ascii_lowercase();
        self.safe_exact_names.insert(lowered);
        self
    }

    /// Whether the header is explicitly safe: listed in the policy, one of the
    /// built-in names, or matching one of the name heuristics (`-event` /
    /// `-delivery` suffix, or containing `timestamp` / `request-id`).
    fn should_keep(&self, name: &str) -> bool {
        const BUILTIN_SAFE: [&str; 16] = [
            "user-agent",
            "request-id",
            "x-request-id",
            "x-correlation-id",
            "content-type",
            "content-length",
            "x-github-event",
            "x-github-delivery",
            "x-github-hook-id",
            "x-hub-signature-256",
            "x-slack-request-timestamp",
            "x-slack-signature",
            "x-linear-signature",
            "x-notion-signature",
            "x-a2a-signature",
            "x-a2a-delivery",
        ];

        let lowered = name.to_ascii_lowercase();
        let header = lowered.as_str();
        self.safe_exact_names.contains(header)
            || BUILTIN_SAFE.contains(&header)
            || header.ends_with("-event")
            || header.ends_with("-delivery")
            || header.contains("timestamp")
            || header.contains("request-id")
    }

    /// Whether the header's value must be replaced.
    ///
    /// A header that `should_keep` accepts is never redacted; otherwise it is
    /// redacted when its name contains one of the sensitive fragments.
    fn should_redact(&self, name: &str) -> bool {
        const SENSITIVE_FRAGMENTS: [&str; 5] =
            ["authorization", "cookie", "secret", "token", "key"];

        let lowered = name.to_ascii_lowercase();
        if self.should_keep(&lowered) {
            return false;
        }
        SENSITIVE_FRAGMENTS
            .iter()
            .any(|fragment| lowered.contains(*fragment))
    }
}

impl Default for HeaderRedactionPolicy {
    /// Policy whose always-kept set holds the built-in safe header names.
    fn default() -> Self {
        const DEFAULT_SAFE: [&str; 16] = [
            "content-length",
            "content-type",
            "request-id",
            "user-agent",
            "x-a2a-delivery",
            "x-a2a-signature",
            "x-correlation-id",
            "x-github-delivery",
            "x-github-event",
            "x-github-hook-id",
            "x-hub-signature-256",
            "x-linear-signature",
            "x-notion-signature",
            "x-request-id",
            "x-slack-request-timestamp",
            "x-slack-signature",
        ];

        Self {
            safe_exact_names: DEFAULT_SAFE.iter().map(|name| name.to_string()).collect(),
        }
    }
}
802
803pub fn redact_headers(
804    headers: &BTreeMap<String, String>,
805    policy: &HeaderRedactionPolicy,
806) -> BTreeMap<String, String> {
807    headers
808        .iter()
809        .map(|(name, value)| {
810            if policy.should_redact(name) {
811                (name.clone(), REDACTED_HEADER_VALUE.to_string())
812            } else {
813                (name.clone(), value.clone())
814            }
815        })
816        .collect()
817}
818
/// A secret a provider declares, identified by `name` within `namespace`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderSecretRequirement {
    pub name: String,
    /// Only requirements with `required == true` are reported by
    /// [`ProviderMetadata::required_secret_names`].
    pub required: bool,
    pub namespace: String,
}
825
/// An outbound method a provider advertises, identified by name.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderOutboundMethod {
    pub name: String,
}
830
/// Describes how a provider's deliveries are signed.
///
/// Serialized as an internally tagged object: the snake_case variant name is
/// stored under `kind`. The default is `None`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum SignatureVerificationMetadata {
    #[default]
    None,
    /// HMAC-based signature scheme parameters.
    Hmac {
        variant: String,
        // NOTE(review): presumably "the signature covers the raw request body" — confirm.
        raw_body: bool,
        signature_header: String,
        timestamp_header: Option<String>,
        id_header: Option<String>,
        default_tolerance_secs: Option<i64>,
        digest: String,
        encoding: String,
    },
}
847
/// Describes what backs a provider at runtime.
///
/// Serialized as an internally tagged object: the snake_case variant name is
/// stored under `kind`. The default is `Placeholder`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ProviderRuntimeMetadata {
    Builtin {
        connector: String,
        default_signature_variant: Option<String>,
    },
    #[default]
    Placeholder,
}
858
/// Descriptive metadata of a provider, as returned by [`ProviderSchema::metadata`].
///
/// When deserializing, only `provider` and `schema_name` are required; every
/// other field falls back to its default.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
pub struct ProviderMetadata {
    pub provider: String,
    /// Event kinds the provider supports (see [`ProviderMetadata::supports_kind`]).
    #[serde(default)]
    pub kinds: Vec<String>,
    pub schema_name: String,
    #[serde(default)]
    pub outbound_methods: Vec<ProviderOutboundMethod>,
    #[serde(default)]
    pub secret_requirements: Vec<ProviderSecretRequirement>,
    #[serde(default)]
    pub signature_verification: SignatureVerificationMetadata,
    #[serde(default)]
    pub runtime: ProviderRuntimeMetadata,
}
874
875impl ProviderMetadata {
876    pub fn supports_kind(&self, kind: &str) -> bool {
877        self.kinds.iter().any(|candidate| candidate == kind)
878    }
879
880    pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
881        self.secret_requirements
882            .iter()
883            .filter(|requirement| requirement.required)
884            .map(|requirement| requirement.name.as_str())
885    }
886}
887
888pub trait ProviderSchema: Send + Sync {
889    fn provider_id(&self) -> &'static str;
890    fn harn_schema_name(&self) -> &'static str;
891    fn metadata(&self) -> ProviderMetadata {
892        ProviderMetadata {
893            provider: self.provider_id().to_string(),
894            schema_name: self.harn_schema_name().to_string(),
895            ..ProviderMetadata::default()
896        }
897    }
898    fn normalize(
899        &self,
900        kind: &str,
901        headers: &BTreeMap<String, String>,
902        raw: JsonValue,
903    ) -> Result<ProviderPayload, ProviderCatalogError>;
904}
905
/// Errors reported by [`ProviderCatalog`] operations. Each variant carries the
/// provider name involved.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProviderCatalogError {
    /// A schema for this provider is already registered.
    DuplicateProvider(String),
    /// No schema is registered for this provider.
    UnknownProvider(String),
}

impl std::fmt::Display for ProviderCatalogError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ProviderCatalogError::UnknownProvider(provider) => {
                f.write_fmt(format_args!("provider `{provider}` is not registered"))
            }
            ProviderCatalogError::DuplicateProvider(provider) => {
                f.write_fmt(format_args!("provider `{provider}` is already registered"))
            }
        }
    }
}

impl std::error::Error for ProviderCatalogError {}
924
925#[derive(Clone, Default)]
926pub struct ProviderCatalog {
927    providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
928}
929
930impl ProviderCatalog {
931    pub fn with_defaults() -> Self {
932        let mut catalog = Self::default();
933        for schema in default_provider_schemas() {
934            catalog
935                .register(schema)
936                .expect("default providers must register cleanly");
937        }
938        catalog
939    }
940
941    pub fn register(
942        &mut self,
943        schema: Arc<dyn ProviderSchema>,
944    ) -> Result<(), ProviderCatalogError> {
945        let provider = schema.provider_id().to_string();
946        if self.providers.contains_key(provider.as_str()) {
947            return Err(ProviderCatalogError::DuplicateProvider(provider));
948        }
949        self.providers.insert(provider, schema);
950        Ok(())
951    }
952
953    pub fn normalize(
954        &self,
955        provider: &ProviderId,
956        kind: &str,
957        headers: &BTreeMap<String, String>,
958        raw: JsonValue,
959    ) -> Result<ProviderPayload, ProviderCatalogError> {
960        let schema = self
961            .providers
962            .get(provider.as_str())
963            .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
964        schema.normalize(kind, headers, raw)
965    }
966
967    pub fn schema_names(&self) -> BTreeMap<String, String> {
968        self.providers
969            .iter()
970            .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
971            .collect()
972    }
973
974    pub fn entries(&self) -> Vec<ProviderMetadata> {
975        self.providers
976            .values()
977            .map(|schema| schema.metadata())
978            .collect()
979    }
980
981    pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
982        self.providers.get(provider).map(|schema| schema.metadata())
983    }
984}
985
986pub fn register_provider_schema(
987    schema: Arc<dyn ProviderSchema>,
988) -> Result<(), ProviderCatalogError> {
989    provider_catalog()
990        .write()
991        .expect("provider catalog poisoned")
992        .register(schema)
993}
994
995pub fn reset_provider_catalog() {
996    *provider_catalog()
997        .write()
998        .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
999}
1000
1001pub fn reset_provider_catalog_with(
1002    schemas: Vec<Arc<dyn ProviderSchema>>,
1003) -> Result<(), ProviderCatalogError> {
1004    let mut catalog = ProviderCatalog::with_defaults();
1005    let builtin_providers: BTreeSet<String> = catalog.schema_names().into_keys().collect();
1006    for schema in schemas {
1007        if builtin_providers.contains(schema.provider_id()) {
1008            continue;
1009        }
1010        catalog.register(schema)?;
1011    }
1012    *provider_catalog()
1013        .write()
1014        .expect("provider catalog poisoned") = catalog;
1015    Ok(())
1016}
1017
1018pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
1019    provider_catalog()
1020        .read()
1021        .expect("provider catalog poisoned")
1022        .schema_names()
1023}
1024
1025pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
1026    provider_catalog()
1027        .read()
1028        .expect("provider catalog poisoned")
1029        .entries()
1030}
1031
1032pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
1033    provider_catalog()
1034        .read()
1035        .expect("provider catalog poisoned")
1036        .metadata_for(provider)
1037}
1038
1039fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
1040    static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
1041    PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
1042}
1043
1044struct BuiltinProviderSchema {
1045    provider_id: &'static str,
1046    harn_schema_name: &'static str,
1047    metadata: ProviderMetadata,
1048    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1049}
1050
1051impl ProviderSchema for BuiltinProviderSchema {
1052    fn provider_id(&self) -> &'static str {
1053        self.provider_id
1054    }
1055
1056    fn harn_schema_name(&self) -> &'static str {
1057        self.harn_schema_name
1058    }
1059
1060    fn metadata(&self) -> ProviderMetadata {
1061        self.metadata.clone()
1062    }
1063
1064    fn normalize(
1065        &self,
1066        kind: &str,
1067        headers: &BTreeMap<String, String>,
1068        raw: JsonValue,
1069    ) -> Result<ProviderPayload, ProviderCatalogError> {
1070        Ok((self.normalize)(kind, headers, raw))
1071    }
1072}
1073
1074fn provider_metadata_entry(
1075    provider: &str,
1076    kinds: &[&str],
1077    schema_name: &str,
1078    outbound_methods: &[&str],
1079    signature_verification: SignatureVerificationMetadata,
1080    secret_requirements: Vec<ProviderSecretRequirement>,
1081    runtime: ProviderRuntimeMetadata,
1082) -> ProviderMetadata {
1083    ProviderMetadata {
1084        provider: provider.to_string(),
1085        kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
1086        schema_name: schema_name.to_string(),
1087        outbound_methods: outbound_methods
1088            .iter()
1089            .map(|name| ProviderOutboundMethod {
1090                name: (*name).to_string(),
1091            })
1092            .collect(),
1093        secret_requirements,
1094        signature_verification,
1095        runtime,
1096    }
1097}
1098
1099fn hmac_signature_metadata(
1100    variant: &str,
1101    signature_header: &str,
1102    timestamp_header: Option<&str>,
1103    id_header: Option<&str>,
1104    default_tolerance_secs: Option<i64>,
1105    encoding: &str,
1106) -> SignatureVerificationMetadata {
1107    SignatureVerificationMetadata::Hmac {
1108        variant: variant.to_string(),
1109        raw_body: true,
1110        signature_header: signature_header.to_string(),
1111        timestamp_header: timestamp_header.map(ToString::to_string),
1112        id_header: id_header.map(ToString::to_string),
1113        default_tolerance_secs,
1114        digest: "sha256".to_string(),
1115        encoding: encoding.to_string(),
1116    }
1117}
1118
1119fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1120    ProviderSecretRequirement {
1121        name: name.to_string(),
1122        required: true,
1123        namespace: namespace.to_string(),
1124    }
1125}
1126
1127fn outbound_method(name: &str) -> ProviderOutboundMethod {
1128    ProviderOutboundMethod {
1129        name: name.to_string(),
1130    }
1131}
1132
1133fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1134    ProviderSecretRequirement {
1135        name: name.to_string(),
1136        required: false,
1137        namespace: namespace.to_string(),
1138    }
1139}
1140
1141fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1142    vec![
1143        Arc::new(BuiltinProviderSchema {
1144            provider_id: "github",
1145            harn_schema_name: "GitHubEventPayload",
1146            metadata: provider_metadata_entry(
1147                "github",
1148                &["webhook"],
1149                "GitHubEventPayload",
1150                &[],
1151                hmac_signature_metadata(
1152                    "github",
1153                    "X-Hub-Signature-256",
1154                    None,
1155                    Some("X-GitHub-Delivery"),
1156                    None,
1157                    "hex",
1158                ),
1159                vec![required_secret("signing_secret", "github")],
1160                ProviderRuntimeMetadata::Builtin {
1161                    connector: "webhook".to_string(),
1162                    default_signature_variant: Some("github".to_string()),
1163                },
1164            ),
1165            normalize: github_payload,
1166        }),
1167        Arc::new(BuiltinProviderSchema {
1168            provider_id: "slack",
1169            harn_schema_name: "SlackEventPayload",
1170            metadata: provider_metadata_entry(
1171                "slack",
1172                &["webhook"],
1173                "SlackEventPayload",
1174                &[
1175                    "post_message",
1176                    "update_message",
1177                    "add_reaction",
1178                    "open_view",
1179                    "user_info",
1180                    "api_call",
1181                    "upload_file",
1182                ],
1183                hmac_signature_metadata(
1184                    "slack",
1185                    "X-Slack-Signature",
1186                    Some("X-Slack-Request-Timestamp"),
1187                    None,
1188                    Some(300),
1189                    "hex",
1190                ),
1191                vec![required_secret("signing_secret", "slack")],
1192                ProviderRuntimeMetadata::Builtin {
1193                    connector: "slack".to_string(),
1194                    default_signature_variant: Some("slack".to_string()),
1195                },
1196            ),
1197            normalize: slack_payload,
1198        }),
1199        Arc::new(BuiltinProviderSchema {
1200            provider_id: "linear",
1201            harn_schema_name: "LinearEventPayload",
1202            metadata: {
1203                let mut metadata = provider_metadata_entry(
1204                    "linear",
1205                    &["webhook"],
1206                    "LinearEventPayload",
1207                    &[],
1208                    hmac_signature_metadata(
1209                        "linear",
1210                        "Linear-Signature",
1211                        None,
1212                        Some("Linear-Delivery"),
1213                        Some(75),
1214                        "hex",
1215                    ),
1216                    vec![
1217                        required_secret("signing_secret", "linear"),
1218                        optional_secret("access_token", "linear"),
1219                    ],
1220                    ProviderRuntimeMetadata::Builtin {
1221                        connector: "linear".to_string(),
1222                        default_signature_variant: Some("linear".to_string()),
1223                    },
1224                );
1225                metadata.outbound_methods = vec![
1226                    ProviderOutboundMethod {
1227                        name: "list_issues".to_string(),
1228                    },
1229                    ProviderOutboundMethod {
1230                        name: "update_issue".to_string(),
1231                    },
1232                    ProviderOutboundMethod {
1233                        name: "create_comment".to_string(),
1234                    },
1235                    ProviderOutboundMethod {
1236                        name: "search".to_string(),
1237                    },
1238                    ProviderOutboundMethod {
1239                        name: "graphql".to_string(),
1240                    },
1241                ];
1242                metadata
1243            },
1244            normalize: linear_payload,
1245        }),
1246        Arc::new(BuiltinProviderSchema {
1247            provider_id: "notion",
1248            harn_schema_name: "NotionEventPayload",
1249            metadata: {
1250                let mut metadata = provider_metadata_entry(
1251                    "notion",
1252                    &["webhook", "poll"],
1253                    "NotionEventPayload",
1254                    &[],
1255                    hmac_signature_metadata(
1256                        "notion",
1257                        "X-Notion-Signature",
1258                        None,
1259                        None,
1260                        None,
1261                        "hex",
1262                    ),
1263                    vec![required_secret("verification_token", "notion")],
1264                    ProviderRuntimeMetadata::Builtin {
1265                        connector: "notion".to_string(),
1266                        default_signature_variant: Some("notion".to_string()),
1267                    },
1268                );
1269                metadata.outbound_methods = vec![
1270                    outbound_method("get_page"),
1271                    outbound_method("update_page"),
1272                    outbound_method("append_blocks"),
1273                    outbound_method("query_database"),
1274                    outbound_method("search"),
1275                    outbound_method("create_comment"),
1276                    outbound_method("api_call"),
1277                ];
1278                metadata
1279            },
1280            normalize: notion_payload,
1281        }),
1282        Arc::new(BuiltinProviderSchema {
1283            provider_id: "cron",
1284            harn_schema_name: "CronEventPayload",
1285            metadata: provider_metadata_entry(
1286                "cron",
1287                &["cron"],
1288                "CronEventPayload",
1289                &[],
1290                SignatureVerificationMetadata::None,
1291                Vec::new(),
1292                ProviderRuntimeMetadata::Builtin {
1293                    connector: "cron".to_string(),
1294                    default_signature_variant: None,
1295                },
1296            ),
1297            normalize: cron_payload,
1298        }),
1299        Arc::new(BuiltinProviderSchema {
1300            provider_id: "webhook",
1301            harn_schema_name: "GenericWebhookPayload",
1302            metadata: provider_metadata_entry(
1303                "webhook",
1304                &["webhook"],
1305                "GenericWebhookPayload",
1306                &[],
1307                hmac_signature_metadata(
1308                    "standard",
1309                    "webhook-signature",
1310                    Some("webhook-timestamp"),
1311                    Some("webhook-id"),
1312                    Some(300),
1313                    "base64",
1314                ),
1315                vec![required_secret("signing_secret", "webhook")],
1316                ProviderRuntimeMetadata::Builtin {
1317                    connector: "webhook".to_string(),
1318                    default_signature_variant: Some("standard".to_string()),
1319                },
1320            ),
1321            normalize: webhook_payload,
1322        }),
1323        Arc::new(BuiltinProviderSchema {
1324            provider_id: "a2a-push",
1325            harn_schema_name: "A2aPushPayload",
1326            metadata: provider_metadata_entry(
1327                "a2a-push",
1328                &["a2a-push"],
1329                "A2aPushPayload",
1330                &[],
1331                SignatureVerificationMetadata::None,
1332                Vec::new(),
1333                ProviderRuntimeMetadata::Builtin {
1334                    connector: "a2a-push".to_string(),
1335                    default_signature_variant: None,
1336                },
1337            ),
1338            normalize: a2a_push_payload,
1339        }),
1340        Arc::new(stream_provider_schema("kafka", kafka_payload)),
1341        Arc::new(stream_provider_schema("nats", nats_payload)),
1342        Arc::new(stream_provider_schema("pulsar", pulsar_payload)),
1343        Arc::new(stream_provider_schema("postgres-cdc", postgres_cdc_payload)),
1344        Arc::new(stream_provider_schema("email", email_payload)),
1345        Arc::new(stream_provider_schema("websocket", websocket_payload)),
1346    ]
1347}
1348
1349fn stream_provider_schema(
1350    provider_id: &'static str,
1351    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1352) -> BuiltinProviderSchema {
1353    BuiltinProviderSchema {
1354        provider_id,
1355        harn_schema_name: "StreamEventPayload",
1356        metadata: provider_metadata_entry(
1357            provider_id,
1358            &["stream"],
1359            "StreamEventPayload",
1360            &[],
1361            SignatureVerificationMetadata::None,
1362            Vec::new(),
1363            ProviderRuntimeMetadata::Builtin {
1364                connector: "stream".to_string(),
1365                default_signature_variant: None,
1366            },
1367        ),
1368        normalize,
1369    }
1370}
1371
1372fn github_payload(
1373    kind: &str,
1374    headers: &BTreeMap<String, String>,
1375    raw: JsonValue,
1376) -> ProviderPayload {
1377    let common = GitHubEventCommon {
1378        event: kind.to_string(),
1379        action: raw
1380            .get("action")
1381            .and_then(JsonValue::as_str)
1382            .map(ToString::to_string),
1383        delivery_id: headers.get("X-GitHub-Delivery").cloned(),
1384        installation_id: raw
1385            .get("installation")
1386            .and_then(|value| value.get("id"))
1387            .and_then(JsonValue::as_i64),
1388        raw: raw.clone(),
1389    };
1390    let payload = match kind {
1391        "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1392            common,
1393            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1394        }),
1395        "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1396            common,
1397            pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1398        }),
1399        "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1400            common,
1401            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1402            comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1403        }),
1404        "pull_request_review" => {
1405            GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1406                common,
1407                pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1408                review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1409            })
1410        }
1411        "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1412            common,
1413            commits: raw
1414                .get("commits")
1415                .and_then(JsonValue::as_array)
1416                .cloned()
1417                .unwrap_or_default(),
1418            distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1419        }),
1420        "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1421            common,
1422            workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1423        }),
1424        "deployment_status" => {
1425            GitHubEventPayload::DeploymentStatus(GitHubDeploymentStatusEventPayload {
1426                common,
1427                deployment_status: raw
1428                    .get("deployment_status")
1429                    .cloned()
1430                    .unwrap_or(JsonValue::Null),
1431                deployment: raw.get("deployment").cloned().unwrap_or(JsonValue::Null),
1432            })
1433        }
1434        "check_run" => GitHubEventPayload::CheckRun(GitHubCheckRunEventPayload {
1435            common,
1436            check_run: raw.get("check_run").cloned().unwrap_or(JsonValue::Null),
1437        }),
1438        _ => GitHubEventPayload::Other(common),
1439    };
1440    ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1441}
1442
1443fn slack_payload(
1444    kind: &str,
1445    _headers: &BTreeMap<String, String>,
1446    raw: JsonValue,
1447) -> ProviderPayload {
1448    let event = raw.get("event");
1449    let common = SlackEventCommon {
1450        event: kind.to_string(),
1451        event_id: raw
1452            .get("event_id")
1453            .and_then(JsonValue::as_str)
1454            .map(ToString::to_string),
1455        api_app_id: raw
1456            .get("api_app_id")
1457            .and_then(JsonValue::as_str)
1458            .map(ToString::to_string),
1459        team_id: raw
1460            .get("team_id")
1461            .and_then(JsonValue::as_str)
1462            .map(ToString::to_string),
1463        channel_id: slack_channel_id(event),
1464        user_id: slack_user_id(event),
1465        event_ts: event
1466            .and_then(|value| value.get("event_ts"))
1467            .and_then(JsonValue::as_str)
1468            .map(ToString::to_string),
1469        raw: raw.clone(),
1470    };
1471    let payload = match kind {
1472        kind if kind == "message" || kind.starts_with("message.") => {
1473            SlackEventPayload::Message(SlackMessageEventPayload {
1474                subtype: event
1475                    .and_then(|value| value.get("subtype"))
1476                    .and_then(JsonValue::as_str)
1477                    .map(ToString::to_string),
1478                channel_type: event
1479                    .and_then(|value| value.get("channel_type"))
1480                    .and_then(JsonValue::as_str)
1481                    .map(ToString::to_string),
1482                channel: event
1483                    .and_then(|value| value.get("channel"))
1484                    .and_then(JsonValue::as_str)
1485                    .map(ToString::to_string),
1486                user: event
1487                    .and_then(|value| value.get("user"))
1488                    .and_then(JsonValue::as_str)
1489                    .map(ToString::to_string),
1490                text: event
1491                    .and_then(|value| value.get("text"))
1492                    .and_then(JsonValue::as_str)
1493                    .map(ToString::to_string),
1494                ts: event
1495                    .and_then(|value| value.get("ts"))
1496                    .and_then(JsonValue::as_str)
1497                    .map(ToString::to_string),
1498                thread_ts: event
1499                    .and_then(|value| value.get("thread_ts"))
1500                    .and_then(JsonValue::as_str)
1501                    .map(ToString::to_string),
1502                common,
1503            })
1504        }
1505        "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1506            channel: event
1507                .and_then(|value| value.get("channel"))
1508                .and_then(JsonValue::as_str)
1509                .map(ToString::to_string),
1510            user: event
1511                .and_then(|value| value.get("user"))
1512                .and_then(JsonValue::as_str)
1513                .map(ToString::to_string),
1514            text: event
1515                .and_then(|value| value.get("text"))
1516                .and_then(JsonValue::as_str)
1517                .map(ToString::to_string),
1518            ts: event
1519                .and_then(|value| value.get("ts"))
1520                .and_then(JsonValue::as_str)
1521                .map(ToString::to_string),
1522            thread_ts: event
1523                .and_then(|value| value.get("thread_ts"))
1524                .and_then(JsonValue::as_str)
1525                .map(ToString::to_string),
1526            common,
1527        }),
1528        "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1529            reaction: event
1530                .and_then(|value| value.get("reaction"))
1531                .and_then(JsonValue::as_str)
1532                .map(ToString::to_string),
1533            item_user: event
1534                .and_then(|value| value.get("item_user"))
1535                .and_then(JsonValue::as_str)
1536                .map(ToString::to_string),
1537            item: event
1538                .and_then(|value| value.get("item"))
1539                .cloned()
1540                .unwrap_or(JsonValue::Null),
1541            common,
1542        }),
1543        "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1544            user: event
1545                .and_then(|value| value.get("user"))
1546                .and_then(JsonValue::as_str)
1547                .map(ToString::to_string),
1548            channel: event
1549                .and_then(|value| value.get("channel"))
1550                .and_then(JsonValue::as_str)
1551                .map(ToString::to_string),
1552            tab: event
1553                .and_then(|value| value.get("tab"))
1554                .and_then(JsonValue::as_str)
1555                .map(ToString::to_string),
1556            view: event
1557                .and_then(|value| value.get("view"))
1558                .cloned()
1559                .unwrap_or(JsonValue::Null),
1560            common,
1561        }),
1562        "assistant_thread_started" => {
1563            let assistant_thread = event
1564                .and_then(|value| value.get("assistant_thread"))
1565                .cloned()
1566                .unwrap_or(JsonValue::Null);
1567            SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1568                thread_ts: assistant_thread
1569                    .get("thread_ts")
1570                    .and_then(JsonValue::as_str)
1571                    .map(ToString::to_string),
1572                context: assistant_thread
1573                    .get("context")
1574                    .cloned()
1575                    .unwrap_or(JsonValue::Null),
1576                assistant_thread,
1577                common,
1578            })
1579        }
1580        _ => SlackEventPayload::Other(common),
1581    };
1582    ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1583}
1584
1585fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1586    event
1587        .and_then(|value| value.get("channel"))
1588        .and_then(JsonValue::as_str)
1589        .map(ToString::to_string)
1590        .or_else(|| {
1591            event
1592                .and_then(|value| value.get("item"))
1593                .and_then(|value| value.get("channel"))
1594                .and_then(JsonValue::as_str)
1595                .map(ToString::to_string)
1596        })
1597        .or_else(|| {
1598            event
1599                .and_then(|value| value.get("channel"))
1600                .and_then(|value| value.get("id"))
1601                .and_then(JsonValue::as_str)
1602                .map(ToString::to_string)
1603        })
1604        .or_else(|| {
1605            event
1606                .and_then(|value| value.get("assistant_thread"))
1607                .and_then(|value| value.get("channel_id"))
1608                .and_then(JsonValue::as_str)
1609                .map(ToString::to_string)
1610        })
1611}
1612
1613fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1614    event
1615        .and_then(|value| value.get("user"))
1616        .and_then(JsonValue::as_str)
1617        .map(ToString::to_string)
1618        .or_else(|| {
1619            event
1620                .and_then(|value| value.get("user"))
1621                .and_then(|value| value.get("id"))
1622                .and_then(JsonValue::as_str)
1623                .map(ToString::to_string)
1624        })
1625        .or_else(|| {
1626            event
1627                .and_then(|value| value.get("item_user"))
1628                .and_then(JsonValue::as_str)
1629                .map(ToString::to_string)
1630        })
1631        .or_else(|| {
1632            event
1633                .and_then(|value| value.get("assistant_thread"))
1634                .and_then(|value| value.get("user_id"))
1635                .and_then(JsonValue::as_str)
1636                .map(ToString::to_string)
1637        })
1638}
1639
1640fn linear_payload(
1641    _kind: &str,
1642    headers: &BTreeMap<String, String>,
1643    raw: JsonValue,
1644) -> ProviderPayload {
1645    let common = linear_event_common(headers, &raw);
1646    let event = common.event.clone();
1647    let payload = match event.as_str() {
1648        "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1649            common,
1650            issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1651            changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1652        }),
1653        "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1654            common,
1655            comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1656        }),
1657        "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1658            common,
1659            label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1660        }),
1661        "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1662            common,
1663            project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1664        }),
1665        "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1666            common,
1667            cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1668        }),
1669        "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
1670            common,
1671            customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1672        }),
1673        "customer_request" => {
1674            LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
1675                common,
1676                customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1677            })
1678        }
1679        _ => LinearEventPayload::Other(common),
1680    };
1681    ProviderPayload::Known(KnownProviderPayload::Linear(payload))
1682}
1683
1684fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
1685    LinearEventCommon {
1686        event: linear_event_name(
1687            raw.get("type")
1688                .and_then(JsonValue::as_str)
1689                .or_else(|| headers.get("Linear-Event").map(String::as_str)),
1690        ),
1691        action: raw
1692            .get("action")
1693            .and_then(JsonValue::as_str)
1694            .map(ToString::to_string),
1695        delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
1696        organization_id: raw
1697            .get("organizationId")
1698            .and_then(JsonValue::as_str)
1699            .map(ToString::to_string),
1700        webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
1701        webhook_id: raw
1702            .get("webhookId")
1703            .and_then(JsonValue::as_str)
1704            .map(ToString::to_string),
1705        url: raw
1706            .get("url")
1707            .and_then(JsonValue::as_str)
1708            .map(ToString::to_string),
1709        created_at: raw
1710            .get("createdAt")
1711            .and_then(JsonValue::as_str)
1712            .map(ToString::to_string),
1713        actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
1714        raw: raw.clone(),
1715    }
1716}
1717
/// Canonicalises a Linear event type into the snake_case name used for dispatch.
///
/// Matching is ASCII case-insensitive. Known aliases collapse to one name (for
/// example `IssueComment` → `comment`, `ProjectUpdate` → `project`); unknown
/// non-empty names are returned lower-cased; `None` or an empty string yields
/// `"other"`.
fn linear_event_name(raw_type: Option<&str>) -> String {
    let lowered = raw_type.unwrap_or("").to_ascii_lowercase();
    let canonical = match lowered.as_str() {
        "issue" => "issue",
        "comment" | "issuecomment" | "issue_comment" => "comment",
        "issuelabel" | "issue_label" => "issue_label",
        "project" | "projectupdate" | "project_update" => "project",
        "cycle" => "cycle",
        "customer" => "customer",
        "customerrequest" | "customer_request" => "customer_request",
        "" => "other",
        unrecognised => unrecognised,
    };
    canonical.to_string()
}
1731
1732fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
1733    let Some(JsonValue::Object(fields)) = updated_from else {
1734        return Vec::new();
1735    };
1736    let mut changes = Vec::new();
1737    for (field, previous) in fields {
1738        let change = match field.as_str() {
1739            "title" => LinearIssueChange::Title {
1740                previous: previous.as_str().map(ToString::to_string),
1741            },
1742            "description" => LinearIssueChange::Description {
1743                previous: previous.as_str().map(ToString::to_string),
1744            },
1745            "priority" => LinearIssueChange::Priority {
1746                previous: parse_json_i64ish(previous),
1747            },
1748            "estimate" => LinearIssueChange::Estimate {
1749                previous: parse_json_i64ish(previous),
1750            },
1751            "stateId" => LinearIssueChange::StateId {
1752                previous: previous.as_str().map(ToString::to_string),
1753            },
1754            "teamId" => LinearIssueChange::TeamId {
1755                previous: previous.as_str().map(ToString::to_string),
1756            },
1757            "assigneeId" => LinearIssueChange::AssigneeId {
1758                previous: previous.as_str().map(ToString::to_string),
1759            },
1760            "projectId" => LinearIssueChange::ProjectId {
1761                previous: previous.as_str().map(ToString::to_string),
1762            },
1763            "cycleId" => LinearIssueChange::CycleId {
1764                previous: previous.as_str().map(ToString::to_string),
1765            },
1766            "dueDate" => LinearIssueChange::DueDate {
1767                previous: previous.as_str().map(ToString::to_string),
1768            },
1769            "parentId" => LinearIssueChange::ParentId {
1770                previous: previous.as_str().map(ToString::to_string),
1771            },
1772            "sortOrder" => LinearIssueChange::SortOrder {
1773                previous: previous.as_f64(),
1774            },
1775            "labelIds" => LinearIssueChange::LabelIds {
1776                previous: parse_string_array(previous),
1777            },
1778            "completedAt" => LinearIssueChange::CompletedAt {
1779                previous: previous.as_str().map(ToString::to_string),
1780            },
1781            _ => LinearIssueChange::Other {
1782                field: field.clone(),
1783                previous: previous.clone(),
1784            },
1785        };
1786        changes.push(change);
1787    }
1788    changes
1789}
1790
1791fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
1792    value
1793        .as_i64()
1794        .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
1795        .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
1796}
1797
1798fn parse_string_array(value: &JsonValue) -> Vec<String> {
1799    let Some(array) = value.as_array() else {
1800        return Vec::new();
1801    };
1802    array
1803        .iter()
1804        .filter_map(|entry| {
1805            entry.as_str().map(ToString::to_string).or_else(|| {
1806                entry
1807                    .get("id")
1808                    .and_then(JsonValue::as_str)
1809                    .map(ToString::to_string)
1810            })
1811        })
1812        .collect()
1813}
1814
/// Looks up a header by name, ignoring ASCII case.
///
/// Returns the value of the first key (in the map's iteration order) whose
/// name matches `name` case-insensitively, or `None` when no key matches.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (candidate, value) in headers {
        if candidate.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
1821
1822fn notion_payload(
1823    kind: &str,
1824    headers: &BTreeMap<String, String>,
1825    raw: JsonValue,
1826) -> ProviderPayload {
1827    let workspace_id = raw
1828        .get("workspace_id")
1829        .and_then(JsonValue::as_str)
1830        .map(ToString::to_string);
1831    ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
1832        event: kind.to_string(),
1833        workspace_id,
1834        request_id: headers
1835            .get("request-id")
1836            .cloned()
1837            .or_else(|| headers.get("x-request-id").cloned()),
1838        subscription_id: raw
1839            .get("subscription_id")
1840            .and_then(JsonValue::as_str)
1841            .map(ToString::to_string),
1842        integration_id: raw
1843            .get("integration_id")
1844            .and_then(JsonValue::as_str)
1845            .map(ToString::to_string),
1846        attempt_number: raw
1847            .get("attempt_number")
1848            .and_then(JsonValue::as_u64)
1849            .and_then(|value| u32::try_from(value).ok()),
1850        entity_id: raw
1851            .get("entity")
1852            .and_then(|value| value.get("id"))
1853            .and_then(JsonValue::as_str)
1854            .map(ToString::to_string),
1855        entity_type: raw
1856            .get("entity")
1857            .and_then(|value| value.get("type"))
1858            .and_then(JsonValue::as_str)
1859            .map(ToString::to_string),
1860        api_version: raw
1861            .get("api_version")
1862            .and_then(JsonValue::as_str)
1863            .map(ToString::to_string),
1864        verification_token: raw
1865            .get("verification_token")
1866            .and_then(JsonValue::as_str)
1867            .map(ToString::to_string),
1868        polled: None,
1869        raw,
1870    })))
1871}
1872
1873fn cron_payload(
1874    _kind: &str,
1875    _headers: &BTreeMap<String, String>,
1876    raw: JsonValue,
1877) -> ProviderPayload {
1878    let cron_id = raw
1879        .get("cron_id")
1880        .and_then(JsonValue::as_str)
1881        .map(ToString::to_string);
1882    let schedule = raw
1883        .get("schedule")
1884        .and_then(JsonValue::as_str)
1885        .map(ToString::to_string);
1886    let tick_at = raw
1887        .get("tick_at")
1888        .and_then(JsonValue::as_str)
1889        .and_then(parse_rfc3339)
1890        .unwrap_or_else(OffsetDateTime::now_utc);
1891    ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
1892        cron_id,
1893        schedule,
1894        tick_at,
1895        raw,
1896    }))
1897}
1898
1899fn webhook_payload(
1900    _kind: &str,
1901    headers: &BTreeMap<String, String>,
1902    raw: JsonValue,
1903) -> ProviderPayload {
1904    ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1905        source: headers.get("X-Webhook-Source").cloned(),
1906        content_type: headers.get("Content-Type").cloned(),
1907        raw,
1908    }))
1909}
1910
1911fn a2a_push_payload(
1912    _kind: &str,
1913    _headers: &BTreeMap<String, String>,
1914    raw: JsonValue,
1915) -> ProviderPayload {
1916    let task_id = raw
1917        .get("task_id")
1918        .and_then(JsonValue::as_str)
1919        .map(ToString::to_string);
1920    let sender = raw
1921        .get("sender")
1922        .and_then(JsonValue::as_str)
1923        .map(ToString::to_string);
1924    let task_state = raw
1925        .pointer("/status/state")
1926        .or_else(|| raw.pointer("/statusUpdate/status/state"))
1927        .and_then(JsonValue::as_str)
1928        .map(|state| match state {
1929            "cancelled" => "canceled".to_string(),
1930            other => other.to_string(),
1931        });
1932    let artifact = raw
1933        .pointer("/artifactUpdate/artifact")
1934        .or_else(|| raw.get("artifact"))
1935        .cloned();
1936    let kind = task_state
1937        .as_deref()
1938        .map(|state| format!("a2a.task.{state}"))
1939        .unwrap_or_else(|| "a2a.task.update".to_string());
1940    ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
1941        task_id,
1942        task_state,
1943        artifact,
1944        sender,
1945        raw,
1946        kind,
1947    }))
1948}
1949
1950fn kafka_payload(
1951    kind: &str,
1952    headers: &BTreeMap<String, String>,
1953    raw: JsonValue,
1954) -> ProviderPayload {
1955    ProviderPayload::Known(KnownProviderPayload::Kafka(stream_payload(
1956        kind, headers, raw,
1957    )))
1958}
1959
1960fn nats_payload(kind: &str, headers: &BTreeMap<String, String>, raw: JsonValue) -> ProviderPayload {
1961    ProviderPayload::Known(KnownProviderPayload::Nats(stream_payload(
1962        kind, headers, raw,
1963    )))
1964}
1965
1966fn pulsar_payload(
1967    kind: &str,
1968    headers: &BTreeMap<String, String>,
1969    raw: JsonValue,
1970) -> ProviderPayload {
1971    ProviderPayload::Known(KnownProviderPayload::Pulsar(stream_payload(
1972        kind, headers, raw,
1973    )))
1974}
1975
1976fn postgres_cdc_payload(
1977    kind: &str,
1978    headers: &BTreeMap<String, String>,
1979    raw: JsonValue,
1980) -> ProviderPayload {
1981    ProviderPayload::Known(KnownProviderPayload::PostgresCdc(stream_payload(
1982        kind, headers, raw,
1983    )))
1984}
1985
1986fn email_payload(
1987    kind: &str,
1988    headers: &BTreeMap<String, String>,
1989    raw: JsonValue,
1990) -> ProviderPayload {
1991    ProviderPayload::Known(KnownProviderPayload::Email(stream_payload(
1992        kind, headers, raw,
1993    )))
1994}
1995
1996fn websocket_payload(
1997    kind: &str,
1998    headers: &BTreeMap<String, String>,
1999    raw: JsonValue,
2000) -> ProviderPayload {
2001    ProviderPayload::Known(KnownProviderPayload::Websocket(stream_payload(
2002        kind, headers, raw,
2003    )))
2004}
2005
2006fn stream_payload(
2007    kind: &str,
2008    headers: &BTreeMap<String, String>,
2009    raw: JsonValue,
2010) -> StreamEventPayload {
2011    StreamEventPayload {
2012        event: kind.to_string(),
2013        source: json_stringish(&raw, &["source", "connector", "origin"]),
2014        stream: json_stringish(
2015            &raw,
2016            &["stream", "topic", "subject", "channel", "mailbox", "slot"],
2017        ),
2018        partition: json_stringish(&raw, &["partition", "shard", "consumer"]),
2019        offset: json_stringish(&raw, &["offset", "sequence", "lsn", "message_id"]),
2020        key: json_stringish(&raw, &["key", "message_key", "id", "event_id"]),
2021        timestamp: json_stringish(&raw, &["timestamp", "occurred_at", "received_at", "ts"]),
2022        headers: headers.clone(),
2023        raw,
2024    }
2025}
2026
2027fn json_stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
2028    fields.iter().find_map(|field| {
2029        let value = raw.get(*field)?;
2030        value
2031            .as_str()
2032            .map(ToString::to_string)
2033            .or_else(|| parse_json_i64ish(value).map(|number| number.to_string()))
2034            .or_else(|| value.as_u64().map(|number| number.to_string()))
2035    })
2036}
2037
2038fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
2039    OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
2040}
2041
#[cfg(test)]
mod tests {
    use super::*;

    /// Header fixture mixing credential-bearing headers with plain delivery metadata.
    fn sample_headers() -> BTreeMap<String, String> {
        let pairs = [
            ("Authorization", "Bearer secret"),
            ("Cookie", "session=abc"),
            ("User-Agent", "GitHub-Hookshot/123"),
            ("X-GitHub-Delivery", "delivery-123"),
            ("X-GitHub-Event", "issues"),
            ("X-Webhook-Token", "token"),
        ];
        let mut headers = BTreeMap::new();
        for (name, value) in pairs {
            headers.insert(name.to_string(), value.to_string());
        }
        headers
    }

    #[test]
    fn default_redaction_policy_keeps_safe_headers() {
        let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());

        // Harmless metadata passes through untouched.
        for (name, expected) in [
            ("User-Agent", "GitHub-Hookshot/123"),
            ("X-GitHub-Delivery", "delivery-123"),
        ] {
            assert_eq!(redacted.get(name).unwrap(), expected);
        }
        // Credential-bearing headers are replaced by the redaction marker.
        for name in ["Authorization", "Cookie", "X-Webhook-Token"] {
            assert_eq!(redacted.get(name).unwrap(), REDACTED_HEADER_VALUE);
        }
    }

    #[test]
    fn provider_catalog_rejects_duplicates() {
        // Both registrations use an identical schema, so build it in one place.
        let github_schema = || {
            Arc::new(BuiltinProviderSchema {
                provider_id: "github",
                harn_schema_name: "GitHubEventPayload",
                metadata: provider_metadata_entry(
                    "github",
                    &["webhook"],
                    "GitHubEventPayload",
                    &[],
                    SignatureVerificationMetadata::None,
                    Vec::new(),
                    ProviderRuntimeMetadata::Placeholder,
                ),
                normalize: github_payload,
            })
        };

        let mut catalog = ProviderCatalog::default();
        catalog.register(github_schema()).unwrap();
        let error = catalog.register(github_schema()).unwrap_err();

        assert_eq!(
            error,
            ProviderCatalogError::DuplicateProvider("github".to_string())
        );
    }

    #[test]
    fn registered_provider_metadata_marks_builtin_connectors() {
        let entries = registered_provider_metadata();
        let builtin: Vec<&ProviderMetadata> = entries
            .iter()
            .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
            .collect();

        assert_eq!(builtin.len(), 13);
        for expected in [
            "a2a-push", "cron", "github", "linear", "notion", "slack", "webhook",
        ] {
            assert!(builtin.iter().any(|entry| entry.provider == expected));
        }

        // Stream providers share one connector and one schema.
        let kafka = entries
            .iter()
            .find(|entry| entry.provider == "kafka")
            .expect("kafka stream provider");
        assert_eq!(kafka.kinds, vec!["stream".to_string()]);
        assert_eq!(kafka.schema_name, "StreamEventPayload");
        assert!(matches!(
            kafka.runtime,
            ProviderRuntimeMetadata::Builtin {
                ref connector,
                default_signature_variant: None
            } if connector == "stream"
        ));
    }

    #[test]
    fn trigger_event_round_trip_is_stable() {
        let provider = ProviderId::from("github");
        let raw_headers = sample_headers();
        let headers = redact_headers(&raw_headers, &HeaderRedactionPolicy::default());
        let body = serde_json::json!({
            "action": "opened",
            "installation": {"id": 42},
            "issue": {"number": 99}
        });
        let provider_payload =
            ProviderPayload::normalize(&provider, "issues", &raw_headers, body).unwrap();

        let event = TriggerEvent {
            id: TriggerEventId("trigger_evt_fixed".to_string()),
            provider,
            kind: "issues".to_string(),
            received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
            occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
            dedupe_key: "delivery-123".to_string(),
            trace_id: TraceId("trace_fixed".to_string()),
            tenant_id: Some(TenantId("tenant_1".to_string())),
            headers,
            provider_payload,
            signature_status: SignatureStatus::Verified,
            dedupe_claimed: false,
            batch: None,
            // Includes bytes that are not valid UTF-8 to exercise the base64 encoding.
            raw_body: Some(vec![0, 159, 255, 10]),
        };

        let first_pass = serde_json::to_value(&event).unwrap();
        assert_eq!(first_pass["raw_body"], serde_json::json!("AJ//Cg=="));

        let decoded: TriggerEvent = serde_json::from_value(first_pass.clone()).unwrap();
        let second_pass = serde_json::to_value(&decoded).unwrap();
        assert_eq!(decoded, event);
        assert_eq!(first_pass, second_pass);
    }

    #[test]
    fn unknown_provider_errors() {
        let provider = ProviderId::from("custom-provider");
        let outcome = ProviderPayload::normalize(
            &provider,
            "thing.happened",
            &BTreeMap::new(),
            serde_json::json!({"ok": true}),
        );

        assert_eq!(
            outcome.unwrap_err(),
            ProviderCatalogError::UnknownProvider("custom-provider".to_string())
        );
    }

    #[test]
    fn provider_normalizes_stream_payloads() {
        let headers = BTreeMap::from([("x-source".to_string(), "feed".to_string())]);
        let body = serde_json::json!({
            "topic": "quotes",
            "partition": 7,
            "offset": "42",
            "key": "AAPL",
            "timestamp": "2026-04-21T12:00:00Z"
        });
        let normalized =
            ProviderPayload::normalize(&ProviderId::from("kafka"), "quote.tick", &headers, body)
                .expect("stream payload");

        let ProviderPayload::Known(KnownProviderPayload::Kafka(stream)) = normalized else {
            panic!("expected kafka stream payload")
        };
        assert_eq!(stream.event, "quote.tick");
        assert_eq!(stream.stream.as_deref(), Some("quotes"));
        assert_eq!(stream.partition.as_deref(), Some("7"));
        assert_eq!(stream.offset.as_deref(), Some("42"));
        assert_eq!(stream.key.as_deref(), Some("AAPL"));
        assert_eq!(stream.timestamp.as_deref(), Some("2026-04-21T12:00:00Z"));
    }
}