Skip to main content

harn_vm/triggers/
event.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
11const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13fn serialize_optional_bytes_b64<S>(
14    value: &Option<Vec<u8>>,
15    serializer: S,
16) -> Result<S::Ok, S::Error>
17where
18    S: Serializer,
19{
20    use base64::Engine;
21
22    match value {
23        Some(bytes) => {
24            serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
25        }
26        None => serializer.serialize_none(),
27    }
28}
29
30fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
31where
32    D: Deserializer<'de>,
33{
34    use base64::Engine;
35
36    let encoded = Option::<String>::deserialize(deserializer)?;
37    encoded
38        .map(|text| {
39            base64::engine::general_purpose::STANDARD
40                .decode(text.as_bytes())
41                .map_err(serde::de::Error::custom)
42        })
43        .transpose()
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
47#[serde(transparent)]
48pub struct TriggerEventId(pub String);
49
50impl TriggerEventId {
51    pub fn new() -> Self {
52        Self(format!("trigger_evt_{}", Uuid::now_v7()))
53    }
54}
55
56impl Default for TriggerEventId {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
63#[serde(transparent)]
64pub struct ProviderId(pub String);
65
66impl ProviderId {
67    pub fn new(value: impl Into<String>) -> Self {
68        Self(value.into())
69    }
70
71    pub fn as_str(&self) -> &str {
72        self.0.as_str()
73    }
74}
75
76impl From<&str> for ProviderId {
77    fn from(value: &str) -> Self {
78        Self::new(value)
79    }
80}
81
82impl From<String> for ProviderId {
83    fn from(value: String) -> Self {
84        Self::new(value)
85    }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
89#[serde(transparent)]
90pub struct TraceId(pub String);
91
92impl TraceId {
93    pub fn new() -> Self {
94        Self(format!("trace_{}", Uuid::now_v7()))
95    }
96}
97
98impl Default for TraceId {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
105#[serde(transparent)]
106pub struct TenantId(pub String);
107
108impl TenantId {
109    pub fn new(value: impl Into<String>) -> Self {
110        Self(value.into())
111    }
112}
113
114#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(tag = "state", rename_all = "snake_case")]
116pub enum SignatureStatus {
117    Verified,
118    Unsigned,
119    Failed { reason: String },
120}
121
122#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
123pub struct GitHubEventCommon {
124    pub event: String,
125    pub action: Option<String>,
126    pub delivery_id: Option<String>,
127    pub installation_id: Option<i64>,
128    #[serde(default, skip_serializing_if = "Option::is_none")]
129    pub topic: Option<String>,
130    #[serde(default, skip_serializing_if = "Option::is_none")]
131    pub repository: Option<JsonValue>,
132    #[serde(default, skip_serializing_if = "Option::is_none")]
133    pub repo: Option<JsonValue>,
134    pub raw: JsonValue,
135}
136
137#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
138pub struct GitHubIssuesEventPayload {
139    #[serde(flatten)]
140    pub common: GitHubEventCommon,
141    pub issue: JsonValue,
142}
143
144#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
145pub struct GitHubPullRequestEventPayload {
146    #[serde(flatten)]
147    pub common: GitHubEventCommon,
148    pub pull_request: JsonValue,
149}
150
151#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
152pub struct GitHubIssueCommentEventPayload {
153    #[serde(flatten)]
154    pub common: GitHubEventCommon,
155    pub issue: JsonValue,
156    pub comment: JsonValue,
157}
158
159#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
160pub struct GitHubPullRequestReviewEventPayload {
161    #[serde(flatten)]
162    pub common: GitHubEventCommon,
163    pub pull_request: JsonValue,
164    pub review: JsonValue,
165}
166
167#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
168pub struct GitHubPushEventPayload {
169    #[serde(flatten)]
170    pub common: GitHubEventCommon,
171    #[serde(default)]
172    pub commits: Vec<JsonValue>,
173    pub distinct_size: Option<i64>,
174}
175
176#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
177pub struct GitHubWorkflowRunEventPayload {
178    #[serde(flatten)]
179    pub common: GitHubEventCommon,
180    pub workflow_run: JsonValue,
181}
182
183#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
184pub struct GitHubDeploymentStatusEventPayload {
185    #[serde(flatten)]
186    pub common: GitHubEventCommon,
187    pub deployment_status: JsonValue,
188    pub deployment: JsonValue,
189}
190
191#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
192pub struct GitHubCheckRunEventPayload {
193    #[serde(flatten)]
194    pub common: GitHubEventCommon,
195    pub check_run: JsonValue,
196}
197
198#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
199pub struct GitHubCheckSuiteEventPayload {
200    #[serde(flatten)]
201    pub common: GitHubEventCommon,
202    pub check_suite: JsonValue,
203    #[serde(default, skip_serializing_if = "Option::is_none")]
204    pub check_suite_id: Option<i64>,
205    #[serde(default, skip_serializing_if = "Option::is_none")]
206    pub pull_request_number: Option<i64>,
207    #[serde(default, skip_serializing_if = "Option::is_none")]
208    pub head_sha: Option<String>,
209    #[serde(default, skip_serializing_if = "Option::is_none")]
210    pub head_ref: Option<String>,
211    #[serde(default, skip_serializing_if = "Option::is_none")]
212    pub base_ref: Option<String>,
213    #[serde(default, skip_serializing_if = "Option::is_none")]
214    pub status: Option<String>,
215    #[serde(default, skip_serializing_if = "Option::is_none")]
216    pub conclusion: Option<String>,
217}
218
219#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
220pub struct GitHubStatusEventPayload {
221    #[serde(flatten)]
222    pub common: GitHubEventCommon,
223    #[serde(default, skip_serializing_if = "Option::is_none")]
224    pub commit_status: Option<JsonValue>,
225    #[serde(default, skip_serializing_if = "Option::is_none")]
226    pub status_id: Option<i64>,
227    #[serde(default, skip_serializing_if = "Option::is_none")]
228    pub head_sha: Option<String>,
229    #[serde(default, skip_serializing_if = "Option::is_none")]
230    pub head_ref: Option<String>,
231    #[serde(default, skip_serializing_if = "Option::is_none")]
232    pub base_ref: Option<String>,
233    #[serde(default, skip_serializing_if = "Option::is_none")]
234    pub state: Option<String>,
235    #[serde(default, skip_serializing_if = "Option::is_none")]
236    pub context: Option<String>,
237    #[serde(default, skip_serializing_if = "Option::is_none")]
238    pub target_url: Option<String>,
239}
240
241#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
242pub struct GitHubMergeGroupEventPayload {
243    #[serde(flatten)]
244    pub common: GitHubEventCommon,
245    pub merge_group: JsonValue,
246    #[serde(default, skip_serializing_if = "Option::is_none")]
247    pub merge_group_id: Option<JsonValue>,
248    #[serde(default, skip_serializing_if = "Option::is_none")]
249    pub head_sha: Option<String>,
250    #[serde(default, skip_serializing_if = "Option::is_none")]
251    pub head_ref: Option<String>,
252    #[serde(default, skip_serializing_if = "Option::is_none")]
253    pub base_sha: Option<String>,
254    #[serde(default, skip_serializing_if = "Option::is_none")]
255    pub base_ref: Option<String>,
256    #[serde(default)]
257    pub pull_requests: Vec<JsonValue>,
258    #[serde(default)]
259    pub pull_request_numbers: Vec<i64>,
260}
261
262#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
263pub struct GitHubInstallationEventPayload {
264    #[serde(flatten)]
265    pub common: GitHubEventCommon,
266    #[serde(default, skip_serializing_if = "Option::is_none")]
267    pub installation: Option<JsonValue>,
268    #[serde(default, skip_serializing_if = "Option::is_none")]
269    pub account: Option<JsonValue>,
270    #[serde(default, skip_serializing_if = "Option::is_none")]
271    pub installation_state: Option<String>,
272    #[serde(default, skip_serializing_if = "Option::is_none")]
273    pub suspended: Option<bool>,
274    #[serde(default, skip_serializing_if = "Option::is_none")]
275    pub revoked: Option<bool>,
276    #[serde(default)]
277    pub repositories: Vec<JsonValue>,
278}
279
280#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
281pub struct GitHubInstallationRepositoriesEventPayload {
282    #[serde(flatten)]
283    pub common: GitHubEventCommon,
284    #[serde(default, skip_serializing_if = "Option::is_none")]
285    pub installation: Option<JsonValue>,
286    #[serde(default, skip_serializing_if = "Option::is_none")]
287    pub account: Option<JsonValue>,
288    #[serde(default, skip_serializing_if = "Option::is_none")]
289    pub installation_state: Option<String>,
290    #[serde(default, skip_serializing_if = "Option::is_none")]
291    pub suspended: Option<bool>,
292    #[serde(default, skip_serializing_if = "Option::is_none")]
293    pub revoked: Option<bool>,
294    #[serde(default, skip_serializing_if = "Option::is_none")]
295    pub repository_selection: Option<String>,
296    #[serde(default)]
297    pub repositories_added: Vec<JsonValue>,
298    #[serde(default)]
299    pub repositories_removed: Vec<JsonValue>,
300}
301
302#[derive(Clone, Debug, PartialEq, Serialize)]
303#[serde(untagged)]
304pub enum GitHubEventPayload {
305    Issues(GitHubIssuesEventPayload),
306    PullRequest(GitHubPullRequestEventPayload),
307    IssueComment(GitHubIssueCommentEventPayload),
308    PullRequestReview(GitHubPullRequestReviewEventPayload),
309    Push(GitHubPushEventPayload),
310    WorkflowRun(GitHubWorkflowRunEventPayload),
311    DeploymentStatus(GitHubDeploymentStatusEventPayload),
312    CheckRun(GitHubCheckRunEventPayload),
313    CheckSuite(GitHubCheckSuiteEventPayload),
314    Status(GitHubStatusEventPayload),
315    MergeGroup(GitHubMergeGroupEventPayload),
316    Installation(GitHubInstallationEventPayload),
317    InstallationRepositories(GitHubInstallationRepositoriesEventPayload),
318    Other(GitHubEventCommon),
319}
320
321// Manual `Deserialize` that dispatches on the `event` field. An untagged
322// enum cannot reliably round-trip these variants because `Push` and the
323// all-optional installation/status variants will accept payloads that
324// belong to a different event kind.
325impl<'de> Deserialize<'de> for GitHubEventPayload {
326    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
327    where
328        D: Deserializer<'de>,
329    {
330        let value = JsonValue::deserialize(deserializer)?;
331        let kind = value
332            .get("event")
333            .and_then(JsonValue::as_str)
334            .unwrap_or("")
335            .to_string();
336        let from_value = |v: JsonValue| -> Result<GitHubEventPayload, D::Error> {
337            let payload = match kind.as_str() {
338                "issues" => GitHubEventPayload::Issues(
339                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
340                ),
341                "pull_request" => GitHubEventPayload::PullRequest(
342                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
343                ),
344                "issue_comment" => GitHubEventPayload::IssueComment(
345                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
346                ),
347                "pull_request_review" => GitHubEventPayload::PullRequestReview(
348                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
349                ),
350                "push" => GitHubEventPayload::Push(
351                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
352                ),
353                "workflow_run" => GitHubEventPayload::WorkflowRun(
354                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
355                ),
356                "deployment_status" => GitHubEventPayload::DeploymentStatus(
357                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
358                ),
359                "check_run" => GitHubEventPayload::CheckRun(
360                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
361                ),
362                "check_suite" => GitHubEventPayload::CheckSuite(
363                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
364                ),
365                "status" => GitHubEventPayload::Status(
366                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
367                ),
368                "merge_group" => GitHubEventPayload::MergeGroup(
369                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
370                ),
371                "installation" => GitHubEventPayload::Installation(
372                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
373                ),
374                "installation_repositories" => GitHubEventPayload::InstallationRepositories(
375                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
376                ),
377                _ => GitHubEventPayload::Other(
378                    serde_json::from_value(v).map_err(serde::de::Error::custom)?,
379                ),
380            };
381            Ok(payload)
382        };
383        from_value(value)
384    }
385}
386
387#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
388pub struct SlackEventCommon {
389    pub event: String,
390    pub event_id: Option<String>,
391    pub api_app_id: Option<String>,
392    pub team_id: Option<String>,
393    pub channel_id: Option<String>,
394    pub user_id: Option<String>,
395    pub event_ts: Option<String>,
396    pub raw: JsonValue,
397}
398
399#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
400pub struct SlackMessageEventPayload {
401    #[serde(flatten)]
402    pub common: SlackEventCommon,
403    pub subtype: Option<String>,
404    pub channel_type: Option<String>,
405    pub channel: Option<String>,
406    pub user: Option<String>,
407    pub text: Option<String>,
408    pub ts: Option<String>,
409    pub thread_ts: Option<String>,
410}
411
412#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
413pub struct SlackAppMentionEventPayload {
414    #[serde(flatten)]
415    pub common: SlackEventCommon,
416    pub channel: Option<String>,
417    pub user: Option<String>,
418    pub text: Option<String>,
419    pub ts: Option<String>,
420    pub thread_ts: Option<String>,
421}
422
423#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
424pub struct SlackReactionAddedEventPayload {
425    #[serde(flatten)]
426    pub common: SlackEventCommon,
427    pub reaction: Option<String>,
428    pub item_user: Option<String>,
429    pub item: JsonValue,
430}
431
432#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
433pub struct SlackAppHomeOpenedEventPayload {
434    #[serde(flatten)]
435    pub common: SlackEventCommon,
436    pub user: Option<String>,
437    pub channel: Option<String>,
438    pub tab: Option<String>,
439    pub view: JsonValue,
440}
441
442#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
443pub struct SlackAssistantThreadStartedEventPayload {
444    #[serde(flatten)]
445    pub common: SlackEventCommon,
446    pub assistant_thread: JsonValue,
447    pub thread_ts: Option<String>,
448    pub context: JsonValue,
449}
450
451#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
452#[serde(untagged)]
453pub enum SlackEventPayload {
454    Message(SlackMessageEventPayload),
455    AppMention(SlackAppMentionEventPayload),
456    ReactionAdded(SlackReactionAddedEventPayload),
457    AppHomeOpened(SlackAppHomeOpenedEventPayload),
458    AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
459    Other(SlackEventCommon),
460}
461
462#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
463pub struct LinearEventCommon {
464    pub event: String,
465    pub action: Option<String>,
466    pub delivery_id: Option<String>,
467    pub organization_id: Option<String>,
468    pub webhook_timestamp: Option<i64>,
469    pub webhook_id: Option<String>,
470    pub url: Option<String>,
471    pub created_at: Option<String>,
472    pub actor: JsonValue,
473    pub raw: JsonValue,
474}
475
476#[derive(Clone, Debug, PartialEq)]
477pub enum LinearIssueChange {
478    Title { previous: Option<String> },
479    Description { previous: Option<String> },
480    Priority { previous: Option<i64> },
481    Estimate { previous: Option<i64> },
482    StateId { previous: Option<String> },
483    TeamId { previous: Option<String> },
484    AssigneeId { previous: Option<String> },
485    ProjectId { previous: Option<String> },
486    CycleId { previous: Option<String> },
487    DueDate { previous: Option<String> },
488    ParentId { previous: Option<String> },
489    SortOrder { previous: Option<f64> },
490    LabelIds { previous: Vec<String> },
491    CompletedAt { previous: Option<String> },
492    Other { field: String, previous: JsonValue },
493}
494
495impl Serialize for LinearIssueChange {
496    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
497    where
498        S: Serializer,
499    {
500        let value = match self {
501            Self::Title { previous } => {
502                serde_json::json!({ "field_name": "title", "previous": previous })
503            }
504            Self::Description { previous } => {
505                serde_json::json!({ "field_name": "description", "previous": previous })
506            }
507            Self::Priority { previous } => {
508                serde_json::json!({ "field_name": "priority", "previous": previous })
509            }
510            Self::Estimate { previous } => {
511                serde_json::json!({ "field_name": "estimate", "previous": previous })
512            }
513            Self::StateId { previous } => {
514                serde_json::json!({ "field_name": "state_id", "previous": previous })
515            }
516            Self::TeamId { previous } => {
517                serde_json::json!({ "field_name": "team_id", "previous": previous })
518            }
519            Self::AssigneeId { previous } => {
520                serde_json::json!({ "field_name": "assignee_id", "previous": previous })
521            }
522            Self::ProjectId { previous } => {
523                serde_json::json!({ "field_name": "project_id", "previous": previous })
524            }
525            Self::CycleId { previous } => {
526                serde_json::json!({ "field_name": "cycle_id", "previous": previous })
527            }
528            Self::DueDate { previous } => {
529                serde_json::json!({ "field_name": "due_date", "previous": previous })
530            }
531            Self::ParentId { previous } => {
532                serde_json::json!({ "field_name": "parent_id", "previous": previous })
533            }
534            Self::SortOrder { previous } => {
535                serde_json::json!({ "field_name": "sort_order", "previous": previous })
536            }
537            Self::LabelIds { previous } => {
538                serde_json::json!({ "field_name": "label_ids", "previous": previous })
539            }
540            Self::CompletedAt { previous } => {
541                serde_json::json!({ "field_name": "completed_at", "previous": previous })
542            }
543            Self::Other { field, previous } => {
544                serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
545            }
546        };
547        value.serialize(serializer)
548    }
549}
550
551impl<'de> Deserialize<'de> for LinearIssueChange {
552    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
553    where
554        D: Deserializer<'de>,
555    {
556        let value = JsonValue::deserialize(deserializer)?;
557        let field_name = value
558            .get("field_name")
559            .and_then(JsonValue::as_str)
560            .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
561        let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
562        Ok(match field_name {
563            "title" => Self::Title {
564                previous: previous.as_str().map(ToString::to_string),
565            },
566            "description" => Self::Description {
567                previous: previous.as_str().map(ToString::to_string),
568            },
569            "priority" => Self::Priority {
570                previous: parse_json_i64ish(&previous),
571            },
572            "estimate" => Self::Estimate {
573                previous: parse_json_i64ish(&previous),
574            },
575            "state_id" => Self::StateId {
576                previous: previous.as_str().map(ToString::to_string),
577            },
578            "team_id" => Self::TeamId {
579                previous: previous.as_str().map(ToString::to_string),
580            },
581            "assignee_id" => Self::AssigneeId {
582                previous: previous.as_str().map(ToString::to_string),
583            },
584            "project_id" => Self::ProjectId {
585                previous: previous.as_str().map(ToString::to_string),
586            },
587            "cycle_id" => Self::CycleId {
588                previous: previous.as_str().map(ToString::to_string),
589            },
590            "due_date" => Self::DueDate {
591                previous: previous.as_str().map(ToString::to_string),
592            },
593            "parent_id" => Self::ParentId {
594                previous: previous.as_str().map(ToString::to_string),
595            },
596            "sort_order" => Self::SortOrder {
597                previous: previous.as_f64(),
598            },
599            "label_ids" => Self::LabelIds {
600                previous: parse_string_array(&previous),
601            },
602            "completed_at" => Self::CompletedAt {
603                previous: previous.as_str().map(ToString::to_string),
604            },
605            "other" => Self::Other {
606                field: value
607                    .get("field")
608                    .and_then(JsonValue::as_str)
609                    .map(ToString::to_string)
610                    .unwrap_or_else(|| "unknown".to_string()),
611                previous,
612            },
613            other => Self::Other {
614                field: other.to_string(),
615                previous,
616            },
617        })
618    }
619}
620
621#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
622pub struct LinearIssueEventPayload {
623    #[serde(flatten)]
624    pub common: LinearEventCommon,
625    pub issue: JsonValue,
626    #[serde(default)]
627    pub changes: Vec<LinearIssueChange>,
628}
629
630#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
631pub struct LinearIssueCommentEventPayload {
632    #[serde(flatten)]
633    pub common: LinearEventCommon,
634    pub comment: JsonValue,
635}
636
637#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
638pub struct LinearIssueLabelEventPayload {
639    #[serde(flatten)]
640    pub common: LinearEventCommon,
641    pub label: JsonValue,
642}
643
644#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
645pub struct LinearProjectEventPayload {
646    #[serde(flatten)]
647    pub common: LinearEventCommon,
648    pub project: JsonValue,
649}
650
651#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
652pub struct LinearCycleEventPayload {
653    #[serde(flatten)]
654    pub common: LinearEventCommon,
655    pub cycle: JsonValue,
656}
657
658#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
659pub struct LinearCustomerEventPayload {
660    #[serde(flatten)]
661    pub common: LinearEventCommon,
662    pub customer: JsonValue,
663}
664
665#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
666pub struct LinearCustomerRequestEventPayload {
667    #[serde(flatten)]
668    pub common: LinearEventCommon,
669    pub customer_request: JsonValue,
670}
671
672#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
673#[serde(untagged)]
674pub enum LinearEventPayload {
675    Issue(LinearIssueEventPayload),
676    IssueComment(LinearIssueCommentEventPayload),
677    IssueLabel(LinearIssueLabelEventPayload),
678    Project(LinearProjectEventPayload),
679    Cycle(LinearCycleEventPayload),
680    Customer(LinearCustomerEventPayload),
681    CustomerRequest(LinearCustomerRequestEventPayload),
682    Other(LinearEventCommon),
683}
684
685#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
686pub struct NotionPolledChangeEvent {
687    pub resource: String,
688    pub source_id: String,
689    pub entity_id: String,
690    pub high_water_mark: String,
691    pub before: Option<JsonValue>,
692    pub after: JsonValue,
693}
694
695#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
696pub struct NotionEventPayload {
697    pub event: String,
698    pub workspace_id: Option<String>,
699    pub request_id: Option<String>,
700    pub subscription_id: Option<String>,
701    pub integration_id: Option<String>,
702    pub attempt_number: Option<u32>,
703    pub entity_id: Option<String>,
704    pub entity_type: Option<String>,
705    pub api_version: Option<String>,
706    pub verification_token: Option<String>,
707    pub polled: Option<NotionPolledChangeEvent>,
708    pub raw: JsonValue,
709}
710
711#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
712pub struct CronEventPayload {
713    pub cron_id: Option<String>,
714    pub schedule: Option<String>,
715    #[serde(with = "time::serde::rfc3339")]
716    pub tick_at: OffsetDateTime,
717    pub raw: JsonValue,
718}
719
720#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
721pub struct GenericWebhookPayload {
722    pub source: Option<String>,
723    pub content_type: Option<String>,
724    pub raw: JsonValue,
725}
726
727#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
728pub struct A2aPushPayload {
729    pub task_id: Option<String>,
730    pub task_state: Option<String>,
731    pub artifact: Option<JsonValue>,
732    pub sender: Option<String>,
733    pub raw: JsonValue,
734    pub kind: String,
735}
736
737#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
738pub struct StreamEventPayload {
739    pub event: String,
740    pub source: Option<String>,
741    pub stream: Option<String>,
742    pub partition: Option<String>,
743    pub offset: Option<String>,
744    pub key: Option<String>,
745    pub timestamp: Option<String>,
746    #[serde(default)]
747    pub headers: BTreeMap<String, String>,
748    pub raw: JsonValue,
749}
750
751#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
752pub struct ExtensionProviderPayload {
753    pub provider: String,
754    pub schema_name: String,
755    pub raw: JsonValue,
756}
757
758#[allow(clippy::large_enum_variant)]
759#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
760#[serde(untagged)]
761pub enum ProviderPayload {
762    Known(KnownProviderPayload),
763    Extension(ExtensionProviderPayload),
764}
765
766impl ProviderPayload {
767    pub fn provider(&self) -> &str {
768        match self {
769            Self::Known(known) => known.provider(),
770            Self::Extension(payload) => payload.provider.as_str(),
771        }
772    }
773
774    pub fn normalize(
775        provider: &ProviderId,
776        kind: &str,
777        headers: &BTreeMap<String, String>,
778        raw: JsonValue,
779    ) -> Result<Self, ProviderCatalogError> {
780        provider_catalog()
781            .read()
782            .expect("provider catalog poisoned")
783            .normalize(provider, kind, headers, raw)
784    }
785}
786
787#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
788#[serde(tag = "provider")]
789pub enum KnownProviderPayload {
790    #[serde(rename = "github")]
791    GitHub(GitHubEventPayload),
792    #[serde(rename = "slack")]
793    Slack(Box<SlackEventPayload>),
794    #[serde(rename = "linear")]
795    Linear(LinearEventPayload),
796    #[serde(rename = "notion")]
797    Notion(Box<NotionEventPayload>),
798    #[serde(rename = "cron")]
799    Cron(CronEventPayload),
800    #[serde(rename = "webhook")]
801    Webhook(GenericWebhookPayload),
802    #[serde(rename = "a2a-push")]
803    A2aPush(A2aPushPayload),
804    #[serde(rename = "kafka")]
805    Kafka(StreamEventPayload),
806    #[serde(rename = "nats")]
807    Nats(StreamEventPayload),
808    #[serde(rename = "pulsar")]
809    Pulsar(StreamEventPayload),
810    #[serde(rename = "postgres-cdc")]
811    PostgresCdc(StreamEventPayload),
812    #[serde(rename = "email")]
813    Email(StreamEventPayload),
814    #[serde(rename = "websocket")]
815    Websocket(StreamEventPayload),
816}
817
818impl KnownProviderPayload {
819    pub fn provider(&self) -> &str {
820        match self {
821            Self::GitHub(_) => "github",
822            Self::Slack(_) => "slack",
823            Self::Linear(_) => "linear",
824            Self::Notion(_) => "notion",
825            Self::Cron(_) => "cron",
826            Self::Webhook(_) => "webhook",
827            Self::A2aPush(_) => "a2a-push",
828            Self::Kafka(_) => "kafka",
829            Self::Nats(_) => "nats",
830            Self::Pulsar(_) => "pulsar",
831            Self::PostgresCdc(_) => "postgres-cdc",
832            Self::Email(_) => "email",
833            Self::Websocket(_) => "websocket",
834        }
835    }
836}
837
838#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
839pub struct TriggerEvent {
840    pub id: TriggerEventId,
841    pub provider: ProviderId,
842    pub kind: String,
843    #[serde(with = "time::serde::rfc3339")]
844    pub received_at: OffsetDateTime,
845    #[serde(with = "time::serde::rfc3339::option")]
846    pub occurred_at: Option<OffsetDateTime>,
847    pub dedupe_key: String,
848    pub trace_id: TraceId,
849    pub tenant_id: Option<TenantId>,
850    pub headers: BTreeMap<String, String>,
851    #[serde(default, skip_serializing_if = "Option::is_none")]
852    pub batch: Option<Vec<JsonValue>>,
853    #[serde(
854        default,
855        skip_serializing_if = "Option::is_none",
856        serialize_with = "serialize_optional_bytes_b64",
857        deserialize_with = "deserialize_optional_bytes_b64"
858    )]
859    pub raw_body: Option<Vec<u8>>,
860    pub provider_payload: ProviderPayload,
861    pub signature_status: SignatureStatus,
862    #[serde(skip)]
863    pub dedupe_claimed: bool,
864}
865
866impl TriggerEvent {
867    #[allow(clippy::too_many_arguments)]
868    pub fn new(
869        provider: ProviderId,
870        kind: impl Into<String>,
871        occurred_at: Option<OffsetDateTime>,
872        dedupe_key: impl Into<String>,
873        tenant_id: Option<TenantId>,
874        headers: BTreeMap<String, String>,
875        provider_payload: ProviderPayload,
876        signature_status: SignatureStatus,
877    ) -> Self {
878        Self {
879            id: TriggerEventId::new(),
880            provider,
881            kind: kind.into(),
882            received_at: clock::now_utc(),
883            occurred_at,
884            dedupe_key: dedupe_key.into(),
885            trace_id: TraceId::new(),
886            tenant_id,
887            headers,
888            batch: None,
889            raw_body: None,
890            provider_payload,
891            signature_status,
892            dedupe_claimed: false,
893        }
894    }
895
896    pub fn dedupe_claimed(&self) -> bool {
897        self.dedupe_claimed
898    }
899
900    pub fn mark_dedupe_claimed(&mut self) {
901        self.dedupe_claimed = true;
902    }
903}
904
905#[derive(Clone, Debug, PartialEq, Eq)]
906pub struct HeaderRedactionPolicy {
907    safe_exact_names: BTreeSet<String>,
908}
909
910impl HeaderRedactionPolicy {
911    pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
912        self.safe_exact_names
913            .insert(name.into().to_ascii_lowercase());
914        self
915    }
916
917    fn should_keep(&self, name: &str) -> bool {
918        let lower = name.to_ascii_lowercase();
919        if self.safe_exact_names.contains(lower.as_str()) {
920            return true;
921        }
922        matches!(
923            lower.as_str(),
924            "user-agent"
925                | "request-id"
926                | "x-request-id"
927                | "x-correlation-id"
928                | "content-type"
929                | "content-length"
930                | "x-github-event"
931                | "x-github-delivery"
932                | "x-github-hook-id"
933                | "x-hub-signature-256"
934                | "x-slack-request-timestamp"
935                | "x-slack-signature"
936                | "x-linear-signature"
937                | "x-notion-signature"
938                | "x-a2a-signature"
939                | "x-a2a-delivery"
940        ) || lower.ends_with("-event")
941            || lower.ends_with("-delivery")
942            || lower.contains("timestamp")
943            || lower.contains("request-id")
944    }
945
946    fn should_redact(&self, name: &str) -> bool {
947        let lower = name.to_ascii_lowercase();
948        if self.should_keep(lower.as_str()) {
949            return false;
950        }
951        lower.contains("authorization")
952            || lower.contains("cookie")
953            || lower.contains("secret")
954            || lower.contains("token")
955            || lower.contains("key")
956    }
957}
958
959impl Default for HeaderRedactionPolicy {
960    fn default() -> Self {
961        Self {
962            safe_exact_names: BTreeSet::from([
963                "content-length".to_string(),
964                "content-type".to_string(),
965                "request-id".to_string(),
966                "user-agent".to_string(),
967                "x-a2a-delivery".to_string(),
968                "x-a2a-signature".to_string(),
969                "x-correlation-id".to_string(),
970                "x-github-delivery".to_string(),
971                "x-github-event".to_string(),
972                "x-github-hook-id".to_string(),
973                "x-hub-signature-256".to_string(),
974                "x-linear-signature".to_string(),
975                "x-notion-signature".to_string(),
976                "x-request-id".to_string(),
977                "x-slack-request-timestamp".to_string(),
978                "x-slack-signature".to_string(),
979            ]),
980        }
981    }
982}
983
984pub fn redact_headers(
985    headers: &BTreeMap<String, String>,
986    policy: &HeaderRedactionPolicy,
987) -> BTreeMap<String, String> {
988    headers
989        .iter()
990        .map(|(name, value)| {
991            if policy.should_redact(name) {
992                (name.clone(), REDACTED_HEADER_VALUE.to_string())
993            } else {
994                (name.clone(), value.clone())
995            }
996        })
997        .collect()
998}
999
1000#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1001pub struct ProviderSecretRequirement {
1002    pub name: String,
1003    pub required: bool,
1004    pub namespace: String,
1005}
1006
1007#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
1008pub struct ProviderOutboundMethod {
1009    pub name: String,
1010}
1011
1012#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
1013#[serde(tag = "kind", rename_all = "snake_case")]
1014pub enum SignatureVerificationMetadata {
1015    #[default]
1016    None,
1017    Hmac {
1018        variant: String,
1019        raw_body: bool,
1020        signature_header: String,
1021        timestamp_header: Option<String>,
1022        id_header: Option<String>,
1023        default_tolerance_secs: Option<i64>,
1024        digest: String,
1025        encoding: String,
1026    },
1027}
1028
1029#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
1030#[serde(tag = "kind", rename_all = "snake_case")]
1031pub enum ProviderRuntimeMetadata {
1032    Builtin {
1033        connector: String,
1034        default_signature_variant: Option<String>,
1035    },
1036    #[default]
1037    Placeholder,
1038}
1039
1040#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
1041pub struct ProviderMetadata {
1042    pub provider: String,
1043    #[serde(default)]
1044    pub kinds: Vec<String>,
1045    pub schema_name: String,
1046    #[serde(default)]
1047    pub outbound_methods: Vec<ProviderOutboundMethod>,
1048    #[serde(default)]
1049    pub secret_requirements: Vec<ProviderSecretRequirement>,
1050    #[serde(default)]
1051    pub signature_verification: SignatureVerificationMetadata,
1052    #[serde(default)]
1053    pub runtime: ProviderRuntimeMetadata,
1054}
1055
1056impl ProviderMetadata {
1057    pub fn supports_kind(&self, kind: &str) -> bool {
1058        self.kinds.iter().any(|candidate| candidate == kind)
1059    }
1060
1061    pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
1062        self.secret_requirements
1063            .iter()
1064            .filter(|requirement| requirement.required)
1065            .map(|requirement| requirement.name.as_str())
1066    }
1067}
1068
1069pub trait ProviderSchema: Send + Sync {
1070    fn provider_id(&self) -> &str;
1071    fn harn_schema_name(&self) -> &str;
1072    fn metadata(&self) -> ProviderMetadata {
1073        ProviderMetadata {
1074            provider: self.provider_id().to_string(),
1075            schema_name: self.harn_schema_name().to_string(),
1076            ..ProviderMetadata::default()
1077        }
1078    }
1079    fn normalize(
1080        &self,
1081        kind: &str,
1082        headers: &BTreeMap<String, String>,
1083        raw: JsonValue,
1084    ) -> Result<ProviderPayload, ProviderCatalogError>;
1085}
1086
1087#[derive(Clone, Debug, PartialEq, Eq)]
1088pub enum ProviderCatalogError {
1089    DuplicateProvider(String),
1090    UnknownProvider(String),
1091}
1092
1093impl std::fmt::Display for ProviderCatalogError {
1094    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1095        match self {
1096            Self::DuplicateProvider(provider) => {
1097                write!(f, "provider `{provider}` is already registered")
1098            }
1099            Self::UnknownProvider(provider) => write!(f, "provider `{provider}` is not registered"),
1100        }
1101    }
1102}
1103
1104impl std::error::Error for ProviderCatalogError {}
1105
1106#[derive(Clone, Default)]
1107pub struct ProviderCatalog {
1108    providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
1109}
1110
1111impl ProviderCatalog {
1112    pub fn with_defaults() -> Self {
1113        let mut catalog = Self::default();
1114        for schema in default_provider_schemas() {
1115            catalog
1116                .register(schema)
1117                .expect("default providers must register cleanly");
1118        }
1119        catalog
1120    }
1121
1122    pub fn with_defaults_and(
1123        schemas: Vec<Arc<dyn ProviderSchema>>,
1124    ) -> Result<Self, ProviderCatalogError> {
1125        let mut catalog = Self::with_defaults();
1126        let builtin_providers: BTreeSet<String> = catalog.schema_names().into_keys().collect();
1127        for schema in schemas {
1128            if builtin_providers.contains(schema.provider_id()) {
1129                continue;
1130            }
1131            catalog.register(schema)?;
1132        }
1133        Ok(catalog)
1134    }
1135
1136    pub fn register(
1137        &mut self,
1138        schema: Arc<dyn ProviderSchema>,
1139    ) -> Result<(), ProviderCatalogError> {
1140        let provider = schema.provider_id().to_string();
1141        if self.providers.contains_key(provider.as_str()) {
1142            return Err(ProviderCatalogError::DuplicateProvider(provider));
1143        }
1144        self.providers.insert(provider, schema);
1145        Ok(())
1146    }
1147
1148    pub fn normalize(
1149        &self,
1150        provider: &ProviderId,
1151        kind: &str,
1152        headers: &BTreeMap<String, String>,
1153        raw: JsonValue,
1154    ) -> Result<ProviderPayload, ProviderCatalogError> {
1155        let schema = self
1156            .providers
1157            .get(provider.as_str())
1158            .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
1159        schema.normalize(kind, headers, raw)
1160    }
1161
1162    pub fn schema_names(&self) -> BTreeMap<String, String> {
1163        self.providers
1164            .iter()
1165            .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
1166            .collect()
1167    }
1168
1169    pub fn entries(&self) -> Vec<ProviderMetadata> {
1170        self.providers
1171            .values()
1172            .map(|schema| schema.metadata())
1173            .collect()
1174    }
1175
1176    pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
1177        self.providers.get(provider).map(|schema| schema.metadata())
1178    }
1179}
1180
1181pub fn register_provider_schema(
1182    schema: Arc<dyn ProviderSchema>,
1183) -> Result<(), ProviderCatalogError> {
1184    provider_catalog()
1185        .write()
1186        .expect("provider catalog poisoned")
1187        .register(schema)
1188}
1189
1190pub fn reset_provider_catalog() {
1191    *provider_catalog()
1192        .write()
1193        .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
1194}
1195
1196pub fn reset_provider_catalog_with(
1197    schemas: Vec<Arc<dyn ProviderSchema>>,
1198) -> Result<(), ProviderCatalogError> {
1199    let catalog = ProviderCatalog::with_defaults_and(schemas)?;
1200    install_provider_catalog(catalog);
1201    Ok(())
1202}
1203
1204pub fn install_provider_catalog(catalog: ProviderCatalog) {
1205    *provider_catalog()
1206        .write()
1207        .expect("provider catalog poisoned") = catalog;
1208}
1209
1210pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
1211    provider_catalog()
1212        .read()
1213        .expect("provider catalog poisoned")
1214        .schema_names()
1215}
1216
1217pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
1218    provider_catalog()
1219        .read()
1220        .expect("provider catalog poisoned")
1221        .entries()
1222}
1223
1224pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
1225    provider_catalog()
1226        .read()
1227        .expect("provider catalog poisoned")
1228        .metadata_for(provider)
1229}
1230
1231fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
1232    static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
1233    PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
1234}
1235
1236struct BuiltinProviderSchema {
1237    provider_id: &'static str,
1238    harn_schema_name: &'static str,
1239    metadata: ProviderMetadata,
1240    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1241}
1242
1243impl ProviderSchema for BuiltinProviderSchema {
1244    fn provider_id(&self) -> &str {
1245        self.provider_id
1246    }
1247
1248    fn harn_schema_name(&self) -> &str {
1249        self.harn_schema_name
1250    }
1251
1252    fn metadata(&self) -> ProviderMetadata {
1253        self.metadata.clone()
1254    }
1255
1256    fn normalize(
1257        &self,
1258        kind: &str,
1259        headers: &BTreeMap<String, String>,
1260        raw: JsonValue,
1261    ) -> Result<ProviderPayload, ProviderCatalogError> {
1262        Ok((self.normalize)(kind, headers, raw))
1263    }
1264}
1265
1266fn provider_metadata_entry(
1267    provider: &str,
1268    kinds: &[&str],
1269    schema_name: &str,
1270    outbound_methods: &[&str],
1271    signature_verification: SignatureVerificationMetadata,
1272    secret_requirements: Vec<ProviderSecretRequirement>,
1273    runtime: ProviderRuntimeMetadata,
1274) -> ProviderMetadata {
1275    ProviderMetadata {
1276        provider: provider.to_string(),
1277        kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
1278        schema_name: schema_name.to_string(),
1279        outbound_methods: outbound_methods
1280            .iter()
1281            .map(|name| ProviderOutboundMethod {
1282                name: (*name).to_string(),
1283            })
1284            .collect(),
1285        secret_requirements,
1286        signature_verification,
1287        runtime,
1288    }
1289}
1290
1291fn hmac_signature_metadata(
1292    variant: &str,
1293    signature_header: &str,
1294    timestamp_header: Option<&str>,
1295    id_header: Option<&str>,
1296    default_tolerance_secs: Option<i64>,
1297    encoding: &str,
1298) -> SignatureVerificationMetadata {
1299    SignatureVerificationMetadata::Hmac {
1300        variant: variant.to_string(),
1301        raw_body: true,
1302        signature_header: signature_header.to_string(),
1303        timestamp_header: timestamp_header.map(ToString::to_string),
1304        id_header: id_header.map(ToString::to_string),
1305        default_tolerance_secs,
1306        digest: "sha256".to_string(),
1307        encoding: encoding.to_string(),
1308    }
1309}
1310
1311fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1312    ProviderSecretRequirement {
1313        name: name.to_string(),
1314        required: true,
1315        namespace: namespace.to_string(),
1316    }
1317}
1318
1319fn outbound_method(name: &str) -> ProviderOutboundMethod {
1320    ProviderOutboundMethod {
1321        name: name.to_string(),
1322    }
1323}
1324
1325fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1326    ProviderSecretRequirement {
1327        name: name.to_string(),
1328        required: false,
1329        namespace: namespace.to_string(),
1330    }
1331}
1332
1333fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1334    vec![
1335        Arc::new(BuiltinProviderSchema {
1336            provider_id: "github",
1337            harn_schema_name: "GitHubEventPayload",
1338            metadata: provider_metadata_entry(
1339                "github",
1340                &["webhook"],
1341                "GitHubEventPayload",
1342                &[],
1343                hmac_signature_metadata(
1344                    "github",
1345                    "X-Hub-Signature-256",
1346                    None,
1347                    Some("X-GitHub-Delivery"),
1348                    None,
1349                    "hex",
1350                ),
1351                vec![required_secret("signing_secret", "github")],
1352                ProviderRuntimeMetadata::Placeholder,
1353            ),
1354            normalize: github_payload,
1355        }),
1356        Arc::new(BuiltinProviderSchema {
1357            provider_id: "slack",
1358            harn_schema_name: "SlackEventPayload",
1359            metadata: provider_metadata_entry(
1360                "slack",
1361                &["webhook"],
1362                "SlackEventPayload",
1363                &[
1364                    "post_message",
1365                    "update_message",
1366                    "add_reaction",
1367                    "open_view",
1368                    "user_info",
1369                    "api_call",
1370                    "upload_file",
1371                ],
1372                hmac_signature_metadata(
1373                    "slack",
1374                    "X-Slack-Signature",
1375                    Some("X-Slack-Request-Timestamp"),
1376                    None,
1377                    Some(300),
1378                    "hex",
1379                ),
1380                vec![required_secret("signing_secret", "slack")],
1381                ProviderRuntimeMetadata::Placeholder,
1382            ),
1383            normalize: slack_payload,
1384        }),
1385        Arc::new(BuiltinProviderSchema {
1386            provider_id: "linear",
1387            harn_schema_name: "LinearEventPayload",
1388            metadata: {
1389                let mut metadata = provider_metadata_entry(
1390                    "linear",
1391                    &["webhook"],
1392                    "LinearEventPayload",
1393                    &[],
1394                    hmac_signature_metadata(
1395                        "linear",
1396                        "Linear-Signature",
1397                        None,
1398                        Some("Linear-Delivery"),
1399                        Some(75),
1400                        "hex",
1401                    ),
1402                    vec![
1403                        required_secret("signing_secret", "linear"),
1404                        optional_secret("access_token", "linear"),
1405                    ],
1406                    ProviderRuntimeMetadata::Placeholder,
1407                );
1408                metadata.outbound_methods = vec![
1409                    ProviderOutboundMethod {
1410                        name: "list_issues".to_string(),
1411                    },
1412                    ProviderOutboundMethod {
1413                        name: "update_issue".to_string(),
1414                    },
1415                    ProviderOutboundMethod {
1416                        name: "create_comment".to_string(),
1417                    },
1418                    ProviderOutboundMethod {
1419                        name: "search".to_string(),
1420                    },
1421                    ProviderOutboundMethod {
1422                        name: "graphql".to_string(),
1423                    },
1424                ];
1425                metadata
1426            },
1427            normalize: linear_payload,
1428        }),
1429        Arc::new(BuiltinProviderSchema {
1430            provider_id: "notion",
1431            harn_schema_name: "NotionEventPayload",
1432            metadata: {
1433                let mut metadata = provider_metadata_entry(
1434                    "notion",
1435                    &["webhook", "poll"],
1436                    "NotionEventPayload",
1437                    &[],
1438                    hmac_signature_metadata(
1439                        "notion",
1440                        "X-Notion-Signature",
1441                        None,
1442                        None,
1443                        None,
1444                        "hex",
1445                    ),
1446                    vec![required_secret("verification_token", "notion")],
1447                    ProviderRuntimeMetadata::Placeholder,
1448                );
1449                metadata.outbound_methods = vec![
1450                    outbound_method("get_page"),
1451                    outbound_method("update_page"),
1452                    outbound_method("append_blocks"),
1453                    outbound_method("query_database"),
1454                    outbound_method("search"),
1455                    outbound_method("create_comment"),
1456                    outbound_method("api_call"),
1457                ];
1458                metadata
1459            },
1460            normalize: notion_payload,
1461        }),
1462        Arc::new(BuiltinProviderSchema {
1463            provider_id: "cron",
1464            harn_schema_name: "CronEventPayload",
1465            metadata: provider_metadata_entry(
1466                "cron",
1467                &["cron"],
1468                "CronEventPayload",
1469                &[],
1470                SignatureVerificationMetadata::None,
1471                Vec::new(),
1472                ProviderRuntimeMetadata::Builtin {
1473                    connector: "cron".to_string(),
1474                    default_signature_variant: None,
1475                },
1476            ),
1477            normalize: cron_payload,
1478        }),
1479        Arc::new(BuiltinProviderSchema {
1480            provider_id: "webhook",
1481            harn_schema_name: "GenericWebhookPayload",
1482            metadata: provider_metadata_entry(
1483                "webhook",
1484                &["webhook"],
1485                "GenericWebhookPayload",
1486                &[],
1487                hmac_signature_metadata(
1488                    "standard",
1489                    "webhook-signature",
1490                    Some("webhook-timestamp"),
1491                    Some("webhook-id"),
1492                    Some(300),
1493                    "base64",
1494                ),
1495                vec![required_secret("signing_secret", "webhook")],
1496                ProviderRuntimeMetadata::Builtin {
1497                    connector: "webhook".to_string(),
1498                    default_signature_variant: Some("standard".to_string()),
1499                },
1500            ),
1501            normalize: webhook_payload,
1502        }),
1503        Arc::new(BuiltinProviderSchema {
1504            provider_id: "a2a-push",
1505            harn_schema_name: "A2aPushPayload",
1506            metadata: provider_metadata_entry(
1507                "a2a-push",
1508                &["a2a-push"],
1509                "A2aPushPayload",
1510                &[],
1511                SignatureVerificationMetadata::None,
1512                Vec::new(),
1513                ProviderRuntimeMetadata::Builtin {
1514                    connector: "a2a-push".to_string(),
1515                    default_signature_variant: None,
1516                },
1517            ),
1518            normalize: a2a_push_payload,
1519        }),
1520        Arc::new(stream_provider_schema("kafka", kafka_payload)),
1521        Arc::new(stream_provider_schema("nats", nats_payload)),
1522        Arc::new(stream_provider_schema("pulsar", pulsar_payload)),
1523        Arc::new(stream_provider_schema("postgres-cdc", postgres_cdc_payload)),
1524        Arc::new(stream_provider_schema("email", email_payload)),
1525        Arc::new(stream_provider_schema("websocket", websocket_payload)),
1526    ]
1527}
1528
1529fn stream_provider_schema(
1530    provider_id: &'static str,
1531    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1532) -> BuiltinProviderSchema {
1533    BuiltinProviderSchema {
1534        provider_id,
1535        harn_schema_name: "StreamEventPayload",
1536        metadata: provider_metadata_entry(
1537            provider_id,
1538            &["stream"],
1539            "StreamEventPayload",
1540            &[],
1541            SignatureVerificationMetadata::None,
1542            Vec::new(),
1543            ProviderRuntimeMetadata::Builtin {
1544                connector: "stream".to_string(),
1545                default_signature_variant: None,
1546            },
1547        ),
1548        normalize,
1549    }
1550}
1551
1552fn github_payload(
1553    kind: &str,
1554    headers: &BTreeMap<String, String>,
1555    raw: JsonValue,
1556) -> ProviderPayload {
1557    // The connector emits a normalized payload that wraps the original
1558    // GitHub webhook body inside its own `raw` field. When we see that
1559    // wrapper shape, prefer it as the escape-hatch raw so callers don't
1560    // have to traverse two levels of `raw` to reach the upstream payload.
1561    let original_raw = raw
1562        .get("raw")
1563        .filter(|value| value.is_object())
1564        .cloned()
1565        .unwrap_or_else(|| raw.clone());
1566    let common = GitHubEventCommon {
1567        event: kind.to_string(),
1568        action: raw
1569            .get("action")
1570            .and_then(JsonValue::as_str)
1571            .map(ToString::to_string),
1572        delivery_id: raw
1573            .get("delivery_id")
1574            .and_then(JsonValue::as_str)
1575            .map(ToString::to_string)
1576            .or_else(|| headers.get("X-GitHub-Delivery").cloned()),
1577        installation_id: raw
1578            .get("installation_id")
1579            .and_then(JsonValue::as_i64)
1580            .or_else(|| {
1581                raw.get("installation")
1582                    .and_then(|value| value.get("id"))
1583                    .and_then(JsonValue::as_i64)
1584            }),
1585        topic: raw
1586            .get("topic")
1587            .and_then(JsonValue::as_str)
1588            .map(ToString::to_string),
1589        repository: raw.get("repository").cloned(),
1590        repo: raw.get("repo").cloned(),
1591        raw: original_raw,
1592    };
1593    let payload = match kind {
1594        "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1595            common,
1596            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1597        }),
1598        "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1599            common,
1600            pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1601        }),
1602        "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1603            common,
1604            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1605            comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1606        }),
1607        "pull_request_review" => {
1608            GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1609                common,
1610                pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1611                review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1612            })
1613        }
1614        "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1615            common,
1616            commits: raw
1617                .get("commits")
1618                .and_then(JsonValue::as_array)
1619                .cloned()
1620                .unwrap_or_default(),
1621            distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1622        }),
1623        "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1624            common,
1625            workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1626        }),
1627        "deployment_status" => {
1628            GitHubEventPayload::DeploymentStatus(GitHubDeploymentStatusEventPayload {
1629                common,
1630                deployment_status: raw
1631                    .get("deployment_status")
1632                    .cloned()
1633                    .unwrap_or(JsonValue::Null),
1634                deployment: raw.get("deployment").cloned().unwrap_or(JsonValue::Null),
1635            })
1636        }
1637        "check_run" => GitHubEventPayload::CheckRun(GitHubCheckRunEventPayload {
1638            common,
1639            check_run: raw.get("check_run").cloned().unwrap_or(JsonValue::Null),
1640        }),
1641        "check_suite" => {
1642            let check_suite = raw.get("check_suite").cloned().unwrap_or(JsonValue::Null);
1643            GitHubEventPayload::CheckSuite(GitHubCheckSuiteEventPayload {
1644                check_suite_id: github_promoted_i64(&raw, "check_suite_id")
1645                    .or_else(|| check_suite.get("id").and_then(JsonValue::as_i64)),
1646                pull_request_number: github_promoted_i64(&raw, "pull_request_number"),
1647                head_sha: github_promoted_string(&raw, "head_sha"),
1648                head_ref: github_promoted_string(&raw, "head_ref"),
1649                base_ref: github_promoted_string(&raw, "base_ref"),
1650                status: github_promoted_string(&raw, "status"),
1651                conclusion: github_promoted_string(&raw, "conclusion"),
1652                common,
1653                check_suite,
1654            })
1655        }
1656        "status" => GitHubEventPayload::Status(GitHubStatusEventPayload {
1657            commit_status: raw
1658                .get("commit_status")
1659                .cloned()
1660                .or_else(|| Some(common.raw.clone())),
1661            status_id: github_promoted_i64(&raw, "status_id")
1662                .or_else(|| common.raw.get("id").and_then(JsonValue::as_i64)),
1663            head_sha: github_promoted_string(&raw, "head_sha").or_else(|| {
1664                common
1665                    .raw
1666                    .get("sha")
1667                    .and_then(JsonValue::as_str)
1668                    .map(ToString::to_string)
1669            }),
1670            head_ref: github_promoted_string(&raw, "head_ref"),
1671            base_ref: github_promoted_string(&raw, "base_ref"),
1672            state: github_promoted_string(&raw, "state"),
1673            context: github_promoted_string(&raw, "context"),
1674            target_url: github_promoted_string(&raw, "target_url"),
1675            common,
1676        }),
1677        "merge_group" => {
1678            let merge_group = raw.get("merge_group").cloned().unwrap_or(JsonValue::Null);
1679            GitHubEventPayload::MergeGroup(GitHubMergeGroupEventPayload {
1680                merge_group_id: raw
1681                    .get("merge_group_id")
1682                    .cloned()
1683                    .or_else(|| merge_group.get("id").cloned()),
1684                head_sha: github_promoted_string(&raw, "head_sha").or_else(|| {
1685                    merge_group
1686                        .get("head_sha")
1687                        .and_then(JsonValue::as_str)
1688                        .map(ToString::to_string)
1689                }),
1690                head_ref: github_promoted_string(&raw, "head_ref").or_else(|| {
1691                    merge_group
1692                        .get("head_ref")
1693                        .and_then(JsonValue::as_str)
1694                        .map(ToString::to_string)
1695                }),
1696                base_sha: github_promoted_string(&raw, "base_sha").or_else(|| {
1697                    merge_group
1698                        .get("base_sha")
1699                        .and_then(JsonValue::as_str)
1700                        .map(ToString::to_string)
1701                }),
1702                base_ref: github_promoted_string(&raw, "base_ref").or_else(|| {
1703                    merge_group
1704                        .get("base_ref")
1705                        .and_then(JsonValue::as_str)
1706                        .map(ToString::to_string)
1707                }),
1708                pull_requests: raw
1709                    .get("pull_requests")
1710                    .and_then(JsonValue::as_array)
1711                    .cloned()
1712                    .unwrap_or_default(),
1713                pull_request_numbers: raw
1714                    .get("pull_request_numbers")
1715                    .and_then(JsonValue::as_array)
1716                    .map(|values| {
1717                        values
1718                            .iter()
1719                            .filter_map(JsonValue::as_i64)
1720                            .collect::<Vec<_>>()
1721                    })
1722                    .unwrap_or_default(),
1723                common,
1724                merge_group,
1725            })
1726        }
1727        "installation" => GitHubEventPayload::Installation(GitHubInstallationEventPayload {
1728            installation: raw.get("installation").cloned(),
1729            account: raw.get("account").cloned(),
1730            installation_state: github_promoted_string(&raw, "installation_state"),
1731            suspended: raw.get("suspended").and_then(JsonValue::as_bool),
1732            revoked: raw.get("revoked").and_then(JsonValue::as_bool),
1733            repositories: raw
1734                .get("repositories")
1735                .and_then(JsonValue::as_array)
1736                .cloned()
1737                .unwrap_or_default(),
1738            common,
1739        }),
1740        "installation_repositories" => GitHubEventPayload::InstallationRepositories(
1741            GitHubInstallationRepositoriesEventPayload {
1742                installation: raw.get("installation").cloned(),
1743                account: raw.get("account").cloned(),
1744                installation_state: github_promoted_string(&raw, "installation_state"),
1745                suspended: raw.get("suspended").and_then(JsonValue::as_bool),
1746                revoked: raw.get("revoked").and_then(JsonValue::as_bool),
1747                repository_selection: github_promoted_string(&raw, "repository_selection"),
1748                repositories_added: raw
1749                    .get("repositories_added")
1750                    .and_then(JsonValue::as_array)
1751                    .cloned()
1752                    .unwrap_or_default(),
1753                repositories_removed: raw
1754                    .get("repositories_removed")
1755                    .and_then(JsonValue::as_array)
1756                    .cloned()
1757                    .unwrap_or_default(),
1758                common,
1759            },
1760        ),
1761        _ => GitHubEventPayload::Other(common),
1762    };
1763    ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1764}
1765
1766fn github_promoted_string(raw: &JsonValue, field: &str) -> Option<String> {
1767    raw.get(field)
1768        .and_then(JsonValue::as_str)
1769        .map(ToString::to_string)
1770}
1771
1772fn github_promoted_i64(raw: &JsonValue, field: &str) -> Option<i64> {
1773    raw.get(field).and_then(JsonValue::as_i64)
1774}
1775
1776fn slack_payload(
1777    kind: &str,
1778    _headers: &BTreeMap<String, String>,
1779    raw: JsonValue,
1780) -> ProviderPayload {
1781    let event = raw.get("event");
1782    let common = SlackEventCommon {
1783        event: kind.to_string(),
1784        event_id: raw
1785            .get("event_id")
1786            .and_then(JsonValue::as_str)
1787            .map(ToString::to_string),
1788        api_app_id: raw
1789            .get("api_app_id")
1790            .and_then(JsonValue::as_str)
1791            .map(ToString::to_string),
1792        team_id: raw
1793            .get("team_id")
1794            .and_then(JsonValue::as_str)
1795            .map(ToString::to_string),
1796        channel_id: slack_channel_id(event),
1797        user_id: slack_user_id(event),
1798        event_ts: event
1799            .and_then(|value| value.get("event_ts"))
1800            .and_then(JsonValue::as_str)
1801            .map(ToString::to_string),
1802        raw: raw.clone(),
1803    };
1804    let payload = match kind {
1805        kind if kind == "message" || kind.starts_with("message.") => {
1806            SlackEventPayload::Message(SlackMessageEventPayload {
1807                subtype: event
1808                    .and_then(|value| value.get("subtype"))
1809                    .and_then(JsonValue::as_str)
1810                    .map(ToString::to_string),
1811                channel_type: event
1812                    .and_then(|value| value.get("channel_type"))
1813                    .and_then(JsonValue::as_str)
1814                    .map(ToString::to_string),
1815                channel: event
1816                    .and_then(|value| value.get("channel"))
1817                    .and_then(JsonValue::as_str)
1818                    .map(ToString::to_string),
1819                user: event
1820                    .and_then(|value| value.get("user"))
1821                    .and_then(JsonValue::as_str)
1822                    .map(ToString::to_string),
1823                text: event
1824                    .and_then(|value| value.get("text"))
1825                    .and_then(JsonValue::as_str)
1826                    .map(ToString::to_string),
1827                ts: event
1828                    .and_then(|value| value.get("ts"))
1829                    .and_then(JsonValue::as_str)
1830                    .map(ToString::to_string),
1831                thread_ts: event
1832                    .and_then(|value| value.get("thread_ts"))
1833                    .and_then(JsonValue::as_str)
1834                    .map(ToString::to_string),
1835                common,
1836            })
1837        }
1838        "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1839            channel: event
1840                .and_then(|value| value.get("channel"))
1841                .and_then(JsonValue::as_str)
1842                .map(ToString::to_string),
1843            user: event
1844                .and_then(|value| value.get("user"))
1845                .and_then(JsonValue::as_str)
1846                .map(ToString::to_string),
1847            text: event
1848                .and_then(|value| value.get("text"))
1849                .and_then(JsonValue::as_str)
1850                .map(ToString::to_string),
1851            ts: event
1852                .and_then(|value| value.get("ts"))
1853                .and_then(JsonValue::as_str)
1854                .map(ToString::to_string),
1855            thread_ts: event
1856                .and_then(|value| value.get("thread_ts"))
1857                .and_then(JsonValue::as_str)
1858                .map(ToString::to_string),
1859            common,
1860        }),
1861        "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1862            reaction: event
1863                .and_then(|value| value.get("reaction"))
1864                .and_then(JsonValue::as_str)
1865                .map(ToString::to_string),
1866            item_user: event
1867                .and_then(|value| value.get("item_user"))
1868                .and_then(JsonValue::as_str)
1869                .map(ToString::to_string),
1870            item: event
1871                .and_then(|value| value.get("item"))
1872                .cloned()
1873                .unwrap_or(JsonValue::Null),
1874            common,
1875        }),
1876        "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1877            user: event
1878                .and_then(|value| value.get("user"))
1879                .and_then(JsonValue::as_str)
1880                .map(ToString::to_string),
1881            channel: event
1882                .and_then(|value| value.get("channel"))
1883                .and_then(JsonValue::as_str)
1884                .map(ToString::to_string),
1885            tab: event
1886                .and_then(|value| value.get("tab"))
1887                .and_then(JsonValue::as_str)
1888                .map(ToString::to_string),
1889            view: event
1890                .and_then(|value| value.get("view"))
1891                .cloned()
1892                .unwrap_or(JsonValue::Null),
1893            common,
1894        }),
1895        "assistant_thread_started" => {
1896            let assistant_thread = event
1897                .and_then(|value| value.get("assistant_thread"))
1898                .cloned()
1899                .unwrap_or(JsonValue::Null);
1900            SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1901                thread_ts: assistant_thread
1902                    .get("thread_ts")
1903                    .and_then(JsonValue::as_str)
1904                    .map(ToString::to_string),
1905                context: assistant_thread
1906                    .get("context")
1907                    .cloned()
1908                    .unwrap_or(JsonValue::Null),
1909                assistant_thread,
1910                common,
1911            })
1912        }
1913        _ => SlackEventPayload::Other(common),
1914    };
1915    ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1916}
1917
1918fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1919    event
1920        .and_then(|value| value.get("channel"))
1921        .and_then(JsonValue::as_str)
1922        .map(ToString::to_string)
1923        .or_else(|| {
1924            event
1925                .and_then(|value| value.get("item"))
1926                .and_then(|value| value.get("channel"))
1927                .and_then(JsonValue::as_str)
1928                .map(ToString::to_string)
1929        })
1930        .or_else(|| {
1931            event
1932                .and_then(|value| value.get("channel"))
1933                .and_then(|value| value.get("id"))
1934                .and_then(JsonValue::as_str)
1935                .map(ToString::to_string)
1936        })
1937        .or_else(|| {
1938            event
1939                .and_then(|value| value.get("assistant_thread"))
1940                .and_then(|value| value.get("channel_id"))
1941                .and_then(JsonValue::as_str)
1942                .map(ToString::to_string)
1943        })
1944}
1945
1946fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1947    event
1948        .and_then(|value| value.get("user"))
1949        .and_then(JsonValue::as_str)
1950        .map(ToString::to_string)
1951        .or_else(|| {
1952            event
1953                .and_then(|value| value.get("user"))
1954                .and_then(|value| value.get("id"))
1955                .and_then(JsonValue::as_str)
1956                .map(ToString::to_string)
1957        })
1958        .or_else(|| {
1959            event
1960                .and_then(|value| value.get("item_user"))
1961                .and_then(JsonValue::as_str)
1962                .map(ToString::to_string)
1963        })
1964        .or_else(|| {
1965            event
1966                .and_then(|value| value.get("assistant_thread"))
1967                .and_then(|value| value.get("user_id"))
1968                .and_then(JsonValue::as_str)
1969                .map(ToString::to_string)
1970        })
1971}
1972
1973fn linear_payload(
1974    _kind: &str,
1975    headers: &BTreeMap<String, String>,
1976    raw: JsonValue,
1977) -> ProviderPayload {
1978    let common = linear_event_common(headers, &raw);
1979    let event = common.event.clone();
1980    let payload = match event.as_str() {
1981        "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1982            common,
1983            issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1984            changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1985        }),
1986        "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1987            common,
1988            comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1989        }),
1990        "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1991            common,
1992            label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1993        }),
1994        "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1995            common,
1996            project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1997        }),
1998        "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1999            common,
2000            cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
2001        }),
2002        "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
2003            common,
2004            customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
2005        }),
2006        "customer_request" => {
2007            LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
2008                common,
2009                customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
2010            })
2011        }
2012        _ => LinearEventPayload::Other(common),
2013    };
2014    ProviderPayload::Known(KnownProviderPayload::Linear(payload))
2015}
2016
2017fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
2018    LinearEventCommon {
2019        event: linear_event_name(
2020            raw.get("type")
2021                .and_then(JsonValue::as_str)
2022                .or_else(|| headers.get("Linear-Event").map(String::as_str)),
2023        ),
2024        action: raw
2025            .get("action")
2026            .and_then(JsonValue::as_str)
2027            .map(ToString::to_string),
2028        delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
2029        organization_id: raw
2030            .get("organizationId")
2031            .and_then(JsonValue::as_str)
2032            .map(ToString::to_string),
2033        webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
2034        webhook_id: raw
2035            .get("webhookId")
2036            .and_then(JsonValue::as_str)
2037            .map(ToString::to_string),
2038        url: raw
2039            .get("url")
2040            .and_then(JsonValue::as_str)
2041            .map(ToString::to_string),
2042        created_at: raw
2043            .get("createdAt")
2044            .and_then(JsonValue::as_str)
2045            .map(ToString::to_string),
2046        actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
2047        raw: raw.clone(),
2048    }
2049}
2050
2051fn linear_event_name(raw_type: Option<&str>) -> String {
2052    match raw_type.unwrap_or_default().to_ascii_lowercase().as_str() {
2053        "issue" => "issue".to_string(),
2054        "comment" | "issuecomment" | "issue_comment" => "comment".to_string(),
2055        "issuelabel" | "issue_label" => "issue_label".to_string(),
2056        "project" | "projectupdate" | "project_update" => "project".to_string(),
2057        "cycle" => "cycle".to_string(),
2058        "customer" => "customer".to_string(),
2059        "customerrequest" | "customer_request" => "customer_request".to_string(),
2060        other if !other.is_empty() => other.to_string(),
2061        _ => "other".to_string(),
2062    }
2063}
2064
2065fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
2066    let Some(JsonValue::Object(fields)) = updated_from else {
2067        return Vec::new();
2068    };
2069    let mut changes = Vec::new();
2070    for (field, previous) in fields {
2071        let change = match field.as_str() {
2072            "title" => LinearIssueChange::Title {
2073                previous: previous.as_str().map(ToString::to_string),
2074            },
2075            "description" => LinearIssueChange::Description {
2076                previous: previous.as_str().map(ToString::to_string),
2077            },
2078            "priority" => LinearIssueChange::Priority {
2079                previous: parse_json_i64ish(previous),
2080            },
2081            "estimate" => LinearIssueChange::Estimate {
2082                previous: parse_json_i64ish(previous),
2083            },
2084            "stateId" => LinearIssueChange::StateId {
2085                previous: previous.as_str().map(ToString::to_string),
2086            },
2087            "teamId" => LinearIssueChange::TeamId {
2088                previous: previous.as_str().map(ToString::to_string),
2089            },
2090            "assigneeId" => LinearIssueChange::AssigneeId {
2091                previous: previous.as_str().map(ToString::to_string),
2092            },
2093            "projectId" => LinearIssueChange::ProjectId {
2094                previous: previous.as_str().map(ToString::to_string),
2095            },
2096            "cycleId" => LinearIssueChange::CycleId {
2097                previous: previous.as_str().map(ToString::to_string),
2098            },
2099            "dueDate" => LinearIssueChange::DueDate {
2100                previous: previous.as_str().map(ToString::to_string),
2101            },
2102            "parentId" => LinearIssueChange::ParentId {
2103                previous: previous.as_str().map(ToString::to_string),
2104            },
2105            "sortOrder" => LinearIssueChange::SortOrder {
2106                previous: previous.as_f64(),
2107            },
2108            "labelIds" => LinearIssueChange::LabelIds {
2109                previous: parse_string_array(previous),
2110            },
2111            "completedAt" => LinearIssueChange::CompletedAt {
2112                previous: previous.as_str().map(ToString::to_string),
2113            },
2114            _ => LinearIssueChange::Other {
2115                field: field.clone(),
2116                previous: previous.clone(),
2117            },
2118        };
2119        changes.push(change);
2120    }
2121    changes
2122}
2123
2124fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
2125    value
2126        .as_i64()
2127        .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
2128        .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
2129}
2130
2131fn parse_string_array(value: &JsonValue) -> Vec<String> {
2132    let Some(array) = value.as_array() else {
2133        return Vec::new();
2134    };
2135    array
2136        .iter()
2137        .filter_map(|entry| {
2138            entry.as_str().map(ToString::to_string).or_else(|| {
2139                entry
2140                    .get("id")
2141                    .and_then(JsonValue::as_str)
2142                    .map(ToString::to_string)
2143            })
2144        })
2145        .collect()
2146}
2147
2148fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
2149    headers
2150        .iter()
2151        .find(|(key, _)| key.eq_ignore_ascii_case(name))
2152        .map(|(_, value)| value.as_str())
2153}
2154
2155fn notion_payload(
2156    kind: &str,
2157    headers: &BTreeMap<String, String>,
2158    raw: JsonValue,
2159) -> ProviderPayload {
2160    let workspace_id = raw
2161        .get("workspace_id")
2162        .and_then(JsonValue::as_str)
2163        .map(ToString::to_string);
2164    ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
2165        event: kind.to_string(),
2166        workspace_id,
2167        request_id: headers
2168            .get("request-id")
2169            .cloned()
2170            .or_else(|| headers.get("x-request-id").cloned()),
2171        subscription_id: raw
2172            .get("subscription_id")
2173            .and_then(JsonValue::as_str)
2174            .map(ToString::to_string),
2175        integration_id: raw
2176            .get("integration_id")
2177            .and_then(JsonValue::as_str)
2178            .map(ToString::to_string),
2179        attempt_number: raw
2180            .get("attempt_number")
2181            .and_then(JsonValue::as_u64)
2182            .and_then(|value| u32::try_from(value).ok()),
2183        entity_id: raw
2184            .get("entity")
2185            .and_then(|value| value.get("id"))
2186            .and_then(JsonValue::as_str)
2187            .map(ToString::to_string),
2188        entity_type: raw
2189            .get("entity")
2190            .and_then(|value| value.get("type"))
2191            .and_then(JsonValue::as_str)
2192            .map(ToString::to_string),
2193        api_version: raw
2194            .get("api_version")
2195            .and_then(JsonValue::as_str)
2196            .map(ToString::to_string),
2197        verification_token: raw
2198            .get("verification_token")
2199            .and_then(JsonValue::as_str)
2200            .map(ToString::to_string),
2201        polled: None,
2202        raw,
2203    })))
2204}
2205
2206fn cron_payload(
2207    _kind: &str,
2208    _headers: &BTreeMap<String, String>,
2209    raw: JsonValue,
2210) -> ProviderPayload {
2211    let cron_id = raw
2212        .get("cron_id")
2213        .and_then(JsonValue::as_str)
2214        .map(ToString::to_string);
2215    let schedule = raw
2216        .get("schedule")
2217        .and_then(JsonValue::as_str)
2218        .map(ToString::to_string);
2219    let tick_at = raw
2220        .get("tick_at")
2221        .and_then(JsonValue::as_str)
2222        .and_then(parse_rfc3339)
2223        .unwrap_or_else(OffsetDateTime::now_utc);
2224    ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
2225        cron_id,
2226        schedule,
2227        tick_at,
2228        raw,
2229    }))
2230}
2231
2232fn webhook_payload(
2233    _kind: &str,
2234    headers: &BTreeMap<String, String>,
2235    raw: JsonValue,
2236) -> ProviderPayload {
2237    ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
2238        source: headers.get("X-Webhook-Source").cloned(),
2239        content_type: headers.get("Content-Type").cloned(),
2240        raw,
2241    }))
2242}
2243
2244fn a2a_push_payload(
2245    _kind: &str,
2246    _headers: &BTreeMap<String, String>,
2247    raw: JsonValue,
2248) -> ProviderPayload {
2249    let task_id = raw
2250        .get("task_id")
2251        .and_then(JsonValue::as_str)
2252        .map(ToString::to_string);
2253    let sender = raw
2254        .get("sender")
2255        .and_then(JsonValue::as_str)
2256        .map(ToString::to_string);
2257    let task_state = raw
2258        .pointer("/status/state")
2259        .or_else(|| raw.pointer("/statusUpdate/status/state"))
2260        .and_then(JsonValue::as_str)
2261        .map(|state| match state {
2262            "cancelled" => "canceled".to_string(),
2263            other => other.to_string(),
2264        });
2265    let artifact = raw
2266        .pointer("/artifactUpdate/artifact")
2267        .or_else(|| raw.get("artifact"))
2268        .cloned();
2269    let kind = task_state
2270        .as_deref()
2271        .map(|state| format!("a2a.task.{state}"))
2272        .unwrap_or_else(|| "a2a.task.update".to_string());
2273    ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
2274        task_id,
2275        task_state,
2276        artifact,
2277        sender,
2278        raw,
2279        kind,
2280    }))
2281}
2282
2283fn kafka_payload(
2284    kind: &str,
2285    headers: &BTreeMap<String, String>,
2286    raw: JsonValue,
2287) -> ProviderPayload {
2288    ProviderPayload::Known(KnownProviderPayload::Kafka(stream_payload(
2289        kind, headers, raw,
2290    )))
2291}
2292
2293fn nats_payload(kind: &str, headers: &BTreeMap<String, String>, raw: JsonValue) -> ProviderPayload {
2294    ProviderPayload::Known(KnownProviderPayload::Nats(stream_payload(
2295        kind, headers, raw,
2296    )))
2297}
2298
2299fn pulsar_payload(
2300    kind: &str,
2301    headers: &BTreeMap<String, String>,
2302    raw: JsonValue,
2303) -> ProviderPayload {
2304    ProviderPayload::Known(KnownProviderPayload::Pulsar(stream_payload(
2305        kind, headers, raw,
2306    )))
2307}
2308
2309fn postgres_cdc_payload(
2310    kind: &str,
2311    headers: &BTreeMap<String, String>,
2312    raw: JsonValue,
2313) -> ProviderPayload {
2314    ProviderPayload::Known(KnownProviderPayload::PostgresCdc(stream_payload(
2315        kind, headers, raw,
2316    )))
2317}
2318
2319fn email_payload(
2320    kind: &str,
2321    headers: &BTreeMap<String, String>,
2322    raw: JsonValue,
2323) -> ProviderPayload {
2324    ProviderPayload::Known(KnownProviderPayload::Email(stream_payload(
2325        kind, headers, raw,
2326    )))
2327}
2328
2329fn websocket_payload(
2330    kind: &str,
2331    headers: &BTreeMap<String, String>,
2332    raw: JsonValue,
2333) -> ProviderPayload {
2334    ProviderPayload::Known(KnownProviderPayload::Websocket(stream_payload(
2335        kind, headers, raw,
2336    )))
2337}
2338
2339fn stream_payload(
2340    kind: &str,
2341    headers: &BTreeMap<String, String>,
2342    raw: JsonValue,
2343) -> StreamEventPayload {
2344    StreamEventPayload {
2345        event: kind.to_string(),
2346        source: json_stringish(&raw, &["source", "connector", "origin"]),
2347        stream: json_stringish(
2348            &raw,
2349            &["stream", "topic", "subject", "channel", "mailbox", "slot"],
2350        ),
2351        partition: json_stringish(&raw, &["partition", "shard", "consumer"]),
2352        offset: json_stringish(&raw, &["offset", "sequence", "lsn", "message_id"]),
2353        key: json_stringish(&raw, &["key", "message_key", "id", "event_id"]),
2354        timestamp: json_stringish(&raw, &["timestamp", "occurred_at", "received_at", "ts"]),
2355        headers: headers.clone(),
2356        raw,
2357    }
2358}
2359
2360fn json_stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
2361    fields.iter().find_map(|field| {
2362        let value = raw.get(*field)?;
2363        value
2364            .as_str()
2365            .map(ToString::to_string)
2366            .or_else(|| parse_json_i64ish(value).map(|number| number.to_string()))
2367            .or_else(|| value.as_u64().map(|number| number.to_string()))
2368    })
2369}
2370
2371fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
2372    OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
2373}
2374
2375#[cfg(test)]
2376mod tests {
2377    use super::*;
2378
2379    struct OwnedProviderSchema {
2380        metadata: ProviderMetadata,
2381    }
2382
2383    impl OwnedProviderSchema {
2384        fn new(provider: &str, schema_name: &str) -> Self {
2385            Self {
2386                metadata: ProviderMetadata {
2387                    provider: provider.to_string(),
2388                    kinds: vec!["webhook".to_string()],
2389                    schema_name: schema_name.to_string(),
2390                    runtime: ProviderRuntimeMetadata::Placeholder,
2391                    ..ProviderMetadata::default()
2392                },
2393            }
2394        }
2395    }
2396
2397    impl ProviderSchema for OwnedProviderSchema {
2398        fn provider_id(&self) -> &str {
2399            &self.metadata.provider
2400        }
2401
2402        fn harn_schema_name(&self) -> &str {
2403            &self.metadata.schema_name
2404        }
2405
2406        fn metadata(&self) -> ProviderMetadata {
2407            self.metadata.clone()
2408        }
2409
2410        fn normalize(
2411            &self,
2412            _kind: &str,
2413            _headers: &BTreeMap<String, String>,
2414            raw: JsonValue,
2415        ) -> Result<ProviderPayload, ProviderCatalogError> {
2416            Ok(ProviderPayload::Extension(ExtensionProviderPayload {
2417                provider: self.metadata.provider.clone(),
2418                schema_name: self.metadata.schema_name.clone(),
2419                raw,
2420            }))
2421        }
2422    }
2423
2424    fn owned_provider_schema(provider: &str, schema_name: &str) -> Arc<dyn ProviderSchema> {
2425        Arc::new(OwnedProviderSchema::new(provider, schema_name))
2426    }
2427
2428    fn sample_headers() -> BTreeMap<String, String> {
2429        BTreeMap::from([
2430            ("Authorization".to_string(), "Bearer secret".to_string()),
2431            ("Cookie".to_string(), "session=abc".to_string()),
2432            ("User-Agent".to_string(), "GitHub-Hookshot/123".to_string()),
2433            ("X-GitHub-Delivery".to_string(), "delivery-123".to_string()),
2434            ("X-GitHub-Event".to_string(), "issues".to_string()),
2435            ("X-Webhook-Token".to_string(), "token".to_string()),
2436        ])
2437    }
2438
2439    #[test]
2440    fn default_redaction_policy_keeps_safe_headers() {
2441        let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
2442        assert_eq!(redacted.get("User-Agent").unwrap(), "GitHub-Hookshot/123");
2443        assert_eq!(redacted.get("X-GitHub-Delivery").unwrap(), "delivery-123");
2444        assert_eq!(
2445            redacted.get("Authorization").unwrap(),
2446            REDACTED_HEADER_VALUE
2447        );
2448        assert_eq!(redacted.get("Cookie").unwrap(), REDACTED_HEADER_VALUE);
2449        assert_eq!(
2450            redacted.get("X-Webhook-Token").unwrap(),
2451            REDACTED_HEADER_VALUE
2452        );
2453    }
2454
2455    #[test]
2456    fn provider_catalog_rejects_duplicates() {
2457        let mut catalog = ProviderCatalog::default();
2458        catalog
2459            .register(Arc::new(BuiltinProviderSchema {
2460                provider_id: "github",
2461                harn_schema_name: "GitHubEventPayload",
2462                metadata: provider_metadata_entry(
2463                    "github",
2464                    &["webhook"],
2465                    "GitHubEventPayload",
2466                    &[],
2467                    SignatureVerificationMetadata::None,
2468                    Vec::new(),
2469                    ProviderRuntimeMetadata::Placeholder,
2470                ),
2471                normalize: github_payload,
2472            }))
2473            .unwrap();
2474        let error = catalog
2475            .register(Arc::new(BuiltinProviderSchema {
2476                provider_id: "github",
2477                harn_schema_name: "GitHubEventPayload",
2478                metadata: provider_metadata_entry(
2479                    "github",
2480                    &["webhook"],
2481                    "GitHubEventPayload",
2482                    &[],
2483                    SignatureVerificationMetadata::None,
2484                    Vec::new(),
2485                    ProviderRuntimeMetadata::Placeholder,
2486                ),
2487                normalize: github_payload,
2488            }))
2489            .unwrap_err();
2490        assert_eq!(
2491            error,
2492            ProviderCatalogError::DuplicateProvider("github".to_string())
2493        );
2494    }
2495
2496    #[test]
2497    fn provider_catalog_builds_independent_owned_dynamic_catalogs() {
2498        let first = ProviderCatalog::with_defaults_and(vec![owned_provider_schema(
2499            "runtime-a",
2500            "RuntimeAPayload",
2501        )])
2502        .unwrap();
2503        assert_eq!(
2504            first
2505                .metadata_for("runtime-a")
2506                .expect("first dynamic provider")
2507                .schema_name,
2508            "RuntimeAPayload"
2509        );
2510        assert!(first.metadata_for("runtime-b").is_none());
2511
2512        let second = ProviderCatalog::with_defaults_and(vec![owned_provider_schema(
2513            "runtime-b",
2514            "RuntimeBPayload",
2515        )])
2516        .unwrap();
2517        assert!(second.metadata_for("runtime-a").is_none());
2518        assert_eq!(
2519            second
2520                .metadata_for("runtime-b")
2521                .expect("second dynamic provider")
2522                .schema_name,
2523            "RuntimeBPayload"
2524        );
2525        assert!(first.metadata_for("runtime-a").is_some());
2526    }
2527
2528    #[test]
2529    fn registered_provider_metadata_marks_builtin_connectors() {
2530        let entries = registered_provider_metadata();
2531        let builtin: Vec<&ProviderMetadata> = entries
2532            .iter()
2533            .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
2534            .collect();
2535
2536        assert_eq!(builtin.len(), 9);
2537        assert!(builtin.iter().any(|entry| entry.provider == "a2a-push"));
2538        assert!(builtin.iter().any(|entry| entry.provider == "cron"));
2539        assert!(builtin.iter().any(|entry| entry.provider == "webhook"));
2540        for provider in ["github", "linear", "notion", "slack"] {
2541            let entry = entries
2542                .iter()
2543                .find(|entry| entry.provider == provider)
2544                .expect("first-party package-backed provider metadata");
2545            assert!(matches!(
2546                entry.runtime,
2547                ProviderRuntimeMetadata::Placeholder
2548            ));
2549        }
2550        let kafka = entries
2551            .iter()
2552            .find(|entry| entry.provider == "kafka")
2553            .expect("kafka stream provider");
2554        assert_eq!(kafka.kinds, vec!["stream".to_string()]);
2555        assert_eq!(kafka.schema_name, "StreamEventPayload");
2556        assert!(matches!(
2557            kafka.runtime,
2558            ProviderRuntimeMetadata::Builtin {
2559                ref connector,
2560                default_signature_variant: None
2561            } if connector == "stream"
2562        ));
2563    }
2564
2565    #[test]
2566    fn trigger_event_round_trip_is_stable() {
2567        let provider = ProviderId::from("github");
2568        let headers = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
2569        let payload = ProviderPayload::normalize(
2570            &provider,
2571            "issues",
2572            &sample_headers(),
2573            serde_json::json!({
2574                "action": "opened",
2575                "installation": {"id": 42},
2576                "issue": {"number": 99}
2577            }),
2578        )
2579        .unwrap();
2580        let event = TriggerEvent {
2581            id: TriggerEventId("trigger_evt_fixed".to_string()),
2582            provider,
2583            kind: "issues".to_string(),
2584            received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
2585            occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
2586            dedupe_key: "delivery-123".to_string(),
2587            trace_id: TraceId("trace_fixed".to_string()),
2588            tenant_id: Some(TenantId("tenant_1".to_string())),
2589            headers,
2590            provider_payload: payload,
2591            signature_status: SignatureStatus::Verified,
2592            dedupe_claimed: false,
2593            batch: None,
2594            raw_body: Some(vec![0, 159, 255, 10]),
2595        };
2596
2597        let once = serde_json::to_value(&event).unwrap();
2598        assert_eq!(once["raw_body"], serde_json::json!("AJ//Cg=="));
2599        let decoded: TriggerEvent = serde_json::from_value(once.clone()).unwrap();
2600        let twice = serde_json::to_value(&decoded).unwrap();
2601        assert_eq!(decoded, event);
2602        assert_eq!(once, twice);
2603    }
2604
2605    #[test]
2606    fn unknown_provider_errors() {
2607        let error = ProviderPayload::normalize(
2608            &ProviderId::from("custom-provider"),
2609            "thing.happened",
2610            &BTreeMap::new(),
2611            serde_json::json!({"ok": true}),
2612        )
2613        .unwrap_err();
2614        assert_eq!(
2615            error,
2616            ProviderCatalogError::UnknownProvider("custom-provider".to_string())
2617        );
2618    }
2619
2620    fn github_headers(event: &str, delivery: &str) -> BTreeMap<String, String> {
2621        BTreeMap::from([
2622            ("X-GitHub-Event".to_string(), event.to_string()),
2623            ("X-GitHub-Delivery".to_string(), delivery.to_string()),
2624        ])
2625    }
2626
2627    fn unwrap_github(payload: ProviderPayload) -> GitHubEventPayload {
2628        match payload {
2629            ProviderPayload::Known(KnownProviderPayload::GitHub(p)) => p,
2630            other => panic!("expected GitHub payload, got {other:?}"),
2631        }
2632    }
2633
2634    /// Mirror of the connector's normalized webhook payload shape: the
2635    /// connector wraps the original GitHub body as `raw` and promotes
2636    /// stable common + event-specific fields.
2637    fn connector_normalized(
2638        event: &str,
2639        delivery: &str,
2640        installation_id: i64,
2641        action: Option<&str>,
2642        original: serde_json::Value,
2643        promoted: serde_json::Value,
2644    ) -> serde_json::Value {
2645        let mut common = serde_json::json!({
2646            "provider": "github",
2647            "event": event,
2648            "topic": match action {
2649                Some(a) => format!("github.{event}.{a}"),
2650                None => format!("github.{event}"),
2651            },
2652            "delivery_id": delivery,
2653            "installation_id": installation_id,
2654            "repository": original.get("repository").cloned().unwrap_or(JsonValue::Null),
2655            "repo": serde_json::json!({"owner": "octo-org", "name": "octo-repo", "full_name": "octo-org/octo-repo"}),
2656            "raw": original,
2657        });
2658        if let Some(a) = action {
2659            common["action"] = serde_json::json!(a);
2660        }
2661        let common_obj = common.as_object_mut().unwrap();
2662        if let Some(promoted_obj) = promoted.as_object() {
2663            for (k, v) in promoted_obj {
2664                common_obj.insert(k.clone(), v.clone());
2665            }
2666        }
2667        common
2668    }
2669
2670    #[test]
2671    fn github_check_suite_event_promotes_typed_fields() {
2672        let original = serde_json::json!({
2673            "action": "requested",
2674            "check_suite": {
2675                "id": 8101,
2676                "status": "queued",
2677                "conclusion": null,
2678                "head_sha": "ccccccccccccccccccccccccccccccccccccccc1",
2679                "head_branch": "feature/x",
2680            },
2681            "repository": {"full_name": "octo-org/octo-repo"},
2682            "installation": {"id": 3001},
2683        });
2684        let normalized = connector_normalized(
2685            "check_suite",
2686            "delivery-cs",
2687            3001,
2688            Some("requested"),
2689            original.clone(),
2690            serde_json::json!({
2691                "check_suite": original["check_suite"].clone(),
2692                "check_suite_id": 8101,
2693                "head_sha": "ccccccccccccccccccccccccccccccccccccccc1",
2694                "head_ref": "feature/x",
2695                "status": "queued",
2696            }),
2697        );
2698        let provider = ProviderId::from("github");
2699        let payload = ProviderPayload::normalize(
2700            &provider,
2701            "check_suite",
2702            &github_headers("check_suite", "delivery-cs"),
2703            normalized,
2704        )
2705        .expect("check_suite payload");
2706        let GitHubEventPayload::CheckSuite(check_suite) = unwrap_github(payload) else {
2707            panic!("expected CheckSuite variant");
2708        };
2709        assert_eq!(check_suite.common.event, "check_suite");
2710        assert_eq!(check_suite.common.action.as_deref(), Some("requested"));
2711        assert_eq!(
2712            check_suite.common.delivery_id.as_deref(),
2713            Some("delivery-cs")
2714        );
2715        assert_eq!(check_suite.common.installation_id, Some(3001));
2716        assert_eq!(
2717            check_suite.common.topic.as_deref(),
2718            Some("github.check_suite.requested")
2719        );
2720        assert!(check_suite.common.repository.is_some());
2721        assert!(check_suite.common.repo.is_some());
2722        assert_eq!(check_suite.check_suite_id, Some(8101));
2723        assert_eq!(
2724            check_suite.head_sha.as_deref(),
2725            Some("ccccccccccccccccccccccccccccccccccccccc1")
2726        );
2727        assert_eq!(check_suite.head_ref.as_deref(), Some("feature/x"));
2728        assert_eq!(check_suite.status.as_deref(), Some("queued"));
2729        // Original body is preserved as the escape-hatch raw.
2730        assert_eq!(check_suite.common.raw, original);
2731    }
2732
2733    #[test]
2734    fn github_status_event_promotes_typed_fields() {
2735        let original = serde_json::json!({
2736            "id": 9101,
2737            "sha": "ccccccccccccccccccccccccccccccccccccccc1",
2738            "state": "success",
2739            "context": "legacy/status",
2740            "target_url": "https://ci.example.test/octo-repo/9101",
2741            "branches": [{"name": "main"}],
2742            "repository": {"full_name": "octo-org/octo-repo"},
2743            "installation": {"id": 3001},
2744        });
2745        let normalized = connector_normalized(
2746            "status",
2747            "delivery-status",
2748            3001,
2749            None,
2750            original.clone(),
2751            serde_json::json!({
2752                "commit_status": original.clone(),
2753                "status_id": 9101,
2754                "head_sha": "ccccccccccccccccccccccccccccccccccccccc1",
2755                "head_ref": "main",
2756                "base_ref": "main",
2757                "state": "success",
2758                "context": "legacy/status",
2759                "target_url": "https://ci.example.test/octo-repo/9101",
2760            }),
2761        );
2762        let provider = ProviderId::from("github");
2763        let payload = ProviderPayload::normalize(
2764            &provider,
2765            "status",
2766            &github_headers("status", "delivery-status"),
2767            normalized,
2768        )
2769        .expect("status payload");
2770        let GitHubEventPayload::Status(status) = unwrap_github(payload) else {
2771            panic!("expected Status variant");
2772        };
2773        assert_eq!(status.common.event, "status");
2774        assert_eq!(status.common.installation_id, Some(3001));
2775        assert_eq!(status.status_id, Some(9101));
2776        assert_eq!(status.state.as_deref(), Some("success"));
2777        assert_eq!(status.context.as_deref(), Some("legacy/status"));
2778        assert_eq!(
2779            status.target_url.as_deref(),
2780            Some("https://ci.example.test/octo-repo/9101")
2781        );
2782        assert_eq!(
2783            status.head_sha.as_deref(),
2784            Some("ccccccccccccccccccccccccccccccccccccccc1")
2785        );
2786        assert!(status.commit_status.is_some());
2787    }
2788
2789    #[test]
2790    fn github_merge_group_event_promotes_typed_fields() {
2791        let original = serde_json::json!({
2792            "action": "checks_requested",
2793            "merge_group": {
2794                "id": 9201,
2795                "head_ref": "gh-readonly-queue/main/pr-42",
2796                "head_sha": "ddddddddddddddddddddddddddddddddddddddd1",
2797                "base_ref": "main",
2798                "base_sha": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa1",
2799                "pull_requests": [{"number": 42}, {"number": 43}],
2800            },
2801            "repository": {"full_name": "octo-org/octo-repo"},
2802            "installation": {"id": 3001},
2803        });
2804        let normalized = connector_normalized(
2805            "merge_group",
2806            "delivery-mg",
2807            3001,
2808            Some("checks_requested"),
2809            original.clone(),
2810            serde_json::json!({
2811                "merge_group": original["merge_group"].clone(),
2812                "merge_group_id": 9201,
2813                "head_sha": "ddddddddddddddddddddddddddddddddddddddd1",
2814                "head_ref": "gh-readonly-queue/main/pr-42",
2815                "base_sha": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa1",
2816                "base_ref": "main",
2817                "pull_requests": [{"number": 42}, {"number": 43}],
2818                "pull_request_numbers": [42, 43],
2819            }),
2820        );
2821        let provider = ProviderId::from("github");
2822        let payload = ProviderPayload::normalize(
2823            &provider,
2824            "merge_group",
2825            &github_headers("merge_group", "delivery-mg"),
2826            normalized,
2827        )
2828        .expect("merge_group payload");
2829        let GitHubEventPayload::MergeGroup(mg) = unwrap_github(payload) else {
2830            panic!("expected MergeGroup variant");
2831        };
2832        assert_eq!(mg.common.event, "merge_group");
2833        assert_eq!(mg.common.action.as_deref(), Some("checks_requested"));
2834        assert_eq!(mg.merge_group_id, Some(serde_json::json!(9201)));
2835        assert_eq!(mg.head_ref.as_deref(), Some("gh-readonly-queue/main/pr-42"));
2836        assert_eq!(
2837            mg.base_sha.as_deref(),
2838            Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa1")
2839        );
2840        assert_eq!(mg.base_ref.as_deref(), Some("main"));
2841        assert_eq!(mg.pull_request_numbers, vec![42i64, 43i64]);
2842        assert_eq!(mg.pull_requests.len(), 2);
2843    }
2844
2845    #[test]
2846    fn github_installation_event_promotes_typed_fields() {
2847        let original = serde_json::json!({
2848            "action": "suspend",
2849            "installation": {
2850                "id": 3001,
2851                "account": {"login": "octo-org"},
2852                "repository_selection": "selected",
2853                "suspended_at": "2026-04-20T18:00:00Z",
2854            },
2855            "repositories": [{"full_name": "octo-org/octo-repo"}],
2856        });
2857        let normalized = connector_normalized(
2858            "installation",
2859            "delivery-inst",
2860            3001,
2861            Some("suspend"),
2862            original.clone(),
2863            serde_json::json!({
2864                "installation": original["installation"].clone(),
2865                "account": {"login": "octo-org"},
2866                "installation_state": "suspended",
2867                "suspended": true,
2868                "revoked": false,
2869                "repositories": original["repositories"].clone(),
2870            }),
2871        );
2872        let provider = ProviderId::from("github");
2873        let payload = ProviderPayload::normalize(
2874            &provider,
2875            "installation",
2876            &github_headers("installation", "delivery-inst"),
2877            normalized,
2878        )
2879        .expect("installation payload");
2880        let GitHubEventPayload::Installation(inst) = unwrap_github(payload) else {
2881            panic!("expected Installation variant");
2882        };
2883        assert_eq!(inst.common.event, "installation");
2884        assert_eq!(inst.common.action.as_deref(), Some("suspend"));
2885        assert_eq!(inst.installation_state.as_deref(), Some("suspended"));
2886        assert_eq!(inst.suspended, Some(true));
2887        assert_eq!(inst.revoked, Some(false));
2888        assert_eq!(inst.repositories.len(), 1);
2889        assert!(inst.account.is_some());
2890    }
2891
2892    #[test]
2893    fn github_installation_repositories_event_promotes_typed_fields() {
2894        let original = serde_json::json!({
2895            "action": "removed",
2896            "installation": {"id": 3001, "account": {"login": "octo-org"}},
2897            "repository_selection": "selected",
2898            "repositories_added": [],
2899            "repositories_removed": [
2900                {"id": 4001, "full_name": "octo-org/octo-repo"},
2901            ],
2902        });
2903        let normalized = connector_normalized(
2904            "installation_repositories",
2905            "delivery-inst-repos",
2906            3001,
2907            Some("removed"),
2908            original.clone(),
2909            serde_json::json!({
2910                "installation": original["installation"].clone(),
2911                "account": {"login": "octo-org"},
2912                "installation_state": "revoked",
2913                "suspended": false,
2914                "revoked": true,
2915                "repository_selection": "selected",
2916                "repositories_added": [],
2917                "repositories_removed": original["repositories_removed"].clone(),
2918            }),
2919        );
2920        let provider = ProviderId::from("github");
2921        let payload = ProviderPayload::normalize(
2922            &provider,
2923            "installation_repositories",
2924            &github_headers("installation_repositories", "delivery-inst-repos"),
2925            normalized,
2926        )
2927        .expect("installation_repositories payload");
2928        let GitHubEventPayload::InstallationRepositories(repos) = unwrap_github(payload) else {
2929            panic!("expected InstallationRepositories variant");
2930        };
2931        assert_eq!(repos.common.event, "installation_repositories");
2932        assert_eq!(repos.common.action.as_deref(), Some("removed"));
2933        assert_eq!(repos.repository_selection.as_deref(), Some("selected"));
2934        assert!(repos.repositories_added.is_empty());
2935        assert_eq!(repos.repositories_removed.len(), 1);
2936        assert_eq!(
2937            repos.repositories_removed[0]
2938                .get("full_name")
2939                .and_then(|v| v.as_str()),
2940            Some("octo-org/octo-repo"),
2941        );
2942        assert_eq!(repos.installation_state.as_deref(), Some("revoked"));
2943        assert_eq!(repos.revoked, Some(true));
2944    }
2945
2946    #[test]
2947    fn github_legacy_direct_webhook_still_normalizes() {
2948        // Direct GitHub webhook bodies (no connector wrapper) should
2949        // continue to populate installation_id from `installation.id` and
2950        // leave the new common fields unset.
2951        let provider = ProviderId::from("github");
2952        let payload = ProviderPayload::normalize(
2953            &provider,
2954            "issues",
2955            &github_headers("issues", "delivery-legacy"),
2956            serde_json::json!({
2957                "action": "opened",
2958                "installation": {"id": 99},
2959                "issue": {"number": 7},
2960            }),
2961        )
2962        .expect("legacy issues payload");
2963        let GitHubEventPayload::Issues(issues) = unwrap_github(payload) else {
2964            panic!("expected Issues variant");
2965        };
2966        assert_eq!(issues.common.installation_id, Some(99));
2967        assert_eq!(
2968            issues.common.delivery_id.as_deref(),
2969            Some("delivery-legacy")
2970        );
2971        assert!(issues.common.topic.is_none());
2972        assert!(issues.common.repo.is_none());
2973        assert_eq!(issues.issue.get("number").and_then(|v| v.as_i64()), Some(7));
2974    }
2975
2976    #[test]
2977    fn github_new_event_variants_round_trip_through_serde() {
2978        // Untagged enums match in declaration order. Make sure each new
2979        // event kind serializes and deserializes back to the same variant
2980        // — i.e. it is not silently absorbed into an earlier variant such
2981        // as `Push` (whose only required field is defaulted) or `Other`.
2982        let provider = ProviderId::from("github");
2983        let cases: &[(&str, serde_json::Value, &str)] = &[
2984            (
2985                "check_suite",
2986                serde_json::json!({
2987                    "event": "check_suite",
2988                    "check_suite": {"id": 1},
2989                    "check_suite_id": 1,
2990                    "raw": {"check_suite": {"id": 1}},
2991                }),
2992                "CheckSuite",
2993            ),
2994            (
2995                "status",
2996                serde_json::json!({
2997                    "event": "status",
2998                    "commit_status": {"id": 9},
2999                    "status_id": 9,
3000                    "state": "success",
3001                    "raw": {"id": 9, "state": "success"},
3002                }),
3003                "Status",
3004            ),
3005            (
3006                "merge_group",
3007                serde_json::json!({
3008                    "event": "merge_group",
3009                    "merge_group": {"id": 1},
3010                    "merge_group_id": 1,
3011                    "raw": {"merge_group": {"id": 1}},
3012                }),
3013                "MergeGroup",
3014            ),
3015            (
3016                "installation",
3017                serde_json::json!({
3018                    "event": "installation",
3019                    "installation": {"id": 1},
3020                    "installation_state": "active",
3021                    "suspended": false,
3022                    "raw": {"installation": {"id": 1}},
3023                }),
3024                "Installation",
3025            ),
3026            (
3027                "installation_repositories",
3028                serde_json::json!({
3029                    "event": "installation_repositories",
3030                    "installation": {"id": 1},
3031                    "repository_selection": "selected",
3032                    "repositories_added": [],
3033                    "repositories_removed": [{"id": 7}],
3034                    "raw": {"installation": {"id": 1}},
3035                }),
3036                "InstallationRepositories",
3037            ),
3038        ];
3039        for (kind, raw, want_variant) in cases {
3040            let payload = ProviderPayload::normalize(
3041                &provider,
3042                kind,
3043                &github_headers(kind, "delivery"),
3044                raw.clone(),
3045            )
3046            .unwrap_or_else(|_| panic!("normalize {kind}"));
3047            let serialized = serde_json::to_value(&payload).expect("serialize");
3048            let deserialized: ProviderPayload =
3049                serde_json::from_value(serialized.clone()).expect("deserialize");
3050            let actual_variant = match unwrap_github(deserialized) {
3051                GitHubEventPayload::Issues(_) => "Issues",
3052                GitHubEventPayload::PullRequest(_) => "PullRequest",
3053                GitHubEventPayload::IssueComment(_) => "IssueComment",
3054                GitHubEventPayload::PullRequestReview(_) => "PullRequestReview",
3055                GitHubEventPayload::Push(_) => "Push",
3056                GitHubEventPayload::WorkflowRun(_) => "WorkflowRun",
3057                GitHubEventPayload::DeploymentStatus(_) => "DeploymentStatus",
3058                GitHubEventPayload::CheckRun(_) => "CheckRun",
3059                GitHubEventPayload::CheckSuite(_) => "CheckSuite",
3060                GitHubEventPayload::Status(_) => "Status",
3061                GitHubEventPayload::MergeGroup(_) => "MergeGroup",
3062                GitHubEventPayload::Installation(_) => "Installation",
3063                GitHubEventPayload::InstallationRepositories(_) => "InstallationRepositories",
3064                GitHubEventPayload::Other(_) => "Other",
3065            };
3066            assert_eq!(
3067                actual_variant, *want_variant,
3068                "{kind} round-tripped as {actual_variant}, expected {want_variant}; serialized form: {serialized}"
3069            );
3070        }
3071    }
3072
3073    #[test]
3074    fn provider_normalizes_stream_payloads() {
3075        let payload = ProviderPayload::normalize(
3076            &ProviderId::from("kafka"),
3077            "quote.tick",
3078            &BTreeMap::from([("x-source".to_string(), "feed".to_string())]),
3079            serde_json::json!({
3080                "topic": "quotes",
3081                "partition": 7,
3082                "offset": "42",
3083                "key": "AAPL",
3084                "timestamp": "2026-04-21T12:00:00Z"
3085            }),
3086        )
3087        .expect("stream payload");
3088        let ProviderPayload::Known(KnownProviderPayload::Kafka(payload)) = payload else {
3089            panic!("expected kafka stream payload")
3090        };
3091        assert_eq!(payload.event, "quote.tick");
3092        assert_eq!(payload.stream.as_deref(), Some("quotes"));
3093        assert_eq!(payload.partition.as_deref(), Some("7"));
3094        assert_eq!(payload.offset.as_deref(), Some("42"));
3095        assert_eq!(payload.key.as_deref(), Some("AAPL"));
3096        assert_eq!(payload.timestamp.as_deref(), Some("2026-04-21T12:00:00Z"));
3097    }
3098}