Skip to main content

harn_vm/triggers/
event.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::{Arc, OnceLock, RwLock};
3
4use serde::{Deserialize, Deserializer, Serialize, Serializer};
5use serde_json::Value as JsonValue;
6use time::OffsetDateTime;
7use uuid::Uuid;
8
9use crate::triggers::test_util::clock;
10
11const REDACTED_HEADER_VALUE: &str = "[redacted]";
12
13fn serialize_optional_bytes_b64<S>(
14    value: &Option<Vec<u8>>,
15    serializer: S,
16) -> Result<S::Ok, S::Error>
17where
18    S: Serializer,
19{
20    use base64::Engine;
21
22    match value {
23        Some(bytes) => {
24            serializer.serialize_some(&base64::engine::general_purpose::STANDARD.encode(bytes))
25        }
26        None => serializer.serialize_none(),
27    }
28}
29
30fn deserialize_optional_bytes_b64<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
31where
32    D: Deserializer<'de>,
33{
34    use base64::Engine;
35
36    let encoded = Option::<String>::deserialize(deserializer)?;
37    encoded
38        .map(|text| {
39            base64::engine::general_purpose::STANDARD
40                .decode(text.as_bytes())
41                .map_err(serde::de::Error::custom)
42        })
43        .transpose()
44}
45
46#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
47#[serde(transparent)]
48pub struct TriggerEventId(pub String);
49
50impl TriggerEventId {
51    pub fn new() -> Self {
52        Self(format!("trigger_evt_{}", Uuid::now_v7()))
53    }
54}
55
56impl Default for TriggerEventId {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
63#[serde(transparent)]
64pub struct ProviderId(pub String);
65
66impl ProviderId {
67    pub fn new(value: impl Into<String>) -> Self {
68        Self(value.into())
69    }
70
71    pub fn as_str(&self) -> &str {
72        self.0.as_str()
73    }
74}
75
76impl From<&str> for ProviderId {
77    fn from(value: &str) -> Self {
78        Self::new(value)
79    }
80}
81
82impl From<String> for ProviderId {
83    fn from(value: String) -> Self {
84        Self::new(value)
85    }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
89#[serde(transparent)]
90pub struct TraceId(pub String);
91
92impl TraceId {
93    pub fn new() -> Self {
94        Self(format!("trace_{}", Uuid::now_v7()))
95    }
96}
97
98impl Default for TraceId {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
105#[serde(transparent)]
106pub struct TenantId(pub String);
107
108impl TenantId {
109    pub fn new(value: impl Into<String>) -> Self {
110        Self(value.into())
111    }
112}
113
114#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
115#[serde(tag = "state", rename_all = "snake_case")]
116pub enum SignatureStatus {
117    Verified,
118    Unsigned,
119    Failed { reason: String },
120}
121
122#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
123pub struct GitHubEventCommon {
124    pub event: String,
125    pub action: Option<String>,
126    pub delivery_id: Option<String>,
127    pub installation_id: Option<i64>,
128    pub raw: JsonValue,
129}
130
131#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
132pub struct GitHubIssuesEventPayload {
133    #[serde(flatten)]
134    pub common: GitHubEventCommon,
135    pub issue: JsonValue,
136}
137
138#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
139pub struct GitHubPullRequestEventPayload {
140    #[serde(flatten)]
141    pub common: GitHubEventCommon,
142    pub pull_request: JsonValue,
143}
144
145#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
146pub struct GitHubIssueCommentEventPayload {
147    #[serde(flatten)]
148    pub common: GitHubEventCommon,
149    pub issue: JsonValue,
150    pub comment: JsonValue,
151}
152
153#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
154pub struct GitHubPullRequestReviewEventPayload {
155    #[serde(flatten)]
156    pub common: GitHubEventCommon,
157    pub pull_request: JsonValue,
158    pub review: JsonValue,
159}
160
161#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
162pub struct GitHubPushEventPayload {
163    #[serde(flatten)]
164    pub common: GitHubEventCommon,
165    #[serde(default)]
166    pub commits: Vec<JsonValue>,
167    pub distinct_size: Option<i64>,
168}
169
170#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
171pub struct GitHubWorkflowRunEventPayload {
172    #[serde(flatten)]
173    pub common: GitHubEventCommon,
174    pub workflow_run: JsonValue,
175}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178#[serde(untagged)]
179pub enum GitHubEventPayload {
180    Issues(GitHubIssuesEventPayload),
181    PullRequest(GitHubPullRequestEventPayload),
182    IssueComment(GitHubIssueCommentEventPayload),
183    PullRequestReview(GitHubPullRequestReviewEventPayload),
184    Push(GitHubPushEventPayload),
185    WorkflowRun(GitHubWorkflowRunEventPayload),
186    Other(GitHubEventCommon),
187}
188
189#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
190pub struct SlackEventCommon {
191    pub event: String,
192    pub event_id: Option<String>,
193    pub api_app_id: Option<String>,
194    pub team_id: Option<String>,
195    pub channel_id: Option<String>,
196    pub user_id: Option<String>,
197    pub event_ts: Option<String>,
198    pub raw: JsonValue,
199}
200
201#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
202pub struct SlackMessageEventPayload {
203    #[serde(flatten)]
204    pub common: SlackEventCommon,
205    pub subtype: Option<String>,
206    pub channel_type: Option<String>,
207    pub channel: Option<String>,
208    pub user: Option<String>,
209    pub text: Option<String>,
210    pub ts: Option<String>,
211    pub thread_ts: Option<String>,
212}
213
214#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
215pub struct SlackAppMentionEventPayload {
216    #[serde(flatten)]
217    pub common: SlackEventCommon,
218    pub channel: Option<String>,
219    pub user: Option<String>,
220    pub text: Option<String>,
221    pub ts: Option<String>,
222    pub thread_ts: Option<String>,
223}
224
225#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
226pub struct SlackReactionAddedEventPayload {
227    #[serde(flatten)]
228    pub common: SlackEventCommon,
229    pub reaction: Option<String>,
230    pub item_user: Option<String>,
231    pub item: JsonValue,
232}
233
234#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
235pub struct SlackAppHomeOpenedEventPayload {
236    #[serde(flatten)]
237    pub common: SlackEventCommon,
238    pub user: Option<String>,
239    pub channel: Option<String>,
240    pub tab: Option<String>,
241    pub view: JsonValue,
242}
243
244#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
245pub struct SlackAssistantThreadStartedEventPayload {
246    #[serde(flatten)]
247    pub common: SlackEventCommon,
248    pub assistant_thread: JsonValue,
249    pub thread_ts: Option<String>,
250    pub context: JsonValue,
251}
252
253#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
254#[serde(untagged)]
255pub enum SlackEventPayload {
256    Message(SlackMessageEventPayload),
257    AppMention(SlackAppMentionEventPayload),
258    ReactionAdded(SlackReactionAddedEventPayload),
259    AppHomeOpened(SlackAppHomeOpenedEventPayload),
260    AssistantThreadStarted(SlackAssistantThreadStartedEventPayload),
261    Other(SlackEventCommon),
262}
263
264#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
265pub struct LinearEventCommon {
266    pub event: String,
267    pub action: Option<String>,
268    pub delivery_id: Option<String>,
269    pub organization_id: Option<String>,
270    pub webhook_timestamp: Option<i64>,
271    pub webhook_id: Option<String>,
272    pub url: Option<String>,
273    pub created_at: Option<String>,
274    pub actor: JsonValue,
275    pub raw: JsonValue,
276}
277
278#[derive(Clone, Debug, PartialEq)]
279pub enum LinearIssueChange {
280    Title { previous: Option<String> },
281    Description { previous: Option<String> },
282    Priority { previous: Option<i64> },
283    Estimate { previous: Option<i64> },
284    StateId { previous: Option<String> },
285    TeamId { previous: Option<String> },
286    AssigneeId { previous: Option<String> },
287    ProjectId { previous: Option<String> },
288    CycleId { previous: Option<String> },
289    DueDate { previous: Option<String> },
290    ParentId { previous: Option<String> },
291    SortOrder { previous: Option<f64> },
292    LabelIds { previous: Vec<String> },
293    CompletedAt { previous: Option<String> },
294    Other { field: String, previous: JsonValue },
295}
296
297impl Serialize for LinearIssueChange {
298    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
299    where
300        S: Serializer,
301    {
302        let value = match self {
303            Self::Title { previous } => {
304                serde_json::json!({ "field_name": "title", "previous": previous })
305            }
306            Self::Description { previous } => {
307                serde_json::json!({ "field_name": "description", "previous": previous })
308            }
309            Self::Priority { previous } => {
310                serde_json::json!({ "field_name": "priority", "previous": previous })
311            }
312            Self::Estimate { previous } => {
313                serde_json::json!({ "field_name": "estimate", "previous": previous })
314            }
315            Self::StateId { previous } => {
316                serde_json::json!({ "field_name": "state_id", "previous": previous })
317            }
318            Self::TeamId { previous } => {
319                serde_json::json!({ "field_name": "team_id", "previous": previous })
320            }
321            Self::AssigneeId { previous } => {
322                serde_json::json!({ "field_name": "assignee_id", "previous": previous })
323            }
324            Self::ProjectId { previous } => {
325                serde_json::json!({ "field_name": "project_id", "previous": previous })
326            }
327            Self::CycleId { previous } => {
328                serde_json::json!({ "field_name": "cycle_id", "previous": previous })
329            }
330            Self::DueDate { previous } => {
331                serde_json::json!({ "field_name": "due_date", "previous": previous })
332            }
333            Self::ParentId { previous } => {
334                serde_json::json!({ "field_name": "parent_id", "previous": previous })
335            }
336            Self::SortOrder { previous } => {
337                serde_json::json!({ "field_name": "sort_order", "previous": previous })
338            }
339            Self::LabelIds { previous } => {
340                serde_json::json!({ "field_name": "label_ids", "previous": previous })
341            }
342            Self::CompletedAt { previous } => {
343                serde_json::json!({ "field_name": "completed_at", "previous": previous })
344            }
345            Self::Other { field, previous } => {
346                serde_json::json!({ "field_name": "other", "field": field, "previous": previous })
347            }
348        };
349        value.serialize(serializer)
350    }
351}
352
353impl<'de> Deserialize<'de> for LinearIssueChange {
354    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
355    where
356        D: Deserializer<'de>,
357    {
358        let value = JsonValue::deserialize(deserializer)?;
359        let field_name = value
360            .get("field_name")
361            .and_then(JsonValue::as_str)
362            .ok_or_else(|| serde::de::Error::custom("linear issue change missing field_name"))?;
363        let previous = value.get("previous").cloned().unwrap_or(JsonValue::Null);
364        Ok(match field_name {
365            "title" => Self::Title {
366                previous: previous.as_str().map(ToString::to_string),
367            },
368            "description" => Self::Description {
369                previous: previous.as_str().map(ToString::to_string),
370            },
371            "priority" => Self::Priority {
372                previous: parse_json_i64ish(&previous),
373            },
374            "estimate" => Self::Estimate {
375                previous: parse_json_i64ish(&previous),
376            },
377            "state_id" => Self::StateId {
378                previous: previous.as_str().map(ToString::to_string),
379            },
380            "team_id" => Self::TeamId {
381                previous: previous.as_str().map(ToString::to_string),
382            },
383            "assignee_id" => Self::AssigneeId {
384                previous: previous.as_str().map(ToString::to_string),
385            },
386            "project_id" => Self::ProjectId {
387                previous: previous.as_str().map(ToString::to_string),
388            },
389            "cycle_id" => Self::CycleId {
390                previous: previous.as_str().map(ToString::to_string),
391            },
392            "due_date" => Self::DueDate {
393                previous: previous.as_str().map(ToString::to_string),
394            },
395            "parent_id" => Self::ParentId {
396                previous: previous.as_str().map(ToString::to_string),
397            },
398            "sort_order" => Self::SortOrder {
399                previous: previous.as_f64(),
400            },
401            "label_ids" => Self::LabelIds {
402                previous: parse_string_array(&previous),
403            },
404            "completed_at" => Self::CompletedAt {
405                previous: previous.as_str().map(ToString::to_string),
406            },
407            "other" => Self::Other {
408                field: value
409                    .get("field")
410                    .and_then(JsonValue::as_str)
411                    .map(ToString::to_string)
412                    .unwrap_or_else(|| "unknown".to_string()),
413                previous,
414            },
415            other => Self::Other {
416                field: other.to_string(),
417                previous,
418            },
419        })
420    }
421}
422
423#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
424pub struct LinearIssueEventPayload {
425    #[serde(flatten)]
426    pub common: LinearEventCommon,
427    pub issue: JsonValue,
428    #[serde(default)]
429    pub changes: Vec<LinearIssueChange>,
430}
431
432#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
433pub struct LinearIssueCommentEventPayload {
434    #[serde(flatten)]
435    pub common: LinearEventCommon,
436    pub comment: JsonValue,
437}
438
439#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
440pub struct LinearIssueLabelEventPayload {
441    #[serde(flatten)]
442    pub common: LinearEventCommon,
443    pub label: JsonValue,
444}
445
446#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
447pub struct LinearProjectEventPayload {
448    #[serde(flatten)]
449    pub common: LinearEventCommon,
450    pub project: JsonValue,
451}
452
453#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
454pub struct LinearCycleEventPayload {
455    #[serde(flatten)]
456    pub common: LinearEventCommon,
457    pub cycle: JsonValue,
458}
459
460#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
461pub struct LinearCustomerEventPayload {
462    #[serde(flatten)]
463    pub common: LinearEventCommon,
464    pub customer: JsonValue,
465}
466
467#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
468pub struct LinearCustomerRequestEventPayload {
469    #[serde(flatten)]
470    pub common: LinearEventCommon,
471    pub customer_request: JsonValue,
472}
473
474#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
475#[serde(untagged)]
476pub enum LinearEventPayload {
477    Issue(LinearIssueEventPayload),
478    IssueComment(LinearIssueCommentEventPayload),
479    IssueLabel(LinearIssueLabelEventPayload),
480    Project(LinearProjectEventPayload),
481    Cycle(LinearCycleEventPayload),
482    Customer(LinearCustomerEventPayload),
483    CustomerRequest(LinearCustomerRequestEventPayload),
484    Other(LinearEventCommon),
485}
486
487#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
488pub struct NotionPolledChangeEvent {
489    pub resource: String,
490    pub source_id: String,
491    pub entity_id: String,
492    pub high_water_mark: String,
493    pub before: Option<JsonValue>,
494    pub after: JsonValue,
495}
496
497#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
498pub struct NotionEventPayload {
499    pub event: String,
500    pub workspace_id: Option<String>,
501    pub request_id: Option<String>,
502    pub subscription_id: Option<String>,
503    pub integration_id: Option<String>,
504    pub attempt_number: Option<u32>,
505    pub entity_id: Option<String>,
506    pub entity_type: Option<String>,
507    pub api_version: Option<String>,
508    pub verification_token: Option<String>,
509    pub polled: Option<NotionPolledChangeEvent>,
510    pub raw: JsonValue,
511}
512
513#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
514pub struct CronEventPayload {
515    pub cron_id: Option<String>,
516    pub schedule: Option<String>,
517    #[serde(with = "time::serde::rfc3339")]
518    pub tick_at: OffsetDateTime,
519    pub raw: JsonValue,
520}
521
522#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
523pub struct GenericWebhookPayload {
524    pub source: Option<String>,
525    pub content_type: Option<String>,
526    pub raw: JsonValue,
527}
528
529#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
530pub struct A2aPushPayload {
531    pub task_id: Option<String>,
532    pub sender: Option<String>,
533    pub raw: JsonValue,
534}
535
536#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
537pub struct ExtensionProviderPayload {
538    pub provider: String,
539    pub schema_name: String,
540    pub raw: JsonValue,
541}
542
543#[allow(clippy::large_enum_variant)]
544#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
545#[serde(untagged)]
546pub enum ProviderPayload {
547    Known(KnownProviderPayload),
548    Extension(ExtensionProviderPayload),
549}
550
551impl ProviderPayload {
552    pub fn provider(&self) -> &str {
553        match self {
554            Self::Known(known) => known.provider(),
555            Self::Extension(payload) => payload.provider.as_str(),
556        }
557    }
558
559    pub fn normalize(
560        provider: &ProviderId,
561        kind: &str,
562        headers: &BTreeMap<String, String>,
563        raw: JsonValue,
564    ) -> Result<Self, ProviderCatalogError> {
565        provider_catalog()
566            .read()
567            .expect("provider catalog poisoned")
568            .normalize(provider, kind, headers, raw)
569    }
570}
571
572#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
573#[serde(tag = "provider")]
574pub enum KnownProviderPayload {
575    #[serde(rename = "github")]
576    GitHub(GitHubEventPayload),
577    #[serde(rename = "slack")]
578    Slack(Box<SlackEventPayload>),
579    #[serde(rename = "linear")]
580    Linear(LinearEventPayload),
581    #[serde(rename = "notion")]
582    Notion(Box<NotionEventPayload>),
583    #[serde(rename = "cron")]
584    Cron(CronEventPayload),
585    #[serde(rename = "webhook")]
586    Webhook(GenericWebhookPayload),
587    #[serde(rename = "a2a-push")]
588    A2aPush(A2aPushPayload),
589}
590
591impl KnownProviderPayload {
592    pub fn provider(&self) -> &str {
593        match self {
594            Self::GitHub(_) => "github",
595            Self::Slack(_) => "slack",
596            Self::Linear(_) => "linear",
597            Self::Notion(_) => "notion",
598            Self::Cron(_) => "cron",
599            Self::Webhook(_) => "webhook",
600            Self::A2aPush(_) => "a2a-push",
601        }
602    }
603}
604
605#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
606pub struct TriggerEvent {
607    pub id: TriggerEventId,
608    pub provider: ProviderId,
609    pub kind: String,
610    #[serde(with = "time::serde::rfc3339")]
611    pub received_at: OffsetDateTime,
612    #[serde(with = "time::serde::rfc3339::option")]
613    pub occurred_at: Option<OffsetDateTime>,
614    pub dedupe_key: String,
615    pub trace_id: TraceId,
616    pub tenant_id: Option<TenantId>,
617    pub headers: BTreeMap<String, String>,
618    #[serde(default, skip_serializing_if = "Option::is_none")]
619    pub batch: Option<Vec<JsonValue>>,
620    #[serde(
621        default,
622        skip_serializing_if = "Option::is_none",
623        serialize_with = "serialize_optional_bytes_b64",
624        deserialize_with = "deserialize_optional_bytes_b64"
625    )]
626    pub raw_body: Option<Vec<u8>>,
627    pub provider_payload: ProviderPayload,
628    pub signature_status: SignatureStatus,
629    #[serde(skip)]
630    pub dedupe_claimed: bool,
631}
632
633impl TriggerEvent {
634    #[allow(clippy::too_many_arguments)]
635    pub fn new(
636        provider: ProviderId,
637        kind: impl Into<String>,
638        occurred_at: Option<OffsetDateTime>,
639        dedupe_key: impl Into<String>,
640        tenant_id: Option<TenantId>,
641        headers: BTreeMap<String, String>,
642        provider_payload: ProviderPayload,
643        signature_status: SignatureStatus,
644    ) -> Self {
645        Self {
646            id: TriggerEventId::new(),
647            provider,
648            kind: kind.into(),
649            received_at: clock::now_utc(),
650            occurred_at,
651            dedupe_key: dedupe_key.into(),
652            trace_id: TraceId::new(),
653            tenant_id,
654            headers,
655            batch: None,
656            raw_body: None,
657            provider_payload,
658            signature_status,
659            dedupe_claimed: false,
660        }
661    }
662
663    pub fn dedupe_claimed(&self) -> bool {
664        self.dedupe_claimed
665    }
666
667    pub fn mark_dedupe_claimed(&mut self) {
668        self.dedupe_claimed = true;
669    }
670}
671
672#[derive(Clone, Debug, PartialEq, Eq)]
673pub struct HeaderRedactionPolicy {
674    safe_exact_names: BTreeSet<String>,
675}
676
677impl HeaderRedactionPolicy {
678    pub fn with_safe_header(mut self, name: impl Into<String>) -> Self {
679        self.safe_exact_names
680            .insert(name.into().to_ascii_lowercase());
681        self
682    }
683
684    fn should_keep(&self, name: &str) -> bool {
685        let lower = name.to_ascii_lowercase();
686        if self.safe_exact_names.contains(lower.as_str()) {
687            return true;
688        }
689        matches!(
690            lower.as_str(),
691            "user-agent"
692                | "request-id"
693                | "x-request-id"
694                | "x-correlation-id"
695                | "content-type"
696                | "content-length"
697                | "x-github-event"
698                | "x-github-delivery"
699                | "x-github-hook-id"
700                | "x-hub-signature-256"
701                | "x-slack-request-timestamp"
702                | "x-slack-signature"
703                | "x-linear-signature"
704                | "x-notion-signature"
705                | "x-a2a-signature"
706                | "x-a2a-delivery"
707        ) || lower.ends_with("-event")
708            || lower.ends_with("-delivery")
709            || lower.contains("timestamp")
710            || lower.contains("request-id")
711    }
712
713    fn should_redact(&self, name: &str) -> bool {
714        let lower = name.to_ascii_lowercase();
715        if self.should_keep(lower.as_str()) {
716            return false;
717        }
718        lower.contains("authorization")
719            || lower.contains("cookie")
720            || lower.contains("secret")
721            || lower.contains("token")
722            || lower.contains("key")
723    }
724}
725
726impl Default for HeaderRedactionPolicy {
727    fn default() -> Self {
728        Self {
729            safe_exact_names: BTreeSet::from([
730                "content-length".to_string(),
731                "content-type".to_string(),
732                "request-id".to_string(),
733                "user-agent".to_string(),
734                "x-a2a-delivery".to_string(),
735                "x-a2a-signature".to_string(),
736                "x-correlation-id".to_string(),
737                "x-github-delivery".to_string(),
738                "x-github-event".to_string(),
739                "x-github-hook-id".to_string(),
740                "x-hub-signature-256".to_string(),
741                "x-linear-signature".to_string(),
742                "x-notion-signature".to_string(),
743                "x-request-id".to_string(),
744                "x-slack-request-timestamp".to_string(),
745                "x-slack-signature".to_string(),
746            ]),
747        }
748    }
749}
750
751pub fn redact_headers(
752    headers: &BTreeMap<String, String>,
753    policy: &HeaderRedactionPolicy,
754) -> BTreeMap<String, String> {
755    headers
756        .iter()
757        .map(|(name, value)| {
758            if policy.should_redact(name) {
759                (name.clone(), REDACTED_HEADER_VALUE.to_string())
760            } else {
761                (name.clone(), value.clone())
762            }
763        })
764        .collect()
765}
766
767#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
768pub struct ProviderSecretRequirement {
769    pub name: String,
770    pub required: bool,
771    pub namespace: String,
772}
773
774#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
775pub struct ProviderOutboundMethod {
776    pub name: String,
777}
778
779#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
780#[serde(tag = "kind", rename_all = "snake_case")]
781pub enum SignatureVerificationMetadata {
782    #[default]
783    None,
784    Hmac {
785        variant: String,
786        raw_body: bool,
787        signature_header: String,
788        timestamp_header: Option<String>,
789        id_header: Option<String>,
790        default_tolerance_secs: Option<i64>,
791        digest: String,
792        encoding: String,
793    },
794}
795
796#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
797#[serde(tag = "kind", rename_all = "snake_case")]
798pub enum ProviderRuntimeMetadata {
799    Builtin {
800        connector: String,
801        default_signature_variant: Option<String>,
802    },
803    #[default]
804    Placeholder,
805}
806
807#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
808pub struct ProviderMetadata {
809    pub provider: String,
810    #[serde(default)]
811    pub kinds: Vec<String>,
812    pub schema_name: String,
813    #[serde(default)]
814    pub outbound_methods: Vec<ProviderOutboundMethod>,
815    #[serde(default)]
816    pub secret_requirements: Vec<ProviderSecretRequirement>,
817    #[serde(default)]
818    pub signature_verification: SignatureVerificationMetadata,
819    #[serde(default)]
820    pub runtime: ProviderRuntimeMetadata,
821}
822
823impl ProviderMetadata {
824    pub fn supports_kind(&self, kind: &str) -> bool {
825        self.kinds.iter().any(|candidate| candidate == kind)
826    }
827
828    pub fn required_secret_names(&self) -> impl Iterator<Item = &str> {
829        self.secret_requirements
830            .iter()
831            .filter(|requirement| requirement.required)
832            .map(|requirement| requirement.name.as_str())
833    }
834}
835
836pub trait ProviderSchema: Send + Sync {
837    fn provider_id(&self) -> &'static str;
838    fn harn_schema_name(&self) -> &'static str;
839    fn metadata(&self) -> ProviderMetadata {
840        ProviderMetadata {
841            provider: self.provider_id().to_string(),
842            schema_name: self.harn_schema_name().to_string(),
843            ..ProviderMetadata::default()
844        }
845    }
846    fn normalize(
847        &self,
848        kind: &str,
849        headers: &BTreeMap<String, String>,
850        raw: JsonValue,
851    ) -> Result<ProviderPayload, ProviderCatalogError>;
852}
853
854#[derive(Clone, Debug, PartialEq, Eq)]
855pub enum ProviderCatalogError {
856    DuplicateProvider(String),
857    UnknownProvider(String),
858}
859
860impl std::fmt::Display for ProviderCatalogError {
861    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
862        match self {
863            Self::DuplicateProvider(provider) => {
864                write!(f, "provider `{provider}` is already registered")
865            }
866            Self::UnknownProvider(provider) => write!(f, "provider `{provider}` is not registered"),
867        }
868    }
869}
870
871impl std::error::Error for ProviderCatalogError {}
872
873#[derive(Clone, Default)]
874pub struct ProviderCatalog {
875    providers: BTreeMap<String, Arc<dyn ProviderSchema>>,
876}
877
878impl ProviderCatalog {
879    pub fn with_defaults() -> Self {
880        let mut catalog = Self::default();
881        for schema in default_provider_schemas() {
882            catalog
883                .register(schema)
884                .expect("default providers must register cleanly");
885        }
886        catalog
887    }
888
889    pub fn register(
890        &mut self,
891        schema: Arc<dyn ProviderSchema>,
892    ) -> Result<(), ProviderCatalogError> {
893        let provider = schema.provider_id().to_string();
894        if self.providers.contains_key(provider.as_str()) {
895            return Err(ProviderCatalogError::DuplicateProvider(provider));
896        }
897        self.providers.insert(provider, schema);
898        Ok(())
899    }
900
901    pub fn normalize(
902        &self,
903        provider: &ProviderId,
904        kind: &str,
905        headers: &BTreeMap<String, String>,
906        raw: JsonValue,
907    ) -> Result<ProviderPayload, ProviderCatalogError> {
908        let schema = self
909            .providers
910            .get(provider.as_str())
911            .ok_or_else(|| ProviderCatalogError::UnknownProvider(provider.0.clone()))?;
912        schema.normalize(kind, headers, raw)
913    }
914
915    pub fn schema_names(&self) -> BTreeMap<String, String> {
916        self.providers
917            .iter()
918            .map(|(provider, schema)| (provider.clone(), schema.harn_schema_name().to_string()))
919            .collect()
920    }
921
922    pub fn entries(&self) -> Vec<ProviderMetadata> {
923        self.providers
924            .values()
925            .map(|schema| schema.metadata())
926            .collect()
927    }
928
929    pub fn metadata_for(&self, provider: &str) -> Option<ProviderMetadata> {
930        self.providers.get(provider).map(|schema| schema.metadata())
931    }
932}
933
934pub fn register_provider_schema(
935    schema: Arc<dyn ProviderSchema>,
936) -> Result<(), ProviderCatalogError> {
937    provider_catalog()
938        .write()
939        .expect("provider catalog poisoned")
940        .register(schema)
941}
942
943pub fn reset_provider_catalog() {
944    *provider_catalog()
945        .write()
946        .expect("provider catalog poisoned") = ProviderCatalog::with_defaults();
947}
948
949pub fn registered_provider_schema_names() -> BTreeMap<String, String> {
950    provider_catalog()
951        .read()
952        .expect("provider catalog poisoned")
953        .schema_names()
954}
955
956pub fn registered_provider_metadata() -> Vec<ProviderMetadata> {
957    provider_catalog()
958        .read()
959        .expect("provider catalog poisoned")
960        .entries()
961}
962
963pub fn provider_metadata(provider: &str) -> Option<ProviderMetadata> {
964    provider_catalog()
965        .read()
966        .expect("provider catalog poisoned")
967        .metadata_for(provider)
968}
969
970fn provider_catalog() -> &'static RwLock<ProviderCatalog> {
971    static PROVIDER_CATALOG: OnceLock<RwLock<ProviderCatalog>> = OnceLock::new();
972    PROVIDER_CATALOG.get_or_init(|| RwLock::new(ProviderCatalog::with_defaults()))
973}
974
975struct BuiltinProviderSchema {
976    provider_id: &'static str,
977    harn_schema_name: &'static str,
978    metadata: ProviderMetadata,
979    normalize: fn(&str, &BTreeMap<String, String>, JsonValue) -> ProviderPayload,
980}
981
982impl ProviderSchema for BuiltinProviderSchema {
983    fn provider_id(&self) -> &'static str {
984        self.provider_id
985    }
986
987    fn harn_schema_name(&self) -> &'static str {
988        self.harn_schema_name
989    }
990
991    fn metadata(&self) -> ProviderMetadata {
992        self.metadata.clone()
993    }
994
995    fn normalize(
996        &self,
997        kind: &str,
998        headers: &BTreeMap<String, String>,
999        raw: JsonValue,
1000    ) -> Result<ProviderPayload, ProviderCatalogError> {
1001        Ok((self.normalize)(kind, headers, raw))
1002    }
1003}
1004
1005fn provider_metadata_entry(
1006    provider: &str,
1007    kinds: &[&str],
1008    schema_name: &str,
1009    outbound_methods: &[&str],
1010    signature_verification: SignatureVerificationMetadata,
1011    secret_requirements: Vec<ProviderSecretRequirement>,
1012    runtime: ProviderRuntimeMetadata,
1013) -> ProviderMetadata {
1014    ProviderMetadata {
1015        provider: provider.to_string(),
1016        kinds: kinds.iter().map(|kind| kind.to_string()).collect(),
1017        schema_name: schema_name.to_string(),
1018        outbound_methods: outbound_methods
1019            .iter()
1020            .map(|name| ProviderOutboundMethod {
1021                name: (*name).to_string(),
1022            })
1023            .collect(),
1024        secret_requirements,
1025        signature_verification,
1026        runtime,
1027    }
1028}
1029
1030fn hmac_signature_metadata(
1031    variant: &str,
1032    signature_header: &str,
1033    timestamp_header: Option<&str>,
1034    id_header: Option<&str>,
1035    default_tolerance_secs: Option<i64>,
1036    encoding: &str,
1037) -> SignatureVerificationMetadata {
1038    SignatureVerificationMetadata::Hmac {
1039        variant: variant.to_string(),
1040        raw_body: true,
1041        signature_header: signature_header.to_string(),
1042        timestamp_header: timestamp_header.map(ToString::to_string),
1043        id_header: id_header.map(ToString::to_string),
1044        default_tolerance_secs,
1045        digest: "sha256".to_string(),
1046        encoding: encoding.to_string(),
1047    }
1048}
1049
1050fn required_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1051    ProviderSecretRequirement {
1052        name: name.to_string(),
1053        required: true,
1054        namespace: namespace.to_string(),
1055    }
1056}
1057
1058fn outbound_method(name: &str) -> ProviderOutboundMethod {
1059    ProviderOutboundMethod {
1060        name: name.to_string(),
1061    }
1062}
1063
1064fn optional_secret(name: &str, namespace: &str) -> ProviderSecretRequirement {
1065    ProviderSecretRequirement {
1066        name: name.to_string(),
1067        required: false,
1068        namespace: namespace.to_string(),
1069    }
1070}
1071
1072fn default_provider_schemas() -> Vec<Arc<dyn ProviderSchema>> {
1073    vec![
1074        Arc::new(BuiltinProviderSchema {
1075            provider_id: "github",
1076            harn_schema_name: "GitHubEventPayload",
1077            metadata: provider_metadata_entry(
1078                "github",
1079                &["webhook"],
1080                "GitHubEventPayload",
1081                &[],
1082                hmac_signature_metadata(
1083                    "github",
1084                    "X-Hub-Signature-256",
1085                    None,
1086                    Some("X-GitHub-Delivery"),
1087                    None,
1088                    "hex",
1089                ),
1090                vec![required_secret("signing_secret", "github")],
1091                ProviderRuntimeMetadata::Builtin {
1092                    connector: "webhook".to_string(),
1093                    default_signature_variant: Some("github".to_string()),
1094                },
1095            ),
1096            normalize: github_payload,
1097        }),
1098        Arc::new(BuiltinProviderSchema {
1099            provider_id: "slack",
1100            harn_schema_name: "SlackEventPayload",
1101            metadata: provider_metadata_entry(
1102                "slack",
1103                &["webhook"],
1104                "SlackEventPayload",
1105                &[
1106                    "post_message",
1107                    "update_message",
1108                    "add_reaction",
1109                    "open_view",
1110                    "user_info",
1111                    "api_call",
1112                    "upload_file",
1113                ],
1114                hmac_signature_metadata(
1115                    "slack",
1116                    "X-Slack-Signature",
1117                    Some("X-Slack-Request-Timestamp"),
1118                    None,
1119                    Some(300),
1120                    "hex",
1121                ),
1122                vec![required_secret("signing_secret", "slack")],
1123                ProviderRuntimeMetadata::Builtin {
1124                    connector: "slack".to_string(),
1125                    default_signature_variant: Some("slack".to_string()),
1126                },
1127            ),
1128            normalize: slack_payload,
1129        }),
1130        Arc::new(BuiltinProviderSchema {
1131            provider_id: "linear",
1132            harn_schema_name: "LinearEventPayload",
1133            metadata: {
1134                let mut metadata = provider_metadata_entry(
1135                    "linear",
1136                    &["webhook"],
1137                    "LinearEventPayload",
1138                    &[],
1139                    hmac_signature_metadata(
1140                        "linear",
1141                        "Linear-Signature",
1142                        None,
1143                        Some("Linear-Delivery"),
1144                        Some(75),
1145                        "hex",
1146                    ),
1147                    vec![
1148                        required_secret("signing_secret", "linear"),
1149                        optional_secret("access_token", "linear"),
1150                    ],
1151                    ProviderRuntimeMetadata::Builtin {
1152                        connector: "linear".to_string(),
1153                        default_signature_variant: Some("linear".to_string()),
1154                    },
1155                );
1156                metadata.outbound_methods = vec![
1157                    ProviderOutboundMethod {
1158                        name: "list_issues".to_string(),
1159                    },
1160                    ProviderOutboundMethod {
1161                        name: "update_issue".to_string(),
1162                    },
1163                    ProviderOutboundMethod {
1164                        name: "create_comment".to_string(),
1165                    },
1166                    ProviderOutboundMethod {
1167                        name: "search".to_string(),
1168                    },
1169                    ProviderOutboundMethod {
1170                        name: "graphql".to_string(),
1171                    },
1172                ];
1173                metadata
1174            },
1175            normalize: linear_payload,
1176        }),
1177        Arc::new(BuiltinProviderSchema {
1178            provider_id: "notion",
1179            harn_schema_name: "NotionEventPayload",
1180            metadata: {
1181                let mut metadata = provider_metadata_entry(
1182                    "notion",
1183                    &["webhook", "poll"],
1184                    "NotionEventPayload",
1185                    &[],
1186                    hmac_signature_metadata(
1187                        "notion",
1188                        "X-Notion-Signature",
1189                        None,
1190                        None,
1191                        None,
1192                        "hex",
1193                    ),
1194                    vec![required_secret("verification_token", "notion")],
1195                    ProviderRuntimeMetadata::Builtin {
1196                        connector: "notion".to_string(),
1197                        default_signature_variant: Some("notion".to_string()),
1198                    },
1199                );
1200                metadata.outbound_methods = vec![
1201                    outbound_method("get_page"),
1202                    outbound_method("update_page"),
1203                    outbound_method("append_blocks"),
1204                    outbound_method("query_database"),
1205                    outbound_method("search"),
1206                    outbound_method("create_comment"),
1207                    outbound_method("api_call"),
1208                ];
1209                metadata
1210            },
1211            normalize: notion_payload,
1212        }),
1213        Arc::new(BuiltinProviderSchema {
1214            provider_id: "cron",
1215            harn_schema_name: "CronEventPayload",
1216            metadata: provider_metadata_entry(
1217                "cron",
1218                &["cron"],
1219                "CronEventPayload",
1220                &[],
1221                SignatureVerificationMetadata::None,
1222                Vec::new(),
1223                ProviderRuntimeMetadata::Builtin {
1224                    connector: "cron".to_string(),
1225                    default_signature_variant: None,
1226                },
1227            ),
1228            normalize: cron_payload,
1229        }),
1230        Arc::new(BuiltinProviderSchema {
1231            provider_id: "webhook",
1232            harn_schema_name: "GenericWebhookPayload",
1233            metadata: provider_metadata_entry(
1234                "webhook",
1235                &["webhook"],
1236                "GenericWebhookPayload",
1237                &[],
1238                hmac_signature_metadata(
1239                    "standard",
1240                    "webhook-signature",
1241                    Some("webhook-timestamp"),
1242                    Some("webhook-id"),
1243                    Some(300),
1244                    "base64",
1245                ),
1246                vec![required_secret("signing_secret", "webhook")],
1247                ProviderRuntimeMetadata::Builtin {
1248                    connector: "webhook".to_string(),
1249                    default_signature_variant: Some("standard".to_string()),
1250                },
1251            ),
1252            normalize: webhook_payload,
1253        }),
1254        Arc::new(BuiltinProviderSchema {
1255            provider_id: "a2a-push",
1256            harn_schema_name: "A2aPushPayload",
1257            metadata: provider_metadata_entry(
1258                "a2a-push",
1259                &["a2a-push"],
1260                "A2aPushPayload",
1261                &[],
1262                SignatureVerificationMetadata::None,
1263                Vec::new(),
1264                ProviderRuntimeMetadata::Placeholder,
1265            ),
1266            normalize: a2a_push_payload,
1267        }),
1268    ]
1269}
1270
1271fn github_payload(
1272    kind: &str,
1273    headers: &BTreeMap<String, String>,
1274    raw: JsonValue,
1275) -> ProviderPayload {
1276    let common = GitHubEventCommon {
1277        event: kind.to_string(),
1278        action: raw
1279            .get("action")
1280            .and_then(JsonValue::as_str)
1281            .map(ToString::to_string),
1282        delivery_id: headers.get("X-GitHub-Delivery").cloned(),
1283        installation_id: raw
1284            .get("installation")
1285            .and_then(|value| value.get("id"))
1286            .and_then(JsonValue::as_i64),
1287        raw: raw.clone(),
1288    };
1289    let payload = match kind {
1290        "issues" => GitHubEventPayload::Issues(GitHubIssuesEventPayload {
1291            common,
1292            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1293        }),
1294        "pull_request" => GitHubEventPayload::PullRequest(GitHubPullRequestEventPayload {
1295            common,
1296            pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1297        }),
1298        "issue_comment" => GitHubEventPayload::IssueComment(GitHubIssueCommentEventPayload {
1299            common,
1300            issue: raw.get("issue").cloned().unwrap_or(JsonValue::Null),
1301            comment: raw.get("comment").cloned().unwrap_or(JsonValue::Null),
1302        }),
1303        "pull_request_review" => {
1304            GitHubEventPayload::PullRequestReview(GitHubPullRequestReviewEventPayload {
1305                common,
1306                pull_request: raw.get("pull_request").cloned().unwrap_or(JsonValue::Null),
1307                review: raw.get("review").cloned().unwrap_or(JsonValue::Null),
1308            })
1309        }
1310        "push" => GitHubEventPayload::Push(GitHubPushEventPayload {
1311            common,
1312            commits: raw
1313                .get("commits")
1314                .and_then(JsonValue::as_array)
1315                .cloned()
1316                .unwrap_or_default(),
1317            distinct_size: raw.get("distinct_size").and_then(JsonValue::as_i64),
1318        }),
1319        "workflow_run" => GitHubEventPayload::WorkflowRun(GitHubWorkflowRunEventPayload {
1320            common,
1321            workflow_run: raw.get("workflow_run").cloned().unwrap_or(JsonValue::Null),
1322        }),
1323        _ => GitHubEventPayload::Other(common),
1324    };
1325    ProviderPayload::Known(KnownProviderPayload::GitHub(payload))
1326}
1327
1328fn slack_payload(
1329    kind: &str,
1330    _headers: &BTreeMap<String, String>,
1331    raw: JsonValue,
1332) -> ProviderPayload {
1333    let event = raw.get("event");
1334    let common = SlackEventCommon {
1335        event: kind.to_string(),
1336        event_id: raw
1337            .get("event_id")
1338            .and_then(JsonValue::as_str)
1339            .map(ToString::to_string),
1340        api_app_id: raw
1341            .get("api_app_id")
1342            .and_then(JsonValue::as_str)
1343            .map(ToString::to_string),
1344        team_id: raw
1345            .get("team_id")
1346            .and_then(JsonValue::as_str)
1347            .map(ToString::to_string),
1348        channel_id: slack_channel_id(event),
1349        user_id: slack_user_id(event),
1350        event_ts: event
1351            .and_then(|value| value.get("event_ts"))
1352            .and_then(JsonValue::as_str)
1353            .map(ToString::to_string),
1354        raw: raw.clone(),
1355    };
1356    let payload = match kind {
1357        kind if kind == "message" || kind.starts_with("message.") => {
1358            SlackEventPayload::Message(SlackMessageEventPayload {
1359                subtype: event
1360                    .and_then(|value| value.get("subtype"))
1361                    .and_then(JsonValue::as_str)
1362                    .map(ToString::to_string),
1363                channel_type: event
1364                    .and_then(|value| value.get("channel_type"))
1365                    .and_then(JsonValue::as_str)
1366                    .map(ToString::to_string),
1367                channel: event
1368                    .and_then(|value| value.get("channel"))
1369                    .and_then(JsonValue::as_str)
1370                    .map(ToString::to_string),
1371                user: event
1372                    .and_then(|value| value.get("user"))
1373                    .and_then(JsonValue::as_str)
1374                    .map(ToString::to_string),
1375                text: event
1376                    .and_then(|value| value.get("text"))
1377                    .and_then(JsonValue::as_str)
1378                    .map(ToString::to_string),
1379                ts: event
1380                    .and_then(|value| value.get("ts"))
1381                    .and_then(JsonValue::as_str)
1382                    .map(ToString::to_string),
1383                thread_ts: event
1384                    .and_then(|value| value.get("thread_ts"))
1385                    .and_then(JsonValue::as_str)
1386                    .map(ToString::to_string),
1387                common,
1388            })
1389        }
1390        "app_mention" => SlackEventPayload::AppMention(SlackAppMentionEventPayload {
1391            channel: event
1392                .and_then(|value| value.get("channel"))
1393                .and_then(JsonValue::as_str)
1394                .map(ToString::to_string),
1395            user: event
1396                .and_then(|value| value.get("user"))
1397                .and_then(JsonValue::as_str)
1398                .map(ToString::to_string),
1399            text: event
1400                .and_then(|value| value.get("text"))
1401                .and_then(JsonValue::as_str)
1402                .map(ToString::to_string),
1403            ts: event
1404                .and_then(|value| value.get("ts"))
1405                .and_then(JsonValue::as_str)
1406                .map(ToString::to_string),
1407            thread_ts: event
1408                .and_then(|value| value.get("thread_ts"))
1409                .and_then(JsonValue::as_str)
1410                .map(ToString::to_string),
1411            common,
1412        }),
1413        "reaction_added" => SlackEventPayload::ReactionAdded(SlackReactionAddedEventPayload {
1414            reaction: event
1415                .and_then(|value| value.get("reaction"))
1416                .and_then(JsonValue::as_str)
1417                .map(ToString::to_string),
1418            item_user: event
1419                .and_then(|value| value.get("item_user"))
1420                .and_then(JsonValue::as_str)
1421                .map(ToString::to_string),
1422            item: event
1423                .and_then(|value| value.get("item"))
1424                .cloned()
1425                .unwrap_or(JsonValue::Null),
1426            common,
1427        }),
1428        "app_home_opened" => SlackEventPayload::AppHomeOpened(SlackAppHomeOpenedEventPayload {
1429            user: event
1430                .and_then(|value| value.get("user"))
1431                .and_then(JsonValue::as_str)
1432                .map(ToString::to_string),
1433            channel: event
1434                .and_then(|value| value.get("channel"))
1435                .and_then(JsonValue::as_str)
1436                .map(ToString::to_string),
1437            tab: event
1438                .and_then(|value| value.get("tab"))
1439                .and_then(JsonValue::as_str)
1440                .map(ToString::to_string),
1441            view: event
1442                .and_then(|value| value.get("view"))
1443                .cloned()
1444                .unwrap_or(JsonValue::Null),
1445            common,
1446        }),
1447        "assistant_thread_started" => {
1448            let assistant_thread = event
1449                .and_then(|value| value.get("assistant_thread"))
1450                .cloned()
1451                .unwrap_or(JsonValue::Null);
1452            SlackEventPayload::AssistantThreadStarted(SlackAssistantThreadStartedEventPayload {
1453                thread_ts: assistant_thread
1454                    .get("thread_ts")
1455                    .and_then(JsonValue::as_str)
1456                    .map(ToString::to_string),
1457                context: assistant_thread
1458                    .get("context")
1459                    .cloned()
1460                    .unwrap_or(JsonValue::Null),
1461                assistant_thread,
1462                common,
1463            })
1464        }
1465        _ => SlackEventPayload::Other(common),
1466    };
1467    ProviderPayload::Known(KnownProviderPayload::Slack(Box::new(payload)))
1468}
1469
1470fn slack_channel_id(event: Option<&JsonValue>) -> Option<String> {
1471    event
1472        .and_then(|value| value.get("channel"))
1473        .and_then(JsonValue::as_str)
1474        .map(ToString::to_string)
1475        .or_else(|| {
1476            event
1477                .and_then(|value| value.get("item"))
1478                .and_then(|value| value.get("channel"))
1479                .and_then(JsonValue::as_str)
1480                .map(ToString::to_string)
1481        })
1482        .or_else(|| {
1483            event
1484                .and_then(|value| value.get("channel"))
1485                .and_then(|value| value.get("id"))
1486                .and_then(JsonValue::as_str)
1487                .map(ToString::to_string)
1488        })
1489        .or_else(|| {
1490            event
1491                .and_then(|value| value.get("assistant_thread"))
1492                .and_then(|value| value.get("channel_id"))
1493                .and_then(JsonValue::as_str)
1494                .map(ToString::to_string)
1495        })
1496}
1497
1498fn slack_user_id(event: Option<&JsonValue>) -> Option<String> {
1499    event
1500        .and_then(|value| value.get("user"))
1501        .and_then(JsonValue::as_str)
1502        .map(ToString::to_string)
1503        .or_else(|| {
1504            event
1505                .and_then(|value| value.get("user"))
1506                .and_then(|value| value.get("id"))
1507                .and_then(JsonValue::as_str)
1508                .map(ToString::to_string)
1509        })
1510        .or_else(|| {
1511            event
1512                .and_then(|value| value.get("item_user"))
1513                .and_then(JsonValue::as_str)
1514                .map(ToString::to_string)
1515        })
1516        .or_else(|| {
1517            event
1518                .and_then(|value| value.get("assistant_thread"))
1519                .and_then(|value| value.get("user_id"))
1520                .and_then(JsonValue::as_str)
1521                .map(ToString::to_string)
1522        })
1523}
1524
1525fn linear_payload(
1526    _kind: &str,
1527    headers: &BTreeMap<String, String>,
1528    raw: JsonValue,
1529) -> ProviderPayload {
1530    let common = linear_event_common(headers, &raw);
1531    let event = common.event.clone();
1532    let payload = match event.as_str() {
1533        "issue" => LinearEventPayload::Issue(LinearIssueEventPayload {
1534            common,
1535            issue: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1536            changes: parse_linear_issue_changes(raw.get("updatedFrom")),
1537        }),
1538        "comment" => LinearEventPayload::IssueComment(LinearIssueCommentEventPayload {
1539            common,
1540            comment: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1541        }),
1542        "issue_label" => LinearEventPayload::IssueLabel(LinearIssueLabelEventPayload {
1543            common,
1544            label: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1545        }),
1546        "project" => LinearEventPayload::Project(LinearProjectEventPayload {
1547            common,
1548            project: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1549        }),
1550        "cycle" => LinearEventPayload::Cycle(LinearCycleEventPayload {
1551            common,
1552            cycle: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1553        }),
1554        "customer" => LinearEventPayload::Customer(LinearCustomerEventPayload {
1555            common,
1556            customer: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1557        }),
1558        "customer_request" => {
1559            LinearEventPayload::CustomerRequest(LinearCustomerRequestEventPayload {
1560                common,
1561                customer_request: raw.get("data").cloned().unwrap_or(JsonValue::Null),
1562            })
1563        }
1564        _ => LinearEventPayload::Other(common),
1565    };
1566    ProviderPayload::Known(KnownProviderPayload::Linear(payload))
1567}
1568
1569fn linear_event_common(headers: &BTreeMap<String, String>, raw: &JsonValue) -> LinearEventCommon {
1570    LinearEventCommon {
1571        event: linear_event_name(
1572            raw.get("type")
1573                .and_then(JsonValue::as_str)
1574                .or_else(|| headers.get("Linear-Event").map(String::as_str)),
1575        ),
1576        action: raw
1577            .get("action")
1578            .and_then(JsonValue::as_str)
1579            .map(ToString::to_string),
1580        delivery_id: header_value(headers, "Linear-Delivery").map(ToString::to_string),
1581        organization_id: raw
1582            .get("organizationId")
1583            .and_then(JsonValue::as_str)
1584            .map(ToString::to_string),
1585        webhook_timestamp: raw.get("webhookTimestamp").and_then(parse_json_i64ish),
1586        webhook_id: raw
1587            .get("webhookId")
1588            .and_then(JsonValue::as_str)
1589            .map(ToString::to_string),
1590        url: raw
1591            .get("url")
1592            .and_then(JsonValue::as_str)
1593            .map(ToString::to_string),
1594        created_at: raw
1595            .get("createdAt")
1596            .and_then(JsonValue::as_str)
1597            .map(ToString::to_string),
1598        actor: raw.get("actor").cloned().unwrap_or(JsonValue::Null),
1599        raw: raw.clone(),
1600    }
1601}
1602
1603fn linear_event_name(raw_type: Option<&str>) -> String {
1604    match raw_type.unwrap_or_default().to_ascii_lowercase().as_str() {
1605        "issue" => "issue".to_string(),
1606        "comment" | "issuecomment" | "issue_comment" => "comment".to_string(),
1607        "issuelabel" | "issue_label" => "issue_label".to_string(),
1608        "project" | "projectupdate" | "project_update" => "project".to_string(),
1609        "cycle" => "cycle".to_string(),
1610        "customer" => "customer".to_string(),
1611        "customerrequest" | "customer_request" => "customer_request".to_string(),
1612        other if !other.is_empty() => other.to_string(),
1613        _ => "other".to_string(),
1614    }
1615}
1616
1617fn parse_linear_issue_changes(updated_from: Option<&JsonValue>) -> Vec<LinearIssueChange> {
1618    let Some(JsonValue::Object(fields)) = updated_from else {
1619        return Vec::new();
1620    };
1621    let mut changes = Vec::new();
1622    for (field, previous) in fields {
1623        let change = match field.as_str() {
1624            "title" => LinearIssueChange::Title {
1625                previous: previous.as_str().map(ToString::to_string),
1626            },
1627            "description" => LinearIssueChange::Description {
1628                previous: previous.as_str().map(ToString::to_string),
1629            },
1630            "priority" => LinearIssueChange::Priority {
1631                previous: parse_json_i64ish(previous),
1632            },
1633            "estimate" => LinearIssueChange::Estimate {
1634                previous: parse_json_i64ish(previous),
1635            },
1636            "stateId" => LinearIssueChange::StateId {
1637                previous: previous.as_str().map(ToString::to_string),
1638            },
1639            "teamId" => LinearIssueChange::TeamId {
1640                previous: previous.as_str().map(ToString::to_string),
1641            },
1642            "assigneeId" => LinearIssueChange::AssigneeId {
1643                previous: previous.as_str().map(ToString::to_string),
1644            },
1645            "projectId" => LinearIssueChange::ProjectId {
1646                previous: previous.as_str().map(ToString::to_string),
1647            },
1648            "cycleId" => LinearIssueChange::CycleId {
1649                previous: previous.as_str().map(ToString::to_string),
1650            },
1651            "dueDate" => LinearIssueChange::DueDate {
1652                previous: previous.as_str().map(ToString::to_string),
1653            },
1654            "parentId" => LinearIssueChange::ParentId {
1655                previous: previous.as_str().map(ToString::to_string),
1656            },
1657            "sortOrder" => LinearIssueChange::SortOrder {
1658                previous: previous.as_f64(),
1659            },
1660            "labelIds" => LinearIssueChange::LabelIds {
1661                previous: parse_string_array(previous),
1662            },
1663            "completedAt" => LinearIssueChange::CompletedAt {
1664                previous: previous.as_str().map(ToString::to_string),
1665            },
1666            _ => LinearIssueChange::Other {
1667                field: field.clone(),
1668                previous: previous.clone(),
1669            },
1670        };
1671        changes.push(change);
1672    }
1673    changes
1674}
1675
1676fn parse_json_i64ish(value: &JsonValue) -> Option<i64> {
1677    value
1678        .as_i64()
1679        .or_else(|| value.as_u64().and_then(|raw| i64::try_from(raw).ok()))
1680        .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
1681}
1682
1683fn parse_string_array(value: &JsonValue) -> Vec<String> {
1684    let Some(array) = value.as_array() else {
1685        return Vec::new();
1686    };
1687    array
1688        .iter()
1689        .filter_map(|entry| {
1690            entry.as_str().map(ToString::to_string).or_else(|| {
1691                entry
1692                    .get("id")
1693                    .and_then(JsonValue::as_str)
1694                    .map(ToString::to_string)
1695            })
1696        })
1697        .collect()
1698}
1699
1700fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
1701    headers
1702        .iter()
1703        .find(|(key, _)| key.eq_ignore_ascii_case(name))
1704        .map(|(_, value)| value.as_str())
1705}
1706
1707fn notion_payload(
1708    kind: &str,
1709    headers: &BTreeMap<String, String>,
1710    raw: JsonValue,
1711) -> ProviderPayload {
1712    let workspace_id = raw
1713        .get("workspace_id")
1714        .and_then(JsonValue::as_str)
1715        .map(ToString::to_string);
1716    ProviderPayload::Known(KnownProviderPayload::Notion(Box::new(NotionEventPayload {
1717        event: kind.to_string(),
1718        workspace_id,
1719        request_id: headers
1720            .get("request-id")
1721            .cloned()
1722            .or_else(|| headers.get("x-request-id").cloned()),
1723        subscription_id: raw
1724            .get("subscription_id")
1725            .and_then(JsonValue::as_str)
1726            .map(ToString::to_string),
1727        integration_id: raw
1728            .get("integration_id")
1729            .and_then(JsonValue::as_str)
1730            .map(ToString::to_string),
1731        attempt_number: raw
1732            .get("attempt_number")
1733            .and_then(JsonValue::as_u64)
1734            .and_then(|value| u32::try_from(value).ok()),
1735        entity_id: raw
1736            .get("entity")
1737            .and_then(|value| value.get("id"))
1738            .and_then(JsonValue::as_str)
1739            .map(ToString::to_string),
1740        entity_type: raw
1741            .get("entity")
1742            .and_then(|value| value.get("type"))
1743            .and_then(JsonValue::as_str)
1744            .map(ToString::to_string),
1745        api_version: raw
1746            .get("api_version")
1747            .and_then(JsonValue::as_str)
1748            .map(ToString::to_string),
1749        verification_token: raw
1750            .get("verification_token")
1751            .and_then(JsonValue::as_str)
1752            .map(ToString::to_string),
1753        polled: None,
1754        raw,
1755    })))
1756}
1757
1758fn cron_payload(
1759    _kind: &str,
1760    _headers: &BTreeMap<String, String>,
1761    raw: JsonValue,
1762) -> ProviderPayload {
1763    let cron_id = raw
1764        .get("cron_id")
1765        .and_then(JsonValue::as_str)
1766        .map(ToString::to_string);
1767    let schedule = raw
1768        .get("schedule")
1769        .and_then(JsonValue::as_str)
1770        .map(ToString::to_string);
1771    let tick_at = raw
1772        .get("tick_at")
1773        .and_then(JsonValue::as_str)
1774        .and_then(parse_rfc3339)
1775        .unwrap_or_else(OffsetDateTime::now_utc);
1776    ProviderPayload::Known(KnownProviderPayload::Cron(CronEventPayload {
1777        cron_id,
1778        schedule,
1779        tick_at,
1780        raw,
1781    }))
1782}
1783
1784fn webhook_payload(
1785    _kind: &str,
1786    headers: &BTreeMap<String, String>,
1787    raw: JsonValue,
1788) -> ProviderPayload {
1789    ProviderPayload::Known(KnownProviderPayload::Webhook(GenericWebhookPayload {
1790        source: headers.get("X-Webhook-Source").cloned(),
1791        content_type: headers.get("Content-Type").cloned(),
1792        raw,
1793    }))
1794}
1795
1796fn a2a_push_payload(
1797    _kind: &str,
1798    _headers: &BTreeMap<String, String>,
1799    raw: JsonValue,
1800) -> ProviderPayload {
1801    let task_id = raw
1802        .get("task_id")
1803        .and_then(JsonValue::as_str)
1804        .map(ToString::to_string);
1805    let sender = raw
1806        .get("sender")
1807        .and_then(JsonValue::as_str)
1808        .map(ToString::to_string);
1809    ProviderPayload::Known(KnownProviderPayload::A2aPush(A2aPushPayload {
1810        task_id,
1811        sender,
1812        raw,
1813    }))
1814}
1815
1816fn parse_rfc3339(text: &str) -> Option<OffsetDateTime> {
1817    OffsetDateTime::parse(text, &time::format_description::well_known::Rfc3339).ok()
1818}
1819
1820#[cfg(test)]
1821mod tests {
1822    use super::*;
1823
1824    fn sample_headers() -> BTreeMap<String, String> {
1825        BTreeMap::from([
1826            ("Authorization".to_string(), "Bearer secret".to_string()),
1827            ("Cookie".to_string(), "session=abc".to_string()),
1828            ("User-Agent".to_string(), "GitHub-Hookshot/123".to_string()),
1829            ("X-GitHub-Delivery".to_string(), "delivery-123".to_string()),
1830            ("X-GitHub-Event".to_string(), "issues".to_string()),
1831            ("X-Webhook-Token".to_string(), "token".to_string()),
1832        ])
1833    }
1834
1835    #[test]
1836    fn default_redaction_policy_keeps_safe_headers() {
1837        let redacted = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
1838        assert_eq!(redacted.get("User-Agent").unwrap(), "GitHub-Hookshot/123");
1839        assert_eq!(redacted.get("X-GitHub-Delivery").unwrap(), "delivery-123");
1840        assert_eq!(
1841            redacted.get("Authorization").unwrap(),
1842            REDACTED_HEADER_VALUE
1843        );
1844        assert_eq!(redacted.get("Cookie").unwrap(), REDACTED_HEADER_VALUE);
1845        assert_eq!(
1846            redacted.get("X-Webhook-Token").unwrap(),
1847            REDACTED_HEADER_VALUE
1848        );
1849    }
1850
1851    #[test]
1852    fn provider_catalog_rejects_duplicates() {
1853        let mut catalog = ProviderCatalog::default();
1854        catalog
1855            .register(Arc::new(BuiltinProviderSchema {
1856                provider_id: "github",
1857                harn_schema_name: "GitHubEventPayload",
1858                metadata: provider_metadata_entry(
1859                    "github",
1860                    &["webhook"],
1861                    "GitHubEventPayload",
1862                    &[],
1863                    SignatureVerificationMetadata::None,
1864                    Vec::new(),
1865                    ProviderRuntimeMetadata::Placeholder,
1866                ),
1867                normalize: github_payload,
1868            }))
1869            .unwrap();
1870        let error = catalog
1871            .register(Arc::new(BuiltinProviderSchema {
1872                provider_id: "github",
1873                harn_schema_name: "GitHubEventPayload",
1874                metadata: provider_metadata_entry(
1875                    "github",
1876                    &["webhook"],
1877                    "GitHubEventPayload",
1878                    &[],
1879                    SignatureVerificationMetadata::None,
1880                    Vec::new(),
1881                    ProviderRuntimeMetadata::Placeholder,
1882                ),
1883                normalize: github_payload,
1884            }))
1885            .unwrap_err();
1886        assert_eq!(
1887            error,
1888            ProviderCatalogError::DuplicateProvider("github".to_string())
1889        );
1890    }
1891
1892    #[test]
1893    fn registered_provider_metadata_marks_builtin_connectors() {
1894        let entries = registered_provider_metadata();
1895        let builtin: Vec<&ProviderMetadata> = entries
1896            .iter()
1897            .filter(|entry| matches!(entry.runtime, ProviderRuntimeMetadata::Builtin { .. }))
1898            .collect();
1899
1900        assert_eq!(builtin.len(), 6);
1901        assert!(builtin.iter().any(|entry| entry.provider == "cron"));
1902        assert!(builtin.iter().any(|entry| entry.provider == "github"));
1903        assert!(builtin.iter().any(|entry| entry.provider == "linear"));
1904        assert!(builtin.iter().any(|entry| entry.provider == "notion"));
1905        assert!(builtin.iter().any(|entry| entry.provider == "slack"));
1906        assert!(builtin.iter().any(|entry| entry.provider == "webhook"));
1907    }
1908
1909    #[test]
1910    fn trigger_event_round_trip_is_stable() {
1911        let provider = ProviderId::from("github");
1912        let headers = redact_headers(&sample_headers(), &HeaderRedactionPolicy::default());
1913        let payload = ProviderPayload::normalize(
1914            &provider,
1915            "issues",
1916            &sample_headers(),
1917            serde_json::json!({
1918                "action": "opened",
1919                "installation": {"id": 42},
1920                "issue": {"number": 99}
1921            }),
1922        )
1923        .unwrap();
1924        let event = TriggerEvent {
1925            id: TriggerEventId("trigger_evt_fixed".to_string()),
1926            provider,
1927            kind: "issues".to_string(),
1928            received_at: parse_rfc3339("2026-04-19T07:00:00Z").unwrap(),
1929            occurred_at: Some(parse_rfc3339("2026-04-19T06:59:59Z").unwrap()),
1930            dedupe_key: "delivery-123".to_string(),
1931            trace_id: TraceId("trace_fixed".to_string()),
1932            tenant_id: Some(TenantId("tenant_1".to_string())),
1933            headers,
1934            provider_payload: payload,
1935            signature_status: SignatureStatus::Verified,
1936            dedupe_claimed: false,
1937            batch: None,
1938            raw_body: Some(vec![0, 159, 255, 10]),
1939        };
1940
1941        let once = serde_json::to_value(&event).unwrap();
1942        assert_eq!(once["raw_body"], serde_json::json!("AJ//Cg=="));
1943        let decoded: TriggerEvent = serde_json::from_value(once.clone()).unwrap();
1944        let twice = serde_json::to_value(&decoded).unwrap();
1945        assert_eq!(decoded, event);
1946        assert_eq!(once, twice);
1947    }
1948
1949    #[test]
1950    fn unknown_provider_errors() {
1951        let error = ProviderPayload::normalize(
1952            &ProviderId::from("custom-provider"),
1953            "thing.happened",
1954            &BTreeMap::new(),
1955            serde_json::json!({"ok": true}),
1956        )
1957        .unwrap_err();
1958        assert_eq!(
1959            error,
1960            ProviderCatalogError::UnknownProvider("custom-provider".to_string())
1961        );
1962    }
1963}