Skip to main content

harn_vm/triggers/
event.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
11const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13fn serialize_optional_bytes_b64<S>(
14    value: &Option<Vec<u8>>,
15    serializer: S,
16) -> Result<S::Ok, S::Error>
17where
18    S: Serializer,
19{
20    use base64::Engine;
21
22    match value {
23        Some(bytes) => {
24            serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
25        }
26        None => serializer.serialize_none(),
27    }
28}
29
30fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
31where
32    D: Deserializer<'de>,
33{
34    use base64::Engine;
35
36    let encoded = Option::<String>::deserialize(deserializer)?;
37    encoded
38        .map(|text| {
39            base64::engine::general_purpose::STANDARD
40                .decode(text.as_bytes())
41                .map_err(serde::de::Error::custom)
42        })
43        .transpose()
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
47#[serde(transparent)]
48pub struct TriggerEventId(pub String);
49
50impl TriggerEventId {
51    pub fn new() -> Self {
52        Self(format!("trigger_evt_{}", Uuid::now_v7()))
53    }
54}
55
56impl Default for TriggerEventId {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
63#[serde(transparent)]
64pub struct ProviderId(pub String);
65
66impl ProviderId {
67    pub fn new(value: impl Into<String>) -> Self {
68        Self(value.into())
69    }
70
71    pub fn as_str(&self) -> &str {
72        self.0.as_str()
73    }
74}
75
76impl From<&str> for ProviderId {
77    fn from(value: &str) -> Self {
78        Self::new(value)
79    }
80}
81
82impl From<String> for ProviderId {
83    fn from(value: String) -> Self {
84        Self::new(value)
85    }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
89#[serde(transparent)]
90pub struct TraceId(pub String);
91
92impl TraceId {
93    pub fn new() -> Self {
94        Self(format!("trace_{}", Uuid::now_v7()))
95    }
96}
97
98impl Default for TraceId {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
105#[serde(transparent)]
106pub struct TenantId(pub String);
107
108impl TenantId {
109    pub fn new(value: impl Into<String>) -> Self {
110        Self(value.into())
111    }
112}
113
114#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(tag = "state", rename_all = "snake_case")]
116pub enum SignatureStatus {
117    Verified,
118    Unsigned,
119    Failed { reason: String },
120}
121
122#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
123pub struct GitHubEventCommon {
124    pub event: String,
125    pub action: Option<String>,
126    pub delivery_id: Option<String>,
127    pub installation_id: Option<i64>,
128    pub raw: JsonValue,
129}
130
131#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
132pub struct GitHubIssuesEventPayload {
133    #[serde(flatten)]
134    pub common: GitHubEventCommon,
135    pub issue: JsonValue,
136}
137
138#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
139pub struct GitHubPullRequestEventPayload {
140    #[serde(flatten)]
141    pub common: GitHubEventCommon,
142    pub pull_request: JsonValue,
143}
144
145#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
146pub struct GitHubIssueCommentEventPayload {
147    #[serde(flatten)]
148    pub common: GitHubEventCommon,
149    pub issue: JsonValue,
150    pub comment: JsonValue,
151}
152
153#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
154pub struct GitHubPullRequestReviewEventPayload {
155    #[serde(flatten)]
156    pub common: GitHubEventCommon,
157    pub pull_request: JsonValue,
158    pub review: JsonValue,
159}
160
161#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
162pub struct GitHubPushEventPayload {
163    #[serde(flatten)]
164    pub common: GitHubEventCommon,
165    #[serde(default)]
166    pub commits: Vec<JsonValue>,
167    pub distinct_size: Option<i64>,
168}
169
170#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
171pub struct GitHubWorkflowRunEventPayload {
172    #[serde(flatten)]
173    pub common: GitHubEventCommon,
174    pub workflow_run: JsonValue,
175}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178pub struct GitHubDeploymentStatusEventPayload {
179    #[serde(flatten)]
180    pub common: GitHubEventCommon,
181    pub deployment_status: JsonValue,
182    pub deployment: JsonValue,
183}
184
185#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
186pub struct GitHubCheckRunEventPayload {
187    #[serde(flatten)]
188    pub common: GitHubEventCommon,
189    pub check_run: JsonValue,
190}
191
192#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
193#[serde(untagged)]
194pub enum GitHubEventPayload {
195    Issues(GitHubIssuesEventPayload),
196    PullRequest(GitHubPullRequestEventPayload),
197    IssueComment(GitHubIssueCommentEventPayload),
198    PullRequestReview(GitHubPullRequestReviewEventPayload),
199    Push(GitHubPushEventPayload),
200    WorkflowRun(GitHubWorkflowRunEventPayload),
201    DeploymentStatus(GitHubDeploymentStatusEventPayload),
202    CheckRun(GitHubCheckRunEventPayload),
203    Other(GitHubEventCommon),
204}
205
206#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
207pub struct SlackEventCommon {
208    pub event: String,
209    pub event_id: Option<String>,
210    pub api_app_id: Option<String>,
211    pub team_id: Option<String>,
212    pub channel_id: Option<String>,
213    pub user_id: Option<String>,
214    pub event_ts: Option<String>,
215    pub raw: JsonValue,
216}
217
218#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
219pub struct SlackMessageEventPayload {
220    #[serde(flatten)]
221    pub common: SlackEventCommon,
222    pub subtype: Option<String>,
223    pub channel_type: Option<String>,
224    pub channel: Option<String>,
225    pub user: Option<String>,
226    pub text: Option<String>,
227    pub ts: Option<String>,
228    pub thread_ts: Option<String>,
229}
230
231#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
232pub struct SlackAppMentionEventPayload {
233    #[serde(flatten)]
234    pub common: SlackEventCommon,
235    pub channel: Option<String>,
236    pub user: Option<String>,
237    pub text: Option<String>,
238    pub ts: Option<String>,
239    pub thread_ts: Option<String>,
240}
241
242#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
243pub struct SlackReactionAddedEventPayload {
244    #[serde(flatten)]
245    pub common: SlackEventCommon,
246    pub reaction: Option<String>,
247    pub item_user: Option<String>,
248    pub item: JsonValue,
249}
250
251#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
252pub struct SlackAppHomeOpenedEventPayload {
253    #[serde(flatten)]
254    pub common: SlackEventCommon,
255    pub user: Option<String>,
256    pub channel: Option<String>,
257    pub tab: Option<String>,
258    pub view: JsonValue,
259}
260
261#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
262pub struct SlackAssistantThreadStartedEventPayload {
263    #[serde(flatten)]
264    pub common: SlackEventCommon,
265    pub assistant_thread: JsonValue,
266    pub thread_ts: Option<String>,
267    pub context: JsonValue,
268}
269
270#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
271#[serde(untagged)]
272pub enum SlackEventPayload {
273    Message(SlackMessageEventPayload),
274    AppMention(SlackAppMentionEventPayload),
275    ReactionAdded(SlackReactionAddedEventPayload),
276    AppHomeOpened(SlackAppHomeOpenedEventPayload),
277    AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
278    Other(SlackEventCommon),
279}
280
281#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
282pub struct LinearEventCommon {
283    pub event: String,
284    pub action: Option<String>,
285    pub delivery_id: Option<String>,
286    pub organization_id: Option<String>,
287    pub webhook_timestamp: Option<i64>,
288    pub webhook_id: Option<String>,
289    pub url: Option<String>,
290    pub created_at: Option<String>,
291    pub actor: JsonValue,
292    pub raw: JsonValue,
293}
294
295#[derive(Clone, Debug, PartialEq)]
296pub enum LinearIssueChange {
297    Title { previous: Option<String> },
298    Description { previous: Option<String> },
299    Priority { previous: Option<i64> },
300    Estimate { previous: Option<i64> },
301    StateId { previous: Option<String> },
302    TeamId { previous: Option<String> },
303    AssigneeId { previous: Option<String> },
304    ProjectId { previous: Option<String> },
305    CycleId { previous: Option<String> },
306    DueDate { previous: Option<String> },
307    ParentId { previous: Option<String> },
308    SortOrder { previous: Option<f64> },
309    LabelIds { previous: Vec<String> },
310    CompletedAt { previous: Option<String> },
311    Other { field: String, previous: JsonValue },
312}
313
314impl Serialize for LinearIssueChange {
315    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
316    where
317        S: Serializer,
318    {
319        let value = match self {
320            Self::Title { previous } => {
321                serde_json::json!({ "field_name": "title", "previous": previous })
322            }
323            Self::Description { previous } => {
324                serde_json::json!({ "field_name": "description", "previous": previous })
325            }
326            Self::Priority { previous } => {
327                serde_json::json!({ "field_name": "priority", "previous": previous })
328            }
329            Self::Estimate { previous } => {
330                serde_json::json!({ "field_name": "estimate", "previous": previous })
331            }
332            Self::StateId { previous } => {
333                serde_json::json!({ "field_name": "state_id", "previous": previous })
334            }
335            Self::TeamId { previous } => {
336                serde_json::json!({ "field_name": "team_id", "previous": previous })
337            }
338            Self::AssigneeId { previous } => {
339                serde_json::json!({ "field_name": "assignee_id", "previous": previous })
340            }
341            Self::ProjectId { previous } => {
342                serde_json::json!({ "field_name": "project_id", "previous": previous })
343            }
344            Self::CycleId { previous } => {
345                serde_json::json!({ "field_name": "cycle_id", "previous": previous })
346            }
347            Self::DueDate { previous } => {
348                serde_json::json!({ "field_name": "due_date", "previous": previous })
349            }
350            Self::ParentId { previous } => {
351                serde_json::json!({ "field_name": "parent_id", "previous": previous })
352            }
353            Self::SortOrder { previous } => {
354                serde_json::json!({ "field_name": "sort_order", "previous": previous })
355            }
356            Self::LabelIds { previous } => {
357                serde_json::json!({ "field_name": "label_ids", "previous": previous })
358            }
359            Self::CompletedAt { previous } => {
360                serde_json::json!({ "field_name": "completed_at", "previous": previous })
361            }
362            Self::Other { field, previous } => {
363                serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
364            }
365        };
366        value.serialize(serializer)
367    }
368}
369
370impl<'de> Deserialize<'de> for LinearIssueChange {
371    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
372    where
373        D: Deserializer<'de>,
374    {
375        let value = JsonValue::deserialize(deserializer)?;
376        let field_name = value
377            .get("field_name")
378            .and_then(JsonValue::as_str)
379            .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
380        let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
381        Ok(match field_name {
382            "title" => Self::Title {
383                previous: previous.as_str().map(ToString::to_string),
384            },
385            "description" => Self::Description {
386                previous: previous.as_str().map(ToString::to_string),
387            },
388            "priority" => Self::Priority {
389                previous: parse_json_i64ish(&previous),
390            },
391            "estimate" => Self::Estimate {
392                previous: parse_json_i64ish(&previous),
393            },
394            "state_id" => Self::StateId {
395                previous: previous.as_str().map(ToString::to_string),
396            },
397            "team_id" => Self::TeamId {
398                previous: previous.as_str().map(ToString::to_string),
399            },
400            "assignee_id" => Self::AssigneeId {
401                previous: previous.as_str().map(ToString::to_string),
402            },
403            "project_id" => Self::ProjectId {
404                previous: previous.as_str().map(ToString::to_string),
405            },
406            "cycle_id" => Self::CycleId {
407                previous: previous.as_str().map(ToString::to_string),
408            },
409            "due_date" => Self::DueDate {
410                previous: previous.as_str().map(ToString::to_string),
411            },
412            "parent_id" => Self::ParentId {
413                previous: previous.as_str().map(ToString::to_string),
414            },
415            "sort_order" => Self::SortOrder {
416                previous: previous.as_f64(),
417            },
418            "label_ids" => Self::LabelIds {
419                previous: parse_string_array(&previous),
420            },
421            "completed_at" => Self::CompletedAt {
422                previous: previous.as_str().map(ToString::to_string),
423            },
424            "other" => Self::Other {
425                field: value
426                    .get("field")
427                    .and_then(JsonValue::as_str)
428                    .map(ToString::to_string)
429                    .unwrap_or_else(|| "unknown".to_string()),
430                previous,
431            },
432            other => Self::Other {
433                field: other.to_string(),
434                previous,
435            },
436        })
437    }
438}
439
440#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
441pub struct LinearIssueEventPayload {
442    #[serde(flatten)]
443    pub common: LinearEventCommon,
444    pub issue: JsonValue,
445    #[serde(default)]
446    pub changes: Vec<LinearIssueChange>,
447}
448
449#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
450pub struct LinearIssueCommentEventPayload {
451    #[serde(flatten)]
452    pub common: LinearEventCommon,
453    pub comment: JsonValue,
454}
455
456#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
457pub struct LinearIssueLabelEventPayload {
458    #[serde(flatten)]
459    pub common: LinearEventCommon,
460    pub label: JsonValue,
461}
462
463#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
464pub struct LinearProjectEventPayload {
465    #[serde(flatten)]
466    pub common: LinearEventCommon,
467    pub project: JsonValue,
468}
469
470#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
471pub struct LinearCycleEventPayload {
472    #[serde(flatten)]
473    pub common: LinearEventCommon,
474    pub cycle: JsonValue,
475}
476
477#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
478pub struct LinearCustomerEventPayload {
479    #[serde(flatten)]
480    pub common: LinearEventCommon,
481    pub customer: JsonValue,
482}
483
484#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
485pub struct LinearCustomerRequestEventPayload {
486    #[serde(flatten)]
487    pub common: LinearEventCommon,
488    pub customer_request: JsonValue,
489}
490
491#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
492#[serde(untagged)]
493pub enum LinearEventPayload {
494    Issue(LinearIssueEventPayload),
495    IssueComment(LinearIssueCommentEventPayload),
496    IssueLabel(LinearIssueLabelEventPayload),
497    Project(LinearProjectEventPayload),
498    Cycle(LinearCycleEventPayload),
499    Customer(LinearCustomerEventPayload),
500    CustomerRequest(LinearCustomerRequestEventPayload),
501    Other(LinearEventCommon),
502}
503
504#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
505pub struct NotionPolledChangeEvent {
506    pub resource: String,
507    pub source_id: String,
508    pub entity_id: String,
509    pub high_water_mark: String,
510    pub before: Option<JsonValue>,
511    pub after: JsonValue,
512}
513
514#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
515pub struct NotionEventPayload {
516    pub event: String,
517    pub workspace_id: Option<String>,
518    pub request_id: Option<String>,
519    pub subscription_id: Option<String>,
520    pub integration_id: Option<String>,
521    pub attempt_number: Option<u32>,
522    pub entity_id: Option<String>,
523    pub entity_type: Option<String>,
524    pub api_version: Option<String>,
525    pub verification_token: Option<String>,
526    pub polled: Option<NotionPolledChangeEvent>,
527    pub raw: JsonValue,
528}
529
530#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
531pub struct CronEventPayload {
532    pub cron_id: Option<String>,
533    pub schedule: Option<String>,
534    #[serde(with = "time::serde::rfc3339")]
535    pub tick_at: OffsetDateTime,
536    pub raw: JsonValue,
537}
538
539#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
540pub struct GenericWebhookPayload {
541    pub source: Option<String>,
542    pub content_type: Option<String>,
543    pub raw: JsonValue,
544}
545
546#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
547pub struct A2aPushPayload {
548    pub task_id: Option<String>,
549    pub sender: Option<String>,
550    pub raw: JsonValue,
551}
552
553#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
554pub struct StreamEventPayload {
555    pub event: String,
556    pub source: Option<String>,
557    pub stream: Option<String>,
558    pub partition: Option<String>,
559    pub offset: Option<String>,
560    pub key: Option<String>,
561    pub timestamp: Option<String>,
562    #[serde(default)]
563    pub headers: BTreeMap<String, String>,
564    pub raw: JsonValue,
565}
566
567#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
568pub struct ExtensionProviderPayload {
569    pub provider: String,
570    pub schema_name: String,
571    pub raw: JsonValue,
572}
573
574#[allow(clippy::large_enum_variant)]
575#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
576#[serde(untagged)]
577pub enum ProviderPayload {
578    Known(KnownProviderPayload),
579    Extension(ExtensionProviderPayload),
580}
581
582impl ProviderPayload {
583    pub fn provider(&self) -> &str {
584        match self {
585            Self::Known(known) => known.provider(),
586            Self::Extension(payload) => payload.provider.as_str(),
587        }
588    }
589
590    pub fn normalize(
591        provider: &ProviderId,
592        kind: &str,
593        headers: &BTreeMap<String, String>,
594        raw: JsonValue,
595    ) -> Result<Self, ProviderCatalogError> {
596        provider_catalog()
597            .read()
598            .expect("provider catalog poisoned")
599            .normalize(provider, kind, headers, raw)
600    }
601}
602
603#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
604#[serde(tag = "provider")]
605pub enum KnownProviderPayload {
606    #[serde(rename = "github")]
607    GitHub(GitHubEventPayload),
608    #[serde(rename = "slack")]
609    Slack(Box<SlackEventPayload>),
610    #[serde(rename = "linear")]
611    Linear(LinearEventPayload),
612    #[serde(rename = "notion")]
613    Notion(Box<NotionEventPayload>),
614    #[serde(rename = "cron")]
615    Cron(CronEventPayload),
616    #[serde(rename = "webhook")]
617    Webhook(GenericWebhookPayload),
618    #[serde(rename = "a2a-push")]
619    A2aPush(A2aPushPayload),
620    #[serde(rename = "kafka")]
621    Kafka(StreamEventPayload),
622    #[serde(rename = "nats")]
623    Nats(StreamEventPayload),
624    #[serde(rename = "pulsar")]
625    Pulsar(StreamEventPayload),
626    #[serde(rename = "postgres-cdc")]
627    PostgresCdc(StreamEventPayload),
628    #[serde(rename = "email")]
629    Email(StreamEventPayload),
630    #[serde(rename = "websocket")]
631    Websocket(StreamEventPayload),
632}
633
634impl KnownProviderPayload {
635    pub fn provider(&self) -> &str {
636        match self {
637            Self::GitHub(_) => "github",
638            Self::Slack(_) => "slack",
639            Self::Linear(_) => "linear",
640            Self::Notion(_) => "notion",
641            Self::Cron(_) => "cron",
642            Self::Webhook(_) => "webhook",
643            Self::A2aPush(_) => "a2a-push",
644            Self::Kafka(_) => "kafka",
645            Self::Nats(_) => "nats",
646            Self::Pulsar(_) => "pulsar",
647            Self::PostgresCdc(_) => "postgres-cdc",
648            Self::Email(_) => "email",
649            Self::Websocket(_) => "websocket",
650        }
651    }
652}
653
654#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
655pub struct TriggerEvent {
656    pub id: TriggerEventId,
657    pub provider: ProviderId,
658    pub kind: String,
659    #[serde(with = "time::serde::rfc3339")]
660    pub received_at: OffsetDateTime,
661    #[serde(with = "time::serde::rfc3339::option")]
662    pub occurred_at: Option<OffsetDateTime>,
663    pub dedupe_key: String,
664    pub trace_id: TraceId,
665    pub tenant_id: Option<TenantId>,
666    pub headers: BTreeMap<String, String>,
667    #[serde(default, skip_serializing_if = "Option::is_none")]
668    pub batch: Option<Vec<JsonValue>>,
669    #[serde(
670        default,
671        skip_serializing_if = "Option::is_none",
672        serialize_with = "serialize_optional_bytes_b64",
673        deserialize_with = "deserialize_optional_bytes_b64"
674    )]
675    pub raw_body: Option<Vec<u8>>,
676    pub provider_payload: ProviderPayload,
677    pub signature_status: SignatureStatus,
678    #[serde(skip)]
679    pub dedupe_claimed: bool,
680}
681
682impl TriggerEvent {
683    #[allow(clippy::too_many_arguments)]
684    pub fn new(
685        provider: ProviderId,
686        kind: impl Into<String>,
687        occurred_at: Option<OffsetDateTime>,
688        dedupe_key: impl Into<String>,
689        tenant_id: Option<TenantId>,
690        headers: BTreeMap<String, String>,
691        provider_payload: ProviderPayload,
692        signature_status: SignatureStatus,
693    ) -> Self {
694        Self {
695            id: TriggerEventId::new(),
696            provider,
697            kind: kind.into(),
698            received_at: clock::now_utc(),
699            occurred_at,
700            dedupe_key: dedupe_key.into(),
701            trace_id: TraceId::new(),
702            tenant_id,
703            headers,
704            batch: None,
705            raw_body: None,
706            provider_payload,
707            signature_status,
708            dedupe_claimed: false,
709        }
710    }
711
712    pub fn dedupe_claimed(&self) -> bool {
713        self.dedupe_claimed
714    }
715
716    pub fn mark_dedupe_claimed(&mut self) {
717        self.dedupe_claimed = true;
718    }
719}
720
721#[derive(Clone, Debug, PartialEq, Eq)]
722pub struct HeaderRedactionPolicy {
723    safe_exact_names: BTreeSet<String>,
724}
725
726impl HeaderRedactionPolicy {
727    pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
728        self.safe_exact_names
729            .insert(name.into().to_ascii_lowercase());
730        self
731    }
732
733    fn should_keep(&self, name: &str) -> bool {
734        let lower = name.to_ascii_lowercase();
735        if self.safe_exact_names.contains(lower.as_str()) {
736            return true;
737        }
738        matches!(
739            lower.as_str(),
740            "user-agent"
741                | "request-id"
742                | "x-request-id"
743                | "x-correlation-id"
744                | "content-type"
745                | "content-length"
746                | "x-github-event"
747                | "x-github-delivery"
748                | "x-github-hook-id"
749                | "x-hub-signature-256"
750                | "x-slack-request-timestamp"
751                | "x-slack-signature"
752                | "x-linear-signature"
753                | "x-notion-signature"
754                | "x-a2a-signature"
755                | "x-a2a-delivery"
756        ) || lower.ends_with("-event")
757            || lower.ends_with("-delivery")
758            || lower.contains("timestamp")
759            || lower.contains("request-id")
760    }
761
762    fn should_redact(&self, name: &str) -> bool {
763        let lower = name.to_ascii_lowercase();
764        if self.should_keep(lower.as_str()) {
765            return false;
766        }
767        lower.contains("authorization")
768            || lower.contains("cookie")
769            || lower.contains("secret")
770            || lower.contains("token")
771            || lower.contains("key")
772    }
773}
774
775impl Default for HeaderRedactionPolicy {
776    fn default() -> Self {
777        Self {
778            safe_exact_names: BTreeSet::from([
779                "content-length".to_string(),
780                "content-type".to_string(),
781                "request-id".to_string(),
782                "user-agent".to_string(),
783                "x-a2a-delivery".to_string(),
784                "x-a2a-signature".to_string(),
785                "x-correlation-id".to_string(),
786                "x-github-delivery".to_string(),
787                "x-github-event".to_string(),
788                "x-github-hook-id".to_string(),
789                "x-hub-signature-256".to_string(),
790                "x-linear-signature".to_string(),
791                "x-notion-signature".to_string(),
792                "x-request-id".to_string(),
793                "x-slack-request-timestamp".to_string(),
794                "x-slack-signature".to_string(),
795            ]),
796        }
797    }
798}
799
800pub fn redact_headers(
801    headers: &BTreeMap<String, String>,
802    policy: &HeaderRedactionPolicy,
803) -> BTreeMap<String, String> {
804    headers
805        .iter()
806        .map(|(name, value)| {
807            if policy.should_redact(name) {
808                (name.clone(), REDACTED_HEADER_VALUE.to_string())
809            } else {
810                (name.clone(), value.clone())
811            }
812        })
813        .collect()
814}
815
816#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
817pub struct ProviderSecretRequirement {
818    pub name: String,
819    pub required: bool,
820    pub namespace: String,
821}
822
823#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
824pub struct ProviderOutboundMethod {
825    pub name: String,
826}
827
828#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
829#[serde(tag = "kind", rename_all = "snake_case")]
830pub enum SignatureVerificationMetadata {
831    #[default]
832    None,
833    Hmac {
834        variant: String,
835        raw_body: bool,
836        signature_header: String,
837        timestamp_header: Option<String>,
838        id_header: Option<String>,
839        default_tolerance_secs: Option<i64>,
840        digest: String,
841        encoding: String,
842    },
843}
844
845#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
846#[serde(tag = "kind", rename_all = "snake_case")]
847pub enum ProviderRuntimeMetadata {
848    Builtin {
849        connector: String,
850        default_signature_variant: Option<String>,
851    },
852    #[default]
853    Placeholder,
854}
855
856#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
857pub struct ProviderMetadata {
858    pub provider: String,
859    #[serde(default)]
860    pub kinds: Vec<String>,
861    pub schema_name: String,
862    #[serde(default)]
863    pub outbound_methods: Vec<ProviderOutboundMethod>,
864    #[serde(default)]
865    pub secret_requirements: Vec<ProviderSecretRequirement>,
866    #[serde(default)]
867    pub signature_verification: SignatureVerificationMetadata,
868    #[serde(default)]
869    pub runtime: ProviderRuntimeMetadata,
870}
871
872impl ProviderMetadata {
873    pub fn supports_kind(&self, kind: &str) -> bool {
874        self.kinds.iter().any(|candidate| candidate == kind)
875    }
876
877    pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
878        self.secret_requirements
879            .iter()
880            .filter(|requirement| requirement.required)
881            .map(|requirement| requirement.name.as_str())
882    }
883}
884
885pub trait ProviderSchema: Send + Sync {
886    fn provider_id(&self) -> &'static str;
887    fn harn_schema_name(&self) -> &'static str;
888    fn metadata(&self) -> ProviderMetadata {
889        ProviderMetadata {
890            provider: self.provider_id().to_string(),
891            schema_name: self.harn_schema_name().to_string(),
892            ..ProviderMetadata::default()
893        }
894    }
895    fn normalize(
896        &self,
897        kind: &str,
898        headers: &BTreeMap<String, String>,
899        raw: JsonValue,
900    ) -> Result<ProviderPayload, ProviderCatalogError>;
901}
902
903#[derive(Clone, Debug, PartialEq, Eq)]
904pub enum ProviderCatalogError {
905    DuplicateProvider(String),
906    UnknownProvider(String),
907}
908
909impl std::fmt::Display for ProviderCatalogError {
910    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
911        match self {
912            Self::DuplicateProvider(provider) => {
913                write!(f, "provider `{provider}` is already registered")
914            }
915            Self::UnknownProvider(provider) => write!(f, "provider `{provider}` is not registered"),
916        }
917    }
918}
919
920impl std::error::Error for ProviderCatalogError {}
921
922#[derive(Clone, Default)]
923pub struct ProviderCatalog {
924    providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
925}
926
927impl ProviderCatalog {
928    pub fn with_defaults() -> Self {
929        let mut catalog = Self::default();
930        for schema in default_provider_schemas() {
931            catalog
932                .register(schema)
933                .expect("default providers must register cleanly");
934        }
935        catalog
936    }
937
938    pub fn register(
939        &mut self,
940        schema: Arc<dyn ProviderSchema>,
941    ) -> Result<(), ProviderCatalogError> {
942        let provider = schema.provider_id().to_string();
943        if self.providers.contains_key(provider.as_str()) {
944            return Err(ProviderCatalogError::DuplicateProvider(provider));
945        }
946        self.providers.insert(provider, schema);
947        Ok(())
948    }
949
950    pub fn normalize(
951        &self,
952        provider: &ProviderId,
953        kind: &str,
954        headers: &BTreeMap<String, String>,
955        raw: JsonValue,
956    ) -> Result<ProviderPayload, ProviderCatalogError> {
957        let schema = self
958            .providers
959            .get(provider.as_str())
960            .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
961        schema.normalize(kind, headers, raw)
962    }
963
964    pub fn schema_names(&self) -> BTreeMap<String, String> {
965        self.providers
966            .iter()
967            .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
968            .collect()
969    }
970
971    pub fn entries(&self) -> Vec<ProviderMetadata> {
972        self.providers
973            .values()
974            .map(|schema| schema.metadata())
975            .collect()
976    }
977
978    pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
979        self.providers.get(provider).map(|schema| schema.metadata())
980    }
981}
982
983pub fn register_provider_schema(
984    schema: Arc<dyn ProviderSchema>,
985) -> Result<(), ProviderCatalogError> {
986    provider_catalog()
987        .write()
988        .expect("provider catalog poisoned")
989        .register(schema)
990}
991
992pub fn reset_provider_catalog() {
993    *provider_catalog()
994        .write()
995        .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
996}
997
998pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
999    provider_catalog()
1000        .read()
1001        .expect("provider catalog poisoned")
1002        .schema_names()
1003}
1004
1005pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
1006    provider_catalog()
1007        .read()
1008        .expect("provider catalog poisoned")
1009        .entries()
1010}
1011
1012pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
1013    provider_catalog()
1014        .read()
1015        .expect("provider catalog poisoned")
1016        .metadata_for(provider)
1017}
1018
1019fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
1020    static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
1021    PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
1022}
1023
1024struct BuiltinProviderSchema {
1025    provider_id: &'static str,
1026    harn_schema_name: &'static str,
1027    metadata: ProviderMetadata,
1028    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1029}
1030
1031impl ProviderSchema for BuiltinProviderSchema {
1032    fn provider_id(&self) -> &'static str {
1033        self.provider_id
1034    }
1035
1036    fn harn_schema_name(&self) -> &'static str {
1037        self.harn_schema_name
1038    }
1039
1040    fn metadata(&self) -> ProviderMetadata {
1041        self.metadata.clone()
1042    }
1043
1044    fn normalize(
1045        &self,
1046        kind: &str,
1047        headers: &BTreeMap<String, String>,
1048        raw: JsonValue,
1049    ) -> Result<ProviderPayload, ProviderCatalogError> {
1050        Ok((self.normalize)(kind, headers, raw))
1051    }
1052}
1053
1054fn provider_metadata_entry(
1055    provider: &str,
1056    kinds: &[&str],
1057    schema_name: &str,
1058    outbound_methods: &[&str],
1059    signature_verification: SignatureVerificationMetadata,
1060    secret_requirements: Vec<ProviderSecretRequirement>,
1061    runtime: ProviderRuntimeMetadata,
1062) -> ProviderMetadata {
1063    ProviderMetadata {
1064        provider: provider.to_string(),
1065        kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
1066        schema_name: schema_name.to_string(),
1067        outbound_methods: outbound_methods
1068            .iter()
1069            .map(|name| ProviderOutboundMethod {
1070                name: (*name).to_string(),
1071            })
1072            .collect(),
1073        secret_requirements,
1074        signature_verification,
1075        runtime,
1076    }
1077}
1078
1079fn hmac_signature_metadata(
1080    variant: &str,
1081    signature_header: &str,
1082    timestamp_header: Option<&str>,
1083    id_header: Option<&str>,
1084    default_tolerance_secs: Option<i64>,
1085    encoding: &str,
1086) -> SignatureVerificationMetadata {
1087    SignatureVerificationMetadata::Hmac {
1088        variant: variant.to_string(),
1089        raw_body: true,
1090        signature_header: signature_header.to_string(),
1091        timestamp_header: timestamp_header.map(ToString::to_string),
1092        id_header: id_header.map(ToString::to_string),
1093        default_tolerance_secs,
1094        digest: "sha256".to_string(),
1095        encoding: encoding.to_string(),
1096    }
1097}
1098
1099fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1100    ProviderSecretRequirement {
1101        name: name.to_string(),
1102        required: true,
1103        namespace: namespace.to_string(),
1104    }
1105}
1106
1107fn outbound_method(name: &str) -> ProviderOutboundMethod {
1108    ProviderOutboundMethod {
1109        name: name.to_string(),
1110    }
1111}
1112
1113fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1114    ProviderSecretRequirement {
1115        name: name.to_string(),
1116        required: false,
1117        namespace: namespace.to_string(),
1118    }
1119}
1120
1121fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1122    vec![
1123        Arc::new(BuiltinProviderSchema {
1124            provider_id: "github",
1125            harn_schema_name: "GitHubEventPayload",
1126            metadata: provider_metadata_entry(
1127                "github",
1128                &["webhook"],
1129                "GitHubEventPayload",
1130                &[],
1131                hmac_signature_metadata(
1132                    "github",
1133                    "X-Hub-Signature-256",
1134                    None,
1135                    Some("X-GitHub-Delivery"),
1136                    None,
1137                    "hex",
1138                ),
1139                vec![required_secret("signing_secret", "github")],
1140                ProviderRuntimeMetadata::Builtin {
1141                    connector: "webhook".to_string(),
1142                    default_signature_variant: Some("github".to_string()),
1143                },
1144            ),
1145            normalize: github_payload,
1146        }),
1147        Arc::new(BuiltinProviderSchema {
1148            provider_id: "slack",
1149            harn_schema_name: "SlackEventPayload",
1150            metadata: provider_metadata_entry(
1151                "slack",
1152                &["webhook"],
1153                "SlackEventPayload",
1154                &[
1155                    "post_message",
1156                    "update_message",
1157                    "add_reaction",
1158                    "open_view",
1159                    "user_info",
1160                    "api_call",
1161                    "upload_file",
1162                ],
1163                hmac_signature_metadata(
1164                    "slack",
1165                    "X-Slack-Signature",
1166                    Some("X-Slack-Request-Timestamp"),
1167                    None,
1168                    Some(300),
1169                    "hex",
1170                ),
1171                vec![required_secret("signing_secret", "slack")],
1172                ProviderRuntimeMetadata::Builtin {
1173                    connector: "slack".to_string(),
1174                    default_signature_variant: Some("slack".to_string()),
1175                },
1176            ),
1177            normalize: slack_payload,
1178        }),
1179        Arc::new(BuiltinProviderSchema {
1180            provider_id: "linear",
1181            harn_schema_name: "LinearEventPayload",
1182            metadata: {
1183                let mut metadata = provider_metadata_entry(
1184                    "linear",
1185                    &["webhook"],
1186                    "LinearEventPayload",
1187                    &[],
1188                    hmac_signature_metadata(
1189                        "linear",
1190                        "Linear-Signature",
1191                        None,
1192                        Some("Linear-Delivery"),
1193                        Some(75),
1194                        "hex",
1195                    ),
1196                    vec![
1197                        required_secret("signing_secret", "linear"),
1198                        optional_secret("access_token", "linear"),
1199                    ],
1200                    ProviderRuntimeMetadata::Builtin {
1201                        connector: "linear".to_string(),
1202                        default_signature_variant: Some("linear".to_string()),
1203                    },
1204                );
1205                metadata.outbound_methods = vec![
1206                    ProviderOutboundMethod {
1207                        name: "list_issues".to_string(),
1208                    },
1209                    ProviderOutboundMethod {
1210                        name: "update_issue".to_string(),
1211                    },
1212                    ProviderOutboundMethod {
1213                        name: "create_comment".to_string(),
1214                    },
1215                    ProviderOutboundMethod {
1216                        name: "search".to_string(),
1217                    },
1218                    ProviderOutboundMethod {
1219                        name: "graphql".to_string(),
1220                    },
1221                ];
1222                metadata
1223            },
1224            normalize: linear_payload,
1225        }),
1226        Arc::new(BuiltinProviderSchema {
1227            provider_id: "notion",
1228            harn_schema_name: "NotionEventPayload",
1229            metadata: {
1230                let mut metadata = provider_metadata_entry(
1231                    "notion",
1232                    &["webhook", "poll"],
1233                    "NotionEventPayload",
1234                    &[],
1235                    hmac_signature_metadata(
1236                        "notion",
1237                        "X-Notion-Signature",
1238                        None,
1239                        None,
1240                        None,
1241                        "hex",
1242                    ),
1243                    vec![required_secret("verification_token", "notion")],
1244                    ProviderRuntimeMetadata::Builtin {
1245                        connector: "notion".to_string(),
1246                        default_signature_variant: Some("notion".to_string()),
1247                    },
1248                );
1249                metadata.outbound_methods = vec![
1250                    outbound_method("get_page"),
1251                    outbound_method("update_page"),
1252                    outbound_method("append_blocks"),
1253                    outbound_method("query_database"),
1254                    outbound_method("search"),
1255                    outbound_method("create_comment"),
1256                    outbound_method("api_call"),
1257                ];
1258                metadata
1259            },
1260            normalize: notion_payload,
1261        }),
1262        Arc::new(BuiltinProviderSchema {
1263            provider_id: "cron",
1264            harn_schema_name: "CronEventPayload",
1265            metadata: provider_metadata_entry(
1266                "cron",
1267                &["cron"],
1268                "CronEventPayload",
1269                &[],
1270                SignatureVerificationMetadata::None,
1271                Vec::new(),
1272                ProviderRuntimeMetadata::Builtin {
1273                    connector: "cron".to_string(),
1274                    default_signature_variant: None,
1275                },
1276            ),
1277            normalize: cron_payload,
1278        }),
1279        Arc::new(BuiltinProviderSchema {
1280            provider_id: "webhook",
1281            harn_schema_name: "GenericWebhookPayload",
1282            metadata: provider_metadata_entry(
1283                "webhook",
1284                &["webhook"],
1285                "GenericWebhookPayload",
1286                &[],
1287                hmac_signature_metadata(
1288                    "standard",
1289                    "webhook-signature",
1290                    Some("webhook-timestamp"),
1291                    Some("webhook-id"),
1292                    Some(300),
1293                    "base64",
1294                ),
1295                vec![required_secret("signing_secret", "webhook")],
1296                ProviderRuntimeMetadata::Builtin {
1297                    connector: "webhook".to_string(),
1298                    default_signature_variant: Some("standard".to_string()),
1299                },
1300            ),
1301            normalize: webhook_payload,
1302        }),
1303        Arc::new(BuiltinProviderSchema {
1304            provider_id: "a2a-push",
1305            harn_schema_name: "A2aPushPayload",
1306            metadata: provider_metadata_entry(
1307                "a2a-push",
1308                &["a2a-push"],
1309                "A2aPushPayload",
1310                &[],
1311                SignatureVerificationMetadata::None,
1312                Vec::new(),
1313                ProviderRuntimeMetadata::Placeholder,
1314            ),
1315            normalize: a2a_push_payload,
1316        }),
1317        Arc::new(stream_provider_schema("kafka", kafka_payload)),
1318        Arc::new(stream_provider_schema("nats", nats_payload)),
1319        Arc::new(stream_provider_schema("pulsar", pulsar_payload)),
1320        Arc::new(stream_provider_schema("postgres-cdc", postgres_cdc_payload)),
1321        Arc::new(stream_provider_schema("email", email_payload)),
1322        Arc::new(stream_provider_schema("websocket", websocket_payload)),
1323    ]
1324}
1325
1326fn stream_provider_schema(
1327    provider_id: &'static str,
1328    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
1329) -> BuiltinProviderSchema {
1330    BuiltinProviderSchema {
1331        provider_id,
1332        harn_schema_name: "StreamEventPayload",
1333        metadata: provider_metadata_entry(
1334            provider_id,
1335            &["stream"],
1336            "StreamEventPayload",
1337            &[],
1338            SignatureVerificationMetadata::None,
1339            Vec::new(),
1340            ProviderRuntimeMetadata::Placeholder,
1341        ),
1342        normalize,
1343    }
1344}
1345
1346fn github_payload(
1347    kind: &str,
1348    headers: &BTreeMap<String, String>,
1349    raw: JsonValue,
1350) -> ProviderPayload {
1351    let common = GitHubEventCommon {
1352        event: kind.to_string(),
1353        action: raw
1354            .get("action")
1355            .and_then(JsonValue::as_str)
1356            .map(ToString::to_string),
1357        delivery_id: headers.get("X-GitHub-Delivery").cloned(),
1358        installation_id: raw
1359            .get("installation")
1360            .and_then(|value| value.get("id"))
1361            .and_then(JsonValue::as_i64),
1362        raw: raw.clone(),
1363    };
1364    let payload = match kind {
1365        "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1366            common,
1367            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1368        }),
1369        "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1370            common,
1371            pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1372        }),
1373        "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1374            common,
1375            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1376            comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1377        }),
1378        "pull_request_review" => {
1379            GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1380                common,
1381                pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1382                review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1383            })
1384        }
1385        "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1386            common,
1387            commits: raw
1388                .get("commits")
1389                .and_then(JsonValue::as_array)
1390                .cloned()
1391                .unwrap_or_default(),
1392            distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1393        }),
1394        "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1395            common,
1396            workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1397        }),
1398        "deployment_status" => {
1399            GitHubEventPayload::DeploymentStatus(GitHubDeploymentStatusEventPayload {
1400                common,
1401                deployment_status: raw
1402                    .get("deployment_status")
1403                    .cloned()
1404                    .unwrap_or(JsonValue::Null),
1405                deployment: raw.get("deployment").cloned().unwrap_or(JsonValue::Null),
1406            })
1407        }
1408        "check_run" => GitHubEventPayload::CheckRun(GitHubCheckRunEventPayload {
1409            common,
1410            check_run: raw.get("check_run").cloned().unwrap_or(JsonValue::Null),
1411        }),
1412        _ => GitHubEventPayload::Other(common),
1413    };
1414    ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1415}
1416
1417fn slack_payload(
1418    kind: &str,
1419    _headers: &BTreeMap<String, String>,
1420    raw: JsonValue,
1421) -> ProviderPayload {
1422    let event = raw.get("event");
1423    let common = SlackEventCommon {
1424        event: kind.to_string(),
1425        event_id: raw
1426            .get("event_id")
1427            .and_then(JsonValue::as_str)
1428            .map(ToString::to_string),
1429        api_app_id: raw
1430            .get("api_app_id")
1431            .and_then(JsonValue::as_str)
1432            .map(ToString::to_string),
1433        team_id: raw
1434            .get("team_id")
1435            .and_then(JsonValue::as_str)
1436            .map(ToString::to_string),
1437        channel_id: slack_channel_id(event),
1438        user_id: slack_user_id(event),
1439        event_ts: event
1440            .and_then(|value| value.get("event_ts"))
1441            .and_then(JsonValue::as_str)
1442            .map(ToString::to_string),
1443        raw: raw.clone(),
1444    };
1445    let payload = match kind {
1446        kind if kind == "message" || kind.starts_with("message.") => {
1447            SlackEventPayload::Message(SlackMessageEventPayload {
1448                subtype: event
1449                    .and_then(|value| value.get("subtype"))
1450                    .and_then(JsonValue::as_str)
1451                    .map(ToString::to_string),
1452                channel_type: event
1453                    .and_then(|value| value.get("channel_type"))
1454                    .and_then(JsonValue::as_str)
1455                    .map(ToString::to_string),
1456                channel: event
1457                    .and_then(|value| value.get("channel"))
1458                    .and_then(JsonValue::as_str)
1459                    .map(ToString::to_string),
1460                user: event
1461                    .and_then(|value| value.get("user"))
1462                    .and_then(JsonValue::as_str)
1463                    .map(ToString::to_string),
1464                text: event
1465                    .and_then(|value| value.get("text"))
1466                    .and_then(JsonValue::as_str)
1467                    .map(ToString::to_string),
1468                ts: event
1469                    .and_then(|value| value.get("ts"))
1470                    .and_then(JsonValue::as_str)
1471                    .map(ToString::to_string),
1472                thread_ts: event
1473                    .and_then(|value| value.get("thread_ts"))
1474                    .and_then(JsonValue::as_str)
1475                    .map(ToString::to_string),
1476                common,
1477            })
1478        }
1479        "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1480            channel: event
1481                .and_then(|value| value.get("channel"))
1482                .and_then(JsonValue::as_str)
1483                .map(ToString::to_string),
1484            user: event
1485                .and_then(|value| value.get("user"))
1486                .and_then(JsonValue::as_str)
1487                .map(ToString::to_string),
1488            text: event
1489                .and_then(|value| value.get("text"))
1490                .and_then(JsonValue::as_str)
1491                .map(ToString::to_string),
1492            ts: event
1493                .and_then(|value| value.get("ts"))
1494                .and_then(JsonValue::as_str)
1495                .map(ToString::to_string),
1496            thread_ts: event
1497                .and_then(|value| value.get("thread_ts"))
1498                .and_then(JsonValue::as_str)
1499                .map(ToString::to_string),
1500            common,
1501        }),
1502        "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1503            reaction: event
1504                .and_then(|value| value.get("reaction"))
1505                .and_then(JsonValue::as_str)
1506                .map(ToString::to_string),
1507            item_user: event
1508                .and_then(|value| value.get("item_user"))
1509                .and_then(JsonValue::as_str)
1510                .map(ToString::to_string),
1511            item: event
1512                .and_then(|value| value.get("item"))
1513                .cloned()
1514                .unwrap_or(JsonValue::Null),
1515            common,
1516        }),
1517        "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1518            user: event
1519                .and_then(|value| value.get("user"))
1520                .and_then(JsonValue::as_str)
1521                .map(ToString::to_string),
1522            channel: event
1523                .and_then(|value| value.get("channel"))
1524                .and_then(JsonValue::as_str)
1525                .map(ToString::to_string),
1526            tab: event
1527                .and_then(|value| value.get("tab"))
1528                .and_then(JsonValue::as_str)
1529                .map(ToString::to_string),
1530            view: event
1531                .and_then(|value| value.get("view"))
1532                .cloned()
1533                .unwrap_or(JsonValue::Null),
1534            common,
1535        }),
1536        "assistant_thread_started" => {
1537            let assistant_thread = event
1538                .and_then(|value| value.get("assistant_thread"))
1539                .cloned()
1540                .unwrap_or(JsonValue::Null);
1541            SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1542                thread_ts: assistant_thread
1543                    .get("thread_ts")
1544                    .and_then(JsonValue::as_str)
1545                    .map(ToString::to_string),
1546                context: assistant_thread
1547                    .get("context")
1548                    .cloned()
1549                    .unwrap_or(JsonValue::Null),
1550                assistant_thread,
1551                common,
1552            })
1553        }
1554        _ => SlackEventPayload::Other(common),
1555    };
1556    ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1557}
1558
1559fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1560    event
1561        .and_then(|value| value.get("channel"))
1562        .and_then(JsonValue::as_str)
1563        .map(ToString::to_string)
1564        .or_else(|| {
1565            event
1566                .and_then(|value| value.get("item"))
1567                .and_then(|value| value.get("channel"))
1568                .and_then(JsonValue::as_str)
1569                .map(ToString::to_string)
1570        })
1571        .or_else(|| {
1572            event
1573                .and_then(|value| value.get("channel"))
1574                .and_then(|value| value.get("id"))
1575                .and_then(JsonValue::as_str)
1576                .map(ToString::to_string)
1577        })
1578        .or_else(|| {
1579            event
1580                .and_then(|value| value.get("assistant_thread"))
1581                .and_then(|value| value.get("channel_id"))
1582                .and_then(JsonValue::as_str)
1583                .map(ToString::to_string)
1584        })
1585}
1586
1587fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1588    event
1589        .and_then(|value| value.get("user"))
1590        .and_then(JsonValue::as_str)
1591        .map(ToString::to_string)
1592        .or_else(|| {
1593            event
1594                .and_then(|value| value.get("user"))
1595                .and_then(|value| value.get("id"))
1596                .and_then(JsonValue::as_str)
1597                .map(ToString::to_string)
1598        })
1599        .or_else(|| {
1600            event
1601                .and_then(|value| value.get("item_user"))
1602                .and_then(JsonValue::as_str)
1603                .map(ToString::to_string)
1604        })
1605        .or_else(|| {
1606            event
1607                .and_then(|value| value.get("assistant_thread"))
1608                .and_then(|value| value.get("user_id"))
1609                .and_then(JsonValue::as_str)
1610                .map(ToString::to_string)
1611        })
1612}
1613
1614fn linear_payload(
1615    _kind: &str,
1616    headers: &BTreeMap<String, String>,
1617    raw: JsonValue,
1618) -> ProviderPayload {
1619    let common = linear_event_common(headers, &raw);
1620    let event = common.event.clone();
1621    let payload = match event.as_str() {
1622        "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1623            common,
1624            issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1625            changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1626        }),
1627        "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1628            common,
1629            comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1630        }),
1631        "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1632            common,
1633            label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1634        }),
1635        "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1636            common,
1637            project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1638        }),
1639        "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1640            common,
1641            cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1642        }),
1643        "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
1644            common,
1645            customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1646        }),
1647        "customer_request" => {
1648            LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
1649                common,
1650                customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1651            })
1652        }
1653        _ => LinearEventPayload::Other(common),
1654    };
1655    ProviderPayload::Known(KnownProviderPayload::Linear(payload))
1656}
1657
1658fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
1659    LinearEventCommon {
1660        event: linear_event_name(
1661            raw.get("type")
1662                .and_then(JsonValue::as_str)
1663                .or_else(|| headers.get("Linear-Event").map(String::as_str)),
1664        ),
1665        action: raw
1666            .get("action")
1667            .and_then(JsonValue::as_str)
1668            .map(ToString::to_string),
1669        delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
1670        organization_id: raw
1671            .get("organizationId")
1672            .and_then(JsonValue::as_str)
1673            .map(ToString::to_string),
1674        webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
1675        webhook_id: raw
1676            .get("webhookId")
1677            .and_then(JsonValue::as_str)
1678            .map(ToString::to_string),
1679        url: raw
1680            .get("url")
1681            .and_then(JsonValue::as_str)
1682            .map(ToString::to_string),
1683        created_at: raw
1684            .get("createdAt")
1685            .and_then(JsonValue::as_str)
1686            .map(ToString::to_string),
1687        actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
1688        raw: raw.clone(),
1689    }
1690}
1691
1692fn linear_event_name(raw_type: Option<&str>) -> String {
1693    match raw_type.unwrap_or_default().to_ascii_lowercase().as_str() {
1694        "issue" => "issue".to_string(),
1695        "comment" | "issuecomment" | "issue_comment" => "comment".to_string(),
1696        "issuelabel" | "issue_label" => "issue_label".to_string(),
1697        "project" | "projectupdate" | "project_update" => "project".to_string(),
1698        "cycle" => "cycle".to_string(),
1699        "customer" => "customer".to_string(),
1700        "customerrequest" | "customer_request" => "customer_request".to_string(),
1701        other if !other.is_empty() => other.to_string(),
1702        _ => "other".to_string(),
1703    }
1704}
1705
1706fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
1707    let Some(JsonValue::Object(fields)) = updated_from else {
1708        return Vec::new();
1709    };
1710    let mut changes = Vec::new();
1711    for (field, previous) in fields {
1712        let change = match field.as_str() {
1713            "title" => LinearIssueChange::Title {
1714                previous: previous.as_str().map(ToString::to_string),
1715            },
1716            "description" => LinearIssueChange::Description {
1717                previous: previous.as_str().map(ToString::to_string),
1718            },
1719            "priority" => LinearIssueChange::Priority {
1720                previous: parse_json_i64ish(previous),
1721            },
1722            "estimate" => LinearIssueChange::Estimate {
1723                previous: parse_json_i64ish(previous),
1724            },
1725            "stateId" => LinearIssueChange::StateId {
1726                previous: previous.as_str().map(ToString::to_string),
1727            },
1728            "teamId" => LinearIssueChange::TeamId {
1729                previous: previous.as_str().map(ToString::to_string),
1730            },
1731            "assigneeId" => LinearIssueChange::AssigneeId {
1732                previous: previous.as_str().map(ToString::to_string),
1733            },
1734            "projectId" => LinearIssueChange::ProjectId {
1735                previous: previous.as_str().map(ToString::to_string),
1736            },
1737            "cycleId" => LinearIssueChange::CycleId {
1738                previous: previous.as_str().map(ToString::to_string),
1739            },
1740            "dueDate" => LinearIssueChange::DueDate {
1741                previous: previous.as_str().map(ToString::to_string),
1742            },
1743            "parentId" => LinearIssueChange::ParentId {
1744                previous: previous.as_str().map(ToString::to_string),
1745            },
1746            "sortOrder" => LinearIssueChange::SortOrder {
1747                previous: previous.as_f64(),
1748            },
1749            "labelIds" => LinearIssueChange::LabelIds {
1750                previous: parse_string_array(previous),
1751            },
1752            "completedAt" => LinearIssueChange::CompletedAt {
1753                previous: previous.as_str().map(ToString::to_string),
1754            },
1755            _ => LinearIssueChange::Other {
1756                field: field.clone(),
1757                previous: previous.clone(),
1758            },
1759        };
1760        changes.push(change);
1761    }
1762    changes
1763}
1764
1765fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
1766    value
1767        .as_i64()
1768        .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
1769        .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
1770}
1771
1772fn parse_string_array(value: &JsonValue) -> Vec<String> {
1773    let Some(array) = value.as_array() else {
1774        return Vec::new();
1775    };
1776    array
1777        .iter()
1778        .filter_map(|entry| {
1779            entry.as_str().map(ToString::to_string).or_else(|| {
1780                entry
1781                    .get("id")
1782                    .and_then(JsonValue::as_str)
1783                    .map(ToString::to_string)
1784            })
1785        })
1786        .collect()
1787}
1788
1789fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
1790    headers
1791        .iter()
1792        .find(|(key, _)| key.eq_ignore_ascii_case(name))
1793        .map(|(_, value)| value.as_str())
1794}
1795
1796fn notion_payload(
1797    kind: &str,
1798    headers: &BTreeMap<String, String>,
1799    raw: JsonValue,
1800) -> ProviderPayload {
1801    let workspace_id = raw
1802        .get("workspace_id")
1803        .and_then(JsonValue::as_str)
1804        .map(ToString::to_string);
1805    ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
1806        event: kind.to_string(),
1807        workspace_id,
1808        request_id: headers
1809            .get("request-id")
1810            .cloned()
1811            .or_else(|| headers.get("x-request-id").cloned()),
1812        subscription_id: raw
1813            .get("subscription_id")
1814            .and_then(JsonValue::as_str)
1815            .map(ToString::to_string),
1816        integration_id: raw
1817            .get("integration_id")
1818            .and_then(JsonValue::as_str)
1819            .map(ToString::to_string),
1820        attempt_number: raw
1821            .get("attempt_number")
1822            .and_then(JsonValue::as_u64)
1823            .and_then(|value| u32::try_from(value).ok()),
1824        entity_id: raw
1825            .get("entity")
1826            .and_then(|value| value.get("id"))
1827            .and_then(JsonValue::as_str)
1828            .map(ToString::to_string),
1829        entity_type: raw
1830            .get("entity")
1831            .and_then(|value| value.get("type"))
1832            .and_then(JsonValue::as_str)
1833            .map(ToString::to_string),
1834        api_version: raw
1835            .get("api_version")
1836            .and_then(JsonValue::as_str)
1837            .map(ToString::to_string),
1838        verification_token: raw
1839            .get("verification_token")
1840            .and_then(JsonValue::as_str)
1841            .map(ToString::to_string),
1842        polled: None,
1843        raw,
1844    })))
1845}
1846
1847fn cron_payload(
1848    _kind: &str,
1849    _headers: &BTreeMap<String, String>,
1850    raw: JsonValue,
1851) -> ProviderPayload {
1852    let cron_id = raw
1853        .get("cron_id")
1854        .and_then(JsonValue::as_str)
1855        .map(ToString::to_string);
1856    let schedule = raw
1857        .get("schedule")
1858        .and_then(JsonValue::as_str)
1859        .map(ToString::to_string);
1860    let tick_at = raw
1861        .get("tick_at")
1862        .and_then(JsonValue::as_str)
1863        .and_then(parse_rfc3339)
1864        .unwrap_or_else(OffsetDateTime::now_utc);
1865    ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
1866        cron_id,
1867        schedule,
1868        tick_at,
1869        raw,
1870    }))
1871}
1872
1873fn webhook_payload(
1874    _kind: &str,
1875    headers: &BTreeMap<String, String>,
1876    raw: JsonValue,
1877) -> ProviderPayload {
1878    ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1879        source: headers.get("X-Webhook-Source").cloned(),
1880        content_type: headers.get("Content-Type").cloned(),
1881        raw,
1882    }))
1883}
1884
1885fn a2a_push_payload(
1886    _kind: &str,
1887    _headers: &BTreeMap<String, String>,
1888    raw: JsonValue,
1889) -> ProviderPayload {
1890    let task_id = raw
1891        .get("task_id")
1892        .and_then(JsonValue::as_str)
1893        .map(ToString::to_string);
1894    let sender = raw
1895        .get("sender")
1896        .and_then(JsonValue::as_str)
1897        .map(ToString::to_string);
1898    ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
1899        task_id,
1900        sender,
1901        raw,
1902    }))
1903}
1904
1905fn kafka_payload(
1906    kind: &str,
1907    headers: &BTreeMap<String, String>,
1908    raw: JsonValue,
1909) -> ProviderPayload {
1910    ProviderPayload::Known(KnownProviderPayload::Kafka(stream_payload(
1911        kind, headers, raw,
1912    )))
1913}
1914
1915fn nats_payload(kind: &str, headers: &BTreeMap<String, String>, raw: JsonValue) -> ProviderPayload {
1916    ProviderPayload::Known(KnownProviderPayload::Nats(stream_payload(
1917        kind, headers, raw,
1918    )))
1919}
1920
1921fn pulsar_payload(
1922    kind: &str,
1923    headers: &BTreeMap<String, String>,
1924    raw: JsonValue,
1925) -> ProviderPayload {
1926    ProviderPayload::Known(KnownProviderPayload::Pulsar(stream_payload(
1927        kind, headers, raw,
1928    )))
1929}
1930
1931fn postgres_cdc_payload(
1932    kind: &str,
1933    headers: &BTreeMap<String, String>,
1934    raw: JsonValue,
1935) -> ProviderPayload {
1936    ProviderPayload::Known(KnownProviderPayload::PostgresCdc(stream_payload(
1937        kind, headers, raw,
1938    )))
1939}
1940
1941fn email_payload(
1942    kind: &str,
1943    headers: &BTreeMap<String, String>,
1944    raw: JsonValue,
1945) -> ProviderPayload {
1946    ProviderPayload::Known(KnownProviderPayload::Email(stream_payload(
1947        kind, headers, raw,
1948    )))
1949}
1950
1951fn websocket_payload(
1952    kind: &str,
1953    headers: &BTreeMap<String, String>,
1954    raw: JsonValue,
1955) -> ProviderPayload {
1956    ProviderPayload::Known(KnownProviderPayload::Websocket(stream_payload(
1957        kind, headers, raw,
1958    )))
1959}
1960
1961fn stream_payload(
1962    kind: &str,
1963    headers: &BTreeMap<String, String>,
1964    raw: JsonValue,
1965) -> StreamEventPayload {
1966    StreamEventPayload {
1967        event: kind.to_string(),
1968        source: json_stringish(&raw, &["source", "connector", "origin"]),
1969        stream: json_stringish(
1970            &raw,
1971            &["stream", "topic", "subject", "channel", "mailbox", "slot"],
1972        ),
1973        partition: json_stringish(&raw, &["partition", "shard", "consumer"]),
1974        offset: json_stringish(&raw, &["offset", "sequence", "lsn", "message_id"]),
1975        key: json_stringish(&raw, &["key", "message_key", "id", "event_id"]),
1976        timestamp: json_stringish(&raw, &["timestamp", "occurred_at", "received_at", "ts"]),
1977        headers: headers.clone(),
1978        raw,
1979    }
1980}
1981
1982fn json_stringish(raw: &JsonValue, fields: &[&str]) -> Option<String> {
1983    fields.iter().find_map(|field| {
1984        let value = raw.get(*field)?;
1985        value
1986            .as_str()
1987            .map(ToString::to_string)
1988            .or_else(|| parse_json_i64ish(value).map(|number| number.to_string()))
1989            .or_else(|| value.as_u64().map(|number| number.to_string()))
1990    })
1991}
1992
1993fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
1994    OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
1995}
1996
1997#[cfg(test)]
1998mod tests {
1999    use super::*;
2000
2001    fn sample_headers() -> BTreeMap<String, String> {
2002        BTreeMap::from([
2003            ("Authorization".to_string(), "Bearer secret".to_string()),
2004            ("Cookie".to_string(), "session=abc".to_string()),
2005            ("User-Agent".to_string(), "GitHub-Hookshot/123".to_string()),
2006            ("X-GitHub-Delivery".to_string(), "delivery-123".to_string()),
2007            ("X-GitHub-Event".to_string(), "issues".to_string()),
2008            ("X-Webhook-Token".to_string(), "token".to_string()),
2009        ])
2010    }
2011
2012    #[test]
2013    fn default_redaction_policy_keeps_safe_headers() {
2014        let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
2015        assert_eq!(redacted.get("User-Agent").unwrap(), "GitHub-Hookshot/123");
2016        assert_eq!(redacted.get("X-GitHub-Delivery").unwrap(), "delivery-123");
2017        assert_eq!(
2018            redacted.get("Authorization").unwrap(),
2019            REDACTED_HEADER_VALUE
2020        );
2021        assert_eq!(redacted.get("Cookie").unwrap(), REDACTED_HEADER_VALUE);
2022        assert_eq!(
2023            redacted.get("X-Webhook-Token").unwrap(),
2024            REDACTED_HEADER_VALUE
2025        );
2026    }
2027
2028    #[test]
2029    fn provider_catalog_rejects_duplicates() {
2030        let mut catalog = ProviderCatalog::default();
2031        catalog
2032            .register(Arc::new(BuiltinProviderSchema {
2033                provider_id: "github",
2034                harn_schema_name: "GitHubEventPayload",
2035                metadata: provider_metadata_entry(
2036                    "github",
2037                    &["webhook"],
2038                    "GitHubEventPayload",
2039                    &[],
2040                    SignatureVerificationMetadata::None,
2041                    Vec::new(),
2042                    ProviderRuntimeMetadata::Placeholder,
2043                ),
2044                normalize: github_payload,
2045            }))
2046            .unwrap();
2047        let error = catalog
2048            .register(Arc::new(BuiltinProviderSchema {
2049                provider_id: "github",
2050                harn_schema_name: "GitHubEventPayload",
2051                metadata: provider_metadata_entry(
2052                    "github",
2053                    &["webhook"],
2054                    "GitHubEventPayload",
2055                    &[],
2056                    SignatureVerificationMetadata::None,
2057                    Vec::new(),
2058                    ProviderRuntimeMetadata::Placeholder,
2059                ),
2060                normalize: github_payload,
2061            }))
2062            .unwrap_err();
2063        assert_eq!(
2064            error,
2065            ProviderCatalogError::DuplicateProvider("github".to_string())
2066        );
2067    }
2068
2069    #[test]
2070    fn registered_provider_metadata_marks_builtin_connectors() {
2071        let entries = registered_provider_metadata();
2072        let builtin: Vec<&ProviderMetadata> = entries
2073            .iter()
2074            .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
2075            .collect();
2076
2077        assert_eq!(builtin.len(), 6);
2078        assert!(builtin.iter().any(|entry| entry.provider == "cron"));
2079        assert!(builtin.iter().any(|entry| entry.provider == "github"));
2080        assert!(builtin.iter().any(|entry| entry.provider == "linear"));
2081        assert!(builtin.iter().any(|entry| entry.provider == "notion"));
2082        assert!(builtin.iter().any(|entry| entry.provider == "slack"));
2083        assert!(builtin.iter().any(|entry| entry.provider == "webhook"));
2084        let kafka = entries
2085            .iter()
2086            .find(|entry| entry.provider == "kafka")
2087            .expect("kafka stream provider");
2088        assert_eq!(kafka.kinds, vec!["stream".to_string()]);
2089        assert_eq!(kafka.schema_name, "StreamEventPayload");
2090        assert!(matches!(
2091            kafka.runtime,
2092            ProviderRuntimeMetadata::Placeholder
2093        ));
2094    }
2095
2096    #[test]
2097    fn trigger_event_round_trip_is_stable() {
2098        let provider = ProviderId::from("github");
2099        let headers = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
2100        let payload = ProviderPayload::normalize(
2101            &provider,
2102            "issues",
2103            &sample_headers(),
2104            serde_json::json!({
2105                "action": "opened",
2106                "installation": {"id": 42},
2107                "issue": {"number": 99}
2108            }),
2109        )
2110        .unwrap();
2111        let event = TriggerEvent {
2112            id: TriggerEventId("trigger_evt_fixed".to_string()),
2113            provider,
2114            kind: "issues".to_string(),
2115            received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
2116            occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
2117            dedupe_key: "delivery-123".to_string(),
2118            trace_id: TraceId("trace_fixed".to_string()),
2119            tenant_id: Some(TenantId("tenant_1".to_string())),
2120            headers,
2121            provider_payload: payload,
2122            signature_status: SignatureStatus::Verified,
2123            dedupe_claimed: false,
2124            batch: None,
2125            raw_body: Some(vec![0, 159, 255, 10]),
2126        };
2127
2128        let once = serde_json::to_value(&event).unwrap();
2129        assert_eq!(once["raw_body"], serde_json::json!("AJ//Cg=="));
2130        let decoded: TriggerEvent = serde_json::from_value(once.clone()).unwrap();
2131        let twice = serde_json::to_value(&decoded).unwrap();
2132        assert_eq!(decoded, event);
2133        assert_eq!(once, twice);
2134    }
2135
2136    #[test]
2137    fn unknown_provider_errors() {
2138        let error = ProviderPayload::normalize(
2139            &ProviderId::from("custom-provider"),
2140            "thing.happened",
2141            &BTreeMap::new(),
2142            serde_json::json!({"ok": true}),
2143        )
2144        .unwrap_err();
2145        assert_eq!(
2146            error,
2147            ProviderCatalogError::UnknownProvider("custom-provider".to_string())
2148        );
2149    }
2150
2151    #[test]
2152    fn provider_normalizes_stream_payloads() {
2153        let payload = ProviderPayload::normalize(
2154            &ProviderId::from("kafka"),
2155            "quote.tick",
2156            &BTreeMap::from([("x-source".to_string(), "feed".to_string())]),
2157            serde_json::json!({
2158                "topic": "quotes",
2159                "partition": 7,
2160                "offset": "42",
2161                "key": "AAPL",
2162                "timestamp": "2026-04-21T12:00:00Z"
2163            }),
2164        )
2165        .expect("stream payload");
2166        let ProviderPayload::Known(KnownProviderPayload::Kafka(payload)) = payload else {
2167            panic!("expected kafka stream payload")
2168        };
2169        assert_eq!(payload.event, "quote.tick");
2170        assert_eq!(payload.stream.as_deref(), Some("quotes"));
2171        assert_eq!(payload.partition.as_deref(), Some("7"));
2172        assert_eq!(payload.offset.as_deref(), Some("42"));
2173        assert_eq!(payload.key.as_deref(), Some("AAPL"));
2174        assert_eq!(payload.timestamp.as_deref(), Some("2026-04-21T12:00:00Z"));
2175    }
2176}