Skip to main content

harn_vm/triggers/
event.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
/// Placeholder text substituted for the value of a header that is redacted.
const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13fn serialize_optional_bytes_b64<S>(
14    value: &Option<Vec<u8>>,
15    serializer: S,
16) -> Result<S::Ok, S::Error>
17where
18    S: Serializer,
19{
20    use base64::Engine;
21
22    match value {
23        Some(bytes) => {
24            serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
25        }
26        None => serializer.serialize_none(),
27    }
28}
29
30fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
31where
32    D: Deserializer<'de>,
33{
34    use base64::Engine;
35
36    let encoded = Option::<String>::deserialize(deserializer)?;
37    encoded
38        .map(|text| {
39            base64::engine::general_purpose::STANDARD
40                .decode(text.as_bytes())
41                .map_err(serde::de::Error::custom)
42        })
43        .transpose()
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
47#[serde(transparent)]
48pub struct TriggerEventId(pub String);
49
50impl TriggerEventId {
51    pub fn new() -> Self {
52        Self(format!("trigger_evt_{}", Uuid::now_v7()))
53    }
54}
55
56impl Default for TriggerEventId {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
63#[serde(transparent)]
64pub struct ProviderId(pub String);
65
66impl ProviderId {
67    pub fn new(value: impl Into<String>) -> Self {
68        Self(value.into())
69    }
70
71    pub fn as_str(&self) -> &str {
72        self.0.as_str()
73    }
74}
75
76impl From<&str> for ProviderId {
77    fn from(value: &str) -> Self {
78        Self::new(value)
79    }
80}
81
82impl From<String> for ProviderId {
83    fn from(value: String) -> Self {
84        Self::new(value)
85    }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
89#[serde(transparent)]
90pub struct TraceId(pub String);
91
92impl TraceId {
93    pub fn new() -> Self {
94        Self(format!("trace_{}", Uuid::now_v7()))
95    }
96}
97
98impl Default for TraceId {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
105#[serde(transparent)]
106pub struct TenantId(pub String);
107
108impl TenantId {
109    pub fn new(value: impl Into<String>) -> Self {
110        Self(value.into())
111    }
112}
113
114#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(tag = "state", rename_all = "snake_case")]
116pub enum SignatureStatus {
117    Verified,
118    Unsigned,
119    Failed { reason: String },
120}
121
122#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
123pub struct GitHubEventCommon {
124    pub event: String,
125    pub action: Option<String>,
126    pub delivery_id: Option<String>,
127    pub installation_id: Option<i64>,
128    pub raw: JsonValue,
129}
130
131#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
132pub struct GitHubIssuesEventPayload {
133    #[serde(flatten)]
134    pub common: GitHubEventCommon,
135    pub issue: JsonValue,
136}
137
138#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
139pub struct GitHubPullRequestEventPayload {
140    #[serde(flatten)]
141    pub common: GitHubEventCommon,
142    pub pull_request: JsonValue,
143}
144
145#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
146pub struct GitHubIssueCommentEventPayload {
147    #[serde(flatten)]
148    pub common: GitHubEventCommon,
149    pub issue: JsonValue,
150    pub comment: JsonValue,
151}
152
153#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
154pub struct GitHubPullRequestReviewEventPayload {
155    #[serde(flatten)]
156    pub common: GitHubEventCommon,
157    pub pull_request: JsonValue,
158    pub review: JsonValue,
159}
160
161#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
162pub struct GitHubPushEventPayload {
163    #[serde(flatten)]
164    pub common: GitHubEventCommon,
165    #[serde(default)]
166    pub commits: Vec<JsonValue>,
167    pub distinct_size: Option<i64>,
168}
169
170#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
171pub struct GitHubWorkflowRunEventPayload {
172    #[serde(flatten)]
173    pub common: GitHubEventCommon,
174    pub workflow_run: JsonValue,
175}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178pub struct GitHubDeploymentStatusEventPayload {
179    #[serde(flatten)]
180    pub common: GitHubEventCommon,
181    pub deployment_status: JsonValue,
182    pub deployment: JsonValue,
183}
184
185#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
186pub struct GitHubCheckRunEventPayload {
187    #[serde(flatten)]
188    pub common: GitHubEventCommon,
189    pub check_run: JsonValue,
190}
191
192#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
193#[serde(untagged)]
194pub enum GitHubEventPayload {
195    Issues(GitHubIssuesEventPayload),
196    PullRequest(GitHubPullRequestEventPayload),
197    IssueComment(GitHubIssueCommentEventPayload),
198    PullRequestReview(GitHubPullRequestReviewEventPayload),
199    Push(GitHubPushEventPayload),
200    WorkflowRun(GitHubWorkflowRunEventPayload),
201    DeploymentStatus(GitHubDeploymentStatusEventPayload),
202    CheckRun(GitHubCheckRunEventPayload),
203    Other(GitHubEventCommon),
204}
205
206#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
207pub struct SlackEventCommon {
208    pub event: String,
209    pub event_id: Option<String>,
210    pub api_app_id: Option<String>,
211    pub team_id: Option<String>,
212    pub channel_id: Option<String>,
213    pub user_id: Option<String>,
214    pub event_ts: Option<String>,
215    pub raw: JsonValue,
216}
217
218#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
219pub struct SlackMessageEventPayload {
220    #[serde(flatten)]
221    pub common: SlackEventCommon,
222    pub subtype: Option<String>,
223    pub channel_type: Option<String>,
224    pub channel: Option<String>,
225    pub user: Option<String>,
226    pub text: Option<String>,
227    pub ts: Option<String>,
228    pub thread_ts: Option<String>,
229}
230
231#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
232pub struct SlackAppMentionEventPayload {
233    #[serde(flatten)]
234    pub common: SlackEventCommon,
235    pub channel: Option<String>,
236    pub user: Option<String>,
237    pub text: Option<String>,
238    pub ts: Option<String>,
239    pub thread_ts: Option<String>,
240}
241
242#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
243pub struct SlackReactionAddedEventPayload {
244    #[serde(flatten)]
245    pub common: SlackEventCommon,
246    pub reaction: Option<String>,
247    pub item_user: Option<String>,
248    pub item: JsonValue,
249}
250
251#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
252pub struct SlackAppHomeOpenedEventPayload {
253    #[serde(flatten)]
254    pub common: SlackEventCommon,
255    pub user: Option<String>,
256    pub channel: Option<String>,
257    pub tab: Option<String>,
258    pub view: JsonValue,
259}
260
261#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
262pub struct SlackAssistantThreadStartedEventPayload {
263    #[serde(flatten)]
264    pub common: SlackEventCommon,
265    pub assistant_thread: JsonValue,
266    pub thread_ts: Option<String>,
267    pub context: JsonValue,
268}
269
270#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
271#[serde(untagged)]
272pub enum SlackEventPayload {
273    Message(SlackMessageEventPayload),
274    AppMention(SlackAppMentionEventPayload),
275    ReactionAdded(SlackReactionAddedEventPayload),
276    AppHomeOpened(SlackAppHomeOpenedEventPayload),
277    AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
278    Other(SlackEventCommon),
279}
280
281#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
282pub struct LinearEventCommon {
283    pub event: String,
284    pub action: Option<String>,
285    pub delivery_id: Option<String>,
286    pub organization_id: Option<String>,
287    pub webhook_timestamp: Option<i64>,
288    pub webhook_id: Option<String>,
289    pub url: Option<String>,
290    pub created_at: Option<String>,
291    pub actor: JsonValue,
292    pub raw: JsonValue,
293}
294
295#[derive(Clone, Debug, PartialEq)]
296pub enum LinearIssueChange {
297    Title { previous: Option<String> },
298    Description { previous: Option<String> },
299    Priority { previous: Option<i64> },
300    Estimate { previous: Option<i64> },
301    StateId { previous: Option<String> },
302    TeamId { previous: Option<String> },
303    AssigneeId { previous: Option<String> },
304    ProjectId { previous: Option<String> },
305    CycleId { previous: Option<String> },
306    DueDate { previous: Option<String> },
307    ParentId { previous: Option<String> },
308    SortOrder { previous: Option<f64> },
309    LabelIds { previous: Vec<String> },
310    CompletedAt { previous: Option<String> },
311    Other { field: String, previous: JsonValue },
312}
313
314impl Serialize for LinearIssueChange {
315    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
316    where
317        S: Serializer,
318    {
319        let value = match self {
320            Self::Title { previous } => {
321                serde_json::json!({ "field_name": "title", "previous": previous })
322            }
323            Self::Description { previous } => {
324                serde_json::json!({ "field_name": "description", "previous": previous })
325            }
326            Self::Priority { previous } => {
327                serde_json::json!({ "field_name": "priority", "previous": previous })
328            }
329            Self::Estimate { previous } => {
330                serde_json::json!({ "field_name": "estimate", "previous": previous })
331            }
332            Self::StateId { previous } => {
333                serde_json::json!({ "field_name": "state_id", "previous": previous })
334            }
335            Self::TeamId { previous } => {
336                serde_json::json!({ "field_name": "team_id", "previous": previous })
337            }
338            Self::AssigneeId { previous } => {
339                serde_json::json!({ "field_name": "assignee_id", "previous": previous })
340            }
341            Self::ProjectId { previous } => {
342                serde_json::json!({ "field_name": "project_id", "previous": previous })
343            }
344            Self::CycleId { previous } => {
345                serde_json::json!({ "field_name": "cycle_id", "previous": previous })
346            }
347            Self::DueDate { previous } => {
348                serde_json::json!({ "field_name": "due_date", "previous": previous })
349            }
350            Self::ParentId { previous } => {
351                serde_json::json!({ "field_name": "parent_id", "previous": previous })
352            }
353            Self::SortOrder { previous } => {
354                serde_json::json!({ "field_name": "sort_order", "previous": previous })
355            }
356            Self::LabelIds { previous } => {
357                serde_json::json!({ "field_name": "label_ids", "previous": previous })
358            }
359            Self::CompletedAt { previous } => {
360                serde_json::json!({ "field_name": "completed_at", "previous": previous })
361            }
362            Self::Other { field, previous } => {
363                serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
364            }
365        };
366        value.serialize(serializer)
367    }
368}
369
370impl<'de> Deserialize<'de> for LinearIssueChange {
371    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
372    where
373        D: Deserializer<'de>,
374    {
375        let value = JsonValue::deserialize(deserializer)?;
376        let field_name = value
377            .get("field_name")
378            .and_then(JsonValue::as_str)
379            .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
380        let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
381        Ok(match field_name {
382            "title" => Self::Title {
383                previous: previous.as_str().map(ToString::to_string),
384            },
385            "description" => Self::Description {
386                previous: previous.as_str().map(ToString::to_string),
387            },
388            "priority" => Self::Priority {
389                previous: parse_json_i64ish(&previous),
390            },
391            "estimate" => Self::Estimate {
392                previous: parse_json_i64ish(&previous),
393            },
394            "state_id" => Self::StateId {
395                previous: previous.as_str().map(ToString::to_string),
396            },
397            "team_id" => Self::TeamId {
398                previous: previous.as_str().map(ToString::to_string),
399            },
400            "assignee_id" => Self::AssigneeId {
401                previous: previous.as_str().map(ToString::to_string),
402            },
403            "project_id" => Self::ProjectId {
404                previous: previous.as_str().map(ToString::to_string),
405            },
406            "cycle_id" => Self::CycleId {
407                previous: previous.as_str().map(ToString::to_string),
408            },
409            "due_date" => Self::DueDate {
410                previous: previous.as_str().map(ToString::to_string),
411            },
412            "parent_id" => Self::ParentId {
413                previous: previous.as_str().map(ToString::to_string),
414            },
415            "sort_order" => Self::SortOrder {
416                previous: previous.as_f64(),
417            },
418            "label_ids" => Self::LabelIds {
419                previous: parse_string_array(&previous),
420            },
421            "completed_at" => Self::CompletedAt {
422                previous: previous.as_str().map(ToString::to_string),
423            },
424            "other" => Self::Other {
425                field: value
426                    .get("field")
427                    .and_then(JsonValue::as_str)
428                    .map(ToString::to_string)
429                    .unwrap_or_else(|| "unknown".to_string()),
430                previous,
431            },
432            other => Self::Other {
433                field: other.to_string(),
434                previous,
435            },
436        })
437    }
438}
439
440#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
441pub struct LinearIssueEventPayload {
442    #[serde(flatten)]
443    pub common: LinearEventCommon,
444    pub issue: JsonValue,
445    #[serde(default)]
446    pub changes: Vec<LinearIssueChange>,
447}
448
449#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
450pub struct LinearIssueCommentEventPayload {
451    #[serde(flatten)]
452    pub common: LinearEventCommon,
453    pub comment: JsonValue,
454}
455
456#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
457pub struct LinearIssueLabelEventPayload {
458    #[serde(flatten)]
459    pub common: LinearEventCommon,
460    pub label: JsonValue,
461}
462
463#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
464pub struct LinearProjectEventPayload {
465    #[serde(flatten)]
466    pub common: LinearEventCommon,
467    pub project: JsonValue,
468}
469
470#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
471pub struct LinearCycleEventPayload {
472    #[serde(flatten)]
473    pub common: LinearEventCommon,
474    pub cycle: JsonValue,
475}
476
477#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
478pub struct LinearCustomerEventPayload {
479    #[serde(flatten)]
480    pub common: LinearEventCommon,
481    pub customer: JsonValue,
482}
483
484#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
485pub struct LinearCustomerRequestEventPayload {
486    #[serde(flatten)]
487    pub common: LinearEventCommon,
488    pub customer_request: JsonValue,
489}
490
491#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
492#[serde(untagged)]
493pub enum LinearEventPayload {
494    Issue(LinearIssueEventPayload),
495    IssueComment(LinearIssueCommentEventPayload),
496    IssueLabel(LinearIssueLabelEventPayload),
497    Project(LinearProjectEventPayload),
498    Cycle(LinearCycleEventPayload),
499    Customer(LinearCustomerEventPayload),
500    CustomerRequest(LinearCustomerRequestEventPayload),
501    Other(LinearEventCommon),
502}
503
504#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
505pub struct NotionPolledChangeEvent {
506    pub resource: String,
507    pub source_id: String,
508    pub entity_id: String,
509    pub high_water_mark: String,
510    pub before: Option<JsonValue>,
511    pub after: JsonValue,
512}
513
514#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
515pub struct NotionEventPayload {
516    pub event: String,
517    pub workspace_id: Option<String>,
518    pub request_id: Option<String>,
519    pub subscription_id: Option<String>,
520    pub integration_id: Option<String>,
521    pub attempt_number: Option<u32>,
522    pub entity_id: Option<String>,
523    pub entity_type: Option<String>,
524    pub api_version: Option<String>,
525    pub verification_token: Option<String>,
526    pub polled: Option<NotionPolledChangeEvent>,
527    pub raw: JsonValue,
528}
529
530#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
531pub struct CronEventPayload {
532    pub cron_id: Option<String>,
533    pub schedule: Option<String>,
534    #[serde(with = "time::serde::rfc3339")]
535    pub tick_at: OffsetDateTime,
536    pub raw: JsonValue,
537}
538
539#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
540pub struct GenericWebhookPayload {
541    pub source: Option<String>,
542    pub content_type: Option<String>,
543    pub raw: JsonValue,
544}
545
546#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
547pub struct A2aPushPayload {
548    pub task_id: Option<String>,
549    pub task_state: Option<String>,
550    pub artifact: Option<JsonValue>,
551    pub sender: Option<String>,
552    pub raw: JsonValue,
553    pub kind: String,
554}
555
556#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
557pub struct StreamEventPayload {
558    pub event: String,
559    pub source: Option<String>,
560    pub stream: Option<String>,
561    pub partition: Option<String>,
562    pub offset: Option<String>,
563    pub key: Option<String>,
564    pub timestamp: Option<String>,
565    #[serde(default)]
566    pub headers: BTreeMap<String, String>,
567    pub raw: JsonValue,
568}
569
570#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
571pub struct ExtensionProviderPayload {
572    pub provider: String,
573    pub schema_name: String,
574    pub raw: JsonValue,
575}
576
577#[allow(clippy::large_enum_variant)]
578#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
579#[serde(untagged)]
580pub enum ProviderPayload {
581    Known(KnownProviderPayload),
582    Extension(ExtensionProviderPayload),
583}
584
585impl ProviderPayload {
586    pub fn provider(&self) -> &str {
587        match self {
588            Self::Known(known) => known.provider(),
589            Self::Extension(payload) => payload.provider.as_str(),
590        }
591    }
592
593    pub fn normalize(
594        provider: &ProviderId,
595        kind: &str,
596        headers: &BTreeMap<String, String>,
597        raw: JsonValue,
598    ) -> Result<Self, ProviderCatalogError> {
599        provider_catalog()
600            .read()
601            .expect("provider catalog poisoned")
602            .normalize(provider, kind, headers, raw)
603    }
604}
605
606#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
607#[serde(tag = "provider")]
608pub enum KnownProviderPayload {
609    #[serde(rename = "github")]
610    GitHub(GitHubEventPayload),
611    #[serde(rename = "slack")]
612    Slack(Box<SlackEventPayload>),
613    #[serde(rename = "linear")]
614    Linear(LinearEventPayload),
615    #[serde(rename = "notion")]
616    Notion(Box<NotionEventPayload>),
617    #[serde(rename = "cron")]
618    Cron(CronEventPayload),
619    #[serde(rename = "webhook")]
620    Webhook(GenericWebhookPayload),
621    #[serde(rename = "a2a-push")]
622    A2aPush(A2aPushPayload),
623    #[serde(rename = "kafka")]
624    Kafka(StreamEventPayload),
625    #[serde(rename = "nats")]
626    Nats(StreamEventPayload),
627    #[serde(rename = "pulsar")]
628    Pulsar(StreamEventPayload),
629    #[serde(rename = "postgres-cdc")]
630    PostgresCdc(StreamEventPayload),
631    #[serde(rename = "email")]
632    Email(StreamEventPayload),
633    #[serde(rename = "websocket")]
634    Websocket(StreamEventPayload),
635}
636
637impl KnownProviderPayload {
638    pub fn provider(&self) -> &str {
639        match self {
640            Self::GitHub(_) => "github",
641            Self::Slack(_) => "slack",
642            Self::Linear(_) => "linear",
643            Self::Notion(_) => "notion",
644            Self::Cron(_) => "cron",
645            Self::Webhook(_) => "webhook",
646            Self::A2aPush(_) => "a2a-push",
647            Self::Kafka(_) => "kafka",
648            Self::Nats(_) => "nats",
649            Self::Pulsar(_) => "pulsar",
650            Self::PostgresCdc(_) => "postgres-cdc",
651            Self::Email(_) => "email",
652            Self::Websocket(_) => "websocket",
653        }
654    }
655}
656
657#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
658pub struct TriggerEvent {
659    pub id: TriggerEventId,
660    pub provider: ProviderId,
661    pub kind: String,
662    #[serde(with = "time::serde::rfc3339")]
663    pub received_at: OffsetDateTime,
664    #[serde(with = "time::serde::rfc3339::option")]
665    pub occurred_at: Option<OffsetDateTime>,
666    pub dedupe_key: String,
667    pub trace_id: TraceId,
668    pub tenant_id: Option<TenantId>,
669    pub headers: BTreeMap<String, String>,
670    #[serde(default, skip_serializing_if = "Option::is_none")]
671    pub batch: Option<Vec<JsonValue>>,
672    #[serde(
673        default,
674        skip_serializing_if = "Option::is_none",
675        serialize_with = "serialize_optional_bytes_b64",
676        deserialize_with = "deserialize_optional_bytes_b64"
677    )]
678    pub raw_body: Option<Vec<u8>>,
679    pub provider_payload: ProviderPayload,
680    pub signature_status: SignatureStatus,
681    #[serde(skip)]
682    pub dedupe_claimed: bool,
683}
684
685impl TriggerEvent {
686    #[allow(clippy::too_many_arguments)]
687    pub fn new(
688        provider: ProviderId,
689        kind: impl Into<String>,
690        occurred_at: Option<OffsetDateTime>,
691        dedupe_key: impl Into<String>,
692        tenant_id: Option<TenantId>,
693        headers: BTreeMap<String, String>,
694        provider_payload: ProviderPayload,
695        signature_status: SignatureStatus,
696    ) -> Self {
697        Self {
698            id: TriggerEventId::new(),
699            provider,
700            kind: kind.into(),
701            received_at: clock::now_utc(),
702            occurred_at,
703            dedupe_key: dedupe_key.into(),
704            trace_id: TraceId::new(),
705            tenant_id,
706            headers,
707            batch: None,
708            raw_body: None,
709            provider_payload,
710            signature_status,
711            dedupe_claimed: false,
712        }
713    }
714
715    pub fn dedupe_claimed(&self) -> bool {
716        self.dedupe_claimed
717    }
718
719    pub fn mark_dedupe_claimed(&mut self) {
720        self.dedupe_claimed = true;
721    }
722}
723
/// Decides which header values are replaced before headers are exposed.
///
/// A header is redacted only when it is not "kept" and its lowercased name
/// contains one of the sensitive fragments listed in `should_redact`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeaderRedactionPolicy {
    /// Lowercased header names that are always kept.
    safe_exact_names: BTreeSet<String>,
}

impl HeaderRedactionPolicy {
    /// Adds `name` (lowercased) to the set of headers that are always kept.
    pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
        let normalized = name.into().to_ascii_lowercase();
        self.safe_exact_names.insert(normalized);
        self
    }

    /// Returns `true` when the header must be left untouched.
    ///
    /// Matching is case-insensitive. A header is kept when its name is in the
    /// policy's safe set, is one of the built-in names below, ends with
    /// `-event` or `-delivery`, or contains `timestamp` or `request-id`.
    fn should_keep(&self, name: &str) -> bool {
        const BUILTIN_KEPT: [&str; 16] = [
            "user-agent",
            "request-id",
            "x-request-id",
            "x-correlation-id",
            "content-type",
            "content-length",
            "x-github-event",
            "x-github-delivery",
            "x-github-hook-id",
            "x-hub-signature-256",
            "x-slack-request-timestamp",
            "x-slack-signature",
            "x-linear-signature",
            "x-notion-signature",
            "x-a2a-signature",
            "x-a2a-delivery",
        ];

        let lowered = name.to_ascii_lowercase();
        let lowered = lowered.as_str();
        self.safe_exact_names.contains(lowered)
            || BUILTIN_KEPT.contains(&lowered)
            || lowered.ends_with("-event")
            || lowered.ends_with("-delivery")
            || lowered.contains("timestamp")
            || lowered.contains("request-id")
    }

    /// Returns `true` when the header's value must be replaced.
    ///
    /// Kept headers are never redacted; otherwise the lowercased name is
    /// checked for any of the sensitive fragments.
    fn should_redact(&self, name: &str) -> bool {
        const SENSITIVE_FRAGMENTS: [&str; 5] =
            ["authorization", "cookie", "secret", "token", "key"];

        let lowered = name.to_ascii_lowercase();
        !self.should_keep(&lowered)
            && SENSITIVE_FRAGMENTS
                .iter()
                .any(|&fragment| lowered.contains(fragment))
    }
}

impl Default for HeaderRedactionPolicy {
    /// Starts with the built-in set of kept header names.
    fn default() -> Self {
        const DEFAULT_SAFE_NAMES: [&str; 16] = [
            "content-length",
            "content-type",
            "request-id",
            "user-agent",
            "x-a2a-delivery",
            "x-a2a-signature",
            "x-correlation-id",
            "x-github-delivery",
            "x-github-event",
            "x-github-hook-id",
            "x-hub-signature-256",
            "x-linear-signature",
            "x-notion-signature",
            "x-request-id",
            "x-slack-request-timestamp",
            "x-slack-signature",
        ];

        Self {
            safe_exact_names: DEFAULT_SAFE_NAMES
                .iter()
                .copied()
                .map(String::from)
                .collect(),
        }
    }
}
802
803pub fn redact_headers(
804    headers: &BTreeMap<String, String>,
805    policy: &HeaderRedactionPolicy,
806) -> BTreeMap<String, String> {
807    headers
808        .iter()
809        .map(|(name, value)| {
810            if policy.should_redact(name) {
811                (name.clone(), REDACTED_HEADER_VALUE.to_string())
812            } else {
813                (name.clone(), value.clone())
814            }
815        })
816        .collect()
817}
818
819#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
820pub struct ProviderSecretRequirement {
821    pub name: String,
822    pub required: bool,
823    pub namespace: String,
824}
825
826#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
827pub struct ProviderOutboundMethod {
828    pub name: String,
829}
830
831#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
832#[serde(tag = "kind", rename_all = "snake_case")]
833pub enum SignatureVerificationMetadata {
834    #[default]
835    None,
836    Hmac {
837        variant: String,
838        raw_body: bool,
839        signature_header: String,
840        timestamp_header: Option<String>,
841        id_header: Option<String>,
842        default_tolerance_secs: Option<i64>,
843        digest: String,
844        encoding: String,
845    },
846}
847
848#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
849#[serde(tag = "kind", rename_all = "snake_case")]
850pub enum ProviderRuntimeMetadata {
851    Builtin {
852        connector: String,
853        default_signature_variant: Option<String>,
854    },
855    #[default]
856    Placeholder,
857}
858
859#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
860pub struct ProviderMetadata {
861    pub provider: String,
862    #[serde(default)]
863    pub kinds: Vec<String>,
864    pub schema_name: String,
865    #[serde(default)]
866    pub outbound_methods: Vec<ProviderOutboundMethod>,
867    #[serde(default)]
868    pub secret_requirements: Vec<ProviderSecretRequirement>,
869    #[serde(default)]
870    pub signature_verification: SignatureVerificationMetadata,
871    #[serde(default)]
872    pub runtime: ProviderRuntimeMetadata,
873}
874
875impl ProviderMetadata {
876    pub fn supports_kind(&self, kind: &str) -> bool {
877        self.kinds.iter().any(|candidate| candidate == kind)
878    }
879
880    pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
881        self.secret_requirements
882            .iter()
883            .filter(|requirement| requirement.required)
884            .map(|requirement| requirement.name.as_str())
885    }
886}
887
888pub trait ProviderSchema: Send + Sync {
889    fn provider_id(&self) -> &'static str;
890    fn harn_schema_name(&self) -> &'static str;
891    fn metadata(&self) -> ProviderMetadata {
892        ProviderMetadata {
893            provider: self.provider_id().to_string(),
894            schema_name: self.harn_schema_name().to_string(),
895            ..ProviderMetadata::default()
896        }
897    }
898    fn normalize(
899        &self,
900        kind: &str,
901        headers: &BTreeMap<String, String>,
902        raw: JsonValue,
903    ) -> Result<ProviderPayload, ProviderCatalogError>;
904}
905
/// Errors reported by [`ProviderCatalog`] operations; each variant carries the
/// provider name involved.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProviderCatalogError {
    /// A provider with this name is already registered.
    DuplicateProvider(String),
    /// No provider with this name is registered.
    UnknownProvider(String),
}

impl std::fmt::Display for ProviderCatalogError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Both messages share the same prefix; only the trailing clause differs.
        let (provider, problem) = match self {
            Self::DuplicateProvider(provider) => (provider, "is already registered"),
            Self::UnknownProvider(provider) => (provider, "is not registered"),
        };
        write!(f, "provider `{provider}` {problem}")
    }
}

impl std::error::Error for ProviderCatalogError {}
924
925#[derive(Clone, Default)]
926pub struct ProviderCatalog {
927    providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
928}
929
930impl ProviderCatalog {
931    pub fn with_defaults() -> Self {
932        let mut catalog = Self::default();
933        for schema in default_provider_schemas() {
934            catalog
935                .register(schema)
936                .expect("default providers must register cleanly");
937        }
938        catalog
939    }
940
941    pub fn register(
942        &mut self,
943        schema: Arc<dyn ProviderSchema>,
944    ) -> Result<(), ProviderCatalogError> {
945        let provider = schema.provider_id().to_string();
946        if self.providers.contains_key(provider.as_str()) {
947            return Err(ProviderCatalogError::DuplicateProvider(provider));
948        }
949        self.providers.insert(provider, schema);
950        Ok(())
951    }
952
953    pub fn normalize(
954        &self,
955        provider: &ProviderId,
956        kind: &str,
957        headers: &BTreeMap<String, String>,
958        raw: JsonValue,
959    ) -> Result<ProviderPayload, ProviderCatalogError> {
960        let schema = self
961            .providers
962            .get(provider.as_str())
963            .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
964        schema.normalize(kind, headers, raw)
965    }
966
967    pub fn schema_names(&self) -> BTreeMap<String, String> {
968        self.providers
969            .iter()
970            .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
971            .collect()
972    }
973
974    pub fn entries(&self) -> Vec<ProviderMetadata> {
975        self.providers
976            .values()
977            .map(|schema| schema.metadata())
978            .collect()
979    }
980
981    pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
982        self.providers.get(provider).map(|schema| schema.metadata())
983    }
984}
985
986pub fn register_provider_schema(
987    schema: Arc<dyn ProviderSchema>,
988) -> Result<(), ProviderCatalogError> {
989    provider_catalog()
990        .write()
991        .expect("provider catalog poisoned")
992        .register(schema)
993}
994
995pub fn reset_provider_catalog() {
996    *provider_catalog()
997        .write()
998        .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
999}
1000
1001pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
1002    provider_catalog()
1003        .read()
1004        .expect("provider catalog poisoned")
1005        .schema_names()
1006}
1007
1008pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
1009    provider_catalog()
1010        .read()
1011        .expect("provider catalog poisoned")
1012        .entries()
1013}
1014
1015pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
1016    provider_catalog()
1017        .read()
1018        .expect("provider catalog poisoned")
1019        .metadata_for(provider)
1020}
1021
1022fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
1023    static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
1024    PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
1025}
1026
1027struct BuiltinProviderSchema {
1028    provider_id: &'static str,
1029    harn_schema_name: &'static str,
1030    metadata: ProviderMetadata,
1031    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1032}
1033
1034impl ProviderSchema for BuiltinProviderSchema {
1035    fn provider_id(&self) -> &'static str {
1036        self.provider_id
1037    }
1038
1039    fn harn_schema_name(&self) -> &'static str {
1040        self.harn_schema_name
1041    }
1042
1043    fn metadata(&self) -> ProviderMetadata {
1044        self.metadata.clone()
1045    }
1046
1047    fn normalize(
1048        &self,
1049        kind: &str,
1050        headers: &BTreeMap<String, String>,
1051        raw: JsonValue,
1052    ) -> Result<ProviderPayload, ProviderCatalogError> {
1053        Ok((self.normalize)(kind, headers, raw))
1054    }
1055}
1056
1057fn provider_metadata_entry(
1058    provider: &str,
1059    kinds: &[&str],
1060    schema_name: &str,
1061    outbound_methods: &[&str],
1062    signature_verification: SignatureVerificationMetadata,
1063    secret_requirements: Vec<ProviderSecretRequirement>,
1064    runtime: ProviderRuntimeMetadata,
1065) -> ProviderMetadata {
1066    ProviderMetadata {
1067        provider: provider.to_string(),
1068        kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
1069        schema_name: schema_name.to_string(),
1070        outbound_methods: outbound_methods
1071            .iter()
1072            .map(|name| ProviderOutboundMethod {
1073                name: (*name).to_string(),
1074            })
1075            .collect(),
1076        secret_requirements,
1077        signature_verification,
1078        runtime,
1079    }
1080}
1081
1082fn hmac_signature_metadata(
1083    variant: &str,
1084    signature_header: &str,
1085    timestamp_header: Option<&str>,
1086    id_header: Option<&str>,
1087    default_tolerance_secs: Option<i64>,
1088    encoding: &str,
1089) -> SignatureVerificationMetadata {
1090    SignatureVerificationMetadata::Hmac {
1091        variant: variant.to_string(),
1092        raw_body: true,
1093        signature_header: signature_header.to_string(),
1094        timestamp_header: timestamp_header.map(ToString::to_string),
1095        id_header: id_header.map(ToString::to_string),
1096        default_tolerance_secs,
1097        digest: "sha256".to_string(),
1098        encoding: encoding.to_string(),
1099    }
1100}
1101
1102fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1103    ProviderSecretRequirement {
1104        name: name.to_string(),
1105        required: true,
1106        namespace: namespace.to_string(),
1107    }
1108}
1109
1110fn outbound_method(name: &str) -> ProviderOutboundMethod {
1111    ProviderOutboundMethod {
1112        name: name.to_string(),
1113    }
1114}
1115
1116fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1117    ProviderSecretRequirement {
1118        name: name.to_string(),
1119        required: false,
1120        namespace: namespace.to_string(),
1121    }
1122}
1123
1124fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1125    vec![
1126        Arc::new(BuiltinProviderSchema {
1127            provider_id: "github",
1128            harn_schema_name: "GitHubEventPayload",
1129            metadata: provider_metadata_entry(
1130                "github",
1131                &["webhook"],
1132                "GitHubEventPayload",
1133                &[],
1134                hmac_signature_metadata(
1135                    "github",
1136                    "X-Hub-Signature-256",
1137                    None,
1138                    Some("X-GitHub-Delivery"),
1139                    None,
1140                    "hex",
1141                ),
1142                vec![required_secret("signing_secret", "github")],
1143                ProviderRuntimeMetadata::Builtin {
1144                    connector: "webhook".to_string(),
1145                    default_signature_variant: Some("github".to_string()),
1146                },
1147            ),
1148            normalize: github_payload,
1149        }),
1150        Arc::new(BuiltinProviderSchema {
1151            provider_id: "slack",
1152            harn_schema_name: "SlackEventPayload",
1153            metadata: provider_metadata_entry(
1154                "slack",
1155                &["webhook"],
1156                "SlackEventPayload",
1157                &[
1158                    "post_message",
1159                    "update_message",
1160                    "add_reaction",
1161                    "open_view",
1162                    "user_info",
1163                    "api_call",
1164                    "upload_file",
1165                ],
1166                hmac_signature_metadata(
1167                    "slack",
1168                    "X-Slack-Signature",
1169                    Some("X-Slack-Request-Timestamp"),
1170                    None,
1171                    Some(300),
1172                    "hex",
1173                ),
1174                vec![required_secret("signing_secret", "slack")],
1175                ProviderRuntimeMetadata::Builtin {
1176                    connector: "slack".to_string(),
1177                    default_signature_variant: Some("slack".to_string()),
1178                },
1179            ),
1180            normalize: slack_payload,
1181        }),
1182        Arc::new(BuiltinProviderSchema {
1183            provider_id: "linear",
1184            harn_schema_name: "LinearEventPayload",
1185            metadata: {
1186                let mut metadata = provider_metadata_entry(
1187                    "linear",
1188                    &["webhook"],
1189                    "LinearEventPayload",
1190                    &[],
1191                    hmac_signature_metadata(
1192                        "linear",
1193                        "Linear-Signature",
1194                        None,
1195                        Some("Linear-Delivery"),
1196                        Some(75),
1197                        "hex",
1198                    ),
1199                    vec![
1200                        required_secret("signing_secret", "linear"),
1201                        optional_secret("access_token", "linear"),
1202                    ],
1203                    ProviderRuntimeMetadata::Builtin {
1204                        connector: "linear".to_string(),
1205                        default_signature_variant: Some("linear".to_string()),
1206                    },
1207                );
1208                metadata.outbound_methods = vec![
1209                    ProviderOutboundMethod {
1210                        name: "list_issues".to_string(),
1211                    },
1212                    ProviderOutboundMethod {
1213                        name: "update_issue".to_string(),
1214                    },
1215                    ProviderOutboundMethod {
1216                        name: "create_comment".to_string(),
1217                    },
1218                    ProviderOutboundMethod {
1219                        name: "search".to_string(),
1220                    },
1221                    ProviderOutboundMethod {
1222                        name: "graphql".to_string(),
1223                    },
1224                ];
1225                metadata
1226            },
1227            normalize: linear_payload,
1228        }),
1229        Arc::new(BuiltinProviderSchema {
1230            provider_id: "notion",
1231            harn_schema_name: "NotionEventPayload",
1232            metadata: {
1233                let mut metadata = provider_metadata_entry(
1234                    "notion",
1235                    &["webhook", "poll"],
1236                    "NotionEventPayload",
1237                    &[],
1238                    hmac_signature_metadata(
1239                        "notion",
1240                        "X-Notion-Signature",
1241                        None,
1242                        None,
1243                        None,
1244                        "hex",
1245                    ),
1246                    vec![required_secret("verification_token", "notion")],
1247                    ProviderRuntimeMetadata::Builtin {
1248                        connector: "notion".to_string(),
1249                        default_signature_variant: Some("notion".to_string()),
1250                    },
1251                );
1252                metadata.outbound_methods = vec![
1253                    outbound_method("get_page"),
1254                    outbound_method("update_page"),
1255                    outbound_method("append_blocks"),
1256                    outbound_method("query_database"),
1257                    outbound_method("search"),
1258                    outbound_method("create_comment"),
1259                    outbound_method("api_call"),
1260                ];
1261                metadata
1262            },
1263            normalize: notion_payload,
1264        }),
1265        Arc::new(BuiltinProviderSchema {
1266            provider_id: "cron",
1267            harn_schema_name: "CronEventPayload",
1268            metadata: provider_metadata_entry(
1269                "cron",
1270                &["cron"],
1271                "CronEventPayload",
1272                &[],
1273                SignatureVerificationMetadata::None,
1274                Vec::new(),
1275                ProviderRuntimeMetadata::Builtin {
1276                    connector: "cron".to_string(),
1277                    default_signature_variant: None,
1278                },
1279            ),
1280            normalize: cron_payload,
1281        }),
1282        Arc::new(BuiltinProviderSchema {
1283            provider_id: "webhook",
1284            harn_schema_name: "GenericWebhookPayload",
1285            metadata: provider_metadata_entry(
1286                "webhook",
1287                &["webhook"],
1288                "GenericWebhookPayload",
1289                &[],
1290                hmac_signature_metadata(
1291                    "standard",
1292                    "webhook-signature",
1293                    Some("webhook-timestamp"),
1294                    Some("webhook-id"),
1295                    Some(300),
1296                    "base64",
1297                ),
1298                vec![required_secret("signing_secret", "webhook")],
1299                ProviderRuntimeMetadata::Builtin {
1300                    connector: "webhook".to_string(),
1301                    default_signature_variant: Some("standard".to_string()),
1302                },
1303            ),
1304            normalize: webhook_payload,
1305        }),
1306        Arc::new(BuiltinProviderSchema {
1307            provider_id: "a2a-push",
1308            harn_schema_name: "A2aPushPayload",
1309            metadata: provider_metadata_entry(
1310                "a2a-push",
1311                &["a2a-push"],
1312                "A2aPushPayload",
1313                &[],
1314                SignatureVerificationMetadata::None,
1315                Vec::new(),
1316                ProviderRuntimeMetadata::Builtin {
1317                    connector: "a2a-push".to_string(),
1318                    default_signature_variant: None,
1319                },
1320            ),
1321            normalize: a2a_push_payload,
1322        }),
1323        Arc::new(stream_provider_schema("kafka", kafka_payload)),
1324        Arc::new(stream_provider_schema("nats", nats_payload)),
1325        Arc::new(stream_provider_schema("pulsar", pulsar_payload)),
1326        Arc::new(stream_provider_schema("postgres-cdc", postgres_cdc_payload)),
1327        Arc::new(stream_provider_schema("email", email_payload)),
1328        Arc::new(stream_provider_schema("websocket", websocket_payload)),
1329    ]
1330}
1331
1332fn stream_provider_schema(
1333    provider_id: &'static str,
1334    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1335) -> BuiltinProviderSchema {
1336    BuiltinProviderSchema {
1337        provider_id,
1338        harn_schema_name: "StreamEventPayload",
1339        metadata: provider_metadata_entry(
1340            provider_id,
1341            &["stream"],
1342            "StreamEventPayload",
1343            &[],
1344            SignatureVerificationMetadata::None,
1345            Vec::new(),
1346            ProviderRuntimeMetadata::Placeholder,
1347        ),
1348        normalize,
1349    }
1350}
1351
1352fn github_payload(
1353    kind: &str,
1354    headers: &BTreeMap<String, String>,
1355    raw: JsonValue,
1356) -> ProviderPayload {
1357    let common = GitHubEventCommon {
1358        event: kind.to_string(),
1359        action: raw
1360            .get("action")
1361            .and_then(JsonValue::as_str)
1362            .map(ToString::to_string),
1363        delivery_id: headers.get("X-GitHub-Delivery").cloned(),
1364        installation_id: raw
1365            .get("installation")
1366            .and_then(|value| value.get("id"))
1367            .and_then(JsonValue::as_i64),
1368        raw: raw.clone(),
1369    };
1370    let payload = match kind {
1371        "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1372            common,
1373            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1374        }),
1375        "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1376            common,
1377            pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1378        }),
1379        "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1380            common,
1381            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1382            comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1383        }),
1384        "pull_request_review" => {
1385            GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1386                common,
1387                pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1388                review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1389            })
1390        }
1391        "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1392            common,
1393            commits: raw
1394                .get("commits")
1395                .and_then(JsonValue::as_array)
1396                .cloned()
1397                .unwrap_or_default(),
1398            distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1399        }),
1400        "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1401            common,
1402            workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1403        }),
1404        "deployment_status" => {
1405            GitHubEventPayload::DeploymentStatus(GitHubDeploymentStatusEventPayload {
1406                common,
1407                deployment_status: raw
1408                    .get("deployment_status")
1409                    .cloned()
1410                    .unwrap_or(JsonValue::Null),
1411                deployment: raw.get("deployment").cloned().unwrap_or(JsonValue::Null),
1412            })
1413        }
1414        "check_run" => GitHubEventPayload::CheckRun(GitHubCheckRunEventPayload {
1415            common,
1416            check_run: raw.get("check_run").cloned().unwrap_or(JsonValue::Null),
1417        }),
1418        _ => GitHubEventPayload::Other(common),
1419    };
1420    ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1421}
1422
1423fn slack_payload(
1424    kind: &str,
1425    _headers: &BTreeMap<String, String>,
1426    raw: JsonValue,
1427) -> ProviderPayload {
1428    let event = raw.get("event");
1429    let common = SlackEventCommon {
1430        event: kind.to_string(),
1431        event_id: raw
1432            .get("event_id")
1433            .and_then(JsonValue::as_str)
1434            .map(ToString::to_string),
1435        api_app_id: raw
1436            .get("api_app_id")
1437            .and_then(JsonValue::as_str)
1438            .map(ToString::to_string),
1439        team_id: raw
1440            .get("team_id")
1441            .and_then(JsonValue::as_str)
1442            .map(ToString::to_string),
1443        channel_id: slack_channel_id(event),
1444        user_id: slack_user_id(event),
1445        event_ts: event
1446            .and_then(|value| value.get("event_ts"))
1447            .and_then(JsonValue::as_str)
1448            .map(ToString::to_string),
1449        raw: raw.clone(),
1450    };
1451    let payload = match kind {
1452        kind if kind == "message" || kind.starts_with("message.") => {
1453            SlackEventPayload::Message(SlackMessageEventPayload {
1454                subtype: event
1455                    .and_then(|value| value.get("subtype"))
1456                    .and_then(JsonValue::as_str)
1457                    .map(ToString::to_string),
1458                channel_type: event
1459                    .and_then(|value| value.get("channel_type"))
1460                    .and_then(JsonValue::as_str)
1461                    .map(ToString::to_string),
1462                channel: event
1463                    .and_then(|value| value.get("channel"))
1464                    .and_then(JsonValue::as_str)
1465                    .map(ToString::to_string),
1466                user: event
1467                    .and_then(|value| value.get("user"))
1468                    .and_then(JsonValue::as_str)
1469                    .map(ToString::to_string),
1470                text: event
1471                    .and_then(|value| value.get("text"))
1472                    .and_then(JsonValue::as_str)
1473                    .map(ToString::to_string),
1474                ts: event
1475                    .and_then(|value| value.get("ts"))
1476                    .and_then(JsonValue::as_str)
1477                    .map(ToString::to_string),
1478                thread_ts: event
1479                    .and_then(|value| value.get("thread_ts"))
1480                    .and_then(JsonValue::as_str)
1481                    .map(ToString::to_string),
1482                common,
1483            })
1484        }
1485        "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1486            channel: event
1487                .and_then(|value| value.get("channel"))
1488                .and_then(JsonValue::as_str)
1489                .map(ToString::to_string),
1490            user: event
1491                .and_then(|value| value.get("user"))
1492                .and_then(JsonValue::as_str)
1493                .map(ToString::to_string),
1494            text: event
1495                .and_then(|value| value.get("text"))
1496                .and_then(JsonValue::as_str)
1497                .map(ToString::to_string),
1498            ts: event
1499                .and_then(|value| value.get("ts"))
1500                .and_then(JsonValue::as_str)
1501                .map(ToString::to_string),
1502            thread_ts: event
1503                .and_then(|value| value.get("thread_ts"))
1504                .and_then(JsonValue::as_str)
1505                .map(ToString::to_string),
1506            common,
1507        }),
1508        "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1509            reaction: event
1510                .and_then(|value| value.get("reaction"))
1511                .and_then(JsonValue::as_str)
1512                .map(ToString::to_string),
1513            item_user: event
1514                .and_then(|value| value.get("item_user"))
1515                .and_then(JsonValue::as_str)
1516                .map(ToString::to_string),
1517            item: event
1518                .and_then(|value| value.get("item"))
1519                .cloned()
1520                .unwrap_or(JsonValue::Null),
1521            common,
1522        }),
1523        "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1524            user: event
1525                .and_then(|value| value.get("user"))
1526                .and_then(JsonValue::as_str)
1527                .map(ToString::to_string),
1528            channel: event
1529                .and_then(|value| value.get("channel"))
1530                .and_then(JsonValue::as_str)
1531                .map(ToString::to_string),
1532            tab: event
1533                .and_then(|value| value.get("tab"))
1534                .and_then(JsonValue::as_str)
1535                .map(ToString::to_string),
1536            view: event
1537                .and_then(|value| value.get("view"))
1538                .cloned()
1539                .unwrap_or(JsonValue::Null),
1540            common,
1541        }),
1542        "assistant_thread_started" => {
1543            let assistant_thread = event
1544                .and_then(|value| value.get("assistant_thread"))
1545                .cloned()
1546                .unwrap_or(JsonValue::Null);
1547            SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1548                thread_ts: assistant_thread
1549                    .get("thread_ts")
1550                    .and_then(JsonValue::as_str)
1551                    .map(ToString::to_string),
1552                context: assistant_thread
1553                    .get("context")
1554                    .cloned()
1555                    .unwrap_or(JsonValue::Null),
1556                assistant_thread,
1557                common,
1558            })
1559        }
1560        _ => SlackEventPayload::Other(common),
1561    };
1562    ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1563}
1564
1565fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1566    event
1567        .and_then(|value| value.get("channel"))
1568        .and_then(JsonValue::as_str)
1569        .map(ToString::to_string)
1570        .or_else(|| {
1571            event
1572                .and_then(|value| value.get("item"))
1573                .and_then(|value| value.get("channel"))
1574                .and_then(JsonValue::as_str)
1575                .map(ToString::to_string)
1576        })
1577        .or_else(|| {
1578            event
1579                .and_then(|value| value.get("channel"))
1580                .and_then(|value| value.get("id"))
1581                .and_then(JsonValue::as_str)
1582                .map(ToString::to_string)
1583        })
1584        .or_else(|| {
1585            event
1586                .and_then(|value| value.get("assistant_thread"))
1587                .and_then(|value| value.get("channel_id"))
1588                .and_then(JsonValue::as_str)
1589                .map(ToString::to_string)
1590        })
1591}
1592
1593fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1594    event
1595        .and_then(|value| value.get("user"))
1596        .and_then(JsonValue::as_str)
1597        .map(ToString::to_string)
1598        .or_else(|| {
1599            event
1600                .and_then(|value| value.get("user"))
1601                .and_then(|value| value.get("id"))
1602                .and_then(JsonValue::as_str)
1603                .map(ToString::to_string)
1604        })
1605        .or_else(|| {
1606            event
1607                .and_then(|value| value.get("item_user"))
1608                .and_then(JsonValue::as_str)
1609                .map(ToString::to_string)
1610        })
1611        .or_else(|| {
1612            event
1613                .and_then(|value| value.get("assistant_thread"))
1614                .and_then(|value| value.get("user_id"))
1615                .and_then(JsonValue::as_str)
1616                .map(ToString::to_string)
1617        })
1618}
1619
1620fn linear_payload(
1621    _kind: &str,
1622    headers: &BTreeMap<String, String>,
1623    raw: JsonValue,
1624) -> ProviderPayload {
1625    let common = linear_event_common(headers, &raw);
1626    let event = common.event.clone();
1627    let payload = match event.as_str() {
1628        "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1629            common,
1630            issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1631            changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1632        }),
1633        "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1634            common,
1635            comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1636        }),
1637        "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1638            common,
1639            label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1640        }),
1641        "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1642            common,
1643            project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1644        }),
1645        "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1646            common,
1647            cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1648        }),
1649        "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
1650            common,
1651            customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1652        }),
1653        "customer_request" => {
1654            LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
1655                common,
1656                customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1657            })
1658        }
1659        _ => LinearEventPayload::Other(common),
1660    };
1661    ProviderPayload::Known(KnownProviderPayload::Linear(payload))
1662}
1663
1664fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
1665    LinearEventCommon {
1666        event: linear_event_name(
1667            raw.get("type")
1668                .and_then(JsonValue::as_str)
1669                .or_else(|| headers.get("Linear-Event").map(String::as_str)),
1670        ),
1671        action: raw
1672            .get("action")
1673            .and_then(JsonValue::as_str)
1674            .map(ToString::to_string),
1675        delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
1676        organization_id: raw
1677            .get("organizationId")
1678            .and_then(JsonValue::as_str)
1679            .map(ToString::to_string),
1680        webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
1681        webhook_id: raw
1682            .get("webhookId")
1683            .and_then(JsonValue::as_str)
1684            .map(ToString::to_string),
1685        url: raw
1686            .get("url")
1687            .and_then(JsonValue::as_str)
1688            .map(ToString::to_string),
1689        created_at: raw
1690            .get("createdAt")
1691            .and_then(JsonValue::as_str)
1692            .map(ToString::to_string),
1693        actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
1694        raw: raw.clone(),
1695    }
1696}
1697
/// Canonicalizes a Linear event type into the snake_case name used for dispatch.
///
/// Matching is ASCII case-insensitive. Known aliases collapse to one name (for example
/// `IssueComment` → `comment`, `ProjectUpdate` → `project`); an unknown non-empty type is
/// returned lower-cased; a missing or empty type becomes `other`.
fn linear_event_name(raw_type: Option<&str>) -> String {
    let lowered = raw_type.map(str::to_ascii_lowercase).unwrap_or_default();
    let canonical = match lowered.as_str() {
        "issue" => "issue",
        "comment" | "issuecomment" | "issue_comment" => "comment",
        "issuelabel" | "issue_label" => "issue_label",
        "project" | "projectupdate" | "project_update" => "project",
        "cycle" => "cycle",
        "customer" => "customer",
        "customerrequest" | "customer_request" => "customer_request",
        "" => "other",
        // Unknown but non-empty: pass the lower-cased type through unchanged.
        _ => return lowered,
    };
    canonical.to_string()
}
1711
1712fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
1713    let Some(JsonValue::Object(fields)) = updated_from else {
1714        return Vec::new();
1715    };
1716    let mut changes = Vec::new();
1717    for (field, previous) in fields {
1718        let change = match field.as_str() {
1719            "title" => LinearIssueChange::Title {
1720                previous: previous.as_str().map(ToString::to_string),
1721            },
1722            "description" => LinearIssueChange::Description {
1723                previous: previous.as_str().map(ToString::to_string),
1724            },
1725            "priority" => LinearIssueChange::Priority {
1726                previous: parse_json_i64ish(previous),
1727            },
1728            "estimate" => LinearIssueChange::Estimate {
1729                previous: parse_json_i64ish(previous),
1730            },
1731            "stateId" => LinearIssueChange::StateId {
1732                previous: previous.as_str().map(ToString::to_string),
1733            },
1734            "teamId" => LinearIssueChange::TeamId {
1735                previous: previous.as_str().map(ToString::to_string),
1736            },
1737            "assigneeId" => LinearIssueChange::AssigneeId {
1738                previous: previous.as_str().map(ToString::to_string),
1739            },
1740            "projectId" => LinearIssueChange::ProjectId {
1741                previous: previous.as_str().map(ToString::to_string),
1742            },
1743            "cycleId" => LinearIssueChange::CycleId {
1744                previous: previous.as_str().map(ToString::to_string),
1745            },
1746            "dueDate" => LinearIssueChange::DueDate {
1747                previous: previous.as_str().map(ToString::to_string),
1748            },
1749            "parentId" => LinearIssueChange::ParentId {
1750                previous: previous.as_str().map(ToString::to_string),
1751            },
1752            "sortOrder" => LinearIssueChange::SortOrder {
1753                previous: previous.as_f64(),
1754            },
1755            "labelIds" => LinearIssueChange::LabelIds {
1756                previous: parse_string_array(previous),
1757            },
1758            "completedAt" => LinearIssueChange::CompletedAt {
1759                previous: previous.as_str().map(ToString::to_string),
1760            },
1761            _ => LinearIssueChange::Other {
1762                field: field.clone(),
1763                previous: previous.clone(),
1764            },
1765        };
1766        changes.push(change);
1767    }
1768    changes
1769}
1770
1771fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
1772    value
1773        .as_i64()
1774        .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
1775        .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
1776}
1777
1778fn parse_string_array(value: &JsonValue) -> Vec<String> {
1779    let Some(array) = value.as_array() else {
1780        return Vec::new();
1781    };
1782    array
1783        .iter()
1784        .filter_map(|entry| {
1785            entry.as_str().map(ToString::to_string).or_else(|| {
1786                entry
1787                    .get("id")
1788                    .and_then(JsonValue::as_str)
1789                    .map(ToString::to_string)
1790            })
1791        })
1792        .collect()
1793}
1794
/// Finds a header by name, comparing keys without regard to ASCII case.
///
/// Returns the value of the first matching key in the map's iteration order,
/// or `None` when no key matches.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
1801
1802fn notion_payload(
1803    kind: &str,
1804    headers: &BTreeMap<String, String>,
1805    raw: JsonValue,
1806) -> ProviderPayload {
1807    let workspace_id = raw
1808        .get("workspace_id")
1809        .and_then(JsonValue::as_str)
1810        .map(ToString::to_string);
1811    ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
1812        event: kind.to_string(),
1813        workspace_id,
1814        request_id: headers
1815            .get("request-id")
1816            .cloned()
1817            .or_else(|| headers.get("x-request-id").cloned()),
1818        subscription_id: raw
1819            .get("subscription_id")
1820            .and_then(JsonValue::as_str)
1821            .map(ToString::to_string),
1822        integration_id: raw
1823            .get("integration_id")
1824            .and_then(JsonValue::as_str)
1825            .map(ToString::to_string),
1826        attempt_number: raw
1827            .get("attempt_number")
1828            .and_then(JsonValue::as_u64)
1829            .and_then(|value| u32::try_from(value).ok()),
1830        entity_id: raw
1831            .get("entity")
1832            .and_then(|value| value.get("id"))
1833            .and_then(JsonValue::as_str)
1834            .map(ToString::to_string),
1835        entity_type: raw
1836            .get("entity")
1837            .and_then(|value| value.get("type"))
1838            .and_then(JsonValue::as_str)
1839            .map(ToString::to_string),
1840        api_version: raw
1841            .get("api_version")
1842            .and_then(JsonValue::as_str)
1843            .map(ToString::to_string),
1844        verification_token: raw
1845            .get("verification_token")
1846            .and_then(JsonValue::as_str)
1847            .map(ToString::to_string),
1848        polled: None,
1849        raw,
1850    })))
1851}
1852
1853fn cron_payload(
1854    _kind: &str,
1855    _headers: &BTreeMap<String, String>,
1856    raw: JsonValue,
1857) -> ProviderPayload {
1858    let cron_id = raw
1859        .get("cron_id")
1860        .and_then(JsonValue::as_str)
1861        .map(ToString::to_string);
1862    let schedule = raw
1863        .get("schedule")
1864        .and_then(JsonValue::as_str)
1865        .map(ToString::to_string);
1866    let tick_at = raw
1867        .get("tick_at")
1868        .and_then(JsonValue::as_str)
1869        .and_then(parse_rfc3339)
1870        .unwrap_or_else(OffsetDateTime::now_utc);
1871    ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
1872        cron_id,
1873        schedule,
1874        tick_at,
1875        raw,
1876    }))
1877}
1878
1879fn webhook_payload(
1880    _kind: &str,
1881    headers: &BTreeMap<String, String>,
1882    raw: JsonValue,
1883) -> ProviderPayload {
1884    ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1885        source: headers.get("X-Webhook-Source").cloned(),
1886        content_type: headers.get("Content-Type").cloned(),
1887        raw,
1888    }))
1889}
1890
1891fn a2a_push_payload(
1892    _kind: &str,
1893    _headers: &BTreeMap<String, String>,
1894    raw: JsonValue,
1895) -> ProviderPayload {
1896    let task_id = raw
1897        .get("task_id")
1898        .and_then(JsonValue::as_str)
1899        .map(ToString::to_string);
1900    let sender = raw
1901        .get("sender")
1902        .and_then(JsonValue::as_str)
1903        .map(ToString::to_string);
1904    let task_state = raw
1905        .pointer("/status/state")
1906        .or_else(|| raw.pointer("/statusUpdate/status/state"))
1907        .and_then(JsonValue::as_str)
1908        .map(|state| match state {
1909            "cancelled" => "canceled".to_string(),
1910            other => other.to_string(),
1911        });
1912    let artifact = raw
1913        .pointer("/artifactUpdate/artifact")
1914        .or_else(|| raw.get("artifact"))
1915        .cloned();
1916    let kind = task_state
1917        .as_deref()
1918        .map(|state| format!("a2a.task.{state}"))
1919        .unwrap_or_else(|| "a2a.task.update".to_string());
1920    ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
1921        task_id,
1922        task_state,
1923        artifact,
1924        sender,
1925        raw,
1926        kind,
1927    }))
1928}
1929
1930fn kafka_payload(
1931    kind: &str,
1932    headers: &BTreeMap<String, String>,
1933    raw: JsonValue,
1934) -> ProviderPayload {
1935    ProviderPayload::Known(KnownProviderPayload::Kafka(stream_payload(
1936        kind, headers, raw,
1937    )))
1938}
1939
1940fn nats_payload(kind: &str, headers: &BTreeMap<String, String>, raw: JsonValue) -> ProviderPayload {
1941    ProviderPayload::Known(KnownProviderPayload::Nats(stream_payload(
1942        kind, headers, raw,
1943    )))
1944}
1945
1946fn pulsar_payload(
1947    kind: &str,
1948    headers: &BTreeMap<String, String>,
1949    raw: JsonValue,
1950) -> ProviderPayload {
1951    ProviderPayload::Known(KnownProviderPayload::Pulsar(stream_payload(
1952        kind, headers, raw,
1953    )))
1954}
1955
1956fn postgres_cdc_payload(
1957    kind: &str,
1958    headers: &BTreeMap<String, String>,
1959    raw: JsonValue,
1960) -> ProviderPayload {
1961    ProviderPayload::Known(KnownProviderPayload::PostgresCdc(stream_payload(
1962        kind, headers, raw,
1963    )))
1964}
1965
1966fn email_payload(
1967    kind: &str,
1968    headers: &BTreeMap<String, String>,
1969    raw: JsonValue,
1970) -> ProviderPayload {
1971    ProviderPayload::Known(KnownProviderPayload::Email(stream_payload(
1972        kind, headers, raw,
1973    )))
1974}
1975
1976fn websocket_payload(
1977    kind: &str,
1978    headers: &BTreeMap<String, String>,
1979    raw: JsonValue,
1980) -> ProviderPayload {
1981    ProviderPayload::Known(KnownProviderPayload::Websocket(stream_payload(
1982        kind, headers, raw,
1983    )))
1984}
1985
1986fn stream_payload(
1987    kind: &str,
1988    headers: &BTreeMap<String, String>,
1989    raw: JsonValue,
1990) -> StreamEventPayload {
1991    StreamEventPayload {
1992        event: kind.to_string(),
1993        source: json_stringish(&raw, &["source", "connector", "origin"]),
1994        stream: json_stringish(
1995            &raw,
1996            &["stream", "topic", "subject", "channel", "mailbox", "slot"],
1997        ),
1998        partition: json_stringish(&raw, &["partition", "shard", "consumer"]),
1999        offset: json_stringish(&raw, &["offset", "sequence", "lsn", "message_id"]),
2000        key: json_stringish(&raw, &["key", "message_key", "id", "event_id"]),
2001        timestamp: json_stringish(&raw, &["timestamp", "occurred_at", "received_at", "ts"]),
2002        headers: headers.clone(),
2003        raw,
2004    }
2005}
2006
2007fn json_stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
2008    fields.iter().find_map(|field| {
2009        let value = raw.get(*field)?;
2010        value
2011            .as_str()
2012            .map(ToString::to_string)
2013            .or_else(|| parse_json_i64ish(value).map(|number| number.to_string()))
2014            .or_else(|| value.as_u64().map(|number| number.to_string()))
2015    })
2016}
2017
2018fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
2019    OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
2020}
2021
#[cfg(test)]
mod tests {
    use super::*;

    /// Request headers mixing credential-bearing entries with plain delivery metadata.
    fn sample_headers() -> BTreeMap<String, String> {
        let mut headers = BTreeMap::new();
        for (name, value) in [
            ("Authorization", "Bearer secret"),
            ("Cookie", "session=abc"),
            ("User-Agent", "GitHub-Hookshot/123"),
            ("X-GitHub-Delivery", "delivery-123"),
            ("X-GitHub-Event", "issues"),
            ("X-Webhook-Token", "token"),
        ] {
            headers.insert(name.to_string(), value.to_string());
        }
        headers
    }

    /// Placeholder GitHub schema used to exercise catalog registration.
    fn github_schema() -> BuiltinProviderSchema {
        BuiltinProviderSchema {
            provider_id: "github",
            harn_schema_name: "GitHubEventPayload",
            metadata: provider_metadata_entry(
                "github",
                &["webhook"],
                "GitHubEventPayload",
                &[],
                SignatureVerificationMetadata::None,
                Vec::new(),
                ProviderRuntimeMetadata::Placeholder,
            ),
            normalize: github_payload,
        }
    }

    #[test]
    fn default_redaction_policy_keeps_safe_headers() {
        let policy = HeaderRedactionPolicy::default();
        let redacted = redact_headers(&sample_headers(), &policy);

        // Non-sensitive headers pass through untouched.
        for (name, expected) in [
            ("User-Agent", "GitHub-Hookshot/123"),
            ("X-GitHub-Delivery", "delivery-123"),
        ] {
            assert_eq!(redacted.get(name).unwrap(), expected);
        }

        // Credential-bearing headers are replaced by the redaction marker.
        for name in ["Authorization", "Cookie", "X-Webhook-Token"] {
            assert_eq!(redacted.get(name).unwrap(), REDACTED_HEADER_VALUE);
        }
    }

    #[test]
    fn provider_catalog_rejects_duplicates() {
        let mut catalog = ProviderCatalog::default();

        // The first registration succeeds; the identical second one must be rejected.
        catalog.register(Arc::new(github_schema())).unwrap();
        let error = catalog.register(Arc::new(github_schema())).unwrap_err();

        assert_eq!(
            error,
            ProviderCatalogError::DuplicateProvider("github".to_string())
        );
    }

    #[test]
    fn registered_provider_metadata_marks_builtin_connectors() {
        let entries = registered_provider_metadata();
        let builtin: Vec<&ProviderMetadata> = entries
            .iter()
            .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
            .collect();

        assert_eq!(builtin.len(), 7);
        for expected in [
            "a2a-push", "cron", "github", "linear", "notion", "slack", "webhook",
        ] {
            assert!(builtin.iter().any(|entry| entry.provider == expected));
        }

        // Stream providers are registered, but only as placeholders.
        let kafka = entries
            .iter()
            .find(|entry| entry.provider == "kafka")
            .expect("kafka stream provider");
        assert_eq!(kafka.kinds, vec!["stream".to_string()]);
        assert_eq!(kafka.schema_name, "StreamEventPayload");
        assert!(matches!(
            kafka.runtime,
            ProviderRuntimeMetadata::Placeholder
        ));
    }

    #[test]
    fn trigger_event_round_trip_is_stable() {
        let provider = ProviderId::from("github");
        let headers = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
        let body = serde_json::json!({
            "action": "opened",
            "installation": {"id": 42},
            "issue": {"number": 99}
        });
        let payload =
            ProviderPayload::normalize(&provider, "issues", &sample_headers(), body).unwrap();

        let event = TriggerEvent {
            id: TriggerEventId("trigger_evt_fixed".to_string()),
            provider,
            kind: "issues".to_string(),
            received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
            occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
            dedupe_key: "delivery-123".to_string(),
            trace_id: TraceId("trace_fixed".to_string()),
            tenant_id: Some(TenantId("tenant_1".to_string())),
            headers,
            provider_payload: payload,
            signature_status: SignatureStatus::Verified,
            dedupe_claimed: false,
            batch: None,
            raw_body: Some(vec![0, 159, 255, 10]),
        };

        // Serialize, decode, and serialize again: both JSON forms must agree.
        let first_pass = serde_json::to_value(&event).unwrap();
        assert_eq!(first_pass["raw_body"], serde_json::json!("AJ//Cg=="));
        let decoded: TriggerEvent = serde_json::from_value(first_pass.clone()).unwrap();
        let second_pass = serde_json::to_value(&decoded).unwrap();
        assert_eq!(decoded, event);
        assert_eq!(first_pass, second_pass);
    }

    #[test]
    fn unknown_provider_errors() {
        let provider = ProviderId::from("custom-provider");
        let no_headers = BTreeMap::new();
        let outcome = ProviderPayload::normalize(
            &provider,
            "thing.happened",
            &no_headers,
            serde_json::json!({"ok": true}),
        );

        assert_eq!(
            outcome.unwrap_err(),
            ProviderCatalogError::UnknownProvider("custom-provider".to_string())
        );
    }

    #[test]
    fn provider_normalizes_stream_payloads() {
        let headers = BTreeMap::from([("x-source".to_string(), "feed".to_string())]);
        let normalized = ProviderPayload::normalize(
            &ProviderId::from("kafka"),
            "quote.tick",
            &headers,
            serde_json::json!({
                "topic": "quotes",
                "partition": 7,
                "offset": "42",
                "key": "AAPL",
                "timestamp": "2026-04-21T12:00:00Z"
            }),
        )
        .expect("stream payload");

        let stream = match normalized {
            ProviderPayload::Known(KnownProviderPayload::Kafka(stream)) => stream,
            _ => panic!("expected kafka stream payload"),
        };

        assert_eq!(stream.event, "quote.tick");
        assert_eq!(stream.stream.as_deref(), Some("quotes"));
        assert_eq!(stream.partition.as_deref(), Some("7"));
        assert_eq!(stream.offset.as_deref(), Some("42"));
        assert_eq!(stream.key.as_deref(), Some("AAPL"));
        assert_eq!(stream.timestamp.as_deref(), Some("2026-04-21T12:00:00Z"));
    }
}