Skip to main content

harn_vm/triggers/
event.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
/// Replacement text written in place of a header value that must not be exposed.
const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13fn serialize_optional_bytes_b64<S>(
14    value: &Option<Vec<u8>>,
15    serializer: S,
16) -> Result<S::Ok, S::Error>
17where
18    S: Serializer,
19{
20    use base64::Engine;
21
22    match value {
23        Some(bytes) => {
24            serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
25        }
26        None => serializer.serialize_none(),
27    }
28}
29
30fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
31where
32    D: Deserializer<'de>,
33{
34    use base64::Engine;
35
36    let encoded = Option::<String>::deserialize(deserializer)?;
37    encoded
38        .map(|text| {
39            base64::engine::general_purpose::STANDARD
40                .decode(text.as_bytes())
41                .map_err(serde::de::Error::custom)
42        })
43        .transpose()
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
47#[serde(transparent)]
48pub struct TriggerEventId(pub String);
49
50impl TriggerEventId {
51    pub fn new() -> Self {
52        Self(format!("trigger_evt_{}", Uuid::now_v7()))
53    }
54}
55
56impl Default for TriggerEventId {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
63#[serde(transparent)]
64pub struct ProviderId(pub String);
65
66impl ProviderId {
67    pub fn new(value: impl Into<String>) -> Self {
68        Self(value.into())
69    }
70
71    pub fn as_str(&self) -> &str {
72        self.0.as_str()
73    }
74}
75
76impl From<&str> for ProviderId {
77    fn from(value: &str) -> Self {
78        Self::new(value)
79    }
80}
81
82impl From<String> for ProviderId {
83    fn from(value: String) -> Self {
84        Self::new(value)
85    }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
89#[serde(transparent)]
90pub struct TraceId(pub String);
91
92impl TraceId {
93    pub fn new() -> Self {
94        Self(format!("trace_{}", Uuid::now_v7()))
95    }
96}
97
98impl Default for TraceId {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
105#[serde(transparent)]
106pub struct TenantId(pub String);
107
108impl TenantId {
109    pub fn new(value: impl Into<String>) -> Self {
110        Self(value.into())
111    }
112}
113
114#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(tag = "state", rename_all = "snake_case")]
116pub enum SignatureStatus {
117    Verified,
118    Unsigned,
119    Failed { reason: String },
120}
121
122#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
123pub struct GitHubEventCommon {
124    pub event: String,
125    pub action: Option<String>,
126    pub delivery_id: Option<String>,
127    pub installation_id: Option<i64>,
128    pub raw: JsonValue,
129}
130
131#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
132pub struct GitHubIssuesEventPayload {
133    #[serde(flatten)]
134    pub common: GitHubEventCommon,
135    pub issue: JsonValue,
136}
137
138#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
139pub struct GitHubPullRequestEventPayload {
140    #[serde(flatten)]
141    pub common: GitHubEventCommon,
142    pub pull_request: JsonValue,
143}
144
145#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
146pub struct GitHubIssueCommentEventPayload {
147    #[serde(flatten)]
148    pub common: GitHubEventCommon,
149    pub issue: JsonValue,
150    pub comment: JsonValue,
151}
152
153#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
154pub struct GitHubPullRequestReviewEventPayload {
155    #[serde(flatten)]
156    pub common: GitHubEventCommon,
157    pub pull_request: JsonValue,
158    pub review: JsonValue,
159}
160
161#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
162pub struct GitHubPushEventPayload {
163    #[serde(flatten)]
164    pub common: GitHubEventCommon,
165    #[serde(default)]
166    pub commits: Vec<JsonValue>,
167    pub distinct_size: Option<i64>,
168}
169
170#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
171pub struct GitHubWorkflowRunEventPayload {
172    #[serde(flatten)]
173    pub common: GitHubEventCommon,
174    pub workflow_run: JsonValue,
175}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178pub struct GitHubDeploymentStatusEventPayload {
179    #[serde(flatten)]
180    pub common: GitHubEventCommon,
181    pub deployment_status: JsonValue,
182    pub deployment: JsonValue,
183}
184
185#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
186pub struct GitHubCheckRunEventPayload {
187    #[serde(flatten)]
188    pub common: GitHubEventCommon,
189    pub check_run: JsonValue,
190}
191
192#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
193#[serde(untagged)]
194pub enum GitHubEventPayload {
195    Issues(GitHubIssuesEventPayload),
196    PullRequest(GitHubPullRequestEventPayload),
197    IssueComment(GitHubIssueCommentEventPayload),
198    PullRequestReview(GitHubPullRequestReviewEventPayload),
199    Push(GitHubPushEventPayload),
200    WorkflowRun(GitHubWorkflowRunEventPayload),
201    DeploymentStatus(GitHubDeploymentStatusEventPayload),
202    CheckRun(GitHubCheckRunEventPayload),
203    Other(GitHubEventCommon),
204}
205
206#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
207pub struct SlackEventCommon {
208    pub event: String,
209    pub event_id: Option<String>,
210    pub api_app_id: Option<String>,
211    pub team_id: Option<String>,
212    pub channel_id: Option<String>,
213    pub user_id: Option<String>,
214    pub event_ts: Option<String>,
215    pub raw: JsonValue,
216}
217
218#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
219pub struct SlackMessageEventPayload {
220    #[serde(flatten)]
221    pub common: SlackEventCommon,
222    pub subtype: Option<String>,
223    pub channel_type: Option<String>,
224    pub channel: Option<String>,
225    pub user: Option<String>,
226    pub text: Option<String>,
227    pub ts: Option<String>,
228    pub thread_ts: Option<String>,
229}
230
231#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
232pub struct SlackAppMentionEventPayload {
233    #[serde(flatten)]
234    pub common: SlackEventCommon,
235    pub channel: Option<String>,
236    pub user: Option<String>,
237    pub text: Option<String>,
238    pub ts: Option<String>,
239    pub thread_ts: Option<String>,
240}
241
242#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
243pub struct SlackReactionAddedEventPayload {
244    #[serde(flatten)]
245    pub common: SlackEventCommon,
246    pub reaction: Option<String>,
247    pub item_user: Option<String>,
248    pub item: JsonValue,
249}
250
251#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
252pub struct SlackAppHomeOpenedEventPayload {
253    #[serde(flatten)]
254    pub common: SlackEventCommon,
255    pub user: Option<String>,
256    pub channel: Option<String>,
257    pub tab: Option<String>,
258    pub view: JsonValue,
259}
260
261#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
262pub struct SlackAssistantThreadStartedEventPayload {
263    #[serde(flatten)]
264    pub common: SlackEventCommon,
265    pub assistant_thread: JsonValue,
266    pub thread_ts: Option<String>,
267    pub context: JsonValue,
268}
269
270#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
271#[serde(untagged)]
272pub enum SlackEventPayload {
273    Message(SlackMessageEventPayload),
274    AppMention(SlackAppMentionEventPayload),
275    ReactionAdded(SlackReactionAddedEventPayload),
276    AppHomeOpened(SlackAppHomeOpenedEventPayload),
277    AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
278    Other(SlackEventCommon),
279}
280
281#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
282pub struct LinearEventCommon {
283    pub event: String,
284    pub action: Option<String>,
285    pub delivery_id: Option<String>,
286    pub organization_id: Option<String>,
287    pub webhook_timestamp: Option<i64>,
288    pub webhook_id: Option<String>,
289    pub url: Option<String>,
290    pub created_at: Option<String>,
291    pub actor: JsonValue,
292    pub raw: JsonValue,
293}
294
295#[derive(Clone, Debug, PartialEq)]
296pub enum LinearIssueChange {
297    Title { previous: Option<String> },
298    Description { previous: Option<String> },
299    Priority { previous: Option<i64> },
300    Estimate { previous: Option<i64> },
301    StateId { previous: Option<String> },
302    TeamId { previous: Option<String> },
303    AssigneeId { previous: Option<String> },
304    ProjectId { previous: Option<String> },
305    CycleId { previous: Option<String> },
306    DueDate { previous: Option<String> },
307    ParentId { previous: Option<String> },
308    SortOrder { previous: Option<f64> },
309    LabelIds { previous: Vec<String> },
310    CompletedAt { previous: Option<String> },
311    Other { field: String, previous: JsonValue },
312}
313
314impl Serialize for LinearIssueChange {
315    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
316    where
317        S: Serializer,
318    {
319        let value = match self {
320            Self::Title { previous } => {
321                serde_json::json!({ "field_name": "title", "previous": previous })
322            }
323            Self::Description { previous } => {
324                serde_json::json!({ "field_name": "description", "previous": previous })
325            }
326            Self::Priority { previous } => {
327                serde_json::json!({ "field_name": "priority", "previous": previous })
328            }
329            Self::Estimate { previous } => {
330                serde_json::json!({ "field_name": "estimate", "previous": previous })
331            }
332            Self::StateId { previous } => {
333                serde_json::json!({ "field_name": "state_id", "previous": previous })
334            }
335            Self::TeamId { previous } => {
336                serde_json::json!({ "field_name": "team_id", "previous": previous })
337            }
338            Self::AssigneeId { previous } => {
339                serde_json::json!({ "field_name": "assignee_id", "previous": previous })
340            }
341            Self::ProjectId { previous } => {
342                serde_json::json!({ "field_name": "project_id", "previous": previous })
343            }
344            Self::CycleId { previous } => {
345                serde_json::json!({ "field_name": "cycle_id", "previous": previous })
346            }
347            Self::DueDate { previous } => {
348                serde_json::json!({ "field_name": "due_date", "previous": previous })
349            }
350            Self::ParentId { previous } => {
351                serde_json::json!({ "field_name": "parent_id", "previous": previous })
352            }
353            Self::SortOrder { previous } => {
354                serde_json::json!({ "field_name": "sort_order", "previous": previous })
355            }
356            Self::LabelIds { previous } => {
357                serde_json::json!({ "field_name": "label_ids", "previous": previous })
358            }
359            Self::CompletedAt { previous } => {
360                serde_json::json!({ "field_name": "completed_at", "previous": previous })
361            }
362            Self::Other { field, previous } => {
363                serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
364            }
365        };
366        value.serialize(serializer)
367    }
368}
369
370impl<'de> Deserialize<'de> for LinearIssueChange {
371    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
372    where
373        D: Deserializer<'de>,
374    {
375        let value = JsonValue::deserialize(deserializer)?;
376        let field_name = value
377            .get("field_name")
378            .and_then(JsonValue::as_str)
379            .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
380        let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
381        Ok(match field_name {
382            "title" => Self::Title {
383                previous: previous.as_str().map(ToString::to_string),
384            },
385            "description" => Self::Description {
386                previous: previous.as_str().map(ToString::to_string),
387            },
388            "priority" => Self::Priority {
389                previous: parse_json_i64ish(&previous),
390            },
391            "estimate" => Self::Estimate {
392                previous: parse_json_i64ish(&previous),
393            },
394            "state_id" => Self::StateId {
395                previous: previous.as_str().map(ToString::to_string),
396            },
397            "team_id" => Self::TeamId {
398                previous: previous.as_str().map(ToString::to_string),
399            },
400            "assignee_id" => Self::AssigneeId {
401                previous: previous.as_str().map(ToString::to_string),
402            },
403            "project_id" => Self::ProjectId {
404                previous: previous.as_str().map(ToString::to_string),
405            },
406            "cycle_id" => Self::CycleId {
407                previous: previous.as_str().map(ToString::to_string),
408            },
409            "due_date" => Self::DueDate {
410                previous: previous.as_str().map(ToString::to_string),
411            },
412            "parent_id" => Self::ParentId {
413                previous: previous.as_str().map(ToString::to_string),
414            },
415            "sort_order" => Self::SortOrder {
416                previous: previous.as_f64(),
417            },
418            "label_ids" => Self::LabelIds {
419                previous: parse_string_array(&previous),
420            },
421            "completed_at" => Self::CompletedAt {
422                previous: previous.as_str().map(ToString::to_string),
423            },
424            "other" => Self::Other {
425                field: value
426                    .get("field")
427                    .and_then(JsonValue::as_str)
428                    .map(ToString::to_string)
429                    .unwrap_or_else(|| "unknown".to_string()),
430                previous,
431            },
432            other => Self::Other {
433                field: other.to_string(),
434                previous,
435            },
436        })
437    }
438}
439
440#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
441pub struct LinearIssueEventPayload {
442    #[serde(flatten)]
443    pub common: LinearEventCommon,
444    pub issue: JsonValue,
445    #[serde(default)]
446    pub changes: Vec<LinearIssueChange>,
447}
448
449#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
450pub struct LinearIssueCommentEventPayload {
451    #[serde(flatten)]
452    pub common: LinearEventCommon,
453    pub comment: JsonValue,
454}
455
456#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
457pub struct LinearIssueLabelEventPayload {
458    #[serde(flatten)]
459    pub common: LinearEventCommon,
460    pub label: JsonValue,
461}
462
463#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
464pub struct LinearProjectEventPayload {
465    #[serde(flatten)]
466    pub common: LinearEventCommon,
467    pub project: JsonValue,
468}
469
470#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
471pub struct LinearCycleEventPayload {
472    #[serde(flatten)]
473    pub common: LinearEventCommon,
474    pub cycle: JsonValue,
475}
476
477#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
478pub struct LinearCustomerEventPayload {
479    #[serde(flatten)]
480    pub common: LinearEventCommon,
481    pub customer: JsonValue,
482}
483
484#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
485pub struct LinearCustomerRequestEventPayload {
486    #[serde(flatten)]
487    pub common: LinearEventCommon,
488    pub customer_request: JsonValue,
489}
490
491#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
492#[serde(untagged)]
493pub enum LinearEventPayload {
494    Issue(LinearIssueEventPayload),
495    IssueComment(LinearIssueCommentEventPayload),
496    IssueLabel(LinearIssueLabelEventPayload),
497    Project(LinearProjectEventPayload),
498    Cycle(LinearCycleEventPayload),
499    Customer(LinearCustomerEventPayload),
500    CustomerRequest(LinearCustomerRequestEventPayload),
501    Other(LinearEventCommon),
502}
503
504#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
505pub struct NotionPolledChangeEvent {
506    pub resource: String,
507    pub source_id: String,
508    pub entity_id: String,
509    pub high_water_mark: String,
510    pub before: Option<JsonValue>,
511    pub after: JsonValue,
512}
513
514#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
515pub struct NotionEventPayload {
516    pub event: String,
517    pub workspace_id: Option<String>,
518    pub request_id: Option<String>,
519    pub subscription_id: Option<String>,
520    pub integration_id: Option<String>,
521    pub attempt_number: Option<u32>,
522    pub entity_id: Option<String>,
523    pub entity_type: Option<String>,
524    pub api_version: Option<String>,
525    pub verification_token: Option<String>,
526    pub polled: Option<NotionPolledChangeEvent>,
527    pub raw: JsonValue,
528}
529
530#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
531pub struct CronEventPayload {
532    pub cron_id: Option<String>,
533    pub schedule: Option<String>,
534    #[serde(with = "time::serde::rfc3339")]
535    pub tick_at: OffsetDateTime,
536    pub raw: JsonValue,
537}
538
539#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
540pub struct GenericWebhookPayload {
541    pub source: Option<String>,
542    pub content_type: Option<String>,
543    pub raw: JsonValue,
544}
545
546#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
547pub struct A2aPushPayload {
548    pub task_id: Option<String>,
549    pub task_state: Option<String>,
550    pub artifact: Option<JsonValue>,
551    pub sender: Option<String>,
552    pub raw: JsonValue,
553    pub kind: String,
554}
555
556#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
557pub struct StreamEventPayload {
558    pub event: String,
559    pub source: Option<String>,
560    pub stream: Option<String>,
561    pub partition: Option<String>,
562    pub offset: Option<String>,
563    pub key: Option<String>,
564    pub timestamp: Option<String>,
565    #[serde(default)]
566    pub headers: BTreeMap<String, String>,
567    pub raw: JsonValue,
568}
569
570#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
571pub struct ExtensionProviderPayload {
572    pub provider: String,
573    pub schema_name: String,
574    pub raw: JsonValue,
575}
576
577#[allow(clippy::large_enum_variant)]
578#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
579#[serde(untagged)]
580pub enum ProviderPayload {
581    Known(KnownProviderPayload),
582    Extension(ExtensionProviderPayload),
583}
584
585impl ProviderPayload {
586    pub fn provider(&self) -> &str {
587        match self {
588            Self::Known(known) => known.provider(),
589            Self::Extension(payload) => payload.provider.as_str(),
590        }
591    }
592
593    pub fn normalize(
594        provider: &ProviderId,
595        kind: &str,
596        headers: &BTreeMap<String, String>,
597        raw: JsonValue,
598    ) -> Result<Self, ProviderCatalogError> {
599        provider_catalog()
600            .read()
601            .expect("provider catalog poisoned")
602            .normalize(provider, kind, headers, raw)
603    }
604}
605
606#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
607#[serde(tag = "provider")]
608pub enum KnownProviderPayload {
609    #[serde(rename = "github")]
610    GitHub(GitHubEventPayload),
611    #[serde(rename = "slack")]
612    Slack(Box<SlackEventPayload>),
613    #[serde(rename = "linear")]
614    Linear(LinearEventPayload),
615    #[serde(rename = "notion")]
616    Notion(Box<NotionEventPayload>),
617    #[serde(rename = "cron")]
618    Cron(CronEventPayload),
619    #[serde(rename = "webhook")]
620    Webhook(GenericWebhookPayload),
621    #[serde(rename = "a2a-push")]
622    A2aPush(A2aPushPayload),
623    #[serde(rename = "kafka")]
624    Kafka(StreamEventPayload),
625    #[serde(rename = "nats")]
626    Nats(StreamEventPayload),
627    #[serde(rename = "pulsar")]
628    Pulsar(StreamEventPayload),
629    #[serde(rename = "postgres-cdc")]
630    PostgresCdc(StreamEventPayload),
631    #[serde(rename = "email")]
632    Email(StreamEventPayload),
633    #[serde(rename = "websocket")]
634    Websocket(StreamEventPayload),
635}
636
637impl KnownProviderPayload {
638    pub fn provider(&self) -> &str {
639        match self {
640            Self::GitHub(_) => "github",
641            Self::Slack(_) => "slack",
642            Self::Linear(_) => "linear",
643            Self::Notion(_) => "notion",
644            Self::Cron(_) => "cron",
645            Self::Webhook(_) => "webhook",
646            Self::A2aPush(_) => "a2a-push",
647            Self::Kafka(_) => "kafka",
648            Self::Nats(_) => "nats",
649            Self::Pulsar(_) => "pulsar",
650            Self::PostgresCdc(_) => "postgres-cdc",
651            Self::Email(_) => "email",
652            Self::Websocket(_) => "websocket",
653        }
654    }
655}
656
657#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
658pub struct TriggerEvent {
659    pub id: TriggerEventId,
660    pub provider: ProviderId,
661    pub kind: String,
662    #[serde(with = "time::serde::rfc3339")]
663    pub received_at: OffsetDateTime,
664    #[serde(with = "time::serde::rfc3339::option")]
665    pub occurred_at: Option<OffsetDateTime>,
666    pub dedupe_key: String,
667    pub trace_id: TraceId,
668    pub tenant_id: Option<TenantId>,
669    pub headers: BTreeMap<String, String>,
670    #[serde(default, skip_serializing_if = "Option::is_none")]
671    pub batch: Option<Vec<JsonValue>>,
672    #[serde(
673        default,
674        skip_serializing_if = "Option::is_none",
675        serialize_with = "serialize_optional_bytes_b64",
676        deserialize_with = "deserialize_optional_bytes_b64"
677    )]
678    pub raw_body: Option<Vec<u8>>,
679    pub provider_payload: ProviderPayload,
680    pub signature_status: SignatureStatus,
681    #[serde(skip)]
682    pub dedupe_claimed: bool,
683}
684
685impl TriggerEvent {
686    #[allow(clippy::too_many_arguments)]
687    pub fn new(
688        provider: ProviderId,
689        kind: impl Into<String>,
690        occurred_at: Option<OffsetDateTime>,
691        dedupe_key: impl Into<String>,
692        tenant_id: Option<TenantId>,
693        headers: BTreeMap<String, String>,
694        provider_payload: ProviderPayload,
695        signature_status: SignatureStatus,
696    ) -> Self {
697        Self {
698            id: TriggerEventId::new(),
699            provider,
700            kind: kind.into(),
701            received_at: clock::now_utc(),
702            occurred_at,
703            dedupe_key: dedupe_key.into(),
704            trace_id: TraceId::new(),
705            tenant_id,
706            headers,
707            batch: None,
708            raw_body: None,
709            provider_payload,
710            signature_status,
711            dedupe_claimed: false,
712        }
713    }
714
715    pub fn dedupe_claimed(&self) -> bool {
716        self.dedupe_claimed
717    }
718
719    pub fn mark_dedupe_claimed(&mut self) {
720        self.dedupe_claimed = true;
721    }
722}
723
/// Decides which header values are hidden by `redact_headers`.
///
/// Header names are compared case-insensitively (ASCII lowercase).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct HeaderRedactionPolicy {
    /// Lowercased header names that are always kept.
    safe_exact_names: BTreeSet<String>,
}

impl HeaderRedactionPolicy {
    /// Header names that are always kept, whatever `safe_exact_names` holds.
    /// The default policy also seeds `safe_exact_names` with this list.
    const BUILTIN_SAFE_NAMES: [&'static str; 16] = [
        "content-length",
        "content-type",
        "request-id",
        "user-agent",
        "x-a2a-delivery",
        "x-a2a-signature",
        "x-correlation-id",
        "x-github-delivery",
        "x-github-event",
        "x-github-hook-id",
        "x-hub-signature-256",
        "x-linear-signature",
        "x-notion-signature",
        "x-request-id",
        "x-slack-request-timestamp",
        "x-slack-signature",
    ];

    /// Substrings that mark a header name as sensitive.
    const SENSITIVE_FRAGMENTS: [&'static str; 5] =
        ["authorization", "cookie", "secret", "token", "key"];

    /// Adds `name` (lowercased) to the set of headers that are never redacted.
    pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
        let normalized = name.into().to_ascii_lowercase();
        self.safe_exact_names.insert(normalized);
        self
    }

    /// Returns `true` when the header must be passed through untouched: it is
    /// an exact safe name, ends in `-event` or `-delivery`, or contains
    /// `timestamp` or `request-id`.
    fn should_keep(&self, name: &str) -> bool {
        let lower = name.to_ascii_lowercase();
        self.safe_exact_names.contains(&lower)
            || Self::BUILTIN_SAFE_NAMES.contains(&lower.as_str())
            || lower.ends_with("-event")
            || lower.ends_with("-delivery")
            || lower.contains("timestamp")
            || lower.contains("request-id")
    }

    /// Returns `true` when the header is not kept by `should_keep` and its name
    /// contains one of the sensitive fragments.
    fn should_redact(&self, name: &str) -> bool {
        let lower = name.to_ascii_lowercase();
        !self.should_keep(&lower)
            && Self::SENSITIVE_FRAGMENTS
                .iter()
                .any(|fragment| lower.contains(*fragment))
    }
}

impl Default for HeaderRedactionPolicy {
    /// Policy whose safe set holds exactly the built-in safe header names.
    fn default() -> Self {
        let safe_exact_names = Self::BUILTIN_SAFE_NAMES
            .iter()
            .map(|name| name.to_string())
            .collect();
        Self { safe_exact_names }
    }
}
802
803pub fn redact_headers(
804    headers: &BTreeMap<String, String>,
805    policy: &HeaderRedactionPolicy,
806) -> BTreeMap<String, String> {
807    headers
808        .iter()
809        .map(|(name, value)| {
810            if policy.should_redact(name) {
811                (name.clone(), REDACTED_HEADER_VALUE.to_string())
812            } else {
813                (name.clone(), value.clone())
814            }
815        })
816        .collect()
817}
818
819#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
820pub struct ProviderSecretRequirement {
821    pub name: String,
822    pub required: bool,
823    pub namespace: String,
824}
825
826#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
827pub struct ProviderOutboundMethod {
828    pub name: String,
829}
830
831#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
832#[serde(tag = "kind", rename_all = "snake_case")]
833pub enum SignatureVerificationMetadata {
834    #[default]
835    None,
836    Hmac {
837        variant: String,
838        raw_body: bool,
839        signature_header: String,
840        timestamp_header: Option<String>,
841        id_header: Option<String>,
842        default_tolerance_secs: Option<i64>,
843        digest: String,
844        encoding: String,
845    },
846}
847
848#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
849#[serde(tag = "kind", rename_all = "snake_case")]
850pub enum ProviderRuntimeMetadata {
851    Builtin {
852        connector: String,
853        default_signature_variant: Option<String>,
854    },
855    #[default]
856    Placeholder,
857}
858
859#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
860pub struct ProviderMetadata {
861    pub provider: String,
862    #[serde(default)]
863    pub kinds: Vec<String>,
864    pub schema_name: String,
865    #[serde(default)]
866    pub outbound_methods: Vec<ProviderOutboundMethod>,
867    #[serde(default)]
868    pub secret_requirements: Vec<ProviderSecretRequirement>,
869    #[serde(default)]
870    pub signature_verification: SignatureVerificationMetadata,
871    #[serde(default)]
872    pub runtime: ProviderRuntimeMetadata,
873}
874
875impl ProviderMetadata {
876    pub fn supports_kind(&self, kind: &str) -> bool {
877        self.kinds.iter().any(|candidate| candidate == kind)
878    }
879
880    pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
881        self.secret_requirements
882            .iter()
883            .filter(|requirement| requirement.required)
884            .map(|requirement| requirement.name.as_str())
885    }
886}
887
888pub trait ProviderSchema: Send + Sync {
889    fn provider_id(&self) -> &'static str;
890    fn harn_schema_name(&self) -> &'static str;
891    fn metadata(&self) -> ProviderMetadata {
892        ProviderMetadata {
893            provider: self.provider_id().to_string(),
894            schema_name: self.harn_schema_name().to_string(),
895            ..ProviderMetadata::default()
896        }
897    }
898    fn normalize(
899        &self,
900        kind: &str,
901        headers: &BTreeMap<String, String>,
902        raw: JsonValue,
903    ) -> Result<ProviderPayload, ProviderCatalogError>;
904}
905
/// Errors reported by the provider catalog. Each variant carries the provider name.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProviderCatalogError {
    /// A provider with this name is already registered.
    DuplicateProvider(String),
    /// No provider with this name is registered.
    UnknownProvider(String),
}

impl std::fmt::Display for ProviderCatalogError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (provider, problem) = match self {
            Self::DuplicateProvider(provider) => (provider, "is already registered"),
            Self::UnknownProvider(provider) => (provider, "is not registered"),
        };
        write!(f, "provider `{provider}` {problem}")
    }
}

impl std::error::Error for ProviderCatalogError {}
924
925#[derive(Clone, Default)]
926pub struct ProviderCatalog {
927    providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
928}
929
930impl ProviderCatalog {
931    pub fn with_defaults() -> Self {
932        let mut catalog = Self::default();
933        for schema in default_provider_schemas() {
934            catalog
935                .register(schema)
936                .expect("default providers must register cleanly");
937        }
938        catalog
939    }
940
941    pub fn register(
942        &mut self,
943        schema: Arc<dyn ProviderSchema>,
944    ) -> Result<(), ProviderCatalogError> {
945        let provider = schema.provider_id().to_string();
946        if self.providers.contains_key(provider.as_str()) {
947            return Err(ProviderCatalogError::DuplicateProvider(provider));
948        }
949        self.providers.insert(provider, schema);
950        Ok(())
951    }
952
953    pub fn normalize(
954        &self,
955        provider: &ProviderId,
956        kind: &str,
957        headers: &BTreeMap<String, String>,
958        raw: JsonValue,
959    ) -> Result<ProviderPayload, ProviderCatalogError> {
960        let schema = self
961            .providers
962            .get(provider.as_str())
963            .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
964        schema.normalize(kind, headers, raw)
965    }
966
967    pub fn schema_names(&self) -> BTreeMap<String, String> {
968        self.providers
969            .iter()
970            .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
971            .collect()
972    }
973
974    pub fn entries(&self) -> Vec<ProviderMetadata> {
975        self.providers
976            .values()
977            .map(|schema| schema.metadata())
978            .collect()
979    }
980
981    pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
982        self.providers.get(provider).map(|schema| schema.metadata())
983    }
984}
985
986pub fn register_provider_schema(
987    schema: Arc<dyn ProviderSchema>,
988) -> Result<(), ProviderCatalogError> {
989    provider_catalog()
990        .write()
991        .expect("provider catalog poisoned")
992        .register(schema)
993}
994
995pub fn reset_provider_catalog() {
996    *provider_catalog()
997        .write()
998        .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
999}
1000
1001pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
1002    provider_catalog()
1003        .read()
1004        .expect("provider catalog poisoned")
1005        .schema_names()
1006}
1007
1008pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
1009    provider_catalog()
1010        .read()
1011        .expect("provider catalog poisoned")
1012        .entries()
1013}
1014
1015pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
1016    provider_catalog()
1017        .read()
1018        .expect("provider catalog poisoned")
1019        .metadata_for(provider)
1020}
1021
1022fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
1023    static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
1024    PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
1025}
1026
1027struct BuiltinProviderSchema {
1028    provider_id: &'static str,
1029    harn_schema_name: &'static str,
1030    metadata: ProviderMetadata,
1031    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1032}
1033
1034impl ProviderSchema for BuiltinProviderSchema {
1035    fn provider_id(&self) -> &'static str {
1036        self.provider_id
1037    }
1038
1039    fn harn_schema_name(&self) -> &'static str {
1040        self.harn_schema_name
1041    }
1042
1043    fn metadata(&self) -> ProviderMetadata {
1044        self.metadata.clone()
1045    }
1046
1047    fn normalize(
1048        &self,
1049        kind: &str,
1050        headers: &BTreeMap<String, String>,
1051        raw: JsonValue,
1052    ) -> Result<ProviderPayload, ProviderCatalogError> {
1053        Ok((self.normalize)(kind, headers, raw))
1054    }
1055}
1056
1057fn provider_metadata_entry(
1058    provider: &str,
1059    kinds: &[&str],
1060    schema_name: &str,
1061    outbound_methods: &[&str],
1062    signature_verification: SignatureVerificationMetadata,
1063    secret_requirements: Vec<ProviderSecretRequirement>,
1064    runtime: ProviderRuntimeMetadata,
1065) -> ProviderMetadata {
1066    ProviderMetadata {
1067        provider: provider.to_string(),
1068        kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
1069        schema_name: schema_name.to_string(),
1070        outbound_methods: outbound_methods
1071            .iter()
1072            .map(|name| ProviderOutboundMethod {
1073                name: (*name).to_string(),
1074            })
1075            .collect(),
1076        secret_requirements,
1077        signature_verification,
1078        runtime,
1079    }
1080}
1081
1082fn hmac_signature_metadata(
1083    variant: &str,
1084    signature_header: &str,
1085    timestamp_header: Option<&str>,
1086    id_header: Option<&str>,
1087    default_tolerance_secs: Option<i64>,
1088    encoding: &str,
1089) -> SignatureVerificationMetadata {
1090    SignatureVerificationMetadata::Hmac {
1091        variant: variant.to_string(),
1092        raw_body: true,
1093        signature_header: signature_header.to_string(),
1094        timestamp_header: timestamp_header.map(ToString::to_string),
1095        id_header: id_header.map(ToString::to_string),
1096        default_tolerance_secs,
1097        digest: "sha256".to_string(),
1098        encoding: encoding.to_string(),
1099    }
1100}
1101
1102fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1103    ProviderSecretRequirement {
1104        name: name.to_string(),
1105        required: true,
1106        namespace: namespace.to_string(),
1107    }
1108}
1109
1110fn outbound_method(name: &str) -> ProviderOutboundMethod {
1111    ProviderOutboundMethod {
1112        name: name.to_string(),
1113    }
1114}
1115
1116fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1117    ProviderSecretRequirement {
1118        name: name.to_string(),
1119        required: false,
1120        namespace: namespace.to_string(),
1121    }
1122}
1123
1124fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1125    vec![
1126        Arc::new(BuiltinProviderSchema {
1127            provider_id: "github",
1128            harn_schema_name: "GitHubEventPayload",
1129            metadata: provider_metadata_entry(
1130                "github",
1131                &["webhook"],
1132                "GitHubEventPayload",
1133                &[],
1134                hmac_signature_metadata(
1135                    "github",
1136                    "X-Hub-Signature-256",
1137                    None,
1138                    Some("X-GitHub-Delivery"),
1139                    None,
1140                    "hex",
1141                ),
1142                vec![required_secret("signing_secret", "github")],
1143                ProviderRuntimeMetadata::Builtin {
1144                    connector: "webhook".to_string(),
1145                    default_signature_variant: Some("github".to_string()),
1146                },
1147            ),
1148            normalize: github_payload,
1149        }),
1150        Arc::new(BuiltinProviderSchema {
1151            provider_id: "slack",
1152            harn_schema_name: "SlackEventPayload",
1153            metadata: provider_metadata_entry(
1154                "slack",
1155                &["webhook"],
1156                "SlackEventPayload",
1157                &[
1158                    "post_message",
1159                    "update_message",
1160                    "add_reaction",
1161                    "open_view",
1162                    "user_info",
1163                    "api_call",
1164                    "upload_file",
1165                ],
1166                hmac_signature_metadata(
1167                    "slack",
1168                    "X-Slack-Signature",
1169                    Some("X-Slack-Request-Timestamp"),
1170                    None,
1171                    Some(300),
1172                    "hex",
1173                ),
1174                vec![required_secret("signing_secret", "slack")],
1175                ProviderRuntimeMetadata::Builtin {
1176                    connector: "slack".to_string(),
1177                    default_signature_variant: Some("slack".to_string()),
1178                },
1179            ),
1180            normalize: slack_payload,
1181        }),
1182        Arc::new(BuiltinProviderSchema {
1183            provider_id: "linear",
1184            harn_schema_name: "LinearEventPayload",
1185            metadata: {
1186                let mut metadata = provider_metadata_entry(
1187                    "linear",
1188                    &["webhook"],
1189                    "LinearEventPayload",
1190                    &[],
1191                    hmac_signature_metadata(
1192                        "linear",
1193                        "Linear-Signature",
1194                        None,
1195                        Some("Linear-Delivery"),
1196                        Some(75),
1197                        "hex",
1198                    ),
1199                    vec![
1200                        required_secret("signing_secret", "linear"),
1201                        optional_secret("access_token", "linear"),
1202                    ],
1203                    ProviderRuntimeMetadata::Builtin {
1204                        connector: "linear".to_string(),
1205                        default_signature_variant: Some("linear".to_string()),
1206                    },
1207                );
1208                metadata.outbound_methods = vec![
1209                    ProviderOutboundMethod {
1210                        name: "list_issues".to_string(),
1211                    },
1212                    ProviderOutboundMethod {
1213                        name: "update_issue".to_string(),
1214                    },
1215                    ProviderOutboundMethod {
1216                        name: "create_comment".to_string(),
1217                    },
1218                    ProviderOutboundMethod {
1219                        name: "search".to_string(),
1220                    },
1221                    ProviderOutboundMethod {
1222                        name: "graphql".to_string(),
1223                    },
1224                ];
1225                metadata
1226            },
1227            normalize: linear_payload,
1228        }),
1229        Arc::new(BuiltinProviderSchema {
1230            provider_id: "notion",
1231            harn_schema_name: "NotionEventPayload",
1232            metadata: {
1233                let mut metadata = provider_metadata_entry(
1234                    "notion",
1235                    &["webhook", "poll"],
1236                    "NotionEventPayload",
1237                    &[],
1238                    hmac_signature_metadata(
1239                        "notion",
1240                        "X-Notion-Signature",
1241                        None,
1242                        None,
1243                        None,
1244                        "hex",
1245                    ),
1246                    vec![required_secret("verification_token", "notion")],
1247                    ProviderRuntimeMetadata::Builtin {
1248                        connector: "notion".to_string(),
1249                        default_signature_variant: Some("notion".to_string()),
1250                    },
1251                );
1252                metadata.outbound_methods = vec![
1253                    outbound_method("get_page"),
1254                    outbound_method("update_page"),
1255                    outbound_method("append_blocks"),
1256                    outbound_method("query_database"),
1257                    outbound_method("search"),
1258                    outbound_method("create_comment"),
1259                    outbound_method("api_call"),
1260                ];
1261                metadata
1262            },
1263            normalize: notion_payload,
1264        }),
1265        Arc::new(BuiltinProviderSchema {
1266            provider_id: "cron",
1267            harn_schema_name: "CronEventPayload",
1268            metadata: provider_metadata_entry(
1269                "cron",
1270                &["cron"],
1271                "CronEventPayload",
1272                &[],
1273                SignatureVerificationMetadata::None,
1274                Vec::new(),
1275                ProviderRuntimeMetadata::Builtin {
1276                    connector: "cron".to_string(),
1277                    default_signature_variant: None,
1278                },
1279            ),
1280            normalize: cron_payload,
1281        }),
1282        Arc::new(BuiltinProviderSchema {
1283            provider_id: "webhook",
1284            harn_schema_name: "GenericWebhookPayload",
1285            metadata: provider_metadata_entry(
1286                "webhook",
1287                &["webhook"],
1288                "GenericWebhookPayload",
1289                &[],
1290                hmac_signature_metadata(
1291                    "standard",
1292                    "webhook-signature",
1293                    Some("webhook-timestamp"),
1294                    Some("webhook-id"),
1295                    Some(300),
1296                    "base64",
1297                ),
1298                vec![required_secret("signing_secret", "webhook")],
1299                ProviderRuntimeMetadata::Builtin {
1300                    connector: "webhook".to_string(),
1301                    default_signature_variant: Some("standard".to_string()),
1302                },
1303            ),
1304            normalize: webhook_payload,
1305        }),
1306        Arc::new(BuiltinProviderSchema {
1307            provider_id: "a2a-push",
1308            harn_schema_name: "A2aPushPayload",
1309            metadata: provider_metadata_entry(
1310                "a2a-push",
1311                &["a2a-push"],
1312                "A2aPushPayload",
1313                &[],
1314                SignatureVerificationMetadata::None,
1315                Vec::new(),
1316                ProviderRuntimeMetadata::Builtin {
1317                    connector: "a2a-push".to_string(),
1318                    default_signature_variant: None,
1319                },
1320            ),
1321            normalize: a2a_push_payload,
1322        }),
1323        Arc::new(stream_provider_schema("kafka", kafka_payload)),
1324        Arc::new(stream_provider_schema("nats", nats_payload)),
1325        Arc::new(stream_provider_schema("pulsar", pulsar_payload)),
1326        Arc::new(stream_provider_schema("postgres-cdc", postgres_cdc_payload)),
1327        Arc::new(stream_provider_schema("email", email_payload)),
1328        Arc::new(stream_provider_schema("websocket", websocket_payload)),
1329    ]
1330}
1331
1332fn stream_provider_schema(
1333    provider_id: &'static str,
1334    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1335) -> BuiltinProviderSchema {
1336    BuiltinProviderSchema {
1337        provider_id,
1338        harn_schema_name: "StreamEventPayload",
1339        metadata: provider_metadata_entry(
1340            provider_id,
1341            &["stream"],
1342            "StreamEventPayload",
1343            &[],
1344            SignatureVerificationMetadata::None,
1345            Vec::new(),
1346            ProviderRuntimeMetadata::Builtin {
1347                connector: "stream".to_string(),
1348                default_signature_variant: None,
1349            },
1350        ),
1351        normalize,
1352    }
1353}
1354
1355fn github_payload(
1356    kind: &str,
1357    headers: &BTreeMap<String, String>,
1358    raw: JsonValue,
1359) -> ProviderPayload {
1360    let common = GitHubEventCommon {
1361        event: kind.to_string(),
1362        action: raw
1363            .get("action")
1364            .and_then(JsonValue::as_str)
1365            .map(ToString::to_string),
1366        delivery_id: headers.get("X-GitHub-Delivery").cloned(),
1367        installation_id: raw
1368            .get("installation")
1369            .and_then(|value| value.get("id"))
1370            .and_then(JsonValue::as_i64),
1371        raw: raw.clone(),
1372    };
1373    let payload = match kind {
1374        "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1375            common,
1376            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1377        }),
1378        "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1379            common,
1380            pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1381        }),
1382        "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1383            common,
1384            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1385            comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1386        }),
1387        "pull_request_review" => {
1388            GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1389                common,
1390                pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1391                review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1392            })
1393        }
1394        "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1395            common,
1396            commits: raw
1397                .get("commits")
1398                .and_then(JsonValue::as_array)
1399                .cloned()
1400                .unwrap_or_default(),
1401            distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1402        }),
1403        "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1404            common,
1405            workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1406        }),
1407        "deployment_status" => {
1408            GitHubEventPayload::DeploymentStatus(GitHubDeploymentStatusEventPayload {
1409                common,
1410                deployment_status: raw
1411                    .get("deployment_status")
1412                    .cloned()
1413                    .unwrap_or(JsonValue::Null),
1414                deployment: raw.get("deployment").cloned().unwrap_or(JsonValue::Null),
1415            })
1416        }
1417        "check_run" => GitHubEventPayload::CheckRun(GitHubCheckRunEventPayload {
1418            common,
1419            check_run: raw.get("check_run").cloned().unwrap_or(JsonValue::Null),
1420        }),
1421        _ => GitHubEventPayload::Other(common),
1422    };
1423    ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1424}
1425
1426fn slack_payload(
1427    kind: &str,
1428    _headers: &BTreeMap<String, String>,
1429    raw: JsonValue,
1430) -> ProviderPayload {
1431    let event = raw.get("event");
1432    let common = SlackEventCommon {
1433        event: kind.to_string(),
1434        event_id: raw
1435            .get("event_id")
1436            .and_then(JsonValue::as_str)
1437            .map(ToString::to_string),
1438        api_app_id: raw
1439            .get("api_app_id")
1440            .and_then(JsonValue::as_str)
1441            .map(ToString::to_string),
1442        team_id: raw
1443            .get("team_id")
1444            .and_then(JsonValue::as_str)
1445            .map(ToString::to_string),
1446        channel_id: slack_channel_id(event),
1447        user_id: slack_user_id(event),
1448        event_ts: event
1449            .and_then(|value| value.get("event_ts"))
1450            .and_then(JsonValue::as_str)
1451            .map(ToString::to_string),
1452        raw: raw.clone(),
1453    };
1454    let payload = match kind {
1455        kind if kind == "message" || kind.starts_with("message.") => {
1456            SlackEventPayload::Message(SlackMessageEventPayload {
1457                subtype: event
1458                    .and_then(|value| value.get("subtype"))
1459                    .and_then(JsonValue::as_str)
1460                    .map(ToString::to_string),
1461                channel_type: event
1462                    .and_then(|value| value.get("channel_type"))
1463                    .and_then(JsonValue::as_str)
1464                    .map(ToString::to_string),
1465                channel: event
1466                    .and_then(|value| value.get("channel"))
1467                    .and_then(JsonValue::as_str)
1468                    .map(ToString::to_string),
1469                user: event
1470                    .and_then(|value| value.get("user"))
1471                    .and_then(JsonValue::as_str)
1472                    .map(ToString::to_string),
1473                text: event
1474                    .and_then(|value| value.get("text"))
1475                    .and_then(JsonValue::as_str)
1476                    .map(ToString::to_string),
1477                ts: event
1478                    .and_then(|value| value.get("ts"))
1479                    .and_then(JsonValue::as_str)
1480                    .map(ToString::to_string),
1481                thread_ts: event
1482                    .and_then(|value| value.get("thread_ts"))
1483                    .and_then(JsonValue::as_str)
1484                    .map(ToString::to_string),
1485                common,
1486            })
1487        }
1488        "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1489            channel: event
1490                .and_then(|value| value.get("channel"))
1491                .and_then(JsonValue::as_str)
1492                .map(ToString::to_string),
1493            user: event
1494                .and_then(|value| value.get("user"))
1495                .and_then(JsonValue::as_str)
1496                .map(ToString::to_string),
1497            text: event
1498                .and_then(|value| value.get("text"))
1499                .and_then(JsonValue::as_str)
1500                .map(ToString::to_string),
1501            ts: event
1502                .and_then(|value| value.get("ts"))
1503                .and_then(JsonValue::as_str)
1504                .map(ToString::to_string),
1505            thread_ts: event
1506                .and_then(|value| value.get("thread_ts"))
1507                .and_then(JsonValue::as_str)
1508                .map(ToString::to_string),
1509            common,
1510        }),
1511        "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1512            reaction: event
1513                .and_then(|value| value.get("reaction"))
1514                .and_then(JsonValue::as_str)
1515                .map(ToString::to_string),
1516            item_user: event
1517                .and_then(|value| value.get("item_user"))
1518                .and_then(JsonValue::as_str)
1519                .map(ToString::to_string),
1520            item: event
1521                .and_then(|value| value.get("item"))
1522                .cloned()
1523                .unwrap_or(JsonValue::Null),
1524            common,
1525        }),
1526        "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1527            user: event
1528                .and_then(|value| value.get("user"))
1529                .and_then(JsonValue::as_str)
1530                .map(ToString::to_string),
1531            channel: event
1532                .and_then(|value| value.get("channel"))
1533                .and_then(JsonValue::as_str)
1534                .map(ToString::to_string),
1535            tab: event
1536                .and_then(|value| value.get("tab"))
1537                .and_then(JsonValue::as_str)
1538                .map(ToString::to_string),
1539            view: event
1540                .and_then(|value| value.get("view"))
1541                .cloned()
1542                .unwrap_or(JsonValue::Null),
1543            common,
1544        }),
1545        "assistant_thread_started" => {
1546            let assistant_thread = event
1547                .and_then(|value| value.get("assistant_thread"))
1548                .cloned()
1549                .unwrap_or(JsonValue::Null);
1550            SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1551                thread_ts: assistant_thread
1552                    .get("thread_ts")
1553                    .and_then(JsonValue::as_str)
1554                    .map(ToString::to_string),
1555                context: assistant_thread
1556                    .get("context")
1557                    .cloned()
1558                    .unwrap_or(JsonValue::Null),
1559                assistant_thread,
1560                common,
1561            })
1562        }
1563        _ => SlackEventPayload::Other(common),
1564    };
1565    ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1566}
1567
1568fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1569    event
1570        .and_then(|value| value.get("channel"))
1571        .and_then(JsonValue::as_str)
1572        .map(ToString::to_string)
1573        .or_else(|| {
1574            event
1575                .and_then(|value| value.get("item"))
1576                .and_then(|value| value.get("channel"))
1577                .and_then(JsonValue::as_str)
1578                .map(ToString::to_string)
1579        })
1580        .or_else(|| {
1581            event
1582                .and_then(|value| value.get("channel"))
1583                .and_then(|value| value.get("id"))
1584                .and_then(JsonValue::as_str)
1585                .map(ToString::to_string)
1586        })
1587        .or_else(|| {
1588            event
1589                .and_then(|value| value.get("assistant_thread"))
1590                .and_then(|value| value.get("channel_id"))
1591                .and_then(JsonValue::as_str)
1592                .map(ToString::to_string)
1593        })
1594}
1595
1596fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1597    event
1598        .and_then(|value| value.get("user"))
1599        .and_then(JsonValue::as_str)
1600        .map(ToString::to_string)
1601        .or_else(|| {
1602            event
1603                .and_then(|value| value.get("user"))
1604                .and_then(|value| value.get("id"))
1605                .and_then(JsonValue::as_str)
1606                .map(ToString::to_string)
1607        })
1608        .or_else(|| {
1609            event
1610                .and_then(|value| value.get("item_user"))
1611                .and_then(JsonValue::as_str)
1612                .map(ToString::to_string)
1613        })
1614        .or_else(|| {
1615            event
1616                .and_then(|value| value.get("assistant_thread"))
1617                .and_then(|value| value.get("user_id"))
1618                .and_then(JsonValue::as_str)
1619                .map(ToString::to_string)
1620        })
1621}
1622
1623fn linear_payload(
1624    _kind: &str,
1625    headers: &BTreeMap<String, String>,
1626    raw: JsonValue,
1627) -> ProviderPayload {
1628    let common = linear_event_common(headers, &raw);
1629    let event = common.event.clone();
1630    let payload = match event.as_str() {
1631        "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1632            common,
1633            issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1634            changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1635        }),
1636        "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1637            common,
1638            comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1639        }),
1640        "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1641            common,
1642            label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1643        }),
1644        "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1645            common,
1646            project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1647        }),
1648        "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1649            common,
1650            cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1651        }),
1652        "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
1653            common,
1654            customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1655        }),
1656        "customer_request" => {
1657            LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
1658                common,
1659                customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1660            })
1661        }
1662        _ => LinearEventPayload::Other(common),
1663    };
1664    ProviderPayload::Known(KnownProviderPayload::Linear(payload))
1665}
1666
1667fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
1668    LinearEventCommon {
1669        event: linear_event_name(
1670            raw.get("type")
1671                .and_then(JsonValue::as_str)
1672                .or_else(|| headers.get("Linear-Event").map(String::as_str)),
1673        ),
1674        action: raw
1675            .get("action")
1676            .and_then(JsonValue::as_str)
1677            .map(ToString::to_string),
1678        delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
1679        organization_id: raw
1680            .get("organizationId")
1681            .and_then(JsonValue::as_str)
1682            .map(ToString::to_string),
1683        webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
1684        webhook_id: raw
1685            .get("webhookId")
1686            .and_then(JsonValue::as_str)
1687            .map(ToString::to_string),
1688        url: raw
1689            .get("url")
1690            .and_then(JsonValue::as_str)
1691            .map(ToString::to_string),
1692        created_at: raw
1693            .get("createdAt")
1694            .and_then(JsonValue::as_str)
1695            .map(ToString::to_string),
1696        actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
1697        raw: raw.clone(),
1698    }
1699}
1700
/// Canonicalizes a Linear event type name.
///
/// The input is ASCII-lowercased, known aliases are folded onto a single snake_case name,
/// and any other non-empty name is returned lowercased. A missing or empty name yields
/// `"other"`.
fn linear_event_name(raw_type: Option<&str>) -> String {
    let lowered = raw_type.map(str::to_ascii_lowercase).unwrap_or_default();
    let canonical = match lowered.as_str() {
        "" => "other",
        "issue" => "issue",
        "comment" | "issuecomment" | "issue_comment" => "comment",
        "issuelabel" | "issue_label" => "issue_label",
        "project" | "projectupdate" | "project_update" => "project",
        "cycle" => "cycle",
        "customer" => "customer",
        "customerrequest" | "customer_request" => "customer_request",
        unrecognized => unrecognized,
    };
    canonical.to_string()
}
1714
1715fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
1716    let Some(JsonValue::Object(fields)) = updated_from else {
1717        return Vec::new();
1718    };
1719    let mut changes = Vec::new();
1720    for (field, previous) in fields {
1721        let change = match field.as_str() {
1722            "title" => LinearIssueChange::Title {
1723                previous: previous.as_str().map(ToString::to_string),
1724            },
1725            "description" => LinearIssueChange::Description {
1726                previous: previous.as_str().map(ToString::to_string),
1727            },
1728            "priority" => LinearIssueChange::Priority {
1729                previous: parse_json_i64ish(previous),
1730            },
1731            "estimate" => LinearIssueChange::Estimate {
1732                previous: parse_json_i64ish(previous),
1733            },
1734            "stateId" => LinearIssueChange::StateId {
1735                previous: previous.as_str().map(ToString::to_string),
1736            },
1737            "teamId" => LinearIssueChange::TeamId {
1738                previous: previous.as_str().map(ToString::to_string),
1739            },
1740            "assigneeId" => LinearIssueChange::AssigneeId {
1741                previous: previous.as_str().map(ToString::to_string),
1742            },
1743            "projectId" => LinearIssueChange::ProjectId {
1744                previous: previous.as_str().map(ToString::to_string),
1745            },
1746            "cycleId" => LinearIssueChange::CycleId {
1747                previous: previous.as_str().map(ToString::to_string),
1748            },
1749            "dueDate" => LinearIssueChange::DueDate {
1750                previous: previous.as_str().map(ToString::to_string),
1751            },
1752            "parentId" => LinearIssueChange::ParentId {
1753                previous: previous.as_str().map(ToString::to_string),
1754            },
1755            "sortOrder" => LinearIssueChange::SortOrder {
1756                previous: previous.as_f64(),
1757            },
1758            "labelIds" => LinearIssueChange::LabelIds {
1759                previous: parse_string_array(previous),
1760            },
1761            "completedAt" => LinearIssueChange::CompletedAt {
1762                previous: previous.as_str().map(ToString::to_string),
1763            },
1764            _ => LinearIssueChange::Other {
1765                field: field.clone(),
1766                previous: previous.clone(),
1767            },
1768        };
1769        changes.push(change);
1770    }
1771    changes
1772}
1773
1774fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
1775    value
1776        .as_i64()
1777        .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
1778        .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
1779}
1780
1781fn parse_string_array(value: &JsonValue) -> Vec<String> {
1782    let Some(array) = value.as_array() else {
1783        return Vec::new();
1784    };
1785    array
1786        .iter()
1787        .filter_map(|entry| {
1788            entry.as_str().map(ToString::to_string).or_else(|| {
1789                entry
1790                    .get("id")
1791                    .and_then(JsonValue::as_str)
1792                    .map(ToString::to_string)
1793            })
1794        })
1795        .collect()
1796}
1797
/// Looks up a header by name, ignoring ASCII case.
///
/// Returns the value of the first entry (in the map's iteration order) whose
/// key matches `name` case-insensitively, or `None` when no key matches.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (candidate, contents) in headers {
        if candidate.eq_ignore_ascii_case(name) {
            return Some(contents.as_str());
        }
    }
    None
}
1804
1805fn notion_payload(
1806    kind: &str,
1807    headers: &BTreeMap<String, String>,
1808    raw: JsonValue,
1809) -> ProviderPayload {
1810    let workspace_id = raw
1811        .get("workspace_id")
1812        .and_then(JsonValue::as_str)
1813        .map(ToString::to_string);
1814    ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
1815        event: kind.to_string(),
1816        workspace_id,
1817        request_id: headers
1818            .get("request-id")
1819            .cloned()
1820            .or_else(|| headers.get("x-request-id").cloned()),
1821        subscription_id: raw
1822            .get("subscription_id")
1823            .and_then(JsonValue::as_str)
1824            .map(ToString::to_string),
1825        integration_id: raw
1826            .get("integration_id")
1827            .and_then(JsonValue::as_str)
1828            .map(ToString::to_string),
1829        attempt_number: raw
1830            .get("attempt_number")
1831            .and_then(JsonValue::as_u64)
1832            .and_then(|value| u32::try_from(value).ok()),
1833        entity_id: raw
1834            .get("entity")
1835            .and_then(|value| value.get("id"))
1836            .and_then(JsonValue::as_str)
1837            .map(ToString::to_string),
1838        entity_type: raw
1839            .get("entity")
1840            .and_then(|value| value.get("type"))
1841            .and_then(JsonValue::as_str)
1842            .map(ToString::to_string),
1843        api_version: raw
1844            .get("api_version")
1845            .and_then(JsonValue::as_str)
1846            .map(ToString::to_string),
1847        verification_token: raw
1848            .get("verification_token")
1849            .and_then(JsonValue::as_str)
1850            .map(ToString::to_string),
1851        polled: None,
1852        raw,
1853    })))
1854}
1855
1856fn cron_payload(
1857    _kind: &str,
1858    _headers: &BTreeMap<String, String>,
1859    raw: JsonValue,
1860) -> ProviderPayload {
1861    let cron_id = raw
1862        .get("cron_id")
1863        .and_then(JsonValue::as_str)
1864        .map(ToString::to_string);
1865    let schedule = raw
1866        .get("schedule")
1867        .and_then(JsonValue::as_str)
1868        .map(ToString::to_string);
1869    let tick_at = raw
1870        .get("tick_at")
1871        .and_then(JsonValue::as_str)
1872        .and_then(parse_rfc3339)
1873        .unwrap_or_else(OffsetDateTime::now_utc);
1874    ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
1875        cron_id,
1876        schedule,
1877        tick_at,
1878        raw,
1879    }))
1880}
1881
1882fn webhook_payload(
1883    _kind: &str,
1884    headers: &BTreeMap<String, String>,
1885    raw: JsonValue,
1886) -> ProviderPayload {
1887    ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1888        source: headers.get("X-Webhook-Source").cloned(),
1889        content_type: headers.get("Content-Type").cloned(),
1890        raw,
1891    }))
1892}
1893
1894fn a2a_push_payload(
1895    _kind: &str,
1896    _headers: &BTreeMap<String, String>,
1897    raw: JsonValue,
1898) -> ProviderPayload {
1899    let task_id = raw
1900        .get("task_id")
1901        .and_then(JsonValue::as_str)
1902        .map(ToString::to_string);
1903    let sender = raw
1904        .get("sender")
1905        .and_then(JsonValue::as_str)
1906        .map(ToString::to_string);
1907    let task_state = raw
1908        .pointer("/status/state")
1909        .or_else(|| raw.pointer("/statusUpdate/status/state"))
1910        .and_then(JsonValue::as_str)
1911        .map(|state| match state {
1912            "cancelled" => "canceled".to_string(),
1913            other => other.to_string(),
1914        });
1915    let artifact = raw
1916        .pointer("/artifactUpdate/artifact")
1917        .or_else(|| raw.get("artifact"))
1918        .cloned();
1919    let kind = task_state
1920        .as_deref()
1921        .map(|state| format!("a2a.task.{state}"))
1922        .unwrap_or_else(|| "a2a.task.update".to_string());
1923    ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
1924        task_id,
1925        task_state,
1926        artifact,
1927        sender,
1928        raw,
1929        kind,
1930    }))
1931}
1932
1933fn kafka_payload(
1934    kind: &str,
1935    headers: &BTreeMap<String, String>,
1936    raw: JsonValue,
1937) -> ProviderPayload {
1938    ProviderPayload::Known(KnownProviderPayload::Kafka(stream_payload(
1939        kind, headers, raw,
1940    )))
1941}
1942
1943fn nats_payload(kind: &str, headers: &BTreeMap<String, String>, raw: JsonValue) -> ProviderPayload {
1944    ProviderPayload::Known(KnownProviderPayload::Nats(stream_payload(
1945        kind, headers, raw,
1946    )))
1947}
1948
1949fn pulsar_payload(
1950    kind: &str,
1951    headers: &BTreeMap<String, String>,
1952    raw: JsonValue,
1953) -> ProviderPayload {
1954    ProviderPayload::Known(KnownProviderPayload::Pulsar(stream_payload(
1955        kind, headers, raw,
1956    )))
1957}
1958
1959fn postgres_cdc_payload(
1960    kind: &str,
1961    headers: &BTreeMap<String, String>,
1962    raw: JsonValue,
1963) -> ProviderPayload {
1964    ProviderPayload::Known(KnownProviderPayload::PostgresCdc(stream_payload(
1965        kind, headers, raw,
1966    )))
1967}
1968
1969fn email_payload(
1970    kind: &str,
1971    headers: &BTreeMap<String, String>,
1972    raw: JsonValue,
1973) -> ProviderPayload {
1974    ProviderPayload::Known(KnownProviderPayload::Email(stream_payload(
1975        kind, headers, raw,
1976    )))
1977}
1978
1979fn websocket_payload(
1980    kind: &str,
1981    headers: &BTreeMap<String, String>,
1982    raw: JsonValue,
1983) -> ProviderPayload {
1984    ProviderPayload::Known(KnownProviderPayload::Websocket(stream_payload(
1985        kind, headers, raw,
1986    )))
1987}
1988
1989fn stream_payload(
1990    kind: &str,
1991    headers: &BTreeMap<String, String>,
1992    raw: JsonValue,
1993) -> StreamEventPayload {
1994    StreamEventPayload {
1995        event: kind.to_string(),
1996        source: json_stringish(&raw, &["source", "connector", "origin"]),
1997        stream: json_stringish(
1998            &raw,
1999            &["stream", "topic", "subject", "channel", "mailbox", "slot"],
2000        ),
2001        partition: json_stringish(&raw, &["partition", "shard", "consumer"]),
2002        offset: json_stringish(&raw, &["offset", "sequence", "lsn", "message_id"]),
2003        key: json_stringish(&raw, &["key", "message_key", "id", "event_id"]),
2004        timestamp: json_stringish(&raw, &["timestamp", "occurred_at", "received_at", "ts"]),
2005        headers: headers.clone(),
2006        raw,
2007    }
2008}
2009
2010fn json_stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
2011    fields.iter().find_map(|field| {
2012        let value = raw.get(*field)?;
2013        value
2014            .as_str()
2015            .map(ToString::to_string)
2016            .or_else(|| parse_json_i64ish(value).map(|number| number.to_string()))
2017            .or_else(|| value.as_u64().map(|number| number.to_string()))
2018    })
2019}
2020
2021fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
2022    OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
2023}
2024
#[cfg(test)]
mod tests {
    use super::*;

    // Header set mixing credential-bearing headers with benign delivery metadata.
    fn sample_headers() -> BTreeMap<String, String> {
        [
            ("Authorization", "Bearer secret"),
            ("Cookie", "session=abc"),
            ("User-Agent", "GitHub-Hookshot/123"),
            ("X-GitHub-Delivery", "delivery-123"),
            ("X-GitHub-Event", "issues"),
            ("X-Webhook-Token", "token"),
        ]
        .into_iter()
        .map(|(name, value)| (name.to_string(), value.to_string()))
        .collect()
    }

    // The default policy must pass benign headers through and mask credentials.
    #[test]
    fn default_redaction_policy_keeps_safe_headers() {
        let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());

        let kept = [
            ("User-Agent", "GitHub-Hookshot/123"),
            ("X-GitHub-Delivery", "delivery-123"),
        ];
        for (name, expected) in kept {
            assert_eq!(redacted.get(name).unwrap(), expected);
        }

        for name in ["Authorization", "Cookie", "X-Webhook-Token"] {
            assert_eq!(redacted.get(name).unwrap(), REDACTED_HEADER_VALUE);
        }
    }

    // Registering the same provider id twice must fail with `DuplicateProvider`.
    #[test]
    fn provider_catalog_rejects_duplicates() {
        // Both registrations use an identical placeholder schema for "github".
        let github_schema = || {
            Arc::new(BuiltinProviderSchema {
                provider_id: "github",
                harn_schema_name: "GitHubEventPayload",
                metadata: provider_metadata_entry(
                    "github",
                    &["webhook"],
                    "GitHubEventPayload",
                    &[],
                    SignatureVerificationMetadata::None,
                    Vec::new(),
                    ProviderRuntimeMetadata::Placeholder,
                ),
                normalize: github_payload,
            })
        };

        let mut catalog = ProviderCatalog::default();
        catalog.register(github_schema()).unwrap();
        let error = catalog.register(github_schema()).unwrap_err();

        assert_eq!(
            error,
            ProviderCatalogError::DuplicateProvider("github".to_string())
        );
    }

    // The registered metadata must flag the built-in connectors and describe
    // kafka as a stream provider.
    #[test]
    fn registered_provider_metadata_marks_builtin_connectors() {
        let entries = registered_provider_metadata();
        let builtin: Vec<&ProviderMetadata> = entries
            .iter()
            .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
            .collect();

        assert_eq!(builtin.len(), 13);
        let expected_builtin = [
            "a2a-push", "cron", "github", "linear", "notion", "slack", "webhook",
        ];
        for expected in expected_builtin {
            assert!(builtin.iter().any(|entry| entry.provider == expected));
        }

        let kafka = entries
            .iter()
            .find(|entry| entry.provider == "kafka")
            .expect("kafka stream provider");
        assert_eq!(kafka.kinds, vec!["stream".to_string()]);
        assert_eq!(kafka.schema_name, "StreamEventPayload");
        let is_stream_builtin = matches!(
            kafka.runtime,
            ProviderRuntimeMetadata::Builtin {
                ref connector,
                default_signature_variant: None
            } if connector == "stream"
        );
        assert!(is_stream_builtin);
    }

    // Serializing, deserializing, and serializing again must be lossless, and
    // the raw body must be emitted as standard base64.
    #[test]
    fn trigger_event_round_trip_is_stable() {
        let provider = ProviderId::from("github");
        let incoming_headers = sample_headers();
        let body = serde_json::json!({
            "action": "opened",
            "installation": {"id": 42},
            "issue": {"number": 99}
        });
        let provider_payload =
            ProviderPayload::normalize(&provider, "issues", &incoming_headers, body).unwrap();
        let headers = redact_headers(&incoming_headers, &HeaderRedactionPolicy::default());

        let event = TriggerEvent {
            id: TriggerEventId("trigger_evt_fixed".to_string()),
            provider,
            kind: "issues".to_string(),
            received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
            occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
            dedupe_key: "delivery-123".to_string(),
            trace_id: TraceId("trace_fixed".to_string()),
            tenant_id: Some(TenantId("tenant_1".to_string())),
            headers,
            provider_payload,
            signature_status: SignatureStatus::Verified,
            dedupe_claimed: false,
            batch: None,
            raw_body: Some(vec![0, 159, 255, 10]),
        };

        let first_pass = serde_json::to_value(&event).unwrap();
        assert_eq!(first_pass["raw_body"], serde_json::json!("AJ//Cg=="));

        let decoded: TriggerEvent = serde_json::from_value(first_pass.clone()).unwrap();
        let second_pass = serde_json::to_value(&decoded).unwrap();
        assert_eq!(decoded, event);
        assert_eq!(first_pass, second_pass);
    }

    // Normalizing for a provider that was never registered must report it by name.
    #[test]
    fn unknown_provider_errors() {
        let provider = ProviderId::from("custom-provider");
        let no_headers = BTreeMap::new();
        let outcome = ProviderPayload::normalize(
            &provider,
            "thing.happened",
            &no_headers,
            serde_json::json!({"ok": true}),
        );

        assert_eq!(
            outcome.unwrap_err(),
            ProviderCatalogError::UnknownProvider("custom-provider".to_string())
        );
    }

    // Kafka events must map alias fields (`topic`) and numeric values
    // (`partition`) onto the string fields of the stream payload.
    #[test]
    fn provider_normalizes_stream_payloads() {
        let provider = ProviderId::from("kafka");
        let headers = BTreeMap::from([("x-source".to_string(), "feed".to_string())]);
        let normalized = ProviderPayload::normalize(
            &provider,
            "quote.tick",
            &headers,
            serde_json::json!({
                "topic": "quotes",
                "partition": 7,
                "offset": "42",
                "key": "AAPL",
                "timestamp": "2026-04-21T12:00:00Z"
            }),
        )
        .expect("stream payload");

        let stream_event = match normalized {
            ProviderPayload::Known(KnownProviderPayload::Kafka(inner)) => inner,
            _ => panic!("expected kafka stream payload"),
        };
        assert_eq!(stream_event.event, "quote.tick");
        assert_eq!(stream_event.stream.as_deref(), Some("quotes"));
        assert_eq!(stream_event.partition.as_deref(), Some("7"));
        assert_eq!(stream_event.offset.as_deref(), Some("42"));
        assert_eq!(stream_event.key.as_deref(), Some("AAPL"));
        assert_eq!(
            stream_event.timestamp.as_deref(),
            Some("2026-04-21T12:00:00Z")
        );
    }
}