Skip to main content

harn_vm/triggers/
event.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9#[cfg(test)]
10use crate::redact::REDACTED_HEADER_VALUE;
11use crate::triggers::test_util::clock;
12
13fn serialize_optional_bytes_b64<S>(
14    value: &Option<Vec<u8>>,
15    serializer: S,
16) -> Result<S::Ok, S::Error>
17where
18    S: Serializer,
19{
20    use base64::Engine;
21
22    match value {
23        Some(bytes) => {
24            serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
25        }
26        None => serializer.serialize_none(),
27    }
28}
29
30fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
31where
32    D: Deserializer<'de>,
33{
34    use base64::Engine;
35
36    let encoded = Option::<String>::deserialize(deserializer)?;
37    encoded
38        .map(|text| {
39            base64::engine::general_purpose::STANDARD
40                .decode(text.as_bytes())
41                .map_err(serde::de::Error::custom)
42        })
43        .transpose()
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
47#[serde(transparent)]
48pub struct TriggerEventId(pub String);
49
50impl TriggerEventId {
51    pub fn new() -> Self {
52        Self(format!("trigger_evt_{}", Uuid::now_v7()))
53    }
54}
55
56impl Default for TriggerEventId {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
63#[serde(transparent)]
64pub struct ProviderId(pub String);
65
66impl ProviderId {
67    pub fn new(value: impl Into<String>) -> Self {
68        Self(value.into())
69    }
70
71    pub fn as_str(&self) -> &str {
72        self.0.as_str()
73    }
74}
75
76impl From<&str> for ProviderId {
77    fn from(value: &str) -> Self {
78        Self::new(value)
79    }
80}
81
82impl From<String> for ProviderId {
83    fn from(value: String) -> Self {
84        Self::new(value)
85    }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
89#[serde(transparent)]
90pub struct TraceId(pub String);
91
92impl TraceId {
93    pub fn new() -> Self {
94        Self(format!("trace_{}", Uuid::now_v7()))
95    }
96}
97
98impl Default for TraceId {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
105#[serde(transparent)]
106pub struct TenantId(pub String);
107
108impl TenantId {
109    pub fn new(value: impl Into<String>) -> Self {
110        Self(value.into())
111    }
112}
113
114#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(tag = "state", rename_all = "snake_case")]
116pub enum SignatureStatus {
117    Verified,
118    Unsigned,
119    Failed { reason: String },
120}
121
122#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
123pub struct GitHubEventCommon {
124    pub event: String,
125    pub action: Option<String>,
126    pub delivery_id: Option<String>,
127    pub installation_id: Option<i64>,
128    #[serde(default, skip_serializing_if = "Option::is_none")]
129    pub topic: Option<String>,
130    #[serde(default, skip_serializing_if = "Option::is_none")]
131    pub repository: Option<JsonValue>,
132    #[serde(default, skip_serializing_if = "Option::is_none")]
133    pub repo: Option<JsonValue>,
134    pub raw: JsonValue,
135}
136
137#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
138pub struct GitHubIssuesEventPayload {
139    #[serde(flatten)]
140    pub common: GitHubEventCommon,
141    pub issue: JsonValue,
142}
143
144#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
145pub struct GitHubPullRequestEventPayload {
146    #[serde(flatten)]
147    pub common: GitHubEventCommon,
148    pub pull_request: JsonValue,
149}
150
151#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
152pub struct GitHubIssueCommentEventPayload {
153    #[serde(flatten)]
154    pub common: GitHubEventCommon,
155    pub issue: JsonValue,
156    pub comment: JsonValue,
157}
158
159#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
160pub struct GitHubPullRequestReviewEventPayload {
161    #[serde(flatten)]
162    pub common: GitHubEventCommon,
163    pub pull_request: JsonValue,
164    pub review: JsonValue,
165}
166
167#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
168pub struct GitHubPushEventPayload {
169    #[serde(flatten)]
170    pub common: GitHubEventCommon,
171    #[serde(default)]
172    pub commits: Vec<JsonValue>,
173    pub distinct_size: Option<i64>,
174}
175
176#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
177pub struct GitHubWorkflowRunEventPayload {
178    #[serde(flatten)]
179    pub common: GitHubEventCommon,
180    pub workflow_run: JsonValue,
181}
182
183#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
184pub struct GitHubDeploymentStatusEventPayload {
185    #[serde(flatten)]
186    pub common: GitHubEventCommon,
187    pub deployment_status: JsonValue,
188    pub deployment: JsonValue,
189}
190
191#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
192pub struct GitHubCheckRunEventPayload {
193    #[serde(flatten)]
194    pub common: GitHubEventCommon,
195    pub check_run: JsonValue,
196}
197
198#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
199pub struct GitHubCheckSuiteEventPayload {
200    #[serde(flatten)]
201    pub common: GitHubEventCommon,
202    pub check_suite: JsonValue,
203    #[serde(default, skip_serializing_if = "Option::is_none")]
204    pub check_suite_id: Option<i64>,
205    #[serde(default, skip_serializing_if = "Option::is_none")]
206    pub pull_request_number: Option<i64>,
207    #[serde(default, skip_serializing_if = "Option::is_none")]
208    pub head_sha: Option<String>,
209    #[serde(default, skip_serializing_if = "Option::is_none")]
210    pub head_ref: Option<String>,
211    #[serde(default, skip_serializing_if = "Option::is_none")]
212    pub base_ref: Option<String>,
213    #[serde(default, skip_serializing_if = "Option::is_none")]
214    pub status: Option<String>,
215    #[serde(default, skip_serializing_if = "Option::is_none")]
216    pub conclusion: Option<String>,
217}
218
219#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
220pub struct GitHubStatusEventPayload {
221    #[serde(flatten)]
222    pub common: GitHubEventCommon,
223    #[serde(default, skip_serializing_if = "Option::is_none")]
224    pub commit_status: Option<JsonValue>,
225    #[serde(default, skip_serializing_if = "Option::is_none")]
226    pub status_id: Option<i64>,
227    #[serde(default, skip_serializing_if = "Option::is_none")]
228    pub head_sha: Option<String>,
229    #[serde(default, skip_serializing_if = "Option::is_none")]
230    pub head_ref: Option<String>,
231    #[serde(default, skip_serializing_if = "Option::is_none")]
232    pub base_ref: Option<String>,
233    #[serde(default, skip_serializing_if = "Option::is_none")]
234    pub state: Option<String>,
235    #[serde(default, skip_serializing_if = "Option::is_none")]
236    pub context: Option<String>,
237    #[serde(default, skip_serializing_if = "Option::is_none")]
238    pub target_url: Option<String>,
239}
240
241#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
242pub struct GitHubMergeGroupEventPayload {
243    #[serde(flatten)]
244    pub common: GitHubEventCommon,
245    pub merge_group: JsonValue,
246    #[serde(default, skip_serializing_if = "Option::is_none")]
247    pub merge_group_id: Option<JsonValue>,
248    #[serde(default, skip_serializing_if = "Option::is_none")]
249    pub head_sha: Option<String>,
250    #[serde(default, skip_serializing_if = "Option::is_none")]
251    pub head_ref: Option<String>,
252    #[serde(default, skip_serializing_if = "Option::is_none")]
253    pub base_sha: Option<String>,
254    #[serde(default, skip_serializing_if = "Option::is_none")]
255    pub base_ref: Option<String>,
256    #[serde(default)]
257    pub pull_requests: Vec<JsonValue>,
258    #[serde(default)]
259    pub pull_request_numbers: Vec<i64>,
260}
261
262#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
263pub struct GitHubInstallationEventPayload {
264    #[serde(flatten)]
265    pub common: GitHubEventCommon,
266    #[serde(default, skip_serializing_if = "Option::is_none")]
267    pub installation: Option<JsonValue>,
268    #[serde(default, skip_serializing_if = "Option::is_none")]
269    pub account: Option<JsonValue>,
270    #[serde(default, skip_serializing_if = "Option::is_none")]
271    pub installation_state: Option<String>,
272    #[serde(default, skip_serializing_if = "Option::is_none")]
273    pub suspended: Option<bool>,
274    #[serde(default, skip_serializing_if = "Option::is_none")]
275    pub revoked: Option<bool>,
276    #[serde(default)]
277    pub repositories: Vec<JsonValue>,
278}
279
280#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
281pub struct GitHubInstallationRepositoriesEventPayload {
282    #[serde(flatten)]
283    pub common: GitHubEventCommon,
284    #[serde(default, skip_serializing_if = "Option::is_none")]
285    pub installation: Option<JsonValue>,
286    #[serde(default, skip_serializing_if = "Option::is_none")]
287    pub account: Option<JsonValue>,
288    #[serde(default, skip_serializing_if = "Option::is_none")]
289    pub installation_state: Option<String>,
290    #[serde(default, skip_serializing_if = "Option::is_none")]
291    pub suspended: Option<bool>,
292    #[serde(default, skip_serializing_if = "Option::is_none")]
293    pub revoked: Option<bool>,
294    #[serde(default, skip_serializing_if = "Option::is_none")]
295    pub repository_selection: Option<String>,
296    #[serde(default)]
297    pub repositories_added: Vec<JsonValue>,
298    #[serde(default)]
299    pub repositories_removed: Vec<JsonValue>,
300}
301
302#[derive(Clone, Debug, PartialEq, Serialize)]
303#[serde(untagged)]
304pub enum GitHubEventPayload {
305    Issues(GitHubIssuesEventPayload),
306    PullRequest(GitHubPullRequestEventPayload),
307    IssueComment(GitHubIssueCommentEventPayload),
308    PullRequestReview(GitHubPullRequestReviewEventPayload),
309    Push(GitHubPushEventPayload),
310    WorkflowRun(GitHubWorkflowRunEventPayload),
311    DeploymentStatus(GitHubDeploymentStatusEventPayload),
312    CheckRun(GitHubCheckRunEventPayload),
313    CheckSuite(GitHubCheckSuiteEventPayload),
314    Status(GitHubStatusEventPayload),
315    MergeGroup(GitHubMergeGroupEventPayload),
316    Installation(GitHubInstallationEventPayload),
317    InstallationRepositories(GitHubInstallationRepositoriesEventPayload),
318    Other(GitHubEventCommon),
319}
320
321// Manual `Deserialize` that dispatches on the `event` field. An untagged
322// enum cannot reliably round-trip these variants because `Push` and the
323// all-optional installation/status variants will accept payloads that
324// belong to a different event kind.
325impl<'de> Deserialize<'de> for GitHubEventPayload {
326    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
327    where
328        D: Deserializer<'de>,
329    {
330        let value = JsonValue::deserialize(deserializer)?;
331        let kind = value
332            .get("event")
333            .and_then(JsonValue::as_str)
334            .unwrap_or("")
335            .to_string();
336        let from_value = |v: JsonValue| -> Result<GitHubEventPayload, D::Error> {
337            let payload = match kind.as_str() {
338                "issues" => GitHubEventPayload::Issues(
339                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
340                ),
341                "pull_request" => GitHubEventPayload::PullRequest(
342                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
343                ),
344                "issue_comment" => GitHubEventPayload::IssueComment(
345                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
346                ),
347                "pull_request_review" => GitHubEventPayload::PullRequestReview(
348                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
349                ),
350                "push" => GitHubEventPayload::Push(
351                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
352                ),
353                "workflow_run" => GitHubEventPayload::WorkflowRun(
354                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
355                ),
356                "deployment_status" => GitHubEventPayload::DeploymentStatus(
357                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
358                ),
359                "check_run" => GitHubEventPayload::CheckRun(
360                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
361                ),
362                "check_suite" => GitHubEventPayload::CheckSuite(
363                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
364                ),
365                "status" => GitHubEventPayload::Status(
366                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
367                ),
368                "merge_group" => GitHubEventPayload::MergeGroup(
369                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
370                ),
371                "installation" => GitHubEventPayload::Installation(
372                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
373                ),
374                "installation_repositories" => GitHubEventPayload::InstallationRepositories(
375                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
376                ),
377                _ => GitHubEventPayload::Other(
378                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
379                ),
380            };
381            Ok(payload)
382        };
383        from_value(value)
384    }
385}
386
387#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
388pub struct SlackEventCommon {
389    pub event: String,
390    pub event_id: Option<String>,
391    pub api_app_id: Option<String>,
392    pub team_id: Option<String>,
393    pub channel_id: Option<String>,
394    pub user_id: Option<String>,
395    pub event_ts: Option<String>,
396    pub raw: JsonValue,
397}
398
399#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
400pub struct SlackMessageEventPayload {
401    #[serde(flatten)]
402    pub common: SlackEventCommon,
403    pub subtype: Option<String>,
404    pub channel_type: Option<String>,
405    pub channel: Option<String>,
406    pub user: Option<String>,
407    pub text: Option<String>,
408    pub ts: Option<String>,
409    pub thread_ts: Option<String>,
410}
411
412#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
413pub struct SlackAppMentionEventPayload {
414    #[serde(flatten)]
415    pub common: SlackEventCommon,
416    pub channel: Option<String>,
417    pub user: Option<String>,
418    pub text: Option<String>,
419    pub ts: Option<String>,
420    pub thread_ts: Option<String>,
421}
422
423#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
424pub struct SlackReactionAddedEventPayload {
425    #[serde(flatten)]
426    pub common: SlackEventCommon,
427    pub reaction: Option<String>,
428    pub item_user: Option<String>,
429    pub item: JsonValue,
430}
431
432#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
433pub struct SlackAppHomeOpenedEventPayload {
434    #[serde(flatten)]
435    pub common: SlackEventCommon,
436    pub user: Option<String>,
437    pub channel: Option<String>,
438    pub tab: Option<String>,
439    pub view: JsonValue,
440}
441
442#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
443pub struct SlackAssistantThreadStartedEventPayload {
444    #[serde(flatten)]
445    pub common: SlackEventCommon,
446    pub assistant_thread: JsonValue,
447    pub thread_ts: Option<String>,
448    pub context: JsonValue,
449}
450
451#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
452#[serde(untagged)]
453pub enum SlackEventPayload {
454    Message(SlackMessageEventPayload),
455    AppMention(SlackAppMentionEventPayload),
456    ReactionAdded(SlackReactionAddedEventPayload),
457    AppHomeOpened(SlackAppHomeOpenedEventPayload),
458    AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
459    Other(SlackEventCommon),
460}
461
462#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
463pub struct LinearEventCommon {
464    pub event: String,
465    pub action: Option<String>,
466    pub delivery_id: Option<String>,
467    pub organization_id: Option<String>,
468    pub webhook_timestamp: Option<i64>,
469    pub webhook_id: Option<String>,
470    pub url: Option<String>,
471    pub created_at: Option<String>,
472    pub actor: JsonValue,
473    pub raw: JsonValue,
474}
475
476#[derive(Clone, Debug, PartialEq)]
477pub enum LinearIssueChange {
478    Title { previous: Option<String> },
479    Description { previous: Option<String> },
480    Priority { previous: Option<i64> },
481    Estimate { previous: Option<i64> },
482    StateId { previous: Option<String> },
483    TeamId { previous: Option<String> },
484    AssigneeId { previous: Option<String> },
485    ProjectId { previous: Option<String> },
486    CycleId { previous: Option<String> },
487    DueDate { previous: Option<String> },
488    ParentId { previous: Option<String> },
489    SortOrder { previous: Option<f64> },
490    LabelIds { previous: Vec<String> },
491    CompletedAt { previous: Option<String> },
492    Other { field: String, previous: JsonValue },
493}
494
495impl Serialize for LinearIssueChange {
496    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
497    where
498        S: Serializer,
499    {
500        let value = match self {
501            Self::Title { previous } => {
502                serde_json::json!({ "field_name": "title", "previous": previous })
503            }
504            Self::Description { previous } => {
505                serde_json::json!({ "field_name": "description", "previous": previous })
506            }
507            Self::Priority { previous } => {
508                serde_json::json!({ "field_name": "priority", "previous": previous })
509            }
510            Self::Estimate { previous } => {
511                serde_json::json!({ "field_name": "estimate", "previous": previous })
512            }
513            Self::StateId { previous } => {
514                serde_json::json!({ "field_name": "state_id", "previous": previous })
515            }
516            Self::TeamId { previous } => {
517                serde_json::json!({ "field_name": "team_id", "previous": previous })
518            }
519            Self::AssigneeId { previous } => {
520                serde_json::json!({ "field_name": "assignee_id", "previous": previous })
521            }
522            Self::ProjectId { previous } => {
523                serde_json::json!({ "field_name": "project_id", "previous": previous })
524            }
525            Self::CycleId { previous } => {
526                serde_json::json!({ "field_name": "cycle_id", "previous": previous })
527            }
528            Self::DueDate { previous } => {
529                serde_json::json!({ "field_name": "due_date", "previous": previous })
530            }
531            Self::ParentId { previous } => {
532                serde_json::json!({ "field_name": "parent_id", "previous": previous })
533            }
534            Self::SortOrder { previous } => {
535                serde_json::json!({ "field_name": "sort_order", "previous": previous })
536            }
537            Self::LabelIds { previous } => {
538                serde_json::json!({ "field_name": "label_ids", "previous": previous })
539            }
540            Self::CompletedAt { previous } => {
541                serde_json::json!({ "field_name": "completed_at", "previous": previous })
542            }
543            Self::Other { field, previous } => {
544                serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
545            }
546        };
547        value.serialize(serializer)
548    }
549}
550
551impl<'de> Deserialize<'de> for LinearIssueChange {
552    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
553    where
554        D: Deserializer<'de>,
555    {
556        let value = JsonValue::deserialize(deserializer)?;
557        let field_name = value
558            .get("field_name")
559            .and_then(JsonValue::as_str)
560            .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
561        let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
562        Ok(match field_name {
563            "title" => Self::Title {
564                previous: previous.as_str().map(ToString::to_string),
565            },
566            "description" => Self::Description {
567                previous: previous.as_str().map(ToString::to_string),
568            },
569            "priority" => Self::Priority {
570                previous: parse_json_i64ish(&previous),
571            },
572            "estimate" => Self::Estimate {
573                previous: parse_json_i64ish(&previous),
574            },
575            "state_id" => Self::StateId {
576                previous: previous.as_str().map(ToString::to_string),
577            },
578            "team_id" => Self::TeamId {
579                previous: previous.as_str().map(ToString::to_string),
580            },
581            "assignee_id" => Self::AssigneeId {
582                previous: previous.as_str().map(ToString::to_string),
583            },
584            "project_id" => Self::ProjectId {
585                previous: previous.as_str().map(ToString::to_string),
586            },
587            "cycle_id" => Self::CycleId {
588                previous: previous.as_str().map(ToString::to_string),
589            },
590            "due_date" => Self::DueDate {
591                previous: previous.as_str().map(ToString::to_string),
592            },
593            "parent_id" => Self::ParentId {
594                previous: previous.as_str().map(ToString::to_string),
595            },
596            "sort_order" => Self::SortOrder {
597                previous: previous.as_f64(),
598            },
599            "label_ids" => Self::LabelIds {
600                previous: parse_string_array(&previous),
601            },
602            "completed_at" => Self::CompletedAt {
603                previous: previous.as_str().map(ToString::to_string),
604            },
605            "other" => Self::Other {
606                field: value
607                    .get("field")
608                    .and_then(JsonValue::as_str)
609                    .map(ToString::to_string)
610                    .unwrap_or_else(|| "unknown".to_string()),
611                previous,
612            },
613            other => Self::Other {
614                field: other.to_string(),
615                previous,
616            },
617        })
618    }
619}
620
621#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
622pub struct LinearIssueEventPayload {
623    #[serde(flatten)]
624    pub common: LinearEventCommon,
625    pub issue: JsonValue,
626    #[serde(default)]
627    pub changes: Vec<LinearIssueChange>,
628}
629
630#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
631pub struct LinearIssueCommentEventPayload {
632    #[serde(flatten)]
633    pub common: LinearEventCommon,
634    pub comment: JsonValue,
635}
636
637#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
638pub struct LinearIssueLabelEventPayload {
639    #[serde(flatten)]
640    pub common: LinearEventCommon,
641    pub label: JsonValue,
642}
643
644#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
645pub struct LinearProjectEventPayload {
646    #[serde(flatten)]
647    pub common: LinearEventCommon,
648    pub project: JsonValue,
649}
650
651#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
652pub struct LinearCycleEventPayload {
653    #[serde(flatten)]
654    pub common: LinearEventCommon,
655    pub cycle: JsonValue,
656}
657
658#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
659pub struct LinearCustomerEventPayload {
660    #[serde(flatten)]
661    pub common: LinearEventCommon,
662    pub customer: JsonValue,
663}
664
665#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
666pub struct LinearCustomerRequestEventPayload {
667    #[serde(flatten)]
668    pub common: LinearEventCommon,
669    pub customer_request: JsonValue,
670}
671
672#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
673#[serde(untagged)]
674pub enum LinearEventPayload {
675    Issue(LinearIssueEventPayload),
676    IssueComment(LinearIssueCommentEventPayload),
677    IssueLabel(LinearIssueLabelEventPayload),
678    Project(LinearProjectEventPayload),
679    Cycle(LinearCycleEventPayload),
680    Customer(LinearCustomerEventPayload),
681    CustomerRequest(LinearCustomerRequestEventPayload),
682    Other(LinearEventCommon),
683}
684
685#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
686pub struct NotionPolledChangeEvent {
687    pub resource: String,
688    pub source_id: String,
689    pub entity_id: String,
690    pub high_water_mark: String,
691    pub before: Option<JsonValue>,
692    pub after: JsonValue,
693}
694
695#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
696pub struct NotionEventPayload {
697    pub event: String,
698    pub workspace_id: Option<String>,
699    pub request_id: Option<String>,
700    pub subscription_id: Option<String>,
701    pub integration_id: Option<String>,
702    pub attempt_number: Option<u32>,
703    pub entity_id: Option<String>,
704    pub entity_type: Option<String>,
705    pub api_version: Option<String>,
706    pub verification_token: Option<String>,
707    pub polled: Option<NotionPolledChangeEvent>,
708    pub raw: JsonValue,
709}
710
711#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
712pub struct CronEventPayload {
713    pub cron_id: Option<String>,
714    pub schedule: Option<String>,
715    #[serde(with = "time::serde::rfc3339")]
716    pub tick_at: OffsetDateTime,
717    pub raw: JsonValue,
718}
719
720#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
721pub struct GenericWebhookPayload {
722    pub source: Option<String>,
723    pub content_type: Option<String>,
724    pub raw: JsonValue,
725}
726
727#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
728pub struct A2aPushPayload {
729    pub task_id: Option<String>,
730    pub task_state: Option<String>,
731    pub artifact: Option<JsonValue>,
732    pub sender: Option<String>,
733    pub raw: JsonValue,
734    pub kind: String,
735}
736
737#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
738pub struct StreamEventPayload {
739    pub event: String,
740    pub source: Option<String>,
741    pub stream: Option<String>,
742    pub partition: Option<String>,
743    pub offset: Option<String>,
744    pub key: Option<String>,
745    pub timestamp: Option<String>,
746    #[serde(default)]
747    pub headers: BTreeMap<String, String>,
748    pub raw: JsonValue,
749}
750
751#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
752pub struct ExtensionProviderPayload {
753    pub provider: String,
754    pub schema_name: String,
755    pub raw: JsonValue,
756}
757
758#[allow(clippy::large_enum_variant)]
759#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
760#[serde(untagged)]
761pub enum ProviderPayload {
762    Known(KnownProviderPayload),
763    Extension(ExtensionProviderPayload),
764}
765
766impl ProviderPayload {
767    pub fn provider(&self) -> &str {
768        match self {
769            Self::Known(known) => known.provider(),
770            Self::Extension(payload) => payload.provider.as_str(),
771        }
772    }
773
774    pub fn normalize(
775        provider: &ProviderId,
776        kind: &str,
777        headers: &BTreeMap<String, String>,
778        raw: JsonValue,
779    ) -> Result<Self, ProviderCatalogError> {
780        provider_catalog()
781            .read()
782            .expect("provider catalog poisoned")
783            .normalize(provider, kind, headers, raw)
784    }
785}
786
787#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
788#[serde(tag = "provider")]
789pub enum KnownProviderPayload {
790    #[serde(rename = "github")]
791    GitHub(GitHubEventPayload),
792    #[serde(rename = "slack")]
793    Slack(Box<SlackEventPayload>),
794    #[serde(rename = "linear")]
795    Linear(LinearEventPayload),
796    #[serde(rename = "notion")]
797    Notion(Box<NotionEventPayload>),
798    #[serde(rename = "cron")]
799    Cron(CronEventPayload),
800    #[serde(rename = "webhook")]
801    Webhook(GenericWebhookPayload),
802    #[serde(rename = "a2a-push")]
803    A2aPush(A2aPushPayload),
804    #[serde(rename = "kafka")]
805    Kafka(StreamEventPayload),
806    #[serde(rename = "nats")]
807    Nats(StreamEventPayload),
808    #[serde(rename = "pulsar")]
809    Pulsar(StreamEventPayload),
810    #[serde(rename = "postgres-cdc")]
811    PostgresCdc(StreamEventPayload),
812    #[serde(rename = "email")]
813    Email(StreamEventPayload),
814    #[serde(rename = "websocket")]
815    Websocket(StreamEventPayload),
816}
817
818impl KnownProviderPayload {
819    pub fn provider(&self) -> &str {
820        match self {
821            Self::GitHub(_) => "github",
822            Self::Slack(_) => "slack",
823            Self::Linear(_) => "linear",
824            Self::Notion(_) => "notion",
825            Self::Cron(_) => "cron",
826            Self::Webhook(_) => "webhook",
827            Self::A2aPush(_) => "a2a-push",
828            Self::Kafka(_) => "kafka",
829            Self::Nats(_) => "nats",
830            Self::Pulsar(_) => "pulsar",
831            Self::PostgresCdc(_) => "postgres-cdc",
832            Self::Email(_) => "email",
833            Self::Websocket(_) => "websocket",
834        }
835    }
836}
837
838#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
839pub struct TriggerEvent {
840    pub id: TriggerEventId,
841    pub provider: ProviderId,
842    pub kind: String,
843    #[serde(with = "time::serde::rfc3339")]
844    pub received_at: OffsetDateTime,
845    #[serde(with = "time::serde::rfc3339::option")]
846    pub occurred_at: Option<OffsetDateTime>,
847    pub dedupe_key: String,
848    pub trace_id: TraceId,
849    pub tenant_id: Option<TenantId>,
850    pub headers: BTreeMap<String, String>,
851    #[serde(default, skip_serializing_if = "Option::is_none")]
852    pub batch: Option<Vec<JsonValue>>,
853    #[serde(
854        default,
855        skip_serializing_if = "Option::is_none",
856        serialize_with = "serialize_optional_bytes_b64",
857        deserialize_with = "deserialize_optional_bytes_b64"
858    )]
859    pub raw_body: Option<Vec<u8>>,
860    pub provider_payload: ProviderPayload,
861    pub signature_status: SignatureStatus,
862    #[serde(skip)]
863    pub dedupe_claimed: bool,
864}
865
866impl TriggerEvent {
867    #[allow(clippy::too_many_arguments)]
868    pub fn new(
869        provider: ProviderId,
870        kind: impl Into<String>,
871        occurred_at: Option<OffsetDateTime>,
872        dedupe_key: impl Into<String>,
873        tenant_id: Option<TenantId>,
874        headers: BTreeMap<String, String>,
875        provider_payload: ProviderPayload,
876        signature_status: SignatureStatus,
877    ) -> Self {
878        Self {
879            id: TriggerEventId::new(),
880            provider,
881            kind: kind.into(),
882            received_at: clock::now_utc(),
883            occurred_at,
884            dedupe_key: dedupe_key.into(),
885            trace_id: TraceId::new(),
886            tenant_id,
887            headers,
888            batch: None,
889            raw_body: None,
890            provider_payload,
891            signature_status,
892            dedupe_claimed: false,
893        }
894    }
895
896    pub fn dedupe_claimed(&self) -> bool {
897        self.dedupe_claimed
898    }
899
900    pub fn mark_dedupe_claimed(&mut self) {
901        self.dedupe_claimed = true;
902    }
903}
904
/// Backwards-compatible alias for the unified [`crate::redact::RedactionPolicy`].
///
/// Trigger ingest paths historically only had access to a header-only
/// policy, so this alias keeps the call-site name stable while the
/// underlying type is now the broader policy that also covers URLs,
/// JSON fields, and free-form strings.
pub type HeaderRedactionPolicy = crate::redact::RedactionPolicy;

/// Returns the header map produced by applying `policy` to `headers`.
///
/// This is a thin free-function wrapper that delegates to
/// `policy.redact_headers(headers)`; the input map is only borrowed.
pub fn redact_headers(
    headers: &BTreeMap<String, String>,
    policy: &HeaderRedactionPolicy,
) -> BTreeMap<String, String> {
    policy.redact_headers(headers)
}
919
/// Describes one secret that a provider declares in its metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderSecretRequirement {
    /// Name of the secret.
    pub name: String,
    /// Whether the secret is mandatory; `ProviderMetadata::required_secret_names`
    /// only yields entries where this is `true`.
    pub required: bool,
    /// Namespace string the secret is declared under.
    pub namespace: String,
}
926
/// Names one outbound method listed in a provider's metadata.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProviderOutboundMethod {
    /// Method name, e.g. `"post_message"` or `"graphql"`.
    pub name: String,
}
931
/// Signature-verification settings advertised in a provider's metadata.
///
/// Serialized as an internally tagged enum: the variant is stored under the
/// `kind` key using snake_case names. The default value is `None`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum SignatureVerificationMetadata {
    /// No signature verification is described.
    #[default]
    None,
    /// HMAC-based verification parameters.
    Hmac {
        /// Variant label (the builtin entries use values such as
        /// `"github"`, `"slack"`, or `"standard"`).
        variant: String,
        /// NOTE(review): presumably indicates that the signature covers the
        /// raw request body — confirm against the verifier implementation.
        raw_body: bool,
        /// Name of the header carrying the signature.
        signature_header: String,
        /// Name of the header carrying a timestamp, if any.
        timestamp_header: Option<String>,
        /// Name of the header carrying a delivery/message id, if any.
        id_header: Option<String>,
        /// Default tolerance in seconds, if any.
        default_tolerance_secs: Option<i64>,
        /// Digest name (the builtin entries use `"sha256"`).
        digest: String,
        /// Signature encoding name (the builtin entries use `"hex"` or
        /// `"base64"`).
        encoding: String,
    },
}
948
/// Runtime binding information advertised in a provider's metadata.
///
/// Serialized as an internally tagged enum: the variant is stored under the
/// `kind` key using snake_case names. The default value is `Placeholder`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum ProviderRuntimeMetadata {
    /// The provider names a builtin connector.
    Builtin {
        /// Connector name (the builtin entries use values such as `"cron"`,
        /// `"webhook"`, or `"stream"`).
        connector: String,
        /// Optional default signature variant for the connector.
        default_signature_variant: Option<String>,
    },
    /// No builtin connector is named.
    #[default]
    Placeholder,
}
959
960#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
961pub struct ProviderMetadata {
962    pub provider: String,
963    #[serde(default)]
964    pub kinds: Vec<String>,
965    pub schema_name: String,
966    #[serde(default)]
967    pub outbound_methods: Vec<ProviderOutboundMethod>,
968    #[serde(default)]
969    pub secret_requirements: Vec<ProviderSecretRequirement>,
970    #[serde(default)]
971    pub signature_verification: SignatureVerificationMetadata,
972    #[serde(default)]
973    pub runtime: ProviderRuntimeMetadata,
974}
975
976impl ProviderMetadata {
977    pub fn supports_kind(&self, kind: &str) -> bool {
978        self.kinds.iter().any(|candidate| candidate == kind)
979    }
980
981    pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
982        self.secret_requirements
983            .iter()
984            .filter(|requirement| requirement.required)
985            .map(|requirement| requirement.name.as_str())
986    }
987}
988
989pub trait ProviderSchema: Send + Sync {
990    fn provider_id(&self) -> &str;
991    fn harn_schema_name(&self) -> &str;
992    fn metadata(&self) -> ProviderMetadata {
993        ProviderMetadata {
994            provider: self.provider_id().to_string(),
995            schema_name: self.harn_schema_name().to_string(),
996            ..ProviderMetadata::default()
997        }
998    }
999    fn normalize(
1000        &self,
1001        kind: &str,
1002        headers: &BTreeMap<String, String>,
1003        raw: JsonValue,
1004    ) -> Result<ProviderPayload, ProviderCatalogError>;
1005}
1006
/// Errors reported by [`ProviderCatalog`] operations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProviderCatalogError {
    /// A schema with this provider id is already present in the catalog.
    DuplicateProvider(String),
    /// No schema is registered under this provider id.
    UnknownProvider(String),
}

impl std::fmt::Display for ProviderCatalogError {
    /// Writes a one-line message naming the offending provider id.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnknownProvider(provider) => {
                write!(f, "provider `{provider}` is not registered")
            }
            Self::DuplicateProvider(provider) => {
                write!(f, "provider `{provider}` is already registered")
            }
        }
    }
}

// The default `Error` methods are sufficient: there is no underlying source.
impl std::error::Error for ProviderCatalogError {}
1025
1026#[derive(Clone, Default)]
1027pub struct ProviderCatalog {
1028    providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
1029}
1030
1031impl ProviderCatalog {
1032    pub fn with_defaults() -> Self {
1033        let mut catalog = Self::default();
1034        for schema in default_provider_schemas() {
1035            catalog
1036                .register(schema)
1037                .expect("default providers must register cleanly");
1038        }
1039        catalog
1040    }
1041
1042    pub fn with_defaults_and(
1043        schemas: Vec<Arc<dyn ProviderSchema>>,
1044    ) -> Result<Self, ProviderCatalogError> {
1045        let mut catalog = Self::with_defaults();
1046        let builtin_providers: BTreeSet<String> = catalog.schema_names().into_keys().collect();
1047        for schema in schemas {
1048            if builtin_providers.contains(schema.provider_id()) {
1049                continue;
1050            }
1051            catalog.register(schema)?;
1052        }
1053        Ok(catalog)
1054    }
1055
1056    pub fn register(
1057        &mut self,
1058        schema: Arc<dyn ProviderSchema>,
1059    ) -> Result<(), ProviderCatalogError> {
1060        let provider = schema.provider_id().to_string();
1061        if self.providers.contains_key(provider.as_str()) {
1062            return Err(ProviderCatalogError::DuplicateProvider(provider));
1063        }
1064        self.providers.insert(provider, schema);
1065        Ok(())
1066    }
1067
1068    pub fn normalize(
1069        &self,
1070        provider: &ProviderId,
1071        kind: &str,
1072        headers: &BTreeMap<String, String>,
1073        raw: JsonValue,
1074    ) -> Result<ProviderPayload, ProviderCatalogError> {
1075        let schema = self
1076            .providers
1077            .get(provider.as_str())
1078            .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
1079        schema.normalize(kind, headers, raw)
1080    }
1081
1082    pub fn schema_names(&self) -> BTreeMap<String, String> {
1083        self.providers
1084            .iter()
1085            .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
1086            .collect()
1087    }
1088
1089    pub fn entries(&self) -> Vec<ProviderMetadata> {
1090        self.providers
1091            .values()
1092            .map(|schema| schema.metadata())
1093            .collect()
1094    }
1095
1096    pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
1097        self.providers.get(provider).map(|schema| schema.metadata())
1098    }
1099}
1100
1101pub fn register_provider_schema(
1102    schema: Arc<dyn ProviderSchema>,
1103) -> Result<(), ProviderCatalogError> {
1104    provider_catalog()
1105        .write()
1106        .expect("provider catalog poisoned")
1107        .register(schema)
1108}
1109
1110pub fn reset_provider_catalog() {
1111    *provider_catalog()
1112        .write()
1113        .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
1114}
1115
1116pub fn reset_provider_catalog_with(
1117    schemas: Vec<Arc<dyn ProviderSchema>>,
1118) -> Result<(), ProviderCatalogError> {
1119    let catalog = ProviderCatalog::with_defaults_and(schemas)?;
1120    install_provider_catalog(catalog);
1121    Ok(())
1122}
1123
1124pub fn install_provider_catalog(catalog: ProviderCatalog) {
1125    *provider_catalog()
1126        .write()
1127        .expect("provider catalog poisoned") = catalog;
1128}
1129
1130pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
1131    provider_catalog()
1132        .read()
1133        .expect("provider catalog poisoned")
1134        .schema_names()
1135}
1136
1137pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
1138    provider_catalog()
1139        .read()
1140        .expect("provider catalog poisoned")
1141        .entries()
1142}
1143
1144pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
1145    provider_catalog()
1146        .read()
1147        .expect("provider catalog poisoned")
1148        .metadata_for(provider)
1149}
1150
1151fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
1152    static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
1153    PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
1154}
1155
1156struct BuiltinProviderSchema {
1157    provider_id: &'static str,
1158    harn_schema_name: &'static str,
1159    metadata: ProviderMetadata,
1160    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1161}
1162
1163impl ProviderSchema for BuiltinProviderSchema {
1164    fn provider_id(&self) -> &str {
1165        self.provider_id
1166    }
1167
1168    fn harn_schema_name(&self) -> &str {
1169        self.harn_schema_name
1170    }
1171
1172    fn metadata(&self) -> ProviderMetadata {
1173        self.metadata.clone()
1174    }
1175
1176    fn normalize(
1177        &self,
1178        kind: &str,
1179        headers: &BTreeMap<String, String>,
1180        raw: JsonValue,
1181    ) -> Result<ProviderPayload, ProviderCatalogError> {
1182        Ok((self.normalize)(kind, headers, raw))
1183    }
1184}
1185
1186fn provider_metadata_entry(
1187    provider: &str,
1188    kinds: &[&str],
1189    schema_name: &str,
1190    outbound_methods: &[&str],
1191    signature_verification: SignatureVerificationMetadata,
1192    secret_requirements: Vec<ProviderSecretRequirement>,
1193    runtime: ProviderRuntimeMetadata,
1194) -> ProviderMetadata {
1195    ProviderMetadata {
1196        provider: provider.to_string(),
1197        kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
1198        schema_name: schema_name.to_string(),
1199        outbound_methods: outbound_methods
1200            .iter()
1201            .map(|name| ProviderOutboundMethod {
1202                name: (*name).to_string(),
1203            })
1204            .collect(),
1205        secret_requirements,
1206        signature_verification,
1207        runtime,
1208    }
1209}
1210
1211fn hmac_signature_metadata(
1212    variant: &str,
1213    signature_header: &str,
1214    timestamp_header: Option<&str>,
1215    id_header: Option<&str>,
1216    default_tolerance_secs: Option<i64>,
1217    encoding: &str,
1218) -> SignatureVerificationMetadata {
1219    SignatureVerificationMetadata::Hmac {
1220        variant: variant.to_string(),
1221        raw_body: true,
1222        signature_header: signature_header.to_string(),
1223        timestamp_header: timestamp_header.map(ToString::to_string),
1224        id_header: id_header.map(ToString::to_string),
1225        default_tolerance_secs,
1226        digest: "sha256".to_string(),
1227        encoding: encoding.to_string(),
1228    }
1229}
1230
1231fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1232    ProviderSecretRequirement {
1233        name: name.to_string(),
1234        required: true,
1235        namespace: namespace.to_string(),
1236    }
1237}
1238
1239fn outbound_method(name: &str) -> ProviderOutboundMethod {
1240    ProviderOutboundMethod {
1241        name: name.to_string(),
1242    }
1243}
1244
1245fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1246    ProviderSecretRequirement {
1247        name: name.to_string(),
1248        required: false,
1249        namespace: namespace.to_string(),
1250    }
1251}
1252
1253fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1254    vec![
1255        Arc::new(BuiltinProviderSchema {
1256            provider_id: "github",
1257            harn_schema_name: "GitHubEventPayload",
1258            metadata: provider_metadata_entry(
1259                "github",
1260                &["webhook"],
1261                "GitHubEventPayload",
1262                &[
1263                    "github.pr.list",
1264                    "github.pr.view",
1265                    "github.pr.checks",
1266                    "github.pr.merge",
1267                    "github.pr.enable_auto_merge",
1268                    "github.pr.comment",
1269                    "github.actions.workflow_dispatch",
1270                    "github.actions.runs",
1271                    "github.actions.run",
1272                    "github.actions.logs",
1273                    "github.release.latest",
1274                    "github.release.assets",
1275                    "github.merge_queue.entries",
1276                    "github.merge_queue.enqueue",
1277                    "github.issue.create",
1278                    "github.issue.comment",
1279                    "github.branch.protection",
1280                    "api_call",
1281                    "issues.create_comment",
1282                    "issues.create",
1283                    "issues.create_with_template",
1284                    "issues.update",
1285                    "issues.add_labels",
1286                    "pulls.list",
1287                    "pulls.list_with_checks",
1288                    "pulls.get",
1289                    "pulls.merge",
1290                    "pulls.merge_safe",
1291                    "pulls.create_review_comment",
1292                    "pulls.get_diff",
1293                    "pulls.list_files",
1294                    "pulls.list_reviews",
1295                    "repos.get_content",
1296                    "repos.get_text",
1297                    "repos.get_latest_release",
1298                    "repos.list_release_assets",
1299                    "repos.get_branch_protection",
1300                    "git.delete_ref",
1301                    "actions.workflow_dispatch",
1302                    "actions.workflow_runs.list",
1303                    "actions.workflow_run.get",
1304                    "check_runs.create",
1305                    "check_runs.update",
1306                    "graphql",
1307                ],
1308                hmac_signature_metadata(
1309                    "github",
1310                    "X-Hub-Signature-256",
1311                    None,
1312                    Some("X-GitHub-Delivery"),
1313                    None,
1314                    "hex",
1315                ),
1316                vec![required_secret("signing_secret", "github")],
1317                ProviderRuntimeMetadata::Placeholder,
1318            ),
1319            normalize: github_payload,
1320        }),
1321        Arc::new(BuiltinProviderSchema {
1322            provider_id: "slack",
1323            harn_schema_name: "SlackEventPayload",
1324            metadata: provider_metadata_entry(
1325                "slack",
1326                &["webhook"],
1327                "SlackEventPayload",
1328                &[
1329                    "post_message",
1330                    "update_message",
1331                    "add_reaction",
1332                    "open_view",
1333                    "user_info",
1334                    "api_call",
1335                    "upload_file",
1336                ],
1337                hmac_signature_metadata(
1338                    "slack",
1339                    "X-Slack-Signature",
1340                    Some("X-Slack-Request-Timestamp"),
1341                    None,
1342                    Some(300),
1343                    "hex",
1344                ),
1345                vec![required_secret("signing_secret", "slack")],
1346                ProviderRuntimeMetadata::Placeholder,
1347            ),
1348            normalize: slack_payload,
1349        }),
1350        Arc::new(BuiltinProviderSchema {
1351            provider_id: "linear",
1352            harn_schema_name: "LinearEventPayload",
1353            metadata: {
1354                let mut metadata = provider_metadata_entry(
1355                    "linear",
1356                    &["webhook"],
1357                    "LinearEventPayload",
1358                    &[],
1359                    hmac_signature_metadata(
1360                        "linear",
1361                        "Linear-Signature",
1362                        None,
1363                        Some("Linear-Delivery"),
1364                        Some(75),
1365                        "hex",
1366                    ),
1367                    vec![
1368                        required_secret("signing_secret", "linear"),
1369                        optional_secret("access_token", "linear"),
1370                    ],
1371                    ProviderRuntimeMetadata::Placeholder,
1372                );
1373                metadata.outbound_methods = vec![
1374                    ProviderOutboundMethod {
1375                        name: "list_issues".to_string(),
1376                    },
1377                    ProviderOutboundMethod {
1378                        name: "update_issue".to_string(),
1379                    },
1380                    ProviderOutboundMethod {
1381                        name: "create_comment".to_string(),
1382                    },
1383                    ProviderOutboundMethod {
1384                        name: "search".to_string(),
1385                    },
1386                    ProviderOutboundMethod {
1387                        name: "graphql".to_string(),
1388                    },
1389                ];
1390                metadata
1391            },
1392            normalize: linear_payload,
1393        }),
1394        Arc::new(BuiltinProviderSchema {
1395            provider_id: "notion",
1396            harn_schema_name: "NotionEventPayload",
1397            metadata: {
1398                let mut metadata = provider_metadata_entry(
1399                    "notion",
1400                    &["webhook", "poll"],
1401                    "NotionEventPayload",
1402                    &[],
1403                    hmac_signature_metadata(
1404                        "notion",
1405                        "X-Notion-Signature",
1406                        None,
1407                        None,
1408                        None,
1409                        "hex",
1410                    ),
1411                    vec![required_secret("verification_token", "notion")],
1412                    ProviderRuntimeMetadata::Placeholder,
1413                );
1414                metadata.outbound_methods = vec![
1415                    outbound_method("get_page"),
1416                    outbound_method("update_page"),
1417                    outbound_method("append_blocks"),
1418                    outbound_method("query_database"),
1419                    outbound_method("search"),
1420                    outbound_method("create_comment"),
1421                    outbound_method("api_call"),
1422                ];
1423                metadata
1424            },
1425            normalize: notion_payload,
1426        }),
1427        Arc::new(BuiltinProviderSchema {
1428            provider_id: "cron",
1429            harn_schema_name: "CronEventPayload",
1430            metadata: provider_metadata_entry(
1431                "cron",
1432                &["cron"],
1433                "CronEventPayload",
1434                &[],
1435                SignatureVerificationMetadata::None,
1436                Vec::new(),
1437                ProviderRuntimeMetadata::Builtin {
1438                    connector: "cron".to_string(),
1439                    default_signature_variant: None,
1440                },
1441            ),
1442            normalize: cron_payload,
1443        }),
1444        Arc::new(BuiltinProviderSchema {
1445            provider_id: "webhook",
1446            harn_schema_name: "GenericWebhookPayload",
1447            metadata: provider_metadata_entry(
1448                "webhook",
1449                &["webhook"],
1450                "GenericWebhookPayload",
1451                &[],
1452                hmac_signature_metadata(
1453                    "standard",
1454                    "webhook-signature",
1455                    Some("webhook-timestamp"),
1456                    Some("webhook-id"),
1457                    Some(300),
1458                    "base64",
1459                ),
1460                vec![required_secret("signing_secret", "webhook")],
1461                ProviderRuntimeMetadata::Builtin {
1462                    connector: "webhook".to_string(),
1463                    default_signature_variant: Some("standard".to_string()),
1464                },
1465            ),
1466            normalize: webhook_payload,
1467        }),
1468        Arc::new(BuiltinProviderSchema {
1469            provider_id: "a2a-push",
1470            harn_schema_name: "A2aPushPayload",
1471            metadata: provider_metadata_entry(
1472                "a2a-push",
1473                &["a2a-push"],
1474                "A2aPushPayload",
1475                &[],
1476                SignatureVerificationMetadata::None,
1477                Vec::new(),
1478                ProviderRuntimeMetadata::Builtin {
1479                    connector: "a2a-push".to_string(),
1480                    default_signature_variant: None,
1481                },
1482            ),
1483            normalize: a2a_push_payload,
1484        }),
1485        Arc::new(stream_provider_schema("kafka", kafka_payload)),
1486        Arc::new(stream_provider_schema("nats", nats_payload)),
1487        Arc::new(stream_provider_schema("pulsar", pulsar_payload)),
1488        Arc::new(stream_provider_schema("postgres-cdc", postgres_cdc_payload)),
1489        Arc::new(stream_provider_schema("email", email_payload)),
1490        Arc::new(stream_provider_schema("websocket", websocket_payload)),
1491    ]
1492}
1493
1494fn stream_provider_schema(
1495    provider_id: &'static str,
1496    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1497) -> BuiltinProviderSchema {
1498    BuiltinProviderSchema {
1499        provider_id,
1500        harn_schema_name: "StreamEventPayload",
1501        metadata: provider_metadata_entry(
1502            provider_id,
1503            &["stream"],
1504            "StreamEventPayload",
1505            &[],
1506            SignatureVerificationMetadata::None,
1507            Vec::new(),
1508            ProviderRuntimeMetadata::Builtin {
1509                connector: "stream".to_string(),
1510                default_signature_variant: None,
1511            },
1512        ),
1513        normalize,
1514    }
1515}
1516
1517fn github_payload(
1518    kind: &str,
1519    headers: &BTreeMap<String, String>,
1520    raw: JsonValue,
1521) -> ProviderPayload {
1522    // The connector emits a normalized payload that wraps the original
1523    // GitHub webhook body inside its own `raw` field. When we see that
1524    // wrapper shape, prefer it as the escape-hatch raw so callers don't
1525    // have to traverse two levels of `raw` to reach the upstream payload.
1526    let original_raw = raw
1527        .get("raw")
1528        .filter(|value| value.is_object())
1529        .cloned()
1530        .unwrap_or_else(|| raw.clone());
1531    let common = GitHubEventCommon {
1532        event: kind.to_string(),
1533        action: raw
1534            .get("action")
1535            .and_then(JsonValue::as_str)
1536            .map(ToString::to_string),
1537        delivery_id: raw
1538            .get("delivery_id")
1539            .and_then(JsonValue::as_str)
1540            .map(ToString::to_string)
1541            .or_else(|| headers.get("X-GitHub-Delivery").cloned()),
1542        installation_id: raw
1543            .get("installation_id")
1544            .and_then(JsonValue::as_i64)
1545            .or_else(|| {
1546                raw.get("installation")
1547                    .and_then(|value| value.get("id"))
1548                    .and_then(JsonValue::as_i64)
1549            }),
1550        topic: raw
1551            .get("topic")
1552            .and_then(JsonValue::as_str)
1553            .map(ToString::to_string),
1554        repository: raw.get("repository").cloned(),
1555        repo: raw.get("repo").cloned(),
1556        raw: original_raw,
1557    };
1558    let payload = match kind {
1559        "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1560            common,
1561            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1562        }),
1563        "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1564            common,
1565            pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1566        }),
1567        "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1568            common,
1569            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1570            comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1571        }),
1572        "pull_request_review" => {
1573            GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1574                common,
1575                pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1576                review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1577            })
1578        }
1579        "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1580            common,
1581            commits: raw
1582                .get("commits")
1583                .and_then(JsonValue::as_array)
1584                .cloned()
1585                .unwrap_or_default(),
1586            distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1587        }),
1588        "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1589            common,
1590            workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1591        }),
1592        "deployment_status" => {
1593            GitHubEventPayload::DeploymentStatus(GitHubDeploymentStatusEventPayload {
1594                common,
1595                deployment_status: raw
1596                    .get("deployment_status")
1597                    .cloned()
1598                    .unwrap_or(JsonValue::Null),
1599                deployment: raw.get("deployment").cloned().unwrap_or(JsonValue::Null),
1600            })
1601        }
1602        "check_run" => GitHubEventPayload::CheckRun(GitHubCheckRunEventPayload {
1603            common,
1604            check_run: raw.get("check_run").cloned().unwrap_or(JsonValue::Null),
1605        }),
1606        "check_suite" => {
1607            let check_suite = raw.get("check_suite").cloned().unwrap_or(JsonValue::Null);
1608            GitHubEventPayload::CheckSuite(GitHubCheckSuiteEventPayload {
1609                check_suite_id: github_promoted_i64(&raw, "check_suite_id")
1610                    .or_else(|| check_suite.get("id").and_then(JsonValue::as_i64)),
1611                pull_request_number: github_promoted_i64(&raw, "pull_request_number"),
1612                head_sha: github_promoted_string(&raw, "head_sha"),
1613                head_ref: github_promoted_string(&raw, "head_ref"),
1614                base_ref: github_promoted_string(&raw, "base_ref"),
1615                status: github_promoted_string(&raw, "status"),
1616                conclusion: github_promoted_string(&raw, "conclusion"),
1617                common,
1618                check_suite,
1619            })
1620        }
1621        "status" => GitHubEventPayload::Status(GitHubStatusEventPayload {
1622            commit_status: raw
1623                .get("commit_status")
1624                .cloned()
1625                .or_else(|| Some(common.raw.clone())),
1626            status_id: github_promoted_i64(&raw, "status_id")
1627                .or_else(|| common.raw.get("id").and_then(JsonValue::as_i64)),
1628            head_sha: github_promoted_string(&raw, "head_sha").or_else(|| {
1629                common
1630                    .raw
1631                    .get("sha")
1632                    .and_then(JsonValue::as_str)
1633                    .map(ToString::to_string)
1634            }),
1635            head_ref: github_promoted_string(&raw, "head_ref"),
1636            base_ref: github_promoted_string(&raw, "base_ref"),
1637            state: github_promoted_string(&raw, "state"),
1638            context: github_promoted_string(&raw, "context"),
1639            target_url: github_promoted_string(&raw, "target_url"),
1640            common,
1641        }),
1642        "merge_group" => {
1643            let merge_group = raw.get("merge_group").cloned().unwrap_or(JsonValue::Null);
1644            GitHubEventPayload::MergeGroup(GitHubMergeGroupEventPayload {
1645                merge_group_id: raw
1646                    .get("merge_group_id")
1647                    .cloned()
1648                    .or_else(|| merge_group.get("id").cloned()),
1649                head_sha: github_promoted_string(&raw, "head_sha").or_else(|| {
1650                    merge_group
1651                        .get("head_sha")
1652                        .and_then(JsonValue::as_str)
1653                        .map(ToString::to_string)
1654                }),
1655                head_ref: github_promoted_string(&raw, "head_ref").or_else(|| {
1656                    merge_group
1657                        .get("head_ref")
1658                        .and_then(JsonValue::as_str)
1659                        .map(ToString::to_string)
1660                }),
1661                base_sha: github_promoted_string(&raw, "base_sha").or_else(|| {
1662                    merge_group
1663                        .get("base_sha")
1664                        .and_then(JsonValue::as_str)
1665                        .map(ToString::to_string)
1666                }),
1667                base_ref: github_promoted_string(&raw, "base_ref").or_else(|| {
1668                    merge_group
1669                        .get("base_ref")
1670                        .and_then(JsonValue::as_str)
1671                        .map(ToString::to_string)
1672                }),
1673                pull_requests: raw
1674                    .get("pull_requests")
1675                    .and_then(JsonValue::as_array)
1676                    .cloned()
1677                    .unwrap_or_default(),
1678                pull_request_numbers: raw
1679                    .get("pull_request_numbers")
1680                    .and_then(JsonValue::as_array)
1681                    .map(|values| {
1682                        values
1683                            .iter()
1684                            .filter_map(JsonValue::as_i64)
1685                            .collect::<Vec<_>>()
1686                    })
1687                    .unwrap_or_default(),
1688                common,
1689                merge_group,
1690            })
1691        }
1692        "installation" => GitHubEventPayload::Installation(GitHubInstallationEventPayload {
1693            installation: raw.get("installation").cloned(),
1694            account: raw.get("account").cloned(),
1695            installation_state: github_promoted_string(&raw, "installation_state"),
1696            suspended: raw.get("suspended").and_then(JsonValue::as_bool),
1697            revoked: raw.get("revoked").and_then(JsonValue::as_bool),
1698            repositories: raw
1699                .get("repositories")
1700                .and_then(JsonValue::as_array)
1701                .cloned()
1702                .unwrap_or_default(),
1703            common,
1704        }),
1705        "installation_repositories" => GitHubEventPayload::InstallationRepositories(
1706            GitHubInstallationRepositoriesEventPayload {
1707                installation: raw.get("installation").cloned(),
1708                account: raw.get("account").cloned(),
1709                installation_state: github_promoted_string(&raw, "installation_state"),
1710                suspended: raw.get("suspended").and_then(JsonValue::as_bool),
1711                revoked: raw.get("revoked").and_then(JsonValue::as_bool),
1712                repository_selection: github_promoted_string(&raw, "repository_selection"),
1713                repositories_added: raw
1714                    .get("repositories_added")
1715                    .and_then(JsonValue::as_array)
1716                    .cloned()
1717                    .unwrap_or_default(),
1718                repositories_removed: raw
1719                    .get("repositories_removed")
1720                    .and_then(JsonValue::as_array)
1721                    .cloned()
1722                    .unwrap_or_default(),
1723                common,
1724            },
1725        ),
1726        _ => GitHubEventPayload::Other(common),
1727    };
1728    ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1729}
1730
1731fn github_promoted_string(raw: &JsonValue, field: &str) -> Option<String> {
1732    raw.get(field)
1733        .and_then(JsonValue::as_str)
1734        .map(ToString::to_string)
1735}
1736
1737fn github_promoted_i64(raw: &JsonValue, field: &str) -> Option<i64> {
1738    raw.get(field).and_then(JsonValue::as_i64)
1739}
1740
1741fn slack_payload(
1742    kind: &str,
1743    _headers: &BTreeMap<String, String>,
1744    raw: JsonValue,
1745) -> ProviderPayload {
1746    let event = raw.get("event");
1747    let common = SlackEventCommon {
1748        event: kind.to_string(),
1749        event_id: raw
1750            .get("event_id")
1751            .and_then(JsonValue::as_str)
1752            .map(ToString::to_string),
1753        api_app_id: raw
1754            .get("api_app_id")
1755            .and_then(JsonValue::as_str)
1756            .map(ToString::to_string),
1757        team_id: raw
1758            .get("team_id")
1759            .and_then(JsonValue::as_str)
1760            .map(ToString::to_string),
1761        channel_id: slack_channel_id(event),
1762        user_id: slack_user_id(event),
1763        event_ts: event
1764            .and_then(|value| value.get("event_ts"))
1765            .and_then(JsonValue::as_str)
1766            .map(ToString::to_string),
1767        raw: raw.clone(),
1768    };
1769    let payload = match kind {
1770        kind if kind == "message" || kind.starts_with("message.") => {
1771            SlackEventPayload::Message(SlackMessageEventPayload {
1772                subtype: event
1773                    .and_then(|value| value.get("subtype"))
1774                    .and_then(JsonValue::as_str)
1775                    .map(ToString::to_string),
1776                channel_type: event
1777                    .and_then(|value| value.get("channel_type"))
1778                    .and_then(JsonValue::as_str)
1779                    .map(ToString::to_string),
1780                channel: event
1781                    .and_then(|value| value.get("channel"))
1782                    .and_then(JsonValue::as_str)
1783                    .map(ToString::to_string),
1784                user: event
1785                    .and_then(|value| value.get("user"))
1786                    .and_then(JsonValue::as_str)
1787                    .map(ToString::to_string),
1788                text: event
1789                    .and_then(|value| value.get("text"))
1790                    .and_then(JsonValue::as_str)
1791                    .map(ToString::to_string),
1792                ts: event
1793                    .and_then(|value| value.get("ts"))
1794                    .and_then(JsonValue::as_str)
1795                    .map(ToString::to_string),
1796                thread_ts: event
1797                    .and_then(|value| value.get("thread_ts"))
1798                    .and_then(JsonValue::as_str)
1799                    .map(ToString::to_string),
1800                common,
1801            })
1802        }
1803        "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1804            channel: event
1805                .and_then(|value| value.get("channel"))
1806                .and_then(JsonValue::as_str)
1807                .map(ToString::to_string),
1808            user: event
1809                .and_then(|value| value.get("user"))
1810                .and_then(JsonValue::as_str)
1811                .map(ToString::to_string),
1812            text: event
1813                .and_then(|value| value.get("text"))
1814                .and_then(JsonValue::as_str)
1815                .map(ToString::to_string),
1816            ts: event
1817                .and_then(|value| value.get("ts"))
1818                .and_then(JsonValue::as_str)
1819                .map(ToString::to_string),
1820            thread_ts: event
1821                .and_then(|value| value.get("thread_ts"))
1822                .and_then(JsonValue::as_str)
1823                .map(ToString::to_string),
1824            common,
1825        }),
1826        "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1827            reaction: event
1828                .and_then(|value| value.get("reaction"))
1829                .and_then(JsonValue::as_str)
1830                .map(ToString::to_string),
1831            item_user: event
1832                .and_then(|value| value.get("item_user"))
1833                .and_then(JsonValue::as_str)
1834                .map(ToString::to_string),
1835            item: event
1836                .and_then(|value| value.get("item"))
1837                .cloned()
1838                .unwrap_or(JsonValue::Null),
1839            common,
1840        }),
1841        "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1842            user: event
1843                .and_then(|value| value.get("user"))
1844                .and_then(JsonValue::as_str)
1845                .map(ToString::to_string),
1846            channel: event
1847                .and_then(|value| value.get("channel"))
1848                .and_then(JsonValue::as_str)
1849                .map(ToString::to_string),
1850            tab: event
1851                .and_then(|value| value.get("tab"))
1852                .and_then(JsonValue::as_str)
1853                .map(ToString::to_string),
1854            view: event
1855                .and_then(|value| value.get("view"))
1856                .cloned()
1857                .unwrap_or(JsonValue::Null),
1858            common,
1859        }),
1860        "assistant_thread_started" => {
1861            let assistant_thread = event
1862                .and_then(|value| value.get("assistant_thread"))
1863                .cloned()
1864                .unwrap_or(JsonValue::Null);
1865            SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1866                thread_ts: assistant_thread
1867                    .get("thread_ts")
1868                    .and_then(JsonValue::as_str)
1869                    .map(ToString::to_string),
1870                context: assistant_thread
1871                    .get("context")
1872                    .cloned()
1873                    .unwrap_or(JsonValue::Null),
1874                assistant_thread,
1875                common,
1876            })
1877        }
1878        _ => SlackEventPayload::Other(common),
1879    };
1880    ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1881}
1882
1883fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1884    event
1885        .and_then(|value| value.get("channel"))
1886        .and_then(JsonValue::as_str)
1887        .map(ToString::to_string)
1888        .or_else(|| {
1889            event
1890                .and_then(|value| value.get("item"))
1891                .and_then(|value| value.get("channel"))
1892                .and_then(JsonValue::as_str)
1893                .map(ToString::to_string)
1894        })
1895        .or_else(|| {
1896            event
1897                .and_then(|value| value.get("channel"))
1898                .and_then(|value| value.get("id"))
1899                .and_then(JsonValue::as_str)
1900                .map(ToString::to_string)
1901        })
1902        .or_else(|| {
1903            event
1904                .and_then(|value| value.get("assistant_thread"))
1905                .and_then(|value| value.get("channel_id"))
1906                .and_then(JsonValue::as_str)
1907                .map(ToString::to_string)
1908        })
1909}
1910
1911fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1912    event
1913        .and_then(|value| value.get("user"))
1914        .and_then(JsonValue::as_str)
1915        .map(ToString::to_string)
1916        .or_else(|| {
1917            event
1918                .and_then(|value| value.get("user"))
1919                .and_then(|value| value.get("id"))
1920                .and_then(JsonValue::as_str)
1921                .map(ToString::to_string)
1922        })
1923        .or_else(|| {
1924            event
1925                .and_then(|value| value.get("item_user"))
1926                .and_then(JsonValue::as_str)
1927                .map(ToString::to_string)
1928        })
1929        .or_else(|| {
1930            event
1931                .and_then(|value| value.get("assistant_thread"))
1932                .and_then(|value| value.get("user_id"))
1933                .and_then(JsonValue::as_str)
1934                .map(ToString::to_string)
1935        })
1936}
1937
1938fn linear_payload(
1939    _kind: &str,
1940    headers: &BTreeMap<String, String>,
1941    raw: JsonValue,
1942) -> ProviderPayload {
1943    let common = linear_event_common(headers, &raw);
1944    let event = common.event.clone();
1945    let payload = match event.as_str() {
1946        "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1947            common,
1948            issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1949            changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1950        }),
1951        "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1952            common,
1953            comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1954        }),
1955        "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1956            common,
1957            label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1958        }),
1959        "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1960            common,
1961            project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1962        }),
1963        "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1964            common,
1965            cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1966        }),
1967        "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
1968            common,
1969            customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1970        }),
1971        "customer_request" => {
1972            LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
1973                common,
1974                customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1975            })
1976        }
1977        _ => LinearEventPayload::Other(common),
1978    };
1979    ProviderPayload::Known(KnownProviderPayload::Linear(payload))
1980}
1981
1982fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
1983    LinearEventCommon {
1984        event: linear_event_name(
1985            raw.get("type")
1986                .and_then(JsonValue::as_str)
1987                .or_else(|| headers.get("Linear-Event").map(String::as_str)),
1988        ),
1989        action: raw
1990            .get("action")
1991            .and_then(JsonValue::as_str)
1992            .map(ToString::to_string),
1993        delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
1994        organization_id: raw
1995            .get("organizationId")
1996            .and_then(JsonValue::as_str)
1997            .map(ToString::to_string),
1998        webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
1999        webhook_id: raw
2000            .get("webhookId")
2001            .and_then(JsonValue::as_str)
2002            .map(ToString::to_string),
2003        url: raw
2004            .get("url")
2005            .and_then(JsonValue::as_str)
2006            .map(ToString::to_string),
2007        created_at: raw
2008            .get("createdAt")
2009            .and_then(JsonValue::as_str)
2010            .map(ToString::to_string),
2011        actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
2012        raw: raw.clone(),
2013    }
2014}
2015
/// Canonicalizes a Linear event type into the snake_case name used for dispatch.
///
/// Matching is ASCII case-insensitive. Known aliases collapse onto one name (for example
/// `IssueComment` becomes `comment`, `ProjectUpdate` becomes `project`); any other
/// non-empty type is returned lowercased, and a missing or empty type becomes `other`.
fn linear_event_name(raw_type: Option<&str>) -> String {
    let lowered = raw_type.unwrap_or_default().to_ascii_lowercase();
    let canonical = match lowered.as_str() {
        "issue" => "issue",
        "comment" | "issuecomment" | "issue_comment" => "comment",
        "issuelabel" | "issue_label" => "issue_label",
        "project" | "projectupdate" | "project_update" => "project",
        "cycle" => "cycle",
        "customer" => "customer",
        "customerrequest" | "customer_request" => "customer_request",
        "" => "other",
        unrecognized => unrecognized,
    };
    canonical.to_string()
}
2029
2030fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
2031    let Some(JsonValue::Object(fields)) = updated_from else {
2032        return Vec::new();
2033    };
2034    let mut changes = Vec::new();
2035    for (field, previous) in fields {
2036        let change = match field.as_str() {
2037            "title" => LinearIssueChange::Title {
2038                previous: previous.as_str().map(ToString::to_string),
2039            },
2040            "description" => LinearIssueChange::Description {
2041                previous: previous.as_str().map(ToString::to_string),
2042            },
2043            "priority" => LinearIssueChange::Priority {
2044                previous: parse_json_i64ish(previous),
2045            },
2046            "estimate" => LinearIssueChange::Estimate {
2047                previous: parse_json_i64ish(previous),
2048            },
2049            "stateId" => LinearIssueChange::StateId {
2050                previous: previous.as_str().map(ToString::to_string),
2051            },
2052            "teamId" => LinearIssueChange::TeamId {
2053                previous: previous.as_str().map(ToString::to_string),
2054            },
2055            "assigneeId" => LinearIssueChange::AssigneeId {
2056                previous: previous.as_str().map(ToString::to_string),
2057            },
2058            "projectId" => LinearIssueChange::ProjectId {
2059                previous: previous.as_str().map(ToString::to_string),
2060            },
2061            "cycleId" => LinearIssueChange::CycleId {
2062                previous: previous.as_str().map(ToString::to_string),
2063            },
2064            "dueDate" => LinearIssueChange::DueDate {
2065                previous: previous.as_str().map(ToString::to_string),
2066            },
2067            "parentId" => LinearIssueChange::ParentId {
2068                previous: previous.as_str().map(ToString::to_string),
2069            },
2070            "sortOrder" => LinearIssueChange::SortOrder {
2071                previous: previous.as_f64(),
2072            },
2073            "labelIds" => LinearIssueChange::LabelIds {
2074                previous: parse_string_array(previous),
2075            },
2076            "completedAt" => LinearIssueChange::CompletedAt {
2077                previous: previous.as_str().map(ToString::to_string),
2078            },
2079            _ => LinearIssueChange::Other {
2080                field: field.clone(),
2081                previous: previous.clone(),
2082            },
2083        };
2084        changes.push(change);
2085    }
2086    changes
2087}
2088
2089fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
2090    value
2091        .as_i64()
2092        .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
2093        .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
2094}
2095
2096fn parse_string_array(value: &JsonValue) -> Vec<String> {
2097    let Some(array) = value.as_array() else {
2098        return Vec::new();
2099    };
2100    array
2101        .iter()
2102        .filter_map(|entry| {
2103            entry.as_str().map(ToString::to_string).or_else(|| {
2104                entry
2105                    .get("id")
2106                    .and_then(JsonValue::as_str)
2107                    .map(ToString::to_string)
2108            })
2109        })
2110        .collect()
2111}
2112
/// Looks up a header by name, ignoring ASCII case.
///
/// Returns the value of the first key (in the map's iteration order) that matches `name`
/// case-insensitively, or `None` when no key matches.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
2119
2120fn notion_payload(
2121    kind: &str,
2122    headers: &BTreeMap<String, String>,
2123    raw: JsonValue,
2124) -> ProviderPayload {
2125    let workspace_id = raw
2126        .get("workspace_id")
2127        .and_then(JsonValue::as_str)
2128        .map(ToString::to_string);
2129    ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
2130        event: kind.to_string(),
2131        workspace_id,
2132        request_id: headers
2133            .get("request-id")
2134            .cloned()
2135            .or_else(|| headers.get("x-request-id").cloned()),
2136        subscription_id: raw
2137            .get("subscription_id")
2138            .and_then(JsonValue::as_str)
2139            .map(ToString::to_string),
2140        integration_id: raw
2141            .get("integration_id")
2142            .and_then(JsonValue::as_str)
2143            .map(ToString::to_string),
2144        attempt_number: raw
2145            .get("attempt_number")
2146            .and_then(JsonValue::as_u64)
2147            .and_then(|value| u32::try_from(value).ok()),
2148        entity_id: raw
2149            .get("entity")
2150            .and_then(|value| value.get("id"))
2151            .and_then(JsonValue::as_str)
2152            .map(ToString::to_string),
2153        entity_type: raw
2154            .get("entity")
2155            .and_then(|value| value.get("type"))
2156            .and_then(JsonValue::as_str)
2157            .map(ToString::to_string),
2158        api_version: raw
2159            .get("api_version")
2160            .and_then(JsonValue::as_str)
2161            .map(ToString::to_string),
2162        verification_token: raw
2163            .get("verification_token")
2164            .and_then(JsonValue::as_str)
2165            .map(ToString::to_string),
2166        polled: None,
2167        raw,
2168    })))
2169}
2170
2171fn cron_payload(
2172    _kind: &str,
2173    _headers: &BTreeMap<String, String>,
2174    raw: JsonValue,
2175) -> ProviderPayload {
2176    let cron_id = raw
2177        .get("cron_id")
2178        .and_then(JsonValue::as_str)
2179        .map(ToString::to_string);
2180    let schedule = raw
2181        .get("schedule")
2182        .and_then(JsonValue::as_str)
2183        .map(ToString::to_string);
2184    let tick_at = raw
2185        .get("tick_at")
2186        .and_then(JsonValue::as_str)
2187        .and_then(parse_rfc3339)
2188        .unwrap_or_else(OffsetDateTime::now_utc);
2189    ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
2190        cron_id,
2191        schedule,
2192        tick_at,
2193        raw,
2194    }))
2195}
2196
2197fn webhook_payload(
2198    _kind: &str,
2199    headers: &BTreeMap<String, String>,
2200    raw: JsonValue,
2201) -> ProviderPayload {
2202    ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
2203        source: headers.get("X-Webhook-Source").cloned(),
2204        content_type: headers.get("Content-Type").cloned(),
2205        raw,
2206    }))
2207}
2208
2209fn a2a_push_payload(
2210    _kind: &str,
2211    _headers: &BTreeMap<String, String>,
2212    raw: JsonValue,
2213) -> ProviderPayload {
2214    let task_id = raw
2215        .get("task_id")
2216        .and_then(JsonValue::as_str)
2217        .map(ToString::to_string);
2218    let sender = raw
2219        .get("sender")
2220        .and_then(JsonValue::as_str)
2221        .map(ToString::to_string);
2222    let task_state = raw
2223        .pointer("/status/state")
2224        .or_else(|| raw.pointer("/statusUpdate/status/state"))
2225        .and_then(JsonValue::as_str)
2226        .map(|state| match state {
2227            "cancelled" => "canceled".to_string(),
2228            other => other.to_string(),
2229        });
2230    let artifact = raw
2231        .pointer("/artifactUpdate/artifact")
2232        .or_else(|| raw.get("artifact"))
2233        .cloned();
2234    let kind = task_state
2235        .as_deref()
2236        .map(|state| format!("a2a.task.{state}"))
2237        .unwrap_or_else(|| "a2a.task.update".to_string());
2238    ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
2239        task_id,
2240        task_state,
2241        artifact,
2242        sender,
2243        raw,
2244        kind,
2245    }))
2246}
2247
2248fn kafka_payload(
2249    kind: &str,
2250    headers: &BTreeMap<String, String>,
2251    raw: JsonValue,
2252) -> ProviderPayload {
2253    ProviderPayload::Known(KnownProviderPayload::Kafka(stream_payload(
2254        kind, headers, raw,
2255    )))
2256}
2257
2258fn nats_payload(kind: &str, headers: &BTreeMap<String, String>, raw: JsonValue) -> ProviderPayload {
2259    ProviderPayload::Known(KnownProviderPayload::Nats(stream_payload(
2260        kind, headers, raw,
2261    )))
2262}
2263
2264fn pulsar_payload(
2265    kind: &str,
2266    headers: &BTreeMap<String, String>,
2267    raw: JsonValue,
2268) -> ProviderPayload {
2269    ProviderPayload::Known(KnownProviderPayload::Pulsar(stream_payload(
2270        kind, headers, raw,
2271    )))
2272}
2273
2274fn postgres_cdc_payload(
2275    kind: &str,
2276    headers: &BTreeMap<String, String>,
2277    raw: JsonValue,
2278) -> ProviderPayload {
2279    ProviderPayload::Known(KnownProviderPayload::PostgresCdc(stream_payload(
2280        kind, headers, raw,
2281    )))
2282}
2283
2284fn email_payload(
2285    kind: &str,
2286    headers: &BTreeMap<String, String>,
2287    raw: JsonValue,
2288) -> ProviderPayload {
2289    ProviderPayload::Known(KnownProviderPayload::Email(stream_payload(
2290        kind, headers, raw,
2291    )))
2292}
2293
2294fn websocket_payload(
2295    kind: &str,
2296    headers: &BTreeMap<String, String>,
2297    raw: JsonValue,
2298) -> ProviderPayload {
2299    ProviderPayload::Known(KnownProviderPayload::Websocket(stream_payload(
2300        kind, headers, raw,
2301    )))
2302}
2303
2304fn stream_payload(
2305    kind: &str,
2306    headers: &BTreeMap<String, String>,
2307    raw: JsonValue,
2308) -> StreamEventPayload {
2309    StreamEventPayload {
2310        event: kind.to_string(),
2311        source: json_stringish(&raw, &["source", "connector", "origin"]),
2312        stream: json_stringish(
2313            &raw,
2314            &["stream", "topic", "subject", "channel", "mailbox", "slot"],
2315        ),
2316        partition: json_stringish(&raw, &["partition", "shard", "consumer"]),
2317        offset: json_stringish(&raw, &["offset", "sequence", "lsn", "message_id"]),
2318        key: json_stringish(&raw, &["key", "message_key", "id", "event_id"]),
2319        timestamp: json_stringish(&raw, &["timestamp", "occurred_at", "received_at", "ts"]),
2320        headers: headers.clone(),
2321        raw,
2322    }
2323}
2324
2325fn json_stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
2326    fields.iter().find_map(|field| {
2327        let value = raw.get(*field)?;
2328        value
2329            .as_str()
2330            .map(ToString::to_string)
2331            .or_else(|| parse_json_i64ish(value).map(|number| number.to_string()))
2332            .or_else(|| value.as_u64().map(|number| number.to_string()))
2333    })
2334}
2335
2336fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
2337    OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
2338}
2339
2340#[cfg(test)]
2341mod tests {
2342    use super::*;
2343
2344    struct OwnedProviderSchema {
2345        metadata: ProviderMetadata,
2346    }
2347
2348    impl OwnedProviderSchema {
2349        fn new(provider: &str, schema_name: &str) -> Self {
2350            Self {
2351                metadata: ProviderMetadata {
2352                    provider: provider.to_string(),
2353                    kinds: vec!["webhook".to_string()],
2354                    schema_name: schema_name.to_string(),
2355                    runtime: ProviderRuntimeMetadata::Placeholder,
2356                    ..ProviderMetadata::default()
2357                },
2358            }
2359        }
2360    }
2361
2362    impl ProviderSchema for OwnedProviderSchema {
2363        fn provider_id(&self) -> &str {
2364            &self.metadata.provider
2365        }
2366
2367        fn harn_schema_name(&self) -> &str {
2368            &self.metadata.schema_name
2369        }
2370
2371        fn metadata(&self) -> ProviderMetadata {
2372            self.metadata.clone()
2373        }
2374
2375        fn normalize(
2376            &self,
2377            _kind: &str,
2378            _headers: &BTreeMap<String, String>,
2379            raw: JsonValue,
2380        ) -> Result<ProviderPayload, ProviderCatalogError> {
2381            Ok(ProviderPayload::Extension(ExtensionProviderPayload {
2382                provider: self.metadata.provider.clone(),
2383                schema_name: self.metadata.schema_name.clone(),
2384                raw,
2385            }))
2386        }
2387    }
2388
2389    fn owned_provider_schema(provider: &str, schema_name: &str) -> Arc<dyn ProviderSchema> {
2390        Arc::new(OwnedProviderSchema::new(provider, schema_name))
2391    }
2392
2393    fn sample_headers() -> BTreeMap<String, String> {
2394        BTreeMap::from([
2395            ("Authorization".to_string(), "Bearer secret".to_string()),
2396            ("Cookie".to_string(), "session=abc".to_string()),
2397            ("User-Agent".to_string(), "GitHub-Hookshot/123".to_string()),
2398            ("X-GitHub-Delivery".to_string(), "delivery-123".to_string()),
2399            ("X-GitHub-Event".to_string(), "issues".to_string()),
2400            ("X-Webhook-Token".to_string(), "token".to_string()),
2401        ])
2402    }
2403
2404    #[test]
2405    fn default_redaction_policy_keeps_safe_headers() {
2406        let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
2407        assert_eq!(redacted.get("User-Agent").unwrap(), "GitHub-Hookshot/123");
2408        assert_eq!(redacted.get("X-GitHub-Delivery").unwrap(), "delivery-123");
2409        assert_eq!(
2410            redacted.get("Authorization").unwrap(),
2411            REDACTED_HEADER_VALUE
2412        );
2413        assert_eq!(redacted.get("Cookie").unwrap(), REDACTED_HEADER_VALUE);
2414        assert_eq!(
2415            redacted.get("X-Webhook-Token").unwrap(),
2416            REDACTED_HEADER_VALUE
2417        );
2418    }
2419
2420    #[test]
2421    fn provider_catalog_rejects_duplicates() {
2422        let mut catalog = ProviderCatalog::default();
2423        catalog
2424            .register(Arc::new(BuiltinProviderSchema {
2425                provider_id: "github",
2426                harn_schema_name: "GitHubEventPayload",
2427                metadata: provider_metadata_entry(
2428                    "github",
2429                    &["webhook"],
2430                    "GitHubEventPayload",
2431                    &[],
2432                    SignatureVerificationMetadata::None,
2433                    Vec::new(),
2434                    ProviderRuntimeMetadata::Placeholder,
2435                ),
2436                normalize: github_payload,
2437            }))
2438            .unwrap();
2439        let error = catalog
2440            .register(Arc::new(BuiltinProviderSchema {
2441                provider_id: "github",
2442                harn_schema_name: "GitHubEventPayload",
2443                metadata: provider_metadata_entry(
2444                    "github",
2445                    &["webhook"],
2446                    "GitHubEventPayload",
2447                    &[],
2448                    SignatureVerificationMetadata::None,
2449                    Vec::new(),
2450                    ProviderRuntimeMetadata::Placeholder,
2451                ),
2452                normalize: github_payload,
2453            }))
2454            .unwrap_err();
2455        assert_eq!(
2456            error,
2457            ProviderCatalogError::DuplicateProvider("github".to_string())
2458        );
2459    }
2460
2461    #[test]
2462    fn provider_catalog_builds_independent_owned_dynamic_catalogs() {
2463        let first = ProviderCatalog::with_defaults_and(vec![owned_provider_schema(
2464            "runtime-a",
2465            "RuntimeAPayload",
2466        )])
2467        .unwrap();
2468        assert_eq!(
2469            first
2470                .metadata_for("runtime-a")
2471                .expect("first dynamic provider")
2472                .schema_name,
2473            "RuntimeAPayload"
2474        );
2475        assert!(first.metadata_for("runtime-b").is_none());
2476
2477        let second = ProviderCatalog::with_defaults_and(vec![owned_provider_schema(
2478            "runtime-b",
2479            "RuntimeBPayload",
2480        )])
2481        .unwrap();
2482        assert!(second.metadata_for("runtime-a").is_none());
2483        assert_eq!(
2484            second
2485                .metadata_for("runtime-b")
2486                .expect("second dynamic provider")
2487                .schema_name,
2488            "RuntimeBPayload"
2489        );
2490        assert!(first.metadata_for("runtime-a").is_some());
2491    }
2492
/// Checks which registered providers are reported with a builtin runtime,
/// which first-party providers are still placeholders, and the exact shape
/// of the `kafka` stream entry.
#[test]
fn registered_provider_metadata_marks_builtin_connectors() {
    let all_providers = registered_provider_metadata();
    let builtin: Vec<&ProviderMetadata> = all_providers
        .iter()
        .filter(|meta| matches!(meta.runtime, ProviderRuntimeMetadata::Builtin { .. }))
        .collect();
    // Small lookup helper so each expected builtin keeps its own assert line.
    let has_builtin = |name: &str| builtin.iter().any(|meta| meta.provider == name);

    assert_eq!(builtin.len(), 9);
    assert!(has_builtin("a2a-push"));
    assert!(has_builtin("cron"));
    assert!(has_builtin("webhook"));

    // These providers must be present in the catalog but not builtin.
    for name in ["github", "linear", "notion", "slack"] {
        let meta = all_providers
            .iter()
            .find(|meta| meta.provider == name)
            .expect("first-party package-backed provider metadata");
        assert!(matches!(meta.runtime, ProviderRuntimeMetadata::Placeholder));
    }

    let kafka = all_providers
        .iter()
        .find(|meta| meta.provider == "kafka")
        .expect("kafka stream provider");
    assert_eq!(kafka.kinds, vec![String::from("stream")]);
    assert_eq!(kafka.schema_name, "StreamEventPayload");
    // Kafka is served by the "stream" connector and has no default signature variant.
    assert!(matches!(
        &kafka.runtime,
        ProviderRuntimeMetadata::Builtin {
            connector,
            default_signature_variant: None
        } if connector == "stream"
    ));
}
2529
/// Serializes a fully populated `TriggerEvent`, decodes it, and serializes it
/// again; both the decoded value and the second JSON form must equal the first.
#[test]
fn trigger_event_round_trip_is_stable() {
    let provider = ProviderId::from("github");
    let headers = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
    let issue_body = serde_json::json!({
        "action": "opened",
        "installation": {"id": 42},
        "issue": {"number": 99}
    });
    let provider_payload =
        ProviderPayload::normalize(&provider, "issues", &sample_headers(), issue_body).unwrap();

    let received_at = parse_rfc3339("2026-04-19T07:00:00Z").unwrap();
    let occurred_at = parse_rfc3339("2026-04-19T06:59:59Z").unwrap();
    let event = TriggerEvent {
        id: TriggerEventId(String::from("trigger_evt_fixed")),
        provider,
        kind: String::from("issues"),
        received_at,
        occurred_at: Some(occurred_at),
        dedupe_key: String::from("delivery-123"),
        trace_id: TraceId(String::from("trace_fixed")),
        tenant_id: Some(TenantId(String::from("tenant_1"))),
        headers,
        provider_payload,
        signature_status: SignatureStatus::Verified,
        dedupe_claimed: false,
        batch: None,
        // Includes non-UTF-8 bytes so the body cannot be emitted as plain text.
        raw_body: Some(vec![0, 159, 255, 10]),
    };

    let first_json = serde_json::to_value(&event).unwrap();
    // The raw bytes are expected on the wire as a base64 string.
    assert_eq!(first_json["raw_body"], serde_json::json!("AJ//Cg=="));

    let decoded: TriggerEvent = serde_json::from_value(first_json.clone()).unwrap();
    let second_json = serde_json::to_value(&decoded).unwrap();
    assert_eq!(decoded, event);
    assert_eq!(first_json, second_json);
}
2569
/// Normalizing a payload for a provider that is not in the catalog must fail
/// with `UnknownProvider` carrying the provider name.
#[test]
fn unknown_provider_errors() {
    let provider = ProviderId::from("custom-provider");
    let no_headers = BTreeMap::new();
    let outcome = ProviderPayload::normalize(
        &provider,
        "thing.happened",
        &no_headers,
        serde_json::json!({"ok": true}),
    );

    let expected = ProviderCatalogError::UnknownProvider("custom-provider".to_string());
    assert_eq!(outcome.unwrap_err(), expected);
}
2584
/// Builds the two GitHub webhook headers used by the tests:
/// `X-GitHub-Event` set to `event` and `X-GitHub-Delivery` set to `delivery`.
fn github_headers(event: &str, delivery: &str) -> BTreeMap<String, String> {
    let mut headers = BTreeMap::new();
    headers.insert(String::from("X-GitHub-Event"), event.to_owned());
    headers.insert(String::from("X-GitHub-Delivery"), delivery.to_owned());
    headers
}
2591
2592    fn unwrap_github(payload: ProviderPayload) -> GitHubEventPayload {
2593        match payload {
2594            ProviderPayload::Known(KnownProviderPayload::GitHub(p)) => p,
2595            other => panic!("expected GitHub payload, got {other:?}"),
2596        }
2597    }
2598
2599    /// Mirror of the connector's normalized webhook payload shape: the
2600    /// connector wraps the original GitHub body as `raw` and promotes
2601    /// stable common + event-specific fields.
2602    fn connector_normalized(
2603        event: &str,
2604        delivery: &str,
2605        installation_id: i64,
2606        action: Option<&str>,
2607        original: serde_json::Value,
2608        promoted: serde_json::Value,
2609    ) -> serde_json::Value {
2610        let mut common = serde_json::json!({
2611            "provider": "github",
2612            "event": event,
2613            "topic": match action {
2614                Some(a) => format!("github.{event}.{a}"),
2615                None => format!("github.{event}"),
2616            },
2617            "delivery_id": delivery,
2618            "installation_id": installation_id,
2619            "repository": original.get("repository").cloned().unwrap_or(JsonValue::Null),
2620            "repo": serde_json::json!({"owner": "octo-org", "name": "octo-repo", "full_name": "octo-org/octo-repo"}),
2621            "raw": original,
2622        });
2623        if let Some(a) = action {
2624            common["action"] = serde_json::json!(a);
2625        }
2626        let common_obj = common.as_object_mut().unwrap();
2627        if let Some(promoted_obj) = promoted.as_object() {
2628            for (k, v) in promoted_obj {
2629                common_obj.insert(k.clone(), v.clone());
2630            }
2631        }
2632        common
2633    }
2634
/// A connector-normalized `check_suite` delivery must decode into the
/// `CheckSuite` variant with both the common and the promoted fields set.
#[test]
fn github_check_suite_event_promotes_typed_fields() {
    const HEAD_SHA: &str = "ccccccccccccccccccccccccccccccccccccccc1";

    let webhook_body = serde_json::json!({
        "action": "requested",
        "check_suite": {
            "id": 8101,
            "status": "queued",
            "conclusion": null,
            "head_sha": HEAD_SHA,
            "head_branch": "feature/x",
        },
        "repository": {"full_name": "octo-org/octo-repo"},
        "installation": {"id": 3001},
    });
    let promoted_fields = serde_json::json!({
        "check_suite": webhook_body["check_suite"].clone(),
        "check_suite_id": 8101,
        "head_sha": HEAD_SHA,
        "head_ref": "feature/x",
        "status": "queued",
    });
    let wrapped = connector_normalized(
        "check_suite",
        "delivery-cs",
        3001,
        Some("requested"),
        webhook_body.clone(),
        promoted_fields,
    );

    let headers = github_headers("check_suite", "delivery-cs");
    let payload =
        ProviderPayload::normalize(&ProviderId::from("github"), "check_suite", &headers, wrapped)
            .expect("check_suite payload");
    let GitHubEventPayload::CheckSuite(suite) = unwrap_github(payload) else {
        panic!("expected CheckSuite variant");
    };

    // Fields shared by every GitHub event.
    let common = &suite.common;
    assert_eq!(common.event, "check_suite");
    assert_eq!(common.action.as_deref(), Some("requested"));
    assert_eq!(common.delivery_id.as_deref(), Some("delivery-cs"));
    assert_eq!(common.installation_id, Some(3001));
    assert_eq!(common.topic.as_deref(), Some("github.check_suite.requested"));
    assert!(common.repository.is_some());
    assert!(common.repo.is_some());

    // Fields promoted specifically for check_suite.
    assert_eq!(suite.check_suite_id, Some(8101));
    assert_eq!(suite.head_sha.as_deref(), Some(HEAD_SHA));
    assert_eq!(suite.head_ref.as_deref(), Some("feature/x"));
    assert_eq!(suite.status.as_deref(), Some("queued"));

    // Original body is preserved as the escape-hatch raw.
    assert_eq!(common.raw, webhook_body);
}
2697
/// A connector-normalized `status` delivery (which has no action) must decode
/// into the `Status` variant with its promoted fields set.
#[test]
fn github_status_event_promotes_typed_fields() {
    const COMMIT_SHA: &str = "ccccccccccccccccccccccccccccccccccccccc1";
    const TARGET_URL: &str = "https://ci.example.test/octo-repo/9101";

    let webhook_body = serde_json::json!({
        "id": 9101,
        "sha": COMMIT_SHA,
        "state": "success",
        "context": "legacy/status",
        "target_url": TARGET_URL,
        "branches": [{"name": "main"}],
        "repository": {"full_name": "octo-org/octo-repo"},
        "installation": {"id": 3001},
    });
    // The whole status body is promoted as `commit_status`.
    let promoted_fields = serde_json::json!({
        "commit_status": webhook_body.clone(),
        "status_id": 9101,
        "head_sha": COMMIT_SHA,
        "head_ref": "main",
        "base_ref": "main",
        "state": "success",
        "context": "legacy/status",
        "target_url": TARGET_URL,
    });
    let wrapped = connector_normalized(
        "status",
        "delivery-status",
        3001,
        None,
        webhook_body.clone(),
        promoted_fields,
    );

    let headers = github_headers("status", "delivery-status");
    let payload =
        ProviderPayload::normalize(&ProviderId::from("github"), "status", &headers, wrapped)
            .expect("status payload");
    let GitHubEventPayload::Status(commit_status) = unwrap_github(payload) else {
        panic!("expected Status variant");
    };

    assert_eq!(commit_status.common.event, "status");
    assert_eq!(commit_status.common.installation_id, Some(3001));
    assert_eq!(commit_status.status_id, Some(9101));
    assert_eq!(commit_status.state.as_deref(), Some("success"));
    assert_eq!(commit_status.context.as_deref(), Some("legacy/status"));
    assert_eq!(commit_status.target_url.as_deref(), Some(TARGET_URL));
    assert_eq!(commit_status.head_sha.as_deref(), Some(COMMIT_SHA));
    assert!(commit_status.commit_status.is_some());
}
2753
/// A connector-normalized `merge_group` delivery must decode into the
/// `MergeGroup` variant, including the pull request list and numbers.
#[test]
fn github_merge_group_event_promotes_typed_fields() {
    const QUEUE_REF: &str = "gh-readonly-queue/main/pr-42";
    const HEAD_SHA: &str = "ddddddddddddddddddddddddddddddddddddddd1";
    const BASE_SHA: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa1";

    let webhook_body = serde_json::json!({
        "action": "checks_requested",
        "merge_group": {
            "id": 9201,
            "head_ref": QUEUE_REF,
            "head_sha": HEAD_SHA,
            "base_ref": "main",
            "base_sha": BASE_SHA,
            "pull_requests": [{"number": 42}, {"number": 43}],
        },
        "repository": {"full_name": "octo-org/octo-repo"},
        "installation": {"id": 3001},
    });
    let promoted_fields = serde_json::json!({
        "merge_group": webhook_body["merge_group"].clone(),
        "merge_group_id": 9201,
        "head_sha": HEAD_SHA,
        "head_ref": QUEUE_REF,
        "base_sha": BASE_SHA,
        "base_ref": "main",
        "pull_requests": [{"number": 42}, {"number": 43}],
        "pull_request_numbers": [42, 43],
    });
    let wrapped = connector_normalized(
        "merge_group",
        "delivery-mg",
        3001,
        Some("checks_requested"),
        webhook_body.clone(),
        promoted_fields,
    );

    let headers = github_headers("merge_group", "delivery-mg");
    let payload =
        ProviderPayload::normalize(&ProviderId::from("github"), "merge_group", &headers, wrapped)
            .expect("merge_group payload");
    let GitHubEventPayload::MergeGroup(group) = unwrap_github(payload) else {
        panic!("expected MergeGroup variant");
    };

    assert_eq!(group.common.event, "merge_group");
    assert_eq!(group.common.action.as_deref(), Some("checks_requested"));
    // The id is compared as a JSON value rather than as a typed integer.
    assert_eq!(group.merge_group_id, Some(serde_json::json!(9201)));
    assert_eq!(group.head_ref.as_deref(), Some(QUEUE_REF));
    assert_eq!(group.base_sha.as_deref(), Some(BASE_SHA));
    assert_eq!(group.base_ref.as_deref(), Some("main"));
    assert_eq!(group.pull_request_numbers, vec![42i64, 43i64]);
    assert_eq!(group.pull_requests.len(), 2);
}
2809
/// A connector-normalized `installation` delivery with action `suspend` must
/// decode into the `Installation` variant with its state flags set.
#[test]
fn github_installation_event_promotes_typed_fields() {
    let webhook_body = serde_json::json!({
        "action": "suspend",
        "installation": {
            "id": 3001,
            "account": {"login": "octo-org"},
            "repository_selection": "selected",
            "suspended_at": "2026-04-20T18:00:00Z",
        },
        "repositories": [{"full_name": "octo-org/octo-repo"}],
    });
    let promoted_fields = serde_json::json!({
        "installation": webhook_body["installation"].clone(),
        "account": {"login": "octo-org"},
        "installation_state": "suspended",
        "suspended": true,
        "revoked": false,
        "repositories": webhook_body["repositories"].clone(),
    });
    let wrapped = connector_normalized(
        "installation",
        "delivery-inst",
        3001,
        Some("suspend"),
        webhook_body.clone(),
        promoted_fields,
    );

    let headers = github_headers("installation", "delivery-inst");
    let payload =
        ProviderPayload::normalize(&ProviderId::from("github"), "installation", &headers, wrapped)
            .expect("installation payload");
    let GitHubEventPayload::Installation(installation) = unwrap_github(payload) else {
        panic!("expected Installation variant");
    };

    assert_eq!(installation.common.event, "installation");
    assert_eq!(installation.common.action.as_deref(), Some("suspend"));
    assert_eq!(
        installation.installation_state.as_deref(),
        Some("suspended")
    );
    assert_eq!(installation.suspended, Some(true));
    assert_eq!(installation.revoked, Some(false));
    assert_eq!(installation.repositories.len(), 1);
    assert!(installation.account.is_some());
}
2856
/// A connector-normalized `installation_repositories` delivery with action
/// `removed` must decode into the `InstallationRepositories` variant with the
/// added/removed repository lists set.
#[test]
fn github_installation_repositories_event_promotes_typed_fields() {
    let webhook_body = serde_json::json!({
        "action": "removed",
        "installation": {"id": 3001, "account": {"login": "octo-org"}},
        "repository_selection": "selected",
        "repositories_added": [],
        "repositories_removed": [
            {"id": 4001, "full_name": "octo-org/octo-repo"},
        ],
    });
    let promoted_fields = serde_json::json!({
        "installation": webhook_body["installation"].clone(),
        "account": {"login": "octo-org"},
        "installation_state": "revoked",
        "suspended": false,
        "revoked": true,
        "repository_selection": "selected",
        "repositories_added": [],
        "repositories_removed": webhook_body["repositories_removed"].clone(),
    });
    let wrapped = connector_normalized(
        "installation_repositories",
        "delivery-inst-repos",
        3001,
        Some("removed"),
        webhook_body.clone(),
        promoted_fields,
    );

    let headers = github_headers("installation_repositories", "delivery-inst-repos");
    let payload = ProviderPayload::normalize(
        &ProviderId::from("github"),
        "installation_repositories",
        &headers,
        wrapped,
    )
    .expect("installation_repositories payload");
    let GitHubEventPayload::InstallationRepositories(change) = unwrap_github(payload) else {
        panic!("expected InstallationRepositories variant");
    };

    assert_eq!(change.common.event, "installation_repositories");
    assert_eq!(change.common.action.as_deref(), Some("removed"));
    assert_eq!(change.repository_selection.as_deref(), Some("selected"));
    assert!(change.repositories_added.is_empty());
    assert_eq!(change.repositories_removed.len(), 1);

    let removed_full_name = change.repositories_removed[0]
        .get("full_name")
        .and_then(|name| name.as_str());
    assert_eq!(removed_full_name, Some("octo-org/octo-repo"));

    assert_eq!(change.installation_state.as_deref(), Some("revoked"));
    assert_eq!(change.revoked, Some(true));
}
2910
/// Direct GitHub webhook bodies (no connector wrapper) must keep working:
/// `installation_id` comes from `installation.id`, the delivery id comes from
/// the headers, and the connector-only common fields stay unset.
#[test]
fn github_legacy_direct_webhook_still_normalizes() {
    let direct_body = serde_json::json!({
        "action": "opened",
        "installation": {"id": 99},
        "issue": {"number": 7},
    });
    let headers = github_headers("issues", "delivery-legacy");

    let payload =
        ProviderPayload::normalize(&ProviderId::from("github"), "issues", &headers, direct_body)
            .expect("legacy issues payload");
    let GitHubEventPayload::Issues(issue_event) = unwrap_github(payload) else {
        panic!("expected Issues variant");
    };

    let common = &issue_event.common;
    assert_eq!(common.installation_id, Some(99));
    assert_eq!(common.delivery_id.as_deref(), Some("delivery-legacy"));
    // Fields only the connector wrapper provides.
    assert!(common.topic.is_none());
    assert!(common.repo.is_none());

    let issue_number = issue_event.issue.get("number").and_then(|n| n.as_i64());
    assert_eq!(issue_number, Some(7));
}
2940
/// Untagged enums match in declaration order. Make sure each new event kind
/// serializes and deserializes back to the same variant — i.e. it is not
/// silently absorbed into an earlier variant such as `Push` (whose only
/// required field is defaulted) or `Other`.
///
/// Each case is `(event kind, normalized body, expected variant name)`.
#[test]
fn github_new_event_variants_round_trip_through_serde() {
    let provider = ProviderId::from("github");
    let cases: &[(&str, serde_json::Value, &str)] = &[
        (
            "check_suite",
            serde_json::json!({
                "event": "check_suite",
                "check_suite": {"id": 1},
                "check_suite_id": 1,
                "raw": {"check_suite": {"id": 1}},
            }),
            "CheckSuite",
        ),
        (
            "status",
            serde_json::json!({
                "event": "status",
                "commit_status": {"id": 9},
                "status_id": 9,
                "state": "success",
                "raw": {"id": 9, "state": "success"},
            }),
            "Status",
        ),
        (
            "merge_group",
            serde_json::json!({
                "event": "merge_group",
                "merge_group": {"id": 1},
                "merge_group_id": 1,
                "raw": {"merge_group": {"id": 1}},
            }),
            "MergeGroup",
        ),
        (
            "installation",
            serde_json::json!({
                "event": "installation",
                "installation": {"id": 1},
                "installation_state": "active",
                "suspended": false,
                "raw": {"installation": {"id": 1}},
            }),
            "Installation",
        ),
        (
            "installation_repositories",
            serde_json::json!({
                "event": "installation_repositories",
                "installation": {"id": 1},
                "repository_selection": "selected",
                "repositories_added": [],
                "repositories_removed": [{"id": 7}],
                "raw": {"installation": {"id": 1}},
            }),
            "InstallationRepositories",
        ),
    ];
    for (kind, raw, want_variant) in cases {
        // Keep the underlying error in the panic so a failing case says why
        // normalization was rejected, not just which kind failed.
        let payload = ProviderPayload::normalize(
            &provider,
            kind,
            &github_headers(kind, "delivery"),
            raw.clone(),
        )
        .unwrap_or_else(|err| panic!("normalize {kind}: {err:?}"));
        let serialized = serde_json::to_value(&payload).expect("serialize");
        let deserialized: ProviderPayload =
            serde_json::from_value(serialized.clone()).expect("deserialize");
        // Exhaustive on purpose: adding a variant forces this test to be updated.
        let actual_variant = match unwrap_github(deserialized) {
            GitHubEventPayload::Issues(_) => "Issues",
            GitHubEventPayload::PullRequest(_) => "PullRequest",
            GitHubEventPayload::IssueComment(_) => "IssueComment",
            GitHubEventPayload::PullRequestReview(_) => "PullRequestReview",
            GitHubEventPayload::Push(_) => "Push",
            GitHubEventPayload::WorkflowRun(_) => "WorkflowRun",
            GitHubEventPayload::DeploymentStatus(_) => "DeploymentStatus",
            GitHubEventPayload::CheckRun(_) => "CheckRun",
            GitHubEventPayload::CheckSuite(_) => "CheckSuite",
            GitHubEventPayload::Status(_) => "Status",
            GitHubEventPayload::MergeGroup(_) => "MergeGroup",
            GitHubEventPayload::Installation(_) => "Installation",
            GitHubEventPayload::InstallationRepositories(_) => "InstallationRepositories",
            GitHubEventPayload::Other(_) => "Other",
        };
        assert_eq!(
            actual_variant, *want_variant,
            "{kind} round-tripped as {actual_variant}, expected {want_variant}; serialized form: {serialized}"
        );
    }
}
3037
/// A `kafka` record must normalize into the Kafka stream payload, with the
/// event kind and the record's topic, partition, offset, key and timestamp
/// exposed as optional strings.
#[test]
fn provider_normalizes_stream_payloads() {
    let headers = BTreeMap::from([("x-source".to_string(), "feed".to_string())]);
    let record = serde_json::json!({
        "topic": "quotes",
        "partition": 7,
        "offset": "42",
        "key": "AAPL",
        "timestamp": "2026-04-21T12:00:00Z"
    });

    let normalized =
        ProviderPayload::normalize(&ProviderId::from("kafka"), "quote.tick", &headers, record)
            .expect("stream payload");
    let ProviderPayload::Known(KnownProviderPayload::Kafka(stream_event)) = normalized else {
        panic!("expected kafka stream payload")
    };

    assert_eq!(stream_event.event, "quote.tick");
    // The record's `topic` is surfaced as `stream`.
    assert_eq!(stream_event.stream.as_deref(), Some("quotes"));
    // The numeric partition in the input is expected back as a string.
    assert_eq!(stream_event.partition.as_deref(), Some("7"));
    assert_eq!(stream_event.offset.as_deref(), Some("42"));
    assert_eq!(stream_event.key.as_deref(), Some("AAPL"));
    assert_eq!(
        stream_event.timestamp.as_deref(),
        Some("2026-04-21T12:00:00Z")
    );
}
3063}